当前位置: 代码网 > it编程>开发工具>Docker > rabbitmq

rabbitmq

2024年08月02日 Docker 我要评论
用途● 流量削峰最大处理量如果是一秒一万条订单,一秒钟来了两万条,可以先存在消息队列里面,按照能力去消费处理● 应用解耦下单后,需要去调用很多其他系统,使用我们的发布订阅,让需要接受这条消息的服务监听这个queue● 异步处理在我们一些需要异步调用的场景中,回调核心概念生产者交换机(需要重点理解)接受生产者的消息,并按照规则推到队列里面,这些规则的配置可以实现不同场景的需求队列消费者安装docker。

用途

  • 流量削峰

最大处理量如果是一秒一万条订单,一秒钟来了两万条,可以先存在消息队列里面,按照能力去消费处理

  • 应用解耦

下单后,需要去调用很多其他系统,使用我们的发布订阅,让需要接受这条消息的服务监听这个queue

  • 异步处理

在我们一些需要异步调用的场景中,回调

核心概念

生产者
交换机(需要重点理解)接受生产者的消息,并按照规则推到队列里面,这些规则的配置可以实现不同场景的需求
队列
消费者

安装

docker

docker run -d -p 15672:15672  -p  5672:5672  -e rabbitmq_default_user=admin -e rabbitmq_default_pass=admin --name rabbitmq --hostname=rabbitmqhostone  rabbitmq:management

3.8.8 https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.8.8
22.3 https://www.erlang-solutions.com/downloads/

# 安装erlang
rpm -ivh esl-erlang_22.3.1-1_centos_7_amd64.rpm

warning: esl-erlang_22.3.1-1_centos_7_amd64.rpm: header v4 rsa/sha1 signature, key id a14f4fca: nokey
error: failed dependencies:
执行以下命令:

yum install epel-release
yum install unixodbc unixodbc-devel wxbase wxgtk sdl wxgtk-gl

yum install socat -y

#安装rabbitmq
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm

#添加开机启动 rabbitmq 服务
chkconfig rabbitmq-server on

#启动服务
/sbin/service rabbitmq-server start

#查看服务状态
/sbin/service rabbitmq-server status

#停止服务(选择执行)
/sbin/service rabbitmq-server stop

#开启 web 管理插件,rabbitmq 默认不开启
rabbitmq-plugins enable rabbitmq_management

# 现在登录如果使用ip是无法登录的
# 添加配置文件,去掉 ip 限制
cd /etc/rabbitmq

vim rabbitmq-env.conf
#  specifies new style config file location
config_file=/etc/rabbitmq/rabbitmq.conf

vim rabbitmq.conf

loopback_users = none

/sbin/service rabbitmq-server restart

#创建账号
rabbitmqctl add_user admin 123

#设置用户角色
rabbitmqctl set_user_tags admin administrator

#设置用户权限
# set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
#户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限

#当前用户和角色
rabbitmqctl list_users  

# 关闭防火墙
# 查看防火墙状态: 
systemctl status firewalld.service

# 关闭防火墙
systemctl stop firewalld.service

# 开机禁用防火墙
systemctl disable firewalld.service

hello world

还是国际惯例,咱们来一个 hello world,实现的功能也很简单,创建一个生产者,发送一条 hello world 的消息,再创建一个 消费者,消费这条消息,并在控制台打印

我们创建一个 maven 的简单项目,后面再去整合 springboot, 只需要引入两个依赖

 <dependency>
            <groupid>com.rabbitmq</groupid>
            <artifactid>amqp-client</artifactid>
            <version>5.8.0</version>
        </dependency>

        <dependency>
            <groupid>commons-io</groupid>
            <artifactid>commons-io</artifactid>
            <version>2.6</version>
        </dependency>

创建一个生产者

/**
 * 生产者:发消息
 */
public class producer {
    // 队列名称
    public static final string queue_name = "hello";

    // 发消息
    public static void main(string[] args) throws ioexception, timeoutexception {
        // 创建一个连接工厂
        connectionfactory factory = new connectionfactory();
        // 工厂 ip 连接 rabbitmq 的队列
        factory.sethost("172.16.0.28");
        // 用户名
        factory.setusername("admin");
        // 密码
        factory.setpassword("123");
        // 创建连接
        connection connection = factory.newconnection();
        // 获取信道
        channel channel =  connection.createchannel();
        /**
         * 生成一个队列
         * 1.队列名称
         * 2.队列里面的消息是否持久化(默认 false,内存)
         * 3.该队列是否值供一个消费者进行消费,是否进行消费共享, true 可以多个消费者消费
         * 4.是否自动删除  最后一个消费者断开连接后 该队列是否自动删除 false 不自动删除
         * 5.其他参数
         */
        channel.queuedeclare(queue_name, false, false, false, null);

        // 发消息
        string message = "hello world";

        /**
         * 发送一个消费
         * 1.发送到哪个交换机
         * 2.路由的 key 值是哪个,本次是队列的名称
         * 3.其他参数信息
         * 4.发送消息的消息体
         */
        channel.basicpublish("", queue_name, null, message.getbytes());

        system.out.println("消息发送完毕");
    }

}

消费者

/**
 * 消费者
 */
public class consumer {

    // 队列名称
    public static final string queue_name = "hello";

    // 接收消息
    public static void main(string[] args) throws ioexception, timeoutexception {
        // 创建一个连接工厂
        connectionfactory factory = new connectionfactory();
        // 工厂 ip 连接 rabbitmq 的队列
        factory.sethost("172.16.0.28");
        // 用户名
        factory.setusername("admin");
        // 密码
        factory.setpassword("123");
        // 创建连接
        connection connection = factory.newconnection();
        // 获取信道
        channel channel =  connection.createchannel();

        // 声明 接收消息
        delivercallback delivercallback = (consumertag, message) -> {
            system.out.println(new string(message.getbody()));
        };

        // 声明 取消消息的回调
        cancelcallback cancelcallback = (consumertag) -> {
            system.out.println("消息 消费被中断");
        };

        /**
         * 消费者消费消息:
         *  1。 消费哪个队列
         *  2. 消费成功后是否要自动应答,true 代表自动应答, false 代表手动应答
         *  3。消费者未成功消费的回调
         *  4。消费者取消消费的回调
         */
        channel.basicconsume(queue_name, true, delivercallback, cancelcallback);
    }
}

我们启动下


我们来简单梳理下,在生产者中我们主要做的是,定义一个 队列,并往这个队列中发送消息,消费者中则是指定监听对应的 queue

消息应答

消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了

  • 自动应答
  • 手动应答
    • channel.basicack(用于肯定确认)  rabbitmq 已知道该消息并且成功的处理消息,可以将其丢弃了
      • 是否批量应答

批量应答的理解,不建议使用,可能会应答没有处理完的消息

自动入队

发生点在工作线程
没有 ack 的消息会被重新放回队列被别的消费者消费

文字说明,我们启动两个消费者,消费者c1,c2分别接收消息 m1, m2, 在c1 ack之前把c1关掉,这时m1会被c2重新消费

/**
 * 消息在手动应答时是不丢失,放回队列中重新消费
 */
public class task02 {

    // 队列名称
    public static final  string task_queue_name = "ack_queue";

    public static void main(string[] args) throws  exception{
        channel channel = rabbitmqutils.getchannel();

        boolean durable = true;

        // 声明队列
        channel.queuedeclare(task_queue_name, durable, false, false, null);

        scanner scanner = new scanner(system.in);
        while (scanner.hasnext()) {
            string message = scanner.next();
            channel.basicpublish("", task_queue_name, null, message.getbytes("utf-8")); // 解决中文编码
            system.out.println("生产者发送消息: " + message);
        }
    }
}

public class work3 {
    // 队列名称
    public static final  string task_queue_name = "ack_queue";

    public static void main(string[] args) throws ioexception, timeoutexception {
        channel channel = rabbitmqutils.getchannel();
        system.out.println("c1 等待接收消息处理时间较短");


        delivercallback delivercallback = (consumertag, message) -> {
            // 沉睡 1 s
            sleeputils.sleep(1);
            system.out.println("接收到消息: " + new string(message.getbody(), "utf-8"));
            // 手动应答
            /**
             * 1.消息的标记 tag
             * 2.是否批量应答 false 不批量应答信道中的消息 true: 批量
             */
            channel.basicack(message.getenvelope().getdeliverytag(), false);
        };

        // 采用手动应答
        boolean autoack = false;
        channel.basicconsume(task_queue_name, autoack, delivercallback, (consumertag)->{
            system.out.println(consumertag + "消费者取消消费接口回调逻辑");
        });

    }
}

public class work4 {
    // 队列名称
    public static final string task_queue_name = "ack_queue";

    public static void main(string[] args) throws ioexception, timeoutexception {
        channel channel = rabbitmqutils.getchannel();
        system.out.println("c2 等待接收消息处理时间较长");


        delivercallback delivercallback = (consumertag, message) -> {
            // 沉睡 30 s
            sleeputils.sleep(30);
            system.out.println("接收到消息: " + new string(message.getbody(), "utf-8"));
            // 手动应答
            /**
             * 1.消息的标记 tag
             * 2.是否批量应答 false 不批量应答信道中的消息 true: 批量
             */
            channel.basicack(message.getenvelope().getdeliverytag(), false);
        };

        // 采用手动应答
        boolean autoack = false;
        channel.basicconsume(task_queue_name, autoack, delivercallback, (consumertag) -> {
            system.out.println(consumertag + "消费者取消消费接口回调逻辑");
        });

    }
}

我们先启动 task02 ,发送两条消息,后面分别启动 work4, work3,work3 和 work4根据轮训机制,会分别取到一条消息,然后再 ack 之前,我们把 work4 关掉,会发现两条消息都被 work3 消费了

持久化

这里的处理发生在 ,生产者发送消息的时候
需要分别设置队列和消息的持久化


这里存在一种情况,消息在落盘之前 宕机了,消息也会丢失,后面会讲到处理方式(需要发布确认)

发布确认

这一小节来处理上一小节提出的问题,确保消息能被发布
发布确认总共有三种策略,下面我们我们分别说明,代码演示下,重点计算下每种策略所花的时间
首先我们需要开启发布确认

main 函数,下面我们分别写三个方法,分别实现 每种发布确认策略

// 批量发消息的个数
    public static final int message_count = 1000;

    public static void main(string[] args) throws exception {
//        1. 单个确认
        publishmessageindividually(); // 发布 1000个单独确认消息耗时 398ms
//        2. 批量确认
//        publishmessagebatch(); // 发布 1000个批量确认消息耗时 69ms
//        3. 异步批量确认
//        publishmessageasync(); // 发布 1000个异步确认消息耗时 33ms
    }
  • 单个发布确认

串行,一条消息发布确认后才可以开始下一条消息
没发送一个消息调用一次 channel.waitforconfirms();

