DataX介绍

news2025/1/11 6:04:34

一、介绍

DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
github地址
详细文档
操作手册

支持数据框架如下:
在这里插入图片描述
在这里插入图片描述在这里插入图片描述
在这里插入图片描述在这里插入图片描述

架构
在这里插入图片描述
Reader:为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
Writer:为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
Framework:用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

二、使用

  1. 下载

下载地址:https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202308/datax.tar.gz

  1. 解压缩
# 解压缩
tar -zxvf datax.tar.gz -C /opt/module/
  1. 编写数据同步任务
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "sliceRecordCount": 10,
            "column": [
              {
                "type": "long",
                "value": "10"
              },
              {
                "type": "string",
                "value": "hello,datax"
              }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "encoding": "UTF-8",
            "print": true
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 5
       }
    }
  }
}

  1. 启动任务
python /opt/module/datax/bin/datax.py /opt/module/datax/job/stream_to_stream.json
  1. 执行结果

在这里插入图片描述

  1. 配置说明
参数说明
job.setting设置全局配置参数
job.setting.speed控制任务速度配置参数,包括:channel(通道(并发))、record(字节流)、byte(记录流)等三种模式
job.setting.speed.channel并发数
job.setting.speed.record字节流
job.setting.speed.byte记录流
job.setting.errorLimit设置错误限制
job.setting.errorLimit.record指定允许的最大错误记录数
job.setting.errorLimit.percentage指定允许的最大错误记录百分比
job.setting.dirtyDataPath设置错误限制
job.setting.dirtyDataPath.path设置错误限制
job.setting.log设置错误限制
job.setting.log.level设置错误限制
job.setting.log.dir设置错误限制
content任务配置参数
readerReader配置
nameReader类型
parameterReader具体配置(具体配置查看具体Reader)
writerWriter配置
nameWriter类型
parameterWriter具体配置(具体配置查看具体Writer)

三、常用配置

3.1、MysqlReader

{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "column": [
                            "id",
                            "name"
                        ],
                        "splitPk": "db_id",
                        "connection": [
                            {
                                "table": [
                                    "table"
                                ],
                                "jdbcUrl": [
     "jdbc:mysql://127.0.0.1:3306/database"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print":true
                    }
                }
            }
        ]
    }
}

配置说明:
jdbcUrl:链接地址
username:mysql用户名
password:mysql密码
table:待同步的表名
column:所配置的表中需要同步的列名集合,可以使用使用*代表所有字段
splitPk:使用splitPk代表的字段进行数据分片,DataX因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能
where:筛选条件
querySql:sql语句,可以替代column和where配置

3.2、MysqlWriter

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                 "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column" : [
                            {
                                "value": "DataX",
                                "type": "string"
                            },
                            {
                                "value": 19880808,
                                "type": "long"
                            },
                            {
                                "value": "1988-08-08 08:08:08",
                                "type": "date"
                            },
                            {
                                "value": true,
                                "type": "bool"
                            },
                            {
                                "value": "test",
                                "type": "bytes"
                            }
                        ],
                        "sliceRecordCount": 1000
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "root",
                        "column": [
                            "id",
                            "name"
                        ],
                        "session": [
                        	"set session sql_mode='ANSI'"
                        ],
                        "preSql": [
                            "delete from test"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?useUnicode=true&characterEncoding=gbk",
                                "table": [
                                    "test"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

配置说明:
jdbcUrl:链接地址
username:mysql用户名
password:mysql密码
table:待同步的表名
column:所配置的表中需要同步的列名集合,可以使用使用*代表所有字段
preSql:写入数据到目的表前,会先执行这里的标准语句
postSql:写入数据到目的表后,会执行这里的标准语句
writeMode:控制写入数据到目标表采用insert into或者replace into或者 ON DUPLICATE KEY UPDATE语句
batchSize:一次性批量提交的记录数大小

3.3、HdfsReader

{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            }
        },
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "path": "/user/hive/warehouse/mytable01/*",
                        "defaultFS": "hdfs://xxx:port",
                        "column": [
                               {
                                "index": 0,
                                "type": "long"
                               },
                               {
                                "index": 1,
                                "type": "boolean"
                               },
                               {
                                "type": "string",
                                "value": "hello"
                               },
                               {
                                "index": 2,
                                "type": "double"
                               }
                        ],
                        "fileType": "orc",
                        "encoding": "UTF-8",
                        "fieldDelimiter": ","
                    }

                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": true
                    }
                }
            }
        ]
    }
}

配置说明:
path:文件路径
defaultFS:namenode节点地址
fileType:文件的类型,目前支持:”text”、”orc”、”rc”、”seq”、”csv”
column:读取字段列表
fieldDelimiter:读取的字段分隔符
encoding:读取文件的编码配置
nullFormat:文本文件中无法使用标准字符串定义null(空指针),DataX提供nullFormat定义哪些字符串可以表示为null
haveKerberos:是否有Kerberos认证,默认false
kerberosKeytabFilePath:Kerberos认证keytab文件路径,且为绝对路径
kerberosPrincipal:Kerberos认证Principal名
hadoopConfig:hadoop相关的一些高级参数

3.4、HdfsWriter

{
    "setting": {},
    "job": {
        "setting": {
            "speed": {
                "channel": 2
            }
        },
        "content": [
            {
                "reader": {
                    "name": "txtfilereader",
                    "parameter": {
                        "path": ["/Users/shf/workplace/txtWorkplace/job/dataorcfull.txt"],
                        "encoding": "UTF-8",
                        "column": [
                            {
                                "index": 0,
                                "type": "long"
                            },
                            {
                                "index": 1,
                                "type": "long"
                            },
                            {
                                "index": 2,
                                "type": "long"
                            },
                            {
                                "index": 3,
                                "type": "long"
                            },
                            {
                                "index": 4,
                                "type": "DOUBLE"
                            },
                            {
                                "index": 5,
                                "type": "DOUBLE"
                            },
                            {
                                "index": 6,
                                "type": "STRING"
                            },
                            {
                                "index": 7,
                                "type": "STRING"
                            },
                            {
                                "index": 8,
                                "type": "STRING"
                            },
                            {
                                "index": 9,
                                "type": "BOOLEAN"
                            },
                            {
                                "index": 10,
                                "type": "date"
                            },
                            {
                                "index": 11,
                                "type": "date"
                            }
                        ],
                        "fieldDelimiter": "\t"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://xxx:port",
                        "fileType": "orc",
                        "path": "/user/hive/warehouse/writerorc.db/orcfull",
                        "fileName": "xxxx",
                        "column": [
                            {
                                "name": "col1",
                                "type": "TINYINT"
                            },
                            {
                                "name": "col2",
                                "type": "SMALLINT"
                            },
                            {
                                "name": "col3",
                                "type": "INT"
                            },
                            {
                                "name": "col4",
                                "type": "BIGINT"
                            },
                            {
                                "name": "col5",
                                "type": "FLOAT"
                            },
                            {
                                "name": "col6",
                                "type": "DOUBLE"
                            },
                            {
                                "name": "col7",
                                "type": "STRING"
                            },
                            {
                                "name": "col8",
                                "type": "VARCHAR"
                            },
                            {
                                "name": "col9",
                                "type": "CHAR"
                            },
                            {
                                "name": "col10",
                                "type": "BOOLEAN"
                            },
                            {
                                "name": "col11",
                                "type": "date"
                            },
                            {
                                "name": "col12",
                                "type": "TIMESTAMP"
                            }
                        ],
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress":"NONE"
                    }
                }
            }
        ]
    }
}

配置说明:
path:存储到Hadoop Hdfs文件系统的路径信息
defaultFS:namenode节点地址
fileType:文件的类型,目前支持:“text”或“orc”
fileName:文件名
column:写入字段列表
fieldDelimiter:读取的字段分隔符
compress:文件压缩类型

3.5、FtpReader

