datax工具介绍及简单使用

news2025/1/13 13:53:34

介绍

Datax是一个异构数据源离线同步工具,致力于实现包括关系数据库、HDFS、Hive、ODPS、Hbase等各种异构数据源之间稳定高效的数据同步功能

  • 设计理念

为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步;
DataX在阿里巴巴集团内被广泛使用,承担了所有大数据的离线同步业务,并已 持续稳定运行了7年之久。目前每天完成同步8w多道作业,每日传输数据量超过300TB;

  • 框架设计

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中,软件核心功能写入Framework 主体框架中。
在这里插入图片描述

主体框架为插件预留接口,如果后期需要什么新功能,可以再去开发插件实现,而主体框架无需改动。

  • 3.0核心架构

DataX 3.0 开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业 生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系
DataX作业生命周期的时序图
在这里插入图片描述

模块关系
DataX完成单个数据同步的作业,我们称之为Job,DataX接收到一个Job之后,将启 动一个进程来完成整个作业同步过程
DataX Job模块是单个作业的中枢管理节点, 承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理 等功能
DataX Job启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行
Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作;
切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)
每一个TaskGroup负责并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5(可配置) ;
每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader— >Channel — >Writer的线程来完成任务同步工作
datax作业运行起来之后,job监控并等待多个taskgroup模块任务完成,完成之后job成功退出;否则,异常退出,进程退出值非0。

  • 调度流程

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个 100张分表的mysql数据同步到odps里面;
调度决策思路:
1)DataXJob根据分库分表切分成了100个Task;
2)根据20个并发,DataX计算共需要分配4个TaskGroup;
3)4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运 行25个Task;
4)计算
100个Task,20个并发就是20个Channel;4(20 / 5 = 4) 个Taskgroup,每个Taskgroup内的25(100 / 4 = 25)个Task,能够同时运行的任务是20个Channel等于20个并发。

环境需求与安装

1)环境需求
Linux
JDK(1.8以上,推荐1.8)
Python(2或3都可以)

2)安装工具
下载后解压至本地某个目录,进入bin目录,即可运行同步作业
$ cd {YOUR_DATAX_HOME}/bin
$ python datax.py {YOUR_JOB.json}
自检脚本
$ python {YOUR_DATAX_HOME}/bin/datax.py {YOUR_DATAX_HOME}/job/job.json
如果报错
运行以下指令,再输入自检脚本指令
rm -rf {YOUR_DATAX_HOME}/plugin//._

具体就不详细介绍了。

入门案例

  • Stream -> Stream
    目的:打印输出10行 hello,世界
    步骤:
    1)打开Datax工具目录
    cd {YOUR_DATAX_HOME}

2)获取模板文件

python bin/datax.py -r streamreader -w streamwriter

3)修改

vim  job/stream2Stream.json

参数

column表示列

type表示该列的类型

value表示该列的值

column里面可以写多个列

sliceRecordCount:表示要打印多少次

encodding设置字符编码格式

print表示是否打印到控制台

setting

speed表示控制并发数

channel设置并发的数量

如果设置的print为true,则会打印slicRecordCount*channel次
如果是从mysql导入hdfs等其他操作,则会是真正代表并发数,而不是打印多少次

内容

{"job": {
"content": [{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{type:"long",value:"1024"},{
type:"string",value:"hello,世界"}],
"sliceRecordCount": "10"}},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "utf-8",
"print": true
}}}],
"setting": 
{"speed": {"channel": "2"}}}}

4)运行

python bin/datax.py  job/stream2Stream.json
  • MySQL -> MySQL
    目的:MySQL数据库的t_emp表数据导入到另一个MySQL数据库的表中

步骤:获得模板文件
python bin/datax.py -r mysqlreader -w mysqlwriter

修改如下
vim job/mysql2Mysql.json

内容

