FastAPI 结合 JWT

news2024/12/24 2:27:12

文章目录

  • FastAPI 结合 JWT
  • 步骤
  • 安装
  • 步骤
    • 导入必要的模块
    • 设置配置和初始化应用
    • 创建数据模型
    • 实现辅助函数
      • 生成 JWT Token
      • 获取用户数据
      • 验证密码
      • 获取当前用户
    • 用户登录获取 Token
    • 受保护的路由示例
  • 所有代码
  • 测试
    • 获取 Token
    • 访问受保护的路由
      • token正确
      • token错误
  • 总结
  • 注意

FastAPI 结合 JWT

JWT(JSON Web Token)是一种基于 JSON 的开放标准(RFC 7519),用于在各方之间传递安全可靠的信息。JWT 可以签名(使用 HMAC 算法或 RSA 等),从而可以验证内容是否被篡改。JWT 通常用于认证和授权流程。

步骤

在 FastAPI 中,JWT 主要用于保护 API 路由,使其只允许经过身份验证的用户访问。认证流程大致如下:

  1. **用户登录:**用户通过提交用户名和密码获取 JWT。
  2. **获取 Token:**服务器验证用户凭据后,生成并返回 JWT 给用户。
  3. **访问受保护的路由:**用户在访问受保护的路由时,需要在请求头中携带该 JWT。
  4. **Token 验证:**服务器验证 JWT 的合法性和有效性,允许或拒绝访问受保护资源。

安装

pip install fastapi uvicorn pyjwt

步骤

导入必要的模块

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from datetime import datetime, timedelta, timezone
import jwt

设置配置和初始化应用

SECRET_KEY = "your_secret_key"  # 用于签名 JWT 的密钥
ALGORITHM = "HS256"             # 加密算法
ACCESS_TOKEN_EXPIRE_MINUTES = 30  # Token 过期时间

app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

创建数据模型

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: str | None = None

class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None

class UserInDB(User):
    hashed_password: str

实现辅助函数

生成 JWT Token

def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

获取用户数据

fake_users_db = {
    "testuser": {
        "username": "testuser",
        "full_name": "Test User",
        "email": "testuser@example.com",
        "hashed_password": "fakehashedpassword",
        "disabled": False,
    }
}

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

验证密码

def verify_password(plain_password, hashed_password):
    return plain_password == hashed_password

获取当前用户

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except jwt.PyJWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

用户登录获取 Token

@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = get_user(fake_users_db, form_data.username)
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

受保护的路由示例

@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

所有代码

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from datetime import datetime, timedelta, timezone
import uvicorn
import jwt
import os

# JWT 相关配置
SECRET_KEY = "123456789ashdgjha.slakdv.laksd*as-d/sd3"  # 用于签名 JWT 的密钥(需要妥善保管,实际应用中应存储在环境变量或配置文件中)
ALGORITHM = "HS256"  # 使用的加密算法
ACCESS_TOKEN_EXPIRE_MINUTES = 30  # Token 的有效时间,以分钟为单位

app = FastAPI()  # 创建 FastAPI 应用实例

# OAuth2PasswordBearer 实例,用于依赖项
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# 模拟的用户数据库,通常在实际应用中应从数据库中获取用户信息
fake_users_db = {
    "testuser": {
        "username": "testuser",
        "full_name": "Test User",
        "email": "testuser@example.com",
        "hashed_password": "fakehashedpassword",  # 在实际应用中,存储经过哈希处理的密码
        "disabled": False,  # 用户是否被禁用
    }
}

# Pydantic 模型,用于定义请求和响应的数据结构
class Token(BaseModel):
    access_token: str  # Token 字符串
    token_type: str  # Token 类型(一般为 "bearer")

class TokenData(BaseModel):
    username: str | None = None  # 从 Token 中提取的用户名

class User(BaseModel):
    username: str  # 用户名
    email: str | None = None  # 邮箱地址,可选
    full_name: str | None = None  # 用户全名,可选
    disabled: bool | None = None  # 用户是否被禁用,可选

class UserInDB(User):
    hashed_password: str  # 存储在数据库中的哈希密码

# 生成 JWT Token 的函数
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """
    生成 JWT Token。

    参数:
    - data (dict): 要编码到 JWT 中的数据。
    - expires_delta (timedelta, 可选): Token 的过期时间。

    返回:
    - str: 编码后的 JWT 字符串。
    """
    to_encode = data.copy()  # 创建副本,以避免修改原始数据
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})  # 添加过期时间到 JWT 数据中
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)  # 生成 JWT
    return encoded_jwt

# 从假数据库获取用户信息
def get_user(db, username: str):
    """
    从数据库中获取用户信息。

    参数:
    - db (dict): 用户数据库(在本例中为假数据)。
    - username (str): 用户名。

    返回:
    - UserInDB | None: 返回匹配的用户信息,如果不存在则返回 None。
    """
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

# 验证用户密码
def verify_password(plain_password, hashed_password):
    """
    验证用户密码。

    参数:
    - plain_password (str): 用户输入的明文密码。
    - hashed_password (str): 存储在数据库中的哈希密码。

    返回:
    - bool: 密码匹配返回 True,否则返回 False。
    """
    return plain_password == hashed_password  # 在实际应用中,这里应该使用哈希函数进行比较

# 验证 Token 并获取当前用户
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """
    从 JWT Token 中提取用户信息,并验证 Token 的合法性。

    参数:
    - token (str): JWT Token。

    返回:
    - User: 返回当前用户信息。

    抛出:
    - HTTPException: 当 Token 无效或用户不存在时抛出 401 错误。
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])  # 解码 JWT
        username: str = payload.get("sub")  # 获取 JWT 中的用户名
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except jwt.PyJWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)  # 获取用户信息
    if user is None:
        raise credentials_exception
    return user

# 验证用户是否被禁用
async def get_current_active_user(current_user: User = Depends(get_current_user)):
    """
    验证当前用户是否被禁用。

    参数:
    - current_user (User): 当前用户信息。

    返回:
    - User: 如果用户未被禁用,返回用户信息。

    抛出:
    - HTTPException: 当用户被禁用时抛出 400 错误。
    """
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

# 用户登录获取 Token
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    """
    用户登录接口,用于获取 JWT Token。

    参数:
    - form_data (OAuth2PasswordRequestForm): 包含用户名和密码的表单数据。

    返回:
    - dict: 包含 access_token 和 token_type 的响应数据。

    抛出:
    - HTTPException: 当用户名或密码错误时抛出 401 错误。
    """
    user = get_user(fake_users_db, form_data.username)
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)  # 设置 Token 过期时间
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

# 受保护的路由示例
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    """
    获取当前用户信息的受保护路由。

    参数:
    - current_user (User): 当前登录的用户信息(通过 JWT 验证)。

    返回:
    - User: 返回当前用户的信息。
    """
    return current_user

# 运行应用
if __name__ == "__main__":
    uvicorn.run(
        f"{os.path.basename(__file__).split('.')[0]}:app",
        host="127.0.0.1",
        port=8000,
        reload=True,  # 启用自动重载
    )

测试

获取 Token

在这里插入图片描述

访问受保护的路由

token正确

在这里插入图片描述

token错误

在这里插入图片描述

总结

JWT(JSON Web Token)是一种用于安全地在各方之间传递信息的开放标准,通常用于用户认证和授权。它将用户信息编码为一个签名的令牌,客户端可以使用该令牌访问受保护的资源。

在 FastAPI 中,JWT 用于保护 API 路由。用户通过提交用户名和密码获取 JWT,客户端在后续请求中使用该令牌进行身份验证。服务器验证令牌的合法性后,允许用户访问受保护的资源。

实现 JWT 认证的步骤包括安装必要的依赖项、配置 JWT 设置(如密钥和算法)、定义数据模型、实现辅助函数(如生成和验证 JWT 的函数),并设置受保护的 API 路由。

  • JWT 配置:包括密钥(SECRET_KEY)和加密算法(ALGORITHM),用于生成和验证令牌。
  • 辅助函数:如生成 JWT、验证密码、获取用户信息等,用于处理认证逻辑。
  • API 路由:包含登录接口,用于生成令牌,以及受保护的路由,只允许携带有效 JWT 的请求访问。

使用 Postman 或其他工具可以测试 JWT 的生成和验证过程,确保只有经过身份验证的用户才能访问受保护的 API 路由。

注意

示例中密码并没有经过hash加密实际应用中要加密的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1970294.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【中项】系统集成项目管理工程师-第9章 项目管理概论-9.5 项目立项管理

前言:系统集成项目管理工程师专业,现分享一些教材知识点。觉得文章还不错的喜欢点赞收藏的同时帮忙点点关注。 软考同样是国家人社部和工信部组织的国家级考试,全称为“全国计算机与软件专业技术资格(水平)考试”&…

RAGCache多级动态缓存管理系统,让RAG推理更高效|RAG论文分享

今天为大家分享北京大学和字节跳动联合发表的一篇关于加速RAG推理的论文: RAGCache: Efficient Knowledge Caching for Retrieval-Augmented Generation 论文地址:https://arxiv.org/abs/2404.12457 1.论文概述 Retrieval-Augmented Generation (RAG) …

聊聊Netty中几个重要的生命周期

写在文章开头 Netty内置了各种开箱即用的处理器,把握好处理器中几个比较重要的生命周期回调用助于我们编写出强大的网络通信程序,所以本文将基于一个简单的示例和源码介绍一下Netty中几个比较重要的生命周期函数,希望对你有帮助。 Hi&#x…

九州未来参编,《边缘计算产业发展研究报告》正式发布

日前,由中国通信标准化协会主办的第四届“云边协同”大会暨首届分布式算力论坛在北京成功举办。大会聚焦云边端分布式算力领域技术新突破、应用新场景以及发展新价值,搭建政产学研用交流对接平台,深化产业链协同开放合作。 会上,由…

Cookie-Monster:一款针对Web浏览器的安全分析与数据提取工具

关于Cookie-Monster Cookie-Monster是一款针对常见Web浏览器的安全分析与数据提取工具,该工具可以帮助广大研究人员提取并分析Edge、Chrome和Firefox浏览器中的Cookie数据。 Cookie-Monster适用于红队和蓝队成员,能够提取WebKit主密钥,找到具…

无监督学习与强化学习基础

就是训练数据无标签,算法自动对数据进行分类,听着很神奇,但学了机器学习以后,除了神经网络比较悬,对人像是个黑盒,别的都是基于数学的分类算法,无监督学习也不例外。 聚类—K-means算法 坐标轴…

Postman下载安装~用于springboot控制层测试

第一步:下载安装 方法1:在线下载 Postman API Platform 方法2:百度网盘 通过百度网盘分享的文件:Postman-win64-Setup 链接:https://pan.baidu.com/s/16nNfKvuNfM8z4kP1Ad-K2Q?pwdotxe 提取码:otxe -…

见证中国数据库的崛起:从追赶到引领的壮丽征程《三》

见证中国数据库的崛起:从追赶到引领的壮丽征程《三》 三、深度思考:中国数据库发展的经验与启示产学研用结合的创新模式应用驱动的创新路径人才培养的关键作用 【纪录片】中国数据库前世今生 在数字化潮流席卷全球的今天,数据库作为IT技术领域…

Java高级流

高级流 java将IO分为了两类 节点流:又称为"低级流" 特点:直接链接程序与另一端的"管道",是真实读写数据的流IO一定是建立在节点流的基础上进行的。文件流就是典型的节点流(低级流) 处理流:又称为"高级流" 特点:不能独立存在&#x…

开源项目的发展趋势,以及参与开源项目可以获得的经验和成果,以及涉及到的注意事项

目录 一、当前开源项目的发展趋势 1. 全球化协作与社区增长 2. 多领域技术创新与迭代加速 3. 开放协作模式 4. 商业化与产业融合 5. 安全性与隐私保护 6. 跨界融合与生态构建 7. 政策支持 二、参与开源项目的经验和收获 1. 技术能力提升 2. 团队协作与沟通能力 3.领…

阿里微服务质量保障系列:异步通信模式以及测试分析

软件质量保障 所寫即所思|一个阿里质量人对测试的所感所悟。 1. 异步通信模式 最常见的方式就是异步消息通信。使用消息机制时,服务之间的通信采用异步交换消息的方式完成。基于消息机制的应 用程序通常使用消息代理,它充当服务之间的中介。另一种选择是使用无代理架构,通…

实验室自动测试系统产品化注意问题

在实验室开发的自动测试系统转向产品化时,需要综合考虑多个方面,以确保系统的稳定性、可靠性和可维护性。以下是一些关键问题和建议: 1. 硬件选择与兼容性 在实验室中,可能会使用一些专业的实验设备或定制的硬件,但在…

一文带你掌握C++模版

12. C模板 什么是模板 模板编程也可以叫做泛型编程,忽略数据类型的一种编程方式 //求最值问题 int Max(int a,int b) {return a>b?a:b; } double Max(int a,int b) {return a>b?a:b; } string Max(string a,string b) {return a>b?a:b; …

搭建Nginx正向代理服务器,轻松实现外部网络请求的转发

​ 本文将介绍如何使用Nginx搭建一个简单的正向代理服务器,实现外部网络请求的转发。通过设置正向代理,我们可以隐藏真实的服务器地址,提高访问速度,以及增强网络安全性 一、Nginx正向代理简介 正向代理(Forward Pro…

基于node的学生公寓管理系统-计算机毕设 附源码 06412

基于node的学生公寓管理系统 摘 要 本论文主要论述了如何使用Node.js框架和Express框架开发一个基于Node的学生公寓管理系统,本系统将严格按照软件开发流程进行各个阶段的工作,采用B/S结构,面向对象编程思想进行项目开发。在引言中&#xff0…

无人机环保行业解决方案-应急环境污染处理

无人机环境应急处理 传统环境应急的典型挑战 发生环境应急事件时,最重要的是快速获取前方信息。然而,有毒气体 和易燃易爆品多,存在二次爆炸风险,严重威胁人身安全。无人机可快 速赶到事故现场,查看周边环境、污染物…

免费开源的搜索工具,支持搜索文件内容,绿色免安装

dnGrep是一款功能强大的开源Windows搜索工具,旨在帮助用户高效地在各种文档和压缩文件中查找文本内容。它支持多种查询方式,包括文本、正则表达式(Regex)、XPath以及音序查询。 dnGrep主要功能: 多格式支持&#xff…

网络安全学习平台top10_网络安全训练平台

前言 1.Hack In The Box:http://www.hackinthebox.org/ 2.Hellbound Hackers:https://www.hellboundhackers.org/ 3.Exploit Database:https://www.exploit-database.net/ 4.Hacking-Tutorial:https://www.hacking-tutorial.c…

【Python实战】如何优雅地实现 PDF 去水印?

话接上篇,自动化处理 PDF 文档,完美实现 WPS 会员功能 小伙伴们更关心的是如何去除 PDF 中的水印~ 今天,就来分享一个超简单的 PDF 去水印方法~ 1. 原理介绍 在上一篇中,我们介绍了如何将 PDF 文档转换成图片,图片…

前端必知必会-html中input的type设置

文章目录 HTML type的设置输入类型文本输入类型密码输入类型提交输入类型重置输入类型单选按钮输入类型复选框输入类型按钮输入类型颜色输入类型日期输入类型 Datetime-local输入类型电子邮件输入类型图像输入类型 文件输入类型 隐藏输入类型 月份输入类型 数字输入类型范围输入…