用一个例子告诉你 怎样在spark中创建累加器

news2024/9/28 9:32:21

目录

1.说明

1.1 什么是累加器

1.2 累加器的功能

2. 使用累加器

3. 累加器和reduce、fold算子的区别


1.说明

1.1 什么是累加器

累加器是Spark提供的一个共享变量(Shared Variables)
    默认情况下,如果Executor节点上使用到了Driver端定义的变量(通过算子传递)
    算子会将该变量的副本发送的每个Task任务,但是并不会将Task任务对副本变量的修改返回给Driver端
    但是Spark为我们提供了一个共享变量(累加器),允许Driver端和Task之间共享一个变量

1.2 累加器的功能

    累加器用来将Executor端变量的信息聚合到Driver端
    在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一个新的副本,每个Task更新这些副本的值以后,会再返回给Driver端进行merge,得到最终的值


2. 使用累加器

spark中为我们提供了三个常用的累加器,并且支持我们根据自己业务需求来实现自定义累加器类

代码示例:

  test("使用spark自带的累加器") {
    // 初始化 spark配置实例
    val sparkconf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("")
    // 初始化 spark环境对象
    val sc: SparkContext = new SparkContext(sparkconf)

    /*
    *  TODO 使用 LongAccumulator
    *  功能:
    *     对 整数类型的元素做累加
    * */
    val intRdd: RDD[Int] = sc.makeRDD(List(1, 2, 2, 3, 3, 4, 5, 6, 7, 8, 9))
    val accum: LongAccumulator = sc.longAccumulator("My LongAccumulator")
    intRdd.foreach(x => accum.add(x))
    println(s"LongAccumulator:${accum.value}")

    /*
    *  TODO 使用 DoubleAccumulator
    *  功能:
    *     对 浮点类型的元素做累加
    *
    * */
    val doubleRdd: RDD[Double] = sc.makeRDD(List(1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, 8.1, 9.1))
    val doubleAccumulator: DoubleAccumulator = sc.doubleAccumulator("My DoubleAccumulator")
    doubleRdd.foreach(x => doubleAccumulator.add(x))
    println(s"DoubleAccumulator:${doubleAccumulator.value}")


    /*
    * TODO 使用 CollectionAccumulator
    *    将元素添加到list中去
    * */
    val collectAccumulator: CollectionAccumulator[Int] = sc.collectionAccumulator[Int]("My ")
    intRdd.foreach(x => collectAccumulator.add(x))
    println(s"CollectionAccumulator:${collectAccumulator.value}")

    /*
    * TODO 使用自定义累加器
    *   将元素添加到Set中去
    *
    * 实现步骤:
    *     1.根据业务逻辑实现自定义累加器实现类
    *     2.向spark环境中注册自定义累加器
    *     3.使用自定义累加器
    *
    * */
    val setAccumulator = new SetAccumulator[Int]()
    sc.register(setAccumulator, "My SetAccumulator")
    intRdd.foreach(x => setAccumulator.add(x))
    println(s"SetAccumulator:${setAccumulator.value}")


    sc.stop()
  }

自定义累加器:

/*
* 自定义累加器
* TODO 并未考虑线程安全的问题,实际使用时需添加这部分的判断
*
* */
class SetAccumulator[T] extends AccumulatorV2[T, collection.mutable.Set[T]] {
  /* 定义可变Set */
  var set = collection.mutable.Set[T]()

  /* 判断 累加器是否为初始状态 */
  override def isZero: Boolean = set.isEmpty

  /*
  * 获取当前累加器的 新副本
  * 每个变量(累加器)的副本会发送到每个Task
  * */
  override def copy(): AccumulatorV2[T, mutable.Set[T]] = new SetAccumulator

  /*
  * 重置累加器(清空累加器)
  * */
  override def reset(): Unit = Nil

  /*
  * TODO 分区内累加规则(Task内)
  *     获取数据并进行累加
  *     根据指定的规则,向累加器中添加元素
  * */
  override def add(v: T): Unit = {
    set += v
  }

  /*
  * TODO 分区间累加规则
  *      合并多个累加器副本
  * */
  override def merge(other: AccumulatorV2[T, mutable.Set[T]]): Unit = {
    this.value ++= other.value

  }

  override def value: mutable.Set[T] = set
}

执行结果:


3. 累加器和reduce、fold算子的区别

重点关注:
      1.累加器并不是调优操作,并不会带来效率上的提升
      2.累加器在Executor端做add操作(累加器副本做更新),在Driver端做merge操作(合并多个Task中的累加器副本)