{"job": {
"content": [{
"reader": {
"name": "mysqlreader","parameter": {
"column": ["*"],"splitPk": "emp_id",
"connection": [{"jdbcUrl": [jdbc:mysql://localhost:3306/demo?
useUnicode=true&characterEncoding=utf8"],"table": ["t_emp"]}]
"username": "root","password": "123456",}},
"writer": {
"name": "mysqlwriter","parameter": {
"column": ["*"],"connection": [{
"jdbcUrl": "jdbc:mysql://localhost:3306/democlone? useUnicode=true&characterEncoding=utf8","table": ["t_emp"]}],
"preSql": ["truncate t_emp"],
"session": ["set session sql_mode='ANSI'"],
"username": "root","password": "123456",
"writeMode": "insert"}}}],
"setting": {"speed": {"channel": "2"}}}}
//mysql端提前将表建好
//参数
//session sql_mode 数据校验模式

运行

python bin/datax.py job/mysql2Mysql.json
  • MySQL -> HDFS

目的:MySQL数据库中的t_emp表导入到HDFS中的/datax/mysql2hdfs/
步骤:
获得模板文件
修改如下
内容

{"job": {"content": [{
"reader": {"name": "mysqlreader","parameter": {
"column": [
"emp_id",
"dept_id",
"emp_name",
"emp_gender",
"emp_in_time"],
"connection": [{"jdbcUrl":["jdbc:mysql://localhost:3306/demo"],
"table": ["t_emp"]}],
"password": "123456","username": "root",
"where": ""}},
"writer": {"name": "hdfswriter",
"parameter": {"column": [{
type:"int",name:"emp_id"},{
type:"int",name:"dept_id"},{
type:"string",name:"emp_name"},{
type:"string",name:"emp_gender"},{
type:"date",name:"emp_in_time"}],
"compress":"",
"defaultFS": "hdfs://hadoop102:9000",
"fieldDelimiter": "\t",
"fileName": "mysqlToHdfs",
"fileType": "orc",
"path": "/datax/mysql2hdfs",
"writeMode": "append"}}}],
"setting": {"speed": {"channel": "2"}}}}

运行

python bin/datax.py job/mysql2Hdfs.json
  • MySQL -> Hive
    目的:MySQL数据库中的t_emp表导入到Hive数据仓库的orc_emp表
    步骤:类似上面的步骤

  • Hive -> MySQL

增量导入

目的:解决全量同步数据量较大时,同步被中断的问题
使用 DataX 进行全量同步和增量同步的唯一区别
增量同步需要使用 where 进 行条件筛选
关键参数
使用where参数或者querySql参数进行增量导入
步骤
修改
vim job/mysql2HiveIncrement.json
内容

{"job": {"content": [{
"reader": {"name": "mysqlreader","parameter": {
"column": [
"emp_id",
"dept_id",
"emp_name",
"emp_gender",
"emp_in_time"],
"connection": [{"jdbcUrl":["jdbc:mysql://localhost:3306/demo"],
"querySql":["select * from t_emp_from_hive
where emp_id > ${id}"]}],
"password": "123456","username": "root"}},
"writer": {"name": "hivewriter","parameter": {
"column": [{
type:"int",name:"emp_id"},{
type:"string",name:"emp_name"},{
type:"string",name:"emp_gender"},{
type:"date",name:"emp_in_time"}],
"defaultFS": "hdfs://hadoop102:9000",
"fieldDelimiter": "\t",
"fileName": "mysqlToHiveTEmp",
"fileType": "orc",
"path": "/user/hive/warehouse/demo.db/orc_emp", "writeMode": "append"}}}],
"setting": {"speed": {"channel": "3"}}}}

运行
赋值格式
-p “-DpropertyName=propertyValue -D…”
命令

$ python bin/datax.py job/mysql2HiveIncrement.json -p "-Did=20 -D"
$ python bin/datax.py job/mysql2HiveIncrement.json -p "-DstartId=0 -DendId=3"

Java执行Datax脚本

项目配置
环境
Windows 10+
安装 JDK1.8+、开发工具
安装 Python2.7.5+
下载解压datax.tar.gz
Maven本地仓库配置
解压Datax工具压缩包,提取两个jar文件 -core -common
安装两个依赖到Maven本地库

创建Maven项目
pom.xml文件中添加依赖

	<dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-core</artifactId>
            <version>0.0.1</version>
        </dependency>
        <dependency>
        <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-common</artifactId>
            <version>0.0.1</version>
        </dependency>
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.13</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-io</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.60</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
public class JobMysql2Mysql {

static Logger log = LoggerFactory.getLogger(Stream2Stream.class);

public static void main(String[] args) {
int exitCode = 0;
String dataXPath = "D:/MyProgramFiles/BigData/datax/";  String jobFilePath = dataXPath + "job/mysql2Mysql.json"; System.setProperty("datax.home", dataXPath);
String[] dataXArgs = {
"-job",  jobFilePath,
"-mode", "standalone",
"-jobid", "-1"
};
try {
Engine.entry(dataXArgs);
} catch(Throwable throwable) {
exitCode = -1;
log.error("\n\n经DataX智能分析,该任务最可能的错误原因::\n"
+ ExceptionTracker.trace(throwable));
}
System.exit(exitCode);
}
}
MySQL 增量导入
//目的
//迭代的方式将MySQL数据表中数据导入到另一个MySQL数据库表中
//使用Java程序迭代并传递参数的形式,完成MySQL数据库之间的数据同步

public class JobMysql2MysqlIncrement {
static Logger log = LoggerFactory.getLogger(Stream2Stream.class);

public static void main(String[] args) {
int exitCode = 0;
String dataXPath = "D:/MyProgramFiles/BigData/datax/";
String jobFilePath = dataXPath + "job/mysql2MysqlIncrement.json"; System.setProperty("datax.home", dataXPath);
String[] dataXArgs = {
"-job",  jobFilePath,
"-mode", "standalone",
"-jobid", "-1"
};
try {
// 数据表中总行数,可以使用JDBC获取
int rowTotal = 5;
// 迭代增量的形式,分批次导入数据
for(int i = 0; i < rowTotal; i = i + 3) {
System.setProperty("offset", String.valueOf(i));
System.setProperty("rows", "3");
Engine.entry(dataXArgs);
System.out.println("------------Job End------------");
}
} catch(Throwable throwable) {
exitCode = -1;
log.error("\n\n经DataX智能分析,该任务最可能的错误原因::\n" + ExceptionTracker.trace(throwable));
}
System.exit(exitCode);
}
}

注意
配置文件中需要写查询语句limit限制导入的数据量,java脚本中的参数才能传入,最后才能正常分批次运行

配置文件

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": ["*"],
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://192.168.2.102:3306/goods_sources?useUnicode=true&characterEncoding=utf8"],
                                "querySql":["select * from s_order limit ${offset},${rows}"]
                            }
                        ],
                        "username": "root",
                        "password": "123456"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": ["*"],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.2.102:3306/goods_targets?useUnicode=true&characterEncoding=utf8",    
                         "table": ["t_order"]
                            }
                        ],
                        "password": "123456",
                        "session": ["set session sql_mode='ANSI'"],
                        "username": "root",
                        "writeMode": "insert"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "2"
            }
        }
    }
}

