Python----Python高级(并发编程:协程Coroutines,事件循环,Task对象,协程间通信,协程同步,将协程分布到线程池/进程池中)

news2025/3/18 15:26:30

一、协程

1.1、协程

协程,Coroutines,也叫作纤程(Fiber)

协程,全称是“协同程序”,用来实现任务协作。是一种在线程中,比线程更加轻量级的存在,由程序员自己写程序来管理。

当出现IO阻塞时,CPU一直等待IO返回,处于空转状态。这时候用协程,可以执行其他任务。当IO返回结果后,再回来处理数据。充分利用了IO等待的时间,提高了效率。

进程与线程是操作系统管理和调度的基本单位,而协程则是由程序员 实现的一种轻量级的、用户空间级别的多任务机制,通常不由操作系统直接提供支持。

1.2、协程的核心(控制流的让出和恢复)

1.每个协程有自己的执行栈,可以保存自己的执行现场

2.可以由用户程序按需创建协程(比如:遇到io操作)

3.协程“主动让出(yield)”执行权时候,会保存执行现场(保存中断时的寄存器上下文和栈),然后切换到其他协程

4.协程恢复执行(resume)时,根据之前保存的执行现场恢复到中断前的状态,继续执行,这样就通过协程实现了轻量的由用户态调度的多任务模型

1.3、协程的特点

1. 占用资源少:协程通常只需要少量的栈空间,这是因为它们采用协作式的多任务 处理机制,可以在固定的栈空间内通过状态保存和恢复来实现任务的切换,相比 多线程和多进程,协程占用的系统资源更少。

2. 切换开销小:协程的切换是在用户态进行的,不需要进行系统调用,也不涉及内 核态的上下文切换,因此其切换开销非常小,远远低于线程间的上下文切换。

3. 可暂停和可恢复的函数:协程允许函数在执行过程中主动暂停(通常是遇到I/O操 作或其他耗时操作时),并将控制权交还给调度器,以便其他协程可以运行。在 I/O操作或其他耗时操作完成后,该协程可以从暂停的地方继续执行,而不会阻塞 整个线程。这种特性使得协程非常适合于处理I/O密集型任务,可以在等待I/O操 作完成时释放CPU,从而提高程序的并发性能和资源利用率。

1.4、协程的优点

1.由于自身带有上下文和栈,无需线程上下文切换的开销,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级;

2.无需原子操作的锁定及同步的开销;

3.方便切换控制流,简化编程模型

4.单线程内就可以实现并发的效果,最大限度地利用cpu,且可扩展性高,成本低(注:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理)

asyncio协程是写爬虫比较好的方式。比多线程和多进程都好. 开辟新的线程和进程是非常耗时的。

1.5、协程的缺点

1.无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上。

1.6、与进程和线程的比较

多进程是在操作系统层面实现的并行执行方式,每个进程拥有独立的内存空间和 系统资源,进程间通过进程间通信(IPC)机制(如管道、消息队列、共享内存 等)进行交互,这增加了通信的复杂性。多进程可以充分利用多核处理器的性 能,实现真正的并行计算。由于进程间的隔离性,系统的安全性和稳定性也得到 了提高。然而,进程间通信和同步的开销相对较高,且每个进程的创建和销毁通 常伴随着较大的资源开销。

多线程是在一个进程内部实现的并发执行方式,多个线程共享该进程的内存空间 和资源,这使得线程间通信和数据共享相对容易。但是,这也引入了线程安全问题,需要通过同步机制(如互斥锁、信号量、条件变量等)来避免数据冲突。多 线程的优点在于能够实现并发执行,但线程间的上下文切换开销相比进程较小, 但相比协程则较大,且需要谨慎处理线程安全问题。 

协程是一种轻量级的线程,与多进程和多线程相比,它具有占用资源少、切换开 销小、可以实现高效异步执行等优点。协程通过非阻塞I/O操作来等待数据,当数 据就绪时自动恢复执行,从而提高了程序的执行效率和响应速度。然而,协程也 有其局限性,它只能在单个线程内执行,因此它对于CPU密集型任务来说并没有 什么好处。 

二、实现协程的方法 

在python中,实现协程的方法有以下几种:

        1. 使用async/await关键字:python3.5及以后出现,到目前为止这是目前最主流的 实现协程的方法。

        2. 使用yield关键字:使用yield关键字及其send方法可以实现协程的效果。

        3. 使用asyncio.coroutine:在python3.4发布之后可以使用该装饰器与yield from 配合来实现协程,不过在python3.8弃用。

        4. 使用第三方库:通过其他的第三方库也可以实现协程,如greenlet。

def consumer():
    print("消费者准备接收数据。")
    while True:
        # 接收生产者发送的数据
        data = (yield)
        print("消费者接收到了数据:", data)
def producer(consumer_generator):
    # 启动生成器,使它准备好接收数据
    next(consumer_generator)
    for i in range(5):
        print("生产者发送数据:", i)
        # 发送数据给消费者
        consumer_generator.send(i)
    # 终止生成器
    consumer_generator.close()
if __name__ == '__main__':

    # 创建消费者生成器
    consumer_coroutine = consumer()
    # 创建生产者
    producer(consumer_coroutine)
'''
消费者准备接收数据。
生产者发送数据: 0
消费者接收到了数据: 0
生产者发送数据: 1
消费者接收到了数据: 1
生产者发送数据: 2
消费者接收到了数据: 2
生产者发送数据: 3
消费者接收到了数据: 3
生产者发送数据: 4
消费者接收到了数据: 4
'''

2.1、async

async关键字是Python异步编程的核心组成部分,用于定义协程函数。协程函数与 普通函数不同,它们在调用时不会执行函数里面的代码,而是会返回一个协程对象。

# 定义一个协程函数
async def func():
    print('123')
    # 直接调用协程函数会发出警告,并且函数内部的功能也不会执行
