【Python开发】FastAPI 08:Security 登录认证

news2025/1/8 22:03:21

FastAPIfastapi.security 模块中提供了诸多安全性的工具,简化了各种安全机制的使用方法,可用于处理安全性、身份认证和授权等问题!

目录

1 介绍

1.1 OAuth2

1.2 OpenAPI

2 安全基础

2.1 使用 Bearer

① OAuth2PasswordBearer

② 使用

③ 运行

 ④ 密码流

2.2 获取当前用户

① 创建用户模型

② 创建 get_current_user 依赖项

③ 注入当前用户

3 使用密码和 Bearer 的简单 OAuth2

3.1流程

① 获取 username 和 password

② 使用表单数据

③ 校验密码

④ 返回令牌/token

⑤ 获取当前活跃用户

3.2 实际效果

① 完整代码

② 身份认证

③ 获取本人的用户数据

④ 未启用的用户

4 OAuth2 实现密码哈希与 Bearer JWT 令牌验证

4.1 JWT

4.2 流程

① 密码哈希与校验

③ 更新依赖项

④ 更新 /token 路径操作

4.3 实际效果

① 完整代码

② 检查

③ 小结


📌 源码地址:

https://gitee.com/yinyuu/fast-api_study_yinyu

1 介绍

FastAPI 提供了多种工具,可帮助你以标准的方式轻松、快速地处理安全性,而无需研究和学习所有的安全规范,这相比花费大量的精力和代码处理安全性和身份认证很有好了(比如 java 😂)。

首先我们来看一些小概念,如果已了解可直接看第二种~

1.1 OAuth2

OAuth2 是一个规范,它定义了几种处理身份认证和授权的方法。 它是一个相当广泛的规范,涵盖了一些复杂的使用场景,包括使用「第三方」进行身份认证的方法。

Facebook,Google,Twitter,GitHub 登录系统均是采用该机制。

📌 OAuth 1

OAuth 1OAuth2 完全不同,并且更为复杂,它直接包含了有关如何加密通信的规范。

如今它已经 out 了,也就没多少人用了。

OAuth2 没有指定如何加密通信,它期望使用 HTTPS 进行通信。

1.2 OpenAPI

OpenAPI(以前称为 Swagger)是用于构建 API 的开放规范。

FastAPI 基于 OpenAPI,因此自动交互式文档界面,代码生成等成为可能。

OpenAPI 有一种定义多个安全「方案」的方法,你可以利用所有这些基于标准的工具,包括这些交互式文档系统。

OpenAPI 定义了以下安全方案:

  • apiKey:一个特定于应用程序的密钥,可以来自:
    • 查询参数。
    • 请求头。
    • cookie
  • http:标准的 HTTP 身份认证系统,包括:
    • bearer: 一个值为 Bearer 加令牌字符串的 Authorization 请求头。这是从 OAuth2 继承的。
    • HTTP Basic 认证方式。
    • HTTP Digest,等等。
  • oauth2:所有的 OAuth2 处理安全性的方式(称为「流程」)。
    • 以下几种流程适合构建 OAuth 2.0 身份认证的提供者(例如 Google,Facebook,Twitter,GitHub 等): * implicit * clientCredentials * authorizationCode
    • 但是有一个特定的「流程」可以完美地用于直接在同一应用程序中处理身份认证:
    • password:接下来将介绍它的示例。
  • openIdConnect:提供了一种定义如何自动发现 OAuth2 身份认证数据的方法。
    • 此自动发现机制是 OpenID Connect 规范中定义的内容。

FastAPI fastapi.security 模块中为每个安全方案提供了几种工具,这些工具简化了这些安全机制的使用方法。

2 安全基础

假设前后端分离开发,前端要使用后端的 username password 验证用户身份。

预先安装:pip install python-multipart

因为 OAuth2 使用表单数据发送 username password

2.1 使用 Bearer

本例使用 OAuth2 Password 流以及 Bearer 令牌(Token),为此要使用 OAuth2PasswordBearer 类。

