Python提供了multiprocessing模块来开启子进程,并在子进程中执行我们定制的任务。
(python)multiprocessing子进程(Process类的使用)
- 两种使用方法
- 第一种方法:使用Process类
- 第二种方法:使用multiprocessing.Pool
- 以下是两个实例,分别演示如何使用Process类和multiprocessing.Pool来创建和管理子进程。
- 实例一:使用Process类
- 实例二:使用multiprocessing.Pool
- self.pool.starmap和pool.apply_async区别
- self.pool.starmap
- pool.apply_async
- 异步执行期间,继续执行主进程代码
- 进程池完成结束判断
- 选择进程启动方法
两种使用方法
第一种方法:使用Process类
self.process = Process(target=Server, args=(self.zmqQThread.ipc_url0, self.zmqQThread.ipc_url1))
# self.process.daemon = True
self.process.start()
解释:这段代码直接创建一个Process对象,并启动一个新的进程来运行Server函数,传入self.zmqQThread.ipc_url0和self.zmqQThread.ipc_url1作为参数。
适用场景:这种方法适用于需要直接管理单个子进程的情况,尤其是当你需要在特定点启动和控制这个进程时。比如界面中需要调用一个子进程
第二种方法:使用multiprocessing.Pool
mp_context = multiprocessing.get_context('spawn')
pool = mp_context.Pool(processes=1)
解释:这段代码使用multiprocessing.get_context(‘spawn’)获取一个特定的上下文(这里是’spawn’),并基于这个上下文创建一个进程池(Pool)。
适用场景:这种方法适用于需要管理多个子进程的情况,尤其是在需要并行处理多个任务时。进程池可以有效地管理和分配多个任务给多个进程。这里需要程序需要处理多个任务时使用
以下是两个实例,分别演示如何使用Process类和multiprocessing.Pool来创建和管理子进程。
实例一:使用Process类
展示了如何直接使用Process类来启动一个单独的服务器进程。
import time
from multiprocessing import Process
def Server(ipc_url0, ipc_url1):
print(f"Server started with IPC URLs: {ipc_url0}, {ipc_url1}")
for _ in range(5):
print("Server is running...")
time.sleep(1)
print("Server has finished running.")
class MyServer:
def __init__(self, ipc_url0, ipc_url1):
self.ipc_url0 = ipc_url0
self.ipc_url1 = ipc_url1
self.process = Process(target=Server, args=(self.ipc_url0, self.ipc_url1))
def start(self):
self.process.start()
self.process.join()
if __name__ == '__main__':
my_server = MyServer('ipc://localhost:5555', 'ipc://localhost:5556')
my_server.start()
print("Main process has finished.")
同步执行:
self.process.join()的作用是阻塞主进程,直到self.process进程结束。这确保了主进程会等待子进程完成后再继续执行后续代码。
实例二:使用multiprocessing.Pool
展示了如何使用multiprocessing.Pool来管理多个并行任务。在这个例子中,我们创建一个进程池,并使用它来并行执行多个任务。
import time
import multiprocessing
def worker(number):
print(f"Worker {number} started")
time.sleep(2)
print(f"Worker {number} finished")
if __name__ == '__main__':
# 使用 'spawn' 上下文创建进程池
mp_context = multiprocessing.get_context('spawn')
pool = mp_context.Pool(processes=3) # 创建一个包含3个进程的池
# 提交任务到进程池
results = []
for i in range(5):
result = pool.apply_async(worker, args=(i,))
results.append(result)
# 等待所有任务完成
for result in results:
result.wait()
pool.close()
pool.join()
print("All tasks have finished.")
异步执行
self.pool.starmap和pool.apply_async区别
self.pool.starmap
starmap方法类似于map,但它接受一个参数列表的列表,将这些参数解包传递给目标函数。这使得starmap非常适合需要传递多个参数的函数。
import multiprocessing
import time
def worker(x, y):
print(f"Worker started with args: ({x}, {y})")
time.sleep(2)
return x + y
if __name__ == '__main__':
with multiprocessing.Pool(processes=3) as pool:
# 参数列表的列表
args = [(1, 2), (3, 4), (5, 6), (7, 8)]
# 使用 starmap 执行任务
results = pool.starmap(worker, args)
print("Results:", results)
pool.apply_async
apply_async方法允许异步地调度单个任务,并且可以立即返回一个AsyncResult对象。这个对象可以用于获取任务结果、检查任务状态或者等待任务完成。
import multiprocessing
import time
def worker(x):
print(f"Worker started with arg: {x}")
time.sleep(2)
return x * x
if __name__ == '__main__':
# 创建一个进程池
with multiprocessing.Pool(processes=3) as pool:
# 提交多个异步任务
results = [pool.apply_async(worker, args=(i,)) for i in range(15)]
# 可以在这里执行其他操作,不需要等待任务完成
print("Main process continues to run while workers are processing.")
# 获取任务结果
for result in results:
print("Result:", result.get())
print("Main process has finished.")
异步执行期间,继续执行主进程代码
import multiprocessing
import time
def worker(x):
print(f"Worker started with arg: {x}")
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4)
# 提交第一批任务
results1 = [pool.apply_async(worker, args=(i,)) for i in range(15)]
# 提交第二批任务
results2 = [pool.apply_async(worker, args=(i + 5,)) for i in range(15)]
# 可以在这里执行其他操作,不需要等待任务完成
print("Main process continues to run while workers are processing.")
# 继续执行主进程的其他操作
for i in range(5):
print(f"Main process doing other work {i}")
time.sleep(1)
print("Main process has finished.")
上面进程池里的代码全部运行完,但是如果主进程比子进程快,那么子进程就会提前结束,那么为了避免进程池的任务提前结束未完成,则需要判断:
进程池完成结束判断
import multiprocessing
import time
def worker(x):
print(f"Worker started with arg: {x}")
def init_pool():
pool = multiprocessing.Pool(processes=4)
return pool
def check_pool(results):
# 检查任务状态
while True:
# 检查所有任务是否完成
if all(result.ready() for result in results):
print("All tasks are completed.")
break
else:
print("Some tasks are still running...")
time.sleep(0.2) # 等待一段时间后再检查
if __name__ == '__main__':
pool = init_pool()
# 提交第一批任务
results1 = [pool.apply_async(worker, args=(i,)) for i in range(15)]
# 提交第二批任务
results2 = [pool.apply_async(worker, args=(i + 5,)) for i in range(15)]
# 可以在这里执行其他操作,不需要等待任务完成
print("Main process continues to run while workers are processing.")
results = results1 + results2
# 继续执行主进程的其他操作
for i in range(5):
print(f"Main process doing other work {i}")
pool.apply_async(worker, args=(100 + i,))
# time.sleep(0.1)
check_pool(results)
print("Main process has finished.")
将其模块化:
- init_pool初始化进程池,返回pool
- check_pool检查进程池状态
- results为组合进程池中的结果,没有返回结果一样可以用,如果worker中有return,则for result in all_results:result.get()
选择进程启动方法
通过 multiprocessing.get_context() 方法,你可以显式地选择进程启动方法,保证代码在不同平台上的行为一致。常见的启动方法有:
fork: 父进程被复制,子进程继承父进程的资源。这是 Unix 系统默认的启动方法。
spawn: 父进程启动一个全新的 Python 解释器,并在这个新解释器中导入必要的资源。这是 Windows 系统默认的启动方法,也是 Unix 系统的可选方法。
forkserver: 父进程启动一个单独的服务器进程,后续子进程通过与这个服务器通信来创建。这在某些情况下比 fork 更安全。
import multiprocessing
import time
def worker(x):
print(f"Worker started with arg: {x}")
time.sleep(2)
print(f"Worker finished with arg: {x}")
if __name__ == '__main__':
# 获取 spawn 启动方法的上下文
mp_context = multiprocessing.get_context('spawn')
# 使用 spawn 上下文创建进程池
pool = mp_context.Pool(processes=4)
# 提交任务
results = [pool.apply_async(worker, args=(i,)) for i in range(5)]
# 可以在这里执行其他操作,不需要等待任务完成
print("Main process continues to run while workers are processing.")
# 获取所有任务结果
for result in results:
result.wait() # 等待任务完成
# 关闭进程池
pool.close()
pool.join()
print("Main process has finished.")