DataX 源码调试及打包

news2025/1/14 1:19:44

文章目录

        • 1、源码分析
        • 2、打包
        • 3、任务测试
        • 4、job配置详解
          • Reader(读插件)
          • Writer(写插件)
          • 通用配置

前文回顾:
《DataX 及 DataX-Web 安装使用详解》

除了前文介绍的我们可以直接安装使用外,还可以下载源码打包,并且对源码进行适当修改,比如我这里在同步数据到 oracle 时,由于 datax 只能做 oracle 的 insert,如果主键重复的数据就冲突了。

源码地址:https://github.com/alibaba/DataX


1、源码分析

找到 core 这个包下的datax.py

ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s  ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (DEFAULT_PROPERTY_CONF, CLASS_PATH)

可以看到程序入口是com.alibaba.datax.core.Engine,进入这个类,main方法如下

    public static void main(String[] args) throws Exception {
        int exitCode = 0;
        try {
            Engine.entry(args);
        } catch (Throwable e) {
            exitCode = 1;
            LOG.error("\n\n经DataX智能分析,该任务最可能的错误原因是:\n" + ExceptionTracker.trace(e));

            if (e instanceof DataXException) {
                DataXException tempException = (DataXException) e;
                ErrorCode errorCode = tempException.getErrorCode();
                if (errorCode instanceof FrameworkErrorCode) {
                    FrameworkErrorCode tempErrorCode = (FrameworkErrorCode) errorCode;
                    exitCode = tempErrorCode.toExitValue();
                }
            }
            System.exit(exitCode);
        }
        System.exit(exitCode);
    }

然后就,就自己顺着看吧。


2、打包

打包的时候测试失败导致打包失败,所以跳过测试打包,idea控制台输入如下命令。

mvn -U clean package assembly:assembly '-Dmaven.test.skip=true'

在这里插入图片描述

打包成功后的 datax 包位于 {DataX_source_code_home}/target/datax/datax/,结构如下

在这里插入图片描述

最后执行下自检脚本

python datax.py ../job/job.json

我这里执行任务时,提示了如下错误

[job-0] ERROR Engine -
经DataX智能分析,该任务最可能的错误原因是:
com.alibaba.datax.common.exception.DataXException: Code:[Framework-03], Description:[DataX引擎配置错误,该问题通常是由于DataX安装错误引起,请联系您的运维解决 .].  - 在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数
at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:30)
at com.alibaba.datax.core.job.JobContainer.adjustChannelNumber(JobContainer.java:430)
at com.alibaba.datax.core.job.JobContainer.split(JobContainer.java:387)
at com.alibaba.datax.core.job.JobContainer.start(JobContainer.java:117)
at com.alibaba.datax.core.Engine.start(Engine.java:93)
at com.alibaba.datax.core.Engine.entry(Engine.java:175)
at com.alibaba.datax.core.Engine.main(Engine.java:208)

原因:DataX的配置有问题,单个 channel 的 bps 值不能为空,也不能为非正数。所以查看 datax 源码,core\src\main\conf下的 core.json 文件,内容如下。

注:如果是安装的 datax,则位置在 datax/conf/core.json。

{
    "entry": {
        "jvm": "-Xms1G -Xmx1G",
        "environment": {}
    },
    "common": {
        "column": {
            "datetimeFormat": "yyyy-MM-dd HH:mm:ss",
            "timeFormat": "HH:mm:ss",
            "dateFormat": "yyyy-MM-dd",
            "extraFormats":["yyyyMMdd"],
            "timeZone": "GMT+8",
            "encoding": "utf-8"
        }
    },
    "core": {
        "dataXServer": {
            "address": "http://localhost:7001/api",
            "timeout": 10000,
            "reportDataxLog": false,
            "reportPerfLog": false
        },
        "transport": {
            "channel": {
                "class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
                "speed": {
                    "byte": -1,
                    "record": -1
                },
                "flowControlInterval": 20,
                "capacity": 512,
                "byteCapacity": 67108864
            },
            "exchanger": {
                "class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger",
                "bufferSize": 32
            }
        },
        "container": {
            "job": {
                "reportInterval": 10000
            },
            "taskGroup": {
                "channel": 5
            },
            "trace": {
                "enable": "false"
            }

        },
        "statistics": {
            "collector": {
                "plugin": {
                    "taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector",
                    "maxDirtyNumber": 10
                }
            }
        }
    }
}

