当前位置: 代码网 > it编程>编程语言>Java > SpringBoot4.0整合RabbitMQ死信队列详解

SpringBoot4.0整合RabbitMQ死信队列详解

2026年02月12日 Java 我要评论
为啥那么讲解死信队列,因为好多人不会使用,不知道什么场景下使用,此案例是我在公司实现的一种方式,让大家都可以学习到一、死信队列的好处1.提高系统可靠性避免消息丢失,确保处理失败的消息有备份防止因消息处

为啥那么讲解死信队列,因为好多人不会使用,不知道什么场景下使用,此案例是我在公司实现的一种方式,让大家都可以学习到

一、死信队列的好处

1.提高系统可靠性

  • 避免消息丢失,确保处理失败的消息有备份
  • 防止因消息处理异常导致的消息无限重试

2.异常消息管理

  • 将异常消息与正常消息分离
  • 便于监控和排查问题消息

3.灵活的重试机制

  • 支持延迟重试
  • 可设置不同的重试策略

4.系统解耦

  • 业务逻辑与异常处理逻辑分离
  • 提高代码的可维护性

二、注解式配置说明

1.主配置注解

@configuration
public class rabbitmqconfig {
    
    // 主队列
    @bean
    public queue orderqueue() {
        return queuebuilder.durable("order.queue")
            .deadletterexchange("dlx.exchange")  // 死信交换器
            .deadletterroutingkey("dlx.routing.key")  // 死信路由键
            .ttl(10000)  // 消息10秒未消费进入死信
            .maxlength(1000)  // 队列最大长度
            .build();
    }
    
    // 死信队列
    @bean
    public queue deadletterqueue() {
        return queuebuilder.durable("dl.queue")
            .build();
    }
    
    // 死信交换器
    @bean
    public directexchange deadletterexchange() {
        return new directexchange("dlx.exchange");
    }
    
    // 绑定死信交换器和队列
    @bean
    public binding deadletterbinding() {
        return bindingbuilder.bind(deadletterqueue())
            .to(deadletterexchange())
            .with("dlx.routing.key");
    }
}

2.监听器注解

@component
public class ordermessagelistener {
    
    // 监听正常队列
    @rabbitlistener(queues = "order.queue")
    public void processordermessage(orderdto order, 
                                   channel channel, 
                                   @header(amqpheaders.delivery_tag) long tag) {
        try {
            // 业务处理逻辑
            if (processorder(order)) {
                // 手动确认
                channel.basicack(tag, false);
            } else {
                // 拒绝消息,进入死信队列
                channel.basicnack(tag, false, false);
            }
        } catch (exception e) {
            // 异常时拒绝
            channel.basicnack(tag, false, false);
        }
    }
    
    // 监听死信队列
    @rabbitlistener(queues = "dl.queue")
    public void processdeadletter(orderdto order) {
        log.error("收到死信消息: {}", order);
        // 死信消息处理逻辑
        handledeadletter(order);
    }
}

三、详细整合步骤

1.添加依赖

<dependency>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-amqp</artifactid>
</dependency>

2.配置属性

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    # 开启消息返回机制
    publisher-returns: true
    # 开启确认机制
    publisher-confirm-type: correlated
    listener:
      simple:
        # 手动确认
        acknowledge-mode: manual
        # 重试配置
        retry:
          enabled: true
          max-attempts: 3
          initial-interval: 1000

3.完整配置类

@configuration
@slf4j
public class rabbitmqfullconfig {
    
    // ========== 正常业务队列配置 ==========
    @bean
    public directexchange orderexchange() {
        return new directexchange("order.exchange", true, false);
    }
    
    @bean
    public queue orderqueue() {
        map<string, object> args = new hashmap<>();
        // 死信交换器
        args.put("x-dead-letter-exchange", "order.dlx.exchange");
        // 死信路由键
        args.put("x-dead-letter-routing-key", "order.dlx.key");
        // 消息ttl(毫秒)
        args.put("x-message-ttl", 30000);
        // 队列最大长度
        args.put("x-max-length", 10000);
        return queuebuilder.durable("order.queue")
            .witharguments(args)
            .build();
    }
    
