简述
SparkRDD中可以包含任何类型的对象,在实际应用中,“键值对”是一种比较常见的RDD元素类型,分组和聚合操作中经常会用到,尤其是groupByKey和reduceByKey。
Spark操作中经常会用到“键值对RDD”(Pair RDD),用于完成聚合计算。普通RDD里面存储的数据类型是Int、String等,而“键值对RDD”里面存储的数据类型是“键值对”。
生产环境用到的操作
以下为我在生产环境用到的操作
WordCount:
统计文本中每个单词出现的次数,使用Pair RDD将每个单词作为键,将出现次数作为值,然后进行reduceByKey操作进行聚合。
分组聚合:
将具有相同键的元素分组在一起,并对每个键的值进行聚合操作,如groupByKey、reduceByKey等。
数据连接和关联:
使用键值对进行数据的连接和关联操作,如join、cogroup等。
数据预处理:
对数据进行分组、排序、过滤等预处理操作,如groupBy、sortByKey、filter等。
数据分析和统计:
使用Pair RDD进行数据分析和统计操作,如计算平均值、求和、最大值、最小值等。 通过Pair RDD,可以更方便地处理键值对数据,实现更灵活和复杂的数据处理和分析需求。
Pair RDD的创建方式
第一种:从文件中加载数据创建pairRDD
//测试数据,自己编的,文件名为personID
591,2021,15448329898,北京,彩信
592,2022,15648029823,河北,微信
593,2022,16742329894,山西,电话
594,2020,17748529893,海南,微信
595,2020,19048729896,大连,QQ
代码及运行结果
scala> val lines = sc.textFile("file:///data/testdata/personID.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///data/testdata/personID.txt MapPartitionsR DD[1] at textFile at <console>:23
scala> val pairRDD = lines.flatMap(elem => (elem + 1))
pairRDD: org.apache.spark.rdd.RDD[Char] = MapPartitionsRDD[2] at flatMap at <console>:23
scala> val pairRDD = lines.flatMap(line => line.split(",")).map(word => (word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>: 23
scala> pairRDD.foreach(println)
(591,1)0:> (0 + 1) / 1]
(2023,1)
(15448329898,1)
(北京,1)
(彩信,1)
(592,1)
......
从代码执行结果来看:
返回的结果是键值对类型的RDD,即RDD[(String, Int)]。从pairRDD.foreach(println)执行的打印输出结果也可以看到,都是由(单词,1)这种形式的键值对。
第二种:通过数组Array或集合List创建pairRDD
案例:
//使用array数组
scala> val array = Array("spark", "hadoop", "flink", "hive")
array: Array[String] = Array(spark, hadoop, flink, hive)
scala> val rdd = sc.parallelize(array)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> val pairRDD = rdd.map(word =>(word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at <console>:23
scala> pairRDD.foreach(println)
(spark,1)
(hadoop,1)
(flink,1)
(hive,1)
//使用list集合
scala> val list = List("hadoop","spark","hive")
list: List[String] = List(hadoop, spark, hive)
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> val pairRDD = rdd.map(word => (word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at map at <console>:23 ^
scala> pairRDD.foreach(println)
(hadoop,1)
(spark,1)
(hive,1)
常用键值对转换操作
常用的键值对转换操作包括reduceByKey()、groupByKey()、sortByKey()、join()、cogroup()等
reduceByKey(func)
功能:使用func函数合并具有相同键的值。注意,这里强调合并相同键。
比如,reduceByKey((a,b) => a+b),有五个键值对(nlp,1)
(nlp,1)
(spark,1)
(nlp,1)
(hadoop,1)
(hadoop,1)
对具有相同key的键值对进行合并后的结果就是:
(spark,1)
(hadoop,2)
(nlp,3)
我们对上面第二种方式创建List集合得到的pairRDD进行reduceByKey()操作,代码如下:
scala> val list = List("nlp","nlp","spark","nlp","hadoop","hadoop")
list: List[String] = List(nlp, nlp, spark, nlp, hadoop, hadoop)
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at <console>:24
scala> val pairRDD = rdd.map(word => (word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[10] at map at <console>:23
scala> pairRDD.foreach(println)
(nlp,1)
(nlp,1)
(spark,1)
(nlp,1)
(hadoop,1)
(hadoop,1)
scala> pairRDD.reduceByKey((a,b) => a + b).foreach(println)
(spark,1)
(hadoop,2)
(nlp,3)
groupByKey()
功能:对具有相同键的值进行分组。注意,这里强调对相同的键分成一组。
比如,groupByKey((a,b) => a+b),有五个键值对(nlp,1)
(nlp,1)
(spark,1)
(nlp,1)
(hadoop,1)
(hadoop,1)
我们对上面第二种方式创建得到的pairRDD进行groupByKey()操作,代码如下:
scala> pairRDD.groupByKey()
res17: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[13] at groupByKey at <console>:24
// 分组后,value被保存到Iterable[Int]中
scala> pairRDD.groupByKey().foreach(println)
(spark,CompactBuffer(1))
(hadoop,CompactBuffer(1, 1))
(nlp,CompactBuffer(1, 1, 1))
keys
功能:会把键值对RDD中的key返回形成一个新的RDD。
scala> pairRDD.keys
res20: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at keys at <console>:24
scala> pairRDD.keys.foreach(println)
nlp
nlp
spark
nlp
hadoop
hadoop
可以对返回的key的集合进行操作,比如说写入一个List集合中
scala> val prirRDDkeysList = pairRDD.keys.collect().toList
prirRDDkeysList: List[String] = List(nlp, nlp, spark, nlp, hadoop, hadoop)
scala> val prirRDDkeysArray = pairRDD.keys.collect()
prirRDDkeysArray: Array[String] = Array(nlp, nlp, spark, nlp, hadoop, hadoop)
values
功能: 把键值对RDD中的value返回形成一个新的RDD。
scala> pairRDD.foreach(println)
(nlp,1)
(nlp,1)
(spark,1)
(nlp,1)
(hadoop,1)
(hadoop,1)
scala> pairRDD.values.foreach(println)
1
1
1
1
1
1
将得到的值保存到数组或集合中
scala> val prirRDDValuesList = pairRDD.values.collect().toList
prirRDDValuesList: List[Int] = List(1, 1, 1, 1, 1, 1)
scala> val prirRDDValueArray = pairRDD.values.collect()
prirRDDValueArray: Array[Int] = Array(1, 1, 1, 1, 1, 1)
注意
为什么会报错value collect is not a member of Unit ,因为foreach
方法返回的是Unit
类型,它没有collect
方法。
scala> val prirRDDValuesList = pairRDD.values.foreach(println).collect().toList
:26: error: value collect is not a member of Unit
val prirRDDValuesList = pairRDD.values.foreach(println).collect().toList
工作中使用collect()导致的内存不足调优:
当处理大数据集时,可以考虑使用Spark的分布式计算能力来处理数据,而不是将所有数据收集到驱动程序中。这样可以避免内存不足的问题。
我使用collect
方法将这个RDD
中的元素收集到驱动程序,并返回一个数组。如果pairRDD
中的数据量很大,collect
操作可能会导致内存不足的问题,建议在处理大数据集时,谨慎使用collect
方法。我们可以用很多方法来避免:
- 使用RDD转换操作:可以使用各种RDD转换操作,如
map
、filter
、reduceByKey
等,对数据集进行转换和聚合操作。这些操作在分布式环境下进行,可以利用集群中的多个节点进行计算。 - 使用RDD的
collect
和take
方法:如果只需要获取部分数据,可以使用collect
方法将数据收集到驱动程序中,确保数据量不会导致内存不足,可以使用take
方法获取RDD中的前几个元素。 - 使用RDD的
sample
方法:可以使用sample
方法对数据进行采样,从而获取数据集的一个子集。这样可以在处理大数据集时降低计算和内存的压力。 - 使用Spark SQL或DataFrame:如果数据集结构化且存储在支持Spark SQL的数据源中,可以使用Spark SQL或DataFrame API进行数据操作和分析。这些API提供了更高级的数据操作和查询功能。
- 使用持久化存储:如果需要将处理结果保存下来或供其他程序使用,可以将结果存储在持久化存储系统中,如HDFS或数据库。这样可以避免将所有数据收集到驱动程序中。 利用集群中的计算资源进行并行计算,避免将所有数据收集到驱动程序中,可以使用RDD转换操作、采样、分页获取等技术来处理数据。
sortByKey()
功能:是返回一个根据键排序的RDD。
scala> pairRDD.sortByKey().foreach(println)
(hadoop,1)
(hadoop,1)
(nlp,1)
(nlp,1)
(nlp,1)
(spark,1)
mapValues(func) (常用)
功能:对键值对RDD中的每个value都应用一个函数,但是,key不会发生变化。
即我只对键值对RDD的value部分进行处理,而不是同时对key和value进行处理。例如,对四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)构成的pairRDD,如果执行pairRDD.mapValues(x => x+1),就会得到一个新的键值对RDD,它包含下面四个键值对("spark",2)、("spark",3)、("hadoop",4)和("hadoop",6)。
scala> pairRDD.mapValues(a => a*2).foreach(println)
(nlp,2)
(nlp,2)
(spark,2)
(nlp,2)
(hadoop,2)
(hadoop,2)
join (常用)
功能:对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。
join(连接)操作是键值对常用的操作。“连接”(join)这个概念来自于关系数据库领域,因此,join的类型也和关系数据库中的join一样,包括内连接(join)、左外连接(leftOuterJoin)、右外连接(rightOuterJoin)等。最常用的情形是内连接,所以,join就表示内连接。
对于内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。
比如,pairRDD1是一个键值对集合{("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)},pairRDD2是一个键值对集合{("spark","fast")},那么,pairRDD1.join(pairRDD2)的结果就是一个新的RDD,这个新的RDD是键值对集合{("spark",1,"fast"),("spark",2,"fast")}。
案例代码:
scala> val paRDD1 = sc.parallelize(Array(("spark",2),("hadoop",3),("spark",1),("hive",4),("hadoop",2)))
paRDD1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[28] at parallelize at <console>:26
scala> val paRDD2 = sc.parallelize(Array(("spark","nicetry"),("hadoop","good"),("spark",234),("hive",2314),("hadoop","ohho")))
paRDD2: org.apache.spark.rdd.RDD[(String, Any)] = ParallelCollectionRDD[29] at parallelize at <console>:26
scala> paRDD1.join(paRDD2).foreach(println)
(spark,(2,nicetry))
(spark,(2,234))
(spark,(1,nicetry))
(spark,(1,234))
(hive,(4,2314))
(hadoop,(3,good))
(hadoop,(3,ohho))
(hadoop,(2,good))
(hadoop,(2,ohho))
eg:现在来看林子雨教授讲解的是真清晰,温故而知新。
一个完整实例-计算每种图书的每天平均销量
思路
计算一天中各种类图书卖出去的平均值,键值对的key表示图书名称,value表示某天图书销量,请计算每个键对应的平均值,也就是计算每种图书的每天平均销量
步骤
1、构建数组,包含对应键值对,调用parallelize方法生成 RDD
2、针对构建得到的rdd,我们调用mapValues()函数,把rdd中的每个每个键值对(key,value)的value部分进行修改,把value转换成键值对(value,1),其中,数值1表示这个key在rdd中出现了1次,为什么要记录出现次数呢?因为,我们最终要计算每个key对应的平均值,所以,必须记住这个key出现了几次,最后用value的总和除以key的出现次数,就是这个key对应的平均值。
(注:collect()是一个行动操作,功能是以数组的形式返回数据集中的所有元素,当我们要实时查看一个RDD中的元素内容时,就可以调用collect()函数。)
3、调用reduceByKey()函数,此处必须要十分准确地理解reduceByKey()函数的功能 => 合并具有相同键的值。
reduceByKey(func)的功能是使用func函数合并具有相同键的值。这里的func函数就是Lamda表达式(x,y) => (x._1+y._1,x._2 + y._2),这个表达式中,x和y都是value,而且是具有相同key的两个键值对所对应的value。
4、 计算最终结果。对得到的几个键值对构成的RDD执行mapValues()操作,得到每种书的每天平均销量。mapValues,key不变,只对值记性操作。value会被赋值给Lamda表达式x => (x._1 / x._2中的x,x的值就是(22,2),x._1就是22,表示hadoop书总销量是22,x._2就是2,表示2天,因此,hadoop书籍的每天平均销量就是x._1 / x._2,也就是11。mapValues()输出的一个键值对就是("hadoop",11),其他同理。
代码
//构建书籍及销量
scala> val books = sc.parallelize(Array(("book1",5),("book2",10),("book3",8),("book1",6),("book2",12)))
books: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:23
// 统计
scala> val sum_books = books.mapValues(x => (x,1)).foreach(println)
(book1,(5,1))
(book2,(10,1))
(book3,(8,1))
(book1,(6,1))
(book2,(12,1))
sum_books: Unit = ()
//计算出现次数,value中,前面是总数,后面是天数,如(11,2),表示2天卖出11本
scala> val average_books = books.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1 + y._1 , x._2 + y._2)).foreach(println)
(book1,(11,2))
(book3,(8,1))
(book2,(22,2))
average_books: Unit = ()
//平均值统计
scala> val average_books = books.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1 + y._1 , x._2 + y._2)).mapValues(x => x._1 / x._2).foreach(println)
(book1,5)
(book3,8)
(book2,11)
average_books: Unit = ()