Seata AT模式下的源码解析(三)

news2025/1/13 2:56:23

7. 网络请求

7.1 TransactionManager

事务管理器,在客户端主要用于发起事务请求、提交事务、回滚事务请求等,用于跟 TC 进行通信的类,其中获取当前接口的实现类是通过 TransactionManagerHolder 进行获取,然后通过 SPI 接口获取到默认实现类,这里的默认实现类是 DefaultTransactionManager

public interface TransactionManager {

    /**
     * 开启全局事务
     *
     * @param applicationId           应用id,指明当前事务所属的应用
     * @param transactionServiceGroup 事务服务的分组
     * @param name                    全局事务的名称
     * @param timeout                 全局事务的超时时间
     */
    String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException;

    /**
     * 根据指定的xid进行事务的提交
     */
    GlobalStatus commit(String xid) throws TransactionException;

    /**
     * 根据xid进行事务回滚
     */
    GlobalStatus rollback(String xid) throws TransactionException;

    /**
     * 获取到当前事务的状态
     */
    GlobalStatus getStatus(String xid) throws TransactionException;

    /**
     * 上报对应的全局事务的状态
     */
    GlobalStatus globalReport(String xid, GlobalStatus globalStatus) throws TransactionException;
}

7.2 RMClient

RM客户端实例化工具类,用于创建 RM 客户端, GlobalTransactionScanner.initClient() 方法,在bean对象创建时进行初始化,与之相同的还有一个 TMClient 逻辑都一样

public class RMClient {
    public static void init(String applicationId, String transactionServiceGroup) {
        //通过单例模式创建一个Netty的 RM客户端端
        RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
        //设置默认的资源管理器
        rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
        /**
         * 设置事务方法的处理器,默认 DefaultRMHandler,其中根据类型选择对于模式的处理器
         * RMHandlerAT
         * RMHandlerSaga
         * RMHandlerTCC
         * RMHandlerXA
         */
        rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
        //初始化时注册对于方法类型的处理器
        rmNettyRemotingClient.init();
    }
}

7.3 RemotingClient

远程调用客户端,用于 ClientServer 的通信,下面是依赖结构图,目前客户端的实现就以下两个:(TM用于一阶段的通信,RM用于二阶段接收TC对事务的处理);

  • RmNettyRemotingClient:资源管理器客户端,管理分支事务处理的资源,驱动分支事务提交或回滚,与TC进行通信以注册分支事务和报告分支事务的状态
  • TmNettyRemotiongClient:事务管理器,定义全局事务的范围,开始全局事务、提交或回滚全局事务

在这里插入图片描述

7.3.1 TmNettyRemotiongClient

采用了单例模式,底层使用 Netty 作为通讯框架,TM 主要用于发送请求然后处理发送请求过后的数据

public static TmNettyRemotingClient getInstance() {
    if (instance == null) {
        synchronized (TmNettyRemotingClient.class) {
            if (instance == null) {
                NettyClientConfig nettyClientConfig = new NettyClientConfig();
                //配置消费的线程池
                final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
                    nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
                    KEEP_ALIVE_TIME, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
                    new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),
                                           nettyClientConfig.getClientWorkerThreads()),
                    RejectedPolicies.runsOldestTaskPolicy());
                //创建TM客户端
                instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
            }
        }
    }
    return instance;
}
init()

初始化方法,在上面 GlobalTransactional.initClient() 中进行调用,注册对应消息类型的处理器

