Celery的任务流

news2025/1/16 7:52:49

在这里插入图片描述

Celery的任务流

在之前调用任务的时候只是使用delay()和apply_async()方法。但是有时我们并不想简单的执行单个异步任务,比如说需要将某个异步任务的结果作为另一个异步任务的参数或者需要将多个异步任务并行执行,返回一组返回值,为了实现此目标,Celery使用一种叫做signatures的东西

celery的简单使用

signature的引入

signature官方文档

可以简单理解为signature是将之前的异步任务以某种方式包装,包装后的异步任务仍可以使用之前的delay()和apply_async()方法,并且包装后的异步任务就可以以多种方式组合成复杂的工作流程

先创建一个tasks.py

import time

import celery

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('app', backend=backend, broker=broker)


# 加
@app.task
def add_num(a, b):
    print(f'{a}+{b}')
    time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序
    c = a + b
    return c


# 减
@app.task
def subtract_num(a, b):
    print(f'{a}-{b}')
    time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序
    c = a - b
    return c


# 乘
@app.task
def multiply_num(a, b):
    print(f'{a}*{b}')
    time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序
    c = a * b
    return c


# 除
@app.task
def divide_num(a, b):
    print(f'{a}/{b}')
    time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序
    c = a / b
    return c


@app.task
def test(args):
    print(args)
    return args

运行celery消费者

# 格式为:celery -A app对象所在的文件 worker -l 日志级别 -Q 队列名称(也可以不指定,默认为celry)
celery -A tasks worker -l info -Q test

在这里插入图片描述

用signature对上面的add_num包装

from celery import signature
from tasks import add_num, subtract_num, multiply_num, divide_num

# 方法1
sign = add_num.signature((1, 1), queue='test')
ret = sign.delay()
print(ret.get())

# 方法2
sign = signature('tasks.add_num', (1, 1), queue='test')
ret = sign.delay()
print(ret.get())

# 方法3
sign = signature(add_num, (1, 1), queue='test')
ret = sign.delay()
print(ret.get())

chain的使用

chain官方链接

chain可以将signature包装的任务函数一个一个执行,一个执行完将执行return结果传递给下一个任务函数

from celery import signature, chain
from tasks import add_num, subtract_num, multiply_num, divide_num

add_sign = signature(add_num, (6, 2), queue='test')
subtract_sign = signature(subtract_num, (2,), queue='test')
multiply_sign = signature(multiply_num, (2,), queue='test')
divide_sign = signature(divide_num, (2,), queue='test')

# 对某个数依次做加减乘除处理
chain1 = chain(add_sign, subtract_sign, multiply_sign, divide_sign)
ret = chain1.delay()
print(ret.get())

在这里插入图片描述

可以看到异步任务依次执行,并将上一个异步任务的结果作为参数传递给下一个,形成一个链条

group的使用

group官方链接

group可以将signature包装的任务函数并行执行,返回一组返回值

from celery import signature, chain, group
from tasks import add_num, subtract_num, multiply_num, divide_num

add_sign = signature(add_num, (6, 2), queue='test')
subtract_sign = signature(subtract_num, (6, 2), queue='test')
multiply_sign = signature(multiply_num, (6, 2), queue='test')
divide_sign = signature(divide_num, (6, 2), queue='test')

# 对某个数分别做加减乘除处理

group1 = group(add_sign, subtract_sign, multiply_sign, divide_sign)
ret = group1.delay()
print(ret.get())
#[8, 4, 12, 3.0]

在这里插入图片描述
可以看到相比于chain,group里的任务是同时执行

chord的使用

chord官方链接
依赖一个group任务,group任务结束后,将所有子任务的返回值作为参数传递给chord的回调函数,即chord由group任务组与回调函数组成

上代码

from celery import signature, chain, group, chord
from tasks import add_num, subtract_num, multiply_num, divide_num, test

add_sign = signature(add_num, (6, 2), queue='test')
subtract_sign = signature(subtract_num, (6, 2), queue='test')
multiply_sign = signature(multiply_num, (6, 2), queue='test')
divide_sign = signature(divide_num, (6, 2), queue='test')

group1 = group(add_sign, subtract_sign, multiply_sign, divide_sign)

#包装test异步任务函数
test_sign = signature(test, queue='test')

c1 = chord(group1, test_sign)
c1.delay()

在这里插入图片描述
可以看出,在执行完加减乘除所有异步任务后,chord会将任务组的结果作为list交给test函数,这里的test有点像回调函数

