并行计算框架Polars、Dask的数据处理性能对比

news2025/1/22 18:03:39

在Pandas 2.0发布以后,我们发布过一些评测的文章,这次我们看看,除了Pandas以外,常用的两个都是为了大数据处理的并行数据框架的对比测试。

本文我们使用两个类似的脚本来执行提取、转换和加载(ETL)过程。

测试内容

这两个脚本主要功能包括:

从两个parquet 文件中提取数据,对于小型数据集,变量path1将为“yellow_tripdata/ yellow_tripdata_2014-01”,对于中等大小的数据集,变量path1将是“yellow_tripdata/yellow_tripdata”。对于大数据集,变量path1将是“yellow_tripdata/yellow_tripdata*.parquet”;

进行数据转换:a)连接两个DF,b)根据PULocationID计算行程距离的平均值,c)只选择某些条件的行,d)将步骤b的值四舍五入为2位小数,e)将列“trip_distance”重命名为“mean_trip_distance”,f)对列“mean_trip_distance”进行排序

将最终的结果保存到新的文件

脚本

1、Polars

数据加载读取

 def extraction():
     """
     Extract two datasets from parquet files
     """
     path1="yellow_tripdata/yellow_tripdata_2014-01.parquet"
     df_trips= pl_read_parquet(path1,)
     path2 = "taxi+_zone_lookup.parquet"
     df_zone = pl_read_parquet(path2,)
 
     return df_trips, df_zone
 
 def pl_read_parquet(path, ):
     """
     Converting parquet file into Polars dataframe
     """
     df= pl.scan_parquet(path,)
     return df

转换函数

 def transformation(df_trips, df_zone):
     """
     Proceed to several transformations
     """
     df_trips= mean_test_speed_pl(df_trips, )
     
     df = df_trips.join(df_zone,how="inner", left_on="PULocationID", right_on="LocationID",)
     df = df.select(["Borough","Zone","trip_distance",])
   
     df = get_Queens_test_speed_pd(df)
     df = round_column(df, "trip_distance",2)
     df = rename_column(df, "trip_distance","mean_trip_distance")
 
     df = sort_by_columns_desc(df, "mean_trip_distance")
     return df
 
 
 def mean_test_speed_pl(df_pl,):
     """
     Getting Mean per PULocationID
     """
     df_pl = df_pl.groupby('PULocationID').agg(pl.col(["trip_distance",]).mean())
     return df_pl
 
 def get_Queens_test_speed_pd(df_pl):
     """
     Only getting Borough in Queens
     """
 
     df_pl = df_pl.filter(pl.col("Borough")=='Queens')
 
     return df_pl
 
 def round_column(df, column,to_round):
     """
     Round numbers on columns
     """
     df = df.with_columns(pl.col(column).round(to_round))
     return df
 
 def rename_column(df, column_old, column_new):
     """
     Renaming columns
     """
     df = df.rename({column_old: column_new})
     return df
 
 def sort_by_columns_desc(df, column):
     """
     Sort by column
     """
     df = df.sort(column, descending=True)
     return df

保存

 def loading_into_parquet(df_pl):
     """
     Save dataframe in parquet
     """
     df_pl.collect(streaming=True).write_parquet(f'yellow_tripdata_pl.parquet')

