当前位置: 代码网 > it编程>编程语言>Java > RabbitMQ保证消息的可靠性问题及解决

RabbitMQ保证消息的可靠性问题及解决

2026年04月29日 Java 我要评论
消息可靠性问题在消息队列,任何一个环节出问题都会导致消息的不可靠(消息丢失),如何确保消息的可靠性呢,需要考虑到其中的每个角色,生产者可靠性、mq可靠性、消费者可靠性。生产者可靠性生产者重试首先第一种

消息可靠性问题

在消息队列,任何一个环节出问题都会导致消息的不可靠(消息丢失),如何确保消息的可靠性呢,需要考虑到其中的每个角色,生产者可靠性、mq可靠性、消费者可靠性。

生产者可靠性

生产者重试

首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与mq的连接中断。

为了解决这个问题,springamqp提供的消息发送时的重试机制。即:当rabbittemplate与mq连接超时后,多次重试。

spring:
  rabbitmq:
    connection-timeout: 1s # 设置mq的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = 上次等待时长 * multiplier
        max-attempts: 3 # 总共尝试次数

耗尽重试次数后,依旧失败,记录失败消息到数据库失败消息表,用于后期执行补偿错误。如使用定时任务去扫描这个表,重新发送消息

生产者确认

1.publisher return

消息投递成功但路由失败会调用publisher return回调方法返回异常信息。

2.publisher confirm

消息投递成功返回ack,投递失败返回nack。

注意:消息投递成功但可能路由失败了,此时会通过publisher confirm返回ack,通过publisher return回调方法返回异常信息。

默认两种机制都是关闭状态,需要通过配置文件来开启。

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制

mq可靠性

为了提升性能,默认情况下mq的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置持久化,包括:

  • 交换机持久化
  • 队列持久化
  • 消息持久化

在配置的时候默认都会持久化

消费者可靠性

消费者确认机制

为了确认消费者是否成功处理消息,rabbitmq提供了消费者确认机制(consumer acknowledgement)。

即:当消费者处理消息结束后,应该向rabbitmq发送一个回执,告知rabbitmq消息处理状态。

回执有三种可选值:

  • ack:成功处理消息,rabbitmq从队列中删除该消息
  • nack:消息处理失败,rabbitmq需要再次投递消息
  • reject:消息处理失败并拒绝该消息,rabbitmq从队列中删除该消息

springamqp帮我们实现了消息确认,并可以通过配置文件设置消息确认的处理方式,有三种模式:

none:不处理。即消息投递给消费者后消息会立刻从mq删除。非常不安全,不建议使用

manual:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活

auto:自动模式。当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:

  • 如果是业务异常,会自动返回nack
  • 如果是消息处理或校验异常,自动返回reject,返回的异常包括:messageconversionexception、methodargumenttypemismatchexception等

通过下面的配置可以修改消息确认的处理方式为auto:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 自动ack

auto模式就是平常的写法

manual模式需要手写

  • message:是spring amqp封装的底层消息对象。
  • channel:是消费端与mq基于通道的操作对象。
    @rabbitlistener(queues = "simple.queue")
    public void listensimplequeuemessage(string msg, channel channel, message message) throws interruptedexception, ioexception {
        log.info("spring 消费者接收到消息:【" + msg + "】");
        //返回nack
        //每个参数的意义:1.消息的标记 2.是否确认之前所有未确认的消息 3.是否重新入队
        channel.basicnack(message.getmessageproperties().getdeliverytag(), false, true);
//        log.info("消息处理完成");
//        //返回ack,每个参数的意义:1.消息的标记 2.是否确认之前所有消息
//        channel.basicack(message.getmessageproperties().getdeliverytag(), false);
    }

失败重试机制

本地重试

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次返回到队列,再次投递,直到消息处理成功为止。

极端情况就是消费者一直无法执行成功,那么消息投递就会无限循环,导致mq的消息处理飙升,带来不必要的压力。

为了应对上述情况spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的投递到mq队列。

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = 上次等待时长 * multiplier
          max-attempts: 3 # 最大重试次数
  • 开启本地重试时,消息处理过程中抛出异常,不会请求到队列,而是在消费者本地重试
  • 重试达到最大次数后,spring会返回reject,消息会被丢弃

失败消息入队

本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。

因此spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由messagerecovery接口来定义的,它有3个不同实现:

  • rejectanddontrequeuerecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • immediaterequeuemessagerecoverer:重试耗尽后,返回nack,消息重新入队
  • republishmessagerecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是republishmessagerecoverer,失败后将消息投递到一个固定交换机,通过交换机将消息转发到失败消息队列,程序监听失败消息队列,接收到失败消息,将失败消息存入失败消息表,通过定时任务进行处理。

//在consumer服务中定义处理失败消息的交换机和队列
@bean
public directexchange errormessageexchange(){
    return new directexchange("error.direct");
}
@bean
public queue errorqueue(){
    return new queue("error.queue", true);
}
@bean
public binding errorbinding(queue errorqueue, directexchange errormessageexchange){
    return bindingbuilder.bind(errorqueue).to(errormessageexchange).with("error");
}
//定义一个republishmessagerecoverer,指定失败消息投递交换机的名称及routingkey
@bean
public messagerecoverer republishmessagerecoverer(rabbittemplate rabbittemplate){
    return new republishmessagerecoverer(rabbittemplate, "error.direct", "error");
}

监听失败消息队列将失败消息写入数据库中,由人工定期处理

业务幂等性

幂等性:在程序开发中,是指同一个业务,执行一次或多次对业务状态的影响是一致的。

在程序开发中,是指同一个业务,执行一次或多次对业务状态的影响是一致的。

例如:

  • 根据id删除数据
  • 查询数据

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
  • 退款业务。重复退款对商家而言会有经济损失。

所以,我们要尽可能避免业务被重复执行,然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况。

例如:

  • 页面卡顿时频繁刷新导致表单重复提交
  • 服务间调用的重试
  • mq消息的重复投递

因此,我们必须想办法保证消息处理的幂等性。

这里给出两种方案:

  • 唯一消息id
  • 业务状态判断

唯一消息id思路非常简单:

  • 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  • 消费者接收到消息后处理自己的业务,业务处理成功后将消息id保存到数据库或redis
  • 如果下次又收到相同消息,去数据库或redis查询判断是否存在,存在则为重复消息放弃处理。

业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求,不同的业务场景判断的思路也不一样。

例如在支付通知案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行更新时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。

相比较而言,使用唯一消息id的方案需要操作数据库或redis保存消息id,所以更推荐使用业务判断的方案。

1.创建交换机,队列,消息进行持久化

2.生产者:

  • 开启消息发送失败的重试策略,设置重试次数和间隔比例,耗尽重试次数后,依旧失败,记录失败消息到数据库失败消息表,用于后期执行错误补偿.如使用定时任务去扫描这个表,重新发送消息
  • 开启confirm机制,保证消息正确到达交换机,到达返回ack,没有到达返回nack,写入数据库,后期重试
  • 开启return机制,保证消息正确到达队列,没有到达调用returncallback,写入数据库,后期重试

3.消费者:

  • 开启手动ack,让消费者消费成功后,手动提交.使用redis来记录消费失败的次数,如果到达阈值,则记录到数据库,后期使用人工干预
  • 自动ack + 重试耗尽的失败策略,定义错误交换机队列,后期通过人工进行干预

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com