Celery概述
Celery是一个分布式任务调度模块,用于在Python中处理异步任务。它允许你创建任务,并发送给工作节点执行。
Celery常常用于我们说的脏活,累活,处理耗时的操作,如发送电子邮件、处理数据、执行计算等。
上手非常简单,并且有很大的可扩展性。本篇主要介绍Celery的基本使用方法。
- 部署Redis&RabbitMQ
使用RabbitMQ用作broker,Redis用作backend存放结果数据。具体部署此处不表,可以使用docker自行部署。
- 安装库
# 安装celery
pip3 install celery -i https://pypi.tuna.tsinghua.edu.cn/simple
# redis包建议安装,不然可能会报错
pip3 install redis -i https://pypi.tuna.tsinghua.edu.cn/simple
celery应用
- 创建实例
# tasks.py 文件名需与实例化对象第一个参数一致
import time
from celery import Celery
redis_url = 'redis://Password@IP:port/0'
rabbitmq_host = 'amqp://Uername:Password@IP:Port//v_host'
# 实例化Celery对象
app = Celery('tasks', # 同当前模块名
broker= rabbitmq_host, # 消息中间件URL,此处使用RabbitMQ,也可使用Redis
backend= redis_url, # 后端结果存储,使用Redis进行存放
)
- 任务定义
# tasks.py
# 使用实例化对象 @.task装饰器装饰的方法即成为任务
@app.task
def add(x, y):
print("start...")
time.sleep(5)
print("end...")
return x + y
- 启动worker
celery worker [OPTIONS]
参数 | 含义 |
---|---|
-A或–app | 指定要使用的 Celery 应用程序模块。 |
-l或–loglevel | 指定日志级别,例如 DEBUG、INFO、WARNING、ERROR、CRITICAL。 |
-b或–broker | 指定 Celery 使用的 broker(消息中间件),例如 RabbitMQ、Redis 等。 |
-c或–concurrency | 指定 worker 并发执行的任务数量。 |
-S | 指定 worker 在执行任务时,如果超过这个时间没有响应,则认为任务失败。 |
-T | 指定 worker 在执行任务时,如果超过这个时间没有响应,则立即杀死任务。 |
-n | 指定hostname |
例如,要启动上面的任务,并发执行任务数量为 10,日志级别为INFO 的 worker,可以使用以下命令:
celery -A tasks worker -c 10 -l INFO -P threads -n w01291
命令行输出,可以看到任务列表里出现了tasks.add,即上边有装饰器的方法成为了任务
- 任务调用
我们可以在交互式控制台进行调用操作
>>> from tasks import add
>>> res = add.delay(5,5)
>>> res.ready()
True
>>> res.state
'SUCCESS'
>>> res.task_id
'6d067a7e-197e-4fa0-ad24-57e4497c09ad'
>>> res.result
10
可以查看Celery日志
也可以在Redis数据库中看到任务执行后的结果数据
测试发送100个任务
from tasks import add
def test_100_task():
results = {}
for i in range(99):
results[f'res{i}'] = add.delay(1, i)
- 日志
再看Redis,出现了99条结果