介绍
特点
关键词解释
如和保证新消息的可靠性(消息不丢失)
交换机的类型
direct exchange(直连交换机)
直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的,步骤如下:
1、将一个队列绑定到某个交换机上,同时赋予该绑定一个路由键(routing key)
2、当一个携带着路由值为r的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为r的队列。
fanout exchange(扇型交换机)
扇型交换机(funout exchange)将消息路由给绑定到它身上的所有队列。不同于直连交换机,路由键在此类型上不启任务作用。如果n个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的发送给这所有的n个队列
topic exchange(主题交换机)
主题交换机(topic exchanges)中,队列通过路由键绑定到交换机上,然后,交换机根据消息里的路由值,将消息路由给一个或多个绑定队列。
扇型交换机和主题交换机异同:
- 对于扇型交换机路由键是没有意义的,只要有消息,它都发送到它绑定的所有队列上
- 对于主题交换机,路由规则由路由键决定,只有满足路由键的规则,消息才可以路由到对应的队列上
路由规则
* (星号) 用来表示一个单词 (必须出现的)
# (井号) 用来表示任意数量(零个或多个)单词
通配的绑定键是跟队列进行绑定的,举个小例子
队列q1 绑定键为*.tt.* 队列q2绑定键为 tt.#
如果一条消息携带的路由键为 a.tt.b,那么队列q1将会收到;
如果一条消息携带的路由键为tt.aa.bb,那么队列q2将会收到;
主题交换机的强大之处
当一个队列的绑定键为 #(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
当* (星号) 和#(井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能
死信队列?
延迟队列?
生产者如何将消息可靠投递到mq?
mq如何将消息可靠投递到消费者?
如何保证rabbitmq消息队列的高可用?
如何避免消息重复投递或重复消费?
在消息生产时,mq内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重的依据(消息投递失败并重传),避免重复的消息进入队列;
在消息消费时,要求消息体中必须要有一个 bizid(对于同一业务全局唯一,如支付id、订单id、帖子id 等)作为去重的依据,避免同一条消息被重复消费。
rabbitmq如何保证消息的顺序性
rabbitmq消息重试机制
查看更多:
mq——rabbit面试问题_mq面试必会6题经典-csdn博客
消息确认回调机制
在rabbitmq-provider
项目的application.yml
文件上,加上消息确认的配置项后
server:
port: 8021
spring:
#给项目来个名字
application:
name: rabbitmq-provider
#配置rabbitmq 服务器
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: root
#虚拟host 可以不设置,使用server默认host
virtual-host: jcccchost
#消息确认配置项
#确认消息已发送到交换机(exchange)
publisher-confirms: true
#确认消息已发送到队列(queue)
publisher-returns: true
然后是配置相关的消息确认回调函数,rabbitconfig.java
:
import org.springframework.amqp.core.message;
import org.springframework.amqp.rabbit.connection.connectionfactory;
import org.springframework.amqp.rabbit.connection.correlationdata;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
@configuration
public class rabbitconfig {
@bean
public rabbittemplate createrabbittemplate(connectionfactory connectionfactory){
rabbittemplate rabbittemplate = new rabbittemplate();
rabbittemplate.setconnectionfactory(connectionfactory);
//设置开启mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbittemplate.setmandatory(true);
rabbittemplate.setconfirmcallback(new rabbittemplate.confirmcallback() {
@override
public void confirm(correlationdata correlationdata, boolean ack, string cause) {
system.out.println("confirmcallback: "+"相关数据:"+correlationdata);
system.out.println("confirmcallback: "+"确认情况:"+ack);
system.out.println("confirmcallback: "+"原因:"+cause);
}
});
rabbittemplate.setreturncallback(new rabbittemplate.returncallback() {
@override
public void returnedmessage(message message, int replycode, string replytext, string exchange, string routingkey) {
system.out.println("returncallback: "+"消息:"+message);
system.out.println("returncallback: "+"回应码:"+replycode);
system.out.println("returncallback: "+"回应信息:"+replytext);
system.out.println("returncallback: "+"交换机:"+exchange);
system.out.println("returncallback: "+"路由键:"+routingkey);
}
});
return rabbittemplate;
}
}
推送消息存在四种情况:
1、消息推送到server
,但是在server
里找不到交换机--触发的是 confirmcallback
回调函数。
2、消息推送到server
,找到交换机了,但是没找到队列--触发的是 confirmcallback
和retruncallback
两个回调函数。
3、消息推送到sever
,交换机和队列啥都没找到--触发的是confirmcallback
回调函数。
4、消息推送成功--触发的是confirmcallback
回调函数。
消费者消息确认机制
自动确认, 这也是默认的消息确认情况。 acknowledgemode.none
手动确认 , 这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。
看了上面这么多介绍,接下来我们一起配置下,看看一般的消息接收手动确认是怎么样的。
在消费者项目里,新建messagelistenerconfig.java
上添加代码相关的配置代码:
import com.elegant.rabbitmqconsumer.receiver.myackreceiver;
import org.springframework.amqp.core.acknowledgemode;
import org.springframework.amqp.core.queue;
import org.springframework.amqp.rabbit.connection.cachingconnectionfactory;
import org.springframework.amqp.rabbit.listener.simplemessagelistenercontainer;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
@configuration
public class messagelistenerconfig {
@autowired
private cachingconnectionfactory connectionfactory;
@autowired
private myackreceiver myackreceiver;//消息接收处理类
@bean
public simplemessagelistenercontainer simplemessagelistenercontainer() {
simplemessagelistenercontainer container = new simplemessagelistenercontainer(connectionfactory);
container.setconcurrentconsumers(1);
container.setmaxconcurrentconsumers(1);
// rabbitmq默认是自动确认,这里改为手动确认消息
container.setacknowledgemode(acknowledgemode.manual);
//设置一个队列
container.setqueuenames("testdirectqueue");
//如果同时设置多个如下: 前提是队列都是必须已经创建存在的
// container.setqueuenames("testdirectqueue","testdirectqueue2","testdirectqueue3");
//另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addqueues
//container.setqueues(new queue("testdirectqueue",true));
//container.addqueues(new queue("testdirectqueue2",true));
//container.addqueues(new queue("testdirectqueue3",true));
container.setmessagelistener(myackreceiver);
return container;
}
}
对应的手动确认消息监听类,myackreceiver.java(手动确认模式需要实现 channelawaremessagelistener):
//之前的相关监听器可以先注释掉,以免造成多个同类型监听器都监听同一个队列。
//这里的获取消息转换,只作参考,如果报数组越界可以自己根据格式去调整。
import com.rabbitmq.client.channel;
import org.springframework.amqp.core.message;
import org.springframework.amqp.rabbit.listener.api.channelawaremessagelistener;
import org.springframework.stereotype.component;
import java.util.hashmap;
import java.util.map;
@component
public class myackreceiver implements channelawaremessagelistener {
@override
public void onmessage(message message, channel channel) throws exception {
long deliverytag = message.getmessageproperties().getdeliverytag();
try {
//因为传递消息的时候用的map传递,所以将map从message内取出需要做些处理
string msg = message.tostring();
string[] msgarray = msg.split("'");//可以点进message里面看源码,单引号直接的数据就是我们的map消息数据
map<string, string> msgmap = mapstringtomap(msgarray[1].trim(),3);
string messageid=msgmap.get("messageid");
string messagedata=msgmap.get("messagedata");
string createtime=msgmap.get("createtime");
system.out.println(" myackreceiver messageid:"+messageid+" messagedata:"+messagedata+" createtime:"+createtime);
system.out.println("消费的主题消息来自:"+message.getmessageproperties().getconsumerqueue());
channel.basicack(deliverytag, true); //第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
// channel.basicreject(deliverytag, true);//第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝
} catch (exception e) {
channel.basicreject(deliverytag, false);
e.printstacktrace();
}
}
//{key=value,key=value,key=value} 格式转换成map
private map<string, string> mapstringtomap(string str,int entrynum ) {
str = str.substring(1, str.length() - 1);
string[] strs = str.split(",",entrynum);
map<string, string> map = new hashmap<string, string>();
for (string string : strs) {
string key = string.split("=")[0].trim();
string value = string.split("=")[1];
map.put(key, value);
}
return map;
}
}
这时,先调用接口/senddirectmessage
, 给直连交换机testdirectexchange
的队列testdirectqueue
推送一条消息,可以看到监听器正常消费了下来:
到这里,我们其实已经掌握了怎么去使用消息消费的手动确认了。
但是这个场景往往不够! 因为很多伙伴之前给我评论反应,他们需要这个消费者项目里面,监听的好几个队列都想变成手动确认模式,而且处理的消息业务逻辑不一样。
没有问题,接下来看代码
场景: 除了直连交换机的队列testdirectqueue需要变成手动确认以外,我们还需要将一个其他的队列
或者多个队列也变成手动确认,而且不同队列实现不同的业务处理。
那么我们需要做的第一步,往simplemessagelistenercontainer里添加多个队列:
然后我们的手动确认消息监听类,myackreceiver.java
就可以同时将上面设置到的队列的消息都消费下来。
但是我们需要做不用的业务逻辑处理,那么只需要 根据消息来自的队列名进行区分处理即可,如:
import com.rabbitmq.client.channel;
import org.springframework.amqp.core.message;
import org.springframework.amqp.rabbit.listener.api.channelawaremessagelistener;
import org.springframework.stereotype.component;
import java.util.hashmap;
import java.util.map;
@component
public class myackreceiver implements channelawaremessagelistener {
@override
public void onmessage(message message, channel channel) throws exception {
long deliverytag = message.getmessageproperties().getdeliverytag();
try {
//因为传递消息的时候用的map传递,所以将map从message内取出需要做些处理
string msg = message.tostring();
string[] msgarray = msg.split("'");//可以点进message里面看源码,单引号直接的数据就是我们的map消息数据
map<string, string> msgmap = mapstringtomap(msgarray[1].trim(),3);
string messageid=msgmap.get("messageid");
string messagedata=msgmap.get("messagedata");
string createtime=msgmap.get("createtime");
if ("testdirectqueue".equals(message.getmessageproperties().getconsumerqueue())){
system.out.println("消费的消息来自的队列名为:"+message.getmessageproperties().getconsumerqueue());
system.out.println("消息成功消费到 messageid:"+messageid+" messagedata:"+messagedata+" createtime:"+createtime);
system.out.println("执行testdirectqueue中的消息的业务处理流程......");
}
if ("fanout.a".equals(message.getmessageproperties().getconsumerqueue())){
system.out.println("消费的消息来自的队列名为:"+message.getmessageproperties().getconsumerqueue());
system.out.println("消息成功消费到 messageid:"+messageid+" messagedata:"+messagedata+" createtime:"+createtime);
system.out.println("执行fanout.a中的消息的业务处理流程......");
}
channel.basicack(deliverytag, true);
// channel.basicreject(deliverytag, true);//为true会重新放回队列
} catch (exception e) {
channel.basicreject(deliverytag, false);
e.printstacktrace();
}
}
//{key=value,key=value,key=value} 格式转换成map
private map<string, string> mapstringtomap(string str,int ennum) {
str = str.substring(1, str.length() - 1);
string[] strs = str.split(",",ennum);
map<string, string> map = new hashmap<string, string>();
for (string string : strs) {
string key = string.split("=")[0].trim();
string value = string.split("=")[1];
map.put(key, value);
}
return map;
}
}
ok,这时候我们来分别往不同队列推送消息,看看效果:
调用接口/senddirectmessage
和/sendfanoutmessage
,
查看更详细的内容:
rabbitmq详解,用心看完这一篇就够了【重点】-csdn博客
发表评论