1. spark core
1.简述hadoop 和 spark的不同点(为什么spark更快)♥♥♥
shuffle都是需要落盘的,因为在宽依赖中需要将上一个阶段的所有分区数据都准备好,才能进入下一个阶段,那么如果一直将数据放在内存中,是非常耗费资源的
- MapReduce需要将计算的中间结果写入磁盘,然后还要读取磁盘,从而导致了频繁的磁盘IO;而spark不需要将计算中间结果写入磁盘,这得益于spark的RDD弹性分布式数据集和DAG有向无环图,中间结果能够以RDD的形式放在内存中,这样大大减少了磁盘IO。
- MapReduce在shuffle时需要花费大量时间排序,而spark在shuffle时如果选择基于hash的计算引擎,是不需要排序的,这样就会节省大量时间。
- MapReduce是多进程模型,每个task会运行在一个独立的JVM进程中,每次启动都需要重新申请资源,消耗大量的时间;而spark是多线程模型,每个executor会单独运行在一个JVM进程中,每个task则是运行在executor中的一个线程。
2. 谈谈你对RDD的理解
RDD称为弹性分布式数据集,是spark中最基本的数据处理模型。代码中代表是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
- 弹性
- 存储的弹性:内存与磁盘的自动切换
- 容错的弹性:数据丢失可以自动恢复
- 计算的弹性:计算出错重试机制
- 分片的弹性:可根据需要重新分片
- 分布式:数据存储在大数据集群不同节点上
- 数据集:RDD封装了计算逻辑,并不保存数据
- 不可变:RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑
- 可分区、并行计算
3. 简述spark的shuffle过程♥♥♥
如果问spark与Mapreduce的shuffle的区别,先说Mapreduce的shuffle,再说spark的shuffle。
spark的shuffle分为两种实现,分别为HashShuffle和Sortshuffle
- HashShuffle分为普通机制和合并机制,分为write阶段和read阶段,write阶段就是根据key进行分区,开始先将数据写入对应的buffer中,当buffer满了之后就会溢写到磁盘上,这个时候会产生mapper的数量 x reducer的数量的小文件,这样就会产生大量的磁盘IO。read阶段就是reduce去拉取各个maptask产生的同一个分区的数据;HashShuffle的合并机制就是让多个mapper共享buffer,这时候落盘的数量等于reducer的数量 x core的个数,从而可以减少落盘的小文件数量,但是当Reducer有很多的时候,依然会产生大量的磁盘小文件。
- SortShuffle分为普通机制和bypass机制
- 普通机制:map task计算的结果数据会先写入一个内存数据结构(默认5M)中,每写一条数据之后,就会判断一下,是否达到阈值,如果达到阈值的话,会先尝试增加内存到当前内存的2倍,如果申请不到会溢写,溢写的时候先按照key进行分区和排序,然后将数据溢写到磁盘,最后会将所有的临时磁盘文件合并一个大的磁盘文件,同时生成一个索引文件,然后reduce task去map端拉取数据的时候,首先解析索引文件,根据索引文件再去拉取对应的数据。
- bypass机制:将普通机制的排序过程去掉了,它的触发条件是当shuffle map task 数量下于200(配置参数)并且算子不是聚合类的shuffle算子(比如reduceByKey)的时候,该机制不会排序,极大提高了性能。
4. spark的作业运行流程是怎么样的♥♥
重要参数如下:
- num-executoes:配置Executor的数量
- executor-memory:配置每个Executor的内存大小(堆内内存)
- executor-cores:配置每个Executor的虚拟CPU core数量
运行流程如下:
- 首先spark的客户端将作业提交给yarn的RM,然后RM会分配container,并且选择合适的NM启动ApplicationMaster,然后AM启动Driver,紧接着向RM申请资源启动executor,Executor进程启动后会反向向Driver进行注册,全部注册完成后Driver开始执行main函数,当执行到行动算子,触发一个Job,并根据宽依赖开始划分stage(阶段的划分),之后将task分发到各个Executor上执行。
- 扩展一:spark怎么提高并行度
- spark作业中,各个stage的task的数量,也就代表了spark作业在各个stage的并行度。
- 官方推荐,并行度设置为总的cpu核心数的2到3倍
- 设置参数:spark.default.parallelism
- 扩展2:spark内存管理
- spark分为堆内内存和堆外内存,堆内内存有jvm统一管理,而堆外内存直接向操作系统进行内存的申请,不受JVM控制。spark.executor.memory和spark.memory.offHeap.size
- 堆内内存又分为存储内存和执行内存和其他内存和预留内存,存储内存主要存放缓存数据和广播变量,执行内存只要存放shuffle过程的数据,其他内存主要存放rdd的元数据集信息,预留内存和和其他内存作业相同。
- 堆外内存:
- 优点:减少了垃圾回收的工作,因为垃圾回收会暂停其他的工作;
- 缺点:堆外内存难以控制,如果内存泄漏,不容易排查。
5. spark driver的作用,以及client模式和cluster模式的区别♥
- drive 主要负责管理整个集群的作业任务调度;executor是一个jvm进程,专门用于计算的节点
- client模式下,driver运行在客户端;cluster模式下,driver运行在yarn集群。
6. 你知道Application、Job、Stage、Task他们之间的关系吗?♥
- 初始化一个SparkContext就会生成一个Application
- 一个行动算子就会生成一个job
- stage(阶段)等于宽依赖的个数+1
- 一个阶段中,最后一个RDD的分区个数就是Task的个数
- 总结:Application>Job>Stage>Task
7. spark常见的算子介绍一下(10个以上)♥♥♥
算子分为转换算子和行动算子,转换算子主要是讲旧的RDD包装成新的RDD,行动算子就是触发任务的调度和作业的执行
。- 转换算子:
- map:将数据逐条进行转换,可以是数据类型的装换,也可以是值的转换
- flatMap:先进行map操作,再进行扁平化操作
- filter:根据指定的规则进行筛选
- coalesce:减少分区的个数,默认不进行shuffle
- repartition:可以增加分区或者减少分区的个数,一定发生shuffle
- union:两个RDD求并集
- zip:将两个RDD中的元素,以键值对的形式进行合并。
- reduceByKey:按照key对value进行分组
- groupByKey:按照key对value进行分组
- cogroup:按照key对value进行合并
- 行动算子用的比较多的有:
- collect:将数据采集到Driver端,形成数组
- take:返回RDD的前n个元素组成的数组
- foreach:遍历RDD中的每一个元素(executor端)
8. 简述map和mapPartitions的区别♥
他们之间有三个区别:
- map算子是串行操作,mapPartitions算子是以分区为单位的批处理操作
- map算子主要是对数据进行转换和改变,但是数量不能增加或者减少,而mapPartitions算子可以增加或者减少数据。
- map算子性能比mapPartitions算子要低,但是mapPartitions算子会长时间占用内存,可能会导致内存溢出的情况,所以如果内存不是太多,不推荐使用,一般还是使用map算子
9. 你知道重分区的相关算子吗♥
coalesce算子和repartition算子,他们都是用来改变RDD的分区数量的,而且repartition算子底层调用的就是coalesce方法。他们的区别如下:
- coalesce一般用来减少分区,如果要增加分区必须设置shuffle参数=true,repartition既可以增加分区也可以减少分区
- coalesce根据传入的参数来决定是否shuffle,设置为false容易发生数据倾斜,但是repartition一定发生shuffle,也就是shuffle参数不可修改一定为true。
- 还有一个partitionBy算子也是进行重分区的,参数传入一个分区器,默认是HashPartitioner
10. spark目前支持哪几种分区策略
spark支持Hash分区,Range分区和自定义分区
- hash分区就是计算key的hashCode,然后和分区个数取余就可以得到对应的分区号。
- range分区就是将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序
- 自定义分区就是继承Partitioner类,重写numPartitions方法和getPartition方法。
11. 简述groupByKey和reduceByKey的区别♥♥♥
他们之间主要有两个区别:
- groupByKey只能分组,不能聚合,而reduceByKey包含分组和聚合两个功能
- reduceByKey会在shuffle前对分区内相同key的数据进行预聚合(类似于MapReduce的combiner),减少落盘的数据量,而groupByKey只会shuffle,不会进行预聚合的操作,因此reduceByKey的性能会高一些。
12. 简述reduceByKey、foldByKey、aggregateByKey、combineByKey的区别♥
- reduceByKey:没有初始值,分区内和分区间计算规则一样
- foldByKey:有初始值,分区内和分区间计算规则一样
- aggregateByKey:有初始值,分区内和分区间计算规则可以不一样。
- combineByKey:没有初始值,分区内和分区间计算规则可以不一样,同时返回类型可以与输入类型不一致,因此通常在数据结构不满足要求的时候,才会使用该聚合函数。
13. 宽依赖和窄依赖之间的区别♥♥
多个连续RDD的依赖关系,称之为血缘关系;每个RDD会保存血缘关系
- 宽依赖:父的RDD一个分区的数据会被子RDD的多个分区依赖,涉及到shuffle。
- 窄依赖:父的RDD的一个分区的数据只会被子RDD的一个分区依赖
为什么要设计宽窄依赖 - 窄依赖的多个分区可以并行计算;而宽依赖必须等到上一阶段计算完成才能计算下一个阶段。
如何进行stage划分: - 对于窄依赖,不会进行划分,也就是将这个转换操作尽量放在同一个stage中,可以实现流水线并行计算。对于宽依赖,由于有shuffle的存在,只能在父RDD处理完成后,才能开始接下来的计算,也就是说需要划分stage。
14. spark为什么需要RDD持久化,持久化的方式有哪几种,他们之间的区别是什么♥♥♥
RDD实际上是不存储数据的,那么如果RDD要想重用,那么就需要重头开始再执行一遍,所以为了提高RDD的重用性,就有了RDD持久化。
- 分类:缓存和检查点
- 区别:
- 缓存有两种方法,一种是cache,将数据临时存储在内存中(默认就会调用persist(memory_only)),还有一种是persist,将数据临时存储在磁盘中,程序结束就会自动删除临时文件;而检查点,就是checkpoint,将数据长久保存在磁盘中。
- 缓存不会切断RDD之间的血缘关系,检查点会切断RDD之间的血缘关系。
15. 简述spark的容错机制
容错机制主要包括四个方面:
- stage输出失败的时候,上层调度器DAGScheduler会进行重试
- 计算过程中,某个task失败,底层调度器会进行重试。
- 计算过程中,如果部分计算结果丢失,可以根据窄依赖和宽依赖的血统重新恢复计算。
- 如果血统非常长,可以对中间结果做检查点,写入磁盘中,如果后续计算结果丢失,那么就从检查点的RDD开始重新计算。
16.除了RDD,你还了解spark的其他数据结构吗♥♥
算子以外的代码都是在Driver端执行,算子里面的代码都是在Executor端执行。
了解累加器和广播变量两种数据结构。
- 累加器就是分布式共享只写变量,简单说一下它的原理:累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行合并。
- 广播变量就是分布式共享只读变量,简单说一下它的原理:用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个spark操作使用。
补充:由BlockManager管理,广播变量会带来Driver的内存膨胀,因为广播变量会在内存中分割,分割完成之前,内存中由俩份大小相同的广播变量。
17. spark调优♥♥♥
2. spark sql
用于结构化数据(structured data)处理的spark模块。
18. 谈一谈RDD,DataFrame,DataSet的区别♥
- spark1.0产生了RDD,spark1.3产生了DataFrame,spark1.6产生了DataSet。
- RDD不支持sparksql操作,但是DataFrame和DataSet支持sparksql操作。
- RDD一般会和spark mllib同时使用,但是DataFrame和DataSet一般不和spark mllib同时使用
- DataFrame是DataSet的一个特例;DataFrame的每一行的类型都是Row,DataSet的每一行的数据类型不相同。
19. Hive on spark 与 sparkSql的区别♥
只是SQL引擎不同,但是计算引擎都是spark。
20. sparkSql的三种join实现♥♥♥
包括broadcast hash join、shuffle hash join、sort merge join,前两种都是基于hash join;broadcast适合一张很小和一张很大的表进行join,shuffle适合一张较大的小表和一张大表进行join,sort适合两张较大的表进行join。
- hash join这个算法主要分为三步,首先确定哪张是build table和哪张表是probe table,这个是由spark决定的,通常情况下,小表会作为build table,大表会作为probe table;然后构建hash table,遍历build table中的数据,对于每一条数据,根据join的字段进行hash,存放到hashtable中;最后遍历probe table中的数据,使用同样的hash函数,在hashtable中寻找join字段相同的数据,如果匹配成功就join到一起。这就是hash join的过程。
- broadcast hash join 分为broadcast阶段和hash join阶段,broadcast阶段就是将小表广播到所有的executor上,hash join阶段就是在每个executor上执行hash join,小表构建为hash table,大表作为probe table。
- shuffle hash join 分为shuffle阶段和hash join阶段,shuffle阶段就是对两张表分别按照join字段进行重分区,让相同key的数据进入到一个分区中;hash join 阶段就是 对每个分区中的数据执行hash join。
- sort merge join 分为shuffle阶段,sort阶段和merge阶段,shuffle阶段就是将两张表按照join字段进行重分区,让相同key的数据进入同一个分区中;sort阶段就是对每个分区内的数据进行排序;merge阶段就是对排好序的分区表进行join,分别遍历两张表,key相同就join输出,如果不同,左边小,就继续遍历左边的表,反之,遍历右边的表。