Celery(分布式任务队列)入门学习笔记

news2025/1/14 2:07:44

Celery 的简单介绍

用 Celery 官方的介绍:它是一个分布式任务队列; 简单,灵活,可靠的处理大量消息的分布式系统; 它专注于实时处理,并支持任务调度。

Celery 如果使用 RabbitMQ 作为消息系统的话,整个应用体系就是下面这张图

Celery 官方给出的 Hello World, 对于未接触它的人来说根本就不知道是什么

1

2

3

4

5

6

7

from celery import Celery

app = Celery('hello', broker='amqp://guest@localhost//')

@app.task

def hello():

    return 'hello world'

还是有必要按住上面那张图看 Celery 的组成部分

  1. Celery 自身实现的部分其实是 Producer 和 Consumer. Producer 创建任务,并发送消息到消息队列,我们称这个队列为 Broker。Consumer 从 Broker 中接收消息,完成计算任务,把结果存到 Backend
  2. Broker 就是那个消息队列,可选择的实现有 RabbitMQ, Redis, Amazon SQS
  3. 结果存储(Backend), 可选择 AMQP(像 RabbitMQ 就是它的一个实现), Redis, Memcached, Cassandra, Elasticsearch, MongoDB, CouchDB, DynamoDB, Amazon S3, File system 等等,看来它的定制性很强
  4. 消息和结果的存储还涉及到一个序列化的问题,可选择 pickle(Python 专用), json, yaml, msgpack. 消息可用 zlib, bzip2 进行压缩, 或加密存储
  5. Worker 的并发可采用 prefork(多进程), thread(多线程), Eventlet, gevent, solo(单线程)]

Celery 应用的基础选型

Celery 的 Broker 和 Backend 有非常多的选择组合,RabbitMQ 和 Redis 都是即可作为 Broker 又能用作 Backend。但 Celery 的推荐是用 RabbitMQ 作为 Broker, 小的结果这里选择用 Redis 作为 Backend, 所以这里的选型是

  1. Broker: RabbitMQ
  2. Backend: Redis
  3. 序列化:JSON  -- 方便在学习中查到消息中的数据
准备 Redis

安装 Python 包

在需要运行 Producer 和 Consumer(worker) 的机器上创建一个 Python 虚拟环境,然后安装下面的包

$ pip install celery redis

实践中只需要安装 celery redis 就能运行后面的例子,没有安装 librabbitmq, "celery[librabbitmq]" 也行,安装了这两个库能使用更高效的 librabbitmq C 库。如果安装了 librabbitmq 库,broker='amqp://...'  默认使用 librabbitmq, 找不到 librabbitmq 的话就用 broker='pyamqp://...'

$ pip install librabbitmq
$ pip install "celery[librabbitmq]"

注:中括号中的是安装 Celery 提供的 bundle, 它定义在 setup.py 的 setup 函数中的 extras_require。

Celery 应用实战

我们不用 Celery 的 Hello World 实例,那不能帮助我们理解背后发生了什么。创建一个 tasks.py 文件

1

2

3

4

5

6

7

8

9

10

11

from celery import Celery

app = Celery('celery-demo',

                broker='amqp://celery:your-password@192.168.86.181:5672/',

                backend='redis://192.168.86.181:6379')

@app.task

def add(x, y):

    return x + y

这里配置连接到 brocker 的 / vhost, 如果连接到别的 vhost, 如 celery 的话, url 写成 amqp://celery:your-passoword@192.168.86.181:5672/celery. backend 的 redis 如果要配置密码, 和 db 的话,写成 redis://:password@192.168.86.181:6379/2

暂且不在该脚本中直接执行 add.delay(15, 30), 而是放到 Python 控制台下方便测试

现在进到 Python 控制台

1

2

3

4

5

6

>>> from tasks import add

>>> task = add.delay(15, 30)

>>> task.id

'c3552fa2-502a-450b-933b-19a1da65ba33'

>>> task.status

'PENDING'

由于 Worker 还没有启动,所以得到一个 task_id, 状态是 PENDING。趁这时候看看 Celery 目前做了什么,来查看到 RabbitMQ

7

celery direct

Celery 在 RabbitMQ 中创建了的资源有

  1. 一个 Exchange: celery direct
  2. 两个 binding: 送到默认(空字符串)或 celery exchange 的, routing-key 为 celery 的消息会转发到队列 celery 中
  3. 一个队列 celery