    @bean
    public binding orderbinding() {
        return bindingbuilder.bind(orderqueue())
            .to(orderexchange())
            .with("order.key");
    }
    
    // ========== 死信队列配置 ==========
    @bean
    public directexchange deadletterexchange() {
        return new directexchange("order.dlx.exchange", true, false);
    }
    
    @bean
    public queue deadletterqueue() {
        return queuebuilder.durable("order.dl.queue")
            .build();
    }
    
    @bean
    public binding deadletterbinding() {
        return bindingbuilder.bind(deadletterqueue())
            .to(deadletterexchange())
            .with("order.dlx.key");
    }
    
    // ========== 重试队列(延时队列替代方案)==========
    @bean
    public customexchange delayexchange() {
        map<string, object> args = new hashmap<>();
        args.put("x-delayed-type", "direct");
        return new customexchange("delay.exchange", 
            "x-delayed-message", true, false, args);
    }
    
    @bean
    public queue delayqueue() {
        return queuebuilder.durable("delay.queue")
            .build();
    }
    
    @bean
    public binding delaybinding() {
        return bindingbuilder.bind(delayqueue())
            .to(delayexchange())
            .with("delay.key")
            .noargs();
    }
}

4.消息生产者

@component
@slf4j
public class messageproducer {
    
    @autowired
    private rabbittemplate rabbittemplate;
    
    // 发送普通消息
    public void sendordermessage(orderdto order) {
        correlationdata correlationdata = new correlationdata(order.getid());
        
        rabbittemplate.convertandsend(
            "order.exchange",
            "order.key",
            order,
            message -> {
                // 设置消息属性
                message.getmessageproperties()
                    .setexpiration("30000")  // 消息ttl
                    .setdeliverymode(messagedeliverymode.persistent);
                return message;
            },
            correlationdata
        );
        
        // 确认回调
        correlationdata.getfuture().addcallback(
            result -> {
                if (result.isack()) {
                    log.info("消息发送成功: {}", order.getid());
                }
            },
            ex -> log.error("消息发送失败: {}", ex.getmessage())
        );
    }
    
    // 发送延迟消息
    public void senddelaymessage(orderdto order, int delaytime) {
        rabbittemplate.convertandsend(
            "delay.exchange",
            "delay.key",
            order,
            message -> {
                message.getmessageproperties()
                    .setheader("x-delay", delaytime);
                return message;
            }
        );
    }
}

5.消息消费者(完整版)

@component
@slf4j
public class ordermessageconsumer {
    
    private static final int max_retry_count = 3;
    
    @autowired
    private messageproducer messageproducer;
    
    /**
     * 监听订单队列
     */
    @rabbitlistener(queues = "order.queue")
    public void handleordermessage(
            @payload orderdto order,
            @headers map<string, object> headers,
            channel channel,
            @header(amqpheaders.delivery_tag) long deliverytag) {
        
        try {
            log.info("收到订单消息: {}", order);
            
            // 模拟业务处理
            boolean success = processorderbusiness(order);
            
            if (success) {
                // 业务成功,确认消息
                channel.basicack(deliverytag, false);
                log.info("订单处理成功: {}", order.getid());
            } else {
                // 获取重试次数
                integer retrycount = (integer) headers.get("x-retry-count");
                retrycount = (retrycount == null) ? 1 : retrycount + 1;
                
                if (retrycount <= max_retry_count) {
                    // 重试次数未超限,重新入队
                    log.warn("订单处理失败,第{}次重试: {}", retrycount, order.getid());
                    
                    // 设置重试计数
                    headers.put("x-retry-count", retrycount);
                    
                    // 延迟重试
                    messageproducer.senddelaymessage(order, 5000);
                    
                    // 确认消息,避免重新投递
                    channel.basicack(deliverytag, false);
                } else {
                    // 超过重试次数,进入死信队列
                    log.error("订单处理失败次数超过上限,进入死信队列: {}", order.getid());
                    channel.basicnack(deliverytag, false, false);
                }
            }
        } catch (exception e) {
            log.error("处理订单消息异常: {}", e.getmessage());
            try {
                // 拒绝消息,进入死信队列
                channel.basicnack(deliverytag, false, false);
            } catch (ioexception ex) {
                log.error("拒绝消息失败: {}", ex.getmessage());
            }
        }
    }
    
