当前位置: 代码网 > it编程>编程语言>Java > Spring Boot整合RabbitMQ

Spring Boot整合RabbitMQ

2024年07月28日 Java 我要评论
当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就是死信队列。一般在生产中,先启动消费者服务,再启动生产者服务,因此我们通常把配置类放在消费者服务中,否则可能导致无队列监听而服务启动失败或报错。DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。启动测试类,执行成功,查看RabbitMQ Web页面,成功绑定,且各存在一条消息。

github:springbootdemo
gitee:springbootdemo
微信公众号:在这里插入图片描述

0 开发环境

  • jdk:1.8
  • spring boot:2.7.18
  • rabbitmq:3.13.1
  • erlang:26.2.4

1 安装rabbitmq

1.1 安装rabbitmq

下载地址:https://www.rabbitmq.com/docs/download

windows下载地址:https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.13.1/rabbitmq-server-3.13.1.exe

因为rabbitmq依赖erlang,所以下载完成后先不要安装,先在 https://www.rabbitmq.com/docs/which-erlang 查看所需的erlang版本,下载对应erlang安装包

在这里插入图片描述

1.2 安装erlang

erlang下载地址:https://www.erlang.org/downloads

依次安装erlang、rabbitmq

计算机服务中会出现rabbitmq服务,直接右键启动即可。

也可在开始菜单中找到rabbitmq server目录,点击rabbitmq service - start启动

在这里插入图片描述

1.3 启用rabbitmq管理界面

默认情况下,rabbitmq没有启用web端客户端插件,需要启用才可以生效

参考文档:https://www.rabbitmq.com/docs/management

进入到rabbitmq安装目录下 rabbitmq server\rabbitmq_server-3.13.1\sbin 执行命令

rabbitmq-plugins enable rabbitmq_management

在这里插入图片描述

安装成功,重启rabbitmq服务,浏览器访问 127.0.0.1:15672

在这里插入图片描述

默认账号密码都是guest,登录成功后界面如下

在这里插入图片描述

2 广播模式fanout

fanout,发布订阅模式,是一种广播机制,是没有路由key的模式。

2.1 新建生产者

新建module spring-boot-rabbitmq-producer

2.1.1 引入依赖

<dependency>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-amqp</artifactid>
</dependency>

2.1.2 配置rabbitmq

server:
  port: 8090
#
#配置rabbitmq服务
spring:
  rabbitmq:
    username: guest
    password: guest
    virtual-host: /
    host: 127.0.0.1
    port: 5672

2.1.3 新建配置类

@configuration
public class rabbitmqfanoutconfig {

    /**
     * fanout 交换机
     *
     * @return
     */
    @bean
    public fanoutexchange fanoutexchange() {
        return new fanoutexchange("fanout_user_exchange", true, false);
    }

    /**
     * 发送短信队列
     *
     * @return
     */
    @bean
    public queue smsqueue() {
        //durable       是否持久化,默认为false
        //exclusive     是否只能被当前创建的连接使用,默认为false
        //autodelete    是否自动删除,默认为false

        //一般设置队列的持久化,其余两个默认false
        return new queue("sms.fanout.queue", true);
    }

    /**
     * 发送邮件队列
     *
     * @return
     */
    @bean
    public queue emailqueue() {
        return new queue("email.fanout.queue", true);
    }

    /**
     * 发送微信队列
     *
     * @return
     */
    @bean
    public queue wechatqueue() {
        return new queue("wechat.fanout.queue", true);
    }

    /**
     * 将队列和交换机绑定
     *
     * @return
     */
    @bean
    public binding smsbindingfanout() {
        return bindingbuilder.bind(smsqueue()).to(fanoutexchange());
    }

    @bean
    public binding emailbindingfanout() {
        return bindingbuilder.bind(emailqueue()).to(fanoutexchange());
    }

    @bean
    public binding wechatbindingfanout() {
        return bindingbuilder.bind(wechatqueue()).to(fanoutexchange());
    }
}

2.1.4 新建生产者服务

@service
public class userservice {

    @autowired
    private rabbittemplate rabbittemplate;

    public void register(string username, string password) {
        //模拟用户注册
        string userid = uuid.randomuuid().tostring();

        //saveuser(user);

        //发送用户信息给rabbitmq fanout
        rabbittemplate.convertandsend("fanout_user_exchange", "", userid + username);
    }
}

2.1.5 新建测试类

@springboottest(classes = rabbitmqproducerapplication.class, webenvironment = springboottest.webenvironment.defined_port)
public class rabbitmqproducerapplicationtest {

    @autowired
    private userservice userservice;

    @test
    public void contextloads() {
        userservice.register("张三", "zhangsan");
    }
}

启动测试类,执行成功,查看rabbitmq web页面,成功绑定,且各存在一条消息

在这里插入图片描述

2.2 新建消费者

新建module spring-boot-rabbitmq-consumer

2.2.1 引入依赖

<dependency>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-amqp</artifactid>
</dependency>

2.2.2 配置rabbitmq

server:
  port: 8090
#
#配置rabbitmq服务
spring:
  rabbitmq:
    username: guest
    password: guest
    virtual-host: /
    host: 127.0.0.1
    port: 5672

2.2.3 新建短信消费服务

@rabbitlistener(queues = "sms.fanout.queue")
@service
public class fanoutsmsservice {

    /**
     * 消息接收的方法
     *
     * @param message
     */
    @rabbithandler
    public void receivemessage(string message) {
        //发送短信

        system.out.println("sms.fanout:" + message);
    }
}

2.2.4 新建邮件消费服务

@rabbitlistener(queues = "email.fanout.queue")
@service
public class fanoutemailservice {

    /**
     * 消息接收的方法
     *
     * @param message
     */
    @rabbithandler
    public void receivemessage(string message) {
        //发送邮件

        system.out.println("email.fanout:" + message);
    }
}

2.2.5 新建微信消费服务

@rabbitlistener(queues = "wechat.fanout.queue")
@service
public class fanoutwechatservice {

    /**
     * 消息接收的方法
     *
     * @param message
     */
    @rabbithandler
    public void receivemessage(string message) {
        //推送微信消息

        system.out.println("wechat.fanout:" + message);
    }
}

2.3 测试

新建启动类,启动服务,控制台输出如下,消费者成功接收到消息

在这里插入图片描述

rabitmq中消息为0

在这里插入图片描述

再次执行生产者测试类发送新的消息,消费者成功接收并打印

在这里插入图片描述

3 路由模式direct

有routing-key的匹配模式,direct模式是fanout模式上的一种叠加,增加了路由routingkey的模式。

3.1 消费者

3.1.1 新建短信消费服务

这里,我们不在生产者中使用配置类来绑定交换机和队列了,直接在消费者中使用注解来绑定

使用 @rabbitlistener(bindings = @queuebinding()) 注解来绑定交换机和队列

  • bindings 用来确定队列和交换机的绑定关系
  • value 队列名称,与生产者对应
  • exchange 交换机名称,与生产者对应;type设置rabbitmq模式,默认为direct
  • key 路由key

但有些高级用法,还是推荐使用配置类

@rabbitlistener(bindings = @queuebinding(
        value = @queue(value = "sms.direct.queue", durable = "true", autodelete = "false"),
        exchange = @exchange(value = "direct_user_exchange", type = exchangetypes.direct),
        key = "sms"))
@service
public class directsmsservice {

    /**
     * 消息接收的方法
     *
     * @param message
     */
    @rabbithandler
    public void receivemessage(string message) {
        //发送短信

        system.out.println("sms.direct:" + message);
    }
}

3.1.2 新建邮件消费服务

@rabbitlistener(bindings = @queuebinding(
        value = @queue(value = "email.direct.queue", durable = "true", autodelete = "false"),
        exchange = @exchange(value = "direct_user_exchange"),
        key = "email"))
@service
public class directemailservice {

    /**
     * 消息接收的方法
     *
     * @param message
     */
    @rabbithandler
    public void receivemessage(string message) {
        //发送邮件

        system.out.println("email.direct:" + message);
    }
}

3.1.3 新建微信消费服务

@rabbitlistener(bindings = @queuebinding(
        value = @queue(value = "wechat.direct.queue", durable = "true", autodelete = "false"),
        exchange = @exchange(value = "direct_user_exchange"),
        key = "wechat"))
@service
public class directwechatservice {

    /**
     * 消息接收的方法
     *
     * @param message
     */
    @rabbithandler
    public void receivemessage(string message) {
        //推送微信消息

        system.out.println("wechat.direct:" + message);
    }
}

3.2 生产者

3.2.1 调整服务类

@service
public class userservice {

    @autowired
    private rabbittemplate rabbittemplate;

	//...

