在Python中优雅地用多进程:进程池 Pool、管道通信 Pipe、队列通信 Queue、共享内存 Manager Value

news2024/12/23 6:08:24

Python 自带的多进程库 multiprocessing 可实现多进程。我想用这些短例子示范如何优雅地用多线程。中文网络上,有些人只是翻译了旧版的 Python 官网的多进程文档。而我这篇文章会额外讲一讲下方加粗部分的内容。

  • 创建进程 Process,fork 直接继承资源,所以初始化更快,spawn 只继承必要的资源,所以更省内存,「程序的入口」 if name == main
  • 进程池 Pool,Pool 只能接受一个参数,但有办法传入多个
  • 管道通信 Pipe,最基本的功能,运行速度快
  • 队列通信 Queue,有最常用的功能,运行速度稍慢
  • 共享内存 Manager Value,Python3.9 新特性 真正的共享内存 shared_memory

如下所示,中文网络上一些讲 Python 多进程的文章,很多重要的东西没讲(毕竟只是翻译了 Python 官网的多进程旧版文档)。上方的加粗部分他们没讲,但是这是做多进程总需要知道的内容。

  • 若你无法流畅阅读有专人更新的 Python 官网多进程英文文档 ,那么姑且可看写于 2019 不保证更新的南山南:一篇文章搞定 Python 多进程 (全)
  • 知我莫言:谈谈 python 的 GIL、多线程、多进程 ,点赞多但旧,写于 2016,你还不如看我下方写的「简述何为多线程 threading 与多进程 processing」

1.多线程与多进程的区别

多线程 threading: 一个人有与异性聊天和看剧两件事要做。单线程的她可以看完剧再去聊天,但这样子可能就没人陪她聊天了「哼,发消息不回」。我们把她看成一个 CPU 核心,为她开起多线程——先看一会剧,偶尔看看新消息,在两件事(线程)间来回切换。多线程:单个 CPU 核心可以同时做几件事,不至于卡在某一步傻等着。

用处:爬取网站信息(爬虫),等待多个用户输入

多进程 processing: 一个人有很多砖需要搬,他领取手套、推车各种物资(向系统申请了资源)然后开始搬砖。然而他身边有很多人,我们让这些人去帮他!(一核有难,八核围观)。于是他们做了分工,砖很快就搬完了。多进程让多个 CPU 核心可以一起做事,不至于只有一人干活而其他人傻站着。

用处:进行高性能计算。只有多进程方案设计合理,才能加速计算。

2. 全局锁与多进程

为何在 Python 里用多进程这么麻烦? 因为 Python 的线程是操作系统线程,因此要有 Python 全局解释器锁。一个 python 解释器进程内有一条主线程,以及多条用户程序的执行线程。即使在多核 CPU 平台上,由于 GIL 的存在,所以禁止多线程的并行执行。——来自百度百科词条 全局解释器锁。发展历程:

  1. Python 全局锁。Python 3.2 的时候更新过 GIL。在我小时候,由于 Python GIL 的存在(全局解释器锁 Global Interpreter Lock) ,此时 Python 无法靠自己实现多进程
  2. 外部多进程通信。Python3.5。在 2015 年,要么用 Python 调用 C 语言(如 Numpy 此类用其他语言在底层实现多进程的第三方库),要么需要在外部代码(MPI 2015)
  3. 内置多进程通信。Python 3.6 才让 multiprocessing 逐渐发展成一个能用的 Python 内置多进程库,可以进行进程间的通信,以及有限的内存共享
  4. 共享内存。Python 3.8 在 2019 年增加了新特性 shared_memory

3.子进程 Process

多进程的主进程一定要写在程序入口 if name ==‘main’: 内部

def function1(id):  # 这里是子进程
    print(f'id {id}')

def run__process():  # 这里是主进程
    from multiprocessing import Process
    process = [mp.Process(target=function1, args=(1,)),
               mp.Process(target=function1, args=(2,)), ]
    [p.start() for p in process]  # 开启了两个进程
    [p.join() for p in process]   # 等待两个进程依次结束

# run__process()  # 主线程不建议写在 if外部。由于这里的例子很简单,你强行这么做可能不会报错
if __name__ =='__main__':
    run__process()  # 正确做法:主线程只能写在 if内部

尽管在这个简单的例子里,把主进程 run__process() 写在程序入口 if 外部不会有报错。但是你最好还是按我要求去做。详细解释的内容过长,我写在→「Python 程序入口有重要功能(多线程)而非编程习惯」

