spring boot 3 集成 rabbitmq 实践指南
1. rabbitmq 核心原理
1.1 什么是rabbitmq
rabbitmq是一个开源的消息代理和队列服务器,使用erlang语言开发,基于amqp(advanced message queuing protocol)协议实现。它支持多种消息传递模式,具有高可用性、可扩展性和可靠性等特点。
1.2 核心概念
1.2.1 基础组件
producer(生产者)
- 消息的发送者
- 负责创建消息并发布到rabbitmq中
consumer(消费者)
- 消息的接收者
- 连接到rabbitmq服务器并订阅队列
exchange(交换机)
- 接收生产者发送的消息并根据路由规则转发到队列
- 类型:
- direct exchange:根据routing key精确匹配
- topic exchange:根据routing key模式匹配
- fanout exchange:广播到所有绑定队列
- headers exchange:根据消息属性匹配
queue(队列)
- 消息存储的地方
- 支持持久化、临时、自动删除等特性
binding(绑定)
- 交换机和队列之间的虚拟连接
- 定义消息路由规则
1.2.2 高级特性
消息持久化
- 交换机持久化:创建时设置durable=true
- 队列持久化:创建时设置durable=true
- 消息持久化:设置delivery-mode=2
消息确认机制
- 生产者确认:publisher confirm和return机制
- 消费者确认:自动确认、手动确认、批量确认
死信队列(dlx)
- 消息被拒绝且不重新入队
- 消息过期(ttl)
- 队列达到最大长度
1.3 应用场景
异步处理
- 发送邮件、短信通知
- 日志处理、报表生成
- 文件处理、图片处理
应用解耦
- 系统间通信
- 服务解耦
- 流程分离
流量控制
- 削峰填谷
- 请求缓冲
- 流量整形
定时任务
- 延迟队列
- 定时处理
- 任务调度
2. 环境搭建
2.1 基础环境
- spring boot: 3.x
- java: 17+
- rabbitmq: 3.12+
- maven/gradle
2.2 依赖配置
<dependencies>
<!-- spring boot starter amqp -->
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-amqp</artifactid>
</dependency>
<!-- spring boot starter web -->
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-web</artifactid>
</dependency>
<!-- lombok -->
<dependency>
<groupid>org.projectlombok</groupid>
<artifactid>lombok</artifactid>
<optional>true</optional>
</dependency>
<!-- jackson -->
<dependency>
<groupid>com.fasterxml.jackson.core</groupid>
<artifactid>jackson-databind</artifactid>
</dependency>
</dependencies>2.3 基础配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
# 消息确认配置
publisher-confirm-type: correlated # 开启发布确认
publisher-returns: true # 开启发布返回
template:
mandatory: true # 消息路由失败返回
# 消费者配置
listener:
simple:
acknowledge-mode: manual # 手动确认
prefetch: 1 # 每次获取消息数量
retry:
enabled: true # 开启重试
initial-interval: 1000 # 重试间隔时间
max-attempts: 3 # 最大重试次数
multiplier: 1.0 # 重试时间乘数
# ssl配置(可选)
ssl:
enabled: false
key-store: classpath:keystore.p12
key-store-password: password
trust-store: classpath:truststore.p12
trust-store-password: password3. 核心配置类
3.1 rabbitmq配置类
@configuration
@enablerabbit
public class rabbitmqconfig {
// 交换机名称
public static final string business_exchange = "business.exchange";
public static final string dead_letter_exchange = "dead.letter.exchange";
// 队列名称
public static final string business_queue = "business.queue";
public static final string dead_letter_queue = "dead.letter.queue";
// 路由键
public static final string business_key = "business.key";
public static final string dead_letter_key = "dead.letter.key";
// 业务交换机
@bean
public directexchange businessexchange() {
return exchangebuilder.directexchange(business_exchange)
.durable(true)
.build();
}
// 死信交换机
@bean
public directexchange deadletterexchange() {
return exchangebuilder.directexchange(dead_letter_exchange)
.durable(true)
.build();
}
// 业务队列
@bean
public queue businessqueue() {
map<string, object> args = new hashmap<>(3);
// 消息过期时间
args.put("x-message-ttl", 60000);
// 队列最大长度
args.put("x-max-length", 1000);
// 死信交换机
args.put("x-dead-letter-exchange", dead_letter_exchange);
args.put("x-dead-letter-routing-key", dead_letter_key);
return queuebuilder.durable(business_queue)
.witharguments(args)
.build();
}
// 死信队列
@bean
public queue deadletterqueue() {
return queuebuilder.durable(dead_letter_queue).build();
}
// 业务绑定
@bean
public binding businessbinding() {
return bindingbuilder.bind(businessqueue())
.to(businessexchange())
.with(business_key);
}
// 死信绑定
@bean
public binding deadletterbinding() {
return bindingbuilder.bind(deadletterqueue())
.to(deadletterexchange())
.with(dead_letter_key);
}
// 消息转换器
@bean
public messageconverter messageconverter() {
return new jackson2jsonmessageconverter();
}
// rabbittemplate配置
@bean
public rabbittemplate rabbittemplate(connectionfactory connectionfactory) {
rabbittemplate rabbittemplate = new rabbittemplate(connectionfactory);
rabbittemplate.setmessageconverter(messageconverter());
return rabbittemplate;
}
}3.2 消息确认配置
@configuration
@slf4j
public class rabbitconfirmconfig implements rabbittemplate.confirmcallback, rabbittemplate.returnscallback {
@autowired
private rabbittemplate rabbittemplate;
@postconstruct
public void init() {
rabbittemplate.setconfirmcallback(this);
rabbittemplate.setreturnscallback(this);
}
@override
public void confirm(correlationdata correlationdata, boolean ack, string cause) {
if (ack) {
log.info("消息发送到交换机成功: correlationdata={}", correlationdata);
} else {
log.error("消息发送到交换机失败: correlationdata={}, cause={}", correlationdata, cause);
// 处理失败逻辑,如重试、告警等
}
}
@override
public void returnedmessage(returnedmessage returned) {
log.error("消息路由到队列失败: exchange={}, routingkey={}, replycode={}, replytext={}, message={}",
returned.getexchange(),
returned.getroutingkey(),
returned.getreplycode(),
returned.getreplytext(),
new string(returned.getmessage().getbody()));
// 处理失败逻辑,如重试、告警等
}
}4. 消息生产者
4.1 消息发送服务
@service
@slf4j
public class messageproducer {
@autowired
private rabbittemplate rabbittemplate;
public void sendmessage(object message, string exchange, string routingkey) {
correlationdata correlationdata = new correlationdata(uuid.randomuuid().tostring());
try {
rabbittemplate.convertandsend(exchange, routingkey, message, correlationdata);
log.info("消息发送成功: message={}, exchange={}, routingkey={}, correlationdata={}",
message, exchange, routingkey, correlationdata);
} catch (exception e) {
log.error("消息发送异常: message={}, exchange={}, routingkey={}, correlationdata={}, error={}",
message, exchange, routingkey, correlationdata, e.getmessage());
throw new runtimeexception("消息发送失败", e);
}
}
public void senddelaymessage(object message, string exchange, string routingkey, long delaymillis) {
correlationdata correlationdata = new correlationdata(uuid.randomuuid().tostring());
messagepostprocessor messagepostprocessor = msg -> {
msg.getmessageproperties().setdelay((int) delaymillis);
return msg;
};
try {
rabbittemplate.convertandsend(exchange, routingkey, message, messagepostprocessor, correlationdata);
log.info("延迟消息发送成功: message={}, exchange={}, routingkey={}, delay={}, correlationdata={}",
message, exchange, routingkey, delaymillis, correlationdata);
} catch (exception e) {
log.error("延迟消息发送异常: message={}, exchange={}, routingkey={}, delay={}, correlationdata={}, error={}",
message, exchange, routingkey, delaymillis, correlationdata, e.getmessage());
throw new runtimeexception("延迟消息发送失败", e);
}
}
}5. 消息消费者
5.1 消息处理服务
@service
@slf4j
public class messageconsumer {
@rabbitlistener(queues = rabbitmqconfig.business_queue)
public void handlemessage(message message, channel channel) throws ioexception {
long deliverytag = message.getmessageproperties().getdeliverytag();
try {
// 获取消息内容
string messagebody = new string(message.getbody());
log.info("收到消息: message={}, deliverytag={}", messagebody, deliverytag);
// 业务处理
processmessage(messagebody);
// 手动确认消息
channel.basicack(deliverytag, false);
log.info("消息处理成功: deliverytag={}", deliverytag);
} catch (exception e) {
log.error("消息处理异常: deliverytag={}, error={}", deliverytag, e.getmessage());
// 判断是否重新投递
if (message.getmessageproperties().getredelivered()) {
log.error("消息已重试,拒绝消息: deliverytag={}", deliverytag);
channel.basicreject(deliverytag, false);
} else {
log.info("消息首次处理失败,重新投递: deliverytag={}", deliverytag);
channel.basicnack(deliverytag, false, true);
}
}
}
private void processmessage(string message) {
// 实现具体的业务逻辑
log.info("处理消息: {}", message);
}
}5.2 死信消息处理
@service
@slf4j
public class deadletterconsumer {
@rabbitlistener(queues = rabbitmqconfig.dead_letter_queue)
public void handledeadletter(message message, channel channel) throws ioexception {
long deliverytag = message.getmessageproperties().getdeliverytag();
try {
string messagebody = new string(message.getbody());
log.info("收到死信消息: message={}, deliverytag={}", messagebody, deliverytag);
// 死信消息处理逻辑
processdeadletter(messagebody);
channel.basicack(deliverytag, false);
log.info("死信消息处理成功: deliverytag={}", deliverytag);
} catch (exception e) {
log.error("死信消息处理异常: deliverytag={}, error={}", deliverytag, e.getmessage());
channel.basicreject(deliverytag, false);
}
}
private void processdeadletter(string message) {
// 实现死信消息处理逻辑
log.info("处理死信消息: {}", message);
}
}6. 接口控制器
@restcontroller
@requestmapping("/api/mq")
@slf4j
public class messagecontroller {
@autowired
private messageproducer messageproducer;
@postmapping("/send")
public responseentity<string> sendmessage(@requestbody messagedto message) {
try {
messageproducer.sendmessage(message.getcontent(),
rabbitmqconfig.business_exchange,
rabbitmqconfig.business_key);
return responseentity.ok("消息发送成功");
} catch (exception e) {
log.error("消息发送失败: {}", e.getmessage());
return responseentity.status(httpstatus.internal_server_error)
.body("消息发送失败: " + e.getmessage());
}
}
@postmapping("/send/delay")
public responseentity<string> senddelaymessage(
@requestbody messagedto message,
@requestparam long delaymillis) {
try {
messageproducer.senddelaymessage(message.getcontent(),
rabbitmqconfig.business_exchange,
rabbitmqconfig.business_key,
delaymillis);
return responseentity.ok("延迟消息发送成功");
} catch (exception e) {
log.error("延迟消息发送失败: {}", e.getmessage());
return responseentity.status(httpstatus.internal_server_error)
.body("延迟消息发送失败: " + e.getmessage());
}
}
}7. 监控与运维
7.1 rabbitmq管理界面
- 访问地址:http://localhost:15672
- 默认账号:guest/guest
- 主要功能:
- 队列监控
- 交换机管理
- 连接状态
- 消息追踪
7.2 prometheus + grafana监控
# prometheus.yml
scrape_configs:
- job_name: 'rabbitmq'
static_configs:
- targets: ['localhost:15692']7.3 日志配置
logging:
level:
org.springframework.amqp: info
com.your.package: debug
pattern:
console: "%d{yyyy-mm-dd hh:mm:ss} [%thread] %-5level %logger{36} - %msg%n"7.4 告警配置
@configuration
public class rabbitmqalertconfig {
@value("${alert.dingtalk.webhook}")
private string webhookurl;
@bean
public alertservice alertservice() {
return new dingtalkalertservice(webhookurl);
}
}8. 最佳实践
8.1 消息幂等性处理
@service
public class messageidempotenthandler {
@autowired
private redistemplate<string, string> redistemplate;
public boolean isprocessed(string messageid) {
string key = "mq:processed:" + messageid;
return boolean.true.equals(redistemplate.opsforvalue().setifabsent(key, "1", 24, timeunit.hours));
}
}8.2 消息重试策略
@configuration
public class retryconfig {
@bean
public retrytemplate retrytemplate() {
retrytemplate retrytemplate = new retrytemplate();
fixedbackoffpolicy backoffpolicy = new fixedbackoffpolicy();
backoffpolicy.setbackoffperiod(1000);
retrytemplate.setbackoffpolicy(backoffpolicy);
simpleretrypolicy retrypolicy = new simpleretrypolicy();
retrypolicy.setmaxattempts(3);
retrytemplate.setretrypolicy(retrypolicy);
return retrytemplate;
}
}8.3 消息序列化
@configuration
public class messageconverterconfig {
@bean
public messageconverter jsonmessageconverter() {
jackson2jsonmessageconverter converter = new jackson2jsonmessageconverter();
converter.setcreatemessageids(true);
return converter;
}
}8.4 消息追踪
@aspect
@component
@slf4j
public class messagetraceaspect {
@around("@annotation(org.springframework.amqp.rabbit.annotation.rabbitlistener)")
public object tracemessage(proceedingjoinpoint joinpoint) throws throwable {
string messageid = mdc.get("messageid");
log.info("开始处理消息: messageid={}", messageid);
try {
object result = joinpoint.proceed();
log.info("消息处理完成: messageid={}", messageid);
return result;
} catch (exception e) {
log.error("消息处理异常: messageid={}, error={}", messageid, e.getmessage());
throw e;
}
}
}9. 常见问题与解决方案
9.1 消息丢失问题
- 生产者确认机制
- 消息持久化
- 手动确认模式
- 集群高可用
9.2 消息重复消费
- 幂等性处理
- 消息去重
- 业务检查
9.3 消息堆积问题
- 增加消费者数量
- 提高处理效率
- 队列分片
- 死信队列处理
9.4 性能优化
- 合理设置预取数量
- 批量确认消息
- 消息压缩
- 连接池优化
10. 高可用部署
10.1 集群配置
spring:
rabbitmq:
addresses: rabbit1:5672,rabbit2:5672,rabbit3:5672
username: admin
password: password
virtual-host: /10.2 镜像队列
# 设置镜像策略
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'10.3 负载均衡
# nginx.conf
upstream rabbitmq_cluster {
server rabbit1:15672;
server rabbit2:15672;
server rabbit3:15672;
}11. 参考资源
到此这篇关于spring boot 3 集成 rabbitmq 实践指南的文章就介绍到这了,更多相关spring boot 3 集成 rabbitmq内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论