Spark(30):Spark性能调优之常规性能调优

news2024/11/22 23:03:03

目录

0. 相关文章链接

1. 最优资源配置

2. RDD优化

2.1. RDD复用

2.2. RDD持久化

2.3. RDD尽可能早的 filter 操作

3. 并行度调节

4. 广播大变量

5. Kryo序列化

6. 调节本地化等待时长


0. 相关文章链接

 Spark文章汇总 

1. 最优资源配置

        Spark 性能调优的第一步,就是为任务分配更多的资源,在一定范围内,增加资源的分配与性能的提升是成正比的,实现了最优的资源配置后,在此基础上再考虑进行后面论述的性能调优策略。 

资源的分配在使用脚本提交 Spark 任务时进行指定,标准的 Spark 任务提交脚本如下所示: 

bin/spark-submit \ 
--class com.spark.WordCount \ 
--master yarn 
--deploy-mode cluster 
--num-executors 80 \ 
--driver-memory 6g \ 
--executor-memory 6g \ 
--executor-cores 3 \ 
/usr/opt/modules/spark/jar/spark.jar \ 

可以进行分配的资源如表所示: 

名称 说明 
--num-executors 配置 Executor 的数量 
--driver-memory 配置 Driver 内存(影响不大) 
--executor-memory 配置每个 Executor 的内存大小 
--executor-cores 配置每个 Executor CPU core 数量 

调节原则:尽量将任务分配的资源调节到可以使用的资源的最大限度。

对于具体资源的分配,我们分别讨论 Spark 的两种 Cluster 运行模式: 

  • 第一种是 Spark Standalone 模式,你在提交任务前,一定知道或者可以从运维部门获取到你可以使用的资源情况,在编写 submit 脚本的时候,就根据可用的资源情况进行资源的分配,比如说集群有 15 台机器,每台机器为 8G 内存,2 个 CPU core,那么就指定 15 个 Executor,每个 Executor 分配 8G 内存,2 个 CPU core。 
  • 第二种是 Spark Yarn 模式,由于 Yarn 使用资源队列进行资源的分配和调度,在编写 submit 脚本的时候,就根据 Spark 作业要提交到的资源队列,进行资源的分配,比如资源队列有 400G 内存,100 个 CPU core,那么指定 50 个 Executor,每个 Executor 分配8G 内存,2 个 CPU core。 

对各项资源进行了调节后,得到的性能提升会有如下表现: 

名称 解析 
增加 Executor·个数  在资源允许的情况下,增加 Executor 的个数可以提高执行 task 的并行度。比如有 4 Executor,每个 Executor 2 CPU core,那么可以并行执行 8 task,如果将 Executor 的个数增加到 8 个(资源允许的情况下),那么可以并行执行 16 task,此时的并行能力提升了一倍。 
 增加每个 Executor 的 CPU core 个数 在资源允许的情况下,增加每个 Executor 的Cpu core 个数,可以提高执行 task 的并行度。比如有 4 个 Executor,每个 Executor 有 2 个 CPU core,那么可以并行执行 8 个 task,如果将每个 Executor 的 CPU core 个数增加到 4 个(资源允许的情况下),那么可以并行执行 16 个 task,此时的并行能力提升了一倍。 
增加每个 Executor 的内存量 在资源允许的情况下,增加每个 Executor 的内存量以后,对性能的提升有三点:
1. 可以缓存更多的数据(即对 RDD 进行 cache),写入磁盘的数据相应减少,甚至可以不写入磁盘,减少了可能的磁盘 IO;
2. 可以为 shuffle 操作提供更多内存,即有更多空间来存放 reduce 端拉取的数据,写入磁盘的数据相应减少,甚至可以不写入磁盘,减少了可能的磁盘 IO; 
3. 可以为 task 的执行提供更多内存,在 task 的执行过程中可能创建很多对象,内存较小时会引发频繁的 GC,增加内存后,可以避免频繁的 GC,提升整体性能。 

生产环境 Spark submit 脚本配置 :

bin/spark-submit \
--class com.spark.WordCount \
--master yarn\
--deploy-mode cluster\
--num-executors 80 \
--driver-memory 6g \
--executor-memory 6g \
--executor-cores 3 \
--queue root.default \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.core.connection.ack.wait.timeout=300 \
/usr/local/spark/spark.jar

