Spark7-9

news2025/1/8 4:34:00

7. Spark中的一些重要概念

7.1 Application

使用SparkSubmit提交的个计算应用,一个Application中可以触发多次Action,触发一次Action产生一个Job,一个Application中可以有一到多个Job

7.2 Job

Driver向Executor提交的作业,触发一次Acition形成一个完整的DAG,一个DAG对应一个Job,一个Job中有一到多个Stage,一个Stage中有一到多个Task

7.3 DAG

概念:有向无环图,是对多个RDD转换过程和依赖关系的描述,触发Action就会形成一个完整的DAG,一个DAG对应一个Job

7.4 Stage

概念:任务执行阶段,Stage执行是有先后顺序的,先执行前的,在执行后面的,一个Stage对应一个TaskSet,一个TaskSet中的Task的数量取决于Stage中最后一个RDD分区的数量

7.5 Task

概念:Spark中任务最小的执行单元,Task分类两种,即ShuffleMapTask和ResultTask

Task其实就是类的实例,有属性(从哪里读取数据),有方法(如何计算),Task的数量决定并行度,同时也要考虑可用的cores  

7.6 TaskSet

保存同一种计算逻辑多个Task的集合,一个TaskSet中的Task计算逻辑都一样,计算的数据不一样

8. 任务执行原理分析

8.1 WordCount程序有多个RDD

该Job中,有多少个RDD,多少个Stage,多少个TaskSet,多个Task,Task的类型有哪些?

Scala
 object WordCount {

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    val sc: SparkContext = new SparkContext(conf)
    
    sc.textFile(args(0))
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_+_)
      .saveAsTextFile(args(1))
    
    sc.stop()
  }
}

8.2 RDD的数量分析

  • 读取hdfs中的目录有两个输入切片,最原始的HadoopRDD的分区为2,以后没有改变RDD的分区数量,RDD的分区数量都是2
  • 在调用reduceByKey方法时,有shuffle产生,要划分Stage,所以有两个Stage
  • 第一个Stage的并行度为2,所以有2个Task,并且为ShuffleMapTask。第二个Stage的并行度也为2,所以也有2个Task,并且为ResultTask,所以一共有4个Task

spark的任务上次的逻辑计划图

 

下面的的物理执行计划图,会生成Task,生成的Task如下

 

8.3 Stage和Task的类型

Stage有两种类型,分别是ShuffleMapStage和ResultStage,ShuffleMapStage生成的Task叫做ShuffleMapTask,ResultStage生成的Task叫做ResultTask

  • ShuffleMapTask

1.可以读取各种数据源的数据

2.可以读取Shuffle的中间结果(Shuffle Read)

3.为shuffle做准备,即应用分区器,将数据溢写磁盘(ShuffleWrite),后面一定还会有其他的Stage

  • ResultTask

     1.可以读取各种数据源的数据

     2.可以读取Shuffle的中间结果(Shuffle Read)

     3.是整个job中最后一个阶段对应的Task,一定会产生结果数据(就是将产生的结果返回的Driver或写入到外部的存储系统)

多种情况:

 第一种:

 

 第二种

 

 第三种:

 

9. Shuffle的深入理解

什么是Shuffle,本意为洗牌,在数据处理领域里面,意为将数打散。

问题:shuffle一定有网络传输吗?有网络传输的一定是Shuffle吗?

9.1 Shuffle的概念

通过网络将数据传输到多台机器,数据被打散,但是有网络传输,不一定就有shuffle,Shuffle的功能是将具有相同规律的数据按照指定的分区器的分区规则,通过网络,传输到指定的机器的一个分区中,需要注意的是,不是上游的Task发送给下游的Task,而是下游的Task到上游拉取数据

 

9.2 reduceByKey一定会Shuffle吗

不一定,如果一个RDD事先使用了HashPartitioner分区先进行分区,然后再调用reduceByKey方法,使用的也是HashPartitioner,并且没有改变分区数量,调用redcueByKey就不shuffle

如果自定义分区器,多次使用自定义的分区器,并且没有改变分区的数量,为了减少shuffle的次数,提高计算效率,需要重新自定义分区器的equals方法

例如:

Scala
//创建RDD,并没有立即读取数据,而是触发Action才会读取数据
val lines = sc.textFile("hdfs://node-1.51doit.cn:9000/words")

val wordAndOne = lines.flatMap(_.split(" ")).map((_, 1))
//先使用HashPartitioner进行partitionBy
val partitioner = new HashPartitioner(wordAndOne.partitions.length)
val partitioned = wordAndOne.partitionBy(partitioner)
//然后再调用reduceByKey
val reduced: RDD[(String, Int)] = partitioned.reduceByKey(_ + _)

reduced.saveAsTextFile("hdfs://node-1.51doit.cn:9000/out-36-82")

 

