【Seata源码学习 】篇三 seata客户端全局事务开启、提交与回滚

news2024/11/18 20:31:59

1.GlobalTransactionalInterceptor 对事务方法对增强

我们已经知道 GlobalTransactionScanner 会给bean的类或方法上面标注有@GlobalTransactional 注解 和 @GlobalLock的 添加一个 advisor (DefaultPointcutAdvisor ,advisor = 绑定了PointCut 的 advise)

而此处的 DefaultPointcutAdvisor 的 advice 为 GlobalTransactionalInterceptor,PointCut 为 Pointcut.TRUE(匹配所有方法)

然后由 DynamicAdvisedInterceptor 回调函数 获取当前 bean对应的 AdvisedSupport , 从中遍历所有的 advisor,然后再获取 此 advisor 绑定的 PointCut 对目标方法进行匹配 ,如果满足,则添加到 拦截器链路中,后续递归调用

接下来我们看下 GlobalTransactionalInterceptor 对目标方法做了哪些增强

io.seata.spring.annotation.GlobalTransactionalInterceptor#invoke

 public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        Class<?> targetClass =
            methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        // Method.getDeclaringClass 获取方法声明的类 如果是静态方法,则返回方法所在类;如果是实例方法,则返回方法所在类的超类
        //!specificMethod.getDeclaringClass().equals(Object.class) 排除Object方法
        if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
            final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
            //默认包装生成的DefaultPointcutAdvisor 拦截所有的方法
            // public DefaultPointcutAdvisor(Advice advice) {
            //		this(Pointcut.TRUE, advice);
            //	}
            //因此在此处对方法进行过滤  只处理标注了@GlobalTransactional 和 @GlobalLock 注解的方法
            final GlobalTransactional globalTransactionalAnnotation =
                getAnnotation(method, targetClass, GlobalTransactional.class);
            final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
            boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
            if (!localDisable) {
                if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
                    //根据@GlobalTransactional 注解的属性封装事务切面信息
                    AspectTransactional transactional;
                    if (globalTransactionalAnnotation != null) {
                        transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
                            globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
                            globalTransactionalAnnotation.rollbackForClassName(),
                            globalTransactionalAnnotation.noRollbackFor(),
                            globalTransactionalAnnotation.noRollbackForClassName(),
                            globalTransactionalAnnotation.propagation(),
                            globalTransactionalAnnotation.lockRetryInterval(),
                            globalTransactionalAnnotation.lockRetryTimes());
                    } else {
                        transactional = this.aspectTransactional;
                    }
                    //处理全局事务
                    return handleGlobalTransaction(methodInvocation, transactional);
                } else if (globalLockAnnotation != null) {
                    //处理全局锁
                    return handleGlobalLock(methodInvocation, globalLockAnnotation);
                }
            }
        }
        return methodInvocation.proceed();
    }

2.全局事务执行器 TransactionalExecutor 的匿名实现

