DataX详解和架构介绍

news2025/1/12 7:53:02

系列文章目录

一、 DataX详解和架构介绍
二、 DataX源码分析 JobContainer
三、DataX源码分析 TaskGroupContainer
四、DataX源码分析 TaskExecutor
五、DataX源码分析 reader
六、DataX源码分析 writer
七、DataX源码分析 Channel


文章目录

  • 系列文章目录
  • DataX是什么?
  • DataX支持的数据源
  • DataX的框架设计
  • DataX核心架构
      • 核心模块介绍:
      • DataX调度流程:
  • DataX部署和配置


DataX是什么?

DataX是阿里开源的异构数据源离线同步工具。它致力于实现包括关系型数据库(如MySQL、Oracle等)、HDFS、Hive、MaxCompute(原ODPS)、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
在这里插入图片描述

DataX的设计理念是将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源时,只需要将此数据源对接到DataX,便能与已有的数据源实现无缝数据同步。

DataX的架构主要基于Framework + Plugin的设计模式。它将数据读取和写入抽象成为Reader和Writer插件,这些插件可以接入不同的数据源,实现数据的读取和写入操作。同时,DataX提供了丰富的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入。

DataX的核心优势包括稳定性、高效性、易用性和扩展性。它经过长时间大规模生产环境的验证,能够保证数据同步的稳定性和可靠性;通过多线程、多进程、流式处理等技术手段,实现高效的数据同步;提供简单易用的配置方式,用户可以通过配置文件来定义数据源、目标端、同步策略等;支持丰富的插件体系,可以方便地扩展新的数据源和目标端。

此外,DataX还提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制作业速度,让作业在库可以承受的范围内达到最佳的同步速度。同时,它还具有强劲的同步性能、健壮的容错机制以及极简的使用体验等特点。

总之,DataX是一个强大而灵活的数据同步工具,能够有效地解决异构数据源之间的数据同步问题。通过合理的配置和优化,它可以帮助用户实现高效、稳定、可靠的数据同步操作。


DataX支持的数据源

DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入 。DataX Framework提供了简单的接口与插件交互,提供简单的插件接入机制,只需要任意加上一种插件,就能无缝对接其他数据源。

DataX的框架设计

datax_framework_new
DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

DataX核心架构

DataX 3.0采用微内核架构模式, 开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。
datax_arch

核心模块介绍:

  1. DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  2. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  3. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
  5. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0。

DataX调度流程:

DataX的调度流程可以分为以下几个步骤:

  • Job切分:首先,DataX的Job模块会根据分库分表策略将Job切分成若干个小的Task。这是为了确保每个Task可以独立执行,并且可以并发执行以提高效率。
  • 并发数与TaskGroup计算:然后,根据用户配置的并发数,DataX会计算需要分配多少个TaskGroup。计算的方式是将总的Task数量除以每个TaskGroup中的Task数量(通常为5),从而得到TaskGroup的数量。
  • TaskGroup分配与启动:接下来,DataX会根据计算出的TaskGroup数量,将Task分配到各个TaskGroup中。每个TaskGroup会启动多个TaskExecutor来执行具体的Task。
  • TaskExecutor启动:当TaskGroup启动后,其中的TaskExecutor会启动ReaderThread和WriterThread。ReaderThread负责从数据源读取数据,WriterThread负责将数据写入目标端。这两个线程协同工作,实现了数据的读取、转换和写入过程。
  • 数据同步:在每个TaskExecutor中,ReaderThread和WriterThread会不断地从数据源读取数据,并将数据写入目标端,直到所有的数据都同步完成。
    整个调度流程依赖于Java底层线程池进行并发控制,DataX通过合理的调度策略和线程管理机制,实现了高效、稳定、可靠的数据同步。

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:

  1. DataXJob根据分库分表切分成了100个Task。
  2. 根据20个并发,DataX计算共需要分配4个TaskGroup。
  3. 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

