当前位置: 代码网 > it编程>编程语言>Java > SpringBoot集成Redisson实现消息队列的示例代码

SpringBoot集成Redisson实现消息队列的示例代码

2024年10月10日 Java 我要评论
包含组件内容:redisqueue(消息队列监听标识)、redisqueueinit(redis队列监听器)、redisqueuelistener(redis消息队列监听实现)、redisqueueservice(redis消息队列服务工具)。

包含组件内容

  • redisqueue:消息队列监听标识
  • redisqueueinit:redis队列监听器
  • redisqueuelistener:redis消息队列监听实现
  • redisqueueservice:redis消息队列服务工具

代码实现

redisqueue

import java.lang.annotation.elementtype;
import java.lang.annotation.retention;
import java.lang.annotation.retentionpolicy;
import java.lang.annotation.target;

/**
 * Redis message queue annotation.
 *
 * <p>Applicable to types only and retained at runtime, so it can be read reflectively.
 * Its value names the queue that the annotated class is bound to.
 */
@target(elementtype.type)
@retention(retentionpolicy.runtime)
public @interface redisqueue {
    /**
     * Queue name.
     */
    string value();
}

redisqueueinit

import jakarta.annotation.resource;
import java.util.map;
import java.util.concurrent.executorservice;
import java.util.concurrent.linkedblockingqueue;
import java.util.concurrent.threadfactory;
import java.util.concurrent.threadpoolexecutor;
import java.util.concurrent.timeunit;
import java.util.concurrent.atomic.atomicboolean;
import java.util.concurrent.atomic.atomicinteger;
import lombok.extern.slf4j.slf4j;
import org.jetbrains.annotations.notnull;
import org.redisson.redissonshutdownexception;
import org.redisson.api.rblockingqueue;
import org.redisson.api.redissonclient;
import org.springframework.beans.beansexception;
import org.springframework.context.applicationcontext;
import org.springframework.context.applicationcontextaware;
import org.springframework.stereotype.component;

/**
 * Initializes the Redis queue listeners.
 *
 * <p>When the application context is handed to this bean, every {@code redisqueuelistener} bean
 * carrying a {@code @redisqueue} annotation gets one long-running listener task. Each listener
 * polls its Redis blocking queue and dispatches every received message to the shared thread pool,
 * where the listener's {@code consume} method is invoked.
 *
 * <p>NOTE(review): nothing in this class invokes {@link #shutdown()} automatically (there is no
 * lifecycle annotation on it), so the owner of this bean has to call it on application stop.
 *
 * @author 十八
 * @createtime 2024-09-09 22:49
 */
@slf4j
@component
public class redisqueueinit implements applicationcontextaware {

    public static final string redis_queue_prefix = "redis-queue";

    /**
     * Upper bound, in seconds, for a single blocking poll. A bounded wait lets each listener
     * re-check the shutdown flag regularly instead of blocking forever on an empty queue.
     */
    private static final long poll_timeout_seconds = 5;

    final atomicboolean shutdownrequested = new atomicboolean(false);
    @resource
    private redissonclient redissonclient;
    private executorservice executorservice;

    /**
     * Builds the Redis key of a queue by prepending the common prefix.
     *
     * @param queuename logical queue name
     * @return {@code "redis-queue:" + queuename}
     */
    public static string buildqueuename(string queuename) {
        return redis_queue_prefix + ":" + queuename;
    }

    /**
     * Discovers all listener beans and starts one listener task for each annotated one.
     * Does nothing (and creates no thread pool) when the context holds no listener bean.
     *
     * @param applicationcontext the context this bean runs in
     * @throws beansexception propagated from the bean lookup
     */
    @override
    public void setapplicationcontext(applicationcontext applicationcontext) throws beansexception {
        map<string, redisqueuelistener> queuelisteners = applicationcontext.getbeansoftype(redisqueuelistener.class);
        if (queuelisteners.isempty()) {
            return;
        }
        executorservice = createthreadpool(queuelisteners.size());
        for (map.entry<string, redisqueuelistener> entry : queuelisteners.entryset()) {
            // Resolve the annotation through the context rather than getclass(): a proxied bean
            // reports the proxy subclass, on which a non-inherited annotation is not visible.
            redisqueue annotation = applicationcontext.findannotationonbean(entry.getkey(), redisqueue.class);
            if (annotation == null) {
                continue;
            }
            string queuename = annotation.value();
            redisqueuelistener listener = entry.getvalue();
            executorservice.submit(() -> listenqueue(queuename, listener));
        }
    }

