尚硅谷大数据技术Spark教程-笔记05【SparkCore(核心编程,累加器、广播变量)】

news2025/1/4 20:40:28

视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili

  1. 尚硅谷大数据技术Spark教程-笔记01【SparkCore(概述、快速上手、运行环境、运行架构)】
  2. 尚硅谷大数据技术Spark教程-笔记02【SparkCore(核心编程,RDD-核心属性-执行原理-基础编程-并行度与分区-转换算子)】
  3. 尚硅谷大数据技术Spark教程-笔记03【SparkCore(核心编程,RDD-转换算子-案例实操)】
  4. 尚硅谷大数据技术Spark教程-笔记04【SparkCore(核心编程,RDD-行动算子-序列化-依赖关系-持久化-分区器-文件读取与保存)】
  5. 尚硅谷大数据技术Spark教程-笔记05【SparkCore(核心编程,累加器、广播变量)】
  6. 尚硅谷大数据技术Spark教程-笔记06【SparkCore(案例实操,电商网站)】

目录

01_尚硅谷大数据技术之SparkCore

第05章-Spark核心编程

P105【105.尚硅谷_SparkCore - 核心编程 - 数据结构 -累加器 - 原理及简单演示】15:49

P106【106.尚硅谷_SparkCore - 核心编程 - 数据结构 -累加器 - 问题】03:39

P107【107.尚硅谷_SparkCore - 核心编程 - 数据结构 -累加器 - 自定义实现】10:55

P108【108.尚硅谷_SparkCore - 核心编程 - 数据结构 -累加器 - 自定义实现 - 1】07:14

P109【109.尚硅谷_SparkCore - 核心编程 - 数据结构 - 广播变量】17:16


01_尚硅谷大数据技术之SparkCore

第05章-Spark核心编程

P105【105.尚硅谷_SparkCore - 核心编程 - 数据结构 -累加器 - 原理及简单演示】15:49

5.2 累加器

5.2.1 实现原理

累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。

 ​​​​​​​

package com.atguigu.bigdata.spark.core.acc

import org.apache.spark.{SparkConf, SparkContext}

object Spark01_Acc {

  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
    val sc = new SparkContext(sparConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4))

    //reduce:分区内计算,分区间计算
    //val i: Int = rdd.reduce(_+_)
    //println(i)
    var sum = 0
    rdd.foreach(
      num => {
        sum += num
      }
    )
    println("sum = " + sum) // sum = 0

    sc.stop()
  }
}
package com.atguigu.bigdata.spark.core.acc

import org.apache.spark.{SparkConf, SparkContext}

object Spark02_Acc {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
    val sc = new SparkContext(sparConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4))

    // 获取系统累加器
    // Spark默认就提供了简单数据聚合的累加器
    val sumAcc = sc.longAccumulator("sum")

    //sc.doubleAccumulator
    //sc.collectionAccumulator

    rdd.foreach(
      num => {
        // 使用累加器
        sumAcc.add(num)
      }
    )

    // 获取累加器的值
    println(sumAcc.value) // 10

    sc.stop()
  }
}

P106【106.尚硅谷_SparkCore - 核心编程 - 数据结构 -累加器 - 问题】03:39

package com.atguigu.bigdata.spark.core.acc

import org.apache.spark.{SparkConf, SparkContext}

object Spark03_Acc {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
    val sc = new SparkContext(sparConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4))

    // 获取系统累加器
    // Spark默认就提供了简单数据聚合的累加器
    val sumAcc = sc.longAccumulator("sum")

    //sc.doubleAccumulator
    //sc.collectionAccumulator

    val mapRDD = rdd.map(
      num => {
        // 使用累加器
        sumAcc.add(num)
        num
      }
    )

    // 获取累加器的值
    // 少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
    // 多加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
    // 一般情况下,累加器会放置在行动算子进行操作
    mapRDD.collect()
    mapRDD.collect()
    println(sumAcc.value) // 20

    sc.stop()
  }
}

P107【107.尚硅谷_SparkCore - 核心编程 - 数据结构 -累加器 - 自定义实现】10:55

package com.atguigu.bigdata.spark.core.acc

