Spark高级特性 (难)

news2025/1/17 0:13:15

Spark高级特性 (难)

  • 闭包

    /*
      * 编写一个高阶函数,在这个函数要有一个变量,返回一个函数,通过这个变量完成一个计算
      * */
      @Test
      def test(): Unit = {
    //    val f: Int => Double = closure()
    //    val area = f(5)
    //    println(area)
    
        // 在这能否访问到 factor,不能,因为factor所在作用域是closure()方法,test()方法和closure()方法作用域是平级的,所有不能直接访问
        // 不能访问,说明 factor 在一个单独的作用域中
    
        // 在拿到 f 的时候, 可以通过 f 间接的访问到 closure() 作用域中的内容
        // 说明 f 携带了一个作用域
        // 如果一个函数携带了一个外包的作用域,这种函数我们称之为闭包
        val f = closure()
        f(5)
    
        // 闭包的本质是什么?
        // f 就是闭包,闭包的本质就是一个函数
        // 在 Scala 中,函数就是一个特殊的类型,FunctionX
        // 闭包也是一个 FunctionX 类型的对象
        // 所以闭包是一个对象
      }
      /*
      * 返回一个新的函数
      * */
      def closure(): Int => Double = {
        val factor = 3.14
        val areaFunction = (r: Int) => math.pow(r, 2) * factor // 计算圆的面积
        areaFunction
      }
    

    通过 closure 返回的函数 f 就是一个闭包, 其函数内部的作用域并不是 test 函数的作用域, 这种连带作用域一起打包的方式, 我们称之为闭包, 在 Scala 中

    Scala 中的闭包本质上就是一个对象, 是 FunctionX 的实例

  • Spark中的闭包

    分发闭包

    sc.textFile("./dataset/access_log_sample.txt")
          .flatMap(item => item.split(" "))
          .collect()
    
    // item => item.split(" ") 是一个函数,代表一个Task,这个Task会被分发到不同的Executor中
    

    上述这段代码中,flatMp中传入的是另外一个函数,传入的这个函数就是一个闭包,这个闭包会被序列化运行在不同的Executor中

    在这里插入图片描述

    class MyClass {
      val field = "Hello"
    
      def doStuff(rdd: RDD[String]): RDD[String] = {
        rdd.map(x => field + x)
      }
    }
    
    /* 
     * x => field + x   引用MyClass对象中的一个成员变量,说明它可以访问MyClass这个类的作用域,
     * 所以这个函数也是一个闭包,封闭的是MyClass这个作用域。
     * x => field + x 
     */
    

    在这里插入图片描述

    这段代码中的闭包就有了一个依赖, 依赖于外部的一个类, 因为传递给算子的函数最终要在 Executor 中运行, 所以需要 序列化 MyClass 发给每一个 Executor, 从而在 Executor 访问 MyClass 对象的属性

    在这里插入图片描述

    总结

    • 闭包就是一个封闭的作用域, 也是一个对象
    • Spark 算子所接受的函数, 本质上是一个闭包, 因为其需要封闭作用域, 并且序列化自身和依赖, 分发到不同的节点中运行
  • 累加器

    • 一个小问题

      var count = 0
      
      val conf = new SparkConf().setAppName("ip_ana").setMaster("local[6]")
      val sc = new SparkContext(conf)
      
      sc.parallelize(Seq(1, 2, 3, 4, 5))
        .foreach(count += _)
      
      println(count)
      

      在这里插入图片描述

      上面这段代码是一个非常错误的使用, 请不要仿照, 这段代码只是为了证明一些事情

      先明确两件事, var count = 0是在 Driver 中定义的, foreach(count += _)这个算子以及传递进去的闭包运行在 Executor 中

      这段代码整体想做的事情是累加一个变量, 但是这段代码的写法却做不到这件事, 原因也很简单, 因为具体的算子是闭包, 被分发给不同的节点运行, 所以这个闭包中累加的并不是 Driver 中的这个变量


    • 全局累加器 (只能对数值型数据累加)

      Accumulators(累加器) 是一个只支持 added(添加) 的分布式变量, 可以在分布式环境下保持一致性, 并且能够做到高效的并发.

      原生 Spark 支持数值型的累加器, 可以用于实现计数或者求和, 开发者也可以使用自定义累加器以实现更高级的需求

      val conf = new SparkConf().setAppName("ip_ana").setMaster("local[6]")
      val sc = new SparkContext(conf)
      
      val counter = sc.longAccumulator("counter")
      
      sc.parallelize(Seq(1, 2, 3, 4, 5))
        .foreach(counter.add(_)) 
      
      // 运行结果: 15
      println(counter.value)
      

      注意点:

      • Accumulator 是支持并发并行的, 在任何地方都可以通过 add 来修改数值, 无论是 Driver 还是 Executor
      • 只能在 Driver 中才能调用 value 来获取数值

      在 WebUI 中关于 Job 部分也可以看到 Accumulator 的信息, 以及其运行的情况

      在这里插入图片描述

      在这里插入图片描述

      累加器还有两个小特性, 第一, 累加器能保证在 Spark 任务出现问题被重启的时候不会出现重复计算. 第二, 累加器只有在 Action 执行的时候才会被触发.

      val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]")
      val sc = new SparkContext(config)
      
      val counter = sc.longAccumulator("counter")
      
      sc.parallelize(Seq(1, 2, 3, 4, 5))
        .map(counter.add(_)) // 这个地方不是 Action, 而是一个 Transformation
      
      // 运行结果是 0
      println(counter.value)
      
    • 自定义累加器

      开发者可以通过自定义累加器来实现更多类型的累加器, 累加器的作用远远不只是累加, 比如可以实现一个累加器, 用于向里面添加一些运行信息

      /**
       * RDD -> (1,2,3,4,5) —> Set(1,2,3,4,5),将原先的数,累加到一个集合中
       */
      @Test
        def acc(): Unit = {
          val conf = new SparkConf().setMaster("local[6]").setAppName("acc")
          val sc = new SparkContext(conf)
      
          val numAcc = new NumAccumulator()
          // 注册给Spark
          sc.register(numAcc, "num")
      
          sc.parallelize(Seq("1", "2", "3"))
            .foreach(item => numAcc.add(item))
      
          println(numAcc.value)
      
          sc.stop()
        }
      
      }
      
      class NumAccumulator extends AccumulatorV2[String, Set[String]] {
        private val nums: mutable.Set[String] = mutable.Set() // 定义类型是可变Set,否则后面的newAccumulator.nums ++= this.nums,++=会报错
      
        /**
         * 告诉 Spark 框架,这个累加器对象是否是空的
         */
        override def isZero: Boolean = {
          nums.isEmpty
        }
      
        /**
         * 提供给 Spark 框架一个拷贝的累加器
         */
        override def copy(): AccumulatorV2[String, Set[String]] = {
          val newAccumulator = new NumAccumulator()
          nums.synchronized {
            newAccumulator.nums ++= this.nums
          }
          newAccumulator
        }
      
        /**
         * 帮助 Spark 框架,清理累加器的内容
         */
        override def reset(): Unit = {
          nums.clear()
        }
      
        /**
         * 外部传入要累加的内容,在这个方法中进行累加
         */
        override def add(v: String): Unit = {
          nums += v
      
        }
      
        /**
         * 累加器在进行累加的时候,可能每个分布式节点都有一个实例
         * 在最后 Driver 进行一次合并,把所有的实例的内容合并起来,会调用这个 merge 方法进行合并
         */
        override def merge(other: AccumulatorV2[String, Set[String]]): Unit = {
          nums ++= other.value
        }
      
        /**
         * 提供给外部累加结果
         * 为什么一定不可变的,因为外部有可能再进行修改,如果是可变的集合,其外部的修改会影响内部的值
         */
        override def value: Set[String] = {
          nums.toSet // 不可变
      }
      
  • 广播变量

    • 广播变量的作用

      广播变量允许开发者将一个 Read-Only的变量缓存到集群中每个节点中, 而不是传递给每一个 Task 一个副本.

      • 集群中每个节点, 指的是一个机器
      • 每一个 Task, 一个 Task 是一个 Stage 中的最小处理单元, 一个 Executor 中可以有多个 Stage, 每个 Stage 有多个 Task

      所以在需要跨多个 Stage 的多个 Task 中使用相同数据的情况下, 广播特别的有用

      在这里插入图片描述

      只需要2个map,我们可以用广播

      在这里插入图片描述

    • 广播变量的API

      方法名描述
      id唯一标识
      value广播变量的值
      unpersist在 Executor 中异步的删除缓存副本
      destroy销毁所有此广播变量所关联的数据和元数据
      toString字符串表示
    • 使用广播变量的一般套路

      可以通过如下方式创建广播变量

      val b = sc.broadcast(1)
      

      如果 Log 级别为 DEBUG 的时候, 会打印如下信息

      DEBUG BlockManager: Put block broadcast_0 locally took  430 ms
      DEBUG BlockManager: Putting block broadcast_0 without replication took  431 ms
      DEBUG BlockManager: Told master about block broadcast_0_piece0
      DEBUG BlockManager: Put block broadcast_0_piece0 locally took  4 ms
      DEBUG BlockManager: Putting block broadcast_0_piece0 without replication took  4 ms
      

      创建后可以使用 value获取数据

      b.value
      

      获取数据的时候会打印如下信息

      DEBUG BlockManager: Getting local block broadcast_0
      DEBUG BlockManager: Level for block broadcast_0 is StorageLevel(disk, memory, deserialized, 1 replicas)
      

      广播变量使用完了以后, 可以使用 unpersist删除数据

      b.unpersist
      

      删除数据以后, 可以使用 destroy销毁变量, 释放内存空间

      b.destroy
      

      销毁以后, 会打印如下信息

      DEBUG BlockManager: Removing broadcast 0
      DEBUG BlockManager: Removing block broadcast_0_piece0
      DEBUG BlockManager: Told master about block broadcast_0_piece0
      DEBUG BlockManager: Removing block broadcast_0
      
    • 使用 value 方法的注意

      方法签名 value: T

      value方法内部会确保使用获取数据的时候, 变量必须是可用状态, 所以必须在变量被 destroy之前使用 value方法, 如果使用 value时变量已经失效, 则会报出以下错误

      org.apache.spark.SparkException: Attempted to use Broadcast(0) after it was destroyed (destroy at <console>:27)
        at org.apache.spark.broadcast.Broadcast.assertValid(Broadcast.scala:144)
        at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:69)
        ... 48 elided
      
    • 使用 destory 方法的注意点

      方法签名 destroy(): Unit

      destroy方法会移除广播变量, 彻底销毁掉, 但是如果你试图多次 destroy广播变量, 则会报出以下错误

      org.apache.spark.SparkException: Attempted to use Broadcast(0) after it was destroyed (destroy at <console>:27)
        at org.apache.spark.broadcast.Broadcast.assertValid(Broadcast.scala:144)
        at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:107)
        at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:98)
        ... 48 elided
      
    • 使用code

      /**
         * 资源占用比较大, 有十个对应的 value
         */
        @Test
        def bc1():Unit = {
          // 数据,假装这个数据很大,大概一百兆
          val v = Map("Spark" -> "http://spark.apache.cn", "Scala" -> "http://www.scala-lang.org")
      
          val conf = new SparkConf().setMaster("local[6]").setAppName("bc")
          val sc = new SparkContext(conf)
      
          // 将其中的 Spark 和 Scala 转为对应的网址
          val r = sc.parallelize(Seq("Spark","Scala"))
          val result = r.map(item => v(item)).collect()
          result.foreach(println(_))
        }
        /**
         * 使用广播, 大幅度减少 value 的复制
         */
        @Test
        def bc2():Unit = {
          // 数据,假装这个数据很大,大概一百兆
          val v = Map("Spark" -> "http://spark.apache.cn", "Scala" -> "http://www.scala-lang.org")
      
          val conf = new SparkConf().setMaster("local[6]").setAppName("bc")
          val sc = new SparkContext(conf)
      
          // 创建广播
          val bc = sc.broadcast(v)
      
          // 将其中的 Spark 和 Scala 转为对应的网址
          val r = sc.parallelize(Seq("Spark","Scala"))
      
          // 在算子中使用广播变量代替直接引用集合, 只会复制和executor一样的数量
          // 在使用广播之前, 复制 map 了 task 数量份
          // 在使用广播以后, 复制次数和 executor 数量一致
          val result = r.map(item => bc.value(item)).collect()
          result.foreach(println(_))
        }
      

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1385181.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[易语言]易语言调用C++ DLL回调函数

