当前位置: 代码网 > it编程>编程语言>Java > Spring Boot 整合 RocketMQ 的全流程(消息发送、消费、重试)

Spring Boot 整合 RocketMQ 的全流程(消息发送、消费、重试)

2026年01月07日 Java 我要评论
rocketmq 作为阿里开源的分布式消息中间件,凭借高吞吐量、低延迟、高可靠性等特性,被广泛应用于分布式系统的异步通信、解耦、削峰填谷等场景。spring boot 作为主流的微服务开发框架,其自动

rocketmq 作为阿里开源的分布式消息中间件,凭借高吞吐量、低延迟、高可靠性等特性,被广泛应用于分布式系统的异步通信、解耦、削峰填谷等场景。spring boot 作为主流的微服务开发框架,其自动配置机制能极大简化与第三方组件的整合过程。本文将从实战角度出发,详细讲解 spring boot 如何整合 rocketmq,覆盖普通消息发送、消息消费、消息重试等核心流程,并提供完整的代码示例。

一、环境准备

1. 基础环境依赖

  • jdk 8 及以上(rocketmq 对 jdk 版本有一定要求,推荐 8/11)
  • maven 3.6+(项目构建工具)
  • rocketmq 4.9.7(本文使用的稳定版本,可根据需求选择最新版本)
  • spring boot 2.7.15(与 rocketmq starter 适配的版本)

2. rocketmq 服务部署

首先需要搭建 rocketmq 服务环境,可选择本地单机部署集群部署,本文以本地单机为例:

  1. 从 rocketmq 官网 下载对应版本的安装包,解压到本地目录。
  2. 启动 nameserver:
    # 进入 rocketmq 解压目录的 bin 文件夹
    cd rocketmq-all-4.9.7-bin-release/bin
    # 启动 nameserver(windows 系统执行 mqnamesrv.cmd)
    nohup sh mqnamesrv &
  3. 启动 broker:
    # 启动 broker(windows 系统执行 mqbroker.cmd)
    nohup sh mqbroker -n localhost:9876 autocreatetopicenable=true &

    注:autocreatetopicenable=true 表示自动创建主题,方便测试,生产环境建议提前手动创建主题。

二、项目初始化与依赖配置

1. 创建 spring boot 项目

