Python 异步协程:从 async/await 到 asyncio 再到 async with

news2024/12/24 15:59:07

在 Python 3.8 以后的版本中,异步编程变得越来越重要。本文将系统介绍 Python 标准库中的异步编程工具,带领大家掌握 async/await 语法和 asyncio 的使用。

从一个简单的场景开始

假设我们在处理一些耗时的 I/O 操作,比如读取多个文件或处理多个数据。为了模拟这种场景,我们先用 time.sleep() 来代表耗时操作:

import time
import random

def process_item(item):
    # 模拟耗时操作
    print(f"处理中:{item}")
    process_time = random.uniform(0.5, 2.0)
    time.sleep(process_time)
    return f"处理完成:{item},耗时 {process_time:.2f} 秒"

def process_all_items():
    items = ["任务A", "任务B", "任务C", "任务D"]
    results = []
    for item in items:
        result = process_item(item)
        results.append(result)
    return results

if __name__ == "__main__":
    start = time.time()
    results = process_all_items()
    end = time.time()
    
    print("\n".join(results))
    print(f"总耗时:{end - start:.2f} 秒")
处理中:任务A
处理中:任务B
处理中:任务C
处理中:任务D
处理完成:任务A,耗时 1.97 秒
处理完成:任务B,耗时 1.28 秒
处理完成:任务C,耗时 0.66 秒
处理完成:任务D,耗时 1.80 秒
总耗时:5.72 秒

这段代码的问题很明显:每个任务都必须等待前一个任务完成才能开始。如果有4个任务,每个任务平均耗时1秒,那么总耗时就接近4秒。

认识 async/await

Python 引入了 async/await 语法来支持异步编程。当我们在函数定义前加上 async 关键字时,这个函数就变成了一个"协程"(coroutine)。而 await 关键字则用于等待一个协程完成。让我们改写上面的代码:

import asyncio
import random
import time

async def process_item(item):
    print(f"处理中:{item}")
    # async 定义的函数变成了协程
    process_time = random.uniform(0.5, 2.0)
    # time.sleep() 换成 asyncio.sleep()
    await asyncio.sleep(process_time)  # await 等待异步操作完成
    return f"处理完成:{item},耗时 {process_time:.2f} 秒"

async def process_all_items():
    items = ["任务A", "任务B", "任务C", "任务D"]
    # 创建任务列表
    tasks = [
        asyncio.create_task(process_item(item))
        for item in items
    ]
    print("开始处理")
    results = await asyncio.gather(*tasks)
    return results

async def main():
    start = time.time()
    results = await process_all_items()
    end = time.time()
    
    print("\n".join(results))
    print(f"总耗时:{end - start:.2f} 秒")

if __name__ == "__main__":
    asyncio.run(main())
开始处理
处理中:任务A
处理中:任务B
处理中:任务C
处理中:任务D
处理完成:任务A,耗时 1.97 秒
处理完成:任务B,耗时 0.80 秒
处理完成:任务C,耗时 0.83 秒
处理完成:任务D,耗时 1.46 秒
总耗时:1.97 秒

让我们详细解释这段代码的执行过程:

  1. 当函数被 async 关键字修饰后,调用该函数不会直接执行函数体,而是返回一个协程对象
  2. await 关键字只能在 async 函数内使用,它表示"等待这个操作完成后再继续"
  3. asyncio.create_task() 将协程包装成一个任务,该任务会被事件循环调度执行
  4. asyncio.gather() 并发运行多个任务,并等待它们全部完成
  5. asyncio.run() 创建事件循环,运行 main() 协程,直到它完成

使用 asyncio.wait_for 添加超时控制

在实际应用中,我们往往需要为异步操作设置超时时间:

import asyncio
import random
import time

async def process_item(item):
    process_time = random.uniform(0.5, 2.0)
    try:
        # 设置1秒超时
        await asyncio.wait_for(
            asyncio.sleep(process_time),
            timeout=1.0
        )
        return f"处理完成:{item},耗时 {process_time:.2f} 秒"
    except asyncio.TimeoutError:
        return f"处理超时:{item}"

async def main():
    items = ["任务A", "任务B", "任务C", "任务D"]
    tasks = [
        asyncio.create_task(process_item(item))
        for item in items
    ]
    
    start = time.time()
    results = await asyncio.gather(*tasks, return_exceptions=True)
    end = time.time()
    
    print("\n".join(results))
    print(f"总耗时:{end - start:.2f} 秒")

if __name__ == "__main__":
    asyncio.run(main())