import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object Spark04_Acc_WordCount {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
    val sc = new SparkContext(sparConf)

    val rdd = sc.makeRDD(List("hello", "spark", "hello"))

    // 累加器 : WordCount
    // 创建累加器对象
    val wcAcc = new MyAccumulator()
    // 向Spark进行注册
    sc.register(wcAcc, "wordCountAcc")

    rdd.foreach(
      word => {
        // 数据的累加(使用累加器)
        wcAcc.add(word)
      }
    )

    // 获取累加器累加的结果
    println(wcAcc.value)

    sc.stop()
  }

  /*
    自定义数据累加器:WordCount

    1. 继承AccumulatorV2, 定义泛型
       IN : 累加器输入的数据类型 String
       OUT : 累加器返回的数据类型 mutable.Map[String, Long]

    2. 重写方法(6)
   */
  class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
  }
}

P108【108.尚硅谷_SparkCore - 核心编程 - 数据结构 -累加器 - 自定义实现 - 1】07:14

package com.atguigu.bigdata.spark.core.acc

import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object Spark04_Acc_WordCount {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
    val sc = new SparkContext(sparConf)

    val rdd = sc.makeRDD(List("hello", "spark", "hello"))

    // 累加器 : WordCount
    // 创建累加器对象
    val wcAcc = new MyAccumulator()
    // 向Spark进行注册
    sc.register(wcAcc, "wordCountAcc")

    rdd.foreach(
      word => {
        // 数据的累加(使用累加器)
        wcAcc.add(word)
      }
    )

    // 获取累加器累加的结果
    println(wcAcc.value)

    sc.stop()
  }

  /*
    自定义数据累加器:WordCount

    1. 继承AccumulatorV2, 定义泛型
       IN : 累加器输入的数据类型 String
       OUT : 累加器返回的数据类型 mutable.Map[String, Long]

    2. 重写方法(6)
   */
  class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
    private var wcMap = mutable.Map[String, Long]()

    // 判断是否初始状态
    override def isZero: Boolean = {
      wcMap.isEmpty
    }

    override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
      new MyAccumulator()
    }

    override def reset(): Unit = {
      wcMap.clear()
    }

    // 获取累加器需要计算的值
    override def add(word: String): Unit = {
      val newCnt = wcMap.getOrElse(word, 0L) + 1
      wcMap.update(word, newCnt)
    }

    // Driver合并多个累加器
    override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {

      val map1 = this.wcMap
      val map2 = other.value

      map2.foreach {
        case (word, count) => {
          val newCount = map1.getOrElse(word, 0L) + count
          map1.update(word, newCount)
        }
      }
    }

    // 累加器结果
    override def value: mutable.Map[String, Long] = {
      wcMap
    }
  }
}

P109【109.尚硅谷_SparkCore - 核心编程 - 数据结构 - 广播变量】17:16

5.3 广播变量

5.3.1 实现原理

广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务分别发送。

package com.atguigu.bigdata.spark.core.acc

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object Spark05_Bc {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
    val sc = new SparkContext(sparConf)

    val rdd1 = sc.makeRDD(List(
      ("a", 1), ("b", 2), ("c", 3)
    ))
    //        val rdd2 = sc.makeRDD(List(
    //            ("a", 4),("b", 5),("c", 6)
    //        ))
    val map = mutable.Map(("a", 4), ("b", 5), ("c", 6))

    // join会导致数据量几何增长,并且会影响shuffle的性能,不推荐使用
    //val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
    //joinRDD.collect().foreach(println)
    // (a, 1),    (b, 2),    (c, 3)
    // (a, (1,4)),(b, (2,5)),(c, (3,6))
    rdd1.map {
      case (w, c) => {
        val l: Int = map.getOrElse(w, 0)
        (w, (c, l))
      }
    }.collect().foreach(println)
    //(a,(1,4))
    //(b,(2,5))
    //(c,(3,6))

    sc.stop()
  }
}
package com.atguigu.bigdata.spark.core.acc

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object Spark06_Bc {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
    val sc = new SparkContext(sparConf)

    val rdd1 = sc.makeRDD(List(
      ("a", 1), ("b", 2), ("c", 3)
    ))
    val map = mutable.Map(("a", 4), ("b", 5), ("c", 6))

    // 封装广播变量
    val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)

    rdd1.map {
      case (w, c) => {
        // 访问广播变量
        val l: Int = bc.value.getOrElse(w, 0)
        (w, (c, l))
      }
    }.collect().foreach(println)
    //(a,(1,4))
    //(b,(2,5))
    //(c,(3,6))

    sc.stop()
  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/463782.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

