Seata源码学习(五)- Seata服务端(TC)源码解读

news2025/1/23 5:58:55

Seata源码分析- Seata服务端(TC)源码解读

上节课我们已经分析到了SQL语句最终的执行器,但是再往下分析之前,我们需要先来分析一下TM客户端与TC端通讯以后,TC端的具体操作

服务端表解释

我们的Seata服务端在应用的时候需要准备三张表,那么这三张表分别代表的意思就是

  1. branch_table 分支事务表
  2. global_table 全局事务表
  3. lock_table 全局锁表

客户端请求服务端以后,我们就需要把对应的全局事务包括分支事务和全局锁全部存放到这里。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-54n1wCiF-1676380289869)(image-20220311185545970-1647008601419.png)]

TC服务端启动入口

那么我们任何的Java工程启动都需要主函数main,所以我们就从这里入手,首先在seata源码工程中搜索这个入口

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PemtLXu0-1676380289870)(image-20220311175232256.png)]

这里我们看Server.java这里就是启动入口,在这个入口中找到协调者,因为TC整体的操作就是协调整体的全局事务

// 协调协调者
DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);

全局事务开始方法分析

然后进入到其中我们可以看到很多的全局事务处理的方法

// 处理全局事务开始
@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
    throws TransactionException {
    // 响应客户端XID
    response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
                               request.getTransactionName(), request.getTimeout()));
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
                    rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
    }
}

// 处理全局提交
@Override
protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
    throws TransactionException {
    MDC.put(RootContext.MDC_KEY_XID, request.getXid());
    response.setGlobalStatus(core.commit(request.getXid()));
}

// 处理全局回滚
@Override
protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response,
                                RpcContext rpcContext) throws TransactionException {
    MDC.put(RootContext.MDC_KEY_XID, request.getXid());
    response.setGlobalStatus(core.rollback(request.getXid()));
}
.....

在这其中我们首先关注doGlobalBegin方法中的core.begin()方法,来看一下具体操作

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    // 创建全局事务Session
    GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
                                                              timeout);
    MDC.put(RootContext.MDC_KEY_XID, session.getXid());

    // 为Session中添加回调监听 SessionHolder.getRootSessionManager()去获取一个全局Session管理器DataBaseSessionManager
    // 观察者设计模式
    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

    // 全局事务开启
    session.begin();

    // transaction start event
    eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
                                             session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));

    return session.getXid();
}

在向下我们要关注一下全局Session管理器DataBaseSessionManager,进入到getRootSessionManager()方法中

/**
* Gets root session manager.
* 获取一个全局Session管理器
* @return the root session manager
*/
public static SessionManager getRootSessionManager() {
    if (ROOT_SESSION_MANAGER == null) {
        throw new ShouldNeverHappenException("SessionManager is NOT init!");
    }
    return ROOT_SESSION_MANAGER;
}

这个管理器如何生成的那,我们可以看一下init初始化方法

public static void init(String mode) {
    if (StringUtils.isBlank(mode)) {
        mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE);
    }
    // 判断Seata模式,当前为DB
    StoreMode storeMode = StoreMode.get(mode);
    if (StoreMode.DB.equals(storeMode)) {
        // 通过SPI机制读取SessionManager接口实现类,读取的是META-INF.service目录,在通过反射机制创建对象DataBaseSessionManager
        ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
        ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
....
                                                                      
}

读取的文件

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-f9FDfGGf-1676380289870)(image-20220311182615967.png)]

再回到begin方法中,我们就知道DataBaseSessionManager是如何创建的,包括下面这一步就是创建DataBaseSessionManager

// 观察者设计模式,创建DataBaseSessionManager
 session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

但是此时有一个问题,就是我们的init方法在哪里调用的拿,其实我们回到Server中,我们发现在构建默认协调者之前就调用了init方法,说明在执行处理全局事务开始之前,就已经创建好了这个SessionManager了

SessionHolder.init(parameterParser.getStoreMode());

// 默认协调者
DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);

好了此时分析清楚如何得到这个SessionManager以后,我们在回过头来看代码session.begin()位置

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    // 创建全局事务Session
    GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
                                                              timeout);
    MDC.put(RootContext.MDC_KEY_XID, session.getXid());

    // 为Session中添加回调监听 SessionHolder.getRootSessionManager()去获取一个全局Session管理器DataBaseSessionManager
    // 观察者设计模式,创建DataBaseSessionManager
    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

    // 全局事务开始
    session.begin();

    // transaction start event
    eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
                                             session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));

    return session.getXid();
}

session.begin()

@Override
public void begin() throws TransactionException {
    // 声明全局事务开始
    this.status = GlobalStatus.Begin;
    // 开始时间
    this.beginTime = System.currentTimeMillis();
    // 激活全局事务
    this.active = true;
    // 将SessionManager放入到集合中,调用onBegin方法
    for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
        lifecycleListener.onBegin(this);
    }
}

