FastAPI 学习之路(五十六)将token缓存到redis

news2024/9/21 10:49:18

在之前的文章中,FastAPI 学习之路(二十九)使用(哈希)密码和 JWT Bearer 令牌的 OAuth2,FastAPI 学习之路(二十八)使用密码和 Bearer 的简单 OAuth2,FastAPI 学习之路(三十四)数据库多表操作,我们分享了基于jwt认证token和基于数据库创建用户,那么我们今天把这些代码整理下,形成基于数据库用户名密码,登陆验证token存储到redis中。

首先我们看下之前基于jwt认证token的代码:

# 见chapter18

"""
-*- encoding=utf-8 -*-
Time: 2024/6/28 16:16
Author: lc
Email: 15101006331@163.com
File: chapter18.py
"""


"""
使用(哈希)密码和 JWT Bearer 令牌的 OAuth2
"""
from fastapi import FastAPI, Depends, status, HTTPException
from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer

from pydantic import BaseModel
from typing import Optional
from jose import JWTError, jwt
from datetime import datetime, timedelta
from passlib.context import CryptContext

SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

fake_db_users = {
    "mrli": {
        "username": "mrli",
        "full_name": "mrli_hanjing",
        "email": "mrli@qq.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False
    }
}

app = FastAPI()


def fake_hash_password(password: str):
    """模拟加密密码"""
    return password


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Optional[str] = None


class User(BaseModel):
    username: str
    full_name: Optional[str] = None
    email: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def verify_password(plain_password: str, hashed_password: str):
    """校验密码"""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str):
    """密码加密"""
    return pwd_context.hash(password)


def get_user(db_users: dict, username: str):
    if username in db_users:
        user_info = db_users[username]
        return UserInDB(**user_info)


def authenticate_user(db_users: dict, username: str, password: str):
    """校验用户权限"""
    user = get_user(db_users, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires: Optional[timedelta] = None):
    """创建jwt"""
    to_encode = data.copy()
    if expires:
        expire_time = datetime.utcnow() + expires
    else:
        expire_time = datetime.utcnow() + timedelta(minutes=30)
    to_encode.update({"exp": expire_time})
    encode_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encode_jwt


def fake_decode_token(token):
    """模拟解码token"""
    user = get_user(fake_db_users, token)
    return user


def get_current_user(token: str = Depends(oauth2_scheme)):
    exc = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Authentication Failed",
        headers={"WWW-Authenticate": "Bearer"}
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise exc
        token_data = TokenData(username=username)
    except JWTError:
        raise exc
    user = get_user(fake_db_users, username=token_data.username)
    if not user:
        raise exc
    return user


def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token", response_model=Token)  # 必须实现路径为/token的接口来返回access_token,在文档页面点击Authorize时就是调用的这个接口
def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_db_users, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"}
        )
    access_token_expire = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": form_data.username}, expires=access_token_expire
    )
    return {
        "access_token": access_token,
        "token_type": "bearer"
    }


@app.get("/me")
def get_me(current_user: User = Depends(get_current_active_user)):
    return current_user

我们需要把这部分代码进行整理,我们吊证到routers中的users.py。实际上就是把之前的方法柔和到新的方法中,需要调整下之前的创建用户,把登录实现了。

我们看下修改后的代码:

from fastapi import APIRouter, status
from fastapi import Depends, HTTPException
from starlette.requests import Request

from models.crud import *
from models.schemas import UserToken
from datetime import timedelta, datetime
from jose import JWTError, jwt
from typing import Optional


SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

user_router = APIRouter()
from . import create_db
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def verify_password(plain_password: str, hashed_password: str):
    """校验密码"""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str):
    """密码加密"""
    return pwd_context.hash(password)


def create_access_token(data: dict, expires: Optional[timedelta] = None):
    """创建jwt"""
    to_encode = data.copy()
    if expires:
        expire_time = datetime.utcnow() + expires
    else:
        expire_time = datetime.utcnow() + timedelta(minutes=30)
    to_encode.update({"exp": expire_time})
    encode_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encode_jwt


def get_current_user(token):
    exc = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Authentication Failed",
        headers={"WWW-Authenticate": "Bearer"}
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise exc
        return username
    except JWTError:
        raise exc


