《Python实战进阶》No 9:使用 Celery 实现异步任务队列

news2025/4/20 14:51:03

第9集:使用 Celery 实现异步任务队列


引言

在现代 Web 应用中,许多操作(如发送邮件、处理文件上传、执行复杂计算等)可能需要耗费较长时间。如果这些操作直接在主线程中执行,会导致用户请求阻塞,降低用户体验。为了解决这一问题,我们可以使用 Celery 来实现异步任务队列。

Celery 是一个强大的分布式任务队列框架,支持 Python 编写的异步任务调度和后台任务处理。本篇将详细介绍如何使用 Celery 构建异步任务队列,并结合实际案例展示其在 Flask 和 Django 项目中的应用。
在这里插入图片描述


1. 什么是 Celery?

Celery 是一个基于消息队列的任务调度工具,它允许你将耗时的任务从主线程中分离出来,交由后台进程异步执行。Celery 的核心组件包括:

  • 任务生产者(Producer):负责创建任务并将其发送到消息队列。
  • 消息代理(Broker):用于存储任务队列,常见的代理有 RabbitMQ 和 Redis。
  • 任务消费者(Worker):从消息队列中获取任务并执行。

2. 安装与配置
2.1 安装 Celery

首先,确保你的环境中已安装 Celery 和消息代理(这里以 Redis 为例):

pip install celery redis
2.2 配置 Redis

Redis 是 Celery 常用的消息代理之一。你可以通过以下命令安装 Redis:

sudo apt install redis-server

启动 Redis 服务:

sudo systemctl start redis

验证 Redis 是否正常运行:

redis-cli ping
# 如果返回 "PONG",说明 Redis 已成功启动。

3. 使用 Celery 的基本流程

以下是使用 Celery 的基本步骤:

  1. 创建 Celery 应用实例。
  2. 定义异步任务。
  3. 启动 Celery Worker。
  4. 在主程序中调用任务。

接下来我们将通过一个简单的示例演示这些步骤。


4. 示例:异步发送邮件

假设我们需要实现一个功能:当用户注册时,系统会向用户发送一封欢迎邮件。由于发送邮件是一个耗时操作,我们将其封装为 Celery 异步任务。

4.1 创建 Celery 应用

在项目根目录下创建 celery_app.py 文件:

from celery import Celery

# 创建 Celery 应用实例
app = Celery('tasks', broker='redis://localhost:6379/0')

# 定义异步任务
@app.task
def send_welcome_email(user_email):
    print(f"Sending welcome email to {user_email}...")
    # 模拟发送邮件的耗时操作
    import time
    time.sleep(5)
    print(f"Welcome email sent to {user_email}")
4.2 启动 Celery Worker

在终端中启动 Celery Worker:

celery -A celery_app worker --loglevel=info

这将启动一个 Celery Worker,监听任务队列并执行任务。

4.3 调用异步任务

在主程序中调用 send_welcome_email 任务。以下是一个 Flask 示例:

from flask import Flask, request, jsonify
from celery_app import send_welcome_email

app = Flask(__name__)

@app.route('/register', methods=['POST'])
def register():
    user_email = request.json.get('email')
    if not user_email:
        return jsonify({"error": "Email is required"}), 400
    
    # 异步调用任务
    send_welcome_email.delay(user_email)
    
    return jsonify({"message": "Registration successful. Welcome email will be sent shortly."}), 200

if __name__ == '__main__':
    app.run(debug=True)
4.4 测试

启动 Flask 应用:

python app.py

使用 Postman 或 curl 发送 POST 请求:

curl -X POST http://127.0.0.1:5000/register -H "Content-Type: application/json" -d '{"email": "test@example.com"}'

你会看到 Celery Worker 输出日志,表明任务正在异步执行。


5. Celery 的高级特性
5.1 定时任务

Celery 支持定时任务,可以用来定期执行某些操作。例如,每天凌晨清理过期数据:

  1. 安装 Celery 的扩展包 celery[redis]celery[schedule]
  2. 配置定时任务:
from celery.schedules import crontab

app.conf.beat_schedule = {
    'cleanup-every-midnight': {
        'task': 'tasks.cleanup_expired_data',
        'schedule': crontab(hour=0, minute=0),
    },
}
  1. 启动 Celery Beat:
celery -A celery_app beat --loglevel=info
5.2 任务结果存储

默认情况下,Celery 不会存储任务的执行结果。如果需要查看任务状态或结果,可以配置结果后端(如 Redis 或数据库):

app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

