python之异步任务

news2024/9/17 7:33:53

在 Python 中,异步任务通常通过使用库如 Celery 来实现。Celery 是一个简单、灵活且可靠的分布式系统,用于处理大量消息,同时提供操作控制。

Celery 中,delayapply_async 是两种常用的方法来调度异步任务。

delay 方法

delayCelery 提供的一个快捷方法,用于简化任务的调用。它会自动将任务标记为异步执行。

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

# 使用 delay 方法调用任务
result = add.delay(4, 6)

apply_async 方法

apply_async 提供了更多的控制选项,例如可以指定任务的执行时间、重试策略等。

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

# 使用 apply_async 方法调用任务
result = add.apply_async((4, 6), countdown=10)  # 任务将在10秒后执行
参数说明
  • args:任务的参数,通常以元组形式传递。
  • kwargs:任务的关键字参数,以字典形式传递。
  • countdown:任务延迟执行的时间(以秒为单位)。
  • eta:任务的预计执行时间(datetime 对象)。
  • expires:任务的过期时间(datetime 对象或秒数)。
  • retry:是否在任务失败时自动重试。
  • retry_policy:重试策略,例如最大重试次数、重试间隔等。

示例1

使用 apply_async 方法来设置任务的各种参数:

from celery import Celery
from datetime import datetime, timedelta

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task(bind=True, max_retries=3)
def add(self, x, y):
    try:
        return x + y
    except Exception as exc:
        raise self.retry(exc=exc, countdown=5)

# 使用 apply_async 方法调用任务
eta = datetime.utcnow() + timedelta(seconds=10)
result = add.apply_async((4, 6), eta=eta, expires=60, retry=True, retry_policy={
    'max_retries': 5,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
})

任务 add 被设置为在10秒后执行,并且在60秒后过期。如果任务失败,它会自动重试最多5次,每次重试间隔0.2秒。

  • delay 方法是 apply_async 的简化版本,适用于简单的异步任务调用。
  • apply_async 方法提供了更多的控制选项,适用于需要更复杂调度和重试策略的任务。

示例2

假设你有一个自定义的任务基类 CallbackTask,你可以这样定义一个任务:

from celery import Celery, Task

app = Celery('tasks', broker='pyamqp://guest@localhost//')

class CallbackTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        print(f'Task {task_id} succeeded with result: {retval}')
    
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print(f'Task {task_id} failed with exception: {exc}')

@app.task(name='my_custom_task', base=CallbackTask, ignore_result=True)
def add(x, y):
    return x + y

# 调用任务
result = add.delay(4, 6)
  1. 自定义任务基类 CallbackTask

    • on_success 方法:当任务成功完成时调用。
    • on_failure 方法:当任务失败时调用。
  2. 任务定义

    • @app.task(name='my_custom_task', base=CallbackTask, ignore_result=True)
      • name='my_custom_task':任务的自定义名称。
      • base=CallbackTask:任务的基类是 CallbackTask
      • ignore_result=True:任务的结果将不会被存储。
  3. 调用任务

    • result = add.delay(4, 6):异步调用任务 add,传递参数 46

Celery 中,任务的参数通常以元组或字典的形式传递,并且 Celery 会自动处理参数的序列化和反序列化。因此,你通常不需要手动将参数 JSON 化。

参数传递

简单参数
from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

# 使用 delay 方法调用任务,传递参数
result = add.delay(4, 6)

在这个示例中,参数 46 被传递给任务 addCelery 会自动处理这些参数的序列化和反序列化。

复杂参数

如果你需要传递更复杂的参数,例如嵌套的字典或列表,Celery 也能处理这些情况:

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def process_data(data):
    # 假设 data 是一个字典
    return data['key1'] + data['key2']

# 使用 apply_async 方法调用任务,传递复杂参数
data = {'key1': 10, 'key2': 20}
result = process_data.apply_async((data,))

在这个示例中,data 是一个字典,Celery 会自动将其序列化并传递给任务 process_data

常见错误

Celery 中,如果尝试传递一个 Django 模型对象作为任务参数,而没有设置适当的序列化和反序列化方法,通常会遇到序列化错误。默认情况下,Celery 使用 JSON 作为序列化格式,而 JSON 不支持直接序列化 Django 模型对象。

如果直接传递一个 Django 模型对象作为任务参数,可能会遇到类似以下的错误:

kombu.exceptions.EncodeError: Object of type <YourModel> is not JSON serializable

这个错误表明 Celery 尝试将 Django 模型对象序列化为 JSON,但失败了,因为 JSON 序列化器不知道如何处理 Django 模型对象。

