当前位置: 代码网 > it编程>编程语言>Java > SpringBoot集成MQ的过程(四种交换机的实例)

SpringBoot集成MQ的过程(四种交换机的实例)

2025年03月12日 Java 我要评论
​rabbitmq交换机(exchange)的核心作用在rabbitmq中,​交换机是消息路由的核心组件,负责接收生产者发送的消息,并根据规则(如路由键、头信息等)将消息分发到对应的队列中。不同交换机

​rabbitmq交换机(exchange)的核心作用

在rabbitmq中,​交换机 是消息路由的核心组件,负责接收生产者发送的消息,并根据规则(如路由键、头信息等)将消息分发到对应的队列中。
不同交换机类型决定了消息的路由逻辑,使用不同的交换机在不同的场景下可以提高消息系统的高可用性。

1. 直连交换机(direct exchange)​

路由机制

  • 精确匹配路由键(routing key)​:消息会被发送到与 routing key ​完全匹配 的队列。
  • 典型场景:一对一或一对多的精确消息分发。

应用场景

  • 任务分发:如订单处理系统,根据订单类型(如 order.paymentorder.shipping)分发到不同队列。
  • 日志分类:将不同级别的日志(log.errorlog.info)路由到对应的处理服务。

 使用直连交换机实现消息发送和接收

1.创建一个springboot项目,在yml文件配置如下:

server:
  port: 8021
spring: 
  application:
    name: rabbitmq-provider
  #配置rabbitmq 服务器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

 2.初始化队列和交换机,并进行绑定

package com.atguigu.demomq;
import org.springframework.amqp.core.binding;
import org.springframework.amqp.core.bindingbuilder;
import org.springframework.amqp.core.directexchange;
import org.springframework.amqp.core.queue;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
/**
 * 功能:
 * 作者:程序员zxy
 * 日期:2025/3/8 下午1:55
 */
@configuration
public class directrabbitconfig {
    @bean
    public  queue testdirectqueue(){
        return new queue("testdirectqueue",true);
    }
    @bean
    directexchange testdirectexchange(){
        return new directexchange("testdirectexchange",true,false);
    }
    @bean
    binding bindingdirect(){
        return bindingbuilder.bind(testdirectqueue())
                .to(testdirectexchange())
                .with("testdirectrouting");
    }
} 

 3.实现senddirectmessage发送消息请求,由生产者发送到mq,testdirectrouting作为key,用于精确转发。

package com.atguigu.demomq;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.getmapping;
import org.springframework.web.bind.annotation.restcontroller;
import java.time.localdatetime;
import java.time.format.datetimeformatter;
import java.util.hashmap;
import java.util.map;
import java.util.uuid;
/**
 * 功能:
 * 作者:程序员zxy
 * 日期:2025/3/8 下午2:12
 */
@restcontroller
public class sendmessagecontroller {
    @autowired
    private rabbittemplate rabbittemplate;
    @getmapping("/senddirectmessage")
    public string senddirectmessage() {
        string messageid = string.valueof(uuid.randomuuid());
        string messagedata = "hello mq!";
        string createtime = localdatetime.now().format(datetimeformatter.ofpattern("yyyy-mm-dd hh:mm:ss"));
        map<string,object> map=new hashmap<>();
        map.put("messageid",messageid);
        map.put("messagedata",messagedata);
        map.put("createtime",createtime);
        //将消息携带绑定键值:testdirectrouting 发送到交换机testdirectexchange
        rabbittemplate.convertandsend("testdirectexchange", "testdirectrouting", map);
        return "ok";
    }
}

4.此时就可以启动项目发送消息了,使用postman发送消息,返回ok说明发送成功

5.进入http://localhost:15672/,可以看到消息发送成功,我这里是请求了两次(也就是发了两条消息)。

6.接下来写消费者的消费过程,新创建一个springboot项目,在yml文件配置如下

server:
  port: 8022
spring:
  application:
    name: rabbitmq-provider
  #配置rabbitmq 服务器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

7.消费者配置类,同样testdirectrouting用于唯一识别key

package com.atguigu.demomq2;
import org.springframework.amqp.core.binding;
import org.springframework.amqp.core.bindingbuilder;
import org.springframework.amqp.core.directexchange;
import org.springframework.amqp.core.queue;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
/**
 * 功能:
 * 作者:程序员zxy
 * 日期:2025/3/8 下午 
 */