    public void registerdirect(string username, string password) {
        //模拟用户注册
        string userid = uuid.randomuuid().tostring();

        //saveuser(user);

        rabbittemplate.convertandsend("direct_user_exchange", "sms", userid + username);
        rabbittemplate.convertandsend("direct_user_exchange", "email", userid + username);
    }
}

3.2.2 调整测试类

@springboottest(classes = rabbitmqproducerapplication.class, webenvironment = springboottest.webenvironment.defined_port)
public class rabbitmqproducerapplicationtest {

    @autowired
    private userservice userservice;

    @test
    public void contextloads() {
        userservice.register("张三", "zhangsan");
    }

    @test
    public void contextloadsdirect() {
        userservice.registerdirect("李四", "lisi");
    }
}

3.3 测试

启动消费者服务,执行生产者测试类contextloadsdirect,消费者成功收到消息并打印

在这里插入图片描述

4 主题模式topic

模糊的routing-key的匹配模式,topic模式是direct模式上的一种叠加,增加了模糊路由routingkey的模式。

4.1 消费者

4.1.1 新建配置类

一般在生产中,先启动消费者服务,再启动生产者服务,因此我们通常把配置类放在消费者服务中,否则可能导致无队列监听而服务启动失败或报错

@configuration
public class rabbitmqtopicconfig {

    /**
     * topic 交换机
     *
     * @return
     */
    @bean
    public topicexchange topicexchange() {
        return new topicexchange("topic_user_exchange", true, false);
    }

    /**
     * 发送短信队列
     *
     * @return
     */
    @bean
    public queue smsqueue() {
        //durable       是否持久化,默认为false
        //exclusive     是否只能被当前创建的连接使用,默认为false
        //autodelete    是否自动删除,默认为false

        //一般设置队列的持久化,其余两个默认false
        return new queue("sms.topic.queue", true);
    }

    /**
     * 发送邮件队列
     *
     * @return
     */
    @bean
    public queue emailqueue() {
        return new queue("email.topic.queue", true);
    }

    /**
     * 发送微信队列
     *
     * @return
     */
    @bean
    public queue wechatqueue() {
        return new queue("wechat.topic.queue", true);
    }

    /**
     * 将队列和交换机绑定, 并设置用于匹配键
     *
     * @return
     */
    @bean
    public binding smsbindingtopic() {
        return bindingbuilder.bind(smsqueue()).to(topicexchange()).with("*.sms.#");
    }

    @bean
    public binding emailbindingtopic() {
        return bindingbuilder.bind(emailqueue()).to(topicexchange()).with("#.email.#");
    }

    @bean
    public binding wechatbindingtopic() {
        return bindingbuilder.bind(wechatqueue()).to(topicexchange()).with("#.wechat.*");
    }
}

4.1.2 新建短信消费服务

@rabbitlistener(queues = "sms.topic.queue")
@service
public class topicsmsservice {

    /**
     * 消息接收的方法
     *
     * @param message
     */
    @rabbithandler
    public void receivemessage(string message) {
        //发送短信

        system.out.println("sms.topic:" + message);
    }
}

4.1.3 新建邮件消费服务

@rabbitlistener(queues = "email.topic.queue")
@service
public class topicemailservice {

    /**
     * 消息接收的方法
     *
     * @param message
     */
    @rabbithandler
    public void receivemessage(string message) {
        //发送邮件

        system.out.println("email.topic:" + message);
    }
}

4.1.4 新建微信消费服务

@rabbitlistener(queues = "wechat.topic.queue")
@service
public class topicwechatservice {

    /**
     * 消息接收的方法
     *
     * @param message
     */
    @rabbithandler
    public void receivemessage(string message) {
        //推送微信消息

        system.out.println("wechat.topic:" + message);
    }
}

4.2 生产者

4.2.1 调整服务类

    public void registertopic(string username, string password) {
        //模拟用户注册
        string userid = uuid.randomuuid().tostring();

        //saveuser(user);

        rabbittemplate.convertandsend("topic_user_exchange", "*.sms.email.*", userid + username);
    }

4.2.2 调整测试类

    @test
    public void contextloadstopic() {
        userservice.registertopic("王五", "wangwu");
    }

4.3 测试

启动消费者服务,执行生产者测试类contextloadstopic,消费者成功收到消息并打印

在这里插入图片描述

5 设置过期时间

5.1 调整消费者配置类

