当前位置: 代码网 > 服务器>软件设计>开源 > 【RabbitMQ】RabbitMQ配置与交换机学习

【RabbitMQ】RabbitMQ配置与交换机学习

2024年07月28日 开源 我要评论
RabbitMQ是一个开源的消息代理软件,它实现了高级消息队列协议(AMQP)。RabbitMQ支持多种消息传递协议,具有高可靠性、高可用性和高性能等特点。它允许应用程序通过消息队列进行异步通信,从而实现解耦和负载均衡。RabbitMQ的核心概念包括交换机(Exchange)、队列(Queue)和绑定(Binding),它们共同构成了消息的路由和传递机制。RabbitMQ的架构如图:publisher:生产者,也就是发送消息的一方consumer:消费者,也就是消费消息的一方queue。

文章目录

    • 简介
    • 安装和部署
        • 1. 安装rabbitmq
        • 2.创建virtual-host
        • 3. 添加依赖
        • 4.修改配置文件
    • workqueues模型
        • 1.编写消息发送测试类
        • 2.编写消息接收(监听)类
        • 3. 实现能者多劳
    • 交换机
      • fanout交换机
        • 1.消息发送
        • 2.消息监听
      • direct交换机
        • 1.消息发送
        • 2.消息接收
      • topic交换机
        • 1.消息发送
        • 2.消息接收
    • 声明队列和交换机
        • 声明队列
        • 声明交换机
        • 绑定队列和交换机
          • 1.fanout示例
          • 2. direct示例
          • 3.基于注解的方式声明队列和交换机
        • 消息转换器
    • 总结

简介

rabbitmq是一个开源的消息代理软件,它实现了高级消息队列协议(amqp)。rabbitmq支持多种消息传递协议,具有高可靠性、高可用性和高性能等特点。它允许应用程序通过消息队列进行异步通信,从而实现解耦和负载均衡。rabbitmq的核心概念包括交换机(exchange)、队列(queue)和绑定(binding),它们共同构成了消息的路由和传递机制。

rabbitmq的架构如图:

47ab6076-c1b1-45e1-b8c4-f6c41a601b21

其中包含几个概念:

  • publisher:生产者,也就是发送消息的一方
  • consumer:消费者,也就是消费消息的一方
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

安装和部署

这里以centos7为例:

1. 安装rabbitmq
docker run \
 -e rabbitmq_default_user=shijun \
 -e rabbitmq_default_pass=123321 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 --network hm-net\
 -d \
 rabbitmq:3.8-management

安装完成后访问:http://虚拟机ip地址:15672

输入刚才的账号密码:shijun 123321,就能进入控制台界面。

image-20240608222043550

2.创建virtual-host

按顺序点击,填入“name”和“descrption”,然后点击“add virtual host”按钮:

image-20240609151722019

然后在右上角切换到创建的virtual-host:

image-20240609152323104

3. 添加依赖
        <dependency>
            <groupid>org.projectlombok</groupid>
            <artifactid>lombok</artifactid>
        </dependency>
        <!--amqp依赖,包含rabbitmq-->
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-amqp</artifactid>
        </dependency>
        <!--单元测试-->
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-test</artifactid>
        </dependency>
4.修改配置文件
logging:
  pattern:
    dateformat: mm-dd hh:mm:ss:sss
spring:
  rabbitmq:
    host: 192.168.56.101 # 你的虚拟机ip
    port: 5672 # 端口
    virtual-host: /mq-demo # 虚拟主机
    username: shijun # 用户名
    password: 123321 # 密码

workqueues模型

image-20240609154853698

在控制台创建一个work.queue队列:

image-20240609155329959

1.编写消息发送测试类
import static org.junit.jupiter.api.assertions.*;
@springboottest
class springamqptest {

    @autowired
    private rabbittemplate rabbittemplate;

    @test
    public void testworkqueue() throws interruptedexception {
        // 队列名称
        string queuename = "work.queue";
        // 消息
        string message = "hello, message_";
        for (int i = 0; i < 50; i++) {
            // 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
            rabbittemplate.convertandsend(queuename, message + i);
            thread.sleep(20);
        }
    }
}
2.编写消息接收(监听)类
@component
public class springrabbitlistener {
    