处理超时:任务A
处理完成:任务B,耗时 0.94 秒
处理超时:任务C
处理完成:任务D,耗时 0.78 秒
总耗时:1.00 秒

使用异步上下文管理器

Python 中的 with 语句可以用于资源管理,类似地,异步编程中我们可以使用 async with 。一个类要支持异步上下文管理,需要实现 __aenter____aexit__ 方法:

import asyncio
import random

class AsyncResource:
    async def __aenter__(self):
        # 异步初始化资源
        print("正在初始化资源...")
        await asyncio.sleep(0.1)
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # 异步清理资源
        print("正在清理资源...")
        await asyncio.sleep(0.1)
    
    async def process(self, item):
        # 异步处理任务
        print(f"正在处理任务:{item}")
        process_time = random.uniform(0.5, 2.0)
        await asyncio.sleep(process_time)
        return f"处理完成:{item},耗时 {process_time:.2f} 秒"

async def main():
    items = ["任务A", "任务B", "任务C"]
    
    async with AsyncResource() as resource:
        tasks = [
            asyncio.create_task(resource.process(item))
            for item in items
        ]
        results = await asyncio.gather(*tasks)
    
    print("\n".join(results))

if __name__ == "__main__":
    asyncio.run(main())
正在初始化资源...
正在处理任务:任务A
正在处理任务:任务B
正在处理任务:任务C
正在清理资源...
处理完成:任务A,耗时 1.31 秒
处理完成:任务B,耗时 0.77 秒
处理完成:任务C,耗时 0.84 秒

使用事件循环执行阻塞操作 run_in_executor

在异步编程中,我们可能会遇到一些无法避免的阻塞操作(比如调用传统的同步API)。这时,asyncio.get_running_loop()run_in_executor 就显得特别重要:

import asyncio
import time
import requests  # 一个同步的HTTP客户端库

async def blocking_operation():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()

    # 在线程池中执行阻塞操作
    result = await loop.run_in_executor(
        None,  # 使用默认的线程池执行器
        requests.get,  # 要执行的阻塞函数
        'http://httpbin.org/delay/1'  # 函数参数
    )
    return result.status_code

async def non_blocking_operation():
    await asyncio.sleep(1)
    return "非阻塞操作完成"

async def main():
    # 同时执行阻塞和非阻塞操作
    tasks = [
        asyncio.create_task(blocking_operation()),
        asyncio.create_task(non_blocking_operation())
    ]
    
    start = time.time()
    results = await asyncio.gather(*tasks)
    end = time.time()
    
    print(f"操作结果:{results}")
    print(f"总耗时:{end - start:.2f} 秒")

if __name__ == "__main__":
    asyncio.run(main())

输出:

操作结果:[200, '非阻塞操作完成']
总耗时:1.99 秒

这个例子展示了如何在异步程序中优雅地处理同步操作。如果不使用 run_in_executor,阻塞操作会阻塞整个事件循环,导致其他任务无法执行:

  • requests.get() 是同步操作,会阻塞当前线程
  • 事件循环运行在主线程上
  • 如果直接在协程中调用 requests.get() ,整个事件循环都会被阻塞
  • 其他任务无法在这期间执行
  • run_in_executor 会将阻塞操作放到另一个线程中执行
  • 主线程的事件循环可以继续处理其他任务
  • 当线程池中的操作完成时,结果会被返回给事件循环

最佳实践是:

  • 尽量使用原生支持异步的库(如 aiohttp)
  • 如果必须使用同步库,就用 run_in_executor
  • 对于 CPU 密集型任务也可以用 run_in_executor 放到进程池中执行

任务取消:优雅地终止异步操作

有时我们需要取消正在执行的异步任务,比如用户中断操作或超时处理:

import asyncio
import random

async def long_operation(name):
    try:
        print(f"{name} 开始执行")
        while True:  # 模拟一个持续运行的操作
            await asyncio.sleep(0.5)
            print(f"{name} 正在执行...")
    except asyncio.CancelledError:
        print(f"{name} 被取消了")
        raise  # 重要:继续传播取消信号

async def main():
    # 创建三个任务
    task1 = asyncio.create_task(long_operation("任务1"))
    task2 = asyncio.create_task(long_operation("任务2"))
    task3 = asyncio.create_task(long_operation("任务3"))
    
    # 等待1秒后取消task1
    await asyncio.sleep(1)
    task1.cancel()
    
    # 等待2秒后取消其余任务
    await asyncio.sleep(1)
    task2.cancel()
    task3.cancel()
    
    try:
        # 等待所有任务完成或被取消
        await asyncio.gather(task1, task2, task3, return_exceptions=True)
    except asyncio.CancelledError:
        print("某个任务被取消了")

