Spark 性能调优

news2025/1/23 1:06:07

1常规性能调优

1.1常规性能调优一:最优资源配置

Spark性能调优的第一步,就是为任务分配更多的资源在一定范围内,增加资源的分配与性能的提升是成正比的,实现了最优的资源配置后,在此基础上再考虑进行后面论述的性能调优策略。

资源的分配在使用脚本提交Spark任务时进行指定,标准的Spark任务提交脚本如代码清单2-1所示:

代码清单2-1 标准Spark提交脚本

/usr/opt/modules/spark/bin/spark-submit \
--class com.atguigu.spark.Analysis \
--num-executors 80 \
--driver-memory 6g \
--executor-memory 6g \
--executor-cores 3 \
/usr/opt/modules/spark/jar/spark.jar \

可以进行分配的资源如表2-1所示:

表2-1 可分配资源表

名称

说明

--num-executors

配置Executor的数量

--driver-memory

配置Driver内存(影响不大)

--executor-memory

配置每个Executor的内存大小

--executor-cores

配置每个Executor的CPU core数量

调节原则:尽量将任务分配的资源调节到可以使用的资源的最大限度

对于具体资源的分配,我们分别讨论Spark的两种Cluster运行模式:

第一种是Spark Standalone模式,你在提交任务前,一定知道或者可以从运维部门获取到你可以使用的资源情况,在编写submit脚本的时候,就根据可用的资源情况进行资源的分配,比如说集群有15台机器,每台机器为8G内存,2个CPU core,那么就指定15个Executor,每个Executor分配8G内存,2个CPU core。

第二种是Spark Yarn模式,由于Yarn使用资源队列进行资源的分配和调度,在表写submit脚本的时候,就根据Spark作业要提交到的资源队列,进行资源的分配,比如资源队列有400G内存,100个CPU core,那么指定50个Executor,每个Executor分配8G内存,2个CPU core。

对表2-1中的各项资源进行了调节后,得到的性能提升如表2-2所示:

表2-2 资源调节后的性能提升

名称

解析

增加Executor·个数

在资源允许的情况下,增加Executor的个数可以提高执行task的并行度。比如有4个Executor,每个Executor有2个CPU core,那么可以并行执行8个task,如果将Executor的个数增加到8个(资源允许的情况下),那么可以并行执行16个task,此时的并行能力提升了一倍。

增加每个Executor的CPU core个数

  在资源允许的情况下,增加每个Executor的Cpu core个数,可以提高执行task的并行度。比如有4个Executor,每个Executor有2个CPU core,那么可以并行执行8个task,如果将每个Executor的CPU core个数增加到4个(资源允许的情况下),那么可以并行执行16个task,此时的并行能力提升了一倍。

增加每个Executor的内存量

  在资源允许的情况下,增加每个Executor的内存量以后,对性能的提升有三点:

  1. 可以缓存更多的数据(即对RDD进行cache),写入磁盘的数据相应减少,甚至可以不写入磁盘,减少了可能的磁盘IO;
  2. 可以为shuffle操作提供更多内存,即有更多空间来存放reduce端拉取的数据,写入磁盘的数据相应减少,甚至可以不写入磁盘,减少了可能的磁盘IO;
  3. 可以为task的执行提供更多内存,在task的执行过程中可能创建很多对象,内存较小时会引发频繁的GC,增加内存后,可以避免频繁的GC,提升整体性能。

补充:生产环境Spark submit脚本配置

/usr/local/spark/bin/spark-submit \
--class com.atguigu.spark.WordCount \
--num-executors 80 \
--driver-memory 6g \
--executor-memory 6g \
--executor-cores 3 \
--master yarn-cluster \
--queue root.default \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.core.connection.ack.wait.timeout=300 \
/usr/local/spark/spark.jar

参数配置参考值:

--num-executors:50~100

--driver-memory:1G~5G

--executor-memory:6G~10G

--executor-cores:3

--master:实际生产环境一定使用yarn-cluster

1.2常规性能调优二:RDD优化

1.2.1 RDD复用

在对RDD进行算子时,要避免相同的算子和计算逻辑之下对RDD进行重复的计算,如图2-1所示:

 图2-1 RDD的重复计算

对图2-1中的RDD计算架构进行修改,得到如图2-2所示的优化结果:

 图2-2 RDD架构优化

1.2.2 RDD持久化

在Spark中,当多次对同一个RDD执行算子操作时,每一次都会对这个RDD以之前的父RDD重新计算一次,这种情况是必须要避免的,对同一个RDD的重复计算是对资源的极大浪费,因此,必须对多次使用的RDD进行持久化,通过持久化将公共RDD的数据缓存到内存/磁盘中,之后对于公共RDD的计算都会从内存/磁盘中直接获取RDD数据。

