DataX简介、部署、原理和使用介绍

news2024/12/28 20:56:29

DataX简介、部署、原理和使用介绍

1.DataX简介

1-1.项目地址

项目地址:https://github.com/alibaba/DataX

官方文档:https://github.com/alibaba/DataX/blob/master/introduction.md

1-2.DataX概述

​ DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各种异构数据源之间高效的数据同步功能

​ DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通

1-3.DataX支持的数据源

DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入

类型数据源Reader(读)Writer(写)文档
RDBMS 关系型数据库MySQL读 、写
Oracle读 、写
OceanBase读 、写
SQLServer读 、写
PostgreSQL读 、写
DRDS读 、写
Kingbase读 、写
通用RDBMS(支持所有关系型数据库)读 、写
阿里云数仓数据存储ODPS读 、写
ADB
ADS
OSS读 、写
OCS
Hologres
AnalyticDB For PostgreSQL
阿里云中间件datahub读 、写
SLS读 、写
阿里云图数据库GDB读 、写
NoSQL数据存储OTS读 、写
Hbase0.94读 、写
Hbase1.1读 、写
Phoenix4.x读 、写
Phoenix5.x读 、写
MongoDB读 、写
Cassandra读 、写
数仓数据存储StarRocks读 、写
ApacheDoris
ClickHouse
Databend
Hive读 、写
kudu
无结构化数据存储TxtFile读 、写
FTP读 、写
HDFS读 、写
Elasticsearch
时间序列数据库OpenTSDB
TSDB读 、写
TDengine读 、写

1-4.DataX特点

  • 可靠的数据质量监控
  • 丰富的数据转换功能
  • 精准的速度控制
  • 强劲的同步性能
  • 健壮的容错机制
  • 极简的使用体验

2.DataX原理

2-1.DataX设计理念

  1. 异构数据源同步问题,就是不同框架之间同步数据时,相同的数据在不同框架中具有不同的数据结构。
  2. DataX的设计理念:
    DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接数据各种数据源。
    当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

2-2.DataX框架设计

DataX本身作为离线数据同步框架,采用Framework+plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中

  • Reader

    • 数据采集模块,负责采集数据源的数据,将数据发送给Framework
  • Writer

    • 数据写入模块,负责不断从Framework取数据,并将数据写出到目的端。
  • Framework

    • 主题框架,用于连接Reader和Writer,作为两者的数据传输通道,并处理缓冲、流控、并发、数据转换等核心技术问题

Framework的几大功能

  • 缓冲

ReaderWriter 可能会有读写速度不一致的情况,所以中间需要一个组件作为缓冲

  • 流控

控制数据传输的速度,DataX 可以随意根据需求调整数据传输速度

  • 并发

并发的同步或写入数据

  • 数据转换

既然是异构,那么说明读 Reader 的数据源与 写 Writer 的数据源 数据结构可能不同,数据结构不同的话,需要做数据转换操作,转换也在 Framework 中完成

2-3.DataX运行流程

DataX支持单机多线程模式完成同步作业,下面用一个DataX作业生命周期的时序图,用以说明DataX的运行流程、核心概念以及每个概念的关系

  • 核心模块介绍:
  1. DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  2. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  3. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
  5. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

2-4.DataX调度策略

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:

  1. DataXJob根据分库分表切分成了100个Task。
  2. 根据20个并发,DataX计算共需要分配4个TaskGroup。
  3. 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

3.DataX安装部署

# 下载安装包
[wangting@hdt-dmcp-ops05 ~]$ cd /opt/software/
[wangting@hdt-dmcp-ops05 software]$ wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gz
# 解压安装包
[wangting@hdt-dmcp-ops05 software]$ tar -xf datax.tar.gz -C /opt/module
[wangting@hdt-dmcp-ops05 software]$ cd /opt/module/datax/
# DataX自检任务
[wangting@hdt-dmcp-ops05 datax]$ python bin/datax.py job/job.json
2023-02-15 19:01:48.489 [job-0] INFO  JobContainer -
任务启动时刻                    : 2023-02-15 19:01:38
任务结束时刻                    : 2023-02-15 19:01:48
任务总计耗时                    :                 10s
任务平均流量                    :          253.91KB/s
记录写入速度                    :          10000rec/s
读出记录总数                    :              100000
读写失败总数                    :                   0
# 成功执行自检任务

4.DataX使用介绍

4-1.同步MySQl全量数据到HDFS案例

将MySQL的全量数据,利用DataX工具同步至HDFS
1.查看MySQL被迁移的数据情况
2.根据需求确定reader为mysqlreader,writer为hdfswriter

查看reader和writer模板的方式(-r 读模板; -w 写模板):

[wangting@hdt-dmcp-ops05 datax]$ python bin/datax.py -r mysqlreader -w hdfswriter

3.编写同步json脚本
4.确定HDFS上目标路径是否存在
5.通过datax.py指定json任务运行同步数据
6.数据验证,查看HDFS上是否已经有MySQL对应表中的所有数据

