[ Tool ] celery分布式任务框架基本使用

news2024/12/25 13:07:43

celery官方
Celery 官网:www.celeryproject.org/

Celery 官方文档英文版:docs.celeryproject.org/en/latest/i…

Celery 官方文档中文版:docs.jinkan.org/docs/celery…

Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统

专注于实时处理的异步任务队列

同时也支持任务调度

注意:

Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.

celery异步任务框架
1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
2)celery服务为为其他项目服务提供异步解决任务需求的 注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

人是一个独立运行的服务 | 医院也是一个独立运行的服务 正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题 人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求

celery架构
Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

使用场景
异步执行:解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

延迟执行:解决延迟任务

定时执行:解决周期(周期)任务,比如每天数据统计

celery的安装配置

pip install celery

pip install eventlet

消息中间件:RabbitMQ/Redis

app=Celery(‘任务名’, broker=’xxx’, backend=’xxx’)

两种celery的任务结构

基本结构

只写一个py文件,如celery_task.py

# worker文件
from celery import Celery
broker='redis://127.0.0.1:6379/1'  # broker任务队列
backend='redis://127.0.0.1:6379/2' # 结果存储
app=Celery(__name__,broker=broker,backend=backend)
 
#添加任务(使用这个装饰器装饰,@app.task)
@app.task
def add(x,y):
    print(x,y)
    return x+y
启动worker:
    需要将路径切换到该py文件的路径下,用终端执行
    - 非windows:celery worker -A celery_task -l info
    - windows:celery worker -A celery_task -l info -P eventlet
        
# 提交任务到broker文件
from celery_task import add
add(3,4)  # 直接执行,不会被添加到broker中
ret = add.delay(1,2)  # 向broker中添加一个任务
print(ret)  # ret是任务编号,后期需要通过任务编号获取任务执行结果
 
# 查看任务执行结果文件
from celery_task import app-
from celery.result import AsyncResult
id = '3e397fd7-e0c1-4c5c-999c-2655a96793bb'
if __name__ == '__main__':
    async = AsyncResult(id=id, app=app)
    if async.successful():
        result = async.get()
        print(result)
    elif async.failed():
        print('任务失败')
        elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')
 
包结构
  • 新建一个包,包名随意,比如celery_task

包内结构

-celery_task

​ -__init__.py

​ -celery.py(必须叫celery)

​ -task01.py(任务文件,每个任务单独写一个py文件)

​ -task02.py

-add_task.py:为worker中提交任务

_get_res.py:获取任务执行结果

  • celery.py
from celery import Celery
broker='redis://127.0.0.1:6379/1'  # broker任务队列
backend='redis://127.0.0.1:6379/2' # 结果存储
app=Celery(__name__,broker=broker,backend=backend,include=['celery_task.task01','celery_task.task02'])
 
  • add_task.py
from celery_task.task01 import add
from celery_task.task02 import mul
 
 
ret = add.delay(1,2)
print(ret)
 
ret = mul.delay(10,10)
print(ret)
 
  • get_res.py
from celery_task.celery import app
from celery.result import AsyncResult
id = '3d3779be-fe75-4ad4-ab31-d7dbe13d3e63'
 
 
if __name__ == '__main__':
    async = AsyncResult(id=id, app=app)
    if async.successful():
        result = async.get()
        print(result)
    elif async.failed():
        print('任务失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')
 
执行延时任务
 

执行延时任务

  • add_task.py
from celery_task.task01 import add
from celery_task.task02 import mul
 
# 非延迟任务
ret = add.delay(1,2)
print(ret)
ret = mul.delay(10,10)
print(ret)
 
 
# 将任务做成延迟任务
from datetime import datetime,timedelta
# 延迟时间必须是utc时间,timedelta是根据utc时间向后延迟
eta = datetime.utcnow() + timedelta(seconds=10)
ret = add.apply_async(args=(200,50),eta=eta)
print(ret)

执行定时任务
执行定时任务时,任务的配置需要写在celery.py中,就是worker,因为定时任务的配置需要在worker开启之前加载,否则无法执行定时任务。

执行定时任务除了需要woker以外,还需要一位‘工人’定时为worker送任务,这个工人就是beat

执行定时任务的时候,需要启动worker及beat,一个负责执行任务,一个负责定时运送任务

启动worker,启动beat -celery worker -A celery_task -l info -P eventlet -celery beat -A celery_task -l info

  • celery.py
