Spark Job优化

news2024/11/15 21:40:29

1 Map端优化

1.1 Map端聚合

map-side预聚合,就是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。

RDD的话建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。

SparkSQL本身的HashAggregte就会实现本地预聚合+全局聚合。

1.2 读取小文件优化

读取的数据源有很多小文件,会造成查询性能的损耗,大量的数据分片信息以及对应产生的Task元信息也会给Spark Driver的内存造成压力,带来单点问题。

设置参数:

spark.sql.files.maxPartitionBytes=128MB   默认128m

spark.files.openCostInBytes=4194304     默认4m

参数(单位都是bytes):

  • maxPartitionBytes:一个分区最大字节数。
  • openCostInBytes:打开一个文件的开销。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g --class com.atguigu.sparktuning.map.MapSmallFileTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

源码理解: DataSourceScanExec.createNonBucketedReadRDD()

FilePartition. getFilePartitions()

1)切片大小= Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

计算totalBytes的时候,每个文件都要加上一个open开销

defaultParallelism就是RDD的并行度

2)当(文件1大小+ openCostInBytes)+(文件2大小+ openCostInBytes)+…+(文件n-1大小+ openCostInBytes)+ 文件n <= maxPartitionBytes时,n个文件可以读入同一个分区,即满足: N个小文件总大小 (N-1*openCostInBytes <=  maxPartitionBytes的话。

1.3 增大map溢写时输出流buffer

1mapShuffle Write有一个缓冲区初始阈值5m,超过会尝试增加到2*当前使用内存。如果申请不到内存,则进行溢写。这个参数是internal,指定无效(见下方源码)。也就是说资源足够会自动扩容,所以不需要我们去设置。

2)溢写时使用输出流缓冲区默认32k,这些缓冲区减少了磁盘搜索和系统调用次数,适当提高可以提升溢写效率。

3Shuffle文件涉及到序列化,是采取的方式读写,默认按照每批次1万条去读写。设置得太低会导致在序列化时过度复制,因为一些序列化器通过增长和复制的方式来翻倍内部数据结构。这个参数是internal,指定无效(见下方源码)。

综合以上分析,我们可以调整的就是输出缓冲区的大小。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.map.MapFileBufferTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

源码理解:

2 Reduce端优化

2.1 合理设置Reduce数

过多的cpu资源出现空转浪费,过少影响任务性能。关于并行度、并发度的相关参数介绍,在2.2.1中已经介绍过。

2.2 输出产生小文件优化

1Join后的结果插入新表

join结果插入新表,生成的文件数等于shuffle并行度,默认就是200份文件插入到hdfs上。

解决方式:

1)可以在插入表数据前进行缩小分区操作来解决小文件过多问题,如coalescerepartition算子

2)调整shuffle并行度。根据2.2.2的原则来设置。

2、动态分区插入数据

1)没有Shuffle的情况下。最差的情况下,每个Task中都有表各个分区的记录,那文件数最终文件数将达到  Task数量 * 表分区数。这种情况下是极易产生小文件的。

INSERT overwrite table A partition ( aa ) 

SELECT * FROM B;

2)有Shuffle的情况下,上面的Task数量 就变成了spark.sql.shuffle.partitions(默认值200)。那么最差情况就会有 spark.sql.shuffle.partitions * 表分区数。

当spark.sql.shuffle.partitions设置过大时,小文件问题就产生了;当spark.sql.shuffle.partitions设置过小时,任务的并行度就下降了,性能随之受到影响。

最理想的情况是根据分区字段进行shuffle,在上面的sql中加上distribute by aa。把同一分区的记录都哈希到同一个分区中去,由一个Spark的Task进行写入,这样的话只会产生N个文件, 但是这种情况下也容易出现数据倾斜的问题。

解决思路:

结合第4章解决倾斜的思路,在确定哪个分区键倾斜的情况下,将倾斜的分区键单独拎出来:

将入库的SQL拆成(where 分区 !倾斜分区键 )和 (where 分区 = 倾斜分区键) 几个部分,非倾斜分区键的部分正常distribute by 分区字段,倾斜分区键的部分 distribute by随机数,sql如下:

//1.非倾斜键部分

INSERT overwrite table A partition ( aa ) 

SELECT *

FROM B where aa != 大key

distribute by aa;

//2.倾斜键部分

INSERT overwrite table A partition ( aa ) 

SELECT *

FROM B where aa = 大key

distribute by cast(rand() * 5 as int);

案例实操:

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.reduce.DynamicPartitionSmallFileTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

2.3 增大reduce缓冲区,减少拉取次数

Spark Shuffle过程中,shuffle reduce task的buffer缓冲区大小决定了reduce task每次能够缓冲的数据量,也就是每次能够拉取的数据量,如果内存资源较为充足,适当增加拉取数据缓冲区的大小,可以减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。

