【squids.cn】 全网zui低价RDS,免费的迁移工具DBMotion、数据库备份工具DBTwin、SQL开发工具等
当使用两个不同的异步会话来测试FastAPI应用程序与数据库的连接时,可能会出现以下错误:
-
在测试中,在数据库中创建了一个对象(测试会话)。
-
在应用程序中发出一个请求,在此请求中修改了这个对象(应用会话)。
-
在测试中从数据库加载对象,但其中没有所需的更改(测试会话)。
让我们找出发生了什么。
我们通常在应用程序和测试中使用两个不同的会话。
此外,在测试中,我们通常将会话包装在一个准备数据库进行测试的fixture中,测试完成后,所有内容都会被清理。
以下是应用程序的示例。
一个带有数据库连接的文件app/database.py:
""" Database settings file """
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import declarative_base
DATABASE_URL = "postgresql+asyncpg://user:password@host:5432/dbname"
engine = create_async_engine(DATABASE_URL, echo=True, future=True)
async_session = async_sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
async def get_session() -> AsyncGenerator:
""" Returns async session """
async with async_session() as session:
yield session
Base = declarative_base()
一个带有模型描述的文件app/models.py:
""" Model file """
from sqlalchemy import Integer, String
from sqlalchemy.orm import Mapped, mapped_column
from .database import Base
class Lamp(Base):
""" Lamp model """
__tablename__ = 'lamps'
id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
status: Mapped[str] = mapped_column(String, default="off")
一个带有端点描述的文件app/main.py:
""" Main file """
import logging
from fastapi import FastAPI, Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from .database import get_session
from .models import Lamp
app = FastAPI()
@app.post("/lamps/{lamp_id}/on")
async def check_lamp(
lamp_id: int,
session: AsyncSession = Depends(get_session)
) -> dict:
""" Lamp on endpoint """
results = await session.execute(select(Lamp).where(Lamp.id == lamp_id))
lamp = results.scalar_one_or_none()
if lamp:
logging.error("Status before update: %s", lamp.status)
lamp.status = "on"
session.add(lamp)
await session.commit()
await session.refresh(lamp)
logging.error("Status after update: %s", lamp.status)
return {}
我特意在示例中添加了日志记录和一些其他请求,以使其更加清晰。
这里,使用Depends创建了一个会话。
以下是带有测试示例的文件tests/test_lamp.py:
""" Test lamp """
import logging
from typing import AsyncGenerator
import pytest
import pytest_asyncio
from httpx import AsyncClient
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from app.database import Base, engine
from app.main import app, Lamp
@pytest_asyncio.fixture(scope="function", name="test_session")
async def test_session_fixture() -> AsyncGenerator:
""" Async session fixture """
async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async with async_session() as session:
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield session
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await engine.dispose()
@pytest.mark.asyncio
async def test_lamp_on(test_session):
""" Test lamp switch on """
lamp = Lamp()
test_session.add(lamp)
await test_session.commit()
await test_session.refresh(lamp)
logging.error("New client status: %s", lamp.status)
assert lamp.status == "off"
async with AsyncClient(app=app, base_url="http://testserver") as async_client:
response = await async_client.post(f"/lamps/{lamp.id}/on")
assert response.status_code == 200
results = await test_session.execute(select(Lamp).where(Lamp.id == lamp.id))
new_lamp = results.scalar_one_or_none()
logging.error("Updated status: %s", new_lamp.status)
assert new_lamp.status == "on"
这是一个常规的Pytest,它在一个fixture中获取到数据库的会话。在返回会话之前,此fixture会先创建所有的表格,使用之后,它们会被删除。
请再次注意,在测试中,我们使用来自test_session fixture的会话,而在主代码中,我们使用来自app/database.py文件的会话。尽管我们使用相同的引擎,但是生成的会话是不同的。这一点很重要。
预期的数据库请求序列
应从数据库返回status = on。
在测试中,我首先在数据库中创建一个对象。这是通过来自测试的会话进行的常规INSERT。我们称其为Session 1。此时,只有这个会话连接到了数据库。应用程序会话尚未连接。
在创建对象之后,我执行了一个刷新操作。这是通过Session 1对新创建的对象进行的SELECT,并通过实例更新。
结果,我确保对象正确创建,并且status字段填充了所需的值 - off。
然后,我对/lamps/1/on端点执行一个POST请求。这是打开灯的操作。为了使示例更短,我没有使用fixture。一旦请求开始工作,将创建一个新的数据库会话。我们称其为Session 2。使用这个会话,我从数据库加载所需的对象。我将状态输出到日志。它是off。之后,我更新了这个状态并在数据库中保存了更新。数据库收到了一个请求:
BEGIN (implicit)
UPDATE lamps SET status=$1::VARCHAR WHERE lamps.id = $2::INTEGER
parameters: ('on', 1)
COMMIT
请注意,也存在COMMIT命令。尽管事务是隐式的,但其结果在其他会话中在COMMIT之后立即可用。
接下来,我使用refresh发出请求,从数据库获取更新后的对象。我输出状态。现在它的值是on。
看起来一切都应该正常工作。端点停止工作,关闭Session 2,并将控制权转移到测试。
在测试中,我从Session 1发出常规请求,以获取修改过的对象。但在状态字段中,我看到的值是off。
以下是代码中操作序列的方案。
代码中的操作序列
代码中的操作序列 同时,根据所有日志,最后一个SELECT请求被执行并返回了status = on。此刻,数据库中的其值肯定等于on。这是engine asyncpg响应SELECT请求时接收的值。
那么,发生了什么?
发生了以下情况。
结果是,为获取新对象而做的请求并没有更新当前的对象,而是找到并使用了现有的对象。一开始,我使用ORM添加了一个灯泡对象。我在另一个会话中更改了它。当更改完成时,当前会话对此更改一无所知。并且在Session 2中进行的提交并未在Session 1中请求expire_all方法。
为了修复这个问题,你可以做以下操作之一:
-
对于测试和应用程序使用共享会话。
-
刷新实例,而不是尝试从数据库中获取它。
-
强制使实例过期。
-
关闭会话。
依赖性覆盖
为了使用相同的会话,您可以简单地使用我在测试中创建的那个覆盖应用程序中的会话。这很简单。
为此,我们需要在测试中添加以下代码:
async def _override_get_db():
yield test_session
app.dependency_overrides[get_session] = _override_get_db
如果你愿意,可以将这部分包装成一个夹具,以便在所有测试中使用。
所得到的算法将如下所示:
代码中使用依赖性覆盖时的步骤
下面是带有会话替代的测试代码:
@pytest.mark.asyncio
async def test_lamp_on(test_session):
""" Test lamp switch on """
async def _override_get_db():
yield test_session
app.dependency_overrides[get_session] = _override_get_db
lamp = Lamp()
test_session.add(lamp)
await test_session.commit()
await test_session.refresh(lamp)
logging.error("New client status: %s", lamp.status)
assert lamp.status == "off"
async with AsyncClient(app=app, base_url="http://testserver") as async_client:
response = await async_client.post(f"/lamps/{lamp.id}/on")
assert response.status_code == 200
results = await test_session.execute(select(Lamp).where(Lamp.id == 1))
new_lamp = results.scalar_one_or_none()
logging.error("Updated status: %s", new_lamp.status)
assert new_lamp.status == "on"
但是,如果应用程序使用多个会话(这是可能的),那么这可能不是最佳方法。此外,如果在被测试的函数中没有调用commit或rollback,那么这也将无济于事。
刷新
第二种解决方案是最简单和最有逻辑的。我们不应该创建一个新的请求来获取一个对象。为了更新,处理端点请求后立即调用刷新就足够了。在内部,它调用expires,这导致保存的实例不用于新的请求,并且数据重新填充。这种解决方案是最有逻辑的,也最容易理解。
await test_session.refresh(lamp)
之后,你不需要再试图加载new_lamp对象,只需检查相同的lamp。
以下是使用刷新的代码方案。
使用刷新时的代码中的步骤
以下是带有更新的测试代码。
@pytest.mark.asyncio
async def test_lamp_on(test_session):
""" Test lamp switch on """
lamp = Lamp()
test_session.add(lamp)
await test_session.commit()
await test_session.refresh(lamp)
logging.error("New client status: %s", lamp.status)
assert lamp.status == "off"
async with AsyncClient(app=app, base_url="http://testserver") as async_client:
response = await async_client.post(f"/lamps/{lamp.id}/on")
assert response.status_code == 200
await test_session.refresh(lamp)
logging.error("Updated status: %s", lamp.status)
assert lamp.status == "on"
过期
但是,如果我们更改了很多对象,最好调用expire_all。然后,所有实例都将从数据库中读取,一致性不会被破坏。
test_session.expire_all()
你还可以在特定实例甚至实例属性上调用expire。
test_session.expire(lamp)
在这些调用之后,你将不得不手动从数据库中读取对象。
以下是使用过期时代码中的步骤序列。
使用过期时的代码中的步骤
使用过期时的代码中的步骤 以下是带有过期的测试代码。
@pytest.mark.asyncio
async def test_lamp_on(test_session):
""" Test lamp switch on """
lamp = Lamp()
test_session.add(lamp)
await test_session.commit()
await test_session.refresh(lamp)
logging.error("New client status: %s", lamp.status)
assert lamp.status == "off"
async with AsyncClient(app=app, base_url="http://testserver") as async_client:
response = await async_client.post(f"/lamps/{lamp.id}/on")
assert response.status_code == 200
test_session.expire_all()
# OR:
# test_session.expire(lamp)
results = await test_session.execute(select(Lamp).where(Lamp.id == 1))
new_lamp = results.scalar_one_or_none()
logging.error("Updated status: %s", new_lamp.status)
assert new_lamp.status == "on"
关闭
实际上,使用会话终止的最后一种方法也调用了expire_all,但会话可以进一步使用。当读取新数据时,我们将获得最新的对象。
await test_session.close()
这应该在应用程序请求完成之后并在检查开始之前立即调用。
以下是使用关闭时代码中的步骤。
使用关闭时的代码中的步骤
以下是带有会话关闭的测试代码。
@pytest.mark.asyncio
async def test_lamp_on(test_session):
""" Test lamp switch on """
lamp = Lamp()
test_session.add(lamp)
await test_session.commit()
await test_session.refresh(lamp)
logging.error("New client status: %s", lamp.status)
assert lamp.status == "off"
async with AsyncClient(app=app, base_url="http://testserver") as async_client:
response = await async_client.post(f"/lamps/{lamp.id}/on")
assert response.status_code == 200
await test_session.close()
results = await test_session.execute(select(Lamp).where(Lamp.id == 1))
new_lamp = results.scalar_one_or_none()
logging.error("Updated status: %s", new_lamp.status)
assert new_lamp.status == "on"
调用 rollback() 也会有帮助。它也调用 expire_all,但它明确地回滚了事务。如果需要执行事务,commit() 也会执行 expire_all。但在这个例子中,既不需要回滚也不需要提交,因为测试中的事务已经完成,应用程序中的事务不会影响来自测试的会话。
实际上,此功能仅在SQLAlchemy ORM的异步模式中在事务中工作。然而,在代码中我确实向数据库发出请求以获取新对象的行为看起来似乎不合逻辑,如果它仍然返回一个缓存的对象,而不是从数据库强制接收的对象。当调试代码时,这有点令人困惑。但当正确使用时,这就是它应有的样子。
结论
在异步模式下使用SQLAlchemy ORM,您必须并行跟踪事务和线程中的会话。如果所有这些看起来太复杂,那么使用SQLAlchemy ORM的同步模式。它里面的一切都简单得多。
作者:Aleksei Sharypov
更多内容请关注公号【云原生数据库】
squids.cn,云数据库RDS,迁移工具DBMotion,云备份DBTwin等数据库生态工具。