一、rabbitmq发送消息
一、简单模式
概述
一个生产者一个消费者
模型
代码
//没有交换机,两个参数为routingkey和消息内容
rabbittemplate.convertandsend("test1_queue","haha");
二、工作队列模式
概述
一个生产者,多个消费者,消费者之间负载均衡
模型
代码
//没有交换机,两个参数为routingkey和消息内容
rabbittemplate.convertandsend("test1_queue","haha");
三、发布订阅模式
概述
生产者把消息给交换机,交换机把消息推送给与它绑定的所有队列,消费者监听自己的队列
模型
代码
//该模式下,交换机与队列绑定无需routingkey,因此效率最高
rabbittemplate.convertandsend("fanout_exchange","","lala");
四、路由模式
概述
交换机与队列由routing key绑定,生产者发送消息时指定交换机和routing key,则对应的队列便会收到消息
模型
代码
rabbittemplate.convertandsend("direct_exchange","test1_queue","lala");
五、主题模式(通配符模式)
概述
交换机与队列由routing key绑定,但routing key由通配符和具体的字符组成,生产者输入具体的字符,交换机根据routing key的规则模糊匹配到对应的队列,则对应的队列会收到消息
模型
代码
/**
* 交换机与队列绑定
* @return
*/
@bean
binding truckhistorybinding(){
return bindingbuilder.bind(test1queue()).to(topicexchange()).with("*.test1.*");
}
@getmapping("/sendmessage")
public void sendmessage() {
//需要字符串的模糊匹配,效率最低
rabbittemplate.convertandsend("topic_exchange","aa.test1.cc","lala");
}
二、rabbitmq接收消息
一、拉模式
概述
消费者可以主动拉取队列里的消息
代码
rabbittemplate.execute(channel->{
//通过channel.basicget方法可以单条获取消息,其返回值时getreponse
getresponse response = channel.basicget("my_queue",false);
string message = new string(response.getbody());
}
)
二、推模式
概述
通过发布订阅模式,订阅队列里的消息
代码
@rabbitlistener(queues="my_queue")
public void onmessage(message messge,channel channel){
string msg = new string (message.getbody());
}
三、消息的手动确认
注意:
手动确认需要先将自动确认的配置注释掉;
消息确认模式有:
acknowledgemode.none:自动确认
acknowledgemode.auto:根据情况确认
acknowledgemode.manual:手动确认
默认情况下消息消费者是自动 ack (确认)消息的,如果要手动 ack(确认)则需要修改确认模式为 manual
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
或在 rabbitlistenercontainerfactory 中进行开启手动 ack
@bean
public rabbitlistenercontainerfactory<?> rabbitlistenercontainerfactory(connectionfactory connectionfactory){
simplerabbitlistenercontainerfactory factory = new simplerabbitlistenercontainerfactory();
factory.setconnectionfactory(connectionfactory);
factory.setmessageconverter(new jackson2jsonmessageconverter());
factory.setacknowledgemode(acknowledgemode.manual); //开启手动 ack
return factory;
}
消费消息手动确认的监听器
获取消息消费的唯一标识
message.getmessageproperties().getdeliverytag();
执行业务处理
消息确认
//消费消息的手动确认,消息确认成功-basicack
//第一个参数deliverytag,消息的唯一标识
//第二个参数multiple,消息是否支持批量确认,如果是true,代表可以一次性确认标识小于等于当前标识的所有消息
//如果是false,只会确认当前消息
channel.basicack(deliverytag,false);
消息确认失败处理,根据条件判断设置是否重回队列 ,是否支持批量处理
//说明消费消息处理失败,如果不进行确认(自动确认,投递成功即确认,消费是否正常,不关心),消息就会丢失
//消息处理失败确认,代表消息没有正确消费,注意:此种方式一次只能确认一个消息
//第一给参数是消息的唯一标识,
//第二个参数是代表是否重回队列,如果是true,重新将该消息放入队列,再次消费
//注意:第二个参数要谨慎,必须要结合具体业务场景,根据业务判断是否需要重回队列,一旦处理不当,机会导致消息循环入队,消息挤压
//不重回队列 require = false
// channel.basicreject(deliverytag,false);
//重回队列 require = true
channel.basicreject(deliverytag,true);
//消息处理失败确认,代表消息没有正确消费,注意,此种方式支持批量
//第一个参数是消息的唯一标识,
//第二个参数是代表是否支持批量确认
//第三给参数代表是否重回队列
//不重回队列 require = false
channel.basicnack(deliverytag,true,false);
//重回队列 require = true
channel.basicnack(deliverytag,false,true);
发表评论