if __name__ == "__main__":
    asyncio.run(main())

输出:

任务1 开始执行
任务2 开始执行
任务3 开始执行
任务1 正在执行...
任务2 正在执行...
任务3 正在执行...
任务1 被取消了
任务2 正在执行...
任务3 正在执行...
任务2 正在执行...
任务3 正在执行...
任务2 被取消了
任务3 被取消了

这个例子展示了如何正确处理任务取消:

  1. 任务可以在执行过程中被取消
  2. 被取消的任务会抛出 CancelledError
  3. 我们应该适当处理取消信号,确保资源被正确清理

深入理解协程:为什么需要 async/await?

协程(Coroutine)是一种特殊的函数,它可以在执行过程中暂停,并在之后从暂停的地方继续执行。当我们使用 async 定义一个函数时,我们实际上是在定义一个协程:

import asyncio

# 这是一个普通函数
def normal_function():
    return "Hello"

# 这是一个协程
async def coroutine_function():
    await asyncio.sleep(1)
    return "Hello"

# 让我们看看它们的区别
print(normal_function)      # <function normal_function at 0x1052cc040>
print(coroutine_function)   # <function coroutine_function at 0x1054b9790>

# 调用它们的结果不同
print(normal_function())    # 直接返回: "Hello"
print(coroutine_function()) # RuntimeWarning: coroutine 'coroutine_function' was never awaited
# <coroutine object coroutine_function at 0x105962e40>

await 如何与事件循环协作

协程(Coroutine)的核心在于它可以在执行过程中主动交出控制权,让其他代码有机会执行。让我们通过一个详细的例子来理解这个过程:

import asyncio

async def task1():
    print("任务1:开始")
    print("任务1:准备休眠")
    await asyncio.sleep(2)  # 关键点1:交出控制权
    print("任务1:休眠结束")

async def task2():
    print("任务2:开始")
    print("任务2:准备休眠")
    await asyncio.sleep(1)  # 关键点2:交出控制权
    print("任务2:休眠结束")

async def main():
    # 同时执行两个任务
    await asyncio.gather(task1(), task2())

asyncio.run(main())

这段代码的输出会是:

任务1:开始
任务1:准备休眠
任务2:开始
任务2:准备休眠
任务2:休眠结束    # 1秒后
任务1:休眠结束    # 2秒后

让我们详细解释执行过程:

  1. 当程序遇到 await asyncio.sleep(2) 时:

    • 这个 sleep 操作被注册到事件循环中
    • Python 记录当前的执行位置
    • task1 主动交出控制权
    • 重要:task1 并没有停止运行,而是被暂停了,等待之后恢复
  2. 事件循环接管控制权后:

    • 寻找其他可以执行的协程(这里是 task2)
    • 开始执行 task2,直到遇到 await asyncio.sleep(1)
    • task2 也交出控制权,被暂停
  3. 事件循环继续工作:

    • 管理一个计时器,追踪这两个 sleep 操作
    • 1秒后,发现 task2 的 sleep 时间到了
    • 恢复 task2 的执行,打印"任务2:休眠结束"
    • 2秒到时,恢复 task1 的执行,打印"任务1:休眠结束"

这就像是一个指挥家(事件循环)在指挥一个管弦乐队(多个协程):

  • 当某个乐器(协程)需要休息时,它举手示意(await)
  • 指挥家看到后,立即指挥其他乐器演奏
  • 当休息时间到了,指挥家会示意这个乐器继续演奏

代码验证:

import asyncio
import time

async def report_time(name, sleep_time):
    print(f"{time.strftime('%H:%M:%S')} - {name}开始")
    await asyncio.sleep(sleep_time)
    print(f"{time.strftime('%H:%M:%S')} - {name}结束")

async def main():
    # 同时执行多个任务
    await asyncio.gather(
        report_time("任务A", 2),
        report_time("任务B", 1),
        report_time("任务C", 3)
    )

asyncio.run(main())

输出:

00:19:26 - 任务A开始
00:19:26 - 任务B开始
00:19:26 - 任务C开始
00:19:27 - 任务B结束
00:19:28 - 任务A结束
00:19:29 - 任务C结束

这种机制的优势在于:

  1. 单线程执行,没有线程切换开销
  2. 协程主动交出控制权,而不是被操作系统强制切换
  3. 比起回调地狱,代码更清晰易读
  4. 错误处理更直观,可以使用普通的 try/except

