使用这些方法让你的 Python 并发任务执行得更好

news2024/10/5 17:15:58

动动发财的小手,点个赞吧!

问题

一直以来,Python的多线程性能因为GIL而一直没有达到预期。

所以从 3.4 版本开始,Python 引入了 asyncio 包,通过并发的方式并发执行 IO-bound 任务。经过多次迭代,asyncio API 的效果非常好,并发任务的性能相比多线程版本有了很大的提升。

但是,程序员在使用asyncio时还是会犯很多错误:

一个错误如下图所示,直接使用await协程方法,将对并发任务的调用从异步变为同步,最终失去并发特性。

async def main():
    result_1 = await some_coro("name-1")
    result_2 = await some_coro("name-2")

另一个错误如下图所示,虽然程序员意识到他需要使用create_task创建一个任务在后台执行。而下面这种一个一个等待任务的方式,将不同时序的任务变成了有序的等待。

async def main():
    task_1 = asyncio.create_task(some_coro("name-1"))
    task_2 = asyncio.create_task(some_coro("name-2"))
    
    result_1 = await task_1
    result_2 = await task_2

此代码将等待 task_1 先完成,而不管 task_2 是否先完成。

什么是并发任务执行?

那么,什么是真正的并发任务呢?我们用一张图来说明:

alt

如图所示,一个并发流程应该由两部分组成:启动后台任务,将后台任务重新加入主函数,并获取结果。

大多数读者已经知道如何使用 create_task 启动后台任务。今天,我将介绍几种等待后台任务完成的方法以及每种方法的最佳实践。

开始

在开始介绍今天的主角之前,我们需要准备一个示例async方法来模拟IO绑定的方法调用,以及一个自定义的AsyncException,可以用来在测试抛出异常时友好地提示异常信息:

from random import random, randint
import asyncio


class AsyncException(Exception):
    def __init__(self, message, *args, **kwargs):
        self.message = message
        super(*args, **kwargs)

    def __str__(self):
        return self.message


async def some_coro(name):
    print(f"Coroutine {name} begin to run")
    value = random()

    delay = randint(14)
    await asyncio.sleep(delay)
    if value > 0.5:
        raise AsyncException(f"Something bad happen after delay {delay} second(s)")
    print(f"Coro {name} is Done. with delay {delay} second(s)")
    return value

并发执行方法比较

1. asyncio.gather

asyncio.gather 可用于启动一组后台任务,等待它们完成执行,并获取结果列表:

async def main():
    aws, results = [], []
    for i in range(3):
        aws.append(asyncio.create_task(some_coro(f'name-{i}')))

    results = await asyncio.gather(*aws)  # need to unpack the list
    for result in results:
        print(f">got : {result}")

asyncio.run(main())

asyncio.gather 虽然组成了一组后台任务,但不能直接接受一个列表或集合作为参数。如果需要传入包含后台任务的列表,请解包。

asyncio.gather 接受一个 return_exceptions 参数。当return_exception的值为False时,任何后台任务抛出异常,都会抛给gather方法的调用者。而 gather 方法的结果列表是空的。

async def main():
    aws, results = [], []
    for i in range(3):
        aws.append(asyncio.create_task(some_coro(f'name-{i}')))

    try:
        results = await asyncio.gather(*aws, return_exceptions=False)  # need to unpack the list
    except AsyncException as e:
        print(e)
    for result in results:
        print(f">got : {result}")

asyncio.run(main())
alt

当return_exception的值为True时,后台任务抛出的异常不会影响其他任务的执行,最终会合并到结果列表中一起返回。

results = await asyncio.gather(*aws, return_exceptions=True)
alt

接下来我们看看为什么gather方法不能直接接受一个列表,而是要对列表进行解包。因为当一个列表被填满并执行时,我们很难在等待任务完成时向列表中添加新任务。但是 gather 方法可以使用嵌套组将现有任务与新任务混合,解决了中间无法添加新任务的问题:

