DataX入门

news2025/1/12 23:15:07

目录

1. DataX介绍

2. DataX支持的常用数据源类型       

3. 设计理念

4. DataX框架设计

        4.1. Reader 

        4.2. Writer

        4.3. Framework

5. DataX的运行流程

6. DataX与Sqoop对比

7. 部署

8. 配置详解

9. 案例 同步MySql到HDFS

9.1. 整体结构

9.2. mySqlReader

9.2.1. 使用tableMode

9.2.2. 使用QuerySQLMode

9.3. HDFSWriter

9.3.1. 在Hive中建表的时候指定表( NULL DEFINED AS '' )

9.3.2. 修改源码 点击参考

9.4. setting

9.5. 完整配置示例

9.6. 模拟传输

9.6.1. 启动hadoop和Hive创建Hive表

9.6.2. 启动传输任务

9.6.3. 查看数据

9.7. DataX传参

10. DataX参数优化 

10.1. 速度控制

10.2. 内存调整


1. DataX介绍

        DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQLOracle)HDFSHiveODPSHBaseFTP等各种异构数据源之间稳定高效的数据同步功能               

        源码地址:    点击进入  

        组件地址:    点击下载

2. DataX支持的常用数据源类型       

类型数据源
关系型数据库MySql
Oracle
SQLServer
PostgreSQL
NoSql数据存储HBase 0.94 / 1.1
Phoenix 4.x / 5.x
MongoDB
Hive
无结构化数据存储TxtFile
FTP
HDFS
ElasticSearch     支持读不支持写

3. 设计理念

        为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

         对于使用者,只需要学习DataX的数据源配置方式就可以将数据源里面的数据进行传输

4. DataX框架设计

        DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中

        4.1. Reader 

                数据采集模块,负责采集数据源的数据,将数据发送给Framework

        4.2. Writer

                数据写入模块,负责不断向Framework取数据, 并将数据写入到目的端

        4.3. Framework

                用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,控流,并发,数据转换等核心技术问题

5. DataX的运行流程

        DataX采集全量数据的原理是通过sql查询的方式获取数据,查询语句会被切割,例如 通过时间查询,按照时间维度将数据切分成多个Task, DataX在传输数据的时候将启动TaskGroup,每个TaskGroup负责一定的并发度运行其所得的Task,单个TaskGroup的并发的固定为5,总TaskGroup数量与配置的总并发度有关:  TaskGroup数量 = 总并发度 / 5 

6. DataX与Sqoop对比

功能DataXSqoop
运行模式单进程多线程MR
分布式不支持,可以通过调度系统规避支持
控流需要定制开发
统计信息已有一些统计,上报需要定制没有, 分布式数据数据不方便
数据校验在core部分与校验功能没有, 分布式收集数据不方便
监控需要定制需要定制

7. 部署

下载DataX安装包到服务器解压并测试运行

python <datax_home>/bin/datax.py <datax_home>/job/job.json

8. 配置详解

        可以使用如下命名查看DataX配置文件模板

python bin/datax.py -r mysqlreader -w hdfswriter

        配置文件模板如下,json最外层是一个job,job包含setting和content两部分,其中setting用于对整个job进行配置,content用户配置数据源和目的地

Reader和Writer的具体参数可参考官方文档,点击查看

9. 案例 同步MySql到HDFS

9.1. 整体结构

{
	"job": {
		"content": [{
			"reader": {},
			"writer": {}
		}],
		"setting": {
			"speed": {
				"channel": 1
			}
		}
	}
}

9.2. mySqlReader

9.2.1. 使用tableMode

{
	"name": "mysqlreader",                               //Reader名称 , 固定写法
	"parameter": {
		"username": "root",                              //数据库用户密码
		"password": "123456",
		"connection": [{
			"jdbcUrl": ["jdbc:mysql://node1:3306/gmall"],//数据库jdbc url
			"table": ["tb"]                   //数据库jdbc url
		}],
		"column": ["id", "name", "age"],                 //同步的字段 ["*"]表示所有字段
		"where": "id>=0",                                //while过滤条件
		"splitPk": ""                                    //分片字段,如果没有这个字段,或者值为空,则只有一个Task
	}
}

