SqlAlchemy使用教程(七) 异步访问数据库

news2024/12/26 14:13:06

在这里插入图片描述

  • SqlAlchemy使用教程(一) 原理与环境搭建
  • SqlAlchemy使用教程(二) 入门示例及编程步骤
  • SqlAlchemy使用教程(三) CoreAPI访问与操作数据库详解
  • SqlAlchemy使用教程(四) MetaData 与 SQL Express Language 的使用
  • SqlAlchemy使用教程(五) ORM API 编程入门
  • SqlAlchemy使用教程(六) – ORM 表间关系的定义与CRUD操作

注:本章要求熟悉Python异步编程的基础知识

1、SqlAlchemy 异步编程基础

1.1 异步访问数据的优点

  • 当数据库访问较频繁时,异步编程可以明显提高运行效率。
  • 可以配合FastAPI 等异步框架,用异步访问使用SqlAlchemy,充分发挥异步框架的优势 。

SqlAlchmy 1.4 提供了Core层的异步接口, 2.0提供了 异步ORM接口

1.2 编程环境准备

(1) 安装异步依赖库

sqlalchemy 的异步接口基于 greenlet,在setuptools配置中为可选安装,

安装异步
pip install sqlalchemy[asyncio]

或者自已在setup.py 中查找greenlet版本号,手工
pip install greenlet==xx.yy.zz

(2)安装数据库的异步驱动。

pip install aiosqlite

以下是常见数据库的异步驱动库
sqlite3 :

  • aiosqlite

mysql:

  • aiomysql。
  • asyncmy: 这是1个支持 MySQL/MariaDB 的高性能异步库

PostgreSQL:

  • aiopg,
  • asyncpg:
  • asyncpgsa: 是asyncpg库的封装,适用于Sqlalchemy.

2、Core Async API

Core API层的异步,首先通过 create_async_engine() 创建1个异步AsyncEngine对象,该对象提供异步连接AsyncEngine.connect() , 或者通过上下文使用AsyncEngine.begin()创建的连接。
同时,还提供了AsyncConnection.run_sync() 用于执行一些内部的同步方法,如 MetaData.create_all()。

下面我们通过实现来查看1个完整过程

import asyncio
from sqlalchemy import Column, MetaData, select, String, Table
from sqlalchemy.ext.asyncio import create_async_engine

meta = MetaData()
t1 = Table(
    'tb1',
    meta,
    Column('name', String(50), primary_key=True),
    Column('profile', String(50), nullable=True),
)

async def main():
    engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    async with engine.begin() as conn:
        await conn.run_sync(meta.create_all)
        await conn.execute(
            t1.insert(), [{"name": "some name 1", 'profile': 'some profile 1'},
                          {"name": "some name 2", 'profile': 'some profile 2'}
                          ]
        )
    async with engine.begin() as conn:
        result = await conn.execute(select(t1).where(t1.c.name == "some name 1"))
        print(result.fetchall())
    await engine.dispose()
asyncio.run(main())

异步流式查询

使用AsyncConnection.stream() 执行SQL, 返回AsyncResult对象。

async with engine.connect() as conn:
    async_result = await conn.stream(select(t1))

    async for row in async_result:
        print("row: %s" % (row,))

3、异步ORM 编程API

3.1 异步ORM API介绍

ORM的异步接口主要由AsyncSession 类来提供。
AsyncSession对象由async_sessionmaker() 工厂方法来创建。
注意:1个AsyncSession 实例只能用在1个coroutine 内。 每个协程要使用不同的AsyncSession对象。

async_session = async_sessionmaker(async_engine, expire_on_commit=False)

手动关闭AsyncSession对象,
AsyncSession.close()

异步执行SQL操作
AsyncSession.execute()
AsyncSession.scalars()

对于具有relation 关系的表的操作,同步模式下存在由lazy load 带来的Implicit IO,异步模式下不支持lazy load.

3.2 完整示例

import asyncio
from typing import List
from sqlalchemy import ForeignKey, select, String, Integer
from sqlalchemy.ext.asyncio import (create_async_engine,
                                    async_sessionmaker,
                                    AsyncSession,
                                    AsyncAttrs
                                    )
