再学DataX

news2025/1/22 21:36:24

一、DataX简介

DataX官网文档:https://github.com/alibaba/DataX/blob/master/introduction.md

DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

在这里插入图片描述

1.1、DataX 3.0框架设计

在这里插入图片描述

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

1.2、DataX3.0核心架构

DataX 3.0 开源版本支持单机多线程模式完成同步作业运行

在这里插入图片描述

1、Job

DataX完成单个数据同步的作业。一个Job对应一个进程。Job模块负责task切分,TaskGroup管理等。

2、Task

Task是DataX的最小单元,每个task负责一部分数据同步工作。

3、TaskGroup

Job在切分完多个Task后,会调用DataX的scheduler模块,根据配置的并发量,将拆分成的多个Task分配到不同的TaskGroup中,每个TaskGroup负责以一定并发运行分配给他的全部Task,每个TaskGroup默认的并发量是5.

4、Task的执行流程

每个Task由TaskGroup启动,每个Task对固定启动Reader—>Channel—>Writer的线程来完成数据同步工作。

DataX Job运行起来后,由Job监控并等待每个TaskGroup的task执行完成,等所有TaskGroup任务执行完成后,Job成功退出。否则,异常退出。

5、DataX调度流程

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。

DataX的调度决策思路是:

1)DataX Job根据分库分表切分成了100个Task。

2)由于配置了20个并发,每个TaskGroup默认并发度是5,所以需要4个TaskGroup

3)由4个TaskGroup平均切分100个Task,每个TaskGroup被分到了25个Task,共启动5个并发。

1.3、DataX优势

1、可靠的监控

2、数据转换功能丰富

3、精准的流控

4、同步性能好

5、容错机制健壮

6、使用体验好

二、DataX源码解读

2.1、入口类:Engine

入口类为com.alibaba.datax.core.Engine.java main函数

1、解析args入参:

Options options = new Options();
options.addOption("job", true, "Job config.");
options.addOption("jobid", true, "Job unique id.");
options.addOption("mode", true, "Job runtime mode.");

BasicParser parser = new BasicParser();
CommandLine cl = parser.parse(options, args);

针对命令行参数采用了org.apache.commons的BasicParser解析,针对任务的配置文件则通过其本身的ConfigParser进行解析(可以支持本地和网络文件)。

2、启动Engine
参数启动完毕后,调用Engine.start方法启动

ConfigurationValidate.doValidate(configuration);
Engine engine = new Engine();
engine.start(configuration);

然后选择是Job模式还是TaskGroup模式:

boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);

实际上基本都是Job模式,后续我们主要以JobContainer为切入点,另一个则为TaskGroupContainer。两者均继承自AbstractContainer基类,并通过调用他们的start方法进行启动。

2.2、JobContainer

JobContainer.start方法是入口

preHander

Job前置操作,即初始化preHandler插件并执行其preHandler

1)init
初始化reader和writer,实际方法中根据读写插件各自执行了对应的初始化方法:

//必须先Reader ,后Writer
this.jobReader = this.initJobReader(jobPluginCollector);
this.jobWriter = this.initJobWriter(jobPluginCollector);

2)prepare
全局准备工作,比如odpswriter清空目标表。由于读写插件的特殊性质,其方法内部主要也是执行了各类型插件的方法来实现准备工作

this.prepareJobReader();
this.prepareJobWriter();

3)split
拆分Task,参数adviceNumber为建议的拆分数。除此之外我们还可以通过字节和事务的限速来进行控制,从而决定Channel的数量。

  • job.setting.speed.byte:总BPS限速,如果存在值则单个Channel的BPS不能为空,通过总限速除以单个Channel限速得出Channel的需求数量;
  • core.transport.channel.speed.byte:单个Channel的BPS限速;
  • job.setting.speed.record:总TPS限速,如果存在则单个Channel的TPS不能为空,通过总限速除以单个Channel限速得出Channel的需求数量;
  • core.transport.channel.speed.record:单个Channel的TPS限速;

4)schedule
schedule首先完成的工作是把上一步reader和writer split的结果整合到具体taskGroupContainer中, 同时不同的执行模式调用不同的调度策略,将所有任务调度起来

由于实际任务是由TaskGroupContainer执行,为此我们还需要划分对应TaskGroup需要运行的Task,该参数通过core.container.taskGroup.channel进行配置,默认为5。决定每个Group运行那些Task的则由以下方法进行决定,将直接返回对应任务组的配置参数。

