当前位置: 代码网 > it编程>编程语言>Java > SpringBoot使用Redis Stream实现轻量消息队列的示例代码

SpringBoot使用Redis Stream实现轻量消息队列的示例代码

2024年09月11日 Java 我要评论
引言redis stream 是 redis 5.0 引入的一种数据结构,用于处理日志类型的数据。它提供了高效、可靠的方式来处理和存储时间序列数据,如事件、消息等。其设计灵感源于 kafka 和类似的

引言

redis stream 是 redis 5.0 引入的一种数据结构,用于处理日志类型的数据。它提供了高效、可靠的方式来处理和存储时间序列数据,如事件、消息等。其设计灵感源于 kafka 和类似的消息队列系统,且完全集成在 redis 中,利用了 redis 的高性能和持久化特性。

依赖

<dependency>  
    <groupid>org.springframework.boot</groupid>  
    <artifactid>spring-boot-starter-data-redis</artifactid>  
</dependency>

说明:此部分定义了 redis 相关的依赖,确保项目能够引入并使用 spring boot 提供的 redis 启动器。

redistemplate 配置

package com.mjg.config;  

import com.fasterxml.jackson.annotation.jsonautodetect;  
import com.fasterxml.jackson.annotation.propertyaccessor;  
import com.fasterxml.jackson.databind.objectmapper;  
import com.fasterxml.jackson.databind.serializationfeature;  
import com.fasterxml.jackson.datatype.jsr310.javatimemodule;  
import org.springframework.context.annotation.bean;  
import org.springframework.context.annotation.configuration;  
import org.springframework.data.redis.connection.redisconnectionfactory;  
import org.springframework.data.redis.core.redistemplate;  
import org.springframework.data.redis.serializer.jackson2jsonredisserializer;  
import org.springframework.data.redis.serializer.stringredisserializer;  

@configuration  
public class redisconfig {  

    @bean  
    public redistemplate<string, object> redistemplate(redisconnectionfactory connectionfactory) {  
        redistemplate<string, object> template = new redistemplate<>();  
        template.setconnectionfactory(connectionfactory);  
        jackson2jsonredisserializer<object> jackson2jsonredisserializer = new jackson2jsonredisserializer<>(object.class);  
        objectmapper om = new objectmapper();  
        om.setvisibility(propertyaccessor.all, jsonautodetect.visibility.any);  
//        om.activatedefaulttyping(laissezfairesubtypevalidator.instance, objectmapper.defaulttyping.non_final, jsontypeinfo.as.property);  
        // 注册 java 8 日期时间模块  
        om.registermodule(new javatimemodule());  
        om.configure(serializationfeature.write_dates_as_timestamps, false);  
        om.configure(serializationfeature.fail_on_empty_beans, false);  
        jackson2jsonredisserializer.serialize(om);  
        stringredisserializer stringredisserializer = new stringredisserializer();  
        // key 采用 string 的序列化方式  
        template.setkeyserializer(stringredisserializer);  
        // hash 的 key 也采用 string 的序列化方式  
        template.sethashkeyserializer(stringredisserializer);  
        // value 序列化方式采用 jackson  
        template.setvalueserializer(jackson2jsonredisserializer);  
        // hash 的 value 序列化方式采用 jackson  
        template.sethashvalueserializer(jackson2jsonredisserializer);  
        template.afterpropertiesset();  
        return template;  
    }  
}

说明:此配置类用于设置 redistemplate 的序列化方式,以满足不同数据类型的存储和读取需求。

redisstreamconfig

package com.mjg.config;  

import cn.hutool.core.convert.convert;  
import cn.hutool.core.util.strutil;  
import lombok.requiredargsconstructor;  
import lombok.sneakythrows;  
import lombok.extern.slf4j.slf4j;  
import org.springframework.beans.factory.disposablebean;  
import org.springframework.beans.factory.initializingbean;  
import org.springframework.context.annotation.bean;  
import org.springframework.context.annotation.configuration;  
import org.springframework.data.redis.connection.redisconnectionfactory;  
import org.springframework.data.redis.connection.redisservercommands;  
import org.springframework.data.redis.connection.stream.consumer;  
import org.springframework.data.redis.connection.stream.objectrecord;  
import org.springframework.data.redis.connection.stream.readoffset;  
import org.springframework.data.redis.connection.stream.streamoffset;  
import org.springframework.data.redis.core.rediscallback;  
import org.springframework.data.redis.core.redistemplate;  
import org.springframework.data.redis.core.streamoperations;  
import org.springframework.data.redis.stream.streamlistener;  
import org.springframework.data.redis.stream.streammessagelistenercontainer;  
import org.springframework.scheduling.concurrent.threadpooltaskexecutor;  
import org.springframework.util.assert;  

