详解 Spark 核心编程之累加器

news2025/1/14 1:16:48

累加器是分布式共享只写变量

一、累加器功能

​ 累加器可以用来把 Executor 端的变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在 Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge

在这里插入图片描述

二、累加器类型

1. 系统累加器

/**
常见的系统累加器:longAccumulator/doubleAccumulator/collectionAccumulator
说明:累加器一般放在行动算子中进行操作
*/
object TestRDDAcc {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("Acc")
    	val sc = new SparkContext(conf)
        
        val rdd = sc.makeRDD(List(1,2,3,4), 2)
        
        // 创建累加器
        val accSum = sc.longAccumulator("sum")
        
        rdd.foreach(num => {
        	accSum.add(num)    
        })
        
        println(accSum.value)
        
        sc.stop()
        
    }
}

三、自定义累加器

自定义累加器实现 WordCount 案例,避免 shuffle 操作

/**
	1.继承 AccumulatorV2[IN, OUT] 抽象类,定义输入输出的泛型类型
		1.1 IN 表述累加器 add 的数据的类型
		1.2 OUT 表示累加器 value 的返回类型
		
	2.重写累加器的抽象方法
*/
object TestAccWordCount {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("WCAcc")
    	val sc = new SparkContext(conf)
        
        val rdd = sc.makeRDD(List(
        	"hello", "hive", "hello", "spark"
        ))
        
        // 创建自定义累加器
        val wcAcc = new MyAccumulator()
        
        // 向 spark 进行注册
        sc.register(wcAcc, "wordCountAcc")
        
        // 循环遍历 rdd
        rdd.foreach(word => {
            // 使用累加器
        	wcAcc.add(word)    
        })
        
        // 输出累加器的值
        println(wcAcc.value)
        
        sc.stop()
        
    }
}

/*
	自定义累加器
*/
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
    // 定义累加器的返回结果 Map
    private var resultMap = mutable.Map[String, Long]()
    
    // 判断是否为初始状态
    override def isZero: Boolean = resultMap.isEmpty()
    
    // 复制累加器
    override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
        this
    }
    
    // 重置累加器
    override def reset(): Unit = resultMap.clear()
    
    // 获取累加器输入的数据进行操作
    override def add(word: String): Unit = {
    	// 向 resultMap 中添加新值或累加旧值
        val count = resultMap.getOrElse(word, 0L) + 1
        resultMap.update(word, count)
    }
    
    // 合并多个累加器的结果
    override def merge(other:  AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    other.value.foreach({
        case (word, count) => {
            val newCount = this.resultMap.getOrElse(word, 0L) + 1
            this.resultMap.update(word, newCount)
        }
    })
}
    
    // 返回累加器的结果
    override def value: mutable.Map[String, Long] = resultMap
    
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1791273.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

程序媛:拽姐

更多精彩内容在公众号。 最近都在玩梗图,我也来玩下拽姐的梗图。来说说拽姐做为程序媛的痛。 程序媛的痛不在于996,而在于无休止的攻关。拽姐刚入职听领导说攻关不多,一年也就一次,拽姐心中暗喜,觉得来对了地方。结果…

MySQL之查询性能优化(六)

查询性能优化 查询优化器 9.等值传播 如果两个列的值通过等式关联,那么MySQL能够把其中一个列的WHERE条件传递到另一列上。例如,我们看下面的查询: mysql> SELECT film.film_id FROM film-> INNER JOIN film_actor USING(film_id)-> WHERE f…

百度地图API 教程使用 嵌套到vue3项目中使用,能够定位并且搜索地点名称位置,反向解析获取经度和维度

文章目录 目录 文章目录 流程 小结 概要安装流程技术细节小结 概要 注册百度地图成为开发者: 登录百度账号 注册成功开始下一步 百度地图API是百度提供的一组开发接口,用于在自己的应用程序中集成地图功能。通过百度地图API,您可以实现地图…

1Panel 搭建 halo博客

