DataX-Oracle新增writeMode支持update

news2024/11/22 5:32:55

目录

前言

第一步下载源码

第二步修改源码

1、Oraclewriter

2、WriterUtil

 2.1、修改getWriteTemplate方法

 2.2、新增onMergeIntoDoString与getStrings方法

3、CommonRdbmsWriter

 3.1、修改startWriteWithConnection

 3.2、修改doBatchInsert

 3.3、修改fillPreparedStatement

第三步打包

第四步脚本修改

修改后jar包地址 




前言

目前 DataX更新到datax_v202309版本还不能支持Oracle写入的update,只通过DataX只能修改源码。

原理:oracle 不支持类似 MySQL的 REPLACE INTO 和 INSERT … ON DUPLICATE KEY UPDATE,所以只支持 insert 配置项。要实现此功能,需要利用 Oracle 的 merge 语句,先来看下 merge 语法。

MERGE INTO [target-table] A USING [source-table sql] B 
ON([conditional expression] and [...]...) 
WHEN MATCHED THEN
 [UPDATE sql] 
WHEN NOT MATCHED THEN 
 [INSERT sql]

第一步下载源码

 地址:datax_v202309。

第二步修改源码

一共修改3个文件

1、Oraclewriter

 

找到该代码直接注释掉就行。 

2、WriterUtil
 2.1、修改getWriteTemplate方法
public static String getWriteTemplate(List<String> columnHolders, List<String> valueHolders, String writeMode, DataBaseType dataBaseType, boolean forceUseUpdate) {
        boolean update = writeMode.trim().toLowerCase().startsWith("update");
        boolean isWriteModeLegal = writeMode.trim().toLowerCase().startsWith("insert")
                || writeMode.trim().toLowerCase().startsWith("replace")
                || update;

        if (!isWriteModeLegal) {
            throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,
                    String.format("您所配置的 writeMode:%s 错误. 因为DataX 目前仅支持replace,update 或 insert 方式. 请检查您的配置并作出修改.", writeMode));
        }
        // && writeMode.trim().toLowerCase().startsWith("replace")
        String writeDataSqlTemplate;
        if (forceUseUpdate || update) {
            //update只在mysql下使用
            if (dataBaseType == DataBaseType.MySql || dataBaseType == DataBaseType.Tddl) {
                writeDataSqlTemplate = new StringBuilder()
                        .append("INSERT INTO %s (").append(StringUtils.join(columnHolders, ","))
                        .append(") VALUES(").append(StringUtils.join(valueHolders, ","))
                        .append(")")
                        .append(onDuplicateKeyUpdateString(columnHolders))
                        .toString();
            }
            //update在Oracle下使用
            else if (dataBaseType == DataBaseType.Oracle) {
                writeDataSqlTemplate = onMergeIntoDoString(writeMode, columnHolders, valueHolders) + "INSERT (" +
                        StringUtils.join(columnHolders, ",") +
                        ") VALUES(" + StringUtils.join(valueHolders, ",") +")";
            }else {
                throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,
                        String.format("当前数据库不支持 writeMode:%s 模式.", writeMode));
            }
        } else {

            //这里是保护,如果其他错误的使用了update,需要更换为replace
            if (update) {
                writeMode = "replace";
            }
            writeDataSqlTemplate = new StringBuilder().append(writeMode)
                    .append(" INTO %s (").append(StringUtils.join(columnHolders, ","))
                    .append(") VALUES(").append(StringUtils.join(valueHolders, ","))
                    .append(")").toString();
        }

        return writeDataSqlTemplate;
    }
 2.2、新增onMergeIntoDoString与getStrings方法

代码作用:对Oracle进行update的MERGE拼接

