由浅入深掌握 Python 进程间通信的各类方式

news2024/11/17 6:22:55

由浅入深掌握 Python 多进程间通信各类方式

  • 1、为什么要掌握进程间通信
  • 2、进程间各类通信方式简介
  • 3、消息机制通信
    • 1) 管道 Pipe 通信方式
    • 2) 消息队列Queue 通信方式
  • 4、同步机制通信
      • (1) 进程间同步锁 – Lock
      • (2) 子进程间协调机制 -- Event
  • 5、共享内存方式通信
      • (1) 共享变量
      • (2) 共享内存 Shared_memory
      • 3) ShareableList 共享列表
  • 6、共享内存管理器Manager
      • 1) Manager的主要数据结构
      • 2) 使用方法:
      • 3) 销毁共享内存变量
      • 4) 向管理器注册自定义类型

1、为什么要掌握进程间通信

python的多线程代码效率由于受制于GIL,不能利用多核CPU来加速,而多进程方式可以绕过GIL, 发挥多CPU加速的优势,能够明显提高程序的性能。
在这里插入图片描述

但进程间通信却是不得不考虑的问题。 进程不同于线程,有自己的独立内存空间,不能使用全局变量在进程间传递数据。

实际项目需求中,常常有密集计算、或实时性任务,进程之间有时需要传递大量数据,如图片数据、对象等,传递数据如果通过文件序列化、或网络接口来进行,难以满足实时性要求,采用redis,或者kaffka, rabbitMQ 之第3方消息队列包,令系统复杂化了。而Python Multiprocessing 模块本身就提供了消息机制、同步机制、共享内存等各种非常高效的进程间通信方式。

了解python进程间通信的各类方式,以及安全机制,可以帮助大幅提升程序运行性能。

2、进程间各类通信方式简介

进程间通信的主要方式总结如下在这里插入图片描述

关于进程间通信的内存安全
内存安全意味着,多进程间可能会因同抢,意外销毁等原因造成共享变量异常。
Multiprocessing 模块提供的Queue, Pipe, Lock, Event 对象,都已实现了进程间通信安全机制。
采用共享内存方式通信,需要在代码中自已来跟踪、销毁这些共享内存变量,否则可能会出同抢、未正常销毁等。造成系统异常。 除非开发者很清楚共享内存使用特点,否则不建议直接使用此共享内存,而是通过Manager管理器来使用共享内存。

内存管理器Manager
Multiprocessing提供了内存管理器Manager类,可统一解决进程通信的内存安全问题,可以将各种共享数据加入管理器,包括 list, dict, Queue, Lock, Event, Shared Memory 等,由其统一跟踪与销毁。

3、消息机制通信

1) 管道 Pipe 通信方式

类似于1上简单的socket通道,双端均可收发消息。
Pipe() 构建方法:
parent_conn, child_conn = Pipe(duplex=True/False)
• duplex=True, 可以双向通讯
• duplex=True,只有child_conn可以发,parent_conn只能收。

实现过程,见下例:

from multiprocessing import Process, Pipe
   def myfunction(conn):
      conn.send(['hi!! I am Python'])
      conn.close()

if __name__ == '__main__':
      parent_conn, child_conn = Pipe()
      p = Process(target=myfunction, args=(child_conn,))
      p.start()
  	print (parent_conn.recv() )
	p.join()

2) 消息队列Queue 通信方式

Multiprocessing的Queue 类,是在python queue 3.0版本上修改的, 可以很容易实现生产者 – 消息者间传递数据,而且Multiprocessing的Queue 模块实现了lock安全机制。
在这里插入图片描述
Queue模块共提供了3种类型的队列。
LIFO就是堆栈。
(1) FIFO queue , 先进先出,

class queue.Queue(maxsize=0)

(2) LIFO queue, 后进先出

class queue.LifoQueue(maxsize=0)

(3) 带优先级队列, 优先级最低entry value lowest 先了列

class queue.PriorityQueue(maxsize=0)

Multiprocessing.Queue类的主要方法:

  • queue.qsize() # 返回队列长度
  • queue.full() queue.empty()
  • queue.put(item) # 将数据写入队列
  • queue.get() # 将数据抛出队列 ,
  • queue.put_nowait(item), queue.get_nowait() # 无等待写入或抛出

说明:

  • put(), get() 是阻塞方法, 而put_notwait(), get_nowait()是非阻塞方法。
  • Multiprocessing 的Queue类没有提供Task_done, join方法

