Spark原理及调优

news2024/9/22 14:32:39

spark官档

  • hints:https://spark.apache.org/docs/3.0.0/sql-ref-syntax-qry-select-hints.html
  • 调优参数:https://spark.apache.org/docs/latest/sql-performance-tuning.html#join-strategy-hints-for-sql-queries
  • 作者几乎把所有的RDD API查了个遍,仔细研究每一个算子的含义和运行原理,汇总不同算子的适用场景,总结哪些算子会引入Shuffle,对比同类功能算子的差异与优劣势,比如map和mapPartitions,再比如groupByKey、reduceByKey和aggregateByKey。
    除此之外,Spark官网的Configuration页面作者也查阅了无数次,汇总与性能有关的配置项,不停地做对比实验,比较不同参数配置下的执行性能。遇到与认知不符的实验结果,就再回去反复咀嚼Spark的核心原理,从RDD和调度系统,到内存管理和存储系统,再到内存计算和Shuffle,如此往复,乐此不疲。
    RDD、DAG、调度系统、存储系统和内存管理

01性能调优的必要性

Spark天生的执行效率再高,也需要你针对具体的应用场景和运行环境进行性能调优。仅仅实现业务逻辑常常是不够的。
第一个案例讲的是,在函数被频繁调用的情况下,函数里面一个简单变量所引入的性能开销被成倍地放大。第二个例子讲的是,不恰当的实现方式导致海量数据被反复地扫描成百上千次(foreach collect 大表)。
性能调优的收益显而易见:一来可以节约成本,尤其是按需付费的云上成本,更短的执行时间意味着更少的花销;二来可以提升开发的迭代效率,尤其是对于从事数据分析、数据科学、机器学习的同学来说,更高的执行效率可以更快地获取数据洞察,更快地找到模型收敛的最优解。因此你看,性能调优不是一件锦上添花的事情,而是开发者必须要掌握的一项傍身技能

02 性能调优的本质

从硬件资源消耗的角度切入,往往是个不错的选择。我们都知道,从硬件的角度出发,计算负载划分为计算密集型、内存密集型和IO密集型。如果我们能够明确手中的应用属于哪种类型,自然能够缩小搜索范围,从而更容易锁定性能瓶颈。
性能调优的本质我们可以归纳为4点。

  1. 性能调优不是一锤子买卖,补齐一个短板,其他板子可能会成为新的短板。因此,它是一个动态、持续不断的过程。
  2. 能调优的手段和方法是否高效,取决于它针对的是木桶的长板还是瓶颈。针对瓶颈,事半功倍;针对长板,事倍功半。
  3. 性能调优的方法和技巧,没有一定之规,也不是一成不变,随着木桶短板的此消彼长需要相应的动态切换。
  4. 性能调优的过程收敛于一种所有木板齐平、没有瓶颈的状态。

性能调优的方法与手段

  1. 应用代码
    开发阶段都有哪些常规操作、常见误区,从而尽量避免在代码中留下性能隐患。
  2. Spark配置项
    Spark官网上罗列了近百个配置项,看得人眼花缭乱,但并不是所有的配置项都和性能调优息息相关,因此我们需要对它们进行甄别、归类。

系统化的性能调优方法论,归纳为4条:

  1. 通过不同的途径如专家经验或运行时诊断来定位性能瓶颈;
  2. 从不同场景(典型场景)、不同视角(硬件资源)出发,综合运用不同层面(应用代码、Spark配置项)的调优手段和方法;
  3. 随着性能瓶颈的此消彼长,动态灵活地在不同层面之间切换调优方法;
  4. 让性能调优的过程收敛于不同硬件资源在运行时达到一种平衡、无瓶颈的状态。
    性能调优的最终目的,是在所有参与计算的硬件资源之间寻求协同与平衡,让硬件资源达到一种平衡、无瓶颈的状态。

03 RDD 弹性分布式数据集

一 RDD为何如此重要

首先,RDD作为Spark对于分布式数据模型的抽象,是构建Spark分布式内存计算引擎的基石。很多Spark核心概念与核心组件,如DAG和调度系统都衍生自RDD。因此,深入理解RDD有利于你更全面、系统地学习Spark的工作原理。
其次,尽管RDD API使用频率越来越低,绝大多数人也都已经习惯于DataFrame和Dataset API,但是,无论采用哪种API或是哪种开发语言,你的应用在Spark内部最终都会转化为RDD之上的分布式计算。换句话说,如果你想要在运行时判断应用的性能瓶颈,前提是你要对RDD足够了解。还记得吗?定位性能瓶颈是Spark性能调优的第一步。
深入理解RDD有利于你跳出单机思维模式,避免在应用代码中留下性能隐患。

二 深入理解RDD

从薯片的加工流程看RDD:

  1. 为了充分利用每一颗土豆、降低生产成本,工坊使用 3 条流水线来同时生产 3 种不同尺寸的桶装薯片。3 条流水线可以同时加工 3 颗土豆,每条流水线的作业流程都是一样的,分别是清洗、切片、烘焙、分发和装桶。
  2. 土豆工坊的每条流水线就像是分布式环境中的计算节点;
  3. 不同的食材形态,如带泥的土豆、土豆切片、烘烤的土豆片等等,对应的就是RDD;partitions
  4. 每一种食材形态都会依赖上一种形态,如烤熟的土豆片依赖上一个步骤的生土豆切片。这种依赖关系对应的就是RDD中的dependencies属性;生成该RDD所依赖的父RDD不同环节的加工方法对应RDD的compute属性;
    生成该RDD的计算接口
  5. 同一种食材形态在不同流水线上的具体实物,就是RDD的partitions属性;
    RDD的所有数据分片实体
  6. 食材按照什么规则被分配到哪条流水线,对应的就是RDD的partitioner属性。
    7.划分数据分片的规则

三 RDD的核心特征和属性

横向属性:分布式

在分布式运行环境中,RDD封装的数据在物理上散落在不同计算节点的内存或是磁盘中,这些散落的数据被称“数据分片”,RDD的分区规则决定了哪些数据分片应该散落到哪些节点中去。RDD的partitions属性对应着RDD分布式数据实体中所有的数据分片,而partitioner属性则定义了划分数据分片的分区规则,如按哈希取模或是按区间划分等。

纵向属性:容错性

在Spark中,任何一个 RDD 都不是凭空产生的,每个 RDD 都是基于某种计算逻辑从某个“数据源”转换而来。RDD的dependencies属性记录了生成RDD 所需的“数据源”,术语叫做父依赖(或父RDD),compute方法则封装了从父 RDD到当前RDD转换的计算逻辑。

04 到底啥叫“内存计算”

第一层含义:分布式数据缓存

只有需要频繁访问的数据集才有必要cache,对于一次性访问的数据集,cache不但不能提升执行效率,反而会产生额外的性能开销,让结果适得其反。

第二层含义:Stage内的流水线式计算模式

什么是DAG

  1. 在Spark的DAG中,顶点是一个个RDD,边则是RDD之间通过dependencies属性构成的父子关系。
  2. 从开发者的视角出发,DAG的构建是通过在分布式数据集上不停地调用算子来完成的。

DAG的划分

  1. 如果用一句话来概括从DAG到Stages的转化过程,那应该是:以Actions算子为起点,从后向前回溯DAG,以Shuffle操作为边界去划分Stages。
  2. DAG->在分布式环境中执行的4个环节
    回溯DAG并划分Stages
    在Stages中创建分布式任务
    分布式任务的分发
    分布式任务的执行

Stage中的内存计算

  1. 常规计算:缓存中间数据
    MapReduce提供两类计算抽象,分别是Map和Reduce:Map抽象允许开发者通过实现map 接口来定义数据处理逻辑;Reduce抽象则用于封装数据聚合逻辑。MapReduce计算模型最大的问题在于,所有操作之间的数据交换都以磁盘为媒介。例如,两个Map操作之间的计算,以及Map与Reduce操作之间的计算都是利用本地磁盘来交换数据的。不难想象,这种频繁的磁盘I/O必定会拖累用户应用端到端的执行性能。
  2. 同一Stage内所有算子融合为一个函数,Stage的输出结果由这个函数一次性作用在输入数据集而产生。

05 调度系统:数据不动代码动到底是什么意思

1. 案例:对用户兴趣特征做Label Encoding

在第1种实现方式中,函数是一个接收两个形参的普通标量函数,Dataset调用这个函数在千亿级样本上做Label encoding。
第2种实现方式,2个计算步骤被封装到一个高阶函数中。用户代码先在Driver端用模板文件调用这个高阶函数,完成第一步计算建立字典的过程,同时输出一个只带一个形参的标量函数,这个标量函数内携带了刚刚建好的映射字典。最后,Dataset将这个标量函数作用于千亿样本之上做Label encoding。

2. Spark调度系统的工作流程包含如下5个步骤:

将DAG拆分为不同的运行阶段Stages;
创建分布式任务Tasks和任务组TaskSet;
获取集群内可用的硬件资源情况;
按照调度规则决定优先调度哪些任务/组;
依序将分布式任务分发到执行器Executor。

3. 调度系统中的核心组件有哪些?

需求端 - DAGScheduler

把用户DAG拆分为Stages
在Stage内创建计算任务Tasks,这些任务囊括了用户通过组合不同算子实现的数据转换逻辑。然后,执行器Executors接收到Tasks,会将其中封装的计算函数应用于分布式数据分片,去执行分布式的计算过程。

供给端 - SchedulerBackend

对内,SchedulerBackend用ExecutorData对Executor进行资源画像;
对外,SchedulerBackend以WorkerOffer为粒度提供计算资源,WorkerOffer封装了Executor ID、主机地址和CPU核数,用来表示一份可用于调度任务的空闲资源

中介平台 - TaskScheduler

TaskScheduler的职责是,基于既定的规则与策略达成供需双方的匹配与撮合。
一个是不同Stages之间的调度优先级: 对于这种Stages之间的任务调度,TaskScheduler提供了2种调度模式,分别是FIFO(先到先得)和FAIR(公平调度)
一个是Stages内不同任务之间的调度优先级。
任务自带调度意愿,它通过本地性级别告诉TaskScheduler自己更乐意被调度到哪里去。
Spark调度系统的原则是尽可能地让数据呆在原地、保持不动,同时尽可能地把承载计算任务的代码分发到离数据最近的地方,从而最大限度地降低分布式系统中的网络开销。

06 存储系统:是时间换空间还是空间换时间?

1 Spark存储系统是为谁服务的?

RDD缓存:将RDD以缓存的形式物化到内存或磁盘的过程。

对于一些计算成本和访问频率都比较高的RDD来说,缓存有两个好处:一是通过截断DAG,可以降低失败重试的计算开销;二是通过对缓存内容的访问,可以有效减少从头计算的次数,从整体上提升作业端到端的执行性能。

Shuffle中间文件:Shuffle Map阶段的输出结果,这些结果会以文件的形式暂存于本地磁盘

