实战干货|Spark 在袋鼠云数栈的深度探索与实践

news2024/12/24 21:24:51

Spark 是一个快速、通用、可扩展的大数据计算引擎,具有高性能、易用、容错、可以与 Hadoop 生态无缝集成、社区活跃度高等优点。在实际使用中,具有广泛的应用场景:

· 数据清洗和预处理:在大数据分析场景下,数据通常需要进行清洗和预处理操作以确保数据质量和一致性,Spark 提供了丰富的 API,可以对数据进行清洗、过滤、转换等操作

· 批处理分析:Spark 适用于各种应用场景下的批处理任务,包括统计分析、数据挖掘、特征提取等,用户可以利用 Spark 强大的 API 和内置库进行复杂的数据处理和分析,从而挖掘数据中的内在价值

· 交互式查询:Spark 提供了支持 SQL 查询的 Spark SQL 模块,用户可以使用标准的 SQL 语句进行交互式查询和大规模数据分析

Spark 在袋鼠云的使用

在袋鼠云数栈离线开发平台,我们提供了三种使用 Spark 的方式:

● 创建 Spark SQL 任务

用户可以直接通过编写 SQL 的方式实现自己的业务逻辑。这种方式是目前数栈离线平台使用 Spark 最广泛的方式,也是最为推荐的一种方式。

● 创建 Spark Jar 任务

用户需要在 IDEA 上使用 Scala 或者 Java 语言实现业务逻辑,然后对该项目进行编译打包,并将得到的 Jar 包上传到离线平台,随后在创建 Spark Jar 任务的时候引用这个 Jar 包,最后将任务提交到调度运行即可。

对于使用 SQL 难以实现或表达的需求,或者用户有其他更深层次的需求,Spark Jar 任务无疑给用户提供了一种更为灵活的使用 Spark 的方式。

● 创建 PySpark 任务

用户可以直接编写对应的 Python 代码。在我们的客户群体中,有相当一部分客户,他们除了 SQL 之外,Python 可能是他们的主力语言。特别是针对有一定数据分析基础、算法基础的用户,他们往往会对处理好的数据进行更深层的分析,此时 PySpark 任务自然是他们的不二之选。

Spark 在袋鼠云数栈离线开发平台发挥着重要的作用,因此,我们内部对 Spark 做了也不少的优化,使客户在使用 Spark 提交任务时更加方便。我们还基于 Spark 做了一些工具来增强整个数栈离线开发平台的功能。

除此之外,在数据湖场景下,Spark 也发挥着相当重要的作用。在袋鼠云的湖仓一体模块中,已经支持了 Iceberg 和 Hudi 两大数据湖,用户可以使用 Spark 对湖表进行读写,湖表的治理底层也是通过使用 Spark 调用不同的存储过程实现。

下文就将从引擎侧和 Spark 本身两个方面来阐述袋鼠云内部所做的优化。

引擎端优化

袋鼠云内部引擎端的功能主要是用于任务提交、任务状态获取、任务日志获取、停止任务、语法校验等。每个功能点我们都做了不同程度的优化,下文通过两个例子进行简单介绍。

Spark on Yarn 提交速度提升

随着引擎端 Spark 插件上新功能的不断开发和完善,引擎端提交 Spark 任务所需的时间也在相应的增加,因此需要对提交 Spark 任务相关的代码进行优化,以缩短 Spark 任务提交的时长,提升用户体验。

为此,我们做了以下工作,对于一些公用的配置文件,如 core-site.xml、yarn-site.xml、keytab 文件、spark-sql-application.jar 等,原来每次提交任务都需要预先从服务器下载并提交这些配置文件。现在经过优化后,上述文件仅仅需要在客户端 SparkYarnClient 初始化的时候下载一次,然后上传到指定的 HDFS 路径,后续提交 Spark 任务只需要通过参数的方式指定到对应的 HDFS 路径即可。通过这种方式大大缩短了每次 Spark 任务的提交时间。

在新版本的数栈中,对于临时查询,我们还会根据自定义的规则判断待执行 SQL 的复杂度,将复杂度不高的 SQL 发送到引擎端启动的 SparkSQLEngine 运行,以加快运行速度。这个内部的 SparkSQLEngine 在以前仅仅用于语法校验,现在也承担了一部分 SQL 执行的功能,并且 SparkSQLEngine 还可以根据运行的整体情况,动态扩缩资源,实现资源的有效利用。