其他代码

 import polars as pl
 import time
 
 def pl_read_parquet(path, ):
     """
     Converting parquet file into Polars dataframe
     """
     df= pl.scan_parquet(path,)
     return df
 
 def mean_test_speed_pl(df_pl,):
     """
     Getting Mean per PULocationID
     """
     df_pl = df_pl.groupby('PULocationID').agg(pl.col(["trip_distance",]).mean())
     return df_pl
 
 def get_Queens_test_speed_pd(df_pl):
     """
     Only getting Borough in Queens
     """
 
     df_pl = df_pl.filter(pl.col("Borough")=='Queens')
 
     return df_pl
 
 def round_column(df, column,to_round):
     """
     Round numbers on columns
     """
     df = df.with_columns(pl.col(column).round(to_round))
     return df
 
 def rename_column(df, column_old, column_new):
     """
     Renaming columns
     """
     df = df.rename({column_old: column_new})
     return df
 
 
 def sort_by_columns_desc(df, column):
     """
     Sort by column
     """
     df = df.sort(column, descending=True)
     return df
 
 
 def main():
     
     print(f'Starting ETL for Polars')
     start_time = time.perf_counter()
 
     print('Extracting...')
     df_trips, df_zone =extraction()
        
     end_extract=time.perf_counter() 
     time_extract =end_extract- start_time
 
     print(f'Extraction Parquet end in {round(time_extract,5)} seconds')
     print('Transforming...')
     df = transformation(df_trips, df_zone)
     end_transform = time.perf_counter() 
     time_transformation =time.perf_counter() - end_extract
     print(f'Transformation end in {round(time_transformation,5)} seconds')
     print('Loading...')
     loading_into_parquet(df,)
     load_transformation =time.perf_counter() - end_transform
     print(f'Loading end in {round(load_transformation,5)} seconds')
     print(f"End ETL for Polars in {str(time.perf_counter()-start_time)}")
 
 
 if __name__ == "__main__":
     
     main()

2、Dask

函数功能与上面一样,所以我们把代码整合在一起:

 import dask.dataframe as dd
 from dask.distributed import Client
 import time
 
 def extraction():
     path1 = "yellow_tripdata/yellow_tripdata_2014-01.parquet"
     df_trips = dd.read_parquet(path1)
     path2 = "taxi+_zone_lookup.parquet"
     df_zone = dd.read_parquet(path2)
 
     return df_trips, df_zone
 
 def transformation(df_trips, df_zone):
     df_trips = mean_test_speed_dask(df_trips)
     df = df_trips.merge(df_zone, how="inner", left_on="PULocationID", right_on="LocationID")
     df = df[["Borough", "Zone", "trip_distance"]]
 
     df = get_Queens_test_speed_dask(df)
     df = round_column(df, "trip_distance", 2)
     df = rename_column(df, "trip_distance", "mean_trip_distance")
 
     df = sort_by_columns_desc(df, "mean_trip_distance")
     return df
 
 def loading_into_parquet(df_dask):
     df_dask.to_parquet("yellow_tripdata_dask.parquet", engine="fastparquet")
 
 def mean_test_speed_dask(df_dask):
     df_dask = df_dask.groupby("PULocationID").agg({"trip_distance": "mean"})
     return df_dask
 
 def get_Queens_test_speed_dask(df_dask):
     df_dask = df_dask[df_dask["Borough"] == "Queens"]
     return df_dask
 
 def round_column(df, column, to_round):
     df[column] = df[column].round(to_round)
     return df
 
 def rename_column(df, column_old, column_new):
     df = df.rename(columns={column_old: column_new})
     return df
 
 def sort_by_columns_desc(df, column):
     df = df.sort_values(column, ascending=False)
     return df
 
 
 
 def main():
     print("Starting ETL for Dask")
     start_time = time.perf_counter()
 
     client = Client()  # Start Dask Client
 
     df_trips, df_zone = extraction()
 
     end_extract = time.perf_counter()
     time_extract = end_extract - start_time
 
     print(f"Extraction Parquet end in {round(time_extract, 5)} seconds")
     print("Transforming...")
     df = transformation(df_trips, df_zone)
     end_transform = time.perf_counter()
     time_transformation = time.perf_counter() - end_extract
     print(f"Transformation end in {round(time_transformation, 5)} seconds")
     print("Loading...")
     loading_into_parquet(df)
     load_transformation = time.perf_counter() - end_transform
     print(f"Loading end in {round(load_transformation, 5)} seconds")
     print(f"End ETL for Dask in {str(time.perf_counter() - start_time)}")
 
     client.close()  # Close Dask Client
 
 if __name__ == "__main__":
     main()

测试结果对比

1、小数据集

我们使用164 Mb的数据集,这样大小的数据集对我们来说比较小,在日常中也时非常常见的。

下面是每个库运行五次的结果:

Polars

Dask

2、中等数据集

我们使用1.1 Gb的数据集,这种类型的数据集是GB级别,虽然可以完整的加载到内存中,但是数据体量要比小数据集大很多。

Polars

Dask

3、大数据集