@configuration
public class directrabbitconfig {
    @bean
    public queue testdirectqueue() {
        return new queue("testdirectqueue",true);
    }
    @bean
    directexchange testdirectexchange() {
        return new directexchange("testdirectexchange");
    }
    @bean
    binding bindingdirect() {
        return bindingbuilder.bind(testdirectqueue()).to(testdirectexchange()).with("testdirectrouting");
    }
}

8.消费者 接收消息@rabbitlistener(queues = "testdirectqueue")用于监听指定队列发送的消息

package com.atguigu.demomq2;
import org.springframework.amqp.rabbit.annotation.rabbithandler;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;
import java.util.map;
@component
@rabbitlistener(queues = "testdirectqueue")
public class directreceiver {
    @rabbithandler
    public void process(map testmessage) {
        system.out.println("directreceiver消费者收到消息  : " + testmessage.tostring());
    }
}

 9.启动消费者,成功接收消息

10.查看mq控制台,消息成功被消费 

2. 扇出交换机(fanout exchange)​ ​

路由机制(一个交换机转发到多个队列)

  • 广播模式:忽略 routing key,将消息发送到所有绑定的队列
  • 典型场景:消息的全局通知或并行处理。

​应用场景

  • 实时通知系统:如用户注册成功后,同时发送邮件、短信、更新缓存。
  • 日志广播:多个服务订阅同一日志源,各自独立处理。

 使用扇出交换机实现消息发送和接收

1.扇出交换机配置

package com.atguigu.demomq;
import org.springframework.amqp.core.binding;
import org.springframework.amqp.core.bindingbuilder;
import org.springframework.amqp.core.fanoutexchange;
import org.springframework.amqp.core.queue;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
@configuration
public class fanoutexchangeconfig {
    // 定义扇出交换机
    @bean
    public fanoutexchange fanoutexchange() {
        return new fanoutexchange("fanout.user.register", true, false);
    }
    // 定义邮件队列
    @bean
    public queue emailqueue() {
        return new queue("fanout.user.email", true);
    }
    // 定义短信队列
    @bean
    public queue smsqueue() {
        return new queue("fanout.user.sms", true);
    }
    // 绑定所有队列到扇出交换机(无需路由键)
    @bean
    public binding emailbinding() {
        return bindingbuilder.bind(emailqueue()).to(fanoutexchange());
    }
    @bean
    public binding smsbinding() {
        return bindingbuilder.bind(smsqueue()).to(fanoutexchange());
    }
}

2.生产者

package com.atguigu.demomq;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.getmapping;
import org.springframework.web.bind.annotation.restcontroller;
@restcontroller
public class fanoutuserservice {
    @autowired
    private rabbittemplate rabbittemplate;
    @getmapping("/sendfanoutmessage")
    public string sendregisterbroadcast() {
        rabbittemplate.convertandsend(
            "fanout.user.register", 
            "", // 扇出交换机忽略路由键
            "message mq"
        );
        return "ok fan";
    }
}

3.消费者

package com.atguigu.demomq2;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;
@component
public class fanoutnotificationconsumer {
    @rabbitlistener(queues = "fanout.user.email")
    public void handleemail(string message) {
        system.out.println("[email] received: " + message);
    }
    @rabbitlistener(queues = "fanout.user.sms")
    public void handlesms(string message) {
        system.out.println("[sms] received: " + message);
    }
}

4.请求并查看消费结果 

可以看到一个交换机完成消费两条消息 

​3. 主题交换机(topic exchange)​

  • 路由机制模式匹配路由键:使用 *(匹配一个单词)和 #(匹配多个单词)通配符。​
  • 典型场景:灵活的多条件消息路由。 ​

