测试与FastAPI应用数据之间的差异

news2025/1/11 12:35:10

【squids.cn】 全网zui低价RDS,免费的迁移工具DBMotion、数据库备份工具DBTwin、SQL开发工具等

当使用两个不同的异步会话来测试FastAPI应用程序与数据库的连接时,可能会出现以下错误:

  1. 在测试中,在数据库中创建了一个对象(测试会话)。 

  2. 在应用程序中发出一个请求,在此请求中修改了这个对象(应用会话)。 

  3. 在测试中从数据库加载对象,但其中没有所需的更改(测试会话)。 

让我们找出发生了什么。

我们通常在应用程序和测试中使用两个不同的会话。

此外,在测试中,我们通常将会话包装在一个准备数据库进行测试的fixture中,测试完成后,所有内容都会被清理。

以下是应用程序的示例。

一个带有数据库连接的文件app/database.py:

""" Database settings file """
from typing import AsyncGenerator
​
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import declarative_base
​
DATABASE_URL = "postgresql+asyncpg://user:password@host:5432/dbname"
engine = create_async_engine(DATABASE_URL, echo=True, future=True)
async_session = async_sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
​
​
async def get_session() -> AsyncGenerator:
   """ Returns async session """
   async with async_session() as session:
       yield session
​
Base = declarative_base()

一个带有模型描述的文件app/models.py: 

""" Model file """from sqlalchemy import Integer, Stringfrom sqlalchemy.orm import Mapped, mapped_columnfrom .database import Baseclass Lamp(Base):   """ Lamp model """   __tablename__ = 'lamps'   id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)   status: Mapped[str] = mapped_column(String, default="off")

一个带有端点描述的文件app/main.py: 

""" Main file """import loggingfrom fastapi import FastAPI, Dependsfrom sqlalchemy import selectfrom sqlalchemy.ext.asyncio import AsyncSessionfrom .database import get_sessionfrom .models import Lampapp = FastAPI()@app.post("/lamps/{lamp_id}/on")async def check_lamp(       lamp_id: int,       session: AsyncSession = Depends(get_session)) -> dict:   """ Lamp on endpoint """   results = await session.execute(select(Lamp).where(Lamp.id == lamp_id))   lamp = results.scalar_one_or_none()   if lamp:       logging.error("Status before update: %s", lamp.status)       lamp.status = "on"       session.add(lamp)       await session.commit()       await session.refresh(lamp)       logging.error("Status after update: %s", lamp.status)   return {}

我特意在示例中添加了日志记录和一些其他请求,以使其更加清晰。

这里,使用Depends创建了一个会话。

以下是带有测试示例的文件tests/test_lamp.py:

""" Test lamp """
import logging
from typing import AsyncGenerator
​
import pytest
import pytest_asyncio
from httpx import AsyncClient
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
​
from app.database import Base, engine
from app.main import app, Lamp
​
​
@pytest_asyncio.fixture(scope="function", name="test_session")
async def test_session_fixture() -> AsyncGenerator:
   """ Async session fixture """
   async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
​
   async with async_session() as session:
       async with engine.begin() as conn:
           await conn.run_sync(Base.metadata.create_all)
​
       yield session
​
   async with engine.begin() as conn:
       await conn.run_sync(Base.metadata.drop_all)
​
   await engine.dispose()
​
​
@pytest.mark.asyncio
async def test_lamp_on(test_session):
   """ Test lamp switch on """
   lamp = Lamp()
   test_session.add(lamp)
   await test_session.commit()
   await test_session.refresh(lamp)
​
   logging.error("New client status: %s", lamp.status)
   assert lamp.status == "off"
​
   async with AsyncClient(app=app, base_url="http://testserver") as async_client:
       response = await async_client.post(f"/lamps/{lamp.id}/on")
       assert response.status_code == 200
   results = await test_session.execute(select(Lamp).where(Lamp.id == lamp.id))
   new_lamp = results.scalar_one_or_none()
   logging.error("Updated status: %s", new_lamp.status)