io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalTransaction

  Object handleGlobalTransaction(final MethodInvocation methodInvocation,
        final AspectTransactional aspectTransactional) throws Throwable {
        boolean succeed = true;
        try {
            return transactionalTemplate.execute(new TransactionalExecutor() {
                @Override
                public Object execute() throws Throwable {
    								// 责任链模式 继续执行链路上的其他拦截器方法,如果已经执行到最后一个
    								// 则直接执行目标方法
                    return methodInvocation.proceed();
                }

                public String name() {
                    String name = aspectTransactional.getName();
                    if (!StringUtils.isNullOrEmpty(name)) {
                        return name;
                    }
                    return formatMethod(methodInvocation.getMethod());
                }

                @Override
                public TransactionInfo getTransactionInfo() {
                    //注解解析
                    // reset the value of timeout
                    int timeout = aspectTransactional.getTimeoutMills();
                    if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
                        timeout = defaultGlobalTransactionTimeout;
                    }

                    TransactionInfo transactionInfo = new TransactionInfo();
                    transactionInfo.setTimeOut(timeout);
                    transactionInfo.setName(name());
                    transactionInfo.setPropagation(aspectTransactional.getPropagation());
                    transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval());
                    transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes());
                    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                    for (Class<?> rbRule : aspectTransactional.getRollbackFor()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (String rbRule : aspectTransactional.getRollbackForClassName()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (Class<?> rbRule : aspectTransactional.getNoRollbackFor()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    for (String rbRule : aspectTransactional.getNoRollbackForClassName()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    transactionInfo.setRollbackRules(rollbackRules);
                    return transactionInfo;
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {
            TransactionalExecutor.Code code = e.getCode();
            //根据异常类型执行不同的钩子方法
            switch (code) {
                case RollbackDone:
                    throw e.getOriginalException();
                case BeginFailure:
                    succeed = false;
                    failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case CommitFailure:
                    succeed = false;
                    failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackFailure:
                    failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                case RollbackRetrying:
                    failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                default:
                    throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
            }
        } finally {
            if (degradeCheck) {
                EVENT_BUS.post(new DegradeCheckEvent(succeed));
            }
        }
    }

将事务参数信息封装到TransactionInfo对象中,如事务传播行为,超时时间,超时重试次数,异常回滚类型等等,

如果出现异常,则根据异常类型分别执行不同的钩子方法

并且会继续回调拦截器链路上的其他拦截器方法

3.TransactionalTemplate 全局事务执行过程的模版方法

io.seata.tm.api.TransactionalTemplate#execute

public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. Get transactionInfo
        //获取@GlobalTransation注解的属性封装的TransactionInfo
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
        // GlobalTransactionContext 全局事务上下文对象 用于创建一个新事务,或者获取当前事务
        // GlobalTransactionContext.getCurrent - > RootContext.getXID -> ContextCore.get
        // ContextCore 是一个接口 seata有两个实现  FastThreadLocalContextCore  ThreadLocalContextCore 都是基于ThreadLocal存储XID
        GlobalTransaction tx = GlobalTransactionContext.getCurrent();

        // 1.2 Handle the transaction propagation.
        // 获取当前事务的传播行为
        Propagation propagation = txInfo.getPropagation();
        // 用于存储被挂起的事务XID
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try {
            //处理事务的传播行为
            switch (propagation) {
                //如果当前事务的传播行为是 NOT_SUPPORTED 则以非事务的方式执行调用methodInvocation.proceed()
                // 如果当前拦截器不为拦截链的最后一个,则将获取下一个拦截器执行invoke方法,如果是最后一个,则直接执行目标方法
                case NOT_SUPPORTED:
                    // If transaction is existing, suspend it.
                    //如果当前存在全局事务,则挂起当前事务
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                    }
                    // Execute without transaction and return.
                    // 继续执行拦截器链
                    return business.execute();
                case REQUIRES_NEW:
                    // If transaction is existing, suspend it, and then begin new transaction.
                    // 如果当前存在事务 则挂起当前事务 并创建一个新的事务
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                        tx = GlobalTransactionContext.createNew();
                    }
                    // Continue and execute with new transaction
                    break;
                case SUPPORTS:
                    // If transaction is not existing, execute without transaction.
                    // 如果不存在事务 则跳过当前事务拦截器 执行拦截器链并返回
                    if (notExistingTransaction(tx)) {
                        return business.execute();
                    }
                    // Continue and execute with new transaction
                    break;
                case REQUIRED:
                    // If current transaction is existing, execute with current transaction,
                    // else continue and execute with new transaction.
                    break;
                case NEVER:
                    // If transaction is existing, throw exception.
                    // 有事务抛出异常
                    if (existingTransaction(tx)) {
                        throw new TransactionException(
                            String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                    , tx.getXid()));
                    } else {
                        // Execute without transaction and return.
                        return business.execute();
                    }
                case MANDATORY:
                    // If transaction is not existing, throw exception.
                    // 要求必须有事务,没事务抛出异常
                    if (notExistingTransaction(tx)) {
                        throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                    }
                    // Continue and execute with current transaction.
                    break;
                default:
                    throw new TransactionException("Not Supported Propagation:" + propagation);
            }

            // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
            // 如果当前的事务上下文中不存在事务,实例化默认全局事务对象 且此次事务发起为 TM 角色为 Launcher
            if (tx == null) {
                tx = GlobalTransactionContext.createNew();
            }

            // set current tx config to holder
            // 记录当前的全局锁配置,存放到 ThreadLocal
            GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

            try {
                // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
                //    else do nothing. Of course, the hooks will still be triggered.
                // 执行全局事务开启的前后置钩子方法
                // 如果当前事务的角色是 Participant 也就是 RM ,判断当前事务上下文RootContext是否存在XID,如果不存在,抛出异常
                // 如果当前事务的角色是 launcher 也就是 TM ,判断当前事务上下文RootContext是否存在XID,如果存在,抛出异常
                // 如果不存在,则通过TmNettyRemotingClient  向TC发送一个 GlobalReportRequest 同步消息,并获取TC返回的XID,绑定到RootContext
                beginTransaction(txInfo, tx);

                Object rs;
                try {
                    // Do Your Business
                    // 执行执行拦截器链路
                    rs = business.execute();
                } catch (Throwable ex) {
                    // 3. The needed business exception to rollback.
                    // 如果抛出异常,判断异常是否在指定的范围中(默认为Throwable类及其子类)
                    // 执行异常回滚的前后钩子方法
                    // 如果当前事务的角色是 launcher 也就是 TM ,通过TmNettyRemotingClient  向TC发送一个 GlobalRollbackRequest 同步消息
                    // 并记录TC返回的当前事务状态Status
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }

                // 4. everything is fine, commit.
                //  如果方法执行过程中没有出现异常
                //  执行事务提交的前后置方法
                //  如果当前事务的角色是 launcher 也就是 TM ,通过TmNettyRemotingClient  向TC发送一个 GlobalCommitRequest 同步消息
                // 并记录TC返回的当前事务状态Status
                commitTransaction(tx);

                return rs;
            } finally {
                //5. clear
                // 恢复以前的全局锁配置
                resumeGlobalLockConfig(previousConfig);
                // 执行整个事务完成的前后置方法
                triggerAfterCompletion();
                // 移除当前绑定的事务钩子对象
                cleanUp();
            }
        } finally {
            // If the transaction is suspended, resume it.
            // 当前事务执行完毕后,恢复挂起的事务,
            // 获取suspendedResourcesHolder关联的xid,由RootContext重新绑定
            if (suspendedResourcesHolder != null) {
                tx.resume(suspendedResourcesHolder);
            }
        }
    }

判断RootContext中是否绑定了XID,如果没有绑定,说明当前不存在事务返回null,如果有绑定XID,则返回默认的GlobalTransaction实现,记录当前全局事务的状态为beging,且为事务的参与者participate。接下来根据全局事务不同的传播行为,进一步判断需不需要挂起当前的全局事务,或者跳过事务处理,如果当前的传播行为要求有一个事务,而当前不存在全局事务(GlobalTransaction对象为null),则无参实例化GlobalTransaction,默认为事务的发起者luancher,事务状态未知

接着执行不同的钩子方法,且都是由事务发起者luancher使用 TmNettyRemotingClient 与 TC 通信,发送GlobalReportRequest消息,如果链路执行顺利,则发送GlobalCommitRequest消息,如果出现异常,发送GlobalRollbackRequest

4.总结与流程图

seata客户端模版方法执行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1226205.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

更新文章分类

CategoryController PutMappingpublic Result update(RequestBody Validated Category category){categoryService.update(category);return Result.success();} CategoryService //更新分类void update(Category category); CategoryServiceImpl Overridepublic void update(…

大数据Doris(二十六):数据导入(Routine Load)介绍

文章目录 数据导入(Routine Load)介绍 一、​​​​​​​适用场景

asp.net智能考试系统VS开发sqlserver数据库web结构c#编程计算机网页项目

一、源码特点 asp.net 智能考试系统 是一套完善的web设计管理系统&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。 系统运行视频 https://www.bilibili.com/video/BV1gz4y1A7Qp/ 二、功能介绍 本系统使用Microsoft Visual Studio 201…

数据结构【DS】队列

队列&#xff1a;只允许在表尾(队尾)进行插入&#xff0c;而在表头(队头)进行删除的线性表。 循环队列 初始(队空)时&#xff1a; &#x1d478;.&#x1d487;&#x1d493;&#x1d490;&#x1d48f;&#x1d495;&#x1d478;.&#x1d493;&#x1d486;&#x1d482;&am…

iOS源码-工程目录讲解

1、 工程目录 1.1、xib 主要的界面渲染控制&#xff0c;ios开发常用的界面&#xff0c;可以在这里快速开发出来 1.2、base 基本的类&#xff0c;子类继承base类&#xff0c;就具备父类的方法&#xff0c;无需在重写 1.3、util 基础的类一些&#xff0c;处理时间等 1.4、…

Schrodinger Shape Screen 工具使用方法

schrodinger的shape screen方法是一种基于ligand的筛选方法。需要提供一个参考分子&#xff0c;和需要筛选的分子库。shape screen可以根据原子类型、药效团对分子的形状相似度进行打分。 shape screen面板 shape screen面板如下&#xff1a; 1. 参考分子来源&#xff0c;可以…

Unity使用Visual Studio Code 调试

Unity 使用Visual Studio Code 调试C# PackageManager安装Visual Studio EditorVisual Studio Code安装Unity 插件修改Unity配置调试 PackageManager安装Visual Studio Editor 打开 Window->PackageManger卸载 Visual Studio Code Editor &#xff0c;这个已经被官方废弃安…

数据结构【DS】树的性质

度为m的树 m叉树 至少有一个节点的度m 允许所有节点的度都<m 一定是非空树&#xff0c;至少有m1个节点 可以是空树 节点数 总度数 1m叉树&#xff1a; 高度为h的m叉树 节点数最少为&#xff1a;h具有n个结点的m叉树 最大高度&#xff1a;n度为m的树&#xff1a; 具有…

portraiture2024ps磨皮插件参数设置教程

ps磨皮插件一般是第三方软件&#xff0c;通过安装的方式放在ps的相关文件夹中。但也有一些插件是放置在系统软件目录的&#xff0c;不与ps文件放在一起。本文会给大家具体介绍以上两种不同的情况&#xff0c;方便大家了解ps磨皮插件放在哪个文件夹&#xff0c;ps的磨皮插件在哪…

【网络】OSI模型 与 TCP/IP模型 对比

一、OSI模型 OSI模型包含7个层次&#xff0c;从下到上分别是&#xff1a; 1. 物理层&#xff08;Physical Layer&#xff09; - 功能&#xff1a;处理与电子设备物理接口相关的细节&#xff08;如电压、引脚布局、同步&#xff0c;等等&#xff09;。 - 协议&#xff1a;以…

从关键新闻和最新技术看AI行业发展(2023.11.6-11.19第十期) |【WeThinkIn老实人报】

Rocky Ding 公众号&#xff1a;WeThinkIn 写在前面 【WeThinkIn老实人报】旨在整理&挖掘AI行业的关键新闻和最新技术&#xff0c;同时Rocky会对这些关键信息进行解读&#xff0c;力求让读者们能从容跟随AI科技潮流。也欢迎大家提出宝贵的优化建议&#xff0c;一起交流学习&…

系列一、堆里面的分区:Eden、From、To、老年代各自的特点

一、堆里面的分区&#xff1a;Eden、From、To、老年代各自的特点 堆是对象共享的区域&#xff0c;也是垃圾回收器主要工作的地方。主要分为新生区、养老区和元空间&#xff0c;而这三块地方中GC主要工作在新生区和养老区&#xff0c;其中新生区占1/3、养老区占2/3&#xff0c;新…

airlearning-ue4安装的踩坑记录

最近要安装airlearning-ue4&#xff0c;用于实现无人机仿真环境&#xff0c;该项目地址为&#xff1a;GitHub - harvard-edge/airlearning-ue4: Environment Generator for Air Learning Project. This version is build on top of UE4 game engine 由于这个项目已经完成好几年…

Ubuntu20.04 安装微信 【deepin-wine方式极简安装】推荐,两行命令就解决了。

参考git仓库地址: GitHub - zq1997/deepin-wine: 【deepin源移植】Debian/Ubuntu上最快的QQ/微信安装方式【deepin源移植】Debian/Ubuntu上最快的QQ/微信安装方式. Contribute to zq1997/deepin-wine development by creating an account on GitHub.https://github.com/zq199…

项目全生命周期阶段检查单

项目全生命周期阶段检查单 1、立项阶段 2、计划阶段 3、需求阶段 4、设计阶段 5、编码集成阶段 6、测试阶段 7、交付阶段 8、结项阶段

PyTorch微调权威指南3:使用数据增强

如果你曾经参与过 PyTorch 模型的微调&#xff0c;可能会遇到 PyTorch 的内置变换函数&#xff0c;这使得数据增强变得轻而易举。 即使你之前没有使用过这些功能&#xff0c;也不必担心。 在本文中&#xff0c;我们将深入研究 PyTorch 变换换函数的世界。 我们将探索你可以使用…

【计算机毕业设计】Springboot 社区助老志愿服务系统-96682, 免费送源码,【开题选题+程序定制+论文书写+答辩ppt书写-原创定制程序】

Springboot 社区助老志愿服务系统 摘要 大数据时代下&#xff0c;数据呈爆炸式地增长。为了迎合信息化时代的潮流和信息化安全的要求&#xff0c;利用互联网服务于其他行业&#xff0c;促进生产&#xff0c;已经是成为一种势不可挡的趋势。在图书馆管理的要求下&#xff0c;开发…

【Linux】C文件系统详解(三)——如何理解缓冲区以及自主封装一个文件接口

文章目录 如何理解缓冲区现象概念:文件缓冲区为什么要有缓冲区缓冲区在哪里 自己封装一个简单的文件接口自主封装目标 代码关于缓冲区强制刷新内核 关于字符串格式化函数printf和scanf函数 如何理解缓冲区 以前写过一个进度条, 有一个输出缓冲区->这个缓冲区在哪里,为什么要…

五、Linux目录结构

1.基本介绍 1.Linux的文件系统是采用级层式的树状目录结构&#xff0c;在此结构中的最上层是根目录"r/"&#xff0c;然后在此目录下再创建其他的目录。 2.深刻理解linux树状文件目录是非常重要的 3.记住一句经典的话&#xff1a;在Linux世界里&#xff0c;一切皆文件…

9、传统计算机视觉 —— 边缘检测

本节介绍一种利用传统计算机视觉方法来实现图片边缘检测的方法。 什么是边缘检测? 边缘检测是通过一些算法来识别图像中物体之间,或者物体与背景之间的边界,也就是边缘。 边缘通常是图像中灰度变化显著的地方,标志着不同区域的分界线。 在一张图像中,边缘可以是物体的…