Django高级扩展之celery使用

news2024/11/17 10:00:45

Celery是一个简单、灵活、可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具。是一个专注于实时处理的任务队列,同时还支持任务调度。

目录

应用场景

问题

解决

celery架构图

安装

配置celery

Settings.py配置

创建celery

修改__init__

开启celery

异步执行

创建任务文件

视图

路由

演示

启动django

重启celery

浏览器访问

获取执行结果

AsyncResult属性和方法

演示

视图

路由

浏览器访问

Celery报错

kombu.async.timer

cannot import name 'current_app'

AttributeError

定时执行

增加配置

多个任务执行

创建执行方法

启动celery

绑定任务

实现绑定

实现错误重试

celery管理和监控

安装

运行命令

运行flower

浏览器访问

总结

参考文章


应用场景

问题

1.用户发起请求,需要等待响应返回;但是在视图中如果有一些耗时操作,可能导致用户会等待很长时间,这样用户体验非常不好。

2.网站隔一段时间要同步一次数据,但是http请求是需要触发的。

解决

使用celery来解决:耗时的操作方法在celery中异步执行;还可以使用celery定时执行。

 

celery架构图

celery由以下四部分构成:

任务(Task)、代理(Broker)、任务执行(Worker)、结果存储(Backend)。

 

安装

命令如下:

pip install celery
pip install redis

# window下安装,linux下不需要
pip install eventlet

 

配置celery

Settings.py配置

Settings.py中在最下面配置

# celery配置
# Broker配置,使用Redis作为消息中间件
BROKER_URL = 'redis://127.0.0.1:6379/1'
# 若有密码这样配置
# BROKER_URL = 'redis://:pwd@127.0.0.1:6379/1'

# BACKEND配置,这里使用redis
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
# 若有密码这样配置
# CELERY_RESULT_BACKEND = 'redis://:pwd@127.0.0.1:6379/1'

# 结果序列化方案
CELERY_RESULT_SERIALIZER = 'json'

# 任务结果过期时间,秒
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24

# 时区配置
CELERY_TIMEZONE = 'Asia/Shanghai'

# 指定导入的任务模块,可以指定多个
# CELERY_IMPORTS = (
#    'other_dir.tasks',
# )

 

创建celery

在工程目录下的project目录下创建celery.py文件。

内容如下:

import os
from celery import Celery
from django.conf import settings


# 设置系统环境变量,安装django,必须设置,否则在启动celery时会报错

# project 是当前工程目录名
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')

celery_app = Celery('project')
celery_app.config_from_object('django.conf:settings')
celery_app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

修改__init__

在工程目录下project的__init__文件中添加

from .celery import celery_app

__all__ = ['celery_app']

开启celery

Window命令如下:

celery  -A project worker  -l debug -P eventlet

pip 安装eventlet,在启动celery的参数加上eventlet,

原理是windows不支持celery的进程执行。

Linux 命令如下:

celery  -A project worker  -l

成功截图如下:

Redis数据库 

 

异步执行

创建任务文件

在子应用目录下创建tasks.py来执行任务(写耗时异步执行)

内容如下:

from celery import shared_task
import time



@shared_task
def add(x, y):
    print('添加一个事件')
    time.sleep(10)
    print(x, y)
    print('事件执行完成')

视图

应用视图views.py中增加引入和视图方法

from .tasks import *


def task_add(request):
    add.delay(100, 200)
    return HttpResponse(f'调用函数结果')

路由

urlpatterns = [
    # celery
    path(r'task_add', views.task_add, name='task_add'),
]

演示

启动django

python manage.py runserver

重启celery

celery  -A project worker  -l debug -P eventlet

浏览器访问

然后浏览器访问task路由。

 通过worker的控制台,可以看到任务被worker处理。

获取执行结果

可通过AsyncResult对象通过返回的事件id来获取事件信息。

AsyncResult属性和方法

state: 返回任务状态,等同status;

task_id: 返回任务id;

result: 返回任务结果,同get()方法;

ready(): 判断任务是否执行以及有结果,有结果为True,否则False;

info(): 获取任务信息,默认为结果;

wait(t): 等待t秒后获取结果,若任务执行完毕,则不等待直接获取结果,若任务在执行中,则wait期间一直阻塞,直到超时报错;

successful(): 判断任务是否成功,成功为True,否则为False;

