因为项目是后期引入celery,所以导致构建docker的时候只有fastapi的项目,celery的重启比较麻烦
1.docker安装celery
pip install celery
安装celery的时候注意python版本与celery版本的适配,有些celery的版本不支持python的版本,具体的版本请看celery官网里面的版本信息
2.在工程目录中创建celery的启动文件,这里我创建的是tasks.py 文件
from celery import Celery
celery_app = Celery("worker",
broker="redis://:frasergen2022@192.168.2.189:26379/0",
backend="redis://:frasergen2022@192.168.2.189:26379/0",
include=["apps.tools.my_celery.__init__"]
)
这是celery的启动文件,里面最好不要引入你的项目里面的变量,如果引入你项目里的变了,后期去定义后台任务的时候,引入celery_app会陷入到循环引用的深坑
3.测试celery
celery -A tasks.celery_app worker --loglevel=info
4.可以后台启动celery
celery multi start w1 -A tasks.celery_app -l info --logfile=celerylog.log
这里不用启动,后续会用看门狗(watchdog)监控任务文件,如果文件修改,会重启celery
5.安装看门狗(watchdog)
pip install watchdog
参考链接:https://whoosy.cn/2019/08/01/Celery/celery使用/
后台启动watchdog
nohup watchmedo auto-restart --directory=/data/cloud_platform/apps/tools/my_celery/ --pattern=*.py --recursive -- celery -A tasks.celery_app worker --loglevel=info --logfile=celerylog.log > watchmedo.log 2> watchmedo.elog &
–directory : 监控路径
–pattern: 监控文件后缀
6.配置celery后台任务
1.配置tortoise-orm数据库连接
import asyncio
from tortoise import Tortoise
from celery.signals import worker_process_init, worker_process_shutdown
from apps.models import User
from tasks import celery_app
from apps.tools.db_config import ORM_LINK_CONF
async def init_db():
await Tortoise.init(
config=ORM_LINK_CONF
)
@worker_process_init.connect
def on_worker_init(*args, **kwargs):
print('初始化数据库')
from celery._state import _task_stack
if _task_stack.top is not None:
loop = _task_stack.top.request.loop
else:
loop = asyncio.get_event_loop()
loop.run_until_complete(init_db())
@worker_process_shutdown.connect
def on_worker_shutdown(*args, **kwargs):
print('关闭数据库')
from celery._state import _task_stack
if _task_stack.top is not None:
loop = _task_stack.top.request.loop
else:
loop = asyncio.get_event_loop()
loop.run_until_complete(Tortoise.close_connections())
2.配置后台任务
@celery_app.task(name="get_user_task")
def get_user_task(*args, **kwargs):
asyncio.get_event_loop().run_until_complete(_get_user_task())
async def _get_user_task():
user = await User.filter().all()
for item in user:
print(item.nickname, item.username)
print(f"{item.phone=}")
代码上传到docker后,celery会重启
7.测试接口+后台异步任务
@router.get("/get_user_info", summary="测试后台任务获取用户信息")
async def get_user_info():
get_user_task.delay()
return res()
结果: