django中使用celery

news2025/1/23 3:59:32

Celery介绍:

核心及优点:1.基于分布式系统架构(负载均衡避免单点故障,高可用) 2.实现了异步任务的调度(快速)  只需要通过配置文件的修改就可以实现架构的切换所以灵活

django-celery-beat 用于定时和周期计划

django-celery-results 用于存储celery的运行结果

folower 用于监控celery的运行状态

使用方法:

1.安装库

pip install celery

pip install redis

2.在项目setting.py同级目录下创建celery.py的文件,添加如下内容

import os
import django
from celery import Celery
from django.conf import settings

# 设置系统环境变量,安装django,必须设置,否则在启动celery时会报错
# djangoProject1.settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoProject1.settings')
django.setup()

celery_app = Celery('djangoProject1')
celery_app.config_from_object('django.conf:settings')
celery_app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

3.在settings同目录下的__init__.py文件下添加如下:

from .celery import celery_app

__all__ = ['celery_app']

4.配置settings

# django-celery  配置的部分
# Broker配置,使用Redis作为消息中间件
BROKER_URL = 'redis://106.13.1.144:6379/0'

# BACKEND配置,这里使用redis
CELERY_RESULT_BACKEND = 'redis://106.13.1.144:6379/0'

# 结果序列化方案
CELERY_RESULT_SERIALIZER = 'json'

# 任务结果过期时间,秒
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24

# 时区配置
CELERY_TIMEZONE = 'Asia/Shanghai'

# 指定导入的任务模块,可以指定多个
# CELERY_IMPORTS = (
#    'other_dir.tasks',
# )

5.settings同目录创建任务文件tasks.py

from celery import shared_task


@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)

6.views下调用任务

from django.shortcuts import render , redirect,HttpResponse
from djangoProject1.tasks import *

# Create your views here.

def task_add_view(request):
    add.delay(100, 200)
    return HttpResponse('调试调试调试djcelery')

7. 安装eventlet

pip install eventlet

8.启动celery

9.celery -A djangoProject1 worker -l debug -P eventlet

-A/--app                要使用的应用程序实例
-n/--hostname          设置自定义主机名
-Q/--queues                指定一个消息队列,该进程只接受此队列的任务
--max-tasks-per-child  配置工作单元子进程在被一个新进程取代之前可以执行的最大任务数量
--max-memory-per-child 设置工作单元子进程被替换之前可以使用的最大内存
-l/--loglevel          定义打印log的等级 DEBUG, INFO, WARNING, ERROR, CRITICAL, FATAL
--autoscale                池的进程的最大数量和最小数量
-c/--concurrency       同时处理任务的工作进程数量,默认值是系统上可用的cpu数量
-B/--beat              定义运行celery打周期任务调度程序
-h/--help  

通过网页请求来调用celery执行任务

报错情况:

使用redis时,有可能会出现如下类似的异常

AttributeError: 'str' object has no attribute 'items'

这是由于版本差异,需要卸载已经安装的python环境中的 redis 库,重新指定安装特定版本(celery4.x以下适用 redis2.10.6, celery4.3以上使用redis3.2.0以上):

所以根据自己的情况安装对应版本

10.获取任务结果

在 views.py 中,通过 AsyncResult.get() 获取结果

from celery import result
from django.http import JsonResponse
def get_result_by_taskid(request):
    task_id = request.GET.get('task_id')
# 异步执行
    ar = result.AsyncResult(task_id)
 
    if ar.ready():
        return JsonResponse({'status': ar.state, 'result': ar.get()})
    else:
        return JsonResponse({'status': ar.state, 'result': ''})

AsyncResult类的常用的属性和方法:

state: 返回任务状态,等同status;

task_id: 返回任务id;

result: 返回任务结果,同get()方法;

ready(): 判断任务是否执行以及有结果,有结果为True,否则False;

info(): 获取任务信息,默认为结果;

wait(t): 等待t秒后获取结果,若任务执行完毕,则不等待直接获取结果,若任务在执行中,则wait期间一直阻塞,直到超时报错;

successful(): 判断任务是否成功,成功为True,否则为False;

二、定时任务

1.settings.py

from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
    'mul_every_30_seconds': {
         # 任务路径
        'task': 'celery_app.tasks.mul',
         # 每30秒执行一次
        'schedule': 5,
        'args': (14, 5)
    }
}

说明(更多内容见文档:Periodic Tasks — Celery 5.2.0b3 documentation):

