什么是数据同步利器DataX,如何使用?

news2024/11/22 19:31:54

转载至我的博客 https://www.infrastack.cn ,公众号:架构成长指南

今天给大家分享一个阿里开源的数据同步工具DataX,在Github拥有14.8k的star,非常受欢迎,官网地址:https://github.com/alibaba/DataX

什么是 Datax?

DataX 是阿里云 DataWorks数据集成 的开源版本,使用Java 语言编写,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各种异构数据源之间高效的数据同步功能。

应用场景有那些?

  1. 数据仓库同步:DataX 可以帮助将数据从一个数据仓库(如关系型数据库、大数据存储系统等)同步到另一个数据仓库,实现数据的迁移、备份或复制。
  2. 数据库迁移:当我们需要将数据从一个数据库平台迁移到另一个数据库平台时,DataX 可以帮助完成数据的转移和转换工作
  3. 数据集成与同步:DataX 可以用作数据集成工具,用于将多个数据源的数据进行整合和同步。它支持多种数据源,包括关系型数据库、NoSQL 数据库、文件系统等,可以将这些数据源的数据整合到一个目标数据源中。
  4. 数据清洗与转换:DataX 提供了丰富的数据转换能力,可以对数据进行清洗、过滤、映射、格式转换等操作。这对于数据仓库、数据湖和数据集市等数据存储和分析平台非常有用,可以帮助提高数据质量和一致性。
  5. 数据备份与恢复:DataX 可以用于定期备份和恢复数据。通过配置定时任务,可以将数据从源端备份到目标端,并在需要时进行数据恢复。

DataX支持那些数据源?

架构设计

DataX作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

DataX 开源版本支持单机多线程模式完成同步作业运行,如下图

  1. DataX完成单个数据同步的作业,称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  2. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  3. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
  5. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

DataX调度流程

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张表的mysql数据同步到odps里面。 DataX的调度决策是:

  1. Job根据分表切分成了100个Task。
  2. 根据20个并发,DataX计算需要分配4个TaskGroup。
  3. 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责5个并发共计运行25个Task。

如何使用 Datax?

点击datax 下载,下载后解压至本地某个目录,如下图

image-20240203222845753

用例说明

这里为了方便演示,我们同步MySQL的user_info表至MySQL的ods_test_mysql_user_info_m,同步条件为更新时间字段,如下

在实际工作中你可以选择不同类型的数据源测试


drop table ods_test_mysql_user_info_m

CREATE TABLE `user_info` (
  `id` int NOT NULL COMMENT 'ID',
  `name` varchar(50) NOT NULL COMMENT '名称',
  `sex` tinyint NOT NULL COMMENT '性别 1男 2女',
  `phone` varchar(11) COMMENT '手机',
	`address` varchar(1000)  COMMENT '地址',
	`age` int  COMMENT '年龄',
	`create_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT '创建时间',
  `update_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6) COMMENT '修改时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='用户信息表';

CREATE TABLE `ods_test_mysql_user_info_m` (
  `id` int NOT NULL COMMENT 'ID',
  `name` varchar(50) NOT NULL COMMENT '名称',
  `sex` tinyint NOT NULL COMMENT '性别 1男 2女',
  `phone` varchar(11) COMMENT '手机',
	`address` varchar(1000)  COMMENT '地址',
	`age` int  COMMENT '年龄',
	`create_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT '创建时间',
  `update_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6) COMMENT '修改时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='用户信息数仓表';

在user_info表中插入数据如下

创建作业的配置文件(json格式)

在 datax 的 script 目录,创建ods_test_mysql_user_info_m.json文件,配置如下,mysqlreader表示读取端,mysqlwriter表示写入端

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": ["id","name","sex","phone","address","age","create_time","update_time"],
            		         "splitPk": "id",
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false"],
                                "table": ["user_info"]
                            }
                        ],
                        "password": "root",
                        "username": "root",
                        "where": "update_time > '${updateTime}' "
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                       "writeMode": "replace",
                        "column": ["id","name","sex","phone","address","age","create_time","update_time"],
                        "connection": [
                            {
                                "jdbcUrl":"jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false",
                                "table": ["ods_test_mysql_user_info_m"]
                            }
                        ],
                        "username": "root",
                        "password": "root",
                        "preSql": [],
                        "session": [
                          "set session sql_mode='ANSI'"
                        ]

                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "5"
            }
        }
    }
}

创建执行脚本

为了更贴合实际,写一个调度脚本sync.sh支持动态参数来执行任务

