当前位置: 代码网 > 手机>品牌>华为 > RabbitMQ

RabbitMQ

2024年07月31日 华为 我要评论
​​​​​​在消费者项目里,新建。

介绍

 特点

关键词解释

如和保证新消息的可靠性(消息不丢失) 

 交换机的类型

direct exchange(直连交换机)
直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的,步骤如下:

1、将一个队列绑定到某个交换机上,同时赋予该绑定一个路由键(routing key)
2、当一个携带着路由值为r的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为r的队列。

fanout exchange(扇型交换机)
扇型交换机(funout exchange)将消息路由给绑定到它身上的所有队列。不同于直连交换机,路由键在此类型上不启任务作用。如果n个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的发送给这所有的n个队列

topic exchange(主题交换机)
主题交换机(topic exchanges)中,队列通过路由键绑定到交换机上,然后,交换机根据消息里的路由值,将消息路由给一个或多个绑定队列。

扇型交换机和主题交换机异同:

  • 对于扇型交换机路由键是没有意义的,只要有消息,它都发送到它绑定的所有队列上
  • 对于主题交换机,路由规则由路由键决定,只有满足路由键的规则,消息才可以路由到对应的队列上

 路由规则

* (星号) 用来表示一个单词 (必须出现的)
# (井号) 用来表示任意数量(零个或多个)单词

通配的绑定键是跟队列进行绑定的,举个小例子
队列q1 绑定键为*.tt.* 队列q2绑定键为 tt.#
如果一条消息携带的路由键为 a.tt.b,那么队列q1将会收到;
如果一条消息携带的路由键为tt.aa.bb,那么队列q2将会收到;

主题交换机的强大之处

当一个队列的绑定键为 #(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。

当* (星号) 和#(井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能

死信队列?

延迟队列?

生产者如何将消息可靠投递到mq?

 mq如何将消息可靠投递到消费者?

如何保证rabbitmq消息队列的高可用? 

如何避免消息重复投递或重复消费?

在消息生产时,mq内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重的依据(消息投递失败并重传),避免重复的消息进入队列;
在消息消费时,要求消息体中必须要有一个 bizid(对于同一业务全局唯一,如支付id、订单id、帖子id 等)作为去重的依据,避免同一条消息被重复消费。

rabbitmq如何保证消息的顺序性

 rabbitmq消息重试机制

查看更多:

 mq——rabbit面试问题_mq面试必会6题经典-csdn博客

消息确认回调机制

rabbitmq-provider项目的application.yml文件上,加上消息确认的配置项后

server:
  port: 8021
spring:
  #给项目来个名字
  application:
    name: rabbitmq-provider
  #配置rabbitmq 服务器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: root
    password: root
    #虚拟host 可以不设置,使用server默认host
    virtual-host: jcccchost
    #消息确认配置项
 
    #确认消息已发送到交换机(exchange)
    publisher-confirms: true
    #确认消息已发送到队列(queue)
    publisher-returns: true

然后是配置相关的消息确认回调函数,rabbitconfig.java

import org.springframework.amqp.core.message;
import org.springframework.amqp.rabbit.connection.connectionfactory;
import org.springframework.amqp.rabbit.connection.correlationdata;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;

@configuration
public class rabbitconfig {
 
    @bean
    public rabbittemplate createrabbittemplate(connectionfactory connectionfactory){
        rabbittemplate rabbittemplate = new rabbittemplate();
        rabbittemplate.setconnectionfactory(connectionfactory);
        //设置开启mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbittemplate.setmandatory(true);
 
        rabbittemplate.setconfirmcallback(new rabbittemplate.confirmcallback() {
            @override
            public void confirm(correlationdata correlationdata, boolean ack, string cause) {
                system.out.println("confirmcallback:     "+"相关数据:"+correlationdata);
                system.out.println("confirmcallback:     "+"确认情况:"+ack);
                system.out.println("confirmcallback:     "+"原因:"+cause);
            }
        });
 
        rabbittemplate.setreturncallback(new rabbittemplate.returncallback() {
            @override
            public void returnedmessage(message message, int replycode, string replytext, string exchange, string routingkey) {
                system.out.println("returncallback:     "+"消息:"+message);
                system.out.println("returncallback:     "+"回应码:"+replycode);
                system.out.println("returncallback:     "+"回应信息:"+replytext);
                system.out.println("returncallback:     "+"交换机:"+exchange);
                system.out.println("returncallback:     "+"路由键:"+routingkey);
            }
        });
        return rabbittemplate;
    }
}

