常见的并发模型
- 多进程/多线程
- 异步
- Actor
- Pub/Sub
Python 异步的基石:协程
协程简介
概念:协作式多任务的子程序,用户态线程或微线程(Coroutine)。
特点:子程序执行可以中断,恢复后不会丢失之前的上下文状态。
区别:与线程不同,协程是用户级的非抢占式调度,比线程更轻量(纳秒级的 CPU 时间),无资源竞争,无需加锁。
Python 使用协程 + IO 多路复用实现异步编程。
Python 协程的发展历程
- PEP 255 (Python 2.2):引入生成器,实现可迭代对象的惰性求值。
- PEP 342:引入
send
和throw
方法,允许在try/finally
中使用yield
。 - PEP 380 (Python 3.3):引入
yield from
,简化生成器/协程的实现和串联。 - PEP 3156 (Python 3.4):试验性引入异步 I/O 框架 asyncio。
- PEP 492 (Python 3.5):通过
async/await
语法明确支持协程。 - Python 3.6:
asyncio
成为标准库的一部分。
Python 第三方异步 I/O 方案
- Greenlet
- Gevent
- Eventlet
- Twisted
官方异步模块 Asyncio
认识几个概念
- Task:协程的高层抽象。
- Future:沟通协程和事件循环。
- Coroutine:协程与
await
搭配使用的语法糖。 - Event Loop:事件循环协程的执行调度。
从一个爬虫例子着手
阻塞方式写法
import socket
def blocking_run():
sock = socket.socket()
sock.connect(("example.com", 80))
request = "GET / HTTP/1.0\r\nHost: example.com\r\n\r\n"
sock.send(request.encode())
response = sock.recv(2048)
return response
for _ in range(10):
print(blocking_run())
非阻塞方式 - 事件循环 + 回调
import socket
from selectors import EVENT_READ, EVENT_WRITE, DefaultSelector
selector = DefaultSelector()
def no_blocking_run():
def on_connect(key, mask):
selector.unregister(key.fd)
request = "GET / HTTP/1.0\r\nHost: example.com\r\n\r\n"
sock.send(request.encode())
selector.register(key.fd, EVENT_READ, on_response)
return "on_connect success"
def on_response(key, mask):
response = sock.recv(2048)
selector.unregister(key.fd)
return "on_response success"
sock = socket.socket()
sock.setblocking(False)
try:
sock.connect(("example.com", 80))
except BlockingIOError:
pass
selector.register(sock.fileno(), EVENT_WRITE, on_connect)
def event_loop():
for _ in range(10):
no_blocking_run()
while True:
events = selector.select(timeout=3)
if not events:
break
for event_key, event_mask in events:
callback = event_key.data
print(callback(event_key, event_mask))
执行过程说明
- 启动执行函数,创建 socket 连接并在 selector 上注册可写事件,无阻塞操作,函数立即返回。
- 遍历 10 个不同的下载任务,注册连接事件。
- 启动事件循环,阻塞在事件监听上。
- 当某个下载任务 EVENT_WRITE 被触发,回调其 on_connect 方法。
- 进入下一轮事件循环,处理事件。
回调地狱问题
- 回调函数执行不正常时,错误处理困难。
- 嵌套回调导致代码复杂。
- 状态共享和管理困难。
callback_a(callback_b(callback_c(callback_d(...))))
对比基于协程的解决方案
Future 对象
作为数据接收代理,和系统事件沟通的桥梁
class Future:
def __init__(self):
self.result = None
self._callbacks = []
def add_done_callback(self, func):
self._callbacks.append(func)
def set_result(self, result):
self.result = result
for func in self._callbacks:
func(self)
基于协程重构
def no_blocking_v2():
sock = socket.socket()
sock.setblocking(False)
try:
sock.connect(("example.com", 80))
except BlockingIOError:
pass
f = Future()
selector.register(sock.fileno(), EVENT_WRITE, lambda: f.set_result(None))
yield f
selector.unregister(sock.fileno())
request = "GET / HTTP/1.0\r\nHost: example.com\r\n\r\n"
sock.send(request.encode())
f = Future()
selector.register(sock.fileno(), EVENT_READ, lambda: f.set_result(sock.recv(2048)))
yield f
selector.unregister(sock.fileno())
Task 对象
协程高层抽象,能管理和控制协程
class Task:
def __init__(self, coro):
self.coro = coro
f = Future()
f.set_result(None)
self.next(f)
def next(self, future):
try:
next_future = self.coro.send(future.result)
except StopIteration:
return
next_future.add_done_callback(self.next)
加入 Event Loop 驱动协程运行
def event_loop_v2():
for _ in range(10):
Task(no_blocking_v2())
while True:
events = selector.select(timeout=3)
if not events:
break
for event_key, event_mask in events:
callback = event_key.data
callback()
整个过程如图所示
协程风格与回调风格对比
- 回调风格:
- 存在链式回调,破坏同步代码结构。
- 需要在回调之间维护状态。
- 协程风格:
- 无链式调用。
- Selector 的回调只需设置 future 的值。
- 事件循环只管调度,结构更接近同步代码。
- 无需在多个协程之间维护状态。
结语
上述只是原型的简单介绍,实际 Asyncio 远比上述原型复杂,需要实现零拷贝、公平调度、异常处理、任务状态管理等。理解原理有助于在后续开发中更好地处理异步问题。
如果你对异步编程感兴趣,欢迎体验 AppBoot 项目,它基于FastAPI提供了一个类似 Django 的开发体验,完全异步,并内置 SQLAlchemy 2.0 支持。使用 AppBoot,你可以轻松快速地开发企业级的异步 Web 服务,感受异步编程的强大与乐趣。