Celery,一个实时处理的 Python 分布式系统

news2024/11/16 13:31:28

大家好!我是爱摸鱼的小鸿,关注我,收看每期的编程干货。

一个简单的库,也许能够开启我们的智慧之门,
一个普通的方法,也许能在危急时刻挽救我们于水深火热,
一个新颖的思维方式,也许能激发我们无尽的创造力,
一个独特的技巧,也许能成为我们的隐形盾牌……


神奇的 Python 库之旅,第 9

目录

    • 一、什么是 Celery?
    • 二、为什么选择 Celery?
    • 三、Celery 编程示例
    • 四、总结
    • 五、作者Info

一、什么是 Celery?

Celery 是一个强大的工具,它能够帮助我们管理和调度复杂的任务。无论是处理异步任务、计划任务,还是分布式任务,Celery 都能轻松胜任。在这篇文章中,我们将深入探讨 Celery 的魅力,通过多个代码示例,带你全面了解这个神器。

Celery 是一个简单、灵活且可靠的分布式系统,用于处理大量消息,并提供了维护这些任务的工具。它的使用场景非常广泛,比如电子邮件发送、视频处理、数据分析、Web 爬虫等。

Github 项目地址:

https://github.com/celery/celery

在这里插入图片描述

二、为什么选择 Celery?

选择 Celery 的理由有很多,这里列出几个主要的优势:

  • 异步任务处理:能让你在不阻塞主进程的情况下处理任务;
  • 定时任务:支持类似 cron 的定时任务调度;
  • 分布式执行:支持将任务分发到多个机器上执行,提升系统性能;
  • 高可用性:可以与消息队列(如 RabbitMQ, Redis)结合,实现高可用性和可靠性。

在这里插入图片描述

在开始之前,我们需要安装 Celery 和一个消息代理(这里我们选择 Redis):

pip install celery redis


三、Celery 编程示例

快速开始:一个简单的任务
让我们从一个简单的例子开始,创建一个任务来演示 Celery 的基本用法。

# tasks.py
from celery import Celery

# 创建 Celery 实例
app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

在这个例子中,我们定义了一个名为 add 的任务,它接收两个参数并返回它们的和。接下来,我们可以启动一个 Celery worker 来处理这个任务:

celery -A tasks worker --loglevel=info

启动 Celery worker 后,我们可以在 Python 交互式环境中调用这个任务:

>>> from tasks import add
>>> result = add.delay(4, 6)
>>> result.get(timeout=10)
10


深入挖掘:任务链和组
Celery 的强大之处在于它能够处理复杂的工作流,比如任务链和任务组。

任务链
任务链允许我们将多个任务串联起来,前一个任务的输出作为下一个任务的输入:

from celery import chain

# 定义任务
@app.task
def multiply(x, y):
    return x * y

@app.task
def add_and_multiply(x, y, z):
    return chain(add.s(x, y), multiply.s(z))()

# 调用任务链
result = add_and_multiply(2, 3, 4)
print(result.get())  # 输出 20,因为 (2 + 3) * 4 = 20

任务组
任务组允许我们并行执行一组任务,并在所有任务完成后获得结果:

from celery import group

# 定义任务组
@app.task
def sum_list(numbers):
    return sum(numbers)

@app.task
def process_groups():
    return group(sum_list.s([1, 2, 3]), sum_list.s([4, 5, 6]), sum_list.s([7, 8, 9]))().get()

# 调用任务组
result = process_groups()
print(result)  # 输出 [6, 15, 24]



定时任务
Celery 还支持定时任务,这类似于 Unix 系统的 cron 作业:

from celery import Celery
from celery.schedules import crontab

app = Celery('periodic_tasks', broker='redis://localhost:6379/0')

@app.task
def scheduled_task():
    print("This task runs every 10 seconds")

app.conf.beat_schedule = {
    'run-every-10-seconds': {
        'task': 'periodic_tasks.scheduled_task',
        'schedule': 10.0,
    },
}

# 启动 Celery beat 进程
celery -A periodic_tasks beat --loglevel=info



错误处理与重试机制
在实际应用中,任务失败是不可避免的。Celery 提供了优雅的错误处理和重试机制:

@app.task(bind=True, max_retries=3)
def unreliable_task(self):
    try:
        # 可能失败的操作
        risky_operation()
    except Exception as exc:
        # 捕获异常并重试
        raise self.retry(exc=exc, countdown=60)

在这个例子中,unreliable_task 如果失败,会在 60 秒后重试,总共重试 3 次。


使用 Celery 在 Web 应用
Celery 常用于 Web 应用中来处理后台任务。下面是一个简单的 Flask 应用,展示了如何集成 Celery:

from flask import Flask, request, jsonify
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

def make_celery(app):
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)
    celery.Task = ContextTask
    return celery

celery = make_celery(app)

@celery.task
def long_task():
    import time
    time.sleep(10)
    return 'Task completed!'

@app.route('/start-task', methods=['POST'])
def start_task():
    task = long_task.apply_async()
    return jsonify({}), 202, {'Location': url_for('get_status', task_id=task.id)}

