Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。它的架构组成如下图 :
1.异步任务
小案例:
--demo.py --------产生app对象
--add_task.py ---------提交任务
--check_add.py -----查看任务进程
1.1demo,py
from celery import Celery
import time
broker = 'redis://127.0.0.1:6379/1' # 消息中间件 redis
backend = 'redis://127.0.0.1:6379/2' # 结果存储 redis
app = Celery(__name__, broker=broker, backend=backend)
@app.task #变成celery的任务了
def add(a, b):
print('运算结果是',a + b)
time.sleep(1)
return a + b
1.2 启动worker(worker监听消息队列,等待别人提交任务,如果没有就卡再这)
注意:要将路径切换到demo.py的上一层
celery -A demo worker -l info -P eventlet
1.3 别人提交任务,提交完成会返回一个uuid号,后期使用uuid号查询,至于这个任务有没有被执行,取决于worker有没有启动
add_task.py
from demo import add
res=add.delay(77,66)
1.4.查看任务结果
check_add.py
from demo import app #导入app路径
# celery的包下
from celery.result import AsyncResult
id = '042a8fc1-6b0f-4ad6-bf72-edefa657a52f'
if __name__ == '__main__':
a = AsyncResult(id=id, app=app)
if a.successful(): # 正常执行完成
result = a.get() # 任务返回的结果
print(result)
elif a.failed():
print('任务失败')
elif a.status == 'PENDING':
print('任务等待中被执行')
elif a.status == 'RETRY':
print('任务异常后正在重试')
elif a.status == 'STARTED':
print('任务已经开始被执行')
2.包结构celery
以后都这样创建 创建有一个包
-celery_task -------包
-__init__
-celery.py --------必须叫这个名字 放定时任务、里面实例化得到app对象
from celery import Celery
broker = 'redis://127.0.0.1:6379/1' # 消息中间件 redis
backend = 'redis://127.0.0.1:6379/2' # 结果存储 redis
app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.course_task','celery_task.home_task','celery_task.user_task'])
参数:include------任务的地址
启动worker,以包启动,来到包所在路径下
celery -A 包名 worker -l info -P eventlet
celery -A celery_task worker -l info -P eventlet
3.延时任务
延时任务和异步任务差不多,唯一的不同就是提交任务的不同
异步任务提交
from celery_task.user_task import send_sms
res=send_sms.delay(188888888,2546) #提交任务
print(res)
延时任务的提交
需要你先计算好你需要延时多长时间
小案例:延时10秒
1.还要先去celery,py去配置国际化
# 配置国际化
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
提交任务
from datetime import datetime,timedelta
# print(datetime.utcnow()) #utc时间 与咱们相差8个小时
# print(eta_second(10),'dddddddddd')
# print(timedelta(seconds=10)) #0:0:0:10
time_delay=timedelta(seconds=10)
t_time=datetime.now() #配置国际化后,用now()
eta=time_delay+t_time
from celery_task.user_task import send_sms
res=send_sms.apply_async(kwargs={'mobile': '1896334234', 'code': 8888}, eta=eta)
4.定时任务
4.1在app所在的文件下配置,也就是在celery.py中
- 1 配置
app.conf.beat_schedule = {
'send_sms': { #定时任务的名称,可以随便起
'task': 'celery_task.user_task.send_sms', #定时任务的路径指定函数,send_sms-》函数
'schedule': timedelta(seconds=5), #五秒执行一次
'args': ('1822344343', 8888), #给函数传参,没有可以不传
},
'add_course': {
'task': 'celery_task.course_task.add_course',
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'schedule': crontab(hour=11, minute=38), # 每天11点35,执行
'args': (),
}
}
4.2提交定时任务需要借助于beat
启动worker的路径下启动beat
celery -A 包名 beat -l info
celery -A celery_task beat -l info
启动worker