批量生成ChunJun json任务脚本

news2025/1/8 4:56:42

        最近在研究chunjun,它是一款稳定、易用、高效、批流一体的数据集成框架。一直在用chunjun做数据抽取测试,json任务重复地在写,感觉十分浪费时间,于是想写个自动生成json脚本。

1.设计模板

        模板通过excel设计,主要记录任务中一些参数,每一行就是一个任务,如:MySQL库的ip、端口、库表还有hive的库表、hive数据存储路径 等等

 2.编写Python代码

 2.1.读取模板任务

def readList_extract_Info():
    """
    获取 模板 中的 整体 任务 数据
    :return: 返回 所有任务 集合
    """
    row_count = table.nrows-1
    for row_item in range(row_count):
        count = row_item+1
        list = table.row_values(count)
        job_list.append(list)
    return job_list

2.2.查询表分区并创建添加分区sql

        chunjun 好像和 datax一样,不支持动态分区,所以在 数据抽取之前,需要创建分区,自动生成添加分区脚本。

def create_partition(job_list):
    sql_list=[]
    # 循环 出 每一个 任务 信息
    for i in range(len(job_list)):
        # 拼接 出 创建 分区 sql
        sql = "alter table " + job_list[i][8] + "." + job_list[i][9] + " add if not exists partition("

        # 判断 任务 中 是否 为分区表,如果为分区表,那么就要根据 任务 中的分区值 创建分区
        if job_list[i][10]=="1":
            # 取出 分区 字段名
            partN_list = job_list[i][11].split(",")
            # 取出 分区 字段值
            partV_list = job_list[i][12].split(",")

            # 判断 分区字段个数是否 和 分区字段值个数 一致
            if len(partN_list)==len(partV_list):
                for item in range(len(partN_list)):
                    # 将 分区字段名称 和 分区字段值 合并 类似:dt="2023",time="2024"
                    partName = partN_list[item]+"=\""+partV_list[item]+"\" "
                    if item == len(partN_list)-1:
                        sql = sql + partName+");"
                    else:
                        sql = sql + partName+","
                sql_list.append(sql)
                print(job_list[i][9]+"---->添加分区sql 创建成功!----> "+sql)
            else:
                print("分区字段个数不匹配,填写有误,不添加分区")
        else:
            print(job_list[i][9]+"---->不是分区表")

    #判断 存储路径是否存在
    if not os.path.exists(output_path):
        # 不存在 创建
        os.makedirs(output_path)
    # 打开 存储文件,并写入 添加分区 sql
    with open(os.path.join(output_path, "create_partition_sql.sql"), "w", encoding='UTF-8') as f:
        for i in sql_list:
            f.write(i+"\n")

2.3.获取MySQL连接

def get_connection(mysql_host,mysql_port,mysql_user,mysql_passwd):
    return pymysql.connect(host=mysql_host, port=mysql_port, user=mysql_user, passwd=mysql_passwd)

2.4.获取数据源表的元数据

        获取数据源表的字段名、字段类型

def get_mysql_meta(database, table,mysql_host,mysql_port,mysql_user,mysql_passwd):
    """
    获取 mysql的元数据
    :param database:
    :param table:
    :param mysql_host:
    :param mysql_port:
    :param mysql_user:
    :param mysql_passwd:
    :return:
    """
    connection = get_connection(mysql_host,mysql_port,mysql_user,mysql_passwd)
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS " \
          "WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall

       把字段转换成想要 数组 [map]的形式:

注释:python3 需要把 map 外面再 套一层 list,不然会出异常

def get_mysql_columns(database, table,mysql_host,mysql_port,mysql_user,mysql_passwd):
    return list(map(lambda x:{"name":x[0],"type":x[1]},get_mysql_meta(database, table,mysql_host,mysql_port,mysql_user,mysql_passwd)))

2.5.数据源表字段类型转换

注释:python3 需要把 map 外面再 套一层 list,不然会出异常

把字段转换成想要 数组 [map]的形式:

def get_hive_columns(database, table,mysql_host,mysql_port,mysql_user,mysql_passwd):
    def type_mapping(mysql_type):
        mappings = {
            "bigint": "bigint",
            "int": "bigint",
            "smallint": "bigint",
            "tinyint": "bigint",
            "decimal": "string",
            "double": "double",
            "float": "float",
            "binary": "string",
            "char": "string",
            "varchar": "string",
            "datetime": "string",
            "time": "string",
            "timestamp": "string",
            "date": "string",
            "text": "string"
        }
        return mappings[mysql_type]
    meta = get_mysql_meta(database, table,mysql_host,mysql_port,mysql_user,mysql_passwd)

    return list(map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta))

2.6.生成json文件模型

def generate_json(list):
    # 判断 Hive的存储 路径 是否 填写
    if len(list[13])==0:
        list[13] ="/user/hive/warehouse/"
    # 判断 Hive的存储 路径 最后一个字符 为 /
    if list[13][-1]!="/":
        list[13]+="/"

    # 拼接 hive 文件 存储 路径 /user/hive/warehouse/stg.db/stu/
    path = list[13]+list[8]+".db/"+list[9]+"/"

    # 判断 是否 有分区 ,循环 分区
    if list[10]=="1":
        partN_list = list[11].split(",")
        partV_list = list[12].split(",")
        if len(partN_list) == len(partV_list):
            # hive 表若有分区,它的存储路径拼接  /user/hive/warehouse/stg.db/stu/dt=2023/time=2024
            for item in range(len(partN_list)):
                partName = partN_list[item] + "=" + partV_list[item] + "/"
                path=path+partName


    job = {"job": {
            "setting": {
                "speed": {
                    "channel": 1
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": list[2],
                        "password": list[3],
                        "column": get_mysql_columns(list[4], list[5],list[0],int(list[1]),list[2],list[3]),
                        "splitPk": "",
                        "connection": [{
                            "table": [list[5]],
                            "jdbcUrl": ["jdbc:mysql://" + list[0] + ":" + str(int(list[1])) + "/" + list[4]]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://" + list[6] + ":" + str(int(list[7])),
                        "fileType": list[15],
                        "path": path,
                        #"fileName": source_table,
                        "column": get_hive_columns(list[4], list[5],list[0],int(list[1]),list[2],list[3]),
                        "writeMode": list[14],
                        "fieldDelimiter": "\t"
                    }
                }
            }]
        }
    }

    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([list[8], list[9], "json"])), "w", encoding='UTF-8') as f:
        json.dump(job, f)
    print("数据源表:" + list[5] + " 数据抽取到目标表:" + list[9] + " [chunjun json脚本已创建在【"+output_path+"】目录下]")

3.整体代码

# ecoding=utf-8
import json
import getopt
import os
import sys
import pymysql
import xlrd

# 打开文件
data = xlrd.open_workbook("F:\\模板.xlsx")

# 获取第一个sheet内容
table = data.sheet_by_index(0)

job_list=[]

#生成添加分区文件和json脚本的目标路径,可根据实际情况作出修改
output_path = "F:\\"

def readList_extract_Info():
    """
    获取 模板 中的 整体 任务 数据
    :return: 返回 所有任务 集合
    """
    row_count = table.nrows-1

    for row_item in range(row_count):
        count = row_item+1
        list = table.row_values(count)
        job_list.append(list)
    return job_list


def create_partition(job_list):
    sql_list=[]
    # 循环 出 每一个 任务 信息
    for i in range(len(job_list)):
        # 拼接 出 创建 分区 sql
        sql = "alter table " + job_list[i][8] + "." + job_list[i][9] + " add if not exists partition("

        # 判断 任务 中 是否 为分区表,如果为分区表,那么就要根据 任务 中的分区值 创建分区
        if job_list[i][10]=="1":
            # 取出 分区 字段名
            partN_list = job_list[i][11].split(",")
            # 取出 分区 字段值
            partV_list = job_list[i][12].split(",")

            # 判断 分区字段个数是否 和 分区字段值个数 一致
            if len(partN_list)==len(partV_list):
                for item in range(len(partN_list)):
                    # 将 分区字段名称 和 分区字段值 合并 类似:dt="2023",time="2024"
                    partName = partN_list[item]+"=\""+partV_list[item]+"\" "
                    if item == len(partN_list)-1:
                        sql = sql + partName+");"
                    else:
                        sql = sql + partName+","
                sql_list.append(sql)
                print(job_list[i][9]+"---->添加分区sql 创建成功!----> "+sql)
            else:
                print("分区字段个数不匹配,填写有误,不添加分区")
        else:
            print(job_list[i][9]+"---->不是分区表")

    #判断 存储路径是否存在
    if not os.path.exists(output_path):
        # 不存在 创建
        os.makedirs(output_path)
    # 打开 存储文件,并写入 添加分区 sql
    with open(os.path.join(output_path, "create_partition_sql"), "w", encoding='UTF-8') as f:
        for i in sql_list:
            f.write(i+"\n")

