Celery 分布式任务队列

news2024/11/15 11:09:33

1. 认识 Celery

Celery 是一个 基于 Python 开发的分布式异步消息任务队列,可以实现任务异步处理,制定定时任务等。

  • 异步消息队列:执行异步任务时,会返回一个任务 ID 给你,过一段时间后拿着任务 ID 去取执行结果
  • 定时任务:类似于 Windows / Linux 上的定时任务,到点执行任务

Celery 在执行任务时需要通过一个消息中间件来接收和发送任务消息,以及存储任务结果, 一般使用 rabbitMQRedis(默认采用 RabbitMQ)

优点:

  • 简单易用
  • 高可用:即使执行失败或执行过程发生中断,也会尝试再次执行
  • 快速:一个单进程的 Celery 每分钟可以执行上百万个任务
  • 拓展性强:Celery 的各个组件都可以拓展和自定制

Celery 构成

Celery 主要模块:

  • 任务模块 Task:异步和定时任务
  • 消息中间件 Broker:即任务调度队列,接收生产者发来的任务,将任务存入队列。Celery 本身不提供队列服务,官方推荐 RabbitMQ 或 Redis 等
  • 任务执行单元 Worker:处理任务,实时监控消息队列,获取队列中调度的任务,并执行它。
  • 结果存储 Backend:存储任务执行结果,以便查询,与中间件一样,也可以使用 RabbitMQ、Redis 或 MongoDB 存储

2. 异步任务

实现异步任务步骤:

  • 创建一个 Celery 实例
  • 启动 Celery Worker
  • 应用程序调用异步任务

1、安装

pip3 install 'celery[redis]'
pip3 install celery

2、创建 Celery 实例

C1/tasks.py

# -*- coding: utf-8 -*-
 
import time
from celery import Celery
 
broker = 'redis://127.0.0.1:6379'		# 消息中间件
backend = 'redis://127.0.0.1:6379/0'	# backend ,存储结果
 
app = Celery('my_task', broker=broker, backend=backend)		# 创建实例
 
# 创建一个任务,5s 后执行
@app.task(name='tasks.add')
def add(x, y):
    time.sleep(5) 	# 模拟耗时操作
    return x + y

3、启动 Celery Worker

打开 Ubuntu 终端,输入:celery worker -A C1.tasks --loglevel=info,看到如下图就表示启动成功了:

参数:

  • A:指定实例所在位置
  • --loglevel:指定日志级别,有:warning、debug、info、error、fatal ,默认 warning

4、调用任务

另起一个终端,进入 Python 环境,执行任务:

# Celery 提供两种方法来调用任务,delay() 或 apply_async() 方法
python3
>>> from tasks import add
>>> add.delay(6, 8)		# 调用任务,并返回一个任务 ID
<AsyncResult: 194e99af-d0bd-481b-a500-433ec19117e4>

判断任务是否完成:

>>> result = add.delay(6, 8)
>>> result.ready()			# True 表示已完成
True

获取任务结果:

>>> result.get()
14

踩坑:在调用任务时出现Received unregistered task of type 'tasks.add'.

  • 原因:Celery 没有找到读取到任务

  • 解决办法:在装饰器出加上 name='tasks.add'

参考博客:Received unregistered task of type ‘XXX’ Celery报错

3. 项目中使用 celery

celery 还可以配置成一个应用,放置在项目中使用,其结构为:

Tips:

  • 项目应该是个包文件
  • 必须命名为 celery.py,否则报错 AttributeError:module 'proj' has no attribute 'celery'

1、proj/celery.py

from __future__ import absolute_import, unicode_literals		# 将相对路径转换为绝对路径
from celery import Celery
# 创建一个Celery的实例
app = Celery('tasks',
             # redis://:password@hostname:port/db_number  有密码认证的连接
             broker='redis://127.0.0.1:6379',
             # broker='redis://:密码@192.168.2.105:6379/0',
             backend='redis://127.0.0.1:6379/0',  # 用于Celery的返回结果的接收
             include=['proj.tasks']       # 用于声明Celery要执行的tasks任务的位置
             )
# 配置结果超时时间
app.conf.update(
    result_expires=3600,   # Celery结果存在中间件Redis的超时时间[仅针对当前的Celery的App]
)
if __name__ == '__main__':
    app.start()

2、proj/tasks.py

from __future__ import absolute_import, unicode_literals
from .celery import app  # 从我的Celery中导入App
import time

