Datax遇到的坑

news2025/1/14 9:52:00

公司数据中台产品,要使用airflow调datax任务实现离线作业的同步。

一、python版本问题

执行python ..datax.py  .json时 报错

在运行 Python 脚本时,代码中使用了 Python 2 的 print语法,当前的环境是 Python 3。在 Python 3 中,print 是一个函数,因此需要使用括号来调用它。

直接把datax.py替换了

#!/usr/bin/env python
# -*- coding:utf-8 -*-


"""
   Life's short, Python more.
"""

import sys
import os
import signal
import subprocess

import time
import re
import socket
import json
from optparse import OptionParser
from optparse import OptionGroup
from string import Template
import codecs
import platform

def isWindows():
    return platform.system() == 'Windows'

DATAX_HOME = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

DATAX_VERSION = 'DATAX-OPENSOURCE-3.0'
if isWindows():
    codecs.register(lambda name: name == 'cp65001' and codecs.lookup('utf-8') or None)
    CLASS_PATH = ("%s/lib/*") % (DATAX_HOME)
else:
    CLASS_PATH = ("%s/lib/*:.") % (DATAX_HOME)
LOGBACK_FILE = ("%s/conf/logback.xml") % (DATAX_HOME)
DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
DEFAULT_PROPERTY_CONF = "-Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=%s -Dlogback.configurationFile=%s" % (
    DATAX_HOME, LOGBACK_FILE)
ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s  ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (
    DEFAULT_PROPERTY_CONF, CLASS_PATH)
REMOTE_DEBUG_CONFIG = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=9999"

RET_STATE = {
    "KILL": 143,
    "FAIL": -1,
    "OK": 0,
    "RUN": 1,
    "RETRY": 2
}


def getLocalIp():
    try:
        return socket.gethostbyname(socket.getfqdn(socket.gethostname()))
    except:
        return "Unknown"


def suicide(signum, e):
    global child_process
    print >> sys.stderr, "[Error] DataX receive unexpected signal %d, starts to suicide." % (signum)

    if child_process:
        child_process.send_signal(signal.SIGQUIT)
        time.sleep(1)
        child_process.kill()
    print >> sys.stderr, "DataX Process was killed ! you did ?"
    sys.exit(RET_STATE["KILL"])


def register_signal():
    if not isWindows():
        global child_process
        signal.signal(2, suicide)
        signal.signal(3, suicide)
        signal.signal(15, suicide)


def getOptionParser():
    usage = "usage: %prog [options] job-url-or-path"
    parser = OptionParser(usage=usage)

    prodEnvOptionGroup = OptionGroup(parser, "Product Env Options",
                                     "Normal user use these options to set jvm parameters, job runtime mode etc. "
                                     "Make sure these options can be used in Product Env.")
    prodEnvOptionGroup.add_option("-j", "--jvm", metavar="<jvm parameters>", dest="jvmParameters", action="store",
                                  default=DEFAULT_JVM, help="Set jvm parameters if necessary.")
    prodEnvOptionGroup.add_option("--jobid", metavar="<job unique id>", dest="jobid", action="store", default="-1",
                                  help="Set job unique id when running by Distribute/Local Mode.")
    prodEnvOptionGroup.add_option("-m", "--mode", metavar="<job runtime mode>",
                                  action="store", default="standalone",
                                  help="Set job runtime mode such as: standalone, local, distribute. "
                                       "Default mode is standalone.")
    prodEnvOptionGroup.add_option("-p", "--params", metavar="<parameter used in job config>",
                                  action="store", dest="params",
                                  help='Set job parameter, eg: the source tableName you want to set it by command, '
                                       'then you can use like this: -p"-DtableName=your-table-name", '
                                       'if you have mutiple parameters: -p"-DtableName=your-table-name -DcolumnName=your-column-name".'
                                       'Note: you should config in you job tableName with ${tableName}.')
    prodEnvOptionGroup.add_option("-r", "--reader", metavar="<parameter used in view job config[reader] template>",
                                  action="store", dest="reader",type="string",
                                  help='View job config[reader] template, eg: mysqlreader,streamreader')
    prodEnvOptionGroup.add_option("-w", "--writer", metavar="<parameter used in view job config[writer] template>",
                                  action="store", dest="writer",type="string",
                                  help='View job config[writer] template, eg: mysqlwriter,streamwriter')
    parser.add_option_group(prodEnvOptionGroup)

    devEnvOptionGroup = OptionGroup(parser, "Develop/Debug Options",
                                    "Developer use these options to trace more details of DataX.")
    devEnvOptionGroup.add_option("-d", "--debug", dest="remoteDebug", action="store_true",
                                 help="Set to remote debug mode.")
    devEnvOptionGroup.add_option("--loglevel", metavar="<log level>", dest="loglevel", action="store",
                                 default="info", help="Set log level such as: debug, info, all etc.")
    parser.add_option_group(devEnvOptionGroup)
    return parser

