DataX-阿里开源离线同步工具在Windows上实现Sqlserver到Mysql全量同步和增量同步

news2025/1/9 23:29:05

场景

Kettle-开源的ETL工具集-实现SqlServer到Mysql表的数据同步并部署在Windows服务器上:

Kettle-开源的ETL工具集-实现SqlServer到Mysql表的数据同步并部署在Windows服务器上_etl实现sqlserver报表服务器_霸道流氓气质的博客-CSDN博客

上面讲过Kettle的使用,下面记录下阿里开源异构数据源同步工具DataX

DataX

DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、

HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

GitHub - alibaba/DataX: DataX是阿里云DataWorks数据集成的开源版本。

 

设计理念

为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,

DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,

只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

当前使用现状

DataX在阿里巴巴集团内被广泛使用,承担了所有大数据的离线同步业务,并已持续稳定运行了6年之久。

目前每天完成同步8w多道作业,每日传输数据量超过300TB

DataX所支持的数据源

GitHub - alibaba/DataX: DataX是阿里云DataWorks数据集成的开源版本。

下面记录一个具体的示例-从Sqlserver同步数据到Mysql中,且表结构一样。

注:

博客:
霸道流氓气质的博客_CSDN博客-C#,架构之路,SpringBoot领域博主

实现

1、DataX在Windows上的安装

参考官网快速开始文档:

DataX/userGuid.md at master · alibaba/DataX · GitHub

安装并配置好所需要的环境依赖。

 

这里不需要自己编译,所以只配置了jdk1.8以及Python3的环境变量。

按照文档下载地址,下载DataX工具包

https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gz

下载之后解压即可。

2、启动并测试stream2stream数据转换

解压之后来到bin目录下,新建创建作业的配置文件stream2stream.json文件

修改json内容为

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "sliceRecordCount": 10,
            "column": [
              {
                "type": "long",
                "value": "10"
              },
              {
                "type": "string",
                "value": "hello,你好,世界-DataX"
              }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "encoding": "UTF-8",
            "print": true
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 5
       }
    }
  }
}

这是官方提供的模板的示例json文件,用来自检是否成功配置和启动DataX。

然后在bin目录下打开cmd执行

python datax.py  ./stream2stream.json

等待执行完成没有提示报错,但是发现中文乱码

 

DataX命令框中文乱码需要设置编码格式,先在cmd中输入

chcp 65001

然后再执行上面命令

执行过程中的中文输出也不再乱码

 

执行结果也不再乱码

 

3、获取不同数据源转换的json模板。

上面是从stream到stream的数据源转换,如果是其它数据源的json模板如何获取。

DataX提供了获取不同数据源转换的json模板获取的指令

可以通过命令查看配置模板:

python datax.py -r {YOUR_READER} -w {YOUR_WRITER}

如何获取数据源的名称,比如这里从sqlserver读取,写入到mysql,那么获取json模板的命令:

python datax.py -r sqlserverreader -w mysqlwriter

此时会返回一个sqlserver到mysql的json模板。

这是因为在其源码中目录就是这样叫的。

 

获取说可以直接点击进去里面的doc目录,查看示例的json文件内容

 

并且每个配置项的参数也有对应的说明

 

sqlserverreader参数说明

DataX/sqlserverreader.md at master · alibaba/DataX · GitHub

mysqlwriter参数说明

DataX/mysqlwriter.md at master · alibaba/DataX · GitHub