易语言适合用于数据展示&#xff0c;数据的获取还是VC来的快、方便哈。 因此我一般使用VC编写DLL&#xff0c;使用易语言编写界面&#xff0c;同一个程序&#xff0c;DLL和EXE通讯最方便的就是使用接口回调了。 废话少说&#xff0c;进入主题。 1. VC编写DLL 为了DLL能够调…

精品量化公式——“风险指数”,适用于短线操作的交易系统,股票期货都适用!不漂移

不多说&#xff0c;直接上效果如图&#xff1a; ► 日线表现 代码评估 技术指标代码评估&#xff1a; 用于通过各种技术指标来分析股市走势。它使用了多个自定义变量&#xff08;VAR1, VAR2, VAR3, 等等&#xff09;&#xff0c;并且基于这些变量构建了复杂的条件和计算。以下…

PostgreSQL之SEMI-JOIN半连接

什么是Semi-Join半连接 Semi-Join半连接&#xff0c;当外表在内表中找到匹配的记录之后&#xff0c;Semi-Join会返回外表中的记录。但即使在内表中找到多条匹配的记录&#xff0c;外表也只会返回已经存在于外表中的记录。而对于子查询&#xff0c;外表的每个符合条件的元组都要…

爬虫入门学习(二)——response对象

大家好&#xff01;我是码银&#xff0c;代码的码&#xff0c;银子的银&#x1f970; 欢迎关注&#x1f970;&#xff1a; CSDN&#xff1a;码银 公众号&#xff1a;码银学编程 前言 在本篇文章&#xff0c;我们继续讨论request模块。从上一节&#xff08;爬虫学习(1)--reque…

