【Spark分布式内存计算框架——Spark Core】8. 共享变量

news2024/9/28 7:26:10

第七章 共享变量

在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。

为了满足这种需求,Spark提供了两种类型的变量:
1)、广播变量Broadcast Variables

  • 广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本;
    2)、累加器Accumulators
  • 累加器支持在所有不同节点之间进行累加计算(比如计数或者求和);

官方文档:http://spark.apache.org/docs/2.4.5/rdd-programming-guide.html#shared-variables

7.1 广播变量

广播变量允许开发人员在每个节点(Worker or Executor)缓存只读变量,而不是在Task之间传递这些变量。使用广播变量能够高效地在集群每个节点创建大数据集的副本。同时Spark还使用高效的广播算法分发这些变量,从而减少通信的开销。
在这里插入图片描述

可以通过调用sc.broadcast(v)创建一个广播变量,该广播变量的值封装在v变量中,可使用获取
该变量value的方法进行访问。
在这里插入图片描述

7.2 累加器

Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能,即确提供了多个task对一个变量并行操作的功能。但是task只能对Accumulator进行累加操作,不能读取Accumulator的值,只有Driver程序可以读取Accumulator的值。创建的Accumulator变量的值能够在Spark Web UI上看到,在创建时应该尽量为其命名。
在这里插入图片描述
Spark内置了三种类型的Accumulator,分别是LongAccumulator用来累加整数型,DoubleAccumulator用来累加浮点型,CollectionAccumulator用来累加集合元素。

当内置的Accumulator无法满足要求时,可以继承AccumulatorV2实现自定义的累加器。实现自定义累加器的步骤:

  • 第一步、继承AccumulatorV2,实现相关方法;
  • 第二步、创建自定义Accumulator的实例,然后在SparkContext上注册它;

官方提供实例如下:
在这里插入图片描述

7.3 案例演示

以词频统计WordCount程序为例,假设处理的数据如下所示,包括非单词符合,统计数据词频时过滤非单词的符合并且统计总的格式。
在这里插入图片描述
实现功能:
第一、过滤非单词符合

  • 非单词符合存储列表List中
    在这里插入图片描述
  • 使用广播变量广播列表
    在这里插入图片描述

第二、累计统计非单词符号出现次数

  • 定义一个LongAccumulator累加器,进行计数
    在这里插入图片描述
    范例演示完整代码如下
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}
/**
* 基于Spark框架使用Scala语言编程实现词频统计WordCount程序,将符号数据过滤,并统计出现的次数
* -a. 过滤标点符号数据
* 使用广播变量
* -b. 统计出标点符号数据出现次数
* 使用累加器
*/
object SparkSharedVariableTest {
def main(args: Array[String]): Unit = {
// 创建应用程序入口SparkContext实例对象
val sc: SparkContext = {
// 1.a 创建SparkConf对象,设置应用的配置信息
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
// 1.b 传递SparkConf对象,构建Context实例
new SparkContext(sparkConf)
}
sc.setLogLevel("WARN")
// a. 读取文件数据
val datasRDD: RDD[String] = sc.textFile("datas/filter/datas.input", minPartitions = 2)
// TODO: 字典数据,只要有这些单词就过滤: 特殊字符存储列表List中
val list: List[String] = List(",", ".","!","#","$","%")
// TODO: 通过广播变量 将列表list广播到各个Executor内存中,便于多个Task使用
val listBroadcast: Broadcast[List[String]] = sc.broadcast(list)
// TODO: 定义累加器,记录单词为符号数据的个数
val accumulator: LongAccumulator = sc.longAccumulator("number_accum")
// b. 分割单词,过滤数据
val wordsRDD = datasRDD
// 1)、过滤数据,去除空行数据
.filter(line => null != line && line.trim.length > 0)
// 2)、分割单词
.flatMap(line => line.trim.split("\\s+"))
// 3)、过滤字典数据:符号数据
.filter{word =>
// 获取符合列表 TODO: 从广播变量中获取列表list的值
val listValue = listBroadcast.value
// 判断单词是否为符号数据,如果是就过滤掉
val isFlag = listValue.contains(word)
if(isFlag){
// TODO: 如果单词为符号数据,累加器加1
accumulator.add(1L)
}
!isFlag
}
val resultRDD: RDD[(String, Int)] = wordsRDD
// 转换为二元组
.mapPartitions{iter => iter.map(word => (word, 1))}
// 按照单词聚合统计
.reduceByKey((tmp, item) => tmp + item)
resultRDD.foreach(println)
println(s"过滤符合数据的个数:${accumulator.value}")
// 应用程序运行结束,关闭资源
sc.stop()
}
}