上面的例子只是用 Process 开启了多进程,不涉及进程通信。当我准备把一个串行任务编排成多进程时,我还需要多进程通信。进程池 Pool 可以让主程序获得子进程的计算结果(不太灵活,适合简单任务),管道 Pipe 队列 Queue 等等 可以让进程之间进行通信(足够灵活)。共享值 Value 共享数组 Array 共享内容 shared_memory(Python 3.6 Python3.9 的新特性,还不太成熟)下面开讲。

Python 多进程可以选择两种创建进程的方式,spawn 与 fork。分支创建:fork 会直接复制一份自己给子进程运行,并把自己所有资源的 handle 都让子进程继承,因而创建速度很快,但更占用内存资源。分产创建:spawn 只会把必要的资源的 handle 交给子进程,因此创建速度稍慢。详细解释请看 Stack OverFlow multiprocessing fork vs spawn 。(分产 spawn 是我自己随便翻译的,有更好的翻译请推荐。我绝不把 handle 翻译成句柄)

multiprocessing.set_start_method('spawn')  # default on WinOS or MacOS
multiprocessing.set_start_method('fork')   # default on Linux (UnixOS)

请注意:我说 分支 fork 在初始化创建多进程的时候比 分产 spawn 快,而不是说高性能计算会比较快。通常高性能计算需要让程序运行很久,因此为了节省内存以及进程安全,我建议选择 spawn。

4.进程池 Pool

几乎 Python 多进程代码都需要你明明白白地调用 Process。而进程池 Pool 会自动帮我们管理子进程。Python 的 Pool 不方便传入多个参数,我这里提供两个解决思路:

思路 1:函数 func2 需要传入多个参数,现在把它改成一个参数,无论你直接让 args 作为一个元组 tuple、词典 dict、类 class 都可以

思路 2:使用 function.partial Passing multiple parameters to pool.map() function in Python。这个不灵活的方法固定了其他参数,且需要导入 Python 的内置库,我不推荐

import time

def func2(args):  # multiple parameters (arguments)
    # x, y = args
    x = args[0]  # write in this way, easier to locate errors
    y = args[1]  # write in this way, easier to locate errors

    time.sleep(1)  # pretend it is a time-consuming operation
    return x - y


def run__pool():  # main process
    from multiprocessing import Pool

    cpu_worker_num = 3
    process_args = [(1, 1), (9, 9), (4, 4), (3, 3), ]

    print(f'| inputs:  {process_args}')
    start_time = time.time()
    with Pool(cpu_worker_num) as p:
        outputs = p.map(func2, process_args)
    print(f'| outputs: {outputs}    TimeUsed: {time.time() - start_time:.1f}    \n')

    '''Another way (I don't recommend)
    Using 'functions.partial'. See https://stackoverflow.com/a/25553970/9293137
    from functools import partial
    # from functools import partial
    # pool.map(partial(f, a, b), iterable)
    '''

if __name__ =='__main__':
    run__pool()

5.管道 Pipe

顾名思义,管道 Pipe 有两端,因而 main_conn, child_conn = Pipe() ,管道的两端可以放在主进程或子进程内,我在实验中没发现主管道口 main_conn 和子管道口 child_conn 的区别。两端可以同时放进去东西,放进去的对象都经过了深拷贝:用 conn.send() 在一端放入,用 conn.recv() 另一端取出,管道的两端可以同时给多个进程。conn 是 connect 的缩写。

import time

def func_pipe1(conn, p_id):
    print(p_id)

    time.sleep(0.1)
    conn.send(f'{p_id}_send1')
    print(p_id, 'send1')

    time.sleep(0.1)
    conn.send(f'{p_id}_send2')
    print(p_id, 'send2')

    time.sleep(0.1)
    rec = conn.recv()
    print(p_id, 'recv', rec)

    time.sleep(0.1)
    rec = conn.recv()
    print(p_id, 'recv', rec)


def func_pipe2(conn, p_id):
    print(p_id)

    time.sleep(0.1)
    conn.send(p_id)
    print(p_id, 'send')
    time.sleep(0.1)
    rec = conn.recv()
    print(p_id, 'recv', rec)


def run__pipe():
    from multiprocessing import Process, Pipe

    conn1, conn2 = Pipe()

    process = [Process(target=func_pipe1, args=(conn1, 'I1')),
               Process(target=func_pipe2, args=(conn2, 'I2')),
               Process(target=func_pipe2, args=(conn2, 'I3')), ]

    [p.start() for p in process]
    print('| Main', 'send')
    conn1.send(None)
    print('| Main', conn2.recv())
    [p.join() for p in process]

