当前位置:首页 > 技术 > 正文内容

在Spring Boot中集成RocketMQ实现消息通信

访客 技术 2026年6月2日 1

环境准备与本地部署

下载并安装 RocketMQ 5.3.0 版本:

https://dlcdn.apache.org/rocketmq/5.3.0/

调整启动脚本内存配置:

  • runserver.sh 中设置JVM参数:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
  • runbroker.sh 中配置:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m"

修改 conf/broker.conf 文件以适配本地运行:

brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
namesrvAddr=127.0.0.1:9876
brokerIP1=127.0.0.1

启动 NameServer 与 Broker

启动命名服务:

nohup sh mqnamesrv &
ps aux | grep mqnamesrv

查看日志确认启动成功:

tail -f ~/logs/rocketmqlogs/namesrv.log
# 输出示例:
2025-04-15 16:58:33 INFO main - The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876

启动消息代理节点:

nohup sh mqbroker -n 127.0.0.1:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
# 正常输出:
2025-04-15 17:01:33 INFO main - The broker[U-4NFQQ751-2056.local, 30.177.33.6:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876

关闭服务命令:

sh mqshutdown namesrv
sh mqshutdown broker

Spring Boot 集成配置

在项目中引入 Spring Cloud Stream 与 RocketMQ 支持依赖(基于 Spring Boot 2.3.2.RELEASE):

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-rocketmq</artifactId>
    <version>2.2.6.RELEASE</version>
</dependency>

消息生产者定义

通过接口声明消息通道,绑定到特定主题:

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface MessageProducer {
    String OUTPUT_TOPIC1 = "output-topic1";
    String OUTPUT_TOPIC2 = "output-topic2";

    @Output(OUTPUT_TOPIC1)
    MessageChannel outputTopic1();

    @Output(OUTPUT_TOPIC2)
    MessageChannel outputTopic2();
}

@Service
@EnableBinding(MessageProducer.class)
public class MessageProducerService {

    @Autowired
    private MessageProducer messageProducer;

    public void sendMessageToTopic1(String content, String tag, String region) {
        messageProducer.outputTopic1().send(MessageBuilder.withPayload(content)
                .setHeader("rocketmq_TAGS", tag)
                .setHeader("region", region)
                .build());
    }

    public void sendMessageToTopic2(String content, String tag, String region) {
        messageProducer.outputTopic2().send(MessageBuilder.withPayload(content)
                .setHeader("rocketmq_TAGS", tag)
                .setHeader("region", region)
                .build());
    }
}

HTTP 接口触发发送

@RestController
public class MessageController {

    @Autowired
    private MessageProducerService producerService;

    @GetMapping("/send/topic1")
    public String sendToTopic1(@RequestParam String message, @RequestParam String tag, @RequestParam String region) {
        producerService.sendMessageToTopic1(message, tag, region);
        return "已发送至 Topic1: " + message + ", 标签: " + tag + ", 区域: " + region;
    }

    @GetMapping("/send/topic2")
    public String sendToTopic2(@RequestParam String message, @RequestParam String tag, @RequestParam String region) {
        producerService.sendMessageToTopic2(message, tag, region);
        return "已发送至 Topic2: " + message + ", 标签: " + tag + ", 区域: " + region;
    }
}

消息消费者处理逻辑

定义输入通道,并使用注解监听指定主题和标签条件:

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface CustomBindings {
    String INPUT_TOPIC1 = "input-topic1";
    String INPUT_TOPIC2 = "input-topic2";

    @Input(INPUT_TOPIC1)
    SubscribableChannel inputTopic1();

    @Input(INPUT_TOPIC2)
    SubscribableChannel inputTopic2();
}

@Service
public class MessageConsumer {

    @StreamListener(value = "input-topic1", condition = "headers['rocketmq_TAGS'] == 'TagA'")
    public void handleTopic1Message(Message<String> message) {
        System.out.println("接收到 Topic1 消息: " + message.getPayload());
    }

    @StreamListener(value = "input-topic2", condition = "headers['rocketmq_TAGS'] == 'TagB'")
    public void handleTopic2Message(Message<String> message) {
        System.out.println("接收到 Topic2 消息: " + message.getPayload());
    }
}

测试验证

通过浏览器访问以下链接,观察消费端日志是否打印:

http://localhost:8081/send/topic1?message=test113&tag=TagA&region=Hangzhou

尝试更换不同标签(如 TagB),观察过滤机制是否生效。

关键机制说明

利用 RocketMQ 的标签(Tag)机制实现消息的精细筛选。Spring Cloud Stream 通过 condition 表达式对消息头中的 rocketmq_TAGS 字段进行判断,仅当匹配时才触发对应处理器。

扩展建议

每个消息主题可独立封装为一个服务类,便于维护与职责分离。例如:OrderMessageProducerNotificationMessageConsumer 等,提升代码结构清晰度与可复用性。

标签: rocketmq

相关文章

Linux crontab 详解

1) crontab 是什么cron 是 Linux 的定时任务守护进程;crontab 是用来编辑/查看“按时间周期执行命令”的表(cron table)。常见两类:用户 crontab:每个用户一份(crontab -e 编辑)系统级 crontab / cron.d:可指定执行用户(/etc/crontab、/etc/cron.d/*)2) crontab 时间...

富文本里可以允许的 HTML 属性

一、所有标签默认允许的安全属性(极少)class        (可选)id           (通常建议禁用)title️ 注意:id 容易被滥用做锚点注入,很多系统直接禁用class 允许的话最好只允许固定前缀(如 editor-*)二、a 标签允许属性<a href="" t...

Mac 安装 Node.js 指南

方法一:通过官网安装包(最简单,适合初学者)如果你只是想快速安装并开始使用,这是最直接的方法。访问 Node.js 官网。页面会显示两个版本:LTS (Recommended For Most Users):长期支持版,最稳定。建议选这个。Current:最新特性版,包含最新功能但可能不够稳定。下载 .pkg 安装包并运行。按照安装向导点击“下一步”即可完成。方法二:使用 Homebrew 安装(...

Dom\HTML_NO_DEFAULT_NS 的副作用:自动加闭合标签

在使用Dom\HTMLDocument时,Dom\HTML_NO_DEFAULT_NS 将禁止在解析过程中设置元素的命名空间, 此设置是为了与DOMDocument向后兼容而存在的。当使用它时,已知的一个副作用就是:自动加闭合标签例如 </img> 为什么会这样?当你使用:Dom\HTML_NO_DEFAULT_NS文档会变成 无命名空间模式,此时内部更接近 XML...

Laravel 事件和监听器创建

在 Laravel 中,使用 Artisan 命令创建 Events(事件) 和 Listeners(监听器) 是非常高效的。你可以通过以下几种方式来实现:1. 手动创建单个 Event如果你只想创建一个事件类,可以使用 make:event 命令:Bashphp artisan make:event UserRegistered执行后,文件将生成在 app/Even...

自定义域名解析神器 dnsmasq

什么是 dnsmasq?dnsmasq 是一个轻量级、功能强大的网络服务工具,专为小型和中等规模网络设计。它是一个综合的网络基础设施解决方案[1]。dnsmasq 能做什么?功能说明应用场景DNS 转发与缓存将 DNS 查询转发到上游服务器(ISP、Google DNS 等),并在本地缓存结果加快 DNS 查询速度,减少外部 DNS 流量本地 DNS解析本地网络设备的主机名,无需编辑&n...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。