当前位置: 代码网 > it编程>编程语言>Java > Springboot RabbitMQ 消息队列使用示例详解

Springboot RabbitMQ 消息队列使用示例详解

2024年06月10日 Java 我要评论
一、概念介绍:rabbitmq中几个重要的概念介绍:channels:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的 tcp 连接内地虚拟连接,amqp 命令都是通过信道发出去的,

一、概念介绍:

rabbitmq中几个重要的概念介绍:

  • channels:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的 tcp 连接内地虚拟连接,amqp 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 tcp 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 tcp 连接。
  • exchanges:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  • 交换机类型主要有以下几种:
  • direct exchange(直连交换机):这种类型的交换机根据消息的routing key(路由键)进行精确匹配,只有绑定了相同路由键的队列才会收到消息。适用于点对点的消息传递场景。
  • fanout exchange(扇形交换机):这种类型的交换机采用广播模式,它会将消息发送给所有绑定到该交换机的队列,不管消息的路由键是什么。适用于消息需要被多个消费者处理的场景。
  • topic exchange(主题交换机):这种类型的交换机支持基于模式匹配的路由键,可以使用通配符*(匹配一个单词)和#(匹配零个或多个单词)进行匹配。适用于实现更复杂的消息路由逻辑。
  • headers exchange(头交换机):这种类型的交换机不处理路由键,而是根据发送的消息内容中的headers属性进行匹配。适用于需要在消息头中携带额外信息的场景。
  • queues:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

二、引入依赖:

 <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter-amqp</artifactid>
 </dependency>

三、添加配置信息

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        acknowledge-mode: manual  # 手动提交

四、direct exchange(直连交换机)模式

1、新建配置文件 rabbitdirectconfig类

package com.example.direct;
import org.springframework.amqp.core.binding;
import org.springframework.amqp.core.bindingbuilder;
import org.springframework.amqp.core.directexchange;
import org.springframework.amqp.core.queue;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description: 直连交换机--这种类型的交换机根据消息的routing key(路由键)进行精确匹配,
 * 只有绑定了相同路由键的队列才会收到消息。适用于点对点的消息传递场景
 */
@configuration
public class rabbitdirectconfig {
    /**
     * 队列名称
     */
    public static final string queue_message ="queue_message";
    public static final string queue_user ="queue_user";
    /**
     * 交换机
     */
    public static final string exchange="exchange_01";
    /**
     * 路由
     */
    public static final string routing_key="routing_key_01";
    @bean
    public queue queue01() {
        return new queue(queue_message, //队列名称
                true, //是否持久化
                false, //是否排他
                false //是否自动删除
        );
    }
    @bean
    public queue queue02() {
        return new queue(queue_user, //队列名称
                true, //是否持久化
                false, //是否排他
                false //是否自动删除
        );
    }
    @bean
    public directexchange exchange01() {
        return new directexchange(exchange,
                true, //是否持久化
                false //是否排他
        );
    }
    @bean
    public binding demobinding() {
        return bindingbuilder.bind(queue01()).to(exchange01()).with(routing_key);
    }
    @bean
    public binding demobinding2() {
        return bindingbuilder.bind(queue02()).to(exchange01()).with(routing_key);
    }
}

2、添加消息生产者 producer类

package com.example.direct;
import com.example.entity.user;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.stereotype.component;
import javax.annotation.resource;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description:
 */
@component
public class producer {
    @resource
    rabbittemplate rabbittemplate;
    public void sendmessagebyexchangeandroute(string message){
        rabbittemplate.convertandsend(rabbitdirectconfig.exchange, rabbitdirectconfig.routing_key,message);
    }
    /**
     * 默认交换器,隐式地绑定到每个队列,路由键等于队列名称。
     * @param message
     */
    public void sendmessagebyqueue(string message){
        rabbittemplate.convertandsend(rabbitdirectconfig.queue_message,message);
    }
    public void sendmessage(user user){
        rabbittemplate.convertandsend(rabbitdirectconfig.queue_user,user);
    }
}

3、添加消息消费者

package com.example.direct;
import com.example.entity.user;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description:
 */
@component
public class consumer {
    @rabbitlistener(queues = rabbitdirectconfig.queue_user)
    public void onmessage(user user){
        system.out.println("收到的实体bean消息:"+user);
    }
    @rabbitlistener(queues = rabbitdirectconfig.queue_message)
    public void onmessage2(string message){
        system.out.println("收到的字符串消息:"+message);
    }
}

4、 测试

package com.example;
import com.example.entity.user;
import com.example.direct.producer;
import com.example.fanout.fanoutproducer;
import com.example.topic.topicproducer;
import org.junit.jupiter.api.test;
import org.springframework.boot.test.context.springboottest;
import javax.annotation.resource;
@springboottest
class springbootrabbitmqapplicationtests {
    @resource
    producer producer;
    @test
    public void sendmessage() throws interruptedexception {
        producer.sendmessagebyqueue("哈哈");
        producer.sendmessage(new user().setage(10).setname("wasin"));
    }
}

五、topic exchange(主题交换机)模式

1、新建rabbittopicconfig类

