分布式队列celery学习

news2024/9/20 20:29:32

说明:本文内容来自《python自动化运维快速入门》学习

一、介绍

Celery是由纯Python编写的,但协议可以用任何语言实现。目前,已有Ruby实现的RCelery、Node.js实现的node-celery及一个PHP客户端,语言互通也可以通过using webhooks实现。

1.celery概念

任务队列: 简单来说,任务队列就是存放着任务的队列,客户端将要执行任务的消息放入任务队列中,执行节点worker进程持续监视队列,如果有新的任务,就取出来执行该任务。这种机制就像生产者、消费者模型一样,客户端作为生产者,执行节点worker作为消费者,它们之前通过任务队列进行传递,如图所示。
在这里插入图片描述在这里插入图片描述
中间人(broker): Celery用于消息通信,通常使用中间人(broker)在客户端和worker之前传递,这个过程从客户端向队列添加消息开始,之后中间人把消息派送给worker。官方给出的实现broker的工具可参见下表。

在实际使用中,我们选择RabbitMQ或Redis作为中间人。

在这里插入图片描述
任务生产者: 调用Celery提供的API、函数、装饰器产生任务并交给任务队列的都是任务生产者。
执行单元worker: 属于任务队列的消费者,持续地监控任务队列,当队列中有新的任务时,便取出来执行。
任务结果存储backend: 用来存储worker执行任务的结果,Celery支持不同的方式存储任务的结果,包括AMQP、Redis、memcached、MongoDB、SQLAlchemy等。
任务调度器Beat:Celery Beat进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列。

2.celery重要特性

高可用性: 如果连接丢失或失败,worker和客户端就会自动重试,并且中间人通过主/主,主/从方式来提高可用性。
快速: 单个Celery进程每分钟执行数以百万计的任务,且保持往返延迟在亚毫秒级(使用RabbitMQ、py-librabbitmq和优化过的设置),可以选择多进程、Eventlet和Gevent三种模式并发执行。
灵活: Celery几乎所有模块都可以扩展或单独使用。可以自制连接池、序列化、压缩模式、日志、调度器、消费者、生产者、自动扩展、中间人传输或更多。
框架集成: Celery易于与Web框架集成,其中的一些甚至已经有了集成包,如django-celery、pyramid_celery、celery-pylons、web2py-celery、tornado-celery。因此,学习Celery具有很强的实用价值。
强大的调度功能: Celery Beat进程来实现强大的调度功能,可以指定任务在若干秒后或指定一个时间点(datetime类)来运行,也可以基于单纯的时间间隔或支持分钟、小时、每周的第几天、每月的第几天以及每年的第几个月的crontab表达式来使用周期任务调度。
易监控: 可以方便地查看定时任务的执行情况,如执行是否成功、当前状态、完成任务花费的时间等,还可以使用功能完备的管理后台或命令行添加、更新、删除任务,提供完善的错误处理机制。

二、启动

django_core目录下启动项目

启动命令

celery -A myCeleryProj.app worker -c 3 -l info

-c 3表示启用三个子进程执行该队列中的任务。运行结果如下:

(venv) PS D:\01-code\django_core> celery -A celery_app.start_celery worker -c 3 -l info
 
 -------------- celery@zhongrf v4.4.0 (cliffs)
--- ***** -----
-- ******* ---- Windows-10-10.0.22000-SP0 2022-12-04 15:28:52
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         celery_app:0x1fbf249e048
- ** ---------- .> transport:   redis://127.0.0.1:6379/0
- ** ---------- .> results:     redis://127.0.0.1:6379/0
- *** --- * --- .> concurrency: 3 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . celery_app.tasks.task1.add

[2022-12-04 15:28:52,189: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2022-12-04 15:28:52,189: INFO/MainProcess] mingle: searching for neighbors
[2022-12-04 15:28:52,424: INFO/SpawnPoolWorker-1] child process 63380 calling self.run()
[2022-12-04 15:28:52,439: INFO/SpawnPoolWorker-2] child process 61552 calling self.run()
[2022-12-04 15:28:52,439: INFO/SpawnPoolWorker-3] child process 60512 calling self.run()
[2022-12-04 15:28:53,206: INFO/MainProcess] mingle: all alone
[2022-12-04 15:28:53,206: INFO/MainProcess] celery@zhongrf ready.

更多启动Celery worker的方法如下:

# 设置处理任务队列的子进程个数为10。
celery -A myCeleryProj.app worker -c10 -l info 
# 设置处理任务队列为web_task。
celery -A myCeleryProj.app worker -Q web_task -l info 
# 设置后台运行并指定日志文件位置。
celery -A myCeleryProj.app worker –logfile /tmp/celery.log -l info -D

