阿里的又一款数据高效同步工具DataX,真香!

news2025/1/12 23:41:18

我们公司有个项目的数据量高达五千万,但是因为报表那块数据不太准确,业务库和报表库又是跨库操作,所以并不能使用 SQL 来进行同步。当时的打算是通过 mysqldump 或者存储的方式来进行同步,但是尝试后发现这些方案都不切实际:

  • mysqldump:不仅备份需要时间,同步也需要时间,而且在备份的过程,可能还会有数据产出(也就是说同步等于没同步)

  • 存储方式:这个效率太慢了,要是数据量少还好,我们使用这个方式的时候,三个小时才同步两千条数据 ...

后面在网上查看后:

发现 DataX 这个工具用来同步不仅速度快,而且同步的数据量基本上也相差无几。

一、DataX 简介


DataX 是阿里云 DataWorks 数据集成 的开源版本,主要就是用于实现数据间的离线同步。DataX 致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等 各种异构数据源(即不同的数据库) 间稳定高效的数据同步功能。

为了 解决异构数据源同步问题,DataX 将复杂的网状同步链路变成了星型数据链路,DataX 作为中间传输载体负责连接各种数据源;当需要接入一个新的数据源时,只需要将此数据源对接到 DataX,便能跟已有的数据源作为无缝数据同步。

1.DataX3.0 框架设计

DataX 采用 Framework + Plugin 架构,将数据源读取和写入抽象称为 Reader/Writer 插件,纳入到整个同步框架中。

2.DataX3.0 核心架构

DataX 完成单个数据同步的作业,我们称为 Job,DataX 接收到一个 Job 后,将启动一个进程来完成整个作业同步过程。DataX Job 模块是单个作业的中枢管理节点,承担了数据清理、子任务切分、TaskGroup 管理等功能。

DataX Job 启动后,会根据不同源端的切分策略,将 Job 切分成多个小的 Task (子任务),以便于并发执行。接着 DataX Job 会调用 Scheduler 模块,根据配置的并发数量,将拆分成的 Task 重新组合,组装成 TaskGroup(任务组)

每一个 Task 都由 TaskGroup 负责启动,Task 启动后,会固定启动 Reader --> Channel --> Writer 线程来完成任务同步工作。DataX 作业运行启动后,Job 会对 TaskGroup 进行监控操作,等待所有 TaskGroup 完成后,Job 便会成功退出(异常退出时 值非 0)

DataX 调度过程:

首先 DataX Job 模块会根据分库分表切分成若干个 Task,然后根据用户配置并发数,来计算需要分配多少个 TaskGroup(计算过程:Task / Channel = TaskGroup)

最后由 TaskGroup 根据分配好的并发数来运行 Task(任务)

二、使用 DataX 实现数据同步


准备工作:

  • JDK(1.8 以上,推荐 1.8)

  • Python(2,3 版本都可以)

  • Apache Maven 3.x(Compile DataX)(手动打包使用,使用 tar 包方式不需要安装)

安装 JDK:

[root@MySQL-1 ~]# ls
anaconda-ks.cfg  jdk-8u181-linux-x64.tar.gz
[root@MySQL-1 ~]# tar zxf jdk-8u181-linux-x64.tar.gz 
[root@DataX ~]# ls
anaconda-ks.cfg  jdk1.8.0_181  jdk-8u181-linux-x64.tar.gz
[root@MySQL-1 ~]# mv jdk1.8.0_181 /usr/local/java
[root@MySQL-1 ~]# cat <<END >> /etc/profile
export JAVA_HOME=/usr/local/java
export PATH=$PATH:"$JAVA_HOME/bin"
END
[root@MySQL-1 ~]# source /etc/profile
[root@MySQL-1 ~]# java -version

因为 CentOS 7 上自带 Python 2.7 的软件包,所以不需要进行安装。

1.Linux 上安装 DataX 软件

[root@MySQL-1 ~]# wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
[root@MySQL-1 ~]# tar zxf datax.tar.gz -C /usr/local/
[root@MySQL-1 ~]# rm -rf /usr/local/datax/plugin/*/._*    # 需要删除隐藏文件 (重要)

当未删除时,可能会输出:

[/usr/local/datax/plugin/reader/._drdsreader/plugin.json] 不存在. 请检查您的配置文件.

验证:

[root@MySQL-1 ~]# cd /usr/local/datax/bin
[root@MySQL-1 ~]# python datax.py ../job/job.json    # 用来验证是否安装成功

输出

2021-12-13 19:26:28.828 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-12-13 19:26:28.829 [job-0] INFO  StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.060s |  All Task WaitReaderTime 0.068s | Percentage 100.00%
2021-12-13 19:26:28.829 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2021-12-13 19:26:18
任务结束时刻                    : 2021-12-13 19:26:28
任务总计耗时                    :                 10s
任务平均流量                    :          253.91KB/s
记录写入速度                    :          10000rec/s
读出记录总数                    :              100000
读写失败总数                    :                   0

2.DataX 基本使用

查看 streamreader --> streamwriter 的模板:

[root@MySQL-1 ~]# python /usr/local/datax/bin/datax.py -r streamreader -w streamwriter

输出:

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the streamreader document:
     https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md 

Please refer to the streamwriter document:
     https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md 
 
Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [], 
                        "sliceRecordCount": ""
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "", 
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

根据模板编写 json 文件

[root@MySQL-1 ~]# cat <<END > test.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [     # 同步的列名 (* 表示所有)
       {
           "type":"string",
    "value":"Hello."
       },
       {
           "type":"string",
    "value":"河北彭于晏"
       },
   ], 
                        "sliceRecordCount": "3"    # 打印数量
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "utf-8",    # 编码
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "2"      # 并发 (即 sliceRecordCount * channel = 结果)
            }
        }
    }
}

输出:(要是复制我上面的话,需要把 # 带的内容去掉)

3.安装 MySQL 数据库

分别在两台主机上安装:

[root@MySQL-1 ~]# yum -y install mariadb mariadb-server mariadb-libs mariadb-devel   
[root@MySQL-1 ~]# systemctl start mariadb                  # 安装 MariaDB 数据库
[root@MySQL-1 ~]# mysql_secure_install                                  # 初始化 
NOTE: RUNNING ALL PARTS OF THIS SCRIPT IS RECOMMENDED FOR ALL MariaDB
      SERVERS IN PRODUCTION USE!  PLEASE READ EACH STEP CAREFULLY!

Enter current password for root (enter for none):               # 直接回车
OK, successfully used password, moving on...
Set root password? [Y/n] y                                  # 配置 root 密码
New password: 123123
Re-enter new password: 123123
Password updated successfully!
Reloading privilege tables..
 ... Success!
Remove anonymous users? [Y/n] y                          # 移除匿名用户
 ... skipping.
Disallow root login remotely? [Y/n] n                     # 允许 root 远程登录
 ... skipping.
Remove test database and access to it? [Y/n] y                # 移除测试数据库
 ... skipping.
Reload privilege tables now? [Y/n] y                           # 重新加载表
 ... Success!
1)准备同步数据(要同步的两台主机都要有这个表)
MariaDB [(none)]> create database `course-study`;
Query OK, 1 row affected (0.00 sec)

MariaDB [(none)]> create table `course-study`.t_member(ID int,Name varchar(20),Email varchar(30));
Query OK, 0 rows affected (0.00 sec)

因为是使用 DataX 程序进行同步的,所以需要在双方的数据库上开放权限:

grant all privileges on *.* to root@'%' identified by '123123';
flush privileges;
2)创建存储过程:
DELIMITER $$
CREATE PROCEDURE test()
BEGIN
declare A int default 1;
while (A < 3000000)do
insert into `course-study`.t_member values(A,concat("LiSa",A),concat("LiSa",A,"@163.com"));
set A = A + 1;
END while;
END $$
DELIMITER ;
3)调用存储过程(在数据源配置,验证同步使用):
call test();

4.通过 DataX 实 MySQL 数据同步

