😄 之前简单了解过Spark,并简单用别人的代码跑过pyspark的数据处理和模型的分布式推理,但没做系统的总结,那这篇博客就对Spark做个基础入门讲解,看完基本就算基础入门了,后面再实操就会轻松一些。
文章目录
- 1、Spark简单介绍
- 2、搭建 PySpark
- 2.1、java8安装:
- 2.2、pyspark安装
- 3、MapReduce 原理
- 3.1、Map阶段:
- 3.2、Reduce阶段:
- 3.3、⭐细节+重点:
- 4、Spark 原理
- 4.1、Spark 优势
- 4.2、Spark 基本概念
- 4.3、RDD(Spark的基本数据结构)
- 4.4、Spark 架构
- 4.5、Spark 执行流程
- 5、Spark 部署
- 6、PySpark简单实战(词频统计)
1、Spark简单介绍
- Spark是一个快速、通用的大数据处理引擎,可以进行分布式数据处理和分析。与Hadoop的MapReduce相比,Spark具有更高的性能和更丰富的功能。Spark支持多种编程语言(如Scala、Java和Python(pyspark)),并提供了一组丰富的API,包括用于数据处理、机器学习和图计算的库。
- 据我了解,大部分公司,都会对于数据的预处理+模型的推理,都会用pyspark来做分布式处理,如模型的分布式推理(tensorflow和torch只支持分布式训练,不支持分布式预测)。
- 一般用的最多的是spark-scala(追求高性能,强于工程)和pyspark(AI算法场景,方便和其他python库配合,强于分析)。此外spark-scala支持spark graphx图计算模块,而pyspark是不支持的。
大佬说了spark-scala比较难(要学scala语言,配环境听说也很痛苦😂),但能解锁spark的所有技能;pyspark较简单,方便新手入门。所以我打算先学pyspark,再学spark-scala。当然了,学习pyspark的前提是:会python,会一丢丢SQL。
2、搭建 PySpark
单纯为了学习,配个单机版环境玩玩(无需安装hadoop, scala)
2.1、java8安装:
- jdk-1.8下载:https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
- 安装教程:https://www.runoob.com/java/java-environment-setup.html
2.2、pyspark安装
pip install pyspark -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install findspark
安装完,测试一波,输出成功即为安装成功。
import findspark
# 作用就是初始化自动找到本机安装的spark和当前运行的python环境(也可以传参指定)
findspark.init()
#%%
import pyspark
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("test").setMaster("local[4]")
sc = SparkContext(conf=conf)
#%%
print("spark version:",pyspark.__version__)
rdd = sc.parallelize(["I love you, ","spark"])
print(rdd.reduce(lambda x,y:x+' '+y))
# spark version: 3.4.0
# I love you, spark
以上为在python中通过调用pyspark库的方式和spark交互。工业界通常通过spark-submit提交spark作业的方式到集群上运行,如提交py脚本或是jar包到集群让成百上千台机器分布式跑。
3、MapReduce 原理
在学习spark之前,我简单学习了下MapReduce,因为spark作为大数据计算框架MapReduce的继任者,我觉得有必要了解下MapReduce。
- Hadoop是一个开源的分布式计算框架,用于存储和处理大规模数据集。它提供了一个可扩展的分布式文件系统(HDFS)和一个分布式计算框架(MapReduce),可以在大量廉价硬件上进行并行计算。
MapReduce也是分布式运行的,由两个阶段组成: Map和Reduce。map阶段是一个独立的程序,相当于将任务分给多个节点同时运行,每个节点做局部汇总。而reduce阶段也是一个独立的程序,也可在多个节点运行,负责对map阶段处理的局部结果进行最终的汇总。【😄类似分治的思想】
MapReduce其实就是分治(map)+规约(reduce)。下面以词频统计为例,阐述map和reduce阶段究竟在干嘛:
假设我们有一个大文件,这个文件以刚好以n个block的形式存储在hadoop集群的n个节点上。block块是文件的物理切分,在磁盘上是真实存在的。split是逻辑划分,不是对文件真正的切分,默认情况下我们可以认为一个split的大小和一个block的大 小是一样的。
3.1、Map阶段:
- 1、map阶段会把会把输入文件划分为很多InputSplit。计算程序会被分发到每一个InputSplit所在节点上进行计算(一个InputSplit对应一个map任务),各自有一个map任务。默认情况下,每个hdfs的block对应一个InputSplit。然后通过RecordReader 类,把每个InputSplit解析成一个一个的<k,v>,k代表着该数据在文件中的位置信息,v代表着数据的内容。
- 2、调用Mapper类中的map(k,v)函数,这个map函数根据自己要实现的功能来写,输出是新的<k,v>(对应上图中map的输出就是<单词,1次>)。
- 3、shuffle过程:按照新的key进行排序,然后分组,相同key的分到一组即<k, {v1, v2…}>。中间结果写入磁盘中。至此,map阶段结束。
3.2、Reduce阶段:
- 1、数据copy到多个reduce节点,调用Reducer类中的reduce(k, {v1, v2…})函数,该函数根据要实现的功能来写,输出为新的<k,v>(对应上图中<单词,频次>)。
- 2、把reduce阶段的计算结果输出存储到HDFS中。至此,reduce阶段结束。
3.3、⭐细节+重点:
-
1、map阶段可以单独使用,当不需要一些聚合操作(reduce)时,便可map阶段结束后直接输出。
-
2、reduce阶段不能脱离map阶段,必须要通过map阶段来定义哪个是key和哪个是value,以此作为reduce的输入。reduce本质上指的是reduce by key,它是将有着相同的key的数据进行合并,在map到reduce的中间过程中,会将map的结果根据这个key进行排序(sort)和分组(combine)【专业点:也就是shuffle过程】。当然了,这个过程计算框架会自动完成。
-
🔥 但是!!!重点来了!spark的MapReduce是可以不要map只要reduce的。因为spark并不需要定义数据的key是啥value是啥。所以spark还是非常灵活的。
4、Spark 原理
与Hadoop的MapReduce相比,Spark具有更高的性能和更丰富的功能。
4.1、Spark 优势
1、高效。
- Hadoop的MapReduce将中间计算结果放入磁盘中,适合处理离线的静态的大数据。
- Spark中间结果存放优先存放在内存中,内存不够再存放在磁盘中,不放入HDFS,避免了大量的IO和刷写读取操作。并通过并行计算DAG图的优化,减少了不同任务之间的依赖,降低了延迟等待时间。内存计算下,Spark 比 MapReduce 快100倍。
2、简单易用。
- MapReduce仅支持Map和Reduce两种编程算子。task以进程的方式维护,启动慢。
- Spark提供了超过80种不同的Transformation算子+Action算子,如map,reduce,filter,groupByKey,sortByKey,foreach等,并且采用函数式编程风格,实现相同的功能需要的代码量极大缩小。task以线程的方式维护,启动快。
3、通用。
Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。
4、兼容。
Spark能够跟很多开源工程兼容使用。如Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且Spark可以读取多种数据源,如HDFS、HBase、MySQL等。
4.2、Spark 基本概念
Spark应用程序即Application由多个Job组成,Job由多个Stage组成,Stage由多个Task组成。Stage是作业调度的基本单位。
-
RDD:是弹性分布式数据集(Resilient Distributed Dataset)的简称(Spark的基本数据结构),是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型。是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用。它是被分区的,一个RDD可存在多个分区中,每个分区分布在集群中的不同Worker Node上,从而让RDD中的数据可以被并行操作。
-
DAG:是Directed Acyclic Graph(有向无环图)的简称,反映RDD之间的依赖关系。
-
Cluster Manager:集群资源管理中心,负责分配计算资源。
-
Worker Node:工作节点,负责完成具体计算。
-
Executor:是运行在工作节点(Worker Node)上的一个进程,负责运行Task,并为应用程序存储数据。
-
Application:用户编写的Spark应用程序,一个Application包含多个Job。
-
Driver Program:控制程序,负责为Application构建DAG图。
-
Stage:阶段,是作业的基本调度单位,一个job会分多个stage。
-
Task:任务,运行在Executor上的工作单元,是Executor中的一个线程。
4.3、RDD(Spark的基本数据结构)
上面介绍了RDD的概念,总结:弹性分布式数据集。不可变、可分区、里面的元素可并行计算的集合。
有2钟方式创建RDD
import pyspark
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("test").setMaster("local[4]")
sc = SparkContext(conf=conf)
# 法1:通过读取文件中的数据生成RDD
rdd = sc.textFile("hdfs://hans/data_warehouse/test/data")
# 法2:通过将内存中的对象并行化得到RDD
arr = [1,2,3,4,5]
rdd = sc.parallelize(arr)
创建了RDD后,便可对RDD做各种操作(2种):
- 1、transformation:从已经存在的RDD创建一个新的RDD。transformation具有 Lazy 特性,即 Spark 不会立刻进行实际的计算,只会记录执行的轨迹,只有触发Action操作的时候,它才会根据 DAG 图真正执行。
- 2、action:在RDD上进行计算后返回结果到 Driver。
这样一来,各种操作就确定了RDD之间的依赖关系了(窄依赖和宽依赖)。而依赖关系也就确定了DAG图,而DAG图也就指定了切割stage的方式。
- 窄依赖:父RDD的一个分区只会被子RDD的一个分区依赖。如map操作。
- 宽依赖:父RDD的一个分区会被子RDD的多个分区依赖,涉及Shuffle。如reducebykey操作。
DAG会提交给DAGScheduler,DAGScheduler会把DAG划分成相互依赖的多个stage,划分stage的依据就是RDD之间的宽窄依赖。遇到宽依赖就切割stage,每个stage包含一个或多个task任务。然后将这些task以taskSet的形式提交给TaskScheduler运行。
4.4、Spark 架构
- Spark集群由Driver, Cluster Manager(Standalone,Yarn 或 Mesos),以及Worker Node组成。
- SparkContext是spark功能的主要入口。其代表与spark集群的连接,能够用来在集群上创建RDD、累加器、广播变量。每个JVM里只能存在一个处于激活状态的SparkContext,在创建新的SparkContext之前必须调用stop()来关闭之前的SparkContext。
- 每一个Spark应用都是一个SparkContext实例,可以理解为一个SparkContext就是一个spark application的生命周期,一旦SparkContext创建之后,就可以用这个SparkContext来创建RDD、累加器、广播变量,并且可以通过SparkContext访问Spark的服务,运行任务。SparkContext设置内部服务,并建立与spark执行环境的连接。
- pyspark只是在spark的外围包装了一个python api方便调用。在Driver端,借助Py4j实现Python和Java的交互,也就可以用python写spark应用程序了。在Executor端,则不需要借助Py4j,因为Executor端运行的Task逻辑是由Driver发过来的,那是序列化后的字节码。
4.5、Spark 执行流程
-
1、Application首先被Driver构建DAG图并分解成Stage。
-
2、然后Driver向Cluster Manager申请资源。
-
3、Cluster Manager向某些Work Node发送征召信号。
-
4、被征召的Work Node启动Executor进程响应征召,并向Driver申请任务。
-
5、Driver分配Task给Work Node。
-
6、Executor以Stage为单位执行Task,期间Driver进行监控。
-
7、Driver收到Executor任务完成的信号后向Cluster Manager发送注销信号。
-
8、Cluster Manager向Work Node发送释放资源信号。
-
9、Work Node对应Executor停止运行。
5、Spark 部署
6、PySpark简单实战(词频统计)
读取本地文件(当然也可hdfs)转成RDD,然后就可以进行各种操作了。
words.txt内容:
hello world
hello spark
spark love jupyter
spark love pandas
spark love sql
import findspark
# 作用就是初始化自动找到本机安装的spark和当前运行的python环境(也可以传参指定)
findspark.init()
import pyspark
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("test").setMaster("local[4]")
sc = SparkContext(conf=conf)
#%%
rdd_line = sc.textFile("./words.txt")
rdd_word = rdd_line.flatMap(lambda x:x.split(" "))
rdd_one = rdd_word.map(lambda t:(t,1))
rdd_count = rdd_one.reduceByKey(lambda x,y:x+y) # 分到同个组的词频相加。
rdd_count.collect()
输出:
[('world', 1),
('love', 3),
('jupyter', 1),
('pandas', 1),
('hello', 2),
('spark', 4),
('sql', 1)]
Reference
- [1] 梁云大佬的: https://github.com/lyhue1991/eat_pyspark_in_10_days。也欢迎大家给梁云大佬点点关注,他的微信公众号叫:“算法美食屋”