当前位置: 代码网 > 科技>电脑产品>内存 > RabbitMQ-如何保证消息不丢失

RabbitMQ-如何保证消息不丢失

2024年07月28日 内存 我要评论
由于mq是基于内存存储消息的,那么在mq服务宕机等一些情况下可能导致消息的丢失。RabbitMQ提供了publisher confirm机制来避免消息发送到Mq的过程中丢失,消息发送到Mq以后,会返回一个结果给发送者,表示消息的发送成功。第三种是correlated:MQ异步回调方式返回回执消息,即生产者发送完消息后可以干其他的事情,直到接收到mq的回执。第二种是 simple:表示同步阻塞并等待mq的回执消息,即发送完消息后不能干其他的事情,只能等待mq的回执,很显然这样效率很低。

rabbitmq常用于 异步发送,mysql,redis,es之间的数据同步 ,分布式事务,削峰填谷等.....

在微服务中,rabbitmq是我们经常用到的消息中间件。它能够异步的在各个业务之中进行消息的接受和发送,那么如何保证rabbitmq的消息不丢失就显得尤为重要。

首先要分析问题,我们就要明确rabbitmq在什么时候可能会出现消息丢失的情况呢?

我们直接说结果

rabbitmq在每个阶段都有可能使消息发生丢失

我们在这里把他们简单归结为三个层面

层面一 :生产者发送消息没有到达交换机或者没有到达绑定的队列。

层面二:rabbitmq宕机可能导致的消息的丢失。

层面三:消费者宕机导致消息丢失。

层面一的解决方法常见的是

1.生产者确认机制

rabbitmq提供了publisher confirm机制来避免消息发送到mq的过程中丢失,消息发送到mq以后,会返回一个结果给发送者,表示消息的发送成功。

情况一:发送成功 生产者正常发送消息到队列之后会返回一个publish-confirm ack 这个意思是告诉生产者已经接收到消息了。

情况二:发送失败 这里的发送失败有两种,一种是生产者发送到交换机失败 此时返回 publish-confirm nack  。第二种是生产者发送到队列失败 返回 publish-return ack。

开启生产者确认机制的代码如下 ,在生产者的配置文件中加入以下配置
 

spring:
  rabbitmq:
    publisher-confirm-type: correlated #开启生产者确认机制
    publisher-returns: true

这里的

publisher-confirm-type:有三种模式可以选择:

第一种是none:代表关闭confirm机制

第二种是 simple:表示同步阻塞并等待mq的回执消息,即发送完消息后不能干其他的事情,只能等待mq的回执,很显然这样效率很低。

第三种是correlated:mq异步回调方式返回回执消息,即生产者发送完消息后可以干其他的事情,直到接收到mq的回执。很明显这种效率要优于第二种。

配置return callback的代码如下,每个rabbittemplate只能配置一个 代码如下
 

package com.itheima.publisher.com.it.heima.config;

import lombok.extern.slf4j.slf4j;
import org.springframework.amqp.core.returnedmessage;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.beansexception;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.context.applicationcontext;
import org.springframework.context.applicationcontextaware;
import org.springframework.context.annotation.configuration;

/**
 * @auther: qujingchuan
 * @date: 2024/1/13 10:34
 * @description:
 */
@slf4j
@configuration
public class mqconfirmconfig implements applicationcontextaware {
    @autowired
    private rabbittemplate rabbittemplate;
    @override
    public void setapplicationcontext(applicationcontext applicationcontext) throws beansexception {
        //配置回调
        rabbittemplate.setreturnscallback(new rabbittemplate.returnscallback() {
            @override
            public void returnedmessage(returnedmessage returnedmessage) {
                log.debug("收到消息return的callback,  {},{},{},{},{}",
                        returnedmessage.getexchange(),
                        returnedmessage.getroutingkey(),
                        returnedmessage.getmessage(),
                        returnedmessage.getreplycode(),
                        returnedmessage.getreplytext());
            }
        });
    }
}

