DataX数据同步工具使用

news2025/1/11 18:40:08

1.DataX 简介

DataX 是阿里云 DataWorks 数据集成 的开源版本,主要就是用于实现数据间的离线同步。 DataX 致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等 各种异构数据源(即不同的数据库) 间稳定高效的数据同步功能。
在这里插入图片描述

图片为了 解决异构数据源同步问题,DataX 将复杂的网状同步链路变成了星型数据链路 ,DataX 作为中间传输载体负责连接各种数据源;

当需要接入一个新的数据源时,只需要将此数据源对接到 DataX,便能跟已有的数据源作为无缝数据同步。

2.DataX3.0 框架设计

DataX 采用 Framework + Plugin 架构,将数据源读取和写入抽象称为 Reader/Writer 插件,纳入到整个同步框架中。
在这里插入图片描述

角色作用
Reader(采集模块)负责采集数据源的数据,将数据发送给 Framework。
Writer(写入模块)负责不断向 Framework 中取数据,并将数据写入到目的端。
Framework(中间商)负责连接 Reader 和 Writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

3.DataX3.0 核心架构

DataX 完成单个数据同步的作业,我们称为 Job,DataX 接收到一个 Job 后,将启动一个进程来完成整个作业同步过程。DataX Job 模块是单个作业的中枢管理节点,承担了数据清理、子任务切分、TaskGroup 管理等功能。

在这里插入图片描述

  • DataX Job 启动后,会根据不同源端的切分策略,将 Job 切分成多个小的 Task (子任务),以便于并发执行。
  • 接着 DataX Job 会调用 Scheduler 模块,根据配置的并发数量,将拆分成的 Task 重新组合,组装成TaskGroup(任务组)
  • 每一个 Task 都由 TaskGroup 负责启动,Task 启动后,会固定启动 Reader --> Channel -->
    Writer 线程来完成任务同步工作。
  • DataX 作业运行启动后,Job 会对 TaskGroup 进行监控操作,等待所有 TaskGroup 完成后,Job
    便会成功退出(异常退出时 值非 0 )

DataX 调度过程:

1.首先 DataX Job 模块会根据分库分表切分成若干个 Task,然后根据用户配置并发数,来计算需要分配多少个 TaskGroup;
2.计算过程:Task / Channel = TaskGroup,最后由 TaskGroup 根据分配好的并发数来运行 Task(任务)

3.使用 DataX 实现数据同步

3.1准备工作

JDK(1.8 以上,推荐 1.8)
Python(2,3 版本都可以)
Apache Maven 3.x(Compile DataX)(手动打包使用,使用 tar 包方式不需要安装)

主机名操作系统IP 地址软件包
node1CentOS 7.4192.168.1.1jdk-8u181-linux-x64.tar.gz datax.tar.gz
node2CentOS 7.4192.168.1.2jdk-8u181-linux-x64.tar.gz datax.tar.gz

3.2安装 JDK

首先需要去https://www.oracle.com/java/technologies/javase/javase8-archive-downloads.html(需要创建 Oracle 账号)下载jdk

[root@node1 ~] ls
anaconda-ks.cfg  jdk-8u181-linux-x64.tar.gz
[root@node1 ~] tar zxf jdk-8u181-linux-x64.tar.gz 
[root@node1 ~] ls
anaconda-ks.cfg  jdk1.8.0_181  jdk-8u181-linux-x64.tar.gz
[root@node1 ~] mv jdk1.8.0_181 /usr/local/java
[root@node1 ~] cat <<END >> /etc/profile
export JAVA_HOME=/usr/local/java
export PATH=$PATH:"$JAVA_HOME/bin"
END
[root@node1 ~] source /etc/profile
[root@node1 ~] java -version
  • 因为 CentOS 7 上自带 Python 2.7 的软件包,所以不需要进行安装。

3.3Linux 上安装 DataX 软件

