当前位置: 代码网 > it编程>编程语言>Java > SpringBoot整合RocketMQ实现发送同步消息

SpringBoot整合RocketMQ实现发送同步消息

2024年05月18日 Java 我要评论
一、简介rocketmq 是一款开源的分布式消息中间件,由阿里巴巴开源。由阿里巴巴集团开发并开源,目前被捐赠给apache基金会,并入选孵化器项目,2017年从apache基金会毕业后,rocketm

一、简介

rocketmq 是一款开源的分布式消息中间件,由阿里巴巴开源。由阿里巴巴集团开发并开源,目前被捐赠给apache基金会,并入选孵化器项目,2017年从apache基金会毕业后,rocketmq被指定为顶级项目(tlp)。它具有高可用性、高性能、低延迟等特点,广泛应用于阿里巴巴集团内部以及众多外部企业的业务系统中。

1.1、rocketmq 主要特点

  • 分布式架构:rocketmq 是基于分布式架构设计的,支持水平扩展,可以灵活地部署和扩展。
  • 高可用性:rocketmq 支持主从架构,消息存储采用主从复制的方式,保证了消息的高可用性和可靠性。
  • 高性能:rocketmq 在消息存储、消息传输等方面进行了优化,具有较高的吞吐量和较低的延迟。
  • 丰富的特性:rocketmq 提供了丰富的特性,包括顺序消息、延迟消息、事务消息、消息过滤、消息轨迹等,满足了不同场景下的需求。
  • 监控和管理:rocketmq 提供了丰富的监控和管理功能,可以通过控制台或者监控工具实时监控消息的状态和性能指标。
  • 开源社区支持:rocketmq 是一款开源的消息中间件,拥有活跃的开源社区,提供了丰富的文档和示例,为用户提供了便利的支持和帮助。

1.2、rocketmq 核心组件

rocketmq 的架构主要包括以下核心组件:

  • nameserver:nameserver 是 rocketmq 的路由管理组件,负责管理 broker 的路由信息。客户端在发送消息或者消费消息时,需要先从 nameserver 获取相应的 broker 地址,然后再与 broker 建立连接。
  • broker:broker 是 rocketmq 的消息存储和传输组件,负责存储消息以及向消费者传递消息。一个 rocketmq 系统可以包含多个 broker,每个 broker 负责存储一部分消息数据,并提供相应的消息服务。
  • producer:producer 是消息的生产者,负责产生消息并将消息发送到 broker 中。producer 将消息发送到指定的 topic,然后由 broker 存储并传递给相应的消费者。
  • consumer:consumer 是消息的消费者,负责订阅并消费 broker 中的消息。consumer 通过订阅指定的 topic 来接收消息,并进行相应的业务处理。
  • topic:topic 是消息的主题,用于对消息进行分类和管理。producer 将消息发送到指定的 topic,而 consumer 则订阅相应的 topic 来接收消息。
  • message queue:message queue 是消息队列,用于存储消息。每个 topic 可以分为多个 message queue,每个 message queue 保存了一部分消息,多个 message queue 组成了一个 topic 的完整消息存储。

总的来说,rocketmq是阿里推出的优秀开源分布式消息中间件,具有高性能、高可靠、高并发等优点,是构建分布式系统不可或缺的基础组件之一。

1.3、概念

同步发送指的是生产者在发送消息后,会阻塞当前线程,直到收到broker的发送响应后才返回,响应中包含消息是否发送成功的状态。同步发送的优缺点如下:

优点:

  • 发送可靠性高,能够立即得知发送结果
  • 发送反馈直接返回,不需要通过专门的监听通道接收结果

缺点:

  • 当broker发生故障或者网络延迟时,生产者线程会被阻塞,影响发送效率
  • 对于发送端吞吐量较高的场景,同步发送存在性能瓶颈

1.4、场景