// 单个确认
public static void publishmessageindividually() throws exception {
    channel channel = rabbitmqutils.getchannel();
    //  队列的声明

    string queuename = uuid.randomuuid().tostring();
    channel.queuedeclare(queuename, true, false, false, null);

    // 开启发布确认
    channel.confirmselect();

    // 开始时间
    long begin = system.currenttimemillis();

    // 批量发消息
    for (int i = 0; i < message_count; i++) {
        string message = i + "";
        channel.basicpublish("", queuename, null, message.getbytes());
        // 单个消息马上进行发布确认
        boolean flag = channel.waitforconfirms();
        if (flag) {
            system.out.println("消息发送成功");
        }
    }

    long end = system.currenttimemillis();

    system.out.println("发布 " + message_count + "个单独确认消息耗时 " + (end - begin) + "ms");
}
  • 批量发布确认

计算发送的消息,达到一定量之后调用一次 channel.waitforconfirms();
本质上还是同步,而且会存在某些消息没有被发布的问题,这个实现其实个人感觉有点鸡肋

// 批量发送确认
    public static void publishmessagebatch() throws exception {
        channel channel = rabbitmqutils.getchannel();

        //  队列的声明
        string queuename = uuid.randomuuid().tostring();
        channel.queuedeclare(queuename, true, false, false, null);

        // 开启发布确认
        channel.confirmselect();

        // 开始时间
        long begin = system.currenttimemillis();

        // 批量确认大小
        int batchsize = 100;

        // 未确认消息个数

        // 批量发消息 批量发布确认
        for (int i = 0; i < message_count; i++) {
            string message = i + "";
            channel.basicpublish("", queuename, null, message.getbytes());

            // 判断达到100条消息的时候,批量确认一次
            if (i % batchsize == 0) {
                // 发布确认
                channel.waitforconfirms();
            }
        }

        long end = system.currenttimemillis();

        system.out.println("发布 " + message_count + "个批量确认消息耗时 " + (end - begin) + "ms");
    }
  • 异步发布确认


这里是通过回调函数来异步确认

// 异步发布确认
    public static void publishmessageasync() throws exception {
        channel channel = rabbitmqutils.getchannel();

        //  队列的声明
        string queuename = uuid.randomuuid().tostring();
        channel.queuedeclare(queuename, true, false, false, null);

        // 开启发布确认
        channel.confirmselect();

        /**
         * 线程安全有序的一个哈希表 适用于高并发的情况下
         * 1. 轻松的将序号和消息进行关联
         * 2. 轻松批量删除条目 只要给到序号
         * 3. 支持高并发(多线程)
         */
        concurrentskiplistmap<long, string> outstandingconfirms =
                new concurrentskiplistmap<>();

        // 开始时间
        long begin = system.currenttimemillis();

        // 准备消息的监听器 监听哪些消息成功了 哪些消息失败了

        // 消息确认成功 回调函数
        confirmcallback ackcallback = (deliverytag, multiple) -> {
            // 2. 删除已经确认的消息,剩下的就是未确认的消息
            if (multiple) {
                concurrentnavigablemap<long, string> confirmed = outstandingconfirms.headmap(deliverytag);
                confirmed.clear();
            } else {
                outstandingconfirms.remove(deliverytag);
            }
            system.out.println("确认的消息: " + deliverytag);
        };

        // 消息确认失败 回调函数
        confirmcallback nackcallback = (deliverytag, multiple) -> {
            // 3. 打印一下未确认的消息都有哪些
            string message = outstandingconfirms.get(deliverytag);
            system.out.println("未确认的消息是 " + message + "未确认的消息: " + deliverytag);
        };

        /**
         * 1. 监听哪些消息成功了
         * 2. 监听哪些消息失败了
         */
        channel.addconfirmlistener(ackcallback, nackcallback); // 异步通知


        // 批量发送消息
        for (int i = 0; i < message_count; i++) {
            string message = i + "";
            channel.basicpublish("", queuename, null, message.getbytes());

            // 1. 此处记录下所有要发送的消息 消息的总和
            outstandingconfirms.put(channel.getnextpublishseqno(), message);

        }

        // 结束时间
        long end = system.currenttimemillis();

        system.out.println("发布 " + message_count + "个异步确认消息耗时 " + (end - begin) + "ms");
    }

有两个点需要说明

  1. channel.addconfirmlistener(ackcallback, nackcallback); // 异步通知 这里添加回调函数
  2. concurrentskiplistmap 创建一个 并发集合,记录消息状态

todo 这里可以补充下哈,但还是感谢尚硅谷老师

交换机

这一小节会介绍几种常见交换机绑定队列的方式和几种常见交换机

前面我们没有手动去指定交换机

默认会给我们提供一个无名交换机

类似的,如果我们没有给队列命名,我们采用的也就是临时队列

绑定关系则是指的,路由与队列之间的映射关系

下面我们来介绍不同类型的交换机

fanout

广播,会把接收到的消息 广播到它知道的所有队列中

/**
 * 发消息
 */
public class emitlog {

    // 交换机名称
    private static final string exchange_name = "logs";

    public static void main(string[] args) throws exception {

        channel channel = rabbitmqutils.getchannel();

        channel.exchangedeclare(exchange_name, "fanout");

        scanner scanner = new scanner(system.in);

        while (scanner.hasnext()) {
            string message = scanner.next();
            channel.basicpublish(exchange_name, "", null, message.getbytes("utf-8"));
            system.out.println("生产者发出消息: " + message);
        }
    }
}

/**
 * 消息接收
 */
public class receivelogs01 {

    // 交换机名称
    private static final string exchange_name = "logs";