查看队列 celery 中的消息

1

2

3

4

5

6

vagrant@celery:~$ rabbitmqadmin get queue=celery ackmode=ack_requeue_true

+-------------+----------+---------------+-------------------------------------------------------------------------------------+---------------+------------------+-------------+

| routing_key | exchange | message_count |                                       payload                                       | payload_bytes | payload_encoding | redelivered |

+-------------+----------+---------------+-------------------------------------------------------------------------------------+---------------+------------------+-------------+

| celery      |          | 0             | [[15, 30], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}] | 83            | string           | False       |

+-------------+----------+---------------+-------------------------------------------------------------------------------------+---------------+------------------+-------------+

ackmode=ack_requeue_true, 所以消息仍然在队列中, Redis 中什么也还没发生,接下来要

启动 Celery Worker

要用到 celery 命令,不过只要是 Python 的程序,命令行能做的事情总是能用 Python 代码来执行,用 celery --help 可看它的详细说明。

$ celery -A tasks worker -l INFO

tasks 是自己创建的模块文件 tasks.py

这时候显示出一条绿绿的芹菜出来了,所以得用屏幕截图来表现

取出消息并显示任务执行完成,这时候去看 RabbitMQ 的队列 celery 中的消息不见了,启动 Worker 后也会在 RabbitMQ 中创建 queue, 及对应的 binding, exchange。

再回到提交任务的 Python 控制台

1

2

3

4

>>> task.status

'SUCCESS'

>>> task.result

45

一个 Celery 全套服务圆满完成。结果存在了 Redis 中

192.168.86.181:6379> keys *
1) "celery-task-meta-c3552fa2-502a-450b-933b-19a1da65ba33"
192.168.86.181:6379> TTL celery-task-meta-c3552fa2-502a-450b-933b-19a1da65ba33
(integer) 85840
192.168.86.181:6379> get celery-task-meta-c3552fa2-502a-450b-933b-19a1da65ba33
"{\"status\": \"SUCCESS\", \"result\": 45, \"traceback\": null, \"children\": [], \"date_done\": \"2022-01-17T07:23:48.901999\", \"task_id\": \"c3552fa2-502a-450b-933b-19a1da65ba33\"}"

Redis 中的结果保存时长为 24 小时,失败的任务会记录下异常信息。

关于 Worker 的控制查看帮助 celery worker --help, 比如

  1. -c, --concurrency: 并发数,默认为系统中 CPU 的内核数
  2. -P, --pool [prefork|eventlet|gevent|solo|processes|threads]:  worker 池的实现方式
  3. --max-tasks-per-child INTEGER: worker 执行的最大任务数,达到最大数目后便重启当前 worker
  4. -Q, --queues: 指定处理任务的队列名称,逗号分隔

任务的状态变迁是:PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS

Celery 的配置

除了在声明 Celery 对象时可以指定 broker, backend 属性之外,我们可以用 py 配置文件的形式来配置更多的内容,配置文件 celeryconfig.py, 内容是 Configuration and defautls 中列出的项目

比如 celeryconfig.py

1

2

3

4

5

6

7

broker_url = 'amqp://celery:your-password@192.168.86.181:5672/'

result_backend = 'redis://192.168.86.181:6379'

task_serializer = 'json'

result_serializer = 'json'

accept_content = ['json']

timezone = 'America/Chicago'

enable_utc = True

新的格式是用小写的,旧格式用大写,如 BROKER_URL, 但是同一个配置文件中不能混合大小写,同时写 BROKER_URL 和 result_backend 就不行了。

然后在 tasks.py 中加载配置文件

1

2

3

4

from celery import Celery

import celeryconfig

app = Celery('celery-demo')

app.config_from_object(celeryconfig)

Celery 实时监控工具

Flower 是一个基于 Web 的监控 Celery 中任务的工具,安装和启动

$ pip install flower
$ celery -A tasks flower

打开链接 http://localhost:5555

其他剩下的问题,应该就是如何安排 Worker(比如结合 AutoScaling),从 Python 代码中启动 Worker, 怎么做灵活的配置, 调度任务的执行,其他的 backend 选择等等。

其他补充

backend rpc:// 的组合

如果配置中用

1

2

broker_url = 'amqp://celery:password@192.168.86.50:5672/celery'

result_backend = 'rpc://'

amqp 和 rpc:// 的组合,任务和结果都会存在 RabbitMQ 中

1

2

broker_url = 'redis://192.168.86.50'

