在 spring 项目中集成 rocketmq 是非常常见的消息队列应用场景,我会以 spring boot + rocketmq 5.x(当前主流版本)为例,提供完整、可直接运行的集成方案,包括生产者、消费者的核心代码和配置说明。
一、前置条件
- 已安装并启动 rocketmq(nameserver + broker),默认端口:nameserver
9876 - spring boot 版本建议:
2.7.x或3.x(兼容 rocketmq 官方 starter) - 开发环境:jdk 8+
二、核心依赖引入
在 pom.xml 中添加 rocketmq 与 spring boot 集成的官方 starter:
<!-- spring boot 基础依赖 -->
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter</artifactid>
</dependency>
<!-- rocketmq spring boot starter(官方推荐) -->
<dependency>
<groupid>org.apache.rocketmq</groupid>
<artifactid>rocketmq-spring-boot-starter</artifactid>
<version>2.2.3</version> <!-- 适配 rocketmq 5.x,兼容 spring boot 2/3 -->
</dependency>
<!-- 可选:测试依赖 -->
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-test</artifactid>
<scope>test</scope>
</dependency>三、核心配置(application.yml)
在 resources 目录下配置 rocketmq 连接信息:
spring:
application:
name: rocketmq-demo # 应用名称
# rocketmq 核心配置
rocketmq:
name-server: 127.0.0.1:9876 # nameserver 地址(集群用逗号分隔)
producer:
group: demo-producer-group # 生产者组(必填,标识同一类生产者)
send-message-timeout: 3000 # 发送超时时间,默认3000ms
compress-message-body-threshold: 4096 # 消息压缩阈值,默认4096字节
max-message-size: 4194304 # 最大消息大小,默认4mb
retry-times-when-send-failed: 2 # 同步发送失败重试次数
retry-times-when-send-async-failed: 2 # 异步发送失败重试次数四、生产者实现(3 种发送方式)
1. 基础同步发送(最常用)
适用于需要确认发送结果的场景(如订单创建、库存扣减):
import org.apache.rocketmq.client.producer.sendresult;
import org.apache.rocketmq.spring.core.rocketmqtemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.component;
@component
public class rocketmqproducer {
// 注入官方封装的 rocketmq 模板(类似 rabbittemplate)
@autowired
private rocketmqtemplate rocketmqtemplate;
/**
* 同步发送消息(阻塞等待结果)
* @param topic 消息主题(必填,需提前创建)
* @param message 消息内容
* @return 发送结果
*/
public sendresult sendsyncmessage(string topic, string message) {
try {
// 发送格式:"topic:tag"(tag可选,用于消息过滤)
sendresult sendresult = rocketmqtemplate.syncsend(topic + ":demotag", message);
system.out.println("同步发送成功,消息id:" + sendresult.getmsgid());
return sendresult;
} catch (exception e) {
system.err.println("同步发送失败:" + e.getmessage());
// 业务异常处理(如重试、记录日志、告警)
throw new runtimeexception("消息发送失败", e);
}
}
/**
* 异步发送消息(非阻塞,回调通知结果)
* @param topic 主题
* @param message 消息内容
*/
public void sendasyncmessage(string topic, string message) {
rocketmqtemplate.asyncsend(
topic + ":demotag",
message,
// 发送成功回调
sendresult -> system.out.println("异步发送成功,消息id:" + sendresult.getmsgid()),
// 发送失败回调
throwable -> system.err.println("异步发送失败:" + throwable.getmessage())
);
}
/**
* 单向发送消息(无回调,适用于日志、埋点等不关心结果的场景)
* @param topic 主题
* @param message 消息内容
*/
public void sendonewaymessage(string topic, string message) {
rocketmqtemplate.sendoneway(topic + ":demotag", message);
system.out.println("单向消息发送请求已提交");
}
}2. 发送自定义对象消息
如果需要发送 java 对象(而非字符串),只需保证对象可序列化:
// 自定义消息实体(实现 serializable)
public class ordermessage implements serializable {
private long orderid;
private string orderno;
private bigdecimal amount;
// 省略 getter/setter/tostring
}
// 生产者中新增方法
public sendresult sendobjectmessage(string topic, ordermessage ordermessage) {
return rocketmqtemplate.syncsend(topic + ":ordertag", ordermessage);
}五、消费者实现(2 种消费模式)
1. 普通消费(默认集群模式)
适用于多实例负载均衡消费(同一组消费者分摊消息):
import org.apache.rocketmq.spring.annotation.consumemode;
import org.apache.rocketmq.spring.annotation.messagemodel;
import org.apache.rocketmq.spring.annotation.rocketmqmessagelistener;
import org.apache.rocketmq.spring.core.rocketmqlistener;
import org.springframework.stereotype.component;
/**
* rocketmq 消费者
* - topic:订阅的主题(需与生产者一致)
* - consumergroup:消费者组(必填,同一组消费同一主题)
* - messagemodel:消费模式(clustering 集群模式,broadcasting 广播模式)
* - consumemode:消费方式(concurrently 并发消费,orderly 顺序消费)
*/
@component
@rocketmqmessagelistener(
topic = "demo_topic", // 订阅主题
consumergroup = "demo-consumer-group", // 消费者组
messagemodel = messagemodel.clustering, // 集群模式(默认)
consumemode = consumemode.concurrently // 并发消费(默认)
)
public class rocketmqconsumer implements rocketmqlistener<string> {
/**
* 消息消费逻辑(接收到消息时触发)
* @param message 消息内容(与生产者发送类型一致)
*/
@override
public void onmessage(string message) {
try {
// 核心业务逻辑:如解析消息、处理订单、更新库存等
system.out.println("接收到消息:" + message);
// 消费成功无需返回,抛出异常则会触发重试
} catch (exception e) {
system.err.println("消息消费失败:" + e.getmessage());
// 异常抛出后,rocketmq 会自动重试(默认最多16次)
throw new runtimeexception("消费失败", e);
}
}
}2. 消费自定义对象消息
如果生产者发送的是自定义对象,消费者需指定泛型为对应类型:
@component
@rocketmqmessagelistener(
topic = "demo_topic",
consumergroup = "order-consumer-group"
)
public class ordermessageconsumer implements rocketmqlistener<ordermessage> {
@override
public void onmessage(ordermessage ordermessage) {
system.out.println("接收到订单消息:" + ordermessage);
// 处理订单业务逻辑
}
}六、测试验证
编写测试类,验证生产者发送、消费者接收是否正常:
import org.junit.jupiter.api.test;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.test.context.springboottest;
@springboottest
public class rocketmqdemotest {
@autowired
private rocketmqproducer rocketmqproducer;
@test
public void testsendsyncmessage() {
// 发送消息到 demo_topic 主题
rocketmqproducer.sendsyncmessage("demo_topic", "hello rocketmq + spring boot!");
// 暂停3秒,确保消费者能接收到消息
try {
thread.sleep(3000);
} catch (interruptedexception e) {
thread.currentthread().interrupt();
}
}
}七、关键注意事项
- 主题 / 组命名规范:避免特殊字符,建议用
业务_模块_主题格式(如order_pay_topic)。 - 重试机制:消费失败默认重试 16 次,可通过
maxreconsumetimes配置重试次数。 - 消息持久化:rocketmq 默认持久化消息,即使消费者宕机,重启后仍能消费未处理的消息。
- 顺序消费:如需保证消息顺序,需将
consumemode设为orderly,且生产者发送时指定同一消息队列。 - 异常处理:生产环境建议对接告警(如钉钉、短信),避免消费失败无感知。
总结
- spring boot 集成 rocketmq 的核心是引入官方
rocketmq-spring-boot-starter,配置 nameserver 地址和生产 / 消费组。 - 生产者通过
rocketmqtemplate实现同步 / 异步 / 单向发送,支持字符串和自定义对象消息。 - 消费者通过
@rocketmqmessagelistener注解声明订阅关系,实现rocketmqlistener接口处理消息逻辑,默认集群模式并发消费。
核心关键点:主题与消费组必须配置正确,消费失败抛出异常会触发自动重试,生产环境需做好异常监控和重试次数限制。
到此这篇关于spring rocketmq集成的文章就介绍到这了,更多相关spring rocketmq集成内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论