Spark Shuffle的演进及SortShuffle的基本思想
1. 演进
HashShuffle -> HashShuffle 的Consolidate机制 -> SortBasedShuffle -> Tungsten-sort Based Shuffle
2. 各种 Shuffle的介绍
2.1 HashShuffle
Spark 之所以一开始就提供基于 Hash 的 Shuffle 实现机制,有一个主要目的就是为了避免不需要的排序,在MR里的Shuffle将sort作为固定步骤,导致许多并不需要排序的操作造成不必要的开销。
Hash 的 Shuffle的基本思路就是每个MapTask都为每个ReduceTask生成一个文件,然后ReduceTask读取对应的文件
优点:避免不需要的排序
缺点:
- 资源消耗过高,每个分区都需要一个buffer,当分区个数太大,如10,000时,每个map task需要约320MB的内存,造成内存消耗过大
- 打开的文件太多,对文件系统压力大,生成的文件个数是M * R(M代表MapTask数量,R代表ReduceTask数量),如果Map端游1000个task,reduce个task,那么打开文件的数量就是1000 * 1000
2.2 带有Consolidate的HashShuffle
consolidate 机制允许一个core上不同的 MapTask 复用同一个磁盘文件,这样就可以有效 MapTask 的磁盘文件进行合并,从而大幅度减少磁盘文件的数量,进而提升 shuffle write 的性能。
**优点:**避免不需要的排序;相较于HashShuffle,在一定程度上减少了生成文件的个数
**缺点:**文件个数依然很多,生成的文件个数是 E * cores * R(E是executor个数,cores是每个executor上的核心数,R是ReduceTask个数)
2.3 SortShuffle
Sorted-Based Shuffle不会为每个Reducer中的Task生产一个单独的文件,相反,Sorted-Based Shuffle会把Mapper中每个ShuffleMapTask所有的输出数据Data只写到一个文件中,因为每个ShuffleMapTask中的数据会被分类,所以Sort-based Shuffle使用了index文件,存储具体ShuffleMapTask输出数据在同一个Data文件中是如何分类的信息。
这样,生成的文件个数就是2*M。(M是MapTask个数)
**优点:**相较于HashShuffle,大幅度减少了生成文件的个数,以及解决了buffer消耗内存过大的问题。
缺点:
- 如果MapTask过多,文件依然会很多,但是相较于HashShuffle产生的文件个数是M * R 或 E * cores * R,已经好了很多;
- 强制按照partitionId排序造成开销。(是否按照记录本身排序,取决于数据操作类型,在下面会讲)
2.4 Tungsten-sort Based Shuffle
基本思路和SortShuffle一样,都是会生成一份数据文件和一份索引文件,但是Tungsten-sort Based Shuffle会把数据做序列化,减少内存开销,下面会详细讲。
SortShuffle 框架的设计
在spark中,有多种数据操作类型类型,各种操作类型需要不一样,Spark需要根据不同数据操作的特点,灵活构建合适的 Shuffle 机制,如下图列举了Shuffle机制中Spark典型的数据操作的计算需求
可以看到,目前的数据操作里面,在Shuffle map端没有进行排序的,只有combine。下面会讲各种操作采取的shuffle方式。
但是,spark未来可能会有些数据操作或者用户自定义操作同时需要这两个功能,所以Shuffle框架的实现上还是需要支持全部的功能。
Shuffle Write
在Shuffle Write阶段,数据操作需要分区、聚合和排序3个功能,虽然现有的数据操作只需要其中的一个功能,但是为了支持所有的情况,设计了一个通用的Shuffle Write框架,计算顺序为 “map输出 -> 数据聚合 -> 排序”。注意看这里的顺序是先聚合,再排序,与MapReduce的先排序后聚合不同。
在实现的过程中,Spark对不同的情况进行了分类,形成了不同的Shuffle Write方式。下面介绍Spark如果针对不同的情况构建最合适的Shuffle Write方式。
1.不需要 map() 端 combine,且分区个数不大
这种方式最简单,只需要实现分区功能,如果6.5所示,map() 依次输出 <K, V> record,并计算其partitionId,Spark根据partitionId,将record依次输出到不同的buffer中,每当buffer填满(由spark.Shuffle.file.buffer控制,默认为32KB),就将record溢写到磁盘上的分区文件中。分配buffer的原因是map()输出record的速度很快,需要进行缓冲来减少磁盘I/O。在实现代码中,Spark将这种Shuffle Write方式称为BypassMergeSortShuffleWriter,即不需要排序的Shuffle Write方式。
**优点:**速度快,直接将record输出到不同的分区文件中。
缺点:
- 资源消耗过高,每个分区都需要一个buffer,当分区个数太大,如10,000时,每个map task需要约320MB的内存,造成内存消耗过大
- 打开的文件太多,对文件系统压力大,如果Map端游1000个task,reduce个task,那么打开文件的数量就是1000 * 1000
**适用的操作类型:**map端不需要combine、Key不需要排序且分区个数较少(<=spark.shuffle.sort.bypassMergeThreshold,默认为200)。例如groupByKey(100),partitionBy(100),sortByKey(100)等
2. 不需要map()端combine,但需要排序
首先需要强调的是,spark目前没有这种数据操作(如表6.5所示),但不排除用户自定义或者未来会支持。
在这种情况下,需要按照partitionId + Key进行排序。如图6.6所示,Spark采用的实现方法是建立一个Array(图6.6中的PartitionedPairBuffer)来存放map()输出的record,并对Array中的Key进行精心设计,将<K, V> record转化为 <(PID, K), V> record存储;然后按照partitionId + Key对record进行排序,最后将record写入一个文件中,通过建立索引来标识每个分区
如果Array存放不下,即curSize = capacity,则会把Array扩容到原来的2倍生成一个新数组,然后把原来数组的元素copy到新数组。
注意,这里扩容的时候并没有判断当前内存是否支持扩容,所以我觉得可能会报OOM;而且如果curSize = MAXIMUM_CAPACITY,则会直接抛异常,Can’t insert more than ${MAXIMUM_CAPACITY} elements
书里面讲的是在扩容的时候判断是否可以放下,如果放不下,就溢写。
在insert完以后,会判断是否需要溢写,因为在Array里面存的都只是堆里面对象的引用,所以不能直接通过Array的长度来判断,而是要计算所以对象实例的总大小,Spark通过一个估算算法来计算当前使用到的内存,在介绍AppendOnlyMap的时候会介绍这个估算算法。
这种Shuffle模式被称为SortShuffleWriter
**优点:**输出的数据按照partitionId进行排序,因此只需要2个分区文件存储,即可标识不同的分区数据,克服了BypassMergeSortShuffleWriter中建立文件数过多的问题,适用于分区个数很大的情况。
**缺点:**排序增加计算时延。
**适用的操作类型:**map()端不需要combine、Key需要排序,分区个数无限制。目前,Spark本身没有提供这种排序类型的数据操作,但不排除用户会自定义,或者系统未来提供这种类型的操作。SortByKey() 操作虽然需要按Key进行排序,但这个排序过程是在Shuffle Read端完成的,不需要在Shuffle Write端进行排序。
上面提到的BypasssMergeSortWriter模式的缺点是,分区一旦过多,就会出现buffer过大的问题,这种问题可以用SortShuffleWriter来解决,该Shuffle只需要一个Array就可以在输出到文件的时候标识不同的分区,可以解决Bypass机制buffer分配过多的问题,但是唯一的缺点是需要按照PartitionId + Key进行排序,而Bypass机制不需要排序,所以我们只需要将按PartitionId + Key进行排序改成只按PartitionId进行排序,就可以支持 ”不需要map()端combine、不需要聚合,分区个数过大 “ 的操作。例如,groupByKey(16,777,216)、partitionBy(16,777,216)、sortByKey(16,777,216)。在分区个数小于16,777,216时,用到的是Serialized Shuffle
问题:如果不采用UnsafeShuffleWriter,SortByKey操作按Key排序了吗,不会按Key排序,spark直接将ordering设置成null
只有既需要在map端需要combine,又定义了keyOrdering的,才会在map端按照key排序
如果需要combine,但是没定义keyOrdering,就按照hash(key)排序
如果不需要combine,也按照hash(key)排序,sortByKey就是按照这个做的
3. 需要map()端combine,需要或者不需要排序
首先需要强调的是,spark目前没有需要在map端排序的数据操作(如表6.5所示),但不排除用户自定义或者未来会支持。
在这种情况下,需要实现按Key进行聚合的功能,如图6.7的上图所示,Spark采用的实现方法是建立一个类似于HashMap的数据结构对map()端输出的record进行聚合。Map的Key是 “partitionId + Key”,Value是经过聚合的结果。例如,combine如果是sum函数,那么Value的结果就是累加的结果。聚合完成后,Spark对Map中的record进行排序,如果不需要按Key排序,如图6.7的上图所示,那么按照partitionId + hash(Key) 排序(书里写的是只按照partitionId排序,但是代码里是按照partitionId + hash(Key) 排序的,因为有序之后,便于和溢写文件合并),如果需要按照排序Key进行排序,如图6.7的下图所示,那么按照partitionId + Key进行排序。最后,将排序后的record写入一个分区文件中。
在input的过程中,如果当前map中的元素个数 > growThreshold(0.7 * capacity),那就将map扩展为原来的2倍,注意,这里在扩容的时候也没有判断当前内存剩余大小是否可以支持扩容,所以是可能导致OOM的。
**适用的操作类型:**需要map()端combine,需要或者不需要按Key进行排序,分区个数无限制,如reduceByKey()、aggregateByKey()。
4. 不需要map()端combine,且分区个数较大
BypassMergeSortShuffleWriter的缺点是在分区个数太多时buffer内存消耗过大,那么有没有办法降低内存消耗呢?有,可以采用SortShuffleWriter,将其排序器设置为None,将map()输出的<K,V> record不断放进数组,然后将数组里的record按照partitionId + hash(Key) 排序,输出即可。这样可行,但是普通的record数据是Java对象,占用空间较大,具体请参考https://wiki.zuoyebang.cc/pages/viewpage.action?pageId=410431091,需要更大的内存,这样容易造成内存不足。另外,大量的record对象存放到内存中也会造成频繁GC。为了提升内存利用率,Spark设计了Serialized Shuffle方(SerializedShuffleWriter),将record对象序列化后在存放到可分页存储的数组中,序列化可以减少存储开销,分页可以利用不连续的空间。
更具体地,Serialized Shuffle的优点包括:
- 序列化后的record占用的内存空间小;
- 不需要连续的内存空间,如图9.8所示,Serialized Shuffle将存储的record的数组进行分页,分页可以利用内存碎片,不需要连续的内存空间,而普通数组需要连续的内存空间;
- 排序效率高。对序列化后的record按partitionId进行排序时,排序的不是record本身,而是record序列化后字节数组的指针(元数据);
- 由于排序是在二进制数据上进行排序的,而不是Java对象,所以可以减少内存开销和GC的次数。同时,这个操作需要序列化的对象可以被排序,而不是需要进行反序列化后才能排序;
- 当溢写压缩解编码器支持压缩数据连接以及序列化器不做加密时,溢写合并只是链接序列化和压缩的文件来产出最终的分区,而不用进行反序列化和解压。这样就可以使用高效的数据复制方法,如NIO的transferTo;
- 可以使用堆外内存来存储序列化以后的对象,从而让这块数据完全避免GC。
使用Serialized Shuffle需要满足4个条件:
- 不需要map()端聚合;
- 使用的序列化器可以支持序列化Value的位置互换功能,目前KryoSerializer和Spark SQL的序列化器可以支持;
- 分区个数小于16,777,216,在一次Shuffle中产生超过1600万个输出分区的可能行极小。
**实现方式:**Serlizlized Shuffle采用了分页技术,像操作系统一样将内存空间划分为Page,每个Page大小在1MB~64MB,既可以在堆内内存上分配,也可以在堆外内存上分配(设置spark.memory.offHeap.enabled=true)。
如图9.8所示,对于map()输出的每个<K, V> record,Spark将其序列化后写入某个Page中,再将该record的索引,包括partitionId、所在的PageNum,以及该Page中的Offset放到PointerArray中,然后通过排序partitionId来对record进行排序。PointerArray就是一个long数组,在每个long里面,[24 bit partition number] [13 bit memory page number] [27 bit offset in page]
在做溢写的时候,会记录每个partition在溢写文件中的位置以及长度,然后在合并的时候可以根据这个信息读取相关的数据来做合并,如果溢写文件的压缩格式支持快速合(下面写的4个)并且序列化器不做加密时(Kyro不做加密),溢写合并只是链接序列化和压缩的文件来产出最终的分区,而不用进行反序列化和解压。这样就可以使用高效的数据复制方法,如NIO的transferTo。
(codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec]
|| codec.isInstanceOf[LZ4CompressionCodec] || codec.isInstanceOf[ZStdCompressionCodec])
总结:
Shuffle Read
Shuffle Read阶段需要实现3个功能:跨节点数据获取、聚合和排序。如表6.1所示,每个数据操作都需要其中的部分功能,Spark为了支持所有的情况,设计了一个通用的Shuffle Read框架,框架的计算顺序为“数据获取 -> 聚合 -> 排序” 输出。如图6.8所示。
在具体实现上,Shuffle Read比Write简单,选择用什么策略时,不需要考虑分区个数。
1. 不需要聚合,不需要按Key进行排序
这种情况最简单,只需要实现数据获取功能即可。如图6.9所示,等待所有的map task结束后,reduce task开始不断从各个map task获取<K, V> record,并将record输出到一个buffer中(大小为spark.reducer.maxSizeInFight=48MB),下一个操作直接从buffer中获取数据即可。
适用操作:既不需要聚合也不需要排序,如partitionBy()
2. 不需要聚合,需要按Key进行排序
在这种情况下,需要实现数据获取和按Key排序的功能。如图6.10所示,获取数据后,将buffer中的record依次输出到一个Array结构(PartitionedPairBuffer)中。然后,对Array中的record按照Key进行排序,并将排序结果输出或者传递给下一步操作。
insert流程与Shuffle map端一样,都是先插入,如果放不下就扩容,插入完以后再判断是否需要溢出。
适用的操作:在reduce端不需要聚合,但是需要按照Key进行排序的操作,如sortByKey()、sortBy() 等
3. 需要聚合,需要或者不需要按Key进行排序
在这种情况下,需要实现按照Key进行聚合,根据需要按Key进行排序的功能。如图6.11的上图所示,获取record后,Spark建立一个类似HashMap的数据结构(ExternalAppendOnlyMap)对buffer中的record进行聚合,Map中的Key是record中的Key,Value是经过聚合函数计算后的结果。如图6.11中,聚合函数是sum()函数,那么Value中存放的是多个record对应Value相加后的结果。之后,如果需要按照Key进行排序,就在Map中进行排序(ExternalAppendOnlyMap的底层是用数组实现的,所以可以在Map上直接排序)。
适用操作:适合reduce端需要聚合、需要或者不需要按Key进行排序的操作,如reduceByKey()、aggregateByKey()等
支持高效聚合和排序的数据结构
为了提高聚合和排序性能,Spark为Shuffle Write/Read的聚合和排序过程设计了3种数据结构,如表6.2所示。这几种数据结构的基本思想是在内存中对record进行聚合和排序,如果存放不下,则进行扩容,如果还存放不下,就将数据排序后spill到磁盘上,最后将磁盘和内存中的数据进行聚合、排序,得到最终结果。
Spark中的PartitionedAppendOnlyMap和ExternalAppendOnlyMap都基于AppendOnlyMap实现,所以我们先介绍AppendOnlyMap的原理。
1. AppendOnlyMap的原理
AppendOnlyMap实际上是一个只支持添加和更新的HashMap。与Java HashMap采用“数组 + 链表”实现不同,AppendOnlyMap只是用数组来存储元素,根据元素的Hash值确定存储位置,如果存储元素时发生Hash值冲突,使用二次探测法来解决Hash值冲突。
对于每个新来的<K, V> record,先使用 Hash(K) 计算器存放位置,如果存放位置为空,就把record存放到该位置。如果该位置已经被占用,就重新计算位置,一直找到一个为空的位置。
扩容:AppendOnlyMap使用数组来实现的问题是,如果插入的record太多,则很快会被填满。Spark的解决方案是,如果AppendOnlyMap的利用率达到70%,那么就扩张一倍,扩张意味着原来的Hash()失效,因此对所有Key进行rehash,重新排列每个Key的位置。
排序:由于AppendOnlyMap采用了数组作为底层存储结构,可以支持快速排序等排序算法(目前使用的是TimSort)。实现方法,如图6.13所示,先将数组中所有的<K, V> record转移到数组的前端,用begin和end来标识起始位置,然后调用排序算法对[begin, end]中的record进行排序。
2. ExternalAppendOnlyMap
AppendOnlyMap的优点是能够将聚合和排序功能很好地结合在一起,缺点是只能使用内存,难以适用内存空间不足的情况。为了解决这个问题,Spark基于AppendOnlyMap设计了基于内存+磁盘的ExternalAppendOnlyMap,用于Shuffle Read端大规模数据聚合。同时,由于Shuffle Write端聚合需要考虑partitionId,Spark也设计了带有partitionId的ExternalAppendOnlyMap,名为PartitionedAppendOnlyMap。这两个数据结构功能很类似。
ExternalAppendOnlyMap的工作原理是,先持有一个AppednOnlyMap来不断接受和聚合新来的record,每插入一条数据,都会判断当前是否需要溢出,如果需要溢出,就将AppendOnlyMap中的record进行排序然后都spill到磁盘上(注意:这里判断是否需要溢出,与Shuffle Write阶段是反着的,目前还不知道为什么这样设计)。AppendOnlyMap快被装满时(70%),会在内存中进行扩容,,因为record不断到来,可能会多次溢出。等record都处理完,此时AppendOnlyMap中可能还留存一些聚合后的record,磁盘上也有多个spill文件。因为这些数据都经过了部分聚合,还需要进行全局聚合。因此,ExternalAppendOnlyMap的最后一步时将内存中AppendOnlyMap的数据和磁盘上的spill文件进行全局聚合,得到最终的结果。
上述流程,有三个核心问题需要解决:(1)如何获知当前AppendOnlyMap的大小?虽然我们知道AppendOnlyMap中持有数组的长度和大小,但是因为数组中存储的是对象的引用,并不是它们实际对象的大小,而且Value也会不断被更新,实际大小不断变化。什么时候会超出内存界限难以确定。(2)如何设计spill的文件结构,使得可以支持高校的全局聚合?(3)怎样进行全局聚合
2.1 AppendOnlyMap的大小估计
可以有以下两个办法:
- 每次插入record或者对现有record的Value进行更新后,都扫描一下AppendOnlyMap中存放的record,计算每个record的实际大小并相加,但这样会非常耗时。
- 每次插入record或者对现有record的Value进行更新后,都计算当前的更新大小,然后更新历史累计的大小,但是这种就需要有一个地方来维护每一个Key原本占了多大,引入额外的存储;而且如果Value中包含多层引用,那每次插入一条record,都得查一遍这个对象的引用;
Spark设计了一个增量式的估算算法,在每个record插入或更新时,都会根据历史统计值和当前变化量直接估算当前AppendOnlyMap的大小,算法的复杂度时O(1),开销很小。在record插入和聚合过程中会定期对当前AppendOnlyMap中的record进行抽样,然后精确计算这些record的总大小、总个数、更新个数及平均值等,作为历史统计值。
//估算的代码
def estimateSize(): Long = {
assert(samples.nonEmpty)
//bytesPerUpdate:通过采样获得的每次更新大小,在每次采样时都会更新该值
//numUpdates:总的更新的record个数
//sample.last.numUpdates:上一次采样时更新的record个数
val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
//从上一次采样到现在的增量
(samples.last.size + extrapolatedDelta).toLong
}
2.2 Spill过程与排序
当AppendOnlyMap达到内存限制时,会将record排序后写入磁盘。排序是为了方便下一步全局聚合时可以采用更高效的merge-sort。那么问题是根据什么对record进行排序的?自然想到的是根据record的Key进行排序,但是这就要求操作定义Key的排序方法,如SortByKey()等操作定义了按照Key进行排序。但是大部分操作,如groupByKey(),并没有定义Key的排序方法,也不需要输出结果按照Key进行排序,这种情况下,Spark就采用Key的Hash值进行排序,如果Hash值冲突,就判断Key是否相等。
解决了spill时如何对record进行排序的问题后,每当AppendOnlyMap超过内存限制,就会将其内部的record排序后spill到磁盘上,如图6.14所示,AppendOnlyMap被填满了四次,也被spill到磁盘上4次。
2.3 全局聚合
由于最终的spill文件和内存中的AppendOnlyMap都是经过部分聚合后的结果,其中可能存在相同Key的record,因此还需要一个全局聚合阶段将AppendOnlyMap中的record与spill文件中的record进行聚合,得到最终聚合后的结果。全局聚合的方法就是建立一个最小堆或最大堆,每次从各个spill文件中读取前几个具有相同Key(或者相同Key的Hash值)的record,然后与AppendOnlyMap中的record进行聚合,并输出聚合后的结果。在图6.14中,在全局聚合时,Spark分别从4个spill文件中提取第1个<K, V> record,与还留在AppendOnlyMap中的第1个record组成最小堆,然后不断从最小堆中提取具有相同Key的record进行聚合。然后,Spark继续读取spill文件及AppendOnlyMap中的record填充最小堆,直到所有的record处理完成。
总结:ExternalAppendOnlyMap是一个高性能的HashMap,只支持数据插入和更新,但可以同时利用内存和磁盘堆大规模数据进行聚合和排序,满足了Shuffle Read阶段数据聚合、排序的需求。
3. PartitionedAppendOnlyMap
PartitionAppendOnlyMap适用于在Shuffle Write端对record进行聚合的。PartitionedAppendOnlyMap的只是对AppendOnlyMap做了简单的封装,并没有判断溢写的逻辑,溢写是使用者来判断的。
PartitionedOnlyMap只有insert和扩容,本身没有判断溢写,而是在ExternalSorter中判断是否需要溢写;而ExternalOnlyMap则是每次在insert的之前,都会先判断是否需要溢写,然后才进行insert和扩容。目前不理解为什么要这样设计
4. PartitionedPairBuffer
PartitionedPairBuffer本质上是一个基于内存+磁盘的Array,随着数据添加,不断地扩容,当达到内存限制时,就将Array中的数据按照partitionId或partitionId+Key进行排序,然后spill到磁盘上,该过程可以进行多次,最后对内存中和磁盘上的数据进行全局排序
Spark Shuffle和MR Shuffle的对比
1. 简单介绍MR Shuffle
MR Shuffle流程简单概述:Map端把数据放到环形缓冲区,如果缓冲区达到80%,就排序后溢写到磁盘,如果有man端combine,则排序后再加一个combine然后在溢写;reduce从获取各个map的数据,然后做merge排序,最后走reduce中定义的逻辑。
具体的可以看:https://blog.csdn.net/stable_zl/article/details/127199024?spm=1001.2014.3001.5502
2. MR Shuffle优缺点
优点:
- Shuffle流程固定,阶段分明,每个阶段读取什么数据、进行什么操作、输出什么数据都是确定性的,使得实现起来比较容易;
- 内存消耗也是确定的,map阶段之需要一个大的spill buffer,reduce阶段之需要一个大的MergeQueue来存放获取的分区文件中的record。这样,什么时候将数据spill到磁盘上也是确定的,也易于实现和内存管理。当然,用户定义的聚合函数,如combine()和reduce()的内存是不确定的;
- 对Key进行了严格排序,使得可以使用最小堆或最大堆进行聚合,非常高效。
缺点:
- 强制按Key进行排序,对于不需要严格按照Key排序的的操作会增加计算量;
- 不能在线聚合,不管是map()端还是reduce()端,都是先将数据存放到内存或者磁盘上后,再执行聚合操作,存储这些数据需要大量的内存和磁盘空间。如果能一边获取record一边聚合,那么对于大多数绝好操作,可以有效的减少存储空间;
- 在环形缓冲区中存储的是序列化以后的数据,导致每次在进行排序和combine的时候需要将数据进行反序列化
3. Spark对比MR的优缺点
优点:
- 克服强制排序,Spark提供了按partitionId排序、按Key排序等多种方式来灵活应对不同操作的排序需求;
- 克服不能在线聚合,Spark利用AppendOnly等数据结构完成在线聚合
- 克服反序列化,Spark在做聚合和排序时,是直接存放的原始对象,当然,这种方式也需要更好的内存管理;
缺点:(我自己想的,欢迎大家纠正和补充)
- Spark目前的操作都不在map端做排序,reduce不能直接做归并排序,会导致reduce端的压力变大;
- 在Spark Serialized Shuffle中,存储partition元数据信息的数组,同样会有开销,在数据量大的时候,这个数组会导致OOM
参考:许利杰老师的《大数据处理框架Apache Spark的设计与实现》