这里先跑通一个实验案例,再根据操作来总结

  • MySQL数据:
[wangting@hdt-dmcp-ops05 ~]$ mysql -uroot -p123456 -Dwangtingdb
mysql> select * from test;
+------+---------+
| id   | name    |
+------+---------+
|    1 | wang111 |
|    2 | wang222 |
|    3 | wang333 |
+------+---------+
3 rows in set (0.00 sec)
  • 同步任务定义:
[wangting@hdt-dmcp-ops05 datax]$ vim job/mysql2hdfs.json

mysql2hdfs.json内容:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["id","name"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://hdt-dmcp-ops05:3306/wangtingdb"],
                "table": ["test"]
              }
            ],
            "password": "123456",
            "username": "root",
            "splitPk": ""
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name": "id","type": "bigint"},
              {"name": "name","type": "string"}
            ],
            "compress": "gzip",
            "defaultFS": "hdfs://hdt-dmcp-ops01:8020",
            "fieldDelimiter": "\t",
            "fileName": "test",
            "fileType": "text",
            "path": "/test",
            "writeMode": "append"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "1"
      }
    }
  }
}
  • 任务执行:
[wangting@hdt-dmcp-ops05 datax]$ hdfs dfs -mkdir /test
2023-02-15 19:25:04,683 INFO Configuration.deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
[wangting@hdt-dmcp-ops05 datax]$ python bin/datax.py job/mysql2hdfs.json
...
...
2023-02-15 19:25:43.656 [job-0] INFO  JobContainer -
         [total cpu info] =>
                averageCpu                     | maxDeltaCpu                    | minDeltaCpu
                -1.00%                         | -1.00%                         | -1.00%


         [total gc info] =>
                 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime
                 PS MarkSweep         | 1                  | 1                  | 1                  | 0.022s             | 0.022s             | 0.022s
                 PS Scavenge          | 1                  | 1                  | 1                  | 0.010s             | 0.010s             | 0.010s

2023-02-15 19:25:43.656 [job-0] INFO  JobContainer - PerfTrace not enable!
2023-02-15 19:25:43.656 [job-0] INFO  StandAloneJobContainerCommunicator - Total 3 records, 24 bytes | Speed 2B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s|  All Task WaitReaderTime 0.000s | Percentage 100.00%
2023-02-15 19:25:43.657 [job-0] INFO  JobContainer -
任务启动时刻                    : 2023-02-15 19:25:32
任务结束时刻                    : 2023-02-15 19:25:43
任务总计耗时                    :                 11s
任务平均流量                    :                2B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   3
读写失败总数                    :                   0
  • 验证
[wangting@hdt-dmcp-ops05 datax]$ hdfs dfs -ls /test
2023-02-15 19:28:48,080 INFO Configuration.deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
Found 1 items
-rw-r--r--   3 wangting supergroup         43 2023-02-15 19:25 /test/test__34c4b765_ecd9_426f_b64e_96ef6f9dbba4.gz
[wangting@hdt-dmcp-ops05 datax]$ hdfs dfs -get /test/test__34c4b765_ecd9_426f_b64e_96ef6f9dbba4.gz ~/
2023-02-15 19:29:41,378 INFO Configuration.deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
2023-02-15 19:29:41,786 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
[wangting@hdt-dmcp-ops05 datax]$ cd ~
[wangting@hdt-dmcp-ops05 ~]$ ll
total 8
drwxrwxr-x 2 wangting wangting 4096 Feb 15 15:13 bin
-rw-r--r-- 1 wangting wangting   43 Feb 15 19:29 test__34c4b765_ecd9_426f_b64e_96ef6f9dbba4.gz
[wangting@hdt-dmcp-ops05 ~]$ gunzip test__34c4b765_ecd9_426f_b64e_96ef6f9dbba4.gz
[wangting@hdt-dmcp-ops05 ~]$ ll
total 8
drwxrwxr-x 2 wangting wangting 4096 Feb 15 15:13 bin
-rw-r--r-- 1 wangting wangting   30 Feb 15 19:29 test__34c4b765_ecd9_426f_b64e_96ef6f9dbba4
[wangting@hdt-dmcp-ops05 ~]$ cat test__34c4b765_ecd9_426f_b64e_96ef6f9dbba4
1       wang111
2       wang222
3       wang333

截至到这里,可以看到最终数据文件的内容和原MySQL数据匹配的上

总结:

**MysqlReader插件介绍:**实现了从Mysql读取数据。在底层实现上,MysqlReader通过JDBC连接远程Mysql数据库,并执行相应的SQL语句将数据从mysql库中select出来。

**MysqlReader插件原理:**MysqlReader通过JDBC连接器连接到远程的Mysql数据库,并根据用户配置的信息生成查询语句,然后发送到远程Mysql数据库,并将该SQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。

**HdfsWriter插件介绍:**提供项HDFS文件系统指定路径中写入TextTile和OrcFile类型的文件,文件内容可与Hive表相关联。