应用场景

  • 新闻订阅系统:用户订阅特定主题(如 news.sports.*news.tech.#)。​
  • 设备状态监控:根据设备类型和区域路由消息(如 sensor.temperature.room1)。

1.配置主题交换机

package com.atguigu.demomq;
import org.springframework.amqp.core.binding;
import org.springframework.amqp.core.bindingbuilder;
import org.springframework.amqp.core.queue;
import org.springframework.amqp.core.topicexchange;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
@configuration
public class topicexchangeconfig {
    // 定义主题交换机
    @bean
    public topicexchange topicexchange() {
        return new topicexchange("topic.news", true, false);
    }
    // 定义体育新闻队列
    @bean
    public queue sportsqueue() {
        return new queue("topic.news.sports", true);
    }
    // 定义科技新闻队列
    @bean
    public queue techqueue() {
        return new queue("topic.news.tech", true);
    }
    // 绑定体育队列:匹配 news.sports.*
    @bean
    public binding sportsbinding() {
        return bindingbuilder.bind(sportsqueue())
                .to(topicexchange())
                .with("news.sports.*");
    }
    // 绑定科技队列:匹配 news.tech.#
    @bean
    public binding techbinding() {
        return bindingbuilder.bind(techqueue())
                .to(topicexchange())
                .with("news.tech.#");
    }
}

2.生产者

package com.atguigu.demomq;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.service;
import org.springframework.web.bind.annotation.getmapping;
import org.springframework.web.bind.annotation.restcontroller;
@restcontroller
public class topicnewsservice {
    @autowired
    private rabbittemplate rabbittemplate;
    @getmapping("/sendtopicmessage1")
    public string  sendsportsnews() {
        rabbittemplate.convertandsend(
            "topic.news", 
            "news.sports.football",
                "* message:news.sports.football"
        );
        return "*ok";
    }
    @getmapping("/sendtopicmessage2")
    public string sendtechnews() {
        rabbittemplate.convertandsend(
            "topic.news", 
            "news.tech.ai.abc.123456",
            "# message:news.tech.ai.abc.123456"
        );
        return "#ok";
    }
}

3. 消费者

package com.atguigu.demomq2;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;
@component
public class topicnewsconsumer {
    @rabbitlistener(queues = "topic.news.sports")
    public void handlesports(string message) {
        system.out.println("[sports] received: " + message);
    }
    @rabbitlistener(queues = "topic.news.tech")
    public void handletech(string message) {
        system.out.println("[tech] received: " + message);
    }
}

4.发送请求

 可以看到消息成功消费,第一个为*通配符,第二个为#通配符

​4. 头交换机(headers exchange)​

路由机制( 我的理解是一种基于 ​多条件组合 的消息路由机制

  • 基于消息头(headers)匹配:忽略 routing key,通过键值对(headers)匹配队列绑定的条件。
  • 匹配规则x-match 参数设为 all(需全部匹配)或 any(匹配任意一个)。

应用场景

  • 复杂路由逻辑:如根据消息的版本号、语言等元数据路由。​
  • 多维度过滤:如同时匹配用户类型(user_type: vip)和地理位置(region: asia)。

1.头交换机配置

package com.atguigu.demomq;
import org.springframework.amqp.core.binding;
import org.springframework.amqp.core.bindingbuilder;
import org.springframework.amqp.core.headersexchange;
import org.springframework.amqp.core.queue;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import java.util.hashmap;
import java.util.map;
@configuration
public class headersexchangeconfig {
    // 定义头交换机
    @bean
    public headersexchange headersexchange() {
        return new headersexchange("headers.user", true, false);
    }
    // 定义vip用户队列
    @bean
    public queue vipqueue() {
        return new queue("headers.user.vip", true);
    }
    // 绑定vip队列,要求同时匹配 usertype=vip 和 region=asia
    @bean
    public binding vipbinding() {
        map<string, object> headers = new hashmap<>();
        headers.put("usertype", "vip");
        headers.put("region", "asia");
        return bindingbuilder.bind(vipqueue())
                .to(headersexchange())
                .whereall(headers).match(); // whereall 表示需全部匹配
    }
}

2.生产者

package com.atguigu.demomq;
import org.springframework.amqp.core.message;
import org.springframework.amqp.core.messageproperties;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.service;
import org.springframework.web.bind.annotation.getmapping;
import org.springframework.web.bind.annotation.restcontroller;
@restcontroller
public class headeruservipservice {
    @autowired
    private rabbittemplate rabbittemplate;
    @getmapping("/sendheadermessage")
    public string  sendvipmessage() {
        messageproperties props = new messageproperties();
        props.setheader("usertype", "vip");
        props.setheader("region", "asia");
        message msg = new message("headermessage".getbytes(), props);
        rabbittemplate.send("headers.user", "", msg);
        return "ok";
    }
}

3.消费者

package com.atguigu.demomq2;
import org.springframework.amqp.core.message;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;
@component
public class headeruservipconsumer {
    @rabbitlistener(queues = "headers.user.vip")
    public void handlevip(message message) {
        string body = new string(message.getbody());
        system.out.println("[vip] received: " + body);
    }
}

4.postman测试 

这里仅消费交换机初始化时满足所有设定条件的消息,我们可以测试一下不满足条件时发送消息

消费者不消费消息 

总结 

需要代码自己进行测试的 可以git自取

git clone https://gitee.com/myselfzxy/mq-producer.git

git clone https://gitee.com/myselfzxy/mq-customer.git

到此这篇关于springboot集成mq,四种交换机的实例的文章就介绍到这了,更多相关springboot集成mq内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com