33-98-spark-核心编程-RDD算子和任务阶段等

news2025/1/12 3:45:44

33-spark-核心编程-RDD:

1、RDD的创建,4中方式。分别是从内存中创建,从文件中创建,从其他RDD创建和new RDD,后两者不常用。

创建:big-data-study\Spark-demo\src\main\java\spark\core\com\zh\rdd\builder

2、RDD并行度和分区big-data-study\Spark-demo\src\main\java\spark\core\com\zh\rdd\builder\Spark01_RDD_Memor_Par

3、内存读取数据时,数据的分区Spark01_RDD_Memor_Par_v1

4、文件读取数据时,数据如何分配到对应分区Spark02_RDD_File_Par

5、RDD方法(RDD算子,分为转换算子和行动算子)

RDD方法:

​ 1.转换:功能的补充和封装,将旧的RDD包装成新的RDD,flatMap,map一层一层的。

​ 2.行动:触发任务的调度和作业的执行,collect

RDD方法=》RDD算子

​ 认知心理学认为解决问题其实将问题的状态进行改变:
​ 问题(初始) => 操作(算子) =>问题(审核中) => 操作(算子) => 问题(完成)

RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value类型

Value类型

1.map:Spark01_RDD_Operator_transform

函数说明:将处理的数据逐条进行映射转换,转换可以是类型的转换,也可以是值的转换。

2.mapPartitions:Spark02_RDD_Operator_transform

函数说明:将待处理的数据以分区为单位发送到计算节点进行处理

3.mapPartitionsWithIndex:Spark03_RDD_Operator_transform

函数说明:相对于mapPartitions而言,可以获取到分区的索引

4.flatMap:Spark04_RDD_Operator_transform_flatmap

函数说明:将处理的数据进行扁平化后再进行映射处理

5.glom:Spark04_RDD_Operator_transform_glom

函数说明:将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

6.groupBy:Spark05_RDD_Operator_transform_groupby

函数说明:将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,将该操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中。一个组的数据在一个分区中,但是并不是说一个分区中只有一个组7

7.filter:Spark06_RDD_Operator_transform_filter

算法说明:将数据根据指定的规则进行筛选过滤。数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

8.sample(了解,针对数据倾斜可查找原因等):Spark07_RDD_Operator_transform_sample

算法说明:根据指定的规则从数据集中抽取数据

9.distinct Spark08_RDD_Operator_transform_distinct

算法说明:将数据集中重复的数据去重

10.coalesce Spark09_RDD_Operator_transform_coalesce

算法说明:根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率,当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本

11.repartition Spark10_RDD_Operator_transform_repartition

算法说明:该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition操作都可以完成,因为无论如何都会经 shuffle 过程。

  1. sortBy Spark11_RDD_Operator_transform_sortby

算法说明:该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程

双Value类型

1.intersection(交集),union(并集),subtract(差集),zip(拉链) Spark12_RDD_Operator_transform_doubleValueType

Key - Value 类型

1.partitionBy Spark13_RDD_Operator_transform_keyValue_partitionBy

算法说明:将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner

2.reduceByKey Spark14_RDD_Operator_transform_keyValue_reduceByKey

算法说明:可以将数据按照相同的 Key 对 Value 进行聚合

3.groupByKey Spark15_RDD_Operator_transform_keyValue_groupByKey

算法说明:将数据源的数据根据 key 对 value 进行分组

reduceByKey 和 groupByKey 的区别?

shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。

从功能的角度:reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey

groupbykey:先分组,再聚合

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wqI1u0kl-1670771755999)(png/image-20210929233653636.png)]

reduceByKey:在落盘之前先对相同的key进行聚合,落盘数据减少,对读取数据也减少,提升性能。结果一样。combine预聚合

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-quobbNLW-1670771756000)(png/image-20210929234454175.png)]

reduceByKey分区内和分区间计算规则是相同的。

4.aggregateByKey图解,Spark16_RDD_Operator_transform_keyValue_aggregateByKey

算法说明:将数据根据不同的规则进行分区内计算和分区间计算

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4iDNHKw2-1670771756001)(png/image-20211007120428128.png)]

5.foldByKey Spark16_RDD_Operator_transform_keyValue_aggregateByKey

算法说明:当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey

6.combineByKey Spark17_RDD_Operator_transform_keyValue_combineByKey

算法说明:最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

7.key-value reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别 Spark18_RDD_Operator_transform_keyValue_ByKeyDifferent

8.sortByKey Spark19_RDD_Operator_transform_keyValue_sortByKey

算法说明:在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序的

9.join,leftOuterJoin,rightOuterJoin Spark20_RDD_Operator_transform_keyValue_join