**HdfsWriter:**插件实现过程:首先根据用户指定的path,创建一个hdfs文件系统上的不存在的临时目录,创建规则是:path_随机;然后将读取的文件写入到这个临时目录中;待到全部写入后,再将这个临时目录下的文件移动到用户所指定的目录下,(在创建文件时保证文件名不重复);最后删除临时目录。如果在中间过程中发生网络中断等情况,造成无法与hdfs建立连接,需要用户手动删除已经写入的文件和临时目录

4-2.同步MySQl需求数据到HDFS案例(where)

相对上个案例的变化:

1.增加了where关键词,过滤同步的数据范围

2.去除了压缩格式:"compress": "gzip"

3.更换了分隔符,由原\t变成不可见分隔字符\u0001

[wangting@hdt-dmcp-ops05 datax]$ vim job/mysql2hdfs_2.json

mysql2hdfs_2.json脚本内容:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["id","name"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://hdt-dmcp-ops05:3306/wangtingdb"],
                "table": ["test"]
              }
            ],
            "password": "123456",
            "username": "root",
            "where": "id>=2",
            "splitPk": ""
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name": "id","type": "bigint"},
              {"name": "name","type": "string"}
            ],
            "defaultFS": "hdfs://hdt-dmcp-ops01:8020",
            "fieldDelimiter": "\u0001",
            "fileName": "test",
            "fileType": "text",
            "path": "/test",
            "writeMode": "append"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "1"
      }
    }
  }
}

执行任务:

[wangting@hdt-dmcp-ops05 datax]$ python bin/datax.py job/mysql2hdfs_2.json
...
...
2023-02-16 10:37:05.503 [job-0] INFO  JobContainer - PerfTrace not enable!
2023-02-16 10:37:05.504 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2 records, 16 bytes | Speed 1B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s|  All Task WaitReaderTime 0.000s | Percentage 100.00%
2023-02-16 10:37:05.505 [job-0] INFO  JobContainer -
任务启动时刻                    : 2023-02-16 10:36:54
任务结束时刻                    : 2023-02-16 10:37:05
任务总计耗时                    :                 11s
任务平均流量                    :                1B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   2
读写失败总数                    :                   0

验证:

[wangting@hdt-dmcp-ops05 datax]$ hdfs dfs -ls /test
2023-02-16 10:37:45,122 INFO Configuration.deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
Found 2 items
-rw-r--r--   3 wangting supergroup         20 2023-02-16 10:36 /test/test__00adf34b_234b_49d8_8ba6_bd3b94881224
-rw-r--r--   3 wangting supergroup         43 2023-02-15 19:25 /test/test__34c4b765_ecd9_426f_b64e_96ef6f9dbba4.gz
[wangting@hdt-dmcp-ops05 datax]$ hdfs dfs -cat /test/test__00adf34b_234b_49d8_8ba6_bd3b94881224
2023-02-16 10:38:07,714 INFO Configuration.deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
2023-02-16 10:38:08,111 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2wang222
3wang333

可以看到只有满足id>=2的2条数据被写入到了HDFS,直接看似乎没有分隔符,字符相连了,把文件下载到本地再次验证

[wangting@hdt-dmcp-ops05 datax]$ hdfs dfs -get /test/test__00adf34b_234b_49d8_8ba6_bd3b94881224 ~/
2023-02-16 10:38:39,679 INFO Configuration.deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
2023-02-16 10:38:40,079 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
[wangting@hdt-dmcp-ops05 datax]$ vim ~/test__00adf34b_234b_49d8_8ba6_bd3b94881224

注意,cat文件是看不到特殊分隔符的

4-3.同步MySQl需求数据到HDFS案例(传条件参数)

​ 在生产环境中,离线数据同步任务需要在任务调度平台每日定时重复执行去拉取某个时间窗口的数据,例如每日同步T-1的数据到HDFS,但脚本中如果写了固定日期,每日任务都需要修改日期条件,显然不合理。因此为实现这个业务需求,需要使用DataX的传参功能。

创建测试表:

[wangting@hdt-dmcp-ops05 datax]$ mysql -uroot -p123456
mysql> use wangtingdb;
mysql> create table test_2(id int(11),name varchar(20),updated datetime);
insert into test_2 value(1,"wangting111","2023-02-13 15:13:42");
insert into test_2 value(2,"wangting222","2023-02-13 21:22:12");
insert into test_2 value(3,"wangting333","2023-02-14 09:15:04");
insert into test_2 value(4,"wangting444","2023-02-14 18:00:32");
insert into test_2 value(5,"wangting555","2023-02-15 13:44:30");
insert into test_2 value(6,"wangting666","2023-02-15 22:13:41");
insert into test_2 value(7,"wangting777","2023-02-16 12:22:30");
insert into test_2 value(8,"wangting888","2023-02-16 23:14:52");

