FastAPI 与 SQLModel 分页功能实现指南
1. 基础分页模型
from typing import Generic, TypeVar, Optional, List
from pydantic import BaseModel, Field
from sqlmodel import SQLModel
T = TypeVar( "T" )
class PageParams(BaseModel):
    """Pagination request parameters.

    The bounds match the ones the route handlers in this guide declare via
    ``Query(...)`` (``page >= 1`` and ``1 <= size <= 100``), so the model
    rejects the same out-of-range values the HTTP layer does.
    """

    # Page number to fetch; defaults to the first page.
    page: int = Field(default=1, ge=1)
    # Number of items per page; capped at 100.
    size: int = Field(default=10, ge=1, le=100)
class PageResponse(SQLModel, Generic[T]):
    """Generic envelope holding one page of items plus paging metadata.

    NOTE(review): ``next_page`` and ``prev_page`` are plain properties, not
    declared fields; presumably they are not part of the serialized
    response -- confirm before relying on them from a client.
    """

    items: List[T]  # rows belonging to the current page
    total: int  # total number of rows across all pages
    page: int  # current page number
    size: int  # page size used for this response
    pages: int  # total number of pages
    has_next: bool  # whether a following page exists
    has_prev: bool  # whether a preceding page exists

    @property
    def next_page(self) -> Optional[int]:
        """Return ``page + 1`` when ``has_next`` is set, otherwise ``None``."""
        return self.page + 1 if self.has_next else None

    @property
    def prev_page(self) -> Optional[int]:
        """Return ``page - 1`` when ``has_prev`` is set, otherwise ``None``."""
        return self.page - 1 if self.has_prev else None
2. 通用分页函数
2.1 基础分页查询
from sqlmodel import select, Session, func
def paginate_query(
    session: Session,
    query,
    page: int = 1,
    size: int = 10,
) -> PageResponse:
    """Execute ``query`` for a single page and wrap the rows in a ``PageResponse``.

    Args:
        session: Session used to run both the count and the page query.
        query: Select statement to paginate (for example ``select(User)``).
        page: 1-based page number.
        size: Maximum number of rows per page.

    Returns:
        A ``PageResponse`` with the page's rows, the total row count and the
        derived ``pages`` / ``has_next`` / ``has_prev`` values.

    Raises:
        ValueError: If ``page`` or ``size`` is smaller than 1. Previously
            ``size=0`` failed with ``ZeroDivisionError`` and ``page < 1``
            produced a negative OFFSET.
    """
    if page < 1:
        raise ValueError(f"page must be >= 1, got {page}")
    if size < 1:
        raise ValueError(f"size must be >= 1, got {size}")

    # Ordering does not affect the row count, so drop any ORDER BY before
    # wrapping the statement in the counting subquery.
    count_stmt = select(func.count()).select_from(query.order_by(None).subquery())
    total = session.exec(count_stmt).one()
    pages = (total + size - 1) // size  # ceiling division

    offset = (page - 1) * size
    items = session.exec(query.offset(offset).limit(size)).all()

    return PageResponse(
        items=items,
        total=total,
        page=page,
        size=size,
        pages=pages,
        has_next=page < pages,
        has_prev=page > 1,
    )
