分布式任务队列系统 celery 进阶

news2025/1/13 17:34:12

通过前面的入门,我们大概了解了celery的工作原理及简单的入门代码示例(传送门),下面进行一些稍微复杂的任务调度学习

多目录结构异步执行

在实际项目中,使用Celery进行异步任务处理时,经常需要将代码组织在多个目录和模块中,以便更好地管理和维护。下面展示如何在多目录结构下使用Celery进行异步任务执行。

项目结构

假设我们有以下项目结构:

在这里插入图片描述

1. 配置文件 (config/celery_config.py)

首先,我们需要配置Celery的broker和backend。在这个示例中,我们将配置文件放在 config 目录下。

broker_url = 'redis://localhost:32769/0'
result_backend = 'redis://localhost:32769/1'

2. 定义任务 (app/task1.py 和 app/task2.py)

接下来,在 app 目录下定义我们的任务函数。

task1

import time

from celery_demo.multi_task.app.celery_worker import celery_app


@celery_app.task
def send_email(name):
    print(f"开始给{name}发送邮件")
    time.sleep(1)
    return "done"


task2

import time

from celery_demo.multi_task.app.celery_worker import celery_app


@celery_app.task
def send_msg(name):
    print(f"开始给{name}发送短信")
    time.sleep(1)
    return "done"

3. 启动Worker (app/celery_worker.py)

为了能够启动worker来处理任务,我们需要编写一个脚本来启动它。在这个示例中,这个脚本放在 app/worker.py 中。

注意!:include参数的task路径必须写全,不然各种找不到模块报错

from celery import Celery

celery_app = Celery('celery_worker', include=['celery_demo.multi_task.app.task1', 'celery_demo.multi_task.app.task2'])
celery_app.config_from_object('celery_demo.multi_task.config.celery_config')

# 时区
celery_app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
celery_app.conf.enable_utc = False

# 启动celery,准备好接收消息,一旦接收到消息就执行任务,并存储结果
if __name__ == '__main__':
    celery_app.worker_main(['worker', '--loglevel=info'])

运行命令启动Worker:

在根目录下:
celery -A celery_demo.multi_task.app.celery_worker worker --loglevel=info

或者直接运行该脚本:

/usr/bin/python3 /Users/fangyirui/PycharmProjects/pythonProject/celery_demo/multi_task/app/celery_worker.py 

或者pychram可以右键直接run

4. 主程序调用异步任务 (main.py)

最后,在主程序中调用这些异步任务。我们可以通过导入 add.delay() 或者其他方法来发送消息到消息队列。

from celery_demo.multi_task.app import task1, task2

result1 = task1.send_email.delay('AA')
result2 = task2.send_msg.delay('BB')

print(result1)
print(result1.get())
print(result2)
print(result2.get())

总结

在上述示例中,通过合理的项目结构,将不同功能模块分离开来,使得代码更加清晰易维护。具体步骤如下:

  1. 配置文件:定义了Celery的broker和backend设置。
  2. 定义任务:创建了包含具体业务逻辑的异步函数,并用@task装饰器标记为可被调度执行的task.
  3. 启动Worker:编写了用于启动celerey worker 的脚本,使其能够从消息队列拉取并执行相应操作.
  4. 主程序调用:通过导入task 模块中的方法,实现对某些操作发起异步请求.
  5. 注意各种模块的导包和引用需要把路径写全,不然就算pycharm不提示报错,运行时也会报错,博主我已经踩了很多坑了!

定时任务

代码结构目录

在这里插入图片描述

启动beat服务方式

配置 Celery 应用

创建一个名为 celery_worker.py 的文件,并配置你的 Celery 应用。

from datetime import timedelta

from celery import Celery
from celery.schedules import crontab

app = Celery('celery_worker', broker='redis://localhost:32769/0', backend='redis://localhost:32769/1',
             include=['task']
             )

app.conf.beat_schedule = {
    'add-every-seconds': {
        # 指定执行的任务
        'task': 'task.add',

        # 单位为秒,每10秒触发一次
        # 'schedule': 10.0,

        # corn表达式触发,每一分钟触发一次
        # 'schedule': crontab(minute="*/1"),

        # 每年6月5号,16点0分执行
        'schedule': crontab(minute=0, hour=16, day_of_month=5, month_of_year=6),

        # 每6秒执行一次
        # 'schedule': timedelta(seconds=6),

        'args': (16, 16)
    },
}
app.conf.timezone = 'Asia/Shanghai'

定义任务

创建一个名为 task.py 的文件,并定义你的任务。

from celery_worker import app


@app.task
def add(x, y):
    print(f"执行成功: {x} {y} ,结果为{x + y}")
    return x + y

启动 Worker 和 Beat 服务

启动 worker:

celery -A celery_worker worker --loglevel=info

启动 beat:

celery -A celery_worker beat --loglevel=info