result_backend = 'rpc://'

redis 和  rpc:// 的组合,任务和结果都保存在 Redis 中

为什么 Celery 推荐使用 RabbitMQ, 一说是它的一开发人员负责开发过 RabbitMQ, 所以即使使用 Redis 时,也会在 Redis 中写入有关 RabbitMQ 概念的数据,如 exchange, routing key 等。

 常见问题

Celery ValueError: not enough values to unpack (expected 3, got 0)的解决方案

先安装eventlet

pip install eventlet

然后,启动worker的时候加一个参数,如下:

celery -A <moduleName> worker -l info -P eventlet

然后就可以正常运行worker执行任务了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1646429.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

状压dp 理论例题 详解

状压dp 四川2005年省选题&#xff1a;互不侵犯 首先我们可以分析一下&#xff0c;按照我们普通的思路&#xff0c;就是用搜索&#xff0c;枚举每一行的每一列&#xff0c;尝试放下一个国王&#xff0c;然后标记&#xff0c;继续枚举下一行 那么&#xff0c;我们的时间复杂度…

什么是X电容和Y电容?

先补充个知识&#xff1a; 一、什么是差模信号和共模信号 差模信号&#xff1a;大小相等&#xff0c;方向相反的交流信号&#xff1b;双端输入时&#xff0c;两个信号的相位相差180度 共模信号&#xff1a;大小相等。方向相同。双端输入时&#xff0c;两个信号相同。 二、安规…

嵌入式5-6QT

1> 思维导图 2> 自由发挥应用场景&#xff0c;实现登录界面。 要求&#xff1a;尽量每行代码都有注释。 #include "mywidget.h"MyWidget::MyWidget(QWidget *parent): QWidget(parent) {//设置标题this->setWindowTitle("MYQQ");//设置图标this…

5月6号作业

申请该结构体数组&#xff0c;容量为5&#xff0c;初始化5个学生的信息 使用fprintf将数组中的5个学生信息&#xff0c;保存到文件中去 下一次程序运行的时候&#xff0c;使用fscanf&#xff0c;将文件中的5个学生信息&#xff0c;写入(加载)到数组中去&#xff0c;并直接输出学…

PyTorch机器学习实现液态神经网络

大家好&#xff0c;人工智能的发展催生了神经网络这一强大的预测工具&#xff0c;这些网络通过数据和参数优化生成预测&#xff0c;每个神经元像逻辑回归门一样工作。结合反向传播技术&#xff0c;模型能够根据损失函数来调整参数权重&#xff0c;实现自我优化。 然而&#xf…

题目:排序疑惑

问题描述&#xff1a; 解题思路&#xff1a; 做的时候没想到&#xff0c;其实这是以贪心题。我们可以每次排最大的区间&#xff08;小于n&#xff0c;即n-1大的区间&#xff09;&#xff0c;再判断是否有序 。因此只需要分别判断排&#xff08;1~n-1&#xff09;和&#xff08;…

算法学习:二分查找

&#x1f525; 引言 在现代计算机科学与软件工程的实践中&#xff0c;高效数据检索是众多应用程序的核心需求之一。二分查找算法&#xff0c;作为解决有序序列查询问题的高效策略&#xff0c;凭借其对数时间复杂度的优越性能&#xff0c;占据着算法领域里举足轻重的地位。本篇内…

open 函数到底做了什么

使用设备之前我们通常都需要调用 open 函数&#xff0c;这个函数一般用于设备专有数据的初始化&#xff0c;申请相关资源及进行设备的初始化等工作&#xff0c;对于简单的设备而言&#xff0c;open 函数可以不做具体的工作&#xff0c;你在应用层通过系统调用 open 打开设备…

Unity 修复Sentinel key not found (h0007)错误

这个问题是第二次遇到了&#xff0c;上次稀里糊涂的解决了&#xff0c;也没当回事&#xff0c;这次又跑出来了&#xff0c;网上找的教程大部分都是出自一个人。 1.删除这个路径下的文件 C:\ProgramData\SafeNet Sentinel&#xff0c;注意ProgramData好像是隐藏文件 2.在Windows…

04-19 周五 GitHub actions-runner 程序解释

04-19 周五 GitHub actions-runner 程序解释 时间版本修改人描述2024年4月19日17:26:17V0.1宋全恒新建文档 简介 本文主要描述了actions-runner-linux-x64-2.315.0.tar.gz这个github actions CI所需要的客户端安装包的重要文件和内容信息。有关GitHub actions 的配置&#xff…