PS:根据我的观察,chain,group,chord在执行完后都会返回一个任务id,其中chain的任务id为任务链里最后一个任务的id,group的任务id是一个临时的任务id(group任务都结束后就会消失),chord的任务id是回调函数的任务id。因此chain和chord在任务结束后,任务结果还是可以查到的,而group则查询不到,因此group的任务结果可能无法用AsyncResult查询到

最后附上celery关于任务工作流的官方链接
celery工作流

PS

有的时候我们可能需要在celery的task函数中调用其他的celery函数,并且需要同步的获取结果(其实着本质上就是把异步的celery函数变成同步运行),具体如下,先创建一个tasks.py

import time

import celery

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('app', backend=backend, broker=broker)


# 加
@app.task
def add_num(a, b):
    print(f'{a}+{b}')
    time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序
    c = a + b
    return c


@app.task
def test():#在test函数里调用add_num函数,并且同步获取结果,将结果作为test函数的返回值
    ret = add_num.delay(1,2)
    ret = ret.get()
    return ret
#启动消费者
celery -A tasks worker -l info

调用test异步函数

from tasks import test
ret = test.delay()

在这里插入图片描述

结果就是出错了,因为官方不建议在一个异步任务中区等待另一个异步任务的返回结果,所以这个时候就可以通过上面的chain方法实现这个需求。当然还有一种不建议的方法就是在同步获取celery任务结果的get方法中添加参数disable_sync_subtasks=False,具体如下

import time

import celery

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('app', backend=backend, broker=broker)


# 加
@app.task
def add_num(a, b):
    print(f'{a}+{b}')
    time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序
    c = a + b
    return c


@app.task
def test():
    ret = add_num.delay(1, 2)
    ret = ret.get(disable_sync_subtasks=False)#在这添加disable_sync_subtasks=False
    return ret

再调用一次test方法

在这里插入图片描述
成功调用

详见celery官方链接
链接传送门

结语

写这些,仅记录自己学习使用celery的过程。如果有什么错误的地方,还请大家批评指正。最后,希望小伙伴们都能有所收获。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1567467.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

个人医疗开支预测项目

注意:本文引用自专业人工智能社区Venus AI 更多AI知识请参考原站 ([www.aideeplearning.cn]) 项目背景 随着医疗成本的持续上涨,个人医疗开支成为一个重要议题。理解影响医疗费用的多种因素对于医疗保险公司、政府机构以及个人…

计算机组成结构1

概念 计算机组成: 运算器、控制器、存储器、输出设备、输入设备 CPU:运算器控制器 运算器:算数逻辑单元ALU、累加寄存器AC、数据缓冲寄存器DR、状态条件寄存器PSW 控制器:指令寄存器IR、程序计数器PC、地址寄存器AR、指令译码器I…

制作ubuntu镜像及烧录

最近更换了nuc主机作为机器人的控制主机,为了避免之前在操作系统过程中把系统搞崩了的惨状决定养成定期备份系统的好习惯,这就需要掌握制作系统镜像及烧录的能力。 1、安装systemback软件 ubuntu中可以使用systemback工具进行系统的备份,这…

人工智能、深度伪造和数字身份:企业网络安全的新前沿

深度伪造(Deepfakes)的出现打响了网络安全军备竞赛的发令枪。对其影响的偏执已经波及到一系列领域,包括政治错误信息、假新闻和社交媒体操纵。 深度伪造将加剧公共领域对信任和沟通的本已严峻的压力。这将理所当然地引起监管机构和政策制定者…

鸿蒙南向开发实战:【智能窗帘】

样例简介 智能窗帘设备不仅接收数字管家应用下发的指令来控制窗帘开启的时间,而且还可以加入到数字管家的日程管理中。通过日程可以设定窗帘开关的时间段,使其在特定的时间段内,窗帘自动打开或者关闭;通过日程管家还可以实现窗帘…

Intel TBB安装​​​​

下载 windows 版本,如: oneapi-tbb-2021.11.0-win.zip Releases oneapi-src/oneTBB (github.com)https://github.com/oneapi-src/oneTBB/releases 下载后解压,执行 env 文件夹下的 vars.bat 即可 Intel TBB,全称Intel Threading Building…

idea建多级目录出现问题,报错找不到xml文件,如何解决?

