psycopg2 使用dbutils 工具封装

news2025/1/22 12:27:10

1.什么是dbutils

Dbutils是一套工具,可为数据库提供可靠,持久和汇总的连接,该连接可在各种多线程环境中使用。

2.使用代码记录

db_config.py  数据库配置类:

# -*- coding: UTF-8 -*-
import psycopg2

# 数据库信息
DB_TEST_HOST = "dev-pg.test.xxx.cloud"
DB_TEST_PORT = 1921
DB_TEST_DBNAME = "check_db"
DB_TEST_USER = "check_db"
DB_TEST_PASSWORD = "okk3py2323gBaX111"

# 数据库连接编码
DB_CHARSET = "utf8"

# mincached : 启动时开启的闲置连接数量(缺省值 0 开始时不创建连接)
DB_MIN_CACHED = 10

# maxcached : 连接池中允许的闲置的最多连接数量(缺省值 0 代表不闲置连接池大小)
DB_MAX_CACHED = 10

# maxshared : 共享连接数允许的最大数量(缺省值 0 代表所有连接都是专用的)如果达到了最大数量,被请求为共享的连接将会被共享使用
DB_MAX_SHARED = 20

# maxconnecyions : 创建连接池的最大数量(缺省值 0 代表不限制)
DB_MAX_CONNECYIONS = 100

# blocking : 设置在连接池达到最大数量时的行为(缺省值 0 或 False 代表返回一个错误<toMany......> 其他代表阻塞直到连接数减少,连接被分配)
DB_BLOCKING = True

# maxusage : 单个连接的最大允许复用次数(缺省值 0 或 False 代表不限制的复用).当达到最大数时,连接会自动重新连接(关闭和重新打开)
DB_MAX_USAGE = 0

# setsession : 一个可选的SQL命令列表用于准备每个会话,如["set datestyle to german", ...]
DB_SET_SESSION = None

# creator : 使用连接数据库的模块
DB_CREATOR = psycopg2

2.封装sqlHelper.py

import sys
import db_config as config
import psycopg2.extras
from dbutils.pooled_db import PooledDB
import threading


class PsycopgConn:
    _instance_lock = threading.Lock()

    def __init__(self):
        self.init_pool()

    def __new__(cls, *args, **kwargs):
        if not hasattr(cls, '_instance'):
            with PsycopgConn._instance_lock:
                if not hasattr(cls, '_instance'):
                    PsycopgConn._instance = object.__new__(cls)
                return PsycopgConn._instance

    def get_pool_conn(self):
        """
        获取连接池连接
        :return:
        """
        if not self._pool:
            self.init_pool()
        return self._pool.connection()

    def init_pool(self):
        """
        初始化连接池
        :return:
        """
        try:
            pool = PooledDB(
                creator=config.DB_CREATOR,  # 使用连接数据库的模块 psycopg2
                maxconnections=config.DB_MAX_CONNECYIONS,  # 连接池允许的最大连接数,0 和 None 表示不限制连接数
                mincached=config.DB_MIN_CACHED,  # 初始化时,链接池中至少创建的空闲的链接,0 表示不创建
                maxcached=config.DB_MAX_CACHED,  # 链接池中最多闲置的链接,0 和 None 不限制
                blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
                maxusage=None,  # 一个链接最多被重复使用的次数,None 表示无限制
                setsession=[],  # 开始会话前执行的命令列表
                host=config.DB_TEST_HOST,
                port=config.DB_TEST_PORT,
                user=config.DB_TEST_USER,
                password=config.DB_TEST_PASSWORD,
                database=config.DB_TEST_DBNAME)
            self._pool = pool
        except:
            print
            'connect postgresql error'
            self.close_pool()

    def close_pool(self):
        """
        关闭连接池连接
        :return:
        """
        if self._pool != None:
            self._pool.close()

    def SelectSql(self, sql):
        """
        查询
        :param sql:
        :return:
        """
        try:
            conn = self.get_pool_conn()
            cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)  # 设置返回格式为字典
            cursor.execute(sql)
            result = cursor.fetchall()
        except Exception as e:
            print('execute sql {0} is error'.format(sql))
            sys.exit('ERROR: load data from database error caused {0}'.format(str(e)))
        finally:
            cursor.close()
            conn.close()
        return result

    def InsertSql(self, sql):
        """
        插入数据
        :param sql:
        :return:
        """
        try:
            conn = self.get_pool_conn()
            cursor = conn.cursor()
            cursor.execute(sql)
            result = True
        except Exception as e:
            print('ERROR: execute  {0} causes error'.format(sql))
            sys.exit('ERROR: update data from database error caused {0}'.format(str(e)))
        finally:
            cursor.close()
            conn.commit()
            conn.close()
        return result

    def UpdateSql(self, sql):
        """
        更新数据
        :param sql:
        :return:
        """
        try:
            conn = self.get_pool_conn()
            cursor = conn.cursor()
            cursor.execute(sql)
            result = True
        except Exception as e:
            print('ERROR: execute  {0} causes error'.format(sql))
            sys.exit('ERROR: update data from database error caused {0}'.format(str(e)))
        finally:
            cursor.close()
            conn.commit()
            conn.close()
        return result