​
   assert new_lamp.status == "on"

这是一个常规的Pytest,它在一个fixture中获取到数据库的会话。在返回会话之前,此fixture会先创建所有的表格,使用之后,它们会被删除。

请再次注意,在测试中,我们使用来自test_session fixture的会话,而在主代码中,我们使用来自app/database.py文件的会话。尽管我们使用相同的引擎,但是生成的会话是不同的。这一点很重要。

预期的数据库请求序列

应从数据库返回status = on。

在测试中,我首先在数据库中创建一个对象。这是通过来自测试的会话进行的常规INSERT。我们称其为Session 1。此时,只有这个会话连接到了数据库。应用程序会话尚未连接。

在创建对象之后,我执行了一个刷新操作。这是通过Session 1对新创建的对象进行的SELECT,并通过实例更新。

结果,我确保对象正确创建,并且status字段填充了所需的值 - off。

然后,我对/lamps/1/on端点执行一个POST请求。这是打开灯的操作。为了使示例更短,我没有使用fixture。一旦请求开始工作,将创建一个新的数据库会话。我们称其为Session 2。使用这个会话,我从数据库加载所需的对象。我将状态输出到日志。它是off。之后,我更新了这个状态并在数据库中保存了更新。数据库收到了一个请求:

BEGIN (implicit)UPDATE lamps SET status=$1::VARCHAR WHERE lamps.id = $2::INTEGERparameters: ('on', 1)COMMIT

请注意,也存在COMMIT命令。尽管事务是隐式的,但其结果在其他会话中在COMMIT之后立即可用。

接下来,我使用refresh发出请求,从数据库获取更新后的对象。我输出状态。现在它的值是on。

看起来一切都应该正常工作。端点停止工作,关闭Session 2,并将控制权转移到测试。

在测试中,我从Session 1发出常规请求,以获取修改过的对象。但在状态字段中,我看到的值是off。

以下是代码中操作序列的方案。

代码中的操作序列

代码中的操作序列 同时,根据所有日志,最后一个SELECT请求被执行并返回了status = on。此刻,数据库中的其值肯定等于on。这是engine asyncpg响应SELECT请求时接收的值。

那么,发生了什么?

发生了以下情况。

结果是,为获取新对象而做的请求并没有更新当前的对象,而是找到并使用了现有的对象。一开始,我使用ORM添加了一个灯泡对象。我在另一个会话中更改了它。当更改完成时,当前会话对此更改一无所知。并且在Session 2中进行的提交并未在Session 1中请求expire_all方法。

为了修复这个问题,你可以做以下操作之一:

  1. 对于测试和应用程序使用共享会话。

  2. 刷新实例,而不是尝试从数据库中获取它。

  3. 强制使实例过期。

  4. 关闭会话。

依赖性覆盖

为了使用相同的会话,您可以简单地使用我在测试中创建的那个覆盖应用程序中的会话。这很简单。

为此,我们需要在测试中添加以下代码:

async def _override_get_db():   yield test_sessionapp.dependency_overrides[get_session] = _override_get_db

如果你愿意,可以将这部分包装成一个夹具,以便在所有测试中使用。

所得到的算法将如下所示:

代码中使用依赖性覆盖时的步骤

下面是带有会话替代的测试代码:

@pytest.mark.asyncioasync def test_lamp_on(test_session):   """ Test lamp switch on """   async def _override_get_db():       yield test_session   app.dependency_overrides[get_session] = _override_get_db   lamp = Lamp()   test_session.add(lamp)   await test_session.commit()   await test_session.refresh(lamp)   logging.error("New client status: %s", lamp.status)   assert lamp.status == "off"   async with AsyncClient(app=app, base_url="http://testserver") as async_client:       response = await async_client.post(f"/lamps/{lamp.id}/on")       assert response.status_code == 200   results = await test_session.execute(select(Lamp).where(Lamp.id == 1))   new_lamp = results.scalar_one_or_none()   logging.error("Updated status: %s", new_lamp.status)   assert new_lamp.status == "on"