@app.task(name='tasks.add')		# 需要配置 name='tasks.add',否则报 Received unregistered task of type 'app.tasks.add'.
def add(x, y):
    time.sleep(10)
    return x + y


@app.task(name='tasks.mul')
def mul(x, y):
    time.sleep(10)
    return x * y

3、启动 worker,分为前台和后台启动(无需关心起行为):

# 前台
celery -A proj worker -l info

运行结果如下:

4、调用任务:

# 在这里使用终端调用,也可以再项目中调用
>>> from proj.tasks import add, mul

>>> result1 = add.delay(5, 8)
>>> result2 = mul.delay(5, 8)
>>> result1.get()		# 取值
13
>>> result2.get()
40

worker 放在后台继续运行,我们可以继续做别的事情:

# w1:worker
celery multi start w1 -A proj -l info		# 启动 worker
celery multi restart w1 -A proj -l info		# 重启
celery multi stop w1 -A proj -l info		# 关闭
ps -ef | grep celery						# 查看目前还有几个 worker 正在运行


参考文章

  • Celery学习— Celery在项目中的使用

4. 定时任务

celery 通过 celery beat 模块即可实现定时任务功能。

4.1 小试牛刀

1、新建一个 c1\task1.py,编辑如下:

from celery import Celery
from celery.schedules import crontab
 
app = Celery()
 
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # 每过 10 s,执行一次 hello
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
 
    # 每过 30 s,执行一次 world
    sender.add_periodic_task(30.0, test.s('world'), expires=10)
 
    # 每周一七点三十执行一次 Happy Mondays!
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )
 
@app.task
def test(arg):
    print(arg)

也可以配置成下面这样,或许更好理解:

# 可以配置多个
app.conf.beat_schedule = {
    'add-every-30-seconds': {			# 任务名字
        'task': 'tasks.add',			# 执行 tasks 中的 add 函数
        'schedule': 30.0,				# 时间,也可以用 timedelta(seconds=20),
        'args': (16, 16)				# 参数
    },
}
app.conf.timezone = 'UTC'				# 时区

2、启动 beat 进程,监控是否有任务:

hj@hj:~/桌面/c1$ celery -A task1 beat

3、启动 worker 执行任务:

hj@hj:~/桌面/c1$ celery -A task1 worker

从上图中可以看到,每过 10s,就会输出一个 hello,每过 30s 输出一个 world,当然这只是几个比较简单的任务示例。

beat 需要将任务的最后运行时间存储在本地数据库文件中(默认名称为 celerybeat-schedule),因此需要访问当前目录中的写入,或者您可以为此文件指定自定义位置:

# beat 运行时,会产生几个文件
hj@hj:~/桌面/c1$ ls
celerybeat.pid  celerybeat-schedule  __pycache__  task1.py

# 指定文件位置
celery -A task1 beat -s /home/celery/var/run/celerybeat-schedule

4.2 使用 crontab 构建复杂定时任务

如果你只是想每过多少秒输出一个 hello 的话,那么上面的功能就能实现。但是若你想每周一的早上七点半定时发送一封邮件或提醒做什么事的话,那么就只能使用 crontab 才能实现(与 Linux 自带的 crontab功能是一样的)。

from celery.schedules import crontab
from datetime import timedelta


app.conf.beat_schedule = {
     # 任务一
    'sum-task':{				# 任务名
        'task':'tasks.add',		# 执行 tasks.py 中的 add 函数
        'schedule':timedelta(seconds=20),		# 时间
        'args':(5, 6)			# 参数
    },
    # 任务二
    'multi-task': {
        'task': 'tasks.multi',
        'schedule': crontab(hour=4, minute=30, day_of_week=1),
        'args': (3, 4)
    }
}

更多关于 crontab