在集群范围内,Reducer想要拉取属于自己的那部分中间数据,就必须要知道这些数据都存储在哪些节点,以及什么位置。而这些关键的元信息,正是由Spark存储系统保存并维护的。因此你看,没有存储系统,Shuffle是玩不转的。

广播变量:往往用于在集群范围内分发访问频率较高的小数据

利用存储系统,广播变量可以在Executors进程范畴内保存全量数据。

2 存储系统的基本组件有哪些?

BlockManager:Executors端负责统一管理和协调数据的本地存取与跨节点传输

对外,BlockManager与Driver端的BlockManagerMaster通信,不仅定期向BlockManagerMaster汇报本地数据元信息,还会不定时按需拉取全局数据存储状态。另外,不同Executors的BlockManager之间也会以Server/Client模式跨节点推送和拉取数据块。
对内,BlockManager通过组合存储系统内部组件的功能来实现数据的存与取、收与发
BlockManager正是利用MemoryStore和DiskStore来分别管理数据在内存和磁盘中的存取。

  1. 广播变量的全量数据存储在Executors进程中,因此它由MemoryStore管理。
  2. Shuffle中间文件往往会落盘到本地节点,所以这些文件的落盘和访问就要经由DiskStore。
  3. RDD缓存会稍微复杂一些,由于RDD缓存支持内存缓存和磁盘缓存两种模式,因此我们要视情况而定,缓存在内存中的数据会封装到MemoryStore,缓存在磁盘上的数据则交由DiskStore管理。

数据的存储形式

  1. 对象值(Object Values)和字节数组(Byte Array)
  2. 序列化和反序列化
  3. 对象值和字节数组二者之间存在着一种博弈关系,也就是所谓的“以空间换时间”和“以时间换空间”,两者之间该如何取舍,我们还是要看具体的应用场景。核心原则就是:如果想省地儿,你可以优先考虑字节数组;如果想以最快的速度访问对象,还是对象值更直接一些。
  4. DiskStore只能存储序列化后的字节数组,凡是落盘的东西,都需要先进行序列化。

透过RDD缓存看MemoryStore

  1. MemoryStore同时支持存储对象值和字节数组这两种不同的数据形式
  2. 统一采用MemoryEntry数据抽象对它们进行封装,实现类有两个 :DeserializedMemoryEntry 封装原始对象值 Array[T];SerializedMemoryEntry 封装序列化之后的字节数组 ByteBuffe
  3. LinkedHashMap[BlockId, MemoryEntry]:得益于MemoryEntry对于对象值和字节数组的统一封装,MemoryStore能够借助一种高效的数据结构来统一存储与访问数据块:LinkedHashMap[BlockId, MemoryEntry],即 Key 为BlockId,Value 是MemoryEntry的链式哈希字典。 在这个字典中,一个Block对应一个MemoryEntry。 在逻辑关系上,RDD的数据分片与存储系统的Block一一对应,也就是说一个RDD数据分片会被物化成一个内存或磁盘上的Block。
  4. RDD缓存步骤
    第一步:Unroll Memory,把RDD迭代器展开为数据值,然后把这些数据值暂存到一个叫做ValuesHolder的数据结构里。
    第二步:Transfer,为了节约存储空间,我们需要调用toArray或者toBuffer函数,将RDD迭代器展开的数据值转换成MemoryEntry数据结构
    第三步:存储数据的元数据信息。这些包含RDD数值的MemoryEntry以及其对应的blockId会被存入LinkedHashMap<key=blockId,value=MemoryEntry引用的>的链式数据字典中
    📝1. “如果内存空间不足以容纳整个RDD怎么办?”很简单,强行把大RDD塞进有限的内存空间肯定不是明智之举,所以Spark会按照LRU策略逐一清除字典中最近、最久未使用的Block,以及其对应的MemoryEntry。相比频繁的展开、物化、换页所带来的性能开销,缓存下来的部分数据对于RDD高效访问的贡献可以说微乎其微。
    📝2. 当所有RDD都被转换成MemoryEntry并且将元数据信息存储到LinkedHashMap中便完成了RDD的数据缓存到内存中的过程。

3 透过Shuffle看DiskStore

DiskStore存取的本质在字节序列和磁盘文件之间的转换
DiskBlockManager用于逻辑数据块Block与磁盘文件系统中物理文件的对应关系,每个Block都对应一个磁盘文件
Shuffle过程

  1. Shuffle write
    有3类结果文件:temp_shuffle_XXX、shuffle_XXX.data和shuffle_XXX.index
    在Shuffle write的不同阶段,Shuffle manager通过BlockManager调用DiskStore的putBytes方法将数据块写入文件。文件由DiskBlockManager创建,文件名就是putBytes方法中的Block ID,这些文件会以“temp_shuffle”或“shuffle”开头,保存在spark.local.dir目录下的子目录里。
  2. Shuffle read
    在Shuffle read阶段,Shuffle manager再次通过BlockManager调用DiskStore的getBytes方法,读取data文件和index文件,将文件内容转化为数据块,最终这些数据块会通过网络分发到Reducer端进行聚合计算。

4. 思考

结合RDD数据存储到MemoryStore的过程,你能推演出通过MemoryStore通过getValues/getBytes方法去访问RDD缓存内容的过程吗?
RDD缓存的逆过程,细节待考证
参考RDD缓存存储的过程,你能推演出广播变量存入MemoryStore的流程吗?
我的回答:driver->Executor,变量->MemoryEntry->LinkedHashMap,待考证

07 内存管理基础:Spark如何高效利用有限的存储空间?

1 内存的管理模式

堆内内存:堆内内存的申请与释放统一由JVM代劳
堆外内存:Spark通过调用Unsafe的allocateMemory和freeMemory方法直接在操作系统内存中申请、释放内存空间。这样的内存管理方式自然不再需要垃圾回收机制,也就免去了它带来的频繁扫描和回收引入的性能开销。更重要的是,空间的申请与释放可以精确计算,因此Spark对堆外可用内存的估算会更精确,对内存的利用率也更有把握。

2 内存区域的划分

  1. 堆内内存Spark.executor.memory
    执行和缓存的内存空间,包括execution memory和storage memory
    User Memory的内存空间,它用于存储开发者自定义数据结构。
    Reserved Memory,它被用来存储各种Spark内部对象,例如存储系统中的BlockManager、DiskBlockManager等等。
  2. 堆外内存Spark.memory.offHeap.size
    Execution Memory:一块用于执行分布式任务,如Shuffle、Sort和Aggregate等操作
    Storage Memory:一块用于缓存RDD和广播变量等数据
  3. 执行与缓存内存
    数据集缓存和Stage内的流水线计算
    Spark1.6之后推出了统一内存管理模式:统一内存管理模式指的是Execution Memory和Storage Memory之间可以相互转化;执行内存要比缓存内存具有更高的优先级
    执行内存主要有两项任务:Shuffle map 阶段的数据转换、映射、排序、聚合和归并等操作;Shuffle reduce 阶段完成数据的排序和聚合
    抢占内存的规则:如果对方的内存空间有空闲,双方就都可以抢占;对于RDD缓存任务抢占的执行内存,当执行任务有内存需要时,RDD缓存任务必须立即归还抢占的内存,涉及的RDD缓存数据要么落盘、要么清除;对于分布式计算任务抢占的Storage Memory内存空间,即便RDD缓存任务有收回内存的需要,也要等到任务执行完毕才能释放。

3从代码看内存消耗

内存区域分为Reserved Memory、User Memory、Execution Memory和Storage Memory。
其中,Reserved Memory用于存储Spark内部对象,User Memory用于存储用户自定义的数据结构,Execution Memory用于分布式任务执行,而Storage Memory则用来容纳RDD缓存和广播变量。

08 应用开发三原则:如何拓展自己的开发边界

一 坐享其成

1 如何利用钨丝计划的优势?

它的优势是,可以通过对数据模型与算法的优化,把Spark应用程序的执行性能提升一个数量级。
在数据结构方面,Tungsten自定义了紧凑的二进制格式。存储效率+计算开销
Tungsten利用Java Unsafe API开辟堆外(Off Heap Memory)内存来管理对象。对内存占用的估计更精确,不需要像Java heap 那样反复进行垃圾回收
Tungsten用全阶段代码生成(Whol Stage Code Generation)取代火山迭代模型,提高CPU的使用效率

2 如何利用AQE的优势?

自适应查询执行Adaptive Query Execution
Spark SQL的优化过程可以大致分为语法分析、语义解析、逻辑计划和物理计划这几个环节。Spark3.0之前仅仅在编译时基于规则和策略遍历AST查询语法树,来优化逻辑计划,一旦基于最佳逻辑计划选定最佳物理执行计划,Spark就会严格按照这个物理执行计划机械地执行这个计划。AQE可以让Spark在运行时的各个阶段,周期性地动态的调整前面的逻辑计划,然后根据优化的逻辑执行计划重新选定最优的物理执行计划,从而调整运行时后续的执行计划。

AQE的改进方面

1 自动分区合并
AQE会自动检测过小的数据分区,并对它们自动合并
2 数据倾斜
处理不当则很容易出现OOM,这时候Spark会对数据集进行自动加盐,通过把Key打散来均衡数据在不同节点上的分布。
3 Join策略调整
当两个有序表要进行数据关联的时候,Spark SQL在优化过程中总会选择Sort Merge Join的实现方式。但有一种情况是,其中一个表在排序前需要对数据进行过滤,过滤后的表小到足可以由广播变量容纳。这个时候,Broadcast Join比Sort Merge Join的效率更高。但是,3.0版本之前的优化过程是静态的,做不到动态切换Join方式。
针对这种情况,AQE会根据运行时的统计数据,去动态地调整Join策略,把之前敲定的Sort Merge Join改为Broadcast Join,从而改善应用的执行性能。

参数调整

spark.sql.adaptive.enabled:是否启动Aqe

再比如,使用Parquet、ORC等文件格式,去坐享谓词下推带来的数据读取效率。

二 能省则省、能拖则拖

1 what:省的是数据量,拖的是shuffle操作

2 why:

数据量越少就需要越少的计算负载,越低的计算负载就会有更快的处理速度
shuffle操作执行的越晚,需要落盘和分发的数据就越少,更低的磁盘IO和网络开销就意味着更高的执行效率。

2 how

  1. 尽量把能节省数据扫描量和数据处理量的操作往前推;
  2. 尽力消灭掉Shuffle,省去数据落盘与分发的开销;
  3. 如果不能干掉Shuffle,尽可能地把涉及Shuffle的操作拖到最后去执行。

3 跳出单机思维