public static String onMergeIntoDoString(String merge, List<String> columnHolders, List<String> valueHolders) {
        String[] sArray = getStrings(merge);
        StringBuilder sb = new StringBuilder();
        sb.append("MERGE INTO %s A USING ( SELECT ");
        boolean first = true;
        boolean first1 = true;
        StringBuilder str = new StringBuilder();
        StringBuilder update = new StringBuilder();
        for (String columnHolder : columnHolders) {
            if (Arrays.asList(sArray).contains(columnHolder)) {
                if (!first) {
                    sb.append(",");
                    str.append(" AND ");
                } else {
                    first = false;
                }
                str.append("TMP.").append(columnHolder);
                sb.append("?");
                str.append(" = ");
                sb.append(" AS ");
                str.append("A.").append(columnHolder);
                sb.append(columnHolder);
            }
        }

        for (String columnHolder : columnHolders) {
            if (!Arrays.asList(sArray).contains(columnHolder)) {
                if (!first1) {
                    update.append(",");
                } else {
                    first1 = false;
                }
                update.append(columnHolder);
                update.append(" = ");
                update.append("?");
            }
        }

        sb.append(" FROM DUAL ) TMP ON (");
        sb.append(str);
        sb.append(" ) WHEN MATCHED THEN UPDATE SET ");
        sb.append(update);
        sb.append(" WHEN NOT MATCHED THEN ");
        return sb.toString();
    }

    public static String[] getStrings(String merge) {
        merge = merge.replace("update", "");
        merge = merge.replace("(", "");
        merge = merge.replace(")", "");
        merge = merge.replace(" ", "");
        return merge.split(",");
    }
3、CommonRdbmsWriter
 3.1、修改startWriteWithConnection
        // 替换原先的代码块
        public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) {
            this.taskPluginCollector = taskPluginCollector;
            List<String> columns = new LinkedList<>();
            if (this.dataBaseType == DataBaseType.Oracle && writeMode.trim().toLowerCase().startsWith("update") ) {
                String merge = this.writeMode;
                String[] sArray = WriterUtil.getStrings(merge);
                this.columns.forEach(column->{
                    if (Arrays.asList(sArray).contains(column)) {
                        columns.add(column);
                    }
                });
                this.columns.forEach(column->{
                    if (!Arrays.asList(sArray).contains(column)) {
                        columns.add(column);
                    }
                });
            }
            columns.addAll(this.columns);
            // 用于写入数据的时候的类型根据目的表字段类型转换
            this.resultSetMetaData = DBUtil.getColumnMetaData(connection, this.table, StringUtils.join(columns, ","));
            // 写数据库的SQL语句
            calcWriteRecordSql();
 
            List<Record> writeBuffer = new ArrayList<Record>(this.batchSize);
            int bufferBytes = 0;
            try {
                Record record;
                while ((record = recordReceiver.getFromReader()) != null) {
                    if (record.getColumnNumber() != this.columnNumber) {
                        // 源头读取字段列数与目的表字段写入列数不相等,直接报错
                        throw DataXException
                                .asDataXException(
                                        DBUtilErrorCode.CONF_ERROR,
                                        String.format(
                                                "列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",
                                                record.getColumnNumber(),
                                                this.columnNumber));
                    }
 
                    writeBuffer.add(record);
                    bufferBytes += record.getMemorySize();
 
                    if (writeBuffer.size() >= batchSize || bufferBytes >= batchByteSize) {
                        doBatchInsert(connection, writeBuffer);
                        writeBuffer.clear();
                        bufferBytes = 0;
                    }
                }
                if (!writeBuffer.isEmpty()) {
                    doBatchInsert(connection, writeBuffer);
                    writeBuffer.clear();
                    bufferBytes = 0;
                }
            } catch (Exception e) {
                throw DataXException.asDataXException(
                        DBUtilErrorCode.WRITE_DATA_ERROR, e);
            } finally {
                writeBuffer.clear();
                bufferBytes = 0;
                DBUtil.closeDBResources(null, null, connection);
            }
        }
 3.2、修改doBatchInsert
 protected void doBatchInsert(Connection connection, List<Record> buffer)
                throws SQLException
        {
            PreparedStatement preparedStatement = null;
            try {
                connection.setAutoCommit(false);
                preparedStatement = connection
                        .prepareStatement(this.writeRecordSql);
                if (this.dataBaseType == DataBaseType.Oracle && !"insert".equalsIgnoreCase(this.writeMode)) {
                    String merge = this.writeMode;
                    String[] sArray = WriterUtil.getStrings(merge);
                    for (Record record : buffer) {
                        List<Column> recordOne = new ArrayList<>();
                        for (int j = 0; j < this.columns.size(); j++) {
                            if (Arrays.asList(sArray).contains(this.columns.get(j))) {
                                recordOne.add(record.getColumn(j));
                            }
                        }
                        for (int j = 0; j < this.columns.size(); j++) {
                            if (!Arrays.asList(sArray).contains(this.columns.get(j))) {
                                recordOne.add(record.getColumn(j));
                            }
                        }
                        for (int j = 0; j < this.columns.size(); j++) {
                            recordOne.add(record.getColumn(j));
                        }
                        for (int j = 0; j < recordOne.size(); j++) {
                            record.setColumn(j, recordOne.get(j));
                        }
                        preparedStatement = fillPreparedStatement(
                                preparedStatement, record);
                        preparedStatement.addBatch();
                    }
                }
                else {
                    for (Record record : buffer) {
                        preparedStatement = fillPreparedStatement(
                                preparedStatement, record);
                        preparedStatement.addBatch();
                    }
                }
                preparedStatement.executeBatch();
                connection.commit();
            }
            catch (SQLException e) {
                LOG.warn("回滚此次写入, 采用每次写入一行方式提交. 因为: {}", e.getMessage());
                connection.rollback();
                doOneInsert(connection, buffer);
            }
            catch (Exception e) {
                throw DataXException.asDataXException(
                        DBUtilErrorCode.WRITE_DATA_ERROR, e);
            }
            finally {
                DBUtil.closeDBResources(preparedStatement, null);
            }
        }
 3.3、修改fillPreparedStatement
  protected PreparedStatement fillPreparedStatement(PreparedStatement preparedStatement, Record record)
                throws SQLException
        {
            for (int i = 0; i < record.getColumnNumber(); i++) {
                int columnSqltype = this.resultSetMetaData.getMiddle().get(i);
                String typeName = this.resultSetMetaData.getRight().get(i);
                preparedStatement = fillPreparedStatementColumnType(preparedStatement, i,columnSqltype, typeName,record.getColumn(i));
            }
            return preparedStatement;
        }

