包含组件内容
- redisqueue:消息队列监听标识
- redisqueueinit:redis队列监听器的初始化组件(为每个带有队列注解的监听器启动监听任务)
- redisqueuelistener:redis消息队列监听实现
- redisqueueservice:redis消息队列服务工具
代码实现
redisqueue
import java.lang.annotation.elementtype;
import java.lang.annotation.retention;
import java.lang.annotation.retentionpolicy;
import java.lang.annotation.target;
/**
 * Redis message-queue annotation: placed on a type to declare the name of
 * the queue that type is associated with. Retained at runtime.
 */
@retention(retentionpolicy.runtime)
@target(elementtype.type)
public @interface redisqueue {

    /**
     * Queue name.
     */
    string value();
}
redisqueueinit
import jakarta.annotation.resource;
import java.util.list;
import java.util.map;
import java.util.concurrent.copyonwritearraylist;
import java.util.concurrent.executorservice;
import java.util.concurrent.future;
import java.util.concurrent.linkedblockingqueue;
import java.util.concurrent.threadfactory;
import java.util.concurrent.threadpoolexecutor;
import java.util.concurrent.timeunit;
import java.util.concurrent.atomic.atomicboolean;
import java.util.concurrent.atomic.atomicinteger;
import lombok.extern.slf4j.slf4j;
import org.jetbrains.annotations.notnull;
import org.redisson.redissonshutdownexception;
import org.redisson.api.rblockingqueue;
import org.redisson.api.redissonclient;
import org.springframework.beans.beansexception;
import org.springframework.context.applicationcontext;
import org.springframework.context.applicationcontextaware;
import org.springframework.stereotype.component;
/**
 * Initializes the Redis queue listeners.
 *
 * <p>When the application context is handed to this bean, every
 * {@code redisqueuelistener} bean whose class carries {@code @redisqueue}
 * gets a long-running task that blocks on the corresponding Redisson queue
 * and hands each received message to the listener on the shared pool.
 *
 * @author 十八
 * @createtime 2024-09-09 22:49
 */
@slf4j
@component
public class redisqueueinit implements applicationcontextaware {

    /** Prefix prepended (with a colon) to every queue name. */
    public static final string redis_queue_prefix = "redis-queue";

    /** Set by {@link #shutdown()}; checked by every listener loop before it blocks again. */
    final atomicboolean shutdownrequested = new atomicboolean(false);

    /** Futures of the listener loops, kept so shutdown can interrupt exactly those tasks. */
    private final list<future<?>> listenertasks = new copyonwritearraylist<>();

    @resource
    private redissonclient redissonclient;

    private executorservice executorservice;

    /**
     * Builds the full Redis key for a logical queue name.
     *
     * @param queuename logical queue name
     * @return {@code redis_queue_prefix + ":" + queuename}
     */
    public static string buildqueuename(string queuename) {
        return redis_queue_prefix + ":" + queuename;
    }

    /**
     * Looks up all {@code redisqueuelistener} beans and starts one listener
     * loop for each bean whose class is annotated with {@code @redisqueue}.
     * Nothing is created when no listener bean exists.
     *
     * @param applicationcontext context used to look up the listener beans
     * @throws beansexception declared by the implemented interface
     */
    @override
    public void setapplicationcontext(applicationcontext applicationcontext) throws beansexception {
        map<string, redisqueuelistener> queuelisteners = applicationcontext.getbeansoftype(redisqueuelistener.class);
        if (queuelisteners.isempty()) {
            return;
        }
        executorservice = createthreadpool(queuelisteners.size());
        for (redisqueuelistener listener : queuelisteners.values()) {
            // NOTE(review): the annotation is read from the bean's runtime class and
            // @redisqueue is not declared inherited, so a subclass-based proxy of the
            // listener would presumably not be detected here - confirm if AOP is used.
            redisqueue redisqueue = listener.getclass().getannotation(redisqueue.class);
            if (redisqueue != null) {
                string queuename = redisqueue.value();
                listenertasks.add(executorservice.submit(() -> listenqueue(queuename, listener)));
            }
        }
    }

    /**
     * Creates the pool shared by listener loops and consumer tasks.
     *
     * <p>Each listener loop occupies a thread for the lifetime of the
     * application, and a thread pool only grows beyond its core size once
     * its work queue is full. The core and maximum sizes therefore reserve
     * one thread per listener on top of the CPU-based sizing; otherwise
     * consumer tasks could sit in the queue behind blocked listener loops.
     *
     * @param listenercount number of listener beans found (upper bound on listener loops)
     * @return the configured executor
     */
    private executorservice createthreadpool(int listenercount) {
        int processors = runtime.getruntime().availableprocessors();
        return new threadpoolexecutor(
                listenercount + processors * 2,
                listenercount + processors * 4,
                60l, timeunit.seconds,
                new linkedblockingqueue<>(100),
                new namedthreadfactory(redis_queue_prefix),
                new threadpoolexecutor.callerrunspolicy()
        );
    }

    /**
     * Listener loop: blocks on the queue and submits every received message
     * to the pool for consumption. The loop ends when shutdown was requested,
     * the Redisson client is shut down, or the thread is interrupted.
     *
     * @param queuename          logical queue name (without prefix)
     * @param redisqueuelistener listener that consumes the messages
     */
    private void listenqueue(string queuename, redisqueuelistener redisqueuelistener) {
        final string fullname = buildqueuename(queuename);
        rblockingqueue<?> blockingqueue = redissonclient.getblockingqueue(fullname);
        log.info("redis队列监听开启: {}", fullname);
        while (!shutdownrequested.get() && !redissonclient.isshutdown()) {
            try {
                object message = blockingqueue.take();
                executorservice.submit(() -> consumesafely(fullname, redisqueuelistener, message));
            } catch (interruptedexception e) {
                // Interruption is the stop signal sent by shutdown(): restore the flag and leave
                // instead of logging an error and blocking on take() again.
                thread.currentthread().interrupt();
                log.info("监听线程被中断,停止监听队列: {}", fullname);
                break;
            } catch (redissonshutdownexception e) {
                log.info("redis连接关闭,停止监听队列: {}", fullname);
                break;
            } catch (exception e) {
                log.error("监听队列异常: {}", fullname, e);
            }
        }
    }

    /**
     * Invokes the listener and logs any failure. Without this, an exception
     * thrown by the listener would only be stored in the future returned by
     * {@code submit}, which nobody reads.
     */
    private void consumesafely(string queuename, redisqueuelistener listener, object message) {
        try {
            listener.consume(message);
        } catch (exception e) {
            log.error("消费队列消息异常: {}", queuename, e);
        }
    }

    /**
     * Stops listening and releases resources, in this order: raise the stop
     * flag and interrupt the listener loops, let already-submitted consumer
     * tasks finish (up to 60 seconds), then shut down the Redisson client.
     */
    public void shutdown() {
        // Must happen before waiting on the pool: the listener loops never end on
        // their own, so waiting first would always run into the full timeout.
        shutdownrequested.set(true);
        for (future<?> listenertask : listenertasks) {
            listenertask.cancel(true);
        }
        listenertasks.clear();
        if (executorservice != null) {
            executorservice.shutdown();
            try {
                if (!executorservice.awaittermination(60, timeunit.seconds)) {
                    executorservice.shutdownnow();
                }
            } catch (interruptedexception ex) {
                executorservice.shutdownnow();
                thread.currentthread().interrupt();
            }
        }
        if (redissonclient != null && !redissonclient.isshuttingdown()) {
            redissonclient.shutdown();
        }
    }

    /** Thread factory producing threads named {@code <prefix>-<sequence>}. */
    private static class namedthreadfactory implements threadfactory {
        private final atomicinteger threadnumber = new atomicinteger(1);
        private final string nameprefix;

        public namedthreadfactory(string prefix) {
            this.nameprefix = prefix;
        }

        @override
        public thread newthread(@notnull runnable r) {
            return new thread(r, nameprefix + "-" + threadnumber.getandincrement());
        }
    }
}
redisqueuelistener
/**
 * Redis message-queue listener contract: implementations receive the
 * messages taken from a queue.
 *
 * @param <t> type of the message content
 * @author 十八
 * @createtime 2024-09-09 22:51
 */
