当前位置: 代码网 > it编程>编程语言>Java > 详解RabbitMQ核心机制

详解RabbitMQ核心机制

2025年09月28日 Java 我要评论
1. mq1.1 mq 概述mq,消息队列,一种在分布式系统中用于通信的关键组件本质上是一个队列,遵循 fifo(先入先出)原则,队列中存储的内容是消息(message)消息可以非常简单,比如只包含文

1. mq

1.1 mq 概述

mq,消息队列,一种在分布式系统中用于通信的关键组件

本质上是一个队列,遵循 fifo(先入先出)原则,队列中存储的内容是消息(message)

消息可以非常简单,比如只包含文本字符串或 json 数据,也可以很复杂,如内嵌对象。mq 主要用于分布式系统之间的通信,解决数据传递的效率和可靠性问题

1.2 系统间通信方式

在分布式系统中,系统之间的调用通常有两种方式:

1.同步通信:

直接调用对方的服务,数据从一端发出后立即到达另一端。这种方式响应快,但可能导致调用方阻塞,尤其在处理耗时操作时效率低下

2.异步通信

数据从一端发出后,先进入一个容器进行临时存储,当满足特定条件(如接收方准备好)时,再由容器转发给另一端

mq 就是这个容器的具体实现,它解耦了发送方和接收方,提高了系统的灵活性和可扩展性

1.3 mq的作用

mq的核心工作是接收、存储和转发消息

1.3.1.异步解耦:

一些耗时操作(如发送注册短信或邮件通知)不需要即时返回结果。mq 可以将这些操作异步化。例如,用户注册后,系统立即返回注册成功消息,同时将通知任务放入 mq;mq 在后台异步处理通知,避免了用户等待

这降低了系统耦合度,提升响应速度。

1.3.2.流量削峰:

面对突发流量(如秒杀或促销活动),系统可能因过载而崩溃,mq 能缓冲请求,将峰值流量排队处理

例如,在高并发场景下,请求先进入 mq 队列,系统根据自身处理能力逐步消费消息,防止资源耗尽

这避免了为处理峰值而过度投资资源,优化了成本效率。

1.3.3.消息分发:

当多个系统需要对同一数据做出响应时,mq 可实现高效的消息分发

例如,支付成功后,支付系统向 mq 发送一条消息;其他系统(如订单系统、库存系统)订阅该消息,无需轮询数据库

这减少了冗余查询,提高了数据一致性和系统性能。

1.3.4.延迟通知:

mq 支持延迟消息功能,适用于在特定时间后触发操作的场景

例如,在电子商务平台中,用户下单后未支付,系统将超时取消订单的任务放入 mq 延迟队列;mq 在指定时间(如下单后 30 分钟)自动发送消息,触发取消流程

这简化了定时任务管理,提升了用户体验

1.4.rabbitmq

rabbitmq 是 mq 的一种流行实现,它基于 amqp(高级消息队列协议),提供了可靠的消息传递、队列管理和路由功能。并能处理高吞吐量和复杂消息的路由需求

2.rabbitmq 工作模式

rabbitmq支持多种工作模式来处理消息的生产和消费。这些模式适用于不同场景,帮助实现高效、可靠的消息传递

2.1 simple (简单模式)

最基本的点对点模式。生产者(p)将消息发送到队列(queue),消费者(c)从队列中取出消息。队列充当缓存区,确保消息在传递过程中不会丢失

一个生产者对应一个消费者,每条消息只能被消费一次,简单易用

适用场景:消息需要被单个消费者处理的场景,例如日志记录

2.2 work queue (工作队列模式)

扩展了简单模式,消息被分发到不同的消费者,实现负载均衡

每个消费者接收不同的消息,同时支持并行处理,提高系统吞吐量

适用场景:集群环境中的异步任务处理,例如短信通知服务:订单消息发送到队列,多个短信服务实例竞争消息并发送通知

2.3 publish/subscribe (发布/订阅模式)

引入交换机,生产者发送消息到交换机,交换机将消息复制并广播到所有绑定的队列,每个队列对应一个消费者。

适用场景:消息需要被多个消费者同时接收的场景,例如实时通知或广播消息

2.4 routing(路由模式)