① OAuth2PasswordBearer

创建 OAuth2PasswordBearer 的类实例时,要传递 tokenUrl 参数。该参数包含客户端(用户浏览器中运行的前端) 的 URL,用于发送 username password,并获取令牌。

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

tokenUrl="token" 指向的是暂未创建的相对 URL token,这个相对 URL 相当于 ./token。比如 API 位于 https://example.com/,则指向 https://example.com/token

oauth2_scheme 变量是 OAuth2PasswordBearer 的实例,也是可调用项。

比如以下边方式调用:

oauth2_scheme(some, parameters)

因此,Depends 可以调用 oauth2_scheme 变量。

② 使用

接下来,使用 Depends oauth2_scheme 传入依赖项。

from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordBearer

app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

@app.get("/items/")
async def read_items(token: str = Depends(oauth2_scheme)):
    return {"token": token}

该依赖项使用字符串(str)接收路径操作函数的参数 token

③ 运行

使用 main 函数运行:

if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)

📌 文档

此时访问交互式文档 http://127.0.0.1:8000/docs# 👇

页面右上角出现了一个「Authorize」按钮。 路径操作的右上角也出现了一个可以点击的小锁图标,代表这个接口需要登录认证。

那么点击 Authorize 按钮,弹出授权表单,输入 username password 及其它可选字段:

目前,由于是 demo,在表单中输入内容不会有任何反应,后文会逐步进行完善!

这个自动工具非常实用,可在文档中与所有 API 交互。

前端团队(可能就是开发者本人)可以使用本工具。

第三方应用与系统也可以调用本工具。 开发者也可以用它来调试、检查、测试应用。

📌 文档中请求接口

在文档中请求该接口时,FastAPI 校验请求中的 Authorization 请求头,核对请求头的值是不是由 Bearer + 令牌组成, 并返回令牌字符串(str)。

如果没有找到 Authorization 请求头,或请求头的值不是 Bearer + 令牌。FastAPI 直接返回 401 错误状态码(UNAUTHORIZED):

 ④ 密码流

现在,再回过头来看一下。 Password 流是 OAuth2 定义的,用于处理安全与身份验证的方式(流)。 OAuth2 的设计目标是为了让后端或 API 独立于服务器验证用户身份。

但在本例中,FastAPI 应用会处理 API 与身份验证。

以下是简化的运行流程:

  • 用户在前端输入 username password,并点击回车
  • (用户浏览器中运行的)前端把 username password 发送至 API 中指定的 URL(使用 tokenUrl="token" 声明)
  • API 检查 username 与 password,并用令牌(Token) 响应(暂未实现此功能):
  • 令牌只是用于验证用户的字符串
  • 一般来说,令牌会在一段时间后过期
    • 过时后,用户要再次登录
    • 这样一来,就算令牌被人窃取,风险也较低。因为它与永久密钥不同,在绝大多数情况下不会长期有效
  • 前端临时将令牌存储在某个位置
  • 用户点击前端,前往前端应用的其它部件
  • 前端需要从 API 中提取更多数据:
    • 为指定的端点(Endpoint)进行身份验证
    • 因此,用 API 验证身份时,要发送值为 Bearer + 令牌的请求头 Authorization
    • 假如令牌为 foobarAuthorization 请求头就是: Bearer foobar

看到了吧,只要多写三四行代码,就可以添加基础的安全表单。

2.2 获取当前用户

前边的安全系统向路径操作函数提供了一个 str 类型的 token,接下来进行完善,让它返回当前用户给我们。

① 创建用户模型

首先,创建一个用户 Pydantic 模型。

与使用 Pydantic 声明请求体的方式相同,我们可以在其他任何地方使用它:

from pydantic import BaseModel

class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None

② 创建 get_current_user 依赖项

接下来创建一个 get_current_user 依赖项,用于获取当前的用户,get_current_user 将具有一个我们之前所创建的同一个 oauth2_scheme 作为依赖项。

