在之前的分享的中,产生了token,用于做登录的认证,那么产生了token后,登陆携带了token,我们如何校验呢?
先来梳理下逻辑:
1. 调用登录接口,产生token
2.调用依赖登录的接口,在请求头中携带token
3.依赖登录的接口,接收到请求,判断是否在headers中携带token
4.携带token,校验是否过期
5.解析token,获取username
6.根据username从redis获取
7.查询到username的token且token相等,我们认为用户登录
8.调用接口返回数据
9.如果其中一项校验不通过,返回对应的失败信息
思路梳理完毕,下面我们把逻辑实现
先从headers中获取token并且验证是否在redis中存在这个token
async def get_current_user(request: Request, token: Optional[str] = Header(...)) -> BaseUser:
"""token依赖请求头的token校验"""
# 验证失败返回信息
credentials_exc = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication Failed",
headers={"WWW-Authenticate": "Bearer"}
)
# 未登录返回信息
not_login_exc = HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="NOT LOGIN OR TOKEN EXPIRED"
)
try:
# 解析token
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
if username is None:
raise credentials_exc
# 从redis获取用户token
redis_token = await request.app.state.redis.get(username)
if not redis_token or redis_token != token:
raise not_login_exc
return BaseUser(email=username)
except JWTError:
raise credentials_exc
BaseUser类如下,放在了schemas中
class BaseUser(BaseModel):
email: str
接下来,我们修改下之前获取用户信息接口,需要依赖登录,返回当前用户信息
@user_router.get("/user", response_model=UserOut)
def get_user(user: BaseUser = Depends(get_current_user),
db: Session = Depends(create_db)):
return get_user_by_email(db, user.email)
测试,看下效果
①:不携带token
②:携带token(如果没有token值,先调用登录接口)
③:使用错误token
我们发现,当携带token后返回了当前的用户信息。这样我们就简单的把用户登录认证实现了