算法说明:连接,类似数据库的join,leftjoin,rightjoin

10.cogroup Spark21_RDD_Operator_transform_keyValue_cogroup

算法说明:在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD,分组连接

测试:1) 数据准备,agent.log:时间戳,省份,城市,用户,广告,中间字段使用空格分隔。

  1. 需求描述,统计出每一个省份每个广告被点击数量排行的 Top3

  2. 需求分析

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-L0CVtWQ1-1670771756001)(png/image-20211008224951463.png)]

  3. 功能实现 Spark22_RDD_Req

行动算子 :触发作业的执行

1、collect Spark01_RDD_Operaor_Action_collect

算法说明:在驱动程序中,以数组 Array 的形式返回数据集的所有元素

2、reduce,count,first,take,takeOrdered Spark02_RDD_Operaor_Action_Reduce

算法说明:reduce聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据

count:返回 RDD 中元素的个数 first:返回 RDD 中的第一个元素 take:返回一个由 RDD 的前 n 个元素组成的数组

takeOrdered:返回该 RDD 排序后的前 n 个元素组成的数组

3、aggregate,fold Spark03_RDD_Operaor_Action_aggregate

算法说明:分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合,fold是aggregate 的简化版操作

4、countByKey Spark04_RDD_Operaor_Action_countByKey

算法说明:统计每种 key 的个数

5、save Spark05_RDD_Operaor_Action_save

算法说明:将数据保存到不同格式的文件中

6、foreach Spark06_RDD_Operaor_Action_forearch

算法说明:分布式遍历 RDD 中的每一个元素,调用指定函数,rdd.collect.foreach(print) 和 rdd.foreach(print)的区别

先采集后打印rdd.collect.foreach(print)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3arZlvTD-1670771756002)(png/image-20211015211503377.png)]

rdd.foreach(print)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VJvHAUVj-1670771756003)(png/image-20211015211621864.png)]

RDD 序列化

  1. 闭包检查

从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor端执行。那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。Scala2.12 版本后闭包编译方式发生了改变

  1. 序列化方法和属性

从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor端执行,看如下代码:

operator\serializable\Spark01_RDD_Serial.scala

  1. Kryo 序列化框架

Java 的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。Spark 出于性能的考虑,Spark2.0 开始支持另外一种 Kryo 序列化机制。Kryo 速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型

已经在 Spark 内部使用 Kryo 来序列化。注意:即使使用 Kryo 序列化,也要继承 Serializable 接口。 Spark02_RDD_Kyro

RDD 血缘-依赖关系 Spark02_RDD_Dep

1) RDD 血缘关系

RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage(血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mC2y4mFQ-1670771756003)(png/image-20211016112019373.png)]

相邻的两个RDD的关系称之为依赖关系
val rdd1 = rdd.map(_ * 2)
新的RDD依赖于旧的RDD
多个连续的RDD的依赖关系,称之为血缘关系.
每个RDD都会保存血缘关系

RDD 依赖关系

这里所谓的依赖关系,其实就是两个相邻 RDD 之间的关系

RDD不会保存数据的
RDD为了提供容错性,需要将RDD间的关系保存下来
一旦出现错误,可以根据血缘关系将数据源重新读取进行计算

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3bvdAsGK-1670771756004)(png/image-20211016112308503.png)]

RDD宽依赖-窄依赖

窄依赖:窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用,窄依赖我们形象的比喻为独生子女。分区的数据对应,没有进行shuffle打乱。即org.apache.spark.OneToOneDependency

宽依赖:宽依赖表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会引起 Shuffle,总结:宽依赖我们形象的比喻为多生。例如入门的数据单双数字的分区,一对多

RDD阶段划分

DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG 记录了 RDD 的转换过程和任务的阶段。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5uRHAAvZ-1670771756005)(png/image-20211016122438689.png)]

窄依赖:新旧之间,分区一对一,不需要等待另一个分区的数据完成以后才能进行新的操作。每个分区对应一个task,每个task先执行旧的在执行新的。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wHjhc3Yf-1670771756006)(png/image-20211016122531942.png)]

宽依赖:存在阶段性,需要通过shuffle打乱重排,则需要另一个分区的数据任务执行完毕后,即阶段一统一完成以后才能执行阶段二。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mVyJHiJo-1670771756007)(png/image-20211016122733191.png)]

RDD 阶段划分源码

org\apache\spark\scheduler\DAGScheduler.scala