if __name__ =='__main__':
    run__pipe()

如果追求运行更快,那么最好使用管道 Pipe 而非下面介绍的队列 Queue,详细请移步 Python pipes and queues performance ↓

So yes, pipes are faster than queues - but only by 1.5 to 2 times, what did surprise me was that Python 3 is MUCH slower than Python 2 - most other tests I have done have been a bit up and down (as long as it is Python 3.4 - Python 3.2 seems to be a bit of a dog - especially for memory usage).

曾经用到 Python 多线程队列功能写过一个实际例子 ↓,若追求极致性能,还可以把里面的 Queue 改为 Pipe。

Pipe 还有 duplex 参数poll() 方法 需要了解。默认情况下 duplex==True,若不开启双向管道,那么传数据的方向只能 conn1 ← conn2 。conn2.poll()==True 意味着可以马上使用 conn2.recv() 拿到传过来的数据。conn2.poll(n) 会让它等待 n 秒钟再进行查询。

from multiprocessing import Pipe

conn1, conn2 = Pipe(duplex=True)  # 开启双向管道,管道两端都能存取数据。默认开启
# 
conn1.send('A')
print(conn1.poll())  # 会print出 False,因为没有东西等待conn1去接收
print(conn2.poll())  # 会print出 True ,因为conn1 send 了个 'A' 等着conn2 去接收
print(conn2.recv(), conn2.poll(2))  # 会等待2秒钟再开始查询,然后print出 'A False'

尽管我下面的例子不会报错,但这是因为它过于简单,没有真的开多线程去跑,也没有写在程序入口的 if 内部。很多时候 Pipe 运行会快一点,但是它的功能太少了,得用 Queue。最明显的一个区别是:

conn1, conn2 = multiprocessing.Pipe()  # 管道有两端,某一端放入的东西,只能在另一端拿到
queue = multiprocessing.Queue()        # 队列只有一个,放进去的东西可以在任何地方拿到。

6. 队列 Queue

可以 import queue 调用 Python 内置的队列,在多线程里也有队列 from multiprocessing import Queue。下面提及的都是多线程的队列。

队列 Queue 的功能与前面的管道 Pipe 非常相似:无论主进程或子进程,都能访问到队列,放进去的对象都经过了深拷贝。不同的是:管道 Pipe 只有两个断开,而队列 Queue 有基本的队列属性,更加灵活,详细请移步 Stack Overflow Multiprocessing - Pipe vs Queue。

def func1(i):
    time.sleep(1)
    print(f'args {i}')

def run__queue():
    from multiprocessing import Process, Queue

    queue = Queue(maxsize=4)  # the following attribute can call in anywhere
    queue.put(True)
    queue.put([0, None, object])  # you can put deepcopy thing
    queue.qsize()  # the length of queue
    print(queue.get())  # First In First Out
    print(queue.get())  # First In First Out
    queue.qsize()  # the length of queue

    process = [Process(target=func1, args=(queue,)),
               Process(target=func1, args=(queue,)), ]
    [p.start() for p in process]
    [p.join() for p in process]

if __name__ =='__main__':
    run__queue()

除了上面提及的 Python 多线程,读取多个 (海康 \ 大华) 网络摄像头的视频流 ,我自己写的开源的强化学习库:小雅 ElegantRL 也使用了 Queue 进行多 CPU 多 GPU 训练,为了提速,我已经把 Queue 改为 Pipe。

7. 共享内存 Manager

为了在 Python 里面实现多进程通信,上面提及的 Pipe Queue 把需要通信的信息从内存里深拷贝了一份给其他线程使用(需要分发的线程越多,其占用的内存越多)。而共享内存会由解释器负责维护一块共享内存(而不用深拷贝),这块内存每个进程都能读取到,读写的时候遵守管理(因此不要以为用了共享内存就一定变快)。

Manager 可以创建一块共享的内存区域,但是存入其中的数据需要按照特定的格式,Value 可以保存数值,Array 可以保存数组,如下。这里不推荐认为自己写代码能力弱的人尝试。下面这里例子来自 Python 官网的 Document。

# https://docs.python.org/3/library/multiprocessing.html?highlight=multiprocessing%20array#multiprocessing.Array

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

我删掉了 Python 3.8 的 shared_momery 介绍,这部分有 Bug

下文来自 Stack Overflow,问题 Shared memory in multiprocessing 下 thuzhf 的回答 2021-01 :