package com.example.topic;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description: 主题交换机--这种类型的交换机支持基于模式匹配的路由键,
 * 可以使用通配符*(匹配一个单词)和#(匹配零个或多个单词)进行匹配。适用于实现更复杂的消息路由逻辑。
 */
@configuration
public class rabbittopicconfig {
    /**
     * 交换机
     */
    public static final string exchange = "exchange_topic1";
    /**
     * 队列名称
     */
    public static final string queue_topic1 = "queue_topic";
    /**
     * 路由
     * "*" 与 "#",用于做模糊匹配。其中 "*" 用于匹配一个单词,"#" 用于匹配多个单词(可以是零个)
     * 可以匹配 aa.wasin.aa.bb  wasin.aa.bb  wasin.aa ....
     * aa.bb.wasin.cc 无法匹配
     */
    public static final string routing_key1 = "*.wasin.#";
    @bean
    public queue queue() {
        return new queue(queue_topic1, //队列名称
                true, //是否持久化
                false, //是否排他
                false //是否自动删除
        );
    }
    @bean
    public topicexchange exchange() {
        return new topicexchange(exchange,
                true, //是否持久化
                false //是否排他
        );
    }
    @bean
    public binding binding() {
        return bindingbuilder.bind(queue()).to(exchange()).with(routing_key1);
    }
}

2、新建 消息生产者和发送者

topicproducer类

package com.example.topic;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.stereotype.component;
import javax.annotation.resource;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description:
 */
@component
public class topicproducer {
    @resource
    rabbittemplate rabbittemplate;
    /**
     * @param routekey 路由
     * @param message 消息
     */
    public void sendmessagebyqueue(string routekey, string message){
        rabbittemplate.convertandsend(rabbittopicconfig.exchange,routekey,message);
    }
}

topicconsumer类

package com.example.topic;
import lombok.extern.slf4j.slf4j;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description:
 */
@slf4j
@component
public class topicconsumer {
    @rabbitlistener(queues = rabbittopicconfig.queue_topic1)
    public void onmessage2(string message){
        log.info("topic收到的字符串消息:{}",message);
    }
}

六、fanout exchange(扇形交换机)模式

1、 新建 rabbitfanoutconfig类

package com.example.fanout;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description: 扇形交换机--这种类型的交换机采用广播模式,它会将消息发送给所有绑定到该交换机的队列,
 * 不管消息的路由键是什么。适用于消息需要被多个消费者处理的场景。
 */
@configuration
public class rabbitfanoutconfig {
    /**
     * 交换机
     */
    public static final string exchange = "exchange_fanout";
    /**
     * 队列名称
     */
    public static final string queue_fanout1 = "queue_fanout";
    /**
     * 队列名称
     */
    public static final string queue_fanout2 = "queue_fanout2";
    @bean
    public queue queuefanout1() {
        return new queue(queue_fanout1, //队列名称
                true, //是否持久化
                false, //是否排他
                false //是否自动删除
        );
    }
    @bean
    public queue queuefanout2() {
        return new queue(queue_fanout2, //队列名称
                true, //是否持久化
                false, //是否排他
                false //是否自动删除
        );
    }
    @bean
    public fanoutexchange exchangefanout() {
        return new fanoutexchange(exchange,
                true, //是否持久化
                false //是否排他
        );
    }
    @bean
    public binding bindingfanout() {
        return bindingbuilder.bind(queuefanout1()).to(exchangefanout());
    }
    @bean
    public binding bindingfanout2() {
        return bindingbuilder.bind(queuefanout2()).to(exchangefanout());
    }
}

2、新建 消息生产者和发送者

fanoutproducer类:

package com.example.fanout;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.stereotype.component;
import javax.annotation.resource;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description:
 */
@component
public class fanoutproducer {
    @resource
    rabbittemplate rabbittemplate;
    /**
     * @param message 消息
     */
    public void sendmessagebyqueue(string message) {
        rabbittemplate.convertandsend(rabbitfanoutconfig.exchange, "", message);
    }
}

fanoutconsumer类

package com.example.fanout;
import com.rabbitmq.client.channel;
import lombok.extern.slf4j.slf4j;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.amqp.support.amqpheaders;
import org.springframework.messaging.handler.annotation.header;
import org.springframework.stereotype.component;
import java.io.ioexception;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description:
 */
@slf4j
@component
public class fanoutconsumer {
    /**
     * 手动提交
     * @param message
     * @param channel
     * @param tag
     * @throws ioexception
     */
    @rabbitlistener(queues = rabbitfanoutconfig.queue_fanout1)
    public void onmessage(string message, channel channel, @header(amqpheaders.delivery_tag) long tag) throws ioexception {
        log.info("fanout1收到的字符串消息:{}",message);
        channel.basicack(tag,false);
    }
    @rabbitlistener(queues = rabbitfanoutconfig.queue_fanout2)
    public void onmessage2(string message){
        log.info("fanout2到的字符串消息:{}",message);
    }
}

到此这篇关于springboot rabbitmq 消息队列使用的文章就介绍到这了,更多相关springboot rabbitmq 消息队列内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com