在window环境,celery4不支持多启动多个worker,需要加上eventlet

celery -A myCeleryProj.app worker -P eventlet -c10 -l info 

调用task的方法有以下三种

1.使用apply_async(args[, kwargs[, …]])发送一个task到任务队列

支持更多的控制,如add.apply_async(countdown=10)表示执行add函数的时间限制最多为10秒;

add.apply_async(countdown=10, expires=120)表示执行add函数的时间限制最多为10秒,add函数的有效期为120秒;

add.apply_async(expires=now + timedelta(days=2))表示执行add函数的有效期为两天。使用apply_async还支持回调,假如任务函数如下:

@app.task
def add(x, y):
return x + y

那么

add.apply_async((2, 2), link=add.s(16))  # 就相当于(2 + 2) + 16 = 20。

2.使用delay(*args, **kwargs)

该方法是apply_async的快捷方式,提供便捷的异步调度,但是如果想要更多的控制,就必须使用方法1。使用delay就像调用普通函数那样,非常简便,如下所示。

task.delay(arg1, arg2, kwarg1='x', kwarg2='y')

3.直接调用,相当于普通的函数调用,不在worker上执行。

三、celery架构

celery完整架构如下:
在这里插入图片描述
任务生产者产生任务并将任务发送到中间人

有多个消费者,即执行单元worker持续地监控消息中间人,如有属于自己队列的任务需要执行,就从中间人那里取出作业名称,查找对应的函数代码并执行,执行完成后将结果存储在Backend。这里的Worker可以分布式部署,彼此之间是独立的。

任务调度器Beat:Celery Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给中间人。

四、Celery 队列

Celery非常容易设置和运行,它通常会使用默认名为Celery的队列(可以通过CELERY_DEFAULT_QUEUE修改)来存放任务。Celery支持同时运行多个队列,还可以使用优先级不同的队列来确保高优先级的任务不需要等待就立即得到响应。

我们来实现不同的队列来执行不同的任务:使任务add在队列default中运行;taskA在队列task_A中运行;taskB在队列task_B中运行。

celery队列配置

celery_app/config/celery_config.py

from kombu import Queue

CELERY_QUEUES = (  # 定义任务队列
    Queue("default", routing_key="task.#"),  # 路由键以“task.”开头的消息都进default队列
    Queue("tasks_A", routing_key="A.#"),  # 路由键以“A.”开头的消息都进tasks_A队列
    Queue("tasks_B", routing_key="B.#"),  # 路由键以“B.”开头的消息都进tasks_B队列
)

CELERY_ROUTES = (
    [
        ("celery_queue_tasks.celery_queue_task.add", {"queue": "default"}),  # 将add任务分配至队列 default
        ("celery_queue_tasks.celery_queue_task.taskA", {"queue": "tasks_A"}),  # 将taskA任务分配至队列 tasks_A
        ("celery_queue_tasks.celery_queue_task.taskB", {"queue": "tasks_B"}),  # 将taskB任务分配至队列 tasks_B
    ],
)

celery 队列启动

开启三个终端窗口,分别启动三个队列的worker,执行以下命令。

celery -A celery_queue_tasks.start_queue_celery worker -Q default -l info
celery -A celery_queue_tasks.start_queue_celery worker -Q tasks_A -l info
celery -A celery_queue_tasks.start_queue_celery worker -Q tasks_B -l info

# 或者一次启动多个队列
 celery -A celery_queue_tasks.start_queue_celery worker -Q default,tasks_A,tasks_B -P eventlet -c 3 -l info

最后开启一个窗口来调用task。

>>> from myCeleryProj.tasks import *
>>> add.delay(4,5);taskA.delay();taskB.delay()

五、Celery Beat任务调度

Celery Beat是Celery的调度器,其定期启动任务,然后由集群中的可用工作节点worker执行这些任务。

默认情况下,Beat进程读取配置文件中CELERYBEAT_SCHEDULE的设置,也可以使用自定义存储,比如将启动任务的规则存储在SQL数据库中。请确保每次只为调度任务运行一个调度程序,否则任务将被重复执行。使用集群的方式意味着调度不需要同步,服务可以在不使用锁的情况下运行。

先明确一个概念——时区。间隔性任务调度默认使用UTC时区,也可以通过时区设置来改变时区。例如:

CELERY_TIMEZONE = 'Asia/Shanghai'  # 通过配置文件设置
app.conf.timezone = 'Asia/Shanghai' #直接在Celery app的源代码中设置

时区的设置必须加入CeleryApp中,默认的调度器(将调度计划存储在celerybeat-schedule文件中)将自动检测时区是否改变,如果时区改变,则自动重置调度计划。其他调度器可能不会自动重置,比如Django数据库调度器就需要手动重置调度计划。

