引言
Redis Stream 是 Redis 5.0 引入的一种数据结构,用于处理日志类型的数据。它提供了高效、可靠的方式来处理和存储时间序列数据,如事件、消息等。其设计灵感源于 Kafka 和类似的消息队列系统,且完全集成在 Redis 中,利用了 Redis 的高性能和持久化特性。
依赖
<!-- Spring Boot starter that brings in Spring Data Redis.
     No <version> is given, so it is expected to be managed by the Spring Boot parent/BOM. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
说明:此部分定义了 Redis 相关的依赖,确保项目能够引入并使用 Spring Boot 提供的 Redis 启动器。后文示例代码还用到了 Lombok 和 Hutool,需要另外引入对应依赖。
RedisTemplate 配置
package com.mjg.config;
import com.fasterxml.jackson.annotation.jsonautodetect;
import com.fasterxml.jackson.annotation.propertyaccessor;
import com.fasterxml.jackson.databind.objectmapper;
import com.fasterxml.jackson.databind.serializationfeature;
import com.fasterxml.jackson.datatype.jsr310.javatimemodule;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.data.redis.connection.redisconnectionfactory;
import org.springframework.data.redis.core.redistemplate;
import org.springframework.data.redis.serializer.jackson2jsonredisserializer;
import org.springframework.data.redis.serializer.stringredisserializer;
@configuration
public class redisconfig {
@bean
public redistemplate<string, object> redistemplate(redisconnectionfactory connectionfactory) {
redistemplate<string, object> template = new redistemplate<>();
template.setconnectionfactory(connectionfactory);
jackson2jsonredisserializer<object> jackson2jsonredisserializer = new jackson2jsonredisserializer<>(object.class);
objectmapper om = new objectmapper();
om.setvisibility(propertyaccessor.all, jsonautodetect.visibility.any);
// om.activatedefaulttyping(laissezfairesubtypevalidator.instance, objectmapper.defaulttyping.non_final, jsontypeinfo.as.property);
// 注册 java 8 日期时间模块
om.registermodule(new javatimemodule());
om.configure(serializationfeature.write_dates_as_timestamps, false);
om.configure(serializationfeature.fail_on_empty_beans, false);
jackson2jsonredisserializer.serialize(om);
stringredisserializer stringredisserializer = new stringredisserializer();
// key 采用 string 的序列化方式
template.setkeyserializer(stringredisserializer);
// hash 的 key 也采用 string 的序列化方式
template.sethashkeyserializer(stringredisserializer);
// value 序列化方式采用 jackson
template.setvalueserializer(jackson2jsonredisserializer);
// hash 的 value 序列化方式采用 jackson
template.sethashvalueserializer(jackson2jsonredisserializer);
template.afterpropertiesset();
return template;
}
}
说明:此配置类用于设置 RedisTemplate 的序列化方式,以满足不同数据类型的存储和读取需求。
RedisStreamConfig
package com.mjg.config;

import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.StrUtil;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisServerCommands;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.Properties;

/**
 * Redis Stream wiring: verifies the server version, makes sure the consumer group exists,
 * and exposes a started {@link StreamMessageListenerContainer} that feeds {@link MessageConsumer}.
 */
@Slf4j
@RequiredArgsConstructor
@Configuration
public class RedisStreamConfig implements InitializingBean, DisposableBean {

    /** Key of the stream that carries user events. */
    public static final String streamName = "user-event-stream";

    /** Consumer group used by this application on {@link #streamName}. */
    public static final String userEventGroup = "user-event-group";

    /** Streams were introduced in Redis 5.0, so this is the lowest acceptable major version. */
    private static final int MIN_REDIS_MAJOR_VERSION = 5;

    private final RedisTemplate<String, Object> redisTemplate;
    private final ThreadPoolTaskExecutor threadPoolTaskExecutor;

    /**
     * Creates and starts the listener container that polls {@link #streamName}.
     *
     * @param connectionFactory Redis connection factory used by the container
     * @param messageConsumer   listener that handles every received record
     * @return the started container
     */
    @Bean
    public StreamMessageListenerContainer<String, ObjectRecord<String, String>> messageListenerContainer(
            RedisConnectionFactory connectionFactory, MessageConsumer messageConsumer) {
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer =
                streamContainer(streamName, connectionFactory, messageConsumer);
        listenerContainer.start();
        return listenerContainer;
    }

