分布式异步任务框架celery

news2025/1/16 0:54:24

Celery介绍

github地址:GitHub - celery/celery: Distributed Task Queue (development branch)

文档地址:Celery - Distributed Task Queue — Celery 5.3.6 documentation

1.1 Celery是什么

celery时一个灵活且可靠的处理大量消息的分布式系统,可以在多个节点之间处理某个任务

celery时一个专注于实时处理的任务队列,支持任务调度

celery是开源的,有很多的使用者

celery完全基于python语言编写的

celery本质上是一个【分布式的异步任务调度框架】,类似于Apache的airflow

celery只是用来调度任务的,但是它本身并不具备存储任务的功能,而调度任务的时候肯定是要把任务存起来,因此要使用celery的话,还需要搭配一些具有存储、访问功能的工具,比如:消息队列、Redis缓存、数据库等。官方推荐是消息队列RabbitMQ,我们使用Redis

同步调用函数 --》add--》执行5s钟--》数据返回了

异步调用函数--》add--》执行5s钟--》执行完的数据,找个地方存起来

调用方--》去存的地方看一下--》任务有没有执行完

1.2 应用场景

1)异步任务

 视频转码、邮件发送、消息推送等一些耗时操作

2)定时任务

定时推送消息、定时爬取一些数据、定时统计一些数据

3)延时任务

提交任务后,等一段时间再执行任务

1.3 celery架构

celery架构,它采用典型的生产者-消费者模式,主要由以下部分组成

生产者生产---消费者进行消费

producer:它负责把任务提交到broker钟

celery Beat:会读取文件、周期性的向broker中提交任务

broker:消息中间件,放任务的地点,celery本身不提供,借助redis等消息队列

worker:工人、消费者,负责从消息中间件中取出任务--》执行

backend:worker执行完,会有结果,结果存储再backend,celery不提供,借助redis等消息队列。

Celery使用

2.1 安装

pip install celery

使用redis作为消息队列

pip install redis

如果是Windows系统还需要安装eventlet

pip install eventlet

2.2 使用

创建main.py文件

import time

from celery import Celery

# 任务消息队列
broker = "redis://127.0.0.1:6379/1"
# 结果消息队列
backend = "redis://127.0.0.1:6379/2"
# 实例化app对象
app = Celery("demo", broker=broker, backend=backend)


# 编写任务
@app.task  # 被装饰器装饰了,才是celery任务
def add(a, b):
    print("a+b的结果是", a + b)
    time.sleep(1)  # 模拟耗时
    return a + b

创建add_task.py文件编写消费者代码

"""这个程序用来提交任务 producer"""
from main import add

# # 同步任务
# res = add(1, 2)
# print(res)
# 异步任务
# 像消息队列中提交了一个任务,计算1+5的任务,但是没有执行  ceec680b-e0fb-4636-9244-1fa7ca0c570c
res = add.delay(1, 5)  # 没有耗时,直接返回,但是没有返回值,而是返回一个uuid号
print(res)
# 启动worker 再终端使用命令启动,执行完成后会把结果存到redis的2库中
# win :celery -A main worker -l info -P eventlet
# mac/linux:celery -A main worker -l info

启动worker需要再终端下方进行启动

win :celery -A main worker -l info -P eventlet
mac/linux:celery -A main worker -l info

如果报错celery库找不到的问题,使用python -m celery -A main worker -l info -P eventlet进行启动

2.3 包结构

后续的项目越来越大,task任务越来越多,希望把任务拆分再多个py文件中

目录结构

celery.py

import time

from celery import Celery

# 任务消息队列
broker = "redis://127.0.0.1:6379/1"
backend = "redis://127.0.0.1:6379/2"
# 实例化app对象
app = Celery("demo", broker=broker, backend=backend,
             include=['celery_task.crawl_task', 'celery_task.user_task', 'celery_task.order_task'])

# 任务分到不同的py文件中

user_task.py

import time
from .celery import app


@app.task
def send_email(to):
    print("发送邮件")
    time.sleep(3)
    print(f"向{to}发送邮件成功")
    return f"向{to}发送邮件成功"

order_task.py

import time
from .celery import app


@app.task
def pay_order():
    print("开始下单")
    time.sleep(5)
    print("下单完成")
    return "下单完成"

crawl_task.py

import time
from .celery import app


@app.task
def crawl_baidu():
    print("开始爬虫百度")
    time.sleep(2)
    print("爬虫完毕百度")
    return "爬虫完毕百度"


@app.task
def crawl_dewu():
    print("开始爬虫得物")
    time.sleep(2)
    print("爬虫完毕百度")
    return "爬虫完毕得物"

