Python 协程在基础学习阶段,属于有难度的知识点,建议大家在学习的时候,一定要反复练习。
Python 中的协程是一种用户态的轻量级线程。它与普通的线程不同,普通线程是由操作系统调度的,而协程是由程序自己调度的。因此,协程可以更高效地使用系统资源,更快地响应请求。
文章目录
- Python 协程的简单实现
- Python 协程高级用法
- 定义协程返回值
- 取消协程
- 协程中的异常处理
- 使用 asyncio.Queue 类来在协程之间传递消息
- 使用 asyncio.Lock 类来保证协程之间的互斥访问
- 使用 asyncio.Semaphore 类来限制协程的并发数量
- Python 协程实现网络异步请求
- Python 协程读写文件
- Python 协程扩展
Python 协程的简单实现
在 Python 中,协程可以使用 async/await 关键字来实现。使用 async
关键字声明一个协程,在协程中使用 await
关键字来等待其他协程的完成。
例如,下面的代码定义了一个协程,在这个协程中会等待另一个协程的完成:
import asyncio
async def first_coroutine():
print('协程 1')
async def second_coroutine():
print('协程 2')
await first_coroutine()
print('协程 2 在执行协程 1 之后的输出')
async def main():
await second_coroutine()
asyncio.run(main())
上述代码如果你在 Python3.7 之前的版本中运行,会出现错误 module 'asyncio' has no attribute 'run'
,如果你不想进行版本升级,可以使用 get_event_loop()
函数获取事件循环,然后通过 run_until_complete()
函数来运行协程,例如:
import asyncio
async def first_coroutine():
print('协程 1')
async def second_coroutine():
print('协程 2')
await first_coroutine()
print('协程 2 在执行协程 1 之后的输出')
async def main():
await second_coroutine()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
运行代码,得到下述内容输出。
另一个方法是使用 asyncio.get_event_loop().run_until_complete(main())
,示例代码如下。
import asyncio
async def first_coroutine():
print('协程 1')
async def second_coroutine():
print('协程 2')
await first_coroutine()
print('协程 2 在执行协程 1 之后的输出')
async def main():
await second_coroutine()
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())
# loop.close()
asyncio.get_event_loop().run_until_complete(main())
最后要注意,使用协程后, 不要忘记关闭事件循环,使用 loop.close()
来关闭事件循环。
Python 协程高级用法
定义协程返回值
协程可以使用 return 语句来返回值,但是在调用协程时需要使用 asyncio.create_task()
或 asyncio.ensure_future()
函数来创建任务,然后使用 await 关键字来等待结果。
下面是一个定义协程返回值的示例:
import asyncio
async def my_coro():
return "协程内部返回值,梦想橡皮擦"
async def main():
result = await my_coro()
print(result)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
在这个示例中,my_coro()
协程使用 return 语句来返回一个字符串。在 main()
协程中使用 await
关键字来等待 my_coro()
协程的完成,并将结果保存到 result
变量中。最后, 使用 print(result)
打印出结果。
在
main()
函数中使用asyncio.create_task()
或asyncio.ensure_future()
来创建任务, 以等待协程的结果, 也是可行的。
示例如下,代码仅支持 Python3.7 以上版本。
import asyncio
async def my_coro():
return "协程内部返回值,梦想橡皮擦"
async def main():
task = asyncio.create_task(my_coro())
result = await task
print(result)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
如果你使用的是 Python3.7 及以下版本,需要使用 asyncio.ensure_future()
来创建协程任务。
import asyncio
async def my_coro():
return "协程内部返回值,梦想橡皮擦"
async def main():
task = asyncio.ensure_future(my_coro())
result = await task
print(result)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
取消协程
在 Python 中,可以使用 asyncio.CancelledError
异常来取消协程。
首先,使用 asyncio.create_task()
或 asyncio.ensure_future()
创建协程任务。然后,使用 task.cancel()
方法来取消协程任务。
以下代码演示了如何在 10 秒后取消协程任务:
import asyncio
async def my_coro():
try:
while True:
print("梦想橡皮擦,你好")
await asyncio.sleep(1)
except asyncio.CancelledError:
print("取消协程任务")
async def main():
task = asyncio.ensure_future(my_coro())
await asyncio.sleep(10)
task.cancel()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
在这个示例中,my_coro()
协程在一个无限循环中打印 "梦想橡皮擦,你好"
,每次等待 1 秒。在 main()
协程中使用 asyncio.ensure_future()
创建该协程的任务,并在 10 秒后使用 task.cancel()
取消协程任务。
这里需要注意,如果协程没有捕获 CancelledError
异常, 它不会知道自己被取消了。还可以使用 loop.stop()
来停止事件循环,并取消所有正在运行的协程,在下一次调用 run_forever()
时,它会自动取消所有协程。
协程中的异常处理
在协程中可以使用 try/except
语句来捕获和处理异常。
import asyncio
async def my_coro():
try:
print("启动协程")
await asyncio.sleep(1) # 等待 1 秒
1 / 0 # 编写一段错误代码
print("协程结束")
except ZeroDivisionError as e:
print(f"异常: {e}")
finally:
print("橡皮擦的代码")
async def main():
task = asyncio.ensure_future(my_coro())
await task
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
在这个例子中,我们定义了一个名为 my_coro()
的协程。在协程中,使用 try/except 块来捕获 ZeroDivisionError 异常。当这个异常发生时,将打印 "异常:: division by zero"
。
在 try/except 块后面还有一个 finally 块。这个块中的代码总是会被执行,无论是否发生异常。在本例中,将打印 "橡皮擦的代码"
在 main()
函数中, 我们依旧使用 asyncio.ensure_future()
创建 my_coro()
的任务,并使用 await task
等待它完成。
使用 asyncio.Queue 类来在协程之间传递消息
asyncio.Queue 类是 Python 中异步编程中常用的队列类,可用于在协程之间传递消息。这个类可以在协程之间传递消息,可以使用 put()
函数来添加消息,使用 get()
函数来获取消息。
import asyncio
async def producer(queue: asyncio.Queue):
for i in range(5):
await queue.put(i)
print(f"生产者 {i}")
await asyncio.sleep(1)
async def consumer(queue: asyncio.Queue):
while True:
value = await queue.get()
print(f"消费者 {value}")
if value == 4:
break
async def main():
queue = asyncio.Queue()
producer_task = asyncio.ensure_future(producer(queue))
consumer_task = asyncio.ensure_future(consumer(queue))
await producer_task
await consumer_task
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
本案例中,我们定义了两个协程:producer
和 consumer
。 producer 协程每秒生产一个整数,并使用 asyncio.Queue.put()
方法将其放入队列。 consumer 协程循环地从队列中获取消息,使用 asyncio.Queue.get()
方法取出队列中的值,并打印出来。如果消费到 4 就退出循环。
在 main() 函数中,我们创建了一个 asyncio.Queue 的实例,并分别创建了 producer 和 consumer 协程的任务。然后使用 await producer_task
和 await consumer_task
等待两个任务完成。
使用 asyncio.Lock 类来保证协程之间的互斥访问
asyncio.Lock 类是 Python 中 asyncio 库中用来保证协程之间互斥访问的类,这个类可以保证在同一时刻只有一个协程在访问共享资源。
下面为搭建演示一下如何使用该类。
import asyncio
async def my_coro(lock: asyncio.Lock, i):
async with lock:
print(f"协程 {i} 已经获得锁")
await asyncio.sleep(1)
print(f"协程 {i} 已经释放锁")
async def main():
lock = asyncio.Lock()
task1 = asyncio.ensure_future(my_coro(lock, 1))
task2 = asyncio.ensure_future(my_coro(lock, 2))
await task1
await task2
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
本案例中 ,我们定义了一个名为 my_coro()
的协程。它接受一个 asyncio.Lock
实例和一个整数作为参数。在协程中,我们使用 async with lock
: 块来请求获取锁。如果锁当前未被占用,则将获取锁,并执行块中的代码。如果锁已被占用,则协程将被阻塞,直到锁被释放。在这里,当一个协程获取了锁之后,它会打印 “协程 {i} 已经获得锁”,然后睡眠 1s, 打印 “协程 {i} 已经释放锁”。
因为 Lock 是互斥锁,所以两个协程获取锁的时间是互不干扰的,并且不会发生竞争情况。
使用 asyncio.Semaphore 类来限制协程的并发数量
asyncio.Semaphore 类是 Python 中 asyncio 库中用来限制协程并发数量的类。
import asyncio
async def my_coro(semaphore: asyncio.Semaphore, i):
async with semaphore:
print(f"协程 {i} 正在运行")
await asyncio.sleep(1)
print(f"协程 {i} 结束")
async def main():
semaphore = asyncio.Semaphore(3)
tasks = []
for i in range(10):
tasks.append(asyncio.ensure_future(my_coro(semaphore, i)))
await asyncio.gather(*tasks)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
上述代码运行效果如图。
在本案例中,我们定义了一个名为 my_coro() 的协程。它接受一个 asyncio.Semaphore 实例和一个整数作为参数。在协程中,我们使用 async with semaphore:
块来请求获取信号量。如果信号量的计数值大于 0,则将获取信号量,计数值减 1,并执行块中的代码。如果计数值等于 0,则协程将被阻塞,直到信号量被释放。
在 main() 函数中,我们创建了一个 asyncio.Semaphore 的实例,并创建了 10 个 my_coro()
协程的任务。因为我们限制信号量只有 3 个许可,所以同时只有 3 个协程可以运行。最后通过 await asyncio.gather(*tasks)
来等待任务结束。
asyncio.gather(*tasks)
是 asyncio 库中用于并行运行多个协程的函数。它接受多个协程的任务作为参数,并返回一个新的协程,这个新协程会在所有传入的协程任务都完成后结束。
在前文代码中, 我们创建了 10 个 my_coro()
协程的任务,并使用 asyncio.gather(*tasks)
来等待这些任务的完成。这样做的效果相当于等待所有任务结束并返回结果,而不是串行等待每个任务结束。
Python 协程实现网络异步请求
使用 async/await 关键字来实现异步网络请求。
import aiohttp
import asyncio
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
html = await fetch(session, 'http://bing.com')
print(html)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
上述代码用到 aiohttp
库,需要提前进行安装。
这个程序使用 aiohttp
库来发送网络请求,fetch()
函数是一个协程,它使用 await
关键字来等待响应。main() 函数也是一个协程,它使用 await 关键字来等待 fetch()
函数的完成。
在此基础上,我们扩展到多个网络请求,代码如下。
import aiohttp
import asyncio
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
html1, html2 = await asyncio.gather(
fetch(session, 'https://bing.com'),
fetch(session, 'https://baidu.com')
)
print(html1)
print(html2)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
这个程序使用 asyncio.gather()
函数来并发地运行两个 fetch() 函数。
Python 协程读写文件
示例代码如下,请提前在 Python 文件目录准备 ca.txt
文件。
import asyncio
async def read_file(file_name):
with open(file_name, 'r',encoding='utf-8') as f:
return f.read()
async def write_file(file_name, data):
with open(file_name, 'w',encoding='utf-8') as f:
f.write(data)
async def main():
data = await read_file('ca.txt')
await write_file('ca_copy.txt', data)
asyncio.get_event_loop().run_until_complete(main())
该程序使用read_file()
和 write_file()
两个协程来读取和写入文件。在 main() 函数中使用 await 关键字来等待这两个协程的完成。
使用 asyncio.get_event_loop().run_until_complete(main())
来运行协程时,是不需要手动关闭协程的,该方法会在协程完成后自动关闭事件循环。这个方法会阻塞程序直到协程完成,所以在协程运行结束后就可以直接结束程序了。
与之对应的高版本 Python 使用的 asyncio.run(main())
,这个函数会自动创建一个事件循环并在协程完成后关闭它,也不需要再调用 close()方法关闭事件循环。
Python 协程扩展
协程是一种非常强大的编程模型,它可以帮助我们更好地处理并发和异步编程。除了我之前提到的内容之外,还有一些其他的知识点可以补充:
- 使用
asyncio.shield()
函数来保护协程不被取消。 - 使用
asyncio.as_completed()
函数来迭代完成的协程的结果。 - 使用
asyncio.wait()
函数来等待多个协程完成,并返回已完成的协程的结果。
asyncio 库中还有很多其他类和函数可以帮助我们处理多种不同类型的并发和异步编程场景,如:asyncio.Condition
、asyncio.Event
、asyncio.Barrier
、 asyncio.TimeoutError
等。
协程与线程的区别
协程是一种用户态的轻量级线程,它可以在一个线程中被调度和执行,而线程是操作系统级别的线程,通常需要更多的系统资源来进行调度和执行。由于协程可以在一个线程中被调度和执行,所以它可以充分利用线程的资源,并可以在不同的协程之间切换,实现高效的并发。而线程则需要在不同的线程之间切换,并且由于需要更多的系统资源进行调度和执行,所以并发性能通常不如协程。
📢📢📢📢📢📢
💗 你正在阅读 【梦想橡皮擦】 的博客
👍 阅读完毕,可以点点小手赞一下
🌻 发现错误,直接评论区中指正吧
📆 橡皮擦的第 831 篇原创博客
从订购之日起,案例 5 年内保证更新
- ⭐️ Python 爬虫 120,点击订购 ⭐️
- ⭐️ 爬虫 100 例教程,点击订购 ⭐️