DataX(MySQL同步数据到Doris)

news2025/1/16 11:01:10

1.场景

这里演示介绍的使用 Doris 的 Datax 扩展 DorisWriter实现从Mysql数据定时抽取数据导入到Doris数仓表里

2.编译 DorisWriter

这个的扩展的编译可以不在 doris 的 docker 编译环境下进行,本文是在 windows 下的 WLS 下进行编译的

首先从github上拉取源码

 git clone https://github.com/apache/incubator-doris.git

进入到incubator-doris/extension/DataX/ 执行编译

首先执行:

sh init_env.sh
这个脚本主要用于构建 DataX 开发环境,他主要进行了以下操作:

  1. 将 DataX 代码库 clone 到本地。
  2. 将 doriswriter/ 目录软链到 DataX/doriswriter 目录。
  3. 在 DataX/pom.xml 文件中添加 <module>doriswriter</module> 模块。
  4. 将 DataX/core/pom.xml 文件中的 httpclient 版本从 4.5 改为 4.5.13 httpclient v4.5 在处理 307 转发时有bug。
  5. 这个脚本执行后,开发者就可以进入 DataX/ 目录开始开发或编译了。因为做了软链,所以任何对 DataX/doriswriter 目录中文件的修改,都会反映到 doriswriter/ 目录中,方便开发者提交代码

2.1 开始编译
这里我为了加快编译速度去掉了很多无用的插件:这里直接在Datax目录下的pom.xml里注释掉就行

 hbase11xreader
 hbase094xreader
 tsdbreader
 oceanbasev10reader
 odpswriter
 hdfswriter
 adswriter
 ocswriter
 oscarwriter
 oceanbasev10writer

然后进入到incubator-doris/extension/DataX/ 目录下的 Datax 目录,执行编译

这里我是执行的将 Datax 编译成 tar 包,和官方的编译命令不太一样。

 mvn -U clean package assembly:assembly -Dmaven.test.skip=true

image.png

编译完成以后,tar 包在 Datax/target 目录下,你可以将这tar包拷贝到你需要的地方,这里我是直接在 datax 执行测试,这里因为的 python 版本是 3.x版本,需要将 bin 目录下的三个文件换成 python 3能之别的版本,这个你可以去下面的地址下载:

https://github.com/WeiYe-Jing...
将下载的三个文件替换 bin 目录下的文件以后,整个编译,安装就完成了

如果你编译不成功也可以从我的百度网盘上下载编译好的包,注意我上边编译去掉的那些插件

链接: https://pan.baidu.com/s/1ObQ4Md0A_0ut4O6-_gPSQg

提取码: 424s 

3.数据接入

这个时候我们就可以开始使用 Datax 的doriswriter扩展开始从 Mysql(或者其他数据源)直接将数据抽取出来导入到 Doris 表中了。

3.1 Mysql 数据库准备

下面是我数据库的建表脚本(mysql 8):