补充

	DataX和sqoop的比较
		
	对于脏数据的处理
		在大量数据的传输过程中,必定会由于各种原因导致很多数据传输报错(比如类型转换错误),这种数据DataX认为就是脏数据。DataX目前可以实现脏数据精确过滤、识别、采集、展示,提供多种的脏数据处理模式
		Job支持用户对于脏数据的自定义监控和告警,包括对脏数据最大记录数阈值(record值)或者脏数据占比阈值(percentage值),当Job传输过程出现的脏数据大于用户指定的数量/百分比,DataX Job报错退出
		图中的配置的意思是当脏数据大于10条,或者脏数据比例达到0.05%,任务就会报错
	健壮的容错机制
		DataX作业是极易受外部因素的干扰,网络闪断、数据源不稳定等因素很容易让同步到一半的作业报错停止。因此稳定性是DataX的基本要求,在DataX 3.0的设计中,重点完善了框架和插件的稳定性
	丰富的数据转换功能
		DataX作为一个服务于大数据的ETL(Extract-Transform-Load 的缩写,即数据抽取、转换、装载的过程)工具,除了提供数据快照搬迁功能之外,还提供了丰富数据转换的功能,让数据在传输过程中可以轻松完成数据脱敏,补全,过滤等数据转换功能,另外还提供了自动groovy函数,让用户自定义转换函数

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/992868.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java转网络安全渗透测试,挖漏洞真香啊