9.3 join一定会Shuffle吗

不一定,join一般情况会shuffle,但是如果两个要join的rdd实现都使用相同的分区去进行分区了,并且join时,依然使用相同类型的分区器,并且没有改变分区数据,那么不shuffle

Scala
//通过并行化的方式创建一个RDD
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)), 2)
//通过并行化的方式再创建一个RDD
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
//该join一定有shuffle,并且是3个Stage
val rdd3: RDD[(String, (Int, Int))] = rdd1.join(rdd2)


val rdd11 = rdd1.groupByKey()
val rdd22 = rdd2.groupByKey()
//下面的join,没有shuffle
val rdd33 = rdd11.join(rdd22)

rdd33.saveAsTextFile("hdfs://node-1.51doit.cn:9000/out-36-86")

 

分析一下下面的图片,有几次shuffle,有几个Stage

 

上面的分支没有shuffle,因为实现已经使用groupBy进行分区了(使用了HashPartitioner,分区数量为3),在join是,使用的分区器也是HashPartitioner,分区数量为3,所有不shuffle

下面的分支没有实现进行分区(即使使用了HashPartitioner进行分区,但是jion后的分区数量发生了变化),所有要shuffle

9.4 shuffle数据的复用

spark在shuffle时,会应用分区器,当读取达到一定大小或整个分区的数据被处理完,会将数据溢写磁盘磁盘(数据文件和索引文件),溢写持磁盘的数据,会保存在Executor所在机器的本地磁盘(默认是保存在/temp目录,也可以配置到其他目录),只要application一直运行,shuffle的中间结果数据就会被保存。如果以后再次触发Action,使用到了以前shuffle的中间结果,那么就不会从源头重新计算而是,而是复用shuffle中间结果,所有说,shuffle是一种特殊的persist,以后再次触发Action,就会跳过前面的Stage,直接读取shuffle的数据,这样可以提高程序的执行效率。

正常情况:

 

再次触发Action

 

如果由于机器宕机或磁盘问题,部分shuffle的中间数据丢失,以后再次触发Action,使用到了shuffle中间结果数据,但是部数据无法访问,spark会根据RDD的依赖关系(RDD的血统)重新生成对应分区的Task,重新计算丢失的数据!

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/680633.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

没想到,老刘是逃离北上广的那波人

我今天跟老刘调试的时候,我问了老刘一个问题——我问你工作这么久了,有没有遇到什么可以让你财富自由的机会。 老刘那个时候正在焊板子,背着我,他抬起头又低了下去,然后说「我是有一次机会了,但是没有抓住&…

MySQL-SQL存储函数以及触发器详解

♥️作者:小刘在C站 ♥️个人主页: 小刘主页 ♥️努力不一定有回报,但一定会有收获加油!一起努力,共赴美好人生! ♥️学习两年总结出的运维经验,以及思科模拟器全套网络实验教程。专栏&#xf…

PyTorch翻译官网教程3-DATASETS DATALOADERS

官网链接 Datasets & DataLoaders — PyTorch Tutorials 2.0.1cu117 documentation 数据集和数据加载器 处理样本数据的代码可能会变得混乱并且难以维护。理想情况下,我们希望我们的数据集代码与模型训练代码解耦,以获得更好的可读性和模块化。PyT…

轻松了解工作与学习必备的版本控制+Git,全程舒适~

目录 一、版本控制 二、版本控制器 三、Git 四、项目实操 第一步 在github上创建一个新的远程仓库 第二步 克隆到本地文件夹 第三步 IDEA(PyCharm为例)集成Git 一、版本控制 概念:版本控制是指对软件开发过程中各种程序代码、配置文件…

【spring cloud学习】4、创建服务提供者

注册中心Eureka Server创建并启动之后,接下来介绍如何创建一个Provider并且注册到Eureka Server中,再提供一个REST接口给其他服务调用。 首先一个Provider至少需要两个组件包依赖:Spring Boot Web服务组件和Eureka Client组件。如下所示&…

ADRC自抗扰控制(CODESYS平台完整源代码)

博途PLC ADRC完整源代码请参考下面文章链接: 博途PLC ADRC自抗扰控制完整SCL源代码_adrc控制算法代码_RXXW_Dor的博客-CSDN博客关于自抗扰控制框图可以参看专栏的其它文章,这里不再讲解具体算法过程,详细了解也可以参看韩京清研究员写的 《ADRC自抗扰》一书。_adrc控制算法…

基于混合策略的改进哈里斯鹰优化算法-附代码

基于混合策略的改进哈里斯鹰优化算法 文章目录 基于混合策略的改进哈里斯鹰优化算法1.哈里斯鹰优化算法2.改进哈里斯鹰优化算法2.1 Sobol 序列初始化种群2.2 limit 阈值执行全局搜索阶段2.4 动态反向学习 3.实验结果4.参考文献5.Matlab代码6.python代码 摘要:针对原…

