12.分布式事务流程与事务消息源码分析

news2025/1/22 15:41:32

highlight: arduino-light

Rocket事务流程&源码分析

Rocket解决分布式事务流程

image.png

事务消息分 2 个阶段:

① 正常事务消息的发送与提交:

a.发送消息(half 消息)

b.服务响应消息写入结果

c.根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)

d.根据本地事务状态执行 Commit 或者 Rollback ( Commit操作生成消息索引,消息对消费者可见)

② 事务消息的补偿流程(补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况)

a.对没有 Commit / Rolback 的事务消息( pending 状态的消息),从服务端发起-一次"回查"

b. Producer 收到回查消息,检查回查消息对应的本地事务的状态

c.根据本地事务状态,重新 Commit 或者 Rollback

3、事务的消息状态

事务消息共有三种状态,提交状态、回滚状态、中间状态:

① TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。

② TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。

③ TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。

原文链接:https://blog.csdn.net/weixin_42367582/article/details/112639915

TransactionMQProducer发送事务消息

事务消息发送时,需要打上相应的标记PROPERTYTRANSACTIONPREPARED=true,与普通消息进行区分

MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); 给broker发送消息后,根据返回状态,进行相应处理

给broker发送消息后,根据返回状态,进行相应处理

发送事务消息成功

LocalTransactionExecuter或者TransactionListener执行本地事务

并返回本地事务完成状态,包括UNKNOW、ROLLBACK、COMMIT

发送事务消息失败

FLUSHDISKTIMEOUT、FLUSHSLAVETIMEOUT、SLAVENOTAVAILABLE:

都是消息发送失败状态, 标记本地事务状态为ROLLBACK_MESSAGE

之后通过endTransaction将相应本地事务执行状态信息回传给broker.注意发送消息的方式为one way

broker端处理事务消息

TransactionMessageBridge,负责主要的事务消息存储逻辑。

half消息消费队列: prepare消息消费队列即预处理消息(prepare),事务消息首先进入此消息消费队列。

对应的TOPIC是:

RMQSYSTRANSHALFTOPIC

op消息消费队列:事务消息处理完成后,进入op消息消费队列,op消息消费队列主要用来记录事务消息完成状态。

RMQSYSTRANSOPHALF_TOPIC

EndTransactionProcessor的processRequest方法,处理producer端回传的事务状态

如果事务状态是commit,将消息还原成原来的topic和queueId,存储到commitLog中,并且删除预处理消息(prepare),然后将消息存储在主题为:RMQSYSTRANSOPHALF_TOPIC的主题中,代表这些消息已经被处理(提交或回滚)。

如果事务状态是rollback,删除掉prepare消息,同样也是将消息存储在主题为:RMQSYSTRANSOPHALF_TOPIC的主题中,代表这些消息已经被处理。

如果事务状态是unknown,broker定时执行回查。

定时任务回查

如果第一次producer返回的事务消息为UNKNOW,则需要进行事务回查

事务回查,broker端主要逻辑在TransactionalMessageService的check方法

prepare消息,会存储在RMQSYSTRANSHALFTOPIC消息队列中

prepare消息,被处理成功后(消息状态是回滚或者提交),会存储在RMQSYSTRANSOPHALF_TOPIC消息队列中。

所以通过回查prepare消息队列,可以对一些失败的事务消息,进行重试。

为了充分利用commitLog顺序写的特性,

回查时,只要发送了回查消息,pepare消息消费队列消费进度会往前推动,同时往prepare消息队列写入一条新的消息,如果回查失败,新增的消息可以再次发送回查消息。

如果回查成功,可以根据op消息队列中的消息,判断重复,避免重复发送回查消息。

producer端事务回查处理逻辑主要在TransactionListener的checkLocalTransaction方法,一般重写checkLocalTransaction方法,实现自定义的回查逻辑。

默认询问 15 次。

原文链接:https://blog.csdn.net/zycxnanwang/article/details/107431892

