【Spark精讲】一文讲透Spark RDD

news2025/1/12 1:56:53

MapReduce的缺陷

MR虽然在编程接口的种类和丰富程度上已经比较完善了,但这些系统普遍都缺乏操作分布式内存的接口抽象,导致很多应用在性能上非常低效 。 这些应用的共同特点是需要在多个并行操 作之间重用工作数据集 ,典型的场景就是机器学习和图应用中常用的迭代算法 (每一步对数据 执行相似的函数) 。

RDD

RDD是只读的。

RDD五大属性:①分区、②依赖、③计算函数、④分区器、⑤首选运行位置。

RDD 则是直接在编程接口层面提供了一种高度受限的共享内存模型,如图下图所示。 RDD 是 Spark 的核心数据结构,全称是弹性分布式数据集 (Resilient Distributed Dataset),其本质是一种分布式的内存抽象,表示一个只读的数据分区( Partition)集合 。一个 RDD 通常只能通过其他的 RDD转换而创建。 RDD 定义了各种丰富的转换操作(如 map、 join和 filter等),通过这些转换操作,新的 RDD 包含了如何从其他 RDD 衍生所必需的信息,这些信息构成了 RDD 之间的依赖关系( Dependency) 。 依赖具体分为两种, 一种是窄依赖, RDD 之间分区是一一对应的;另一种是宽依赖,下游 RDD 的每个分区与上游 RDD (也称之为父 RDD)的每个分区都有关,是多对多的关系 。 窄依赖中的所有转换操作可以通过类似管道(Pipeline)的方式全部执行,宽依赖意味着数据需要在不同节点之间 Shuffle 传输 。

RDD计算的时候会通过一个 compute函数得到每个分区的数据。 若 RDD是通过已有的文件系统构建的,则 compute 函数读取指定文件系统中的数据;如果 RDD 是通过其他 RDD 转换而来的,则 compute 函数执行转换逻辑,将其他 RDD 的数据进行转换。 RDD 的操作算子包括两 类, 一类是 transformation,用来将 RDD 进行转换,构建 RDD 的依赖关系;另一类称为 action, 用来触发 RDD 的计算,得到 RDD 的相关计算结果或将 RDD 保存到文件系统中。

在 Spark 中, RDD 可以创建为对象 ,通过对象上的各种方法调用来对 RDD 进行转换 。 经过一系列的 transformation逻辑之后,就可以调用 action来触发 RDD 的最终计算。 通常来讲, action 包括多种方式,可以 是 向应用程序返回结果( show、 count 和 collect等),也可以是向存 储系统保存数据(saveAsTextFile等)。 在Spark中,只有遇到 action,才会真正地执行 RDD 的计算(注:这被称为惰性计算,英文为 LazyEvqluation),这样在运行时可以通过管道的方式传输多个转换 。

总结而言,基于 RDD 的计算任务可描述为从稳定的物理存储(如分布式文件系统 HDFS) 中加载记录,记录被传入由一组确定性操作构成的 DAG (有向无环图),然后写回稳定存储。 RDD还可以将数据集缓存到内存中,使得在多个操作之间可以很方便地重用数据集。 总的来讲,RDD 能够很方便地支持 MapReduce 应用、关系型数据处理、流式数据处理(Stream Processing) 和迭代型应用(图计算、机器学习等)。

在容错性方面,基于 RDD 之间的依赖, 一个任务流可以描述为 DAG。 在实际执行的时候, RDD 通过 Lineage 信息(血缘关系)来完成容错,即使出现数据分区丢失,也可以通过 Lineage 信息重建分区。 如果在应用程序中多次使用同一个 RDD,则可以将这个 RDD 缓存起来,该 RDD 只有在第一次计算的时候会根据 Lineage 信息得到分区的数据,在后续其他地方用到这个 RDD 的时候,会直接从缓存处读取而不用再根据 Lineage信息计算,通过重用达到提升性能的目的 。 虽然 RDD 的 Lineage 信息可以天然地实现容错(当 RDD 的某个分区数据计算失败或丢 失时,可以通过 Lineage信息重建),但是对于长时间迭代型应用来说,随着迭代的进行,RDD 与 RDD之间的 Lineage信息会越来越长,一旦在后续迭代过程中出错,就需要通过非常长的 Lineage信息去重建,对性能产生很大的影响。 为此,RDD 支持用 checkpoint机制将数据保存到持久化的存储中,这样就可以切断之前的 Lineage信息,因为 checkpoint后的 RDD不再需要知道它的父 RDD ,可以从 checkpoint 处获取数据。

DAG

顾名思义,DAG 是一种“图”,图计算模型的应用由来已久,早在上个世纪就被应用于数据库系统(Graph databases)的实现中。任何一个图都包含两种基本元素:节点(Vertex)和边(Edge),节点通常用于表示实体,而边则代表实体间的关系。

