DataX-一款稳定高效的数据同步工具-从安装、启动、配置、使用总结,看这篇让你一步到位

news2025/1/8 5:45:23

 

前言

大数据部门现阶段ETL按同步方式分为两种:

  • 实时同步:DTS、CloudCanal
  • 离线同步:dataworks-DI节点

但CloudCanal在使用中出现了部分问题,归纳总结后主要为以下几点:

  • 部分使用场景获取不到binlog点位
  • 停止任务,修改数据源ip后,重启任务源端ip不变
  • 业务库增加,会导致表数据终止增量同步任务
  • 停止任务后,启动任务或进行位点回溯,任务异常无法恢复

基于以上背景,需要调研新的同步工具用于辅助或取代CloudCanal工具的使用

一 DataX 简介及架构原理

1.1 概述

DataX是阿里巴巴使用 Java 和 Python 开发的一个异构数据源离线同步工具;致力于实现包括关系型数据库MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute、DRDS等各种异结构数据源之间稳定高效的数据同步功能。下图为DataX支持的数据源:

类型

数据源

Reader(读)

Writer(写)

RDBMS 关系型数据库

MySQL

Oracle

OceanBase

SQLServer

PostgreSQL

DRDS

通用RDBMS

阿里云数仓数据存储

ODPS

ADS

OSS

OCS

NoSQL数据存储

OTS

Hbase0.94

Hbase1.1

Phoenix4.x

Phoenix5.x

MongoDB

Hive

Cassandra

无结构化数据存储

TxtFile

FTP

HDFS

Elasticsearch

时间序列数据库

OpenTSDB

TSDB

1.2 设计理念

为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责链接各种数据源

  1. 当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,就能跟已有的数据源做到无缝数据同步

1.3 框架设计

Framework 主题框架+ plugin 插件

  1. 将软件核心功能写入 Framework 主体框架中
  2. 主体框架为插件预留接口,如果后期需要什么新功能,可以再去开发插件实现,而主体框架无需改动
  • 1根据同步核心功能为主题
  • 不断的去新增某数据源的支持,对不同数据源的读取或写入功能,以插件的形式开发
  • 如果需要新功能只需要开发插件即可,不需要动主体框架

将数据源读取和写入抽象成为 Reader/Writer 插件,纳入到整个同步框架中

1.4 架构功能

DataX采用Framework + plugin架构构建,将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中

  • Reader:Reader做为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题
    • 缓冲:Reader 和 Writer 存在读写速度不一致的情况,所以中间需要一个组件作为缓冲,缓冲的功能就位于 Framework 中
    • 流控:DataX 可以随意根据需求调整数据传输速度,流控功能也位于 Framework 中
    • 并发:并发同步或写入数据,也可以控制速度,想要速度快点,设置并发高一点,反之亦然
    • 数据转换:Reader 的数据源与 写 Writer 的数据源 数据结构可能不同,数据转换操作也在 Framework 中完成

1.5 DataX的运行流程

Job

  • 单个数据同步的作业,称为一个Job,一个Job启动一个进程

Task

  • 根据不同数据源切分策略,一个Job会切分多个Task并行执行
  • Task 是DataX作业的最小单元,每个Task负责一个部分数据的同步工作

TaskGroup

  • Scheduler 调度模块会对Task 进行分组,每个Task 组称为一个Task Group
  • 每个Task Group 负责以一定的并发度运行其所分得的Task ,单个TaskGroup的并发为5

Reader -> Channel -> Writer

每个Task启动后,都会固定启动 Reader -> Channel -> Writer 的线程来完成工作

Channel 类似于 Flume 中的 MemoryChannel 来做数据的缓冲

1.6 DataX调度决策思路

案例

- 用户提交了一个DataX 作业,并且配置了总的并发度为 20

- 目是对一个 有100张分表的 mysql 数据源进行同步

dataX调度决策思路

  • 1. DataX Job 根据分库分表切分策略,将同步工作分成 100个Task
  • 2. 根据配置的总的并发度20,以及每个Task Group 的并发度 5,
  • 3. DataX 计算共需要分配 4的Task Group
  • 4. 4个 TaskGroup 平分 100 个Task ,每一个TaskGroup 负责运行 25个Task