2.2 带过滤和排序的分页查询
from typing import Optional, List
from sqlalchemy import desc, asc
def get_users_paginated(
    session: Session,
    page: int = 1,
    size: int = 10,
    search: Optional[str] = None,
    sort_by: Optional[str] = None,
    sort_order: str = "asc",
) -> PageResponse[User]:
    """Return one page of users, optionally filtered and sorted.

    Args:
        session: Session used to run the queries.
        page: 1-based page number.
        size: Maximum number of rows per page.
        search: Case-insensitive substring matched against ``username`` and
            ``email``; ignored when empty.
        sort_by: Name of a mapped ``User`` column to sort on; ignored when
            empty.
        sort_order: ``"asc"`` for ascending; any other value sorts descending.

    Returns:
        The ``PageResponse`` produced by ``paginate_query``.

    Raises:
        ValueError: If ``sort_by`` is not a mapped column of ``User``.
    """
    # Local import: only needed to validate ``sort_by``.
    from sqlalchemy import inspect

    query = select(User)

    if search:
        # No spaces inside the pattern: "%term%" matches the term anywhere.
        pattern = f"%{search}%"
        query = query.where(
            User.username.ilike(pattern) | User.email.ilike(pattern)
        )

    if sort_by:
        # ``sort_by`` may come straight from a request, so only accept names
        # of mapped columns instead of passing arbitrary text to getattr().
        sortable = sorted(inspect(User).column_attrs.keys())
        if sort_by not in sortable:
            raise ValueError(
                f"Cannot sort by {sort_by!r}; expected one of {sortable}"
            )
        direction = asc if sort_order == "asc" else desc
        query = query.order_by(direction(getattr(User, sort_by)))

    # Tie-breaker so that rows with equal sort keys (or no sort at all) keep
    # a stable order from page to page.
    # NOTE(review): assumes User has an ``id`` column -- confirm.
    query = query.order_by(User.id)

    return paginate_query(session, query, page, size)
3. FastAPI 路由实现
3.1 基础分页路由
from fastapi import APIRouter, Depends, Query
from sqlmodel import Session
router = APIRouter()


@router.get("/users", response_model=PageResponse[User])
def get_users(
    page: int = Query(1, ge=1),
    size: int = Query(10, ge=1, le=100),
    session: Session = Depends(get_session),
):
    """Return one page of users.

    Query parameters:
        page: 1-based page number (must be >= 1).
        size: Rows per page (between 1 and 100).
    """
    # Without an explicit ORDER BY the database may return rows in any order,
    # so consecutive pages could repeat or skip rows.
    # NOTE(review): assumes User has an ``id`` column -- confirm.
    query = select(User).order_by(User.id)
    return paginate_query(session, query, page, size)
3.2 高级分页路由
@router.get("/users/advanced", response_model=PageResponse[User])
def get_users_advanced(
    page: int = Query(1, ge=1),
    size: int = Query(10, ge=1, le=100),
    search: Optional[str] = Query(None),
    sort_by: Optional[str] = Query(None),
    # NOTE(review): newer FastAPI releases deprecate ``regex=`` in favour of
    # ``pattern=`` -- confirm against the installed version before switching.
    sort_order: str = Query("asc", regex="^(asc|desc)$"),
    session: Session = Depends(get_session),
):
    """Return one page of users with optional search and sorting.

    Query parameters:
        page: 1-based page number (must be >= 1).
        size: Rows per page (between 1 and 100).
        search: Optional search term forwarded to ``get_users_paginated``.
        sort_by: Optional name of a mapped ``User`` column to sort on.
        sort_order: ``"asc"`` or ``"desc"``.

    Raises:
        HTTPException: 400 when ``sort_by`` is not a mapped ``User`` column.
    """
    # Local imports: only needed to validate ``sort_by``.
    from fastapi import HTTPException
    from sqlalchemy import inspect

    if sort_by:
        # Reject unknown fields here so the client gets a 400 rather than an
        # unhandled error from the attribute lookup further down.
        sortable = sorted(inspect(User).column_attrs.keys())
        if sort_by not in sortable:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid sort_by {sort_by!r}; expected one of {sortable}",
            )

    return get_users_paginated(
        session=session,
        page=page,
        size=size,
        search=search,
        sort_by=sort_by,
        sort_order=sort_order,
    )
4. 前端分页实现
4.1 使用 Axios 调用分页 API
/** Sort direction accepted for `sortOrder`. */
type SortOrder = 'asc' | 'desc';