但是,如果应用程序使用多个会话(这是可能的),那么这可能不是最佳方法。此外,如果在被测试的函数中没有调用commit或rollback,那么这也将无济于事。

刷新 

第二种解决方案是最简单和最有逻辑的。我们不应该创建一个新的请求来获取一个对象。为了更新,处理端点请求后立即调用刷新就足够了。在内部,它调用expires,这导致保存的实例不用于新的请求,并且数据重新填充。这种解决方案是最有逻辑的,也最容易理解。

await test_session.refresh(lamp)

之后,你不需要再试图加载new_lamp对象,只需检查相同的lamp。

以下是使用刷新的代码方案。

使用刷新时的代码中的步骤 

以下是带有更新的测试代码。

@pytest.mark.asyncioasync def test_lamp_on(test_session):   """ Test lamp switch on """   lamp = Lamp()   test_session.add(lamp)   await test_session.commit()   await test_session.refresh(lamp)   logging.error("New client status: %s", lamp.status)   assert lamp.status == "off"   async with AsyncClient(app=app, base_url="http://testserver") as async_client:       response = await async_client.post(f"/lamps/{lamp.id}/on")       assert response.status_code == 200   await test_session.refresh(lamp)   logging.error("Updated status: %s", lamp.status)   assert lamp.status == "on"

过期 

但是,如果我们更改了很多对象,最好调用expire_all。然后,所有实例都将从数据库中读取,一致性不会被破坏。

test_session.expire_all()

你还可以在特定实例甚至实例属性上调用expire。

test_session.expire(lamp)

在这些调用之后,你将不得不手动从数据库中读取对象。

以下是使用过期时代码中的步骤序列。

使用过期时的代码中的步骤

使用过期时的代码中的步骤 以下是带有过期的测试代码。

@pytest.mark.asyncioasync def test_lamp_on(test_session):   """ Test lamp switch on """   lamp = Lamp()   test_session.add(lamp)   await test_session.commit()   await test_session.refresh(lamp)   logging.error("New client status: %s", lamp.status)   assert lamp.status == "off"   async with AsyncClient(app=app, base_url="http://testserver") as async_client:       response = await async_client.post(f"/lamps/{lamp.id}/on")       assert response.status_code == 200   test_session.expire_all()   # OR:   # test_session.expire(lamp)   results = await test_session.execute(select(Lamp).where(Lamp.id == 1))   new_lamp = results.scalar_one_or_none()   logging.error("Updated status: %s", new_lamp.status)   assert new_lamp.status == "on"

关闭 

实际上,使用会话终止的最后一种方法也调用了expire_all,但会话可以进一步使用。当读取新数据时,我们将获得最新的对象。

await test_session.close()

这应该在应用程序请求完成之后并在检查开始之前立即调用。

以下是使用关闭时代码中的步骤。

使用关闭时的代码中的步骤

以下是带有会话关闭的测试代码。

@pytest.mark.asyncioasync def test_lamp_on(test_session):   """ Test lamp switch on """   lamp = Lamp()   test_session.add(lamp)   await test_session.commit()   await test_session.refresh(lamp)   logging.error("New client status: %s", lamp.status)   assert lamp.status == "off"   async with AsyncClient(app=app, base_url="http://testserver") as async_client:       response = await async_client.post(f"/lamps/{lamp.id}/on")       assert response.status_code == 200   await test_session.close()   results = await test_session.execute(select(Lamp).where(Lamp.id == 1))   new_lamp = results.scalar_one_or_none()   logging.error("Updated status: %s", new_lamp.status)   assert new_lamp.status == "on"

调用 rollback() 也会有帮助。它也调用 expire_all,但它明确地回滚了事务。如果需要执行事务,commit() 也会执行 expire_all。但在这个例子中,既不需要回滚也不需要提交,因为测试中的事务已经完成,应用程序中的事务不会影响来自测试的会话。