解决方法

  1. 传递模型对象的主键

    • 传递模型对象的主键(或其他简单类型)作为任务参数,然后在任务内部重新获取模型对象。
    from celery import Celery
    from myapp.models import MyModel
    
    app = Celery('tasks', broker='pyamqp://guest@localhost//')
    
    @app.task
    def process_model_object(model_id):
        obj = MyModel.objects.get(id=model_id)
        # 处理对象
        print(obj)
    
    # 调用任务,传递模型对象的主键
    obj = MyModel.objects.first()
    process_model_object.delay(obj.id)
    
  2. 自定义序列化和反序列化

    • 自定义任务参数的序列化和反序列化方法,将模型对象转换为可序列化的格式(如字典)。
    from celery import Celery
    from myapp.models import MyModel
    
    app = Celery('tasks', broker='pyamqp://guest@localhost//')
    
    @app.task
    def process_model_object(model_data):
        # 反序列化模型对象
        obj = MyModel(**model_data)
        # 处理对象
        print(obj)
    
    # 调用任务,传递模型对象的字典表示
    obj = MyModel.objects.first()
    model_data = {
        'id': obj.id,
        'field1': obj.field1,
        'field2': obj.field2,
        # 其他字段
    }
    process_model_object.delay(model_data)
    
  3. 使用 Pickle 序列化器

    • Celery 支持多种序列化器,包括 Pickle。Pickle 可以序列化几乎所有 Python 对象,但它有安全风险,不建议在不受信任的环境中使用。
    from celery import Celery
    from myapp.models import MyModel
    
    app = Celery('tasks', broker='pyamqp://guest@localhost//')
    app.conf.update(
        task_serializer='pickle',
        accept_content=['pickle'],  # Ignore other content
        result_serializer='pickle',
    )
    
    @app.task
    def process_model_object(obj):
        # 处理对象
        print(obj)
    
    # 调用任务,传递模型对象
    obj = MyModel.objects.first()
    process_model_object.delay(obj)
    

总结

  • 传递模型对象的主键:这是最常见和推荐的方法,因为它简单且安全。
  • 自定义序列化和反序列化:适用于需要传递复杂对象的情况。
  • 使用 Pickle 序列化器:虽然方便,但有安全风险,不建议在不受信任的环境中使用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2116448.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

博士生锻炼记录:2024.9.8

读博三年来感觉身体状况大不如前&#xff0c;虽然博士生的主要任务就是做课题和发文章&#xff0c;但是身体健康也是不容忽视的一环&#xff0c;一个好的身体是做好任何事情的基础&#xff0c;我们应该在不影响身体健康的前提下努力做课题。 这周初去参加了一个体成分测量的活…

Python编码系列—Python项目管理:掌握高效工具与实践

&#x1f31f;&#x1f31f; 欢迎来到我的技术小筑&#xff0c;一个专为技术探索者打造的交流空间。在这里&#xff0c;我们不仅分享代码的智慧&#xff0c;还探讨技术的深度与广度。无论您是资深开发者还是技术新手&#xff0c;这里都有一片属于您的天空。让我们在知识的海洋中…

YOLOv9改进策略【Neck】| 使用CARAFE轻量级通用上采样算子

一、本文介绍 本文记录的是利用CARAFE上采样对YOLOv9的颈部网络进行改进的方法研究。YOLOv9采用传统的最近邻插值的方法&#xff0c;仅考虑子像素邻域&#xff0c;无法捕获密集预测任务所需的丰富语义信息&#xff0c;从而影响模型在密集预测任务中的性能。CARAFE通过在大感受…

Linux服务器Java启动脚本

Linux服务器Java启动脚本 1、初版2、优化版本3、常用脚本仓库 本文章介绍了如何在Linux服务器上执行Java并启动jar包&#xff0c; 通常我们会使用nohup直接启动&#xff0c;但是还是需要手动停止然后再次启动&#xff0c; 那如何更优雅的在服务器上启动jar包呢&#xff0c;让我…

设计模式之工厂模式(通俗易懂--代码辅助理解【Java版】)

文章目录 1、工厂模式概述1&#xff09;特点&#xff1a;2&#xff09;主要角色&#xff1a;3&#xff09;工作流程&#xff1a;4&#xff09;优点5&#xff09;缺点6&#xff09;适用场景 2、简单工厂模式(静态工厂模式)1) 在简单工厂模式中&#xff0c;有三个主要角色&#x…

基于SpringBoot的宠物服务系统+uniapp小程序+LW参考示例

系列文章目录 1.基于SSM的洗衣房管理系统原生微信小程序LW参考示例 2.基于SpringBoot的宠物摄影网站管理系统LW参考示例 3.基于SpringBootVue的企业人事管理系统LW参考示例 4.基于SSM的高校实验室管理系统LW参考示例 5.基于SpringBoot的二手数码回收系统原生微信小程序LW参考示…

浏览器插件利器--allWebPluginV2.0.0.20-alpha版发布

allWebPlugin简介 allWebPlugin中间件是一款为用户提供安全、可靠、便捷的浏览器插件服务的中间件产品&#xff0c;致力于将浏览器插件重新应用到所有浏览器。它将现有ActiveX控件直接嵌入浏览器&#xff0c;实现插件加载、界面显示、接口调用、事件回调等。支持Chrome、Firefo…