同步发送对消息可靠性传输有较高要求的场景,如通知消息等,发送端对发送吞吐量要求不是特别高的场景

二、父工程

因为这个系统会有很多rocketmq的知识,我准备拆开写,避免频繁的导入依赖和包,我这里采用分模块开发。

2.1、父工程依赖

pom.xml

<parent>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-parent</artifactid>
    <version>2.6.0</version>
    <relativepath/> <!-- lookup parent from repository -->
</parent>

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
    <dependency>
        <groupid>org.springframework.boot</groupid>
        <artifactid>spring-boot-starter-web</artifactid>
    </dependency>

    <dependency>
        <groupid>org.springframework.boot</groupid>
        <artifactid>spring-boot-starter-test</artifactid>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupid>org.apache.rocketmq</groupid>
        <artifactid>rocketmq-spring-boot-starter</artifactid>
        <version>2.2.3</version>
    </dependency>

    <dependency>
        <groupid>org.apache.commons</groupid>
        <artifactid>commons-lang3</artifactid>
        <version>3.12.0</version>
    </dependency>

    <dependency>
        <groupid>org.projectlombok</groupid>
        <artifactid>lombok</artifactid>
        <version>1.18.26</version>
    </dependency>

</dependencies>

<dependencymanagement>
    <dependencies>
        <dependency>
            <groupid>com.alian</groupid>
            <artifactid>common-rocketmq-dto</artifactid>
            <version>1.0.0-snapshot</version>
        </dependency>
    </dependencies>
</dependencymanagement>

2.2、公共模块

这里需要注意的是下面这个依赖,其实就是一个会员类。如果你传输的不是对象也可以不要,我这里演示就创建了。

<dependency>
    <groupid>com.alian</groupid>
    <artifactid>common-rocketmq-dto</artifactid>
    <version>1.0.0-snapshot</version>
</dependency>

member.java

package com.alian.common;

import lombok.data;

import java.util.date;

@data
public class member {

    private long id;

    private string membername;

    private int age;

    private date birthday;
}

后续的项目都会用到这个父工程和公共模块,后面就不再过多说明了。

三、生产者

我们在父工程下新建一个模块用于发送同步消息。

3.1 maven依赖

pom.xml

<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/pom/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
         xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactid>rocketmq</artifactid>
        <groupid>com.alian</groupid>
        <version>1.0.0-snapshot</version>
    </parent>
    <modelversion>4.0.0</modelversion>

    <artifactid>01-send-sync-message</artifactid>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupid>com.alian</groupid>
            <artifactid>common-rocketmq-dto</artifactid>
            <version>1.0.0-snapshot</version>
        </dependency>
    </dependencies>
</project>

3.2 application配置

application.properties

server.port=8001

# rocketmq地址
rocketmq.name-server=192.168.0.234:9876
# 默认的生产者组
rocketmq.producer.group=sync_group
# 发送同步消息超时时间
rocketmq.producer.send-message-timeout=3000
# 用于设置在消息发送失败后,生产者是否尝试切换到下一个服务器。设置为 true 表示启用,在发送失败时尝试切换到下一个服务器
rocketmq.producer.retry-next-server=true
# 用于指定消息发送失败时的重试次数
rocketmq.producer.retry-times-when-send-failed=3
# 设置消息压缩的阈值,为0表示禁用消息体的压缩
rocketmq.producer.compress-message-body-threshold=0

3.3 发送字符串消息

消息的发送比较简单,我们直接引用

    @autowired
    private rocketmqtemplate rocketmqtemplate;

此对象封装了一系列的消息发送方法。

sendstrmessagetest.java

package com.alian.sync;

import lombok.extern.slf4j.slf4j;
import org.apache.rocketmq.client.producer.sendresult;
import org.apache.rocketmq.spring.core.rocketmqtemplate;
import org.junit.jupiter.api.aftereach;
import org.junit.jupiter.api.test;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.test.context.springboottest;
import org.springframework.messaging.message;
import org.springframework.messaging.messageheaders;
import org.springframework.messaging.support.messagebuilder;