DataX部署和配置

  • 工具部署

    • 方法一、直接下载DataX工具包:DataX下载地址

      下载后解压至本地某个目录,进入bin目录,即可运行同步作业:

      $ cd  {YOUR_DATAX_HOME}/bin
      $ python datax.py {YOUR_JOB.json}
      

      自检脚本:
      python {YOUR_DATAX_HOME}/bin/datax.py {YOUR_DATAX_HOME}/job/job.json

    • 方法二、下载DataX源码,自己编译:DataX源码

      (1)、下载DataX源码:

      $ git clone git@github.com:alibaba/DataX.git
      

      (2)、通过maven打包:

      $ cd  {DataX_source_code_home}
      $ mvn -U clean package assembly:assembly -Dmaven.test.skip=true
      

      打包成功,日志显示如下:

      [INFO] BUILD SUCCESS
      [INFO] -----------------------------------------------------------------
      [INFO] Total time: 08:12 min
      [INFO] Finished at: 2015-12-13T16:26:48+08:00
      [INFO] Final Memory: 133M/960M
      [INFO] -----------------------------------------------------------------
      

      打包成功后的DataX包位于 {DataX_source_code_home}/target/datax/datax/ ,结构如下:

      $ cd  {DataX_source_code_home}
      $ ls ./target/datax/datax/
      bin		conf		job		lib		log		log_perf	plugin		script		tmp
      
  • 配置示例:从stream读取数据并打印到控制台

    • 第一步、创建作业的配置文件(json格式)

      可以通过命令查看配置模板: python datax.py -r {YOUR_READER} -w {YOUR_WRITER}

      $ cd  {YOUR_DATAX_HOME}/bin
      $  python datax.py -r streamreader -w streamwriter
      DataX (UNKNOWN_DATAX_VERSION), From Alibaba !
      Copyright (C) 2010-2015, Alibaba Group. All Rights Reserved.
      Please refer to the streamreader document:
          https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md 
      
      Please refer to the streamwriter document:
           https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md 
       
      Please save the following configuration as a json file and  use
           python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
      to run the job.
      
      {
          "job": {
              "content": [
                  {
                      "reader": {
                          "name": "streamreader", 
                          "parameter": {
                              "column": [], 
                              "sliceRecordCount": ""
                          }
                      }, 
                      "writer": {
                          "name": "streamwriter", 
                          "parameter": {
                              "encoding": "", 
                              "print": true
                          }
                      }
                  }
              ], 
              "setting": {
                  "speed": {
                      "channel": ""
                  }
              }
          }
      }
      

      根据模板配置json如下:

      #stream2stream.json
      {
        "job": {
          "content": [
            {
              "reader": {
                "name": "streamreader",
                "parameter": {
                  "sliceRecordCount": 10,
                  "column": [
                    {
                      "type": "long",
                      "value": "10"
                    },
                    {
                      "type": "string",
                      "value": "hello,你好,世界-DataX"
                    }
                  ]
                }
              },
              "writer": {
                "name": "streamwriter",
                "parameter": {
                  "encoding": "UTF-8",
                  "print": true
                }
              }
            }
          ],
          "setting": {
            "speed": {
              "channel": 5
             }
          }
        }
      }
      
    • 第二步:启动DataX

      $ cd {YOUR_DATAX_DIR_BIN}
      $ python datax.py ./stream2stream.json 
      

      同步结束,显示日志如下:

      ...
      2023-12-17 11:20:25.263 [job-0] INFO  JobContainer - 
      任务启动时刻                    : 2023-12-17 11:20:15
      任务结束时刻                    : 2023-12-17 11:20:25
      任务总计耗时                    :                 10s
      任务平均流量                    :              205B/s
      记录写入速度                    :              5rec/s
      读出记录总数                    :                  50
      读写失败总数                    :                   0
      

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1436544.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

spring boot(2.4.x之前版本)和spring cloud项目中配置文件的作用

为了防止理解问题&#xff0c;pom.xml 版本依赖如下 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.12.RELEASE</version><relativePath/> <!--…

【开源】SpringBoot框架开发城市桥梁道路管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块三、系统展示四、核心代码4.1 查询城市桥梁4.2 新增城市桥梁4.3 编辑城市桥梁4.4 删除城市桥梁4.5 查询单个城市桥梁 五、免责说明 一、摘要 1.1 项目介绍 基于VueSpringBootMySQL的城市桥梁道路管理系统&#xff0c;支持…

结构体的特殊声明和自引用

结构体的特殊声明 结构体我们通常是这么声明的&#xff1a; struct Student { char name[20]; int age; int num; }; 或者 struct Student { char name[20]; int age; int num; }s1, s2; 这是我们正常的声明方式&#xff0c; 也是正规的声明方式&#xff0c; 第二…

第三百一十一回

文章目录 1. 概念介绍2. 使用方法3. 示例代码4. 内容总结 我们在上一章回中介绍了"如何在输入框中提示错误"相关的内容&#xff0c;本章回中将介绍如何在输入框中处理光标.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1. 概念介绍 我们在使用TextField组件作为…

【亿级数据专题】「高并发架构」盘点本年度探索对外服务的百万请求量的高可靠消息服务设计实现

盘点本年度探索对外服务的百万请求量的高可靠消息服务设计实现 前提回顾消息服务逻辑架构运作流程消息路由系统数据存储系统BitCask结构异地存储容灾 推送系统数据消费模式推、拉模式的切换 实现低延时推送快速确认消息三层存储结构HeapMemoryDirectMemory 总结和展望 前提回顾…

【Mybatis】从0学习Mybatis(2)

前言 本篇文章是从0学习Mybatis的第一篇文章&#xff0c;由于篇幅太长CSDN会限流&#xff0c;因此我打算分开两期来写&#xff0c;这是第二期&#xff01;第一期在这儿&#xff1a;【Mybatis】从0学习Mybatis&#xff08;1&#xff09;-CSDN博客 1.什么是ResultMap结果映射&am…

力扣热门100题 - 5.最长回文子串

力扣热门100题 - 5.最长回文子串 题目描述&#xff1a;示例&#xff1a;提示&#xff1a;解题思路&#xff1a;&#xff08;动态规划&#xff09;代码&#xff1a; 题目链接&#xff1a;5. 最长回文子串 题目描述&#xff1a; 给你一个字符串 s&#xff0c;找到 s 中最长的回…

