引言
redis stream 是 redis 5.0 引入的一种数据结构,用于处理日志类型的数据。它提供了高效、可靠的方式来处理和存储时间序列数据,如事件、消息等。其设计灵感源于 kafka 和类似的消息队列系统,且完全集成在 redis 中,利用了 redis 的高性能和持久化特性。
依赖
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-data-redis</artifactid> </dependency>
说明:此部分定义了 redis 相关的依赖,确保项目能够引入并使用 spring boot 提供的 redis 启动器。
redistemplate 配置
package com.mjg.config; import com.fasterxml.jackson.annotation.jsonautodetect; import com.fasterxml.jackson.annotation.propertyaccessor; import com.fasterxml.jackson.databind.objectmapper; import com.fasterxml.jackson.databind.serializationfeature; import com.fasterxml.jackson.datatype.jsr310.javatimemodule; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import org.springframework.data.redis.connection.redisconnectionfactory; import org.springframework.data.redis.core.redistemplate; import org.springframework.data.redis.serializer.jackson2jsonredisserializer; import org.springframework.data.redis.serializer.stringredisserializer; @configuration public class redisconfig { @bean public redistemplate<string, object> redistemplate(redisconnectionfactory connectionfactory) { redistemplate<string, object> template = new redistemplate<>(); template.setconnectionfactory(connectionfactory); jackson2jsonredisserializer<object> jackson2jsonredisserializer = new jackson2jsonredisserializer<>(object.class); objectmapper om = new objectmapper(); om.setvisibility(propertyaccessor.all, jsonautodetect.visibility.any); // om.activatedefaulttyping(laissezfairesubtypevalidator.instance, objectmapper.defaulttyping.non_final, jsontypeinfo.as.property); // 注册 java 8 日期时间模块 om.registermodule(new javatimemodule()); om.configure(serializationfeature.write_dates_as_timestamps, false); om.configure(serializationfeature.fail_on_empty_beans, false); jackson2jsonredisserializer.serialize(om); stringredisserializer stringredisserializer = new stringredisserializer(); // key 采用 string 的序列化方式 template.setkeyserializer(stringredisserializer); // hash 的 key 也采用 string 的序列化方式 template.sethashkeyserializer(stringredisserializer); // value 序列化方式采用 jackson template.setvalueserializer(jackson2jsonredisserializer); // hash 的 value 序列化方式采用 jackson template.sethashvalueserializer(jackson2jsonredisserializer); template.afterpropertiesset(); return template; } }
说明:此配置类用于设置 redistemplate 的序列化方式,以满足不同数据类型的存储和读取需求。
redisstreamconfig
package com.mjg.config; import cn.hutool.core.convert.convert; import cn.hutool.core.util.strutil; import lombok.requiredargsconstructor; import lombok.sneakythrows; import lombok.extern.slf4j.slf4j; import org.springframework.beans.factory.disposablebean; import org.springframework.beans.factory.initializingbean; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import org.springframework.data.redis.connection.redisconnectionfactory; import org.springframework.data.redis.connection.redisservercommands; import org.springframework.data.redis.connection.stream.consumer; import org.springframework.data.redis.connection.stream.objectrecord; import org.springframework.data.redis.connection.stream.readoffset; import org.springframework.data.redis.connection.stream.streamoffset; import org.springframework.data.redis.core.rediscallback; import org.springframework.data.redis.core.redistemplate; import org.springframework.data.redis.core.streamoperations; import org.springframework.data.redis.stream.streamlistener; import org.springframework.data.redis.stream.streammessagelistenercontainer; import org.springframework.scheduling.concurrent.threadpooltaskexecutor; import org.springframework.util.assert; import java.net.inetaddress; import java.time.duration; import java.util.properties; @slf4j @requiredargsconstructor @configuration public class redisstreamconfig implements initializingbean, disposablebean { private final redistemplate<string, object> redistemplate; public static string streamname = "user-event-stream"; public static string usereventgroup = "user-event-group"; private final threadpooltaskexecutor threadpooltaskexecutor; /** * 消息侦听器容器,用于监听 redis stream 中的消息 * * @param connectionfactory redis 连接工厂,用于创建 redis 连接 * @param messageconsumer 消息消费者,用于处理接收到的消息 * @return 返回 {@link streammessagelistenercontainer}<{@link string}, {@link objectrecord}<{@link string}, {@link string}>> 类型的消息侦听器容器 */ @bean public streammessagelistenercontainer<string, objectrecord<string, string>> messagelistenercontainer(redisconnectionfactory connectionfactory, messageconsumer messageconsumer) { streammessagelistenercontainer<string, objectrecord<string, string>> listenercontainer = streamcontainer(streamname, connectionfactory, messageconsumer); listenercontainer.start(); return listenercontainer; } /** * 创建一个流容器,用于监听 redis stream 中的数据 * * @param streamname redis stream 的名称 * @param connectionfactory redis 连接工厂 * @param streamlistener 绑定的监听类 * @return 返回 streammessagelistenercontainer 对象 */ @sneakythrows private streammessagelistenercontainer<string, objectrecord<string, string>> streamcontainer(string streamname, redisconnectionfactory connectionfactory, streamlistener<string, objectrecord<string, string>> streamlistener) { streammessagelistenercontainer.streammessagelistenercontaineroptions<string, objectrecord<string, string>> options = streammessagelistenercontainer.streammessagelistenercontaineroptions .builder() .polltimeout(duration.ofseconds(5)) // 拉取消息超时时间 .batchsize(10) // 批量抓取消息 .targettype(string.class) // 传递的数据类型 .executor(threadpooltaskexecutor) .build(); streammessagelistenercontainer<string, objectrecord<string, string>> container = streammessagelistenercontainer .create(connectionfactory, options); // 指定消费最新的消息 streamoffset<string> offset = streamoffset.create(streamname, readoffset.lastconsumed()); // 创建消费者 streammessagelistenercontainer.streamreadrequest<string> streamreadrequest = buildstreamreadrequest(offset, streamlistener); // 指定消费者对象 container.register(streamreadrequest, streamlistener); return container; } /** * 生成流读取请求 * * @param offset 偏移量,用于指定从 redis stream 中的哪个位置开始读取消息 * @param streamlistener 流侦听器,用于处理接收到的消息 * @return 返回一个 streamreadrequest 对象,表示一个流读取请求 * @throws exception 当 streamlistener 无法识别为 messageconsumer 类型时,抛出异常 */ private streammessagelistenercontainer.streamreadrequest<string> buildstreamreadrequest(streamoffset<string> offset, streamlistener<string, objectrecord<string, string>> streamlistener) throws exception { consumer consumer; if (streamlistener instanceof messageconsumer) { consumer = consumer.from(usereventgroup, inetaddress.getlocalhost().gethostname()); } else { throw new exception("无法识别的 stream key"); } // 关闭自动 ack 确认 return streammessagelistenercontainer.streamreadrequest.builder(offset) .errorhandler((error) -> { log.error(error.getmessage()); }) .cancelonerror(e -> false) .consumer(consumer) // 关闭自动 ack 确认 .autoacknowledge(false) .build(); } /** * 检查 redis 版本是否符合要求 * * @throws illegalstateexception 如果 redis 版本小于 5.0.0 版本,抛出该异常 */ private void checkredisversion() { // 获得 redis 版本 properties info = redistemplate.execute((rediscallback<properties>) redisservercommands::info); assert.notnull(info, "redis info is null"); object redisversion = info.get("redis_version"); integer anint = convert.toint(redisversion); if (anint < 5) { throw new illegalstateexception(strutil.format("您当前的 redis 版本为 {},小于最低要求的 5.0.0 版本!", redisversion)); } } @override public void destroy() throws exception { } @override public void afterpropertiesset() throws exception { checkredisversion(); streamoperations<string, object, object> streamoperations = redistemplate.opsforstream(); if (boolean.false.equals(redistemplate.haskey(streamname))) { streamoperations.creategroup(streamname, readoffset.from("0"), usereventgroup); } } }
说明:该配置类实现了对 redis stream 的相关配置,包括消息监听容器的创建、流读取请求的生成、redis 版本的检查以及组的创建等功能。
生产者
package com.mjg.config; import lombok.requiredargsconstructor; import lombok.extern.slf4j.slf4j; import org.springframework.data.redis.connection.stream.recordid; import org.springframework.data.redis.connection.stream.streamrecords; import org.springframework.data.redis.core.redistemplate; import org.springframework.stereotype.component; import java.util.collections; @component @requiredargsconstructor @slf4j public class messageproducer { private final redistemplate<string, object> redistemplate; public void sendmessage(string streamkey, object message) { recordid recordid = redistemplate .opsforstream().add(streamrecords.newrecord() .ofmap(collections.singletonmap("data", message)) .withstreamkey(streamkey)); if (recordid!= null) { log.info("message sent to stream '{}' with recordid: {}", streamkey, recordid); } } }
说明:messageproducer 类负责向 redis stream 发送消息。
消费者
package com.mjg.config; import lombok.requiredargsconstructor; import org.springframework.data.redis.connection.stream.objectrecord; import org.springframework.data.redis.core.redistemplate; import org.springframework.data.redis.stream.streamlistener; import org.springframework.stereotype.component; @requiredargsconstructor @component public class messageconsumer implements streamlistener<string, objectrecord<string, string>> { private final redistemplate<string, object> redistemplate; @override public void onmessage(objectrecord<string, string> message) { string stream = message.getstream(); string messageid = message.getid().tostring(); string messagebody = message.getvalue(); system.out.println("received message from stream '" + stream + "' with messageid: " + messageid); system.out.println("message body: " + messagebody); // 消息应答 redistemplate.opsforstream().acknowledge(redisstreamconfig.streamname, redisstreamconfig.usereventgroup, message.getid()); } }
说明:messageconsumer 类实现了 streamlistener 接口,用于处理从 redis stream 接收到的消息,并进行相应的应答操作。
测试
@requiredargsconstructor @slf4j @restcontroller public class messagecontroller { public static string streamname = "user-event-stream"; private final messageproducer messageproducer; @getmapping("/send") public void send() { messageproducer.sendmessage(streamname, "hello 啦啦啦啦" + localdatetime.now()); } }
说明:messagecontroller 类中的 send 方法通过调用 messageproducer 来发送消息到指定的 redis stream 中。
以上就是springboot使用redis stream实现轻量消息队列的示例代码的详细内容,更多关于springboot redis stream轻量消息队列的资料请关注代码网其它相关文章!
发表评论