第三步打包

1、只需要在idea里面打包修改的两个程序就可以

 2、打包成功后获取两个jar包

 3、将包替换到datax的插件里面

 将oraclewriter-0.0.1-SNAPSHOT.jar替换到datax\plugin\writer\oraclewriter

 将plugin-rdbms-util-0.0.1-SNAPSHOT.jar替换到datax\plugin\writer\oraclewriter\libs

第四步脚本修改

{
    "job": {
        "setting": {
            "speed": {
                 "byte": 1048576
            },
                "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "${r_username}",
                        "password": "${r_password}",
                        "connection": [
                            {	   
                                "querySql": ["SELECT f_year,f_code,f_name,f_order FROM tableName"],
                                "jdbcUrl": ["${r_jdbcUrl}"]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "oraclewriter",
                    "parameter": {
						"writeMode": "update(f_year,f_code)",
                        "username": "${w_username}",
                        "password": "${w_password}",
                        "column": [
                         "f_year","f_code","f_name","f_order"
                        ],
                        "session": [],
                        "preSql": [],
                        "connection": [
                            {
                                "jdbcUrl": "${w_jdbcUrl}",
                                "table": ["tableName"]
                            }
                        ]
                    }
			   }		   
            }
        ]
    }
}

参数 "writeMode": "update(f_year,f_code)" 里面f_year,f_code就是主键, 参数上不要加/"

