在Spring Boot中集成RocketMQ实现消息通信
环境准备与本地部署
下载并安装 RocketMQ 5.3.0 版本:
https://dlcdn.apache.org/rocketmq/5.3.0/
调整启动脚本内存配置:
- runserver.sh 中设置JVM参数:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
- runbroker.sh 中配置:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m"
修改 conf/broker.conf 文件以适配本地运行:
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
namesrvAddr=127.0.0.1:9876
brokerIP1=127.0.0.1
启动 NameServer 与 Broker
启动命名服务:
nohup sh mqnamesrv &
ps aux | grep mqnamesrv
查看日志确认启动成功:
tail -f ~/logs/rocketmqlogs/namesrv.log
# 输出示例:
2025-04-15 16:58:33 INFO main - The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876
启动消息代理节点:
nohup sh mqbroker -n 127.0.0.1:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
# 正常输出:
2025-04-15 17:01:33 INFO main - The broker[U-4NFQQ751-2056.local, 30.177.33.6:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
关闭服务命令:
sh mqshutdown namesrv
sh mqshutdown broker
Spring Boot 集成配置
在项目中引入 Spring Cloud Stream 与 RocketMQ 支持依赖(基于 Spring Boot 2.3.2.RELEASE):
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-bus-rocketmq</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
消息生产者定义
通过接口声明消息通道,绑定到特定主题:
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface MessageProducer {
String OUTPUT_TOPIC1 = "output-topic1";
String OUTPUT_TOPIC2 = "output-topic2";
@Output(OUTPUT_TOPIC1)
MessageChannel outputTopic1();
@Output(OUTPUT_TOPIC2)
MessageChannel outputTopic2();
}
@Service
@EnableBinding(MessageProducer.class)
public class MessageProducerService {
@Autowired
private MessageProducer messageProducer;
public void sendMessageToTopic1(String content, String tag, String region) {
messageProducer.outputTopic1().send(MessageBuilder.withPayload(content)
.setHeader("rocketmq_TAGS", tag)
.setHeader("region", region)
.build());
}
public void sendMessageToTopic2(String content, String tag, String region) {
messageProducer.outputTopic2().send(MessageBuilder.withPayload(content)
.setHeader("rocketmq_TAGS", tag)
.setHeader("region", region)
.build());
}
}
HTTP 接口触发发送
@RestController
public class MessageController {
@Autowired
private MessageProducerService producerService;
@GetMapping("/send/topic1")
public String sendToTopic1(@RequestParam String message, @RequestParam String tag, @RequestParam String region) {
producerService.sendMessageToTopic1(message, tag, region);
return "已发送至 Topic1: " + message + ", 标签: " + tag + ", 区域: " + region;
}
@GetMapping("/send/topic2")
public String sendToTopic2(@RequestParam String message, @RequestParam String tag, @RequestParam String region) {
producerService.sendMessageToTopic2(message, tag, region);
return "已发送至 Topic2: " + message + ", 标签: " + tag + ", 区域: " + region;
}
}
消息消费者处理逻辑
定义输入通道,并使用注解监听指定主题和标签条件:
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface CustomBindings {
String INPUT_TOPIC1 = "input-topic1";
String INPUT_TOPIC2 = "input-topic2";
@Input(INPUT_TOPIC1)
SubscribableChannel inputTopic1();
@Input(INPUT_TOPIC2)
SubscribableChannel inputTopic2();
}
@Service
public class MessageConsumer {
@StreamListener(value = "input-topic1", condition = "headers['rocketmq_TAGS'] == 'TagA'")
public void handleTopic1Message(Message<String> message) {
System.out.println("接收到 Topic1 消息: " + message.getPayload());
}
@StreamListener(value = "input-topic2", condition = "headers['rocketmq_TAGS'] == 'TagB'")
public void handleTopic2Message(Message<String> message) {
System.out.println("接收到 Topic2 消息: " + message.getPayload());
}
}
测试验证
通过浏览器访问以下链接,观察消费端日志是否打印:
http://localhost:8081/send/topic1?message=test113&tag=TagA®ion=Hangzhou
尝试更换不同标签(如 TagB),观察过滤机制是否生效。
关键机制说明
利用 RocketMQ 的标签(Tag)机制实现消息的精细筛选。Spring Cloud Stream 通过 condition 表达式对消息头中的 rocketmq_TAGS 字段进行判断,仅当匹配时才触发对应处理器。
扩展建议
每个消息主题可独立封装为一个服务类,便于维护与职责分离。例如:OrderMessageProducer、NotificationMessageConsumer 等,提升代码结构清晰度与可复用性。