示例说明
crontab()每分钟执行一次
crontab(minute=0, hour=0)每天午夜执行
crontab(minute=0, hour='*/3')每三个小时执行一次
crontab(minute=0,``hour='0,3,6,9,12,15,18,21')与上面相同
crontab(minute='*/15')每 15min执行一次
crontab(day_of_week='sunday')周日每分钟执行一次
crontab(minute='*',``hour='*',``day_of_week='sun')与上面相同
crontab(minute='*/10',``hour='3,17,22',``day_of_week='thu,fri')每周四或周五凌晨3-4点,下午5-6点和晚上10-11点
crontab(minute=0,hour='*/2,*/3')每过一个小时执行一次, 以下时间除外: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm
crontab(minute=0, hour='*/5')执行小时可被5整除,比如下午三点(十五点)触发
crontab(minute=0, hour='*/3,8-17')执行时间能被 2整除,在办公时间 8-17点,每小时执行一次
crontab(0, 0,day_of_month='2')每个月第二天执行
crontab(0, 0,``day_of_month='2-30/3')每个偶数日执行
crontab(0, 0,``day_of_month='1-7,15-21')在本月的第一周和第三周执行
crontab(0, 0,day_of_month='11',``month_of_year='5')每年5月11日执行
crontab(0, 0,``month_of_year='*/3')每个季度第一个月执行

参考文章

  • 每天一个linux命令(50):crontab命令
  • 官方文档

5. Django 中使用 Celery

5.1 构建简单的异步任务

- project/			# 项目主目录
  - app/			# app
    	- urls.py
        - views.py
        - tasks.py	# celery 任务,名字必须是 tasks.py
  - project/			# 项目文件
		- __init__.py
  		- settings.py
  		- urls.py
        - celery.py		# 创建 Celery 实例,加载 redis 配置文件
  - manage.py

在 Django 中使用 Celery ,依赖 django_celery_beat,因此先要安装它:

pip3 install django_celery_beat

并将其添加到 settings.py 中:

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'app',
    'django_celery_beat',
]

...
# redis  连接
CELERY_BROKER_URL = 'redis://127.0.0.1:6397'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6397/0'

1、project/celery

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platforms

# 使用 Django 环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'Project.settings')

app = Celery('celery_task')

app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

# 运行 root 用户运行 celery
platforms.C_FORCE_ROOT = True

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

2、project/__init__.py

from __future__ import absolute_import, unicode_literals

# 确保导入应用,Django 启动就能使用 app 

from .celery import app as celery_app

__all__ = ['celery_app']

3、创建任务 app/tasks.py

from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time 

@shared_task
def add(x, y):
    time.sleep(10)
    return x + y

@shared_task
def multi(x, y):
    time.sleep(10)
    return x * y

tasks.py 必须在各个 app 根目录下,且只能叫 tasks.py

4、视图中调用任务 views.py

  • ready():判断任务是否执行完毕
  • get(timeout=1):获取结果
  • traceback():获取原始回溯信息
from django.shortcuts import render, HttpResponse
from celery.result import AsyncResult

def celery_test(request):
    # 调用任务
    task = add.delay(4,22)

    return HttpResponse(task.id)	# 获取任务 id

def celery_res(request):
    # 获取任务结果
    task_id = 'b3fbe0da-57bb-4055-aea2-160afd6ae801'
    res = AsyncResult(id=task_id)
    return HttpResponse(res.get())		# 获取结果

5、路由配置 app/urls.py

path('celery_test/', views.celery_test, name='celery_test'),
path('celery_result/', views.celery_result, name='celery_result'),

6、打开终端启动 worker

celery -A project worker -l info

访问 127.0.0.1:8000/app/celery_test 调用执行任务:

访问 127.0.0.1:8000/app/celery_result 查看任务结果:

因为这是异步处理的,所有再执行任务时,其他代码照样执行。

5.2 在 Django 中使用定时任务

在 Django 也能设置定时任务,依赖于 django_celery_beatcrontab

1、在 project/celery.py 添加定时任务:

from celery.schedules import crontab
from datetime import timedelta


app.conf.update(
        CELERYBEAT_SCHEDULE = {
            # 任务一
            'sum-task':{
                'task':'app.tasks.add',
                'schedule':timedelta(seconds=20),
                'args':(5, 6)
                },
            # 任务二
            'multi-task': {
                'task': 'app.tasks.multi',
                'schedule': crontab(hour=4, minute=30, day_of_week=1),
                'args': (3, 4)
                }
            }
        )

在上面添加了两个定时任务 sum-taskmulti-task

  • sum-task :每过 20 s执行一次 add() 函数
  • multi-task:每周一的早上四点三十分执行一次 multi() 函数

启动 celery beat ,celery 启动一个 beat 进程不断检查是否有任务要执行:

celery -A project beat -l info

timedelta

timedelta 是datetime 的一个对象,需要引入 from datatime import timedelta,参数如下:

  • days:天
  • seconds:秒
  • microseconds:微秒
  • milliseconds:毫秒
  • minutes:分钟
  • hours:小时