运行应用,查看WEB UI监控,定义累加器的值:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/339670.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

T35,没有token是什么意思?

描述 输入一个升序数组 array 和一个数字S,在数组中查找两个数,使得他们的和正好是S,如果有多对数字的和等于S,返回任意一组即可,如果无法找出这样的数字,返回一个空数组即可。 数据范围: 0≤len(array)≤…

常规网页布局

单列布局1 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>单列布局1-头主尾等宽</title><style>.container {max-width: 960px; /*设置最大宽度为固定值*/margin: 0 auto; /*设置内部子…

Delphi 中TImageCollection和TVirtualImageList 控件实现high-DPI

一、概述RAD Studio允许你通过使用TImageCollection组件和TVirtualImageList组件&#xff0c;在你的Windows VCL应用程序中包含缩放、高DPI、多分辨率的图像。这两个组件位于Windows 10面板中&#xff1a;注意&#xff1a;如果你使用FireMonkey进行跨平台应用&#xff0c;请看T…

用VSCode在共用服务器上使用连接自己的Docker容器进行开发

问题描述 我们实验室有一台很牛的Linux服务器&#xff0c;核多卡多硬盘大&#xff0c;它是大家共用的&#xff0c;组里给我们每个人都创建了一个普通用户&#xff0c;没有sudo权限&#xff0c;所以不能用apt。 但是每个人对开发环境的需求都是不一样的&#xff0c;比如我要用…

年前无情被裁,面试大厂的这几个月…

2月份了&#xff0c;金三银四也即将来临&#xff0c;在这个招聘季&#xff0c;大厂也开始招人&#xff0c;但还是有很多人吐槽说投了很多简历&#xff0c;却迟迟没有回复… 另一面企业招人真的变得容易了吗&#xff1f;有企业HR吐槽&#xff0c;简历确实比以前多了好几倍&…

【手写 Vuex 源码】第五篇 - Vuex 中 Mutations 和 Actions 的实现

一&#xff0c;前言 上一篇&#xff0c;主要介绍了 Vuex 中 getters 的实现&#xff0c;主要涉及以下几个点&#xff1a; 将选项中的 getters 方法&#xff0c;保存到 store 实例中的 getters 对象中&#xff1b;借助 Vue 原生 computed&#xff0c;实现 Vuex 中 getters 的数…

有趣的Hack-A-Sat黑掉卫星挑战赛——跟踪卫星

国家太空安全是国家安全在空间领域的表现。随着太空技术在政治、经济、军事、文化等各个领域的应用不断增加&#xff0c;太空已经成为国家赖以生存与发展的命脉之一&#xff0c;凝聚着巨大的国家利益&#xff0c;太空安全的重要性日益凸显[1]。而在信息化时代&#xff0c;太空安…

TMC2660驱动及调试记录

TMC2660 一款优秀的电机驱动芯片&#xff0c;驱动简单。 理论就看这篇&#xff1a;TMC260/TMC2660/TMC262步进电机驱动 或者直接看手册&#xff0c;手册也不复杂。 使用SPI通信&#xff0c;通过SPI配置参数。支持直接使用SPI和Step/Dir方式控制两种控制步进电机的方式。 TMC…

【C语言】数据结构-二叉树

主页&#xff1a;114514的代码大冒险 qq:2188956112&#xff08;欢迎小伙伴呀hi✿(。◕ᴗ◕。)✿ &#xff09; Gitee&#xff1a;庄嘉豪 (zhuang-jiahaoxxx) - Gitee.com 引入 我们之前已经学过线性数据结构&#xff0c;今天我们将介绍非线性数据结构----树 树是一种非线性的…

面试官问我:说说你对Spring MVC的理解

文章目录什么是MVC模式MVC的原理图2.1 分析工作原理3.1 组件说明&#xff1a;3.1.1 组件&#xff1a;核心架构的具体流程步骤什么是MVC模式 MVC&#xff1a;MVC是一种设计模式 MVC的原理图 2.1 分析 M-Model 模型&#xff08;完成业务逻辑&#xff1a;有javaBean构成&#x…

C++11的异步操作让多线程开发变得简单

C11的异步操作简介一、std::future1.1、future的类型1.2、future的使用1.3、使用示例二、std::packaged_task三、std::promise总结简介 C提供如下的异步操作接口&#xff1a; std::future &#xff1a;异步指向某个任务&#xff0c;然后通过future特性去获取任务函数的返回结…

1. RNN神经网络初探

目录1. 神经网络与未来智能2. 回顾数据维度和神经网络3. 文本转变为词向量1. 神经网络与未来智能 2. 回顾数据维度和神经网络 循环神经网络&#xff0c;主要用来处理时序的数据&#xff0c;它对每个词的顺序是有要求的。 循环神经网络如何保存记忆功能&#xff1f; 当前样本只…

Window10开放某个端口

需求&#xff1a;由于防火墙原因&#xff0c;开放某个端口:如9999 在开始那里搜索防火墙-进入防火墙 第一步&#xff1a;核实是否启动了防火墙&#xff0c;之后进行 第二步&#xff1a;点击“高级设置”&#xff0c;→“入站规则”→“新建规则”→“端口”→ “下一步” …

【前端领域】3D旋转超美相册(HTML+CSS)

世界上总有一半人不理解另一半人的快乐。 ——《爱玛》 目录 一、前言 二、本期作品介绍 3D旋转相册 三、效果展示 四、详细介绍 五、编码实现 index.html style.css img 六、获取源码 公众号获取源码 获取源码&#xff1f;私信&#xff1f;关注&#xff1f;点赞&…

基于微信小程序的游戏账号交易小程序

文末联系获取源码 开发语言&#xff1a;Java 框架&#xff1a;ssm JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 5.7/8.0 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.3.9 浏览器…

【C++初阶】十二、STL---反向迭代器的实现

目录 一、反向迭代器 二、反向迭代器的实现 一、反向迭代器 之前的模拟实现vector、list 的时候&#xff0c;这些都是实现了正向迭代器&#xff0c;反向迭代器都没有实现&#xff0c;这里就要实现反向迭代器 反向迭代器也是适配器&#xff08;配接器&#xff09;的一种&#…

在阿里干了2年的测试,总结出来的划水经验

测试新人 我的职业生涯开始和大多数测试人一样&#xff0c;开始接触都是纯功能界面测试。那时候在一家电商公司做测试&#xff0c;做了一段时间&#xff0c;熟悉产品的业务流程以及熟练测试工作流程规范之后&#xff0c;效率提高了&#xff0c;工作比较轻松&#xff0c;这样我…

代码随想录第55天(动态规划):● 309.最佳买卖股票时机含冷冻期 ● 714.买卖股票的最佳时机含手续费

一、最佳买卖股票时机含冷冻期 题目描述: 思路和想法&#xff1a; 这道题相较于之前的题目&#xff0c;注重对状态的分析&#xff0c;这里分为四个状态。 &#xff08;1&#xff09;状态一&#xff0c;买入状态 dp[i][0] 操作一&#xff1a;前一天就是持有状态&#xff08;状…

day39【代码随想录】动态规划之

文章目录前言一、不同路径&#xff08;力扣62&#xff09;二、不同路径||&#xff08;力扣63&#xff09;三、最小路径和&#xff08;力扣64&#xff09;前言 1、不同路径 2、不同路径|| 3、最小路径和 一、不同路径&#xff08;力扣62&#xff09; 一个机器人位于一个 m x n…

MyBatis-Plus分页插件和MyBatisX插件

MyBatis-Plus分页插件和MyBatisX插件六、插件1、分页插件a>添加配置类b>测试八、代码生成器1、引入依赖2、快速生成十、MyBatisX插件1、新建spring boot工程a>引入依赖b>配置application.ymlc>连接MySQL数据库d>MybatisX逆向生成2、MyBatisX快速生成CRUD申明…