    /**
     * 监听死信队列
     */
    @rabbitlistener(queues = "order.dl.queue")
    public void handledeadlettermessage(
            @payload orderdto order,
            @headers map<string, object> headers) {
        
        log.error("收到死信消息: {}", order);
        
        // 记录死信消息
        logdeadletter(order, headers);
        
        // 发送告警
        sendalert(order);
        
        // 人工处理或其他补偿措施
        manualprocess(order);
    }
    
    /**
     * 监听延迟队列
     */
    @rabbitlistener(queues = "delay.queue")
    public void handledelaymessage(@payload orderdto order) {
        log.info("收到延迟消息,开始重试: {}", order);
        
        // 重新发送到订单队列
        messageproducer.sendordermessage(order);
    }
    
    private boolean processorderbusiness(orderdto order) {
        // 业务处理逻辑
        // 返回true表示成功,false表示失败
        return new random().nextboolean();
    }
    
    private void logdeadletter(orderdto order, map<string, object> headers) {
        // 记录死信日志
        log.info("记录死信: {}, headers: {}", order, headers);
    }
    
    private void sendalert(orderdto order) {
        // 发送告警通知
        log.warn("发送告警: 订单{}处理失败", order.getid());
    }
    
    private void manualprocess(orderdto order) {
        // 人工处理逻辑
        log.info("等待人工处理订单: {}", order.getid());
    }
}

四、使用场景

1.订单超时取消

// 订单创建时发送延迟消息
public void createorder(orderdto order) {
    // 保存订单
    orderservice.save(order);
    
    // 发送30分钟过期的消息
    rabbittemplate.convertandsend(
        "order.exchange",
        "order.key",
        order,
        message -> {
            message.getmessageproperties()
                .setexpiration("1800000");  // 30分钟
            return message;
        }
    );
}

2.支付回调重试

// 支付回调失败时进入死信队列,人工处理
@rabbitlistener(queues = "payment.callback.queue")
public void handlepaymentcallback(paymentdto payment) {
    if (!paymentservice.processcallback(payment)) {
        throw new runtimeexception("支付回调处理失败");
    }
}

3.库存锁定与释放

// 库存锁定15分钟后自动释放
public void lockinventory(string orderid) {
    inventoryservice.lock(orderid);
    
    // 发送15分钟后到期的消息
    rabbittemplate.convertandsend(
        "inventory.exchange",
        "inventory.lock.key",
        orderid,
        message -> {
            message.getmessageproperties()
                .setexpiration("900000");  // 15分钟
            return message;
        }
    );
}

4.消息重试机制

// 分级重试策略
public class retrystrategy {
    // 第一次重试:5秒后
    // 第二次重试:30秒后
    // 第三次重试:5分钟后
    // 超过3次进入死信队列
}

五、优点总结

  1. 可靠性:确保消息不丢失,即使处理失败也有备份
  2. 灵活性:支持多种死信策略(超时、长度限制、拒绝等)
  3. 可维护性:异常处理与正常业务逻辑分离
  4. 监控性:死信队列便于监控和统计异常消息
  5. 可扩展性:支持多种重试和补偿机制

六、最佳实践建议

  1. 合理设置ttl:根据业务需求设置合适的过期时间
  2. 监控死信队列:设置告警,及时处理死信消息
  3. 限制队列大小:防止消息积压
  4. 记录详细日志:便于问题排查
  5. 死信消息分析:定期分析死信原因,优化系统

通过spring boot整合rabbitmq死信队列,可以构建更加健壮、可靠的消息驱动系统,有效处理各种异常场景,提高系统的整体稳定性。

到此这篇关于springboot4.0整合rabbitmq死信队列详解的文章就介绍到这了,更多相关springboot rabbitmq死信队列内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com