python-mysql协程并发常用操作封装

news2024/9/22 17:22:32

目录

  • 前言
  • 封装代码
  • 测试代码
  • 参考


前言

协程异步操作MYSQL是常用的,博主这里在GitHub上找了两个包,databases和aiomysql,第一个包除了mysql外还支持其他的数据库,且操作MYSQL时底层也是使用的aiomysql,但文档内容比较少,所以选择了对aiomysql进行封装。使用

pip3 install aiomysql

安装即可。

封装代码

async_mysql.py

# coding:utf-8
import aiomysql
import traceback


class AsyncMySQLClient:
    def __init__(self, host: str, user: str, password: str, db: str, port: int = 3306, loop=None):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.db = db
        self.loop = loop
        self.pool = None

    # 创建连接池
    async def connect(self):
        try:
            self.pool = await aiomysql.create_pool(
                host=self.host,
                port=self.port,
                user=self.user,
                password=self.password,
                db=self.db,
                loop=self.loop,
                autocommit=True
            )
        except:
            print(f"connect error:{traceback.format_exc()}")

    # 关闭连接池
    async def close(self):
        self.pool.close()
        await self.pool.wait_closed()

    # 执行一条非查询类语句
    async def execute(self, query:str, args:tuple=None) -> int:
        async with self.pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute(query, args)
                return cur.rowcount

    # 批量执行非查询类语句
    async def executemany(self, query:str, args:list=None) -> int:
        async with self.pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.executemany(query, args)
                return cur.rowcount

    # 查询单条数据
    async def fetchone(self, query:str, args:tuple=None) -> dict:
        async with self.pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(query, args)
                return await cur.fetchone()

    # 查询多条数据
    async def fetchall(self, query:str, args=None) -> list:
        async with self.pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(query, args)
                return await cur.fetchall()

    # 封装单条插入
    async def insert(self, table: str, data: dict) -> int:
        """
        :param table: 表名
        :param data: 数据
        :return:
        """
        keys = ','.join(data.keys())
        values = ','.join(['%s'] * len(data))
        query = f'INSERT INTO {table} ({keys}) VALUES ({values})'
        try:
            return await self.execute(query, tuple(data.values()))
        except:
            print(f"execute {query} with {data} failed, error{traceback.format_exc()}")
            return 0

    # 封装批量插入
    async def insert_many(self, table:str, data_list:list)->int:
        """
        :param table: 表名
        :param data_list: 数据列表
        :return:
        """
        keys = ','.join(data_list[0].keys())
        values = ','.join(['%s'] * len(data_list[0]))
        query = f'INSERT INTO {table} ({keys}) VALUES ({values})'
        args = [tuple(data.values()) for data in data_list]
        try:
            return await self.executemany(query, args)
        except:
            print(f"execute {query} with {args} failed, error{traceback.format_exc()}")
            return 0

非查询类的返回影响行数,查询类的返回数据,使用字典(或字典为元素的列表)。对于常用的插入和查询进行了封装,其他的使用execute函数即可。
使用时生成类的对象,之后connect,执行语句后close即可。

测试代码

import asyncio
import uuid
from async_mysql import AsyncMySQLClient


async def main():
   cli =  AsyncMySQLClient(host="127.0.0.1",user="root",password="12345678",db="test")
   await cli.connect()
   nums = await cli.insert(table="users",data={
       "id": uuid.uuid4(),
       "name": "lady_killer9"
   })
   print(f"插入{nums}条数据")
   random_id = uuid.uuid4()
   nums = await cli.insert_many(table="users",data_list=[{
       "id": random_id,
       "name": "lady_killer9"
   }])
   print(f"插入{nums}条数据")
   user = await cli.fetchone(query="select * from users where id = %s",args=(random_id,))
   print(f"查询到用户:{user}")
   users = await cli.fetchall(query="select * from users where name = 'lady_killer9'")
   print(f"所有用户:{users}")
   num = await cli.execute(query="update users set name='lady_killer8' where id = %s",args=(random_id,))
   if num == 1:
       print("修改成功")
   nums = await cli.execute(query="delete from users")
   print(f"删除{nums}条数据")
   await cli.close()