所以这里新建全量更新的json文件sqlserver2mysqlALL.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "sqlserverreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [
								"jdbc:sqlserver://localhost:1433;DatabaseName=数据库名"
								],
                                "table": [
								"表名"
								]
                            }
                        ],
                        "password": "改成自己的密码",
                        "username": "用户名",
						"column": [
						"checkid",
						"cardID",
						"hphm",
						"startTime",
						"endTime",
						"linenumber",
						"cwgt",
						"cwgtUL",
						"cwgtJudge",
						"cwkc",
						"cwkcResult",
						"cwkcUL",
						"cwkcJudge",
						"cwkk",
						"cwkkResult",
						"cwkkUL",
						"cwkkJudge",
						"cwkg",
						"cwkgResult",
						"cwkgUL",
						"cwkgJudge",
						"wkccJudge",
						]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [
					    "checkid",
						"cardID",
						"hphm",
						"startTime",
						"endTime",
						"linenumber",
						"cwgt",
						"cwgtUL",
						"cwgtJudge",
						"cwkc",
						"cwkcResult",
						"cwkcUL",
						"cwkcJudge",
						"cwkk",
						"cwkkResult",
						"cwkkUL",
						"cwkkJudge",
						"cwkg",
						"cwkgResult",
						"cwkgUL",
						"cwkgJudge",
						"wkccJudge",
						],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/数据库名?useUnicode=true&characterEncoding=gbk",
                                "table": [
								"表名"
								]
                            }
                        ],
                        "password": "密码",
                        "preSql": [
						"delete from vehicleresult"
						],
                        "session": [],
                        "username": "用户名",
                        "writeMode": "insert"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "5"
            }
        }
    }
}

注意这里的流程就是从sqlserver中读取指定列的数据,这里的column就是配置的那些列。

然后写入到mysql时需要预先执行一下删除语句,在preSql中配置的

delete from vehicleresult

vehicleresult是表名。然后写入模式是直接插入

然后执行以上json模板的命令

python datax.py  ./sqlserver2mysqlALL.json

即可实现全量更新。

注意事项,两边的数据结构包括类型、长度、是否非空等要保持一致。

比如sqlserver中某个字段不为空,存在空数据,但是mysql中对应字段设置为不为空,在同步时就会认定为脏数据进而同步失败。

上面全量更新结果

 

4、以上命令每执行一次,则进行一次全量更新,所以需要一个定时bat脚本来定时执行命令。

新建bat文件并修改内容为

#设置编码
chcp 65001
@echo off
title "同步数据"
set INTERVAL=15
timeout %INTERVAL%
 
:Again

python datax.py  ./sqlserver2mysqlALL.json

echo %date% %time:~0,8%
 
timeout %INTERVAL%
 
goto Again

以上内容代表每15秒执行一次

python datax.py  ./sqlserver2mysqlALL.json

将此bat放在bin下与json文件同级目录下,双击执行即可。

5、以上是全量更新,如何实现增量更新。

注意这里增量更新有条件限制,首先这里的数据没有删除只会新增和更新,而且更新只会更新当天的数据。

所以这里首先执行以上上面的全量更新,确保第一次对接将数据获取,然后后面用定时任务执行增量更新,只需要

查询和替换当前的数据即可。

另外得保证有日期时间字段,那么在读取数据和写入数据时就可以用where条件限制查询当前的数据。

另外这里的主键并不是自增的int型数据,不然也可以根据自增主键id进行增量更新。

这里的sqlserver是由三方系统提供且无法更改为需要的类型

 

修改上面的sqlserverreader添加where条件,查询当天的数据

Sqlserver中查询当天的数据

where datediff(day,startTime,getdate())=0

其中startTime为时间字段。

Mysql中查询当天的数据

WHERE DATE(startTime) = CURDATE()

所以修改上面的json文件为

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "sqlserverreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [
								"jdbc:sqlserver://localhost:1433;DatabaseName=数据库名"
								],
                                "table": [
								"表名"
								]
                            }
                        ],
                        "password": "改成自己的密码",
                        "username": "用户名",
						"where": "datediff(day,startTime,getdate())=0",
						"column": [
						"checkid",
						"cardID",
						"hphm",
						"startTime",
						"endTime",
						"linenumber",
						"cwgt",
						"cwgtUL",
						"cwgtJudge",
						"cwkc",
						"cwkcResult",
						"cwkcUL",
						"cwkcJudge",
						"cwkk",
						"cwkkResult",
						"cwkkUL",
						"cwkkJudge",
						"cwkg",
						"cwkgResult",
						"cwkgUL",
						"cwkgJudge",
						"wkccJudge",
						]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [
					    "checkid",
						"cardID",
						"hphm",
						"startTime",
						"endTime",
						"linenumber",
						"cwgt",
						"cwgtUL",
						"cwgtJudge",
						"cwkc",
						"cwkcResult",
						"cwkcUL",
						"cwkcJudge",
						"cwkk",
						"cwkkResult",
						"cwkkUL",
						"cwkkJudge",
						"cwkg",
						"cwkgResult",
						"cwkgUL",
						"cwkgJudge",
						"wkccJudge",
						],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/数据库名?useUnicode=true&characterEncoding=gbk",
                                "table": [
								"表名"
								]
                            }
                        ],
                        "password": "密码",
                        "preSql": [
						"delete from 表名 WHERE DATE(startTime) = CURDATE();"
						],
                        "session": [],
                        "username": "root",
                        "writeMode": "insert"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "5"
            }
        }
    }
}