/**
 * 通过获取配置信息得到每个taskGroup需要运行哪些tasks任务
 */
List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,
	this.needChannelNumber, channelsPerTaskGroup);

完成任务分配后我们就需要根据运行模式决定调度器,通过这里的源码可以明显看出其DataX 3.0是经过了阉割,仅保留了单机运行模式。

executeMode = ExecuteMode.STANDALONE;
scheduler = initStandaloneScheduler(this.configuration);

后续我们仅能描述单机模式下关于任务调度的工作原理:
Step1:调度器初始化的核心方法initStandaloneScheduler

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/22315.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MyBatis基于XML的使用——动态sql

1、动态sql 动态 SQL 是 MyBatis 的强大特性之一。如果你使用过 JDBC 或其它 类似的框架&#xff0c;你应该能理解根据不同条件拼接 SQL 语句有多痛苦&#xff0c;例如拼接时要确保不能忘记添加必要的空格&#xff0c;还要注意去掉列表最后一个列名的逗号。 利用动态 SQL&#…

14服务-ClearDiagnosticlnformation

诊断协议那些事儿 诊断协议那些事儿专栏系列文章&#xff0c;本文介绍存储数据传输服务下的14服务ClearDiagnosticlnformation&#xff0c;客户端使用ClearDiagnosticlnformation服务清除一个或多个服务器存储器中的诊断信息。 关联文章&#xff1a;$19服务:DTCStatusMask和s…

CockroachDB-读和写

本文知识点来源于官网地址https://www.cockroachlabs.com/docs/stable/architecture/reads-and-writes-overview.html 查询执行 当CRDB执行查询时&#xff0c;集群将请求路由到包含相关数据的范围的Leaseholder。如果查询涉及多个范围&#xff0c;则请求将发送给多个Leasehol…

求实数的整数次幂(循环版)(高效)(位运算解题)

求实数的整数次幂(循环版)(高效) (10 分) 原理图&#xff1a; 请编写函数&#xff0c;用循环语句以最快的方法求任意实数的任意整数次幂。 函数原型 double Power(double x, int n); 说明&#xff1a;参数 x 为底数&#xff0c;n 为指数。若参数正确&#xff0c;则函数值为…

智能驾驶开启产业新赛道:资本扎堆布局车规级高精定位

2022年被称为高阶智能驾驶元年的背后&#xff0c;新的产业链正在悄然发展。 车规级高精定位便是其中之一。2022年10月&#xff0c;主业聚焦于动力总成测试的上海华依科技集团股份有限公司&#xff08;以下简称“华依科技”&#xff0c;688071.SH&#xff09;&#xff0c;发布公…

漫画风格迁移神器 AnimeGANv2:快速生成你的漫画形象

生成你的漫画形象&#xff01; 漫画风格迁移神器 AnimeGANv2 文章目录生成你的漫画形象&#xff01; 漫画风格迁移神器 AnimeGANv2快速在线生成你的漫画形象AnimeGAN 简要介绍与其他动漫风格迁移模型的效果对比AnimeGANv2 的优点AnimeGANv2 风格多样化AnimeGANv2 网络结构快速生…

基于stm32单片机的水位检测自动抽水系统

资料编号&#xff1a;106 下面是相关功能视频演示&#xff1a; 106-基于stm32单片机的水位检测自动抽水系统Proteus仿真&#xff08;源码仿真全套资料&#xff09;功能介绍&#xff1a; 使用滑动变阻器模拟水位监测器&#xff0c;通过改变电压值表示水位的变化。stm32通过ADC…

【前端】从 0 到 1 实现一个网站框架(一、注册 [1] )

Hi~你好呀&#xff0c;等你很久啦~ 我是 LStar&#xff0c;一枚来自北京的初二女生&#xff0c;2020 年年初加入 CSDN。 话不多说&#xff0c;直入主题~&#xff08;我现在看两年多前我 11 岁那会发的文章&#xff0c;越看越想笑。为了不让四年后 18 岁的我看着这篇文章露出 …

超详细的mysql多表操作教程

目录 外键约束 概念 特点 操作 多表联合查询 概念 操作 多表操作总结 外键约束 概念 特点 定义一个外键时&#xff0c;需要遵守下列规则&#xff1a; 主表必须已经存在于数据库中&#xff0c;或者是当前正在创建的表。 必须为主表定义主键。 主键不能包含空值&#xf…

