1、先给项目导入redisson依赖
<dependency> <groupid>org.redisson</groupid> <artifactid>redisson-spring-boot-starter</artifactid> <version>3.15.4</version> </dependency>
2、配置redis
3、创建 redissonconfig 配置
4、封装 redis 延迟队列工具类
import lombok.extern.slf4j.slf4j; import org.redisson.api.rblockingdeque; import org.redisson.api.rdelayedqueue; import org.redisson.api.redissonclient; import org.springframework.beans.factory.annotation.autowired; import org.springframework.stereotype.component; import java.util.map; import java.util.concurrent.timeunit; /** * @author bigboss */ @slf4j @component public class redisdelayqueueutil { @autowired private redissonclient redissonclient; /** * 添加延迟队列 * * @param value 队列值 * @param delay 延迟时间 * @param timeunit 时间单位 * @param queuecode 队列键 * @param <t> */ public <t> void adddelayqueue(t value, long delay, timeunit timeunit, string queuecode) { try { rblockingdeque<object> blockingdeque = redissonclient.getblockingdeque(queuecode); rdelayedqueue<object> delayedqueue = redissonclient.getdelayedqueue(blockingdeque); //删除之前进来的id值 delayedqueue.remove(value); delayedqueue.offer(value, delay, timeunit); log.info("(添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}", queuecode, value, timeunit.toseconds(delay) + "秒"); } catch (exception e) { log.error("(添加延时队列失败) {}", e.getmessage()); throw new runtimeexception("(添加延时队列失败)"); } } /** * 获取延迟队列 * * @param queuecode * @param <t> * @return * @throws interruptedexception */ public <t> t getdelayqueue(string queuecode) throws interruptedexception { rblockingdeque<map> blockingdeque = redissonclient.getblockingdeque(queuecode); t value = (t) blockingdeque.take(); return value; } /** * 删除延迟队列 * * @param queuecode * @param <t> * @return */ public <t> t removedelayqueue(t value, string queuecode) { rblockingdeque<object> blockingdeque = redissonclient.getblockingdeque(queuecode); rdelayedqueue<object> delayedqueue = redissonclient.getdelayedqueue(blockingdeque); delayedqueue.remove(value); return value; } }
5、创建延迟队列业务枚举
import lombok.allargsconstructor; import lombok.getter; import lombok.noargsconstructor; /** * @author: bigboss369 * @created: 2022/8/11/011 */ @getter @noargsconstructor @allargsconstructor public enum redisdelayqueueenum { pengyuyan("pengyuyan","心跳", "pengyuyan"); /** * 延迟队列 redis key */ private string code; /** * 中文描述 */ private string name; /** * 延迟队列具体业务实现的 bean * 可通过 spring 的上下文获取 */ private string beanid; }
6、定义延迟队列执行器
/** * @author: bigboss369 * @created: 2022/8/11/011 */ public interface redisdelayqueuehandle<t> { void execute(t t); }
7、创建枚举中定义的bean,并实现延迟队列执行器
import com.a.service.redisdelayqueuehandle; import lombok.extern.slf4j.slf4j; import org.springframework.stereotype.component; import java.util.map; @component(value = "pengyuyan") @slf4j public class pengyuyan implements redisdelayqueuehandle<map> { @override public void execute(map map) { log.info("收到心跳延迟消息map:" + map); } }
8、创建延迟队列消费线程,项目启动完成后开启
import com.a.service.redisdelayqueuehandle; import com.a.util.redisdelayqueueutil; import lombok.extern.slf4j.slf4j; import org.springframework.beans.factory.annotation.autowired; import org.springframework.boot.commandlinerunner; import org.springframework.stereotype.component; /** * @author bigboss */ @slf4j @component public class redisdelayqueuerunner implements commandlinerunner { @autowired private redisdelayqueueutil redisdelayqueueutil; @override public void run(string... args) { redisdelayqueueenum[] queueenums = redisdelayqueueenum.values(); for (redisdelayqueueenum queueenum : queueenums) { new thread(() -> { try { while (true) { object value = redisdelayqueueutil.getdelayqueue(queueenum.getcode()); log.debug("----value-->" + value); redisdelayqueuehandle redisdelayqueuehandle = springutil.getbean(queueenum.getbeanid()); redisdelayqueuehandle.execute(value); } } catch (interruptedexception e) { log.error("(redis延迟队列异常中断) {}", e.getmessage()); } }).start(); } log.info("(redis延迟队列启动成功)"); } }
9、创建一个测试接口,模拟添加延迟队列
发送心跳到队列里
类成功接收到心跳包
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论