🚧 RocketMQ集成

This commit is contained in:
cuianbing
2026-01-22 09:11:39 +08:00
parent 512a23f241
commit b8e8212edd
8 changed files with 267 additions and 0 deletions

63
pom.xml
View File

@@ -14,13 +14,22 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>3.5.9</spring-boot.version> <spring-boot.version>3.5.9</spring-boot.version>
<rockermq.version>2.2.3</rockermq.version>
</properties> </properties>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId> <artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<!-- 排除SpringBoot默认的日志依赖 -->
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>
@@ -35,12 +44,66 @@
<artifactId>lombok</artifactId> <artifactId>lombok</artifactId>
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId> <artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<!-- 引入Log4j2依赖包含SLF4J桥接 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- RocketMQ相关依赖 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rockermq.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies> </dependencies>
<dependencyManagement> <dependencyManagement>
<dependencies> <dependencies>
<dependency> <dependency>

View File

@@ -1,12 +1,17 @@
package com.yongfeng.lg.springboot3jdk17; package com.yongfeng.lg.springboot3jdk17;
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;
@SpringBootApplication @SpringBootApplication
@Import(RocketMQAutoConfiguration.class)
public class Application { public class Application {
public static void main(String[] args) { public static void main(String[] args) {
// 配置RocketMQ使用SLf4j日志框架
System.setProperty("rocketmq.client.logUseSlf4j", "true");
SpringApplication.run(Application.class, args); SpringApplication.run(Application.class, args);
} }

View File

@@ -0,0 +1,23 @@
package com.yongfeng.lg.springboot3jdk17.controller;
import com.yongfeng.lg.springboot3jdk17.mq.producer.RocketMQProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/test/mq")
public class RocketMQMessageTestController {
@Autowired
private RocketMQProducer rocketMQProducer;
@GetMapping("/send-message")
public String sendMessage(@RequestParam("msg") String msg) {
rocketMQProducer.sendMessage("cuianbing-test-topic", msg);
return "发送成功";
}
}

View File

@@ -0,0 +1,25 @@
package com.yongfeng.lg.springboot3jdk17.mq.consumer;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "cuianbing-test-topic",
consumerGroup = "cuianbing-test-topic-group",
selectorType = SelectorType.TAG,
selectorExpression = "*",
messageModel = MessageModel.CLUSTERING, //CLUSTERING集群消费多个实例负载均衡 BROADCASTING 广播消费模式,每个实例都全量消费
consumeMode = ConsumeMode.CONCURRENTLY, // CONCURRENTLY 并发消费 ORDERLY顺序消费消息。一个队列一个线程
consumeThreadMax = 64 , // 注意这个参数已废弃由下面的consumeThreadNumber替代
consumeThreadNumber = 20 // 消费者的最大线程数配置。 默认值是 20
)
public class RocketMQTestConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("收到消息内容为:" + message);
}
}

View File

@@ -0,0 +1,47 @@
package com.yongfeng.lg.springboot3jdk17.mq.consumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.net.SocketAddress;
@Component
@RocketMQMessageListener(topic = "cuianbing-test-topic",
consumerGroup = "cuianbing-test-topic-group2",
selectorType = SelectorType.TAG,
selectorExpression = "*",
messageModel = MessageModel.CLUSTERING, //CLUSTERING集群消费多个实例负载均衡 BROADCASTING 广播消费模式,每个实例都全量消费
consumeMode = ConsumeMode.CONCURRENTLY, // CONCURRENTLY 并发消费 ORDERLY顺序消费消息。一个队列一个线程
consumeThreadMax = 64 , // 注意这个参数已废弃由下面的consumeThreadNumber替代
consumeThreadNumber = 20 // 消费者的最大线程数配置。 默认值是 20
)
/**
* 使用MessageExt原生对象可以获取更多的信息
*/
public class RocketMQTestMstConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
// String brokerName;
// int queueId;
// int storeSize;
// long queueOffset;
// int sysFlag;
// long bornTimestamp;
// SocketAddress bornHost;
// long storeTimestamp;
// SocketAddress storeHost;
// String msgId;
// long commitLogOffset;
// int bodyCRC;
// int reconsumeTimes;
// long preparedTransactionOffset;
System.out.println("消息接收成功,消息的内容为:" +
new String(message.getBody()) +
"\n消息对象为" + message);
}
}

View File

@@ -0,0 +1,18 @@
package com.yongfeng.lg.springboot3jdk17.mq.producer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RocketMQProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 发送简单消息
public void sendMessage(String topic, String message) {
rocketMQTemplate.convertAndSend(topic, message);
System.out.println("消息发送成功,发送内容为: " + message);
}
}