CREATE TABLE `order_analysis` (
   `date` varchar(19) DEFAULT NULL,
   `user_src` varchar(9) DEFAULT NULL,
   `order_src` varchar(11) DEFAULT NULL,
   `order_location` varchar(2) DEFAULT NULL,
   `new_order` int DEFAULT NULL,
   `payed_order` int DEFAULT NULL,
   `pending_order` int DEFAULT NULL,
   `cancel_order` int DEFAULT NULL,
   `reject_order` int DEFAULT NULL,
   `good_order` int DEFAULT NULL,
   `report_order` int DEFAULT NULL
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT

示例数据:

INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-12 00:00:00', '广告二维码', 'Android APP', '上海', 15253, 13210, 684, 1247, 1000, 10824, 862);
 INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-14 00:00:00', '微信朋友圈H5页面', 'iOS APP', '广州', 17134, 11270, 549, 204, 224, 10234, 773);
 INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-17 00:00:00', '地推二维码扫描', 'iOS APP', '北京', 16061, 9418, 1220, 1247, 458, 13877, 749);
 INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-17 00:00:00', '微信朋友圈H5页面', '微信公众号', '武汉', 12749, 11127, 1773, 6, 5, 9874, 678);
 INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-18 00:00:00', '地推二维码扫描', 'iOS APP', '上海', 13086, 15882, 1727, 1764, 1429, 12501, 625);
 INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-18 00:00:00', '微信朋友圈H5页面', 'iOS APP', '武汉', 15129, 15598, 1204, 1295, 1831, 11500, 320);
 INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-19 00:00:00', '地推二维码扫描', 'Android APP', '杭州', 20687, 18526, 1398, 550, 213, 12911, 185);
 INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-19 00:00:00', '应用商店', '微信公众号', '武汉', 12388, 11422, 702, 106, 158, 5820, 474);
 INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-20 00:00:00', '微信朋友圈H5页面', '微信公众号', '上海', 14298, 11682, 1880, 582, 154, 7348, 354);
 INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-21 00:00:00', '地推二维码扫描', 'Android APP', '深圳', 22079, 14333, 5565, 1742, 439, 8246, 211);
 INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-22 00:00:00', 'UC浏览器引流', 'iOS APP', '上海', 28968, 18151, 7212, 2373, 1232, 10739, 578);

3.2 doris数据库准备

下面是我上面数据表在doris对应的建表脚本

CREATE TABLE `order_analysis` (
   `date` datetime DEFAULT NULL,
   `user_src` varchar(30) DEFAULT NULL,
   `order_src` varchar(50) DEFAULT NULL,
   `order_location` varchar(10) DEFAULT NULL,
   `new_order` int DEFAULT NULL,
   `payed_order` int DEFAULT NULL,
   `pending_order` int DEFAULT NULL,
   `cancel_order` int DEFAULT NULL,
   `reject_order` int DEFAULT NULL,
   `good_order` int DEFAULT NULL,
   `report_order` int DEFAULT NULL
 ) ENGINE=OLAP
 DUPLICATE KEY(`date`,user_src)
 COMMENT "OLAP"
 DISTRIBUTED BY HASH(`user_src`) BUCKETS 1
 PROPERTIES (
 "replication_num" = "3",
 "in_memory" = "false",
 "storage_format" = "V2"
 );

3.3 Datax Job JSON文件

创建并编辑datax job任务json文件,并保存到指定目录

 {
     "job": {
         "setting": {
             "speed": {
                 "channel": 1
             },
             "errorLimit": {
                 "record": 0,
                 "percentage": 0
             }
         },
         "content": [
             {
                 "reader": {
                     "name": "mysqlreader",
                     "parameter": {
                         "username": "root",
                         "password": "zh",
                         "column": ["date","user_src","order_src","order_location","new_order","payed_order"," pending_order"," cancel_order"," reject_order"," good_order"," report_order" ],
                         "connection": [ { "table": [ "order_analysis" ], "jdbcUrl": [ "jdbc:mysql://localhost:3306/demo" ] } ] }
                 },
                 "writer": {
                     "name": "doriswriter",
                     "parameter": {
                         "feLoadUrl": ["fe:8030"],
                         "beLoadUrl": ["be1:8040","be1:8040","be1:8040","be1:8040","be1:8040","be1:8040"],
                         "jdbcUrl": "jdbc:mysql://fe:9030/",
                         "database": "test_2",
                         "table": "order_analysis",
                         "column": ["date","user_src","order_src","order_location","new_order","payed_order"," pending_order"," cancel_order"," reject_order"," good_order"," report_order"],
                         "username": "root",
                         "password": "",
                         "postSql": [],
                         "preSql": [],
                         "loadProps": {
                         },
                         "maxBatchRows" : 10000,
                         "maxBatchByteSize" : 104857600,
                         "labelPrefix": "datax_doris_writer_demo_",
                         "lineDelimiter": "\n"
                     }
                 }
             }
         ]
     }
 }

这块 Mysql reader 使用方式参照:

https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

doriswriter的使用及参数说明:

 https://github.com/apache/incubator-doris/blob/master/extension/DataX/doriswriter/doc/doriswriter.md

或者

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "My",
                        "column": ["id","md5","eid","industry_code","start_date","end_date","is_valid","source","create_time ","update_time","row_update_time","local_row_update_time"],
                        "connection": [ { "table": [ "t_last_industry_all" ], "jdbcUrl": [ "jdbc:mysql://IP:3306/log" ] } ] }
                },
                "writer": {
                    "name": "doriswriter",
                    "parameter": {
                        "feLoadUrl": ["IP:8030"],
                        "beLoadUrl": ["IP:8040"],
                        "jdbcUrl": "jdbc:mysql://IP:9030/",
                        "database": "mysqltodoris",
                        "table": "t_last",
                        "column": ["id","md5","eid","industry_code","start_date","end_date","is_valid","source","create_time ","update_time","row_update_time","local_row_update_time"],
                        "username": "root",
                        "password": "123456",
                        "postSql": [],
                        "preSql": [],
                        "loadProps": {
                        },
                        "maxBatchRows" : 300000,
                        "maxBatchByteSize" : 20971520
                    }
                }
            }
        ]
    }
}