对于RDD的持久化,有两点需要说明:

第一,RDD的持久化是可以进行序列化的,当内存无法将RDD的数据完整的进行存放的时候,可以考虑使用序列化的方式减小数据体积,将数据完整存储在内存中。

第二,如果对于数据的可靠性要求很高,并且内存充足,可以使用副本机制,对RDD数据进行持久化。当持久化启用了复本机制时,对于持久化的每个数据单元都存储一个副本,放在其他节点上面,由此实现数据的容错,一旦一个副本数据丢失,不需要重新计算,还可以使用另外一个副本。

1.2.3 RDD尽可能早的filter操作

获取到初始RDD后,应该考虑尽早地过滤掉不需要的数据,进而减少对内存的占用,从而提升Spark作业的运行效率。

1.3常规性能调优三:并行度调节

Spark作业中的并行度指各个stage的task的数量

如果并行度设置不合理而导致并行度过低,会导致资源的极大浪费,例如,20个Executor,每个Executor分配3个CPU core,而Spark作业有40个task,这样每个Executor分配到的task个数是2个,这就使得每个Executor有一个CPU core空闲,导致资源的浪费。

理想的并行度设置,应该是让并行度与资源相匹配,简单来说就是在资源允许的前提下,并行度要设置的尽可能大,达到可以充分利用集群资源。合理的设置并行度,可以提升整个Spark作业的性能和运行速度。

Spark官方推荐,task数量应该设置为Spark作业总CPU core数量的2~3。之所以没有推荐task数量与CPU core总数相等,是因为task的执行时间不同,有的task执行速度快而有的task执行速度慢,如果task数量与CPU core总数相等,那么执行快的task执行完成后,会出现CPU core空闲的情况。如果task数量设置为CPU core总数的2~3倍,那么一个task执行完毕后,CPU core会立刻执行下一个task,降低了资源的浪费,同时提升了Spark作业运行的效率。

Spark作业并行度的设置如代码清单2-2所示:

代码清单2-2 Spark作业并行度设置

val conf = new SparkConf()

  .set("spark.default.parallelism", "500")

1.4常规性能调优四:广播大变量

默认情况下,task中的算子中如果使用了外部的变量,每个task都会获取一份变量的复本,这就造成了内存的极大消耗。一方面,如果后续对RDD进行持久化,可能就无法将RDD数据存入内存,只能写入磁盘,磁盘IO将会严重消耗性能;另一方面,task在创建对象的时候,也许会发现堆内存无法存放新创建的对象,这就会导致频繁的GC,GC会导致工作线程停止,进而导致Spark暂停工作一段时间,严重影响Spark性能。

假设当前任务配置了20个Executor,指定500个task,有一个20M的变量被所有task共用,此时会在500个task中产生500个副本,耗费集群10G的内存,如果使用了广播变量, 那么每个Executor保存一个副本,一共消耗400M内存,内存消耗减少了5倍。

广播变量在每个Executor保存一个副本,此Executor的所有task共用此广播变量,这让变量产生的副本数量大大减少

在初始阶段,广播变量只在Driver中有一份副本。task在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的Executor对应的BlockManager中尝试获取变量,如果本地没有,BlockManager就会从Driver或者其他节点的BlockManager上远程拉取变量的复本,并由本地的BlockManager进行管理;之后此Executor的所有task都会直接从本地的BlockManager中获取变量。

1.5常规性能调优五:Kryo序列化

默认情况下,Spark使用Java的序列化机制。Java的序列化机制使用方便,不需要额外的配置,在算子中使用的变量实现Serializable接口即可,但是,Java序列化机制的效率不高,序列化速度慢并且序列化后的数据所占用的空间依然较大。

Kryo序列化机制比Java序列化机制性能提高10倍左右,Spark之所以没有默认使用Kryo作为序列化类库,是因为它不支持所有对象的序列化,同时Kryo需要用户在使用前注册需要序列化的类型,不够方便,但从Spark 2.0.0版本开始,简单类型、简单类型数组、字符串类型的Shuffling RDDs 已经默认使用Kryo序列化方式了。

Kryo序列化注册方式的实例代码如代码清单2-3所示:

代码清单2-3 Kryo序列化机制配置代码

public class MyKryoRegistrator implements KryoRegistrator
{
  @Override
  public void registerClasses(Kryo kryo)
  {
    kryo.register(StartupReportLogs.class);
  }
}

配置Kryo序列化方式的实例代码如代码清单2-4所示:

代码清单2-4 Kryo序列化机制配置代码

//创建SparkConf对象
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用Kryo序列化库,如果要使用Java序列化库,需要把该行屏蔽掉
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");  
//在Kryo序列化库中注册自定义的类集合,如果要使用Java序列化库,需要把该行屏蔽掉
conf.set("spark.kryo.registrator", "atguigu.com.MyKryoRegistrator"); 

1.6常规性能调优六:调节本地化等待时长

Spark作业运行过程中,Driver会对每一个stage的task进行分配。根据Spark的task分配算法,Spark希望task能够运行在它要计算的数据算在的节点(数据本地化思想),这样就可以避免数据的网络传输。通常来说,task可能不会被分配到它处理的数据所在的节点,因为这些节点可用的资源可能已经用尽,此时,Spark会等待一段时间,默认3s,如果等待指定时间后仍然无法在指定节点运行,那么会自动降级,尝试将task分配到比较差的本地化级别所对应的节点上,比如将task分配到离它要计算的数据比较近的一个节点,然后进行计算,如果当前级别仍然不行,那么继续降级

当task要处理的数据不在task所在节点上时,会发生数据的传输。task会通过所在节点的BlockManager获取数据,BlockManager发现数据不在本地时,会通过网络传输组件从数据所在节点的BlockManager处获取数据。

网络传输数据的情况是我们不愿意看到的,大量的网络传输会严重影响性能,因此,我们希望通过调节本地化等待时长,如果在等待时长这段时间内,目标节点处理完成了一部分task,那么当前的task将有机会得到执行,这样就能够改善Spark作业的整体性能。

Spark的本地化等级如表2-3所示:

表2-3 Spark本地化等级

名称

解析

PROCESS_LOCAL

进程本地化,task和数据在同一个Executor中,性能最好。

NODE_LOCAL

节点本地化,task和数据在同一个节点中,但是task和数据不在同一个Executor中,数据需要在进程间进行传输。

RACK_LOCAL

机架本地化,task和数据在同一个机架的两个节点上,数据需要通过网络在节点之间进行传输。

NO_PREF

对于task来说,从哪里获取都一样,没有好坏之分。

ANY

task和数据可以在集群的任何地方,而且不在一个机架中,性能最差。

在Spark项目开发阶段,可以使用client模式对程序进行测试,此时,可以在本地看到比较全的日志信息,日志信息中有明确的task数据本地化的级别,如果大部分都是PROCESS_LOCAL,那么就无需进行调节,但是如果发现很多的级别都是NODE_LOCAL、ANY,那么需要对本地化的等待时长进行调节,通过延长本地化等待时长,看看task的本地化级别有没有提升,并观察Spark作业的运行时间有没有缩短。

注意,过犹不及,不要将本地化等待时长延长地过长,导致因为大量的等待时长,使得Spark作业的运行时间反而增加了。

Spark本地化等待时长的设置如代码清单2-5所示:

代码清单2-5 Spark本地化等待时长设置示例

val conf = new SparkConf()
  .set("spark.locality.wait", "6")

2算子调优

2.1算子调优一:mapPartitions

普通的map算子对RDD中的每一个元素进行操作,而mapPartitions算子对RDD中每一个分区进行操作。如果是普通的map算子,假设一个partition有1万条数据,那么map算子中的function要执行1万次,也就是对每个元素进行操作。

 

图2-3 map算子

如果是mapPartition算子,由于一个task处理一个RDD的partition,那么一个task只会执行一次function,function一次接收所有的partition数据,效率比较高。

 

图2-4 mapPartitions算子

比如,当要把RDD中的所有数据通过JDBC写入数据,如果使用map算子,那么需要对RDD中的每一个元素都创建一个数据库连接,这样对资源的消耗很大,如果使用mapPartitions算子,那么针对一个分区的数据,只需要建立一个数据库连接。

mapPartitions算子也存在一些缺点:对于普通的map操作,一次处理一条数据,如果在处理了2000条数据后内存不足,那么可以将已经处理完的2000条数据从内存中垃圾回收掉;但是如果使用mapPartitions算子,但数据量非常大时,function一次处理一个分区的数据,如果一旦内存不足,此时无法回收内存,就可能会OOM,即内存溢出

因此,mapPartitions算子适用于数据量不是特别大的时候,此时使用mapPartitions算子对性能的提升效果还是不错的。(当数据量很大的时候,一旦使用mapPartitions算子,就会直接OOM)

在项目中,应该首先估算一下RDD的数据量、每个partition的数据量,以及分配给每个Executor的内存资源,如果资源允许,可以考虑使用mapPartitions算子代替map。

