一、延迟通知概述
延迟通知是指消息在发送后不会立即被消费,而是在指定的时间延迟后才被处理的消息传递机制。常见应用场景包括:
- 订单超时自动取消
- 定时任务调度
- 会议/活动前提醒
- 账单到期通知
二、rabbitmq 实现延迟通知的两种方案
方案对比
| 实现方式 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| ttl + 死信队列 | 无需安装插件,原生支持 | 1. 队列级ttl不支持动态延迟 2. 消息级ttl存在性能问题 | 延迟时间固定或较少变化的场景 |
| 延迟插件 | 1. 支持每条消息单独设置延迟时间 2. 性能更好 3. 配置简单 | 需要安装额外插件 | 延迟时间不固定,需要灵活设置的场景 |
三、方案一:基于ttl和死信队列实现
1. 原理
- 利用消息或队列的ttl(time-to-live)特性使消息过期
- 配置死信交换机(dlx)接收过期消息
- 将死信消息路由到实际处理队列
2. 代码实现
2.1 配置类
package com.example.delaynotify.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import java.util.hashmap;
import java.util.map;
@configuration
public class ttldelayconfig {
// 普通交换机
public static final string delay_exchange = "delay_exchange";
// 普通队列
public static final string delay_queue = "delay_queue";
// 死信交换机
public static final string dead_letter_exchange = "dead_letter_exchange";
// 死信队列
public static final string dead_letter_queue = "dead_letter_queue";
// 路由键
public static final string delay_routing_key = "delay.key";
public static final string dead_letter_routing_key = "dead.letter.key";
// 声明死信交换机
@bean
public exchange deadletterexchange() {
return exchangebuilder.directexchange(dead_letter_exchange).durable(true).build();
}
// 声明死信队列
@bean
public queue deadletterqueue() {
return queuebuilder.durable(dead_letter_queue).build();
}
// 声明普通交换机
@bean
public exchange delayexchange() {
return exchangebuilder.directexchange(delay_exchange).durable(true).build();
}
// 声明延迟队列并绑定死信交换机
@bean
public queue delayqueue() {
map<string, object> args = new hashmap<>();
// 设置死信交换机
args.put("x-dead-letter-exchange", dead_letter_exchange);
// 设置死信路由键
args.put("x-dead-letter-routing-key", dead_letter_routing_key);
// 队列级ttl (10秒) - 如果需要消息级ttl可以不设置此参数
args.put("x-message-ttl", 10000);
return queuebuilder.durable(delay_queue)
.witharguments(args)
.build();
}
// 绑定普通队列和普通交换机
@bean
public binding delaybinding() {
return bindingbuilder.bind(delayqueue())
.to(delayexchange())
.with(delay_routing_key)
.noargs();
}
// 绑定死信队列和死信交换机
@bean
public binding deadletterbinding() {
return bindingbuilder.bind(deadletterqueue())
.to(deadletterexchange())
.with(dead_letter_routing_key)
.noargs();
}
}
2.2 生产者 - 发送延迟消息
package com.example.delaynotify.service;
import com.example.delaynotify.config.ttldelayconfig;
import org.springframework.amqp.amqpexception;
import org.springframework.amqp.core.message;
import org.springframework.amqp.core.messagepostprocessor;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.service;
@service
public class ttldelaymessageservice {
@autowired
private rabbittemplate rabbittemplate;
// 发送固定延迟时间的消息(队列级ttl)
public void sendfixeddelaymessage(string message) {
system.out.println("发送固定延迟消息: " + message + ", 时间: " + system.currenttimemillis());
rabbittemplate.convertandsend(
ttldelayconfig.delay_exchange,
ttldelayconfig.delay_routing_key,
message
);
}
// 发送自定义延迟时间的消息(消息级ttl)
public void sendcustomdelaymessage(string message, long delaymillis) {
system.out.println("发送自定义延迟消息: " + message + ", 延迟时间: " + delaymillis + "ms, 时间: " + system.currenttimemillis());
rabbittemplate.convertandsend(
ttldelayconfig.delay_exchange,
ttldelayconfig.delay_routing_key,
message,
new messagepostprocessor() {
@override
public message postprocessmessage(message message) throws amqpexception {
// 设置消息级ttl
message.getmessageproperties().setexpiration(string.valueof(delaymillis));
return message;
}
}
);
}
}
2.3 消费者 - 接收延迟消息
package com.example.delaynotify.consumer;
import com.example.delaynotify.config.ttldelayconfig;
import com.rabbitmq.client.channel;
import org.springframework.amqp.core.message;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;
import java.io.ioexception;
@component
public class ttldelaymessageconsumer {
@rabbitlistener(queues = ttldelayconfig.dead_letter_queue)
public void receivedelaymessage(string message, channel channel, message msg) throws ioexception {
try {
system.out.println("接收到延迟消息: " + message + ", 时间: " + system.currenttimemillis());
// 处理业务逻辑 - 例如发送通知、更新状态等
processdelaymessage(message);
// 手动确认消息
channel.basicack(msg.getmessageproperties().getdeliverytag(), false);
} catch (exception e) {
system.out.println("消息处理失败: " + e.getmessage());
// 拒绝消息并丢弃
channel.basicnack(msg.getmessageproperties().getdeliverytag(), false, false);
}
}
private void processdelaymessage(string message) {
// 模拟发送通知的业务逻辑
system.out.println("执行通知业务: " + message);
// 这里可以调用邮件、短信、推送等服务
}
}
四、方案二:基于延迟插件实现
1. 安装延迟插件
1.1 docker环境安装
# 下载插件(根据rabbitmq版本选择对应版本) wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez # 复制插件到容器 docker cp rabbitmq_delayed_message_exchange-3.11.1.ez rabbitmq:/plugins # 进入容器启用插件 docker exec -it rabbitmq bash rabbitmq-plugins enable rabbitmq_delayed_message_exchange exit # 重启rabbitmq容器 docker restart rabbitmq
1.2 验证安装
在rabbitmq管理界面新建交换机时,如果能看到x-delayed-message类型,则表示插件安装成功。
2. 代码实现
2.1 配置类
package com.example.delaynotify.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import java.util.hashmap;
import java.util.map;
@configuration
public class plugindelayconfig {
// 延迟交换机
public static final string delay_plugin_exchange = "delay_plugin_exchange";
// 延迟队列
public static final string delay_plugin_queue = "delay_plugin_queue";
// 路由键
public static final string delay_plugin_routing_key = "delay.plugin.key";
// 声明延迟交换机(类型为x-delayed-message)
@bean
public customexchange delaypluginexchange() {
map<string, object> args = new hashmap<>();
// 设置底层路由模式为direct
args.put("x-delayed-type", "direct");
return new customexchange(
delay_plugin_exchange,
"x-delayed-message",
true, // 持久化
false, // 非自动删除
args
);
}
// 声明延迟队列
@bean
public queue delaypluginqueue() {
return queuebuilder.durable(delay_plugin_queue).build();
}
// 绑定延迟交换机和延迟队列
@bean
public binding delaypluginbinding() {
return bindingbuilder.bind(delaypluginqueue())
.to(delaypluginexchange())
.with(delay_plugin_routing_key)
.noargs();
}
}
2.2 生产者 - 发送延迟消息
package com.example.delaynotify.service;
import com.example.delaynotify.config.plugindelayconfig;
import org.springframework.amqp.amqpexception;
import org.springframework.amqp.core.message;
import org.springframework.amqp.core.messagepostprocessor;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.service;
@service
public class plugindelaymessageservice {
@autowired
private rabbittemplate rabbittemplate;
// 发送延迟消息
public void senddelaymessage(string message, long delaymillis) {
system.out.println("使用插件发送延迟消息: " + message + ", 延迟时间: " + delaymillis + "ms, 时间: " + system.currenttimemillis());
rabbittemplate.convertandsend(
plugindelayconfig.delay_plugin_exchange,
plugindelayconfig.delay_plugin_routing_key,
message,
new messagepostprocessor() {
@override
public message postprocessmessage(message message) throws amqpexception {
// 设置延迟时间(毫秒)
message.getmessageproperties().setdelay((int) delaymillis);
return message;
}
}
);
}
}
2.3 消费者 - 接收延迟消息
package com.example.delaynotify.consumer;
import com.example.delaynotify.config.plugindelayconfig;
import com.rabbitmq.client.channel;
import org.springframework.amqp.core.message;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;
import java.io.ioexception;
@component
public class plugindelaymessageconsumer {
@rabbitlistener(queues = plugindelayconfig.delay_plugin_queue)
public void receivedelaymessage(string message, channel channel, message msg) throws ioexception {
try {
system.out.println("接收到插件延迟消息: " + message + ", 时间: " + system.currenttimemillis());
// 处理业务逻辑 - 例如发送通知、更新状态等
processdelaymessage(message);
// 手动确认消息
channel.basicack(msg.getmessageproperties().getdeliverytag(), false);
} catch (exception e) {
system.out.println("插件延迟消息处理失败: " + e.getmessage());
// 拒绝消息并丢弃
channel.basicnack(msg.getmessageproperties().getdeliverytag(), false, false);
}
}
private void processdelaymessage(string message) {
// 模拟发送通知的业务逻辑
system.out.println("执行通知业务: " + message);
// 这里可以调用邮件、短信、推送等服务
}
}
五、controller层实现
package com.example.delaynotify.controller;
import com.example.delaynotify.service.plugindelaymessageservice;
import com.example.delaynotify.service.ttldelaymessageservice;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.getmapping;
import org.springframework.web.bind.annotation.requestparam;
import org.springframework.web.bind.annotation.restcontroller;
@restcontroller
public class delaynotifycontroller {
@autowired
private ttldelaymessageservice ttldelaymessageservice;
@autowired
private plugindelaymessageservice plugindelaymessageservice;
// 基于ttl的固定延迟
@getmapping("/ttl/fixed")
public string sendfixedttldelay(@requestparam string message) {
ttldelaymessageservice.sendfixeddelaymessage(message);
return "固定延迟消息已发送 (10秒)";
}
// 基于ttl的自定义延迟
@getmapping("/ttl/custom")
public string sendcustomttldelay(@requestparam string message, @requestparam long delaymillis) {
ttldelaymessageservice.sendcustomdelaymessage(message, delaymillis);
return "自定义延迟消息已发送 (" + delaymillis + "ms)";
}
// 基于插件的延迟
@getmapping("/plugin/delay")
public string sendplugindelay(@requestparam string message, @requestparam long delaymillis) {
plugindelaymessageservice.senddelaymessage(message, delaymillis);
return "插件延迟消息已发送 (" + delaymillis + "ms)";
}
}
六、application.yml配置
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin
virtual-host: /
# 生产者确认配置
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
# 消费者配置
listener:
simple:
acknowledge-mode: manual
prefetch: 1
concurrency: 1
max-concurrency: 5
七、完整的通知场景实现示例
订单超时通知场景
package com.example.delaynotify.service;
import com.example.delaynotify.config.plugindelayconfig;
import org.springframework.amqp.core.messagepostprocessor;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.service;
import java.time.localdatetime;
import java.time.format.datetimeformatter;
@service
public class ordernotifyservice {
@autowired
private rabbittemplate rabbittemplate;
private static final datetimeformatter formatter = datetimeformatter.ofpattern("yyyy-mm-dd hh:mm:ss");
/**
* 创建订单并设置超时通知
* @param orderid 订单id
* @param notifydelayseconds 超时时间(秒)
*/
public void createorderandsettimeout(string orderid, int notifydelayseconds) {
// 1. 保存订单逻辑
system.out.println("创建订单: " + orderid + " 时间: " + localdatetime.now().format(formatter));
// 2. 设置延迟通知
string notifymessage = "订单[" + orderid + "]已超时,需要取消处理";
long delaymillis = notifydelayseconds * 1000l;
system.out.println("设置订单超时通知,延迟: " + notifydelayseconds + "秒");
// 使用延迟插件发送通知消息
rabbittemplate.convertandsend(
plugindelayconfig.delay_plugin_exchange,
plugindelayconfig.delay_plugin_routing_key,
notifymessage,
message -> {
message.getmessageproperties().setdelay((int) delaymillis);
return message;
}
);
}
}
八、两种方案对比与选择建议
1. 性能对比
- ttl+死信队列:当使用消息级ttl时,rabbitmq需要为每条消息设置过期时间,会造成额外的性能开销
- 延迟插件:插件内部使用优先队列实现,性能更优,特别适合大量不同延迟时间的消息场景
2. 灵活性对比
- ttl+死信队列:如果要支持不同的延迟时间,需要创建多个不同ttl的队列
- 延迟插件:每条消息都可以设置不同的延迟时间,更加灵活
3. 选择建议
- 如果延迟时间固定或种类较少,可以使用ttl+死信队列方案,无需安装插件
- 如果延迟时间不固定或种类较多,强烈建议使用延迟插件方案
- 对于生产环境,建议使用延迟插件方案,性能更好、配置更简洁
通过以上两种方案,您可以根据实际需求选择合适的方式实现rabbitmq的延迟通知功能,满足订单超时、定时提醒等各种业务场景。
到此这篇关于rabbitmq实现延迟通知的完整方案的文章就介绍到这了,更多相关rabbitmq 延迟通知内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论