@app.route('/status/<task_id>')
def get_status(task_id):
    task = long_task.AsyncResult(task_id)
    response = {
        'state': task.state,
        'result': task.result,
    }
    return jsonify(response)

if __name__ == '__main__':
    app.run(debug=True)

在这个示例中,我们定义了一个 Flask 应用,并将 Celery 集成到其中。long_task 是一个模拟长时间运行的任务,客户端可以通过 start-task 路由启动任务,并通过 status/<task_id> 路由查询任务状态。

更多功能、详细用法可参考官方文档:

https://docs.celeryq.dev/en/stable

四、总结

Celery 是一个强大的工具,它为我们处理异步任务、计划任务和分布式任务提供了极大的便利。在这篇文章中,我们通过多个代码示例,展示了 Celery 的基本用法、任务链和组、定时任务、错误处理和重试机制,以及如何在 Web 应用中集成 Celery。

如果你在日常工作中需要处理复杂的任务调度和分布式任务,那么 Celery 绝对是一个值得深入学习和使用的工具。希望这篇文章能够帮助你更好地理解和使用 Celery,让你的工作更加高效、顺畅。

试一试吧,Celery 会让你的任务调度变得简单又高效!
在这里插入图片描述

五、作者Info

Author:小鸿的摸鱼日常

Goal:让编程更有趣! 专注于 Web 开发、爬虫,游戏开发,数据分析、自然语言处理,AI 等,期待你的关注,让我们一起成长、一起Coding!

版权说明:本文禁止抄袭、转载,侵权必究!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1911926.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring-AOP(二)

作者&#xff1a;月下山川 公众号&#xff1a;月下山川 1、什么是AOP AOP&#xff08;Aspect Oriented Programming&#xff09;是一种设计思想&#xff0c;是软件设计领域中的面向切面编程&#xff0c;它是面向对象编程的一种补充和完善&#xff0c;它以通过预编译方式和运行期…

MySQL:TABLE_SCHEMA及其应用

MySQL TABLE_SCHEMA及其应用 - 文章信息 - Author: 李俊才 (jcLee95) Visit me at CSDN: https://jclee95.blog.csdn.netMy WebSite&#xff1a;http://thispage.tech/Email: 291148484163.com. Shenzhen ChinaAddress of this article:https://blog.csdn.net/qq_28550263/ar…

持久化存储与设备环境查询的最佳实践

ArkUI框架中的PersistentStorage和Environment 在ArkUI框架中&#xff0c;持久化存储和设备环境查询是应用开发中不可或缺的两个重要功能。在本文中&#xff0c;我们将深入了解框架提供的PersistentStorage和Environment&#xff0c;它们的用途、限制条件以及在应用开发中的使…

SDIO CMD 数据部分 CRC 计算规则

使用的在线 crc 计算工具网址&#xff1a;http://www.ip33.com/crc.html CMD CRC7 计算 如下图为使用逻辑分析仪获取的SDIO读写SD卡时&#xff0c;CMD16指令发送的格式&#xff0c;通过逻辑分析仪总线分析&#xff0c;可以看到&#xff0c;该部分的CRC7校验值得0x05,大多数情况…

二战架构师,拿下

前言 已经许久更新文章了&#xff0c;并不是因为我懒了&#xff0c;而是在备考系统架构师考试。个人感觉还是比较幸运的&#xff0c;低分飘过。现阶段任务也算完成了&#xff0c;记录一下感受。 什么是软考 软考&#xff0c;全称“计算机技术与软件专业技术资格&#xff08…

Agent如何帮助大模型“增强记忆”?

Agent如何帮助大模型“增强记忆”&#xff1f; 原创 格林 神州问学 2024年07月08日 17:50 日本 记忆反馈 >规划&#xff1f; 来源|神州问学 引言 去年6月份&#xff0c;Lilian发布了关于LLM驱动的Agent的结构和组件&#xff0c;其中包括规划、行动、工具还有记忆&#xff…

去除Win32 Tab Control控件每个选项卡上的深色对话框背景

一般情况下&#xff0c;我们是用不带边框的对话框来充当Tab Control的每个选项卡的内容的。 例如&#xff0c;主对话框IDD_TABBOX上有一个Tab Control&#xff0c;上面有两个选项卡&#xff0c;第一个选项卡用的是IDD_DIALOG1充当内容&#xff0c;第二个用的则是IDD_DIALOG2。I…

C++相关概念和易错语法(17)(适配器模式、仿函数)

1.stack和queue stack和queue的相关接口如下&#xff1a; stack queue 我们发现不管是stack还是queue&#xff0c;它们都有push和pop&#xff0c;不区分push_back和push_front&#xff0c;这是由它们的入栈特定顺序特性决定的&#xff0c;并且它们都没有迭代器&#xff0c;st…

【系统架构设计】计算机组成与体系结构(一)

计算机组成与体系结构 计算机系统组成计算机硬件组成控制器运算器主存储器辅助存储器输入设备输出设备 计算机系统结构的分类存储程序的概念Flynn分类 复杂指令集系统与精简指令集系统总线 存储器系统流水线 兜兜转转&#xff0c;最后还是回到了4大件&#xff0c;补基础&#x…

