文章作者邮箱:yugongshiye@sina.cn 地址:广东惠州
▲ 本章节目的
⚪ 了解Spark的DAG;
⚪ 掌握Spark的RDD的依赖关系;
⚪ 了解Spark对于DAG的Stage的划分;
一、DAG概念
1. 概述
Spark会根据用户提交的计算逻辑中的RDD的转换和动作来生成RDD之间的依赖关系,同时这个计算链也就生成了逻辑上的DAG。接下来以“Word Count”为例,详细描述这个DAG生成的实现过程。
2. 案例1解释
Spark Scala版本的Word Count程序如下:
val file=sc.textFile("hdfs://hadoop01:9000/hello1.txt")
val counts = file.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")
file和counts都是RDD,其中file是从HDFS上读取文件并创建了RDD,而counts是在file的基础上通过flatMap、map和reduceByKey这三个RDD转换生成的。最后,counts调用了动作saveAsTextFile,用户的计算逻辑就从这里开始提交的集群进行计算。
上图展示的是word count案例的计算链,Spark底层会将这个计算链抽象为一个DAG(有向无环图)。关键的是,这个DAG记录了RDD之间的依赖关系,借助RDD之间的依赖关系,可以实现数据容错。比如上图中,RDD1是RDD2的父RDD。反之RDD2是RDD1的子RDD。从分区的角度,有父分区和子分区的概念。
即当某个子分区数据丢失,借助RDD之间的依赖关系,可以从上游的父分区进行恢复。
那么上面这5行代码的具体实现是什么呢?
1. 行1:sc是org.apache.spark.SparkContext的实例,它是用户程序和Spark的交互接口,会负责连接到集群管理者,并根据用户设置或者系统默认设置来申请计算资源,完成RDD的创建等。
sc.textFile("hdfs://...")就完成了一个org.apache.spark.rdd.HadoopRDD的创建,并且完成了一次RDD的转换:通过map转换到一个org.apache.spark.rdd.MapPartitions-RDD。也就是说,file实际上是一个MapPartitionsRDD,它保存了文件的所有行的数据内容。
2. 行2:将file中的所有行的内容,以空格分隔为单词的列表,然后将这个按照行构成的单词列表合并为一个列表。最后,以每个单词为元素的列表被保存到MapPartitionsRDD。
3. 行3:将第2步生成的MapPartittionsRDD再次经过map将每个单词word转为(word,1)的元组。这些元组最终被放到一个MapPartitionsRDD中。
4. 行4:首先会生成一个MapPartitionsRDD,起到m