当前位置: 代码网 > it编程>编程语言>Java > SpringBoot中实现Redis Stream队列的代码实例

SpringBoot中实现Redis Stream队列的代码实例

2024年09月12日 Java 我要评论
前言:简单实现一下在springboot中操作redis stream队列的方式,监听队列中的消息进行消费。环境:jdk 1.8;springboot-version 2.6.3;redis 5.0.1(5版本以上才有stream队列)。

前言

简单实现一下在springboot中操作redis stream队列的方式,监听队列中的消息进行消费。

  • jdk:1.8
  • springboot-version:2.6.3
  • redis:5.0.1(5版本以上才有stream队列)

准备工作

1、pom

redis 依赖包(version 2.6.3)

        <!-- Lombok: used for @Slf4j / @RequiredArgsConstructor in the examples below -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- Spring Data Redis starter: provides the template and the stream listener container -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

2、 yml

# Redis connection settings used by the examples in this article
spring: 
  redis:
    # Index of the Redis logical database to use
    database: 0
    # Host of the Redis server (local instance here)
    host: 127.0.0.1

3、 redisstreamutil工具类

import org.springframework.beans.factory.annotation.autowired;
import org.springframework.data.redis.connection.stream.maprecord;
import org.springframework.data.redis.connection.stream.streaminfo;
import org.springframework.data.redis.connection.stream.streamoffset;
import org.springframework.data.redis.core.redistemplate;
import org.springframework.stereotype.component;

import java.util.list;
import java.util.map;

@component
public class redisstreamutil {

	@autowired
	private redistemplate<string, object> redistemplate;

	/**
	 * 创建消费组
	 *
	 * @param key   键名称
	 * @param group 组名称
	 * @return {@link string}
	 */
	public string oup(string key, string group) {
		return redistemplate.opsforstream().creategroup(key, group);
	}

	/**
	 * 获取消费者信息
	 *
	 * @param key   键名称
	 * @param group 组名称
	 * @return {@link streaminfo.xinfoconsumers}
	 */
	public streaminfo.xinfoconsumers queryconsumers(string key, string group) {
		return redistemplate.opsforstream().consumers(key, group);
	}

	/**
	 * 查询组信息
	 *
	 * @param key 键名称
	 * @return
	 */
	public streaminfo.xinfogroups querygroups(string key) {
		return redistemplate.opsforstream().groups(key);
	}

	// 添加map消息
	public string addmap(string key, map<string, object> value) {
		return redistemplate.opsforstream().add(key, value).getvalue();
	}

	// 读取消息
	public list<maprecord<string, object, object>> read(string key) {
		return redistemplate.opsforstream().read(streamoffset.fromstart(key));
	}

	// 确认消费
	public long ack(string key, string group, string... recordids) {
		return redistemplate.opsforstream().acknowledge(key, group, recordids);
	}

	// 删除消息。当一个节点的所有消息都被删除,那么该节点会自动销毁
	public long del(string key, string... recordids) {
		return redistemplate.opsforstream().delete(key, recordids);
	}

	// 判断是否存在key
	public boolean haskey(string key) {
		boolean aboolean = redistemplate.haskey(key);
		return aboolean != null && aboolean;
	}
}

代码实现

生产者发送消息

生产者发送消息,在service层创建addmessage方法,往队列中发送消息。

代码中addmap()方法第一个参数为key,第二个参数为value,该key要和后续配置的保持一致,暂时先记住这个key。

/**
 * Producer side of the demo: publishes a sample user message to the
 * {@code mystream} stream through the stream utility.
 */
@service
@slf4j
@requiredargsconstructor
public class redisstreammqserviceimpl implements redisstreammqservice {

    private final redisstreamutil redisstreamutil;

    /**
     * Builds a sample user, wraps it in a single-entry map under the field
     * {@code user} and appends it to the stream.
     *
     * @return the id of the record that was added
     */
    @override
    public object addmessage() {
        redisuser sample = new redisuser();
        sample.setage(18);
        sample.setname("hcr");
        sample.setemail("156ef561@gmail.com");

        map<string, object> payload = new hashmap<>();
        payload.put("user", sample);

        // The stream key has to match the one the consumer side subscribes to.
        return redisstreamutil.addmap("mystream", payload);
    }
}

controller接口方法

/**
 * HTTP entry point (base path {@code /redis}) for triggering the stream producer.
 */
@restcontroller
@requestmapping("/redis")
@slf4j
@requiredargsconstructor
public class rediscontroller {

    private final redisstreammqservice redisstreammqservice;