1)生成 MySQL 到 MySQL 同步的模板:
[root@MySQL-1 ~]# python /usr/local/datax/bin/datax.py -r mysqlreader -w mysqlwriter
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",    # 读取端
                    "parameter": {
                        "column": [],      # 需要同步的列 (* 表示所有的列)
                        "connection": [
                            {
                                "jdbcUrl": [],     # 连接信息
                                "table": []    # 连接表
                            }
                        ], 
                        "password": "",     # 连接用户
                        "username": "",     # 连接密码
                        "where": ""     # 描述筛选条件
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter",    # 写入端
                    "parameter": {
                        "column": [],             # 需要同步的列
                        "connection": [
                            {
                                "jdbcUrl": "",            # 连接信息
                                "table": []    # 连接表
                            }
                        ], 
                        "password": "",     # 连接密码
                        "preSql": [],      # 同步前. 要做的事
                        "session": [], 
                        "username": "",     # 连接用户 
                        "writeMode": ""     # 操作类型
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""      # 指定并发数
            }
        }
    }
}

2)编写 json 文件:

[root@MySQL-1 ~]# vim install.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "username": "root",
                        "password": "123123",
                        "column": ["*"],
                        "splitPk": "ID",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.1.1:3306/course-study?useUnicode=true&characterEncoding=utf8"
                                ], 
                                "table": ["t_member"]
                            }
                        ]
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": ["*"], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.1.2:3306/course-study?useUnicode=true&characterEncoding=utf8",
                                "table": ["t_member"]
                            }
                        ], 
                        "password": "123123",
                        "preSql": [
                            "truncate t_member"
                        ], 
                        "session": [
                            "set session sql_mode='ANSI'"
                        ], 
                        "username": "root", 
                        "writeMode": "insert"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "5"
            }
        }
    }
}
3)验证
[root@MySQL-1 ~]# python /usr/local/datax/bin/datax.py install.json

输出:

2021-12-15 16:45:15.120 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-12-15 16:45:15.120 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2999999 records, 107666651 bytes | Speed 2.57MB/s, 74999 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 82.173s |  All Task WaitReaderTime 75.722s | Percentage 100.00%
2021-12-15 16:45:15.124 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2021-12-15 16:44:32
任务结束时刻                    : 2021-12-15 16:45:15
任务总计耗时                    :                 42s
任务平均流量                    :            2.57MB/s
记录写入速度                    :          74999rec/s
读出记录总数                    :             2999999
读写失败总数                    :                   0

你们可以在目的数据库进行查看,是否同步完成。

上面的方式相当于是完全同步,但是当数据量较大时,同步的时候被中断,是件很痛苦的事情;

所以在有些情况下,增量同步还是蛮重要的

5.使用 DataX 进行增量同步

使用 DataX 进行全量同步和增量同步的唯一区别就是:增量同步需要使用 where 进行条件筛选。

即,同步筛选后的 SQL。

1)编写 json 文件:
[root@MySQL-1 ~]# vim where.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "username": "root",
                        "password": "123123",
                        "column": ["*"],
                        "splitPk": "ID",
                        "where": "ID <= 1888",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.1.1:3306/course-study?useUnicode=true&characterEncoding=utf8"
                                ], 
                                "table": ["t_member"]
                            }
                        ]
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": ["*"], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.1.2:3306/course-study?useUnicode=true&characterEncoding=utf8",
                                "table": ["t_member"]
                            }
                        ], 
                        "password": "123123",
                        "preSql": [
                            "truncate t_member"
                        ], 
                        "session": [
                            "set session sql_mode='ANSI'"
                        ], 
                        "username": "root", 
                        "writeMode": "insert"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "5"
            }
        }
    }
}

需要注意的部分就是:where(条件筛选) 和 preSql(同步前,要做的事) 参数。

2)验证:
[root@MySQL-1 ~]# python /usr/local/data/bin/data.py where.json

输出:

2021-12-16 17:34:38.534 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-12-16 17:34:38.534 [job-0] INFO  StandAloneJobContainerCommunicator - Total 1888 records, 49543 bytes | Speed 1.61KB/s, 62 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.002s |  All Task WaitReaderTime 100.570s | Percentage 100.00%
2021-12-16 17:34:38.537 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2021-12-16 17:34:06
任务结束时刻                    : 2021-12-16 17:34:38
任务总计耗时                    :                 32s
任务平均流量                    :            1.61KB/s
记录写入速度                    :             62rec/s
读出记录总数                    :                1888
读写失败总数                    :                   0

目标数据库上查看:

3)基于上面数据,再次进行增量同步:

主要是 where 配置:"where": "ID > 1888 AND ID <= 2888"(通过条件筛选来进行增量同步)

同时需要将我上面的 preSql 删除 (因为我上面做的操作是 truncate 表)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/188081.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL的回表

核心问题 什么是回表&#xff1f; 答&#xff1a; 回表是一个过程&#xff0c;是获取到主键后再通过主键去查询数据的一个过程就叫回表。 那这个主键从哪来&#xff1f; 从叶子结点存储的内容来&#xff0c;如果存储的是非聚簇索引则通过叶子节点存储的值获取&#xff0c;该值…

机器学习笔记之生成对抗网络(一)逻辑介绍

机器学习笔记之生成对抗网络——逻辑介绍引言生成对抗网络——示例生成对抗网络——数学语言描述生成对抗网络——判别过程描述引言 本节将介绍生成对抗网络的基本逻辑与数学语言描述。 生成对抗网络——示例 生成对抗网络(Generative Adversarial Networks,GAN)&#xff0c…

视觉 → 检测提取

目标检测任务非常有趣且具有挑战性。有些任务非常复杂&#xff0c;需要更多数据才能有所产出。但在这篇文章中&#xff0c;我将展示一个符号检测的小任务&#xff0c;它可以用更少的数据完成。该项目的目的是使用计算机视觉技术从一组给定的图像中提取文本并检测各种符号。在这…

UniApp已经接了手机数据线,但运行工具警告 “没有检查到设备“ (华为手机为例 进行解决)

大部分第一次使用uni进行手机调试都会遇到这个问题 首先 将手机的数据线插入电脑的usb接口是必备前提 然后 就是手机的权限拦截了设备扫描 这就是uni工具找不到设备的原因 接入手机线后 数据会弹出一个USB的提示 点进去之后 我们要设置 允许传输文件 千万别仅充电 接下来的…

Java 以数据流的形式发送数据request Java 数据封装到request中

Java 以数据流的形式发送数据request Java 数据封装到request中 一、描述 1、在做微信支付结果通知的时候&#xff0c;看到一个描述&#xff1a;微信会把相关支付结果及用户信息通过数据流的形式发送给商户 &#xff0c;那么java如何通过数据流的形式发送数据呢&#xff1f; 二…

idea中的Debug工具的使用介绍

文章目录1、设置断点给断点添加条件2、打开DebugDebu启动方式3、Debug功能介绍左侧功能区顶部功能区使用Debug工具时要先进行打断点的操作1、设置断点 断点就是程序运行暂停的位置&#xff0c;在这个位置以后可以根据自己的操作一步一步的执行程序。 idea中设置断点&#xff1…

FreeMarker基础知识

1、总览 官网&#xff1a;http://freemarker.foofun.cn/ 视频地址&#xff1a;https://www.bilibili.com/video/BV1zZ4y1u7iA 2、FreeMarker概述 2.1 FreeMarker概念 FreeMarker 是⼀款 模板引擎&#xff1a; 即⼀种基于模板和要改变的数据&#xff0c; 并⽤来⽣成输出⽂本(…

动态化护眼全新体验,被誉为“护眼神器”的南卡护眼台灯Pro评测出炉

自从家中的孩子上小学后&#xff0c;随着课后作业的逐渐增加&#xff0c;在书房学习时间更长了&#xff0c;由于平时关注到孩子用眼习惯&#xff0c;眼睛有些轻度近视。作为年轻一代的家长&#xff0c;对孩子的用眼健康方面一定要重视&#xff0c;在照明方面&#xff0c;护眼台…

Redis基础篇:Redis简介和安装

第一章&#xff1a;Redis简介 一&#xff1a;简介 Redis诞生于2009年&#xff0c;基于内存的键值型NoSQL数据库。 二&#xff1a;特征 1&#xff1a;键值型&#xff1a;value支持多种不同的数据结构&#xff0c;功能丰富。 2&#xff1a;单线程&#xff1a;单线程执行命令&…

Kubernetes介绍

