目录
map
mapPartitions
mapPartitionsWithIndex
flatMap
glom
groupBy
shuffle
filter
sample
distinct
coalesce
repartition
sortBy
ByKey
intersection
union
subtract
zip
partitionBy
reduceByKey
groupByKey
reduceByKey 和 groupByKey 的区别
aggregateByKey
foldByKey
combineByKey
reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别
join
leftOuterJoin
cogroup
完成永远比完美更重要
Value类型
map
def map[U: ClassTag](f: T => U): RDD[U]
将处理的数据
逐条
进行映射转换(A => B),这里的转换可以是类型的转换,也可以是值的转换。
val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD1: RDD[Int] = dataRDD.map(
num => {
num * 2
}
)
val dataRDD2: RDD[String] = dataRDD1.map(
num => {
"" + num
}
)
实例 从服务器日志数据 apache.log 中获取用户请求 URL 资源路径 apache.log
83.149.9.216 - - 17/05/2015:10:05:03 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-search.png
83.149.9.216 - - 17/05/2015:10:05:43 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png
83.149.9.216 - - 17/05/2015:10:05:47 +0000 GET /presentations/logstash-monitorama-2013/plugin/highlight/highlight.js
83.149.9.216 - - 17/05/2015:10:05:12 +0000 GET /presentations/logstash-monitorama-2013/plugin/zoom-js/zoom.js
83.149.9.216 - - 17/05/2015:10:05:07 +0000 GET /presentations/logstash-monitorama-2013/plugin/notes/notes.js
83.149.9.216 - - 17/05/2015:10:05:34 +0000 GET /presentations/logstash-monitorama-2013/images/sad-medic.png
83.149.9.216 - - 17/05/2015:10:05:57 +0000 GET /presentations/logstash-monitorama-2013/css/fonts/Roboto-Bold.ttf
83.149.9.216 - - 17/05/2015:10:05:50 +0000 GET /presentations/logstash-monitorama-2013/css/fonts/Roboto-Regular.ttf
83.149.9.216 - - 17/05/2015:10:05:24 +0000 GET /presentations/logstash-monitorama-2013/images/frontend-response-codes.png
83.149.9.216 - - 17/05/2015:10:05:50 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard.png
83.149.9.216 - - 17/05/2015:10:05:46 +0000 GET /presentations/logstash-monitorama-2013/images/Dreamhost_logo.svg
83.149.9.216 - - 17/05/2015:10:05:11 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard2.png
83.149.9.216 - - 17/05/2015:10:05:19 +0000 GET /presentations/logstash-monitorama-2013/images/apache-icon.gif
83.149.9.216 - - 17/05/2015:10:05:33 +0000 GET /presentations/logstash-monitorama-2013/images/nagios-sms5.png
83.149.9.216 - - 17/05/2015:10:05:00 +0000 GET /presentations/logstash-monitorama-2013/images/redis.png
83.149.9.216 - - 17/05/2015:10:05:25 +0000 GET /presentations/logstash-monitorama-2013/images/elasticsearch.png
83.149.9.216 - - 17/05/2015:10:05:59 +0000 GET /presentations/logstash-monitorama-2013/images/logstashbook.png
83.149.9.216 - - 17/05/2015:10:05:30 +0000 GET /presentations/logstash-monitorama-2013/images/github-contributions.png
83.149.9.216 - - 17/05/2015:10:05:53 +0000 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 - - 17/05/2015:10:05:24 +0000 GET /presentations/logstash-monitorama-2013/images/1983_delorean_dmc-12-pic-38289.jpeg
83.149.9.216 - - 17/05/2015:10:05:54 +0000 GET /presentations/logstash-monitorama-2013/images/simple-inputs-filters-outputs.jpg
83.149.9.216 - - 17/05/2015:10:05:33 +0000 GET /presentations/logstash-monitorama-2013/images/tiered-outputs-to-inputs.jpg
83.149.9.216 - - 17/05/2015:10:05:56 +0000 GET /favicon.ico
24.236.252.67 - - 17/05/2015:10:05:40 +0000 GET /favicon.ico
93.114.45.13 - - 17/05/2015:10:05:14 +0000 GET /articles/dynamic-dns-with-dhcp/
93.114.45.13 - - 17/05/2015:10:05:04 +0000 GET /reset.css
93.114.45.13 - - 17/05/2015:10:05:45 +0000 GET /style2.css
93.114.45.13 - - 17/05/2015:10:05:14 +0000 GET /favicon.ico
93.114.45.13 - - 17/05/2015:10:05:17 +0000 GET /images/jordan-80.png
93.114.45.13 - - 17/05/2015:10:05:21 +0000 GET /images/web/2009/banner.png
package com.qihang.bigdata.spark.core.rdd
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object test {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MapTest")
val sc = new SparkContext(sparkConf)
val rdd = sc.textFile("datas/apache.log")
//长String => 短String
val mapRDD: RDD[String] = rdd.map(
line => {
val data = line.split(" ")
data(6)
}
)
mapRDD.collect().foreach(println)
sc.stop()
}
}
1. rdd的计算一个分区内的数据是一个一个执行逻辑 只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据。 分区内数据的执行是有序的。 2. 不同分区数据计算是无序的。
mapPartitions
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
将待处理的数据
以分区为单位
发送到计算节点进行处理,这里的处理是指可以进行任意的处
理,哪怕是过滤数据。(将一个分区的迭代器传入 => 处理后整个分区的迭代器)
val dataRDD1: RDD[Int] = dataRDD.mapPartitions(
datas => {
datas.filter(_==2)
}
)
实例
获取每个数据分区的最大值
package com.qihang.bigdata.spark.core.rdd
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object test {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MapTest")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
val mpRDD = rdd.mapPartitions(
iter => {
List(iter.max).iterator // 需要返回迭代器,用List()封装
}
)
mpRDD.collect().foreach(println(_))
sc.stop()
}
}
可以以分区为单位进行数据转换操作 但是会将整个分区的数据加载到内存进行引用 如果处理完的数据是不会被释放掉,存在对象的引用。 在内存较小,数据量较大的场合下,容易出现内存溢出。
map
和
mapPartitions
的区别
➢
数据处理角度
Map
算子是分区内一个数据一个数据的执行,类似于串行操作。而
mapPartitions
算子
是以分区为单位进行批处理操作。
➢
功能的角度
Map
算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。
MapPartitions
算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,
所以可以增加或减少数据
➢
性能的角度
Map
算子因为类似于串行操作,所以性能比较低,而是
mapPartitions
算子类似于批处
理,所以性能较高。但是
mapPartitions
算子会长时间占用内存,那么这样会导致内存可能
不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用
map
操作。
mapPartitionsWithIndex
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处
理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
val dataRDD1 = dataRDD.mapPartitionsWithIndex(
(index, datas) => {
datas.map(index, _)
}
)
实例
获取第二个分区的数据
package com.qihang.bigdata.spark.core.rdd
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object test {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MapTest")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
val mpiRDD = rdd.mapPartitionsWithIndex(
(index, iter) => {
if (index == 1) { //0,1 从零开始,index==1为第二个分区
iter
} else {
Nil.iterator //
}
}
)
mpiRDD.collect().foreach(println(_))
sc.stop()
}
}
将数据和所在分区合并成元组
val rdd = sc.makeRDD(List(1,2,3,4))
val mpiRDD = rdd.mapPartitionsWithIndex(
(index, iter) => {
// 1, 2, 3, 4
//(0,1)(2,2),(4,3),(6,4)
iter.map(
num => {
(index, num)
}
)
}
)
flatMap
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射
val dataRDD = sparkContext.makeRDD(List(
List(1,2),List(3,4)
),1)
val dataRDD1 = dataRDD.flatMap(
list => list
)
glom
def glom(): RDD[Array[T]]
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
// List => Int
// Int => Array
val glomRDD: RDD[Array[Int]] = rdd.glom()
glomRDD.collect().foreach(data => println(data.mkString(",")))
案例
计算所有分区最大值求和(分区内取最大值,分区间最大值求和)
val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)
// 【1,2】,【3,4】
// 【2】,【4】
// 【6】
val glomRDD: RDD[Array[Int]] = rdd.glom()
val maxRDD: RDD[Int] = glomRDD.map(
array => {
array.max
}
)
println(maxRDD.collect().sum)
groupBy
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
val dataRDD1 = dataRDD.groupBy(
_%2
)
shuffle
将数据根据指定的规则进行分组
,
分区默认不变,但是数据会被
打乱重新组合
,我们将这样
的操作称之为
shuffle
。极限情况下,数据可能被分在同一个分区中
一个组的数据在一个分区中,但是并不是说一个分区中只有一个组,多个组可以放在一个分区里。
所以分区数和分组数无关。
实例
将
List("Hello", "hive", "hbase", "Hadoop")
根据单词首写字母进行分组。
val rdd = sc.makeRDD(List("Hello", "Spark", "Scala", "Hadoop"), 2)
// 分组和分区没有必然的关系
val groupRDD = rdd.groupBy(_.charAt(0))
groupRDD.collect().foreach(println)
filter
def filter(f: T => Boolean): RDD[T]
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。
当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出
现
数据倾斜
。
val dataRDD = sparkContext.makeRDD(List(
1,2,3,4
),1)
val dataRDD1 = dataRDD.filter(_%2 == 0)
实例
从服务器日志数据
apache.log
中获取
2015
年
5
月
17
日的请求路径
83.149.9.216 - - 17/05/2015:10:05:03 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-search.png
83.149.9.216 - - 17/05/2015:10:05:43 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png
83.149.9.216 - - 17/05/2015:10:05:47 +0000 GET /presentations/logstash-monitorama-2013/plugin/highlight/highlight.js
83.149.9.216 - - 17/05/2015:10:05:12 +0000 GET /presentations/logstash-monitorama-2013/plugin/zoom-js/zoom.js
83.149.9.216 - - 17/05/2015:10:05:07 +0000 GET /presentations/logstash-monitorama-2013/plugin/notes/notes.js
83.149.9.216 - - 17/05/2015:10:05:34 +0000 GET /presentations/logstash-monitorama-2013/images/sad-medic.png
83.149.9.216 - - 17/05/2015:10:05:57 +0000 GET /presentations/logstash-monitorama-2013/css/fonts/Roboto-Bold.ttf
83.149.9.216 - - 17/05/2015:10:05:50 +0000 GET /presentations/logstash-monitorama-2013/css/fonts/Roboto-Regular.ttf
83.149.9.216 - - 17/05/2015:10:05:24 +0000 GET /presentations/logstash-monitorama-2013/images/frontend-response-codes.png
83.149.9.216 - - 17/05/2015:10:05:50 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard.png
83.149.9.216 - - 17/05/2015:10:05:46 +0000 GET /presentations/logstash-monitorama-2013/images/Dreamhost_logo.svg
83.149.9.216 - - 17/05/2015:10:05:11 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard2.png
83.149.9.216 - - 17/05/2015:10:05:19 +0000 GET /presentations/logstash-monitorama-2013/images/apache-icon.gif
83.149.9.216 - - 17/05/2015:10:05:33 +0000 GET /presentations/logstash-monitorama-2013/images/nagios-sms5.png
83.149.9.216 - - 17/05/2015:10:05:00 +0000 GET /presentations/logstash-monitorama-2013/images/redis.png
83.149.9.216 - - 17/05/2015:10:05:25 +0000 GET /presentations/logstash-monitorama-2013/images/elasticsearch.png
83.149.9.216 - - 17/05/2015:10:05:59 +0000 GET /presentations/logstash-monitorama-2013/images/logstashbook.png
83.149.9.216 - - 17/05/2015:10:05:30 +0000 GET /presentations/logstash-monitorama-2013/images/github-contributions.png
83.149.9.216 - - 17/05/2015:10:05:53 +0000 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 - - 17/05/2015:10:05:24 +0000 GET /presentations/logstash-monitorama-2013/images/1983_delorean_dmc-12-pic-38289.jpeg
83.149.9.216 - - 17/05/2015:10:05:54 +0000 GET /presentations/logstash-monitorama-2013/images/simple-inputs-filters-outputs.jpg
83.149.9.216 - - 17/05/2015:10:05:33 +0000 GET /presentations/logstash-monitorama-2013/images/tiered-outputs-to-inputs.jpg
83.149.9.216 - - 17/05/2015:10:05:56 +0000 GET /favicon.ico
24.236.252.67 - - 17/05/2015:10:05:40 +0000 GET /favicon.ico
93.114.45.13 - - 17/05/2015:10:05:14 +0000 GET /articles/dynamic-dns-with-dhcp/
93.114.45.13 - - 17/05/2015:10:05:04 +0000 GET /reset.css
93.114.45.13 - - 17/05/2015:10:05:45 +0000 GET /style2.css
93.114.45.13 - - 17/05/2015:10:05:14 +0000 GET /favicon.ico
93.114.45.13 - - 17/05/2015:10:05:17 +0000 GET /images/jordan-80.png
93.114.45.13 - - 17/05/2015:10:05:21 +0000 GET /images/web/2009/banner.png
val rdd = sc.textFile("datas/apache.log")
val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd.map(
line => {
val datas = line.split(" ")
val time = datas(3)
//time.substring(0, )
val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
val date: Date = sdf.parse(time)
val sdf1 = new SimpleDateFormat("HH")
val hour: String = sdf1.format(date)
(hour, 1)
}
).groupBy(_._1)
timeRDD.map{
case ( hour, iter ) => {
(hour, iter.size)
}
}.collect.foreach(println)
sample
def sample(
withReplacement: Boolean,
fraction:Double,
seed: Long = Utils.random.nextLong): RDD[T]
根据指定的规则从数据集中
抽取
数据
// sample算子需要传递三个参数
// 1. 第一个参数表示,抽取数据后是否将数据返回 true(放回),false(丢弃)
// 2. 第二个参数表示,
// 如果抽取不放回的场合:数据源中每条数据被抽取的概率,基准值的概念
// 如果抽取放回的场合:表示数据源中的每条数据被抽取的可能次数
// 3. 第三个参数表示,抽取数据时随机算法的种子
// 如果不传递第三个参数,那么使用的是当前系统时间
val dataRDD = sparkContext.makeRDD(List(
1,2,3,4
),1)
// 抽取数据不放回(伯努利算法)
// 伯努利算法:又叫 0、1 分布。例如扔硬币,要么正面,要么反面。
// 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不
要
// 第一个参数:抽取的数据是否放回,false:不放回
// 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
// 第三个参数:随机数种子
val dataRDD1 = dataRDD.sample(false, 0.5)
// 抽取数据放回(泊松算法)
// 第一个参数:抽取的数据是否放回,true:放回;false:不放回
// 第二个参数:重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数
// 第三个参数:随机数种子
val dataRDD2 = dataRDD.sample(true, 2)
防止数据倾斜。
distinct
def distinct()(implicit ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
将数据集中重复的数据去重
//源码
// map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
// (1, null),(2, null),(3, null),(4, null),(1, null),(2, null),(3, null),(4, null)
// (1, null)(1, null)(1, null)
// (null, null) => null
// (1, null) => 1
val dataRDD = sparkContext.makeRDD(List(
1,2,3,4,1,2
),1)
val dataRDD1 = dataRDD.distinct()
coalesce
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]
根据数据量
缩减分区
,用于大数据集过滤后,提高小数据集的执行效率
当
spark
程序中,存在过多的小任务的时候,可以通过
coalesce
方法,收缩合并分区,减少
分区的个数,减小任务调度
val dataRDD = sparkContext.makeRDD(List(
1,2,3,4,1,2
),6)
val dataRDD1 = dataRDD.coalesce(2)
扩大分区
// coalesce算子可以扩大分区的,但是如果不进行shuffle操作,是没有意义,不起作用。
// 所以如果想要实现扩大分区的效果,需要使用shuffle操作
// spark提供了一个简化的操作
// 缩减分区:coalesce,如果想要数据均衡,可以采用shuffle
// 扩大分区:repartition, 底层代码调用的就是coalesce,而且肯定采用shuffle
//val newRDD: RDD[Int] = rdd.coalesce(3, true)
repartition
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
该操作内部其实执行的是
coalesce
操作,参数
shuffle
的默认值为
true
。无论是将分区数多的
RDD
转换为分区数少的
RDD
,还是将分区数少的
RDD
转换为分区数多的
RDD
,
repartition
操作都可以完成,因为无论如何都会经
shuffle
过程。
val dataRDD = sparkContext.makeRDD(List(
1,2,3,4,1,2
),2)
val dataRDD1 = dataRDD.repartition(4)
sortBy
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
该操作用于排序数据。在排序之前,可以将数据通过
f
函数进行处理,之后按照
f
函数处理
的结果进行排序,默认为升序排列。排序后新产生的
RDD
的分区数与原
RDD
的分区数一
致。中间存在
shuffle 的过程。
val dataRDD = sparkContext.makeRDD(List(
1,2,3,4,1,2
),2)
val dataRDD1 = dataRDD.sortBy(num=>num, false, 4)
ByKey
双Value类型
intersection
def intersection(other: RDD[T]): RDD[T]
union
def union(other: RDD[T]): RDD[T]
subtract
def subtract(other: RDD[T]): RDD[T]
zip
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
// Can't zip RDDs with unequal numbers of partitions: List(2, 4)
// 两个数据源要求分区数量要保持一致
// Can only zip RDDs with same number of elements in each partition
// 两个数据源要求分区中数据数量保持一致
val rdd1 = sc.makeRDD(List(1,2,3,4,5,6),2)
val rdd2 = sc.makeRDD(List(3,4,5,6),2)
val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
println(rdd6.collect().mkString(","))
// 交集,并集和差集要求两个数据源数据类型保持一致
// 拉链操作两个数据源的类型可以不一致
val rdd1 = sc.makeRDD(List(1,2,3,4))
val rdd2 = sc.makeRDD(List(3,4,5,6))
val rdd7 = sc.makeRDD(List("3","4","5","6"))
// 交集 : 【3,4】
val rdd3: RDD[Int] = rdd1.intersection(rdd2)
//val rdd8 = rdd1.intersection(rdd7)
println(rdd3.collect().mkString(","))
// 并集 : 【1,2,3,4,3,4,5,6】
val rdd4: RDD[Int] = rdd1.union(rdd2)
println(rdd4.collect().mkString(","))
// 差集 : 【1,2】
val rdd5: RDD[Int] = rdd1.subtract(rdd2)
println(rdd5.collect().mkString(","))
// 拉链 : 【1-3,2-4,3-5,4-6】
val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
val rdd8 = rdd1.zip(rdd7)
println(rdd6.collect().mkString(","))
Key-Value类型
partitionBy
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
将数据按照指定
Partitioner
重新进行分区。
Spark
默认的分区器是
HashPartitioner
val rdd = sc.makeRDD(List(1,2,3,4),2)
val mapRDD:RDD[(Int, Int)] = rdd.map((_,1))
// RDD => PairRDDFunctions
// 隐式转换(二次编译)
// partitionBy根据指定的分区规则对数据进行重分区
import org.apache.spark.HashPartitioner
val newRDD = mapRDD.partitionBy(new HashPartitioner(2))
newRDD.partitionBy(new HashPartitioner(2))
newRDD.saveAsTextFile("output")
reduceByKey
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
可以将数据按照相同的
Key
对
Value
进行聚合
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.reduceByKey(_+_)
val dataRDD3 = dataRDD1.reduceByKey(_+_, 2)
reduceByKey分区内和分区间计算规则相同。
groupByKey
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K,
将数据源的数据根据
key
对
value
进行分组
val dataRDD1 =
sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.groupByKey()
val dataRDD3 = dataRDD1.groupByKey(2)
val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2))
reduceByKey 和 groupByKey 的区别
从
shuffle
的角度
:
reduceByKey
和
groupByKey
都存在
shuffle
的操作,但是
reduceByKey
可以在
shuffle
前对分区内相同
key
的数据进行预聚合(
combine
)功能,这样会减少落盘的
数据量,而
groupByKey
只是进行分组,不存在数据量减少的问题,
reduceByKey
性能比较
高。
从功能的角度
:
reduceByKey
其实包含分组和聚合的功能。
GroupByKey
只能分组,不能聚
合,所以在分组聚合的场合下,推荐使用
reduceByKey
,如果仅仅是分组而不需要聚合。那
么还是只能使用
groupByKey
aggregateByKey
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]
将数据根据
不同的规则
进行分区内计算和分区间计算
val dataRDD1 =
sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 =
dataRDD1.aggregateByKey(0)(_+_,_+_)
实例
取出每个分区内
相同
key
的最大值然后分区间相加
// TODO : 取出每个分区内相同 key 的最大值然后分区间相加
// aggregateByKey 算子是函数柯里化,存在两个参数列表
// 1. 第一个参数列表中的参数表示初始值
// 2. 第二个参数列表中含有两个参数
// 2.1 第一个参数表示分区内的计算规则
// 2.2 第二个参数表示分区间的计算规则
val rdd =
sc.makeRDD(List(
("a",1),("a",2),("c",3),
("b",4),("c",5),("c",6)
),2)
// 0:("a",1),("a",2),("c",3) => (a,10)(c,10)
// => (a,10)(b,10)(c,20)
// 1:("b",4),("c",5),("c",6) => (b,10)(c,10)
val resultRDD =
rdd.aggregateByKey(10)(
(x, y) => math.max(x,y),
(x, y) => x + y
)
resultRDD.collect().foreach(println)
foldByKey
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.foldByKey(0)(_+_)
combineByKey
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)]
最通用的对
key-value
型
rdd
进行聚集操作的聚集函数(
aggregation function
)。类似于
aggregate()
,
combineByKey()
允许用户返回值的类型与输入不一致。
实例
将数据
List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))
求每个
key
的平 均值
val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93),
("a", 95), ("b", 98))
val input: RDD[(String, Int)] = sc.makeRDD(list, 2)
val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey(
(_, 1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别
reduceByKey:
相同
key
的第一个数据不进行任何计算,分区内和分区间计算规则相同
FoldByKey:
相同
key
的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相 同
AggregateByKey
:相同
key
的第一个数据和初始值进行分区内计算,分区内和分区间计算规
则可以不相同
CombineByKey:
当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区
内和分区间计算规则不相同。
/*
reduceByKey:
combineByKeyWithClassTag[V](
(v: V) => v, // 第一个值不会参与计算
func, // 分区内计算规则
func, // 分区间计算规则
)
aggregateByKey :
combineByKeyWithClassTag[U](
(v: V) => cleanedSeqOp(createZero(), v), // 初始值和第一个key的value值进行的分区内数据操作
cleanedSeqOp, // 分区内计算规则
combOp, // 分区间计算规则
)
foldByKey:
combineByKeyWithClassTag[V](
(v: V) => cleanedFunc(createZero(), v), // 初始值和第一个key的value值进行的分区内数据操作
cleanedFunc, // 分区内计算规则
cleanedFunc, // 分区间计算规则
)
combineByKey :
combineByKeyWithClassTag(
createCombiner, // 相同key的第一条数据进行的处理函数
mergeValue, // 表示分区内数据的处理函数
mergeCombiners, // 表示分区间数据的处理函数
)
*/
join
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
在类型为
(K,V)
和
(K,W)
的
RDD
上调用,返回一个相同
key
对应的所有元素连接在一起的
(K,(V,W))
的
RDD
存在笛卡儿积
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6)))
rdd.join(rdd1).collect().foreach(println)
leftOuterJoin
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
类似于
SQL
语句的左外连接
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val rdd: RDD[(String, (Int, Option[Int]))] = dataRDD1.leftOuterJoin(dataRDD2)
cogroup
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))
在类型为
(K,V)
和
(K,W)
的
RDD
上调用,返回一个
(K,(Iterable<V>,Iterable<W>))
类型的
RDD
val rdd1 = sc.makeRDD(List(
("a", 1), ("b", 2) , ("c", 3)
))
val rdd2 = sc.makeRDD(List(
("a", 4), ("b", 5), ("c", 6), ("c", 7)
))
// cogroup : connect + group (分组,连接)
val cgRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
cgRDD.collect().foreach(println)
(a,(CompactBuffer(1),CompactBuffer(4)))
(b,(CompactBuffer(2),CompactBuffer(5)))
(c,(CompactBuffer(3),CompactBuffer(6, 7)))
案例
package com.qihang.bigdata.spark.core.rdd
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object test {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MapTest")
val sc = new SparkContext(sparkConf)
val dataRDD = sc.textFile("datas/agent.log")
val mapRDD = dataRDD.map(
line => {
val datas = line.split(" ")
((datas(1), datas(4)), 1)
}
)
val reduceRDD: RDD[((String, String), Int)] = mapRDD.reduceByKey(_ + _)
//如果map中的数据有特定的格式, 可以用模式匹配简化
//case 特定格式
//https://www.cnblogs.com/JYB2021/p/16199184.html
val newMapRDD: RDD[(String, (String, Int))] = reduceRDD.map {
case ((prv, ad), sum) =>
(prv, (ad, sum))
}
val groupRDD: RDD[(String, Iterable[(String, Int)])] = newMapRDD.groupByKey()
//只对 value 进行迭代 用mapValues
val resultRDD: RDD[(String, List[(String, Int)])] = groupRDD.mapValues(
iter => {
iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
}
)
resultRDD.collect().foreach(println)
sc.stop()
}
}