Seata TC端协调全局事务

news2024/9/21 22:25:48

 1、Seata server注册器

//来自RM分支事务注册
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
//开启全局事务
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
//提交全局事务
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
//回滚全局事务
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
//RM分支事务提交
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
//RM分支事务回滚
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);

2、DefaultCoordinator协调者

TC开启全局事务
io.seata.server.coordinator.DefaultCoordinator#doGlobalBegin
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
            timeout);
        MDC.put(RootContext.MDC_KEY_XID, session.getXid());
        session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

        session.begin();

        // transaction start event
        eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
            session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));

        return session.getXid();
    }
TC提交全局事务
io.seata.server.coordinator.DefaultCore#doGlobalCommit   
   @Override
    public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
        boolean success = true;
        // start committing event
        eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
            globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
            globalSession.getBeginTime(), null, globalSession.getStatus()));

        if (globalSession.isSaga()) {
            success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
        } else {
            Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
                // if not retrying, skip the canBeCommittedAsync branches
                if (!retrying && branchSession.canBeCommittedAsync()) {
                    return CONTINUE;
                }

                BranchStatus currentStatus = branchSession.getStatus();
                if (currentStatus == BranchStatus.PhaseOne_Failed) {
                    globalSession.removeBranch(branchSession);
                    return CONTINUE;
                }
                try {
                    BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);

                    switch (branchStatus) {
                        case PhaseTwo_Committed:
                            globalSession.removeBranch(branchSession);
                            return CONTINUE;
                        case PhaseTwo_CommitFailed_Unretryable:
                            if (globalSession.canBeCommittedAsync()) {
                                LOGGER.error(
                                    "Committing branch transaction[{}], status: PhaseTwo_CommitFailed_Unretryable, please check the business log.", branchSession.getBranchId());
                                return CONTINUE;
                            } else {
                                SessionHelper.endCommitFailed(globalSession);
                                LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
                                return false;
                            }
                        default:
                            if (!retrying) {
                                globalSession.queueToRetryCommit();
                                return false;
                            }
                            if (globalSession.canBeCommittedAsync()) {
                                LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",
                                    branchSession.getBranchId(), branchStatus);
                                return CONTINUE;
                            } else {
                                LOGGER.error(
                                    "Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
                                return false;
                            }
                    }
                } catch (Exception ex) {
                    StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
                        new String[] {branchSession.toString()});
                    if (!retrying) {
                        globalSession.queueToRetryCommit();
                        throw new TransactionException(ex);
                    }
                }
                return CONTINUE;
            });
            // Return if the result is not null
            if (result != null) {
                return result;
            }
            //If has branch and not all remaining branches can be committed asynchronously,
            //do print log and return false
            if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
                LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
                return false;
            }
        }
        // if it succeeds and there is no branch, retrying=true is the asynchronous state when retrying. EndCommitted is executed to improve concurrency performance, and the global transaction ends..
        if (success && globalSession.getBranchSessions().isEmpty() && retrying) {
            SessionHelper.endCommitted(globalSession);

            // committed event
            eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
                globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
                globalSession.getBeginTime(), System.currentTimeMillis(), globalSession.getStatus()));

            LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
        }
        return success;
    }

    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
        if (globalSession == null) {
            return GlobalStatus.Finished;
        }
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // just lock changeStatus

        boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
            // Highlight: Firstly, close the session, then no more branch can be registered.
            globalSession.closeAndClean();
            if (globalSession.getStatus() == GlobalStatus.Begin) {
                if (globalSession.canBeCommittedAsync()) {
                    globalSession.asyncCommit();
                    return false;
                } else {
                    globalSession.changeStatus(GlobalStatus.Committing);
                    return true;
                }
            }
            return false;
        });

        if (shouldCommit) {
            boolean success = doGlobalCommit(globalSession, false);
            //If successful and all remaining branches can be committed asynchronously, do async commit.
            if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
                globalSession.asyncCommit();
                return GlobalStatus.Committed;
            } else {
                return globalSession.getStatus();
            }
        } else {
            return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
        }
    }