这样,Celery Beat 会按照你配置的时间间隔发送 add(16, 16) 的任务到队列中,而 Worker 会从队列中取出并执行这个任务。

单个任务时指定时间

send_task.py

from datetime import datetime
from datetime import timedelta

from task import add

# # 方式一
# # 如果是过去的时间,则会立马执行
# v1 = datetime(2024, 6, 5, 16, 5, 00)
# print(v1)
# # 如果不转utc时间,会比预计的晚8个小时才执行
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = add.apply_async(args=[5, 5], eta=v2)
# print(result.id)

# 方式二
ctime = datetime.now()
# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())

# 用当前时间延迟10秒执行
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

# 使用apply_async并设定时间
result = add.apply_async(args=[7, 7], eta=task_time)
print(result.id)

zsh: command not found: celery

问题描述

在终端执行celery的命令时报错,zsh: command not found: celery

但执行pip3 show celery查看已经安装过的celery时,发现安装没有问题:

➜ pythonProject pip3 show celery
Name: celery
Version: 5.4.0
Summary: Distributed Task Queue.
Home-page: https://docs.celeryq.dev/
Author: Ask Solem
Author-email: auvipy@gmail.com
License: BSD-3-Clause
Location: /Users/fangyirui/Library/Python/3.9/lib/python/site-packages
Requires: billiard, python-dateutil, click-didyoumean, click, click-repl, click-plugins, vine, kombu, tzdata
Required-by: 

在网上查阅其他解决方案后,问题仍没有解决,那么你很可能跟我一样还没有将 Python 二进制安装目录添加到 PATH 中。

解决方案:

在 .zshrc 文件中添加你的python/bin目录: /Users/fangyirui/Library/Python/3.9/bin

或者执行:

export PATH=$PATH:/Users/fangyirui/Library/Python/3.9/bin

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1791321.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CTF本地靶场搭建——基于阿里云ACR实现动态flag题型的创建

接上文,这篇主要是结合阿里云ACR来实现动态flag题型的创建。 这里顺便也介绍一下阿里云的ACR服务。 阿里云容器镜像服务(简称 ACR)是面向容器镜像、Helm Chart 等符合 OCI 标准的云原生制品安全托管及高效分发平台。 ACR 支持全球同步加速、…

STM32作业实现(七)OLED显示数据

目录 STM32作业设计 STM32作业实现(一)串口通信 STM32作业实现(二)串口控制led STM32作业实现(三)串口控制有源蜂鸣器 STM32作业实现(四)光敏传感器 STM32作业实现(五)温湿度传感器dht11 STM32作业实现(六)闪存保存数据 STM32作业实现(七)OLED显示数据 STM32作业实现(八)触摸按…

6 大亮点!全新 Anolis OS 23.1 GA 版正式发布,满足多样化平台支持

一、引言 Anolis OS 23,作为龙蜥社区推出的着重于技术演进和先进性的 Linux 发行版本,即便在频繁集成各类软件最新特性的同时,依然确保了系统的高度稳定性和可靠性。Anolis OS 在社区共建上对理事单位的需求给予了极大重视,力保各…

使用正则表达式分割字符串

自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 split()方法用于实现根据正则表达式分割字符串,并以列表的形式返回。其作用同字符串对象的split()方法类似,所不同的就是分割…

Linux下的配置工具menuconfig+配置文件(Kconfig/.config/defconfig)

我们都知道,嵌入式开发中,或者说C语言中,配置基本都是通过宏定义来决定的,在MCU开发中,代码量比较小,配置项也比较少,我们直接修改对应的宏定义即可。 但是,Linux开发中,操作系统、驱动部分还有应用部分加起来,代码量极大,配置项目也非常多,这时候,就需要对这些配…

HTML+CSS+JS 动态登录表单

效果演示 实现了一个登录表单的背景动画效果,包括一个渐变背景、一个输入框和一个登录按钮。背景动画由多个不同大小和颜色的正方形组成,它们在页面上以不同的速度和方向移动。当用户成功登录后,标题会向上移动,表单会消失。 Code <!DOCTYPE html> <html lang=&q…

小白学大模型:Hugging Face Tokenizer

Tokenizer介绍 在自然语言处理&#xff08;NLP&#xff09;领域&#xff0c;Tokenizer&#xff08;分词器&#xff09;是准备输入模型的关键步骤之一。Hugging Face 提供了用于各种模型的分词器库&#xff0c;其中大多数分词器都以两种风格提供&#xff1a;一种是完整的 Pytho…

VRRP

文章目录 VRRP基本原理技术背景VRRP作用VRRP概述VRRP报文VRRP名词解释VRRP路由器VRRP组虚拟路由器虚拟IP地址、MAC地址Master、Backup路由器 VRRP状态机Master/ Backup 路由器Master路由器:Backup路由器: VRRP的工作过程 VRRP基础配置 VRRP基本原理 技术背景 为了解决单个路由…

