Spark RDD Programming
Table of Contents
- Spark RDD Programming
  - I. RDD Operations
    - 1. Creation Operations
      - ① Creating from a file
      - ② Creating from a parallelized collection
    - 2. Transformations (transformation operators)
      - ① map(func)
      - ② flatMap(func)
      - ③ filter(func)
      - ④ groupByKey()
      - ⑤ reduceByKey(func)
    - 3. Control Operations
      - ① rdd.cache()
      - ② rdd.persist(MEMORY_ONLY)
      - ③ rdd.persist(MEMORY_AND_DISK)
      - ④ rdd.unpersist()
    - 4. Actions (action operators)
      - ① collect
      - ② first
      - ③ take(n)
      - ④ count
      - ⑤ reduce(func)
      - ⑥ foreach(func)
    - Exercises
      - Exercise 1: Computing sales totals
      - Exercise 2: Word count
  - II. Partitions
    - 1. The action operator saveAsTextFile
    - 2. Partitions
    - 3. Default number of partitions, determined by how spark-shell is launched
    - 4. Specifying the number of partitions when creating an RDD
  - III. Key-Value RDDs (Pair RDDs)
    - 1. keys
    - 2. values
    - 3. sortByKey() and sortBy()
    - 4. mapValues(func)
    - 5. join
I. RDD Operations
1. Creation Operations
① Creating from a file
Each line of the file becomes one element of the RDD.
a. From a local file
//Format: sc.textFile("file://absolute path of the local file")
val rdd = sc.textFile("file:///home/centos7/infos.txt")
b. From HDFS
//Format 1: sc.textFile("hdfs://absolute path of the HDFS file")
val rdd = sc.textFile("hdfs:///user/centos7/infos.txt")
//Format 2: sc.textFile("absolute path of the HDFS file")
val rdd = sc.textFile("/user/centos7/infos.txt")
//Format 3: sc.textFile("relative path"); the relative path is implicitly prefixed with "/user/<username>/"
val rdd = sc.textFile("infos.txt")
② Creating from a parallelized collection
//sc.parallelize(arr): arr must be a collection, array, or sequence
val rdd2 = sc.parallelize(arr)
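For completeness, a minimal self-contained sketch; the array name and its contents below are made up for illustration only:
//Define a local Scala array, then distribute it as an RDD
val nums = Array(1, 2, 3, 4, 5)
val numsRDD = sc.parallelize(nums)
//collect returns Array(1, 2, 3, 4, 5)
numsRDD.collect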
2. Transformations (transformation operators)
① map(func):
Passes each element to the function func and returns the results as a new dataset (the new RDD has the same number of elements as the original RDD).
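As a quick illustration (the array below is hypothetical), doubling every element leaves the element count unchanged:
//map: exactly one output element per input element
val small = sc.parallelize(Array(1, 2, 3))
small.map(_ * 2).collect   //Array(2, 4, 6)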
② flatMap(func):
Passes each element to the function func and "flattens" the results into a new dataset (the number of elements in the new RDD has no necessary relation to the number in the original RDD).
//Create an array
scala> val arr = Array("zhangsan lisi wangwu","zhaoliu")
arr: Array[String] = Array(zhangsan lisi wangwu, zhaoliu)
//Convert the array to an RDD
scala> val rdd4 = sc.parallelize(arr)
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at <console>:26
//View all elements of the RDD
scala> rdd4.collect
res19: Array[String] = Array(zhangsan lisi wangwu, zhaoliu)
//The map operator
scala> rdd4.map(_.split(" ")).collect
res20: Array[Array[String]] = Array(Array(zhangsan, lisi, wangwu), Array(zhaoliu))
//The flatMap operator
scala> rdd4.flatMap(_.split(" ")).collect
res21: Array[String] = Array(zhangsan, lisi, wangwu, zhaoliu)
③ filter(func):
func must return a Boolean. Passes each element to the function func and returns a new RDD containing only the elements for which func is true.
//Create an RDD
scala> val rdd2 = sc.parallelize(Array(1, 2, 3, 4, 5, 6))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at parallelize at <console>:24
//View all elements of the RDD
scala> rdd2.collect
res30: Array[Int] = Array(1, 2, 3, 4, 5, 6)
//Filter: keep the elements satisfying _%2==0
scala> rdd2.filter(_%2==0).collect
res31: Array[Int] = Array(2, 4, 6)
scala> rdd2.filter(_%2!=0).collect
res32: Array[Int] = Array(1, 3, 5)
scala> rdd2.filter(_<=3).collect
res33: Array[Int] = Array(1, 2, 3)
④ groupByKey():
Groups together the values that share the same key. Must be applied to a key-value (pair) RDD.
//Create an RDD
scala> val rdd5 = sc.parallelize(List("hadoop","spark","spark"))
rdd5: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[35] at parallelize at <console>:24
//View all elements of the RDD
scala> rdd5.collect
res35: Array[String] = Array(hadoop, spark, spark)
//Turn each element of the RDD into a key-value pair (a tuple); an RDD whose elements are key-value pairs is called a pair RDD
scala> rdd5.map((_,1)).collect
res36: Array[(String, Int)] = Array((hadoop,1), (spark,1), (spark,1))
//Apply the groupByKey operator to group values with the same key
scala> rdd5.map((_,1)).groupByKey().collect
res41: Array[(String, Iterable[Int])] = Array((spark,CompactBuffer(1, 1)), (hadoop,CompactBuffer(1)))
⑤ reduceByKey(func):
Applies func to the values that share the same key; func must take two parameters.
//Create an RDD
scala> val rdd5 = sc.parallelize(List("hadoop","spark","spark"))
rdd5: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[35] at parallelize at <console>:24
//View all elements of the RDD
scala> rdd5.collect
res35: Array[String] = Array(hadoop, spark, spark)
//Turn each element of the RDD into a key-value pair (a tuple); an RDD whose elements are key-value pairs is called a pair RDD
scala> rdd5.map((_,1)).collect
res36: Array[(String, Int)] = Array((hadoop,1), (spark,1), (spark,1))
//Apply the reduceByKey(_+_) operator to sum the values with the same key
//_+_ is shorthand for (x, y) => x + y
scala> rdd5.map((_,1)).reduceByKey(_+_).collect
res48: Array[(String, Int)] = Array((spark,2), (hadoop,1))
3. Control Operations
Persisting data
① rdd.cache():
Caches the RDD in memory for later reuse; equivalent to calling rdd.persist(MEMORY_ONLY).
② rdd.persist(MEMORY_ONLY)
Stores the RDD in memory for later reuse.
③ rdd.persist(MEMORY_AND_DISK)
Stores the RDD as deserialized objects in the JVM; partitions that do not fit in memory are spilled to disk.
④ rdd.unpersist()
Manually removes a persisted RDD from the cache.
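A minimal sketch of how these calls fit together in spark-shell, reusing the infos.txt path from the earlier examples (adjust it to your environment):
//StorageLevel constants such as MEMORY_ONLY and MEMORY_AND_DISK live here
import org.apache.spark.storage.StorageLevel
val logs = sc.textFile("file:///home/centos7/infos.txt")
logs.cache()                               //same as logs.persist(StorageLevel.MEMORY_ONLY)
logs.count                                 //the first action computes the RDD and fills the cache
logs.count                                 //the second action is served from the cache
logs.unpersist()                           //remove it from the cache
logs.persist(StorageLevel.MEMORY_AND_DISK) //keep in memory, spill overflow partitions to disk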
4. Actions (action operators)
① collect
Returns all elements of the RDD. If the RDD is very large, collect is not recommended.
scala> val rdd6 = sc.parallelize(Array("spark","hadoop","scala"))
rdd6: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[55] at parallelize at <console>:24
scala> rdd6.collect
res57: Array[String] = Array(spark, hadoop, scala)
② first
Returns the first element of the RDD.
scala> rdd6.first
res59: String = spark
③ take(n)
Returns the first n elements of the RDD.
scala> rdd6.take(2)
res60: Array[String] = Array(spark, hadoop)
scala> rdd6.take(1)
res61: Array[String] = Array(spark)
scala> rdd6.take(3)
res62: Array[String] = Array(spark, hadoop, scala)
scala> rdd6.take(10000)
res63: Array[String] = Array(spark, hadoop, scala)
④ count
Returns the number of elements in the RDD.
scala> rdd.count
res64: Long = 5
⑤ reduce(func)
Aggregates the elements of the dataset with the function func, which takes two arguments and returns one value.
scala> val rdd7 = sc.parallelize(List(1,2,3,4,5,6,7,8,9))
rdd7: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[58] at parallelize at <console>:24
scala> rdd7.reduce(_+_)
res66: Int = 45
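//Note: _-_ is not associative, so the result of reduce(_-_) depends on how the elements are split across partitions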
scala> rdd7.reduce(_-_)
res67: Int = 17
⑥ foreach(func)
Iterates over the dataset, passing each element to the function func.
scala> rdd6.foreach(println(_))
spark
hadoop
scala
Exercises
Exercise 1: Computing sales totals
A bookstore has five days of sales figures for a Spark book and a Hadoop book. Compute each book's total sales over the five days.
Book | Day 1 sales | Day 2 sales | Day 3 sales | Day 4 sales | Day 5 sales |
---|---|---|---|---|---|
Spark编程基础 | 24 | 44 | 17 | 22 | 31 |
Hadoop原理 | 16 | 33 | 21 | 22 | 18 |
//Data format
//("spark",24),("spark",44),("spark",17),("spark",22),("spark",31)
//("hadoop",16),("hadoop",33),("hadoop",21),("hadoop",22),("hadoop",18)
//Define the array
scala> var arr = Array(("spark",24),("spark",44),("spark",17),("spark",22),("spark",31),("hadoop",16),("hadoop",33),("hadoop",21),("hadoop",22),("hadoop",18))
arr: Array[(String, Int)] = Array((spark,24), (spark,44), (spark,17), (spark,22), (spark,31), (hadoop,16), (hadoop,33), (hadoop,21), (hadoop,22), (hadoop,18))
//Convert to an RDD
scala> var rdd = sc.parallelize(arr)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:26
//View the RDD contents
scala> rdd.collect
res0: Array[(String, Int)] = Array((spark,24), (spark,44), (spark,17), (spark,22), (spark,31), (hadoop,16), (hadoop,33), (hadoop,21), (hadoop,22), (hadoop,18))
//Sum the values with the same key
scala> rdd.reduceByKey(_+_).collect
res2: Array[(String, Int)] = Array((spark,138), (hadoop,110))
Exercise 2: Word count
Given the following passage:
What is Apache Spark
Apache Spark is a multi-language engine for executing data engineering data science
Apache Spark integrates with your favorite frameworks helping to scale them to thousands of machines
Count how many times each word appears in the passage.
//Create an RDD from a file
scala> val rdd = sc.textFile("file:///home/centos7/word.txt")
rdd: org.apache.spark.rdd.RDD[String] = file:///home/centos7/word.txt MapPartitionsRDD[4] at textFile at <console>:24
//View the RDD contents
scala> rdd.collect
res3: Array[String] = Array("What is Apache Spark ", Apache Spark is a multi-language engine for executing data engineering data science, Apache Spark integrates with your favorite frameworks helping to scale them to thousands of machines)
//flatMap(_.split(" ")): split each line into words
scala> rdd.flatMap(_.split(" ")).collect
res4: Array[String] = Array(What, is, Apache, Spark, Apache, Spark, is, a, multi-language, engine, for, executing, data, engineering, data, science, Apache, Spark, integrates, with, your, favorite, frameworks, helping, to, scale, them, to, thousands, of, machines)
//map(x=>(x,1)): turn each word into a key-value pair, tagging each word with a 1
scala> rdd.flatMap(_.split(" ")).map(x=>(x,1)).collect
res5: Array[(String, Int)] = Array((What,1), (is,1), (Apache,1), (Spark,1), (Apache,1), (Spark,1), (is,1), (a,1), (multi-language,1), (engine,1), (for,1), (executing,1), (data,1), (engineering,1), (data,1), (science,1), (Apache,1), (Spark,1), (integrates,1), (with,1), (your,1), (favorite,1), (frameworks,1), (helping,1), (to,1), (scale,1), (them,1), (to,1), (thousands,1), (of,1), (machines,1))
//reduceByKey(_+_): sum the values with the same key
scala> rdd.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).collect
res7: Array[(String, Int)] = Array((scale,1), (is,2), (executing,1), (Apache,3), (with,1), (data,2), (science,1), (integrates,1), (machines,1), (multi-language,1), (What,1), (them,1), (engine,1), (favorite,1), (Spark,3), (a,1), (helping,1), (to,2), (engineering,1), (frameworks,1), (of,1), (for,1), (thousands,1), (your,1))
II. Partitions
1. The action operator saveAsTextFile
saveAsTextFile("path")
Saving to the given path produces a directory, not a single file. The directory contains two kinds of files:
① part-00000: files whose names begin with part hold the data; the number of part files is determined by the RDD's partitions, with one part file generated per partition.
② _SUCCESS: an empty file (size 0) marking that the job succeeded.
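For illustration, a minimal sketch; the output path below is hypothetical and must not already exist, otherwise Spark throws an error:
//Save the word-count result from Exercise 2 to a directory
val counts = sc.textFile("file:///home/centos7/word.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
counts.saveAsTextFile("file:///home/centos7/output")   //creates a directory containing part-0000x files and _SUCCESS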
2. Partitions
//View the number of partitions
scala> rdd.partitions.size
res10: Int = 2
//Change the number of partitions
scala> val rdd2 = rdd.repartition(4)
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[25] at repartition at <console>:26
scala> rdd2.partitions.size
res14: Int = 4
3. Default number of partitions, determined by how spark-shell is launched
spark-shell --master <master-url>
master-url | Meaning |
---|---|
local | spark-shell --master local: run Spark locally with a single worker thread (no parallelism at all) |
local[*] | spark-shell --master local[*]: run Spark locally with as many worker threads as there are logical CPU cores; this is also the default when --master is omitted |
local[K] | spark-shell --master local[K]: run Spark locally with K worker threads (ideally K matches the number of CPU cores on the machine) |
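From inside spark-shell you can check what the current master gives you (the actual values depend on your machine, so no output is shown here):
sc.master                 //the master URL the shell was started with, e.g. local[*]
sc.defaultParallelism     //default number of partitions sc.parallelize uses when none is specified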
4. Specifying the number of partitions when creating an RDD
scala> val arr = Array(1,2,3,4,5,6)
arr: Array[Int] = Array(1, 2, 3, 4, 5, 6)
//Specify 4 partitions when creating the RDD
scala> val rdd = sc.parallelize(arr,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:26
scala> rdd.partitions.size
res2: Int = 4
//Specify 3 partitions when creating the RDD
scala> val rdd2 = sc.textFile("file:///home/centos7/word.txt",3)
rdd2: org.apache.spark.rdd.RDD[String] = file:///home/centos7/word.txt MapPartitionsRDD[3] at textFile at <console>:24
scala> rdd2.partitions.size
res3: Int = 3
III. Key-Value RDDs (Pair RDDs)
Definition: an RDD in which every element is a key-value pair (key, value).
1. keys
Returns all the keys of a pair RDD.
//Create an array
scala> var arr = Array(("spark",24),("spark",44),("hadoop",16))
arr: Array[(String, Int)] = Array((spark,24), (spark,44), (hadoop,16))
//Create a pair RDD
scala> val rdd = sc.parallelize(arr)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[4] at parallelize at <console>:26
//View all elements of the RDD
scala> rdd.collect
res4: Array[(String, Int)] = Array((spark,24), (spark,44), (hadoop,16))
//keys: returns all keys of the pair RDD
scala> rdd.keys.collect
res6: Array[String] = Array(spark, spark, hadoop)
2. values
values: returns all the values of a pair RDD.
//Create an array
scala> var arr = Array(("spark",24),("spark",44),("hadoop",16))
arr: Array[(String, Int)] = Array((spark,24), (spark,44), (hadoop,16))
//Create a pair RDD
scala> val rdd = sc.parallelize(arr)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[4] at parallelize at <console>:26
//View all elements of the RDD
scala> rdd.collect
res4: Array[(String, Int)] = Array((spark,24), (spark,44), (hadoop,16))
//values: returns all values of the pair RDD
scala> rdd.values.collect
res8: Array[Int] = Array(24, 44, 16)
3. sortByKey() and sortBy()
sortByKey(): sorts by key.
sortBy(): sorts by the specified field.
scala> var arr = Array(("spark",24),("javascript",44),("hadoop",16))
arr: Array[(String, Int)] = Array((spark,24), (javascript,44), (hadoop,16))
scala> var rdd = sc.parallelize(arr)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:26
scala> rdd.collect
res14: Array[(String, Int)] = Array((spark,24), (javascript,44), (hadoop,16))
//Sort by key in lexicographic order (descending)
scala> rdd.sortByKey(false).collect
res15: Array[(String, Int)] = Array((spark,24), (javascript,44), (hadoop,16))
//Sort by key in lexicographic order (ascending)
scala> rdd.sortByKey().collect
res16: Array[(String, Int)] = Array((hadoop,16), (javascript,44), (spark,24))
//Sort by the value of each pair (ascending by default)
scala> rdd.sortBy(_._2).collect
res24: Array[(String, Int)] = Array((hadoop,16), (spark,24), (javascript,44))
//Sort by the key of each pair (ascending by default)
scala> rdd.sortBy(_._1).collect
res25: Array[(String, Int)] = Array((hadoop,16), (javascript,44), (spark,24))
//Sort by the value of each pair (descending)
scala> rdd.sortBy(_._2,false).collect
res26: Array[(String, Int)] = Array((javascript,44), (spark,24), (hadoop,16))
4. mapValues(func)
Applies the function func to the value of each key-value pair, leaving the key unchanged.
scala> var arr = Array(("spark",24),("javascript",44),("hadoop",16))
arr: Array[(String, Int)] = Array((spark,24), (javascript,44), (hadoop,16))
scala> var rdd = sc.parallelize(arr)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:26
scala> rdd.collect
res14: Array[(String, Int)] = Array((spark,24), (javascript,44), (hadoop,16))
//Multiply every value by 10
scala> rdd.mapValues(_*10).collect
res28: Array[(String, Int)] = Array((spark,240), (javascript,440), (hadoop,160))
//Add 1 to every value
scala> rdd.mapValues(_+1).collect
res30: Array[(String, Int)] = Array((spark,25), (javascript,45), (hadoop,17))
5. join
join performs an inner join: given two input datasets of types (K, V1) and (K, V2), only keys that appear in both datasets are output, producing a dataset of type (K, (V1, V2)).
scala> var arr = Array(("spark",24),("javascript",44),("hadoop",16))
arr: Array[(String, Int)] = Array((spark,24), (javascript,44), (hadoop,16))
scala> var rdd = sc.parallelize(arr)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[37] at parallelize at <console>:26
scala> var arr2 = Array(("spark",24),("hadoop",16))
arr2: Array[(String, Int)] = Array((spark,24), (hadoop,16))
scala> var rdd2 = sc.parallelize(arr2)
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at parallelize at <console>:26
scala> rdd.collect
res32: Array[(String, Int)] = Array((spark,24), (javascript,44), (hadoop,16))
scala> rdd2.collect
res33: Array[(String, Int)] = Array((spark,24), (hadoop,16))
//Join rdd and rdd2; only keys present in both RDDs are kept
scala> rdd.join(rdd2)
res34: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[41] at join at <console>:33
scala> rdd.join(rdd2).collect
res35: Array[(String, (Int, Int))] = Array((spark,(24,24)), (hadoop,(16,16)))