TC回滚全局事务
io.seata.server.coordinator.DefaultCoordinator#doGlobalRollback
  @Override
    public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
        boolean success = true;
        // start rollback event
        eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(),
                GlobalTransactionEvent.ROLE_TC, globalSession.getTransactionName(),
                globalSession.getApplicationId(),
                globalSession.getTransactionServiceGroup(), globalSession.getBeginTime(),
                null, globalSession.getStatus()));

        if (globalSession.isSaga()) {
            success = getCore(BranchType.SAGA).doGlobalRollback(globalSession, retrying);
        } else {
            Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {
                BranchStatus currentBranchStatus = branchSession.getStatus();
                if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
                    globalSession.removeBranch(branchSession);
                    return CONTINUE;
                }
                try {
                    BranchStatus branchStatus = branchRollback(globalSession, branchSession);
                    switch (branchStatus) {
                        case PhaseTwo_Rollbacked:
                            globalSession.removeBranch(branchSession);
                            LOGGER.info("Rollback branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                            return CONTINUE;
                        case PhaseTwo_RollbackFailed_Unretryable:
                            SessionHelper.endRollbackFailed(globalSession);
                            LOGGER.info("Rollback branch transaction fail and stop retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                            return false;
                        default:
                            LOGGER.info("Rollback branch transaction fail and will retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                            if (!retrying) {
                                globalSession.queueToRetryRollback();
                            }
                            return false;
                    }
                } catch (Exception ex) {
                    StackTraceLogger.error(LOGGER, ex,
                        "Rollback branch transaction exception, xid = {} branchId = {} exception = {}",
                        new String[] {globalSession.getXid(), String.valueOf(branchSession.getBranchId()), ex.getMessage()});
                    if (!retrying) {
                        globalSession.queueToRetryRollback();
                    }
                    throw new TransactionException(ex);
                }
            });
            // Return if the result is not null
            if (result != null) {
                return result;
            }

            // In db mode, there is a problem of inconsistent data in multiple copies, resulting in new branch
            // transaction registration when rolling back.
            // 1. New branch transaction and rollback branch transaction have no data association
            // 2. New branch transaction has data association with rollback branch transaction
            // The second query can solve the first problem, and if it is the second problem, it may cause a rollback
            // failure due to data changes.
            GlobalSession globalSessionTwice = SessionHolder.findGlobalSession(globalSession.getXid());
            if (globalSessionTwice != null && globalSessionTwice.hasBranch()) {
                LOGGER.info("Rollbacking global transaction is NOT done, xid = {}.", globalSession.getXid());
                return false;
            }
        }
        if (success) {
            SessionHelper.endRollbacked(globalSession);

            // rollbacked event
            eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(),
                    GlobalTransactionEvent.ROLE_TC, globalSession.getTransactionName(),
                    globalSession.getApplicationId(),
                    globalSession.getTransactionServiceGroup(),
                    globalSession.getBeginTime(), System.currentTimeMillis(),
                    globalSession.getStatus()));

            LOGGER.info("Rollback global transaction successfully, xid = {}.", globalSession.getXid());
        }
        return success;
    }
    @Override
    public GlobalStatus rollback(String xid) throws TransactionException {
        GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
        if (globalSession == null) {
            return GlobalStatus.Finished;
        }
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // just lock changeStatus
        boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {
            globalSession.close(); // Highlight: Firstly, close the session, then no more branch can be registered.
            if (globalSession.getStatus() == GlobalStatus.Begin) {
                globalSession.changeStatus(GlobalStatus.Rollbacking);
                return true;
            }
            return false;
        });
        if (!shouldRollBack) {
            return globalSession.getStatus();
        }

        doGlobalRollback(globalSession, false);
        return globalSession.getStatus();
    }

