【大数据进阶第三阶段之Datax学习笔记】使用阿里云开源离线同步工具DataX 实现数据同步

news2025/1/10 2:02:53

【大数据进阶第三阶段之Datax学习笔记】阿里云开源离线同步工具Datax概述 

【大数据进阶第三阶段之Datax学习笔记】阿里云开源离线同步工具Datax快速入门 

 【大数据进阶第三阶段之Datax学习笔记】阿里云开源离线同步工具Datax类图

【大数据进阶第三阶段之Datax学习笔记】使用阿里云开源离线同步工具Datax实现数据同步 

1、准备工作:

  • JDK(1.8 以上,推荐 1.8)
  • Python(23 版本都可以)
  • Apache Maven 3.x(Compile DataX)(手动打包使用,使用 tar 包方式不需要安装)
主机名操作系统IP 地址软件包
MySQL-1CentOS 7.4192.168.1.1jdk-8u181-linux-x64.tar.gz datax.tar.gz
MySQL-2CentOS 7.4192.168.1.2

2、安装 JDK:

下载地址:Java Archive Downloads - Java SE 8(需要创建 Oracle 账号)

[root@MySQL-1 ~]# ls
anaconda-ks.cfg  jdk-8u181-linux-x64.tar.gz
[root@MySQL-1 ~]# tar zxf jdk-8u181-linux-x64.tar.gz 
[root@DataX ~]# ls
anaconda-ks.cfg  jdk1.8.0_181  jdk-8u181-linux-x64.tar.gz
[root@MySQL-1 ~]# mv jdk1.8.0_181 /usr/local/java
[root@MySQL-1 ~]# cat <<END >> /etc/profile
export JAVA_HOME=/usr/local/java
export PATH=$PATH:"$JAVA_HOME/bin"
END
[root@MySQL-1 ~]# source /etc/profile
[root@MySQL-1 ~]# java -version
  • 因为 CentOS 7 上自带 Python 2.7 的软件包,所以不需要进行安装。

3、Linux 上安装 DataX 软件 

[root@MySQL-1 ~]# wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
[root@MySQL-1 ~]# tar zxf datax.tar.gz -C /usr/local/
[root@MySQL-1 ~]# rm -rf /usr/local/datax/plugin/*/._*  
  • 当未删除时,可能会输出:[/usr/local/datax/plugin/reader/._drdsreader/plugin.json] 不存在. 请检查您的配置文件.

验证

[root@MySQL-1 ~]# cd /usr/local/datax/bin
[root@MySQL-1 ~]# python datax.py ../job/job.json

输出

2021-12-13 19:26:28.828 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-12-13 19:26:28.829 [job-0] INFO  StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.060s |  All Task WaitReaderTime 0.068s | Percentage 100.00%
2021-12-13 19:26:28.829 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2021-12-13 19:26:18
任务结束时刻                    : 2021-12-13 19:26:28
任务总计耗时                    :                 10s
任务平均流量                    :          253.91KB/s
记录写入速度                    :          10000rec/s
读出记录总数                    :              100000
读写失败总数                    :                   0

4、DataX 基本使用

查看 streamreader \--> streamwriter 的模板:

[root@MySQL-1 ~]# python /usr/local/datax/bin/datax.py -r streamreader -w streamwriter

输出

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the streamreader document:
     https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md 

Please refer to the streamwriter document:
     https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md 
 
Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [], 
                        "sliceRecordCount": ""
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "", 
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

根据模板编写 json 文件

[root@MySQL-1 ~]# cat <<END > test.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [        # 同步的列名 (* 表示所有)
       {
           "type":"string",
            "value":"Hello."
       },
       {
           "type":"string",
            "value":"河北彭于晏"
       },
   ], 
                        "sliceRecordCount": "3"     # 打印数量
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "utf-8",     # 编码
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "2"         # 并发 (即 sliceRecordCount * channel = 结果)
            }
        }
    }
}