小琳AI课堂:深入学习BERT

大家好&#xff0c;这里是小琳AI课堂。今天我们来聊聊BERT&#xff0c;这个在自然语言处理&#xff08;NLP&#xff09;领域掀起革命风潮的模型。 出现背景 在BERT之前&#xff0c;NLP领域主要依赖RNN或CNN模型&#xff0c;这些模型大多只能单向处理文本&#xff0c;从左到右…

【全网首创】大模型LLM-RAG知识库问答项目实战课

在大数据和人工智能迅猛发展的今天&#xff0c;大模型和知识库的结合成为了理论探索和实际应用的重要方向。LLM-RAG项目课程正是围绕这一热点展开&#xff0c;旨在通过系统性的教学&#xff0c;帮助学员掌握从项目部署、模块开发到实际应用的完整流程。课程共有43课时&#xff…

SprinBoot+Vue公交智能化系统的设计与实现

目录 1 项目介绍2 项目截图3 核心代码3.1 Controller3.2 Service3.3 Dao3.4 application.yml3.5 SpringbootApplication3.5 Vue 4 数据库表设计5 文档参考6 计算机毕设选题推荐7 源码获取 1 项目介绍 博主个人介绍&#xff1a;CSDN认证博客专家&#xff0c;CSDN平台Java领域优质…

高可用架构模式

架构里比较重要的是高性能、高可用、高扩展性。上次是高性能&#xff0c;这次是高可用。 对一般的项目而言&#xff0c;高可用主要用公司提供的基建&#xff0c;如多机房部署、主从等。但有些项目确实需要思考更多高可用的事项&#xff0c;如资源不足的情况下要做好限流或者降…

gdb中使用python脚本

1、入门案例 首先有1个a.cpp&#xff0c;代码如下&#xff1a; #include <map> #include <set> #include <iostream> #include <string>using namespace std;struct MyStruct {std::string mName;std::map<int, std::string> mField1;std::set…

SpringBoot3 简单集成 Mybatis plus

SpringBoot3 集成 Mybatis plus 1、引入Mybatisplus的starter <dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-spring-boot3-starter</artifactId><version>3.5.7</version></dependency>2、引入数据…

JVM3-双亲委派机制

目录 概述 作用 如何指定加载类的类加载器&#xff1f; 面试题 打破双亲委派机制 自定义类加载器 线程上下文类加载器 Osgi框架的类加载器 概述 由于Java虚拟机中有多个类加载器&#xff0c;双亲委派机制的核心是解决一个类到底由谁加载的问题 双亲委派机制&#xff…

Qt中window frame的影响

window frame 在创建图形化界面的时候&#xff0c;会创建窗口主体&#xff0c;上面会多出一条&#xff0c;周围多次一圈细边&#xff0c;这就叫window frame窗口框架&#xff0c;这是操作系统自带的。 这个对geometry的一些属性有一定影响&#xff0c;主要体现在Qt坐标系体系…

安装Seata-Service,Seata服务中心安装,并完成Nacos注册

一、下载服务器软件包 从 Releases apache/incubator-seata GitHub ,下载服务器软件包&#xff0c;将其解压缩。 版本选择&#xff1a; 1可以从官网查询版本对照。 2.可以在项目中&#xff0c;倒入seata版依赖 <!-- seata--><dependency><groupId&…

嘉立创中秋福利来啦!

单笔订单商品实付慢2万送良品铺子月饼 多品牌折扣 快来立创商城一探究竟吧~ 立创商城_一站式电子元器件采购自营商城_嘉立创电子商城 (szlcsc.com)

深度学习中常见的权重参数初始化方法

在深度学习中&#xff0c;权重参数的初始化对模型的训练过程和性能有着非常重要的影响。一个好的权重初始化方法能够帮助模型更快收敛、避免梯度爆炸或梯度消失等问题。以下是几种常见的权重初始化方法及其背后的原理。 1. 零初始化&#xff08;Zero Initialization&#xff0…

每天学习一个字符串类函数之memmove函数

目录 前言&#xff1a; 一、头文件 二、memmove函数的作用 三、理解memmove函数的定义 1、返回类型 2、参数 四、使用memmove函数 案例1&#xff1a; 案例2&#xff1a; 五、解决数据拷贝之前被覆盖的方法 六、模拟实现memmove函数 前言&#xff1a; 上一篇博客&#xff0c;我…

【C++】STL容器详解【上】

目录 一、STL基本概念 二、STL的六大组件 三、string容器常用操作 3.1 string 容器的基本概念 3.2 string 容器常用操作 3.2.1 string 构造函数 3.2.2 string基本赋值操作 3.2.3 string存取字符操作 3.2.4 string拼接字符操作 3.2.5 string查找和替换 3.2.6 string比…