Python 多进程解析:Multiprocessing 高效并行处理的奥秘

news2024/9/25 10:32:18

Python 多进程解析:Multiprocessing 高效并行处理的奥秘

文章目录

  • Python 多进程解析:Multiprocessing 高效并行处理的奥秘
      • 一 多进程
        • 1 导入进程标准模块
        • 2 定义调用函数
        • 3 创建和启动进程
      • 二 存储进程结果 Queue
      • 三 threading & multiprocessing 对比
        • 1 创建多进程 multiprocessing
        • 2 创建多线程 multithread
        • 3 创建普通函数
        • 4 创建对比时间函数
        • 5 运行结果
      • 四 进程池 Pool
        • 1 进程池 Pool() 和 map()
        • 2 自定义核数量
        • 3 apply_async 单结果返回
        • 4 apply_async 多结果返回
        • 5 划重点
      • 五 共享内存 shared memory
      • 六 进程锁 Lock
        • 1 不加进程锁
        • 2 加进程锁
      • 七 完整代码示例
      • 八 源码地址

在 Python 编程中,多进程(Multiprocessing)是一种提高程序执行效率的重要手段。本文深入解析了多进程的概念与应用,帮助开发者充分利用多核处理器的计算能力。我们从基本的进程创建与启动开始,讲解了如何通过 Queue 实现进程间的数据传递,并通过对比多进程与多线程的性能差异,揭示了多进程在处理 CPU 密集型任务时的显著优势。文章还详细介绍了进程池(Pool)的使用方法,包括 mapapply_async 的不同应用场景。最后,我们探讨了共享内存和进程锁的使用,确保多进程在并发操作中的数据安全性。本文为希望掌握多进程编程的读者提供了全面且易懂的实践指导。

一 多进程

Multiprocessing 是一种编程和执行模式,它允许多个进程同时运行,以此提高应用程序的效率和性能。在 Python 中,multiprocessing 模块可以帮助你创建多个进程,使得每个进程都可以并行处理任务,从而有效利用多核处理器的能力。

1 导入进程标准模块
import multiprocessing as mp
2 定义调用函数
def job(a, d):
    print('你好 世界')
3 创建和启动进程
# 创建进程
p1 = mp.Process(target=job, args=(1, 2))
# 启动进程
p1.start()
# 连接进程
p1.join()

二 存储进程结果 Queue

1 存入输出到 Queue

# 该函数没有返回值!!!
def job02(q):
    res = 0
    for i in range(1000):
        res += i + i ** 2 + i ** 3
    q.put(res)  #

