Spark(10):RDD依赖关系

news2024/11/24 17:58:05

目录

0. 相关文章链接

1. RDD 血缘关系

2. RDD依赖关系

3. RDD窄依赖

6. RDD宽依赖

7. RDD阶段划分

8. RDD阶段划分源码

9. RDD任务划分


0. 相关文章链接

 Spark文章汇总 

1. RDD 血缘关系

        RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage(血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。 

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * @ date: 2023/7/4
 * @ author: yangshibiao
 * @ desc: 项目描述
 */
object ModelTest {

    def main(args: Array[String]): Unit = {

        val conf: SparkConf = new SparkConf()
            .setAppName("ModelTest")
            .setMaster("local[*]")

        val sc: SparkContext = new SparkContext(conf)

        val fileRDD: RDD[String] = sc.textFile("input/1.txt")
        println(fileRDD.toDebugString)
        println("----------------------")
        val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))
        println(wordRDD.toDebugString)
        println("----------------------")

        val mapRDD: RDD[(String, Int)] = wordRDD.map((_,1))
        println(mapRDD.toDebugString)
        println("----------------------")

        val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_)
        println(resultRDD.toDebugString)

        resultRDD.collect()

    }
}

2. RDD依赖关系

这里所谓的依赖关系,其实就是两个相邻 RDD 之间的关系


import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * @ date: 2023/7/4
 * @ author: yangshibiao
 * @ desc: 项目描述
 */
object ModelTest {

    def main(args: Array[String]): Unit = {

        val conf: SparkConf = new SparkConf()
            .setAppName("ModelTest")
            .setMaster("local[*]")

        val sc: SparkContext = new SparkContext(conf)

        val fileRDD: RDD[String] = sc.textFile("input/1.txt")
        println(fileRDD.dependencies)
        println("----------------------")

        val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))
        println(wordRDD.dependencies)
        println("----------------------")

        val mapRDD: RDD[(String, Int)] = wordRDD.map((_,1))
        println(mapRDD.dependencies)
        println("----------------------")

        val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_)
        println(resultRDD.dependencies)

        resultRDD.collect()

    }
}

3. RDD窄依赖

窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用,窄依赖我们形象的比喻为独生子女。 

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd)  

6. RDD宽依赖

宽依赖表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会引起 Shuffle,总结:宽依赖我们形象的比喻为多生。 

class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner, 
    val serializer: Serializer = SparkEnv.get.serializer,     
    val keyOrdering: Option[Ordering[K]] = None,     
    val aggregator: Option[Aggregator[K, V, C]] = None,     
    val mapSideCombine: Boolean = false
)   extends Dependency[Product2[K, V]]  

7. RDD阶段划分

DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG 记录了 RDD 的转换过程和任务的阶段。 

8. RDD阶段划分源码

try { 
  // New stage creation may throw an exception if, for example, jobs are run on a 
  // HadoopRDD whose underlying HDFS files have been deleted. 
  finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {   
    case e: Exception => 
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)     listener.jobFailed(e)     return 
} 
 
…… 
private def createResultStage(   
    rdd: RDD[_],   
    func: (TaskContext, Iterator[_]) => _,   
    partitions: Array[Int],   
    jobId: Int,   
    callSite: CallSite
): ResultStage = { 
    val parents = getOrCreateParentStages(rdd, jobId) 
    val id = nextStageId.getAndIncrement() 
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite) 
    stageIdToStage(id) = stage 
    updateJobIdStageIdMaps(jobId, stage) 
    stage 
} 
 
…… 
 
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = { 
    getShuffleDependencies(rdd).map { 
        shuffleDep =>   getOrCreateShuffleMapStage(shuffleDep, firstJobId) 
    }.toList 
} 
 
…… 
 private[scheduler] def getShuffleDependencies(   
    rdd: RDD[_]
): HashSet[ShuffleDependency[_, _, _]] = { 
    val parents = new HashSet[ShuffleDependency[_, _, _]] 
    val visited = new HashSet[RDD[_]] 
    val waitingForVisit = new Stack[RDD[_]] 
    waitingForVisit.push(rdd) 
    while (waitingForVisit.nonEmpty) {   
        val toVisit = waitingForVisit.pop()   
        if (!visited(toVisit)) {     
            visited += toVisit 
            toVisit.dependencies.foreach {       
                case shuffleDep: ShuffleDependency[_, _, _] => parents += shuffleDep
                case dependency => waitingForVisit.push(dependency.rdd) 
            } 
        } 
    } 
    parents 
} 