def get_connection(mysql_host,mysql_port,mysql_user,mysql_passwd):
    """
    mysql 连接
    :param mysql_host:
    :param mysql_port:
    :param mysql_user:
    :param mysql_passwd:
    :return:
    """
    return pymysql.connect(host=mysql_host, port=mysql_port, user=mysql_user, passwd=mysql_passwd)


def get_mysql_meta(database, table,mysql_host,mysql_port,mysql_user,mysql_passwd):
    """
    获取 mysql的元数据
    :param database:
    :param table:
    :param mysql_host:
    :param mysql_port:
    :param mysql_user:
    :param mysql_passwd:
    :return:
    """
    connection = get_connection(mysql_host,mysql_port,mysql_user,mysql_passwd)
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS " \
          "WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall


def get_mysql_columns(database, table,mysql_host,mysql_port,mysql_user,mysql_passwd):
    """
    获取 mysql 表 字段
    :param database:
    :param table:
    :param mysql_host:
    :param mysql_port:
    :param mysql_user:
    :param mysql_passwd:
    :return:
    """
    return list(map(lambda x:{"name":x[0],"type":x[1]},get_mysql_meta(database, table,mysql_host,mysql_port,mysql_user,mysql_passwd)))



def get_hive_columns(database, table,mysql_host,mysql_port,mysql_user,mysql_passwd):
    def type_mapping(mysql_type):
        mappings = {
            "bigint": "bigint",
            "int": "bigint",
            "smallint": "bigint",
            "tinyint": "bigint",
            "decimal": "string",
            "double": "double",
            "float": "float",
            "binary": "string",
            "char": "string",
            "varchar": "string",
            "datetime": "string",
            "time": "string",
            "timestamp": "string",
            "date": "string",
            "text": "string"
        }
        return mappings[mysql_type]
    meta = get_mysql_meta(database, table,mysql_host,mysql_port,mysql_user,mysql_passwd)

    return list(map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta))


def generate_json(list):
    # 判断 Hive的存储 路径 是否 填写
    if len(list[13])==0:
        list[13] ="/user/hive/warehouse/"
    # 判断 Hive的存储 路径 最后一个字符 为 /
    if list[13][-1]!="/":
        list[13]+="/"

    # 拼接 hive 文件 存储 路径 /user/hive/warehouse/stg.db/stu/
    path = list[13]+list[8]+".db/"+list[9]+"/"

    # 判断 是否 有分区 ,循环 分区
    if list[10]=="1":
        partN_list = list[11].split(",")
        partV_list = list[12].split(",")
        if len(partN_list) == len(partV_list):
            # hive 表若有分区,它的存储路径拼接  /user/hive/warehouse/stg.db/stu/dt=2023/time=2024
            for item in range(len(partN_list)):
                partName = partN_list[item] + "=" + partV_list[item] + "/"
                path=path+partName


    job = {"job": {
            "setting": {
                "speed": {
                    "channel": 1
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": list[2],
                        "password": list[3],
                        "column": get_mysql_columns(list[4], list[5],list[0],int(list[1]),list[2],list[3]),
                        "splitPk": "",
                        "connection": [{
                            "table": [list[5]],
                            "jdbcUrl": ["jdbc:mysql://" + list[0] + ":" + str(int(list[1])) + "/" + list[4]]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://" + list[6] + ":" + str(int(list[7])),
                        "fileType": list[15],
                        "path": path,
                        #"fileName": source_table,
                        "column": get_hive_columns(list[4], list[5],list[0],int(list[1]),list[2],list[3]),
                        "writeMode": list[14],
                        "fieldDelimiter": "\t"
                    }
                }
            }]
        }
    }

    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([list[8], list[9], "json"])), "w", encoding='UTF-8') as f:
        json.dump(job, f)
    print("数据源表:" + list[5] + " 数据抽取到目标表:" + list[9] + " [chunjun json脚本已创建在【"+output_path+"】目录下]")