from sqlalchemy.orm import (DeclarativeBase,
                            Mapped,
                            mapped_column,
                            relationship,
                            selectinload)

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "user"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(30))
    age: Mapped[int] = mapped_column(Integer())
    company_id: Mapped[int] = mapped_column(ForeignKey("company.id"))
    company: Mapped["Company"] = relationship(back_populates="users")

class Company(Base):
    __tablename__ = "company"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(30))
    users: Mapped[List[User]] = relationship()

async def insert_data(async_session: async_sessionmaker[AsyncSession]) -> None:
    async with async_session() as session:
        async with session.begin():
            session.add_all([
                Company(name="Baidu", users=[]),
                Company(name="Alibaba", users=[]),
                User(name='Tom', age=21, company_id=1),
                User(name='Jerry', age=22, company_id=2),
                User(name='Jack', age=23, company_id=1),
            ])

async def main() -> None:
    engine = create_async_engine("sqlite+aiosqlite:///:memory:", echo=False)
    async_session = async_sessionmaker(engine, expire_on_commit=False)

    # 创建表
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    # 使用异步协程插入数据
    await asyncio.gather(insert_data(async_session))
    # await insert_data(async_session)  # 或者直接执行 

    # 查询User表数据,联合查询Company表数据
    async with async_session() as session:
        stmt = select(User, Company).join(User.company).order_by(User.name)
        result = await session.execute(stmt)
        for row in result.scalars():
            print(row.id, row.name, row.age, row.company.name)

    # 查询 Company 数据,反向查询User表数据
    print("查询 Company 数据,反向查询User表数据")
    async with async_session() as session:
        stmt = select(Company).options(selectinload(Company.users))
        result = await session.execute(stmt)
        for row in result.scalars():
            print(row.id, row.name)
            for user in row.users:
                print('\t', user.id, user.name, user.age)

    await engine.dispose()

asyncio.run(main())


执行上述代码,输出为:

output 
3 Jack 23 Baidu
2 Jerry 22 Alibaba
1 Tom 21 Baidu
查询 Company 数据,反向查询User表数据
1 Baidu
         1 Tom 21
         3 Jack 23
2 Alibaba
         2 Jerry 22

说明:

  • User表定义有外键字段,与company是1对1对多关系,查询User表时,如果希望同时获得 Company表数据,应使用联合查询。
  • 查询Company表中,反向查询 User表数据,须处理懒加载问题,参考下一节.

3.3 关系查询中懒加载问题

如果A, B之间存在1对多关系, B中的外键指向A,SqlAlchemy在查询 A表的数据后,如果设置了反向查询字段,默认 SqlAlchemy会对关联表隐式地发送查询请求。由于这个I/O是同步的,因此 AsyncSession是不支持此操作。会Block此操作。

有两种解决办法:

方法一:引入AsyncAttrs Mixin混入类


from sqlalchemy.ext.asyncio import AsyncAttrs

class Base(AsyncAttrs, DeclarativeBase):   # Base引入AsyncAttrs
    pass
# 表A与B之间存在外键关系。 
class A(Base):
    __tablename__ = "a"
    # ... rest of mapping ...
    bs: Mapped[List[B]] = relationship()  # 反射查询关系

class B(Base):
    __tablename__ = "b"
    a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))
    # ... rest of mapping ...

A的 bs属性查询时是lazy load,将被做为 AsyncAttrs来处理,阻止其发磅IO
联合查询时,要手工用异步方式执行查询操作

a1 = (await session.scalars(select(A))).one()
for b1 in await a1.awaitable_attrs.bs:
    print(b1)

方式2: 用异步eager load加载关系表数据
如果不使用AsyncAttrs 方式,可用 eager load 来解决:
最常用eager load方法为selectinload() ,其与select()形成链式调用

stmt = select(A).options(selectinload(A.bs))
result = wait session.scalars(stmt)
for r in result: 
    print(r.id, r.data, r.bs) 

注意:

  • 当A构建新对象时,对bs总是赋个空值, 如 A(bs=[], data="a2")

3.4 运行同步方法

如同上一节提到,Core API 的 AsyncConnection对象提供了run_sync()方法运行同步方法,同样ORM API中,AsyncSession对象也提供了run_sync() 执行同步方法。