举一个例子
为了生成训练样本,我们需要对两张大表进行关联。根据“能省则省、能拖则拖”原则,我们想把其中一张表变小,把Shuffle Join转换为Broadcast Join,这样一来就可以把Shuffle的环节省掉了。
尽管两张表的尺寸都很大,但右表的Payload只有一列,其他列都是Join keys,所以只要我们把Join keys干掉,右表完全可以放到广播变量里。但是,直接干掉Join keys肯定不行,因为左右表数据关联是刚需。那么,我们能否换个方式把它们关联在一起呢?
受Hash Join工作原理的启发,我们想到可以把所有的Join keys拼接在一起,然后用哈希算法生成一个固定长度的字节序列,把它作为新的Join key。这样一来,右表中原始的Join keys就可以拿掉,右表的尺寸也可以大幅削减,小到可以放到广播变量里。同时,新的Join key还能保证左右表中数据的关联关系保持不变,一举两得。
为了对拼接的Join keys进行哈希运算,我们需要事先准备好各种哈希算法,然后再转换左、右表。接到这样的需求之后,同学小A立马在右表上调用了map算子,并且在map算子内通过实例化Util类获取哈希算法,最后在拼接的Join keys上进行哈希运算完成了转换。

4 归纳这件事的意义和价值。

我们之所以把各种开发技巧归纳为开发原则,一方面是遵循这些原则,你能在不知不觉中避开很多性能上的坑。但更重要的是,从这些原则出发,向外推演,我们往往能发现更多的开发技巧,从而能拓展自己的“常规操作”边界,做到举一反三,真正避免“调优思路枯竭”的窘境。

09-10 调优配置项速查

一 配置项的分类

在Spark分布式计算环境中,负责计算负载的主要是Execution,而driver主要负责分布式调度,调优空间有限

官网全量配置项的分类

  1. 硬件资源类
    与CPU、内存、磁盘有关的资源项
    平衡不同硬件资源的利用率
  2. Spark shuffle类
  3. Spark SQL类
    无论是在Streaming、Mllib、Graph等子框架中,还是在PySpark中,只要你使用DataFrame API,Spark在运行时都会使用Spark SQL做统一优化。因此,我们需要梳理出一类配置项,去充分利用Spark SQL的先天性能优势。

二 硬件资源类调优配置项

1 哪些配置项与CPU设置有关

  1. 集群:spark.cores.max
    集群范围内满配CPU核数
  2. Executor:spark.executor.cores
    单个Executor内CPU核数
  3. 计算任务:spark.task.cores
    单个任务消耗的CPU核数
  4. 并行度
    并行度的出发点是数据,它明确了数据划分的粒度。
    spark.default.parallelism
    未指定分区数时的默认并行度
    spark.sql.shuffle.partitions
    数据关联、聚合操作中Reducer的并行度

2 哪些配置项与内存设置有关

  1. 内存的基础配置项
    spark.executor.memory
    单个executor内堆内内内存的大小
    spark.memory.offHeap.size
    单个executor内堆外内存的大小,只有在spark.memory.offHeap.enabled设置为true时才能生效
    spark.memory.fraction
    堆内内存中,执行内存和缓存的比例,1-userMemory所占的比例
    spark.memory.storageFraction
    堆内内存用于缓存RDD的内存占比
    spark.rdd.compress
    RDD缓存是否压缩,默认不压缩
  2. 堆外与堆内的平衡
    堆内内存
    对于需要处理的数据集,如果数据模式比较扁平,而且字段多是定长数据类型,就更多地使用堆外内存。相反地,如果数据模式很复杂,嵌套结构或变长字段很多,就更多采用JVM堆内内存会更加稳妥。
    User Memory与Spark可用内存如何分配?
    当在JVM内平衡Spark可用内存和User Memory时,你需要考虑你的应用中类似的自定义数据结构多不多、占比大不大?然后再相应地调整两块内存区域的相对占比
    举例子label Encoding
    Execution Memory该如何与Storage Memory平衡?
    在打算把大面积的内存空间用于RDD cache之前,你需要衡量这么做可能会对执行效率产生的影响。
    缓存密集型
    兼顾RDD访问和应用的整体执行效率
    首先,你可以放弃对象值的缓存方式,改用序列化的缓存方式,序列化会把多个对象转换成一个字节数组。这样,对象个数的问题就得到了初步缓解。
    其次,我们可以调节spark.rdd.compress这个参数。RDD缓存默认是不压缩的,启用压缩之后,缓存的存储效率会大幅提升,有效节省缓存内存的占用,从而把更多的内存空间留给分布式任务执行。

3 哪些配置项与磁盘设置有关

spark.local.dir:这个参数允许开发者设置磁盘目录,该目录用于存储RDD cache落盘数据块和Shuffle中间文件。

三 Shuffle类配置项

  1. spark.shuffle.file.buffer:map输出端写入缓冲区大小,我们可以通过spark.shuffle.file.buffer来扩大写缓冲区的大小,缓冲区越大,能够缓存的落盘数据越多,Spark需要刷盘的次数就越少,I/O效率也就能得到整体的提升。
  2. spark.reducer.maxSizeInFlight:
    reduce输入端的读取缓冲区的大小:我们就可以通过spark.reducer.maxSizeInFlight配置项控制Reduce端缓冲区大小,来调节Shuffle过程中的网络负载。
    在Reduce阶段,因为Spark会通过网络从不同节点的磁盘中拉取中间文件,它们又会以数据块的形式暂存到计算节点的读缓冲区(Read Buffer)。缓冲区越大,可以暂存的数据块越多,在数据总量不变的情况下,拉取数据所需的网络请求次数越少,单次请求的网络吞吐越高,网络I/O的效率也就越高。

四 Spark SQL大类配置项

1 开启AQE

AQE功能默认是禁用的,想要使用这些特性,我们需要先通过配置项spark.sql.adaptive.enabled来开启AQE

2 哪些配置项与自动分区合并有关?

AQE事先并不判断哪些分区足够小,而是按照分区编号进行扫描,当扫描量超过“目标尺寸”时,就合并一次。

  1. spark.sql.adaptive.advisoryPartitionSizeInBytes
    由开发者指定分区合并后的推荐尺寸
  2. spark.sql.adaptive.coalescePartitions.minPartitionNum
    分区合并后,数据值的分区数不能低于该值
    这个参数的目的就是避免并行度过低导致CPU资源利用不充分。

3 哪些配置项与自动数据倾斜处理有关?

  1. spark.sql.adaptive.skewJoin.enabled
  2. spark.sql.adaptive.skewJoin.skewedPartitionFactor
  3. spark.sql.adative.skewJoin.skewedPartitionThresholdInBytes
    首先,分区尺寸必须要大于spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes参数的设定值,才有可能被判定为倾斜分区。
  4. spark.sql.adaptive.advisoryPartitionSizeInBytes
    以字节为单位,拆分倾斜分区的数据粒度

4 哪些配置项与Join策略调整有关?

  1. broadcast join
    Broadcast Join的精髓在于“以小博大”,它以广播的方式将小表的全量数据分发到集群中所有的Executors,大表的数据不需要以Join keys为基准去Shuffle,就可以与小表数据原地进行关联操作
  2. AQE动态Join策略调整可以在运行时将Shuffle Joins降级为Broadcast Join,同时,运行时的数据量估算要比编译时准确得多,因此相比静态优化会更可靠。不过,需要我们注意的是,Shuffle过后非空分区占比要小于nonEmptyPartitionRatioForBroadcastJoin才能触发Broadcast Join的降级优化。
    在这里插入图片描述

11 shuffle的工作原理,为什么shuffle是一时无两的性能杀手

一 用仙女三花的策略类比shuffle的map和reduce阶段

从老师分发花朵,到5个小同学把花朵按照颜色归类,对应的是Shuffle的Map阶段
也可以叫shuffle write
大家把归类的花朵分发到相应的课桌,这个过程类似于Shuffle的Reduce阶段。
也可以叫shuffle read

二 Map阶段是如何输出中间文件的?

1首先以结果为导向,先看一下map阶段输出是什么?

Map阶段最终生产的数据会以中间文件的形式物化到磁盘中,这些中间文件就存储在spark.local.dir设置的文件目录里。
中间文件包含两种类型
一类是后缀为data的数据文件,存储的内容是Map阶段生产的待分发数据;
另一类是后缀为index的索引文件,它记录的是数据文件中不同分区的偏移地址。
这里的分区是指Reduce阶段的分区,因此,分区数量与Reduce阶段的并行度保持一致。

2 Map是怎么输出文件的?

用groupByKey实现“仙女散花”

  1. 对于分片中的数据记录,逐一计算其目标分区,并将其填充到PartitionedPairBuffer;
    “PartitionedPairBuffer”,它本质上就是一种数组形式的缓存结构。
  2. PartitionedPairBuffer填满后,如果分片中还有未处理的数据记录,就对Buffer中的数据记录按(目标分区ID,Key)进行排序,将所有数据溢出到临时文件,同时清空缓存;
    Spark需要一种计算机制,来保障在数据总量超出可用内存的情况下,依然能够完成计算。这种机制就是:排序、溢出、归并。
  3. 重复步骤1、2,直到分片中所有的数据记录都被处理;
  4. 对所有临时文件和PartitionedPairBuffer归并排序,最终生成数据文件和索引文件。

3用reduceByKey实现“仙女散花”

在计算的过程中,reduceByKey采用一种叫做PartitionedAppendOnlyMap的数据结构来填充数据记录。
这个数据结构是一种Map,而Map的Value值是可累加、可更新的。
PartitionedAppendOnlyMap非常适合聚合类的计算场景,如计数、求和、均值计算、极值计算等等。
依靠高效的内存数据结构、更少的磁盘文件、更小的文件尺寸,我们就能大幅降低了Shuffle过程中的磁盘和网络开销。

4Reduce阶段是如何进行数据分发的?

Reduce Task通过网络拉取中间文件的过程,实际上就是不同Stages之间数据分发的过程。
Shuffle中数据分发的网络开销,会随着Map Task与Reduce Task的线性增长,呈指数级爆炸。m*n

三 性能杀手

1 对于Shuffle来说,它需要消耗所有的硬件资源

无论是PartitionedPairBuffer、PartitionedAppendOnlyMap这些内存数据结构,还是读写缓冲区,都会消耗宝贵的内存资源;
由于内存空间有限,因此溢出的临时文件会引入大量磁盘I/O,而且,Map阶段输出的中间文件也会消耗磁盘;
呈指数级增长的跨节点数据分发,带来的网络开销更是不容小觑。

2Shuffle消耗的不同硬件资源之间很难达到平衡。

磁盘和网络的消耗是Shuffle中必需的环节。

12-13 广播变量

1 如何理解广播变量?

分发到task还是executor?
我们需要降低数据结构分发的频次。
广播变量是一种分发机制,它一次性封装目标数据结构,以Executors为粒度去做数据分发
在广播变量的运行机制下,封装成广播变量的数据,由Driver端以Executors为粒度分发,每一个Executors接收到广播变量之后,将其交给BlockManager管理。

