Spark是什么
定义:Apache Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎。
Spark 借鉴了 MapReduce 思想发展而来,保留了其分布式并行计算的优点并改进了其明显的缺陷。让中间数据存储在内存中提高了运行速度、并提供丰富的操作数据的API提高了开发速度。
Spark VS Hadoop(MapReduce)
Hadoop的基于进程的计算和Spark基于线程方式优缺点?
Hadoop中的MR中每个map/reduce task都是一个java进程方式运行,好处在于进程之间是互相独立的,每个task独享进程资源,没有互相干扰,监控方便,但是问题在于task之间不方便共享数据,执行效率比较低。比如多个map task读取不同数据源文件需要将数据源加载到每个map task中,造成重复加载和浪费内存。而基于线程的方式计算是为了数据共享和提高执行效率,Spark采用了线程的最小的执行单位,但缺点是线程之间会有资源竞争。
(线程基本概念:线程是CPU的基本调度单位,一个进程一般包含多个线程, 一个进程下的多个线程共享进程的资源,不同进程之间的线程相互不可见,线程不能独立执行,一个线程可以创建和撤销另外一个线程)
Spark四大特点
1. 速度快
由于Apache Spark支持内存计算,并且通过DAG(有向无环图)执行引擎支持无环数据流,所以官方宣称其在内存中的运算速度要比Hadoop的MapReduce快100倍,在硬盘中要快10倍。
Spark处理数据与MapReduce处理数据相比,有如下两个不同点:
其一、Spark处理数据时,可以将中间处理结果数据存储到内存中;
其二、Spark 提供了非常丰富的算子(API), 可以做到复杂任务在一个Spark 程序中完成。
2. 易于使用
Spark 的版本已经更新到 Spark 3.2.0(截止日期2021.10.13),支持了包括 Java、Scala、Python、R和SQL语言在内的多种语言。为了兼容Spark2.x企业级应用场景,Spark仍然持续更新Spark2版本。
3. 通用性强
在 Spark 的基础上,Spark 还提供了包括Spark SQL、Spark Streaming、MLib 及GraphX在内的多个工具库,我们可以在一个应用中无缝地使用这些工具库。
4. 运行方式
Spark 支持多种运行方式,包括在 Hadoop 和 Mesos 上,也支持 Standalone 的独立运行模式,同时也可以运行在云Kubernetes(Spark 2.3开始支持)上。
Spark 框架模块
整个Spark 框架模块包含:Spark Core、 Spark SQL、 Spark Streaming、 Spark GraphX、 Spark MLlib,而后四项的能力都是建立在核心引擎之上。
Spark Core:Spark的核心,Spark核心功能均由Spark Core模块提供,是Spark运行的基础。Spark Core以RDD为数据抽象,提供Python、Java、Scala、R语言的API,可以编程进行海量离线数据批处理计算。
SparkSQL:基于SparkCore之上,提供结构化数据的处理模块。SparkSQL支持以SQL语言对数据进行处理,SparkSQL本身针对离线计算场景。同时基于SparkSQL,Spark提供了StructuredStreaming模块,可以以SparkSQL为基础,进行数据的流式计算。
SparkStreaming:以SparkCore为基础,提供数据的流式计算功能。
MLlib:以SparkCore为基础,进行机器学习计算,内置了大量的机器学习库和API算法等。方便用户以分布式计算的模式进行机器学习计算。
GraphX:以SparkCore为基础,进行图计算,提供了大量的图计算API,方便用于以分布式计算模式进行图计算。
Spark提供多种运行模式,包括:
本地模式(单机):本地模式就是以一个独立的进程,通过其内部的多个线程来模拟整个Spark运行时环境
Standalone模式(集群):Spark中的各个角色以独立进程的形式存在,并组成Spark集群环境
Hadoop YARN模式(集群):Spark中的各个角色运行在YARN的容器内部,并组成Spark集群环境
Kubernetes模式(容器集群):Spark中的各个角色运行在Kubernetes的容器内部,并组成Spark集群环境
云服务模式(运行在云平台上)
......
Spark运行角色
注:正常情况下Executor是干活的角色,不过在特殊场景下(Local模式)Driver可以即管理又干活
用户程序从最开始的提交到最终的计算执行,需要经历以下几个阶段:
1)用户程序创建 SparkContext 时,新创建的 SparkContext 实例会连接到 ClusterManager。ClusterManager 会根据用户提交时设置的 CPU 和内存等信息为本次提交分配计算资源,启动 Executor。
2)Driver会将用户程序划分为不同的执行阶段Stage,每个执行阶段Stage由一组完全相同Task组成,这些Task分别作用于待处理数据的不同分区。在阶段划分完成和Task创建后, Driver会向Executor发送 Task;
3)Executor在接收到Task后,会下载Task的运行时依赖,在准备好Task的执行环境后,会开始执行Task,并且将Task的运行状态汇报给Driver;
4)Driver会根据收到的Task的运行状态来处理不同的状态更新。 Task分为两种:一种是Shuffle Map Task,它实现数据的重新洗牌,洗牌的结果保存到 Executor 所在节点的文件系统中;另外一种是Result Task,它负责生成结果数据;
5)Driver 会不断地调用Task,将Task发送到Executor执行,在所有的 Task 都正确执行或者超过执行次数的限制仍然没有执行成功时停止。
Spark程序运行层次结构
4040: 是一个运行的Application在运行的过程中临时绑定的端口,用以查看当前任务的状态.4040被占用会顺延到4041.4042等。4040是一个临时端口,当前程序运行完成后, 4040就会被注销
8080: 默认是StandAlone下, Master角色(进程)的WEB端口,用以查看当前Master(集群)的状态
18080: 默认是历史服务器的端口,由于每个程序运行完成后,4040端口就被注销了. 在以后想回看某个程序的运行状态就可以通过历史服务器查看,历史服务器长期稳定运行,可供随时查看被记录的程序的运行过程.
运行起来一个Spark Application, 然后打开其4040端口,并查看:/export/server/spark/bin/spark-shell --master spark://localhost:7077
在localhost运行pyspark-shell,WEB UI监控页面地址:http://localhost:4040
可以发现在一个Spark Application中,包含多个Job,每个Job有多个Stage组成,每个Job执行按照DAG图进行的。其中每个Stage中包含多个Task任务,每个Task以线程Thread方式执行,需要1 Core CPU。
Spark Application程序运行时三个核心概念:Job、Stage、Task,说明如下:
Job:由多个 Task 的并行计算部分,一般 Spark 中的 action 操作(如 save、collect,后面进一步说明),会生成一个 Job。
Stage:Job 的组成单位,一个 Job 会切分成多个 Stage,Stage 彼此之间相互依赖顺序执行,而每个 Stage 是多个 Task 的集合,类似 map 和 reduce stage。
Task:被分配到各个 Executor 的单位工作内容,它是Spark 中的最小执行单位,一般来说有多少个 Paritition(物理层面的概念,即分支可以理解为将数据划分成不同部分并行处理),就会有多少个 Task,每个 Task 只会处理单一分支上的数据。
Spark on YARN
如果我们想要一个稳定的生产Spark环境, 那么最优的选择就是构建:HA StandAlone集群.不过在企业中, 服务器的资源总是紧张的, 许多企业不管做什么业务,都基本上会有Hadoop集群. 也就是会有YARN集群.
对于企业来说,在已有YARN集群的前提下在单独准备Spark StandAlone集群,对资源的利用就不高. 所以, 在企业中,多数场景下,会将Spark运行到YARN集群中.
YARN本身是一个资源调度框架, 负责对运行在内部的计算框架进行资源调度管理. 作为典型的计算框架, Spark本身也是直接运行在YARN中, 并接受YARN的调度的.
所以, 对于Spark On YARN, 无需部署Spark集群, 只要找一台服务器, 充当Spark的客户端, 即可提交任务到YARN集群中运行.
Spark On Yarn的本质?
Master角色由YARN的ResourceManager担任.
Worker角色由YARN的NodeManager担任.
Driver角色运行在YARN容器内 或 提交任务的客户端进程中
真正干活的Executor运行在YARN提供的容器内
Spark On Yarn需要啥?
1.需要Yarn集群
2.需要Spark客户端工具, 比如spark-submit, 可以将Spark程序提交到YARN中
3.需要被提交的代码程序:,如spark/examples/src/main/python/pi.py此示例程序,或自己开发的Spark任务
部署模式DeployMode
Spark On YARN是有两种运行模式的,一种是Cluster模式一种是Client模式.这两种模式的区别就是Driver运行的位置.
Cluster模式:Driver运行在YARN容器内部, 和ApplicationMaster在同一个容器内
Client模式:Driver运行在客户端进程中, 比如Driver运行在spark-submit程序的进程中
假设运行圆周率PI程序,采用client模式,命令如下:
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master yarn \ # spark on yarn
--deploy-mode client \ # client mode
--driver-memory 512m \
--executor-memory 512m \
--num-executors 1 \
--total-executor-cores 2 \
${SPARK_HOME}/examples/src/main/python/pi.py \
10
假设运行圆周率PI程序,采用cluster模式,命令如下:
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master yarn \ # spark on yarn
--deploy-mode cluster \ # cluster mode
--driver-memory 512m \
--executor-memory 512m \
--num-executors 1 \
--total-executor-cores 2 \
--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
--conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
${SPARK_HOME}/examples/src/main/python/pi.py \
10
日志:
Cluster模式下,Driver运行ApplicationMaster这个节点上,由Yarn管理,如果出现问题,yarn会重启ApplicationMaster(Driver)
两种模式详细流程
在YARN Client模式下,Driver在任务提交的本地机器上运行,示意图如下:
具体流程步骤如下:
1)Driver在任务提交的本地机器上运行,Driver启动后会和ResourceManager通讯申请启动ApplicationMaster;
2)随后ResourceManager分配Container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster的功能相当于一个ExecutorLaucher,只负责向ResourceManager申请Executor内存;
3)ResourceManager接到ApplicationMaster的资源申请后会分配Container,然后ApplicationMaster在资源分配指定的NodeManager上启动Executor进程;
4)Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数;
5)之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分Stage,每个Stage生成对应的TaskSet,之后将Task分发到各个Executor上执行。
在YARN Cluster模式下,Driver运行在NodeManager Contanier中,此时Driver与AppMaster合为一体,示意图如下:
具体流程步骤如下:
1)任务提交后会和ResourceManager通讯申请启动ApplicationMaster;
2)随后ResourceManager分配Container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster就是Driver;
3)Driver启动后向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster的资源申请后会分配Container,然后在合适的NodeManager上启动Executor进程;
4)Executor进程启动后会向Driver反向注册;
5)Executor全部注册完成后Driver开始执行main函数,之后执行到Action算子时,触发一个job,并根据宽依赖开始划分stage,每个stage生成对应的taskSet,之后将task分发到各个Executor上执行。
分布式代码执行分析
Spark Application应用程序运行时,无论client还是cluster部署模式DeployMode,当Driver Program和Executors启动完成以后,就要开始执行应用程序中MAIN函数的代码,以词频统计WordCount程序为例剖析讲解。
第一、构建SparkContext对象和关闭SparkContext资源,都是在Driver Program中执行,上图中①和③都是,如下图所示:
第二、上图中②的加载数据【A】、处理数据【B】和输出数据【C】代码,都在Executors上执行,从WEB UI监控页面可以看到此Job(RDD#action触发一个Job)对应DAG图,如下所示:
所以对于WordCount代码,简单分析后得知:
SparkContext对象的构建 以及 Spark程序的退出, 由 Driver 负责执行
具体的数据处理步骤, 由Executor在执行.
其实简单来说就是:
非数据处理的部分由Driver工作
数据处理的部分(干活)由Executor工作
要知道: Executor不仅仅是一个, 视集群规模,Executor的数量可以是很多的.
那么在这里一定要有一个概念: 代码中的数据处理部分,是由非常多的服务器(Executor)执行的.
这也是分布式代码执行的概念