当前位置: 代码网 > 服务器>网络>网络协议 > 【RabbitMQ】交换机详解看这一篇就够了

【RabbitMQ】交换机详解看这一篇就够了

2024年07月28日 网络协议 我要评论
交换机(Exchange)是消息队列中的一个重要概念,用于实现消息的路由和分发。交换机接收发布到它的消息,并根据特定的规则将消息发送到一个或多个队列中。在消息队列系统中,交换机起到了消息的分发中心的作用。它接收生产者发送的消息,并根据预定义的路由规则将消息发送到符合条件的队列中,然后由消费者从队列中获取并处理消息。

请添加图片描述

目录

✨前言

什么是交换机

使用交换机的好处

一、交换机(exchange)的类型

1.直连交换机:direct exchange

2.主题交换机:topic exchange

3.扇形交换机:fanout exchange

4.首部交换机:headers exchange

5.默认交换机:default exchange

6.死信交换机:dead letter exchange

 二、案例演示:交换机的使用

2.1直连交换机(direct exchange)

2.2主题交换机(topic exchange)

2.3扇形交换机(fanout exchange)


✨前言

什么是交换机

在 rabbitmq 消息队列系统中,交换机(exchange)是用于接收生产者发送的消息并将其路由到相应的队列的组件。它是消息路由的核心组件之一。使用交换机将生产者的消息分发到队列中可以处理更为复杂的代码或者说可以更精准的发送到队列中。

有了交换机我们的消息不是直接发给队列的而是发送给交换机再通过特殊的条件找到符合条件的队列再由队列发送给消费者,但这中间还有些概念需要我们了解一下分别是路由键和绑定键

使用交换机的好处

刚刚提到使用交换机将消息发送到队列中比直接发送到队列中好,那么具体好在那里我这里举个例

  1. 路由控制:通过交换机,可以根据消息的路由键将消息路由到与之匹配的队列。这样,可以根据消息的属性或标签来定向分发消息,实现精确的消息路由控制。

  2. 消息过滤:交换机可以根据消息的路由键、消息头部属性等信息对消息进行过滤和筛选,将符合特定条件的消息发送到相应的队列。这样可以实现消息的订阅和过滤机制,灵活地处理不同类型的消息。

  3. 广播和多播:通过使用扇形交换机(fanout exchange),可以将消息广播到所有与之绑定的队列,实现消息的广播和多播机制,方便实现发布-订阅模式。

  4. 解耦和灵活性:通过将消息发送到交换机而不是直接发送到队列,生产者和消费者之间实现了解耦。生产者只需要将消息发送到指定的交换机,而不需要知道具体的队列。这样,可以灵活地增加、删除或修改队列,而不会对生产者产生影响。

  5. 可扩展性:使用交换机可以实现消息的分发和负载均衡机制。通过将消息发送到多个队列,可以实现横向扩展和并发处理,提高系统的吞吐量和性能。

如果你还不是很理解的话不妨看看这个案例:

假设我们正在构建一个电子商务网站,该网站有一个订单处理系统。在用户提交订单后,需要执行一系列异步任务,例如生成发货单、发送通知邮件、更新库存等。这些任务可能会消耗较长的时间,并且需要并行处理。

 

一、交换机(exchange)的类型

了解完交换机的概念,我们来认识一下交换机的类型

1.直连交换机:direct exchange

直连交换机是最简单的交换机类型,它将消息的路由键与绑定键进行精确匹配,当我们的路由键和绑定键一致的时候,将消息发送到与之完全匹配的队列。

 

 注意:直连交换机只能根据绑定键进行路由,无法实现更复杂的路由逻辑。这意味着在需要进行高级路由或消息过滤的情况下,直连交换机可能无法满足需求。如果我们需要一个消息发送到多个队列中需要在交换机上绑定多个路由键,也是非常的麻烦


2.主题交换机:topic exchange

主题交换机基于模式匹配的方式将消息路由到队列。它使用通配符来进行匹配,支持通配符符号 "*" 和 "#"。其中 "*" 表示匹配一个单词,"#" 表示匹配一个或多个单词。