crontab

  • month_of_year:月份
  • day_of_month:日期
  • day_of_week:周
  • hour:小时
  • minute:分钟

总结

  • 同时启动异步任务和定时任务:celery -A project worker -b -l info

  • 使用 RabbitMQ,配置:broker='amqp://admin:admin@localhost'

  • Celery 长时间运行避免内存泄露,添加配置:CELERY_MAX_TASKS_PER_CHILD = 10

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/382917.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

进程的介绍

文章目录一.进程的概念1.1概念1.2进程的组成1.2.1 PCB中描述进程的特征二.进程的虚拟地址空间三.进程间的通信引入线程一.进程的概念 1.1概念 百科的介绍: 换句话说,一个跑起来的程序,就是一个进程,也就是在操作系统中运行的exe程序就是一个进程,如下图的进程列表 进程是操…

【女士,房间墙上凿个洞,看你在干嘛~】安全攻防内网渗透-绕过防火墙和安全检测,搭建DNS隐蔽隧道

作者&#xff1a;Eason_LYC 悲观者预言失败&#xff0c;十言九中。 乐观者创造奇迹&#xff0c;一次即可。 一个人的价值&#xff0c;在于他所拥有的。所以可以不学无术&#xff0c;但不能一无所有&#xff01; 技术领域&#xff1a;WEB安全、网络攻防 关注WEB安全、网络攻防。…

Java并发包提供了哪些并发工具类?

第19讲 | Java并发包提供了哪些并发工具类&#xff1f; 通过前面的学习&#xff0c;我们一起回顾了线程、锁等各种并发编程的基本元素&#xff0c;也逐步涉及了 Java 并发包中的部分内容&#xff0c;相信经过前面的热身&#xff0c;我们能够更快地理解 Java 并发包。 今天我要…

SpringBoot集成Swagger3.0(入门) 02

文章目录Swagger3常用配置注解接口测试API信息配置Swagger3 Docket开关&#xff0c;过滤&#xff0c;分组Swagger3常用配置注解 ApiImplicitParams,ApiImplicitParam&#xff1a;Swagger3对参数的描述。 参数名参数值name参数名value参数的具体意义&#xff0c;作用。required参…

mes系统核心业务流程及应用场景介绍

现在许多企业已经开始使用MES系统控制和管理工厂的生产过程&#xff0c;实时监控、诊断和控制生产过程&#xff0c;完成单元集成和系统优化。本文将为大家具体介绍一下MES系统的业务流程。 MES系统业务流程 1、计划调度MES系统承接了ERP订单&#xff0c;开始干预生产。该模块…

kaggle RSNA 比赛过程总结

引言 算算时间&#xff0c;有差不多两年多没在打kaggle了&#xff0c;自20年最后一场后&#xff08;其实之前也就打过两场&#xff0c;一场打铁&#xff0c;一场表格赛是金是银不太记得&#xff0c;当时相当于刺激战场&#xff0c;过拟合lb大赛太刺激了&#xff0c;各种trick只…

毕业设计 基于51单片机的指纹红外密码电子锁

基于51单片机的指纹红外密码电子锁1、项目简介1.1 系统框架1.2 系统功能2、部分电路设计2.1 STC89C52单片机最小系统电路设计2.2 矩阵按键电路电路设计2.3 液晶显示模块电路设计3、部分代码展示3.1 LCD12864显示字符串3.2 串口初始化实物图1、项目简介 选题指导&#xff0c;项…

动态规划|特殊的多行规划|dp[2][] 用两行元素分别记录状态变化

多行规划是我自己整理此类问题时起的名字&#xff0c;如有专属名词&#xff0c;麻烦评论告知 用于处理当动态规划中&#xff0c;需要记录多个值的状态变化时。 376. 摆动序列&#xff08;特殊的自定义二维dp&#xff09; 做惯了一般的动态规划&#xff0c;突然看到这种题目&a…

UDPTCP网络编程

udp编程接口 一个UDP程序的编写可以分为3步&#xff1a; 创建一个网络套接字&#xff1a; 它相当于文件操作时的文件描述符&#xff0c;是一个程序进行网络通讯的门户&#xff0c; 所有的网络操作都要基于它 绑定IP和端口&#xff1a; 需要为网络套接字填充IP和端口信息 但是…

Python - 操作txt文件

