比Sqoop功能更加强大开源数据同步工具DataX实战

news2025/1/11 6:12:31

文章目录

  • 概述
    • 定义
    • 与Sqoop对比
    • 框架设计
    • 支持插件
    • 核心架构
    • 核心优势
  • 部署
    • 基础环境
    • 安装
    • 从stream读取数据并打印到控制台
    • 读取MySQL写入HDFS
    • 读取HDFS写入MySQL
  • 执行流程

概述

定义

DataX 官网地址 https://maxwells-daemon.io/

DataX GitHub源码地址 https://github.com/alibaba/DataX

DataX 是Alibaba集团下阿里云 DataWorks数据集成的开源版本,用作异构数据源离线同步工具或平台;其实现了如 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、HBase、ClickHouse 等各种异构数据源之间稳定高效的数据同步功能。本文全部内容只对最新框架3.0系列说明,最新版本为datax_v202210

为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步;基于插件式扩展能力上可以说DataX框架具备支持任意数据源类型的数据同步工作的能力。

image-20221220095956222

与Sqoop对比

Apache Sqoop™是一种用于在Apache Hadoop和结构化数据存储(如关系数据库)之间高效传输批量数据的工具,最新的稳定版本是1.4.7,而其Sqoop2的最新版本是1.99.7,但是1.99.7与1.4.7不兼容,而且特性不完整,因此Sqoop2不用于生产部署。Sqoop1.4.7在2017年后就没有再更新,不是说Sqoop不好,是官方已没有需要修复的问题,稳定,据说项目PMC也都解散了。如果业务只需要对关系数据库同步的HDFS(还包括hive、hbase),使用sqoop也是可以的。Sqoop也可以实现增量数据同步,比如通过查询的sql中增加时间过滤字段,也可以结合自身job记住带有单调递增的编号字段实现增量同步。

image-20221220111210753

虽然说DataX是单机版压力大,但可以通过手工调度系统布置多个节点分开配置来实现类似多台分布式处理,提高处理能力。

框架设计

image-20221220100428396

DataX框架设计也比较简单,与其他数据采集框架如Flume相似,采用Framework + plugin架构构建;将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  • Reader:为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer:为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

支持插件

DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据、时序数据库等都已经接入;DataX Framework提供了简单的接口与插件交互,提供简单的插件接入机制,只需要任意加上一种插件,就能无缝对接其他数据源,具体数据源使用说明根据需要点击读或写查看使用详细介绍。下面支持类型就在DataX GitHub主页READERME上。

image-20221220103041502

核心架构

DataX 支持单机多线程模式完成同步作业运行,这里以一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。

image-20221220103535381

核心模块介绍:

  • DataX中完成单个数据同步的作业称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  • DataX Job启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  • 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  • 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
  • DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0。

DataX调度流程拿一个举例,比如用户提交了一个DataX作业并配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:

  • DataXJob根据分库分表切分成了100个Task。
  • 根据20个并发,DataX计算共需要分配4个TaskGroup。
  • 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

核心优势

  • 可靠的数据质量监控
    • 完美解决数据传输个别类型失真问题:支持所有的强数据类型,每一种插件都有自己的数据类型转换策略,让数据可以完整无损的传输到目的端。
    • 提供作业全链路的流量、数据量运行时监控:DataX运行过程中可以将作业本身状态、数据流量、数据速度、执行进度等信息进行全面的展示,可以实时了解作业状态;并可在作业执行过程中智能判断源端和目的端的速度对比情况,给予更多性能排查信息。
    • 提供脏数据探测:在大量数据的传输过程中,必定会由于各种原因导致很多数据传输报错(比如类型转换错误),这种数据DataX认为就是脏数据。DataX目前可以实现脏数据精确过滤、识别、采集、展示,提供多种的脏数据处理模式,准确把控数据质量大关。
  • 丰富的数据转换功能
    • DataX作为一个服务于大数据的ETL工具,除了提供数据快照搬迁功能之外,还提供了丰富数据转换的功能,让数据在传输过程中可以轻松完成数据脱敏,补全,过滤等数据转换功能,另外还提供了自动groovy函数,让用户自定义转换函数。详情请看DataX3的transformer详细介绍。
  • 精准的速度控制
    • DataX提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制作业速度,让作业在库可以承受的范围内达到最佳的同步速度。
  • 强劲的同步性能
    • DataX每一种读插件都有一种或多种切分策略,都能将作业合理切分成多个Task并行执行,单机多线程执行模型可以让DataX速度随并发成线性增长。在源端和目的端性能都足够的情况下,单个作业一定可以打满网卡;性能测试相关详情可以参照每单个数据源的详细介绍。
  • 健壮的容错机制
    • DataX3可以做到线程级别、进程级别(暂时未开放)、作业级别多层次局部/全局的重试,保证用户的作业稳定运行。
      • 线程内部重试:DataX的核心插件都经过全盘review,不同的网络交互方式都有不同的重试策略。
      • 线程级别重试:目前DataX已经可以实现TaskFailover,针对于中间失败的Task,DataX框架可以做到整个Task级别的重新调度。
  • 极简的使用体验
    • 易用:下载即可用,支持linux和windows,只需要短短几步骤就可以完成数据的传输。
    • 详细:DataX在运行日志中打印了大量信息,其中包括传输速度,Reader、Writer性能,进程CPU,JVM和GC情况等等。
      • 传输过程中打印传输速度、进度等。
      • 传输过程中会打印进程相关的CPU、JVM等
      • 在任务结束之后,打印总体运行情况