2.2算子调优二:foreachPartition优化数据库操作

在生产环境中,通常使用foreachPartition算子来完成数据库的写入,通过foreachPartition算子的特性,可以优化写数据库的性能

如果使用foreach算子完成数据库的操作,由于foreach算子是遍历RDD的每条数据,因此,每条数据都会建立一个数据库连接,这是对资源的极大浪费,因此,对于写数据库操作,我们应当使用foreachPartition算子。

与mapPartitions算子非常相似,foreachPartition是将RDD的每个分区作为遍历对象,一次处理一个分区的数据,也就是说,如果涉及数据库的相关操作,一个分区的数据只需要创建一次数据库连接,如图2-5所示:

 图2-5 foreachPartition算子

使用了foreachPartition算子后,可以获得以下的性能提升:

1. 对于我们写的function函数,一次处理一整个分区的数据

2. 对于一个分区内的数据,创建唯一的数据库连接

3. 只需要向数据库发送一次SQL语句和多组参数

在生产环境中,全部都会使用foreachPartition算子完成数据库操作。foreachPartition算子存在一个问题,与mapPartitions算子类似,如果一个分区的数据量特别大,可能会造成OOM,即内存溢出。

2.3算子调优三:filter与coalesce的配合使用

在Spark任务中我们经常会使用filter算子完成RDD中数据的过滤,在任务初始阶段,从各个分区中加载到的数据量是相近的,但是一旦进过filter过滤后,每个分区的数据量有可能会存在较大差异,如图2-6所示:

 

图2-6 分区数据过滤结果

根据图2-6我们可以发现两个问题:

1. 每个partition的数据量变小了,如果还按照之前与partition相等的task个数去处理当前数据,有点浪费task的计算资源;

2. 每个partition的数据量不一样,会导致后面的每个task处理每个partition数据的时候,每个task要处理的数据量不同,这很有可能导致数据倾斜问题。

如图2-6所示,第二个分区的数据过滤后只剩100条,而第三个分区的数据过滤后剩下800条,在相同的处理逻辑下,第二个分区对应的task处理的数据量与第三个分区对应的task处理的数据量差距达到了8倍,这也会导致运行速度可能存在数倍的差距,这也就是数据倾斜问题。

针对上述的两个问题,我们分别进行分析:

1. 针对第一个问题,既然分区的数据量变小了,我们希望可以对分区数据进行重新分配,比如将原来4个分区的数据转化到2个分区中,这样只需要用后面的两个task进行处理即可,避免了资源的浪费。

2. 针对第二个问题,解决方法和第一个问题的解决方法非常相似,对分区数据重新分配,让每个partition中的数据量差不多,这就避免了数据倾斜问题。

那么具体应该如何实现上面的解决思路?我们需要coalesce算子。

repartition与coalesce都可以用来进行重分区,其中repartition只是coalesce接口中shuffle为true的简易实现,coalesce默认情况下不进行shuffle,但是可以通过参数进行设置。

假设我们希望将原本的分区个数A通过重新分区变为B,那么有以下几种情况:

  1. A > B(多数分区合并为少数分区)

① A与B相差值不大

此时使用coalesce即可,无需shuffle过程。

② A与B相差值很大

此时可以使用coalesce并且不启用shuffle过程,但是会导致合并过程性能低下,所以推荐设置coalesce的第二个参数为true,即启动shuffle过程。

  1. A < B(少数分区分解为多数分区)

此时使用repartition即可,如果使用coalesce需要将shuffle设置为true,否则coalesce无效。

我们可以在filter操作之后,使用coalesce算子针对每个partition的数据量各不相同的情况,压缩partition的数量,而且让每个partition的数据量尽量均匀紧凑,以便于后面的task进行计算操作,在某种程度上能够在一定程度上提升性能

注意:local模式是进程内模拟集群运行,已经对并行度和分区数量有了一定的内部优化,因此不用去设置并行度和分区数量。

2.4算子调优四:repartition解决SparkSQL低并行度问题

在第一节的常规性能调优中我们讲解了并行度的调节策略,但是,并行度的设置对于Spark SQL是不生效的,用户设置的并行度只对于Spark SQL以外的所有Spark的stage生效

Spark SQL的并行度不允许用户自己指定,Spark SQL自己会默认根据hive表对应的HDFS文件的split个数自动设置Spark SQL所在的那个stage的并行度用户自己通spark.default.parallelism参数指定的并行度,只会在没Spark SQLstage中生效

