文章目录
Spark重要知识汇总
一、Spark 是什么
二、Spark 四大特点
三、Spark框架模块介绍
3.1、Spark Core的RDD详解
3.1.1、什么是RDD
3.1.2、RDD是怎么理解的
四、Spark 运行模式
4.1、Spark本地模式介绍
4.2、Spark集群模式 Standalone
4.3、Spark集群模式 Standalone HA
4.4、Spark集群模式 Spark on YARN介绍
4.5、Spark On Yarn两种模式总结
五、Spark应用架构基本介绍
Spark重要知识汇总
一、Spark 是什么
大规模数据处理的统一分析引擎,也可说是分布式内存迭代计算框架。
二、Spark 四大特点
- 速度快(内存计算)
- 易于使用(支持了包括 Java、Scala、Python 、R和SQL语言在内的多种语言)
- 通用性强(提供了包括Spark SQL、Spark Streaming、MLib 及GraphX在内的多个工具库)
- 运行方式多(包括在 Hadoop 和 Mesos 上,也支持 Standalone的独立运行模式,同时也可以运行在云Kubernetes(Spark 2.3开始支持)上)
三、Spark框架模块介绍
- Spark Core(实现了 Spark 的基本功能,包含RDD、任务调度、内存管理、错误恢复、与存储系统交互等模块。数据结构:RDD)
- Spark SQL(可以使用 SQL操作数据。数据结构:Dataset/DataFrame = RDD + Schema)
- Spark Streaming(用来操作数据流的 API。 数据结构:DStream = Seq[RDD])
- Spark MLlib(提供常见的机器学习(ML)功能的程序库。包括分类、回归、聚类、协同过滤等,还提供了模型评估、数据导入等额外的支持功能。 数据结构:RDD或者DataFrame)
- Spark GraphX(用于图计算的API,性能良好,拥有丰富的功能和运算符,能在海量数据上自如地运行复杂的图算法。数据结构:RDD或者DataFrame)
- Structured Streaming(结构化流处理模块,针对流式结构化数据封装到DataFrame中进行分析)
3.1、Spark Core的RDD详解
3.1.1、什么是RDD
RDD(Resilient Distributed Dataset)弹性分布式数据集,是Spk中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。
3.1.2、RDD是怎么理解的
1、概念:RDD是弹性分布式数据集(Resilient Distributed Dataset)
2、RDD有五大特性:
分区列表(A List of Partitions)
-
一组分片(Partition)/一个分区(Partition)列表,即数据集的基本组成单位。
-
对于RDD来说,每个分片都会被一个计算任务处理,分片数决定并行度。
-
用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。
计算函数(A Function for Computing Each Partition)
-
一个函数会被作用在每一个分区。
-
Spark中RDD的计算是以分片为单位的,compute函数会被作用到每个分区上。
依赖关系(Lineage/Dependency)
-
一个RDD会依赖于其他多个RDD。
-
RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算(Spark的容错机制)。
分区函数(Partitioner for Key-Value RDDs,可选)
-
可选项,对于KeyValue类型的RDD会有一个Partitioner,即RDD的分区函数。
-
当前Spark中实现了两种类型的分区函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。
-
只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。
-
Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
-
对key-value的类型RDD的默认分区HashPartitoner。
位置优先性(Location Preference,可选)
-
可选项,一个列表,存储存取每个Partition的优先位置(preferred location)。
-
对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。
-
按照"移动数据不如移动计算"的理念,Spark在进行任务调度的时候,会尽可能选择那些存有数据的worker节点来进行任务计算。(数据本地性)
WordCount中RDD
sc.textFile().flatmap().map().reduceByKey()
在内部,每个RDD都有五个主要特性:
-
分区列表:每个RDD都有会分区的概念,类似于HDFS的分块, 分区的目的:提高并行度。
-
用于计算每个分区的函数:用函数来操作各个分区中的数据。
-
对其他RDD的依赖列表:后面的RDD需要依赖前面的RDD。
-
可选地,键值RDDs的分区器。(例如,reduceByKey中的默认的Hash分区器)
-
可选地,计算每个分区的首选位置列表/最佳位置。(例如HDFS文件)
3、RDD的宽依赖和窄依赖:
窄依赖定义:
窄依赖指的是每一个父RDD的Partition(分区)最多被子RDD的一个Partition使用。这种依赖关系形象地比喻为“独生子女”,即一个父RDD的分区只对应一个子RDD的分区(或几个父RDD的分区对应一个子RDD的分区,但仍然是“一对一”的关系)。
特点:
-
高效执行:窄依赖支持在一个节点上管道化执行,例如,在filter之后可以直接执行map操作,无需移动数据。
-
快速容错:当子RDD的某个分区数据丢失时,只需要重新计算对应的父RDD分区即可,不需要重新计算整个RDD。
-
操作类型:常见的窄依赖操作包括map、filter、union(特定条件下)等。
宽依赖定义:
宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition。这种依赖关系形象地比喻为“超生”,即一个父RDD的分区会被多个子RDD的分区所使用。
特点:
-
Shuffle操作:宽依赖通常涉及Shuffle操作,即数据需要在不同节点间重新分布。Shuffle操作是耗时的,因为它涉及磁盘I/O、数据序列化和网络I/O。
-
低效容错:当子RDD的某个分区数据丢失时,由于宽依赖的特性,可能需要重新计算父RDD的所有分区才能恢复数据。
-
操作类型:常见的宽依赖操作包括groupByKey、reduceByKey、sortByKey、join(输入未协同划分)等。
注意:根据父RDD有一个或多个子RDD对应,因为窄依赖可以在任务间并行,宽依赖会发生Shuffle,并不是所有的bykey算子都会产生shuffle?需要注意的是(1)分区器一致(2)分区个数一致
4、RDD血缘关系linage:
linage会记录当前RDD依赖于上一个RDD,如果一个RDD失效可以重建RDD,容错关键。
RDD血缘关系的作用
-
容错机制:RDD血缘关系的主要作用是支持Spark的容错机制。由于RDD是不可变的,并且只支持粗粒度转换(即在大量记录上执行的单个操作),因此当RDD的某个分区数据丢失时,Spark可以根据血缘关系图重新计算丢失的数据分区,而无需重新计算整个RDD。
-
优化执行计划:Spark的执行引擎可以利用RDD血缘关系来优化作业的执行计划。例如,通过识别窄依赖和宽依赖,Spark可以决定如何切分作业为不同的阶段(Stages),并在不同的节点上并行执行这些阶段。
RDD的缓存:cache和persist,cache会将数据缓存在内存中,persist可以指定多种存储级别,cache底层调用的是persist。
-
cache()方法:这是RDD缓存的简化形式,它等价于调用
persist(StorageLevel.MEMORY_ONLY)
。即,将数据以未序列化的Java对象形式存储在JVM的堆内存中。 -
persist()方法:这是一个更通用的方法,允许用户指定缓存的级别。通过传递一个
StorageLevel
对象作为参数,用户可以控制数据是存储在内存中、磁盘上,还是两者都存储,以及是否进行序列化等。
缓存级别
Spark提供了以下几种缓存级别(StorageLevel
):
-
MEMORY_ONLY:将RDD以未序列化的Java对象形式存储在JVM的堆内存中。如果内存不足,则某些分区可能不会被缓存,而是会在需要时重新计算。
-
MEMORY_AND_DISK:将RDD以未序列化的Java对象形式存储在JVM的堆内存中。如果内存不足,则将未缓存的分区存储在磁盘上。
-
MEMORY_ONLY_SER:将RDD以序列化的Java对象形式存储(每个分区为一个字节数组)。这种方式比未序列化的对象更节省空间,但读取时会增加CPU的负担。
-
MEMORY_AND_DISK_SER:类似于MEMORY_ONLY_SER,但如果内存不足,则将溢出的分区存储在磁盘上。
-
DISK_ONLY:将RDD仅存储在磁盘上。
-
OFF_HEAP:将数据存储在堆外内存中,这通常涉及额外的配置和可能的内存管理问题。
5、RDD的检查点机制:
Checkpoint会截断所有的血缘关系,而缓存会将血缘的关系全部保存在内存或磁盘中。
5.1、检查点机制的基本概念
检查点机制允许用户将RDD的中间结果持久化到可靠的文件系统(如HDFS)中,以便在出现节点故障或数据丢失时,能够快速地恢复RDD的状态,而不需要重新计算整个RDD的依赖链。这可以大大减少数据恢复的时间和计算资源的消耗。
5.2、检查点机制的工作流程
-
设置检查点目录:首先,需要使用
SparkContext
的setCheckpointDir()
方法设置一个检查点目录,该目录用于存储检查点数据。这个目录应该是可靠的文件系统,如HDFS。 -
标记RDD为检查点:然后,使用需要持久化的RDD的
checkpoint()
方法将该RDD标记为检查点。此时,并不会立即执行检查点操作,而是会在遇到第一个行动操作(如collect()
、count()
等)时触发。 -
执行检查点操作:当遇到第一个行动操作时,Spark会启动一个新的作业来计算被标记为检查点的RDD,并将其结果写入之前设置的检查点目录中。同时,Spark会移除该RDD的所有依赖关系,因为在未来需要恢复该RDD时,可以直接从检查点目录中读取数据,而不需要重新计算依赖链。
-
数据恢复:如果在后续的计算过程中出现了节点故障或数据丢失,Spark可以根据需要从检查点目录中恢复RDD的状态,从而继续执行后续的计算任务。
5.3、检查点机制的优点
-
提高容错性:通过将RDD的中间结果持久化到可靠的文件系统中,可以在出现节点故障或数据丢失时快速恢复RDD的状态,从而提高Spark作业的容错性。
-
减少计算开销:在宽依赖的RDD上设置检查点可以显著减少计算开销,因为当需要恢复宽依赖的RDD时,只需要重新计算从检查点开始的部分,而不是整个RDD的依赖链。
-
优化性能:通过将经常使用的RDD持久化到外部存储中,可以避免在多次计算中重复计算相同的RDD,从而优化Spark作业的性能。
四、Spark 运行模式
- 本地模式-Local Mode(Task运行在一个本地JVM Process进程中,通常开发测试使用)
- 集群模式-Cluster Mode(Hadoop YARN集群,Spark 自身集群Standalone及Apache Mesos集群)
- Kubernetes 云服务模式-Cloud(AWS 阿里云 腾讯云 等等云平台都提供了 EMR产品)
4.1、Spark本地模式介绍
两类角色:
一个是Master类似Namenode做管理
一个是Worker类似DataNode是干活的
Local模式就是,以一个JVM进程,去模拟整个Spark的运行环境,就是讲Master和Worker角色以线程的形式运行在这个进程中。
WEB UI监控页面,默认端口号 4040
4.2、Spark集群模式 Standalone
Standalone模式是真实地在多个机器之间搭建Spark集群的环境,完全可以利用该模式搭建多机器集群,用于实际的大数据处理
两个角色:master和worker
架构:
Standalone集群使用了分布式计算中的master-slave模型,master是集群中含有Master进程的节点,slave是集群中的Worker节点含有Executor进程。
WEB UI页面 8080
SparkContext web UI 4040
Master的IP和提交任务的通信端口 7077
4.3、Spark集群模式 Standalone HA
高可用HA
文件系统的单点恢复(Single-Node Recovery with Local File System)只能用于开发或测试环境
基于zookeeper的Standby Masters(Standby Masters with ZooKeeper)可以用于生产环境
4.4、Spark集群模式 Spark on YARN介绍
Spark运行在YARN上是有2个模式的, 1个叫 Client模式 一个叫Cluster模式
Spark HistoryServer服务WEB UI页面端口 18080
总结:
Spark On Yarn 不需要搭建Spark集群,只需要:Yarn+单机版Spark,当然还要一些配置。
4.5、Spark On Yarn两种模式总结
cluster和client模式最最本质的区别是:Driver程序运行在哪里
而Driver程序运行的位置可以通过--deploy-mode 来指定
企业实际生产环境中使用cluster
client 模式
cluster 模式
Client模式:学习测试时使用,开发不用,了解即可
-
Driver运行在Client上,和集群的通信成本高
-
Driver输出结果会在客户端显示
Cluster模式:生产环境中使用该模式
-
Driver程序在Yarn集群中,和集群的通信成本低
-
Driver输出结果不能在客户端显示
-
该模式下Driver运行ApplicattionMaster这个节点上,由Yarn管理,如果出现问题,Yarn会重启ApplicattionMaster(Driver)
五、Spark应用架构基本介绍
用户程序从最开始的提交到最终的计算执行,需要经历以下几个阶段:
-
用户程序创建 SparkContext 时,新创建的 SparkContext 实例会连接到 ClusterManager。 Cluster Manager 会根据用户提交时设置的 CPU 和内存等信息为本次提交分配计算资源,启动 Executor。
-
Driver会将用户程序划分为不同的执行阶段Stage,每个执行阶段Stage由一组完全相同Task组成,这些Task分别作用于待处理数据的不同分区。在阶段划分完成和Task创建后, Driver会向Executor发送 Task。
-
Executor在接收到Task后,会下载Task的运行时依赖,在准备好Task的执行环境后,会开始执行Task,并且将Task的运行状态汇报给Driver。
-
Driver会根据收到的Task的运行状态来处理不同的状态更新。 Task分为两种:一种是Shuffle Map Task,它实现数据的重新洗牌,洗牌的结果保存到Executor 所在节点的文件系统中;另外一种是Result Task,它负责生成结果数据;
-
Driver 会不断地调用Task,将Task发送到Executor执行,在所有的Task 都正确执行或者超过执行次数的限制仍然没有执行成功时停止。
Job、DAG和Stage:
一个Spark Application中,包含多个Job,每个Job有多个Stage组成,每个Job执行按照DAG图进行的,其中每个Stage中包含多个Task任务,每个Task以线程Thread方式执行,需要1Core CPU。
Spark Application程序运行时三个核心概念:Job、Stage、Task,说明如下:
Task:被分配到各个 Executor 的单位工作内容,它是 Spark 中的最小执行单位,一般来说有多少个 Paritition,就会有多少个 Task,每个 Task 只会处理单一分支上的数据。
Job:由多个 Task 的并行计算部分,一般 Spark 中的 action 操作(如 save、collect),会生成一个 Job。
Stage:Job 的组成单位,一个 Job 会切分成多个 Stage,Stage 彼此之间相互依赖顺序执行,而每个 Stage 是多个 Task 的集合,类似 map 和 reduce stage。
-
📢博客主页:https://lansonli.blog.csdn.net
-
📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
-
📢本文由 Lansonli 原创,首发于 CSDN博客🙉
-
📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