输出:(要是复制我上面的话,需要把 # 带的内容去掉)

image.png

5、安装 MySQL 数据库

分别在两台主机上安装:

[root@MySQL-1 ~]# yum -y install mariadb mariadb-server mariadb-libs mariadb-devel   
[root@MySQL-1 ~]# systemctl start mariadb												# 安装 MariaDB 数据库
[root@MySQL-1 ~]# mysql_secure_installation												# 初始化	
NOTE: RUNNING ALL PARTS OF THIS SCRIPT IS RECOMMENDED FOR ALL MariaDBSERVERS IN PRODUCTION USE!  PLEASE READ EACH STEP CAREFULLY!Enter current password for root (enter for none):	     	# 直接回车
OK, successfully used password, moving on...
Set root password? [Y/n] y                       	 	 	# 配置 root 密码
New password: 
Re-enter new password: 
Password updated successfully!
Reloading privilege tables..... Success!
Remove anonymous users? [Y/n] y                			 	# 移除匿名用户... skipping.
Disallow root login remotely? [Y/n] n            		 	# 允许 root 远程登录... skipping.
Remove test database and access to it? [Y/n] y 		     	# 移除测试数据库... skipping.
Reload privilege tables now? [Y/n] y             	     	# 重新加载表... Success!

1)准备同步数据(要同步的两台主机都要有这个表)

MariaDB [(none)]> create database `course-study`;
Query OK, 1 row affected (0.00 sec)MariaDB [(none)]> create table `course-study`.t_member(ID int,Name varchar(20),Email varchar(30));
Query OK, 0 rows affected (0.00 sec)

在这里插入图片描述
因为是使用 DataX 程序进行同步的,所以需要在双方的数据库上开放权限:

grant all privileges on *.* to root@'%' identified by '123123';
flush privileges;

2)创建存储过程:

DELIMITER $$
CREATE PROCEDURE test()
BEGIN
declare A int default 1;
while (A < 3000000)do
insert into `course-study`.t_member values(A,concat("LiSa",A),concat("LiSa",A,"@163.com"));
set A = A + 1;
END while;
END $$
DELIMITER ;

在这里插入图片描述
3)调用存储过程(在数据源配置,验证同步使用):

call test();

6、通过 DataX 实 MySQL 数据同步

1)生成 MySQL 到 MySQL 同步的模板:

[root@MySQL-1 ~]# python /usr/local/datax/bin/datax.py -r mysqlreader -w mysqlwriter
{"job": {"content": [{"reader": {"name": "mysqlreader",							# 读取端"parameter": {"column": [], 								# 需要同步的列 (* 表示所有的列)"connection": [{"jdbcUrl": [], 						# 连接信息"table": []							# 连接表}], "password": "", 							# 连接用户"username": "", 							# 连接密码"where": ""									# 描述筛选条件}}, "writer": {"name": "mysqlwriter",							# 写入端"parameter": {"column": [], 								# 需要同步的列"connection": [{"jdbcUrl": "", 						# 连接信息"table": []							# 连接表}], "password": "", 							# 连接密码"preSql": [], 								# 同步前. 要做的事"session": [], "username": "",								# 连接用户 "writeMode": ""								# 操作类型}}}], "setting": {"speed": {"channel": ""										# 指定并发数}}}
}

2)编写 json 文件:

[root@MySQL-1 ~]# vim install.json
{"job": {"content": [{"reader": {"name": "mysqlreader", "parameter": {"username": "root","password": "123123","column": ["*"],"splitPk": "ID","connection": [{"jdbcUrl": ["jdbc:mysql://192.168.1.1:3306/course-study?useUnicode=true&characterEncoding=utf8"], "table": ["t_member"]}]}}, "writer": {"name": "mysqlwriter", "parameter": {"column": ["*"], "connection": [{"jdbcUrl": "jdbc:mysql://192.168.1.2:3306/course-study?useUnicode=true&characterEncoding=utf8","table": ["t_member"]}], "password": "123123","preSql": ["truncate t_member"], "session": ["set session sql_mode='ANSI'"], "username": "root", "writeMode": "insert"}}}], "setting": {"speed": {"channel": "5"}}}
}

3)验证

[root@MySQL-1 ~]# python /usr/local/datax/bin/datax.py install.json

输出:

2021-12-15 16:45:15.120 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-12-15 16:45:15.120 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2999999 records, 107666651 bytes | Speed 2.57MB/s, 74999 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 82.173s |  All Task WaitReaderTime 75.722s | Percentage 100.00%
2021-12-15 16:45:15.124 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2021-12-15 16:44:32
任务结束时刻                    : 2021-12-15 16:45:15
任务总计耗时                    :                 42s
任务平均流量                    :            2.57MB/s
记录写入速度                    :          74999rec/s
读出记录总数                    :             2999999
读写失败总数                    :                   0

你们可以在目的数据库进行查看,是否同步完成。
在这里插入图片描述

  • 上面的方式相当于是完全同步,但是当数据量较大时,同步的时候被中断,是件很痛苦的事情;
  • 所以在有些情况下,增量同步还是蛮重要的。

7、使用 DataX 进行增量同步

使用 DataX 进行全量同步和增量同步的唯一区别就是:增量同步需要使用 where 进行条件筛选。(即,同步筛选后的 SQL)


1)编写 json 文件:

[root@MySQL-1 ~]# vim where.json
{"job": {"content": [{"reader": {"name": "mysqlreader", "parameter": {"username": "root","password": "123123","column": ["*"],"splitPk": "ID","where": "ID <= 1888","connection": [{"jdbcUrl": ["jdbc:mysql://192.168.1.1:3306/course-study?useUnicode=true&characterEncoding=utf8"], "table": ["t_member"]}]}}, "writer": {"name": "mysqlwriter", "parameter": {"column": ["*"], "connection": [{"jdbcUrl": "jdbc:mysql://192.168.1.2:3306/course-study?useUnicode=true&characterEncoding=utf8","table": ["t_member"]}], "password": "123123","preSql": ["truncate t_member"], "session": ["set session sql_mode='ANSI'"], "username": "root", "writeMode": "insert"}}}], "setting": {"speed": {"channel": "5"}}}
}
  • 需要注意的部分就是:where(条件筛选) 和 preSql(同步前,要做的事) 参数。

2)验证:

[root@MySQL-1 ~]# python /usr/local/data/bin/data.py where.json

输出:

2021-12-16 17:34:38.534 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-12-16 17:34:38.534 [job-0] INFO  StandAloneJobContainerCommunicator - Total 1888 records, 49543 bytes | Speed 1.61KB/s, 62 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.002s |  All Task WaitReaderTime 100.570s | Percentage 100.00%
2021-12-16 17:34:38.537 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2021-12-16 17:34:06
任务结束时刻                    : 2021-12-16 17:34:38
任务总计耗时                    :                 32s
任务平均流量                    :            1.61KB/s
记录写入速度                    :             62rec/s
读出记录总数                    :                1888
读写失败总数                    :                   0

目标数据库上查看:
在这里插入图片描述
3)基于上面数据,再次进行增量同步:

主要是 where 配置:"where": "ID > 1888 AND ID <= 2888"						# 通过条件筛选来进行增量同步
同时需要将我上面的 preSql 删除(因为我上面做的操作时 truncate 表)

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1373715.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

新生儿成长的阳光之钙:补充注意事项指南

引言&#xff1a; 钙是新生儿骨骼发育不可或缺的重要元素&#xff0c;对于宝宝的生长发育起着至关重要的作用。本文将深入探讨钙的功能、补充时机&#xff0c;以及在给新生儿补充钙时应该注意的事项&#xff0c;为小天使们提供最贴心的呵护。 第一部分&#xff1a;钙的重要性与…

