Spark / Java - atomic.LongAccumulator 与 Spark.util.LongAccumulator 计数使用

news2025/1/16 14:02:53

目录

一.引言

二.atomic.LongAccumulator

1.构造方法

2.使用方法

3.创建并使用

三.Spark.util.LongAccumulator

1.构造方法

2.使用方法


一.引言

使用 Spark 进行大数据分析或相关操作时,经常需要统计某个步骤或多个步骤的相对耗时或数量,java.util 与 spark.util 都提供了原子计数器。如果是 spark on Local ,可以直接初始化 object 构建 java.util.concurrent.atomic.LongAccumulator 实现原子计数,如果是 spark on Yarn,则可以通过 org.apache.spark.util.LongAccumulator 实现累计计数。

二.atomic.LongAccumulator

1.构造方法

构造一个 java.util.concurrent.atomic.LongAccumulator 需要两个参数:

- LongBinaryOperator

function 为对应的累加器函数,其接受两个参数 left 与 right,用户在函数内定义累加逻辑,其中 left 为当前值,right 为更新的值。官方提示该累加器函数应该是无副作用的,因为当由于线程之间的争用而导致尝试更新失败时,可以重新应用它。

- long

identify 为当前值的默认值。

例入求取当前计算值的最大值,则 function 可以使用 max 函数,identify 使用 Long.MIN_VALUE。

2.使用方法

类内提供了4种方法:

- accumulate

根据 function 与当前值更新

- get

返回当前值。返回的值不是原子快照;在没有并发更新的情况下调用会返回准确的结果,但在计算值时发生的并发更新可能不会被合并。

- reset

重置保持更新为标识值的变量。此方法可能是创建新更新程序的有用替代方法,但仅在没有并发更新时有效。只有在知道没有线程正在并发更新时才应该使用它。

- getThenReset

等效于重置。例如,该方法可以应用于多线程计算之间的静态点。如果与此方法同时存在更新,则不能保证返回的值是重置之前发生的最终值。

开头提到的相对计数问题而言,主要使用的方法为 accumulate 与 get,如果存在多轮次计算或重置的情况则需要使用 reset 或者 getThenReset 方法,不过调用时尽量避免并发更新,否则可能出现结果的失真。

3.创建并使用

A.创建

这里实现了最基本的 add 操作,有点类似 LongAdder,随后将需要统计的耗时与数量对应的 key 存入 countMap,后续不同操作累计即可。

  import java.util.concurrent.atomic.LongAccumulator

  val accArray = Array("cost0", "num0",
    "cost1", "num1",
    "cost2", "num2",
    "cost3", "num3",
    "cost4", "num4",
    "cost5", "num5",
    "cost6", "num6")
  val countMap = new mutable.HashMap[String, LongAccumulator]()
  accArray.foreach(feat => {

    val longBinaryOperator = new LongBinaryOperator() {
      @Override
      def applyAsLong(left: Long, right: Long): Long = {
        left + right
      }
    }

    // 第1个参数是一个双目运算器对象,第2个参数是累加器的初始值。
    val longAccumulator: LongAccumulator = new LongAccumulator(longBinaryOperator, 0)
    countMap(feat) = longAccumulator
  })

B.累加并获取

获取结果时尽量避免并发更新,否则可能出现结果的失真。

      val st = System.currentTimeMillis()
      
                ... doSomeThing ...

      val cost = System.currentTimeMillis() - st

      // 存储
      Uobject.countMap("cost").accumulate(cost)
      Uobject.countMap("num").accumulate(1L)      

      // 获取
      Uobject.countMap("cost").get()
      Uobject.countMap("num").get()

三.Spark.util.LongAccumulator

atomic.LongAccumulator 适合 Local 模式或者非 Spark 类型作业统计平均数,如果在分布式集群情况下想要获取全局的计数可以使用 Spark.util.LongAccumulator

1.构造方法