系统日志管理审核

系统日志管理 系统日志记录协议 (syslog) 旨在标准化网络设备用于与日志服务器通信的消息格式。网络上的路由器、交换机、防火墙和 Unix/Linux 服务器等许多设备都支持它,从而更轻松地管理这些设备生成的日志。系统日志监控和管理对于每个组…

基于GPT-4的 IDEA 神仙插件,无需魔法,亲测好用!

近日,Intellij IDEA的插件商店,悄然上线了一个新的插件——Bito,据说可以基于GPT-4和ChatGPT来写代码。短短几天,已经有50多K的下载量了。 我帮大家试用了一下,亲测好用! 根据插件介绍显示,Bito…

《面向基于人工智能的学习健康系统,使用心电图进行人群水平的死亡率预测》阅读笔记

目录 一、摘要 二、十个问题 Q1论文试图解决什么问题? Q2这是否是一个新的问题? Q3这篇文章要验证一个什么科学假设? Q4有哪些相关研究?如何归类?谁是这一课题在领域内值得关注的研究员? Q5论文中提到…

Zynq-7000、国产zynq-7000的GPIO控制(三)

本文主要对在Linux下使用zynq-7000或者FMQL45T900控制MIO/EMIO 首先内核配置项 如下,这个不用太多关注,一般都是默认打开的 CONFIG_GPIO_SYSFSy CONFIG_SYSVIPCy CONFIG_GPIO_ZYNQy两者的控制都是流程都是一样的,在细节上又区别 首先都在…

第一章 安装Unity

使用Unity开发游戏的话,首先要安装Unity Hub和Unity Editor两个软件。大家可以去官方地址下载:https://unity.cn/releases/full/2020 (这里我们选择的是2020版本) Unity Hub 是安装 Unity Editor、创建项目、管理帐户和许可证的主…

mall-swarm微服务商城系统

mall-swarm是一套微服务商城系统,采用了 Spring Cloud 2021 & Alibaba、Spring Boot 2.7、Oauth2、MyBatis、Docker、Elasticsearch、Kubernetes等核心技术,同时提供了基于Vue的管理后台方便快速搭建系统。mall-swarm在电商业务的基础集成了注册中心…

陆游和辛弃疾都是南宋主战爱国的大才子,而且生活在同一个时代,有没有交集?

辛弃疾和陆游,都是宋朝著名的爱国诗人。但是这两位都没怎么做过正儿八经的大官,按照现代人的说法,他们总是在基层打嘴炮,对于朝廷的决策,他们是无能为力的。 这两位大诗人可以说生活在同一个年代,他们究竟…

【数据结构】算法的时间复杂度和空间复杂度详解

文章目录 一、算法的效率1.1 如何衡量一个算法的好坏1.2 算法的复杂度的概念 二、大O的渐进表示法三、时间复杂度2.1 时间复杂度的概念2.2常见时间复杂度计算举例 四、空间复杂度2.1 空间复杂度的概念2.2常见空间复杂度计算举例五、解决问题的思路LeetCode-exercise 总结 一、算…

Html5惯性小鸟游戏制作与分享(经典游戏)

当年电子词典中的经典游戏,后来出了无数变种的玩法。这里还原了最初的玩法与操作。实现了这一款有点难度“的怀旧经典游戏。 玩法也很简单,不用碰到任何东西、持续下去。。。 可以进行试玩,手机玩起来效果会更好些。 点击试玩 还有很多变种…

Python小姿势 - 知识点:

知识点: Python的字符串格式化 标题: Python字符串格式化实例解析 顺便介绍一下我的另一篇专栏, 《100天精通Python - 快速入门到黑科技》专栏,是由 CSDN 内容合伙人丨全站排名 Top 4 的硬核博主 不吃西红柿 倾力打造。 基础知识…

