Celery分布式异步框架

news2024/12/28 3:21:28

Celery异步任务框架

"""

1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)

2)celery服务为为其他项目服务提供异步解决任务需求的 注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

人是一个独立运行的服务 | 医院也是一个独立运行的服务 正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题 人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求

"""

 Celery架构

Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

使用场景

异步执行:解决耗时任务

延迟执行:解决延迟任务

定时执行:解决周期(周期)任务

Celery的安装配置

pip install celery

消息中间件:RabbitMQ/Redis

app=Celery('任务名', broker='xxx', backend='xxx')

两种celery任务结构:提倡用包管理,结构更清晰 

# 如果 Celery对象:Celery(...) 是放在一个模块下的 # 1)终端切换到该模块所在文件夹位置:scripts # 2)执行启动worker的命令:celery worker -A 模块名 -l info -P eventlet # 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info # 注:模块名随意 # 如果 Celery对象:Celery(...) 是放在一个包下的 # 1)必须在这个包下建一个celery.py的文件,将Celery(...)产生对象的语句放在该文件中 # 2)执行启动worker的命令:celery worker -A 包名 -l info -P eventlet # 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info # 注:包名随意

Celery执行异步任务

包架构封装

project
    ├── celery_task  	# celery包
    │   ├── __init__.py # 包文件
    │   ├── celery.py   # celery连接和配置相关文件,且名字必须交celery.py
    │   └── tasks.py    # 所有任务函数
    ├── add_task.py  	# 添加任务
    └── get_result.py   # 获取结果

基本使用

celery.py
# celery不支持win,所以想再win上运行,需要额外安装eventlet
windows系统需要eventlet支持:pip3 install eventlet
Linux与MacOS直接执行:
	3.x,4.x版本:celery worker -A demo -l info
    5.x版本:     celery -A demo worker -l info -P eventlet
tasks.py
from .celery import app
import time
@app.task
def add(n, m):
    print(n)
    print(m)
    time.sleep(10)
    print('n+m的结果:%s' % (n + m))
    return n + m

@app.task
def low(n, m):
    print(n)
    print(m)
    print('n-m的结果:%s' % (n - m))
    return n - m
add_task.py
from celery_task import tasks

# 添加立即执行任务
t1 = tasks.add.delay(10, 20)
t2 = tasks.low.delay(100, 50)
print(t1.id)


# 添加延迟任务
from datetime import datetime, timedelta
eta=datetime.utcnow() + timedelta(seconds=10)
tasks.low.apply_async(args=(200, 50), eta=eta)
get_result.py
from celery_task.celery import app

from celery.result import AsyncResult

id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':
    async = AsyncResult(id=id, app=app)
    if async.successful():
        result = async.get()
        print(result)
    elif async.failed():
        print('任务失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')

高级使用

celery.py
# 1)创建app + 任务

# 2)启动celery(app)服务:
# 非windows
# 命令:celery worker -A celery_task -l info
# windows:
# pip3 install eventlet
# celery worker -A celery_task -l info -P eventlet

# 3)添加任务:自动添加任务,所以要启动一个添加任务的服务
# 命令:celery beat -A celery_task -l info

# 4)获取结果


from celery import Celery

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])


# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    'low-task': {
        'task': 'celery_task.tasks.low',
        'schedule': timedelta(seconds=3),
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': (300, 150),
    }
}
tasks.py
from .celery import app

import time
@app.task
def add(n, m):
    print(n)
    print(m)
    time.sleep(10)
    print('n+m的结果:%s' % (n + m))
    return n + m


@app.task
def low(n, m):
    print(n)
    print(m)
    print('n-m的结果:%s' % (n - m))
    return n - m
get_result.py
from celery_task.celery import app

from celery.result import AsyncResult

id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':
    async = AsyncResult(id=id, app=app)
    if async.successful():
        result = async.get()
        print(result)
    elif async.failed():
        print('任务失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')

django中使用

celery.py
"""
celery框架django项目工作流程
1)加载django配置环境
2)创建Celery框架对象app,配置broker和backend,得到的app就是worker
3)给worker对应的app添加可处理的任务函数,用include配置给worker的app
4)完成提供的任务的定时配置app.conf.beat_schedule
5)启动celery服务,运行worker,执行任务
6)启动beat服务,运行beat,添加任务

重点:由于采用了django的反射机制,使用celery.py所在的celery_task包必须放置项目的根目录下
"""

# 一、加载django配置环境
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")

# 二、加载celery配置环境
from celery import Celery
# broker
broker = 'redis://127.0.0.1:6379/0'
# backend
backend = 'redis://127.0.0.1:6379/1'
# worker
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])


# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    'update-banner-list': {
        'task': 'celery_task.tasks.update_banner_list',
        'schedule': timedelta(seconds=10),
        'args': (),
    }
}
tasks.py
from .celery import app

from django.core.cache import cache
from home import models, serializers
from django.conf import settings
@app.task
def update_banner_list():
    queryset = models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-orders')[:settings.BANNER_COUNT]
    banner_list = serializers.BannerSerializer(queryset, many=True).data
    # 拿不到request对象,所以头像的连接base_url要自己组装
    for banner in banner_list:
        banner['image'] = 'http://127.0.0.1:8000%s' % banner['image']

    cache.set('banner_list', banner_list, 86400)
    return True

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/706267.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android 自定义View和事件分派 图解

Android 自定义View和事件分派 图解_猎羽的博客-CSDN博客https://blog.csdn.net/feather_wch/article/details/131487012

并查集和LRUCache

目录 1. 并查集 1.1原理 1.2实现 1.3应用 1.3.1省份数量 1.3.2等式方程的可满足性 2.LRUCache 1.概念 2.实现 3.JDK中类似LRUCahe的数据结构LinkedHashMap 4.LRU Cache的OJ 1. 并查集 1.1原理 把不同的元素划分到不想交的集合.开始时,每个元素自成一个单元集合,然后…

OSGI-Bundle:概念和入门

OSGI(Open Service gateway initactive)是java动态化模块系统的一系列规范。即一个系统应用上可以有很多可插拔的小应用,整个应用能运行和协调,小应用之间也可以相互交互完成业务需求。 Bundle: bundle 是以 jar 包形式存在的一个模块化物理单元&#x…

Ceph:关于 Ceph 用户认证授权管理的一些笔记

写在前面 准备考试,整理 Ceph 相关笔记博文内容涉及, Ceph 用户管理,认证管理,权限管理 以及相关 Demo理解不足小伙伴帮忙指正 对每个人而言,真正的职责只有一个:找到自我。然后在心中坚守其一生,全心全意&…

antdesginVue a-date-picker(日期时间选择器)禁用当前时间之前的时间,包含时分秒

antdesginVue a-date-picker(日期时间选择器)禁用当前时间之前的时间&#xff0c;包含时分秒 话不多说直接上效果 <a-form-item label"发生时间" name"start_time"><a-date-pickerstyle"width: 100%"allowClearv-model:value"f…

C++模板进阶知识

文章目录 前言模板进阶1.非类型模板参数2.模板的特化2.1概念2.2函数模板特化2.3类模板特化2.3.1 全特化2.3.2 偏特化2.3.3 类模板特化应用示例 3.模板的分离编译3.1 什么是分离编译3.2 模板的分离编译3.3 解决方法 4 模板总结 后记 前言 之前我们讲过模板初阶的知识&#xff0…

Linux 解决root用户被限制连接服务器

Linux 解决root用户被限制连接服务器 1. 问题描述2. 解决问题2.1 方式一&#xff08;忘记root密码的情况&#xff09;2.2 方式二&#xff08;知道root密码的情况&#xff09; 3. 其他 1. 问题描述 使用 root 用户不能链接服务器&#xff0c;密码对&#xff0c;就是连接不上&am…

uniapp:分享一个自定义侧滑样例

首先看html,分为两部分&#xff0c;主体内容部分和功能部分&#xff0c;功能部分在css中定位到主体部分的右边 <view class"section" ref"box_center" touchstart"drawStart" touchmove"drawMove($event)"touchend"drawEnd($…

晨控智能UWB室内定位:工厂智能化的新引擎

晨控智能UWB室内定位&#xff1a;工厂智能化的新引擎 工厂是一个复杂而庞大的环境&#xff0c;通常包括多个车间、设备、人员以及大量的物料和产品。需要实时、准确的定位数据来支持各项运营活动。然而&#xff0c;传统的定位技术无法满足工厂内部的高精度定位需求。而UWB室内…

u-boot的烧写及使用,u-boot-2013.01的移植 6.30

1.将Linux的执行文件放到板子上运行 嵌入式系统 1.嵌入式系统 定制2.硬件&#xff1a;核心芯片底板软件&#xff1a;驱动应用 驱动系统应用&#xff08;并发&#xff0c;网络&#xff0c;文件。。。&#xff09;3.系统&#xff1a;linux 开源 模块化 支持芯片众多 功能…

针对字符串输入之间有空格的问题相关的问题

先说结论&#xff1a; bool flag true;while (cin >> s) {if (flag) {flag false;cout << s.size();} else {cout << , << s.size();}} 即用while&#xff08;cin>>s&#xff09;来输入&#xff0c;一段单词一段单词的来做&#xff08;遇到ci…

第十一章 原理篇:transformer模型入门

说在前面的话&#xff1a; 找工作面试不是特别顺利。进了目标公司的二面&#xff0c;但是一面面试官问的一些“新技术”问题答得不太好&#xff0c;尤其是transformer相关的。这一点确实是自己的问题&#xff0c;在工作后总是面向业务学习&#xff0c;对很多算法都是处于“听说…

AD从原理图到PCB超详细教程

AD超详细教程 前言一、建立一个工程模板二、原理图1.设计原理图。2.使用AD自带库和网上开源原理图库3.画原理图库4.编译原理图 三、PCB1.确定元器件尺寸大小2.绘制PCB Library①使用元器件向导绘制元件库②原理图与PCB的映射 3.绘制PCB①更新PCB②调整元件位置③布线④漏线检查…

库操作和表操作(数据库系列2)

目录 前言&#xff1a; 1.数据库的操作 1.1显示当前的数据库 1.2创建数据库 1.3使用数据库 1.4删除数据库 2.常用数据类型 2.1数值类型 2.2字符串类型 2.3日期类型 3.表的操作 3.1查看表结构 3.2创建表 3.3查看表 3.4删除表 结束语&#xff1a; 前言&#xff1…

【硬件自动化测试--测试软件的设计及实现】如何设计并实现!

今天来聊聊关于硬件方向的自动化软件设计及实现,后面我会用实例来让我们更加深入的了解硬件自动化,首先开发工具选择的是python语言,为啥选择python语言呢,因为他的语法比较简洁,外置库非常多,反正就是对于做自动化方面很实用就对了。 1.硬件自动化测试大致分为三个阶段实…

拓展:IDEA如何使用不同版本的JDK?(改了还报错很可能因为没改全,以mac为例)

以下面的案例为例 Enhanced ‘switch’ blocks are not supported at language level ‘8’ 后面知道是因为Spring的版本和JDK的版本不对应&#xff0c;结果网上找到的解决方案都很简单。下载了一个新版本的JDK&#xff0c;然后IDEA里面Project Structure的Project标签里把SDK给…

ubuntu的aarch64版本上安装anaconda

ubuntu的aarch64版本上安装anaconda 问题背景&#xff1a;今天在基于docker安装的ubuntu18-04的版本上想要安装anaconda&#xff0c;但是出现了问题&#xff0c;发现ubuntu的版本18-04对应的是aarch64&#xff0c;因此记录安装方法。 首先下载安装包没问题但是&#xff0c;在具…

机器学习复习7

机器学习复习7 1 - 根据下图中绘制的决策树&#xff0c;如果一个动物的耳朵是软的&#xff0c;脸型是圆的&#xff0c;并且有胡须&#xff0c;那么这个模型会预测它是猫还是不是猫&#xff1f; A. 不是猫 B. 是猫 **答案&#xff1a;B ** 2 - 以一棵决策树学习来对垃圾邮件和非…

spring boot启动原理分析

springboot启动类中有两个关键的地方 1.SpringBootApplication注解 2.SpringApplication.run方法 SpringBootApplication注解分析 SpringBootApplication注解由三大注解构成&#xff0c; SpringBootConfiguration、EnableAutoConfiguration、ComponentScan。 SpringBootCon…

Java-八股文-基础本部分<一>

Java基础部分 基础篇<一> Java基础部分 基础篇<二> Java基础部分 基础篇<三> Java基础部分 异常篇 Java基础部分 集合篇 Java基础部分 线程篇 ❤️ &#x1f9e1; &#x1f49b; &#x1f49a; &#x1f499; &#x1f49c;&#x1f5a4; &#x1f90d;…