调整rabbitmqtopicconfig

    /**
     * durable         创建持久化队列phone.topic.queue
     * withargument    设置消息过期时间60000毫秒
     *
     * @return
     */
    @bean
    public queue phonequeue() {
        return queuebuilder.durable("phone.topic.queue").withargument("x-message-ttl", 60000).build();
    }

    @bean
    public binding phonebindingtopic() {
        return bindingbuilder.bind(phonequeue()).to(topicexchange()).with("#.phone.#");
    }

5.2 测试

启动服务,rabitmq中队列如下,队列标识为ttl

在这里插入图片描述

6 消息确认机制

以下均为生产者module的调整

6.1 调整生产者yml

server:
  port: 8090
#
#配置rabbitmq服务
spring:
  rabbitmq:
    username: guest
    password: guest
    virtual-host: /
    host: 127.0.0.1
    port: 5672
    #none 禁用发布确认模式,默认值
    #correlated 消息成功发布到交换机后会触发回调方法
    publisher-confirm-type: correlated

6.2 新建回调方法

public class messageconfirmcallback implements rabbittemplate.confirmcallback {

    @override
    public void confirm(correlationdata correlationdata, boolean b, string s) {
        if (b) {
            system.out.println("消息确认成功");
        } else {
            system.out.println("消息确认失败");
        }
    }
}

6.3 调整服务类

    public void registertopiccallback(string username, string password) {
        //模拟用户注册
        string userid = uuid.randomuuid().tostring();

        //saveuser(user);

        //设置消息确认
        rabbittemplate.setconfirmcallback(new messageconfirmcallback());
        rabbittemplate.convertandsend("topic_user_exchange", "*.sms.email.*", userid + username);
    }

6.4 调整测试类

    @test
    public void contextloadstopiccallback() {
        userservice.registertopiccallback("周六", "zhouliu");
    }

执行测试类contextloadstopiccallback,成功进入回调方法

在这里插入图片描述

7 死信队列

dlx(dead-letter-exchange),称为死信交换机或死信邮箱。

当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是dlx ,绑定dlx的队列就是死信队列。

消息变成死信,可能是由于以下的原因:

  • 消息被拒绝
  • 消息过期
  • 队列达到最大长度

dlx也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。

当这个队列中存在死信时,rabbitmq就会自动将这个消息重新发布到设置的dlx上去,进而被路由到另一个队列,即死信队列。

在定义队列的时候设置队列参数 x-dead-letter-exchange 指定交换机即可设置为死信队列。

7.1 调整消费者配置类

调整rabbitmqtopicconfig

    @bean
    public queue dlxqueue() {
        return new queue("dlx.topic.queue", true);
    }

    @bean
    public queue vipqueue() {
        return queuebuilder.durable("vip.topic.queue")
                .withargument("x-message-ttl", 60000)
                .withargument("x-dead-letter-exchange", "dlx.topic.queue")
                .build();
    }

    @bean
    public binding dlxbindingtopic() {
        return bindingbuilder.bind(dlxqueue()).to(topicexchange()).with("#.#");
    }

    @bean
    public binding vipbindingtopic() {
        return bindingbuilder.bind(vipqueue()).to(topicexchange()).with("#.vip.#");
    }

7.2 测试

启动服务,rabitmq中队列如下,队列标识为dlx

在这里插入图片描述

7.3 调整生产者服务类

    public void registertopicdlx(string username, string password) {
        //模拟用户注册
        string userid = uuid.randomuuid().tostring();

        //saveuser(user);

        rabbittemplate.convertandsend("topic_user_exchange", "*.vip.*", userid + username);
    }

7.4 调整生产者测试类

    @test
    public void contextloadstopicdlx() {
        userservice.registertopicdlx("钱七", "qianqi");
    }

7.5 测试

关闭消费者服务,执行生产者测试类contextloadstopicdlx

未过期前

在这里插入图片描述
过期后

在这里插入图片描述

7.6 新建死信消费服务

消费者新建

@rabbitlistener(queues = "dlx.topic.queue")
@service
public class topicdlxservice {

    /**
     * 消息接收的方法
     *
     * @param message
     */
    @rabbithandler
    public void receivemessage(string message) {
        system.out.println("dlx.topic:" + message);
    }
}

7.7 测试

再启动消费者服务,成功接收死信队列中消息

在这里插入图片描述

rabbitmq中消息队列如下

在这里插入图片描述

至此,springboot成功整合rabbitmq且测试通过,集群、分布式事务等用法敬请等待后续。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com