【学习Seata1.6源码#03】TC 集群具有高可用架构的秘密

news2024/12/24 10:04:01

一、背景

TC 集群具有高可用架构,应用到集群是这样一个间接的关系:应用 -》事务分组 -》TC 集群,应用启动后所指定的事务分组不能变,可通过配置中心变更事务分组所属的 TC 集群,Seata 客户端监听到这个变更后,会切换到新的 TC 集群。

本篇从源码梳理这个高可用能力是如何实现的。

二、环境配置

客户端配置使用nacos配置中心和nacos注册中心,

seata:
  enabled: true
  # Seata 应用编号
  application-id: seataclistock
  # Seata 事务组编号,用于 TC 集群名。该配置需要与服务端提到的group相对应,也需要与下面的相对应
  tx-service-group: tx_group_stock
  # 关闭自动代理
  enable-auto-data-source-proxy: false
  config:
    # support: nacos, consul, apollo, zk, etcd3
    type: nacos
    nacos:
      serverAddr: 
      namespace: seata # 需要与服务端添加的配置文件相同
      group: SEATA_GROUP_ROCKTEST
      username: seata
      password: seata
      data-id: seataClient.tx_group_busin.properties
  registry:
    # support: nacos, eureka, redis, zk, consul, etcd3, sofa
    type: nacos
    nacos:
      application: seata-server
      serverAddr: 
      namespace: seata  # 需要与服务端添加的配置文件相同
      group: SEATA_GROUP_ROCKTEST   # 需要与服务端添加的配置文件相同
      username: seata
      password: seata
复制代码

三、从配置中心获取TC集群

服务注册的能力要依赖配置中心,从nacos的配置中心获取配置NacosConfiguration#initSeataConfig

  • Data Id:seataClient.tx_group_stock.properties
  • Group:SEATA_GROUP_LWKTEST

其中的service.vgroupMapping.tx_group_stock的值是dev_cluster_1,接下来注册能力就要使用这个集群来工作。

private static void initSeataConfig() {
    try {
        String nacosDataId = getNacosDataId();
        String config = configService.getConfig(nacosDataId, getNacosGroup(), DEFAULT_CONFIG_TIMEOUT);
        if (StringUtils.isNotBlank(config)) {
            seataConfig = ConfigProcessor.processConfig(config, getNacosDataType());

            NacosListener nacosListener = new NacosListener(nacosDataId, null);
            configService.addListener(nacosDataId, getNacosGroup(), nacosListener);
        }
    } catch (NacosException | IOException e) {
        LOGGER.error("init config properties error", e);
    }
}
复制代码

RegistryFactory#getInstance()这是个单例机制,所以源码梳理起来很简单,下边获取TC服务的时候会调用此单例方法做初始化。

  1. 读取配置文件中registry.type,
  2. 配置的值是nacos,所以读出的值是nacos
  3. 通过SPI加载并实例化 NacosRegistryProvider
public class RegistryFactory {

    /**
     * Gets instance.
     *
     * @return the instance
     */
    public static RegistryService getInstance() {
        return RegistryFactoryHolder.INSTANCE;
    }

    private static RegistryService buildRegistryService() {
        RegistryType registryType;
        //registryTypeName = "registry.type"
        String registryTypeName = ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig(
            ConfigurationKeys.FILE_ROOT_REGISTRY + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
                + ConfigurationKeys.FILE_ROOT_TYPE);
        try {
            // nacos
            registryType = RegistryType.getType(registryTypeName);
        } catch (Exception exx) {
            throw new NotSupportYetException("not support registry type: " + registryTypeName);
        }
        // 通过SPI 加载并实例化 NacosRegistryProvider
        return EnhancedServiceLoader.load(RegistryProvider.class, Objects.requireNonNull(registryType).name()).provide();

    }

    private static class RegistryFactoryHolder {
        private static final RegistryService INSTANCE = buildRegistryService();
    }
}
复制代码