    /**
     * Handles {@code GET /redis/addmessage} by delegating to the service.
     *
     * @return the service result, passed through unchanged
     */
    @getmapping("/addmessage")
    public object addmessage() {
        object result = redisstreammqservice.addmessage();
        return result;
    }
}

调用测试,查看redis中是否正常添加数据。

接口返回数据

1702622585248-0

查看redis中的数据

消费者监听消息进行消费

创建redisconsumerslistener监听器

import cn.hcr.utils.redisstreamutil;
import lombok.requiredargsconstructor;
import lombok.extern.slf4j.slf4j;
import org.springframework.data.redis.connection.stream.maprecord;
import org.springframework.data.redis.connection.stream.recordid;
import org.springframework.data.redis.stream.streamlistener;
import org.springframework.stereotype.component;

import java.util.map;

/**
 * Stream listener that logs each received record, then acknowledges it for
 * every consumer group of the stream and deletes it from the stream.
 */
@component
@slf4j
@requiredargsconstructor
public class redisconsumerslistener implements streamlistener<string, maprecord<string, string, string>> {

    public final redisstreamutil redisstreamutil;

    /**
     * Handles one record delivered by the listener container.
     *
     * @param message the received record (stream key, record id and field/value map)
     */
    @override
    public void onmessage(maprecord<string, string, string> message) {
        // Key of the stream the record came from
        string streamkey = message.getstream();
        // Id of the record
        recordid recordid = message.getid();
        // Field/value content of the record
        map<string, string> msg = message.getvalue();
        // Parameterized logging: same output as before, without eager string concatenation.
        log.info("【streamkey】= {},【recordid】= {},【msg】={}", streamkey, recordid, msg);

        // Business logic goes here.

        // Once processing is done: acknowledge the record for every group of the
        // stream, then delete it. The group list is consumed inline so this class
        // does not need to name (or import) the group-info type.
        string id = recordid.getvalue();
        redisstreamutil.querygroups(streamkey)
                .foreach(groupinfo -> redisstreamutil.ack(streamkey, groupinfo.groupname(), id));
        redisstreamutil.del(streamkey, id);
    }
}

创建redisconfig配置类,配置监听

package cn.hcr.config;

import cn.hcr.listener.redisconsumerslistener;
import cn.hcr.utils.redisstreamutil;
import com.fasterxml.jackson.annotation.jsonautodetect;
import com.fasterxml.jackson.annotation.propertyaccessor;
import com.fasterxml.jackson.databind.objectmapper;
import lombok.extern.slf4j.slf4j;
import lombok.var;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.data.redis.connection.redisconnectionfactory;
import org.springframework.data.redis.connection.stream.consumer;
import org.springframework.data.redis.connection.stream.maprecord;
import org.springframework.data.redis.connection.stream.readoffset;
import org.springframework.data.redis.connection.stream.streamoffset;
import org.springframework.data.redis.core.redistemplate;
import org.springframework.data.redis.serializer.jackson2jsonredisserializer;
import org.springframework.data.redis.serializer.stringredisserializer;
import org.springframework.data.redis.stream.streammessagelistenercontainer;
import org.springframework.data.redis.stream.subscription;

import javax.annotation.resource;
import java.time.duration;
import java.util.hashmap;
import java.util.map;
import java.util.concurrent.linkedblockingdeque;
import java.util.concurrent.threadpoolexecutor;
import java.util.concurrent.timeunit;
import java.util.concurrent.atomic.atomicinteger;

/**
 * Redis configuration: registers a template with string keys and JSON values,
 * and subscribes the stream listener to the {@code mystream} stream as a member
 * of the {@code mygroup} consumer group.
 */
@configuration
@slf4j
public class redisconfig {

    // NOTE(review): this configuration injects redisstreamutil, which in turn needs
    // the redistemplate bean declared below. That looks like a bean dependency cycle,
    // which Spring Boot 2.6 rejects by default — confirm against the running application.
    @resource
    private redisstreamutil redisstreamutil;

