Python使用多进程并行加速业务操作 完整代码
- 需求分析
- 完整代码
- 本demo性能分析
- Python中单线程、多线程和多进程的效率对比实验
需求分析
最近在对一个数据集进行处理,共2000条,每条去调一个第三方接口,耗时7-10秒。单线程处理一次要3.9-5.6小时,于是想着用多进程加速一下。 需求大致如下:
1、能配置进程数目
2、能加载要处理的数据
3、能打印完善的日志
4、多进程能共享处理后的数据结果,方便最终获取/导出
5、锁、超时控制、异常控制
完整代码
Python代码如下:(其中需要修改的地方加了TODO
)在win和linux上都可以用
import logging
import math
import multiprocessing
import time
import pandas as pd
from contextlib import contextmanager
import threading
# 设置日志配置
logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', level=logging.INFO)
# 定义超时异常
class TimeoutException(Exception): pass
# 超时控制
@contextmanager
def time_limit(seconds):
timer = threading.Timer(seconds, lambda: _raise_timeout_exception())
def _raise_timeout_exception():
raise TimeoutException("Timed out!")
try:
timer.start()
yield
finally:
timer.cancel()
def process_data(i, data, results, lock):
logging.info('------group: ' + str(i) + '------')
logging.info('------len: ' + str(len(data)) + '------')
for _, row in data.iterrows():
if _ % (math.ceil(len(data) / 10.0)) == 0:
logging.info('------group' + str(i) + ': ' + str(_) + '/' + str(len(data)) + '------')
try:
# 超时限制 TODO 秒数
with time_limit(20):
# 模拟任务 TODO 任务
time.sleep(1)
# 使用锁来保证对结果列表的进程安全访问
lock.acquire()
try:
# 将结果添加到共享的结果列表中 TODO 收集结果
results.append(row['id'])
finally:
lock.release()
except Exception as e:
logging.info('------err: ' + str(e) + '------')
if __name__ == '__main__':
# 手动设置并行进程数目 TODO 进程数目
group_num = 8
# 从电脑配置中设置并行进程数目
# group_num = multiprocessing.cpu_count()
# 读取数据 TODO 数据源
data = pd.read_excel('data.xlsx')
# 使用pandas平均划分数据
grouped_data = data.groupby(data.index % group_num)
# 定义共享的结果列表
manager = multiprocessing.Manager()
results = manager.list()
# 创建锁
lock = multiprocessing.Lock()
start_time = time.time()
# 定义多进程
processes = []
for i in range(group_num):
p = multiprocessing.Process(target=process_data,
args=(i, grouped_data.get_group(i).reset_index(), results, lock))
processes.append(p)
# 启动
for _p in processes:
_p.start()
for _p in processes:
_p.join()
end_time = time.time()
execution_time = end_time - start_time
# 打印数据
print(f"代码执行时间:{execution_time}秒")
print(results)
data.xlsx
里面的数据是随便打的:
本demo性能分析
16核CPU,执行上述代码,其中任务部分用了time.sleep(1)
停了1秒,耗时分析如下:
进程数 | 耗时 |
---|---|
1 | 29.317383289337158秒 |
4 | 8.288025140762329秒 |
8 | 5.77861475944519秒 |
14 | 4.941734313964844秒 |
16 | 5.262717008590698秒 |
可以看到加了多进程,加速效果还是比较明显的。
Python中单线程、多线程和多进程的效率对比实验
此处参考:http://blog.atomicer.cn/2016/09/30/Python
我们知道,线程操作、进程操作一般分为CPU密集型操作、IO密集型操作、网络请求密集型操作。
资料显示,如果多线程的进程是CPU密集型的,那多线程并不能有多少效率上的提升,相反还可能会因为线程的频繁切换,导致效率下降,推荐使用多进程;如果是IO密集型,多线程进程可以利用IO阻塞等待时的空闲时间执行其他线程,提升效率。所以我们根据实验对比不同场景的效率:
CPU密集型操作 | IO密集型操作 | 网络请求密集型操作 | |
---|---|---|---|
线性操作 | 94.91824996469 | 22.46199995279 | 7.3296000004 |
多线程操作 | 101.1700000762 | 24.8605000973 | 0.5053332647 |
多进程操作 | 53.8899999857 | 12.7840000391 | 0.5045000315 |
通过上面的结果,我们可以看到:
多线程在IO密集型的操作下似乎也没有很大的优势(也许IO操作的任务再繁重一些就能体现出优势),在CPU密集型的操作下明显地比单线程线性执行性能更差,但是对于网络请求这种忙等阻塞线程的操作,多线程的优势便非常显著了
多进程无论是在CPU密集型还是IO密集型以及网络请求密集型(经常发生线程阻塞的操作)中,都能体现出性能的优势。不过在类似网络请求密集型的操作上,与多线程相差无几,但却更占用CPU等资源,所以对于这种情况下,我们可以选择多线程来执行。