注:总的并发度为20,表示整个Job 最多能同时运行 20个Task

二 DataX安装使用

环境准备

  • Linux
  • JDK(推荐1.8)
  • Python(推荐 Python2.6x)

2.1 下载安装

1)DataX安装包下载:下载

2)将安装包上传至服务器任意节点,我这里上传至node1节点 /opt/module目录下

tar -zxvf datax.tar.gz -C /opt/module/

2)运行自检脚本

python /../bin/datax.py /../job/job.json
  • 出现下图日志说明环境没有问题

  • 如报错执行以下命令,删除隐藏文件
find /datax/plugin/reader/ -type f -name "._*er" | xargs rm -rf
find /datax/plugin/writer/ -type f -name "._*er" | xargs rm -rf

2.2从Stream流读取数流打印到控制台

查看官方提供模板

[root@node1 datax]#  python  bin/datax.py  -r streamreader -w streamwriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the streamreader document:
     https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md

Please refer to the streamwriter document:
     https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md

Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [],
                        "sliceRecordCount": ""
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

根据模板修改配置

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "sliceRecordCount": 10,
                        "column": [
                            {
                                "type": "long",
                                "value": "10"
                            },
                            {
                                "type": "string",
                                "value": "hello,DataX"
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "UTF-8",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

执行脚本(部分日志)

2.3 Mysql两表间的数据迁移

准备
创建数据库和表并加载测试数据

drop database test;
create database test;
use test;
CREATE TABLE `table_a` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '',
  `name` varchar(32) DEFAULT '' COMMENT '',
  `time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

INSERT INTO test.table_a (name) VALUES ('a');
INSERT INTO test.table_a (name) VALUES ('b');
INSERT INTO test.table_a (name) VALUES ('c');
INSERT INTO test.table_a (name) VALUES ('d');
INSERT INTO test.table_a (name) VALUES ('e');


create database test_bck;
use test_bck;
CREATE TABLE `table_a` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '',
  `name` varchar(32) DEFAULT '' COMMENT '',
  `time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

模板

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "源端数据库类型",
                    "parameter": {
                        "column": [ # 同步字段
                             "字段A",
                             "字段B"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://xxx:3306/数据库"
                                ], 
                                "table": [
                                    "table_name1,table_name2,..."
                                ]
                            }
                        ], 
                        "password": "数据库密码", 
                        "username": "数据库账号"  
                    }
                }, 
                "writer": {
                    "name": "目标端数据库类型", 
                    "parameter": {
                        "print": true, 
                        "column": [
                             "字段A",
                             "字段B"
                        ], 
                        "splitPk": "分片字段,丢给task,所以必须是整形。一般用主键即可(非必须)",
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://xxx:3306/数据库", 
                                "table": ["表名"]
                                "querySql":["这里可以写查询数据的sql语句,但是如果这里写了,则不允许再配置table,否则会报错(非必须,table和querySql只能存在一个)"]
                            }
                        ], 
                        "where":"过滤条件",
                        "password": "数据库密码", 
                        "username": "数据库账号"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "并发数",  # 
                "byte":1048576, # 同步速率(byte)/s
                "record":10000 # 同步速率(记录数)/s    byte、record 满足其一条件即可
            }, 
            "errorLimit": { # 对于脏数据的监控 record:脏数据条数,percentage 脏数据比例,满足其一条件即可
                "record": 5, 
                "percentage": 0.01
            }
        }
    }
}

配置文件

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "id",
                            "name",
                            "time"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://host_ip:3306/test"
                                ],
                                "table": [
                                    "table_a"
                                ]
                            }
                        ],
                        "password": "123456",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "print": true,
                        "column": [
                             "id",
                             "name",
                             "time"
                        ],

                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://host_ip:3306/test_bck",
                                "table": ["table_b"]
                            }
                        ],

                        "password": "123456",
                        "username": "root"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1",
                "byte":1048576,
                "record":10000
            },
            "errorLimit": {
                "record": 1,
                "percentage": 0.01
            }
        }
    }
}

执行脚本(部分日志)

MySQL的参数介绍

三 DataX-Web安装使用

简介

Datax-web基于datax,增加了web界面

Datax-web把Datax基于命令行任务通过java代码串联了起来,并提供了可视化的任务管理