mysql> select * from test_2;
+------+-------------+---------------------+
| id   | name        | updated             |
+------+-------------+---------------------+
|    1 | wangting111 | 2023-02-13 15:13:42 |
|    2 | wangting222 | 2023-02-13 21:22:12 |
|    3 | wangting333 | 2023-02-14 09:15:04 |
|    4 | wangting444 | 2023-02-14 18:00:32 |
|    5 | wangting555 | 2023-02-15 13:44:30 |
|    6 | wangting666 | 2023-02-15 22:13:41 |
|    7 | wangting777 | 2023-02-16 12:22:30 |
|    8 | wangting888 | 2023-02-16 23:14:52 |
+------+-------------+---------------------+
8 rows in set (0.01 sec)

当前时间为20230216,

拟定2个变量:

START_FLAG=date -d"1 day ago" +%Y%m%d
END_FLAG=date +%Y%m%d

[wangting@hdt-dmcp-ops05 datax]$ date -d"1 day ago" +%Y%m%d
20230215
[wangting@hdt-dmcp-ops05 datax]$ date +%Y%m%d
20230216

编写同步脚本任务

[wangting@hdt-dmcp-ops05 datax]$ vim job/mysql2hdfs_3.json

mysql2hdfs_3.json脚本任务内容:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["id","name","updated"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://hdt-dmcp-ops05:3306/wangtingdb"],
                "table": ["test_2"]
              }
            ],
            "password": "123456",
            "username": "root",
            "where": "updated>=${START_FLAG} AND updated<${END_FLAG}",
            "splitPk": ""
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name": "id","type": "bigint"},
              {"name": "name","type": "string"}
              {"name": "updated","type": "string"}
            ],
            "defaultFS": "hdfs://hdt-dmcp-ops01:8020",
            "fieldDelimiter": "\t",
            "fileName": "test",
            "fileType": "text",
            "path": "/test",
            "writeMode": "append"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "1"
      }
    }
  }
}

"where": "updated>=${START_FLAG} AND updated<${END_FLAG}"

相当于updated大于等于2023-02-15 00:00:00,小于2023-02-16 00:00:00的数据

当前日期为2月16日,则意为着数据是前一天日内的全量数据

执行任务:

[wangting@hdt-dmcp-ops05 datax]$ python bin/datax.py -p "-DSTART_FLAG=`date -d"1 day ago" +%Y%m%d` -DEND_FLAG=`date +%Y%m%d`" job/mysql2hdfs_3.json
2023-02-16 16:58:22.742 [job-0] INFO  JobContainer - PerfTrace not enable!
2023-02-16 16:58:22.742 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2 records, 40 bytes | Speed 4B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s|  All Task WaitReaderTime 0.000s | Percentage 100.00%
2023-02-16 16:58:22.743 [job-0] INFO  JobContainer -
任务启动时刻                    : 2023-02-16 16:58:11
任务结束时刻                    : 2023-02-16 16:58:22
任务总计耗时                    :                 11s
任务平均流量                    :                4B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   2
读写失败总数                    :                   0

验证数据:

[wangting@hdt-dmcp-ops05 datax]$ hdfs dfs -ls /test
2023-02-16 16:58:37,819 INFO Configuration.deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
Found 3 items
-rw-r--r--   3 wangting supergroup         20 2023-02-16 10:36 /test/test__00adf34b_234b_49d8_8ba6_bd3b94881224
-rw-r--r--   3 wangting supergroup         43 2023-02-15 19:25 /test/test__34c4b765_ecd9_426f_b64e_96ef6f9dbba4.gz
-rw-r--r--   3 wangting supergroup         68 2023-02-16 16:58 /test/test__4ffd9840_bc1f_4370_88ab_75d9ee97f1f4
[wangting@hdt-dmcp-ops05 datax]$ hdfs dfs -cat /test/test__4ffd9840_bc1f_4370_88ab_75d9ee97f1f4
2023-02-16 16:58:49,450 INFO Configuration.deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
2023-02-16 16:58:49,845 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
5       wangting555     2023-02-15 13:44:30
6       wangting666     2023-02-15 22:13:41

可以看到数据只收取到了T-1日的2条数据

4-4.同步HDFS数据到MySQL案例

准备HDFS文件目录