最近&#xff0c;一个做运维的朋友在学渗透测试。他说&#xff0c;他公司请别人做渗透测试的费用是 2w/人天&#xff0c;一共2周。2周 10w 的收入&#xff0c;好香~ 于是&#xff0c;我也对渗透测试产生了兴趣。开始了探索之路~ 什么是渗透测试 渗透测试这名字听起来有一种…

Java实现自动玩王铲铲的致富之路小程序游戏

文章目录 前言1.调用adb命令截屏并读取到电脑2.打开游戏&#xff0c;提前截几个图&#xff0c;准备好相应的按钮位置颜色3.根据图片路径和x,y坐标&#xff0c;读取图片相应位置的颜色值4.根据颜色值判断如何进行触摸操作5.程序效果分析5.存在的问题6.改进思路7.改进版本&#x…

vue3中通过ref获取子组件实例,取值为undefined

也就是Vue3如何通过 ref 获取子组件实例(子组件中的DOM结构、数据、及方法)&#xff0c;今天写index.vue(父组件&#xff09;时想获取子组件的数据和方法&#xff0c;通过给子组件绑定ref&#xff0c;打印子组件的数据为undefined&#xff1b;百度搜索常用方法为&#xff1a; …

小白参加红队,需要做好哪些准备?

在本文中&#xff0c;我们将为读者介绍要想加入红队&#xff0c;需要掌握哪些方面的技能。 CSDN大礼包&#xff1a;《黑客&网络安全入门&进阶学习资源包》免费分享 护网的定义是以国家组织组织事业单位、国企单位、名企单位等开展攻防两方的网络安全演习。进攻方一个…

【多线程】Thread 类 详解

Thread 类 详解 一. 创建线程1. 继承 Thread 类2. 实现 Runnable 接口3. 其他变形4. 多线程的优势-增加运行速度 二. Thread 类1. 构造方法2. 常见属性3. 启动线程-start()4. 中断线程-interrupt()5. 线程等待-join()6. 线程休眠-sleep()7. 获取当前线程引用 三. 线程的状态1. …

redis主从复制、哨兵、集群模式

redis群集有三种模式 redis群集有三种模式&#xff0c;分别是主从同步/复制、哨兵模式、Cluster&#xff0c;下面会讲解一下三种模式的工作方式&#xff0c;以及如何搭建cluster群集 ●主从复制&#xff1a;主从复制是高可用Redis的基础&#xff0c;哨兵和集群都是在主从复制…

Shell命令管理进程

Shell命令管理进程 列出进程 ps命令 top命令 管理后台进程 启动后台进程 查看后台进程 jobs和ps的区别 停止进程 Linux除了是一种多用户操作系统之外&#xff0c;还是一种多任务系统。多任务意味着可以同时运行多个程序。Linux 提供了相关的工具来列出运行中的进程,监视…

24、DAPlink仿真器-STM32F103C8T6

参考文章&#xff1a; A、https://oshwhub.com/nice0513/daplink-fang-zhen-qi B、https://oshwhub.com/Southerly/daplink-fang-zhen-qi-swd C、https://oshwhub.com/jixin002/stm32f103c8t6_cmsis-dap 串口烧录Hex文件 问题&#xff1a;不支持U盘拖拽&#xff0c;没有识别出U…

Java使用本地浏览器打开网页工具类分享

本文主要分享一个封装工具类&#xff0c;该工具类已实现查找本地可运行的浏览器打开网页。 package com;import java.lang.reflect.Method;/*** browse util** author Roc-xb*/ public class BrowseUtil {public static final String[] BROWSERS {"firefox", "…

UDP协议和报文格式,校验和,CRC的含义

