什么是顺序消费
例如:业务上产生者发送三条消息, 分别是对同一条数据的增加、修改、删除操作, 如果没有保证顺序消费,执行顺序可能变成删除、修改、增加,这就乱了。
如何保证顺序性
一般我们讨论如何保证消息的顺序性,会从下面三个方面考虑
1:发送消息的顺序
2:队列中消息的顺序
3:消费消息的顺序
发送消息的顺序
消息发送端的顺序,大部分业务不做要求,谁先发消息无所谓,如果遇到业务一定要发送消息也确保顺序,那意味着,只能全局加锁一个个的操作,一个个的发消息,不能并发发送消息。
队列中消息的顺序
rabbitmq 中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,先进先出原则,这个由 rabbitmq 保证,通常也不需要开发关心。
不同队列 中的消息顺序,是没有保证的,例如:进地铁站的时候,排了三个队伍,不同队伍之间的,不能确保谁先进站。
消费消息的顺序
我们说如何保证消息顺序性,通常说的就是消费者消费消息的顺序,在多个消费者消费同一个消息队列的场景,通常是无法保证消息顺序的,
虽然消息队列的消息是顺序的,但是多个消费者并发消费消息,获取的消息的速度、执行业务逻辑的速度快慢、执行异常等等原因都会导致消息顺序不一致。
例如:消息a、b、c按顺序进入队列,消费者a1拿到消息a、消费者b1拿到消息b, 结果消费者b执行速度快,就跑完了,又或者消费者a1挂了,都会导致消息顺序不一致。
解决消费顺序的问题, 通常就是一个队列只有一个消费者 , 这样就可以一个个消息按顺序处理, 缺点就是并发能力下降了,无法并发消费消息,这是个取舍问题。
如果业务又要顺序消费,又要增加并发,通常思路就是开启多个队列,业务根据规则将消息分发到不同的队列,通过增加队列的数量来提高并发度,例如:电商订单场景,只需要保证同一个用户的订单消息的顺序性就行,不同用户之间没有关系,所以只要让同一个用户的订单消息进入同一个队列就行,其他用户的订单消息,可以进入不同的队列。
以下为代码设计过程实现
首先我们必须保证只有一个消费者 那么问题就来了,我们的项目一般是多副本的,如何保证只有一个副本在消费呢
这时就会用到消费者 单活模式 x-single-active-consumer
使用下述配置实现
private queue creatqueue(string name){
// 创建一个 单活模式 队列
hashmap<string, object> args=new hashmap<>();
args.put("x-single-active-consumer",true);
return new queue(name,true,false,false,args);
}
创建之后,我们可以在控制台看到 消费者的激活状态
=======================>配置类
@configuration
@suppresswarnings("all")
public class directexchangeconfiguration {
@bean
public queue queue15_0() {
return creatqueue(message15.queue_0);
}
@bean
public queue queue15_1() {
return creatqueue(message15.queue_1);
}
@bean
public queue queue15_2() {
return creatqueue(message15.queue_2);
}
@bean
public queue queue15_3() {
return creatqueue(message15.queue_3);
}
@bean
public directexchange exchange15() {
// name: 交换机名字 | durable: 是否持久化 | exclusive: 是否排它
return new directexchange(message15.exchange, true, false);
}
@bean
public binding binding15_0() {
return bindingbuilder.bind(queue15_0()).to(exchange15()).with("0");
}
@bean
public binding binding15_1() {
return bindingbuilder.bind(queue15_1()).to(exchange15()).with("1");
}
@bean
public binding binding15_2() {
return bindingbuilder.bind(queue15_2()).to(exchange15()).with("2");
}
@bean
public binding binding15_3() {
return bindingbuilder.bind(queue15_3()).to(exchange15()).with("3");
}
/**
* 创建一个 单活 模式的队列
* 注意 :
* <p>
* 如果一个队列已经创建为非x-single-active-consumer,而你想更改其为x-single-active-consumer,要把之前创建的队列删除
*
* @param name
* @return queue
*/
private queue creatqueue(string name) {
// 创建一个 单活模式 队列
hashmap<string, object> args = new hashmap<>();
args.put("x-single-active-consumer", true);
return new queue(name, true, false, false, args);
}
=================================》生产者
@component
@slf4j
public class producer15 {
@resource
private rabbittemplate rabbittemplate;
/**
* 这里的发送是 拟投递到多个队列中
*
* @param id 业务id
* @param msg 业务信息
*/
public void syncsend(int id, string msg) {
message15 message = new message15(id, msg);
rabbittemplate.convertandsend(message15.exchange, this.getroutingkey(id), message);
}
/**
* 根据 id 取余来决定丢到那个队列中去
*
* @param id id
* @return routingkey
*/
private string getroutingkey(int id) {
return string.valueof(id % message15.queue_count);
}
}
============================》消费者
/**
* 要想保证消息的顺序,每个队列只能有一个消费者
*
* @author 深漂码农@明哥
* @date 2024-03-18
*/
@component
@rabbitlistener(queues = message15.queue_0)
@rabbitlistener(queues = message15.queue_1)
@rabbitlistener(queues = message15.queue_2)
@rabbitlistener(queues = message15.queue_3)
@slf4j
public class consumer15 {
@rabbithandler
public void onmessage(message15 message) throws interruptedexception {
log.info("[{}][consumer15 onmessage][线程编号:{} 消息内容:{}]", localdatetime.now(), thread.currentthread().getid(), message);
// 这里随机睡一会,模拟业务处理时候的耗时
long l = new random(1000).nextlong();
timeunit.milliseconds.sleep(l);
}
}
==============================》测试类
@test
void mock() throws interruptedexception {
// 先启动这个测试类,模拟多个副本情况下,看如何消费
new countdownlatch(1).await();
}
@test
void syncsend() throws interruptedexception {
// 模拟每个队列中扔 10 个数据,看看效果
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 4; j++) {
producer15.syncsend(j, " 编号:" + j + " 第:" + i + " 条消息");
}
}
timeunit.seconds.sleep(20);
}
}
ps:测试的时候时候 先启动 mock 方式。 在启动 syncsend 方法,模拟多个副本同时消费,观察是否可以
以上的是rabbitmq之顺序消费实现的代码 若不了解rabbitmq的基本使用 建议先看看我前面对应的文章 文章链接:点我—>let’s go
若需完整代码 可识别二维码后 给您发代码。
发表评论