参数配置参考值:

--num-executors: 50~100
--driver-memory: 1G~5G
--executor-memory: 6G~10G
--executor-cores: 3
--master:实际生产环境一定使用 yarn

2. RDD优化

2.1. RDD复用

在对 RDD 进行算子时,要避免相同的算子和计算逻辑之下对 RDD 进行重复的计算

对上图中的 RDD 计算架构进行修改,得到如下图所示的优化结果:

2.2. RDD持久化

        在 Spark 中,当多次对同一个 RDD 执行算子操作时,每一次都会对这个 RDD 以之前的父 RDD 重新计算一次,这种情况是必须要避免的,对同一个 RDD 的重复计算是对资源的极大浪费,因此,必须对多次使用的 RDD 进行持久化,通过持久化将公共 RDD 的数据缓存到内存/磁盘中,之后对于公共 RDD 的计算都会从内存/磁盘中直接获取 RDD 数据。 

对于 RDD 的持久化,有两点需要说明: 

  • RDD 的持久化是可以进行序列化的,当内存无法将 RDD 的数据完整的进行存放的时候,可以考虑使用序列化的方式减小数据体积,将数据完整存储在内存中。 
  • 如果对于数据的可靠性要求很高,并且内存充足,可以使用副本机制,对 RDD 数据进行持久化。当持久化启用了复本机制时,对于持久化的每个数据单元都存储一个副本,放在其他节点上面,由此实现数据的容错,一旦一个副本数据丢失,不需要重新计算,还可以使用另外一个副本。 

2.3. RDD尽可能早的 filter 操作

获取到初始 RDD 后,应该考虑尽早地过滤掉不需要的数据,进而减少对内存的占用,从而提升 Spark 作业的运行效率。 

3. 并行度调节

        Spark 作业中的并行度指各个stage的task的数量。 如果并行度设置不合理而导致并行度过低,会导致资源的极大浪费,例如,20 个 Executor,每个 Executor 分配 3 个 CPU core,而 Spark 作业有 40 个 task,这样每个 Executor 分配到的 task 个数是 2 个,这就使得每个 Executor 有一个 CPU core 空闲,导致资源的浪费。 理想的并行度设置,应该是让并行度与资源相匹配,简单来说就是在资源允许的前提下,并行度要设置的尽可能大,达到可以充分利用集群资源。合理的设置并行度,可以提升整个Spark 作业的性能和运行速度。 

        Spark 官方推荐,task 数量应该设置为 Spark 作业总 CPU core 数量的 2~3 倍。之所以没有推荐 task 数量与 CPU core 总数相等,是因为 task 的执行时间不同,有的 task 执行速度快而有的 task 执行速度慢,如果 task 数量与 CPU core 总数相等,那么执行快的 task 执行完成后,会出现 CPU core 空闲的情况。如果 task 数量设置为 CPU core 总数的 2~3 倍,那么一个 task 执行完毕后,CPU core 会立刻执行下一个 task,降低了资源的浪费,同时提升了 Spark 作业运行的效率。 

Spark 作业并行度的设置如下所示: 

val conf = new SparkConf()
conf.set("spark.default.parallelism", "500")

4. 广播大变量

        默认情况下,task 中的算子中如果使用了外部的变量,每个 task 都会获取一份变量的复本,这就造成了内存的极大消耗。一方面,如果后续对 RDD 进行持久化,可能就无法将 RDD 数据存入内存,只能写入磁盘,磁盘 IO 将会严重消耗性能;另一方面,task 在创建对象的时候,也许会发现堆内存无法存放新创建的对象,这就会导致频繁的 GC,GC 会导致工作线程停止,进而导致 Spark 暂停工作一段时间,严重影响 Spark 性能。 

        假设当前任务配置了 20 个 Executor,指定 500 个 task,有一个 20M 的变量被所有 task 共用,此时会在 500 个 task 中产生 500 个副本,耗费集群 10G 的内存,如果使用了广播变量, 那么每个 Executor 保存一个副本,一共消耗 400M 内存,内存消耗减少了 5 倍。广播变量在每个 Executor 保存一个副本,此 Executor 的所有 task 共用此广播变量,这让变量产生的副本数量大大减少。 

        在初始阶段,广播变量只在 Driver 中有一份副本。task 在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的 Executor 对应的 BlockManager 中尝试获取变量,如果本地没有,BlockManager 就会从 Driver 或者其他节点的 BlockManager 上远程拉取变量的复本,并由本地的 BlockManager 进行管理;之后此 Executor 的所有 task 都会直接从本地的BlockManager 中获取变量。 