967亿销售额!博世解码智能汽车新蓝图

随着新一轮科技革命和产业变革的深化&#xff0c;在低碳化、电动化和智能化的推动下&#xff0c;处于变革关键时期的新能源汽车产业&#xff0c;正逐步由“政策驱动”转向“市场驱动”&#xff0c;智能化、网联化成为新趋势。 据中国汽车工业协会统计&#xff0c;今年我国新能…

通过 Traefik Hub 暴露家里的网络服务

Traefik Hub 简介 &#x1f4da;️Reference: 你的云原生网络平台 -- 发布和加固你的容器从未如此简单。 Traefik Hub 为您在 Kubernetes 或其他容器平台上运行的服务提供一个网关。 Traefik Hub 定位&#xff1a; 云原生网络平台 它有 2 大核心功能&#xff0c;我这次体验感…

pytorch深度学习实战lesson23

第二十三课 AlexNet AlexNet是在2012年被发表的一个金典之作&#xff0c;并在当年取得了ImageNet最好成绩&#xff0c;也是在那年之后&#xff0c;更多的更深的神经网路被提出&#xff0c;比如优秀的vgg,GoogleLeNet. 其官方提供的数据模型&#xff0c;准确率达到57.1%,top 1-5…

认识计算机中的简单指令集

我们现在有了一个新的寄存器&#xff0c;叫做指令寄存器。它包含一个字节&#xff0c;不同的内容表示控制部分的不同操作模式。也被称为指令代码。指令寄存器是一个字节&#xff0c;因此可能有多达256条不同的指令。所有指令都涉及在总线上移动字节。指令将导致字节进出RAM&…

【JavaEE】PCB和进程调度的基本过程

文章目录什么是进程PCB的组成PID内存指针文件描述符表并行和并发进程调度相关属性进程的状态优先级上下文进程的记账信息什么是进程 进程是正在运行的程序的实例&#xff08;an instance of a computer program that is being executed&#xff09; 进程&#xff08;process&am…

《爱的四十条法则》

《爱的四十条法则》 [土]艾丽芙沙法克 作者用别样的手法间接向我们阐述了爱的四十条法则&#xff0c;每一条都会触及不同阶段的灵魂&#xff0c;我仅将文中感触较深的摘录如下&#xff1a; 1.尽管有人这样说&#xff0c;但是爱绝对不是来的快&#xff0c;去的也快的甜蜜感觉而…

长尾分布系列论文解析(二)Delving into Deep Imbalanced Regression

大纲引言回归问题中的长尾分布LDSFDS实验和结果总结引言 本文是长尾分布系列论文解析的第二篇&#xff0c;前情提要详见长尾分布系列论文解析&#xff08;一&#xff09;Decoupling Representation and Classifier for Long-Tailed Recognition&#xff0c;本篇要介绍的是回归任…

弹性力学之边界条件

作者&#xff1a;张伟伟&#xff0c;来源&#xff1a;力学酒吧 弹性力学基本方程包括平衡方程、几何方程和广义胡克定律&#xff0c;其中平衡方程和几何方程都属于微分方程。我们知道&#xff0c;在求解微分方程时&#xff0c;会出现积分常数&#xff0c;只有确定了积分常数&a…

JS —— js中的节流与防抖

文章目录 前言一、节流 1.什么是节流2.做节流可解决什么问题3.如何做节流二、防抖 1.什么是防抖2.做防抖可解决什么问题3.如何做防抖总结前言 最近有同学问到节流与防抖的相关知识点&#xff0c;于是乎&#xff0c;四处查资料&#xff0c;找一找&#xff0c;看一看&#xff0c…

单元测试:会变化的定义

有一种东西&#xff0c; 如果它太小&#xff0c;需要付出的努力就太大&#xff1b;如果它太大&#xff0c;就很难测试。 没错&#xff01;它是单元。 但是什么才是一个好的单元定义呢?为什么它如此重要? 单元的定义对测试过程有很大的影响&#xff0c;但同时单元的定义也是不…

Transformer总结和梳理

Transformer总结和梳理Positional encodingSelf-attentionMulti--head-attentionAdd&NormAdd操作Norm操作FeedForwardMASKPadding MaskedSelf-Attention Masked首先来看一下Transformer结构的结构&#xff1a;Transformer是由Encoder和Decoder两大部分组成&#xff0c;首先…