环境准备

  • MySQL (推荐5.5+) 必选,对应客户端可以选装, Linux服务上若安装mysql的客户端可以通过部署脚本快速初始化数据库
  • JDK(推荐1.8)
  • Python(2.x)(支持Python3需要修改替换datax/bin下面的三个python文件)
  • datax:data-web执行任务时需要依赖datax中的python脚本

3.1 下载安装

下载

下载链接 提取码: cpsk

安装

  1. 解压
tar -zxvf datax-web-2.1.2.tar.gz -C /opt/module/
  1. 安装:执行datax-web-2.1.2/bin/install.sh脚本,安装datax-web
./datax-web-2.1.2/bin/install.sh
  1. 根据提示进行安装,安装时有两种情况
  • 第一种:安装datax-web的服务器已经具备mysql的服务。以下代码中我将中间的日志信息省略掉,只保留的需要用户输入的信息,按照下图一步步进行即可
Do you want to decompress this package: [datax-admin_2.1.2_1.tar.gz]? (Y/N)Y
Do you want to decompress this package: [datax-executor_2.1.2_1.tar.gz]? (Y/N)Y
Do you want to confiugre and install [datax-admin]? (Y/N)Y
Do you want to initalize database with sql: [/opt/apps/datax-web-2.1.2/bin/db/datax_web.sql]? (Y/N)Y
Please input the db host(default: 127.0.0.1): localhost # mysql所在服务器的ip地址,默认是127.0.0.1,这里localhost填与不填没有任何意义,只作为举例
Please input the db port(default: 3306): 3306 # mysql端口默认3306,如果是3306可以直接点击enter键,如果不是输入对应端口号
Please input the db username(default: root): root # 默认msyql用户是root,根据实际情况填写
Please input the db password(default: ): 123456 # 输入mysql用户对应的密码
Please input the db name(default: dataxweb)dataxweb # 输入作为datax-web的系统数据库,默认为dataxweb
Do you want to confiugre and install [datax-executor]? (Y/N)Y

执行完上面步骤,datax-web的基础安装就完成了,安装过程如下图所示:

  • 第二种,安装datax-web的服务器没有msyql服务,这个时候还是正常执行install.sh安装脚本,步骤按照第一种的情况往下执行就可以,不过因为当前服务器没有msyql的服务,所示在安装过程中,不会有下图中的提示

其他步骤同第一种情况相同,在安装完成后,可以通过配置文件自由配置所要连接的Mysql数据库,修改datax-web-2.1.2/modules/datax-admin/conf目录下的bootstrap.properties文件,在此文件中可以自由配置Mysql的连接,命令如下所示:

vi bootstrap.properties
# 配置文件中内容如下所示:
#Database
#DB_HOST=
#DB_PORT=
#DB_USERNAME=
#DB_PASSWORD=
#DB_DATABASE=
# 根据实际情况进行配置即可

配置完Mysql的数据库连接后,在mysql对应的数据库中执行datax-web-2.1.2/bin/db目录下的datax-web.sql脚本即可,执行完成后,会在该库中生成datax-web所需要的系统数据表。

3.2 配置文件修改

安装完成后,还需要配置datax-web-2.1.2/modules/datax-executor/bin/env.properties 指定PYTHON_PATH的路径,命令如下:

vi env.properties

# 在配置文件中编辑内容如下:PYTHON脚本执行位置 
PYTHON_PATH=/opt/apps/datax/bin/datax.py 

3.3 启动服务

执行datax-web-2.1.2/bin目录下的start-all.sh命令即可,命令如下图所示

./start-all.sh

启动完成后,通过jps命令查看,服务器上是否存在data-web的服务,命令如下:

[root@node1 bin]# jps
4741 DataXExecutorApplication
4478 DataXAdminApplication
4783 Jps
# 执行完jps命令后出现以上两个服务证明datax-web已经启动

启动完成后就可以通过页面进行访问了,默认端口号是9527,形式如:127.0.0.1:9527/index.html,进入页面后进行登录默认用户:admin,默认密码:123456,如下图所示:

登录成功后,页面如下所示:

3.4 任务配置

  1. 点击项目管理,添加项目

  1. 创建执行器

  1. 创建数据源

  1. 创建任务模板(此步骤是主要是为了后边构建任务的时候,为任务指定执行器)

  1. 创建任务
  • 构建reader

  • 构建writer

  • 字段映射

  • 任务构建

