快速上手分布式异步任务框架Celery

news2024/11/24 6:59:24

一、Celery架构介绍

Celery:芹菜?(跟翻译没有任何关系),分布式异步任务框架(跟其他web框架无关)
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.(不支持windows)
celery服务为其他项目服务提供异步解决任务需求的。

架构
分为三部分

  • broker:任务中间件,用户提交的任务,存在这个里面(redis,rabbitmq)
  • worker:任务执行者,消费者,真正执行任务的进程(真正干活的人)
  • backend:任务结果存储,任务执行后的结果(redis,rabbitmq)
    在这里插入图片描述
    在这里插入图片描述

celery能够做的事:

  • 异步任务(区分同步任务)
  • 延迟任务
  • 定时任务(其他框架做)

怎么更好的理解celery?
会有两个服务同时运行,一个是项目服务(django服务),一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求。打个比方,人是一个独立运行的服务(django) | 医院也是一个独立运行的服务(celery)。正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题,人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求。

注:python有自己的定时任务,感兴趣的了解下apscheduler

二、Celery简单使用

安装:pip install celery==5.1.2

使用:
1.配置celery

from celery import Celery

# app=Celery('test',)

# backend='redis://:密码@127.0.0.1:6379/1'  如果有密码,这么写
broker = 'redis://127.0.0.1:6379/1'  # redis地址
backend = 'redis://127.0.0.1:6379/2'  # redis地址

# 1 实例化得到celery对象
app = Celery(__name__, backend=backend, broker=broker)

# 2 写一堆任务(计算a+b,挖井,砍树),函数
# 使用装饰器包裹任务(函数)
@app.task()
def add(a, b):
    import time
    time.sleep(2)
    return a + b

2.提交任务

# from celery_task import app
import celery_task

# 1 同步执行
# res = celery_task.add(2, 3)  # 普通的同步任务,同步执行任务
# print(res)

2 异步任务:
第一步:提交(使用任务名.apply_async(参数))
结果是任务id号,唯一标识这个任务

# res = celery_task.add.apply_async(args=[2, 3])
res = celery_task.add.apply_async(kwargs={'a':2,'b':3})
print(res)  # abab1ad3-0e58-4faa-bc05-14d157dc8217

第二步:让worker执行—>结果存到redis
通过命令启动,非windows:
5.x之前这么启动
命令:celery worker -A celery_task -l info
5.x以后
命令:celery -A celery_task worker -l info

windows:

pip3 install eventlet

5.x之前这么启动
命令:celery worker -A celery_task -l info -P eventlet
5.x以后
命令:celery -A celery_task worker -l info -P eventlet

3.查看任务执行结果

from celery_task import app

from celery.result import AsyncResult

id = 'abab1ad3-0e58-4faa-bc05-14d157dc8217'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():
        print('任务执行成功了')
        result = a.get()  # 异步任务执行的结果
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

三、Celery包结构

目录结构:

    -celery_task               # 包名
    	__init__.py
    	celery.py             # app所在py文件
    	course_task.py        # 任务
    	order_task.py         # 任务
    	user_task.py          # 任务
    提交任务.py                # 提交任务
    查看结果.py                # 查看结果

创建多个任务:
celery_task /celery.py

from celery import Celery

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
# include  是一个列表,放被管理的task 的py文件
app = Celery(__name__, backend=backend, broker=broker,include=[
    'celery_task.course_task',
    'celery_task.order_task',
    'celery_task.user_task',
])

# 原来,任务写在这个py文件中

# 后期任务非常多,可能有用户相关任务,课程相关任务,订单相关任务。。。

celery_task /任务.py
user_task.py

import time
from .celery import app

# 发送短信任务
@app.task()
def send_sms(phone, code):
    time.sleep(3)  # 模拟发送短信延迟
    print('短信发送成功,手机号是:%s,验证码是:%s' % (phone, code))
    return '短信发送成功'

order_task.py

from .celery import app
# 生成订单任务
@app.task()
def make_order():
    with open(r'D:\py18\luffy_api\script\2 celery的包结构\celery_task\order.txt', 'a', encoding='utf-8') as f:
        f.write('生成一条订单\n')
    return True

course_task.py

from .celery import app
@app.task()
def add(a,b):
    return a+b

提交多个任务:

from celery_task import user_task,order_task

# 提交一个发送短信任务
# res = user_task.send_sms.apply_async(args=['18972374345', '8888'])
# print(res)


# 提交一个生成订单任务
# res=order_task.make_order.apply_async()
# print(res)

查看结果:

from celery_task.celery import app
from celery.result import AsyncResult

id = '0f283e22-e8d0-40a6-a8ed-8998038bc7a3'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    print(app.conf)
    if a.successful():
        print('任务执行成功了')
        result = a.get()  # 异步任务执行的结果
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