For those interested in using Python3.8 's shared_memory module, it still has a bug which hasn’t been fixed and is affecting Python3.8/3.9/3.10 by now (2021-01-15). The bug is about resource tracker destroys shared memory segments when other processes should still have valid access. So take care if you use it in your code.

PyTorch 也有自带的多进程 torch.multiprocessing

How to share a list of tensors in PyTorch multiprocessing? rozyang 的回答 ,非常简单,核心代码如下:

import torch.multiprocessing as mp
tensor.share_memory_()

正文已经结束,我把部分 multiprocessing 的代码都放在 github。希望大家能写出让自己满意的多线程。我设计高性能的多进程时,会遵守以下规则:

  • 尽可能少传一点数据
  • 尽可能减少主线程的负担
  • 尽可能不让某个进程傻等着
  • 尽可能减少进程间通信的频率

开源的深度强化学习 (DRL) 算法库 伯克利的 Ray-project Rllib 训练快,但太复杂,OpenAI 的 SpinningUp 简单,但不快。刚好我又懂一点多进程、Numpy、深度学习框架、深度强化学习这些双层优化算法,所以我觉得自己也写一个 DRL 库难度不大,于是开源了强化学习库:小雅 ElegantRL。让别人好好看看,DRL 库挺简单的一个东西弄那么复杂做什么?

尽管这个库会一直保持框架小巧、代码优雅来方便入门深度强化学习的人,但 ElegantRL 却把训练效率放在首位(正因如此,ElegantRL 与 SpinningUp 的定位不同),所以我需要用 Python 的多进程来加速 DRL 的训练。因而顺便写【在 Python 中优雅地用多进程】这篇东西。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/754821.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

zigbee DL-20无线串口收发模块使用(双车通讯,电赛模块推荐)

前言 (1)通常有时候,我们可能会需要让两个MCU进行通讯。而zigbee是最适合两个MCU短距离通讯的模块。他使用极其简单,非常适合两款MCU之间的进行数据交互。 (2)在各类比赛中,经常出现需要两个MCU…

独立看门狗 IWDG

独立看门狗介绍 Q:什么是看门狗? A:可以理解为对于一只修勾的定时投喂,如果不给它吃东西就会狂叫,因此可以通过观察修勾的状态来判断喂它的人有没有正常工作。 在由单片机构成的微型计算机系统中,由于单…

【业务功能篇44】Mysql 海量数据查询优化,进行分区操作

业务场景:当前有个发料表,随着业务数据量增多,达到了几千万级别水平,查询的效率就越来越低了,针对当前的架构情况,我们进行了分区的设置,通过对时间字段,按年月,一个月作…

ios 启动页storyboard 使用记录

本文简单记录ios启动页storyboard 如何使用和注意事项。 xcode窗口简介 以xcode14为例,新建项目如下图,左边文件栏中的LaunchScreen.storyboard 为默认启动页布局。窗口中间部分是storyboard中的组件列表,右侧为预览,可以看到渲…

H3C-Cloud Lab-实验-DHCP实验

实验拓扑图: 实验需求: 1、按照图示为R1配置IP地址 2、配置R1为DHCP服务器,提供服务的地址池为192.168.1.0/24网段,网关为192.168.1.254,DNS服务器地址为202.103.24.68,202.103.0.117 3、192.168.1.10-1…

Camtasia Studio 2023 最新中文版,camtasiaStudio如何添加背景音乐

Camtasia2023的视频编辑工具可以帮助用户剪辑、裁剪、旋转、调整大小、添加特效、混合音频等。用户还可以使用Camtasia2023的字幕功能添加字幕和注释,以及使用其内置的特效和转场来提高视频的视觉效果。 Camtasia Studio 2023新功能介绍 的光标增强 由于光标在屏幕…

解决win10电脑无法访问局域网内其它共享文件问题

问题描述 今天需要上传文件到一个共享的局域网文件夹里,在我的电脑和浏览器访问//192.168.0.16//public都提升访问受限,开始以为是因为用户没授权,后来一般沟通后,发现其它电脑都能正常访问的,所以确定是自己电脑配置…

Caerulein,17650-98-5,雨蛙肽,以三氟醋酸盐形式提供的十肽分子

