源码见:"fastapi_study_road-learning_system_online_courses: fastapi框架实战之--在线课程学习系统"
接上一篇文章FastAPI(六十六)实战开发《在线课程学习系统》接口开发--用户注册接口开发。这次我们分享实际开发--用户登陆接口开发。
我们先来梳理下逻辑:
1.查询用户是否存在
2.校验密码是否正确
3.密码校验失败记录失败次数
4.60分钟内失败次数大于等于3次,60分钟内不能登陆
5.密码校验通过产生对应的token返回
接着我们去设置pydantic登录参数校验模型,同样添加到user_schemas.py中
class UserLogin(UserBase):
"""登录校验模型"""
password: str = Field(min_length=8, max_length=16)
这里我们继承的是之前的UserBase。
对应操作数据库查询用户的逻辑我们使用之前注册的时候使用的get_by_username即可。
我们把密码输入失败和token的值放在redis中,那么redis对应的配置,我们在搭建架构时已经配置好了,都放在了.env中:
ENV = "DEV"
# mysql
MYSQL_HOST = "10.30.10.36"
MYSQL_PORT = 3306
MYSQL_USERNAME = "root"
MYSQL_PASSWORD = "123456"
MYSQL_DB_DEV = "learn_onsite_system_dev"
MYSQL_DB_TEST = "learn_onsite_system_test"
MYSQL_DB_PRO = "learn_onsite_system_pro"
# redis
REDIS_HOST = "10.30.10.36"
REDIS_PORT = "6379"
REDIS_DB = "0"
而且redis初始化相关逻辑之前我是放在了mian.py主文件中,今天我将其单独提取出来维护
"""
-*- encoding=utf-8 -*-
Time: 2024/7/22 16:02
Author: lc
Email: 15101006331@163.com
File: redis.py
"""
from aioredis import Redis, create_redis_pool
from settings.config import REDIS_CONFIG
async def create_redis() -> Redis:
return await create_redis_pool(
f"redis://:@{REDIS_CONFIG['host']}:{REDIS_CONFIG['port']}/{REDIS_CONFIG['db']}?encoding=utf-8")
再将其导入到main.py中
from middlewares.redis import create_redis
@app.on_event("startup")
async def startup_event():
app.state.redis = await create_redis()
print("init redis success")
create_tables()
print("init database success")
init_roles()
print("init roles success")
@app.on_event("shutdown")
async def shutdown_event():
app.state.redis.close()
await app.state.redis.wait_closed()
print("redis closed")
我们把token相关配置也配置进去
ENV = "DEV"
# mysql
MYSQL_HOST = "10.30.10.36"
MYSQL_PORT = 3306
MYSQL_USERNAME = "root"
MYSQL_PASSWORD = "123456"
MYSQL_DB_DEV = "learn_onsite_system_dev"
MYSQL_DB_TEST = "learn_onsite_system_test"
MYSQL_DB_PRO = "learn_onsite_system_pro"
# redis
REDIS_HOST = "10.30.10.36"
REDIS_PORT = "6379"
REDIS_DB = "0"
# TOKEN
SECRET_KEY = "08d25e094faa6ca2556c819756bhj9563b93f7099f6f0f4xxd6cf93b33e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
那么产生token的代码如何实现呢
from jose import JWTError, jwt
from settings.config import TOKEN_CONFIG
def create_access_token(data: dict):
"""产生token"""
to_encode = data.copy()
encoded_jwt = jwt.encode(to_encode, TOKEN_CONFIG["secret_key"], algorithm=TOKEN_CONFIG["algorithm"])
return encoded_jwt
接下来就是根据逻辑去实现具体的登录逻辑了,在user_method.py中增加如下方法:
async def verify_login(request: Request, user: UserLogin, db: Session):
logger.info("登录开始了")
db_user = get_by_username(db, user.username)
if not db_user:
logger.warning(f"用户:’{user.username}‘ 不存在")
return response(code=100205, message="用户不存在")
verify = verify_password(user.password, db_user.password)
if verify:
redis_user = await request.app.state.redis.get(user.username)
if not redis_user:
try:
token = create_access_token(data={"sub": user.username})
except:
logger.warning(f"method verify_login error: {format_exc()}")
return response(code=100203, message="生产token失败")
await request.app.state.redis.set(user.username, token, expire=TOKEN_CONFIG["access_token_expire_time"])
return response()
return response(code=100202, message="重复登录")
else:
error_key = user.username + "_password"
result = await request.app.state.redis.hgetall(error_key, encoding="utf-8")
# 没有查到认为是第一次出现错误,将次数设置为1,时间设置为当前时间
if not result:
current_time = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")
await request.app.state.redis.hmset_dict(error_key, num=1, time=current_time)
return response(code=100206, message="密码错误")
# 查到则不是第一次,要分多重情况
else:
error_num = int(result["num"])
num_time = (datetime.now() - datetime.strptime(result["time"], "%Y-%m-%d %H:%M:%S")).seconds / 60
# 60分钟内错误没达到3次,错误次数加1
if error_num < 3 and num_time < 60:
error_num += 1
await request.app.state.redis.hmset_dict(error_key, num=error_num)
return response(code=100206, message="密码错误")
# 超60分钟没有达到3次,错误次数重置为1,时间设置为当前
elif error_num < 3 and num_time > 60:
error_num = 1
num_time = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")
await request.app.state.redis.hmset_dict(error_key, num=error_num, time=num_time)
return response(code=100206, message="密码错误")
# 60分钟内错误超过3次,错误次数加1,限制60分钟内不可以登录
elif error_num >= 3 and num_time < 60:
error_num += 1
await request.app.state.redis.hmset_dict(error_key, num=error_num)
return response(code=100204, message="输入密码错误次数过多,账号暂时锁定,请60分钟后再来登录")
# 超60分钟,如果再次输错,将错误次数重置为1,时间设置为当前时间
else:
error_num = 1
num_time = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")
await request.app.state.redis.hmset_dict(error_key, num=error_num, time=num_time)
return response(code=100206, message="密码错误")
接下来,在user.py中增加我们的登录接口
@user_router.post("/login", summary="登录")
async def login(request: Request, user: UserLogin, db: Session = Depends(create_db)):
return await verify_login(request, user, db)
测试:
①:成功
②:密码错误
③:60分钟内连续3次登录错误
至此,我们的登录接口就完成了