Queue模块的其它队列类:
(1) SimpleQueue
简洁版的FIFO队列, 适事简单场景使用
(2) JoinableQueue子类
Queue的子类,具备task_done, join()方法
task_done()表示,最近读出的1个任务已经完成。
join()阻塞队列,直到queue中的所有任务都已完成。

producer – consumer 场景使用Queue的示例 :

import multiprocessing

def producer(numbers, q):
    for x in numbers:
        if x % 2 == 0:
            if q.full():
                print("queue is full")
                break
            q.put(x)
            print(f"put {x} in queue by producer")
    return None

def consumer(q):
    while not q.empty():
        print(f"take data {q.get()} from queue by consumer")
    return None

if __name__ == "__main__":
    # 设置1个queue对象,最大长度为5
    qu = multiprocessing.Queue(maxsize=5,) 

    # 创建producer子进程,把queue做为其中1个参数传给它,该进程负责写
    p5 = multiprocessing.Process(
        name="producer-1",
        target=producer,
        args=([random.randint(1, 100) for i in range(0, 10)], qu)
    )
    p5.start()
    p5.join()
    #创建consumer子进程,把queue做为1个参数传给它,该进程中队列中读
    p6 = multiprocessing.Process(
        name="consumer-1",
        target=consumer,
        args=(qu,)
    )
    p6.start()
    p6.join()

    print(qu.qsize())

4、同步机制通信

(1) 进程间同步锁 – Lock

Multiprocessing也提供了与threading 类似的同步锁机制,确保某个时刻只有1个子进程可以访问某个资源或执行某项任务, 以避免同抢。
例如:多个子进程同时访问数据库表时,如果没有同步锁,用户A修改1条数据后,还未提交,此时,用户B也进行了修改,可以预见,用户A提交的将是B个修改的数据。
加了同步锁,可以确保同时只有1个子进程能够进行写入数据库与提交操作。

下面示例,同时只有1个进程可以执行打印操作。

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

(2) 子进程间协调机制 – Event

Event 机制的工作原理:
1个event 对象实例管理着1个 flag标记, 可以用set()方法将其置为true, 用clear()方法将其置为false, 使用wait()将阻塞当前子进程,直至flag被置为true.
这样由1个进程通过event flag 就可以控制、协调各子进程运行。

Event object的使用方法:
1)主函数: 创建1个event 对象, flag = multiprocessing.Event() , 做为参数传给各子进程
2) 子进程A: 不受event影响,通过event 控制其它进程的运行
o 先clear(),将event 置为False, 占用运行权.
o 完成工作后,用set()把flag置为True。
3) 子进程B, C: 受event 影响
o 设置 wait() 状态,暂停运行
o 直到flag重新变为True,恢复运行

主要方法:
 set(), clear()设置 True/False,
 wait() 使进程等待,直到flag被改为true.
 is_set(), Return True if and only if the internal flag is true.

验证进程间通信 – Event

import multiprocessing
import time
import random

def joo_a(q, ev):
    print("subprocess joo_a start")
    if not ev.is_set():
        ev.wait()
    q.put(random.randint(1, 100))
    print("subprocess joo_a ended")

def joo_b(q, ev):
    print("subprocess joo_b start")
    ev.clear()
    time.sleep(2)
    q.put(random.randint(200, 300))
    ev.set()
    print("subprocess joo_b ended")

def main_event():
    qu = multiprocessing.Queue()
    ev = multiprocessing.Event()
    sub_a = multiprocessing.Process(target=joo_a, args=(qu, ev))
    sub_b = multiprocessing.Process(target=joo_b, args=(qu, ev,))
    sub_a.start()
    sub_b.start()
    # ev.set()
    sub_a.join()
    sub_b.join()
    while not qu.empty():
        print(qu.get())

if __name__ == "__main__":
    main_event()

5、共享内存方式通信

(1) 共享变量

子进程之间共存内存变量,要用 multiprocessing.Value(), Array() 来定义变量。 实际上是ctypes 类型,由multiprocessing.sharedctypes模块提供相关功能

注意 使用 share memory 要考虑同抢等问题,释放等问题,需要手工实现。因此在使用共享变量时,建议使用Manager管程来管理这些共享变量。

def  func(num):
    num.value=10.78   #子进程改变数值的值,主进程跟着改变
 
if  __name__=="__main__":
num = multiprocessing.Value("d", 10.0) 
# d表示数值,主进程与子进程可共享这个变量。

    p=multiprocessing.Process(target=func,args=(num,))
    p.start()
    p.join()
 
    print(num.value)

进程之间共享数据(数组型):

import multiprocessing
 
def  func(num):
    num[2]=9999   #子进程改变数组,主进程跟着改变
 