然后可以通过 AsyncResult 获取任务结果:

from celery.result import AsyncResult

result = AsyncResult(task_id, app=app)
print(result.status)  # 查看任务状态
print(result.result)  # 查看任务结果

6. 在 Django 中使用 Celery

如果你使用的是 Django,可以通过 django-celery-resultsdjango-celery-beat 扩展来简化 Celery 的集成。

  1. 安装依赖:
pip install django-celery-results django-celery-beat
  1. 配置 settings.py
INSTALLED_APPS += [
    'django_celery_results',
    'django_celery_beat',
]

CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
  1. 迁移数据库:
python manage.py migrate
  1. 定义任务并在视图中调用,与 Flask 类似。

7. 总结

通过本篇教程,我们学习了如何使用 Celery 构建异步任务队列,并通过实际案例展示了其在 Flask 和 Django 项目中的应用。Celery 的强大之处在于其灵活性和可扩展性,无论是简单的异步任务还是复杂的分布式任务调度,它都能胜任。

下一集我们将探讨 Web 安全性,重点讲解如何防止 SQL 注入、XSS 和 CSRF 攻击,敬请期待!


参考资料
  • Celery 官方文档
  • Redis 官方文档
  • Flask 官方文档
  • Django 官方文档

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2308722.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Mark】记录用宝塔+Nginx+worldpress+域名遇到的跨域,301,127.0.0.1,CSS加载失败问题

背景 想要用宝塔搭建worldpress,然后用域名直接转https,隐藏掉ipport。 结果被折磨了1天,一直在死活在301,127.0.0.1打转 还有css加载不了的情况 因为worldpress很多是301重定向的,所以改到最后我都不知道改了什么&am…

Linux | Ubuntu 与 Windows 双系统安装 / 高频故障 / UEFI 安全引导禁用

注:本文为 “buntu 与 Windows 双系统及高频故障解决” 相关文章合辑。 英文引文,机翻未校。 How to install Ubuntu 20.04 and dual boot alongside Windows 10 如何将 Ubuntu 20.04 和双启动与 Windows 10 一起安装 Dave’s RoboShack Published in…

计算机毕业设计SpringBoot+Vue.js手机商城 (源码+文档+PPT+讲解)

温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 作者简介:Java领…

CSS—隐藏元素:1分钟掌握与使用隐藏元素的方法

个人博客:haichenyi.com。感谢关注 1. 目录 1–目录2–display:none3–visibility: hidden4–opacity: 05–position: absolute;与 left: -9999px;6–z-index 和 position7–clip-path: circle(0%) 2. display:none 标签会挂载在html中,但是不会在页面上…

EtherCAT总线学习笔记

一、EtherCAT的概述: EtherCAT是由德国BECKHOFF自动化公司于2003年提出的 实时工业以太网技术。它具有高速和高数据有效率的特点,支持多种设备连接拓扑结构。其 从站节点使用专用控制芯片,主站使用标准的以太网控制器。 EtherCAT的主要特点如…

WebRTC与PJSIP:呼叫中心系统技术选型指南

助力企业构建高效、灵活的通信解决方案 在数字化时代,呼叫中心系统的技术选型直接影响客户服务效率和业务扩展能力。WebRTC与PJSIP作为两大主流通信技术,各有其核心优势与适用场景。本文从功能、成本、开发门槛等维度为您深度解析,助您精准匹…

Vue-Flow绘制流程图(Vue3+ElementPlus+TS)简单案例

本文是vue3Elementplusts框架编写的简单可拖拽绘制案例。 1.效果图&#xff1a; 2.Index.vue主代码&#xff1a; <script lang"ts" setup> import { ref, markRaw } from "vue"; import {VueFlow,useVueFlow,MarkerType,type Node,type Edge } fro…

如何通过 LlamaIndex 将数据导入 Elasticsearch

作者&#xff1a;来自 Elastic Andre Luiz 逐步介绍如何使用 RAG 和 LlamaIndex 提取数据并进行搜索。 在本文中&#xff0c;我们将使用 LlamaIndex 来索引数据&#xff0c;从而实现一个常见问题搜索引擎。 Elasticsearch 将作为我们的向量数据库&#xff0c;实现向量搜索&am…

Boosting

Boosting 学习目标 知道boosting集成原理和实现过程知道bagging和boosting集成的区别知道AdaBoost集成原理 Boosting思想 Boosting思想图 每一个训练器重点关注前一个训练器不足的地方进行训练通过加权投票的方式&#xff0c;得出预测结果串行的训练方式 1 什么是boosting 随着…