TM、RM 客户端需要与TC通信,所以在其初始化时必然会有获取TC集群的逻辑,对应在源码TmNettyRemotingClient#init 中的reconnect方法。

@Override
public void init() {
    // registry processor
    registerProcessor();
    if (initialized.compareAndSet(false, true)) {
        //父类中会开启定时任务来执行 getClientChannelManager().reconnect(transactionServiceGroup)
        super.init();
        if (io.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {
            getClientChannelManager().reconnect(transactionServiceGroup);
        }
    }
}
复制代码

reconnect中的.NettyClientChannelManager#getAvailServerList 是根据seata.tx-service-group的值来检索TC集群信息。直接提供出来调用堆栈,方便大家快速熟悉调用链路:

getServiceGroup:111, RegistryService (io.seata.discovery.registry)
lookup:145, NacosRegistryServiceImpl (io.seata.discovery.registry.nacos)
getAvailServerList:257, NettyClientChannelManager (io.seata.core.rpc.netty)
reconnect:171, NettyClientChannelManager (io.seata.core.rpc.netty)
init:198, TmNettyRemotingClient (io.seata.core.rpc.netty)
init:47, TMClient (io.seata.tm)
initClient:220, GlobalTransactionScanner (io.seata.spring.annotation)
afterPropertiesSet:512, GlobalTransactionScanner (io.seata.spring.annotation)
复制代码

这里便是通过Seata客户端 seata.tx-service-group的值,找到最终TC集群的关键之处。在getServiceGroup中从nacos中获取service.vgroupMapping.tx_group_stock的值,即dev_cluster_1

default String getServiceGroup(String key) {
    //key = service.vgroupMapping.tx_group_stock
    key = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
    if (!SERVICE_GROUP_NAME.contains(key)) {
        ConfigurationCache.addConfigListener(key);
        SERVICE_GROUP_NAME.add(key);
    }
    return ConfigurationFactory.getInstance().getConfig(key);
}
复制代码

然后NettyClientChannelManager#reconnect中获取 TC 集群中的所有 TC 服务节点,对每个TC 服务节点建连。

for (String serverAddress : availList) {
    try {
        acquireChannel(serverAddress);
        channelAddress.add(serverAddress);
    } catch (Exception e) {
        LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(),
            serverAddress, e.getMessage(), e);
    }
}
复制代码

再梳理一下,梳理这么多的关键就是通过tx_group_stock 找到 TC 集群 dev_cluster_1

  • 客户端:
    seata:
      # 默认关闭,如需启用spring.datasource.dynami.seata需要同时开启
      enabled: true
      # Seata 事务组编号,用于 TC 集群名。该配置需要与服务端提到的group相对应,也需要与下面的相对应
      tx-service-group: tx_group_stock
    复制代码
  • nacos:
    Data ID: seataClient.tx_group_stock.properties
    Group: SEATA_GROUP_ROCKTEST
    配置内容:
        ...
        service.vgroupMapping.tx_group_stock=dev_cluster_1
        ...
    复制代码
  • TC服务端:
    registry:
      # support: nacos 、 eureka 、 redis 、 zk  、 consul 、 etcd3 、 sofa
      type: nacos
      preferred-networks: 30.240.*
      nacos:
        application: seata-server
        cluster: dev_cluster_1
    复制代码

RegistryService#getServiceGroup中从nacos获取值的时候,有个细节需要注意:通过namespace + Group + Key(Data Id) 三维来唯一标示一个Key。

四、刷新TC集群

AbstractNettyRemotingClient#init中默认会每隔10s进行一次 TC 服务清单刷新与重连

timerExecutor.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        clientChannelManager.reconnect(getTransactionServiceGroup());
    }
}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
复制代码

五、最后说一句

我是石页兄,如果这篇文章对您有帮助,或者有所启发的话,欢迎关注笔者的微信公众号【 架构染色 】进行交流和学习。您的支持是我坚持写作最大的动力。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/113700.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