理解了这个机制,我们就能更好地使用异步编程:

  • await 的时候,其他协程有机会执行
  • 耗时操作应该是真正的异步操作(比如 asyncio.sleep
  • 不要在协程中使用阻塞操作,那样会卡住整个事件循环

小结

Python 的异步编程主要依赖以下概念:

  1. async/await 语法:定义和等待协程
  2. asyncio 模块:提供事件循环和任务调度
  3. Task 对象:表示待执行的工作单元
  4. 异步上下文管理器:管理异步资源

使用异步编程的关键点:

  1. I/O 密集型任务最适合使用异步编程
  2. 所有耗时操作都应该是真正的异步操作
  3. 注意处理超时和异常情况
  4. 合理使用 asyncio.gather() 和 asyncio.wait_for()

异步编程不是万能的,但在处理 I/O 密集型任务时确实能带来显著的性能提升。合理使用这些工具,能让我们的程序更高效、更优雅。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2264801.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Allegro17.4创建异形焊盘速通

Allegro17.4创建异形焊盘速通 打开Padstack Editor 17.4&#xff0c;新建焊盘&#xff0c;以标贴焊盘为例: 该标贴焊盘是在底面&#xff0c;选择END LAYRE &#xff0c;选择 Shape symbol Geometry&#xff0c;Shape symbol选择框右边有选择键&#xff0c;跳到Library Shape S…

多行为级联24|多行为推荐的超图增强级联图卷积网络

挂到arxiv上的&#xff0c;多行为级联超图加对比学习,超图是针对单个行为的&#xff0c;而不是针对多个行为的。参考的类似工作是CRGCN加MBSSL。这两篇我都做了论文阅读&#xff0c;&#xff08;CRGCN正在路上&#xff0c;MBSSL是我的第一篇论文阅读帖子&#xff09;推荐系统论…

利用.NET Upgrade Assitant对项目进行升级

本教程演示如何把WPF程序从 <TargetFrameworkVersion>v4.8</TargetFrameworkVersion>升级到<TargetFramework>net8.0-windows</TargetFramework>. 下载并安装.NET Upgrade Assistant - Visual Studio Marketplace Supported .NET upgrades: .NET Frame…

Vue2四、 scoped样式冲突,data是一个函数,组件通信-父传子-子传父-非父子

组件通信 1. 父组件通过 props 将数据传递给子组件 2. 子组件利用 $emit 通知父组件修改更新 父--->子 子--->父

第146场双周赛:统计符合条件长度为3的子数组数目、统计异或值为给定值的路径数目、判断网格图能否被切割成块、唯一中间众数子序列 Ⅰ

Q1、统计符合条件长度为3的子数组数目 1、题目描述 给你一个整数数组 nums &#xff0c;请你返回长度为 3 的子数组&#xff0c;满足第一个数和第三个数的和恰好为第二个数的一半。 子数组 指的是一个数组中连续 非空 的元素序列。 2、解题思路 我们需要在给定的数组 nums…

PSDK的编译与ROS包封装

本文档讲述在NIVIDIA开发板上使用大疆提供的Payload SDK获取无人机实时GPS信息的方法&#xff0c;以及基于Payload SDK发布ROS GPS话题信息的方法。 文章目录 0 实现目标1 Payload SDK1.1 PSDK 源码的编译1.2 PSDK 的使用 2 遥测数据的读取2.1 示例代码结构2.2 读取机载GPS信息…

铝电解电容使用寿命

铝电解电容寿命问题 铝电解电容为什么会失效&#xff1f;铝电解电容失效与那些因素有关&#xff1f;电解电容寿命如何计算&#xff1f; 1铝电解电容为什么会失效&#xff1f; 电容都是由两个导电板并排放到一起就构成了。正极是铝&#xff08;阳极箔&#xff09;&#xff0c;…

用Python PySide6 复刻了两软件UI 做下练习

图样 1 代码 1&#xff1a; # -*- coding: utf-8 -*-import sys from PySide6.QtCore import (QCoreApplication, QMetaObject, QRect, QDate) from PySide6.QtGui import QIcon, QPixmap, QColor from PySide6.QtWidgets import (QApplication, QDialog, QLineEdit, QPushBut…

安装MongoDB,环境配置

官网下载地址&#xff1a;MongoDB Shell Download | MongoDB 选择版本 安装 下载完成双击打开 点击mongodb-windows-x86_64-8.0.0-signed 选择安装地址 检查安装地址 安装成功 二.配置MongoDB数据库环境 1.找到安装好MongoDB的bin路径 复制bin路径 打开此电脑 -> 打开高级…

Spring学习(一)——Sping-XML

一、Spring的概述 (一)什么是Spring? Spring是针对bean对象的生命周期进行管理的轻量级容器。提供了功能强大IOC、AOP及Web MVC等功能。Spring框架主要由七部分组成&#xff1a;分别是 Spring Core、 Spring AOP、 Spring ORM、 Spring DAO、Spring Context、 Spring Web和 S…

重温设计模式--职责链模式

文章目录 职责链模式的详细介绍C 代码示例C示例代码2 职责链模式的详细介绍 定义与概念 职责链模式&#xff08;Chain of Responsibility Pattern&#xff09;是一种行为型设计模式&#xff0c;它旨在将请求的发送者和多个接收者解耦&#xff0c;让多个对象都有机会处理请求&a…

easegen将教材批量生成可控ppt课件方案设计

之前客户提出过一个需求&#xff0c;就是希望可以将一本教材&#xff0c;快速的转换为教学ppt&#xff0c;虽然通过人工程序脚本的方式&#xff0c;已经实现了该功能&#xff0c;但是因为没有做到通用&#xff0c;每次都需要修改脚本&#xff0c;无法让客户自行完成所有流程&am…

高考志愿填报:如何制定合理的志愿梯度?

高考志愿填报中常见的避雷行为&#xff0c;深入分析了专业选择、招生政策了解、学校选择、备选方案准备以及防诈骗等方面的关键问题&#xff0c;并提出了针对性的建议与策略。旨在为考生和家长提供实用的指导&#xff0c;助力考生科学合理地填报高考志愿&#xff0c;避免陷入各…

如何查看vivado项目所使用的版本

在我们提供的各类教程中vivado使用的版本都不同&#xff0c;而使用不同版本的vivado打开项目时可能会产生一些其它错误&#xff0c;所有最好使用对应的vivado版本打开&#xff0c;本例主要演示如何查看项目所示使用的vivado版本。 如下图所示&#xff0c;为vivado2023.1版本创建…

ue5 pcg(程序内容生成)真的简单方便,就5个节点

总结&#xff1a; 前情提示 鼠标单击右键平移节点 1.编辑-》插件-》procedural->勾选两个插件 2.右键-》pcg图表-》拖拽进入场景 3.先看点point 右键-》调试(快捷键d)->右侧设置粒子数 3.1调整粒子数 可以在右侧输入框&#xff0c;使用加减乘除 4.1 表面采样器 …

光谱相机在农业的应用

一、作物生长监测1、营养状况评估 原理&#xff1a;不同的营养元素在植物体内的含量变化会导致植物叶片或其他组织的光谱反射率特性发生改变。例如&#xff0c;氮元素是植物叶绿素的重要组成部分&#xff0c;植物缺氮时&#xff0c;叶绿素含量下降&#xff0c;其在可见光波段&a…

基于Springboot的数字科技风险报告管理系统

博主介绍&#xff1a;java高级开发&#xff0c;从事互联网行业六年&#xff0c;熟悉各种主流语言&#xff0c;精通java、python、php、爬虫、web开发&#xff0c;已经做了多年的设计程序开发&#xff0c;开发过上千套设计程序&#xff0c;没有什么华丽的语言&#xff0c;只有实…

14,攻防世界Web_php_unserialize

进入场景 看见代码&#xff0c;解析一下 这段PHP代码定义了一个名为Demo的类&#xff0c;并演示了如何通过URL参数进行反序列化和文件高亮显示的功能&#xff0c;同时也包含了一些安全措施以防止对象注入攻击。下面是对这段代码的逐行解释&#xff1a; 1.<php 开始PHP代码…

基于NodeMCU的物联网窗帘控制系统设计

最终效果 基于NodeMCU的物联网窗帘控制系统设计 项目介绍 该项目是“物联网实验室监测控制系统设计&#xff08;仿智能家居&#xff09;”项目中的“家电控制设计”中的“窗帘控制”子项目&#xff0c;最前者还包括“物联网设计”、“环境监测设计”、“门禁系统设计计”和“小…

【Linux开发工具】自动化构建-make/Makefile

&#x1f525;个人主页&#x1f525;&#xff1a;孤寂大仙V &#x1f308;收录专栏&#x1f308;&#xff1a;Linux &#x1f339;往期回顾&#x1f339;&#xff1a;【Linux开发工具】gcc和g &#x1f516;流水不争&#xff0c;争的是滔滔不 一、make和Makefile简介1.1 什么是…