引入
<!-- Place inside the <properties> section of pom.xml: pins the Redisson version in one spot. -->
<redisson.version>3.15.5</redisson.version>

<!-- Place inside the <dependencies> section of pom.xml.
     Maven element names are case-sensitive: groupId / artifactId. -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>${redisson.version}</version>
</dependency>
放入延时队列
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;

import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

@Autowired
private RedissonClient redissonClient;

/** Name of the Redis blocking deque that the listener consumes from. */
public static final String cardKitMessageDelayQueue = "queue:card_kit";

// Send a delayed message: the element is offered to the delayed queue that
// wraps the blocking deque, together with the delay to apply.
RBlockingDeque<CardKitRedisBO> blockingDeque =
        redissonClient.getBlockingDeque(cardKitMessageDelayQueue);
RDelayedQueue<CardKitRedisBO> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);

// Delay, in seconds, between now and the requested send time.
// NOTE(review): calculateDifference is defined elsewhere; assumed to return
// (sendTime - now) in seconds — confirm the argument order.
// Clamp to zero so that a send time already in the past results in immediate
// delivery instead of a negative delay being passed to offer().
long delayInSeconds =
        Math.max(0L, calculateDifference(model.getSendTime(), LocalDateTime.now()));

CardKitRedisBO cardKitRedisBO = new CardKitRedisBO();
cardKitRedisBO.setId(model.getId()).setTemplateId(model.getTemplateId());
delayedQueue.offer(cardKitRedisBO, delayInSeconds, TimeUnit.SECONDS);
监听延时队列
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
@slf4j
@component
public class cardkitmessagelistener implements applicationrunner {
public static final string cardkitmessagedelayqueue = "queue:card_kit";
public static final string cardkitmessagedelaylock = "lock:card_kit";
@resource
private redissonclient redissonclient;
@autowired
private tracer tracer;
@autowired
private cardkitservice cardkitservice;
@override
public void run(applicationarguments args) {
new thread(() -> {
rblockingdeque<cardkitredisbo> blockingdeque = redissonclient.getblockingdeque(cardkitmessagedelayqueue);
while (true) {
// 获取定时任务锁
rlock rlock = redissonclient.getlock(cardkitmessagedelaylock);
try {
// 最多等待5秒
boolean islocked = rlock.trylock(5, timeunit.seconds);
if (islocked) {
span span = tracer.nextspan().name("occupationmessage").start();
try (tracer.spaninscope ws = tracer.withspan(span)) {
cardkitredisbo poll = blockingdeque.take();
log.info("获取延时消息:{}", jsonutil.tojsonstr(poll));
// 消费消息
cardkitservice.sendcardkit(poll);
} finally {
try {
rlock.unlock();
} catch (exception ex) {
log.warn("锁释放失败:" + ex.getmessage());
}
try {
span.end();
} catch (exception ex) {
log.error("失败", ex)
}
}
}
} catch (exception ex) {
log.error("延迟消息处理异常:" + ex.getmessage(), ex);
}
}
}).start();
}
}到此这篇关于redis延时队列的文章就介绍到这了,更多相关redis延时队列内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论