实际上,此功能仅在SQLAlchemy ORM的异步模式中在事务中工作。然而,在代码中我确实向数据库发出请求以获取新对象的行为看起来似乎不合逻辑,如果它仍然返回一个缓存的对象,而不是从数据库强制接收的对象。当调试代码时,这有点令人困惑。但当正确使用时,这就是它应有的样子。

结论 

在异步模式下使用SQLAlchemy ORM,您必须并行跟踪事务和线程中的会话。如果所有这些看起来太复杂,那么使用SQLAlchemy ORM的同步模式。它里面的一切都简单得多。

作者:Aleksei Sharypov

更多内容请关注公号【云原生数据库】

squids.cn,云数据库RDS,迁移工具DBMotion,云备份DBTwin等数据库生态工具。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1023846.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

指针笔试题讲解-----让指针简单易懂(2)

目录 回顾上篇重点 : 一.笔试题 ( 1 ) 二.笔试题 ( 2 ) 科普进制知识点 (1) 二进制 (2) 八进制 (3)十六进制 三.笔试题( 3 ) 四.笔试题( 4 ) 五.笔试题( 5 ) 六.笔试题( …

Word中的图片保存后变模糊怎么解决

目录 1.介绍 2.原因 3.解决方案 Word是由微软公司开发的一款文字处理软件,它是Microsoft Office套件的一部分。Word提供了丰富的功能和工具,使用户能够创建、编辑和格式化文档。它支持各种文本处理任务,包括编写信函、报告、论文、简历等。…

C# Onnx Yolov8 Detect Poker 扑克牌识别

效果 项目 代码 using Microsoft.ML.OnnxRuntime; using Microsoft.ML.OnnxRuntime.Tensors; using OpenCvSharp; using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System…

OVS-DPDK学习

安装教程: https://docs.openvswitch.org/en/latest/intro/install/dpdk/ https://docs.openvswitch.org/en/latest/howto/dpdk/ overview和应用 https://www.intel.com/content/www/us/en/developer/articles/technical/open-vswitch-with-dpdk-overview.html OVS…

【网络安全】黑客自学笔记

1️⃣前言 🚀作为一个合格的网络安全工程师,应该做到攻守兼备,毕竟知己知彼,才能百战百胜。 计算机各领域的知识水平决定你渗透水平的上限🚀 【1】比如:你编程水平高,那你在代码审计的时候就会比…

【Map篇】HashTable详解

目录 成员变量属性构造函数put()remove()get()总结: HashTable的优点?HashTable 是一种基于哈希函数的数据结构。它将每个键Key映射到一个唯一的索引Index,通过这个索引来快速访问数据。底层是一个数组,数组中的每个元素称为桶(bucket)。 当我们需要访问某个元素时,首先会对…

Python 网络爬取的时候使用那种框架

尽管现代的网站多采取前后端分离的方式进行开发了,但是对直接 API 的调用我们通常会有 token 的限制和可以调用频率的限制。 因此,在一些特定的网站上,我们可能还是需要使用网络爬虫的方式获得已经返回的 JSON 数据结构,甚至是处理…

[架构之路-218]- 架构师责权利的定位, 架构师是技术领导者、决策者、激励者、企业家思维、战略思维、理论指导

目录 一、架构的诉求与系统的规模和复杂度强相关 1.1 系统的规模和复杂度对架构的影响 1.2 系统的业务需求对架构的影响 1.3 业架构和软件架构 二、架构师的类型 三、系统架构师 3.1 什么是系统架构师 3.2 系统架构师的技术素质要求 3.3 系统架构师的管理素质要求 3.…

【小程序】九宫格抽奖,页面不是有点丑,功能没啥问题,有需要直接拿去改吧

概述 常用活动抽奖功能,九宫格抽奖,两种方式切换,图片模式和文字模式,带抽奖次数。功能没啥问题,除了有点丑,css样式自己美化一下就可以了... 详细 微信小程序大转盘抽奖 演示图文字: 演示图…

Redis延迟双删-架构案例2021(三十二)

数据库设计 某医药销售企业因业务发展,需要建立线上药品销售系统,为用户提供便捷的互联网药品销售服务、该系统除了常规药品展示、订单、用户交流与反馈功能外,还需要提供当前热销产品排名、评价分类管理等功能。 通过对需求的分析&#xf…

JVM G1垃圾回收器学习笔记

前言 最近在工作中遇到频繁FullGC且YoungGC时间有时特别长的情况,而自己对JVM的垃圾回收也是一知半解,因此需要对JVM做系统的了解,为快速解决工作中的问题,能有效分析GC日志和业务代码,先从G1垃圾回收器开始学习&…

【论文阅读】检索增强发展历程及相关文章总结

文章目录 前言Knn-LMInsightMethodResultsDomain AdaptionTuning Nearest Neighbor Search Analysis REALMInsightsMethodKnowledge RetrieverKnowledge-Augmented Encoder ExpResultAblation StudyCase Study DPRInsightMethodExperimentsResults RAGInsightRAG-Sequence Mode…

小程序中如何查看会员的访问记录

​在小程序中,我们可以通过如下方式来查看会员的访问记录。下面是具体的操作流程: 1. 找到指定的会员卡。在管理员后台->会员管理处,找到需要查看访客记录的会员卡。也支持对会员卡按卡号、手机号和等级进行搜索。 2. 查看会员卡详情。点…

RK3588平台开发系列讲解(项目篇)视频监控之RTMP推流

文章目录 一、RTMP协议是什么二、RTMP 的原理三、Nginx 流媒体服务器四、FFmpeg 推流沉淀、分享、成长,让自己和他人都能有所收获!😄 📢 目前常见的视频监控和视频直播都是使用了 RTMP、RTSP、HLS、MPEG-DASH、WebRTC流媒体传输协议等。 视频监控项目组成,分为三部分:…

PHP-composer安装扩展安装,批量操作合并pdf

清除Composer缓存: 运行以下命令来清除Composer的缓存,并再次尝试安装包。 bash composer clear-cache 使用不同的镜像源: Composer使用的默认包源可能会受到限制或访问问题。你可以切换到使用其他镜像源,如阿里云、Composer中国…

uni-app:实现等待加载功能

例子 下例是实现蓝牙连接的部分代码,先进行加载连接显示,在进行连接,连接成功/失败,都自动关闭加载效果 效果 核心 开始的加载效果: uni.showLoading({title: 正在连接,请稍候...,mask: true, }); 关闭…

进程转态及其转换过程

一.进程转态及其转换过程 在 Linux 操作系统中,进程的状态可以相互转换,下面是不同状态之间的相互转换: 就绪态(Ready State):当一个进程创建后,它被放入就绪态。此时,进程已经被加…

Docker Compose初使用

简介 Docker-Compose项目是Docker官方的开源项目,负责实现对Docker容器集群的快速编排。 Docker-Compose将所管理的容器分为三层,分别是 工程(project),服务(service)以及容器(cont…

密码学概论

1.密码学的三大历史阶段: 第一阶段 古典密码学 依赖设备,主要特点 数据安全基于算法的保密,算法不公开,只要破译算法 密文就会被破解, 在1883年第一次提出 加密算法应该基于算法公开 不影响密文和秘钥的安全&#xff…

《Kubernetes部署篇:Ubuntu20.04基于外部etcd+部署kubernetes1.25.14集群(多主多从)》

一、部署架构图 1、架构图如下所示: 2、部署流程图如下所示: 二、环境信息 1、资源下载基于外部etcd+部署容器版kubernetes1.25.14集群资源合集 2、部署规划主机名K8S版本系统版本内核版本IP地址备注k8s-master-121.25.14Ubuntu 20.04.5 LTS5.15.0-69-generic192.168.1.12ma…