datax-Oracle新增writeMode支持

news2024/9/24 18:22:44

1.在com.alibaba.datax.plugin.writer.oraclewriter.OracleWriter中注释此内容,以让oracle支持writeMode模式

2.在com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil中,增加对oracle的判断,将getWriteTemplate修改为如下内容

public static String getWriteTemplate(List<String> columnHolders, List<String> valueHolders, String writeMode, DataBaseType dataBaseType, boolean forceUseUpdate) {
        String mode = writeMode.trim().toLowerCase();
        boolean isWriteModeLegal = writeMode.trim().toLowerCase().startsWith("insert")
                || writeMode.trim().toLowerCase().startsWith("replace")
                || writeMode.trim().toLowerCase().startsWith("update");

        if (!isWriteModeLegal) {
            throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,
                    String.format("您所配置的 writeMode:%s 错误. 因为DataX 目前仅支持replace,update 或 insert 方式. 请检查您的配置并作出修改.", writeMode));
        }
        // && writeMode.trim().toLowerCase().startsWith("replace")
        String writeDataSqlTemplate;
        if (forceUseUpdate || mode.startsWith("update")) {
            //update只在mysql下使用

            if((dataBaseType == DataBaseType.MySql || dataBaseType == DataBaseType.Tddl)){
                writeDataSqlTemplate = new StringBuilder()
                        .append("INSERT INTO %s (").append(StringUtils.join(columnHolders, ","))
                        .append(") VALUES(").append(StringUtils.join(valueHolders, ","))
                        .append(")")
                        .append(onDuplicateKeyUpdateString(columnHolders))
                        .toString();
            }if (dataBaseType == DataBaseType.Oracle) {
                writeDataSqlTemplate = new StringBuilder().append(onMergeIntoDoString(writeMode, columnHolders, valueHolders)).append("INSERT (")
                        .append(StringUtils.join(columnHolders, ","))
                        .append(") VALUES(").append(StringUtils.join(valueHolders, ","))
                        .append(")").toString();
            } else {
                throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,
                        String.format("当前数据库不支持 writeMode:%s 模式.", writeMode));
            }

        } else {

            //这里是保护,如果其他错误的使用了update,需要更换为replace
            if (writeMode.trim().toLowerCase().startsWith("update")) {
                writeMode = "replace";
            }
            writeDataSqlTemplate = new StringBuilder().append(writeMode)
                    .append(" INTO %s (").append(StringUtils.join(columnHolders, ","))
                    .append(") VALUES(").append(StringUtils.join(valueHolders, ","))
                    .append(")").toString();
        }

        return writeDataSqlTemplate;
    }


public static String onMergeIntoDoString(String merge, List<String> columnHolders, List<String> valueHolders) {
        String[] sArray = getStrings(merge);
        StringBuilder sb = new StringBuilder();
        sb.append("MERGE INTO %s A USING ( SELECT ");

        boolean first = true;
        boolean first1 = true;
        StringBuilder str = new StringBuilder();
        StringBuilder update = new StringBuilder();
        for (String columnHolder : columnHolders) {
            if (Arrays.asList(sArray).contains(columnHolder)) {
                if (!first) {
                    sb.append(",");
                    str.append(" AND ");
                } else {
                    first = false;
                }
                str.append("TMP.").append(columnHolder);
                sb.append("?");
                str.append(" = ");
                sb.append(" AS ");
                str.append("A.").append(columnHolder);
                sb.append(columnHolder);
            }
        }

        for (String columnHolder : columnHolders) {
            if (!Arrays.asList(sArray).contains(columnHolder)) {
                if (!first1) {
                    update.append(",");
                } else {
                    first1 = false;
                }
                update.append(columnHolder);
                update.append(" = ");
                update.append("?");
            }
        }

        sb.append(" FROM DUAL ) TMP ON (");
        sb.append(str);
        sb.append(" ) WHEN MATCHED THEN UPDATE SET ");
        sb.append(update);
        sb.append(" WHEN NOT MATCHED THEN ");
        return sb.toString();
    }

