在 Python 3.8 以后的版本中,异步编程变得越来越重要。本文将系统介绍 Python 标准库中的异步编程工具,带领大家掌握 async
/await
语法和 asyncio
的使用。
从一个简单的场景开始
假设我们在处理一些耗时的 I/O 操作,比如读取多个文件或处理多个数据。为了模拟这种场景,我们先用 time.sleep()
来代表耗时操作:
import time
import random
def process_item(item):
# 模拟耗时操作
print(f"处理中:{item}")
process_time = random.uniform(0.5, 2.0)
time.sleep(process_time)
return f"处理完成:{item},耗时 {process_time:.2f} 秒"
def process_all_items():
items = ["任务A", "任务B", "任务C", "任务D"]
results = []
for item in items:
result = process_item(item)
results.append(result)
return results
if __name__ == "__main__":
start = time.time()
results = process_all_items()
end = time.time()
print("\n".join(results))
print(f"总耗时:{end - start:.2f} 秒")
处理中:任务A
处理中:任务B
处理中:任务C
处理中:任务D
处理完成:任务A,耗时 1.97 秒
处理完成:任务B,耗时 1.28 秒
处理完成:任务C,耗时 0.66 秒
处理完成:任务D,耗时 1.80 秒
总耗时:5.72 秒
这段代码的问题很明显:每个任务都必须等待前一个任务完成才能开始。如果有4个任务,每个任务平均耗时1秒,那么总耗时就接近4秒。
认识 async/await
Python 引入了 async
/await
语法来支持异步编程。当我们在函数定义前加上 async
关键字时,这个函数就变成了一个"协程"(coroutine)。而 await
关键字则用于等待一个协程完成。让我们改写上面的代码:
import asyncio
import random
import time
async def process_item(item):
print(f"处理中:{item}")
# async 定义的函数变成了协程
process_time = random.uniform(0.5, 2.0)
# time.sleep() 换成 asyncio.sleep()
await asyncio.sleep(process_time) # await 等待异步操作完成
return f"处理完成:{item},耗时 {process_time:.2f} 秒"
async def process_all_items():
items = ["任务A", "任务B", "任务C", "任务D"]
# 创建任务列表
tasks = [
asyncio.create_task(process_item(item))
for item in items
]
print("开始处理")
results = await asyncio.gather(*tasks)
return results
async def main():
start = time.time()
results = await process_all_items()
end = time.time()
print("\n".join(results))
print(f"总耗时:{end - start:.2f} 秒")
if __name__ == "__main__":
asyncio.run(main())
开始处理
处理中:任务A
处理中:任务B
处理中:任务C
处理中:任务D
处理完成:任务A,耗时 1.97 秒
处理完成:任务B,耗时 0.80 秒
处理完成:任务C,耗时 0.83 秒
处理完成:任务D,耗时 1.46 秒
总耗时:1.97 秒
让我们详细解释这段代码的执行过程:
- 当函数被
async
关键字修饰后,调用该函数不会直接执行函数体,而是返回一个协程对象 - await 关键字只能在
async
函数内使用,它表示"等待这个操作完成后再继续" asyncio.create_task()
将协程包装成一个任务,该任务会被事件循环调度执行asyncio.gather()
并发运行多个任务,并等待它们全部完成asyncio.run()
创建事件循环,运行main()
协程,直到它完成
使用 asyncio.wait_for 添加超时控制
在实际应用中,我们往往需要为异步操作设置超时时间:
import asyncio
import random
import time
async def process_item(item):
process_time = random.uniform(0.5, 2.0)
try:
# 设置1秒超时
await asyncio.wait_for(
asyncio.sleep(process_time),
timeout=1.0
)
return f"处理完成:{item},耗时 {process_time:.2f} 秒"
except asyncio.TimeoutError:
return f"处理超时:{item}"
async def main():
items = ["任务A", "任务B", "任务C", "任务D"]
tasks = [
asyncio.create_task(process_item(item))
for item in items
]
start = time.time()
results = await asyncio.gather(*tasks, return_exceptions=True)
end = time.time()
print("\n".join(results))
print(f"总耗时:{end - start:.2f} 秒")
if __name__ == "__main__":
asyncio.run(main())
处理超时:任务A
处理完成:任务B,耗时 0.94 秒
处理超时:任务C
处理完成:任务D,耗时 0.78 秒
总耗时:1.00 秒
使用异步上下文管理器
Python 中的 with
语句可以用于资源管理,类似地,异步编程中我们可以使用 async with
。一个类要支持异步上下文管理,需要实现 __aenter__
和 __aexit__
方法:
import asyncio
import random
class AsyncResource:
async def __aenter__(self):
# 异步初始化资源
print("正在初始化资源...")
await asyncio.sleep(0.1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# 异步清理资源
print("正在清理资源...")
await asyncio.sleep(0.1)
async def process(self, item):
# 异步处理任务
print(f"正在处理任务:{item}")
process_time = random.uniform(0.5, 2.0)
await asyncio.sleep(process_time)
return f"处理完成:{item},耗时 {process_time:.2f} 秒"
async def main():
items = ["任务A", "任务B", "任务C"]
async with AsyncResource() as resource:
tasks = [
asyncio.create_task(resource.process(item))
for item in items
]
results = await asyncio.gather(*tasks)
print("\n".join(results))
if __name__ == "__main__":
asyncio.run(main())
正在初始化资源...
正在处理任务:任务A
正在处理任务:任务B
正在处理任务:任务C
正在清理资源...
处理完成:任务A,耗时 1.31 秒
处理完成:任务B,耗时 0.77 秒
处理完成:任务C,耗时 0.84 秒
使用事件循环执行阻塞操作 run_in_executor
在异步编程中,我们可能会遇到一些无法避免的阻塞操作(比如调用传统的同步API)。这时,asyncio.get_running_loop()
和 run_in_executor
就显得特别重要:
import asyncio
import time
import requests # 一个同步的HTTP客户端库
async def blocking_operation():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 在线程池中执行阻塞操作
result = await loop.run_in_executor(
None, # 使用默认的线程池执行器
requests.get, # 要执行的阻塞函数
'http://httpbin.org/delay/1' # 函数参数
)
return result.status_code
async def non_blocking_operation():
await asyncio.sleep(1)
return "非阻塞操作完成"
async def main():
# 同时执行阻塞和非阻塞操作
tasks = [
asyncio.create_task(blocking_operation()),
asyncio.create_task(non_blocking_operation())
]
start = time.time()
results = await asyncio.gather(*tasks)
end = time.time()
print(f"操作结果:{results}")
print(f"总耗时:{end - start:.2f} 秒")
if __name__ == "__main__":
asyncio.run(main())
输出:
操作结果:[200, '非阻塞操作完成']
总耗时:1.99 秒
这个例子展示了如何在异步程序中优雅地处理同步操作。如果不使用 run_in_executor
,阻塞操作会阻塞整个事件循环,导致其他任务无法执行:
requests.get()
是同步操作,会阻塞当前线程- 事件循环运行在主线程上
- 如果直接在协程中调用
requests.get()
,整个事件循环都会被阻塞 - 其他任务无法在这期间执行
run_in_executor
会将阻塞操作放到另一个线程中执行- 主线程的事件循环可以继续处理其他任务
- 当线程池中的操作完成时,结果会被返回给事件循环
最佳实践是:
- 尽量使用原生支持异步的库(如
aiohttp
) - 如果必须使用同步库,就用
run_in_executor
- 对于 CPU 密集型任务也可以用
run_in_executor
放到进程池中执行
任务取消:优雅地终止异步操作
有时我们需要取消正在执行的异步任务,比如用户中断操作或超时处理:
import asyncio
import random
async def long_operation(name):
try:
print(f"{name} 开始执行")
while True: # 模拟一个持续运行的操作
await asyncio.sleep(0.5)
print(f"{name} 正在执行...")
except asyncio.CancelledError:
print(f"{name} 被取消了")
raise # 重要:继续传播取消信号
async def main():
# 创建三个任务
task1 = asyncio.create_task(long_operation("任务1"))
task2 = asyncio.create_task(long_operation("任务2"))
task3 = asyncio.create_task(long_operation("任务3"))
# 等待1秒后取消task1
await asyncio.sleep(1)
task1.cancel()
# 等待2秒后取消其余任务
await asyncio.sleep(1)
task2.cancel()
task3.cancel()
try:
# 等待所有任务完成或被取消
await asyncio.gather(task1, task2, task3, return_exceptions=True)
except asyncio.CancelledError:
print("某个任务被取消了")
if __name__ == "__main__":
asyncio.run(main())
输出:
任务1 开始执行
任务2 开始执行
任务3 开始执行
任务1 正在执行...
任务2 正在执行...
任务3 正在执行...
任务1 被取消了
任务2 正在执行...
任务3 正在执行...
任务2 正在执行...
任务3 正在执行...
任务2 被取消了
任务3 被取消了
这个例子展示了如何正确处理任务取消:
- 任务可以在执行过程中被取消
- 被取消的任务会抛出
CancelledError
- 我们应该适当处理取消信号,确保资源被正确清理
深入理解协程:为什么需要 async/await?
协程(Coroutine)是一种特殊的函数,它可以在执行过程中暂停,并在之后从暂停的地方继续执行。当我们使用 async
定义一个函数时,我们实际上是在定义一个协程:
import asyncio
# 这是一个普通函数
def normal_function():
return "Hello"
# 这是一个协程
async def coroutine_function():
await asyncio.sleep(1)
return "Hello"
# 让我们看看它们的区别
print(normal_function) # <function normal_function at 0x1052cc040>
print(coroutine_function) # <function coroutine_function at 0x1054b9790>
# 调用它们的结果不同
print(normal_function()) # 直接返回: "Hello"
print(coroutine_function()) # RuntimeWarning: coroutine 'coroutine_function' was never awaited
# <coroutine object coroutine_function at 0x105962e40>
await 如何与事件循环协作
协程(Coroutine)的核心在于它可以在执行过程中主动交出控制权,让其他代码有机会执行。让我们通过一个详细的例子来理解这个过程:
import asyncio
async def task1():
print("任务1:开始")
print("任务1:准备休眠")
await asyncio.sleep(2) # 关键点1:交出控制权
print("任务1:休眠结束")
async def task2():
print("任务2:开始")
print("任务2:准备休眠")
await asyncio.sleep(1) # 关键点2:交出控制权
print("任务2:休眠结束")
async def main():
# 同时执行两个任务
await asyncio.gather(task1(), task2())
asyncio.run(main())
这段代码的输出会是:
任务1:开始
任务1:准备休眠
任务2:开始
任务2:准备休眠
任务2:休眠结束 # 1秒后
任务1:休眠结束 # 2秒后
让我们详细解释执行过程:
-
当程序遇到
await asyncio.sleep(2)
时:- 这个
sleep
操作被注册到事件循环中 - Python 记录当前的执行位置
- task1 主动交出控制权
- 重要:task1 并没有停止运行,而是被暂停了,等待之后恢复
- 这个
-
事件循环接管控制权后:
- 寻找其他可以执行的协程(这里是 task2)
- 开始执行 task2,直到遇到
await asyncio.sleep(1)
- task2 也交出控制权,被暂停
-
事件循环继续工作:
- 管理一个计时器,追踪这两个 sleep 操作
- 1秒后,发现 task2 的 sleep 时间到了
- 恢复 task2 的执行,打印"任务2:休眠结束"
- 2秒到时,恢复 task1 的执行,打印"任务1:休眠结束"
这就像是一个指挥家(事件循环)在指挥一个管弦乐队(多个协程):
- 当某个乐器(协程)需要休息时,它举手示意(await)
- 指挥家看到后,立即指挥其他乐器演奏
- 当休息时间到了,指挥家会示意这个乐器继续演奏
代码验证:
import asyncio
import time
async def report_time(name, sleep_time):
print(f"{time.strftime('%H:%M:%S')} - {name}开始")
await asyncio.sleep(sleep_time)
print(f"{time.strftime('%H:%M:%S')} - {name}结束")
async def main():
# 同时执行多个任务
await asyncio.gather(
report_time("任务A", 2),
report_time("任务B", 1),
report_time("任务C", 3)
)
asyncio.run(main())
输出:
00:19:26 - 任务A开始
00:19:26 - 任务B开始
00:19:26 - 任务C开始
00:19:27 - 任务B结束
00:19:28 - 任务A结束
00:19:29 - 任务C结束
这种机制的优势在于:
- 单线程执行,没有线程切换开销
- 协程主动交出控制权,而不是被操作系统强制切换
- 比起回调地狱,代码更清晰易读
- 错误处理更直观,可以使用普通的
try
/except
理解了这个机制,我们就能更好地使用异步编程:
- 在
await
的时候,其他协程有机会执行 - 耗时操作应该是真正的异步操作(比如
asyncio.sleep
) - 不要在协程中使用阻塞操作,那样会卡住整个事件循环
小结
Python 的异步编程主要依赖以下概念:
async
/await
语法:定义和等待协程asyncio
模块:提供事件循环和任务调度Task
对象:表示待执行的工作单元- 异步上下文管理器:管理异步资源
使用异步编程的关键点:
- I/O 密集型任务最适合使用异步编程
- 所有耗时操作都应该是真正的异步操作
- 注意处理超时和异常情况
- 合理使用
asyncio.gather()
和 asyncio.wait_for()
异步编程不是万能的,但在处理 I/O 密集型任务时确实能带来显著的性能提升。合理使用这些工具,能让我们的程序更高效、更优雅。