目录
1.概要
2.简介
3.DataX处理异构数据源
4.DataX的框架
5.DataX的核心架构
6.DataX的安装
7.DataX的使用案例
8.mysql同步到mysql案例
1.概要
本篇文件将介绍一款数据同步工具DataX的原理,安装,以及使用。
2.简介
官网连接:https://github.com/alibaba/DataX
DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
异构数据源就是指数据结构、存取方式、形式不一样的数据源,对于异构数据源的数据
处理起来一般是很麻烦的比如,数据格式和结构的差异、数据语义的不一致、数据访问接口的
多样性、数据存储的位置和数据管理的方式等等都使得异构数据源交互起来很困难。
3.DataX处理异构数据源
官方的设计理念
不难看出对于一般的异构数据源同步问题只能使用复杂的网状同步链路,DataX设计理念就是将自身作为一个中间载体,数据源只需要与DataX进行连接即可,降低了各个数据源直接的耦合性
4.DataX的框架
DataX使用FrameWork+Plugin的方式进行架构,也就是插件化的架构方式,这里插件具体就是Reader和Writer一个负责采集数据发送给FrameWork,一个负责从FrameWork中取数据并写入到目的端。
这里重点说一下FrameWork,它连接了reader和writer作为一个数据传输通道,处理缓冲,流控,并发,数据转换等核心技术问题,比如下游写的比较慢就可以在FrameWork中进行缓冲,同时可以控制数据的流量和流速,如果下游消费能力比较强也可以多线程并发的形式同时写数据,同时对于不同的数据源,FrameWork能自动转换数据存储格式。
5.DataX的核心架构
1.用户提交的任务首先会被封装成一个DataX的Job作业
2.之后根据不同的数据源的特性制定切分策略,例如,对于关系型数据库,可以通过指定切分主键(splitPK)来将数据表切分成多个部分,每个部分由一个Task负责同步
3.DataX Job会调用Scheduler模块,根据配置的并发数量,一般一个并发默认处理5个Task,所以会将Task重新组合成TaskGroup组任务。
4.每个Task启动后会启动Reader—>Channel—>Writer的线程来完成任务同步工作。
5.DataX作业运行后,Job监控TaskGroup模块任务,所有的TaskGroup任务完成后Job成功退出,否则,异常退出,进程退出值非0
6.DataX的安装
- 下载DataX压缩包 DataX下载地址
- 解压tar -zxvf datax.tar.gz -C {YOUR_DATAX_HOME}
- $ cd {YOUR_DATAX_HOME}/bin
- 可以看到一个DataX.py 这就是用来执行job的datax脚本
- DataX自检脚本
- python {YOUR_DATAX_HOME}/bin/datax.py{YOUR_DATAX_HOME}/job/job.json
- 执行后可以看到结果显示一系列日志信息,表示安装成功
7.DataX的使用案例
DataX需要用户编写一个json文件,内容大致就是reader和writer的配置
官方提供了不同数据源的模板,我们以mysql写入到hdfs为例
python datax.py -r mysqlreader -w hdfswriter 执行后显示模板
{ "job": { "content": [ { "reader": { "name": "streamreader", "parameter": { "column": [], "sliceRecordCount": "" } }, "writer": { "name": "streamwriter", "parameter": { "encoding": "", "print": true } } } ], "setting": { "speed": { "channel": "" } } } }
8.mysql同步到hdfs案例
DataX提供了两种同步方式一种是以配置为主的TableMode,另外一种是以sql为主的QuerySQLMode
TableMode
- 注意同步到hdfs的目标路径必须已经存在,否则同步失败
- hdfs配置列是为例后续在hive或者spark使用方便,保证数据一致性
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"name",
],
"where": "id>=3",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://hadoop102:3306/test"
],
"table": [
"test_tablename"
]
}
],
"password": "root",
"splitPk": "",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "string"
}
],
"compress": "gzip",
"defaultFS": "hdfs://hadoop102:8020",
"fieldDelimiter": "\t",
"fileName": "table_province",
"fileType": "text",
"path": "/table_province",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
QuerySQLMode
mysql源写sql就可以了
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://hadoop102:3306/gmall"
],
"querySql": [
"select id,name from test_tablename where id>=3"
]
}
],
"password": "root",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "string"
}
],
"compress": "gzip",
"defaultFS": "hdfs://hadoop102:8020",
"fieldDelimiter": "\t",
"fileName": "table_province",
"fileType": "text",
"path": "/table_province",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}