redis 5.0版本引入的stream数据类型,为redis生态带来了强大而灵活的消息队列功能,弥补了之前发布/订阅模式的不足,如消息持久化、消费者组、消息确认等特性。
redis stream结合了传统消息队列和时序数据库的特点,适用于日志收集、事件驱动应用、实时分析等多种场景。
本文将介绍redis stream的6种消息处理模式。
1. 简单消费模式(simple consumption)
基本概念
简单消费模式是redis stream最基础的使用方式,不使用消费者组,直接读取流中的消息。生产者将消息追加到流中,消费者通过指定起始id来读取消息。
核心命令
# 发布消息 xadd stream_name [id] field value [field value ...] # 读取消息 xread [count count] [block milliseconds] streams stream_name start_id
实现示例
redis cli
# 添加消息到stream
> xadd mystream * sensor_id 1234 temperature 19.8 humidity 56
"1647257548956-0"
# 从头开始读取所有消息
> xread streams mystream 0
1) 1) "mystream"
2) 1) 1) "1647257548956-0"
2) 1) "sensor_id"
2) "1234"
3) "temperature"
4) "19.8"
5) "humidity"
6) "56"
# 从指定id开始读取
> xread streams mystream 1647257548956-0
(empty list or set)
# 从最新的消息id之后开始读取(阻塞等待新消息)
> xread block 5000 streams mystream $
(nil)java spring boot示例
@service
public class simplestreamservice {
@autowired
private stringredistemplate redistemplate;
/**
* 发布消息到stream
*/
public string publishevent(string streamkey, map<string, object> eventdata) {
stringrecord record = streamrecords.string(eventdata).withstreamkey(streamkey);
return redistemplate.opsforstream().add(record).getvalue();
}
/**
* 从指定位置开始读取消息
*/
public list<maprecord<string, object, object>> readevents(string streamkey, string startid, int count) {
streamreadoptions readoptions = streamreadoptions.empty().count(count);
return redistemplate.opsforstream().read(readoptions, streamoffset.from(streamkey, readoffset.from(startid)));
}
/**
* 阻塞式读取消息
*/
public list<maprecord<string, object, object>> readeventsblocking(string streamkey, int timeoutmillis) {
streamreadoptions readoptions = streamreadoptions.empty().count(10).block(duration.ofmillis(timeoutmillis));
return redistemplate.opsforstream().read(readoptions, streamoffset.latest(streamkey));
}
}使用场景
- 简单的事件日志记录
- 单一消费者场景
- 时间序列数据收集
- 开发和调试阶段
优缺点
优点
- 实现简单,无需创建和管理消费者组
- 直接控制从哪个位置开始消费消息
- 适合单个消费者场景
缺点
- 无法实现负载均衡
- 无法追踪消息确认状态
- 需要手动管理已读消息id
- 服务重启需自行记录上次读取位置
2. 消费者组模式(consumer groups)
基本概念
消费者组允许多个消费者共同处理一个流的消息,实现负载均衡,并提供消息确认机制,确保消息至少被处理一次。每个消费者组维护自己的消费位置,不同消费者组之间互不干扰。
核心命令
# 创建消费者组 xgroup create stream_name group_name [id|$] [mkstream] # 从消费者组读取消息 xreadgroup group group_name consumer_name [count count] [block milliseconds] streams stream_name [>|id] # 确认消息处理完成 xack stream_name group_name message_id [message_id ...]
实现示例
redis cli
# 创建消费者组
> xgroup create mystream processing-group $ mkstream
ok
# 消费者1读取消息
> xreadgroup group processing-group consumer-1 count 1 streams mystream >
1) 1) "mystream"
2) 1) 1) "1647257548956-0"
2) 1) "sensor_id"
2) "1234"
3) "temperature"
4) "19.8"
5) "humidity"
6) "56"
# 确认消息已处理
> xack mystream processing-group 1647257548956-0
(integer) 1
# 消费者2读取消息(已无未处理消息)
> xreadgroup group processing-group consumer-2 count 1 streams mystream >
1) 1) "mystream"
2) (empty list or set)java spring boot示例
@service
public class consumergroupservice {
@autowired
private stringredistemplate redistemplate;
/**
* 创建消费者组
*/
public void creategroup(string streamkey, string groupname) {
try {
redistemplate.opsforstream().creategroup(streamkey, groupname);
} catch (redissystemexception e) {
// 处理流不存在的情况
if (e.getrootcause() instanceof rediscommandexecutionexception
&& e.getrootcause().getmessage().contains("nogroup")) {
redistemplate.opsforstream().creategroup(readoffset.from("0"), streamkey, groupname);
} else {
throw e;
}
}
}
/**
* 从消费者组读取消息
*/
public list<maprecord<string, object, object>> readfromgroup(
string streamkey, string groupname, string consumername, int count) {
streamreadoptions options = streamreadoptions.empty().count(count);
return redistemplate.opsforstream().read(
consumer.from(groupname, consumername),
options,
streamoffset.create(streamkey, readoffset.lastconsumed())
);
}
/**
* 阻塞式从消费者组读取消息
*/
public list<maprecord<string, object, object>> readfromgroupblocking(
string streamkey, string groupname, string consumername, int count, duration timeout) {
streamreadoptions options = streamreadoptions.empty().count(count).block(timeout);
return redistemplate.opsforstream().read(
consumer.from(groupname, consumername),
options,
streamoffset.create(streamkey, readoffset.lastconsumed())
);
}
/**
* 确认消息已处理
*/
public long acknowledgemessage(string streamkey, string groupname, string... messageids) {
return redistemplate.opsforstream().acknowledge(streamkey, groupname, messageids);
}
}使用场景
- 需要横向扩展消息处理能力的系统
- 要求消息可靠处理的业务场景
- 实现消息工作队列
- 微服务间的事件传递
优缺点
优点
- 多个消费者可以并行处理消息
- 提供消息确认机制,保证消息不丢失
- 支持消费者崩溃后恢复处理
- 每个消费者组维护独立的消费位置
缺点
- 实现相对复杂
- 需要妥善管理消费者组和消费者
- 需要显式处理消息确认
- 需要定期处理未确认的消息
3. 阻塞式消费模式(blocking consumption)
基本概念
阻塞式消费允许消费者在没有新消息时保持连接,等待新消息到达。这种模式减少了轮询开销,提高了实时性,适合对消息处理时效性要求高的场景。
核心命令
# 阻塞式简单消费 xread block milliseconds streams stream_name id # 阻塞式消费者组消费 xreadgroup group group_name consumer_name block milliseconds streams stream_name >
实现示例
redis cli
# 阻塞等待新消息(最多等待10秒) > xread block 10000 streams mystream $ (nil) # 如果10秒内没有新消息 # 使用消费者组的阻塞式消费 > xreadgroup group processing-group consumer-1 block 10000 streams mystream > (nil) # 如果10秒内没有新分配的消息
java spring boot示例
@service
public class blockingstreamconsumerservice {
@autowired
private stringredistemplate redistemplate;
/**
* 阻塞式消息消费者任务
*/
@async
public void startblockingconsumer(string streamkey, string lastid, duration timeout) {
streamreadoptions options = streamreadoptions.empty()
.count(1)
.block(timeout);
while (!thread.currentthread().isinterrupted()) {
try {
// 阻塞读取消息
list<maprecord<string, object, object>> records = redistemplate.opsforstream()
.read(options, streamoffset.from(streamkey, readoffset.from(lastid)));
if (records != null && !records.isempty()) {
for (maprecord<string, object, object> record : records) {
// 处理消息
processmessage(record);
// 更新最后读取的id
lastid = record.getid().getvalue();
}
} else {
// 超时未读取到消息,可以执行一些其他逻辑
}
} catch (exception e) {
// 异常处理
log.error("error reading from stream: {}", e.getmessage(), e);
try {
thread.sleep(1000); // 出错后等待一段时间再重试
} catch (interruptedexception ie) {
thread.currentthread().interrupt();
break;
}
}
}
}
/**
* 阻塞式消费者组消费
*/
@async
public void startgroupblockingconsumer(
string streamkey, string groupname, string consumername, duration timeout) {
streamreadoptions options = streamreadoptions.empty()
.count(1)
.block(timeout);
while (!thread.currentthread().isinterrupted()) {
try {
// 阻塞读取消息
list<maprecord<string, object, object>> records = redistemplate.opsforstream()
.read(consumer.from(groupname, consumername),
options,
streamoffset.create(streamkey, readoffset.lastconsumed()));
if (records != null && !records.isempty()) {
for (maprecord<string, object, object> record : records) {
try {
// 处理消息
processmessage(record);
// 确认消息
redistemplate.opsforstream()
.acknowledge(streamkey, groupname, record.getid().getvalue());
} catch (exception e) {
// 处理失败,记录日志
log.error("error processing message: {}", e.getmessage(), e);
}
}
}
} catch (exception e) {
log.error("error reading from stream group: {}", e.getmessage(), e);
try {
thread.sleep(1000);
} catch (interruptedexception ie) {
thread.currentthread().interrupt();
break;
}
}
}
}
private void processmessage(maprecord<string, object, object> record) {
// 实际消息处理逻辑
log.info("processing message: {}", record);
// ...处理消息的具体业务逻辑
}
}使用场景
- 实时数据处理系统
- 事件驱动的任务处理
- 低延迟要求的应用
- 即时通讯系统
- 通知服务
优缺点
优点
- 减少轮询带来的资源浪费
- 实时性好,消息到达后立即处理
- 降低redis和客户端的负载
- 节省cpu和网络资源
缺点
- 长连接可能占用redis连接资源
- 需要合理设置超时时间
- 可能需要处理网络中断后的重连
- 消费者需要具备并发处理能力
4. 扇出模式(fan-out pattern)
基本概念
扇出模式允许多个独立的消费者组同时消费同一个流中的所有消息,类似于发布/订阅模式,但具有消息持久化和回溯能力。每个消费者组独立维护自己的消费位置。
核心命令
创建多个消费者组,它们都独立消费同一个流:
xgroup create stream_name group_name_1 $ mkstream xgroup create stream_name group_name_2 $ mkstream xgroup create stream_name group_name_3 $ mkstream
实现示例
redis cli
# 创建多个消费者组
> xgroup create notifications analytics-group $ mkstream
ok
> xgroup create notifications email-group $ mkstream
ok
> xgroup create notifications mobile-group $ mkstream
ok
# 添加一条消息
> xadd notifications * type user_signup user_id 1001 email "user@example.com"
"1647345678912-0"
# 从各个消费者组读取(每个组都能收到所有消息)
> xreadgroup group analytics-group analytics-1 count 1 streams notifications >
1) 1) "notifications"
2) 1) 1) "1647345678912-0"
2) 1) "type"
2) "user_signup"
3) "user_id"
4) "1001"
5) "email"
6) "user@example.com"
> xreadgroup group email-group email-1 count 1 streams notifications >
1) 1) "notifications"
2) 1) 1) "1647345678912-0"
2) 1) "type"
2) "user_signup"
3) "user_id"
4) "1001"
5) "email"
6) "user@example.com"
> xreadgroup group mobile-group mobile-1 count 1 streams notifications >
1) 1) "notifications"
2) 1) 1) "1647345678912-0"
2) 1) "type"
2) "user_signup"
3) "user_id"
4) "1001"
5) "email"
6) "user@example.com"java spring boot示例
@service
public class fanoutservice {
@autowired
private stringredistemplate redistemplate;
/**
* 初始化扇出消费者组
*/
public void initializefanoutgroups(string streamkey, list<string> groupnames) {
// 确保流存在
try {
streaminfo.xinfostream info = redistemplate.opsforstream().info(streamkey);
} catch (exception e) {
// 流不存在,发送一个初始消息
map<string, object> initialmessage = new hashmap<>();
initialmessage.put("init", "true");
redistemplate.opsforstream().add(streamkey, initialmessage);
}
// 创建所有消费者组
for (string groupname : groupnames) {
try {
redistemplate.opsforstream().creategroup(streamkey, groupname);
} catch (exception e) {
// 忽略组已存在的错误
log.info("group {} may already exist: {}", groupname, e.getmessage());
}
}
}
/**
* 发布扇出消息
*/
public string publishfanoutmessage(string streamkey, map<string, object> messagedata) {
stringrecord record = streamrecords.string(messagedata).withstreamkey(streamkey);
return redistemplate.opsforstream().add(record).getvalue();
}
/**
* 为特定组启动消费者
*/
@async
public void startgroupconsumer(
string streamkey, string groupname, string consumername,
consumer<maprecord<string, object, object>> messagehandler) {
streamreadoptions options = streamreadoptions.empty().count(10).block(duration.ofseconds(2));
while (!thread.currentthread().isinterrupted()) {
try {
list<maprecord<string, object, object>> messages = redistemplate.opsforstream().read(
consumer.from(groupname, consumername),
options,
streamoffset.create(streamkey, readoffset.lastconsumed())
);
if (messages != null && !messages.isempty()) {
for (maprecord<string, object, object> message : messages) {
try {
// 处理消息
messagehandler.accept(message);
// 确认消息
redistemplate.opsforstream().acknowledge(
streamkey, groupname, message.getid().getvalue());
} catch (exception e) {
log.error("error processing message in group {}: {}",
groupname, e.getmessage(), e);
}
}
}
} catch (exception e) {
log.error("error reading from stream for group {}: {}",
groupname, e.getmessage(), e);
try {
thread.sleep(1000);
} catch (interruptedexception ie) {
thread.currentthread().interrupt();
break;
}
}
}
}
}使用示例
@service
public class notificationservice {
@autowired
private fanoutservice fanoutservice;
@postconstruct
public void init() {
// 初始化扇出组
list<string> groups = arrays.aslist("email-group", "sms-group", "analytics-group");
fanoutservice.initializefanoutgroups("user-events", groups);
// 启动各个消费者组的处理器
fanoutservice.startgroupconsumer(
"user-events", "email-group", "email-consumer", this::processemailnotification);
fanoutservice.startgroupconsumer(
"user-events", "sms-group", "sms-consumer", this::processsmsnotification);
fanoutservice.startgroupconsumer(
"user-events", "analytics-group", "analytics-consumer", this::processanalyticsevent);
}
private void processemailnotification(maprecord<string, object, object> message) {
map<object, object> messagedata = message.getvalue();
log.info("processing email notification: {}", messagedata);
// 邮件发送逻辑
}
private void processsmsnotification(maprecord<string, object, object> message) {
map<object, object> messagedata = message.getvalue();
log.info("processing sms notification: {}", messagedata);
// 短信发送逻辑
}
private void processanalyticsevent(maprecord<string, object, object> message) {
map<object, object> messagedata = message.getvalue();
log.info("processing analytics event: {}", messagedata);
// 分析事件处理逻辑
}
public void publishuserevent(string eventtype, map<string, object> eventdata) {
map<string, object> message = new hashmap<>(eventdata);
message.put("event_type", eventtype);
message.put("timestamp", system.currenttimemillis());
fanoutservice.publishfanoutmessage("user-events", message);
}
}使用场景
- 多个系统需要独立处理同一事件流
- 实现事件广播机制
- 系统集成:一个事件触发多个业务流程
- 日志统一处理并分发到不同服务
- 通知系统:一个事件需要通过多种方式通知用户
优缺点
优点
- 实现一次发布多次消费
- 各消费者组独立工作,互不影响
- 新增消费者组可以从头开始消费所有历史消息
- 可靠性高,消息持久化存储
缺点
- 随着流数据增长,可能占用较多存储空间
- 需要合理设置流的最大长度或过期策略
- 消费者组数量过多可能增加redis负载
- 需要单独管理每个消费者组的状态
5. 重试与恢复模式(retry and recovery)
基本概念
这种模式关注处理失败消息的恢复和重试机制。redis stream消费者组会跟踪每个消息的处理状态,允许查看和管理未确认(pel - pending entry list)的消息,实现可靠的消息处理。
核心命令
# 查看消费者组中未确认的消息 xpending stream_name group_name [start_id end_id count] [consumer_name] # 查看消费者组中长时间未确认的消息详情 xpending stream_name group_name start_id end_id count [consumer_name] # 认领处理超时的消息 xclaim stream_name group_name consumer_name min_idle_time message_id [message_id ...] [justid]
实现示例
redis cli
# 查看未确认的消息数量
> xpending mystream processing-group
1) (integer) 2 # 未确认消息数量
2) "1647257548956-0" # 最小id
3) "1647257549123-0" # 最大id
4) 1) 1) "consumer-1" # 各个消费者的未确认消息数
2) (integer) 1
2) 1) "consumer-2"
2) (integer) 1
# 查看特定消费者的未确认消息
> xpending mystream processing-group - + 10 consumer-1
1) 1) "1647257548956-0" # 消息id
2) "consumer-1" # 当前持有的消费者
3) (integer) 120000 # 空闲时间(毫秒)
4) (integer) 2 # 传递次数
# 认领超过2分钟未处理的消息
> xclaim mystream processing-group consumer-2 120000 1647257548956-0
1) 1) "1647257548956-0"
2) 1) "sensor_id"
2) "1234"
3) "temperature"
4) "19.8"
5) "humidity"
6) "56"java spring boot示例
@service
public class messagerecoveryservice {
@autowired
private stringredistemplate redistemplate;
/**
* 获取消费者组中的未确认消息
*/
public pendingmessagessummary getpendingmessagessummary(string streamkey, string groupname) {
return redistemplate.opsforstream().pending(streamkey, groupname);
}
/**
* 获取指定消费者的详细未确认消息
*/
public pendingmessages getpendingmessages(
string streamkey, string groupname, string consumername,
range<string> idrange, long count) {
return redistemplate.opsforstream().pending(
streamkey,
consumer.from(groupname, consumername),
idrange,
count);
}
/**
* 认领长时间未处理的消息
*/
public list<maprecord<string, object, object>> claimmessages(
string streamkey, string groupname, string newconsumername,
duration minidletime, string... messageids) {
return redistemplate.opsforstream().claim(
streamkey,
consumer.from(groupname, newconsumername),
minidletime,
messageids);
}
/**
* 定时检查和恢复未处理的消息
*/
@scheduled(fixedrate = 60000) // 每分钟执行一次
public void recoverstalemessages() {
// 配置参数
string streamkey = "mystream";
string groupname = "processing-group";
string recoveryconsumer = "recovery-consumer";
duration minidletime = duration.ofminutes(5); // 超过5分钟未处理的消息
try {
// 1. 获取所有未确认消息的摘要
pendingmessagessummary summary = getpendingmessagessummary(streamkey, groupname);
if (summary != null && summary.gettotalpendingmessages() > 0) {
// 2. 遍历每个消费者的未确认消息
for (consumer consumer : summary.getpendingmessagesperconsumer().keyset()) {
// 获取该消费者的详细未确认消息列表
pendingmessages pendingmessages = getpendingmessages(
streamkey, groupname, consumer.getname(),
range.unbounded(), 50); // 每次最多处理50条
if (pendingmessages != null) {
// 3. 筛选出空闲时间超过阈值的消息
list<string> stalemessageids = new arraylist<>();
for (pendingmessage message : pendingmessages) {
if (message.getelapsedtimesincelastdelivery().compareto(minidletime) > 0) {
stalemessageids.add(message.getidasstring());
}
}
// 4. 认领这些消息
if (!stalemessageids.isempty()) {
log.info("claiming {} stale messages from consumer {}",
stalemessageids.size(), consumer.getname());
list<maprecord<string, object, object>> claimedmessages = claimmessages(
streamkey, groupname, recoveryconsumer, minidletime,
stalemessageids.toarray(new string[0]));
// 5. 处理这些被认领的消息
processclaimedmessages(streamkey, groupname, claimedmessages);
}
}
}
}
} catch (exception e) {
log.error("error recovering stale messages: {}", e.getmessage(), e);
}
}
/**
* 处理被认领的消息
*/
private void processclaimedmessages(
string streamkey, string groupname,
list<maprecord<string, object, object>> messages) {
if (messages == null || messages.isempty()) {
return;
}
for (maprecord<string, object, object> message : messages) {
try {
// 执行消息处理逻辑
processmessage(message);
// 确认消息
redistemplate.opsforstream().acknowledge(
streamkey, groupname, message.getid().getvalue());
log.info("successfully processed recovered message: {}", message.getid());
} catch (exception e) {
log.error("failed to process recovered message {}: {}",
message.getid(), e.getmessage(), e);
// 根据业务需求决定是否将消息加入死信队列
movetodeadletterqueue(streamkey, message);
}
}
}
/**
* 将消息移至死信队列
*/
private void movetodeadletterqueue(string sourcestream, maprecord<string, object, object> message) {
string deadletterstream = sourcestream + ":dead-letter";
map<object, object> messagedata = message.getvalue();
map<string, object> dlqmessage = new hashmap<>();
messagedata.foreach((k, v) -> dlqmessage.put(k.tostring(), v));
// 添加元数据
dlqmessage.put("original_id", message.getid().getvalue());
dlqmessage.put("error_time", system.currenttimemillis());
redistemplate.opsforstream().add(deadletterstream, dlqmessage);
// 可选:从原消费者组确认该消息
// redistemplate.opsforstream().acknowledge(sourcestream, groupname, message.getid().getvalue());
}
private void processmessage(maprecord<string, object, object> message) {
// 实际的消息处理逻辑
log.info("processing recovered message: {}", message);
// ...
}
}使用场景
- 需要可靠消息处理的关键业务系统
- 处理时间较长的任务
- 需要错误重试机制的工作流
- 监控和诊断消息处理过程
- 实现死信队列处理特定失败场景
优缺点
优点
- 提高系统容错性和可靠性
- 自动恢复因消费者崩溃导致的未处理消息
- 可以识别和处理长时间未确认的消息
- 支持实现复杂的重试策略和死信处理
缺点
- 需要额外开发和维护恢复机制
- 可能导致消息重复处理,需要确保业务逻辑幂等
- 系统复杂度增加
- 需要监控和管理pel(未确认消息列表)的大小
6. 流处理窗口模式(streaming window processing)
基本概念
流处理窗口模式基于时间或消息计数划分数据流,在每个窗口内执行聚合或分析操作。这种模式适用于实时分析、趋势监测和时间序列处理。虽然redis stream本身不直接提供窗口操作,但可以结合redis的其他特性实现。
实现方式
主要通过以下几种方式实现:
1. 基于消息id的时间范围(redis消息id包含毫秒时间戳)
2. 结合redis的排序集合(sortedset)存储窗口数据
3. 使用redis的过期键实现滑动窗口
实现示例
redis cli
窗口数据收集与查询:
# 添加带时间戳的数据
> xadd temperature * sensor_id 1 value 21.5 timestamp 1647257548000
"1647257550123-0"
> xadd temperature * sensor_id 1 value 21.8 timestamp 1647257558000
"1647257560234-0"
> xadd temperature * sensor_id 1 value 22.1 timestamp 1647257568000
"1647257570345-0"
# 查询特定时间范围的数据
> xrange temperature 1647257550000-0 1647257570000-0
1) 1) "1647257550123-0"
2) 1) "sensor_id"
2) "1"
3) "value"
4) "21.5"
5) "timestamp"
6) "1647257548000"
2) 1) "1647257560234-0"
2) 1) "sensor_id"
2) "1"
3) "value"
4) "21.8"
5) "timestamp"
6) "1647257558000"java spring boot示例
@service
public class timewindowprocessingservice {
@autowired
private stringredistemplate redistemplate;
/**
* 添加数据点到流,并存储到相应的时间窗口
*/
public string adddatapoint(string streamkey, string sensorid, double value) {
long timestamp = system.currenttimemillis();
// 1. 添加到原始数据流
map<string, object> datapoint = new hashmap<>();
datapoint.put("sensor_id", sensorid);
datapoint.put("value", string.valueof(value));
datapoint.put("timestamp", string.valueof(timestamp));
stringrecord record = streamrecords.string(datapoint).withstreamkey(streamkey);
recordid recordid = redistemplate.opsforstream().add(record);
// 2. 计算所属的窗口(这里以5分钟为一个窗口)
long windowstart = timestamp - (timestamp % (5 * 60 * 1000));
string windowkey = streamkey + ":window:" + windowstart;
// 3. 将数据点添加到窗口的有序集合中,分数为时间戳
string datapointjson = new objectmapper().writevalueasstring(datapoint);
redistemplate.opsforzset().add(windowkey, datapointjson, timestamp);
// 4. 设置窗口键的过期时间(保留24小时)
redistemplate.expire(windowkey, duration.ofhours(24));
return recordid.getvalue();
}
/**
* 获取指定时间窗口内的数据点
*/
public list<map<string, object>> getwindowdata(
string streamkey, long windowstarttime, long windowendtime) {
// 计算可能的窗口键(每5分钟一个窗口)
list<string> windowkeys = new arraylist<>();
long current = windowstarttime - (windowstarttime % (5 * 60 * 1000));
while (current <= windowendtime) {
windowkeys.add(streamkey + ":window:" + current);
current += (5 * 60 * 1000);
}
// 从各个窗口获取数据点
list<map<string, object>> results = new arraylist<>();
objectmapper mapper = new objectmapper();
for (string windowkey : windowkeys) {
set<string> datapoints = redistemplate.opsforzset().rangebyscore(
windowkey, windowstarttime, windowendtime);
if (datapoints != null) {
for (string datapointjson : datapoints) {
try {
map<string, object> datapoint = mapper.readvalue(
datapointjson, new typereference<map<string, object>>() {});
results.add(datapoint);
} catch (exception e) {
log.error("error parsing data point: {}", e.getmessage(), e);
}
}
}
}
// 按时间戳排序
results.sort(comparator.comparing(dp -> long.parselong(dp.get("timestamp").tostring())));
return results;
}
/**
* 计算窗口内数据的聚合统计
*/
public map<string, object> getwindowstats(
string streamkey, string sensorid, long windowstarttime, long windowendtime) {
list<map<string, object>> windowdata = getwindowdata(streamkey, windowstarttime, windowendtime);
// 过滤特定传感器的数据
list<double> values = windowdata.stream()
.filter(dp -> sensorid.equals(dp.get("sensor_id").tostring()))
.map(dp -> double.parsedouble(dp.get("value").tostring()))
.collect(collectors.tolist());
map<string, object> stats = new hashmap<>();
stats.put("count", values.size());
if (!values.isempty()) {
doublesummarystatistics summarystats = values.stream().collect(collectors.summarizingdouble(v -> v));
stats.put("min", summarystats.getmin());
stats.put("max", summarystats.getmax());
stats.put("avg", summarystats.getaverage());
stats.put("sum", summarystats.getsum());
}
stats.put("start_time", windowstarttime);
stats.put("end_time", windowendtime);
stats.put("sensor_id", sensorid);
return stats;
}
/**
* 实现滑动窗口处理
*/
@scheduled(fixedrate = 60000) // 每分钟执行一次
public void processslidingwindows() {
string streamkey = "temperature";
long now = system.currenttimemillis();
// 处理过去10分钟窗口的数据
long windowendtime = now;
long windowstarttime = now - (10 * 60 * 1000);
list<string> sensorids = arrays.aslist("1", "2", "3"); // 示例传感器id
for (string sensorid : sensorids) {
try {
// 获取窗口统计
map<string, object> stats = getwindowstats(streamkey, sensorid, windowstarttime, windowendtime);
// 根据统计结果执行业务逻辑
if (stats.containskey("avg")) {
double avgtemp = (double) stats.get("avg");
if (avgtemp > 25.0) {
// 触发高温警报
log.warn("high temperature alert for sensor {}: {} °c", sensorid, avgtemp);
triggeralert(sensorid, "high_temp", avgtemp);
}
}
// 存储聚合结果用于历史趋势分析
saveaggregatedresults(streamkey, sensorid, stats);
} catch (exception e) {
log.error("error processing sliding window for sensor {}: {}",
sensorid, e.getmessage(), e);
}
}
}
/**
* 触发警报
*/
private void triggeralert(string sensorid, string alerttype, double value) {
map<string, object> alertdata = new hashmap<>();
alertdata.put("sensor_id", sensorid);
alertdata.put("alert_type", alerttype);
alertdata.put("value", value);
alertdata.put("timestamp", system.currenttimemillis());
redistemplate.opsforstream().add("alerts", alertdata);
}
/**
* 保存聚合结果
*/
private void saveaggregatedresults(string streamkey, string sensorid, map<string, object> stats) {
long windowtime = (long) stats.get("end_time");
string aggregatekey = streamkey + ":aggregate:" + sensorid;
// 使用时间作为分数存储聚合结果
redistemplate.opsforzset().add(
aggregatekey,
new objectmapper().writevalueasstring(stats),
windowtime);
// 保留30天的聚合数据
redistemplate.expire(aggregatekey, duration.ofdays(30));
}
}使用场景
- 实时数据分析与统计
- 趋势检测和预测
- 异常值和阈值监控
- 时间序列数据处理
- iot数据流处理和聚合
- 用户行为分析
优缺点
优点
- 支持基于时间的数据分析
- 可以实现实时聚合和计算
- 灵活的窗口定义(滑动窗口、滚动窗口)
- 可扩展以支持复杂的分析场景
缺点
- 实现复杂度较高
- 可能需要额外的数据结构和存储空间
- 对于大数据量的窗口计算可能影响性能
- 需要小心管理内存使用和数据过期策略
结论
redis stream提供了强大而灵活的消息处理功能,通过组合这些模式,可以构建出高性能、可靠且灵活的消息处理系统,满足从简单的任务队列到复杂的实时数据处理等各种应用需求。
在选择和实现这些模式时,应充分考虑业务特性、性能需求、可靠性要求以及系统规模,结合redis stream的特性,打造最适合自己应用场景的消息处理解决方案。
以上就是一文带你搞懂redis stream的6种消息处理模式的详细内容,更多关于redis stream消息处理模式的资料请关注代码网其它相关文章!
发表评论