一.redis stream 消息队列模版配置类
/**
* redis stream 消息队列配置
*/
@configuration
@requiredargsconstructor
public class redisstreamconfiguration {
private static final logger log = loggerfactory.getlogger(redisstreamconfiguration.class);
private final redisconnectionfactory redisconnectionfactory;
private final consumer1 consumer1;
private final consumer2 consumer2;
// 定义需要自定义的配置常量
private static final int batch_size = 10; // 每次批量拉取的消息数量
private static final duration poll_timeout = duration.ofseconds(3); // 拉取消息的阻塞超时时间
private static final string thread_name_prefix = "your-business"; // 线程名称前缀
private static final string group_name_1 = "group1"; // 第一个消费者组名称
private static final string group_name_2 = "group2"; // 第二个消费者组名称
private static final string consumer_name_1 = "consumer1"; // 第一个消费者名称
private static final string consumer_name_2 = "consumer2"; // 第二个消费者名称
private static final string stream_topic_key = short_link_stats_stream_topic_key; // stream的主题键
@bean
public executorservice asyncstreamconsumer() {
log.info("redis stream 消息队列配置线程池");
atomicinteger index = new atomicinteger();
int processors = runtime.getruntime().availableprocessors();
// 创建一个自定义线程池
return new threadpoolexecutor(
processors,
processors + (processors >> 1),
60,
timeunit.seconds,
new linkedblockingqueue<>(),
runnable -> {
thread thread = new thread(runnable);
thread.setname(thread_name_prefix + "_" + index.incrementandget());
thread.setdaemon(true);
return thread;
}
);
}
@bean(initmethod = "start", destroymethod = "stop")
public streammessagelistenercontainer<string, maprecord<string, string, string>> streammessagelistenercontainer(
executorservice asyncstreamconsumer) {
// 配置 streammessagelistenercontainer 容器选项
streammessagelistenercontainer.streammessagelistenercontaineroptions<string, maprecord<string, string, string>> options =
streammessagelistenercontainer.streammessagelistenercontaineroptions.builder()
.batchsize(batch_size) // 批量拉取消息数量
.executor(asyncstreamconsumer) // 使用配置好的线程池
.polltimeout(poll_timeout) // 拉取消息的超时时间
.build();
// 创建 streammessagelistenercontainer 实例
streammessagelistenercontainer<string, maprecord<string, string, string>> container =
streammessagelistenercontainer.create(redisconnectionfactory, options);
// 配置第一个消息监听器
container.receiveautoack(
consumer.from(group_name_1, consumer_name_1), // 指定第一个消费者组和消费者名称
streamoffset.create(stream_topic_key, readoffset.lastconsumed()), // 指定主题和偏移量
consumer1 // 指定第一个消息处理逻辑
);
// 配置第二个消息监听器
container.receiveautoack(
consumer.from(group_name_2, consumer_name_2), // 指定第二个消费者组和消费者名称
streamoffset.create(stream_topic_key, readoffset.lastconsumed()), // 指定主题和偏移量
consumer2 // 指定第二个消息处理逻辑
);
return container;
}
}
1. 介绍
redisstreamconfiguration 是一个用于配置 redis stream 消息队列的 spring 配置类。它通过 redis stream 实现消息的异步处理和多消费者消费,适用于需要高吞吐量、低延迟的业务场景。
2. 关键组件和自定义参数
此类主要配置了 redis stream 消息监听容器 streammessagelistenercontainer,包括线程池配置、消费批次和超时时间等,方便用户根据业务需求自定义。
核心参数
batch_size:定义每次批量拉取的消息数量。通过设定合适的批量大小,可以减少消费请求次数,提升处理效率。poll_timeout:设置从 redis stream 拉取消息的超时时间。超时控制允许程序在无消息时保持阻塞,等待消息到达。thread_name_prefix:设置线程名称前缀,帮助识别不同业务模块的线程。group_name_1和group_name_2:定义两个不同的消费者组,适用于同一 stream 多个消费者并行处理消息的场景。consumer_name_1和consumer_name_2:为每个消费者组指定独立的消费者名称,有助于实现消费任务的分配和管理。
代码实现
配置了 streammessagelistenercontainer 来处理 stream 消息,并分别为两个消费者组和消费者注册不同的监听器。
3. 主要方法说明
executorservice(线程池配置)
@bean
public executorservice asyncstreamconsumer() { ... }
用于创建一个自定义线程池,为 redis stream 的消息消费提供异步执行环境。processors 设置了核心线程数为 cpu 核心数,最大线程数为 processors + (processors >> 1),即核心数的 1.5 倍。线程命名使用 thread_name_prefix 前缀,方便日志记录和排查问题。
streammessagelistenercontainer(消息监听容器)
@bean(initmethod = "start", destroymethod = "stop")
public streammessagelistenercontainer<string, maprecord<string, string, string>> streammessagelistenercontainer(...) { ... }
该方法创建并配置了 redis stream 的监听容器。关键步骤如下:
构建容器选项:包括批次大小、线程池、拉取超时时间等参数。
容器实例化:通过
streammessagelistenercontainer.create()创建容器,初始化时自动启动。消息监听器配置
- 为第一个消费者组
group_name_1和消费者consumer_name_1配置了消息监听器consumer1,实现自动确认并消费消息。 - 为第二个消费者组
group_name_2和消费者consumer_name_2配置了另一组消息监听器consumer2,以便多消费者处理。
- 为第一个消费者组
4. 应用场景
此配置适用于 redis stream 在大规模并发场景下的消息队列管理。通过灵活配置多个消费者组和消费者,可以实现负载均衡的多线程消费逻辑。
二.消费者模版
/**
* 消息队列消费者
*/
@requiredargsconstructor
@slf4j
@component
public class shortlinkstatssaveconsumer implements streamlistener<string, maprecord<string, string, string>> {
private final redissonclient redissonclient;
private final stringredistemplate stringredistemplate;
private final messagequeueidempotenthandler messagequeueidempotenthandler;
@override
public void onmessage(maprecord<string, string, string> message) {
string stream = message.getstream();
recordid id = message.getid();
if (!messagequeueidempotenthandler.ismessageprocessed(id.tostring())) {
// 判断当前的这个消息流程是否执行完成
if (messagequeueidempotenthandler.isaccomplish(id.tostring())) {
return;
}
throw new serviceexception("消息未完成流程,需要消息队列重试");
}
try {
map<string, string> producermap = message.getvalue();
//你自己的业务逻辑
}
// 删除消息
stringredistemplate.opsforstream().delete(objects.requirenonnull(stream), id.getvalue());
} catch (throwable ex) {
messagequeueidempotenthandler.delmessageprocessed(id.tostring());
log.error("消费异常", ex);
throw ex;
}
//消费完删除
messagequeueidempotenthandler.setaccomplish(id.tostring());
}
}
本模板实现了一个 redis stream 消息队列消费者的基础结构。该模板主要围绕幂等性检查、消息解析与处理以及消费状态管理三个核心功能,确保消息在高并发环境下的安全性与一致性。
三.生产者模版
/**
* 短链接监控状态保存消息队列生产者
*/
@component
@requiredargsconstructor
public class producer implements messagequeueproducer{
private final stringredistemplate stringredistemplate;
/**
* 发送消息
*/
public void send(map<string, string> producermap) {
stringredistemplate.opsforstream().add(your_key, producermap);
}
}
注意your_key 替换成你自己的即可
四.总结
1. redis stream 消息队列的优势
redis stream 是 redis 提供的一种强大的消息队列解决方案,适用于高吞吐量、低延迟的业务场景。与传统的消息队列系统(如 rabbitmq 或 kafka)相比,redis stream 在集成与配置方面更加简单,尤其适合基于 redis 的应用程序。redis stream 提供了以下优势:
- 高吞吐量:支持高并发和快速消息消费,能够在瞬间处理大量的消息。
- 顺序消费:保证消息的顺序消费,适用于需要顺序处理的业务场景。
- 消费组机制:通过消费组管理消息消费,可以通过多个消费者并行消费,提高处理能力。
- 持久化与备份:可以将消息存储在 redis 中,具备一定的持久化能力,防止数据丢失。
2. redis stream 配置与应用
本文介绍了如何在 spring boot 中集成 redis stream 消息队列的配置与消费逻辑,主要包括:
- 消息消费配置:通过
streammessagelistenercontainer实现消息的异步消费。配置了批量拉取的数量、阻塞超时、线程池等自定义参数,帮助提升系统的并发处理能力。 - 多消费者并行处理:通过消费者组(consumer group)机制,实现多消费者并行消费同一个 stream,提高消息处理的吞吐量和效率。
- 幂等性与消费确认:通过
messagequeueidempotenthandler来保证消息的幂等性,避免重复消费的问题。处理逻辑保证每条消息只会被消费一次,且在消费失败时能够适当回滚,确保系统的可靠性。
3. 消费者与生产者模板
- 消费者模板:消费者通过实现
streamlistener接口来处理从 redis stream 拉取的消息。为了保证幂等性,消费者首先检查消息是否已经处理过,未完成的消息会被标记并重试,确保消息处理的安全性。 - 生产者模板:生产者通过
stringredistemplate将消息发送到 redis stream。当业务中有新的消息需要处理时,生产者将消息添加到 redis stream 进行后续处理。
4. 应用场景
redis stream 适用于许多场景,特别是需要高并发、高吞吐量且保证顺序消费的业务需求。例如,短链接生成与访问统计、订单处理、日志收集等业务场景,都能通过 redis stream 实现高效、可靠的消息队列。
到此这篇关于springboot集成redis消息队列的实现示例的文章就介绍到这了,更多相关springboot redis消息队列内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论