{
    "setting": {},
    "job": {
        "setting": {
            "speed": {
                "channel": 2
            }
        },
        "content": [
            {
                "reader": {
                    "name": "ftpreader",
                    "parameter": {
                        "protocol": "sftp",
                        "host": "127.0.0.1",
                        "port": 22,
                        "username": "xx",
                        "password": "xxx",
                        "path": [
                            "/home/hanfa.shf/ftpReaderTest/data"
                        ],
                        "column": [
                            {
                                "index": 0,
                                "type": "long"
                            },
                            {
                                "index": 1,
                                "type": "boolean"
                            },
                            {
                                "index": 2,
                                "type": "double"
                            },
                            {
                                "index": 3,
                                "type": "string"
                            },
                            {
                                "index": 4,
                                "type": "date",
                                "format": "yyyy.MM.dd"
                            }
                        ],
                        "encoding": "UTF-8",
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "ftpWriter",
                    "parameter": {
                        "path": "/home/hanfa.shf/ftpReaderTest/result",
                        "fileName": "shihf",
                        "writeMode": "truncate",
                        "format": "yyyy-MM-dd"
                    }
                }
            }
        ]
    }
}

配置说明:
protocol:ftp服务器协议,目前支持传输协议有ftp和sftp
host:ftp服务器地址
port:ftp服务器端口
timeout:连接ftp服务器连接超时时间,单位毫秒,默认:60000
connectPattern:连接模式(主动模式或者被动模式)
username:用户名
password:密码
path:路径
column:读取字段列表
fieldDelimiter:读取的字段分隔符

3.6、FtpWriter

{
    "setting": {},
    "job": {
        "setting": {
            "speed": {
                "channel": 2
            }
        },
        "content": [
            {
                "reader": {},
                "writer": {
                    "name": "ftpwriter",
                    "parameter": {
                        "protocol": "sftp",
                        "host": "***",
                        "port": 22,
                        "username": "xxx",
                        "password": "xxx",
                        "timeout": "60000",
                        "connectPattern": "PASV",
                        "path": "/tmp/data/",
                        "fileName": "yixiao",
                        "writeMode": "truncate|append|nonConflict",
                        "fieldDelimiter": ",",
                        "encoding": "UTF-8",
                        "nullFormat": "null",
                        "dateFormat": "yyyy-MM-dd",
                        "fileFormat": "csv",
			"suffix": ".csv",
                        "header": []
                    }
                }
            }
        ]
    }
}

配置说明:
protocol:ftp服务器协议,目前支持传输协议有ftp和sftp
host:ftp服务器地址
port:ftp服务器端口
timeout:连接ftp服务器连接超时时间,单位毫秒,默认:60000
connectPattern:连接模式(主动模式或者被动模式)
username:用户名
password:密码
path:路径
fileName:文件名
fieldDelimiter:读取的字段分隔符

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1424023.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LLM 推理优化探微 (1) :Transformer 解码器的推理过程详解

编者按:随着 LLM 赋能越来越多需要实时决策和响应的应用场景,以及用户体验不佳、成本过高、资源受限等问题的出现,大模型高效推理已成为一个重要的研究课题。为此,Baihai IDP 推出 Pierre Lienhart 的系列文章,从多个维…

2024年小年是哪一天?小年习俗记到手机便签

随着春节的临近,我们即将迎来一个重要的传统节日——“小年”。那么2024年小年是哪一天呢?关于2024年小年的具体日期,地域不同,节日时间有所不同。在北方,小年通常是在腊月二十三,即2月2日;而在…

locust--python实现的分布式性能测试工具

1.locust特点: 1.1 支持Python编写测试用例方案; 1.2 使用requests发送http请求; 1.3 使用协程实现,高并发时消耗更低; 1.4 使用Flask提供 Web UI; 1.5 有第三方插件支持扩展; 2.创建locust 性能…

【MySQL】学习并使用聚合函数和DQL进行分组查询

🌈个人主页: Aileen_0v0 🔥热门专栏: 华为鸿蒙系统学习|计算机网络|数据结构与算法 ​💫个人格言:“没有罗马,那就自己创造罗马~” #mermaid-svg-t8K8tl6eNwqdFmcD {font-family:"trebuchet ms",verdana,arial,sans-serif;font-siz…

canvas自定义扩展方法:文字自动换行

查看专栏目录 canvas实例应用100专栏,提供canvas的基础知识,高级动画,相关应用扩展等信息。canvas作为html的一部分,是图像图标地图可视化的一个重要的基础,学好了canvas,在其他的一些应用上将会起到非常重…

15. 三数之和(力扣LeetCode)

文章目录 15. 三数之和题目描述双指针去重逻辑的思考a的去重b与c的去重 15. 三数之和 题目描述 给你一个整数数组 nums ,判断是否存在三元组 [nums[i], nums[j], nums[k]] 满足 i ! j、i ! k 且 j ! k ,同时还满足 nums[i] nums[j] nums[k] 0 。请 …

hbuilderx uniapp运行到真机控制台显示手机端调试基座版本号1.0.0,调用uni.share提示打包时未添加share模块

记录一个困扰了几天的一个蠢问题,发现真相的我又气又笑。 由于刚开始接触uniapp 移动端开发,有个需求需要使用uni.share API,但是我运行项目老提示打包时没配置share模块 我确实没在manifest内配置。网上搜了一些资料,但是我看官…

MySQL判断两个时间段是否重合

前提 新增的数据不能和数据库的时间有重合部分。 如图,4种重合情况和2种不重合情况。 时间段 a,b 数据库字段 start_time,end_time 第一种写法 列举每一种重合的情况: SELECT * FROM table WHERE(start_time > a and en…

大数据开发之离线数仓项目(用户行为采集平台)(可面试使用)

第 1 章:数据仓库概念 数据仓库,是为企业指定决策,提供数据支持的,可以帮助企业,改进业务流程、提高产品质量等。 数据仓库的输入数据通常包括:业务数据、用户行为数据和爬虫数据等。 业务数据&#xff1a…

写静态页面——粘性定位练习

0、效果&#xff1a; 1、HTML代码&#xff1a;为了简洁采用内部样式 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"…

企业网络基础架构监控工具

IT 基础架构已成为提供基本业务服务的基石&#xff0c;无论是内部管理操作还是为客户托管的应用程序服务&#xff0c;监控 IT 基础设施至关重要&#xff0c;并且已经建立起来&#xff0c;SMB IT 基础架构需要简单的网络监控工具来监控性能和报告问题。通常&#xff0c;几个 IT …

【HTML】自定义属性(data)

自定义属性 data: 的用法&#xff08;如何设置,如何获取) &#xff0c;有何优势&#xff1f; data-* 的值的获取和设置&#xff0c;2种方法: 传统方法 getAttribute() 获取 data- 属性值; setAttribute() 设置 data- 属性值getAttribute() 获取 data- 属性值; setAttribute()…

强大的虚拟机Parallels Desktop 19 mac中文激活

Parallels Desktop是一款功能全面、易于使用的虚拟机软件&#xff0c;它为用户提供了在Mac电脑上同时运行多个操作系统的便利。 软件下载&#xff1a;Parallels Desktop 19 mac中文激活版下载 Parallels Desktop 19 mac具有快速启动和关闭虚拟机的能力&#xff0c;让用户能够迅…

怎么使用cmd命令来进行Vue脚手架的项目搭建

前言 使用vue搭建项目的时候&#xff0c;我们可以通过对应的cmd命令去打开脚手架&#xff0c;然后自己配置对应的功能插件 怎么打开 我们打开对应的cmd命令之后就开始进入对应的网站搭建 vue ui 然后我们就打开对应的项目管理器来进行配置----这里我们打开开始创建新的项目…

问题:第十三届全国人民代表大会第四次会议召开的时间是()。 #经验分享#知识分享#媒体

问题&#xff1a;第十三届全国人民代表大会第四次会议召开的时间是&#xff08;&#xff09;。 A. 2018年3月3日至3月11日 B. 2019年3月5日至3月11日 C. 2020年3月5日至3月11日 D. 2021年3月5日至3月11日 参考答案如图所示 问题&#xff1a;顾客满意是顾客对一件产品满足…

MacOS X 中 OpenGL 环境搭建 Makefile的方式

1&#xff0c;预备环境 安装 brew&#xff1a; /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)" 安装glfw&#xff1a; brew install glfw 安装glew&#xff1a; brew install glew 2.编译 下载源代码…

《区块链简易速速上手小册》第7章:区块链在其他行业的应用(2024 最新版)

文章目录 7.1 供应链管理7.1.1 供应链管理中区块链的基础7.1.2 主要案例&#xff1a;食品安全追踪7.1.3 拓展案例 1&#xff1a;制药供应链7.1.4 拓展案例 2&#xff1a;汽车行业的零部件追踪 7.2 区块链在医疗保健中的应用7.2.1 医疗保健中区块链的基础7.2.2 主要案例&#xf…

Kafka下载安装及基本使用

目录 Kafka介绍 消息队列的作用 消息队列的优势 应用解耦 异步提速 削峰填谷 为什么要用Kafka Kafka下载安装 Kafka快速上手&#xff08;单机体验&#xff09; 1. 启动zookeeper服务 2. 启动kafka服务 3. 简单收发消息 Kakfa的消息传递机制 Kafka介绍 Apache Kafka…

结构体与共用体——C语言——day15

在C语言中&#xff0c;C语言允许用户自己指定这样一种数据结构&#xff0c;它称为结构体(structure) 。它相当于其他高级语言中的“记录”。 假设程序中要用到图所表示的数据结构&#xff0c;但是C语言没有提供这种现成的数据类型&#xff0c;因此用户必须要在程序中建立所需的…

vue3学习——初始化项目及配置

初始化项目 环境 node 16pnpm 8.0.0 命令 pnpm create vite进行以下选择 &#x1f447; – 项目名 – VUe – Ts – cd/目录 – pnpm run dev 浏览器自动打开 package.json 配置eslint 安装依赖包 pnpm i eslint -D npx eslint --init // 生成配置文件进行以下选择 &a…