DataX 二次开发支持 Oracle 更新数据

news2024/12/28 19:36:30

文章目录

        • 1、原理
        • 2、源码修改
          • 2.1 OracleWriter注释对writeMode的限制
          • 2.2 WriterUtil,增加oracle逻辑
          • 2.3 CommonRdbmsWriter.Task修改
          • 2.4 测试

前文回顾:
《DataX 及 DataX-Web 安装使用详解》
《DataX 源码调试及打包》
《DataX-Web 源码调试及打包》

目前很多主流数据库支持 on duplicate key update(当主键冲突update数据)模式,DataX 也支持通过配置 writeMode 来配置写入模式。但是目前Oracle只支持insert配置项

如何适配 Oracle 数据库 on duplicate key update 模式,今天大佬超就来带你进行二次开发。


1、原理

writeMode 的 insert、replace、update 配置项底层采用 INSERT INTOREPLACE INTO/INSERT INTO … ON DUPLICATE KEY UPDATE 语句:

其中 insert into 当主键/唯一性索引冲突时会写不进去冲突的行;后两者没有遇到主键/唯一性索引冲突时与 insert into 行为一致,遇到冲突时会用新行替换原有行所有字段。

oracle 不支持类似 MySQL的 REPLACE INTOINSERT … ON DUPLICATE KEY UPDATE,所以只支持 insert 配置项。要实现此功能,需要利用 Oracle 的 merge 语句,先来看下 merge 语法。

MERGE INTO [target-table] A USING [source-table sql] B 
ON([conditional expression] and [...]...) 
WHEN MATCHED THEN
 [UPDATE sql] 
WHEN NOT MATCHED THEN 
 [INSERT sql]

merge 语法其实就是存在就更新,不存在就插入。

示例:

MERGE INTO USERS A USING ( SELECT 18 AS "ID",'chaodev' AS "USER_ID" FROM DUAL ) TMP 
ON (TMP."ID" = A."ID" AND TMP."USER_ID" = A."USER_ID" ) 
WHEN MATCHED THEN 
UPDATE SET "USER_NAME" = '大佬超',"USER_PHONE" = '18000000000',"LASTUPDATETIME" = SYSDATE 
WHEN NOT MATCHED THEN 
INSERT ("ID","USER_ID","USER_NAME","USER_PHONE","LASTUPDATETIME") VALUES(18,'chaodev','大佬超','18000000000',SYSDATE)

所以最终实现原理就是:更改datax 的 oraclewriter 源码,通过 merge into 语句,实现 UPSERT 语义。


2、源码修改

涉及修改的类和方法如下:

oraclewriter包

com.alibaba.datax.plugin.writer.oraclewriter.OracleWriter:修改允许用户配置 writeMode。

plugin-dbms-util包

com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil:增加oralce的逻辑代码。

com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter:CommonRdbmsWriter.Task类替换 startWriteWithConnection() 、doBatchInsert() 和fillPreparedStatement() 方法。


2.1 OracleWriter注释对writeMode的限制

在这里插入图片描述


2.2 WriterUtil,增加oracle逻辑

在这里插入图片描述

增加oracle的逻辑代码,如下

/**
* 新增oracle update模块
* @author 程序员大佬超
* @date 20221202
* @param columnHolders
* @param valueHolders
* @param writeMode
* @param dataBaseType
* @param forceUseUpdate
* @return
*/
public static String getWriteTemplate(List<String> columnHolders, List<String> valueHolders,
                                      String writeMode, DataBaseType dataBaseType, boolean forceUseUpdate)
{
    String mode = writeMode.trim().toLowerCase();
    boolean isWriteModeLegal = mode.startsWith("insert") || mode.startsWith("replace") || mode.startsWith("update");

    if (!isWriteModeLegal) {
        throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,
                                              String.format("您所配置的 writeMode:%s 错误. 因为DataX 目前仅支持replace,update 或 insert 方式. 请检查您的配置并作出修改.", writeMode));
    }
    String writeDataSqlTemplate;
    if (forceUseUpdate || mode.startsWith("update")) {
        if (dataBaseType == DataBaseType.MySql || dataBaseType == DataBaseType.Tddl) {
            writeDataSqlTemplate = new StringBuilder()
                .append("INSERT INTO %s (").append(StringUtils.join(columnHolders, ","))
                .append(") VALUES(").append(StringUtils.join(valueHolders, ","))
                .append(")")
                .append(onDuplicateKeyUpdateString(columnHolders))
                .toString();
        }
        else if (dataBaseType == DataBaseType.Oracle) {
            writeDataSqlTemplate = new StringBuilder().append(onMergeIntoDoString(writeMode, columnHolders, valueHolders)).append("INSERT (")
                .append(StringUtils.join(columnHolders, ","))
                .append(") VALUES(").append(StringUtils.join(valueHolders, ","))
                .append(")").toString();
        }
        else {
            throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,
                                                  String.format("当前数据库不支持 writeMode:%s 模式.", writeMode));
        }
    }
    else {
        //这里是保护,如果其他错误的使用了update,需要更换为replace
        if (writeMode.trim().toLowerCase().startsWith("update")) {
            writeMode = "replace";
        }
        writeDataSqlTemplate = new StringBuilder().append(writeMode)
            .append(" INTO %s (").append(StringUtils.join(columnHolders, ","))
            .append(") VALUES(").append(StringUtils.join(valueHolders, ","))
            .append(")").toString();
    }

    return writeDataSqlTemplate;
}

