累加器 - 分布式共享写变量

news2025/1/11 18:59:14

水善利万物而不争,处众人之所恶,故几于道💦

文章目录

      • 概念
      • 注意:
      • 应用

概念

  因为RDD是可分区的,每个分区在不同的节点上运行,如果想要对某个值进行全局累加,就需要将每个task中的值取到然后进行累加,而各个Executor之间是不能相互读取对方数据的,所以就没办法在task里面进行最终累加结果的输出,所以就需要一个全局统一的变量来处理。

用下面的代码举例:

@Test
def test(): Unit = {

  import org.apache.spark.{SparkConf, SparkContext}

  val conf: SparkConf = new SparkConf().setAppName("SparkCoreStudy").setMaster("local[4]")
  val sc = new SparkContext(conf)

  var sum = 0

  val rdd = sc.parallelize(List(10,20,30,40,60))

  rdd.foreach(x=>{
    sum+=x
  })
    println(sum)
}

以上代码的输出结果是 0
分析:因为函数体外的代码是在Driver端执行的,函数体内的代码是在task里面执行的,而上述代码中创建sum变量是在Driver端创建的,函数体内的代码sum+=x是在task里面执行的,task里面对sum进行了累加,然后在Driver端打印sum的值,打印出来的还是Driver端sum的初始值0


  以下代码是用累加器实现的相同功能
@Test
def test(): Unit = {

  import org.apache.spark.{SparkConf, SparkContext}

  val conf: SparkConf = new SparkConf().setAppName("SparkCoreStudy").setMaster("local[4]")
  val sc = new SparkContext(conf)

  // 创建累加器从sparkContext中
  val acc = sc.longAccumulator("sumAcc")


  val rdd = sc.parallelize(List(10,20,30,40,60))

  //    rdd.foreach(x=>{
  //      sum+=x
  //    })

  // 将要累加的变量放到累加器中
    rdd.foreach(x=>{acc.add(x)})

  // 打印累加器的值
  println(acc.value)
  
}

上面代码的输出结果是160
分析:上述代码中从SparkContext中创建了longAccumulator累加器,起名为sumAcc,然后在task中执行累加的时候调用改累加器的add()方法将要累加的变量加入到累加器中,最后在driver端调用累加器的value方法取出累加器的值,所以就会得出160

总结
累加器的创建:val acc = sc.longAccumulator(“sumAcc”)
向累加器中添加数据:acc.add(x)
取出累加器中的数据:acc.value

注意:

  • Executor端不要获取累加器的值,那样取到的那个值不是累加器最终的累加结果,因为累加器是一个分布式共享的写变量
  • 使用累加器时(也就是向累加器中添加要累加的变量的时候)要将其放在行动算子中。因为在行动算子中这个累加器的值可以保证绝对可靠,如果在转换算子中使用累加器,假如这个spark应用程序有多个job,每个job执行的时候都会执行一遍转换算子,那么这个累加器就会被累加多次,这个值也就不准确了。所以累加器要在行动算子中使用。

应用

  累加器在某些场景下可以避免shuffle。spark中自带的累加器有三个longAccumulator()doubleAccumulator()collectionAccumulator[](),比较常用的是collectionAccumulator[]()

我们用累加器实现WordCount
下面是正常的WordCount代码:

  def main(args: Array[String]): Unit = {

    import org.apache.spark.{SparkConf, SparkContext}

    val conf: SparkConf = new SparkConf().setAppName("SparkCoreStudy").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1 = sc.textFile("datas/wc.txt")

    val rdd2 = rdd1.flatMap(_.split(" "))

    val rdd3 = rdd2.map((_, 1))

    rdd3.reduceByKey(_ + _).collect().foreach(println)
    
    Thread.sleep(100000000)
  }

结果:
在这里插入图片描述查看web页面:
在这里插入图片描述

下面用collectionAccumulator[]()累加器实现,用累加器替换掉reduceByKey

