使用DataX实现mysql与hive数据互相导入导出

news2024/9/20 20:30:28

一、概论

1.1 什么是DataX

         DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。

1.2 DataX 的设计

         为了解决异构数据源同步问题,DataX 将复杂的网状的同步链路变成了星型数据链路,DataX 作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到 DataX,便能跟已有的数据源做到无缝数据同步
在这里插入图片描述

1.3 框架设计

在这里插入图片描述

  • Reader:数据采集模块,负责采集数据源的数据,将数据发给Framework。
  • Wiriter: 数据写入模块,负责不断向Framwork取数据,并将数据写入到目的端。
  • Framework:用于连接read和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
    运行原理
    在这里插入图片描述
  • Job:单个作业的管理节点,负责数据清理、子任务划分、TaskGroup监控管理。
  • Task:由Job切分而来,是DataX作业的最小单元,每个Task负责一部分数据的同步工作。
  • Schedule:将Task组成TaskGroup,单个TaskGroup的并发数量为5。
  • TaskGroup:负责启动Task。

1.4 Datax所支持的渠道

类型数据源读者作家(写)文件
RDBMS关系型数据库MySQL读,写
           甲骨文        √        √    读,写
SQL服务器读,写
PostgreSQL的读,写
DRDS读,写
通用RDBMS(支持所有关系型数据库)读,写
阿里云数仓数据存储ODPS读,写
美国存托凭证
开源软件读,写
OCS读,写
NoSQL数据存储OTS读,写
Hbase0.94读,写
Hbase1.1读,写
凤凰4.x读,写
凤凰5.x读,写
MongoDB读,写
蜂巢读,写
卡桑德拉读,写
无结构化数据存储文本文件读,写
的FTP读,写
HDFS读,写
弹性搜索
时间序列数据库OpenTSDB
技术开发局读,写

二、快速入门

2.1 环境搭建

下载地址: http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
源码地址: https://github.com/alibaba/DataX

配置要求:

  • Linux
  • JDK(1.8以上 建议1.8) 下载
  • Python(推荐 Python2.6.X)下载
    安装:

1) 将下载好的datax.tar.gz上传到服务器的任意节点,我这里上传到node01上的/exprot/soft
2)解压到/export/servers/

[root@node01 soft]# tar -zxvf datax.tar.gz  -C ../servers/

3)运行自检脚本

出现以下结果说明你得环境没有问题

[/opt/module/datax/plugin/reader/._hbase094xreader/plugin.json]不存在. 请检查您的配置文件.
在这里插入图片描述

2.2搭建环境注意事项

[/opt/module/datax/plugin/reader/._hbase094xreader/plugin.json]不存在. 请检查您的配置文件.

参考:

find ./* -type f -name ".*er"  | xargs rm -rf
find: paths must precede expression: |
Usage: find [-H] [-L] [-P] [-Olevel] [-D help|tree|search|stat|rates|opt|exec] [path...] [expression]


find /datax/plugin/reader/ -type f -name "._*er" | xargs rm -rf
find /datax/plugin/writer/ -type f -name "._*er" | xargs rm -rf

这里的/datax/plugin/writer/要改为你自己的目录

原文链接:https://blog.csdn.net/dz77dz/article/details/127055299

2.3读取Mysql中的数据写入到HDFS

准备
创建数据库和表并加载测试数据

create database test;
use test;
create table c_s(
   id   varchar(100) null,
    c_id int          null,
    s_id varchar(20)  null
);
INSERT INTO test.c_s (id, c_id, s_id) VALUES ('123', 1, '201967');
INSERT INTO test.c_s (id, c_id, s_id) VALUES ('123', 2, '201967');
INSERT INTO test.c_s (id, c_id, s_id) VALUES ('123', 3, '201967');
INSERT INTO test.c_s (id, c_id, s_id) VALUES ('123', 5, '201967');
INSERT INTO test.c_s (id, c_id, s_id) VALUES ('123', 6, '201967');

查看官方提供的模板

[root@node01 datax]# bin/datax.py -r mysqlreader -w hdfswriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the mysqlreader document:
     https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

Please refer to the hdfswriter document:
     https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md

Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [],
                        "connection": [
                            {
                                "jdbcUrl": [],
                                "table": []
                            }
                        ],
                        "password": "",
                        "username": "",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [],
                        "compress": "",
                        "defaultFS": "",
                        "fieldDelimiter": "",
                        "fileName": "",
                        "fileType": "",
                        "path": "",
                        "writeMode": ""
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

根据官网模板进行修改

[root@node01 datax]# vim job/mysqlToHDFS.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "id",
                            "c_id",
                            "s_id"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://node02:3306/test"
                                ],
                                "table": [
                                    "c_s"
                                ]
                            }
                        ],
                        "password": "123456",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "string"
                            },
                            {
                                "name": "c_id",
                                "type": "int"
                            },
                            {
                                "name": "s_id",
                                "type": "string"
                            }
                        ],
                        "defaultFS": "hdfs://node01:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "c_s.txt",
                        "fileType": "text",
                        "path": "/",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

HDFS的端口号注意版本,2.7.4 是9000;hdfs://node01:9000

MySQL的参数介绍
在这里插入图片描述
HDFS参数介绍
在这里插入图片描述
运行脚本

[root@node01 datax]# bin/datax.py  job/mysqlToHDFS.json
2020-10-02 16:12:16.358 [job-0] INFO  HookInvoker - No hook invoked, because base dir not exists or is a file: /export/servers/datax/hook
2020-10-02 16:12:16.359 [job-0] INFO  JobContainer -
         [total cpu info] =>
                averageCpu                     | maxDeltaCpu                    | minDeltaCpu
                -1.00%                         | -1.00%                         | -1.00%


         [total gc info] =>
                 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime
                 PS MarkSweep         | 1                  | 1                  | 1                  | 0.245s             | 0.245s             | 0.245s
                 PS Scavenge          | 1                  | 1                  | 1                  | 0.155s             | 0.155s             | 0.155s

2020-10-02 16:12:16.359 [job-0] INFO  JobContainer - PerfTrace not enable!
2020-10-02 16:12:16.359 [job-0] INFO  StandAloneJobContainerCommunicator - Total 5 records, 50 bytes | Speed 5B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2020-10-02 16:12:16.360 [job-0] INFO  JobContainer -
任务启动时刻                    : 2020-10-02 16:12:04
任务结束时刻                    : 2020-10-02 16:12:16
任务总计耗时                    :                 12s
任务平均流量                    :                5B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   5
读写失败总数                    :                   0

2.4 读取HDFS中的数据写入到Mysql

准备工作

create database test;
use test;
create table c_s2(
   id   varchar(100) null,
    c_id int          null,
    s_id varchar(20)  null
);

查看官方提供的模板

[root@node01 datax]# bin/datax.py -r hdfsreader -w mysqlwriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the hdfsreader document:
     https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md

Please refer to the mysqlwriter document:
     https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md

Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "column": [],
                        "defaultFS": "",
                        "encoding": "UTF-8",
                        "fieldDelimiter": ",",
                        "fileType": "orc",
                        "path": ""
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [],
                        "connection": [
                            {
                                "jdbcUrl": "",
                                "table": []
                            }
                        ],
                        "password": "",
                        "preSql": [],
                        "session": [],
                        "username": "",
                        "writeMode": ""
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

根据官方提供模板进行修改

[root@node01 datax]# vim job/hdfsTomysql.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "column": [
                            "*"
                        ],
                        "defaultFS": "hdfs://node01:8020",
                        "encoding": "UTF-8",
                        "fieldDelimiter": "\t",
                        "fileType": "text",
                        "path": "/c_s.txt"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [
                            "id",
                            "c_id",
                            "s_id"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://node02:3306/test",
                                "table": [
                                    "c_s2"
                                ]
                            }
                        ],
                        "password": "123456",
                        "username": "root",
                        "writeMode": "replace"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

脚本运行

[root@node01 datax]# bin/datax.py job/hdfsTomysql.json

         [total cpu info] =>
                averageCpu                     | maxDeltaCpu                    | minDeltaCpu
                -1.00%                         | -1.00%                         | -1.00%


         [total gc info] =>
                 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime
                 PS MarkSweep         | 1                  | 1                  | 1                  | 0.026s             | 0.026s             | 0.026s
                 PS Scavenge          | 1                  | 1                  | 1                  | 0.015s             | 0.015s             | 0.015s

2020-10-02 16:57:13.152 [job-0] INFO  JobContainer - PerfTrace not enable!
2020-10-02 16:57:13.152 [job-0] INFO  StandAloneJobContainerCommunicator - Total 5 records, 50 bytes | Speed 5B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.033s | Percentage 100.00%
2020-10-02 16:57:13.153 [job-0] INFO  JobContainer -
任务启动时刻                    : 2020-10-02 16:57:02
任务结束时刻                    : 2020-10-02 16:57:13
任务总计耗时                    :                 11s
任务平均流量                    :                5B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   5
读写失败总数                    :                   0

2.5将Mysql表导入Hive

1.在hive中建表

-- hive建表
CREATE TABLE student2 (
	classNo string,
	stuNo string,
	score int) 
row format delimited fields terminated by ',';


-- 构造点mysql数据
create table if not exists student2(
    classNo varchar ( 50 ),
    stuNo   varchar ( 50 ),
    score    int 
)
insert into student2 values('1001','1012ww10087',63);
insert into student2 values('1002','1012aa10087',63);
insert into student2 values('1003','1012bb10087',63);
insert into student2 values('1004','1012cc10087',63);
insert into student2 values('1005','1012dd10087',63);
insert into student2 values('1006','1012ee10087',63);

2.编写mysql2hive.json配置文件

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "connection": [
                            {
                                "table": [
                                    "student2"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.43.10:3306/mytestmysql"
                                ]
                            }
                        ],
                        "column": [
                            "classNo",
                            "stuNo",
                            "score"
                        ]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://192.168.43.10:9000",
                        "path": "/hive/warehouse/home/myhive.db/student2",
                        "fileName": "myhive",
                        "writeMode": "append",
                        "fieldDelimiter": ",",
                        "fileType": "text",
                        "column": [
                            {
                                "name": "classNo",
                                "type": "string"
                            },
                            {
                                "name": "stuNo",
                                "type": "string"
                            },
                            {
                                "name": "score",
                                "type": "int"
                            }
                        ]
                    }
                }
            }
        ]
    }
}

3.运行脚本

bin/datax.py job/mysql2hive.json 

4.查看hive表是否有数据

2.6将Hive表数据导入Mysql

1.要先在mysql建好表

create table if not exists student(
    classNo varchar ( 50 ),
    stuNo   varchar ( 50 ),
    score    int 
)

2.hive2mysql.json配置文件

{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            }
        },
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "path": "/hive/warehouse/home/myhive.db/student/*",
                        "defaultFS": "hdfs://192.168.43.10:9000",
                        "column": [
                               {
                                "index": 0,
                                "type": "string"
                               },
                                                           {
                                "index": 1,
                                "type": "string"
                               },
                               {
                                "index": 2,
                                "type": "Long"
                               }
                        ],
                        "fileType": "text",
                        "encoding": "UTF-8",
                        "fieldDelimiter": ","
                    }

                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "root",
                        "column": [
                            "classNo",
                            "stuNo",
                            "score"
                        ],
                        "preSql": [
                            "delete from student"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.43.10:3306/mytestmysql?useUnicode=true&characterEncoding=utf8",
                                "table": [
                                    "student"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

注意事项:

在Hive的ODS层建表语句中,以“,”为分隔符;
fields terminated by ','
在DataX的json文件中,也以“,”为分隔符。
"fieldDelimiter": "," 与hive表里面的分隔符保持一致即可

由于DataX不能完全支持所有Hive表的数据类型,应将DataX启动文件中的hdfsreader中的column字段的类型改成DataX支持的类型

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/812375.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【外卖系统】分类管理业务

公共字段自动填充 需求分析 对于之前的开发中,有创建时间、创建人、修改时间、修改人等字段,在其他功能中也会有出现,属于公共字段,对于这些公共字段最好是在某个地方统一处理以简化开发,使用Mybatis Plus提供的公共…

简单记录牛客top101算法题(初级题C语言实现)判断回文字符串 反转字符串 合并两个有序的数组

1. 判断是否为回文字符串 给定一个长度为 n 的字符串,请编写一个函数判断该字符串是否回文。如果是回文请返回true,否则返回false。   字符串回文指该字符串正序与其逆序逐字符一致。 //示例 输入:"ranko" 返回值:fa…

DevOps系列文章之 自动化测试大全(单测和集成测试)

自动化测试业界主流工具 核心目标: 主要是功能测试和覆盖率测试 业界常用主流工具 GoogleTest GoogleTest是一个跨平台的(Liunx、Mac OS X、Windows 、Cygwin 、Windows CE and Symbian ) C单元测试框架,由google公司发布,为在不同平台上为编…

案例:缺陷个数与返工工作量强相关

某公司积累了21个项目缺陷个数与返工工作量的数据,如下表所示: 项目序号缺陷修复工时缺陷数1943314452299040536347446471385496071061370246774066812232189276652810830213781162678126111511381110514209032015144023516516078417710010301875601239…

3.Makefile变量的用法(附示例)

一、本节概要 本专栏所有内容围绕Makefile官方文档进行刨析,给出详细具体示例做辅助理解手撕Makefile官方手册 二、Makefile中的变量 1、没有使用变量的makefile 以下是不使用变量的makefile完整示例: edit: main.o kbd.o command.o display.o insert.o search.o files…

django channels实战(websocket底层原理和案例)

1、websocket相关 1.1、轮询 1.2、长轮询 1.3、websocket 1.3.1、websocket原理 1.3.2、django框架 asgi.py在django项目同名app目录下 1.3.3、聊天室 django代码总结 小结 1.3.4、群聊(一) 前端代码 后端代码 1.3.5、群聊(二&#xff09…

网络编程 IO多路复用 [epoll版] (TCP网络聊天室)

//head.h 头文件 //TcpGrpSer.c 服务器端 //TcpGrpUsr.c 客户端 通过IO多路复用实现服务器在单进程单线程下可以与多个客户端交互 API epoll函数 #include<sys/epoll.h> int epoll_create(int size); 功能&#xff1a;创建一个epoll句柄//创建红黑树根…

TypeScript算法题实战——剑指 Offer篇(5)

目录 一、平衡二叉树1.1、题目描述1.2、题解 二、数组中数字出现的次数2.1、题目描述2.2、题解 三、数组中数字出现的次数 II3.1、题目描述3.2、题解 四、和为s的两个数字4.1、题目描述4.2、题解 五、和为s的连续正数序列5.1、题目描述5.2、题解 六、翻转单词顺序6.1、题目描述…

《cuda c编程权威指南》01- 用gpu输出hello world

学习一门新语言的最好方式就是用它来编写程序。 目录 1. 使用cpu输出hello world 2. 使用gpu输出hello world 3. CUDA编程结构 1. 使用cpu输出hello world hello.cu #include <stdio.h>void helloFromCPU() {printf("hello world from cpu!\n"); }int m…

Zabbix分布式监控配置和使用

目录 1 Zabbix监控的配置流程2 添加主机组3 添加模板4 添加主机5 配置图形6 配置大屏7 新建监控项7.1 简介7.2 添加监控项7.3 查看数据7.4 图表 8 新建触发器8.1 概述8.2 添加触发器8.3 显示触发器状态 1 Zabbix监控的配置流程 在Zabbix-Web管理界面中添加一个主机&#xff0c;…

【Golang 接口自动化00】为什么要用Golang做自动化?

目录 为什么使用Golang做自动化 最终想实现的效果 怎么做&#xff1f; 写在后面 资料获取方法 为什么使用Golang做自动化 顺应公司的趋势学习了Golang之后&#xff0c;因为没有太多时间和项目来实践&#xff0c;怕止步于此、步Java缺少练习遗忘殆尽的后尘&#xff0c;决定…

【C++进阶之路】多态篇

文章目录 前言一、概念1.分类2.实现条件①重写虚函数1.1总结三重1.2 final与override ②父类的指针或者引用2.1普通调用VS多态调用 3.抽象类3.1. 纯虚函数3.2. 接口继承和实现继承 二、原理及使用1.虚函数表 —— 虚表2.默认成员函数2.1构造函数2.2析构函数 3. 多继承3.1普通的…

python速成之循环分支结构学习

循环结构 应用场景 我们在写程序的时候&#xff0c;一定会遇到需要重复执行某条或某些指令的场景。例如用程序控制机器人踢足球&#xff0c;如果机器人持球而且还没有进入射门范围&#xff0c;那么我们就要一直发出让机器人向球门方向移动的指令。在这个场景中&#xff0c;让…

AD21原理图的高级应用(六)原理图设计片段的使用

&#xff08;六&#xff09;原理图设计片段的使用 Altium Designer 的片段功能可以很方便地重复使用一些单元模块,其中包括原理图的电路模块、PCB(包括布线)和代码模块。例如在工程中需要设计电源模块,而别的工程中又恰好有比较完善的电源模块,这时就可以通过片段功能重复地使用…

一文了解 Android 车机如何处理中控的旋钮输入?

前言 上篇文章《从实体按键看 Android 车载的自定义事件机制》带大家了解了 Android 车机支持自定义输入的机制 CustomInputService。事实上&#xff0c;除了支持自定义事件&#xff0c;对于中控上常见的音量控制、焦点控制的旋钮事件&#xff0c;Android 车机也是支持的。 那…

测试|测试用例方法篇

测试|测试用例方法篇 文章目录 测试|测试用例方法篇1.测试用例的基本要素&#xff1a;测试环境&#xff0c;操作步骤&#xff0c;测试数据&#xff0c;预期结果…2.测试用例带来的好处3.测试用例的设计思路&#xff0c;设计方法&#xff0c;具体设计方法之间的关系**设计测试用…

linux设备驱动的poll与fasync

什么是fasync 在 Linux 驱动程序中&#xff0c;fasync 是一种机制&#xff0c;用于在异步事件发生时通知进程。它允许进程在等待设备事件时&#xff0c;不必像传统的轮询方式那样持续地查询设备状态。 具体来说&#xff0c;当进程调用 fcntl(fd, F_SETFL, O_ASYNC) 函数时&am…

lib-flexible修改配置适配更多不同分辨率

找到设置宽度的地方 然后根据你的屏幕最大多大呀&#xff0c;最小多小呀设置一下 if (width / dpr < 1980) { width 1980 * dpr; } else if (width / dpr > 5760) { width 5760 * dpr; }

Python 教程之标准库概览

概要 Python 标准库非常庞大&#xff0c;所提供的组件涉及范围十分广泛&#xff0c;使用标准库我们可以让您轻松地完成各种任务。 以下是一些 Python3 标准库中的模块&#xff1a; 「os 模块」 os 模块提供了许多与操作系统交互的函数&#xff0c;例如创建、移动和删除文件和…

【Linux】进程篇Ⅱ:进程开始、进程终止、进程等待

文章目录 五、fork 函数&#xff0c;创建进程写时拷贝 六、进程终止1. 退出码2. 如何终止程序 七、进程等待1. 概念2. wait 函数waitpid 函数 &#x1f53a; 3. 阻塞等待 五、fork 函数&#xff0c;创建进程 #include <unistd.h>   pid_t fork(void);   返回值&#xf…