发布/订阅模式的变种,增加路由键。生产者发送消息时指定routingkey,交换机根据bindingkey规则将消息筛选后路由到特定队列

适用场景:需要根据特定规则分发消息的场景,例如将错误日志路由到专门的处理服务

2.5 topics(通配符模式)

路由模式的升级版,支持通配符匹配routingkey。routingkey使用点分隔符(如"order.*"),交换机根据模式规则路由消息

适用场景:需要灵活匹配和过滤消息的场景,例如订单系统中的多级分类

2.6 rpc(rpc通信模式)

实现远程过程调用(rpc),生产者发送请求消息,消费者处理并返回响应。通过两个队列(请求队列和响应队列)模拟回调机制

适用场景:分布式系统中的远程调用,例如微服务间的方法调用

2.7 publisher confirms(发布确认模式)

确保消息可靠发送到rabbitmq服务器的机制。生产者将通道设置为confirm模式后,每条消息获得唯一id,服务器异步发送确认(ack)表示消息已接收

适用场景:对数据安全性要求高的场景,例如金融交易或订单处理(如支付系统)

3. rabbitmq 的实现

3.1.spring amqp

spring 提供了rabbitmq 开发的封装,spring amqp通过集成spring生态,大幅简化了消息队列的实现

3.1.1.引入依赖

<!--spring mvc相关依赖--> 
<dependency>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-web</artifactid>
</dependency>
<dependency>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-test</artifactid>
    <scope>test</scope>
</dependency>
<!--rabbitmq相关依赖--> 
<dependency>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-amqp</artifactid>
</dependency>

3.1.2.配置 constants

public class constants {
    public static final string work_queue = "work_queue";
    public static final string fanout_queue1 = "fanout_queue1";
    public static final string fanout_queue2 = "fanout_queue2";
    public static final string fanout_exchange = "fanout_exchange";
    public static final string direct_queue1 = "direct_queue1";
    public static final string direct_queue2 = "direct_queue2";
    public static final string direct_exchange = "direct_exchange";
    public static final string topic_queue1  = "topic_queue1";
    public static final string topic_queue2 = "topic_queue2";
    public static final string topic_exchange = "topic_exchange";
}

3.1.3.配置 config

@configuration
public class rabbitmqconfig {
    //work_queue
    @bean("workqueue")
    public queue workqueue(){
        return queuebuilder.durable(constants.work_queue).build();
    }
    //fanout_queue
    @bean("fanoutqueue1")
    public queue fanoutqueue1(){
        return queuebuilder.durable(constants.fanout_queue1).build();
    }
    @bean("fanoutqueue2")
    public queue fanoutqueue2(){
        return queuebuilder.durable(constants.fanout_queue2).build();
    }
    @bean("fanoutexchange")
    public fanoutexchange fanoutexchange(){
        return exchangebuilder.fanoutexchange(constants.fanout_exchange).build();
    }
    @bean("bindingfanoutqueue1")
    public binding bindingfanoutqueue1(@qualifier("fanoutexchange") fanoutexchange fanoutexchange,@qualifier("fanoutqueue1")  queue queue){
        return bindingbuilder.bind(queue).to(fanoutexchange);
    }
    @bean("bindingfanoutqueue2")
    public binding bindingfanoutqueue2(@qualifier("fanoutexchange") fanoutexchange fanoutexchange,@qualifier("fanoutqueue2")  queue queue){
        return bindingbuilder.bind(queue).to(fanoutexchange);
    }
    // direct_queue
    @bean("directqueue1")
    public queue directqueue1(){
        return queuebuilder.durable(constants.direct_queue1).build();
    }
    @bean("directqueue2")
    public queue directqueue2(){
        return queuebuilder.durable(constants.direct_queue2).build();
    }
    @bean("directexchange")
    public directexchange directexchange(){
        return exchangebuilder.directexchange(constants.direct_exchange).build();
    }
    @bean("bindingdirectqueue1")
    public binding bindingdirectqueue1(@qualifier("directexchange") directexchange directexchange, @qualifier("directqueue1") queue queue){
        return bindingbuilder.bind(queue).to(directexchange()).with("a");
    }
    @bean("bindingdirectqueue2")
    public binding bindingdirectqueue2(@qualifier("directexchange") directexchange directexchange, @qualifier("directqueue2") queue queue){
        return bindingbuilder.bind(queue).to(directexchange()).with("b");
    }
    @bean("bindingdirectqueue3")
    public binding bindingdirectqueue3(@qualifier("directexchange") directexchange directexchange, @qualifier("directqueue2") queue queue){
        return bindingbuilder.bind(queue).to(directexchange()).with("c");
    }
    @bean("topicqueue1")
    public queue topicqueue1(){
        return queuebuilder.durable(constants.topic_queue1).build();
    }
    @bean("topicqueue2")
    public queue topicqueue2(){
        return queuebuilder.durable(constants.topic_queue2).build();
    }
    @bean("topicexchange")
    public topicexchange topicexchange(){
        return exchangebuilder.topicexchange(constants.topic_exchange).build();
    }
    @bean("bindingtopicqueue1")
    public binding bindingtopicqueue1(@qualifier("topicexchange") topicexchange topicexchange, @qualifier("topicqueue1") queue queue){
        return bindingbuilder.bind(queue).to(topicexchange).with("*.x.*");
    }
    @bean("bindingtopicqueue2")
    public binding bindingtopicqueue2(@qualifier("topicexchange") topicexchange topicexchange, @qualifier("topicqueue2") queue queue){
        return bindingbuilder.bind(queue).to(topicexchange).with("*.*.y");
    }
    @bean("bindingtopicqueue3")
    public binding bindingtopicqueue3(@qualifier("topicexchange") topicexchange topicexchange, @qualifier("topicqueue2") queue queue){
        return bindingbuilder.bind(queue).to(topicexchange).with("xy'.#");
    }
}

可以通过配置 config 一次性大量的声明,队列、交换机、绑定关系等,大幅度缩减了频繁创建文件的次数

3.1.4.producercontroller

@restcontroller
@requestmapping("/producer")
public class producercontroller {
    @autowired
    private rabbittemplate rabbittemplate;
    @requestmapping("/work")
    public string work(){
        for (int i = 0; i <10 ; i++) {
            rabbittemplate.convertandsend("", constants.work_queue,"hello work queue" + i);
        }
        return "发送成功";
    }
    @requestmapping("/fanout")
    public string fanout(){
        for (int i = 0; i <10 ; i++) {
            rabbittemplate.convertandsend(constants.fanout_exchange,"","hello fanout queue" + i);
        }
        return "发送成功";
    }
    @requestmapping("/direct/{routingkey}")
    public string direct(@pathvariable("routingkey") string routingkey){
        rabbittemplate.convertandsend(constants.direct_exchange,routingkey,"hello direct this is routingkey " + routingkey);
        return "发送成功";
    }
    @requestmapping("/topic/{routingkey}")
    public string topic(@pathvariable("routingkey") string routingkey){
        rabbittemplate.convertandsend(constants.topic_exchange,routingkey,"hello topic " + routingkey);
        return "发送成功";
    }
}

这是一个基于spring boot的rabbitmq消息生产者控制器(producercontroller),用于向rabbitmq消息队列发送消息。它实现了四种常见的消息队列模式,通过http接口触发消息发送

在实际应用中,它可以作为微服务架构中的生产者模块,用于解耦系统组件、实现异步处理或事件驱动架构

3.1.5.worklistener

@component
public class worklistener {
    @rabbitlistener( queues = constants.work_queue )
    public void worklistener1(string message) {
        system.out.println("队列["+constants.work_queue+"] 接收到消息:" + message);
    }
    @rabbitlistener( queues = constants.work_queue )
    public void worklistener2(string message) {
        system.out.println("队列["+constants.work_queue+"] 接收到消息:" + message);
    }
}

这是一个 spring 组件,用于实现rabbitmq的并行消费功能

@rabbitlistener 不仅可用于方法,还可用于类级别。当标注在处理方法时如上图代码所示,当标注在类时,需要搭配 @rabbithandler 使用,将 @rabbithandler 标注在类的方法上

3.1.6.fanoutlistener

