延迟队列应用场景
- 订单支付超时:用户下单后30分钟未支付,自动取消订单。
- 订单评价超时:订单签收后7天未评价,系统默认好评。
- 商家接单超时:下单成功后商家5分钟未接单,订单取消。
- 配送超时提醒:配送超时,推送短信提醒。
技术选型分析
针对延迟任务处理机制,主要可选方案有以下四种:定时轮询、redisson 延迟队列、消息中间件、redis 过期监听。
1. 定时任务轮询
机制: 通过定时任务如 @scheduled 或 quartz),以固定频率轮询数据库或 redis,查找已到期的任务并处理。
优点:由于springboot原生支持,实现成本低,并且可以任务统一管理
缺点:
- 处理存在延迟非实时,精度取决于轮询间隔。
- 容易空轮询,浪费 cpu 或数据库资源。
- 不适合高并发或对时效要求高的业务场景。
适用场景:小型系统、任务量小、业务容忍较大延迟的场景。
2. redisson 延迟队列(推荐)
机制说明:
基于 redis zset 和 list 封装的延迟队列,由 redisson 实现,支持回调消费。
优点:
- 接入简单,redisson 封装完备。
- 实时性较好,精度可达秒级,满足大多数业务需求。
- 可注册不同处理器,业务扩展方便。
- 支持分布式部署,天然适配 redis 集群环境。
缺点:实现依赖 redisson,同时需要保证redis 崩溃等情况设计补偿保护机制
适用场景:中大型系统、微服务架构下的延迟任务处理。
3. 消息中间件延迟队列( rabbitmq、kafka等)
机制:通过消息中间件的 ttl(消息生存时间)和死信队列机制实现延迟任务,例如 rabbitmq 的 dlx(死信交换机)或 kafka 的延迟消费。
优点:
- 毫秒级精度,适合高并发和对时效性要求高的业务。
- 高可靠性,天然支持异步解耦与分布式处理。
- 支持大规模任务并发调度。
缺点:
- 配置复杂,需要配置 ttl、dlx 等。
- 引入 mq 系统,提升系统复杂度与维护成本。
- 对运维能力有一定要求。
适用场景:高并发、高可用要求的核心业务,如订单超时关闭、促销活动控制等。
4. redis key 过期监听(不推荐)
最初是通过redis的思路来实现延迟队列功能,但是通过查询资料和官方文档发现,redis并不适合此种场景
机制:
通过启用 redis 的 keyspace notification 功能,监听键过期事件(需设置 notify-keyspace-events 配置项)。
官方文档说明:
redis 中 key 的过期事件
expired有两种触发方式:
- 在访问 key 时发现其已过期
- 后台线程定期扫描并删除过期 key
因此,并不能保证在 ttl 恰好归零时立即触发过期事件,也不保证事件一定会触发。
缺点:
- 不可靠,事件触发不精确,且可能丢失。
- 无法支持分布式监听,redis 集群环境下存在局限。
- 对核心业务流程不具备可控性。
适用场景:临时性、非强一致性场景,如验证码、状态标记清理等。
参考:
选型建议总结
| 方案 | 实现难度 | 实时性 | 可靠性 | 分布式支持 | 推荐场景 |
|---|---|---|---|---|---|
| 定时任务轮询 | 低 | 低 | 中 | 有限 | 简单、低频业务 |
| redisson 延迟队列 | 中 | 中 | 高 | 好 | 分布式、业务量中等、场景标准 |
| mq 延迟队列 | 高 | 高 | 高 | 极好 | 高并发、大量异步、核心任务场景 |
| redis 过期监听 | 低 | 不确定 | 低 | 差 | 非核心场景,缓存状态变更类任务 |
redisson 延迟队列实现
项目结构
├── config │ └── redissonconfig.java # 配置 redisson 客户端,创建 redissonclient bean ├── controller │ └── deliverycontroller.java # 提供rest 接口模拟订单创建和收货操作,触发延迟任务 ├── enums │ └── delayqueueenum.java # 定义延迟队列的业务枚举及其关联的处理类 ├── hander │ ├── delayqueuehandler.java # 延迟队列处理器接口,定义 execute 方法 │ ├── evaluationtimeouthandler.java # 处理评价超时逻辑的具体实现类 │ └── orderpaymenttimeouthandler.java # 处理订单支付超时逻辑的具体实现类 ├── runner │ └── redisdelayqueuerunner.java # 启动后监听并执行延迟队列任务,使用线程池并发处理 ├── utils │ ├── redisdelayqueueutil.java # 封装 redis 延迟队列的操作方法(添加/获取元素) │ └── springutils.java # 工具类,用于在非 spring 管理类中获取 bean └── springbootapplication.java # spring boot 主类,包含程序入口 main 方法
redis延迟队列工具类
package com.zhou.demo.utils;
import lombok.extern.slf4j.slf4j;
import org.redisson.api.rblockingdeque;
import org.redisson.api.rdelayedqueue;
import org.redisson.api.redissonclient;
import org.springframework.stereotype.component;
import javax.annotation.resource;
import java.util.concurrent.timeunit;
/**
* redis延迟队列工具
*/
@slf4j
@component
public class redisdelayqueueutil {
@resource
private redissonclient redissonclient;
/**
* 将元素添加到延迟队列中
*
* @param queuecode 队列键(用于标识不同的队列)
* @param value 要添加到队列中的值(泛型类型)
* @param delay 延迟时间(指定元素在队列中延迟被消费的时间)
* @param timeunit 时间单位(与延迟时间配合使用,如秒、毫秒等)
*/
public <t> void adddelayqueue(string queuecode, t value, long delay, timeunit timeunit) {
try {
rblockingdeque<object> blockingdeque = redissonclient.getblockingdeque(queuecode);
rdelayedqueue<object> delayedqueue = redissonclient.getdelayedqueue(blockingdeque);
delayedqueue.offer(value, delay, timeunit);
log.info("(添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}秒", queuecode, value, timeunit.toseconds(delay));
} catch (exception e) {
log.error("(添加延时队列失败) {}", e.getmessage(), e);
throw new runtimeexception("(添加延时队列失败)", e);
}
}
/**
* 获取延迟队列中的元素
*
* @param queuecode 队列键
* @param <t> 元素类型
* @return 队列中的元素
*/
public <t> t getdelayqueue(string queuecode) throws interruptedexception {
rblockingdeque<t> blockingdeque = redissonclient.getblockingdeque(queuecode);
return blockingdeque.take();
}
}redis延迟队列运行器
用于在spring boot启动后监听各个延迟队列,并在线程池中执行对应的业务逻辑。
package com.zhou.demo.runner;
import com.zhou.demo.enums.delayqueueenum;
import com.zhou.demo.hander.delayqueuehandler;
import com.zhou.demo.utils.redisdelayqueueutil;
import com.zhou.demo.utils.springutils;
import lombok.extern.slf4j.slf4j;
import org.springframework.boot.commandlinerunner;
import org.springframework.stereotype.component;
import javax.annotation.predestroy;
import javax.annotation.resource;
import java.util.concurrent.executorservice;
import java.util.concurrent.executors;
/**
* redis延迟队列运行器
* 用于在spring boot启动后监听各个延迟队列,并在线程池中执行对应的业务逻辑。
*
* @author zhouquan
*/
@slf4j
@component
public class redisdelayqueuerunner implements commandlinerunner {
@resource
private redisdelayqueueutil redisdelayqueueutil;
/**
* 线程池,用于并发监听不同的延迟队列
*/
private final executorservice executorservice = executors.newcachedthreadpool();
/**
* 运行状态标志,控制线程是否持续监听队列
*/
private volatile boolean running = true;
/**
* spring boot 启动完成后自动运行的方法
* 遍历所有延迟队列枚举,为每个队列创建一个监听线程
*
* @param args 命令行参数
*/
@override
public void run(string... args) {
for (delayqueueenum queueenum : delayqueueenum.values()) {
executorservice.execute(() -> {
log.info("启动延迟队列监听线程:{}", queueenum.getcode());
while (running) {
try {
object value = redisdelayqueueutil.getdelayqueue(queueenum.getcode());
delayqueuehandler handler = springutils.getbean(queueenum.getbeanclass());
handler.execute(value);
} catch (interruptedexception e) {
thread.currentthread().interrupt();
log.warn("线程中断:{}", queueenum.getcode());
} catch (exception ex) {
log.error("延迟队列 [{}] 处理异常:{}", queueenum.getcode(), ex.getmessage(), ex);
}
}
});
}
log.info("所有 redis 延迟队列监听启动完成");
}
/**
* 在 bean 销毁前关闭线程池,释放资源
*/
@predestroy
public void shutdown() {
log.info("准备关闭 redis 延迟队列监听线程池");
running = false;
executorservice.shutdownnow();
}
}业务枚举类
package com.zhou.demo.enums;
import com.zhou.demo.hander.evaluationtimeouthandler;
import com.zhou.demo.hander.orderpaymenttimeouthandler;
import com.zhou.demo.hander.delayqueuehandler;
import lombok.allargsconstructor;
import lombok.getter;
import lombok.noargsconstructor;
/**
* 延迟队列业务枚举
*
* @author 18324
*/
@getter
@noargsconstructor
@allargsconstructor
public enum delayqueueenum {
/**
* 订单超时
*/
order_payment_timeout("order_payment_timeout", "订单支付超时", orderpaymenttimeouthandler.class),
/**
* 评价超时
*/
evaluation_timeout("evaluation_timeout", "评价超时", evaluationtimeouthandler.class);
/**
* 延迟队列 redis key
*/
private string code;
/**
* 中文描述
*/
private string name;
/**
* 延迟队列具体业务实现的 bean
* 可通过 spring 的上下文获取
*/
private class<? extends delayqueuehandler<long>> beanclass;
}测试接口类
package com.zhou.demo.enums;
import com.zhou.demo.hander.evaluationtimeouthandler;
import com.zhou.demo.hander.orderpaymenttimeouthandler;
import com.zhou.demo.hander.delayqueuehandler;
import lombok.allargsconstructor;
import lombok.getter;
import lombok.noargsconstructor;
/**
* 延迟队列业务枚举
*
* @author 18324
*/
@getter
@noargsconstructor
@allargsconstructor
public enum delayqueueenum {
/**
* 订单超时
*/
order_payment_timeout("order_payment_timeout", "订单支付超时", orderpaymenttimeouthandler.class),
/**
* 评价超时
*/
evaluation_timeout("evaluation_timeout", "评价超时", evaluationtimeouthandler.class);
/**
* 延迟队列 redis key
*/
private string code;
/**
* 中文描述
*/
private string name;
/**
* 延迟队列具体业务实现的 bean
* 可通过 spring 的上下文获取
*/
private class<? extends delayqueuehandler<long>> beanclass;
}延迟队列任务测试


源码地址
https://gitee.com/zhouquanstudy/springboot-redisson-delayqueue
参考
[1]. springboot集成redisson实现延迟队列_redisson delayedqueue
[2]. 请勿过度依赖redis的过期监听业务
到此这篇关于spring boot 项目集成 redisson 实现延迟队列的文章就介绍到这了,更多相关spring boot redisson延迟队列内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论