if __name__ == '__main__':
    job_list=readList_extract_Info()
    create_partition(job_list)

    for i in job_list:
        generate_json(i)



4.运行结果

代码运行结果:

 产出脚本:

 create_partition_sql.sql

stg.stu_no_part.json 

 notepadt++ 格式化代码

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "000000",
                        "column": [{
                                "name": "id",
                                "type": "int"
                            }, {
                                "name": "name",
                                "type": "varchar"
                            }
                        ],
                        "splitPk": "",
                        "connection": [{
                                "table": ["stu"],
                                "jdbcUrl": ["jdbc:mysql://192.168.233.130:3306/test"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://192.168.233.130:8020",
                        "fileType": "text",
                        "path": "/user/hive/warehouse/stg.db/stu_no_part/",
                        "column": [{
                                "name": "id",
                                "type": "bigint"
                            }, {
                                "name": "name",
                                "type": "string"
                            }
                        ],
                        "writeMode": "append",
                        "fieldDelimiter": "\t"
                    }
                }
            }
        ]
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/816898.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【phaser微信抖音小游戏开发004】往画布上增加文本以及文本的操作

我们在states中创建st004.js的类,或者将states中的index.js直接重命名为st004.js,把里面的类名也修改为st004.如下图 在main.js中,引入st004,并设置启用的state为st004。如下图 接下来到states/st004里面,在create里面将文本修改一…

为什么不推荐用 index 做 key

之所以添加key属性,究其根本是因 diff算法。而在业务开发过程中特别是使用map, forEach 等遍历函数的时候往往随手就将index做为组件的key. 那么:key 到底有什么用? 当 Vue.js 用 v-for 正在更新已渲染过的元素列表时,它默认用就地复用策略 …

IP 工具

什么是IP 工具 IP 工具是用于轻松扫描和排除网络 IP 地址空间故障的网络工程工具。IP 工具使网络管理员能够审核、跟踪和监视 IP 地址、子网以及使用 IP 的设备和主机的性能。这个全面的网络工程工具集包括高级 IP 工具,如 Ping、系统资源管理器、MAC 地址解析器和…

看表情包学C语言 ——局部优先原则

🔗 【C语言趣味教程】专栏介绍👈 猛戳了解!!! Ⅰ. 作用域(Scope) 0x00 引入:什么是作用域? 变量和常量在程序中都是有作用范围的,这个范围我们称之为变量的 …

40k的offer拿到手!爽歪歪~

据说周一和就业喜报更配?快跟着我一起来看看2023上半年黑马软件测试学科的就业喜报: 从黑马软件测试学科的就业中,我们也能看到软件测试对于企业的重要性,一点也不比程序员差,他们拿到的薪资也能和程序员的高薪媲美&am…

Netty 执行了多次channelReadComplete()却没有执行ChannelRead()

[TOC](Netty 执行了多次channelReadComplete()) Survive by day and develop by night. talk for import biz , show your perfect code,full busy,skip hardness,make a better result,wait for change,challenge Survive. happy for hardess to solve denpendies.…

JAVA的回调机制、同步/异步调用

一、同步调用 同步调用是最基本的调用方式。类A的a()方法调用类B的b()方法,类A的方法需要等到B类的方法执行完成才会继续执行。如果B的方法长时间阻塞,就会导致A类方法无法正常执行下去。 二、异步调用 如果A调用B,B的执行时间比较长&#…

【Git系列】Git概述

🐳Git概述 🧊1. Git发展历史🧊 2. Git与SVN的区别🧊3. Git本地结构🧊4. 代码托管中心🪟4.1 代码托管中心是什么?🪟4.2 托管中心种类 🧊1. Git发展历史 Git的发展历史可以…

容灾独家技术揭秘:HyperBDR无主机数据同步技术