@component
public class fanoutlistener {
    @rabbitlistener(queues = constants.fanout_queue1)
    public void fanoutlistener1(string message){
        system.out.println("队列["+constants.fanout_queue1+"] 接收到消息:" + message);
    }
    @rabbitlistener(queues = constants.fanout_queue2)
    public void fanoutlistener2(string message){
        system.out.println( "队列["+constants.fanout_queue2+"] 接收到消息:" + message);
    }
}

fanoutlistener 是一个基于 spring amqp 的消息消费者组件,专门用于处理 ​fanout 类型交换机的消息。它通过@rabbitlistener注解监听两个不同的队列,实现 ​广播模式​ 的消息消费

3.1.7.directlistener

@component
public class directlistener {
    @rabbitlistener(queues = constants.direct_queue1)
    public void queuelistener1(string msg) throws interruptedexception {
        system.out.println("队列["+constants.direct_queue1+"] 接收到消息:" + msg);
    }
    @rabbitlistener(queues = constants.direct_queue2)
    public void queuelistener2(string msg) throws interruptedexception {
        system.out.println("队列["+constants.direct_queue2+"] 接收到消息:" + msg);
    }
}

directlistener专门用于处理 ​direct 类型交换机的消息,实现​路由键精准匹配​的消息分发模式,实现原理上同

3.1.8.topicslistener

public class topicslistener {
    @rabbitlistener(queues = constants.topic_queue1)
    public void topiclistener1(string message){
        system.out.println( "队列["+constants.topic_queue1+"] 接收到消息:" + message);
    }
    @rabbitlistener(queues = constants.topic_queue2)
    public void topiclistener2(string message){
        system.out.println( "队列["+constants.topic_queue2+"] 接收到消息:" + message);
    }
}

topicslistener专门用于处理 ​topic 类型交换机的消息,实现 ​通配符路由​ 的灵活消息分发模式

总结:

spring amqp:基于spring框架的抽象层,提供声明式配置(如注解驱动),简化了消息生产、消费和资源管理。它封装了原生api,减少了样板代码,支持与spring boot无缝集成。适用于快速开发、维护性要求高的企业应用

4.rabbitmq 高级特性

4.1 消息确认

消息确认用于确保消息被消费者正确处理,防止消息丢失或重复处理。它通过让消费者向生产者或代理发送确认信号来实现

4.1.1.自动确认

消息代理在将消息传递给消费者后,将立即自动确认,无需消费者干预

这种方式简单高效,但风险较高:如果消费者处理消息时崩溃,消息可能丢失,因为没有重试机制。

4.1.2手动确认

消费者在处理消息后,需要显式发送确认信号给消息代理

这种方式提供了更高的控制性,确保消息只有在成功处理后才被标记为完成。如果处理失败,消费者可以选择重新入队或丢弃消息

4.1.2.1.手动确认步骤

1.消费者订阅消息:消费者连接到队列,并声明需要手动确认模式。

2.处理消息:消费者接收消息并执行业务逻辑(如数据处理或存储)。

3.发送确认信号:如果处理成功,消费者发送一个确认(ack)信号给代理;如果失败,发送否定确认(nack)信号,让消息重新入队或丢弃。

4.代理响应:代理收到ack后,从队列中删除消息;收到nack后,根据配置重试或移至死信队列

4.1.2.2.代码实现

在 application.yml 中配置相关属性

  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto

controller

