提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
- 前言
- 一、准备
- 二、配置
- 1.引入库
- 2.代码编写
- a、在settings.py文件下添加如下代码
- b、在项目主目录下创建celery.py文件
- c、在项目的__init__.py里面添加如下代码![在这里插入图片描述](https://img-blog.csdnimg.cn/dddaae97b4f0426cb089c3c1455ef3ce.png)
- d、创建一个应用来调用异步任务,我这里创建名为app01的任务
- e、在应用目录下面创建任务task.py
- f、在app01下的views编写视图用于调用定时任务
- g、运行
- h、异步任务实验
- 三、定时任务实现
- 1、目标
- 2、代码编写
- a、在settings.py里面添加如下代码
- b、在app01/tasks.py里面添加如下代码
- c、运行
- 问题总结
前言
之前做项目的时候需要使用celery,因此研究了一下用法,现在记录下来供以后参考。
django的任务都是同步的,如果遇到一些比较耗时的任务,就会一直卡在任务里面,无法进去其他的操作,影响用户体验。所以需要使用celery异步执行耗时操作,在不影响用户操作的同时还能完成比较耗时的任务
在编写代码过程中遇到的一些错误会在文章末尾总结,遇到问题可以先参考一下。
一、准备
python:3.7
django:3.2.19
celery:5.2.7
redis:4.5.5
电脑上需要安装redis数据库
创建一个能运行的django项目
二、配置
我的项目目录结构如下
1.引入库
安装celery和redis
pip install celery
pip install redis
2.代码编写
a、在settings.py文件下添加如下代码
# Broker配置,使用Redis作为消息中间件
BROKER_URL = 'redis://127.0.0.1:6379/0'
# BACKEND配置,这里使用redis
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
# 结果序列化方案
CELERY_RESULT_SERIALIZER = 'json'
# 任务结果过期时间,秒
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
# 时区配置
CELERY_TIMEZONE = 'Asia/Shanghai'
# 指定导入的任务模块,可以指定多个
CELERY_IMPORTS = (
'app01.tasks',
)
b、在项目主目录下创建celery.py文件
from __future__ import absolute_import, unicode_literals
import os
import django
from celery import Celery
from django.conf import settings
# 设置系统环境变量,安装django,必须设置,否则在启动celery时会报错
# django_celery_demo 是当前项目名
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery_demo.settings')
django.setup()
celery_app = Celery('django_celery_demo')
celery_app.config_from_object('django.conf:settings')
celery_app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
c、在项目的__init__.py里面添加如下代码
from __future__ import absolute_import, unicode_literals
from .celery import celery_app
__all__ = ['celery_app']
d、创建一个应用来调用异步任务,我这里创建名为app01的任务
python manage.py startapp app01
e、在应用目录下面创建任务task.py
task.py
from time import sleep
from celery import shared_task
@shared_task
def async_task(x, y):
sleep(10) # 睡眠10秒,这里模拟耗时操作
print(x + y)
f、在app01下的views编写视图用于调用定时任务
views.py
from django.shortcuts import HttpResponse
from .tasks import async_task
# Create your views here.
def task_add_view(request):
async_task.delay(100, 200)
return HttpResponse("函数调用结果")
urls.py
from django.urls import path
from .views import task_add_view
urlpatterns = [
path('async_task/', task_add_view),
]
g、运行
运行定时任务,需要依赖eventlet,先下载eventlet
pip install eventlet
然后,启动redis服务器
输入下面命令启动异步任务,注意:django_celery_demo是项目名,请根据实际项目名输入
celery -A django_celery_demo worker -l debug -P eventlet
看到下面结果就说明能成功执行:
h、异步任务实验
运行django项目
这里我访问了三次该链接:
可以看到命令窗口中输出三次执行结果,每次执行结果10秒左右,说明任务运行成功,也没有阻塞主线程的执行。由于这里没有返回值,所以返回值为None(在秒数后面展示)
三、定时任务实现
定时任务也和简单,在前面的代码基础上添加代码即可
1、目标
每隔10秒往scheduled_task.txt中写入一段记录
2、代码编写
a、在settings.py里面添加如下代码
# 定时任务
CELERYBEAT_SCHEDULE = {
'mul_every_10_seconds': {
# 任务路径
'task': 'app01.tasks.scheduled_task',
# 每10s执行一次
'schedule': timedelta(seconds=10),
}
}
b、在app01/tasks.py里面添加如下代码
@shared_task(bind=True)
def scheduled_task(self):
with open("scheduled_task.txt", "a", encoding="utf-8") as f:
time_str = time.strftime("%Y年%m月%d日 %H时%M分%S秒", time.localtime())
data = "记录一条消息,时间:" + time_str + '\n'
f.write(data)
f.close()
print(time_str)
c、运行
先运行下面命令(就是上面执行异步任务的命令,如果上面运行之后没关闭的话直接进行下一步)
celery -A django_celery_demo worker -l debug -P eventlet
然后新开一个命令行窗口输入如下命令:
celery -A django_celery_demo beat -l debug
看到下面结果说明运行成功:
然后项目根路径会生成一个scheduled_task.txt文件,打开可以看到下面内容:
确实是每隔10秒执行一次。
问题总结
一、报错:ImportError: cannot import name ‘Celery’ from ‘celery’
解决:解决:将importlib-metadata包的版本降至4.13.0
二、报错:ImportError: cannot import name ‘current_app’ from ‘celery’
解决:安装django-celery-beat和django-celery-results包,我用的都是2.5.0版本