9. RDD任务划分

RDD 任务切分中间分为:Application、Job、Stage 和 Task 

  • Application:初始化一个 SparkContext 即生成一个 Application; 
  • Job:一个 Action 算子就会生成一个 Job; 
  • Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1; 
  • Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数。 

注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。  


注:其他Spark相关系列文章链接由此进 ->  Spark文章汇总 


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/717608.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

HDLBits刷题笔记7:Circuits.Combinational Logic.Karnaugh Map to Circuit

Karnaugh Map to Circuit 3-variable 实现如下卡诺图&#xff0c;用sop和pos两种方式 化简&#xff1a; module top_module(input a,input b,input c,output out ); // sop和pos相同assign out a | b | c; endmodule4-variable 实现如下卡诺图&#xff0c;用sop和pos两种方…

【RabbitMQ】

一、概念 MQ&#xff08;消息队列&#xff09;&#xff1a;是指在消息传送过程中保存消息的容器&#xff0c;用于分布式系统之间的通信 生产者&#xff1a;是发送消息的用户应用程序。 队列&#xff1a;是存储消息的缓冲区。 消费者&#xff1a;是接收消息的用户应用程序。 1…

前端开发在公司中的位置以及日常工作内容

导读 俗话说的好&#xff0c;不谋全局者不足谋一域。 上一篇文章我们介绍了计算机相关的各种不同方向&#xff0c;相信大家心里也有自己所喜欢的职业&#xff0c;那么今天我们继续讲讲在一个公司中前端开发处于什么样的地位&#xff0c;以及前端的一天都干些什么 普通公司的…

太卷了,阿里一面试官把多年总结的Java八股文完全开源了.......

Java越来越卷了&#xff0c;都快卷成韭菜花了&#xff0c;最近又赶上跳槽的高峰期&#xff0c;好多粉丝&#xff0c;都问我要有没有最新面试题&#xff0c;索性&#xff0c;前一阵子偶然得到一份阿里面试官整理的Java八股文&#xff0c;答案都整理好&#xff0c;整理的《互联网…

【Java基础教程】(二)入门介绍篇 · 下:从JDK下载安装到第一个“Hello World!”程序,解析PATH和CLASSPATH环境变量的妙用~

Java基础教程之入门介绍 下 本节学习目标1️⃣ JDK安装与配置2️⃣ 第一个Java程序&#xff1a;“Hello World!”3️⃣ 环境变量 CLASSPATH&#x1f33e; 总结 本节学习目标 JDK 安装与配置&#xff1b;理解环境变量PATH和CLASSPATH的主要作用&#xff1b;运行第一个Java程序…

Spark(11):RDD持久化

目录 0. 相关文章链接 1. RDD Cache 缓存 2. RDD CheckPoint 检查点 3. 缓存和检查点区别 0. 相关文章链接 Spark文章汇总 1. RDD Cache 缓存 RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存&#xff0c;默认情况下会把数据以缓存在 JVM 的堆内存中。但是并不是这…

关于Spring Boot的若干个重要问题

Spring Boot 1.什么是springboot 用来简化spring应用的初始搭建以及开发过程 使用特定的方式来进行配置&#xff08;properties或yml文件&#xff09; 创建独立的spring引用程序 main方法运行 嵌入的Tomcat 无需部署war文件 简化maven配置 自动配置spring添加对应功能starter…

nodejs-pm2管理js并发/自动重启/恢复等

目录 一、nodejs安装二、启动运行js三、实用功能1-pm2对进程名起别名四、实用功能2-pm2启动多个进程五、实用功能3-pm2内存限制自动重启六、实用功能4-服务器宕机前保存记录恢复进程 一、nodejs安装 nodejs安装使用nohup后台启动项目&#xff0c;倒是解决了控制台问题&#xf…

用Python从文件中读取学生成绩,并计算最高分/最低分/平均分

目录标题 前言环境使用:涉及知识点代码展示尾语 前言 嗨喽~大家好呀&#xff0c;这里是魔王呐 ❤ ~! 今天咱们试试用Python从文件中读取学生成绩&#xff0c;并计算最高分/最低分/平均分。 环境使用: Python 3.8 解释器 Pycharm 编辑器 涉及知识点 文件读写 基础语法 字…

QQ邮箱第三方POP3/IMAP/SMTP/Exchange/CardDAV/CalDAV服务授权码