@restcontroller
@requestmapping("/ack")
public class producercontroller {
    @autowired
    rabbittemplate rabbittemplate;
    @requestmapping("/auto")
    public string auto() {
        for (int i = 0; i < 1; i++) {
            rabbittemplate.convertandsend(constants.akc_exchange_auto,"abc","auto 发送消息:" + i);
        }
        return "auto success";
    }
    @requestmapping("/none")
    public string none() {
        for (int i = 0; i < 1; i++) {
            rabbittemplate.convertandsend(constants.akc_exchange_none,"abc","none 发送消息:" + i);
        }
        return "none success";
    }
    @requestmapping("/manual")
    public string manual() {
        for (int i = 0; i < 1; i++) {
            rabbittemplate.convertandsend(constants.akc_exchange_manual,"abc","manual 发送消息:" + i);
        }
        return "manual success";
    }

nonelistener

@component
public class nonelistener {
    @rabbitlistener(queues = constants.akc_queue_none)
    public void listen(message message, channel channel) throws exception {
        system.out.println("接收到消息: "+ new string(message.getbody(),"utf-8")+
                "deliverytag: " + message.getmessageproperties().getdeliverytag() );
        system.out.println("auto 业务逻辑处理完毕");
    }
}

autolistener

@component
public class autolistener {
    @rabbitlistener(queues = constants.akc_queue_auto)
    public void listen(message message, channel channel) throws exception {
        system.out.println("接收到消息: "+ new string(message.getbody(),"utf-8")+
                "deliverytag: " + message.getmessageproperties().getdeliverytag() );
        system.out.println("auto 业务逻辑处理完毕");
    }
}

消费端不确认收到消息时会自动重复消息入队

4.2.持久性

在 rabbitmq 中,持久性是确保消息在服务意外停止或重启后不丢失的关键机制。它通过交换器、队列和消息三部分的持久化来实现

4.2.1.交换机持久化

在 spring amqp 中,交换器是消息路由的入口。如果交换器不持久化,rabbitmq服务重启后,其元数据(如名称、类型)会丢失,导致消息无法被正确路由

实现时,在声明交换器时将 durable 参数设为true

exchange exchange = exchangebuilder.topicexchange("myexchange").durable(true).build();

4.2.2.队列持久化

在 spring amqp 中,队列是消息存储的容器。如果队列不持久化,服务重启后队列会被删除,所有消息也会丢失

实现时,在声明队列时将 durable 参数设为true。也就是在声明队列时,使用 .durable() 来使队列持久化

queue queue = queuebuilder.durable("myqueue").build(); // 默认durable=true

4.2.3.消息持久化

消息本身需要显式设置为持久化,否则即使队列持久化,消息也可能在重启后丢失

4.2.3.1.rabbitmq 客户端实现

在 rabbitmq 客户端中,可以通过设置 messageproperties.persistent_text_plain ,将这个参数传入 channel.basicpublish() 中 完成消息持久化,当然 队列持久化是消息持久化的前提。

 string messagecontent = "this is a persistent message";
            // 2. 设置消息属性为持久化
            channel.basicpublish("", queue_name, messageproperties.persistent_text_plain,
 messagecontent.getbytes());

messageproperties.persistent_text_plain 是库 com.rabbitmq.client.messageproperties 类中的一个静态常量,不需要用户手动编写代码,用于设置消息为持久化模式。

4.2.3.2.rabbittemplate 实现

如果使用 rabbittemplate 发送持久化消息,代码如下:

public class test {
    public static void main(string[] args) {
        string message = "this is a persistent message";
        message messageobject = new message(message.getbytes(), new messageproperties());
        messageobject.getmessageproperties().setdeliverymode(messagedeliverymode.persistent);
        rabbittemplate.convertandsend(constant.ack_exchange_name, "ack", messageobject);
    }
}

message :

属于org.springframework.amqp.core.message 类, 是spring amqp框架中的核心类,用于封装一条amqp消息

它表示发送或接收的消息实体,包含消息体(payload)和消息属性(如头部信息、路由键等)。主要作用包括:

存储消息的原始字节数据(body)

提供访问和修改消息元数据的接口(通过 messageproperties)

在rabbitmq客户端和服务器之间传输消息时,确保数据结构的标准化

message.getbytes:将字符串消息转换为字节数组,作为消息的实际内容(body),以便rabbitmq处理。消息体必须是字节数组,因为amqp协议支持二进制数据传输

new messageproperties():初始化消息的属性对象,用于设置消息的元数据(如头部、优先级、持久化模式等)

4.3.发送方确认

在使用rabbitmq时,消息持久化可以防止服务器崩溃导致的消息丢失,但如果消息在传输过程中丢失(例如rabbitmq重启期间生产者投递失败),消息根本未到达服务器,持久化也无法解决

rabbitmq提供了confirm机制(发送方确认)来确保生产者知道消息是否成功到达exchange。相比事务机制,confirm机制性能更高,是实际工作中的首选方案

confirm 机制允许生产者设置一个回调监听 (confirmcallback)。无论消息是否到达 exchange,rabbitmq 都会触发回调:

1.如果消息成功到达exchange,回调返回 ack = true。
2.如果消息未到达exchange(例如网络故障或exchange不存在),回调返回 ack = false,并提供失败原因(cause)。
该机制仅确认消息是否到达 exchange,不保证消息被 queue 处理(后续需结合return退回模式处理queue级错误)

4.3.1.配置 confirm 机制

