Celery介绍:
核心及优点:1.基于分布式系统架构(负载均衡避免单点故障,高可用) 2.实现了异步任务的调度(快速) 只需要通过配置文件的修改就可以实现架构的切换所以灵活
django-celery-beat 用于定时和周期计划
django-celery-results 用于存储celery的运行结果
folower 用于监控celery的运行状态
使用方法:
1.安装库
pip install celery
pip install redis
2.在项目setting.py同级目录下创建celery.py的文件,添加如下内容
import os
import django
from celery import Celery
from django.conf import settings
# 设置系统环境变量,安装django,必须设置,否则在启动celery时会报错
# djangoProject1.settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoProject1.settings')
django.setup()
celery_app = Celery('djangoProject1')
celery_app.config_from_object('django.conf:settings')
celery_app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
3.在settings同目录下的__init__.py文件下添加如下:
from .celery import celery_app
__all__ = ['celery_app']
4.配置settings
# django-celery 配置的部分
# Broker配置,使用Redis作为消息中间件
BROKER_URL = 'redis://106.13.1.144:6379/0'
# BACKEND配置,这里使用redis
CELERY_RESULT_BACKEND = 'redis://106.13.1.144:6379/0'
# 结果序列化方案
CELERY_RESULT_SERIALIZER = 'json'
# 任务结果过期时间,秒
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
# 时区配置
CELERY_TIMEZONE = 'Asia/Shanghai'
# 指定导入的任务模块,可以指定多个
# CELERY_IMPORTS = (
# 'other_dir.tasks',
# )
5.settings同目录创建任务文件tasks.py
from celery import shared_task
@shared_task
def add(x, y):
return x + y
@shared_task
def mul(x, y):
return x * y
@shared_task
def xsum(numbers):
return sum(numbers)
6.views下调用任务
from django.shortcuts import render , redirect,HttpResponse
from djangoProject1.tasks import *
# Create your views here.
def task_add_view(request):
add.delay(100, 200)
return HttpResponse('调试调试调试djcelery')
7. 安装eventlet
pip install eventlet
8.启动celery
9.celery -A djangoProject1 worker -l debug -P eventlet
-A/--app 要使用的应用程序实例
-n/--hostname 设置自定义主机名
-Q/--queues 指定一个消息队列,该进程只接受此队列的任务
--max-tasks-per-child 配置工作单元子进程在被一个新进程取代之前可以执行的最大任务数量
--max-memory-per-child 设置工作单元子进程被替换之前可以使用的最大内存
-l/--loglevel 定义打印log的等级 DEBUG, INFO, WARNING, ERROR, CRITICAL, FATAL
--autoscale 池的进程的最大数量和最小数量
-c/--concurrency 同时处理任务的工作进程数量,默认值是系统上可用的cpu数量
-B/--beat 定义运行celery打周期任务调度程序
-h/--help
通过网页请求来调用celery执行任务
报错情况:
使用redis时,有可能会出现如下类似的异常
AttributeError: 'str' object has no attribute 'items'
这是由于版本差异,需要卸载已经安装的python环境中的 redis 库,重新指定安装特定版本(celery4.x以下适用 redis2.10.6, celery4.3以上使用redis3.2.0以上):
所以根据自己的情况安装对应版本
10.获取任务结果
在 views.py 中,通过 AsyncResult.get() 获取结果
from celery import result
from django.http import JsonResponse
def get_result_by_taskid(request):
task_id = request.GET.get('task_id')
# 异步执行
ar = result.AsyncResult(task_id)
if ar.ready():
return JsonResponse({'status': ar.state, 'result': ar.get()})
else:
return JsonResponse({'status': ar.state, 'result': ''})
AsyncResult类的常用的属性和方法:
state: 返回任务状态,等同status;
task_id: 返回任务id;
result: 返回任务结果,同get()方法;
ready(): 判断任务是否执行以及有结果,有结果为True,否则False;
info(): 获取任务信息,默认为结果;
wait(t): 等待t秒后获取结果,若任务执行完毕,则不等待直接获取结果,若任务在执行中,则wait期间一直阻塞,直到超时报错;
successful(): 判断任务是否成功,成功为True,否则为False;
二、定时任务
1.settings.py
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
'mul_every_30_seconds': {
# 任务路径
'task': 'celery_app.tasks.mul',
# 每30秒执行一次
'schedule': 5,
'args': (14, 5)
}
}
说明(更多内容见文档:Periodic Tasks — Celery 5.2.0b3 documentation):
task:任务函数
schedule:执行频率,可以是整型(秒数),也可以是timedelta对象,也可以是crontab对象,也可以是自定义类(继承celery.schedules.schedule)
args:位置参数,列表或元组
kwargs:关键字参数,字典
options:可选参数,字典,任何 apply_async() 支持的参数
relative:默认是False,取相对于beat的开始时间;设置为True,则取设置的timedelta时间
在task.py中设置了日志
from celery import shared_task
import logging
logger = logging.getLogger(__name__))
@shared_task
def mul(x, y):
logger.info('___mul__'*10)
return x * y
2.启动celery
(两个cmd)分别启动worker和beat
celery -A worker celery_study -l debug -P eventlet
celery beat -A celery_study -l debug
3.任务绑定
Celery可通过task绑定到实例获取到task的上下文,这样我们可以在task运行时候获取到task的状态,记录相关日志等
方法:
- 在装饰器中加入参数 bind=True
- 在task函数中的第一个参数设置为self
在task.py 里面写
from celery import shared_task
import logging
logger = logging.getLogger(__name__)
# 任务绑定
@shared_task(bind=True)
def add(self,x, y):
logger.info('add__-----'*10)
logger.info('name:',self.name)
logger.info('dir(self)',dir(self))
return x + y
其中:self对象是celery.app.task.Task的实例,可以用于实现重试等多种功能
from celery import shared_task
import logging
logger = logging.getLogger(__name__)
# 任务绑定
@shared_task(bind=True)
def add(self,x, y):
try:
logger.info('add__-----'*10)
logger.info('name:',self.name)
logger.info('dir(self)',dir(self))
raise Exception
except Exception as e:
# 出错每4秒尝试一次,总共尝试4次
self.retry(exc=e, countdown=4, max_retries=4)
return x + y
启动celery
celery -A worker celery_study -l debug -P eventlet
4.任务钩子
Celery在执行任务时,提供了钩子方法用于在任务执行完成时候进行对应的操作,在Task源码中提供了很多状态钩子函数如:on_success(成功后执行)、on_failure(失败时候执行)、on_retry(任务重试时候执行)、after_return(任务返回时候执行)
方法:通过继承Task类,重写对应方法即可,
from celery import Task
class MyHookTask(Task):
def on_success(self, retval, task_id, args, kwargs):
logger.info(f'task id:{task_id} , arg:{args} , successful !')
def on_failure(self, exc, task_id, args, kwargs, einfo):
logger.info(f'task id:{task_id} , arg:{args} , failed ! erros: {exc}')
def on_retry(self, exc, task_id, args, kwargs, einfo):
logger.info(f'task id:{task_id} , arg:{args} , retry ! erros: {exc}')
# 在对应的task函数的装饰器中,通过 base=MyHookTask 指定
@shared_task(base=MyHookTask, bind=True)
def add(self,x, y):
logger.info('add__-----'*10)
logger.info('name:',self.name)
logger.info('dir(self)',dir(self))
return x + y
启动celery
celery -A worker celery_study -l debug -P eventlet
5.任务编排
在很多情况下,一个任务需要由多个子任务或者一个任务需要很多步骤才能完成,Celery也能实现这样的任务,完成这类型的任务通过以下模块完成:
- group: 并行调度任务
- chain: 链式任务调度
- chord: 类似group,但分header和body2个部分,header可以是一个group任务,执行完成后调用body的任务
- map: 映射调度,通过输入多个入参来多次调度同一个任务
- starmap: 类似map,入参类似*args
- chunks: 将任务按照一定数量进行分组
文档:Next Steps — Celery 5.2.0b3 documentation
1.group
urls.py:
path('primitive/', views.test_primitive),
views.py:
from .tasks import *
from celery import group
def test_primitive(request):
# 创建10个并列的任务
lazy_group = group(add.s(i, i) for i in range(10))
promise = lazy_group()
result = promise.get()
return JsonResponse({'function': 'test_primitive', 'result': result})
说明:
通过task函数的 s 方法传入参数,启动任务
上面这种方法需要进行等待,如果依然想实现异步的方式,那么就必须在tasks.py中新建一个task方法,调用group,示例如下:
tasks.py:
@shared_task
def group_task(num):
return group(add.s(i, i) for i in range(num))().get()
urls.py:
path('first_group/', views.first_group),
views.py:
def first_group(request):
ar = tasks.group_task.delay(10)
return HttpResponse('返回first_group任务,task_id:' + ar.task_id)
2.chain
默认上一个任务的结果作为下一个任务的第一个参数
def test_primitive(request):
# 等同调用 mul(add(add(2, 2), 5), 8)
promise = chain(tasks.add.s(2, 2), tasks.add.s(5), tasks.mul.s(8))()
# 72
result = promise.get()
return JsonResponse({'function': 'test_primitive', 'result': result})
3.chord
任务分割,分为header和body两部分,hearder任务执行完在执行body,其中hearder返回结果作为参数传递给body
def test_primitive(request):
# header: [3, 12]
# body: xsum([3, 12])
promise = chord(header=[tasks.add.s(1,2),tasks.mul.s(3,4)],body=tasks.xsum.s())()
result = promise.get()
return JsonResponse({'function': 'test_primitive', 'result': result})
6、celery管理和监控
celery通过flower组件实现管理和监控功能 ,flower组件不仅仅提供监控功能,还提供HTTP API可实现对woker和task的管理
官网:flower · PyPI
文档:Flower - Celery monitoring tool — Flower 1.0.1 documentation
安装flower
pip install flower
启动flower
flower -A celery_study --port=5555
说明:
- -A:项目名
- --port: 端口号
访问
在浏览器输入:http://127.0.0.1:5555
通过api操作
curl http://127.0.0.1:5555/api/workers