async def main():
    aws, results = [], []
    for i in range(3):
        aws.append(asyncio.create_task(some_coro(f'name-{i}')))
    group_1 = asyncio.gather(*aws)  # note we don't use await now
    # when some situation happen, we may add a new task
    group_2 = asyncio.gather(group_1, asyncio.create_task(some_coro("a new task")))
    results = await group_2
    for result in results:
        print(f">got : {result}")

asyncio.run(main())

但是gather不能直接设置timeout参数。如果需要为所有正在运行的任务设置超时时间,就用这个姿势,不够优雅。

async def main():
    aws, results = [], []
    for i in range(3):
        aws.append(asyncio.create_task(some_coro(f'name-{i}')))

    results = await asyncio.wait_for(asyncio.gather(*aws), timeout=2)
    for result in results:
        print(f">got : {result}")

asyncio.run(main())

2. asyncio.as_completed

有时,我们必须在完成一个后台任务后立即开始下面的动作。比如我们爬取一些数据,马上调用机器学习模型进行计算,gather方法不能满足我们的需求,但是我们可以使用as_completed方法。

在使用 asyncio.as_completed 方法之前,我们先看一下这个方法的源码。

# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
def as_completed(fs, *, timeout=None):
  # ...
  for f in todo:
      f.add_done_callback(_on_completion)
  if todo and timeout is not None:
      timeout_handle = loop.call_later(timeout, _on_timeout)
  for _ in range(len(todo)):
      yield _wait_for_one()

源码显示as_completed不是并发方法,返回一个带有yield语句的迭代器。所以我们可以直接遍历每个完成的后台任务,我们可以对每个任务单独处理异常,而不影响其他任务的执行:

async def main():
    aws = []
    for i in range(5):
        aws.append(asyncio.create_task(some_coro(f"name-{i}")))

    for done in asyncio.as_completed(aws):  # we don't need to unpack the list
        try:
            result = await done
            print(f">got : {result}")
        except AsyncException as e:
            print(e)

asyncio.run(main())

as_completed 接受超时参数,超时后当前迭代的任务会抛出asyncio.TimeoutError:

async def main():
    aws = []
    for i in range(5):
        aws.append(asyncio.create_task(some_coro(f"name-{i}")))

    for done in asyncio.as_completed(aws, timeout=2):  # we don't need to unpack the list
        try:
            result = await done
            print(f">got : {result}")
        except AsyncException as e:
            print(e)
        except asyncio.TimeoutError: # we need to handle the TimeoutError
            print("time out.")

asyncio.run(main())
alt

as_complete在处理任务执行的结果方面比gather灵活很多,但是在等待的时候很难往原来的任务列表中添加新的任务。

3. asyncio.wait

asyncio.wait 的调用方式与 as_completed 相同,但返回一个包含两个集合的元组:done 和 pending。 done 保存已完成执行的任务,而 pending 保存仍在运行的任务。

asyncio.wait 接受一个 return_when 参数,它可以取三个枚举值:

  • 当return_when为asyncio.ALL_COMPLETED时,done存放所有完成的任务,pending为空。
  • 当 return_when 为 asyncio.FIRST_COMPLETED 时,done 持有所有已完成的任务,而 pending 持有仍在运行的任务。
async def main():
    aws = set()
    for i in range(5):
        aws.add(asyncio.create_task(some_coro(f"name-{i}")))

    done, pending = await asyncio.wait(aws, return_when=asyncio.FIRST_COMPLETED)
    for task in done:
        try:
            result = await task
            print(f">got : {result}")
        except AsyncException as e:
            print(e)
    print(f"the length of pending is {len(pending)}")

asyncio.run(main())
alt
  • 当return_when为asyncio.FIRST_EXCEPTION时,done存放抛出异常并执行完毕的任务,pending存放仍在运行的任务。

当 return_when 为 asyncio.FIRST_COMPLETED 或 asyncio.FIRST_EXECEPTION 时,我们可以递归调用 asyncio.wait,这样我们就可以添加新的任务,并根据情况一直等待所有任务完成。

