psycopg2 介绍
psycopg2库介绍: Psycopg2是一个用于Python编程语言的第三方库,用于访问PostgreSQL数据库系统。它提供了一组工具和方法,可以轻松地在Python程序中进行数据库操作,包括查询、插入、更新、删除等操作。
以下是Psycopg2库的一些主要特点:
- 简单易用: Psycopg2提供了简单的API,易于学习和使用。
- 高性能: Psycopg2是基于C语言实现的,能够提供高效的数据库操作。
- 完全兼容: Psycopg2与PostgreSQL数据库完全兼容,并支持大多数PostgreSQL特性。
- 安全性: Psycopg2具有内置的防止SQL注入攻击的功能,能够保证数据安全。
- 使用Psycopg2库进行数据库操作通常需要以下步骤:
- 安装psycopg2库:可以使用pip install psycopg2来安装该库。
- 建立数据库连接:使用psycopg2库提供的connect()方法建立与数据库的连接。
- 执行SQL语句:使用psycopg2提供的方法执行SQL语句,如查询、插入、更新等操作。
- 处理查询结果:如果执行的是查询操作,需要使用fetchone()或fetchall()方法来处理查询结果。
- 关闭连接:最后需要使用close()方法关闭数据库连接。
以下为封装后的utils工具,可直接在项目中使用:
1.数据库配置类 config.py
# prod 环境配置
# POSTGRE_HOST = "samrt.prod.xxx.cloud"
# POSTGRE_PORT = 1921
# POSTGRE_USER = "check_prod"
# POSTGRE_DATABASE = "check_prod"
# POSTGRE_PASSWORD = "VLjxs0K8888BMWh"
# dev环境配置
POSTGRE_HOST = "smart.dev.xxx.cloud"
POSTGRE_PORT = 1921
POSTGRE_USER = "check_dev"
POSTGRE_DATABASE = "check_dev"
POSTGRE_PASSWORD = "Ku3py19822gBaXsNW"
2.封装的通用数据库操作类 PostgreSqlPool.py
from psycopg2 import pool
from config import *
class PostgreSqlTool(object):
def __init__(self):
try:
self.connectPool = pool.ThreadedConnectionPool(2, 10, host=POSTGRE_HOST, port=POSTGRE_PORT,
user=POSTGRE_USER, password=POSTGRE_PASSWORD,
database=POSTGRE_DATABASE, keepalives=1,
keepalives_idle=30, keepalives_interval=10,
keepalives_count=5)
except Exception as e:
print(e)
def getConnect(self):
conn = self.connectPool.getconn()
cursor = conn.cursor()
return conn, cursor
def closeConnect(self, conn, cursor):
cursor.close()
self.connectPool.putconn(conn)
def closeAll(self):
self.connectPool.closeall()
# 执行增删改
def execute(self, sql, value=None):
conn, cursor = self.getConnect()
try:
res = cursor.execute(sql, value)
conn.commit(conn, cursor)
self.closeConnect(conn, cursor)
return res
except Exception as e:
conn.rollock()
raise e
def selectOne(self, sql):
conn, cursor = self.getConnect()
cursor.execute(sql)
result = cursor.fetchone()
self.closeConnect(conn, cursor)
return result
def selectAll(self, sql):
conn, cursor = self.getConnect()
cursor.execute(sql)
result = cursor.fetchall()
self.closeConnect(conn, cursor)
return result
3. 实际使用操作sql类
读取本地task.txt 中的任务号,查询数据库,将查询结果写到最大批次查询结果.csv文件中
# coding=utf-8
import PostgreSqlPool
if __name__ == "__main__":
pgsql = PostgreSqlPool.PostgreSqlTool()
# 打开一个文件
with open('task.txt') as fr:
# 读取文件所有行
lines = fr.readlines()
lines = [i.rstrip() for i in lines]
list = []
list.append("taskId,batchId\n")
for taskId in lines:
sql = "select task_id, batch_id from ics_batch_info where batch_id=(select MAX(batch_id) from ics_batch_info WHERE task_id = '{taskId}')".format(
taskId=taskId)
print("执行sql: ", sql)
result = pgsql.selectOne(sql)
if result is not None:
list.append(result[0] + "," + str(result[1]) + "\n")
print(result)
list[len(list) - 1] = list[len(list) - 1].rstrip();
with open("最大批次查询结果.csv", 'w') as fw:
fw.writelines(list)
print("☺☺☺执行完毕☺☺☺")
读取文件和导出csv文件均在本目录下: