引入rabbitmq依赖
<!-- springboot集成rabbitmq --> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-amqp</artifactid> </dependency>
增加rabbitmq配置
#rabbitmq配置 spring: rabbitmq: host: ip地址 port: 5672 username: 账号 password: 密码 virtual-host: /
配置rabbitmq交换机以及队列
package com.ckm.ball.config; import org.springframework.amqp.core.binding; import org.springframework.amqp.core.bindingbuilder; import org.springframework.amqp.core.fanoutexchange; import org.springframework.amqp.core.queue; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; //rabbitmq绑定交换机 / 队列 @configuration public class rabbitmqconfig { //========================================================rabbitmq queue========================================================// //创建fanout模式交换机 @bean public fanoutexchange fanoutexchangeprocess() { return new fanoutexchange("process-data-change-exchange", true, false); } //创建队列 @bean public queue processdatachangequeue() { return new queue("process-data-change-queue", true); } //将队列绑定到交换机 @bean public binding chatbindexchange() { return bindingbuilder.bind(processdatachangequeue()).to(fanoutexchangeprocess()); } }
编写接口,模拟生产消息
@resource private rabbittemplate rabbittemplate; @getmapping("/producemessage") @apioperation(value = "生产消息", tags = "测试接口") public void updatetokentime() { //生产消息,会到交换机,交换机下发给队列,队列监听到就会消费,执行业务逻辑 rabbittemplate.convertandsend("process-data-change-exchange", "process-data-change-queue", "hhhhhhhhhhhhhh"); }
编写消息监听类,模拟消费消息
package com.ckm.ball.config; import lombok.extern.slf4j.slf4j; import org.springframework.amqp.rabbit.annotation.rabbitlistener; import org.springframework.messaging.handler.annotation.payload; import org.springframework.stereotype.component; import java.util.date; @slf4j @component public class rabbitmqdatasynclistenerprocess { //监听process-data-change-queue队列 -> 消费 @rabbitlistener(queues = "process-data-change-queue") public void orderdead(@payload string productidandorderid) { log.info("当前时间:{},收到队列信息:{}", new date().tostring(), productidandorderid); //执行你的业务逻辑 for (int i = 0; i < 5; i++) { system.out.println("循环次数: " + (i + 1)); try { // 暂停 2000 毫秒(2 秒) thread.sleep(2000); } catch (interruptedexception e) { // 处理异常 system.err.println("线程被中断: " + e.getmessage()); } } } }
rabbitmq 中的交换机的作用
rabbitmq 中的交换机(exchange)是消息路由的核心组件。它负责接收来自生产者发送的消息,并根据特定的路由规则将这些消息传递给一个或多个队列(queue)。交换机的主要功能和类型
1.消息路由:
- 交换机决定消息应该发送到哪些队列,基于绑定(binding)和路由键(routing key)。
2.类型:
- 直连交换机(direct exchange):消息直接发送到与路由键精确匹配的队列。
- 主题交换机(topic exchange):消息根据路由键模式匹配一个或多个队列,支持通配符。
- 扇出交换机(fanout exchange):将消息广播到所有绑定的队列,不考虑路由键。
- 头交换机(headers exchange):通过消息的属性(headers)进行路由,而不是使用路由键。
工作流程
- 生产者发送消息到交换机。
- 交换机根据配置的路由规则和队列的绑定关系,将消息路由到相应的队列。
- 消费者从队列中获取消息进行处理。
在我的代码中生产消息语句:
convertandsend(交换机,路由键也就是队列,你想传递的参数)
在扇出交换机(fanout exchange)模式不需要指定路由键,因为指定了也没用。
rabbittemplate.convertandsend("process-data-change-exchange", "process-data-change-queue", "hhhhhhhhhhhhhh");
在扇出交换机(fanout exchange)模式,应改成:
rabbittemplate.convertandsend("process-data-change-exchange", "", "hhhhhhhhhhhhhh");
在扇出交换机中,可以将路由键设置为空字符串 “”,因为扇出交换机会将消息发送到所有绑定的队列,而不需要考虑路由键的具体值。
- 在扇出交换机中,路由键被忽略。
- 消息会被广播到所有与交换机绑定的队列中。
四种交换机模式
1. 直连交换机(direct exchange)
直连交换机:发送到匹配路由键的队列。
// 创建直连交换机 @bean public directexchange directexchange() { return new directexchange("direct-exchange", true, false); } // 创建队列 @bean public queue directqueue() { return new queue("direct-queue", true); } // 将队列绑定到直连交换机,同时指定路由键 @bean public binding directbinding() { return bindingbuilder.bind(directqueue()).to(directexchange()).with("direct-routing-key"); }
生产消息:
直连交换机生产消息:需要指定路由键。
// 发送消息到直连交换机 rabbittemplate.convertandsend("direct-exchange", "direct-routing-key", "your message here");
2. 主题交换机(topic exchange)
主题交换机:支持模糊匹配路由键。
// 创建主题交换机 @bean public topicexchange topicexchange() { return new topicexchange("topic-exchange", true, false); } // 创建队列 @bean public queue topicqueue() { return new queue("topic-queue", true); } // 将队列绑定到主题交换机,同时指定路由键 @bean public binding topicbinding() { return bindingbuilder.bind(topicqueue()).to(topicexchange()).with("topic.#"); }
生产消息:
主题交换机生产消息:需要指定符合主题模式的路由键。
// 发送消息到主题交换机 rabbittemplate.convertandsend("topic-exchange", "topic.routing.key", "your message here");
3. 扇出交换机(fanout exchange)
扇出交换机:将消息广播到所有绑定的队列。
// 创建扇出交换机 @bean public fanoutexchange fanoutexchange() { return new fanoutexchange("fanout-exchange", true, false); } // 创建队列 @bean public queue fanoutqueue1() { return new queue("fanout-queue-1", true); } @bean public queue fanoutqueue2() { return new queue("fanout-queue-2", true); } // 将队列绑定到扇出交换机 @bean public binding fanoutbinding1() { return bindingbuilder.bind(fanoutqueue1()).to(fanoutexchange()); } @bean public binding fanoutbinding2() { return bindingbuilder.bind(fanoutqueue2()).to(fanoutexchange()); }
生产消息:
扇出交换机生产消息:不需要路由键,使用空字符串即可。
// 发送消息到扇出交换机 rabbittemplate.convertandsend("fanout-exchange", "", "your message here");
4. 头交换机(headers exchange)
头交换机:根据消息头中匹配的属性进行路由。
// 创建头交换机 @bean public headersexchange headersexchange() { return new headersexchange("headers-exchange", true, false); } // 创建队列 @bean public queue headersqueue() { return new queue("headers-queue", true); } // 将队列绑定到头交换机,同时指定头属性 @bean public binding headersbinding() { map<string, object> headers = new hashmap<>(); headers.put("format", "pdf"); headers.put("type", "report"); return bindingbuilder.bind(headersqueue()) .to(headersexchange()) .whereall(headers) .match(); }
生产消息:
头交换机生产消息:需要构建一个带有头属性的消息。
// 发送消息到头交换机 messageproperties messageproperties = new messageproperties(); messageproperties.setheader("format", "pdf"); messageproperties.setheader("type", "report"); message message = new message("your message here".getbytes(), messageproperties); rabbittemplate.send("headers-exchange", "", message);
到此这篇关于spring boot中使用rabbitmq 生产消息和消费消息的文章就介绍到这了,更多相关spring boot生产消息和消费消息内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论