Python多进程间通讯(包含共享内存方式)

news2025/1/2 0:09:22

文章目录

  • 1 通过非共享内存配合队列方式
  • 2 通过共享内存配合队列方式


注:本博文测试环境为Linux系统。


1 通过非共享内存配合队列方式

下面是一个常见的生产者与消费者的模式示例,这里分别启动了两个子进程,一个为生产者(producer)一个为消费者(consumer)。生产者负责生产Numpy的NDArray数据(这里为了体现进程间传递数据会耗时故创建的NDArray的shape比较大),然后将数据放入队列Queue。消费者监控队列Queue一旦有数据就取出并简单打印下shape信息和填充的Value信息。

import time
import multiprocessing as mp
from multiprocessing import Process, Queue

import numpy as np


def producer_task(queue: Queue):
    for i in range(10):
        data = np.full(shape=(1, 3, 2048, 2048), fill_value=i, dtype=np.float32)
        queue.put(data)
        time.sleep(0.1)

    # send exit signal
    queue.put(None)
    print("producer exit.")


def consumer_task(queue: Queue):
    while True:
        data = queue.get()
        if data is None:
            break

        print(f"get data shape:{data.shape}, fill value:{data[0][0][0][0]}")

    print("consumer exit.")


def main():
    queue = Queue()
    producer = Process(target=producer_task, args=(queue,), name="producer")
    consumer = Process(target=consumer_task, args=(queue,), name="consumer")

    producer.start()
    consumer.start()

    producer.join()
    consumer.join()


if __name__ == '__main__':
    mp.set_start_method("spawn")
    main()

执行以上代码终端输出以下内容:

get data shape:(1, 3, 2048, 2048), fill value:0.0
get data shape:(1, 3, 2048, 2048), fill value:1.0
get data shape:(1, 3, 2048, 2048), fill value:2.0
get data shape:(1, 3, 2048, 2048), fill value:3.0
get data shape:(1, 3, 2048, 2048), fill value:4.0
get data shape:(1, 3, 2048, 2048), fill value:5.0
get data shape:(1, 3, 2048, 2048), fill value:6.0
get data shape:(1, 3, 2048, 2048), fill value:7.0
get data shape:(1, 3, 2048, 2048), fill value:8.0
get data shape:(1, 3, 2048, 2048), fill value:9.0
producer exit.
consumer exit.

为了进一步看清进程之间传递数据的过程,这里使用viztracer工具进一步分析(直接通过pip install viztraver即可安装)。使用指令如下,其中main.py就是上面的代码内容。跑完后会在当前目录下生成一个result.json文件。

viztracer main.py

通过如下指令可视化result.json文件:

vizviewer result.json

在终端输入上述指令后,终端会提示你打开网页并进入http://localhost:9001,如果使用的是VSCODE IDE在右下角也会提示你打开浏览器。
在这里插入图片描述

在这里插入图片描述
可以看到生产者进程在将数据放入队列后会先进行ForkingPickler.dump即数据序列化的过程,大概耗时12ms。然后开始posix.write即开始将数据从一个进程传递到另一个进程,大概耗时34ms。最后在消费者进程进行_pickle.loads即数据的反序列化,大概耗时6ms。从生产者进程将数据放入队列到消费者进程拿到数据总耗时约53ms。从这个示例中可以看到,当在进程间传递的数据量很大时会很耗时。


2 通过共享内存配合队列方式

下面示例代码将传递的数据改为了共享内存的方式,这样可以大幅减小进程间数据传递的成本。这里主要是使用multiprocessing库中的shared_memory.SharedMemory对象。创建新的共享内存时需要将create参数设置为True(如果是复用已有的共享内存时设置为False),然后指定具体的size大小,该参数为数据的字节大小,比如要申请一块存放数据类型为float32shape(1, 3, 2048, 2048)的空间所需字节数为1 * 3 * 2048 * 2048 * 4float32为4个字节)。根据Python官方文档介绍,当一个进程不在使用该共享内存时应关闭指向共享内存的文件描述符/句柄,具体做法是调用共享内存对象的close方法。当某块共享内存不在需要时,需在最后一个使用到的进程中调用unlink方法显示释放掉(如果不调用该方法,共享内存会一直存在,如果后续再不断申请新的共享内存则会出现共享内存泄露的问题,或者当程序未正常退出时该共享内存块会成为僵尸共享内存?)。例如在当前示例中,生产者进程创建了共享内存并放入队列里后可调用close方法关闭当前进程指向共享内存的文件描述符/句柄,在消费者进程中拿到数据并消费完后除了调用close方法外还会调用unlink方法删除该共享内存。有关共享内存的详细介绍看查看Python官方文档:
https://docs.python.org/zh-cn/3/library/multiprocessing.shared_memory.html#multiprocessing.shared_memory.SharedMemory