task:任务函数

schedule:执行频率,可以是整型(秒数),也可以是timedelta对象,也可以是crontab对象,也可以是自定义类(继承celery.schedules.schedule)

args:位置参数,列表或元组

kwargs:关键字参数,字典

options:可选参数,字典,任何 apply_async() 支持的参数

relative:默认是False,取相对于beat的开始时间;设置为True,则取设置的timedelta时间

在task.py中设置了日志

from celery import shared_task
import logging  
logger = logging.getLogger(__name__))
 
 
@shared_task
def mul(x, y):
    logger.info('___mul__'*10)
    return x * y

2.启动celery

(两个cmd)分别启动worker和beat

celery -A worker celery_study -l debug -P eventlet
celery beat -A celery_study -l debug

3.任务绑定

Celery可通过task绑定到实例获取到task的上下文,这样我们可以在task运行时候获取到task的状态,记录相关日志等

方法:

  • 在装饰器中加入参数 bind=True
  • 在task函数中的第一个参数设置为self

在task.py 里面写

from celery import shared_task
import logging  
logger = logging.getLogger(__name__)
 
 
# 任务绑定
@shared_task(bind=True)
def add(self,x, y):
    logger.info('add__-----'*10)
    logger.info('name:',self.name)
    logger.info('dir(self)',dir(self))
    return x + y

其中:self对象是celery.app.task.Task的实例,可以用于实现重试等多种功能

from celery import shared_task
import logging  
logger = logging.getLogger(__name__)
 
 
# 任务绑定
@shared_task(bind=True)
def add(self,x, y):
    try:
        logger.info('add__-----'*10)
        logger.info('name:',self.name)
        logger.info('dir(self)',dir(self))
        raise Exception
    except Exception as e:
        # 出错每4秒尝试一次,总共尝试4次
        self.retry(exc=e, countdown=4, max_retries=4)    
    return x + y

启动celery

celery -A worker celery_study -l debug -P eventlet

4.任务钩子

Celery在执行任务时,提供了钩子方法用于在任务执行完成时候进行对应的操作,在Task源码中提供了很多状态钩子函数如:on_success(成功后执行)、on_failure(失败时候执行)、on_retry(任务重试时候执行)、after_return(任务返回时候执行)

方法:通过继承Task类,重写对应方法即可,

from celery import Task
 
class MyHookTask(Task):
 
    def on_success(self, retval, task_id, args, kwargs):
        logger.info(f'task id:{task_id} , arg:{args} , successful !')
 
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.info(f'task id:{task_id} , arg:{args} , failed ! erros: {exc}')
 
    def on_retry(self, exc, task_id, args, kwargs, einfo):
        logger.info(f'task id:{task_id} , arg:{args} , retry !  erros: {exc}')
 
 
 
 
# 在对应的task函数的装饰器中,通过 base=MyHookTask 指定
@shared_task(base=MyHookTask, bind=True)
def add(self,x, y):
    logger.info('add__-----'*10)
    logger.info('name:',self.name)
    logger.info('dir(self)',dir(self))
    return x + y

启动celery

celery -A worker celery_study -l debug -P eventlet

5.任务编排

在很多情况下,一个任务需要由多个子任务或者一个任务需要很多步骤才能完成,Celery也能实现这样的任务,完成这类型的任务通过以下模块完成:

  • group: 并行调度任务
  • chain: 链式任务调度
  • chord: 类似group,但分header和body2个部分,header可以是一个group任务,执行完成后调用body的任务
  • map: 映射调度,通过输入多个入参来多次调度同一个任务
  • starmap: 类似map,入参类似*args
  • chunks: 将任务按照一定数量进行分组

文档:Next Steps — Celery 5.2.0b3 documentation

1.group

urls.py:

path('primitive/', views.test_primitive),

views.py:

from .tasks import *
from celery import group
 
 
 
def test_primitive(request):
    # 创建10个并列的任务
    lazy_group = group(add.s(i, i) for i in range(10))
    promise = lazy_group()
    result = promise.get()
    return JsonResponse({'function': 'test_primitive', 'result': result})

说明:

通过task函数的 s 方法传入参数,启动任务

上面这种方法需要进行等待,如果依然想实现异步的方式,那么就必须在tasks.py中新建一个task方法,调用group,示例如下:

tasks.py:

@shared_task
def group_task(num):
    return group(add.s(i, i) for i in range(num))().get()

urls.py:

path('first_group/', views.first_group),