部署

基础环境

  • linux
  • JDK(1.8以上,推荐1.8,最好也使用1.8,jdk11有些场景如hdfs会报错)
  • Python(2或3都可以)
  • Apache Maven 3.x(如果需要源码编译安装)

安装

# 下载最新版本datax_v202210的datax
wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gz
# 解压文件
tar -xvf datax.tar.gz
# 进入根目录
cd datax/
# 自检脚本
python ./bin/datax.py ./job/job.json

从stream读取数据并打印到控制台

创建json格式作业的配置文件,可以通过查看配置模板示例

python bin/datax.py -r streamreader -w streamwriter

image-20221223105936102

在job目录下创建stream2stream.json,vim stream2stream.json

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "sliceRecordCount": 10,
            "column": [
              {
                "type": "long",
                "value": "10"
              },
              {
                "type": "string",
                "value": "hello,welcome to DataX"
              }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "encoding": "UTF-8",
            "print": true
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 5
       }
    }
  }
}
# 执行job
python bin/datax.py job/stream2stream.json

image-20221223112141856

读取MySQL写入HDFS

可以通过GitHub找到支持数据通道并通过查阅读、写相关文档,非常详细,不仅包含实现原理、功能说明、约束限制,还对每一种数据通道提供了性能测试报告,可见DataX是把性能做到了极致。参数的说明

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gtc3fCRv-1671803026649)(image-20221223135319256.png)]

需要同步数据表为test数据库的student表

image-20221223135208968

在job目录下创建mysql2hdfs.json,vim job/mysql2hdfs.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "id",
                            "name",
                            "age"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://hadoop3:3308/test"],
                                "table": ["student"]
                            }
                        ],
                        "password": "123456",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "INT"
                            },
                            {
                                "name": "name",
                                "type": "STRING"
                            },
                            {
                                "name": "age",
                                "type": "INT"
                            }
                        ],
                        "defaultFS": "hdfs://hadoop1:9000",
                        "fieldDelimiter": "\t",
                        "fileName": "student.txt",
                        "fileType": "text",
                        "path": "/",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
# 执行job
python bin/datax.py job/mysql2hdfs.json

image-20221223155520036

从控制台的日志打印可以看到这个job写入hdfs时先写入临时文件,全部成功则修改文件名和路径;如果个别失败则整个job失败,删除临时路径。查看hdfs上可以看到文件已经写入成功,并且固定加了一串后缀

image-20221223160056579

点击文件查看内容和间隔符也是正确的

image-20221223160227473

如果是HA模式可以hadoopConfig里配置

 "hadoopConfig":{
         "dfs.nameservices": "testDfs",
         "dfs.ha.namenodes.testDfs": "namenode1,namenode2",
         "dfs.namenode.rpc-address.aliDfs.namenode1": "",
         "dfs.namenode.rpc-address.aliDfs.namenode2": "",
         "dfs.client.failover.proxy.provider.testDfs": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
 }

读取HDFS写入MySQL

创建一张同样表结构的student1表,在job目录下创建hdfs2mysql.json,vim job/hdfs2mysql.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "column": ["*"],
                        "defaultFS": "hdfs://hadoop1:9000",
                        "encoding": "UTF-8",
                        "fieldDelimiter": "\t",
                        "fileType": "text",
                        "path": "/student.txt__6eeb1730_21bd_40e9_a360_16de5396b140"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [
                            "id",
                            "name",
                            "age"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://hadoop3:3308/test?useUnicode=true&characterEncoding=gbk",
                                "table": ["student1"]
                            }
                        ],
                        "password": "123456",
                        "username": "root",
                        "writeMode": "insert"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
# 由于我的mysql是8,因此需要将plugin/writer/mysqlwriter/libs的mysql-connector-java-5.1.47.jar替换为高版本,这里就直接使用mysql-connector-java-8.0.29.jar
rm plugin/writer/mysqlwriter/libs/mysql-connector-java-5.1.47.jar
cp mysql-connector-java-8.0.29.jar plugin/writer/mysqlwriter/libs/
# 执行job
python bin/datax.py job/hdfs2mysql.json

