异构数据同步 datax (2)-postgres 写扩展

news2025/1/11 12:35:31

1、postgres SQL 支持  插入更新操作(与mysql 语法有一定差异)

可参考下面文章

MySQL + PostgreSQL批量插入更新insertOrUpdate_mysql insert update-CSDN博客

2、datax中,可通过源码调整来实现

参考来源

https://juejin.cn/post/7124899170615296013

3、源码调整注意事项

datax : 版本 

源码下载,自行用idea进行打包编译,修改完如下类,

com.alibaba.datax.plugin.writer.postgresqlwriter.PostgresqlWriter

com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil

编译替换jar文件名:

postgresqlwriter-0.0.1-SNAPSHOT.jar

plugin-rdbms-util-0.0.1-SNAPSHOT.jar

目录树如下:(plugin/writer/postgresqlwriter)

find <目录路径> | sed -e 's/[^-][^\/]*\//--/g' -e 's/--/|-/'

|-lib
|-bin
|-job
|-conf
|-log
|-log_perf
|-tmp
|-script
|-plugin
|---writer
|-----postgresqlwriter
|-------plugin_job_template.json
|-------plugin.json
|-------libs
|---------checker-qual-3.5.0.jar
|---------postgresql-42.3.3.jar
|---------commons-collections-3.0.jar
|---------druid-1.0.15.jar
|---------commons-lang3-3.3.2.jar
|---------logback-core-1.0.13.jar
|---------commons-io-2.4.jar
|---------datax-common-0.0.1-SNAPSHOT.jar
|---------guava-r05.jar
|---------plugin-rdbms-util-0.0.1-SNAPSHOT.jar
|---------hamcrest-core-1.3.jar
|---------logback-classic-1.0.13.jar
|---------commons-math3-3.1.1.jar
|---------slf4j-api-1.7.10.jar
|---------fastjson2-2.0.23.jar
|-------postgresqlwriter-0.0.1-SNAPSHOT.jar

4、使用

4.1 、可以支持带有唯一索引的表的新增或者更新

{
    "job":{
        "content":[
            {
                "reader":{
                    "name":"txtfilereader",
                    "parameter":{
                         "username": "root",
                        "password": "数据库密码",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.5.180:3306/xxxx?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8"
                                ],
                                "querySql": [
                                    " SELECT * from  sys_test_copy1"
                                ]
                            }
                    }
                },
                "writer":{
                    "name":"postgresqlwriter",
                    "parameter":{
                        "writeMode": "update!@#(id)!@#(name)",
                        "column":[
                            "id",
                            "name"
                        ],
                        "connection":[
                            {
                                "jdbcUrl":"jdbc:postgresql://127.0.0.1:5432/postgres",
                                "table":[
                                    "test_datax"
                                ]
                            }
                        ],
                        "password":"xxxx",
                        "username":"postgres"
                    }
                }
            }
        ],
        "setting":{
            "speed":{
                "channel":6
            }
        }
    }
}

4.2、根据主键进行新增或者更新

INSERT INTO sys_test_copy1(user_id, email) VALUES (5592, 'xxxx5@hotmail.com')  ON CONFLICT (user_id) do nothing;

对应的 job 可以按如下结构编写

{
    "job": {
        "setting": {
            "speed": {
                "channel": 5
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "数据库密码",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.5.180:3306/xxxx?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8"
                                ],
                                "querySql": [
                                    " SELECT * from  sys_test_copy1"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "postgresqlwriter",
                    "parameter": {
                        "username": "postgres",
                        "password": "数据库密码",
                        "writeMode": "insert!@#(user_id)",
                        "column": [
                            "*"
                        ],                      
                        "connection": [
                            {
                                "table": [
                                    "sys_test_copy1"
                                ],
                                "jdbcUrl": "jdbc:postgresql://192.168.5.190:5432/xxxx",
                            }
                        ]
                    }
                }
            }
        ]
    }
}

其实都是写的 insert into on CONFLICT 语句

com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil

下面的代码后续调整下规则,

private static String onDuplicateKeyUpdateString(String writeMode, List<String> columnHolders) {
        String[] writeModeArr = writeMode.split("!@#", -1);
        int writeModeArrLen = writeModeArr.length;
        writeMode = writeModeArr[0];

        StringBuilder sb = new StringBuilder();
        if ("insert".equals(writeMode) && writeModeArrLen == 2) {
            sb.append(" ON CONFLICT ").append(writeModeArr[1]).append(" do nothing");
        }

        if ("update".equals(writeMode) && writeModeArrLen == 3) {
            sb.append(" ON CONFLICT ").append(writeModeArr[1]);
            String[] updateFieldArr = writeModeArr[2].replace("(","").replace(")","").split(",", -1);

            List<String> updateSqlList = new ArrayList<>();
            for (String updateField : updateFieldArr) {
                if (!columnHolders.contains(updateField)) {
                    continue;
                }
                updateSqlList.add(updateField + "=EXCLUDED." + updateField);
            }

            if (updateSqlList.isEmpty()) {
                sb.append(" DO NOTHING");
            } else {
                sb.append(" DO UPDATE SET ").append(StringUtils.join(updateSqlList, ","));
            }
        }
        return sb.toString();
    }

