python celery使用队列

news2025/1/14 18:39:18

在celery的配置方法中有个参数叫task_routes,是用来设置不同的任务 消费不同的队列(也就是路由)。

格式如下:

{ ‘task name’: { ‘queue’: ‘queue name’ }}

直接上代码,简单明了,目录格式如下:

在这里插入图片描述

首先是配置文件 config.init.py

import os
import sys
from pathlib import Path

BASE_DIR = Path(__file__).resolve().parent.parent
sys.path.append(str(BASE_DIR))


class Config(object):
    """配置文件基类"""

    """ 项目名称 """
    PROJECT_NAME = "crawler_worker"
    """ celery backend存放结果 """
    CELERY_BACKEND_URL = "redis://127.0.0.1:6379/4"
    """ celery broker中间件 """
    CELERY_BROKER_URL = "redis://127.0.0.1:6379/5"

    """ worker 名称 """
    CRAWL_SEND_EMAIL_TASK = "crawl_service.crawl.send_email_task"  # 抓取发送邮件任务
    CRAWL_SEND_MSG_TASK = "crawl_service.crawl.send_msg_task"  # 抓取发送短信任务


settings = Config()

celery应用程序模块配置相关 celery_base.celery_app.py

import os
import sys
import time
import celery
from pathlib import Path

BASE_DIR = Path(__file__).resolve().parent.parent
sys.path.append(str(BASE_DIR))

from config import settings


# 实例化celery对象
celery_app = celery.Celery(
    settings.PROJECT_NAME,
    backend=settings.CELERY_BACKEND_URL,
    broker=settings.CELERY_BROKER_URL,
    include=[
        "tasks.crawl_send_email",
        "tasks.crawl_send_msg",
    ],
)

# 任务路由
task_routes = {
    settings.CRAWL_SEND_EMAIL_TASK: {
        "queue": f"{settings.CRAWL_SEND_EMAIL_TASK}_queue"
    },
    settings.CRAWL_SEND_MSG_TASK: {"queue": f"{settings.CRAWL_SEND_MSG_TASK}_queue"},
}
# 任务去重
celery_once = {
    "backend": "celery_once.backends.Redis",
    "settings": {"url": settings.CELERY_BACKEND_URL, "default_timeout": 60 * 60},
}
# 配置文件
celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_default_queue="normal",
    timezone="Asia/Shanghai",
    enable_utc=False,
    task_routes=task_routes,
    task_ignore_result=True,
    redis_max_connections=100,
    result_expires=3600,
    ONCE=celery_once,
)

抓取基类 crawl_worker_base.py

from celery_once import QueueOnce


class CrawlBase(QueueOnce):
    """
    抓取worker基类
    """

    name = None
    once = {"graceful": True}
    ignore_result = True

发送邮件任务 crawl_send_email.py

import os
import sys
import time
import celery
from loguru import logger
from pathlib import Path

BASE_DIR = Path(__file__).resolve().parent.parent
sys.path.append(str(BASE_DIR))

from config import settings
from celery_base.celery_app import celery_app
from tasks.crawl_worker_base import CrawlBase

"""

执行命令:
celery -A tasks.crawl_send_email worker -l info -Q crawl_service.crawl.send_email_task_queue

"""


class SendEmailClass(CrawlBase):
    name = settings.CRAWL_SEND_EMAIL_TASK

    def __init__(self, *args, **kwargs):
        super(SendEmailClass, self).__init__(*args, **kwargs)

    def run(self, name):
        logger.info("class的方式, 向%s发送邮件..." % name)
        time.sleep(5)
        logger.info("class的方式, 向%s发送邮件完成" % name)
        return f"成功拿到{name}发送的邮件!"


send_email = celery_app.register_task(SendEmailClass())

发送短信 crawl_send_msg.py

import os
import sys
import time
import celery
from loguru import logger
from pathlib import Path

BASE_DIR = Path(__file__).resolve().parent.parent
sys.path.append(str(BASE_DIR))
from config import settings
from celery_base.celery_app import celery_app
from tasks.crawl_worker_base import CrawlBase

"""

执行命令:
celery -A tasks.crawl_send_msg worker -l info -Q crawl_service.crawl.send_msg_task_queue

"""


class SendMsgClass(CrawlBase):
    name = settings.CRAWL_SEND_MSG_TASK

    def __init__(self, *args, **kwargs):
        super(SendMsgClass, self).__init__(*args, **kwargs)

    def run(self, name):
        logger.info("class的方式, 向%s发送短信..." % name)
        time.sleep(5)
        logger.info("class的方式, 向%s发送短信完成" % name)
        return f"成功拿到{name}发送的短信!"


send_msg = celery_app.register_task(SendMsgClass())