reduce端数据拉取缓冲区的大小可以通过spark.reducer.maxSizeInFlight参数进行设置,默认为48MB。

源码:BlockStoreShuffleReader.read()

2.4 调节reduce端拉取数据重试次数

Spark Shuffle过程中,reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试。对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。

reduce端拉取数据重试次数可以通过spark.shuffle.io.maxRetries参数进行设置,该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败,默认为3:

2.5 调节reduce端拉取数据等待间隔

Spark Shuffle过程中,reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试,在一次失败后,会等待一定的时间间隔再进行重试,可以通过加大间隔时长(比如60s),以增加shuffle操作的稳定性。

reduce端拉取数据等待间隔可以通过spark.shuffle.io.retryWait参数进行设置,默认值为5s。

综合2.32.42.5,案例实操:

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.reduce.ReduceShuffleTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

2.6 合理利用bypass

当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200)且不需要map端进行合并操作,则shuffle write过程中不会进行排序操作,使用BypassMergeSortShuffleWriter去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。

当你使用SortShuffleManager时,如果确实不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

源码分析:SortShuffleManager.registerShuffle()

SortShuffleManager.getWriter()

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.reduce.BypassTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

3 整体优化

3.1 调节数据本地化等待时长

在 Spark 项目开发阶段,可以使用 client 模式对程序进行测试,此时,可以在本地看到比较全的日志信息,日志信息中有明确的 Task 数据本地化的级别,如果大部分都是 PROCESS_LOCAL、NODE_LOCAL,那么就无需进行调节,但是如果发现很多的级别都是 RACK_LOCAL、ANY,那么需要对本地化的等待时长进行调节,应该是反复调节,每次调节完以后,再来运行观察日志,看看大部分的task的本地化级别有没有提升;看看,整个spark作业的运行时间有没有缩短。

注意过犹不及,不要将本地化等待时长延长地过长,导致因为大量的等待时长,使得 Spark 作业的运行时间反而增加了。

下面几个参数,默认都是3s,可以改成如下:

spark.locality.wait //建议6s、10s

spark.locality.wait.process //建议60s

spark.locality.wait.node //建议30s

spark.locality.wait.rack //建议20s

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.job.LocalityWaitTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

3.2 使用堆外内存

1、堆外内存参数

讲到堆外内存,就必须去提一个东西,那就是去yarn申请资源的单位,容器。Spark  on yarn模式,一个容器到底申请多少内存资源。

一个容器最多可以申请多大资源,是由yarn参数yarn.scheduler.maximum-allocation-mb决定, 需要满足:

spark.executor.memoryOverhead + spark.executor.memory + spark.memory.offHeap.size

≤ yarn.scheduler.maximum-allocation-mb

参数解释:

  • spark.executor.memory:提交任务时指定的堆内内存。
  • spark.executor.memoryOverhead:堆外内存参数,内存额外开销。

默认开启,默认值为spark.executor.memory*0.1并且会与最小值384mb做对比,取最大值。所以spark on yarn任务堆内内存申请1个g,而实际去yarn申请的内存大于1个g的原因。

  • spark.memory.offHeap.size:堆外内存参数,spark中默认关闭,需要将spark.memory.enable.offheap.enable参数设置为true。

注意:很多网上资料说spark.executor.memoryOverhead包含spark.memory.offHeap.size,这是由版本区别的,仅限于spark3.0之前的版本。3.0之后就发生改变,实际去yarn申请的内存资源由三个参数相加。

测试申请容器上限:

yarn.scheduler.maximum-allocation-mb修改为7G,将三个参数设为如下大于7G,会报错:

spark-submit --master yarn --deploy-mode client --driver-memory 1g  --num-executors 3 --executor-cores 4 --conf  spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=2g  --executor-memory 5g  --class com.atguigu.sparktuning.join.SMBJoinTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

spark.memory.offHeap.size修改为1g后再次提交:

spark-submit --master yarn --deploy-mode client --driver-memory 1g  --num-executors 3 --executor-cores 4 --conf  spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=1g  --executor-memory 5g  --class com.atguigu.sparktuning.join.SMBJoinTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

2、使用堆外缓存

使用堆外内存可以减轻垃圾回收的工作,也加快了复制的速度。

当需要缓存非常大的数据量时,虚拟机将承受非常大的GC压力,因为虚拟机必须检查每个对象是否可以收集并必须访问所有内存页。本地缓存是最快的,但会给虚拟机带来GC压力,所以,当你需要处理非常多GB的数据量时可以考虑使用堆外内存来进行优化,因为这不会给Java垃圾收集器带来任何压力。让JAVA GC为应用程序完成工作,缓存操作交给堆外。