    /**
     * Creates the pool shared by listener loops and message consumers.
     *
     * @param listenercount number of listener loops that will permanently occupy a thread each;
     *                      added on top of the consumer capacity so that consumers are never
     *                      starved of core threads by the listeners
     * @return the configured executor
     */
    private executorservice createthreadpool(int listenercount) {
        int processors = runtime.getruntime().availableprocessors();
        return new threadpoolexecutor(
                listenercount + processors * 2,
                listenercount + processors * 4,
                60l, timeunit.seconds,
                new linkedblockingqueue<>(100),
                new namedthreadfactory(redis_queue_prefix),
                new threadpoolexecutor.callerrunspolicy()
        );
    }

    /**
     * Listener loop for a single queue. Runs until shutdown is requested, the Redisson client
     * is shut down, or the listening thread is interrupted.
     *
     * @param queuename logical queue name (without prefix)
     * @param listener  the listener whose {@code consume} receives each message
     */
    private void listenqueue(string queuename, redisqueuelistener listener) {
        final string fullqueuename = buildqueuename(queuename);
        rblockingqueue<?> blockingqueue = redissonclient.getblockingqueue(fullqueuename);
        log.info("redis队列监听开启: {}", fullqueuename);
        while (!shutdownrequested.get() && !redissonclient.isshutdown()) {
            try {
                object message = blockingqueue.poll(poll_timeout_seconds, timeunit.seconds);
                if (message == null) {
                    // Poll timed out on an empty queue; loop to re-check the stop conditions.
                    continue;
                }
                if (executorservice.isshutdown()) {
                    // The caller-runs rejection policy silently drops tasks once the pool is
                    // shut down; consume inline so an already-dequeued message is not lost.
                    consumesafely(fullqueuename, listener, message);
                } else {
                    executorservice.submit(() -> consumesafely(fullqueuename, listener, message));
                }
            } catch (interruptedexception e) {
                // Restore the flag and stop: an interrupt is a request to terminate this loop.
                thread.currentthread().interrupt();
                log.info("监听线程被中断,停止监听队列: {}", fullqueuename);
                break;
            } catch (redissonshutdownexception e) {
                log.info("redis连接关闭,停止监听队列: {}", fullqueuename);
                break;
            } catch (exception e) {
                log.error("监听队列异常: {}", fullqueuename, e);
            }
        }
    }

    /**
     * Invokes the listener and logs any failure. Without this, an exception thrown by
     * {@code consume} would be captured by the discarded future and never surface.
     *
     * @param fullqueuename prefixed queue name, used for logging only
     * @param listener      the listener to invoke
     * @param message       the dequeued message
     */
    @suppresswarnings("unchecked") // listener beans are looked up as a raw type; the payload type is only known at runtime
    private void consumesafely(string fullqueuename, redisqueuelistener listener, object message) {
        try {
            listener.consume(message);
        } catch (exception e) {
            log.error("消费队列消息异常: {}", fullqueuename, e);
        }
    }

    /**
     * Stops the listeners, waits up to 60 seconds for in-flight work, then shuts down the
     * Redisson client if it is not already shutting down.
     */
    public void shutdown() {
        // Raise the flag first so listener loops end at their next poll timeout; otherwise
        // the wait below can never succeed because the loops keep their threads busy.
        shutdownrequested.set(true);
        if (executorservice != null) {
            executorservice.shutdown();
            try {
                if (!executorservice.awaittermination(60, timeunit.seconds)) {
                    executorservice.shutdownnow();
                }
            } catch (interruptedexception ex) {
                executorservice.shutdownnow();
                thread.currentthread().interrupt();
            }
        }
        if (redissonclient != null && !redissonclient.isshuttingdown()) {
            redissonclient.shutdown();
        }
    }

    /**
     * Thread factory that names threads {@code <prefix>-<sequence>}, with the sequence
     * starting at 1.
     */
    private static class namedthreadfactory implements threadfactory {
        private final atomicinteger threadnumber = new atomicinteger(1);
        private final string nameprefix;

        public namedthreadfactory(string prefix) {
            this.nameprefix = prefix;
        }

        @override
        public thread newthread(@notnull runnable r) {
            return new thread(r, nameprefix + "-" + threadnumber.getandincrement());
        }
    }

}

redisqueuelistener

/**
 * Listener contract for a Redis message queue.
 *
 * @param <t> type of the message content passed to {@link #consume}
 * @author 十八
 * @createtime 2024-09-09 22:51
 */
