Python异步编程中的Producer-Consumer模式

news2024/10/25 5:06:04

Python异步编程中的Producer-Consumer模式

    • 1. Producer-Consumer模式简介
      • 1.1 生产者(Producer)
      • 1.2 消费者(Consumer)
      • 1.3 队列(Queue)
    • 2. 示例代码
      • 2.1 简单的Producer-Consumer示例
      • 2.2 多消费者示例
      • 2.3 带批量处理的Producer-Consumer示例
    • 3. 关键技术点解释
      • 3.1 `asyncio.Queue`
      • 3.2 `asyncio.create_task`
      • 3.3 `asyncio.gather`
      • 3.4 `aiofiles`
    • 4. 总结

在Python异步编程中,Producer-Consumer模式是一种常见的设计模式,用于处理生产者和消费者之间的任务分配和处理。生产者负责生成任务,而消费者负责处理这些任务。这种模式在处理I/O密集型任务时特别有用,可以显著提高程序的效率。

本文将通过一个简单的示例,介绍如何在Python中使用asyncio库实现Producer-Consumer模式,并详细解释其中的关键技术点。

1. Producer-Consumer模式简介

1.1 生产者(Producer)

生产者负责生成任务并将任务放入队列中。生产者可以是任何生成数据的组件,例如从文件读取数据、从网络获取数据等。

1.2 消费者(Consumer)

消费者负责从队列中取出任务并进行处理。消费者可以是任何处理数据的组件,例如将数据写入文件、进行数据分析等。

1.3 队列(Queue)

队列是生产者和消费者之间的桥梁,用于存储生产者生成的任务,并供消费者取出任务进行处理。在Python中,可以使用asyncio.Queue来实现异步队列。

2. 示例代码

2.1 简单的Producer-Consumer示例

以下是一个简单的Producer-Consumer示例,生产者生成数字任务,消费者将这些数字打印出来。

import asyncio

async def producer(queue):
    for i in range(10):
        await asyncio.sleep(1)  # 模拟生产任务的延迟
        print(f"Producing task {i}")
        await queue.put(i)
    await queue.put(None)  # 添加结束标记

async def consumer(queue):
    while True:
        task = await queue.get()
        if task is None:
            break
        print(f"Consuming task {task}")
        await asyncio.sleep(1)  # 模拟处理任务的延迟
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    producer_task = asyncio.create_task(producer(queue))
    consumer_task = asyncio.create_task(consumer(queue))
    await asyncio.gather(producer_task, consumer_task)

asyncio.run(main())

2.2 多消费者示例

以下是一个多消费者的示例,生产者生成数字任务,多个消费者从队列中取出任务并进行处理。

import asyncio

async def producer(queue):
    for i in range(10):
        await asyncio.sleep(1)  # 模拟生产任务的延迟
        print(f"Producing task {i}")
        await queue.put(i)
    for _ in range(3):  # 添加结束标记
        await queue.put(None)

async def consumer(queue, consumer_id):
    while True:
        task = await queue.get()
        if task is None:
            break
        print(f"Consumer {consumer_id} consuming task {task}")
        await asyncio.sleep(1)  # 模拟处理任务的延迟
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    producer_task = asyncio.create_task(producer(queue))
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
    await asyncio.gather(producer_task, *consumers)

asyncio.run(main())

2.3 带批量处理的Producer-Consumer示例

以下是一个带批量处理的Producer-Consumer示例,生产者生成数字任务,消费者将这些数字写入文件。

import asyncio
import aiofiles

async def producer(queue):
    for i in range(10):
        await asyncio.sleep(1)  # 模拟生产任务的延迟
        print(f"Producing task {i}")
        await queue.put(i)
    await queue.put(None)  # 添加结束标记

async def consumer(queue, batch_size):
    buffer = []
    while True:
        task = await queue.get()
        if task is None:
            break
        buffer.append(task)
        if len(buffer) >= batch_size:
            await write_to_file(buffer)
            buffer.clear()
        queue.task_done()
    if buffer:
        await write_to_file(buffer)

async def write_to_file(data):
    async with aiofiles.open('output.txt', 'a') as f:
        for item in data:
            await f.write(f"{item}\n")

async def main():
    queue = asyncio.Queue()
    producer_task = asyncio.create_task(producer(queue))
    consumer_task = asyncio.create_task(consumer(queue, batch_size=3))
    await asyncio.gather(producer_task, consumer_task)

asyncio.run(main())

3. 关键技术点解释

3.1 asyncio.Queue

asyncio.Queue 是异步队列,用于在生产者和消费者之间传递任务。生产者使用 await queue.put(item) 将任务放入队列,消费者使用 await queue.get() 从队列中取出任务。

3.2 asyncio.create_task

asyncio.create_task 用于创建异步任务,并将其添加到事件循环中。这样可以并行执行多个任务。

3.3 asyncio.gather

asyncio.gather 用于等待多个协程完成。它返回一个包含所有协程结果的列表。

3.4 aiofiles

aiofiles 是一个第三方库,提供了异步文件操作的功能。通过 aiofiles.open 可以异步打开文件,并通过 await f.write 进行异步写入。

4. 总结

通过本文的介绍和示例代码,我们了解了如何在Python中使用asyncio库实现Producer-Consumer模式。这种模式在处理I/O密集型任务时特别有用,可以显著提高程序的效率。使用asyncio.Queue可以方便地在生产者和消费者之间传递任务,使用asyncio.create_taskasyncio.gather可以并行执行多个任务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2222975.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

54万字WORD电力数字化转型智慧电力一体化监管云平台整体解决方案

▲关注智慧方案文库,学习9000多份最新解决方案,其中 PPT、WORD超过7000多份 ,覆盖智慧城市多数领域的深度知识社区,稳定更新4年,日积月累,更懂行业需求。 1459页54万字WORD丨电力行业数字化转型 智慧电力…

【ArcGIS Pro实操第4期】绘制三维地图

【ArcGIS Pro实操第4期】绘制三维地图 ArcGIS Pro绘制三维地图-以DEM高程为例参考 如何使用ArcGIS Pro将栅格数据用三维的形式进行表达?在ArcGIS里可以使用ArcScene来实现,ArcGIS Pro实现原理跟ArcScene一致。由于Esri未来将不再对ArcGIS更新&#xff0c…

53页 PPT煤炭行业数字化转型规划方案

▲关注智慧方案文库,学习9000多份最新解决方案,其中 PPT、WORD超过7000多份 ,覆盖智慧城市多数领域的深度知识社区,稳定更新4年,日积月累,更懂行业需求。 53页 PPT煤炭行业数字化转型规划方案 通过对煤企高…

手机玩使命召唤21:黑色行动6?GameViewer远程玩使命召唤教程

使命召唤21:黑色行动 6这个第一人称射击游戏,将于10月25号上线!如果你是使命召唤的老玩家,是不是也在期待这部新作?其实这个游戏不仅可以用电脑玩,还可以用手机玩,使用网易GameViewer远程就能让…

【Qt6聊天室项目】 主界面功能实现

1. 获取当前用户的个人信息 1.1 前后端逻辑分析(主界面功能) 主界面上所有的前后端交互逻辑相同,分析到加载会话列表后其余功能仅实现。 核心逻辑总结 异步请求-响应模型 客户端发起请求,向服务器发送包含会话ID的请求服务端处…

python画图|曲线动态输出

【1】引言 前序教程中的曲线动态输出,其实是把曲线按照左右移动的形式输出(波的传递形式)。 python画图|曲线动态输出基础教程_python 动态曲线-CSDN博客 但有些时候我们更期待的是曲线不移动,随着自变量的增加而输出因变量&am…

信号与系统学习:傅里叶级数

一、基本概念 1. 什么是傅里叶级数? 傅里叶级数是一种数学工具,可以将一个周期函数分解为一系列正弦和余弦函数(即三角函数)的和。这些正弦和余弦函数的频率是原函数的整数倍。 2. 为什么要使用傅里叶级数? 信号分…

【STM32+HAL】OV2640捕获图像显示

一、准备工作 有关CUBEMX的初始化配置,参见我的另一篇blog:【STM32HAL】CUBEMX初始化配置 二、所用工具 1、芯片: STM32F407ZGT6 2、IDE: MDK-Keil软件 3、库文件:STM32F4xxHAL库 三、实现功能 通过OV2640捕获图像…

Flutter UI组件库(JUI)

Flutter UI组件库 (JUI) 介绍 您是否正在寻找一种方法来简化Flutter开发过程,并创建美观、一致的用户界面?您的搜索到此为止!我们的Flutter UI组件库(JUI)提供了广泛的预构建、可自定义组件,帮助您快速构建…

为什么会配置足够打LOL等网游很卡?12代大小核处理器最典型

卡顿原因及优化建议 大小核调度问题: 调度不当:某些游戏未针对大小核进行优化,可能导致系统将负载分配到效率核(小核),而性能核(大核)未被充分利用。操作系统调度策略:尽…

15.6 JDBC数据库编程6——可滚动和可更新的ResultSet

目录 15.6 引言 15.6.1 可滚动的ResultSet 15.6.1 可更新的ResultSet 15.6 引言 可滚动的ResultSet是指在结果集对象上不但可以向前访问结果集中的记录,还可以向后访问结果集中记录。可更新的ResultSet是指不但可以访问结果集中的记录,还可以更新…

文件操作(1) —— 文件基础知识

目录 1. 为什么使用文件? 2. 文件种类【按功能分】 3. 文件名 4. 数据文件种类【按存储方式细分】 5. 文件的打开和关闭 5.1 流和标准流 5.2 文件指针 5.3 文件的打开和关闭函数 6. 文件缓冲区 1. 为什么使用文件? 如果没有⽂件,我…

Vue笔记-浏览器窗口改变时,重新计算表格高度并设置

当窗口大小改变时,你监听 window 对象的 resize 事件,然后在事件处理程序中重新计算表格的高度。在 Vue 中,可以在组件中通过 created 生命周期钩子来添加事件监听器,然后在组件销毁时移除事件监听器。 如下vue代码: …

Android GPU Inspector分析帧数据快速入门

使用 谷歌官方工具Android GPU Inspector (AGI) 可以对Android 应用进行深入和全面的系统性能分析和帧性能分析 。AGI 是一个非常强大的分析工具,尤其是在需要诊断 GPU 性能问题和优化应用时,可以帮助你精准找到性能瓶颈。本文介绍如何使用该工具对帧数据…

24V转3.3V2A同步降压WT6030

24V转3.3V2A同步降压WT6030 WT6030 是一种高效同步整流降压开关模式转换器,集成内部功率 MOSFET,能在宽输入电源范围内提供较高的输出电流,以下是使用 WT6030 将 24V 降压到 3.3V 输出 2A 电流的相关设计要点: 1. 电路设计 输入电…

零基础Java第九期:一维数组(二)和二维数组

目录 一、数组的练习 1.1. 顺序表查找 1.2. 二分查找 1.3. 冒泡排序 二、二维数组 2.1. 二维数组的性质 2.2. 不规则二维数组 一、数组的练习 1.1. 顺序表查找 题目描述:给定一个数组, 再给定一个元素, 找出该元素在数组中的位置。 利用for循环去遍历数组&am…

听一听语音助手的声音

分享自制树莓派语音助手的博文也有一些日子了,今天咱们来听听语音助手自己的声音。 上图是本次对话的log记录,从图上可以看到,主要的对话耗时是用于录音(默认5秒)和语音识别(平均5秒)这两个组件…

【数据结构】包装类简单认识泛型-Java

包装类 在Java中,由于基本类型不是继承自Object,为了在泛型代码中可以支持基本类型,Java给每个基本类型都给了一个包装类型 基本数据类型和对应的包装类 基本数据类型包装类ByteByteshortShortint Integer longLongfloatFloatdoubleDoublec…

洞察前沿趋势!2024深圳国际金融科技大赛——西丽湖金融科技大学生挑战赛技术公开课指南

在当前信息技术与“互联网”深度融合的背景下,金融行业的转型升级是热门话题,创新与发展成为金融科技主旋律。随着区块链技术、人工智能技术、5G通信技术、大数据技术等前沿科技的飞速发展,它们与金融领域的深度融合,正引领着新型…

模型的部署:服务端与客户端建立连接(Flask)

目录 一、服务端部署(使用Flask) 1.安装Flask 2.加载模型(这里以识别图片的类型模型为例) 3.定义API端点 4.运行Flask应用 二、客户端请求 1.安装HTTP客户端库 2.发送请求 请求成功示例: 监控与日志 总结 在…