9.2.2. 使用QuerySQLMode

{
	"name": "mysqlreader",                               //Reader名称 , 固定写法
	"parameter": {
		"username": "root",                              //数据库用户密码
		"password": "123456",
		"connection": [{
			"jdbcUrl": ["jdbc:mysql://node1:3306/gmall"],//数据库jdbc url
			"table": ["select * from tb"]                //数据库jdbc url
		}]
	}
}

9.3. HDFSWriter

{
	"name": "hdfswriter",                    //Writer名称 , 固定写法
	"parameter": {
		"column": [{                         //列信息,包括列名和类型 类型为Hive表字段类型,目前不支持decimal,binary,arrays,maps,struicts
			"name": "id",
			"type": "bigint"
		}, {
			"name": "name",
			"type": "string"
		}, {
			"name": "age",
			"type": "bigint"
		}],
		"defaultFS": "hdfs://node2:8020",    //HDFS文件系统namenode节点地址,不支持传HA的集群名
		"path": "/mydatax",                  //HDFS文件系统目标路径
		"fileName": "tb",                    //HDFS文件名前缀
		"fileType": "text",                  //HDFS文件类型
		"compress": "gzip",                  //HDFS压缩类型 text文件支持压缩gzip bzip2;  orc文件支持压缩NONE SNAPPY
		"fieldDelimiter": "\t",              //HDFS的分隔符
		"writeMode": "append"                //数据写入的模式 append 追加 ; nonConflict: 若写入目录有同名(前缀相同文件),报错
	}
}

注意:

        HFDS Writer并未提供nullFormat参数:也就是用户并不能自定义null值写到HFDS文件中的存储格式。默认情况下,HFDS Writer会将null值存储为空字符串(''),而Hive默认的null值存储格式为\N。所以后期将DataX同步的文件导入Hive表就会出现问题。

解决方法有两种,任意一种都可以

9.3.1. 在Hive中建表的时候指定表( NULL DEFINED AS '' )

9.3.2. 修改源码 点击参考

9.4. setting

{
	"speed": {             //传输速度配置
		"channel": 1       //并发数
	},
	"errorLimit": {        // 容错比例配置
		"record": 1,       //错误条数上限,超出则任务失败
		"percentage": 0.02 //错误比例上限,超出则任务失败
	}
}

9.5. 完整配置示例

{
	"job": {
		"content": [{
			"reader": {
				"name": "mysqlreader",
				"parameter": {
					"username": "root",
					"password": "123456",
					"connection": [{
						"jdbcUrl": ["jdbc:mysql://node1:3306/yangxp"],
						"table": ["tb"]
					}],
					"column": ["id", "name", "age"],
					"where": "id>=0",
					"splitPk": ""
				}
			},
			"writer": {
				"name": "hdfswriter",
				"parameter": {
					"column": [{
						"name": "id",
						"type": "bigint"
					}, {
						"name": "name",
						"type": "string"
					}, {
						"name": "age",
						"type": "bigint"
					}],
					"defaultFS": "hdfs://node2:8020",
					"path": "/mydatax",
					"fileName": "tb",
					"fileType": "text",
					"compress": "gzip",
					"fieldDelimiter": "\t",
					"writeMode": "append"
				}
			}
		}],
		"setting": {
			"speed": {
				"channel": 1
			},
			"errorLimit": {
				"record": 1,
				"percentage": 0.02
			}
		}
	}
}

9.6. 模拟传输

9.6.1. 启动hadoop和Hive创建Hive表