可以看到core -> transport -> channel -> speed -> byte的默认值为:-1

解决办法

修改内容:将byte值设置为: 1048576,代表单个channel容纳的最多字节数,可以适当调大,不oom就行。

"speed": {
  "byte": 1048576,
  "record": -1
},

3、任务测试

创建一个job,json文件内容如下

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3,
        "byte": 1048576
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "chaodev123",
            "column": [
              "`id`",
              "`user_id`",
              "`message`",
              "`send_date`",
              "`channel`",
              "`sendUser`",
              "`receiver`",
              "`contentType`",
              "`pictureUrl`",
              "`longitude`",
              "`latitude`",
              "`coordinateType`",
              "`isRead`",
              "`msgType`",
              "`readTime`",
              "`create_time`",
              "`state`",
              "`fileName`",
              "`fileDownloadUrl`"
            ],
            "splitPk": "",
            "connection": [
              {
                "table": [
                  "message"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://192.168.152.40:3306/im"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "root",
            "password": "123456",
            "column": [
              "`id`",
              "`user_id`",
              "`message`",
              "`send_date`",
              "`channel`",
              "`sendUser`",
              "`receiver`",
              "`contentType`",
              "`pictureUrl`",
              "`longitude`",
              "`latitude`",
              "`coordinateType`",
              "`isRead`",
              "`msgType`",
              "`readTime`",
              "`create_time`",
              "`state`",
              "`fileName`",
              "`fileDownloadUrl`"
            ],
			"preSql": [
                "truncate table message"
            ],
            "connection": [
              {
                "table": [
                  "message"
                ],
                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/im"
              }
            ]
          }
        }
      }
    ]
  }
}

执行任务,如下

python datax.py ../job/im_message.json

可以看到,任务执行成功。

在这里插入图片描述


4、job配置详解

Reader(读插件)
  • name:与要读取的数据库一致(字符串)
  • jdbcUrl:数据库链接(数组)
  • username:数据库用户名(字符串)
  • password:数据库密码(字符串)
  • table:要同步的表名(数组),需保证表结构一致
  • column:要同步的列名(数组)
  • where:选取的条件(字符串)
  • querySql:自定义查询语句, 会自动忽略上述的同步条件

Writer(写插件)
  • name:与要读取的数据库一致(字符串)
  • jdbcUrl:数据库链接(字符串)
  • username:数据库用户名( 字符串)
  • password: 数据库密码 (字符串)
  • table:要同步的表名(数组),需保证表结构一致
  • column :列名可以不对应,但是类型和总的个数要一致( 数组),需保证表结构一致
  • preSql: 写入前执行的语句(数组),比如清空表等,如TRUNCATE TABLE @table(或指定表名)
  • postSql : 写入后执行的语句 (数组)
  • writeMode:写入模式,默认insert ,可选(insert/replace/update)。
    insert 模式就是直接插入,如果主键冲突就无法插入;replace 模式如果存在记录则先删除该条记录再插入;update 模式则是有重复进行更新。需要注意的是oracle只支持insert配置项。
  • session:DataX在获取Mysql连接时,执行session指定的SQL语句,修改当前connection session属性。
  • batchSize:默认值1024,一次性批量提交的记录数大小,该值可以极大减少DataX与数据库交互次数,但是该值设置过大可能会造成OOM。