[wangting@hdt-dmcp-ops05 datax]$ hdfs dfs -ls /test
2023-02-16 17:17:50,778 INFO Configuration.deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
Found 3 items
-rw-r--r--   3 wangting supergroup         20 2023-02-16 10:36 /test/test__00adf34b_234b_49d8_8ba6_bd3b94881224
-rw-r--r--   3 wangting supergroup         43 2023-02-15 19:25 /test/test__34c4b765_ecd9_426f_b64e_96ef6f9dbba4.gz
-rw-r--r--   3 wangting supergroup         68 2023-02-16 16:58 /test/test__4ffd9840_bc1f_4370_88ab_75d9ee97f1f4
[wangting@hdt-dmcp-ops05 datax]$ hdfs dfs -rm /test/test__00adf34b_234b_49d8_8ba6_bd3b94881224
2023-02-16 17:18:15,296 INFO Configuration.deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
Deleted /test/test__00adf34b_234b_49d8_8ba6_bd3b94881224
[wangting@hdt-dmcp-ops05 datax]$ hdfs dfs -rm /test/test__34c4b765_ecd9_426f_b64e_96ef6f9dbba4.gz
2023-02-16 17:18:31,868 INFO Configuration.deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
Deleted /test/test__34c4b765_ecd9_426f_b64e_96ef6f9dbba4.gz
[wangting@hdt-dmcp-ops05 datax]$ hdfs dfs -ls /test
2023-02-16 17:18:45,837 INFO Configuration.deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
Found 1 items
-rw-r--r--   3 wangting supergroup         68 2023-02-16 16:58 /test/test__4ffd9840_bc1f_4370_88ab_75d9ee97f1f4
[wangting@hdt-dmcp-ops05 datax]$ hdfs dfs -cat /test/test__4ffd9840_bc1f_4370_88ab_75d9ee97f1f4
2023-02-16 17:18:56,387 INFO Configuration.deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
2023-02-16 17:18:56,791 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
5       wangting555     2023-02-15 13:44:30
6       wangting666     2023-02-15 22:13:41

# /test目录下有一个test__4ffd9840_bc1f_4370_88ab_75d9ee97f1f4文件
# 文件中有2条数据

创建MySQL被导入的测试表

[wangting@hdt-dmcp-ops05 datax]$ mysql -uroot -p123456 -Dwangtingdb;

mysql> create table test_666 like test_2;
Query OK, 0 rows affected (0.01 sec)

mysql> desc test_666;
+---------+-------------+------+-----+---------+-------+
| Field   | Type        | Null | Key | Default | Extra |
+---------+-------------+------+-----+---------+-------+
| id      | int(11)     | YES  |     | NULL    |       |
| name    | varchar(20) | YES  |     | NULL    |       |
| updated | datetime    | YES  |     | NULL    |       |
+---------+-------------+------+-----+---------+-------+
3 rows in set (0.00 sec)

mysql> select * from test_666;
Empty set (0.00 sec)
# 当前test_666表为空,没有数据
[wangting@hdt-dmcp-ops05 datax]$ vim job/hdfs2mysql.json

hdfs2mysql.json任务内容:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "hdfsreader",
          "parameter": {
            "path": "/test",
            "defaultFS": "hdfs://hdt-dmcp-ops01:8020",
            "column": [
			{"index":0,"type":"string"},
			{"index":1,"type":"string"},
			{"index":2,"type":"string"}
			],
            "fileType": "text",
            "encoding": "UTF-8",
            "nullFormat": "\\N",
            "fieldDelimiter": "\t"
          }

        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "writeMode": "replace",
            "username": "root",
            "password": "123456",
            "column": [
              "id",
              "name",
              "updated"
            ],
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://hdt-dmcp-ops05:3306/wangtingdb?useUnicode=true&characterEncoding=utf-8",
                "table": [
                  "test_666"
                ]
              }
            ]
          }
        }
      }
    ]
  }
}

执行任务:

[wangting@hdt-dmcp-ops05 datax]$ python bin/datax.py job/hdfs2mysql.json
2023-02-16 17:19:56.672 [job-0] INFO  JobContainer - PerfTrace not enable!
2023-02-16 17:19:56.673 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2 records, 62 bytes | Speed 6B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s|  All Task WaitReaderTime 0.004s | Percentage 100.00%
2023-02-16 17:19:56.674 [job-0] INFO  JobContainer -
任务启动时刻                    : 2023-02-16 17:19:45
任务结束时刻                    : 2023-02-16 17:19:56
任务总计耗时                    :                 11s
任务平均流量                    :                6B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   2
读写失败总数                    :                   0

验证查看MySQL

[wangting@hdt-dmcp-ops05 datax]$ mysql -uroot -p123456 -Dwangtingdb -e "select * from test_666;"
mysql: [Warning] Using a password on the command line interface can be insecure.
+------+-------------+---------------------+
| id   | name        | updated             |
+------+-------------+---------------------+
|    5 | wangting555 | 2023-02-15 13:44:30 |
|    6 | wangting666 | 2023-02-15 22:13:41 |
+------+-------------+---------------------+

4-5.同步CSV文件数据倒MySQL案例

  • 准备一个csv文件用于同步数据
