📋 博主简介
- 💖 作者简介:大家好,我是wux_labs。😜
热衷于各种主流技术,热爱数据科学、机器学习、云计算、人工智能。
通过了TiDB数据库专员(PCTA)、TiDB数据库专家(PCTP)、TiDB数据库认证SQL开发专家(PCSD)认证。
通过了微软Azure开发人员、Azure数据工程师、Azure解决方案架构师专家认证。
对大数据技术栈Hadoop、Hive、Spark、Kafka等有深入研究,对Databricks的使用有丰富的经验。- 📝 个人主页:wux_labs,如果您对我还算满意,请关注一下吧~🔥
- 📝 个人社区:数据科学社区,如果您是数据科学爱好者,一起来交流吧~🔥
- 🎉 请支持我:欢迎大家 点赞👍+收藏⭐️+吐槽📝,您的支持是我持续创作的动力~🔥
《PySpark大数据分析实战》-07.Spark本地模式安装
- 《PySpark大数据分析实战》-07.Spark本地模式安装
- 前言
- Spark本地模式安装
- 使用交互式pyspark运行代码
- 使用spark-submit提交代码
- 结束语
《PySpark大数据分析实战》-07.Spark本地模式安装
前言
大家好!今天为大家分享的是《PySpark大数据分析实战》第2章第2节的内容:Spark本地模式安装。
Spark本地模式安装
Spark本地模式即单机模式,是以一个独立的进程,通过其内部的多个线程来模拟整个Spark运行时环境,本地模式只需要在1台服务器上安装Spark。本地模式的安装非常简单,将下载的Spark软件安装包解压到目标位置即安装完成,解压安装包的命令如下:
$ tar -xzf spark-3.4.0-bin-hadoop3.tgz -C apps
解压后的Spark目录结构如图所示。
- bin目录存放的是提交Spark应用程序需要用到的命令,例如pyspark、spark-submit等命令。
- conf目录存放的是Spark的配置文件,这里可以配置Spark的部署模式,例如独立集群信息、YARN信息。
- jars目录存放的是Spark的依赖软件包,Spark各个组件的核心代码都存放在这里,与第三方框架集成,例如MySQL、Kafka等,用到的依赖包也需要添加到jars目录下。
- sbin目录下存放的是Spark集群管理相关的可执行命令,例如启动、停止集群的相关命令。
使用交互式pyspark运行代码
解压安装完成后,验证安装结果,在没有配置相关的环境变量时,pyspark不能直接在任意路径执行,将工作目录切换到Spark的安装目录,在此执行相关命令。命令如下:
$ cd apps/spark-3.4.0-bin-hadoop3/
$ bin/pyspark
pyspark命令执行后,会进入交互式解释器环境,如图所示。
从交互式解释器环境可以知道:
- Spark的版本是version 3.4.0。
- Python的版本是version 3.8.10。
- Spark Driver Web UI的地址是http://node1.internal.cloudapp.net:4040。
- 环境实例化了一个SparkContext对象,名为sc。
- 当前环境的master是local[*]。
- 环境实例化了一个SparkSession对象,名为spark。
通过浏览器访问Spark Driver Web UI地址,打开的Spark Driver Web UI界面如图所示。
在Spark Driver Web UI中:
- Jobs页面可以查看根据Spark应用程序创建的Job信息,当前没有任何Job在运行。
- Stages页面可以查看每个任务的Stage的划分。
- Storage页面可以查看Spark应用程序缓存在内存或磁盘中的数据的详细信息,包括缓存级别、大小、分区数等信息。
- Environment页面可以查看Spark的环境信息。
- Executors页面可以查看Spark环境中Executors的列表信息。
本地模式环境下,只有一个driver,不含其他Executor,如图所示。
下面通过pyspark交互式命令行提交Spark代码来执行,以经典的WordCount程序来验证Spark环境。将文件words.txt放到服务器上,目前还没有部署HDFS,如果有额外的HDFS也可以上传到HDFS。文件内容如下:
Hello Python
Hello Spark You
Hello Python Spark
You know PySpark
编写WordCount的程序代码,实现文件的读取,按单词统计单词在文件中出现的次数。代码如下:
count = sc.textFile("/home/hadoop/words.txt") \
.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (x, 1)) \
.reduceByKey(lambda a,b: a + b).collect()
print(count)
代码运行完成,统计出words.txt文件中的单词出现的次数,统计结果为3个Hello、2个Python、2个Spark、1个know、1个PySpark和2个You,如图所示。
代码执行完成, Spark Driver Web UI中的数据会发生变化。提交的Spark应用程序的Job列表中,当前运行完成的Job有1个,如图所示。
点击Job列表中的链接,可以看到Job的详细信息,包括Job的执行流程DAG图、Stage的划分、Stage列表等,当前Job被划分成两个Stage,如图所示。
点击Stage列表中的链接,可以看到Stage的详细信息,包括Stage概览信息、执行流程DAG图、Task列表等,如图所示。
在页面底部可以看到Stage中Task的划分情况,当前Stage包含两个Task,如图所示。
使用spark-submit提交代码
交互式pyspark命令行并不适合用于生产环境提交代码执行,在生产环境上运行Spark应用程序,需要将代码开发写入Python文件,将文件保存到系统中的某个路径下,比如/home/hadoop/WordCount.py。WordCount.py的代码如下:
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("WordCount")
# 通过SparkConf对象构建SparkContext对象
sc = SparkContext(conf=conf)
# 通过SparkContext对象读取文件
fileRdd = sc.textFile("/home/hadoop/words.txt")
# 将文件中的每一行按照空格拆分成单词
wordsRdd = fileRdd.flatMap(lambda line: line.split(" "))
# 将每一个单词转换为元组,
wordRdd = wordsRdd.map(lambda x: (x, 1))
# 根据元组的key分组,将value相加
resultRdd = wordRdd.reduceByKey(lambda a, b: a + b)
# 将结果收集到Driver并打印输出
print(resultRdd.collect())
使用spark-submit命令进行提交运行。spark-submit命令如下:
spark-submit [options] <app jar | python file | R file> [app arguments]
将其中的参数替换为具体的值,在不设置任何选项或者参数的情况下,则只需要指定python file。具体执行命令如下:
$ bin/spark-submit /home/hadoop/WordCount.py
执行命令后,Spark启动相关的进程,进行数据文件处理,输出处理过程中的日志,日志内容如下:
INFO SparkContext: Running Spark version 3.4.0
...
INFO SparkContext: Submitted application: WordCount
...
INFO Utils: Successfully started service 'SparkUI' on port 4040.
INFO Executor: Starting executor ID driver on host node1.internal.cloudapp.net
...
INFO SparkContext: Starting job: collect at /home/hadoop/WordCount.py:16
INFO DAGScheduler: Registering RDD 3 (reduceByKey at /home/hadoop/WordCount.py:14) as input to shuffle 0
INFO DAGScheduler: Got job 0 (collect at /home/hadoop/WordCount.py:16) with 2 output partitions
INFO DAGScheduler: Final stage: ResultStage 1 (collect at /home/hadoop/WordCount.py:16)
...
INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
INFO HadoopRDD: Input split: file:/home/hadoop/words.txt:32+33
INFO HadoopRDD: Input split: file:/home/hadoop/words.txt:0+32
...
INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1678 bytes result sent to driver
INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1678 bytes result sent to driver
...
INFO DAGScheduler: looking for newly runnable stages
...
INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
INFO Executor: Running task 1.0 in stage 1.0 (TID 3)
...
INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 1732 bytes result sent to driver
INFO Executor: Finished task 1.0 in stage 1.0 (TID 3). 1628 bytes result sent to driver
...
INFO DAGScheduler: Job 0 finished: collect at /home/hadoop/WordCount.py:16, took 2.965328 s
[('Hello', 3), ('Python', 2), ('Spark', 2), ('know', 1), ('PySpark', 1), ('You', 2)]
INFO SparkContext: Invoking stop() from shutdown hook
INFO SparkUI: Stopped Spark web UI at http://node1.internal.cloudapp.net:4040
...
INFO SparkContext: Successfully stopped SparkContext
...
通过输出的日志可以了解到Spark应用程序执行情况:
- 应用名称是WordCount。
- Spark Driver Web UI端口是4040。
- collect算子触发job的创建执行,首先会构建DAG图及划分Stage。
- Stage按顺序执行,Stage 0中实现了将文件拆分成两个分区。
- collect输出最终的执行结果。
- 应用程序执行完成后会停止Spark Driver Web UI和SparkContext。
由于Spark Driver Web UI已经停止,所以无法通过浏览器打开查看应用程序的执行情况。
结束语
好了,感谢大家的关注,今天就分享到这里了,更多详细内容,请阅读原书或持续关注专栏。