事务消息代码示例

```java public class Producer { ​    public static void main(String[] args) throws Exception {        //1.创建消息生产者producer,并制定生产者组名        TransactionMQProducer producer = new TransactionMQProducer("group5");        //2.指定Nameserver地址        producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); ​        //添加事务监听器        producer.setTransactionListener(new TransactionListener() {            /*             * 在该方法中执行本地事务             * @param msg             * @param arg             * @return             */            @Override          public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {                            //接收到消息89757              try{                //1调用外部接口             //2判断外部接口成功更新数据库状态             //3判断外部接口失败不更新数据库状态                                  //如果走的是1和2 那么直接返回COMMIT_MESSAGE即可,这样不会触发回查。                return LocalTransactionState.COMMIT_MESSAGE;                                //如果走的是1和3                //这是正常情况 相当于2个事务都失败了                                  //执行了1没执行2(回查更新数据库)   或者 1和2都没执行(回查相当于重试)                  //目前存在的问题是如果调用外部接口成功 系统宕机,本地数据库没更新 那么返回的也是UNKNOW                //因为Producer调用executeLocalTransaction方法时系统宕机                //触发回查,判断外部接口有没有调用成功和数据库到底有没有更新,如果没有调用或者没有更新                //那么根据情况调用接口和更新数据库即可。                 return LocalTransactionState.UNKNOW;                                //还有一种情况是2个事务 A是本地事务 B是外部事务 C生成订单                //A执行成功 B执行失败                //比如A扣减库存B扣减余额,B的余额为0,扣减失败                //此时需要回滚A返回ROLLBACK_MESSAGE 此时消息不会发送给C                return LocalTransactionState.ROLLBACK_MESSAGE;                                  //总结                //本地事务执行成功,则返回COMMIT_MESSAGE,提交本地事务发送消息给外部事物系统                //本地事务执行失败,则返回ROLLBACK_MESSAGE                //本地事务执行异常 返回UNKNOW 执行回查。                               }catch(Exception e){                               }           } ​            /*             * 该方法是MQ进行消息事务状态回查             * @param msg             * @return             */            @Override            public LocalTransactionState checkLocalTransaction(MessageExt msg) {                System.out.println("消息的Tag:" + msg.getTags());                return LocalTransactionState.COMMIT_MESSAGE;           }       }); ​        //3.启动producer        producer.start(); ​

       //4.创建消息对象,指定主题Topic、Tag和消息体        /**             * 参数一:消息主题Topic             * 参数二:消息Tag             * 参数三:消息内容             */        Message msg = new Message("TransactionTopic", tags[i],                                 ("89757").getBytes());        //5.发送消息        SendResult result = producer.sendMessageInTransaction(msg, null);        //发送状态        SendStatus status = result.getSendStatus();        System.out.println("发送结果:" + result);        //线程睡1秒        TimeUnit.SECONDS.sleep(2); ​        //6.关闭生产者producer        //producer.shutdown();   } } ​ ```

事务消息源码解读

TransactionMQProducer#sendMessageInTransaction

1.发送Prepared消息

2.获取发送结果

3.发送成功

4.执行本地事务

5.发送确认消息

