python 异步任务框架 Celery 入门,速看!

news2025/1/11 4:19:06

一、简介

Celery 是使用 python 编写的分布式任务调度框架。

它有几个主要的概念:

celery 应用

  • 用户编写的代码脚本,用来定义要执行的任务,然后通过 broker 将任务发送到消息队列中

broker

  • 代理,通过消息队列在客户端和 worker 之间进行协调。

  • celery 本身并不包含消息队列,它支持一下消息队列

    RabbitMQ

    Rdis

    Amazon SQS

    Zookeeper

  • 更多关于 Broker 见官方文档(末尾点击阅读原文)

backend

  • 数据库,用来存储任务返回的结果。

worker

  • 工人,用来执行 broker 分派的任务。

任务

  • 任务,定义的需要执行的任务

版本要求

Celery5.1 要求:

  • python(3.6,3.7,3.8)

Celery 是一个资金最少的项目,所以我们不支持 Microsoft Windows。

更多更详细的版本要求见官方文档

安装

使用 pip 安装:

pip install -U Celery

 如果你想学习自动化测试,我这边给你推荐一套视频,这个视频可以说是B站播放全网第一的接口自动化测试教程,同时在线人数到达1000人,并且还有笔记可以领取及各路大神技术交流:798478386      

【已更新】B站讲的最详细的Python接口自动化测试实战教程全集(实战最新版)_哔哩哔哩_bilibili【已更新】B站讲的最详细的Python接口自动化测试实战教程全集(实战最新版)共计200条视频,包括:1.【接口自动化】目前软件测试的市场行情以及测试人员能力标准。、2.【接口自动化】全面熟练Requests库以及底层方法调用逻辑、3.【接口自动化】接口自动化实战及正则和JsonPath提取器的应用等,UP主更多精彩视频,请关注UP账号。https://www.bilibili.com/video/BV17p4y1B77x/?spm_id_from=333.337&vd_source=488d25e59e6c5b111f7a1a1a16ecbe9a 

捆绑包

Celery 还定义了一组包,用于安装 Celery 和给定的依赖项。

可以在 pip 命令中实现中括号来指定这些依赖项。

pip install "celery[librabbitmq]"
pip install "celery[librabbitmq,redis,auth,msgpack]"

二、简单使用

1. 选择一个 broker

使用 celery 首先需要选择一个消息队列。安装任意你熟悉的前面提到的 celery 支持的消息队列。

2. 编写一个 celery 应用

首先我们需要编写一个 celery 应用,它用来创建任务和管理 wokers,它要能够被其他的模块导入。

创建一个tasks.py 文件:


from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):   
    return x + y

第一个参数tasks是当前模块的名称,它可以省略,建议以当前模块名为名称。

第二个关键字参数 broker='redis://localhost:6379/0'指定我们使用 Redis 作为消息队列,并指定连接地址。

3.运行 celery 的 worker 服务

cd 到 tasks.py 所在目录,然后运行下面的命令来启动 worker 服务

celery -A tasks worker --loglevel=INFO

4. 调用任务

>>> from tasks import add
>>> add.delay(4,4)

通过调用任务的 delay 来执行对应的任务。celery 会把执行命令发送到 broker,broker 再将消息发送给 worker 服务来执行,如果一切正常你将会在 worker 服务的日志中看到接收任务和执行任务的日志。

5. 保存结果

如果你想要跟踪任务的状态以及保存任务的返回结果,celery 需要把它发送到某个地方。celery 提供多种结果后端。

我们这里以 reids 为例,修改 tasks.py中的代码,添加一个 Redis 后端。

app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

更多结果后端见官方文档。(末尾点击阅读原文)

重新启动 worker 服务,重新打开 python 解释器

>>> from tasks import add>>> result = add.delay(4,4)

ready()方法返回任务是否执行完成:

>>> result.ready()False

还可以等待结果完成,但很少使用这种方法,因为它将异步调用转换为同步调用​​​​​​​

>>> result.get(timeout=1)8

三、在应用中使用 celery

创建项目

项目结构:

proj/__init__.py
    /celery.py
    /tasks.py

proj/celery.py

from celery import Celery

app = Celery('proj',
            broker='redis://localhost:6379/0',
            backend='redis://localhost:6379/1',             
            include=['proj.tasks']

)# 配置
app.conf.update(
   result_expires=3600, # 结果过期时间
)

在这个模块中我们创建了一个 Celery 模块。要在你的项目中使用 celery 只需要导入此实例。

proj/tasks.py


from .celery import app


@app.task
def add(x, y): 
   return x + y


@app.task
def mul(x, y):
   return x * y


@app.tas
kdef xsum(numbers)
    return sum(numbers)

启动 worker

celery -A proj worker -l INFO

调用任务​​​​​​​

>>> from proj.tasks import add>>> add.delay(2, 2)

四、在 django 中使用celery

要在你的 django 项目中使用 celery,首先需要定义一个 Celery 的实例。

如果你又 django 项目如下:

- proj/ 
 - manage.py
 - proj/ 
   - __init__.py
   - settings.py
   - urls.py

