1、reduce(聚合)
2、collect(采集)
3、count (统计)
4、first
5、take
6、takeOrdered
7、aggregate
8、fold
9、countByKey
10、countByValue
11、save 算子
12、foreach
算子总结
所谓行动算子其实就是触发作业执行的方法,底层代码调用的是环境对象runJob方法
1、reduce(聚合)
def reduce(f: (T, T) => T): T
聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据
def main(args: Array[String]): Unit = {
//准备环境
//"*"代表线程的核数 应用程序名称"RDD"
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4,5,6))
val r: Int = rdd.reduce(_+_)
println(r)
//关闭环境
sc.stop()
2、collect(采集)
在驱动程序中,以数组 Array 的形式返回数据集的所有元素
def collect(): Array[T]
方法会将不同分区的数据安装分区顺序采集到Driver端内存中,形成数组
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 收集数据到 Driver
rdd.collect().foreach(println)
3、count (统计)
def count(): Long
返回 RDD 中元素的个数
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 返回 RDD 中元素的个数
val countResult: Long = rdd.count()
4、first
def first(): T
返回 RDD 中的第一个元素
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 返回 RDD 中元素的个数
val firstResult: Int = rdd.first()
println(firstResult)
5、take
def take(num: Int): Array[T]
返回一个由 RDD 的前 n 个元素组成的数组
def main(args: Array[String]): Unit = {
//准备环境
//"*"代表线程的核数 应用程序名称"RDD"
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4,5,6))
//获取数据 take(参数为获取数据的个数)
val tRDD: Array[Int] = rdd.take(3)
println(tRDD.mkString(" "))
//关闭环境
sc.stop()
}
6、takeOrdered
是在take的基础上先对数据进行排序然后再获取第n个数据
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
返回该 RDD 排序后的前 n 个元素组成的数组
val rdd: RDD[Int] = sc.makeRDD(List(1,3,2,4))
// 返回 RDD 中元素的个数
val result: Array[Int] = rdd.takeOrdered(2)
7、aggregate
aggregate 于 aggregateBykey的区别:
- aggregate:初始值只会参与分区内的计算
- aggregateBykey:初始值会参与分区内和分区外的计算
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
val rdd = sc.makeRDD(List(1,2,3,4,5,6))
val a: Int = rdd.aggregate(0)(_+_,_+_)
println(a)
8、fold
当分区内外计算操作相同时使用。折叠操作,aggregate 的简化版操作
def fold(zeroValue: T)(op: (T, T) => T): T
val rdd = sc.makeRDD(List(1,2,3,4,5,6))
val a: Int = rdd.fold(0)(_+_)
println(a)
9、countByKey
统计每种 key 出现的个数
def countByKey(): Map[K, Long]
val rdd = sc.makeRDD(List(("a",1),("b",2),("a",2),("a",1),("b",4)))
val ck: collection.Map[String, Long] = rdd.countByKey()
println(ck)
10、countByValue
统计元素值出现的次数
val rdd = sc.makeRDD(List(1,2,3,4,5,6))
val cv: collection.Map[Int, Long] = rdd.countByValue()
println(cv)
11、save 算子
将数据保存到不同格式的文件中
1、saveAsTextFile
保存成 Text 文件
def saveAsTextFile(path: String): Unit
rdd.saveAsTextFile("output")
2、saveAsObjectFile
序列化成对象保存到文件
def saveAsObjectFile(path: String): Unit
rdd.saveAsObjectFile("output1")
3、saveAsSequenceFile
保存成 Sequencefile 文件,要求数据格式必须是K-V类型
def saveAsSequenceFile(
path: String,
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
rdd.map((_,1)).saveAsSequenceFile("output2")
12、foreach
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
分布式遍历 RDD 中的每一个元素,调用指定函数
def main(args: Array[String]): Unit = {
//准备环境
//"*"代表线程的核数 应用程序名称"RDD"
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4,5,6,7))
// 收集后打印
//下面foreach其实是Driver端内存集合的遍历方法
rdd.map(num=>num).collect().foreach(println)
println("****************")
// 分布式打印
//下面foreach其实是Executor端内存数据打印
rdd.foreach(println)
//关闭环境
sc.stop()
}