利用matplotlib和networkx绘制有向图[显示边的权重]

使用Python中的matplotlib和networkx库来绘制一个有向图&#xff0c;并显示边的权重标签。 1. 定义了节点和边&#xff1a;节点是一个包含5个节点的列表&#xff0c;边是一个包含各个边以及它们的权重的列表。 2. 创建了一个有向图对象 G。 3. 向图中添加节点和边。 4. 设置了…

Elasticsearch:如何使用 Java 对索引进行 ES|QL 的查询

在我之前的文章 “Elasticsearch&#xff1a;对 Java 对象的 ES|QL 查询”&#xff0c;我详细介绍了如何使用 Java 来对 ES|QL 进行查询。对于不是很熟悉 Elasticsearch 的开发者来说&#xff0c;那篇文章里的例子还是不能单独来进行运行。在今天的这篇文章中&#xff0c;我来详…

外贸企业邮箱是什么?做外贸企业邮箱哪个好?

外贸企业邮箱是什么&#xff1f;外贸企业在进行跨国沟通时必不可少的工具就是外贸企业邮箱&#xff0c;外贸企业邮箱需要具备的条件就是海外邮件抵达率高、安全稳定、多语言沟通。而我们又怎么选择一个适合的外贸企业邮箱呢&#xff1f;小编今天带您一起了解。 一、外贸企业邮…

MySQL基础_5.多表查询

文章目录 一、多表连接1.1、笛卡尔积&#xff08;或交叉连接&#xff09; 二、多表查询&#xff08;SQL99语法&#xff09;2.1、内连接(INNER JOIN)2.2、内连接(INNER JOIN) 一、多表连接 多表查询&#xff0c;也称为关联查询&#xff0c;指两个或更多个表一起完成查询操作。 …

一款开源的原神工具箱,专为现代化 Windows 平台设计,旨在改善桌面端玩家的游戏体验

Snap.Hutao 胡桃工具箱是一款以 MIT 协议开源的原神工具箱&#xff0c;专为现代化 Windows 平台设计&#xff0c;旨在改善桌面端玩家的游戏体验。通过将既有的官方资源与开发团队设计的全新功能相结合&#xff0c;提供了一套完整且实用的工具集&#xff0c;且无需依赖任何移动设…

Django开发实战之登录用户鉴权登录界面实现

Django自带的鉴权系统非常的安全&#xff0c;大家可以放心使用&#xff0c;那么如何使用呢&#xff1f; 1、首先需要检查settings文件种的INSTALLED_APPS&#xff0c;有没有这两部分内容&#xff1a; 2、检查中间件&#xff0c;比如这两个中间件&#xff0c;一个是用于登录&a…

【探秘地球宝藏】矿产资源知多少?

当我们仰望高楼林立的城市&#xff0c;乘坐便捷的交通工具&#xff0c;享受各种现代生活的便利时&#xff0c;你是否曾想过这一切背后的支撑力量&#xff1f;答案就藏在我们脚下——矿产资源&#xff0c;这些大自然赋予的宝贵财富&#xff0c;正是现代社会发展的基石。今天&…

使用ThemeRoller快速实现前端页面风格美化

使用ThemeRoller快速实现前端页面风格美化 文章目录 使用ThemeRoller快速实现前端页面风格美化一、ThemeRoller二、使用方法1.基本操作面板介绍2.直接用现成的配色风格——Gallery画廊3.自定义风格——Roll Your Own4.下载风格包并应用到页面 一、ThemeRoller ThemeRoller是jQ…

基于矩阵乘法的GPU烤机python代码(pytorch版)

前言 测试gpu前需要安装Anaconda、pytorch、tmux、nvitop。 单gpu 代码 import numpy as np from tqdm import tqdmProject &#xff1a;gpu-test File &#xff1a;gpu_stress.py Author &#xff1a;xxx Date &#xff1a;2024/4/20 16:13import argparse import …

力扣153. 寻找旋转排序数组中的最小值

Problem: 153. 寻找旋转排序数组中的最小值 文章目录 题目描述思路复杂度Code 题目描述 思路 1.初始化左右指针left和right&#xff0c;指向数组的头和尾&#xff1b; 2.开始二分查找&#xff1a; 2.1.定义退出条件&#xff1a;当left right时退出循环&#xff1b; 2.2.当nums…