配置

from datetime import timedelta

from celery.schedules import crontab

BROKER_URL = 'redis://127.0.0.1:6379/0'  # 使用redis 作为消息代理

CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'  # 任务结果存在Redis

CELERY_RESULT_SERIALIZER = 'json'  # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24  # 任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显

CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYBEAT_SCHEDULE = {
    "add": {
        "task": "celery_beat_tasks.celery_beat_task.add",
        "schedule": timedelta(seconds=10),  # 定义间隔为10s的任务
        "args": (10, 16),
    },
    "taskA": {
        "task": "celery_beat_tasks.celery_beat_task.taskA",
        "schedule": crontab(hour=19, minute=50),  # 定义间隔为对应时区下21:11分执行的任务
    },
    "taskB": {
        "task": "celery_beat_tasks.celery_beat_task.taskB",
        "schedule": crontab(hour=21, minute=8),  # 定义间隔为对应时区下21:8分执行的任务
    },
}

启用

启用Celery Beat进程处理调度任务

celery -A celery_beat_tasks.start_celery_beat beat -l info

最后可以在worker界面看到定时或间隔任务的处理情况

celery -A celery_beat_tasks.start_celery_beat worker -P eventlet -c 3 -l info 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/60517.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[附源码]JAVA毕业设计客户台账管理(系统+LW)

[附源码]JAVA毕业设计客户台账管理(系统LW) 目运行 环境项配置: Jdk1.8 Tomcat8.5 Mysql HBuilderX(Webstorm也行) Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。 项目技术&…

Activiti7工作流(二)

