DataX 概述、部署、数据同步运用示例

news2025/1/14 0:18:26

文章目录

      • 什么是 DataX?
      • DataX 设计框架
      • DataX 核心架构
      • DataX 部署
      • DataX 数据同步(MySQL —> HDFS)

什么是 DataX?

DataX 是阿里巴巴集团开源的、通用的数据抽取工具,广泛使用的离线数据同步工具/平台。它设计用于支持多种数据源之间的高效数据传输,可以实现不同数据源之间的数据同步、迁移、ETL(抽取、转换、加载)等数据操作。

主要特点和功能包括:

  1. 多数据源支持:DataX支持多种数据源,包括关系型数据库(如 MySQL、Oracle、SQL Server)、NoSQL 数据库(如 MongoDB、HBase、Redis)、HDFS、FTP、Hive 等。

  2. 扩展性:DataX具有良好的扩展性,可以通过编写插件来支持新的数据源或者数据目的地,以满足不同数据存储系统的需求。

  3. 灵活配置:通过配置文件,用户可以灵活地定义数据抽取任务,包括数据源、目的地、字段映射、转换规则等,以适应不同的数据处理需求。

  4. 高效传输:DataX 采用分布式、并行的数据传输策略,可以将数据以高效、快速的方式传输到目标数据源,提高数据处理的效率。

  5. 任务调度和监控:DataX 提供了任务调度和监控的功能,可以通过控制台查看任务运行情况,监控任务的健康状态,以及处理异常情况。

  6. 开源社区支持:DataX 是开源项目,拥有活跃的开源社区支持,可以获取到丰富的文档、示例和开发者社区的帮助。

在这里插入图片描述

DataX 设计框架

DataX 本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的 Reader 插件,以及向目标端写入数据的 Writer 插件,理论上 DataX 框架可以支持任意数据源类型的数据同步工作。同时 DataX 插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。

DataX 本身作为离线数据同步框架,采用 Framework + plugin 架构构建。将数据源读取和写入抽象成为 Reader/Writer 插件,纳入到整个同步框架中。

  • Reader:Reader 为数据采集模块,负责采集数据源的数据,将数据发送给Framework。

  • Writer: Writer 为数据写入模块,负责不断向 Framework 取数据,并将数据写入到目的端。

  • Framework:Framework 用于连接 readerwriter,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

DataX 核心架构

  1. DataX 完成单个数据同步的作业,我们称之为 Job,DataX 接受到一个 Job 之后,将启动一个进程来完成整个作业同步过程。DataX Job 模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。

  2. DataXJob 启动后,会根据不同的源端切分策略,将 Job 切分成多个小的 Task(子任务),以便于并发执行。Task 便是 DataX 作业的最小单元,每一个 Task 都会负责一部分数据的同步工作。

  3. 切分多个 Task 之后,DataX Job 会调用 Scheduler 模块,根据配置的并发数据量,将拆分成的 Task 重新组合,组装成 TaskGroup(任务组)。

  4. 每一个 TaskGroup 负责以一定的并发运行完毕分配好的所有 Task,默认单个任务组的并发数量为 5

  5. DataX 作业运行起来之后, Job 监控并等待多个 TaskGroup 模块任务完成,等待所有 TaskGroup 任务完成后 Job 成功退出。否则,异常退出,进程退出值非 0

DataX 部署

DataX 3.0 下载地址

1. 解压压缩包

tar -zxvf datax.tar.gz -C /opt/module/

2. 配置环境变量

sudo vim /etc/profile.d/my.sh

# 添加如下内容:
#DATAX_HOME
export DATAX_HOME=/opt/module/datax

刷新环境变量:source /etc/profile.d/my.sh

3. 运行测试程序

运行 DataX 自带的测试程序。

cd $DATAX_HOME

python bin/datax.py job/job.json

其中 datax.py 是执行的主程序,job.json 中存储的是数据源插件配置(其中有数据源的连接信息、存储信息、配置信息等)。