通过 spring initializr(https://start.spring.io/)创建一个 spring boot 项目,选择基础依赖(如 spring web),也可手动创建 maven 项目并配置 pom.xml。

2. 引入 rocketmq 依赖

在 pom.xml 中添加 rocketmq spring boot starter 依赖,注意版本适配(本文使用 2.2.3 版本,与 rocketmq 4.9.7 适配):

<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
         xsi:schemalocation="http://maven.apache.org/pom/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelversion>4.0.0</modelversion>
    <parent>
        <groupid>org.springframework.boot</groupid>
        <artifactid>spring-boot-starter-parent</artifactid>
        <version>2.7.15</version>
        <relativepath/> <!-- lookup parent from repository -->
    </parent>
    <groupid>com.example</groupid>
    <artifactid>spring-boot-rocketmq-demo</artifactid>
    <version>0.0.1-snapshot</version>
    <name>spring-boot-rocketmq-demo</name>
    <description>spring boot rocketmq 整合示例</description>
    <properties>
        <java.version>1.8</java.version>
        <rocketmq-spring-boot-starter.version>2.2.3</rocketmq-spring-boot-starter.version>
    </properties>
    <dependencies>
        <!-- spring web 依赖,用于提供接口测试 -->
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-web</artifactid>
        </dependency>
        <!-- rocketmq spring boot starter -->
        <dependency>
            <groupid>org.apache.rocketmq</groupid>
            <artifactid>rocketmq-spring-boot-starter</artifactid>
            <version>${rocketmq-spring-boot-starter.version}</version>
        </dependency>
        <!-- 测试依赖 -->
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-test</artifactid>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupid>org.springframework.boot</groupid>
                <artifactid>spring-boot-maven-plugin</artifactid>
            </plugin>
        </plugins>
    </build>
</project>

3. 配置 rocketmq 连接信息

在 application.yml(或 application.properties)中配置 rocketmq 的 nameserver 地址、生产者组等信息:

spring:
  application:
    name: spring-boot-rocketmq-demo
# rocketmq 配置
rocketmq:
  # nameserver 地址,多个地址用分号分隔
  name-server: localhost:9876
  # 生产者配置
  producer:
    # 生产者组名,必须唯一
    group: demo-producer-group
    # 发送消息的超时时间,默认 3000ms
    send-message-timeout: 3000
    # 消息体最大长度,默认 4mb
    max-message-size: 4194304
    # 压缩消息的阈值,默认 4kb
    compress-message-body-threshold: 4096
    # 重试次数,默认 2 次
    retry-times-when-send-failed: 2
    # 异步发送失败时是否重试其他 broker,默认 false
    retry-next-server: false

三、消息发送:普通消息、同步 / 异步 / 单向发送

rocketmq 支持同步发送异步发送单向发送三种消息发送方式,适用于不同的业务场景:

  • 同步发送:发送后等待 broker 响应,可靠性最高,适用于重要消息(如订单创建)。
  • 异步发送:发送后不阻塞,通过回调函数处理响应,适用于需要高吞吐量且允许少量延迟的场景。
  • 单向发送:发送后不等待响应,性能最高,可靠性最低,适用于日志收集等不重要消息。

1. 封装消息发送工具类

创建 rocketmqproducerservice 类,注入 rocketmqtemplate(由 rocketmq starter 提供的模板类,简化消息发送),实现三种发送方式:

package com.example.demo.service;
import org.apache.rocketmq.client.producer.sendcallback;
import org.apache.rocketmq.client.producer.sendresult;
import org.apache.rocketmq.spring.core.rocketmqtemplate;
import org.apache.rocketmq.spring.support.rocketmqheaders;
import org.springframework.messaging.message;
import org.springframework.messaging.support.messagebuilder;
import org.springframework.stereotype.service;
import javax.annotation.resource;
/**
 * rocketmq 生产者服务
 */
@service
public class rocketmqproducerservice {
    // 注入 rocketmq 模板类
    @resource
    private rocketmqtemplate rocketmqtemplate;
    /**
     * 同步发送消息
     * @param topic 主题名(可携带标签,格式:topic:tag)
     * @param message 消息内容
     * @return 发送结果
     */
    public sendresult sendsyncmessage(string topic, string message) {
        // 构建消息(可添加消息头,如自定义 key)
        message<string> msg = messagebuilder.withpayload(message)
                .setheader(rocketmqheaders.keys, "sync-key-" + system.currenttimemillis())
                .build();
        // 同步发送
        return rocketmqtemplate.syncsend(topic, msg);
    }
    /**
     * 异步发送消息
     * @param topic 主题名
     * @param message 消息内容
     */
    public void sendasyncmessage(string topic, string message) {
        message<string> msg = messagebuilder.withpayload(message)
                .setheader(rocketmqheaders.keys, "async-key-" + system.currenttimemillis())
                .build();
        // 异步发送,通过 sendcallback 处理回调
        rocketmqtemplate.asyncsend(topic, msg, new sendcallback() {
            @override
            public void onsuccess(sendresult sendresult) {
                // 发送成功处理
                system.out.println("异步发送消息成功:" + sendresult);
            }
            @override
            public void onexception(throwable e) {
                // 发送失败处理
                system.err.println("异步发送消息失败:" + e.getmessage());
            }
        });
    }
    /**
     * 单向发送消息(不关心发送结果)
     * @param topic 主题名
     * @param message 消息内容
     */
    public void sendonewaymessage(string topic, string message) {
        message<string> msg = messagebuilder.withpayload(message)
                .setheader(rocketmqheaders.keys, "oneway-key-" + system.currenttimemillis())
                .build();
        // 单向发送
        rocketmqtemplate.sendoneway(topic, msg);
    }
}

2. 编写测试接口

创建 messagesendcontroller 控制器,提供 http 接口测试消息发送:

package com.example.demo.controller;
import org.apache.rocketmq.client.producer.sendresult;
import com.example.demo.service.rocketmqproducerservice;
import org.springframework.web.bind.annotation.getmapping;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.requestparam;
import org.springframework.web.bind.annotation.restcontroller;
import javax.annotation.resource;
/**
 * 消息发送测试控制器
 */
@restcontroller
@requestmapping("/message")
public class messagesendcontroller {
    @resource
    private rocketmqproducerservice rocketmqproducerservice;
    // 测试同步发送
    @getmapping("/sync/send")
    public string sendsyncmessage(@requestparam string msg) {
        // 主题名:demo-topic,标签:demo-tag(标签用于消息过滤)
        string topic = "demo-topic:demo-tag";
        sendresult sendresult = rocketmqproducerservice.sendsyncmessage(topic, msg);
        return "同步发送消息成功:" + sendresult;
    }
    // 测试异步发送
    @getmapping("/async/send")
    public string sendasyncmessage(@requestparam string msg) {
        string topic = "demo-topic:demo-tag";
        rocketmqproducerservice.sendasyncmessage(topic, msg);
        return "异步发送消息请求已提交";
    }
    // 测试单向发送
    @getmapping("/oneway/send")
    public string sendonewaymessage(@requestparam string msg) {
        string topic = "demo-topic:demo-tag";
        rocketmqproducerservice.sendonewaymessage(topic, msg);
        return "单向发送消息完成";
    }
}

四、消息消费:消费者配置与消息监听

rocketmq 的消费端通过消息监听器监听指定主题的消息,spring boot 整合后可通过注解 @rocketmqmessagelistener 快速实现消费逻辑。

1. 编写消息消费者

创建 rocketmqconsumerservice 类,实现 rocketmqlistener 接口,通过注解配置消费者组、监听的主题和标签:

package com.example.demo.consumer;
import org.apache.rocketmq.spring.annotation.consumemode;
import org.apache.rocketmq.spring.annotation.messagemodel;
import org.apache.rocketmq.spring.annotation.rocketmqmessagelistener;
import org.apache.rocketmq.spring.core.rocketmqlistener;
import org.springframework.stereotype.component;
/**
 * rocketmq 消费者服务
 * 注解说明:
 * - consumergroup:消费者组名,必须唯一
 * - topic:监听的主题名
 * - selectorexpression:标签表达式,* 表示所有标签,也可指定具体标签(如 demo-tag)
 * - messagemodel:消息模式,clustering(集群模式)/broadcasting(广播模式),默认集群模式
 * - consumemode:消费模式,concurrently(并发消费)/orderly(顺序消费),默认并发消费
 */
@component
@rocketmqmessagelistener(
        consumergroup = "demo-consumer-group",
        topic = "demo-topic",
        selectorexpression = "*",
        messagemodel = messagemodel.clustering,
        consumemode = consumemode.concurrently
)
public class rocketmqconsumerservice implements rocketmqlistener<string> {
    /**
     * 消息消费逻辑
     * @param message 消息内容
     */
    @override
    public void onmessage(string message) {
        system.out.println("接收到消息:" + message);
        // 模拟业务处理
        // 注意:消费端抛出异常会触发消息重试
        // handlebusiness(message);
    }
    /**
     * 模拟业务处理
     * @param message 消息内容
     */
    private void handlebusiness(string message) {
        // 业务逻辑代码
    }
}

2. 消费者核心配置说明

  • 消费者组(consumergroup):必须唯一,同一组的消费者共同消费主题的消息(集群模式下)。
  • 消息模式(messagemodel)
    • clustering(集群模式):同组消费者分摊消费消息,一条消息仅被一个消费者消费,默认模式。
    • broadcasting(广播模式):同组消费者都会消费同一条消息,适用于通知类消息(如配置更新)。
  • 消费模式(consumemode)
    • concurrently(并发消费):多线程并发消费,消费速度快,默认模式。
    • orderly(顺序消费):单线程消费,保证消息按顺序消费,适用于有序消息场景(如订单状态变更)。
  • 标签表达式(selectorexpression):用于过滤消息,支持 *(所有标签)、tag1 || tag2(多个标签)等表达式。

五、消息重试:消费失败后的重试机制

在实际业务中,消息消费可能因网络异常、业务处理失败等原因失败,rocketmq 提供了消费重试机制,确保消息被成功消费。

1. 重试机制原理

  • 默认重试:当消费者消费消息时抛出异常,rocketmq 会将消息重新放入队列,等待重试消费,默认重试次数为 16 次(每次重试的间隔时间逐渐增加:1s、5s、10s、30s、1min、2min、3min、4min、5min、6min、7min、8min、9min、10min、20min、30min)。
  • 死信队列:当消息重试 16 次后仍消费失败,会被发送到死信队列(dlq),死信队列的命名规则为:%dlq%+消费者组名,可通过消费死信队列的消息进行人工处理。

2. 自定义重试配置与异常处理

(1)配置消费重试次数

在 application.yml 中添加消费者的重试配置(也可通过注解属性配置):

# 消费者重试配置(可在注解中覆盖)
rocketmq:
  consumer:
    # 消费线程数
    consume-thread-max: 20
    # 批量消费的最大消息数
    consume-message-batch-max-size: 1
    # 消费超时时间
    consume-timeout: 15

(2)手动控制重试:返回消费状态

上述示例中,消费者实现的是 rocketmqlistener 接口,无法手动控制消费状态,若需要自定义重试逻辑,可实现 rocketmqpushconsumerlistener 接口(或使用 messagelistenerconcurrently),返回 consumeconcurrentlystatus 枚举:

package com.example.demo.consumer;
import org.apache.rocketmq.client.consumer.listener.consumeconcurrentlystatus;
import org.apache.rocketmq.client.consumer.listener.messagelistenerconcurrently;
import org.apache.rocketmq.client.exception.mqclientexception;
import org.apache.rocketmq.common.message.messageext;
import org.apache.rocketmq.spring.annotation.rocketmqmessagelistener;
import org.apache.rocketmq.spring.core.rocketmqlistener;
import org.apache.rocketmq.spring.core.rocketmqpushconsumerlifecyclelistener;
import org.springframework.stereotype.component;
import java.util.list;
/**
 * 自定义重试逻辑的消费者
 */
@component
@rocketmqmessagelistener(
        consumergroup = "demo-consumer-group-retry",
        topic = "demo-topic",
        selectorexpression = "*"
)
public class retryrocketmqconsumerservice implements rocketmqlistener<messageext>, rocketmqpushconsumerlifecyclelistener {
    @override
    public void onmessage(messageext messageext) {
        string message = new string(messageext.getbody());
        system.out.println("接收到消息(带重试逻辑):" + message + ",重试次数:" + messageext.getreconsumetimes());
        try {
            // 模拟业务处理失败
            int a = 1 / 0;
            // 业务处理成功,无需重试
        } catch (exception e) {
            system.err.println("消息消费失败:" + e.getmessage());
            // 若重试次数超过 3 次,直接返回成功(不再重试),否则抛出异常触发重试
            if (messageext.getreconsumetimes() >= 3) {
                system.out.println("消息重试次数已达上限,不再重试");
                // 可将消息记录到数据库,后续人工处理
                return;
            }
            // 抛出异常,触发重试
            throw new runtimeexception("消费失败,触发重试");
        }
    }
    @override
    public void preparestart(org.apache.rocketmq.client.consumer.defaultmqpushconsumer consumer) {
        // 可自定义消费者配置,如设置重试次数
        try {
            // 设置消费线程数
            consumer.setconsumethreadmax(20);
            // 设置消息监听器(若需要更细粒度的控制)
            consumer.registermessagelistener((messagelistenerconcurrently) (msgs, context) -> {
                for (messageext msg : msgs) {
                    onmessage(msg);
                }
                return consumeconcurrentlystatus.consume_success;
            });
        } catch (mqclientexception e) {
            throw new runtimeexception(e);
        }
    }
}

3. 死信队列处理

当消息进入死信队列后,可创建专门的消费者监听死信队列,进行人工干预处理:

package com.example.demo.consumer;
import org.apache.rocketmq.spring.annotation.rocketmqmessagelistener;
import org.apache.rocketmq.spring.core.rocketmqlistener;
import org.springframework.stereotype.component;
/**
 * 死信队列消费者
 * 死信队列名称:%dlq% + 原消费者组名
 */
@component
@rocketmqmessagelistener(
        consumergroup = "dlq-consumer-group",
        topic = "%dlq%demo-consumer-group",
        selectorexpression = "*"
)
public class dlqrocketmqconsumerservice implements rocketmqlistener<string> {
    @override
    public void onmessage(string message) {
        system.out.println("接收到死信队列消息:" + message);
        // 人工处理逻辑,如记录日志、通知运维、手动重试等
    }
}

六、测试验证

1. 启动项目

运行 spring boot 项目的主类,确保 rocketmq nameserver 和 broker 已启动。

2. 发送消息测试

通过 http 接口发送消息,例如:

  • 同步发送:http://localhost:8080/message/sync/send?msg=hello rocketmq sync
  • 异步发送:http://localhost:8080/message/async/send?msg=hello rocketmq async
  • 单向发送:http://localhost:8080/message/oneway/send?msg=hello rocketmq oneway

3. 验证消费与重试

观察控制台输出,可看到消费者成功接收消息;若在消费端模拟业务异常(如除以 0),可看到消息重试的日志,重试次数达到上限后进入死信队列。

七、生产环境注意事项

  1. 主题与消费者组规划:提前手动创建主题(关闭自动创建),消费者组名需唯一且有明确的业务含义。
  2. 消息重试配置:根据业务场景调整重试次数和间隔,避免无效重试占用资源。
  3. 死信队列处理:建立死信队列的监控和处理机制,防止消息丢失。
  4. 消息幂等性:由于消息重试,消费端需保证幂等性(如通过消息 key 去重)。
  5. 监控与告警:接入 rocketmq 监控平台(如 rocketmq dashboard),监控消息发送 / 消费情况,设置告警机制。
  6. 集群部署:生产环境中 rocketmq 需采用集群部署,保证高可用。

八、总结

本文详细介绍了 spring boot 整合 rocketmq 的全流程,包括环境搭建、依赖配置、消息发送(同步 / 异步 / 单向)、消息消费(集群 / 广播、并发 / 顺序)、消息重试与死信队列处理,并提供了完整的代码示例。通过本文的实战内容,你可以快速掌握 rocketmq 在 spring boot 项目中的核心使用方式,并根据实际业务场景进行扩展和优化。rocketmq 的功能远不止于此,后续还可深入学习顺序消息、事务消息、延迟消息等高级特性,进一步满足复杂的业务需求。

到此这篇关于spring boot 整合 rocketmq 的全流程(消息发送、消费、重试)的文章就介绍到这了,更多相关spring boot 整合 rocketmq内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com