调用的方法新增如下,主要就是拼接merge语句:

public static String onMergeIntoDoString(String merge, List<String> columnHolders, List<String> valueHolders) {
    String[] sArray = getStrings(merge);
    StringBuilder sb = new StringBuilder();
    sb.append("MERGE INTO %s A USING ( SELECT ");

    boolean first = true;
    boolean first1 = true;
    StringBuilder str = new StringBuilder();
    StringBuilder update = new StringBuilder();
    for (String columnHolder : columnHolders) {
        if (Arrays.asList(sArray).contains(columnHolder)) {
            if (!first) {
                sb.append(",");
                str.append(" AND ");
            } else {
                first = false;
            }
            str.append("TMP.").append(columnHolder);
            sb.append("?");
            str.append(" = ");
            sb.append(" AS ");
            str.append("A.").append(columnHolder);
            sb.append(columnHolder);
        }
    }

    for (String columnHolder : columnHolders) {
        if (!Arrays.asList(sArray).contains(columnHolder)) {
            if (!first1) {
                update.append(",");
            } else {
                first1 = false;
            }
            update.append(columnHolder);
            update.append(" = ");
            update.append("?");
        }
    }

    sb.append(" FROM DUAL ) TMP ON (");
    sb.append(str);
    sb.append(" ) WHEN MATCHED THEN UPDATE SET ");
    sb.append(update);
    sb.append(" WHEN NOT MATCHED THEN ");
    return sb.toString();
}

public static String[] getStrings(String merge) {
    merge = merge.replace("update", "");
    merge = merge.replace("(", "");
    merge = merge.replace(")", "");
    merge = merge.replace(" ", "");
    return merge.split(",");
}

2.3 CommonRdbmsWriter.Task修改

修改 startWriteWithConnection() 方法

/**
* 更改适配oracle update
* @author 程序员大佬超
* @date 20221202
* @param recordReceiver
* @param taskPluginCollector
* @param connection
*/
public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection)
{
    this.taskPluginCollector = taskPluginCollector;
    List<String> mergeColumns = new ArrayList<>();

    if (this.dataBaseType == DataBaseType.Oracle && !"insert".equalsIgnoreCase(this.writeMode)) {
        LOG.info("write oracle using {} mode", this.writeMode);
        List<String> columnsOne = new ArrayList<>();
        List<String> columnsTwo = new ArrayList<>();
        String merge = this.writeMode;
        String[] sArray = WriterUtil.getStrings(merge);
        for (String s : this.columns) {
            if (Arrays.asList(sArray).contains(s)) {
                columnsOne.add(s);
            }
        }
        for (String s : this.columns) {
            if (!Arrays.asList(sArray).contains(s)) {
                columnsTwo.add(s);
            }
        }
        int i = 0;
        for (String column : columnsOne) {
            mergeColumns.add(i++, column);
        }
        for (String column : columnsTwo) {
            mergeColumns.add(i++, column);
        }
    }
    mergeColumns.addAll(this.columns);

    // 用于写入数据的时候的类型根据目的表字段类型转换
    this.resultSetMetaData = DBUtil.getColumnMetaData(connection,
                                                      this.table, StringUtils.join(mergeColumns, ","));
    // 写数据库的SQL语句
    calcWriteRecordSql();

    List<Record> writeBuffer = new ArrayList<>(this.batchSize);
    int bufferBytes = 0;
    try {
        Record record;
        while ((record = recordReceiver.getFromReader()) != null) {
            if (record.getColumnNumber() != this.columnNumber) {
                // 源头读取字段列数与目的表字段写入列数不相等,直接报错
                throw DataXException
                    .asDataXException(
                    DBUtilErrorCode.CONF_ERROR,
                    String.format(
                        "列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",
                        record.getColumnNumber(),
                        this.columnNumber));
            }

            writeBuffer.add(record);
            bufferBytes += record.getMemorySize();

            if (writeBuffer.size() >= batchSize || bufferBytes >= batchByteSize) {
                doBatchInsert(connection, writeBuffer);
                writeBuffer.clear();
                bufferBytes = 0;
            }
        }
        if (!writeBuffer.isEmpty()) {
            doBatchInsert(connection, writeBuffer);
            writeBuffer.clear();
        }
    }
    catch (Exception e) {
        throw DataXException.asDataXException(
            DBUtilErrorCode.WRITE_DATA_ERROR, e);
    }
    finally {
        writeBuffer.clear();
        DBUtil.closeDBResources(null, null, connection);
    }
}