4.执行Datax数据导入任务

python bin/datax.py doris.json
然后就可以看到执行结果:

再去 Doris 数据库中查看你的表,数据就已经导入进去了,任务执行结束

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/980656.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

升哲科技城市级“算力+数字底座”服务亮相2023服贸会

9月2日至6日&#xff0c;以“开放引领发展&#xff0c;合作共赢未来”为主题的2023年中国国际服务贸易交易会在北京隆重举办。作为城市级数据服务商&#xff0c;升哲科技&#xff08;SENSORO&#xff09;连续第四年参加服贸会&#xff0c;携城市级“算力数字底座”服务及在城市…

语音芯片WTN6的驱动

前言 &#xff08;1&#xff09;本系列是基于STM32的项目笔记&#xff0c;内容涵盖了STM32各种外设的使用&#xff0c;由浅入深。 &#xff08;2&#xff09;小编使用的单片机是STM32F105RCT6&#xff0c;项目笔记基于小编的实际项目&#xff0c;但是博客中的内容适用于各种单片…

java八股文面试[数据库]——MySQL中事务的特性

在关系型数据库管理系统中&#xff0c;一个逻辑工作单元要成为事务&#xff0c;必须满足这 4 个特性&#xff0c;即所谓的 ACID&#xff1a;原子性&#xff08;Atomicity&#xff09;、一致性&#xff08;Consistency&#xff09;、隔离性&#xff08;Isolation&#xff09;和持…

[移动通讯]【Carrier Aggregation-3】【5G】

前言&#xff1a; 参考&#xff1a; 5G Mobile Communications&#xff1a;《Carrier Aggregation in 5G》 目录&#xff1a; 1&#xff1a; carrier Allocation Schemes 2&#xff1a; 网络结构 3&#xff1a; LTE CA 4: 5G CA 一 Carrier Allocation Schemes CA 主要作用…

问脉基础调研

基本功能&#xff1a; 资产清点 清点镜像、镜像软件资产数据与详细信息清点容器、应用软件资产数据与详细信息清点集群、Pod、Service、Ingress、Secrets 等数十种资产数据与详细信息提供资产与资产、资产与事件关联查看提供仪表盘总览当前检测对象数据情况 镜像安全 镜像漏…

Text文件在MATLAB中读写示例基础

背景 为了便于和外部程序进行交换&#xff0c;以及查看文件中的数据&#xff0c;也常常采用文本数据格式与外界交换数据。在文本格式中&#xff0c;数据采用ASCII码格式&#xff0c;可以使用字母和数字字符。可以在文本编辑器中查看和编辑ASCII文本数据。MATLAB提供了导入函数…

raise EOFError(“No data left in file“) EOFError: No data left in file

在linux服务器里跑分割模型的时候出现了以下错误&#xff0c;但是在自己电脑上运行相同程序时没有错误&#xff0c;可以运行。 ise EOFError(“No data left in file”) EOFError: No data left in file 到底是因为什么&#xff1f; GPT给的回答&#xff1a; “EOFError: No …

3dMax全球学习资源、资源文件和教程 !

此样例教育教程和学习资源旨在提供使用Autodesk 3ds Max时的计划知识和培训、正确的工作流、流程管理和最佳实践。 您在Autodesk三维设计领域的职业生涯 有关使用3ds Max和Maya在计算机图形领域开始职业生涯的提示&#xff08;包括新的3ds Max和Maya介绍教程&#xff0c;以复…

js reduce求和

let unReadCount resultList.reduce((pre, cur) > {return pre cur.unReadCount}, 0)

自动气象站:无线数据传输、多场景应用

自动气象站能够和环境监控云平台组成气象环境监控系统&#xff0c;能够全天候无人值守地监测气象要素&#xff0c;实现实时监测和数据传输&#xff0c;具有多要素集成、无线数据传输、多场景应用的优势。 一、自动气象站可以全天候不间断地监测空气温度、湿度、大气压力、雨量…