到这里任务已经构建完成了

3.5 启动任务

在任务管理界面,可以将配置好的任务启动,以及可以查看 注册节点、触发时间、执行状态等情况

3.6 日志查看

3.7 资源监控

Datax使用总结

优点

  1. 速度较快,相关网友做过测试:并发数量为5的情况下,3000万的数据量比sqoop快4倍以上
  2. Datax的同步速率可以根据我们的实际情况进行配置
  3. 日志比较完善和人性化
  4. 对于脏数据的处理:在大量数据的传输过程中,必定会由于各种原因导致很多数据传输报错(比如类型转换错误),这种数据DataX认为就是脏数据。DataX目前可以实现脏数据精确过滤、识别、采集、展示,提供多种的脏数据处理模式
  5. 健壮的容错机制:DataX作业是极易受外部因素的干扰,网络闪断、数据源不稳定等因素很容易让同步到一半的作业报错停止。因此稳定性是DataX的基本要求,在DataX 3.0的设计中,重点完善了框架和插件的稳定性
  1. 丰富的数据转换功能:数据在传输过程中可以轻松完成数据脱敏,补全,过滤等数据转换功能,另外还提供了自动groovy函数,让用户自定义转换函数。
  2. 界面化配置:基于DataX开发的DataX-web,其拥有可视化界面,省去重复配置的麻烦,任务管理、资源监控一目了然

缺点

DataX非分布式,单节点运行任务,对服务器压力较大

从现有调研结果来看,使用DataX可以满足全量同步数据的需求;

后面会对DataX进行大数据量、多任务、多场景的测试!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/567878.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【面试题】中高级前端工程师都需要熟悉的技能--前端缓存

前端缓存 一、前言二、web缓存分类1. HTTP缓存:2. 浏览器缓存:3. Service Worker:4. Web Storage缓存:5. 内存缓存: 三、http缓存详解1、http缓存类型a. 基于有效时间的缓存控制:b. 基于资源标识的缓存&…

从复杂到简单:通用CRM客户管理系统改变企业管理方式

一、概述 CRM客户管理系统是通过对客户生命周期的有效管理,是一种重要的客户管理工具,可以帮助企业更好的了解客户的需求,提升客户满意度,优化销售流程、提高销售业绩等,随着数字经济的不断发展和普及,CRM…

挂耳式蓝牙耳机什么牌子好?口碑最好的挂耳式耳机

耳机的种类型号越来越繁杂,相比头戴式耳机和入耳式耳机,现在最流行的开放式耳机在减轻机体重量的同时更考量到佩戴舒适度,特别是运动时候,因其不入耳不伤耳设计,既稳固又舒适。考虑到大家想挑选到自己的心仪款式又能顾…

分享5个堪称神器的效率软件

今天再来推荐5个超级好用的效率软件,无论是对你的学习还是办公都能有所帮助,每个都堪称神器中的神器,用完后觉得不好用你找我。 1.键盘映射工具——KeyTweak KeyTweak是一款键盘映射工具,它可以让你自定义键盘上的每一个按键&am…

由于找不到vcomp110.dll无法继续执行此代码怎么修复

vcomp110.dll是Microsoft Visual C的一个库文件。这个库文件是在安装了Microsoft Visual C 2012 Redistributable Package之后自动安装的。vcomp110.dll是编写多线程程序所必需的,它包含了相关的函数和数据结构,可以帮助开发人员更好地利用计算机的多核处…

优质财务共享产品的十大特征

特征一:中台化应用 多业务系统数据源,包括交易平台、业务云、其他来源加全面事项库,综合实现能力复用、数据资源共享,为数据服务打基础。 特征二:多模式业务接入 统一业务数据采集入口、开放灵活的特性、支持多模式接…

STM32采集传感器数据通过排序取稳定值

一、前言 在物联网、单片机开发中,经常需要采集各种传感器的数据。比如:温度、湿度、MQ2、MQ3、MQ4等等传感器数据。这些数据采集过程中可能有波动,偶尔不稳定,为了得到稳定的值,我们可以对数据多次采集,进行排序,去掉最大和最小的值,然后取平均值返回。 二、排序算法…

