批量生成datax同步JSON(postgresql到doris)

news2024/10/5 18:23:48

img

1.问题描述

使用datax同步psql数据到doris,表的数量过多,写datax的配置文件很麻烦。鉴于此,编写了一个datax的配置文件生成脚本,可以灵活的实现一键生成配置文件,提高生产效率。
废话不多说,脚本如下

2.问题解决

vim gen_import_psql_config_simple.py

批量生成datax同步JSON(postgresql到doris)

# coding=utf-8
import json
import getopt
import os
import sys
import psycopg2

#MySQL相关配置,需根据实际情况作出修改
psql_host = "xxx"
psql_port = "xxx"
psql_user = "xxx"
psql_passwd = "xxx"

#HDFS NameNode相关配置,需根据实际情况作出修改
doris_host = "xxx"
doris_port = "xxx"
doris_http_port = "xxx"
doris_user = "xxx"
doris_passwd = "xxx"
sink_database = "xxx"
condition = True


#生成配置文件的目标路径,可根据实际情况作出修改
output_path = "/data/job"


def get_connection(database):
    return psycopg2.connect(host=psql_host, port=int(psql_port), user=psql_user, password=psql_passwd,database=database,options="-c search_path=information_schema,public")


def get_psql_meta(database, schema,table):
    connection = get_connection(database)
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME from columns WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [schema, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall
def get_mysql_columns(database,schema ,table):
    return list(map(lambda x: x[0], get_psql_meta(database,schema, table)))
def get_psql_columns(database,schema ,table):
    return list(map(lambda x: f'\"{x[0]}\"', get_psql_meta(database,schema, table)))


def generate_json(source_database,source_schema, source_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 1
                }
            },
            "content": [{
                "reader": {
                    "name": "postgresqlreader",
                    "parameter": {
                        "username": psql_user,
                        "password": psql_passwd,
                        "column": get_psql_columns(source_database,source_schema, source_table),
                        "fetchSize": 1024,
                        "where": "1 = 1",
                        "connection": [{
                            "table": [source_schema + "." +source_table],
                            "jdbcUrl": ["jdbc:postgresql://" + psql_host + ":" + psql_port + "/" + source_database]
                        }]
                    }
                },

                "writer": {
                    "name": "doriswriter",
                    "parameter": {
                        "loadUrl": [doris_host + ":" + doris_port],
                        "column": get_mysql_columns(source_database,source_schema, source_table),
                        "username": doris_user,
                        "password": doris_passwd,
                        "flushInterval":30000,
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://" + doris_host + ":" + doris_http_port + "/" + sink_database,
                                "selectedDatabase": sink_database,
                                "table": [source_table]
                            }
                        ],
                        "loadProps": {
                            "format": "json",
                            "strip_outer_array": condition
                        }
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([source_database, source_schema,source_table, "json"])), "w") as f:
        json.dump(job, f)

def main(args):
    source_database = ""
    source_table = ""
    source_schema = ""

    options, arguments = getopt.getopt(args, '-d:-s:-t:', ['sourcedb=','sourceschema=', 'sourcetbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        if opt_name in ('-s', '--sourceschema'):
            source_schema = opt_value
        if opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value
    generate_json(source_database,source_schema, source_table)


if __name__ == '__main__':
    main(sys.argv[1:])

3.脚本使用

python ./gen_import_psql_config_simple.py -d psql_database -s psql_schema -t psql_table

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1366821.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vue入门二(列表渲染|数据的双向绑定|事件处理)

文章目录 一、列表渲染小案例补充es6对象写法v-for可以循环的类型补充js可循环类型key值的解释 二、数据的双向绑定三、事件处理基本使用过滤案例事件修饰符 一、列表渲染 小案例 <!DOCTYPE html><html lang"en"><head><meta charset"UTF…

跨平台的传输协议@WebDav协议@windows系统配置WedDav服务器@局域网内的WebDav传输系统

文章目录 WebDav协议基本信息启用必要的windows功能启动站点管理器IIS站点根目录访问权限设置站点的功能设置端口通行防火墙IMME文件类型(文件后缀)其他设备登录和访问本机的WebDav服务站点 小结优点缺点 refs WebDav 协议基本信息 来自wikipedia:基于Web的分布式编写和版本控…

数字IC芯片设计实现 | 时序Timing Signoff check_timing检查解析

今天分享在数字IC芯片设计实现做timing signoff阶段必须要看的report。check_timing的报告必须是clean的&#xff0c;否则芯片回来大概率是废片&#xff01;&#xff01;&#xff01;实际上一堆公司的芯片败在不看这个report了。 我们知道primetime(简称PT)做时序检查是基于我…

RT-Thread: 基于STM32CubeMX配置驱STM32驱动的USB虚拟串口调试

关键词&#xff1a;USB 虚拟串口 USB虚拟串口&#xff0c;RT-Thread Studio&#xff0c;STM32 说明&#xff1a; 1&#xff1a;文档记录 STM32F103系列基于 RT-Thread 系统的 USB虚拟串口的开启及数据收发应用流程介绍。 2&#xff1a;本文以STM32F103C8T6型号做测试&#x…

Java-伪共享

在说这个计算机术语之前&#xff0c;我先在这里问候所有问“什么是JVM伪共享”的垃圾JAVA程序员以及一瓶不满半瓶晃荡的面试官全家 我从来没想过国内已经很卷的JAVA圈&#xff0c;已经卷到语无伦次的地步了&#xff0c;“伪共享”是java程序员应该知道的吗&#xff1f;能问出这…

【Linux Shell】5. 运算符

文章目录 【 1. expr 命令 】【 2. 算术运算符 】【 3. 关系运算符 】【 4. 布尔运算符 】【 5. 逻辑运算符 】【 6. 字符串运算符 】【 7. 文件测试运算符 】 【 1. expr 命令 】 原生 bash 不支持简单的数学运算&#xff0c;但是可以通过其他命令来实现&#xff0c;例如 awk …

基于SSM图书管理系统【源码】【最详细运行文档】

SSM图书管理系统【源码】【最详细运行文档】 系统简介系统涉及系统运行系统演示源码获取 系统简介 以往的图书馆管理事务处理主要使用的是传统的人工管理方式&#xff0c;这种管理方式存在着管理效率低、操作流程繁琐、保密性差等缺点&#xff0c;长期的人工管理模式会产生大量…

超维空间M1无人机使用说明书——52、ROS无人机二维码识别与降落

引言&#xff1a;使用二维码引导无人机实现精准降落&#xff0c;首先需要实现对二维码的识别和定位&#xff0c;可以参考博客的二维码识别和定位内容。本小节主要是通过获取拿到的二维码位置&#xff0c;控制无人机全向的移动和降落&#xff0c;分为两种&#xff0c;一种是无人…

【JAVA】final、finally、finalize 有什么区别?

&#x1f34e;个人博客&#xff1a;个人主页 &#x1f3c6;个人专栏&#xff1a; JAVA ⛳️ 功不唐捐&#xff0c;玉汝于成 目录 前言 正文 final&#xff1a; finally&#xff1a; finalize&#xff1a; 结语 我的其他博客 前言 在Java中&#xff0c;final、f…

适合培训协会搭建的培训机构管理系统开发方案

一、项目背景与目标 &#xff08;一&#xff09;项目背景 培训学校教务管理系统是培训机构数字化管理的必备系统&#xff0c;该系统功能大大提升机构办学的管理效率、提升机构在家长心中的专业度&#xff0c;市面上的培训机构管理系统收费越来越贵&#xff0c;为了给协会内培…

CMake入门教程【核心篇】静态库 (.a, .lib)

😈「CSDN主页」:传送门 😈「Bilibil首页」:传送门 😈「动动你的小手」:点赞👍收藏⭐️评论📝 文章目录 概述创建静态库添加静态库到你的项目完整代码示例实战使用技巧与注意事项总结与分析概述 静态库在C++开发中扮演着重要的角色。它们通常以.a(在Unix-like系统

django websocket实现聊天室功能

注意事项channel版本 django2.x 需要匹配安装 channels 2 django3.x 需要匹配安装 channels 3 Django3.2.4 channels3.0.3 Django3.2.* channels3.0.2 Django4.2 channles3.0.5 是因为最新版channels默认不带daphne服务器 直接用命令 python manage.py runsever 默认运行的是w…

python协程asyncio的应用,async,await,loop

关于协程&#xff0c;asyncio&#xff0c;async&#xff0c;await&#xff0c;loop的概念&#xff0c;参照上一篇文章可迭代对象&#xff0c;迭代器&#xff0c;生成器&#xff0c;协程-CSDN博客 上一章我们详细的讲解了上述各个名词的概念&#xff0c;但是这些东西实际上该怎…

Dash+Plotly | Web应用开发(1)

本文为https://github.com/CNFeffery/DataScienceStudyNotes的学习笔记&#xff0c;部分源码来源于此仓库。 本期内容主要为基础概念、web布局方法和交互回调。 文章目录 Dash的主要模块Highlightlayoutcallback 惰性交互阻止初次回调忽略回调匹配错误控制部分回调输出不更新获…

计算机毕业设计----SSM场地预订管理系统

项目介绍 本项目分为前后台&#xff0c;前台为普通用户登录&#xff0c;后台为管理员登录&#xff1b; 用户角色包含以下功能&#xff1a; 按分类查看场地,用户登录,查看网站公告,按分类查看器材,查看商品详情,加入购物车,提交订单,查看订单,修改个人信息等功能。 管理员角…

linux安装codeserver实现云端开发

先看图 下载安装包 https://github.com/coder/code-server/releases 找到code-server-版本号-linux-amd64.tar.gz&#xff0c;我这里是code-server-4.16.1-linux-amd64.tar.gz 1、使用acrm用户登录目标服务器 2、切换root用户&#xff0c;创建 vscode 用户&#xff0c;并设…

selenium对于页面改变的定位元素处理办法

在学习selenimu中&#xff0c;总是发现元素定位不到&#xff0c;想了各种办法&#xff0c;最后总结大致有两个原因。 1.等待时间不够&#xff0c;页面还没有完全渲染就进行操作&#xff0c;使用time模块进行等待。 2.换了页面后&#xff0c;发现定位不到元素&#xff0c;因为…

外包做了1个月,技术退步一大半了。。。

先说一下自己的情况&#xff0c;本科生&#xff0c;20年通过校招进入深圳某软件公司&#xff0c;干了接近4年的功能测试&#xff0c;今年年初&#xff0c;感觉自己不能够在这样下去了&#xff0c;长时间呆在一个舒适的环境会让一个人堕落!而我已经在一个企业干了四年的功能测试…

新颖度爆表。网络药理学+PPI+分子对接+实验验证

今天给同学们分享一篇生信文章“The convergent application of metabolites from Avena sativa and gut microbiota to ameliorate non-alcoholic fatty liver disease: a network pharmacology study”&#xff0c;这篇文章发表在J Transl Med期刊上&#xff0c;影响因子为7.…

LeetCode-58/709

1.最后一个单词的长度&#xff08;58&#xff09; 题目描述&#xff1a; 给你一个字符串 s&#xff0c;由若干单词组成&#xff0c;单词前后用一些空格字符隔开。返回字符串中 最后一个 单词的长度。 单词 是指仅由字母组成、不包含任何空格字符的最大子字符串。 思路&…