四、Celery延迟任务

# 添加延迟任务方式一:
# from datetime import datetime, timedelta
# datetime.utcnow()  获取当前的utc时间
# eta=datetime.utcnow() + timedelta(seconds=50) #  50s后的utc时间
# 10s后,发送短信
res=user_task.send_sms.apply_async(args=('12345566677', '8888'), eta=eta)
print(res)

# 使用第二种方式执行异步任务(两者传参不同;不写时间,就表示立即执行):
res=user_task.send_sms.delay('12345566677', '8888')
print(res)

五、Celery定时任务

第一步:celery.py中写入

# 第一步,在包(celery_task)下的celey.py中写入
###修改celery的配置信息    app.conf整个celery的配置信息
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

####配置定时任务
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    'send_sms_every_3_seconds': {
        'task': 'celery_task.user_task.send_sms',  # 指定执行的是哪个任务
        'schedule': timedelta(seconds=3),
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': ('18953675221', '8888'),
    },
    'make_order_every_5_seconds': {
        'task': 'celery_task.order_task.make_order',  # 指定执行的是哪个任务
        'schedule': timedelta(seconds=5),
    },
    'add_every_1_seconds': {
        'task': 'celery_task.course_task.add',  # 指定执行的是哪个任务
        'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': (3, 5),
    },
}

第二步:启动worker

# celery worker -A 包名 -l info -P eventlet
celery worker -A celery_task -l info -P eventlet

如果beat没有启动,worker是没有活干的,需要启动beat,worker才能干活,和beat启动顺序无先后

第三步:启动beat

# celery beat -A celery_task -l 
celery -A celery_task beat -l info

六、Django中集成Celery

第一种方式使用django-celery(了解):
第三方把django和celery集成起来,方便我们使用,但是,第三方写的包的版本,跟celery和django版本完全对应。

我们自己使用包结构集成到django中:
第一步,把写好的包,直接复制到项目根路径
第二步,在视图类中(函数中)

from celery_task.user_task import send_sms
def test(request):
    mobile = request.GET.get('mobile')
    code = '9999'
    res = send_sms.delay(mobile, code)  # 同步发送假设3分支钟,异步发送,直接就返回id了,是否成功不知道,后期通过id查询
    print(res)
    return HttpResponse(res)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/499879.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【嵌入式系统】课程复习资料整理

【嵌入式系统】课程复习资料整理 一、绪论 1.定义 从技术的角度定义:以应用为中心、以计算机技术为基础、软件硬件可裁剪、对功能、可靠性、成本、体积、功耗严格要求的专用计算机系统。从系统的角度定义:嵌入式系统是设计完成复杂功能的硬件和软件&a…

Android多模块开发

Android多模块开发 1. 建立项目和多个模块 ​ app为主模块 ​ app-setting为功能模块,可作为独立模块运行,也可作为其他模块的资源模块 ​ app-video为功能模块 2. 建立公共环境文件(env.gradle)并在各模块配置 Step1: 建立在根目录下建…

第31步 机器学习分类实战:多轮建模

开始填坑之旅。 首先,之前提过,random_state这个参数,它的功能是确保每次随机抽样所得到的数据都是一样的,有利于数据的复现。比如,我们这十个ML模型,用的参数都是random_state666,这样作比较才…

【写一个hello的html页面,将页面放到服务器,通过浏览器访问页面,这个过程是怎么实现的?】第一个 servlet 程序

第一个 servlet 程序 第一个 servlet 程序1. 创建项目创建好后的 默认目录 解析 2. 引入依赖为什么要引入依赖? 3. 创建目录结构1、在 main 目录下创建一个 webapp 目录2、在 webapp 下创建一个 WEB-INF 目录3、在 WEB-INF 目录下创建一个 web.xml 文件4、web.xml 需…

章节3:02-Apache Commons Collections反序列化漏洞

章节3:02-Apache Commons Collections反序列化漏洞 02-Apache Commons Collections反序列化漏洞 漏洞爆出 2015.01.28 Gabriel Lawrence和Chris Frohoff https://speakerdeck.com/frohoff/appseccali-2015-marshalling-pickles-how-deserializing-objects-can-r…

《Java虚拟机学习》 asmtools 字节码汇编器使用 与 JVM识别方法重载 的思考

1.asmtools下载 链接:https://pan.baidu.com/s/1R3nAaUbN1Dkf6UKkdEMSEA?pwdk8l8 提取码:k8l8 2.结合方法重载实验的使用 总所周知,方法重载跟方法名无关,但对于JVM而言,区别方法主要通过 类名,方法名&…

java spring MVC REST风格概念叙述

