目录
一、Dask介绍
二、使用说明
安装
三、测试
1、单个文件中实现功能
2、运行多个可执行文件
最近在写并行计算相关部分,用到了python的Dask库。
Dask官网:Dask | Scale the Python tools you love
一、Dask介绍
Dask是一个灵活的并行和分布式计算库,旨在处理大规模数据集。它提供了类似于Pandas 和 NumPy 的数据结构,但能够有效处理比内存更大的数据集。通过使用Dask,可以在单台机器或分布式集群中运行,更方便处理大规模数据。
Dask是一个用于Python的并行计算模块,从单机多核扩展到拥有数千台机器的数据中心。它既由低级任务API,也有更高级面向数据的API。低级任务API支持Dask与多种Python库的集成,公共API为围绕Dask发展的各种工具的生态系统提供了基础。
Dask相较于Spark这些大数据处理框架,更轻量级。Dask更侧重与其他框架,如:Numpy、Pandas,Scikit-learning相结合,从而使其能更加方便进行分布式并行计算。
Dask存在三种最基本的数据结构,分别是:Arrays、Dataframes以及Bags。
二、使用说明
安装
pip install dask
python -m pip install "dask[array]"
python -m pip install "dask[distributed]"
python -m pip install "dask[dataframe]"
先测试是否已经安装了模块,命令行进入到python3编辑器:
from dask.distributed import Client, progress
没有报缺少模块错误,则说明是可以正常执行的。
三、测试
1、单个文件中实现功能
下述的主要数据处理在定义计算任务函数calculate_value(num)中,即将计算任务函数处理32次。
from dask.distributed import Client, progress
import time
# 定义计算任务的函数
def calculate_value(num):
num_float = float(num) * 0.33
num_double = float(num) * 0.33
return num_float, num_double
# 设置Dask客户端
def setup_client():
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)
scheduler_info = client.scheduler_info()
ncores = sum(worker['nthreads'] for worker in scheduler_info['workers'].values())
print(f"Connected to Dask cluster with {ncores} cores")
return client
# 提交任务并收集结果
def submit_tasks(client, num, num_tasks=32):
# 创建任务列表
tasks = [client.submit(calculate_value, num) for _ in range(num_tasks)]
# 等待所有任务完成,并显示进度
progress(tasks)
# 收集结果
results = [task.result() for task in tasks]
return results
# 主函数
def main():
num = 558558571 # 这是您要处理的数字
client = setup_client() # 设置Dask客户端
# 提交32个任务
results = submit_tasks(client, num)
# 打印结果
for i, (num_float, num_double) in enumerate(results):
print(f"Task {i+1} - num_float: {num_float}, num_double: {num_double}")
# 关闭客户端连接
client.close()
if __name__ == "__main__":
main()
运行上述的python程序:
python3 my_dask_script.py
执行结果如下:
此时表示运行了32个task。
在运行的时候如果提示:
表明 dask-scheduler
无法启动,原因是端口 8787
已经被占用了。
解决方法:
1、查找并终止占用端口 8787
的进程
(1)先安装lsof:
apt install lsof
(2)查看占用端口进程:
lsof -i :8787
(3)通过进程的 PID 使用 kill
命令终止该进程:
kill -9 PID
2、修改 dask-scheduler
使用的端口
dask-scheduler --port 8888
再次重新启动查看 dask-scheduler 使用的端口:
dask-scheduler
2、运行多个可执行文件
我在同目录中创建了一个test.cc文件,为简单的打印数据,内容如下:
#include <iostream>
#include <iomanip>
int main() {
int num = 558558571;
float num_float = static_cast<float>(num) * 0.33;
double num_double = static_cast<double>(num) * 0.33;
std::cout << "num value: " << num << std::endl;
std::cout << std::fixed << std::setprecision(2);
std::cout << "num_float value: " << num_float << std::endl;
std::cout << "num_double value: " << num_double << std::endl;
return 0;
}
此时将上述的test.cc编译:
g++ -o main test.cc
然后新建一个my_dask_script.py文件,内容如下:
from dask.distributed import Client, LocalCluster
import os
# 定义执行外部程序的函数
def run_external_program():
cmd = './main' # 您的外部程序命令
os.system(cmd) # 使用os.system来执行命令
# 设置Dask客户端
def setup_client():
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)
scheduler_info = client.scheduler_info()
ncores = sum(worker['nthreads'] for worker in scheduler_info['workers'].values())
print(f"Connected to Dask cluster with {ncores} cores")
return client
# 提交任务到Dask集群
def submit_tasks(client, num_tasks=32):
futures = [client.submit(run_external_program) for _ in range(num_tasks)]
return futures
# 主函数
def main():
client = setup_client() # 设置Dask客户端
futures = submit_tasks(client) # 提交任务
# 等待所有任务完成
client.gather(futures)
# 关闭客户端连接
client.close()
if __name__ == "__main__":
main()
运行结果:
此时表示上述的可执行文件main已运行了32份。