修改 doBatchInsert() 方法

/**
* 更改适配oracle update
* @author 程序员大佬超
* @date 20221202
* @param connection
* @param buffer
* @throws SQLException
*/
protected void doBatchInsert(Connection connection, List<Record> buffer)
    throws SQLException
{
    PreparedStatement preparedStatement = null;
    try {
        connection.setAutoCommit(false);
        preparedStatement = connection
            .prepareStatement(this.writeRecordSql);
        if (this.dataBaseType == DataBaseType.Oracle && !"insert".equalsIgnoreCase(this.writeMode)) {
            String merge = this.writeMode;
            String[] sArray = WriterUtil.getStrings(merge);
            for (Record record : buffer) {
                List<Column> recordOne = new ArrayList<>();
                for (int j = 0; j < this.columns.size(); j++) {
                    if (Arrays.asList(sArray).contains(this.columns.get(j))) {
                        recordOne.add(record.getColumn(j));
                    }
                }
                for (int j = 0; j < this.columns.size(); j++) {
                    if (!Arrays.asList(sArray).contains(this.columns.get(j))) {
                        recordOne.add(record.getColumn(j));
                    }
                }
                for (int j = 0; j < this.columns.size(); j++) {
                    recordOne.add(record.getColumn(j));
                }
                for (int j = 0; j < recordOne.size(); j++) {
                    record.setColumn(j, recordOne.get(j));
                }
                preparedStatement = fillPreparedStatement(
                    preparedStatement, record);
                preparedStatement.addBatch();
            }
        }
        else {
            for (Record record : buffer) {
                preparedStatement = fillPreparedStatement(
                    preparedStatement, record);
                preparedStatement.addBatch();
            }
        }
        preparedStatement.executeBatch();
        connection.commit();
    }
    catch (SQLException e) {
        LOG.warn("回滚此次写入, 采用每次写入一行方式提交. 因为: {}", e.getMessage());
        connection.rollback();
        doOneInsert(connection, buffer);
    }
    catch (Exception e) {
        throw DataXException.asDataXException(
            DBUtilErrorCode.WRITE_DATA_ERROR, e);
    }
    finally {
        DBUtil.closeDBResources(preparedStatement, null);
    }
}

修改 fillPreparedStatement() 方法

/**
* 更改适配oracle update
* @author 程序员大佬超
* @date 20221202
* @param preparedStatement
* @param record
* @return
* @throws SQLException
*/
protected PreparedStatement fillPreparedStatement(PreparedStatement preparedStatement, Record record)
    throws SQLException
{
    for (int i = 0; i < record.getColumnNumber(); i++) {
        int columnSqltype = this.resultSetMetaData.getMiddle().get(i);
        preparedStatement = fillPreparedStatementColumnType(preparedStatement, i,
                                                            columnSqltype, record.getColumn(i));
    }
    return preparedStatement;
}

2.4 测试

