当前位置: 代码网 > it编程>编程语言>Java > SpringBoot集成和使用RabbitMQ方式

SpringBoot集成和使用RabbitMQ方式

2024年12月13日 Java 我要评论
1. 引言rabbitmq 是一个流行的消息代理系统,广泛应用于分布式系统中的异步通信、任务解耦和负载分配。除了这些基本功能,rabbitmq 还支持通过死信队列(dead-letter queue,

1. 引言

rabbitmq 是一个流行的消息代理系统,广泛应用于分布式系统中的异步通信、任务解耦和负载分配。除了这些基本功能,rabbitmq 还支持通过死信队列(dead-letter queue, dlq)实现延时消息的发送。延时消息在某些场景下非常有用,例如订单超时未支付的自动取消、延时通知等。

本文将结合 rabbitmq 的基本使用,深入探讨如何在 spring boot 中集成和使用 rabbitmq,同时讲解如何通过死信队列实现延时消息的机制。

2. 环境配置

在开始编写代码之前,我们需要确保开发环境已经正确配置。

2.1. maven 依赖

首先,在 spring boot 项目中添加 rabbitmq 的依赖:

<dependency>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-amqp</artifactid>
</dependency>

2.2. rabbitmq 安装与配置

rabbitmq 可以通过 docker 或直接在本地安装。这里我们以 docker 为例:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

这将启动一个带有管理插件的 rabbitmq 容器,并暴露出 5672 和 15672 端口,分别用于 amqp 和管理界面。

3. 基本概念与原理

在深入代码之前,了解 rabbitmq 的几个核心概念非常重要:

  • 生产者(producer):发送消息的应用程序。
  • 消费者(consumer):接收消息的应用程序。
  • 队列(queue):消息存储的地方。
  • 交换机(exchange):接收生产者发送的消息,并根据路由规则将消息转发到相应的队列。
  • 绑定(binding):队列与交换机之间的关联,定义了消息如何从交换机路由到队列。
  • 死信队列(dead-letter queue, dlq):用于存储处理失败、被拒绝或超时的消息。

3.1. 交换机类型

  • direct exchange:将消息路由到绑定了特定路由键的队列。
  • fanout exchange:将消息广播到绑定的所有队列。
  • topic exchange:根据路由键的模式匹配,将消息路由到一个或多个队列。
  • headers exchange:基于消息头的内容进行路由。

4. spring boot 中的基本使用

4.1. 配置类

创建一个配置类,用于设置队列、交换机和绑定关系:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;

@configuration
public class rabbitmqconfig {

    public static final string queue_name = "demoqueue";
    public static final string exchange_name = "demoexchange";
    public static final string routing_key = "demoroutingkey";

    @bean
    public queue demoqueue() {
        return new queue(queue_name, true);
    }

    @bean
    public directexchange demoexchange() {
        return new directexchange(exchange_name);
    }

    @bean
    public binding demobinding(queue demoqueue, directexchange demoexchange) {
        return bindingbuilder.bind(demoqueue).to(demoexchange).with(routing_key);
    }
}

4.2. 生产者

创建一个消息生产者,用于发送消息到指定的交换机和路由键:

import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.stereotype.service;

@service
public class rabbitmqproducer {

    private final rabbittemplate rabbittemplate;

    public rabbitmqproducer(rabbittemplate rabbittemplate) {
        this.rabbittemplate = rabbittemplate;
    }

    public void sendmessage(string message) {
        rabbittemplate.convertandsend(rabbitmqconfig.exchange_name, rabbitmqconfig.routing_key, message);
        system.out.println("sent message: " + message);
    }
}

4.3. 消费者

创建一个消息消费者,监听队列并处理消息:

import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.service;

@service
public class rabbitmqconsumer {

    @rabbitlistener(queues = rabbitmqconfig.queue_name)
    public void receivemessage(string message) {
        system.out.println("received message: " + message);
    }
}

5. 死信队列与延时消息

5.1. 死信队列配置

为了实现延时消息,我们可以利用 rabbitmq 的死信队列机制。

当消息在原队列中存留超过指定时间时,会自动转发到死信队列,我们可以通过消费死信队列的消息来实现延时效果。

import org.springframework.amqp.core.queue;

@bean
public queue demoqueue() {
    return queuebuilder.durable(queue_name)
            .withargument("x-dead-letter-exchange", "deadletterexchange")
            .withargument("x-dead-letter-routing-key", "deadletterroutingkey")
            .withargument("x-message-ttl", 60000) // 设置消息在原队列的存活时间(60秒)
            .build();
}

@bean
public queue deadletterqueue() {
    return new queue("deadletterqueue", true);
}

@bean
public directexchange deadletterexchange() {
    return new directexchange("deadletterexchange");
}

@bean
public binding deadletterbinding() {
    return bindingbuilder.bind(deadletterqueue()).to(deadletterexchange()).with("deadletterroutingkey");
}

在上述配置中,x-message-ttl 参数指定了消息在原队列中的存活时间,当超时后,消息将被转发到指定的死信队列。

5.2. 延时消息的处理

消费者监听死信队列,实现延时消息的处理逻辑:

import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.service;

@service
public class delayedmessageconsumer {

    @rabbitlistener(queues = "deadletterqueue")
    public void receivedelayedmessage(string message) {
        system.out.println("received delayed message: " + message);
        // 处理延时消息的逻辑
    }
}

6. 消息确认机制

为了保证消息的可靠性,rabbitmq 提供了生产者和消费者的消息确认机制。

生产者确认用于确保消息成功发送到交换机或队列,消费者确认用于确保消息被成功处理。

6.1. 生产者确认

在生产者端,我们可以配置 rabbittemplate 来监听消息是否成功发送:

import org.springframework.amqp.rabbit.connection.correlationdata;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.stereotype.service;

import javax.annotation.postconstruct;

@service
public class rabbitmqproducerwithconfirm {

    private final rabbittemplate rabbittemplate;

    public rabbitmqproducerwithconfirm(rabbittemplate rabbittemplate) {
        this.rabbittemplate = rabbittemplate;
    }

    @postconstruct
    public void init() {
        rabbittemplate.setconfirmcallback(new rabbittemplate.confirmcallback() {
            @override
            public void confirm(correlationdata correlationdata, boolean ack, string cause) {
                if (ack) {
                    system.out.println("message sent successfully");
                } else {
                    system.out.println("message failed to send: " + cause);
                }
            }
        });
    }

    public void sendmessage(string message) {
        rabbittemplate.convertandsend(rabbitmqconfig.exchange_name, rabbitmqconfig.routing_key, message);
    }
}

6.2. 消费者确认

在消费者端,默认情况下 spring amqp 自动确认消息。

如果需要手动确认,可以在 @rabbitlistener 注解中设置 ackmode

import org.springframework.amqp.core.acknowledgemode;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.amqp.rabbit.listener.api.channelawaremessagelistener;
import org.springframework.stereotype.service;
import com.rabbitmq.client.channel;

@service
public class rabbitmqconsumerwithack implements channelawaremessagelistener {

    @override
    @rabbitlistener(queues = rabbitmqconfig.queue_name, ackmode = "manual")
    public void onmessage(org.springframework.amqp.core.message message, channel channel) throws exception {
        try {
            string body = new string(message.getbody());
            system.out.println("received message: " + body);

            // 处理消息...
            channel.basicack(message.getmessageproperties().getdeliverytag(), false);
        } catch (exception e) {
            channel.basicnack(message.getmessageproperties().getdeliverytag(), false, true);
        }
    }
}

7. 集群与高可用性

7.1 rabbitmq 集群模式概述

rabbitmq 支持集群模式,可以提升消息代理的可靠性和可用性。在集群模式下,多个 rabbitmq 节点共同组成一个集群,每个节点都能够接收和发送消息,从而分担系统负载。通过 docker compose 或 kubernetes,可以快速部署一个高可用的 rabbitmq 集群。

集群中的节点分为两种角色:ram 节点和 disk 节点。ram 节点将数据存储在内存中,适合对性能要求较高但对数据持久化要求较低的场景;disk 节点则会将数据持久化到磁盘,保证数据在节点重启或宕机后的恢复能力。根据不同的应用需求,可以混合使用这两种节点类型来优化性能和持久化策略。

7.2 docker compose 部署集群

使用 docker 可以非常方便地部署一个 rabbitmq 集群。

以下示例展示了如何使用 docker compose 创建一个包含三个节点的 rabbitmq 集群:

version: '3'
services:
  rabbitmq-node1:
    image: rabbitmq:management
    container_name: rabbitmq-node1
    ports:
      - "5673:5672"
      - "15673:15672"
    environment:
      rabbitmq_erlang_cookie: "mycookie"
      rabbitmq_nodename: "rabbit@rabbitmq-node1"
  rabbitmq-node2:
    image: rabbitmq:management
    container_name: rabbitmq-node2
    ports:
      - "5674:5672"
      - "15674:15672"
    environment:
      rabbitmq_erlang_cookie: "mycookie"
      rabbitmq_nodename: "rabbit@rabbitmq-node2"
    depends_on:
      - rabbitmq-node1
  rabbitmq-node3:
    image: rabbitmq:management
    container_name: rabbitmq-node3
    ports:
      - "5675:5672"
      - "15675:15672"
    environment:
      rabbitmq_erlang_cookie: "mycookie"
      rabbitmq_nodename: "rabbit@rabbitmq-node3"
    depends_on:
      - rabbitmq-node1
      - rabbitmq-node2

使用上述配置,可以通过以下命令启动集群:

docker-compose up -d

集群启动后,可以使用以下命令将节点 2 和节点 3 加入到节点 1 的集群中:

docker exec -it rabbitmq-node2 bash
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbitmq-node1
rabbitmqctl start_app
exit

docker exec -it rabbitmq-node3 bash
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbitmq-node1
rabbitmqctl start_app
exit

至此,一个基本的 rabbitmq 集群已经部署完成。

7.3 kubernetes 部署集群

在 kubernetes 环境中,可以通过 helm chart 快速部署 rabbitmq 集群。helm 是一个 kubernetes 包管理工具,支持简单、高效地管理 kubernetes 应用。

helm repo add bitnami https://charts.bitnami.com/bitnami
helm install my-rabbitmq bitnami/rabbitmq

安装完成后,rabbitmq 集群将自动运行在 kubernetes 集群中,并提供高可用性。可以通过修改 helm chart 配置文件调整集群的节点数量、资源分配等参数,以适应不同的业务需求。

8. 监控与管理

8.1 rabbitmq management plugin

rabbitmq 提供了丰富的管理工具,通过内置的 management plugin,可以方便地监控和管理集群。

management plugin 启用后,可以通过 web 界面访问 rabbitmq 的管理控制台。

启用 management plugin:

rabbitmq-plugins enable rabbitmq_management

在集群节点上启用后,可以通过 http://{hostname}:15672 访问管理界面。默认的用户名和密码均为 guest,建议在生产环境中修改默认密码或禁用该账户。

8.2 监控队列与交换机

通过 rabbitmq management plugin,可以实时查看队列和交换机的状态,包括:

  • 队列的消息堆积数量、消费者情况等。
  • 交换机的消息路由情况、绑定信息等。

这些数据可以帮助运维人员及时了解系统的运行状态,发现并解决潜在的性能问题。

8.3 prometheus 和 grafana 集成

为了进一步增强监控能力,可以将 rabbitmq 的监控数据接入 prometheus 和 grafana。这些工具提供了更加灵活和可视化的监控方案,适用于复杂的生产环境。

1. 启用 prometheus exporter

rabbitmq 提供了 prometheus exporter 插件,用于将 rabbitmq 的监控数据暴露给 prometheus:

rabbitmq-plugins enable rabbitmq_prometheus

启用后,prometheus 可以通过 http 访问 rabbitmq 的监控数据。

2. 配置 grafana 仪表盘

在 prometheus 收集到 rabbitmq 的监控数据后,可以在 grafana 中创建相应的仪表盘,展示 rabbitmq 的性能指标。例如,队列长度、消息处理速率、节点健康状况等。grafana 提供了直观的可视化界面,帮助运维人员实时监控和分析系统的运行状态。

8.4 cli 管理

除了 web ui,rabbitmq 还支持通过 cli 进行管理。常用的 cli 命令包括:

  • rabbitmqctl status:查看节点的状态。
  • rabbitmqctl list_queues:列出所有队列及其消息数量。
  • rabbitmqctl list_connections:查看所有连接及其状态。

cli 工具对于自动化运维和批量操作非常有用,可以通过脚本实现对 rabbitmq 集群的批量管理。

8.5 日志与告警管理

1. 日志配置

rabbitmq 支持多种日志级别(debug、info、warning、error),可以根据需要调整日志输出的详细程度。

通过合理的日志配置,可以帮助运维人员快速定位和解决问题。

rabbitmqctl set_log_level info

2. 告警配置

rabbitmq 支持基于阈值的告警机制,可以在队列长度、磁盘使用率或内存使用率达到一定水平时触发告警。

通过与邮件或短信系统集成,可以在异常情况发生时及时通知相关人员,确保问题能够在第一时间得到处理。

9. 总结

本文详细介绍了如何在 spring boot 项目中集成 rabbitmq,并结合死信队列实现延时消息。通过这些配置和机制,开发者可以在分布式系统中构建更为灵活和可靠的消息传递系统。

扩展阅读:

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

  • 基于Maven pom文件使用分析

    基于Maven pom文件使用分析

    projectmaven 是一个强大的构建和依赖管理工具,pom.xml 文件是 maven 项目的核心配置文件,用于定义项目的构建、依赖关系、插件、目标等。它... [阅读全文]
  • Java8 CompletableFuture异步编程解读

    Java8 CompletableFuture异步编程解读

    completablefuturede介绍java 8 引入了 completablefuture 类,这是 java 异步编程的一个重要进展。completa... [阅读全文]
  • java集合超详细(最新推荐)

    1 - 概述所有的集合类和集合接口都在java.util包下。在内存中申请一块空间用来存储数据,在java中集合就是替换掉定长的数组的一种引用数据类型。2 - 集合与数组的区别长度…

    2024年12月14日 编程语言
  • Mybatis中SQL的执行过程详解

    Mybatis中SQL的执行过程详解

    mybatis 框架sql执行过程数据库操作映射方式mybatis支持两种方式进行数据库操作映射:映射文件:通过xml文件来定义sql语句和映射关系注解方式:通... [阅读全文]
  • IDEA性能优化方法解决卡顿问题

    IDEA性能优化方法解决卡顿问题

    前言我们在使用 intellij idea的时候有时候会觉得卡顿,那么如何在不升级电脑配置的情况下让 intellij idea的性能更好呢?我们可以通过去修改... [阅读全文]
  • java 抽象类示例详解

    一、抽象类概述:我们知道,类用来模拟现实事物。一个类可以模拟一类事物,而某个类的一个实例化对象可以模拟某个属于该类的具体的事物。类中描绘了该类所有对象共同的特性,当一个类中给出的信…

    2024年12月14日 编程语言

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com