文章目录
- 1、Spark 整体架构
- 1.1、Spark 集群角色
- 1.1.1、Cluster Manager
- 1.1.2、Worker Node
- 1.1.3、Executor
- 1.1.4、Application
- 1.1.5、Driver
- 1.1.6、Executor
- 2、Spark 运行基本流程
- 2.1、RDD
- 2.2、DAG
- 2.3、DAGScheduler
- 2.4、TaskScheduler
- 2.5、Job
- 2.6、Stage
- 2.7、TaskSet Task
- 2.8、Partition
- 3、Spark 作业运行流程
- 4、Spark RDD迭代过程
- 5、Spark 程序执行流程
- 5.1、yarn-client 模式
- 5.2、yarn-cluster 模式
- 6、参数设置
- 6.1 Yarn 资源
- 6.2 Yarn 参数设置
- 6.3 Spark 参数设置
- 6.4 Spark On Yarn 资源分配策略
- 7、Spark 与 MR
- 8、Spark SQL与 Hive
1、Spark 整体架构
Spark Core:包含 Spark 的基本功能;尤其是定义 RDD 的 API 、操作以及这两者上的动作。其他 Spark 的库都是构建在 RDD 和 Spark Core 之上的
Spark SQL:提供通过 Apache Hive 的 SQL 变体 Hive 查询语言(HiveQL)与 Spark 进行交互的 API 。每个数据库表被当做一个 RDD ,Spark SQL 查询被转换为 Spark 操作。
Spark Streaming:对实时数据流进行处理和控制。Spark Streaming 允许程序能够像普通 RDD 一样处理实时数据
MLlib:一个常用机器学习算法库,算法被实现为对 RDD 的 Spark 操作。这个库包含可扩展的学习算法,比如分类、回归等需要对大量数据集进行迭代的操作。
GraphX:控制图、并行图操作和计算的一组算法和工具的集合。GraphX 扩展了 RDD API,包含控制图、创建子图、访问路径上所有顶点的操作。
1.1、Spark 集群角色
Spark 的集群角色图,主要有集群管理节点Custer Manager,工作节点 Worker Node,执行器 Executor,驱动器 Driver 和应用程序 Application 五部分组成,下面详细说明每部分的特点。
1.1.1、Cluster Manager
集群管理器,它存在于Master进程中,主要用来对应用程序申请的资源进行管理,根据其部署模式的不同,可以分为local,standalone,yarn,mesos等模式。
1.1.2、Worker Node
worker是spark的工作节点,用于执行任务的提交,主要工作职责有下面四点:
- Worker节点通过注册机向 Cluster Manager 汇报自身的 CPU,内存等信息。
- Worker 节点在 Spark Master 作用下创建并启用 Executor,Executor是真正的计算单元。
- Spark Master 将任务 Task 分配给 Worker 节点上的 Executor 并执行运用。
- Worker 节点同步资源信息和 Executor 状态信息给 Cluster Manager。
1.1.3、Executor
Executor 是真正执行计算任务的组件,它是 Application 运行在 Worker 上的一个进程。这个进程负责 Task 的运行,它能够将数据保存在内存或磁盘存储中,也能够将结果数据返回给 Driver。
1.1.4、Application
Application 是 Spark API 编程的应用程序,它包括实现 Driver 功能的代码和在程序中各个 Executor 上要执行的代码,一个 Application 由多个 Job 组成。其中应用程序的入口为用户所定义的 main 方法。
1.1.5、Driver
驱动器节点,它是一个运行 Application 中 main 函数并创建 SparkContext 的进程。Application 通过 Driver 和 Cluster Manager 及 Executor 进行通讯。它可以运行在 Application 节点上,也可以由 Application 提交给 Cluster Manager,再由 Cluster Manager 安排 Worker 进行运行。
1.1.6、Executor
SparkContext 是整个 Spark 应用程序最关键的一个对象,是 Spark 所有功能的主要入口点。核心作用是初始化 Spark 应用程序所需要的组件,同时还负责向 Master 程序进行注册等。
2、Spark 运行基本流程
2.1、RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最重要的一个概念,是弹性分布式数据集,是一种容错的、可以被并行操作的元素集合,是 Spark 对所有数据处理的一种基本抽象。可以通过一系列的算子对 RDD 进行操作,主要分为 Transformation和Action 两种操作。
- Transformation(转换):是对已有的RDD进行换行生成新的RDD,对于转换过程采用惰性计算机制,不会立即计算出结果。常用的方法有map,filter,flatmap等。
- Action(执行):对已有对 RDD 对数据执行计算产生结果,并将结果返回Driver或者写入到外部存储中。常用到方法有 reduce,collect,saveAsTextFile 等。
2.2、DAG
DAG 是一个有向无环图, 反映 RDD 之间的依赖关系。主要分为 DAG Scheduler 和 Task Scheduler。
2.3、DAGScheduler
DAG Scheduler 是面向stage的高层级的调度器,DAG Scheduler 把 DAG 拆分为多个 Task,每组 Task 都是一个 Stage,解析时是以 shuffle 为边界进行反向构建的,每当遇见一个 shuffle,Spark 就会产生一个新的 Stage,接着以 TaskSet 的形式提交给底层的调度器(Task Scheduler),每个 Stage 封装成一个 TaskSet 。DAG Scheduler 需要记录 RDD 被存入磁盘物化等动作,同时会需要 Task 寻找最优等调度逻辑,以及监控因 shuffle 跨节点输出导致的失败。
2.4、TaskScheduler
Task Scheduler 负责每一个具体任务的执行。它的主要职责包括
- 任务集的调度管理
- 状态结果跟踪
- 物理资源调度管理
- 任务执行
- 获取结果
2.5、Job
一个 Job 包含多个 RDD 及作用于相应 RDD 上的各种操作,它包含很多 task 的并行计算,可以认为是 SparkRDD 里面的 action ,每个 action 的触发会生成一个 job 。用户提交的 Job 会提交给 DAGScheduler , Job 会被分解成 Stage ,Stage 会被细化成 Task,Task 简单的说就是在一个数据 partition 上的单个数据处理流程。
2.6、Stage
Spark 中 DAG 生成过程的重点是对 Stage 的划分,其划分的依据是 RDD 的依赖关系,对于不同的依赖关系,高层调度器会进行不同的处理。
- 对于窄依赖,RDD 之间的数据不需要进行 Shuffle ,多个数据处理可以在同一台机器的内存中完成,所以窄依赖在 Spark 中被划分为同一个 Stage ;
- 对于宽依赖,由于 Shuffle 的存在,必须等到父 RDD 的 Shuffle 处理完成后,才能开始接下来的计算,所以会在此处进行 Stage 的切分。
在 Spark 中,DAG 生成的流程关键在于回溯,在程序提交后,高层调度器将所有的 RDD 看成是一个 Stage ,然后对此 Stage 进行从后往前的回溯,遇到 Shuffle 就断开,遇到窄依赖,则归并到同一个 Stage 。等到所有的步骤回溯完成,便生成一个 DAG 图。
-
为什么要划分Stage? --并行计算
一个复杂的业务逻辑如果有 shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,即下一个阶段的计算要依赖上一个阶段的数据。那么我们按照 shuffle 进行划分(也就是按照宽依赖就行划分),就可以将一个 DAG 划分成多个 Stage 阶段,在同一个 Stage 中,会有多个算子操作,可以形成一个 pipeline 流水线,流水线内的多个平行的分区可以并行执行。
Pipeline:HDFS----textRDD----splitRDD-----tupleRDD
-
如何划分 DAG 的 Stage?
对于窄依赖,partition 的转换处理在 Stage 中完成计算,不划分(将窄依赖尽量放在在同一个 Stage 中,可以实现流水线计算)
对于宽依赖,由于有 shuffle 的存在,只能在父 RDD 处理完成后,才能开始接下来的计算,也就是说需要要划分 Stage
Spark 会根据 shuffle/宽依赖使用回溯算法来对 DAG 进行 Stage 划分,从后往前,遇到宽依赖就断开,遇到窄依赖就把当前的 RDD 加入到当前的 Stage 阶段中。
2.7、TaskSet Task
TaskSet 可以理解为一种任务,对应一个 Stage ,是 Task 组成的任务集。一个 TaskSet 中的所有 Task 没有 shuffle 依赖可以并行计算。
Task 是 Spark 中最独立的计算单元,由 Driver Manager 发送到 Executer 执行,通常情况一个 Task 处理 Spark RDD 一个 partition。Task 分为 ShuffleMapTask 和 ResultTask 两种,位于最后一个 Stage 的 Task 为 ResultTask ,其他阶段的属于 ShuffleMapTask。
2.8、Partition
Partition 类似 Hadoop 的 Split,计算是以 partition 为单位进行的,当然 partition 的划分依据有很多,这是可以自己定义的,像 HDFS 文件,划分的方式就和 MapReduce 一样,以文件的 block 来划分不同的 partition 。总而言之,Spark 的 partition 在概念上与 Hadoop 中的 Split 是相似的,提供了一种划分数据的方式。
3、Spark 作业运行流程
Spark 应用程序以进程集合为单位在分布式集群上运行,通过 Driver 程序的 main 方法创建 SparkContext 的对象与集群进行交互。具体运行流程如下:
- SparkContext 向 Cluster Manager申请 CPU,内存等计算资源。
- Cluster Manager 分配应用程序执行所需要的资源,在 Worker 节点创建 Executor。
- SparkContext 将程序代码和task任务发送到 Executor 上进行执行,代码可以是编译成的 jar 包等。接着 SparkContext 会收集结果到Driver 端。
4、Spark RDD迭代过程
- SparkContext 创建 RDD 对象,计算 RDD 间的依赖关系,并组成一个 DAG 有向无环图。
- DAGScheduler 将 DAG 划分为多个 Stage ,并将 Stage 对应的 TaskSet 提交到集群的管理中心,Stage 的划分依据是 RDD 中的宽窄依赖,Spark 遇见宽依赖就会划分为一个 Stage,每个 Stage 中包含来一个或多个 Task 任务,避免多个 Stage 之间消息传递产生的系统开销。
- TaskScheduler 通过集群管理中心为每一个 Task 申请资源并将 Task 提交到 Worker 的节点上进行执行。
- Worker 上的 Executor 执行具体的任务。
5、Spark 程序执行流程
Spark On Yarn 分为两种模式 yarn-client 模式,和 yarn—cluster 模式,一般线上采用的是 yarn-cluster 模式。
5.1、yarn-client 模式
Driver 在客户端本地执行,这种模式可以使得 Spark Application 和客户端进行交互,因为 Driver 在客户端可以通过 WebUI 访问 Driver 的状态。同时 Driver 会与 Yarn 集群中的 Executor 进行大量的通信,会造成客户机网卡流量的大量增加。
执行流程:
- 客户端提交一个 Application,在客户端启动一个 Driver 进程。
- Driver 进程会向 RS(ResourceManager) 发送请求,启动 AM(ApplicationMaster)。
- RS 收到请求,随机选择一台 NM(NodeManager) 启动 AM 。这里的 NM 相当于 Standalone 中的 Worker节点。
- AM 启动后,会向 RS 请求一批 Container 资源,用于启动 Executor。
- RS 会找到一批 NM 返回给 AM ,用于启动 Executor。AM 会向 NM 发送命令启动 Executor。
- Executor启动后,会反向注册给Driver,Driver发送task到Executor,执行情况和结果返回给Driver端。
小结:
1、Yarn-client 模式同样是适用于测试,因为 Driver 运行在本地,Driver 会与 yarn 集群中的 Executor 进行大量的通信,会造成客户机网卡流量的大量增加.
2、 ApplicationMaster 的作用:
(1)为当前的 Application 申请资源
(2)给 NodeManager 发送消息启动 Executor。
注意:ApplicationMaster 有 launchExecutor 和申请资源的功能,并没有作业调度的功能。
命令提交模式:
spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 1g \
--executor-memory 1g \
--num-executors 3 \
--executor-cores 40 \
--queue queue-1 \
/usr/lib/spark/examples/xxx.jar \
200000
5.2、yarn-cluster 模式
yarn-cluster 主要用于生产环境中,因为 Driver 运行在 Yarn 集群中某一台 NodeManager 中,每次提交任务的 Driver 所在的机器都是随机的,不会产生某一台机器网卡流量激增的现象,缺点是任务提交后不能看到日志。只能通过 Yarn 查看日志。
执行流程:
- 客户机提交 Application 应用程序,发送请求到 RS(ResourceManager) ,请求启动 AM(ApplicationMaster)。
- RS 收到请求后随机在一台 NM(NodeManager) 上启动 AM(相当于Driver端)。
- AM 启动,AM 发送请求到 RS ,请求一批 Container 用于启动 Executor。
- RS 返回一批 NM 节点给 AM 。
- AM 连接到 NM ,发送请求到 NM 启动 Executor。
- Executor 反向注册到 AM 所在的节点的 Driver。Driver 发送 task 到 Executor。
小结:
- yarn-cluster 主要用于生产环境中,因为 Driver 运行在 Yarn 集群中某一台 NodeManager 中,每次提交任务的Driver所在的机器都是随机的,不会产生某一台机器网卡流量激增的现象,缺点是任务提交后不能看到日志。只能通过yarn查看日志。
- ApplicationMaster 的作用:
(1) 为当前的 Application 申请资源
(2)给 NodeManager 发送消息启动 Excutor
(3)任务调度。(这里和 client 模式的区别是 AM 具有调度能力,因为其就是 Driver 端,包含Driver 进程)
提交命令:
./spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi ../lib/spark-examples-xx.jar 100
./spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPi ../lib/spark-examples-xx.jar 100
6、参数设置
6.1 Yarn 资源
Yarn 的 RM 负责管理整个集群,NM 则负责管理该工作节点。
- Yarn 的 NM 可分配 core 数(即可以分给Container的最大CPU核数)由参数
yarn.nodemanager.resource.cpu-vcores
指定,一般要小于本节点的物理 CPU 核数,因为要预留一些资源给其他任务。Hadoop 集群工作节点一般都是同构的,即配置相同。 - NM 可分配给 Container的最大内存则由参数
yarn.nodemanager.resource.memory-mb
指定,默认情况下,可分配内存会小于本机内存*0.8。
特别注意:
1、分配给作业的资源不要超过 Yarn 可分配的集群资源总数。
2、分配给单个 Container 的核数和内存不能超过阈值,即为 Executor 设置的核数和内存不能超过阈值。若分配给作业的资源超过上限,将不会启动指定数目的 Executor(也就是说,不会起足够数目的 Container)。
6.2 Yarn 参数设置
在 Yarn 中,资源管理由 ResourceManager 和 NodeManager 共同完成,其中,ResourceManager 中的调度器负责资源的分配,而NodeManager 则负责资源的供给和隔离,将CPU、内存等包装称 Container,一个 Container 代表最小计算资源
。
ResourceManager 将某个 NodeManager 上资源分配给任务(这就是所谓的“资源调度”)后,NodeManager 需按照要求为任务提供相应的资源,甚至保证这些资源应具有独占性,为任务运行提供基础的保证,这就是所谓的资源隔离。
yarn 中可以通过 yarn-site.xml 中设置如下几个参数达到管理内存的目的:
yarn.nodemanager.resource.memory-mb 默认值:8192M NM总的可用物理内存,以MB为单位。一旦设置,不可动态修改
yarn.nodemanager.resource.cpu-vcores 默认值:8 可分配的CPU个数
yarn.scheduler.minimum-allocation-mb 默认值:1024 可申请的最少内存资源,以MB为单位
yarn.scheduler.maximum-allocation-mb 默认值:8192 可申请的最大内存资源,以MB为单位
yarn.scheduler.minimum-allocation-vcores 默认值:1 可申请的最小虚拟CPU个数
yarn.scheduler.maximum-allocation-vcores 默认值:32 可申请的最 大虚拟CPU个数
yarn.nodemanager.resource.memory-mb
与 yarn.nodemanager.resource.cpu-vcores
的值不会根据系统资源自动设置,需要手动设置,如果系统内存小于8G 、cpu小于8个,最好手动设置
6.3 Spark 参数设置
Spark 执行任务是 Executor,一个 Executor 可以运行多个 Task。一个 Executor 对应一个 JVM 进程。从 Spark 的角度看,Executor占用的内存分为两部分:ExecutorMemory
和 MemoryOverhead
。
spark.driver.memory 默认值:1g; 分配给driver process的jvm堆内存大小,SparkContext将会在这里初始化,命令行中可通过 --driver-memory指定,也可通过配置文件指定一个固定值
spark.driver.cores 默认值:1; 分配给driver process的核心数量,只在cluster模式下
spark.driver.memoryOverhead 默认值:driverMemory * 0.10, with minimum of 384; 用于driver process的启停jvm内存大小
spark.executor.cores 默认值:1; 分配给executor process的核心数量,命令行中可通过 executor-cores指定
spark.executor.memory 默认值:1g; 分配给每个executor的程序的内存大小,命令行中可通过 --executor-memory指定
spark.executor.memoryOverhead 默认值:executorMemory * 0.10, with minimum of 384; jvm非堆内存的开销,一般占max(executorMemory *10%,384M)大小
6.4 Spark On Yarn 资源分配策略
当 Yarn 上运行 Spark 作业,每个 Spark Executor 作为一个容器运行。Spark可以使得多个Tasks在同一个容器里面运行。一个 Saprk Executor 运行在一个 Yarn Container 里,即 Executor 和 Container 是一对一的关系
。
- NodeManager 管理资源
NodeManager 管理资源 = 总的资源 - 系统需求资源 - Hbase、HDFS 等需求资源
- 划分内存资源
JVM 资源 = executor.memory(JVM堆资源)+ executor.memoryOverhead(JVM非堆需要资源),也就是一个 Executor 所需内存资源=--executor-memory + max(executorMemory *10%,384M)
。同时这个值需要通过 Yarn 申请,必须落在 minimum-allocation-mb 与 maximum-allocation-mb 之间
- 划分CPU资源
通过 executor.cores 指定 executor 可拥有的 CPU 个数,也就是 task 可并行运行的个数,一般小于5
计算 executor 个数。设置 num-executors
对于client模式 : NodeManager管理资源 >= executor 个数 * 单个executor 资源(内存 + cpu)
对于cluster模式:NodeManager管理资源 >= executor 个数 * 单个executor 资源(内存 + cpu)+ driver 资源(内存 + cpu)
7、Spark 与 MR
MapReduce 能够完成的各种离线批处理功能,以及常见算法(比如二次排序、topn 等),基于 Spark RDD 的核心编程,都可以实现,并且可以更好地、更容易地实现。而且基于 Spark RDD 编写的离线批处理程序,运行速度是 MapReduce 的数倍,速度上有非常明显的优势。
Spark 相较于 MapReduce 速度快的最主要原因就在于,MapReduce 的计算模型太死板,必须是 map-reduce 模式,有时候即使完成一些诸如过滤之类的操作,也必须经过 map-reduce 过程,这样就必须经过 shuffle 过程。而 MapReduce 的 shuffle 过程是最消耗性能的,因为 shuffle 中间的过程必须基于磁盘来读写。而 Spark 的 shuffle 虽然也要基于磁盘,但是其大量 transformation 操作,比如单纯的 map 或者 filter 等操作,可以直接基于内存进行 pipeline 操作,速度性能自然大大提升。
但是 Spark 也有其劣势。由于 Spark 基于内存进行计算,虽然开发容易,但是真正面对大数据的时候(比如一次操作针对10亿以上级别),在没有进行调优的情况下,可能会出现各种各样的问题,比如 OOM 内存溢出等等。导致 Spark 程序可能都无法完全运行起来,就报错挂掉了,而 MapReduce 即使是运行缓慢,但是至少可以慢慢运行完。
此外,Spark 由于是新崛起的技术新秀,因此在大数据领域的完善程度,肯定不如 MapReduce ,比如基于 HBase、Hive 作为离线批处理程序的输入输出,Spark 就远没有 MapReduce 来的完善。实现起来非常麻烦。
8、Spark SQL与 Hive
Spark SQL 实际上并不能完全替代 Hive ,因为 Hive 是一种基于 HDFS 的数据仓库,并且提供了基于 SQL 模型的,针对存储了大数据的数据仓库,进行分布式交互查询的查询引擎。
严格的来说, Spark SQL 能够替代的,是 Hive 的查询引擎,而不是 Hive 本身,实际上即使在生产环境下, SparkSQL 也是针对 Hive 数据仓库中的数据进行查询, Spark 本身自己是不提供存储的,自然也不可能替代 Hive 作为数据仓库的这个功能。
Spark SQL 的一个优点,相较于 Hive 查询引擎来说,就是速度快,同样的 SQL 语句,可能使用 Hive 的查询引擎,由于其底层基于 MapReduce,必须经过 shuffle 过程走磁盘,因此速度是非常缓慢的。很多复杂的 SQL 语句,在 Hive 中执行都需要一个小时以上的时间。而 Spark SQL 由于其底层基于自身的基于内存的特点,因此速度达到了 Hive 查询引擎的数倍以上。
而 Spark SQL 相较于 Hive 的另外一个优点,就是支持大量不同的数据源,包括 hive、json、 parquet、jdbc 等等此外, Spark SQL 由于身处技术堆栈内,也是基于 RDD 来工作,因此可以与 Spark 的其他组件无缝整合使用,配合起来实现许多复杂的功能。比如 Spark SQL 支持可以直接针对 HDFS 文件执行 sql 语句。