Zhong__Celery基本使用详解

news2025/1/15 17:26:53

时间:2023.03.10

环境:python3/centos/redis

目的:演示celery基本使用的详细案例

说明:python依赖的版本以requirement.txt文件为测试基准 不同版本可能存在差异

作者:Zhong

简介

简介及概念介绍部分不会很详细 主要看demo项目代码

Celery包含如下组件:

1. Celery Beat:任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。

2. Celery Worker:执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。

3. Broker:消息代理,或者叫作消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。

4. Producer:调用了Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。

5. Result Backend:任务处理完后保存状态信息和结果,以供查询。Celery默认已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。

核心架构图

Broker(消息中间件)

用来接收调用任务的生产者发送的任务 分发到相应worker执行

worker(消费者)

执行任务的实体

启动一个worker

# 处理所有的默认队列任务
celery -A celery_test worker -l INFO

# 处理指定的队列任务
celery -A proj worker --loglevel=info -Q queue1,queue2

启动多个worker分别处理不同的队列任务

celery -A proj worker --loglevel=info -Q queue1
celery -A proj worker --loglevel=info -Q queue2

beat(调度器)

beat是一个调度器,它可以指定在什么时候某个worker来执行某个任务。如果我们想周期执行/定时执行某个任务 需要增加beat_schedule配置信息 在celery指定的配置文件中配置

beat_schedule = {
    # 周期任务/定时任务
    'every-5-minute':
        {
            'task': 'proj.tasks.period_task',
            'schedule': 300.0,
            'args': (10, 20),
        },
    'add-every-monday-morning': {
        'task': 'proj.tasks.period_task',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (100, 200),
    },
}

开启一个celery beat服务

celery -A proj beat

celery需要保存上次任务运行的时间在数据文件中,文件在当前目录下名字叫celerybeat-schedule beat需要访问此文件

celery -A proj beat -s /home/celery/var/run/celerybeat-schedule

Routing(任务路由分发)

配置文件中指定任务发送到哪个队列中执行

task_routes=({
    'proj.tasks.task1': {'queue': 'queue1'},
    'proj.tasks.task2': {'queue': 'queue1'},
    'proj.tasks.task3': {'queue': 'queue2'},
    },
)

也可以通过apply_async()方法设置任务发送到哪个队列中执行

task1.apply_async(queue='queue1')

Producer(生产者)

调用任务 任务生产者

from proj.tasks import add

# 直接调用 不会传递到worker执行 会在当前进程直接执行
add(1, 2)

# delay 发送任务到broker 然后调度到worker执行
add.delay(1, 2)

# apply_async 发送任务到broker 然后调度到worker执行 可设置一些任务执行的参数
add.apply_async((1, 2), queue='vip', countdown=10)

无论是delay()还是apply_async()方式都会返回AsyncResult对象,方便跟踪任务执行状态,但需要我们配置result_backend. 每一个被调用的任务都会被分配一个ID,我们叫做Task ID.

Result Backend

存储任务信息 包括状态、结果等

# 配置文件中指定
result_backend = 'redis://192.168.1.1:6379/1'

# Celery配置中指定
app = Celery('proj',
             broker='redis://192.168.1.1:6379/0',
             backend='redis://192.168.1.1:63799/1',
             include=['proj.tasks'])

获取任务结果 返回的是一个AsyncResult对象

res = add.delay(1, 2)
print(res.get(timeout=30))

如果没有指定Result Backend是不能获取返回结果的 提示: NotImplementedError: No result backend is configured.

详细可配置项目选项可参考官网文档

celery项目选项配置

项目demo

通过一个demo演示各种task的基本使用

代码仓库

详见gitee: celery_demo

项目目录结构

celery_demo/

proj/

moduleA/

__init__.py

tasks.py

moduleB/

__init__.py

mod_b.py

moduleC/

tasks.py

__init__.py

celery.py

celery_config.py

tasks.py

call_tasks.py

requirement.txt

部署

redis

本demo使用redis作为broker 也可以根据需求使用mq等为消息中间件

安装redis 也可以使用docker启动 配置可远程访问

python环境

在各个要部署代码的主机上安装配置响应的python3环境 建议版本3.7及以上 并安装依赖文件requirement.txt 建议使用python的虚拟环境安装

pip3 install -i https://pypi.douban.com/simple -r requirement.txt

主机网络

部署测试demo的主机建议使用linux系统如centos 不要直接使用windows 除非你可以包装为服务来避免各种问题