【力扣】数组中的第K个最大元素

一、题目描述 给定整数数组 nums 和整数 k&#xff0c;请返回数组中第 k 个最大的元素。 请注意&#xff0c;你需要找的是数组排序后的第 k 个最大的元素&#xff0c;而不是第 k 个不同的元素。 你必须设计并实现时间复杂度为 O(n) 的算法解决此问题。 示例 1: 输入: [3,2,1,5,…

图片像素坐标转实际坐标的一种转换方案

原图 红色的点是我们标注的像素点&#xff0c;这些红色的点我们知道它的像素坐标&#xff0c;以及以右下角相机位置为原点的x y 实际坐标数值 通过转换&#xff0c;可以得到整个图片内部其余像素点的实际坐标&#xff0c; 这些红色的点是通过转换关系生成的&#xff0c;每隔一米…

python破解密码·筛查和选择

破解密码时可能遇到的几种情况 ① 已知密码字符&#xff0c;破排序 ② 已知密码位数&#xff0c;破字符 ③ 已知密码类型&#xff0c;破字位 ④ 已知部分密码&#xff0c;破未知 ⑤ 啥都不知道&#xff0c;盲破&#xff0c;玩完 ⑥ 已知位数、字符、类型、部分密码中的几个&am…

2024全网最全面及最新且最为详细的网络安全技巧五 之 SSRF 漏洞EXP技巧,典例分析以及 如何修复 (下册)———— 作者:LJS

五.SSRF 漏洞EXP技巧&#xff0c;典例分析以及 如何修复 (下册) 目录 五.SSRF 漏洞EXP技巧&#xff0c;典例分析以及 如何修复 (下册) 5.4gopher 协议初探 0x01 Gopher协议 0x02 协议访问学习 复现环境 centos7 kali 2018 发送http get请求 发送http post请求 5.5 SSRF…

☺初识c++(语法篇)☺

目录 一命名空间&#xff08;namespace&#xff09;&#xff1a; 二cout与cin简述&#xff1a; 三缺省参数&#xff1a; 四函数重载&#xff1a; 五引用&#xff1a; 六内联函数: 七c中的nullptr简述&#xff1a; 一命名空间&#xff08;namespace&#xff09;&#xff1…

Chromium编译指南2024 Linux篇-同步Chromium第三方库(四)

1.引言 在成功拉取Chromium源码并创建新分支后&#xff0c;我们需要进一步配置开发环境。这包括拉取必要的第三方库以及设置hooks&#xff0c;以确保我们能够顺利进行编译和开发工作。以下步骤将详细介绍如何进行这些配置。 2.拉取第三方库以及hooks Chromium 使用了大量的第…

2024第六届上海国际新材料展览会-12月精彩呈现

2024第六届上海国际新材料展览会 The 6th shanghai International New Materials Exhibition in 2024 时 间&#xff1a;2024年12月18-20日 地 点&#xff1a;上海新国际博览中心 CIME 2024专业、权威&#xff0c;涵盖整个新材料行业的国际盛会。 期待与您在CIME 2024现场相…

24-7-9-读书笔记(九)-《爱与生的苦恼》[德]叔本华 [译]金玲

文章目录 《爱与生的苦恼》阅读笔记记录总结 《爱与生的苦恼》 《爱与生的苦恼》叔本华大佬的名书&#xff0c;里面有其“臭名昭著”的《论女人》&#xff0c;抛开这篇其他的还是挺不错的&#xff0c;哲学我也是一知半解&#xff0c;这里看得也凭喜好&#xff0c;这里记录一些自…

大模型/NLP/算法面试题总结2——transformer流程//多头//clip//对比学习//对比学习损失函数

用语言介绍一下Transformer的整体流程 1. 输入嵌入&#xff08;Input Embedding&#xff09; 输入序列&#xff08;如句子中的单词&#xff09;首先通过嵌入层转化为高维度的向量表示。嵌入层的输出是一个矩阵&#xff0c;每一行对应一个输入单词的嵌入向量。 2. 位置编码&…

Qt 创建的窗口一闪而过【已解决】

Qt 创建的窗口一闪而过 引言一、详细的解决方案 - 附代码二、参考博文 引言 创建的窗口一闪而过&#xff0c;就是创建完立马被销毁了&#xff0c;常见情况是在一个函数中创建窗口并show() - 即创建在了栈上&#xff0c;函数结束局部变量(窗口)自动被释放。主流的解决方法有两种…

(HAL)stm32f407+freertos通过usb驱动移远4G模块-EC600U

概述 本篇文章主要介绍: 如何使用STM32CubeMX创建stm32F407+freertos+usb host的基础工程。USB-HOST-CDC驱动运行过程。如何根据4G模块的具体信息修改usb相关代码。MCU如何通过usb与4G模块通信,收发数据。调试过程中遇到的问题以及解决办法。 整个过程中在网上搜罗了很多参考…