views.py:

def first_group(request):
ar = tasks.group_task.delay(10)
return HttpResponse('返回first_group任务,task_id:' + ar.task_id)

2.chain

默认上一个任务的结果作为下一个任务的第一个参数

def test_primitive(request):
# 等同调用  mul(add(add(2, 2), 5), 8)
promise = chain(tasks.add.s(2, 2), tasks.add.s(5), tasks.mul.s(8))()
#  72
result = promise.get()
return JsonResponse({'function': 'test_primitive', 'result': result})

3.chord

任务分割,分为header和body两部分,hearder任务执行完在执行body,其中hearder返回结果作为参数传递给body

def test_primitive(request):
# header:  [3, 12]
# body: xsum([3, 12])
promise = chord(header=[tasks.add.s(1,2),tasks.mul.s(3,4)],body=tasks.xsum.s())()
result = promise.get()
return JsonResponse({'function': 'test_primitive', 'result': result})

6、celery管理和监控

celery通过flower组件实现管理和监控功能 ,flower组件不仅仅提供监控功能,还提供HTTP API可实现对woker和task的管理

官网:flower · PyPI

文档:Flower - Celery monitoring tool — Flower 1.0.1 documentation

安装flower

pip install flower

启动flower

flower -A celery_study --port=5555

说明:

  • -A:项目名
  • --port: 端口号

访问

在浏览器输入:http://127.0.0.1:5555

通过api操作

curl http://127.0.0.1:5555/api/workers

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/603528.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Oracle中的循环

目录 一、简单循环 1.1LOOP 循环语法: 1.2LOOP 循环示例 二、for循环 2.1for循环语法: 2.2for循环示例 三、while循环 3.1while循环语法 3.2while循环示例 四、GOTO 循环 4.1GOTO 循环语法 4.2GOTO 循环示例 在 Oracle 数据库中,…

储能之动力电池与储能电池区别?

储能之动力电池与储能电池区别 1、概念1.1 动力电池1.2 储能电池 2、应用场景3、动力电池与储能电池的对比3.1 性能要求3.2 循环次数3.3 电池类型方面3.4 成本结构不同 1、概念 1.1 动力电池 动力电池即为工具提供动力来源的电源,多指为电动汽车、电动列车、电动自…

Oracle中ORA-12560:协议适配器错误

平时在长时间未登录Oracle数据库,再次登录时会出现如下错误: 当Oracle登录时出现12560协议适配器错误时,可以通过以下步骤尝试启动相应的服务: 第一步: 打开本地【服务】,点击最顶层的名称输入【O】&…

java-字符流和字节流(三)

java-字符流和字节流(三) 一、IO特殊操作流 1.1 标准流 1.1.1 标准输入流 System类中有两个静态的成员变量 public static final InputStream in:标准输入流。通常该流对应于键盘输入或由主机环境或用户指定的另一个输入源public static final PrintStream out&am…

【StringBuilder类】添加和反转方法以及StringBuilder和String相互转换

StringBuilder类 如果对字符串进行拼接操作,每次拼接都会构建一个新的String对象,既耗时又浪费内存空间,而这种操作还不可避免。我们可以通过Java提供的StringBuilder类来解决这个问题。StringBuilder是一个可变的字符串类,我们可…

java-基础语法(一)

java-基础语法(一) 一、java变量 1.1、注释 单行注释 // // 这是单行注释文字多行注释 /* *//* 这是多行注释文字 这是多行注释文字 这是多行注释文字 */ 注意:多行注释不能嵌套使用。1.2 常量 常量:在程序运行过程中,其值不可以发生改变的…

Arthas-monitor/watch/trace 相关命令使用

tip:作为程序员一定学习编程之道,一定要对代码的编写有追求,不能实现就完事了。我们应该让自己写的代码更加优雅,即使这会费时费力。 开头: 本章所有的命令都非常重要,都是使用率相当高的。 文章目录 moni…

python---列表

列表 1. 列表的介绍1.1 访问列表元素1.2 索引从0而不是1开始1.3 使用列表中的各个值1.4 修改、添加和删除元素1.4.1 修改列表元素 1.5 在列表中添加元素1.5.1 在列表末尾添加元素1.5.2 在列表中插入元素 1.6 从列表中删除元素1.6.1 使用方法pop()删除元素1.6.2 弹出列表中任何位…

【TreeSet集合】自然排序Comparator的使用