await session.run_sync(fetch_and_update_objects)    # fetch_and_update_objects() 是1个同步方法

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2180184.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Find My储物盒|苹果Find My技术与储物盒结合,智能防丢,全球定位

储物盒是用来存储,收藏东西的器具。储物盒可以帮助用户合理利用有限的空间,通过分类归置物品,避免浪费和混乱。储物盒能够有效地保护存放的物品,防止它们受到灰尘、污渍、损坏和潮湿的影响。储物盒还可以增加空间利用率、方便搬家…

Windows环境下使用Docker配置MySQL数据库

用Docker配置数据库,无论是做开发,还是做生产部署,都非常的方便 它不需要单独安装数据库,也不用担心出现各种环境的配置问题。 本文将分享用Docker配置数据库的步骤,这里用MySQL举例。 其他的数据库如MSSQL&#xf…

全球IP归属地查询-IP地址查询-IP城市查询-IP地址归属地-IP地址解析-IP位置查询-IP地址查询API接口

IP地址城市版查询接口 API是指能够根据IP地址查询其所在城市等地理位置信息的API接口。这类接口在网络安全、数据分析、广告投放等多个领域有广泛应用。以下是一些可用的IP地址城市版查询接口API及其简要介绍 1. 快证 IP归属地查询API 特点:支持IPv4 提供高精版、…

Scalefit:有效避免工作场所运动损伤的解决方案

在当今快节奏的工作环境中,运动损伤已成为一个不容忽视的问题。长时间的久坐、重复性动作以及缺乏适当的运动,都可能导致肌肉骨骼损伤、关节疼痛等问题。作为一款专注于运动健康管理的平台,Scalefit Industrial Athlete通过科学的方法和个性化…

天坑!Spark+Hive+Paimon+Dolphinscheduler

背景: 数据中台项目使用Spark+Hive+Paimon做湖仓底层,调度任务使用的是基于Dolphinscheduler进行二开。在做离线脚本任务开发时,在Paimon库下执行非查询类SQL报错。 INSERT报错 DELETE报错 现状: 原始逻辑为数据中台中选择的Paimon数据源,实际上在Dolphinscheduler中是…

生成靶标图像代码——C语言代码实现

1. 生成左右相机拍摄的3个彩色靶标的图像 两个相机在x轴方向上平移 // 生成左右相机拍摄3个靶标时的图像 生成彩色靶标 #include <stdio.h> #include <stdlib.h> #include <math.h>// 图像尺寸 #define WIDTH 1920 #define HEIGHT 1080// BMP头信息 #pra…

掌握自动化测试必要的几种技能?

1.自动化测试员技能——编程语言 当我开始担任手动测试人员时&#xff0c;我不喜欢编码。但是&#xff0c;当我逐渐进入自动化领域时&#xff0c;对我来说很清楚&#xff0c;如果没有对编程语言的一些基本了解&#xff0c;就无法编写逻辑自动化测试脚本。 对编程有一点了解&a…

短视频矩阵源码oem/矩阵系统搭建/源码开发注意事项知识分享

短视频矩阵系统的源码框架主要涵盖Spring、Struts与Hibernate三种。Spring是一款全栈式Java应用开发框架&#xff0c;集成了IOC容器、AOP以及事务管理等关键功能。Struts则基于MVC架构设计&#xff0c;用于Web应用程序的开发&#xff0c;有效分离数据模型、用户界面及控制器逻辑…

全面指南:探索并实施解决Windows系统中“mfc140u.dll丢失”的解决方法

当你的电脑出现mfc140u.dll丢失的问题是什么情况呢&#xff1f;mfc140u.dll文件依赖了什么&#xff1f;mfc140u.dll丢失会导致电脑出现什么情况&#xff1f;今天这篇文章就和大家聊聊mfc140u.dll丢失的解决办法。希望能够有效的帮助你解决这问题。 哪些程序依赖mfc140u.dll文件…

深圳市软件行业协会领导到访开源网安,共筑大湾区数字经济安全未来

近日&#xff0c;深圳市软件行业协会会长邓爱国、秘书长郑飞等一行人到访开源网安进行参观交流。双方以网信行业技能培训、软件安全开发能力评价和智能网联汽车安全测试等方面为探讨方向&#xff0c;对未来的合作进行了深入交流。 在参观过后&#xff0c;深圳市软件行业协会相关…