add_task.py

from celery_task.crawl_task import crawl_baidu

res = crawl_baidu.delay()  # 没有参数,这里就不传
print(res)  # 得到uuid

get_result.py(查询是否被执行)

from .celery_task.celery import app
from celery.result import AsyncResult

id = "你的任务uuid"
if __name__ == '__main__':
    result = AsyncResult(id=id, app=app)
    if result.successful():
        result = result.get()
        print(result)
    elif result.failed():
        print("任务失败")
    elif result.status == "PENDING":
        print("任务等待被执行")
    elif result.status == "RETRY":
        print("任务异常后正在重试")
    elif result.status == "STARTED":
        print("任务已经开始被执行")

启动worker命令

celery -A celery_task(包名) worker -l info -P eventlet

异步任务-延时任务-定时任务

异步任务

上述介绍的均为异步任务

使用delay()

延时任务

from celery_task.user_task import send_email
from datetime import datetime, timedelta

eta = datetime.utcnow() + timedelta(seconds=5)  # 默认时区为utc时区
res = send_email.apply_async(args=['邮箱'], eta=eta)
print(res)

apply_async(args=['参数'],eta=延时时间)

如果延迟任务提交了,但是worker没启动,等延迟的时间,worker再启动,任务会立马启动

定时任务

在celery.py中

import time

from celery import Celery

# 任务消息队列
broker = "redis://127.0.0.1:6379/1"
backend = "redis://127.0.0.1:6379/2"
# 实例化app对象
app = Celery("demo", broker=broker, backend=backend,
             include=['celery_task.crawl_task', 'celery_task.user_task', 'celery_task.order_task'])

# 任务分到不同的py文件中


# 加入定时任务
# 指定了时区,中国时区,以后延时任务
app.conf.timezone = "Asia/Shanghai"
app.conf.enable_utc = False
# 每隔5s爬取百度
from datetime import datetime, timedelta

app.conf.beat_schedule = {
    'low-task': {
        'task': 'celery_task.crawl_task.crawl_baidu',
        'schedule': timedelta(seconds=5), # 每5秒发送一次
        'args': ()  # 参数
    }
}
# 必须启动beat

启动beat命令

celery -A celery_task beat -l info

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1534428.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据库关系运算理论:传统的集合运算概念解析

✨✨ 欢迎大家来访Srlua的博文(づ ̄3 ̄)づ╭❤~✨✨ 🌟🌟 欢迎各位亲爱的读者,感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢,在这里我会分享我的知识和经验。&am…

如何在wps的excel表格里面使用动态gif图

1、新建excel表格,粘贴gif图到表格里面,鼠标右键选择超链接。 找到源文件, 鼠标放到图片上的时候,待有个小手图标,双击鼠标可以放大看到动态gif图。 这种方式需要确保链接的原始文件位置和名称不能变化!&a…

网工内推 | 云计算工程师,HCIE认证优先,最高18k*14薪

01 杭州中港科技有限公司 招聘岗位:云计算工程师 职责描述: 1、承担云计算相关工程交付、业务上云及售前测试,从事虚拟化、桌面云、存储、服务器、数据中心、大数据、相关产品的工程项目交付或协助项目交付。 2、承担云计算维护工程师职责&…

深入理解Mysql索引底层原理(看这一篇文章就够了)

目录 前言 1、Mysql 索引底层数据结构选型 1.1 哈希表(Hash) 1.2 二叉查找树(BST) 1.3 AVL 树和红黑树 1.4 B 树 1.5 B树 2、Innodb 引擎和 Myisam 引擎的实现 2.1 MyISAM 引擎的底层实现(非聚集索引方式) 2.2 Innodb 引…

L4 级自动驾驶汽车发展综述

摘要:为了减小交通事故概率、降低运营成本、提高运营效率,实现安全、环保的出行,自动驾驶 技术的发展已成为大势所趋,而搭配有L4 级自动驾驶系统的车辆是将车辆驾驶全部交给系统。据此,介绍了自动驾驶汽车的主流技术解决方案;分析了国内外L4 级自动驾驶汽车的已发布车型、…

Python 安装目录及虚拟环境详解

Python 安装目录 原文链接:https://blog.csdn.net/xhyue_0209/article/details/106661191 Python 虚拟环境 python 虚拟环境图解 python 虚拟环境配置与详情 原文链接:https://www.cnblogs.com/hhaostudy/p/17321646.html

C语言易错知识点:二级指针、数组指针、函数指针

指针在C语言中非常关键,除开一些常见的指针用法,还有一些可能会比较生疏,但有时却也必不可少,本文章整理了一些易错知识点,希望能有所帮助! 1.二级指针: parr是一个指针数组,其中每…