REST属于spring MVC中的一个知识点 REST是三个单词的缩写 即 Representational State Transfer 意思为 表现形式状态转换 老实说 不用尝试字面上理解 因为字面意思 确实是比较抽象 其实 意思就是 访问网络资源的格式 转换 下图 对比了 传统风格和REST风格 请求路径的差别 RES…

【英语】大学英语CET考试,阅读部分2(长篇阅读,选词填空,综合演练)

文章目录 1、长篇阅读(连连看,要会做)1.1 解题技巧(定位词扫读,看到大于看懂,一题带练)1.2 做题方法复习总结1.3 题目练习(2篇文章) 2、选词填空(只有5分&…

opencv_c++学习(五)

Mat类数值存储方式 上图为opencv中三通道数据的存储方式,反映到图像上则为空间维度为3*3,通道为3的图像。 Mat类的属性 Mat类的属性如上,在这里我们解释一下step。step是行列数与数据类型的字节数相乘的数据。 Mat类元素读取 在Mat中&…

云原生: istio+dapr构建多运行时服务网格...  多运行时是一个非常新的概念。在

2020 年,Bilgin Ibryam 提出了 Multi-Runtime(多运行时)的理念,对基于 Sidecar 模式的各种产品形态进行了实践总结和理论升华。那到底什么是多运行时呢?首先还是得从分布式应用的四大类基本需求讲起。简单来讲任何分布…

【力扣周赛】第344场周赛

【力扣周赛】第344场周赛 6416:找出不同元素数目差数组题目描述解题思路 6417:频率跟踪器题目描述解题思路 6418:有相同颜色的相邻元素数目题目描述解题思路 6419:使二叉树所有路径值相等的最小代价题目描述解题思路 6416&#xf…

C++ ---- 类和对象(上)

目录 本节目标 常见问题 面向过程和面向对象的理解 什么是类如何定义类 类的引入 类的定义 类的两种定义习惯 类的作用域 类的访问限定符 访问限定符介绍 封装 封装的意义 类的实例化 类对象模型 类对象的存储方式 结构体对齐 计算类对象的大小 this指针 问题…

数青蛙​、[USACO10FEB]Chocolate Giving S

一、1419. 数青蛙 思路 这道题有俩种解法,一是记数,二是贪心 记数: 这是官方的题解 我们用frog_ num来表示现在正在发出蛙鸣声的青蛙数目,用cnt[c] 示已经发出-次有效蛙鸣中的字符c的青蛙个数,比如当cnt[c] 2时表示当前有2只…

[mini LCTF 2023] 西电的部分

感觉比赛还是很不错,就是有点难了,不过都是简单题重复更没意思。作出一道来就有一点收获。 misc1 签到题也不简单,已经很久不作misc了,感觉这东西需要安的东西太多,怕机子累坏了。 一个复合的wav声音文件&#xff0…

【Android入门到项目实战-- 8.5】—— 使用HTTP协议访问网络的实践用法

目录 准备工作 一、创建HttpUtil类 二、调用使用 一个应用程序可能多次使用到网络功能,这样就会大量代码重复,通常情况下我们应该将这些通用的网络操作封装到一个类里,并提供一个静态方法,想要发送网络请求的时候,只…

【c语言】字符串匹配(搜索) | API仿真

c语言系列专栏:c语言之路重点知识整合 字符串知识点:字符串基本概念、存储原理 字符串匹配 目录 一、字符串匹配二、strstr仿真声明:指针方式定义:调用测试:运行结果: 一、字符串匹配 字符串匹配是对一个…

HttpClient连接池使用不当问题分析解决

目录 背景代码实现工具类功能实现模拟使用 问题分析与定位解决方案总结 背景 最近遇到一个HttpClient问题,某个接口一直报404错误。该接口使用HttpClient调用其他服务获取数据,为了提高接口调用性能,利用httpclient池化技术来保证请求的数量…

嵌入式中利用软件实现定时器的两种方法分析

目录 第一:简介 第二:链表实现方式 第三:结构体实现方式 第一:简介 在一般的嵌入式产品设计中,介于成本、功耗等,所选型的MCU基本都是资源受限的,而里面的定时器的数量更是有限。在我们软件…

Origin如何绘制基础图形?

文章目录 0.引言1.绘图操作2.图形设置3.图形标注 0.引言 因科研等多场景需要绘制专业的图表,笔者对Origin进行了学习,本文通过《Origin 2022科学绘图与数据》及其配套素材结合网上相关资料进行学习笔记总结,本文对绘制基础图形进行阐述。 1.…

2023.5.7 第五十二次周报

目录 前言 文献阅读:基于BO-EMD-LSTM模型预测教室长期二氧化碳浓度 背景 思路 BO-EMD-LSTM 混合模型 EMD 算法 与其他模型的比较 结论 论文代码 总结 前言 This week, I studied an article that uses LSTM to predict gas concentration.This study wa…