1、先在项目的 pom.xml 中导入 Redisson 依赖
<!-- Redisson Spring Boot starter. Maven element names are case-sensitive: groupId / artifactId. -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.15.4</version>
</dependency>
2、配置redis

3、创建 RedissonConfig 配置类

4、封装 redis 延迟队列工具类
import lombok.extern.slf4j.slf4j;
import org.redisson.api.rblockingdeque;
import org.redisson.api.rdelayedqueue;
import org.redisson.api.redissonclient;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.component;
import java.util.map;
import java.util.concurrent.timeunit;
/**
* @author bigboss
*/
@slf4j
@component
public class redisdelayqueueutil {
@autowired
private redissonclient redissonclient;
/**
* 添加延迟队列
*
* @param value 队列值
* @param delay 延迟时间
* @param timeunit 时间单位
* @param queuecode 队列键
* @param <t>
*/
public <t> void adddelayqueue(t value, long delay, timeunit timeunit, string queuecode) {
try {
rblockingdeque<object> blockingdeque = redissonclient.getblockingdeque(queuecode);
rdelayedqueue<object> delayedqueue = redissonclient.getdelayedqueue(blockingdeque);
//删除之前进来的id值
delayedqueue.remove(value);
delayedqueue.offer(value, delay, timeunit);
log.info("(添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}", queuecode, value, timeunit.toseconds(delay) + "秒");
} catch (exception e) {
log.error("(添加延时队列失败) {}", e.getmessage());
throw new runtimeexception("(添加延时队列失败)");
}
}
/**
* 获取延迟队列
*
* @param queuecode
* @param <t>
* @return
* @throws interruptedexception
*/
public <t> t getdelayqueue(string queuecode) throws interruptedexception {
rblockingdeque<map> blockingdeque = redissonclient.getblockingdeque(queuecode);
t value = (t) blockingdeque.take();
return value;
}
/**
* 删除延迟队列
*
* @param queuecode
* @param <t>
* @return
*/
public <t> t removedelayqueue(t value, string queuecode) {
rblockingdeque<object> blockingdeque = redissonclient.getblockingdeque(queuecode);
rdelayedqueue<object> delayedqueue = redissonclient.getdelayedqueue(blockingdeque);
delayedqueue.remove(value);
return value;
}
}
5、创建延迟队列业务枚举
import lombok.allargsconstructor;
import lombok.getter;
import lombok.noargsconstructor;
/**
 * Catalogue of the business delay queues. Each constant ties a Redis queue key
 * to the name of the Spring bean that handles messages taken from that queue.
 *
 * @author: bigboss369
 * @created: 2022/8/11/011
 */
@getter
@allargsconstructor
public enum redisdelayqueueenum {
    pengyuyan("pengyuyan", "心跳", "pengyuyan");

    /**
     * Redis key of the delay queue.
     */
    private final string code;
    /**
     * Human-readable (Chinese) description of the queue.
     */
    private final string name;
    /**
     * Name of the bean implementing the business logic for this queue;
     * it can be looked up through the Spring context.
     */
    private final string beanid;
}
6、定义延迟队列执行器
/**
 * Callback contract for delay-queue consumers: an implementation receives
 * each value taken from its queue and performs the business processing.
 *
 * @param <t> type of the value delivered by the queue
 * @author: bigboss369
 * @created: 2022/8/11/011
 */
public interface redisdelayqueuehandle<t> {

    /**
     * Processes one value delivered by the delay queue.
     *
     * @param t the delivered value
     */
    void execute(t t);

}
7、创建枚举中定义的bean,并实现延迟队列执行器
import com.a.service.redisdelayqueuehandle;
import lombok.extern.slf4j.slf4j;
import org.springframework.stereotype.component;
import java.util.map;
/**
 * Handler for the heartbeat delay queue. It is registered under the bean name
 * "pengyuyan", the same value the {@code redisdelayqueueenum.pengyuyan}
 * constant declares as its bean id.
 */
@component(value = "pengyuyan")
@slf4j
public class pengyuyan implements redisdelayqueuehandle<map> {

    /**
     * Logs the heartbeat message taken from the delay queue.
     *
     * @param map the delivered message
     */
    @override
    public void execute(map map) {
        // Parameterized logging: the message is only rendered when INFO is enabled.
        log.info("收到心跳延迟消息map:{}", map);
    }
}
8、创建延迟队列消费线程,项目启动完成后开启
import com.a.service.redisdelayqueuehandle;
import com.a.util.redisdelayqueueutil;
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.commandlinerunner;
import org.springframework.stereotype.component;
/**
* @author bigboss
*/
@slf4j
@component
public class redisdelayqueuerunner implements commandlinerunner {
@autowired
private redisdelayqueueutil redisdelayqueueutil;
@override
public void run(string... args) {
redisdelayqueueenum[] queueenums = redisdelayqueueenum.values();
for (redisdelayqueueenum queueenum : queueenums) {
new thread(() -> {
try {
while (true) {
object value = redisdelayqueueutil.getdelayqueue(queueenum.getcode());
log.debug("----value-->" + value);
redisdelayqueuehandle redisdelayqueuehandle = springutil.getbean(queueenum.getbeanid());
redisdelayqueuehandle.execute(value);
}
} catch (interruptedexception e) {
log.error("(redis延迟队列异常中断) {}", e.getmessage());
}
}).start();
}
log.info("(redis延迟队列启动成功)");
}
}
9、创建一个测试接口,模拟添加延迟队列

发送心跳到队列里

延迟时间到期后,pengyuyan 处理类成功接收到心跳包

总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论