[root@node1 ~] wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
[root@node1 ~] tar zxf datax.tar.gz -C /usr/local/
[root@node1 ~] rm -rf /usr/local/datax/plugin/*/._*      # 需要删除隐藏文件 (重要)
  • 当未删除时,可能会输出:[/usr/local/datax/plugin/reader/._drdsreader/plugin.json]
    不存在. 请检查您的配置文件.

验证:

[root@node1  ~] cd /usr/local/datax/bin
[root@node1  ~] python datax.py ../job/job.json       # 用来验证是否安装成功

输出:

2021-12-13 19:26:28.828 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-12-13 19:26:28.829 [job-0] INFO  StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.060s |  All Task WaitReaderTime 0.068s | Percentage 100.00%
2021-12-13 19:26:28.829 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2021-12-13 19:26:18
任务结束时刻                    : 2021-12-13 19:26:28
任务总计耗时                    :                 10s
任务平均流量                    :          253.91KB/s
记录写入速度                    :          10000rec/s
读出记录总数                    :              100000
读写失败总数                    :                   0

3.4DataX 基本使用

查看 streamreader --> streamwriter 的模板:

[root@node1~] python /usr/local/datax/bin/datax.py -r streamreader -w streamwriter

输出:

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the streamreader document:
     https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md 

Please refer to the streamwriter document:
     https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md 
 
Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [], 
                        "sliceRecordCount": ""
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "", 
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

根据模板编写 json 文件

[root@node1 ~] cat <<END > test.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [        # 同步的列名 (* 表示所有)
       {
           "type":"string",
    "value":"Hello."
       },
       {
           "type":"string",
    "value":"河北彭于晏"
       },
   ], 
                        "sliceRecordCount": "3"     # 打印数量
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "utf-8",     # 编码
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "2"         # 并发 (即 sliceRecordCount * channel = 结果)
            }
        }
    }
}

输出:(要是复制我上面的话,需要把 # 带的内容去掉)
在这里插入图片描述

3.5安装 MySQL 数据库

分别在两台主机上安装:

[root@node1 ~] yum -y install mariadb mariadb-server mariadb-libs mariadb-devel   
[root@node1  ~] systemctl start mariadb            # 安装 MariaDB 数据库
[root@node1 ~] mysql_secure_installation            # 初始化 
NOTE: RUNNING ALL PARTS OF THIS SCRIPT IS RECOMMENDED FOR ALL MariaDB
      SERVERS IN PRODUCTION USE!  PLEASE READ EACH STEP CAREFULLY!

Enter current password for root (enter for none):       # 直接回车
OK, successfully used password, moving on...
Set root password? [Y/n] y                            # 配置 root 密码
New password: 
Re-enter new password: 
Password updated successfully!
Reloading privilege tables..
 ... Success!
Remove anonymous users? [Y/n] y                     # 移除匿名用户
 ... skipping.
Disallow root login remotely? [Y/n] n                # 允许 root 远程登录
 ... skipping.
Remove test database and access to it? [Y/n] y         # 移除测试数据库
 ... skipping.
Reload privilege tables now? [Y/n] y                    # 重新加载表
 ... Success!

1)准备同步数据(要同步的两台主机都要有这个表)

MariaDB [(none)]> create database `course-study`;
Query OK, 1 row affected (0.00 sec)

MariaDB [(none)]> create table `course-study`.t_member(ID int,Name varchar(20),Email varchar(30));
Query OK, 0 rows affected (0.00 sec)

在这里插入图片描述
因为是使用 DataX 程序进行同步的,所以需要在双方的数据库上开放权限:

grant all privileges on *.* to root@'%' identified by '123123';
flush privileges;

2)创建存储过程:

DELIMITER $$
CREATE PROCEDURE test()
BEGIN
declare A int default 1;
while (A < 3000000)do
insert into `course-study`.t_member values(A,concat("LiSa",A),concat("LiSa",A,"@163.com"));
set A = A + 1;
END while;
END $$
DELIMITER ;

在这里插入图片描述
3)调用存储过程(在数据源配置,验证同步使用):

call test();

3.6通过 DataX 实 MySQL 数据同步

1)生成 MySQL 到 MySQL 同步的模板:

[root@node1 ~]# python /usr/local/datax/bin/datax.py -r mysqlreader -w mysqlwriter
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",       # 读取端
                    "parameter": {
                        "column": [],         # 需要同步的列 (* 表示所有的列)
                        "connection": [
                            {
                                "jdbcUrl": [],       # 连接信息
                                "table": []       # 连接表
                            }
                        ], 
                        "password": "",        # 连接用户
                        "username": "",        # 连接密码
                        "where": ""         # 描述筛选条件
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter",       # 写入端
                    "parameter": {
                        "column": [],         # 需要同步的列
                        "connection": [
                            {
                                "jdbcUrl": "",       # 连接信息
                                "table": []       # 连接表
                            }
                        ], 
                        "password": "",        # 连接密码
                        "preSql": [],         # 同步前. 要做的事
                        "session": [], 
                        "username": "",        # 连接用户 
                        "writeMode": ""        # 操作类型
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""          # 指定并发数
            }
        }
    }
}

2)编写 json 文件:

[root@node1 ~]# vim install.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "username": "root",
                        "password": "123123",
                        "column": ["*"],
                        "splitPk": "ID",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.1.1:3306/course-study?useUnicode=true&characterEncoding=utf8"
                                ], 
                                "table": ["t_member"]
                            }
                        ]
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": ["*"], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.1.2:3306/course-study?useUnicode=true&characterEncoding=utf8",
                                "table": ["t_member"]
                            }
                        ], 
                        "password": "123123",
                        "preSql": [
                            "truncate t_member"
                        ], 
                        "session": [
                            "set session sql_mode='ANSI'"
                        ], 
                        "username": "root", 
                        "writeMode": "insert"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "5"
            }
        }
    }
}

3)验证

[root@node1 ~]# python /usr/local/datax/bin/datax.py install.json

输出:

2021-12-15 16:45:15.120 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-12-15 16:45:15.120 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2999999 records, 107666651 bytes | Speed 2.57MB/s, 74999 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 82.173s |  All Task WaitReaderTime 75.722s | Percentage 100.00%
2021-12-15 16:45:15.124 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2021-12-15 16:44:32
任务结束时刻                    : 2021-12-15 16:45:15
任务总计耗时                    :                 42s
任务平均流量                    :            2.57MB/s
记录写入速度                    :          74999rec/s
读出记录总数                    :             2999999
读写失败总数                    :                   0

你们可以在目的数据库进行查看,是否同步完成。
在这里插入图片描述

  • 上面的方式相当于是完全同步,但是当数据量较大时,同步的时候被中断,是件很痛苦的事情;
  • 所以在有些情况下,增量同步还是蛮重要的。

3.7 使用 DataX 进行增量同步

使用 DataX 进行全量同步和增量同步的唯一区别就是:增量同步需要使用 where 进行条件筛选。(即,同步筛选后的 SQL)

1)编写 json 文件:

[root@node1 ~]# vim where.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "username": "root",
                        "password": "123123",
                        "column": ["*"],
                        "splitPk": "ID",
                        "where": "ID <= 1888",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.1.1:3306/course-study?useUnicode=true&characterEncoding=utf8"
                                ], 
                                "table": ["t_member"]
                            }
                        ]
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": ["*"], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.1.2:3306/course-study?useUnicode=true&characterEncoding=utf8",
                                "table": ["t_member"]
                            }
                        ], 
                        "password": "123123",
                        "preSql": [
                            "truncate t_member"
                        ], 
                        "session": [
                            "set session sql_mode='ANSI'"
                        ], 
                        "username": "root", 
                        "writeMode": "insert"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "5"
            }
        }
    }
}
  • 需要注意的部分就是:where(条件筛选) 和 preSql(同步前,要做的事) 参数。

2)验证:

[root@node1 ~]# python /usr/local/data/bin/data.py where.json

输出:

2021-12-16 17:34:38.534 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-12-16 17:34:38.534 [job-0] INFO  StandAloneJobContainerCommunicator - Total 1888 records, 49543 bytes | Speed 1.61KB/s, 62 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.002s |  All Task WaitReaderTime 100.570s | Percentage 100.00%
2021-12-16 17:34:38.537 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2021-12-16 17:34:06
任务结束时刻                    : 2021-12-16 17:34:38
任务总计耗时                    :                 32s
任务平均流量                    :            1.61KB/s
记录写入速度                    :             62rec/s
读出记录总数                    :                1888
读写失败总数                    :                   0

目标数据库上查看:
在这里插入图片描述
3)基于上面数据,再次进行增量同步:

主要是 where 配置:"where": "ID > 1888 AND ID <= 2888"      # 通过条件筛选来进行增量同步
同时需要将我上面的 preSql 删除(因为我上面做的操作时 truncate 表)

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/467112.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

# Linux shell终端设置代理服务器的方法

Linux shell终端设置代理服务器的方法 文章目录 Linux shell终端设置代理服务器的方法1 变量列表2 设置方法2.1 设置代理2.2 测试代理 3 软件专用代理3.1 yum专用代理3.2 git专用代理3.3 wget专用代理3.4 curl专用代理3.5 pip专用代理3.6 aria2c专用代理 4 最后 通过设置Linux变…

三个方法教你快速找到LinkedIn领英的潜在客户(置顶收藏)

第三点–最后一点是重点&#xff0c;要看到最后 相信很多外贸业务员都有这样的问题&#xff1a;“为什么你运营的账户询问的人数那么多&#xff0c;我自己运营的账户却没有人问呢&#xff1f;你是不是有什么好的运营技巧啊&#xff0c;可以给我说一下吗&#xff1f;”事实上&a…

Shell编程循环语句for while until(心有所觉,但亦做不解)

一、for 循环 1.用法和特点 读取不同的变量值&#xff0c;用来逐个执行同一组命令 for循环经常使用在已经知道要进行多少次循环的场景 for 变量 in 取值列表 -- 默认取值分割符&#xff08;空格、制表符、换行符&#xff09;do 或 {命令序列 done 或 }2.执行指…

时序预测 | Matlab实现SSA-BiLSTM、BiLSTM麻雀算法优化双向长短期记忆神经网络时间序列预测(含优化前后对比)

时序预测 | Matlab实现SSA-BiLSTM、BiLSTM麻雀算法优化双向长短期记忆神经网络时间序列预测(含优化前后对比) 目录 时序预测 | Matlab实现SSA-BiLSTM、BiLSTM麻雀算法优化双向长短期记忆神经网络时间序列预测(含优化前后对比)预测效果基本介绍程序设计参考资料 预测效果 基本介…

数学中为什么要研究各种各样的变换?

从信号处理角度来说 分析平稳信号所蕴涵的信息, 一般地Fourier 变换就能应付自如。但以不稳定动力系统为特征的物理世界, 信号往往具有如下特点: 非平稳、非线性、非确定、非可积、非连续、非光滑、非周期、非对称等等。使用Fourier 变换分析、解释非平稳信号, 就显得无能为力…

【日志系列】日志框架Log4j2源码解析

初始化 LoggerFactory private static final Logger logger LoggerFactory.getLogger(LogFilter.class);LoggerFactory#getLogger() public static Logger getLogger(Class<?> clazz) {Logger logger getLogger(clazz.getName());if (DETECT_LOGGER_NAME_MISMATCH) {…

设计模式 Map+函数式接口减少if else

参考资料 代码优雅之道——如何干掉过多的if else 目录 一. 前期准备1.1 标记邮箱种类的接口1.2 邮箱类型区分类1.3 入参Form实体类 二. 邮件发送的业务聚合类三. 定义函数式接口&#xff0c;创建邮件发送的Map四. 效果 一. 前期准备 1.1 标记邮箱种类的接口 import java.la…

一文让你真正了解正则表达式

1 正则表达式是什么 正则表达式(Regular Expression)其实就是一门工具&#xff0c;目的是为了字符串模式匹配&#xff0c;从而实现搜索和替换功能。它起源于上个20世纪50年代科学家在数学领域做的一些研究工作&#xff0c;后来才被引入到计算机领域中。从它的命名我们可以知道…

3自由度并联绘图机器人实现写字功能(二)

1. 功能说明 本文示例将实现R305b样机3自由度并联绘图机器人写字的功能。本实验使用的样机是用探索者兼容零件制作的。 2. 电子硬件 在这个示例中&#xff0c;采用了以下硬件&#xff0c;请大家参考&#xff1a; 主控板 Basra主控板&#xff08;兼容Arduino Uno&#xff09; 扩…

Visual Studio C# WinForm开发入门(5):TabControl 控件使用

TabContrl选项卡控件可创建标签化窗口&#xff0c;在实际 编程中经常用到&#xff0c;该控件的作用是将相关的组件组合到一系列选项卡页面上。 比如下面的例子&#xff0c;在tabPage1页面和tabPage2页面各放了2个checkBox控件&#xff0c;通过点击不同page即可切换&#xff1a;…

交叉编译工具

工具链有一个松散的名称约定&#xff0c;如 arch[-vendor][-os]-abi-language . arch 适用于架构&#xff0c;编译器用于哪个目标平台&#xff1a; arm &#xff0c; mips &#xff0c; x86 &#xff0c; i686 ... vendor 是工具链供应商&#xff0c;以厂家名称命名的&#xf…

权威学者、企业CFO荟聚上海国家会计学院,共探「智能会计 价值财务」

4月21日&#xff0c;由用友主办的「智能会计 价值财务」2023企业数智化财务创新峰会在上海国家会计学院圆满举办。学院权威教授、业内专家与来自央国企、行业领先企业的财务先锋&#xff0c;线下云端共聚一堂&#xff0c;数万人共探大型企业财务数智化的全新价值主张。 会议伊始…

WLAN - 五大安全措施

文章目录 1 概述2 五大安全措施2.1 SSID 访问控制2.2 物理地址过滤 MAC2.3 有线等效保密 WEP2.4 WPA&#xff08;IEEE 802.11i 草案&#xff09;2.5 WPA2&#xff08;IEEE 802.11i&#xff09; 3 扩展3.1 网工软考真题 1 概述 无线局域网面临着两个主要问题&#xff0c;一是增…

【Unity入门】19.定时调用Invoke

【Unity入门】定时调用Invoke 大家好&#xff0c;我是Lampard~~ 欢迎来到Unity入门系列博客&#xff0c;所学知识来自B站阿发老师~感谢 &#xff08;一&#xff09;计时器 &#xff08;1&#xff09;Invoke 单词调用 计时器我们并不陌生&#xff0c;在cocos上有着schedule类是…

深度学习 - 45.MMOE Gate 简单实现 By Keras

目录 一.引言 二.MMoE 模型分析 三.MMoE 逻辑实现 • Input • Expert Output • Gate Output • Weighted Sum • Sigmoid Output • 完整代码 四.总结 一.引言 上一篇文章介绍了 MMoE 借鉴 MoE 的思路&#xff0c;为每一类输出构建一个 Gate 并最终加权多个 Exper…

http---HTTP缓存

目录 1、缓存介绍 2、http缓存 3、强缓存 4、协商缓存 1、缓存介绍 缓存&#xff1a;存储将被用的数据&#xff0c;让数据访问更快。 缓存相关术语 命中&#xff1a;在缓存中找到了请求的数据不命中/穿透&#xff1a;缓存中没有需要的数据命中率&#xff1a;命中次数/总…

Yarn(Yet Another Reource Negotiator)另一个资源协调者

官网引用 总结性 产生的需求 YARN工作逻辑 通用的资源管理系统&#xff0c;为上一层应用提供统一的资源管理和调度。解决集群资源利用率&#xff0c;数据共享&#xff0c;资源管理统一问题&#xff0c;yarn取代Job Tracker角色 组件说明 Client 向RM提交任务&#xff0c;终…

1、软件测试概述

1、软件测试概述 一、软件生命周期二、软件开发模型1、瀑布模型2、增量模型3、原型模型4、敏捷开发 三、软件质量1、软件质量概念2、影响软件质量的因素 一、软件生命周期 软件生命周期分为多个阶段&#xff0c;每个阶段有明确的任务&#xff0c;通常&#xff0c;可将软件生命…

ARM寄存器组织

ARM有37个32位长的寄存器&#xff1a; 1个用做PC&#xff08;Program Counter&#xff09;&#xff1b; 1个用做CPSR(Current Program Status Register)&#xff1b; 5个用做SPSR&#xff08;Saved Program Status Registers&#xff09;&#xff1b; 30个通用寄存器。 AR…

Unity之OpenXR+XR Interaction Toolkit实现 射线和物体交互事件回调

前言 前面我们介绍了如何抓取物体&#xff0c;今天我们来说一下如何和3D的物体进行交互&#xff0c;得到接触的事件回调。 交互的两种方式&#xff1a; 1.直接抓取或者射线抓取物体&#xff0c;得到接触回调 2.射线或者手部触摸物体后&#xff0c;得到接触回调 准备工作 有了…