1.问题描述
使用datax同步psql数据到doris,表的数量过多,写datax的配置文件很麻烦。鉴于此,编写了一个datax的配置文件生成脚本,可以灵活的实现一键生成配置文件,提高生产效率。
废话不多说,脚本如下
2.问题解决
vim gen_import_psql_config_simple.py
批量生成datax同步JSON(postgresql到doris)
# coding=utf-8
import json
import getopt
import os
import sys
import psycopg2
#MySQL相关配置,需根据实际情况作出修改
psql_host = "xxx"
psql_port = "xxx"
psql_user = "xxx"
psql_passwd = "xxx"
#HDFS NameNode相关配置,需根据实际情况作出修改
doris_host = "xxx"
doris_port = "xxx"
doris_http_port = "xxx"
doris_user = "xxx"
doris_passwd = "xxx"
sink_database = "xxx"
condition = True
#生成配置文件的目标路径,可根据实际情况作出修改
output_path = "/data/job"
def get_connection(database):
return psycopg2.connect(host=psql_host, port=int(psql_port), user=psql_user, password=psql_passwd,database=database,options="-c search_path=information_schema,public")
def get_psql_meta(database, schema,table):
connection = get_connection(database)
cursor = connection.cursor()
sql = "SELECT COLUMN_NAME from columns WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
cursor.execute(sql, [schema, table])
fetchall = cursor.fetchall()
cursor.close()
connection.close()
return fetchall
def get_mysql_columns(database,schema ,table):
return list(map(lambda x: x[0], get_psql_meta(database,schema, table)))
def get_psql_columns(database,schema ,table):
return list(map(lambda x: f'\"{x[0]}\"', get_psql_meta(database,schema, table)))
def generate_json(source_database,source_schema, source_table):
job = {
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [{
"reader": {
"name": "postgresqlreader",
"parameter": {
"username": psql_user,
"password": psql_passwd,
"column": get_psql_columns(source_database,source_schema, source_table),
"fetchSize": 1024,
"where": "1 = 1",
"connection": [{
"table": [source_schema + "." +source_table],
"jdbcUrl": ["jdbc:postgresql://" + psql_host + ":" + psql_port + "/" + source_database]
}]
}
},
"writer": {
"name": "doriswriter",
"parameter": {
"loadUrl": [doris_host + ":" + doris_port],
"column": get_mysql_columns(source_database,source_schema, source_table),
"username": doris_user,
"password": doris_passwd,
"flushInterval":30000,
"connection": [
{
"jdbcUrl": "jdbc:mysql://" + doris_host + ":" + doris_http_port + "/" + sink_database,
"selectedDatabase": sink_database,
"table": [source_table]
}
],
"loadProps": {
"format": "json",
"strip_outer_array": condition
}
}
}
}]
}
}
if not os.path.exists(output_path):
os.makedirs(output_path)
with open(os.path.join(output_path, ".".join([source_database, source_schema,source_table, "json"])), "w") as f:
json.dump(job, f)
def main(args):
source_database = ""
source_table = ""
source_schema = ""
options, arguments = getopt.getopt(args, '-d:-s:-t:', ['sourcedb=','sourceschema=', 'sourcetbl='])
for opt_name, opt_value in options:
if opt_name in ('-d', '--sourcedb'):
source_database = opt_value
if opt_name in ('-s', '--sourceschema'):
source_schema = opt_value
if opt_name in ('-t', '--sourcetbl'):
source_table = opt_value
generate_json(source_database,source_schema, source_table)
if __name__ == '__main__':
main(sys.argv[1:])
3.脚本使用
python ./gen_import_psql_config_simple.py -d psql_database -s psql_schema -t psql_table