std::shared_ptr与std::make_unique在类函数中的使用

在最近学习cartographer算法的时候,发现源码中大量的使用了std::shared_ptr与std::make_unique,对于这些东西之前不是很了解,为了更好的理解源代码,因此简单学习了一下这块内容的使用,在这里简单记个笔记。 std::shar…

【热门话题】深入浅出:npm常用命令详解与实践

🌈个人主页: 鑫宝Code 🔥热门专栏: 闲话杂谈| 炫酷HTML | JavaScript基础 ​💫个人格言: "如无必要,勿增实体" 文章目录 标题:深入浅出:npm常用命令详解与实践引言一、npm基本概…

打流仪/网络测试仪这个市场还能怎么卷?

#喝了点,码点字# 以下为个人观点,看看就好,如有冒犯,私信删稿 都有哪些厂商在做打流仪/网络测试仪 -洋品牌:思博伦/Viavi-Spirent,是德/Keysight-Ixia,信雅纳/Lecroy-Xena, -国产…

睿尔曼超轻量仿人机械臂之-灵巧手动作编写及程序调用

一、灵巧手动作编写 1.连接设备 2. 运动控制 3. 参数设置 4 动作库使用 本软件可以设置灵巧手内部第 1-第 13 套动作序列数据,每套动作序列最多能有 8 步 分解动作,每一步分解动作的手指角度、运动速度、力度以及等待时间都可以单独设置。 步骤数&…

QT_day2:2024/3/21

作业1:使用QT完成一个登录界面 要求: 1. 需要使用Ui界面文件进行界面设计 2. ui界面上的组件相关设置,通过代码实现 3. 需要添加适当的动图 源代码: #include "widget.h" #include "ui_widget.h"Widget…

力扣由浅至深 每日一题.06 删除有序数组中的重复项

希望我们都能对抗生活的苦难,在乌云周围突破阴霾积极的生活 —— 24.3.16 删除有序数组中的重复项 提示 给你一个 非严格递增排列 的数组 nums ,请你 原地 删除重复出现的元素,使每个元素 只出现一次 ,返回删除后数组的新长度。元…

贝尔曼方程【Bellman Equation】

强化学习笔记 主要基于b站西湖大学赵世钰老师的【强化学习的数学原理】课程,个人觉得赵老师的课件深入浅出,很适合入门. 第一章 强化学习基本概念 第二章 贝尔曼方程 文章目录 强化学习笔记一、状态值函数贝尔曼方程二、贝尔曼方程的向量形式三、动作值…

Windows系统部署GoLand结合内网穿透实现SSH远程Linux服务器开发调试

🌈个人主页: Aileen_0v0 🔥热门专栏: 华为鸿蒙系统学习|计算机网络|数据结构与算法|MySQL| ​💫个人格言:“没有罗马,那就自己创造罗马~” #mermaid-svg-HIOuHATnug3qMHzx {font-family:"trebuchet ms",verdana,arial,sans-serif;f…

【Vue3遇见的问题】创建vue3的项目使用vscode打开后项目的app.vue里面存在爆红

出现的问题 直接上上问题:问题的图片如下: 解决方法 解决效果 补充 因为vetur的插件禁用了 所以需要一个新插件来 这里发现的官网推荐的插件 也就是volar 他两是一样的

嵌入式软件面试-linux-中高级问题

Linux系统启动过程: BIOS自检并加载引导程序。引导程序(如GRUB)加载Linux内核到内存。内核初始化硬件,加载驱动,建立内存管理。加载init进程(PID为1),通常是systemd或SysVinit。init…

Redis监控工具

Redis 是一种 NoSQL 数据库系统,以其速度、性能和灵活的数据结构而闻名。Redis 在许多领域都表现出色,包括缓存、会话管理、游戏、排行榜、实时分析、地理空间、叫车、聊天/消息、媒体流和发布/订阅应用程序。Redis 数据集完全存储在内存中,这…

深度学习——数据预处理

一、数据预处理 为了能用深度学习来解决现实世界的问题,我们经常从预处理原始数据开始, 而不是从那些准备好的张量格式数据开始。 在Python中常用的数据分析工具中,我们通常使用pandas软件包。 像庞大的Python生态系统中的许多其他扩展包一样…

es 集群核心概念以及实践

节点概念: 节点是一个Elasticsearch的实例 本质上就是一个JAVA进程一台机器上可以运行多个Elasticsearch进程,但是生产环境一般建议一台机器上只运行一个Elasticsearch实例 每一个节点都有名字,通过配置文件配置,或者启动时候 -…