image-20221223162343967

查看student1表已经有4条包含指定3个字段的数据

image-20221223162514548

执行流程

image-20221223162656056

  • 解析配置,包括job.json、core.json、plugin.json三个配置
  • 设置jobId到configuration当中
  • 启动Engine,通过Engine.start()进入启动程序
  • 设置RUNTIME_MODE configuration当中
  • 通过JobContainer的start()方法启动
  • 依次执行job的preHandler()、init()、prepare()、split()、schedule()、post()、postHandle()等方法。
  • init()方法涉及到根据configuration来初始化reader和writer插件,这里涉及到jar包热加载以及调用插件init()操作方法,同时设置reader和writer的configuration信息
  • prepare()方法涉及到初始化reader和writer插件的初始化,通过调用插件的prepare()方法实现,每个插件都有自己的jarLoader,通过集成URLClassloader实现而来
  • split()方法通过adjustChannelNumber()方法调整channel个数,同时执行reader和writer最细粒度的切分,需要注意的是,writer的切分结果要参照reader的切分结果,达到切分后数目相等,才能满足1:1的通道模型
  • channel的计数主要是根据byte和record的限速来实现的,在split()的函数中第一步就是计算channel的大小
  • split()方法reader插件会根据channel的值进行拆分,但是有些reader插件可能不会参考channel的值,writer插件会完全根据reader的插件1:1进行返回
  • split()方法内部的mergeReaderAndWriterTaskConfigs()负责合并reader、writer、以及transformer三者关系,生成task的配置,并且重写job.content的配置
  • schedule()方法根据split()拆分生成的task配置分配生成taskGroup对象,根据task的数量和单个taskGroup支持的task数量进行配置,两者相除就可以得出taskGroup的数量
  • schdule()内部通过AbstractScheduler的schedule()执行,继续执行startAllTaskGroup()方法创建所有的TaskGroupContainer组织相关的task,TaskGroupContainerRunner负责运行TaskGroupContainer执行分配的task。scheduler的具体实现类为ProcessInnerScheduler。
  • taskGroupContainerExecutorService启动固定的线程池用以执行TaskGroupContainerRunner对象,TaskGroupContainerRunner的run()方法调用taskGroupContainer.start()方法,针对每个channel创建一个TaskExecutor,通过taskExecutor.doStart()启动任务。
  • 本人博客网站IT小神 www.itxiaoshen.com

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/110893.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SaaS是什么?企业采购SaaS有什么好处?

简单的来讲讲我们对SaaS的理解吧。过去通常来说,我们采购企业使用的产品,通常有比如传统的软件包下载、按照自己的需求找开发商定制、如果有自研能力的团队可以自己去开发。但是这样就比如带来很多问题: 比如业务规则更新了怎么办&#xff1…

Protobuf 了解

Protocol Buffers 是一种结构数据序列化方法,可以将C中定义的存储类的内容与二进制序列串相互转换,主要用于数据传输或数据存储,可类比XML、JSON,而XML、JSON基于文本格式,protobuf是二进制格式,所以比XML、…

【论文阅读总结】inception v4与Inception-ResNet总结

Inception-v4, Inception-ResNet和Residual connections对学习的影响1.摘要2.引言3.文献综述4.体系结构的选择4.1 Pure Inception4.2 Residual Inception Blocks【残差Inception 块】4.3 Scaling of the Residuals【残差的缩放】4.3.1实验发现4.3.1.1实验发现14.3.1.2实验发现2…

ERP容灾备份维护工作有哪些?服务器容灾备份

ERP维护工作有哪些?这是公司信息化专员工作职责:信息规划  1、参与 公司信息化系统总体构架,建立健全公司信息化各项管理制度和标准业务流程,组织公司各业务部门不断进行业务流程的梳理、优化和创新,推动信息化的持续…

408 考研《操作系统》第三章第二节:内存管理、覆盖和交换 、连续分配管理方式、动态分区分配算法

文章目录1. 内存管理的概念1.1 内存保护1.2 总结2. 覆盖与交换2.1 覆盖技术2.2 交换技术2.3 总结:3. 连续分配管理方式3.1 单一连续分配3.2 固定分区分配3.3 动态分区分配3.4 总结4. 动态分区分配算法4.1 首次适应算法4.2 最佳适应算法4.3 最坏适应算法4.4 邻近适应…

米尔基于ARM架构核心板的国产化EtherCAT主站控制器解决方案

EtherCAT是由德国BECKHOFF自动化公司于2003年提出的实时工业以太网技术。它具有高速和高数据有效率的特点,支持多种设备连接拓扑结构。其从站节点使用专用的控制芯片,主站使用标准的以太网控制器。 EtherCAT是一种工业以太网技术,看到的大多…

【RocketMQ】RocketMQ实例--顺序消息

