From b8e8212edd350de014d3a77c0bbacd19e3adc251 Mon Sep 17 00:00:00 2001 From: cuianbing Date: Thu, 22 Jan 2026 09:11:39 +0800 Subject: [PATCH] =?UTF-8?q?:construction:=20=20RocketMQ=E9=9B=86=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 63 +++++++++++++++ .../lg/springboot3jdk17/Application.java | 5 ++ .../RocketMQMessageTestController.java | 23 ++++++ .../mq/consumer/RocketMQTestConsumer.java | 25 ++++++ .../mq/consumer/RocketMQTestMstConsumer.java | 47 +++++++++++ .../mq/producer/RocketMQProducer.java | 18 +++++ src/main/resources/application.properties | 7 ++ src/main/resources/log4j2.xml | 79 +++++++++++++++++++ 8 files changed, 267 insertions(+) create mode 100644 src/main/java/com/yongfeng/lg/springboot3jdk17/controller/RocketMQMessageTestController.java create mode 100644 src/main/java/com/yongfeng/lg/springboot3jdk17/mq/consumer/RocketMQTestConsumer.java create mode 100644 src/main/java/com/yongfeng/lg/springboot3jdk17/mq/consumer/RocketMQTestMstConsumer.java create mode 100644 src/main/java/com/yongfeng/lg/springboot3jdk17/mq/producer/RocketMQProducer.java create mode 100644 src/main/resources/log4j2.xml diff --git a/pom.xml b/pom.xml index 1ce04fd..cf221e4 100644 --- a/pom.xml +++ b/pom.xml @@ -14,13 +14,22 @@ UTF-8 UTF-8 3.5.9 + 2.2.3 + org.springframework.boot spring-boot-starter-web + + + + org.springframework.boot + spring-boot-starter-logging + + @@ -35,12 +44,66 @@ lombok true + org.springframework.boot spring-boot-starter-test test + + + + org.springframework.boot + spring-boot-starter-log4j2 + + + + com.fasterxml.jackson.core + jackson-databind + + + + + org.apache.rocketmq + rocketmq-spring-boot-starter + ${rockermq.version} + + + org.apache.rocketmq + rocketmq-client + + + org.slf4j + slf4j-api + + + + + + org.apache.rocketmq + rocketmq-client + 4.7.1 + + + org.slf4j + slf4j-api + + + org.apache.logging.log4j + log4j-core + + + org.apache.logging.log4j + log4j-slf4j-impl + + + + + + + + diff --git a/src/main/java/com/yongfeng/lg/springboot3jdk17/Application.java b/src/main/java/com/yongfeng/lg/springboot3jdk17/Application.java index 42e6058..a78db29 100644 --- a/src/main/java/com/yongfeng/lg/springboot3jdk17/Application.java +++ b/src/main/java/com/yongfeng/lg/springboot3jdk17/Application.java @@ -1,12 +1,17 @@ package com.yongfeng.lg.springboot3jdk17; +import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Import; @SpringBootApplication +@Import(RocketMQAutoConfiguration.class) public class Application { public static void main(String[] args) { + // 配置RocketMQ使用SLf4j日志框架 + System.setProperty("rocketmq.client.logUseSlf4j", "true"); SpringApplication.run(Application.class, args); } diff --git a/src/main/java/com/yongfeng/lg/springboot3jdk17/controller/RocketMQMessageTestController.java b/src/main/java/com/yongfeng/lg/springboot3jdk17/controller/RocketMQMessageTestController.java new file mode 100644 index 0000000..54e2331 --- /dev/null +++ b/src/main/java/com/yongfeng/lg/springboot3jdk17/controller/RocketMQMessageTestController.java @@ -0,0 +1,23 @@ +package com.yongfeng.lg.springboot3jdk17.controller; + + +import com.yongfeng.lg.springboot3jdk17.mq.producer.RocketMQProducer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/test/mq") +public class RocketMQMessageTestController { + + @Autowired + private RocketMQProducer rocketMQProducer; + + @GetMapping("/send-message") + public String sendMessage(@RequestParam("msg") String msg) { + rocketMQProducer.sendMessage("cuianbing-test-topic", msg); + return "发送成功"; + } +} diff --git a/src/main/java/com/yongfeng/lg/springboot3jdk17/mq/consumer/RocketMQTestConsumer.java b/src/main/java/com/yongfeng/lg/springboot3jdk17/mq/consumer/RocketMQTestConsumer.java new file mode 100644 index 0000000..f83cf9e --- /dev/null +++ b/src/main/java/com/yongfeng/lg/springboot3jdk17/mq/consumer/RocketMQTestConsumer.java @@ -0,0 +1,25 @@ +package com.yongfeng.lg.springboot3jdk17.mq.consumer; + +import org.apache.rocketmq.spring.annotation.ConsumeMode; +import org.apache.rocketmq.spring.annotation.MessageModel; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.annotation.SelectorType; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +@Component +@RocketMQMessageListener(topic = "cuianbing-test-topic", + consumerGroup = "cuianbing-test-topic-group", + selectorType = SelectorType.TAG, + selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, //CLUSTERING集群消费,多个实例负载均衡 BROADCASTING 广播消费模式,每个实例都全量消费 + consumeMode = ConsumeMode.CONCURRENTLY, // CONCURRENTLY 并发消费 ORDERLY顺序消费消息。一个队列,一个线程 + consumeThreadMax = 64 , // 注意,这个参数已废弃,由下面的consumeThreadNumber替代 + consumeThreadNumber = 20 // 消费者的最大线程数配置。 默认值是 20 +) +public class RocketMQTestConsumer implements RocketMQListener { + @Override + public void onMessage(String message) { + System.out.println("收到消息内容为:" + message); + } +} diff --git a/src/main/java/com/yongfeng/lg/springboot3jdk17/mq/consumer/RocketMQTestMstConsumer.java b/src/main/java/com/yongfeng/lg/springboot3jdk17/mq/consumer/RocketMQTestMstConsumer.java new file mode 100644 index 0000000..4b68134 --- /dev/null +++ b/src/main/java/com/yongfeng/lg/springboot3jdk17/mq/consumer/RocketMQTestMstConsumer.java @@ -0,0 +1,47 @@ +package com.yongfeng.lg.springboot3jdk17.mq.consumer; + +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.spring.annotation.ConsumeMode; +import org.apache.rocketmq.spring.annotation.MessageModel; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.annotation.SelectorType; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +import java.net.SocketAddress; + +@Component +@RocketMQMessageListener(topic = "cuianbing-test-topic", + consumerGroup = "cuianbing-test-topic-group2", + selectorType = SelectorType.TAG, + selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, //CLUSTERING集群消费,多个实例负载均衡 BROADCASTING 广播消费模式,每个实例都全量消费 + consumeMode = ConsumeMode.CONCURRENTLY, // CONCURRENTLY 并发消费 ORDERLY顺序消费消息。一个队列,一个线程 + consumeThreadMax = 64 , // 注意,这个参数已废弃,由下面的consumeThreadNumber替代 + consumeThreadNumber = 20 // 消费者的最大线程数配置。 默认值是 20 +) +/** + * 使用MessageExt原生对象可以获取更多的信息 + */ +public class RocketMQTestMstConsumer implements RocketMQListener { + @Override + public void onMessage(MessageExt message) { +// String brokerName; +// int queueId; +// int storeSize; +// long queueOffset; +// int sysFlag; +// long bornTimestamp; +// SocketAddress bornHost; +// long storeTimestamp; +// SocketAddress storeHost; +// String msgId; +// long commitLogOffset; +// int bodyCRC; +// int reconsumeTimes; +// long preparedTransactionOffset; + System.out.println("消息接收成功,消息的内容为:" + + new String(message.getBody()) + + "\n消息对象为:" + message); + } +} diff --git a/src/main/java/com/yongfeng/lg/springboot3jdk17/mq/producer/RocketMQProducer.java b/src/main/java/com/yongfeng/lg/springboot3jdk17/mq/producer/RocketMQProducer.java new file mode 100644 index 0000000..2c3ff0a --- /dev/null +++ b/src/main/java/com/yongfeng/lg/springboot3jdk17/mq/producer/RocketMQProducer.java @@ -0,0 +1,18 @@ +package com.yongfeng.lg.springboot3jdk17.mq.producer; + +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class RocketMQProducer { + + @Autowired + private RocketMQTemplate rocketMQTemplate; + + // 发送简单消息 + public void sendMessage(String topic, String message) { + rocketMQTemplate.convertAndSend(topic, message); + System.out.println("消息发送成功,发送内容为: " + message); + } +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index b01c1a6..952eb02 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,3 +1,10 @@ # 端口号 server.port=8080 +rocketmq.name-server=10.232.12.251:9876;10.232.112.143:9876 +rocketmq.producer.group=springboot3-jdk17--producer-group +rocketmq.producer.send-message-timeout=3000 +rocketmq.producer.retry-times-when-send-failed=2 +rocketmq.producer.retry-next-server=true +rocketmq.consumer.group=springboot-consumer-group +rocketmq.consumer.topic=test-topic \ No newline at end of file diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml new file mode 100644 index 0000000..c490691 --- /dev/null +++ b/src/main/resources/log4j2.xml @@ -0,0 +1,79 @@ + + + + + + + ./logs + + %d{yyyy-MM-dd HH:mm:ss.SSS} | %t | %-5level | %logger{50} | %msg%n + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file