RM注册分支事务到TC

io.seata.server.coordinator.DefaultCoordinator#doBranchRegister    
@Override
    protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response,
                                    RpcContext rpcContext) throws TransactionException {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        response.setBranchId(
                core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),
                        request.getXid(), request.getApplicationData(), request.getLockKey()));
    }

RM提交分支事务到TC

RM回滚分支事务到TC

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1381483.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL夯实之路-事务详解

事务四大特性 事务需要通过严格的acid测试。Acid表示原子性,一致性,隔离性,持久性。 原子性(atomicity) 事务是不可分割的最小单元,对于整个事务的操作,要么全部提交成功,要么全部…

Linux———ps命令详解

目录 ps 命令("process status" 的缩写。) 常用选项和参数: a:显示所有用户的进程,包括其他用户的进程。​ u:显示详细的进程信息,包括进程的所有者、CPU 使用率、内存使用量等。…

白嫖aws创建Joplin server服务器

网上有很多的Joplin服务器的搭建教程,但是基本都是抄来抄去,对初学者实在是太不友好了。 话不多说,说干就干,自己从头找资料搭了一个,这可能是全网最好的Joplin服务器搭建教程了。 aws服务器 aws的服务器还是很香的&…

MAVROS的进一步理解

一、Mavros简介 顾名思义, mavros就是mavlinkros。mavros是PX4官方提供的一个运行于ros下收发mavlink消息的工具,利用mavros可以发送mavlink消息给飞控(可以控制飞机),并且可以从飞控中接受数据(例如:飞控的位置速度 IMU数据等等…

论文阅读_训练大模型用于角色扮演

英文名称: Character-LLM: A Trainable Agent for Role-Playing 中文名称: 角色-LLM:训练Agent用于角色扮演 文章: [https://arxiv.org/abs/2310.10158](https://arxiv.org/abs/2310.10158) 作者: Yunfan Shao, Linyang Li, Junqi Dai, Xipeng Qiu 机构: 复旦大学…

使用numpy处理图片——分离通道

大纲 读入图片分离通道堆叠法复制修改法 生成图片 在《使用numpy处理图片——滤镜》中,我们剥离了RGB中的一个颜色,达到一种滤镜的效果。 如果我们只保留一种元素,就可以做到PS中分离通道的效果。 读入图片 import numpy as np import PIL.…

【UE Niagara学习笔记】04 - 火焰喷射时的黑烟效果

目录 效果 步骤 一、创建烟雾材质 二、添加新的发射器 三、设置新发射器 3.1 删除Color模块 3.2 减少生成的粒子数量 3.3 设置粒子初始颜色 3.4 设置烟雾的位置偏移 3.5 设置烟雾淡出 在上一篇博客(【UE Niagara学习笔记】03 - 火焰喷射效果&#xf…

【开源】基于JAVA+Vue+SpringBoot的医院门诊预约挂号系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 功能性需求2.1.1 数据中心模块2.1.2 科室医生档案模块2.1.3 预约挂号模块2.1.4 医院时政模块 2.2 可行性分析2.2.1 可靠性2.2.2 易用性2.2.3 维护性 三、数据库设计3.1 用户表3.2 科室档案表3.3 医生档案表3.4 医生放号…

5文件操作

包含头文件<fstream> 操作文件三大类&#xff1a; ofstream : 写文件ifstream &#xff1a;读文件fstream : 读写文件 5.1文本文件 -文件以ascii的形式存储在计算机中 5.1.1写文件 步骤&#xff1a; 包含头文件 #include "fstream"创建流对象 ofs…

【Linux运维】LVM和RAID学习及实践

LVM和RAID学习及实践 背景LVM简介新加硬盘的操作RAID-磁盘阵列应用场景RAID0RAID1其他结构RAID制作RAID 小结 背景 某台服务器的磁盘管理需要自己动手处理&#xff0c;找了一些资料也踩了一些坑&#xff0c;在这里记录一下&#xff0c;先介绍一下LVM和RAID这两个东西。在计算机…

Java实现在线编辑预览office文档

文章目录 1 在线编辑1.1 PageOffice简介1.2 前端项目1.2.1 配置1.2.2 页面部分 1.3 后端项目1.3.1 pom.xml1.3.2 添加配置1.3.3 controller 2 在线预览2.1 引言2.2 市面上现有的文件预览服务2.2.1 微软2.2.2 Google Drive查看器2.2.3 阿里云 IMM2.2.4 XDOC 文档预览2.2.5 Offic…

逆变器3前级推免(高频变压器)

一节电池标压是在2.8V—4.2V之间&#xff0c;所以24V电压需要大概七节电池串联。七节电池电压大概在19.6V—29.4V之间。 从24V的电池逆变到到220V需要升压的过程。那么我们具体需要升压到多少&#xff1f; 市电AC220V是有效值电压&#xff0c;峰值电压是220V*1.414311V 如果…

ubuntu安装node

1 下载 node 官网下载 如果需要其他版本&#xff0c;点击上图的Other Downloads 这里下载的版本是20.11.0 Linux Binaries (x64)&#xff0c;下载下来后是node-v20.11.0-linux-x64.tar.xz这样的格式&#xff0c;直接右键解压得到如下目录&#xff1a; 直接拷贝该文件夹到指定目…

一些前端学习过程的自测练习题

目录 页面设计部分 1 设计一个简单的学院网站首页&#xff1b; 2.按照图示要求完成简单的登录页面 3.完成如下网站设计 4.完成如下网站设计&#xff08;练习页面布局&#xff09; 5 利用下面素材&#xff0c;设计一个满足H5规范的网页&#xff08;移动端页面练习&#xff…

有道云笔记编辑 Markdown 文件 - GitHub README.md

有道云笔记编辑 Markdown 文件 - GitHub README.md 1. 新建 -> Markdown2. GitHub README.mdReferences 1. 新建 -> Markdown ​ 2. GitHub README.md ​​​ References [1] Yongqiang Cheng, https://yongqiang.blog.csdn.net/

group by 查询慢的话,如何优化?

1、说明 根据一定的规则&#xff0c;进行分组。 group by可能会慢在哪里&#xff1f;因为它既用到临时表&#xff0c;又默认用到排序。有时候还可能用到磁盘临时表。 如果执行过程中&#xff0c;会发现内存临时表大小到达了上限&#xff08;控制这个上限的参数就是tmp_table…

中通快递批量查询方法

你是否经常需要处理大量的中通快递单号&#xff0c;却苦于一个个等待查询&#xff1f;现在&#xff0c;有了固乔快递查询助手&#xff0c;这个问题迎刃而解&#xff01;通过批量查询功能&#xff0c;你可以轻松管理、追踪你的中通快递单号&#xff0c;大大提高工作效率。 一、下…

spark中Rdd依赖和SparkSQL介绍--学习笔记

1&#xff0c;RDD的依赖 1.1概念 rdd的特性之一 相邻rdd之间存在依赖关系&#xff08;因果关系&#xff09; 窄依赖 每个父RDD的一个Partition最多被子RDD的一个Partition所使用 父rdd和子rdd的分区是一对一&#xff08;多对一&#xff09; 触发窄依赖的算子 map()&…

提交代码,SVN被锁定,提示:svn is already locked解决方案

今天遇到一个问题&#xff0c;svn 在提交代码的时候出现了svn is already locked&#xff0c;解决方案如下图 点击clean up 点击ok即可 来看官方对clean up的解释&#xff1a;它的作用就是查找工作拷贝中的所有遗留的日志文件&#xff0c;删除进程中工作拷贝的锁。 参考&…

掌握 Vue 响应式系统,让数据驱动视图(上)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…