1、应用场景 一、以证券股票交易撮合场景为例,对于出价相同的交易单,坚持按照先出价先交易的原则,下游处理订单的系统需要严格按照出价顺序来处理订单。 二、以数据库变更增量同步场景为例,上游源端数据库按需执行增删改操作&…

RK3568烧录系统

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录前言一、烧录工具二、烧录步骤单独烧录烧录整个固件总结前言 前面我们已经学会了编译系统,包括一键编译和单独编译,其中单独编译适合我们做驱…

ENSP 路由器到本地(现实)PC的FTP实验

前言: 在一个风和日丽的下午,我同事突然问我ENSP和本地PC怎么上传和下载文件?我本以为这个很简单,然后我开始了面向百度编程,但是网上的大多数都是ENSP里面的路由器、服务器和PC间的FTP实验,而不是到本地&…

嵌入式经典通信总线协议:SPI协议

目录 一、spi简介 二、SPI特性 三、spi四种工作方式 四、实现代码 1.选择开发板 2.选择SPI 3.设置硬件片选使能和通讯方式,其他根据需要选择 4. 生成代码 一、spi简介 SPI 是英语Serial Peripheral interface的缩写,顾名思义就是串行外围设备接口…

GIT:如何删除仓库中的.idea .DS_Store target文件/文件夹并设置下次不上传

0. 引言 我们常常会有在git仓库初始化时,忘记设置.gitignore文件导致一些非工程文件上传到仓库中了,导致整个仓库的不美观,甚至影响其他开发同事配置代码。这时候我们就需要删除这些指定文件,那么如何操作呢,这一章我…

php学生成绩管理系统,在线录入、统计学生成绩,多种图表展示对比学生成绩

教学质量是学校教学的生命线,只有能够客观分析自己教学成败得失的教师才是一个合格的老师。这是一款注重优化成绩采集方法、丰富成绩分析维度的小学成绩统计系统,力争做到符合教师工作习惯、使用方法简单、数据分析多样、分析结果科学,为教师…

垃圾回收机制之v8引擎

v8的内存分配 (栈(执行环境)跟堆) 堆内存负责垃圾回收机制,只有新生代和老生代两部分 新生代:对等分的(严格) 老生代: 都是由新生代转变的(连续的空间&…

Vue 实现 html 表格 (grid) 单元格编辑功能 2

第一版表格编辑实现是刚学VUE时硬凑出来 点击详见 经过网上的不断学习,代码精简功能增强,克服了上一个版本的两个bug。 欢迎没有下载积分的朋友欢迎复制转载。 主要功能: 由于取消了 vue 循环的 key 更新,故单元格不需要点击两…

Redis常见面试题(六)

目录 1、Redis支持的Java客户端有哪些? 2、Redisson是什么框架? 3、Redis和Redisson有什么关系? 4、Jedis和Redisson对比有什么优缺点? 5、Redis为什么不提供Windows版本? 6、Redis怎么在Windows下使用? 7、Redis如何设置密码访问? 8、Redis如何分析慢查询操作?…

前端线上问题如何调试

记录Vue开发过程中遇到的问题,测试环境以及本地显示都没有问题,但是一上线就出现问题,于是对于这个问题进行排查,在此记录排查问题的步骤以及方法,希望对大家有帮助。 错误信息:Uncaught TypeError: Canno…

Redis常见面试题(七)

目录 1、什么是缓存预热? 2、什么是缓存热备? 3、什么是缓存雪崩? 4、如何解决缓存雪崩? 5、什么是缓存穿透? 6、如何解决缓存穿透? 7、什么是缓存击穿? 8、如何解决缓存击穿? 9、什么是缓存抖动? 10、如何解决缓存抖动? 11、什么是缓存无底洞? 12、如何…

如何让一个 C 语言项目调用另一个 C++ 项目中某些类所提供的接口?

目前問題是這樣的:有兩個項目 一個項目是用 C 寫的 裏面提供了一個輸入輸出接口 後來從外面弄來了另外一個項目 用 C 寫的 現在需要將 C 項目中所使用的原有接口替換為使用我們的 C 項目中提供的接口 求問能夠實現否? 在项目开发过程中,我们底…

XGBoost总结

1.算法原理 XGBoost是boosting算法的其中一种。Boosting算法的思想是将许多弱分类器集成在一起形成一个强分类器。因为XGBoost是一种提升树模型,该算法思想就是不断地添加树,不断地进行特征分裂来生长一棵树,每次添加一个树,其实…

CSS3之3D转换

文章目录一、3D移动translate3d二、perspective(透视)三、translateZ四、rotateX-rotateY-rotateZ五、rotate3d(x,y,z,deg)六、3D呈现transfrom-style七、旋转木马案例一、3D移动translate3d 3D移动在2D移动的基础上多加了一个可以移动的方向&#xff0…