发送邮件任务-调度器 send_email_scheduler.py

import sys
from pathlib import Path

BASE_DIR = Path(__file__).resolve().parent.parent
sys.path.append(str(BASE_DIR))

from config import settings
from celery_base.celery_app import celery_app

if __name__ == "__main__":
    for i in range(100):
        result = celery_app.send_task(
            name=settings.CRAWL_SEND_EMAIL_TASK, args=(f"张三嘿嘿{i}",)
        )
        print(result.id)

发送短信任务-调度器 send_msg_scheduler.py

import os
import sys
import time
from pathlib import Path

BASE_DIR = Path(__file__).resolve().parent.parent
sys.path.append(str(BASE_DIR))

from config import settings
from celery_base.celery_app import celery_app

if __name__ == "__main__":
    for i in range(100, 500):
        result = celery_app.send_task(
            name=settings.CRAWL_SEND_MSG_TASK, args=(f"李四哈哈哈{i}",)
        )
        print(result.id)

准备工作已经做好,紧接着分别执行命令:

celery -A tasks.crawl_send_email worker -l info -Q crawl_service.crawl.send_email_task_queue
celery -A tasks.crawl_send_msg worker -l info -Q crawl_service.crawl.send_msg_task_queue

出现👇🏻下面效果就代表celery启动成功:

在这里插入图片描述

最后只要发送任务即可,在redis中就可以看到专门指定的两个队列了。

在这里插入图片描述

看下运行过程中的日志

在这里插入图片描述

一个简单的celery + 队列就实现了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1459152.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LabVIEW读取excel日期

LabVIEW读取excel日期 | Excel数据表格中有日期列和时间列,如下表所示: 通过LabVIEW直接读取Excel表格数据,读出的日期列和时间列数据与原始表格不一致,直接读出来的数据如下表所示: 日期、时间列数据异常 问题产生原因…

哈希应用位图 | 位图概述与代码实现 | 关于位图的几个面试题

文章目录 1.位图的概述与实现1.1.位图的引出与概述1.2.位图的代码实现1.3.位图的应用及其他面试题 1.位图的概述与实现 当然C库中也有位图的实现:链接 1.1.位图的引出与概述 面试题:给40亿个不重复的无符号整数(0~2^32)&#xf…

消息中间件之RocketMQ源码分析(十一)

Namesrv路由原理 Namesrv获取的Topic路由信息来自Broker定时心跳,心跳时Broker将Topic信息和其他信息发送到Namesrv。 Namesrv通过RequestCode.REGISTER_BROKER接口将心跳中的Broker信息和Topic信息存储在Namesrv中 路由注册 registerBrokerWithFilterServer()方…

手撕扩散模型(一)| 训练部分——前向扩散,反向预测代码全解析

文章目录 1 直接使用 核心代码2 工程代码实现2.1 DDPM2.2 训练 三大模型VAE,GAN, DIffusion扩散模型 是生成界的重要模型,但是最近一段时间扩散模型被用到的越来越多的,最近爆火的OpenAI的 Sora文生视频模型其实也是用了这种的方…

【Java期末】学生成绩管理系统(MySQL数据库)

诚接C语言、C、Java、Python、HTML、JavaScript、vue、MySQL相关编程作业, 标价10-20每份,如有需要请加文章最下方QQ。 本文资源:https://download.csdn.net/download/weixin_47040861/88856340 1.题目要求 学生成绩管理系统 通过Java控制…

PNG图片压缩-UPNG.js参数说明及示例

UPNG.js是一个非常轻量且高效的库,用于处理PNG图像。它可以编码和解码PNG图片,同时支持压缩和解压缩功能。特别适合在前端项目中处理图像,尤其是在需要优化图像大小而不牺牲质量时。 UPNG.encode()函数是UPNG.js中用于将图像数据编码成PNG格…

量化巨头“卖空”被刷屏!网友:又一类量化策略要“收摊”了

量化圈遇到了龙年首宗“大事件”! 2月20日晚间,沪深交易所同时出手对量化巨头灵均投资的异常交易行为进行“处理”。 沪深交易所均称发现灵均在2月19日开盘1分钟内,名下多个账户通过计算机程序自动生产交易指令,短时间大量下单卖…

WireShark 安装指南:详细安装步骤和使用技巧

Wireshark是一个开源的网络协议分析工具,它能够捕获和分析网络数据包,并以用户友好的方式呈现这些数据包的内容。Wireshark 被广泛应用于网络故障排查、安全审计、教育及软件开发等领域。接下将讲解Wireshark的安装与简单使用。 目录 Wireshark安装步骤…

过了30岁了,一定要专注一件事情?视频号值得尝试!