5. Kryo序列化

        默认情况下,Spark 使用 Java 的序列化机制。Java 的序列化机制使用方便,不需要额外的配置,在算子中使用的变量实现 Serializable 接口即可,但是,Java 序列化机制的效率不高,序列化速度慢并且序列化后的数据所占用的空间依然较大。 

        Kryo 序列化机制比 Java 序列化机制性能提高 10 倍左右,Spark 之所以没有默认使用Kryo 作为序列化类库,是因为它不支持所有对象的序列化,同时 Kryo 需要用户在使用前注册需要序列化的类型,不够方便,但从 Spark 2.0.0 版本开始,简单类型、简单类型数组、字符串类型的 Shuffling RDDs 已经默认使用 Kryo 序列化方式了。 

public class MyKryoRegistrator implements KryoRegistrator { 
  @Override 
  public void registerClasses(Kryo kryo) {     
    kryo.register(StartupReportLogs.class); 
  } 
} 

配置 Kryo 序列化方式的实例代码:

//创建 SparkConf 对象
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用 Kryo 序列化库,如果要使用 Java 序列化库,需要把该行屏蔽掉
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
//在 Kryo 序列化库中注册自定义的类集合,如果要使用 Java 序列化库,需要把该行屏蔽掉
conf.set("spark.kryo.registrator", "atguigu.com.MyKryoRegistrator");

6. 调节本地化等待时长

        Spark 作业运行过程中,Driver 会对每一个 stage 的 task 进行分配。根据 Spark 的 task 分配算法,Spark 希望 task 能够运行在它要计算的数据算在的节点(数据本地化思想),这样就可以避免数据的网络传输。通常来说,task 可能不会被分配到它处理的数据所在的节点,因为这些节点可用的资源可能已经用尽,此时,Spark 会等待一段时间,默认 3s,如果等待指定时间后仍然无法在指定节点运行,那么会自动降级,尝试将 task 分配到比较差的本地化级别所对应的节点上,比如将 task 分配到离它要计算的数据比较近的一个节点,然后进行计算,如果当前级别仍然不行,那么继续降级。 

        当 task 要处理的数据不在 task 所在节点上时,会发生数据的传输。task 会通过所在节点的BlockManager 获取数据,BlockManager 发现数据不在本地时,户通过网络传输组件从数据所在节点的 BlockManager 处获取数据。 

        网络传输数据的情况是我们不愿意看到的,大量的网络传输会严重影响性能,因此,我们希望通过调节本地化等待时长,如果在等待时长这段时间内,目标节点处理完成了一部分 task,那么当前的 task 将有机会得到执行,这样就能够改善 Spark 作业的整体性能。 

Spark 的本地化等级如表所示: 

名称 解析 
PROCESS_LOCAL 进程本地化,task 和数据在同一个 Executor 中,性能最好。 
NODE_LOCAL 节点本地化,task和数据在同一个节点中,但是task 和数据不在同一个 Executor 中,数据需要在进程间进行传输。 
RACK_LOCAL 机架本地化,task 和数据在同一个机架的两个节点上,数据需要通过网络在节点之间进行传输。 
NO_PREF 对于 task 来说,从哪里获取都一样,没有好坏之分。 
ANY task 和数据可以在集群的任何地方,而且不在一个机架中,性能最差。 

        在 Spark 项目开发阶段,可以使用 client 模式对程序进行测试,此时,可以在本地看到比较全的日志信息,日志信息中有明确的 task 数据本地化的级别,如果大部分都是PROCESS_LOCAL,那么就无需进行调节,但是如果发现很多的级别都是 NODE_LOCAL、ANY,那么需要对本地化的等待时长进行调节,通过延长本地化等待时长,看看 task 的本地化级别有没有提升,并观察 Spark 作业的运行时间有没有缩短。 注意,过犹不及,不要将本地化等待时长延长地过长,导致因为大量的等待时长,使得Spark 作业的运行时间反而增加了。 