那么推荐的方法是创建一个新的proj/proj/celery.py模块来定义芹菜实例:file:proj/proj/celery.py


import os

from celery import Celery

# 为`celery`设置默认的django设置模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE','proj.settings')

app = Celery('proj')

# 设置配置来源
app.config_from_object('django.conf:settings',namespace='CELERY')

# 加载所有的已注册django应用中的任务
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self): 
   print(f'Request: {self.request!r}')

然后你需要在你的 proj/proj/__init__.py模块中导入这个应用程序。这样就可以保证 Django 启动时加载应用程序,以便于 @shared_task 装饰器的使用。

proj/proj/__init__.py:

from .celery import app as celery_app__all__ = ('celery_app',)

请注意,此示例项目布局适用于较大的项目,对于简单的项目,可以使用包含定义应用程序和任务的单个模块。

接下来我们来解释一下 celery.py 中的代码,首先,我们设置celery命令行程序的环境变量DJANGO_SETTINGS_MODULE的默认值:

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

这一行的作用是加载当前 django 项目的环境设置,特别是当需要在异步任务中用到 ORM。它必须在创建应用程序实例之前。

app = Celery('proj')

我们还添加了 Django 设置模块作为 Celery 的配置源。这意味着我们不必使用多个配置文件,而是直接在 Django 的配置文件中配置 Celery。

app.config_from_object('django.conf:settings', namespace='CELERY')

大写命名空间意味着所有Celery配置项必须以大写指定,并以 CELERY_ 开头,因此例如broker_url 设置变为 CELERY_BROKER_URL。

例如,Django 项目的配置文件可能包括:

settings.py​​​​​​​

CELERY_TIMEZONE = "Asia/Shanghai"CELERY_TASK_TRACK_STARTED = TrueCELERY_TASK_TIME_LIMIT = 30*60

接下来,可重用应用程序的常见做法是在单独的tasks.py模块中定义所有任务Celery有一种方法可以自动发现这些模块:

app.autodiscover_tasks()

使用上面的行,Celery 将按照tasks.py 约定自动从所有已安装的应用程序中发现任务:

- app1/   - tasks.py   - models.py- app2/   - tasks.py   - models.py

这样就不必手动将各个模块添加到CELERY_IMPORTS 设置中。

使用 @shared_task 装饰器

我们编写的任务可能会存在于可重用的应用程序中,而可重用的应用程序不能依赖与项目本身,因此无法直接导入 celery 应用实例。

@shared_task装饰器可以让我们无需任何具体的 celery 实例创建任务:demoapp/tasks.py

# Create your tasks here

from demoapp.models import Widget

from celery import shared_task


@shared_task
def add(x, y):
   return x + y


@shared_task
def mul(x, y):
   return x * y


@shared_task
def xsum(numbers):
   return sum(numbers)


@shared_task
def count_widgets(): 
   return Widget.objects.count()


@shared_task
def rename_widget(widget_id, name):
   w = Widget.objects.get(id=widget_id)
   w.name = name
   w.save()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/876347.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java有哪些应用领域?

Java是一种广泛使用的编程语言,它在全球范围内拥有数百万的开发者,Java既简单又强大,它可以用于开发各种类型的应用程序,从桌面应用程序到大型企业级系统。Java主要应用于金融、电商、嵌入式、大数据技术等领域。 1.金融行业。许多…

【校招VIP】产品深入分析之电商运营

考点介绍: 电商类产品运营分析是面试里经常遇到的问题,作为产品经理,需要对主流电商,比如淘宝、京东、拼多多等模式、打法有深入的理解。 在分析问题的时候,可以从场景(如618)、用户群体和模式、…

大专同事,7天开发了一套应用管理系统

目录 一、前言 二、如何基于工具实现应用开发? 三、低代码基本功能及操作 体验过程: 01、连接数据源 02、设计表单 03、流程设计 04、图表呈现 05、组织架构设置 五、效率评价 六、小结 一、前言 众所周知,每家公司在发展过程中都需要构建大量…

ChatGPT: 提升程序员开发效率的秘密武器!

引言 在现代软件开发中,时间和效率显得尤为重要。程序员们需要在尽可能短的时间内编写高质量的代码,并使之处于状态良好的维护周期。为满足这些需求,人工智能技术逐渐成为软件开发的一项核心能力。ChatGPT作为自然语言生成模型中的佼佼者&am…

uniapp开发微信小程序底部地区选择弹框

个人项目地址: SubTopH前端开发个人站 (自己开发的前端功能和UI组件,一些有趣的小功能,感兴趣的伙伴可以访问,欢迎提出更好的想法,私信沟通,网站属于静态页面) SubTopH前端开发个人站…

你选的产品真的适合做独立站吗?用这5种方法测试一下吧!

跨境电商行业圈里流行一句话“七分靠选品,三分靠运营”,一个独立站新手卖家要想在跨境电商这个庞大的行业体系里分得一块蛋糕,不深入了解一些选品的技巧恐怕很难做出成绩来。 很多独立站赚不到钱的原因在于选品上的失败,如果你的…

蓝绿灰度发布的介绍

