目录
0. 相关文章链接
1. 最优资源配置
2. RDD优化
2.1. RDD复用
2.2. RDD持久化
2.3. RDD尽可能早的 filter 操作
3. 并行度调节
4. 广播大变量
5. Kryo序列化
6. 调节本地化等待时长
0. 相关文章链接
Spark文章汇总
1. 最优资源配置
Spark 性能调优的第一步,就是为任务分配更多的资源,在一定范围内,增加资源的分配与性能的提升是成正比的,实现了最优的资源配置后,在此基础上再考虑进行后面论述的性能调优策略。
资源的分配在使用脚本提交 Spark 任务时进行指定,标准的 Spark 任务提交脚本如下所示:
bin/spark-submit \
--class com.spark.WordCount \
--master yarn
--deploy-mode cluster
--num-executors 80 \
--driver-memory 6g \
--executor-memory 6g \
--executor-cores 3 \
/usr/opt/modules/spark/jar/spark.jar \
可以进行分配的资源如表所示:
名称 | 说明 |
--num-executors | 配置 Executor 的数量 |
--driver-memory | 配置 Driver 内存(影响不大) |
--executor-memory | 配置每个 Executor 的内存大小 |
--executor-cores | 配置每个 Executor 的 CPU core 数量 |
调节原则:尽量将任务分配的资源调节到可以使用的资源的最大限度。
对于具体资源的分配,我们分别讨论 Spark 的两种 Cluster 运行模式:
- 第一种是 Spark Standalone 模式,你在提交任务前,一定知道或者可以从运维部门获取到你可以使用的资源情况,在编写 submit 脚本的时候,就根据可用的资源情况进行资源的分配,比如说集群有 15 台机器,每台机器为 8G 内存,2 个 CPU core,那么就指定 15 个 Executor,每个 Executor 分配 8G 内存,2 个 CPU core。
- 第二种是 Spark Yarn 模式,由于 Yarn 使用资源队列进行资源的分配和调度,在编写 submit 脚本的时候,就根据 Spark 作业要提交到的资源队列,进行资源的分配,比如资源队列有 400G 内存,100 个 CPU core,那么指定 50 个 Executor,每个 Executor 分配8G 内存,2 个 CPU core。
对各项资源进行了调节后,得到的性能提升会有如下表现:
名称 | 解析 |
增加 Executor·个数 | 在资源允许的情况下,增加 Executor 的个数可以提高执行 task 的并行度。比如有 4 个Executor,每个 Executor 有 2 个 CPU core,那么可以并行执行 8 个 task,如果将 Executor 的个数增加到 8 个(资源允许的情况下),那么可以并行执行 16 个 task,此时的并行能力提升了一倍。 |
增加每个 Executor 的 CPU core 个数 | 在资源允许的情况下,增加每个 Executor 的Cpu core 个数,可以提高执行 task 的并行度。比如有 4 个 Executor,每个 Executor 有 2 个 CPU core,那么可以并行执行 8 个 task,如果将每个 Executor 的 CPU core 个数增加到 4 个(资源允许的情况下),那么可以并行执行 16 个 task,此时的并行能力提升了一倍。 |
增加每个 Executor 的内存量 | 在资源允许的情况下,增加每个 Executor 的内存量以后,对性能的提升有三点: 1. 可以缓存更多的数据(即对 RDD 进行 cache),写入磁盘的数据相应减少,甚至可以不写入磁盘,减少了可能的磁盘 IO; 2. 可以为 shuffle 操作提供更多内存,即有更多空间来存放 reduce 端拉取的数据,写入磁盘的数据相应减少,甚至可以不写入磁盘,减少了可能的磁盘 IO; 3. 可以为 task 的执行提供更多内存,在 task 的执行过程中可能创建很多对象,内存较小时会引发频繁的 GC,增加内存后,可以避免频繁的 GC,提升整体性能。 |
生产环境 Spark submit 脚本配置 :
bin/spark-submit \
--class com.spark.WordCount \
--master yarn\
--deploy-mode cluster\
--num-executors 80 \
--driver-memory 6g \
--executor-memory 6g \
--executor-cores 3 \
--queue root.default \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.core.connection.ack.wait.timeout=300 \
/usr/local/spark/spark.jar
参数配置参考值:
--num-executors: 50~100
--driver-memory: 1G~5G
--executor-memory: 6G~10G
--executor-cores: 3
--master:实际生产环境一定使用 yarn
2. RDD优化
2.1. RDD复用
在对 RDD 进行算子时,要避免相同的算子和计算逻辑之下对 RDD 进行重复的计算
对上图中的 RDD 计算架构进行修改,得到如下图所示的优化结果:
2.2. RDD持久化
在 Spark 中,当多次对同一个 RDD 执行算子操作时,每一次都会对这个 RDD 以之前的父 RDD 重新计算一次,这种情况是必须要避免的,对同一个 RDD 的重复计算是对资源的极大浪费,因此,必须对多次使用的 RDD 进行持久化,通过持久化将公共 RDD 的数据缓存到内存/磁盘中,之后对于公共 RDD 的计算都会从内存/磁盘中直接获取 RDD 数据。
对于 RDD 的持久化,有两点需要说明:
- RDD 的持久化是可以进行序列化的,当内存无法将 RDD 的数据完整的进行存放的时候,可以考虑使用序列化的方式减小数据体积,将数据完整存储在内存中。
- 如果对于数据的可靠性要求很高,并且内存充足,可以使用副本机制,对 RDD 数据进行持久化。当持久化启用了复本机制时,对于持久化的每个数据单元都存储一个副本,放在其他节点上面,由此实现数据的容错,一旦一个副本数据丢失,不需要重新计算,还可以使用另外一个副本。
2.3. RDD尽可能早的 filter 操作
获取到初始 RDD 后,应该考虑尽早地过滤掉不需要的数据,进而减少对内存的占用,从而提升 Spark 作业的运行效率。
3. 并行度调节
Spark 作业中的并行度指各个stage的task的数量。 如果并行度设置不合理而导致并行度过低,会导致资源的极大浪费,例如,20 个 Executor,每个 Executor 分配 3 个 CPU core,而 Spark 作业有 40 个 task,这样每个 Executor 分配到的 task 个数是 2 个,这就使得每个 Executor 有一个 CPU core 空闲,导致资源的浪费。 理想的并行度设置,应该是让并行度与资源相匹配,简单来说就是在资源允许的前提下,并行度要设置的尽可能大,达到可以充分利用集群资源。合理的设置并行度,可以提升整个Spark 作业的性能和运行速度。
Spark 官方推荐,task 数量应该设置为 Spark 作业总 CPU core 数量的 2~3 倍。之所以没有推荐 task 数量与 CPU core 总数相等,是因为 task 的执行时间不同,有的 task 执行速度快而有的 task 执行速度慢,如果 task 数量与 CPU core 总数相等,那么执行快的 task 执行完成后,会出现 CPU core 空闲的情况。如果 task 数量设置为 CPU core 总数的 2~3 倍,那么一个 task 执行完毕后,CPU core 会立刻执行下一个 task,降低了资源的浪费,同时提升了 Spark 作业运行的效率。
Spark 作业并行度的设置如下所示:
val conf = new SparkConf()
conf.set("spark.default.parallelism", "500")
4. 广播大变量
默认情况下,task 中的算子中如果使用了外部的变量,每个 task 都会获取一份变量的复本,这就造成了内存的极大消耗。一方面,如果后续对 RDD 进行持久化,可能就无法将 RDD 数据存入内存,只能写入磁盘,磁盘 IO 将会严重消耗性能;另一方面,task 在创建对象的时候,也许会发现堆内存无法存放新创建的对象,这就会导致频繁的 GC,GC 会导致工作线程停止,进而导致 Spark 暂停工作一段时间,严重影响 Spark 性能。
假设当前任务配置了 20 个 Executor,指定 500 个 task,有一个 20M 的变量被所有 task 共用,此时会在 500 个 task 中产生 500 个副本,耗费集群 10G 的内存,如果使用了广播变量, 那么每个 Executor 保存一个副本,一共消耗 400M 内存,内存消耗减少了 5 倍。广播变量在每个 Executor 保存一个副本,此 Executor 的所有 task 共用此广播变量,这让变量产生的副本数量大大减少。
在初始阶段,广播变量只在 Driver 中有一份副本。task 在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的 Executor 对应的 BlockManager 中尝试获取变量,如果本地没有,BlockManager 就会从 Driver 或者其他节点的 BlockManager 上远程拉取变量的复本,并由本地的 BlockManager 进行管理;之后此 Executor 的所有 task 都会直接从本地的BlockManager 中获取变量。
5. Kryo序列化
默认情况下,Spark 使用 Java 的序列化机制。Java 的序列化机制使用方便,不需要额外的配置,在算子中使用的变量实现 Serializable 接口即可,但是,Java 序列化机制的效率不高,序列化速度慢并且序列化后的数据所占用的空间依然较大。
Kryo 序列化机制比 Java 序列化机制性能提高 10 倍左右,Spark 之所以没有默认使用Kryo 作为序列化类库,是因为它不支持所有对象的序列化,同时 Kryo 需要用户在使用前注册需要序列化的类型,不够方便,但从 Spark 2.0.0 版本开始,简单类型、简单类型数组、字符串类型的 Shuffling RDDs 已经默认使用 Kryo 序列化方式了。
public class MyKryoRegistrator implements KryoRegistrator {
@Override
public void registerClasses(Kryo kryo) {
kryo.register(StartupReportLogs.class);
}
}
配置 Kryo 序列化方式的实例代码:
//创建 SparkConf 对象
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用 Kryo 序列化库,如果要使用 Java 序列化库,需要把该行屏蔽掉
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
//在 Kryo 序列化库中注册自定义的类集合,如果要使用 Java 序列化库,需要把该行屏蔽掉
conf.set("spark.kryo.registrator", "atguigu.com.MyKryoRegistrator");
6. 调节本地化等待时长
Spark 作业运行过程中,Driver 会对每一个 stage 的 task 进行分配。根据 Spark 的 task 分配算法,Spark 希望 task 能够运行在它要计算的数据算在的节点(数据本地化思想),这样就可以避免数据的网络传输。通常来说,task 可能不会被分配到它处理的数据所在的节点,因为这些节点可用的资源可能已经用尽,此时,Spark 会等待一段时间,默认 3s,如果等待指定时间后仍然无法在指定节点运行,那么会自动降级,尝试将 task 分配到比较差的本地化级别所对应的节点上,比如将 task 分配到离它要计算的数据比较近的一个节点,然后进行计算,如果当前级别仍然不行,那么继续降级。
当 task 要处理的数据不在 task 所在节点上时,会发生数据的传输。task 会通过所在节点的BlockManager 获取数据,BlockManager 发现数据不在本地时,户通过网络传输组件从数据所在节点的 BlockManager 处获取数据。
网络传输数据的情况是我们不愿意看到的,大量的网络传输会严重影响性能,因此,我们希望通过调节本地化等待时长,如果在等待时长这段时间内,目标节点处理完成了一部分 task,那么当前的 task 将有机会得到执行,这样就能够改善 Spark 作业的整体性能。
Spark 的本地化等级如表所示:
名称 | 解析 |
PROCESS_LOCAL | 进程本地化,task 和数据在同一个 Executor 中,性能最好。 |
NODE_LOCAL | 节点本地化,task和数据在同一个节点中,但是task 和数据不在同一个 Executor 中,数据需要在进程间进行传输。 |
RACK_LOCAL | 机架本地化,task 和数据在同一个机架的两个节点上,数据需要通过网络在节点之间进行传输。 |
NO_PREF | 对于 task 来说,从哪里获取都一样,没有好坏之分。 |
ANY | task 和数据可以在集群的任何地方,而且不在一个机架中,性能最差。 |
在 Spark 项目开发阶段,可以使用 client 模式对程序进行测试,此时,可以在本地看到比较全的日志信息,日志信息中有明确的 task 数据本地化的级别,如果大部分都是PROCESS_LOCAL,那么就无需进行调节,但是如果发现很多的级别都是 NODE_LOCAL、ANY,那么需要对本地化的等待时长进行调节,通过延长本地化等待时长,看看 task 的本地化级别有没有提升,并观察 Spark 作业的运行时间有没有缩短。 注意,过犹不及,不要将本地化等待时长延长地过长,导致因为大量的等待时长,使得Spark 作业的运行时间反而增加了。
Spark 本地化等待时长的设置如代码所示:
val conf = new SparkConf()
conf.set("spark.locality.wait", "6")
注:其他Spark相关系列文章链接由此进 -> Spark文章汇总