基于ssm社区老年人关怀服务系统论文

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本社区老年人关怀服务系统就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短时间内处理完毕庞大的数…

Linux系统——测试端口连通性方法

目录 一、TCP端口连通性测试 1、ssh 2、telnet&#xff08;可能需要安装&#xff09; 3、curl 4、tcping&#xff08;需要安装&#xff09; 5、nc&#xff08;需要安装&#xff09; 6、nmap&#xff08;需要安装&#xff09; 二、UDP端口连通性测试 1、nc&#xff08;…

11Spring IoC注解式开发(上)(元注解/声明Bean的注解/注解的使用/负责实例化Bean的注解)

注解的存在主要是为了简化XML的配置。Spring6倡导全注解开发。 注解开发的优点:提高开发效率 注解开发的缺点:在一定程度上违背了OCP原则&#xff0c;使用注解的开发的前提是需求比较固定&#xff0c;变动较小。 1 注解的注解称为元注解 自定义一个注解: package com.sunspl…

保留几位小数的函数、全排列函数、​反斜杠的作用、二进制、八进制、十六进制的输入​、求三角形面积的三种方法、求平方根、N次方如何表示

保留几位小数的函数 方法一&#xff1a; 头文件 #include<iomanip> 格式 cout<<fixed<<setprecision(int n)<<a; 作用&#xff1a;把a保留三位小数 方法二&#xff1a; 还有一种方法&#xff0c;就是用C从C语言保留的printf()方法。 保留二位小数&a…

QML实现的图片浏览器

很久之前实现了一个QWidget版本的图片浏览器:基于Qt5的图片浏览器QHImageViewer 今天用QML也实现一个,功能差不多: ●悬浮工具栏 ●支持图片缩放、旋转、还原、旋转、拖动。 ●拖动图片时,释放鼠标图片会惯性滑动。 ●支持左右翻页查看文件夹中的图片。 ●支持保存图片至本…

低代码的应用场景

Gartner 在 2019 年的低代码调研报告中&#xff0c;曾经绘制过一张用来阐述低代码适用场景的“应用金字塔”&#xff0c;如下图所示&#xff1a; 应用级别划分&#xff1a;从下往上&#xff0c;分别为工作组级(Workgroup Class)、部门级(Departmental Class)、企业级(Enterpris…

【HashMap】结构和底层原理

文章目录 HashMap结构和底层原理 HashMap 结构和底层原理 ​ HashMap 是我们非常常用到数据结构&#xff0c;由数组和链表构成的数据结构&#xff0c;数组里面每个地方都存了 key-value 这样的实例&#xff0c;在Java7叫 Entry 在 Java8 中叫 Node ​ 因为他本身所有的位置都…

模型的权值平均的原理和Pytorch的实现

一、前言 模型权值平均是一种用于改善深度神经网络泛化性能的技术。通过对训练过程中不同时间步的模型权值进行平均&#xff0c;可以得到更宽的极值点&#xff08;optima&#xff09;并提高模型的泛化能力。 在PyTorch中&#xff0c;官方提供了实现模型权值平均的方法。 这里…

李沐-《动手学深度学习》--02-目标检测

一 、目标检测算法 1. R-CNN a . 算法步骤 使用启发式搜索算法来选择锚框&#xff08;选出多个锚框大小可能不一&#xff0c;需要使用Rol pooling&#xff09;使用预训练好的模型&#xff08;去掉分类层&#xff09;对每个锚框进行特征抽取&#xff08;如VGG,AlexNet…)训练…

MYSQL篇--事务机制高频面试题

事务 1 什么是数据库事务&#xff1f; 事务是一个不可分割的数据库操作序列&#xff0c;也是数据库并发控制的基本单位&#xff0c;其执行的结果必须使数据库从一种一致性状态变到另一种一致性状态。事务是逻辑上的一组操作&#xff0c;要么都执行&#xff0c;要么都不执行。…