public static String[] getStrings(String merge) {
        merge = merge.replace("update", "");
        merge = merge.replace("(", "");
        merge = merge.replace(")", "");
        merge = merge.replace(" ", "");
        return merge.split(",");
    }

 3.在com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter中,修改startWriteWithConnection为以下内容

        public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) {
            this.taskPluginCollector = taskPluginCollector;

            List<String> columns = new LinkedList<>();
            if (this.dataBaseType == DataBaseType.Oracle && writeMode.trim().toLowerCase().startsWith("update") ) {
                String merge = this.writeMode;
                String[] sArray = WriterUtil.getStrings(merge);
                this.columns.forEach(column->{
                    if (Arrays.asList(sArray).contains(column)) {
                        columns.add(column);
                    }
                });
                this.columns.forEach(column->{
                    if (!Arrays.asList(sArray).contains(column)) {
                        columns.add(column);
                    }
                });
            }
            columns.addAll(this.columns);

            // 用于写入数据的时候的类型根据目的表字段类型转换
            this.resultSetMetaData = DBUtil.getColumnMetaData(connection,
                    this.table, StringUtils.join(columns, ","));
            // 写数据库的SQL语句
            calcWriteRecordSql();

...

4.在com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter中,修改doBatchInsert为以下内容

 

protected void doBatchInsert(Connection connection, List<Record> buffer)
    throws SQLException
{
    PreparedStatement preparedStatement = null;
    try {
        connection.setAutoCommit(false);
        preparedStatement = connection
            .prepareStatement(this.writeRecordSql);
        if (this.dataBaseType == DataBaseType.Oracle && !"insert".equalsIgnoreCase(this.writeMode)) {
            String merge = this.writeMode;
            String[] sArray = WriterUtil.getStrings(merge);
            for (Record record : buffer) {
                List<Column> recordOne = new ArrayList<>();
                for (int j = 0; j < this.columns.size(); j++) {
                    if (Arrays.asList(sArray).contains(this.columns.get(j))) {
                        recordOne.add(record.getColumn(j));
                    }
                }
                for (int j = 0; j < this.columns.size(); j++) {
                    if (!Arrays.asList(sArray).contains(this.columns.get(j))) {
                        recordOne.add(record.getColumn(j));
                    }
                }
                for (int j = 0; j < this.columns.size(); j++) {
                    recordOne.add(record.getColumn(j));
                }
                for (int j = 0; j < recordOne.size(); j++) {
                    record.setColumn(j, recordOne.get(j));
                }
                preparedStatement = fillPreparedStatement(
                    preparedStatement, record);
                preparedStatement.addBatch();
            }
        }
        else {
            for (Record record : buffer) {
                preparedStatement = fillPreparedStatement(
                    preparedStatement, record);
                preparedStatement.addBatch();
            }
        }
        preparedStatement.executeBatch();
        connection.commit();
    }
    catch (SQLException e) {
        LOG.warn("回滚此次写入, 采用每次写入一行方式提交. 因为: {}", e.getMessage());
        connection.rollback();
        doOneInsert(connection, buffer);
    }
    catch (Exception e) {
        throw DataXException.asDataXException(
            DBUtilErrorCode.WRITE_DATA_ERROR, e);
    }
    finally {
        DBUtil.closeDBResources(preparedStatement, null);
    }
}

5.在com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter中,修改fillPreparedStatement为以下内容

        // 直接使用了两个类变量:columnNumber,resultSetMetaData
        protected PreparedStatement fillPreparedStatement(PreparedStatement preparedStatement, Record record)
                throws SQLException {
            for (int i = 0; i < this.resultSetMetaData.getLeft().size(); i++) {
                int columnSqlType = this.resultSetMetaData.getMiddle().get(i);
                String typeName = this.resultSetMetaData.getRight().get(i);
                String column = this.resultSetMetaData.getLeft().get(i);
                Column columnValue = record.getColumn(this.columns.indexOf(column));
                preparedStatement = fillPreparedStatementColumnType(preparedStatement, i, columnSqlType, typeName,columnValue);
            }

            return preparedStatement;
        }

6.至此修改完成,进行测试

{
 "core": {
  "transport": {
   "channel": {
    "speed": {
     "record": 2000
     }
    }
   }
 },
  "job": {
    "setting": {
      "speed": {
        "record": 2000,
      "errorLimit": {
        "record": 0,
        "percentage": 0
      }      },
    },
    "content": [
      {
        "reader": {
	"name": "oraclereader",
	"parameter": {
		"username": "xxx",
		"password": "xxx",
		"connection": [
			{
				"jdbcUrl": ["jdbc:oracle:thin:@xxx:1521:orcl"],
				"querySql": ["select id,user_id,group_id,create_time from test1"]
			}
		]
	}
},
        "transformer": [],
        "writer": {
	"name": "oraclewriter",
	"parameter": {
		"username": "xxx",
		"password": "xxx",
		"postSql": [ "" ],
		"preSql": [ "" ],
		"column": [  "ID", "USER_ID", "GROUP_ID", "CREATE_TIME" ],
		"writeMode": "update(ID)",
		"connection": [
			{
				"jdbcUrl": "jdbc:oracle:thin:@xxx:1521:orcl",
				"table": [
					"TEST2"
				]
			}
		]
	}
}
      }
    ]
  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/689145.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在文件夹中获取某个文件的绝对路径

#!/bin/bash -lpathfind $(pwd) -name *.ipaecho ${path}写成下面这样也是可以的 path$(find $(pwd) -name *.ipa)如图所示&#xff0c;Export 文件夹下有.ipa文件&#xff0c;我们目前想获取.ipa文件的绝对路径 执行结果如下 192:Jenkins liubo$ cd /Users/liubo/Desktop/…

C# 通过委托实现多个窗口之间的传值

之前用qt写的时候&#xff0c;都有信号和槽来实现&#xff0c;用C#的话应该也有类似的 大概实现的是我在父窗口当中new了两个子窗口&#xff0c;这个两个子窗口都可以将处理完的数据传递给父窗口&#xff0c;并且两个子窗口通过父窗口进行通信。 我这就按上面窗口名称来说明代…

pipeline实现二次还原

通过mode参数确定是否发布还是回滚&#xff0c;在满足rollback条件下&#xff0c;列举出我们的所有的备份的目录&#xff0c;根据回滚条件选择索要回滚的目录(目录是根据时间戳来判断创建的文件) pipeline {agent anyparameters {choice(name: mode, choices: [deploy,rollbac…

HA 自动化-通知提醒

配置->场景自动化->创建自动化 notify.notify

Gitlab群组及项目仓库搭建

1、新建群组 2、新建项目 3、克隆到Visualstudio 复制克隆地址&#xff0c;克隆到本地 这里会让你登录账号 可以添加成员并邀请ta进项目组 从已注册用户列表中选择 4、Git工作流 回顾一下Git工作流&#xff0c;工程人员只需要从Develop分支新建自己的分支即可。分支命名以姓名…

设计模式学习之抽象工厂模式

设计模式系列往期文章 设计模式学习之策略模式设计模式学习之策略模式在前端的应用设计模式学习之简单工厂模式设计模式学习之工厂方法模式 如果你已经理解了工厂方法模式&#xff0c;那你能够很快的明白抽象工厂模式。 温习&#xff1a;什么是工厂方法模式 我们先温习一下…

【面试题19】B-Tree和B+Tree的区别,以及B+Tree在MySQL中的应用

文章目录 一、前言二、关于B-Tree和B Tree2.1 B-Tree2.2 BTree 三、B Tree与B Tree的差异3.1 叶子节点的差异3.2 数据访问的差异3.3 范围查询的差异 四、Mysql中BTree的应用场景4.1 主键索引4.2 唯一索引4.3 普通索引 五、MySQL为什么使用BTree来做索引&#xff1f;总结 一、前…

【网络安全】IP地址定位技术的应用场景

随着科技的不断发展&#xff0c;网络空间已经成为人们生活中重要的一部分。而其中&#xff0c;IP地址定位技术又是网络空间不可或缺的一部分。IP地址定位技术是一种可以根据IP地址确定用户位置的技术这项技术不仅可以用于个人定位&#xff0c;也可以使用在商业领域、网络安全和…

【新星计划·2023】认识和学习BASH(一)

作者&#xff1a;Insist-- 个人主页&#xff1a;insist--个人主页 作者会持续更新网络知识和python基础知识&#xff0c;期待你的关注 目录 一、认识BASH 1、硬件、核心与Shell ①硬件 ②核心管理 ③应用程序 2、为何要学文字接口的shell&#xff1f; 3、系统的合法shel…

学习AQS

面试的时候被问到AQS会不会一脸懵逼呢?今天来学习一下AQS吧! 看看这个听起来高大上又难以回答的东西到底是个啥? 1.什么是AQS JUC包中 的很多同步器如ReentrantLock、Semaphore、CountDownLatch等等都是有一些基础的共同的行为,比如:等待队列、条件队列、独占获取、共享…

IntelliJ IDEA - IDEA 如何快速生成 serialVersionUID?

问题描述 今天遇到序列化问题的时候&#xff0c;需要用到 serialVersionUID 字段&#xff0c;如下。但是不知道如何快速自动生成这个&#xff0c;于是有了这篇博客&#xff0c;哈哈哈 private static final long serialVersionUID 6828716364537510652L; 解决方案 首先如图…

STM32F系列项目定时器配置-中断触发源详解

1.以高级定时器为例&#xff1a; 2.触发源选择与极性选择&#xff1a; 3.触发源介绍 (282条消息) 【STM32技巧】&#xff08;1&#xff09;STM32定时器8种触发源之ITR0~ITR3说明_stm32 定时器触发定时器_小石头有大内涵的博客-CSDN博客 ITR0内部触发0ITR1内部触发1ITR2内部触…

为什么ECB模式不安全\链接模式【密码学】(6)

目录 一、链接模式 二、ECB模式 三、链接模式缺失带来什么问题 一、链接模式 之前讲过&#xff0c;链接模式就是将是一个分组运算和下一个分组运算联系起来。 上一个分组运算的所有结果都有可能参与到下一个分组运算中&#xff1b;下一个分组运算的每一个要素&#xff0c…

Unity3d_shader_Transparency(解决透明材质物体重叠穿透问题)

解决两个透明材质物体穿透问题 【Unity】半透明なオブジェクトを綺麗に表示するシェーダを導入する - コガネブログ https://github.com/ewersp/Shaders https://github.com/ewersp/Shaders/blob/master/BetterTransparentDiffuse.shader // An improvement to the default …

Java --- springboot3整合kafka

一、消息队列场景 1.1、异步 1.2、解耦 1.3、削峰 1.4、缓冲 二、springboot整合kafka 导入pom依赖 <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency> 修改配置 spring.…

CSS基础学习--21 img ( 图片 )

一、使用 CSS 创建图片廊 <!DOCTYPE html> <html> <head> <meta charset"utf-8"> <title>CSS基础学习-图片</title> <style> div.img {margin: 5px;border: 1px solid #ccc;float: left;width: 180px; }div.img:hover…

【系统架构】软件架构技术发展路线概览

发展主线&#xff1a; 模块化编程/面向对象编程构件技术面向服务开发技术云技术 注&#xff1a;点击查看大图 任何新技术、新方向和新思路的出现都会融入软件架构的发展历程中。

排序算法——冒泡排序

冒泡排序 算法步骤 以升序排序为例&#xff1a; 比较相邻元素&#xff0c;如果前面的比后面的元素大&#xff0c;则两元素交换位置对每一对相邻元素进行比较&#xff0c;大的放后&#xff0c;这样最后的元素将是最大的元素对越来越少的混乱元素重复上述步骤&#xff08;最后…

OpenCV——《bitwise_and》mask的操作以及直方图的操作

1.bitwise_and和mask操作 bitwise_and该函数是一个and操作当两者全为1的时候才会为1&#xff0c;有0则0. import cv2 import numpy as np import matplotlib.pyplot as pltdef cv_show(name,img):cv2.imshow(name,img)cv2.waitKey()cv2.destroyAllWindows() maskmask np.zer…

【Java】JVM学习(一)

JVM是一种规范 Java程序的执行过程 一个 Java 程序&#xff0c;首先经过 javac 编译成 .class 文件&#xff0c;然后 JVM 将其加载到方法区&#xff0c;执行引擎将会执行这些字节码。执行时&#xff0c;会翻译成操作系统相关的函数。JVM 作为 .class 文件的翻译存在&#xff…