由于Spark SQL所在stage的并行度无法手动设置,如果数据量较大,并且此stage中后续的transformation操作有着复杂的业务逻辑,而Spark SQL自动设置的task数量很少,这就意味着每个task要处理为数不少的数据量,然后还要执行非常复杂的处理逻辑,这就可能表现为第一个有Spark SQL的stage速度很慢,而后续的没有Spark SQL的stage运行速度非常快。

为了解决Spark SQL无法设置并行度和task数量的问题,我们可以使用repartition算子。

 

图2-7 repartition算子使用前后对比图

Spark SQL这一步的并行度和task数量肯定是没有办法去改变了,但是,对于Spark SQL查询出来的RDD,立即使用repartition算子,去重新进行分区,这样可以重新分区为多个partition,从repartition之后的RDD操作,由于不再设计Spark SQL,因此stage的并行度就会等于你手动设置的值,这样就避免了Spark SQL所在的stage只能用少量的task去处理大量数据并执行复杂的算法逻辑。使用repartition算子的前后对比如图2-7所示。

2.5算子调优五:reduceByKey本地聚合

reduceByKey相较于普通的shuffle操作一个显著的特点就是会进行map端的本地聚合,map端会先对本地的数据进行combine操作,然后将数据写入给下个stage的每个task创建的文件中,也就是在map端,对每一个key对应的value,执行reduceByKey算子函数。reduceByKey算子的执行过程如图2-8所示:

 

图2-8 reduceByKey算子执行过程

使用reduceByKey对性能的提升如下:

  1. 本地聚合后,在map端的数据量变少,减少了磁盘IO,也减少了对磁盘空间的占用;
  2. 本地聚合后,下一个stage拉取的数据量变少,减少了网络传输的数据量;
  3. 本地聚合后,在reduce端进行数据缓存的内存占用减少;
  4. 本地聚合后,在reduce端进行聚合的数据量减少。

基于reduceByKey的本地聚合特征,我们应该考虑使用reduceByKey代替其他的shuffle算子,例如groupByKey。reduceByKey与groupByKey的运行原理如图2-9和图2-10所示:

 图2-9 groupByKey原理

 图2-10 reduceByKey原理

根据上图可知,groupByKey不会进行map端的聚合,而是将所有map端的数据shuffle到reduce端,然后在reduce端进行数据的聚合操作。由于reduceByKey有map端聚合的特性,使得网络传输的数据量减小,因此效率要明显高于groupByKey。

3Shuffle调优

3.1Shuffle调优一:调节map端缓冲区大小

在Spark任务运行过程中,如果shuffle的map端处理的数据量比较大,但是map端缓冲的大小是固定的,可能会出现map端缓冲数据频繁spill溢写到磁盘文件中的情况,使得性能非常低下,通过调节map端缓冲的大小,可以避免频繁的磁盘IO操作,进而提升Spark任务的整体性能

map端缓冲的默认配置是32KB,如果每个task处理640KB的数据,那么会发生640/32 = 20次溢写,如果每个task处理64000KB的数据,机会发生64000/32=2000此溢写,这对于性能的影响是非常严重的。

map端缓冲的配置方法如代码清单2-7所示:

代码清单2-7 map端缓冲配置

val conf = new SparkConf()
  .set("spark.shuffle.file.buffer", "64")

3.2Shuffle调优二:调节reduce端拉取数据缓冲区大小

Spark Shuffle过程中,shuffle reduce task的buffer缓冲区大小决定了reduce task每次能够缓冲的数据量,也就是每次能够拉取的数据量如果内存资源较为充足,适当增加拉取数据缓冲区的大小,可以减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能

reduce端数据拉取缓冲区的大小可以通过spark.reducer.maxSizeInFlight参数进行设置,默认为48MB,该参数的设置方法如代码清单2-8所示:

代码清单2-8 reduce端数据拉取缓冲区配置

val conf = new SparkConf()
  .set("spark.reducer.maxSizeInFlight", "96")

3.3Shuffle调优三:调节reduce端拉取数据重试次数

Spark Shuffle过程中,reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试。对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。

reduce端拉取数据重试次数可以通过spark.shuffle.io.maxRetries参数进行设置,该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败,默认为3,该参数的设置方法如代码清单2-9所示:

代码清单2-9 reduce端拉取数据重试次数配置

val conf = new SparkConf()
  .set("spark.shuffle.io.maxRetries", "6")

3.4Shuffle调优四:调节reduce端拉取数据等待间隔

Spark Shuffle过程中,reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试,在一次失败后,会等待一定的时间间隔再进行重试,可以通过加大间隔时长(比如60s),以增加shuffle操作的稳定性。