自然排序Comparator的使用 存储学生对象并遍历,创建TreeSet集合使用无参构造方法 要求:按照年龄从小到大排序,年龄相同时,按照姓名的字母顺序排序 创建学生类: package com.gather.set.treeset; public class Student…

【利用AI让知识体系化】前端开发学习了解业务架构

文章目录 I. 前端技术入门1.1 HTML/CSS/Javascript 简介1.2 前端框架 React/Vue/Angular 了解1.3 前端工具 Git/Webpack/npm/yarn 的使用1.4 前端调试和性能优化技巧 II. 开发综合应用2.1 工程化开发的全流程2.2 单页面应用 (SPA)2.3 数据交互和批量操作2.4 模块化和组件化开发…

压缩感知重构之匹配追踪算法

算法的重构是压缩感知中重要的一步,是压缩感知的关键之处。因为重构算法关系着信号能否精确重建,国内外的研究学者致力于压缩感知的信号重建,并且取得了很大的进展,提出了很多的重构算法,每种算法都各有自己的优缺点&a…

[golang 微服务] 4. gRPC介绍,Protobuf结合gRPC 创建微服务

一.gRPC框架的介绍 简介 gRPC是一个 高性能、 开源和 通用的 RPC 框架, 面向移动端和 HTTP/2 设计,目前提供 C、Java 和 Go语言版本,分别是:grpc, grpc-java, grpc-go,其中 C 版本支持 C, C, Node.js, Python, Ruby, Objective-C, PHP 和 C# …

Windows Pyqt5配置环境过程(pycharm Anaconda)

必要安装 Anaconda下载地址 Pycharm下载地址 这两个推荐2019年左右的版本就行了,安装的时候选择“add path” Anaconda换源 换源之后叉掉终端之后再创建环境 Anaconda常用命令 Anaconda换源应该是只对conda install 有用,pip还要换源 使用清华源进行…

java-集合

java-集合 一、集合体系结构 集合类的特点 ​ 提供一种存储空间可变的存储模型,存储的数据容量可以随时发生改变 集合类的体系图 ​ 二、单列集合 2.1 Collection集合(接口) Collection集合概述 是单列集合的顶层接口,它表示一…

汇编栈寄存器SS与SP使用

入栈时,栈段地址与偏移地址计算 使用a命令输入下面汇编,然后使用u命令查看 写入汇编指令到内存 修改CS:IP指向当前代码段 使用t命令执行汇编指令,详细执行如下图标号 注意每行指令执行后寄存器变化. 取内存段单元数据 将内存段单元数据送入寄存器, 多次送入数据到同一寄存…

chatgpt赋能python:Python分词处理的重要性

Python分词处理的重要性 随着互联网的飞速发展,大数据的普及与应用越来越广泛。人们需要从海量的数据中找到自己需要的信息。因此,自然语言处理技术被广泛应用,其中分词技术是自然语言处理中最基础的一项技术。 在这个领域中,Py…

JS逆向——借助playwright实现逆向

原理: 1.修改js文件,将加密方法设定为全局变量并调用。 2.使用playwright替换浏览器加载的js文件 3.在python中通过playwright实现js注入获取加密结果 实现: 以下面链接为例: Scrape | Movie 1.首先,要知道页面…

方法引用相关知识点

这里写目录标题 方法引用方法引用符简介代码演示 Lambda表达式支持的方法引用引用 类方法简介使用 引用对象的实例方法简介操作 引用类的实例方法简介具体代码 引用构造器简介代码演示 二级目录二级目录二级目录二级目录二级目录二级目录 方法引用 方法引用符 简介 注意 这里…

JDK SPI、Spring SPI、Dubbo SPI三种机制的细节与演化

JDK SPI、Spring SPI、Dubbo SPI三种机制的细节与演化 SPI机制 SPI机制的应用 JDBC中加载驱动 Spring SPI Dubbo SPI SPI深入理解 API与SPI的区别 ServiceLoader JDK SPI、Spring SPI、Dubbo SPI综合对比 SPI机制 Java SPI(Service Provider Interface&am…

Linux4.3Apache配置与应用

文章目录 计算机系统5G云计算第一章 LINUX Apache配置与应用及网页优化一、构建虚拟 Web 主机二、基于域名的虚拟主机1.为虚拟主机提供域名解析2.为虚拟主机准备网页文档3.添加虚拟主机配置4.设置访问控制5.Options指令解释6.AllowOverride指令解释7.地址限制策略8.加载独立的配…