postgresql|数据库|利用sqlparse和psycopg2库批量按顺序执行SQL语句(psyconpg2新优化版本)

news2025/1/13 23:08:26

一、

旧版批量执行SQL脚本的python文件缺点,优点,以及更新内容

书接上回,postgresql|数据库开发|python的psycopg2库按指定顺序批量执行SQL文件(可离线化部署)_python sql psycopg2-CSDN博客

这个python脚本写了很久了,最近开始实际使用,发现了很多问题,问题主要集中在以下几点:

1、

SQL语句解析不太标准,遇到;分号就也当SQL语句执行,导致很多不必要的错误,并且很多时候不能有效区分多行SQL,因此,本文计划使用sqlparse库来做更准确的SQL语句解析

2、

批量的SQL文件跑完后,并没有一个比较详细的总结报告,遇到问题不太好排查,因此,本文对此做了优化,当一个目录内的SQL文件都执行完毕后,给一个相对详细的报告

确保即使某些 SQL 文件执行失败,程序也会继续尝试执行其余的文件,并最终给出一个详细的执行报告,帮助用户了解哪些文件和语句被执行以及哪些文件遇到了问题。这样,用户不仅能知道哪些文件未能成功处理,还能清楚地看到哪些文件及其内部的语句已经成功应用到了数据库中。

3、

SQL文件排序使用正则表达式处理,以改善SQL文件排序有时候不正确的问题

4、

所有的指定目录下的SQL文件都执行,但,执行失败的SQL文件仍留在原文件夹,需要将失败的SQL文件拿出来,手动处理有问题的SQL文件

5、

SQL语句执行输出太多,成功的SQL语句不应该输出到控制台,导致脚本执行情况并不是一目了然,因此,对脚本输出进行优化,以方便SQL脚本的调试

优点:

1、

以SQL文件为单位,每一个SQL文件内的SQL语句要么全部执行成功,如果中间有任何错误就回滚,对于数据库的数据安全是有一定的保障的

2、

该工具执行迅速,效率非常高,但对于锁表的情况,现在暂时没有什么想法

二、

优化后的python脚本源码

import os
import re
import sys
import json
import shutil
import psycopg2
from psycopg2 import sql, OperationalError, ProgrammingError
import sqlparse

def print_colored_text(text, color_code):
    """打印带有颜色的文本"""
    print(f"\033[{color_code}m{text}\033[0m")

# 定义颜色代码
COLORS = {
    'BLACK': 30,
    'RED': 31,
    'GREEN': 32,
    'YELLOW': 33,
    'BLUE': 34,
    'MAGENTA': 35,
    'CYAN': 36,
    'WHITE': 37
}

def find_sql_files(path):
    """查找指定路径下的所有 .sql 文件,并按文件名中第一个出现的数字升序排序返回列表"""
    sql_files = [
        os.path.join(root, file)
        for root, _, files in os.walk(path)
        for file in files
        if file.endswith('.sql')
    ]

    def extract_first_number(filename):
        """从文件名中提取第一个出现的数字序列并转换为整数,用于排序"""
        match = re.search(r'\d+', os.path.basename(filename))
        return int(match.group()) if match else float('inf')  # 如果没有匹配到数字,则排到最后

    # 使用 sorted 函数并指定 key 参数来实现升序排序,仅依据第一个数字
    sorted_sql_files = sorted(sql_files, key=extract_first_number)

    return sorted_sql_files