public void init() {
    // 注册消息类型的处理器
    registerProcessor();
    if (initialized.compareAndSet(false, true)) {
        super.init();
        if (io.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {
            getClientChannelManager().reconnect(transactionServiceGroup);
        }
    }
}
registerProcessor()

TM 注册的消息类型处理是以下几种:

  • MessageType.TYPE_SEATA_MERGE_RESULT:合并请求的结果()
  • MessageType.TYPE_GLOBAL_BEGIN_RESULT:全局事务开启的结果处理()
  • MessageType.TYPE_GLOBAL_COMMIT_RESULT:提交事务的结果处理
  • MessageType.TYPE_GLOBAL_REPORT_RESULT:重新上报的结果处理
  • MessageType.TYPE_GLOBAL_ROLLBACK_RESULT:回滚的结果处理
  • MessageType.TYPE_GLOBAL_STATUS_RESULT:状态的结果处理
  • MessageType.TYPE_REG_CLT_RESULT
  • MessageType.TYPE_BATCH_RESULT_MSG:批量发送的结果
  • MessageType.TYPE_HEARTBEAT_MSG:心跳消息的处理
private void registerProcessor() {
    // 1.registry TC response processor
    ClientOnResponseProcessor onResponseProcessor =
        new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
    ..................省略代码
}

7.3.2 RmNettyRemotiongClient

主要用于处理 TC 端主动下发的数据监听,用于 二阶段处理,初始化方法跟 TM 一样,只是注册的处理器不一样

registerProcessor()

RM 管理主要用于处理来自服务端下发的请求,主要注册的类型有以下:

  • MessageType.TYPE_BRANCH_COMMIT:分支提交(RmBranchCommitProcessor)
  • MessageType.TYPE_BRANCH_ROLLBACK:分支回滚(RmBranchRollbackProcessor)
  • MessageType.TYPE_RM_DELETE_UNDOLOG:删除undolog(RmUndoLogProcessor)
  • MessageType.TYPE_SEATA_MERGE_RESULT:合并结果(ClientOnResponseProcessor)
  • MessageType.TYPE_BRANCH_REGISTER_RESULT:分支注册结果处理
  • MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT:状态报告结果
  • MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT:全局锁查询结果处理
  • MessageType.TYPE_REG_RM_RESULT
  • MessageType.TYPE_BATCH_RESULT_MSG:批量发送结果信息
  • MessageType.TYPE_HEARTBEAT_MSG:心跳结果(ClientHeartbeatProcessor)
private void registerProcessor() {
        // 1.registry rm client handle branch commit processor
        RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
        super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
        // 2.registry rm client handle branch rollback processor
        RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
        super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
        ...........省略相同代码
    }

TM、RM 都是采用 netty 进行通信,可以在两个对象的 构造方法 中看到设置的最终 handler 对象是 io.seata.core.rpc.netty.AbstractNettyRemotingClient.ClientHandler 对象;

AbstractNettyRemoting.processMessage() 中获取的又是根据消息类型区分的 RemotingProcessor 处理器,就是上面注册的处理器

class ClientHandler extends ChannelDuplexHandler {
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof RpcMessage)) {
            return;
        }
        //调用 io.seata.core.rpc.netty.AbstractNettyRemoting.processMessage 的方法
        processMessage(ctx, (RpcMessage) msg);
    }
}

7.3.3 RemotingProcessor

  • ClientHeartbeatProcessor:客户端心跳处理
  • ClientOnResponseProcessor:客户端处理 TC 的回复消息
  • ServerHeartbeatProcessor:服务端心跳处理
  • ServerOnRequestProcessor:服务端用于处理客户端发送的分支注册、分支状态上报、全局事务开启、全局锁查询、全局事务回滚、全局事务状态的请求
  • ServerOnResponseProcessor:服务端处理客户端发送的分支提交结果、分支回滚结果的处理器
  • RmBranchCommitProcessor:分支提交的处理器
  • RmBranchRollbackProcessor:分支回滚处理器
  • RmUndoLogProcessor:undolog删除处理器

7.4 MessageTypeAware(请求实体)

请求实体的接口,就只定义了一个方法,用于请求的类型

public interface MessageTypeAware {

    /**
     * 获取到请求类型的code码
     */
    short getTypeCode();

}

7.4.1 AbstractTransactionRequestToTC

对于客户端来说,请求体就是 TM --> TC ,seata中服务端的实体处理结构就是下面这样,抽象出了一个 AbstractTransactionRequestToTC 的抽象类,(tm发起的请求实体,tc就需要转换成对应的实体处理)