资料编辑|陕西新研博美生物科技有限公司小编MISSwu​ Caerulein |雨蛙素,雨蛙肽,蓝肽| CAS:17650-98-5 | 纯度:95% ------雨蛙素结构式---- ----试剂参数信息--- CAS号:17650-98-5 外观(Appearance&a…

java中使用POI生成Excel并导出

注:本文章中代码均为本地Demo版本,若后续代码更新将不会更新文章 需求说明及实现方式 根据从数据库查询出的数据,将其写入excel表并导出 我的想法是通过在实体属性上写自定义注解的方式去完成。因为我们在代码中可以通过反射的方式去获取实体…

js小写金额转大写 自动转换

// 小写转为大写convertCurrency(money) {var cnNums [零, 壹, 贰, 叁, 肆, 伍, 陆, 柒, 捌, 玖]var cnIntRadice [, 拾, 佰, 仟]var cnIntUnits [, 万, 亿, 兆]var cnDecUnits [角, 分, 毫, 厘]// var cnInteger 整var cnIntLast 元var maxNum 999999999999999.9999var…

vulnhub靶场red:1教程

靶场搭建 靶机下载地址:Red: 1 ~ VulnHub 难度:中等 信息收集 arp-scan -l 这里没截图忘记了,就只是发现主机 扫描端口 nmap --min-rate 1000 -p- 192.168.21.130 nmap -sT -sV -sC -O -p22,80 192.168.21.130 先看80端口 看到链接点一…

怎么又快又准的确定业务系统属于等保几级?

等保2.0政策已经落地严格执行了一段时间,但大家对于等保政策还有很多不清楚。这不不少人在问,怎么又快有准的确定业务系统属于等保几级? 怎么又快又准的确定业务系统属于等保几级? 【回答】:根据《信息安全等级保护管…

AtcoderABC255场

A - You should output ARC, though this is ABC.A - You should output ARC, though this is ABC. 题目大意 给定整数R和C以及一个2x2矩阵A,需要输出A R,C的值。 思路分析 简单的矩阵查找。根据给定的索引R和C,找到矩阵A中相应位置的元素&#xff0c…

实例014 OutLook界面

实例说明 程序主界面包括菜单栏、工具栏、状态栏和树状视图。OutLook界面美观、友好,是一个很实用的程序主界面,并且菜单栏和工具栏是可移动的。运行本例效果如图1.14所示。 图1.14 Out Look界面 技术要点 一般程序的菜单栏和工具栏是不可移动的&…

【Ajax】笔记-服务端响应JSON数据

服务端响应JSON数据 构建测试案例 键盘按键触发请求服务端&#xff1a; 键盘按下触发事件 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width,…

项目中前端如何实现无感刷新 token!

场景&#xff1a;线上平台有时会出现用户正在使用的时候&#xff0c;突然要用户去进行登录&#xff0c;这样会造成很不好的用户体验。 1.请求采用的是axios 2.平台的采用的 JWT(JSON Web Tokens) 进行用户登录鉴权。 原因&#xff1a; 1.突然跳转到登录页面&#xff0c;是…

【IVI】EVS 应用

EVS 应用 1、EVS启动2、EvsStateControl.cpp 控制管理2.1 EvsStateControl初始化2.2 EvsVehicleListener.h唤起处理EvsStateControl::updateLoop() 3、EVS 应用逻辑流程 android12-release 增强型视觉系统 (EVS) 1、EVS启动 Android 包含与 EVS 管理器和车载 HAL 通信的 EVS 应…

CAD2021安装教程适合新手小白【附安装包和手册】

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、下载文件二、使用步骤1.安装软件前&#xff0c;断开电脑网络&#xff08;拔掉网线、关闭WIFI&#xff09;2、鼠标右击【AutoCAD2021(64bit)】压缩包选择【解…

解密:GPT-4框架与训练过程,数据集组成,并行性的策略,专家权衡,推理权衡等细节内容

大家好&#xff0c;我是微学AI&#xff0c;今天给大家解密一下GPT-4框架与训练过程&#xff0c;数据集组成&#xff0c;并行性的策略&#xff0c;专家权衡&#xff0c;推理权衡等细节内容。2023年3月14日&#xff0c;OpenAI发布GPT-4&#xff0c;然而GPT-4的框架没有公开&#…

Nacos服务注册和配置中心(Config,Eureka,Bus)2

Nacos数据模型 Nacos领域模型,Namespace命名空间、Group分组、集群这些都是为了进行归类管理&#xff0c;把服务和配置文件进行归类&#xff0c;归类之后就可以实现一定的效果&#xff0c;比如隔离。对于服务来说&#xff0c;不同命名空间中的服务不能够互相访问调用 N…