Spark---累加器

news2024/9/24 21:18:00

1.累加器实现原理

累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge。

    //建立与Spark框架的连接
    val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") //配置文件
    val context = new SparkContext(wordCount) //读取配置文件

    val dataRdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4),2)
    var sum=0
    dataRdd.foreach(num=>sum+=num)

    println(sum)
    context.stop()

运行结果:
在这里插入图片描述
我们预期是想要实现数据的累加,开始数据从Driver被传输到了Executor中进行计算,但是每个分区在累加数据完成之后并没有将计算结果返回到Driver端,所以导致最后的结果与预期的不一致。
在这里插入图片描述
对上述代码使用累加器

    val dataRdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4))
    val sum = context.longAccumulator("sum")
    dataRdd.foreach(num=>{
      //使用累加器
      sum.add(num)
    })

    //获取累加器的值
    println(sum.value)

运行结果:
在这里插入图片描述
由此可见,在使用了累加器之后,每个Executor在开始都会获得这个累加器变量,每个Executor在执行完成后,累加器会将每个Executor中累加器变量的值聚合到Driver端。
在这里插入图片描述

Spark提供了多种类型的累加器,以下是其中的一些:
在这里插入图片描述

2.自定义累加器

用户可以通过继承AccumulatorV2来自定义累加器。需求:自定义累加器实现WordCount案例。

AccumulatorV2[IN,OUT]中:
IN:输入数据的类型
OUT:输出数据类型

在这里插入图片描述
WordCount案例实现完整代码:

package bigdata.wordcount.leijiaqi

import bigdata.wordcount.leijiaqi
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

/**
 * 使用累加器完成WordCount案例
 */
object Spark_addDemo {
  def main(args: Array[String]): Unit = {
    //建立与Spark框架的连接
    val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") //配置文件
    val context = new SparkContext(wordCount) //读取配置文件

    val dataRDD: RDD[String] = context.textFile("D:\\learnSoftWare\\IdeaProject\\Spark_Demo\\Spark_Core\\src\\main\\com.mao\\datas\\1.txt")

    //创建累加器对象
    val wordCountAccumulator = new WordCountAccumulator
    //向Spark中进行注册
    context.register(wordCountAccumulator,"wordCountAccumulator")

    //实现累加
    dataRDD.foreach(word => {
      wordCountAccumulator.add(word)
    })
    //获取累加结果,打印在控制台上
    println(wordCountAccumulator.value)

    //关闭链接
    context.stop()
  }

}

class WordCountAccumulator extends  AccumulatorV2[String,mutable.Map[String,Long]]
{

  //定义一个map用于存储累加后的结果
  var map: mutable.Map[String, Long] =mutable.Map[String,Long]()

  //累加器是否为初始状态
  override def isZero: Boolean = {
    map.isEmpty
  }

  //复制累加器
  override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
    new WordCountAccumulator()
  }

  //重置累加器
  override def reset(): Unit = {
    map.clear()
  }

  //向累加器添加数据IN
  override def add(word: String): Unit = {
    // 查询 map 中是否存在相同的单词
    // 如果有相同的单词,那么单词的数量加 1
    // 如果没有相同的单词,那么在 map 中增加这个单词
    val newValue = map.getOrElse(word, 0L) + 1
    map.update(word,newValue)
  }

  //合并累加器
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    var map1=this.map
    var map2=other.value

    //合并两个map
    map2.foreach({
      case (word,count)=>{
        val newValue = map1.getOrElse(word,0L)+count
        map1.update(word,newValue)
      }
    })
  }

  //返回累加器的结果(OUT)
  override def value: mutable.Map[String, Long] = this.map
}
}

运行结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1380439.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【MATLAB】逐次变分模态分解SVMD信号分解算法

有意向获取代码,请转文末观看代码获取方式~ 1 基本定义 逐次变分模态分解(Sequential Variational Mode Decomposition,简称SVMD)是一种用于信号处理和数据分析的方法。它可以将复杂的信号分解为一系列模态函数,每个…

状态图作业

状态图作业 一. 简答题(共7题,100分) (简答题) 什么是状态,对象的状态和对象的属性有什么区别? 正确答案: 状态是指在对象生命周期中满足某些条件、 执行某些活动或等待某些事件的一个条件和状 况。属性表…

函数的秘密

1. 函数的概念 在数学中我们学习过函数,而在C语言中其有着与数学不同的概念: 在C语言中,函数是指一组执行特定任务的语句,这些语句可以重复使用,并且可以在程序的不同部分调用。通过使用函数,程序员可以将…

sectigo dv证书适合场景买一年送一月

Sectigo是成立于美国的知名CA认证机构,随着互联网的发展,Sectigo颁发了越来越多的SSL数字证书。这些SSL证书产品不仅可以对网站传输数据进行加密服务,还可以对服务器身份进行认证服务。Sectigo旗下的DV证书产品比较齐全,适用场景也…

腾讯云TDSQL TCA/TCP/TCE 认证考试有什么区别呢?

腾讯云认证等级:专项认证考试&云方向认证考试 一、专项认证考试 数据库交付运维-腾讯云TDSQL认证考试一共分为三个等级: 初级TCA、高级工程师TCP、专家级TCE 1、TDSQL TCA培训(MySQL版/PostgreSQL版)考试安排 TCA考试是纯理论题,总分是…

全新加密叙事,以Solmash为代表的 LaunchPad 平台如何为用户赋能?

铭文市场的火爆带来“Fair Launch”这种全新的代币启动方式,Fair Launch 的特点在于其为所有人参与 Launch 带来了公平的机会,所有链上玩家们都需要通过先到先得的方式 Mint 资产,VC 在 Fair Launch 中几乎没有话语权,不同的投资者…