    public static void main(string[] args) throws exception {
        channel channel = rabbitmqutils.getchannel();

        // 声明一个交换机
        channel.exchangedeclare(exchange_name, "fanout");

        // 声明一个队列 临时队列
        // 队列的名称是随机的
        // 当消费者断开与队列的连接的时候 队列就自动删除

        string queuename = channel.queuedeclare().getqueue();

        /**
         * 绑定交换机与队列
         */
        channel.queuebind(queuename, exchange_name, "");
        system.out.println("等待接收消息,把接收到消息打印在屏幕上......");

        // 接收消息

        // 消费者取消消息时回调接口
        delivercallback delivercallback = (consumertag, message) -> {
            system.out.println("receivelogs01接收到的消息:" + new string(message.getbody(), "utf-8"));
        };

        channel.basicconsume(queuename, true, delivercallback, consumertag -> {
        });
    }

}

/**
 * 消息接收
 */
public class receivelogs02 {

    // 交换机名称
    private static final string exchange_name = "logs";

    public static void main(string[] args) throws exception {
        channel channel = rabbitmqutils.getchannel();

        // 声明一个交换机
        channel.exchangedeclare(exchange_name, "fanout");

        // 声明一个队列 临时队列
        // 队列的名称是随机的
        // 当消费者断开与队列的连接的时候 队列就自动删除

        string queuename = channel.queuedeclare().getqueue();

        /**
         * 绑定交换机与队列
         */
        channel.queuebind(queuename, exchange_name, "");
        system.out.println("等待接收消息,把接收到消息打印在屏幕上......");

        // 接收消息

        // 消费者取消消息时回调接口
        delivercallback delivercallback = (consumertag, message) -> {
            system.out.println("receivelogs02接收到的消息:" + new string(message.getbody(), "utf-8"));
        };

        channel.basicconsume(queuename, true, delivercallback, consumertag -> {
        });
    }
}

我们可以看到,我们发送的m1和 m2,会被 两个队列全部接收

direct

发送的时候必须指定路由规则,exchange需要根据routingkey把消息发送给每一个匹配的queue
如果多个队列具有相同的 routingkey,和 fanout 的情况就会类似

public class directlog {
    // 交换机名称
    private static final string exchange_name = "direct_logs";

    public static void main(string[] args) throws exception {

        channel channel = rabbitmqutils.getchannel();

        scanner scanner = new scanner(system.in);

        while (scanner.hasnext()) {
            string message = scanner.next();
            channel.basicpublish(exchange_name, "error", null, message.getbytes("utf-8"));
            system.out.println("生产者发出消息: " + message);
        }
    }
}

public class receivelogsdirect01 {
    public static final string exchange_name = "direct_logs";

    public static void main(string[] args) throws exception {
        channel channel = rabbitmqutils.getchannel();
        // 声明一个交换机
        channel.exchangedeclare(exchange_name, builtinexchangetype.direct);
        // 声明一个队列
        channel.queuedeclare("console", false, false, false, null);

        channel.queuebind("console", exchange_name, "info");
        channel.queuebind("console", exchange_name, "warning");
        
        delivercallback delivercallback = (consumertag, message) -> {
            system.out.println("receivelogsdirect01接收到的消息:" + new string(message.getbody(), "utf-8"));
        };

        channel.basicconsume("console", true, delivercallback, consumertag -> {
        });
    }
}

public class receivelogsdirect02 {

    public static final string exchange_name = "direct_logs";

    public static void main(string[] args) throws exception {
        channel channel = rabbitmqutils.getchannel();
        // 声明一个交换机
        channel.exchangedeclare(exchange_name, builtinexchangetype.direct);
        // 声明一个队列
        channel.queuedeclare("disk", false, false, false, null);

        channel.queuebind("disk", exchange_name, "error");


        delivercallback delivercallback = (consumertag, message) -> {
            system.out.println("receivelogsdirect02接收到的消息:" + new string(message.getbody(), "utf-8"));
        };

        channel.basicconsume("disk", true, delivercallback, consumertag -> {
        });
    }
}

这里在测试的时候,我们需要向不同的 routingkey 发消息,对应的消息就会根据  routingkey 进入到不同的队列

topic

可以理解为 是在 direct 的基础上加上了模糊匹配的规则,模糊匹配规则有如下两条

  • *可以代替一个单词
  • #可以替代零个或多个单词

public class emitlogtopic {
    // 交换机名称
    private static final string exchange_name = "topic_logs";

    public static void main(string[] args) throws exception {

        channel channel = rabbitmqutils.getchannel();

        scanner scanner = new scanner(system.in);

        /**
         * q1-->绑定的是
         * 中间带 orange 带 3 个单词的字符串(*.orange.*)
         * q2-->绑定的是
         * 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)
         * 第一个单词是 lazy 的多个单词(lazy.#)
         *
         */
        map<string, string> bindingkeymap = new hashmap<>();
        bindingkeymap.put("quick.orange.rabbit", "被队列 q1q2 接收到");
        bindingkeymap.put("lazy.orange.elephant", "被队列 q1q2 接收到");
        bindingkeymap.put("quick.orange.fox", "被队列 q1 接收到");
        bindingkeymap.put("lazy.brown.fox", "被队列 q2 接收到");

        bindingkeymap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 q2 接收一次");
        bindingkeymap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
        bindingkeymap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
        bindingkeymap.put("lazy.orange.male.rabbit", "是四个单词但匹配 q2");
        for (map.entry<string, string> bindingkeyentry :
                bindingkeymap.entryset()) {
            string bindingkey =
                    bindingkeyentry.getkey();
            string message = bindingkeyentry.getvalue();
            channel.basicpublish(exchange_name, bindingkey, null,
                    message.getbytes("utf-8"));
            system.out.println("生产者发出消息" + message);
        }

    }
}