try {
 // New stage creation may throw an exception if, for example, jobs are run on 
a
 // HadoopRDD whose underlying HDFS files have been deleted.
 finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
 case e: Exception =>
 logWarning("Creating new stage failed due to exception - job: " + jobId, e)
 listener.jobFailed(e)
 return
}
……
private def createResultStage(
 rdd: RDD[_],
 func: (TaskContext, Iterator[_]) => _,
 partitions: Array[Int],
 jobId: Int,
 callSite: CallSite): ResultStage = {
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] 
= {
getShuffleDependencies(rdd).map { shuffleDep =>
 getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
……
private[scheduler] def getShuffleDependencies(
 rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
val parents = new HashSet[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new Stack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
 val toVisit = waitingForVisit.pop()
 if (!visited(toVisit)) {
 visited += toVisit
 toVisit.dependencies.foreach {
 case shuffleDep: ShuffleDependency[_, _, _] =>
 parents += shuffleDep
 case dependency =>
 waitingForVisit.push(dependency.rdd)
 }
 } }
parents
}

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VlmaHyqa-1670771756007)(png/image-20211016133352906.png)]

当RDD中存在shuffle依赖时,阶段会自动增加一一个
阶段的数量= shuffle依赖的数量+ 1
ResultStage只有一个,最后需要执行的阶段

RDD任务划分源码

RDD 任务划分

RDD 任务切分中间分为:Application、Job、Stage 和 Task

⚫ Application:初始化一个 SparkContext 即生成一个 Application;

⚫ Job:一个 Action 算子就会生成一个 Job;

⚫ Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;

⚫ Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数。

ShuffleMapStage => ShuffleMap Task
ResultStage => ResultTask

注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。 结合代码流程理解

任务数量=当前阶段中最后一个RDD的分区数量

