DataX二次开发之达梦数据库插件

news2025/1/10 11:15:27
  • 达梦数据库自定义插件

    • 达梦8的依赖引入

    • 定义reader module

    • 定义writer module

    • 修改核心配置数据库类型支持

    • 打包插件

  • 测试

    • 以mysql到dm数据库为例

    • 配置mysql2dm.json

    • 执行任务

    • 查询下结果

DataX二次开发之达梦数据库插件

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,支持大部分主流的数据库之间的数据同步,也提供了数据库插件的自定义开发。

达梦数据库自定义插件

插件自定义开发详细看官网dataxPluginDev.md说明文档

达梦8的依赖引入

<!-- dm driver -->
  <!-- https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 -->
  <dependency>
   <groupId>com.dameng</groupId>
   <artifactId>DmJdbcDriver18</artifactId>
   <version>8.1.3.140</version>
  </dependency>

定义reader module

427e4eba9382d60f4f333f4321cf9652.png

定义writer module

753f83ef60536194647e87db09ab7ac9.png

修改核心配置数据库类型支持

public enum DataBaseType {
    Dm("dm", "dm.jdbc.driver.DmDriver"),
   //...省略其他

    private String typeName;
    private String driverClassName;

    DataBaseType(String typeName, String driverClassName) {
        this.typeName = typeName;
        this.driverClassName = driverClassName;
    }

    public String getDriverClassName() {
        return this.driverClassName;
    }

    public String appendJDBCSuffixForReader(String jdbc) {
      
            case Oracle:
                break;
            case Dm:
              //...省略其他
            default:
                throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type.");
        }

        return result;
    }

    public String appendJDBCSuffixForWriter(String jdbc) {
        String result = jdbc;
        String suffix = null;
        switch (this) {
       
            case Oracle:
                break;
            case Dm:
                break;
          //...省略其他
            default:
                throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type.");
        }

        return result;
    }

    public String formatPk(String splitPk) {
        String result = splitPk;

        switch (this) {
            case MySql:
            
            case Dm:
                if (splitPk.length() >= 2 && splitPk.startsWith("`") && splitPk.endsWith("`")) {
                    result = splitPk.substring(1, splitPk.length() - 1).toLowerCase();
                }
                break;
              //...省略其他
            default:
                throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type.");
        }

        return result;
    }


    public String quoteColumnName(String columnName) {
        String result = columnName;

        switch (this) {
          
            case Dm:
               //...省略其他
            default:
                throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type");
        }

        return result;
    }

    public String quoteTableName(String tableName) {
        String result = tableName;

        switch (this) {
            case MySql:
                result = "`" + tableName.replace("`", "``") + "`";
                break;
            case Oracle:
                break;
            case Dm:
                break;
              //...省略其他
            default:
                throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type");
        }

        return result;
    }

    private static Pattern mysqlPattern = Pattern.compile("jdbc:mysql://(.+):\\d+/.+");
    private static Pattern oraclePattern = Pattern.compile("jdbc:oracle:thin:@(.+):\\d+:.+");
    private static Pattern dmPattern = Pattern.compile("jdbc:dm://:@(.+):\\d+:.+");

    /**
     * 注意:目前只实现了从 mysql/oracle 中识别出ip 信息.未识别到则返回 null.
     */
    public static String parseIpFromJdbcUrl(String jdbcUrl) {
        Matcher mysql = mysqlPattern.matcher(jdbcUrl);
        if (mysql.matches()) {
            return mysql.group(1);
        }
        Matcher oracle = oraclePattern.matcher(jdbcUrl);
        if (oracle.matches()) {
            return oracle.group(1);
        }
        Matcher dm = dmPattern.matcher(jdbcUrl);
        if (dm.matches()) {
            return dm.group(1);
        }
        return null;
    }
      //...省略其他

}
62703333fb8b7fecf2676a973b78a347.png

打包插件

需要在父类的插件里边配置模块以及插件打包配置

cbc91b208a8998075770c582df403a96.png

maven执行以下命令

mvn -U clean package assembly:assembly -Dmaven.test.skip=true
7eafa8eeaa66d2915ce299753a49af7a.png

测试

以mysql到dm数据库为例

mysql建立表并插入数据

-- test.psn definition

