DataX3同步Mysql数据库数据到Mysql数据库和DataX3同步mysql数据库数据到Starrocks数据库

news2025/1/23 7:25:42

DataX3同步Mysql数据库数据到Mysql数据库和DataX3同步mysql数据库数据到Starrocks

  • 一、认识DataX
  • 二、DataX3概览
  • 三、DataX3框架设计
  • 四、DataX3插件体系
  • 五、DataX3核心架构
  • 六、DataX 3六大核心优势
    • 1.可靠的数据质量监控
    • 2.丰富的数据转换功能
    • 3.精准的速度控制
    • 4.强劲的同步性能
    • 5.健壮的容错机制
    • 6.极简的使用体验
  • 七、DataX3同步Mysql数据库数据到Mysql数据库
  • 八、DataX3同步Mysql数据库数据到Starrocks数据库

一、认识DataX

  • DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各种异构数据源之间高效的数据同步功能。

特征:

  • DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。

二、DataX3概览

DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
在这里插入图片描述
设计理念:

  • 为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

当前使用现状:

  • DataX在阿里巴巴集团内被广泛使用,承担了所有大数据的离线同步业务,并已持续稳定运行了6年之久。目前每天完成同步8w多道作业,每日传输数据量超过300TB。

三、DataX3框架设计

在这里插入图片描述

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  • Reader:Reader作为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

四、DataX3插件体系

经过几年积累,DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入。DataX目前支持数据如下:
在这里插入图片描述

五、DataX3核心架构

DataX 3开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。

在这里插入图片描述
核心模块介绍:

  • DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  • DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  • 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  • 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
  • DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

DataX调度流程:

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:

  • DataXJob根据分库分表切分成了100个Task。
  • 根据20个并发,DataX计算共需要分配4个TaskGroup。
  • 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

六、DataX 3六大核心优势

1.可靠的数据质量监控

完美解决数据传输个别类型失真问题

  • DataX旧版对于部分数据类型(比如时间戳)传输一直存在毫秒阶段等数据失真情况,新版本DataX3已经做到支持所有的强数据类型,每一种插件都有自己的数据类型转换策略,让数据可以完整无损的传输到目的端。

提供作业全链路的流量、数据量的运行时监控

  • DataX3运行过程中可以将作业本身状态、数据流量、数据速度、执行进度等信息进行全面的展示,让用户可以实时了解作业状态。并可在作业执行过程中智能判断源端和目的端的速度对比情况,给予用户更多性能排查信息。

提供脏数据探测:

  • 在大量数据的传输过程中,必定会由于各种原因导致很多数据传输报错(比如类型转换错误),这种数据DataX认为就是脏数据。DataX目前可以实现脏数据精确过滤、识别、采集、展示,为用户提供多种的脏数据处理模式,让用户准确把控数据质量大关!

2.丰富的数据转换功能

DataX作为一个服务于大数据的ETL工具,除了提供数据快照搬迁功能之外,还提供了丰富数据转换的功能,让数据在传输过程中可以轻松完成数据脱敏,补全,过滤等数据转换功能,另外还提供了自动groovy函数,让用户自定义转换函数。

3.精准的速度控制

还在为同步过程对在线存储压力影响而担心吗?新版本DataX3提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在库可以承受的范围内达到最佳的同步速度。

"speed": {
   "channel": 5,
   "byte": 1048576,
   "record": 10000
}

4.强劲的同步性能

DataX3每一种读插件都有一种或多种切分策略,都能将作业合理切分成多个Task并行执行,单机多线程执行模型可以让DataX速度随并发成线性增长。在源端和目的端性能都足够的情况下,单个作业一定可以打满网卡。另外,DataX团队对所有的已经接入的插件都做了极致的性能优化,并且做了完整的性能测试。

5.健壮的容错机制

DataX作业是极易受外部因素的干扰,网络闪断、数据源不稳定等因素很容易让同步到一半的作业报错停止。因此稳定性是DataX的基本要求,在DataX3的设计中,重点完善了框架和插件的稳定性。目前DataX3可以做到线程级别、进程级别(暂时未开放)、作业级别多层次局部/全局的重试,保证用户的作业稳定运行。

  • 线程内部重试:DataX的核心插件都经过团队的全盘review,不同的网络交互方式都有不同的重试策略。
  • 线程级别重试:目前DataX已经可以实现TaskFailover,针对于中间失败的Task,DataX框架可以做到整个Task级别的重新调度。

6.极简的使用体验

  • 易用:下载即可用,支持linux和windows,只需要短短几步骤就可以完成数据的传输。

  • 详细:DataX在运行日志中打印了大量信息,其中包括传输速度,Reader、Writer性能,进程CPU,JVM和GC情况等等。

    • 传输过程中打印传输速度、进度等

在这里插入图片描述

  • 传输过程中会打印进程相关的CPU、JVM等

在这里插入图片描述

  • 在任务结束之后,打印总体运行情况
    在这里插入图片描述

七、DataX3同步Mysql数据库数据到Mysql数据库

配置文件