文章目录打开txt文件读取txt文件写入txt文件删除txt文件打开txt文件 open(file, moder, bufferingNone, encodingNone, errorsNone, newlineNone, closefdTrue)函数用来打开txt文件。 #方法1&#xff0c;这种方式使用后需要关闭文件 f open("data.txt","r&qu…

【Visual Studio】git提交代码时使用GPG

前言 下载安装GPG的过程省略,直接开始进行配置 0.visual studio 版本说明 其余版本未测试,但是应该也是可以的 1 获取GPG的密钥ID 1.1 window下可以打开Kleopatra查看生成好的密钥的密钥ID 1.2 也可以从命令行中获取 gpg --list-keys 红框位置,后16位就是密钥ID 2 配置.git…

QML MouseArea详解

1.MouseArea简介 MouseArea是一个不可见的项目&#xff0c;通常与一个可见的项目一起使用&#xff0c;以便为该项目提供鼠标处理。通过有效地充当代理&#xff0c;鼠标处理的逻辑可以包含在MouseArea项中。 常用属性&#xff1a; 属性 类型描述 containsMouse bool 光标当前…

刷题笔记2 | 977.有序数组的平方 ,209.长度最小的子数组 ,59.螺旋矩阵II ,总结

977.有序数组的平方 给你一个按 非递减顺序 排序的整数数组 nums&#xff0c;返回 每个数字的平方 组成的新数组&#xff0c;要求也按 非递减顺序 排序。 输入&#xff1a;nums [-4,-1,0,3,10] 输出&#xff1a;[0,1,9,16,100] 解释&#xff1a;平方后&#xff0c;数组变为 […

二、Spring概述

1.Spring简介 Spring是一个开源框架&#xff0c;它由Rod Johnson创建。它是为了解决企业应用开发的复杂性而创建的。 从简单性、可测试性和松耦合的角度而言&#xff0c;任何Java应用都可以从Spring中受益。 Spring是一个轻量级的控制反转(IoC)和面向切面(AOP)的容器框架。 Sp…

关于如何合理设置线程池参数解决方案

关于如何合理设置线程池参数解决方案&#xff08;ThreadPoolExecutor&#xff09; 线程池参数有哪些 我们直接来看构造方法 ... public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6,ThreadFactory var7, Rejecte…

W25Q256被写保护如何修改

W25Q256被写保护如何修改1、 W25Q256数据读不到1.1 打印的寄存器的值1.2 可能原因1.3 解决办法1.4 用到的函数1、 W25Q256数据读不到 能够正确的读到ID&#xff0c;但是读到的数据不正确 1.1 打印的寄存器的值 0x2 BUSY &#xff1a;只读&#xff0c; 指令正在执行 WEL (1) &…

物盾安全汤晓冬:工业互联网企业如何应对高发的供应链安全风险?

编者按&#xff1a;物盾安全是一家专注于物联网安全的产品厂商&#xff0c;其核心产品“物安盾”在能源、制造、交通等多个领域落地&#xff0c;为这些行业企业提供覆盖物联网云、管、边、端的安全整体解决方案。“物安盾”集成了腾讯安全制品扫描&#xff08;BSCA&#xff09;…

【二】kubernetes操作

k8s卸载重置 名词解释 1、Namespace&#xff1a;名称用来隔离资源&#xff0c;不隔离网络 创建名称空间 一、命名空间namesapce 方式一&#xff1a;命令行创建 kubectl create ns hello删除名称空间 kubectl delete ns hello查询指定的名称空间 kubectl get pod -n kube-s…

【Adobe国际认证中文官网】Adobe中国摄影计划,免费安装 正版激活

一直以来国内有非常多的 Adobe 用户&#xff0c;但苦于正版的购买渠道较少、价格较为高昂&#xff0c;转而选择其他国家或地区的 Adobe 计划&#xff0c;亦或者是其他软件。这次Adobe在杭州宣布在中国大陆地区推出面向专业摄影师及摄影爱好者的Adobe Creative Cloud 中国摄影计…

大话数据结构-普里姆算法(Prim)和克鲁斯卡尔算法(Kruskal)

5 最小生成树 构造连通网的最小代价生成树称为最小生成树&#xff0c;即Minimum Cost Spanning Tree&#xff0c;最小生成树通常是基于无向网/有向网构造的。 找连通网的最小生成树&#xff0c;经典的有两种算法&#xff0c;普里姆算法和克鲁斯卡尔算法。 5.1 普里姆&#xff…