if  __name__=="__main__":
    num=multiprocessing.Array("i",[1,2,3,4,5])   

    p=multiprocessing.Process(target=func,args=(num,))
    p.start() 
    p.join()
 
    print(num[:])

(2) 共享内存 Shared_memory

如果进程间需要共享对象数据,或共享内容,数据较大,multiprocessing 提供了SharedMemory类来实现进程间实时通信,不需要通过发消息,读写磁盘文件来实现,速度更快。
注意:直接使用SharedMemory 存在着同抢、泄露隐患,应通过SharedMemory Manager 管程类来使用, 以确保内存安全。

创建共享内存区:

multiprocessing.shared_memory.SharedMemory(name=none, create=False, size=0)

方法:
父进程创建shared_memory 后,子进程可以使用它,当不再需要后,使用close(), 删除使用unlink()方法
相关属性:
获取内存区内容: shm.buf
获取内存区名称: shm.name
获取内存区字节数: shm.size

示例:

>>> from multiprocessing import shared_memory
>>> shm_a = shared_memory.SharedMemory(create=True, size=10)
>>> type(shm_a.buf)
<class 'memoryview'>
>>> buffer = shm_a.buf
>>> len(buffer)
10
>>> buffer[:4] = bytearray([22, 33, 44, 55])  # Modify multiple at once
>>> buffer[4] = 100                           # Modify single byte at a time
>>> # Attach to an existing shared memory block
>>> shm_b = shared_memory.SharedMemory(shm_a.name)
>>> import array
>>> array.array('b', shm_b.buf[:5])  # Copy the data into a new array.array
array('b', [22, 33, 44, 55, 100])
>>> shm_b.buf[:5] = b'howdy'  # Modify via shm_b using bytes
>>> bytes(shm_a.buf[:5])      # Access via shm_a
b'howdy'
>>> shm_b.close()   # Close each SharedMemory instance
>>> shm_a.close()
>>> shm_a.unlink()  # Call unlink only once to release the shared memory

3) ShareableList 共享列表

sharedMemory类还提供了1个共享列表类型,这样就更方便了,进程间可以直接共享python强大的列表
构建方法:
multiprocessing.shared_memory.ShareableList(sequence=None, *, name=None)


from multiprocessing import shared_memory
>>> a = shared_memory.ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True, 42])
>>> [ type(entry) for entry in a ]
[<class 'str'>, <class 'bytes'>, <class 'float'>, <class 'int'>, <class 'NoneType'>, <class 'bool'>, <class 'int'>]
>>> a[2]
-273.154
>>> a[2] = -78.5
>>> a[2]
-78.5
>>> a[2] = 'dry ice'  # Changing data types is supported as well
>>> a[2]
'dry ice'
>>> a[2] = 'larger than previously allocated storage space'
Traceback (most recent call last):
  ...
ValueError: exceeds available storage for existing str
>>> a[2]
'dry ice'
>>> len(a)
7
>>> a.index(42)
6
>>> a.count(b'howdy')
0
>>> a.count(b'HoWdY')
1
>>> a.shm.close()
>>> a.shm.unlink()
>>> del a  # Use of a ShareableList after call to unlink() is unsupported


b = shared_memory.ShareableList(range(5))         # In a first process
>>> c = shared_memory.ShareableList(name=b.shm.name)  # In a second process
>>> c
ShareableList([0, 1, 2, 3, 4], name='...')
>>> c[-1] = -999
>>> b[-1]
-999
>>> b.shm.close()
>>> c.shm.close()
>>> c.shm.unlink()

6、共享内存管理器Manager

Multiprocessing 提供了 Manager 内存管理器类,当调用1个Manager实例对象的start()方法时,会创建1个manager进程,其唯一目的就是管理共享内存, 避免出现进程间共享数据不同步,内存泄露等现象。

其原理如下: 在这里插入图片描述
Manager管理器相当于提供了1个共享内存的服务,不仅可以被主进程创建的多个子进程使用,还可以被其它进程访问,甚至跨网络访问。本文仅聚焦于由单一主进程创建的各进程之间的通信。

1) Manager的主要数据结构

相关类:multiprocessing.Manager
子类有:

  • multiprocessing.managers.SharedMemoryManager
  • multiprocessing.managers.BaseManager

支持共享变量类型:

  • python基本类型 int, str, list, tuple, list
  • 进程通信对象: Queue, Lock, Event,
  • Condition, Semaphore, Barrier ctypes类型: Value, Array

2) 使用方法:

1)创建管理器对象
snm = Manager()
snm = SharedMemoryManager()

2)创建共享内存变量
新建list, dict
sl = snm.list(), snm.dict()

