当前位置: 代码网 > it编程>编程语言>Java > Springboot使用RabbitMq延迟队列和死信队列详解

Springboot使用RabbitMq延迟队列和死信队列详解

2025年10月24日 Java 我要评论
前言在最近的项目中,结合minio文件服务器的一些特性。需要做一个分片上传的功能:用户上传文件到md5的桶下,合并文件后删除这个临时桶。会出现这样一种情况,用户上传文件传到一半就不再上传了,那么如何去

前言

在最近的项目中,结合minio文件服务器的一些特性。

需要做一个分片上传的功能:用户上传文件到md5的桶下,合并文件后删除这个临时桶。

会出现这样一种情况,用户上传文件传到一半就不再上传了,那么如何去删除,什么时候去删除时需要解决问题。

一、业务解决方案

1.quartz定时器

如果是单体项目,可以考虑使用quartz定时器。在创建桶的时候加入到定时任务里。

2.redis定时器

redis定时器需要修改配置文件,并且对redis进行监听,在创建桶时,设置过时时间,一旦时间超时,可以对key进行捕捉,最好对名字进行规范设计以便于业务

3.mq消息队列

使用延迟队列和死信队列进行定时任务

这篇主要讲解mq的方式解决问题

二、rabbitmq延迟队列

1.延迟队列

延迟队列也是一个普通的队列,和普通的队列相比,他多了几个属性,比如:

1)延迟的时间:表示队列中消息的生命周期,在指定时间后,要么抛弃这个消息,要么投递到死信队列中

2)指定死信交换机:如果不希望丢弃这个消息,那么可以将这个过期的消息丢到死信队列中

定义一个延迟队列

    //桶延迟队列
    @bean(bucket_ttl_queue)
    public queue bucketttlqueue(){
        map<string,object> deadparamsmap = new hashmap<>();
        // 设置死信队列的exchange
        deadparamsmap.put("x-dead-letter-exchange",bucket_dead_exchange);
        //设置死信队列的routekey
        deadparamsmap.put("x-dead-letter-routing-key",bucket_dead_queue);
        // 设置对接过期时间"x-message-ttl"
        deadparamsmap.put("x-message-ttl",60000*5);//5分钟
        // 设置对接可以存储的最大消息数量
        //deadparamsmap.put("x-max-length",10);
        return new queue(bucket_ttl_queue,true,false,false,deadparamsmap);
    }

延迟队列交换机

如上所说,延迟队列本就是一个普通的队列,如果你想更细粒的对他进行控制,那么需要绑定交换机,如果不绑定交换机,会绑定到默认交换机,在发送消息时,交换机写""就行,默认交换机为直连交换机

我这里指定了延迟队列的交换机,因为没有做消息幂等性,所以采用直连交换机应对在集群下消息只被消费一次

    //桶延迟交换机
    @bean(bucket_ttl_exchange)
    public directexchange bucketttlexchange() {
        return new directexchange(bucket_ttl_exchange,true,false);
    }

    // 绑定
    @bean
    public binding bucketttlbinding() {
        return bindingbuilder.bind(bucketttlqueue())
                .to(bucketttlexchange())
                .with(bucket_ttl_queue);
    }

2.死信交换机

dlx也是一个正常的exchange,和一般的exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。

当这个队列中有死信时,rabbitmq就会自动的将这个消息重新发布到设置的exchange上去,进而被路由到另一个队列

死信交换机也是普通交换机,他只是你指定接收过期消息的交换机而已

    /**
     * 死信队列
     *
     * @return
     */
    @bean(bucket_dead_queue)
    public queue bucketdeadqueue() {
        //属性参数 队列名称 是否持久化
        return new queue(bucket_dead_queue, true);
    }

    /**
     * 死信队列交换机
     *
     * @return
     */
    @bean(bucket_dead_exchange)
    public directexchange bucketdeadexchange() {
        return new directexchange(bucket_dead_exchange,true,false);
    }

    /**
     * 给死信队列绑定交换机
     *
     * @return
     */
    @bean
    public binding bucketdeadbinding() {
        return bindingbuilder.bind(bucketdeadqueue()).to(bucketdeadexchange()).with(bucket_dead_queue);
    }

3.监听器

消息处理的逻辑,在消息过期后,送到死信交换机里,监听器监听到死信交换机的消息进行删除桶以及文件的业务逻辑处理

/**
 * @description:死信队列监听器,用来删除过期的桶
 * @author manchao
 * @date 2022/2/17 9:52
 */
@configuration
public class bucketdeadconsumer {

    @autowired
    private cachingconnectionfactory cachingconnectionfactory;

    @autowired
    private miniotemplate miniotemplate;

    @bean
    public simplemessagelistenercontainer bucketdeadlistenercontainer() {
        simplemessagelistenercontainer container = new  simplemessagelistenercontainer(cachingconnectionfactory);
        // 监听队列名
        container.setqueuenames(mymqconfig.bucket_dead_queue);
        // 当前消费者数量 开启几个线程去处理数据 支持运行时动态修改
        container.setconcurrentconsumers(5);
        // 最大消费者数量  ,  消息堵塞太多的时候,会帮我自动扩展到我的最大消费者数量
        container.setmaxconcurrentconsumers(10);
        // 是否重回队列
        container.setdefaultrequeuerejected(true);
        // 手动确认
        container.setacknowledgemode(acknowledgemode.manual);
        // 设置监听器
        container.setmessagelistener(new channelawaremessagelistener(){
            @override
            public void onmessage(message message, channel channel) throws ioexception {
                // 消息的唯一性id deliverytag:该消息的index 自增长
                long deliverytag = message.getmessageproperties().getdeliverytag();
                byte[] messagebody = message.getbody();
                string s = new string(messagebody);
                system.out.println("消息: " + s);
                system.out.println("消息来自: "+message.getmessageproperties().getconsumerqueue());
                system.out.println("交换机: "+message.getmessageproperties().getreceivedexchange());
                //删除桶
                try {
                    miniotemplate.removebuckets(s, "");
                    channel.basicack(deliverytag, false);
                } catch (ioexception e) {
                    e.printstacktrace();
                    channel.basicreject(deliverytag, false);
                }
            }
        });
        return container;
    }
}

注:我设置了消息发送和确认的回调函数,为什么没有触发这个函数?因为我是从后台管理页面发的消息,没有通过rabbitteplate进行发送,不会是这个原因吧!

总结

业务的解决方法有太多种了,找到一个高可用以及简便的方法才是解决问题的关键

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com