TIA博途中进行积分运算的具体方法

TIA博途中进行积分运算的具体方法 如下图所示,积分是对给定函数曲线的面积进行数学计算。但是实际曲线往往没有明确的数学关系,而是随时间变化的模拟量。积分计算就是把所有由两个过程值与时间所围成的梯形区域面积相加,梯形面积等于两个过程值的平均值乘以时间间隔。 如下…

MAC 下编译调试 MySQL8.0 源码

MAC 下编译调试 MySQL8.0 源码 最近遇到几个关于 MySQL 死锁的问题,但是查看网上的资料发现并不能解决我遇到的问题,于是决定从源码寻找答案,所以在这里记录一下自己编译 MySQL 源码的过程。 环境配置如下: macOS Ventura 版本 1…

达势股份IPO前紧急叫停:年内已有多家公司急刹车,王怡为董事长

又一家准上市公司在IPO前紧急叫停。 近日,达美乐比萨(Domino’s Pizza,NYSE:DPZ)在中国的运营商——达势股份(DPC Dash Ltd,HK:01405)发布《延迟全球发售及退回香港公开发售的申请股款》&#…

箭头函数的使用

一、什么是箭头函数 从 ES6 开始支持箭头函数,它是一种新的函数表达方式,可以在某些情况下使函数的使用更加的简洁。 二、箭头函数的使用 1、单个参数的使用 例: var f v > v;// 等同于var f function (v) {return v;};2、如果箭头函…

upload-labs(writeup)

