数据集成工具 ---- datax 3.0

news2025/1/11 18:41:53
1、datax:

        是一个异构数据源离线同步工具,致力于实现关系型数据库(mysql、oracle等)hdfs、hive、hbase等各种异构数据源之间的数据同步

2、参考网址文献:

https://github.com/alibaba/DataX/blob/master/introduction.mdhttps://github.com/alibaba/DataX/blob/master/userGuid.mdhttps://github.com/alibaba/DataX/blob/master/introduction.md

3、Datax的框架设计:

Datax作为离线数据同步工具,主要的是采用了Framework+plugin架构构成,将数据源的读数据和写数据封装成对应的Reader和Writer插件,纳入到整体的同步框架中。

        1、Reader:作为数据的采集模块,负责采集数据源的数据,将数据发送给Framework

        2、Writer:作为数据写入模块,负责不断的向Framework取出数据,将数据写入到对应的目的端

        3、Framework:用于连接Reader和Writer,作为两者的数据传输通道,并处理缓冲、数据转换等核心技术问题。

4、Datax的核心架构:

Datax3.0 开源版本是支持单机多线程来完成同步作业运行,因为底层是使用java做开发。整体的架构:

模块的核心介绍:

        1、Datax完成单个数据同步做作业,被称之为job,Datax接收到一个job时就会启动一个进程来完成数据同步工作,所以Datax job 模块是单个作业的中枢管理中心,主要是承担数据清理,子任务切分、TaskGroup 管理。

        2、当Datax启动后,Datax job会根据不同的源数据将job切分成不同的Task,所以Task是Datax的最小作业单位,每一个Task都会负责一部分的数据同步。

        3、切分成多个Task后,Datax job 就会调用scheduler模块,根据配置的并发数量,将拆分的Task重新组合,组装成TaskGroup,每一个TaskGroup都负责一定的Task任务的执行,默认TaskGroup并发数量数5个。

        4、每一个Task都是由TaskGroup所监控执行启动,每一个Task启动后都会按照Reader---Channel---Writer的执行顺序执行。

        5、当任务启动后,Datax job就会监控所有的TaskGroup的执行情况,当所有的TaskGroup任务完成后,job就会退出,当出现异常,就会异常退出并且进程退出值非0.

5、Datax的核心优势:

        1、可靠的数据质量监控

        2、丰富的数据转换功能

        3、精准的控制速度

        4、容错机制:

                1、线程内部重试

                        DataX的核心插件都经过团队的全盘review,不同的网络交互方式都有不同的重试策略。

                2、线程级别重试

                        目前DataX已经可以实现TaskFailover,针对于中间失败的Task,DataX框架可以做到整个Task级别的重新调度。

        5、极简的体验

6、Datax与Sqoop的区别:
功能Dataxsqoop
运行模式单进程多线程MR
分布式是不支持分布式支持
流控需要定制
统计信息支持不支持,分布式的数据收集不方便
数据校验只有core部分有校验功能不支持,分布式的数据收集不方便
监控需要定制需要定制
7、Datax部署:

1、下载jar包:

        下载路径:https://github.com/alibaba/DataX

2、解压文件,配置环境变量:  

#解压jar包
tar -zxvf datax.tar.gz

#配置环境变量:
vim /etc/profile


export   DATAX_HOME=/user/loacl/soft/datax
export   PATH=.:$PATH:$DATAX_HOME/bin

#配置好环境变量,让配置文件生效
source /etc/profile

3、使执行文件拥有执行权:

添加执行权:

chmod +x  data.py
8、Datax的使用:
在datax中会自动的生成模板的命令:

datax.py -r streamreader -w streamwriter
        1、streamreader  to  streamwriter,数据打印在控制台上面

参数说明:

"sliceRecordCount": 100  #指定打印的个数

"channel": 1  #指定并发度
#创建json文件:

vim streamreadertostreamwriter.json


