文章目录
- 一、简介:【进程 + 多进程】 + 【线程 + 多线程】
- 1.1、系统支持的CPU核心处理器
- 1.2、核心处理器的参数解析:12th Gen Intel( R ) Core( TM ) i7-12700 2.10 GHz
- 二、函数详解
- 2.0、计算CPU核心数:os.cpu_count() + mp.cpu_count()
- 2.1、用于【多线程并行计算】的执行器(Executor):concurrent.futures.ThreadPoolExecutor()
- 2.2、用于【多进程并行计算】的执行器(Executor):concurrent.futures.ProcessPoolExecutor()
- 2.3、多进程与多线程的应用领域
- 四、项目实战
- 4.1、同时运行相同任务
- 4.1.1、多线程并行计算
- 4.1.2、多进程并行计算
- 4.2、同时运行不同任务
- 4.2.1、多线程
- 4.2.2、多进程
- 4.2.3、协程(使用 asyncio)
- 4.2.4、并行计算库(使用 concurrent.futures)速度极快
一、简介:【进程 + 多进程】 + 【线程 + 多线程】
进程(Process)
:进程是操作系统中的一个执行单元。每个进程都有自己独立的内存空间,包含代码、数据和资源。
- 进程之间是相互独立的,一个进程的崩溃不会影响其他进程。
- 进程之间通信相对复杂,通常需要使用特定的机制,如管道、消息队列或共享内存等。
- 创建和销毁进程的开销较大。
- 进程适用于多个任务需要完全独立运行,且不共享资源的场景。
多进程(Multiprocessing)
:多进程是同时运行多个进程的机制。每个进程独立执行,并拥有自己的资源,例如内存空间和文件句柄。
- 多进程能够充分利用多核处理器,实现并行计算,从而提高计算性能和系统的响应速度。
- 多进程之间相互独立,可以通过进程间通信来实现数据交换和协调。
线程(Thread)
:线程是进程中的一个执行单元。一个进程可以包含多个线程,它们共享相同的内存空间和资源。
- 线程之间相互依赖,共享数据和资源更方便,但也容易导致数据竞争等问题。
- 线程之间的切换开销较小,因为它们共享相同的进程上下文。
- 线程适用于多个任务之间需要共享资源,且需要频繁切换的场景。
多线程(Multithreading)
:多线程是在一个进程中同时运行多个线程。
- 多线程可以提高程序的并发性,从而更高效地利用计算机资源。
- 但多线程编程需要注意线程同步和数据共享的问题,避免出现竞态条件和死锁等 bug。
1.1、系统支持的CPU核心处理器
系统支持的CPU核心处理器:取决于硬件和操作系统,即处理器的核心数和操作系统的配置。
- 处理器核心数:现代计算机通常配备有多核心的处理器,每个核心可以执行一个线程。因此,一个处理器的核心数就决定了系统支持的线程数上限。
- 超线程技术:一些处理器支持超线程技术,它允许一个物理核心同时执行两个线程。这意味着一个处理器的线程数可以是核心数的两倍。
- 操作系统:不同的操作系统对线程数量的支持有所不同。
一般来说,在现代桌面和服务器计算机上,可以期望支持至少几十个线程。例如,一个4核8线程的处理器支持同时执行8个线程。
1.2、核心处理器的参数解析:12th Gen Intel( R ) Core( TM ) i7-12700 2.10 GHz
- 代代相传:
"12th Gen"
指的是该处理器是英特尔第12代Core i7系列处理器。随着技术的不断进步,每代处理器都会带来更高的性能、更多的功能和更好的能效。- 型号名称:
"Intel(R) Core(TM) i7-12700"
是该处理器的型号名称。其中,"i7"
表示该处理器属于高性能桌面处理器系列,"12700"
表示该型号在12th Gen Core i7系列中的具体型号。- 基本主频:
"2.10 GHz"
表示该处理器的基本主频为2.10 GHz,即处理器的时钟频率。这是处理器在默认情况下的主频,实际运行时可能根据负载和功耗管理动态调整主频。
- 核心数和线程数:
Core i7-12700
是一款多核心处理器。它通常配备有多个物理核心和支持超线程技术,从而每个物理核心可以同时执行两个线程。这可以提高处理器的并发处理能力。具体的核心数和线程数需要参考该型号的规格表,通常在该型号名称后的括号中有说明。- 架构和制造工艺:
Core i7-12700
属于Intel的"12th Gen Alder Lake" 架构。该架构采用了不同的核心设计,包括"Performance Cores"(性能核心)和"Efficiency Cores"(效能核心)。性能核心用于高性能计算任务,而效能核心则用于轻负载和功耗敏感任务,以提供更好的能效。- 技术支持:
Core i7-12700
支持许多英特尔的技术特性,例如超线程技术、Turbo Boost技术(动态加速主频)、内存缓存、虚拟化技术等。这些技术可以提高处理器的性能和能效,同时支持更多的计算和应用场景。
二、函数详解
2.0、计算CPU核心数:os.cpu_count() + mp.cpu_count()
# 方法一
import os
num_cores = os.cpu_count()
print("系统支持的CPU核心处理器数量:", num_cores)
# 方法二
import multiprocessing as mp
num_cores = mp.cpu_count()
print("系统支持的CPU核心处理器数量:", num_cores)
2.1、用于【多线程并行计算】的执行器(Executor):concurrent.futures.ThreadPoolExecutor()
import concurrent.futures
"""
函数说明:concurrent.futures.ThreadPoolExecutor(max_workers):
输入参数: max_workers 指定最大线程数。默认使用系统的CPU核心数作为最大线程数。
"""
#############################################################################
# 使用举例:
# (0)使用ThreadPoolExecutor来创建一个线程池。
# (1)使用executor.submit将所有参数只应用一次到函数calculate(),完成并行化计算。
# (2)使用executor.map将列表的每个参数循环应用到函数calculate(),完成并行化计算。
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(my_function, arg1, arg2) # 其中:my_function是执行函数,arg1和arg2是函数的参数。
results = executor.map(my_function, [arg1, arg2, arg3]) # 其中:my_function是执行函数,[arg1, arg2, arg3]是一个包含函数参数的列表。
result = future.result() # 获取任务的执行结果
executor.shutdown() # 等待所有任务完成并关闭线程池
#############################################################################
"""
使用方式:可以使用submit()、map()、shutdown()方法分别用于提交任务、并行计算、关闭线程池。
(1)并行计算一个任务:submit()方法提交一个任务(函数)给ThreadPoolExecutor进行并行计算,并返回一个concurrent.futures.Future对象,可以用于获取任务的执行结果。
(2)并行计算多个任务:map()方法接收一个函数和可迭代的参数,并将函数应用于每个参数,实现并行计算。
(3)获取任务的执行结果:concurrent.futures.Future对象表示一个尚未完成的任务。如果任务尚未完成,result()方法会阻塞直到任务完成并返回结果。
(4)等待所有任务完成并关闭线程池:shutdown()方法。如果不调用shutdown(),程序可能会在所有任务完成之前提前结束,导致一些任务未能执行完毕。
"""
2.2、用于【多进程并行计算】的执行器(Executor):concurrent.futures.ProcessPoolExecutor()
应用:允许在多个进程中执行任务,从而实现并行计算,特别适用于CPU密集型任务。
import concurrent.futures
"""
函数说明:concurrent.futures.ProcessPoolExecutor(max_workers):
输入参数: max_workers 指定最大进程数。默认使用系统的CPU核心数作为最大进程数。
"""
#############################################################################
# 使用举例:
# (0)使用ProcessPoolExecutor来创建一个进程池。
# (1)使用executor.submit将所有参数只应用一次到函数calculate(),完成并行化计算。
# (2)使用executor.map将列表的每个参数循环应用到函数calculate(),完成并行化计算。
with concurrent.futures.ProcessPoolExecutor() as executor:
future = executor.submit(my_function, arg1, arg2) # 其中:my_function是执行函数,arg1和arg2是函数的参数。
future = executor.map(my_function, [arg1, arg2, arg3]) # 其中:my_function是执行函数,[arg1, arg2, arg3]是一个包含函数参数的列表。
result = future.result() # 获取任务的执行结果
executor.shutdown() # 等待所有任务完成并关闭进程池
#############################################################################
"""
使用方式:可以使用submit()、map()、shutdown()方法分别用于提交任务、并行计算、关闭进程池。
(1)并行计算一个任务:submit()方法提交一个任务(函数)给ThreadPoolExecutor进行并行计算,并返回一个concurrent.futures.Future对象,可以用于获取任务的执行结果。
(2)并行计算多个任务:map()方法接收一个函数和可迭代的参数,并将函数应用于每个参数,实现并行计算。
(3)获取任务的执行结果:concurrent.futures.Future对象表示一个尚未完成的任务。如果任务尚未完成,result()方法会阻塞直到任务完成并返回结果。
(4)等待所有任务完成并关闭进程池:shutdown()方法。如果不调用shutdown(),程序可能会在所有任务完成之前提前结束,导致一些任务未能执行完毕。
"""
2.3、多进程与多线程的应用领域
多线程
:适用于IO密集型任务,可以让CPU在IO等待期间切换到其他线程,提高系统的效率。
- IO密集型任务:任务的主要瓶颈在于输入/输出(IO)操作,而不是计算操作。这类任务涉及大量的读取、输入、网络通信或其他IO操作,任务执行时主要的时间都花费在等待IO操作完成。
- 在Python中,多线程由于存在全局解释器锁(GIL),不能实现真正的并行计算。
- 单线程处理IO操作:降低系统效率。因为线程在等待IO时会被阻塞,不能同时处理其他任务。
- 多线程并行计算:提高系统效率。让CPU在IO等待期间切换到其他任务,充分利用计算资源。
典型的IO密集型任务包括:(1)文件读写:大量的文件读取和写入操作,如读取大型数据文件、写入日志文件等。(2)网络通信:涉及网络请求和响应的任务,如下载文件、发送和接收网络请求等。(3)数据库操作:对数据库进行大量读取和写入操作,如查询数据库、写入数据等。(4)图像/音视频处理:图像、音频或视频处理任务中的IO操作,如加载图像、保存处理后的图像等。(5)并发网络服务器:处理大量并发客户端连接的服务器,其中主要的延迟来自于网络IO。
多进程
:适用于CPU密集型任务,可以充分利用多核处理器来实现真正的并行计算。
- CPU密集型任务:任务的主要瓶颈在于计算操作,而不是输入/输出(IO)操作。这类任务涉及大量的计算和处理,任务执行时主要的时间都花费在CPU计算上。
典型的CPU密集型任务包括:(1)大规模数据处理:对大量数据进行复杂的计算、统计、分析等操作。(2)数值计算:进行大规模的数值计算,如矩阵运算、图像处理、信号处理等。(3)加密解密:进行大量的数据加密或解密操作。(4)3D渲染:进行复杂的三维图形渲染,如视频游戏或动画制作中的渲染操作。(5)并行算法:执行需要大量并行计算的算法,如并行排序、并行搜索等。
四、项目实战
4.1、同时运行相同任务
4.1.1、多线程并行计算
"""
# 将下述算法优化为并行计算
for ii in range(image_raw.shape[0]): # 遍历3D的每个slice
M = phase(image_median[ii], 4, 6) # 调用函数
image_final_median[ii] = M # 保存计算结果
"""
import napari
import tifffile
import numpy as np
import concurrent.futures
from skimage.filters import median
def phase(img, param1, param2):
# 定义phase函数。示例:需要根据实际情况实现该函数
return calculated_phase
def calculate_phase(ii):
# 定义calculate函数。
M = phase(image_median[ii], 4, 6)
image_final_median[ii] = M
if __name__ == "__main__":
# 1、加载图像 + 图像处理
image_path = r'D:\downSampleImage.tif'
image_raw = tifffile.imread(image_path) # 3D灰度图像:100x110x120
image_median = median(image_raw) # 中值滤波
image_final_median = np.zeros_like(image_median) # 新建数组
# 2、并行计算
with concurrent.futures.ThreadPoolExecutor() as executor:
executor.map(calculate_phase, range(image_raw.shape[0]))
# 3、在napari中显示图像
viewer = napari.Viewer() # 创建napari视图
viewer.layers.clear() # 清空图层
viewer.add_image(image_median, name="image_median") # 添加图像
viewer.add_image(image_final_median, name="image_final_median") # 添加图像
napari.run() # 显示napari图形界面
4.1.2、多进程并行计算
"""
# 将下述算法优化为并行计算
for ii in range(image_raw.shape[0]): # 遍历3D的每个slice
M = phase(image_median[ii], 4, 6) # 调用函数
image_final_median[ii] = M # 保存计算结果
"""
import napari
import tifffile
import numpy as np
import concurrent.futures
from skimage.filters import median
def phase(img, param1, param2):
# 定义phase函数。示例:需要根据实际情况实现该函数
return calculated_phase
def calculate_phase(ii):
# 定义calculate函数。
M = phase(image_median[ii], 4, 6) # 归一化结果:M = [0~1]
# image_final_median[ii] = M
return M
"""
一、多进程并行计算:子进程中的变量必须是全局变量。
举例说明:在calculate函数中,image_median将提示未定义。
二、多进程并行计算:子进程无法直接修改主进程的变量。(若调用,系统不提示且不报错)
解决方法一:可以通过返回(子进程)计算结果,然后在(主进程)遍历获取。
解决方法二:可以在调用多进程时,将所需要的变量传给子进程。
举例说明:在calculate函数中,对(主进程变量)image_final_median的赋值操作失败,最终得到的image_final_median为空。
具体做法:results = executor.map(calculate, range(image_raw.shape[0]))
"""
# 1、定义全局变量(多进程)
image_path = r'D:\downSampleImage.tif'
image_raw = tifffile.imread(image_path) # 3D灰度图像:100x110x120
image_median = median(image_raw) # 中值滤波
image_final_median = np.zeros_like(image_median) # 新建数组
if __name__ == "__main__":
# 2、并行计算
with concurrent.futures.ProcessPoolExecutor() as executor:
results = executor.map(calculate_phase, range(200, 202)) # image_raw.shape[0]
for ii, result in enumerate(results): # 备注:ii从0开始,而不是200
image_final_median[ii] = result
# 3、在napari中显示图像
viewer = napari.Viewer() # 创建napari视图
viewer.layers.clear() # 清空图层
viewer.add_image(image_median, name="image_median") # 添加图像
viewer.add_image(image_final_median, name="image_final_median") # 添加图像
napari.run() # 显示napari图形界面
4.2、同时运行不同任务
-
多线程 适用于 I/O 密集型任务,因为线程切换的开销较小,可以有效地并行执行多个 I/O 操作,如文件读写、网络请求等。但由于 Python 的全局解释器锁(GIL),多线程在 CPU 密集型任务上性能有限。
-
多进程 适用于 CPU 密集型任务,因为每个进程都有独立的 Python 解释器和内存空间,不受 GIL 限制,可以充分利用多核处理器。对于 CPU 密集型任务,多进程通常比多线程更快。
-
协程 适用于高并发的 I/O 密集型任务,协程允许在单线程中执行多个任务,避免了线程切换的开销,但需要合理地设计异步代码。协程可以实现非常高的并发性能,但在 CPU 密集型任务上性能可能较差。
-
并行计算库 如 concurrent.futures,joblib,dask 等可以提供简单的接口来管理并行任务,性能取决于底层的并行执行策略和硬件资源。
4.2.1、多线程
import threading
import time
# 定义任务1
def task1():
for i in range(5):
print("Task 1 - Step", i + 1)
time.sleep(1) # 模拟耗时操作
# 定义任务2
def task2():
for i in range(3):
print("Task 2 - Step", i + 1)
time.sleep(1) # 模拟耗时操作
if __name__ == "__main__":
# 创建两个线程
thread1 = threading.Thread(target=task1)
thread2 = threading.Thread(target=task2)
# 启动线程
thread1.start()
thread2.start()
# 等待线程完成
thread1.join()
thread2.join()
print("All tasks are completed.")
"""
Task 1 - Step 1
Task 2 - Step 1
Task 2 - Step 2
Task 1 - Step 2
Task 1 - Step 3
Task 2 - Step 3
Task 1 - Step 4
Task 1 - Step 5
All tasks are completed.
"""
4.2.2、多进程
import multiprocessing
import time
# 定义任务1
def task1():
for i in range(5):
print("Task 1 - Step", i + 1)
time.sleep(1) # 模拟耗时操作
# 定义任务2
def task2():
for i in range(3):
print("Task 2 - Step", i + 1)
time.sleep(1) # 模拟耗时操作
if __name__ == "__main__":
# 创建两个进程
process1 = multiprocessing.Process(target=task1)
process2 = multiprocessing.Process(target=task2)
# 启动进程
process1.start()
process2.start()
# 等待进程完成
process1.join()
process2.join()
print("All tasks are completed.")
"""
Task 1 - Step 1
Task 2 - Step 1
Task 2 - Step 2
Task 1 - Step 2
Task 2 - Step 3
Task 1 - Step 3
Task 1 - Step 4
Task 1 - Step 5
All tasks are completed.
"""
4.2.3、协程(使用 asyncio)
import asyncio
# 定义任务1
async def task1():
for i in range(5):
print("Task 1 - Step", i + 1)
await asyncio.sleep(1) # 模拟异步操作
# 定义任务2
async def task2():
for i in range(3):
print("Task 2 - Step", i + 1)
await asyncio.sleep(1) # 模拟异步操作
async def main():
# 并行执行 task1 和 task2
await asyncio.gather(task1(), task2())
if __name__ == "__main__":
asyncio.run(main())
"""
Task 1 - Step 1
Task 2 - Step 1
Task 1 - Step 2
Task 2 - Step 2
Task 1 - Step 3
Task 2 - Step 3
Task 1 - Step 4
Task 1 - Step 5
"""
4.2.4、并行计算库(使用 concurrent.futures)速度极快
import concurrent.futures
# 定义任务1
def task1():
for i in range(5):
print("Task 1 - Step", i + 1)
# 定义任务2
def task2():
for i in range(3):
print("Task 2 - Step", i + 1)
if __name__ == "__main__":
# 使用 ThreadPoolExecutor 创建线程池
with concurrent.futures.ThreadPoolExecutor() as executor:
# 提交任务1和任务2给线程池
future1 = executor.submit(task1)
future2 = executor.submit(task2)
# 获取任务1和任务2的结果
result1 = future1.result()
result2 = future2.result()
# 在这里执行任何需要等待线程池完成的后续操作
"""
Task 1 - Step 1
Task 1 - Step 2
Task 1 - Step 3
Task 1 - Step 4
Task 1 - Step 5
Task 2 - Step 1
Task 2 - Step 2
Task 2 - Step 3
"""