我们使用一个8gb的数据集,这样大的数据集可能一次性加载不到内存中,需要框架的处理。

Polars

Dask

总结

从结果中可以看出,Polars和Dask都可以使用惰性求值。所以读取和转换非常快,执行它们的时间几乎不随数据集大小而变化;

可以看到这两个库都非常擅长处理中等规模的数据集。

由于polar和Dask都是使用惰性运行的,所以下面展示了完整ETL的结果(平均运行5次)。

Polars在小型数据集和中型数据集的测试中都取得了胜利。但是,Dask在大型数据集上的平均时间性能为26秒。

这可能和Dask的并行计算优化有关,因为官方的文档说“Dask任务的运行速度比Spark ETL查询快三倍,并且使用更少的CPU资源”。

上面是测试使用的电脑配置,Dask在计算时占用的CPU更多,可以说并行性能更好。

https://avoid.overfit.cn/post/74128cd8803b43f2a51ca4ff4fed4a95

作者:Luís Oliveira

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/730631.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Linux】-Linux部署Javaweb项目

作者:学Java的冬瓜 博客主页:☀冬瓜的主页🌙 专栏:【Linux】 分享: 屋檐如悬崖 风铃如沧海 我等燕归来 时间被安排 演一场意外 你悄然走开 故事在城外 浓雾散不开 看不清对白 你听不出来 风声不存在 是我在感慨 梦想来是谁在窗台 …

【服务器】Python一行命令搭建HTTP服务器并外网访问 - 内网穿透

文章目录 1.前言2.本地http服务器搭建2.1.Python的安装和设置2.2.Python服务器设置和测试 3.cpolar的安装和注册3.1 Cpolar云端设置3.2 Cpolar本地设置 4.公网访问测试5.结语 转载自cpolar极点云文章:【Python】快速简单搭建HTTP服务器并公网访问「cpolar内网穿透」…

【专题速递】音频生成、TTS和AIGC在音乐上的运用

// AIGC的发展为音频带来了什么?AIGC如何赋能音乐创作?如何识别虚假音频?TTS可以在哪种场景下解决特定问题?7月29日LiveVideoStackCon2023上海站音频新体验专场,为您解答。 音频新体验 随着多媒体和通信网络技术的不…

开利网络受邀参与广州三会企业数字化转型专题研讨会

​7月6日,开利网络受邀出席由广州三会于广州市黄埔区组织的“广州三会第六届理事会第八次会长联席会议”,并进行了主题为“企业数字化转型如何推动企业价值再造?”的专题分享会,为各位参会来宾分享企业数字化转型常见误区及数字化…

【IC设计】ICC1 workshop lab guide 学习笔记——Lab 2 Design Planning Task5-9

文章目录 ICC1 workshop lab guide2.5 Create P/G Rings Around Macro Groups2.6 Power Network Synthesis2.7 Check the Timing2.8 Write Out the DEF Floorplan File2.9 Create 2nd Pass Design Ready for Placement ICC1 workshop lab guide 2.5 Create P/G Rings Around M…

uniapp 发送全文件 支持App端ios、android,微信小程序,H5

由于uniapp提供的API在app端只能上传图片和视频,不能上传其他文件,说以只能借助插件了。 ios端用的这个插件 获取到文件对象 免费的 这个是返回一个 filePath 可用直接用于 uni.uploadFile 上传的路径,后面自己又改的File对象 全文件上传选择…

CAD绘制三维升旗台

首先绘制长方体的底座 用交叉对角线来定位,绘制一个小一点的矩形,用来定位 大概的效果: 沿着矩形的一个角绘制三个长方体,形成护栏 用阵列或者复制等形成四个角的护栏 旋转,换成真实的效果图: 添加一个圆…

文件共享平台Pingvin Share

本文完成于 2 月上旬。最近正好应网友要求折腾了 ClamAV,所以翻出来一起发了,可以作为 ClamAV 的一个应用示例; 什么是 Pingvin Share ? Pingvin Share 是自托管文件共享平台,是 WeTransfer 的替代品。使用 Pingvin Sh…

【C语言基础】遍历