经常说视频号下载助手, 但发现大多数的大佬都只是先专注一件事情。 小编初6就回来了,和一个大佬吃饭,虽然人家规模并不大,但日引客户上千也是基本的。 这里给大家揭秘一下,他的做法!!&#x…

猫头虎分享已解决Bug || 脚本执行错误(Script Execution Failure):ScriptError, ExecutionFailure

博主猫头虎的技术世界 🌟 欢迎来到猫头虎的博客 — 探索技术的无限可能! 专栏链接: 🔗 精选专栏: 《面试题大全》 — 面试准备的宝典!《IDEA开发秘籍》 — 提升你的IDEA技能!《100天精通鸿蒙》 …

挑战30天学完Python:Day15 错误类型

📘 Day 14 🎉 本系列为Python基础学习,原稿来源于 30-Days-Of-Python 英文项目,大奇主要是对其本地化翻译、逐条验证和补充,想通过30天完成正儿八经的系统化实践。此系列适合零基础同学,或仅了解Python一点…

Linux编辑器——Vim详解

目录 ⭐前言 ⭐vim的基本概念 ⭐vim的基本操作 ⭐vim命令模式命令集 ⭐vim末行模式命令集 ⭐简单vim配置 ⭐配置文件的位置 ⭐常用配置选项 ⭐前言 vi/vim的区别简单点来说,它们都是多模式编辑器,不同的是vim是vi的升级版本,它不仅兼容…

课程大纲:图像处理中的矩阵计算

课程名称:《图像处理中的矩阵计算》 课程简介: 图像处理中的矩阵计算是图像分析与处理的核心部分。本课程旨在教授学员如何应用线性代数中的矩阵计算,以实现各种图像处理技术。我们将通过强调实际应用和实践活动来确保学员能够理解和掌握这些…

代码随想录算法训练营第三六天 | 无重叠区间、划分字母区间、合并区间

目录 无重叠区间划分字母区间合并区间 LeetCode 435. 无重叠区间 LeetCode 763.划分字母区间 LeetCode 56. 合并区间 无重叠区间 给定一个区间的集合 intervals ,其中 intervals[i] [starti, endi] 。返回 需要移除区间的最小数量,使剩余区间互不重叠…

vue3 之 商城项目—会员中心

整体功能梳理 1️⃣个人中心—个人信息和猜你喜欢数据渲染 2️⃣我的订单—各种状态下的订单列表展示 路由配置&#xff08;三级路由配置&#xff09; 准备模版member/index.vue <script setup> </script><template><div class"container">…

深度学习图像算法工程师--面试准备(1)

1 请问人工神经网络中为什么 ReLU 要好过于 tanh 和 Sigmoid function&#xff1f; 采⽤Sigmoid 等函数&#xff0c;算激活函数时&#xff08;指数运算&#xff09;&#xff0c;计算量⼤&#xff0c;反向传播求误差梯度时&#xff0c;求导涉及除法和指数运算&#xff0c;计算量…

《VitePress 简易速速上手小册》第2章:Markdown 与页面创建(2024 最新版)

文章目录 2.1 Markdown 基础及扩展2.1.1 基础知识点解析2.1.2 重点案例&#xff1a;技术博客2.1.3 拓展案例 1&#xff1a;食谱分享2.1.4 拓展案例 2&#xff1a;个人旅行日记 2.2 页面结构与布局设计2.2.1 基础知识点解析2.2.2 重点案例&#xff1a;公司官网2.2.3 拓展案例 1&…

软件测试方法_边界值分析法

目录&#xff1a; ①边界值分析法的介绍和概念 ②边界值分析法的原理和思想 ③单缺陷假设和多缺陷假设 ④边界值测试数据类型 ⑤内部边界值分析 ⑥各类边界值测试介绍 ⑦基于边界值分析方法选择测试用例的原则 ⑧边界值分析法的实例分析 1、边界值分析法的介绍和概念 …

力扣94 二叉树的中序遍历 (Java版本) 递归、非递归

文章目录 题目描述递归解法非递归解法 题目描述 给定一个二叉树的根节点 root &#xff0c;返回 它的 中序 遍历 。 示例 1&#xff1a; 输入&#xff1a;root [1,null,2,3] 输出&#xff1a;[1,3,2] 示例 2&#xff1a; 输入&#xff1a;root [] 输出&#xff1a;[] 示…

【大厂AI课学习笔记】【2.2机器学习开发任务实例】(7)特征构造

特征分析之后&#xff0c;就是特征构造。 特征构造第一步 特征构造往往要进行数据的归一化。 在本案例中&#xff0c;我们将所有的数据&#xff0c;将所有特征区间调整为0~1之间。 如上图。 那么&#xff0c;为什么要进行归一化&#xff0c;又如何将数据&#xff0c;调整为…