- 需求:业务单位需要观测订单动态,所以要在订单的各个状态节点上传状态至状态池系统
- 方案:又是easy的需求,不就是在每个订单状态改变时触发上传状态,为了不影响订单正常流转,增加一个消息中间件mq
前言
在java中,@transactional是一个非常重要的注解,用于声明事务管理的行为。
它可以被应用在类级别或方法级别上,并且提供了多种选项来控制事务的传播行为、隔离级别、超时设置和回滚条件等。
一、java的@transactional解释
@transactional 注解在 java 中用于声明式事务管理,它可以应用于接口、接口方法、类以及类的方法上。在默认配置下,spring 中的 @transactional 注解会使得方法在事务的上下文中执行,其事务边界默认为方法开始执行时开始,方法正常结束时提交事务,方法执行过程中抛出异常时回滚事务。
但是,你可以通过在 @transactional 注解中设置 propagation 属性来改变事务的传播行为,例如可以设置为propagation.mandatory,这意味着该方法必须在一个已经存在的事务中执行,否则就会抛出异常。
另外,你可以通过设置 phase 属性来改变事务的提交时机,例如可以设置为phase.after_completion,这样事务就会在整个事务完成后提交,不论事务是正常结束还是异常结束。
以下是一个使用 @transactional 注解的示例,其中 propagation 设置为propagation.required(默认值),表示如果当前存在事务,则加入该事务;如果不存在,则创建一个新的事务。
import org.springframework.transaction.annotation.transactional;
import org.springframework.stereotype.service;
@service
public class myservice {
@transactional
public void sometransactionalmethod() {
// 方法执行的代码
}
}
二、踩坑
接到的需求中:每个订单状态改变时触发上传状态,为了不影响订单正常流转,增加一个消息中间件mq
流程图如下:

二、状态统一入口:推送
private sendresult sendstatustomq(sendstatuspoolmsgvo sendstatuspoolmsgvo) {
string msg = json.tojsonstring(sendstatuspoolmsgvo, serializerfeature.prettyformat,
serializerfeature.writemapnullvalue);
logger.info("sendstatuspoolmsgtomq,发送的消息{}", msg);
message<string> message = messagebuilder.withpayload(msg).build();
sendresult sendresult = new sendresult();
try {
sendresult = rocketmqtemplate.syncsend(statuspooltopic, message, 3000, 2);
} catch (exception e) {
e.printstacktrace();
log.error("sendstatustomq err", e);
taskmqmodel taskmqmodel = new taskmqmodel();
taskmqmodel.settopic(statuspooltopic);
taskmqmodel.setmessagetype(messagetypeconstant.mq_supple_statuspool);
taskmqmodel.setdisposeflag(requestconstant.request_original);
taskmqmodel.setrawmessage(msg);
taskmqmodel.setcreatetime(new date());
taskmqservice.save(taskmqmodel);
}
return sendresult;
}
我这里做了个状态统一入口,同时为了避免推送mq服务失败,做了个异常捕获保存推送消息,方便重推。
三、mq消费
// ... 其他业务逻辑 ...
// 获取订单信息
string orderno = msgvo.getorderno();
ordermodel ordermodel = orderservice.getorderbyorderno(orderno);
//如果是客户下单再去待处理订单查询
if (objects.isnull(fmordermodel)) {
logger.err("未查到信息,初始化失败!");
return;
}
//
然后我的日志全是:
[2024-08-8 08:36:50.161][error][consumemessagethread_2] 未查到信息,初始化失败! [2024-08-8 08:37:57.015][error][consumemessagethread_2] 未查到信息,初始化失败! [2024-08-8 08:42:45.536][error][consumemessagethread_2] 未查到信息,初始化失败!
我查看数据库是完全能查到的,有点奇怪❗❗❗❗❓❓❓❓
四、问题与解决
在java中,@transactional 注解通常用于声明方法执行的事务边界,确保方法内的一系列操作要么全部成功,要么全部失败。它通常与spring框架的事务管理器一起使用,以提供声明式的事务管理。
如果你的订单数据保存操作使用了 @transactional,并且在保存数据之后立即推送消息到消息队列(mq),那么消费者可能查不到数据的原因可能与事务的隔离级别有关。
下面是我搜到的一些可能导致该问题的原因:
- 事务隔离级别:如果你的数据库事务使用了较高的隔离级别(如可重复读或串行化),那么在事务提交之前,其他事务(包括mq消费者)可能看不到该事务所做的更改。
- 事务传播行为:如果mq消费者也运行在spring管理的事务中,并且事务的传播行为设置为不支持当前事务(例如,
propagation.not_supported),那么消费者可能会在不同的事务上下文中运行,从而看不到未提交的更改。 - 延迟提交:如果事务在消息发送之后才提交,mq消费者可能会在事务提交之前读取数据库,因此看不到新数据。
- mq消息延迟:如果消息队列本身存在延迟,消费者可能会在数据还未写入数据库时就接收到消息。
- 数据库缓存:某些数据库实现可能会缓存数据,这可能导致即使数据已经写入,消费者也无法立即看到更新。
- mq消费者处理逻辑:如果mq消费者在处理消息时没有正确地处理事务或数据库查询,也可能导致查不到数据。
解决方式
在spring框架中,我通过实现transactionsynchronization接口来在事务提交后执行回调操作。
以下是实现事务提交回调发送消息的方法:
@component
public class transactionalmessagesender implements transactionsynchronization {
@autowired
private somemessageservice messageservice; // 消息发送服务
@override
public void beforecommit(boolean readonly) {
// 这里可以执行一些操作,但事务还未提交
}
@override
public void beforecompletion() {
// 事务即将提交,可以在这里准备发送消息
}
@override
public void aftercommit() {
// 事务已经提交,可以在这里发送消息
sendresult = rocketmqtemplate.syncsend(statuspooltopic, message, 3000, 2);
}
@override
public void aftercompletion(transactionstatus status) {
// 事务已经完成,无论是提交还是回滚
if (status == transactionstatus.committed) {
// 这里可以执行一些事务提交后的清理工作
}
}
}
五、总结
在 spring 框架中使用 @transactional 注解时,事务管理器会在事务边界内管理数据库操作。
当你在一个事务中执行数据库操作后立即发送消息到消息队列(mq),可能会遇到一个问题:mq 消费者在接收到消息并尝试查询数据库时,发现数据库中并没有预期的数据。
这个问题的根本原因在于事务的隔离级别和事务提交的时间点。
当事务尚未提交时,其他事务(包括 mq 消费者的查询操作)是无法看到该事务内的修改的。
即使你在事务中已经执行了数据库操作(如插入或更新),这些修改对其他事务来说仍然是不可见的,直到当前事务提交。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论