语法校验

在较老的数栈版本,对于 SQL 进行语法校验,引擎端会先把 SQL 发送到 Spark Thrift Server。这个 Spark Thrift Server 是以 local 模式部署,不仅仅需要用于语法校验,其他平台上所有元数据的获取都是通过发送 SQL 到这个 Spark Thrift Server 执行来获取。这种方式弊端较大,为此我们做了一些优化。在 Engine 端以 local 模式启动了一个 Spark 任务,在进行语法校验的时候不再将 SQL 发送到 Spark Thrift Server,而是内部维护了一个 SparkSession,直接对 SQL 进行语法校验。

这种方式虽然可以不需要再跟外部的 Spark Thrift server 强关联,但是会给调度组件带来一定的压力,在实现的过程中 Engine-Plugins 的整体复杂度也增大了不少。

为了优化以上问题,我们做了更进一步的优化,调度组件在启动的时候,提交了一个 Spark 任务 SparkSQLEngine 到 Yarn 上。可以理解为是一个远程的运行在 Yarn 上的 Spark Thrift Server,引擎端时刻监控这个 SparkSQLEngine 的健康状态。这样,每次执行语法校验的时候,引擎端将 SQL 通过 JDBC 的形式发送给 SparkSQLEngine 进行语法校验。

通过上述的优化,使得离线开发平台与 Spark Thrift Server 解耦合,EasyManager 不需要额外部署 Spark Thrift Server,使部署更轻量化。调度侧也不用维护一个 local 模式的 Spark 常驻进程。也为离线开发平台上 Spark SQL 任务交互式查询增强做铺垫。

离线开发平台与 EasyManager 部署的 Spark Thrift Server 解耦合后会有以下好处:

· 能够真正意义上的实现 Spark 多集群多版本共存

· EasyManager 标准部署可以去除 Spark Thrift Server,为一线运维减负

· Spark SQL 语法校验变得更轻量,不用缓存 SparkContext,减少 Engine 的资源占用

Spark 功能优化

随着业务的发展深入,我们发现开源的 Spark 在一些场景并没有对应的功能实现。因此我们在开源 Spark 的基础上开发了更多新的插件,以支持数栈更多的功能应用。

任务诊断

首先,我们对 Spark 的 metric sink 做了增强。Spark 内部提供了各种 Sink,除了 ConsoleSink 之外,还有 CSVSink、JmxSink、MetricsServlet、GraphiteSink、Slf4jSink、StatsdSink 等。在 Spark3.0 之后还新增了 PrometheusServlet,但这些还不能满足我们的需求。

在开发任务诊断功能的时候,我们需要通过把 Spark 内部的指标统一推送到 PushGateway,由 Prometheus Server 周期性的从 PushGateway 中拉取指标,最后通过调用 Prometheus 提供的查询接口可以近实时地查询到 Spark 内部的指标。

file

但是 Spark 并没有实现将内部指标 sink 到 PushGateway。因此我们新增了 spark-prometheus-sink 插件,并且自定义了 PrometheusPushGatewaySink 用于将 Spark 内部的指标 push 到 PushGateway。

file

除此之外,我们还自定义了一个新的指标用来描述 Spark SQL 临时查询展示任务执行进度。具体步骤如下:

· 通过自定义 JobProgressSource 来新增用于描述离线任务进度的指标,将该指标注册到 Spark 内部管理系统中的指标管理系统中

· 自定义 JobProgressListener,并将 JobProgressListener 注册到 Spark 内部管理系统中的 ListenerBus。其中,JobProgressListener 的 onJobStart 方法的逻辑是计算当前 Job 下所有的 Task 数量;onTaskEnd 方法的逻辑是在每个 Task 完成后计算并更新当前离线任务进度;onJobEnd 方法的逻辑是在每个 Job 完成后计算并更新当前离线任务进度

对接商业版 Hadoop 集群

随着袋鼠云客户越来越多,客户的环境也是各不相同。有的客户使用的是开源版本的 Hadoop 集群,也有相当一部分客户使用的是 HDP、CDH、CDP、TDH 等。我们在对接这些客户的集群的时候,开发侧往往需要进行新的适配,运维侧每次部署升级的时候也需要配置额外的参数或者有其他额外的操作。

以 HDP 为例,在对接 HDP 的时候,我们使用的 Spark 是 HDP 自带的 Spark2.3,并且我们还需要在运维侧新增一些参数,并将 HDP 自带的 Spark 的所有 Jar包 移动到指定目录。这些操作其实会给运维带来一定的困惑和麻烦,不同类型的集群,运维需要维护不同的运维文档,部署的过程也比较容易出错。并且我们其实对 Spark 的源码做了功能增强和 bug 的修复,如果使用的是 HDP 自带的 Spark,那么就享受不到我们内部维护的 Spark 带来的所有好处。

为了解决上面这些问题,我们内部的 Spark 对现有市场上已有的、常见的发行商都做了适配。换句话来说,我们内部的 Spark 可以在所有不同的 Hadoop 集群上运行。这样,无论对接哪一种类型的 Hadoop 集群,运维只需要部署同一个 Spark 即可,这大大减轻了运维部署的压力。更重要的是,客户可以直接使用我们内部的 Spark 稳定版本,享受到更多的新特性和更大的性能提升。

Spark3.2 新特性-AQE

较老的数栈版本中,默认的 Spark 版本是 2.1.3,后来我们将 Spark 的版本升级到 2.4.8,从数栈6.0开始,Spark3.2 也可以使用了。这里着重介绍一下 AQE,这也是 Spark3.x 中最重要的新特性。

AQE 概述

Spark3.2 之前,AQE 默认是关闭的,需要通过将 spark.sql.adaptive.enabled 设置为 true,才能开启 AQE。Spark3.2 之后,AQE 默认是开启的,任务在运行过程中只要满足 AQE 的触发条件,即可享受 AQE 带来的优化。

需要注意的是,AQE 的优化只会发生在 shuffle 阶段,如果 SQL 在运行过程中并没有涉及到 shuffle 操作,那么即使 spark.sql.adaptive.enabled 的值为 true,AQE 也不会发挥作用。更准确来说,只有物理执行计划包含 exchange 节点或者包含子查询,AQE 才会生效。

AQE 在运行期间,会收集 shuffle map 阶段所生成的中间文件的信息,并将这些信息进行统计,结合已有的规则动态的调整尚未执行的 Optimized Logical Plan 和 Spark Plan,从而对原来的 SQL 语句进行运行时优化。

file

从 Spark 源码来看,AQE 涉及到以下4个优化规则:

file

我们知道,RBO 是根据一系列的规则(rule)来对 SQL 进行优化,包括谓词下推、列剪枝、常量替换等。这些静态规则本身已经内置在 Spark 中,Spark 在执行 SQL 的过程中,这些 rule 会一一作用到 SQL 中。

AQE 的优势

CBO 这个特性是 Spark2.2 之后才有的,相比于 RBO,CBO 会结合表的统计信息,并根据这些统计信息和代价模型(Cost Model)选择出较为优化的执行计划。

但是,CBO 仅仅支持注册到 Hive Metastore 的表。对于存储在分布式文件系统的 parquet、orc 等文件,CBO 是不支持的。并且,如果 Hive 表缺少元数据信息,CBO 收集统计信息的时候就会收集不到,这可能会导致 CBO 失效。

CBO 的另外一个劣势在于 CBO 在优化之前需要先执行 ANALYZE TABLE COMPUTE STATISTICS 来收集统计信息。该语句在执行过程中如果碰到大表则会较为耗时,收集效率较低。

无论是 CBO 还是 RBO,它们都属于静态优化。在物理执行计划提交后,如果任务在运行过程中,数据量、数据分布情况发生变化,CBO 也不会对已有的物理执行计划进行优化。

与 CBO、RBO 不同的是,AQE 在运行过程中,会对 shuffle map 过程中所产生的中间文件进行分析,动态的调整并优化尚未开始执行的逻辑执行计划和物理执行计划,相对静态优化的 CBO 和 RBO 而言,AQE 的处理能得到更加优化的物理执行计划。

AQE 三大特性

● 自动分区合并

Shuffle 过程分为 Map 阶段和 Reduce 两个阶段,Reduce 阶段会将 Map 阶段产生的中间临时文件拉取到对应的 Executor 下,如果 Map 阶段所处理的数据分布非常不均匀,有很多 key 其实仅仅只有几条数据,数据经过处理后可能会形成比较多的小文件。

为了避免上述情况,可以开启 AQE 的自动分区合并功能,可以避免启动过多的 reduce task 去拉取 Map 阶段生成的小文件。

file

● 自动数据倾斜处理

应用场景主要在 Data Joins 中,当发生数据倾斜,AQE 能够自动检测到倾斜分区,并对倾斜分区按照一定的规则进行拆分。目前,在 Spark3.2 中,对 SortMergeJoin 和 ShuffleHashJoin 都支持自动数据倾斜处理。

● Join 策略调整

AQE 会动态的将 Hash Join、Sort Merge Join,降级调整为 Broadcast Join。

我们知道,Spark 任务一旦开始执行,并行度就已经确定。比如说,shuffle map 阶段,并行度为分区的个数;shuffle reduce 阶段并行度则为 spark.sql.shuffle.partitions 的值,默认为200。如果 Spark 任务在运行的过程中数据量变小导致大部分的分区的大小变小,这时如果仍然启动那么多的线程去处理小的数据集就会导致资源的浪费。

而 AQE 在执行过程会根据 shuffle 后生成的中间临时结果,在一定条件下,通过应用 CoalesceShufflePartitions 规则,结合用户提供的参数自动合并分区,其实就是调整 reducer 的数量。原来一个 reduce 线程只会拉取一个处理后的分区的数据,现在一个 reduce 线程会根据实际情况拉取更多的分区的数据,这样就能减少资源的浪费,提高任务执行效率。 《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057?src=szsm

《数栈产品白皮书》下载地址:https://www.dtstack.com/resources/1004?src=szsm

《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001?src=szsm

想了解或咨询更多有关大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=szcsdn

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1634324.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

解决React报错Encountered two children with the same key

当我们从map()方法返回的两个或两个以上的元素具有相同的key属性时,会产生"Encountered two children with the same key"错误。为了解决该错误,为每个元素的key属性提供独一无二的值,或者使用索引参数。 这里有个例子来展示错误是…

学习【Mysql运维篇】这一篇就够了

运维篇 1. 日志1-1. 错误日志1-2. 二进制日志1-3. 查询日志1-4. 慢查询日志 2. 主从复制2-1. 概述2-2. 原理2-3. 搭建 3. 分库分表3-1. 介绍3-2. Mycat概述3-3. Mycat入门3-4. Mycat配置3-5. Mycat分片3-6. Mycat管理及监控 4. 读写分类 1. 日志 1-1. 错误日志 错误日志是MyS…

【hackmyvm】vivifytech靶机

渗透思路 信息收集端口扫描端口服务信息目录扫描爆破hydra--sshgit提权 信息收集 ┌──(kali㉿kali)-[~] └─$ fping -ag 192.168.9.0/24 2>/dev/null 192.168.9.119 --主机 192.168.9.164 --靶机个人习惯,也方便后续操作,将IP地址赋值给一个变…

IDEA 创建Servlet-HelloWorldServlet

