Spark原理——逻辑执行图

news2025/1/13 15:56:16

逻辑执行图

  • 明确逻辑计划的边界

    在 Action 调用之前,会生成一系列的RDD,这些RDD之间的关系,其实就是整个逻辑计划

    val conf= new SparkConf().setMaster("local[6]").setAppName("wordCount_source")
    val sc= new SparkContext(conf)
    
    val textRDD=sc.parallelize(Seq("Hadoop Spark", "Hadoop Flume", "Spark Sqoop"))
    val splitRDD=textRDD.flatMap(_.split(" "))
    val tupleRDD=splitRDD.map((_, 1))
    val reduceRDD=tupleRDD.reduceByKey(_ + _)
    val strRDD=reduceRDD.map(item => s"${item._1},${item._2}")
    
    println(strRDD.toDebugString)
    strRDD.collect.foreach(item =>println(item))
    

    例如上述代码,如果生成逻辑计划的,会生成效如下一些RDD,这些RDD是相互关联的,这些RDD之间,其实本质上生成的就是一个 计算链

    在这里插入图片描述

    接下来, 采用迭代渐进式的方式, 一步一步的查看一下整体上的生成过程

  • RDD 如何生成

    • **textFile算子的背后**

      研究 RDD 的功能或者表现的时候, 其实本质上研究的就是 RDD中的五大属性, 因为 RDD透过五大属性来提供功能和表现, 所以如果要研究 textFile 这个算子, 应该从五大属性着手, 那么第一步就要看看生成的 RDD是什么类型的 RDD

      1. **textFile 生成的是 HadoopRDD**

        在这里插入图片描述

        在这里插入图片描述

        在这里插入图片描述

      2. **HadoopRDDPartitions对应了 HDFSBlocks**

        在这里插入图片描述

        其实本质上每个 HadoopRDDPartition都是对应了一个 HadoopBlock, 通过 InputFormat 来确定 Hadoop中的 Block 的位置和边界, 从而可以供一些算子使用

        在这里插入图片描述

      3. **HadoopRDDcompute函数就是在读取 HDFS中的 Block**

        本质上, compute 还是依然使用 InputFormat来读取 HDFS中对应分区的 Block

      4. **textFile这个算子生成的其实是一个 MapPartitionsRDD**

        textFile这个算子的作用是读取 HDFS上的文件, 但是 HadoopRDD中存放是一个元组, 其 Key是行号, 其 ValueHadoop中定义的 Text对象, 这一点和 MapReduce程序中的行为是一致的

        但是并不适合 Spark 的场景, 所以最终会通过一个 map算子, 将 (LineNum, Text)转为 String形式的一行一行的数据, 所以最终 textFile这个算子生成的 RDD 并不是 HadoopRDD, 而是一个 MapPartitionsRDD

    • **map算子的背后**

      在这里插入图片描述

      • **map算子生成了 MapPartitionsRDD**

         val conf= new SparkConf().setMaster("local[6]").setAppName("wordCount_source")
        val sc= new SparkContext(conf)
        
        val rdd=sc.parallelize(Seq("Hadoop Spark", "Hadoop Flume", "Spark Sqoop"))
        val rdd1=rdd.flatMap(_.split(" "))
        val rdd2=rdd1.map((_, 1))
        

        由源码可知, 当 val rdd2 = rdd1.map() 的时候, 其实生成的新 RDD 是 rdd2, rdd2 的类型是 MapPartitionsRDD, 每个 RDD 中的五大属性都会有一些不同, 由 map 算子生成的 RDD 中的计算函数, 本质上就是遍历对应分区的数据, 将每一个数据转成另外的形式

      • **MapPartitionsRDD的计算函数是 collection.map( function )**

        真正运行的集群中的处理单元是 Task, 每个 Task 对应一个 RDD 的分区, 所以 collection 对应一个 RDD 分区的所有数据, 而这个计算的含义就是将一个 RDD 的分区上所有数据当作一个集合, 通过这个 Scala 集合的 map 算子, 来执行一个转换操作, 其转换操作的函数就是传入 map 算子的 function

      • 传入 map算子的函数会被清理

        在这里插入图片描述

        这个清理主要是处理闭包中的依赖, 使得这个闭包可以被序列化发往不同的集群节点运行

    • **flatMap算子的背后**

      在这里插入图片描述

      flatMapmap算子其实本质上是一样的, 其步骤和生成的 RDD都是一样, 只是对于传入函数的处理不同, mapcollect.map( function )flatMapcollect.flatMap( function )

      从侧面印证了, 其实 Spark中的 flatMapScala基础中的 flatMap其实是一样的

      val conf= new SparkConf().setMaster("local[6]").setAppName("wordCount_source")
      val sc= new SparkContext(conf)
      
      val textRDD=sc.parallelize(Seq("Hadoop Spark", "Hadoop Flume", "Spark Sqoop"))
      val splitRDD=textRDD.flatMap(_.split(" "))
      val tupleRDD=splitRDD.map((_, 1))
      val reduceRDD=tupleRDD.reduceByKey(_ + _)
      val strRDD=reduceRDD.map(item => s"${item._1},${item._2}")
      
      // println(strRDD.toDebugString)
      strRDD.collect.foreach(item =>println(item))
      

      textRDDsplitRDDtupleRDD

      textRDDsplitRDD再到 tupleRDD的过程, 其实就是调用 mapflatMap算子生成新的 RDD的过程, 所以如下图所示, 就是这个阶段所生成的逻辑计划

  • RDD 之间有哪些依赖关系

    • 前置说明

      • 什么是RDD之间的依赖关系?

        在这里插入图片描述

        • 什么是关系(依赖关系)?

          从算子视角上来看,splitRDD 通过 map 算子得到了 tupleRDD ,所以 splitRDD 和 tupleRDD 之间的关系是 map, 但是仅仅这样说,会不够全面,从细节上来看,RDD只是数据和关于数据的计算,而具体执这种计算得出结果的是一个神秘的其它组件,所以,这两个 RDD 的关系可以表示为 splitRDD 的数据通过 map 操作,被传入 tupleRDD ,这是它们之间更细化的关系

          但是 RDD 这个概念本身并不是数据容器,数据真正应该存放的地方是 RDD 的分区,所以如果把视角放在数据这一层面上的话,直接讲这两个 RDD 之间有关系是不科学的,应该从这两个 RDD 的分区之间的关系来讨论它们之间的关系

        • 那这些分区之间是什么关系?

          如果仅仅说 splitRDD 和 tupleRDD 之间的话,那它们的分区之间就是一对一的关系

          但是 tupleRDD 到 reduceRDD 呢?tupleRDD 通过算子 reduceByKey 生成 reduceRDD ,而这个算子是一个 Shuffle 操作,Shuff1e 操作的两个 RDD 的分区之间并不是一对一,reduceByKey 的一个分区对应 tupleRDD 的多个分区

      • reduceByKey 算子会生成 ShuffledRDD

        reduceByKey 是由算子 combineByKey 来实现的,combineByKey 内部会创建 ShuffledRDD 返回,而整个reduceByKey操作大致如下过程

        在这里插入图片描述

        去掉两个 reducer 端分区,只留下一个的话,如下

        在这里插入图片描述

        所以, 对于 reduceByKey 这个 Shuffle 操作来说, reducer 端的一个分区, 会从多个 mapper 端的分区拿取数据, 是一个多对一的关系

        至此为止, 出现了两种分区见的关系了, 一种是一对一, 一种是多对一

    • 窄依赖

      窄依赖(NarrowDependency)

      假如 rddB = rddA.transform(…), 如果 rddB 中一个分区依赖 rddA 也就是其父 RDD 的少量分区, 这种 RDD 之间的依赖关系称之为窄依赖

      换句话说, 子 RDD 的每个分区依赖父 RDD 的少量个数的分区, 这种依赖关系称之为窄依赖

      @Test
      def narrowDependency(): Unit = {
        // 需求:求得两个 RDD 之间的笛卡尔积
      
        // 1. 生成 RDD
        val conf = new SparkConf().setMaster("local[6]").setAppName("cartesian")
        val sc = new SparkContext(conf)
      
        val rddA = sc.parallelize(Seq(1, 2, 3))
        val rddB = sc.parallelize(Seq("a", "b"))
      
        // 2. 计算
        val rddC = rdd1.cartesian(rdd2)
      
        // 3. 结果获取
        rddC.collect().foreach(print(_))
      
        sc.stop()
      }
      // 运行结果: (1,a) (1,b) (2,a) (2,b) (3,a) (3,b)
      
      • 上述代码的 cartesian 是求得两个集合的笛卡尔积
      • 上述代码的运行结果是 rddA 中每个元素和 rddB 中的所有元素结合, 最终的结果数量是两个 RDD 数量之和
      • rddC 有两个父 RDD, 分别为 rddA 和 rddB

      对于 cartesian 来说, 依赖关系如下

      在这里插入图片描述

      上述图形中清晰展示如下现象

      • rddC 中的分区数量是两个父 RDD 的分区数量之乘积
      • rddA 中每个分区对应 rddC 中的两个分区 (因为 rddB 中有两个分区), rddB 中的每个分区对应 rddC 中的三个分区 (因为 rddA 有三个分区)

      它们之间是窄依赖, 事实上在 cartesian 中也是 NarrowDependency 这个所有窄依赖的父类的唯一一次直接使用, 为什么呢?

      因为所有的分区之间是拷贝关系, 并不是 Shuffle 关系

      • rddC 中的每个分区并不是依赖多个父 RDD 中的多个分区
      • rddC 中每个分区的数量来自一个父 RDD 分区中的所有数据, 是一个 FullDependence, 所以数据可以直接从父 RDD 流动到子 RDD
      • 不存在一个父 RDD 中一部分数据分发过去, 另一部分分发给其它的 RDD
    • 宽依赖

      宽依赖(ShuffleDependency

      并没有所谓的宽依赖, 宽依赖应该称作为 ShuffleDependency

      在 ShuffleDependency 的类声明上如下写到

      Represents a dependency on the output of a shuffle stage.
      

      上面非常清楚的说道, 宽依赖就是 Shuffle 中的依赖关系, 换句话说, 只有 Shuffle 产生的地方才是宽依赖

      那么宽窄依赖的判断依据就非常简单明确了, 是否有 Shuffle ?

      举个 reduceByKey 的例子, rddB = rddA.reduceByKey( (curr, agg) ⇒ curr + agg ) 会产生如下的依赖关系

      在这里插入图片描述

      在这里插入图片描述

      • rddB 的每个分区都几乎依赖 rddA 的所有分区
      • 对于 rddA 中的一个分区来说, 其将一部分分发给 rddB 的 p1, 另外一部分分发给 rddB 的 p2, 这不是数据流动, 而是分发
    • 如何分辨宽窄依赖 ?

      其实分辨宽窄依赖的本身就是在分辨父子 RDD 之间是否有 Shuffle, 大致有以下的方法

      • 如果是 Shuffle, 两个 RDD 的分区之间不是单纯的数据流动, 而是分发和复制
      • 一般 Shuffle 的子 RDD 的每个分区会依赖父 RDD 的多个分区

      先看是否一对一>是就是窄依赖,如果不是一对一,是多对一>不能确定,再继续判断

      但是这样判断其实不准确, 如果想分辨某个算子是否是窄依赖, 或者是否是宽依赖, 则还是要取决于具体的算子, 例如想看 cartesian 生成的是宽依赖还是窄依赖, 可以通过如下步骤

      1. 查看 map 算子生成的 RDD

        在这里插入图片描述

        在这里插入图片描述

        在这里插入图片描述

      2. 进去 RDD 查看 getDependence 方法

        在这里插入图片描述

        在这里插入图片描述

        在这里插入图片描述

        在这里插入图片描述
        724024159.png?origin_url=Untitled%2520180.png&pos_id=img-mqipe2i4-1704890196048)

    • 常见的窄依赖类型

      Dependency.scala 源码有。

      在这里插入图片描述

      • 一对一窄依赖

        其实 RDD 中默认的是 OneToOneDependency, 后被不同的 RDD 子类指定为其它的依赖类型, 常见的一对一依赖是 map 算子所产生的依赖, 例如 rddB = rddA.map(…)

        在这里插入图片描述

        • 每个分区之间一一对应, 所以叫做一对一窄依赖
      • Range 窄依赖

        Range 窄依赖其实也是一对一窄依赖, 但是保留了中间的分隔信息, 可以通过某个分区获取其父分区, 目前只有一个算子生成这种窄依赖, 就是 union 算子, 例如 rddC = rddA.union(rddB)

        在这里插入图片描述

        • rddC 其实就是 rddA 拼接 rddB 生成的, 所以 rddC 的 p5 和 p6 就是 rddB 的 p1 和 p2
        • 所以需要有方式获取到 rddC 的 p5 其父分区是谁, 于是就需要记录一下边界, 其它部分和一对一窄依赖一样
      • 多对一窄依赖

        多对一窄依赖其图形和 Shuffle 依赖非常相似, 所以在遇到的时候, 要注意其 RDD 之间是否有 Shuffle 过程, 比较容易让人困惑, 常见的多对一依赖就是重分区算子 coalesce, 例如 rddB = rddA.coalesce(2, shuffle = false), 但同时也要注意, 如果 shuffle = true 那就是完全不同的情况了

        在这里插入图片描述

        • 因为没有 Shuffle, 所以这是一个窄依赖
      • 再谈宽窄依赖的区别

        • 宽窄依赖的区别非常重要, 因为涉及了一件非常重要的事情: 如何计算 RDD ?
        • 宽窄依赖的核心区别是: 窄依赖的 RDD 可以放在一个 Task 中运行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1376240.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

多测师肖sir___ui自动化测试po框架讲解版

po框架 一、ui自动化po框架介绍 (1)PO是Page Object的缩写 (2)业务流程与页面元素操作分离的模式,可以简单理解为每个页面下面都有一个配置class, 配置class就用来维护页面元素或操作方法 (3&am…

20240111在ubuntu20.04.6下解压缩RAR格式的压缩包

20240111在ubuntu20.04.6下解压缩RAR格式的压缩包 2024/1/11 18:25 百度搜搜:ubuntu rar文件怎么解压 rootrootrootroot-X99-Turbo:~/temp$ ll total 2916 drwx------ 3 rootroot rootroot 4096 1月 11 18:28 ./ drwxr-xr-x 25 rootroot rootroot 4096 1月…

【时事篇-02】20240110 365天存钱法(sum法)

背景需求 朋友圈里,一位保险推销员发布“存钱法广告”,我想用Python验算结果正确性 使用的是最近宫格数独里用到的”sum法” 代码展示 项目:存钱游戏计算 sum() 作者:阿夏 时间:2024年1月10日19:03 import random1、钻石版:从1元存到365元&a…

七星彩中奖号码模拟机器

七星彩号码抽取规则。 前区:抽取前区6个号码,每个号码是0~9之间选1个。 后区:抽取后区1个号码,每个号码是0~14之间选1个。 #七星彩模拟器,2024-01-12,by qs import random QianQu_6number [0,1,2,3,4,5,…

【C语言】linux内核set_task_stack_end_magic函数

一、函数定义 void set_task_stack_end_magic(struct task_struct *tsk) {unsigned long *stackend;stackend end_of_stack(tsk);*stackend STACK_END_MAGIC; /* for overflow detection */ } 内核版本6.4.3、6.7。 二、代码解读 解读1 这段代码是一个在Linux内核中定…

奇异值分解在图形压缩中的应用

奇异值分解在图形压缩中的应用 在研究奇异值分解的工程应用之前,我们得明白什么是奇异值?什么是奇异向量? 奇异值与奇异向量 概念:奇异值描述了矩阵在一组特定向量上的行为,奇异向量描述了其最大的作用方向。 奇异值…

Flash教程(一)入门

从本篇开始,我们将开始基于python的web开发系列教程,这里使用轻量级的web框架Flask。 1、简介 Flask是一个用来构建基于python语言的web应用程序的轻量级web框架。Flask的作者是来自Pocoo(由一群热爱python的人组建)的Armin Ronacher。本来只是作者的一…

Numpy使用简介

Numpy 相关题目 【Python】—— Numpy 初体验 【Python】—— NumPy基础及取值操作 Numpy是基于Python的通用数值计算工具包,其内包含大量数学计算函数和矩阵运算函数。多数科学计算工具包,比如Scipy,和数值分析工具包,比如Pandas…

大学物理-实验篇(二)——用分光计测定三棱镜的折射率(光:特定频段电磁波、光线在介质界面折射、平行光与凸透镜)

目录 预备知识 光:特定频段电磁波 光线在介质界面折射 平行光与凸透镜 实验目的 实验仪器 实验原理 实验步骤 准备分光计 目镜调焦 刻度盘读数 测三棱镜顶角 测三棱镜最小偏向角 数据处理 预备知识 光:特定频段电磁波 光速:…

YOLOv8涨点改进:多层次特征融合(SDI),小目标涨点明显,| UNet v2,比UNet显存占用更少、参数更少

💡💡💡本文独家改进:多层次特征融合(SDI),能够显著提升不同尺度和小目标的识别率 如何引入到YOLOv8 1)替代原始的Concat; 💡💡💡Yolov8魔术师,独家首发创新(原创),适用于Yolov5、Yolov7、Yolov8等各个Yolo系列,专栏文章提供每一步步骤和源码,轻松带你…