spark-submit --master yarn --deploy-mode client --driver-memory 1g  --num-executors 3 --executor-cores 4 --conf  spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=1g  --executor-memory 5g  --class com.atguigu.sparktuning.job.OFFHeapCache spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

3.3 调节连接等待时长

在Spark作业运行过程中,Executor优先从自己本地关联的BlockManager中获取某份数据,如果本地BlockManager没有的话,会通过TransferService远程连接其他节点上Executor的BlockManager来获取数据。

如果task在运行过程中创建大量对象或者创建的对象较大,会占用大量的内存,这回导致频繁的垃圾回收,但是垃圾回收会导致工作现场全部停止,也就是说,垃圾回收一旦执行,Spark的Executor进程就会停止工作,无法提供相应,此时,由于没有响应,无法建立网络连接,会导致网络连接超时。

在生产环境下,有时会遇到file not found、file lost这类错误,在这种情况下,很有可能是Executor的BlockManager在拉取数据的时候,无法建立连接,然后超过默认的连接等待时长120s后,宣告数据拉取失败,如果反复尝试都拉取不到数据,可能会导致Spark作业的崩溃。这种情况也可能会导致DAGScheduler反复提交几次stage,TaskScheduler反复提交几次task,大大延长了我们的Spark作业的运行时间。

为了避免长时间暂停(如GC)导致的超时,可以考虑调节连接的超时时长,连接等待时长需要在spark-submit脚本中进行设置,设置方式可以在提交时指定:

--conf spark.core.connection.ack.wait.timeout=300s

调节连接等待时长后,通常可以避免部分的XX文件拉取失败、XX文件lost等报错。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 1g --conf spark.core.connection.ack.wait.timeout=300s --class com.atguigu.sparktuning.job.AckWaitTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1201339.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Day28力扣打卡

打卡记录 给小朋友们分糖果 II&#xff08;容斥原理&#xff09; 链接 大佬的题解 def c2(n: int) -> int:return n * (n - 1) // 2 if n > 1 else 0class Solution:def distributeCandies(self, n: int, limit: int) -> int:return c2(n 2) - 3 * c2(n - limit …

Codeforces Round 788 (Div. 2) E. Hemose on the Tree(树上构造)

题目 t(t<5e4)组样例&#xff0c;每次给定一个数p&#xff0c; 表示一棵节点数为的树&#xff0c; 以下n-1条边&#xff0c;读入树边 对于n个点和n-1条边&#xff0c;每个点需要赋权&#xff0c;每条边需要赋权&#xff0c; 权值需要恰好构成[1,2n-1]的排列 并且当你赋…

基于springboot实现沁园健身房预约管理系统【项目源码】计算机毕业设计

基于springboot实现沁园健身房预约管理系统演示 B/S架构 B/S结构是目前使用最多的结构模式&#xff0c;它可以使得系统的开发更加的简单&#xff0c;好操作&#xff0c;而且还可以对其进行维护。使用该结构时只需要在计算机中安装数据库&#xff0c;和一些很常用的浏览器就可以…

Jenkins简介及Docker Compose部署

Jenkins是一个开源的自动化服务器&#xff0c;用于自动化构建、测试和部署软件项目。它提供了丰富的插件生态系统&#xff0c;支持各种编程语言和工具&#xff0c;使得软件开发流程更加高效和可靠。在本文中&#xff0c;我们将介绍Jenkins的基本概念&#xff0c;并展示如何使用…

[.NET]启明星电子文档管理系统edoc v33.0

启明星电子文档库是一个简单、实用的企业文档在线存储工具。系统采用ASP.NETMSSQL2008 Express开发&#xff0c;所有文档数据都以二进制方式存储在数据库里方便备份。 系统的特点包括&#xff1a; &#xff08;1&#xff09;支持文档在线预览&#xff0c;可以在线预览word&…

pta 装箱问题 Python3

假设有N项物品&#xff0c;大小分别为s1​、s2​、…、si​、…、sN​&#xff0c;其中si​为满足1≤si​≤100的整数。要把这些物品装入到容量为100的一批箱子&#xff08;序号1-N&#xff09;中。装箱方法是&#xff1a;对每项物品, 顺序扫描箱子&#xff0c;把该物品放入足以…

mini-vue 的设计

mini-vue 的设计 mini-vue 使用流程与结果预览&#xff1a; <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta http-equiv"X-UA-Compatible" content"IEedge" /><meta name&qu…

LLM 面试总结

溜一遍 MLStack.Cafe - Kill Your Next Machine Learning & Data Science Interview https://www.llmforce.com/llm-interview-questions MLStack.Cafe - Kill Your Next Machine Learning & Data Science Interview An interview with a language model, ChatGPT - W…

阿里云国际站:专有宿主机