Spark 本地化等待时长的设置如代码所示: 

val conf = new SparkConf() 
conf.set("spark.locality.wait", "6") 

注:其他Spark相关系列文章链接由此进 ->  Spark文章汇总 


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/766702.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

9.Ceph部署

文章目录 Ceph部署前期环境准备实验部署软件安装部署Ceph集群部署mon节点部署OSD存储节点部署mgr节点开启监控模块管理pool Ceph部署 前期环境准备 主机名public网络cluster网络角色admin192.168.242.69admin(管理节点)node01192.168.242.66192.168.242.100.11mon、mgr、osdn…

【Elemnt-UI——el-popover点击出现多个弹框】

效果图 解决 :append-to-body"false"添加这个属性就可以了 <el-popoverv-model"item.contextmenuVisible"placement"bottom-end":append-to-body"false"trigger"click":visible-arrow"false"hide"item.…

[PCIE体系结构导读]PCIE总结(一)

什么是PCIE PCIe Peripheral Component Interconnect express 快速外部组件互联 高速串行计算机扩展总线标准 处理器系统的局部总线 连接外部设备 高速、低时延支持热插拔可靠扩展性好复杂度高点对点串行连接 附一个博主写的总结文章&#xff0c;非常好 《PCI EXPRESS体系结…

如何在服务器下载coco数据集

如果是需要在服务器上进行跑实验&#xff0c;可以直接通过wget下载链接 那么如何确定下载地址呢&#xff1f; 例如&#xff1a;先找到coco的数据集地址http://cocodataset.org 然后可以看到 可以下载一个 2017 Train/Val annotations[241MB]&#xff0c;在下载内容中可以看到…

跨域是怎么个事?

跨域是怎么个事&#xff1f; 【一】到底什么是跨域&#xff1f;【二】什么是浏览器的同源策略&#xff1f;【三】跨域问题有哪些解决方案&#xff1f;【四】详细说说Nginx解决跨域问题的实现过程&#xff1f;&#xff08;1&#xff09;项目准备 【一】到底什么是跨域&#xff1…

宋浩线性代数笔记(一)行列式的计算

本帖更新b站宋浩老师的线代网课笔记&#xff0c;内容较为细致详细&#xff0c;参考书用的是科学出版社的第三版&#xff0c;之后会附加同济出版社第六版的教材内容。 &#xff08;字不好看大家将就看吧QAQ&#xff09;

【论文阅读】一些多轮对话文章的体会 ACL 2023

前言 本文是对昨天看到的ACL 2023三篇多轮对话文章的分享这三个工作都是根据一些额外属性控制输出的工作&#xff0c;且评估的方面比较相似&#xff0c;可以借鉴 方法 这几篇文章都不是做general任务的&#xff0c;倾向于通过一些额外信息&#xff0c;来做specific任务 【1】…

当低代码和无代码平台可以加速应用程序现代化时

你的组织很可能正在寻求将遗留应用程序现代化、将单片应用架构拆分为服务&#xff0c;并迁移到公共或私有云基础架构。在此过程中&#xff0c;您可能还希望改善用户体验&#xff0c;创建CI/CD流水线&#xff0c;添加测试自动化&#xff0c;并实施一系列其他DevOps最佳实践。 这…

水声功率放大器的作用是什么

水声功率放大器是一种专门用于水声设备的高功率电子设备&#xff0c;主要用于提升水下信号的传输距离和保证语音清晰度。它的作用在水下通信、水下测量、海洋科学等领域都非常重要。 其主要作用有以下几个方面&#xff1a; 增强信号传输距离 水声信号在水中传播会受到各种因素的…

维安股份冲刺A股上市:计划募资约15亿元,上海科学院为实控人

7月17日&#xff0c;上海证券交易所对上海维安电子股份有限公司&#xff08;下称“维安股份”&#xff09;发出问询函。据贝多财经了解&#xff0c;维安股份于2023年6月20日递交招股书&#xff0c;准备在上海证券交易所主板上市。 本次冲刺上市&#xff0c;维安股份计划募资15.…