func()

想要运行函数体里面的代码,需要进行两个方面的准备:

        1. 获取事件循环。

        2. 将协程对象封装为Task对象并提交到事件循环中。 

2.2、await

        在Python中, await关键字用于挂起(暂停)异步函数的执行,直到被等待的协程 (coroutine)完成。这是异步编程中的一个关键概念,因为它允许程序在等待结果 的同时执行其他任务。

2.2.1、await的基本用法

1. 只能在异步函数内部使用: await关键字只能在一个使用了 异步函数内部使用。它不能在普通的同步函数中使用。

2. 等待协程: async def定义的 await后面通常跟一个协程对象(一个异步函数的调用)。当执行到 await时,当前协程会暂停执行,等待右侧的协程完成。

2.2.2、await的工作原理

1. 挂起与恢复: 当执行到 await时,当前协程会挂起,并让出控制权给事件循环 (event loop)。事件循环可以在这段时间内运行其他协程或处理其他事件。一 旦await后面的协程完成,事件循环会恢复执行原来的协程, 果就是协程的返回值。

2. 非阻塞: 尽管 await表达式的结 await看起来像是同步代码中的阻塞操作,但实际上它是非阻塞 的。这是因为事件循环负责协程之间的切换,从而实现并发。

import asyncio
import time
# 定义一个异步函数say_after,它接受延迟时间和要打印的消息作为参数
async def say_after(delay, what):
# 使用await关键字挂起当前协程,直到指定的延迟时间结束后再继续执行
    await asyncio.sleep(delay)
    # 打印消息
    print(what)
# 定义主异步函数main
async def main():
    # 记录开始时间
    print(f"started at {time.strftime('%X')}")
    # 调用say_after函数,等待1秒后打印'hello'
    await say_after(1, 'hello')
    # 调用say_after函数,等待2秒后打印'world'
    # 注意:这里的执行不是并行的,而是顺序的,因为两个await语句是顺序执行的
    await say_after(2, 'world')
    # 记录结束时间
    print(f"finished at {time.strftime('%X')}")
# 调用asyncio.run()来启动主协程
# 这将创建一个新的事件循环并运行main()直到完成
asyncio.run(main())
'''
started at xx:xx:xx
hello
world
finished at xx:xx:xx
'''

三、事件循环

        事件循环是一种处理程序执行、事件和消息分发的机制。它不断地等待事件的发生, 当事件发生时,事件循环会将其分发给相应的处理程序进行处理。事件循环的核心是 一个循环,它会不断地检查是否有事件需要处理,如果有,就调用相应的回调函数来 处理这些事件。

其工作流程为:

1. 启动:创建并启动事件循环。

2. 注册事件:将各种事件(如网络套接字、文件描述符、定时器等)注册到事件循 环中。

3. 事件循环:进入一个循环,等待事件的发生,并处理这些事件。

4. 执行任务:当事件发生时,事件循环会调用相关的处理函数或恢复相应的协程。 

5. 关闭:当所有任务完成后,关闭事件循环。

事件循环的创建随着Python版本的不同而不同,在Python3.7版本之前,事件循环需 要先使用 asyncio.get_event_loop()来获取循环,然后使用 run_until_complete()来执行任务。在Python3.7及以后的版本,直接使用 asyncio.run()来直接执行任务。

import asyncio
# 定义一个异步函数func1
async def func1():
    print('start func1')  # 打印信息,表示func1开始执行
    await asyncio.sleep(2)  # 模拟异步I/O操作,协程在这里挂起2秒钟
    print('end func1')  # 打印信息,表示func1执行结束
# 定义另一个异步函数func2
async def func2():
    print('start func2')  # 打印信息,表示func2开始执行
    await asyncio.sleep(2)  # 模拟异步I/O操作,协程在这里挂起2秒钟
    print('end func2')  # 打印信息,表示func2执行结束
asyncio.run(func1())
asyncio.run(func2())
'''
start func1
end func1
start func2
end func2
'''

四、Task对象

        Task对象是 asyncio库中的一个实现,它用来在事件循环中安排协程的执行。一个 Task是对协程的一个封装,简单来说,协程本身并不会自动运行,当一个协程被封 装为一个Task对象并提交到事件循环中时,它才会在事件循环中被安排执行。当协程 执行完毕后, Task会提供协程的返回值或异常,并且相比协程对象, 更加丰富的方法供我们使用。

        将协程对象封装为 Task对象拥有 Task是通过asyncio库中的函数进行的,但随着Python版本的不 同,其所用函数也不同。

        在python3.7之前,Task的创建使用的是 asyncio.ensure_future()函数,通过该 函数将使用 async定义的协程函数所返回的协程对象提交到事件循环中。在 python3.7之后,创建Task对象的方法变得更加直接和明确,可以使用 asyncio.create_task()函数来创建,且python3.8版本之后,添加了name参数可 以为任务指定名称。这个函数接受一个协程对象作为参数,并返回一个新的Task对 象。

import asyncio
# 定义一个异步函数func1
async def func1():
    print('start func1')  # 打印信息,表示func1开始执行
    await asyncio.sleep(2)  # 模拟异步I/O操作,协程在这里挂起2秒钟
    print('end func1')  # 打印信息,表示func1执行结束
# 定义另一个异步函数func2
async def func2():
    print('start func2')  # 打印信息,表示func2开始执行
    await asyncio.sleep(2)  # 模拟异步I/O操作,协程在这里挂起2秒钟
    print('end func2')  # 打印信息,表示func2执行结束
# 定义主异步函数main,它将作为程序的入口点
async def main():
    # 创建任务列表,使用asyncio.create_task来创建任务
    tasks = [
        asyncio.create_task(func1()),  # 创建并调度func1作为异步任务
        asyncio.create_task(func2())# 创建并调度func2作为异步任务
    ]
    # 使用asyncio.wait等待所有任务完成
    # asyncio.wait接收一个任务列表,并等待这些任务完成
    done, pending = await asyncio.wait(tasks)