if __name__ == "__main__":
    pgsql = PsycopgConn()
    # 打开一个文件
    with open('task.txt') as fr:
        # 读取文件所有行
        lines = fr.readlines()
    lines = [i.rstrip() for i in lines]
    list = []
    list.append("taskId,batchId\n")
    for taskId in lines:
        sql = "select task_id, batch_id  from ics_batch_info where batch_id=(select MAX(batch_id) from ics_batch_info WHERE task_id = '{taskId}')".format(
            taskId=taskId)
        print("执行sql: ", sql)
        result = pgsql.SelectSql(sql)
        if len(result) != 0:
            list.append(result[0] + "," + str(result[1]) + "\n")
            print(result)

    list[len(list) - 1] = list[len(list) - 1].rstrip();
    with open("最大批次查询结果.csv", 'w') as fw:
        fw.writelines(list)

    print("☺☺☺执行完毕☺☺☺")

验证执行结果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/920781.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

0823|C++day3 类+this指针+类中的特殊成员函数(6种)

一、思维导图 二、作业 要求&#xff1a; 设计一个Per类。类中包含私有成员&#xff1a;姓名、年龄、指针成员身高、体重&#xff1b; 再设计一个Stu类&#xff0c;类中包含私有成员&#xff1a;成绩、Per类对象 p1&#xff1b; 设计这两个类的构造函数、析构函数和拷贝构造函数…

Spring练习30---用户列表的展示,其实用户列表的展示(上)

1、我们其实刚才分析了&#xff0c;用户表与表之间的关系是多对多&#xff0c;我们内部已经用到角色的信息了&#xff0c;那么在实体描述的&#xff0c;就得描述user实体和角色的关系 2、一个用户可以有多个角色 3、所以我在描述实体关系的时候&#xff0c;我咋写&#xff0c;…

玩转Mysql系列 - 第5天:DML操作汇总,确定你都会?

这是Mysql系列第5篇。 环境&#xff1a;mysql5.7.25&#xff0c;cmd命令中进行演示。 DML(Data Manipulation Language)数据操作语言&#xff0c;以INSERT、UPDATE、DELETE三种指令为核心&#xff0c;分别代表插入、更新与删除&#xff0c;是必须要掌握的指令&#xff0c;DML…

【王道-第三章-内存管理】

#pic_center R 1 R_1 R1​ R 2 R^2 R2 目录 知识框架No.1 内存的基础知识一、什么是内存&#xff1f;有何作用&#xff1f;二、内存单位三、指令的工作原理四、三种装入方式1、绝对装入2、可重定位装入3、动态运行时装入 五、从写程序到程序运行六、链接的三种方式七、总结 No.…

stm32之8.中断

&#xff08;Exceptions&#xff09;异常是导致程序流更改的事件&#xff0c;发生这种情况&#xff0c;处理器将挂起当前执行的任务&#xff0c;并执行程序的一部分&#xff0c;称之为异常处理函数。在完成异常处理程序的执行之后&#xff0c;处理器将恢复正常的程序执行&#…

Docker容器:dockerfile创建 LNMP 服务+Wordpress 网站平台

文章目录 一.环境及准备工作1.项目环境2.服务器环境3.任务需求 二.Linux 系统基础镜像三.docker构建Nginx1.建立工作目录上传安装包2.编写 Dockerfile 脚本3.准备 nginx.conf 配置文件4.生成镜像5.创建自定义网络6.启动镜像容器7.验证 nginx 四.docker构建Mysql1. 建立工作目录…

华为数通方向HCIP-DataCom H12-821题库(单选题:61-80)

第61题 关于 BGP 的Keepalive报文消息的描述,错误的是 A、Keepalive周期性的在两个BGP邻居之间发送 B、Keepalive报文主要用于对等路由器间的运行状态和链路的可用性确认 C、Keepalive 报文只包含一个BGP数据报头 D、缺省情况下,Keepalive 的时间间隔是180s 答案&#xff…

QFileSystemModel类和QStringListModel类

QFileSystemModel介绍 QFileSystemModel是Qt框架中的一个模型类&#xff0c;用于在Qt应用程序中表示本地文件系统的目录结构。它提供了一种方便的方式来访问和操作文件系统中的文件和目录。下面是对QFileSystemModel的详细介绍&#xff1a; 目录结构的表示&#xff1a;QFileSy…

矩阵与图的关系:矩阵是图,图是矩阵