async def main():
    pending = set()
    for i in range(5):
        pending.add(asyncio.create_task(some_coro(f"name-{i}")))  # note the type and name of the task list

    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_EXCEPTION)
        for task in done:
            try:
                result = await task
                print(f">got : {result}")
            except AsyncException as e:
                print(e)
                pending.add(asyncio.create_task(some_coro("a new task")))
    print(f"the length of pending is {len(pending)}")

asyncio.run(main())
alt

4. asyncio.TaskGroup

在 Python 3.11 中,asyncio 引入了新的 TaskGroup API,正式让 Python 支持结构化并发。此功能允许您以更 Pythonic 的方式管理并发任务的生命周期。

总结

本文[1]介绍了 asyncio.gather、asyncio.as_completed 和 asyncio.wait API,还回顾了 Python 3.11 中引入的新 asyncio.TaskGroup 特性。

根据实际需要使用这些后台任务管理方式可以让我们的asyncio并发编程更加灵活。

Reference

[1]

Source: https://towardsdatascience.com/use-these-methods-to-make-your-python-concurrent-tasks-perform-better-b693b7a633e1

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/573878.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【LED子系统】八、小试牛刀

个人主页:董哥聊技术 我是董哥,高级嵌入式软件开发工程师,从事嵌入式Linux驱动开发和系统开发,曾就职于世界500强公司! 创作理念:专注分享高质量嵌入式文章,让大家读有所得! 文章目录…

2023电工杯数学建模B题完整模型代码【原创首发】

文末获取全部资料 摘要 近年来,随着人工智能(AI)技术的发展和广泛应用,其在教育领域的潜力和影响引起了广泛关注。本研究旨在通过一项全面的问卷调查,探讨AI学习工具在大学生学习过程中的影响。 在本项研究中&#…

认识HTTP协议---1

hello,大家好,今天为大家带来http协议的相关知识 1.HTTP协议 🐷1.应用层协议 🐷2.HTTP协议的工作过程 2.HTTP协议格式 🐷1.认识抓包工具Fidder 🐷2.学会使用fidder 🐷3.协议格式总结 3.HTTP请求 &#x1f437…

常用本地事务和分布式事务解决方案模型

目录 1 DTP模型2 2PC2.1 方案简介2.2 处理流程2.2.1 阶段1:准备阶段2.2.2 阶段2:提交阶段 2.3 方案总结 3 3PC3.1 方案简介3.2 处理流程3.2.1 阶段1:canCommit3.2.2 阶段2:preCommit3.3.3 阶段3:do Commit 3.3 方案总结…

使用本地的chatGLM

打开终端 wsl -d Ubuntu conda activate chatglm cd cd ChatGLM-6B python3 cli_demo.py 依次输入以上命令。

随机森林Proximity实现及应用

随机森林Proximity实现及应用 1 算法1.1 随机森林Proximity简介1.2 RF-GAP1.3 实现代码 2 应用2.1 离群点(outlier)检测2.1.1 原理和实现2.1.2 实验结果 附录 项目主页:randomforest C implementation of random forests classification, regression, proximity and…

可以免费使用的ChatGPT保姆级教程 (New Bing)

ChatGPT狂飙160天,世界已经不是之前的样子。https://ai.weoknow.com 每天给大家更新可用的国内可用chatGPT资源 最近,ChatGPT已经非常流行,但由于各种原因,国内用户无法直接免费使用ChatGPT的API,各种伟大的神也利用这…

沉浸式翻译 安装及使用

介绍一下最近非常或的沉浸式翻译工具,非常有助于外文阅读,包括网页、pdf等。可以同时显示原文和译文,操作简单,使用起来还是非常友好的。 先上链接:介绍 - 沉浸式翻译 如何使用 - 沉浸式翻译 1.安装 支持Edg…

仙人掌之歌——权力的游戏(2)

他是特级战斗英雄 “那个李通,会不会看起来好吓人呀?” 云冰洁有些紧张的样子,几乎要让陈速笑出来。 “哪有,一个很 nice 的人好吧。就是看起来比较严肃而已,我也从没看他笑过倒是。” 陈速让云冰洁看菜单&#xff0…