这里我们来看一下 onBegin方法,调用的是父级的方法,在这其中我们要关注addGlobalSession方法,但是要注意,这里我们用的是db模式所以调用的是db模式的DateBaseSessionManager

@Override
public void onBegin(GlobalSession globalSession) throws TransactionException {
    addGlobalSession(globalSession);
}

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8PPVVtOi-1676380289871)(image-20220311184728236.png)]

@Override
public void addGlobalSession(GlobalSession session) throws TransactionException {
    if (StringUtils.isBlank(taskName)) {
        // 写入session
        boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);
        if (!ret) {
            throw new StoreException("addGlobalSession failed.");
        }
    } else {
        boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);
        if (!ret) {
            throw new StoreException("addGlobalSession failed.");
        }
    }
}

然后我们来看写入这里

@Override
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
    // 第一次进入一定是写入
    if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
        return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
        return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
        return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
        return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
        return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
        return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else {
        throw new StoreException("Unknown LogOperation:" + logOperation.name());
    }
}

因为我们第一次调用一定是写入,所以此时我们应该查看insertGlobalTransactionDO,此方法的作用就是写入全局事务表中global_table

@Override
public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
    String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);
    Connection conn = null;
    PreparedStatement ps = null;
    try {
        conn = logStoreDataSource.getConnection();
        conn.setAutoCommit(true);
        ps = conn.prepareStatement(sql);
        ps.setString(1, globalTransactionDO.getXid());
        ps.setLong(2, globalTransactionDO.getTransactionId());
        ps.setInt(3, globalTransactionDO.getStatus());
        ps.setString(4, globalTransactionDO.getApplicationId());
        ps.setString(5, globalTransactionDO.getTransactionServiceGroup());
        String transactionName = globalTransactionDO.getTransactionName();
        transactionName = transactionName.length() > transactionNameColumnSize ? transactionName.substring(0,
                                                                                                           transactionNameColumnSize) : transactionName;
        ps.setString(6, transactionName);
        ps.setInt(7, globalTransactionDO.getTimeout());
        ps.setLong(8, globalTransactionDO.getBeginTime());
        ps.setString(9, globalTransactionDO.getApplicationData());
        return ps.executeUpdate() > 0;
    } catch (SQLException e) {
        throw new StoreException(e);
    } finally {
        IOUtil.close(ps, conn);
    }
}

我们可以查看GlobalTransactionDO实体类的属性,和global_table 的字段进行比对,就能看出其中道理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/346818.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RabbitMq及其他消息队列

消息队列中间价都有哪些 先进先出 Kafka、Pulsar、RocketMQ、RabbitMQ、NSQ、ActiveMQ 架构 消费推拉模式 客户端消费者获取消息的方式,Kafka和RocketMQ是通过长轮询Pull的方式拉取消息,RabbitMQ、Pulsar、NSQ都是通过Push的方式。 pull类型的消息队…

OpenCV制作Mask图像掩码

一、掩膜(mask) 在有些图像处理的函数中有的参数里面会有mask参数,即此函数支持掩膜操作,首先何为掩膜以及有什么用,如下: 数字图像处理中的掩膜的概念是借鉴于PCB制版的过程,在半导体制造中&am…

PowerShell Install VNC-Server VNC-Viewer

前言 VNCConnect是一款屏幕共享、远程控制电脑软件,可以让您连接到世界上任何地方的远程计算机,实时观看其屏幕,并像坐在它前面一样进行控制。RealVNC可以将人和设备连接到任何地方,实现控制、支持、管理、监控、培训、协作等等。…

Java——不同的子序列

题目链接 leetcode在线oj题——不同的子序列 题目描述 给定一个字符串 s 和一个字符串 t ,计算在 s 的子序列中 t 出现的个数。 字符串的一个 子序列 是指,通过删除一些(也可以不删除)字符且不干扰剩余字符相对位置所组成的新…

【C语言学习笔记】:数组、指针相关面试题