ASO优化之竞品研究与分析

我们的应用要有绝对优势和不同的营销策略才能在应用商城里脱颖而出,来获取用户的注意力。我们可以通过对自己家应用与竞争对手在相似的功能和后期用户评价方面,提取出有助于提升应用排名的因素,从而确定自己家应用的优化领域。 竞品分析的主…

Debian彻底卸载软件包(apt-get)

彻底卸载软件包可以运行如下命令&#xff1a; 1 # 删除软件及其配置文件 2 apt-get --purge remove <package> 3 # 删除没用的依赖包 4 apt-get autoremove <package> 5 # 此时dpkg的列表中有“rc”状态的软件包&#xff0c;可以执行如下命令做最后清理&#xff1a…

持续集成——App自动化测试集成实战

这里写目录标题 一、app自动化测试持续集成的好处二、环境准备三、Jenkins节点挂载四、节点环境的配置1、JDK2、模拟器3、sdk环境4、Python3环境5、allure-commandline工具6、allure插件 五、本地运行待测代码(保证代码没有问题)六、库文件的导出七、Jenkins上运行代码配置1、指…

如何使用git上传文件到Github远程仓库(完整详细流程)

文章目录 1.在电脑上下载Git2.配置Git3.上传Github仓库 1.在电脑上下载Git git官网下载&#xff1a;Git - Downloads (git-scm.com) 下载后安装即可。 2.配置Git 鼠标右键进入Git命令行 &#xff08;1&#xff09;设置用户名和设置用户账号&#xff08;需要是自己的注册Git…

Floccus插件 + 坚果云 实现不同浏览器间书签同步

&#xfeff; 在工作与学习中&#xff0c;我们时常希望在不同浏览器之间实现书签的同步&#xff1b;而一些传统的浏览器书签同步方案&#xff0c;或多或少都面临着一些问题——比如&#xff0c;Chrome浏览器尽管可以实现比较好的跨设备同步&#xff0c;但由于网络的限制可能导致…

总结832

学习目标&#xff1a; 4月&#xff08;复习完高数18讲内容&#xff0c;背诵21篇短文&#xff0c;熟词僻义300词基础词&#xff09; 学习内容&#xff1a; 暴力英语&#xff1a;读了《美丽心灵》中的经典演讲&#xff0c;回诵前3篇文章&#xff0c;默写了前3篇文章&#xff0c…

OpenCV中的图像处理3.1-3.3(三)

目录 3.1 改变色彩空间目标改变色彩空间对象跟踪如何找到HSV值来追踪&#xff1f;练习 3.2 图像的几何变换目标变换缩放平移旋转仿射变换透视变换其他资源 3.3 图像阈值处理目标简单的阈值处理自适应阈值处理Otsu的二值化Otsu的二值化是如何工作的&#xff1f;其他资源练习 翻译…

Ubuntu22.04安装CUDA、cudnn详细步骤

文章目录 安装CUDA安装cudnn下载安装文件安装验证是否安装成功 在Ubuntu系统中&#xff0c;使用nvidia-smi命令可以看到当前GPU信息&#xff0c;在右上角可以看到CUDA Version&#xff0c;意思是最大支持的CUDA版本号。 上一篇文章已经安装了显卡驱动&#xff0c;这次继续安装C…

带你深入学习k8s--(三) pod 管理

目录 一、简介 1、什么是pod 2、为什么要有pod 二、pod的分类 0、pod常用命令命令 1、准备镜像 2、自主式pod 3、控制器创建pod 4、扩容pod数量 5、通过service暴露pod&#xff08;负载均衡&#xff0c;自动发起&#xff09; 6、更新应用版本 三、编写yaml文件 四、Pod生命周期…

EF基础入门

目录 基础查询 基本单表Select查询 ​编辑 数据排序 分页Skip()、 Take() 查询聚合操作符&#xff08;如 Count、Sum、 Min、Max、Average、Aggregate&#xff09; 不返回一个序列&#xff0c;而返回一个值。 基本单表分页 基本单表 in / not in 内连接Join 左连 Grou…