1. Using DataX
1.1. DataX Job Submission Command
DataX is simple to use: pick the Reader and Writer that match your data source and destination, describe them in a JSON configuration file, and submit the synchronization job with a command like the following.
[song@hadoop102 datax]$ python /opt/model/datax/bin/datax.py /opt/model/datax/job/job.json
1.2. DataX Configuration File Format
You can print a configuration file template for a given Reader/Writer pair with the following command.
[song@hadoop102 datax]$ python bin/datax.py -r mysqlreader -w hdfswriter
The template has the following overall structure: the outermost element of the JSON is job, which contains two parts, setting and content. setting configures the job as a whole, while content configures the data source and destination.
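A trimmed sketch of that structure, with the parameter bodies elided (the real template printed by the command above lists every supported field for the chosen Reader and Writer):
{
    "job": {
        "content": [
            {
                "reader": { "name": "mysqlreader", "parameter": { ... } },
                "writer": { "name": "hdfswriter", "parameter": { ... } }
            }
        ],
        "setting": {
            "speed": { "channel": "" }
        }
    }
}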
For the specific parameters of each Reader and Writer, see the official documentation (https://github.com/alibaba/DataX).
2. DataX Usage Examples
2.1. Example: Synchronizing MySQL Data to HDFS
2.1.1. Requirement
Synchronize the base_province table in the gmall database to the /base_province directory on HDFS.
2.1.2. Analysis
This calls for MySQLReader and HDFSWriter. MySQLReader has two modes: TableMode, which declares the data to synchronize with the table, column, and where properties, and QuerySQLMode, which declares it with a single SQL query. Both modes are demonstrated below.
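The difference is confined to the reader's parameter block. Abridged from the two full configurations below, TableMode declares

"column": [ "id", "name", ... ],
"where": "id>=3",
"connection": [ { "table": [ "base_province" ], "jdbcUrl": [ ... ] } ]

whereas QuerySQLMode replaces column/table/where with a single query:

"connection": [ { "querySql": [ "select ... from base_province where id>=3" ], "jdbcUrl": [ ... ] } ]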
2.1.2.1. MySQLReader: TableMode
- Data preparation: the MySQL source table
/*
Navicat Premium Data Transfer
Source Server : hadoop102
Source Server Type : MySQL
Source Server Version : 80026
Source Host : 192.168.10.102:3306
Source Schema : gmall
Target Server Type : MySQL
Target Server Version : 80026
File Encoding : 65001
Date: 04/02/2023 13:53:48
*/
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for base_province
-- ----------------------------
DROP TABLE IF EXISTS `base_province`;
CREATE TABLE `base_province` (
`id` bigint(0) NULL DEFAULT NULL COMMENT 'id',
`name` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'province name',
`region_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'region id',
`area_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'administrative area code',
`iso_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'international code',
`iso_3166_2` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'ISO 3166 code'
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of base_province
-- ----------------------------
INSERT INTO `base_province` VALUES (1, '北京', '1', '110000', 'CN-11', 'CN-BJ');
INSERT INTO `base_province` VALUES (2, '天津', '1', '120000', 'CN-12', 'CN-TJ');
INSERT INTO `base_province` VALUES (3, '山西', '1', '140000', 'CN-14', 'CN-SX');
INSERT INTO `base_province` VALUES (4, '内蒙古', '1', '150000', 'CN-15', 'CN-NM');
INSERT INTO `base_province` VALUES (5, '河北', '1', '130000', 'CN-13', 'CN-HE');
INSERT INTO `base_province` VALUES (6, '上海', '2', '310000', 'CN-31', 'CN-SH');
INSERT INTO `base_province` VALUES (7, '江苏', '2', '320000', 'CN-32', 'CN-JS');
INSERT INTO `base_province` VALUES (8, '浙江', '2', '330000', 'CN-33', 'CN-ZJ');
INSERT INTO `base_province` VALUES (9, '安徽', '2', '340000', 'CN-34', 'CN-AH');
INSERT INTO `base_province` VALUES (10, '福建', '2', '350000', 'CN-35', 'CN-FJ');
INSERT INTO `base_province` VALUES (11, '江西', '2', '360000', 'CN-36', 'CN-JX');
INSERT INTO `base_province` VALUES (12, '山东', '2', '370000', 'CN-37', 'CN-SD');
INSERT INTO `base_province` VALUES (14, '台湾', '2', '710000', 'CN-71', 'CN-TW');
INSERT INTO `base_province` VALUES (15, '黑龙江', '3', '230000', 'CN-23', 'CN-HL');
INSERT INTO `base_province` VALUES (16, '吉林', '3', '220000', 'CN-22', 'CN-JL');
INSERT INTO `base_province` VALUES (17, '辽宁', '3', '210000', 'CN-21', 'CN-LN');
INSERT INTO `base_province` VALUES (18, '陕西', '7', '610000', 'CN-61', 'CN-SN');
INSERT INTO `base_province` VALUES (19, '甘肃', '7', '620000', 'CN-62', 'CN-GS');
INSERT INTO `base_province` VALUES (20, '青海', '7', '630000', 'CN-63', 'CN-QH');
INSERT INTO `base_province` VALUES (21, '宁夏', '7', '640000', 'CN-64', 'CN-NX');
INSERT INTO `base_province` VALUES (22, '新疆', '7', '650000', 'CN-65', 'CN-XJ');
INSERT INTO `base_province` VALUES (23, '河南', '4', '410000', 'CN-41', 'CN-HA');
INSERT INTO `base_province` VALUES (24, '湖北', '4', '420000', 'CN-42', 'CN-HB');
INSERT INTO `base_province` VALUES (25, '湖南', '4', '430000', 'CN-43', 'CN-HN');
INSERT INTO `base_province` VALUES (26, '广东', '5', '440000', 'CN-44', 'CN-GD');
INSERT INTO `base_province` VALUES (27, '广西', '5', '450000', 'CN-45', 'CN-GX');
INSERT INTO `base_province` VALUES (28, '海南', '5', '460000', 'CN-46', 'CN-HI');
INSERT INTO `base_province` VALUES (29, '香港', '5', '810000', 'CN-91', 'CN-HK');
INSERT INTO `base_province` VALUES (30, '澳门', '5', '820000', 'CN-92', 'CN-MO');
INSERT INTO `base_province` VALUES (31, '四川', '6', '510000', 'CN-51', 'CN-SC');
INSERT INTO `base_province` VALUES (32, '贵州', '6', '520000', 'CN-52', 'CN-GZ');
INSERT INTO `base_province` VALUES (33, '云南', '6', '530000', 'CN-53', 'CN-YN');
INSERT INTO `base_province` VALUES (13, '重庆', '6', '500000', 'CN-50', 'CN-CQ');
INSERT INTO `base_province` VALUES (34, '西藏', '6', '540000', 'CN-54', 'CN-XZ');
SET FOREIGN_KEY_CHECKS = 1;
- Create the configuration file base_province.json with the following content:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"name",
"region_id",
"area_code",
"iso_code",
"iso_3166_2"
],
"where": "id>=3",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://hadoop102:3306/gmall"
],
"table": [
"base_province"
]
}
],
"password": "123456",
"splitPk": "",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "string"
},
{
"name": "region_id",
"type": "string"
},
{
"name": "area_code",
"type": "string"
},
{
"name": "iso_code",
"type": "string"
},
{
"name": "iso_3166_2",
"type": "string"
}
],
"compress": "gzip",
"defaultFS": "hdfs://hadoop102:8020",
"fieldDelimiter": "\t",
"fileName": "base_province",
"fileType": "text",
"path": "/base_province",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
- Parameter notes: the detailed Reader, Writer, and Setting parameter tables are in the official documentation.
- Caveat: NULL handling. By default, when a MySQL column is NULL, HDFS Writer stores it on HDFS as an empty string (''), whereas Hive's default on-disk representation of NULL is \N. If a NULL on HDFS is stored as \N, Hive displays it as NULL, but HDFS Writer offers no nullFormat parameter to change what it writes. The resulting problem: a MySQL NULL lands on HDFS as an empty string, and Hive then reads and shows an empty string, so the two systems disagree on the value.
There are two ways to solve this:
- Modify the DataX HDFS Writer source code to support a custom NULL representation.
- Specify the empty string ('') as the NULL storage format when creating the Hive table, for example:
DROP TABLE IF EXISTS base_province;
CREATE EXTERNAL TABLE base_province
(
`id` STRING COMMENT 'id',
`name` STRING COMMENT 'province name',
`region_id` STRING COMMENT 'region id',
`area_code` STRING COMMENT 'area code',
`iso_code` STRING COMMENT 'legacy ISO-3166-2 code, for visualization',
`iso_3166_2` STRING COMMENT 'current ISO-3166-2 code, for visualization'
) COMMENT '省份表'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/base_province/';
- Submit the task
- Create the /base_province directory on HDFS first; when DataX writes to HDFS, the target path must already exist.
- Enter the DataX root directory and run the job (commands below).
On the first attempt the job failed with a driver compatibility error. The cause: the MySQL server here is 8.x, while the MySQL driver bundled with DataX is 5.x. The fix is to replace the bundled driver with a suitable 8.x build: swap the mysql-connector-5… jar under datax/plugin/reader/mysqlreader/libs/ for an 8.x version, then re-run the job.
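Assuming the directory layout used throughout this section, the create-directory and run steps amount to:

[song@hadoop102 datax]$ hadoop fs -mkdir /base_province
[song@hadoop102 datax]$ python bin/datax.py job/base_province.json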
- Check the result
- View the HDFS files:
[song@hadoop102 datax]$ hadoop fs -cat /base_province/* | zcat
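Since the writer's fieldDelimiter is \t and the reader kept only rows with id >= 3, the decompressed output should be tab-separated rows starting at id 3, e.g.:

3	山西	1	140000	CN-14	CN-SX
4	内蒙古	1	150000	CN-15	CN-NM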
2.1.2.2. MySQLReader: QuerySQLMode
- Write the job configuration file:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://hadoop102:3306/gmall?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8"
],
"querySql": [
"select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"
]
}
],
"password": "000000",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "string"
},
{
"name": "region_id",
"type": "string"
},
{
"name": "area_code",
"type": "string"
},
{
"name": "iso_code",
"type": "string"
},
{
"name": "iso_3166_2",
"type": "string"
}
],
"compress": "gzip",
"defaultFS": "hdfs://hadoop102:8020",
"fieldDelimiter": "\t",
"fileName": "base_province",
"fileType": "text",
"path": "/base_province",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
- Check the result (see below).
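Verification works exactly as in the TableMode case:

[song@hadoop102 datax]$ hadoop fs -cat /base_province/* | zcat

Note that because writeMode is append, the output now also includes the files produced by the earlier TableMode run.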
2.2. Passing Parameters to DataX
Offline synchronization jobs typically run on a daily schedule, so the target HDFS path usually contains a date level to separate each day's data. In other words, the target path changes every day, and the value of HDFS Writer's path parameter must therefore be dynamic.
This is what DataX's parameter-passing feature is for: reference a parameter in the JSON configuration as ${param}, and supply its value at submission time with -p"-Dparam=value". A concrete example follows.
- Write the configuration file:
[song@hadoop102 job]$ vim base_province_date.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://hadoop102:3306/gmall?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8"
],
"querySql": [
"select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"
]
}
],
"password": "000000",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "string"
},
{
"name": "region_id",
"type": "string"
},
{
"name": "area_code",
"type": "string"
},
{
"name": "iso_code",
"type": "string"
},
{
"name": "iso_3166_2",
"type": "string"
}
],
"compress": "gzip",
"defaultFS": "hdfs://hadoop102:8020",
"fieldDelimiter": "\t",
"fileName": "base_province",
"fileType": "text",
"path": "/base_province/${dt}",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
- Create the target path: hadoop fs -mkdir /base_province/2023-02-04
- Run the job: python bin/datax.py -p"-Ddt=2023-02-04" job/base_province_date.json
- Check the result.
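Because dt was passed as 2023-02-04, the output lands under /base_province/2023-02-04; a quick check:

[song@hadoop102 datax]$ hadoop fs -cat /base_province/2023-02-04/* | zcat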
2.3. Example: Synchronizing HDFS Data to MySQL
2.3.1. Requirement
Synchronize the data under the /base_province directory on HDFS to the test_province table in the MySQL gmall database.
2.3.2. Analysis
This calls for HDFSReader and MySQLWriter.
- Write the configuration file: create test_base_province.json with the following content:
{
"job": {
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"defaultFS": "hdfs://hadoop102:8020",
"path": "/base_province",
"column": [
"*"
],
"fileType": "text",
"compress": "gzip",
"encoding": "UTF-8",
"nullFormat": "\\N",
"fieldDelimiter": "\t",
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "root",
"password": "000000",
"connection": [
{
"table": [
"test_province"
],
"jdbcUrl": "jjdbc:mysql://hadoop102:3306/gmall?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8"
}
],
"column": [
"id",
"name",
"region_id",
"area_code",
"iso_code",
"iso_3166_2"
],
"writeMode": "replace"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
The test_province table structure:
/*
Navicat Premium Data Transfer
Source Server : hadoop102
Source Server Type : MySQL
Source Server Version : 80026
Source Host : hadoop102:3306
Source Schema : gmall
Target Server Type : MySQL
Target Server Version : 80026
File Encoding : 65001
Date: 04/02/2023 15:13:43
*/
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for test_province
-- ----------------------------
DROP TABLE IF EXISTS `test_province`;
CREATE TABLE `test_province` (
`id` bigint(0) NOT NULL COMMENT 'id',
`name` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'province name',
`region_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'region id',
`area_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'administrative area code',
`iso_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'international code',
`iso_3166_2` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'ISO 3166 code',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
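A note on writeMode "replace": MySQLWriter implements it with REPLACE INTO statements, which rely on a primary or unique key to detect duplicates. Because test_province declares PRIMARY KEY (id), re-running the job overwrites rows by id instead of duplicating them; each record is effectively written as:

REPLACE INTO test_province (id, name, region_id, area_code, iso_code, iso_3166_2) VALUES (3, '山西', '1', '140000', 'CN-14', 'CN-SX');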
- Reader and Writer parameters: see the official documentation.
- Submit the task: first create the gmall.test_province table in MySQL (DDL above), then run the job from the DataX root directory:
[song@hadoop102 datax]$ python bin/datax.py job/test_base_province.json
- Check the result, for example by querying the table.
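One way to verify from the shell, using the credentials from the configuration above:

[song@hadoop102 datax]$ mysql -hhadoop102 -uroot -p000000 -e "select count(*) from gmall.test_province"

The row count should equal the number of distinct id values in the HDFS source files; duplicates from repeated append runs collapse under replace mode.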