此时再用bat脚本定时执行即可,定时时间自行修改上面的15参数。

然后再新增一条今天的数据测试同步效果。

将上面增量更新的json命名为sqlserver2mysqlAdd.json

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/453457.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2.19 信号概述

1.信号的概念 1.1 信号的概念 信号是 Linux 进程间通信的最古老的方式之一,是事件发生时对进程的通知机制,有时也 称之为软件中断,它是在软件层次上对中断机制的一种模拟,是一种异步通信的方式。信号 可以导致一个正在运行的进程…

【Prompt使用场景】

Prompt使用场景 场景1:问答问题(技巧1)场景2:基于示例回答(技巧2)场景3:推理场景4:无中生有——写代码(技巧3)场景5:锦上添花——改写内容(技巧4)场景6:锦上添花——信息解释场景7:化繁为简——信息总结(技巧5)场景8:化繁为简——信息提取(技巧6)场景1:问…

化工厂5G+蓝牙+LoRa室内人员定位系统解决方案

随着化工行业的不断发展,化工厂的生产和管理工作变得越来越复杂和繁琐。人员定位成为一项重要的任务,尤其是在化工厂室内,为了确保员工的安全和提高工作效率,需要实现对人员的快速准确定位。因此,化工厂室内人员定位系…

催收公司承信科技申请纳斯达克IPO上市,募资1500万美元

来源:猛兽财经 作者:猛兽财经 猛兽财经获悉,来自苏州的催收公司,承信信息科技有限公司(下称“承信科技”)近期已向美国证券交易委员会(SEC)提交招股书,申请在纳斯达克I…

vue yarn npm

2016年左右 ,facebook针对npm包管理工具存在的性能问题进行了针对性开发并发布了yarn新的node包开发管理工具,具体对比,同学们自行网上搜索资料对比。 配置 1、先下载好NodeJS,然后输入如下命令安装yarn npm install -g yarn 2、…

【并发基础】一篇文章带你彻底搞懂Java线程中断的底层原理——interrupt()、interrupted()、isInterrupted()

目录 〇、Java线程中断与阻塞的区别 0.1 线程中断 0.2 线程阻塞 一、线程的中断 二、中断方法 2.1 void interrupt() 2.1.1 可中断的阻塞 2.1.2 不可中断的阻塞 2.1.3 实践案例 2.2 boolean isInterrupted() 2.3 boolean interrupted() 2.4 代码案例 三、源码分析…

5.数据权限

根据配置的权限字段,自动拼接sql,例如想要做部门的数据权限,每张表保存dep_id,然后查询的时候拼接 where dep_id?,这样就实现了数据隔离。 1.示例 例如部门列表查询 角色管理授予本部门权限,然后给用户分配这个角色 查询部门分…

一文搞懂java集合框架

一文搞懂java集合框架 目录 一文搞懂java集合框架什么是集合?有什么特点?框架图Collection基本介绍:接口常用方法使用代码示例 List基本介绍常用方法使用代码示例ArrayList注意事项和细节 Vector注意事项和细节 ArrayList和Vector如何创建与使…

Ddocker cgroups资源限制