示例代码:

  test("对比累加器和reduce、fold算子效率问题") {
    /*
    * TODO 思考: 累加器和reduce、fold算子的区别
    * */

    // 初始化 spark配置实例
    val sparkconf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("")
    // 初始化 spark环境对象
    val sc: SparkContext = new SparkContext(sparkconf)

    val intRdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9))

    // 查看每个分区的内容
    intRdd.mapPartitionsWithIndex(
      (i, iter) => {
        println(s"分区编号$i :${iter.mkString(" ")}");
        iter
      }
    ).collect()

    val accum: LongAccumulator = sc.longAccumulator("My Accumulator")

    intRdd.foreach(x => accum.add(x))

    println(s"累加器结果:${accum.value}")

    println("----reduce算子----------------------")
    val resultByReduce = intRdd.reduce(
      (v1, v2) => {
        println(s"$v1 + $v2 = ${v1 + v2}")
        v1 + v2
      }
    )
    println(s"reduce算子结果:${resultByReduce}")

    println("----reduce算子----------------------")
    val resultByFlod = intRdd.fold(0)(
      (v1, v2) => {
        println(s"$v1 + $v2 = ${v1 + v2}")
        v1 + v2
      }
    )
    println(s"resultByFlod:${resultByFlod}")


    while (true) {}
    // http://localhost:4040/stages/stage/?id=1&attempt=0
    
    sc.stop()
  }

执行结果:

累加器并未对计算效率带来提升 

参考链接:

传送门1

传送门2

官网链接
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/399180.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Redis常用命令及数据类型参数

1. 针对于string SET key value / GET key SET k1 v1 GET k1 // v1String是二进制安全的,是可变长度的, 底层类似于ArrayList 是可扩容的,最大存储内存为 512MB。 2. 判断key中是否存在某个内容 EXISTS key SET k1 v1 EXISTS k1 // …

Noah-MP陆面过程模型建模方法与站点、区域模拟

陆表过程的主要研究内容以及陆面模型在生态水文研究中的地位和作用 熟悉模型的发展历程,常见模型及各自特点; Noah-MP模型的原理 Noah-MP模型所需的系统环境与编译环境的搭建方法您都了解吗?? linux系统操作环境您熟悉吗&…

Linux驱动中的fasync(异步通知)和fsync

一、fsync用来同步设备的写入操作,考虑把一块设局写入到硬盘的操作,如果使用write函数,函数返回后只能保证数据被写入到驱动程序或者内核管理的数据缓存中,而无法保证数据被真正写入到硬盘的存储块里。但是fync可以做到这一点&…

查找、排序、二叉树的算法,统统记录于此。

文章目录一、查找1. 无序表的顺序查找2. 折半查找3. 分块查找4. 二叉排序树BST5. 哈希表查找二、排序1. 不带哨兵的直接插入排序2. 带哨兵的直接插入排序3. 带哨兵、折半查找的直接插入排序4. 希尔排序5. 冒泡排序6. 快速排序7. 选择排序8. 堆排序9. 归并排序二叉树1. 递归先序…

八,iperf3源代码分析:状态机及状态转换过程--->运行正向TCP单向测试时的客户端代码

本文目录一、测试用命令二、iperf3客户端状态机中各个状态解析状态机迁移图运行正向TCP单向测试时的客户端的状态列表三、iperf3客户端状态机迁移分析A-初始化测试对象(NA--->初始化状态):B-建立控制连接,等待服务端PARAM_EXCHANGE的指令&…

西电机试数据结构核心算法与习题代码汇总(机考真题+核心算法)

文章目录前言一、链表问题1.1 反转链表1.1.1 题目1.1.2 代码1.2 多项式加减法1.2.1 题目1.2.2 代码二、队列和栈2.1 学生退学2.1.1 问题2.1.2 代码三、矩阵和串题目3.1 矩阵对角线求和3.1.1 问题3.1.2 代码四、排序问题4.1 多元素排序4.1.1 问题4.1.2 代码五、二叉树5.1 相同二…

synchronize优化偏向锁

偏向锁 轻量级锁在没有竞争时(只有自己一个线程),仍然会尝试CAS替换mark word; 会造成一定的性能的损耗; JDK6之中引入了偏向锁进行优化,第一次使用时线程ID注入到Mark word中,之后重入不再进…

旅游预约APP开发具有什么优势和功能

旅游活动目前正在作为广大用户休闲娱乐的一个首选内容,不仅是公司团建活动可以选择旅游,而且一些节假日也可以集结自己的亲朋好友来一次快乐有趣的旅游活动,随着当代人对于旅游的需求呈现上升的趋势,也让旅游预约APP开发开始流行并…

大家都在用哪些研发流程管理软件?

全球知名的10款流程管理软件分享:1.IT/研发项目流程管理:PingCode;2.通用项目流程管理:Worktile;3.销售流程管理:Salesforce Workflow;4.合同流程管理:Agiloft;5.IBM Bus…

