Spark-06:共享变量

news2024/9/22 13:41:59

目录

1.广播变量(broadcast variables)

2.累加器(accumulators)


      在分布式计算中,当在集群的多个节点上并行运行函数时,默认情况下,每个任务都会获得函数中使用到的变量的一个副本。如果变量很大,这会导致网络传输占用大量带宽,并且在每个节点上都占用大量内存空间。为了解决这个问题,Spark引入了共享变量的概念。

        共享变量允许在多个任务之间共享数据,而不是为每个任务分别复制一份变量。这样可以显著降低网络传输的开销和内存占用。Spark提供了两种类型的共享变量:广播变量(broadcast variables)和累加器(accumulators)。

1.广播变量(broadcast variables)

        通常情况下,Spark程序运行时,通常会将数据以副本的形式分发到每个执行器(Executor)的任务(Task)中,但当变量较大时,这会导致大量的内存和网络开销。通过使用广播变量,Spark将变量只发送一次到每个节点,并在多个任务之间共享这个副本,从而显著降低了内存占用和网络传输的开销。

Scala 实现:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

Java 实现:

Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});

broadcastVar.value();
// returns [1, 2, 3]

2.累加器(accumulators)

        累加器是Spark中的一种特殊类型的共享变量,主要用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。累加器支持的数据类型仅限于数值类型,包括整数和浮点数等。

Scala 实现:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

Java 实现:

LongAccumulator accum = jsc.sc().longAccumulator();

sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

accum.value();
// returns 10

        内置累加器功能有限,但可以通过继承AccumulatorV2来创建自己的类型。AccumulatorV2抽象类有几个方法必须重写:reset用于将累加器重置为零,add用于向累加器中添加另一个值,merge用于将另一个相同类型的累加器合并到此累加器。

自定义累加器Scala实现:

package com.yichenkeji.demo.sparkscala

import org.apache.spark.util.AccumulatorV2


class CustomAccumulator extends AccumulatorV2[Int, Int]{
  //初始化累加器的值
  private var sum = 0
  override def isZero: Boolean = sum == 0

  override def copy(): AccumulatorV2[Int, Int] = {
    val newAcc = new CustomAccumulator()
    newAcc.sum = sum
    newAcc
  }

  override def reset(): Unit = sum = 0

  override def add(v: Int): Unit = sum += v

  override def merge(other: AccumulatorV2[Int, Int]): Unit = sum += other.value

  override def value: Int = sum
}

自定义累加器Java实现:

package com.yichenkeji.demo.sparkjava;

import org.apache.spark.util.AccumulatorV2;

public class CustomAccumulator extends AccumulatorV2<Integer, Integer> {
    // 初始化累加器的值
    private Integer sum = 0;
    @Override
    public boolean isZero() {
        return sum == 0;
    }

    @Override
    public AccumulatorV2<Integer, Integer> copy() {
        CustomAccumulator customAccumulator = new CustomAccumulator();
        customAccumulator.sum = this.sum;
        return customAccumulator;
    }

    @Override
    public void reset() {
        this.sum = 0;
    }

    @Override
    public void add(Integer v) {
        this.sum += v;
    }

    @Override
    public void merge(AccumulatorV2<Integer, Integer> other) {
        this.sum += ((CustomAccumulator) other).sum;
    }

    @Override
    public Integer value() {
        return sum;
    }
}

自定义累加器的使用:

package com.yichenkeji.demo.sparkjava;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.util.Arrays;
import java.util.List;

