累加器是分布式共享只写变量
一、累加器功能
累加器可以用来把 Executor 端的变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在 Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge
二、累加器类型
1. 系统累加器
/**
常见的系统累加器:longAccumulator/doubleAccumulator/collectionAccumulator
说明:累加器一般放在行动算子中进行操作
*/
object TestRDDAcc {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("Acc")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(List(1,2,3,4), 2)
// 创建累加器
val accSum = sc.longAccumulator("sum")
rdd.foreach(num => {
accSum.add(num)
})
println(accSum.value)
sc.stop()
}
}
三、自定义累加器
自定义累加器实现 WordCount 案例,避免 shuffle 操作
/**
1.继承 AccumulatorV2[IN, OUT] 抽象类,定义输入输出的泛型类型
1.1 IN 表述累加器 add 的数据的类型
1.2 OUT 表示累加器 value 的返回类型
2.重写累加器的抽象方法
*/
object TestAccWordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("WCAcc")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(List(
"hello", "hive", "hello", "spark"
))
// 创建自定义累加器
val wcAcc = new MyAccumulator()
// 向 spark 进行注册
sc.register(wcAcc, "wordCountAcc")
// 循环遍历 rdd
rdd.foreach(word => {
// 使用累加器
wcAcc.add(word)
})
// 输出累加器的值
println(wcAcc.value)
sc.stop()
}
}
/*
自定义累加器
*/
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
// 定义累加器的返回结果 Map
private var resultMap = mutable.Map[String, Long]()
// 判断是否为初始状态
override def isZero: Boolean = resultMap.isEmpty()
// 复制累加器
override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
this
}
// 重置累加器
override def reset(): Unit = resultMap.clear()
// 获取累加器输入的数据进行操作
override def add(word: String): Unit = {
// 向 resultMap 中添加新值或累加旧值
val count = resultMap.getOrElse(word, 0L) + 1
resultMap.update(word, count)
}
// 合并多个累加器的结果
override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
other.value.foreach({
case (word, count) => {
val newCount = this.resultMap.getOrElse(word, 0L) + 1
this.resultMap.update(word, newCount)
}
})
}
// 返回累加器的结果
override def value: mutable.Map[String, Long] = resultMap
}