构造时只需要通过 SparkContext 即可:

    val spark = SparkSession
      .builder
      .appName(AppName)
      .getOrCreate()

    val sc = spark.sparkContext

    // 初始化
    val cost = sc.longAccumulator

    // 累加值,类型为 Long
    cost.add(1L)

调用该方法会初始化并注册当前计数器:

 

2.使用方法

相比前面的 atomic.LongAccumulator,Spark.util.LongAccumulator 的 add 累加方法会同时记录 count +1,所以相比上面同时初始化 cost 和 num 的两个计数器相比,这里可以节省一倍操作量。

add 后我们可以通过 .value 方法获取其计数器的值,也可以通过 avg 方法获取当前统计量的均值。如果需要同时统计多个分类的计数器,可以通过 HashMap 构造多个 KV 组合,根据不同 key 累加即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/153033.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java设计模式-适配器模式Adapter

介绍 适配器模式(Adapter Pattern)将某个类的接口转换成客户端期望的另一个接口表示,主的目的是兼容性,让原本 因接口不匹配不能一起工作的两个类可以协同工作。其别名为包装器(Wrapper)适配器模式属于结构型模式主要分为三类:类适配器模式、…

树莓派自带的python3.9->python3.7

卸载python3.9:sudo apt-get remove python3卸载之后一些包可以使用sudo apt autoremove这个命令删除卸载成功如果出现问题后续再来更新(出现问题后后续安装python也会失败)(先不要安装先看)安装python3.7:…

C语言第30课笔记

1.strerror(errno要包含头文件errno.h) 2.perror头文件为stdio.h 3.一些字符函数 4.字母大小写转换函数 5.memmove理论上是memcpy的升级版(可以自己拷贝自己)。 6.匿名结构体类型在类型创建好了之后直接创建变量,只能用一次。两个完全相同的匿名结构体类型&#xf…

【八】Netty HTTP协议--文件服务系统开发

Netty HTTP协议--文件服务系统开发介绍HTTP-文件系统场景描述流程图代码展示netty依赖服务端启动类 HttpFileServer服务端业务逻辑处理类 HttpFileServerHandler结果展示错误路径文件夹路径文件路径遗留bugbug版本总结介绍 由于Netty天生是异步事件驱动的架构,因此…

java EE初阶 — Synchronized 的原理

文章目录1. Synchronized 的优化操作1.1 偏向锁1.2 轻量级锁(自旋锁)1.3 重量级锁2. 其他的优化操作2.1 锁消除2.2 锁粗化3. 相关面试题1. Synchronized 的优化操作 两个线程针对同一个对象加锁,就会产生阻塞等待。 Synchronized 内部其实还有…

ubuntu docker elasticsearch kibana安装部署

ubuntu docker elasticsearch 安装部署 所有操作尽量在root下操作. 安装docker 1. 由于是基于宝塔面板安装的所以简答的点击操作即可完成安装. 我这里已经是正常的安装好了. 2.dcoker 镜像加速 https://cr.console.aliyun.com/cn-hangzhou/instances访问这个网址进去进行了…

快速上手Golang

自动推导赋值:自动推导赋值Go中 不同的数据类型不能进行计算对于浮点型默认都是float64 精确到小数点后15位单引号的 为字节类型 一位0~255的字符转换双引号的 为字符串类型多重赋值多重赋值a,b:1,2格式输出格式输出printf“%3d”三位整数,不满足三位时头部补空格“…

录制课程用什么软件好?3款超好用的课程视频录课软件

在互联网技术的飞速发展下,在线教学已经成为一种新型的教学形式,与传统的教学方法相比,在线教学具有低成本、突破地域、时间灵活、形式多样的教学方式。那录制课程用什么软件好?今天小编就跟大家分享3款超好用的课程视频录课&…

认真研究MySQL的主从复制(一)

【1】主从复制概述 ① 如何提升数据库并发能力 在实际工作中,我们常常将Redis作为缓存与MySQL配合使用,当有请求的时候,首先会从缓存中进行查找。如果存在就直接取出,如果不存在再访问数据库。这样就提升了读取的效率&#xff0…