/**
 * 声明主题交换机 及相关队列
 *
 * 消费者 c1
 */
public class receivelogstopic01 {

    // 交换机名称
    private static final string exchange_name = "topic_logs";

    public static void main(string[] args) throws exception {
        channel channel = rabbitmqutils.getchannel();
        // 声明一个交换机
        channel.exchangedeclare(exchange_name, builtinexchangetype.topic);
        // 声明一个队列
        string queuename = "q1";
        channel.queuedeclare(queuename, false, false, false, null);

        channel.queuebind(queuename, exchange_name, "*.orange.*");

        delivercallback delivercallback = (consumertag, message) -> {
            system.out.println(new string(message.getbody(), "utf-8"));
            system.out.println("接收队列: " + queuename + " 绑定键: " + message.getenvelope().getroutingkey());
        };

        channel.basicconsume(queuename, true, delivercallback, consumertag -> {
        });
    }
}

/**
 * 声明主题交换机 及相关队列
 * <p>
 * 消费者 c1
 */
public class receivelogstopic02 {

    // 交换机名称
    private static final string exchange_name = "topic_logs";

    public static void main(string[] args) throws exception {
        channel channel = rabbitmqutils.getchannel();
        // 声明一个交换机
        channel.exchangedeclare(exchange_name, builtinexchangetype.topic);
        // 声明一个队列
        string queuename = "q2";
        channel.queuedeclare(queuename, false, false, false, null);

        channel.queuebind(queuename, exchange_name, "*.orange.rabbit");
        channel.queuebind(queuename, exchange_name, "lazy.#");


        delivercallback delivercallback = (consumertag, message) -> {
            system.out.println(new string(message.getbody(), "utf-8"));
            system.out.println("接收队列: " + queuename + " 绑定键: " + message.getenvelope().getroutingkey());
        };

        channel.basicconsume(queuename, true, delivercallback, consumertag -> {
        });
    }
}

交换机和队列的声明方式

基于注解和编程