pass1 前端验证,使用bp抓包改后缀 pass2 验证MIME类型,使用bp抓包,更改Content-Type 即可 pass3 黑名单限制,可以上传 .php3 .php5 .pthml 等等后缀, 但是要在phpstydy配置文件中更改,(我使…

网络编程 基于tcp/ip协议的C/S模型

1.socket编程的一些概念 socket作用:运行在计算机中的两个程序通过socket建立起一个通道,数据在通道中传输。socket提供了流(stream),是基于TCP协议,是一个有序、可靠、双向字节流的通道,传输数据不会丢失、不会重复、…

C#中的String和StringBuilder

一:前言 String和StringBuilder都是引用类型 StringBuilder是可变的字符串,它不会创建当前字符串的新修改实例而是在现有字符串对象中进行修改 String是不可变字符串,一旦被初始化后就不能改变其内容,String值改变的过程其实是创…

如何自定义注解

在 Spring Boot 应用中&#xff0c;使用自定义注解时需要用到 AOP&#xff0c;因此引入 AOP 相关依赖&#xff1a; <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId> </dependenc…

Python办公自动化|批量生成请假条

大家好&#xff0c;我是毕加锁 本文就将基于一个真实的办公案例进行讲解如何提取Excel内容并创建Word&#xff0c;主要将涉及以下三个知识点 “ openpyxl 读取 Excel 文件 python-docx 写入 Word 文件 python-docx 各类样式的设计和调整 ”需求描述 你是公司的底层小虾米&…

2022年AIGC简单展望

2022 对于社会是不平凡的一年&#xff0c;而对于科技也同样是不平凡的一年。人们在社会中遭受着失意&#xff0c;却在科技中寻找希冀。对于一个命运共同体&#xff0c;它想着如何破除衰退&#xff0c;而同样对于一个活生生的个体或者家庭&#xff0c;他们也在摸索改变命运的机遇…

SpringBoot拦截器HandlerInterceptor

一、HandlerInterceptor是什么&#xff1f; Spring Boot 拦截器主要应用于登陆校验、权限验证、乱码解决&#xff0c;同样提供了拦截器功能。 二、使用方式 在项目中实现HandlerInterceptor接口开箱即用 三、HandlerInterceptor 中实现的方法 方法介绍preHandle(HttpServletRe…

Linux操作系统之进程间的通讯—管道

文章目录进程间通讯&#xff08;IPC机制&#xff09;有哪几种方式&#xff1f;1、管道有名管道无名管道2、信号量进程间通讯&#xff08;IPC机制&#xff09;有哪几种方式&#xff1f; 管道、信号量、共享内存、消息队列、套接字 1、管道 什么是管道&#xff1f; 有名管道…

JavaScript 入门基础 - 变量 / 数据类型(二)

JavaScript 入门基础 - 变量 / 数据类型&#xff08;二&#xff09; 文章目录JavaScript 入门基础 - 变量 / 数据类型&#xff08;二&#xff09;1.变量1.1 什么是变量1.2 变量在内存中的存储1.3 变量的使用1.4 变量语法扩展1.4.1 更新变量1.4.2 声明多个变量1.4.3 声明变量的特…

node.js+uni计算机毕设项目个性化服务系统小程序(程序+小程序+LW)

该项目含有源码、文档、程序、数据库、配套开发软件、软件安装教程。欢迎交流 项目运行 环境配置&#xff1a; Node.js Vscode Mysql5.7 HBuilderXNavicat11VueExpress。 项目技术&#xff1a; Express框架 Node.js Vue 等等组成&#xff0c;B/S模式 Vscode管理前后端分离等…

一文稿定 Appium 环境配置

Appium 是一个开源的、跨平台的测试框架&#xff0c;可以用来测试 Native App、混合应用、移动 Web 应用&#xff08;H5 应用&#xff09;等&#xff0c;也是当下互联网企业实现移动自动化测试的重要工具。Appium 坚持的测试理念&#xff1a; •无需用户对 App 进行任何修改或…

vue后台管理系统项目-vue-quill-editor实现富文本编辑器功能

富文本编辑器功能实现详细过程 目录 富文本编辑器功能实现详细过程 1.安装富文本插件 2.实现效果 3.实现详细过程 可直接使用 全局引入 局部引入 配置option 扩展需求 自定义配置文字大小 1.安装富文本插件 npm install vue-quill-editor --save //或者 yarn add vu…

AI城管自动识别摊贩占道经营出店经营行为

AI城管自动识别摊贩占道经营出店经营行为通过pythonyolov7对现场画面自动AI实时监测&#xff0c;当监测到流动商贩占道经营违规摆摊出店经营时时&#xff0c;立即告警。Python是一种由Guido van Rossum开发的通用编程语言&#xff0c;它很快就变得非常流行&#xff0c;主要是因…

论多窗口相互关联下window.open打开已在的窗口时只激活不刷新的实现方案

前端博主&#xff0c;热衷各种前端向的骚操作&#xff0c;经常想到哪就写到哪&#xff0c;如果有感兴趣的技术和前端效果可以留言&#xff5e;博主看到后会去代替大家踩坑的&#xff5e; 主页: oliver尹的主页 格言: 跌倒了爬起来就好&#xff5e; 来个关注吧&#xff0c;点个赞…

Springboot+Netty实现基于天翼物联网平台CTWing(AIOT)终端TCP协议(透传模式)-设备终端(南向设备)

电信的天翼物联网平台CTWing(AIOT)原先是我们俗称的aep&#xff0c;主要用于接入nb-iot设备&#xff0c;当然也可以接入其他的设备&#xff0c;在熟悉AIOT平台后&#xff0c;做后端的我有时候急需终端样品(智能门禁&#xff0c;支付识别终端&#xff0c;CPE终端&#xff0c;考勤…

使用 Swift/SwiftUI 的音频可视化

IOS 应用程序中的音频可视化是一项流行的功能,在实现聊天功能时可能需要它。将它添加到我最近的项目之一时,我个人遇到了一些问题。 你们中的一些人可能在开发包含聊天功能的应用程序时遇到过问题,其中某些消息可能包含音频。这些消息需要在应用程序内进行适当的音频可视化,…