文章目录 一、专有宿主机的概念 二、专有宿主机的优势 三、专有宿主机的应用场景 一、专有宿主机的概念 专有宿主机&#xff08;Dedicated Host&#xff0c;简称DDH&#xff09;是阿里云专为企业用户定制优化的解决方案。具有物理资源独享、部署更灵活、配置更丰富、性价比…

python实现一个简单的桌面倒计时小程序

本章内容主要是利用python制作一个简单的桌面倒计时程序&#xff0c;包含开始、重置 、设置功能。 目录 一、效果演示 二、程序代码 一、效果演示 二、程序代码 #!/usr/bin/python # -*- coding: UTF-8 -*- """ author: Roc-xb """import tkin…

分类预测 | Matlab实现PSO-BiLSTM粒子群算法优化双向长短期记忆神经网络的数据多输入分类预测

分类预测 | Matlab实现PSO-BiLSTM粒子群算法优化双向长短期记忆神经网络的数据多输入分类预测 目录 分类预测 | Matlab实现PSO-BiLSTM粒子群算法优化双向长短期记忆神经网络的数据多输入分类预测分类效果基本描述程序设计参考资料 分类效果 基本描述 1.Matlab实现PSO-BiLSTM粒子…

sqli-labs关卡13(基于post提交的单引号加括号的报错盲注)通关思路

文章目录 前言一、回顾第十二关知识点二、靶场第十三关通关思路1、判断注入点2、爆显位3、爆数据库名4、爆数据库表5、爆数据库列6、爆数据库关键信息 总结 前言 此文章只用于学习和反思巩固sql注入知识&#xff0c;禁止用于做非法攻击。注意靶场是可以练习的平台&#xff0c;…

Spark 资源调优

1 资源规划 1.1 资源设定考虑 1、总体原则 以单台服务器128G内存&#xff0c;32线程为例。 先设定单个Executor核数&#xff0c;根据Yarn配置得出每个节点最多的Executor数量&#xff0c;每个节点的yarn内存/每个节点数量单个节点的数量 总的executor数单节点数量*节点数。 2、…

php的api接口token简单实现

<?php // 生成 Token function generateToken() {$token bin2hex(random_bytes(16)); // 使用随机字节生成 tokenreturn $token; } // 存储 Token&#xff08;这里使用一个全局变量来模拟存储&#xff09; $tokens []; // 验证 Token function validateToken($token) {gl…

SolidWorks绘制花瓶教程

这个花瓶是我学习solidworks画图以来用时最长的一个图形了&#xff0c;特此记录一下&#xff0c;用了我足足两个早晨才把他给画出来&#xff0c;我这是跟着哔站里的隔壁老王学习的&#xff0c;下面是视频地址&#xff1a;点击我一下看视频教程 下面是我的绘图过程&#xff0c;…

wordpress是什么?快速搭网站经验分享

​作者主页 &#x1f4da;lovewold少个r博客主页 ⚠️本文重点&#xff1a;c入门第一个程序和基本知识讲解 &#x1f449;【C-C入门系列专栏】&#xff1a;博客文章专栏传送门 &#x1f604;每日一言&#xff1a;宁静是一片强大而治愈的神奇海洋&#xff01; 目录 前言 wordp…

【刷题篇】动态规划(四)

文章目录 1、珠宝的最高价值2、下降路径最小和3、最小路径和4、地下城游戏5、按摩师6、打家劫舍|| 1、珠宝的最高价值 现有一个记作二维矩阵 frame 的珠宝架&#xff0c;其中 frame[i][j] 为该位置珠宝的价值。拿取珠宝的规则为&#xff1a; 只能从架子的左上角开始拿珠宝 每次…

ConstraintLayout的基本用法

ConstraintLayout的基本用法 1、基线对齐——Baseline 有时候我们需要这样一个场景&#xff1a; app:layout_constraintBaseline_toBaselineOf"id/30"2、链——Chains 用于将多个控件形成一条链&#xff0c;可以用于平分空间。 <?xml version"1.0"…

sqli-labs关卡14(基于post提交的双引号闭合的报错注入)通关思路

文章目录 前言一、回顾上一关知识点二、靶场第十四关通关思路1、判断注入点2、爆显位3、爆数据库名4、爆数据库表5、爆数据库列6、爆数据库关键信息 总结 前言 此文章只用于学习和反思巩固sql注入知识&#xff0c;禁止用于做非法攻击。注意靶场是可以练习的平台&#xff0c;不…

react 组件进阶

目标&#xff1a;1.能够使用props接收数据 2.能够实现父子组建之间的通讯 3.能够实现兄弟组建之间的通讯 4.能够给组建添加props校验 5.能够说出生命周期常用的钩子函数 6.能够知道高阶组件的作用 一&#xff0c;组件通讯介绍 组件是独立且封闭的单元&#xff0c;默认情况下&a…