def main(args: Array[String]): Unit = {

  import org.apache.spark.{SparkConf, SparkContext}

  val conf: SparkConf = new SparkConf().setAppName("SparkCoreStudy").setMaster("local[*]")
  val sc = new SparkContext(conf)

//    定义集合累加器,并声明累加器中存放的元素类型    可变的map,可变就是可以直接在原来的集合上修改不会返回新的集合
  val acc = sc.collectionAccumulator[mutable.Map[String,Int]]("accNum")

  val rdd1 = sc.textFile("datas/wc.txt")

  val rdd2 = rdd1.flatMap(_.split(" "))

  val rdd3 = rdd2.map((_, 1))

//    rdd3.reduceByKey(_ + _).collect().foreach(println)

  // 遍历每个二元元组
  rdd3.foreachPartition(it=>{
    // 定义一个map用来存放一个分区的累加结果
    val resMap =  mutable.Map[String,Int]()

    // 遍历分区的每个元素(hadoop,1)
    it.foreach(y=>{
      // 从resMap中取这个元素的key看有没有,没有的话返回0
      val num = resMap.getOrElse(y._1, 0)
      // 将取到的数和元素的标记1累加
      val num2 = num+y._2
      // 写回到map中
      resMap.put(y._1,num2)
    })

    // 将每个分区的累加结果添加到累加器中
    acc.add(resMap)

  })


  //    println(rdd4.collect())
  // 在driver端取到累加器的结果,这个结果是Java类型的list
  val res = acc.value

  // 添加 scala.collection.JavaConverters._ 用里面的asScala将Java类型的list转为Scala类型
  import scala.collection.JavaConverters._

  val scalaList = res.asScala

  //    将list里面的map压掉,剩下()元组
  val flatten = scalaList.flatten

  //  以元组的key分组
  val grouped = flatten.groupBy(_._1)

  // 得出每个分组内的次数和
  val res_end = grouped.map(x => {
    // 将元组中的第二个次数来一次map然后sum,取出每个单词的频率
    val sum = x._2.map(y => y._2).sum
    (x._1, sum)
  })

  res_end.foreach(println)

  Thread.sleep(100000000)
}

结果:
在这里插入图片描述查看web页面:
在这里插入图片描述
通过对比两次的web页面可以发现,使用reduceByKey会有一次shuffle,使用累加器替换掉reduceByKey实现相同的功能,没有产生shuffle,因此累加器在某些聚合场景下可以避免掉shuffle从而在一定程度上提高性能

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1448725.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Linux笔记】进程间通信之管道

一、匿名管道 我们在之前学习进程的时候就知道了一个概念,就是进程间是互相独立的,所以就算是两个进程是父子关系,其中一个进程退出了也不会影响另一个进程。 也因为进程间是互相独立的,所以两个进程间就不能直接的传递信息或者…

C++ 特殊类的实现

一、请设计一个类,不能被拷贝 拷贝只会放生在两个场景中:拷贝构造函数以及赋值运算符重载,因此想要让一个类禁止拷贝,只需让该类不能调用拷贝构造函数以及赋值运算符重载即可。 在C98中:将拷贝构造函数与赋值运算符重载…

【Java记】数据类型与变量

一、数据类型 在Java中数据类型主要分为两类:基本数据类型和引用数据类型。基本数据类型有四类八种: 四类:整型、浮点型、字符型以及布尔型八种: 数据类型 关键字 内存占用 范围 字节型 byte 1 字节 -128~ 127 短整型 …

【C语言】数据结构#实现堆

目录 (一)堆 (1)堆区与数据结构的堆 (二)头文件 (三)功能实现 (1)堆的初始化 (2)堆的销毁 (3)插入数据 …

Vue3+Vite+TS+Pinia+ElementPlus+Router+Axios创建项目

目录 初始项目组成1. 创建项目1.1 下载项目依赖1.2 项目自动启动1.3 src 别名设置vite.config.ts配置文件tsconfig.json配置若新创项目ts提示1.4 运行测试2. 清除默认样式2.1 样式清除代码下载2.2 src下创建公共样式文件夹`style`2.3 main.js中引入样式2.4 安装`sass`解析插件2…

SpringCloud之Eureka注册中心和负载均衡

SpringCloud之Eureka注册中心和负载均衡 微服务技术栈认识微服务单体架构分布式架构微服务 微服务拆分及远程调用微服务拆分注意事项 Eureka注册中心提供者与消费者原理分析服务调用出现的问题Eureka的作用 使用流程1、搭建EurekaServer2、注册user-service3、在order-service完…

代码随想录算法训练营第三十一天 |基础知识,455.分发饼干,376.摆动序列,53.最大子序和(已补充)

基础知识: 题目分类大纲如下: #算法公开课 《代码随想录》算法视频公开课(opens new window):贪心算法理论基础!(opens new window),相信结合视频再看本篇题解,更有助于大家对本题的理解。 #什么是贪心 贪心的本质…