【C++】异常机制

异常 一、传统的处理错误的方式二、C异常概念三、异常的使用1. 异常的抛出和捕获&#xff08;1&#xff09;异常的抛出和匹配原则&#xff08;2&#xff09;在函数调用链中异常栈展开匹配原则 2. 异常的重新抛出3. 异常安全4. 异常规范 四、自定义异常体系五、C 标准库的异常体…

编曲混音FL Studio21.2对电脑有什么配置要求

FL Studio 21是一款非常流行的音乐制作软件&#xff0c;它可以帮助音乐人和制作人创作出高质量的音乐作品。然而&#xff0c;为了保证软件的稳定性和流畅性&#xff0c;用户需要知道FL Studio 21对电脑的配置要求。本文将介绍FL Studio 21的配置要求&#xff0c;以帮助用户选择…

详解Java之Spring框架中事务管理的艺术

第1章&#xff1a;引言 大家好&#xff0c;我是小黑&#xff0c;咱们今天聊聊Spring框架中的事务管理。不管是开发小型应用还是大型企业级应用&#xff0c;事务管理都是个不可避免的话题。那么&#xff0c;为什么事务管理这么重要呢&#xff1f;假设在银行系统中转账时&#x…

Java实现海南旅游景点推荐系统 JAVA+Vue+SpringBoot+MySQL

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 用户端2.2 管理员端 三、系统展示四、核心代码4.1 随机景点推荐4.2 景点评价4.3 协同推荐算法4.4 网站登录4.5 查询景点美食 五、免责说明 一、摘要 1.1 项目介绍 基于VueSpringBootMySQL的海南旅游推荐系统&#xff…