3.扇形交换机:fanout exchange

扇形交换机将消息广播到所有与之绑定的队列。无论消息的路由键是什么,扇形交换机都会将消息发送到所有绑定的队列中。这种类型的交换机常用于实现发布-订阅模式,将消息广播给多个消费者。


4.首部交换机:headers exchange

首部交换机和扇形交换机一样都不要路由键,首交换机根据消息headers的属性进行匹配和路由。在消息发送时,可以指定一组键值对作为消息的头部属性,交换机会根据这些属性进行匹配。首部交换机提供了更灵活的匹配方式,但相对复杂度较高,通常使用较少。


5.默认交换机:default exchange

默认交换机是一个预定义的无名交换机,它会自动将消息发送到与之路由键名称相同的队列中。当生产者没有显式地指定交换机时,消息会被发送到默认交换机中。


6.死信交换机:dead letter exchange

死信交换机用于处理无法被消费者正确处理的消息。当消息在队列中变成死信(例如超过重试次数或队列已满),它将被发送到死信交换机,并根据死信交换机的绑定规则路由到指定的死信队列中进行进一步处理。

案例讲解 

生产者生产一条1分钟后超时的订单消息到正常交换机exchange-a中,消息匹配到队列queue-a,但一分钟后仍未消费。 消息会被投递到死信交换机dlxy-exchange中,并发送到死信队列中, 死信队列dlx-queue的消费者拿到消息后,根据消息去查询订单的状态,如果仍然是未支付状态,将订单状态更新为超时状态。

 

 二、案例演示:交换机的使用

编写代码之前先了解一下我们交换机的参数属性

 name交换机名称

type:交换机类型,direct、 topic、fanout、 headers

durability:是杏需要持久化,如果持久性,则rabbitmq重启后,交换机还存在

auto delete:当最后一个绑定到exchange上的队列删除后,自动删除该exchangeinternal:

internal:当前exchange是否用于rabbitmq内部使用,默认为falsearguments:

arguments:扩展参数,用于扩展amqp协议定制化使用

 

2.1直连交换机(direct exchange)

🧑‍🍳生产者

package org.example.produce.config;

import org.springframework.amqp.core.binding;
import org.springframework.amqp.core.bindingbuilder;
import org.springframework.amqp.core.directexchange;
import org.springframework.amqp.core.queue;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;

@configuration
@suppresswarnings("all")
public class rabbitconfig {

    /**
     * 定义队列
     * @return
     */
    @bean
    public queue directqueue(){
        return new queue("direct-queue");
    }

    /**
     * 自定义直连交换机
     * @return
     */
    @bean
    public directexchange directexchange(){
        return new directexchange("direct-exchange",true,false);
    }

    /**
     * 将队列与交换机进行绑定,并设置路由键
     * @return
     */
    @bean
    public binding binding(){
        return bindingbuilder.bind(directqueue())
                .to(directexchange())
                .with("direct_routing_key");
    }

}

🧑‍💼消费者

package org.example.produce.controller;
 
import org.springframework.amqp.rabbit.annotation.rabbithandler;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;
 
import java.util.map;
 
@component
@rabbitlistener(queues = {"direct-queue"})
public class directreceiver {
 
    @rabbithandler
    public void handler(map<string,object> json){
        system.out.println(json);
    }
}

👇👇controller

package org.example.produce.controller;

import org.springframework.amqp.core.amqptemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.restcontroller;

import java.util.hashmap;
import java.util.map;

@restcontroller
public class sender {
    @autowired
    private amqptemplate rabbittemplate;


    @requestmapping("/senddata")
    public string senddata() {
        map<string,object> data=new hashmap<>();
        data.put("msg","hello 我是直连交换机");
        rabbittemplate.convertandsend("direct-exchange","direct_routing_key", data);
        return "😎";
    }
}

🌷🌷效果展示:

 

 

2.2主题交换机(topic exchange)

🧑‍🍳生产者

package org.example.produce.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;

@configuration
@suppresswarnings("all")
public class rabbitconfig {
  

  //  定义路由规则
    public static string a_key = "*.orange.*" ;
    public static string b_key = "*.* rabbit";
    public static string c_key = "lazy.#";


