本方案参考马哥短连接项目中的消息幂等处理方案
一.生成者模版代码
@slf4j
@component
@requiredargsconstructor
@conditionalonproperty(name = "message-queue.type", havingvalue = "rocketmq")
public class custommessageproducer implements messagequeueproducer {
private final rocketmqtemplate rocketmqtemplate;
@value("${rocketmq.producer.topic}")
private string customtopic;
/**
* 通用的发送方法,允许自定义消息内容和处理逻辑
* @param messagepayload 消息体数据
*/
@override
public void send(map<string, string> messagepayload) {
// 生成唯一消息键,用于标识消息的幂等性
string messageid = uuid.randomuuid().tostring();
messagepayload.put("messageid", messageid);
// 构建消息
message<map<string, string>> message = messagebuilder
.withpayload(messagepayload)
.setheader(messageconst.property_keys, messageid)
.build();
// 发送消息并处理结果
try {
sendresult sendresult = rocketmqtemplate.syncsend(customtopic, message, 2000l);
log.info("消息发送成功: 状态={}, 消息id={}, 消息键={}", sendresult.getsendstatus(), sendresult.getmsgid(), messageid);
} catch (exception e) {
log.error("消息发送失败: 消息内容={}", json.tojsonstring(messagepayload), e);
// 添加自定义的失败处理逻辑
}
}
}
关键说明
- 幂等性标识:在
messagepayload中加入messageid,确保消息唯一性,便于后续在 redis 中验证和处理消息幂等。 - 自定义逻辑:为不同的业务需求,调整
customtopic及messagepayload内容。 - 异常处理:保留自定义的异常处理接口,以便对失败消息进行处理或重试。
这样设计有助于实现通用的消息发送,只需更改 messagepayload 数据结构和自定义处理逻辑即可适应不同业务。
二.消息幂等处理器模版代码
package com.example.project.mq.idempotent;
import lombok.requiredargsconstructor;
import org.springframework.data.redis.core.stringredistemplate;
import org.springframework.stereotype.component;
import java.util.objects;
import java.util.concurrent.timeunit;
/**
* 消息幂等处理器
*/
@component
@requiredargsconstructor
public class messageidempotenthandler {
private final stringredistemplate stringredistemplate;
private static final string idempotent_key_prefix = "message:idempotent:";
/**
* 判断消息是否已被处理
*
* @param messageid 消息唯一标识
* @return true 表示消息未处理,可以继续处理;false 表示消息已处理,避免重复消费
*/
public boolean ismessagenotprocessed(string messageid) {
string key = idempotent_key_prefix + messageid;
// 尝试设置新键,如果不存在则返回 true 表示未处理,存在则返回 false
return boolean.true.equals(stringredistemplate.opsforvalue().setifabsent(key, "0", 2, timeunit.minutes));
}
/**
* 标记消息处理流程完成
*
* @param messageid 消息唯一标识
*/
public void markasprocessed(string messageid) {
string key = idempotent_key_prefix + messageid;
stringredistemplate.opsforvalue().set(key, "1", 2, timeunit.minutes);
}
/**
* 查询消息是否已经处理完成
*
* @param messageid 消息唯一标识
* @return true 表示消息处理已完成,false 表示未完成
*/
public boolean isprocessingcomplete(string messageid) {
string key = idempotent_key_prefix + messageid;
return objects.equals(stringredistemplate.opsforvalue().get(key), "1");
}
/**
* 处理异常时删除幂等标识
*
* @param messageid 消息唯一标识
*/
public void clearprocessedflag(string messageid) {
string key = idempotent_key_prefix + messageid;
stringredistemplate.delete(key);
}
}
职责描述
整体职责:
messageidempotenthandler主要职责是保证消息在消费过程中只被处理一次,防止重复消费。它借助 redis 存储和检查消息的唯一标识符,以实现消息的幂等性控制。方法职责:
ismessagenotprocessed:判断消息是否已处理。此方法尝试设置一个短期过期的标识,如果消息尚未被消费(即 redis 中不存在该键),则可以继续处理,否则表示消息已处理。markasprocessed:标记消息为已完成消费。成功消费后调用该方法,改变 redis 中的键值标识,标记为已完成状态,避免重复处理。isprocessingcomplete:检查消息消费流程是否完成。消费完成后,该键值会被设置为"1",此方法用于验证该状态。clearprocessedflag:清除幂等标识。在消息消费失败或异常情况下,删除标识以便消息可重新消费。
三.生产者代码模版
@override
public void onmessage(map<string, string> producermap) {
// 获取消息的唯一标识符
string keys = producermap.get("keys");
// 检查是否已处理过该消息,幂等性控制
if (!messagequeueidempotenthandler.ismessageprocessed(keys)) {
// 若该消息流程尚未完成
if (messagequeueidempotenthandler.isaccomplish(keys)) {
return; // 跳过已完成流程的消息
}
throw new serviceexception("消息未完成流程,需要消息队列重试");
}
// 业务逻辑处理
try {
//调用业务方法代码
}
} catch (throwable ex) {
log.error("消费异常", ex);
throw ex;
}
// 标记消息处理完成
messagequeueidempotenthandler.setaccomplish(keys);
}
职责描述
onmessage 模板的职责是接收并处理消息队列中的消息,确保幂等性,并在需要时抛出异常让消息队列进行重试。以下是该方法中关键步骤的职责和操作说明:
- 消息幂等性校验:
- 通过
keys作为唯一标识符来检查消息是否已被处理过,避免重复消费。 - 使用
messagequeueidempotenthandler.ismessageprocessed(keys)方法,如果消息已处理,则跳过;如果未完成处理流程,但状态为“已完成”,则直接返回;否则抛出serviceexception,让消息队列进行重试。
- 通过
- 业务逻辑处理:
- 在
try块中进行消息的实际业务处理,调用相关业务逻辑方法,避免未捕获的异常导致幂等性标记不一致。 - 如果业务处理过程中抛出异常,记录异常日志 (
log.error("消费异常", ex)) 并再次抛出异常。
- 在
- 消息处理完成标记:
- 在成功完成业务逻辑后,调用
messagequeueidempotenthandler.setaccomplish(keys)将消息标记为已完成。 - 这样可以确保消息的状态被正确更新,即使在重试后也能避免重复消费,保证消息的幂等性。
- 在成功完成业务逻辑后,调用
四.消息消费流程
假设消息消费应用场景如下:
- 接收消息:消息队列接收到新消息,获取消息的唯一标识
messageid。 - 检查是否已处理:
- 调用
ismessagenotprocessed检查 redis 中是否有该消息的标识。 - 若返回
true,则说明此消息未被处理,进入消费流程。 - 若返回
false,则说明此消息已处理,直接跳过避免重复消费。
- 调用
- 消费消息:若消息未处理,则执行具体的业务逻辑。
- 标记完成状态:
- 消费完成后,调用
markasprocessed,在 redis 中将该消息标记为已完成。
- 消费完成后,调用
- 异常处理:
- 若消费过程中抛出异常,调用
clearprocessedflag删除该消息的 redis 标识,允许系统稍后重新尝试处理该消息。
- 若消费过程中抛出异常,调用
五.总结
在本方案中,通过使用 redis 实现 mq 消息的幂等处理,确保了消息在消费过程中只会被处理一次,避免了重复消费带来的业务异常和资源浪费。其主要特点和优势如下:
- 幂等性保证:使用 redis 的
setifabsent来判断消息是否已被处理,确保消息在消费过程中仅被执行一次,避免重复消费的风险。 - 通用性设计:
- 生产者代码采用通用模板,通过
uuid生成消息的唯一标识messageid,并将其嵌入到消息体中,保证每条消息的唯一性。 - 消费者代码采用幂等处理器模板
messageidempotenthandler,支持多种状态检查和异常处理。
- 生产者代码采用通用模板,通过
- 细化的异常处理:在消费过程中,若发生异常,可以及时删除 redis 中的标识,确保系统在下次重新消费该消息时不会被误认为已处理,增强了消息消费的健壮性。
- 灵活的业务集成:该方案可以根据不同业务需求调整消息内容和自定义的业务逻辑处理,适应多种场景下的消息幂等消费需求。
- 流程化管理:通过
markasprocessed和clearprocessedflag实现了消费完成状态标记和异常重试机制,确保消费的可靠性和一致性。
总的来说,此方案为消息幂等性控制提供了一种可扩展、通用且高效的实现方式,非常适合在高并发分布式系统中应用,能够有效提高消息消费的稳定性和安全性
到此这篇关于redis处理mq消费幂等的实现示例的文章就介绍到这了,更多相关redis mq消费幂等内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论