2 Celery介绍
2.1 Celery是什么
# 1 celery 是一个灵活且可靠的,处理大量消息的分布式系统,可以在多个节点之间处理某个任务
-现在干一堆活,如果一个人,需要一件件来做
-招了几个人,分别安排不同人干活
-并发效果--》同时好多人在干活
# 2 celery 是一个专注于实时处理的任务队列,支持任务调度
# 3 celery 是开源的,有很多的使用者
# 4 celery 完全基于 Python 语言编写
# 5 celery 本质上是一个【分布式的异步任务调度框架】,类似于 Apache 的 airflow
-分布式:可以运行在不同的计算机节点上
-异步任务:同时干好多事
-框架
# 6 celery 只是用来调度任务的,但它本身并不具备存储任务的功能,而调度任务的时候肯定是要把任务存起来的。因此要使用 celery 的话,还需要搭配一些具备存储、访问功能的工具,比如:消息队列、Redis缓存、数据库等等。官方推荐的是消息队列 RabbitMQ,我们使用 Redis
同步调用函数---》add--》执行5s钟---》数据返回了
异步调用函数---》add---》执行了5s钟--》执行完的数据,找个地方存着
调用方--》去存的地方看一下--》任务有没有执行完
消息队列:Rabbitmq,Kafka
redis:咱们用redis
# 总结:
-celery 分布式异步任务框架--》实现异步
-需要有redis支持,这个框架才能用
2.2 Celery使用场景
# 1 异步任务(函数)
-一些耗时操作交给celery执行
-视频转码,邮件发送,消息推送
# 2 定时任务
-定时推送消息,定时爬取一些数据,定时统计一些数据等
# 3 延迟任务
-提交任务后,等一段时间再执行任务
2.3 Celery官网
# 1 开源软件
https://github.com/celery/celery
celery:芹菜,吉祥物
# 2 文档地址
https://docs.celeryq.dev/en/stable/
# 3 最新版本
5.3.6版本 (小版更新bug)
5.3版本
软件版本命名规范:
# 3 是大版本 跟 python 2有区别,有大更新
# 3.6 版本到的代码基本一样
# 3.6.1 3.6.2区别
3.6.1上有bug,更新改正bug,再发版就叫 3.6.3
python 3.6.8
python 3.6.11
python 3.9.1
# 4 python的开源框架--》支持哪个版本及以上的python解释器
5.3 ---》Python ❨3.8, 3.9, 3.10, 3.11
# 5 集成到django中
# 支持django 2.2.x 及以上 django 3.x 4.x 5.x 都可以
# django低于 2.2 --》5.2.x
# django 1 使用celery 4.x
Celery 5.3.x supports Django 2.2 LTS or newer versions. Please use Celery 5.2.x for versions older than Django 2.2 or Celery 4.4.x if your Django version is older than 1.11.
# 6 咱们讲课:
-python:3.9.13 python 3.12
-Django:3.2.22 版本
-celery:5.3
2.4 Celery架构
# Celery 架构,它采用典型的生产者-消费者模式,主要由以下部分组成:
# Producer:它负责把任务(发送短信任务,爬虫任务)提交得到broker中
# celery Beat:会读取文件--》周期性的向broker中提交任务
# broker:消息中间件,放任务的地方,celery本身不提供,借助于redis --》rabbitmq
# worker:工人,消费者,负责从消息中间件中取出任务--》执行
# backend :worker执行完,会有结果,结果存储在backend,celery不提供--》借助于 redis
3 Celery快速使用
3.1 安装
# 1 创建python项目
-pycharm创建
# 2 创建虚拟环境--》解释器中,目前没有任何模块--》后续咱们需要装各种模块
-pycharm创建
# 3 安装celery
pip install celery
# 4 安装redis
pip install redis
# 5 如果win平台,需要装
pip install eventlet # celery 是个小组织,它不支持win,借助于eventlet,可以运行在win上
3.3 redis安装和配置
#redis 安装和配置
Redis-x64-5.0.14.1.msi
# redis安装包--》一路下一步安装完--》把redis装在你的电脑上了
# mysql 一样的东西,存数据的地址
resp-2022.1.0.0.exe
# 客户端安装包--》一路下一步
# 使用这个软件--》链接redis,图形化界面查看redis的数据
# navicate一样的的东西
# 启动redis
-在服务中找到redis服务,启动
# 链接redis操作
-方式一(咱们不用):cmd中敲 redis-cli 链接 # mysql cmd 链接差不多
-方式二:如下图
3.2 使用
1-编写celery_demo.py
import time
from celery import Celery
# broker:消息中间件,使用redis
broker = 'redis://127.0.0.1:6379/1'
# backend:结果存储,使用redis
backend = 'redis://127.0.0.1:6379/2'
app = Celery('demo', broker=broker, backend=backend)
# 编写任务
@app.task # 被装饰器装饰了,才是celery的任务
def add(a, b):
print('a+b的结束是:', a + b)
time.sleep(1) # 模拟任务耗时
return a + b
2-提交任务–add_task.py
# 这个程序用来提交任务 producer
from celery_demo import add
# 1 同步调用
# res=add(7,8)
# print(res)
# 2 异步--》使用delay来提交任务
# 这句话向消息队列中提交了一个任务,计算 7+8的任务,但是这个任务没执行 3d288ffa-e9ef-4816-8903-1eccd68b0d68
res = add.delay(45, 8) # 没有1s耗时,直接返回,但是没有返回 15,而是返回了一个 uuid号
print(res)
# 使用命令,启动worker--》worker会执行被提交的任务--》执行完后,会把结果存到redis的2库中
# 项目路径下执行:
# win: celery -A celery_demo worker -l info -P eventlet
# mac linux: celery -A celery_demo worker -l info
3-redis中可以看到被提交的任务-没执行
3-启动worker执行任务
# win: celery -A celery_demo worker -l info -P eventlet
celery -A 有app的py文件的名字 worker -l info -P eventlet
# mac linux: celery -A celery_demo worker -l info
4-任务被执行了–redis中看到的
4 Celery包结构
# 后期随着项目越来越大---》task任务越来越多,都写在一个py文件中,就不好了,希望把任务拆分到多个py文件中
# 需要使用包结构来管理
项目名
├── celery_task # celery包
│ ├── __init__.py # 包文件
│ ├── celery.py # celery连接和配置相关文件,且名字必须叫celery.py
│ └── crawl_task.py # 所有任务函数
└── order_task.py # 所有任务函数
└── user_task.py # 所有任务函数
├── add_task.py # 添加任务
└── get_result.py # 获取结果
4.1 创建包:celery_task
celery.py
from celery import Celery
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
# 任务分到不同的py文件中了
app = Celery('demo', broker=broker, backend=backend,
include=['celery_task.crawl_task', 'celery_task.order_task', 'celery_task.user_task'])
**crawl_task.py **
import time
from .celery import app
@app.task
def crawl_baidu():
print('====开始爬百度====')
time.sleep(2)
print('====爬完百度====')
return '成功爬取!!'
@app.task
def crawl_cnblogs():
print('====开始爬cnblogs====')
time.sleep(2)
print('====爬完cnblogs====')
return '成功爬取cnblogs!!'
order_task.py
import time
from .celery import app
@app.task
def pay_order():
print('====开始下单====')
time.sleep(5)
print('====下单完成====')
return '成功下单!!'
**user_task.py **
import time
# . 表示当前路径
from .celery import app
@app.task
def send_email(to='xxx@qq.com'):
print('====邮件开始发送====')
time.sleep(3)
print('====邮件发送完成====')
return '向%s发送邮件成功!!' % to
4.2 启动别的程序–提交任务–add_task.py
from celery_task.crawl_task import crawl_baidu
res = crawl_baidu.delay() # crawl_baidu没有参数,这里就不传
print(res) # uuid--》后续通过uuid查询结果是否执行完成 7a789046-4954-4918-b93e-2defc76adc02
4.3 提交的任务,没执行,放在redis中,可以去redis中看
4.4 启动worker执行任务
# 在包这一层执行,以包名运行
celery -A celery_task worker -l info -P eventlet
# worker就会执行咱们提交的任务,执行完,会把结果放在redis中
# 手动去redis中看
4.5 使用代码查看结果get_result.py
# 1 拿到app
from celery_task.celery import app
from celery.result import AsyncResult
# 2 知道我们要查的任务 uuid
id = '7d396575-bfb7-45a5-8a0b-3a4767cde886'
if __name__ == '__main__':
result = AsyncResult(id=id, app=app)
if result.successful():
result = result.get()
print(result)
elif result.failed():
print('任务失败')
elif result.status == 'PENDING':
print('任务等待中被执行')
elif result.status == 'RETRY':
print('任务异常后正在重试')
elif result.status == 'STARTED':
print('任务已经开始被执行')
5 异步任务–延迟任务–定时任务
5.1 异步任务
# 咱们刚刚讲的就是在做异步任务
res = 任务.delay(10, 20)
5.2 延迟任务
# 延迟5s钟给 xxx@qq.com 发邮件
#######提交延迟任务 延迟5s钟给 xxx@qq.com 发邮件
# 创建一个5s时间对象
from celery_task.user_task import send_email
from datetime import datetime,timedelta
# datetime.utcnow() 默认时区用的utc时间,咱们也需要使用utc时间
# 后期我们会换成上海时间
# utc时间5s后的时间
eta=datetime.utcnow() + timedelta(seconds=5)
res=send_email.apply_async(args=['xxx@qq.com'],eta=eta)
print(res)
# 启动worker
celery -A celery_task worker -l info -P eventlet
# worker收到后,不会立即执行,等5s后执行
# 问题
如果延迟任务提交了,但是worker没启动,等10s后,worker才启动,worker也会执行任务,但是是立即执行
# 后期worker会一直在运行,没有任务就阻塞,一般不会让它停掉,咱们这个问题一般不会出现
# 加入只有一个worker,进来俩任务,串行执行,可以在多个机器上启动多个worker
5.3 定时任务
# 每隔5s钟,爬一次百度
#1 celery.py中加入
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 任务的定时配置
from datetime import timedelta
app.conf.beat_schedule = {
'low-task': {
'task': 'celery_task.crawl_task.crawl_baidu',
'schedule': timedelta(seconds=5),
'args': (), # crawl_baidu 没有参数,所以把不传
}
}
# 2 启动worker
celery -A celery_task worker -l debug -P eventlet
# 3 启动beat
celery -A celery_task beat -l debug
# 每隔5s,beat负责向broker提交一个任务--》worker执行任务