    /**
     * Builds a (not yet started) container subscribed to the given stream.
     *
     * @param streamName        key of the Redis stream to read
     * @param connectionFactory Redis connection factory
     * @param streamListener    listener bound to the subscription
     * @return the configured container
     */
    @SneakyThrows // buildStreamReadRequest may fail with the checked UnknownHostException
    private StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamContainer(
            String streamName,
            RedisConnectionFactory connectionFactory,
            StreamListener<String, ObjectRecord<String, String>> streamListener) {
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        .pollTimeout(Duration.ofSeconds(5)) // how long a single poll blocks waiting for records
                        .batchSize(10)                      // maximum records fetched per poll
                        .targetType(String.class)           // payload type handed to the listener
                        .executor(threadPoolTaskExecutor)
                        .build();
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
                StreamMessageListenerContainer.create(connectionFactory, options);
        // lastConsumed() asks for records the consumer group has not delivered yet,
        // not for "the newest record only".
        StreamOffset<String> offset = StreamOffset.create(streamName, ReadOffset.lastConsumed());
        StreamMessageListenerContainer.StreamReadRequest<String> streamReadRequest =
                buildStreamReadRequest(offset, streamListener);
        // Bind the listener to the read request.
        container.register(streamReadRequest, streamListener);
        return container;
    }

    /**
     * Builds the read request (consumer identity, error handling, manual ACK) for a listener.
     *
     * @param offset         stream and position to read from
     * @param streamListener listener the request is built for; must be a {@link MessageConsumer}
     * @return the read request
     * @throws IllegalArgumentException if {@code streamListener} is not a {@link MessageConsumer}
     * @throws UnknownHostException     if the local host name, used as consumer name, cannot be resolved
     */
    private StreamMessageListenerContainer.StreamReadRequest<String> buildStreamReadRequest(
            StreamOffset<String> offset,
            StreamListener<String, ObjectRecord<String, String>> streamListener) throws UnknownHostException {
        if (!(streamListener instanceof MessageConsumer)) {
            throw new IllegalArgumentException("无法识别的 stream key");
        }
        // The host name is the consumer name inside the group.
        // NOTE(review): instances sharing a host name would share one consumer identity - confirm.
        Consumer consumer = Consumer.from(userEventGroup, InetAddress.getLocalHost().getHostName());
        return StreamMessageListenerContainer.StreamReadRequest.builder(offset)
                // Pass the throwable itself so the stack trace is not lost.
                .errorHandler(error -> log.error("Redis stream listener error: {}", error.getMessage(), error))
                // Keep the subscription alive when an error occurs.
                .cancelOnError(e -> false)
                .consumer(consumer)
                // Auto-ACK is off; the listener acknowledges records itself.
                .autoAcknowledge(false)
                .build();
    }