&#x1f496;&#x1f496;&#x1f496;每日一看&#xff0c;学习动力 一、UDP协议及其报文格式 UDP&#xff1a;特点&#xff1a;无连接&#xff0c;不可靠传输 报头里面有啥呢&#xff1f; 那么首先我要先提问一下&#xff1f;2个字节&#xff0c;可以表示的数据范围有多大…

【Sentinel Go】新手指南、流量控制、熔断降级和并发隔离控制

随着微服务的流行&#xff0c;服务和服务之间的稳定性变得越来越重要。Sentinel 是面向分布式、多语言异构化服务架构的流量治理组件&#xff0c;主要以流量为切入点&#xff0c;从流量路由、流量控制、流量整形、熔断降级、系统自适应过载保护、热点流量防护等多个维度来帮助开…

C语言入门Day_17 循环的控制

目录 前言 1.break 2.continue 3.易错点 4.思维导图 前言 我们知道当循环判断的边界条件不成立以后&#xff0c;循环就结束了。除此以外&#xff0c;我们如果想要提前结束循环&#xff0c;或者在循环中跳过某一次循环代码的执行&#xff0c;应该怎么做呢&#xff1f; 假如…

数据治理-数据架构-企业数据架构

是什么 数据架构定义了对组织非常重要元素的标准术语和设计。企业数据架构的设计中包括业务数据描述&#xff0c;如数据的收集、存储、整合、移动和分布。 当数据在组织中通过源或者接口流动时&#xff0c;需要安全、集成、存储、记录、分类、共享的报表和分析&#xff0c;最终…

3D目标检测数据集 KITTI(标签格式解析、点云转图像、点云转BEV)

本文介绍在3D目标检测中&#xff0c;理解和使用KITTI 数据集&#xff0c;包括KITTI 的基本情况、下载数据集、标签格式解析、点云转图像、点云转BEV。 目录 1、KITTI数据集中3D框可视化的效果 2、先看个视频&#xff0c;了解KITTI 的基本情况 3、来到KITTI官网&#xff0c;下…

C++ 11:多线程相关问题

目录 一. 线程类thread 1.1 thread的一些接口函数 2.2 通过thread创建多线程 二. this_thread 三. 互斥锁与原子操作 3.1 多线程中的加锁与解锁 3.1.1 mutex类 3.1.2 lock_guard 类 3.3 原子性操作 四. 条件变量 4.1 线程互斥的缺陷 4.2 condition_variable 实现线程…

图片mask任务和自监督损失函数MAE、Beit、MarkFeature、DINO、DINOv2

MAE (Masked Autoencoders Are Scalable Vision Learners) 来自Masked Autoencoders Are Scalable Vision Learners&#xff0c;Our loss function computes the mean squared error (MSE) between the reconstructed and original images in the pixel space. 几个关键点&…

无涯教程-JavaScript - IMSUB函数

描述 IMSUB函数以x yi或x yj文本格式返回两个复数的差。减去复数时,实数和虚数系数分别相减,即从复数a bi中减去复数c di的方程为- (a bi)-(c in)(a-c)(b-d)我 语法 IMSUB (inumber1, inumber2)争论 Argument描述Required/OptionalInumber1The complex number from …

【C++】可变参数模板

2023年9月9日&#xff0c;周六下午 这个还是挺难学的&#xff0c;我学了好几天... 在这里我会举大量的示例程序&#xff0c;这样可以有一个更好的理解&#xff0c; 不定期更新。 目录 推荐文章&#xff1a; 示例程序一&#xff1a;拼接字符串 示例程序二&#xff1a;求整…

Python散点图

散点图 散点图是指在回归分析中&#xff0c;数据点在直角坐标系平面上的分布图&#xff0c;散点图表示因变量随自变量而变化的大致趋势&#xff0c;据此可以选择合适的函数对数据点进行拟合。用两组数据构成多个坐标点&#xff0c;考察坐标点的分布&#xff0c;判断两变量之间…

基于Java+SpringBoot+Vue前后端分离农产品直卖平台设计和实现

博主介绍&#xff1a;✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专…