20230308 APDL Lsdyna结构学习笔记

可以用鼠标右键进行结构的旋转视图。 一、编辑材料 输入参数分别为: 密度; 弹性模量; 泊松比; 屈服应力; 切线模量 由于模型是分块建立的,这里需要把模型进行粘接 点击booleans(布尔工具) 点击Glue、areas,结构物是由面单元构成的

ReactDOM.render函数内部做了啥

ReactDOM.render函数是整个 React 应用程序首次渲染的入口函数&#xff0c;它的参数是什么&#xff0c;返回值是什么&#xff0c;函数内部做了什么&#xff1f; ReactDOM.render(<App />, document.getElementById("root")); 前序 首先看下首次渲染时候&…

二叉树OJ题目详解

根据二叉树创建字符串 采用前序遍历的方式&#xff0c;将二叉树转换成一个由括号和数字组成的字符串。 再访问每一个节点时&#xff0c;需要分情况讨论。 如果这个节点的左子树不为空&#xff0c;那么字符串应加上括号和左子树的内容&#xff0c;然后判断右子树是否为空&#x…

VBA小模板,跨表统计的2种写法

目标 1 统计一个excel 文件里&#xff0c;多个sheet里的内容2 有的统计需求是&#xff0c;每个表只单表统计&#xff0c;只是进行批量操作3 有的需求是&#xff0c;多个表得某些行列累加等造出来得文件 2 实现方法1 &#xff08;可能只适合VBAEXCEL&#xff0c;不太干净的写法…

一文带你了解,前端模块化那些事儿

文章目录前端模块化省流&#xff1a;chatGPT 总结一、参考资料二、发展历史1.无模块化引出的问题:横向拓展2.IIFE3.Commonjs(cjs)4.AMD引出的问题&#xff1a;5.CMD6.UMD7.ESM往期精彩文章前端模块化 省流&#xff1a;chatGPT 总结 该文章主要讲述了前端模块化的发展历史和各个…

css伪类和伪元素的区别

文章目录什么是css伪类和伪元素css伪类和伪元素有什么用&#xff1f;css伪类的具体使用常见的伪类伪元素的具体使用常见的伪元素什么是css伪类和伪元素 伪类和为元素是两个完全不同且重要的概念&#xff0c;它们的作用是给元素添加一些特殊的效果或样式 伪类用于选择某个元素的…

Kalman Filter in SLAM (6) ——Error-state Kalman Filter (EsKF, 误差状态卡尔曼滤波)

文章目录0.前言1. IMU的误差状态空间方程2. 误差状态观测方程3. 误差状态卡尔曼滤波4. 误差状态卡尔曼滤波方程细节问题0.前言 这里先说一句&#xff1a;什么误差状态卡尔曼&#xff1f;完全就是在扯淡&#xff01; 回想上面我们推导的IMU的误差状态空间方程&#xff0c;其实…

乐山持点科技:抖客推广准入及准出管理规则

抖音小店平台新增《抖客推广准入及准出管理规则》&#xff0c;本次抖音规则具体如下&#xff1a;第一章 概述1.1 目的及依据为维护精选联盟平台经营秩序&#xff0c;保障精选联盟抖客、商家、消费者等各方的合法权益;根据《巨量百应平台服务协议》、《“精选联盟”服务协议(推广…

【GNN/深度学习】常用的图数据集(资源包)

【GNN/深度学习】常用的图数据集&#xff08;图结构&#xff09; 文章目录【GNN/深度学习】常用的图数据集&#xff08;图结构&#xff09;1. 介绍2. 图数据集2.1 Cora2.2 Citeseer2.3 Pubmed2.4 DBLP2.5 ACM2.6 AMAP & AMAC2.7 WIKI2.8 COCS2.9 BAT2.10 EAT2.11 UAT2.12 C…

第十三届蓝桥杯省赛Python大学B组复盘

目录 一、试题B&#xff1a;寻找整数 1、题目描述 2、我的想法 3、官方题解 4、另解 二、试题E&#xff1a;蜂巢 1、题目描述 2、我的想法 3、官方题解 三、试题F&#xff1a;消除游戏 1、题目描述 2、我的想法&#xff08;AC掉58.3%&#xff0c;剩下全超时&#x…

Substrate 基础教程(Tutorials) -- 监控节点指标

Substrate 公开有关网络操作的度量。例如&#xff0c;您可以收集有关您的节点连接了多少个对等节点、您的节点使用了多少内存以及正在生成的块数量的信息。为了捕获和可视化Substrate节点公开的度量&#xff0c;您可以配置和使用Prometheus和Grafana等工具。本教程演示如何使用…