一、简介
今天我们讲讲如何批量发送消息,主要还是使用方法rocketmqtemplate的syncsend方法。
1.1、特点
批量发送和单条发送消息的主要区别有以下几点:
- 网络开销 发送单条消息时,每个消息都需要单独建立网络连接、发送数据包、等待响应等,网络开销较大。批量发送可以将多条消息打包在一起发送,减少网络连接建立的次数,降低网络开销
- 吞吐量 由于批量发送减少了网络开销,所以可以在单位时间内发送更多的消息,提高了吞吐量。在高并发高流量场景下,批量发送能够发挥更好的性能
- 消息顺序 单条发送消息的顺序是有序的,后发送的在队列中排在前发送的后面。而对于批量发送,一个批次内的消息顺序是固定的,但不同批次之间的消息顺序是无序的,会按照到达顺序存储在队列中。如果需要严格消息顺序,单条发送更合适
- 消息重试 如果批量发送的一个批次中有部分消息发送失败,需重发整个批次,没有选择重发其中部分消息的功能(涉及幂等性问题)。单条发送失败时只需重发该单条消息
- 编程复杂度 批量发送需要构造messagebatch或message列表对象,编程略微复杂些。单条发送只需构造单个message对象
二、maven依赖
pom.xml
<?xml version="1.0" encoding="utf-8"?> <project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactid>rocketmq</artifactid> <groupid>com.alian</groupid> <version>1.0.0-snapshot</version> </parent> <modelversion>4.0.0</modelversion> <artifactid>06-send-batched-message</artifactid> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupid>com.alian</groupid> <artifactid>common-rocketmq-dto</artifactid> <version>1.0.0-snapshot</version> </dependency> </dependencies> </project>
父工程已经在我上一篇文章里,通用公共包也在我上一篇文章里有说明,包括消费者。具体参考:springboot整合rocketmq实现发送同步消息_java_代码网 (jb51.net)
三、application配置
application.properties
server.port=8005 # rocketmq地址 rocketmq.name-server=192.168.0.234:9876 # 默认的生产者组 rocketmq.producer.group=batched_group # 发送同步消息超时时间 rocketmq.producer.send-message-timeout=3000 # 用于设置在消息发送失败后,生产者是否尝试切换到下一个服务器。设置为 true 表示启用,在发送失败时尝试切换到下一个服务器 rocketmq.producer.retry-next-server=true # 用于指定消息发送失败时的重试次数 rocketmq.producer.retry-times-when-send-failed=3 # 设置消息压缩的阈值,为0表示禁用消息体的压缩 rocketmq.producer.compress-message-body-threshold=0
四、批量发送
在 rocketmq 中,rocketmqtemplate的syncsend方法,它允许你批量发送同步消息,主要参数:
- topic:(普通消息都发送到topic=string_message_topic)
- collection<t>:消息集合
测试类都引入依赖
@autowired private rocketmqtemplate rocketmqtemplate;
4.1、同步消息
@test public void syncsendbatchstringmessageswithbuilder() { string topic = "string_message_topic"; string message = "超级喜欢golang语言:"; list<message<string>> messagelist = new arraylist<>(); for (int i = 0; i < 10; i++) { message<string> rocketmessage = messagebuilder.withpayload(message + i) // 设置消息类型 .setheader(messageheaders.content_type, "text/plain") .build(); // 加入到列表 messagelist.add(rocketmessage); } // 使用syncsend发送批量消息 sendresult sendresult = rocketmqtemplate.syncsend(topic, messagelist); log.info("同步批量发送普通消息结果:{}",sendresult); }
运行结果:
[_group_string_1] c.a.concurrent.stringmessageconsumer : 字符串消费者接收到的消息: 同步批量发送普通消息:0
[_group_string_2] c.a.concurrent.stringmessageconsumer : 字符串消费者接收到的消息: 同步批量发送普通消息:1
[_group_string_4] c.a.concurrent.stringmessageconsumer : 字符串消费者接收到的消息: 同步批量发送普通消息:3
[_group_string_5] c.a.concurrent.stringmessageconsumer : 字符串消费者接收到的消息: 同步批量发送普通消息:4
[_group_string_3] c.a.concurrent.stringmessageconsumer : 字符串消费者接收到的消息: 同步批量发送普通消息:2
[_group_string_6] c.a.concurrent.stringmessageconsumer : 字符串消费者接收到的消息: 同步批量发送普通消息:5
[_group_string_7] c.a.concurrent.stringmessageconsumer : 字符串消费者接收到的消息: 同步批量发送普通消息:6
[_group_string_8] c.a.concurrent.stringmessageconsumer : 字符串消费者接收到的消息: 同步批量发送普通消息:7
[_group_string_9] c.a.concurrent.stringmessageconsumer : 字符串消费者接收到的消息: 同步批量发送普通消息:8
[group_string_10] c.a.concurrent.stringmessageconsumer : 字符串消费者接收到的消息: 同步批量发送普通消息:9
4.2、异步消息
@test public void asyncsendbatchstringmessagewithbuilder() { string topic = "string_message_topic"; string message = "alian超级喜欢golang语言:"; list<message<string>> messagelist = new arraylist<>(); for (int i = 0; i < 10; i++) { message<string> rocketmessage = messagebuilder.withpayload(message + i) // 设置消息类型 .setheader(messageheaders.content_type, "text/plain") .build(); // 加入到列表 messagelist.add(rocketmessage); } // 使用asyncsend发送批量消息 rocketmqtemplate.asyncsend(topic, messagelist, new sendcallback() { @override public void onsuccess(sendresult sendresult) { // 异步发送成功的回调逻辑 log.info("异步批量发送普通消息成功: " + sendresult); } @override public void onexception(throwable e) { // 异步发送失败的回调逻辑 log.info("异步批量发送普通消息失败: " + e.getmessage()); } }); }
运行结果:
[_group_string_1] c.a.concurrent.stringmessageconsumer : 字符串消费者接收到的消息: 异步批量发送普通消息:0
[_group_string_8] c.a.concurrent.stringmessageconsumer : 字符串消费者接收到的消息: 异步批量发送普通消息:1
[_group_string_3] c.a.concurrent.stringmessageconsumer : 字符串消费者接收到的消息: 异步批量发送普通消息:7
[_group_string_6] c.a.concurrent.stringmessageconsumer : 字符串消费者接收到的消息: 异步批量发送普通消息:4
[_group_string_9] c.a.concurrent.stringmessageconsumer : 字符串消费者接收到的消息: 异步批量发送普通消息:2
[_group_string_5] c.a.concurrent.stringmessageconsumer : 字符串消费者接收到的消息: 异步批量发送普通消息:6
[group_string_10] c.a.concurrent.stringmessageconsumer : 字符串消费者接收到的消息: 异步批量发送普通消息:3
[_group_string_2] c.a.concurrent.stringmessageconsumer : 字符串消费者接收到的消息: 异步批量发送普通消息:8
[_group_string_4] c.a.concurrent.stringmessageconsumer : 字符串消费者接收到的消息: 异步批量发送普通消息:9
[_group_string_7] c.a.concurrent.stringmessageconsumer : 字符串消费者接收到的消息: 异步批量发送普通消息:5
4.3、顺序消息
在 rocketmq 中,rocketmqtemplate的syncsendorderly方法,它允许你批量发送同步消息,主要参数:
- topic:(和之前有区别,普通消息都发送到topic=ordered_string_message_topic)
- collection<t>:消息集合
- hashkey:通过hashkey发送到同一个队列
@test public void syncsendbatchorderlystringmessageswithbuilder() { string topic = "ordered_string_message_topic"; string message = "同步批量发送顺序消息,超级喜欢go语言:"; list<message<string>> messagelist = new arraylist<>(); for (int i = 0; i < 10; i++) { message<string> rocketmessage = messagebuilder.withpayload(message + i) // 设置消息类型 .setheader(messageheaders.content_type, "text/plain") .build(); // 加入到列表 messagelist.add(rocketmessage); } // 使用syncsendorderly发送批量顺序消息,消费者线程设置为1 sendresult sendresult = rocketmqtemplate.syncsendorderly(topic, messagelist, "alian_sync_ordered"); log.info("批量发送顺序消息发送结果:{}",sendresult); }
运行结果:
[group_string_10] com.alian.ordered.stringmessageconsumer : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢go语言:0
[group_string_10] com.alian.ordered.stringmessageconsumer : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢go语言:1
[group_string_10] com.alian.ordered.stringmessageconsumer : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢go语言:2
[group_string_10] com.alian.ordered.stringmessageconsumer : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢go语言:3
[group_string_10] com.alian.ordered.stringmessageconsumer : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢go语言:4
[group_string_10] com.alian.ordered.stringmessageconsumer : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢go语言:5
[group_string_10] com.alian.ordered.stringmessageconsumer : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢go语言:6
[group_string_10] com.alian.ordered.stringmessageconsumer : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢go语言:7
[group_string_10] com.alian.ordered.stringmessageconsumer : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢go语言:8
[group_string_10] com.alian.ordered.stringmessageconsumer : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢go语言:9
所以我之前说批量发送消息的topic不一样,因为
@slf4j @service @rocketmqmessagelistener(topic = "ordered_string_message_topic", consumergroup = "ordered_group_string", consumemode = consumemode.orderly) public class stringmessageconsumer implements rocketmqlistener<string> { @override public void onmessage(string message) { log.info("字符串消费者接收到的消息: {}", message); // 处理消息的业务逻辑 } }
顺序消息要顺序消费,也就是每次是一个线程去消费,相当于单线程,也就有序了。关键就是配置了:consumemode = consumemode.orderly
当然,我们也可以把消费者线程数设置为 consumethreadnumber = 1,也就是单线程消费了,从而确保了消息的顺序消费(指单实例):
@rocketmqmessagelistener(topic = "ordered_string_message_topic", consumergroup = "concurrent_group_string", consumethreadnumber = 1) public class stringmessageconsumer implements rocketmqlistener<string> { @override public void onmessage(string message) { log.info("字符串消费者接收到的消息: {}", message); // 处理消息的业务逻辑 } }
4.4、关于异步批量发送
有可能你会写下面的异步批量发送顺序消息
@test public void asyncsendbatchorderlystringmessagewithbuilder2() { string topic = "ordered_string_message_topic"; string message = "alian超级喜欢golang语言:"; list<string> messagelist = new arraylist<>(); for (int i = 0; i < 10; i++) { // 加入到列表 messagelist.add(message + i); } // 使用 asyncsendorderly 发送批量消息 rocketmqtemplate.asyncsendorderly(topic, messagelist, "alian_async_ordered", new sendcallback() { @override public void onsuccess(sendresult sendresult) { // 异步发送成功的回调逻辑 log.info("异步消息发送字符串消息成功: " + sendresult); } @override public void onexception(throwable e) { // 异步发送失败的回调逻辑 log.info("异步消息发送字符串消息失败: " + e.getmessage()); } }); }
其实这个是不对的,最终的结果是一个把你这里的messagelist,当做了一个消息列表接收了,如下结果:
[group_string_18] com.alian.ordered.stringmessageconsumer : 字符串消费者接收到的消息: ["alian超级喜欢golang语言:0","alian超级喜欢golang语言:1","alian超级喜欢golang语言:2","alian超级喜欢golang语言:3","alian超级喜欢golang语言:4","alian超级喜欢golang语言:5","alian超级喜欢golang语言:6","alian超级喜欢golang语言:7","alian超级喜欢golang语言:8","alian超级喜欢golang语言:9"]
rocketmq对于单条消息和批量消息在队列中是如何被处理的?
- 对于单条发送的消息,rocketmq会按照队列中的顺序,将每条消息分发给一个消费者线程。因此,即使有多个消费者线程,由于每条消息都被单独处理,消费的顺序仍然会与发送的顺序一致。
- 对于批量发送的消息,情况就有所不同。批量消息是作为一个整体发送的,因此在队列中,它们被视为一个单独的实体。当rocketmq从队列中取出批量消息时,它会将整个批量消息作为一个整体分发给一个消费者线程。如果有多个消费者线程,由于操作系统的线程调度策略,处理批量消息的线程可能会在处理消息的过程中被调度出去,从而允许其他线程处理后面的消息。这样就可能导致消费的顺序与发送的顺序不一致。
4.5、结论
为此我测试了多次,得到结论:
- 单条发送消息到同一个队列,使用多个消费线程消费该队列,由于消息本身是有序的,所以消费顺序也是有序的
- 单批次批量发送消息到同一个队列,使用单个消费线程消费该队列,由于消费线程是单一的,所以消费顺序也是有序的
- 单批次批量发送消息到同一个队列,使用多个消费线程消费时,消费顺序就不是有序的了
五、其他
既然知道批量消息是作为一个整体的,那么肯定就会对消息大小有限制,在 apache rocketmq 中,批量消息的大小默认限制是4mb。这意味着,你不能发送总大小超过4mb的批量消息。
如果你想修改这个限制,你需要修改rocketmq的配置。具体的修改方法如下:
- 找到rocketmq的配置文件broker.conf,这个文件通常位于rocketmq安装目录的conf目录下。
- 在broker.conf文件中,找到maxmessagesize这个配置项。这个配置项决定了批量消息的最大大小。
- 修改maxmessagesize的值为你想要的大小。注意,这个值是以字节为单位的,所以如果你想设置批量消息的最大大小为8mb,你应该设置maxmessagesize=8388608。
- 保存并关闭broker.conf文件。
- 重启rocketmq的broker服务,以使新的配置生效。
虽然你可以通过修改配置来增加批量消息的最大大小,但是你应该谨慎地考虑这个决定。增加批量消息的最大大小可能会增加broker的内存使用量,并可能影响到消息的发送和接收性能。因此,在修改这个配置之前,你应该先考虑你的应用的需求和broker的性能。
因为优先的是@rocketmqmessagelistener 注解中设置 consumergroup 和messagemodel 参数。
以上就是springboot整合rocketmq批量发送消息的实现的详细内容,更多关于springboot rocketmq批量发送消息的资料请关注代码网其它相关文章!
发表评论