public interface redisqueuelistener<t> {

    /**
     * Consumes one message taken from the queue.
     *
     * @param content message content
     */
    void consume(t content);
}

redisqueueservice

import jakarta.annotation.resource;
import java.util.concurrent.timeunit;
import org.redisson.api.rblockingqueue;
import org.redisson.api.rdelayedqueue;
import org.redisson.api.redissonclient;
import org.springframework.stereotype.component;

/**
 * Producer-side helper for the Redis message queues: publishes messages either
 * immediately or after a delay.
 *
 * @author 十八
 * @createtime 2024-09-09 22:52
 */
@component
public class redisqueueservice {

    @resource
    private redissonclient redissonclient;

    /**
     * Appends a message to the queue.
     *
     * @param queuename logical queue name (the common prefix is added internally)
     * @param content   message to enqueue
     * @param <t>       message type
     */
    public <t> void send(string queuename, t content) {
        rblockingqueue<t> destination = targetqueue(queuename);
        destination.add(content);
    }

    /**
     * Publishes a message that is offered through a delayed queue bound to the target queue.
     *
     * @param queuename logical queue name (the common prefix is added internally)
     * @param content   message to enqueue
     * @param delay     delay amount
     * @param timeunit  unit of {@code delay}
     * @param <t>       message type
     */
    public <t> void senddelay(string queuename, t content, long delay, timeunit timeunit) {
        offerdelayed(queuename, content, delay, timeunit);
    }

    /**
     * Publishes a delayed message with the delay expressed in milliseconds.
     *
     * @param queuename logical queue name (the common prefix is added internally)
     * @param content   message to enqueue
     * @param delay     delay in milliseconds
     * @param <t>       message type
     */
    public <t> void senddelay(string queuename, t content, long delay) {
        offerdelayed(queuename, content, delay, timeunit.milliseconds);
    }

    /** Resolves the blocking queue stored under the prefixed name of {@code queuename}. */
    private <t> rblockingqueue<t> targetqueue(string queuename) {
        string rediskey = redisqueueinit.buildqueuename(queuename);
        return redissonclient.getblockingqueue(rediskey);
    }

    /** Wraps the target queue in a delayed queue and offers {@code content} with the given delay. */
    private <t> void offerdelayed(string queuename, t content, long delay, timeunit unit) {
        rblockingqueue<t> destination = targetqueue(queuename);
        rdelayedqueue<t> delayed = redissonclient.getdelayedqueue(destination);
        delayed.offer(content, delay, unit);
    }
}

测试

创建监听对象

import cn.yiyanc.infrastructure.redis.annotation.redisqueue;
import cn.yiyanc.infrastructure.redis.queue.redisqueuelistener;
import lombok.extern.slf4j.slf4j;
import org.springframework.stereotype.component;

/**
 * Sample listener bound to the {@code "test"} queue; logs every message it receives.
 *
 * @author 十八
 * @createtime 2024-09-10 00:09
 */
@slf4j
@component
@redisqueue("test")
public class testlistener implements redisqueuelistener<string> {
    /**
     * Logs the received message.
     *
     * @param content message content
     */
    @override
    public void consume(string content) {
        // The interface method is consume(t); overriding any other name does not implement it.
        log.info("队列消息接收 >>> {}", content);
    }
}

测试用例

import jakarta.annotation.resource;
import org.springframework.web.bind.annotation.postmapping;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.restcontroller;

/**
 * Demo endpoint that publishes one immediate and one delayed message to the test queue.
 *
 * @author 十八
 * @createtime 2024-09-10 00:11
 */
@restcontroller
@requestmapping("queue")
public class queuecontroller {

    /** Queue the demo messages are published to. */
    private static final string queue_name = "test";

    /** Delay applied to the second message, in milliseconds. */
    private static final long delay_millis = 1000;

    @resource
    private redisqueueservice redisqueueservice;

    /**
     * Sends {@code message} immediately, then sends a prefixed copy of it with a delay.
     *
     * @param message text to publish
     */
    @postmapping("send")
    public void send(string message) {
        redisqueueservice.send(queue_name, message);
        // NOTE(review): "messaege" is misspelled in the payload; kept as-is because it is runtime text.
        string delayedpayload = "delay messaege -> " + message;
        redisqueueservice.senddelay(queue_name, delayedpayload, delay_millis);
    }

}

测试结果

到此这篇关于SpringBoot集成Redisson实现消息队列的示例代码的文章就介绍到这了,更多相关SpringBoot Redisson消息队列内容请搜索代码网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com