消息队列(mq)作为分布式系统的核心组件,核心价值是「异步通信、系统解耦、流量削峰」—— 通过消息中间件实现服务间的异步交互,避免服务直接调用导致的耦合,同时缓冲高并发流量(如秒杀、订单峰值),保障系统稳定性。主流消息队列中,rabbitmq 适合复杂路由、低延迟场景,kafka 适合高吞吐、大数据场景。
本文聚焦 springboot 集成 rabbitmq 与 kafka 的完整实战,嵌入可直接复用的代码教学,覆盖生产者、消费者、消息路由、可靠性保障等核心能力,帮你快速落地异步通信场景,解决服务耦合、流量削峰等问题。
一、核心认知:消息队列的核心价值与选型
1. 核心价值
- 系统解耦:服务间通过消息通信,无需感知对方存在,修改一个服务不影响其他服务;
- 异步通信:无需同步等待服务响应,发送消息后立即返回,提升接口响应速度;
- 流量削峰:高并发场景下,消息队列缓冲请求,消费者按能力消费,避免下游服务被压垮;
- 可靠投递:通过持久化、确认机制,确保消息不丢失、不重复消费。
2. 选型对比(rabbitmq vs kafka)
| 特性 | rabbitmq | kafka |
|---|---|---|
| 吞吐量 | 中低吞吐 | 高吞吐(百万级 / 秒) |
| 延迟 | 低延迟(毫秒级) | 中延迟(毫秒级) |
| 路由能力 | 支持复杂路由(交换机) | 简单路由(主题分区) |
| 可靠性 | 强可靠性(确认机制完善) | 可靠性可配置 |
| 适用场景 | 订单通知、日志告警 | 大数据采集、秒杀削峰 |
二、核心实战一:springboot 集成 rabbitmq(完整代码教学)
rabbitmq 基于 amqp 协议,核心是「交换机 + 队列 + 绑定」的路由模型,支持 direct、topic、fanout 等多种交换机类型,适配复杂路由场景。
1. 环境准备
(1)安装 rabbitmq
本地部署:docker 命令快速启动(推荐)
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
- 5672:消息通信端口;15672:管理界面端口(访问 http://localhost:15672,默认账号 guest/guest)。
(2)引入依赖(maven)
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-amqp</artifactid>
</dependency>(3)配置文件(application.yml)
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: / # 虚拟主机(默认/)
publisher-confirm-type: correlated # 开启生产者确认机制
publisher-returns: true # 开启消息回退机制
listener:
simple:
acknowledge-mode: manual # 消费者手动确认消息
concurrency: 2 # 消费者核心线程数
max-concurrency: 5 # 消费者最大线程数2. 核心代码实现
(1)配置类:声明交换机、队列、绑定关系
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
@configuration
public class rabbitmqconfig {
// 交换机名称(topic交换机,支持模糊路由,最常用)
public static final string topic_exchange = "order_exchange";
// 队列名称(订单通知队列)
public static final string order_queue = "order_queue";
// 路由键(匹配规则:order.* 匹配 order.create、order.cancel 等)
public static final string routing_key = "order.#";
// 1. 声明topic交换机
@bean
public topicexchange topicexchange() {
// durable=true:交换机持久化,重启rabbitmq不丢失
return exchangebuilder.topicexchange(topic_exchange).durable(true).build();
}
// 2. 声明队列
@bean
public queue orderqueue() {
// durable=true:队列持久化;exclusive=false:不排他;autodelete=false:不自动删除
return queuebuilder.durable(order_queue).build();
}
// 3. 绑定交换机与队列(指定路由键)
@bean
public binding bindingexchangequeue(topicexchange topicexchange, queue orderqueue) {
return bindingbuilder.bind(orderqueue).to(topicexchange).with(routing_key);
}
}(2)生产者:发送消息(含确认机制,确保消息投递成功)
import org.springframework.amqp.rabbit.connection.correlationdata;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.stereotype.component;
import javax.annotation.resource;
import java.util.uuid;
@component
public class orderproducer {
@resource
private rabbittemplate rabbittemplate;
// 发送订单创建消息
public void sendordercreatemsg(long orderid, string userid) {
// 1. 构建消息内容
string msg = string.format("用户%s创建订单:%s", userid, orderid);
// 2. 消息id(用于确认机制,追踪消息)
correlationdata correlationdata = new correlationdata(uuid.randomuuid().tostring());
// 3. 发送消息(交换机、路由键、消息内容、消息id)
rabbittemplate.convertandsend(
rabbitmqconfig.topic_exchange,
"order.create", // 具体路由键(匹配 order.#)
msg,
correlationdata
);
}
// 4. 生产者确认回调(确认消息是否到达交换机)
@resource
public void setrabbittemplate(rabbittemplate rabbittemplate) {
// 消息到达交换机回调
rabbittemplate.setconfirmcallback((correlationdata, ack, cause) -> {
if (ack) {
system.out.println("消息到达交换机,消息id:" + correlationdata.getid());
} else {
system.out.println("消息未到达交换机,原因:" + cause);
// 消息投递失败,可重试或记录日志
}
});
// 消息无法路由到队列回调(回退机制)
rabbittemplate.setreturnscallback(returned -> {
system.out.println("消息无法路由,路由键:" + returned.getroutingkey() + ",原因:" + returned.getreplytext());
});
}
}(3)消费者:接收消息(手动确认,确保消息消费成功)
import com.rabbitmq.client.channel;
import org.springframework.amqp.core.message;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;
import java.io.ioexception;
@component
public class orderconsumer {
// 监听订单队列
@rabbitlistener(queues = rabbitmqconfig.order_queue)
public void consumeordermsg(string msg, channel channel, message message) throws ioexception {
try {
// 1. 处理业务逻辑(如更新订单状态、发送短信通知)
system.out.println("接收订单消息:" + msg);
// 2. 手动确认消息(multiple=false:只确认当前消息;true:确认所有未确认消息)
channel.basicack(message.getmessageproperties().getdeliverytag(), false);
} catch (exception e) {
// 3. 消息消费失败,拒绝消息并重回队列(或死信队列)
// requeue=true:重回队列;false:不重回队列(需配置死信队列处理)
channel.basicnack(message.getmessageproperties().getdeliverytag(), false, true);
system.out.println("消息消费失败,已重回队列:" + msg);
}
}
}3. 测试代码(controller 层)
import org.springframework.web.bind.annotation.getmapping;
import org.springframework.web.bind.annotation.pathvariable;
import org.springframework.web.bind.annotation.restcontroller;
import javax.annotation.resource;
@restcontroller
public class ordercontroller {
@resource
private orderproducer orderproducer;
@getmapping("/order/create/{orderid}/{userid}")
public string createorder(@pathvariable long orderid, @pathvariable string userid) {
orderproducer.sendordercreatemsg(orderid, userid);
return "订单创建消息已发送";
}
}三、核心实战二:springboot 集成 kafka(完整代码教学)
kafka 基于发布 / 订阅模型,核心是「主题(topic)+ 分区(partition)+ 消费者组(consumer group)」,高吞吐特性适合大数据场景。
1. 环境准备
(1)安装 kafka
docker 启动单节点 kafka(简化版,生产环境需集群):
# 启动zookeeper(kafka依赖zookeeper管理元数据) docker run -d --name zookeeper -p 2181:2181 confluentinc/cp-zookeeper:latest # 启动kafka docker run -d --name kafka -p 9092:9092 \ -e kafka_zookeeper_connect=localhost:2181 \ -e kafka_advertised_listeners=plaintext://localhost:9092 \ confluentinc/cp-kafka:latest
(2)引入依赖(maven)
<dependency>
<groupid>org.springframework.kafka</groupid>
<artifactid>spring-boot-starter-kafka</artifactid>
</dependency>(3)配置文件(application.yml)
spring:
kafka:
bootstrap-servers: localhost:9092 # kafka服务地址
# 生产者配置
producer:
key-serializer: org.apache.kafka.common.serialization.stringserializer
value-serializer: org.apache.kafka.common.serialization.stringserializer
acks: 1 # 消息确认机制(1:领导者分区确认;all:所有副本确认)
retries: 3 # 消息发送失败重试次数
# 消费者配置
consumer:
key-deserializer: org.apache.kafka.common.serialization.stringdeserializer
value-deserializer: org.apache.kafka.common.serialization.stringdeserializer
group-id: order-group # 消费者组id(同一组内消费者负载均衡消费)
auto-offset-reset: earliest # 无偏移量时,从最早消息开始消费
enable-auto-commit: false # 关闭自动提交偏移量,手动提交
# 监听配置
listener:
ack-mode: manual_immediate # 手动提交偏移量2. 核心代码实现
(1)生产者:发送消息(异步发送,支持回调)
import org.springframework.kafka.core.kafkatemplate;
import org.springframework.kafka.support.sendresult;
import org.springframework.stereotype.component;
import org.springframework.util.concurrent.listenablefuture;
import org.springframework.util.concurrent.listenablefuturecallback;
import javax.annotation.resource;
@component
public class kafkaorderproducer {
// 主题名称(kafka无需提前声明,发送消息时自动创建)
public static final string order_topic = "order_topic";
@resource
private kafkatemplate<string, string> kafkatemplate;
// 异步发送订单消息
public void sendordermsg(long orderid, string userid) {
string msg = string.format("用户%s创建订单:%s", userid, orderid);
// 发送消息(主题、消息键、消息内容)
listenablefuture<sendresult<string, string>> future = kafkatemplate.send(order_topic, orderid.tostring(), msg);
// 回调函数:处理发送结果
future.addcallback(new listenablefuturecallback<sendresult<string, string>>() {
@override
public void onsuccess(sendresult<string, string> result) {
system.out.println("消息发送成功,分区:" + result.getrecordmetadata().partition());
}
@override
public void onfailure(throwable ex) {
system.out.println("消息发送失败,原因:" + ex.getmessage());
// 失败重试逻辑
}
});
}
}(2)消费者:接收消息(手动提交偏移量)
import org.apache.kafka.clients.consumer.consumerrecord;
import org.springframework.kafka.annotation.kafkalistener;
import org.springframework.kafka.support.acknowledgment;
import org.springframework.stereotype.component;
@component
public class kafkaorderconsumer {
// 监听订单主题,手动提交偏移量
@kafkalistener(topics = kafkaorderproducer.order_topic, groupid = "order-group")
public void consumeordermsg(consumerrecord<string, string> record, acknowledgment acknowledgment) {
try {
// 1. 获取消息内容
string key = record.key();
string msg = record.value();
system.out.println("接收kafka消息,订单id:" + key + ",内容:" + msg);
// 2. 处理业务逻辑
// 3. 手动提交偏移量(确认消息消费成功)
acknowledgment.acknowledge();
} catch (exception e) {
system.out.println("消息消费失败:" + e.getmessage());
// 消费失败可记录日志,人工介入处理
}
}
}3. 测试代码(复用上面的 ordercontroller,注入 kafkaorderproducer 即可)
@resource
private kafkaorderproducer kafkaorderproducer;
@getmapping("/kafka/order/create/{orderid}/{userid}")
public string createorderbykafka(@pathvariable long orderid, @pathvariable string userid) {
kafkaorderproducer.sendordermsg(orderid, userid);
return "kafka订单消息已发送";
}四、消息可靠性保障(企业级实战必备)
1. 消息不丢失
- 生产者:开启确认机制(rabbitmq 确认 + 回退,kafka acks=all + 重试);
- 中间件:消息持久化(rabbitmq 交换机 / 队列持久化,kafka 主题分区持久化);
- 消费者:手动确认消息,避免自动确认导致消费失败后消息丢失。
2. 消息不重复消费
- 核心方案:基于业务唯一标识(如订单 id)做幂等性处理,消费前先检查消息是否已处理;
- 示例代码(消费端幂等处理):
// 基于redis实现幂等(消费前检查消息是否已处理)
public void consumewithidempotent(string msgid, string msg) {
string key = "msg:processed:" + msgid;
boolean isprocessed = stringredistemplate.opsforvalue().setifabsent(key, "1", 24, timeunit.hours);
if (boolean.true.equals(isprocessed)) {
// 未处理,执行业务逻辑
system.out.println("处理消息:" + msg);
} else {
// 已处理,直接跳过
system.out.println("消息已重复消费,跳过:" + msg);
}
}五、避坑指南
坑点 1:rabbitmq 消息堆积,消费者消费缓慢
表现:队列消息堆积过多,下游服务处理不及时;✅ 解决方案:增加消费者线程数(调整 concurrency/max-concurrency),拆分队列,避免单个队列承载过多消息。
坑点 2:kafka 消费者组配置错误,导致重复消费
表现:同一消费者组内多个消费者消费同一消息;✅ 解决方案:确保同一主题的消费者在同一消费者组,且分区数≥消费者数(负载均衡的前提)。
坑点 3:消息确认机制未配置,导致消息丢失
表现:生产者发送消息后,中间件宕机,消息丢失;✅ 解决方案:生产环境必须开启生产者确认机制和消息持久化,缺一不可。
六、终极总结:消息队列实战的核心是「异步解耦 + 可靠传递」
消息队列的本质是「中介者」,通过它实现服务间的间接通信,核心价值不在于「发送消息」,而在于「安全、高效地传递消息」,同时解耦系统、缓冲流量。
核心原则总结:
- 选型按需:复杂路由选 rabbitmq,高吞吐大数据选 kafka,不盲目跟风;
- 可靠性优先:生产环境必须保障消息不丢失、不重复消费,幂等性是底线;
- 性能可控:合理配置消费者线程、队列 / 分区数量,避免消息堆积或资源浪费。
记住:消息队列不是「万能的」,过度依赖会增加系统复杂度,仅在需要异步、解耦、削峰的场景使用,才能最大化其价值。
到此这篇关于springboot 集成消息队列实战(rabbitmq/kafka):异步通信与解耦,落地高可靠消息传递的文章就介绍到这了,更多相关springboot 集成消息队列内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论