Python爬虫requests库详解

使用 requests 上一节中&#xff0c;我们了解了 urllib 的基本用法&#xff0c;但是其中确实有不方便的地方&#xff0c;比如处理网页验证和 Cookies 时&#xff0c;需要写 Opener 和 Handler 来处理。为了更加方便地实现这些操作&#xff0c;就有了更为强大的库 requests&…

「HarmonyOS」CustomDialogController自定义弹窗使用方法

需求背景&#xff1a; 在开发的过程中&#xff0c;总会遇到一些功能需要使用到弹窗进行信息的输入和修改&#xff0c;如用户个人信息的修改&#xff1b;在UI设计上每个App通常都会有各自的样式&#xff0c;而不是使用系统的标准样式&#xff0c;所以通常我们需要进行自定义弹窗…

TryHackMe-Net Sec Challenge练习

本文相关的TryHackMe实验房间链接&#xff1a;TryHackMe | Why Subscribe nmap nmap -T5 -p- 10.10.90.32 -T5 扫描速度 -p- 全端口扫描 答题&#xff1a; 这题叫我们找藏在http服务下的flag&#xff0c;根据上面扫出来的端口&#xff0c;所以我们开始搞80 这里简单介绍一下…

2.6 假期作业

分布编译 -ESc iso 1.预处理:头文件展开&#xff0c;宏替换&#xff0c;删除注释&#xff0c;不会查找语法错误 例&#xff1a;gcc -E 1.c -o 1.i 2.编译&#xff1a;生成汇编文件&#xff0c;会查找语法错误 例&#xff1a;gcc -S 1.i -o 1.s 3.汇编&#xff1a;生成二…

亚马逊认证考试系列 - 知识点 - LightSail介绍

一、引言 在当今云计算的时代&#xff0c;亚马逊网络服务&#xff08;AWS&#xff09;已成为业界领先的云服务提供商。其中&#xff0c;LightSail服务是AWS为简化云计算的入门和使用而推出的一项服务。它特别适合那些想要快速搭建网站、开发环境或小型应用的用户。通过LightSa…

代码随想录算法训练营第二八天 | 分割 子集

目录 复原IP地址子集子集 II LeetCode 93.复原IP地址 LeetCode 78.子集 LeetCode 90.子集II 复原IP地址 一些字符串的基本操作不会 s.insert(i 1, ‘.’); s.deleteCharAt(i 1); class Solution {List<String> result new ArrayList<>();public List<St…

EMC测试介绍

EMC测试介绍 EMC包括电磁干扰(EMI) 和抗电磁干扰(EMS)两个部分。发射干扰传导发射测试极限线以峰值检坡器测量时使用的决策树应用EN55022标准的波形示例测试仪器![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/4580f693ae9a4f84891ece29681c7bf2.png) 辐射发射测试…

【Iceberg学习二】Branch和Tag在Iceberg中的应用

Iceberg 表元数据保持一个快照日志&#xff0c;记录了对表所做的更改。快照在 Iceberg 中至关重要&#xff0c;因为它们是读者隔离和时间旅行查询的基础。为了控制元数据大小和存储成本&#xff0c;Iceberg 提供了快照生命周期管理程序&#xff0c;如 expire_snapshots&#xf…

c语言--指针的传值调用和传址调用

目录 一、前言二、传值调用。三、传址调用四、总结 一、前言 学习指针的目的是使用指针解决问题&#xff0c;那什么问题&#xff0c;非指针不可呢&#xff1f; 二、传值调用。 写个函数&#xff0c;交换两个整数的内容。 #include<stdio.h> void Swap1(int x, int y)…

【halcon】write_image 图片保存

前言 write_image 是一个可以用来保存图片的算子&#xff0c;可以将Image对象保存成各种格式的图片。还可以对图片进行压缩。 正文 参数&#xff1a; Image&#xff08;输入对象&#xff09;&#xff1a; 输入图像或输入图像的数组。支持的像素类型包括byte、direction、cy…

opencv c++ (6):直方图

1. 绘制直方图 api不在做详细介绍&#xff0c;具体看以下代码例子 #include <iostream> #include<opencv.hpp> #include<opencv2\highgui\highgui.hpp>using namespace std; using namespace cv;int main() {Mat src imread("src.jpg");if (src…

Qt 常见容器类用法(二)

目录 QList类 QLinkedList类 QList类 对于不同的数据类型&#xff0c;QList<T>采取不同的存储策略&#xff0c;存储策略如下&#xff1a; 如果T是一个指针类型或指针大小的基本数据类型(该基本类型占有的字节数和指针类型占有的字节数相同)&#xff0c;QList<T>…

python 爬虫安装http请求库

我的是window环境&#xff0c;安装的python3&#xff0c;如果再linux环境&#xff1a;pip install requests 开始&#xff1a; 上面我们成功发送请求并获取到响应&#xff0c;现在需要解析html或xml获取数据&#xff0c;因此我使用现成的工具库Beautiful Soup