《区块链公链数据分析简易速速上手小册》第4章:交易数据分析(2024 最新版)

文章目录 4.1 解析交易输入和输出4.1.1 基础知识4.1.2 重点案例:追踪比特币交易4.1.3 拓展案例 1:以太坊交易的输入输出解析拓展案例1:以太坊交易的输入输出解析步骤1: 连接到以太坊网络步骤2: 获取交易数据步骤3: 解析交易输入结论 4.1.4 拓…

PyQt Python 使用 VTK ITK 进行分割 三维重建 医学图像可视化系统 流程

效果: 重建流程: 1. 输入 可以读取DICOM ,nii nrrd 等数据 设置读取器以加载 DICOM 图像系列。 使用 itk::GDCMImageIO 作为 DICOM 图像的输入输出接口。 使用 itk::GDCMSeriesFileNames 获取指定路径下的所有 DICOM 文件名。 使…

Zabbix图形中文乱码问题(显示口口)解决办法

一 切换到zabbix安装目录assets/fonts下,下载字体 这里使用是nginxphp作为zabbix-web展示,使用find 命令查找 进入目录下,将原有字体备份,下载msyh字体 wget https://www.xxshell.com/download/sh/zabbix/ttf/msyh.ttf 二 修改配…

AI换脸(视频换脸)讲解-1

AI换脸是一种人工智能技术,它可以将一个人的面部表情和特征应用到另一个人的脸部上,以创建逼真的视频和图像。 首先,AI换脸技术需要大量的训练数据。这些数据通常是由多个人以不同的表情、姿态、光照条件下的照片或视频组成。通过使用人工智…

DolphinScheduler安装与配置

DolphinScheduler概述 Apache DolphinScheduler是一个分布式、易扩展的可视化DAG工作流任务调度平台。致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。 DolphinScheduler的主要角色如下: MasterServer采用分布式无…

Linux第46步_通过“添加自定义菜单”来学习menuconfig图形化配置原理

通过“添加自定义菜单”来学习menuconfig图形化配置原理,将来移植linux要用到。 自定义菜单要求如下: ①、在主界面中添加一个名为“My test menu”,此菜单内部有一个配置项。 ②、配置项为“MY TESTCONFIG”,此配置项处于菜单“My test m…

C# winfrom实例:四路激光测距雷达数据采集和波形图绘制

1.所述产品 产品型号:TFmini Plus 相关资料下载地址:http://www.benewake.com/download 产品名称:TFmini Plus激光雷达模组制造商公司:北醒(北京)光子科技有限公司 2.产品功能:TFmini Plus是基…

编辑器的新选择(基本不用配置)

Cline 不用看网上那些教程Cline几乎不用配置。 点击设置直接选择Chinese, C直接在选择就行了。 Cline是一个很好的编辑器,有很多懒人必备的功能。 Lightly 这是一个根本不用配置的C编辑器。 旁边有目录,而且配色也很好,语言标准可以自己…

vue 获取 form表格 的值 的方法

vue 获取 form表格 的值 代码 let discountLastMoney this.form.getFieldValue(discountLastMoney)-0

tee漏洞学习-翻译-3:TrustZone exploit for MSM8974

原文:http://bits-please.blogspot.com/2015/08/full-trustzone-exploit-for-msm8974.html 在这篇博文中,我们将介绍利用上一篇文章中描述的 TrustZone 漏洞的完整过程。 在开发此漏洞时,我只使用了我值得信赖的(个人&#xff0…

Linux环境中的git

目录 1.要使用git,首先要安装git 2.首次使用git需要做的操作 3.git操作 1.要使用git,首先要安装git 指令:sudo yum install -y git 2.首次使用git需要做的操作 在gitee网页,在你的仓库中找到: 先将下面两行代码分别…

第12讲创建图文投票实现

创建图文投票实现 图文投票和文字投票基本一样&#xff0c;就是在投票选项里面&#xff0c;多了一个选项图片&#xff1b; <view class"option_item" v-for"(item,index) in options" :key"item.id"><view class"option_input&qu…

如何用 ChatGPT 做项目管理?

ChatGPT 可以通过创建和维护跨团队项目协作计划&#xff0c;让员工更容易理解他们的角色和职责。 这个协作计划里面会包括每个团队或个人要执行的具体任务&#xff0c;每个任务最后期限和任何事情之 间的依赖关系。 该场景对应的关键词库:(24 个) 项目管理、项目协作计划、跨…