@slf4j
@springboottest
public class sendstrmessagetest {

    @autowired
    private rocketmqtemplate rocketmqtemplate;

    @test
    public void syncsendstringmessage() {
        string topic = "string_message_topic";
        string message = "我是一条同步文本消息:syncsendstringmessage";
        sendresult sendresult = rocketmqtemplate.syncsend(topic, message);
        log.info("同步发送返回的结果:{}", sendresult);
    }

    @test
    public void syncsendstringmessagewithbuilder() {
        string topic = "string_message_topic";
        string message = "我是一条同步的文本消息:syncsendstringmessagewithbuilder";
        message<string> msg = messagebuilder.withpayload(message)
                // 消息类型
                .setheader(messageheaders.content_type, "text/plain")
                .build();
        sendresult sendresult = rocketmqtemplate.syncsend(topic, msg);
        log.info("同步发送返回的结果:{}", sendresult);
    }

    @test
    public void syncsendstringmessagewithbuildertimeout() {
        string topic = "string_message_topic";
        string message = "我是一条同步的文本消息:syncsendstringmessagewithbuildertimeout";
        message<string> msg = messagebuilder.withpayload(message)
                // 消息类型
                .setheader(messageheaders.content_type, "text/plain")
                .build();
        // 3秒发送超时
        sendresult sendresult = rocketmqtemplate.syncsend(topic, msg, 3000);
        log.info("同步发送返回的结果:{}", sendresult);
    }

    @aftereach
    public void waiting() {
        try {
            thread.sleep(3000l);
        } catch (interruptedexception e) {
            e.printstacktrace();
        }
    }

}

这里需要注意的是,我发送字符串消息的时候,topic都是 string_message_topic,因为我这里是本地开发环境,并且我在配置文件中配置了

autocreatetopicenable=true

当该参数设置为true时,如果生产者在发送消息时使用了一个在broker端不存在的topic,则broker会自动创建该topic,允许消息正常发送和存储。

当该参数设置为false时,如果生产者使用了不存在的topic,则broker会直接拒绝发送请求,不会自动创建topic。

官方对该参数的解释是:自动创建topic的特性主要是为了方便,但也可能带来一些风险,比如有的应用程序由于编码上的低级错误导致无意中创建了大量的topic。因此,生产环境建议将该参数设置为false,只有手工创建所需的topic。

3.4 发送json消息

sendjsonmessagetest.java

package com.alian.sync;

import com.alibaba.fastjson.jsonobject;
import lombok.extern.slf4j.slf4j;
import org.apache.rocketmq.client.producer.sendresult;
import org.apache.rocketmq.spring.core.rocketmqtemplate;
import org.junit.jupiter.api.aftereach;
import org.junit.jupiter.api.test;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.test.context.springboottest;
import org.springframework.messaging.message;
import org.springframework.messaging.messageheaders;
import org.springframework.messaging.support.messagebuilder;

import java.util.hashmap;
import java.util.map;

@slf4j
@springboottest
public class sendjsonmessagetest {

    @autowired
    private rocketmqtemplate rocketmqtemplate;

    @test
    public void sendjsonmessage() {
        string topic = "json_message_topic";
        jsonobject json = new jsonobject();
        json.put("name", "alian");
        json.put("age", "28");
        json.put("hobby", "java");
        sendresult sendresult = rocketmqtemplate.syncsend(topic, json);
        log.info("同步发送返回的结果:{}", sendresult);
    }


    @test
    public void sendjsonmessagewithbuilder() {
        string topic = "json_message_topic";
        jsonobject json = new jsonobject();
        json.put("name", "alian");
        json.put("age", "28");
        json.put("hobby", "java");
        message<jsonobject> msg = messagebuilder.withpayload(json)
                // 消息类型
                .setheader(messageheaders.content_type, "application/json")
                .build();
        sendresult sendresult = rocketmqtemplate.syncsend(topic, msg);
        log.info("同步发送返回的结果:{}", sendresult);
    }