测试可全部在一台主机上部署 也可以部署在不同的服务器上 保证各主机可通讯正常 主要是可与redis服务正常通讯

ip为192.168.1.1的服务器上 部署redis redis地址不能为本地地址如127.0.0.1 别的主机要能访问到它 单机部署无所谓

ip为192.168.1.2的服务器上 部署beat

ip为192.168.1.3的主机部署一个worker处理默认的task

ip为192.168.1.4的主机部署一个worker处理queue为queue1的task

ip为192.168.1.5的主机部署一个worker处理queue为queue2的task

代码分发

将相同的代码分别复制到ip为192.168.1.2-192.168.1.5的主机上 安装python依赖 当然根据需求部分代码如调用的task代码可以不同

启动

192.168.1.1主机启动redis

192.168.1.3主机启动worker 在celery_demo目录下proj目录同级执行

celery -A proj worker --loglevel=info

192.168.1.3主机启动worker 在celery_demo目录下proj目录同级执行

celery -A proj worker --loglevel=info -Q queue1

192.168.1.3主机启动worker 在celery_demo目录下proj目录同级执行

celery -A proj worker --loglevel=info -Q queue2

ip为192.168.1.2主机启动beat

celery -A proj beat -s ./celerybeat-schedule

当所有服务启动后 可以观察到worker主机会输出celery及task相关的信息 还有各个worker节点同步的信息 它们主要是通过broker代理来保持协作

在指定的beat任务执行时 可以看到worker处理beat task的输出 主要根据机制算法分发到某台worker去执行任务

测试

在某台主机调用call_tasks.py

python3 call_tasks.py

可以观察到默认的task 指定queue的task 都在对应的worker主机上得到了执行

Note

celery可设置多种类型的任务 可以集成到其它框架如Django使用

更多详情见官网文档 ... ...

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/402418.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【HTTP协议与Web服务器】

HTTP协议与Web服务器浏览器与服务器通信过程HTTP的请求报头HTTP请求报头结构HTTP的请求方法HTTP应答报头HTTP应答报头结构应答状态web服务器的c语言实现浏览器与服务器通信过程 浏览器与Web服务器再应用层通信使用的是HTTP协议,而HTTP协议在传输层使用的是TCP协议。…

深度学习必备知识——模型数据集Yolo与Voc格式文件相互转化

在深度学习中,第一步要做的往往就是处理数据集,尤其是学习百度飞桨PaddlePaddle的小伙伴,数据集经常要用Voc格式的,比如性能突出的ppyolo等模型。所以学会数据集转化的本领是十分必要的。这篇博客就带你一起进行Yolo与Voc格式的相互转化&…

数据库系统概论

文章目录前言基础篇:1-5章第 1 章 绪论1.1 数据库系统概述1.2 数据模型1.3 数据库系统的结构1.4 数据库系统的组成1.5 小结第 2 章 关系数据库1.关系模型1.1 关系数据结构1.2 关系完整性约束实体完整性、参照完整性、用户定义完整性2.关系代数8种关系代数运算符并 ∪…

「媒体邀约」如何选择适合的媒体公关,媒体服务供应商

传媒如春雨,润物细无声,大家好,我是51媒体网胡老师。 每天胡老师也会接到大量关于媒体方面的询问,胡老师也都一一的很耐心的进行了解答,也都很详细的做了媒体规划和媒体传播方案,但有的朋友还是很犹豫&…

关于 @Aspect 注解的使用

一、Spring AOPAOP(Aspect Oriented Programming) 是一种面向切面的编程思想。面向切面编程是将程序抽象成各个切面,即解剖对象的内部,将那些影响了多个类的公共行为抽取到一个可重用模块里,减少系统的重复代码,降低模块间的耦合度…

Hive小结

Hive的定义hive是一个建立在Hadoop上的开源数据仓库软件,可以将结构化的数据文件映射为一张数据库表,基于表提供了一种类似SQL的查询模型,称为hive查询语言(HQL),用于访问和分析存储在Hadoop文件中的大型数…

Uipath Excel 自动化系列12-InsertDeleteSheet(新增删除Sheet)

活动描述 Insert Sheet 新增Sheet:在 Excel 文件中插入工作表,该活动需与Use Excel File 活动选择的 Excel 文件一起使用。 Delete Sheet 删除Sheet:从 Excel 文件中删除指定工作表,该活动需与Use Excel File 活动选择的 Excel 文件一起使用。 使用如下图: Inser…

【React教程】一、React简介