    listener:
      simple:
        acknowledge-mode: manual
    publisher-confirm-type: correlated

4.3.2.confirm 代码实现

1.先创建 rabbittemplate bean,让新创建的 rabbittemplate 实现 confirmcallback 接口,使其替代默认的 rabbittemplate

2.重写 confirmcallback 接口内的 confirm 方法,以此来实现 confirm 机制

3.完善 confirm 内的逻辑,如果为 ack 即 exchange 收到消息,完善相应逻辑;如果为 false 可以打印失败 cause 来完善业务逻辑

    @requestmapping("/confirm")
    public string confirm() {
        rabbittemplate.setconfirmcallback(new rabbittemplate.confirmcallback() {
            @override
            public void confirm(correlationdata correlationdata, boolean b, string s) {
                system.out.println("confirm 生产端发送成功" );
                if(b){
                    system.out.printf("exchange 接收到消息 ,消息id: %s \n",correlationdata == null? null : correlationdata.getid());
                }else {
                    system.out.printf("exchange 未接收到消息,消息id:%s,cause:%s\n",correlationdata == null? null : correlationdata.getid()
                            ,s);
                }
            }
        });
        correlationdata correlationdata = new correlationdata("1");
        rabbittemplate.convertandsend(constants.direct_exchange+1,"abc","confirm.test",correlationdata);
        return "confirm success";
    }

但rabbittemplate 是单例对象,所以存在两个问题。

1.在 confirm 中设置 rabbittemplate 会影响所有使用 rabbittemplate 的方法

2.重复调用接口会提示错误

可以直接创建一个新的 rabbittemplate 类,但是需要创建一个原本的 rabbittemplate 给其他方法调用,修改完的 rabbittemplate 代码如下:

@configuration
public class rabbittemplateconfig {
    @bean
    public rabbittemplate rabbittemplate(connectionfactory connectionfactory) {
        return new rabbittemplate(connectionfactory);
    }
    @bean
    public rabbittemplate confirmrabbittemplate(connectionfactory connectionfactory) {
        rabbittemplate rabbittemplate = new rabbittemplate(connectionfactory);
        rabbittemplate.setconfirmcallback(new rabbittemplate.confirmcallback() {
            @override
            public void confirm(correlationdata correlationdata, boolean b, string s) {
                system.out.println("confirm 生产端发送成功" );
                if(b){
                    system.out.printf("exchange 接收到消息 ,消息id: %s \n" , correlationdata == null? null : correlationdata.getid());
                }else {
                    system.out.printf("exchange 未接收到消息,消息id:%s , cause: %s\n",correlationdata == null? null : correlationdata.getid()
                            ,s);
                }
            }
        });
        return rabbittemplate;
    }
}

4.4.重试机制

rabbitmq 的重试机制基于消息确认模式:

自动确认模式:rabbitmq在消息投递给消费者后自动确认。如果消费者处理失败(如抛出异常),rabbitmq会根据配置参数自动重试消息。例如,设置重试次数 n,rabbitmq会负责重发消息 n 次。
手动确认模式:消费者需显式调用确认方法。如果处理失败,应用程序可选择是否重试(通过设置 requeue 参数)。这给予应用更多控制权,但重试逻辑需由开发者实现。

重试机制配置

  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto/manual
        retry:
          enabled: true         # 开启重试机制
          initial-interval: 5000ms  # 初始重试间隔,例如5秒
          max-attempts: 5  # 最大重试次数(包括首次消

自动确认重试模式

  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto
        retry:
          enabled: true         # 开启重试机制
          initial-interval: 5000ms  # 初始重试间隔,例如5秒
          max-attempts: 5  # 最大重试次数(包括首次消费)

没开启重试机制之前,rabbitmq 能自动的将处理失败的消息一直重新入队也不会抛出异常。

重试机制开启之后,会根据设置的次数重新入队,当设置的次数耗尽也没有解决问题时,就会抛出异常。

4.5.ttl

ttl, 过期时间,单位为毫秒(ms)是rabbitmq中用于控制消息或队列生命周期的机制。

当消息或队列超过设定的存活时间后,未被消费的消息会被自动清除。

这适用于电商订单超时取消(如24小时未付款自动取消订单)或退款超时处理(如7天未处理自动退款)等场景

ttl为 0 表示消息必须立即投递给消费者,否则被丢弃;未设置ttl表示消息永不过期

4.5.1.设置消息的 ttl

1.创建 messagepostprocessor 类对象,重写 postprocessmessage 方法

2.在重写的方法里通过 message.getmessageproperties().setexpiration("10000") 设置过期时间,单位毫秒

3.把该 messagepostprocessor 类对象作为参数传输到 rabbittemplate.convertandsend。