import java.net.inetaddress;  
import java.time.duration;  
import java.util.properties;  

@slf4j  
@requiredargsconstructor  
@configuration  
public class redisstreamconfig implements initializingbean, disposablebean {  
    private final redistemplate<string, object> redistemplate;  

    public static string streamname = "user-event-stream";  
    public static string usereventgroup = "user-event-group";  
    private final threadpooltaskexecutor threadpooltaskexecutor;  

    /**  
     * 消息侦听器容器,用于监听 redis stream 中的消息  
     *  
     * @param connectionfactory redis 连接工厂,用于创建 redis 连接  
     * @param messageconsumer   消息消费者,用于处理接收到的消息  
     * @return 返回 {@link streammessagelistenercontainer}<{@link string}, {@link objectrecord}<{@link string}, {@link string}>> 类型的消息侦听器容器  
     */  
    @bean  
    public streammessagelistenercontainer<string, objectrecord<string, string>> messagelistenercontainer(redisconnectionfactory connectionfactory, messageconsumer messageconsumer) {  
        streammessagelistenercontainer<string, objectrecord<string, string>> listenercontainer = streamcontainer(streamname, connectionfactory, messageconsumer);  
        listenercontainer.start();  
        return listenercontainer;  
    }  

    /**  
     * 创建一个流容器,用于监听 redis stream 中的数据  
     *  
     * @param streamname        redis stream 的名称  
     * @param connectionfactory redis 连接工厂  
     * @param streamlistener    绑定的监听类  
     * @return 返回 streammessagelistenercontainer 对象  
     */  
    @sneakythrows  
    private streammessagelistenercontainer<string, objectrecord<string, string>> streamcontainer(string streamname, redisconnectionfactory connectionfactory, streamlistener<string, objectrecord<string, string>> streamlistener) {  
        streammessagelistenercontainer.streammessagelistenercontaineroptions<string, objectrecord<string, string>> options =  
                streammessagelistenercontainer.streammessagelistenercontaineroptions  
                      .builder()  
                      .polltimeout(duration.ofseconds(5)) // 拉取消息超时时间  
                      .batchsize(10) // 批量抓取消息  
                      .targettype(string.class) // 传递的数据类型  
                      .executor(threadpooltaskexecutor)  
                      .build();  
        streammessagelistenercontainer<string, objectrecord<string, string>> container = streammessagelistenercontainer  
              .create(connectionfactory, options);  
        // 指定消费最新的消息  
        streamoffset<string> offset = streamoffset.create(streamname, readoffset.lastconsumed());  
        // 创建消费者  
        streammessagelistenercontainer.streamreadrequest<string> streamreadrequest = buildstreamreadrequest(offset, streamlistener);  
        // 指定消费者对象  
        container.register(streamreadrequest, streamlistener);  
        return container;  
    }  

    /**  
     * 生成流读取请求  
     *  
     * @param offset         偏移量,用于指定从 redis stream 中的哪个位置开始读取消息  
     * @param streamlistener 流侦听器,用于处理接收到的消息  
     * @return 返回一个 streamreadrequest 对象,表示一个流读取请求  
     * @throws exception 当 streamlistener 无法识别为 messageconsumer 类型时,抛出异常  
     */  
    private streammessagelistenercontainer.streamreadrequest<string> buildstreamreadrequest(streamoffset<string> offset, streamlistener<string, objectrecord<string, string>> streamlistener) throws exception {  
        consumer consumer;  
        if (streamlistener instanceof messageconsumer) {  
            consumer = consumer.from(usereventgroup, inetaddress.getlocalhost().gethostname());  
        } else {  
            throw new exception("无法识别的 stream key");  
        }  
        // 关闭自动 ack 确认  
        return streammessagelistenercontainer.streamreadrequest.builder(offset)  
              .errorhandler((error) -> {  
                    log.error(error.getmessage());  
                })  
              .cancelonerror(e -> false)  
              .consumer(consumer)  
                // 关闭自动 ack 确认  
              .autoacknowledge(false)  
              .build();  
    }  