def generateJobConfigTemplate(reader, writer):
    readerRef = "Please refer to the %s document:\n     https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n" % (reader,reader,reader)
    writerRef = "Please refer to the %s document:\n     https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n " % (writer,writer,writer)
    print(readerRef)
    print(writerRef)
    jobGuid = 'Please save the following configuration as a json file and  use\n     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json \nto run the job.\n'
    print(jobGuid)
    jobTemplate={
      "job": {
        "setting": {
          "speed": {
            "channel": ""
          }
        },
        "content": [
          {
            "reader": {},
            "writer": {}
          }
        ]
      }
    }
    readerTemplatePath = "%s/plugin/reader/%s/plugin_job_template.json" % (DATAX_HOME,reader)
    writerTemplatePath = "%s/plugin/writer/%s/plugin_job_template.json" % (DATAX_HOME,writer)
    try:
      readerPar = readPluginTemplate(readerTemplatePath);
    except Exception as e:
       print("Read reader[%s] template error: can\'t find file %s" % (reader,readerTemplatePath))
    try:
      writerPar = readPluginTemplate(writerTemplatePath);
    except Exception as e:
      print("Read writer[%s] template error: : can\'t find file %s" % (writer,writerTemplatePath))
    jobTemplate['job']['content'][0]['reader'] = readerPar;
    jobTemplate['job']['content'][0]['writer'] = writerPar;
    print(json.dumps(jobTemplate, indent=4, sort_keys=True))

def readPluginTemplate(plugin):
    with open(plugin, 'r') as f:
            return json.load(f)

def isUrl(path):
    if not path:
        return False

    assert (isinstance(path, str))
    m = re.match(r"^http[s]?://\S+\w*", path.lower())
    if m:
        return True
    else:
        return False


def buildStartCommand(options, args):
    commandMap = {}
    tempJVMCommand = DEFAULT_JVM
    if options.jvmParameters:
        tempJVMCommand = tempJVMCommand + " " + options.jvmParameters

    if options.remoteDebug:
        tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIG
        print('local ip: ', getLocalIp())

    if options.loglevel:
        tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options.loglevel))

    if options.mode:
        commandMap["mode"] = options.mode

    # jobResource 可能是 URL,也可能是本地文件路径(相对,绝对)
    jobResource = args[0]
    if not isUrl(jobResource):
        jobResource = os.path.abspath(jobResource)
        if jobResource.lower().startswith("file://"):
            jobResource = jobResource[len("file://"):]

    jobParams = ("-Dlog.file.name=%s") % (jobResource[-20:].replace('/', '_').replace('.', '_'))
    if options.params:
        jobParams = jobParams + " " + options.params

    if options.jobid:
        commandMap["jobid"] = options.jobid

    commandMap["jvm"] = tempJVMCommand
    commandMap["params"] = jobParams
    commandMap["job"] = jobResource

    return Template(ENGINE_COMMAND).substitute(**commandMap)


def printCopyright():
    print('''
DataX (%s), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
''' % DATAX_VERSION)
    sys.stdout.flush()


if __name__ == "__main__":
    printCopyright()
    parser = getOptionParser()
    options, args = parser.parse_args(sys.argv[1:])
    if options.reader is not None and options.writer is not None:
        generateJobConfigTemplate(options.reader,options.writer)
        sys.exit(RET_STATE['OK'])
    if len(args) != 1:
        parser.print_help()
        sys.exit(RET_STATE['FAIL'])

    startCommand = buildStartCommand(options, args)
    # print startCommand

    child_process = subprocess.Popen(startCommand, shell=True)
    register_signal()
    (stdout, stderr) = child_process.communicate()

    sys.exit(child_process.returncode)

二、数据库连接失败

我尝试跑通MySQL2MySQL

报错:com.alibaba.datax.common.exception.DataXException: Code:[DBUtilErrorCode-10], Description:[连接数据库失败. 请检查您的 账号、密码、数据库名称、IP、Port或者向 DBA 寻求帮助(注意网络环境).].  -  具体错误信息为:com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Could not create connection to database server.