```java public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException { if (null == this.transactionListener) { throw new MQClientException("TransactionListener is null", null); } return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg); }

public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { //获取消息监听器 TransactionListener transactionListener = getCheckListener(); if (null == localTransactionExecuter && null == transactionListener) { throw new MQClientException("tranExecutor is null", null); } //校验消息 Validators.checkMessage(msg, this.defaultMQProducer);

SendResult sendResult = null;
//设置属性TRAN_MSG 为 true 即事务消息
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
//设置生产者组
MessageAccessor.putProperty(msg, 
                            MessageConst.PROPERTY_PRODUCER_GROUP,
                            this.defaultMQProducer.getProducerGroup());
try {
    //发送事务消息
    sendResult = this.send(msg);
} catch (Exception e) {
    throw new MQClientException("send message Exception", e);
}
        //默认消息状态为UNKNOWN
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
//获取发送消息结果
switch (sendResult.getSendStatus()) {
    case SEND_OK: {
        try {
            //根据sendResult重新设置消息的事务id
            if (sendResult.getTransactionId() != null) {
                msg.putUserProperty("__transactionId__",
                                    sendResult.getTransactionId());
            }
            String transactionId = msg
               .getProperty
                (MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);


            if (null != transactionId && !"".equals(transactionId)) {
                msg.setTransactionId(transactionId);
            }
            //localTransactionExecuter是null
            if (null != localTransactionExecuter) {
                localTransactionState = 
                    localTransactionExecuter
                                                .executeLocalTransactionBranch(msg, arg);
            } else if (transactionListener != null) {
                //transactionListener不为空
                log.debug("Used new transaction API");
                //调用了Producer中自定义的事务监听器的executeLocalTransaction方法
                //执行本地事务
                localTransactionState = transactionListener
                                            .executeLocalTransaction(msg, arg);
            }
            // 如果事务监听器执行完以后localTransactionState变成了null 
            // 即executeLocalTransaction方法的返回值是null
            // 设置消息状态为UNKNOWN
            if (null == localTransactionState) {
                localTransactionState = LocalTransactionState.UNKNOW;
            }
                                //如果不是COMMIT_MESSAGE 打印日志
            if (localTransactionState != 
                LocalTransactionState.COMMIT_MESSAGE) {
                log.info("executeLocalTransactionBranch return {}",
                                                                                                        localTransactionState);
                log.info(msg.toString());
            }
        } catch (Throwable e) {
            log.info("executeLocalTransactionBranch exception", e);
            log.info(msg.toString());
            localException = e;
        }
    }
    break;
    //发送消息失败
    case FLUSH_DISK_TIMEOUT:
    case FLUSH_SLAVE_TIMEOUT:
    case SLAVE_NOT_AVAILABLE:
        //如果从节点为空 设置消息状态为回滚 
        localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
        break;
    default:
        break;
}

try {
    //向服务器Broker发送事务消息的状态
    //服务器Broker会根据消息状态判断是发送消息(Commit)还是删除消息(Rollback)
    this.endTransaction
        (sendResult, localTransactionState, localException);
} catch (Exception e) {
    log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}

//所以一定要判断事务消息的发送结果 不要以为没有异常就是发送成功了
//也有可能是发送失败
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;

} ```

应用程序开启一个数据库事务,进行数据库操作,并且在事务中发送一条 PREPARE 消息,PREPARE 消息发送成功后通知应用程序记录本地事务状态,然后提交本地事务。

RocketMQ 在收到类型为 PREPARE 的消息时,首先备份消息的原主题与原消息消费队列,然后将消息存储在主题为 RMQSYSTRANSHALFTOPIC 的消息队列中,故 PREPARE 的消息是不会被客户端消费的。

Broker 消息服务器开启一个定时任务处理 RMQSYSTRANSHALFTOPIC 中的消息,会每隔指定时间向消息发送者发起事务状态查询请求 ,询问消息发送者客户端本地事务是否成功,然后根据回查状态决定是提交还是回滚,即对处于 PREPARE 状态进行提交或回滚操作。

发送者如果明确得知事务成功,则可以返回 COMMIT,服务端会提交该条消息,具体操作是恢复原消息的主题与队列,重新发送到 Broker,消费端感知后消费。

发送者如果无法明确得知事务状态,则返回 UNOWN,此时服务端会等待一定时间后再次向发送者询问,默认询问 15 次。

发送者如果非常明确得知事务失败,则可以返回 ROLLBACK。

在具体实践中,消息发送者在无法获取事务状态时不要武断的返回 ROLLBACK,而是要返回 UNOWN,让服务端定时重试回查,说明如下:

image.png