流程定义相关 流程定义查询 查询流程相关信息,包含流程定义,流程部署,流程定义版本 Test public void testDefinitionQuery(){//创建ProcessEngine对象ProcessEngine processEngine ProcessEngines.getDefaultProcessEngine();//获取仓库…

自动识别验证码实现系统自动登录(可扩展实现无人自动化操作,如领取各个平台的优惠券),不依赖第三方可以支持离线识别处理,附源码可直接运行

自动识别验证码实现系统自动登录(可扩展实现无人自动化操作,如领取各个平台的优惠券),不依赖第三方可以支持离线识别处理,附源码可直接运行。 实现过程: 1、只要是图片验证码都支持识别; 2、通过百度API实现验证码识别;(依赖第三方,且需要连接互联网,内网不可用,实…

7-FreeRTOS软件定时器

1- 简介 1.1 软件定时器简述 软件定时器就是允许函数设置一定的等待时间,然后执行。定时器执行的函数被称为定时器的回调函数。定时器从启动到执行回调函数之间的时间称为定时器的周期。定时器的回调函数在定时器的时间到达时执行。 软件定时器要先创建才能使用。…

实战Docker未授权访问提权

1、fofa关键字 port“2375” && body“page not found” 2、docker -H tcp://ip:port 可查看到当前所有的实例 3、docker -H tcp://ip:port pull alpine 4、docker -H tcp://ip:port run -it --privileged alpine bin/sh 5、fdisk -l 查看其分区结构 6、创建一个…

Java安全-CC1

CC1 这里用的是组长的链子和yso好像不太一样&#xff0c;不过大体上都是差不多的。后半条的链子都是一样的&#xff0c;而且这条更短更易理解。yso的CC1过段时间再看一下。 环境 Maven依赖&#xff1a; <dependencies><dependency><groupId>commons-colle…

十四、使用 Vue Router 开发单页应用(3)

本章概要 命名路由命名视图编程式导航传递 prop 到路由组件HTML 5 history 模式 14.5 命名路由 有时通过一个名称来标识路由会更方便&#xff0c;特别是在链接到路由&#xff0c;或者执行导航时。可以在创建 Router 实例时&#xff0c;在routes 选项中为路由设置名称。 修改…

用Unity实现FXAA

用Unity实现FXAAFXAA是现代的常用抗锯齿手段之一&#xff0c;这次我们来在Unity中从零开始实现它。 首先我们来看一个测试场景&#xff0c;我们在Game视角下将scale拉到2x&#xff1a; 可以看到画面的锯齿比较严重&#xff0c;下面我们将一步一步地实现FXAA&#xff0c;消除锯…

BDD - SpecFlow ExternalData Plugin 导入外部测试数据

BDD - SpecFlow ExternalData Plugin 导入外部测试数据引言SpecFlow ExternalData 插件支持的数据源Tags实践创建一个 Class Libary Project添加 NuGet Packages添加测试数据源文件CSV 文件Excel 文件添加 Feature 文件实现 Step Definition执行Scenario 导入测试数据源Scenari…

深入URP之Shader篇4: Depth Only Pass

Depth only pass unlit shader中包含了一个Depth Only Pass&#xff0c;这个pass的代码在Packages\com.unity.render-pipelines.universal\Shaders\DepthOnlyPass.hlsl中。这是一个公共pass&#xff0c;几乎所有的URP shader都会包含这个pass。本篇说一说这个pass的作用以及实…

Ubuntu映射到Windows网络驱动器

将虚拟机Ubuntu映射到Windows网络驱动器中&#xff0c;我们需要Ubuntu的网络和主机网络处于同一网段下&#xff0c;然后使Ubuntu具备共享文件功能&#xff0c;最后在windows下添加网络地址。 将Ubuntu设置和主机同一网段 查看主机网络信息 在虚拟机中 选择编辑-- 虚拟网络编…

Java的字符串String

文章目录什么是字符串String类的声明为什么我们的String是不可变的为什么String类用final修饰String的创建字符串比较相等关于Java中的比较关于字符串不同赋值操作对应的内存分配那对象如何进行比较内容字符串常量池StringTalbe的位置字符串常见的操作拼接操作获得字符串的子串…

事件驱动的微服务、CQRS、SAGA、Axon、Spring Boot

事件驱动的微服务、CQRS、SAGA、Axon、Spring Boot 学习构建分布式事件驱动的微服务、CQRS、事件溯源、SAGA、事务 课程英文名&#xff1a;Event-Driven Microservices, CQRS, SAGA, Axon, Spring Boot 此视频教程共10.0小时&#xff0c;中英双语字幕&#xff0c;画质清晰无…

一个带有楼中楼的评论系统数据库设置思路

前言 有个需求&#xff0c;需要实现百度贴吧那样能评论帖子中某一楼的评论里的评论 分析 说起来有点拗口&#xff0c;其实这个评论系统分为4个部分&#xff1a; 主题&#xff08;楼主发布的帖子&#xff09;直接返回楼主的评论&#xff08;从帖&#xff09;&#xff1a;直接…

(11)点云数据处理学习——Colored point cloud registration(彩色点注册)

1、主要参考 &#xff08;1&#xff09;官网介绍 Colored point cloud registration — Open3D 0.16.0 documentation 2、原理和实现 2.1原理 本教程演示了使用几何形状和颜色进行配准的ICP变体。实现了[Park2017]算法。颜色信息锁定沿切平面的对齐。因此&#xff0c;该算法…

Yocto创建自己的分区(基于STM32MP1)

Yocto创建自己的分区&#xff08;基于STM32MP1&#xff09; 前几章节我们分析了machine class里面几篇关键的class&#xff0c;还有machine conf里面的inc文件&#xff0c;大致的创建分区的流程都比较清晰了&#xff0c;本章节动手实际操作一把&#xff0c;创建一个自己的分区…

Unity中的协程

一、什么是协程 协程(Coroutines) 是一种比线程更加轻量级的存在&#xff0c;也被称为用户态线程一个进程可以拥有多个线程&#xff0c;一个线程可以拥有多个协程协程并不会增加线程&#xff0c;它在线程中运行&#xff0c;通过分时复用的方式运行多个协程&#xff0c;其切换代…

《Spring 5.x源码解析之Spring AOP 注解驱动使用及其实现原理》

《Spring 5.x源码解析之Spring AOP 注解驱动使用及其实现原理》 学好路更宽&#xff0c;钱多少加班。---- mercyblitz 一、前言 大家好&#xff0c;欢迎阅读《Spring 5.x源码解析》系列&#xff0c;本篇作为该系列的第二篇&#xff0c;重点介绍Spring AOP在注解驱动编程模式上的…

基于J2EE的大型视频影音系统的设计与实现

目 录 毕业设计&#xff08;论文&#xff09;任务书 I 摘 要 II ABSTRACT III 第1章 绪 论 1 1.1 课题的提出 1 1.1.1 Web2.0浪潮进一步影响全球互联网发展 1 1.1.2 视频分享成为2.0浪潮的最新爆发点 1 1.2 系统研究目的 2 1.3 系统设计目标 2 第2章 关键技术介绍 4 2.1 网页…

C#使用策略模式或者委托替代多IfElse判断和Switch语句

这篇文件介绍使用设计模式中的策略模式和委托来解决多个IfElse判断语句和Switch语句&#xff0c;这种替换方式在其他语言也一样可以做到&#xff0c;比如PHP、JavaScript、Python或者Java等。 这里以C#为例进行演示。 需要为一个程序编写计算方法&#xff0c;根据标签名称来决定…