Python学习(1):使用Python的Dask库实现并行计算

news2024/11/14 3:37:18

目录

一、Dask介绍

二、使用说明

安装

三、测试

1、单个文件中实现功能

2、运行多个可执行文件


最近在写并行计算相关部分,用到了python的Dask库。

Dask官网:Dask | Scale the Python tools you love

一、Dask介绍

Dask是一个灵活的并行和分布式计算库,旨在处理大规模数据集。它提供了类似于Pandas 和 NumPy 的数据结构,但能够有效处理比内存更大的数据集。通过使用Dask,可以在单台机器或分布式集群中运行,更方便处理大规模数据。

Dask是一个用于Python的并行计算模块,从单机多核扩展到拥有数千台机器的数据中心。它既由低级任务API,也有更高级面向数据的API。低级任务API支持Dask与多种Python库的集成,公共API为围绕Dask发展的各种工具的生态系统提供了基础。

Dask相较于Spark这些大数据处理框架,更轻量级。Dask更侧重与其他框架,如:Numpy、Pandas,Scikit-learning相结合,从而使其能更加方便进行分布式并行计算。

Dask存在三种最基本的数据结构,分别是:Arrays、Dataframes以及Bags

二、使用说明

安装

pip install dask
python -m pip install "dask[array]"
python -m pip install "dask[distributed]"
python -m pip install "dask[dataframe]"

先测试是否已经安装了模块,命令行进入到python3编辑器:

from dask.distributed import Client, progress

没有报缺少模块错误,则说明是可以正常执行的。

三、测试

1、单个文件中实现功能

下述的主要数据处理在定义计算任务函数calculate_value(num)中,即将计算任务函数处理32次。

from dask.distributed import Client, progress
import time

# 定义计算任务的函数
def calculate_value(num):
    num_float = float(num) * 0.33
    num_double = float(num) * 0.33  
    return num_float, num_double

# 设置Dask客户端
def setup_client():
    from dask.distributed import Client, LocalCluster
    
    cluster = LocalCluster()
    client = Client(cluster)
    
    scheduler_info = client.scheduler_info()
    ncores = sum(worker['nthreads'] for worker in scheduler_info['workers'].values())
    
    print(f"Connected to Dask cluster with {ncores} cores")
    
    return client

# 提交任务并收集结果
def submit_tasks(client, num, num_tasks=32):
    # 创建任务列表
    tasks = [client.submit(calculate_value, num) for _ in range(num_tasks)]
    
    # 等待所有任务完成,并显示进度
    progress(tasks)
    
    # 收集结果
    results = [task.result() for task in tasks]
    return results

# 主函数
def main():
    num = 558558571  # 这是您要处理的数字
    client = setup_client()  # 设置Dask客户端
    
    # 提交32个任务
    results = submit_tasks(client, num)
    
    # 打印结果
    for i, (num_float, num_double) in enumerate(results):
        print(f"Task {i+1} - num_float: {num_float}, num_double: {num_double}")
    
    # 关闭客户端连接
    client.close()

if __name__ == "__main__":
    main()

运行上述的python程序:

python3 my_dask_script.py

执行结果如下:

此时表示运行了32个task。

在运行的时候如果提示:

表明 dask-scheduler 无法启动,原因是端口 8787 已经被占用了。

解决方法:

1、查找并终止占用端口 8787 的进程

(1)先安装lsof:

apt install lsof

(2)查看占用端口进程:

lsof -i :8787

(3)通过进程的 PID 使用 kill 命令终止该进程:

kill -9 PID

2、修改 dask-scheduler 使用的端口

dask-scheduler --port 8888

再次重新启动查看 dask-scheduler 使用的端口:

dask-scheduler

2、运行多个可执行文件

我在同目录中创建了一个test.cc文件,为简单的打印数据,内容如下:

#include <iostream>
#include <iomanip>
 
int main() {
    int num = 558558571;
    float num_float = static_cast<float>(num) * 0.33;
    double num_double = static_cast<double>(num) * 0.33;
    
    std::cout << "num value: " << num << std::endl;
    std::cout << std::fixed << std::setprecision(2);
    std::cout << "num_float value: " << num_float << std::endl;
    std::cout << "num_double value: " << num_double << std::endl;
    return 0;
}

此时将上述的test.cc编译:

g++ -o main test.cc

然后新建一个my_dask_script.py文件,内容如下:

from dask.distributed import Client, LocalCluster
import os

# 定义执行外部程序的函数
def run_external_program():
    cmd = './main'  # 您的外部程序命令
    os.system(cmd)  # 使用os.system来执行命令

