当前位置: 代码网 > it编程>编程语言>Java > SpringBoot整合RocketMq实现分布式事务

SpringBoot整合RocketMq实现分布式事务

2024年11月06日 Java 我要评论
大家好,今天我们继续分布式事务的学习,之前我们已经实战了springboot整合atomikosspringboot整合himly来实现分布式事务,今天我们继续学习springboot整合rocket

大家好,今天我们继续分布式事务的学习,之前我们已经实战了

springboot整合atomikos

springboot整合himly 

来实现分布式事务,今天我们继续学习springboot整合rocketmq来实现分布式事务

mq实现分布式事务原理

rocketmq提供了事务消息,要实现分布式事务主要还是利用它的事务消息

  • 1:服务a首先会发送一条半事务的消息到mq,此时服务接收方还是无法消费这条消息的
  • 2:半事务消息发送成功之后,服务a开始执行本地业务逻辑
  • 3:服务a执行完本地业务之后,提交事务,事务提交成功之后,mq这条半事务消息就会变成原始可消费的消息
  • 4:服务接收方这时候就可以消费到这条消息,继续执行后续业务了

那么在这整个过程中可能会出现的异常有哪些呢?

1:半事务消息发送失败

如果半事务消息发送失败,那么服务a就不会继续执行接下来的业务了,整个流程会直接退出

2:本地事务提交成功,发送commit消息失败

服务a事务提交之后,需要发送一条消息告诉mq,这条半事务消息可以消费了,但是这时候,commit消息发送失败了,那么这条消息就还是处于半事务状态,所以mq会进行回查

回查服务a这个事务是否成功了,如果成功了,就会发送回查结果,如果本地事务成功了,那么回查就会发送commit消息,这条消息重新设置为可消费状态

实战

服务-a

@transactional(rollbackfor = exception.class)
@override
public string mqinsert(test test) {

    //本地服务调用
    testdao.insert(test);

    //发送半事务消息
    //test是我们本地需要保存的一个对象
    message<string> message = messagebuilder.withpayload(jsonobject.tojsonstring(test)).build();
    rocketmqtemplate.sendmessageintransaction("test-topic", message, null);

    return "success";
}
@rocketmqtransactionlistener
public class transactionmqlistener implements rocketmqlocaltransactionlistener {

    @resource
    private testdao testdao;

    //执行本地事务
    @override
    public rocketmqlocaltransactionstate executelocaltransaction(message message, object o) {
        //获取半事务消息
        test test = jsonobject.parseobject(new string((byte[]) message.getpayload()), test.class);
        system.out.println("test | executelocaltransaction | 消息是:" + jsonobject.tojsonstring(test));
        
        //根据id查询该记录是否保存成功了
        test testexist = testdao.querybyid(test.getid());
        if(testexist == null) {
            //说明本地事务提交失败了,需要回滚
            return rocketmqlocaltransactionstate.rollback;
        }
        
        //本地事务提交成功
        return rocketmqlocaltransactionstate.commit;
    }

    //事务回查
    @override
    public rocketmqlocaltransactionstate checklocaltransaction(message message) {
        test test = jsonobject.parseobject(new string((byte[]) message.getpayload()), test.class);
        system.out.println("test | checklocaltransaction | 消息是:" + jsonobject.tojsonstring(test));
        
        //还是根据id去查询记录
        test testexist = testdao.querybyid(test.getid());
        if(testexist == null) {
            //不存在,说明本地事务提交失败,回滚
            return rocketmqlocaltransactionstate.rollback;
        }
        
        //本地事务提交成功了
        return rocketmqlocaltransactionstate.commit;
    }
}

服务b

@component
@rocketmqmessagelistener(topic = "test-topic",consumergroup = "cpy-consumer-group")
public class cpylistener implements rocketmqlistener<string> {


    //消息监听
    @override
    public void onmessage(string s) {
        system.out.println("cpy服务收到消息:" + jsonobject.tojsonstring(s));
    }
}

测试

我们先把这里的状态改成 unknown,来模拟本地事务提交失败的场景,来验证事务回查的效果

发送事务消息之后,我们这里是unknown状态,所以没有提交成功

此时服务-b也没有消费到消息

过了一会,mq事务消息进行回查,此时因为数据库已经存在这条记录了,所以直接commit

这时候服务消费方也成功消费到消息了

到此这篇关于springboot整合rocketmq实现分布式事务的文章就介绍到这了,更多相关springboot rocketmq分布式事务内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com