public class AccumulatorTest {
    public static void main(String[] args) {
        //1.初始化SparkContext对象
        SparkConf sparkConf = new SparkConf().setAppName("Spark Java").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        CustomAccumulator customAccumulator = new CustomAccumulator();
        //注册自定义累加器才能使用
        sc.sc().register(customAccumulator);
        sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).foreach(x -> customAccumulator.add(x));
        System.out.println(customAccumulator.value());
        //5.停止SparkContext
        sc.stop();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1244955.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深信服防火墙路由模式开局部署-手把手教学(小白篇)

PS&#xff1a;深信服的设备只有400能够通过console连接&#xff0c;一般用户是无法连接的&#xff0c;所以大家不要妄想着从Console连接设备了&#xff0c;开局就通过MANAGE进入Web就可以 接通电源后&#xff0c;开机拿一根网线&#xff0c;一端连接防火墙的MANAGE口&#xf…

中国毫米波雷达产业分析2——毫米波雷达产业链分析

一、产业链构成 毫米波雷达产业链分为三部分&#xff1a;上游主要包括射频前端组件&#xff08;MMIC&#xff09;、数字信号处理器&#xff08;DSP/FPGA&#xff09;、高频PCB板、微控制器&#xff08;MCU&#xff09;、天线及控制电路等硬件供应商&#xff1b;中游主体是毫米波…

Electron+VUE3开发简版的编辑器【文件预览】

简版编辑器的功能主要是: 打开对话框,选择文件后台读取文件文件前端展示文件内容。主要技术栈是VUE3、Electron和Nodejs,VUE3做页面交互,Electron提供一个可执行Nodejs的环境以及支撑整个应用的环境,nodeJS负责读取文件内容。 环境配置、安装依赖这些步骤就不再叙述了。 …

【Flink】状态管理

目录 1、状态概述 1.1 无状态算子 1.2 有状态算子 2、状态分类 ​编辑 2.1 算子状态 2.1.1 列表状态&#xff08;ListState&#xff09; 2.1.2 联合列表状态&#xff08;UnionListState&#xff09; 2.1.3 广播状态&#xff08;BroadcastState&#xff09; 2.2 按键分…

案例012:Java+SSM+uniapp基于微信小程序的科创微应用平台设计与实现

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;SSM JDK版本&#xff1a;JDK1.8 数据库&#xff1a;mysql 5.7 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder X 小程序…

ElementUI table+dialog实现一个简单的可编辑的表格

table组件如何实现可编辑呢&#xff1f; 我的需求是把table组件那样的表格&#xff0c;实现它点击可以弹出一个框&#xff0c;然后在这个框里面输入你的东西&#xff0c;然后将他回显回去&#xff0c;当然&#xff0c;输入的有可能是时间啥的。 为什么要弹出弹层不在框上直接…

文件名称管理文件:抓关键字归类文件,让文件管理变得简单明了

在当今数字时代&#xff0c;每天都要处理大量的文件&#xff0c;无论是文本、图片、视频还是其他类型的文件。如何有效地管理这些文件&#xff0c;能够迅速找到所需的信息&#xff0c;已经成为了一个重要的问题。文件名称是文件内容的第一反映&#xff0c;也是识别和检索文件的…

在arm 64 环境下使用halcon算法

背景&#xff1a; halcon&#xff0c;机器视觉领域神一样得存在&#xff0c;在windows上&#xff0c;应用得特别多&#xff0c; 但是arm环境下使用得很少。那如何在arm下使用halcon呢。按照官方说明&#xff0c;arm下只提供了运行时环境&#xff0c;并且需要使用价值一万多人民…

element中el-switch的v-model自定义值

一、问题 element中的el-switch的值默认都是true或false&#xff0c;但是有些时候后端接口该字段可能是0或者1&#xff0c;如果说再转换一次值&#xff0c;那就有点太费力了。如下所示&#xff1a; <template><el-switchinactive-text"否"active-text&quo…

【miniQMT实盘量化5】获取财务报表数据

前言 上面文章&#xff0c;我们介绍了如何获取实时数据&#xff0c;这篇文章&#xff0c;我们继续往下探讨&#xff0c;介绍关于财务报表数据的获取。 财务报表数据 财务报表数据&#xff0c;也就是常说的基本面数据&#xff0c;是除了行情数据之外&#xff0c;辅助我们投资…

PMP 考试的含金量怎么样?

这里可以三个思考题和三个价值点帮你认识PMP考试。 三个思维题 1.工作环境 PMP证书含金量的一个很大因素&#xff0c;就是考证的人是否对PMP证书有比较强的实际需求。相反&#xff0c;如果只是听别人说&#xff0c;PMP证书很好&#xff0c;不管工作中是否有需要&#xff0c;…

〖大前端 - 基础入门三大核心之JS篇㊷〗- DOM事件对象及它的属性

说明&#xff1a;该文属于 大前端全栈架构白宝书专栏&#xff0c;目前阶段免费&#xff0c;如需要项目实战或者是体系化资源&#xff0c;文末名片加V&#xff01;作者&#xff1a;不渴望力量的哈士奇(哈哥)&#xff0c;十余年工作经验, 从事过全栈研发、产品经理等工作&#xf…

黑马点评12-实现好友关注/取关功能,查看好友共同关注列表

好友关注 数据模型 数据库中的tb_follow记录博主与粉丝的关系 tb_follow表对应的实体类 Data EqualsAndHashCode(callSuper false) Accessors(chain true) TableName("tb_follow") public class Follow implements Serializable {private static final long ser…

【RtpRtcp】1: webrtc m79:audio的ChannelReceive 创建并使用

m79中,RtpRtcp::Create 的调用很少 不知道谁负责创建ChannelReceiveclass ChannelReceive : public ChannelReceiveInterface,public MediaTransportAudioSinkInterface {接收编码后的音频帧:接收rtcp包:

nodejs微信小程序+python+PHP -留学信息查询系统的设计与实现-安卓-计算机毕业设计

目 录 摘 要 I ABSTRACT II 目 录 II 第1章 绪论 1 1.1背景及意义 1 1.2 国内外研究概况 1 1.3 研究的内容 1 第2章 相关技术 3 2.1 nodejs简介 4 2.2 express框架介绍 6 2.4 MySQL数据库 4 第3章 系统分析 5 3.1 需求分析 5 3.2 系统可行性分析 5 3.2.1技术可行性&#xff1a;…

如何提高希音、亚马逊、国际站店铺流量转化,自养号优势及测评底层环境逻辑

随着全球贸易数字化程度加快&#xff0c;尤其是跨境电商的发展日新月异&#xff0c;在外贸出口占比越来越高&#xff0c;在这其中&#xff0c;亚马逊作为全球实力强劲的在线零售平台之一&#xff0c;吸引了大量的优秀卖家。 而这也加剧了亚马逊平台的竞争程度&#xff0c;尤其…

Java核心知识点整理大全9-笔记

目录 null文章浏览阅读9w次&#xff0c;点赞7次&#xff0c;收藏7次。Java核心知识点整理大全https://blog.csdn.net/lzy302810/article/details/132202699?spm1001.2014.3001.5501 Java核心知识点整理大全2-笔记_希斯奎的博客-CSDN博客 Java核心知识点整理大全3-笔记_希斯…

二十四、RestClient操作文档

目录 一、新增文档 1、编写测试代码 二、查询文档 1、编写测试代码 三、删除文档 1、编写测试代码 四、修改文档 1、编写测试代码 五、批量导入文档 批量查询 一、新增文档 1、编写测试代码 SpringBootTest public class HotelDocumentTest {private RestHighLevelC…

PCIE链路训练-状态机描述2

Configuration.Lanenum.Accept 如果use_modified_TS1_TS2_Ordered_Set为1&#xff0c;需要注意&#xff1a; &#xff08;1&#xff09;tx需要发送Modified TS1而不是正常的TS1&#xff1b; &#xff08;2&#xff09;rx端必须检查是否收到Modified TS1&#xff08;注意一开…

node版本管理工具-nvm

1、 下载地址 https://github.com/coreybutler/nvm-windows/releases/tag/1.1.11 2、 选择安装地址不能有空格&#xff0c;中文 3、 使用命令