from celery import Celery
 
 
broker='redis://127.0.0.1:6379/1'  # broker任务队列
backend='redis://127.0.0.1:6379/2' # 结果存储
app=Celery(__name__,broker=broker,backend=backend,include=['celery_task.task01','celery_task.task02'])
 
 
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    'add-task': {
        # 需要定时执行的任务
        'task': 'celery_task.task01.add',
        'schedule': timedelta(seconds=3),
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        # 任务需要的参数
        'args': (300, 150),
    }
}
 
总结:

感谢每一个认真阅读我文章的人!!!

作为一位过来人也是希望大家少走一些弯路,如果你不想再体验一次学习时找不到资料,没人解答问题,坚持几天便放弃的感受的话,在这里我给大家分享一些自动化测试的学习资源,希望能给你前进的路上带来帮助。

软件测试面试文档

我们学习必然是为了找到高薪的工作,下面这些面试题是来自阿里、腾讯、字节等一线互联网大厂最新的面试资料,并且有字节大佬给出了权威的解答,刷完这一套面试资料相信大家都能找到满意的工作。

   视频文档获取方式:
这份文档和视频资料,对于想从事【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴我走过了最艰难的路程,希望也能帮助到你!以上均可以分享,点下方小卡片即可自行领取。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1353386.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

HackTheBox - Medium - Linux - Agile

Agile Agile 是一个中等难度的 Linux 机器,在端口 80 上有一个密码管理网站。创建帐户并添加几个密码后,发现网站的导出到 CSV 功能容易受到任意文件读取的攻击。其他终结点的枚举显示“/download”在访问时引发错误,并显示“Werkzeug”调试…

贪心算法part05 435无重叠区间

435无重叠区间 763 划分字母区间 56合并区间

什么是聚合支付,又能带来哪些好处?

随着科技的飞速发展,人们的支付方式也在不断地发生变革。从最初的现金支付、银行卡支付,到现在的移动支付、扫码支付等,支付方式已经变得越来越便捷。聚合支付作为一种新型的支付方式,也在逐渐改变着人们的生活方式。那么&#xf…

CSS 中间位置翻转动画

<template><div class"container" mouseenter"startAnimation" mouseleave"stopAnimation"><!-- 旋方块 --><div class"box" :class"{ rotate-hor-center: isAnimating }"><!-- 元素内容 -->…

一篇文章带你搞定Python所有内置函数

前言 Python 内置了许多的函数和类型&#xff0c;比如print()&#xff0c;input()等&#xff0c;我们可以直接在程序中使用它们&#xff0c;非常方便&#xff0c;并且它们是Python解释器的底层实现的&#xff0c;所以效率是比一般的自定义函数更有效率。目前共有71个内置函数&…

深入解析d3dcompiler_47.dll文件及其丢失的修复方法

一、d3dcompiler_47.dll是什么文件&#xff1f; d3dcompiler_47.dll是DirectX SDK中的一个动态链接库文件&#xff0c;它是用于编译DirectX着色器的工具之一。DirectX是由微软公司开发的一种多媒体编程接口&#xff0c;它提供了一系列的API和工具&#xff0c;用于开发游戏和多…

打造专业开发者指南:针对ShardingProxy分库分表解决策略的深度剖析 – 详解部署、使用、服务治理与优化技巧

一、 ShardingProxy快速使用 ShardingProxy的功能同样是分库分表&#xff0c;但是他是一个独立部署的服务端&#xff0c;提供 统一的数据库代理服务。注意&#xff0c;ShardingProxy目前只支持MySQL和PostgreSQL。并且&#xff0c;客户端连接ShardingProxy时&#xff0c;最好使…

java每日一题——双色球系统(答案及编程思路)

前言&#xff1a; 打好基础&#xff0c;daydayup! 题目&#xff1a;要求如下&#xff08;同时&#xff1a;红球每个号码不可以相同&#xff09; 编程思路&#xff1a;1&#xff0c;创建一个可以录入数字的数组&#xff1b;2&#xff0c;生成一个可以随机生成数字的数组&#xf…

MT8766安卓核心板规格参数_MTK8766核心板模块方案定制

MT8766安卓核心板&#xff1a;高性能、稳定可靠、集成度高的一体化解决方案 MT8766安卓核心板采用联发科MTK8766四核4G模块方案&#xff0c;是一款高度集成的安卓一体板。四核芯片架构&#xff0c;主频可达到2.0GHz&#xff0c;支持国内4G全网通。12nm制程工艺&#xff0c;支持…