{
    "job": {
        "setting": {
            "speed": {
                "channel": 6
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "aa",
                        "password": "1",
                        "splitPk": "id",
                        "column": [
                            "id",
                            "user_id",
                            "tag_id",
                            "question_id",
                            "direction",
                            "is_in_tags",
                            "created_at",
                            "updated_at"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "`user_question`"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://mysql-01-dev.com:3306/optics_dev?characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "parameter": {
                        "writeMode": "insert",
                        "username": "debug_test_rw",
                        "password": "1",
                        "column": [
                            "id",
                            "user_id",
                            "tag_id",
                            "question_id",
                            "direction",
                            "is_in_tags",
                            "created_at",
                            "updated_at"
                        ],
                        "preSql": [
                            "delete from user_question_bak"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "user_question_bak"
                                ],
                                "jdbcUrl": "jdbc:mysql://dd-mysql-01-test.com:3306/optics_test?characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai"
                            }
                        ]
                    },
                    "name": "mysqlwriter"
                }
            }
        ]
    }
}

八、DataX3同步Mysql数据库数据到Starrocks数据库

{
    "job": {
        "setting": {
            "speed": {
                "channel": 8
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0
            }
        },
        "content": [{
            "reader": {
                "name": "mysqlreader",
                "parameter": {
                    "username": "debug_test_rw",
                    "password": "11111",
                    "splitPk": "id",
                    "column": [
                        "id",
                        "tag_id",
                        "user_id",
                        "app_user_id",
                        "is_owner",
                        "sort",
                        "title",
                        "created_at",
                        "updated_at"
                    ],
                    "connection": [{
                        "table": [
                            "`tag_user_20230116`"
                        ],
                        "jdbcUrl": [
                            "jdbc:mysql://mysql-01-test.com:3306/optics_test?characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai"
                        ]
                    }]
                }
            },
            "writer": {
                "name": "starrockswriter",
                "parameter": {
                    "username": "dd_scr_starrocks_rw",
                    "password": "11111",
                    "database": "dd_scr_starrocks",
                    "table": "tag_user",
                    "column": [
                        "id",
                        "tag_id",
                        "user_id",
                        "app_user_id",
                        "is_owner",
                        "sort",
                        "title",
                        "created_at",
                        "updated_at"
                    ],
                    "preSql": [
                        "truncate table dd_scr_starrocks.tag_user"
                    ],
                    "postSql": [],
                    "jdbcUrl": "jdbc:mysql://10.129.66.144:9030/",
                    "loadUrl": ["10.129.66.144:8030", "10.129.88.67:8030", "10.129.68.124:8030"],
                    "loadProps": {
                        "format": "json",
                        "strip_outer_array": true
                    }
                }
            }
        }]
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/483556.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【AI面试】目标检测中one-stage、two-stage算法的内容和优缺点对比汇总

在深度学习领域中,图像分类,目标检测和目标分割是三个相对来说较为基础的任务了。再加上图像生成(GAN,VAE,扩散模型),keypoints关键点检测等等,基本上涵盖了图像领域大部分场景了。 …

【解决办法】adobe photoshop :Assertion failed!

问题 PS启动时出现如下图错误(实际行数可能不一样,program和file一样): ASSERTION FAILED Program…\node-vulcanjs\build\Release\VulcanMessagerLib.node File: C:\bid\workspace\CCX-Process\release…\vulcanadapter.cc Lin…

深度学习实战27-Pytorch框架+BERT实现中文文本的关系抽取

大家好,我是微学AI,今天给大家介绍一下深度学习实战27-Pytorch框架+BERT实现中文文本的关系抽取,关系抽取任务是一项重要的任务,其核心是从一段自然语言文本中抽取实体之间具有的关系。随着深度学习的发展,很多预训练模型在关系抽取任务上取得了显著的成果,其中BERT模型是…

Matlab实现多个窗口间的数据传递(不用GUIDE)

在用多个matlab的figure进行数据交互时,数据传入是较为简单的,可以直接用function的形参实现,但如何把数据传回,是个比较麻烦的问题。 在GUIDE下,系统自动生成了output_fcn函数,可以用它来实现从子窗口到主…

【P4】JMeter 原生录制方式——HTTP代理服务器

文章目录 一、准备工作二、原生录制方式——HTTP2.1、设计说明2.2、测试计划设计 三、原生录制方式——HTTPS3.1、设计说明3.2、测试计划设计 四、HTTP代理服务器主要参数说明4.1、目标控制器4.2、分组:在组间添加分割4.3、分组:每个组放入一个新的控制器…

2023年清华大学五道口金融学院招收公开招考博士研究生(普博)拟录取名单公示

公示期:十个工作日( 2023年4月24日至5月9日 ) 经综合考核和研究生招生工作领导小组讨论,报学校研究生招生工作领导小组批准,清华大学五道口金融学院2023年公开招考博士研究生拟录取名单,现已确定&#xff…

Python 扩展教程(1): 调用百度AI

关于AI 自有计算机以来,人们就想让计算机具有人的感知、意识、概念、思维、行为,代替人的工作。AI (Artificial Interligence)是计算机科学的一个分支,专注研究、开发、模拟、扩展人的智能的理论、方法、技术及应用。 从研究领域和方法上&…

【Linux】6. 实现进度条和git基本认识和使用

编写小程序 – 进度条 1. 理解缓冲区概念 2. 理解\n 和 \r的区别 在操作系统层面:\n 表示换行 \r表示回车 在语言层面: \n就是回车换行 3. 进度条的需求分析 4. 代码编译 5. 代码优化 到这里进度条的编写也就完成了,✿✿ヽ(▽)ノ✿&#…

C语言基础应用(六)数组

引言 现程序要求,录入班里60名同学的所有成绩,我们应该怎么录入呢?按照我们之前所学习的难道要声明60个变量来录入成绩嘛? 就像: int main() {int a1,a2,a3,...,a60;scanf("%d%d%d...%d",&a1,&a2,…

anaconda使用教程

一.创建conda虚拟环境 conda create -n AI python3.8 conda create -n #代表创建conda虚拟环境 AI #创建的虚拟环境的名称 python3.8 #代表指定的Python版本 二.查看已创建的conda虚拟环境 conda env list三.激活conda虚拟环境 conda activate AI #AI 是co…

手把手教你爬取网站信息

如题,理解这一部分需要一定的Python基础,有些代码我不做详细解释了,但是用这个方法是确实可以爬到的。 此次用以下这个页面(可以用md5软件解密) 1476409DEDD7A55FE86915BC370A3ECD 爬取电影的详情数据 1. 在抓包⼯具…

Linux常见指令 (2)

Linux常见指令 ⑵ 补充man描述:用法:例子 echo描述:用法:例子 echo 字符串例子 echo 字符串 > 文件例子 追加重定向(>>)例子 输出重定向(>)来创建文件 && (>)来清空文件 cat描述:用法:例子 cat && cat 文件补充:例子 cat 文件 && cat &…

深入理解SeaTunnel:易用、高性能、支持实时流式和离线批处理的海量数据集成平台

深入理解SeaTunnel:易用、高性能、支持实时流式和离线批处理的海量数据集成平台 一、认识SeaTunnel二、SeaTunnel 系统架构、工作流程与特性三、SeaTunnel工作架构四、部署SeaTunnel1.安装Java2.下载SeaTunnel3.安装连接器 五、快速启动作业1.添加作业配置文件以定义…

ChatGPT火了,将给网络安全行业带来什么影响?

ChatGPT火了,将给网络安全行业带来什么影响? 一、简介 作为全新的人工智能(AI)聊天机器人,ChatGPT被认为正在“掀起新一轮AI革命”。在股市上甚至出现了“ChatGPT概念股”的当下,ChatGPT究竟对于网络安全…

Mysql 苞米豆 多数据源 读写分离(小项目可用)

目录 0 课程视频 1 配置 1.1 加依赖 1.2 yml 配置文件 -> druid配置后报错 搞不定 2 代码 2.1 实体类 2.2 mapper -> 调用操作数据库方法 操作数据库 2.3 service -> 指定数据源 -> 用Mapper 接口 -> 操作数据库 2.4 controller -> 用户使用接口 -&…

当~python批量获取某电商:商品数据并作可视化

前言 嗨喽&#xff0c;大家好呀~这里是爱看美女的茜茜呐 开发环境: 首先我们先来安装一下写代码的软件&#xff08;对没安装的小白说&#xff09; Python 3.8 / 编译器 Pycharm 2021.2版本 / 编辑器 专业版是付费的 <文章下方名片可获取魔法永久用~> 社区版是免费的 …

利用Python如何实现数据驱动的接口自动化测试

目录 前言 1、需求 2、方案 3、实现 总结 前言 大家在接口测试的过程中&#xff0c;很多时候会用到对CSV的读取操作&#xff0c;本文主要说明Python3对CSV的写入和读取。下面话不多说了&#xff0c;来一起看看详细的介绍吧。 1、需求 某API&#xff0c;GET方法&#xff…

HbuilderX打包AndroidAPP使用教程

HBuilder是DCloud&#xff08;数字天堂&#xff09;推出的一款支持HTML5的Web开发IDE。HBuilder的编写用到了Java、C、Web和Ruby。HBuilder本身主体是由Java编写。它基于Eclipse&#xff0c;所以顺其自然地兼容了Eclipse的插件。 HbuildX打包android的apk安装包时需要新建项目 …

Insix:面向真实的生成数据增强,用于Nuclei实例分割

文章目录 InsMix: Towards Realistic Generative Data Augmentation for Nuclei Instance Segmentation摘要本文方法数据增强方法具有形态学约束的前景增强提高鲁棒性的背景扰动 实验结果 InsMix: Towards Realistic Generative Data Augmentation for Nuclei Instance Segment…

Go语言开发小技巧易错点100例(七)

往期回顾&#xff1a; Go语言开发小技巧&易错点100例&#xff08;一&#xff09;Go语言开发小技巧&易错点100例&#xff08;二&#xff09;Go语言开发小技巧&易错点100例&#xff08;三&#xff09;Go语言开发小技巧&易错点100例&#xff08;四&#xff09;Go…