#!/bin/bash
## 执行示例 sh /Users/weizhao.dong/Documents/soft/datax/datax-script/call.sh /Users/weizhao.dong/Documents/soft/datax/datax-script/dwd_g2park_inout_report_s.json 1
jsonScript=$1
echo '执行脚本:'$jsonScript
interval=$2
echo "时间间隔(分钟):"$interval
now_time=$(date '+%Y-%m-%d %H:%M:%S')
echo "当前时间:"$now_time
update_time=$(date -v -${interval}M  '+%Y-%m-%d %H:%M:%S')
#linux 更新时间获取
#update_time=$(date -d "${now_time} $interval minute ago" +"%Y-%m-%d %H:%M:%S")
echo "更新时间:"$update_time
#执行
python3 /Users/weizhao.dong/Documents/soft/datax/bin/datax.py $jsonScript -p "-DupdateTime='${update_time}'"

假设我们要执以上ods_test_mysql_user_info_m.json脚本,并且同步十分钟之前的数据,如下

./sync.sh ods_test_mysql_user_info_m.json 10
测试

执行./sync.sh ods_test_mysql_user_info_m.json 10进行同步

以上结果可能有些人有疑问,就三条数据执行时间为 10s,其实这个 10s主要是初始化时间,耗时过长,同步的数据量多了优势就体现出来了,以下为实际生产同步数据结果,可以看到同步63102条耗时22s

推荐用法

以上我们只是通过一个简单的示例来演示了dataX如何使用,如果只是一次性同步,没问题,但是如果是周期性进行同步,有以下几种方式推荐

crontab调度

这种方式是最简单的,可以使用操作系统中的crontab定时调度,通过crontab -e编辑corn 任务,添加对应脚本即可

海豚调度器

在种方式在大数据领域用的比较多,典型场景就是 mysql 同步到数仓,海豚调度器内置了 datax 并且提供了图形化配置界面,配置起来非常方便

同时每次执行都有记录,并且都有对应的日志

定时任务框架(elasticjob/xxl-job)

在我们实际使用的业务系统定时调度框架都支持调度 shell 脚本,通过传入对应参数也可执行

扫描下面的二维码关注我们的微信公众帐号,在微信公众帐号中回复◉加群◉即可加入到我们的技术讨论群里面共同学习。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1452324.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

c++类和对象新手保姆级上手教学(上)

前言: c其实顾名思义就是c语言的升级版,很多刚学c的同学第一感觉就是比c语言难学很多,其实没错,c里的知识更加难以理解可以说杂且抽象,光是类和对象,看起来容易,但想完全吃透,真的挺…

(免费领源码)java#springboot#mysql医院自助服务系统74853-计算机毕业设计项目选题推荐

目 录 摘要 1 绪论 1.1研究意义 1.2研究背景 1.3springboot框架介绍 1.3论文结构与章节安排 2 医院自助服务系统系统分析 2.1 可行性分析 2.2 系统流程分析 2.2.1 数据流程 3.3.2 业务流程 2.3 系统功能分析 2.3.1 功能性分析 2.3.2 非功能性分析 2.4 系统用例分…

java+springboot+vue试题库在线学习系统05umj

技术路线: B/S架构,后端springboot框架,前端Vue.js框架。 主要功能模块(至少六大功能),参考任务书并拓展 (1)用户管理模块:规定不同角色的用户对系统中各个功能模块的使用…

【学网攻】 第(29)节 -- 综合实验二

系列文章目录 目录 系列文章目录 文章目录 前言 一、综合实验 二、实验 1.引入 实验目标 实验设备 实验拓扑图 实验配置 实验验证 文章目录 【学网攻】 第(1)节 -- 认识网络【学网攻】 第(2)节 -- 交换机认识及使用【学网攻】 第(3)节 -- 交换机配置聚合端口【学网攻…

【Make编译控制 06】CMake初步使用

目录 一、概述与安装 二、编译源文件 三、无关文件管理 一、概述与安装 CMake是一个跨平台的项目构建工具,相比于Makefile,CMake更加高级,因为CMake代码在执行的时候是会先翻译生成Makefile文件,再调用Makefile文件完成项目构…

【Python--网络编程之TCP三次握手】

🚀 作者 :“码上有前” 🚀 文章简介 :Python开发技术 🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬 Python网络编程之[TCP三次握手] 往期内容代码见资源,效果图如下一、实验要求二、协…

MySQL数据库基础(六):DDL数据库操作

文章目录 DDL数据库操作 一、MySQL的组成结构 二、数据库的基本操作 1、创建数据库 2、查询数据库 3、删除数据库 4、选择数据库 三、总结 DDL数据库操作 一、MySQL的组成结构 注:我们平常说的MySQL,其实主要指的是MySQL数据库管理软件。 一个M…

django定时任务(django-crontab)

目录 一:安装django-crontab: 二:添加django_crontab到你的INSTALLED_APPS设置: 三:运行crontab命令来创建或更新cron作业: 四:定义你的cron作业 五:创建你的管理命令&#xff…

电源管理芯片是指在电子设备系统中,负责对电能的变换、分配、检测等进行管理的芯片

