33-spark-核心编程-RDD:
1、RDD的创建,4中方式。分别是从内存中创建,从文件中创建,从其他RDD创建和new RDD,后两者不常用。
创建:big-data-study\Spark-demo\src\main\java\spark\core\com\zh\rdd\builder
2、RDD并行度和分区big-data-study\Spark-demo\src\main\java\spark\core\com\zh\rdd\builder\Spark01_RDD_Memor_Par
3、内存读取数据时,数据的分区Spark01_RDD_Memor_Par_v1
4、文件读取数据时,数据如何分配到对应分区Spark02_RDD_File_Par
5、RDD方法(RDD算子,分为转换算子和行动算子)
RDD方法:
1.转换:功能的补充和封装,将旧的RDD包装成新的RDD,flatMap,map一层一层的。
2.行动:触发任务的调度和作业的执行,collect
RDD方法=》RDD算子
认知心理学认为解决问题其实将问题的状态进行改变:
问题(初始) => 操作(算子) =>问题(审核中) => 操作(算子) => 问题(完成)
RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value类型
Value类型
1.map:Spark01_RDD_Operator_transform
函数说明:将处理的数据逐条进行映射转换,转换可以是类型的转换,也可以是值的转换。
2.mapPartitions:Spark02_RDD_Operator_transform
函数说明:将待处理的数据以分区为单位发送到计算节点进行处理
3.mapPartitionsWithIndex:Spark03_RDD_Operator_transform
函数说明:相对于mapPartitions而言,可以获取到分区的索引
4.flatMap:Spark04_RDD_Operator_transform_flatmap
函数说明:将处理的数据进行扁平化后再进行映射处理
5.glom:Spark04_RDD_Operator_transform_glom
函数说明:将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
6.groupBy:Spark05_RDD_Operator_transform_groupby
函数说明:将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,将该操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中。一个组的数据在一个分区中,但是并不是说一个分区中只有一个组7
7.filter:Spark06_RDD_Operator_transform_filter
算法说明:将数据根据指定的规则进行筛选过滤。数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。
8.sample(了解,针对数据倾斜可查找原因等):Spark07_RDD_Operator_transform_sample
算法说明:根据指定的规则从数据集中抽取数据
9.distinct Spark08_RDD_Operator_transform_distinct
算法说明:将数据集中重复的数据去重
10.coalesce Spark09_RDD_Operator_transform_coalesce
算法说明:根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率,当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本
11.repartition Spark10_RDD_Operator_transform_repartition
算法说明:该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition操作都可以完成,因为无论如何都会经 shuffle 过程。
- sortBy Spark11_RDD_Operator_transform_sortby
算法说明:该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程
双Value类型
1.intersection(交集),union(并集),subtract(差集),zip(拉链) Spark12_RDD_Operator_transform_doubleValueType
Key - Value 类型
1.partitionBy Spark13_RDD_Operator_transform_keyValue_partitionBy
算法说明:将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner
2.reduceByKey Spark14_RDD_Operator_transform_keyValue_reduceByKey
算法说明:可以将数据按照相同的 Key 对 Value 进行聚合
3.groupByKey Spark15_RDD_Operator_transform_keyValue_groupByKey
算法说明:将数据源的数据根据 key 对 value 进行分组
reduceByKey 和 groupByKey 的区别?
从 shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。
从功能的角度:reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey
groupbykey:先分组,再聚合
reduceByKey:在落盘之前先对相同的key进行聚合,落盘数据减少,对读取数据也减少,提升性能。结果一样。combine预聚合
reduceByKey分区内和分区间计算规则是相同的。
4.aggregateByKey图解,Spark16_RDD_Operator_transform_keyValue_aggregateByKey
算法说明:将数据根据不同的规则进行分区内计算和分区间计算
5.foldByKey Spark16_RDD_Operator_transform_keyValue_aggregateByKey
算法说明:当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey
6.combineByKey Spark17_RDD_Operator_transform_keyValue_combineByKey
算法说明:最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。
7.key-value reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别 Spark18_RDD_Operator_transform_keyValue_ByKeyDifferent
8.sortByKey Spark19_RDD_Operator_transform_keyValue_sortByKey
算法说明:在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序的
9.join,leftOuterJoin,rightOuterJoin Spark20_RDD_Operator_transform_keyValue_join
算法说明:连接,类似数据库的join,leftjoin,rightjoin
10.cogroup Spark21_RDD_Operator_transform_keyValue_cogroup
算法说明:在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD,分组连接
测试:1) 数据准备,agent.log:时间戳,省份,城市,用户,广告,中间字段使用空格分隔。
-
需求描述,统计出每一个省份每个广告被点击数量排行的 Top3
-
需求分析
-
功能实现 Spark22_RDD_Req
行动算子 :触发作业的执行
1、collect Spark01_RDD_Operaor_Action_collect
算法说明:在驱动程序中,以数组 Array 的形式返回数据集的所有元素
2、reduce,count,first,take,takeOrdered Spark02_RDD_Operaor_Action_Reduce
算法说明:reduce聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据
count:返回 RDD 中元素的个数 first:返回 RDD 中的第一个元素 take:返回一个由 RDD 的前 n 个元素组成的数组
takeOrdered:返回该 RDD 排序后的前 n 个元素组成的数组
3、aggregate,fold Spark03_RDD_Operaor_Action_aggregate
算法说明:分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合,fold是aggregate 的简化版操作
4、countByKey Spark04_RDD_Operaor_Action_countByKey
算法说明:统计每种 key 的个数
5、save Spark05_RDD_Operaor_Action_save
算法说明:将数据保存到不同格式的文件中
6、foreach Spark06_RDD_Operaor_Action_forearch
算法说明:分布式遍历 RDD 中的每一个元素,调用指定函数,rdd.collect.foreach(print) 和 rdd.foreach(print)的区别
先采集后打印rdd.collect.foreach(print)
rdd.foreach(print)
RDD 序列化
- 闭包检查
从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor端执行。那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。Scala2.12 版本后闭包编译方式发生了改变
- 序列化方法和属性
从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor端执行,看如下代码:
operator\serializable\Spark01_RDD_Serial.scala
- Kryo 序列化框架
Java 的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。Spark 出于性能的考虑,Spark2.0 开始支持另外一种 Kryo 序列化机制。Kryo 速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型
已经在 Spark 内部使用 Kryo 来序列化。注意:即使使用 Kryo 序列化,也要继承 Serializable 接口。 Spark02_RDD_Kyro
RDD 血缘-依赖关系 Spark02_RDD_Dep
1) RDD 血缘关系
RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage(血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区
相邻的两个RDD的关系称之为依赖关系
val rdd1 = rdd.map(_ * 2)
新的RDD依赖于旧的RDD
多个连续的RDD的依赖关系,称之为血缘关系.
每个RDD都会保存血缘关系
RDD 依赖关系
这里所谓的依赖关系,其实就是两个相邻 RDD 之间的关系
RDD不会保存数据的
RDD为了提供容错性,需要将RDD间的关系保存下来
一旦出现错误,可以根据血缘关系将数据源重新读取进行计算
RDD宽依赖-窄依赖
窄依赖:窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用,窄依赖我们形象的比喻为独生子女。分区的数据对应,没有进行shuffle打乱。即org.apache.spark.OneToOneDependency
宽依赖:宽依赖表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会引起 Shuffle,总结:宽依赖我们形象的比喻为多生。例如入门的数据单双数字的分区,一对多
RDD阶段划分
DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG 记录了 RDD 的转换过程和任务的阶段。
窄依赖:新旧之间,分区一对一,不需要等待另一个分区的数据完成以后才能进行新的操作。每个分区对应一个task,每个task先执行旧的在执行新的。
宽依赖:存在阶段性,需要通过shuffle打乱重排,则需要另一个分区的数据任务执行完毕后,即阶段一统一完成以后才能执行阶段二。
RDD 阶段划分源码
org\apache\spark\scheduler\DAGScheduler.scala
try {
// New stage creation may throw an exception if, for example, jobs are run on
a
// HadoopRDD whose underlying HDFS files have been deleted.
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
……
private def createResultStage(
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage]
= {
getShuffleDependencies(rdd).map { shuffleDep =>
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
……
private[scheduler] def getShuffleDependencies(
rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
val parents = new HashSet[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new Stack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
if (!visited(toVisit)) {
visited += toVisit
toVisit.dependencies.foreach {
case shuffleDep: ShuffleDependency[_, _, _] =>
parents += shuffleDep
case dependency =>
waitingForVisit.push(dependency.rdd)
}
} }
parents
}
当RDD中存在shuffle依赖时,阶段会自动增加一一个
阶段的数量= shuffle依赖的数量+ 1
ResultStage只有一个,最后需要执行的阶段
RDD任务划分源码
RDD 任务划分
RDD 任务切分中间分为:Application、Job、Stage 和 Task
⚫ Application:初始化一个 SparkContext 即生成一个 Application;
⚫ Job:一个 Action 算子就会生成一个 Job;
⚫ Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;
⚫ Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数。
ShuffleMapStage => ShuffleMap Task
ResultStage => ResultTask
注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。 结合代码流程理解
任务数量=当前阶段中最后一个RDD的分区数量
val tasks: Seq[Task[_]] = try {
stage match {
case stage: ShuffleMapStage =>
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, stage.latestInfo.taskMetrics, properties,
Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId)
}
case stage: ResultStage =>
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
}
}
……
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
……
override def findMissingPartitions(): Seq[Int] = {
mapOutputTrackerMaster
.findMissingPartitions(shuffleDep.shuffleId)
.getOrElse(0 until numPartitions) }
学习路径:https://space.bilibili.com/302417610/,如有侵权,请联系q进行删除:3623472230