Python代码中通常有三种实现并行化的方法
- multiprocessing的同步方法,map
- multiprocessing的异步方法,apply_async
- Ray提供的并行或分布式能力
Ray 和 Python 的 multiprocessing
模块都是用于并行和分布式计算的工具,但它们在设计目标、功能和实现细节上有一些显著的区别。
Multiprocessing
-
设计目标:
multiprocessing
是 Python 标准库的一部分,旨在利用多核处理器来并行化任务。它通过在多个进程之间分配任务来绕过 GIL(全局解释器锁)的限制。
-
实现方式:
- 使用进程而不是线程,这意味着每个进程都有自己的 Python 解释器和内存空间。
- 提供了
Process
类和共享数据的工具,如Queue
,Pipe
,Manager
等。
-
使用场景:
- 适合 CPU 密集型任务,因为每个进程运行在独立的内存空间中,可以充分利用多核 CPU。
- 需要在单机上进行并行处理。
-
限制:
- 由于进程间的内存独立性,数据共享和通信相对复杂。
- 启动新进程的开销较大。
Ray
-
设计目标:
- Ray 是一个开源的通用分布式计算框架,旨在简化大规模并行和分布式应用的开发。
- 支持大规模集群管理和调度任务,不限于单机。
-
实现方式:
- 提供高级抽象,如任务(task)和 actor 模型,可以轻松地在集群中并行化和分布式化工作。
- 内置了对资源管理、任务调度和故障恢复的支持。
-
使用场景:
- 适合需要大规模分布式计算的任务,包括机器学习、数据处理和实时流处理。
- 提供了高效的状态共享和数据传输机制。
-
优势:
- 能够扩展到多台机器和大规模集群。
- 内置的库支持,如 Ray RLlib(强化学习库)和 Ray Tune(超参数优化工具),适合复杂应用。
-
限制:
- 需要额外安装和配置,初学者可能需要时间学习和适应。
- 对于简单的并行任务可能显得过于复杂。
总之,multiprocessing
适合简单的单机并行任务,而 Ray 更加适合需要分布式计算的复杂应用。选择使用哪个工具取决于具体的应用需求和规模。
调用
每种方法都提供一个调用示例,如下:
# from multiprocessing.pool import Pool # 进程池
from multiprocessing import Pool # 进程池
from multiprocessing import cpu_count
from multiprocessing import set_start_method
import time
import ray
import numpy as np
ray.init(num_cpus=cpu_count())
@ray.remote
def test_remote(para):
return test(para)
def test(para):
times = para[1]%6+1
task_complexity = para[3]
# print(para[0], para[1], para[2],beishu)
for i in range(task_complexity*times):
a = i*1.1+0.9/para[2]
return [para[0],para[1]]
def main(threds_num, task_complexity):
tic = time.time()
par = 3
lists = [(i, i+1, par, task_complexity) for i in range(threds_num)]
pool = Pool(processes=cpu_count())
res = pool.map_async(test, (lists)).get()
pool.close()
pool.join()
toc = time.time()
print('multiprocessing 异步耗时: {}s'.format(toc - tic))
# print(res)
return toc - tic
def main2(threds_num, task_complexity):
tic = time.time()
par = 3
lists = [(i, i+1, par, task_complexity) for i in range(threds_num)]
pool = Pool(processes=cpu_count())
res = pool.map(test, (lists))
pool.close()
pool.join()
toc = time.time()
print('multiprocessing 同步执行耗时: {}s'.format(toc - tic))
# print(res)
return toc - tic
def main3(threds_num, task_complexity):
tic = time.time()
par = 3
lists = [(i, i + 1, par, task_complexity) for i in range(threds_num)]
futures = [test_remote.remote(item) for item in lists]
res = ray.get(futures)
toc = time.time()
print('Ray 执行耗时: {}s'.format(toc - tic))
# print(res)
return toc - tic
def performance_compare():
res_times = []
for threds_num in [2,8,32,128,4096,409600]:
task_complexitys = [int(1e3), int(1e5), int(1e7)]
for task_complexity in task_complexitys:
asyn_time = main(threds_num, task_complexity)
map_time = main2(threds_num, task_complexity)
ray_time = main3(threds_num, task_complexity)
res_times.append([asyn_time, map_time, ray_time])
for item in res_times:
print(f"{item[0]:.5f}\t{item[1]:.5f}\t{item[2]:.5f}")
if __name__ == "__main__":
set_start_method("spawn")
performance_compare()
性能对比
在单台笔记本上执行相同的计算任务,主要区分循环次数和单个任务的复杂度两方面来进行对比,统计耗时如下:
结论
- 单个任务复杂度高时,一般情况下ray的效率比multiprocessing的效率要高
- 循环数量相同时,随着任务复杂度的上升,ray的优势越来越小
- multiprocessing的同步和异步执行在所有情况下都表现接近