当前位置: 代码网 > it编程>数据库>Redis > Redis实现延迟队列的项目示例

Redis实现延迟队列的项目示例

2024年07月03日 Redis 我要评论
最近用到一个延迟消息的功能,第一时间想到使用mq或者mq的插件,因为数据量不大,所以尝试使用redis来实现了,毕竟redis也天生支持类似mq的队列消费,所以,在这里总结了一下redis实现延迟消息

最近用到一个延迟消息的功能,第一时间想到使用mq或者mq的插件,因为数据量不大,所以尝试使用redis来实现了,毕竟redis也天生支持类似mq的队列消费,所以,在这里总结了一下redis实现延迟消息队列的方式。

一、监听key过期时间

处理流程:当redis的一个key过期时,redis会生成一个事件,通知订阅了该事件的客户端(keyexpirationeventmessagelistener),然后在客户端的回调方法中处理逻辑。
1)新建springboot项目,maven依赖及yml如下
maven依赖:

<dependencies>
     <dependency>
         <groupid>org.springframework.boot</groupid>
         <artifactid>spring-boot-starter-web</artifactid>
     </dependency>
     <dependency>
         <groupid>org.springframework.boot</groupid>
         <artifactid>spring-boot-starter-data-redis</artifactid>
     </dependency>

     <dependency>
         <groupid>org.projectlombok</groupid>
         <artifactid>lombok</artifactid>
     </dependency>

 </dependencies>

yml文件

server:
  port: 8000

spring:
  redis:
    database: 0
    host: xxxx
    port: 6379
    password: xxxxxx
    lettuce:
      pool:
        #最大连接数
        max-active: 8
        #最大阻塞等待时间
        max-wait: -1
        #最大空闲
        max-idle: 8
        #最小空闲
        min-idle: 0
    #连接超时时间
    timeout: 5000

2)修改redis.conf文件开启事件通知配置
默认的配置:notify-keyspace-events “”
修改为:notify-keyspace-events ex,该配置表示监听key的过期事件

3)设置redis监听配置,注入bean redismessagelistenercontaine

@configuration
public class redistimeoutconfiguration {

    @autowired
    private redisconnectionfactory redisconnectionfactory;

    @bean
    public redismessagelistenercontainer redismessagelistenercontainer() {
        redismessagelistenercontainer redismessagelistenercontainer = new redismessagelistenercontainer();
        redismessagelistenercontainer.setconnectionfactory(redisconnectionfactory);
        return redismessagelistenercontainer;
    }

    @bean
    public keyexpiredlistener keyexpiredlistener() {
        return new keyexpiredlistener(this.redismessagelistenercontainer());
    }
}

4)创建监听器类,重写key过期回调方法onmessage

@slf4j
public class keyexpiredlistener extends keyexpirationeventmessagelistener {

    @autowired
    public redistemplate<string, string> redistemplate;

    public keyexpiredlistener(redismessagelistenercontainer listenercontainer) {
        super(listenercontainer);
    }

    @override
    public void onmessage(message message, byte[] bytes) {
        string channel = new string(message.getchannel(), standardcharsets.utf_8);
        //过期的key
        string key = new string(message.getbody(), standardcharsets.utf_8);
        log.info("redis key 过期:bytes={},channel={},key={}", new string(bytes), channel, key);
    }
}

5)编写测试接口:写入一个带过期时间的key

@restcontroller
@requestmapping("/demo")
public class basiccontroller {

    @autowired
    public redistemplate<string, string> redistemplate;

    @getmapping(value = "/test")
    public void redistest() {
        redistemplate.opsforvalue().set("test", "5s后过期", 5, timeunit.seconds);
    }
}

执行后,onmessage监听方法打印结果:

 redis key 过期:bytes=__keyevent@*__:expired,channel=__keyevent@0__:expired,key=test

该方案缺点:可靠性问题,redis 是一个内存数据库,尽管它提供了数据持久化选项(如 rdb 和 aof),但在某些情况下(如意外崩溃或重启),可能会丢失一些未处理的过期事件。

二、zset + score

基本思路是将消息按需发送的时间作为分数存储在有序集合zset中,然后定期检查并处理到期的消息。代码例子如下:
1)创建 delayedmessageservice 类

@slf4j
@service
public class delayedmessageservice {

    private static final string delayed_messages_zset = "delayed:messages";


    @autowired
    private redistemplate<string, string> redistemplate;

    public void addmessage(string message, long delaymillis) {
        long score = system.currenttimemillis() + delaymillis;
        redistemplate.opsforzset().add(delayed_messages_zset, message, score);
    }


