前言
本文将和大家一起探讨python的多协程并发编程(上篇),使用内置基本库asyncio来实现并发,先通过官方来简单使用这个模块。先打好基础,能够有个基本的用法与认知,后续文章,我们再进行详细使用。
本文为python并发编程的第十篇,上一篇文章地址如下:
python:并发编程(九)_Lion King的博客-CSDN博客
下一篇文章地址如下:
python:并发编程(十一)_Lion King的博客-CSDN博客
一、快速开始
官方文档:asyncio --- 异步 I/O — Python 3.11.4 文档
1、事件循环
asyncio
提供了事件循环(Event Loop)作为协程的调度器和执行者。事件循环负责处理协程的调度和执行,并处理事件、回调函数等。以下是关于 asyncio
事件循环的一些重要概念和用法:
(1)获取事件循环对象:可以使用 asyncio.get_event_loop()
获取默认的事件循环对象,或者使用 asyncio.new_event_loop()
创建一个新的事件循环对象。
(2)设置默认事件循环:可以使用 asyncio.set_event_loop()
设置默认的事件循环对象。
(3)运行事件循环:可以使用 loop.run_forever()
方法以无限循环的方式运行事件循环,直到调用 loop.stop()
。
(4)执行协程:可以使用 loop.run_until_complete()
方法执行一个协程或任务,并等待其完成。
(5)停止事件循环:可以使用 loop.stop()
停止事件循环的运行。
(6)调度协程:可以使用 loop.create_task()
方法创建一个任务,并将其添加到事件循环中运行。
(7)定时调度:可以使用 loop.call_later()
或 loop.call_at()
方法在指定的时间点调度回调函数的执行。
(8)异常处理:可以使用 try-except
块来捕获和处理在协程执行过程中抛出的异常。
以下是一个简单的示例代码,演示了如何使用 asyncio
的事件循环:
import asyncio
# 定义一个协程
async def my_coroutine():
print("Coroutine is running")
await asyncio.sleep(1) # 模拟耗时操作
print("Coroutine is done")
# 创建事件循环
loop = asyncio.get_event_loop()
# 将协程添加到事件循环中
task = loop.create_task(my_coroutine())
# 运行事件循环直到任务完成
loop.run_until_complete(task)
# 关闭事件循环
loop.close()
以上代码创建了一个简单的协程 my_coroutine()
,并将其添加到事件循环中运行。loop.run_until_complete(task)
运行事件循环直到任务完成,然后关闭事件循环。
通过使用 asyncio
的事件循环,可以方便地管理和调度协程的执行,实现异步操作和并发编程。
2、Futures
在 asyncio
中,Futures
是一种用于表示异步操作结果的对象。它充当了协程和异步函数之间的桥梁,可以用于等待和获取异步操作的结果。
Futures
提供了以下主要功能:
(1)表示异步操作的结果:Future
对象表示一个尚未完成的异步操作,可以在协程中使用它来等待操作的完成,并获取最终的结果。
(2)设置结果值:通过调用 Future
对象的 set_result()
方法,可以设置异步操作的结果值。
(3)获取结果值:可以使用 await
关键字或 yield from
表达式等待 Future
对象的完成,并获取操作的结果值。
(4)异常处理:Future
对象可以捕获和传播异步操作中抛出的异常。可以使用 try-except
块来捕获异常,并使用 set_exception()
方法将异常传播给等待的协程。
以下是一个简单的示例代码,演示了如何使用 Futures
:
import asyncio
# 定义一个异步函数
async def my_async_function():
await asyncio.sleep(1)
return "Async operation completed"
# 创建事件循环
loop = asyncio.get_event_loop()
# 创建一个 Future 对象
future = asyncio.ensure_future(my_async_function())
# 执行事件循环,等待 Future 完成
loop.run_until_complete(future)
# 获取 Future 的结果值
result = future.result()
print(result)
# 关闭事件循环
loop.close()
在上述示例中,通过调用 asyncio.ensure_future()
方法,将异步函数 my_async_function()
转换为 Future
对象。然后,使用 loop.run_until_complete()
方法等待 Future
对象的完成,并获取结果值。
通过使用 Futures
,可以方便地进行异步操作的等待和结果处理,实现并发编程和异步任务的管理。
3、传输和协议
服务端
运行以下代码将启动一个简单的服务端,监听本地的8888端口。当客户端连接并发送数据时,服务端将接收数据并发送回一条响应。请确保在运行服务器之前,没有其他应用程序在使用相同的IP地址和端口号,并且防火墙设置不会阻止客户端与服务器建立连接。
import asyncio
# 自定义协议类
class MyProtocol(asyncio.Protocol):
def connection_made(self, transport):
print('Client connected')
self.transport = transport
def data_received(self, data):
print('Received:', data.decode())
# 响应客户端请求
response = b'Hello, client!'
self.transport.write(response)
def connection_lost(self, exc):
print('Client disconnected')
# 创建事件循环
loop = asyncio.get_event_loop()
# 启动服务端
async def start_server():
# 创建服务器
server = await loop.create_server(lambda: MyProtocol(), 'localhost', 8888)
# 获取绑定的地址和端口
address = server.sockets[0].getsockname()
print('Server started at', address)
# 等待服务器关闭
await server.wait_closed()
# 运行事件循环,启动服务端
loop.run_until_complete(start_server())
客户端
在 asyncio
中,传输(Transport)和协议(Protocol)是用于实现网络通信的重要组件。
**传输(Transport)**是网络连接的抽象表示,负责发送和接收数据。它提供了发送和关闭连接的方法,以及处理连接状态的相关属性。
**协议(Protocol)**是定义网络通信规则和行为的抽象接口。它定义了在网络连接上进行数据交换的方式,包括数据的编码、解码和处理等。协议通常是从 asyncio.Protocol
类派生而来的子类。
在 asyncio
中,可以通过以下步骤来使用传输和协议进行网络通信:
(1)创建传输和协议对象:使用 loop.create_connection()
方法创建传输和协议对象。该方法接受一个协议类和主机地址、端口号等参数,并返回一个 (transport, protocol)
对象。
(2)连接到远程主机:调用传输对象的 transport.connect()
方法,将传输对象与远程主机建立连接。连接成功后,将调用协议对象的 protocol.connection_made()
方法。
(3)数据收发:使用传输对象的 transport.write()
方法发送数据到远程主机,以及通过协议对象的回调方法接收和处理远程主机发送的数据。
(4)关闭连接:调用传输对象的 transport.close()
方法关闭连接。关闭连接后,将调用协议对象的 protocol.connection_lost()
方法。
以下是一个简单的示例代码,演示了如何使用传输和协议进行网络通信:
import asyncio
# 自定义协议类
class MyProtocol(asyncio.Protocol):
def connection_made(self, transport):
print('Connected')
self.transport = transport
def data_received(self, data):
print('Received:', data.decode())
def connection_lost(self, exc):
print('Connection closed')
# 创建事件循环
loop = asyncio.get_event_loop()
# 创建传输和协议对象
coro = loop.create_connection(MyProtocol, 'localhost', 8888)
# 运行事件循环,建立连接
transport, protocol = loop.run_until_complete(coro)
# 发送数据
transport.write(b'Hello, server!')
# 关闭连接
transport.close()
# 停止事件循环
loop.stop()
在上述示例中,我们自定义了一个协议类 MyProtocol
,继承自 asyncio.Protocol
。然后,使用 loop.create_connection()
方法创建传输和协议对象,并通过 loop.run_until_complete()
方法建立连接。最后,通过传输对象的 transport.write()
方法发送数据,并调用传输对象的 transport.close()
方法关闭连接。
通过使用传输和协议,我们可以方便地实现网络通信,并处理收发数据的逻辑。
4、策略
在asyncio
中,策略对象是用于控制事件循环行为的一种机制。asyncio
提供了默认的策略对象,但也允许用户自定义和替换策略对象来满足特定的需求。
默认的策略对象是asyncio.DefaultEventLoopPolicy
,它基于具体的事件循环实现,如asyncio.SelectorEventLoop
(基于selectors
模块)或asyncio.SelectorEventLoop
(基于select
模块)。默认策略对象通过asyncio.get_event_loop_policy()
方法获取。
用户可以通过自定义策略对象来改变事件循环的行为,例如使用特定的事件循环实现或者自定义调度器。自定义策略对象必须实现asyncio.AbstractEventLoopPolicy
接口的方法,例如get_event_loop()
、set_event_loop()
、new_event_loop()
等。
下面是一个使用自定义策略对象的示例:
import asyncio
# 自定义策略对象
class MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
def get_event_loop(self):
loop = super().get_event_loop()
print('Using custom event loop:', type(loop).__name__)
return loop
# 其他自定义方法...
# 设置自定义策略对象
asyncio.set_event_loop_policy(MyEventLoopPolicy())
# 创建事件循环并运行
loop = asyncio.get_event_loop()
loop.run_forever()
在这个示例中,我们创建了一个自定义的策略对象MyEventLoopPolicy
,并重写了get_event_loop()
方法来打印使用的事件循环类型。然后通过asyncio.set_event_loop_policy()
方法将自定义策略对象设置为当前的策略。最后,使用asyncio.get_event_loop()
获取事件循环并运行。
请注意,自定义策略对象的设置应该在创建事件循环之前进行,以确保新创建的事件循环使用自定义策略。
5、Extending
asyncio
提供了扩展其功能的机制,使开发者能够根据自己的需求进行定制和扩展。以下是一些扩展asyncio
功能的常见方法:
(1)自定义协议(Protocol):可以通过继承asyncio.Protocol
类来创建自定义的协议。通过实现connection_made
、data_received
和connection_lost
等方法,可以处理连接的建立、数据的接收和连接的关闭等事件。
(2)自定义传输(Transport):可以通过继承asyncio.BaseTransport
类来创建自定义的传输。传输是协议和网络层之间的接口,负责处理数据的发送和接收。
(3)自定义事件循环(Event Loop):可以通过继承asyncio.AbstractEventLoop
类来创建自定义的事件循环。事件循环是asyncio
的核心组件,负责调度和执行异步任务。
(4)自定义异步任务(Coroutines):可以使用asyncio.coroutine
装饰器和async def
关键字来定义自己的异步任务。通过使用协程,可以编写异步的、非阻塞的代码逻辑。
(5)自定义异步函数(Async Functions):可以使用@asyncio.coroutine
装饰器和async def
关键字来定义自己的异步函数。异步函数可以与协程配合使用,用于编写更高级的异步逻辑。
(6)自定义事件处理器(Event Handlers):可以使用asyncio
提供的事件处理器机制来定制和处理特定的事件。例如,可以注册和处理定时器事件、I/O事件、信号事件等。
通过以上扩展方法,可以根据具体需求对asyncio
进行定制和扩展,以满足特定的异步编程场景和功能要求。这使得asyncio
具有很高的灵活性和可扩展性,适用于各种异步编程任务和应用领域。
下面是一个简单的示例代码,展示了如何扩展asyncio
的功能,通过自定义协议和传输来实现一个简单的Echo服务器:
import asyncio
# 自定义协议类
class EchoProtocol(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
message = data.decode()
print("Received:", message)
self.transport.write(data)
def connection_lost(self, exc):
print("Connection closed")
# 自定义传输类
class EchoTransport(asyncio.Transport):
def __init__(self, loop):
super().__init__(extra=None)
self.loop = loop
self.buffer = b''
def write(self, data):
print("Sending:", data.decode())
self.buffer += data
def close(self):
print("Closing connection")
def can_write_eof(self):
return False
def get_write_buffer_size(self):
return len(self.buffer)
def write_eof(self):
pass
def abort(self):
pass
def pause_reading(self):
pass
def resume_reading(self):
pass
# 创建事件循环
loop = asyncio.get_event_loop()
# 创建协议和传输对象
protocol = EchoProtocol()
transport = EchoTransport(loop)
# 将协议和传输对象关联起来
protocol.connection_made(transport)
# 模拟收到数据并处理
data = b"Hello, server!"
protocol.data_received(data)
# 关闭连接
protocol.connection_lost(None)
# 停止事件循环
loop.stop()
这个示例中,EchoProtocol
类继承了asyncio.Protocol
,并实现了connection_made
、data_received
和connection_lost
等方法,用于处理连接的建立、数据的接收和连接的关闭。EchoTransport
类继承了asyncio.Transport
,并实现了一系列传输相关的方法,用于处理数据的发送和关闭连接。
通过自定义协议和传输,我们可以根据需要来扩展和定制asyncio
的功能,以适应不同的应用场景和需求。