通用配置
  • job.setting.speed(流量控制):channel的值控制同步时的并发数,byte的值控制单个channel容纳的最多字节数。
  • job.setting.errorLimit(脏数据控制):对脏数据的自定义监控和告警,包括对脏数据最大记录数阈值(record值)或者脏数据占比阈值(percentage值),当Job传输过程出现的脏数据大于指定的数量/百分比,DataX Job报错退出。

后续继续更新datax-web源码打包、二次开发支持Oracle更新数据等,如果觉得有帮助就给大佬超点个关注点个赞吧。


更多技术干货,请持续关注程序员大佬超。
原创不易,转载请注明出处。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/72539.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一文解决Kubernetes 的 API 流量查看器 - Kubeshark

一、Kubeshark 是什么? Kubeshark 由 2021 年 UP9 公司开源的 K8s API 流量查看器 Mizu 发展而来,试图成为一款 K8s 全过程流量监控工具。 Kubeshark 被叫做 kubernetes 的 API 流量查看器,它提供对进出 Kubernetes 集群内容器和 pod 的所有…

[附源码]JAVA毕业设计微服务的高校二手交易平台(系统+LW)

[附源码]JAVA毕业设计微服务的高校二手交易平台(系统LW) 项目运行 环境项配置: Jdk1.8 Tomcat8.5 Mysql HBuilderX(Webstorm也行) Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。 …

多线程与高并发(14)——Executor框架(线程池基础)

一、简介 线程是什么,线程是一个任务的工作单元和执行单元。我们在用线程的时候知道,要创建线程,再执行线程。如果任务多的情况呢,会有大量的创建销毁线程的资源消耗,这时候就引入了线程池的概念。 JDK5开始&#xff…

使用Maven创建Servlet项目

创建Maven项目 点击FIle, 选择new ,选择Project… 选择Maven 然后点击next. 选择自己想要创建项目的目录.点击next 引入依赖 在pom.xml中添加servlet依赖. 先书写dependencies标签.然后在 Maven中央仓库 中找到servlet的依赖.复制填写进去. 这里是我常用的一个 Maven中央仓库…

TI Lab_SRR学习_3 速度扩展_2 interChirpProcessing_RangeDPU

RangeProcDSP共分为三步,如下图所示 transfers ADCBuf data through dataIn EDMA channels in ping/pong alternate order to FFT input scratch buffer - adcDataIn.Range FFT processing is done by using DSPlib and mmwavelib APIs. FFT input data is stored in input sc…

[附源码]计算机毕业设计JAVA中青年健康管理监测系统

[附源码]计算机毕业设计JAVA中青年健康管理监测系统 项目运行 环境配置: Jdk1.8 Tomcat7.0 Mysql HBuilderX(Webstorm也行) Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。 项目技术: SSM …

Python 实战分析某招聘网站数据分析岗位的招聘情况

前言 嗨喽~大家好呀,这里是魔王呐 ❤ ~ 今天案例难度指数: ☆☆☆ 准备 环境使用: Anaconda (python3.9) –>识别我们写的代码 开发工具: jupyter notebook –>代码编辑功能敲代码的工具 相关模块: seaborn pandas …

在CentOS7.9系统上安装N卡3060驱动、CUDA和离线升级gcc(4.8—>8.3)用以编译框架的过程记录

1、更换yum源 主要是在终端操作需要,在显示器界面可以直接联网解决网络问题进行软件安装或更新 """"备份原来的源""" mv /etc/yum.repos.d/ /etc/yum.repos.d.bak/ mkdir /etc/yum.repos.d vim /etc/yum.repos.d/xxx.repo &qu…

LeetCode HOT 100 —— 155.最小栈

题目 设计一个支持 push ,pop ,top 操作,并能在常数时间内检索到最小元素的栈。 实现 MinStack 类: MinStack() 初始化堆栈对象。 void push(int val) 将元素val推入堆栈。 void pop() 删除堆栈顶部的元素。 int top() 获取堆栈顶部的元素。 …

JavaScript 删除对象中的某一项

delete let obj{a:1,b:2,c:3,d:4,e:5,f:6} delete obj.b console.log(obj)运行结果: Reflect.deleteProperty JavaScript 中的静态方法 Reflect.deleteProperty() 允许用于删除对象上的属性。它很像 deleteoperator,但它是一个函数。 Reflect.delet…

Python操作Azure Blob Storage

安装 Azure Storage SDK for Python 最简单的方式是在安装了 python 和 pip 的机器上直接执行下面的命令: pip install azure-storage 安装完成后通过 pip freeze 命令查看安装的版本: 由于 Azure Storage SDK for Python 是一个开源项目,…

Buildroot系列开发(七)block device

摘自:百问网 文章目录1.块设备2.1 什么是块设备?raw flash?2.2 block device 列表2.3 块设备分区2.4 传输数据到块设备2. 块设备文件系统2.1 支持的块设备文件系统2.2 linux / unix 其他日志文件系统2.3 F2FS2.4 SquashFS2.5 如何选择最佳文件…

计算机网络-转发表和路由选择协议

有志者,事竟成 文章目录一、描述1、转发表和路由选择协议二、总结一、描述 1、转发表和路由选择协议 前面我们说过,路由器从与它相连的一条通信链路得到分组,然后向与它相连的另一条通信链路转发该分组。但是路由器怎样决定它应当向哪条链路…

Docker[4]-Docker数据卷

数据卷 前面我们介绍了镜像和容器,通过镜像我们可以启动多个容器,但是我们发现当我们的容器停止获取删除后,我们在容器中的应用的一些数据也丢失了,这时为了解决容器的数据持久化,我们需要通过容器数据卷来解决这个问…

一文4000字教你如何使用可视化的Docker进行UI自动化测试

随着 docker 的发展,很多测试的同学也已经在测试工作上使用 dockr 作为环境基础去进行一些自动化测试,这篇文章主要讲述我们在 docker 中使用浏览器进行自动化测试如果可以实现可视化,同时可以对浏览器进行相关的操作。 开篇 首先我们先了解…

【C++学习笔记】C++编程环境配置

g跟gcc之间是否有依赖关系 g跟gcc之间没有依赖关系,两者分别对应面向C和C语言的编译程序,关于gcc和g的区别,请参考知乎回答《gcc和g是什么关系? ——gcc 和 g 的区别》 1 Ubuntu环境配置 Ubuntu官方源提供gcc和g预编译版本 Ub…

用一张图说一说 ChatGPT 内部技术工作流程

前沿 这几天ChatGPT可谓是热火朝天,很多同事和朋友都来找到勇哥,说能不能说一说相关话题,但是之前几天勇哥都在默默的干一件大事情,今天终于成型、有结果了,所有就抽了点时间来和大家一起聊聊ChatGPT背后的技术&#…

.net开发安卓入门 - 布局与样式(像素单位px、dp、sp的区别)

.net开发安卓入门 - 布局与样式布局LinearLayoutRelativeLayoutTableLayoutRecyclerViewListViewGridViewGridLayoutTabbed Layouts主题 Material Theme主题应用程序主题活动像素pxdpdipsp常用UI框架推荐常用动画推荐布局 布局用于排列构成屏幕的 UI 界面的元素 (,…

没有公网IP,怎样远程查看视频监控?

视频监控通常被称作“第三只眼”。如今,除了最基础的安防需求外,视频监控在不同的应用场景延伸出了各种各样的功能需求,并且正与日俱增。 常见的家庭应用场景,如照看老人小孩、宠物等;常见的公司应用场景,如…

vue的script动态改css、scss变量方法

解决场景&#xff1a;script设颜色变量&#xff0c;<style>的background-color的值"#ddd"的跟着变 序 1、这篇博文适用vue2和vue3版本&#xff0c;博主实验时&#xff0c;vue3的版本是^3.2.45 2、 其实要解决的方案在vue3里有一个专栏“单文件组件的 <…