在将 PREPARE 消息发送到 Broker 后,服务端发起事务查询时本地事务可能还未提交,为了避免无效的事务回查机制,RocketMQ 通常至少在收到 PREPARE 消息 6s 后才会发起第一次事务回查,可通过 transactionTimeOut 配置。故客户端在实现事务回查时无法证明事务状态时不应该返回 ROLLBACK,而是返回 UNOWN。

6.应用场景:绑定管家卡

方案1:

1.执行开户,本地事务持久化

2.调用工行绑定管家卡接口

本地事务执行成功,此时宕机,调用第三方接口失败。

方案2:

1.调用工行绑定管家卡接口

2.执行开户,本地事务持久化

调用第三方接口成功,此时宕机,本地事务执行失败。

由于工行绑定管家卡是第三方接口,所以不受本地事务限制。

方案3:

1.发送事务消息

2.判断事务消息发送状态

3.如果发送成功,执行本地事务:开户持久化+调用绑定管家卡接口

4.判断本地事务:开户持久化+调用绑定管家卡接口返回的结果

5.如果结果是成功:什么也不做

6.如果结果是失败:那么等待回查

参考事务消息源码:https://blog.csdn.net/zycxnanwang/article/details/107431892

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/683282.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Midjourney使用教程:三 图片风格提示

这里我根据现在的官方文档来继续我们的Midjourney的教程,看到这里如果你去实践的话,估计你已经有了好多张属于自己的图片。 这时候你不在满足简单的提示生成的Midjourney的默认风格图片,实际上你可以通过一些关键词做提示,来改变…

初始网络原理

目录 网络发展史 独立模式 网络互连 局域网LAN 广域网WAN 网络通信基础 IP地址 端口号 认识协议 五元组 协议分层 OSI七层模型 TCP/IP五层(或四层) 网络设备所在分层 封装和分用 网络发展史 独立模式 独立模式:计算机之间相互…

第八十三天学习记录:计算机硬件技术基础:汇编语言程序设计

一、汇编语言指令 汇编语言的语句是在指令系统的基础上形成的,按其作用与编译情况分为两大类:指令性语句(符号指令)和指示性语句(伪指令)。 指令性语句是可执行语句,与机器指令相对应&#xff…

USB转换方案介绍

随着科技的不断发展,我们的生活中出现了越来越多的电子设备。然而,这些设备通常具有不同的连接端口和协议,这可能会使它们之间的连接变得困难。这时候,使用USB转换就成为了一种非常方便和实用的解决方法。 无论是在家庭、办公室还…

自动化测试——处理场景自动化测试场景详细,跟着上高速

目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 1、定位一组对象 …

城市消防应急通信三级作战网构建

项目背景 随着我国《消防信息化“十三五”总体规划》对消防信息化的发展规划做了统一部署,以城市为代表的消防通信成为专网通信行业重点关注的领域之一。目前,我国城市化发展面临高层建筑林立、地铁、人防工程分布密集,大型综合体不断涌现&a…

【运维】服务器系统安装 -- 服务器版

目录 一、环境 二、ubuntu 三、启动u盘制作 Stage 1:下载balena,制作U盘启动工具 Stage 2:下载Ubuntu 系统镜像(参考上一节:Ubuntu 22.04.2 LTS ) Stage 3:将镜像写入到U盘 四、设置开启…

FUZZ工具—Boofuzz框架实际使用

接着上一篇文章FUZZ工具—Boofuzz框架来对框架进行实际的使用; 官方提供了很多案例模板,且网上关于boofuzz的使用介绍很多,也比较成熟,在各个领域都有,可以通过官方提供的案例也看得出来,然后覆盖的面也非常…

西门子变频器G120XA的快速调试方法分享

以西门子变频器G120XA为例,接着为大家介绍一下G120X和G120XA系列变频器的快速调试方法。 西门子发布的Sinamics G120X和G120XA系列变频器,专为风机和泵的应用而设计,实现高效节能、可靠稳定和简单易用。以G120XA为例,通过下面的调…