reduce端拉取数据等待间隔可以通过spark.shuffle.io.retryWait参数进行设置,默认值为5s,该参数的设置方法如代码清单2-10所示:

代码清单2-10 reduce端拉取数据等待间隔配置

val conf = new SparkConf()
  .set("spark.shuffle.io.retryWait", "60s")

3.5Shuffle调优五:调节SortShuffle排序操作阈值

对于SortShuffleManager,如果shuffle reduce task的数量小于某一阈值则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件

当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量,那么此时map-side就不会进行排序了,减少了排序的性能开销,但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

SortShuffleManager排序操作阈值的设置可以通过spark.shuffle.sort. bypassMergeThreshold这一参数进行设置,默认值为200,该参数的设置方法如代码清单2-11所示:

代码清单2-10 reduce端拉取数据等待间隔配置

val conf = new SparkConf()
  .set("spark.shuffle.sort.bypassMergeThreshold", "400")

4 JVM调优

对于JVM调优,首先应该明确,(major)full gc/minor gc,都会导致JVM的工作线程停止工作,即stop the world

4.1 JVM调优一:降低cache操作的内存占比

1. 静态内存管理机制

根据Spark静态内存管理机制,堆内存被划分为了两块,Storage和Execution。Storage主要用于缓存RDD数据和broadcast数据,Execution主要用于缓存在shuffle过程中产生的中间数据,Storage占系统内存的60%,Execution占系统内存的20%,并且两者完全独立。

在一般情况下,Storage的内存都提供给了cache操作,但是如果在某些情况下cache操作内存不是很紧张,而task的算子中创建的对象很多,Execution内存又相对较小,这回导致频繁的minor gc,甚至于频繁的full gc,进而导致Spark频繁的停止工作,性能影响会很大。

在Spark UI中可以查看每个stage的运行情况,包括每个task的运行时间、gc时间等等,如果发现gc太频繁,时间太长,就可以考虑调节Storage的内存占比,让task执行算子函数式,有更多的内存可以使用

Storage内存区域可以通过spark.storage.memoryFraction参数进行指定,默认为0.6,即60%,可以逐级向下递减,如代码清单2-6所示:

代码清单2-6 Storage内存占比设置

val conf = new SparkConf()
  .set("spark.storage.memoryFraction", "0.4")

2. 统一内存管理机制

根据Spark统一内存管理机制,堆内存被划分为了两块,Storage和Execution。Storage主要用于缓存数据,Execution主要用于缓存在shuffle过程中产生的中间数据,两者所组成的内存部分称为统一内存,Storage和Execution各占统一内存的50%,由于动态占用机制的实现,shuffle过程需要的内存过大时,会自动占用Storage的内存区域,因此无需手动进行调节

4.2 JVM调优二:调节Executor堆外内存 

Executor的堆外内存主要用于程序的共享库、Perm Space、 线程Stack和一些Memory mapping等, 或者类C方式allocate object

有时,如果你的Spark作业处理的数据量非常大,达到几亿的数据量,此时运行Spark作业会时不时地报错,例如shuffle output file cannot find,executor lost,task lost,out of memory等,这可能是Executor的堆外内存不太够用,导致Executor在运行的过程中内存溢出。

stage的task在运行的时候,可能要从一些Executor中去拉取shuffle map output文件,但是Executor可能已经由于内存溢出挂掉了,其关联的BlockManager也没有了,这就可能会报出shuffle output file cannot find,executor lost,task lost,out of memory等错误,此时,就可以考虑调节一下Executor的堆外内存,也就可以避免报错,与此同时,堆外内存调节的比较大的时候,对于性能来讲,也会带来一定的提升。

默认情况下,Executor堆外内存上限大概为300多MB,在实际的生产环境下,对海量数据进行处理的时候,这里都会出现问题,导致Spark作业反复崩溃,无法运行,此时就会去调节这个参数,到至少1G,甚至于2G、4G。

Executor堆外内存的配置需要在spark-submit脚本里配置,如代码清单2-7所示:

代码清单2-7 Executor堆外内存配置

--conf spark.yarn.executor.memoryOverhead=2048

以上参数配置完成后,会避免掉某些JVM OOM的异常问题,同时,可以提升整体Spark作业的性能。

4.3 JVM调优三:调节连接等待时长

在Spark作业运行过程中,Executor优先从自己本地关联的BlockManager中获取某份数据,如果本地BlockManager没有的话,会通过TransferService远程连接其他节点上Executor的BlockManager来获取数据。