萨科微半导体宋仕强介绍说,电源管理芯片是指在电子设备系统中,负责对电能的变换、分配、检测等进行管理的芯片,其性能和可靠性直接影响电子设备的工作效率和使用寿命,是电子设备中的关键器件。萨科微slkor(www.slkormi…

牛客网SQL进阶128:未完成试卷数大于1的有效用户

官网链接: 未完成试卷数大于1的有效用户_牛客题霸_牛客网现有试卷作答记录表exam_record(uid用户ID, exam_id试卷ID, st。题目来自【牛客题霸】https://www.nowcoder.com/practice/46cb7a33f7204f3ba7f6536d2fc04286?tpId240&tqId2183007&ru%2…

【王道数据结构】【chapter5树与二叉树】【P185t4】

编程求以孩子兄弟表示法存储的森林的叶节点数 #include <iostream> typedef struct node{char data;struct node * pchild;struct node * pbrother; }node,*pnode;pnode buynode(char x) {node* tmp(pnode) malloc(sizeof (node));tmp->datax,tmp->pchild nullptr,…

Crypto-RSA1

题目&#xff1a; 已知p,q,dp,dq,c求明文&#xff1a; 首先有如下公式&#xff1a; dp≡d mod (p-1)&#xff0c;dq≡d mod (q-1) &#xff0c; m≡c^d(mod n) &#xff0c; npq python代码解题如下&#xff1a; import libnump 863763376725700856709965348654109117132049…

人工智能学习与实训笔记(三):神经网络之目标检测问题

目录 五、目标检测问题 5.1 目标检测基础概念 5.1.1 边界框&#xff08;bounding box&#xff09; 5.1.2 锚框&#xff08;Anchor box&#xff09; 5.1.3 交并比 5.2 单阶段目标检测模型YOLOv3 5.2.1 YOLOv3模型设计思想 5.2.2 YOLOv3模型训练过程 5.2.3 如何建立输出…

【JS逆向+Python模拟API请求】逆向某一个略微中等的混淆网站,并模拟调用api请求 仅供学习。注:不是源代码混淆,而是一个做代码混淆业务的网站,

逆向日期&#xff1a;2024.02.16 使用工具&#xff1a;Node.js 加密方法&#xff1a;RSA标准库 文章全程已做去敏处理&#xff01;&#xff01;&#xff01; 【需要做的可联系我】 AES解密处理&#xff08;直接解密即可&#xff09;&#xff08;crypto-js.js 标准算法&#xf…

ubuntu22.04@laptop OpenCV Get Started: 012_mouse_and_trackbar

ubuntu22.04laptop OpenCV Get Started: 012_mouse_and_trackbar 1. 源由2. mouse/trackbar应用Demo2.1 C应用Demo2.2 Python应用Demo 3. 鼠标位置跟踪注释3.1 注册回调函数3.2 回调操作3.3 效果 4. 使用轨迹栏调整图像大小4.1 初始化轨迹栏&注册回调函数4.2 回调操作4.3 效…

GEE:如何在下载CSV文件时去除不想要的属性列

作者:CSDN @ _养乐多_ 如下图所示,我们在 Google Earth Engine(GEE)平台上处理完数据,并想以csv文件格式下载属性数据到本地时,GEE会自动添加一些属性,比如用来记录点的坐标的.geo属性。那么我们如何去除.geo列或者其他不想要的列呢?比如,去除下图中index、SR_B3、SR…

如何简单上手清华AutoGPT并搭建到本地环境

一、准备工作 安装Docker&#xff1a;确保你的本地机器上已经安装了Docker。如果还没有安装&#xff0c;请访问Docker官方网站并按照指引进行安装。--点击进入Docker官网 获取清华AutoGPT的Docker镜像&#xff1a;清华AutoGPT团队可能已经提供了一个Docker镜像&#xff0c;方便…

C++入门学习(二十九)goto语句

在C中&#xff0c;goto语句是一种控制流语句&#xff0c;用于无条件地转移到程序中指定的行。goto语句的使用通常是不推荐的&#xff0c;因为它可能导致代码结构变得混乱、不易理解和维护。然而&#xff0c;在某些特殊情况下&#xff0c;goto语句可能是一种有效的解决方法。 示…

2024.2.15 模拟实现 RabbitMQ —— 消息持久化

目录 引言 约定存储方式 消息序列化 重点理解 针对 MessageFileManager 单元测试 小结 引言 问题&#xff1a; 关于 Message&#xff08;消息&#xff09;为啥在硬盘上存储&#xff1f; 回答&#xff1a; 消息操作并不涉及到复杂的增删查改消息数量可能会非常多&#xff…

大数据02-数据仓库

零、文章目录 大数据02-数据仓库 1、数据仓库介绍 &#xff08;1&#xff09;基本概念 数据仓库&#xff0c;英文名称为Data Warehouse&#xff0c;可简写为DW或DWH。数据仓库的目的是构建面向分析的集成化数据环境&#xff0c;为企业提供决策支持&#xff08;Decision Sup…