@user_router.post("/user", response_model=UserOut)
def create_user(user: UserModel, db: Session = Depends(create_db)):
    return create_user_method(db, user)


@user_router.get("/user", response_model=UserOut)
def get_user(uid: int, db: Session = Depends(create_db)):
    return get_user_method(db, uid)


@user_router.post("/login")
async def login(request: Request, user: UserModel, db: Session = Depends(create_db)):
    db_user = get_user_by_email(db, user.email)
    pass

现在登录还未完全实现,接下来我们实现这个api。

这里的UserOut在schemas中实现。

class UserToken(BaseUser):
    token: str

登录的具体实现:

@user_router.post("/login")
async def login(request: Request, user: UserModel, db: Session = Depends(create_db)):
    db_user = get_user_by_email(db, user.email)
    # 密码校验
    verify = verify_password(user.password, db_user.hashed_password)
    if verify:
        # 产生token
        token = create_access_token(data={"sub": user.email})
        is_cached = await request.app.state.redis.get(user.email)
        if is_cached:
            raise HTTPException(status_code=200, detail="请勿重复登录")
        await request.app.state.redis.set(user.email, token, expire=ACCESS_TOKEN_EXPIRE_MINUTES*60)
        user_token = UserToken(token=token, email=user.email)
        return user_token
    else:
        raise HTTPException(status_code=200, detail="用户名或密码错误")

redis的相关操作还是使用上次分享时的startup和shutdown方法

我们启动测试,看是否正确,由于我们更改了创建用户时密码的hash方式,所以我们先创建用户

 接下来,我们调用登录api

这样token就产生了,redis中也有对应的缓存信息

通过本节,我们将登录的用户存储到了数据库中,并将登录后的用户token缓存到了redis,接下来我们将分享如何校验token 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1933271.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【笔记】一起齿轮箱的故障和相应的数学模拟实验