推送消息存在四种情况:

1、消息推送到server,但是在server里找不到交换机--触发的是 confirmcallback 回调函数。
2、消息推送到server,找到交换机了,但是没找到队列--触发的是 confirmcallbackretruncallback两个回调函数
3、消息推送到sever,交换机和队列啥都没找到--触发的是confirmcallback回调函数
4、消息推送成功--触发的是confirmcallback回调函数。

消费者消息确认机制

自动确认, 这也是默认的消息确认情况。 acknowledgemode.none

手动确认 , 这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。

看了上面这么多介绍,接下来我们一起配置下,看看一般的消息接收手动确认是怎么样的。
​​​​​​
在消费者项目里,新建messagelistenerconfig.java上添加代码相关的配置代码:

import com.elegant.rabbitmqconsumer.receiver.myackreceiver;
import org.springframework.amqp.core.acknowledgemode;
import org.springframework.amqp.core.queue;
import org.springframework.amqp.rabbit.connection.cachingconnectionfactory;
import org.springframework.amqp.rabbit.listener.simplemessagelistenercontainer;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;

@configuration
public class messagelistenerconfig {
 
    @autowired
    private cachingconnectionfactory connectionfactory;
    @autowired
    private myackreceiver myackreceiver;//消息接收处理类
 
    @bean
    public simplemessagelistenercontainer simplemessagelistenercontainer() {
        simplemessagelistenercontainer container = new simplemessagelistenercontainer(connectionfactory);
        container.setconcurrentconsumers(1);
        container.setmaxconcurrentconsumers(1);
        // rabbitmq默认是自动确认,这里改为手动确认消息
        container.setacknowledgemode(acknowledgemode.manual); 
        //设置一个队列
        container.setqueuenames("testdirectqueue");
        //如果同时设置多个如下: 前提是队列都是必须已经创建存在的
        //  container.setqueuenames("testdirectqueue","testdirectqueue2","testdirectqueue3");
 
        //另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addqueues
        //container.setqueues(new queue("testdirectqueue",true));
        //container.addqueues(new queue("testdirectqueue2",true));
        //container.addqueues(new queue("testdirectqueue3",true));
        container.setmessagelistener(myackreceiver);
 
        return container;
    }
}

对应的手动确认消息监听类,myackreceiver.java(手动确认模式需要实现 channelawaremessagelistener):
//之前的相关监听器可以先注释掉,以免造成多个同类型监听器都监听同一个队列。
//这里的获取消息转换,只作参考,如果报数组越界可以自己根据格式去调整。

import com.rabbitmq.client.channel;
import org.springframework.amqp.core.message;
import org.springframework.amqp.rabbit.listener.api.channelawaremessagelistener;
import org.springframework.stereotype.component;
import java.util.hashmap;
import java.util.map;
 
@component
 
public class myackreceiver implements channelawaremessagelistener {
 
    @override
    public void onmessage(message message, channel channel) throws exception {
        long deliverytag = message.getmessageproperties().getdeliverytag();
        try {
            //因为传递消息的时候用的map传递,所以将map从message内取出需要做些处理
            string msg = message.tostring();
            string[] msgarray = msg.split("'");//可以点进message里面看源码,单引号直接的数据就是我们的map消息数据
            map<string, string> msgmap = mapstringtomap(msgarray[1].trim(),3);
            string messageid=msgmap.get("messageid");
            string messagedata=msgmap.get("messagedata");
            string createtime=msgmap.get("createtime");
            system.out.println("  myackreceiver  messageid:"+messageid+"  messagedata:"+messagedata+"  createtime:"+createtime);
            system.out.println("消费的主题消息来自:"+message.getmessageproperties().getconsumerqueue());
            channel.basicack(deliverytag, true); //第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
//			channel.basicreject(deliverytag, true);//第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝
        } catch (exception e) {
            channel.basicreject(deliverytag, false);
            e.printstacktrace();
        }
    }
 
