RDD 算子就是 RDD 的方法
一、转换算子
根据数据处理方式的不同可以分为单 Value 类型、双 Value 类型和 Key-Value 类型
1. map
/**
单 Value 类型算子
函数签名:def map[U: ClassTag](f: T => U): RDD[U]
功能:将处理的数据逐条进行映射转换,可以是类型的转换,也可以是值的转换
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
// 案例 1:将集合中的每个元素 * 2
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
def mapFunc(num: Int): Int = {
num * 2
}
val mapRdd = rdd.map(mapFunc)
mapRdd.collect().foreach(println)
// 使用匿名函数和至简原则传参
val mapRdd2 = rdd.map(_ * 2)
mapRdd2.collect().foreach(println)
// 案例2:从服务器日志数据 apache.log 中获取用户请求 URL 资源路径
// apache.log 内容格式:ip - - 时间戳 时区 请求方式 URL资源路径
val rdd2 = sc.textFile("data/apache.log")
val mapRdd3 = rdd2.map(
line => {
val dataArr = line.split(" ")
dataArr(6)
}
)
mapRdd3.collect().foreach(println)
// 案例3:map 的并行计算
val rdd3 = sc.makeRDD(List(1,2,3,4), 1)
rdd3.map(num => {
println(">>>> " + num)
num
}).map(num => {
println("@@@@ " + num)
num
}).collect()
// 结论:同一个分区内的数据 map 执行是有序的,即前一个数据执行完全部操作后再执行下一个
val rdd4 = sc.makeRDD(List(1,2,3,4), 2)
rdd4.map(num => {
println(">>>> " + num)
num
}).map(num => {
println("@@@@ " + num)
num
}).collect()
// 结论:不同分区内的数据 map 执行是无序
}
}
2. mapPartitions
/**
单 Value 类型算子
函数签名:def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
功能:将待处理的数据以分区为单位发送到计算节点进行处理,处理是指可以进行任意的处理,即使是过滤数据
缺点:该算子会将分区的整个数据加载到内存中并引用,由于存在引用,所以数据处理完不会被释放掉,在内存较小且数据量较大的情况下会造成内存溢出
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
// 案例 1:将集合中的每个元素 * 2
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4), 2)
val mpRdd = rdd.mapPartitions(iter => {
println(">>>>>") // 执行次数与分区数一致
iter.map(_ * 2)
})
mpRdd.collect().foreach(println)
// 案例2:获取每个数据分区的最大值
val mpRdd2 = rdd.mapPartitions(iter => {
val maxValue = iter.max()
List(maxValue).iterator
})
mpRdd2.collect().foreach(println)
}
}
3. map VS mapPartitions
-
数据处理角度:map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作
-
功能的角度:map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。mapPartitions 算子是传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据
-
性能的角度:map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,可能会导致内存溢出,所以在内存有限的情况下,推荐使用 map 操作
4. mapPartitionsWithIndex
/**
单 Value 类型算子
函数签名:def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U],preservesPartitioning: Boolean = false): RDD[U]
功能:将待处理的数据以分区为单位发送到计算节点进行处理,在处理时同时可以获取当前分区索引
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
// 案例 1:输出第二个分区中集合的数据
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4), 2)
rdd.mapPartitionsWithIndex((index, iter) => {
if(index == 1) iter else Nil.iterator
}).collect().foreach(println)
// 案例2:将集合中的元素及其分区显示输出
val rdd2 = sc.makeRDD(List(1,2,3,4))
rdd2.mapPartitionsWithIndex((index, iter) => {
iter.map(num => (index, num))
}).collect().foreach(println)
}
}
5. flatMap
/**
单 Value 类型算子
函数签名:def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
功能:将处理的数据进行扁平化后再进行映射处理,也称之为扁平映射
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
// 案例 1:将 List(List(1,2),List(3,4))进行扁平化操作
val rdd: RDD[List[Int]] = sc.makeRDD(List(
List(1,2), List(3,4)
))
rdd.flatMap(list => list).collect().foreach(println)
// 案例 2:将 List(List(1,2),3,List(4,5))进行扁平化操作
val rdd2 = sc.makeRDD(List(
List(1,2), 3, List(4,5)
))
rdd2.flatMap(data => {
case list: List[_] => list
case n: Int => List(n)
case _ =>
}).collect().foreach(println)
}
}
6. glom
/**
单 Value 类型算子
函数签名:def glom(): RDD[Array[T]]
功能:将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变(分区数和数据所在分区不变)
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
// 案例 1
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)
val glomRdd: RDD[Array[Int]] = rdd.glom()
val glomArr: Array[Array[Int]] = glomRdd.collect()
glomArr.foreach(println(_.mkString(",")))
// 案例 2:计算所有分区最大值求和(分区内取最大值,分区间最大值求和)
val maxSum: Int = rdd.glom().map(_.max).collect().sum()
println(maxSum)
}
}
7. groupBy
/**
单 Value 类型算子
函数签名:def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
功能:将数据根据指定的规则进行分组,分区默认不变,但是数据会被打乱重新组合,这样的操作称为 shuffle。极限情况下,数据可能被分在同一个分区中,一个组的数据在一个分区中,但是并不是说一个分区中只有一个组
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
// 案例 1:按奇偶数分组
val rdd = sc.makeRDD(List(1,2,3,4), 2)
def groupFunc(num: Int): Int = { // 接收集合元素返回分组 key
num % 2
}
val groupRdd: RDD[(Int, Iterable[Int])] = rdd.groupBy(groupFunc)
groupRdd.collect().foreach(println)
// 案例 2:按首字母分组
val rdd2 = sc.makeRDD(List("Hello", "Spark", "Scala", "Hadoop"), 2)
rdd2.groupBy(_.charAt(0)).collect().foreach(println)
// 案例 3:从服务器日志数据 apache.log 中获取每个时间段访问量
// apache.log 内容格式:ip - - 时间戳 时区 请求方式 URL资源路径
val rdd3 = sc.textFile("data/apache.log")
rdd3.map(line => {
val time: String = line.split(" ")(3)
val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
val date: Date = sdf.parse(time)
val sdf1 = new SimpleDateFormat("HH")
val hour: String = sdf1.format(date)
(hour, 1)
}).groupBy(_._1).map(data => {
(_._1, _._2.size)
}).collect().foreach(println)
}
}
8. filter
/**
单 Value 类型算子
函数签名:def filter(f: T => Boolean): RDD[T]
功能:将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
// 案例 1:过滤集合中的奇数
val rdd = sc.makeRDD(List(1,2,3,4), 2)
rdd.filter(_ % 2 == 0).collect().foreach(println)
// 案例 2:从服务器日志数据 apache.log 中获取 2015 年 5 月 17 日的请求路径
val rdd2 = sc.textFile("data/apache.log")
rdd2.filter(data => {
val date: String = data.split(" ")(3)
date.startsWith("17/05/2015")
}).collect().foreach(println)
}
}
9. sample
/**
单 Value 类型算子
函数签名:def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong
): RDD[T]
功能:根据指定的规则从数据集中抽取数据,一般用于在分区数据倾斜时抽取数据判断原因
参数说明:
1.第一个参数表示数据抽取完后是否放回数据源(true:放回 false:丢弃)
2.第二个参数:
2.1 在数据抽取不放回时:表示每个数据被抽取的概率,指定了第三个参数有基准值的概念,每 次抽取的数据都是一样的
2.2 在数据抽取放回时:表示每个数据可能被抽取的次数,但最终可能小于或大于这个值
3.第三个参数表示抽取数据时随机算法的种子,默认值是系统时间
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10))
// 抽取不放回
// 底层使用伯努利算法,又叫 0、1 分布。例如扔硬币,要么正面,要么反面。
// 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要
val arr1 = rdd.sample(false, 0.4, 1).collect().mkString(",")
println(arr1)
// 抽取放回
// 底层使用泊松算法
val arr2 = rdd.sample(true, 2).collect().mkString(",")
println(arr2)
}
}
10. distinct
/**
单 Value 类型算子
函数签名:
def distinct()(implicit ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
功能:将数据集中重复的数据去重
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4,1,2,3,4))
rdd.distinct().collect().foreach(println)
/**
List().distinct 的原理:底层使用 HashSet 处理去重
rdd.distinct() 的原理:底层实现为 map(x => (x, null)).reduceByKey((x, _) => x).map(_._1)
1-> 1,2,3,4,1,2,3,4 => (1,null),(2,null),(3,null),(4,null),(1,null),(2,null),(3,null),(4,null)
2-> (1,null) (1,null) => (1,null),.....
3-> (1,null) => 1
*/
}
}
11. coalesce
/**
单 Value 类型算子
函数签名:def coalesce(
numPartitions: Int,
shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty
)(implicit ord: Ordering[T] = null): RDD[T]
功能:根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率;当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4,5,6), 3)
rdd.saveAsTextFile("output") // [1,2],[3,4],[5,6]
// 不 shuffle,默认
rdd.coalesce(2).saveAsTextFile("output1") // [1,2],[3,4,5,6]
// shuffle
rdd.coalesce(2, true).saveAsTextFile("output2") // [1,4,5],[2,3,6]
// 可以扩大分区,必须设置 shuffle 为 true,否则不起作用
val rdd2 = sc.makeRDD(List(1,2,3,4,5,6), 2)
// rdd2.coalesce(3).saveAsTextFile("output3") // 不起作用
rdd2.coalesce(3, true).saveAsTextFile("output3")
}
}
12. repartition
/**
单 Value 类型算子
函数签名:def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
功能:扩大或缩减分区,其底层是调用 coalesce 方法,且参数 shuffle 的默认设置为 true
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4,5,6), 2)
rdd.repartition(3).saveAsTextFile("output") // 底层调用 coalesce(3,true)
}
}
13. sortBy
/**
单 Value 类型算子
函数签名:def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length
)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
功能:用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(6,3,5,1,2,4), 2)
rdd.sortBy(_).collect().foreach(println) // 默认升序
val rdd2 = sc.makeRDD(List(("1",1),("1",2),("11",3),("2",2)), 2)
// 按照 tuple 的第一个元素降序
rdd2.sortBy(_._1, false).collect().foreach(println)
// 按照 tuple 升序,先比较第一个元素,相同则比较第二个元素,依次类推
rdd2.sortBy(_, true).collect().foreach(println)
}
}
14. intersection
/**
双 Value 类型算子
函数签名:def intersection(other: RDD[T]): RDD[T]
功能:对源 RDD 和参数 RDD 求交集后返回一个新的 RDD
注意事项:三个 RDD 的数据类型必须一致
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.makeRDD(List(1,2,3,4))
val rdd2 = sc.makeRDD(List(3,4,5,6))
val rdd3: RDD[Int] = rdd1.intersection(rdd2)
rdd3.collect().foreach(println) // 3, 4
}
}
15. union
/**
双 Value 类型算子
函数签名:def union(other: RDD[T]): RDD[T]
功能:对源 RDD 和参数 RDD 求并集后返回一个新的 RDD
注意事项:三个 RDD 的数据类型必须一致
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.makeRDD(List(1,2,3,4))
val rdd2 = sc.makeRDD(List(3,4,5,6))
val rdd3: RDD[Int] = rdd1.union(rdd2)
rdd3.collect().foreach(println) // 1,2,3,4,3,4,5,6
}
}
16. subtract
/**
双 Value 类型算子
函数签名:def subtract(other: RDD[T]): RDD[T]
功能:以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来返回一个新的 RDD
注意事项:三个 RDD 的数据类型必须一致
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.makeRDD(List(1,2,3,4))
val rdd2 = sc.makeRDD(List(3,4,5,6))
val rdd3: RDD[Int] = rdd1.subtract(rdd2)
rdd3.collect().foreach(println) // 1,2
val rdd4: RDD[Int] = rdd2.subtract(rdd1)
rdd4.collect().foreach(println) // 5,6
}
}
17. zip
/**
双 Value 类型算子
函数签名:def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
功能:将两个 RDD 中的元素,以键值对的形式进行合并返回一个新的 RDD。其中,键值对中的 Key 为第 1 个 RDD 中的元素,Value 为第 2 个 RDD 中的相同位置的元素
注意事项:
1.两个 RDD 的数据类型可以不一致
2.两个 RDD 的分区数量必须保持一致
3.两个 RDD 的每个分区中的数据个数必须保持一致
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.makeRDD(List(1,2,3,4))
val rdd2 = sc.makeRDD(List(3,4,5,6))
val rdd3: RDD[(Int,Int)] = rdd1.zip(rdd2)
rdd3.collect().foreach(println) // (1,3),(2,4),(3,5),(4,6)
val rdd4: RDD[(Int,Int)] = rdd2.zip(rdd1)
rdd4.collect().foreach(println) // (3,1),(4,2),(5,3),(6,4)
// 错误案例
val rdd5 = sc.makeRDD(List(1,2,3,4), 2)
val rdd6 = sc.makeRDD(List(3,4,5,6), 3)
rdd5.zip(rdd6) // error
val rdd7 = sc.makeRDD(List(1,2,3,4,5,6), 2)
val rdd8 = sc.makeRDD(List(3,4,5,6), 2)
rdd7.zip(rdd8) // error
}
}
18. partitionBy
/**
Key-Value 类型算子
函数签名:def partitionBy(partitioner: Partitioner): RDD[(K, V)]
功能:将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner
注意事项:
1.partitionBy 方法不是 RDD 中的方法,而是 PairRDDFunctions 类的方法
2.RDD 的伴生对象中有 RDD 转换为 PairRDDFunctions 的隐式函数,所以 RDD 可以调用 partitionBy 方法
3.partitionBy 不会改变分区的数量(区别于 coalesce 和 repartition),只是将数据所在的分区位置进行重新分配
4.当 RDD 调用 partitionBy 指定的 partitioner 类型和分区数与该 RDD 的自身的一致,则不会执行
5.Spark 所有的分区器都继承 Partitioner 抽象类,也可以通过它自定义分区器
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4), 2)
// rdd.partitionBy() // error,数据源不是 Key-Value 类型
val mapRdd = rdd.map((_, 1))
// 二次编译通过隐式函数将 mapRdd 转换成 PairRDDFunctions
mapRdd.partitionBy(new HashPartitioner(2)).saveAsTextFile("output")
// 数据分布从 【1,2】【3,4】 变成了 【(1,1),(3,1)】【(2,1),(4,1)】
// HashPartitioner 底层分区原理是获取数据的 key 的 hashCode 值与分区数取模得到分区号
}
}
19. reduceByKey
/**
Key-Value 类型算子
函数签名:
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
功能:将数据按照相同的 Key 对 Value 进行聚合运算
注意事项:
1.reduceByKey 每次是对相同 key 的值进行两两聚合直到得到最终的一个结果
2.如果数据源中的某个 key 只有一份,则不会参与 reduceByKey 运算
3.reduceByKey 的分区内和分区间的聚合运算规则是一致的
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(
("a",1),("a",2),("a",3),("b",4)
))
val reduceRdd: RDD[(String, Int)] = rdd.reduceByKey((v1, v2) => {
println(s"v1=$v1 v2=$v2")
v1 + v2
})
reduceRdd.collect().foreach(println)
}
}
20. groupByKey
/**
Key-Value 类型算子
函数签名:
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
功能:将数据源的数据根据 key 对 value 进行分组
注意事项:
1.groupByKey 不需要单独指定 key,返回的是 (key, Iterable(value)) 的二元组
2.groupBy 可以指定进行分组的 key,返回的是 (key, Iterable((key,value))) 的二元组
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(
("a",1),("a",2),("a",3),("b",4)
))
val groupRdd: RDD[(String, Iterable[Int])] = rdd.groupByKey()
groupRdd.collect().foreach(println)
// (a,CompactBuffer(1,2,3)), (b,CompactBuffer(4))
}
}
21. reduceByKey VS groupByKey
-
从性能的角度:
-
reduceByKey 和 groupByKey 由于需要打乱重组数据,所以都存在 shuffle 的操作,在 Spark 中 shuffle 操作必须落盘,不能在内存中等待数据,会造成内存溢出
-
reduceByKey 会在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)操作,这样会减少落盘的数据量,所以性能比较高
-
groupByKey 只是进行分组,不会进行预聚合减少数据量,所以性能比较低
-
-
从功能的角度:reduceByKey 包含分组和聚合的两个操作。groupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合,则使用 groupByKey
22. aggregateByKey
/**
Key-Value 类型算子
函数签名:def aggregateByKey[U: ClassTag](zeroValue: U)(
seqOp: (U, V) => U,
combOp: (U, U) => U
): RDD[(K, U)]
功能:将数据源的数据根据 key 按不同的规则进行分区内计算和分区间计算
解释:
1.aggregateByKey 方法存在函数柯里化,拥有两个参数列表
2.第一个参数列表需要传递一个参数,表示初始值,用于分区内和第一个元素 key 的 value 进行计算
3.第二个参数列表需要传递两个函数类型参数,第一个函数类型参数表示分区内运算规则,第二个函数类型参数表示分区间运算规则
4.aggregateByKey 方法返回的 value 类型取决于初始值的类型
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
// 案例 1:统计每个分区内的最大值,然后求不同分区最大值的和
val rdd = sc.makeRDD(List(
("a",1),("a",2),("a",3),("a",4)
), 2)
/*
【("a",1),("a",2)】【("a",3),("a",4)】
【("a",2)】【("a",4)】
("a", 6)
*/
rdd.aggregateByKey(0)(
(x, y) => math.max(x, y),
(x, y) => x + y
).collect().foreach(println)
// 案例 2:统计每个 key 的 value 的平均值(value和/次数)
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(
("a",1),("a",2),("b",3),
("b",4),("b",5),("a",6)
), 2)
// 预期结果:(a, (1+2+6)/(1+1+1)=3),(b, (3+4+5)/(1+1+1)=4)
val aggRdd: RDD[(String, (Int, Int))] = rdd1.aggregateByKey((0, 0))(
(t, v) => (t._1 + v, t._2 + 1),
(t1, t2) => (t1._1 + t2._1, t1._2 + t2._2)
)
// aggRdd.map((_._1, _._2._1/_._2._2)).collect().foreach(println)
val resultRdd: RDD[(String, Int)] = aggRdd.mapValues(_._1/_._2)
resultRdd.collect().foreach(println)
}
}
23. foldByKey
/**
Key-Value 类型算子
函数签名:def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
功能:当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(
("a",1),("a",2),("b",3),
("b",4),("b",5),("a",6)
), 2)
rdd.aggregateByKey(0)(_ + _, _ + _).collect().foreach(println)
rdd.foldByKey(0)(_ + _).collect().foreach(println)
}
}
24. combineByKey
/**
Key-Value 类型算子
函数签名:def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C
): RDD[(K, C)]
功能:最通用的对 key-value 型 rdd 进行聚集操作的聚集函数
解释:
1.第一个参数表示对相同 key 的第一个 value 进行转换的操作
2.第二个参数表示分区内运算规则
3.第三个参数表示分区间运算规则
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
// 案例:统计每个 key 的 value 的平均值(value和/次数)
val rdd: RDD[(String, Int)] = sc.makeRDD(List(
("a",1),("a",2),("b",3),
("b",4),("b",5),("a",6)
), 2)
// 预期结果:(a, (1+2+6)/(1+1+1)=3),(b, (3+4+5)/(1+1+1)=4)
val aggRdd: RDD[(String, (Int, Int))] = rdd.combineByKey(
v => (v, 1),
(t: (Int, Int), v) => (t._1 + v, t._2 + 1),
(t1: (Int, Int), t2: (Int, Int)) => (t1._1 + t2._1, t1._2 + t2._2)
)
// aggRdd.map((_._1, _._2._1/_._2._2)).collect().foreach(println)
val resultRdd: RDD[(String, Int)] = aggRdd.mapValues(_._1/_._2)
resultRdd.collect().foreach(println)
}
}
25. reduceByKey/foldByKey/aggregateByKey/combineByKey 的区别
- 四个方法底层最终调用的都是
combineByKeyWithClassTag[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)]
方法 - reduceByKey:相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同
- foldByKey:相同 key 的第一个数据和初始值按分区内规则进行计算,分区内和分区间计算规则相同
- aggregateByKey:相同 key 的第一个数据和初始值按分区内规则进行计算,分区内和分区间计算规则可以不相同
- combineByKey:相同 key 的第一个数据可以进行结构转换,分区内和分区间计算规则可以不相同
26. join
/**
Key-Value 类型算子
函数签名:def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
功能:在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的 (K,(V,W)) 的 RDD,类似于 SQL 的 join 连接
说明:
1.如果两个数据集中的 key 没有匹配上,则这部分数据会被丢弃
2.如果两个数据集中存在多个相同的 key,则每个 key 会与其他 key 依次匹配,产生笛卡尔积,造成数据量几何性增长,造成内存溢出
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.makeRDD(List(
("a", 2), ("b", 2), ("c", 3), ("d", 4)
), 2)
val rdd2 = sc.makeRDD(List(
("a", 1), ("a", 7), ("c", 5), ("e", 6)
), 2)
val joinRdd: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
joinRdd.collect().foreach(println)
}
}
27. leftOuterJoin/rightOuterJoin
/**
Key-Value 类型算子
函数签名:
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
功能:类似于 SQL 语句的左外连接和右外连接
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.makeRDD(List(
("a", 2), ("b", 2), ("c", 3), ("d", 4)
), 2)
val rdd2 = sc.makeRDD(List(
("a", 1), ("b", 7), ("c", 5), ("e", 6)
), 2)
val leftJoinRdd: RDD[(String, (Int, Option(Int))] = rdd1.leftOuterJoin(rdd2)
leftJoinRdd.collect().foreach(println)
val rightJoinRdd: RDD[(String, (Option[Int], Int))] = rdd1.rightOuterJoin(rdd2)
rightJoinRdd.collect().foreach(println)
}
}
28. cogroup
/**
Key-Value 类型算子
函数签名:def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
功能:在类型为(K,V)和 1~3 个(K,W)的 RDD 上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的 RDD,相同的 key 连接
说明:cogroup -> connect + group,即分组后再连接
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.makeRDD(List(
("a", 1), ("b", 2), ("c", 3)
), 2)
val rdd2 = sc.makeRDD(List(
("a", 4), ("b", 5), ("c", 6), ("c", 7),("d", 8)
), 2)
val cgRdd: RDD[(String, (Iterable[Int], Iterable[Int])] = rdd1.cogroup(rdd2)
cgRdd.collect().foreach(println)
}
}
29. sortByKey
/**
Key-Value 类型算子
函数签名:def sortByKey(
ascending: Boolean = true,
numPartitions: Int = self.partitions.length
): RDD[(K, V)]
功能:在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序
的 RDD
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(("1",1),("11",3),("2",2)), 2)
// rdd.sortBy(_._1, false).collect().foreach(println) // 降序
rdd.sortByKey(false).collect().foreach(println) // 降序,默认是升序
}
}
30. 案例实操
-
需求:统计出每一个省份每个广告被点击数量排行的 Top3
agent.log 文件格式: 时间戳 省份 城市 用户 广告 1516609143867 6 7 64 16 1516609143869 9 4 75 18 1516609143869 1 7 87 12
-
实现:
object TestAdvClick { def main(args: Array[String]): Unit = { // 1.创建 spark 连接 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Adv") val sc = new SparkContext(sparkConf) // 2.读取文件数据 val dataRdd: RDD[String] = sc.textFile("data/agent.log") // 3.将一行的字符串数据转换为 ((省份, 广告), 1) 元组 val mapRdd: RDD[((String, String), Int)] = dataRdd.map(line => { val datas = line.split(" ") ((datas(1), datas(4)), 1) }) // 4.分组聚合求出每个省份每个广告的点击次数 val reduceRdd: RDD[((String, String), Int)] = mapRdd.reduceByKey(_ + _) // 5.将数据结构由 ((省份, 广告), sum) 转换为 (省份, (广告, sum)) val newMapRdd: RDD[(String, (String, Int))] = reduceRdd.map { case ((province, adv), sum) => (province, (adv, sum)) } // 6.按照省份分组 val groupRdd: RDD[(String, Iterable[(String, Int)])] = newMapRdd.groupByKey() // 7.对每个省份的广告点击次数进行降序并取 top 3 val resultRdd: RDD[(String, Iterable[(String, Int)])] = groupRdd.mapValues(iter => { iter.toList().sortBy(_._2)(Ordering.Int.reverse).take(3) }) // 8.输出结果 resultRdd.collect().foreach(println) } }
二、行动算子
只有行动算子会触发作业的执行
1. reduce
/**
函数签名:def reduce(f: (T, T) => T): T
功能:按照规则聚合 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4), 2)
val result: Int = rdd.reduce(_ + _)
println(result)
sc.stop()
}
}
2. collect
/**
函数签名:def collect(): Array[T]
功能:将不同分区中的数据按照分区顺序采集到的 Driver 端的内存中形成数组
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4), 2)
val arr: Array[Int] = rdd.collect()
println(arr.mkString(","))
sc.stop()
}
}
3. count
/**
函数签名:def count(): Long
功能:返回 RDD 中元素的个数
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4), 2)
val num: Long = rdd.count()
println(num)
sc.stop()
}
}
4. first
/**
函数签名:def first(): T
功能:返回 RDD 中的第一个元素
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4), 2)
val elem: Int = rdd.first()
println(elem)
sc.stop()
}
}
5. take
/**
函数签名:def take(num: Int): Array[T]
功能:返回一个由 RDD 的前 n 个元素组成的数组
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4), 2)
val takeArr: Array[Int] = rdd.take(3)
println(takeArr.mkString(","))
sc.stop()
}
}
6. takeOrdered
/**
函数签名:def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
功能:返回该 RDD 排序后的前 n 个元素组成的数组,默认是升序
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(4,2,3,1), 2)
val toArr: Array[Int] = rdd.takeOrdered(3)(Ordering[Int].reverse)
println(toArr.mkString(","))
sc.stop()
}
}
7. aggregate
/**
函数签名:def aggregate[U: ClassTag](zeroValue: U)(
seqOp: (U, T) => U,
combOp: (U, U) => U
): U
功能:每个分区的数据通过初始值和分区内的数据进行聚合,然后初始值再和所有分区间的数据聚合
对比:
1.aggregateByKey 只能用于 key-value 类型数据,aggregate 无限制
2.aggregateByKey 的初始值只参与分区内的计算,而 aggregate 的初始值会同时参与分区内和分区间的计算
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4), 2)
val result: Int = rdd.aggregate(10)(_ + _, _ + _)
println(result) // 40
sc.stop()
}
}
8. fold
/**
函数签名:def fold(zeroValue: T)(op: (T, T) => T): T
功能:分区内和分区间的计算规则一致时替代 aggregate
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4), 2)
val result: Int = rdd.fold(10)(_ + _)
println(result)
sc.stop()
}
}
9. countByKey/countByValue
/**
函数签名:
def countByKey(): Map[K, Long]
def countByValue(): Map[V, Long]
功能:统计每种 key 的个数/统计每种 value 的个数
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,1,1,4), 2)
val valMap: Map[Int, Long] = rdd.countByValue()
println(valMap) // Map(1 -> 3, 4 -> 1)
val rdd2 = sc.makeRDD(List(
"a" -> 1, "a" -> 2, "a" -> 3,
"b" -> 4
), 2)
val keyMap: Map[String, Long] = rdd2.countByKey()
println(keyMap) // Map(a -> 3, b -> 1)
sc.stop()
}
}
10. save 相关算子
/**
函数签名:
def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
path: String,
codec: Option[Class[_ <: CompressionCodec]] = None
): Unit
功能:将数据保存到不同格式的文件中:text文件/序列化成对象保存到文件/保存成 Sequencefile 文件
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(
("a", 1), ("b", 2), ("c", 3),("d", 4)
), 2)
rdd.saveAsTextFile("output")
rdd.saveAsObjectFile("output1")
// saveAsSequenceFile 要求数据必须为 key-value 类型
rdd.saveAsSequenceFile("output2")
sc.stop()
}
}
11. foreach
/**
函数签名:def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
功能:分布式(在不同的 Executor 端)遍历 RDD 中的每一个元素,调用指定函数
说明:
1.Scala集合的方法是在同一个节点内存中执行的
2.RDD的方法可以将计算逻辑发送到分布式不同的节点执行
3.为了区分,一般将 RDD 的方法称为算子,算子的外部调用是在 Driver 端执行,而内部的具体计算逻辑是在 Executor 端执行
*/
object TestRDDOperator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4), 2)
// 此处的 foreach 是调用的 scala 集合中 Array 的方法,是在 Driver 端的内存中执行
// 数据的打印是按顺序的
rdd.collect().foreach(println)
println("===================")
// 此处的 foreach 是调用的 RDD 的算子,是在不同的 Executor 端的内存中执行
// 数据的打印的顺序是不确定的
rdd.foreach(println)
sc.stop()
}
}