def my_result_process02():
    q = mp.Queue()
    p1 = mp.Process(target=job02, args=(q,))
    p2 = mp.Process(target=job02, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    res1 = q.get()
    res2 = q.get()
    print(res1)
    print(res2)
    print(res1 + res2)

三 threading & multiprocessing 对比

1 创建多进程 multiprocessing
def job03(q):
    res = 0
    for i in range(1000000):
        res += i + i ** 2 + i ** 3
    # 结果加 queue
    q.put(res)


# 多核运算多进程
def multicore03():
    q = mp.Queue()
    p1 = mp.Process(target=job03, args=(q,))
    p2 = mp.Process(target=job03, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    res1 = q.get()
    res2 = q.get()
    print('multicore:', res1 + res2)
2 创建多线程 multithread
# 单核运算多线程
def multithread03():
    # thread可放入process同样的queue中
    q = mp.Queue()
    t1 = td.Thread(target=job03, args=(q,))
    t2 = td.Thread(target=job03, args=(q,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    res1 = q.get()
    res2 = q.get()
    print('multithread:', res1 + res2)
3 创建普通函数
def normal03():
    res = 0
    for _ in range(2):
        for i in range(1000000):
            res += i + i ** 2 + i ** 3
    print('normal:', res)
4 创建对比时间函数
def time_result03():
    st = time.time()
    normal03()
    st1 = time.time()
    print('normal time:', st1 - st)
    multithread03()
    st2 = time.time()
    print('multithread time:', st2 - st1)
    multicore03()
    print('multicore time:', time.time() - st2)
5 运行结果
normal03: 499999666667166666000000
normal03 time: 0.6855959892272949
multithread03: 499999666667166666000000
multithread03 time: 0.6804449558258057
multicore03: 499999666667166666000000
multicore03 time: 0.38849496841430664

我运行的是 normal03 > multithread03 > multicore03normal03multithread03 相差不大,multicore03normal03multithread03 快将近一倍。

四 进程池 Pool

使用进程池 Pool ,Python 会自行解决多进程问题。

1 进程池 Pool() 和 map()

map() 返回的是多结果。

def job04(x):
    # Pool的函数有返回值
    return x * x

def multicore04():
    # Pool的函数有返回值
    pool = mp.Pool()
    # 自分配 CPU 计算
    res = pool.map(job04, range(10))
    print(res)
2 自定义核数量

Pool 默认大小是 CPU的核数,传入 processes 参数自定义需要的核数量。

def multicore05():
  	# 定义CPU核数量为3
    pool = mp.Pool(processes=3)  
    res = pool.map(job04, range(10))
    print(res)
3 apply_async 单结果返回

apply_async() 中只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的, 所以在传入值后需要加逗号, 同时需要用 get() 方法获取返回值。

def multicore06():
    pool = mp.Pool()
    res = pool.apply_async(job04, (2,))
    # 用get获得结果
    print(res.get())
4 apply_async 多结果返回
def multicore07():
    pool = mp.Pool()
    multi_res = [pool.apply_async(job04, (i,)) for i in range(10)]
    # 用get获得结果
    print([res.get() for res in multi_res])
5 划重点
  • Pool 默认调用是 CPU 的核数,传入 processes 参数可自定义CPU核数。
  • map() 放入迭代参数,返回多个结果。
  • apply_async() 只能放入一组参数,并返回一个结果,如果想得到 map() 的效果需要通过迭代。

五 共享内存 shared memory

1 定义 Shared Value

value1 = mp.Value('i', 0)
value2 = mp.Value('d', 3.14)

2 定义 Shared Array

它只能是一维数组

array = mp.Array('i', [1, 2, 3, 4])

其中 d 和 i 参数用来设置数据类型的,d 表示一个双精浮点类型,i 表示一个带符号的整型,参考数据类型如下:

Type codeC TypePython TypeMinimum size in bytesNotes
'b'signed charint1
'B'unsigned charint1
'u'wchar_tUnicode character2(1)
'h'signed shortint2
'H'unsigned shortint2
'i'signed intint2
'I'unsigned intint2
'l'signed longint4
'L'unsigned longint4
'q'signed long longint8
'Q'unsigned long longint8
'f'floatfloat4
'd'doublefloat8

具体链接:Efficient arrays of numeric values

六 进程锁 Lock

1 不加进程锁

争抢共享内存

def job08(v, num):
    for _ in range(5):
        time.sleep(0.1)  # 暂停0.1秒,让输出效果更明显
        v.value += num  # v.value获取共享变量值
        print(v.value, end="\n")


def multicore08():
    v = mp.Value('i', 0)  # 定义共享变量
    p1 = mp.Process(target=job08, args=(v, 1))
    p2 = mp.Process(target=job08, args=(v, 3))  # 设定不同的number看如何抢夺内存
    p1.start()
    p2.start()
    p1.join()
    p2.join()
2 加进程锁
def job09(v, num, l):
    l.acquire()  # 锁住
    for _ in range(5):
        # print(v.value, num)
        time.sleep(0.1)
        v.value = v.value + num  # 获取共享内存
        print(v.value)
    l.release()  # 释放


def multicore09():
    l = mp.Lock()  # 定义一个进程锁
    v = mp.Value('i', 0)  # 定义共享内存
    
    p1 = mp.Process(target=job09, args=(v, 1, l))  # 需要将lock传入
    p1.start()
    p1.join()

    p2 = mp.Process(target=job09, args=(v, 3, l))
    p2.start()
    p2.join()


# def multicore10():
#     l = mp.Lock()  # 定义一个进程锁
#     v = mp.Value('i', 0)  # 定义共享内存
#     p1 = mp.Process(target=job09, args=(v, 1, l))  # 需要将lock传入
#     p2 = mp.Process(target=job09, args=(v, 3, l))
#     p1.start()
#     p2.start()
#     p1.join()
#     p2.join()

在这个示例中,必须先执行 p1 以达到预期效果。分别运行 multicore09multicore10 会发现一些有意思的情况。

七 完整代码示例

:建议在运行 main.py 对应的代码功能时,逐行使用注释进行操作。

# This is a sample Python script.

# Press ⌃R to execute it or replace it with your code.
# Press Double ⇧ to search everywhere for classes, files, tool windows, actions, and settings.

import multiprocessing as mp
import threading as td
import time as time


def print_hi(name):
    # Use a breakpoint in the code line below to debug your script.
    print(f'Hi, {name}')  # Press ⌘F8 to toggle the breakpoint.

    # 创建进程
    p1 = mp.Process(target=job, args=(1, 2))
    # 启动进程
    p1.start()

    # Shared Value
    value1 = mp.Value('i', 0)
    value2 = mp.Value('d', 3.14)
    # Shared Array,只能是一维数组
    array = mp.Array('i', [1, 2, 3, 4])


def job(a, d):
    print('你好 世界')


# 该函数没有返回值!!!
def job02(q):
    res = 0
    for i in range(1000):
        res += i + i ** 2 + i ** 3
    q.put(res)  #


def my_result_process02():
    q = mp.Queue()
    p1 = mp.Process(target=job02, args=(q,))
    p2 = mp.Process(target=job02, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    res1 = q.get()
    res2 = q.get()
    print(res1)
    print(res2)
    print(res1 + res2)


def job03(q):
    res = 0
    for i in range(1000000):
        res += i + i ** 2 + i ** 3
    # 结果加 queue
    q.put(res)


# 多核运算多进程
def multicore03():
    q = mp.Queue()
    p1 = mp.Process(target=job03, args=(q,))
    p2 = mp.Process(target=job03, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    res1 = q.get()
    res2 = q.get()
    print('multicore03:', res1 + res2)


# 单核运算多线程
def multithread03():
    # thread可放入process同样的queue中
    q = mp.Queue()
    t1 = td.Thread(target=job03, args=(q,))
    t2 = td.Thread(target=job03, args=(q,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    res1 = q.get()
    res2 = q.get()
    print('multithread03:', res1 + res2)


def normal03():
    res = 0
    for _ in range(2):
        for i in range(1000000):
            res += i + i ** 2 + i ** 3
    print('normal03:', res)


def time_result03():
    st = time.time()
    normal03()
    st1 = time.time()
    print('normal03 time:', st1 - st)
    multithread03()
    st2 = time.time()
    print('multithread03 time:', st2 - st1)
    multicore03()
    print('multicore03 time:', time.time() - st2)


def job04(x):
    # Pool的函数有返回值
    return x * x


def multicore04():
    # Pool的函数有返回值
    pool = mp.Pool()
    # 自分配 CPU 计算
    res = pool.map(job04, range(10))
    print(res)


def multicore05():
    pool = mp.Pool(processes=3)  # 定义CPU核数量为3
    res = pool.map(job04, range(10))
    print(res)


def multicore06():
    pool = mp.Pool()
    # apply_async() 中只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的,
    # 所以在传入值后需要加逗号, 同时需要用get()方法获取返回值
    res = pool.apply_async(job04, (2,))
    # 用get获得结果
    print(res.get())


def multicore07():
    pool = mp.Pool()
    multi_res = [pool.apply_async(job04, (i,)) for i in range(10)]
    # 用get获得结果
    print([res.get() for res in multi_res])


def job08(v, num):
    for _ in range(5):
        time.sleep(0.1)  # 暂停0.1秒,让输出效果更明显
        v.value += num  # v.value获取共享变量值
        print(v.value, end="\n")


def multicore08():
    v = mp.Value('i', 0)  # 定义共享变量
    p1 = mp.Process(target=job08, args=(v, 1))
    p2 = mp.Process(target=job08, args=(v, 3))  # 设定不同的number看如何抢夺内存
    p1.start()
    p2.start()
    p1.join()
    p2.join()


def job09(v, num, l):
    l.acquire()  # 锁住
    for _ in range(5):
        # print(v.value, num)
        time.sleep(0.1)
        v.value = v.value + num  # 获取共享内存
        print(v.value)
    l.release()  # 释放


def multicore09():
    l = mp.Lock()  # 定义一个进程锁
    v = mp.Value('i', 0)  # 定义共享内存

    p1 = mp.Process(target=job09, args=(v, 1, l))  # 需要将lock传入
    p1.start()
    p1.join()

    p2 = mp.Process(target=job09, args=(v, 3, l))
    p2.start()
    p2.join()


def multicore10():
    l = mp.Lock()  # 定义一个进程锁
    v = mp.Value('i', 0)  # 定义共享内存
    p1 = mp.Process(target=job09, args=(v, 1, l))  # 需要将lock传入
    p2 = mp.Process(target=job09, args=(v, 3, l))
    p1.start()
    p2.start()
    p1.join()
    p2.join()


# Press the green button in the gutter to run the script.
if __name__ == '__main__':
    print_hi('什么是 Multiprocessing')
    my_result_process02()
    time_result03()
    multicore04()
    multicore05()
    multicore06()
    multicore07()
    multicore08()
    multicore09()
    # multicore10()

# See PyCharm help at https://www.jetbrains.com/help/pycharm/

复制粘贴并覆盖到你的 main.py 中运行,运行结果如下。

Hi, 什么是 Multiprocessing
你好 世界
249833583000
249833583000
499667166000
normal03: 499999666667166666000000
normal03 time: 0.7139420509338379
multithread03: 499999666667166666000000
multithread03 time: 0.6696178913116455
multicore03: 499999666667166666000000
multicore03 time: 0.3917398452758789
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
4
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
3
4
7
8
11
12
1515

16
19
1
2
3
4
5
8
11
14
17
20

八 源码地址

代码地址:

国内看 Gitee 之 什么是 Multiprocessing.py

国外看 GitHub 之 什么是 Multiprocessing.py

引用 莫烦 Python

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2163303.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于单片机的指纹打卡系统

目录 一、主要功能 二、硬件资源 三、程序编程 四、实现现象 一、主要功能 基于STC89C52RC,采用两个按键替代指纹,一个按键按下,LCD12864显示比对成功,则 采用ULN2003驱动步进电机转动,表示开门,另一个…

电脑桌面归纳小窗口如何设置?电脑桌面一键整理工具分享!

电脑桌面归纳小窗口如何设置?日常使用电脑的过程中,随着文件、应用程序的不断增加,桌面往往会变得杂乱无章,这不仅影响了美观,也降低了工作效率。幸运的是,现代技术为我们提供了多种桌面整理工具&#xff0…

【QA-MISRA】解决使用命令行扫描项目后看不到报告的问题

1、 文档目标 解决使用命令行扫描项目后看不到报告的问题 2、 问题场景 客户使用命令行扫描项目后看不到报告,原因是客户未设置和勾选报告格式就导出了DAX文件进行命令行直接扫描。 3、软硬件环境 1、软件版本: QA-MISRA23.04 2、机器环境&#xff1…

李宏毅2023机器学习作业HW07解析和代码分享

ML2023Spring - HW7 相关信息: 课程主页 课程视频 Kaggle link 回来了 : ) Sample code HW07 视频 HW07 PDF 个人完整代码分享: GitHub | Gitee | GitCode P.S. HW7 的代码都很易懂,可以和 2024 年的新课:生成式AI导论做一个很好的衔接&#…

开源 AI 智能名片与 S2B2C 商城小程序:嫁接权威实现信任与增长

摘要:本文探讨了嫁接权威在产品营销中的重要性,并结合开源 AI 智能名片与 S2B2C 商城小程序,阐述了如何通过与权威关联来建立客户信任,提升产品竞争力。强调了在当今商业环境中,巧妙运用嫁接权威的方法,能够…

一款前后端分离设计的企业级快速开发平台,支持单体服务与微服务之间灵活切换(附源码)

前言 当前软件开发面临诸多挑战,诸如开发效率低下、重复工作多、维护成-本高等问题,这些问题在一定程度上阻碍了项目的进展。针对这些痛点,我们迫切需要一款既能提升开发效率又能降低维护成-本的处理方案。由此,一款基于前后端分…

HDMI20协议解析_Audio_Clock_Regeneration

HDMI20协议解析_Audio_Clock_Regeneration 1.版本说明 日期作者版本说明20240918风释雪初始版本 2.概述 当通过HDMI传输音频信号时,Audio Clock Regeneration(ACR)是必须要传输的数据包之一; HDMI传输过程中,音频采样…

数学建模-线性规划讲解(Matlab版本)

引言 相信不少小伙伴刚开始接触数学建模时,第一个学习的算法就是运筹学的重要分支--数学规划,而数学规划当中重要的分支就是线性规划了。在这里笔者参考了司守奎和孙玺菁老师的《数学建模算法与应用》(第三版)这本书,以此来讲讲关…

同等学力申硕英语多少分及格

同等学力申硕全国统考与往年的分数线一样,英语、学科综合均为60分合格通过制,满分均100分。 单科分数未达到及格线的考生,次年5月可以参加单科的补考 同等学力申硕的意义和作用 授予同等学力人员硕士学位是国家为同等学力人员开辟的获得学位…

前端——阿里图标的使用

阿里图标 将小图标定义成字体,通过引入字体的方式来展示这些图标 1.打开阿里图标库 https://www.iconfont.cn/ 2.登录 / 注册一个账号 3.选中你需要使用的图标 并且把它加入购物车 4.全部选择完之后 点击右上角 购物车 然后下载代码 5.解压后你下载的文…

MySQL数据库的日志你知道几个?

1、前言 MySQL相信大家都用过,但MySQL中都有哪些日志,是干什么的,估计有小伙伴还没有搞清楚。可能有小伙伴只知道最重要的三个:undolog、redolog、binlog。其实这是不全的,MySQL中的日志有: undolog&…

双端搭建个人博客

1. 准备工作 确保你的两个虚拟机都安装了以下软件: 虚拟机1(Web服务器): Apache2, PHP虚拟机2(数据库服务器): MariaDB2. 安装步骤 虚拟机1(Web服务器) 安装Apache2和PHP 更新系统包列表: sudo apt update安装Apache2: sudo apt install apache2 -y安装PHP及其Apac…

python学习第十二节:python开发图形界面

python学习第十二节:python开发图形界面 创建一个窗口实例化窗口对象调用窗口设置窗口大小设置窗口的标题设置窗口图标否能够改变窗口设置窗口的背景 创建容器组件容器组件的介绍组件参数的介绍label标签label添加标签控件 label添加标签定位label的relief参数label…

网站建设中,常用的后台技术有哪些,他们分别擅长做什么网站平台

PHP、Python、JavaScript、Ruby、Java和.NET各自适用于不同类型的网站平台。以下是对这些编程语言适用场景的具体介绍: PHP Web开发:PHP是一种广泛使用的开源服务器端脚本语言,特别适合Web开发。全球有超过80%的网站使用PHP作为服务器端编程语…

SaaS(Software as a Service)软件的主流技术架构

在当今数字化时代,SaaS(Software as a Service,软件即服务)软件以其灵活、高效和成本效益高的特点,成为企业信息化建设的首选。为了实现SaaS软件的稳定、可靠和高效运行,其技术架构的设计显得尤为重要。本文…

页面在移动设备上显示不正常的原因及解决方案

聚沙成塔每天进步一点点 本文回顾 ⭐ 专栏简介页面在移动设备上显示不正常的原因及解决方案1. 缺少 viewport 元标签1.1 问题描述1.2 解决方案1.3 注意事项 2. 响应式设计未实现或设计不当2.1 问题描述2.2 解决方案示例:媒体查询的使用 2.3 常见的媒体查询断点 3. 固…

基于传感网技术的职业院校安防系统实训室

一、引言 随着信息技术的飞速发展和城市化的不断加速,智能楼宇及安防系统已成为现代城市建设的重要组成部分。高职院校作为培养技术型人才的重要基地,应积极响应市场需求,建设符合时代潮流的安防系统实训室,以提升学生的专业技能…

Linux之实战命令11:tload应用实例(四十五)

简介: CSDN博客专家、《Android系统多媒体进阶实战》一书作者 新书发布:《Android系统多媒体进阶实战》🚀 优质专栏: Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏: 多媒体系统工程师系列【…

【工具】语音朗读PDF的免费工具

转载请注明出处:小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你,欢迎[点赞、收藏、关注]哦~ 背景介绍 看累了,不想看,能不能读给我听! 工具介绍 Natural Readers Free Text to Speech Online with Realistic…

最优化理论与自动驾驶(二-补充):求解算法(梯度下降法、牛顿法、高斯牛顿法以及LM法,C++代码)

在之前的章节里面(最优化理论与自动驾驶(二):求解算法)我们展示了最优化理论的基础求解算法,包括高斯-牛顿法(Gauss-Newton Method)、梯度下降法(Gradient Descent Metho…