01、一对一单机热备-传统灾备方式 单机热备是一种备份解决方案,它使用两台服务器来确保高可用性,是市场上最为常见的灾备模式。 在单机热备中,一台主服务器和一台备用服务器保持同步,以确保在主服务器出现故障或宕机时可以立即切换…

【Unity】超简单特效 - 烟雾

前言: 各式各样的制造工坊常常会出现在任意类型的游戏中,铁匠铺、车间、工业建筑等等,那么如何快速且简单的实现一款可复用的烟雾特效呢,先在脑海中想象一下我们生活里常见的烟雾吧。 初步实现: 在经过简单的想象以后…

TensorFlow项目练手(三)——基于GRU股票走势预测任务

项目介绍 项目基于GRU算法通过20天的股票序列来预测第21天的数据,有些项目也可以用LSTM算法,两者主要差别如下: LSTM算法:目前使用最多的时间序列算法,是一种特殊的RNN(循环神经网络)&#xf…

JDK 8.x 微服务启动JVM参数调优实战

微服务启动JVM参数调优实战 1.1 配置JVM启动参数1.2 解释1.3 JVM参数优化思路1.3.1 调整堆内存大小1.3.2 年轻代大小1.3.3 Metaspace 大小1.3.4 栈大小1.3.5 垃圾回收器选择1.3.6 垃圾回收参数1.3.7 预分配内存 1.3.8 禁用 ResizePLAB2. 常用JVM参数 1.1 配置JVM启动参数 服务…

每日一题——重建二叉树

重建二叉树 题目描述 给定节点数为 n 的二叉树的前序遍历和中序遍历结果,请重建出该二叉树并返回它的头结点。 例如输入前序遍历序列{1,2,4,7,3,5,6,8}和中序遍历序列{4,7,2,1,5,3,8,6},则重建出如下图所示。 提示: 1.vin.length pre.length 2.pre 和…

颠倒二进制位,颠倒给定的 32 位无符号整数的二进制位。

题记: 颠倒给定的 32 位无符号整数的二进制位。 提示: 请注意,在某些语言(如 Java)中,没有无符号整数类型。在这种情况下,输入和输出都将被指定为有符号整数类型,并且不应影响您的…

ChatPaper全流程加速科研:论文阅读+润色+优缺点分析与改进建议+审稿回复

项目设计集合(人工智能方向):助力新人快速实战掌握技能、自主完成项目设计升级,提升自身的硬实力(不仅限NLP、知识图谱、计算机视觉等领域):汇总有意义的项目设计集合,助力新人快速实…

惊喜!1行Python代码,瞬间测你工作量,分享一个统计代码行数的神器

大家好,这里是程序员晚枫。 **你想不想知道一个项目中,自己写了多少行代码?**我用今天的工具统计了一下开源项目:python-office的代码行数,竟然有21w行! 我们一起看一下怎么用最简单的方法,统…

《吐血整理》进阶系列教程-拿捏Fiddler抓包教程(16)-Fiddler如何充当第三者再识AutoResponder标签-上

1.简介 Fiddler充当第三者,主要是通过AutoResponder标签在客户端和服务端之间,Fiddler抓包,然后改包,最后发送。AutoResponder这个功能可以算的上是Fiddler最实用的功能,可以让我们修改服务器端返回的数据&#xff0c…

Windows10系统还原操作

哈喽,大家好,我是雷工! 复制了下虚拟机的Win10系统,但其中有一些软件,想实现类似手机的格式化出厂操作,下面记录Windows10系统的还原操作。 一、系统环境: 虚拟机内的Windows10,64…

JavaWeb第三章:JavaScript的全面知识

目录 前言 一.JavaScript的简介 💖概念 💖学习内容 二.JavaScript的引入方式 💖内部脚本 💖外部脚本 三.JavaScript的基础语法 💖语法的书写 💖变量 ✨ 全局变量 ✨局部变量 ✨常量 &a…

vue表单筛选

目录 筛选 HTML scss* filterComp 排序 表格 自定义数据样式 inner-table 分页 删除 default-modal 自定义元素的插槽-占位符 .search-wrap {height: 60px;display: flex;align-items: center;overflow: hidden;padding: 0 20px;.selected-options-wrap {flex: 1;.…