文章目录
- Spark 行动算子
- 1、reduce
- 2、collect
- 3、count
- 4、first
- 5、take
- 6、takeOrdered
- 7、代码示例
- 8、aggregate
- 9、fold
- 10、countByValue & countByKey (wordcount重点)
Spark 行动算子
所谓的行动算子,其实就是触发作业执行的方法,之前的转换算子是不能直接触发执行的,形成了一个个新的RDD。行动算子比较少,就那么几个。
注意:
因为行动算子是直接触发执行的,并不是返回RDD
,所以不能使用collect()
方法,得使用println()
打印到控制台
1、reduce
聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。
package com.atguigu.bigdata.spark.core.wc.action
import org.apache.spark.{SparkConf, SparkContext}
// RDD 行动算子 reduce:聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。
class Spark02_RDD_reduce {
}
object Spark02_RDD_reduce{
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD行动算子")
val context = new SparkContext(conf)
val rdd = context.makeRDD(List(1, 2, 3, 4))
val reduceRDD = rdd.reduce(_ + _) //reduce 先进行分区内聚合,再分区间聚合
println(reduceRDD)
context.stop()
}
}
2、collect
方法会将不同分区的数据按照分区顺序采集到Driver端内存中,形成数组
3、count
统计数组源中数据的个数
4、first
获取数据源中的第一个元素
5、take
返回一个由RDD的前n个元素组成的数组
6、takeOrdered
返回该RDD排序后的前n个元素组成的数据,这个方法很不错,还可以直接排序
7、代码示例
前面几个方法比较简单,所以把他们放一起了。
package com.atguigu.bigdata.spark.core.wc.action
import org.apache.spark.{SparkConf, SparkContext}
// RDD 行动算子 reduce:聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。
class Spark02_RDD_reduce {
}
object Spark02_RDD_reduce{
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD行动算子")
val context = new SparkContext(conf)
val rdd = context.makeRDD(List(1, 2, 3, 5,4))
//reduce:先进行分区内聚合,再分区间聚合
val reduceRDD = rdd.reduce(_ + _)
println(reduceRDD)
//collect:方法会将不同分区的数据按照分区顺序采集到Driver端内存中,形成数组
// val ints = rdd.collect()
// println(ints)
// count:数组源中数据的个数
val count = rdd.count()
println(count)
// first:获取数据源中的第一个元素
val i = rdd.first()
println(i)
//take:返回一个由RDD的前n个元素组成的数组
val ints = rdd.take(3)
println(ints.mkString(","))
//takeOrdered:数据排序后取前n个数据
val ints1 = rdd.takeOrdered(4)
println(ints1.mkString(","))
context.stop()
}
}
8、aggregate
分区的数据通过初始值
和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
val result = rdd.aggregate(0)(_ + _,_+ _)
//初始值,分区内计算,分区间计算。
注意:
aggregate
和 aggregateByKey
的区别:
aggregateByKey
初始值只参与分区内计算
aggregate
初始值参与内区内计算,并且和参与分区间计算
package com.atguigu.bigdata.spark.core.wc.action
import org.apache.spark.{SparkConf, SparkContext}
//aggregate:分区的数据通过`初始值`和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
class Spark03_RDD_aggregate {
}
object Spark03_RDD_aggregate{
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD行动算子")
val context = new SparkContext(conf)
val rdd = context.makeRDD(List(1, 2, 3, 4))
//TODO -行动算子 aggregate
//aggregateByKey 是转换算子 aggregate 是行动算子
// aggregateByKey:初始值只会参与分区内计算
// aggregate : 初始值会参与内区内计算,并且和参与分区间计算
val i = rdd.aggregate(10)(_ + _, _ + _)
println(i)
context.stop()
}
}
9、fold
aggregate 的简化版本,只是分区内操作和分区间操作是一样的,可以减少一些。
val result = rdd.fold(0)(_ + _)
//前面是初始值,后面是分区内和分区间的计算规则
10、countByValue & countByKey (wordcount重点)
这个方法特别特别好用,做wordcount直接这一个方法就出来了。
countByKey
需要键值对类型的集合,根据key来统计。countByValue
直接普通的集合就可以了,根据value,这里的value不是键值对里面那个value,是单值的意思。
package com.atguigu.bigdata.spark.core.wc.action
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
class Spark04_countByKey {
}
object Spark04_countByKey{
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD行动算子")
val context = new SparkContext(conf)
val rdd = context.makeRDD(List(1, 2, 3, 4,2,2,3))
val rdd2: RDD[String] = context.makeRDD(List("word", "word", "scala", "spark"))
val rdd3 = context.makeRDD(List(("a",1),("a",2),("b",1),("c",2))) //统计的是出现的次数不是value
//countByValue() 根据单值来统计
val intToLong: collection.Map[Int, Long] = rdd.countByValue()
val stringToLong: collection.Map[String, Long] = rdd2.countByValue()
// countByKey() 根据key来统计
val stringToLong1 = rdd3.countByKey()
//println(intToLong) //返回的是一个Map ,Map可以直接输出,还默认就调用了collect方法
println(stringToLong)
println(stringToLong1)
context.stop()
}
}