文章目录
- (一)map 和 mapPartitions
- (二)foreach 和 foreachPartition
- (三)repartition的使用
- (四)reduceByKey 和 groupByKey的区别
(一)map 和 mapPartitions
- map 操作:对 RDD 中的每个元素进行操作,一次处理一条数据
- mapPartitions 操作:对 RDD 中每个 partition 进行操作,一次处理一个分区的数据
所以:
- map 操作: 执行 1 次 map算子只处理 1 个元素,如果 partition 中的元素较多,假设当前已经处理了 1000 个元素,在内存不足的情况下,Spark 可以通过GC等方法回收内存(比如将已处理掉的1000 个元素从内存中回收)。因此, map 操作通常不会导致OOM异常;
- mapPartitions 操作: 执行 1 次map算子需要接收该 partition 中的所有元素,因此一旦元素很多而内存不足,就容易导致OOM的异常,也不是说一定就会产生OOM异常,只是和map算子对比的话,相对来说容易产生OOM异常
不过一般情况下,mapPartitions 的性能更高;初始化操作、数据库链接等操作适合使用 mapPartitions操作
这是因为:
假设需要将 RDD 中的每个元素写入数据库中,这时候就应该把创建数据库链接的操作放置在mapPartitions 中,创建数据库链接这个操作本身就是个比较耗时的,如果该操作放在 map 中执行,将会频繁执行,比较耗时且影响数据库的稳定性。
map 和mapPartition代码区别
object MapPartitionScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("CheckpointOpScala")
.setMaster("local")
val sc = new SparkContext(conf)
//设置分区数量为2
val dataRdd = sc.parallelize(Array(1,2,3,4,5),2)
// val sum =dataRdd.map(item=>{
// println("=======")
// item*2
// }).reduce(_+_)
//mapParition一次处理一个分区的数据
val sum = dataRdd.mapPartitions(it=>{
println("=======")
val result = new ArrayBuffer[Int]()
it.foreach(item=>{
result.+=(item*2)
})
result.toIterator
}).reduce(_+_)
print("sum:"+sum)
sc.stop()
}
}
- 建议针对初始化链接之类的操作,使用mapPartitions,放在mapPartitions内部
例如:创建数据库链接,使用mapPartitions可以减少链接创建的次数,提高性能 - 注意:创建数据库链接的代码建议放在次数,不要放在Driver端或者it.foreach内部
数据库链接放在Driver端会导致链接无法序列化,无法传递到对应的task中执行,所以 - 数据库链接放在it.foreach()内部还是会创建多个链接,和使用map算子的效果是一样
(二)foreach 和 foreachPartition
-
foreach:一次处理一条数据
-
foreachPartition:一次处理一个分区的数据
-
foreachPartition的特性和mapPartitions 的特性是一样的,唯一的区别就是
mapPartitions 是 transformation 操作(不会立即执行),foreachPartition是 action 操作(会立即执行)
代码实现:
object ForeachPartitionOpScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("ForeachPartitionOpScala")
.setMaster("local")
val sc = new SparkContext(conf)
//设置分区数量为2
val dataRDD = sc.parallelize(Array(1,2,3,4,5),2)
//foreachPartition:一次处理一个分区的数据,作用和mapPartitions类似
//唯一的区是mapPartitions是transformation算子,foreachPartition是action算子
dataRDD.foreachPartition(it=>{
//在此处获取数据库链接
println("===============")
it.foreach(item=>{
//在这里使用数据库链接
println(item)
})
//关闭数据库链接
})
sc.stop()
}
}
(三)repartition的使用
对RDD进行重分区,repartition主要有两个应用场景:
- 可以调整RDD的并行度
针对个别RDD,如果感觉分区数量不合适,想要调整,可以通过repartition进行调整,分区调整了之后,对应的并行度也就可以调整了 - 可以解决RDD中数据倾斜的问题
如果RDD中不同分区之间的数据出现了数据倾斜,可以通过repartition实现数据重新分发,可以均匀分发到不同分区中
代码实现:Repatition的使用
object RepartitionOpScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("RepartitionOpScala")
.setMaster("local")
val sc = new SparkContext(conf)
//设置分区数量为2
val dataRDD = sc.parallelize(Array(1,2,3,4,5),2)
//重新设置RDD的分区数量为3,这个操作会产生shuffle
//也可以解决RDD中数据倾斜的问题
dataRDD.repartition(3)
.foreachPartition(it=>{
println("=========")
it.foreach(println(_))
})
//通过repartition可以控制输出数据产生的文件个数
dataRDD.saveAsTextFile("hdfs://bigdata01:9000/rep-001")
dataRDD.repartition(1).saveAsTextFile("hdfs://bigdata01:9000/rep-002")
sc.stop()
}
}
(四)reduceByKey 和 groupByKey的区别
功能:实现分组聚合
原理
首先这两个算子在执行的时候都会产生shuffle
但是:
1:当采用reduceByKey时,数据在进行shuffle之前会先进行局部聚合
2:当使用groupByKey时,数据在shuffle之间不会进行局部聚合,会原样进行shuffle
这样的话reduceByKey就减少了shuffle的数据传送,所以效率会高一些。
总结 :能用reduceByKey优先使用reduceByKey