import time
import multiprocessing as mp
from multiprocessing import Process, Queue, shared_memory

import numpy as np


def producer_task(queue: Queue):
    for i in range(10):
        shm = shared_memory.SharedMemory(
            name=f"data_{i}",
            create=True,
            size=1 * 3 * 2048 * 2048 * 4
        )
        np_data = np.ndarray(shape=(1, 3, 2048, 2048), dtype=np.float32, buffer=shm.buf)
        np_data.fill(i)

        queue.put(shm.name)
        shm.close()
        time.sleep(0.1)

    # send exit signal
    queue.put(None)
    print("producer exit.")


def consumer_task(queue: Queue):
    while True:
        shm_name = queue.get()
        if shm_name is None:
            break

        shm = shared_memory.SharedMemory(name=shm_name, create=False)
        np_data = np.ndarray(shape=(1, 3, 2048, 2048), dtype=np.float32, buffer=shm.buf)
        print(f"get data shape:{np_data.shape}, fill value:{np_data[0][0][0][0]}")
        shm.close()
        shm.unlink()

    print("consumer exit.")


def main():
    queue = Queue()
    producer = Process(target=producer_task, args=(queue,), name="producer")
    consumer = Process(target=consumer_task, args=(queue,), name="consumer")

    producer.start()
    consumer.start()

    producer.join()
    consumer.join()


if __name__ == '__main__':
    mp.set_start_method("spawn")
    main()

同样我们使用viztracer来看看进程间的通讯情况:
在这里插入图片描述

数据从生产者进程传递到消费者进程耗时为245us相比之前不使用共享内存方法的53ms,速度比值为53000/245≈216X,提升还是非常明显的。但是这有个很奇怪的现象我无法理解,就是在生产者进程中调用close方法用了1.8ms,而在消费者进程里调用close方法只用了15us,unlink用了8us,如果有知道的大神希望能帮忙解释下。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2240250.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

平台整合是网络安全成功的关键

如今,组织面临着日益复杂、动态的网络威胁环境,随着恶意行为者采用越来越阴险的技术来破坏环境,攻击的数量和有效性也在不断上升。我们最近的 Cyber​​Ark 身份威胁形势报告(2024 年 5 月)发现,去年 99% 的…

冒泡选择法(c基础)

适合对象c语言初学者。 冒泡选择法 作用对一个数组进行排序。(介绍一下数组(c基础)(详细版)-CSDN博客) 核心要点 1: 数组元素个数 sz 2: 比较后的交换。 核心思路 进行(sz - 1)趟,每一趟把最大数的放到末尾。其…

ubuntu[无桌面]——使用FileZilla连接本地和虚拟机实现文件共享