update(\"f_year\",\"f_code\")这样是拼不上sql的,这个问题调试了好久才解决。

这时候运行就成功了

参考文章DataX 二次开发支持 Oracle 更新数据icon-default.png?t=N7T8https://blog.csdn.net/xch_yang/article/details/128250190?utm_medium=distribute.pc_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-0-128250190-blog-106881907.235%5Ev43%5Epc_blog_bottom_relevance_base8&spm=1001.2101.3001.4242.1&utm_relevant_index=3Datax oracle 支持增量并且支持全量更新icon-default.png?t=N7T8https://blog.csdn.net/weixin_41250031/article/details/122615271?spm=1001.2101.3001.6650.5&utm_medium=distribute.pc_relevant.none-task-blog-2~default~CTRLIST~Rate-5-122615271-blog-129723622.235%5Ev43%5Epc_blog_bottom_relevance_base8&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2~default~CTRLIST~Rate-5-122615271-blog-129723622.235%5Ev43%5Epc_blog_bottom_relevance_base8&utm_relevant_index=7

修改后jar包地址 

懒得修改可以直接下载两个jar替换到你们的datax对应目录。

https://download.csdn.net/download/qq_36802726/89046154icon-default.png?t=N7T8https://download.csdn.net/download/qq_36802726/89046154

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1553688.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

苹果应用商店上架利器:推荐几款常用的应用发布工具

摘要 移动应用app上架是开发者关注的重要环节&#xff0c;但常常会面临审核不通过等问题。为帮助开发者顺利完成上架工作&#xff0c;各种辅助工具应运而生。本文探讨移动应用app上架原理、常见辅助工具功能及其作用&#xff0c;最终指出合理使用工具的重要性。 引言 移动应…

洛谷_P1803 凌乱的yyy / 线段覆盖_python写法

P1803 凌乱的yyy / 线段覆盖 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 这道题是不是用python做只能做到70分啊&#xff1f;&#xff1f; n int(input()) data [] for i in range(n):data.append(list(map(int,input().split())))data.sort(keylambda x:x[1])ans 1 mi…

Solidity Uniswap V2 Router swapTokensForExactTokens

最初的router合约实现了许多不同的交换方式。我们不会实现所有的方式&#xff0c;但我想向大家展示如何实现倒置交换&#xff1a;用未知量的输入Token交换精确量的输出代币。这是一个有趣的用例&#xff0c;可能并不常用&#xff0c;但仍有可能实现。 GitHub - XuHugo/solidit…

景联文科技高质量大模型训练数据汇总!

3月25日&#xff0c;2024年中国发展高层论坛年会上&#xff0c;国家数据局局长刘烈宏在“释放数据要素价值&#xff0c;助力可持续发展”的演讲中表示&#xff0c;中国10亿参数规模以上的大模型数量已超100个。 当前&#xff0c;国内AI大模型发展仍面临诸多困境。其中&#xff…

批量剪辑视频,批量调整片头片尾时长,批量剪辑更高效!

在视频剪辑的世界里&#xff0c;有时候我们需要对视频的片头片尾进行精细调整&#xff0c;以适应不同的需求和创意。然而&#xff0c;传统的视频剪辑软件往往操作繁琐&#xff0c;效率低下&#xff0c;让人望而却步。今天&#xff0c;我要为您介绍一种全新的批量剪辑方式&#…

一个传入省市区ID的级联框

省市区ID 功能edit页面(主要)script逻辑如何拿到当前级联下所有ID数组长ID数组是如何回显的 (1)长ID数组是如何回显的 (2) 功能 选择第一层传第一层下的所有 id 数组&#xff0c;选择第二层传递第二层以及第二层下的所有 id 数组 edit页面(主要) 编辑页的一个 Table&#xff0c…

Dynamo设置按链接视图显示

Hello大家好&#xff01;我是九哥~ 先来看一段视频&#xff1a; Dynamo设置链接视图 相信用Revit的小伙伴都用到过这个功能&#xff0c;就是在链接Revit模型时&#xff0c;为了便于出图&#xff0c;我们经常需要将链接模型从“按主体视图”改为“按链接视图”&#xff0c;这样能…

buy me a btc 使用数字货币进行打赏赞助

最近在调研使用加密货币打赏的平台&#xff0c;发现idatariver平台 https://idatariver.com 推出的buymeabtc功能刚好符合使用场景&#xff0c;下图为平台的演示项目, 演示项目入口 https://buymeabtc.com/idatariver 特点 不少人都听说过buymeacoffee&#xff0c;可以在上面发…

BabySQL【2019极客大挑战】

知识点&#xff1a; 功能分析 登录界面一般是 where username and password 可以从username出手&#xff0c;注释掉and语句单引号闭合绕过 通过测试和报错信息发现是一个单引号读取输入可以单引号闭合绕过关键字过滤 or and 过滤 || &&替换双写绕过select from wher…

【学习笔记】java项目—苍穹外卖day01

文章目录 苍穹外卖-day01课程内容1. 软件开发整体介绍1.1 软件开发流程1.2 角色分工1.3 软件环境 2. 苍穹外卖项目介绍2.1 项目介绍2.2 产品原型2.3 技术选型 3. 开发环境搭建3.1 前端环境搭建3.2 后端环境搭建3.2.1 熟悉项目结构3.2.2 Git版本控制3.2.3 数据库环境搭建3.2.4 前…

基于单片机环境监测温湿度PM2.5系统设计

**单片机设计介绍&#xff0c;基于单片机环境监测温湿度PM2.5系统设计 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于单片机环境监测温湿度PM2.5系统是一个集成了传感器技术、单片机控制以及数据处理与显示功能的综合性系统…

30-3 越权漏洞 - 水平越权(横向越权)

环境准备:构建完善的安全渗透测试环境:推荐工具、资源和下载链接_渗透测试靶机下载-CSDN博客 一、定义 攻击者可以访问和操作与其拥有同级权限的用户资源。 示例: 学生A在教务系统上正常只能修改自己的作业内容,但由于不合理的权限校验规则等原因,学生A可以修改学生B的内…

五种免费的Python开发环境及具体下载网址

五种免费的Python开发环境及具体下载网址 目录 五种免费的Python开发环境及具体下载网址1.Anaconda2.PyCharm Community Edition3.Visual Studio Code4.Jupyter Notebook5. WinPython Python编程可选择不同的开发工具环境进行&#xff0c;本文介绍五种常用的&#xff0c;读者可…

R语言使用dietaryindex包计算NHANES数据多种营养指数(2)

健康饮食指数 (HEI) 是评估一组食物是否符合美国人膳食指南 (DGA) 的指标。Dietindex包提供用户友好的简化方法&#xff0c;将饮食摄入数据标准化为基于指数的饮食模式&#xff0c;从而能够评估流行病学和临床研究中对这些模式的遵守情况&#xff0c;从而促进精准营养。 该软件…

WhatsApp封号怎么办?看看原因与解封方法

相信各位小伙伴已经发现&#xff0c;WhatsApp新一轮风控已经启动&#xff0c;不少小伙伴已经受到封号潮的冲击。无论是老号还是新号都难以幸免。为了防止WhatsApp客户数据和聊天信息的丢失&#xff0c;针对封号的防封攻略请收藏&#xff01; 一、WhatsApp被封的8个原因 1、被过…

编译与链接(想了解编译与链接,那么看这一篇就足够了!)

前言&#xff1a;在我们练习编程的时候&#xff0c;我们只需要将代码写入、运行&#xff0c;就可以得到计算之后的结果了&#xff0c;但是你有没有想过&#xff0c;为什么就可以得到计算之后的结果呢&#xff0c;它的底层又到底是什么呢&#xff1f; ✨✨✨这里是秋刀鱼不做梦的…

算法之美:B+树原理、应用及Mysql索引底层原理剖析

B树的一种变种形式&#xff0c;B树上的叶子结点存储关键字以及相应记录的地址&#xff0c;同等存储空间下比B-Tree存储更多Key。非叶子节点不对关键字记录的指针进行保存&#xff0c;只进行数据索引 , 树的层级会更少 , 所有叶子节点都在同一层, 叶子节点的关键字从小到大有序排…

安装uim-ui插件不成功,成功解决

安装&#xff1a;这种安装&#xff0c;umi4 不支持&#xff0c;只有umi3才支持。而我发现官网现在默认使用的umi4。 yarn add umijs/preset-ui -D 解决&#xff1a;更改umi版本重新安装umi3 npm i ant-design/pro-cli3.1.0 -g #使用umi3 (指定umi3版本) pro create user-ce…

伪原创文章生成软件:自媒体文章写作好神器

自媒体的红利时代&#xff0c;许多人都纷纷参于其中&#xff0c;而文章写作是做自媒体的基本技能&#xff0c;但是随着技术的发展&#xff0c;如今&#xff0c;既使不会写作能力一样可以做起自媒体&#xff0c;方法就是利用伪原创文章生成软件来做内容的输出&#xff0c;其实伪…

【Vue3进阶】- 第2学堂小商城实战课程前言

该教程为进阶教程&#xff0c;如果你还不了解Vue3的基础知识&#xff0c;可以先前往Vue3基础教程&#xff0c;从入门到实战。 学习时遇到的任何疑问都欢迎在相应课文页面下方的问答区进行提问哦 我能学到什么&#xff1f; 编程写法千千万&#xff0c;实现需求是第一。 教程中…