Gin框架---环境搭建

目录 一&#xff1a;MAC安装Go环境二&#xff1a;配置Go相关的环境变量三&#xff1a;设置GO国内代理四&#xff1a;GoLand初始化项目五&#xff1a;安装GIN框架六&#xff1a;Gin框架演示 一&#xff1a;MAC安装Go环境 Go官网地址&#xff1a;https://golang.google.cn/dl/直…

输入部件 QComboBox --组合框/下拉列表

QComboBox 类是 QWidget 类的直接子类&#xff0c;该类实现了一个组合框 一、QComboBox 类中的 属性 QComboBOx 类(组合框)属性速查表属性名说明属性名说明count获取项目数量minimumContentsLength组合框中最少字符数maxCount允许的最大项数maxVisibleItems向用户显示的最大项…

计算机网络第三章——数据链路层(中)

数声风笛离亭晚&#xff0c;君向潇湘我向秦 文章目录 ALOHA协议CSMA协议CSMA/CD协议CSMA/CA协议 总线型和星型都是广播式通信&#xff0c;看一下目的地址是否是我&#xff0c;若是我就接受否则就丢弃&#xff0c;总线型就是若是有一个断了则会影响其他的&#xff0c;型型的就是…

报错:为什么数组明明有内容但打印的length是0

文章目录 一、问题二、分析三、解决1.将异步改为同步2.设置延迟 一、问题 在日常开发中&#xff0c;for 循环遍历调用接口&#xff0c;并将接口返回的值进行拼接&#xff0c;即push到一个新的数组中&#xff0c;但是在for循环内部是可以拿到这个新的数组&#xff0c;而for循环…

接口测试工具开发文档

1 开发规划 1.1 开发人员 角 色 主要职责 负责模块 人员 备注 n xxx模块 xxx 1.2 开发计划 <附开发计划表> 1.3 开发环境和工具 开发工具 工具 作用 Notepad 编辑器 Perl 解释器 2 总体设计 设计思路&#xff1a;因为测试app和server。首先必须…

vue3中使用viewerjs实现图片预览效果

vue3中使用viewerjs实现图片预览效果 1、前言2、实现效果3、在vue3项目中使用viewer.js3.1 安装3.2 在main.js中引入3.3 组件中使用 1、前言 viewer.js是一款开源的图片预览插件&#xff0c;功能十分强大: 支持移动设备触摸事件支持响应式支持放大/缩小支持旋转&#xff08;类…

CPU及并发

2.9G Hz,即每秒进行2.9G次运算(即29亿次) 几个命令 us: 用户使用的cpu sy: 系统(内核)使用的cpu id: idle,即空闲cpu wa: 等待I/O的cpu st: 开虚拟机后会有的一个指标,即虚拟机的cpu使用率 一个进程拥有一整套虚拟地址空间,该进程的所有线程都共享该地址空间. 线程是CPU运算的最…

Docker如何安装seafile

SQLite 方式 要在 Docker 中安装 Seafile&#xff0c;您可以按照以下步骤进行操作&#xff1a; 安装 Docker&#xff1a;确保您的系统上已经安装了 Docker。您可以根据您的操作系统类型&#xff0c;在官方网站上找到适合您系统的 Docker 版本并进行安装。 下载 Seafile 镜像&…

管理类联考——数学——汇总篇——知识点突破——数据分析——计数原理——排列组合——涂色

⛲️ 一、考点讲解 1.题目特征 如果给几种颜色来填涂所给的图形&#xff0c;就是涂色问题。 2.解题方法 可以按照图形逐一依次填涂&#xff0c;也可以按照所用颜色的种数进行分类讨论。 二、考试解读 &#xff08;1&#xff09;涂色问题一般要求相邻的颜色不能相同&#xff0c…

2023年9月9日(星期六)骑行笔架山

2023年9月9日 (星期六)&#xff1a;骑行笔架山&#xff0c;早8:30到9:00&#xff0c; 大观楼门囗集合&#xff0c;9:30准时出发 【因迟到者&#xff0c;骑行速度快者&#xff0c;可自行追赶偶遇。】 偶遇地点: 大观楼门囗集合&#xff0c;家住东&#xff0c;南&#xff0c;北…