if __name__ == '__main__':
    asyncio.run(main())

截图如下:
在这里插入图片描述

参考

databases
aiomysql

更多python相关内容,参考:【python总结】python学习框架梳理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1468215.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C#上位机与三菱PLC的通信11---开发自己的通讯工具软件(WPF版)

1、先看颜值 2、开始干 1、创建项目 2、引入前面的通讯库 创建目录将前面生成的通讯库dll文件复制到项目的目录 本项目引入dll文件 3、创建命令基类 RelayCommand.cs代码 using System; using System.Collections.Generic; using System.Linq; using System.Text; using Syst…

HEVC视频编解码标准学习笔记-1

视频编解码标准H.265/HEVC(High Efficiency Video Coding)通过将视频数据编码为更高效格式,大幅改善了视频流的压缩效率。这里主要介绍Tile、Slice和CTU的定义,以及介绍这些技术组件之间的相互关系。 CTU(编码树单元&…

【数据结构】C语言实现二叉树的相关操作

树 定义 树(Tree)是 n (n > 0) 个结点的有限集 若 n 0,称为空树 若 n > 0,则它满足如下两个条件: 有且仅有一个特定的称为根(Root)的结点其余结点可分为 m(m>0) 个互不相交的有限…

云原生超融合八大核心能力|ZStack Edge云原生超融合产品技术解读

企业数字化转型的焦点正在发生变化,云基础设施由资源到应用,数据中心从核心到边缘。面向云原生趋势,围绕应用升级,新一代超融合产品——云原生超融合应运而生。 ZStackEdge云原生超融合是基于云原生架构研发的新一代IT基础设施 …

Docker基础篇(四) 容器数据卷 容器间传递共享(--volumes-from)

容器间传递共享 当前没有运行的容器 两个数据卷: containVolum-01 containVolum-02 docker run -it --name zenA zen/centos 上面生成了容器 zenA ctrl P Q docker run -it --name zenB1 --volumes-from zenA zen/centos ctrl P Q docker run -it --name zen…

07 Redis之持久化(RDB(Redis DataBase) + 写时复制 + AOF(Append Only File)+混合持久化)

4 Redis持久化 Redis 是一个内存数据库,然而内存中的数据是不持久的,若主机宕机或 Redis 关机重启,则内存中的数据全部丢失。 当然,这是不允许的。Redis 具有持久化功能,其会按照设置以快照或操作日志的形式将数据持…

GEE入门篇|遥感专业术语(实践操作2):空间分辨率(Spatial Resolution)

目录 空间分辨率(Spatial Resolution) 1.MODIS(搭载在Aqua 和 Terra 卫星上) 2. TM(搭载在早期LandSat卫星上) 3.MSI(搭载在在Sentinel-2 卫星上) 4.NAIP 空间分辨率&#xff0…

Unity中.Net与Mono的关系

什么是.NET .NET是一个开发框架,它遵循并采用CIL(Common Intermediate Language)和CLR(Common Language Runtime)两种约定, CIL标准为一种编译标准:将不同编程语言(C#, JS, VB等)使用各自的编译器,按照统…

C++ STL vector详解

1. vector简介 template<class T, class Alloc allocator<T>> class vector; vector是一个可以动态增长的数组&#xff0c;T是要存储的元素类型。vector可以像数组一样&#xff0c;用下标[]来访问元素&#xff0c;如&#xff1a; int arr[] {1,2,3,4}; for (i…

[极客挑战2019]HTTP

这道题考察的是http请求头字段的含义和使用&#xff1b; 具体如下 Referer:来源地址 User-Agent:客户端配置信息&#xff1a;浏览器类型、版本、系统类型等 X-Forwarded-For:代理地址&#xff0c;即数据发出的地址 开始解题&#xff1a;&#xff08;对我这初学者真的烧脑&a…

【机器学习】特征工程之特征选择

&#x1f388;个人主页&#xff1a;豌豆射手^ &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏 &#x1f917;收录专栏&#xff1a;机器学习 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共同学习、交流进…

抖音视频评论数据提取软件|抖音数据抓取工具

一、开发背景&#xff1a; 在业务需求中&#xff0c;我们经常需要下载抖音视频。然而&#xff0c;在网上找到的视频通常只能通过逐个复制链接的方式进行抓取和下载&#xff0c;这种操作非常耗时。我们希望能够通过关键词自动批量抓取并选择性地下载抖音视频。因此&#xff0c;为…

Pycharm服务器配置与内网穿透工具结合实现远程开发的解决方法

文章目录 一、前期准备1. 检查IDE版本是否支持2. 服务器需要开通SSH服务 二、Pycharm本地链接服务器测试1. 配置服务器python解释器 三、使用内网穿透实现异地链接服务器开发1. 服务器安装Cpolar2. 创建远程连接公网地址 四、使用固定TCP地址远程开发 本文主要介绍如何使用Pych…

STM32CubeMX FOC工程配置(AuroraFOC)

一. 简介 哈喽&#xff0c;大家好&#xff0c;今天给大家带来基于AuroraFOC开发板的STM32CubeMX的工程配置&#xff0c;主要配置的参数如下: 1. 互补PWM输出 2. 定时器注入中断ADC采样 3. SPI配置 4. USB CDC配置 5. RT Thread配置 大家如果对这几部分感兴趣的话&#xff0c…

C语言——实用调试技巧——第2篇——(第23篇)

坚持就是胜利 文章目录 一、实例二、如何写出好&#xff08;易于调试&#xff09;的代码1、优秀的代码2、示范&#xff08;1&#xff09;模拟 strcpy 函数方法一&#xff1a;方法二&#xff1a;方法三&#xff1a;有弊端方法四&#xff1a;对方法三进行优化assert 的使用 方法五…

浅析DPDK内存管理:Mempool

文章目录 前言Mempool工作机制Mempool数据结构rte_mempool_opsMempool操作接口rte_mempool_create&#xff1a;创建Mempoolrte_mempool_get&#xff1a;申请对象rte_mempool_put&#xff1a;释放对象 相关参考 前言 DPDK提供了一套内存池管理机制&#xff0c;以提供高效分配固…

了解人工智能的13个细分领域

人工智能&#xff08;Artificial Intelligence&#xff0c;简称AI&#xff09;作为当今最热门和前沿的技术之一&#xff0c;已经在各种领域发挥着越来越重要的作用。随着人工智能技术的不断进步和应用&#xff0c;AI的细分领域也越来越多。目前&#xff0c;根据AI的应用领域和特…

flink源码分析 - 获取调用位置信息

flink版本: flink-1.11.2 调用位置: org.apache.flink.streaming.api.datastream.DataStream#flatMap(org.apache.flink.api.common.functions.FlatMapFunction<T,R>) 代码核心位置: org.apache.flink.api.java.Utils#getCallLocationName() flink算子flatmap中调用了一…

【深度学习笔记】3_6 代码实现softmax-regression

注&#xff1a;本文为《动手学深度学习》开源内容&#xff0c;仅为个人学习记录&#xff0c;无抄袭搬运意图 3.6 softmax回归的从零开始实现 这一节我们来动手实现softmax回归。首先导入本节实现所需的包或模块。 import torch import torchvision import numpy as np import…

神经网络系列---权重初始化方法

文章目录 权重初始化方法Xavier初始化&#xff08;Xavier initialization&#xff09;Kaiming初始化&#xff0c;也称为He初始化LeCun 初始化正态分布与均匀分布Orthogonal InitializationSparse Initializationn_in和n_out代码实现 权重初始化方法 Xavier初始化&#xff08;X…