我尝试了各种方法,账号密码数据库名称都对,就是报错。

原来是libs的问题,/datax/plugin/writer/mysqlwriter/libs下原本只有mysql-connector-java-5.1.34.jar

,我给他上传了各种驱动mysql-connector-java-5.1.49.jar、mysql-connector-java-8.0.18.jar、mysql-connector-java-8.0.20.jar,终于成功。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2253868.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

容易被遗忘的测试用例

网络服务器启动了吗&#xff1f;应用程序服务器启动了吗&#xff1f;数据库上线了吗&#xff1f;测试数据是否预先加载到数据库中&#xff1f;每当我们准备开始测试应用程序时&#xff0c;一切都应该已经准备妥当。 然而&#xff0c;当测试开始后&#xff0c;我们可能会漏掉一些…

机器学习与深度学习-2-Softmax回归从零开始实现

机器学习与深度学习-2-Softmax回归从零开始实现 1 前言 内容来源于沐神的《动手学习深度学习》课程&#xff0c;本篇博客对于Softmax回归从零开始实现进行重述&#xff0c;依旧是根据Python编程的PEP8规范&#xff0c;将沐神的template代码进行简单的修改。近期有点懒散哈哈哈…

文本生成类(机器翻译)系统评估

在机器翻译任务中常用评价指标&#xff1a;BLEU、ROGUE、METEOR、PPL。 这些指标的缺点&#xff1a;只能反应模型输出是否类似于测试文本。 BLUE&#xff08;Bilingual Evaluation Understudy&#xff09;&#xff1a;是用于评估模型生成的句子(candidate)和实际句子(referen…

保护数字资产:iOS 加固在当前安全环境中的重要性

随着互联网和手机的发展&#xff0c;APP在我们的日常生活中已经变得无处不在&#xff0c;各大平台的应用程序成为了黑客攻击的主要目标。尤其在 2024 年&#xff0c;随着数据泄露和隐私侵犯事件的频发&#xff0c;手机应用的安全问题再次成为公众关注的焦点。近期&#xff0c;多…

基于HTML和CSS的校园网页设计与实现

摘要 随着计算机、互联网与通信技术的进步&#xff0c;Internet在人们的学习、工作和生活中的地位也变得越来越高&#xff0c;校园网站已经成为学校与学生&#xff0c;学生与学生之间交流沟通的重要平台&#xff0c;对同学了解学校内发生的各种事情起到了重要的作用。学校网站…

Secured Finance 推出 TVL 激励计划以及基于 FIL 的稳定币

Secured Finance 是新一代 DeFi 2.0 协议&#xff0c;其正在推出基于 FIL 的稳定币、固定收益市场以及具有吸引力的 TVL 激励计划&#xff0c;以助力 Filecoin 构建更强大的去中心化金融生态体系&#xff0c;并为 2025 年初 Secured Finance 协议代币的推出铺平道路。Secure Fi…

WebRover :一个功能强大的 Python 库,用于从 Web 内容生成高质量的数据集,专为训练大型语言模型和 AI 应用程序而设计。

2024-11-30 &#xff0c;由Area-25团队开发的一个专门用于生成高质量网络内容数据集的Python库。该数据集旨在为大型语言模型&#xff08;LLM&#xff09;和人工智能应用的训练提供丰富的数据资源。 数据集地址&#xff1a;WebRover Dataset|自然语言处理数据集|AI模型训练数据…

基于ZYNQ-7000系列的FPGA学习笔记7——按键控制蜂鸣器(模块化编写)

基于ZYNQ-7000系列的FPGA学习笔记7——按键控制蜂鸣器&#xff08;模块化编写&#xff09; 1. 实验要求2. 功能分析3. 模块设计4. 波形图4.1 按键消抖模块4.2 按键控制蜂鸣器模块 5.代码编写5.1 rtl代码5.2 测试代码 6. 代码仿真7. 添加约束文件并分析综合 在上期的内容中&…

Android 分词的两种方式

前言&#xff1a; 本文分别介绍了原生和三方(Jieba)两种分词方式的使用和注意事项 1、安卓原生BreakIterator分词 比较简单&#xff0c;但是效果不太行 /*** 功能&#xff1a;原生分词* 参数&#xff1a;text&#xff1a;需要分词的语句* 返回值&#xff1a;return&#xf…

python之Django连接数据库