DAG,有向无环图,Directed Acyclic Graph的缩写,常用于建模。Spark中使用DAG对RDD的关系进行建模,描述了RDD的依赖关系,这种关系也被称之为lineage,RDD的依赖关系使用Dependency维护,参考Spark RDD之Dependency,DAG在Spark中的对应的实现为DAGScheduler。
基础概念
介绍DAGScheduler中的一些概念,有助于理解后续流程。

  • Job:调用RDD的一个action,如count,即触发一个Job,spark中对应实现为ActiveJob,DAGScheduler中使用集合activeJobs和jobIdToActiveJob维护Job
  • Stage:代表一个Job的DAG,会在发生shuffle处被切分,切分后每一个部分即为一个Stage,Stage实现分为ShuffleMapStage和ResultStage,一个Job切分的结果是0个或多个ShuffleMapStage加一个ResultStage
  • TaskSet:一组Task
  • Task:最终被发送到Executor执行的任务,和stage的ShuffleMapStage和ResultStage对应,其实现分为ShuffleMapTask和ResultTask

把 DAG 图反向解析成多个阶段,每个阶段中包含多个任务,每个任务会被任务调度器分发给工作节点上的 Executor 上执行。

Web UI上DAG举例 

Checkpoint

RDD的依赖

checkpoint先了解一下RDD的依赖,比如计算wordcount:

scala>  sc.textFile("hdfs://leen:8020/user/hive/warehouse/tools.db/cde_prd").flatMap(_.split("\\\t")).map((_,1)).reduceByKey(_+_);
res0: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:28
 
scala> res0.toDebugString
res1: String = 
(2) ShuffledRDD[4] at reduceByKey at <console>:28 []
 +-(2) MapPartitionsRDD[3] at map at <console>:28 []
    |  MapPartitionsRDD[2] at flatMap at <console>:28 []
    |  hdfs://leen:8020/user/hive/warehouse/tools.db/cde_prd MapPartitionsRDD[1] at textFile at <console>:28 []
    |  hdfs://leen:8020/user/hive/warehouse/tools.db/cde_prd HadoopRDD[0] at textFile at <console>:28 []

1、在textFile读取hdfs的时候就会先创建一个HadoopRDD,其中这个RDD是去读取hdfs的数据key为偏移量value为一行数据,因为通常来讲偏移量没有太大的作用所以然后会将HadoopRDD转化为MapPartitionsRDD,这个RDD只保留了hdfs的数据。
2、flatMap 产生一个RDD MapPartitionsRDD
3、map 产生一个RDD MapPartitionsRDD
4、reduceByKey 产生一个RDD ShuffledRDD

如何建立checkPoint

1、首先需要用sparkContext设置hdfs的checkpoint的目录,如果不设置使用checkpoint会抛出异常:

scala> res0.checkpoint
org.apache.spark.SparkException: Checkpoint directory has not been set in the SparkContext
 
scala> sc.setCheckpointDir("hdfs://leen:8020/checkPointDir")

sc.setCheckpointDir("hdfs://leen:8020/checkPointDir")
执行了上面的代码,hdfs里面会创建一个目录:
/checkPointDir/9ae90c62-a7ff-442a-bbf0-e5c8cdd7982d

2、然后执行checkpoint

scala> res0.checkpoint
1

发现hdfs中还是没有数据,说明checkpoint也是个transformation的算子。

scala> res0.count()
INFO ReliableRDDCheckpointData: Done checkpointing RDD 4 to hdfs://leen:8020/checkPointDir/9ae90c62-a7ff-442a-bbf0-e5c8cdd7982d/rdd-4, new parent is RDD 5
res5: Long = 73689
1
2
3
hive > dfs -du -h /checkPointDir/9ae90c62-a7ff-442a-bbf0-e5c8cdd7982d/rdd-4;
147    147    /checkPointDir/9ae90c62-a7ff-442a-bbf0-e5c8cdd7982d/rdd-4/_partitioner
1.2 M  1.2 M  /checkPointDir/9ae90c62-a7ff-442a-bbf0-e5c8cdd7982d/rdd-4/part-00000
1.2 M  1.2 M  /checkPointDir/9ae90c62-a7ff-442a-bbf0-e5c8cdd7982d/rdd-4/part-00001

但是执行的时候相当于走了两次流程,前面计算了一遍,然后checkpoint又会计算一次,所以一般我们先进行cache然后做checkpoint就会只走一次流程,checkpoint的时候就会从刚cache到内存中取数据写入hdfs中,如下:

rdd.cache()
rdd.checkpoint()
rdd.collect

