FastAPI 在 fastapi.security 模块中提供了诸多安全性的工具,简化了各种安全机制的使用方法,可用于处理安全性、身份认证和授权等问题!
目录
1 介绍
1.1 OAuth2
1.2 OpenAPI
2 安全基础
2.1 使用 Bearer
① OAuth2PasswordBearer
② 使用
③ 运行
④ 密码流
2.2 获取当前用户
① 创建用户模型
② 创建 get_current_user 依赖项
③ 注入当前用户
3 使用密码和 Bearer 的简单 OAuth2
3.1流程
① 获取 username 和 password
② 使用表单数据
③ 校验密码
④ 返回令牌/token
⑤ 获取当前活跃用户
3.2 实际效果
① 完整代码
② 身份认证
③ 获取本人的用户数据
④ 未启用的用户
4 OAuth2 实现密码哈希与 Bearer JWT 令牌验证
4.1 JWT
4.2 流程
① 密码哈希与校验
③ 更新依赖项
④ 更新 /token 路径操作
4.3 实际效果
① 完整代码
② 检查
③ 小结
📌 源码地址:
https://gitee.com/yinyuu/fast-api_study_yinyu
1 介绍
FastAPI 提供了多种工具,可帮助你以标准的方式轻松、快速地处理安全性,而无需研究和学习所有的安全规范,这相比花费大量的精力和代码处理安全性和身份认证很有好了(比如 java 😂)。
首先我们来看一些小概念,如果已了解可直接看第二种~
1.1 OAuth2
OAuth2 是一个规范,它定义了几种处理身份认证和授权的方法。 它是一个相当广泛的规范,涵盖了一些复杂的使用场景,包括使用「第三方」进行身份认证的方法。
Facebook,Google,Twitter,GitHub 登录系统均是采用该机制。
📌 OAuth 1
OAuth 1 与 OAuth2 完全不同,并且更为复杂,它直接包含了有关如何加密通信的规范。
如今它已经 out 了,也就没多少人用了。
OAuth2 没有指定如何加密通信,它期望使用 HTTPS 进行通信。
1.2 OpenAPI
OpenAPI(以前称为 Swagger)是用于构建 API 的开放规范。
FastAPI 基于 OpenAPI,因此自动交互式文档界面,代码生成等成为可能。
OpenAPI 有一种定义多个安全「方案」的方法,你可以利用所有这些基于标准的工具,包括这些交互式文档系统。
OpenAPI 定义了以下安全方案:
- apiKey:一个特定于应用程序的密钥,可以来自:
- 查询参数。
- 请求头。
- cookie。
- http:标准的 HTTP 身份认证系统,包括:
- bearer: 一个值为 Bearer 加令牌字符串的 Authorization 请求头。这是从 OAuth2 继承的。
- HTTP Basic 认证方式。
- HTTP Digest,等等。
- oauth2:所有的 OAuth2 处理安全性的方式(称为「流程」)。
- 以下几种流程适合构建 OAuth 2.0 身份认证的提供者(例如 Google,Facebook,Twitter,GitHub 等): * implicit * clientCredentials * authorizationCode
- 但是有一个特定的「流程」可以完美地用于直接在同一应用程序中处理身份认证:
- password:接下来将介绍它的示例。
- openIdConnect:提供了一种定义如何自动发现 OAuth2 身份认证数据的方法。
- 此自动发现机制是 OpenID Connect 规范中定义的内容。
FastAPI 在 fastapi.security 模块中为每个安全方案提供了几种工具,这些工具简化了这些安全机制的使用方法。
2 安全基础
假设前后端分离开发,前端要使用后端的 username 与 password 验证用户身份。
预先安装:pip install python-multipart
因为 OAuth2 使用表单数据发送 username 与 password。
2.1 使用 Bearer
本例使用 OAuth2 的 Password 流以及 Bearer 令牌(Token),为此要使用 OAuth2PasswordBearer 类。
① OAuth2PasswordBearer
创建 OAuth2PasswordBearer 的类实例时,要传递 tokenUrl 参数。该参数包含客户端(用户浏览器中运行的前端) 的 URL,用于发送 username 与 password,并获取令牌。
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
tokenUrl="token" 指向的是暂未创建的相对 URL token,这个相对 URL 相当于 ./token。比如 API 位于 https://example.com/,则指向 https://example.com/token。
oauth2_scheme 变量是 OAuth2PasswordBearer 的实例,也是可调用项。
比如以下边方式调用:
oauth2_scheme(some, parameters)
因此,Depends 可以调用 oauth2_scheme 变量。
② 使用
接下来,使用 Depends 把 oauth2_scheme 传入依赖项。
from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordBearer
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
@app.get("/items/")
async def read_items(token: str = Depends(oauth2_scheme)):
return {"token": token}
该依赖项使用字符串(str)接收路径操作函数的参数 token 。
③ 运行
使用 main 函数运行:
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8000)
📌 文档
此时访问交互式文档 http://127.0.0.1:8000/docs# 👇
页面右上角出现了一个「Authorize」按钮。 路径操作的右上角也出现了一个可以点击的小锁图标,代表这个接口需要登录认证。
那么点击 Authorize 按钮,弹出授权表单,输入 username 与 password 及其它可选字段:
目前,由于是 demo,在表单中输入内容不会有任何反应,后文会逐步进行完善!
这个自动工具非常实用,可在文档中与所有 API 交互。
前端团队(可能就是开发者本人)可以使用本工具。
第三方应用与系统也可以调用本工具。 开发者也可以用它来调试、检查、测试应用。
📌 文档中请求接口
在文档中请求该接口时,FastAPI 校验请求中的 Authorization 请求头,核对请求头的值是不是由 Bearer + 令牌组成, 并返回令牌字符串(str)。
如果没有找到 Authorization 请求头,或请求头的值不是 Bearer + 令牌。FastAPI 直接返回 401 错误状态码(UNAUTHORIZED):
④ 密码流
现在,再回过头来看一下。 Password 流是 OAuth2 定义的,用于处理安全与身份验证的方式(流)。 OAuth2 的设计目标是为了让后端或 API 独立于服务器验证用户身份。
但在本例中,FastAPI 应用会处理 API 与身份验证。
以下是简化的运行流程:
- 用户在前端输入 username 与password,并点击回车
- (用户浏览器中运行的)前端把 username 与password 发送至 API 中指定的 URL(使用 tokenUrl="token" 声明)
- API 检查 username 与 password,并用令牌(Token) 响应(暂未实现此功能):
- 令牌只是用于验证用户的字符串
- 一般来说,令牌会在一段时间后过期
- 过时后,用户要再次登录
- 这样一来,就算令牌被人窃取,风险也较低。因为它与永久密钥不同,在绝大多数情况下不会长期有效
- 前端临时将令牌存储在某个位置
- 用户点击前端,前往前端应用的其它部件
- 前端需要从 API 中提取更多数据:
- 为指定的端点(Endpoint)进行身份验证
- 因此,用 API 验证身份时,要发送值为 Bearer + 令牌的请求头 Authorization
- 假如令牌为 foobar,Authorization 请求头就是: Bearer foobar
看到了吧,只要多写三四行代码,就可以添加基础的安全表单。
2.2 获取当前用户
前边的安全系统向路径操作函数提供了一个 str 类型的 token,接下来进行完善,让它返回当前用户给我们。
① 创建用户模型
首先,创建一个用户 Pydantic 模型。
与使用 Pydantic 声明请求体的方式相同,我们可以在其他任何地方使用它:
from pydantic import BaseModel
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
② 创建 get_current_user 依赖项
接下来创建一个 get_current_user 依赖项,用于获取当前的用户,get_current_user 将具有一个我们之前所创建的同一个 oauth2_scheme 作为依赖项。
fake_decode_token (伪)工具函数接收 str 类型的令牌并返回我们的 Pydantic User 模型,get_current_user 将从子依赖项 oauth2_scheme 中接收一个 str 类型的 token:
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def fake_decode_token(token):
return User(
username=token + "fakedecoded", email="yinyu@example.com", full_name="yinyu"
)
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
return user
③ 注入当前用户
现在可以在路径操作中使用 get_current_user 作为 Depends 了,完整代码:
from typing import Union
from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
def fake_decode_token(token):
return User(
username = token + "fakedecoded", email="yinyu@example.com", full_name="yinyu"
)
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
return user
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
return current_user
注意我们将 current_user 的类型声明为 Pydantic 模型 User,FastAPI 将自动进行代码补全和类型检查。
现在你可以直接在路径操作函数中获取当前用户。 接下来我们只需要再为用户/客户端添加一个真正发送 username 和 password 的路径操作。
3 使用密码和 Bearer 的简单 OAuth2
接着上一章继续开发,添加缺少的部分以实现一个完整的安全性流程。
3.1流程
① 获取 username 和 password
回顾一下,流程第一步就是用户在前端输入 username 与password,那么此时要做的事便是获取该 username 与password,然后再进行校验,部分代码:
from fastapi.security import OAuth2PasswordRequestForm
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
FastAPI 提供了 OAuth2PasswordRequestForm ,它的作用是接收请求表单中的数据,包括:
- username
- password
- 一个可选的 scope 字段,是一个由空格分隔的字符串组成的大字符串
- 一个可选的 grant_type
- 一个可选的 client_id(我们的示例不需要它)
- 一个可选的 client_secret(我们的示例不需要它)
OAuth2PasswordRequestForm 并不像 OAuth2PasswordBearer 一样是 FastAPI 的一个特殊的类。 OAuth2PasswordBearer 使得 FastAPI 明白它是一个安全方案。所以它得以通过这种方式添加到 OpenAPI 中。 但 OAuth2PasswordRequestForm 只是一个你可以自己编写的类依赖项,或者你也可以直接声明 Form 参数(请求表单),只是前者更为快捷方便。
② 使用表单数据
首先,使用表单字段中的 username 从(伪)数据库中获取用户数据。 如果没有这个用户,我们将返回一个错误消息,提示「用户名或密码错误」。
...
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
}
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username) #使用表单数据
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
...
③ 校验密码
目前我们已经从数据库中获取了用户数据,但尚未校验密码。
首先让我们将这些数据放入 Pydantic UserInDB 模型中。 永远不要保存明文密码,因此,我们将使用(伪)哈希密码系统。 如果密码不匹配,我们将返回同一个错误。
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
hashed_password: str
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserInDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
...
hashed_password 可以理解为哈希密码(为了安全):「哈希」的意思是:将某些内容(在本例中为密码)转换为看起来像乱码的字节序列(只是一个字符串)。 每次你传入完全相同的内容(完全相同的密码)时,你都会得到完全相同的乱码。
📌 UserInDB(**user_dict) 表示:
直接将 user_dict 的键和值作为关键字参数传递,等同于:
UserInDB(
username = user_dict["username"],
email = user_dict["email"],
full_name = user_dict["full_name"],
disabled = user_dict["disabled"],
hashed_password = user_dict["hashed_password"],
)
这在【Python开发】FastAPI 04:响应模型 有所介绍。
④ 返回令牌/token
token 端点的响应必须是一个 JSON 对象,它应该有一个 token_type。在我们的例子中,由于我们使用的是「Bearer」令牌,因此令牌类型应为「bearer」。 并且还应该有一个 access_token 字段,它是一个包含我们的访问令牌的字符串。
在此示例中,我们将极其不安全地返回相同的 username 作为令牌。
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserInDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}
在下一章中,你将看到一个真实的安全实现,使用了哈希密码和 JWT 令牌。
⑤ 获取当前活跃用户
我们想要仅当此用户处于启用状态时才能获取 current_user。 因此,我们创建了一个额外的依赖项 get_current_active_user,而该依赖项又以 get_current_user 作为依赖项。
如果用户不存在或处于未启用状态,则这两个依赖项都将仅返回 HTTP 错误。 因此,在我们的端点中,只有当用户存在,身份认证通过且处于启用状态时,我们才能获得该用户:
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
我们在此处返回的值为 Bearer 的额外响应头 WWW-Authenticate 也是规范的一部分。 任何的 401「未认证」HTTP(错误)状态码都应该返回 WWW-Authenticate 响应头。 对于 bearer 令牌(我们的例子),该响应头的值应为 Bearer。 不过你可以忽略这个额外的响应头,没什么影响。
3.2 实际效果
① 完整代码
from typing import Union
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "fakehashedsecret1",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
}
app = FastAPI()
def fake_hash_password(password: str):
return "fakehashed" + password
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
hashed_password: str
def get_user(db: dict, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def fake_decode_token(token: str):
# This doesn't provide any security at all
# Check the next version
user = get_user(fake_users_db, token)
return user
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserInDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"} #access_token 将被识别为 token
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8080)
② 身份认证
http://127.0.0.1:8000/docs#/打开交互式文档:http://127.0.0.1:8000/docs#/
点击「Authorize」按钮。 使用以下凭证:
- 用户名:johndoe
- 密码:secret1
在系统中进行身份认证后,你将看到:
③ 获取本人的用户数据
现在执行 /users/me 路径的 GET 操作。
你将获得你的用户数据,如:
{
"username": "johndoe",
"email": "johndoe@example.com",
"full_name": "John Doe",
"disabled": false,
"hashed_password": "fakehashedsecret1"
}
如果你点击锁定图标并注销,然后再次尝试同一操作,则会得到 HTTP 401 错误:
{
"detail": "Not authenticated"
}
④ 未启用的用户
现在尝试使用未启用的用户,并通过以下方式进行身份认证:
- 用户名:alice
- 密码:secret2
然后尝试执行 /users/me 路径的 GET 操作。
你将得到一个「未启用的用户」错误,如:
{
"detail": "Inactive user"
}
现在你掌握了为你的 API 实现一个基于 username 和 password 的完整安全系统的工具。 唯一缺少的细节是它实际上还并不「安全」。下边你将看到如何使用一个安全的哈希密码库和 JWT 令牌。
4 OAuth2 实现密码哈希与 Bearer JWT 令牌验证
至此,我们已经编写了所有安全流,本章学习如何使用 JWT 令牌(Token)和安全密码哈希(Hash)实现真正的安全机制。
接下来,继续完善安全机制。
4.1 JWT
官方网址:JSON Web Tokens - jwt.io
JWT 即 JSON 网络令牌(JSON Web Tokens)。
JWT 是一种将 JSON 对象编码为没有空格,且难以理解的长字符串的标准。JWT 的内容如下所示:
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
JWT 字符串没有加密,任何人都能用它恢复原始信息。
但 JWT 使用了签名机制,接受令牌时,可以用签名校验令牌,那么别人将无法破解。
使用 JWT 创建有效期为一周的令牌。第二天,用户持令牌再次访问时,仍为登录状态。 令牌于一周后过期,届时,用户身份验证就会失败。只有再次登录,才能获得新的令牌。如果用户(或第三方)篡改令牌的过期时间,因为签名不匹配会导致身份验证失败。
📌 安装 python-jose
在 Python 中生成和校验 JWT 令牌:
pip install python-jose[cryptography]
Python-jose 需要安装配套的加密后端。 本教程推荐的后端是:pyca/cryptography,因为 Python-jose 支持 PyJWT 的所有功能,还支持与其它工具集成时可能会用到的一些其它功能。
4.2 流程
① 密码哈希与校验
📌 安装 passlib
Passlib 是处理密码哈希的 Python 包。 它支持很多安全哈希算法及配套工具。 本教程推荐的算法是 Bcrypt。 因此请先安装附带 Bcrypt 的 PassLib:
pip install passlib[bcrypt]
📌 哈希与校验
首先从 passlib 导入所需工具。 创建用于密码哈希和身份校验的 PassLib 上下文。
接下来,创建三个工具函数:
- 第一个函数用于校验接收的密码是否匹配存储的哈希值
- 第二个函数用于哈希用户的密码,也就是给密码加密
- 第三个函数用于根据用户名获取用户信息
- 第四个函数用于身份验证,并返回用户
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
② 处理 JWT 令牌
导入已安装的模块, 创建用于 JWT 令牌签名的随机密钥。 使用以下命令,生成安全的随机密钥:
openssl rand -hex 32
#比如:09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7
然后,把生成的密钥复制到变量 SECRET_KEY,我是自己定义了一个 "yinyusecuritykey"。
接着创建设置令牌过期时间的变量,定义令牌端点响应的 Pydantic 模型,创建生成新的访问令牌的工具函数。
from jose import JWTError, jwt
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
class Token(BaseModel):
access_token: str
token_type: str
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
③ 更新依赖项
更新 get_current_user 以接收与之前相同的令牌,但这里用的是 JWT 令牌。 解码并校验接收到的令牌,然后返回当前用户。
如果令牌无效,则直接返回 HTTP 错误。
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
#对token进行解码
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
📌 JWT sub 的技术细节
JWT 规范还包括 sub 键,值是令牌的主题。
该键是可选的,但要把用户标识放在这个键里,所以本例使用了该键。 除了识别用户与许可用户在 API 上直接执行操作之外,JWT 还可能用于其它事情,简单来说就是权限校验。
例如,识别汽车或博客。 接着,为实体添加权限,比如驾驶(汽车)或编辑(博客)。 然后,把 JWT 令牌交给用户(或机器人),他们就可以执行驾驶汽车,或编辑博客等操作。无需注册账户,只要有 API 生成的 JWT 令牌就可以。
④ 更新 /token 路径操作
用令牌过期时间创建 timedelta 对象。
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
4.3 实际效果
① 完整代码
#pip install python-jose,pip install passlib[bcrypt]
from datetime import datetime, timedelta
from typing import Union
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "yinyusecuritykey"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$Iy1FxkhNXVtoJjDfTVTo9erntJ2FFFPJKZbQH3FcBFijS/zNCzGK.",
"disabled": False,
}
}
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Union[str, None] = None
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
hashed_password: str
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
app = FastAPI()
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
return [{"item_id": "Foo", "owner": current_user.username}]
if __name__ == '__main__':
# import uvicorn
# uvicorn.run(app, host="127.0.0.1", port=8000)
print(get_password_hash("yinyusecurity")) #此为“yinyusecurity”加密后的 👇,然后放在fake_users_db中
#$2b$12$Iy1FxkhNXVtoJjDfTVTo9erntJ2FFFPJKZbQH3FcBFijS/zNCzGK.
② 检查
运行服务器并访问文档: http://127.0.0.1:8000/docs,可以看到如下用户界面:
用与上一章同样的方式实现应用授权。 使用如下凭证:
- 用户名: johndoe
- 密码: yinyusecurity
调用 /users/me/ 端点,收到下面的响应:
{
"username": "johndoe",
"email": "johndoe@example.com",
"full_name": "John Doe",
"disabled": false
}
打开浏览器的开发者工具,可查看数据是怎么发送的,而且数据里只包含了令牌。
验证用户的第一个请求会发送密码,并获取访问令牌,但之后不会再发送密码,就只会校验访问令牌了。
注意,请求中 Authorization 响应头的值以 Bearer 开头。
③ 小结
至此,您可以使用 OAuth2 和 JWT 等标准配置安全的 FastAPI 应用。
几乎在所有框架中,处理安全问题很快都会变得非常复杂。 有些包为了简化安全流,不得不在数据模型、数据库和功能上做出妥协。而有些过于简化的软件包其实存在了安全隐患。
FastAPI 不向任何数据库、数据模型或工具做妥协, 开发者可以灵活选择最适合项目的安全机制。