     //{key=value,key=value,key=value} 格式转换成map
    private map<string, string> mapstringtomap(string str,int entrynum ) {
        str = str.substring(1, str.length() - 1);
        string[] strs = str.split(",",entrynum);
        map<string, string> map = new hashmap<string, string>();
        for (string string : strs) {
            string key = string.split("=")[0].trim();
            string value = string.split("=")[1];
            map.put(key, value);
        }
        return map;
    }
}

这时,先调用接口/senddirectmessage, 给直连交换机testdirectexchange 的队列testdirectqueue 推送一条消息,可以看到监听器正常消费了下来:

到这里,我们其实已经掌握了怎么去使用消息消费的手动确认了。
但是这个场景往往不够! 因为很多伙伴之前给我评论反应,他们需要这个消费者项目里面,监听的好几个队列都想变成手动确认模式,而且处理的消息业务逻辑不一样。

没有问题,接下来看代码

场景: 除了直连交换机的队列testdirectqueue需要变成手动确认以外,我们还需要将一个其他的队列

或者多个队列也变成手动确认,而且不同队列实现不同的业务处理。

那么我们需要做的第一步,往simplemessagelistenercontainer里添加多个队列:

 

然后我们的手动确认消息监听类,myackreceiver.java 就可以同时将上面设置到的队列的消息都消费下来。

但是我们需要做不用的业务逻辑处理,那么只需要 根据消息来自的队列名进行区分处理即可,如:

import com.rabbitmq.client.channel;
import org.springframework.amqp.core.message;
import org.springframework.amqp.rabbit.listener.api.channelawaremessagelistener;
import org.springframework.stereotype.component;
import java.util.hashmap;
import java.util.map;
 
@component
public class myackreceiver implements channelawaremessagelistener {
 
    @override
    public void onmessage(message message, channel channel) throws exception {
        long deliverytag = message.getmessageproperties().getdeliverytag();
        try {
            //因为传递消息的时候用的map传递,所以将map从message内取出需要做些处理
            string msg = message.tostring();
            string[] msgarray = msg.split("'");//可以点进message里面看源码,单引号直接的数据就是我们的map消息数据
            map<string, string> msgmap = mapstringtomap(msgarray[1].trim(),3);
            string messageid=msgmap.get("messageid");
            string messagedata=msgmap.get("messagedata");
            string createtime=msgmap.get("createtime");
            
            if ("testdirectqueue".equals(message.getmessageproperties().getconsumerqueue())){
                system.out.println("消费的消息来自的队列名为:"+message.getmessageproperties().getconsumerqueue());
                system.out.println("消息成功消费到  messageid:"+messageid+"  messagedata:"+messagedata+"  createtime:"+createtime);
                system.out.println("执行testdirectqueue中的消息的业务处理流程......");  
            }
 
            if ("fanout.a".equals(message.getmessageproperties().getconsumerqueue())){
                system.out.println("消费的消息来自的队列名为:"+message.getmessageproperties().getconsumerqueue());
                system.out.println("消息成功消费到  messageid:"+messageid+"  messagedata:"+messagedata+"  createtime:"+createtime);
                system.out.println("执行fanout.a中的消息的业务处理流程......");
 
            }
            
            channel.basicack(deliverytag, true);
//			channel.basicreject(deliverytag, true);//为true会重新放回队列
        } catch (exception e) {
            channel.basicreject(deliverytag, false);
            e.printstacktrace();
        }
    }
 
    //{key=value,key=value,key=value} 格式转换成map
    private map<string, string> mapstringtomap(string str,int ennum) {
        str = str.substring(1, str.length() - 1);
        string[] strs = str.split(",",ennum);
        map<string, string> map = new hashmap<string, string>();
        for (string string : strs) {
            string key = string.split("=")[0].trim();
            string value = string.split("=")[1];
            map.put(key, value);
        }
        return map;
    }
}

ok,这时候我们来分别往不同队列推送消息,看看效果:

调用接口/senddirectmessage 和/sendfanoutmessage

 查看更详细的内容:

rabbitmq详解,用心看完这一篇就够了【重点】-csdn博客

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com