业务场景描述
在我们公司的电商平台中,存在大量异步事件需要实时处理,例如用户下单、库存更新、支付回调等。这些事件对消息的可靠性、顺序性和高吞吐量有较高要求。传统的消息中间件(如kafka、rabbitmq)在运维成本或部署复杂度上存在一定挑战,在部分场景下难以满足“轻量、低延迟、易集成” 的需求。
经过调研和验证,redis 6.0+ 提供的 streams 特性在嵌入式部署、快速上手方面具有显著优势。本篇文章将分享我们在生产环境中基于 redis streams 构建实时消息处理的完整经验,包括技术选型、核心代码示例、踩坑解决和优化方案。
技术选型过程
- 消息可靠性:redis streams 支持持久化,且提供 ack 机制和 pending list,能够有效追踪消费进度。
- 顺序消费:同一消费者组内,可保证分片流(同一 key)中消息按写入顺序被串行消费。
- 横向扩展:可通过 stream 分片(多个 stream key)或消费者组内多实例并行消费提高吞吐。
- 运营成本:redis 已是团队基础设施,集群部署与监控成熟度高,二次成本低。
- 客户端生态:lettuce、jedis、redisson 等客户端均有支持,编码友好。
基于以上考量,最终选型 redis streams,落地于现有 redis 集群,无需额外独立中间件部署。
实现方案详解
环境与依赖
maven 依赖(以 lettuce 客户端为例):
<dependencies>
<dependency>
<groupid>io.lettuce</groupid>
<artifactid>lettuce-core</artifactid>
<version>6.1.5.release</version>
</dependency>
<dependency>
<groupid>org.slf4j</groupid>
<artifactid>slf4j-api</artifactid>
<version>1.7.30</version>
</dependency>
<dependency>
<groupid>ch.qos.logback</groupid>
<artifactid>logback-classic</artifactid>
<version>1.2.3</version>
</dependency>
</dependencies>
springboot 配置(application.yml):
spring:
redis:
host: redis-cluster-host
port: 6379
password: your_password
timeout: 2000ms
流程设计
- producer 将事件写入 stream:xadd
- 多消费者(consumer group)并行读取:xreadgroup
- 消费确认:xack
- 异常消息追踪:pending-list 与 xclaim 回补处理
生产者实现
import io.lettuce.core.redisclient;
import io.lettuce.core.api.statefulredisconnection;
import io.lettuce.core.api.sync.rediscommands;
import java.util.hashmap;
import java.util.map;
public class redisstreamproducer {
private redisclient client;
private statefulredisconnection<string, string> connection;
private rediscommands<string, string> commands;
private static final string stream_key = "orderstream";
public redisstreamproducer(string uri) {
client = redisclient.create(uri);
connection = client.connect();
commands = connection.sync();
}
public string sendmessage(map<string, string> message) {
// xadd key * field value [field value ...]
return commands.xadd(stream_key, message);
}
public void shutdown() {
connection.close();
client.shutdown();
}
public static void main(string[] args) {
redisstreamproducer producer = new redisstreamproducer("redis://:your_password@redis-host:6379/0");
map<string, string> order = new hashmap<>();
order.put("orderid", "123456");
order.put("userid", "u7890");
order.put("amount", "258.50");
string messageid = producer.sendmessage(order);
system.out.println("消息发送成功, id=" + messageid);
producer.shutdown();
}
}
消费者实现
import io.lettuce.core.redisclient;
import io.lettuce.core.streammessage;
import io.lettuce.core.api.statefulredisconnection;
import io.lettuce.core.api.sync.rediscommands;
import io.lettuce.core.models.stream.consumer;
import io.lettuce.core.models.stream.pendingmessage;
import java.time.duration;
import java.util.list;
import java.util.map;
public class redisstreamconsumer {
private redisclient client;
private statefulredisconnection<string, string> connection;
private rediscommands<string, string> commands;
private static final string stream_key = "orderstream";
private static final string group_name = "ordergroup";
private static final string consumer_name = "consumer-1";
public redisstreamconsumer(string uri) {
client = redisclient.create(uri);
connection = client.connect();
commands = connection.sync();
// 创建消费者组, 如果已创建可 ignore
try {
commands.xgroupcreate(stream_key, group_name, "$", true);
} catch (exception e) {
// group exists
}
}
public void consume() {
while (true) {
// 从 pending list 先处理未 ack 的消息
list<pendingmessage> pending = commands.xpending(stream_key, group_name, range.unbounded(), limit.from(10));
for (pendingmessage pm : pending) {
// 重新消费
streammessage<string, string> msg = commands.xclaim(
stream_key,
group_name,
consumer_name,
5000,
pm.getid());
process(msg.getbody());
commands.xack(stream_key, group_name, pm.getid());
}
// 正常读取新消息
list<streammessage<string, string>> messages = commands.xreadgroup(
consumer.from(group_name, consumer_name),
xreadargs.streamoffset.lastconsumed(stream_key));
if (messages != null) {
for (streammessage<string, string> msg : messages) {
process(msg.getbody());
commands.xack(stream_key, group_name, msg.getid());
}
}
// 轮询间隔
try {
thread.sleep(200);
} catch (interruptedexception e) {
thread.currentthread().interrupt();
break;
}
}
}
private void process(map<string, string> body) {
// 业务处理逻辑
system.out.println("处理订单: " + body);
}
public void shutdown() {
connection.close();
client.shutdown();
}
public static void main(string[] args) {
redisstreamconsumer consumer = new redisstreamconsumer("redis://:your_password@redis-host:6379/0");
consumer.consume();
consumer.shutdown();
}
}
踩过的坑与解决方案
1.消息重复消费
- 问题:消费者处理过程中抛出异常导致 ack 未发送,pending list 中累积大量消息。
- 解决:定期扫描 pending list,并结合 xclaim 将“活跃但挂起”消息重新分配给健康消费者处理;同时在业务端做好幂等控制。
2.消息积压与内存压力
- 问题:stream 长度持续增长,redis 实例内存压力上升。
- 解决:使用
xtrim maxlen ~ n对流进行修剪,结合业务保留时间策略,定期分批清理历史消息。
3.消费者实例重启后状态丢失
- 问题:未及时恢复 pending list 中未处理消息,导致部分消息长时间滞留。
- 解决:消费者启动时优先处理 pending list,再进入正常消费流程;并通过定时任务对挂起较久的消息进行报警或二次补偿处理。
总结与最佳实践
- redis streams 适合轻量级、低运维成本的实时消息场景,结合 ack、pending list 能保证高可靠性。
- 采用消费者组(consumer group)可支持横向扩展,读写分离与顺序消费兼得。
- 业务侧必须做好幂等设计,避免消息重复带来的副作用。
- 对 stream 进行合理修剪,避免数据无节制增长导致内存问题。
- 建议结合监控告警,对 pending list 长度、消费者积压情况进行实时监控。
到此这篇关于基于redis streams的实时消息处理实战指南的文章就介绍到这了,更多相关redis streams消息处理内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论