在这里插入图片描述

public abstract class AbstractTransactionRequestToTC extends AbstractTransactionRequest {

    /**
     * TC入栈处理器
     */
    protected TCInboundHandler handler;

    /**
     * 设置TC的入栈处理器
     *
     * @param handler the handler
     */
    public void setTCInboundHandler(TCInboundHandler handler) {
        this.handler = handler;
    }
}

7.4.2 AbstractTransactionRequestToRM

对于TC来说,给RM发送请求的实体对象是 AbstractTransactionRequestToRM,那么 RM就需要转换成 AbstractTransactionRequestToRM 下面的对于类型

public abstract class AbstractTransactionRequestToRM extends AbstractTransactionRequest {

    /**
     * 设置RM入栈处理器
     */
    protected RMInboundHandler handler;

    
    public void setRMInboundMessageHandler(RMInboundHandler handler) {
        this.handler = handler;
    }
}

7.4.3 RMInboundHandler

public interface RMInboundHandler {

    /**
     * 处理分支提交的请求
     */
    BranchCommitResponse handle(BranchCommitRequest request);

    /**
     * 处理分支回滚的请求
     */
    BranchRollbackResponse handle(BranchRollbackRequest request);

    /**
     * 处理undolog删除日志的请求
     */
    void handle(UndoLogDeleteRequest request);
}

7.4.4 TCInboundHandler

public interface TCInboundHandler {

    /**
     * 处理开启全局事务的请求
     */
    GlobalBeginResponse handle(GlobalBeginRequest globalBegin, RpcContext rpcContext);

    /**
     * 处理全局事务提交的请求
     */
    GlobalCommitResponse handle(GlobalCommitRequest globalCommit, RpcContext rpcContext);

    /**
     * 处理事务回滚的请求
     */
    GlobalRollbackResponse handle(GlobalRollbackRequest globalRollback, RpcContext rpcContext);

    /**
     * 处理分支注册的请求
     */
    BranchRegisterResponse handle(BranchRegisterRequest branchRegister, RpcContext rpcContext);

    /**
     * 处理分支状态上报的请求
     */
    BranchReportResponse handle(BranchReportRequest branchReport, RpcContext rpcContext);

    /**
     * 查询全局锁
     */
    GlobalLockQueryResponse handle(GlobalLockQueryRequest checkLock, RpcContext rpcContext);

    /**
     * 全局事务状态查询
     */
    GlobalStatusResponse handle(GlobalStatusRequest globalStatus, RpcContext rpcContext);

    /**
     * 上报事务状态该的请求
     */
    GlobalReportResponse handle(GlobalReportRequest globalReport, RpcContext rpcContext);

}

上面的两个抽象类中,为什么要在实体类中定义两个 handler 呢?
让我一起看看 seata 的实体设置,这样的用意是什么。
例如:下面这个 分支提交请求 ,其中覆写了 handle() ,而这个 hander 的类型是上面两种 TCInboundHandler、RMInboundHandler 两个接口,两个接口给每一个请求方法都定义了一个方法,参数不同而已,也就是说,通过下面这种方法可以根据指定的参数调用到对应的处理方法,只需要设置定义的 handler 即可

public class BranchCommitRequest extends AbstractBranchEndRequest {

    @Override
    public short getTypeCode() {
        return MessageType.TYPE_BRANCH_COMMIT;
    }

    @Override
    public AbstractTransactionResponse handle(RpcContext rpcContext) {
        return handler.handle(this);
    }
}

io.seata.rm.AbstractRMHandler#onRequest :这里的调用路径就是 netty 收到 TC 的分支提交请求,然后根据类型找到 RmBranchCommitProcessor 然后在 process() 中 调用 DefaultRMHandler.onRequest() 方法