H 指数,经典算法实战。

🏆作者简介,普修罗双战士,一直追求不断学习和成长,在技术的道路上持续探索和实践。 🏆多年互联网行业从业经验,历任核心研发工程师,项目技术负责人。 🎉欢迎 👍点赞✍评论…

Mac上使用phpstudy+vscode配置PHP开发环境

使用的工具: 1、系统版本 2、vs code code 3、phpstudy_pro 一、下载vs code code以及必要的插件 1、vs code下载 点击vs code官网下载 选择对应的版本,一般电脑会自动识别对应的版本,点击下载,然后傻瓜式安装! 2…

陶瓷碗口缺口检测-技术方案

项目背景 陶瓷碗出厂前需要做的质量检测工作包括对陶瓷碗是否有缺口的检测,利用图像处理技术也可以对陶瓷碗的缺口进行检测和定位。 技术方案 陶瓷碗口缺口检测包含如下五个步骤。首先通过CMOS相机获取陶瓷碗的图像,二值化处理后通过图像复原技术进行去…

1886_emacs_v29中的行号配置

Grey 全部学习内容汇总: GitHub - GreyZhang/editors_skills: Summary for some common editor skills I used. emacs 29中的行号显示配置 行号显示 行号显示是一个编辑器中很常见的功能,我觉得这个功能的需求度可能因人群或者个人习惯而不同。对于只…