远程开发之vscode端口转发

远程开发之vscode端口转发 涉及的软件forwarded port 通过端口转发&#xff0c;实现在本地电脑上访问远程服务器上的内网的服务。 涉及的软件 vscode、ssh forwarded port 在ports界面中的port字段&#xff0c;填需要转发的IP:PORT&#xff0c;即可转发远程服务器中的内网端…

SSH镜像、systemctl镜像、nginx镜像、tomcat镜像

目录 一、SSH镜像 二、systemctl镜像 三、nginx镜像 四、tomcat镜像 五、mysql镜像 一、SSH镜像 1、开启ip转发功能 vim /etc/sysctl.conf net.ipv4.ip_forward 1sysctl -psystemctl restart docker 2、 cd /opt/sshd/vim Dockerfile 3、生成镜像 4、启动容器并修改ro…

AirServer2024官方最新版免费下载

AirServer官方版是一款使用方便的投屏软件&#xff0c;在教室&#xff0c;会议室以及游戏中极为方便。AirServer官方版支持IOS、Android、Windows、mac、Chromebook等多种设备&#xff0c;使用AirServer不需要其他的设备即可完成投屏操作&#xff0c;相比其他投屏软件&#xff…

星图地球——Landsat5_C2_TOA_T1数据集

简介 Landsat 5是美国陆地卫星系列&#xff08;Landsat&#xff09;的第五颗卫星&#xff0c;于1984年3月1日发射&#xff0c;2011年11月停止工作。16天可覆盖全球范围一次。Landsat5_C2_TOA数据集是由Collection2 level1数据通过MTL文件计算得到的TOA反射率产品。数据集的空间…