Java医院管理系统HIS源码带小程序和安装教程

Java医院管理系统HIS源码带小程序和安装教程该项目是用springbootlayuishiro写的医院管理系统,该系统的业务比较复杂,数据库一共有36张表。项目的视频业务参考文档,都在百度云盘中,可以先看看视频和参考文档。 项目分为门诊管理、…

计算机毕设项目(二)基于django+vue+sqlite实现自适应学习系统,在线考试系统

文章目录 自适应学习系统功能介绍分权分域用户管理考试与练习管理练习记录管理学习内容管理其他功能管理界面部分源码展示完整代码 自适应学习系统功能介绍 这个系统是一个基于Django框架的Python在线考试和学习平台。vuedjango在线学习系统,在线考试系统。数据库使…

HTML--图片

HTML中使用 img标签来显示一张图片 它有三个属性&#xff1a; src alt tiltle src属性&#xff1a; 图片路径建议为相对路径&#xff0c;以免文件移动造成无法正常工作 用法&#xff1a; <img src"图片路径和名字"><!DOCTYPE html> <html> <…

PHP代码审计基础知识

前言 本文章主要是PHP代码审计的一些基础知识&#xff0c;包括函数的用法&#xff0c;漏洞点&#xff0c;偏向基础部分&#xff0c;个人能力有限&#xff0c;部分可能会出现错误或者遗漏&#xff0c;读者可自行补充。 代码执行 代码执行是代码审计当中较为严重的漏洞&…

计算机毕业设计——SpringBoot仓库管理系统(附源码)

1&#xff0c;绪论 1.2&#xff0c;项目背景 随着电子计算机技术和信息网络技术的发明和应用&#xff0c;使着人类社会从工业经济时代向知识经济时代发展。在这个知识经济时代里&#xff0c;仓库管理系统将会成为企业生产以及运作不可缺少的管理工具。这个仓库管理系统是由&a…

【Huggingface】如何访问Huggingface,Huggingface镜像

镜像站&#xff1a; https://hf-mirror.com/本站域名 hf-mirror.com&#xff0c;用于镜像 huggingface.co 域名。 更多用法&#xff08;多线程加速等&#xff09;详见这篇文章。简介&#xff1a; 方法一&#xff1a;使用huggingface 官方提供的 huggingface-cli 命令行工具。…

Centos7.9服务器编译安装Nginx1.24.0和php8.3

Centos7.9服务器编译安装Nginx1.24.0和php8.3 服务器nginx原版本有安全漏洞,需要升级,由于原始是yum源安装,通过yum直接升级,无法正常升级完成,故而需要卸载yum源,重新编译安装。 1、查看原来nginx版本,ps查看原来nginx进程,运行状态: ps aux | grep nginx ​ root …

深入浅出的说地弹(即地噪声)

1. 什么是地弹&#xff0c;地弹的概念&#xff0c;为何叫地弹 地弹、振铃、串扰、信号反射这几个在信号完整性分析总是分析的重点对象。初学者一看&#xff1a;好高深&#xff01;其实&#xff0c;感觉高深是因为你满天听到“地弹”二字&#xff0c;却到处找不到“地弹…

DNS 域名解析 后续(二)-----主从复制、分离解析

&#xff08;软件名 bind , 服务名 named&#xff09; bind主包 yum install bind bind-utils -y 主软件 和 配置包管理软件&#xff08;工具包&#xff09; rpm -q bind #检查是否安装dns服务 yum install bind bind-utils -y #安装dns服务,安装bind软件包 &#xff0…

【Linux笔记】自定义一个简单的shell

一、命令行解释器shell的原理 我们已经知道Linux给我们提供了一系列由exec开头的系统调用接口&#xff0c;可以让我们在自己所写的程序中调用各种指令或者我们自己写的其他程序&#xff1a; 而我们的shell命令行解释器也是接收用户输入的指令&#xff0c;然后执行&#xff1a;…

Error: Failed to download template from registry: fetch failed

第一次构建Nuxt项目时&#xff0c;出现在这样的错误&#xff01;&#xff01;&#xff01; 如果你也是这样得错误&#xff0c;修改hosts也没用。我试了 是因为你的npm安装了其他镜像源&#xff0c; 这个时候你就需要手动下载了&#xff1a; web端访问&#xff1a; https://ra…

[DL]深度学习_Feature Pyramid Network

FPN结构详解 一、概念介绍 Feature Pyramid Network (FPN)是一种用于目标检测和语义分割的神经网络架构。它的目标是解决在处理不同尺度的图像时&#xff0c;信息丢失和语义细节模糊的问题。 FPN的核心思想是通过在网络中添加一组横向连接来构建多尺度特征金字塔。这些横向连接…

01循环算法

1.求小数点的某一位&#xff0c;且超出float和double的精度问题 【题目描述】 分数a/b化为小数后&#xff0c;小数点后第n位的数字是多少&#xff1f; 【输入】 三个正整数a&#xff0c;b&#xff0c;n&#xff0c;相邻两个数之间用单个空格隔开。0<a<b<100&#…

RK3568驱动指南|第十二篇 GPIO子系统-第134章 三级节点操作函数实验

瑞芯微RK3568芯片是一款定位中高端的通用型SOC&#xff0c;采用22nm制程工艺&#xff0c;搭载一颗四核Cortex-A55处理器和Mali G52 2EE 图形处理器。RK3568 支持4K 解码和 1080P 编码&#xff0c;支持SATA/PCIE/USB3.0 外围接口。RK3568内置独立NPU&#xff0c;可用于轻量级人工…