val tasks: Seq[Task[_]] = try {
 stage match {
 case stage: ShuffleMapStage =>
 partitionsToCompute.map { id =>
 val locs = taskIdToLocations(id)
 val part = stage.rdd.partitions(id)
 new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
 taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, 
Option(jobId),
 Option(sc.applicationId), sc.applicationAttemptId)
 }
 case stage: ResultStage =>
 partitionsToCompute.map { id =>
 val p: Int = stage.partitions(id)
 val part = stage.rdd.partitions(p)
 val locs = taskIdToLocations(id)
 new ResultTask(stage.id, stage.latestInfo.attemptId,
 taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
 Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
 }
 }
……
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
……
override def findMissingPartitions(): Seq[Int] = {
mapOutputTrackerMaster
 .findMissingPartitions(shuffleDep.shuffleId)
 .getOrElse(0 until numPartitions) }

学习路径:https://space.bilibili.com/302417610/,如有侵权,请联系q进行删除:3623472230

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/86156.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PTA-基础编程题目集(函数题)

个人主页:平行线也会相交 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 平行线也会相交 原创 收录于专栏【[PTA刷题训练营]】 目录6-1 简单输出整数6-2 多项式求值(重点掌握)6-3 简单求和6-4 求自定类型元素的平均6-5 求…

医院陪诊小程序怎么开发-医院陪诊小程序源码功能

目前医院陪诊新型行业已经占据了很大的市场所在,我们去医院看病找医生挂号帮忙取药有时候去外地人生地不熟的 自己转半天摸索不过来浪费时间 而且有时候一个人需要陪同比较放心,所以呢衍生出来了 热门的陪诊师 如何开发? 开发无非就是几种 …

如何为 Longhorn 扩展对象存储能力

作者: 王海龙,Rancher 中国社区技术经理,Linux Foundation APAC Evangelist,负责 Rancher 中国技术社区的维护和运营。拥有 8 年的云计算领域经验,经历了 OpenStack 到 Kubernetes 的技术变革,无论底层操作…

软件包管理器RPM与yum

1、RPM安装软件包 安装单个rpm软件包 下载JDK8u221的rpm软件包 链接:https://pan.baidu.com/s/1fYKNNM02GBh-cOUuajkBIg 提取码:yg53 上传JDK8u221的rpm软件包到虚拟机/opt目录 命令:rpm -ivh jdk-8u221-linux-x64.rpm 查看JDK版本 命令&a…

C语言split分割字符串

C语言split分割字符串。 //以下解法的前提是,先把所有环变成1.无环路,2.一个环没有扣住3个及以上的其他环 voidmain(){ intarray[16]{0}; //init,array[1]xxx;根据输入初始化数组,如1-2,则,array[1]2,... intHash…

在线人事管理系统

开发工具(eclipse/idea/vscode等):idea 数据库(sqlite/mysql/sqlserver等):mysql 功能模块(请用文字描述,至少200字):本系统按功能分为以下几个模块: “简易云”是这个系统的名字 (1)登录页面:实…

【HMS Core】华为统一扫码服务ScanKit如何获取具体条码的类型?

1、问题描述 项目中接入了华为的统一扫码服务SDK,识别过程正常,但是目前有个需求,需要在扫码完成之后根据条码的具体类型处理接下来的业务。 问题是:识别完条形码后,如何拿到具体的条形码和二维码类型,比…

从零搭建本地pypi镜像源1:快速体验

前言: 许多公司,出于数据安全与知识产权的原因,在公司内部搭建局域网进行算法开发。配置一个本地的pypi镜像源对工程开发十分重要。搭建本地pypi镜像源的工具有多种,本文主要介绍pip2pi方法。 第一步:新建项目&#…

间接采购品类多,机械制造企业如何破局制胜优化间采管理?

受贸易政策和能源结构转型等宏观因素的叠加影响,当前机械制造业的市场环境正在迅速变化。过去几十年来,全球经济的有利形势迅速逆转,复杂的国际形势也影响了区域乃至全球贸易平衡。在国内,疫情频发、产业升级、能源转型、“双碳”…

10个提高生产力的 Linux 命令与技巧,用完直接起飞

文章目录一、前言二、使用tab键进行补全2.1 使用Tab键补全命令2.2 使用Tab键补全路径2.3 使用Tab键补全参数三、切换回上一个工作目录四、返回用户主目录五、搜索您使用过的命令六、移至行首或行尾七、快速删除八、使用 less读取文件九、格式化输出结语一、前言 在本文中&…

多数据源解决分布式事务

环境:ideaspringboot2.x 场景:调用addUser方法执行对两个数据库的表操作,如果方法出现异常就回滚 user数据库中的users表 order数据库中的order_number表 将各自的事务管理器改为统一事务管理器即可 第一步pom文件配置jta atomikos 依赖 &l…

Acwing-872. 最大公约数

d | a, a | b > d | ax by (a, b) (b, a mod b) 证明:a mod b a - [a / b] * b a - c * b 注:[ ] 为下取整符号,[a / b] 记为c 所以,(a, b) (b, a - c * b) &#xf…

FFmpeg基础到工程-多路H265监控录放开发学习笔记

多路H265监控录放开发学习笔记 课程涉及:FFmpeg,WebRTC,SRS,Nginx,Darwin,Live555,等。包括:音视频、流媒体、直播、Android、视频监控28181、等。 具体内容包括: 一、视频监控的架构和流程 二、FFmpeg4.3SDL2Qt5开发环境的搭建 三、FFmpeg的…

Chomsky文法

一、实验原理 了解0123型文法的定义并会判断各个文法,会编写并利用程序进行0123型文法的判断 二、实验目的 由于不同文法的判断归根结底是对产生式中不同终结符和非终结符个数的判断,所以在程序中先放置三个字符串集合用以存储终结符、非终结符、产生…

git clone info/refs not valid: is this a git repository问题解决

项目场景: 在我们使用gitlab克隆代码时候,发现无法克隆,遇到如下问题 $ git clone http://192.168.2.x/product/demo.git Cloning into zhlx-web-bpmn... fatal: http://192.168.2.x/product/demo.git/info/refs not valid: is this a gi…

几张图片生成3D模型?距离真正的AI建模还有多远?

时间溯回,早在2017年,美图秀秀就曾引入人工智能美化人像而被谷歌誉为“最佳娱乐App”。智能技术奔腾发展,今年的AIGC技术可谓在各行各业大放异彩,从AI绘画、AI写作到AI配音,人工智能技术自动生成内容已经成为继UGC、PG…

Go C编程 第1课 神奇的魔笔

慧通教育 慧通教育 1.画长方形(GoC测试题样例) 难度:1 登录 26.画7字(魔法学院第3课) 难度:1 登录 27.画2字(魔法学院第3课) 难度:1 登录 28.画十字(魔法学院第3课) 难度:1 登录 29.画旗帜(魔法学院第…

linux系统使用rsync做主备服务器文件同步

根据本文档设置,可以实现备机自动同步主机中的文件。 (注意,此方式缺陷为:如果主机文件修改,但是文件大小无变化或者文件变小时,无法自动同步到备机中,只有主机中文件修改后变大或者名称修改才能…

DP学生最喜欢/讨厌选学的IB课程是什么?

我们看看IBDP在读生们对于IBDP各学科的主观看法供正在选课的准IB学生们参考!(以下以第一人称方式,信息汇总于IB论坛,仅汇总部分科目,主观性强,仅供参考)DP学生最喜欢的IB课程 ● 数学 AA HL 被数…

Java基于springboot+vue+elementUI企业制度管理系统

本企业制度管理系统是针对目前企业制度管理的实际需求,从实际工作出发,对过去的企业制度管理系统存在的问题进行分析,完善用户的使用体会。采用计算机系统来管理信息,取代人工管理模式,查询便利,信息准确率…