    @scheduled(fixedrate = 1000)
    public void processmessages() {
        long now = system.currenttimemillis();
        set<zsetoperations.typedtuple<string>> messages = redistemplate.opsforzset().rangebyscorewithscores(delayed_messages_zset, 0, now);
        if (messages != null && !messages.isempty()) {
            for (zsetoperations.typedtuple<string> message : messages) {
                string msg = message.getvalue();
                long score = message.getscore().longvalue();
                if (score <= now) {
                    // process the message
                    system.out.println("processing message: " + msg);
                    // remove the message from the zset
                    redistemplate.opsforzset().remove(delayed_messages_zset, msg);
                }
            }
        }else{
            log.info("定时任务执行~");
        }
    }


}

2)编写controller接口测试,初始化zset内容

@restcontroller
@requestmapping("/demo")
public class basiccontroller {

    @autowired
    private delayedmessageservice delayedmessageservice;

    @getmapping(value = "/test2")
    public void rediszsettest() {
        // add some messages with delays
        delayedmessageservice.addmessage("message 1", 5000); // 5 seconds delay
        delayedmessageservice.addmessage("message 2", 10000); // 10 seconds delay
        delayedmessageservice.addmessage("message 3", 15000); // 15 seconds delay
    }
}

说明:

  • rediszsettest接口通过调用delayedmessageserviceaddmessage方法,将消息及其到期时间添加到 redis 的 zset 中
  • 开启一个定时任务,定期检查和处理到期的消息。使用 @scheduled 注解定期执行,每秒检查一次,注意这里使用@scheduled,不要忘了启动类上添加@enablescheduling注解,否则定时任务不会生效。fixedrate 属性表示以固定的频率(毫秒为单位)执行方法。即方法执行完成后,会立即等待指定的毫秒数,然后再次执行。
  • 通过 redistemplate.opsforzset().rangebyscorewithscores 方法按时间范围获取到期的消息,消息处理完成后,从zset 中移除处理过的消息

三、redisson框架

利用 redisson 提供的数据结构rdelayedqueuerblockingdeque,可以自动处理过期的任务并将它们移动到阻塞队列中,这样我们就可以从阻塞队列中获取任务并进行消费处理。例子如下:
1)添加依赖

<!-- redisson 依赖项 -->
<dependency>
    <groupid>org.redisson</groupid>
    <artifactid>redisson-spring-boot-starter</artifactid>
    <version>2.15.1</version>
</dependency>

2)创建delayedmessageservice

@slf4j
@service
public class delayedmessageservice {

    @autowired
    private redissonclient redissonclient;

    private rblockingdeque<string> blockingdeque;
    private rdelayedqueue<string> delayedqueue;

    @postconstruct
    public void init() {
        this.blockingdeque = redissonclient.getblockingdeque("delayedqueue");
        this.delayedqueue = redissonclient.getdelayedqueue(blockingdeque);

        executors.newsinglethreadexecutor().submit(this::processmessages);
    }

    public void addmessage(string message, long delaymillis) {
        delayedqueue.offer(message, delaymillis, timeunit.milliseconds);
    }

    public void processmessages() {
        try {
            while (true) {
                string message = blockingdeque.take();
                // process the message
                log.info("消息被处理: " + message);
                // ..业务逻辑处理
            }
        } catch (interruptedexception e) {
            thread.currentthread().interrupt();
            log.error("中断异常",e);
        }
    }


}

3)测试接口

@getmapping(value = "/test3")
    public void redisqueuetest() {
        // add some messages with delays
        delayedmessageservice.addmessage("message 1", 5000); // 5 seconds delay
        delayedmessageservice.addmessage("message 2", 10000); // 10 seconds delay
        delayedmessageservice.addmessage("message 3", 15000); // 15 seconds delay
    }

说明:

rdelayedqueue 是 redisson 提供的延迟队列,它将消息存储在指定的队列中,直到消息到期才会被转移到该队列。它的主要作用包括:

  • 延迟消息管理:我们可以使用 rdelayedqueue 的 offer 方法将消息添加到延迟队列,并指定延迟时间,消息在延迟时间到期前一直保留在 rdelayedqueue 中。
  • 消息转移:一旦消息到期,rdelayedqueue 会自动将消息转移到指定的rblockingdeque 中。

rblockingqueue是 redisson 提供的阻塞队列,它支持阻塞操作。主要作用包括:

  • 阻塞操作:支持阻塞的 take 操作,如果队列中没有元素,会一直阻塞直到有元素可供消费。

总结
个人推荐使用redisson 的rdelayedqueue 方式,感觉更加可靠和简单一些,当然zset+score也可以是个不错选择,毕竟更加灵活,延迟消息还有其他不同的方案,比如rocketmq、rabbitmq插件等,假如项目中用了redis,又不想引入更多的中间件,可以尝试使用redis来实现,为了测试,这里例子都比较简单,在实际使用过程中,还要考虑补偿机制、幂等性等问题。

参考:

1.https://blog.csdn.net/qq_34826261/article/details/120598731

到此这篇关于redis实现延迟队列的项目示例的文章就介绍到这了,更多相关redis 延迟队列内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com