文章目录
- 什么是 DataX?
- DataX 设计框架
- DataX 核心架构
- DataX 部署
- DataX 数据同步(MySQL —> HDFS)
什么是 DataX?
DataX 是阿里巴巴集团开源的、通用的数据抽取工具,广泛使用的离线数据同步工具/平台。它设计用于支持多种数据源之间的高效数据传输,可以实现不同数据源之间的数据同步、迁移、ETL(抽取、转换、加载)等数据操作。
主要特点和功能包括:
-
多数据源支持:DataX支持多种数据源,包括关系型数据库(如 MySQL、Oracle、SQL Server)、NoSQL 数据库(如 MongoDB、HBase、Redis)、HDFS、FTP、Hive 等。
-
扩展性:DataX具有良好的扩展性,可以通过编写插件来支持新的数据源或者数据目的地,以满足不同数据存储系统的需求。
-
灵活配置:通过配置文件,用户可以灵活地定义数据抽取任务,包括数据源、目的地、字段映射、转换规则等,以适应不同的数据处理需求。
-
高效传输:DataX 采用分布式、并行的数据传输策略,可以将数据以高效、快速的方式传输到目标数据源,提高数据处理的效率。
-
任务调度和监控:DataX 提供了任务调度和监控的功能,可以通过控制台查看任务运行情况,监控任务的健康状态,以及处理异常情况。
-
开源社区支持:DataX 是开源项目,拥有活跃的开源社区支持,可以获取到丰富的文档、示例和开发者社区的帮助。
DataX 设计框架
DataX 本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的 Reader 插件,以及向目标端写入数据的 Writer 插件,理论上 DataX 框架可以支持任意数据源类型的数据同步工作。同时 DataX 插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。
DataX 本身作为离线数据同步框架,采用 Framework + plugin 架构构建。将数据源读取和写入抽象成为 Reader/Writer 插件,纳入到整个同步框架中。
-
Reader:Reader 为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
-
Writer: Writer 为数据写入模块,负责不断向 Framework 取数据,并将数据写入到目的端。
-
Framework:Framework 用于连接
reader
和writer
,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
DataX 核心架构
-
DataX 完成单个数据同步的作业,我们称之为 Job,DataX 接受到一个 Job 之后,将启动一个进程来完成整个作业同步过程。DataX Job 模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
-
DataXJob 启动后,会根据不同的源端切分策略,将 Job 切分成多个小的 Task(子任务),以便于并发执行。Task 便是 DataX 作业的最小单元,每一个 Task 都会负责一部分数据的同步工作。
-
切分多个 Task 之后,DataX Job 会调用 Scheduler 模块,根据配置的并发数据量,将拆分成的 Task 重新组合,组装成 TaskGroup(任务组)。
-
每一个 TaskGroup 负责以一定的并发运行完毕分配好的所有 Task,默认单个任务组的并发数量为
5
。 -
DataX 作业运行起来之后, Job 监控并等待多个 TaskGroup 模块任务完成,等待所有 TaskGroup 任务完成后 Job 成功退出。否则,异常退出,进程退出值非
0
。
DataX 部署
DataX 3.0
下载地址
1. 解压压缩包
tar -zxvf datax.tar.gz -C /opt/module/
2. 配置环境变量
sudo vim /etc/profile.d/my.sh
# 添加如下内容:
#DATAX_HOME
export DATAX_HOME=/opt/module/datax
刷新环境变量:source /etc/profile.d/my.sh
3. 运行测试程序
运行 DataX 自带的测试程序。
cd $DATAX_HOME
python bin/datax.py job/job.json
其中 datax.py
是执行的主程序,job.json
中存储的是数据源插件配置(其中有数据源的连接信息、存储信息、配置信息等)。
正常运行完成后,会看到输出相关日志内容:
DataX 无需进行其它配置,解压后即可快速使用。
DataX 数据同步(MySQL —> HDFS)
1.创建 MySQL 测试库表
CREATE DATABASE `test_datax` CHARACTER SET 'utf8mb4';
USE `test_datax`;
-- 商品属性表
DROP TABLE IF EXISTS sku_info;
CREATE TABLE sku_info(
`sku_id` varchar(100) COMMENT "商品id",
`name` varchar(200) COMMENT "商品名称",
`category_id` varchar(100) COMMENT "所属分类id",
`from_date` varchar(100) COMMENT "上架日期",
`price` decimal(10,2) COMMENT "商品单价"
) COMMENT "商品属性表";
insert into sku_info
values ('1', 'xiaomi 10', '1', '2020-01-01', 2000),
('2', '手机壳', '1', '2020-02-01', 10),
('3', 'apple 12', '1', '2020-03-01', 5000),
('4', 'xiaomi 13', '1', '2020-04-01', 6000),
('5', '破壁机', '2', '2020-01-01', 500),
('6', '洗碗机', '2', '2020-02-01', 2000),
('7', '热水壶', '2', '2020-03-01', 100),
('8', '微波炉', '2', '2020-04-01', 600),
('9', '自行车', '3', '2020-01-01', 1000),
('10', '帐篷', '3', '2020-02-01', 100),
('11', '烧烤架', '3', '2020-02-01', 50),
('12', '遮阳伞', '3', '2020-03-01', 20);
2.配置 DataX 插件
我们进入 DataX 官网,下滑找到对应组件插件的文档。
我们这里是从 MySQL 中读数据,然后存储到 HDFS 上,所以我们这里就先去查找 MySQL Reader 插件文档。
然后就会找到一个 MySQL Reader 的插件配置样例,如下所示:
{
"job": {
"setting": {
// 指定通道数量(并发)
"speed": {
"channel": 3
},
// 允许的误差范围
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
// 读取配置
"reader": {
// 固定写法——mysqlreader
"name": "mysqlreader",
"parameter": {
// 账号密码
"username": "root",
"password": "root",
// 选取需要进行同步的字段
"column": [
"id",
"name"
],
// 通过数据切片进行数据同步
"splitPk": "db_id",
// 指定库表连接信息
"connection": [
{
"table": [
"table"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/database"
]
}
]
}
},
// 写入配置
"writer": {
"name": "streamwriter",
"parameter": {
"print":true
}
}
}
]
}
}
了解 MySQL Reader 插件配置后,我们现在来学习 HDFS Writer 插件配置如何编写。
HDFS Writer 插件配置样例如下所示:
{
"setting": {},
"job": {
// 指定通道数量(并发)
"setting": {
"speed": {
"channel": 2
}
},
"content": [
{
// 读取配置
"reader": {
// 固定写法——txtfilereader
"name": "txtfilereader",
// 指定读取路径、编码、字段
"parameter": {
"path": ["/Users/shf/workplace/txtWorkplace/job/dataorcfull.txt"],
"encoding": "UTF-8",
"column": [
{
"index": 0,
"type": "long"
},
{
"index": 1,
"type": "long"
},
{
"index": 2,
"type": "long"
},
{
"index": 3,
"type": "long"
},
{
"index": 4,
"type": "DOUBLE"
},
{
"index": 5,
"type": "DOUBLE"
},
{
"index": 6,
"type": "STRING"
},
{
"index": 7,
"type": "STRING"
},
{
"index": 8,
"type": "STRING"
},
{
"index": 9,
"type": "BOOLEAN"
},
{
"index": 10,
"type": "date"
},
{
"index": 11,
"type": "date"
}
],
"fieldDelimiter": "\t"
}
},
// 写入配置
"writer": {
// 固定写法——hdfswriter
"name": "hdfswriter",
// 指定存储路径、格式、文件前缀名、字段
"parameter": {
"defaultFS": "hdfs://xxx:port",
"fileType": "orc",
"path": "/user/hive/warehouse/writerorc.db/orcfull",
"fileName": "xxxx",
"column": [
{
"name": "col1",
"type": "TINYINT"
},
{
"name": "col2",
"type": "SMALLINT"
},
{
"name": "col3",
"type": "INT"
},
{
"name": "col4",
"type": "BIGINT"
},
{
"name": "col5",
"type": "FLOAT"
},
{
"name": "col6",
"type": "DOUBLE"
},
{
"name": "col7",
"type": "STRING"
},
{
"name": "col8",
"type": "VARCHAR"
},
{
"name": "col9",
"type": "CHAR"
},
{
"name": "col10",
"type": "BOOLEAN"
},
{
"name": "col11",
"type": "date"
},
{
"name": "col12",
"type": "TIMESTAMP"
}
],
// 指定存储模式
"writeMode": "append",
// 指定存储间隔符
"fieldDelimiter": "\t",
// 指定数据压缩模式
"compress":"NONE"
}
}
}
]
}
}
3. 编写 DataX 同步 MySQL 数据到 HDFS 插件配置
cd $DATAX_HOME/job
vim test.json
添加下列内容:
{
"job": {
"setting": {
"speed": {
"channel": 1
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "000000",
"column": [
"name",
"sku_id",
"category_id",
"from_date",
"price"
],
"splitPk": "",
"connection": [
{
"table": [
"sku_info"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/test_datax?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowPublicKeyRetrieval=true"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://hadoop120:8020",
"fileType": "text",
"path": "/testInputPath",
"fileName": "sku_info",
"column": [
{
"name": "name",
"type": "STRING"
},
{
"name": "sku_id",
"type": "STRING"
},
{
"name": "category_id",
"type": "STRING"
},
{
"name": "from_date",
"type": "STRING"
},
{
"name": "price",
"type": "DOUBLE"
}
],
"writeMode": "append",
"fieldDelimiter": "\t",
"compress":"gzip"
}
}
}
]
}
}
上面的插件配置信息表明,将 MySQL 中 test_datax.sku_info
表的全量数据同步到 HDFS 中的 /testInputPath
路径下,如果指定输出文件类型为 text
,那么必须指定压缩模式为 gzip
或 bzip2
。
启动 Hadoop 集群,提前创建 HDFS 中的存储路径
hadoop fs -mkdir /testInputPath
运行 DataX,进行数据同步
cd $DATAX_HOME
python bin/datax.py job/test.json
执行完成后如下所示:
在 HDFS 上查看存储路径下的文件:
如果想要直接查看 HDFS 文件中存储的内容,可以运行下列命令:
hadoop fs -cat /testInputPath/* | zcat
其它数据源数据同步方式也基本上差不多,多看看官方插件配置文档就会了。
更多内容请查看:DataX 官网