查找满足条件的行序号

有 2022 年 1 月的日销售额统计表如下所示&#xff1a; 找出日销售额大于 1000 的日子&#xff1a; spl("E(?1).pselecta(Sales>1000)",A1:B32)pselecta()返回所有满足条件的记录序号&#xff0c;pselect() 则只返回第一个满足条件的行序号 免费课程、免费软件下…

Elasticsearch要点简记

Elasticsearch要点简记 1、ES概述2、基础概念&#xff08;1&#xff09;索引、文档、字段&#xff08;2&#xff09;映射&#xff08;3&#xff09;DSL 3、架构原理4、索引字段的数据类型5、ES的三种分页方式&#xff08;1&#xff09;深度分页&#xff08;fromsize&#xff09…

码随想录算法训练营第60天|卡码网:94. 城市间货物运输 I、95. 城市间货物运输 II、96. 城市间货物运输 III

1.卡码网&#xff1a;94. 城市间货物运输 I 题目链接&#xff1a;https://kamacoder.com/problempage.php?pid1152 文章链接&#xff1a;https://www.programmercarl.com/kamacoder/0094.城市间货物运输I-SPFA.html 思路&#xff1a; 只对 上一次松弛的时候更新过的节点作为出…

智能餐饮:Spring Boot 点餐系统

第四章 系统设计 4.1 系统体系结构 网上点餐系统的结构图4-1所示&#xff1a; 图4-1 系统结构 模块包括主界面&#xff0c;首页、个人中心、用户管理、美食店管理、美食分类管理、美食信息管理、美食订单管理、美食评价管理、系统管理等进行相应的操作。 登录系统结构图&…

2025年3月PMP考试《PMBOK®指南》第六版不再作为参考资料

大家都知道《PMBOK指南》第六版是PMP认证考试的必备教材&#xff0c;由项目管理协会&#xff08;PMI&#xff09;指定。本书详细介绍了项目管理的5个过程组&#xff0c;并对项目管理的10个知识领域进行了阐述。 就在2024.9.30昨天的时候中国国际人才交流基金会公布了&#xff…

崖山数据库的共享集群机制初探

本文作者&#xff1a;YashanDB高级服务工程师周国超 YashanDB共享集群是崖⼭数据库系统&#xff08;YashanDB&#xff09;的⼀个关键特性&#xff0c;它是⼀个单库多实例的多活数据库系统。⽤⼾可以连接到任意实例访问同⼀个数据库&#xff0c;多个数据库实例能够并发读写同⼀…

QT对QBytearray的data()指针进行结构体转换时会自动字节对齐填充

1、测试代码 #include <QCoreApplication>#pragma pack(push, 1) typedef struct {int a;float b;char c;int *d; }testStruct; #pragma pack(pop)#include <QByteArray> #include <QDebug>int main() {testStruct structA;structA.a 1;structA.b 2;struc…

正点原子阿波罗STM32F429IGT6移植zephyr rtos(二)---使用I2C驱动MPU6050

硬件平台&#xff1a;正点原子阿波罗STM32F429IGT6 zephyr版本&#xff1a;Zephyr version 3.7.99 开发环境&#xff1a;ubuntu 24.4 zephyr驱动开发与之前接触到的开发方式可能都不一样&#xff0c;更像是linux驱动开发&#xff0c;zephyr源码里边其实已经有写好的I2C和MPU60…

谷歌SEO:有心栽花花不开,无心插柳柳成荫!

之前一开始是想搞个谷歌SEO免费的技术教程博客&#xff08;https://www.c-sz.com/&#xff09;主要是很多时候遇到在谷歌独立站推广群里的朋友需要咨询和学习一些谷歌技术基础知识&#xff0c;当然我自己也有点小心思&#xff0c;就是希望在谷歌能吸引部分的谷歌SEO爱好者尤其包…

【AI学习】DDPM 无条件去噪扩散概率模型实现(pytorch)

这里主要使用pytorch实现基本的无条件去噪扩散模型&#xff0c;理论上面的推导这里不重点介绍。 原文理论参考&#xff1a; 前向和反向过程示意图 前向过程和后向过程 扩散过程包括正向过程和反向过程。前向过程是基于噪声调度的预定马尔可夫链。噪声表是一组方差 &#xff0…