在开发中,我们会遇到需要延时任务的业务场景,例如:用户下单之后未在规定的时间内支付成功,该订单会自动取消; 用户注册成功15分钟后,发消息通知用户;还有比如到期自动收货,超时自动退款等都是类似的延时任务的业务问题。
这里主要介绍一下几种方法:
- 1、定时任务
- 2、jdk延迟队列 delayqueue
- 3、redis过期监听
- 4、redisson分布式延迟队列
- 5、rocketmq延迟消息
- 6、rabbitmq死信队列
1、定时任务
写一个定时任务,定期扫描数据库中的订单,如果时间过期,就取消这个订单。这种实现方法成本低、实现容易。这里使用@scheduled注解实现,也可以用quartz框架实现定时任务。
@scheduled(cron = "30 * * * * ?") public void scanorder(){ orderservice.scanorder(); //每30秒扫描数据库 找出过期未支付的订单,取消该订单 }
优点:实现容易,成本低,不依赖其他组件。
缺点:
- 时间不够精确。因为扫描是有间隔的,但却随时会产生过期的订单,所以可能会导致有些订单已经过期了一段时间后才被扫描到。
- 增加了数据库的压力。频繁的访问数据库,当数据越来越多时,访问数据库的成本也会增加。
2、jdk延迟队列 delayqueue
delayqueue是jdk提供的一个无界队列,它的本质是封装了一个priorityqueue(优先队列), priorityqueue内部使用完全二叉堆来实现队列排序,在向队列中插入元素时,需要给出这个元素的delay时间,也就是过期时间,队列中最小的元素会被放在队首,队列中的元素只有到了delay时间才允许从队列中取出。
具体的实现思路就是:首先创建一个实体类实现delay接口,然后将它放入delayqueue队列中。
(1)定义实现delayed接口的实体类
需要实现delayed接口的两个方法:getdelay()和compareto()
import com.fasterxml.jackson.annotation.jsonformat; import lombok.allargsconstructor; import lombok.data; import lombok.noargsconstructor; import java.util.concurrent.delayed; import java.util.concurrent.timeunit; @data @allargsconstructor @noargsconstructor public class mydelay implements delayed { private string ordernumber; //订单编号 @jsonformat(locale = "zh", timezone = "gmt+8", pattern = "yyyy-mm-dd hh:mm:ss") private long time; //过期时间 @override public long getdelay(timeunit timeunit) { return time - system.currenttimemillis(); } @override public int compareto(delayed delayed) { mydelay mydelay = (mydelay)delayed; return this.time.compareto(mydelay.gettime()); } }
(2)将延时任务放入队列
package com.demo; import com.demo.config.mydelay; import java.util.concurrent.delayqueue; public class demo { public static void main(string[] args) throws interruptedexception { mydelay mydelay1 = new mydelay("0001", 5l); mydelay mydelay2 = new mydelay("0002", 10l); mydelay mydelay3 = new mydelay("0003", 15l); delayqueue<mydelay> delaydelayqueue = new delayqueue<mydelay>(); delaydelayqueue.add(mydelay1); delaydelayqueue.add(mydelay2); delaydelayqueue.add(mydelay3); while (delaydelayqueue.size()!=0) { /** * 取队列头部元素是否过期 */ //delayqueue的put/add方法是线程安全的,因为put/add方法内部使用了reentrantlock锁进行线程同步。 // delayqueue还提供了两种出队的方法 poll() 和 take() , // poll() 为非阻塞获取,没有到期的元素直接返回null; // take() 阻塞方式获取,没有到期的元素线程将会等待。 mydelay order = delaydelayqueue.poll(); if(order!=null) { system.out.println("订单编号:"+order.getordernumber()+",超时取消!"); } thread.sleep(1000); } } }
优点:不依赖任何第三方组件,实现方便。
缺点:因为delayqueue是基于jvm的,如果放入的订单过多,会造成jvm溢出。如果jvm重启了,那所有的数据就丢失了。
3、redis过期监听
redis是一个高性能的key,value数据库,除了用作缓存之外,它还提供了过期监听的功能。
在redis.conf中配置
配置notify-keyspace-events "ex" 即可开启此功能。
springboot 项目集成redis配置过期监听
在pom中引入依赖
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-data-redis</artifactid> </dependency>
在yml中配置redis源
redis: #数据库索引 database: 0 host: 127.0.0.1 port: 6379 password: 123456 jedis: pool: #最大连接数 max-active: 15 #最大阻塞等待时间(负数表示没限制) max-wait: -1 #最大空闲 max-idle: 15 #最小空闲 min-idle: 0 #连接超时时间 timeout: 10000
编写redis配置类
package com.example.study_demo.config; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import org.springframework.data.redis.connection.redisconnectionfactory; import org.springframework.beans.factory.annotation.autowired; import org.springframework.data.redis.listener.redismessagelistenercontainer; /** * redis配置 */ @configuration public class redisconfig { @autowired private redisconnectionfactory redisconnectionfactory; @bean public redismessagelistenercontainer redismessagelistenercontainer() { redismessagelistenercontainer redismessagelistenercontainer = new redismessagelistenercontainer(); redismessagelistenercontainer.setconnectionfactory(redisconnectionfactory); return redismessagelistenercontainer; } @bean public keyexpiredlistener keyexpiredlistener() { return new keyexpiredlistener(this.redismessagelistenercontainer()); } }
编写redis工具类
package com.example.study_demo.utils; import org.springframework.beans.factory.annotation.autowired; import org.springframework.data.redis.core.boundsetoperations; import org.springframework.data.redis.core.hashoperations; import org.springframework.data.redis.core.redistemplate; import org.springframework.data.redis.core.valueoperations; import org.springframework.stereotype.component; import java.util.*; import java.util.concurrent.timeunit; @component public class rediscache { @autowired public redistemplate redistemplate; /** * 缓存基本的对象,integer、string、实体类等 * * @param key 缓存的键值 * @param value 缓存的值 */ public <t> void setcacheobject(final string key, final t value) { redistemplate.opsforvalue().set(key, value); } /** * 缓存基本的对象,integer、string、实体类等 * * @param key 缓存的键值 * @param value 缓存的值 * @param timeout 时间 * @param timeunit 时间颗粒度 */ public <t> void setcacheobject(final string key, final t value, final integer timeout, final timeunit timeunit) { redistemplate.opsforvalue().set(key, value, timeout, timeunit); } /** * 设置有效时间 * * @param key redis键 * @param timeout 超时时间 * @return true=设置成功;false=设置失败 */ public boolean expire(final string key, final long timeout) { return expire(key, timeout, timeunit.seconds); } /** * 设置有效时间 * * @param key redis键 * @param timeout 超时时间 * @param unit 时间单位 * @return true=设置成功;false=设置失败 */ public boolean expire(final string key, final long timeout, final timeunit unit) { return redistemplate.expire(key, timeout, unit); } /** * 获得缓存的基本对象。 * * @param key 缓存键值 * @return 缓存键值对应的数据 */ public <t> t getcacheobject(final string key) { valueoperations<string, t> operation = redistemplate.opsforvalue(); return operation.get(key); } /** * 删除单个对象 * * @param key */ public boolean deleteobject(final string key) { return redistemplate.delete(key); } /** * 删除集合对象 * * @param collection 多个对象 * @return */ public long deleteobject(final collection collection) { return redistemplate.delete(collection); } /** * 缓存list数据 * * @param key 缓存的键值 * @param datalist 待缓存的list数据 * @return 缓存的对象 */ public <t> long setcachelist(final string key, final list<t> datalist) { long count = redistemplate.opsforlist().rightpushall(key, datalist); return count == null ? 0 : count; } /** * 获得缓存的list对象 * * @param key 缓存的键值 * @return 缓存键值对应的数据 */ public <t> list<t> getcachelist(final string key) { return redistemplate.opsforlist().range(key, 0, -1); } /** * 缓存set * * @param key 缓存键值 * @param dataset 缓存的数据 * @return 缓存数据的对象 */ public <t> boundsetoperations<string, t> setcacheset(final string key, final set<t> dataset) { boundsetoperations<string, t> setoperation = redistemplate.boundsetops(key); iterator<t> it = dataset.iterator(); while (it.hasnext()) { setoperation.add(it.next()); } return setoperation; } /** * 获得缓存的set * * @param key * @return */ public <t> set<t> getcacheset(final string key) { return redistemplate.opsforset().members(key); } /** * 缓存map * * @param key * @param datamap */ public <t> void setcachemap(final string key, final map<string, t> datamap) { if (datamap != null) { redistemplate.opsforhash().putall(key, datamap); } } /** * 获得缓存的map * * @param key * @return */ public <t> map<string, t> getcachemap(final string key) { return redistemplate.opsforhash().entries(key); } /** * 往hash中存入数据 * * @param key redis键 * @param hkey hash键 * @param value 值 */ public <t> void setcachemapvalue(final string key, final string hkey, final t value) { redistemplate.opsforhash().put(key, hkey, value); } /** * 获取hash中的数据 * * @param key redis键 * @param hkey hash键 * @return hash中的对象 */ public <t> t getcachemapvalue(final string key, final string hkey) { hashoperations<string, string, t> opsforhash = redistemplate.opsforhash(); return opsforhash.get(key, hkey); } /** * 删除hash中的数据 * * @param key * @param hkey */ public void delcachemapvalue(final string key, final string hkey) { hashoperations hashoperations = redistemplate.opsforhash(); hashoperations.delete(key, hkey); } /** * 获取多个hash中的数据 * * @param key redis键 * @param hkeys hash键集合 * @return hash对象集合 */ public <t> list<t> getmulticachemapvalue(final string key, final collection<object> hkeys) { return redistemplate.opsforhash().multiget(key, hkeys); } /** * 获得缓存的基本对象列表 * * @param pattern 字符串前缀 * @return 对象列表 */ public collection<string> keys(final string pattern) { return redistemplate.keys(pattern); } }
编写监控类
在代码中继承keyspaceeventmessagelistener ,实现onmessage就可以监听过期的数据量
package com.example.study_demo.config; import lombok.extern.slf4j.slf4j; import org.springframework.data.redis.connection.message; import org.springframework.data.redis.listener.keyexpirationeventmessagelistener; import org.springframework.data.redis.listener.redismessagelistenercontainer; @slf4j public class keyexpiredlistener extends keyexpirationeventmessagelistener { public keyexpiredlistener(redismessagelistenercontainer listenercontainer) { super(listenercontainer); } @override public void onmessage(message message, byte[] pattern) { string expiredkey = message.tostring(); log.info("订单{}过期了", expiredkey); } }
测试
package com.demo; import com.demo.config.mydelay; import java.util.concurrent.delayqueue; public class demo { public static void main(string[] args) throws interruptedexception { long expire = 5l; //设置过期时间 string key = "0001"; rediscache rediscache = new rediscache(); rediscache.setcacheobject(key,"订单过期了"); rediscache.expire(key,expire); } }
优点:由于redis的高性能,所以在设置以及消费key时的速度可以保证。
缺点: 由于redis的key过期策略的原因,当一个key过期时,无法立刻保证将其删除,自然我们监听事件也无法第一时间消费到这个key,所以会存在一定的延迟。 此外,在redis5.0之前,订阅发布消息并没有被持久化,自然也没有所谓的确认机制,所以一旦消费信息过程中我们的客户端发生了宕机,这条消息就彻底丢失了。
4、redisson分布式延迟队列
redisson是一个基于redis实现的java驻内存数据网络,它不仅提供了一系列的分布式java常用对象,还提供了许多分布式服务。redisson除了提供我们常用的分布式锁外,还提供了一个分布式延迟队列rdelayedqueue ,它是一种基于zset结构实现的延迟队列,其实现类是redissondelayedqueue,在springboot中整合使用redisson分布式延迟队列的步骤如下:
引入pom依赖,yml中配置redis连接
<dependency> <groupid>org.redisson</groupid> <artifactid>redisson-spring-boot-starter</artifactid> <version>3.10.5</version> </dependency>
创建延时队列生产者
import org.redisson.api.rdelayedqueue; import org.redisson.api.redissonclient; import org.springframework.beans.factory.annotation.autowired; import org.springframework.stereotype.service; /** * 延迟队列生产者 */ @service public class rdelayqueueproducer { @autowired private redissonclient redissonclient; public void addtask(string taskid, long delaytime){ //创建一个延迟队列 rdelayedqueue<string> delayedqueue = redissonclient.getdelayedqueue(redissonclient.getqueue("my_delayqueue")); //将任务添加到延迟队列,指定延迟时间 delayedqueue.offer(taskid,delaytime,java.util.concurrent.timeunit.seconds); } }
创建延时队列消费者
import org.redisson.api.rdelayedqueue; import org.redisson.api.redissonclient; import org.springframework.beans.factory.annotation.autowired; import org.springframework.stereotype.service; /** * 延迟队列消费者 */ @service public class rdelayqueueconsumer { @autowired private redissonclient redissonclient; public void consumetask(){ rdelayedqueue<string> delayedqueue = redissonclient.getdelayedqueue(redissonclient.getqueue("my_delayqueue")); while (true){ string poll = delayedqueue.poll(); if(poll!=null){ //收到消息进行处理 system.out.println("收到消息:"+poll); } } } }
测试
@postmapping("/test") public void test(){ rdelayqueueproducer.addtask("0001",5); rdelayqueueproducer.addtask("0002",10); rdelayqueueproducer.addtask("0003",15); }
优点:使用简单,并且其实现类中大量使用lua脚本保证其原子性,不会有并发重复问题。
缺点:需要依赖redis
5、rocketmq延迟消息
rocketmq是阿里巴巴开源的一款分布式消息中间件,基于高可用分布式集群技术,提供低延迟的、可靠的消息发布与订阅服务。下面是在springboot中集成rocketmq延迟消息的步骤:
安装并启动 rocketmq 服务
可参考rocketmq 官方文档进行安装和启动
引入依赖
<dependency> <groupid>org.apache.rocketmq</groupid> <artifactid>rocketmq-spring-boot-starter</artifactid> <version>2.2.2</version> </dependency>
配置rocketmq
spring: rocketmq: name-server: 127.0.0.1:9876 # rocketmq nameserver地址 producer: group: my-group # 生产者组名
创建消息生产者
import org.apache.rocketmq.spring.core.rocketmqtemplate; import org.springframework.beans.factory.annotation.autowired; import org.springframework.stereotype.service; @service public class rocketmqproducerservice { @autowired private rocketmqtemplate rocketmqtemplate; public void sendmessage(string topic, string message,long delay) { // 发送延迟消息,延迟级别为16,对应延迟时间为delay rocketmqtemplate.syncsend(topic, message, delay, 16); } }
创建消息消费者
import org.apache.rocketmq.spring.annotation.rocketmqmessagelistener; import org.apache.rocketmq.spring.core.rocketmqlistener; import org.springframework.stereotype.service; @service @rocketmqmessagelistener(topic = "test-topic", consumergroup = "my-consumer-group") public class rocketmqconsumerservice implements rocketmqlistener<string> { @override public void onmessage(string message) { system.out.println("接收到消息: " + message); //检查订单是否支付 } }
测试
import org.springframework.beans.factory.annotation.autowired; import org.springframework.web.bind.annotation.getmapping; import org.springframework.web.bind.annotation.restcontroller; @restcontroller public class rocketmqtestcontroller { @autowired private rocketmqproducerservice producerservice; @getmapping("/sendmessage") public string sendmessage() { string topic = "test-topic"; string message = "0001"; //发送订单编号到rocketmq long delay = 3000; producerservice.sendmessage(topic, message, delay); return "消息发送成功"; } }
优点:系统之间完全解耦,只需要关注生产及消费即可。其吞吐量极高。
缺点:rocketmq是重量级的组件,引入后,随之而来的消息丢失等问题都增加了系统的复杂度。
6、rabbitmq死信队列
当rabbitmq中的一条正常信息,因为过了存活时间(ttl过期)、队列长度超限等原因无法被消费时,就会被当成一条死信消息,投递到死信队列。基于这样的机制,我们可以给消息设置一个ttl ,等消息过期就会进入死信队列,我们再消费死信队列即可,这样,就可以达到和rocketmq一样的效果。springboot集成rabbitmq的步骤如下:
安装并启动 rabbitmq 服务
可参考rabbitmq官方文档进行安装和启动
引入依赖
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-amqp</artifactid> </dependency>
配置rabbitmq
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest
配置 rabbitmq 队列和交换机
import org.springframework.amqp.core.*; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import java.util.hashmap; import java.util.map; @configuration public class rabbitmqconfig { public static final string order_exchange = "order.exchange"; public static final string order_queue = "order.queue"; public static final string order_routing_key = "order.routing.key"; public static final string dead_letter_exchange = "dead.letter.exchange"; public static final string dead_letter_queue = "dead.letter.queue"; public static final string dead_letter_routing_key = "dead.letter.routing.key"; // 死信交换机 @bean public directexchange deadletterexchange() { return new directexchange(dead_letter_exchange); } // 死信队列 @bean public queue deadletterqueue() { return new queue(dead_letter_queue); } // 绑定死信队列和死信交换机 @bean public binding deadletterbinding() { return bindingbuilder.bind(deadletterqueue()).to(deadletterexchange()).with(dead_letter_routing_key); } // 正常交换机 @bean public directexchange orderexchange() { return new directexchange(order_exchange); } // 正常队列,设置死信交换机和路由键,以及消息ttl为30分钟(1800000毫秒) @bean public queue orderqueue() { map<string, object> args = new hashmap<>(); args.put("x-dead-letter-exchange", dead_letter_exchange); args.put("x-dead-letter-routing-key", dead_letter_routing_key); args.put("x-message-ttl", 1800000); return new queue(order_queue, true, false, false, args); } // 绑定正常队列和正常交换机 @bean public binding orderbinding() { return bindingbuilder.bind(orderqueue()).to(orderexchange()).with(order_routing_key); } }
创建消息生产者
import org.springframework.amqp.rabbit.core.rabbittemplate; import org.springframework.beans.factory.annotation.autowired; import org.springframework.stereotype.service; @service public class ordermessageproducer { @autowired private rabbittemplate rabbittemplate; public void sendordermessage(string message) { rabbittemplate.convertandsend(rabbitmqconfig.order_exchange, rabbitmqconfig.order_routing_key, message); } }
创建消息消费者
import org.springframework.amqp.rabbit.annotation.rabbitlistener; import org.springframework.stereotype.service; @service public class ordermessageconsumer { @rabbitlistener(queues = rabbitmqconfig.dead_letter_queue) public void receiveordermessage(string message) { system.out.println("收到订单: " + message); // 模拟检查订单支付状态 } }
测试
import org.springframework.beans.factory.annotation.autowired; import org.springframework.web.bind.annotation.getmapping; import org.springframework.web.bind.annotation.restcontroller; @restcontroller public class ordermessagecontroller { @autowired private ordermessageproducer ordermessageproducer; @getmapping("/sendordermessage") public string sendordermessage() { string message = "0001"; //订单编号 ordermessageproducer.sendordermessage(message); return "订单消息已发送,30分钟后处理"; } }
优点:同rocketmq一样可以使业务解耦。
缺点:rabbitmq 的 ttl 是基于队列的,而不是基于单个消息的精确时间控制。当队列中有多个消息时,即使某个消息的 ttl 已经过期,也需要等待前面的消息被处理完才能进入死信队列,导致消息的实际处理时间可能会有一定的延迟,无法保证精确的延迟时间。
到此这篇关于java中如何实现订单超时自动取消功能的文章就介绍到这了,更多相关java订单超时自动取消内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论