# 使用 asyncio.run() 来运行主函数
# asyncio.run()是Python 3.7引入的,它会创建一个新的事件循环,运行传入的协程,并在协程完成后关闭事件循环
# 等同于asyncio.get_event_loop().run_until_complete(main())
asyncio.run(main())
'''
start func1
start func2
end func1
end func2
'''
import asyncio
# 定义一个异步函数func1
async def func1():
    print('start func1')  # 打印信息,表示func1开始执行
    await asyncio.sleep(2)  # 模拟异步I/O操作,协程在这里挂起2秒钟
    print('end func1')  # 打印信息,表示func1执行结束
# 定义另一个异步函数func2
async def func2():
    print('start func2')  # 打印信息,表示func2开始执行
    await asyncio.sleep(2)  # 模拟异步I/O操作,协程在这里挂起2秒钟
    print('end func2')  # 打印信息,表示func2执行结束
# 定义主异步函数main,它将作为程序的入口点
async def main():
    # 直接调用asyncio.gather不需要将协程对象先手动封装为Task对象
    # 该函数会负责将它们作为任务调度到事件循环中
    # asyncio.gather会返回一个包含所有结果的列表
    await asyncio.gather(func1(), func2())
# 使用 asyncio.run() 来运行主函数
# asyncio.run()会创建一个新的事件循环,运行传入的协程,并在协程完成后关闭事件循环
asyncio.run(main())
'''
start func1
start func2
end func1
end func2
'''

五、协程间通信

        与线程相似,协程之间的通信也只有消息队列一种,且拥有不同种类的消息队列。 在python中,协程所使用的消息队列在 asyncio.Queue库下,其中共存在三种类型 的队列,分别为标准的先进先出队列 Queue、先进后出队列 LifoQueue和优先级队 列PriorityQueue。

5.1、Queue 

先进先出的原则

asyncio.Queue(maxsize=0)

maxsize:队列的最大尺寸,如果maxsize小于等于零,则队列尺寸是无限的。如果 是大于0的整数,则当队列达到maxsize时, await put()将阻塞至某个元素被 get()取出。

类方法 :

●        Queue.qsize():返回队列中当前有几条消息。

●        Queue.empty():如果队列为空,返回True,否则返回 False。

●        Queue.full():如果队列已满(达到最大尺寸),返回 True,否则返回 False。

●        Queue.put(item, block=True, timeout=None):将 item 放入队列。如果 block 是True是 None(默认),则在必要时阻塞至有空闲的插槽, 如果timeout 是正数,将最多阻塞 timeout 秒,如果在这段时间内没有可 用的插槽,将引发 queue.Full 异常。 

●        Queue.put_nowait(item):相当于 Queue.put(item, block=False)。如果队列已满,立即引发 queue.Full 异常。

●        Queue.get(b1ock=True,timeout=None):从队列中移除并返回一个元素。如果 block是 True 且 timeout是 None(默认),则在必要时阻塞至队列中有项目可用。如果 timeout 是正数,将最多阻塞 timeout 秒,如果在这段时间内没有项目可用,将引发 queue.Empty 异常。

●        Queue.get_nowait():相当于 Queue.get(block=False)。如果队列为空立即引发 queue.Empty 异常。

 ●       Queue.task_done():指示之前入队的一个任务已经完成。由队列的消费者线程使用。每个Queue. get()调用之后,需要调用 Queue.task_done()告诉队列该任务处理完成。
●        Queue.join():阻塞直到队列中的所有元素都被处理完。当元素添加到队列的 时候,未完成任务的计数就会增加,每当消费协程调用 task_done()表示这个元 素已经处理完毕,那么未完成计数就会减少。当未完成计数降到零的时候, join()阻塞被接触。

import asyncio
# 生产者协程,负责生成一系列数字并将它们放入队列中
async def producer(queue, n):
    for i in range(1, n + 1):  # 循环从1到n,生成数字
        print(f'生产者生产了: {i}')  # 打印当前生产的数字
        await queue.put(i)  # 将数字放入队列,如果队列已满,则阻塞直到有空位
        await asyncio.sleep(1)  # 模拟生产耗时,等待1秒钟
    print('生产者完成生产。')  # 所有数字生产完毕,打印完成消息
    await queue.put(None)  # 放入一个None作为结束信号,通知消费者没有更多数字
# 消费者协程,负责从队列中取出数字并打印它们
async def consumer(queue):
    while True:  # 无限循环,直到接收到结束信号
        item = await queue.get()  # 从队列中取出一个元素,如果队列为空,则阻塞直到有元素
        if item is None:  # 检查是否接收到结束信号
            queue.task_done()  # 通知队列,当前任务已经完成
            break  # 如果是结束信号,退出循环
        print(f'消费者消费了: {item}')  # 打印当前消费的数字
        queue.task_done()  # 通知队列,当前任务已经完成
    print('消费者完成消费。')  # 打印完成消费的消息
# 主协程,负责启动生产者和消费者协程,并等待它们完成
async def main():
    queue = asyncio.Queue(5)  # 创建一个队列实例,用于生产者和消费者之间的通信
    # 创建生产者和消费者协程
    producer_coro = producer(queue, 5)  # 生产者协程,生产1到5的数字
    consumer_coro = consumer(queue)  # 消费者协程
    # 使用asyncio.gather等待生产者和消费者协程完成
    # gather允许同时运行多个协程,并在它们都完成时返回结果
    await asyncio.gather(producer_coro, consumer_coro)
# 运行主协程,启动事件循环
asyncio.run(main())
'''
生产者生产了: 1
消费者消费了: 1
生产者生产了: 2
消费者消费了: 2
生产者生产了: 3
消费者消费了: 3
生产者生产了: 4
消费者消费了: 4
生产者生产了: 5
消费者消费了: 5
生产者完成生产。
消费者完成消费。
'''