1 什么是Kubernetes&#xff1f; Kubernetes是容器集群管理系统&#xff0c;是一个开源的平台&#xff0c;可以实现容器集群的自动化部署、自动扩缩容、维护等功能。 使用Kubernetes可以&#xff1a; ● 自动化容器的部署和复制 ● 随时扩展或收缩容器规模 ● 将容器组织成组&…

第四章.神经网络—单层感知器

第四章.神经网络 4.1 单层感知器 1.单层感知器示意图 1).第一种表示方法&#xff1a; 举例说明&#xff1a; 2).第二种表示方法&#xff1a; 公式推导&#xff1a; 举例说明&#xff1a; 预测值(y)和标签值(t)相同&#xff0c;停止迭代循环. 2.学习率η 1).η取值说明&…

Python流程控制语句之跳转语句

上一篇&#xff1a;Python流程控制语句之循环语句 文章目录前言一、break 语句二、continue 语句三、pass 空语句总结前言 上一篇博客我们讲解了Python中的循环语句&#xff0c;知道循环条件一直满足时&#xff0c;代码将会一直执行下去&#xff0c;就像一辆迷路的车&#xff…

《满江红》《流浪地球2》孰能胜出,元宇宙电影能否成为票房黑马?

截止1月28日12时&#xff0c;2023年春节档期总票房达67.57亿元。其中&#xff0c;《满江红》以26.05亿元票房居2023年春节档票房榜榜首&#xff1b;《流浪地球2》位居第二&#xff0c;票房成绩为21.63亿元。摆在未来人类面前就两条路,一条向外星辰大海,一条向内元宇宙。《流浪地…

微信小程序017音乐播放器系统 php java

小程序前端框架&#xff1a;uniapp 小程序运行软件&#xff1a;微信开发者 后端技术:javaSsm(SpringSpringMVCMyBatis)vue.js 后端开发环境:idea/eclipse 数据库:mysql 基于音乐播放器小程序的设计基于现有的手机&#xff0c;可以实现首页、个人中心、用户管理&#xff0c;音乐…

拉伯证券|开盘暴跌20%,三文鱼第一股业绩变脸!

超900家公司成绩预亏&#xff0c;多家公司发布成绩预告后大跌。 佳沃食品今天开盘20%跌停&#xff0c;这是该股史上开盘最大跌幅。早盘该股成交额显着扩展&#xff0c;半日成交额超越3.5亿元&#xff0c;收盘跌18.04%。 资料显现&#xff0c;佳沃食品是优质蛋白食品领域的大消…

python入门教程(非常详细),python贪吃蛇最简单代码

大家好&#xff0c;小编来为大家解答以下问题&#xff0c;python编程代码大全设计入门&#xff0c;python入门教程(非常详细)&#xff0c;现在让我们一起来看看吧&#xff01; 1、python编程例子有哪些&#xff1f; python编程经典例子&#xff1a; 1、画爱心表白、图形都是由…

除了Navicat破解版、DBeaver,免费还好用的数据库管理工具/SQL工具还有推荐吗?

很多国内SQL学习者和开发者对Navicat、DBeaver等国外数据库管理工具已经很熟悉了。但是&#xff0c;有没有比他们更适合SQL开发者的数据库管理/SQL工具呢&#xff1f;这里&#xff0c;笔者结合自己的调研来聊一下。 笔者做过一些用户调研。 Navicat虽然功能强大&#xff0c;但…

win10安装opencv

第一步&#xff1a;会有skbuild&#xff0c;cmake等依赖库报错&#xff0c;先安装依赖pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple scikit-buildpip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple cmake第二步&#xff1a;pip3 install opencv-python若…

Python数据可视化之折线图

Python数据可视化之折线图 提示&#xff1a;前言 Python数据可视化之折线图 提示&#xff1a;写完文章后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录Python数据可视化之折线图前言一、导入包二、选择数据集三、折线图四、图形的大小和图表…

什么游戏视频录制软件比较好?10 款的游戏录屏软件你值得收藏

市面上有各种各样的游戏捕捉软件&#xff0c;当然&#xff0c;它们都声称是有史以来最好的游戏软件。但有些比其他的更好&#xff0c;最适合您的游戏记录器在很大程度上取决于您要玩的游戏以及您运行的 PC 类型。 目前最好的游戏屏幕录像机 让我们来探索自称是最佳游戏屏幕录…