    /**
     * 定义队列
     * @return
     */
    @bean
    public queue topicqueue(){
        return new queue("topic-queue",true);
    }


    /**
     * 定义主题交换机
     * @return
     */
    @bean
    public topicexchange topicexchange(){
        return new topicexchange("topic-exchange",true,false);
    }

    /**
     * 将队列与交换机进行绑定,并设置路由键
     * @return
     */
    @bean
    public binding bindinga(){
        return bindingbuilder.bind(topicqueue())
                .to(topicexchange())
                .with(a_key);
    }




}

🧑‍💼消费者

package org.example.produce.controller;
 
import org.springframework.amqp.rabbit.annotation.rabbithandler;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;
 
import java.util.map;

@component
@rabbitlistener(queues = {"topic-queue"})
public class topicreceiver {

    @rabbithandler
    public void handler(map<string,object> json){
        system.out.println(json);
    }

}

👇👇controller

package org.example.produce.controller;

import org.springframework.amqp.core.amqptemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.restcontroller;

import java.util.hashmap;
import java.util.map;

@restcontroller
public class sender {
    @autowired
    private amqptemplate rabbittemplate;


    @requestmapping("/sendtopic")
    public string sendtopic() {
        map<string,object> data=new hashmap<>();
        data.put("msg","你好!! 我是主题交换机");
        rabbittemplate.convertandsend("topic-exchange","aa.orange.bb", data);
        return "😎";
    }
}

🌷🌷效果展示:

 

 

2.3扇形交换机(fanout exchange)

🧑‍🍳生产者

package org.example.produce.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;

@configuration
@suppresswarnings("all")
public class rabbitconfig {
  

  @bean
    public queue queuex(){
        return new queue("queue-x");
    }
 
    @bean
    public queue queuey(){
        return new queue("queue-y");
    }
 
    @bean
    public queue queuez(){
        return new queue("queue-z");
    }
 
    /**
     * 定义扇形交换机,与路由键无关
     * @return
     */
    @bean
    public fanoutexchange fanoutexchange(){
        return new fanoutexchange("fanout-exchange",true,false);
    }
 
    @bean
    public binding bindingx(){
        return bindingbuilder.bind(queuex())
                .to(fanoutexchange());
    }
 
    @bean
    public binding bindingy(){
        return bindingbuilder.bind(queuey())
                .to(fanoutexchange());
    }
 
    @bean
    public binding bindingz(){
        return bindingbuilder.bind(queuez())
                .to(fanoutexchange());
    }




}

🧑‍💼消费者

package org.example.produce.controller;
 
import org.springframework.amqp.rabbit.annotation.rabbithandler;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;
 
import java.util.map;
 
@component
public class fanoutreceiver {
 
    @rabbitlistener(queues = {"queue-x"})
    @rabbithandler
    public void handlery(map<string,object> json){
        system.out.println("已接受到队列queue-x传递过来的消息:"+json);
    }
 
    @rabbitlistener(queues = {"queue-y"})
    @rabbithandler
    public void handlerx(map<string,object> json){
        system.out.println("已接受到队列queue-y传递过来的消息:"+json);
    }
 
    @rabbitlistener(queues = {"queue-z"})
    @rabbithandler
    public void handlerz(map<string,object> json){
        system.out.println("已接受到队列queue-z传递过来的消息:"+json);
    }
}

👇👇controller

package org.example.produce.controller;

import org.springframework.amqp.core.amqptemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.restcontroller;

import java.util.hashmap;
import java.util.map;

@restcontroller
public class sender {
    @autowired
    private amqptemplate rabbittemplate;


    @requestmapping("/sendfanout")
    public string sendfanout() {
        map<string,object> data=new hashmap<>();
        data.put("msg","我是扇形交换机,广播通知");
        rabbittemplate.convertandsend("fanout-exchange",null, data);
        return "😎";
    }
}

🌷🌷效果展示:

 

请添加图片描述

到这里我的分享就结束了,欢迎到评论区探讨交流!!

💖如果觉得有用的话还请点个赞吧 💖

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com