fake_decode_token (伪)工具函数接收 str 类型的令牌并返回我们的 Pydantic User 模型,get_current_user 将从子依赖项 oauth2_scheme 中接收一个 str 类型的 token

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

def fake_decode_token(token):
    return User(
        username=token + "fakedecoded", email="yinyu@example.com", full_name="yinyu"
    )

async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    return user

③ 注入当前用户

现在可以在路径操作中使用 get_current_user 作为 Depends 了,完整代码:

from typing import Union
from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel

app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None

def fake_decode_token(token):
    return User(
        username = token + "fakedecoded", email="yinyu@example.com", full_name="yinyu"
    )

async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    return user

@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user

注意我们将 current_user 的类型声明为 Pydantic 模型 User,FastAPI 将自动进行代码补全和类型检查。

现在你可以直接在路径操作函数中获取当前用户。 接下来我们只需要再为用户/客户端添加一个真正发送 username password 的路径操作。

3 使用密码和 Bearer 的简单 OAuth2

接着上一章继续开发,添加缺少的部分以实现一个完整的安全性流程。

3.1流程

① 获取 username password

回顾一下,流程第一步就是用户在前端输入 username password,那么此时要做的事便是获取该 username password,然后再进行校验,部分代码:

from fastapi.security import OAuth2PasswordRequestForm

async def login(form_data: OAuth2PasswordRequestForm = Depends()):

FastAPI 提供了 OAuth2PasswordRequestForm ,它的作用是接收请求表单中的数据,包括:

  • username
  • password
  • 一个可选的 scope 字段,是一个由空格分隔的字符串组成的大字符串
  • 一个可选的 grant_type
  • 一个可选的 client_id(我们的示例不需要它)
  • 一个可选的 client_secret(我们的示例不需要它)

OAuth2PasswordRequestForm 并不像 OAuth2PasswordBearer 一样是 FastAPI 的一个特殊的类。 OAuth2PasswordBearer 使得 FastAPI 明白它是一个安全方案。所以它得以通过这种方式添加到 OpenAPI 中。 但 OAuth2PasswordRequestForm 只是一个你可以自己编写的类依赖项,或者你也可以直接声明 Form 参数(请求表单),只是前者更为快捷方便。

② 使用表单数据

首先,使用表单字段中的 username 从(伪)数据库中获取用户数据。 如果没有这个用户,我们将返回一个错误消息,提示「用户名或密码错误」

...

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}


@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username) #使用表单数据
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    ...

③ 校验密码

目前我们已经从数据库中获取了用户数据,但尚未校验密码。

首先让我们将这些数据放入 Pydantic UserInDB 模型中。 永远不要保存明文密码,因此,我们将使用(伪)哈希密码系统。 如果密码不匹配,我们将返回同一个错误。

class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None

class UserInDB(User):
    hashed_password: str

@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    user = UserInDB(**user_dict)
    hashed_password = fake_hash_password(form_data.password)
    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
 ...

hashed_password 可以理解为哈希密码(为了安全):「哈希」的意思是:将某些内容(在本例中为密码)转换为看起来像乱码的字节序列(只是一个字符串)。 每次你传入完全相同的内容(完全相同的密码)时,你都会得到完全相同的乱码。

📌 UserInDB(**user_dict) 表示:

直接将 user_dict 的键和值作为关键字参数传递,等同于:

UserInDB(
    username = user_dict["username"],
    email = user_dict["email"],
    full_name = user_dict["full_name"],
    disabled = user_dict["disabled"],
    hashed_password = user_dict["hashed_password"],
)

这在【Python开发】FastAPI 04:响应模型 有所介绍。

④ 返回令牌/token

token 端点的响应必须是一个 JSON 对象,它应该有一个 token_type。在我们的例子中,由于我们使用的是「Bearer」令牌,因此令牌类型应为「bearer」。 并且还应该有一个 access_token 字段,它是一个包含我们的访问令牌的字符串。

在此示例中,我们将极其不安全地返回相同的 username 作为令牌。