由于广播变量携带的数据已经通过专门的途径存储到BlockManager中,因此分发到Executors的Task不需要再携带同样的数据。

				普通变量
					任务分发
						以task为粒度,driver->task
					创建广播变量
						以executor为粒度,driver->executor
					读取广播变量
						executor的公共仓库blockmanager->task
		广播分布式数据集
			分布式数据集的数据源不在Driver端,而是来自所有的Executors
				Executors中的每个分布式任务负责生产全量数据集的一部分,也就是图中不同的数据分区。
			两步走
				步骤1就是Driver从所有的Executors拉取这些数据分区,然后在本地构建全量数据。
				步骤2与从普通变量创建广播变量的过程类似。 Driver把汇总好的全量数据分发给各个Executors,Executors将接收到的全量数据缓存到存储系统的BlockManager中。
			如何用广播变量克制Shuffle?
				Shuffle Join
					第一步就是对参与关联的左右表分别进行Shuffle
					第二步就是在同一个Executors内,Reduce task就可以对userID一致的记录进行关联操作。
				克制Shuffle的方式
					我们在做数据关联的时候,把Shuffle Joins转换为Broadcast Joins,就可以用小表广播来代替大表的全网分发,真正做到克制Shuffle。
		如何让Spark SQL选择Broadcast Joins?
			利用配置项强制广播
				spark.sql.autoBroadcastJoinThreshold
				对于参与Join的两张表来说,任意一张表的尺寸小于设置的大小,Spark就在运行时采用Broadcast Joins的实现方式去做数据关联。
				使用广播阈值配置项让Spark优先选择Broadcast Joins的关键,就是要确保至少有一张表的存储尺寸小于广播阈值。
				第一步,把要预估大小的数据表缓存到内存,比如直接在DataFrame或是Dataset上调用cache方法;第二步,读取Spark SQL执行计划的统计数据。
			利用API强制广播
				用Join Hints强制广播
				用broadcast函数强制广播
			广播变量不是银弹
				首先,从性能上来讲,Driver在创建广播变量的过程中,需要拉取分布式数据集所有的数据分片。
				其次,从功能上来讲,并不是所有的Joins类型都可以转换为Broadcast Joins。
	14 如何高效利用CPU
		协调、平衡,硬件资源达到一种平衡和无瓶颈的状态
		CPU与内存的平衡本质上是什么?
			Spark中CPU与内存的平衡,其实就是CPU与执行内存之间的协同与配比。
			要想平衡CPU与执行内存之间的协同和配比,我们需要使用3类配置参数,它们分别控制着并行度、执行内存大小和集群的并行计算能力。
			否则CPU与执行内存之间的平衡就会被打破,要么CPU工作不饱和,要么OOM内存溢出
		任务并发过程中多个线程抢占内存资源时需要遵循的基本逻辑。
			执行内存抢占规则就是,在同一个Executor中,当有多个(记为N)线程尝试抢占执行内存时,需要遵循2条基本原则:
				执行内存总大小(记为M)为两部分之和,一部分是Execution Memory初始大小,另一部分是Storage Memory剩余空间
				每个线程分到的可用内存有一定的上下限,下限是M/N/2,上限是M/N,也就是均值
		三足鼎立:并行度、并发度与执行内存
			3类配置项
				并行
					并行度指的是为了实现分布式计算,分布式数据集被划分出来的份数。并行度明确了数据划分的粒度:并行度越高,数据的粒度越细,数据分片越多,数据越分散。
					并行度可以通过两个参数来设置,分别是spark.default.parallelism和spark.sql.shuffle.partitions。
				并发度
					同一时间内,一个Executor内部可以同时运行的最大任务数量。
					1. Executor的线程池大小由参数spark.executor.cores决定
					2. 每个任务在执行期间需要消耗的线程数由spark.task.cpus配置项给定
					3. 两者相除得到的商就是并发度
				执行内存
					堆内执行内存的初始值
						spark.executor.memory * spark.memory.fraction * (1 - spark.memory.storageFraction)
					堆外执行内存
						spark.memory.offHeap.size * (1 - spark.memory.storageFraction)
					在统一内存管理模式下,在Storage Memory没有被RDD缓存占满的情况下,执行任务可以动态地抢占Storage Memory
					可分配的执行内存总量会随着缓存任务和执行任务的此消彼长,而动态变化。但无论怎么变,可用的执行内存总量,都不会低于配置项设定的初始值。
		CPU低效原因之一:线程挂起
			在给定执行内存总量M和线程总数N的情况下,为了保证每个线程都有机会拿到适量的内存去处理数据,Spark用HashMap数据结构,以(Key,Value)的方式来记录每个线程消耗的内存大小,并确保所有的Value值都不超过M/N。在一些极端情况下,有些线程申请不到所需的内存空间,能拿到的内存合计还不到M/N/2。这个时候,Spark就会把线程挂起,直到其他线程释放了足够的内存空间为止。
			如果分布式数据集的并行度设置得当,因任务调度滞后而导致的线程挂起问题就会得到缓解。
		CPU低效原因之二:调度开销
			数据过于分散会带来严重的副作用:调度开销骤增。
			当数据过于分散,分布式任务数量会大幅增加,但每个任务需要处理的数据量却少之又少,就CPU消耗来说,相比花在数据处理上的比例,任务调度上的开销几乎与之分庭抗礼。
		如何优化CPU利用率?
			首先,在一个Executor中,每个CPU线程能够申请到的内存比例是有上下限的,最高不超过1/N,最低不少于1/N/2,其中N代表线程池大小。
			其次,在给定线程池大小和执行内存的时候,并行度较低、数据分片较大容易导致CPU线程挂起,线程频繁挂起不利于提升CPU利用率,而并行度过高、数据过于分散会让调度开销更显著,也利于提升CPU利用率。
			最后,在给定执行内存M、线程池大小N和数据总量D的时候,想要有效地提升CPU利用率,我们就要计算出最佳并行度P,计算方法是让数据分片的平均大小D/P坐落在(M/N/2, M/N)区间。这样,在运行时,我们的CPU利用率往往不会太差。
	15-17 内存视角
		如何最大化内存的利用效率
			Label Encoding 实例
				User Memory性能隐患
					#size*#thread,在一个executor中可能会存在多个重复数据,map结构
				性能调优
					广播变量
						#size
						小飞机之前需要携带函数findIndex,现在则换成了一位“匿名的乘客”:一个读取广播变量并调用其getOrElse方法的匿名函数。由于这位“匿名的乘客”将大件行李托运给了“联邦广播快递公司”的专用货机,因此,Task小飞机着陆后,没有任何“行李”需要寄存到User Memory。换句话说,优化后的版本不会对User Memory内存区域进行占用,所以第一种实现方式中#threads * #size的内存消耗就可以省掉了。
				Storage Memory规划
					广播变量消耗的是storage memory的内存区域
					明确了Storage Memory内存区域的具体消耗之根据公式:(spark.executor.memory – 300MB)* spark.memory.fraction * spark.memory.storageFraction去有针对性地调节相关的内存配置项。
			内存规划两步走
				预估内存占用
					堆内内存划分为Reserved Memory、User Memory、Storage Memory和Execution Memory这4个区域。预留内存固定为300MB,不用理会,其他3个区域需要你去规划。
					第一步,计算User Memory的内存消耗。
						我们先汇总应用中包含的自定义数据结构,并估算这些对象的总大小#size,然后用#size乘以Executor的线程池大小,即可得到User Memory区域的内存消耗#User。
					第二步,计算Storage Memory的内存消耗。
						我们先汇总应用中涉及的广播变量和分布式数据集缓存,分别估算这两类对象的总大小,分别记为#bc、#cache。另外,我们把集群中的Executors总数记作#E。这样,每个Executor中Storage Memory区域的内存消耗的公式就是:#Storage = #bc + #cache / #E。
					第三步,计算执行内存的消耗。
						学习上一讲,我们知道执行内存的消耗与多个因素有关。第一个因素是Executor线程池大小#threads,第二个因素是数据分片大小,而数据分片大小取决于数据集尺寸#dataset和并行度#N。因此,每个Executor中执行内存的消耗的计算公式为:#Execution = #threads * #dataset / #N。
				调整内存配置项
					得到这3个内存区域的预估大小#User、#Storage、#Execution
						spark.executor.memory – 300MB)* spark.memory.fraction * spark.memory.storageFraction)
					spark.memory.fraction可以由公式(#Storage + #Execution)/(#User + #Storage + #Execution)
					spark.memory.storageFraction的数值应该参考(#Storage)/(#Storage + #Execution)
					对于Executor堆内内存总大小spark.executor.memory的设置,我们自然要参考4个内存区域的总消耗,也就是300MB + #User + #Storage + #Execution。不过,我们要注意,利用这个公式计算的前提是,不同内存区域的占比与不同类型的数据消耗一致。
		如何有效避免cache滥用
			Cache的工作原理
				缓存的存储级别
					存储介质:内存还是磁盘,或是两者都有。
					存储形式:对象值还是序列化的字节数组,带SER字样的表示以序列化方式存储,不带SER则表示采用对象值。
					副本数量:存储级别名字最后的数字代表拷贝数量,没有数字默认为1份副本。
					最常用的只有两个:MEMORY_ONLY和MEMORY_AND_DISK,它们分别是RDD缓存和DataFrame缓存的默认存储级别
				缓存的计算过程
					这两种存储级别都是先尝试把数据缓存到内存
					无论是RDD还是DataFrame,它们的数据分片都是以迭代器Iterator的形式存储的。
						1. 要把数据缓存下来,我们先得把迭代器展开成实实在在的数据值,这一步叫做Unroll
						2. 展开的对象值暂时存储在一个叫做ValuesHolder的数据结构里,然后转换为MemoryEntry。转换的实现方式是toArray,因此它不产生额外的内存开销,这一步转换叫做Transfer
						3. MemoryEntry和与之对应的BlockID,以Key、Value的形式存储到哈希字典(LinkedHashMap)中
				缓存的销毁过程
					Spark使用了一个巧妙的数据结构:LinkedHashMap,这种数据结构天然地支持LRU算法。
						LinkedHashMap使用两个数据结构来维护数据,一个是传统的HashMap,另一个是双向链表。HashMap的用途在于快速访问,根据指定的BlockId,HashMap以O(1)的效率返回MemoryEntry。双向链表则不同,它主要用于维护元素(也就是BlockId和MemoryEntry键值对)的访问顺序。凡是被访问过的元素,无论是插入、读取还是更新都会被放置到链表的尾部。因此,链表头部保存的刚好都是“最近最少访问”的元素。
					数据清除的过程中,Spark遵循两个基本原则:
						LRU:按照元素的访问顺序,优先清除那些“最近最少访问”的BlockId、MemoryEntry键值对
						兔子不吃窝边草:在清除的过程中,同属一个RDD的MemoryEntry拥有“赦免权”
				退化为MapReduce
				Cache的用武之地
					2条基本原则
						如果RDD/DataFrame/Dataset在应用中的引用次数为1,就坚决不使用Cache
						如果引用次数大于1,且运行成本占比超过30%,应当考虑启用Cache
							运行成本占比 指的是计算某个分布式数据集所消耗的总时间与作业执行时间的比值。
				Cache的注意事项
					.cache是惰性操作,因此在调用.cache之后,需要先用Action算子触发缓存的物化过程。
					只有count才会触发缓存的完全物化
					first、take和show这3个算子只会把涉及的数据物化
					用.unpersist来清理弃用的缓存数据,它是.cache的逆操作
						异步模式:调用unpersist()或是unpersist(False)
						同步模式:调用unpersist(True)
		OOM问题
			哪里会发生OOM?
				发生OOM的LOC(Line Of Code),也就是代码位置在哪?
				OOM发生在Driver端,还是在Executor端?
				如果是发生在Executor端,OOM到底发生在哪一片内存区域?
			Driver端的OOM
				Spark在Driver端的计算任务
					创建小规模的数据集合:使用parallelize、createDataFrame等API创建数据集
					收集计算结果:通过take、show、collect等算子把结果收集到Driver端
				Driver端的两类OOM问题
					创建的数据集超过内存上限
					收集的结果集超过内存上限
						广播变量,collect等原因
			Executor端的OOM
				执行内存分为4个区域:Reserved Memory、User Memory、Storage Memory和Execution Memory
				在Executors中,与任务执行有关的内存区域才存在OOM的隐患。
				User Memory的OOM
					spark.executor.memory * ( 1 - spark.memory.fraction)
				Execution Memory的OOM
					数据量并不是决定OOM与否的关键因素,数据分布与Execution Memory的运行时规划是否匹配才是。
					一旦分布式任务的内存请求超出1/N这个上限,Execution Memory就会出现OOM问题。
						它不仅仅与内存空间大小、数据分布有关,还与Executor线程池和运行时任务调度有关。
					两个常见的实例
						数据膨胀
							表象:磁盘中的数据加载到JVM中会膨胀,数据请求超出1/N上限
							调优思路
								把数据打散,提高数据分片数量、降低数据粒度,让膨胀之后的数据量降到100MB左右
								加大内存配置,结合Executor线程池调整,提高1/N上限到300MB
						数据倾斜
							消除数据倾斜,让所有的数据分片尺寸都不大于100MB
							调整Executor线程池、内存、并行度等相关配置,提高1/N上限到300MB
								维持并发度、并行度不变,增大执行内存设置,提高1/N上限到300MB
								维持并发度、执行内存不变,使用相关配置项来提升并行度将数据打散,让所有的数据分片尺寸都缩小到100MB以内
	18 磁盘视角
		磁盘在功能上的作用
			溢出临时文件
			存储shuffle中间文件
			缓存分布式数据集
				也就是说,凡是带DISK字样的存储模式,都会把内存中放不下的数据缓存到磁盘。
		性能上的价值
			磁盘复用,它指的是Shuffle Write阶段产生的中间文件被多次计算重复利用的过程
			失败重试中的磁盘复用
				磁盘复用的收益之一就是缩短失败重试的路径,在保障作业稳定性的同时提升执行性能。
			ReuseExchange机制下的磁盘复用
				相同或是相似的物理计划可以共享Shuffle计算的中间结果
				reuseExchange的触发条件
					多个查询所依赖的分区规则要与Shuffle中间数据的分区规则保持一致
					多个查询所涉及的字段(Attributes)要保持一致
	19 网络视角:如何降低网络开销?
		数据读写
			Spark的数据存储格式和数据存储系统五花八门,但是不管是什么存储系统,也不管是什么存储格式,数据传输的网络开销都是取决于任务与数据的本地性关系,也就是任务的本地性级别
			任务的本地性级别
				PROCESS_LOCAL:任务与数据同在一个JVM进程中
					内存计算
				NODE_LOCAL:任务与数据同在一个计算节点,数据可能在磁盘上或是另一个JVM进程中
					磁盘IO开销
				RACK_LOCAL:任务与数据不在同一节点,但在同一个物理机架上
					同机架网络开销
				ANY:任务与数据是跨机架、甚至是跨DC(Data Center,数据中心)的关系
					跨机架网络开销
			对于SPARK加HDFS或者SPARK加MongoDB来说,是否会引入网络开销完全取决于他们的部署模式。
				物理上紧耦合,在NODE_LOCAL级别下,Spark用磁盘I/O替代网络开销获取数据;物理上分离,网络开销就无法避免。
				物理上的隔离同样会影响数据的写入效率
					当数据处理完毕,需要将处理结果落盘到外部存储的时候,紧耦合模式下的数据写入会把数据分片落盘到本地节点,避免网络开销。
			私有DC和公有云
				在企业的私有化DC中更容易定制化集群的部署方式,大家通常采用紧耦合的方式来提升数据访问效率。
				在公有云环境中,计算集群在物理上往往和存储系统隔离,因此数据源的读取只能走网络。
		数据处理
			能省则省
				shuffle
					在数据关联的场景中,省去Shuffle最好的办法,就是把Shuffle Joins转化为Broadcast Joins。
					MAP端的聚合
						如果实在没法避免Shuffle,我们要尽可能地在计算中多使用Map端聚合,去减少需要在网络中分发的数据量。
				多副本的RDD缓存
					多副本是为了实现系统的高可用性
						可用性的计算=系统的平均无故障时长/(系统的平均无故障时长+平均故障修复时间)
					当数据副本数大于1的时候,本地数据分片就会通过网络被拷贝到其他节点,从而产生网络开销。
		数据传输
			序列化
				在落盘或是在网络传输之前,数据都是需要先进行序列化的
				SPARK提供的两种序列化方式
					Java Serializer和Kyro Serializer
					通常来说,Kryo Serializer相比Java serializer,在处理效率和存储效率两个方面都会胜出数倍。
			kyro序列话方式
				对于一些自定义的数据结构来说,如果你没有明确把这些类型向Kryo Serializer注册的话,虽然它依然会帮你做序列化的工作,但它序列化的每一条数据记录都会带一个类名字,这个类名字是通过反射机制得到的,会非常长。在上亿的样本中,存储开销自然相当可观。
			配置项
				spark.serializer
					默认使用JavaSerializer
						推荐使用KyroSerializer
				spark.registrationRequired
					是否强制注册自定义类型
						默认为False
						推荐设置为True
	20 RDD和DataFrame:既生瑜,何生亮?
		RDD之痛:优化空间受限
			对于这些高阶算子,开发者需要以Lambda函数的形式自行提供具体的计算逻辑。
			对于Spark Core来说,优化空间受限最主要的影响,莫过于让应用的执行性能变得低下。
		 DataFrame应运而生
			DataFrame和RDD之间的区别
				是否携带Schema(从数据表示的角度看)
				RDD算子多是高阶函数,这些算子允许开发者灵活地实现业务逻辑,表达能力极强。
			DataFrame的表达能力很弱
				它定义了一套DSL(Domain Specific Language)算子,如select、filter、agg、groupBy等等。由于DSL语言是为解决某一类任务而专门设计的计算机语言,非图灵完备,因此,语言表达能力非常有限
				DataFrame中的绝大多数算子都是标量函数(Scalar Functions),它们的形参往往是结构化的数据列(Columns),表达能力也很弱
			DataFrame API最大的意义:它为Spark引擎的内核优化打开了全新的空间
				DataFrame中Schema携带的类型信息,让Spark可以根据明确的字段类型设计定制化的数据结构,从而大幅提升数据的存储和访问效率。
				DataFrame中标量算子确定的计算逻辑,让Spark可以基于启发式的规则和策略,甚至是动态的运行时信息去优化DataFrame的计算过程。
		Spark SQL智能大脑
			Catalyst:执行过程优化
				首先,基于DataFrame确切的计算逻辑,Spark会使用第三方的SQL解析器ANTLR来生成抽象语法树(AST,Abstract Syntax Tree)。
					两类信息
						节点:标量算子(如select、filter)的处理逻辑
						边:数据信息,关系表和数据列
					Unresolved Logical Plan
						边上记录的关系表和数据列仅仅是一些字符串,还没有和实际数据对应起来。这些字符串的来源是DataFrame的DSL查询,Catalyst并不确定这些字段名是不是有效的,更不知道每个字段都是什么类型。
				Catalyst做的第一步优化,就是结合DataFrame的Schema信息,确认计划中的表名、字段名、字段类型与实际数据是否一致。
					Unresolved Logical Plan ➡️Analyzed Logical Plan
				利用启发式的规则和执行策略,Catalyst最终把逻辑计划转换为可执行的物理计划
			Tungsten:数据结构优化
				Tungsten使用定制化的数据结构Unsafe Row来存储数据,Unsafe Row的优点是存储效率高、GC效率高
					Tungsten是用二进制字节序列来存储每一条用户数据的,因此在存储效率上完胜Java Object。
				DataFrame携带的Schema允许Tungsten能够设计这样的数据结构
					Java Object在内存中的结构
						https://juejin.cn/post/6844903832833490957
			Spark SQL端到端的完整优化流程主要包括两个阶段:Catalyst优化器和Tungsten
				总的来说,基于DataFrame简单的标量算子和明确的Schema定义,借助Catalyst优化器和Tungsten,Spark SQL有能力在运行时构建起一套端到端的优化机制。这套机制运用启发式的规则与策略,以及运行时的执行信息,将原本次优、甚至是低效的查询计划转换为高效的执行计划,从而提升端到端的执行性能。
	21 Catalyst逻辑计划:你的SQL语句是怎么被优化的
		逻辑计划解析
			Catalyst把“Unresolved Logical Plan”转换为“Analyzed Logical Plan”
				确认表名、字段名、字段类型与实际数据是否一致
		逻辑计划优化
			Catalyst基于一些既定的启发式规则(Heuristics Based Rules),把“Analyzed Logical Plan”转换为“Optimized Logical Plan”。
			catalyst优化器可以划分到三个范畴
				减少需要扫描和处理的数据量,降低后续计算的负载
					谓词下推(Predicate Pushdown)
						“谓词”指代的是像用户表上“age < 30”这样的过滤条件,“下推”指代的是把这些谓词沿着执行计划向下,推到离数据源最近的地方,从而在源头就减少数据扫描量。
						对于Parquet、ORC这类存储格式,结合文件注脚(Footer)中的统计信息,下推的谓词能够大幅减少数据扫描量,降低磁盘I/O开销。
					列剪裁(Column Pruning)
						列剪裁就是扫描数据源的时候,只读取那些与查询相关的字段。
				常量替换 (Constant Folding)
			Catalys的优化过程
				自顶向下实行transformDown
					从“Analyzed Logical Plan”到“Optimized Logical Plan”的转换,就是从一个TreeNode生成另一个TreeNode的过程。
			Cache Manager优化
	22 Catalyst物理执行计划:你的SQL语句是怎么被优化的?
		逻辑优化的每一步仅仅是从逻辑上表明Spark SQL需要“做什么”,并没有从执行层面说明具体该“怎么做”。
			具体怎么做:物理执行计划
		为了让查询计划(Query Plan)变得可操作、可执行,Catalyst的物理优化阶段(Physical Planning)可以分为两个环节
			优化Spark Plan
				在优化Spark Plan的过程中,Catalyst基于既定的优化策略(Strategies),把逻辑计划中的关系操作符一一映射成物理操作符,生成Spark Plan。
			生成Physical Plan
		优化Spark Plan
			所有优化策略在转换方式上都大同小异,都是使用基于模式匹配的偏函数(Partial Functions),把逻辑计划中的操作符平行映射为Spark Plan中的物理算子。
			Catalyst都有哪些Join策略?
				5种join策略
					Broadcast Hash Join(BHJ)
					Shuffle Sort Merge Join(SMJ)
					Shuffle Hash Join(SHJ)
					Broadcast Nested Loop Join(BNLJ)
					Shuffle Cartesian Product Join(CPJ)
				两种数据分发方式
					broadcast
					shuffle
				三种join实现机制
					hash join
					sort merge join
					nested loop join
			JoinSelection 如何选择join策略
				条件型信息
					Join类型,也就是是否等值、连接形式等,这种信息的来源是查询语句本身
					内表尺寸,这些信息的来源就比较广泛了,可以是Hive表之上的ANALYZE TABLE语句,也可以是Spark对于Parquet、ORC、CSV等源文件的尺寸预估,甚至是来自AQE的动态统计信息
				指令型信息:join hints
		生成Physical Plan
			Preparation Rules有6组规则,这些规则作用到Spark Plan之上就会生成Physical Plan,而Physical Plan最终会由Tungsten转化为用于计算RDD的分布式任务。
			6种 preparation rules
				EnsureRequirements
					含义:确保每个操作符的输入要求,必要时添加shuffle、sort等操作
					作用:为physical plan补充必要的操作步骤,保证spark plan中的每个步骤能够顺利进行
					参考小Q计算过程中EnsureRequirements在project后面添加的Shuffle和sort计算
				collapseCodegenStages
					含义:tungsten的优化机制-全阶段代码生成,whole stage code generation
					作用:在同一个stage内部,尽可能将所有操作和计算步骤捏合成一个函数,提成计算效率
				ReuseExchange
					含义:内存或磁盘中的存储复用
					作用:同一个执行计划可以共享广播变量或者shuffle的中间结果,避免重复计算
				ReuseSubquery
					含义:自查询复用
					作用:复用同样的查询结果,避免重复计算
				Plansubquery
					含义:生成自查询
					作用:对子查询使用preparation rules
				ExtractPythonUDFs
					含义:提取Python的UDF函数
					作用:把Python UDF分发到单独的Python进程
	23 Tungsten:钨丝计划
		Tungsten又叫钨丝计划,它围绕内核引擎主要有两方面的优化
			数据结构设计
			全阶段代码生成WSCG:whole stage code generation
		Tungsten在数据结构方面的设计
			Unsafe Row:二进制数据结构
				使用JVM传统的对象方式来存储Schema具有的缺点
					首先,存储开销大。
					其次,在JVM堆内内存中,对象数越多垃圾回收效率越低。
				UnsafeRow
					字节数组的存储方式消除存储开销
					使用一个数组对象就能够实现对一条数据的封装,显著降低GC压力
			基于内存页的内存管理
				tungsten地址分为两个部分
					前64位预留给Java Object
					后64位是偏移地址Offset
				对于On Heap空间的Tungsten地址
					前64位存储的是JVM堆内对象的引用或者说指针,后64位Offset存储的是数据在该对象内的偏移地址
				Off Heap空间
					在堆外的空间中,由于Spark是通过Java Unsafe API直接管理操作系统内存,不存在内存对象的概念
					前64位存储的是null值,后64位则用于在堆外空间中直接寻址操作系统的内存空间
				对比Java标准库(java.util.HashMap)和Tungsten模式下的HashMap
					存储开销
						首先,Tungsten放弃了链表的实现方式,使用数组加内存页的方式来实现HashMap。数组中存储的元素是Hash code和Tungsten内存地址,也就是Object引用外加Offset的128位地址。Tungsten HashMap使用128位地址来寻址数据元素,相比java.util.HashMap大量的链表指针,在存储开销上更低。
					GC效率
						其次,Tungsten HashMap的存储单元是内存页,内存页本质上是Java Object,一个内存页可以存储多个数据条目。因此,相比标准库中的HashMap,使用内存页大幅缩减了存储所需的对象数量。比如说,我们需要存储一百万条数据记录,标准库的HashMap至少需要三百万的JVM对象才能存下,而Tungsten HashMap可能只需要几个或是十几个内存页就能存下。对比下来,它们所需的JVM对象数量可以说是天壤之别,显然,Tungsten的实现方式对于GC更加友好。
					CPU cache利用率
						再者,内存页本质上是JVM对象,其内部使用连续空间来存储数据,内存页加偏移量可以精准地定位到每一个数据元素。因此,在需要扫描HashMap全量数据的时候,得益于内存页中连续存储的方式,内存的访问方式从原来的随机访问变成了顺序读取(Sequential Access)。顺序内存访问会大幅提升CPU cache利用率,减少CPU中断,显著提升CPU利用率。
			如何理解WSCG?
				内存计算的第二层含义了,它指的是在同一个Stage内部,把多个RDD的compute函数捏合成一个,然后把这一个函数一次性地作用在输入数据上。
					WSCG指的是基于同一Stage内操作符之间的调用关系,生成一份“手写代码”,真正把所有计算融合为一个统一的函数。
				什么是火山迭代模型?
					语法树当中的每个操作符都需要完成如下步骤
						从内存中读取父操作符的输出结果作为输入数据
						调用hasNext、next方法,以操作符逻辑处理数据,如过滤、投影、聚合等等
						将处理后的结果以统一的标准形式输出到内存,供下游算子消费
					“手写代码”解决了VI计算模型的两个核心痛点:操作符之间频繁的虚函数调用,以及操作符之间数据交换引入的内存随机访问。手写代码中的每一条指令都是明确的,可以顺序加载到CPU寄存器,源数据也可以顺序地加载到CPU的各级缓存中,因此,CPU的缓存命中率和工作效率都会得到大幅提升。
				WSCG是如何在运行时动态生成代码的?
					本质上,WSCG机制的工作过程就是基于一份“性能较差的代码”,在运行时动态地(On The Fly)重构出一份“性能更好的代码”。
					手写代码的生成过程分为两个步骤
						从父节点到子节点,递归调用doProduce,生成代码框架
						从子节点到父节点,递归调用doConsume,向框架填充每一个操作符的运算逻辑
					以select count(user_id) from citizens where city = 'beijing'为例
						首先,在Stage顶端节点也就是Project之上,添加WholeStageCodeGen节点。WholeStageCodeGen节点通过调用doExecute来触发整个代码生成过程的计算。doExecute会递归调用子节点的doProduce函数,直到遇到Shuffle Boundary为止。这里,Shuffle Boundary指的是Shuffle边界,要么是数据源,要么是上一个Stage的输出。在叶子节点(也就是Scan)调用的doProduce函数会先把手写代码的框架生成出来,如图中右侧蓝色部分的代码。
							while (table.hasNext()) {internalRow row table.next();blabla}
						然后,Scan中的doProduce会反向递归调用每个父节点的doConsume函数。不同操作符在执行doConsume函数的过程中,会把关系表达式转化成Java代码,然后把这份代码像做“完形填空”一样,嵌入到刚刚的代码框架里。比如图中橘黄色的doConsume生成的if语句,其中包含了判断地区是否为北京的条件,以及紫色的doConsume生成了获取必需字段userId的Java代码。
							if row.getString(2) == 'beijing' {int user_id =row.getString(0);rowWriter.write(0,ser_id; };
							ret = rowWriter.getRow();
						就这样,Tungsten利用CollapseCodegenStages规则,经过两层递归调用把Catalyst输出的Spark Plan加工成了一份“手写代码”,并把这份手写代码会交付给DAGScheduler。
							拿到代码之后,DAGScheduler再去协调自己的两个小弟TaskScheduler和SchedulerBackend,完成分布式任务调度。
	24-26 Spark3.0的三个新特性:自适应查询执行(AQE)、动态分区剪裁(DPP)和扩展的 Join Hints 
		AQE的三个特性怎样才能用好?
			Spark为什么需要AQE?
				启发式的优化又叫RBO(Rule Based Optimization,基于规则的优化)
					谓词下推等经验主义的优化策略
				CBO(Cost Based Optimization,基于成本的优化)
					CBO的特点是“实事求是”,基于数据表的统计信息(如表大小、数据列分布)来选择优化策略。
					缺点:窄/慢/静
						窄指的是适用面太窄,CBO仅支持注册到Hive Metastore的数据表,但在大量的应用场景中,数据源往往是存储在分布式文件系统的各类文件,如Parquet、ORC、CSV等等。
						慢指的是统计信息的搜集效率比较低。对于注册到Hive Metastore的数据表,开发者需要调用ANALYZE TABLE COMPUTE STATISTICS语句收集统计信息,而各类信息的收集会消耗大量时间。
						静指的是静态优化,这一点与RBO一样。CBO结合各类统计信息制定执行计划,一旦执行计划交付运行,CBO的使命就算完成了。换句话说,如果在运行时数据分布发生动态变化,CBO先前制定的执行计划并不会跟着调整、适配。
				AQE自定义查询执行
			AQE到底是什么?
				AQE是Spark SQL的一种动态优化机制,在运行时,每当Shuffle Map阶段执行完毕,AQE都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化。
				AQE优化机制触发的时机是Shuffle Map阶段执行完毕。
					也就是说,AQE优化的频次与执行计划中Shuffle的次数一致。
				AQE的规则以及执行策略
					首先,AQE赖以优化的统计信息与CBO不同,这些统计信息并不是关于某张表或是哪个列,而是Shuffle Map阶段输出的中间文件。
					其次,结合Spark SQL端到端优化流程图我们可以看到,AQE从运行时获取统计信息,在条件允许的情况下,优化决策会分别作用到逻辑计划和物理计划。
					1个逻辑优化规则和3个物理优化策略
						join策略调整
						自动分区合并
						自动倾斜处理
			如何用好AQE
				AQE三大特性
					Join策略调整:如果某张表在过滤之后,尺寸小于广播变量阈值,这张表参与的数据关联就会从Shuffle Sort Merge Join降级(Demote)为执行效率更高的Broadcast Hash Join。
					自动分区合并:在Shuffle过后,Reduce Task数据分布参差不齐,AQE将自动合并过小的数据分区。
					自动倾斜处理:结合配置项,AQE自动拆分Reduce阶段过大的数据分区,降低单个Reduce Task的工作负载。
				Join策略调整
					DemoteBroadcastHashJoin
						DemoteBroadcastHashJoin规则的作用,是把Shuffle Joins降级为Broadcast Joins。需要注意的是,这个规则仅适用于Shuffle Sort Merge Join这种关联机制,其他机制如Shuffle Hash Join、Shuffle Nested Loop Join都不支持。
						shuffle map阶段完成后,会对map中间文件进行两条判断
							中间文件尺寸总和小于广播阈值spark.sql.autoBroadcastJoinThreshold
							空文件占比小于配置项spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin
					OptimizeLocalShuffleReader
						采取OptimizeLocalShuffleReader策略可以省去Shuffle常规步骤中的网络分发,Reduce Task可以就地读取本地节点(Local)的中间文件,完成与广播小表的关联操作。
						OptimizeLocalShuffleReader物理策略的生效与否由一个配置项决定。这个配置项是spark.sql.adaptive.localShuffleReader.enabled
				自动分区合并
					在Reduce阶段,当Reduce Task从全网把数据分片拉回,AQE按照分区编号的顺序,依次把小于目标尺寸的分区合并在一起。
					目标分区尺寸参数
						spark.sql.adaptive.advisoryPartitionSizeInBytes,由开发者指定分区合并后的推荐尺寸。
						spark.sql.adaptive.coalescePartitions.minPartitionNum,分区合并后,分区数不能低于该值。
							会写到物理执行计划中
				自动倾斜处理
					在Reduce阶段,当Reduce Task所需处理的分区尺寸大于一定阈值时,利用OptimizeSkewedJoin策略,AQE会把大分区拆成多个小分区。
					倾斜分区和拆分粒度的配置项
						spark.sql.adaptive.skewJoin.skewedPartitionFactor,判定倾斜的膨胀系数
						spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes,判定倾斜的最低阈值
						spark.sql.adaptive.advisoryPartitionSizeInBytes,以字节为单位,定义拆分粒度
					缺点
						只能够改善单个executor中的数据倾斜
							AQE的自动倾斜处理机制只能以Task为粒度来平衡工作负载
						总的来说,当应用场景中的数据倾斜比较简单,比如虽然有倾斜但数据分布相对均匀,或是关联计算中只有一边倾斜,我们完全可以依赖AQE的自动倾斜处理机制。但是,当我们的场景中数据倾斜变得复杂,比如数据中不同Key的分布悬殊,或是参与关联的两表都存在大量的倾斜,我们就需要衡量AQE的自动化机制与手工处理倾斜之间的利害得失。
		DPP该怎么用
			分区剪裁
				如果过滤谓词中包含分区键,那么Spark SQL对分区表做扫描的时候,是完全可以跳过(剪掉)不满足谓词条件的分区目录,这就是分区剪裁
				DPP指的是在数据关联的场景中,Spark SQL利用维度表提供的过滤信息,减少事实表中数据的扫描量、降低I/O开销,从而提升执行性能。
			动态分区剪裁
				使用DPP加速事实表的读取和访问的三个条件
					事实表必须是分区表,而且分区字段(可以是多个)必须包含Join Key。
					DPP仅支持等值Joins,不支持大于、小于这种不等值关联关系。
					维度表过滤之后的数据集要小于广播阈值,开发者要注意调整配置项spark.sql.autoBroadcastJoinThreshold
				使用广播变量封装过滤之后的维度表数据
					在维度表做完过滤之后,Spark SQL在其上构建哈希表(Hash Table),这个哈希表的Key就是用于关联的Join Key
					实现的两个作用
						第一个作用就是给事实表用来做分区剪裁,如图中的步骤1所示,哈希表中的Key Set刚好可以用来给事实表过滤符合条件的数据分区。
						第二个作用就是参与后续的Broadcast Join数据关联,如图中的步骤2所示。这里的哈希表,本质上就是Hash Join中的Build Table,其中的Key、Value,记录着数据关联中所需的所有字段,如users.id、users.name,刚好拿来和事实表做Broadcast Hash Join。
		Join Hints 指南:不同场景下,如何选择join策略?
			Join的实现方式详解
				NLJ的工作原理:嵌套循环
					对于外表中的每一条数据记录,内层的for循环会逐条扫描内表的所有记录,依次判断记录的Join Key是否满足关联条件
					NLJ算法的计算复杂度是O(M * N)
				SMJ的工作原理:先排序、再归并
					SMJ刚开始工作的时候,内外表的游标都会先锚定在两张表的第一条记录上,然后再对比游标所在记录的Join Key。
						外表Join Key等于内表Join Key,满足关联条件,把两边的数据记录拼接并输出,然后把外表的游标滑动到下一条记录
						外表Join Key小于内表Join Key,不满足关联条件,把外表的游标滑动到下一条记录
						外表Join Key大于内表Join Key,不满足关联条件,把内表的游标滑动到下一条记录
					SMJ算法的计算复杂度为O(M + N)
				HJ的工作原理:把内表扫描的计算复杂度降低至O(1)
					Build阶段和Probe阶段
						在Build阶段,基于内表,算法使用既定的哈希函数构建哈希表。哈希表中的Key是Join Key应用(Apply)哈希函数之后的哈希值,表中的Value同时包含了原始的Join Key和Payload。
						在Probe阶段,算法遍历每一条数据记录,先是使用同样的哈希函数,以动态的方式(On The Fly)计算Join Key的哈希值。然后,用计算得到的哈希值去查询刚刚在Build阶段创建好的哈希表。如果查询失败,说明该条记录与维度表中的数据不存在关联关系;如果查询成功,则继续对比两边的Join Key。如果Join Key一致,就把两边的记录进行拼接并输出,从而完成数据关联。
			分布式环境下的Join
				分布式环境中的数据关联在计算环节依然遵循着NLJ、SMJ和HJ这3种实现方式,只不过是增加了网络分发这一变数。
				在Spark的分布式计算环境中,数据在网络中的分发主要有两种方式,分别是Shuffle和广播
					如果采用Shuffle的分发方式来完成数据关联,那么外表和内表都需要按照Join Key在集群中做全量的数据分发。因为只有这样,两个数据表中Join Key相同的数据记录才能分配到同一个Executor进程,从而完成关联计算,如下图所示。
					如果采用广播机制的话,情况会大有不同。在这种情况下,Spark只需要把内表(基表)封装到广播变量,然后在全网进行分发。由于广播变量中包含了内表的全量数据,因此体量较大的外表只要“待在原地、保持不动”,就能轻松地完成关联计算。
			Spark如何选择Join策略?
				网络分发和计算开销
				等值Join下,Spark如何选择Join策略?
					在等值数据关联中,Spark会尝试按照BHJ > SMJ > SHJ的顺序依次选择Join策略
						BHJ:一是连接类型不能是全连接(Full Outer Join);二是基表要足够小,可以放到广播变量里面去。
						SHJ:首先,外表大小至少是内表的3倍。其次,内表数据分片的平均大小要小于广播变量阈值。
						SMJ没有这么多的附加条件,无论是单表排序,还是两表做归并关联,都可以借助磁盘来完成
						spark.sql.join.preferSortMergeJoin
				不等值Join下,Spark如何选择Join策略?
					由于不等值Join只能使用NLJ来实现,因此Spark SQL可选的Join策略只剩下BNLJ和CPJ
	27 大表Join小表:广播变量容不下小表怎么办?
		案例1:Join Key远大于Payload
			1. 基于现有的Join Keys去生成一个全新的数据列,它可以叫“Hash Key”。生成的方法分两步:
				把所有Join Keys拼接在一起,把性别、年龄、一直到小时拼接成一个字符串,如图中步骤1、3所示
				使用哈希算法(如MD5或SHA256)对拼接后的字符串做哈希运算,得到哈希值即为“Hash Key”,如上图步骤2、4所示
			2. 消除哈希冲突隐患的方法其实很多,比如“二次哈希”,也就是我们用两种哈希算法来生成Hash Key数据列
			Join Keys远大于Payload的数据关联,我们可以使用映射方法(如哈希运算),用较短的字符串来替换超长的Join Keys,从而大幅缩减小表的存储空间。如果缩减后的小表,足以放进广播变量,我们就可以将SMJ转换为BHJ,=来消除繁重的Shuffle计算。需要注意的是,映射方法要能够有效地避免“映射冲突”的问题,避免出现不同的Join Keys被映射成同一个数值。
		案例2:过滤条件的Selectivity较高
			对于两张表都远超广播阈值的关联场景来说,如果我们不做任何调优的,Spark就会选择SMJ策略计算。
			对于案例中的这种星型关联,我们还可以利用DPP机制来减少事实表的扫描量,进一步减少I/O开销、提升性能。
			如果小表携带过滤条件,且过滤条件的选择性很高,我们可以通过开启AQE的Join策略调整特性,在运行时把SMJ转换为BHJ,从而大幅提升执行性能。
		案例3:小表数据分布均匀
			当参与Join的两张表尺寸相差悬殊且小表数据分布均匀的时候,SHJ往往比SMJ的执行效率更高
			使用Join Hints来强制Spark SQL去选择SHJ策略进行关联计算
			如果小表不带过滤条件,且尺寸远超广播阈值。如果小表本身的数据分布比较均匀,我们可以考虑使用Join hints强行要求Spark SQL在运行时选择SHJ关联策略。一般来说,在“大表Join小表”的场景中,相比SMJ,SHJ的执行效率会更好一些。背后的原因在于,小表构建哈希表的开销,要小于两张表排序的开销。
	28-29 大表join大表
		“分而治之”的调优思路
			如何理解“分而治之”
				“分而治之”中一个关键的环节就是内表拆分,我们要求每一个子表的尺寸相对均匀,且都小到可以放进广播变量
				拆分的关键在于拆分列的选取
			如何保证内表拆分的粒度足够细?
				为了兼顾执行性能与开发效率,拆分列的基数要足够大,这样才能让子表小到足以放进广播变量,但同时,拆分列的基数也不宜过大,否则实现“分而治之”的开发成本就会陡然上升。通常来说,日期列往往是个不错的选择。
			如何避免外表的重复扫描?
				第一种是将外表全量缓存到内存,不过这种方法对于内存空间的要求较高,不具备普适性。
				第二种假设外表的分区键包含Join Keys,那么,每一个内表子表都可以通过DPP机制,帮助与之关联的外表减少数据扫描量。
			“分而治之”调优思路实战
		“负隅顽抗”的调优思路
			数据分布均匀
				SHJ hint:/*+ shuffle_hash(orders) */
				使用SHJ的条件
					两张表数据分布均匀。
					内表所有数据分片,能够完全放入内存。
						先根据并发度与执行内存,计算出可供每个Task消耗的内存上下限,然后结合分布式数据集尺寸与上下限,倒推出与之匹配的并行度。
			数据倾斜
				数据倾斜的三种情况
					内表倾斜
					外表倾斜
					双表倾斜
				以task为粒度解决数据倾斜
					AQE会检测数据倾斜,将倾斜分区拆分为多个数据分区。同时,AQE会复制内表的分区数据,保护两表之间的数据关联
					配置项参数
						spark.sql.adaptive.skewJoin.skewedPartitionFactor,判定倾斜的膨胀系数
						spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes,判定倾斜的最低阈值
						spark.sql.adaptive.advisoryPartitionSizeInBytes,以字节为单位定义拆分粒度
				以executor为粒度解决数据倾斜
					分而治之:对于内外表中两组不同的数据,我们分别采用不同的方法做关联计算,然后通过Union操作,再把两个关联计算的结果集做合并,最终得到“大表Join大表”的计算结果
					“两阶段Shuffle”
						在不破坏原有关联关系的前提下,在集群范围内以Executors为粒度平衡计算负载 
						“加盐、Shuffle、关联、聚合”
							外表的处理称作“随机加盐”,具体的操作方法是,对于任意一个倾斜的Join Key,我们都给它加上1到#N之间的一个随机后缀。一般将N设置为executor的总数比较合适。
							内表的处理称为“复制加盐”,具体的操作方法是,对于任意一个倾斜的Join Key,我们都把原数据复制(#N – 1)份,从而得到#N份数据副本。对于每一份副本,我们为其Join Key追加1到#N之间的固定后缀,让它与打散后的外表数据保持一致。
							内外表分别加盐之后,数据倾斜问题就被消除了。这个时候,我们就可以使用常规优化方法,比如,将Shuffle Sort Merge Join转化为Shuffle Hash Join,去继续执行Shuffle、关联和聚合操作。到此为止,“两阶段Shuffle” 的第一阶段执行完毕,我们得到了初步的聚合结果,这些结果是以打散的Join Keys为粒度进行计算得到的。
						“去盐化、Shuffle、聚合”
							首先,我们把每一个Join Key的后缀去掉,这一步叫做“去盐化”。
							然后,我们按照原来的Join Key再做一遍Shuffle和聚合计算,这一步计算得到的结果,就是“分而治之”当中倾斜部分的计算结果。
				以Executors为粒度的调优实战
				执行性能与开发成本的博弈
					我们要明确的是,分而治之外加两阶段Shuffle的调优技巧的初衷,是为了解决AQE无法以Executors为粒度平衡计算负载的问题。因此,这项技巧取舍的关键就在于,Executors之间的负载倾斜是否构成整个关联计算的性能瓶颈。
	30 应用开发:北京市小客车(汽油车)摇号趋势分析
	31 性能调优:手把手带你提升应用的执行性能

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2155266.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python 中的 Socket 编程入门

Python 中的 Socket 编程入门 Socket 编程是网络编程的重要组成部分&#xff0c;允许计算机通过网络进行通信。在 Python 中&#xff0c;使用内置的 socket 模块&#xff0c;开发者可以轻松地实现客户端和服务器之间的交互。本文将详细介绍 Python 中的 Socket 编程&#xff0…

微服务Docker相关指令

1、拉取容器到镜像仓库 docker pull xxx //拉取指令到 镜像仓库 例如 docker pull mysql 、docker pull nginx docker images //查看镜像仓库 2、删除资源 2.1、删除镜像仓库中的资源 docker rmi mysql:latest //删除方式一&#xff1a;格式 docker rmi 要…

19.初始C语言指针

初始C语言指针 1.指针的认识2.指针变量的引入3.指针变量的类型4.指针的应用场景15.指针的应用场景26.作业 1.指针的认识 指针 地址 //int a 10; //类型 变量名 内存地址 值 1.变量名直接访问2.通过地址访问&&#xff1a;取地址运算符* &#xff1a;将地址内的值读取…

数据库数据恢复—Oracle报错“需要更多的恢复来保持一致性”的数据恢复案例

Oracle数据库故障&检测&#xff1a; 打开oracle数据库报错“system01.dbf需要更多的恢复来保持一致性&#xff0c;数据库无法打开”。 数据库没有备份&#xff0c;无法通过备份去恢复数据库。用户方联系北亚企安数据恢复中心并提供Oracle_Home目录中的所有文件&#xff0c;…

GitHub 上高星 AI 开源项目推荐

FIFO-Diffusion 介绍&#xff1a;FIFO-Diffusion 是一个创新的开源项目&#xff0c;它能够基于文本描述生成无限长度的高品质视频&#xff0c;而无需任何预先的模型训练。这一技术的核心在于其高效的内存管理策略和先进的扩散模型&#xff0c;使得即使是小型GPU配置也能轻松应…

1018. 可被 5 整除的二进制前缀

目录 一&#xff1a;题目&#xff1a; 二&#xff1a;代码&#xff1a; 三&#xff1a;结果&#xff1a; 一&#xff1a;题目&#xff1a; 给定一个二进制数组 nums ( 索引从0开始 )。 我们将xi 定义为其二进制表示形式为子数组 nums[0..i] (从最高有效位到最低有效位)。 …

数据结构之栈(python)

栈&#xff08;顺序栈与链栈&#xff09; 1.栈存储结构1.1栈的基本介绍1.2进栈和出栈1.3栈的具体实现1.4栈的应用例一例二例三 2.顺序栈及基本操作&#xff08;包含入栈和出栈&#xff09;2.1顺序栈的基础介绍2.2顺序栈元素入栈2.3顺序栈元素出栈2.4顺序栈的表示和实现 3.链栈及…

IDEA去除掉虚线,波浪线,和下划线实线的方法

初次安装使用IDEA&#xff0c;总是能看到导入代码后&#xff0c;出现很多的波浪线&#xff0c;下划线和虚线&#xff0c;这是IDEA给我们的一些提示和警告&#xff0c;但是有时候我们并不需要&#xff0c;反而会让人看着很不爽&#xff0c;这里简单记录一下自己的调整方法&#…

Linux:权限管理

基本权限和归属 权限和归属 基本权限与归属 • 访问权限 – 读取&#xff1a;允许查看内容-read – 写入&#xff1a;允许修改内容-write – 可执行&#xff1a;允许运行和切换-excute对于文本文件&#xff1a;r读取权限&#xff1a;cat、less、grep、head、tailw写入权限&am…

linux下共享内存的3种使用方式

进程是资源封装的单位&#xff0c;内存就是进程所封装的资源的一种。一般情况下&#xff0c;进程间的内存是相互隔离的&#xff0c;也就是说一个进程不能访问另一个进程的内存。如果一个进程想要访问另一个进程的内存&#xff0c;那么必须要进过内核这个桥梁&#xff0c;这就是…

中国雕塑—孙溟㠭凿刻印《自然贼》

中国雕塑孙溟㠭凿刻作品《自然贼》 孙溟㠭凿刻印《自然贼》 遵循自然之法谓之道&#xff0c;脱离自然之道谓之贼&#xff0c;道法自然。丙申秋月溟展刊。 孙溟㠭凿刻印《自然贼》 这方《自然贼》&#xff0c;红木章料&#xff0c;半尺见方&#xff0c;自然古朴&#xff0c;浑…

摆脱困境并在iPhone手机上取回删除照片的所有解决方案

您是否无意中从 iPhone 中删除了照片&#xff1f;您&#xff0c;无需惊慌&#xff0c;因为您可以使用以下方法恢复所有照片。 如果您长时间使用 iPhone&#xff0c;您应该知道 iOS 提供了许多 Android 不提供的备份功能。例如&#xff0c;您的所有照片都会自动备份到 iCloud 存…

【机器学习(七)】分类和回归任务-K-近邻 (KNN)算法-Sentosa_DSML社区版

文章目录 一、算法概念二、算法原理&#xff08;一&#xff09;K值选择&#xff08;二&#xff09;距离度量1、欧式距离2、曼哈顿距离3、闵可夫斯基距离 &#xff08;三&#xff09;决策规则1、分类决策规则2、回归决策规则 三、算法优缺点优点缺点 四、KNN分类任务实现对比&am…

音视频生态下Unity3D和虚幻引擎(Unreal Engine)的区别

技术背景 好多开发者跟我们做技术交流的时候&#xff0c;会问我们&#xff0c;为什么有Unity3D的RTMP|RTSP播放模块&#xff0c;还有RTMP推送和轻量级RTSP服务模块&#xff0c;为什么不去支持虚幻引擎&#xff1f;二者区别在哪里&#xff1f;本文就Unity3D和虚幻引擎之间的差异…

idea上传jar包到nexus

注意&#xff1a;确保idea中项目为maven项目&#xff0c;并且在nexus中已经创建了maven私服。 1、配置pom.xml中推送代码配置 <distributionManagement> <repository> <id>releases</id> <url>http://127.0.0.1:8001/repository/myRelease/<…

鼻咽癌中三级淋巴结构的单细胞与空间转录组分析|文献精析·24-09-22

小罗碎碎念 研究团队通过单细胞和空间转录组分析&#xff0c;揭示了与鼻咽癌进展和免疫治疗反应相关的三级淋巴结构。 作者角色作者姓名单位&#xff08;中文&#xff09;第一作者Yang Liu/通讯作者Jin-Xin Bei国家癌症中心南方肿瘤学重点实验室&#xff0c;鼻咽癌诊断治疗广东…

机器学习04-逻辑回归(python)-02原理与损失函数

​​​​​​​ 1. 逻辑回归概念 逻辑回归&#xff08;Logistic Regression&#xff09; 是一种 分类模型&#xff0c;主要用于解决 二分类问题&#xff08;即分成两类&#xff0c;如是否通过、是否患病等&#xff09;。逻辑回归的目标是根据输入的特征预测一个 概率&#xff0…

C++——关联式容器(4):set和map

在接触了诸如二叉搜索树、AVL树、红黑树的树形结构之后&#xff0c;我们对树的结构有了大致的了解&#xff0c;现在引入真正的关联式容器。 首先&#xff0c;先明确了关联式容器的概念。我们之前所接触到的如vector、list等容器&#xff0c;我们知道他们实际上都是线性的数据结…

C++门迷宫

目录 开头程序程序的流程图程序游玩的效果下一篇博客要说的东西 开头 大家好&#xff0c;我叫这是我58。 程序 #include <iostream> using namespace std; void printmaze(const char strmaze[11][11]) {int i 0;int ia 0;for (; i < 11; i) {for (ia 0; ia <…

部署林风社交论坛/社交论坛linfeng-community遇到问题集合

部署开源版本遇到的问题 1.管理端前端部署 npm install报错 “ERR! gyp verb ensuring that file exists: C:\Python27\python.exe” “ERR! gyp ERR! node -v v20.10.0” “ ERR! gyp ERR! node-gyp -v v3.8.0” 原因:node版本和node-gyp版本不匹配 解决方法: 1&…