小结:

pg插件,目前不支持插入更新操作,需要手工调整源码来适配。适配注意点,是根据你是否配置唯一索引来决定。(insert or update)

下期将简单介绍下,如果通过xxl-job 来执行 脚本

python datax.py ./job/mysql_postgres_job.json


 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2053451.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何使用GPT-SoVITSS生成各种角色的语言

百度网盘 请输入提取码 项目来自b站UP主花儿不哭 一&#xff0c;先除去背景声音————人生伴奏出去背景声音 1.下载后&#xff0c;按下面路径打开&#xff0c;打开文件beta&#xff0c;打开go-webui程序 回车&#xff0c;然后稍等一下&#xff0c;等待网页打开 2.勾选如下…

解决部分软件在 Linux 下截屏黑屏,远程控制黑屏的问题

解决部分软件在 Linux 下截屏黑屏&#xff0c;远程控制黑屏的问题 1.黑屏问题 某些 Linux 发行版本默认使用的是Wayland显示协议&#xff0c;比如 ubuntu 22.04 以上版本、fedora、manjaro 等版本。某些 Linux 软件在使用 Wayland 显示服务器协议时&#xff0c;截屏时屏幕是黑…

嵌入式面经篇九——网络编程

文章目录 前言一、网络编程1、列举一下 OSI 协议的各种分层。说说你最熟悉的一层协议的功能。2、TCP/IP协议包括&#xff1f;3、TCP通信建立和释放的过程&#xff1f;端口的作用&#xff1f;4、IP地址转换成物理地址的协议&#xff1f;反之&#xff1f;5、IP 地址的编码分为哪两…

海外媒体发稿的投稿策略:如何撰写高质量的新闻稿?

发布国外新闻稿件是一个涉及多步骤的过程&#xff0c;旨在确保您的新闻稿能够有效覆盖目标受众。以下是一些关键步骤和实用的技巧&#xff0c;帮助你实现海外媒体发稿。 1. 明确目标和受众 首先&#xff0c;明确您发布新闻稿的目标&#xff0c;是为了增加品牌曝光、推出新产品…

AIGC:text2img - 文生图

当前手头上的定制化项目&#xff0c;可用训练数据较少&#xff0c;训练的模型效果不佳。所以通过 clip-interrogator 获取图片获取描述后&#xff0c;批量进行 文生图 以增加样本量。 在批量生成前&#xff0c;先简单评测一下当前的主流 文生图 模型。直接上效果&#xff1a; …

SQL非技术快速入门39题

※食用指南&#xff1a;文章内容为牛客网《非技术快速入门》39道题重点笔记&#xff0c;用于重复思考错题&#xff0c;加深印象。 练习传送门&#xff1a;SQL非技术快速入门39题 目录&#xff1a; SQL13 Where in 和Not in SQL19 分组过滤练习题 SQL20 分组排序练习题 SQL2…

DDPM | 扩散模型代码详解【较为详细细致!!!】

文章目录 1、UNet网络结构1.1 residual网络和attention网络的细节1.2 t 的作用1.3 DDPM 中的 Positional Embedding 的使用1.4 DDPM 中的 Positional Embedding 代码1.5 residual block1.6 attention block1.7 UNet结构 2、命令行参数解析3、数据的获取与预处理4、模型的训练框…

stm32的UART重定向printf()

1配置好uart 2打开usart.c文件 3在此文件前面添加头文件 4在末尾添加重定向代码 添加的代码 /* USER CODE BEGIN 1 *///加入以下代码,支持printf函数,而不需要选择use MicroLIB //#define PUTCHAR_PROTOTYPE int fputc(int ch, FILE *f) #if 1 //#pragma import(__use_n…

microsoft edge怎么关闭安全搜索

microsoft edge浏览器为用户提供了安全搜索功能&#xff0c;旨在帮助用户过滤掉搜索结果中出现的不当信息。然而&#xff0c;有些用户可能觉得安全搜索功能限制了他们的浏览体验或工作需求。下面就给大家带来关闭microsoft edge安全搜索的相关内容&#xff0c;一起来看看吧。&a…

