当前位置: 代码网 > 手机>品牌>华为 > Rabbitmq消息顺序的问题以及解决方案

Rabbitmq消息顺序的问题以及解决方案

2024年07月31日 华为 我要评论
场景1:一个queue,多个consumer一个queue,有多个consumer去消费,这样就会造成顺序的错误,consumer从MQ里面读取数据是有序的,但是每个consumer的执行时间是不固定的,无法保证先读到消息的consumer一定先完成操作,这样就会出现消息并没有按照顺序执行,造成数据顺序错误。人话就是:我确实保证了消息是按按顺序接的。但是由于每一个消息执行的是时间不一样。如果我前面的执行消息比较长,会导致我后面的操作比前面的操作更早执行。就出现了顺序错误。

1.1消息顺序的场景

场景1:一个queue,多个consumer

一个queue,有多个consumer去消费,这样就会造成顺序的错误,consumer从mq里面读取数据是有序的,但是每个consumer的执行时间是不固定的,无法保证先读到消息的consumer一定先完成操作,这样就会出现消息并没有按照顺序执行,造成数据顺序错误。,

人话就是:我确实保证了消息是按按顺序接的。但是由于每一个消息执行的是时间不一样。如果我前面的执行消息比较长,会导致我后面的操作比前面的操作更早执行。就出现了顺序错误

 consumer多线程消费

一个queue对应一个consumer,但是consumer里面进行了多线程消费,这样也会造成消息消费顺序错误。

人话:就是线程的执行是强制式的,你永远不知道下一个执行的会是哪个指令。

2.解决方案:

单消费者、单线程消费就行。

如果你非要多线程,你可以参考下,多线程输出a,b,c

 

/**
 * @author xdarker
 * 2018-5-17
 */
public class main {
	
	public static void main(string[] args) throws interruptedexception {
		
		int num = 1;//当前正在执行线程的标记
		abcprint print = new abcprint(num); 
		
		thread threada = new thread(new runnablea(print));
		thread threadb = new thread(new runnableb(print));
		thread threadc = new thread(new runnablec(print));
		threada.start();
		thread.sleep(500);
		threadb.start();
		thread.sleep(500);
		threadc.start();
	}
}
class runnablea implements runnable{
	
	private abcprint print;
	public runnablea(abcprint print) {
		super();
		this.print = print;	
	}
 
	@override
	public void run() {		
		print.printa();
		
	}
}
class runnableb implements runnable{
	
	private abcprint print;
	public runnableb(abcprint print) {
		super();
		this.print = print;
	}
 
	@override
	public void run() {		
		print.printb();
	}
}
class runnablec implements runnable{
	
	private abcprint print;
	public runnablec(abcprint print) {
		super();
		this.print = print;
	}
 
	@override
	public void run() {		
		print.printc();
	}
}
class abcprint {
 
	private int num;//当前正在执行线程的标记
	public abcprint(int num) {
		super();
		this.num = num;
	}
	
	
	public void printa(){
		for (int j = 0; j < 2; j++)//表示 循环打印2轮
		synchronized(this){
	              while(num != 1){
				try {
					this.wait();
				} catch (interruptedexception e) {
					e.printstacktrace();
				}
			}
			for (int i = 0; i < 3; i++) {//表示 打印3次
				system.out.println("a");  
			}
			
		    //打印a线程执行完 ,通知打印b线程
                    num = 2;  
                    this.notifyall();  
	        }
	}
	
	public void printb(){
		for (int j = 0; j < 2; j++)//表示 循环打印2轮
		synchronized(this){
			while(num != 2){
				try {
					this.wait();
				} catch (interruptedexception e) {
					e.printstacktrace();
				}
			}
			for (int i = 0; i < 2; i++) {//表示 打印2次
				system.out.println("b");  
			}
		    //打印b线程执行完 ,通知打印c线程
                    num = 3;  
                    this.notifyall();  
		}
	}
	
	public void printc(){
		for (int j = 0; j < 2; j++)//表示 循环打印2轮
		synchronized(this){
			while(num != 3){
				try {
					this.wait();
				} catch (interruptedexception e) {
					e.printstacktrace();
				}
			}
			
	            system.out.println("c");  
	            //打印c线程执行完 ,通知打印a线程
                    num = 1;  
                    this.notifyall();  
		}
	}	
}	

 

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com