    /**
     * 监听名为"work.queue"的rabbitmq队列,接收并处理来自队列的消息。
     * 通过延迟执行模拟消息处理时间
     * 
     * @param msg 从队列中接收到的消息内容,以字符串形式提供
     * @throws interruptedexception 如果线程在睡眠期间被中断,则抛出此异常
     */
    @rabbitlistener(queues = "work.queue")
    public void listenworkqueue1(string msg) throws interruptedexception {
        // 输出接收到消息的时间,以便跟踪消息处理的时间点
        system.out.println("消费者1接收到消息:【" + msg + "】" + localtime.now());
        // 模拟消息处理时间,让线程睡眠20毫秒
        thread.sleep(20);
    }


    @rabbitlistener(queues = "work.queue")
    public void listenworkqueue2(string msg) throws interruptedexception {
        system.err.println("消费者2........接收到消息:【" + msg + "】" + localtime.now());
        thread.sleep(200);
    }
}

运行后查看结果:

image-20240609163135799
3. 实现能者多劳

修改配置文件:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

再次运行,查看结果:

image-20240609164615309

交换机

交换机有不同的类型,常见的有以下几种:

  • fanout exchange(扇出交换机):将消息广播到所有绑定的队列,不考虑路由键。
  • direct exchange(直连交换机):根据消息的路由键精确匹配队列。
  • topic exchange(主题交换机):根据路由键的模式匹配队列。
  • headers exchange(头交换机):根据消息的头部信息匹配队列。

fanout交换机

image-20240610112130389

  • 可以有多个队列
  • 每个队列都要绑定到exchange(交换机)
  • 生产者发送的消息,只能发送到交换机
  • 交换机把消息发送给绑定过的所有队列
  • 订阅队列的消费者都能拿到消息
  1. 创建fanout交换机

    image-20240609170548383
  2. 创建两个队列fanout.queue1fanout.queue2

    image-20240609170655932
  3. 点击刚刚创建的交换机,进入:

    image-20240609170907536
  4. 将刚才创建的两个队列绑定到交换机,

image-20240609171002978
1.消息发送

在springamqptest类中添加测试方法:

@test
public void testfanoutexchange() {
    // 交换机名称
    string exchangename = "demo.fanout";
    // 消息
    string message = "hello, everyone!";
    rabbittemplate.convertandsend(exchangename, "", message);
}
2.消息监听

springrabbitlistener中添加两个方法:

@rabbitlistener(queues = "fanout.queue1")
public void listenfanoutqueue1(string msg) {
    system.out.println("消费者1接收到fanout消息:【" + msg + "】");
}

@rabbitlistener(queues = "fanout.queue2")
public void listenfanoutqueue2(string msg) {
    system.out.println("消费者2接收到fanout消息:【" + msg + "】");
}

运行代码,查看结果:

image-20240609173012677

交换机的作用是什么?

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • fanoutexchange的会将消息路由到每个绑定的队列

direct交换机

image-20240610112328443

在direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个routingkey(路由key)
  • 消息的发送方在 向 exchange发送消息时,也必须指定消息的 routingkey
  • exchange不再把消息交给每一个绑定的队列,而是根据消息的routing key进行判断,只有队列的routingkey与消息的 routing key完全一致,才会接收到消息
  1. 创建direct.queue1direct.queue2两个队列,之后创建一个direct类型的交换机:

    image-20240609174620595
  2. 绑定队列到交换机,最终结果如图所示:

    image-20240609184743239

1.消息发送

springamqptest类中添加测试方法:

@test
public void testsenddirectexchange1() {
    // 交换机名称
    string exchangename = "demo.direct";
    // 消息
    string message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
    // 发送消息
    rabbittemplate.convertandsend(exchangename, "red", message);
}

@test
public void testsenddirectexchange2() {
    // 交换机名称
    string exchangename = "demo.direct";
    // 消息
    string message = "最新报道,哥斯拉是居民自治巨型气球,虚惊一场!";
    // 发送消息
    rabbittemplate.convertandsend(exchangename, "blue", message);
}
2.消息接收

springrabbitlistener中添加方法:

@rabbitlistener(queues = "direct.queue1")
public void listendirectqueue1(string msg) {
    system.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@rabbitlistener(queues = "direct.queue2")
public void listendirectqueue2(string msg) {
    system.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

运行测试类中的testsenddirectexchange1,查看结果:

image-20240609185400537

运行测试类中的testsenddirectexchange2,查看结果:

image-20240609185710182

topic交换机

  • 通配符

    :在绑定键中,可以使用两个特殊字符来实现模式匹配:

    • *:匹配一个单词。
    • #:匹配零个或多个单词。
image-20240610112445760

如图所示,假如此时publisher发送的消息使用的routingkey共有四种:

  • china.news代表有中国的新闻消息;
  • china.weather 代表中国的天气消息;
  • japan.news 则代表日本新闻
  • japan.weather 代表日本的天气消息;

解释:

  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:
    • china.news
    • china.weather
  • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:
    • china.news
    • japan.news

按照之前的流程创建topic交换机和队列并进行绑定,最终结果如下:

1.消息发送

springamqptest类中添加测试方法:

@test
public void testsendtopicexchange1() {
    // 交换机名称
    string exchangename = "demo.topic";
    // 消息
    string message = "喜报!孙悟空大战哥斯拉,胜!";
    // 发送消息
    rabbittemplate.convertandsend(exchangename, "china.news", message);
}

@test
public void testsendtopicexchange1() {
    // 交换机名称
    string exchangename = "demo.topic";
    // 消息
    string message = "喜报!孙悟空大战哥斯拉,胜!";
    // 发送消息
    rabbittemplate.convertandsend(exchangename, "china.weather", message);
}
2.消息接收

springrabbitlistener中添加方法:

@rabbitlistener(queues = "topic.queue1")
public void listentopicqueue1(string msg){
    system.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

@rabbitlistener(queues = "topic.queue2")
public void listentopicqueue2(string msg){
    system.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

运行测试类中的testsendtopicexchange1后观察结果:

image-20240609204043494

运行测试类中的testsendtopicexchange2后观察结果:

image-20240609204427086

声明队列和交换机

声明队列

队列是rabbitmq中用于存储消息的组件。spring amqp通过queue类来声明队列。队列有以下几个重要属性:

  • name:队列名称。
  • durable:是否持久化。持久化队列在rabbitmq重启后仍然存在,信息持久到磁盘。
  • exclusive:是否排他。排他队列只能被创建它的连接使用,并且在连接断开时自动删除。
  • autodelete:是否自动删除。当最后一个消费者断开连接后,自动删除队列。

比如:

import org.springframework.amqp.core.queue;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;

@configuration
public class rabbitmqqueueconfig {

    @bean
    public queue myqueue() {
        return new queue("simple.queue");
    }
}
声明交换机

使用exchangebuilder声明交换机,exchangebuilder类提供了多种方法来配置交换机的属性。以下是一些常用的方法:

  • durable():声明持久化交换机。
  • autodelete():声明自动删除交换机。
  • withargument():添加交换机的自定义参数。
import org.springframework.amqp.core.directexchange;
import org.springframework.amqp.core.exchangebuilder;
import org.springframework.amqp.core.fanoutexchange;
import org.springframework.amqp.core.topicexchange;
import org.springframework.amqp.core.headersexchange;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;

@configuration
public class rabbitmqexchangeconfig {

    @bean
    public directexchange directexchange() {
        return exchangebuilder.directexchange("direct.exchange").durable(true).build();
    }

    @bean
    public fanoutexchange fanoutexchange() {
        return exchangebuilder.fanoutexchange("fanout.exchange").durable(true).build();
    }

    @bean
    public topicexchange topicexchange() {
        return exchangebuilder.topicexchange("topic.exchange").durable(true).build();
    }

}
绑定队列和交换机

bindingbuilder类提供了一些静态方法来创建绑定关系。常用的方法包括:

  • bind():绑定队列到交换机。
  • to():指定交换机。
  • with():指定路由键(用于直连交换机和主题交换机)。
  • where():指定头部信息(用于头交换机)。
1.fanout示例
import org.springframework.amqp.core.binding;
import org.springframework.amqp.core.bindingbuilder;
import org.springframework.amqp.core.fanoutexchange;
import org.springframework.amqp.core.queue;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;

@configuration
public class fanoutconfig {
    /**
     * 声明交换机
     * @return fanout类型交换机
     */
    @bean
    public fanoutexchange fanoutexchange(){
        return new fanoutexchange("demo.fanout");
    }

    /**
     * 第1个队列
     */
    @bean
    public queue fanoutqueue1(){
        return new queue("fanout.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @bean
    public binding bindingqueue1(queue fanoutqueue1, fanoutexchange fanoutexchange){
        return bindingbuilder.bind(fanoutqueue1).to(fanoutexchange);
    }

    /**
     * 第2个队列
     */
    @bean
    public queue fanoutqueue2(){
        return new queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @bean
    public binding bindingqueue2(queue fanoutqueue2, fanoutexchange fanoutexchange){
        return bindingbuilder.bind(fanoutqueue2).to(fanoutexchange);
    }
}
2. direct示例
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;

@configuration
public class directconfig {

    /**
     * 声明交换机
     * @return direct类型交换机
     */
    @bean
    public directexchange directexchange(){
        return exchangebuilder.directexchange("direct.exchange").build();
    }

    /**
     * 第1个队列
     */
    @bean
    public queue directqueue1(){
        return new queue("direct.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @bean
    public binding bindingqueue1withred(queue directqueue1, directexchange directexchange){
        return bindingbuilder.bind(directqueue1).to(directexchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @bean
    public binding bindingqueue1withblue(queue directqueue1, directexchange directexchange){
        return bindingbuilder.bind(directqueue1).to(directexchange).with("blue");
    }

    /**
     * 第2个队列
     */
    @bean
    public queue directqueue2(){
        return new queue("direct.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @bean
    public binding bindingqueue2withred(queue directqueue2, directexchange directexchange){
        return bindingbuilder.bind(directqueue2).to(directexchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @bean
    public binding bindingqueue2withyellow(queue directqueue2, directexchange directexchange){
        return bindingbuilder.bind(directqueue2).to(directexchange).with("yellow");
    }
}
3.基于注解的方式声明队列和交换机

修改springrabbitlistener类:


@component
public class springrabbitlistener {

    // .......
    
    @rabbitlistener(bindings = @queuebinding(
            value = @queue(name = "direct.queue1"),
            exchange = @exchange(name = "demo.direct", type = exchangetypes.direct),
            key = {"red", "blue"}
    ))
    public void listendirectqueue1(string msg){
        system.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
    }

    @rabbitlistener(bindings = @queuebinding(
            value = @queue(name = "direct.queue2"),
            exchange = @exchange(name = "demo.direct", type = exchangetypes.direct),
            key = {"red", "yellow"}
    ))
    public void listendirectqueue2(string msg){
        system.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
    }
    
    @rabbitlistener(bindings = @queuebinding(
            value = @queue(name = "topic.queue1"),
            exchange = @exchange(name = "demo.topic", type = exchangetypes.topic),
            key = "china.#"
    ))
    public void listentopicqueue1(string msg){
        system.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
    }

    @rabbitlistener(bindings = @queuebinding(
            value = @queue(name = "topic.queue2"),
            exchange = @exchange(name = "demo.topic", type = exchangetypes.topic),
            key = "#.news"
    ))
    public void listentopicqueue2(string msg){
        system.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
    }
    
    // .......

}

删除交换机和队列后再次运行会发现又重新出现:

image-20240610105323455
消息转换器
  1. 引入jackson依赖:
        <dependency>
            <groupid>com.fasterxml.jackson.dataformat</groupid>
            <artifactid>jackson-dataformat-xml</artifactid>
        </dependency>
  1. 配置消息转换器

    publisherconsumer两个服务的启动类中添加一个bean即可:

@bean
public messageconverter messageconverter(){
    // 1.定义消息转换器
    jackson2jsonmessageconverter jackson2jsonmessageconverter = new jackson2jsonmessageconverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于id判断是否是重复消息
    jackson2jsonmessageconverter.setcreatemessageids(true);
    return jackson2jsonmessageconverter;
}

测试:

fanoutconfig类中声明队列:

    @bean
    public queue objectqueue() {
        return new queue("object.queue");
    }

springamqptest类中添加:

@test
public void testsendmap() throws interruptedexception {
    // 准备消息
    map<string,object> msg = new hashmap<>();
    msg.put("name", "柳岩");
    msg.put("age", 21);
    // 发送消息
    rabbittemplate.convertandsend("object.queue", msg);
}

springrabbitlistener类中添加:

@rabbitlistener(queues = "object.queue")
public void listensimplequeuemessage(map<string, object> msg) throws interruptedexception {
    system.out.println("消费者接收到object.queue消息:【" + msg + "】");
}

运行测试类查看结果:

image-20240610110529216

总结

本文较为详细的记录了rabbitmq的安装配置以及交换机学习,希望本文对大家学习rabbitmq有所帮助。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com