正常运行完成后,会看到输出相关日志内容:

在这里插入图片描述

DataX 无需进行其它配置,解压后即可快速使用。

DataX 数据同步(MySQL —> HDFS)

1.创建 MySQL 测试库表

CREATE DATABASE `test_datax` CHARACTER SET 'utf8mb4';
USE `test_datax`;

-- 商品属性表
DROP TABLE IF EXISTS sku_info;
CREATE TABLE sku_info(
    `sku_id`      varchar(100) COMMENT "商品id",
    `name`        varchar(200) COMMENT "商品名称",
    `category_id` varchar(100) COMMENT "所属分类id",
    `from_date`   varchar(100) COMMENT "上架日期",
    `price`       decimal(10,2) COMMENT "商品单价"
) COMMENT "商品属性表";

insert into sku_info
values ('1', 'xiaomi 10', '1', '2020-01-01', 2000),
       ('2', '手机壳', '1', '2020-02-01', 10),
       ('3', 'apple 12', '1', '2020-03-01', 5000),
       ('4', 'xiaomi 13', '1', '2020-04-01', 6000),
       ('5', '破壁机', '2', '2020-01-01', 500),
       ('6', '洗碗机', '2', '2020-02-01', 2000),
       ('7', '热水壶', '2', '2020-03-01', 100),
       ('8', '微波炉', '2', '2020-04-01', 600),
       ('9', '自行车', '3', '2020-01-01', 1000),
       ('10', '帐篷', '3', '2020-02-01', 100),
       ('11', '烧烤架', '3', '2020-02-01', 50),
       ('12', '遮阳伞', '3', '2020-03-01', 20);

2.配置 DataX 插件

我们进入 DataX 官网,下滑找到对应组件插件的文档。

我们这里是从 MySQL 中读数据,然后存储到 HDFS 上,所以我们这里就先去查找 MySQL Reader 插件文档。

然后就会找到一个 MySQL Reader 的插件配置样例,如下所示:

{
    "job": {
        "setting": {
         	// 指定通道数量(并发)
            "speed": {
                 "channel": 3
            },
            // 允许的误差范围
            "errorLimit": { 
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
            	// 读取配置
                "reader": {
                	// 固定写法——mysqlreader
                    "name": "mysqlreader",
                    "parameter": {
                        // 账号密码
                        "username": "root",
                        "password": "root",
                        // 选取需要进行同步的字段
                        "column": [
                            "id",
                            "name"
                        ],
                        // 通过数据切片进行数据同步
                        "splitPk": "db_id",
                        
                        // 指定库表连接信息
                        "connection": [
                            {
                                "table": [
                                    "table"
                                ],
                                "jdbcUrl": [
     "jdbc:mysql://127.0.0.1:3306/database"
                                ]
                            }
                        ]
                    }
                },
                // 写入配置
               "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print":true
                    }
                }
            }
        ]
    }
}

了解 MySQL Reader 插件配置后,我们现在来学习 HDFS Writer 插件配置如何编写。

HDFS Writer 插件配置样例如下所示:

{
    "setting": {},
    "job": {
         // 指定通道数量(并发)
        "setting": {
            "speed": {
                "channel": 2
            }
        },
        "content": [
            {
                // 读取配置
                "reader": {
                    // 固定写法——txtfilereader
                    "name": "txtfilereader",
                    // 指定读取路径、编码、字段
                    "parameter": {
                        "path": ["/Users/shf/workplace/txtWorkplace/job/dataorcfull.txt"],
                        "encoding": "UTF-8",
                        "column": [
                            {
                                "index": 0,
                                "type": "long"
                            },
                            {
                                "index": 1,
                                "type": "long"
                            },
                            {
                                "index": 2,
                                "type": "long"
                            },
                            {
                                "index": 3,
                                "type": "long"
                            },
                            {
                                "index": 4,
                                "type": "DOUBLE"
                            },
                            {
                                "index": 5,
                                "type": "DOUBLE"
                            },
                            {
                                "index": 6,
                                "type": "STRING"
                            },
                            {
                                "index": 7,
                                "type": "STRING"
                            },
                            {
                                "index": 8,
                                "type": "STRING"
                            },
                            {
                                "index": 9,
                                "type": "BOOLEAN"
                            },
                            {
                                "index": 10,
                                "type": "date"
                            },
                            {
                                "index": 11,
                                "type": "date"
                            }
                        ],
                        "fieldDelimiter": "\t"
                    }
                },
                // 写入配置
                "writer": {
                	// 固定写法——hdfswriter
                    "name": "hdfswriter",
                    // 指定存储路径、格式、文件前缀名、字段
                    "parameter": {
                        "defaultFS": "hdfs://xxx:port",
                        "fileType": "orc",
                        "path": "/user/hive/warehouse/writerorc.db/orcfull",
                        "fileName": "xxxx",
                        "column": [
                            {
                                "name": "col1",
                                "type": "TINYINT"
                            },
                            {
                                "name": "col2",
                                "type": "SMALLINT"
                            },
                            {
                                "name": "col3",
                                "type": "INT"
                            },
                            {
                                "name": "col4",
                                "type": "BIGINT"
                            },
                            {
                                "name": "col5",
                                "type": "FLOAT"
                            },
                            {
                                "name": "col6",
                                "type": "DOUBLE"
                            },
                            {
                                "name": "col7",
                                "type": "STRING"
                            },
                            {
                                "name": "col8",
                                "type": "VARCHAR"
                            },
                            {
                                "name": "col9",
                                "type": "CHAR"
                            },
                            {
                                "name": "col10",
                                "type": "BOOLEAN"
                            },
                            {
                                "name": "col11",
                                "type": "date"
                            },
                            {
                                "name": "col12",
                                "type": "TIMESTAMP"
                            }
                        ],
                        // 指定存储模式
                        "writeMode": "append",
                        // 指定存储间隔符
                        "fieldDelimiter": "\t",
                        // 指定数据压缩模式
                        "compress":"NONE"
                    }
                }
            }
        ]
    }
}

3. 编写 DataX 同步 MySQL 数据到 HDFS 插件配置

cd $DATAX_HOME/job

vim test.json

添加下列内容:

{
    "job": {
        "setting": {
            "speed": {
                 "channel": 1
            },
            "errorLimit": { 
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "000000",
                        "column": [
                            "name",
                            "sku_id",
                            "category_id",
                            "from_date",
                            "price"
                        ],
                        "splitPk": "",
                        "connection": [
                            {
                                "table": [
                                    "sku_info"
                                ],
                                "jdbcUrl": [
     "jdbc:mysql://127.0.0.1:3306/test_datax?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowPublicKeyRetrieval=true"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://hadoop120:8020",
                        "fileType": "text",
                        "path": "/testInputPath",
                        "fileName": "sku_info",
                        "column": [
                            {
                                "name": "name",
                                "type": "STRING"
                            },
                            {
                                "name": "sku_id",
                                "type": "STRING"
                            },
                            {
                                "name": "category_id",
                                "type": "STRING"
                            },
                            {
                                "name": "from_date",
                                "type": "STRING"
                            },
                            {
                                "name": "price",
                                "type": "DOUBLE"
                            }
                        ],
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress":"gzip"
                    }
                }
            }
        ]
    }
}

上面的插件配置信息表明,将 MySQL 中 test_datax.sku_info 表的全量数据同步到 HDFS 中的 /testInputPath 路径下,如果指定输出文件类型为 text,那么必须指定压缩模式为 gzipbzip2

启动 Hadoop 集群,提前创建 HDFS 中的存储路径

hadoop fs -mkdir /testInputPath

运行 DataX,进行数据同步

cd $DATAX_HOME

python bin/datax.py job/test.json

执行完成后如下所示:

在这里插入图片描述

在 HDFS 上查看存储路径下的文件:

在这里插入图片描述

如果想要直接查看 HDFS 文件中存储的内容,可以运行下列命令:

hadoop fs -cat /testInputPath/* | zcat

在这里插入图片描述

其它数据源数据同步方式也基本上差不多,多看看官方插件配置文档就会了。

更多内容请查看:DataX 官网

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1013041.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据结构——查找(二叉排序树)

文章目录 前言一、二叉排序树构造二叉排序树步骤构造二叉排序树步骤图二叉排序树的查找二叉排序树查找递归算法二叉排序树查找非递归算法 二叉排序树的插入二叉排序树插入结点——递归算法二叉排序树插入结点——非递归算法 二叉排序树的删除 总结 前言 二叉排序树查找定义 二…

Qt的ui文件不能简单复制

在使用vsQt开发时,直接复制另外一个widget类的ui文件,简单改名成当前类对应的ui文件,会导致编译出错。尽可能使用添加的Qt class自带的ui文件,因为ui文件的配置文件中有许多与当前类相关的字符串,简单复制容易报错。

一年一度的中秋节马上又要到了,给你的浏览器也来点氛围感吧

说在前面 一年一度的中秋节马上又要到了,给你的浏览器也来点氛围感吧 🌕🌕🌕 插件设计 效果 首先我们应该要先确定一下我们想要实现的效果是怎样的,如上图,我们希望在页面上鼠标点击的时候会在点击区域随…

【送书活动】用“价值”的视角来看安全:《构建新型网络形态下的网络空间安全体系》

文章目录 每日一句正能量前言本书概况赠书活动目录 每日一句正能量 成功与失败,幸福与不幸,在各自心里的定义都不会相同。 前言 过去,安全从未如此复杂; 现在,安全从未如此重要; 未来,安全更需如…

Kasisto AI:金融对话人工智能

【产品介绍】​ 名称 Kasisto 成立时间​ Kasisto创立于2013年​。 具体描述 Kasisto 数字体验平台 KAI 为全渠道虚拟助理和聊天机器人提供支持,他们在移动应用程序、 网站、消息传递平台和支持语音的设备上精通银行业…

交换瓶子问题(暴力求解 + 图论解法)

交换瓶子问题 文章目录 交换瓶子问题前言题目描述暴力解法【能过】图论解法知识预备【交换环】 代码暴力做法和图论做法的对比总结 前言 知道题目用暴力算法是可以过的,注意数据范围是1~10000,卡在一个微妙的地方,不免让人想用暴力算法&…

PyTorch深度学习(一)【线性模型、梯度下降、随机梯度下降】

这个系列是实战(刘二大人讲的pytorch) 建议把代码copy下来放在编译器查看(因为很多备注在注释里面) 线性模型(Linear Model): import numpy as npimport matplotlib.pyplot as plt #绘图的包​x_data [1.0, 2.0, …

Cesium 地球网格构造

Cesium 地球网格构造 Cesium原理篇:3最长的一帧之地形(2:高度图) HeightmapTessellator 用于从高程图像创建网格。提供了一个函数 computeVertices,可以根据高程图像创建顶点数组。 该函数的参数包括高程图像、高度数据的结构、网格宽高、…

Gradle的简介、下载、安装、配置及使用流程

Gradle的简介、下载、安装、配置及使用流程 1.Gradle的简介 Gradle是一个基于Apache Ant和Apache Maven概念的项目自动化构建开源工具。它使用一种基于Groovy的特定领域语言(DSL)来声明项目设置,也增加了基于Kotlin语言的kotlin-based DSL,抛弃了基于X…

AI项目六:基于YOLOV5的CPU版本部署openvino

若该文为原创文章,转载请注明原文出处。 一、CPU版本DEMO测试 1、创建一个新的虚拟环境 conda create -n course_torch_openvino python3.8 2、激活环境 conda activate course_torch_openvino 3、安装pytorch cpu版本 pip install torch torchvision torchau…

vcruntime140_1.dll修复方法分享,教你安全靠谱的修复手段

在使用Windows操作系统的过程中,我们有时会遇到vcruntime140_1.dll文件丢失或损坏的情况。本文将详细介绍vcruntime140_1.dll的作用,以及多种解决方法和修复该文件时需要注意的问题,希望能帮助读者更好地处理这一问题。 一.vcruntime140_1.dl…

数据结构——【堆】

一、堆的相关概念 1.1、堆的概念 1、堆在逻辑上是一颗完全二叉树(类似于一颗满二叉树只缺了右下角)。 2、堆的实现利用的是数组,我们通常会利用动态数组来存放元素,这样可以快速拓容也不会很浪费空间,我们是将这颗完…

【Java】SpringData JPA快速上手,关联查询,JPQL语句书写

JPA框架 文章目录 JPA框架认识SpringData JPA使用JPA快速上手方法名称拼接自定义SQL关联查询JPQL自定义SQL语句 ​ 在我们之前编写的项目中,我们不难发现,实际上大部分的数据库交互操作,到最后都只会做一个事情,那就是把数据库中的…

电容 stm32

看到stm32电源部分都会和电容配套使用,所以对电容的作用产生了疑惑 电源 负电荷才能在导体内部自由移动,电池内部的化学能驱使着电源正电附近的电子移动向电源负极区域。 电容 将电容接上电池,电容的两端一段被抽走电子,一端蓄积…

【STL容器】vector

文章目录 前言vector1.1 vector的定义1.2 vector的迭代器1.3 vector的元素操作1.3.1 Member function1.3.2 capacity1.3.3 modify 1.4 vector的优缺点 前言 vector是STL的容器,它提供了动态数组的功能。 注:文章出现的代码并非STL库里的源码&#xff0c…

C++ PrimerPlus 复习 第三章 处理数据

第一章 命令编译链接文件 make文件 第二章 进入c 第三章 处理数据 文章目录 C变量的命名规则;C内置的整型——unsigned long、long、unsigned int、int、unsigned short、short、char、unsigned char、signed char和bool;如何知道自己计算机类型宽度获…

Jenkins Maven pom jar打包未拉取最新包解决办法,亲测可行

Jenkins Maven pom jar打包未拉取最新包解决办法,亲测可行 1. 发布新版的snapshots版本的jar包,默认Jenkins打包不拉取snapshots包2. 设置了snapshot拉取后,部分包还未更新,需要把包版本以snapshot结尾3. IDEA无法更新snapshots包…

超炫的开关效果

超炫的开关动画 代码如下 <!DOCTYPE html> <html> <head><meta charset"UTF-8"><title>Switch</title><style>/* 谁家好人 一个按钮写三百多行样式代码 &#x1f622;&#x1f622;*/*,*:after,*:before {box-sizing: bor…

代码随想录--栈与队列-用队列实现栈

使用队列实现栈的下列操作&#xff1a; push(x) -- 元素 x 入栈pop() -- 移除栈顶元素top() -- 获取栈顶元素empty() -- 返回栈是否为空 &#xff08;这里要强调是单向队列&#xff09; 用两个队列que1和que2实现队列的功能&#xff0c;que2其实完全就是一个备份的作用 impo…

产教融合 | 力软联合重庆科技学院开展低代码应用开发培训

近日&#xff0c;力软与重庆科技学院联合推出了为期两周的低代码应用开发培训课程&#xff0c;来自重庆科技学院相关专业的近百名师生参加了此次培训。 融合研学与实践&#xff0c;方能成为当代数字英才。本次培训全程采用线下模式&#xff0c;以“力软低代码平台”为软件开发…