演示

视图

from celery import result
from django.http import JsonResponse


def task_info(request):

    task_id = request.GET.get('task_id')

    res = result.AsyncResult(task_id)
    if res.ready():
        return JsonResponse({'status': res.state, 'result': res.get()})
    else:
        return JsonResponse({'status': res.state, 'result': ''})

路由

path(r'task_info/<str:id>', views.task_info, name='task_info'),

浏览器访问

地址栏传入新增事件时,返回的事件id

 

Celery报错

kombu.async.timer

开启celery,提示错误Kombu.async.timer import Entry......

解决方法:

卸载celery,重装固定版本

命令如下:

pip install celery==4.3

 

cannot import name 'current_app'

cannot import name 'current_app' from 'celery'

解决方法:

打开D:\python3.7\lib\site-packages\celery\local.py"

原方法

def getappattr(path):
    """Get attribute from current_app recursively.

    Example: ``getappattr('amqp.get_task_consumer')``.

    """
    from celery import current_app
    return current_app._rgetattr(path)

修改为:

def getappattr(path):

    """Get attribute from current_app recursively.

    Example: ``getappattr('amqp.get_task_consumer')``.


    """
    # from celery import current_app
    from celery._state import current_app
    return current_app._rgetattr(path)

AttributeError

AttributeError: 'EntryPoints' object has no attribute 'get'

解决方法:

安装固定版本的importlib-metadata

命令如下:

pip install importlib-metadata==4.13.0

 

定时执行

增加配置

工程目录下/settings.py

CELERYBEAT_SCHEDULE = {
    'every_5_seconds': {
        # 任务路径 应用目录/任务文件/方法
        'task': 'myapp.tasks.schedule_execute',
        # 每5秒执行一次
        'schedule': 5,
        # 设置参数
        'args': (14, 6, 5)
    }
}

多个任务执行

可在原来基础上继续增加

CELERYBEAT_SCHEDULE = {
    'every_5_seconds': {
        # 任务路径 应用目录/任务文件/方法
        'task': 'myapp.tasks.schedule_execute',
        # 每5秒执行一次
        'schedule': 5,
        # 设置参数
        'args': (14, 6, 5)
    },
    'every_10_seconds': {
        # 任务路径 应用目录/任务文件/方法
        'task': 'myapp.tasks.schedule_execute',
        # 每10秒执行一次
        'schedule': 10,
        # 设置参数
        'args': (18, 10, 10)
    },
}

创建执行方法

在task.py中设置执行方法并记录日志

from celery import shared_task
import logging

logger = logging.getLogger(__name__)


@shared_task
def schedule_execute(x, y, s):
    logger.info('_____' * 20)
    logger.info('每%d秒执行一次' % s)
    logger.info('%d + %d = %d' % (x, y, (x+y)))
    logger.info('执行结束')
    logger.info('_____' * 20)

启动celery

(两个cmd)分别启动worker和beat

celery -A project worker -l debug -P eventlet
celery beat -A project -l debug

可通过控制台查看执行结果。

绑定任务

Celery可通过task绑定到实例获取到task的上下文,这样我们可以在task运行时候获取到task的状态,记录相关日志等

实现绑定

修改应用任务文件内容:在装饰器中加入参数 bind=True,在add函数中的第一个参数设置为self。

内容如下:

@shared_task(bind=True)
def add(self, x, y):

    print('添加一个事件')
    time.sleep(2)
    print(x, y)
    logger.info(self.name)
    logger.info(dir(self))
    print('事件执行完成')

实现错误重试

self对象是myapp.tasks.add实例,可用于实现重试。

@shared_task(bind=True)
def add(self, x, y):

    try:
        print('添加一个事件')
        time.sleep(2)
        print(x, y)
        logger.info(self.name)
        # 没有state属性 只是为了验证重试
        logger.info(self.state)
        logger.info(dir(self))
        print('事件执行完成')

    except Exception as e:

        # 出错每5秒尝试一次,尝试3次
        self.retry(exc=e, countdown=5, max_retries=3)

 

celery管理和监控

celery通过flower组件实现管理和监控功能。

Flower官网

Getting started — Flower 2.0.0 documentation

安装

pip install flower

运行命令

celery -A project flower

Or

celery -A project flower --port=5001

参数说明

-A 项目名称

--port 端口号,默认5555

