最近项目中需要用到 JWT 登录验证,学习了一下,在此记录。
首先安装依赖
pip install Flask-HTTPAuth pyjwt passlib
Welcome to Flask-HTTPAuth’s documentation! — Flask-HTTPAuth documentation
Welcome to PyJWT — PyJWT 2.8.0 documentation
Passlib 1.7.4 documentation — Passlib v1.7.4 Documentation
实现验证 token 的方法:
from dependency_injector.wiring import inject, Provide
from flask_httpauth import HTTPTokenAuth
import jwt
from flask import current_app
# Token-auth handler configured for the "Bearer" scheme; routes opt in to
# authentication by applying its `login_required` decorator.
auth_provider = HTTPTokenAuth(scheme='Bearer')
@auth_provider.verify_token
def verify_token(token):
    """Validate a bearer JWT and resolve it to the matching user.

    Args:
        token: The raw token string extracted from the request by
            ``auth_provider``.

    Returns:
        The user object returned by the repository when the token is valid
        and the user exists; ``False`` otherwise (expired or otherwise
        invalid token, missing ``username`` claim, or unknown user).
    """
    # NOTE(review): `self` is not defined in this module-level function; the
    # snippet appears to have been lifted out of a class or an injected
    # context. Wire `secret_key`, `algorithm` and `user_repository` in from
    # the real configuration/container before using this code.
    try:
        payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
    except jwt.exceptions.ExpiredSignatureError:
        # Must come before InvalidTokenError, which is its base class.
        current_app.logger.error('====== JWT Already Expired ======')
        return False
    except jwt.exceptions.InvalidTokenError:
        # Base class of every token-validation failure (bad signature,
        # malformed token, wrong algorithm, not-yet-valid token, ...), so no
        # rejected token can escape as an unhandled exception.
        current_app.logger.error('====== Decode JWT Failed ======')
        return False

    # A correctly signed token may still lack the claim we rely on.
    username = payload.get('username')
    if not username:
        current_app.logger.error('====== JWT has no username claim ======')
        return False

    try:
        return self.user_repository.get_by_name(username)
    except EntityNotFoundError:
        current_app.logger.error(f'====== Login user not found, name : {username} ======')
        return False
这样一来,只需要引入前面定义的 auth_provider,并在需要登录后才能访问的 API 上加上 @auth_provider.login_required 装饰器即可。auth_provider 这个变量名是自己取的,你定义成什么名字就用什么名字。之后访问该 API 时,会自动取出请求中的 JWT 进行验证。
访问时需要在请求头(headers)中以 Authorization 为 key,以 `Bearer <token>`(Bearer 与 token 之间用一个空格分隔)为 value。Bearer 这个前缀是在定义 auth_provider = HTTPTokenAuth(scheme='Bearer') 时通过 scheme 参数指定的。
从下面的登录方法可以看到,密码验证通过之后会生成 JWT 并返回;前端拿到这个 JWT 之后,需要把它放在之后每次请求的 Authorization 请求头中。
from flask import Blueprint, jsonify, request,current_app
from dependency_injector.wiring import inject, Provide
from main.containers import Container
from main.security.jwt_filter import auth_provider
import datetime
from passlib.apps import custom_app_context as pwd_context
# Blueprint that groups the authentication routes registered below.
auth_bp=Blueprint('auth',__name__)
@auth_bp.route('/login', methods=['POST'])
def login():
    """Authenticate a user by username/password and issue a JWT.

    Expects a JSON object body with ``username`` and ``password`` keys.

    Returns:
        A ``(response, status)`` tuple:
        * 200 with ``{'token': ...}`` when the credentials match,
        * 400 when the username or password is missing,
        * 401 when the user is unknown or the password does not match.
    """
    data = request.get_json()
    # A JSON body of `null`, a list, or a scalar has no `.get`; treat it the
    # same as a body with no credentials instead of crashing.
    if not isinstance(data, dict):
        data = {}
    username = data.get('username')
    password = data.get('password')
    if not username or not password:
        current_app.logger.error('Login failed with missing username or password')
        return jsonify({'error': 'Missing username or password'}), 400

    # NOTE(review): `self` and `session` are not defined in this view
    # function; the snippet appears to have been lifted out of a class or an
    # injected context. Supply the repository, DB session and token factory
    # from the real container before using this code.
    try:
        user = self.user_repository.get_by_name(username, session)
    except EntityNotFoundError:
        current_app.logger.error(f'====== Login user not found, name : {username} ======')
        # NOTE(review): distinct messages for "unknown user" and "wrong
        # password" let a caller probe which usernames exist; kept unchanged
        # here because clients may depend on the current response bodies.
        return jsonify({'error': 'User does not exist'}), 401

    if not pwd_context.verify(password, user.password):
        current_app.logger.error(f'{username} login failed.')
        return jsonify({'error': 'Username and password does not match'}), 401

    # Named `token` rather than `jwt` so the PyJWT module name is not shadowed.
    token = self._generate_token(user.user_id, username)
    return jsonify({'token': token}), 200
@auth_bp.route('/account', methods=['GET'])
@auth_provider.login_required
def account():
    """Return the authenticated user's response payload with status 200.

    Access is guarded by ``auth_provider.login_required``; the body is
    whatever the current user's ``to_response()`` produces.
    """
    authenticated_user = auth_provider.current_user()
    body = authenticated_user.to_response()
    return body, 200
def _generate_token(self, user_id, username) -> str:
    """Build a signed JWT access token for the given user.

    The token is signed with ``self.secret_key`` using ``self.algorithm`` --
    the same attribute names the token-verification code reads -- instead of
    a key literal embedded in source, so signing and verification cannot
    drift apart and the secret stays out of version control.

    Args:
        user_id: Identifier stored in the ``sub`` claim.
        username: Name stored in the ``username`` claim.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    # Read the clock once so `iat` and `exp` are derived from the same instant.
    issued_at = datetime.datetime.now(datetime.UTC)
    expires_at = issued_at + datetime.timedelta(minutes=self.access_token_expire_minutes)
    payload = {
        'exp': expires_at,  # expiry time
        'iat': issued_at,   # issued-at time
        'sub': user_id,
        'username': username,
    }
    # NOTE(review): the key previously hard-coded here has been published and
    # should be treated as compromised; configure a fresh `secret_key`.
    return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
JWT 的过期时间可以根据需求自行定义(上面的代码通过 access_token_expire_minutes 配置,单位为分钟)。
生成密码哈希的方式如下:
from passlib.apps import custom_app_context as pwd_context
# Seed users for testing. Passwords are stored as passlib hashes, never as
# plaintext. `hash()` replaces `encrypt()`, which passlib deprecated in 1.7.
TEST_USERS_ARRAY = [
    {
        "id": uuid.uuid1().hex,
        "username": "admin",
        "password": pwd_context.hash("admin"),
    },
    {
        "id": uuid.uuid1().hex,
        "username": "user1",
        "password": pwd_context.hash("user1"),
    },
]
from flask import jsonify
from sqlalchemy import Column, String
from main.db_access.domain.common_entity import CommonEntity
class User(CommonEntity):
    """ORM entity mapped to the ``TBL_AA_USER`` table."""

    __tablename__ = "TBL_AA_USER"

    # Primary key.
    user_id = Column(String, primary_key=True)
    username = Column(String)
    # Stored exactly as passed in; presumably a password hash rather than
    # plaintext -- verify against the code that creates users.
    password = Column(String)

    def __init__(self, user_id, username, password):
        """Populate the three mapped columns from the given values."""
        self.user_id = user_id
        self.username = username
        self.password = password

    def to_dict(self):
        """Return the public fields as a dict; the password is left out."""
        return dict(user_id=self.user_id, username=self.username)

    def to_response(self):
        """Wrap ``to_dict()`` in a Flask JSON response."""
        resp = jsonify(self.to_dict())
        resp.headers['Content-Type'] = 'application/json'
        return resp