1.齿轮箱故障一例 出处:设备的故障识别 GearBox的频谱图,原作者不知道是从哪里拷贝来的,待会儿确认一下。 齿轮啮合频率GMF等于齿数乘以齿轮转速频率: ★齿轮啮合频率两边有边频,间距为1X(这是由冲击响应…

游泳溺水智能监测报警摄像机

当今社会,游泳已经成为人们重要的休闲活动之一。然而,溺水事故时有发生,尤其是在公共泳池或开放水域。为了提高游泳安全,智能监测技术的应用变得尤为重要。本文将探讨一种创新的游泳溺水智能监测报警摄像机系统,旨在有…

git使用以及理解

git练习网站 Learn Git Branching git操作大全Oh Shit, Git!?! git commit git branch name git merge bugFix 合并俩个分支 git rebase main git checkout headgit switch head 会导致HEAD分离 ,就是指head->HEAD->c1 相对引用 ------------------- …

PDF文件无法编辑?3步快速移除PDF编辑限制

正常来说,我们通过编辑器打开pdf文件后,就可以进行编辑了。如果遇到了打开pdf却不能编辑的情况,那有可能是因为密码或是扫描件的原因。小编整理了一些pdf文件无法编辑,以及pdf文件无法编辑时我们要如何处理的方法。下面就随小编一起来…

WEB前端06-BOM对象

BOM浏览器对象模型 浏览器对象模型:将浏览器的各个组成部分封装成对象。是用于描述浏览器中对象与对象之间层次关系的模型,提供了独立于页面内容、并能够与浏览器窗口进行交互的对象结构。 组成部分 Window:浏览器窗口对象 Navigator&…

Human Serum Amyloid A1 ELISA试剂盒

走近指标:Serum Amyloid A1(SAA1) (Human SAA1 结构图,参考网址https://www.rcsb.org/structure/4IP9) 血清淀粉样蛋白 A1(SAA1)是一种由SAA1基因编码的蛋白质&…

用Docker来开发

未完成。。。 现在好像用Docker是越来越多了。之前其实也看过docker的原理,大概就是cgroup那些,不过现在就不看原理了,不谈理论,只看实际中怎么用,解决眼前问题。 用docker来做开发,其实就是解决的编译环境…

Word文档恢复竟然这么简单?3个推荐方案送上!

“我很喜欢用Word进行文字创作,可是我有一次重新打开我的Word文档,却显示文档已丢失,这该怎么办呢?凝聚我多年心血的文章还有可能恢复吗?” 不论是总结学习内容还是汇报工作成果,我们总会用上Word。Word作…

[C++基础]构造函数和析构函数

💖💖💖欢迎来到我的博客,我是anmory💖💖💖 又和大家见面了 欢迎来到C探索系列 作为一个程序员你不能不掌握的知识 先来自我推荐一波 个人网站欢迎访问以及捐款 推荐阅读 如何低成本搭建个人网站…

继承和多态常见的面试问题

文章目录 概念问答 概念 下面哪种面向对象的方法可以让你变得富有( A) A: 继承 B: 封装 C: 多态 D: 抽象 (D )是面向对象程序设计语言中的一种机制。这种机制实现了方法的定义与具体的对象无关, 而对方法的调用则可以关联于具体的对象。 A: 继承 B: 模板 C: 对象的…

关于hash的面试题

目录 题目1.java里,HashMap的底层实现原理2.如何判断一个HashMap是否已经满了?3.HashSet如何检查重复4.HashSet如何判断一个元素是否已经存在.简单的理解hash 题目 选自牛客网 1.java里,HashMap的底层实现原理 数组结构:HashMap 使用一个数…

vue2导入elementui组件库

第一步安装 npm i element-ui -S 第二步在main.js中导入 第三步使用然后在运行项目

【合集】临时邮箱网站 临时邮箱API(持续更新)

众所周知,在注册一些账户时,比较常见的验证方式就是邮箱,但是在进行一些小众和不知名网站注册时,邮箱的泄露可能预示着不休止的邮件推送。尤其是当我们只是想临时使用邮箱这种情况,第二种,批量注册账号的情…

vue3前端开发-执行npm run dev提示报错怎么解决

vue3前端开发-执行npm run dev提示报错怎么解决!今天在本地安装初始化了一个vue3的案例demo。但是当我执行npm run dev想启动它时报错了说,找不到dev。让我检查package.json文件是否包含dev。如下图所示: 实际上,不必惊慌&#xf…

视频压縮大小不影响画质,视频压缩大小不影响画质的软件

在数字化浪潮推动下,视频制作和分享已成为我们生活的一部分。然而,视频文件体积过大常常让分享和存储变得头疼。今天,我们就来聊聊如何在苹果电脑上压缩视频文件大小,让你的视频瞬间瘦身! 方法一、 1.下载并安装视频压…

AgentGYM:结合模仿学习和探索学习策略,让智能体不再需要人类的帮助,在各种环境和任务中自我进化

AgentGYM:结合模仿学习和探索学习策略,让智能体不再需要人类的帮助,在各种环境和任务中自我进化 提出背景AgentGYM 框架AgentGYM 解法拆解AgentEVOL 自我进化算法子解法1:行为克隆子解法2:探索子解法3:学习…

简单搭建卷积神经网络实现手写数字10分类

搭建卷积神经网络实现手写数字10分类 1.思路流程 1.导入minest数据集 2.对数据进行预处理 3.构建卷积神经网络模型 4.训练模型,评估模型 5.用模型进行训练预测 一.导入minest数据集 MNIST--->raw--->test-->(0,1,2...) 10个文件夹 MNIST--->raw-…

爬虫与 Zapier 集成

利用与 Zapier 集成的爬虫 API,以最小的工作量自动完成数据收集、处理和报告等复杂任务。 什么是 Zapier? Zapier 是一家为网络应用程序提供集成的公司,可用于自动化工作流程。 无代码集成 爬虫 API 集成无需编码,您只需点击几下&#x…

ERR SELECT is not allowed in cluster mode

在redis集群模式下,默认且只能使用0号database库,不允许使用SELECT 操作选择database 。

Django Q()函数

Q() 函数的作用 在Django中,Q()函数是一个非常有用的工具,主要用于构建复杂的查询。它允许你创建复杂的查询语句,包括AND、OR和NOT逻辑操作。这对于处理复杂的数据库查询特别有用,特别是在你需要组合多个条件或处理复杂的过滤逻辑…