基于DPU和HADOS-RACE加速Spark 3.x

news2024/10/7 16:15:31

背景简介

Apache Spark(下文简称Spark)是一种开源集群计算引擎,支持批/流计算、SQL分析、机器学习、图计算等计算范式,以其强大的容错能力、可扩展性、函数式API、多语言支持(SQL、Python、Java、Scala、R)等特性在大数据计算领域被广泛使用。其中,Spark SQL 是 Spark 生态系统中的一个重要组件,它允许用户以结构化数据的方式进行数据处理,提供了强大的查询和分析功能。

随着SSD和万兆网卡普及以及IO技术的提升,CPU计算逐渐成为Spark 作业的瓶颈,而IO瓶颈则逐渐消失。 有以下几个原因,首先,因为 JVM 提供的 CPU 指令级的优化如 SIMD要远远少于其他 Native 语言(如C/C++,Rust)导致基于 JVM 进行 CPU 指令的优化比较困难。其次,NVMe SSD缓存技术和AQE带来的自动优化shuffle极大的减轻了IO延迟。最后,Spark的谓词下推优化跳过了不需要的数据,进一步减少了IO开销。

基于此背景,Databricks(Spark背后的商业公司)在2022年SIGMOD会议上发表论文《Photon: A Fast Query Engine for Lakehouse Systems》,其核心思想是使用C++、向量化执行等技术来执行Spark物理计划,在客户工作负载上获得了平均3倍、最大10倍的性能提升,这证明Spark向量化及本地化是后续值得优化的方向。 Spark3.0(2020年6月发布)开始支持了数据的列式处理,英伟达也提出了利用GPU加速Spark的方案,利用GPU的列式计算和并发能力加速Join、Sort、Aggregate等常见的ETL操作。

DPU(Data Processing Unit) 作为未来计算的三大支柱之一,其设计旨在提供强大的计算能力,以加速各种数据处理任务。DPU的硬件加速能力,尤其在数据计算、数据过滤等计算密集型任务上,为处理海量数据提供了新的可能。通过高度定制和优化的架构,DPU能够在处理大规模数据时显著提升性能,为数据中心提供更高效、快速的计算体验,从而满足现代数据处理需求的挑战。但是目前DPU对Spark生态不能兼容,Spark计算框架无法利用DPU的计算优势。

中科驭数HADOS 异构计算加速软件平台(下文简称HADOS)是一款敏捷异构软件平台,能够为网络、存储、安全、大数据计算等场景进行提速。对于大数据计算场景,HADOS可以认为是一个异构执行库,提供了数据类型、向量数据结构、表达式计算、IO和资源管理等功能。 为了发挥Spark与DPU各自的优势,基于HADOS平台,我们开发了RACE算子卸载引擎,既能够发挥Spark优秀的分布式调度能力又可以发挥DPU的向量化执行能力。

我们通过实验发现,将Spark SQL的计算任务通过RACE卸载到DPU上, 预期可以把原生SparkSQL的单表达式的执行效率提升至9.97倍,TPC-DS单Query提升最高4.56倍。本文将介绍如何基于 DPU和RACE来加速 Spark SQL的查询速度,为大规模数据分析和处理提供更可靠的解决方案。

整体架构

整个解决方案可以参考下图:

• 最底层硬件资源层是DPU硬件,是面向数据中心的专用处理器,其设计旨在提供强大的计算能力,以加速各种数据处理任务,尤其是优化Spark等大数据框架的执行效率。通过高度定制和优化的架构,DPU能够在处理大规模数据时显著提升性能,为数据中心提供更高效、快速的计算体验。

• DPU加速层底层是HADOS异构计算加速软件平台,是中科驭数推出的专用计算敏捷异构软件开发平台。HADOS数据查询加速库通过提供基于列式数据的查询接口,供数据查询应用。支持Java、Scala、C和C++语言的函数调用,主要包括列数据管理、数据查询运行时函数、任务调度引擎、函数运算代价评估、内存管理、存储管理、硬件管理、DMA引擎、日志引擎等模块,目前对外提供数据管理、查询函数、硬件管理、文件存储相关功能API。

• DPU加速层中的RACE层,其最核心的能力就是修改执行计划树,通过 Spark Plugin 的机制,将Spark 执行计划拦截并下发给 DPU来执行,跳过原生 Spark 不高效的执行路径。整体的执行框架仍沿用 Spark 既有实现,包括消费接口、资源和执行调度、查询计划优化、上下游集成等。

• 最上层是面向用户的原生Spark,用户可以直接使用已有的业务逻辑,无感享受DPU带来的性能提升

目前支持的算子覆盖Spark生产环境常用算子,包括Scan、Filter、Project、Union、Hash Aggregation、Sort、Join、Exchange等。表达式方面,我们开发了目前生产环境常用的布尔函数、Sum/Count/AVG/Max/Min等聚合函数。

其中RACE层的架构如下:

下面我们着重介绍RACE层的核心功能。

核心功能模块

RACE与Spark的集成

RACE作为Spark的一个插件,实现了SparkPlugin接口,与Spark的集成分为Driver端和Executor端。

• 在Driver端, 通过Spark Catalyst扩展点插入自定义的规则,实现对查询语句解析过程、优化过程以及物理计划转换过程的控制。

• 在Executor端, 插件在Executor的初始化过程中完成DPU设备的初始化工作。

Plan Conversion

Spark SQL在优化 Physical Plan时,会应用一批规则,RACE通过插入的自定义规则可以拦截到优化后的Physical Plan,如果发现当前算子上的所有表达式可以下推给DPU,那么替换Spark原生算子为相应的可以在DPU上执行的自定义算子,由HADOS将其下推给DPU 来执行并返回结果。

Fallback

Spark支持的Operator和Expression非常多,在RACE研发初期,无法 100% 覆盖 Spark 查询执行计划中的算子和表达式,因此 RACE必须有Fallback机制,支持Spark 查询执行计划中部分算子不运行在DPU上。

对于DPU无法执行的算子,RACE安排 Fallback 回正常的 Spark 执行路径进行计算。例如,下图中展示了插件对原生计划树的修改情况,可以下推给DPU的算子都替换成了对应的"Dpu"开头的算子,不能下推的算子仍然保留。除此之外,会自动插入行转列算子或者列转行算子来适配数据格式的变化。

当然了,不管是行转列算子还是列转行算子,都是开销比较大的算子,随着RACE支持的算子和表达式越来越多,Fallback的情况会逐渐减少。

Strategy

当查询计划中存在未卸载的算子时,因为这样引入了行列转换算子,由于其带来了额外的开销,导致即使对于卸载到DPU上的算子,其性能得到提升,而对于整个查询来说,可能会出现比原生Spark更慢的情况。 针对这种情况,最稳妥的方式就是整个Query全部回退到CPU,这至少不会比原生Spark慢,这是很重要的。

由于Spark3.0加入了AQE的支持,规则通常拦截到的是一个个QueryStage,它是Physical Plan的一部分而非完整的 Physical Plan。 RACE的策略是获取AQE规则介入之前的整个Query的 Physical Plan,然后分析该Physical Plan中的算子是否全部可卸载。如果全部可以卸载,则对QueryStage进行Plan Conversion, 如果不能全部卸载,则跳过Plan Conversion转而直接交给Spark处理。

我们在实际测试过程中发现,一些算子例如Take操作,它需要处理的数据量非常小,那么即使发生Fallback,也不会有很大的行列转换开销,通过白名单机制忽略这种算子,防止全部回退到CPU,达到加速目的。

Metrics

RACE会收集DPU执行过程中的指标统计,然后上报给Spark的Metrics System做展示,以方便Debug和系统调优。

Native Read&Write

SparkSQL的Scan算子支持列式读取,但是Spark的向量与DPU中定义的向量不兼容,需要在JVM中进行一次列转行然后拷贝到DPU中,这会造成巨大的IO开销。我们主要有以下优化:

1. 减少行列转换:对于Parquet格式等列式存储格式的文件读取,SparkSQL采用的是按列读取的方式,即Scan算子是列式算子,但是后续数据过滤等数据处理算子均是基于行的算子,SparkSQL必须把列式数据转换为行式数据,这会导致额外的计算开销。而本方案由于都是列式计算的算子,因此无需这种行列转换。

2. 减少内存拷贝: RACE卸载Scan算子到HADOS平台,HADOS平台的DPUScan算子以Native库的方式加载磁盘数据直接复制到DPU,省去了JVM到DPU的拷贝开销

3. 谓词下推支持:DPUScan也支持ColumnPruning规则,这个规则会确保只有真正使用到的字段才会从这个数据源中提取出来。支持两种Filter:PartitionFilters和PushFilters。PartitionFilters可以过滤掉无用的分区, PushFilters把字段直接下推到Parquet文件中去

4. 同时,文件的写出也进行了类似的优化

注意,这些优化仍然需要对数据进行一次复制,DPU直接读取磁盘是一个后续的优化方向。

加速效果

TPC-DS 单Query加速

单机单线程local模式场景,在1T数据集下,TPC-DS语句中有5条语句E2E时间提升比例超过2倍,最高达到4.56倍:

运算符加速效果

运算符的性能提升,DPU运算符相比Spark原生的运算符的加速比最高达到9.97。

算子加速效果

TPC-DS的测试中,向对于原生Spark解决方案,本方案Filter算子性能最高提高到了43倍,哈希聚合算子提升了13倍。这主要是因为我们节省了列式数据转换为行式数据的开销以及DPU运算的加速。

CPU资源使用情况

CPU资源从平均60%下降到5%左右

原生Spark方案CPU使用情况:

基于RACE和DPU加速后,CPU使用情况:

总结与展望

通过把Spark的计算卸载到DPU加速器上,在用户原有代码无需变更的情况下,端到端的性能可以得到2-5倍的提升,某些算子能达到43倍性能提升,同时CPU资源使用率从60%左右下降到5%左右,显著提升了原生SparkSQL的执行效率。DPU展现了强大的计算能力,对于端到端的分析,会有一些除去算子之外的因素影响整体运行时间,包括磁盘IO,网络Shuffle以及调度的Overhead。这些影响因素将来可以逐步去做特定的优化,例如:

1. 算子的Pipeline执行

原生Spark的算子Pipeline执行以及CodeGen都是Spark性能提升的关键技术,当前,我们卸载到DPU中的计算还没有支持Pipeline以及CodeGen。未来这两个技术的加入,是继续提升Spark的执行效率的一个方向。

2. 读数据部分,通过DPU卡直读磁盘数据来做优化

我们还可以通过DPU卡直接读取硬盘数据,省去主机DDR到DPU卡DDR的数据传输时间,以达到性能提升的效果,可以参考英伟达的GPU对磁盘读写的优化,官方数据CSV格式的文件读取可优化20倍左右。

3. RDMA技术继续提升Shuffle性能

对于Shuffle占比很高的作业,可以通过内存Shuffle以及RDMA技术,来提升整个Shuffle的过程,目前已经实现内存Shuffle,未来我们还可以通过RDMA技术直读远端内存数据,从而完成整个Shuffle链路的优化。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1466297.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++11新特性 lambda表达式与模板函数 std::make_shared

一&#xff1a;make_shared example1: auto l_size make_shared<std::array<int, 2> >(); example2: m_timeHandlePtr make_shared<svTimerHandle>(renderPtr->GetRenderWindow()->GetInteractor(), m_BatchCallBack); C11 中引入了智能指针, 同…

JavaWeb——004Maven SpringBootWeb入门

一、Maven 1、什么是maven&#xff1f; 2、Maven的作用是什么&#xff1f;&#xff08;3种&#xff09; 1.1、方便的依赖管理 依赖管理&#xff1a;有了Maven&#xff0c;我们就不用再手动导入Jar包了&#xff0c;我们只需要在配置文件当中&#xff0c;简单描述一下项目所需要…

JavaSec 之 XXE 简单了解

文章目录 XMLReaderSAXReaderSAXBuilderDocumentBuilderUnmarshaller**SAXParserFactory**XMLReaderFactoryDigester总结 XMLReader public String XMLReader(RequestBody String content) {try {XMLReader xmlReader XMLReaderFactory.createXMLReader();// 修复&#xff1a…

【C++精简版回顾】6.构造函数

一。类的四种初始化方式 1.不使用构造函数初始化类 使用函数引用来初始化类 class MM { public:string& getname() {return name;}int& getage() {return age;}void print() {cout << "name: " << name << endl << "age: &quo…

【2024软件测试面试必会技能】Charles(6):Charles设置弱网

设置弱网&#xff08;慢网速&#xff09; 方法一&#xff1a;点击Charles 上方的乌龟标志&#xff0c;模拟网络延迟&#xff1b; 方法二&#xff1a;点击Proxy——Throttle Settings——勾选Enable Throttling——再勾选Only for selected hosts——点击Add,设置指定的域名——…

探索Promise异步模式抽象的变体——Promise.race篇

如果阅读有疑问的话&#xff0c;欢迎评论或私信&#xff01;&#xff01; 本人会很热心的阐述自己的想法&#xff01;谢谢&#xff01;&#xff01;&#xff01; 文章目录 前言初识Promise.race探索Promise.raceAPI实例 前言 在本栏前一篇Promise.all中&#xff0c;我们可以实…

Panic与Recover:Go异常处理的救命稻草

Panic与Recover&#xff1a;Go异常处理的救命稻草 异常处理是每个程序员都应该关注的重要问题。在Go语言中&#xff0c;Panic和Recover是用于异常处理的两个关键概念。Panic用于触发异常&#xff0c;而Recover用于捕获和处理异常。本文将深入探讨Panic和Recover的区别&#xff…

面试redis篇-05双写一致

原理 双写一致性:当修改了数据库的数据也要同时更新缓存的数据,缓存和数据库的数据要保持一致 读操作:缓存命中,直接返回;缓存未命中查询数据库,写入缓存,设定超时时间写操作:延迟双删方案一:分布式锁,一致性要求高

安全生产:AI视频智能分析网关V4如何应用在企业安全生产场景中?

随着科技的不断进步&#xff0c;视频智能分析技术在安全生产领域中的应用越来越广泛。这种技术通过计算机视觉和人工智能算法&#xff0c;可以对监控视频进行自动分析和处理&#xff0c;以实现多种功能&#xff0c;如目标检测、行为识别、异常预警等。今天我们以TSINGSEE青犀AI…

PHP实现分离金额和其他内容便于统计计算

得到的结果可以粘贴到excel计算 <?php if($_GET["x"] "cha"){ $tips isset($_POST[tips]) ? $_POST[tips] : ; $pattern /(\d\.\d|\d)/; $result preg_replace($pattern, "\t\${1}\t", $tips); echo "<h2><strong>数…

如何快速卸载windows电脑的一些软件?

本系列是一些电脑常规操作的普及&#xff0c;有需要借鉴即可 注&#xff1a;每个电脑都会有差异&#xff0c;参考即可。 其实大部分软件你删除桌面上的图标不等于删除&#xff0c;因为桌面上的那个图标就是一个简单的快捷方式而已。 在这里插入图片描述 那如何正确的卸载软件呢…

实验室预约|实验室预约小程序|基于微信小程序的实验室预约管理系统设计与实现(源码+数据库+文档)

实验室预约小程序目录 目录 基于微信小程序的实验室预约管理系统设计与实现 一、前言 二、系统功能设计 三、系统实现 1、微信小程序前台 2、管理员后台 &#xff08;1&#xff09;管理员登录 &#xff08;2&#xff09;实验室管理 &#xff08;3&#xff09;公告信息…

【初始RabbitMQ】高级发布确认的实现

在生产环境中由于一些不明原因&#xff0c;导致 rabbitmq 重启&#xff0c;在 RabbitMQ 重启期间生产者消息投递失败&#xff0c; 导致消息丢失&#xff0c;需要手动处理和恢复。于是&#xff0c;我们开始思考&#xff0c;如何才能进行 RabbitMQ 的消息可靠投递呢&#xff1f; …

Jmeter学习系列之六:阶梯加压线程组Stepping Thread Group详解

性能测试中,有时需要模拟一种实际生产中经常出现的情况,即:从某个值开始不断增加压力,直至达到某个值,然后持续运行一段时间。 在jmeter中,有这样一个插件,可以帮我们实现这个功能,这个插件就是:Stepping Thread Group 1、下载配置方法 1.1.下载配置 插件下载地址:…

dell戴尔电脑灵越系列Inspiron 15 3520原厂Win11系统中文版/英文版

Dell戴尔笔记本灵越3520原装出厂Windows11系统包&#xff0c;恢复出厂开箱预装OEM系统 链接&#xff1a;https://pan.baidu.com/s/1mMOAnvXz5NCDO_KImHR5gQ?pwd3nvw 提取码&#xff1a;3nvw 原厂系统自带所有驱动、出厂主题壁纸、系统属性联机支持标志、Office办公软件、MyD…

【Java】接口及其实现(实验四)

目录 一、实验目的 二、实验内容 三、实验小结 一、实验目的 了解接口的作用掌握接口的定义与实现掌握接口的回调 二、实验内容 1. 定义一个接口Human&#xff0c;其中有一无参的、返回类型为void的方法speak&#xff08;&#xff09;&#xff1b;定义类Student实现接口&a…

大模型分布式训练方法FDSP和DeepSpeed

备注&#xff1a; 本文部分内容参考自其他作者的内容&#xff0c;如有不妥&#xff0c;请联系&#xff0c;立即删除。 pytorch单精度、半精度、混合精度、单卡、多卡&#xff08;DP / DDP&#xff09;、FSDP、DeepSpeed模型训练 相关代码&#xff1a;pytorch-model-train-temp…

高录用快见刊【最快会后两个月左右见刊】第三届社会科学与人文艺术国际学术会议 (SSHA 2024)

第三届社会科学与人文艺术国际学术会议 (SSHA 2024) 2024 3rd International Conference on Social Sciences and Humanities and Arts *文章投稿均可免费参会 *高录用快见刊【最快会后两个月左右见刊】 重要信息 会议官网&#xff1a;icssha.com 大会时间&#xff1a;202…

【Wio Terminal】使用WiFi(1)- 更新无线核心固件

使用WiFi&#xff08;1&#xff09;- 更新无线核心固件 一、概述1、更新无线核心固件步骤 1 - 擦除初始出厂固件步骤 2 - 刷入最新的固件 2、从Arduino IDE检查RTL8720固件版本安装rpcWiFi库验证 3、更新 SAMD ArduinoCore 一、概述 这篇wiki介绍了如何为Wio Terminal上的Real…

【Vuforia+Unity】AR01实现单张多张图片识别(Image Targets)召唤数字内容

1.官网注册 Home | Engine Developer Portal 2.下载插件SDK&#xff0c;导入Unity 3.官网创建数据库上传图片&#xff0c;官网处理成数据 下载好导入Unity&#xff01; 下载好导入Unity&#xff01; 下载好导入Unity&#xff01; 下载好导入Unity&#xff01; 4.在Unity设…