一、React简介 React是一个用于构建用户界面的JavaScript库,它是Facebook的内部项目,用来架设Instagram的网站,并于2013年5月开源。React主要用于构建Ul,很多人认为React 是 MVC 中的 V(视图)。由于拥有较高的性能&…

计算机组成原理——计算机系统概述

文章目录计算机系统的组成计算机硬件冯诺依曼结构计算机的功能部件计算机软件系统软件和应用软件三个级别的语言计算机的性能指标字长数据通路宽度主存容量运算速度计算机系统的组成 计算机系统由硬件系统和软件系统组成: 硬件是指有形的物理设备,是计…

【CICD】如何编写 .gitlab-ci.yml 文件

⏳ CICD 指的是持续集成/持续交付(continuous integration/ continuous delivery),是为了满足互联网、金融公司快速迭代项目的需要而提出的一种软件开发思想。大致思路是通过编写自动化脚本,使新代码必须通过一些规则核查后才能部…

自定义控件(?/N) - 事件分发

一、外部传递到ViewGroup中Activity会通过 getWindow( ) 获取PhoneWindow对象并调用它的superDispatchTouchEvent( ),该方法会调用它(PhoneWindow)的内部类 DecorView 的 superDispatchTouchEvent( ),而它(DecorView&a…

【Docker】P1 初识 Docker 以及 Ubuntu 安装 Docker

初识 Docker 以及 Ubuntu 安装 Docker初识 Docker故事引入DockerUbuntu 安装 Docker读完本文,你应当会理解这两句话: Docker 可以大大简化运维部署相关操作,可以规避一些 bug; Docker 是一种容器技术,解决软件跨环境迁…

使用Houdini输出四面体网格并输出tetgen格式

我们的目标是从houdini输出生成的四面体,希望是tetgen格式的。 众所周知,houdini是不能直接输出四面体的。 有三方案去解决: 输出点云ply文件,然后利用tetgen生成网格。输出Hounidi内置的.geo格式文件,然后写个脚本…

[Java Web]Request对象 | 超1w字带你熟悉Servlet中的request请求

⭐作者介绍:大二本科网络工程专业在读,持续学习Java,输出优质文章 ⭐所属专栏:Java Web ⭐如果觉得文章写的不错,欢迎点个关注😉有写的不好的地方也欢迎指正,一同进步😁 目录 Reque…

Codeforces Round 857 (Div. 2)【A-C】

文章目录A. Likes【贪心、模拟】B. Settlement of Guinea Pigs【贪心】C. The Very Beautiful Blanket【构造、观察】链接传送门A. Likes【贪心、模拟】 分析 为了使得当前时间点赞的尽可能大,那么前面的赞的数目也要尽可能大,所以前面把能赞的都要先赞…

2-8 SpringCloud快速开发入门: Eureka 服务注册中心自我保护机制

接上一章节Eureka 注册中心高可用集群搭建,这里讲讲Eureka 服务注册中心自我保护机制 Eureka 服务注册中心自我保护机制 自我保护机制是 Eureka 注册中心的重要特性,当 Eureka 注册中心进入自我保护模式时,在 Eureka Server 首页会输出如下警…

Python JS逆向篇(一)

Python JS逆向篇(一)效果实现思路最后一步逆向 p.a.HmacSHA256(t, s["a"].state.commonStore.cupid_sign_key)JS实现py实现(先苦后甜)逆向主题:51job请求头headers中携带的sign参数。 (注&#x…

Windows基于Nginx搭建RTMP流媒体服务器(附带所有组件下载地址及验证方法)

RTMP服务时常用于直播时提供拉流推流传输数据的一种服务。前段时间由于朋友想搭建一套直播时提供稳定数据传输的服务器,所以就研究了一下如何搭建及使用。 1、下载nginx 首先我们要知道一般nginx不能直接配置rtmp服务,在Windows系统上需要特殊nginx版本…

centos8 安装 pcs pacemaker

一、背景 在centos-8中安装pcs、pacemaker会显示找不到源 (yum install pcs pacemaker 也是一样的) 通过搜索引擎,有说:dnf config-manager --set-enable HighAvailability 也有的说:执行dnf update 也有的说执行 dn…

AB测试——流程介绍(设计实验)

前言: 作为AB测试的学习记录,接上文内容, 本文继续介绍假设建立和实验设计部分,包括实验对象、样本量计算(显著性水平、统计功效及最小可检测效应)、实验周期。 相关文章: AB测试——原理介绍 A…