java 函数接口Consumer简介与示例【函数式编程】【Stream】

Java 8 中的 消费者接口Consumer 是一个函数接口&#xff0c;它可以接受一个泛型 类型参数&#xff0c;它属于java.util.function包。 accept(T) 方法&#xff1a;是 Consumer 函数式接口的方法&#xff0c;传入单个输入参数&#xff0c;无返回值&#xff0c;可以用于 Lambda 表…

日本央行还会加息?机构与市场唱反调!

最近&#xff0c;关于日本央行是否会继续加息的话题引发了市场热议。一边是市场对加息预期大幅下降&#xff0c;另一边却有像先锋、M&G这样的国际知名资产管理公司坚定地看好日本央行的进一步紧缩。 这究竟是怎么回事呢&#xff1f; 市场与机构的观点分歧 市场看跌加息&am…

如何通过社交媒体有效促进口碑营销?

在一个广告无处不在的时代&#xff0c;大多数品牌不能再盲目的选择传统的广告轰炸&#xff0c;而应依靠口碑营销&#xff0c;通过消费者的自发传播实现了品牌的快速崛起。 口碑营销的几个关键的传播要素&#xff1a; 真实性&#xff1a;在广告反感的时代&#xff0c;消费者更倾…

Java常用API第二篇

正则表达式&#xff1a; 正则表达式&#xff08;简称 regex&#xff09;是用来描述字符串模式的工具&#xff0c;常用于字符串的查找、匹配、替换等操作。它在文本处理、数据验证、以及编程中非常常见。以下是正则表达式的基本知识点&#xff1a; 1. 正则表达式的基础符号 . (…

【Datawhale AI夏令营第四期】 魔搭-大模型应用开发方向笔记 Task04 RAG模型 人话八股文Bakwaan_Buddy项目创空间部署

【Datawhale AI夏令营第四期】 魔搭-大模型应用开发方向笔记 Task04 RAG模型 人话八股文Bakwaan_Buddy项目创空间部署 什么是RAG&#xff1a; 我能把这个过程理解为Kimi.ai每次都能列出的一大堆网页参考资料吗&#xff1f;Kimi学了这些资料以后&#xff0c;根据这里面的信息综…

期权有指定的交易场所吗?

在当前的金融市场环境下&#xff0c;设立专门的股票期权交易所&#xff0c;比如上交所&#xff0c;深交所和中金所&#xff0c;都是国内出门的交易场所&#xff0c;不过期权交易都是在券商和期货开通账户买卖&#xff0c;但这并不妨碍投资者通过其他途径参与期权投资&#xff0…

DHCP原理及实验

目录 1.基础知识 &#xff08;1&#xff09;基本概念 &#xff08;2&#xff09;DHCP优点 2.工作原理 3.私接路由器解决方法 4.实验搭建 &#xff08;1&#xff09;实验top &#xff08;2&#xff09;实验概述 5.配置命令 &#xff08;1&#xff09;基础配置 &#…

音境剧场:开启声学体验的全新纪元—轻空间

在现代建筑和空间设计中&#xff0c;声学环境越来越受到重视。一个好的声学空间&#xff0c;不仅能提升使用者的感受&#xff0c;还能为各种活动提供最优质的音效体验。作为创新声学空间的代表作&#xff0c;“音境剧场”应运而生&#xff0c;它不仅是一个多功能场馆&#xff0…

FunHPC算力平台评测

作为内测老用户&#xff0c;已经用DeepLn平台&#xff08;现改名为FunHPC平台&#xff09;好久了&#xff0c;一路见证了平台从最初100多人的小群到现在满群的状态&#xff0c;FunHpc平台确实在一步步的走向成熟&#xff0c;一步步的变大。趁着现在活动的时间&#xff0c;发篇文…

ant design pro 中用户的表单如何控制多个角色

ant design pro 如何去保存颜色ant design pro v6 如何做好角色管理ant design 的 tree 如何作为角色中的权限选择之一ant design 的 tree 如何作为角色中的权限选择之二ant design pro access.ts 是如何控制多角色的权限的 看上面的图片 当创建或编辑一个用户时&#xff0c;…

vue3里面的组件实例类型(包括原生的html标签类型)

在 通过 ref&#xff08;null&#xff09;获取组件的时候&#xff0c;我们想要为 组件标注组件类型&#xff0c;可以通过 any 类型来进行标注&#xff0c;但是很明显&#xff0c;这些的代码很不优雅&#xff0c;所以我们可以利用 vue3 里面的 InstanceType 来进行类型标注 这是…