【通俗讲解电子电路】——从零开始理解生活中的电路(一)

导言&#xff1a;电子电路为什么重要&#xff1f; ——看不见的“魔法”&#xff0c;如何驱动你的生活&#xff1f; 清晨&#xff0c;当你的手机闹钟响起时&#xff0c;你可能不会想到&#xff0c;是电子电路在精准控制着时间的跳动&#xff1b;当你用微波炉加热早餐时&#…

LeetCode72编辑距离(动态规划)

给你两个单词 word1 和 word2&#xff0c; 请返回将 word1 转换成 word2 所使用的最少操作数 。 你可以对一个单词进行如下三种操作&#xff1a; 插入一个字符 删除一个字符 替换一个字符 示例 1&#xff1a; 输入&#xff1a;word1 “horse”, word2 “ros” 输出&#xf…

【K8S】Kubernetes 基本架构、节点类型及运行流程详解(附架构图及流程图)

Kubernetes 架构 k8s 集群 多个 master node 多个 work nodeMaster 节点&#xff08;主节点&#xff09;&#xff1a;负责集群的管理任务&#xff0c;包括调度容器、维护集群状态、监控集群、管理服务发现等。Worker 节点&#xff08;工作节点&#xff09;&#xff1a;实际运…

Windows版FFmpeg使用及B站视频下载示例python源码

Windows版FFmpeg使用及B站视频下载示例python源码 FFmpeg介绍和下载 FFmpeg 是一个功能强大、灵活且广泛使用的多媒体处理工具&#xff0c;无论是在专业领域还是日常使用中&#xff0c;都能满足各种多媒体处理需求。FFmpeg 是一个开源项目&#xff0c;遵循 LGPL 或 GPL 许可。…

飞书考勤Excel导入到自己系统

此篇主要用于记录Excel一行中&#xff0c;单条数据的日期拿取&#xff0c;并判断上下班打卡情况。代码可能满足不了大部分需求&#xff0c;目前只够本公司用&#xff0c;如果需要&#xff0c;可以参考。 需要把飞书月度汇总的考勤表导入系统中可以参考下。 下图为需要获取的年…

【leetcode hot 100 560】和为K的子数组

解法一&#xff1a;用左右指针寻找字串&#xff0c;如果和>k&#xff0c;则减少一个数&#xff08;left&#xff09;&#xff1b;如果和<k&#xff0c;则加上一个数&#xff08;right&#xff09;。 class Solution {public int subarraySum(int[] nums, int k) {int nu…

EGO-Planner的无人机视觉选择(yolov5和yolov8)

EGO-Planner的无人机视觉选择&#xff08;yolov5和yolov8&#xff09; 效果 yolov5检测效果 yolov8检测效果 一、YOLOv8 vs YOLOv5&#xff1a;关键差异解析 1. 训练效率&#xff1a;为何YOLOv8更快&#xff1f; 架构轻量化 YOLOv8采用C2f模块&#xff08;Cross Stage Partia…

性能测试分析和调优

步骤 性能调优的步骤 性能调优的步骤&#xff1a; 1.确定问题&#xff1a;根据性能测试的结果来分析确定bug。–测试人员职责 2.分析原因&#xff1a;分析问题产生的原因。----开发人员职责 3.给出解决方案&#xff1a;可以是修改软件配置、增加硬件资源配置、修改代码等----…

阿里云oss文件上传springboot若依java

一、第一步 引入依赖 <!-- 阿里云OSS --> <dependency><groupId>com.aliyun.oss</groupId><artifactId>aliyun-sdk-oss</artifactId> </dependency> 二、第二步 application.yml #阿里云oss服务配置 aliyun:oss:endpoint: …

使用create_sql_query_chain工具根据自然语言问题生成SQL查询,踩坑版

1. 开启调试模式 from langchain import debugdebug True # 启用调试模式说明&#xff1a; 这里从 langchain 库中导入了一个名为 debug 的变量&#xff08;或模块&#xff09;&#xff0c;然后将它设置为 True。这通常用来启用调试模式&#xff0c;方便开发者在程序运行时看…

无人机自主导航与避障技术!

自主导航的实现 环境感知&#xff1a;通过传感器&#xff08;如摄像头、激光雷达、超声波传感器等&#xff09;获取周围环境信息。 地图构建&#xff1a;利用SLAM&#xff08;同步定位与地图构建&#xff09;技术&#xff0c;实时生成环境地图并确定无人机的位置。 路径规划…