运行flower

运行命令,显示如下:

浏览器访问

http://127.0.0.1:port

 

总结

本文主要介绍了celery的应用场景;

如何安装及安装哪些类库;

异步和定时执行实现以及任务可视化管理。

参考文章

https://www.cnblogs.com/chunyouqudongwuyuan/p/16892475.html

Django 中celery的使用_django celery_宠乖仪的博客-CSDN博客

在django中使用celery_哔哩哔哩_bilibili

一文读懂 Python 分布式任务队列 celery

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/682387.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CVE-2022-25099

文章目录 CVE-2022-25099一、漏洞介绍二、渗透步骤1、打开网站2、登录后台3、文件上传4、查看flag值 CVE-2022-25099 一、漏洞介绍 WBCE CMS v1.5.2 RCE。 WBCE CMS v1.5.2 /language/install.php 文件存在漏洞&#xff0c;攻击者可精心构造文件上传造成RCE。 二、渗透步骤 1…

宝塔面板实用教程(1):10分钟部署在线客服系统

客服系统发布以来&#xff0c;一直有朋友询问如何在宝塔面板中安装部署&#xff0c;开始我一直认为参考 Linux 版的安装教程就可以了&#xff0c;一直没有专门写宝塔环境的教程。这段时间来咨询的朋友越来越多&#xff0c;经过了解&#xff0c;我才知道宝塔面板的普及率有多高&…

SSM流浪动物救助网站-计算机毕设 附源码82131

SSM流浪动物救助网站 摘 要 随着生活水平的持续提高和家庭规模的缩小&#xff0c;宠物已经成为越来越多都市人生活的一部分&#xff0c;随着宠物的增多&#xff0c;流浪的动物的日益增多&#xff0c;中国的流浪动物领养和救助也随之形成规模&#xff0c;同时展现巨大潜力。本次…

XML文件

xml文件 类似于html那种标签语言 但是用途却大不一样&#xff0c;xml一般用于小型数据传输&#xff08;存储数据&#xff09; xml文件作用 xml语法规则 一个简单的xml文件案例 xml解析 解析思想 所谓的xml解析 也就是从xml文件提取数据 解析思想&#xff1a;前端的文档对…

【C++实现】RPC框架的简单实现介绍

文章目录 介绍为什么使用protobufprotobuf的service rpc 框架发布方的上层使用逻辑框架的提供方逻辑Rpc调用方框架实现客户端上层框架使用重新梳理简单聊一下RpcController 引入缓冲区队列zookeeper总结 介绍 以下博客覆盖内容&#xff1a; 集群和分布式概念原理&#xff1b; …

安科瑞智能照明系统在绿色建筑中的应用与产品选型

【摘要】&#xff1a;智能照明系统应用在智能建筑中不仅能营造出舒适的生活、工作环境以及现代化的管理方式而且要具有一定的节能效果。给出了智能照明和传统照明系统的比较并分析了智能照明系统的节能。 【关键字】&#xff1a;智能建筑&#xff1b;智能照明&#xff1b;节能…

什么样的口译是好的?交传和同传的评估标准是什么?

众所周知&#xff0c;交传和同传是口译的两种高级形式&#xff0c;难度比较大&#xff0c;一般应用于国际会议、商务洽谈、重大新闻发布会等领域。那么&#xff0c;如何做好交传和同传翻译&#xff0c;什么样的口译是好的&#xff0c;交传和同传的评估标准是什么&#xff0c;北…

密码学obe软件思路

匿名通信与暗网研究综述 匿名通信系统是一种建立在Internet之上综合利用数据转发、内容加密、流量混淆等多种技术隐藏通信实体关系和内容的覆盖网络.由于匿名通信主体难以被追踪定位,匿名通信网络中各类匿名滥用问题层出不穷,而其中隐藏服务机制更是被用于构建充斥着各种非法…

Integer移位算法

常用移位算法 给定值最高位1的权重给定值最低位1的权重给定值高位连续零的个数给定值低位连续零个数 给定值最高位1的权重 也就是给定值左侧。返回给定值左侧最大的2的次幂 //获取i最高位1代表的2次幂&#xff0c;最高位1代表的权值public static int highestOneBit(int i) …

轻量化技术 | 大面积模型秒加载、TB级数据处理能力、多平台便捷展示应用...