果然程序员的世界不是 0 就是 1

在一场轰动全球的爱情故事中,OpenAI 的首席执行官、同时也是打破常规的浪漫英雄,奥特曼,与他的基友奥利弗穆尔赫林在夏威夷举行了一场迷人的婚礼。在奥特曼的岛屿别墅附近,这对低调却又令人羡慕的新人,在奥特曼的哥哥杰…

python 爬虫 生成markdown文档

本文介绍的案例为使用python爬取网页内容并生成markdown文档,首先需要确定你所需要爬取的框架结构,根据网页写出对应的爬取代码 1.分析总网页的结构 我选用的是redis.net.com/order/xxx.html (如:Redis Setnx 命令_只有在 key 不存在时设置 key 的值。…

win系统搭建Minecraft世界服务器,MC开服教程,小白开服教程

Windows系统搭建我的世界世界服务器,Minecraft开服教程,小白开服教程,MC 1.19.4版本服务器搭建教程。 此教程使用 Mohist 1.19.4 服务端,此服务端支持Forge模组和Bukkit/Spigot/Paper插件,如果需要开其他服务端也可参…

uniapp微信小程序投票系统实战 (SpringBoot2+vue3.2+element plus ) -投票帖子详情实现

锋哥原创的uniapp微信小程序投票系统实战: uniapp微信小程序投票系统实战课程 (SpringBoot2vue3.2element plus ) ( 火爆连载更新中... )_哔哩哔哩_bilibiliuniapp微信小程序投票系统实战课程 (SpringBoot2vue3.2element plus ) ( 火爆连载更新中... )共计21条视频…

使用 Jamf Pro 和 Okta 工作流程实现自动化苹果设备管理

Jamf的销售工程师Vincent Bonnin与Okta的产品经理Emily Wendell一起介绍了JNUC 2021的操作方法会议。它们涵盖了Okta工作流程(Okta Workflow),并在其中集成了Jamf Pro,构建了一些工作流程,并提供了几个用例。 Okta 工作…

【密码学】python密码学库pycryptodome

记录了一本几乎是10年前的书(python绝技–用python成为顶级黑客)中过时的内容 p20 UNIX口令破解机 里面提到了python标准库中自带的crypt库,经验证Python 3.12.1中并没有这个自带的库,密码学相关的库目前(2024.1.12&a…