在源码中,在checkpoint的时候强烈建议先进行cache,并且当你checkpoint执行成功了,那么前面所有的RDD依赖都会被销毁,如下:

 /**
   * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
   * directory set with `SparkContext#setCheckpointDir` and all references to its parent
   * RDDs will be removed. This function must be called before any job has been
   * executed on this RDD. It is strongly recommended that this RDD is persisted in
   * memory, otherwise saving it on a file will require recomputation.
   */
 
  def checkpoint(): Unit = RDDCheckpointData.synchronized {
    // NOTE: we use a global lock here due to complexities downstream with ensuring
    // children RDD partitions point to the correct parent partitions. In the future
    // we should revisit this consideration.
    if (context.checkpointDir.isEmpty) {
      throw new SparkException("Checkpoint directory has not been set in the SparkContext")
    } else if (checkpointData.isEmpty) {
      checkpointData = Some(new ReliableRDDCheckpointData(this))
    }
  }

RDD依赖被销毁

scala> res0.toDebugString
res6: String = 
(2) ShuffledRDD[4] at reduceByKey at <console>:28 []
 |  ReliableCheckpointRDD[5] at count at <console>:30 []

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1341452.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Mybatis行为配置之Ⅰ—缓存

专栏精选 引入Mybatis Mybatis的快速入门 Mybatis的增删改查扩展功能说明 mapper映射的参数和结果 Mybatis复杂类型的结果映射 Mybatis基于注解的结果映射 Mybatis枚举类型处理和类型处理器 再谈动态SQL 文章目录 专栏精选摘要引言正文缓存配置项说明cacheEnabledlocal…

SASS循环

<template><div><button class"btn type-1">默认按钮</button><button class"type-2">主要按钮</button><button class"type-3">成功按钮</button><button class"type-4">信息…

VSCode 如何安装插件的历史版本

背景 在日常开发过程中&#xff0c;我们可能会遇到新版VSCode插件存在问题&#xff0c;无法正常工作的情况。这种情况下&#xff0c;一种可行的解决方案就是安装插件的历史版本。VSCode 插件默认安装的都是插件最新的版本&#xff0c;例如下面 vscode-styled-compoents 插件 本…

C语言之进制转换

C语言之进制转换 一、引言二、十进制与二进制、八进制、十六进制三、二进制与八进制、十六进制四、八进制与十六进制 一、引言 在C语言中&#xff0c;经常使用的整数的进制有十进制、二进制、十六进制&#xff08;在C语言中以0x或0X为前缀&#xff09;、八进制&#xff08;在C…

3D游戏角色建模纹理贴图处理

在线工具推荐&#xff1a; 3D数字孪生场景编辑器 - GLTF/GLB材质纹理编辑器 - 3D模型在线转换 - Three.js AI自动纹理开发包 - YOLO 虚幻合成数据生成器 - 三维模型预览图生成器 - 3D模型语义搜索引擎 在本文中&#xff0c;我们将介绍 3D 纹理的基础知识&#xff0c;并讨…

使用Microsoft托管密钥的Azure信息保护云退出

由于各种原因&#xff0c;一些组织需要一个明确定义的流程来停止使用 Azure 信息保护以及对云服务的任何依赖&#xff0c;而不会在采用之前失去对其数据的访问权限 - 以便在出现需要时做好准备。 Azure 信息保护 (AIP) 为使用自带密钥 (BYOK) 的客户和使用 Microsoft 托管密钥…

uniapp:全局消息是推送,实现app在线更新,WebSocket,apk上传

全局消息是推送&#xff0c;实现app在线更新&#xff0c;WebSocket 1.在main.js中定义全局的WebSocket2.java后端建立和发送WebSocket3.通知所有用户更新 背景&#xff1a; 开发人员开发后app后打包成.apk文件&#xff0c;上传后通知厂区在线用户更新app。 那么没在线的怎么办&…

爬虫工作量由小到大的思维转变---<第三十三章 Scrapy Redis 23年8月5日后会遇到的bug)>

前言: 收到回复评论说,按照我之前文章写的: 爬虫工作量由小到大的思维转变---&#xff1c;第三十一章 Scrapy Redis 初启动/conn说明书)&#xff1e;-CSDN博客 在启动scrapy-redis后,往redis丢入url网址的时候遇到: TypeError: ExecutionEngine.crawl() got an unexpected …

ASM GaN: 行业硅基氮化镓射频和功率设备标准模型—第一部分:直流、CV和射频模型

来源&#xff1a;ASM GaN: Industry Standard Model for GaN RF and Power Devices—Part 1: DC, CV, and RF Model (IEEE TRANSACTIONS ON ELECTRON DEVICES) 19年 摘要 本文介绍了GaN&#xff08;氮化镓&#xff09;HEMT&#xff08;高电子迁移率晶体管&#xff09;的先进S…