CREATE TABLE `psn` (
  `id` int(11) NOT NULL,
  `name` text,
  `address` text
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
INSERT INTO psn
(id, name, address)
VALUES(1, 'elite', 'gz');
INSERT INTO psn
(id, name, address)
VALUES(2, 'tom', 'bj');
INSERT INTO psn
(id, name, address)
VALUES(3, 'jack', 'sz');
INSERT INTO psn
(id, name, address)
VALUES(4, 'json', 'sh');

在达梦数据库里边创建一个表psn

CREATE TABLE test.psn
(
 id int NOT NULL,
 name VARCHAR(40) NULL,
 address VARCHAR(100)
);

配置mysql2dm.json

关系型数据库插件都差不多的配置

{
   "job": {
    "setting": {
      "speed": {
        "channel":2
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "123456",
            "splitPk": "id",
            "column": ["id","name","address"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://mysqlip:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false"],
                "table": ["psn"]
              }
            ]
          }
        },
        "writer": {
          "name": "dmwriter",
          "parameter": {
            "username": "test",
            "password": "123456@dm",
            "column": ["id","name","address"],
            "connection": [
              {
                "table": [
                  "psn"
                ],
                "jdbcUrl": "jdbc:dm://ip:5236?schema=TEST"
              }
            ]
          }
        }
      }
    ]
  }
}

执行任务

执行任务可以用命令测试,详细可以参考官网,或者用java,本例以java为例

84640ca02f501a3a10ecb940fe2ba05c.png

查询下结果

016b602737ec549814115ddc1557c06f.png

c96703240207aac0930e9d818bfd55c9.jpeg

6db9125d83b547ff71727b23ab5de297.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2047267.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

eNSP 华为远程登录路由器

华为远程登录路由器 前提&#xff1a;主机能与路由器通信就行&#xff0c;如果不同网段就配路由协议&#xff0c;这里直接模拟直连通信 Cloud&#xff1a; R&#xff1a; <Huawei>sys [Huawei]sys R [R]int g0/0/0 [R-GigabitEthernet0/0/0] [R-GigabitEthernet0/0/0]i…

AQS 原理详解

日常开发中&#xff0c;我们经常使用锁或者其他同步器来控制并发&#xff0c;那么它们的基础框架是什么呢&#xff1f;如何实现的同步功能呢&#xff1f;本文将详细讲解构建锁和同步器的基础框架--AQS&#xff0c;并根据源码分析其原理。 一、什么是 AQS&#xff1f; (一) AQS…

Oracle+ASM+High冗余详解及空间计算

Oracle ASM&#xff08;Automatic Storage Management&#xff09;的High冗余模式是一种提供高度数据保护的策略&#xff0c;它通过创建多个数据副本来确保数据的可用性和安全性。 以下是关于Oracle ASM High冗余的详细解释&#xff1a; 一、High冗余的特点 1.数据冗余度 在Hi…

Java | Leetcode Java题解之第342题4的幂

题目: 题解&#xff1a; class Solution {public boolean isPowerOfFour(int n) {return n > 0 && (n & (n - 1)) 0 && n % 3 1;} }

【Datawhale AI夏令营第四期】 魔搭-大模型应用开发方向笔记 Task03 大咖项目分享 人话八股文Bakwaan_Buddy项目开发尝试

【Datawhale AI夏令营第四期】 魔搭-大模型应用开发方向笔记 Task03 人话八股文Bakwaan_Buddy项目开发尝试 Idea: 我们草台班子目前的想法是解决大家计算机学院毕业面临的BUG——不爱背、背不下来八股文&#xff0c;觉得枯燥、烦、工作了用不着&#xff0c;反正就是知识他不进…

Python酷库之旅-第三方库Pandas(085)

目录 一、用法精讲 356、pandas.Series.str.isnumeric方法 356-1、语法 356-2、参数 356-3、功能 356-4、返回值 356-5、说明 356-6、用法 356-6-1、数据准备 356-6-2、代码示例 356-6-3、结果输出 357、pandas.Series.str.isdecimal方法 357-1、语法 357-2、参数…

RabbitMQ的核心概念

RabbitMQ是一个消息中间件&#xff0c;也是一个生产者消费者模型&#xff0c;负责接收&#xff0c;存储和转发消息。 核心概念 Producer 生产者&#xff0c;是RabbitMQ Server的客户端&#xff0c;向RabbitMQ发送消息。 Consumer 消费者&#xff0c;是RabbitMQ Server的客…

Ps:首选项 - 单位与标尺

Ps菜单&#xff1a;编辑/首选项 Edit/Preferences 快捷键&#xff1a;Ctrl K Photoshop 首选项中的“单位与标尺” Units & Rulers选项卡允许用户根据工作需求定制 Photoshop 的测量单位和标尺显示方式。这对于保持工作的一致性和精确性&#xff0c;尤其是在跨设备或跨平台…

mybatis plus 查询部分源码分析,typehandler怎么实现的?FastjsonTypehandler 查询问题怎么解决?

我们在使用mysql的json字段的时候有时为了方便&#xff0c;最好是查询的时候直接反序列化为对象比较好&#xff0c;这时候我们就用到了typehandler这个属性 首先mybatis plus 会初始化一系列的 typeHandler,并且扫描用户设置的typeHandler路径&#xff08;mybatis-plus: type-…

