SQLModel 系统性指南
目录
- 简介
- 什么是 SQLModel?
- 为什么使用 SQLModel?
- 安装
- 快速入门
- 定义模型
- 创建数据库和表
- 基本 CRUD 操作
- 创建(Create)
- 读取(Read)
- 更新(Update)
- 删除(Delete)
- 处理关系
- 一对多关系
- 多对多关系
- 高级功能
- 异步支持
- 自定义查询
- 迁移(Migrations)
- 与 FastAPI 的集成
- 依赖注入
- 路由保护
- 性能优化与最佳实践
- 常见问题解答
- 参考资料
1. 简介
什么是 SQLModel?
SQLModel 是一个现代化的 Python 库,旨在简化与数据库的交互。它结合了 Pydantic 和 SQLAlchemy 的优势,使得定义数据模型、进行数据验证和与数据库交互变得更加直观和高效。SQLModel 由 Sebastián Ramírez(FastAPI 的创始人)开发,专为与 FastAPI 框架无缝集成而设计。
为什么使用 SQLModel?
- 简洁性:通过结合 Pydantic 的数据验证和 SQLAlchemy 的 ORM 功能,SQLModel 使模型定义和数据库操作更加简洁。
- 类型安全:充分利用 Python 的类型提示,增强代码的可读性和可靠性。
- 与 FastAPI 无缝集成:优化了与 FastAPI 的集成,支持自动文档生成和依赖注入。
- 灵活性:支持同步和异步操作,适应不同的性能需求。
- 现代化设计:采用现代化的 Python 编码风格和最佳实践,提升开发体验。
2. 安装
首先,确保您已经安装了 Python 3.7 或更高版本。然后,使用 pip
安装 sqlmodel
包:
pip install sqlmodel
此外,根据您使用的数据库,还需要安装相应的数据库驱动。例如:
-
SQLite:无需额外安装驱动,Python 内置支持。
-
PostgreSQL:
pip install asyncpg
-
MySQL:
pip install pymysql
3. 快速入门
定义模型
使用 SQLModel
定义数据模型时,通常会继承自 SQLModel
并使用 table=True
参数指示这是一个数据库表。
from typing import Optional
from sqlmodel import SQLModel, Field
from datetime import datetime
class User(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
username: str = Field(index=True, nullable=False, unique=True)
email: str = Field(index=True, nullable=False, unique=True)
hashed_password: str = Field(nullable=False)
is_active: bool = Field(default=True)
created_at: datetime = Field(default_factory=datetime.utcnow)
创建数据库和表
使用 SQLAlchemy 的引擎和 SQLModel
的元数据来创建数据库和表。
from sqlmodel import SQLModel, create_engine
from models import User # 假设上面的模型保存在 models.py 文件中
DATABASE_URL = "sqlite:///./test.db" # 或者使用其他数据库,如 PostgreSQL
engine = create_engine(DATABASE_URL, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
在应用启动时调用 create_db_and_tables
来创建数据库表。
4. 基本 CRUD 操作
创建(Create)
向数据库中插入一条新记录。
from sqlmodel import Session, select
from models import User
from database import engine, create_db_and_tables
def create_user(username: str, email: str, hashed_password: str) -> User:
user = User(username=username, email=email, hashed_password=hashed_password)
with Session(engine) as session:
session.add(user)
session.commit()
session.refresh(user)
return user
读取(Read)
从数据库中查询记录。
def get_user_by_id(user_id: int) -> Optional[User]:
with Session(engine) as session:
user = session.get(User, user_id)
return user
def get_user_by_username(username: str) -> Optional[User]:
with Session(engine) as session:
statement = select(User).where(User.username == username)
user = session.exec(statement).first()
return user
更新(Update)
更新数据库中的记录。
def update_user_email(user_id: int, new_email: str) -> Optional[User]:
with Session(engine) as session:
user = session.get(User, user_id)
if user:
user.email = new_email
session.add(user)
session.commit()
session.refresh(user)
return user
return None
删除(Delete)
从数据库中删除记录。
def delete_user(user_id: int) -> bool:
with Session(engine) as session:
user = session.get(User, user_id)
if user:
session.delete(user)
session.commit()
return True
return False
5. 处理关系
一对多关系
例如,一个用户可以有多条地址记录。
from typing import List, Optional
from sqlmodel import SQLModel, Field, Relationship
class Address(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
street: str
city: str
user_id: int = Field(foreign_key="user.id")
user: Optional["User"] = Relationship(back_populates="addresses")
class User(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
username: str = Field(index=True, nullable=False, unique=True)
email: str = Field(index=True, nullable=False, unique=True)
hashed_password: str = Field(nullable=False)
is_active: bool = Field(default=True)
created_at: datetime = Field(default_factory=datetime.utcnow)
addresses: List[Address] = Relationship(back_populates="user")
多对多关系
例如,用户和角色之间的多对多关系。
from typing import List, Optional
from sqlmodel import SQLModel, Field, Relationship
class UserRoleLink(SQLModel, table=True):
user_id: int = Field(foreign_key="user.id", primary_key=True)
role_id: int = Field(foreign_key="role.id", primary_key=True)
class Role(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str
users: List["User"] = Relationship(
back_populates="roles",
link_model=UserRoleLink
)
class User(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
username: str = Field(index=True, nullable=False, unique=True)
email: str = Field(index=True, nullable=False, unique=True)
hashed_password: str = Field(nullable=False)
is_active: bool = Field(default=True)
created_at: datetime = Field(default_factory=datetime.utcnow)
roles: List[Role] = Relationship(
back_populates="users",
link_model=UserRoleLink
)
6. 高级功能
异步支持
SQLModel
支持异步数据库操作,适用于需要高并发和高性能的应用。
首先,安装异步驱动(如 asyncpg
用于 PostgreSQL):
pip install asyncpg
然后,配置异步引擎和会话:
from sqlmodel import SQLModel, create_engine, select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from models import User
from datetime import datetime
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
async_engine = create_async_engine(DATABASE_URL, echo=True)
async_session = sessionmaker(
async_engine, class_=AsyncSession, expire_on_commit=False
)
async def init_db():
async with async_engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
# 在应用启动时调用 init_db
import asyncio
asyncio.run(init_db())
# 异步获取会话
async def get_async_session():
async with async_session() as session:
yield session
# 异步 CRUD 操作示例
async def get_user_async(user_id: int) -> Optional[User]:
async with async_session() as session:
user = await session.get(User, user_id)
return user
自定义查询
使用 SQLAlchemy 的强大查询功能,执行复杂的数据库操作。
from sqlmodel import Session, select, func
from models import User
def count_users() -> int:
with Session(engine) as session:
statement = select(func.count(User.id))
count = session.exec(statement).one()
return count
def get_users_with_email_domain(domain: str) -> List[User]:
with Session(engine) as session:
statement = select(User).where(User.email.like(f"%@{domain}"))
users = session.exec(statement).all()
return users
迁移(Migrations)
虽然 SQLModel
本身不提供迁移工具,但它与 Alembic 完全兼容,可以使用 Alembic 进行数据库迁移。
安装 Alembic:
pip install alembic
初始化 Alembic:
alembic init alembic
配置 Alembic:
编辑 alembic.ini
,设置 sqlalchemy.url
为您的数据库 URL。
在 alembic/env.py
中,导入您的模型:
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from sqlmodel import SQLModel
import sys
import os
# 将项目路径添加到 sys.path
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
from models import User # 导入您的模型
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
target_metadata = SQLModel.metadata
def run_migrations_offline():
...
# 保持默认配置
def run_migrations_online():
...
# 保持默认配置
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
创建迁移脚本:
alembic revision --autogenerate -m "Initial migration"
应用迁移:
alembic upgrade head
7. 与 FastAPI 的集成
依赖注入
利用 FastAPI 的依赖注入机制,将数据库会话注入到路由中。
from fastapi import FastAPI, Depends, HTTPException
from sqlmodel import Session, select
from models import User
from database import engine, get_session
app = FastAPI()
@app.post("/users/", response_model=User)
def create_user(user: User, session: Session = Depends(get_session)):
db_user = session.exec(select(User).where(User.username == user.username)).first()
if db_user:
raise HTTPException(status_code=400, detail="Username already exists")
session.add(user)
session.commit()
session.refresh(user)
return user
@app.get("/users/{user_id}", response_model=User)
def read_user(user_id: int, session: Session = Depends(get_session)):
user = session.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
路由保护
结合 JWT 进行身份验证,保护特定路由。
安装 fastapi-jwt-auth
:
pip install fastapi-jwt-auth
配置 JWT:
from fastapi import FastAPI, Depends, HTTPException
from fastapi_jwt_auth import AuthJWT
from pydantic import BaseModel
from sqlmodel import Session, select
from models import User
from database import engine, get_session
class Settings(BaseModel):
authjwt_secret_key: str = "your-secret-key"
app = FastAPI()
@AuthJWT.load_config
def get_config():
return Settings()
@app.post('/login')
def login(user: User, Authorize: AuthJWT = Depends()):
# 验证用户凭证(此处省略具体验证逻辑)
access_token = Authorize.create_access_token(subject=user.username)
return {"access_token": access_token}
@app.get('/protected')
def protected(Authorize: AuthJWT = Depends()):
Authorize.jwt_required()
current_user = Authorize.get_jwt_subject()
return {"message": f"Hello, {current_user}"}
8. 性能优化与最佳实践
8.1 使用连接池
优化数据库连接,使用连接池以提高性能和资源利用率。
from sqlmodel import create_engine
DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(DATABASE_URL, echo=True, pool_size=20, max_overflow=0)
8.2 异步操作
对于高并发应用,使用异步数据库操作。
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
async_engine = create_async_engine(DATABASE_URL, echo=True)
async_session = sessionmaker(
async_engine, class_=AsyncSession, expire_on_commit=False
)
8.3 缓存
使用缓存机制(如 Redis)减少数据库查询,提高响应速度。
import redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def get_user_cached(user_id: int) -> Optional[User]:
cached_user = redis_client.get(f"user:{user_id}")
if cached_user:
return User.parse_raw(cached_user)
with Session(engine) as session:
user = session.get(User, user_id)
if user:
redis_client.set(f"user:{user_id}", user.json(), ex=3600)
return user
8.4 索引优化
为常用查询字段添加索引,提高查询性能。
class User(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
username: str = Field(index=True, nullable=False, unique=True)
email: str = Field(index=True, nullable=False, unique=True)
# 其他字段...
8.5 分页
对于大量数据查询,使用分页机制减少单次查询的数据量。
def get_users_paginated(skip: int = 0, limit: int = 10) -> List[User]:
with Session(engine) as session:
statement = select(User).offset(skip).limit(limit)
users = session.exec(statement).all()
return users
9. 常见问题解答
9.1 如何在 SQLModel 中使用外键?
在定义模型时,使用 Field
的 foreign_key
参数指定外键。
class Address(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
street: str
city: str
user_id: int = Field(foreign_key="user.id")
user: Optional["User"] = Relationship(back_populates="addresses")
9.2 SQLModel 支持哪些数据库?
SQLModel
基于 SQLAlchemy,支持所有 SQLAlchemy 支持的数据库,包括:
- SQLite
- PostgreSQL
- MySQL
- SQL Server
- Oracle
- 以及其他数据库,通过相应的数据库驱动支持。
9.3 如何进行数据库迁移?
SQLModel
本身不提供迁移工具,但可以与 Alembic 配合使用进行数据库迁移。
安装 Alembic:
pip install alembic
初始化 Alembic:
alembic init alembic
配置 Alembic:
编辑 alembic.ini
,设置 sqlalchemy.url
为您的数据库 URL。
在 alembic/env.py
中,导入您的模型:
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from sqlmodel import SQLModel
import sys
import os
# 将项目路径添加到 sys.path
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
from models import User # 导入您的模型
config = context.config
fileConfig(config.config_file_name)
target_metadata = SQLModel.metadata
def run_migrations_offline():
...
def run_migrations_online():
...
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
创建迁移脚本:
alembic revision --autogenerate -m "Initial migration"
应用迁移:
alembic upgrade head
9.4 如何处理模型验证错误?
SQLModel
结合了 Pydantic 的数据验证功能,可以在模型定义中使用 Pydantic 的字段验证器。
from sqlmodel import SQLModel, Field
from pydantic import validator, EmailStr
class User(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
username: str = Field(index=True, nullable=False, unique=True)
email: EmailStr = Field(index=True, nullable=False, unique=True)
hashed_password: str = Field(nullable=False)
@validator('username')
def username_must_not_be_empty(cls, v):
if not v or not v.strip():
raise ValueError('Username must not be empty')
return v
10. 参考资料
- SQLModel 官方文档:https://sqlmodel.tiangolo.com/
- SQLAlchemy 官方文档:https://www.sqlalchemy.org/
- FastAPI 官方文档:https://fastapi.tiangolo.com/
- Alembic 官方文档:https://alembic.sqlalchemy.org/en/latest/
- Real Python 的 SQLModel 教程:https://realpython.com/sqlmodel-python-orm/
- Pydantic 官方文档:https://pydantic-docs.helpmanual.io/
- GitHub 上的 SQLModel 仓库:https://github.com/tiangolo/sqlmodel
附录:完整示例
以下是一个完整的 FastAPI 应用示例,展示了如何使用 SQLModel 进行数据库操作和 API 构建。
目录结构
my_fastapi_app/
├── main.py
├── models.py
├── database.py
├── schemas.py
└── alembic/
├── env.py
├── script.py.mako
└── versions/
models.py
from typing import List, Optional
from sqlmodel import SQLModel, Field, Relationship
from datetime import datetime
class Address(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
street: str
city: str
user_id: int = Field(foreign_key="user.id")
user: Optional["User"] = Relationship(back_populates="addresses")
class User(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
username: str = Field(index=True, nullable=False, unique=True)
email: str = Field(index=True, nullable=False, unique=True)
hashed_password: str = Field(nullable=False)
is_active: bool = Field(default=True)
created_at: datetime = Field(default_factory=datetime.utcnow)
addresses: List[Address] = Relationship(back_populates="user")
schemas.py
from typing import List, Optional
from pydantic import BaseModel, EmailStr
from datetime import datetime
class AddressCreate(BaseModel):
street: str
city: str
class AddressRead(BaseModel):
id: int
street: str
city: str
class Config:
orm_mode = True
class UserCreate(BaseModel):
username: str
email: EmailStr
password: str
class UserRead(BaseModel):
id: int
username: str
email: EmailStr
is_active: bool
created_at: datetime
addresses: List[AddressRead] = []
class Config:
orm_mode = True
database.py
from sqlmodel import SQLModel, create_engine, Session
from models import User, Address
DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(DATABASE_URL, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def get_session():
with Session(engine) as session:
yield session
main.py
from fastapi import FastAPI, Depends, HTTPException
from sqlmodel import Session, select
from models import User, Address
from schemas import UserCreate, UserRead, AddressCreate, AddressRead
from database import create_db_and_tables, get_session
from typing import List
app = FastAPI()
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.post("/users/", response_model=UserRead)
def create_user(user: UserCreate, session: Session = Depends(get_session)):
db_user = session.exec(select(User).where(User.username == user.username)).first()
if db_user:
raise HTTPException(status_code=400, detail="Username already exists")
new_user = User(
username=user.username,
email=user.email,
hashed_password=user.password # 实际项目中应进行哈希处理
)
session.add(new_user)
session.commit()
session.refresh(new_user)
return new_user
@app.get("/users/{user_id}", response_model=UserRead)
def read_user(user_id: int, session: Session = Depends(get_session)):
user = session.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@app.post("/users/{user_id}/addresses/", response_model=AddressRead)
def create_address(user_id: int, address: AddressCreate, session: Session = Depends(get_session)):
user = session.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
new_address = Address(**address.dict(), user_id=user_id)
session.add(new_address)
session.commit()
session.refresh(new_address)
return new_address
@app.get("/users/{user_id}/addresses/", response_model=List[AddressRead])
def read_addresses(user_id: int, session: Session = Depends(get_session)):
user = session.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user.addresses
运行应用
使用 uvicorn
运行 FastAPI 应用:
uvicorn main:app --reload
访问 http://127.0.0.1:8000/docs 查看自动生成的 API 文档,并进行测试。