原文连接 线性代数最被低估的一个事实&#xff1a;矩阵是图&#xff0c;图是矩阵。 将矩阵编码为图是一种取巧的行为(cheat code)&#xff0c;它其使复杂的行为变得易于研究。 让我告诉你怎么做&#xff01; 1. 非负矩阵的有向图 &#xff08;The directed graph of a nonne…

ChatGPT影响大学生思想行为模式的三个维度

ChatGPT作为新一代AI技术的代表&#xff0c;深刻嵌入并影响着大学生的日常学习和生活场景&#xff0c;其在提升学习研究效率、拓宽认知阈限、重塑人机互动模式等方面带来极大突破&#xff0c;也会对大学生的思想行为模式产生潜在的影响&#xff0c;这些影响可以从个体、关系与社…

Linux设置虚拟内存扩容-偷鸡省钱小技巧-消耗服务器的存储内存转换成运行内存-之强行突破境界

阿丹&#xff1a; 这一段时间各大运营商都在相互内卷&#xff0c;趁着这股劲拿下了几个服务器。因为降本升效&#xff08;囊中羞涩&#xff09;的宗旨&#xff0c;买的服务器的内存并没有那么大所以偷鸡技巧这就来了。 设置虚拟内容-让4G内存up!up!up!到更高&#xff01; 操作…

自己实现 SpringMVC 底层机制 系列之-实现任务阶段 7- 完成简单视图解析

&#x1f600;前言 自己实现 SpringMVC 底层机制 系列之-实现任务阶段 7- 完成简单视图解析 &#x1f3e0;个人主页&#xff1a;尘觉主页 &#x1f9d1;个人简介&#xff1a;大家好&#xff0c;我是尘觉&#xff0c;希望我的文章可以帮助到大家&#xff0c;您的满意是我的动力…

SAP VK11/VK12 创建/更新价格记录

1、事务码VK11&#xff0c;创建物料价格 例如&#xff0c;客户10001&#xff0c;物料200001&#xff0c;价格120&#xff0c;有效期是2023.08.23-9999.12.31 现在有活动&#xff0c;在2023.08.23到2023.08.31想要维护一个活动价格100&#xff0c;而2023.09.01到9999.12.31还是…

Java开发中非常好用的工具

一、项目工具 1.1 IDE 主流的 Java 开发工具现在非 IntelliJ IDEA 莫属。前几年&#xff0c;可能 Eclipse 还能和 IDEA 一争高下&#xff0c;到了现在已经基本是 IDEA 的天下了。 就拿我自己来说吧&#xff0c;我最早用 IDEA&#xff0c;后来用了几年 Eclipse&#xff0c;再后…

EasyExcel实现多sheet文件导出

文章目录 EasyExcel引入依赖表结构学生表课程表教师表 项目结构下载模板实体类StudentVoCourseVoTeacherVo ControllerServiceEasyExcelServiceStudentServiceCourseServiceTeacherService ServiceImplEasyExcelServiceImplStudentServiceImplCourseServiceImplTeacherServiceI…

嵌入式是假风口?其实是你不够强!

嵌入式系统&#xff0c;作为一种集成电路技术的应用&#xff0c;近年来备受瞩目。然而&#xff0c;有人质疑嵌入式系统是否真的有那么大的市场前景&#xff0c;甚至认为这只是一个假风口。但实际上&#xff0c;嵌入式系统的发展潜力远不止于此&#xff0c;关键在于个人实力的提…

PI SSL证书导入

导语&#xff1a;最近在通过PI系统&#xff0c;做REST接口&#xff0c;对方地址是https的&#xff0c;调用时出现错误&#xff1a;【适配器框架出现异常: iaik.security.ssl.SSLCertificateException; Peer certificate relected by ChainVerifie】&#xff0c;此问题是因为这个…

SIP播放解码器

SIP-7101 SIP播放解码器 一、描述 SIP-7101是我司的一款壁挂式SIP网络播放终端&#xff0c;具有10/100M以太网接口&#xff0c;配置一路继电器输出和一路线路输出&#xff0c;可将内部音源输出到外接功放&#xff0c;可实现广播播放功能。SIP-7101作为网络SIP系统的播放终端&…

在Linux上安装redis7

1.检测虚拟机环境 1.1 bit检测命令&#xff1a;getconf LONG_BIT&#xff08;建议使用64bit做开发&#xff09; 1.2 gcc环境检测&#xff1a;gcc -v 如果不具备gcc环境&#xff0c;则使用yum -y install gcc- c命令进行c环境的安装 2.开始安装 2.1 下载redis&#xff1a; 进…

7、Vue 核心技术与实战 day07

1.1 vuex概述 1.2 构建 vuex [多组件数据共享] 环境 1.创建项目 vue create vuex-demo2.创建三个组件, 目录如下 |-components |--Son1.vue |--Son2.vue |-App.vue3.源代码如下 App.vue在入口组件中引入 Son1 和 Son2 这两个子组件 <template><div id"app&qu…