def execute_sql_file(db_config, sql_file_path, script_dir):
    """执行指定的 SQL 文件,并在遇到错误时移动文件"""
    conn = None
    cursor = None
    successful_statements = []  # 新增: 记录成功的SQL语句
    try:
        conn = psycopg2.connect(**db_config)
        cursor = conn.cursor()
        with open(sql_file_path, 'r', encoding='utf-8') as file:
            parsed_statements = sqlparse.split(file.read())
            for raw_stmt in parsed_statements:
                stmt = raw_stmt.strip()
                if not stmt:  # 忽略空语句
                    continue
                
                statements = sqlparse.parse(stmt)
                for statement in statements:
                    if statement.tokens:  # 确保有非空的SQL语句
                        stmt_str = str(statement).strip(';').strip()
                        if stmt_str:  # 忽略空语句
                           # print(stmt_str)
                           # print_colored_text('<<<<<<<=======<<<<<++++++++<<<<<<上面这条语句将要执行了===----========*****', COLORS['CYAN'])
                            try:
                                cursor.execute(stmt_str)
                                successful_statements.append(stmt_str)  # 添加到成功语句列表
                                #print_colored_text('执行成功!', COLORS['GREEN'])
                            except (Exception, psycopg2.DatabaseError) as e:
                                print(f"执行失败的语句是:")
                                print(f"{stmt_str}")
                                print("=========这是第一个分隔符===================")
                                print("报错详细信息是:")
                                print(e)
                                print("===========这是第二个分隔符===================")
                                print(f"执行的文件名称是:")
                                print(f"{sql_file_path}")
                                conn.rollback()  # 回滚事务
                                move_failed_file(sql_file_path, script_dir)
                                return False, successful_statements
        conn.commit()
        print_colored_text(f"Executed SQL file successfully: {sql_file_path}", COLORS['RED'])
        return True, successful_statements
    except (OperationalError, ProgrammingError) as error:
        print(f"Database error occurred while executing SQL file {sql_file_path}: {error}")
        if conn:
            conn.rollback()
        return False, successful_statements
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()

def move_failed_file(src, dst_dir):
    """将失败的SQL文件移动到指定的目标目录"""
    try:
        os.makedirs(dst_dir, exist_ok=True)
        dst = os.path.join(dst_dir, os.path.basename(src))
        print(f"Moving failed SQL file to {dst}")
        shutil.move(src, dst)
    except Exception as e:
        print(f"Failed to move the file {src}: {e}")

def main():
    if len(sys.argv) != 3:
        print("Usage: python script.py <user> <path_to_search>")
        sys.exit(1)

    user = sys.argv[1]
    search_path = sys.argv[2]
    script_dir = os.path.dirname(os.path.abspath(__file__))  # 获取脚本所在目录

    try:
        with open('test.json', 'r', encoding='utf-8') as f:
            params = json.load(f)
            db_config = params.get('db_config', {})
            required_keys = ['dbname', 'user', 'password', 'host', 'port']
            if not all(key in db_config for key in required_keys):
                print("Error: Missing required database configuration in test.json")
                sys.exit(1)
            db_config['user'] = user
    except FileNotFoundError:
        print("Error: test.json not found")
        sys.exit(1)
    except json.JSONDecodeError:
        print("Error: test.json is not a valid JSON file")
        sys.exit(1)

    sql_files = list(find_sql_files(search_path))
    print(f"Found {len(sql_files)} SQL files: {sql_files}")
    print_colored_text(f'|||||二十秒后开始执行{sql_files},以打印出来的顺序依次执行SQL文件|||||||||||', COLORS['MAGENTA'])

    failed_files = []
    executed_statements = []
    successful_files = []  # 新增: 记录成功的SQL文件

    if sql_files:
        for sql_file in sql_files:
            print_colored_text(f'这个文件将要执行: {sql_file}\n\n', COLORS['YELLOW'])
            success, statements = execute_sql_file(db_config, sql_file, script_dir)
            if not success:
                print(f"Failed to execute SQL file: {sql_file}. Continuing with the next file...")
                failed_files.append(sql_file)
            else:
                executed_statements.extend(statements)
                successful_files.append(sql_file)  # 添加到成功文件列表
        # 打印总结信息
        if failed_files:
            print_colored_text("\nThe following SQL files failed to execute:", COLORS['RED'])
            for failed_file in failed_files:
                print(f"- {failed_file}")
        else:
            print_colored_text("\nAll SQL files executed successfully.", COLORS['GREEN'])
#        print_colored_text("\nThe following SQL statements were executed successfully:", COLORS['GREEN'])
#        for stmt in executed_statements:
#            print(f"- {stmt}")
        print_colored_text("\nThe following SQL files were executed successfully:", COLORS['GREEN'])
        for successful_file in successful_files:
            print(f"- {successful_file}")

    else:
        print("No SQL files found in the specified path.")