如果task在运行过程中创建大量对象或者创建的对象较大,会占用大量的内存,这回导致频繁的垃圾回收,但是垃圾回收会导致工作现场全部停止,也就是说,垃圾回收一旦执行,Spark的Executor进程就会停止工作,无法提供相应,此时,由于没有响应,无法建立网络连接,会导致网络连接超时

在生产环境下,有时会遇到file not found、file lost这类错误,在这种情况下,很有可能是Executor的BlockManager在拉取数据的时候,无法建立连接,然后超过默认的连接等待时长60s后,宣告数据拉取失败,如果反复尝试都拉取不到数据,可能会导致Spark作业的崩溃。这种情况也可能会导致DAGScheduler反复提交几次stage,TaskScheduler返回提交几次task,大大延长了我们的Spark作业的运行时间。

此时,可以考虑调节连接的超时时长,连接等待时长需要在spark-submit脚本中进行设置,设置方式如代码清单2-8所示:

代码清单2-8 连接等待时长配置

--conf spark.core.connection.ack.wait.timeout=300

调节连接等待时长后,通常可以避免部分的XX文件拉取失败、XX文件lost等报错。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/386680.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

高研发投入成就产品力,蔚来财报透露重要信号

3月1日晚间&#xff0c;蔚来发布了2022年第四季度及全年财报。 财报显示&#xff0c;蔚来四季度营收160.6亿元&#xff0c;同比增长62.2%&#xff0c;连续11个季度正增长&#xff0c;同时全年总营收达492.7亿元&#xff0c;季度和年度营收均创新高。 尽管过去一年受到新冠疫情…

妇女节到了,祝福所有女神 Happy Women‘s Day!

在每年&#xff13;月&#xff18;日人们庆祝妇女节 &#xff37;omens Day is cllebrated on March 8 every year.国际妇女节(IWD)&#xff0c;中国内地称“三八”国际劳动妇女节或国际劳动妇女节。是在每年的3月8日为庆祝妇女在经济、政治和社会等领域作出的重要贡献和取得的…

5个商用字体网站分享

整理了5个免费、商用字体素材网站&#xff0c;对你有帮助记得点赞收藏。 更多设计素材免费下载&#xff1a; https://www.sucai999.com/?vNTYxMjky 1、FontSpace https://www.fontspace.com/ 这个网站提供了96000款免费字体&#xff0c;可商业用途的字体就有17000款&#xf…

RK3568-IOT核心板不同规格品牌TF卡读写速率测试

1. 测试对象HD-RK3568-IOT 底板基于HD-RK3568-CORE工业级核心板设计&#xff08;双网口、双CAN、 5路串口&#xff09;&#xff0c;接口丰富&#xff0c;适用于工业现场应用需求&#xff0c;亦方便用户评估核心板及CPU的性能。适用于工业自动化控制、人机界面、中小型医疗分析器…

VMworkstation centos虚拟机配置仅主机模式

首先是仅主机模式介绍&#xff0c;可以略过直接看下面具体配置过程。仅主机模式用于在宿主机和虚拟机之间建立局域网&#xff0c;宿主机和虚拟机之间可以互相访问&#xff0c;原理是建立虚拟交换机和宿主机虚拟网卡&#xff08;vmnet1&#xff09;&#xff0c;宿主机虚拟网卡和…

深入分析域内ntlm relay to adcs服务的利用(含wireshark抓包分析)

前言 2021年中旬&#xff0c;specterops发布了一项针对域证书服务(adcs)的利用白皮书&#xff0c;文档中提到了19种对adcs服务的利用。本篇主要是分析文中提出的ntlm relay to adcs窃取证书的攻击流程&#xff0c;原理和抓包分析。 相关内容 ADCS介绍 Active Directory证书…

AI for Science系列(三):赛桨PaddleScience底层核心框架技术创新详解

继上一篇典型案例及API功能介绍&#xff0c;本篇重点讲解飞桨核心框架为支持科学计算任务在技术上的创新工作与成果。 框架技术创新 飞桨科学计算套件赛桨PaddleScience底层技术依赖飞桨核心框架。为了有力地支撑科学计算任务高效训练与推理&#xff0c;飞桨核心框架在自动微…

案例13-前端对localStorage的使用分析

一&#xff1a;背景介绍 前端在调用后端接口获取某一个人的评论次数、获赞次数、回复次数。调用之后判断后端返回过来的值。如果返回回来的值是0的话&#xff0c;从缓存中获取对应的值&#xff0c;如果从缓存中获取的评论次数为空那么其他两个的次数也为0。 二&#xff1a;思路…

SqlServer Management Studio启用身份验证登录