实景三维技术的发展日新月异&#xff0c;但在应用上却一直无法深入&#xff0c;尤其是在大场景三维模型展示与应用过程中&#xff0c;不可避免地会遇到占内存、渲染慢、加载卡顿、模型塌陷等情况&#xff0c;这是由于数据格式类型多、内存占比大、模型体量大。 对此&#xff0…

C++ STL关联式容器和无序容器(哈希容器)

文章目录 5.2 C STL关联式容器5.2.1 C STL map容器5.2.2 C STL multimap容器5.2.3 C STL set容器5.2.2 C STL multiset容器 5.3 C STL无序容器&#xff08;哈希容器&#xff09; 参考 5.2 C STL关联式容器 关联式容器在存储元素值的同时&#xff0c;会为各元素额外再配备一个值…

让你不再疑惑PDF转Excel怎么操作

你是否曾经遇到过需要编辑PDF表格的情况&#xff0c;但却不知道如何进行转换&#xff1f;不用担心&#xff0c;现在有一些简单的方法可以将PDF表格转换为Excel&#xff0c;让你轻松地编辑和修改表格。如果你还不知道PDF表格转Excel怎么转的话&#xff0c;那么接下来三种实用的小…

摆脱传统模式,快速构建系统只需要7步~

摆脱传统模式&#xff0c;快速构建系统只需要7步~ 首先我们来说说用“低代码开发平台”来进行开发是一种怎样的感受&#xff1f;它可以帮助您更快、更可靠地提供价值。通过在可视化设计器中&#xff0c;以拖拽的方式快速构建应用程序&#xff0c;您可以跳过基础架构以及可能会让…

聚观早报 | TikTok首席运营官离职;AMDR7 7840HS轻薄本开始上市

今日要闻&#xff1a;TikTok首席运营官离职&#xff1b;AMDR7 7840HS轻薄本开始上市&#xff1b;美国5月AI招聘岗位增加约20%&#xff1b;荷兰传最收紧ASML对华出口限制&#xff1b;力斯SERES5完成欧洲首批交付 TikTok首席运营官离职 当地时间 6 月 22 日&#xff0c;TikTok 首…

LangChain 基于 Prompts 提示管理 构建特定领域模型

一、langChain Prompts 上篇文章对 langChain 进行了简单的介绍与使用&#xff0c;可以知道LangChain 是一个由语言模型LLMs驱动的应用程序框架&#xff0c;使用 LangChain 可以极大的降低开发成本&#xff0c;本篇文章主要基于 LangChain 中的 Prompts 提示管理实现特定领域的…

【软件设计师暴击考点】知识产权高频考点暴击系列

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;软件…

Linux conda 环境迁移 服务器之间在线迁移

网上很多方法语焉不详&#xff0c;本文章主要介绍conda list方式进行Linux系统在线环境迁移&#xff0c;迁移完毕后需要手动安装缺失的python库&#xff0c;负责环境不推荐此种方式迁移。 在Linux系统之间进行单一环境迁移&#xff1a;从服务器A迁移到服务器B 服务器A&#xf…

磷酸铁锂电池应用前景广阔,英集芯响应市场推出IP2366电源管理芯片

正极材料是锂电池的核心材料之一&#xff0c;其性能直接影响锂电池的能量密度、安全性、寿命和应用等&#xff0c;占电池总材料成本中的比例超过30%。目前行业内常见的锂离子电池正极材料主要可分为磷酸铁锂&#xff08;LFP&#xff09;、三元材料&#xff08;NCM、NCA&#xf…

使用全志方案遇到glibc库版本低以及编译报错的解决方法

Glibc 包含了linux一些主要的C库&#xff0c;用于分配内存、搜索目录、打开关闭文件、读写文件、字串处理、模式匹配、数学计算等&#xff0c;在遇到glibc库版本低编译还报错的情况时&#xff0c;遵循以下步骤解决 参考文章&#xff1a;如何编译glibc库 make工具 注意由于AW…

Flask基础及常见问题整理

一、Flask框架介绍 使用python开发后端&#xff0c;目前主流的框架就是Flask和Django。其中&#xff0c;Flask是一款轻量级的Python web框架,有以下主要特点: 1. 简单易用 Flask很简单易用&#xff0c;可以快速上手开发web应用。它只依赖Werkzeug和Jinja2两个库&#xff0c;…