RDD是Spark的核心概念,它是一个只读的、可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,可在多次计算间重用。Spark用Scala语言实现了RDD的API,程序员可以通过调用API实现对RDD的各种操作,从而实现各种复杂的应用。
一、RDD创建
Spark采用textFile()方法来从文件系统中加载数据创建RDD,该方法把文件的URI作为参数,这个URI可以是本地文件系统的地址、分布式文件系统HDFS的地址,或者是Amazon S3的地址等。
1. 从文件系统中加载数据创建RDD
(1)从本地文件系统中加载数据
// spark-shell交互式环境中,执行
scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/mycode/rdd/w
ord.txt MapPartitionsRDD[12] at textFile at <console>:2
“lines: org.apache.spark.rdd.RDD[String]……”是命令执行后返回的信息,从中可以看出,执行sc.textFile()方法以后,Spark从本地文件word.txt中加载数据到内存,在内存中生成一个RDD对象lines,lines是org.apache.spark.rdd.RDD这个类的一个实例,这个RDD里面包含了若干个元素,每个元素的类型是String类型,也就是说,从word.txt文件中读取出来的每一行文本内容,都成为RDD中的一个元素,如果word.txt中包含了1000行,那么,lines这个RDD中就会包含1000个String类型的元素。
(2)从分布式文件系统HDFS中加载数据
在HDFS中已经创建了与当前Linux系统登录用户hadoop对应的用户目录“/user/hadoop”。启动HDFS,就可以让Spark对HDFS中的数据进行操作。
// 从HDFS中加载数据的命令如下(下面3条语句是完全等价的,可以使用其中任意一种方式)
scala> val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
scala> val lines = sc.textFile("/user/hadoop/word.txt")
scala> val lines = sc.textFile("word.txt")
2. 通过并行集合(数组)创建RDD
调用SparkContext的parallelize方法,从一个已经存在的集合(数组)上创建RDD。
创建方式一:
scala> val array = Array(1,2,3,4,5)
scala> val rdd = sc.parallelize(array)
创建方式二:
scala> val list = List(1,2,3,4,5)
scala> val rdd = sc.parallelize(list)
二、RDD操作
RDD操作包括两种类型,即转换(Transformation操作)和行动(Action操作)。
1,转换操作
对于RDD而言,每一次转换操作都会产生不同的RDD,供给下一个操作使用。RDD的转换过程是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会触发“从头到尾”的真正的计算。
常用的RDD转换操作API:
(1)·filter(func)
filter(func)操作会筛选出满足函数func的元素,并返回一个新的数据集。
在第1行语句中,执行sc.textFile()方法把word.txt文件中的数据加载到内存生成一个RDD,即lines,这个RDD中的每个元素都是String类型,即每个RDD元素都是一行文本内容。在第2行语句中,执行lines.filter()操作,filter()的输入参数line =>line.contains(“Spark”)是一个匿名函数,或者被称为“λ表达式”。lines.filter(line =>line.contains(“Spark”))操作的含义是,依次取出lines这个RDD中的每个元素,对于当前取到的元素,把它赋值给λ表达式中的line变量,然后,执行λ表达式的函数体部分line.contains(“Spark”),如果line中包含"Spark"这个单词,就把这个元素加入到新的RDD(即linesWithSpark)中,否则,就丢弃该元素。最终,新生成的RDD(即linesWithSpark)中的所有元素,都包含了单词"Spark"。
(2)·map(func)
map(func)操作将每个元素传递到函数func中,并将结果返回为一个新的数据集。
示例一:
scala> data=Array(1,2,3,4,5)
scala> val rdd1= sc.parallel
ize(data)scala> val rdd2=rdd1.map(x=>x+10)
第1行语句创建了一个包含5个Int类型元素的数组data。第2行语句执行sc.parallelize(),从数组data中生成一个RDD,即rdd1,rdd1中包含了5个Int类型的元素,即1、2、3、4、5。第3行语句执行rdd1.map()操作,map()的输入参数“x=>x+10”是一个λ表达式。rdd1.map(x=>x+10)的含义是,依次取出rdd1这个RDD中的每个元素,对于当前取到的元素,把它赋值给λ表达式中的变量x,然后,执行λ表达式的函数体部分“x+10”,也就是把变量x的值和10相加后,作为函数的返回值,并作为一个元素放入到新的RDD(即rdd2)中。最终,新生成的RDD(即rdd2),包含了5个Int类型的元素,即11、12、13、14、15。
示例二:
scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
scala> val words=lines.map(line => line.split(" "))
在第1行语句中,执行sc.textFile()方法把word.txt文件中的数据加载到内存生成一个RDD,即lines,这个RDD中的每个元素都是String类型,即每个RDD元素都是一行文本,比如,lines中的第1个元素是"Hadoop is good",第2个元素是"Spark is fast",第3个元素是"Spark is better"。在第2行语句中,执行lines.map()操作,map()的输入参数line =>line.split(" “)是一个λ表达式。lines.map(line => line.split(” “))的含义是,依次取出lines这个RDD中的每个元素,对于当前取到的元素,把它赋值给λ表达式中的line变量,然后,执行λ表达式的函数体部分line.split(” “)。因为line是一行文本,比如"Hadoop is good”,一行文本中包含了很多个单词,单词之间以空格进行分隔,所以,line.split(" ")的功能是,以空格作为分隔符把line拆分成一个个单词,拆分后得到的单词都封装在一个数组对象中,成为新的RDD(即words)的一个元素。例如,“Hadoop is good"被拆分后,得到的"Hadoop”"is"和"good"三个单词,会被封装到一个数组对象中,即Array(“Hadoop”, “is”,“good”),成为words这个RDD的一个元素。
(3)·flatMap(func)
flatMap(func)与map()相似,但每个输入元素都可以映射到0或多个输出结果。
scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
scala> val words=lines.flatMap(line => line.split(" "))
在第1行语句中,执行sc.textFile()方法把word.txt文件中的数据加载到内存生成一个RDD,即lines,这个RDD中的每个元素都是String类型,即每个RDD元素都是一行文本。在第2行语句中,执行lines.flatMap()操作,flatMap()的输入参数line => line.split(" “)是一个λ表达式。lines.flatMap(line => line.split(” “))的结果,等价于如下两步操作的结果
**第1步:**map()。执行lines.map(line => line.split(” “))操作,从lines转换得到一个新的RDD(即wordArray),wordArray中的每个元素都是一个数组对象。例如,第1个元素是Array(“Hadoop”, “is”,“good”),第2个元素是Array(“Spark”, “is”, “fast”),第3个元素是Array(“Spark”, “is”, “better”)。
第2步:拍扁(flat)。flatMap()操作中的“flat”是一个很形象的动作——“拍扁”,也就是把wordArray中的每个RDD元素都“拍扁”成多个元素,最终,所有这些被拍扁以后得到的元素,构成一个新的RDD,即words。例如,wordArray中的第1个元素是Array(“Hadoop”, “is”, “good”),被拍扁以后得到3个新的String类型的元素,即"Hadoop”“is"和"good”;wordArray中的第2个元素是Array(“Spark”, “is”, “fast”),被拍扁以后得到三个新的元素,即"Spark"“is"和"fast”;wordArray中的第3个元素是Array(“Spark”, “is”, “better”),被拍扁以后得到三个新的元素,即"Spark"“is"和"better”。最终,这些被拍扁以后得到的9个String类型的元素构成一个新的RDD(即words),也就是说,words里面包含了9个String类型的元素,分别是"Hadoop"“is”“good”“Spark”“is”“fast”“Spark”“is"和"better”。
(4)·groupByKey()
groupByKey()应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集。
上图所示,名称为words的RDD中包含了9个元素,每个元素都是<String,Int>类型,也就是(K,V)键值对类型。words. groupByKey()操作执行以后,所有key相同的键值对,它们的value都被归并到一起。例如,(“is”,1)、(“is”,1)、(“is”,1)这3个键值对的key相同,就会被归并成一个新的键值对(“is”,(1,1,1)),其中,key是"is",value是(1,1,1),而且,value会被封装成Iterable(一种可迭代集合)。
(5)·reduceByKey(func)
reduceByKey(func)应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合后得到的结果。
上图所示,名称为words的RDD中包含了9个元素,每个元素都是<String,Int>类型,也就是(K,V)键值对类型。words. reduceByKey((a,b)=>a+b)操作执行以后,所有key相同的键值对,它们的value首先被归并到一起。例如,(“is”,1)、(“is”,1)、(“is”,1)这3个键值对的key相同,就会被归并成一个新的键值对(“is”,(1,1,1)),其中,key是"is",value是一个value-list,即(1,1,1)。然后,使用func函数把(1,1,1)聚合到一起,这里的func函数是一个λ表达式,即(a,b)=>a+b,它的功能是把(1,1,1)这个value-list中的每个元素进行汇总求和,首先,把value-list中的第1个元素(即1)赋值给参数a,把value-list中的第2个元素(也是1)赋值给参数b,执行a+b得到2,然后,继续对value-list中的元素执行下一次计算,把刚才求和得到的2赋值给a,把value-list中的第3个元素(即1)赋值给b,再次执行a+b得到3。最终,就得到聚合后的结果(“is”,3)。
2,行动操作
行动操作是真正触发计算的地方。Spark程序只有执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。表4-2列出了常用的RDD行动操作API。
常用的RDD行动操作API:
scala> val rdd=sc.parallelize(Array(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[1] at parallelize at
<console>:24
scala> rdd.count()
res0: Long = 5
scala> rdd.first()
res1: Int = 1
scala> rdd.take(3)
res2: Array[Int] = Array(1,2,3)
scala> rdd.reduce((a,b)=>a+b)
res3: Int = 15
scala> rdd.collect()
res4: Array[Int] = Array(1,2,3,4,5)
scala> rdd.foreach(elem=>println(elem))
1
2
3
4
5
首先使用sc.parallelize(Array(1,2,3,4,5))生成了一个RDD,变量名称为rdd,rdd中包含了5个元素,分别是1、2、3、4和5,所以,rdd.count()语句执行以后返回的结果是5。执行rdd.first()语句后,会返回第1个元素,即1。当执行完rdd.take(3)语句以后,会以数组的形式返回rdd中的前3个元素,即Array(1,2,3)。执行完rdd.reduce((a,b)=>a+b)语句后,会得到把rdd中的所有元素(即1、2、3、4、5)进行求和以后的结果,即15。在执行rdd.reduce((a,b)=>a+b)时,系统会把rdd的第1个元素1传入给参数a,把rdd的第2个元素2传入给参数b,执行a+b计算得到求和结果3;然后,把这个求和的结果3传入给参数a,把rdd的第3个元素3传入给参数b,执行a+b计算得到求和结果6;然后,把6传入给参数a,把rdd的第4个元素4传入给参数b,执行a+b计算得到求和结果10;最后,把10传入给参数a,把rdd的第5个元素5传入给参数b,执行a+b计算得到求和结果15。接下来,执行rdd.collect(),以数组的形式返回rdd中的所有元素,可以看出,执行结果是一个数组Array(1,2,3,4,5)。在这个实例的最后,执行了语句rdd.foreach(elem=>println(elem)),该语句会依次遍历rdd中的每个元素,把当前遍历到的元素赋值给变量elem,并使用println(elem)打印出elem的值。实际上,rdd.foreach(elem=>println(elem))可以被简化成rdd.foreach(println),效果是一样的。
注意:当采用Local模式在单机上执行时,rdd.foreach(println)语句会打印出一个RDD中的所有元素。但是,当采用集群模式执行时,在Worker节点上执行打印语句是输出到Worker节点的stdout中,而不是输出到任务控制节点Driver中,因此,任务控制节点Driver中的stdout是不会显示打印语句的这些输出内容的。为了能够把所有Worker节点上的打印输出信息也显示到Driver中,就需要使用collect()方法。例如,rdd.collect().foreach(println)。但是,由于collect()方法会把各个Worker节点上的所有RDD元素都抓取到Driver中,因此,这可能会导致Driver所在节点发生内存溢出。因此,当只需要打印RDD的部分元素时,可以采用类似rdd.take(100).foreach(println)这样的语句。
三、RDD持久化
在Spark中,RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。每次调用行动操作,都会触发一次从头开始的计算,这对于迭代计算而言,代价是很大的,因为迭代计算经常需要多次重复使用同一组数据。
scala> val list = List("Hadoop","Spark","Hive")
list: List[String] = List(Hadoop, Spark, Hive)
scala> val rdd = sc.parallelize(list)
rdd:org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at
<console>:29
scala> println(rdd.count()) //行动操作,触发一次真正从头到尾的计算 3
scala> println(rdd.collect().mkString(",")) //行动操作,触发一次真正从头到尾的计算
Hadoop,Spark,Hiv
可以通过持久化(缓存)机制来避免这种重复计算的开销。具体方法是使用persist()方法对一个RDD标记为持久化,之所以说“标记为持久化”,是因为出现persist()语句的地方,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化,持久化后的RDD将会被保留在计算节点的内存中,被后面的行动操作重复使用。
persist()
使用persist()方法标记一个持久化的RDD,一旦被一个执行(action)触发计算,它将会被保留在计算节点的内存中并重用。如果RDD的任一分区丢失,通过使用原先创建的转换操作,它将会被自动重算,不需要全部重算,而只计算丢失的部分。persist()的圆括号中包含的是持久化级别参数
·persist(MEMORY_ONLY):表示将RDD作为反序列化的对象存储于JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容;
·persist(MEMORY_AND_DISK):表示将RDD作为反序列化的对象存储在JVM中,如果内存不足,超出的分区将会被存放在磁盘上。
一般而言,使用cache()方法时,会调用persist(MEMORY_ONLY)。针对上面的实例,增加持久化语句以后的执行过程如下:
scala> val list = List("Hadoop","Spark","Hive")
list: List[String] = List(Hadoop, Spark, Hive)
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at <console>:29
scala> rdd.cache() //会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,因为这时rdd还没有被计算生成
scala> println(rdd.count()) //第一次行动操作,触发一次真正从头到尾的计算,这时上面的rdd.cache()才会被执行,把这个rdd放到缓存中 3
scala> println(rdd.collect().mkString(",")) //第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd
Hadoop,Spark,Hive
持久化RDD会占用内存空间,当不再需要一个RDD时,就可以使用unpersist()方法手动地把持久化的RDD从缓存中移除,释放内存空间。
四、RDD分区
1. 分区的作用
RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上。
上图所示:一个集群中包含4个工作节点(Worker Node),分别是WorkerNode1、WorkerNode2、WorkerNode3和WorkerNode4,假设有两个RDD,即rdd1和rdd2,其中,rdd1包含5个分区,即p1、p2、p3、p4和p5,rdd2包含3个分区,即p6、p7和p8。
对RDD进行分区,第一个功能是增加并行度。rdd2的3个分区p6、p7和p8,分布在3个不同的工作节点WorkerNode2、WorkerNode3和WorkerNode4上面,就可以在这3个工作节点上分别启动3个线程对这3个分区的数据进行并行处理,从而增加了任务的并行度。
对RDD进行分区的第二个功能是减少通信开销。在分布式系统中,通信的代价是巨大的,控制数据分布以获得最少的网络传输可以极大地提升整体性能。Spark程序可以通过控制RDD分区方式来减少网络通信的开销。
2. 分区的原则
RDD分区的一个原则是使分区的个数尽量等于集群中的CPU核心(Core)数目。对于不同的Spark部署模式而言(Local模式、Standalone模式、YARN模式、Mesos模式),都可以通过设置spark.default.parallelism这个参数的值,来配置默认的分区数目。
一般而言,各种模式下的默认分区数目如下:
(1)Local模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N;
(2)Standalone或YARN模式:在“集群中所有CPU核心数目总和”和“2”这二者中取较大值作为默认值;
(3)Mesos模式:默认的分区数为8。
3. 设置分区的个数
可以手动设置分区的数量,主要包括两种方式:(1)创建RDD时手动指定分区个数;(2)使用reparititon方法重新设置分区个数。
(1)创建RDD时手动指定分区个数
在调用textFile()和parallelize()方法的时候手动指定分区个数即可,语法格式如下:
sc.textFile(path, partitionNum)
其中,path参数用于指定要加载的文件的地址,partitionNum参数用于指定分区个数。
scala> val array = Array(1,2,3,4,5)
scala> val rdd = sc.parallelize(array,2) //设置两个分区
对于parallelize()而言,如果没有在方法中指定分区数,则默认为spark.default.parallelism。对于textFile()而言,如果没有在方法中指定分区数,则默认为min(defaultParallelism,2),其中,defaultParallelism对应的就是spark.default.parallelism。如果是从HDFS中读取文件,则分区数为文件分片数(例如,128MB/片)。
(2)使用reparititon方法重新设置分区个数
通过转换操作得到新RDD时,直接调用repartition方法。
scala> val data = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt",2)
data: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/mycode/rdd/word.txt
MapPartitionsRDD[12] at textFile at <console>:24 scala> data.partitions.size //显示data个RDD的分区数量
res2: Int=2
scala> val rdd = data.repartition(1) //对data这个RDD进行重新分区 rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at repartition at :26
scala> rdd.partitions.size
res4: Int = 1
4,自定义分区方法
Spark提供了自带的哈希分区(HashPartitioner)与区域分区(RangePartitioner),能够满足大多数应用场景的需求。与此同时,Spark也支持自定义分区方式,即通过提供一个自定义的Partitioner对象来控制RDD的分区方式,从而利用领域知识进一步减少通信开销。需要注意的是,Spark的分区函数针对的是(key,value)类型的RDD,也就是说,RDD中的每个元素都是(key,value)类型,然后,分区函数根据key对RDD元素进行分区。因此,当需要对一些非(key,value)类型的RDD进行自定义分区时,需要首先把RDD元素转换为(key,value)类型,然后再使用分区函数。
要实现自定义分区,需要定义一个类,这个自定义类需要继承org.apache.spark.Partitioner类,并实现下面3个方法:
(1)numPartitions: Int返回创建出来的分区数;
(2)getPartition(key: Any): Int返回给定键的分区编号(0到numPartitions-1);
(3)equals(): Java判断相等性的标准方法。
示例:
要求根据key值的最后一位数字写到不同的文件中。例如,10写入到part-00000,11写入到part-00001,12写入到part-00002。请创建一个代码文件TestPartitioner.scala。
import org.apache.spark.{Partitioner, SparkContext, SparkConf}
//自定义分区类,需要继承org.apache.spark.Partitioner类
class MyPartitioner(numParts:Int) extends Partitioner{
//覆盖分区数
override def numPartitions: Int = numParts
//覆盖分区号获取函数
override def getPartition(key: Any): Int = {
key.toString.toInt%10
}
}
object TestPartitioner {
def main(args: Array[String]) {
val conf=new SparkConf()
val sc=new SparkContext(conf) //模拟5个分区的数据
val data=sc.parallelize(1 to 10,5) //根据尾号转变为10个分区,分别写到10个文件
data.map((_,1)).partitionBy(new MyPartitioner(10)).map(_._1).saveAsTextFile("file:///usr/local/spark/mycode/rdd/partitioner")
}
}
val data=sc.parallelize(1 to 10,5)这行代码执行后,会生成一个名称为data的RDD,这个RDD中包含了1、2、3、…、9、10等10个Int类型的元素,并被分成5个分区。data.map(( ,1))表示把data中的每个Int类型元素取出来,转换成(key,value)类型,比如,把1这个元素取出来以后转换成(1,1),把2这个元素取出来以后转换成(2,1),因为,自定义分区函数要求RDD元素的类型必须是(key,value)类型。partitionBy(new MyPartitioner(10))表示调用自定义分区函数,把(1,1)、(2,1)、(3,1)、…、(10,1)这些RDD元素根据尾号分成10个分区。分区完成以后,再使用map(_._1),把(1,1)、(2,1)、(3,1)、…、(10,1)等(key,value)类型元素的key提取出来,得到1,2,3,…,9,10。最后调用saveAsTextFile()方法把RDD的10个Int类型的元素写入到本地文件中。
使用sbt工具对TestPartitioner.scala进行编译打包,并使用spark-submit命令提交到Spark中运行。运行结束后可以看到,在本地文件系统的“file:///usr/local/spark/mycode/rdd/partitioner”目录下面,会生成part-00000、part-00001、part-00002、…、part-00009和_SUCCESS等文件,其中,part-00000文件中包含了数字10,part-00001文件中包含了数字1,part-00002文件中包含了数字2。
文章来源:《Spark编程基础》 作者:林子雨
文章内容仅供学习交流,如有侵犯,联系删除哦!