{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [
                            {
                                "type":"string",
                                "value":"wyz"
                            },
                            {
                                "type":"int",
                                "value":"18"
                            }
                        ], 
                        "sliceRecordCount": 100  #指定打印的个数
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "", 
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": 1  #指定并发度
            }
        }
    }
}



#脚本执行命令:
datax.py streamreadertostreamwriter.json
        2、mysql to mysql
1、可以通过命令获取模板:
datax.py  -r mysqlreader -w mysqlwriter 

2、可以通过github上的模板进行编写:分别是mysqlreader和mysqlwriter,参数会比较详细


3、在插入数据的需要注意是在将数据写入的时候如果出现在数据,那么此时可能是创建的表出了问题
    例如:表中的某个字段是主键,主键唯一
vim mysqlreaderTomysqlwriter.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": [
                                "id",
							 "name",
							 "age",
							 "clazz",
							 "gender"
                        
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://192.168.226.1:3306/bigdata25"], 
                                "table": ["stu"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root", 
                        "where": ""      #不是必须要写的,作用是可以在读数据时进行一次过滤
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [
                                "id",
							 "name",
							 "age",
							 "clazz",
							 "gender"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.226.1:3306/bigdata25", 
                                "table": ["data_test"]
                            }
                        ], 
                        "password": "123456", 
                        "preSql": [],   #不是必须写的,作用是再写入数据前可以执行该sql
                        "session": [], 
                        "username": "root", 
                        "writeMode": "insert"   #必选,指定数据写入的模式,分成三种:insert(一般默认)、replace、update
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}




#执行脚本:
datax.py   mysqlreaderTomysqlwriter.json

 将数据写入到mysql时,写入的表是需要提前创建的。

        3、mysql to hdfs

参数解释:

使用datax的时候hdfswriter只支持两种文件形式,分别是text和orc

"fileType": "text",  #支持两种方式:text和orc,text表示的是textfile,orc表示的orcfile

"compress": "", #指定文件的压缩形式,不指定代表不用压缩,
                text支持的压缩方式:gzip,bzip2,
                orc支持的压缩方式有NONE和SNAPPY

 "writeMode": "append"   #表示的是数据在写入的操作,分成三种:
                            append,写入前不做任何处理  
                            nonconflit 如果文件存在,直接报错  
                            truncate:如果文件存在,那就先删除在写入

"path": "/bigdata25/datax/datax_mysqltohdfs/"  #文件不存在是需要提前创建的

vim mysqlTohdfs.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": ["id","name","age","clazz","gender"], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"], 
                                "table": ["stu"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root", 
                        "where": ""
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name":"id",
                                "type":"string"
                            },
                            {
                                "name":"stu_name",
                                "type":"string" 
                            },
                            {
                                "name":"age",
                                "type":"string"
                            },
                            {
                                
                                "name":"clazz",
                                "type":"string"
                            },
                            {
                                "name":"gender",
                                "type":"string" 
                            }
                        ], 
                        "compress": "", 
                        "defaultFS": "hdfs://master:9000", 
                        "fieldDelimiter": ",", 
                        "fileName": "stu_mysqltohdfs", 
                        "fileType": "text",  
                        "path": "/bigdata25/datax/datax_mysqltohdfs/", 
                        "writeMode": "append"                         												
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

#执行脚本:
datax.py mysqlTohdfs.json
        4、mysql to hive

 原理思想:

实际上还是将数据存入到hdfs上面,hive通过记录元数据信息来获取数据
原理:
	创建好hive表在保存在hdfs上,是有文件路径,然后通过写入到指定的hdfs文件路径就能将数据写入到hive表中
当开启hive的时候,在hive中创建的表会默认的存储hdfs的/user/hive/warehouse/目录下

前期准备:

前期准备:
启动hive(后台启动):nohup hive --service metastore &
连接hive:hive

创建hive表(在没有说明的情况下一般在都是创建一个外部表)
创建一个datax数据库:
	create database datax;
