目录
- 前言
- 封装代码
- 测试代码
- 参考
前言
协程异步操作MYSQL是常用的,博主这里在GitHub上找了两个包,databases和aiomysql,第一个包除了mysql外还支持其他的数据库,且操作MYSQL时底层也是使用的aiomysql,但文档内容比较少,所以选择了对aiomysql进行封装。使用
pip3 install aiomysql
安装即可。
封装代码
async_mysql.py
# coding:utf-8
import aiomysql
import traceback
class AsyncMySQLClient:
def __init__(self, host: str, user: str, password: str, db: str, port: int = 3306, loop=None):
self.host = host
self.port = port
self.user = user
self.password = password
self.db = db
self.loop = loop
self.pool = None
# 创建连接池
async def connect(self):
try:
self.pool = await aiomysql.create_pool(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
db=self.db,
loop=self.loop,
autocommit=True
)
except:
print(f"connect error:{traceback.format_exc()}")
# 关闭连接池
async def close(self):
self.pool.close()
await self.pool.wait_closed()
# 执行一条非查询类语句
async def execute(self, query:str, args:tuple=None) -> int:
async with self.pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(query, args)
return cur.rowcount
# 批量执行非查询类语句
async def executemany(self, query:str, args:list=None) -> int:
async with self.pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.executemany(query, args)
return cur.rowcount
# 查询单条数据
async def fetchone(self, query:str, args:tuple=None) -> dict:
async with self.pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(query, args)
return await cur.fetchone()
# 查询多条数据
async def fetchall(self, query:str, args=None) -> list:
async with self.pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(query, args)
return await cur.fetchall()
# 封装单条插入
async def insert(self, table: str, data: dict) -> int:
"""
:param table: 表名
:param data: 数据
:return:
"""
keys = ','.join(data.keys())
values = ','.join(['%s'] * len(data))
query = f'INSERT INTO {table} ({keys}) VALUES ({values})'
try:
return await self.execute(query, tuple(data.values()))
except:
print(f"execute {query} with {data} failed, error{traceback.format_exc()}")
return 0
# 封装批量插入
async def insert_many(self, table:str, data_list:list)->int:
"""
:param table: 表名
:param data_list: 数据列表
:return:
"""
keys = ','.join(data_list[0].keys())
values = ','.join(['%s'] * len(data_list[0]))
query = f'INSERT INTO {table} ({keys}) VALUES ({values})'
args = [tuple(data.values()) for data in data_list]
try:
return await self.executemany(query, args)
except:
print(f"execute {query} with {args} failed, error{traceback.format_exc()}")
return 0
非查询类的返回影响行数,查询类的返回数据,使用字典(或字典为元素的列表)。对于常用的插入和查询进行了封装,其他的使用execute函数即可。
使用时生成类的对象,之后connect,执行语句后close即可。
测试代码
import asyncio
import uuid
from async_mysql import AsyncMySQLClient
async def main():
cli = AsyncMySQLClient(host="127.0.0.1",user="root",password="12345678",db="test")
await cli.connect()
nums = await cli.insert(table="users",data={
"id": uuid.uuid4(),
"name": "lady_killer9"
})
print(f"插入{nums}条数据")
random_id = uuid.uuid4()
nums = await cli.insert_many(table="users",data_list=[{
"id": random_id,
"name": "lady_killer9"
}])
print(f"插入{nums}条数据")
user = await cli.fetchone(query="select * from users where id = %s",args=(random_id,))
print(f"查询到用户:{user}")
users = await cli.fetchall(query="select * from users where name = 'lady_killer9'")
print(f"所有用户:{users}")
num = await cli.execute(query="update users set name='lady_killer8' where id = %s",args=(random_id,))
if num == 1:
print("修改成功")
nums = await cli.execute(query="delete from users")
print(f"删除{nums}条数据")
await cli.close()
if __name__ == '__main__':
asyncio.run(main())
截图如下:
参考
databases
aiomysql
更多python相关内容,参考:【python总结】python学习框架梳理