servlet 1.创建空项目2.配置web项目3.配置Tomcat4.加载Tomcat包5.创建HelloWorldServlet类6.配置web.xml7.运行get与post请求 1.创建空项目 2.配置web项目 3.配置Tomcat 4.加载Tomcat包 5.创建HelloWorldServlet类 public class controller extends HttpServlet {Override//get…

【Hadoop】MapReduce (五)

MapReduce 入门案例练习 统计文件中每一个单词出现的次数(文件:words.txt)对IP去重(文件:ip.txt) 组件 序列化 - Writable 统计每一个人花费的上行流量、下行流量以及总流量(文件:flow.txt) 在MapReduce中,各个节点之间基本上…

【面试经典 150 | 图的广度优先搜索】最小基因变化

文章目录 写在前面Tag题目来源解题思路方法一:广搜 写在最后 写在前面 本专栏专注于分析与讲解【面试经典150】算法,两到三天更新一篇文章,欢迎催更…… 专栏内容以分析题目为主,并附带一些对于本题涉及到的数据结构等内容进行回顾…

Java使用SpringBoot和EasyExcel 实现动态数据导出实战

Java使用SpringBoot和EasyExcel 实现动态数据导出实战 1、前言2、【资源地址】3、代码示例(demo)4、目前Java实现数据导出为Excel方式5、依赖6、总结 1、前言 工作中有用到将数据导出为Excel的场景,在此记录下。在日常开发中,Excel文件处理是一项常见的…

部署(Deployment)

Today you’ll be designing your own machine learning project, creating your own dataset, training a model using your data, and finally deploying an application on the web. We’ll be using a particular deployment target called Hugging Face Space with Gradio…

今日早报 每日精选15条新闻简报 每天一分钟 知晓天下事 4月30日,星期二

每天一分钟,知晓天下事! 2024年4月30日 星期二 农历三月廿二 1、 气象台:五一假日前期全国大部地区晴好为主,假日后期中东部地区需警惕大范围强降雨。 2、 交通运输部:5月1日0时至5日24时,收费公路小客车免…

通过AI助手实现一个nas定时任务更新阿里云域名解析

一.通过AI助手实现一个ip-domain.py的脚本 起一个Python脚本,ip-domain.py;注意已安装Python3.的运行环境;将下面阿里云相关配置添加,注意这里引用了两个包,requests和alibabacloud_alidns20150109;执行前…

【优质书籍推荐】Vue.js 3.x+Element Plus从入门到精通

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。…

袁庭新ES系列17节|Spring Data Elasticsearch基础

前言 为了简化对Elasticsearch的操作Spring Data提供了Spring Data Elasticsearch。Spring Data Elasticsearch是Spring Data技术对Elasticsearch原生API封装之后的产物,它通过对原生API的封装,使得程序员可以简单的对Elasticsearch进行各种操作。接下来…

华为 huawei 交换机 配置 MUX VLAN 示例(汇聚层设备)

组网需求 在企业网络中,企业所有员工都可以访问企业的服务器。但对于企业来说,希望企业内部部分员工之间可以互相交流,而部分员工之间是隔离的,不能够互相访问。 如 图 6-4 所示, Switch1 位于网络的汇聚层&#xff0…

STM32F103学习笔记 | 4.STM32F103芯片介绍

STM32F1入门学习将使用STM32F103C8T6开发板最小系统板。小R为什么选择它来入门呢?咳咳~首先,ST官方提供强大且易用的标准库函数,使得开发过程方便快捷;其次,网上的教程资料多也十分详细。所以呢,它对高校学…

SQLite的扩展函数Carray()表值函数(三十八)

返回:SQLite—系列文章目录 上一篇:SQLite如何处理CSV 虚拟表 下一篇:SQLite—系列文章目录 ​ 1. 概述 Carray()是一个具有单列的表值函数(名为 “value”)和零行或多行。 carray() 中每一行的“值”取自 C 语言数组 由应用程序通过参数绑定提…

【yolov8目标检测部署】TensorRT int8量化

原作者github:https://github.com/xuanandsix/Tensorrt-int8-quantization-pipline/tree/main 改进: 源代码支持的TensorRT版本为7.许多属性已经弃用; 在原有的代码上将支持的TensorRT版本从7改到8. !!不知道如何安装T…

工具链工具——映射与调度、模拟与验证、开发与测试工具

本篇文章将重点介绍工具链的工具相关知识,我们将从工具链的基本概念出发,重点介绍工具链中的映射和调度工具、模拟与验证工具、开发和测试工具,最后提出对工具链发展的展望,从而对工具链的工具进行一个较为系统的讲解。 工具链的…

万兴PDF专家 PDFelement Pro v10.3.8 破姐版!

🧑‍💻万兴PDF专家 PDFelement Pro v10.3.8 破姐版 (https://docs.qq.com/sheet/DRVVxTHJ3RXJFVHVr)

WebSocket 全面解析

🌟 引言 WebSocket,一个让实时通信变得轻而易举的神器,它打破了传统HTTP协议的限制,实现了浏览器与服务器间的全双工通信。想象一下,即时消息、在线游戏、实时股票报价…这一切都离不开WebSocket的魔力💫。…

利用word2vec包将中文转变为词向量

代码展示&#xff1a; import jieba import re import json import logging import sys import gensim.models as word2vec from gensim.models.word2vec import LineSentence, loggerpattern u[\\s\\d,.<>/?:;\\"[\\]{}()\\|~!\t"#$%^&*\\-_a-zA-Z&…