    /**
     * Verifies that the connected Redis server supports streams.
     *
     * @throws IllegalStateException if the version cannot be determined or its major part is below 5
     */
    private void checkRedisVersion() {
        Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);
        Assert.notNull(info, "redis info is null");
        String redisVersion = Convert.toStr(info.get("redis_version"));
        // "6.2.6" is not an integer: converting the whole string with Convert.toInt can yield
        // null and then fail with a NullPointerException on unboxing, so only the major part
        // is parsed here.
        if (parseMajorVersion(redisVersion) < MIN_REDIS_MAJOR_VERSION) {
            throw new IllegalStateException(
                    StrUtil.format("您当前的 redis 版本为 {},小于最低要求的 5.0.0 版本!", redisVersion));
        }
    }

    /**
     * Extracts the major component from a dotted version string such as {@code "6.2.6"}.
     *
     * @param version version string as reported by the server
     * @return the major version number
     * @throws IllegalStateException if {@code version} is blank or does not start with an integer
     */
    static int parseMajorVersion(String version) {
        if (version == null || version.trim().isEmpty()) {
            throw new IllegalStateException("redis_version is missing from the INFO output");
        }
        String trimmed = version.trim();
        int firstDot = trimmed.indexOf('.');
        String majorPart = firstDot >= 0 ? trimmed.substring(0, firstDot) : trimmed;
        try {
            return Integer.parseInt(majorPart);
        } catch (NumberFormatException e) {
            throw new IllegalStateException("Unparseable redis_version: " + version, e);
        }
    }

    /**
     * Tells whether {@link #userEventGroup} already exists on {@link #streamName}.
     * NOTE(review): relies on {@code StreamOperations.groups}, available from Spring Data Redis 2.3.
     */
    private boolean consumerGroupExists(StreamOperations<String, Object, Object> streamOperations) {
        if (Boolean.FALSE.equals(redisTemplate.hasKey(streamName))) {
            return false; // no stream yet, hence no group
        }
        return streamOperations.groups(streamName)
                .stream()
                .anyMatch(group -> userEventGroup.equals(group.groupName()));
    }

    /** No resources are held directly by this class, so there is nothing to release. */
    @Override
    public void destroy() throws Exception {
    }

    /**
     * Checks the Redis version and creates the consumer group when it is missing.
     * The previous check looked only at the stream key, so a stream that already existed
     * (for example because a producer wrote to it first) never got its group.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        checkRedisVersion();
        StreamOperations<String, Object, Object> streamOperations = redisTemplate.opsForStream();
        if (!consumerGroupExists(streamOperations)) {
            // Offset "0": the group starts at the beginning of the stream.
            streamOperations.createGroup(streamName, ReadOffset.from("0"), userEventGroup);
        }
    }
}
说明:该配置类实现了对 Redis Stream 的相关配置,包括消息监听容器的创建、流读取请求的生成、Redis 版本的检查以及消费者组的创建等功能。
生产者
package com.mjg.config;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.Collections;

/**
 * Publishes messages to a Redis stream.
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class MessageProducer {

    private final RedisTemplate<String, Object> redisTemplate;

    /**
     * Appends one record to the given stream. The record has a single field named
     * {@code "data"} whose value is {@code message}.
     *
     * @param streamKey key of the target stream
     * @param message   payload stored under the {@code "data"} field
     */
    public void sendMessage(String streamKey, Object message) {
        RecordId recordId = redisTemplate
                .opsForStream()
                .add(StreamRecords.newRecord()
                        .ofMap(Collections.singletonMap("data", message))
                        .withStreamKey(streamKey));
        if (recordId != null) {
            log.info("message sent to stream '{}' with recordid: {}", streamKey, recordId);
        } else {
            // Previously a missing record id passed silently; make it visible.
            log.warn("no record id returned when sending to stream '{}'", streamKey);
        }
    }
}
说明:MessageProducer 类负责向 Redis Stream 发送消息。
消费者
package com.mjg.config;
import lombok.requiredargsconstructor;
import org.springframework.data.redis.connection.stream.objectrecord;
import org.springframework.data.redis.core.redistemplate;
import org.springframework.data.redis.stream.streamlistener;
import org.springframework.stereotype.component;
@requiredargsconstructor
@component
public class messageconsumer implements streamlistener<string, objectrecord<string, string>> {
private final redistemplate<string, object> redistemplate;
@override
public void onmessage(objectrecord<string, string> message) {
string stream = message.getstream();
string messageid = message.getid().tostring();
string messagebody = message.getvalue();
system.out.println("received message from stream '" + stream + "' with messageid: " + messageid);
system.out.println("message body: " + messagebody);
// 消息应答
redistemplate.opsforstream().acknowledge(redisstreamconfig.streamname, redisstreamconfig.usereventgroup, message.getid());
}
}
说明:MessageConsumer 类实现了 StreamListener 接口,用于处理从 Redis Stream 接收到的消息,并进行相应的应答(ACK)操作。
测试
/**
 * Test endpoint that pushes a sample message onto the user-event stream.
 */
@RequiredArgsConstructor
@Slf4j
@RestController
public class MessageController {

    /** Key of the stream the test message is sent to. */
    public static final String streamName = "user-event-stream";

    private final MessageProducer messageProducer;

    /**
     * Sends a greeting suffixed with the current local date-time to {@link #streamName}.
     */
    @GetMapping("/send")
    public void send() {
        messageProducer.sendMessage(streamName, "hello 啦啦啦啦" + LocalDateTime.now());
    }
}
说明:MessageController 类中的 send 方法通过调用 MessageProducer 来发送消息到指定的 Redis Stream 中。

以上就是 Spring Boot 使用 Redis Stream 实现轻量消息队列的示例代码的详细内容,更多关于 Spring Boot Redis Stream 轻量消息队列的资料请关注代码网其它相关文章!
发表评论