切换数据库:use datax
创建外部表:
	create external table if not exists datax_mysqltohive(
    	 id string,
         name string,
         age int,
         clazz string,
		gender string
    		)
row format delimited  fields terminated  by ',' stored as textfile

编写脚本:

vim mysqlTohive.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": ["id","name","age","clazz","gender"], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"], 
                                "table": ["stu"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root", 
                        "where": ""
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name":"id",
                                "type":"string"
                            },
                            {
                                "name":"stu_name",
                                "type":"string" 
                            },
                            {
                                "name":"age",
                                "type":"string"
                            },
                            {
                                
                                "name":"clazz",
                                "type":"string"
                            },
                            {
                                "name":"gender",
                                "type":"string" 
                            }
                        ], 
                        "compress": "", 
                        "defaultFS": "hdfs://master:9000", 
                        "fieldDelimiter": ",", 
                        "fileName": "datax_mysqltohive1", 
                        "fileType": "text", 
                        "path": "/user/hive/warehouse/datax.db/datax_mysqltohive", 
                        "writeMode": "append"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}


#执行脚本命令:
datax.py  mysqlTohive.json
        5、mysql to hbase(Hbase11XWriter)

参数解释:

"mode": "normal" 写hbase的模式,目前只支持normal模式

"hbaseConfig"  {"hbase.zookeeper.quorum": "***"} 描述:连接HBase集群需要的配置信息,JSON格式

"table": "writer" 表的名称,大小写比较敏感

"encoding" 编码方式

"rowkeyColumn" 描述:要写入的hbase的rowkey列。index:指定该列对应reader端column的索引,从0开始,若为常量index为-1;type:指定写入数据类型,用于转换HBase byte[];value:配置常量,常作为多个字段的拼接符。hbasewriter会将rowkeyColumn中所有列按照配置顺序进行拼接作为写入hbase的rowkey,不能全为常量


"versionColumn" 表示指定写入hbase的时间戳,支持当前时间、指定时间列、指定时间

前期准备:

前期准备:

启动zookeeper:
	zkServer.sh start(每一个节点上都是需要启动的)
查看zk的状态:
	zkServer.sh status 

启动hbase:
	start-hbase.sh

连接hbase:
	sqlline.py master,node1,node2

进入hbase的客户端:hbase shell 

hbase中查看表的命令:!table
退出命令 !quit
查看表:list
在hbase中创建创建表,指定表名和列簇:create 'student','cf1'
				

编写脚本:

vim mysqlTohbase.json



{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": ["id","name","age","clazz","gender"], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"], 
                                "table": ["stu"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root", 
                        "where": ""
                    }
                }, 
                 "writer": {
                      "name": "hbase11xwriter",
                      "parameter": {
                        "hbaseConfig": {
                          "hbase.zookeeper.quorum": "master:2181,node1:2181,node2:2181"
                        },
                        "table": "NEW_STU",
                        "mode": "normal",
                        "rowkeyColumn": [
                            {
                              "index":0,
                              "type":"string"
                            },
                            {
                              "index":-1,
                              "type":"string",
                              "value":"_"
                            }
                        ],
                        "column": [
                          {
                            "index":1,
                            "name": "cf1:name",
                            "type": "string"
                          },
                          {
                            "index":2,
                            "name": "cf1:age",
                            "type": "int"
                          },
                          {
                            "index":3,
                            "name": "cf1:clazz",
                            "type": "string"
                          },
                          {
                            "index":4,
                            "name": "cf1:gender",
                            "type": "string"
                          }
                        ],
                        "versionColumn":{
                          "index": -1,
                          "value":"123456789"
                        },
                        "encoding": "utf-8"
                      
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }  
    }
}

#执行脚本
datax.py mysqlTohbase.json
        6、mysql增量同步数据到hive中。

最主要的工作就是在原先的mysql数据导入的hive中的基础上进行where过滤