public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
        if (!(request instanceof AbstractTransactionRequestToRM)) {
            throw new IllegalArgumentException();
        }
        AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM)request;
    	//客户端设置自己为handler,就可以根据对应的请求类型调用到指定的方法
        transactionRequest.setRMInboundMessageHandler(this);

        return transactionRequest.handle(context);
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/24589.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【没用的小知识又增加了--电机】

一些乱七八糟的笔记.. 怎么计算电流环带宽 https://www.csdn.net/tags/MtTaMgysMTgwMTQwLWJsb2cO0O0O.html 理解电机控制系统中的带宽问题 - 知乎 电机控制电路程序带宽和硬件带宽的关系&#xff0c;应该如何设计相关参数&#xff1f; - 知乎 怎么理解Clarke和park变换&am…

[附源码]java毕业设计西柚网购物系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

论文阅读【5】Attention Is All You Need

1.概述 1.1 论文相关 题目&#xff1a;注意你所有需要的&#xff08;Attention Is All You Need&#xff09;发表时间&#xff1a;2017出版&#xff1a;NIPS原文地址&#xff1a;经典模型了&#xff0c;网上一搜就能搜索到代码&#xff1a; 1.2 动机 因为循环神经网络通常是…

Poison Ink: Robust and Invisible Backdoor Attack 论文笔记

1. 论文信息 论文名称Poison Ink: Robust and Invisible Backdoor Attack作者Jie Zhang&#xff08;中国科学技术大学&#xff09;会议/出版社IEEE Transactions on Image Processingpdf&#x1f4c4;在线pdf 2. introduction 文章提出了一种新的攻击方式称为“Poison Ink”…

CRF条件随机场

文章目录定义转移概率 & 发射概率损失函数单条路径的求解viterbi解码贪婪算法维特比算法参考解读定义 CRF&#xff1a;condition random field 解决序列预测问题。比如TTS的前端分词&#xff0c;实体命名识别等。 转移概率 & 发射概率 发射分数&#xff1a;将输入预测…

PPT 最后一页写什么结束语既得体又能瞬间提升格调?

谢邀&#xff01;我只分享一个现下最流行的方法&#xff0c;绝对让尾页逼格满满&#xff01;罗永浩雷军都在用的「金句法」。 提到这份方法&#xff0c;你可能会觉得很陌生&#xff0c;但你一定见过这样的页面&#xff1a; 这样的页面还有很多&#xff0c;多是以一句话收尾&…

LeetCode刷题(python版)——Topic81. 搜索旋转排序数组 II

一、题设 已知存在一个按非降序排列的整数数组 nums &#xff0c;数组中的值不必互不相同。 在传递给函数之前&#xff0c;nums 在预先未知的某个下标 k&#xff08;0 < k < nums.length&#xff09;上进行了 旋转 &#xff0c;使数组变为 [nums[k], nums[k1], ..., nu…

简单网络管理协议SNMP

SNMP一、 网络管理基本概念网络管理主要构件管理站被管设备网络管理协议SNMP&#xff08;简单网络管理协议&#xff09;协议组成二、管理信息结构SMI功能被管对象的命名被管对象的数据类型编码方法三、管理信息库MIB定义要点四、SNMP基于UDP服务两种基本管理功能机制探询trap五…

期末复习 C语言再学习

作者&#xff1a;小萌新 专栏&#xff1a;期末复习 作者简介&#xff1a; 大二学生 希望能和大家一起进步 本篇博客介绍&#xff1a; 考试周临近 没时间学新知识了 回顾C语言知识 一. 常量和字符串 1. 常量的四种表示方式 字面常量 这个很简单 字面意义上的常量就是了 比如…

跳表和散列表

一、跳表 复杂度&#xff1a;O(logn)&#xff1b; 跳表的更新&#xff1a;插入数据时&#xff0c;可以选择将这个数据插入到部分索引中&#xff0c;可以选择一个随机函数&#xff0c;产生随机数K&#xff0c;边将索引添加到第一到第K级索引中。 Redis为何选择跳表来实现有序集…

冯·诺依曼体系概括总结