中国数据库的诸神之战

作者 | 唐小引出品 | 《新程序员》编辑部“现在的数据库产品实在是太多了!”前几天,我和深耕数据库/大数据近 30 年的卢东明老师相聊时,他发出了这样的感慨。将包括 DB-Engines Ranking 以及国内数据库排行等在内的数据库产品列表进行汇总&am…

快速入门Freemarker模块引擎技术

1、 freemarker 介绍 ​ FreeMarker 是一款 模板引擎: 即一种基于模板和要改变的数据, 并用来生成输出文本(HTML网页,电子邮件,配置文件,源代码等)的通用工具。 它不是面向最终用户的,而是一个Java类库&am…

采场的车辆管理及卸料点计数管理有哪些难题需要解决

近期,安环部检查采矿区域工程车辆驾驶人员情况时,发现有部分驾驶员及工作人员存在违规顶替情况,有非注册备案人员驾驶矿用工程车辆违规作业。为了进行统一有效的人员车辆管理,同时能监督安全员定期对采矿作业区进行安全巡查&#…

Camtasia Studio2023喀秋莎新增功能及电脑配置要求介绍

Camtasia Studio2023具有强大的视频播放和视频编辑功能,录制屏幕后,根据时间轴对视频剪辑进行各种标记、媒体库、画中画、画中画、画外音当然,也可以导入现有视频并对其进行编辑操作。编辑完成后,可以将录制的视频输出为最终的视频…

光伏废水深度除氟装置,用于高盐废水除氟的工艺

光伏行业废水根据生产产品可细分为单品硅生产线排水、多品硅生产线排水。其生产工序中有污水排放的工段主要是:制绒和清洗工段。废水中的主要污染物为由异丙醇引起的高浓度COD、氟离子及酸碱污染,其中以含异丙醇的废水一直是水处理中的难题。如果不对废水…

【自学Python】Python input()函数

Python input()函数 Python input()函数教程 在 Python 中,input() 函数用于获取用于的输入,并给出提示。input() 函数,总是返回 string 类型,因此,我们可以使用 input() 函数,获取用户输入的任何数据类型…

【C进阶】第十五篇——内存函数

memcpy - 内存拷贝1 函数介绍 模拟实现 memmove - 内存拷贝2 函数介绍 模拟实现 memcmp - 内存比较 memset - 内存设置 memcpy - 内存拷贝1 函数介绍 void *memcpy( void *dest, const void *src, size_t count );memcpy函数是一个用于拷贝两个不相关的内存块的函数。…

4-2文件管理-文件系统实现

文章目录一.文件系统层次结构二.文件系统的全局结构三.虚拟文件系统与文件系统挂载(安装)(一)虚拟文件系统(二)文件系统挂载(安装)一.文件系统层次结构 (1)用…

密码学_MD5算法

MD5即Message-Digest Algorithm 5(信息-摘要算法5),用于确保信息传输完整一致。是计算机广泛使用的杂凑算法之一(又译摘要算法、哈希算法),主流编程语言普遍已有MD5实现。 MD5算法具有以下特点&#xff1a…

php宝塔搭建部署实战易优cms皮具皮包手袋定制网站源码

大家好啊,我是测评君,欢迎来到web测评。 本期给大家带来一套php开发的易优cms皮具皮包手袋定制网站源码,感兴趣的朋友可以自行下载学习。 技术架构 PHP7.2 nginx mysql5.7 JS CSS HTMLcnetos7以上 宝塔面板 文字搭建教程 下载源码&a…

Java 日常开发记录

手动分页 非mybatis 自动分页 service 层 Overridepublic PageInfo<CfLogVo> cfLogList(CfLogQuery cfLogQuery) {if (StrUtil.isNotBlank(cfLogQuery.getRequest()) && cfLogQuery.getRequest().length() >100){throw new ServiceException("请求报文…