在之前的文章《FastAPI 学习之路(二十九)使用(哈希)密码和 JWT Bearer 令牌的 OAuth2》、《FastAPI 学习之路(二十八)使用密码和 Bearer 的简单 OAuth2》和《FastAPI 学习之路(三十四)数据库多表操作》中,我们分享了基于 JWT 的 token 认证,以及基于数据库创建用户。今天我们把这些代码整理一下,实现基于数据库中的用户名和密码进行登录验证,并把生成的 token 存储到 redis 中。
首先我们看下之前基于jwt认证token的代码:
# 见chapter18
"""
-*- encoding=utf-8 -*-
Time: 2024/6/28 16:16
Author: lc
Email: 15101006331@163.com
File: chapter18.py
"""
"""
使用(哈希)密码和 JWT Bearer 令牌的 OAuth2
"""
from fastapi import FastAPI, Depends, status, HTTPException
from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
from pydantic import BaseModel
from typing import Optional
from jose import JWTError, jwt
from datetime import datetime, timedelta
from passlib.context import CryptContext
# JWT signing settings.
# NOTE(review): the signing key is hard-coded in source; consider loading it
# from configuration instead.
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
# Access-token lifetime, in minutes.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Bearer-token scheme; tokenUrl names the endpoint that issues tokens.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# In-memory stand-in for a user table, keyed by username.
fake_db_users = {
"mrli": {
"username": "mrli",
"full_name": "mrli_hanjing",
"email": "mrli@qq.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False
}
}
# Application instance that the endpoints below are registered on.
app = FastAPI()
def fake_hash_password(password: str):
    """Pretend to hash a password.

    Placeholder only: no hashing is performed and the input is handed back
    unchanged.
    """
    unchanged = password
    return unchanged
class Token(BaseModel):
    """Response body returned by the ``/token`` endpoint."""

    # Encoded JWT string.
    access_token: str
    # Token scheme reported to the client (the login endpoint sends "bearer").
    token_type: str
class TokenData(BaseModel):
    """Data extracted from a decoded JWT."""

    # Filled from the token's "sub" claim.
    username: Optional[str] = None
class User(BaseModel):
    """Public user fields (no password hash)."""

    username: str
    full_name: Optional[str] = None
    email: Optional[str] = None
    # Checked by get_current_active_user; a truthy value rejects the request.
    disabled: Optional[bool] = None
class UserInDB(User):
    """``User`` plus the stored password hash."""

    hashed_password: str
# Shared passlib context used by verify_password / get_password_hash;
# bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str):
    """Check a plaintext password against a stored hash.

    Returns whatever ``pwd_context.verify`` reports for the pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str):
    """Hash a plaintext password with the shared ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed
def get_user(db_users: dict, username: str):
    """Look up ``username`` in ``db_users`` and wrap the record in ``UserInDB``.

    Returns ``None`` when ``username`` is not a key of ``db_users``.
    """
    if username not in db_users:
        return None
    return UserInDB(**db_users[username])
def authenticate_user(db_users: dict, username: str, password: str):
    """Authenticate a username/password pair against ``db_users``.

    Returns the matching user object on success, or ``False`` when the user
    does not exist or the password does not verify.
    """
    candidate = get_user(db_users, username)
    if candidate and verify_password(password, candidate.hashed_password):
        return candidate
    return False