locust学习教程(9)- event 事件

目录 1、对请求的测试前置、后置处理 2、在web界面添加新内容 3、监听测试的失败率或阀值 4、汇总总结 ​🎁更多干货 1、对请求的测试前置、后置处理 请求有一个上下文参数,通过数据有关的请求(之类的用户名,标签等&#xff…

双路高速 DA 实验

目录 双路高速 DA 实验 1、简介 2、实验任务 3、程序设计 3.1、hs_dual_da顶层模块代码 3.2、ROM 波形存储模块(rom_1024x10b) 创建单端口 ROM IP核 3.2、DA 数据发送模块(da_wave_send)代码 4、硬件设计 4.1、添加.xdc…

MongoDB数据库安装

MongoDB数据库 MongoDB数据的特点: 面相文档存储的分布式数据库 具有很强的扩展性 支持丰富的查询表达式,很接近于关系性数据库 使用类似于json的结构保存数据,可以轻易的查询到文档中内嵌的对象及数组 下载安装包 首先去官网下载安装…

用JAVA写一个下载器第2集

文章目录 一、开发环境及工具二、包名概览三、项目结构四、使用步骤1.编写代码Constant.java:Downloader.javaDownloaderTask.javaDownloadInfoThread.javaFileUtils.javaHttpUtils.javaLogUtils.javaMain.java 2.运行程序 总结 一、开发环境及工具 开发环境及工具…

如果开发说这不是Bug,你会怎么处理?

在项目过程中,如果开发说这个不是Bug,你的第一反应是什么? 不同的人有不同的处理方式,也许是如下几点:相信开发说的,开发说什么就是什么,问题关闭;自己不能决定,啥都上升…

GP232RNL——USB到UART桥接控制器

GP232RNL是一款高度集成的USB到UART桥接控制器,提供了一种简单的解决方案,可以使用最少的元器件和PCB空间,将RS232接口转换为USB接口。GP232RNL包括一个USB 2.0全速功能控制器、USB收发器、振荡器、EEPROM和带有完整的调制解调器控制信号的异…

日撸java三百行day69-70

文章目录 说明day69-70 矩阵分界1.基于矩阵分解的推荐系统(Funk-SVD算法)2.随机梯度下降(SGD)2.1 导数2.2 偏导数2.3 方向导数2.4 梯度2.5 随机梯度下降,与损失函数之间的关系 3.代码理解3.1 train() 方法3.2 mae方法&…

神经网络原理(2)

斯坦福大学的印度学生、机器学习爱好者 PararthShah 在2012年12月22日的使用买芒果的例子解释了神经网络,简单来说就是:如果你需要选芒果,但不知道什么样的芒果最好吃,一个简单粗暴的方法是尝遍所有的芒果,然后总结出个…

窗口函数之-前后函数(lag/lead)

窗口函数之-前后函数 应用:求同比增长、环比增长 lead(expression,n):返回当前行的后n行 > shift(-n) 数据超前n阶,与之对齐的就是后n行的数据lag(expression,n):返回当前行的前n行> shift(n)数据滞后n阶,与之对齐的就是前n行的数据 …

人工智能轨道交通行业周刊-第49期(2023.6.12-6.25)

本期关键词:设备智能维修、故障诊断、无人机巡查、车站联锁、LangChain、腾讯大模型 1 整理涉及公众号名单 1.1 行业类 RT轨道交通人民铁道世界轨道交通资讯网铁路信号技术交流北京铁路轨道交通网上榜铁路视点ITS World轨道交通联盟VSTR铁路与城市轨道交通RailMe…

高效进行接口测试,简单易懂!

目录 前言 正文 1.Api文档导入 2.后端接口测试 3.mock数据 4.测试集接口自动化 总结 前言 日常测试过程中,常常需要多种工具来接力完成自己的接口测试任务。 比如说, 使用swagger查看接口文档, 使用mock编造接口数据对前端页面做测试…