/** Arguments accepted by `fetchUsers`. */
interface PaginationParams {
  /** Page number to request. */
  page: number;
  /** Number of items per page. */
  size: number;
  /** Optional search term. */
  search?: string;
  /** Optional name of the field to sort by. */
  sortBy?: string;
  /** Optional sort direction. */
  sortOrder?: SortOrder;
}
/**
 * Request one page of users from `/api/users/advanced`.
 *
 * The camelCase `sortBy` / `sortOrder` options are sent under the
 * snake_case query names `sort_by` / `sort_order`.
 *
 * @param params Paging, search and sort options.
 * @returns The body (`data`) of the HTTP response.
 */
async function fetchUsers(params: PaginationParams) {
  const query = {
    page: params.page,
    size: params.size,
    search: params.search,
    sort_by: params.sortBy,
    sort_order: params.sortOrder,
  };
  const { data } = await axios.get('/api/users/advanced', { params: query });
  return data;
}
4.2 Vue 3 分页组件
<template>
  <div class="pagination">
    <!-- Data display -->
    <div class="items">
      <div v-for="item in pageData.items" :key="item.id">
        {{ item.username }}
      </div>
    </div>
    <!-- Pagination controls: neighbouring pages are derived from `page`,
         and both buttons are disabled while a request is in flight -->
    <div class="controls">
      <button
        :disabled="loading || !pageData.has_prev"
        @click="changePage(pageData.page - 1)"
      >
        上一页
      </button>
      <span>{{ pageData.page }} / {{ pageData.pages }}</span>
      <button
        :disabled="loading || !pageData.has_next"
        @click="changePage(pageData.page + 1)"
      >
        下一页
      </button>
    </div>
  </div>
</template>

<script setup lang="ts">
import { ref, onMounted } from 'vue';
// NOTE(review): `fetchUsers` is not imported in this snippet; presumably it
// has to be imported from wherever the API helper is defined.

// One row as the template uses it: `id` for the key, `username` for display.
interface UserItem {
  id: number | string;
  username: string;
}

// The page fields this component reads.
interface PageData {
  items: UserItem[];
  total: number;
  page: number;
  size: number;
  pages: number;
  has_next: boolean;
  has_prev: boolean;
}

// Typed explicitly: a bare `items: []` would be inferred as `never[]`.
const pageData = ref<PageData>({
  items: [],
  total: 0,
  page: 1,
  size: 10,
  pages: 0,
  has_next: false,
  has_prev: false
});
const loading = ref(false);

// Fetch the requested page and replace the current page data. Errors are
// logged and the previous data is kept.
async function loadData(page = 1) {
  loading.value = true;
  try {
    const data = await fetchUsers({
      page,
      size: 10
    });
    pageData.value = data;
  } catch (error) {
    console.error('Failed to load data:', error);
  } finally {
    loading.value = false;
  }
}

// Navigate to `newPage` if it is a valid page number. The target is computed
// by the caller from `pageData.page`, because the page state declared above
// has no `prev_page` / `next_page` members to read.
function changePage(newPage: number) {
  if (loading.value) {
    return;
  }
  if (!Number.isInteger(newPage) || newPage < 1 || newPage > pageData.value.pages) {
    return;
  }
  loadData(newPage);
}

onMounted(() => {
  loadData();
});
</script>

<style scoped>
.pagination {
  display: flex;
  flex-direction: column;
  gap: 1rem;
}
.controls {
  display: flex;
  justify-content: center;
  gap: 1rem;
  align-items: center;
}
button:disabled {
  opacity: 0.5;
  cursor: not-allowed;
}
</style>
5. 性能优化建议
查询优化
- 为分页字段添加索引
- 使用 COUNT(*) OVER() 优化计数查询
- 对大数据集使用游标分页
def optimize_count_query(session: Session, query):
    """Return the row count of ``query`` read from a ``COUNT(*) OVER()`` column.

    A window-function column labelled ``total`` is appended to ``query`` and
    only the first result row is fetched. Returns 0 when there is no row.
    """
    windowed = query.add_columns(func.count().over().label("total"))
    row = session.exec(windowed).first()
    if not row:
        return 0
    return row.total