vim mysqlTohive.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": ["id","name","age","clazz","gender"], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"], 
                                "table": ["stu"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root", 
                        "where": "id >20"
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name":"id",
                                "type":"string"
                            },
                            {
                                "name":"stu_name",
                                "type":"string" 
                            },
                            {
                                "name":"age",
                                "type":"string"
                            },
                            {
                                
                                "name":"clazz",
                                "type":"string"
                            },
                            {
                                "name":"gender",
                                "type":"string" 
                            }
                        ], 
                        "compress": "", 
                        "defaultFS": "hdfs://master:9000", 
                        "fieldDelimiter": ",", 
                        "fileName": "datax_mysqltohive1", 
                        "fileType": "text", 
                        "path": "/user/hive/warehouse/datax.db/datax_mysqltohive", 
                        "writeMode": "append"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}


#执行脚本命令:
datax.py  mysqlTohive.json
9、在使用datax过程中出现的错误:

1、配置文件出现错误,脚本不完整:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1514505.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

pycharm配置解释器

pycharm配置解释器 1.mac配置解释器 1.mac配置解释器

【C++庖丁解牛】STL简介 | string容器初次见面

🍁你好,我是 RO-BERRY 📗 致力于C、C、数据结构、TCP/IP、数据库等等一系列知识 🎄感谢你的陪伴与支持 ,故事既有了开头,就要画上一个完美的句号,让我们一起加油 目录 1. 什么是STL2. STL的版本…

扫描全能王发布2023“绿色数据”:减碳超12万吨,相当于多种700万棵树

近年来,“绿色转型”“低碳生活”成为全民热议话题。从全国覆盖率越来越高的“垃圾分类”,到蓬勃发展的“无纸化办公”,低碳生活、绿色消费的环保风尚不断兴起。植树节将至,合合信息旗下扫描全能王发布了年度用户文档扫描数据&…

详细分析Mysql中的LOCATE函数(附Demo)

目录 1. 基本概念2. Demo3. 实战 1. 基本概念 LOCATE()函数在SQL中用于在字符串中查找子字符串的位置 它的一般语法如下: LOCATE(substring, string, start)LOCATE()函数返回子字符串在主字符串中第一次出现的位置 如果未找到子字符串,则返回0 具体的…

如何在群晖用Docker本地搭建Vocechat聊天服务并无公网ip远程交流协作

文章目录 1. 拉取Vocechat2. 运行Vocechat3. 本地局域网访问4. 群晖安装Cpolar5. 配置公网地址6. 公网访问小结 7. 固定公网地址 如何拥有自己的一个聊天软件服务? 本例介绍一个自己本地即可搭建的聊天工具,不仅轻量,占用小,且功能也停强大,它就是Vocechat. Vocechat是一套支持…

【BFS二叉树】113路径总和II

113路径总和 II 给你二叉树的根节点 root 和一个整数目标和 targetSum ,找出所有 从根节点到叶子节点 路径总和等于给定目标和的路径。 思路: 题目最终输出的是路径,因此用BFS遍历的时候,需要记录走到每个节点的路径&#xff1…

ISIS单区域实验简述

ISIS 中间系统到中间系统,也是链路状态协议,工作在数据链路层,不依赖IP地址;与OSPF一样采用最短路径SPF算法,收敛速度快。 实验基础配置: r1: sys sysname r1 undo info enable int g0/0/0 ip add 12.1.1.1…

基于XMind的E-R图制作【笔记】

基于XMind的E-R图制作【笔记】 前言版权基于XMind的E-R图制作1.打开XMind2.选择模板3.插入一个自由主题4.为它插入子主题5.快速插入子主题6. 统一设置子主题样式 最后 前言 2024-3-11 10:36:33 以下内容源自《【创作模板】》 仅供学习交流使用 版权 禁止其他平台发布时删除…

js【详解】ajax (含XMLHttpRequest、 同源策略、跨域)