[wangting@hdt-dmcp-ops05 ~]$ cd /opt/module/datax/job/huatong_data/
[wangting@hdt-dmcp-ops05 huatong_data]$ vim prefecture_level_city_quarter.csv
"dbcode","code","cname","ayearmon","regcode","regname","cunit",data,"updatetime"
"djsjd","A0302","社会消费品零售总额_累计增长","2021D","511000","内江市","%",18.2,"2022-02-20 09:29:00"
"djsjd","A0302","社会消费品零售总额_累计增长","2021C","511000","内江市","%",21.1,"2021-11-08 08:14:05"
"djsjd","A0302","社会消费品零售总额_累计增长","2021B","511000","内江市","%",25.1,"2021-07-29 07:23:33"
"djsjd","A0302","社会消费品零售总额_累计增长","2021A","511000","内江市","%",29.9,"2021-07-29 07:23:30"
"djsjd","A0302","社会消费品零售总额_累计增长","2020D","511000","内江市","%",-3.2,"2021-08-02 11:42:00"
"djsjd","A0302","社会消费品零售总额_累计增长","2020B","511000","内江市","%",-7.9,"2021-08-02 11:41:56"
"djsjd","A0302","社会消费品零售总额_累计增长","2020A","511000","内江市","%",-11.9,"2021-08-02 11:41:55"
"djsjd","A0301","社会消费品零售总额_累计值","2021C","511000","内江市","亿元",446.38,"2021-11-08 08:14:05"
"djsjd","A0301","社会消费品零售总额_累计值","2020B","511000","内江市","亿元",232.43,"2021-08-02 11:41:56"
"djsjd","A0301","社会消费品零售总额_累计值","2020A","511000","内江市","亿元",106.81,"2021-08-02 11:41:55"
"djsjd","A0202","城镇常住居民人均可支配收入_累计增长","2016D","511000","内江市","%",8.53,"2018-12-13 05:21:45"
"djsjd","A0202","城镇常住居民人均可支配收入_累计增长","2022A","511000","内江市","%",6.4,"2022-05-11 02:11:47"
"djsjd","A0202","城镇常住居民人均可支配收入_累计增长","2021D","511000","内江市","%",8.9,"2022-05-27 02:14:21"
"djsjd","A0202","城镇常住居民人均可支配收入_累计增长","2021D","511000","内江市","%",8.9,"2022-02-23 08:23:30"
"djsjd","A0202","城镇常住居民人均可支配收入_累计增长","2021C","511000","内江市","%",9.8,"2022-02-23 09:49:27"

这里只提供部分样例数据用于调试

  • 创建MySQL库表用于写入数据
-- 创建db
CREATE DATABASE `huatongdata`;

use huatongdata;

-- 创建地级市季度表
create table prefecture_level_city_quarter(
dbcode varchar(50) comment "维度码",
code varchar(200) comment "指标编码",
cname varchar(200) comment "指标名称",
ayearmon varchar(50) comment "时间期",
regcode varchar(50) comment "地区编码",
regname varchar(200) comment "地区名称",
cunit varchar(80) comment "计量单位",
`data` decimal(38,8) comment "数值",
updatetime datetime comment "更新时间"
) comment "地级市季度表";

-- 刚创建的表目前为空
mysql> select * from prefecture_level_city_quarter;
Empty set (0.00 sec)
  • 编写任务脚本