博客系统的后端设计(八) - 实现发布博客功能

文章目录 发布博客1. 约定前后端交互接口2. 服务器代码3. 客户端代码4. 出现的问题 发布博客 在原来的编辑页面点击发布文章按钮,是不会有什么效果的。 这是因为此时还不能实现前后端的交互。 1. 约定前后端交互接口 请求使用 POST,路径是 /blog title这…

MVVM封装

MVVM封装 一.MVVM1.依赖2.mvvm封装3.业务层调用1.MainService2.MainRepository3.MainViewModel4.TestActivity 4.独立运行main业务模块 一.MVVM 1.依赖 config.gradle //jetpackdef lifecycle_version "2.5.1"libLivedata "androidx.lifecycle:lifecycle-li…

谁懂啊!性能测试还能这么快学好。。。。。。

目录 前言 (一)用户视角看响应时间: (二)运维视角看系统的使用率 mysql等数据库关注指标 JVM(针对java技术线):Java应用程序的CPU和内存使用率 系统最大处理的任务数: &#…

如何完善企业性能测试流程?详解测试计划编写与实现

目录 前言: 性能测试流程 性能测试计划 实例代码 总结 前言: 在现代技术领域中,性能测试是非常重要的一个流程。无论是应用程序、网站、数据库等,都需要经过严格的性能测试,以确保其能够在各种条件下保持稳定的性…

RabbitMq指南:彻底解决消息可靠性问题以及实现延迟队列

提到 RabbitMq,想必大家都不陌生,我们经常使用它来进行服务之间的异步通信,如短信,日志,邮件发送等业务场景都可以使用 RabbitMq 实现,但在使用的过程中我们会遇到消息丢失的问题,也会有延迟消费…

SSM(Spring篇)

Spring Spring的IOC和DI Spring简介 介绍 Spring的分层Java SE\EE应用full-stack轻量级开源框架,以IOC(Inverse Of Control:反转控制)和AOP(Aspect Oriented Programing:面向切面编程)为内核…

AOP参数校验

场景 在面试中,可能会问道AOP的适用场景有哪些,参数校验就是其中的一个例子 在Java使用登录or注册功能时,会需要用户提交邮箱手机号等信息,此时前端可能会做参数校验,以下代码是基于后端的参数校验,可适用…

不推荐你做网工,就因为这一个字

大家好,我是许公子。 我平时闲暇的时候,很喜欢在网上逛逛,看看大家最近都在聊什么。 这几天看到个问题。 网络工程师,累吗? 里面有不少回答觉得,网工这行,太累了。 “ 做实施的时候&#xf…

【学习记录22】git删除远程主分支和文件

项目分支太多,代码冗余太多,项目也没正式启用,想着全部清空重新来一遍,参考了网上的步骤都不太理想,全部走不通,搜出来大部分都是类似下面的操作。经过一段时间折腾终于删掉并清空。因此记录一下以备后期查…

小白入门SRC漏洞挖掘的正确姿势

前言 有不少阅读过我文章的伙伴都知道,我从事网络安全行业已经好几年,积累了丰富的经验和技能。在这段时间里,我参与了多个实际项目的规划和实施,成功防范了各种网络攻击和漏洞利用,提高了安全防护水平。 也有很多小…

Azure Services -5.25-summary

文章目录 1. Resources2.Data processing process3.Virtual network and public ip address4. Kubernetes services5. Yaml file first , we enter the homepage of microsoft azure, and we can see a lot of servicse provided by the microsoft azure , 1. Resources accou…

SSM(SpringMVC篇)

SpringMVC SpringMVC简介 概述 SpringMVC是一种基于java的实现MVC设计模型的请求驱动类型的轻量级web框架,属于SpringFrameWork的后续产品,已经融合在Spring Web Flow中 SpringMVC已经成为目前最主流的框架之一,并且随着Spring3.0的发布&…

Excel高手与普通人之间的差距,全在这个比Access还简单的工具

用“Excel”还是用“Python”? 很多职场人,在工作的大部分时间都会和“Excel”打交道,普通人,我们用“Excel”的录入和统计等,很多功能是不会用的。 只有一部分人群,会“Excel”的高级操作,比…