目录 一、概述 1、简介 2、cgroups四大功能 3、cpu时间片概念 二、查看容器的默认CPU使用限制 1、进行CPU压力测试 三、创建容器时设置CPU使用时间限制 四、设置CPU资源占用比(设置多个容器时才有效 1、分别进入容器进行压测 查看容器运行状态 五、设置容器…

音视频八股文(4)--ffmpeg常见命令(3)

17 FFmpeg滤镜 17.1 filter的分类 按照处理数据的类型,通常多媒体的filter分为: ● 音频filter ● 视频filter ● 字幕filter 另一种按照处于编解码器的位置划分: ● prefilters: used before encoding ● intrafilters: used while encod…

ML之DR:sklearn.manifold(流形学习和降维的算法模块)的简介、部分源码解读、案例应用之详细攻略

ML之DR:sklearn.manifold(流形学习和降维的算法模块)的简介、部分源码解读、案例应用之详细攻略 目录 sklearn.manifold的简介 sklearn.manifold(流形学习和降维的算法模块)的概述 外文翻译 sklearn.manifold的部分源码解读 sklearn.manifold的简介 sklearn.ma…

常用数据加密

一、加密和解密 1、 加密 数据加密的基本过程,就是对原来为明 的文件或数据按某种算法进行处理,使其成为不可读的一段代码,通常称为 “密文”。通过这样的途径,来达到保护数据不被 非法人窃取、阅读的目的。 2、解密 加密的逆…

VUE规范及常见问题

规范: props需要写明数据类型并加上注释 多用computed属性,少用变量赋值和watch 只要一个值能用另一个或几个值计算出来,我们都用computed,这样可以减少代码量并避免因为忘记修改值而导致bug路由(pages里边的组件)用小写和-命名…

人机交互有哪些SCI期刊推荐? - 易智编译EaseEditing

以下是几个人机交互领域的SCI期刊推荐: ACM Transactions on Computer-Human Interaction (ACM TOCHI): 由ACM(Association for Computing Machinery)出版的人机交互领域的顶级期刊之一,发表关于计算机和人之间相互作…

python通过setuptools打包与分发

目录 一:setup.py文件的书写 二:setup.py 各个打包命令的使用:所需要用到的只有4个命令build / install / sdist / bdist 1:build: python setup.py build 2:install: python setup.py install 3: sdist &#xf…

Android Apk加固原理解析

前言 为什么要加固 对APP进行加固,可以有效防止移动应用被破解、盗版、二次打包、注入、反编译等,保障程序的安全性、稳定性。 常见的加固方案有很多,本文主要介绍如果通过对dex文件进行加密来达到apk加固的目的; APK加固整体…

【计算机视觉 | 目标检测】OVD:Open-Vocabulary Object Detection 论文工作总结(共八篇)

文章目录 一、2D open-vocabulary object detection的发展和研究现状二、基于大规模外部图像数据集2.1 OVR-CNN:Open-Vocabulary Object Detection Using Captions,CVPR 20212.2 Open Vocabulary Object Detection with Pseudo Bounding-Box Labels&…

Springboot创建项目bug

问题 今天创建maven项目,由于和教程不太一样,结果报错了 核心报错如下 Cannot instantiate interface org.springframework.context.ApplicationListener : org.springframework.boot.context.logging.LoggingApplicationListener 梳理 我的idea创建…

系统集成项目管理工程师 笔记(第六章:项目整体管理)

文章目录 项目整体管理6个过程制定项目章程过程 6.3 制订项目管理计划 2476.4 指导与管理项目工作 2516.5 监控项目工作 255监控项目工作的输入监控项目工作的工具与技术监控项目工作的输出 6.6 实施整体变更控制6.7结束项目或阶段 6.1 项目整体管理概述 242 6.1.1 项目整体管理…

【过程8】——能量守恒视角总结感受

一、背景 另一个角度的看到,观望着过程中自己曾经类似的经历(小舅子的工作)。 时间久了,经历多了,感悟会更加的充实;最近自己对于人在维持能量的过程中也有很多的感悟,一并做一下总结 二、过程 1.人为什么天性不愿意…