重新打包后,测试job

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3,
        "byte": 1048576
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "123456",
            "column": [
              "`id`",
              "`user_id`",
              "`user_password`",
              "`user_name`",
              "`user_phone`",
              "`email`",
              "`nick_name`",
              "`head_url`",
              "`sex`",
              "`state`",
              "`create_time`",
              "`create_user`",
              "`lastUpdateTime`"
            ],
            "splitPk": "",
            "connection": [
              {
                "table": [
                  "users"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://127.0.0.1:3306/im"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "oraclewriter",
          "parameter": {
            "username": "yxc",
            "password": "123456",
            "column": [
              "\"ID\"",
              "\"USER_ID\"",
              "\"USER_PASSWORD\"",
              "\"USER_NAME\"",
              "\"USER_PHONE\"",
              "\"EMAIL\"",
              "\"NICK_NAME\"",
              "\"HEAD_URL\"",
              "\"SEX\"",
              "\"STATE\"",
              "\"CREATE_TIME\"",
              "\"CREATE_USER\"",
              "\"LASTUPDATETIME\""
            ],
            "writeMode": "update(\"ID\",\"USER_ID\")",
            "connection": [
              {
                "table": [
                  "USERS"
                ],
                "jdbcUrl": "jdbc:oracle:thin:@//192.168.157.142:1521/orcl"
              }
            ]
          }
        }
      }
    ]
  }
}

注:writeMode的update括号里字段要加上\"

查看运行日志,可以看到已经正确拼接了MERGE语句:
在这里插入图片描述

在这里插入图片描述

任务执行成功。切换 insert 和 update 模式检验,无异常。



更多技术干货,请持续关注程序员大佬超。
原创不易,转载请注明出处。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/75746.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2022年四川建筑八大员(土建施工员)考试试题及答案

百分百题库提供建筑八大员&#xff08;土建&#xff09;考试试题、建筑八大员&#xff08;土建&#xff09;考试预测题、建筑八大员&#xff08;土建&#xff09;考试真题、建筑八大员&#xff08;土建&#xff09;证考试题库等,提供在线做题刷题&#xff0c;在线模拟考试&…

RabbitMQ基础概念

文章目录RabbitMQ介绍AMQPErlang架构模型PublisherConnectionChannelVirtual HostExchangeBindingConsumerRabbitMQ介绍 RabbitMQ是实现了高级消息队列协议&#xff08;AMQP&#xff09;的开源消息代理软件&#xff08;亦称面向消息的中间件&#xff09;。RabbitMQ服务器是用Er…

Qt-数据库开发-事务提交(3)

Qt-数据库开发-通过QSqlTableModel显示和修改数据&#xff0c;开启事务 文章目录Qt-数据库开发-通过QSqlTableModel显示和修改数据&#xff0c;开启事务1、概述2、实现效果3、主要代码4、完整源代码更多精彩内容&#x1f449;个人内容分类汇总 &#x1f448;&#x1f449;数据库…

毕设选题推荐基于python的django框架的疫苗预约接种管理系统

&#x1f496;&#x1f525;作者主页&#xff1a;计算机毕设老哥&#x1f525; &#x1f496; 精彩专栏推荐订阅&#xff1a;在 下方专栏&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; Java实战项目专栏 Python实…

新型网络接入控制技术

