文章目录
- 概述
- 定义
- 与Sqoop对比
- 框架设计
- 支持插件
- 核心架构
- 核心优势
- 部署
- 基础环境
- 安装
- 从stream读取数据并打印到控制台
- 读取MySQL写入HDFS
- 读取HDFS写入MySQL
- 执行流程
概述
定义
DataX 官网地址 https://maxwells-daemon.io/
DataX GitHub源码地址 https://github.com/alibaba/DataX
DataX 是Alibaba集团下阿里云 DataWorks数据集成的开源版本,用作异构数据源离线同步工具或平台;其实现了如 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、HBase、ClickHouse 等各种异构数据源之间稳定高效的数据同步功能。本文全部内容只对最新框架3.0系列说明,最新版本为datax_v202210
为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步;基于插件式扩展能力上可以说DataX框架具备支持任意数据源类型的数据同步工作的能力。
与Sqoop对比
Apache Sqoop™是一种用于在Apache Hadoop和结构化数据存储(如关系数据库)之间高效传输批量数据的工具,最新的稳定版本是1.4.7,而其Sqoop2的最新版本是1.99.7,但是1.99.7与1.4.7不兼容,而且特性不完整,因此Sqoop2不用于生产部署。Sqoop1.4.7在2017年后就没有再更新,不是说Sqoop不好,是官方已没有需要修复的问题,稳定,据说项目PMC也都解散了。如果业务只需要对关系数据库同步的HDFS(还包括hive、hbase),使用sqoop也是可以的。Sqoop也可以实现增量数据同步,比如通过查询的sql中增加时间过滤字段,也可以结合自身job记住带有单调递增的编号字段实现增量同步。
虽然说DataX是单机版压力大,但可以通过手工调度系统布置多个节点分开配置来实现类似多台分布式处理,提高处理能力。
框架设计
DataX框架设计也比较简单,与其他数据采集框架如Flume相似,采用Framework + plugin架构构建;将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
- Reader:为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
- Writer:为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
- Framework:用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
支持插件
DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据、时序数据库等都已经接入;DataX Framework提供了简单的接口与插件交互,提供简单的插件接入机制,只需要任意加上一种插件,就能无缝对接其他数据源,具体数据源使用说明根据需要点击读或写查看使用详细介绍。下面支持类型就在DataX GitHub主页READERME上。
核心架构
DataX 支持单机多线程模式完成同步作业运行,这里以一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。
核心模块介绍:
- DataX中完成单个数据同步的作业称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
- DataX Job启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
- 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
- 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
- DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0。
DataX调度流程拿一个举例,比如用户提交了一个DataX作业并配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:
- DataXJob根据分库分表切分成了100个Task。
- 根据20个并发,DataX计算共需要分配4个TaskGroup。
- 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。
核心优势
- 可靠的数据质量监控
- 完美解决数据传输个别类型失真问题:支持所有的强数据类型,每一种插件都有自己的数据类型转换策略,让数据可以完整无损的传输到目的端。
- 提供作业全链路的流量、数据量运行时监控:DataX运行过程中可以将作业本身状态、数据流量、数据速度、执行进度等信息进行全面的展示,可以实时了解作业状态;并可在作业执行过程中智能判断源端和目的端的速度对比情况,给予更多性能排查信息。
- 提供脏数据探测:在大量数据的传输过程中,必定会由于各种原因导致很多数据传输报错(比如类型转换错误),这种数据DataX认为就是脏数据。DataX目前可以实现脏数据精确过滤、识别、采集、展示,提供多种的脏数据处理模式,准确把控数据质量大关。
- 丰富的数据转换功能
- DataX作为一个服务于大数据的ETL工具,除了提供数据快照搬迁功能之外,还提供了丰富数据转换的功能,让数据在传输过程中可以轻松完成数据脱敏,补全,过滤等数据转换功能,另外还提供了自动groovy函数,让用户自定义转换函数。详情请看DataX3的transformer详细介绍。
- 精准的速度控制
- DataX提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制作业速度,让作业在库可以承受的范围内达到最佳的同步速度。
- 强劲的同步性能
- DataX每一种读插件都有一种或多种切分策略,都能将作业合理切分成多个Task并行执行,单机多线程执行模型可以让DataX速度随并发成线性增长。在源端和目的端性能都足够的情况下,单个作业一定可以打满网卡;性能测试相关详情可以参照每单个数据源的详细介绍。
- 健壮的容错机制
- DataX3可以做到线程级别、进程级别(暂时未开放)、作业级别多层次局部/全局的重试,保证用户的作业稳定运行。
- 线程内部重试:DataX的核心插件都经过全盘review,不同的网络交互方式都有不同的重试策略。
- 线程级别重试:目前DataX已经可以实现TaskFailover,针对于中间失败的Task,DataX框架可以做到整个Task级别的重新调度。
- DataX3可以做到线程级别、进程级别(暂时未开放)、作业级别多层次局部/全局的重试,保证用户的作业稳定运行。
- 极简的使用体验
- 易用:下载即可用,支持linux和windows,只需要短短几步骤就可以完成数据的传输。
- 详细:DataX在运行日志中打印了大量信息,其中包括传输速度,Reader、Writer性能,进程CPU,JVM和GC情况等等。
- 传输过程中打印传输速度、进度等。
- 传输过程中会打印进程相关的CPU、JVM等
- 在任务结束之后,打印总体运行情况
部署
基础环境
- linux
- JDK(1.8以上,推荐1.8,最好也使用1.8,jdk11有些场景如hdfs会报错)
- Python(2或3都可以)
- Apache Maven 3.x(如果需要源码编译安装)
安装
# 下载最新版本datax_v202210的datax
wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gz
# 解压文件
tar -xvf datax.tar.gz
# 进入根目录
cd datax/
# 自检脚本
python ./bin/datax.py ./job/job.json
从stream读取数据并打印到控制台
创建json格式作业的配置文件,可以通过查看配置模板示例
python bin/datax.py -r streamreader -w streamwriter
在job目录下创建stream2stream.json,vim stream2stream.json
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello,welcome to DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
}
}
# 执行job
python bin/datax.py job/stream2stream.json
读取MySQL写入HDFS
可以通过GitHub找到支持数据通道并通过查阅读、写相关文档,非常详细,不仅包含实现原理、功能说明、约束限制,还对每一种数据通道提供了性能测试报告,可见DataX是把性能做到了极致。参数的说明
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gtc3fCRv-1671803026649)(image-20221223135319256.png)]
需要同步数据表为test数据库的student表
在job目录下创建mysql2hdfs.json,vim job/mysql2hdfs.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"name",
"age"
],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://hadoop3:3308/test"],
"table": ["student"]
}
],
"password": "123456",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "INT"
},
{
"name": "name",
"type": "STRING"
},
{
"name": "age",
"type": "INT"
}
],
"defaultFS": "hdfs://hadoop1:9000",
"fieldDelimiter": "\t",
"fileName": "student.txt",
"fileType": "text",
"path": "/",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
# 执行job
python bin/datax.py job/mysql2hdfs.json
从控制台的日志打印可以看到这个job写入hdfs时先写入临时文件,全部成功则修改文件名和路径;如果个别失败则整个job失败,删除临时路径。查看hdfs上可以看到文件已经写入成功,并且固定加了一串后缀
点击文件查看内容和间隔符也是正确的
如果是HA模式可以hadoopConfig里配置
"hadoopConfig":{
"dfs.nameservices": "testDfs",
"dfs.ha.namenodes.testDfs": "namenode1,namenode2",
"dfs.namenode.rpc-address.aliDfs.namenode1": "",
"dfs.namenode.rpc-address.aliDfs.namenode2": "",
"dfs.client.failover.proxy.provider.testDfs": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
}
读取HDFS写入MySQL
创建一张同样表结构的student1表,在job目录下创建hdfs2mysql.json,vim job/hdfs2mysql.json
{
"job": {
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"column": ["*"],
"defaultFS": "hdfs://hadoop1:9000",
"encoding": "UTF-8",
"fieldDelimiter": "\t",
"fileType": "text",
"path": "/student.txt__6eeb1730_21bd_40e9_a360_16de5396b140"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [
"id",
"name",
"age"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://hadoop3:3308/test?useUnicode=true&characterEncoding=gbk",
"table": ["student1"]
}
],
"password": "123456",
"username": "root",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
# 由于我的mysql是8,因此需要将plugin/writer/mysqlwriter/libs的mysql-connector-java-5.1.47.jar替换为高版本,这里就直接使用mysql-connector-java-8.0.29.jar
rm plugin/writer/mysqlwriter/libs/mysql-connector-java-5.1.47.jar
cp mysql-connector-java-8.0.29.jar plugin/writer/mysqlwriter/libs/
# 执行job
python bin/datax.py job/hdfs2mysql.json
查看student1表已经有4条包含指定3个字段的数据
执行流程
- 解析配置,包括job.json、core.json、plugin.json三个配置
- 设置jobId到configuration当中
- 启动Engine,通过Engine.start()进入启动程序
- 设置RUNTIME_MODE configuration当中
- 通过JobContainer的start()方法启动
- 依次执行job的preHandler()、init()、prepare()、split()、schedule()、post()、postHandle()等方法。
- init()方法涉及到根据configuration来初始化reader和writer插件,这里涉及到jar包热加载以及调用插件init()操作方法,同时设置reader和writer的configuration信息
- prepare()方法涉及到初始化reader和writer插件的初始化,通过调用插件的prepare()方法实现,每个插件都有自己的jarLoader,通过集成URLClassloader实现而来
- split()方法通过adjustChannelNumber()方法调整channel个数,同时执行reader和writer最细粒度的切分,需要注意的是,writer的切分结果要参照reader的切分结果,达到切分后数目相等,才能满足1:1的通道模型
- channel的计数主要是根据byte和record的限速来实现的,在split()的函数中第一步就是计算channel的大小
- split()方法reader插件会根据channel的值进行拆分,但是有些reader插件可能不会参考channel的值,writer插件会完全根据reader的插件1:1进行返回
- split()方法内部的mergeReaderAndWriterTaskConfigs()负责合并reader、writer、以及transformer三者关系,生成task的配置,并且重写job.content的配置
- schedule()方法根据split()拆分生成的task配置分配生成taskGroup对象,根据task的数量和单个taskGroup支持的task数量进行配置,两者相除就可以得出taskGroup的数量
- schdule()内部通过AbstractScheduler的schedule()执行,继续执行startAllTaskGroup()方法创建所有的TaskGroupContainer组织相关的task,TaskGroupContainerRunner负责运行TaskGroupContainer执行分配的task。scheduler的具体实现类为ProcessInnerScheduler。
- taskGroupContainerExecutorService启动固定的线程池用以执行TaskGroupContainerRunner对象,TaskGroupContainerRunner的run()方法调用taskGroupContainer.start()方法,针对每个channel创建一个TaskExecutor,通过taskExecutor.doStart()启动任务。
- 本人博客网站IT小神 www.itxiaoshen.com