if __name__ == "__main__":
    main()

该python脚本依赖于python3环境,libpq.so,pyscopg库,sqlparse库,这些库什么的都可以离线安装,相关文件都已放到百度网盘内了

通过网盘分享的文件:批量执行SQL语句项目
链接: https://pan.baidu.com/s/1zCAL78hp2-92NdjIHCjM0w?pwd=gkw1 提取码: gkw1 

 python3.zip 文件里都是rpm包,适用于centos7,解压后,怎么安装就不在这多说了
其中,里面的sqlpaser.tar.gz 需要解压到/usr/local/lib/python3.6目录下,如果没有python3.6目录,建立即可

psycopg2.gz这个文件里的内容解压到/usr/lib64/python3.6目录下

解压完毕后,需要执行python3 -V 命令,以激活sqlparse库

最终目录如下所示即可:

[root@centos7 ~]# ls /usr/lib64/python3.6/site-packages/
psycopg2  psycopg2-2.8.6-py3.6.egg-info  __pycache__  README.txt
[root@centos7 ~]# ls /usr/local/lib/python3.6/site-packages/
sqlparse  sqlparse-0.4.4.dist-info

三、

重点代码解析

1、

代码编程结构

本次代码编写仍然是延续上个版本,主要功能封装为方法,在main方法统一集中调用

主要是一个main主方法+4个功能方法,分别是控制台颜色渲染方法,SQL文件搜寻方法,SQL文件调用psyconpg2,sqlparse库逐行执行SQL语句方法,移动执行失败SQL文件到脚本当前目录方法,整体调用main方法

2、

控制台颜色渲染方法

该方法主要是使用了字典,形参调用形式,例如该方法的调用:

print_colored_text(f"Executed SQL file successfully: {sql_file_path}", COLORS['RED'])

没什么好说的,主要就是字典的应用是难点 

3、

SQL文件搜寻方法

def find_sql_files(path):
    """查找指定路径下的所有 .sql 文件,并按文件名中第一个出现的数字升序排序返回列表"""
    sql_files = [
        os.path.join(root, file)
        for root, _, files in os.walk(path)
        for file in files
        if file.endswith('.sql')
    ]

    def extract_first_number(filename):
        """从文件名中提取第一个出现的数字序列并转换为整数,用于排序"""
        match = re.search(r'\d+', os.path.basename(filename))
        return int(match.group()) if match else float('inf')  # 如果没有匹配到数字,则排到最后

    # 使用 sorted 函数并指定 key 参数来实现升序排序,仅依据第一个数字
    sorted_sql_files = sorted(sql_files, key=extract_first_number)

    return sorted_sql_files

这一段代码难点主要在方法嵌套,首先迭代寻找指定的路径下所有的SQL文件,然后通过子方法,调用正则表达式重新排序,主要是以数字开始的文件

这里需要注意,搜寻到的SQL文件是返回一个列表

4、

SQL文件调用psyconpg2,sqlparse库逐行执行SQL语句方法

该方法主要是业务逻辑实现,主要逻辑是以单个SQL文件为整体事务,如果某个SQL文件有错误,事务回滚,并调用移动执行失败SQL文件到脚本当前目录方法;如果该SQL文件顺利执行完成,则提交事务

这样做的目的是保护数据库,避免不必要的数据混乱

无论如何,当一个SQL文件执行完毕,都会将数据库连接关闭,游标关闭

编写的时候考虑了一下,还是需要捕获错误并将详细错误信息打印,并收集失败的SQL文件和成功的SQL文件,在main方法内将要调用这两个列表,list

5、

移动执行失败SQL文件到脚本当前目录方法

该方法没什么特别的,主要是调用此方法的形式,在main方法内,这里比较绕,当时编写的时候考虑了很久

6、

main方法

整体逻辑封装都在此方法内

    user = sys.argv[1]
    search_path = sys.argv[2]
    script_dir = os.path.dirname(os.path.abspath(__file__))  # 获取脚本所在目录