缓存策略
from functools import lru_cache
@lru_cache(maxsize=100)
def get_cached_page(page: int, size: int, cache_key: str):
    """Return a cached page of users for ``(page, size, cache_key)``.

    Args:
        page: 1-based page number.
        size: Rows per page.
        cache_key: Not read by the body; it only takes part in the
            ``lru_cache`` key, so passing a different value forces a fresh
            query for the same ``page`` / ``size``.

    Returns:
        The ``PageResponse`` produced by ``get_users_paginated``.
    """
    # ``get_users_paginated`` requires a session; the previous call omitted
    # it and failed with TypeError. A session is opened per cache miss
    # because a session must not be stored in (or used as) a cache key.
    # NOTE(review): cached rows outlive this session and can go stale --
    # confirm that is acceptable for the callers.
    with Session(engine) as session:
        return get_users_paginated(session=session, page=page, size=size)
6. 常见问题解决
6.1 处理空结果
def handle_empty_page(
    session: Session,
    query,
    page: int,
    size: int,
) -> PageResponse:
    """Paginate ``query`` while tolerating empty results and out-of-range pages.

    Args:
        session: Session used to run the count and the page query.
        query: Select statement to paginate.
        page: Requested page number; values below 1 are treated as 1 and
            values past the last page are replaced by the last page.
        size: Rows per page; values below 1 are treated as 1.

    Returns:
        An empty ``PageResponse`` (``pages=0``) when the query matches no
        rows, otherwise the response for the (possibly corrected) page.
    """
    # Clamp instead of failing: this helper exists to absorb bad input.
    page = max(1, page)
    size = max(1, size)

    # Ordering does not affect the count, so it is dropped for the subquery.
    count_stmt = select(func.count()).select_from(query.order_by(None).subquery())
    total = session.exec(count_stmt).one()

    if total == 0:
        return PageResponse(
            items=[],
            total=0,
            page=page,
            size=size,
            pages=0,
            has_next=False,
            has_prev=False,
        )

    pages = (total + size - 1) // size  # ceiling division
    page = min(page, pages)

    # Fetch the rows here rather than delegating to paginate_query, which
    # would run the same COUNT a second time.
    items = session.exec(query.offset((page - 1) * size).limit(size)).all()

    return PageResponse(
        items=items,
        total=total,
        page=page,
        size=size,
        pages=pages,
        has_next=page < pages,
        has_prev=page > 1,
    )
6.2 处理无效参数
def validate_page_params(
    page: int,
    size: int,
    *,
    max_size: int = 100,
) -> tuple[int, int]:
    """Validate and correct pagination parameters.

    Args:
        page: Requested page number; values below 1 become 1.
        size: Requested page size; clamped to the range ``1..max_size``.
        max_size: Upper bound for ``size``. Defaults to 100, the limit that
            was previously hard-coded.

    Returns:
        The corrected ``(page, size)`` pair. ``size`` is never below 1, even
        if ``max_size`` is.
    """
    page = max(1, page)
    size = max(1, min(size, max_size))
    return page, size
7. 测试分页功能
def test_pagination():
    """Check paging metadata for 25 users split into pages of 10.

    NOTE(review): the ``total == 25`` assertion presumes the users table is
    empty before the test runs -- confirm the fixture/engine setup.
    """
    # No spaces inside the f-string braces: the earlier literals produced
    # values such as "user 0 " and "user 0 @example.com".
    users = [
        User(username=f"user{i}", email=f"user{i}@example.com")
        for i in range(25)
    ]

    with Session(engine) as session:
        session.add_all(users)
        session.commit()

        page1 = paginate_query(session, select(User), page=1, size=10)
        assert len(page1.items) == 10
        assert page1.total == 25
        assert page1.pages == 3
        assert page1.has_next is True
        assert page1.has_prev is False

        # The last page holds the remaining 5 rows.
        page3 = paginate_query(session, select(User), page=3, size=10)
        assert len(page3.items) == 5
        assert page3.has_next is False
        assert page3.has_prev is True