(꒪ꇴ꒪(꒪ꇴ꒪ ),我是祐言博客主页:C语言基础,Linux基础,软件配置领域博主🌍快上🚘,一起学习!送给读者的一句鸡汤🤔:集中起来的意志可以击穿顽石!作者水平很有限,如果发现错误&…

S7-1200与ABB机器人进行SOCKET通信的具体方法示例

S7-1200与ABB机器人进行SOCKET通信的具体方法示例 SOCKET通信是一种基于TCP/IP协议的通信方式,提供了程序内部与外界通信的端口并为通信双方提供了数据传输通道。 ABB机器人实现SOCKET通信必须要在Communication选项中勾选616-1 PC Interface选项功能。 具体方法可参考以下内容…

SpringBoot+Vue酒店客房管理系统

💕💕作者:程序员徐师兄 个人简介:7 年大厂程序员经历,擅长Java、微信小程序、Python、Android等,大家有这一块的问题可以一起交流! 各类成品java毕设 。javaweb,ssh,ssm&…

从YOLOv1到YOLOv8的YOLO系列最新综述【2023年4月】

作者:Juan R. Terven 、Diana M. Cordova-Esparaza 摘要:YOLO已经成为机器人、无人驾驶汽车和视频监控应用的核心实时物体检测系统。我们对YOLO的演变进行了全面的分析,研究了从最初的YOLO到YOLOv8每次迭代的创新和贡献。我们首先描述了标准…

Python 中的二维插值

本文展示了如何在 Python 中进行插值,并研究了不同的 2d 实现方法。 我们将讨论用于双变量插值的有用函数,例如 scipy.interpolate.interp2d、numpy.meshgrid 和 Python 中使用的用于平滑/插值 (RBF) 的径向基函数。 我们将使用 SciPy 和 Numpy 库实现插…

树莓派配置ubuntu server 22.04环境

背景 比起raspberry系统ubuntu更通用,结合公司项目开发需要,将树莓派4B刷上ubuntu server系统,并且安装LXDE桌面环境。 一波next 烧写镜像 用树莓派镜像烧录软件安装比较简单,选择操作系统:Other general-purpose O…

python psutil模块常用方法

psutil 是一个功能强大的跨平台第三方库,用于检索系统相关信息和进程管理。它提供了一些方便的函数和方法,可以获取 CPU 使用率、内存使用情况、磁盘信息、网络统计数据以及进程列表等。 1. 安装psutil pip install psutil2. 获取 CPU 使用率 import p…

flutter聊天界面-聊天列表 下拉加载更多历史消息

flutter聊天界面-聊天列表 下拉加载更多历史消息 在之前实现了flutter聊天界面的富文本展示内容、自定义表情键盘实现、加号【➕】更多展开相机、相册等操作Panel、消息气泡展示实现Flexible。这里把实现的聊天界面的滑动列表及下拉加载更多历史消息记录一下 聊天界面的列表使…

MySQL索引优化原则和失效情况

目录 1. 全值匹配2. 最佳左前缀法则3. 不要在索引列上做任何计算4. 范围之后全失效5. 尽量使用覆盖索引6. 使用不等于&#xff08;!或<>&#xff09;会使索引失效7. is null 或 is not null也无法使用索引8. like通配符以%开头会使索引失效9. 字符串不加单引号导致索引失…

程序员的悲哀是什么?

点击下方“JavaEdge”&#xff0c;选择“设为星标” 第一时间关注技术干货&#xff01; 免责声明~ 切记&#xff0c;任何文章不要过度深思&#xff08;任何东西都无法经得起审视&#xff0c;因为这世上没有同样的成长环境&#xff0c;也没有同样的认知水平同时也「没有适用于所…

大模型高效训练基础知识:梯度累积(Gradient Accumulationn)

梯度累积 梯度累积&#xff08;Gradient Accumulation&#xff09;的基本思想是将一次性的整批参数更新的梯度计算变为以一小步一小步的方式进行&#xff08;如下图&#xff09;&#xff0c;具体而言该方法以小批次的方式进行模型前向传播和反向传播&#xff0c;过程中迭代计算…

变革管理中的几个不错的模型小结

其他的变革模型&#xff1a; 变革管理流程&#xff1a;