在虚拟机上跑命令的时候,有时候需要使用到一些在本机上的文件,但是由于安装的Ubuntu是无桌面的,那么怎么去实现将本地文件拖放到虚拟机上捏,这里记录一下 FileZilla的操作,以及一些问题的解决。 (1&#xf…

openGauss常见问题与故障处理(四)

4.数据库故障定位手段: 数据库故障定位手段通常有如下三种类: 提到“种类”,这里给大家举一个模拟场景中肖荏盖反向的小故事 对于初学者入门的学习,一些理论不容易理解或记住,所以本节课程【创新】采用了【正、反对比…

无法启动此程序,因为计算机中丢失 msvcr100.dll”。五种有效方法分享

msvcr100.dll 是 Microsoft Visual C 2010 Redistributable Package 的一部分,这是一个动态链接库(DLL)文件,对于运行基于 Windows 操作系统的许多应用程序至关重要。它内含 C 运行时库,提供多种常用函数与类&#xff…

【React】状态管理之Redux

🌈个人主页: 鑫宝Code 🔥热门专栏: 闲话杂谈| 炫酷HTML | JavaScript基础 ​💫个人格言: "如无必要,勿增实体" 文章目录 状态管理之Redux引言1. Redux 的核心概念1.1 单一数据源(Single Sou…

数字IC后端实现之Innovus specifyCellEdgeSpacing和ICC2 set_placement_spacing_rule的应用

昨天帮助社区IC训练营学员远程协助解决一个Calibre DRC案例。通过这个DRC Violation向大家分享下Innovus和ICC2中如何批量约束cell的spacing rule。 数字IC后端手把手实战教程 | Innovus verify_drc VIA1 DRC Violation解析及脚本自动化修复方案 下图所示为T12nm A55项目的Ca…

LLM - 计算 多模态大语言模型 的参数量(Qwen2-VL、Llama-3.1) 教程

欢迎关注我的CSDN:https://spike.blog.csdn.net/ 本文地址:https://spike.blog.csdn.net/article/details/143749468 免责声明:本文来源于个人知识与公开资料,仅用于学术交流,欢迎讨论,不支持转载。 影响 (…

一图胜千言,一张图深入读懂大模型应用是如何工作的

在科技飞速发展的今天,人工智能(AI)早已不再是遥不可及的科幻概念,而是融入了我们生活的方方面面。其中,大模型作为AI领域的重要分支,以其卓越的表现力和广泛的应用前景,吸引了无数人的目光。但…

Spring AI Alibaba - 快速开发生成式Java Al应用

大家好,我是袁庭新。 今天我们不谈ServerlessAI、AI可观测性、云消息队列演进与AI赋能以及AI原生应用架构等,这些都是近年最火热的技术方向。但是如果你想在未来成为一名合格且具备前瞻视野的软件开发工程师,这些新兴且热门的技术领域都是需…

简易入手《SOM神经网络》的本质与原理

原创文章,转载请说明来自《老饼讲解神经网络》:www.bbbdata.com 关于《老饼讲解神经网络》: 本网结构化讲解神经网络的知识,原理和代码。 重现matlab神经网络工具箱的算法,是学习神经网络的好助手。 目录 一、入门原理解说 01.…

大模型经典著作《大语言模型基础与前沿》

介绍 **《大语言模型基础与前沿》是由美国明尼苏达大学双城分校电子与计算机工程博士熊涛所著。**熊博士曾在多家中美知名高科技公司担任高级管理职位和首席科学家,在人工智能的多个领域,包括大语言模型、图神经网络等从事研发和管理工作多年。 本书深…

DBeaver 连接 OceanBase Oracle 租户

DBeaver 是一款通用的数据库工具软件,支持任何具有JDBC驱动程序的数据库。DBeaver 需要 Java 运行环境的支持。截稿时 DBeaver 24.0.0 版本默认提供的 OceanBase 驱动是连接 MySQL 的,想连接 Oracle 租户需要新建一个驱动器使用。 下载数据库驱动包 1、…

定时任务进行简单监控、爬虫的自动化之旅

原文链接:「定时任务」进阶指南:监控、爬虫的自动化之旅

spring gateway 动态路由

##yml配置 spring:application:name: public-gateway # cloud: # gateway: # routes: # - id: mybatis-plus-test # 路由的唯一标识 # uri: http://192.168.3.188:9898 # 目标服务的地址 # predicates: # - Path/test/** # 匹配…

论文1—《基于卷积神经网络的手术机器人控制系统设计》文献阅读分析报告

论文报告:基于卷积神经网络的手术机器人控制系统设计 摘要 本研究针对传统手术机器人控制系统精准度不足的问题,提出了一种基于卷积神经网络的手术机器人控制系统设计。研究设计了控制系统的总体结构,并选用PCI插槽上直接内插CAN适配卡作为上…

OpenHarmony-1.启动流程

OpenHarmony启动流程 1.kernel的启动 流程图如下所示:   OpenHarmony(简称OH)的标准系统的底层系统是linux,所以调用如下代码: linux-5.10/init/main.c: noinline void __ref rest_init(void) {struct task_struct *tsk;int pid;rcu_sch…

Python Plotly 库使用教程

Python Plotly 库使用教程 引言 数据可视化是数据分析中至关重要的一部分,它能够帮助我们更直观地理解数据、发现潜在的模式和趋势。Python 提供了多种数据可视化库,其中 Plotly 是一个功能强大且灵活的库,支持交互式图表的创建。与静态图表…

校园交友系统的设计与实现(开源版+三端交付+搭建+售后)

系统基础架构 采用UniApp进行开发,UniApp是一个使用Vue.js开发所有前端应用的框架,它支持编译为H5、小程序、App等多个平台。 使用PHP作为后端开发语言,PHP是一种广泛使用的开源脚本语言,尤其适用于Web开发,并可高效…

SQL 外连接

1 外连接 外连接是一种用于结合两个或多个表的方式,返回至少一个表中的所有记录。 左外连接 LEFT JOIN,左表为驱动表,右表为从表。返回驱动表的所有记录以及从表中的匹配记录。如果从表没有匹配,则结果中从表的部分为NULL。 右…