Flutter-->AAPT: error: resource android:attr/lStar not found.

更新Flutter 3.24.0之后, 打包出现AAPT: error: resource android:attr/lStar not found.问题, 这里出一个我的解决方案. 更新Flutter 3.24.0之后, Android编译sdk需要使用34, 否则就会出现很多问题… 由于很多库都不可能及时更新适配到Android sdk 34, 所以可以等pub get将子…

硅谷物理服务器有哪些关键优势和特点

硅谷的物理服务器设施全球知名&#xff0c;为各类企业提供了卓越的IT基础设施支持。下面将逐一探讨硅谷物理服务器的关键优势和特点&#xff0c;rak小编为您整理发布硅谷物理服务器有哪些关键优势和特点。 1. 卓越的性能 高性能计算能力&#xff1a;硅谷的物理服务器采用最新一…

Authentik:开源身份提供商

Authentik 是一个开源身份提供商&#xff0c;旨在实现最大的灵活性和适应性。 它可轻松集成到现有环境中并支持新协议。 它是一个全面的解决方案&#xff0c;用于在您的应用程序中实现注册、帐户恢复等功能&#xff0c;无需手动管理这些任务。 Authentik 可以无缝集成到现有…

arcgis打开不同tif格式编码的栅格数据

1、如下图&#xff0c;将文件包包解压打开&#xff0c;看到【2020年GDP数据】。 2、点击进入【2020年GDP数据】文件夹如下图所示。接着去打开arcgis软件。 3、按照步骤来&#xff0c;在arcgis【目录】里面添加【文件夹】然后选中你刚刚解压的【GDP文件夹数据】&#xff0c;最…

21 注意力机制—自注意力

目录 1.自注意力和位置编码跟CNN,RNN对比位置编码(position encoding)1、和 CNN / RNN 不同,自注意力并没有记录位置信息2、为了使用序列的顺序信息,通过在输入表示中添加位置编码将位置信息注入到输入里3、P 的元素具体计算如下:位置编码矩阵绝对位置信息相对位置信息总…

Linux运维篇-yum命令报错 /lib64/libcurl.so.4相关

目录 项目场景&#xff1a;问题描述原因分析&#xff1a;解决方案&#xff1a; 项目场景&#xff1a; centos7&#xff0c;8&#xff0c;同样也适用openEuer&#xff0c;Kylin等redhat系的国产化操作系统 问题描述 在使用yum命令时报错&#xff1a; 主要报错信息为&#xff1…

诈骗未成功是否构成犯罪?

诈骗未成功不一定构成犯罪。在刑法上&#xff0c;构成诈骗罪需要满足特定的构成要件&#xff0c;包括有非法占有的目的、实施了虚构事实或隐瞒真相的行为、对方因此陷入错误认识并处分财产、行为人或第三方取得财产、被害人遭受财产损失。如果诈骗行为未能成功&#xff0c;即被…

[C#]基于winform结合photocartoon算法实现人物卡通化源码实现

【官方框架】 https://github.com/minivision-ai/photo2cartoon 简介 人像卡通风格渲染的目标是&#xff0c;在保持原图像ID信息和纹理细节的同时&#xff0c;将真实照片转换为卡通风格的非真实感图像。我们的思路是&#xff0c;从大量照片/卡通数据中习得照片到卡通画的映射…

HDRP管线下的开放世界游戏与跨平台优化,《仙剑世界》万字分享

《仙剑世界》作为仙剑 IP 系列的最新⻓篇⼒作&#xff0c;从故事和剧情上延续了仙剑的精髓。在仙剑 33 年的世界观下&#xff0c;《仙剑世界》打造出了⼀个由浪漫唯美的江南全景、磅礴恢弘的蜀⼭、神秘苗疆等区域构成的 384 平⽅公⾥完整的⽆缝开放⼤世界。以东⽅题材为起点&am…

Java入门-接口:JDK8开始接口新增方法,接口的多继承,接口注意事项

&#xff08;一&#xff09;新增接口注意事项&#xff1a; 接口A&#xff1a; package interface_jdk8;public interface A {//1.新增默认方法&#xff1a;必须使用defalut修饰&#xff0c;默认会被public修饰//注意&#xff1a;这种默认方法可以带方法体/*实例方法&#xff1…

openssh升级到9.8

升级步骤 1、查看版本 [rootlocalhost openssh-8.8p1]# ssh -V OpenSSH_8.8p1, OpenSSL 1.0.2k-fips 26 Jan 20172、下载安装包 cd /usr/local/src wget https://www.zlib.net/zlib-1.3.1.tar.gz wget https://www.openssl.org/source/openssl-3.2.1.tar.gz wget https://cdn.…