参考QQ邮箱&#xff1a; 什么是授权码&#xff0c;它又是如何设置&#xff1f; 设置入口&#xff1a; 选择 账户 下拉找到POP3/IMAP/SMTP/Exchange/CardDAV/CalDAV服务就好了。我这边已经开通&#xff0c;开通流程挺简单的&#xff0c;手机号绑定然后输入验证码就好了。

华安联大:基于北斗RTK+蓝牙AOA、UWB定位技术为智慧港口提供多元化解决方案

深圳华安联大创新科技有限公司的商场室内导航系统方案&#xff0c;解决传统购物中心用户体验差的缺点&#xff0c;可实现3大类功能应用: (1)实现顾客在商场内自主导航&#xff0c;室内位置实时分享&#xff0c;目的地商铺自主导航&#xff0c;路径规划等功能: (2)停车场反向寻…

CISA在三星和D-Link设备中发现8个被积极利用的漏洞

美国网络安全和基础设施安全局&#xff08;CISA&#xff09;根据已有的证据&#xff0c;将8个被积极利用的漏洞列入已知的漏洞&#xff08;KEV&#xff09;目录中。 这8个被积极利用的漏洞包括影响三星智能手机的六个漏洞和影响D-Link设备的两个漏洞。以下是这八个漏洞&#x…

Java jsp 实战

1.JSP执行过程&#xff08;原理&#xff09; 步骤1&#xff1a;翻译&#xff08;jsp-->java&#xff09; 步骤2&#xff1a;编译&#xff08;java-->class文件&#xff09; 步骤3&#xff1a;执行&#xff08;执行class(字节码)文件&#xff09; 2.JSP实战 步骤1&…

UE4中创建的瞄准偏移或者混合空间无法拖入动画

UE4系列文章目录 文章目录 UE4系列文章目录前言一、解决办法 前言 UE4 AimOffset(瞄准偏移)动画融合时&#xff0c;AimOffse动画拖入不了融合框的解决办法&#xff0c;你会发现动画无法拖入到融合框&#xff0c;ue4编辑器提示“Invalid Additive animation Type”&#xff0c;…

Android Studio最新好用的插件----Gson转Java实体类/Kotlin Data

1.Java 安装好插件之后&#xff0c;把Gson/Json数据复制一下&#xff0c;eg: { "Chrome": "UA-66061856-6", "ChromePro": "UA-66061856-9", "Opera": "UA-66061856-7", "Edge": "UA-66061856-8&q…

K8s 为什么要弃用 Docker

K8s 为什么要弃用 Docker 最近在学习容器技术的过程中&#xff0c;看到有关于Kubernetes“弃用 Docker”的事情&#xff0c;担心现在学 Docker 是否还有价值&#xff0c;是否现在就应该切换到 containerd 或者是其他 runtime。 随着深入了解&#xff0c;这些疑虑的确是有些道理…

python实现语音识别(讯飞开放平台)

文章目录 讯飞平台使用python实现讯飞接口的语音识别第一步&#xff1a;导入需要的依赖库第二步&#xff1a;初始化讯飞接口对象第三步&#xff1a;收到websocket建立连接后的处理函数第四步&#xff1a;收到websocket消息的处理函数第五步&#xff1a;整合运行各函数 讯飞平台…

linux内核TCP源码浅析

目录 数据接收流程驱动层网络层ip_local_deliverip_local_deliver_finish 传输层tcp_v4_rcvtcp_v4_do_rcvtcp_rcv_establishedtcp_recvmsg linux内核源码下载&#xff1a;https://cdn.kernel.org/pub/linux/kernel/ 我下载的是&#xff1a;linux-5.11.1.tar.gz 数据接收流程 …

服务器数据库被360后缀勒索病毒攻击怎么办?360勒索病毒的加密形式

近日&#xff0c;我们收到很多企业的求助&#xff0c;企业服务器内的多种数据库被.360后缀的勒索病毒加密&#xff0c;导致企业许多工作无法正常运转&#xff0c;也给企业带来了严重的经济损失。360后缀勒索病毒是一种恶意软件&#xff0c;它属于BeijingCrypt勒索病毒家族&…

Server - 通过 AutoSSH 建立服务器端口转发用于访问网页

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/131536508 AutoSSH 是一款用于创建和维护持久的SSH隧道的工具&#xff0c;可以自动检测和恢复断开的连接&#xff0c;从而保证隧道的稳定性。Auto…