[wangting@hdt-dmcp-ops05 datax]$ vim job/csv2mysql.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "txtfilereader", 
                    "parameter": {
                    "path": ["/opt/module/datax/job/huatong_data/prefecture_level_city_quarter.csv"], 
                    "encoding":"utf-8",
                        "column": [
                            {
                                "index": 0,
                                "type": "string"
                            },
                            {
                                "index": 1,
                                "type": "string"
                            },
                            {
                                "index": 2,
                                "type": "string"
                            },
							{
                                "index": 3,
                                "type": "string"
                            },
							{
                                "index": 4,
                                "type": "string"
                            },
							{
                                "index": 5,
                                "type": "string"
                            },
							{
                                "index": 6,
                                "type": "string"
                            },
							{
                                "index": 7,
                                "type": "string"
                            },
							{
                                "index": 8,
                                "type": "string"
                            }
                        ],
                    "skipHeader": "true"
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [
                            "dbcode",
                            "code",
                            "cname",
                            "ayearmon",
                            "regcode",
                            "regname",
                            "cunit",
                            "data",
                            "updatetime"
                        ], 
                        "connection": [
                            {
                               "jdbcUrl": "jdbc:mysql://hdt-dmcp-ops05:3306/huatongdata?useUnicode=true&characterEncoding=utf8", 
                               "table": ["prefecture_level_city_quarter"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root",
                        "preSql":[""],
                        "session":["set session sql_mode='ANSI'"],
                        "writeMode":"insert"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
  • 执行同步任务
[wangting@hdt-dmcp-ops05 datax]$ python bin/datax.py job/csv2mysql.json
...
...
2023-02-17 10:36:17.020 [job-0] INFO  JobContainer - PerfTrace not enable!
2023-02-17 10:36:17.021 [job-0] INFO  StandAloneJobContainerCommunicator - Total 173559 records, 10806586 bytes | Speed 1.03MB/s, 17355 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 3.118s |  All Task WaitReaderTime 0.268s | Percentage 100.00%
2023-02-17 10:36:17.022 [job-0] INFO  JobContainer -
任务启动时刻                    : 2023-02-17 10:36:06
任务结束时刻                    : 2023-02-17 10:36:17
任务总计耗时                    :                 10s
任务平均流量                    :            1.03MB/s
记录写入速度                    :          17355rec/s
读出记录总数                    :              173559
读写失败总数                    :                   0

读出记录总数 : 173559

说明本次任务同步到MySQL涉及到173559行

  • 验证
# 登录MySQL查询刚创建的prefecture_level_city_quarter地级市季度表查看数据量
mysql> select count(*) from prefecture_level_city_quarter;
+----------+
| count(*) |
+----------+
|   173559 |
+----------+
1 row in set (0.05 sec)

和datax记录总数可以对上,说明CSV文件全部都同步到MySQL

5.DataX常见的参数设置

5-1.加速相关配置

参数说明注意事项
job.setting.speed.channel设置并发数
job.setting.speed.record总record限速配置此参数,则必须配置单个channel的record限速参数
job.setting.speed.byte总byte限速配置此参数,则必须配置单个channel的byte限速参数
core.transport.channel.speed.record单个channel的record限速,默认10000条/s

【注意】:如果配置了总record限速和总byte限速,channel并发数就会失效。因为配置了这两个参数后,实际的channel并发数是通过计算得到的

5-2.运行内存调整

当提升DataX Job内的Channel并发数时,内存的占用会明显增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。

例如:channel中会有一个Buffer,作为临时的数据交换缓冲区,而在Reader和Write中,也会有一些buffer,为了防止OOM等错误,需要适当调大JVM堆内存

  • 永久修改

修改datax.py

# 找到DEFAULT_JVM相关内容更改:-Xms1g -Xmx1g
DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
  • 当前任务修改

启动时使用参数:python bin/datax.py --jvm = "-Xms8G -Xmx8G” job.json

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/351391.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JDK安装环境变量配置

jdk下载地址:https://www.oracle.com/java/technologies/javase/javase8-archive-downloads.html 下载安装包和exe都可。 安装JDK 安装JDK时&#xff0c;除了修改安装目录&#xff0c;其他的一路【下一步】&#xff0c;傻瓜式安装。 &#xff08;注&#xff1a;当提示安装JRE时…

WPF MVVM系统入门-上

WPF MVVM系统入门-上 Models:存放数据的模型&#xff0c;实体对象 Views:可视化界面 ViewModels:业务逻辑。ViewModels与Models的联系会更紧密&#xff0c;而Views页面会主动绑定ViewModels中的数据&#xff0c;原则上ViewModels不要直接去操作Views&#xff0c;被动的被Vie…

教唆chat ai 吵架--chatGPT和chatBing体验

教唆chat ai 吵架–chatGPT和chatBing体验 请注意&#xff0c;本文主观性非常高&#xff0c;只是一个参考性文章&#xff0c;无任何其他含义。 当我们谈到人工智能对话模型时&#xff0c;ChatGPT和ChatBing是两个备受关注的模型。它们都是自然语言处理领域中的重要里程碑&…

hive学习(仅供参考)

hive搭建Hive什么是hiveHive的优势和特点hive搭建解压、改名修改环境变量添加hive-site.xml将maven架包拷贝到hive替换一下gua包使环境变量生效初始化安装成功Hive 什么是hive 将结构化的数据文件映射为数据库表 提供类sql的查询语言HQL(Hive Query Language) Hive让更多的人…

【C++内存管理机制】学习笔记(4):重载operate new/::operator new..../new()

目录 简介C++应用程序 分配内存的途径重载::operator new/::operator delete重载operator new/operator delete重载new()/delete()结语简介 Hello! 非常感谢您阅读海轰的文章,倘若文中有错误的地方,欢迎您指出~ ଘ(੭ˊᵕˋ)੭ 昵称:海轰 标签:程序猿|C++选手|学生 简…

自学软件测试?一般人“别来沾边”...

本人7年测试经验&#xff0c;在学测试之前对电脑的认知也就只限于上个网&#xff0c;玩个办公软件。这里不能跑题&#xff0c;我为啥说&#xff1a;自学软件测试&#xff0c;一般人我还是劝你算了吧&#xff1f;因为我就是那个一般人&#xff01; 软件测试基础真的很简单&…

mac环境,安装NMP遇到的问题

一 背景 项目开发中,公司项目需要使用本地的环境运行,主要是php这块的业务。没有使用docker来处理,重新手动撸了一遍。记录下其中遇到的问题; 二 遇到的问题 2.1 Nginx的问题 brew install nginx后,启动nginx,报错如下:nginx: [emerg] no "ssl_certificate" …

数据结构与算法(二):线性表

上一篇《数据结构与算法&#xff08;一&#xff09;&#xff1a;概述》中介绍了数据结构的一些基本概念&#xff0c;并分别举例说明了算法的时间复杂度和空间复杂度的求解方法。这一篇主要介绍线性表。 一、基本概念 线性表是具有零个或多个数据元素的有限序列。线性表中数据…

零售电子标签解决方案

电子货架标签系统&#xff08;ESLs&#xff09;&#xff0c;是一种放置在货架上、可替代传统纸质价格标签的电子显示装置&#xff0c; 每一个电子货架标 签通过有线或者无线网络与商场计算机数据库相连&#xff0c; 并将最新的商品价格通过电子货架标签上的屏显示出来。 电子…

2023年数学建模美赛D题(Prioritizing the UN Sustainability Goals)分析与编程

2023年数学建模美赛D题分析建模与编程 重要说明&#xff1a; 本文介绍2023年美赛题目&#xff0c;并进行简单分析&#xff1b;本文首先对 D题进行深入分析&#xff0c;其它题目分析详见专题讨论&#xff1b;本文及专题分析将在 2月17日每3小时更新一次&#xff0c;完全免费&am…

使用chatgpt生成快速入眠笔记

以下是使用chatgpt生成快速入眠笔记的简单过程 可以发现&#xff0c;增加详细两个字&#xff0c;可以让它表述的更明白。 通过询问“还有其他方法吗”&#xff0c;获取更多可能性&#xff0c;当然你也可以直接说继续 但实测继续有时候不会记住上一条提问 详细讲解一下程序员怎…

类似LeetCode的登录页面(小程序版)

前言每一个项目都会有用户端的注册和登录页面&#xff0c;对于刚入门的小白来说&#xff0c;在UI设计方面不太擅长&#xff0c;就算大致的UI界面设计出来了&#xff0c;但是落实到代码上来实现的时候就很容易卡住。这篇博客主要介绍的就是仿作一个类似LeetCode登录的简约大方页…

离线环境轻量级自动化部署

流程图&#xff1a; 常规系统发布的痛点 服务器频繁重启&#xff0c;上面部署的应用服务不能随之重启&#xff0c;导致服务时常宕机应用手动部署相对比较麻烦&#xff0c;步骤繁琐应用发布环境取决于发布人本地环境&#xff0c;导致不同发布人每次发布环境不一致&#xff0c;导…

【玩转多核异构】双核高速率CAN-FD评测——飞凌嵌入式

为了能够让更多的工程师朋友了解多核异构处理器&#xff0c;飞凌嵌入式特别推出了【玩转多核异构】专题&#xff0c;帮助大家解决在多核异构处理器的开发过程中遇到的问题。【玩转多核异构】专题持续更新中&#xff0c;欢迎您的持续关注。引言凭借实时性、抗干扰性和安全性等优…

Redis 开发规范

原创 | Java 2021 超神之路&#xff0c;很肝~中文详细注释的开源项目RPC 框架 Dubbo 源码解析网络应用框架 Netty 源码解析消息中间件 RocketMQ 源码解析数据库中间件 Sharding-JDBC 和 MyCAT 源码解析作业调度中间件 Elastic-Job 源码解析分布式事务中间件 TCC-Transaction 源…

详细总结Ansible中使用playbook

文章目录前言一、Playbook的功能二、YAML三、playbook执行命令1.使用ansible-playbook部署ftp服务&#xff0c;并开启匿名用户访问权利2.使用ansible-playbook部署apache服务&#xff0c;设定默认发布文件内容为www.westos.org3.tags&#xff1a;标签四、使用vim解决yaml书写格…

使用git中可能出现的问题

问题1&#xff1a;如果遇到自己的文件在远程仓库dev分支被别人修改了&#xff0c;自己在本地仓库test分支继续在写代码先拉取最新的代码 覆盖本地dev分支 TortoiseGit->Pull被修改如图2.拉取最新的代码(拉取成功后 本地dev分支user有四条属性)3.切换到自己的分支tortoiseGit…

C++009-C++循环结构while

文章目录C009-C循环结构whilewhile循环while循环举例题目描述 对折多少次能超过nmm题目描述 输入整数和超过n题目描述 输入若干个大写字母&#xff0c;输出对应的小写字母题目描述 输入整数&#xff0c;逆序输出作业在线练习&#xff1a;总结C009-C循环结构while 在线练习&…

模型转换 PyTorch转ONNX 入门

前言 本文主要介绍如何将PyTorch模型转换为ONNX模型&#xff0c;为后面的模型部署做准备。转换后的xxx.onnx模型&#xff0c;进行加载和测试。最后介绍使用Netron&#xff0c;可视化ONNX模型&#xff0c;看一下网络结构&#xff1b;查看使用了那些算子&#xff0c;以便开发部署…

计算机网络第1章(概述)学习笔记

❤ 作者主页&#xff1a;欢迎来到我的技术博客&#x1f60e; ❀ 个人介绍&#xff1a;大家好&#xff0c;本人热衷于Java后端开发&#xff0c;欢迎来交流学习哦&#xff01;(&#xffe3;▽&#xffe3;)~* &#x1f34a; 如果文章对您有帮助&#xff0c;记得关注、点赞、收藏、…