力扣hot100:138. 随机链表的复制(技巧,数据结构)

LeetCode&#xff1a;138. 随机链表的复制 这是一个经典的数据结构题&#xff0c;当做数据结构来学习。 1、哈希映射 需要注意的是&#xff0c;指针也能够当做unordered_map的键值&#xff0c;指针实际上是一个地址值&#xff0c;在unordered_map中&#xff0c;使用指针的实…

快速C++中的入门智能指针

✨前言✨ &#x1f4d8; 博客主页&#xff1a;to Keep博客主页 &#x1f646;欢迎关注&#xff0c;&#x1f44d;点赞&#xff0c;&#x1f4dd;留言评论 ⏳首发时间&#xff1a;2024年6月4日 &#x1f4e8; 博主码云地址&#xff1a;博主码云地址 &#x1f4d5;参考书籍&#…

【数据集划分】假如你有超百万条oracle数据库数据(成真版)

【数据集划分】假如你有接近百万条oracle数据库数据&#xff08;成真版&#xff09; 写在最前面小结 数据集划分原因注意事项 1. 留出法&#xff08;Hold-out Method&#xff09;原理算法复杂度代码示例Scikit-learn的train_test_split分布式计算框架&#xff08;如Apache Spar…

JVM学习-Jprofiler

JProfiler 基本概述 特点 使用方便&#xff0c;界面操作友好对被分析的应用影响小(提供模板)CPU&#xff0c;Tread&#xff0c;Memory分析功能尤其强大支持对jdbc,noSql,jsp,servlet,socket进行分析支持多种模式(离线、在线)的分析支持监控本地、远程JVM跨平台&#xff0c;拥…

MongoDB~索引使用与优化

Study by&#xff1a; https://docs.mongoing.com/indexeshttps://www.cnblogs.com/Neeo/articles/14325130.html#%E5%85%B6%E4%BB%96%E7%B4%A2%E5%BC%95 作用 如果你把数据库类比为一本书&#xff0c;那书的具体内容是数据&#xff0c;书的目录就是索引&#xff0c;所以索引…

C++第三方库【httplib】断点续传

什么是断点续传 上图是我们平时在浏览器下载文件的场景&#xff0c;下载的本质是数据的传输。当出现网络异常&#xff0c;浏览器异常&#xff0c;或者文件源的服务器异常&#xff0c;下载都可能会终止。而当异常解除后&#xff0c;重新下载文件&#xff0c;我们希望从上一次下载…

用例篇03

正交表 因素&#xff1a;存在的条件 水平&#xff1a;因素的取值 最简单的正交表&#xff1a;L4(2) 应用 allpairs 来实现正交表。 步骤&#xff1a; 1.根据需求找出因素和水平 2.将因素和水平写入到excel表格中&#xff08;表格不需要保存&#xff09;&#xff08;推荐用…

文本批量高效编辑器:一键在每行结尾添加分隔符,助力文本处理飞速提升!

在信息爆炸的时代&#xff0c;文本处理成为了一项不可或缺的技能。然而&#xff0c;面对大量的文本数据&#xff0c;如何高效地进行处理却成为了一项挑战。这时&#xff0c;一款高效、易用的文本批量编辑器就显得尤为重要。这个软件就是首助编辑高手 首先&#xff0c;打开首助…

fairseq框架使用记录

sh命令 cmd"fairseq-train data-bin/$data_dir--save-dir $save_dir--distributed-world-size $gpu_num -s $src_lang -t $tgt_lang--arch $arch--dropout $dropout--criterion $criterion --label-smoothing 0.1--task mmt_vqa--optimizer adam --adam-betas (0.9, 0.98…

高并发系统限流原理

短时间内巨大的访问流量&#xff0c;我们如何让系统在处理高并发的同时还能保证自身系统的稳定性&#xff1f;估计有人会说&#xff0c;增加机器就可以了&#xff0c;因为我的系统架构设计就是按照分布式思想进行架构设计的&#xff0c;所以可以只需要增加机器就可以解决问题了…

代码随想录算法训练营day41

题目&#xff1a;01背包理论基础、416. 分割等和子集 参考链接&#xff1a;代码随想录 动态规划&#xff1a;01背包理论基础 思路&#xff1a;01背包是所有背包问题的基础&#xff0c;第一次看到比较懵&#xff0c;完全不知道dp数据怎么设置。具体分析还是dp五部曲&#xff…

Vue3实战笔记(58)—从零开始掌握Vue3插槽机制,基础入门

文章目录 前言插槽基础入门总结 前言 不论是组件封装还是分析源码&#xff0c;实际开发中经常接触插槽&#xff0c;插槽是干什么用的呢&#xff1f;组件之间能够接收任意类型的 JavaScript 值作为 props&#xff0c;但组件要如何接收模板内容呢&#xff1f;在某些场景中&#…