当前位置: 代码网 > 科技>操作系统>Windows > RabbitMQ之交换机

RabbitMQ之交换机

2024年07月31日 Windows 我要评论
在RabbitMQ工作流程有一项叫在中生产者发送的信息不会直接投递到队列中,而是先将消息投递到交换机中,在由交换机路由到一个或多个队列中。

目录

前言

exchange(交换机)的类型与应用

        - 交换机的属性

1. 直连交换机:direct exchange

2. 主题交换机:topic exchange

3. 扇形交换机:fanout exchange

4. 默认交换机(直连)


前言

        在讲交换机之前我们需要了解一些概念,在rabbitmq工作流程有一项叫exchange(交换机:消息的分发中心),它的作用是将生产者发送的消息转发到具体的队列,队列再将消息以推送或者拉取方式给消费者进行消费。

exchange(交换机)的类型与应用

- 交换机的属性

除交换机类型外,在声明交换机时还可以附带许多其他的属性,其中最重要的几个分别是:

  • type:交换机名称
  • durability:是否持久化。如果持久性,则rabbitmq重启后,交换机还存在
  • auto-delete:当所有与之绑定的消息队列都完成了对此交换机的使用后,是否删掉它
  • internal:当前exchange是否用于rabbitmq内部使用,默认fasle
  • arguments:扩展参数

1. 直连交换机:direct exchange

  • 这是最简单的一种交换机类型。
  • 当一个队列与交换机绑定时,需要指定一个路由键(routingkey),只有当消息的路由键与该队列绑定时指定的绑定键(bindingkey)完全匹配时,消息才会被路由到该队列。

  • 生产者
package com.ycxw.publisher.demos;

import org.springframework.amqp.core.binding;
import org.springframework.amqp.core.bindingbuilder;
import org.springframework.amqp.core.directexchange;
import org.springframework.amqp.core.queue;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;

//定义队列
@configuration
@suppresswarnings("all")
public class rabbitconfig {

    /**
     * 定义队列 q1
     * @return
     */
    @bean
    public queue directq1() {
        return new queue("direct-q1");
    }

    /**
     * 定义队列 q2
     * @return
     */
    @bean
    public queue directq2() {
        return new queue("direct-q2");
    }

    /**
     * 自定义直连交换机
     * @return
     */
    @bean
    public directexchange directexchange() {
        return new directexchange("direct-exchange", true, false);
    }

    /**
     * 将队列 q1与交换机进行绑定,并设置路由键
     * @return
     */
    @bean
    public binding bindingq1() {
        return bindingbuilder.bind(directq1())
                .to(directexchange())
                .with("direct_orange");
    }

    /**
     * 将队列 q2与交换机进行绑定,并设置路由键
     * @return
     */
    @bean
    public binding bindingq2() {
        return bindingbuilder.bind(directq2())
                .to(directexchange())
                .with("direct_black");
    }

}
package com.ycxw.publisher.demos;

import com.fasterxml.jackson.core.jsonprocessingexception;
import com.fasterxml.jackson.databind.objectmapper;
import org.springframework.amqp.core.amqptemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.restcontroller;

/**
 * 模拟发送请求
 */
@restcontroller
public class sender {
    @autowired
    private amqptemplate rabbittemplate;

    @requestmapping("/send1")
    public string sendfirst() {
        /*向消息队列发送消息 converandsend(交换机,路由键,发送的信息)*/
        rabbittemplate.convertandsend("direct-exchange", "direct_orange", "我是q1");
        return "🫶";
    }

    @requestmapping("/send2")
    public string sendsecond() throws jsonprocessingexception {
        rabbittemplate.convertandsend("direct-exchange", "direct_black", "我是q2");
        return "🫶";
    }
}
  •  消费者接受信息
package com.ycxw.consumer.demos;

import org.springframework.amqp.rabbit.annotation.rabbithandler;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;

@component
@rabbitlistener(queues = {"direct-q1"})
public class directreceiver {

    @rabbithandler
    public void handler(string msg) {
        system.out.println(msg);
    }
}
package com.ycxw.consumer.demos;

import org.springframework.amqp.rabbit.annotation.rabbithandler;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;

@component
@rabbitlistener(queues = {"direct-q2"})
public class directreceiver2 {

    @rabbithandler
    public void handler(string msg) {
        system.out.println(msg);
    }
}

  • 测试 

q1队列:

q2队列:

2. 主题交换机:topic exchange

     主题交换机的routing_key需要有一定的规则,交换机和队列的binding_key需要采用*.#.*.....的格式,每个部分用 分开,其中

  • * 表示一个单词 
  • # 表示任意数量(零个或多个)单词。

主题交换机会根据这个规则将数据发送到对应的(多个)队列上。 

  • 生产者
    /**
     * 定义路由键规则
     */
    public static string a_key ="*.orange.*";
    public static string b_key ="*.*.black";
    public static string c_key ="pink.#";

    /**
     * 定义队列 q1
     * @return
     */
    @bean
    public queue topicq1() {
        return new queue("topic-q1");
    }

    /**
     * 定义队列 q2
     * @return
     */
    @bean
    public queue topicq2() {
        return new queue("topic-q2");
    }
    /**
     * 自定义主题交换机
     * @return
     */
    @bean
    public topicexchange topicexchange() {
        return new topicexchange("topic-exchange", true, false);
    }

    @bean
    public binding topic_q1(){
        return bindingbuilder.bind(topicq1())
                .to(topicexchange())
                .with(a_key);
    }

    @bean
    public binding topic_q2(){
        return bindingbuilder.bind(topicq2())
                .to(topicexchange())
                .with(b_key);
    }

    @bean
    public binding topicq2(){
        return bindingbuilder.bind(topicq2())
                .to(topicexchange())
                .with(c_key);
    }

    /**
     * 同时绑定q1、q2
     */
    @bean
    public binding topicq1(){
        return bindingbuilder.bind(topicq1())
                .to(topicexchange())
                .with(c_key);
    }
  • 消费者接受信息
package com.ycxw.consumer.demos;
 
import org.springframework.amqp.rabbit.annotation.rabbithandler;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;
 
import java.util.map;


@component
public class topicreceiver {

    @rabbitlistener(queues = {"topic-q1"})
    @rabbithandler
    public void handler(string msg){
        system.out.println("已接受到队列topic-q1传递过来的消息:"+msg);
    }

    @rabbitlistener(queues = {"topic-q2"})
    @rabbithandler
    public void handlerb(string msg) {
        system.out.println("已接受到队列topic-q2传递过来的消息:" + msg);
    }
}
  • 测试

 q1队列:

q2队列:

同时调用两个队列:

  

3. 扇形交换机:fanout exchange

        扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要“思考”,所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。 

这个交换机没有路由键概念,就算你绑了路由键也是无视的。 

  • 生产者
    /**
     * 定义队列 q1
     * @return
     */
    @bean
    public queue fanoutq1() {
        return new queue("fanout-q1");
    }

    /**
     * 定义队列 q2
     * @return
     */
    @bean
    public queue fanoutq2() {
        return new queue("fanout-q2");
    }

    /**
     * 定义扇形交换机
     * @return
     */
    @bean
    public fanoutexchange fanoutexchange(){
        return new fanoutexchange("fanout-exchange",true,false);
    }

    /**
     * 绑定队列 (没有路由键)
     * @return
     */
    @bean
    public binding fanout_q1(){
        return bindingbuilder.bind(fanoutq1())
                .to(fanoutexchange());
    }

    @bean
    public binding fanout_q2(){
        return bindingbuilder.bind(fanoutq2())
                .to(fanoutexchange());
    }
  • 模拟发送信息
    @requestmapping("/send4")
    public string sendfour() throws jsonprocessingexception {
        //必须填写路由键这项,否则接收不到信息。
        //由于扇形交换机没有路由键,所以这向需要填空,不然会将发送的信息(hello)解析为路由键
        rabbittemplate.convertandsend("fanout-exchange","","hello");
        return "🫶";
    }
  • 消费者
package com.ycxw.consumer.demos;

import org.springframework.amqp.rabbit.annotation.rabbithandler;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;


@component
public class fanoutreceiver {

    @rabbitlistener(queues = {"fanout-q1"})
    @rabbithandler
    public void handler(string msg) {
        system.out.println("已接受到队列fanout-q1传递过来的消息:" + msg);
    }

    @rabbitlistener(queues = {"fanout-q2"})
    @rabbithandler
    public void handlerb(string msg) {
        system.out.println("已接受到队列fanout-q2传递过来的消息:" + msg);
    }
}
  • 测试

4. 默认交换机(直连)

        实际上是一个由rabbitmq预先声明好的名字为空字符串的直连交换机(direct exchange)。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com