背景 一开始安装好SqlServer Management Studio时&#xff0c;默认只能用本地window身份验证登录&#xff0c;也就是除了SqlServer的电脑&#xff0c;别的都访问不了这个数据库&#xff0c;这是很不方便的 方案 1.打开SqlServer Management Studio&#xff0c;先用window身份…

windows内核学习-段和页相关

段寄存器(96位)(234条消息) 03.段寄存器_015646的博客-CSDN博客代码段寄存器CS&#xff08;Code Segment&#xff09;存放当前正在运行的程序代码所在段的段基址&#xff0c;表示当前使用的指令代码可以从该段寄存器指定的存储器段中取得&#xff0c;相应的偏移量则由IP提供。数…

Apinto 网关 V0.11.1 版本发布,多协议互转,新增编码转换器,接入 Prometheus

Eolink 旗下 Apinto 开源网关再次更新啦~ 一起来看看是否有你期待的功能&#xff01; 1、协议转换功能上线 之前发布的 Apinto v0.10.0 已经支持了多协议的基本功能&#xff0c;实现多协议支持的一次验证。本次最新版本可以支持 HTTP 与 gRPC、HTTP 与 Dubbo2 之间的协议转换。…

【Java】 异步调用实践

本文要点&#xff1a; 为什么需要异步调用CompletableFuture 基本使用RPC 异步调用HTTP 异步调用编排 CompletableFuture 提高吞吐量BIO 模型 当用户进程调用了recvfrom 这个系统调用&#xff0c;kernel 就开始了 IO 的第一个阶段&#xff1a;准备数据。对于 network io 来说…

react Context学习记录

react Context学习记录1.Context是干嘛的2.可以倒是可以实现的做法-props逐级传递3.Context1.Context是干嘛的 一种React组件间通信方式, 常用于【祖组件】与【后代组件】间通信 2.可以倒是可以实现的做法-props逐级传递 import React, { Component } from "react";…

Qt资源文件.qrc

目录 一 用途 二 使用效果 三 如何添加资源文件 一 用途 有时候我们想添加图片或者动画的话&#xff0c;就可以使用资源文件 二 使用效果 我在标签上添加了一个蝴蝶的图片&#xff0c;最后呈现出来的效果就是这样子的 三 如何添加资源文件 添加资源文件要指定特定的路径&…

iptables语法规则

iptables命令基本语法 iptables [-t table] command [链名] [条件匹配] [-j 目标动作]以下是对iptables命令的拆分讲解&#xff1a; -t table 用来指明使用的表&#xff0c;有三种选项&#xff1a;filter,nat,mangle。若未指定&#xff0c;则默认使用filter表。 command参数 …

【数据库】基础知识,创建一个表

计算机硬件软件硬件&#xff1a;1.输入输出设备 2.控制器 3.运算器 4.存储器&#xff08;内存&#xff09;软件&#xff1a;1.应用软件 2.系统软件&#xff08;操作系统、数据库管理系统、语言处理系统……&#xff09;从开发者角度分软件&#xff1a;B/S浏览器和服务器结构&am…

umi学习(umi4)

umi 官方文档 官方建议使用 pnpm node版本在 14 以上 创建项目&#xff1a; 根据 包管理工具不同 &#xff0c;官方推荐 这里使用 pnpm&#xff1a; 1. pnpm dlx create-umilatest 2. 选择模板 &#xff08;这里使用 Simple App&#xff09; 想对module处理需要使用 Ant Desig…

19 客户端服务订阅机制的核心流程

Nacos客户端服务订阅机制的核心流程 说起Nacos的服务订阅机制&#xff0c;大家会觉得比较难理解&#xff0c;那我们就来详细分析一下&#xff0c;那我们先从Nacos订阅的概述说起 Nacos订阅概述 Nacos的订阅机制&#xff0c;如果用一句话来描述就是&#xff1a;Nacos客户端通…

vue自定义指令以及angular自定义指令(以禁止输入空格为例)

哈喽&#xff0c;小伙伴们&#xff0c;大家好啊&#xff0c;最近要实现一个vue自定义指令&#xff0c;就是让input输入框禁止输入空格建立一个directives的指令文件&#xff0c;里面专门用来建立各个指令的官方文档&#xff1a;自定义指令 | Vue.js (vuejs.org)我们都知道vue中…

小白学Pytorch 系列--Torch API

小白学Pytorch 系列–Torch API Torch version 1.13 Tensors TORCH.IS_TENSOR 如果obj是PyTorch张量&#xff0c;则返回True。 注意&#xff0c;这个函数只是简单地执行isinstance(obj, Tensor)。使用isinstance 更适合用mypy进行类型检查&#xff0c;而且更显式-所以建议使…