5.2、LifoQueue 

后进先出

asyncio.LifoQueue(maxsize=0)

maxsize:队列的最大尺寸。如果设置为小于或等于0的数,则队列的尺寸是无 限的。 

常用方法: 

●        LifoQueue.put(item, block=True, timeout=None):将 如果 block 是 True 且 timeout 是 item 放入队列。 None(默认),则在必要时阻塞至有空闲 的插槽。如果 timeout 是正数,将最多阻塞 timeout 秒,如果在这段时间内没 有可用的插槽,将引发完全异常。 
●        LifoQueue.put_nowait(item):相当于LifoQueue.put(item,b1ock=False)。如果队列已满,立即引发完全异常。
●        LifoQueue.get(block=True,timeout=None):从队列中移除并返回一个元素。如果 block是True且timeout是 None(默认),则在必要时阻塞至队列中有项目可用。如果 timeout 是正数,将最多阻塞timeout秒,如果在这段时间内没有项目可用,将引发完全异常。 
●        LifoQueue.get_nowait():相当于 LifoQueue.get(block=False)。如果队列为空,立即引发完全异常。
●        LifoQueue.qsize():返回队列中的项目数量
●        LifoQueue.empty():如果队列为空,返回 True,否则返回 False。
●        LifoQueue.full():如果队列已满(达到最大尺寸),返回 True,否则返回False 。

●        LifoQueue.task_done(): 指示之前入队的一个任务已经完成,即 get出来的元素相关操作已经完成。由队列中的 get()端掌控,每次 get用于一个任务时,任务最后要调用 task_done()告诉队列,任务已经完成。

●        LifoQueue.join():阻塞直到队列中的所有元素都被处理完。当元素添加到队 列的时候,未完成任务的计数就会增加,每当消费协程调用 task_done()表示这 个元素已经处理完毕,那么未完成计数就会减少。当未完成计数降到零的时候, join()阻塞被接触。

import asyncio
# 生产者协程,负责生成一系列数字并将它们放入队列中
async def producer(queue, n):
    for i in range(1, n + 1):  # 循环从1到n,生成数字
        print(f'生产者生产了: {i}')  # 打印当前生产的数字
        await queue.put(i)  # 将数字放入队列,如果队列已满,则阻塞直到有空位
        await asyncio.sleep(1)  # 模拟生产耗时,等待1秒钟
    print('生产者完成生产。')  # 所有数字生产完毕,打印完成消息
    await queue.put(None)  # 放入一个None作为结束信号,通知消费者没有更多数字
# 消费者协程,负责从队列中取出数字并打印它们
async def consumer(queue):
    await asyncio.sleep(5)
    while True:  # 无限循环,直到接收到结束信号
        item = await queue.get()  # 从队列中取出一个元素,如果队列为空,则阻塞直到有元素
        if item is None:  # 检查是否接收到结束信号
            queue.task_done()  # 通知队列,当前任务已经完成
            break  # 如果是结束信号,退出循环
        print(f'消费者消费了: {item}')  # 打印当前消费的数字
        queue.task_done()  # 通知队列,当前任务已经完成
    print('消费者完成消费。')  # 打印完成消费的消息
# 主协程,负责启动生产者和消费者协程,并等待它们完成
async def main():
    queue = asyncio.LifoQueue(10)  # 创建一个队列实例,用于生产者和消费者之间的通信
    # 创建生产者和消费者协程
    producer_coro = producer(queue, 5)  # 生产者协程,生产1到5的数字
    consumer_coro = consumer(queue)  # 消费者协程
    # 使用asyncio.gather等待生产者和消费者协程完成
    # gather允许同时运行多个协程,并在它们都完成时返回结果
    await asyncio.gather(producer_coro, consumer_coro)
    # 等待队列中的所有项目都被处理完毕
    await queue.join()
    print('所有任务都已处理完毕。')
# 运行主协程,启动事件循环
asyncio.run(main())
'''
生产者生产了: 1
生产者生产了: 2
生产者生产了: 3
生产者生产了: 4
生产者生产了: 5
消费者消费了: 5
消费者消费了: 4
消费者消费了: 3
消费者消费了: 2
消费者消费了: 1
生产者完成生产。
消费者完成消费。
所有任务都已处理完毕。
'''

5.3、PriorityQueue

实现优先级队列

asyncio.PriorityQueue(maxsize=0) 

maxsize:队列的最大尺寸。如果设置为小于或等于0的数,则队列的尺寸是无 限的。

常用方法: 