Spring Boot 中实现文件上传、下载、删除功能

&#x1f3c6;作者简介&#xff0c;普修罗双战士&#xff0c;一直追求不断学习和成长&#xff0c;在技术的道路上持续探索和实践。 &#x1f3c6;多年互联网行业从业经验&#xff0c;历任核心研发工程师&#xff0c;项目技术负责人。 &#x1f389;欢迎 &#x1f44d;点赞✍评论…

VUE好看的个人简历模板

文章目录 1.设计来源1.1 首页界面1.2 关于我界面1.3 我的资历界面1.4 项目经验界面1.5 我的技能界面1.6 联系我界面 2.效果和源码2.1 动态效果2.2 源码目录结构 源码下载 作者&#xff1a;xcLeigh 文章地址&#xff1a;https://blog.csdn.net/weixin_43151418/article/details/…

【python】12.字符串和正则表达式

使用正则表达式 正则表达式相关知识 在编写处理字符串的程序或网页时&#xff0c;经常会有查找符合某些复杂规则的字符串的需要&#xff0c;正则表达式就是用于描述这些规则的工具&#xff0c;换句话说正则表达式是一种工具&#xff0c;它定义了字符串的匹配模式&#xff08;…

网站万词霸屏推广系统源码:实现关键词推广,轻松提高关键词排名,带完整的安装部署教程

现如今&#xff0c;互联网的快速发展&#xff0c;网站推广成为企业网络营销的重要手段。而关键词排名作为网站推广的关键因素&#xff0c;一直备受关注。罗峰给大家分享一款网站万词霸屏推广系统源码&#xff0c;该系统可实现关键词推广&#xff0c;有效提高关键词排名&#xf…

Github 2024-01-15开源项目周报Top14

根据Github Trendings的统计&#xff0c;本周(2024-01-15统计)共有14个项目上榜。根据开发语言中项目的数量&#xff0c;汇总情况如下&#xff1a; 开发语言项目数量Python项目6TypeScript项目3Jupyter Notebook项目3Java项目2Kotlin项目1C#项目1C项目1 Microsoft PowerToys:…

实现STM32烧写程序-(2)Flash Loader 发送指令解析

简介 实现STM32烧写程序-(1)获取Bootloader版本信息&#xff0c; 看了数据手册之后可以了解到指令的发送, 但实现之前可以使用现成的工具进行测试和查看他的收发情况; Usart Bootloader 指令列表 Usart Bootloader 指令列表 应答ACK | NACK ACK(0x79) 表示 正常答复, NACK(0x…

mybatis中的驼峰转换

一、有啥用 开发时常用对象来存储从数据库中的记录&#xff0c;开启驼峰转化即可实现数据库字段(通常使用_下划线连接)与对象属性的对应&#xff0c;如数据库中的first_name字段会转化为firstName与对象中的firstName属性对应。 二、配置 三、相关报错 数据库字段与对象属性…

我为什么要写RocketMQ消息中间件实战派上下册这本书?

我与RocketMQ结识于2018年&#xff0c;那个时候RocketMQ还不是Apache的顶级项目&#xff0c;并且我还在自己的公司做过RocketMQ的技术分享&#xff0c;并且它的布道和推广&#xff0c;还是在之前的首席架构师的带领下去做的&#xff0c;并且之前有一个技术神经质的人&#xff0…