__file__ 是一个非常有用的内置变量,它帮助你在编写脚本时能够动态地确定文件的位置,而不需要硬编码路径。这对于提高代码的可移植性和维护性非常有帮助。如果你在开发过程中遇到需要根据脚本位置来定位其他文件的情况,说实话这个变量我是差点给忘记掉的

那么,为什么要有user这个变量呢?考虑到很多时候并不是有高权限的数据库账号

其它的就没什么好说的了

四、

数据库信息文件json

有需要实践的同学按实际情况填写此json文件

{
    "db_config": {
        "dbname": "postgres",
        "user": "postgres",
        "password": "xxxxx",
        "host": "192.168.xxx.xx",
        "port": "1543"
    }
}

四、

执行方式和执行结果示例

[root@centos7 ~]# python3 test7.py postgres ./
Found 2 SQL files: ['./12323.sql', './33333.sql']
|||||二十秒后开始执行['./12323.sql', './33333.sql'],以打印出来的顺序依次执行SQL文件|||||||||||
这个文件将要执行: ./12323.sql


执行失败的语句是:
-- 插入部门数据 ;
INSERT INTO 
dept VALUES (10, '战略部', '咸阳')
=========这是第一个分隔符===================
报错详细信息是:
duplicate key value violates unique constraint "dept_pkey"
DETAIL:  Key (deptno)=(10) already exists.

===========这是第二个分隔符===================
执行的文件名称是:
./12323.sql
Moving failed SQL file to /root/12323.sql
Failed to execute SQL file: ./12323.sql. Continuing with the next file...
这个文件将要执行: ./33333.sql


执行失败的语句是:
-- 插入部门数据 ;
INSERT INTO 
dept VALUES (10, '战略部', '咸阳')
=========这是第一个分隔符===================
报错详细信息是:
duplicate key value violates unique constraint "dept_pkey"
DETAIL:  Key (deptno)=(10) already exists.

===========这是第二个分隔符===================
执行的文件名称是:
./33333.sql
Moving failed SQL file to /root/33333.sql
Failed to execute SQL file: ./33333.sql. Continuing with the next file...

The following SQL files failed to execute:
- ./12323.sql
- ./33333.sql

The following SQL files were executed successfully:

全失败的:

有失败有成功的情况:

[root@centos7 ~]# python3 test7.py postgres ./
Found 2 SQL files: ['./12323.sql', './33333.sql']
|||||二十秒后开始执行['./12323.sql', './33333.sql'],以打印出来的顺序依次执行SQL文件|||||||||||
这个文件将要执行: ./12323.sql


Executed SQL file successfully: ./12323.sql
这个文件将要执行: ./33333.sql


执行失败的语句是:
-- 插入部门数据 ;
INSERT INTO 
dept VALUES (10, '战略部', '咸阳')
=========这是第一个分隔符===================
报错详细信息是:
duplicate key value violates unique constraint "dept_pkey"
DETAIL:  Key (deptno)=(10) already exists.

===========这是第二个分隔符===================
执行的文件名称是:
./33333.sql
Moving failed SQL file to /root/33333.sql
Failed to execute SQL file: ./33333.sql. Continuing with the next file...

The following SQL files failed to execute:
- ./33333.sql

The following SQL files were executed successfully:
- ./12323.sql

全部成功的情况:

[root@centos7 ~]# python3 test7.py postgres ./
Found 1 SQL files: ['./12323.sql']
|||||二十秒后开始执行['./12323.sql'],以打印出来的顺序依次执行SQL文件|||||||||||
这个文件将要执行: ./12323.sql


Executed SQL file successfully: ./12323.sql

All SQL files executed successfully.

The following SQL files were executed successfully:
- ./12323.sql

 

🆗,这个SQL文件批量执行的小工具就介绍到这了,欢迎各位大拿指正错误!!!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2276191.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

5个不同类型的数据库安装