新建1块bytes共享内存变量,需要指定大小
sx = snm.SharedMemory(size)

新建1个共享列表变量,可用列表来初始化
sl = snm.ShareableList(sequence) 如
sl = smm.ShareableList([‘howdy’, b’HoWdY’, -273.154, 100, True])

新建1个queue, 使用multiprocessing 的Queue类型
snm = Manager()
q = snm.Queue()

示例 :

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

将打印

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

3) 销毁共享内存变量

方法1: 调用snm.shutdown()方法,会自动调用每个内存块的unlink()方法释放内存。或者 snm.close()
方法2: 使用with语句,结束后会自动释放所有manager变量

>>> with SharedMemoryManager() as smm:
...     sl = smm.ShareableList(range(2000))
...     # Divide the work among two processes, storing partial results in sl
...     p1 = Process(target=do_work, args=(sl, 0, 1000))
...     p2 = Process(target=do_work, args=(sl, 1000, 2000))
...     p1.start()
...     p2.start()  # A multiprocessing.Pool might be more efficient
...     p1.join()
...     p2.join()   # Wait for all work to complete in both processes
...     total_result = sum(sl)  # Consolidate the partial results now in sl

4) 向管理器注册自定义类型

managers的子类BaseManager提供register()方法,支持注册自定义数据类型。如下例,注册1个自定义MathsClass类,并生成实例。

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))  

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/354674.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Bluetooth开发】蓝牙开发入门

BLE 蓝牙设备在生活中无处不在&#xff0c;但是我们也只是将其作为蓝牙模块进行使用&#xff0c;发送简单的AT命令实现数据收发。 那么&#xff0c;像对于一些复杂的使用场合&#xff1a;“车载蓝牙”、"智能手表"、“蓝牙音箱”等&#xff0c;我们不得不去了解底层…

千锋教育+计算机四级网络-计算机网络学习-04

UDP概述 UDP协议 面向无连接的用户数据报协议&#xff0c;在传输数据前不需要先建立连接&#xff1b;目地主机的运输层收到UDP报文后&#xff0c;不需要给出任何确认 UDP特点 相比TCP速度稍快些简单的请求/应答应用程序可以使用UDP对于海量数据传输不应该使用UDP广播和多播应用…

VectorDraw Web Library 10.1003.0.1 Crack

将 CAD 绘图和矢量对象显示添加到您的 HTML5 应用程序。 VectorDraw Web Library 是一个矢量图形库&#xff0c;旨在不仅可以打开 CAD 绘图&#xff0c;还可以在任何支持 HTML 5 标准的平台&#xff08;例如 Windows、Android、IOS 和 Linux&#xff09;上显示通用矢量对象。它…

MySQL 9:MySQL存储引擎

数据库存储引擎是数据库的底层软件组织&#xff0c;数据库管理系统&#xff08;DBMS&#xff09;使用数据引擎来创建、查询、更新和删除数据。不同的存储引擎提供不同的存储机制、索引技术、锁定级别等。 许多不同的数据库管理系统现在支持许多不同的数据引擎。 MySQL的核心是…

字符串匹配 - 模式预处理:KMP 算法(Knuth-Morris-Pratt)

Knuth-Morris-Pratt算法&#xff08;简称KMP&#xff09;是最常用的字符串匹配算法之一。算法简介如下算法解释主要来源于这里&#xff0c;但是通常很难阅读完全&#xff0c;我推荐你直接进入下一节 图例解释部分。我们来观察一下朴素的字符串匹配算法的操作过程。如下图&#…

Linux调试器gdb

本文已收录至《Linux知识与编程》专栏&#xff01; 作者&#xff1a;ARMCSKGT 演示环境&#xff1a;CentOS 7 ​ 目录 前言 正文 下载gdb 生成可调式文件 进入gdb gdb常用指令 查看代码 l 运行程序 r 断点设置 b 显示信息 info 查看断点 info b 删除断点 d …

CentOS7突然没法上网【Network 中wired 图标消失】

参考文章(七种办法)&#xff1a;CentOS 7 右上角网络连接图标消失,设置网络有线消失解决办法 正常图标消失&#xff0c;先在 终端命令 依次执行以下命令 service NetworkManager stop service network restart service NetworkManager start 一、问题真烦 CentOS7图形化界面安装…

Day893.MySQL 实例健康状态检测方法 -MySQL实战

MySQL 实例健康状态检测方法 Hi&#xff0c;我是阿昌&#xff0c;今天学习记录的是关于MySQL 实例健康状态检测方法的内容。 在一主一备的双 M 架构里&#xff0c;主备切换只需要把客户端流量切到备库&#xff1b;而在一主多从架构里&#xff0c;主备切换除了要把客户端流量切…