边缘检测——PidiNet网络训练自己数据集并优化推理测试(详细图文教程)

PiDiNet 是一种用于边缘检测的算法&#xff0c;它提出了一种简单、轻量级但有效的架构。PiDiNet 采用了新 颖的像素差卷积&#xff0c;将传统的边缘检测算子集成到现代 CNN 中流行的卷积运算中&#xff0c;以增强任务性能。 在 BSDS500、NYUD 和 Multicue 上进行了大量的实验…

第四课:早期的编程方式、编程语言发展史、编程基础-语句和函数、算法入门、数据结构、阿兰图灵及软件工程

第四课&#xff1a;早期的编程方式、编程语言发展史、编程基础-语句和函数、算法入门、数据结构、阿兰图灵及软件工程 第十章&#xff1a;早期的编程方式1、早期&#xff0c;程序如何进入计算机2、早期计算机的编程3、现代计算机基础结构——冯诺依曼计算机 第十一章&#xff1…

公司创建百度百科需要哪些内容?

一个公司或是一个品牌想要让自己更有身份&#xff0c;更有知名度&#xff0c;更有含金量&#xff0c;百度百科词条是必不可少的。通过百度百科展示公司的详细信息&#xff0c;有助于增强用户对公司的信任感&#xff0c;提高企业形象。通过百度百科展示公司的发展历程、领导团队…

[BUG] Hadoop-3.3.4集群yarn管理页面子队列不显示任务

1.问题描述 使用yarn调度任务时&#xff0c;在CapacityScheduler页面上单击叶队列&#xff08;或子队列&#xff09;时&#xff0c;不会显示应用程序任务信息&#xff0c;root队列可以显示任务。此外&#xff0c;FairScheduler页面是正常的。 No matching records found2.原…

web自动化(4)——POM设计重构

1. 什么是POM Page Object Model 是ui自动化测试中常见的封装方式。 原理&#xff1a;将页面封装为PO对象&#xff0c;然后通过面向对象的方式实现UI自动化 2. 封装原则 PO无需包含全部UI元素PO应当验证元素PO不应该包含断言PO不应该暴露元素 3. 怎么进行POM封装 面向对象…

<JavaEE> TCP 的通信机制(五) -- 延时应答、捎带应答、面向字节流

目录 TCP的通信机制的核心特性 七、延时应答 1&#xff09;什么是延时应答&#xff1f; 2&#xff09;延时应答的作用 八、捎带应答 1&#xff09;什么是捎带应答&#xff1f; 2&#xff09;捎带应答的作用 九、面向字节流 1&#xff09;沾包问题 2&#xff09;“沾包…

用ChatGPT挑选钻石!著名珠宝商推出-珠宝GPT

根据Salesforce最新发布的第五版《互联网购物报告》显示&#xff0c;ChatGPT等生成式AI的出现、快速发展&#xff0c;对零售行业和购物者产生了较大影响。可有效简化业务流程实现降本增效&#xff0c;并改善购物体验。 著名珠宝商James Allen为了积极拥抱生成式AI全面提升销售…

Redis 是如何执行的?

文章目录 命令执行流程步骤一&#xff1a;用户输入一条命令步骤二&#xff1a;客户端先将命令转换成 Redis 协议&#xff0c;然后再通过 socket 连接发送给服务器端步骤三&#xff1a;服务器端接收到命令步骤四&#xff1a;执行前准备步骤五&#xff1a;执行最终命令&#xff0…

HTML5+CSS3+JS小实例:网站实现一键切换暗色主题

实例:网站实现一键切换暗色主题 技术栈:HTML+CSS+JS 效果: 源码: 【HTML】 <!DOCTYPE html> <html lang="zh-CN"> <head><meta charset="UTF-8"><meta http-equiv="X-UA-Compatible" content="IE=edge&…

V-rep(CoppeliaSim)添加相机,与python联合仿真,并使用python读取V-rep中的RGB图与深度图

目录 前言在V-rep中构建场景建立python与V-rep通信 前言 本文主要介绍了如何使用python与V-rep联合仿真&#xff0c;并用OpenCV可视化V-rep中视觉传感器所能看到的 RGB图和深度图&#xff0c;效果图如下。 在V-rep中构建场景 本文使用的V-rep版本是3.5&#xff1a; 打开V-…

深度学习在语义分割中的进展与应用

埃弗顿戈梅德&#xff08;Everton Gomede&#xff09; 一、说明 语义分割是计算机视觉领域的一项关键任务&#xff0c;涉及将图像中的每个像素分类为预定义的类。这项任务在从自动驾驶汽车到医学成像的各种应用中都具有深远的影响。深度学习的出现显著提高了语义分割模型的功能…