当前位置: 代码网 > it编程>编程语言>Java > Spring Boot 事务实战之如何解决 DB 与 MQ 的"双写不一致"问题

Spring Boot 事务实战之如何解决 DB 与 MQ 的"双写不一致"问题

2025年12月10日 Java 我要评论
摘要:在分布式系统中,“先存数据库还是先发消息”是一个经典的架构难题。特别是在 im 系统的多媒体消息处理场景中,如果处理顺序不当,不仅会导致对象存储(oss)中产生无法回收的

摘要:在分布式系统中,“先存数据库还是先发消息”是一个经典的架构难题。特别是在 im 系统的多媒体消息处理场景中,如果处理顺序不当,不仅会导致对象存储(oss)中产生无法回收的“孤儿文件”,还会引发并发重复处理的问题。本文结合代码案例,探讨如何利用 spring 的 transactionsynchronizationmanager 实现 事务提交后触发 (trigger after commit) 机制,优雅解决数据库与消息队列的“双写一致性”问题。

1. 引言:一个看似简单的顺序问题

在“信令与媒体分离”的架构中,核心流程通常如下:api 服务收到消息 -> 落库(标记为 pending) -> 异步通知 worker 搬运文件

这一流程涉及两个异构系统的写操作:

  • db write:将消息元数据写入 mysql。
  • mq write:将搬运任务发布到 nats。

在实际开发中,直觉性的代码编写往往会陷入以下误区:

误区一:先发消息,后入库

// ❌ 错误示范
natspublisher.publish(task); // 1. 消息发出,worker 开始下载转存
messagerepository.save(message); // 2. 数据库报错(如字段超长、唯一键冲突)
// 后果:db 回滚,业务无记录,但 oss 中产生了一个永远无法被引用的“孤儿文件”。

误区二:在事务内发消息

// ❌ 错误示范
@transactional
public void handle() {
    messagerepository.save(message);
    natspublisher.publish(task); 
    // 3. 代码执行完毕,但在事务提交(commit)的一瞬间数据库连接断开
}
// 后果:worker 收到任务并完成处理,但在回调更新状态时发现 db 中不存在该记录。

2. 核心方案:事务提交后的“惊险一跃”

为了保证 “只有数据库确确实实持久化成功了,才去触发异步任务”,最佳实践是利用 spring 框架提供的事务同步机制。

以下是优化后的代码实现:

2.1 主业务逻辑

// 1. 准备阶段:预生成任务(纯内存操作,无副作用)
// 此时并没有真正发送 nats 消息,只是构建了对象
list<mediatransfertask> mediatasks = preparemediatransfertasks(msg, ids.sessionid());
// 2. 构建消息实体
wxmessage message = buildmessage(msg, accountid, ids.sessionid(), ids.senderid());
try {
    // 【核心步骤 a】数据库落库 (source of truth)
    // 这是唯一的“事实来源”。如果这里失败,后续一切都不应发生。
    messagerepository.save(message);
    log.info("message saved: id={}, wxid={}", message.getid(), message.getwxid());
    // 3. 发布会话更新事件 (内存事件或 mq)
    sessionupdateevent event = sessionupdateevent.builder()
        .accountid(accountid)
        // ... build params
        .build();
    sessioneventpublisher.publishsessionupdate(event);
    // 【核心步骤 b】注册事务回调
    // 关键点:这里不是立即发送,而是“预约”发送
    publishmediatransfertasksaftercommit(mediatasks);
    return processresult.success();
} catch (dataintegrityviolationexception e) {
    // 【并发场景的保护】
    // 如果两个线程同时处理同一条消息(如网络重放或客户端重试),
    // 数据库的唯一索引会抛出此异常。
    // 由于消息发送逻辑在事务提交后执行,失败的线程事务回滚,
    // 因此“aftercommit”钩子不会被触发,完美避免了 worker 重复搬运文件。
    log.debug("duplicate message (concurrent): {}", msg.getmessageid());
    return processresult.duplicate();
}

2.2 事务同步器的实现

publishmediatransfertasksaftercommit 方法利用了 spring 的 transactionsynchronizationmanager 来挂载回调。
private void publishmediatransfertasksaftercommit(list<mediatransfertask> tasks) {
    if (collectionutils.isempty(tasks)) {
        return;
    }
    // 判断当前是否在事务中
    if (transactionsynchronizationmanager.isactualtransactionactive()) {
        // 注册同步器
        transactionsynchronizationmanager.registersynchronization(new transactionsynchronization() {
            @override
            public void aftercommit() {
                // 【真正发送的时机】
                // 只有当 db 事务成功 commit 后,这一行才会执行
                // 此时 db 里一定有数据,worker 回调一定能成功
                tasks.foreach(mediatransferpublisher::publishmediatransfer);
                log.debug("async media tasks published after commit: size={}", tasks.size());
            }
        });
    } else {
        // 如果不在事务中(比如非事务方法调用),则立即发送(降级策略)
        tasks.foreach(mediatransferpublisher::publishmediatransfer);
    }
}

3. 深度解析:方案优势

3.1 杜绝“孤儿资源”

通过 aftercommit 钩子,严格保证了因果关系:因(db落库成功) -> 果(触发搬运)。 如果 messagerepository.save(message) 因为任何原因(业务校验失败、数据库异常)导致事务回滚,aftercommit 回调将永远不会被执行,nats 消息也就不会发出,从而从源头上避免了 oss 资源的浪费。

3.2 天然的幂等性防护

代码中对 dataintegrityviolationexception 的捕获处理是该方案的另一大亮点。 在分布式场景下,消息重复投递是常见现象。

  • 无保护模式:若未加控制,两个线程可能都会发出 nats 消息,导致 worker 下载上传两次同样的图片,浪费带宽和计算资源。
  • 事务同步模式:数据库的唯一约束(unique key)充当了“守门员”。第二个线程在 save 时会因冲突被拒绝,随之事务回滚。由于事务未成功提交, 其注册的 aftercommit 钩子自动失效。最终,只有抢锁成功的线程才会发出唯一的一条异步任务。

4. 兜底策略:应对“反向不一致”

虽然该方案解决了“有文件没记录”的问题,但理论上仍存在极低概率的“反向不一致”:db 提交成功了,但在执行 aftercommit 发送 nats 消息的一瞬间,服务宕机或断电。

此时,数据库中存在一条状态为 pending 的记录,但永远不会有 worker 来处理它。

为了达到金融级的一致性,系统应补充一个兜底补偿机制

  • **定时任务 (compensation job)**:每隔一定周期(如 5 分钟)扫描一次消息表。
  • 筛选条件create_time < 5分钟前 and media_status = 'pending'
  • 补偿动作:重新构建 mediatransfertask 并补发到 nats。

5. 总结

在处理“数据库事务”与“外部系统调用(mq/rpc)”混合的业务场景时,“事务同步器(transaction synchronization)” 是 spring 体系中解决双写一致性问题的利器。

通过这一模式的重构,系统实现了:

  • 资源一致性:杜绝了 oss 孤儿文件。
  • 并发安全性:利用数据库锁自动解决并发任务重复发布问题。
  • 逻辑严密性:确保状态流转严格遵循业务时序。

核心原则:先落库,再提交,回调之中发消息。

到此这篇关于spring boot 事务实战之如何优雅解决 db 与 mq 的"双写不一致"问题的文章就介绍到这了,更多相关springboot db与mq双写不一致内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com