解决一云多芯全球命题,浪潮云海给出了解题思路

云计算&#xff0c;生而为用户简化资源的使用&#xff0c;让用户不必关注复杂的底层硬件架构&#xff0c;而是通过“服务”的方式调用资源&#xff0c;专注于自己的业务创新。 因此&#xff0c;用户要上云&#xff0c;就天然地要求云平台必须能够屏蔽底层的硬件架构&#xff0…

轻量级应用服务器开放端口

关于使用浏览器连接自己所写的TCP进程时&#xff0c;由于没有开放端口&#xff0c;而且搜索到对应的操作来进行开放端口&#xff0c;所以在完成开放端口后特意做个笔记&#xff0c;防止忘记。 登录自己所使用的服务器的网站找到控制台 找到轻量级应用服务器 找到所需要开放端口…

建造者模式-复杂对象的组装与创建

生产一辆车&#xff0c;主要有以下步骤&#xff1a;安装骨架、安装发动机及安装轮胎。这些步骤有指定的执行顺序&#xff0c;步骤缺一不可。 图 传统方案 传统方案存在的问题&#xff1a; 传参不便&#xff0c;虽可在构造函数那传参&#xff0c;但是传参时需要注意参数顺序等…

After Effects CPU 和 RAM 使用率高,如何修复?

如果您发现 Adob​​e After Effects 的 CPU 和 RAM 使用率较高&#xff0c;可以按照以下方法解决该问题。 1]确保您的系统满足最低系统要求 要在您的 PC 上运行 Adob​​e After Effects&#xff0c;您的 PC 需要满足最低系统要求。只有这样&#xff0c;该程序才会停止消耗更…

WEB:Confusion1

背景知识 SSTI漏洞 题目 根据网站图片和题目描述的提示&#xff0c;大象是php&#xff0c;蟒蛇是python&#xff0c;说明了这个网站是用python写的 在python中&#xff0c;比较常规的漏洞就是SSTI模板注入 没有思路&#xff0c;先点login和register页面看看 查看源代码 之前…

如何防止DDoS?基于分布式云的 DDoS 解决方案一览

在日益开放和互联的世界中&#xff0c;DDOS&#xff08;分布式拒绝服务&#xff09;攻击和安全漏洞日益频发&#xff0c;企业都应将有效地保护其业务、声誉和数据中心免受不断加剧的DDoS攻击放在战略性位置。如何防止DDoS&#xff1f;看了F5提供的分布式云DDoS解决方案&#xf…

list最常用的遍历五种方式以及使用场景

目录 遍历方式的适用场景对比 迭代器遍历 列表迭代器 增强for遍历 Lambda表达式 lambda表达式简介 普通for遍历 集合中通用的并且常用的六种方法 遍历方式的适用场景对比 迭代器遍历 &#xff1a;在遍历过程中需要删除元素&#xff0c;请使用迭代器 列表迭代器&#xff1…

Java如何实现定时读取json文件里的内容

Java实现定时读取json文件里的内容 项目背景代码实现读取json配置文件定时任务 测试 项目背景 有时候我们会需要定时来读取JSON配置文件里的内容&#xff0c;来执行一些业务逻辑上的操作。 比如&#xff1a;开发一个物流运输系统&#xff0c;系统需要定期读取一个包含货物信息…

6个真正免费的平面设计素材网站

新手设计师没有自己的素材库&#xff0c;找素材费时又费力&#xff0c;找到了还不一定能免费下载或商用&#xff0c;别急&#xff01;我带着我收藏多年的素材库来了&#xff0c;平面、UI、免扣、高清背景图、设计元素、字体等素材全部都有&#xff0c;重点是免费下载&#xff0…

线程池学习(四)任务调度

线程池有几个重要的属性&#xff0c;核心线程数&#xff0c;最大线程数&#xff0c;阻塞任务队列。 一、调度流程 1. 接收新的任务后&#xff0c;先判断核心线程数是否已满&#xff0c;未满则创建新的线程去执行任务 2. 如果核心线程数已满&#xff0c;再判断阻塞任务队列是否…