# 设置Dask客户端
def setup_client():
    from dask.distributed import Client, LocalCluster
    
    cluster = LocalCluster()
    client = Client(cluster)
    
    scheduler_info = client.scheduler_info()
    ncores = sum(worker['nthreads'] for worker in scheduler_info['workers'].values())
    
    print(f"Connected to Dask cluster with {ncores} cores")
    
    return client

# 提交任务到Dask集群
def submit_tasks(client, num_tasks=32):
    futures = [client.submit(run_external_program) for _ in range(num_tasks)]
    return futures

# 主函数
def main():
    client = setup_client()  # 设置Dask客户端
    futures = submit_tasks(client)  # 提交任务
    
    # 等待所有任务完成
    client.gather(futures)
    
    # 关闭客户端连接
    client.close()

if __name__ == "__main__":
    main()

运行结果:

此时表示上述的可执行文件main已运行了32份。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1989744.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C语言 ——— 学习并使用字符分类函数

目录 学习isupper函数 学习isdigit函数 学习tolower函数 将输入的字符串中把大写字母转换为小写字母并输出 学习isupper函数 参数部分&#xff1a; 形参需要传递的是一个字母&#xff0c;字符在ASCII码表上是以整型存储的&#xff0c;所以实参部分用(int c)没有问题 返回…

HarmonyOS笔记3:从网络数据接口API获取数据

面向HarmonyOS的移动应用一般采用MVVM模式&#xff08;见参考文献【1】&#xff09;&#xff0c;其中&#xff1a; M&#xff08;Model层)&#xff1a;模型层&#xff0c;存储数据和相关逻辑的模型。它表示组件或其他相关业务逻辑之间传输的数据。Model是对原始数据的进一步处理…

享界S9别乱选,定价有大玄机!

文 | AUTO芯球 作者 | 雷慢 享界S9刚上市&#xff0c; 就有人傻钱多、工作忙的老大哥来问我&#xff0c; 两个版本怎么选&#xff1f; 这不巧了吗&#xff0c;论华为车系&#xff0c;我是资深用户&#xff0c; 常开问界M9&#xff0c;试过智界S7&#xff0c;问界M7&#x…

PUA自己到无法自拔,或许是你过度信奉【优绩主义】

本文算是人文社科心理篇的第二期&#xff0c;不时发一些【理性】的【鸡汤】&#xff0c;或许对你认识社会本质有所帮助~ 一.定义 顾名思义&#xff0c;从理科生的角度来说&#xff0c;【优绩主义】以优秀的成绩作为评判人生是否成功的极大型指标&#xff0c;在东亚的【休息羞…

性能优化之自定义指令实现图片懒加载

1&#xff09;图片懒加载 是常见的用于在页面滚动时动态加载图片&#xff0c;而不是在页面加载时一次性加载所有图片。性能优化必备提高页面加载速度的手段&#xff0c;特别是在包含大量图片的网站上。 图片懒加载的原理&#xff0c;其实就是&#xff0c;当图片出现在视口内时…

数据分析:宏基因组的荟萃分析之MMUPHin

禁止商业或二改转载&#xff0c;仅供自学使用&#xff0c;侵权必究&#xff0c;如需截取部分内容请后台联系作者! 介绍 批次效应是实验中由于样本处理和测序技术变异引起的非生物学差异&#xff0c;可能干扰研究结果。这种效应难以完全消除&#xff0c;但可通过方法如PCA、PC…

【Vue3】Pinia getters

【Vue3】Pinia getters 背景简介开发环境开发步骤及源码 背景 随着年龄的增长&#xff0c;很多曾经烂熟于心的技术原理已被岁月摩擦得愈发模糊起来&#xff0c;技术出身的人总是很难放下一些执念&#xff0c;遂将这些知识整理成文&#xff0c;以纪念曾经努力学习奋斗的日子。本…

求职leetcode题目(6)

1.简化路径 解题思路: 根据题意&#xff0c;使用栈进行模拟即可。 具体的&#xff0c;从前往后处理 path&#xff0c;每次以 item 为单位进行处理&#xff08;有效的文件名&#xff09;&#xff0c;根据 item 为何值进行分情况讨论&#xff1a; item 为有效值 &#xff1a;存…

AQS框架

文章目录 概要AQS概述公平锁与非公平锁原理可重入 概要 假设现在需要写一个SDK层面的锁&#xff0c;应该如何实现呢&#xff1f; 初步的思路如下&#xff1a; 搞一个状态标记&#xff0c;用来表示持有或未持有锁&#xff0c;但得是 volatile 类型的保证线程可见性。编写一个 …

