项目场景:
在电商、支付等领域,往往会有这样的场景,用户下单后放弃支付了,那这笔订单会在指定的时间段后进行关闭操作,细心的你一定发现了像某宝、某东都有这样的逻辑,而且时间很准确,误差在1s内;那他们是怎么实现的呢?
一般实现的方法有几种:使用 redisson、rocketmq、rabbitmq等消息队列的延时投递功能。
解决方案:
一般项目集成redis的比较多,所以我这篇文章就说下redisson延迟队列,如果使用rocketmq或rabbitmq需要额外集成中间件,比较麻烦一点。
1.集成redisson
maven依赖
<dependency> <groupid>org.redisson</groupid> <artifactid>redisson-spring-boot-starter</artifactid> <version>3.21.1</version> </dependency>
yml配置,单节点配置可以兼容redis的配置方式
# redis配置
spring:
redis:
database: 0
host: 127.0.0.1
password: redis@pass
port: 6001更详细的配置参考:spring boot整合redisson的两种方式
2.配置多线程
因为延迟队列可能会多个任务同时执行,所以需要多线程处理。
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.scheduling.annotation.enableasync;
import org.springframework.scheduling.concurrent.threadpooltaskexecutor;
import java.util.concurrent.threadpoolexecutor;
@configuration
@enableasync
public class executorconfig {
/**
* 异步任务自定义线程池
*/
@bean(name = "taskexecutor")
public threadpooltaskexecutor asyncserviceexecutor() {
threadpooltaskexecutor executor = new threadpooltaskexecutor();
//配置核心线程数
executor.setcorepoolsize(50);
//配置最大线程数
executor.setmaxpoolsize(500);
//配置队列大小
executor.setqueuecapacity(300);
//允许线程空闲时间
executor.setkeepaliveseconds(60);
//配置线程池中的线程的名称前缀
executor.setthreadnameprefix("taskexecutor-");
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// caller_runs:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setrejectedexecutionhandler(new threadpoolexecutor.callerrunspolicy());
//调用shutdown()方法时等待所有的任务完成后再关闭
executor.setwaitfortaskstocompleteonshutdown(true);
//等待所有任务完成后的最大等待时间
executor.setawaitterminationseconds(60);
return executor;
}
}3.具体业务
比如消息通知、关闭订单等 ,这里加上了@async注解,可以异步执行
import org.springframework.scheduling.annotation.async;
import org.springframework.stereotype.service;
import java.text.simpledateformat;
import java.util.date;
@service
public class asyncservice {
@async
public void executequeue(object value) {
system.out.println();
system.out.println("当前线程:"+thread.currentthread().getname());
system.out.println("执行任务:"+value);
//打印时间方便查看
simpledateformat sdf = new simpledateformat("yyyy-mm-dd hh:mm:ss");
system.out.println("执行任务的时间:"+sdf.format(new date()));
//自己的业务逻辑,可以根据id发送通知消息等
//......
}
}4.延迟队列(关键代码)
这里包括添加延迟队列,和消费延迟队列,@postconstruct注解的意思是服务启动加载一次,参考
import org.redisson.api.rblockingqueue;
import org.redisson.api.rdelayedqueue;
import org.redisson.api.redissonclient;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.scheduling.concurrent.threadpooltaskexecutor;
import org.springframework.stereotype.service;
import javax.annotation.postconstruct;
import javax.annotation.resource;
import java.text.simpledateformat;
import java.util.date;
import java.util.concurrent.timeunit;
@service
public class testservice {
@resource
private asyncservice asyncservice;
@resource
private threadpooltaskexecutor executor;
@autowired
private redissonclient redissonclient;
/**
* 添加延迟任务
*/
public void addqueue() {
//获取延迟队列
rblockingqueue<object> blockingqueue = redissonclient.getblockingqueue("delayedqueue");
rdelayedqueue<object> delayedqueue = redissonclient.getdelayedqueue(blockingqueue);
for (int i = 1; i <= 10; i++) {
long delaytime = 5+i; //延迟时间(秒)
// long delaytime = 5; //这里时间统一,可以测试并发执行
delayedqueue.offer("延迟任务"+i, delaytime, timeunit.seconds);
}
//打印时间方便查看
simpledateformat sdf = new simpledateformat("yyyy-mm-dd hh:mm:ss");
system.out.println("添加任务的时间:"+sdf.format(new date()));
}
/**
* 服务启动时加载,开始消费延迟队列
*/
@postconstruct
public void consumer() {
system.out.println("服务启动时加载>>>>>>");
//获取延迟队列
rblockingqueue<object> delayedqueue = redissonclient.getblockingqueue("delayedqueue");
//启用一个线程来消费这个延迟队列
executor.execute(() ->{
while (true){
try {
// system.out.println("while中的线程:"+thread.currentthread().getname());
//获取延迟队列中的任务
object value = delayedqueue.poll();
if(value == null){
//如果没有任务就休眠1秒,休眠时间根据业务自己定义
thread.sleep(1000); //这里休眠时间越短,误差就越小
continue;
}
//异步处理延迟队列中的消息
asyncservice.executequeue(value);
} catch (exception e) {
e.printstacktrace();
}
}
});
}
}5.测试接口
import com.test.service.testservice;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.getmapping;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.restcontroller;
@restcontroller
@requestmapping("/test")
public class testcontroller {
@autowired
private testservice testservice;
/*
* 添加延迟任务
*/
@getmapping(value = "/addqueue")
public string addqueue() {
testservice.addqueue();
return "success";
}
}6.测试结果

总结:
- redisson的的rdelayedqueue是基于redis实现的,而redis本身并不保证数据的持久性。如果redis服务器宕机,那么所有在rdelayedqueue中的数据都会丢失。因此,我们需要在应用层面进行持久化设计,例如定期将rdelayedqueue中的数据持久化到数据库。
- 在设计延迟任务时,我们应该根据实际需求来合理设置延迟时间,避免设置过长的延迟时间导致内存占用过高。
到此这篇关于spring boot集成redisson实现延迟队列的文章就介绍到这了,更多相关springboot redisson延迟队列内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论