各种社区版本下载官方地址&#xff1a;MySQL :: MySQL Community Downloads 一、在线YUM仓库&#xff08;Linux&#xff09; 选择 MySQL Yum Repository 选择对应版本下载仓库安装包&#xff08;No thanks, just start my download.&#xff09; 下载方法1&#xff1a;下载到本…

shell基础使用及vim的常用快捷键

一、shell简介 参考博文1 参考博文2——shell语法及应用 参考博文3——vi的使用 在linux中有很多类型的shell&#xff0c;不同的shell具备不同的功能&#xff0c;shell还决定了脚本中函数的语法&#xff0c;Linux中默认的shell是 / b in/ b a s h &#xff0c;流行的shell…

Spring Data Elasticsearch简介

一、Spring Data Elasticsearch简介 1 SpringData ElasticSearch简介 Elasticsearch是一个实时的分布式搜索和分析引擎。它底层封装了Lucene框架,可以提供分布式多用户的全文搜索服务。 Spring Data ElasticSearch是SpringData技术对ElasticSearch原生API封装之后的产物,它通…

【巨实用】Git客户端基本操作

本文主要分享Git的一些基本常规操作&#xff0c;手把手教你如何配置~ ● 一个文件夹中初始化Git git init ● 为了方便以后提交代码需要对git进行配置&#xff08;第一次使用或者需求变更的时候&#xff09;&#xff0c;告诉git未来是谁在提交代码 git config --global user.na…

有收到腾讯委托律师事务所向AppStore投诉带有【水印相机】主标题名称App的开发者吗

近期&#xff0c;有多名开发者反馈&#xff0c;收到来自腾讯科技 (深圳) 有限公司委托北京的一家**诚律师事务所卞&#xff0c;写给AppStore的投诉邮件。 邮件内容主要说的是&#xff0c;腾讯注册了【水印相机】这四个字的商标&#xff0c;所以你们这些在AppStore上的app&…

导出文件,能够导出但是文件打不开

背景&#xff1a; 在项目开发中&#xff0c;对于列表的查询&#xff0c;而后会有导出功能&#xff0c;这里导出的是一个excell表格。实现了两种&#xff0c;1.导出的文件&#xff0c;命名是前端传输过去的&#xff1b;2.导出的文件&#xff0c;命名是根据后端返回的文件名获取的…

Redis 源码分析-内部数据结构 dict

Redis 源码分析-内部数据结构 dict 在上一篇 Redis 数据库源码分析 提到了 Redis 其实用了全局的 hash 表来存储所有的键值对&#xff0c;即下方图示的 dict&#xff0c;dict 中有两个数组&#xff0c;其中 ht[1] 只在 rehash 时候才真正用到&#xff0c;平时都是指向 null&am…

010:传统计算机视觉之大津算法初探

本文为合集收录&#xff0c;欢迎查看合集/专栏链接进行全部合集的系统学习。 合集完整版请参考这里。 上一节学习了利用 Canny 算法来完成一个图片的边缘检测&#xff0c;从而可以区分出图像的边缘。 本节再了解一个计算机视觉中更常见的应用&#xff0c;那就是把图片的前景和…

使用Cilium/eBPF实现大规模云原生网络和安全

大家读完觉得有帮助记得关注和点赞&#xff01;&#xff01;&#xff01; 目录 抽象 1 Trip.com 云基础设施 1.1 分层架构 1.2 更多细节 2 纤毛在 Trip.com 2.1 推出时间表 2.2 自定义 2.3 优化和调整 2.3.1 解耦安装 2.3.2 避免重试/重启风暴 2.3.3 稳定性优先 2…

怎么把word试题转成excel?

在教育行业、学校管理以及在线学习平台中&#xff0c;试题库的高效管理是一项核心任务。许多教育工作者和系统开发人员常常面临将 Word 中的试题批量导入 Excel 的需求。本文将详细介绍如何快速将试题从 Word 转换为 Excel&#xff0c;帮助您轻松解决繁琐的数据整理问题&#x…

css盒子水平垂直居中