无特殊说明情况下,下面所有题s目都是linux下的32位C程序。 「1、计算以下sizeof的值。」 char str1[] {a, b, c, d, e}; char str2[] "abcde";char *ptr "abcde";char book[][80]{"计算机应用基础","C语言","C程…

Apple Safari 16.3 - macOS 专属免费浏览器 (独立安装包免费下载)

Safari 浏览器 16 for macOS Montery, Big Sur 请访问原文链接:https://sysin.org/blog/apple-safari-16/,查看最新版。原创作品,转载请保留出处。 作者主页:www.sysin.org 之前 Safari 浏览器伴随 macOS 更新一起发布&#xff…

python的opencv操作记录12——Canny算子使用

文章目录Canny算子非极大值抑制非极大值抑制中的插值滞后阈值实际应用直接使用Canny算子使用膨胀先阈值分割Canny算子 上一篇说到,我在一个小项目里需要在一幅图像中提取一根试管里的两种液体的截面。为了达到这个目的使用传统图像里的区域分割技术,实际…

脏话越多,代码越好!

你在读开源代码的时候有没有遇到过这种注释?What the fuck ?Dude,WTFFuck this !我遇到过,每次都忍不住笑,心想老外可真是性情中人,遇到不爽的地方就开骂,还直接写到注释中,甚至代码中。Bob大叔…

机械狗控制算法

一. MIT Cheetah特点 1.驱动器 Cheetah 2采用了定制的本体感受驱动器设计,具有高冲击缓解、力控制和位置控制能力。这种设计使其能够自主跳过障碍物,并以6m/s的高速跳跃,但其运动范围有限,只能进行矢状面运动。 Cheetah 3采用高扭…

C++11 lambda

Lambda 介绍 Lambda 函数也叫匿名函数, 是C 11中新增的特性; 1. Lambda函数的好处 如果你的代码里面存在大量的小函数,而这些函数一般只被调用一次,那么将他们重构成 lambda 表达式。 Lambda函数使代码变得更加紧凑、更加结构化和更富有表现…

解决gocui库的中文显示缺少的bug

gocui库地址 https://github.com/jroimartin/gocui 使用原由 最近写文档都用emacs,git客户端用的是magit。 但是写代码现在都用lvim,在lvim和终端下喜欢上了使用lazygit做git客户端。 非常喜欢lazygit在终端上的界面,扒拉了下github上代码…

【最优化理论】线性规划

文章目录什么是线性规划(Linear Programming,LP)?线性规划的标准形式非标准形LP模型转化为标准形LP模型基本概念基本解&基矩阵&基变量&非基变量基本可行解&可行基矩阵&非退化的基本可行解&退化的基本可行…

「JVM 执行引擎」栈架构的字节码的解释执行引擎

JVM 执行引擎在执行 Java 代码时有解释执行(通过解释器执行)和编译执行(通过即时编译器产生本地代码执行)两种选择; HotSpot 实际的实现中,模版解释器工作时,并不是按照概念模型中进行机械式计…

虹科分享 | CANopen协议基础知识——LSS服务

CANopen是一种架构在CAN串行总线系统上的高层通讯协议,常被用于嵌入式系统与工业控制领域,包括电机控制、机器人制造、医疗、汽车等多个行业领域。本篇文章将主要介绍CANopen的LSS服务。 一. LSS概述 Layer setting service (LSS)是CANopen的设置服务与…

Self-Supervised Log Parsing 自监督日志解析

摘要 日志在软件系统的开发和维护过程中被广泛使用,收集运行时事件并允许跟踪代码执行,从而支持各种关键任务,如故障排除和故障检测。大型软件系统会生成大量的半结构化日志记录,这对自动化分析提出了重大挑战。将带有自由形式文…

网站代理是什么?有什么需要注意的?

如今,网站代理已经成为一种不可或缺的经营方式。无论是企业还是个人,都需要通过代理来获得更多的流量和市场份额。 一、网站代理的优势 网站代理的优势在于能够为您提供更加专业、周到的服务。这些优势包括:1.丰富的内容资源,能…

2022年FIT2CLOUD飞致云开源成绩单

2023年2月15日,中国领先的开源软件公司FIT2CLOUD飞致云发布《2022年开源成绩单》,盘点公司2022年全年在开源软件产品与社区运营方面的表现。目前,飞致云旗下的核心开源软件组合包括JumpServer开源堡垒机、DataEase开源数据可视化分析平台、Me…

高压放大器在骨的逆力电研究中的应用

实验名称:高压放大器在骨的逆力电研究中的应用研究方向:生物医学测试目的:骨中的胶原和羟基磷灰石沿厚度分布不均匀,骨试样在直流电压作用下,内部出现传导电流引起试样内部温度升高,不同组分热变形不一致&a…

python3.7

一、下载安装ancconda(python3.7) ​​​​​​https://mirrors.tuna.tsinghua.edu.cn/anaconda/archive/Anaconda3-2019.07-Windows-x86.exehttps://mirrors.tuna.tsinghua.edu.cn/anaconda/archive/Anaconda3-2019.07-Windows-x86.exe 二、配制Anaconda环境变量 此电脑——…

国外ChatGPT横空出世,国内无代码开发一样惊人,旗鼓相当

ChatGPT火爆了,究竟是什么? 有些人以为ChatGPT,只是更先进的人工智能聊天工具罢了。它除了能学习与理解人类对话语言,还能结合下文“思考”,实现与人类正常交流。这款由美国OpenAI研发的人工智能技术,2022年…