@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    user = UserInDB(**user_dict)
    hashed_password = fake_hash_password(form_data.password)
    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    return {"access_token": user.username, "token_type": "bearer"}

在下一章中,你将看到一个真实的安全实现,使用了哈希密码和 JWT 令牌。

⑤ 获取当前活跃用户

我们想要仅当此用户处于启用状态时才能获取 current_user。 因此,我们创建了一个额外的依赖项 get_current_active_user,而该依赖项又以 get_current_user 作为依赖项。

如果用户不存在或处于未启用状态,则这两个依赖项都将仅返回 HTTP 错误。 因此,在我们的端点中,只有当用户存在,身份认证通过且处于启用状态时,我们才能获得该用户:

async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user

async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

我们在此处返回的值为 Bearer 的额外响应头 WWW-Authenticate 也是规范的一部分。 任何的 401「未认证」HTTP(错误)状态码都应该返回 WWW-Authenticate 响应头。 对于 bearer 令牌(我们的例子),该响应头的值应为 Bearer。 不过你可以忽略这个额外的响应头,没什么影响。

3.2 实际效果

① 完整代码

from typing import Union
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret1",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}


app = FastAPI()


def fake_hash_password(password: str):
    return "fakehashed" + password


oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None

class UserInDB(User):
    hashed_password: str


def get_user(db: dict, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def fake_decode_token(token: str):
    # This doesn't provide any security at all
    # Check the next version
    user = get_user(fake_users_db, token)
    return user


async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    user = UserInDB(**user_dict)
    hashed_password = fake_hash_password(form_data.password)
    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    return {"access_token": user.username, "token_type": "bearer"} #access_token 将被识别为 token


@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8080)

② 身份认证

http://127.0.0.1:8000/docs#/打开交互式文档:http://127.0.0.1:8000/docs#/

点击「Authorize」按钮。 使用以下凭证:

  • 用户名:johndoe
  • 密码:secret1

在系统中进行身份认证后,你将看到:

③ 获取本人的用户数据

现在执行 /users/me 路径的 GET 操作。

你将获得你的用户数据,如:

{
  "username": "johndoe",
  "email": "johndoe@example.com",
  "full_name": "John Doe",
  "disabled": false,
  "hashed_password": "fakehashedsecret1"
}

 如果你点击锁定图标并注销,然后再次尝试同一操作,则会得到 HTTP 401 错误:

{
  "detail": "Not authenticated"
}

④ 未启用的用户

现在尝试使用未启用的用户,并通过以下方式进行身份认证:

  • 用户名:alice
  • 密码:secret2

然后尝试执行 /users/me 路径的 GET 操作。

你将得到一个「未启用的用户」错误,如:

{
  "detail": "Inactive user"
}

现在你掌握了为你的 API 实现一个基于 username password 的完整安全系统的工具。 唯一缺少的细节是它实际上还并不「安全」。下边你将看到如何使用一个安全的哈希密码库和 JWT 令牌。

4 OAuth2 实现密码哈希与 Bearer JWT 令牌验证

至此,我们已经编写了所有安全流,本章学习如何使用 JWT 令牌(Token)和安全密码哈希(Hash)实现真正的安全机制。

接下来,继续完善安全机制。

4.1 JWT

官方网址:JSON Web Tokens - jwt.io

JWT JSON 网络令牌(JSON Web Tokens)。

JWT 是一种将 JSON 对象编码为没有空格,且难以理解的长字符串的标准。JWT 的内容如下所示:

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

JWT 字符串没有加密,任何人都能用它恢复原始信息。

JWT 使用了签名机制,接受令牌时,可以用签名校验令牌,那么别人将无法破解。

使用 JWT 创建有效期为一周的令牌。第二天,用户持令牌再次访问时,仍为登录状态。 令牌于一周后过期,届时,用户身份验证就会失败。只有再次登录,才能获得新的令牌。如果用户(或第三方)篡改令牌的过期时间,因为签名不匹配会导致身份验证失败。

📌 安装 python-jose

Python 中生成和校验 JWT 令牌:

pip install python-jose[cryptography]

Python-jose 需要安装配套的加密后端。 本教程推荐的后端是:pyca/cryptography,因为 Python-jose 支持 PyJWT 的所有功能,还支持与其它工具集成时可能会用到的一些其它功能。

4.2 流程

① 密码哈希与校验

📌 安装 passlib

Passlib 是处理密码哈希的 Python 包。 它支持很多安全哈希算法及配套工具。 本教程推荐的算法是 Bcrypt。 因此请先安装附带 Bcrypt PassLib

pip install passlib[bcrypt]

📌 哈希与校验

首先从 passlib 导入所需工具。 创建用于密码哈希和身份校验的 PassLib 上下文。

接下来,创建三个工具函数:

  • 第一个函数用于校验接收的密码是否匹配存储的哈希值
  • 第二个函数用于哈希用户的密码,也就是给密码加密
  • 第三个函数用于根据用户名获取用户信息
  • 第四个函数用于身份验证,并返回用户
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password):
    return pwd_context.hash(password)

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

② 处理 JWT 令牌

导入已安装的模块, 创建用于 JWT 令牌签名的随机密钥。 使用以下命令,生成安全的随机密钥:

openssl rand -hex 32
#比如:09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7

然后,把生成的密钥复制到变量 SECRET_KEY,我是自己定义了一个 "yinyusecuritykey"。

接着创建设置令牌过期时间的变量,定义令牌端点响应的 Pydantic 模型,创建生成新的访问令牌的工具函数。

from jose import JWTError, jwt

SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

class Token(BaseModel):
    access_token: str
    token_type: str

def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

③ 更新依赖项

更新 get_current_user 以接收与之前相同的令牌,但这里用的是 JWT 令牌。 解码并校验接收到的令牌,然后返回当前用户。

如果令牌无效,则直接返回 HTTP 错误。

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        #对token进行解码
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

📌 JWT sub 的技术细节

JWT 规范还包括 sub 键,值是令牌的主题。

该键是可选的,但要把用户标识放在这个键里,所以本例使用了该键。 除了识别用户与许可用户在 API 上直接执行操作之外,JWT 还可能用于其它事情,简单来说就是权限校验。

例如,识别汽车博客。 接着,为实体添加权限,比如驾驶(汽车)编辑(博客)。 然后,把 JWT 令牌交给用户(或机器人),他们就可以执行驾驶汽车,或编辑博客等操作。无需注册账户,只要有 API 生成的 JWT 令牌就可以。

④ 更新 /token 路径操作

用令牌过期时间创建 timedelta 对象。

@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

4.3 实际效果

① 完整代码

#pip install python-jose,pip install passlib[bcrypt]
from datetime import datetime, timedelta
from typing import Union

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "yinyusecuritykey"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$Iy1FxkhNXVtoJjDfTVTo9erntJ2FFFPJKZbQH3FcBFijS/zNCzGK.",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Union[str, None] = None


class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
    return [{"item_id": "Foo", "owner": current_user.username}]


if __name__ == '__main__':
    # import uvicorn
    # uvicorn.run(app, host="127.0.0.1", port=8000)
    print(get_password_hash("yinyusecurity")) #此为“yinyusecurity”加密后的 👇,然后放在fake_users_db中
    #$2b$12$Iy1FxkhNXVtoJjDfTVTo9erntJ2FFFPJKZbQH3FcBFijS/zNCzGK.

② 检查

运行服务器并访问文档: http://127.0.0.1:8000/docs,可以看到如下用户界面:

用与上一章同样的方式实现应用授权。 使用如下凭证:

  • 用户名: johndoe
  • 密码: yinyusecurity

 调用 /users/me/ 端点,收到下面的响应:

{
  "username": "johndoe",
  "email": "johndoe@example.com",
  "full_name": "John Doe",
  "disabled": false
}

打开浏览器的开发者工具,可查看数据是怎么发送的,而且数据里只包含了令牌。