【极海APM32F4xx Tiny】学习笔记01-模板工程创建

本项目的使用的开发板 关于芯片使用的其他笔记 外部晶振时钟 模板工程创建/LED工程 项目仓库 https://gitcode.net/u010261063/apm32_test_part 创建模板工程的核心要素 复制官方的标准外设库复制启动文件复制cmsis文件复制相关的公共头文件如apm32f4xx_int.h 和 system_apm…

mybatis trim标签使用详解

mybatis trim标签使用详解 mybatis的trim标签一般用于去除sql语句中多余的and关键字,逗号,或者给sql语句前拼接 “where“、“set“以及“values(“ 等前缀,或者添加“)“等后缀,可用于选择性插入、更新、删除或者条件查询等操作。…

Dubbo框架

文章目录 1. 什么是Dubbo2. Dubbo架构3. SpringBoot整合Dubbo框架3.1 前期准备3.1.1 Zookeeper的安装 3.2 项目创建3.3 添加依赖3.4 定义服务接口3.5 服务端的实现3.6 消费端请求任务3.7 服务端配置文件3.8 消费端配置文件3.9 启动应用 4. Dubbo负载均衡5. Dubbo集群容错 1. 什…

第一部分-基础篇-第一章:PSTN与VOIP(上篇)

文章目录 序言引言:什么是VOIP和PSTN1.1 PSTN起源与发展1.1.1 最早的电话网1.1.2 人工电话交换时代1.1.3自动电话交换时代1.1.4半电子交换机时代1.1.5空分交换机时代1.1.6 数字交换机时代1.1.7现代PSTN时代1.1.8 下一代网络及VoIP时代 1.2 电话实现技术1.2.1 电话号…

【MySQL】如何速通MySQL(1)

📌前言:本篇博客介绍如何速通MySQL,主要介绍Mysql中主要的基础的入门,学习MySQL之前要先安装好MySQL,如果还没有安装的小伙伴可以看看博主前面的博客,里面有详细的安装教程。或者看一下下面这个链接~ &…

“AI孙燕姿”爆火背后,是内容合规问题的再次升级|上云那些事

“讽刺的是,人类再怎么快也无法超越它。”这是歌手孙燕姿关于自己AI分身遍布网络一事,在MAKE MUSIC网站的博客上发表的看法。 来源:孙燕姿MAKE MUSIC网站博客 当大家还在担心AIGC会不会让自己失业时,歌手孙燕姿就因为“AI孙燕姿”…

LDA算法实现鸢尾花数据集降维

目录 1. 作者介绍2. LDA降维算法2.1 基本概念2.2 算法流程 3. LDA算法实现3.1 数据集介绍3.2 代码实现3.3 结果展示 1. 作者介绍 唐杰,男,西安工程大学电子信息学院,2022级研究生 研究方向:机器视觉与人工智能 电子邮件&#xff…

深度学习笔记(八)——语义分割标注转换

核心思想:“将颜色转换成对应的标号” 形式一:Json格式的标注转换成调色板mask 形式二:RGB类型mask(24位三通道)转成调色板mask(8位单通道),调色板的格式为.png 形式三:对于二分类的…

oracle安装

服务端安装(公司中不需要,只安装客户端就行) 1、挂载一个Windows系统 双击vmx文件 启动 2、网络配置 添加一个网络 自己电脑看控制面板是否添加虚拟网卡 查看连接的网络,ip地址不能为1,为1就自己修改,…

深度剖析:C++内存池的设计与实现

深度剖析:C内存池的设计与实现 一、引言(Introduction)1.1 内存管理的重要性1.2 内存池的基本概念1.3 内存池的应用场景 二、C内存管理机制(C Memory Management Mechanism)2.1 C内存分配与释放2.2 C内存管理的问题2.3…

《Kali渗透基础》04. 主动信息收集(一)

kali渗透 1:主动信息收集2:发现3:二层发现3.1:arping3.2:nmap3.3:netdiscover3.4:Scapy 4:三层发现4.1:ping4.2:Scapy4.3:nmap4.4:fpi…