@rabbitlistener(bindings = @queuebinding(
    value = @queue(name = "direct.queue1"),
    exchange = @exchange(name = "hmall.direct", type = exchangetypes.direct),
    key = {"red", "blue"}
))
public void listendirectqueue1(string msg){
    system.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@rabbitlistener(bindings = @queuebinding(
    value = @queue(name = "direct.queue2"),
    exchange = @exchange(name = "hmall.direct", type = exchangetypes.direct),
    key = {"red", "yellow"}
))
public void listendirectqueue2(string msg){
    system.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

@rabbitlistener(bindings = @queuebinding(
    value = @queue(name = "topic.queue1"),
    exchange = @exchange(name = "hmall.topic", type = exchangetypes.topic),
    key = "china.#"
))
public void listentopicqueue1(string msg){
    system.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

@rabbitlistener(bindings = @queuebinding(
    value = @queue(name = "topic.queue2"),
    exchange = @exchange(name = "hmall.topic", type = exchangetypes.topic),
    key = "#.news"
))
public void listentopicqueue2(string msg){
    system.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

消息转换器

<dependency>
    <groupid>com.fasterxml.jackson.dataformat</groupid>
    <artifactid>jackson-dataformat-xml</artifactid>
    <version>2.9.10</version>
</dependency>
@bean
public messageconverter messageconverter(){
    // 1.定义消息转换器
    jackson2jsonmessageconverter jackson2jsonmessageconverter = new jackson2jsonmessageconverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于id判断是否是重复消息
    jackson2jsonmessageconverter.setcreatemessageids(true);
    return jackson2jsonmessageconverter;
}

如果spring-boot-starter-web则无需重复引入

可靠性

生产者的可靠性

重试机制
spring:
  rabbitmq:
    connection-timeout: 1s # 设置mq的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数

阻塞重试,建议禁用
验证方式,发送消息的时候把 rabbitmq 停用

生产者消息确认机制

publisher confirm->生产者把消息成功发送给了 exchange,ack 和 nack
publisher return->exchange路由消息失败会触发
如何开启

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制
@test
    void testpublisherconfirm() {
        // 1. 创建 correlationdata
        correlationdata cd = new correlationdata();
        // 2. 给 future 添加 confirmcallback
        cd.getfuture().addcallback(new listenablefuturecallback<correlationdata.confirm>() {
            @override
            public void onfailure(throwable ex) {
                // 2.1 future 异常 基本不会出现
                log.error("send message fail", ex);
            }

            @override
            public void onsuccess(correlationdata.confirm result) {
                // 2.2 future 接收到回执的处理逻辑,参数中的 result 就是回执内容
                if (result.isack()) {
                    log.info("发送消息成功,收到 ack");
                } else {
                    log.error("发送消息失败,收到nack,reason: {}", result.getreason());
                }
            }
        });
        // 3. 发送消息
        rabbittemplate.convertandsend("hmall.direct", "q", "hello", cd);
    }

    @postconstruct
    public void init(){
        rabbittemplate.setreturnscallback(new rabbittemplate.returnscallback() {
            @override
            public void returnedmessage(returnedmessage returned) {
                log.error("触发return callback,");
                log.info("exchange: {}", returned.getexchange());
                log.info("routingkey: {}", returned.getroutingkey());
                log.info("message: {}", returned.getmessage());
                log.info("replycode: {}", returned.getreplycode());
                log.info("replytext: {}", returned.getreplytext());
            }
        });
    }

这个案例routingkey是匹配不到 queue 的,所有会返回 ack,然后触发 returncallback

生产建议

不建议开启 publisher return ,最多仅仅开启 publisher confirm

mq本身的可靠性

数据持久化
  • 交换机持久化
  • 队列持久化
  • 消息持久化

如果在开启持久化的同时开启 ack,会在持久化完成后才ack,但是由于持久化是批量的,所以建议 ack 使用异步

惰性队列

直接把消息发到磁盘,而不是先到内存再到磁盘

消费者的可靠性

处理模式
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none # 不做处理

消费处理完消息后的三种回执

  • ack 成功处理 rabbitmq删除这条消息
  • nack 消息处理失败 重新投递
  • reject 消息处理失败并拒绝该消息 删除

三种处理模式

  • none 投递完以后 ack
  • manual 手动模式 手动设置 ack 或者 reject
  • auto spring amqp 帮我们做了增强,正常 ack,业务异常 nack, 消息处理或者校验异常 reject
@rabbitlistener(queues = "simple.queue")
    public void listensimplequeuemessage(string msg) throws interruptedexception {
        log.info("spring 消费者接收到消息:【" + msg + "】");
        if (true) {
//            throw new messageconversionexception("故意的"); // reject
            throw new runtimeexception(""); // 会重试
        }
        log.info("消息处理完成");
    }

测试方式,先测试 none 模式,会发现直接删掉了。再测试 auto ,分别测试 messageconversionexceptionruntimeexception,前者删掉,后者触发重试

失败重试机制

默认是重新在mq中入队出队

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

可配置在客户端重试
重试三次后,返回 reject 删掉了消息

失败处理策略

messagerecovery定义
默认是丢弃rejectanddontrequeuerecoverer
immediaterequeuemessagerecoverer重新入队
republishmessagerecoverer

package com.itheima.consumer.config;

import org.springframework.amqp.core.binding;
import org.springframework.amqp.core.bindingbuilder;
import org.springframework.amqp.core.directexchange;
import org.springframework.amqp.core.queue;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.amqp.rabbit.retry.messagerecoverer;
import org.springframework.amqp.rabbit.retry.republishmessagerecoverer;
import org.springframework.context.annotation.bean;

@configuration
@conditionalonproperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingvalue = "true")
public class errormessageconfig {
    @bean
    public directexchange errormessageexchange(){
        return new directexchange("error.direct");
    }
    @bean
    public queue errorqueue(){
        return new queue("error.queue", true);
    }
    @bean
    public binding errorbinding(queue errorqueue, directexchange errormessageexchange){
        return bindingbuilder.bind(errorqueue).to(errormessageexchange).with("error");
    }

    @bean
    public messagerecoverer republishmessagerecoverer(rabbittemplate rabbittemplate){
        return new republishmessagerecoverer(rabbittemplate, "error.direct", "error");
    }
}
业务幂等性
  1. 唯一消息 id,业务处理成功后把id保存到数据库,处理前查询判断这条消息是否处理过
@bean
public messageconverter messageconverter(){
    // 1.定义消息转换器
    jackson2jsonmessageconverter jjmc = new jackson2jsonmessageconverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于id判断是否是重复消息
    jjmc.setcreatemessageids(true);
    return jjmc;
}
  1. 业务幂等

死信队列

存放没有被消费的消息的队列
概念当中比较重要的是死信的来源,有三个

  • 消息 ttl 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  • 消息被拒(basic.reject或basic.nack) 并且 requeue=false

这三种情况后面会分别模拟,值得说一下的是第三种情况,这里可以看一下之前讲到的 消息未应答时可以重新入队,如果这里配置不入队,就可以被添加到死信队列当中

注意一个点即可,配置的是 普通队列 与 死信交换机之间的关系

/**
 * 死信队列  生产者
 */
public class producer {

    // 普通交换机的名称
    public static final string normal_exchange = "normal_exchange";

    public static void main(string[] args) throws exception {
        channel channel = rabbitmqutils.getchannel();

        // 死信消息 设置 ttl 单位是 ms
        amqp.basicproperties properties = new amqp.basicproperties().builder()
                .expiration("10000")
                .build();

        for (int i = 0; i < 11; i++) {
            string message = "info" + i;
            channel.basicpublish(normal_exchange, "zhangsan", properties, message.getbytes());
        }

    }
}

**
 * 普通队列消费者
 */
public class consumer01 {

    // 普通交换机的名称
    public static final string normal_exchange = "normal_exchange";

    // 死信交换机的名称
    public static final string dead_exchange = "dead_exchange";

    // 普通队列的名称
    public static final string normal_queue = "normal_queue";

    // 死信队列的名称
    public static final string dead_queue = "dead_queue";

    public static void main(string[] args) throws exception {
        channel channel = rabbitmqutils.getchannel();

        // 声明死信和普通交换机, 类型为 direct
        channel.exchangedeclare(normal_exchange, builtinexchangetype.direct);
        channel.exchangedeclare(dead_exchange, builtinexchangetype.direct);

        // 声明普通队列和死信队列
        map<string, object> arguments = new hashmap<>();
        // 过期时间 不在这里设置 改为在生产者设置消息的 ttl 
        // arguments.put("x-message-ttl", 10000);
        // 正常队列设置死信交换机
        arguments.put("x-dead-letter-exchange", dead_exchange);
        // 设置死信 routing-key
        arguments.put("x-dead-letter-routing-key", "lisi");

        // 设置正常队列长度的限制
//        arguments.put("x-max-length", 6);

        channel.queuedeclare(normal_queue, false, false, false, arguments);

        channel.queuedeclare(dead_queue, false, false, false, null);

        // 绑定普通交换机与队列
        channel.queuebind(normal_queue, normal_exchange, "zhangsan");

        // 绑定死信交换机与队列
        channel.queuebind(dead_queue, dead_exchange, "lisi");

        system.out.println("等待接收消息....");

        delivercallback delivercallback = (consumertag, message) -> {
            string s = new string(message.getbody(), "utf-8");
//            if (s.equals("info5")) {
//                system.out.println("consumer01接收的消息是:" + new string(message.getbody(), "utf-8") + "此消息被拒绝");
//                channel.basicreject(message.getenvelope().getdeliverytag(), false);
//            } else {
                system.out.println("consumer01接收的消息是:" + new string(message.getbody(), "utf-8"));
                channel.basicack(message.getenvelope().getdeliverytag(), false);
//            }
        };
        // 开启手动应答
        channel.basicconsume(normal_queue, false, delivercallback, consumertag -> {
        });
    }
}

/**
 * 死信队列消费者
 */
public class consumer02 {

    // 死信队列的名称
    public static final string dead_queue = "dead_queue";

    public static void main(string[] args) throws exception {
        channel channel = rabbitmqutils.getchannel();

        system.out.println("等待接收消息....");

        delivercallback delivercallback = (consumertag, message) -> {
            system.out.println("consumer01接收的消息是:" + new string(message.getbody(), "utf-8"));
            channel.basicack(message.getenvelope().getdeliverytag(), false);
        };

        channel.basicconsume(dead_queue, false, delivercallback, consumertag -> {
        });
    }
}


我们可以看到 普通队列与死信交换机之间的关系
情况一模拟:ttl

 amqp.basicproperties properties = new amqp.basicproperties().builder()
                .expiration("10000")
                .build();

设置发送的消息的 ttl
模拟方式很简单,先启动 c1 然后关闭,然后启动消费者


情况2 超出队列大小
我们运行一次c2 ,把死信队列里面的消息消费掉

重新开始测试,为避免干扰我们去掉消息的ttl

设置队列最大长度为6,所以按照推测,如果发送11条消息,会有5条(超出部分)进入到死信队列

注:我们这里需要删除原来的队列,因为队列的参数被修改了


管理面板中删除即可
我们再次启动 c1 然后关闭 c1再开启 p

结果符合预期
情况3:
我们首先还是排除干扰,先开启c2 消费掉死信中的消息,然后删除队列normal,再然后注释掉 队列长度的配置
模拟方式也很简单,我们把 info5 这条消息 ,basicreject 给拒绝掉,看这条消息会不会进入到我们的死信队列



延迟队列

延迟队列的应用场景是很多的,订单十分钟内未付款取消等等
延迟队列的实现很简单,其实利用前面我们说到的消息的 ttl 属性就可以实现了
这里说一下 队列设置 ttl 和消息设置 ttl 的区别

这里的整合我们用 springboot
版本 2.3.8.release (大版本尽量一致)

<dependencies>
        <!--rabbitmq 依赖-->
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-amqp</artifactid>
        </dependency>

        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-web</artifactid>
        </dependency>

        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter</artifactid>
        </dependency>

        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-test</artifactid>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupid>com.alibaba</groupid>
            <artifactid>fastjson</artifactid>
            <version>1.2.47</version>
        </dependency>

        <dependency>
            <groupid>io.springfox</groupid>
            <artifactid>springfox-swagger2</artifactid>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupid>io.springfox</groupid>
            <artifactid>springfox-swagger-ui</artifactid>
            <version>2.9.2</version>
        </dependency>

        <!--rabbitmq 测试依赖-->
        <dependency>
            <groupid>org.springframework.amqp</groupid>
            <artifactid>spring-rabbit-test</artifactid>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupid>org.projectlombok</groupid>
            <artifactid>lombok</artifactid>
        </dependency>

    </dependencies>
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

swagger 配置类

@configuration
@enableswagger2
public class swaggerconfig {

    public docket webapiconfig() {
        return new docket(documentationtype.swagger_2)
                .groupname("webapi")
                .apiinfo(webapiinfo())
                .select()
                .build();
    }

    private apiinfo webapiinfo() {
        return new apiinfobuilder()
                .title("rabbitmq 接口文档")
                .description("本文档描述了 rabbitmq 微服务接口定义")
                .version("1.0")
                .contact(new contact("enjoy6288", "http://atguigu.com",
                        "1551388580@qq.com"))
                .build();
    }
}


配置类

@configuration
public class ttlqueueconfig {

    public static final string x_exchange = "x";
    public static final string queue_a = "qa";
    public static final string queue_b = "qb";
    public static final string y_dead_letter_exchange = "y";
    public static final string dead_letter_queue = "qd";

    // 声明 xexchange
    @bean("xexchange")
    public directexchange xexchange() {
        return new directexchange(x_exchange);
    }

    // 声明 xexchange
    @bean("yexchange")
    public directexchange yexchange() {
        return new directexchange(y_dead_letter_exchange);
    }

    //声明队列 a ttl 为 10s 并绑定到对应的死信交换机
    @bean("queuea")
    public queue queuea() {
        map<string, object> args = new hashmap<>(3);
//声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", y_dead_letter_exchange);
//声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "yd");
//声明队列的 ttl
        args.put("x-message-ttl", 10000);
        return queuebuilder.durable(queue_a).witharguments(args).build();
    }


    // 声明队列 a 绑定 x 交换机
    @bean
    public binding queueabindingx(@qualifier("queuea") queue queuea,
                                 @qualifier("xexchange") directexchange xexchange) {
        return bindingbuilder.bind(queuea).to(xexchange).with("xa");
    }

    //声明队列 b ttl 为 40s 并绑定到对应的死信交换机
    @bean("queueb")
    public queue queueb() {
        map<string, object> args = new hashmap<>(3);
//声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", y_dead_letter_exchange);
//声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "yd");
//声明队列的 ttl
        args.put("x-message-ttl", 40000);
        return queuebuilder.durable(queue_b).witharguments(args).build();
    }

    //声明队列 b 绑定 x 交换机
    @bean
    public binding queuebbindingx(@qualifier("queueb") queue queue1b,
                                  @qualifier("xexchange") directexchange xexchange) {
        return bindingbuilder.bind(queue1b).to(xexchange).with("xb");
    }

    //声明死信队列 qd
    @bean("queued")
    public queue queued() {
        return new queue(dead_letter_queue);
    }

    //声明死信队列 qd 绑定关系
    @bean
    public binding deadletterbindingqad(@qualifier("queued") queue queued,
                                        @qualifier("yexchange") directexchange yexchange) {
        return bindingbuilder.bind(queued).to(yexchange).with("yd");
    }

}

消费者

@slf4j
@component
public class deadletterqueueconsumer {

    @rabbitlistener(queues = "qd")
    public void received(message message, channel channel) throws exception {
        string msg = new string(message.getbody());
        log.info("当前时间:{},收到死信队列信息{}", new date().tostring(), msg);
    }
}

控制层 生产者

@slf4j
@requestmapping("ttl")
@restcontroller
public class sendmsgcontroller {
    @autowired
    private rabbittemplate rabbittemplate;

    @getmapping("sendmsg/{message}")
    public void sendmsg(@pathvariable string message) {
        log.info("当前时间:{},发送一条信息给两个 ttl 队列:{}", new date(), message);
        rabbittemplate.convertandsend("x", "xa", "消息来自 ttl 为 10s 的队列: " + message);
        rabbittemplate.convertandsend("x", "xb", "消息来自 ttl 为 40s 的队列: " + message);
    } 
}
get http://localhost:8080/ttl/sendmsg/aaa

优化 队列ttl存在硬编码

创建一条新的队列qc,不在队列上配置 ttl, 在消息上配置 ttl

@component
public class msgttlqueueconfig {
    public static final string y_dead_letter_exchange = "y";
    public static final string queue_c = "qc";

    //声明队列 c 死信交换机
    @bean("queuec")
    public queue queueb() {
        map<string, object> args = new hashmap<>(3);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", y_dead_letter_exchange);
        //声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "yd");
        //没有声明 ttl 属性
        return queuebuilder.durable(queue_c).witharguments(args).build();
    }

    //声明队列 b 绑定 x 交换机
    @bean
    public binding queuebindingc(@qualifier("queuec") queue queuec,
                                 @qualifier("xexchange") directexchange xexchange) {
        return bindingbuilder.bind(queuec).to(xexchange).with("xc");
    }
}

生产者

@getmapping("sendexpirationmsg/{message}/{ttltime}")
    public void sendmsg(@pathvariable string message, @pathvariable string ttltime) {
        rabbittemplate.convertandsend("x", "xc", message, correlationdata -> {
            correlationdata.getmessageproperties().setexpiration(ttltime);
            return correlationdata;
        });
        log.info("当前时间:{},发送一条时长{}毫秒 ttl 信息给队列 c:{}", new date(), ttltime, message);
    }
###
get http://localhost:8080/ttl/sendexpirationmsg/你好 1/20000

###
get http://localhost:8080/ttl/sendexpirationmsg/你好 2/2000

存在问题,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行
(callback相关的不用管哈)

使用rabbitmq插件 实现延迟队列

rabbitmq_delayed_message_exchange 解压存放到 plugins 目录

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

@configuration
public class delayedqueueconfig {

    public static final string delayed_queue_name = "delayed.queue";
    public static final string delayed_exchange_name = "delayed.exchange";
    public static final string delayed_routing_key = "delayed.routingkey";

    @bean("delayedqueue")
    public queue delayedqueue() {
        return new queue(delayed_queue_name);
    }

    //自定义交换机 我们在这里定义的是一个延迟交换机
    @bean("delayedexchange")
    public customexchange delayedexchange() {
        map<string, object> args = new hashmap<>();
        //自定义交换机的类型
        args.put("x-delayed-type", "direct");
        return new customexchange(delayed_exchange_name, "x-delayed-message", true, false, args);
    }

    @bean
    public binding bindingdelayedqueue(@qualifier("delayedqueue") queue queue,
                                       @qualifier("delayedexchange") customexchange
                                               delayedexchange) {
        return bindingbuilder.bind(queue).to(delayedexchange).with(delayed_routing_key).noargs();
    }
}

我们指定创建延迟交换机

@getmapping("senddelaymsg/{message}/{delaytime}")
    public void sendmsg(@pathvariable string message, @pathvariable integer delaytime) {
        rabbittemplate.convertandsend(delayed_exchange_name, delayed_routing_key, message,
                correlationdata -> {
                    correlationdata.getmessageproperties().setdelay(delaytime);
                    return correlationdata;
                });
        log.info(" 当 前 时 间 : {}, 发 送 一 条 延 迟 {} 毫秒的信息给队列 delayed.queue:{}", new
                date(), delaytime, message);
    }


现在正常了

补充 win

官网下载
下载完成后不要勾选启动
先执行安装插件

rabbitmq-plugins enable rabbitmq_management

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com