【sqlite3】sqlite3在linux下使用sqlitebrowser工具实现数据可视化

sqlite3在linux下使用sqlitebrowser工具实现数据可视化 1. ### install sqlitebrowser 1. ### install sqlitebrowser 安装指令 sudo apt-get install sqlitebrowser通过工具打开数据库 sqlitebrowser stereo.db打开效果

HTTPS详解及openssl简单使用

OpenSSL 中文手册 | OpenSSL 中文网 本文介绍https传输协议中涉及的概念&#xff0c;流程&#xff0c;算法&#xff0c;如何实现等相关内容。 HTTP传输过程 HTTP 之所以被 HTTPS 取代&#xff0c;最大的原因就是不安全&#xff0c;至于为什么不安全&#xff0c;看了下面这张图…

Linux第25步_在虚拟机中备份“ST官方的TF-A源码”

TF-A是ARM公司提供的&#xff0c;ST公司通过修改它&#xff0c;做了一个自己的TF-A代码。因为在后期开发中&#xff0c;若硬件被改变了&#xff0c;我们需要通过修改"ST官方的TF-A源码"就可以自己的TF-A代码了。为了防止源文件被误改了&#xff0c;我们需要将"S…

亲测,Chatgpt4.0充值(虚拟卡充值)

一、准备工作&#xff1a; 1、一个ChatGPT3.5账号 2、一张支持ChatGPT4.0的虚拟卡 二、流程【网页版充值】 充值前请先确认以下三点&#xff1a; 1&#xff0c;ChatGPT账户正常登陆。 2&#xff0c;充值过程中始终保持美区环境&#xff0c;且开启全局模式。 3&#xff0…

简洁计算器Python代码

简洁的Python计算器&#xff0c;直接上代码&#xff08;用时10分钟&#xff09;&#xff1a; Python Gui图形化开发探索GUI开发的无限可能&#xff0c;使用强大的PyQt5、默认的Tkinter和跨平台的Kivy等工具&#xff0c;让Python成为你构建应用程序的得力助手。从本机用户界面到…

在 WinForms 应用程序中实现 FTP 文件操作及模式介绍

在 WinForms 应用程序中实现 FTP 文件操作及模式介绍 简介 在许多应用程序中&#xff0c;能够从远程服务器获取文件是一个非常有用的功能。本文将详细介绍如何在 Windows Forms (WinForms) 应用程序中使用 FTP 协议进行文件操作&#xff0c;包括连接到 FTP 服务器、列出目录、…

邂逅Node.JS的那一夜

邂逅Node.JS的那一夜&#x1f303; 本篇文章&#xff0c;学习记录于&#xff1a;尚硅谷&#x1f3a2; 本篇文章&#xff0c;并不完全适合小白&#xff0c;需要有一定的HTML、CSS、JS、HTTP、Web等知识及基础学习&#xff1a; &#x1f197;&#xff0c;紧接上文&#xff0c;…

通过反射修改MultipartFile类文件名

1、背景 项目上有这样一个需求&#xff0c;前端传文件过来&#xff0c;后端接收后按照特定格式对文件进行重命名。(修改文件名需求其实也可以在前端处理的) //接口类似于下面这个样子 PosMapping("/uploadFile") public R uploadFile(List<MultipartFile> fil…

Spring Boot注解大全:从入门到精通,轻松掌握Spring Boot核心注解!

目录 1、前言 2、介绍 2.1 Spring Boot简介 2.2 为什么要学习Spring Boot注解 3、Spring Boot基本注解 3.1 SpringBootApplication 3.2 EnableAutoConfiguration 3.3 ComponentScan 4、控制器注解 4.1 RestController 4.2 RequestMapping 4.3 PathVariable 4.4 Re…