ajax 的核心API – XMLHttpRequest get 请求 // 新建 XMLHttpRequest 对象的实例 const xhr new XMLHttpRequest(); // 发起 get 请求,open 的三个参数为:请求类型,请求地址,是否异步请求( true 为异步,f…

程序语言设计

一、程序设计语言及其构成 1.程序设计语言 2.高级程序设计语言划分 3.常见的高级程序语言 4.标记语言 5.程序设计语言的构成 二、表达式 表达式的类型及转换规则 三、传值和传址调用 1.数据类型 2.传值和传址调用 四、语言处理程序 1.语言处理程序 语言处理程序&#xff1…

kangle一键安装脚本

Kangle一键脚本,是一款可以一键安装KangleEasypanelMySQLPHP集合的Linux脚本。 脚本本身集成:PHP5.38.2、MYSQL5.68.0,支持极速安装和编译安装2种模式,支持CDN专属安装模式。同时也对Easypanel面板进行了大量优化。 脚本特点 ◎…

FreeRTOS 的任务挂起和恢复

1. 任务挂起和恢复的 API 函数 API函数描述vTaskSuspend()挂起任务vTaskResume()恢复被挂起的任务xTaskResumeFromISR()在中断中恢复被挂起的任务 挂起:挂起任务类似暂停,可恢复; 删除任务,堆栈都给释放掉了,无法恢复…

某电信公司组织结构优化咨询项目成功案例纪实

——构建前后端组织结构,提升组织运营效率 随着企业的不断发展,行业的竞争也越来越激烈,企业只能不断调整自身的战略才能更好的适应这样的大环境。在战略调整的过程中,企业往往会面临这样的问题:管理层的经营理念各不…

从0到1快速搭建一个jeecg 企业级应用管理后台

一. 基本介绍 官网地址:https://jeecg.com/ JeecgBoot 是一款企业级的低代码平台!前后端分离架构 SpringBoot2.x,SpringCloud,Ant Design&Vue3,Mybatis-plus,Shiro,JWT 支持微服务。强大的…

Linux内核编译(版本6.0以及版本v0.01)并用qemu驱动

系统环境: ubuntu-22.04.1-desktop-amd64 目标平台: x86 i386 内核版本: linux-6.0.1 linux-0.0.1 环境配置 修改root密码 sudo passwd 修改软件源(非必要) vmtools安装(实现win-linux软件互传) 安装一些必须的软件&…

day02vue学习

day02 一、今日学习目标 1.指令补充 指令修饰符v-bind对样式增强的操作v-model应用于其他表单元素 2.computed计算属性 基础语法计算属性vs方法计算属性的完整写法成绩案例 3.watch侦听器 基础写法完整写法 4.综合案例 (演示) 渲染 / 删除 / 修…

ctf杂项总结

1.文件无法打开 1.1.文件拓展名损坏/错误导致 方法: 1.使用kali当中的file命令查看,之后修改为正确的后缀即可 2.通过16进制编辑器打开查看文件头 3.文件头残缺/错误,可以先使用kail当中的file命令查看它的类型,之后再通过 16…

[SAP ABAP] 数值向上/向下取整

ceil()函数对数值进行向上取整,floor()函数对数值进行向下取整 输出结果:

记录西门子:SCL设置不同顺序

一台搅拌的设备,需要控制三种料的进料顺序和进料重量,顺序和重量可以随便设定,也可以是几十种料。触摸屏上面有A、B、C三种液体原料,需要设定三种液体原料重量,并设定序号。 假设如下面所示设定:那将先打开…

【C++补充1】map容器

1.单映射 1.单映射&#xff0c;first:键 second:值 2.键唯一&#xff0c;如果重复&#xff0c;相同键插入 会覆盖值。 使用方法&#xff1a;pair<int, string> data(1, "Iloveyou"); 1.main int main() {//单映射//first:键 second:值//键唯一&am…