目录
1.说明
1.1 什么是累加器
1.2 累加器的功能
2. 使用累加器
3. 累加器和reduce、fold算子的区别
1.说明
1.1 什么是累加器
累加器是Spark提供的一个共享变量(Shared Variables)
默认情况下,如果Executor节点上使用到了Driver端定义的变量(通过算子传递)
算子会将该变量的副本发送的每个Task任务,但是并不会将Task任务对副本变量的修改返回给Driver端
但是Spark为我们提供了一个共享变量(累加器),允许Driver端和Task之间共享一个变量
1.2 累加器的功能
累加器用来将Executor端变量的信息聚合到Driver端
在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一个新的副本,每个Task更新这些副本的值以后,会再返回给Driver端进行merge,得到最终的值
2. 使用累加器
spark中为我们提供了三个常用的累加器,并且支持我们根据自己业务需求来实现自定义累加器类
代码示例:
test("使用spark自带的累加器") {
// 初始化 spark配置实例
val sparkconf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("")
// 初始化 spark环境对象
val sc: SparkContext = new SparkContext(sparkconf)
/*
* TODO 使用 LongAccumulator
* 功能:
* 对 整数类型的元素做累加
* */
val intRdd: RDD[Int] = sc.makeRDD(List(1, 2, 2, 3, 3, 4, 5, 6, 7, 8, 9))
val accum: LongAccumulator = sc.longAccumulator("My LongAccumulator")
intRdd.foreach(x => accum.add(x))
println(s"LongAccumulator:${accum.value}")
/*
* TODO 使用 DoubleAccumulator
* 功能:
* 对 浮点类型的元素做累加
*
* */
val doubleRdd: RDD[Double] = sc.makeRDD(List(1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, 8.1, 9.1))
val doubleAccumulator: DoubleAccumulator = sc.doubleAccumulator("My DoubleAccumulator")
doubleRdd.foreach(x => doubleAccumulator.add(x))
println(s"DoubleAccumulator:${doubleAccumulator.value}")
/*
* TODO 使用 CollectionAccumulator
* 将元素添加到list中去
* */
val collectAccumulator: CollectionAccumulator[Int] = sc.collectionAccumulator[Int]("My ")
intRdd.foreach(x => collectAccumulator.add(x))
println(s"CollectionAccumulator:${collectAccumulator.value}")
/*
* TODO 使用自定义累加器
* 将元素添加到Set中去
*
* 实现步骤:
* 1.根据业务逻辑实现自定义累加器实现类
* 2.向spark环境中注册自定义累加器
* 3.使用自定义累加器
*
* */
val setAccumulator = new SetAccumulator[Int]()
sc.register(setAccumulator, "My SetAccumulator")
intRdd.foreach(x => setAccumulator.add(x))
println(s"SetAccumulator:${setAccumulator.value}")
sc.stop()
}
自定义累加器:
/*
* 自定义累加器
* TODO 并未考虑线程安全的问题,实际使用时需添加这部分的判断
*
* */
class SetAccumulator[T] extends AccumulatorV2[T, collection.mutable.Set[T]] {
/* 定义可变Set */
var set = collection.mutable.Set[T]()
/* 判断 累加器是否为初始状态 */
override def isZero: Boolean = set.isEmpty
/*
* 获取当前累加器的 新副本
* 每个变量(累加器)的副本会发送到每个Task
* */
override def copy(): AccumulatorV2[T, mutable.Set[T]] = new SetAccumulator
/*
* 重置累加器(清空累加器)
* */
override def reset(): Unit = Nil
/*
* TODO 分区内累加规则(Task内)
* 获取数据并进行累加
* 根据指定的规则,向累加器中添加元素
* */
override def add(v: T): Unit = {
set += v
}
/*
* TODO 分区间累加规则
* 合并多个累加器副本
* */
override def merge(other: AccumulatorV2[T, mutable.Set[T]]): Unit = {
this.value ++= other.value
}
override def value: mutable.Set[T] = set
}
执行结果:
3. 累加器和reduce、fold算子的区别
重点关注:
1.累加器并不是调优操作,并不会带来效率上的提升
2.累加器在Executor端做add操作(累加器副本做更新),在Driver端做merge操作(合并多个Task中的累加器副本)
示例代码:
test("对比累加器和reduce、fold算子效率问题") {
/*
* TODO 思考: 累加器和reduce、fold算子的区别
* */
// 初始化 spark配置实例
val sparkconf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("")
// 初始化 spark环境对象
val sc: SparkContext = new SparkContext(sparkconf)
val intRdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9))
// 查看每个分区的内容
intRdd.mapPartitionsWithIndex(
(i, iter) => {
println(s"分区编号$i :${iter.mkString(" ")}");
iter
}
).collect()
val accum: LongAccumulator = sc.longAccumulator("My Accumulator")
intRdd.foreach(x => accum.add(x))
println(s"累加器结果:${accum.value}")
println("----reduce算子----------------------")
val resultByReduce = intRdd.reduce(
(v1, v2) => {
println(s"$v1 + $v2 = ${v1 + v2}")
v1 + v2
}
)
println(s"reduce算子结果:${resultByReduce}")
println("----reduce算子----------------------")
val resultByFlod = intRdd.fold(0)(
(v1, v2) => {
println(s"$v1 + $v2 = ${v1 + v2}")
v1 + v2
}
)
println(s"resultByFlod:${resultByFlod}")
while (true) {}
// http://localhost:4040/stages/stage/?id=1&attempt=0
sc.stop()
}
执行结果:
累加器并未对计算效率带来提升
参考链接:
传送门1
传送门2
官网链接