DROP TABLE IF EXISTS tb;
CREATE EXTERNAL TABLE tb
(
    `id`         STRING COMMENT '编号',
    `name`       STRING COMMENT '姓名',
    `age`  STRING COMMENT '年龄'
) COMMENT '年龄表'
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    NULL DEFINED AS ''
    LOCATION '/mydatax/';

9.6.2. 启动传输任务

将配置内容写入mysql_to_hive.json配置文件到目录<datax_home>/job/下并启动任务

python <datax_home>/bin/datax.py <datax_home>/job/mysql_to_hive.json

9.6.3. 查看数据

数据被压缩后上传到Hive,直接在看会乱码

 可以借助hive客户端查看表数据,或者使用hdfs的zcat查看

hdfs dfs -cat /mydatax/tb__af7eac41_a69e_4976_bb68_a98cd5d3a689.gz|zcat

9.7. DataX传参

        通常情况下,离线数据同步任务需要每日定时重复执行,故HDFS上的目标路径通常会包含一层日期,以对每日同步的数据加以区分,也就是说每日同步数据的目标路径不是固定不变的,因此DataX配置文件中HDFS Writer的path参数的值应该是动态的。为实现这一效果,就需要使用DataX传参的功能。

        DataX传参的用法如下,在JSON配置文件中使用${param}引用参数,在提交任务时使用-p"-Dparam=value"传入参数值,具体示例如下。

{
    "name": "hdfswriter",
    "parameter": {
        ...
        "path": "/mydatax/${dt}",
        ...
    }
}
 python <datax_home>/bin/datax.py -p"-Ddt=2023-03-04" <datax_home>/job/mysql_to_hdfs.json

10. DataX参数优化 

10.1. 速度控制

        DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在数据库可以承受的范围内达到最佳的同步速度。

参数说明
job.setting.speed.channel

并发数

job.setting.speed.record

总record限速

job.setting.speed.byte

总byte限速

core.transport.channel.speed.record

单个channel的record限速,默认为10000(10000条/s)

core.transport.channel.speed.byte

单个channel的byte限速,默认值1024*1024(1M/s)

注意事项:

  1.         1.若配置了总record限速,则必须配置单个channel的record限速
  2.         2.若配置了总byte限速,则必须配置单个channe的byte限速
  3.         3.若配置了总record限速和总byte限速,channel并发数参数就会失效。因为配置了总record限速和总byte限速之后,实际channel并发数是通过计算得到的:

计算公式为:

        min(总byte限速/单个channel的byte限速,总record限速/单个channel的record限速)

配置示例:

{
    "core": {
        "transport": {
            "channel": {
                "speed": {
                    "byte": 1048576 //单个channel byte限速1M/s
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "byte" : 5242880 //总byte限速5M/s
            }
        },
        ...
    }
}

10.2. 内存调整

        当提升DataX Job内Channel并发数时,内存的占用会显著增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。例如Channel中会有一个Buffer,作为临时的数据交换的缓冲区,而在部分Reader和Writer的中,也会存在一些Buffer,为了防止OOM等错误,需调大JVM的堆内存。

        建议将内存设置为4G或者8G,这个也可以根据实际情况来调整。

        调整JVM xms xmx参数的两种方式:一种是直接更改datax.py脚本;另一种是在启动的时候,加上对应的参数,如下:

python <datax_home>/bin/datax.py --jvm="-Xms8G -Xmx8G" <datax_home>/job/mysql_to_hdfs..json

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/388080.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C语言函数:字符串函数及模拟实现strncpy()、strncat()、strncmp()

C语言函数&#xff1a;字符串函数及模拟实现strncpy()、strncat()、strncmp() 在了解strncpy、strncat()、前&#xff0c;需要先了解strcpy()、strncat()&#xff1a; C语言函数&#xff1a;字符串函数及模拟实现strlen() 、strcpy()、 strcat()_srhqwe的博客-CSDN博客 strncp…

包装类详解(装箱,拆箱)

Object 引用可以指向任意类型的对象&#xff0c;但有例外出现了&#xff0c;8 种基本数据类型不是对象&#xff0c;那岂不是刚才的泛型机制要失效了&#xff1f;&#xff08;泛型详解._阿瞒有我良计15的博客-CSDN博客&#xff09; 实际上也确实如此&#xff0c;为了解决这个问题…

数据结构与算法-二叉树-序列化与反序列化

可能性探究:我们可以想到的是只有一棵树按照某个序是唯一确定要给结构的情况才可能被序列化和反序列化&#xff0c;比如我们对于以下的二叉树可以找到它的先序、中序、后序如下&#xff1a; 根据它的先序和后序我们找不到任何其他可能的树&#xff0c;所以可以根据先序和后序去…

NLP预训练模型

Models Corpus RoBERTa: A Robustly Optimized BERT Pretraining Approach 与BERT主要区别在于&#xff1a; large mini-batches 保持总训练tokens数一致&#xff0c;使用更大的学习率、更大的batch size&#xff0c;adam β20.98\beta_20.98β2​0.98&#xff1b;dynamic ma…

aws eks 集群初始化过程中pause容器的启动逻辑

eks集群默认策略在磁盘使用量达到threshold时会清除镜像&#xff0c;其中pause镜像也可能会被清除 https://aws.amazon.com/cn/premiumsupport/knowledge-center/eks-worker-nodes-image-cache/ pause容器能够为pod创建初始的名称空间&#xff0c;pod的内的容器共享其中的网络空…

库函数qsort 的模拟实现

在之前了解了库函数qsort的使用之后 我们来模拟实现一下上篇有介绍 qsort的底层实现是快速排序 由于害怕没有人了解过快速排序 我就用大家熟知的冒泡排序进行模拟实现 先来展示完整代码 以下代码为升序排序 如果降序将冒泡排序中的大于号改为小于号就可以了#define _CRT_SECURE…

malloc实现原理探究

2021年末面试蔚来汽车&#xff0c;面试官考察了malloc/free的实现机制。当时看过相关的文章&#xff0c;有一点印象&#xff0c;稍微说了一点东西&#xff0c;不过自己感到不满意。今天尝试研究malloc的实现细节&#xff0c;看了几篇博文&#xff0c;发现众说纷纭&#xff0c;且…

MySql启动错误(Mac系统 安装 mysql-8.0.32-macos13-arm64 后每次点击启动 无法启动) --- 已解决

MySql启动的时候: 立即变红! 查看日志如下: 2023-03-04T14:18:01.089671Z 0 [System] [MY-010910] [Server] /usr/local/mysql/bin/mysqld: Shutdown complete (mysqld 8.0.32) MySQL Community Server - GPL. 2023-03-04T14:18:10.304169Z 0 [System] [MY-010116] [Server]…

【EDA工具使用】——VCS和Verdi的联合仿真的简单使用

目录 1.芯片开发所需的工具环境 2.编译仿真工具 3.三步式混合编译仿真&#xff08;最常用&#xff09;​编辑 4.两步式混合编译仿真​编辑 5.VCS的使用 ​6.verdi的使用 1.产生fsdb文件的两种方法​编辑 1.芯片开发所需的工具环境 2.编译仿真工具 3.三步式混合编译仿真…

按位与为零的三元组[掩码+异或的作用]

掩码异或的作用前言一、按位与为零的三元组二、统计分组1、map统计分组2、异或掩码总结参考资料前言 当a b 0时&#xff0c;我们能够很清楚的知道b是个什么值&#xff0c;b 0 - a -a&#xff0c;如果当a & b 0时&#xff0c;我们能够很清楚的知道b是什么值吗&#xf…

Python GUI界面编程-初识

图形用户界面(Graphical User Interface&#xff0c;简称 GUI&#xff0c;又称图形用户接口)是指采用图形方式显示的计算机操作用户界面。与早期计算机使用的命令行界面相比&#xff0c;图形界面对于用户来说在视觉上更易于接受。然而这界面若要通过在显示屏的特定位置&#xf…

HiveSQL一天一个小技巧:如何精准计算非连续日期累计值【闪电快车面试题】

0 需 求稀疏字段累计求和问题1 问题分析根据图片中数据变换的形式&#xff0c;可以看出是根据字段term补齐数据中缺失的日期&#xff0c;term为连续日期的个数&#xff0c;当为12时&#xff0c;表明由2018-12-21到2019-01-02连续日期个数为12&#xff0c;当补齐日期后&#xff…

sklearn中的逻辑回归

目录 一.名为“回归”的分类器 二.逻辑回归的优点 三.sklearn中的逻辑回归 四.linear_model.LogisticRegression 五.penalty & C(正则化) 六.逻辑回归中的特征工程 1.业务选择 2.PCA和SVD一般不用 3.统计方法可以使用&#xff0c;但不是非常必要 4.高效的嵌入法e…

【vulhub漏洞复现】CVE-2013-4547 Nginx 文件名逻辑漏洞

一、漏洞详情影响版本 Nginx 0.8.41 ~ 1.4.3 / 1.5.0 ~ 1.5.7通过%00截断绕过后缀名的限制&#xff0c;使上传的php内容文件被解析执行。当Nginx得到一个用户请求时&#xff0c;首先对url进行解析&#xff0c;进行正则匹配&#xff0c;如果匹配到以.php后缀结尾的文件名&#x…

2022秋-2023-中科大-数字图像分析-期末考试试卷回忆版

今天晚上刚考完&#xff0c;心累&#xff0c;在这里继续授人以渔(仅供参考&#xff0c;切勿对着复习不看ppt&#xff0c;ppt一定要过两遍)。 注意:往年的经验贴&#xff0c;到此为止&#xff0c;全部作废&#xff0c;一个没考。千万不要只对着复习&#xff0c;SIFT没考&#x…

JavaScript(JS)

一、三种引入方式&#xff1a; 1、内部js 通过script标签嵌入到html里面 <script>alert(hello);</script> 2、外部js 写成一个单独的.js文件&#xff0c;让html引入进来 <script src"app.js"></script> 3、行内js 直接写到html内部 &…

Python爬虫——使用socket模块进行图片下载

Python爬虫——使用socket模块进行图片下载什么是socket爬虫的工作流程socket爬取图片为什么能用socket能下载图片socket下载图片和request下载图片的区别使用socket下载一张图片使用socket下载多张图片方法1方法2什么是socket Socket 是一种通信机制&#xff0c;用于实现网络…

AQS为什么用双向链表?

首先&#xff0c;在AQS中&#xff0c;等待队列是通过Node类来表示的&#xff0c;每个Node节点包含了等待线程的信息以及等待状态。下面是Node类的部分源码&#xff1a;static final class Node {// 等待状态volatile int waitStatus;// 前驱节点volatile Node prev;// 后继节点…

【AI绘图学习笔记】深度学习相关数学原理总结(持续更新)

如题&#xff0c;这是一篇深度学习相关数学原理总结文&#xff0c;由于深度学习中涉及到较多的概率论知识&#xff08;包括随机过程&#xff0c;信息论&#xff0c;概率与统计啥啥啥的)&#xff0c;而笔者概率知识储备属实不行&#xff0c;因此特意开一章来总结&#xff08;大部…

Jackson CVE-2017-17485 反序列化漏洞

0x00 前言 同CVE-2017-15095一样&#xff0c;是CVE-2017-7525黑名单绕过的漏洞&#xff0c;主要还是看一下绕过的调用链利用方式。 可以先看&#xff1a; Jackson 反序列化漏洞原理 或者直接看总结也可以&#xff1a; Jackson总结 涉及版本&#xff1a;2.8.10和2.9.x至2.…