时间:2023.03.10
环境:python3/centos/redis
目的:演示celery基本使用的详细案例
说明:python依赖的版本以requirement.txt文件为测试基准 不同版本可能存在差异
作者:Zhong
简介
简介及概念介绍部分不会很详细 主要看demo项目代码
Celery包含如下组件:
1. Celery Beat:任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。
2. Celery Worker:执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。
3. Broker:消息代理,或者叫作消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。
4. Producer:调用了Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。
5. Result Backend:任务处理完后保存状态信息和结果,以供查询。Celery默认已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。
核心架构图
Broker(消息中间件)
用来接收调用任务的生产者发送的任务 分发到相应worker执行
worker(消费者)
执行任务的实体
启动一个worker
# 处理所有的默认队列任务
celery -A celery_test worker -l INFO
# 处理指定的队列任务
celery -A proj worker --loglevel=info -Q queue1,queue2
启动多个worker分别处理不同的队列任务
celery -A proj worker --loglevel=info -Q queue1
celery -A proj worker --loglevel=info -Q queue2
beat(调度器)
beat是一个调度器,它可以指定在什么时候某个worker来执行某个任务。如果我们想周期执行/定时执行某个任务 需要增加beat_schedule配置信息 在celery指定的配置文件中配置
beat_schedule = {
# 周期任务/定时任务
'every-5-minute':
{
'task': 'proj.tasks.period_task',
'schedule': 300.0,
'args': (10, 20),
},
'add-every-monday-morning': {
'task': 'proj.tasks.period_task',
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (100, 200),
},
}
开启一个celery beat服务
celery -A proj beat
celery需要保存上次任务运行的时间在数据文件中,文件在当前目录下名字叫celerybeat-schedule beat需要访问此文件
celery -A proj beat -s /home/celery/var/run/celerybeat-schedule
Routing(任务路由分发)
配置文件中指定任务发送到哪个队列中执行
task_routes=({
'proj.tasks.task1': {'queue': 'queue1'},
'proj.tasks.task2': {'queue': 'queue1'},
'proj.tasks.task3': {'queue': 'queue2'},
},
)
也可以通过apply_async()方法设置任务发送到哪个队列中执行
task1.apply_async(queue='queue1')
Producer(生产者)
调用任务 任务生产者
from proj.tasks import add
# 直接调用 不会传递到worker执行 会在当前进程直接执行
add(1, 2)
# delay 发送任务到broker 然后调度到worker执行
add.delay(1, 2)
# apply_async 发送任务到broker 然后调度到worker执行 可设置一些任务执行的参数
add.apply_async((1, 2), queue='vip', countdown=10)
无论是delay()还是apply_async()方式都会返回AsyncResult对象,方便跟踪任务执行状态,但需要我们配置result_backend. 每一个被调用的任务都会被分配一个ID,我们叫做Task ID.
Result Backend
存储任务信息 包括状态、结果等
# 配置文件中指定
result_backend = 'redis://192.168.1.1:6379/1'
# Celery配置中指定
app = Celery('proj',
broker='redis://192.168.1.1:6379/0',
backend='redis://192.168.1.1:63799/1',
include=['proj.tasks'])
获取任务结果 返回的是一个AsyncResult对象
res = add.delay(1, 2)
print(res.get(timeout=30))
如果没有指定Result Backend是不能获取返回结果的 提示: NotImplementedError: No result backend is configured.
详细可配置项目选项可参考官网文档
celery项目选项配置
项目demo
通过一个demo演示各种task的基本使用
代码仓库
详见gitee: celery_demo
项目目录结构
celery_demo/
proj/
moduleA/
__init__.py
tasks.py
moduleB/
__init__.py
mod_b.py
moduleC/
tasks.py
__init__.py
celery.py
celery_config.py
tasks.py
call_tasks.py
requirement.txt
部署
redis
本demo使用redis作为broker 也可以根据需求使用mq等为消息中间件
安装redis 也可以使用docker启动 配置可远程访问
python环境
在各个要部署代码的主机上安装配置响应的python3环境 建议版本3.7及以上 并安装依赖文件requirement.txt 建议使用python的虚拟环境安装
pip3 install -i https://pypi.douban.com/simple -r requirement.txt
主机网络
部署测试demo的主机建议使用linux系统如centos 不要直接使用windows 除非你可以包装为服务来避免各种问题
测试可全部在一台主机上部署 也可以部署在不同的服务器上 保证各主机可通讯正常 主要是可与redis服务正常通讯
如
ip为192.168.1.1的服务器上 部署redis redis地址不能为本地地址如127.0.0.1 别的主机要能访问到它 单机部署无所谓
ip为192.168.1.2的服务器上 部署beat
ip为192.168.1.3的主机部署一个worker处理默认的task
ip为192.168.1.4的主机部署一个worker处理queue为queue1的task
ip为192.168.1.5的主机部署一个worker处理queue为queue2的task
代码分发
将相同的代码分别复制到ip为192.168.1.2-192.168.1.5的主机上 安装python依赖 当然根据需求部分代码如调用的task代码可以不同
启动
192.168.1.1主机启动redis
192.168.1.3主机启动worker 在celery_demo目录下proj目录同级执行
celery -A proj worker --loglevel=info
192.168.1.3主机启动worker 在celery_demo目录下proj目录同级执行
celery -A proj worker --loglevel=info -Q queue1
192.168.1.3主机启动worker 在celery_demo目录下proj目录同级执行
celery -A proj worker --loglevel=info -Q queue2
ip为192.168.1.2主机启动beat
celery -A proj beat -s ./celerybeat-schedule
当所有服务启动后 可以观察到worker主机会输出celery及task相关的信息 还有各个worker节点同步的信息 它们主要是通过broker代理来保持协作
在指定的beat任务执行时 可以看到worker处理beat task的输出 主要根据机制算法分发到某台worker去执行任务
测试
在某台主机调用call_tasks.py
python3 call_tasks.py
可以观察到默认的task 指定queue的task 都在对应的worker主机上得到了执行
Note
celery可设置多种类型的任务 可以集成到其它框架如Django使用
更多详情见官网文档 ... ...