●        PriorityQueue.put((priority, item), block=True, timeout=None):将 item 放入队列,并为其指定一个优先级 timeout 是 priority。如果 block 是 True 且 None(默认),则在必要时阻塞至有空闲的插槽。如果 timeout 是正数,将最多阻塞 timeout 秒,如果在这段时间内没有可用的插槽,将引发 完全异常。
●        PriorityQueue.put_nowait((item, priority):相当于 PriorityQueue.put((item, priority), block=False)。如果队列已满,立 即引发完全异常。
●        PriorityQueue.get(block=True, timeout=None):从队列中移除并返回一 个元素。如果 block 是 True 且 timeout 是 None(默认),则在必要时阻塞 至队列中有项目可用。如果 timeout 是正数,将最多阻塞 timeout 秒,如果在 这段时间内没有项目可用,将引发完全异常。
●        PriorityQueue.get_nowait():相当于 PriorityQueue.get(block=False)。如果队列为空,立即引发完全异常。
●        PriorityQueue.qsize():返回队列中的项目数量。
●        PriorityQueue.empty():如果队列为空,返回True,否则返回False。
●        PriorityQueue.full():如果队列已满(达到最大尺寸),返回True,否则返回 False。 
●        PriorityQueue.task_done():指示之前入队的一个任务已经完成,即 来的元素相关操作已经完成。由队列中的 get()端掌控,每次get用于一个任务 时,任务最后要调用 get出 task_done()告诉队列,任务已经完成。 

import asyncio
# 生产者协程,负责生成一系列数字(这里实际上是字典键值对)并将它们放入队列中
async def producer(queue, n):
    fx = {4: 'd', 5: 'e', 2: 'b', 3: 'c', 1: 'a'}  # 定义一个字典,包含数字和字母的映射
    fx_tuples = [(key, value) for key, value in fx.items()]  # 将字典转换为元组列表
    for i in range(0, n):  # 循环从0到n-1,但由于字典是无序的,这里的i仅用作索引限制
        # 注意:如果n大于fx_tuples的长度,将会引发IndexError
        print(f'生产者生产了: {fx_tuples[i]}')  # 打印当前生产的元组
        await queue.put(fx_tuples[i])  # 将元组放入队列,如果队列已满,则阻塞直到有空位
        await asyncio.sleep(1)  # 模拟生产耗时,等待1秒钟
    print('生产者完成生产。')  # 所有指定的元组生产完毕,打印完成消息
    await queue.put(None)  # 放入一个None作为结束信号,通知消费者没有更多元组
# 消费者协程,负责从队列中取出元组并打印它们
async def consumer(queue):
    await asyncio.sleep(5)  # 消费者在开始消费前等待5秒(模拟其他任务或延迟)
    while True:  # 无限循环,直到接收到结束信号
        item = await queue.get()  # 从队列中取出一个元素,如果队列为空,则阻塞直到有元素
        if item is None:  # 检查是否接收到结束信号
            queue.task_done()  # 通知队列,当前任务(即处理None作为结束信号的任务)已经完成
            break  # 如果是结束信号,退出循环
        print(f'消费者消费了: {item}')  # 打印当前消费的元组
        queue.task_done()  # 通知队列,当前任务(即处理一个元组的任务)已经完成
    print('消费者完成消费。')  # 打印完成消费的消息
# 主协程,负责启动生产者和消费者协程,并等待它们完成
async def main():
    queue = asyncio.PriorityQueue(10)  # 创建一个优先队列实例,用于生产者和消费者之间的通信,容量为10
    # 创建生产者和消费者协程
    producer_coro = producer(queue, 5)  # 生产者协程,尝试生产前5个字典中的键值对(注意字典无序)
    consumer_coro = consumer(queue)  # 消费者协程
    # 使用asyncio.gather等待生产者和消费者协程完成
    # gather允许同时运行多个协程,并在它们都完成时返回结果(这里不关心具体返回值)
    await asyncio.gather(producer_coro, consumer_coro)
    # 等待队列中的所有项目都被处理完毕(即等待所有task_done()被调用)
    await queue.join()
    print('所有任务都已处理完毕。')
# 运行主协程,启动事件循环
asyncio.run(main())
'''
生产者生产了: (4, 'd')
生产者生产了: (5, 'e')
生产者生产了: (2, 'b')
生产者生产了: (3, 'c')
生产者生产了: (1, 'a')
消费者消费了: (1, 'a')
消费者消费了: (2, 'b')
消费者消费了: (3, 'c')
消费者消费了: (4, 'd')
消费者消费了: (5, 'e')
生产者完成生产。
消费者完成消费。
所有任务都已处理完毕。
'''

六、协程同步

与进程、线程类似,协程也有同步机制,包括Lock、Semaphore、Event、 Condition。

6.1、Lock

在协程中,可以使用Lock来确保同一时间只有一个协程可以访问某个资源。

asyncio.Lock() 

其方法为:

        acquire():获取锁。此方法会等待直至锁为unlocked,然后将其设为locked并 返回True。当有一个以上的协程在 acquire()中被阻塞则会等待解锁,最终只 有一个协程会被执行。锁的获取是公平的,被执行的协程将是第一个开始等待锁 的协程。

        release():释放锁。当锁为locked时,将其设为unlocked并返回。如果锁为 unlocked,则会抛出异常。

        locked():如果锁为locked则返回 True。 

为了避免死锁,建议使用async with 来管理Lock。 

import asyncio  # 导入asyncio模块,提供异步编程的原语和工具
async def worker(lock, id):  # 定义一个协程函数,接收一个锁和一个标识符
    while True:  # 无限循环,模拟持续工作的协程
        async with lock:  # 使用async with语句获取锁,确保同一时间只有一个协程执行这部分代码
            print(f"Worker {id} is working")  # 打印当前协程正在工作的消息
            await asyncio.sleep(1)  # 模拟I/O操作,挂起协程1秒钟
async def main():  # 定义主协程函数,用于启动和管理其他协程
    lock = asyncio.Lock()  # 创建一个锁,用于同步协程,防止它们同时执行某些代码块
    # 创建两个协程,并将它们传递给asyncio.gather()函数
    # asyncio.gather()用于并发运行多个协程,并等待它们全部完成
    await asyncio.gather(worker(lock, 1), worker(lock, 2))  # 并发运行两个worker协程
asyncio.run(main())  # 运行主协程,启动事件循环并执行主协程

6.2、Semaphore

        在协程中,可以使用Semaphore来控制对资源的访问数量。Semaphore会管理一个 内部计数器,该计数器会随每次 acquire调用递减并随每次 release调用递增,计 数器的值永远不会降到零以下。当 acquire发现其值为零时,它将保持阻塞直到有某 个任务调用了 release。

asyncio.Semaphore(value=1) 

value:value参数用来为内部计数器赋初始值,默认为1。如果给定的值小于0则会 抛出异常。 

其方法为: 

        acquire():获取一个信号量。如果内部计数器的值大于零,则将其减一并立即 返回True。如果其值为零,则会等待直到 release被调用。

        release():释放一个信号量,将内部计数器的值加一。可以唤醒一个正在等待 获取信号量对象的任务。 

        locked():如果信号量对象无法被立即获取则返回True

建议使用async with来管理Semaphore。 

import asyncio
async def car(semaphore, car_id):
    """模拟车辆进入停车场并离开的过程"""
    print(f"Car {car_id} 正在等待停车位")
    async with semaphore:  # 获取信号量,相当于获取停车位
        print(f"Car {car_id} 进入停车场了.")
        await asyncio.sleep(2)  # 模拟车辆在停车场内停留的时间
        print(f"Car {car_id} 离开停车场了")
# 信号量在退出async with块时自动释放,相当于车辆离开停车场
async def main():
    # 假设停车场只有3个停车位
    parking_spaces = asyncio.Semaphore(3)
    # 创建5个车辆协程
    cars = [car(parking_spaces, i) for i in range(1, 6)]
    # 并发运行所有车辆协程
    await asyncio.gather(*cars)
asyncio.run(main())
'''
Car 1 正在等待停车位
Car 1 进入停车场了.
Car 2 正在等待停车位
Car 2 进入停车场了.
Car 3 正在等待停车位
Car 3 进入停车场了.
Car 4 正在等待停车位
Car 5 正在等待停车位
Car 1 离开停车场了
Car 2 离开停车场了
Car 3 离开停车场了
Car 4 进入停车场了.
Car 5 进入停车场了.
Car 4 离开停车场了
Car 5 离开停车场了
'''

6.3、Event

        在python中使用Event允许一个协程通知一个或多个协程某个事件已经发生。Event 对象会管理一个内部标志,可通过 set方法将其设为True并通过 设为False。 clear方法将其重 wait方法会阻塞直至该标志被设为True。该标志初始时会被设为 False。

asyncio.Event() 

其方法为: 

        wait():协程等待直至事件被设置。如果事件已被设置,则立即返回True,否则将阻塞直至另一个任务调用set()

        set():设置事件。所有等待事件被设置的任务将被立即唤醒。 

        clear():清空(取消设置)事件。通过wait()进行等待的任务现在将会阻塞直至set()方法被再次调用。

        is_set():如果事件已被设置则返回 True。

import asyncio
import random
async def producer(event, data):
    """生产者协程,它在准备好数据后设置事件"""
    print(f"Producer is preparing data: {data}")
    time = random.uniform(0.5, 2)
    print(time)
    await asyncio.sleep(time)  # 模拟数据准备时间
    print(f"Producer has prepared data: {data}")
    event.set()  # 设置事件,表示数据已经准备好了
    print("Producer has notified the consumer.")
async def consumer(event):
    """消费者协程,它在事件被设置后开始消费数据"""
    print("Consumer is waiting for data.")
    await event.wait()  # 等待事件被设置
    print("Consumer has received the notification and is consuming data.")
    # 模拟数据处理
    await asyncio.sleep(random.uniform(0.5, 2))
    print("Consumer has finished consuming data.")
async def main():
    # 创建一个事件对象
    event = asyncio.Event()
    # 创建生产者和消费者协程
    producer_coro = producer(event, "data1")
    consumer_coro = consumer(event)
    # 并发运行生产者和消费者协程
    await asyncio.gather(producer_coro, consumer_coro)
asyncio.run(main())
'''
Producer is preparing data: data1
1.4196680751620212
Consumer is waiting for data.
Producer has prepared data: data1
Producer has notified the consumer.
Consumer has received the notification and is consuming data.
Consumer has finished consuming data.
'''

6.4、Condition

         在python中允许协程等待某些条件成立,然后被通知恢复执行。在本质上, Condition 对象合并了 Event和 Lock 的功能。 多个 Condition 对象有可能共享一个 Lock,这允许关注于共享资源的特定状态的不同任务实现对共享资源的协同独占访 问。

asyncio.Condition(lock=None) 

        lock:lock参数必须为自己创建的 Lock对象或None,在后一种情况下会自动创建 一个新的Lock对象。

其方法为: 

        acquire():获取下层的锁。此方法会等待直至下层的锁为 unlocked,将其设为 locked 并返回 True。

        notify(n=1):唤醒最多 n 个正在等待此条件的任务(默认为 1 个)。如果没 有任务正在等待则此方法为空操作。锁必须在此方法被调用前被获取并在随后被 快速释放。 如果通过一个 unlocked 锁调用则会引发异常。

        locked():如果下层的锁已被获取则返回 True。

        notify_all():唤醒所有正在等待此条件的任务。此方法的行为类似于 notify,但会唤醒所有正在等待的任务。锁必须在此方法被调用前被获取并在 随后被快速释放。 如果通过一个 unlocked 锁调用则会引发异常。

        release():释放下层的锁。在未锁定的锁调用时,会引发异常。

        wait():等待直至收到通知。当此方法被调用时如果调用方任务未获得锁,则 会引发异常。这个方法会释放下层的锁,然后保持阻塞直到被 notify()或 notify_all()调用所唤醒。 一旦被唤醒,Condition 会重新获取它的锁并且此 方法将返回 True。

        wait_for(predicate):等待直到目标值变为 true。目标必须为一个可调用对 象,其结果将被解读为一个布尔值。 

建议使用async with来管理Condition。 

import asyncio
# 生产者函数,负责通知所有等待的消费者
async def producer(condition):
    while True:  # 无限循环,模拟持续的生产活动
        await condition.acquire()  # 获取条件变量的锁
        condition.notify_all()  # 通知所有等待的消费者
        condition.release()  # 释放条件变量的锁
        await asyncio.sleep(1)  # 暂停一秒,模拟生产活动的时间间隔
# 消费者函数,负责等待生产者的通知
async def consumer(condition, number):
    while True:  # 无限循环,模拟持续的消费活动
        await condition.acquire()  # 获取条件变量的锁
        print(f'{number}正在等待condition')  # 打印消费者正在等待的通知
        await condition.wait()  # 等待生产者的通知
        print(f'{number}已释放condition')  # 打印消费者收到通知后的消息
        condition.release()  # 释放条件变量的锁
# 主函数,负责启动生产者和消费者任务
async def main():
    condition = asyncio.Condition()  # 创建一个条件变量
    # 创建任务列表,包括一个生产者和多个消费者
    tasks = [
        asyncio.create_task(producer(condition)),  # 创建生产者任务
        asyncio.create_task(consumer(condition, 1)),  # 创建消费者任务,编号为1
        asyncio.create_task(consumer(condition, 2)),  # 创建消费者任务,编号为2
        asyncio.create_task(consumer(condition, 3)),  # 创建消费者任务,编号为3
        asyncio.create_task(consumer(condition, 4)),  # 创建消费者任务,编号为4
        asyncio.create_task(consumer(condition, 5)),  # 创建消费者任务,编号为5
    ]
    # 等待所有任务完成,由于生产者是无限循环,这里实际上会无限等待
    await asyncio.wait(tasks)
# 运行主函数,启动事件循环
asyncio.run(main())

七、将协程分布到线程池/进程池中

        一般情况下,程序的异步开发要么使用协程,要么使用进程池或线程池,但是也会碰 到有一些情况需要既使用协程又使用进程池或线程池,而进程池、线程池 submit后 返回的 Future和协程的 Future又不是一回事,不能直接使用await,因此就需要进 行一个对象的转换。

        在Python中,可以通过 asyncio.wrap_future()来将一个 concurrent.futures.Future转化为asyncio.Future,这样就可以去使用协程的相关内容了。

import asyncio
import concurrent.futures
import time
# 这是一个普通函数
def func1():
    time.sleep(5)
    print('in func1')
# 这是一个普通函数
def func2():
    time.sleep(3)
    print('in func2')
async def main():
# 创建一个进程池
    with concurrent.futures.ProcessPoolExecutor() as pool:
        # 使用进程池提交任务
        future1 = pool.submit(func1)
        future2 = pool.submit(func2)
        # 将 concurrent.futures.Future 转换为 asyncio.Future
        async_future1 = asyncio.wrap_future(future1)
        async_future2 = asyncio.wrap_future(future2)
        # 使用 asyncio 的 await 等待结果
        result = await asyncio.gather(async_future1,async_future2)
        print(f"The result is {result}")
# 注意:进程就需要放到主模块中去执行
if __name__ == '__main__':
    asyncio.run(main())
'''
in func2
in func1
The result is [None, None]
'''

 使用 loop.run_in_executor()直接转换

        使用 asyncio.get_running_loop()时,如果当前没有正在运行的事件循环,就抛 出异常。而上面的 asyncio.get_event_loop()则是在当前没有正在运行的事件循 环的基础上,会创建一个新的事件循环。相对来说, asyncio.get_running_loop()更适合在协程或异步函数内部使用, asyncio.get_event_loop()适用于更广泛的情况。 

loop.run_in_executor(executor, func, *args): 

        executor:一个执行器对象,通常是 concurrent.futures.ThreadPoolExecutor 或 concurrent.futures.ProcessPoolExecutor 的实例。它管理同步函数的执 行,如果不指定就默认创建一个线程池。

        func:要执行的同步函数。

        *args:传递给 func的位置参数。 

import asyncio
import time


# 示例同步函数,模拟耗时操作
def slow_function1():
    # 打印信息,表示函数开始执行
    print("Function 1 is running")
    # 模拟耗时操作,线程睡眠2秒
    time.sleep(2)
    # 打印信息,表示函数执行完毕
    print("Function 1 is done")
    # 返回函数执行结束的信息
    return 'func1 end'


def slow_function2():
    # 打印信息,表示函数开始执行
    print("Function 2 is running")
    # 模拟耗时操作,线程睡眠2秒
    time.sleep(2)
    # 打印信息,表示函数执行完毕
    print("Function 2 is done")
    # 返回函数执行结束的信息
    return 'func2 end'


async def main():
    # 获取当前正在运行的事件循环
    loop = asyncio.get_running_loop()

    print('before run')
    # 使用线程池执行器并发运行两个同步函数
    # run_in_executor的第一个参数为None,表示使用默认的线程池执行器
    task1 = loop.run_in_executor(None, slow_function1)
    task2 = loop.run_in_executor(None, slow_function2)

    print('after run')
    # 等待两个函数执行完成,并获取它们的返回值
    result1 = await task1
    result2 = await task2

    print('after await')
    # 打印两个函数的执行结果
    print(f"Result of function 1: {result1}")
    print(f"Result of function 2: {result2}")


if __name__ == '__main__':
    # 记录程序开始执行的时间
    start = time.time()

    # 运行主函数
    asyncio.run(main())
    # asyncio.get_running_loop()

    # 打印程序执行的总时间
    print('total_time', time.time() - start)
'''
before run
Function 1 is running
Function 2 is running
after run
Function 1 is done
Function 2 is done
after await
Result of function 1: func1 end
Result of function 2: func2 end
total_time 2.0069408416748047
'''

八、思维导图

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2295222.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DeepSeek使用技巧大全(含本地部署教程)

在人工智能技术日新月异的今天,DeepSeek 作为一款极具创新性和实用性的 AI,在众多同类产品中崭露头角,凭借其卓越的性能和丰富的功能,吸引了大量用户的关注。 DeepSeek 是一款由国内顶尖团队研发的人工智能,它基于先进…

ElasticSearch集群因索引关闭重打开导致飘红问题排查

背景 某组件向 ElasticSearch 写入数据,从最近某一天开始写入速度变慢,数据一直有积压。推测是 ElasticSearch 集群压力导致的,查看 ElasticSearch 集群状态,发现集群确实处于 red 状态。 本文记录 ElasticSearch 集群因索引关闭…

计算机毕业设计Tensorflow+LSTM空气质量监测及预测系统 天气预测系统 Spark Hadoop 深度学习 机器学习 人工智能

温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 作者简介:Java领…

手搓基于CNN的Chest X-ray图像分类

数据集Chest X-ray PD Dataset 数据集介绍 - 知乎https://zhuanlan.zhihu.com/p/661311561 CPU版本 import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import Dataset, DataLoader from torchvision import transforms, models import …

使用java代码操作rabbitMQ收发消息

SpringAMQP 将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也…

【数据结构】(7) 栈和队列

一、栈 Stack 1、什么是栈 栈是一种特殊的线性表,它只能在固定的一端(栈顶)进行出栈、压栈操作,具有后进先出的特点。 2、栈概念的例题 答案为 C,以C为例进行讲解: 第一个出栈的是3,那么 1、…

Composo:企业级AI应用的质量守门员

在当今快速发展的科技世界中,人工智能(AI)的应用已渗透到各行各业。然而,随着AI技术的普及,如何确保其可靠性和一致性成为了企业面临的一大挑战。Composo作为一家致力于为企业提供精准AI评估服务的初创公司,通过无代码和API双模式,帮助企业监测大型语言模型(LLM)驱动的…

Python数据分析案例71——基于十种模型的信用违约预测实战

背景 好久没写这种基础的做机器学习流程了,最近过完年感觉自己代码忘了好多.....复习一下。 本次带来的是信贷违约的预测,即根据这个人的特征(年龄收入什么的),预测他是不是会违约,会违约就拒绝贷款&…

python康威生命游戏的图形化界面实现

康威生命游戏(Conway’s Game of Life)是由英国数学家约翰何顿康威(John Horton Conway)在1970年发明的一款零玩家的细胞自动机模拟游戏。尽管它的名字中有“游戏”,但实际上它并不需要玩家参与操作,而是通…

区块链技术:Facebook 重塑社交媒体信任的新篇章

在这个信息爆炸的时代,社交媒体已经成为我们生活中不可或缺的一部分。然而,随着社交平台的快速发展,隐私泄露、数据滥用和虚假信息等问题也日益凸显。这些问题的核心在于传统社交媒体依赖于中心化服务器存储和管理用户数据,这种模…

UE求职Demo开发日志#25 试试网络同步和尝试打包

1 改了一些时序上的bug,成功运行了多端 (UE一些网络相关的功能都弄好了,只需要标记哪个变量或Actor需要复制) 2 以前遗留的bug太多了,改到晚上才打包好一个能跑的版本,而且有的特效还不显示(悲…

Win10环境使用ChatBox集成Deep Seek解锁更多玩法

Win10环境使用ChatBox集成Deep Seek解锁更多玩法 前言 之前部署了14b的Deep Seek小模型,已经验证了命令行及接口方式的可行性。但是纯命令行或者PostMan方式调用接口显然不是那么友好: https://lizhiyong.blog.csdn.net/article/details/145505686 纯…

第 26 场 蓝桥入门赛

2.对联【算法赛】 - 蓝桥云课 问题描述 大年三十,小蓝和爷爷一起贴对联。爷爷拿出了两副对联,每副对联都由 N 个“福”字组成,每个“福”字要么是正的(用 1 表示),要么是倒的(用 0 表示&#…

CodeGPT + IDEA + DeepSeek,在IDEA中引入DeepSeek实现AI智能开发

CodeGPT IDEA DeepSeek,在IDEA中引入DeepSeek 版本说明 建议和我使用相同版本,实测2022版IDEA无法获取到CodeGPT最新版插件。(在IDEA自带插件市场中搜不到,可以去官网搜索最新版本) ToolsVersionIntelliJ IDEA202…

【2025年更新】1000个大数据/人工智能毕设选题推荐

文章目录 前言大数据/人工智能毕设选题:后记 前言 正值毕业季我看到很多同学都在为自己的毕业设计发愁 Maynor在网上搜集了1000个大数据的毕设选题,希望对大家有帮助~ 适合大数据毕业设计的项目,完全可以作为本科生当前较新的毕…

什么是中间件中间件有哪些

什么是中间件? 中间件(Middleware)是指在客户端和服务器之间的一层软件组件,用于处理请求和响应的过程。 中间件是指介于两个不同系统之间的软件组件,它可以在两个系统之间传递、处理、转换数据,以达到协…

使用docker搭建FastDFS文件服务

1.拉取镜像 docker pull registry.cn-hangzhou.aliyuncs.com/qiluo-images/fastdfs:latest2.使用docker镜像构建tracker容器(跟踪服务器,起到调度的作用) docker run -dti --networkhost --name tracker -v /data/fdfs/tracker:/var/fdfs -…

天津三石峰科技——汽车生产厂的设备振动检测项目案例

汽车产线有很多传动设备需要长期在线运行,会出现老化、疲劳、磨损等 问题,为了避免意外停机造成损失,需要加装一些健康监测设备,监测设备运 行状态。天津三石峰科技采用 12 通道振动信号采集卡(下图 1)对…

Linux之文件IO前世今生

在 Linux之文件系统前世今生(一) VFS中,我们提到了文件的读写,并给出了简要的读写示意图,本文将分析文件I/O的细节。 一、Buffered I/O(缓存I/O)& Directed I/O(直接I/O&#…

半导体制造工艺讲解

目录 一、半导体制造工艺的概述 二、单晶硅片的制造 1.单晶硅的制造 2.晶棒的切割、研磨 3.晶棒的切片、倒角和打磨 4.晶圆的检测和清洗 三、晶圆制造 1.氧化与涂胶 2.光刻与显影 3.刻蚀与脱胶 4.掺杂与退火 5.薄膜沉积、金属化和晶圆减薄 6.MOSFET在晶圆表面的形…