线上服务器一直闲置,刷到视频 1Panel 能更好管理服务器,还能快速搭建博客,便上手试试,的确很方便,顺手记录一下。 零、准备工作 一台服务器(按需购买,此处准备的阿里云服务器一台,也…

负载均衡算法深度探析:F5技术在高效流量管理中的应用

传统的单一服务器模式下,随着用户请求量的增加,单个服务器可能会承受过重的压力,导致响应速度下降甚至系统崩溃,负载均衡技术应运而生。它广泛应用于各种软硬件系统中,将网络流量以某种算法合理分配给各个节点&#xf…

电阻、电容和电感测试仪设计

在现代化生产、学习、实验当中,往往需要对某个元器件的具体参数进行测量,在这之中万用表以其简单易用,功耗低等优点被大多数人所选择使用。然而万用表有一定的局限性,比如:不能够测量电感,而且容量稍大的电容也显得无能为力。所以制作一个简单易用的电抗元器件测量仪是很…

鸿蒙轻内核M核源码分析系列七 动态内存Dynamic Memory

内存管理模块管理系统的内存资源,它是操作系统的核心模块之一,主要包括内存的初始化、分配以及释放。 在系统运行过程中,内存管理模块通过对内存的申请/释放来管理用户和OS对内存的使用,使内存的利用率和使用效率达到最优&#x…

优思学院|质量工程师工资不高怎么办?

你是否曾经好奇,为什么在职场中,质量工程师的工资普遍不高?这一现象背后的原因,实际上与他们的职业门槛和专业知识密切相关。早期,国内的质量工程师入行门槛较低,许多人即使没有任何专业知识也可以进入这一…

2025 QS 世界大学排名公布,北大清华跻身全球前20

一年一度,2025 QS 世界大学排名公布! QS(Quacquarelli Symonds)是唯一一个同时将就业能力与可持续发展纳入评价体系的排名。 继去年 2024 QS 排名因为“墨尔本超耶鲁,新南悉尼高清华”而荣登微博热搜之后&#xff0c…

c# 学习 2

常量 转义字符 类型转换

【Linux】写一个日志类

文章目录 1. 源代码2. 函数功能概览3. 代码详细解释3.1 头文件和宏定义3.2 Log类定义3.3 打印日志的方法3.4 操作符重载和析构函数3.5 可变参数函数的原理 4. 测试用例 1. 源代码 下面代码定义了一个 Log 类,用于记录日志信息。这个类支持将日志信息输出到屏幕、单…

java判断对象是否还在被引用

1、代码取消强引用后&#xff0c;gc回收对象 public static void main(String[] args) {Object obj new Object();WeakReference<Object> weakRef new WeakReference<>(obj);System.out.println(weakRef.get());obj null; // 取消强引用,后续gc会被回收,如果不…

endnote IEEEtran 参考文献 输出Latex

文章目录 参考文献Latex1. 新建格式1.1 新建BibTeX Export样式文件1.2 保存自定义文献格式 2 修改2.1 修改Journal Names 为简写2.2 修改Author Lists2.3 修改 模版 Templates 3. 特殊字符作者名字标题 4. 增加期刊简写4.1 删除已有简写的Term Lists 4.2 下载最新的Term LIsts4…

自动化办公02 用openpyxl库操作excel.xlsx文件(新版本)

目录 一、文件读操作 二、文件写操作 三、修改单元格样式 openpyxl 是一个处理Excel表格的第三方库。openpyxl 库可以处理Excel2010以后的电子表格格式&#xff0c;包括&#xff1a;xlsx/xlsm/xltx/xltm。 openpyxl教程 一、文件读操作 工作簿(workbook): excel文件 工作表…

PID控制算法介绍及使用举例

PID 控制算法是一种常用的反馈控制算法&#xff0c;用于控制系统的稳定性和精度。PID 分别代表比例&#xff08;Proportional&#xff09;、积分&#xff08;Integral&#xff09;和微分&#xff08;Derivative&#xff09;&#xff0c;通过组合这三个部分来调节控制输出&#…

宝塔nginx配置

将跟php有关的注释掉&#xff1a; 添加&#xff1a; #解决vue刷新404问题try_files $uri $uri/ /index.html; location /prod-api/ {proxy_set_header Host $http_host;proxy_set_header X-Real-IP $remote_addr;proxy_set_header REMOTE-HOST $remote_addr;proxy_set_header…

SOCKS 代理 和 HTTP 代理, WebSocket

SOCKS 代理 和 HTTP 代理 的区别 SOCKS 代理 和 HTTP 代理 都是代理服务器&#xff0c;它们充当客户端和目标服务器之间的中介&#xff0c;但它们的工作方式和应用场景有所不同。 1. SOCKS 代理&#xff1a; 工作原理&#xff1a; SOCKS 代理是一种更底层的代理&#xff0c;…

攻防世界—webbaby详解

1.ssrf注入漏洞 ssrf&#xff08;服务端请求伪造&#xff09;是一种安全漏洞&#xff0c;攻击者通过该漏洞向受害服务器发出伪造的请求&#xff0c;从而访问并获取服务器上的资源&#xff0c;常见的ssrf攻击场景包括访问内部网络的服务&#xff0c;执行本地文件系统命令&#…

基于小波变换贝叶斯LMMSE估计的图像降噪方法(MATLAB 2018)

自从小波被发现以来&#xff0c;由于其优良的时频局部化性能&#xff0c;大大解决了信号与图像降噪的难题。利用小波降噪大致有三种方法&#xff0c;分别是基于小波模极大值原理、基于小波变换系数的相关性&#xff0c;和最为常用的小波阈值函数法。 基于小波模极大值降噪 该…

React(五)useEffect、useRef、useImperativeHandle、useLayoutEffect

(一)useEffect useEffect – React 中文文档 useEffect hook用于模拟以前的class组件的生命周期&#xff0c;但比原本的生命周期有着更强大的功能 1.类组件的生命周期 在类组件编程时&#xff0c;网络请求&#xff0c;订阅等操作都是在生命周期中完成 import React, { Com…