    @requestmapping("/ttlmessage")
    public string ttlmessage(){
        system.out.println("ttl......");
        messagepostprocessor messagepostprocessor = new messagepostprocessor() {
            @override
            public message postprocessmessage(message message) throws amqpexception {
                message.getmessageproperties().setexpiration("10000");
                return message;
            }
        };
        system.out.println("ttl2 .....");
        rabbittemplate.convertandsend(constants.ttl_exchange,"abc","ttl",messagepostprocessor);
        return "ttlmessage success";
    }

4.5.2.设置队列的 ttl

只需要在声明队列时加上 .ttl() 就行

    @bean("ttlqueue")
    public queue ttlqueue(){
        return queuebuilder.durable(constants.ttl_queue).ttl(10000).build();
    }

4.5.3.最短ttl共享

消息队列会扫描当前队列中所有消息的ttl值,并选择最小的ttl作为队列的全局ttl。这意味着所有消息的过期时间会受到最短ttl的约束

4.6.死信

死信, 指因某些原因无法被消费的消息

当消息在一个队列中变成死信后,会被重新发送到另一个交换器,绑定该交换器的队列称为死信队列

消息变成死信的常见情况包括

  • 消息被拒绝(basic.reject/basic.nack)且 requeue 参数为 false。
  • 消息过期。
  • 队列达到最大长度。通过 .maxlength

当声明队列时,可以在队列加上 .deadletterexchange() 参数绑定死信交换机,接着加上 .deadletterroutingkey() 参数指定路由给 死信交换机 所绑定的死信队列

4.7.延迟队列

延迟队列(delayed queue)是一种消息传递机制,消息在发送后不会立即被消费者获取,而是等待特定时间后才可被消费

这在许多场景中非常有用,例如智能家居:用户指令在指定时间后执行,或是日常管理:会议前15分钟自动提醒。

rabbitmq本身不直接支持延迟队列,但可以通过ttl和死信队列组合模拟实现。然而,这种方式存在局限性,尤其在处理不同延迟时间的消息时

ttl机制允许设置消息的存活时间(单位:毫秒)。当消息过期时,它会被路由到死信队列,消费者从死信队列消费消息,实现延迟效果。关键步骤包括:

声明一个正常队列,并绑定到死信交换器。
生产者发送消息时,设置消息的ttl(例如10秒或20秒)。
消费者监听死信队列,消费过期消息。

4.7.1.延迟队列问题

当队列中包含不同ttl的消息时,rabbitmq只检查队首消息的过期时间。如果队首消息的ttl较长,后续短ttl消息会被阻塞,直到队首消息过期。

原因在于rabbitmq的队列设计:

队列检查原则:rabbitmq只监控队列头部(第一个消息)的过期时间。只有当头部消息过期或被消费后,才会检查下一个消息。
ttl计算起点:消息的ttl是从它被发布到队列时开始计算的(绝对时间),但rabbitmq只在处理头部时“发现”过期。
潜在问题:如果头部消息的ttl较长,它会阻塞后续消息的过期检查,即使后续消息的ttl更短。这可能导致短ttl消息延迟过期

解决方案一:为不同延迟时间创建独立队列

要解决此问题,核心思路是避免混合不同ttl的消息在同一个队列中。rabbitmq官方推荐为每个延迟时间创建独立的队列。这样,每个队列只处理单一ttl,确保过期检查的准确性。

1.设计多个队列:为每个需支持的延迟时间(如10秒、20秒)创建独立的正常队列。

每个队列绑定到同一个死信交换器。
设置队列的.deadletterexchange() 参数 和 .deadletterroutingkey()
2.生产者发送消息:根据消息的延迟需求,发送到对应的队列。

3.消费者统一监听死信队列:死信队列接收所有过期消息,消费者无需修改。

解决方案二:添加插件

通过下载插件, 添加完插件之后再创建延迟队列,只需要声明延迟交换机时添加 .delayed() 方法,设置消息时使用  message.setdelaylong() 指定延迟时间,无需额外设置过期时间或创建死信队列来完成延迟队列的功能。

4.8.消息分发

当队列有多个消费者时,rabbitmq默认采用轮询策略分发消息。每条消息只发给一个消费者,但分发顺序固定,不考虑消费者处理速度。例如:

消费者a处理速度快(每秒10条消息)。

消费者b处理速度慢(每秒2条消息)。

结果:消费者b可能积压消息,消费者a空闲,整体吞吐量下降。

因此, rabbitmq提供 channel.basicqos(prefetchcount) 方法来解决负载不均衡问题。其工作原理类似于tcp/ip的“滑动窗口”机制

prefetchcount:设置消费者允许的最大未确认消息数量(整数)。例如,(prefetchcount) = 5  表示每个消费者最多持有5条未处理消息。
计数机制:rabbitmq发送消息时计数+1,消费者确认消息后计数-1。当计数达 prefetchcount 上限时,rabbitmq暂停向该消费者发送新消息。
特殊值 prefetchcount = 0 表示无上限(恢复默认轮询)。

可以通过设置 application.yml 中的参数来设置 prefetchcount 的大小

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual  # 手动确认模式
        prefetch: 5               # 设置 prefetchcount = 5

4.9.幂等性保障

幂等性是计算机科学中的关键概念,尤其在分布式系统和消息队列中至关重要。它确保操作被多次执行时,对系统状态的影响保持一致,从而避免重复处理导致的数据不一致

rabbitmq的传输保障模式:

最多一次:消息发送后,如果处理失败(如消费者崩溃、broker 传输给消费者失败),消息可能丢失,但不会重复。这适合对消息丢失不敏感的场景,比如日志记录。
最少一次:消息保证至少被处理一次,不会丢失。但如果处理失败,rabbitmq会重试发送消息,这可能导致消息被多次处理(即重复)。这是rabbitmq的默认模式。比如,通过发布者确认、消费者确认、broker 持久化等来保证 “最少一次” 的功能实现。
恰好一次:rabbitmq无法原生支持,因为分布式系统中存在网络故障、节点失败等问题,很难保证消息只传输一次。实现“恰好一次”需要复杂的机制,通常由应用层自己处理

在 mq 中,幂等性指同一条消息被多次消费时,对系统的影响一致。rabbitmq 支持“最多一次”和“最少一次”的传输保障,无法实现“恰好一次”。rabbitmq的 “最少一次” 模式通过重试保证消息不丢失

4.9.1.解决方案

解决幂等性的核心是为消息添加唯一标识,并确保消费前检查状态。以下是常用方法:

全局局唯一 id

1.为每条消息分配唯一 id(如 uuid 或 rabbitmq 自带 id),消费者处理前检查该 id 是否已消费。
2.使用 redis 的原子操作 setnx:将消息 id 作为 key,执行 setnx messageid 1。如果返回 1(key 不存在),则正常消费;返回 0(key 存在),则丢弃消息

业务逻辑判断:

在业务层检查数据状态,例如:

  • 查询数据库记录是否存在(如支付订单时验证是否已处理)。
  • 使用乐观锁机制:更新数据前检查版本号,确保操作只执行一次。
  • 状态机控制:业务对象定义状态(如“未处理”到“已完成”),仅当状态匹配时才处理。

到此这篇关于rabbitmq核心机制的文章就介绍到这了,更多相关rabbitmq机制内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com