目录
前言
在讲交换机之前我们需要了解一些概念,在rabbitmq工作流程有一项叫exchange(交换机:消息的分发中心),它的作用是将生产者发送的消息转发到具体的队列,队列再将消息以推送或者拉取方式给消费者进行消费。
exchange(交换机)的类型与应用
- 交换机的属性
除交换机类型外,在声明交换机时还可以附带许多其他的属性,其中最重要的几个分别是:
- type:交换机名称
- durability:是否持久化。如果持久性,则rabbitmq重启后,交换机还存在
- auto-delete:当所有与之绑定的消息队列都完成了对此交换机的使用后,是否删掉它
- internal:当前exchange是否用于rabbitmq内部使用,默认fasle
- arguments:扩展参数
1. 直连交换机:direct exchange
- 这是最简单的一种交换机类型。
- 当一个队列与交换机绑定时,需要指定一个路由键(routingkey),只有当消息的路由键与该队列绑定时指定的绑定键(bindingkey)完全匹配时,消息才会被路由到该队列。
- 生产者
package com.ycxw.publisher.demos;
import org.springframework.amqp.core.binding;
import org.springframework.amqp.core.bindingbuilder;
import org.springframework.amqp.core.directexchange;
import org.springframework.amqp.core.queue;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
//定义队列
@configuration
@suppresswarnings("all")
public class rabbitconfig {
/**
* 定义队列 q1
* @return
*/
@bean
public queue directq1() {
return new queue("direct-q1");
}
/**
* 定义队列 q2
* @return
*/
@bean
public queue directq2() {
return new queue("direct-q2");
}
/**
* 自定义直连交换机
* @return
*/
@bean
public directexchange directexchange() {
return new directexchange("direct-exchange", true, false);
}
/**
* 将队列 q1与交换机进行绑定,并设置路由键
* @return
*/
@bean
public binding bindingq1() {
return bindingbuilder.bind(directq1())
.to(directexchange())
.with("direct_orange");
}
/**
* 将队列 q2与交换机进行绑定,并设置路由键
* @return
*/
@bean
public binding bindingq2() {
return bindingbuilder.bind(directq2())
.to(directexchange())
.with("direct_black");
}
}
package com.ycxw.publisher.demos;
import com.fasterxml.jackson.core.jsonprocessingexception;
import com.fasterxml.jackson.databind.objectmapper;
import org.springframework.amqp.core.amqptemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.restcontroller;
/**
* 模拟发送请求
*/
@restcontroller
public class sender {
@autowired
private amqptemplate rabbittemplate;
@requestmapping("/send1")
public string sendfirst() {
/*向消息队列发送消息 converandsend(交换机,路由键,发送的信息)*/
rabbittemplate.convertandsend("direct-exchange", "direct_orange", "我是q1");
return "🫶";
}
@requestmapping("/send2")
public string sendsecond() throws jsonprocessingexception {
rabbittemplate.convertandsend("direct-exchange", "direct_black", "我是q2");
return "🫶";
}
}
- 消费者接受信息
package com.ycxw.consumer.demos;
import org.springframework.amqp.rabbit.annotation.rabbithandler;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;
@component
@rabbitlistener(queues = {"direct-q1"})
public class directreceiver {
@rabbithandler
public void handler(string msg) {
system.out.println(msg);
}
}
package com.ycxw.consumer.demos;
import org.springframework.amqp.rabbit.annotation.rabbithandler;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;
@component
@rabbitlistener(queues = {"direct-q2"})
public class directreceiver2 {
@rabbithandler
public void handler(string msg) {
system.out.println(msg);
}
}
- 测试
q1队列:
q2队列:
2. 主题交换机:topic exchange
主题交换机的routing_key需要有一定的规则,交换机和队列的binding_key需要采用*.#.*.....的格式,每个部分用 . 分开,其中
- * 表示一个单词
- # 表示任意数量(零个或多个)单词。
主题交换机会根据这个规则将数据发送到对应的(多个)队列上。
- 生产者
/**
* 定义路由键规则
*/
public static string a_key ="*.orange.*";
public static string b_key ="*.*.black";
public static string c_key ="pink.#";
/**
* 定义队列 q1
* @return
*/
@bean
public queue topicq1() {
return new queue("topic-q1");
}
/**
* 定义队列 q2
* @return
*/
@bean
public queue topicq2() {
return new queue("topic-q2");
}
/**
* 自定义主题交换机
* @return
*/
@bean
public topicexchange topicexchange() {
return new topicexchange("topic-exchange", true, false);
}
@bean
public binding topic_q1(){
return bindingbuilder.bind(topicq1())
.to(topicexchange())
.with(a_key);
}
@bean
public binding topic_q2(){
return bindingbuilder.bind(topicq2())
.to(topicexchange())
.with(b_key);
}
@bean
public binding topicq2(){
return bindingbuilder.bind(topicq2())
.to(topicexchange())
.with(c_key);
}
/**
* 同时绑定q1、q2
*/
@bean
public binding topicq1(){
return bindingbuilder.bind(topicq1())
.to(topicexchange())
.with(c_key);
}
- 消费者接受信息
package com.ycxw.consumer.demos;
import org.springframework.amqp.rabbit.annotation.rabbithandler;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;
import java.util.map;
@component
public class topicreceiver {
@rabbitlistener(queues = {"topic-q1"})
@rabbithandler
public void handler(string msg){
system.out.println("已接受到队列topic-q1传递过来的消息:"+msg);
}
@rabbitlistener(queues = {"topic-q2"})
@rabbithandler
public void handlerb(string msg) {
system.out.println("已接受到队列topic-q2传递过来的消息:" + msg);
}
}
- 测试
q1队列:
q2队列:
同时调用两个队列:
3. 扇形交换机:fanout exchange
扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要“思考”,所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。
这个交换机没有路由键概念,就算你绑了路由键也是无视的。
- 生产者
/**
* 定义队列 q1
* @return
*/
@bean
public queue fanoutq1() {
return new queue("fanout-q1");
}
/**
* 定义队列 q2
* @return
*/
@bean
public queue fanoutq2() {
return new queue("fanout-q2");
}
/**
* 定义扇形交换机
* @return
*/
@bean
public fanoutexchange fanoutexchange(){
return new fanoutexchange("fanout-exchange",true,false);
}
/**
* 绑定队列 (没有路由键)
* @return
*/
@bean
public binding fanout_q1(){
return bindingbuilder.bind(fanoutq1())
.to(fanoutexchange());
}
@bean
public binding fanout_q2(){
return bindingbuilder.bind(fanoutq2())
.to(fanoutexchange());
}
- 模拟发送信息
@requestmapping("/send4")
public string sendfour() throws jsonprocessingexception {
//必须填写路由键这项,否则接收不到信息。
//由于扇形交换机没有路由键,所以这向需要填空,不然会将发送的信息(hello)解析为路由键
rabbittemplate.convertandsend("fanout-exchange","","hello");
return "🫶";
}
- 消费者
package com.ycxw.consumer.demos;
import org.springframework.amqp.rabbit.annotation.rabbithandler;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;
@component
public class fanoutreceiver {
@rabbitlistener(queues = {"fanout-q1"})
@rabbithandler
public void handler(string msg) {
system.out.println("已接受到队列fanout-q1传递过来的消息:" + msg);
}
@rabbitlistener(queues = {"fanout-q2"})
@rabbithandler
public void handlerb(string msg) {
system.out.println("已接受到队列fanout-q2传递过来的消息:" + msg);
}
}
- 测试
4. 默认交换机(直连)
实际上是一个由rabbitmq预先声明好的名字为空字符串的直连交换机(direct exchange)。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。
发表评论