下面将详细介绍如何在springboot中使用rocketmq实现延迟消息,包括基于延迟级别和基于具体时间两种方式的完整实现。
一、延迟消息概述
rocketmq提供了两种类型的延迟消息机制:
- 延迟消息:消息发送后延迟指定的时间长度再被消费
- 定时消息:消息在指定的具体时间点被消费
这两种机制在订单超时取消、会议提醒、定时任务调度等场景中有广泛应用。
二、环境准备
1. 添加maven依赖
<dependency>
<groupid>org.apache.rocketmq</groupid>
<artifactid>rocketmq-spring-boot-starter</artifactid>
<version>2.2.3</version>
</dependency>
2. 配置文件设置
在application.yml中配置rocketmq连接信息:
rocketmq:
name-server: localhost:9876
producer:
group: delay-message-producer-group
三、延迟级别机制实现
1. 默认延迟级别
rocketmq默认提供18个延迟级别,定义在messagestoreconfig类中:
messagedelaylevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
对应关系:
- level=1: 延迟1秒
- level=2: 延迟5秒
- level=3: 延迟10秒
- level=4: 延迟30秒
- level=5: 延迟1分钟
- level=6: 延迟2分钟
- ...以此类推
- level=18: 延迟2小时
2. 基于延迟级别的生产者实现
import org.apache.rocketmq.client.producer.sendresult;
import org.apache.rocketmq.spring.core.rocketmqtemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.messaging.message;
import org.springframework.messaging.support.messagebuilder;
import org.springframework.stereotype.component;
@component
public class delaylevelproducer {
@autowired
private rocketmqtemplate rocketmqtemplate;
/**
* 发送基于延迟级别的消息
* @param topic 主题
* @param tag 标签
* @param message 消息内容
* @param delaylevel 延迟级别(1-18)
*/
public void sendmessagebydelaylevel(string topic, string tag, string message, int delaylevel) {
// 创建消息
message<string> springmessage = messagebuilder.withpayload(message).build();
// 发送延迟消息
sendresult sendresult = rocketmqtemplate.syncsend(
topic + ":" + tag,
springmessage,
3000, // 超时时间
delaylevel // 延迟级别
);
system.out.println("延迟级别消息发送成功: " + sendresult);
}
/**
* 发送订单超时取消消息(延迟15分钟)
*/
public void sendordertimeoutmessage(string orderid) {
string message = "订单超时取消: " + orderid;
// 15分钟对应level=14(根据默认配置)
sendmessagebydelaylevel("ordertopic", "timeout", message, 14);
}
}
四、基于具体时间的延迟消息实现
1. 定时消息生产者
import org.apache.rocketmq.client.producer.sendresult;
import org.apache.rocketmq.common.message.messageconst;
import org.apache.rocketmq.spring.core.rocketmqtemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.messaging.message;
import org.springframework.messaging.support.messagebuilder;
import org.springframework.stereotype.component;
import java.util.date;
@component
public class scheduledmessageproducer {
@autowired
private rocketmqtemplate rocketmqtemplate;
/**
* 发送延迟指定毫秒数的消息
*/
public void sendmessagewithdelayms(string topic, string message, long delayms) {
// 计算投递时间
long delivertimems = system.currenttimemillis() + delayms;
// 创建消息并设置投递时间
message<string> springmessage = messagebuilder.withpayload(message)
.setheader(messageconst.property_delay_time_ms, string.valueof(delayms))
.setheader(messageconst.property_timer_deliver_ms, string.valueof(delivertimems))
.build();
sendresult sendresult = rocketmqtemplate.syncsend(topic, springmessage);
system.out.println("延迟毫秒消息发送成功: " + sendresult);
}
/**
* 发送指定时间点投递的消息
*/
public void sendmessageattime(string topic, string message, date delivertime) {
long delivertimems = delivertime.gettime();
// 创建消息并设置投递时间
message<string> springmessage = messagebuilder.withpayload(message)
.setheader(messageconst.property_timer_deliver_ms, string.valueof(delivertimems))
.build();
sendresult sendresult = rocketmqtemplate.syncsend(topic, springmessage);
system.out.println("定时投递消息发送成功: " + sendresult);
}
/**
* 发送10秒后投递的消息
*/
public void sendtensecondslatermessage(string topic, string message) {
sendmessagewithdelayms(topic, message, 10000l);
}
}
五、消费者实现
延迟消息的消费者与普通消息消费者相同,无需特殊配置:
import org.apache.rocketmq.spring.annotation.rocketmqmessagelistener;
import org.apache.rocketmq.spring.core.rocketmqlistener;
import org.springframework.stereotype.component;
import java.time.localdatetime;
import java.time.format.datetimeformatter;
@component
@rocketmqmessagelistener(
topic = "ordertopic",
consumergroup = "delay-message-consumer-group",
selectorexpression = "timeout"
)
public class ordertimeoutconsumer implements rocketmqlistener<string> {
@override
public void onmessage(string message) {
string now = localdatetime.now().format(datetimeformatter.ofpattern("yyyy-mm-dd hh:mm:ss"));
system.out.println("[" + now + "] 接收到订单超时消息: " + message);
// 处理订单取消逻辑
processordercancellation(message);
}
private void processordercancellation(string message) {
// 提取订单id
string orderid = message.substring(message.indexof(":") + 2);
system.out.println("执行订单取消操作,订单id: " + orderid);
// 这里可以调用订单服务进行取消操作
}
}
六、controller层实现
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.format.annotation.datetimeformat;
import org.springframework.web.bind.annotation.*;
import java.util.date;
@restcontroller
@requestmapping("/api/delay")
public class delaymessagecontroller {
@autowired
private delaylevelproducer delaylevelproducer;
@autowired
private scheduledmessageproducer scheduledmessageproducer;
/**
* 发送基于延迟级别的消息
*/
@postmapping("/level")
public string sendbydelaylevel(
@requestparam string topic,
@requestparam string tag,
@requestparam string message,
@requestparam(defaultvalue = "3") int delaylevel) {
delaylevelproducer.sendmessagebydelaylevel(topic, tag, message, delaylevel);
return "延迟级别消息发送成功,延迟级别: " + delaylevel;
}
/**
* 发送订单超时取消消息
*/
@postmapping("/order/timeout")
public string sendordertimeout(@requestparam string orderid) {
delaylevelproducer.sendordertimeoutmessage(orderid);
return "订单超时取消消息已发送,订单id: " + orderid;
}
/**
* 发送延迟指定毫秒的消息
*/
@postmapping("/milliseconds")
public string sendbydelayms(
@requestparam string topic,
@requestparam string message,
@requestparam long delayms) {
scheduledmessageproducer.sendmessagewithdelayms(topic, message, delayms);
return "延迟毫秒消息发送成功,延迟: " + delayms + "ms";
}
/**
* 发送指定时间点的消息
*/
@postmapping("/scheduled")
public string sendscheduled(
@requestparam string topic,
@requestparam string message,
@requestparam @datetimeformat(pattern = "yyyy-mm-dd hh:mm:ss") date delivertime) {
scheduledmessageproducer.sendmessageattime(topic, message, delivertime);
return "定时消息发送成功,投递时间: " + delivertime;
}
}
七、自定义延迟级别配置
在broker的配置文件中可以自定义延迟级别:
# 在broker.conf文件中添加 messagedelaylevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 3h 4h 5h
重启broker使其生效。注意,修改延迟级别后,所有使用延迟级别的消息都会使用新的配置。
八、两种实现方式对比
| 特性 | 基于延迟级别 | 基于具体时间 |
|---|---|---|
| 灵活性 | 较低,只能使用预定义级别 | 高,可以精确到毫秒 |
| 适用版本 | 全版本支持 | rocketmq 5.x及以上版本完整支持 |
| 使用场景 | 固定延迟时间的场景 | 需要精确控制投递时间的场景 |
| 配置复杂度 | 简单,无需额外配置 | 可能需要在broker端开启相关功能 |
九、使用注意事项
延迟精度:
- 延迟消息的投递时间不是完全精确的,有一定误差
- 在高并发场景下,误差可能会增大
版本兼容性:
- 基于具体时间的延迟消息在rocketmq 5.x版本支持更完善
- 在低版本中可能需要使用延迟级别机制
性能考虑:
- 大量延迟消息可能会增加broker的负担
- 对于长时间延迟的消息,考虑使用其他方案(如定时任务+消息队列组合)
消息可靠性:
- 延迟消息同样支持持久化,确保broker重启后不会丢失
- 建议开启消息确认机制确保消息可靠投递
十、测试示例
发送订单超时取消消息(延迟15分钟):
post /api/delay/order/timeout?orderid=order123456
发送10秒后投递的消息:
post /api/delay/milliseconds?topic=testtopic&message=hellodelay&delayms=10000
发送指定时间点的消息:
post /api/delay/scheduled?topic=testtopic&message=helloscheduled&delivertime=2024-12-25%2000:00:00
通过以上配置和代码,您可以在springboot项目中轻松实现基于rocketmq的延迟消息功能,满足各种定时任务和延迟处理的业务需求。
到此这篇关于springboot+rocketmq实现延迟消息的示例代码的文章就介绍到这了,更多相关springboot rocketmq 延迟内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论