1.NAC技术 1.1简介 网络接入控制(Network Access Control&#xff0c;简称NAC)是由思科(Cisco)主导的产业级协同研究成果&#xff0c;NAC可以协助保证每一个终端在进入网络前均符合网络安全策略。NAC技术可以提供保证端点设备在接入网络前完全遵循本地网络内需要的安全策略&a…

list容器模拟实现

&#x1f4cb; 个人简介 &#x1f496; 作者简介&#xff1a;大家好&#xff0c;我是菀枯&#x1f61c; &#x1f389; 支持我&#xff1a;点赞&#x1f44d;收藏⭐️留言&#x1f4dd; &#x1f4ac;格言&#xff1a;不要在低谷沉沦自己&#xff0c;不要在高峰上放弃努力&am…

布局福建市场,维也纳酒店欧暇·地中海酒店能否为投资人带来信心与底气?

近日&#xff0c;锦江酒店&#xff08;中国区&#xff09;“一城一海&#xff0c;暇享好时光”——欧暇地中海漳州长泰凯悦广场店开业典礼隆重举办。 与此同时&#xff0c;锦江酒店&#xff08;中国区&#xff09;维也纳酒店&欧暇地中海酒店品牌厦门推介会也圆满落地。在本…

Able2Extract Professional识别引擎经过微调

Able2Extract Professional识别引擎经过微调 改进的表格检测-现在&#xff0c;您可以在自定义PDF到Microsoft Excel转换过程中更准确地确定类似表格结构和内容的位置。 改进了表与列标题的分离-表识别引擎经过改进&#xff0c;可以检测和识别具有单个标题的多列表。 改进的PDF文…

echarts map地图中绘制浙江省市区县乡镇多级联动边界下钻的最新geojson数据文件获取和更新

文章目录ECharts Map地图的显示GeoJSON数据文件获取在ECharts中绘制浙江省的数据ECharts Map地图的显示 ECharts支持地理坐标显示&#xff0c;专门提供了一个geo组件&#xff0c;在setOption中提供option.geo配置即可显示地图。 option.geo配置中有个map属性&#xff0c;取值…

HBase中的Compaction详解

Compaction的作用 由于memstore每次刷写都会生成一个新的HFile&#xff0c;且同一个字段的不同版本&#xff08;timestamp&#xff09;和不同类型&#xff08;Put/Delete&#xff09;有可能会分布在不同的 HFile 中&#xff0c;因此查询时需要遍历所有的 HFile。为了减少 HFile…

PHPMYADMIN 无法编辑 MYSQL 解决方法

本想通过镜像重新把老站点搭建起来拷贝点文章内容,登录后台时发现忘记了密码,想着通过 PHPMyAdmin 修改 Mysql 数据库内容是非常简答的,万万没想到如下图提示错误:#1030 Get error -1 from storage engine,当时就想到可能因 InnoDB 引擎问题导致,查看在 Mysql 的 my.cnf …

C#学习记录——Windows计算器的制作【实例】

参考《C#从入门到项目实践》边学习&#xff0c;边练习实现。 Windows计算器的制作 此次练习的计算器应用软件在Visual Studio 2019编程环境中开发&#xff0c;是一个简单的窗体应用程序&#xff0c;实现简单的计算器功能。 1、系统功能描述 Windows计算器是利用C#程序设计编…

InputStreamReader构造函数的四种方式实现

InputStreamReader类的构造函数 InputStreamReader(InputStream in) //创建InputStreamReader对象&#xff0c;构造方法中传递输入流&#xff0c;使用默认字符集InputStreamReader(InputStream in, String charsetName) //创建InputStreamReader对象&#xff0c;构造方法中传递…

国考省考行测:主体分析法,高频词往往是主体,没有主体也能说语意主旨,故事型材料对比分析法,积极引申大道理

国考省考行测&#xff1a;主体分析法&#xff0c;高频词往往是主体&#xff0c;没有主体也能说语意主旨&#xff0c;故事型材料对比分析法&#xff0c;积极引申大道理 2022找工作是学历、能力和运气的超强结合体! 公务员特招重点就是专业技能&#xff0c;附带行测和申论&#…

Redis主从复制,哨兵模式和集群模式

主从复制 什么是主从复制 主从复制是高可用Redis的基础&#xff0c;哨兵和集群都是在主从复制基础上实现高可用的。主从复制主要实现了数据的多机备份&#xff0c;以及对于读操作的负载均衡和简单的故障恢复。缺陷&#xff1a;故障恢复无法自动化&#xff1b;写操作无法负载均…

JSP ssh网上家具店系统myeclipse开发mysql数据库MVC模式java编程计算机网页设计

一、源码特点 JSP ssh网上家具店系统是一套完善的web设计系统&#xff08;系统采用ssh框架进行设计开发&#xff09;&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模 式开发。开发环境为TOMCAT7.0,M…

shell编程(三)--awk

本以为只是个命令&#xff0c;学起来这就是语言么&#xff0c;参看man手册多试吧 格式 awk pattern{action} <file> ​ A pattern can be: ​ BEGIN ​ END ​ expression 示例&#xff1a; ​ awk {print $0} awk.txt ​ print是打印命令&#xff0c;awk.txt是我们为…

JavaScript—实现手风琴画册

✅作者简介&#xff1a;热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏&#xff1a;前端案例分…

EM算法——投硬币样例实现

理论参考 【机器学习】EM——期望最大&#xff08;非常详细&#xff09; 样例介绍 有c个硬币&#xff0c;每次随机选一个投掷n次&#xff0c;重复执行m次并记录结果。 根据投掷结果计算出每个硬币正面的概率。 每次选择的硬币未知。 过程介绍 随机初始化硬币为正的概率 he…

阿里P8终于整理出:Nginx+jvm+MySQL+Docker+Spring实战技术文档

前言 都说程序员工资高、待遇好&#xff0c; 2022 金九银十到了&#xff0c;你的小目标是 30K、40K&#xff0c;还是 16薪的 20K&#xff1f;作为一名 Java 开发工程师&#xff0c;当能力可以满足公司业务需求时&#xff0c;拿到超预期的 Offer 并不算难。然而&#xff0c;提升…