celery异步框架的使用

news2025/1/15 7:44:26

文章目录

  • celery的介绍
  • celery的架构
  • celery的快速使用
  • celery 包结构
  • celery 定时 异步 延迟任务
  • django使用celery

celery的介绍

celery是什么?
-翻译过来是芹菜
-官网:https://docs.celeryq.dev/en/stable/
-吉祥物:芹菜
-分布式的异步任务框架
- 分布式:一个任务,拆成多个任务在不同机器上做
- 异步任务:后台执行,不阻塞主任务
- 框架:集成到项目中

celery主要的几个功能

1 异步任务
异步发邮件,短信,通知
\
2 延迟任务
延迟 几秒 再执行某个任务
订单提交后,延迟半小时,把订单取消
\
3 定时任务
-每隔多长事件 执行某个任务
- 定时更新缓存

celery的架构

django和celery是一个服务
在这里插入图片描述

celery的三个模块

1 broker:消息中间件,消息队列,任务中间件
-存储任务(函数):发送短信任务,统计在线人数。。。
-redis,reabbitmq 存储
-字符串形式,能把任务表示出来即可
函数名,函数参数,函数位置

2 worker:任务执行单元
-从消息队列[broker–》redis]—》取出任务执行—>程序(进程)

3 backend:结果存储 Result Stores
-任务执行完成后的结果存储在这里
-redis存储,关系型数据库。。


执行流程

1 其他程序—》提交任务(函数)—》任务序列化后存到celery的broker中

2 接下来:worker执行—》从broker中取任务–》执行

3 任务执行完后,把结果存到 bancked中

celery和其他程序是 独立运行的
在这里插入图片描述

    """
    1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
    2)celery服务为为其他项目服务提供异步解决任务需求的
    注:会有两个服务同时运行,一个是项目服务(django),一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

    人(django)是一个独立运行的服务 | 医院(celery)也是一个独立运行的服务
        正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
        人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
    """

celery的快速使用

pip celery install

#  可以把这个文件封装成一个包 以后直接调用即可
from celery import Celery
import time

# 提交任务的地址 注意 必须是单引号  不然会报错
broker = 'redis://127.0.0.1:6379/1'

# 完成任务之后的结果存储地址
backend = 'redis://127.0.0.1:6379/2'

# add是任务名字  broker提交任务地址  任务完成地址
# 写任务---》写函数---》必须用 @app.task 装饰---》装饰后,就变成了celery的任务了
app = Celery('app',broker=broker,backend=backend)
# 自己设置任务
@app.task
def add():
    time.sleep(2)
    return 'hello'


@ap.task
def add1(a,b):
    time.sleep(2)
    return a+b

# 在调用的文件 
from demo import add,add1

# 同步执行 只需要加上括号即可 直接等待之后得到结果
# res = add()
# print(res)

# 异步执行 提交上去得到一个任务id号
res = add.delay()
print(res)

# 如果有参数 直接传即可
res1 = add1.delay(3,4)
print(res1)

# 启动执行任务命令
如果是win系统 需要在安装一个模块 pip3 install eventlet

###win运行:
pip3 install eventlet
celery -A 是指定命令 deno 是执行函数的文件名记得切换路径 worker -l info 指定级别
celery -A demo worker -l info -P eventlet

任务执行完成这个 会阻塞在哪里等待新任务的提交

####非win运行:mac linux
celery -A demo  worker -l info


# 查询结果 可以查询执行状态和结果
from demo import app
from celery.result import AsyncResult
id = '5a7af383-6a4a-456e-b33b-43d5618f1208'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():
        result = a.get() # hello world
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

celery 包结构

-celery_task
	-celery.py
    -user_task.py
    -order_task.py
    -goods_task.py
    
包下必须有一个celery的文件 所有任务一定要注册到 include中
from celery import Celery
#####1 实例化得到对象#######
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery('app', broker=broker, backend=backend,include=['celery_task.order_task','celery_task.user_task'])
#######2 写任务 ##########以后各种类型任务,单独写在py文件中

-其他程序中提交任务
add_task_package.py
-其他程序中查询结果
get_result_package.py

启动 workera切换路径到路径上一级启动 比如 cd到celery_task的上一级即可 包名文件

celery 定时 异步 延迟任务

from celery_task.user_task import send
from celery_task.order_task import order_of

# 异步任务 任务函数.delay(参数)
res = send.delay('15800089521','3216')
print(res)

# 延迟任务 
from datetime import datetime, timedelta

# # 当前时间 加 上 自己设置的时间 后面需要自己修改一下配置文件 当前时间是utc时间
eta = datetime.utcnow() + timedelta(seconds=20) # minutes=3 可以修改时间单位

# # args指定参数 eta指定时间延迟的时间
res = order_of.apply_async(args=['100860'], eta=eta)
print(res)

# 定时任务 单独写一个文件

# 定时任务
# 定时任务--》一定要启动beat
# 在celery.py 中写

# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab

app.conf.beat_schedule = {
    'send_sms': {
        'task': 'celery_task.user_task.send',
        'schedule': timedelta(seconds=8), # 每隔多久发一次
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': ('1896388888', '6666'),  # 执行任务的函数 需要几个参数传几个
    }
    # 'cancle_order': {
    #     'task': 'celery_task.order_task.cancel_order',
    #     # 'schedule': timedelta(minutes=30),
    #     'schedule': crontab(hour=11, day_of_week=1, minute=21),  # 每周一早八点
    #     'args': ('9999999',),
    # },
}

# 启动beat---》每隔一段时间,就提交任务
# celery -A celery_task beat  -l info

# 启动worker
# celery -A celery_task worker -l info -P eventlet
# 两个都要启动 一个是执行任务 一个是提交任务 就不需要我们手动启了

django使用celery

# celery中如果用到django的配置文件 必须加上一句话

from celery import Celery

import os
# 异步任务使用到django的配置 必须要添加这一句加载配置文件
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')

# 提交任务的地址 注意 必须是单引号  不然会报错
broker = 'redis://127.0.0.1:6379/1'

# 完成任务之后的结果存储地址
backend = 'redis://127.0.0.1:6379/2'

# add是任务名字  broker提交任务地址  任务完成地址
app = Celery('app',broker=broker,backend=backend,include=['celery_task.order_task','celery_task.user_task'])


# 任务函数
from .celery import app
from libs.sms_sen import common_send_sms

@app.task
def send(phone, code):
    common_send_sms(mobile=phone,code=code).delay()  # 掉用封装好的短信
    return '手机号:%s,发送验证码:%s,成功' % (phone, code)



# 视图类中

# 使用celery异步发送短信
class Celery_send_sms(ViewSet):

    def create(self,request):
        mobile = request.data.get('mobile')
        code = '8888'
        res = send.delay(phone=mobile,code=code)
        print(res)
        return APIResponse('手机号%s短信发送成功' % mobile)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1448308.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算机网络概述习题拾遗

学习目标: 自下而上第一个提供端到端服务的层次 路由器、交换机、集线器实现的功能层 TCP/IP体系结构的网络接口层对应OSI体系结构的哪两个层次 分组数量对总时延的影响 如果这篇文章对您有帮助,麻烦点赞关注支持一下动力猿吧! 学习内容…

《剑指 Offer》专项突破版 - 面试题 44 : 二叉树中每层的最大值(两种方法 + C++ 实现)

目录 前言 一、只用一个队列 二、使用两个队列 前言 题目链接:LCR 044. 在每个树行中找最大值 - 力扣(LeetCode) 题目: 输入一棵二叉树,请找出二叉树中每层的最大值。例如,输入下图中的二叉树&#x…

VueCLI核心知识综合案例TodoList

目录 1 拿到一个功能模块首先需要拆分组件: 2 使用组件实现静态页面的效果 3 分析数据保存在哪个组件 4 实现添加数据 5 实现复选框勾选 6 实现数据的删除 7 实现底部组件中数据的统计 8 实现勾选全部的小复选框来实现大复选框的勾选 9 实现勾选大复选框来…

RIDERS: Radar-Infrared Depth Estimation for Robust Sensing

RIDERS: 恶劣天气及环境下鲁棒的密集深度估计 论文链接:https://arxiv.org/pdf/2402.02067.pdf 作者单位:浙江大学, 慕尼黑工业大学 代码链接:https://github.com/MMOCKING/RIDERS 1. 摘要(Abstract) 恶劣的天气条件, …

Spring Boot 笔记 019 创建接口_文件上传

1.1 创建阿里OSS bucket OSS Java SDK 兼容性和示例代码_对象存储(OSS)-阿里云帮助中心 (aliyun.com) 1.2 编写工具类 package com.geji.utils;import com.aliyun.oss.ClientException; import com.aliyun.oss.OSS; import com.aliyun.oss.OSSClientBuilder; import com.aliyun…

Waymo数据集下载与使用

在撰写论文时,接触到一个自动驾驶数据集Waymo Dataset 论文链接为:https://arxiv.org/abs/1912.04838v7 项目链接为:https://github.com/waymo-research/waymo-open-dataset 数据集链接为:https://waymo.com/open waymo提供了两种…

K8sGPT 的使用

K8sGPT 介绍 k8sgpt 是一个扫描 Kubernetes 集群、诊断和分类问题的工具。它将 SRE 经验编入其分析器中,并帮助提取最相关的信息,通过人工智能来丰富它。它还可以与 OpenAI、Azure、Cohere、Amazon Bedrock 和本地模型结合使用。 K8sGPT Github 地址 …

助眠神器小程序源码|白噪音|小睡眠|微信小程序前后端开源

安装要求和说明后端程序运行环境:NginxPHP7.4MySQL5.6 PHP程序扩展安装:sg11 网站运行目录设置为:public 伪静态规则选择:thinkphp 数据库修改文件路径:/config/database.php需要配置后端的小程序配置文件,…

Java中锁的应用

文章目录 前言一、场景描述二、加锁1.synchronized2.ReentrantLock 三、扩展1.ThreadLocal 总结 前言 在多线程场景下,多个线程同时对共享变量进行操作是存在风险的,这时候就需要加锁来保证数据的正确性。 一、场景描述 我这里有5个无人机,准备卖到乌克…

Spring Boot 笔记 009 创建接口_更新用户基本信息

1.1.1 给User实体类添加校验 package com.geji.pojo;import com.fasterxml.jackson.annotation.JsonIgnore; import jakarta.validation.constraints.Email; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; import jakarta…

【开源图床】使用Typora+PicGo+Gitee搭建个人博客图床

准备工作: 首先电脑得提前完成安装如下: 1. nodejs环境(node ,npm):【安装指南】nodejs下载、安装与配置详细教程 2. Picgo:【安装指南】图床神器之Picgo下载、安装与配置详细教程 3. Typora:【安装指南】markdown神器之Typora下载、安装与无限使用详细教…

Linux网络基础1

目录 计算机网络背景协议OSI七层模型TCP/IP五层(四层)模型网络传输基本流程以太网通信原理IP地址理解 计算机网络背景 到目前为止,我们之前所有的编程都是单机的,不是多机互联。以前计算机被发明的时候是为了军事用途&#xff0…

x86汇编通用寄存器用途一览

文章目录 写在前面通用寄存器参考资料 写在前面 intel官方文档链接:Intel64和IA-32架构软件开发者手册 具体在Combined Volume Set of Intel 64 and IA-32 Architectures Software Developer’s Manuals这本手册 (五千页我的天。。。) 不想…

FastAI 之书(面向程序员的 FastAI)(二)

原文:www.bookstack.cn/read/th-fastai-book 译者:飞龙 协议:CC BY-NC-SA 4.0 第三章:数据伦理 原文:www.bookstack.cn/read/th-fastai-book/9bc6d15b4440b85d.md 译者:飞龙 协议:CC BY-NC-SA 4…

中小学信息学奥赛CSP-J认证 CCF非专业级别软件能力认证-入门组初赛模拟题第二套(选择题)

CSP-J入门组初赛模拟题二 1、在计算机内部用来传送、存贮、加工处理的数册或指令都是以()形式进行的 A、二进制 B、八进制 C、十进制 D、智能拼音 答案:A 考点分析:主要考查小朋友们计算机相关知识,在计算机中都是采用二进制运算&#…

初识webpack(二)解析resolve、插件plugins、dev-server

目录 (一)webpack的解析(resolve) 1.resovle.alias 2.resolve.extensions 3.resolve.mainFiles (二) plugin插件 1.CleanWebpackPlugin 2.HtmlWebpackPlugin 3.DefinePlugin (三)webpack-dev-server 1.开启本地服务器 2.HMR模块热替换 3.devServer的更多配置项 (…

面试经典150题——串联所有单词的子串(困难)

"Opportunities dont happen, you create them." ​ - Chris Grosser 1. 题目描述 2. 题目分析与解析 2.1 思路一——暴力求解 遇见这种可能刚开始没什么思路的问题,先试着按照人的思维来求解该题目。对于一个人来讲,我想要找到 s 字符串中…

Java的Cloneable接口和深拷贝

Java 中内置了一些很有用的接口, Clonable 就是其中之一。 Object 类中存在一个 clone 方法, 调用这个方法可以创建一个对象的 "拷贝"。 但是要想合法调用 clone 方法, 必须要先实现 Clonable 接口, 否则就会抛出 CloneNotSupportedException 异常。 浅拷贝&#xff…

数据结构-并查集

并查集原理 在一些应用问题中,需要将n个不同的元素划分成一些不相交的集合。开始时,每个元素自成一个 单元素集合,然后按一定的规律将归于同一组元素的集合合并。在此过程中要反复用到查询一 个元素归属于那个集合的运算。适合于描述这类…

cool Nodejs后端框架 如何快速入门 写一个接口

1.cool 框架 js前端开发者 想自己写后端接口 快速入门的就是node.js 了 可以用这个框架自己做一些东西 或者实现前后端的开发 2.目录结构 这个基本上 就是cool 框架的项目结构 主要是 这个src 中的modules 文件夹 这个文件夹 主要是一些接口模块 比如 business 中 相当于…