文章目录
- FastAPI 结合 JWT
- 步骤
- 安装
- 步骤
- 导入必要的模块
- 设置配置和初始化应用
- 创建数据模型
- 实现辅助函数
- 生成 JWT Token
- 获取用户数据
- 验证密码
- 获取当前用户
- 用户登录获取 Token
- 受保护的路由示例
- 所有代码
- 测试
- 获取 Token
- 访问受保护的路由
- token正确
- token错误
- 总结
- 注意
FastAPI 结合 JWT
JWT(JSON Web Token)是一种基于 JSON 的开放标准(RFC 7519),用于在各方之间传递安全可靠的信息。JWT 可以签名(使用 HMAC 算法或 RSA 等),从而可以验证内容是否被篡改。JWT 通常用于认证和授权流程。
步骤
在 FastAPI 中,JWT 主要用于保护 API 路由,使其只允许经过身份验证的用户访问。认证流程大致如下:
- **用户登录:**用户通过提交用户名和密码获取 JWT。
- **获取 Token:**服务器验证用户凭据后,生成并返回 JWT 给用户。
- **访问受保护的路由:**用户在访问受保护的路由时,需要在请求头中携带该 JWT。
- **Token 验证:**服务器验证 JWT 的合法性和有效性,允许或拒绝访问受保护资源。
安装
pip install fastapi uvicorn pyjwt
步骤
导入必要的模块
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from datetime import datetime, timedelta, timezone
import jwt
设置配置和初始化应用
SECRET_KEY = "your_secret_key" # 用于签名 JWT 的密钥
ALGORITHM = "HS256" # 加密算法
ACCESS_TOKEN_EXPIRE_MINUTES = 30 # Token 过期时间
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
创建数据模型
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str | None = None
class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
disabled: bool | None = None
class UserInDB(User):
hashed_password: str
实现辅助函数
生成 JWT Token
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
获取用户数据
fake_users_db = {
"testuser": {
"username": "testuser",
"full_name": "Test User",
"email": "testuser@example.com",
"hashed_password": "fakehashedpassword",
"disabled": False,
}
}
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
验证密码
def verify_password(plain_password, hashed_password):
return plain_password == hashed_password
获取当前用户
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except jwt.PyJWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
用户登录获取 Token
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = get_user(fake_users_db, form_data.username)
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
受保护的路由示例
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
所有代码
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from datetime import datetime, timedelta, timezone
import uvicorn
import jwt
import os
# JWT 相关配置
SECRET_KEY = "123456789ashdgjha.slakdv.laksd*as-d/sd3" # 用于签名 JWT 的密钥(需要妥善保管,实际应用中应存储在环境变量或配置文件中)
ALGORITHM = "HS256" # 使用的加密算法
ACCESS_TOKEN_EXPIRE_MINUTES = 30 # Token 的有效时间,以分钟为单位
app = FastAPI() # 创建 FastAPI 应用实例
# OAuth2PasswordBearer 实例,用于依赖项
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# 模拟的用户数据库,通常在实际应用中应从数据库中获取用户信息
fake_users_db = {
"testuser": {
"username": "testuser",
"full_name": "Test User",
"email": "testuser@example.com",
"hashed_password": "fakehashedpassword", # 在实际应用中,存储经过哈希处理的密码
"disabled": False, # 用户是否被禁用
}
}
# Pydantic 模型,用于定义请求和响应的数据结构
class Token(BaseModel):
access_token: str # Token 字符串
token_type: str # Token 类型(一般为 "bearer")
class TokenData(BaseModel):
username: str | None = None # 从 Token 中提取的用户名
class User(BaseModel):
username: str # 用户名
email: str | None = None # 邮箱地址,可选
full_name: str | None = None # 用户全名,可选
disabled: bool | None = None # 用户是否被禁用,可选
class UserInDB(User):
hashed_password: str # 存储在数据库中的哈希密码
# 生成 JWT Token 的函数
def create_access_token(data: dict, expires_delta: timedelta | None = None):
"""
生成 JWT Token。
参数:
- data (dict): 要编码到 JWT 中的数据。
- expires_delta (timedelta, 可选): Token 的过期时间。
返回:
- str: 编码后的 JWT 字符串。
"""
to_encode = data.copy() # 创建副本,以避免修改原始数据
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire}) # 添加过期时间到 JWT 数据中
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) # 生成 JWT
return encoded_jwt
# 从假数据库获取用户信息
def get_user(db, username: str):
"""
从数据库中获取用户信息。
参数:
- db (dict): 用户数据库(在本例中为假数据)。
- username (str): 用户名。
返回:
- UserInDB | None: 返回匹配的用户信息,如果不存在则返回 None。
"""
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
# 验证用户密码
def verify_password(plain_password, hashed_password):
"""
验证用户密码。
参数:
- plain_password (str): 用户输入的明文密码。
- hashed_password (str): 存储在数据库中的哈希密码。
返回:
- bool: 密码匹配返回 True,否则返回 False。
"""
return plain_password == hashed_password # 在实际应用中,这里应该使用哈希函数进行比较
# 验证 Token 并获取当前用户
async def get_current_user(token: str = Depends(oauth2_scheme)):
"""
从 JWT Token 中提取用户信息,并验证 Token 的合法性。
参数:
- token (str): JWT Token。
返回:
- User: 返回当前用户信息。
抛出:
- HTTPException: 当 Token 无效或用户不存在时抛出 401 错误。
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) # 解码 JWT
username: str = payload.get("sub") # 获取 JWT 中的用户名
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except jwt.PyJWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username) # 获取用户信息
if user is None:
raise credentials_exception
return user
# 验证用户是否被禁用
async def get_current_active_user(current_user: User = Depends(get_current_user)):
"""
验证当前用户是否被禁用。
参数:
- current_user (User): 当前用户信息。
返回:
- User: 如果用户未被禁用,返回用户信息。
抛出:
- HTTPException: 当用户被禁用时抛出 400 错误。
"""
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
# 用户登录获取 Token
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
"""
用户登录接口,用于获取 JWT Token。
参数:
- form_data (OAuth2PasswordRequestForm): 包含用户名和密码的表单数据。
返回:
- dict: 包含 access_token 和 token_type 的响应数据。
抛出:
- HTTPException: 当用户名或密码错误时抛出 401 错误。
"""
user = get_user(fake_users_db, form_data.username)
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) # 设置 Token 过期时间
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
# 受保护的路由示例
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
"""
获取当前用户信息的受保护路由。
参数:
- current_user (User): 当前登录的用户信息(通过 JWT 验证)。
返回:
- User: 返回当前用户的信息。
"""
return current_user
# 运行应用
if __name__ == "__main__":
uvicorn.run(
f"{os.path.basename(__file__).split('.')[0]}:app",
host="127.0.0.1",
port=8000,
reload=True, # 启用自动重载
)
测试
获取 Token
访问受保护的路由
token正确
token错误
总结
JWT(JSON Web Token)是一种用于安全地在各方之间传递信息的开放标准,通常用于用户认证和授权。它将用户信息编码为一个签名的令牌,客户端可以使用该令牌访问受保护的资源。
在 FastAPI 中,JWT 用于保护 API 路由。用户通过提交用户名和密码获取 JWT,客户端在后续请求中使用该令牌进行身份验证。服务器验证令牌的合法性后,允许用户访问受保护的资源。
实现 JWT 认证的步骤包括安装必要的依赖项、配置 JWT 设置(如密钥和算法)、定义数据模型、实现辅助函数(如生成和验证 JWT 的函数),并设置受保护的 API 路由。
- JWT 配置:包括密钥(
SECRET_KEY
)和加密算法(ALGORITHM
),用于生成和验证令牌。- 辅助函数:如生成 JWT、验证密码、获取用户信息等,用于处理认证逻辑。
- API 路由:包含登录接口,用于生成令牌,以及受保护的路由,只允许携带有效 JWT 的请求访问。
使用 Postman 或其他工具可以测试 JWT 的生成和验证过程,确保只有经过身份验证的用户才能访问受保护的 API 路由。
注意
示例中
密码并没有经过hash加密
,实际应用中要加密的
。