    @test
    public void sendmapmessage() {
        string topic = "json_message_topic";
        map<string, string> map = new hashmap<>();
        map.put("1", "java");
        map.put("2", "go");
        map.put("3", "c");
        map.put("4", "vue");
        map.put("5", "react");
        sendresult sendresult = rocketmqtemplate.syncsend(topic, map);
        log.info("同步发送返回的结果:{}", sendresult);
    }

    @aftereach
    public void waiting() {
        try {
            thread.sleep(3000l);
        } catch (interruptedexception e) {
            e.printstacktrace();
        }
    }

}

这里需要注意的是,我发送json消息的时候,topic都是 json_message_topic这里map消息也能被jsonobject消费。

3.5 发送java对象消息

sendjavaobjectmessagetest.java

package com.alian.sync;

import com.alian.common.member;
import com.alibaba.fastjson.jsonobject;
import lombok.extern.slf4j.slf4j;
import org.apache.rocketmq.client.producer.sendresult;
import org.apache.rocketmq.spring.core.rocketmqtemplate;
import org.junit.jupiter.api.aftereach;
import org.junit.jupiter.api.test;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.test.context.springboottest;
import org.springframework.messaging.message;
import org.springframework.messaging.messageheaders;
import org.springframework.messaging.support.messagebuilder;

import java.util.date;
import java.util.hashmap;
import java.util.map;

@slf4j
@springboottest
public class sendjavaobjectmessagetest {

    @autowired
    private rocketmqtemplate rocketmqtemplate;

    @test
    public void sendjavaobjectmessage() {
        string topic = "java_object_message_topic";
        member member = new member();
        member.setid(10086l);
        member.setmembername("alian");
        member.setage(28);
        member.setbirthday(new date());
        sendresult sendresult = rocketmqtemplate.syncsend(topic, member);
        log.info("同步发送返回的结果:{}", sendresult);
    }

    @test
    public void sendjavaobjectmessagewithbuilder() {
        string topic = "java_object_message_topic";
        member member = new member();
        member.setid(10086l);
        member.setmembername("alian");
        member.setage(28);
        member.setbirthday(new date());
        message<member> msg = messagebuilder.withpayload(member)
                // 设置消息类型
                .setheader(messageheaders.content_type, "application/json")
                .build();
        sendresult sendresult = rocketmqtemplate.syncsend(topic, msg);
        log.info("同步发送返回的结果:{}", sendresult);
    }

    @aftereach
    public void waiting() {
        try {
            thread.sleep(3000l);
        } catch (interruptedexception e) {
            e.printstacktrace();
        }
    }

}

这里需要注意的是,我发送java对象消息的时候,topic都是 java_object_message_topic

四、消费者

我们在父工程下新建一个模块用于发送同步消息。

4.1 maven依赖

pom.xml

<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/pom/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
         xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactid>rocketmq</artifactid>
        <groupid>com.alian</groupid>
        <version>1.0.0-snapshot</version>
    </parent>
    <modelversion>4.0.0</modelversion>

    <artifactid>08-comsume-concurrent</artifactid>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupid>com.alian</groupid>
            <artifactid>common-rocketmq-dto</artifactid>
            <version>1.0.0-snapshot</version>
        </dependency>
    </dependencies>

</project>

4.2 application配置

application.properties

server.port=8008

# rocketmq地址
rocketmq.name-server=192.168.0.234:9876
# 默认的消费者组
rocketmq.consumer.group=concurrent_consumer_group
# 批量拉取消息的数量
rocketmq.consumer.pull-batch-size=10
# 集群消费模式
rocketmq.consumer.message-model=clustering

实际上对于本文来说,下面两个配置不用配置,也不会生效。