public interface redisqueuelistener<t> {
/**
 * Consumes one message from the queue.
 *
 * @param content message content
 */
void consume(t content);
}
redisqueueservice
import jakarta.annotation.resource;
import java.util.concurrent.timeunit;
import org.redisson.api.rblockingqueue;
import org.redisson.api.rdelayedqueue;
import org.redisson.api.redissonclient;
import org.springframework.stereotype.component;
/**
 * Redis message-queue service: publishes immediate and delayed messages to
 * the queues named via {@code redisqueueinit.buildqueuename}.
 *
 * @author 十八
 * @createtime 2024-09-09 22:52
 */
@component
public class redisqueueservice {

    @resource
    private redissonclient redissonclient;

    /**
     * Adds a message to a queue.
     *
     * @param queuename queue name (without prefix)
     * @param content   message
     * @param <t>       message type
     */
    public <t> void send(string queuename, t content) {
        rblockingqueue<t> target = queueof(queuename);
        target.add(content);
    }

    /**
     * Adds a message to a queue after a delay.
     *
     * @param queuename queue name (without prefix)
     * @param content   message
     * @param delay     delay amount
     * @param timeunit  unit of {@code delay}
     * @param <t>       message type
     */
    public <t> void senddelay(string queuename, t content, long delay, timeunit timeunit) {
        offerdelayed(queuename, content, delay, timeunit);
    }

    /**
     * Adds a message to a queue after a delay given in milliseconds.
     *
     * @param queuename queue name (without prefix)
     * @param content   message
     * @param delay     delay in milliseconds
     * @param <t>       message type
     */
    public <t> void senddelay(string queuename, t content, long delay) {
        offerdelayed(queuename, content, delay, java.util.concurrent.timeunit.milliseconds);
    }

    /** Resolves the blocking queue stored under the prefixed name. */
    private <t> rblockingqueue<t> queueof(string queuename) {
        string fullname = redisqueueinit.buildqueuename(queuename);
        return redissonclient.getblockingqueue(fullname);
    }

    /** Offers the message to the delayed queue that feeds the named blocking queue. */
    private <t> void offerdelayed(string queuename, t content, long delay, timeunit unit) {
        rblockingqueue<t> destination = queueof(queuename);
        rdelayedqueue<t> delayed = redissonclient.getdelayedqueue(destination);
        delayed.offer(content, delay, unit);
    }
}
测试
创建监听对象
import cn.yiyanc.infrastructure.redis.annotation.redisqueue;
import cn.yiyanc.infrastructure.redis.queue.redisqueuelistener;
import lombok.extern.slf4j.slf4j;
import org.springframework.stereotype.component;
/**
 * Sample listener bound to the {@code "test"} queue; logs every message it receives.
 *
 * @author 十八
 * @createtime 2024-09-10 00:09
 */
@slf4j
@component
@redisqueue("test")
public class testlistener implements redisqueuelistener<string> {

    /**
     * Logs the received message.
     *
     * @param content message content
     */
    @override
    public void consume(string content) {
        // The interface method is consume; the previous name (invoke) overrode nothing
        // and left the interface method unimplemented.
        log.info("队列消息接收 >>> {}", content);
    }
}
测试用例
import jakarta.annotation.resource;
import org.springframework.web.bind.annotation.postmapping;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.restcontroller;
/**
 * Demo endpoint that publishes one immediate and one delayed message to the
 * {@code "test"} queue.
 *
 * @author 十八
 * @createtime 2024-09-10 00:11
 */
@restcontroller
@requestmapping("queue")
public class queuecontroller {

    @resource
    private redisqueueservice redisqueueservice;

    /**
     * Sends {@code message} right away and a prefixed copy after a delay of
     * 1000 (milliseconds, per the three-argument {@code senddelay}).
     *
     * @param message text to publish
     */
    @postmapping("send")
    public void send(string message) {
        final string queue = "test";
        redisqueueservice.send(queue, message);

        string delayedtext = "delay messaege -> " + message;
        redisqueueservice.senddelay(queue, delayedtext, 1000);
    }
}
测试结果

到此这篇关于springboot集成redisson实现消息队列的示例代码的文章就介绍到这了。更多相关springboot redisson消息队列内容,请搜索代码网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持代码网!
发表评论