在分布式系统开发中,消息队列是解耦服务、提升可靠性的关键组件。rabbitmq 作为业界广泛使用的消息中间件,与 spring boot 的整合能显著提升开发效率。本文将手把手教你完成 spring boot 与 rabbitmq 的整合,涵盖基础配置、消息发送与接收,以及高级特性如手动确认和消息确认机制。
1.环境准备
确保已安装 jdk 1.8+、maven 3.0+ 和 rabbitmq 服务。rabbitmq 可通过 docker 快速启动:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
2.创建 spring boot 项目
使用 spring initializr 创建项目,添加以下依赖:
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-amqp</artifactid>
</dependency>3.配置 rabbitmq 连接
在 src/main/resources/application.yml中配置连接参数:
spring:
rabbitmq:
host: localhost # rabbitmq 服务器地址
port: 5672 # 默认端口
username: guest # 默认用户名
password: guest # 默认密码
virtual-host: / # 虚拟主机(默认)
publisher-confirm-type: correlated # 开启消息确认
publisher-returns: true # 开启消息失败回调
listener:
simple:
acknowledge-mode: manual # 手动确认消息
prefetch: 1 # 每次预取的消息数4.声明队列和交换机
创建配置类定义队列、交换机及绑定关系:
@configuration
public class rabbitconfig {
// 直连交换机
public static final string direct_exchange = "direct_exchange";
// 队列
public static final string direct_queue = "direct_queue";
// 路由键
public static final string routing_key = "routing_key";
@bean
public exchange directexchange() {
return exchangebuilder.directexchange(direct_exchange).build();
}
@bean
public queue directqueue() {
return new queue(direct_queue, true); // durable=true 表示持久化
}
@bean
public binding binding(queue queue, exchange exchange) {
return bindingbuilder.bind(queue).to(exchange).with(routing_key);
}
}5.生产者(发送消息)
创建 rabbitproducer.java 注入 rabbittemplate 发送消息:
@service
public class rabbitproducer {
@autowired
private rabbittemplate rabbittemplate;
public void sendmessage(string message) {
rabbittemplate.convertandsend(
rabbitconfig.direct_exchange,
rabbitconfig.routing_key,
message
);
}
}6.消费者(接收消息)
创建 rabbitconsumer.java 监听队列:
@service
public class rabbitconsumer {
@rabbitlistener(queues = rabbitconfig.direct_queue)
public void receivemessage(string message) {
system.out.println("收到消息: " + message);
// 手动确认消息(需配置 acknowledge-mode: manual)
// 实际项目中需处理异常和重试逻辑
}
}7.消息确认机制
生产者确认
在 application.yml 中已配置 publisher-confirm-type: correlated,可通过回调确认消息是否成功到达交换机:
@bean
public rabbittemplate rabbittemplate(connectionfactory connectionfactory) {
rabbittemplate template = new rabbittemplate(connectionfactory);
template.setconfirmcallback((correlationdata, ack, cause) -> {
if (ack) {
system.out.println("消息确认成功");
} else {
system.err.println("消息确认失败: " + cause);
}
});
template.setmandatory(true); // 开启强制回调
return template;
}消费者确认:配置 acknowledge-mode: manual 后,需在消费者中手动确认:
@rabbitlistener(queues = rabbitconfig.direct_queue)
public void receivemessage(string message, channel channel, @header(amqpheaders.delivery_tag) long deliverytag) {
try {
system.out.println("处理消息: " + message);
channel.basicack(deliverytag, false); // 确认消息
} catch (exception e) {
channel.basicnack(deliverytag, false, true); // 拒绝消息并重新入队
}
}
8.死信队列配置
处理无法被消费的消息,创建死信队列并绑定:
@bean
public queue deadletterqueue() {
map<string, object> args = new hashmap<>();
args.put("x-dead-letter-exchange", "dlx_exchange"); // 指定死信交换机
args.put("x-dead-letter-routing-key", "dlx_routing_key");
return new queue("dlx_queue", true, false, false, args);
}
9.测试与验证
确保 rabbitmq 服务已启动(默认端口 15672 为管理界面)。
创建 rabbitmqapplicationtests.java:
@springboottest
@autoconfiguremockmvc
public class rabbitmqapplicationtests {
@autowired
private rabbitproducer producer;
@test
public void testsendmessage() {
producer.sendmessage("hello rabbitmq!");
}
}10.常见问题与解决方案
连接失败:检查 host 和 port 是否正确,确保 rabbitmq 服务已启动。
消息未消费:确认消费者监听的是正确的队列,检查 acknowledge-mode 配置。
性能调优:调整 prefetch-count 和 concurrency 参数优化吞吐量。
11.总结
本文从零开始实现了 spring boot 与 rabbitmq 的整合,涵盖基础配置、消息发送与接收、高级特性(如手动确认和死信队列)。通过合理使用这些特性,可以构建高可靠、高性能的分布式系统。如需进一步学习,可参考 rabbitmq 官方文档。
到此这篇关于spring boot 整合 rabbitmq从入门到实战步骤的文章就介绍到这了,更多相关spring boot 整合 rabbitmq内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论