    /**  
     * 检查 redis 版本是否符合要求  
     *  
     * @throws illegalstateexception 如果 redis 版本小于 5.0.0 版本,抛出该异常  
     */  
    private void checkredisversion() {  
        // 获得 redis 版本  
        properties info = redistemplate.execute((rediscallback<properties>) redisservercommands::info);  
        assert.notnull(info, "redis info is null");  
        object redisversion = info.get("redis_version");  
        integer anint = convert.toint(redisversion);  
        if (anint < 5) {  
            throw new illegalstateexception(strutil.format("您当前的 redis 版本为 {},小于最低要求的 5.0.0 版本!", redisversion));  
        }  
    }  

    @override  
    public void destroy() throws exception {  
    }  
    @override  
    public void afterpropertiesset() throws exception {  
        checkredisversion();  
        streamoperations<string, object, object> streamoperations = redistemplate.opsforstream();  
        if (boolean.false.equals(redistemplate.haskey(streamname))) {  
            streamoperations.creategroup(streamname, readoffset.from("0"), usereventgroup);  
        }  
    }  
}

说明:该配置类实现了对 redis stream 的相关配置,包括消息监听容器的创建、流读取请求的生成、redis 版本的检查以及组的创建等功能。

生产者

package com.mjg.config;  

import lombok.requiredargsconstructor;  
import lombok.extern.slf4j.slf4j;  
import org.springframework.data.redis.connection.stream.recordid;  
import org.springframework.data.redis.connection.stream.streamrecords;  
import org.springframework.data.redis.core.redistemplate;  
import org.springframework.stereotype.component;  

import java.util.collections;  

@component  
@requiredargsconstructor  
@slf4j  
public class messageproducer {  

    private final redistemplate<string, object> redistemplate;  

    public void sendmessage(string streamkey, object message) {  
        recordid recordid = redistemplate  
              .opsforstream().add(streamrecords.newrecord()  
                      .ofmap(collections.singletonmap("data", message))  
                      .withstreamkey(streamkey));  
        if (recordid!= null) {  
            log.info("message sent to stream '{}' with recordid: {}", streamkey, recordid);  
        }  
    }  
}

说明:messageproducer 类负责向 redis stream 发送消息。

消费者

package com.mjg.config;  

import lombok.requiredargsconstructor;  
import org.springframework.data.redis.connection.stream.objectrecord;  
import org.springframework.data.redis.core.redistemplate;  
import org.springframework.data.redis.stream.streamlistener;  
import org.springframework.stereotype.component;  

@requiredargsconstructor  
@component  
public class messageconsumer implements streamlistener<string, objectrecord<string, string>> {  

    private final redistemplate<string, object> redistemplate;  

    @override  
    public void onmessage(objectrecord<string, string> message) {  
        string stream = message.getstream();  
        string messageid = message.getid().tostring();  
        string messagebody = message.getvalue();  

        system.out.println("received message from stream '" + stream + "' with messageid: " + messageid);  
        system.out.println("message body: " + messagebody);  

//        消息应答  
        redistemplate.opsforstream().acknowledge(redisstreamconfig.streamname, redisstreamconfig.usereventgroup, message.getid());  
    }  
}

说明:messageconsumer 类实现了 streamlistener 接口,用于处理从 redis stream 接收到的消息,并进行相应的应答操作。

测试

@requiredargsconstructor  
@slf4j  
@restcontroller  
public class messagecontroller {  
    public static string streamname = "user-event-stream";  
    private final messageproducer messageproducer;  

    @getmapping("/send")  
    public void send() {  
        messageproducer.sendmessage(streamname, "hello 啦啦啦啦" + localdatetime.now());  
    }  
}

说明:messagecontroller 类中的 send 方法通过调用 messageproducer 来发送消息到指定的 redis stream 中。

以上就是springboot使用redis stream实现轻量消息队列的示例代码的详细内容,更多关于springboot redis stream轻量消息队列的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com