    /**
     * Builds the template used for Redis access: keys and hash keys are serialized
     * as strings, values and hash values as JSON via Jackson.
     *
     * @param redisconnectionfactory connection factory backing the template
     * @return the configured template
     */
    @bean
    public redistemplate<string, object> redistemplate(redisconnectionfactory redisconnectionfactory) {
        redistemplate<string, object> template = new redistemplate<>();
        template.setconnectionfactory(redisconnectionfactory);
        // Typed (not raw) serializer, so no unchecked warnings are produced.
        jackson2jsonredisserializer<object> jackson2jsonredisserializer = new jackson2jsonredisserializer<>(object.class);
        objectmapper om = new objectmapper();
        om.setvisibility(propertyaccessor.all, jsonautodetect.visibility.any);
        // Embeds type information in the JSON so values can be read back as their concrete class.
        // NOTE(review): this call is deprecated in recent Jackson versions and unrestricted
        // default typing is unsafe for untrusted data — consider the validator-based replacement.
        om.enabledefaulttyping(objectmapper.defaulttyping.non_final);
        jackson2jsonredisserializer.setobjectmapper(om);
        stringredisserializer stringredisserializer = new stringredisserializer();
        template.setkeyserializer(stringredisserializer);
        template.sethashkeyserializer(stringredisserializer);
        template.setvalueserializer(jackson2jsonredisserializer);
        template.sethashvalueserializer(jackson2jsonredisserializer);
        template.afterpropertiesset();
        return template;
    }

    /**
     * Creates the listener container, makes sure the stream and consumer group
     * exist, registers the listener with manual acknowledgement and starts the
     * container.
     *
     * @param factory connection factory used by the listener container
     * @return the subscription of the registered listener
     */
    @bean
    public subscription subscription(redisconnectionfactory factory) {
        atomicinteger index = new atomicinteger(1);
        int processors = runtime.getruntime().availableprocessors();
        // Fixed-size pool (one thread per processor) of named daemon threads for polling.
        threadpoolexecutor executor = new threadpoolexecutor(processors, processors, 0, timeunit.seconds,
                new linkedblockingdeque<>(), r -> {
            thread thread = new thread(r);
            thread.setname("async-stream-consumer-" + index.getandincrement());
            thread.setdaemon(true);
            return thread;
        });
        streammessagelistenercontainer.streammessagelistenercontaineroptions<string, maprecord<string, string, string>> options =
                streammessagelistenercontainer
                        .streammessagelistenercontaineroptions
                        .builder()
                        // Maximum number of messages fetched per poll
                        .batchsize(5)
                        .executor(executor)
                        .polltimeout(duration.ofseconds(1))
                        // The logger already records the full stack trace; no extra printing needed.
                        .errorhandler(throwable -> log.error("[mq handler exception]", throwable))
                        .build();

        // Stream key and group name; adjust to your needs (the producer must use the same key).
        string streamname = "mystream";
        string groupname = "mygroup";

        initstream(streamname, groupname);
        var listenercontainer = streammessagelistenercontainer.create(factory, options);
        // Manual acknowledgement: the listener acks each record itself.
        // (For automatic acknowledgement, register the listener with receiveautoack(...) instead.)
        subscription subscription = listenercontainer.receive(consumer.from(groupname, "zhuyazhou"),
                streamoffset.create(streamname, readoffset.lastconsumed()), new redisconsumerslistener(redisstreamutil));
        listenercontainer.start();
        return subscription;
    }

    /**
     * Makes sure the stream and its consumer group exist before subscribing.
     * A missing stream is created via a placeholder entry that is removed again;
     * an existing stream that lacks the group (for example one first written by a
     * producer) gets just the group added.
     *
     * @param key   stream key
     * @param group consumer group name
     */
    private void initstream(string key, string group) {
        if (!redisstreamutil.haskey(key)) {
            map<string, object> map = new hashmap<>(1);
            map.put("field", "value");
            // Adding a placeholder entry creates the stream
            string result = redisstreamutil.addmap(key, map);
            // Create the consumer group
            redisstreamutil.oup(key, group);
            // Remove the placeholder entry again
            redisstreamutil.del(key, result);
            log.info("stream:{}-group:{} initialize success", key, group);
            return;
        }
        // The key exists: only the group may still be missing.
        boolean groupexists = redisstreamutil.querygroups(key).stream()
                .anymatch(info -> group.equals(info.groupname()));
        if (!groupexists) {
            redisstreamutil.oup(key, group);
            log.info("stream:{}-group:{} initialize success", key, group);
        }
    }
}

redistemplate:该bean用于配置redis序列化

subscription:配置监听

initstream:初始化消费组

监听测试

使用addmessage()方法投送一条消息后,查看控制台输出信息。

【streamkey】= mystream,
【recordid】= 1702623008044-0,
【msg】=
{user=[
    "cn.hcr.pojo.redisuser",
    {"name":"hcr","age":18,"email":"156ef561@gmail.com"}
    ]
}

总结

以上就是在springboot中简单实现redis stream队列的demo,如有需要源码或者哪里不清楚的请评论或者发送私信。
redistemplate:该bean用于配置redis序列化

subscription:配置监听

initstream:初始化消费组

到此这篇关于springboot中实现redis stream队列的文章就介绍到这了,更多相关springboot实现redis stream队列内容请搜索代码网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com