验证用户的第一个请求会发送密码,并获取访问令牌,但之后不会再发送密码,就只会校验访问令牌了。

 注意,请求中 Authorization 响应头的值以 Bearer 开头。

③ 小结

至此,您可以使用 OAuth2 JWT 等标准配置安全的 FastAPI 应用。

几乎在所有框架中,处理安全问题很快都会变得非常复杂。 有些包为了简化安全流,不得不在数据模型、数据库和功能上做出妥协。而有些过于简化的软件包其实存在了安全隐患。

FastAPI 不向任何数据库、数据模型或工具做妥协, 开发者可以灵活选择最适合项目的安全机制。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/608861.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

开关电源关键参数计算方法

1、源调整率(Line Regulation):将待测开关电源以额定输入电压及额定负载状况下热机15 分钟稳定后,分别于输入电压的下限、额定输入电压(Normal)、输入电压的上限测量并记录各自对应的输出电压值为 V1、V0(normal&#…

Linux NGINX服务 ReWrite^location

ReWrite^location 从功能看 rewrite 和 location 似乎有点像,都能实现跳转,主要区别在于 rewrite 是在同一域名内更改获取资源的路径,而 location 是对一类路径做控制访问或反向代理,还可以proxy_pass 到其他机器。 rewrite 对访问…

Nginx正则表达式、location匹配、Rewrite重写详解

Nginx正则表达式、location匹配、Rewrite重写详解 一、常用的Nginx正则表达式二、location匹配概述1、location大致可以分为三类2、location常用的匹配规则3、location 优先级4、location 示例说明5、实际网站使用中,至少有三个匹配规则定义 三、rewrite重写1、rewr…

果推断16--基于反事实因果推断的度小满额度模型学习笔记

目录 一、原文地址 二、一些问题 2.1如何从RCT随机样本过渡到观测样本因果建模? 2.2反事实学习的核心思想 2.3度小满的连续反事实额度模型 Mono-CFR 2.4Mono-CFR代码实现(待补充) 2.5CFR学习 2.5.1CFR 2.5.2DR-CFR 参考 一、原文地…

Spring Cloud Alibaba — Nacos 构建服务注册中心

文章目录 Nacos Server下载启动登录创建命名空间 Nacos Client启动样例Nacos 服务发现配置项 集成 OpenFeign 远程接口调用添加 OpenFeign 依赖开启 EnableFeignClients 注解编写远程服务接口远程接口调用 集成 Sentinel 熔断降级添加 Sentinel 依赖开启 Sentinel 熔断降级编写…

【数据结构每日一题】链表——单链表重排

[数据结构习题]链表——单链表重排 👉知识点导航💎:【数据结构】线性表——顺序存储 👉知识点导航💎:【数据结构】线性表——链式存储 👉[王道数据结构]习题导航💎: p …

pr安装缺少VCRUNTIME140.dll怎么办?这三个修复方案可以解决

在我们安装pr的时候,遇到缺少VCRUNTIME140.dll怎么办?vcruntime140.dll是一个Windows动态链接库,其主要功能是为C/C编译的程序提供运行时支持。这些库包括输入/输出函数、数学函数、字符串函数等等。因此,如果您的计算机缺少vcrun…

【接口自动化测试】一步一步教你搭建接口环境

要做接口测试,我们得搭建一套本地可以运行的接口环境。这次我选择了一个搭建容易,适合学习的系统——学生管理系统。 Python安装 这套管理系统是Python代码写的,因此需要Python环境。 安装挺无脑的,按照我提供的安装包和方法装…

windows下PC端小程序抓包--FiddlerCharles

目录 引言 【背景说明】 【操作说明】 【总结】 引言 大家好,你是否曾经遇到过想要抓取Windows下PC端小程序的网络请求数据,但不知道该用什么工具呢? 今天我要介绍的Fiddler和Charles两款工具,可帮助你轻松切入小程序网络请…

MySQL数据库 7.图形化界面工具DataGrip基础应用教学

目录 前言: DataGrip安装界面: 利用DataGrip创建数据库: 利用DataGrip为数据库创建表: 利用datagrip修改表: 添加元素: 结束! 前言: 在之前我们一直接触的是MySQL命令行语句开…

4.3 最优装载

博主简介:一个爱打游戏的计算机专业学生博主主页: 夏驰和徐策所属专栏:算法设计与分析 1.什么是贪心算法的最优装载问题? 最优装载问题(Bin Packing Problem)是一个经典的组合优化问题,涉及将一…

【Linux】-编译器-gcc/g++使用以及动态库和静态库的介绍(以及解决sudo失败的方法)

💖作者:小树苗渴望变成参天大树 ❤️‍🩹作者宣言:认真写好每一篇博客 💨作者gitee:gitee 💞作者专栏:C语言,数据结构初阶,Linux,C 如 果 你 喜 欢 作 者 的 文 章 ,就 给 作 者 点…

iOS证书(.p12)和描述文件(.mobileprovision)申请

目录 iOS证书(.p12)和描述文件(.mobileprovision)申请文末扩展(UDID获取、添加测试设备) 说明:本文申请证书、描述文件转载自 uniapp官网   iOS证书(.p12)和描述文件(.mobileprovision)申请      官网会时不时更新,如有疑问&…

《Apollo 智能驾驶进阶课程》二、 高精地图

1. 高精地图与自动驾驶的关系 1.1 高精地图与自动驾驶 L3级别以上才需要高精地图 1.2 什么是高精地图 1.3 高精地图与导航地图 1.4 高精地图-基础模块 高精地图与定位模块的关系 现在主流的自动驾驶的定位方案有两种:一种是基于点云,另一种是基于C…

投票系统(前后端分离)

1.投票系统的介绍 投票系统是一种用于组织选举和投票的软件系统,它可以帮助政府、企业、组织和社区等各种机构进行公正、透明和高效的投票活动。投票系统的主要功能包括:选民身份验证、投票管理、计票和结果公布等。 选民身份验证是指投票系统可以通过…

SpringCloud Sleuth/Zipkin学习

SpringCloud Sleuth/Zipkin 文章目录 SpringCloud Sleuth/Zipkin1 Sleuth/Zipkin 简介2 Sleuth/ZipKin-搭建链路监控实例 1 Sleuth/Zipkin 简介 在微服务框架中,一个由客户端发起的请求在后端系统中会经过多个不同的的服务节点调用, 来协同产生最后的请求结果&…

电脑右键删除的文件如何恢复?提供了4种方法

电脑上不小心删除文件是很常见的一件事。比如在使用右键删除的情况下,但是,误删并不代表永远丧失这个文件,因为我们可以通过一些方法来恢复右键删除的文件。本文提供了4种方法,你可以根据具体的数据丢失情况选择合适的方法。 方法…

【Python】列表 List ③ ( 查询操作 / 修改操作 | 列表查询操作 List#index | 修改列表指定位置元素值 )

文章目录 一、列表查询操作1、List#index 函数简介2、代码示例 - 列表查询3、列表查询 ValueError 报错 二、修改列表指定索引元素1、语法简介2、代码示例 - 使用正向 / 反向索引修改指定元素 一、列表查询操作 1、List#index 函数简介 列表 List 查询功能 , 通过 List#index 函…

PyTorch 深度学习 || 专题一:神经网络基础

神经网络基础 神经网络是一门重要的机器学习技术。它是目前最为火热的研究方向–深度学习的基础。学习神经网络不仅可以让你掌握一门强大的机器学习方法,同时也可以更好地帮助你理解深度学习技术。 神经网络是一种模拟人脑的神经网络以期能够实现类人工智能的机器…

【JAVA】双向链表详解

【JAVA】双向链表详解 双向链表的定义双向链表的初步实现(准备)双向链表的操作一. 打印链表二. 得到链表长度三. 插入操作3.1 头插法3.2 尾插法3.3 任意位置插入 四. 删除操作4.1 删除第一次出现为key的节点(3种情况)4.2 删除所以…