def create_access_token(data: dict, expires: Optional[timedelta] = None) -> str:
    """Create a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The mapping is copied, not mutated.
        expires: Lifetime of the token. When ``None``, the token lives for
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    # Function-scope import: the module itself only imports datetime/timedelta.
    from datetime import timezone

    # Compare against None rather than truthiness: timedelta(0) is falsy and
    # would otherwise silently fall back to the default lifetime.
    if expires is None:
        expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # Timezone-aware "now"; datetime.utcnow() is deprecated since Python 3.12.
    expire_time = datetime.now(timezone.utc) + expires
    to_encode = {**data, "exp": expire_time}
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def fake_decode_token(token):
    """Simulate token decoding by using the token as a ``fake_db_users`` key."""
    return get_user(fake_db_users, token)
def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the bearer token to a user from ``fake_db_users``.

    Raises:
        HTTPException: 401 when decoding raises ``JWTError``, the token has no
            ``sub`` claim, or the named user is not in ``fake_db_users``.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Authentication Failed",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise unauthorized
    subject = claims.get("sub")
    if subject is None:
        raise unauthorized
    token_data = TokenData(username=subject)
    found = get_user(fake_db_users, username=token_data.username)
    if not found:
        raise unauthorized
    return found
def get_current_active_user(current_user: User = Depends(get_current_user)):
    """Pass the authenticated user through unless the account is disabled.

    Raises:
        HTTPException: 400 "Inactive user" when ``current_user.disabled`` is
            truthy.
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")
# An endpoint at the /token path must exist to return the access_token; it is
# the endpoint called when "Authorize" is clicked on the docs page.
@app.post("/token", response_model=Token)
def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Exchange form-posted username/password for a bearer access token.

    Raises:
        HTTPException: 401 "Incorrect username or password" when
            ``authenticate_user`` rejects the credentials.
    """
    authenticated = authenticate_user(
        fake_db_users, form_data.username, form_data.password
    )
    if not authenticated:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    claims = {"sub": form_data.username}
    return {
        "access_token": create_access_token(data=claims, expires=lifetime),
        "token_type": "bearer",
    }
@app.get("/me")
def get_me(current_user: User = Depends(get_current_active_user)):
    """Return the user resolved by the ``get_current_active_user`` dependency."""
    me = current_user
    return me
我们需要把这部分代码进行整理,我们调整到routers中的users.py。实际上就是把之前的方法糅合到新的方法中,需要调整下之前的创建用户,并把登录实现了。
我们看下修改后的代码:
from fastapi import APIRouter, status
from fastapi import Depends, HTTPException
from starlette.requests import Request
from models.crud import *
from models.schemas import UserToken
from datetime import timedelta, datetime
from jose import JWTError, jwt
from typing import Optional
# JWT signing settings.
# NOTE(review): the signing key is hard-coded in source; consider loading it
# from configuration instead.
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
# Access-token lifetime, in minutes.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Router that the user endpoints in this module are registered on.
user_router = APIRouter()
from . import create_db
from passlib.context import CryptContext
# Shared passlib context used by verify_password / get_password_hash;
# bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str):
    """Check a plaintext password against a stored hash.

    Returns whatever ``pwd_context.verify`` reports for the pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str):
    """Hash a plaintext password with the shared ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed
def create_access_token(data: dict, expires: Optional[timedelta] = None) -> str:
    """Create a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The mapping is copied, not mutated.
        expires: Lifetime of the token. When ``None``, the token lives for
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    # Function-scope import: the module itself only imports datetime/timedelta.
    from datetime import timezone

    # Compare against None rather than truthiness: timedelta(0) is falsy and
    # would otherwise silently fall back to the default lifetime.
    if expires is None:
        expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # Timezone-aware "now"; datetime.utcnow() is deprecated since Python 3.12.
    expire_time = datetime.now(timezone.utc) + expires
    to_encode = {**data, "exp": expire_time}
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def get_current_user(token):
    """Decode ``token`` and return the username stored in its ``sub`` claim.

    Raises:
        HTTPException: 401 when decoding raises ``JWTError`` or the ``sub``
            claim is missing.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Authentication Failed",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise unauthorized
    subject = claims.get("sub")
    if subject is None:
        raise unauthorized
    return subject
@user_router.post("/user", response_model=UserOut)
def create_user(user: UserModel, db: Session = Depends(create_db)):
    """Create a user by delegating to ``create_user_method``."""
    created = create_user_method(db, user)
    return created
@user_router.get("/user", response_model=UserOut)
def get_user(uid: int, db: Session = Depends(create_db)):
    """Fetch the user with id ``uid`` by delegating to ``get_user_method``."""
    found = get_user_method(db, uid)
    return found
@user_router.post("/login")
async def login(request: Request, user: UserModel, db: Session = Depends(create_db)):
    """Login endpoint placeholder.

    Looks the user up by email but does not yet verify anything or issue a
    token; the handler returns ``None``.
    """
    _found = get_user_by_email(db, user.email)
    return None
现在登录还未完全实现,接下来我们实现这个api。
这里的UserToken在schemas中实现。
class UserToken(BaseUser):
    """``BaseUser`` extended with the token issued at login."""

    token: str
登录的具体实现:
@user_router.post("/login")
async def login(request: Request, user: UserModel, db: Session = Depends(create_db)):
    """Log a user in by email/password and cache the issued JWT in Redis.

    The token is stored under the user's email with a TTL of
    ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes, and the JWT is created with the
    same lifetime.

    Returns:
        A ``UserToken`` carrying the new token and the user's email.

    Raises:
        HTTPException: detail "用户名或密码错误" when the email is unknown or
            the password does not verify; detail "请勿重复登录" when a token
            is already cached for this email.
    """
    # NOTE(review): both failure responses keep status_code=200 to preserve the
    # existing API contract; 401/409 would be the conventional codes.
    db_user = get_user_by_email(db, user.email)
    # Guard against a failed lookup before touching hashed_password; presumably
    # get_user_by_email returns None for an unknown email (verify against crud).
    if not db_user or not verify_password(user.password, db_user.hashed_password):
        raise HTTPException(status_code=200, detail="用户名或密码错误")

    redis = request.app.state.redis
    # Check for an existing session before doing the work of signing a token.
    if await redis.get(user.email):
        raise HTTPException(status_code=200, detail="请勿重复登录")

    # Pass the lifetime explicitly so the JWT "exp" and the Redis TTL below are
    # derived from the same setting.
    token = create_access_token(
        data={"sub": user.email},
        expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    await redis.set(user.email, token, expire=ACCESS_TOKEN_EXPIRE_MINUTES * 60)
    return UserToken(token=token, email=user.email)
redis的相关操作还是使用上次分享时的startup和shutdown方法
我们启动测试,看是否正确,由于我们更改了创建用户时密码的hash方式,所以我们先创建用户
接下来,我们调用登录api
这样token就产生了,redis中也有对应的缓存信息
通过本节,我们将登录的用户存储到了数据库中,并将登录后的用户token缓存到了redis,接下来我们将分享如何校验token