文章目录我们常见的计算机&#xff0c;如笔记本。我们不常见的计算机&#xff0c;如服务器&#xff0c;大部分都遵守冯诺依曼体系。 截至目前&#xff0c;我们所认识的计算机&#xff0c;都是由一个个的硬件组件组成 输入单元&#xff1a;包括键盘, 鼠标&#xff0c;扫描仪, …

Java笔记(JUnit、反射、注解)

一、JUnit单元测试 1. JUnit的介绍 JUnit是一个Java语言的单元测试工具。有了它我们在开发阶段就可以对自己编写的功能模块进行单元测试&#xff08;就是一块一块去测试&#xff09;&#xff0c;看看是否达到具体预期&#xff08;这样小Bug我们自己就能解决&#xff09;。 黑盒…

敏感词检测库ToolGood.Words中 WordsHelper类使用简介

C#开源敏感词检测库ToolGood.Words中的类WordsHelper为文本辅助操作类&#xff0c;支持繁体简体互换、全角半角互换、数字转成中文大写、拼音操作等功能&#xff0c;本文对照参考文献1&#xff0c;对该类的用法进行简要介绍。   WordsHelper类中主要的辅助函数如下表所示&…

基于SVM的航空发动机故障诊断系统设计

目录 第1关&#xff1a;准备实验数据 任务描述&#xff1a; 相关知识&#xff1a; 一、获取数据&#xff1a; 二、读取数据集&#xff1a; 三、如何找出对应的数据列&#xff1a; 编程要求&#xff1a; 测试说明&#xff1a; 第二关&#xff1a;数据预处理 任务描述&…

python习题002--字符串处理

目录 一&#xff0c;题目展示&#xff1a; 二&#xff0c;题目解答 a&#xff09;判断两个字符串是否相等 b)忽略大小写判断两个字符串是否相等 c,d)判断字符串是否以指定的字符串开始或者是结尾 e&#xff09; 获取字符串的长度 f&#xff09; 字符串切片&#xff0c…

认定省级专精特新的条件

之前报省级专精特新的一个条件是先认定市级专精特新&#xff0c;但是打算2023年申报省级专精特新的企业就不需要先申请市级的专精特新了&#xff0c;那是可以直接申请省级专精特新吗&#xff1f;并不是&#xff01; 接下来打算申请2023年省级专精特新的企业需要先认定山东省创…

18.5.4 分布式恢复

文章目录18.5.4 分布式恢复18.5.4.1 分布式恢复的连接18.5.4.1.1 为分布式恢复端点选择地址18.5.4.1.2 分布式恢复的压缩18.5.4.1.3 用于分布式恢复的复制用户18.5.4.1.4 分布式恢复的 SSL 和身份验证18.5.4.2 分布式恢复的克隆18.5.4.2.1 克隆的先决条件18.5.4.2.2 克隆的阈值…

VsCode配置Kotlin运行环境

目录 需求 前置条件 步骤 问题 一 二 结果 需求 设备上有一个 Android Studio 但是太大了, 于是就想着用 VS Code 配置一个轻量级的,而且 VS Code里面好多插件我用着也比较舒服. 前置条件 Java JDK1.8 或者 11 别的版本没试过 有基于Android Studio 的 Kotlinc的环…

c++ - 第12节 - 模板进阶

1.非类型模板参数 模板参数分为类型形参与非类型形参类型形参&#xff08;可以认为是虚拟类型&#xff09;&#xff1a;出现在模板参数列表中&#xff0c;跟在class或者typename之类的参数类型名称。非类型形参&#xff08;可以认为是常量&#xff09;&#xff1a;用一个常量作…

带命名空间的arxml读取

一、前言 读取带命名空间的arxml,最好的方式是创建一个字典来存放你自己的前缀并在搜索函数中使用它们: 二、解读如下ns.xml文件: <?xml version"1.0" encoding"UTF-8"?> <AUTOSAR xmlns"http://autosar.org/schema/r4.0" xmlns:xsi…