虾皮跨境电商选品有哪些规则

如何在虾皮&#xff08;Shopee&#xff09;平台上进行跨境电商选品在如今全球化的商业环境中&#xff0c;跨境电商已成为许多卖家拓展业务的重要途径。虾皮&#xff08;Shopee&#xff09;作为一家知名的跨境电商平台&#xff0c;为卖家提供了丰富的销售机会。然而&#xff0c;…

Linux 485驱动通信异常

背景 前段时间接到一个项目&#xff0c;要求用主控用485和MCU通信。将代码调试好之后&#xff0c;验证没问题就发给测试了。测试测的也没问题。 但是&#xff0c;到设备量产时&#xff0c;发现有几台设备功能异常。将设备拿回来排查&#xff0c;发现是485通信有问题&#xff…

大语言模型LLM微调技术:P-Tuning

1 引言 Bert时代&#xff0c;我们常做预训练模型微调&#xff08;Fine-tuning&#xff09;&#xff0c;即根据不同下游任务&#xff0c;引入各种辅助任务loss和垂直领域数据&#xff0c;将其添加到预训练模型中&#xff0c;以便让模型更加适配下游任务的方式。每个下游任务都存…

生活中危险的气体:一氧化碳与二氧化碳中毒的症状及安全预防措施

一氧化碳和血红蛋白亲和力超过氧气&#xff0c;会占用血红蛋白&#xff0c;导致缺氧。 二氧化碳会和血浆结合&#xff0c;导致血液pH值不正常&#xff0c;抑制呼吸&#xff0c;导致窒息。 通俗点说&#xff1a;一氧化碳是中毒&#xff0c;二氧化碳则是窒息。 一氧化碳中毒 …

【完整代码】网上书店信息管理系统--基于Mysql数据库与java

网上书店信息管理系统 一、需求分析&#xff08;一&#xff09;设计系统的意义以及用途&#xff08;二&#xff09;实现的功能1.用户模块&#xff1a;1、全部图书浏览2、图书搜索3、购物车管理和订单查看4、修改密码 2.书店管理员模块1、图书类别管理2、图书管理3、全部订单查看…

互联网加竞赛 Yolov安全帽佩戴检测 危险区域进入检测 - 深度学习 opencv

1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; Yolov安全帽佩戴检测 危险区域进入检测 &#x1f947;学长这里给一个题目综合评分(每项满分5分) 难度系数&#xff1a;3分工作量&#xff1a;3分创新点&#xff1a;4分 该项目较为新颖&am…

ISCTF 2023 miscweb wp

web 圣杯战争!!! 题目: PHP <?php highlight_file(__FILE__); error_reporting(0); class artifact{ public $excalibuer; public $arrow; public function __toString(){ echo "为Saber选择了对的武器!<br>"; return $this…

阿里云性能测评ESSD Entry云盘、SSD云盘、ESSD和高效云盘

阿里云服务器系统盘或数据盘支持多种云盘类型&#xff0c;如高效云盘、ESSD Entry云盘、SSD云盘、ESSD云盘、ESSD PL-X云盘及ESSD AutoPL云盘等&#xff0c;阿里云百科aliyunbaike.com详细介绍不同云盘说明及单盘容量、最大/最小IOPS、最大/最小吞吐量、单路随机写平均时延等性…

Python中的垃圾回收机制是什么

一、写在前面&#xff1a; 我们都知道Python一种面向对象的脚本语言&#xff0c;对象是Python中非常重要的一个概念。在Python中数字是对象&#xff0c;字符串是对象&#xff0c;任何事物都是对象&#xff0c;而它们的核心就是一个结构体--PyObject。 typedef struct_object{i…

data.TensorDataset解析

data.TensorDataset 是 PyTorch 中的一个类&#xff0c;用于创建一个包含多个张量的数据集。这个类的主要作用是将输入的张量组合成一个数据集&#xff0c;使得在训练过程中可以方便地进行数据加载和迭代。 具体来说&#xff0c;TensorDataset 接受一系列的张量作为输入参数&a…

正负样本分配策略simOTA

simOTA是YOLOX中提出的 正负样本分配策略&#xff08;OTA, SimOTA&#xff0c;TAS&#xff09; OTA源于2021年cvpr的论文&#xff0c;使训练和验证的标签有着更好的对应关系。 yolov5没有用到&#xff0c;只有一种loss&#xff1a; from utils.loss import ComputeLoss comput…