ElasticSearch-Kibana的安装

Kibana的安装 什么是ELK? ELK是Elasticsearch,Logstash,Kibana三大开源框架首字母大写简称,ELK属于大数据,是拆箱即用的,上手比较快 什么是Kibana? Kibana是一个针对ES的开源分析以及可视化平台,用来搜索,查看交互存储在ES索引中的数据,使用Kibana可以通过各类图标进行高级…

Flink(1)-概述

1.1 Apache Flink是什么? 在当前数据量激增的时代,各种业务场景都有大量的业务数据产生,对于这些不断产生的数据应该如何进行有效的处理,成为当下大多数公司所面临的问题。目前比较流行的大数据处理引擎Apache Spark,…

SpringBoot第14讲:SpringBoot 如何统一异常处理

SpringBoot第14讲:SpringBoot 如何统一异常处理 本文是SpringBoot第14讲,SpringBoot接口如何对异常进行统一封装,并统一返回呢?以上文的参数校验为例,如何优雅的将参数校验的错误信息统一处理并封装返回呢 文章目录 Sp…

诊断测试工具CANoe.DiVa从入门到精通系列——开门见山

我是穿拖鞋的汉子,魔都中坚持长期主义的工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 人们会在生活中不断攻击你。他们的主要武器是向你灌输对自己的怀疑:你的价值、你的能力、你的潜力。他们往往会将此伪装成客观意见,但无一例外的是,他们想…

网络安全就业前景如何?是否还能入行?

网络安全专业是2015年新设立的专业,作为新兴专业吸引了很多人准备入行,那么它的就业前景怎么样?大致可以分为3个版块来介绍。 1.就业领域前景广阔 目前互联网、通信、新能源、房地产、金融证券、电子技术等行业迫切需要网络安全人才&#x…

22. 算法之图的最短路径

前言 关于图的最短路径问题,是图这种数据结构中的经典问题。也是与我们的生活息息相关的,比如上海四通八达的地铁线路,从一个地铁站,到另一个地铁站,可能有很多种不同的路线。那么,我们选哪种路线&#xf…

JavaFX第五篇 Image图片加载处理

JavaFX第五篇 Image图片加载处理 1. 代码2. 讲解3. 代码仓 图片已经成为每个网站的必备了,不仅可以提升个人网站的标识度而且还可以美化网站, 所以这里需要讲解一下如何加载图片,展示到前台给用户查看。 本次只是简单的讲解如何展示使用&…

【算法证明 七】深入理解深度优先搜索

深度优先搜索包含一个递归,对其进行分析要复杂一些。与上一篇文章一样,还是给节点定义几个状态,然后详细分析深度优先搜索算法有哪些性质。 算法描述 定义状态 v . c o l o r :初始状态为白色,被发现时改为灰色&…

Mysql的SQL性能分析【借助EXPLAIN分析】

性能分析 要说sql有问题,需要拿出证据,因此需要性能分析 Mysql查询优化器(Mysql Query Optimizer) Mysql中有专门负责优化SELECT语句的优化器模块,主要功能:通过计算分析系统中收集到的统计信息&#xf…

Xline v0.4.1: 一个用于元数据管理的分布式KV存储

Xline是什么?我们为什么要做Xline? Xline是一个基于Curp协议的,用于管理元数据的分布式KV存储。现有的分布式KV存储大多采用Raft共识协议,需要两次RTT才能完成一次请求。当部署在单个数据中心时,节点之间的延迟较低&a…

python机器学习——分类模型评估 分类算法(k近邻,朴素贝叶斯,决策树,随机森林,逻辑回归,svm)

目录 分类模型的评估模型优化与选择1.交叉验证2.网格搜索 【分类】K近邻算法【分类】朴素贝叶斯——文本分类实例:新闻数据分类 【分类】决策树和随机森林1.决策树2.决策树的算法3.代码实现实例:泰坦尼克号预测生死 【集成学习】随机森林1.集成学习2.随机…

LOMO:在受限资源上全参数微调

LOMO:Full Parameter Fine-Tuning for large language models with limited resources IntroductionMethodRethink the functionality of optimizerUsing SGD LOMO: LOw-Memory Optimization 实验参考 Introduction 在这篇文章中,作者的目的…

Go 语言进阶 - 工程进阶

前言: \textcolor{Green}{前言:} 前言: 💞这个专栏就专门来记录一下寒假参加的第五期字节跳动训练营 💞从这个专栏里面可以迅速获得Go的知识 今天的内容包括以下两个内容。关于实践的内容我会在后续发布出来。 01.语言…