揭秘公司高效查快递的秘密武器

在快节奏的现代商务环境中&#xff0c;物流管理的效率直接关系到企业的运营成本和客户满意度。对于拥有大量快递业务往来的公司而言&#xff0c;如何快速、准确地追踪每一个包裹的物流信息&#xff0c;成为了一项至关重要的任务。今天&#xff0c;我们将揭秘一款公司高效查快递…

智慧农场数字港系统设计与实现

1 项目介绍 1.1 摘要 农业是一个国家的根本之一&#xff0c;也是国家经济、社会发展的重中之重&#xff0c;从“粮食第一”方针到农业生产市场化&#xff0c;再到乡村振兴、加强扶持农业技术创新和基础建设&#xff0c;我国的农业发展以及走过了几个阶段&#xff0c;并一直在…

Nature教你怎么用GPT做学术

ChatGPT如何助力学术写作&#xff1a;三个关键方式 生成性人工智能&#xff08;AI&#xff09;在近年来逐渐成为学术界的热门话题。Dritjon Gruda在2024年4月发表于《Nature》的一篇文章中&#xff0c;详细探讨了ChatGPT如何在学术写作、编辑和同行评审中提供帮助。这篇文章将…

第R2周:Pytorch实现:LSTM-火灾温度预测

nn.LSTM() 函数详解 nn.LSTM 是 PyTorch 中用于创建长短期记忆&#xff08;Long Short-Term Memory&#xff0c;LSTM&#xff09;模型的类。LSTM 是一种循环神经网络&#xff08;Recurrent Neural Network&#xff0c;RNN&#xff09;的变体&#xff0c;用于处理序列数据&#…

常见的框架漏洞

框架 Web框架(Web framework)或者叫做Web应⽤框架(Web application framework)&#xff0c;是⽤于 进⾏Web开发的⼀套软件架构。⼤多数的Web框架提供了⼀套开发和部署⽹站的⽅式。为Web的 ⾏为提供了⼀套⽀持⽀持的⽅法。使⽤Web框架&#xff0c;很多的业务逻辑外的功能不需要⾃…

微步社区帖子中使用编码数据调戏吃瓜群众初探

什么&#xff0c;居然有人在微步社区公然使用编码后的字符串调戏吃瓜群众。 在演练活动的的某一天&#xff0c;微步威胁情报社区突然流行多重编码后内容的帖子。作者本着为人民群众利益着想的目的&#xff0c;结合毕生所学&#xff0c;决定要将这些奇技淫巧和小把戏公之于众。…

R 语言学习教程,从入门到精通,R 判断语句(7)

1、R 判断语句 判断结构要求程序员指定一个或多个要评估或测试的条件&#xff0c;以及条件为真时要执行的语句&#xff08;必需的&#xff09;和条件为假时要执行的语句&#xff08;可选的&#xff09;。 下面是大多数编程语言中典型的判断结构的一般形式&#xff1a; R 语言…

嵌入式linux系统中USART应用实现

各位开发者大家好,今天主要给大家分享一下,如何在linux系统中使用UART串口的功能。 第一:串口的作用 UART:通用异步收发器简称串口。常用的调试:移植u-boot、内核时,主要使用串口查看打印信息。也可以外接各种模块。 第二:linux系统中的串口 接下来,我们来看一下,linu…

达梦数据库的系统视图v$mem_heap

达梦数据库的系统视图v$mem_heap 达梦数据库的V$MEM_HEAP视图提供了关于内存堆的信息&#xff0c;仅当系统启动时 MEMORY_LEAK_CHECK 为 1 时有效。这个视图通常包含内存堆的使用情况&#xff0c;包括堆的大小、已使用空间、空闲空间等。通过查询V$MEM_HEAP视图&#xff0c;用…

图书馆座位再利用小程序的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;用户管理&#xff0c;座位信息管理&#xff0c;座位预订管理&#xff0c;互勉信息管理&#xff0c;意见反馈管理&#xff0c;系统管理 微信端账号功能包括&#xff1a;系统首页&#xff0c;我的 开发…

[算法]第一集 递归(未完待续)

递归啊递归&#xff0c;说简单简单&#xff0c;说难难。 首先我们要知道 一、什么是递归&#xff1f; 我们再C语言和数据结构里都用了不少递归&#xff0c;这里就不多详细介绍。 递归简单来说就是函数自己调用自己的情况 二、为什么要用递归呢&#xff1f; 本质来说其实就…