目录 1采用flex弹性布局&#xff1a; 2子绝父相margin&#xff1a;负值&#xff1a; 3.子绝父相margin:auto&#xff1a; 4子绝父相transform&#xff1a; 5通过伪元素 6table布局 7grid弹性布局 文字 水平垂直居中链接&#xff1a;文字水平垂直居中-CSDN博客 以下为盒子…

Spring 项目 基于 Tomcat容器进行部署

文章目录 一、前置知识二、项目部署1. 将写好的 Spring 项目先打包成 war 包2. 查看项目工件&#xff08;Artifact&#xff09;是否存在3. 配置 Tomcat3.1 添加一个本地 Tomcat 容器3.2 将项目部署到 Tomcat 4. 运行项目 尽管市场上许多新项目都已经转向 Spring Boot&#xff0…

【Web安全】SQL 注入攻击技巧详解:UNION 注入(UNION SQL Injection)

【Web安全】SQL 注入攻击技巧详解&#xff1a;UNION 注入&#xff08;UNION SQL Injection&#xff09; 引言 UNION注入是一种利用SQL的UNION操作符进行注入攻击的技术。攻击者通过合并两个或多个SELECT语句的结果集&#xff0c;可以获取数据库中未授权的数据。这种注入技术要…

docker安装rabbit后访问报错最佳的几种解决方案

错误通常是由于RabbitMQ的安全配置导致的&#xff0c;RabbitMQ默认配置允许的用户仅能通过localhost访问。这通常出现在RabbitMQ的guest用户上&#xff0c;guest用户默认只能从localhost登录&#xff0c;而无法从其他IP地址进行远程访问。 解决方法&#xff1a; 1. **创建一个…

计科高可用服务器架构实训(防火墙、双机热备,VRRP、MSTP、DHCP、OSPF)

一、项目介绍 需求分析&#xff1a; &#xff08;1&#xff09;总部和分部要求网络拓扑简单&#xff0c;方便维护&#xff0c;网络有扩展和冗余性&#xff1b; &#xff08;2&#xff09;总部分财务部&#xff0c;人事部&#xff0c;工程部&#xff0c;技术部&#xff0c;提供…

spark汇总

目录 描述运行模式1. Windows模式代码示例 2. Local模式3. Standalone模式 RDD描述特性RDD创建代码示例&#xff08;并行化创建&#xff09;代码示例&#xff08;读取外部数据&#xff09;代码示例&#xff08;读取目录下的所有文件&#xff09; 算子DAGSparkSQLSparkStreaming…

Linux - 什么是线程和线程的操作

线程概念 什么是线程: 线程&#xff08;Thread&#xff09;是操作系统能够进行运算调度的最小单位. 它被包含在进程之中, 是进程中的实际运作单位. 一个进程可以包含多个线程. 进程 : 线程 1 : n (n > 1). 进程是系统分配资源的基本单位. 线程则是系统调度的基本单位. 在…

基于YOLOv8的高空无人机小目标检测系统(python+pyside6界面+系统源码+可训练的数据集+也完成的训练模型

目标检测系统【环境搭建过程】&#xff08;GPU版本&#xff09;-CSDN博客 摘要 本文提出了一种基于YOLOv8算法的高空无人机小目标检测系统&#xff0c;利用VisDrone数据集中的7765张图片&#xff08;6903张训练集&#xff0c;862张验证集&#xff09;进行模型训练&#xff0c;…

apollo内置eureka dashboard授权登录

要确保访问Eureka Server时要求输入账户和密码&#xff0c;需要确保以下几点&#xff1a; 确保 eurekaSecurityEnabled 配置为 true&#xff1a;这个配置项控制是否启用Eureka的安全认证。如果它被设置为 false&#xff0c;即使配置了用户名和密码&#xff0c;也不会启用安全认…

一学就废|Python基础碎片,文件读写

文件处理是指通过编程接口对文件执行诸如创建、打开、读取、写入和关闭等操作的过程。它涉及管理程序与存储设备上的文件系统之间的数据流&#xff0c;确保数据得到安全高效的处理。 Python 中的文件模式 打开文件时&#xff0c;我们必须指定我们想要的模式&#xff0c;该模式…