View File

@@ -1,3 +1,10 @@
# 端口号 # 端口号
server.port=8080 server.port=8080
rocketmq.name-server=10.232.12.251:9876;10.232.112.143:9876
rocketmq.producer.group=springboot3-jdk17--producer-group
rocketmq.producer.send-message-timeout=3000
rocketmq.producer.retry-times-when-send-failed=2
rocketmq.producer.retry-next-server=true
rocketmq.consumer.group=springboot-consumer-group
rocketmq.consumer.topic=test-topic

View File

@@ -0,0 +1,79 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- statusLog4j2自身的日志级别debug可排查配置问题monitorInterval自动刷新配置 -->
<Configuration status="WARN" monitorInterval="30">
<!-- 全局参数:定义日志存储路径、格式等 -->
<Properties>
<!-- 日志输出目录SpringBoot会自动识别classpath、user.dir等 -->
<Property name="LOG_PATH">./logs</Property>
<!-- 日志格式:时间 | 线程 | 级别 | 类名 | 日志内容 -->
<Property name="PATTERN">%d{yyyy-MM-dd HH:mm:ss.SSS} | %t | %-5level | %logger{50} | %msg%n</Property>
</Properties>
<!-- 输出源Appender定义日志输出到哪里、怎么输出 -->
<Appenders>
<!-- 1. 控制台输出 -->
<Console name="Console" target="SYSTEM_OUT">
<!-- 日志格式编码 -->
<PatternLayout pattern="${PATTERN}" charset="UTF-8"/>
<!-- 过滤器只输出INFO及以上级别 -->
<ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
</Console>
<!-- 2. 所有日志文件输出(按天滚动) -->
<RollingFile name="AllFile" fileName="${LOG_PATH}/all.log"
filePattern="${LOG_PATH}/all-%d{yyyy-MM-dd}.log">
<PatternLayout pattern="${PATTERN}" charset="UTF-8"/>
<!-- 滚动策略按天滚动保留30天单个文件最大100MB -->
<Policies>
<TimeBasedTriggeringPolicy interval="1" modulate="true"/>
<SizeBasedTriggeringPolicy size="100MB"/>
</Policies>
<DefaultRolloverStrategy max="30"/>
</RollingFile>
<!-- 3. 错误日志单独输出(按天滚动) -->
<RollingFile name="ErrorFile" fileName="${LOG_PATH}/error.log"
filePattern="${LOG_PATH}/error-%d{yyyy-MM-dd}.log">
<PatternLayout pattern="${PATTERN}" charset="UTF-8"/>
<!-- 只输出ERROR及以上级别 -->
<ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"/>
<Policies>
<TimeBasedTriggeringPolicy interval="1" modulate="true"/>
<SizeBasedTriggeringPolicy size="50MB"/>
</Policies>
<DefaultRolloverStrategy max="30"/>
</RollingFile>
</Appenders>
<!-- 日志记录器Logger定义不同包/类的日志级别 -->
<Loggers>
<!-- 自定义包日志级别例如你的业务代码包DEBUG级别 -->
<Logger name="com.yongfeng.lg.springboot3jdk17" level="DEBUG" additivity="false">
<AppenderRef ref="Console"/>
<!-- <AppenderRef ref="AllFile"/>-->
<!-- <AppenderRef ref="ErrorFile"/>-->
</Logger>
<Logger name="io.netty.util.concurrent.GlobalEventExecutor" level="DEBUG" additivity="false">
<AppenderRef ref="Console"/>
<!-- <AppenderRef ref="AllFile"/>-->
<!-- <AppenderRef ref="ErrorFile"/>-->
</Logger>
<!-- 第三方框架日志级别例如Spring、MyBatis避免日志过多 -->
<Logger name="org.springframework" level="INFO" additivity="false">
<AppenderRef ref="Console"/>
<!-- <AppenderRef ref="AllFile"/>-->
</Logger>
<Logger name="com.baomidou.mybatisplus" level="INFO" additivity="false">
<AppenderRef ref="Console"/>
<!-- <AppenderRef ref="AllFile"/>-->
</Logger>
<!-- 根日志:默认级别,所有未单独配置的日志都走这里 -->
<Root level="INFO">
<AppenderRef ref="Console"/>
<!-- <AppenderRef ref="AllFile"/>-->
<!-- <AppenderRef ref="ErrorFile"/>-->
</Root>
</Loggers>
</Configuration>