文章目录 连接Mysql数据库安装Mysql驱动配置数据库信息明确连接驱动定义模型在模型下的models.py中定义表对象在settings.py 中找到INSTALLED_APPS添加创建的模型 测试testdb.py中写增删改查操作urls.py添加请求路径启动项目进行测试 连接Mysql数据库 安装Mysql驱动 pip inst…

JavaWeb学习(1)(同步或异步请求、依赖jQuery简单实现Ajax技术)

目录 一、Web的基本流程与页面局部刷新。 &#xff08;1&#xff09;web开发时基本流程。 &#xff08;2&#xff09;页面的"全局刷新"与"局部刷新"。 二、Ajax技术。 &#xff08;1&#xff09;基本介绍。 &#xff08;2&#xff09;基本特点。 1、与服务…

spark sql 环境安装,java 默认路径和 安装配置!

yum安装java 查看默认路径 update-alternatives --config java # Java 环境变量 export JAVA_HOME/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.412.b08-1.el7_9.x86_64/jreexport PATH$JAVA_HOME/bin:$PATH# Spark 环境变量 export SPARK_HOME/home/vagrant/soft/sparkexport PATH…

网络层总结

网络层任务&#xff1a; 分组 从源主机 经多个网络/多段链路 传输到目的主机 两种重要的功能&#xff1a; 分组转发、 路由选择 网络层向其上层提供的两种服务 —— 面向连接的虚电路服务、无连接的数据报服务 面向连接的虚电路服务&#xff1a; 可靠通…

python学习笔记15 python中的类

上一篇我们介绍了python中的库 &#xff0c;学习了一些常见的内置库。详细内容可点击–>python学习笔记14 python中的库&#xff0c;常见的内置库&#xff08;random、hashlib、json、时间、os&#xff09; 这一篇我们来看一下python中的类 创建一个类 class 类的名称():de…

MySQL数据集成到广东省追溯平台的销售信息同步方案

销售信息同步--外购上报流程2&#xff1a;MySQL数据集成到广东省特殊食品电子追溯平台 在现代数据驱动的业务环境中&#xff0c;确保销售信息的准确性和及时性至关重要。本文将分享一个具体的技术案例&#xff0c;展示如何通过轻易云数据集成平台&#xff0c;将MySQL中的销售信…

【推荐算法】推荐系统的评估

这篇文章是笔者阅读《深度学习推荐系统》第五章推荐系统的评估的学习笔记&#xff0c;在原文的基础上增加了自己的理解以及内容的补充&#xff0c;在未来的日子里会不断完善这篇文章的相关工作。 文章目录 离线评估划分数据集方法客观评价指标P-R曲线ROC/AUCmAPNDCG A/B 测试分…

移植NIOS10.1工程,NIOS10.1路径修改

移植NIOS10.1工程&#xff0c;NIOS10.1路径修改 因工程的需要&#xff0c;使用的NIOS10.1&#xff0c;比较老&#xff0c;这个版本的路径是使用的绝对路径&#xff0c;导致移植工程市回报路径的错误&#xff0c;在13.1之后改为了相对路径&#xff0c;不存在这个问题。 需要修…

WPF+LibVLC开发播放器-LibVLC播放控制

接上一篇&#xff1a; LibVLC在C#中的使用 实现LibVLC播放器播放控制 界面 界面上添加一个Button按钮用于控制播放 <ButtonGrid.Row"1"Width"88"Height"24"Margin"10,0,0,0"HorizontalAlignment"Left"VerticalAlignme…

iOS与Windows间传文件

想用数据线从 windows 手提电脑传文件入 iPhone&#xff0c;有点迂回。 参考 [1]&#xff0c;要在 windows 装 Apple Devices。装完、打开、插线之后会检测到手机&#xff0c;界面&#xff1a; 点左侧栏「文件」&#xff0c;不是就直接可以传&#xff0c;而是要通过某个应用传…

两个畸变矩阵相乘后还是一个2*2的矩阵,有四个畸变元素。1、畸变矩阵吸收了法拉第矩阵。2、畸变矩阵也给法拉第旋转角带来模糊(求解有多种可能)

角度一&#xff1b;恢复畸变的时候也把法拉第旋转恢复了 角度二&#xff1a;求解法拉第旋转角的时候 前面乘的复系数的不同也会带来法拉第旋转角和畸变的不同解 注意&#xff1a;无论多少个畸变矩阵相乘&#xff0c;结果都是2*2的矩阵&#xff0c;也就是畸变参数可以减少…