# 默认的消费者组
rocketmq.consumer.group=concurrent_consumer_group
# 集群消费模式
rocketmq.consumer.message-model=clustering

因为优先的是@rocketmqmessagelistener 注解中设置 consumergroup 和messagemodel 参数。

4.3 消费字符串消息

stringmessageconsumer.java

package com.alian.concurrent;

import lombok.extern.slf4j.slf4j;
import org.apache.rocketmq.spring.annotation.rocketmqmessagelistener;
import org.apache.rocketmq.spring.core.rocketmqlistener;
import org.springframework.stereotype.service;

@slf4j
@service
@rocketmqmessagelistener(topic = "string_message_topic",  
						 consumergroup = "concurrent_group_string",
						 consumethreadnumber = 1)
public class stringmessageconsumer implements rocketmqlistener<string> {

    @override
    public void onmessage(string message) {
        log.info("字符串消费者接收到的消息: {}", message);
        // 处理消息的业务逻辑
    }
}

如果要消费消息,我们需要实现rocketmqlistener<t>,实现onmessage方法。发送的是什么对象,我们接收就是什么对象,也就是t是什么对象。生产者发送的字符串消息,我们这里就用string接收,也就是rocketmqlistener<string>

package org.apache.rocketmq.spring.core;

public interface rocketmqlistener<t> {
    void onmessage(t var1);
}

同时加上@rocketmqmessagelistener注解,主要用到三个注解

  • topic:指定该消费者订阅的topic名称,可以是单个topic,也可以是用||分隔的多个topic
  • consumergroup:指定该消费者所属的消费组名称,消费组用于组织同一类消费实例,相同消费组的消费实例可以消费完全相同的消息,rocketmq 通过消费者组(consumer group)来维护不同消费者的消费进度。 每个消费者组都有一个消费进度(offset),用于标记该组下的消费者在某个主题(topic)和队列(queue)上已经消费到的位置。

需要注意的是:如果topic不存在,只有在生产者发送消息时,并且autocreatetopicenable设置为true的情况下,broker端才会自动创建该topic。消费者启动时,即使autocreatetopicenable=true,也不会自动创建不存在的topic。

具体来说:

  1. 生产者启动并发送消息到一个不存在的topic时:
  • 如果autocreatetopicenable=true,broker会自动创建该topic,允许消息发送成功
  • 如果autocreatetopicenable=false,broker会拒绝消息发送,报错该topic不存在
  1. 消费者启动订阅一个不存在的topic时:
  • 无论autocreatetopicenable的值是true还是false,broker都不会自动创建该topic
  • 消费者会启动成功,但获取不到消息,处于持续等待状态

所以生产者发送消息时才可能自动创建topic,而消费者启动时是不会自动创建topic的。

4.4 消费json消息

jsonmessageconsumer.java

package com.alian.concurrent;

import com.alibaba.fastjson.jsonobject;
import lombok.extern.slf4j.slf4j;
import org.apache.rocketmq.spring.annotation.rocketmqmessagelistener;
import org.apache.rocketmq.spring.core.rocketmqlistener;
import org.springframework.stereotype.service;

@slf4j
@service
@rocketmqmessagelistener(topic = "json_message_topic", consumergroup = "concurrent_group_json")
public class jsonmessageconsumer implements rocketmqlistener<jsonobject> {

    @override
    public void onmessage(jsonobject json) {
        log.info("json消费者接收到的消息: {}", json);
        // 处理消息的业务逻辑
    }
}

生产者发送的jsonobject消息,我们这里就用jsonobject接收,也就是rocketmqlistener<jsonobject>

4.5 消费java对象消息

javaobjectmessageconsumer.java

package com.alian.concurrent;

import com.alian.common.member;
import lombok.extern.slf4j.slf4j;
import org.apache.rocketmq.spring.annotation.rocketmqmessagelistener;
import org.apache.rocketmq.spring.core.rocketmqlistener;
import org.springframework.stereotype.service;