背景介绍 蓝绿灰度发布 介绍 蓝绿部署中,一共有两套系统:一套是正在提供服务系统(也就是上面说的旧版),标记为绿色;另一套是准备发布的系统,标记为蓝色。两套系统都是功能完善的,并且正在运行的系统&…

个微CRM管理系统,都有哪些功能?

微信CRM管理系统是一种优化整个公司的工作流程以改善客户管理的工具。 不仅可以提升微信CRM管理系统的创建,还可以提升现有客户的质量。 那么很多人就想问,微信CRM管理系统都有哪些功能呢? 1、多个微信号聚合聊天,解决微信来回…

通达信接口怎么调用?(通达信量化接口)准确吗?

通达信(DXFeed/TDX)接口是一个常用的股票行情数据接口,在中国股市数据分析和量化交易中应用广泛。下面我将介绍一种通达信接口的调用方式,但请注意,该方式可能存在变化和不准确之处,具体的调用方式还需参考官方文档或其他可靠来源…

Redis数据结构——字典

字典是一种用来保存键值对的数据结构。 在字典中,一个key与一个value相对应,字典中的key是唯一的。 在Redis中字典使用哈希表作为底层实现,用数组来表示一个哈希表,每个元素都是一对key-value 同样,在Redis中字典由三…

【笔试题心得】关于KMP在笔试中的题型

好几家都考到KMP了 问的比较多的是 next数组 , 其实KMP的相关机制我在代码随想录算法训练营第九天|KMP算法_菜鸟的Zoom之旅的博客-CSDN博客中写道过,现在在复习一下,由于next数组的定义其实会有所歧义(有些程序中会直接将前缀表作…

C++专题--标准模板库STL

c专题-标准模板库STL 1 标准模板库概述 2 序列式容器 2.1 vector 容器 2.2 deque 容器 2.3 list 容器 3 关联式容器 4 无序关联容器 5 容器适配器 5.1 STL容器适配器的种类 5.2 stack容器适配器 5.3 queue容器适配器 5.3 priority_queue容器适配器…

Mac RN环境搭建

RN ios android原生环境搭建有时候是真恶心,电脑环境不一样配置也有差异。 我已经安装官网的文档配置了ios环境 执行 npx react-nativelatest init AwesomeProject 报错 然后自己百度查呀执行 gem update --system 说是没有权限,执行失败。因为Mac…

SQL Server2019安装后使用SQL Server身份验证登录失败

错误情况 今天在电脑安装SQL Server2019和SMMS,安装过程一切顺利,但是在使用SMMS连接数据库时出现了异常。使用"Window 身份验证"登录时正常,但是如果改为使用"SQL Server 身份验证"登录时却连接失败! 解决方…

两张图搞定前端面试特别常重要的知识点:defer和async的区别

渲染引擎解析<script>的过程 <script>标签上有defer或async属性&#xff0c;脚本就会异步加载。渲染引擎遇到这一行命令&#xff0c;就会开始下载外部脚本&#xff0c;但不会等它下载和执行&#xff0c;而是直接执行后面的命令&#xff1b;默认情况是渲染引擎遇到…

微信开发之一键扫码入群的技术实现

好友将群二维码发送给机器人&#xff0c;机器人调用本接口将自动识别入群 请求URL&#xff1a; http://域名地址/scanJoinRoom 请求方式&#xff1a; POST 请求头Headers&#xff1a; Content-Type&#xff1a;application/jsonAuthorization&#xff1a;login接口返回 …

MS2692宽带低噪声放大器

MS2692 是一款宽带低噪声放大器&#xff0c;工作频率 0.45GHz  5.0GHz 。 具有高线性度、低噪声、带内增益平坦等特点。在 0.85GHz  4.0GHz 频带内&#xff0c;增益波动小于 3dB 。在 0.85GHz  5.0GHz 频带内&#xff0c;噪声系数 小于 1.2dB 。内部集成偏…

文件批量改名高手:轻松删除文件名,仅保留编号!

您是否经常需要对大量文件进行命名调整&#xff1f;是否为繁琐的手动操作而感到厌烦&#xff1f;现在&#xff0c;我们的智能批量文件改名工具为您提供了一种简单而高效的解决方案&#xff01;只需几步操作&#xff0c;您就能轻松删除原有的文件名&#xff0c;仅保留编号&#…

Oracle-如何判断字符串包含中文字符串(汉字),删除中文内容及保留中文内容

今天遇见一个问题需要将字段中包含中文字符串的筛选出来 --建表 CREATE TABLE HADOOP1.AAA ( ID VARCHAR2(255) ); --添加字段INSERT INTO HADOOP1.AAA(ID)VALUES(理解);....--查询表内容SELECT * FROM HADOOP1.AAA;在网上查找了一下有以下三种方式&#xff1a; 第一种&#…

新的 Python URL 解析漏洞可能导致命令执行攻击

Python URL 解析函数中的一个高严重性安全漏洞已被披露&#xff0c;该漏洞可绕过 blocklist 实现的域或协议过滤方法&#xff0c;导致任意文件读取和命令执行。 CERT 协调中心&#xff08;CERT/CC&#xff09;在周五的一份公告中说&#xff1a;当整个 URL 都以空白字符开头时&…