一、为什么选择 redis 做消息队列?
1.1 redis 消息队列的核心优势
轻量级部署:无需单独部署 rabbitmq、kafka 等消息队列服务,可以直接复用现有 redis 集群。例如一个电商系统可能已经使用 redis 做缓存,现在只需增加消息队列功能,无需额外维护其他中间件,显著降低运维成本;
高性能:基于内存操作,单节点 qps 可达 10 万级,满足高吞吐场景。实测表明,在标准服务器配置下,redis 处理简单消息的延迟可低至 0.1ms,远优于传统磁盘存储的消息队列;
api 简洁:依托 redis 原生命令即可实现完整队列功能:
- lpush/rpush 用于生产者推送消息
- blpop/brpop 实现消费者阻塞式拉取
- publish/subscribe 支持发布订阅模式
- xadd/xread 提供 stream 类型支持 开发人员无需学习复杂的新 api,显著降低开发成本;
支持多语言:所有主流语言的 redis 客户端(java/jedis、python/redis-py、go/redigo 等)均原生支持消息队列相关命令。例如 java 开发者可以直接使用 jedis 的 lpush() 方法发送消息,无需额外依赖;
可扩展性:通过 redis cluster 可以轻松实现消息队列的横向扩展。例如可以将不同业务的消息分配到不同分片,同时利用 redis sentinel 实现高可用,确保消息服务不间断。
1.2 适用场景与不适用场景
适用场景
- 轻量级异步通信:如电商系统中的订单状态变更通知、app 的日志上报等。例如用户下单后,系统可以通过 redis 队列异步通知库存系统扣减库存,而不影响主流程响应速度;
- 高吞吐但允许少量重复的场景:如用户行为数据同步、监控数据采集等。例如一个短视频平台需要将用户的观看记录同步到推荐系统,即使偶尔出现重复消息也不影响业务逻辑;
- 中小型系统的解耦需求:当系统规模尚未达到需要引入 kafka 等重量级组件时。例如一个初创公司的支付系统与通知系统之间使用 redis 队列解耦,避免系统间直接依赖。
不适用场景
- 金融级事务消息:如银行转账、证券交易等需要强一致性和零丢失的场景。redis 的持久化机制(rdb/aof)无法保证 100% 不丢失消息,且缺乏事务消息的回查机制;
- 复杂路由需求:如需要死信队列、优先级队列、延迟队列等高级特性时。虽然 redis 可以通过 sorted set 实现简单延迟队列,但相比 rabbitmq 的专业实现功能有限;
- 海量消息存储:如需要保存数月历史消息的聊天系统。redis 作为内存数据库,存储容量受服务器内存限制,且长期存储成本过高。例如一个日均百万消息的客服系统,使用 redis 存储一周消息就可能需要上百 gb 内存。
二、redis 实现消息队列的 3 种核心方案
方案一、基于 redis list 的简单消息队列实现
1. 方案概述
redis 的 list 数据结构是一个双向链表,具有以下特性使其非常适合实现消息队列:
- 支持从两端(o(1)时间复杂度)插入和删除元素
- 天然支持"生产者-消费者"模型
- 提供阻塞式获取消息的命令
- 内存存储,性能极高(每秒可处理数万次操作)
1.1 核心命令详解
| 角色 | 核心命令 | 作用说明 | 时间复杂度 |
|---|---|---|---|
| 生产者 | lpush key value1 value2 | 从 list 左侧插入消息(头部插入),支持批量插入,返回插入后 list 的长度 | o(1) |
| 生产者 | rpush key value1 value2 | 从 list 右侧插入消息(尾部插入),支持批量插入 | o(1) |
| 消费者 | blpop key timeout | 从 list 左侧阻塞获取消息(头部取出),若 list 为空则等待timeout秒 | o(1) |
| 消费者 | brpop key timeout | 从 list 右侧阻塞获取消息(尾部取出),若 list 为空则等待timeout秒 | o(1) |
| 监控 | llen key | 获取当前队列的消息数量 | o(1) |
| 监控 | lrange key start end | 查看队列中从start到end的消息(如lrange queue 0 9查看前10条) | o(s+n) |
2. 代码实战(java + jedis)
2.1 环境准备
首先引入 jedis 依赖(maven):
<dependency>
<groupid>redis.clients</groupid>
<artifactid>jedis</artifactid>
<version>4.4.3</version> <!-- 建议使用最新稳定版 -->
</dependency>2.2 生产者实现
import redis.clients.jedis.jedis;
import redis.clients.jedis.jedispool;
import redis.clients.jedis.jedispoolconfig;
public class listmqproducer {
// 队列key命名规范:业务域:组件类型:数据结构:具体业务
private static final string queue_key = "redis:mq:list:order";
// 使用连接池提高性能
private static final jedispool jedispool = new jedispool(
new jedispoolconfig(),
"localhost",
6379,
2000, // 连接超时时间
null // 密码
);
public static void main(string[] args) throws interruptedexception {
try (jedis jedis = jedispool.getresource()) {
// 模拟发送10条订单消息
for (int i = 1; i <= 10; i++) {
// 消息内容格式:业务标识_序号_时间戳
string message = string.format("order_%d_%d", i, system.currenttimemillis());
// lpush命令将消息放入队列头部
long queuelength = jedis.lpush(queue_key, message);
system.out.printf("生产者发送消息:%s,当前队列长度:%d%n", message, queuelength);
// 模拟业务处理间隔
thread.sleep(500);
}
} catch (exception e) {
system.err.println("生产者异常:" + e.getmessage());
} finally {
jedispool.close();
}
}
}2.3 消费者实现
import redis.clients.jedis.jedis;
import redis.clients.jedis.jedispool;
import redis.clients.jedis.jedispoolconfig;
import java.util.list;
public class listmqconsumer {
private static final string queue_key = "redis:mq:list:order";
private static final jedispool jedispool = new jedispool(
new jedispoolconfig(),
"localhost",
6379
);
public static void main(string[] args) {
system.out.println("消费者启动,等待接收消息...");
while (true) {
try (jedis jedis = jedispool.getresource()) {
// brpop命令参数:
// 1. 超时时间3秒(避免空轮询消耗cpu)
// 2. 可以监听多个队列
list<string> messages = jedis.brpop(3, queue_key);
if (messages != null) {
// brpop返回结果格式:
// 第一个元素是队列key
// 第二个元素是消息内容
string message = messages.get(1);
system.out.println("消费者接收消息:" + message);
// 业务处理逻辑示例
processmessage(message);
} else {
system.out.println("队列暂无消息,继续等待...");
}
} catch (exception e) {
system.err.println("消费者处理消息异常:" + e.getmessage());
// 异常处理策略:
// 1. 记录错误日志
// 2. 重试机制
// 3. 告警通知
try {
thread.sleep(5000); // 出错后暂停5秒
} catch (interruptedexception ie) {
thread.currentthread().interrupt();
}
}
}
}
private static void processmessage(string message) throws interruptedexception {
// 模拟业务处理
system.out.println("处理消息:" + message);
// 解析消息内容
string[] parts = message.split("_");
string orderid = parts[1];
// 模拟业务处理耗时
thread.sleep(1000);
system.out.println("订单" + orderid + "处理完成");
}
}3. 方案优化与问题解决
3.1 标准方案的局限性
- 消息丢失风险:
- 消费者获取消息后,如果处理过程中崩溃,消息将永久丢失
- 无消息确认机制
- 功能限制:
- 不支持广播模式(多个消费者同时消费同一条消息)
- 无优先级队列
- 无延迟队列功能
- 监控缺失:
- 缺乏消息处理状态跟踪
- 无死信队列处理机制
3.2 消息可靠性优化方案
3.2.1 消息确认机制实现
private static final string confirm_queue_key = "redis:mq:list:order:confirm";
private static final string dead_queue_key = "redis:mq:list:order:dead";
private static final int max_retry = 3;
// 优化后的消费者处理逻辑
list<string> messages = jedis.brpop(3, queue_key);
if (messages != null) {
string message = messages.get(1);
// 1. 将消息移到待确认队列(使用rpush保持顺序)
jedis.rpush(confirm_queue_key, message);
try {
// 2. 处理业务逻辑
processmessage(message);
// 3. 处理成功,从待确认队列删除
jedis.lrem(confirm_queue_key, 1, message);
} catch (exception e) {
system.err.println("处理消息失败:" + message);
// 4. 检查重试次数
long retrycount = jedis.incr("retry:" + message);
if (retrycount <= max_retry) {
// 放回主队列重试
jedis.lpush(queue_key, message);
} else {
// 超过重试次数,放入死信队列
jedis.rpush(dead_queue_key, message);
}
// 无论重试还是加入死信队列,都要从待确认队列删除
jedis.lrem(confirm_queue_key, 1, message);
}
}3.2.2 定时补偿任务
// 定时检查待确认队列(每分钟执行)
public void checkconfirmqueue() {
try (jedis jedis = jedispool.getresource()) {
// 获取待确认队列所有消息
list<string> pendingmessages = jedis.lrange(confirm_queue_key, 0, -1);
for (string message : pendingmessages) {
// 检查消息滞留时间
long createtime = long.parselong(message.split("_")[2]);
long currenttime = system.currenttimemillis();
long delay = currenttime - createtime;
// 超过30秒未处理则重试
if (delay > 30000) {
jedis.lrem(confirm_queue_key, 1, message);
jedis.lpush(queue_key, message);
system.out.println("消息超时重试:" + message);
}
}
}
}3.3 性能优化策略
- 横向扩展:
- 增加消费者实例数量,利用 list 的 brpop 命令天然支持多消费者竞争
- 可采用消费者组模式,每个组独立消费
- 批量处理:
// 生产者批量发送 jedis.lpush(queue_key, "msg1", "msg2", "msg3"); // 消费者批量获取(非阻塞) list<string> batch = jedis.rpop(queue_key, 10); // 获取最多10条
管道(pipeline)优化:
try (pipeline p = jedis.pipelined()) {
p.lpush(queue_key, "msg1");
p.lpush(queue_key, "msg2");
p.sync(); // 批量提交
}监控指标:
队列长度监控:llen key
消费者积压:比较生产和消费速率
异常告警:死信队列增长监控
4. 适用场景分析
4.1 推荐使用场景
- 异步任务处理:
- 订单创建后的后续处理(如发送通知、更新库存)
- 日志收集和分析
- 削峰填谷:
- 秒杀系统请求缓冲
- 突发流量处理
- 系统解耦:
- 微服务间通信
- 事件驱动架构
4.2 不适用场景
- 严格顺序要求:list虽然有序,但在多消费者场景下不能保证全局顺序
- 广播模式需求:需要所有消费者收到相同消息
- 持久化要求高:redis是内存数据库,虽然支持持久化但不保证100%可靠
- 复杂路由需求:需要根据消息内容路由到不同队列
5. 生产环境建议
- redis配置:
- 启用aof持久化:
appendonly yes - 合理设置内存淘汰策略:
maxmemory-policy volatile-lru - 设置合理超时:
timeout 300(秒)
- 启用aof持久化:
- 高可用:
- 使用redis sentinel或cluster
- 客户端实现故障转移
- 监控指标:
# 监控队列长度 redis-cli llen redis:mq:list:order # 监控redis内存 redis-cli info memory
- 命名规范:
- 业务域:组件类型:数据结构:具体业务
示例:payment:mq:list:refund
方案二、基于 pub/sub 的广播式消息队列方案详解
redis pub/sub 模型介绍
redis 的 pub/sub(发布 - 订阅)模型是一种高效的"一对多"消息通信机制,它允许生产者将消息发布到特定的频道(channel),而所有订阅该频道的消费者都能即时接收到这些消息。这种模式特别适合需要实时广播的场景,如新闻推送、实时聊天系统等。
核心命令及功能详解
| 角色 | 核心命令 | 作用说明 |
|---|---|---|
| 生产者 | publish channel message | 向指定频道发布消息,返回接收消息的消费者数量 |
| 消费者 | subscribe channel1 channel2 | 订阅一个或多个频道,阻塞等待消息(订阅状态下只能接收消息,无法执行其他命令) |
| 消费者 | psubscribe pattern | 使用模式匹配订阅频道(如psubscribe redis:mq:pubsub:*订阅所有匹配前缀的频道) |
2.1 代码实战(java + jedis)
生产者实现(发布消息)
import redis.clients.jedis.jedis;
public class pubsubproducer {
// 定义频道名称,采用命名空间方式避免冲突
private static final string channel_key = "redis:mq:pubsub:news";
// 创建redis连接实例
private static final jedis jedis = new jedis("localhost", 6379);
public static void main(string[] args) throws interruptedexception {
// 模拟发布3条新闻消息,实际应用中可接入实时数据源
string[] news = {
"redis 7.2版本发布,新增stream增强功能",
"基于redis的消息队列在电商场景的实践",
"redis cluster集群部署最佳实践"
};
// 循环发布消息
for (string msg : news) {
// 发布消息并获取接收者数量
long receivercount = jedis.publish(channel_key, msg);
system.out.println(string.format(
"【生产者】发布消息:%s,当前订阅者数量:%d",
msg, receivercount));
// 模拟消息间隔
thread.sleep(1000);
}
// 关闭连接
jedis.close();
}
}
消费者实现(订阅消息)
import redis.clients.jedis.jedis;
import redis.clients.jedis.jedispubsub;
public class pubsubconsumer {
private static final string channel_key = "redis:mq:pubsub:news";
private static final jedis jedis = new jedis("localhost", 6379);
public static void main(string[] args) {
// 创建自定义的消息处理器
jedispubsub pubsub = new jedispubsub() {
// 接收到消息时的回调方法
@override
public void onmessage(string channel, string message) {
system.out.println(string.format(
"【消费者1】接收到新消息(频道:%s):%s",
channel, message));
// 此处可添加业务处理逻辑
// 例如:解析消息内容、写入数据库、触发其他操作等
}
// 成功订阅频道时的回调
@override
public void onsubscribe(string channel, int subscribedchannels) {
system.out.println(string.format(
"【消费者1】成功订阅频道:%s,当前订阅总数:%d",
channel, subscribedchannels));
}
// 可添加其他回调方法如onunsubscribe、onpsubscribe等
};
system.out.println("【消费者1】启动并开始监听...");
// 开始订阅(该方法会阻塞当前线程)
jedis.subscribe(pubsub, channel_key);
// 注意:在实际应用中,通常会将订阅逻辑放在独立线程中
// 以避免阻塞主线程
}
}
2.2 方案深度分析与应用场景
优点详解
- 实时广播能力:天然支持一对多的消息分发,一条消息可以同时被多个消费者接收
- 实现简单:无需额外中间件,使用redis原生命令即可实现
- 低延迟:消息发布后立即推送给所有订阅者,延迟通常在毫秒级
- 动态扩展:消费者可以随时加入或退出订阅,系统自动处理连接管理
缺点与限制
- 消息持久化问题:
- redis重启后所有未消费的消息都会丢失
- 消费者离线期间的消息无法恢复
- 可靠性限制:
- 缺乏消息确认机制,无法保证消息必达
- 网络中断可能导致消息丢失
- 流量控制缺失:
- 没有背压机制,生产者可能压垮消费者
- 无法限制消息堆积(因为根本不堆积)
适用场景分析
- 实时通知系统:
- 网站全局公告推送
- 在线聊天室消息分发
- 游戏服务器中的全服通知
- 日志收集与监控:
- 多个监控系统同时接收相同的日志流
- 实时统计系统指标
- 分布式系统的调试信息广播
- 临时性事件广播:
- 系统配置变更通知
- 缓存失效广播
- 服务注册中心的服务变更通知
- 不要求可靠性的场景:
- 实时数据统计(允许少量数据丢失)
- 非关键业务的实时通知
- 辅助性的系统状态更新
不适用场景
- 金融交易等要求消息100%可靠的系统
- 需要保证消息顺序的场景
- 需要消息重放或回溯的业务
- 消费者处理能力远低于生产者速率的场景
使用建议
- 频道命名规范:建议采用
业务域:子系统:消息类型的层次结构,如trade:order:create - 消费者实现:
- 为每个订阅者创建独立连接
- 将订阅逻辑放在独立线程中
- 实现重连机制处理网络中断
- 监控指标:
- 跟踪每个频道的订阅者数量
- 监控消息发布速率
- 记录消息丢失情况(需应用层实现)
- 性能优化:
- 对于高频消息,考虑消息聚合
- 大消息可考虑只发送引用id
- 合理设置redis的tcp-keepalive参数
方案 3:基于 stream 的可靠消息队列(redis 5.0+)
redis 5.0 推出的 stream 数据结构是专门为消息队列场景设计的,它完美解决了传统 list 和 pub/sub 模式的诸多缺陷。stream 支持消息持久化存储、消息确认机制、消费者组管理、死信队列等企业级特性,是目前 redis 实现可靠消息队列的最佳方案。在实际应用中,如电商订单处理、支付流水记录、日志收集等场景都能发挥重要作用。
3.1 stream 核心概念
stream:消息队列的主体,每个 stream 有唯一的 key(如"order:stream")。消息以"条目(entry)"形式存储,每个条目包含:
- 唯一 id:自动生成的格式为"时间戳-序列号"(如1680000000000-0)
- 多个字段值对:如{"order_id":"1001","amount":"199.00"}
消费者组(consumer group):通过将多个消费者归为一组,实现:
- 组内消费者共享消息,避免重复消费
- 自动负载均衡,消息均匀分配给各消费者
- 支持水平扩展,可随时增加消费者
消息确认(ack)机制:
- 消费者获取消息后,消息进入"pending"状态
- 处理完成后需显式发送ack命令
- 未确认的消息会在消费者断开后重新分配
pending 列表:
- 存储所有已获取但未确认的消息
- 记录每个消息的消费者名称、获取时间、重试次数
- 支持通过xpending命令查看待处理消息
死信队列:
- 当消息超过最大重试次数(如3次)仍未处理成功
- 可自动/手动转移到专门设计的死信stream
- 便于后续人工干预或特殊处理
3.2 核心命令详解
基本操作命令
| 操作类型 | 命令格式 | 说明 |
|---|---|---|
| 添加消息 | xadd key * field1 value1 [field2 value2...] | *表示自动生成id,可指定id保证顺序 |
| 创建消费者组 | xgroup create key groupname id [mkstream] | mkstream选项在stream不存在时自动创建 |
| 消费消息 | xreadgroup group group consumer [count n] [block ms] streams key [id] | id通常为>表示新消息,0表示pending消息 |
| 消息确认 | xack key groupname id [id...] | 支持批量确认多个消息 |
| 查看pending消息 | xpending key groupname [start end count] [consumer] | 可查看指定消费者的未确认消息 |
| 消息所有权转移 | xclaim key groupname consumer min-idle-time id [id...] | 将空闲超时的消息转给其他消费者处理 |
高级管理命令
- 消息回溯:
xread streams key 0-0从最早消息开始读取 - 范围查询:
xrange key start end [count n]按id范围查询 - 监控命令:
xinfo groups key查看消费者组信息
3.3 代码实战(java + jedis)
1. 环境准备
// maven依赖
<dependency>
<groupid>redis.clients</groupid>
<artifactid>jedis</artifactid>
<version>4.3.1</version>
</dependency>
// 连接配置
jedispoolconfig config = new jedispoolconfig();
config.setmaxtotal(10);
try (jedispool pool = new jedispool(config, "localhost", 6379)) {
jedis jedis = pool.getresource();
// 业务代码...
}2. 生产消费完整流程
生产者增强版:
public class enhancedproducer {
private static final string[] order_status = {"pending", "paid", "shipped", "completed"};
public void sendorderevent(order order) {
try (jedis jedis = pool.getresource()) {
map<string, string> fields = new hashmap<>();
fields.put("order_id", order.getid());
fields.put("user_id", order.getuserid());
fields.put("amount", order.getamount().tostring());
fields.put("status", order_status[0]);
fields.put("create_time", instant.now().tostring());
// 使用事务保证原子性
transaction t = jedis.multi();
t.xadd(stream_key, streamentryid.new_entry, fields);
t.sadd("order:ids", order.getid()); // 记录订单id集合
t.exec();
// 添加监控埋点
metrics.counter("mq.produce.count").increment();
}
}
}消费者增强版:
public class reliableconsumer implements runnable {
private static final int max_retry = 3;
@override
public void run() {
while (!thread.currentthread().isinterrupted()) {
try {
map<string, list<streamentry>> messages = jedis.xreadgroup(
group_name, consumername,
new streamparams().count(1).block(2000),
new streamoffset(stream_key, ">")
);
if (messages != null) {
messages.foreach((stream, entries) -> {
entries.foreach(entry -> {
processwithretry(entry);
});
});
}
} catch (exception e) {
logger.error("消费异常", e);
sleep(1000);
}
}
}
private void processwithretry(streamentry entry) {
int retrycount = getretrycount(entry.getid());
if (retrycount >= max_retry) {
movetodeadletter(entry);
return;
}
try {
order order = parseorder(entry.getfields());
orderservice.process(order);
jedis.xack(stream_key, group_name, entry.getid());
} catch (exception e) {
logger.warn("处理失败准备重试", e);
sleep(1000 * (retrycount + 1));
}
}
}3. 死信队列管理
public class deadlettermonitor {
public void checkpendingmessages() {
// 获取所有超时未确认的消息
list<streamentry> pending = getpendingmessages(timeout_ms);
pending.foreach(entry -> {
// 检查重试次数
if (getretrycount(entry.getid()) > max_retry) {
// 转移到死信队列
jedis.xadd(dead_stream_key, streamentryid.new_entry, entry.getfields());
jedis.xack(stream_key, group_name, entry.getid());
logger.warn("消息转入死信队列: {}", entry.getid());
// 发送告警通知
alertservice.notifyadmin(entry);
}
});
}
public void reprocessdeadletters() {
// 从死信队列重新处理
list<streamentry> deadmessages = jedis.xrange(dead_stream_key, "-", "+");
deadmessages.foreach(entry -> {
try {
manualprocess(entry.getfields());
jedis.xdel(dead_stream_key, entry.getid());
} catch (exception e) {
logger.error("死信处理失败", e);
}
});
}
}3.4 最佳实践建议
- 消费者设计原则:
- 每个消费者设置唯一标识
- 实现幂等性处理逻辑
- 添加合理的阻塞超时时间(通常1-5秒)
- 性能优化:
// 批量消费提高吞吐量
jedis.xreadgroup(group_name, consumername,
new streamparams().count(100).block(1000),
new streamoffset(stream_key, ">"));
// 批量确认减少网络开销
jedis.xack(stream_key, group_name, id1, id2, id3);
- 监控指标:
- 待处理消息数(xpending)
- 消费者延迟(当前时间 - 消息创建时间)
- 死信队列大小
- 消费成功率
- 异常处理:
// 消费者崩溃后的恢复处理
public void recoverconsumer(string failedconsumer) {
list<pendingentry> pendings = jedis.xpending(
stream_key, group_name, "-", "+", 100, failedconsumer);
pendings.foreach(pending -> {
jedis.xclaim(stream_key, group_name, currentconsumer,
timeout_ms, pending.getidasstring());
});
}通过以上实现,基于redis stream的消息队列可以达到:
- 99.9%的消息可靠性
- 每秒万级的吞吐量
- 秒级的端到端延迟
- 完善的故障恢复机制
三、三种方案的选型对比与最佳实践
3.1 方案选型对比表:
| 对比维度 | list 方案 | pub/sub 方案 | stream 方案(推荐) |
|---|---|---|---|
| 消息持久化 | 支持(需手动处理) | 不支持 | 原生支持 |
| 消息确认 | 需自定义(如rpoplpush) | 不支持 | 原生支持(ack机制) |
| 广播能力 | 不支持 | 原生支持(全量广播) | 支持(通过多消费者组实现) |
| 消费者负载均衡 | 支持(竞争消费模式) | 不支持(全量推送) | 支持(消费者组内自动均衡) |
| 死信队列 | 需自定义(备份list) | 不支持 | 支持(通过xclaim命令) |
| 实现复杂度 | 低(基础命令即可) | 低(订阅/发布模式) | 中(需理解消费者组概念) |
| 内存占用 | 线性增长 | 瞬时内存 | 可控制(支持消息修剪) |
| 历史消息回溯 | 有限支持(需保存完整list) | 不支持 | 完整支持(消息id时间序列) |
| 适用场景 | 简单异步通信 | 实时广播通知 | 可靠消息、企业级场景 |
3.2 最佳实践建议
选型决策树:
- 首要判断消息可靠性需求:
- 必须保证不丢失 → 直接选择stream
- 可接受偶尔丢失 → 进入下一判断
- 次要判断消息分发模式:
- 需要广播 → 选择pub/sub
- 点对点消费 → 选择list或stream
- 最后评估开发成本:
- 快速实现 → 选择list
- 长期维护 → 选择stream
- 首要判断消息可靠性需求:
stream方案实施细节:
- 消费者组创建示例:
xgroup create mystream mygroup $ mkstream
- 典型消费代码逻辑:
- 使用xreadgroup阻塞读取
- 业务处理成功后发送xack
- 处理失败时使用xclaim转移消息
- 设置合理的pel(pending entries list)超时
- 消费者组创建示例:
list方案优化建议:
- 可靠消费模式实现:
rpoplpush source_list backup_list # 原子操作 # 处理成功后再lrem备份列表
- 性能提升技巧:
- 批量生产:使用pipeline打包多个lpush
- 批量消费:lua脚本实现多消息批量获取
- 可靠消费模式实现:
集群环境特别注意事项:
- 跨slot访问问题:
- 所有相关key必须使用相同hash tag(如{msg})
- 或者采用客户端分片路由
- 监控重点指标:
- stream方案的pel积压长度
- list方案的内存增长曲线
- pub/sub的客户端连接数
- 跨slot访问问题:
运维管理建议:
- 容量规划:
- 按业务峰值qps的1.5倍预留资源
- stream建议单分片不超过10mb/s写入
- 监控告警:
- 设置消息积压阈值(如stream的pel>1000)
- 监控消费者延迟(xinfo groups)
- 灾备方案:
- 定期备份stream的rdb快照
- 对于关键业务实现双写机制
- 容量规划:
四、实际应用案例:电商订单异步处理
4.1 业务流程详解
电商平台的订单处理采用异步消息队列模式,通过redis stream实现可靠的消息传递和消费。整个流程包含以下关键环节:
订单创建阶段
- 用户下单后,订单服务作为生产者将订单数据持久化到mysql数据库
- 同时将订单关键信息(订单id、用户id、商品id、数量等)封装为消息,发送到名为"order_create"的stream中
- 消息格式示例:
{ "order_id": "ord20231125001", "user_id": "u10086", "product_id": "p8808", "quantity": "2" }
并行消费阶段
通知服务(消费者1):专门处理用户通知
- 消费消息后调用短信平台api或极光推送服务
- 通知内容示例:"尊敬的会员,您的订单ord20231125001已创建成功,我们将尽快为您处理"
- 支持重试机制:若首次发送失败,会按照指数退避策略重试3次
库存服务(消费者2):负责库存扣减
- 采用乐观锁机制更新库存:
update inventory set stock = stock - ? where product_id = ? and stock >= ? - 实现分布式事务:若扣减失败会记录操作日志,便于后续人工核对
- 采用乐观锁机制更新库存:
异常处理机制
- 当库存扣减失败时,消息会进入pending列表并设置5分钟超时
- 超时后自动转移到死信队列"dlq:order_create"
- 运维人员通过管理后台查看死信队列,可:
- 人工补扣库存
- 触发订单取消流程
- 联系用户协商处理
4.2 核心代码实现(生产级优化版)
订单服务(生产者)增强实现
// 订单服务(生产者)发送消息 - 增强版
public void createorder(order order) {
// 1. 数据库事务确保数据一致性
transactionstatus status = transactionmanager.gettransaction(new defaulttransactiondefinition());
try {
// 1.1 保存主订单
ordermapper.insert(order);
// 1.2 保存订单明细
order.getitems().foreach(item -> {
item.setorderid(order.getid());
orderitemmapper.insert(item);
});
// 2. 构建消息体(添加时间戳和业务标识)
map<string, string> message = new hashmap<>();
message.put("order_id", order.getid());
message.put("user_id", order.getuserid());
message.put("product_id", order.getproductid());
message.put("quantity", order.getquantity() + "");
message.put("create_time", system.currenttimemillis() + "");
message.put("biz_type", "normal_order");
// 3. 发送消息(添加重试机制)
int retrytimes = 0;
while (retrytimes < 3) {
try {
jedis.xadd("redis:mq:stream:order_create", null, message);
break;
} catch (exception e) {
retrytimes++;
if (retrytimes == 3) {
throw new runtimeexception("消息发送失败", e);
}
thread.sleep(1000 * retrytimes);
}
}
transactionmanager.commit(status);
} catch (exception e) {
transactionmanager.rollback(status);
throw new businessexception("订单创建失败", e);
}
}通知服务(消费者)完整实现
// 通知服务(消费者)完整实现
public void handlenotification() {
// 初始化消费者组(幂等操作)
initconsumergroup("redis:mq:stream:order_create", "order_group");
while (!thread.currentthread().isinterrupted()) {
try {
map<string, list<streamentry>> messages = jedis.xreadgroup(
"order_group",
"notify_consumer_" + instanceid, // 使用实例id区分消费者
1,
5000,
false,
map.of("redis:mq:stream:order_create", streamentryid.unreceived_entry)
);
if (messages != null && !messages.isempty()) {
for (streamentry entry : messages.get("redis:mq:stream:order_create")) {
map<string, string> content = entry.getfields();
string userid = content.get("user_id");
string orderid = content.get("order_id");
// 1. 发送短信(带熔断机制)
boolean smssent = circuitbreaker.execute(() ->
smsservice.send(userid, "订单通知", "您的订单" + orderid + "已创建成功"));
// 2. 发送app推送
boolean pushsent = pushservice.send(userid, "订单创建通知",
map.of("orderid", orderid, "type", "order_created"));
if (smssent || pushsent) {
// 至少一个通知发送成功才确认消息
jedis.xack("redis:mq:stream:order_create", "order_group", entry.getid());
monitorservice.recordsuccess("order_notify");
} else {
monitorservice.recordfailure("order_notify");
}
}
}
} catch (exception e) {
log.error("通知处理异常", e);
monitorservice.recorderror("order_notify", e);
thread.sleep(5000); // 异常休眠避免循环异常
}
}
}
private void initconsumergroup(string streamkey, string groupname) {
try {
jedis.xgroupcreate(streamkey, groupname, streamentryid.last_entry, true);
} catch (redisbusyexception e) {
log.info("消费者组已存在: {}", groupname);
}
}库存服务(消费者)完整实现
// 库存服务(消费者)完整实现
public void handleinventory() {
// 初始化消费者组
initconsumergroup("redis:mq:stream:order_create", "order_group");
while (!thread.currentthread().isinterrupted()) {
try {
map<string, list<streamentry>> messages = jedis.xreadgroup(
"order_group",
"inventory_consumer_" + instanceid,
1,
5000,
false,
map.of("redis:mq:stream:order_create", streamentryid.unreceived_entry)
);
if (messages != null && !messages.isempty()) {
for (streamentry entry : messages.get("redis:mq:stream:order_create")) {
map<string, string> content = entry.getfields();
string productid = content.get("product_id");
int quantity = integer.parseint(content.get("quantity"));
string orderid = content.get("order_id");
// 1. 扣减库存(带事务)
boolean success = inventoryservice.deductwithlog(
productid,
quantity,
orderid,
"order_deduction"
);
if (success) {
// 2. 确认消息
jedis.xack("redis:mq:stream:order_create", "order_group", entry.getid());
monitorservice.recordsuccess("inventory_deduct");
} else {
// 3. 记录失败日志
log.warn("库存扣减失败 orderid={}, productid={}", orderid, productid);
monitorservice.recordfailure("inventory_deduct");
// 不确认消息,让其进入pending状态
}
}
}
} catch (exception e) {
log.error("库存处理异常", e);
monitorservice.recorderror("inventory_deduct", e);
thread.sleep(5000);
}
}
}
// 库存扣减服务方法
@transactional
public boolean deductwithlog(string productid, int quantity, string bizid, string biztype) {
// 1. 扣减库存
int affected = inventorymapper.deductwithversion(
productid,
quantity,
getcurrentversion(productid)
);
if (affected == 0) {
return false;
}
// 2. 记录操作流水
inventorylog log = new inventorylog();
log.setlogid(uuid.randomuuid().tostring());
log.setproductid(productid);
log.setchangedamount(-quantity);
log.setbizid(bizid);
log.setbiztype(biztype);
log.setremarks("订单扣减");
inventorylogmapper.insert(log);
return true;
}4.3 监控与运维设计
监控指标
- 消息堆积量:
xlen redis:mq:stream:order_create - pending列表数量:
xpending redis:mq:stream:order_create order_group - 消费者延迟:通过消息时间戳与当前时间差值计算
- 消息堆积量:
运维命令示例
# 查看消费者组信息 xinfo groups redis:mq:stream:order_create # 处理死信消息 xrange dlq:order_create - + count 10 xack dlq:order_create manual_group <entry_id>
自动恢复方案
- 定时任务每小时检查pending列表
- 对于超时1小时未处理的消息:
- 尝试重新投递到原stream
- 超过3次重试则转入死信队列
- 触发企业微信告警通知运维人员
到此这篇关于redis 是如何实现消息队列的?的文章就介绍到这了,更多相关redis消息队列内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论