confirm callback需要每次发消息的时候都要配置(要制定发消息的id方便回执的时候直到是谁发的消息)这里写一个测试类方便大家看。

 @test
    void testconfirmcallback() throws interruptedexception {
        //创建cd 参数为每次发送消息的id
        correlationdata correlationdata = new correlationdata(uuid.randomuuid().tostring());
        //添加confirmcallback
        correlationdata.getfuture().addcallback(new listenablefuturecallback<correlationdata.confirm>() {
            @override
            public void onfailure(throwable ex) {
                //这种情况一般是运行出现bug,一般不会发生。
                log.error("消息回调失败",ex);
            }

            @override
            public void onsuccess(correlationdata.confirm result) {
                log.debug("收到confirm callback 回执");
                if (result.isack()){
                    //消息发送成功
                    log.debug("消息发送成功收到ack");
                }else {
                    //消息发送失败
                    log.debug("消息发送失败收到nack,原因:{}",result.getreason());
                    //todo 重发消息等业务
                }
            }
        });

        rabbittemplate.convertandsend("amqp.test","amqptest","hello qjc",correlationdata);

        thread.sleep(2000);
    }

那么我们如何解决这个问题呢
方案一:重发消息 

方案二:记录日志

方案三:保存到数据库中定时发送,发送成功后删除表中的数据。

方案四:交给人工处理。

~生产者确认机制需要额外的网络和系统的资源开销,尽量不要使用。

~如果业务需要,那么无需开启publisher-return机制,因为一般路由失败都是自己业务的原因。

~对于nack消息可以有限次数的重试,依然失败则记录异常消息。

层面二的解决方法常见的是

2.消息持久化

由于mq是基于内存存储消息的,那么在mq服务宕机等一些情况下可能导致消息的丢失。同时内存空间有限,当消费者出现故障或者处理过慢,会导致消息积压,mq会对消息做迁移(page out 写入磁盘)从而引发mq阻塞。我们将消息存储在磁盘上就避免了这个问题。

一 :持久化交换机。

这里要选择durable,因为transient是临时交换机,当mq宕机后会消失。

代码展示
 

 @bean
    public directexchange simpleexchange(){
        //分别是三个参数 交换机名称 是否持久化 当没有队列绑定时是否自动删除
        return new directexchange("qjc.exchange",true,false);
    }

二 :持久化队列。

这个与交换机类似,在此不做赘述。

代码展示

@bean
    public queue simplequeue(){
        //springamqp在使用queuebuilder来创建队列的时候,默认就是持久化的
        return queuebuilder.durable("qjc.queue").build();
    }

三 :持久化消息。

这里选择delivery mode 选择2 ,1是不持久的。

代码展示

 message message = messagebuilder.withbody("hello".getbytes(standardcharsets.utf_8))
                .setdeliverymode(messagedeliverymode.persistent)
                .build();
如果不选择持久化队列,交换机,消息的话我们还有另一种方案

lazy queue(惰性队列)

惰性队列的特征如下

~接受到消息的时候直接存入磁盘而非内存(内存中只保留最近的消息)

~消费者需要消息的时候才会从磁盘中取出数据加载到内存

~支持数百万条的消息存储

在mq3.12版本后,所有的队列都是lazy queue模式,无法更改。

如果各位小伙伴的版本低于3.12那我这里提供了两种方式创建惰性队列

或用注解声明

    @rabbitlistener(queuestodeclare = @queue(
            name = "lazy.queue",
            durable = "true",
            arguments = @argument(name = "x-queue-mode",value = "lazy")
    ))
    public void listenlazyqueue(string msg){
        log.debug("接收到lazyqueue的消息" + msg);
    }

3.消费者确认机制

rabbitmq支持消费者确认机制,即:当消费者处理消息后可以向mq发送ack回执,mq收到消息后会在队列中删除该消息。

springamqp已经实现了消息确认的功能,并且允许我们通过配置文件选择ack的处理方式,有三种方式。

- none: 不处理。即消息投递给消费者后立刻ack,消息会立刻从mq删除。非常不安全,不建议使用  
- manual: 手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活  
- auto: 自动模式。springamqp利用aop对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack.  
当业务出现异常时,根据异常判断返回不同结果:  
- 如果是业务异常,会自动返回nack  
- 如果是消息处理或校验异常,自动返回reject

注意我们需要再消费者的配置文件中加入参数

这就是mq保证消息不丢失的一些方式和解决方案。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com