搭建企业级docker仓库—Harbor

一、简介 docker 官方提供的私有仓库 registry&#xff0c;用起来虽然简单 &#xff0c;但在管理的功能上存在不足。 Harbor是一个用于存储和分发Docker镜像的企业级Registry服务器&#xff0c;harbor使用的是官方的docker registry(v2命名是distribution)服务去完成。harbor在…

String是如何保证不变的?反射为什么可以改变String的值?

String是如何保证不变的&#xff1f;反射为什么可以改变String的值&#xff1f; 1. String字符串的源码分析 String 字符串到底能不能改变已经是老生常谈的问题了&#xff0c;但是在面试环节中&#xff0c;依然能够难住不少人。 下面我们根据 JDK1.8 版本下的String源码进行…

微信Hook逆向-获取登录二维码

文章目录前言一、打开Pc微信&#xff0c;切换到二维码界面二、解析当前二维码内容三、利用Cheat Enginer软件扫描二维码解析文本四、寻找静态偏移五.代码获取二维码网址前言 微信二维码可以Hook获取,也可以通过找到静态偏移的方式读取 提示&#xff1a;以下是本篇文章正文内容…

力扣(LeetCode)240. 搜索二维矩阵 II(C++)

题目描述 枚举 枚举整个矩阵&#xff0c;找到等于 target 的元素&#xff0c;则 return true &#xff0c;否则 return false。 class Solution { public:bool searchMatrix(vector<vector<int>>& matrix, int target) {int n matrix.size(), m matrix[0]…

DataWhale 大数据处理技术组队学习task2

三、Hadoop分布式文件系统 1. 产生背景 数据量越来越大&#xff0c;一台独立的计算机已经无法存储所有的数据---->将大规模的数据存储到成百上千的计算机中------为了解决数据管理以及维护极其繁琐与低效------>分布式文件系统 分布式文件系统是管理网络中跨多台计算机…

基于SSM框架的狼途汽车门店管理系统的设计与实现

基于SSM框架的狼途汽车门店管理系统的设计与实现 ✌全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取项目下载方式&#x1f345; 一、…

AutoCAD学习之基本操作学习笔记

AutoCAD学习 基本操作&#xff08;23.2.15~23.2.17&#xff09; CtrlN 新建一个CAD文档F7 删除格栅F3 对象捕捉&#xff08;很重要啊&#xff0c;如果一直开着&#xff0c;操作起来很费劲。&#xff09;&#xff0c;需要关掉&#xff0c;注意使用snipaste&#xff0c;会不停地…

QT 文件监视系统QFileSystemWatcher监视目录的改变directoryChanged和监视文件的改变fileChanged

QT 文件监视系统QFileSystemWatcher监视目录的改变相关操作说明mainwindow.hmainwindow.cpp调试结果相关操作说明 添加头文件 Header: #include qmake: QT core bool QFileSystemWatcher::addPath(const QString &path)如果路径存在&#xff0c;则会向文件系统监视器添…

Prometheus Docker安装及监控自身

前提环境&#xff1a; Docker环境 涉及参考文档&#xff1a; 安装Prometheus开始 Prometheusnode_exporter Agent组件 一、部署Prometheus 1、启动容器将文件拷贝出来 docker run -d prom/prometheus2、容器将文件拷贝出来 docker cp 容器ID:/usr/share/prometheus/conso…

深度学习笔记:误差反向传播(1)

1 计算图 计算图使用图&#xff08;由节点和边构成的图&#xff09;来表达算式。 如图&#xff0c;我们用节点代表运算符号&#xff0c;用边代表传入的参数&#xff0c;即可算出购买苹果和橘子的总价格。 2 计算图的局部计算 局部计算意味着每个节点只处理和其相关的运算&…

网页设计html期末大作业

网页设计html期末大作业网页设计期末大作业-自制网站大一期末作业&#xff0c;外卖网站设计网页设计期末大作业-精美商城-首页框架网页设计期末大作业-自制网站 有导航栏&#xff0c;轮播图&#xff0c;按钮均可点进去&#xff0c;如下图所示 点我下载资源》》》》 大一期末…

linux ubuntu查日志信息以及错误排查

目录 一、linux的日志文件 1、常用日志文件 2、其他日志文件 二、历史日志的查看 1、查看Logrotate的配置信息 2、查看日志配置 一、linux的日志文件 Linux系统中最有趣的(可能也是最重要的)目录之一是/var/log。根据文件系统层次结构标准&#xff0c;在系统中运行的大多数…