当前位置: 代码网 > it编程>编程语言>Java > Spring Boot 整合 RabbitMQ从入门到实战步骤

Spring Boot 整合 RabbitMQ从入门到实战步骤

2026年01月05日 Java 我要评论
在分布式系统开发中,消息队列是解耦服务、提升可靠性的关键组件。rabbitmq 作为业界广泛使用的消息中间件,与 spring boot 的整合能显著提升开发效率。本文将手把手教你完成 spring

在分布式系统开发中,消息队列是解耦服务、提升可靠性的关键组件。rabbitmq 作为业界广泛使用的消息中间件,与 spring boot 的整合能显著提升开发效率。本文将手把手教你完成 spring boot 与 rabbitmq 的整合,涵盖基础配置、消息发送与接收,以及高级特性如手动确认和消息确认机制。

1.环境准备

确保已安装 jdk 1.8+、maven 3.0+ 和 rabbitmq 服务。rabbitmq 可通过 docker 快速启动:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

2.创建 spring boot 项目

使用 spring initializr 创建项目,添加以下依赖:

<dependency>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-amqp</artifactid>
</dependency>

3.配置 rabbitmq 连接

src/main/resources/application.yml中配置连接参数:

spring:
  rabbitmq:
    host: localhost       # rabbitmq 服务器地址
    port: 5672           # 默认端口
    username: guest      # 默认用户名
    password: guest      # 默认密码
    virtual-host: /      # 虚拟主机(默认)
    publisher-confirm-type: correlated  # 开启消息确认
    publisher-returns: true             # 开启消息失败回调
    listener:
      simple:
        acknowledge-mode: manual       # 手动确认消息
        prefetch: 1                    # 每次预取的消息数

4.声明队列和交换机

创建配置类定义队列、交换机及绑定关系:

@configuration
public class rabbitconfig {
    // 直连交换机
    public static final string direct_exchange = "direct_exchange";
    // 队列
    public static final string direct_queue = "direct_queue";
    // 路由键
    public static final string routing_key = "routing_key";
    @bean
    public exchange directexchange() {
        return exchangebuilder.directexchange(direct_exchange).build();
    }
    @bean
    public queue directqueue() {
        return new queue(direct_queue, true); // durable=true 表示持久化
    }
    @bean
    public binding binding(queue queue, exchange exchange) {
        return bindingbuilder.bind(queue).to(exchange).with(routing_key);
    }
}

5.生产者(发送消息)

创建 rabbitproducer.java 注入 rabbittemplate 发送消息:

@service
public class rabbitproducer {
    @autowired
    private rabbittemplate rabbittemplate;
    public void sendmessage(string message) {
        rabbittemplate.convertandsend(
            rabbitconfig.direct_exchange, 
            rabbitconfig.routing_key, 
            message
        );
    }
}

6.消费者(接收消息)

创建 rabbitconsumer.java 监听队列:

@service
public class rabbitconsumer {
    @rabbitlistener(queues = rabbitconfig.direct_queue)
    public void receivemessage(string message) {
        system.out.println("收到消息: " + message);
        // 手动确认消息(需配置 acknowledge-mode: manual)
        // 实际项目中需处理异常和重试逻辑
    }
}

7.消息确认机制

生产者确认 

在 application.yml 中已配置 publisher-confirm-type: correlated,可通过回调确认消息是否成功到达交换机:

@bean
public rabbittemplate rabbittemplate(connectionfactory connectionfactory) {
    rabbittemplate template = new rabbittemplate(connectionfactory);
    template.setconfirmcallback((correlationdata, ack, cause) -> {
        if (ack) {
            system.out.println("消息确认成功");
        } else {
            system.err.println("消息确认失败: " + cause);
        }
    });
    template.setmandatory(true); // 开启强制回调
    return template;
}

消费者确认:配置 acknowledge-mode: manual 后,需在消费者中手动确认:

@rabbitlistener(queues = rabbitconfig.direct_queue)
public void receivemessage(string message, channel channel, @header(amqpheaders.delivery_tag) long deliverytag) {
    try {
        system.out.println("处理消息: " + message);
        channel.basicack(deliverytag, false); // 确认消息
    } catch (exception e) {
        channel.basicnack(deliverytag, false, true); // 拒绝消息并重新入队
    }
}

8.死信队列配置

处理无法被消费的消息,创建死信队列并绑定:

@bean
public queue deadletterqueue() {
    map<string, object> args = new hashmap<>();
    args.put("x-dead-letter-exchange", "dlx_exchange"); // 指定死信交换机
    args.put("x-dead-letter-routing-key", "dlx_routing_key");
    return new queue("dlx_queue", true, false, false, args);
}

9.测试与验证

确保 rabbitmq 服务已启动(默认端口 15672 为管理界面)。

创建 rabbitmqapplicationtests.java

@springboottest
@autoconfiguremockmvc
public class rabbitmqapplicationtests {
    @autowired
    private rabbitproducer producer;
    @test
    public void testsendmessage() {
        producer.sendmessage("hello rabbitmq!");
    }
}

10.常见问题与解决方案

连接失败‌:检查 host 和 port 是否正确,确保 rabbitmq 服务已启动。
消息未消费‌:确认消费者监听的是正确的队列,检查 acknowledge-mode 配置。
性能调优‌:调整 prefetch-count 和 concurrency 参数优化吞吐量。

11.总结

本文从零开始实现了 spring boot 与 rabbitmq 的整合,涵盖基础配置、消息发送与接收、高级特性(如手动确认和死信队列)。通过合理使用这些特性,可以构建高可靠、高性能的分布式系统。如需进一步学习,可参考 rabbitmq 官方文档。

到此这篇关于spring boot 整合 rabbitmq从入门到实战步骤的文章就介绍到这了,更多相关spring boot 整合 rabbitmq内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com