这里是引用
1 八股文
1.1 基本原理
-
driver节点是整个应用程序的指挥所
-
指挥官是sparkcontext
-
环境:构建一个集群
-
应用程序提交
-
确定主节点,确定指挥所driver,确定指挥官sparkcontext
-
sparkcontext会向资源管理器申请资源
-
会将作业分为不同阶段
-
将不同任务分到不同节点执行
-
整个过程还会进行监控
-
资源管理器收到sparkcontext的资源请求
-
会向executor分配资源
-
启动executor进程,才会启动线程
-
executor进程是驻留在不同的work node中
-
会有成百上千个进程和work node
-
sparkcontext对象要根据 rdd依赖关系 构建一个DAG图
-
代码就是针对RDD一次次的操作
-
这些操作会被转换成一个有向无环图 dag
-
DAG会被提交到dag scheduler解析
-
DAG图会被切为很多个阶段 stage
-
每个stage又分为若干个任务
-
每一个阶段stage是任务的集合
-
把这个阶段stage提交给task scheduler
-
task scheduler负责分发任务
-
worker node上的executor会向task scheduler主动申请
-
task scheduler会返回任务给worker node上的executor去派生线程去执行
-
计算给节点的分发原则:
-
计算向数据靠拢。数据在哪个节点上面,task scheduler优先分配,完成本地化的处理。
-
executor运行的结果会再次反馈给task scheduler
-
再向上传给 dag scheduler
-
spark context做最后的处理。返回给用户看或者写入HDFS
-
sparkcontext:代表了整个应用程序连接集群的通道。链接应用和集群
1.2 核心原理
1.2.1 关键概念
driver: 该进程调用spark程序的main方法,并且启动sparkcontext
cluster Manager:该进程负责和外部集群工具打交道,申请或释放集群资源
Woker:该进程是一个守护进程,负责启动和管理executor
executor:该进程是一个JVM虚拟机。负责运行spark task
1.2.2 运行步骤
- 启动driver,创建sparkcontext
- client提交程序给driver,driver向cluster manager申请集群资源
- 资源申请完毕,在worker中启动executor
- driver将程序转化为tasks,分发给executor执行
1.2.3 spark运行模型
1.2.3.1 单机
使用线程模拟并行来运行程序
- 一般用于测试开发
- 不能启动spark的master、worker守护进程
- 不能启动Hadoop的各项服务
- sparksubmit进程是客户提交任务的client进程,又是spark的driver程序、还充当着spark执行task的executor的角色
1.2.3.2 集群
使用集群管理器来和不同类型的集群交互,将任务运行在集群中。
- spark standalone
- hadoop yarn
- Apache mesos
- kubernetes
2 Spark UI深入解读
- 在日常的开发工作中,我们总会遇到spark应用运行失败,或者执行效率不达预期的情况。对于这样的问题,想找到根本原因(root cause),就可以通过spark ui来获取最直接、最直观的线索,在全量的审查spark应用的同时,迅速定位问题所在。
- 如果我们把失败的、或是执行低效的spark应用看做是“病人”的话,那么spark ui中关于应用的众多衡量指标(metrics),就是这个病人的“体检报告”。结合多样的metrics,身为“大夫”的开发者可以结合经验来迅速定位“病灶”。
2.1 spark UI一级入口
打开spark UI,映入眼帘的是默认的Jobs页面。JObs页面记录着应用中涉及的actions动作,以及与数据读取、移动有关的动作。其中每一个action都对应着一个job,而每一个job都对应着一个作业。我们一会再去对jobs页面做展开,现在先把目光集中在spark ui最上面的导航条,这里面罗列这spark ui的所有一级入口
入口页 | 包含内容。作用 |
---|---|
jobs | actions,以及数据的读取与移动等操作。作业详情预览 |
stages | DAG中每个stage的入口。作业详情预览 |
storage | 分布式数据集缓存(cache)详情页。审查cache在内存与磁盘中的分布情况 |
storage | 分布式数据集缓存(cache)详情页。审查cache在内存与磁盘中的分布情况 |
environment | 配置项,环境变量详情。审查spark配置项是否符合预期 |
executors | 分布式运行环境与计算负载详情页。深入审查执行计划中的每一个环节 |
- JOB:作业详情的预览
- 什么情况下spark会产生一个job?对应的action算子会产生,对应的数据的读取和移动操作。
- stage。一个Job可以拆分为多个stage。job拆分stage的详细情况。划分stage的依据是宽窄依赖
- storage。在cache数据的时候,会有应用。
- environment。spark任务的配置项、环境变量。
- executor。分布式运行环境计算负载的详情页,作用是审查executor之间是否存在数据倾斜。比如,看input的数据的差异用于评估负载情况。
- SQL。spark SQL执行计划的详情页。
executor、environment、storage不存在二级入口,但是SQL、storage、JOBs有二级入口
2.1.1 executors
2.1.1.1 summary
- RDD blocks:原始分区数据集的分区数量
- storage memory:用于cache时所占的内存的占用情况。
- disk used。计算过程中消耗的磁盘空间。
- cores。用于计算的CPU的核数。
- acitive task:活跃的task数量
- failed task:失败的task数量
- complete task:完成的task数量
- task time(GC time)。任务的执行时候,以及任务的GC时间
- input。输入数据量的大小
- shuffle read 大小
- shuffle write 大小
- blocklisted。黑名单
2.1.1.2 executors
- executors tab的主要内容如下,主要包含“summary”和“executors”两部分。这两部分记录的度量指标是一致的,其中“executors”以更新粒度记录者每一个executor的详情,而第一部分“summary”是下面所有executors度量指标的简单加和。
- sparkUI都提供了哪些metrics,来量化每一个executor的工作负载(workload)。
metric | 含义 |
---|---|
RDD | 原始数据集的分区数量 |
storage memory | 用于cache的内存占用 |
disk used | 计算过程中消耗的磁盘空间 |
cores | 用于计算的CPU核数 |
active/failed/complete | total tasks |
task time(GC time) | 任务执行时间(括号内为任务GC时间) |
input | 输入数据量大小 |
shuffle head/write | shuffle读写过程中消耗的数据量 |
logs/thread dump | 日志与core dump |
- 不难发现,executors页面清清楚楚的记录着每一个executor消耗的数据量,以及他们对CPU、内存与磁盘等硬件资源的消耗。基于这些信息,我们可以轻松判断应用中是否存在数据倾斜的隐患。
- Thread Dump。Java中的诊断工具,每个JVM都可以显示所有线程在某一个点的状态,用作Java定位问题的诊断功能。
- runnable。当前可以运行的线程
- timed-waiting。线程主动等待的意思。
- waiting。等待的线程。
summary是executors所有指标聚合的情况。
基于这些信息,盘点不同executor之间是否存在负载不均衡的情况、数据倾斜的隐患。
2.1.2 environment
- 各种各样环境变量与配置信息。
metric | 含义 |
---|---|
runtime information | Java Scala版本号信息 |
spark properties | 所有spark配置项的设置细节,重点 |
Hadoop properties | hadoop配置项细节 |
system properties | 应用提交方法(spark-shell/spark-submit) |
classpath entries | classpath路径设置信息 |
- spark properties是重点。其中记录着所有运行时生效的spark配置项设置。通过spark properties,我们可以确认运行时的设置,与我们预期的设置是否一致,从而排除因配置项设置错误而导致的稳定性或是性能问题。
2.1.2.1 runtime information
2.1.2.2 spark properties
spark任务的各种配置项、判断参数是否合理
2.1.2.3 resource properties
不重要
2.1.2.4 Hadoop properties
Hadoop的各种配置项
2.1.2.5 system properties
系统配置项,可以看启动命令。sum.java.command
2.1.2.6 classpath properties
配置、jar包的路径
2.1.3 storage
- 记录了每一个缓存,rdd cache、dataframe cache。包括缓存级别、已缓存的分区数、缓存比例、内存大小与磁盘大小。
- spark支持不同的缓存级别,他是存储介质(内存、磁盘)、存储形式(对象、序列化字节)与副本数量的排列组合。对于data frame来说,默认的级别是单副本的disk memory deserialized,也就是存储介质为内存加磁盘,粗出形式为对象的单一副本存储方式。
metric | 含义 |
---|---|
storage level | 存储级别 |
cached partitions | 已缓存的分区数 |
fraction caches | 缓存比例 |
size in memory | 内存大小 |
size on disk | 磁盘大小 |
-
cached partitions 和fraction caches分别记录着数据集成功缓存的分区数量,以及这些缓存的分区占所有分区的比例。当fraction cached小于100%的时候。说明分布式数据集并没有完全缓存到内存(或是磁盘)。对于这种情况,我们要警惕缓存换入换出可能带来的性能隐患。
-
基于storage页面提供的详细信息,我们可以有的放失的设置于内存有关的配置项,如spark.executor.memory、spark.executor.fraction、spark.executor.storageFraction、从而有针对性的对storage memory进行调整。
-
cache partitions 已缓存的分区数
-
fraction cached。缓存的比例,代表缓存的分区占所有分区的比例,当小于100%的时候,代表分布式的数据没有完全划分到内存或者磁盘里面。
- 缓存换入换出,有可能带来性能的问题。
-
size in memory。内存缓存的大小
- storage memory不足的情况下,会把他size到磁盘里面。
-
size in disk。磁盘缓存的大小
2.1.4 SQL
- 以actions为单位,记录着每个action对应的sparksql执行计划。我们需要点击“description”列中的超链接,才能进入到二级页面,去了解每个执行计划的详细信息。
- Jobs:同理,低于jobs来说,spark ui也是以actions为粒度,记录着每个action对应作业的执行情况。我们要了解作业详情页,也必须通过“description”页面提供的二级入口链接。
- 一个action对应一个query,一个query会有多个job id。
- 以actions为单位,记录着每个action对应的spark sql执行计划。我们需要点击“description”列中的超链接,才能进入到二级页面,去了解每个执行计划的详细信息。
2.1.4 JOBs
- description。描述
- submitted。提交时间
- duration。执行时间
- stage。成功
2.1.4 stage
- 我们知道,每一个作业,都包含多个阶段,就是我们常说的stages。在stages页面,spark ui罗列了应用中涉及的所有stage,这些stages分属于不同的作业。要想查看哪些stages隶属于哪个job,还需要从jobs的descritions二级入口进入查看。
- stage页面,更多的是一种预览,要想查看每一个stage的详情,同样需要从“description”进入详情页。
总结:
一级入口 | 重点内容 |
---|---|
executors | 不同executors之间,是否存在负载倾斜 |
environment | 不同executors之间,是否存在负载倾斜 |
storage | 分布式数据集的缓存级别,内存,磁盘缓存比例 |
SQL | 初步了解不同执行计划的执行时间,确实是否符合预期 |
jobs | 初步感知不同jobs的执行时间,确实是否符合预期 |
stage | 初步感知不同stage的执行时间,确实是否符合预期 |
- 记录了以action为粒度,记录了每个action作业的情况。
- executor可以看到不同executor负载情况、执行情况,判断数据倾斜
- environment可以看到spark任务的配置情况,判断配置是否合理。参数的配置。
- 配置优先级:code>conf>默认
- SQL。可以了解不同任务执行时间是否符合预期。
- job。可以看到job的执行情况,是否符合预期
- storage。可以看到storage的执行情况,是否符合预期
- stage。可以看到stage的执行情况,是否符合预期
2.2 spark UI二级入口
- 所为二级入口,指的是通过一次超链接跳转才能访问到的页面。对于SQL、jobs和stages这三类入口来说,二级入口往往已经提供了足够的信息,基本覆盖了“体检报告”的全部内容。因此,尽管spark UI也提供了少量的三级入口(需要凉调才能到达的页面),但是这些隐藏在“犄角旮旯”的三级入口,往往不需要开发者去特别关注。
- 接下来我们就沿着sql->job->stages的顺序,一次的去访问他们的耳机入口,从而针对全局dag,作业以及执行阶段,获得更加深入的探索和洞察。
2.2.1 sql详情页
- 在 SQLtab一级入口,我们看到有1个条目
- 点击图中的“description”,即可进入到该作业的执行计划页面,如下图所示。
2.2.1.1 exchange
- 可以看到,对应每一个exchange,spark ui都提供了丰富的metrics来刻画shuffle的计算过程。从shuffle write到shuffle read,从数据量到处理时间,应有尽有。
metrics | 含义 |
---|---|
shuffle records written | shuffle write阶段写入的数据条目数量 |
shuffle write time total | shuffle write阶段花费的写入时间 |
records read | shuffle read阶段读取的数据条目数量 |
local bytes read total | shuffle read阶段从本地节点读取的数据总量 |
fetch wait time total | shuffle read阶段花费在网络传输上的时间 |
remote bytes read total | shuffle read阶段跨网络、从远端节点读取的数据总量 |
date size total | 原始数据在内存中展开之后的总大小 |
remote bytes read to disk | shuffle read阶段因数据块过大而直接落盘的情况 |
shuffle bytes written total | shuffle中间文件总大小 |
- 结合这份shuffle的体检报告,我们就能一量化的方式,去掌握shuffle过程的计算细节,从而为调优提供更多的洞察和思路。
2.2.1.2 sort
- 接下来,我们再来说说sort。相比exchange,sort的度量指标没有那么多,不过,他们足以让我们一窥sort再运行时,对内存的消耗,如下图所示。
-
metrics | 含义 |
---|---|
sort time total | 排序消耗的总时间 |
peak memory total | 内存的消耗峰值(集群范围内) |
spill size total | 排序过程中移除到磁盘的数据总量 |
- 可以看到“peak memory total ”和“spill size total”这两个数值,足以指导我们更有针对性的去设置spark.executor.memory、spark.memory.fraction、spark.memory.storageFraction,从而使得execution memory区域得到充分的保障。
2.2.1.2 aggragate
- 与sort类似,衡量aggregate的度量指标,主要记录的也是操作的内存消耗。
- - 可以看到对于aggregate操作,spark ui也记录着磁盘移除与峰值消耗,即spill size和peak memory total。这两个数值也为内存的调整提供了依据。
2.2.2 jobs详情页
- 接下里,我们再来说说jobs详情页。jobs详情页非常的简单、直观,他罗列这隶属于当前job的所有stages。要想访问每一个stage的执行细节,我们还需要通过description的超链接做跳转。
2.2.3 stages详情页
- 实际上,要访问stage详情,我们还有另外一种选择,那就是直接从stages以及入口进入,然后完成跳转。因此stage详情页也归类为二级入口。
- 接下来我们以id为8的stage位例,去看一看详情页都记录着哪些关键信息。
- 在所有的二级入口中,stage详情页的信息流可以说是巨大的。点击stage详情页,可以看到他主要包含3大信息类。分别是stage dag、event timeline与task metrics.
- 其中task metics又分为summary和entry detail两部分,提供不同粒度的信息汇总。而task metrics中记录的指标类别,还可以通过“show additional metrics”选项进行扩展。
2.2.3.1 stage dag
- 接下来,我们沿着“stage dag -> event timeline -> task metrics的顺讯,依次讲讲这些页面所包含的内容
- 首先,我们先来看看最简单的stage dag。点开蓝色的dag visualization按钮,我们就能获取到当前stage的dag,如下图所示。
- 之所以说stage dag简单,是因为咱们在SQL🎧入口,已经对dag做过详细的说明。而stage dag仅仅是SQL页面完整的dag的子集,毕竟,SQL页面的dag,针对的是作业(job)。因此,只要掌握了作业的dag,自然也就掌握了每一个stage的dag
2.2.3.2 event timeline
- 与dag visualization并列,在summary metrics之上有一个event timeline的按钮,点开它,我们可以得到如下图所示的可视化信息。
- - even timeline,记录着分布式任务调度与执行的过程中,不同计算环节主要的时间花销。图中的每一个条带,都代表着一个分布式任务,条带由不同颜色构成。其中不同颜色的矩形,代表不同环节的计算时间。
- 为了方便叙述,我还是同表格形式帮你梳理了这些环节的含义与作用,你可以保存以后随时查看。
metrics | 含义 |
---|---|
scheduler delay | 调度延迟(调度系统开销) |
task deserialization time | 任务的反序列化时间(调度系统开销) |
shuffle read time | shuffle read时间开销 |
executor computing time | 计算时间 |
shuffle write time | shuffle write时间开销 |
result serialization time | 任务结果的序列化时间 |
getting result time | 结果收集花费的时间 |
- 理想情况下,条带的大部分应该都是绿色的,也就是任务的时间消耗,大部分都是执行时间。不过,实际情况并不总是如此,比如,有些时候,蓝色的部分占比较多,或是橙色的部分占比较大
- 在这些情况下,我们就可以结合event timeline,来判断作业是否存在跳读开销过大,或是shuffle负载过重的问题,从而有针对性的对不同环节做调优。
- 比方说,如果条带中深蓝的部分(scheduler delay)很多,那就说明任务的调度开销 很重。这个时候,我们就需要参考”三足鼎立“的掉调优方法,去相应的调整cpu、内存与并行度、从而减低任务的调度开销。
- 再比如,如果条带中黄色(shuffle write time)与橙色(shuffle read time)的面积较大,就说明任务的shuffle负载很重,这个时候,我们就需要考虑,有没有可能通过利用broadcast join来消除shuffle,从而缓解任务的shuffle负担。
2.2.3.3 task metrics
- 说完stage dag和event timeline,最后,我们再来说一说stage详情页的重头戏:task metrics。
- 之所以说他是重头戏,在于task metrics以不同的粒度,提供了详尽的量化指标。其中,task以task为粒度,记录着每一个分布式任务的执行细节,而summary metrics则是对于所有tasks执行细节的统计汇总。我们先看看粗粒度的summary metrics,载入展开细粒度的task。
2.2.3.4 summary metrics
- 首先我们点开show additional metrics按钮,勾选select all,让那所有的度量指标都生效,如下图所示。这么做的目的,在于获取最详尽的task执行信息。
- 可以看到,select all生效之后,spark ui打印出了所有的执行细节。老规矩,为了方便叙述,我还是把这些metrics整理到表格中,方便你随时查阅。其中,task deserialization time、result serialization time、getting result time、scheduler delay与刚刚表格中的含义相同,不再赘述,这里我们紧整理出新出现的task metrics。
metrics | 含义 |
---|---|
duration | task 执行时间 |
gc time | 任务执行过程中,Java gc时间 |
peak execution memory | 内存消耗峰值 |
spill(memory) | 溢出数据的内存占用 |
spill(disk) | 溢出数据的磁盘占用 |
shuffle read size/ records | shuffle read 阶段读取的数据量与条目数量 |
shuffle read blocked time | shuffle read阶段的网络延迟 |
shuffle remote reads | shuffle read阶段跨节点、从远端节点拉取的数据量 |
shuffle write size/records | shuffle write阶段写入的数据量与条目数量 |
shuffle write time | shuffle write阶段话费的写入时间 |
- 对于这些详尽的task metrics,难能可贵的,spark UI以最大最小(max min)以及分位点(25%分位、50%分位、75%分位)的方式,提供了不同的metrics的统计分布。这一点非常重要,原因在于,这些metrics的统计分布,可以让我们非常清晰的量化任务的负载分布。
- 换句话说,根基不同metrics的统计信息分布,我们就可以轻而易举的判定,当前作业的不同任务之间,是相对均衡,还是存在严重的倾斜。如果判断计算存在负载倾斜,那么我们就需要利用手工加盐或者是aqe的自动倾斜处理,去消除任务之间的不均衡,从而改善作业的性能。
- 在上面的表格中,有一半的metrics是预shuffle直接相关的,比如shuffle read size/record,shuffle remote reads等等。
- 这些metrics我们在介绍SQL详情的时候,已经详细说过了。另外duration、gctime以及peak execution memory,这些metrics的含义,要么已经讲过,要么过于简单、无需解释。因此,对于这三个指标,咱们也不多写笔墨。
- 这里要特别值得关注的,是spill(memory)和spill(disk)这两个指标。spill,也即溢出数据,他指的是因内存数据结构(partitionpairbuffer、appendonlymap,等等)空间受限,而腾挪出去的数据。spill(memory)表示的是,这部分数据在内存的存储大小,而spill(disk)表示的是,这些数据在磁盘中的大小。
- 因此,用spill(memory)除以spill(disk),就可以得到“数据膨胀系数”的近似值,我们把他记为explosion ratio。有了explosion ratio,对于一份存储在磁盘中的数据,我们就可以预估他在内存中的存储大小,从而准确的把我数据的内存消耗。
2.2.3.4 task
- 介绍完粗粒度的summary metrics,接下来,我们再来说说细粒度的tasks。实际上,task的不少指标,是预summary高度重合的,如下图所示。同理,这些重合的metrics,咱们不在赘述,你可以参考summary的部分,来理解这些metrics。唯一的区别,就是这些指标是针对每一个task进行度量的。
metrics | 含义 |
---|---|
locality level | 本地性级别 |
logs | 执行日志 |
errors | 执行错误细节 |
- 可以看到新指标并不多,这里最值得关注的,是locality level,也就是本地性级别。在调度系统重,我们讲过,每个task都有自己的本地性倾向。结合本地性倾向,调度系统会把tasks调度到适度的executors或者是计算节点,尽可能保证数据不动,代码动。
- logs与errors属于spark ui的三级入口,他们是tasks的执行日志,详细记录了tasks在执行过程中的运行时状态。一般来说,我们不需要深入到三级入口进行debug。errors列提供的报错信息,往往足以让我们迅速的定位问题所在。
思考问题:为什么有跳过的skip stage或者task?
skipped stages表示已经执行过了。
rdd rdd.filter.map => transformation
rdd.cache
count reduce的操作 两个都是action
count
执行reduce的时候不需要重新计算他的transformation
3 Spark 参数调优与Spark sql调优
3.1 spark数据倾斜的解决方案
产生数据倾斜的主要原因是在shuffle中不同的key对应的数据量不同,导致不同的task分配的数据量不均衡。
- 提高shuffle的并行度
- 使用随机数前缀。将相同的key增加不同的前缀,使这些相同的key分散到不同的task进行处理。
- 缺点:对内存要求高
- reduce join转为map join。适用于两个表里面一个表比较小的场景,在map端进行小表广播,用map算子实现与join相同的效果。
- 优点:不需要shuffle
- 缺点:只只用于大表join小表的情况。
- 过滤少数导致数据倾斜的key
- 缺点:数据量多的key没有业务使用上的含义。场景单一
- 优点:实现简单。
- 在hive进行预处理,然后将数据传给hive
- 使用两阶段预聚合操作。
- 先局部聚合,再做全局聚合。适用于reducebykey groupby的场景。
- 优点:显著提升spark性能
- 缺点:适用于最固定的场景