@slf4j
@service
@rocketmqmessagelistener(topic = "java_object_message_topic", consumergroup = "concurrent_group_java_object")
public class javaobjectmessageconsumer implements rocketmqlistener<member> {

    @override
    public void onmessage(member member) {
        // 因为发序列化的原因member必须是同一个包
        log.info("java对象消费者接收到的消息: {}", member);
        // 处理消息的业务逻辑
    }
}

生产者发送的member消息,我们这里就用member接收,也就是rocketmqlistener<member>。

这里需要再次说明下发送java对象消息时,因为反序列的原因,所以生产者和消费者使用的是公共包里同一个对象,也就是发送和接收的对象的包路径要一致。

五、部分运行结果

5.1、字符串消息

同步发送返回的结果:sendresult [sendstatus=send_ok, msgid=7f000001339418b4aac22db848c70000, offsetmsgid=c0a800ea00002a9f0000000000000000, messagequeue=messagequeue [topic=string_message_topic, brokername=broker-a, queueid=1], queueoffset=0]

字符串消费者接收到的消息: 我是一条同步文本消息:syncsendstringmessage

5.2、json消息

同步发送返回的结果:sendresult [sendstatus=send_ok, msgid=7f00000137fc18b4aac22db9d42f0000, offsetmsgid=c0a800ea00002a9f0000000000000146, messagequeue=messagequeue [topic=json_message_topic, brokername=broker-a, queueid=1], queueoffset=0]

json消费者接收到的消息: {"name":"alian","age":"28","hobby":"java"}

5.3、java对象消息

同步发送返回的结果:sendresult [sendstatus=send_ok, msgid=7f000001098c18b4aac22dbaceb50000, offsetmsgid=c0a800ea00002a9f0000000000000270, messagequeue=messagequeue [topic=java_object_message_topic, brokername=broker-a, queueid=0], queueoffset=0]

java对象消费者接收到的消息: member(id=10086, membername=alian, age=28, birthday=sat mar 09 21:06:57 cst 2024)

5.4、现象说明

为什么在rocketmq中当autocreatetopicenable=true,先启动消费者,然后生产者第一次向一个未创建的topic中发送消息时,消息发送成功了(马上返回成功了),但是消费者要等一段时间才能收到?这种现象的原因是rocketmq在创建topic时,存在一个延迟同步的过程。具体来说:

  • 当生产者发送消息到一个不存在的topic时,broker会自动创建该topic。
  • 但是broker创建topic后,并不会立即将其元数据信息同步给所有的消费者,而是异步延迟同步。
  • 生产者发送消息时,broker会立即返回发送成功响应,因为它只需将消息持久化存储即可。
  • 消费者在启动时已订阅了该topic,但由于元数据未同步,暂时无法获知该topic的路由信息,所以无法立即开始消费。
  • 过了一段时间后,broker将新创建的topic元数据同步给了消费者,消费者才开始从该topic拉取并消费消息。

这种延迟同步机制是rocketmq的一个设计,目的是为了减小创建topic时对消费者的影响,避免大量消费者同时更新元数据造成系统抖动。

我们可以通过调整broker的配置项来控制这个延迟同步时间,比如:

  • brokertopicputenable=true开启自动创建topic功能
  • topicdelayenable=true开启延时创建topic功能
  • topicdelayoffsetenabled=true开启延时创建topic偏移量同步功能

通过调小topicdelayoffsetinterval可以缩短元数据同步延迟时间,但也会增加系统开销。

所以这种现象实际上是rocketmq的一个正常设计行为,目的是为了系统整体的健壮性和可用性。如果应用对延迟时间不太敏感,保持默认配置即可;如果对延迟敏感,可以适当调小延迟同步时间。

以上就是springboot整合rocketmq实现发送同步消息的详细内容,更多关于springboot rocketmq发送同步消息的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com