🏆本文收录于「Bug调优」专栏,主要记录项目实战过程中的Bug之前因后果及提供真实有效的解决方案,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&&…

Python3.10 - 列表的常用方法

01 列表的切片 lst [a, b, 1, 2, [c1, 1]]# 01 列表的切片 lst1 lst[0:2] # 从0开始切到1(顾头不顾尾), 切得长度超过1时, 切出来为list类型 print(lst1, type(lst1))lst2 lst[0] # 从中切某个元素时, 切出来的类型即元素本身, 以下同理 print(lst2, type(lst2))lst3 ls…

会声会影可以制作卡拉OK字幕吗 会声会影制作卡拉OK字幕教程 会声会影视频制作教程 会声会影模板免费下载 会声会影2023永久激活版

会声会影是一款功能强大、操作简便的视频剪辑软件,适合不同级别的用户使用,无论是初学者还是专业人员,都能够轻松地实现自己的创意和想法。这篇文章就一起来学习一下会声会影可以制作卡拉OK字幕吗,会声会影制作卡拉OK字幕教程。 …

stm32之基本定时器的使用

在上文我们使用到了HAL库的自带的延时函数,HAL_Delay();我们来看一下函数的原型 __weak void HAL_Delay(uint32_t Delay) {uint32_t tickstart HAL_GetTick();uint32_t wait Delay;/* Add a freq to guarantee minimum wait */…

数据结构算法题(力扣)——链表

以下题目建议大家先自己动手练习,再看题解代码。这里只提供一种做法,可能不是最优解。 1. 移除链表元素(OJ链接) 题目描述:给一个链表的头节点 head 和一个整数 val ,删除链表中所有满足值等于 val 的节点…

关于ansible的模块 ③

转载说明:如果您喜欢这篇文章并打算转载它,请私信作者取得授权。感谢您喜爱本文,请文明转载,谢谢。 接《关于Ansible的模块①》和《关于Ansible的模块②》,继续学习ansible的user模块。 user模块可以增、删、改linux远…

Pycharm显示Low memory的解决办法

这种情况该怎么办呢? 按照网上的说法,首先按照下图,选择memory Indicator: 就可以在pycharm的右下角看到内存以及其分配情况(allocated表示被分配的,可以看到我的已经被分配完了,应该是这个意思&#xff0…

Ubuntu Desktop 安装有道词典

Ubuntu Desktop 安装有道词典 1. 有道词典2. Installation2.1. 解压 deb 包到 youdao 目录2.2. 解压 deb 包中的 control 信息 (包的依赖写在该文件里面)2.3. 编辑 control 文件,删除依赖里面的 gstreamer0.10-plugins-ugly2.4. 创建 youdaobuild 目录,重…

Linux非管理员安装ninja,解决RuntimeError: Ninja is required to load C++ extensions错误

最近在复现代码的时候,需要用到C环境进行编译,这就少不了ninja,但是因为服务器是实验室公用的,所以一般没有管理员权限,所以就很难办!!!! 下面是非管理员权限安装ninja&a…

新型智慧城市大数据解决方案(附下载)

随着云计算、大数据、移动互联网等技术的发展,由城市运行产生的交通、环境、市政、商业等各领域数据量巨大,这些数据经过合理的分析挖掘可产生大量传统数据不能反映的城市运行信息,已成为智慧城市的重要资产。 在大数据时代,数据信…

【EasyExcel】多sheet、追加列

业务-EasyExcel多sheet、追加列 背景 最近接到一个导出Excel的业务,需求就是多sheet,每个sheet导出不同结构,第一个sheet里面能够根据最后一列动态的追加列,追加多少得看运营人员传了多少需求列。原本使用的 pig4cloud 架子&…

excel统计分析——协方差分析的作用

参考资料:生物统计学 1、协变量与试验因素的区别 如果把协方差分析资料中的协变量看作多因素方差分析资料中的一个因素,则两类资料有相似之处,但两类资料有本质的不同。在方差分析中,各因素的水平时人为控制的,即使是…

[flink 实时流基础] flink 源算子

学习笔记 Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整…

js手持小风扇

文章目录 1. 演示效果2. 分析思路3. 代码实现 1. 演示效果 2. 分析思路 先编写动画&#xff0c;让风扇先转起来。使用 js 控制动画的持续时间。监听按钮的点击事件&#xff0c;在事件中修改元素的animation-duration属性。 3. 代码实现 <!DOCTYPE html> <html lang…