目录
一、分布式的思想
(一)存储
(二)计算
二、Spark 简介
(一)发展历程
(二)Spark 能做什么?
(三)spark 的组成部分
(四)各大计算引擎的对比
(五)spark 的应用
(六)spark 运行的五种模式【重点】
(七)Spark 分布式计算的步骤
(八)spark 比 MR 快的原因
三、Spark 单机模式以及测试
(一)安装步骤
(二)安装python:
(三)测试使用
启动该工具
案例一:map 算子实现分布式转换
案例二:WordCount 需求及分析
案例三:求 PI
四、结语
在大数据领域,Spark 是一款极为重要的计算引擎。本文将详细介绍 Spark 的相关知识,包括其分布式思想、自身简介、各组成部分、与其他计算引擎的对比、应用场景、运行模式、计算步骤以及它比 MapReduce 快的原因,并会涉及到 Spark 的单机模式安装和一些简单测试案例。
一、分布式的思想
(一)存储
在大数据存储中,没有一台能容纳所有数据的超级计算机。以存储 3T 文件为例,会将其拆分成多个小文件,比如每 500M 一个小文件,然后将这些小文件存储在不同的机器上,HDFS 就是这样的分布式存储系统,阿里巴巴的 FastDFS 也有类似功能。
(二)计算
- 分:对于大型计算任务,将其拆分成多个小任务,每台机器处理一个小任务,实现并行处理。
- 合:最后将各个小任务的结果合并,生成最终结果返回。像 MapReduce(Hive)就是这种分布式计算模式。需要注意的是,Spark 主要用于计算,不能解决存储问题。如果掌握了 Spark,之前的一些计算引擎可以被替代,Spark 可用于离线计算和准实时计算(目前实时计算更常用 Flink)。计算引擎有很多种,如 mapreduce (hive SQL)、impala、presto、spark、flink 等,而且存储通常还是依靠 HDFS。
- 发展历程:第一代计算引擎 MapReduce 用廉价机器实现分布式大数据处理;第二代计算引擎 Tez 基于 MR 优化了 DAG,性能更好;第三代计算引擎 Spark 是优先使用内存式计算引擎,在国内是主要应用的离线计算引擎;第四代计算引擎 Flink 是实时流式计算引擎,是国内目前最主流的实时计算引擎,计算引擎就如同汽车的发动机或计算机中的 CPU。
二、Spark 简介
(一)发展历程
Spark 诞生于 2009 年,是伯克利大学 AMPLab 的研究性项目。2014 年 2 月成为 Apache 顶级项目,同年 5 月发布 1.0 正式版本,2018 年发布的 2.4.0 版本使其成为全球最大的开源项目之一,目前我们学习的是 3.x 版本。可以在 DataBricks 官网(https://databricks.com/spark/about)和 apache 分配的网站(https://spark.apache.org/)获取更多信息。Spark 的诞生是因为 MR 计算引擎速度慢,MR 计算基于磁盘,而 Spark 计算基于内存,他是一个基于内存式计算的分布式的统一化的数据分析引擎。
(二)Spark 能做什么?
- 离线数据批处理:类似 MapReduce、Pandas,通过写代码处理离线数据。
- 交互式即时数据查询:类似 Hive、Presto、Impala,使用 SQL 进行即席查询分析,这属于 SQL 类的离线数据处理。
- 实时数据处理:类似 Storm、Flink,可实现分布式实时计算,包括代码类实时计算和 SQL 类实时计算。
- 机器学习开发:能代替传统的一些机器学习工具。
(三)spark 的组成部分
- Spark Core:是 Spark 最核心的模块,可以基于多种语言(Python、SQL、Scala、Java、R,Spark 源码是用 Scala 语言开发的)实现代码类的离线开发,类似 MR。
- Spark SQL:类似 Hive,基于 SQL 进行开发,SQL 语句会转换为 SparkCore 离线程序。
- Spark Streaming:基于 SparkCore 构建的准实时计算模块(目前已逐渐被淘汰)。
- Struct Streaming:基于 SparkSQL 构建的结构化实时计算模块,替代了 Spark Streaming。
- Spark ML lib:机器学习算法库,提供各种机器学习算法工具,可基于 SparkCore 或 SparkSQL 开发。
(四)各大计算引擎的对比
- Impala:集成 Hive 实现数据分析,性能最好,但数据接口少,只支持 Hive 和 Hbase 数据源。它是基于 CDH 的软件,写的 SQL(Impala SQL)大部分和普通 SQL 无异,操作 hive 或者 hbase 速度很快。
- Presto:集成 Hive 实现数据分析,性能适中,支持数据源广泛,但与大数据接口兼容性较差。Presto SQL 也大部分和普通 SQL 一样,可跨数据源查询,比如能关联查询 mysql 和 oracle 的表。
- SparkSQL:集成 Hive 实现数据分析,功能全面、开发接口多、学习成本低,但实时计算不够完善,实时计算通常交给 Flink。简单来说,数据量较小时,Impala 和 Presto 可以进行大数据分析,但数据量增大后,SparkSQL 更具优势。并且建议不要在项目中编写纯 hive 项目。
(五)spark 的应用
- 离线场景:可用于离线数据仓库中的数据清洗、数据分析、即席查询等。
- 实时场景:可实现实时数据流数据处理,但功能和性能相对不够完善,工作中建议使用 Flink 替代。即席查询类似于即兴发挥,和普通查询的 SQL 语句类似,只是普通查询语句是固定的,而即席查询是用户随机查询的。
(六)spark 运行的五种模式【重点】
- 本地模式(Local):一般用于测试,验证代码逻辑,不是分布式运行,只会启动 1 个进程来运行所有任务。
- 集群模式(Cluster):一般用于生产环境,实现 PySpark 程序的分布式运行。
- 1)Standalone:Standalone 是 Spark 自带的分布式资源平台,功能类似 YARN;
- 2)YARN:YARN(Spark on YARN)是将 Spark 程序提交给 YARN 运行,工作中常用;
- 3)Mesos:Mesos 类似 YARN,在国外较多见,国内少见;
- 4)K8s:K8s 是基于分布式容器的资源管理平台,属于运维层面的工具。
Spark 是分布式分析引擎,部署时是分布式的,有主节点和从节点等。如果公司已有 Yarn 分析平台,就没必要再搭建 spark 分析平台。学习过程可按照本地模式 -> Standalone -> YARN 的顺序,以后 Spark 可在 YARN 上运行。
(七)Spark 分布式计算的步骤
Spark 的计算涉及内存(存储数据,读写速度快,但会清空)和磁盘(存储数据,读写速度慢,但不会清空),还有 RDD(弹性分布式数据集)。以计算一个 list 列表中每个数的平方为例,在 Python 中是串行过程,但使用 Spark 可以利用多台电脑同时计算。
(八)spark 比 MR 快的原因
- MR 的不足:MR 不支持 DAG,计算过程固定,一个 MR 只有 1 个 Map 和 1 个 Reduce,前后过程在磁盘落地后无关联。而且 MR 是基于磁盘的计算框架,读写效率低,Task 计算是进程级别,每次运行一个 Task 都要启动和销毁进程,耗时较长。
- Spark 的优势:Spark 支持 DAG,程序中的过程由代码决定。Task 任务是线程级别的,并且计算基于内存。
三、Spark 单机模式以及测试
(一)安装步骤
通过网盘分享的:spark-3.1.2-bin-hadoop3.2.tgz
- 上传:将相关文件上传到服务器。
cd /opt/modules
- 解压:对上传的文件进行解压操作。
tar -zxf spark-3.1.2-bin-hadoop3.2.tgz -C /opt/installs
- 重命名:根据需要重命名解压后的文件或文件夹。
cd /opt/installs mv spark-3.1.2-bin-hadoop3.2 spark-local
- 创建软链接:方便文件管理和访问。
ln -s spark-local spark
- 配置环境变量:
# 编辑环境变量 vi /etc/profile # 添加以下内容 export SPARK_HOME=/opt/installs/spark export PATH=$SPARK_HOME/bin:$PATH
(二)安装python:
通过 Anaconda 安装 Python,Anaconda 不仅有 Python 还有其他功能,比单纯安装 Python 更强大,且具有资源环境隔离功能,方便不同版本和环境的测试开发。
通过网盘分享的:Anaconda3-2021.05-Linux-x86_64.sh
1)上传
cd /opt/modules
2)安装
# 添加执行权限
chmod u+x Anaconda3-2021.05-Linux-x86_64.sh
# 执行
sh ./Anaconda3-2021.05-Linux-x86_64.sh
# 过程
#第一次:【直接回车,然后按q】
Please, press ENTER to continue
>>>
#第二次:【输入yes】
Do you accept the license terms? [yes|no]
[no] >>> yes
#第三次:【输入解压路径:/opt/installs/anaconda3】
[/root/anaconda3] >>> /opt/installs/anaconda3
#第四次:【输入yes,是否在用户的.bashrc文件中初始化
Anaconda3的相关内容】
Do you wish the installer to initialize Anaconda3
by running conda init? [yes|no]
[no] >>> yes
3)刷新环境变量
# 刷新环境变量
source /root/.bashrc
# 激活虚拟环境,如果需要关闭就使用:conda deactivate
conda activate
4)输入python3 查看命令是否可用
5)配置环境变量
# 编辑环境变量
vi /etc/profile
# 添加以下内容
# Anaconda Home
export ANACONDA_HOME=/opt/installs/anaconda3
export PATH=$PATH:$ANACONDA_HOME/bin
6)刷新环境变量,并且做一个软链接
# 刷新环境变量
source /etc/profile
# 创建软连接
ln -s /opt/installs/anaconda3/bin/python3 /usr/bin/python3
# 验证
echo $ANACONDA_HOME
(三)测试使用
Spark Python Shell 是一个交互工具,可以启动spark中的交互工具,里面可以写代码。
启动该工具
# 启动Python开发Spark的交互命令行
# --master:用于指定运行的模式,--master yarn
# local[2]:使用本地模式,并且只给2CoreCPU来运行程序
/opt/installs/spark/bin/pyspark --master local[2]
案例一:map 算子实现分布式转换
SparkCore 中的函数称为算子。
需求将一个包含 1 - 10 共 10 个元素的列表,使用 Spark 实现分布式处理,输出每个元素的平方。
# 定义一个列表
list1 = [1,2,3,4,5,6,7,8,9,10]
# 将列表通过SparkContext将数据转换为一个分布式集合RDD
inputRdd = sc.parallelize(list1)
# 将RDD中每个分区的数据进行处理
rsRdd = inputRdd.map(lambda x : x**2)
# 将结果RDD的每个元素进行输出
rsRdd.foreach(lambda x : print(x))
案例二:WordCount 需求及分析
数据在 /home/data.txt
hadoop spark
hive hadoop spark spark
hue hbase hbase hue hue
hadoop spark
hive hadoop spark spark
hue hbase hbase hue hue
hadoop spark
hive hadoop spark spark
hue hbase hbase hue hue
hadoop spark
在 python 中有一个 map 函数,将集合中的每一个元素,拿出来,一个个的传递一个另一个函数,形成一个新的集合。
1、读取数据
# 将这个文件读取到Spark中,变成一个分布式列表对象
fileRdd = sc.textFile("/home/data.txt")
# 输出这个数据一共有多少行
fileRdd.count()
# 输出这个数据前3行的内容
fileRdd.take(3)
2、过滤空行
filterRdd = fileRdd.filter(lambda line :len(line.strip()) > 0)
filterRdd.count()
# 输出这个数据前3行的内容
filterRdd.take(3)
3、将每一行多个单词转换为一行一个单词
# 将每条数据中一行多个单词,变成一行一个单词
# [["hello","world"],["spark","hadoop"]] ==> ["hello","world","spark","hadoop"]
# 三体科幻电影中的二向箔
wordRdd = filterRdd.flatMap(lambda line :line.strip().split(r" "))
wordRdd.count()
wordRdd.take(10)
4、将一个单词变为一个元组
tupleRdd = wordRdd.map(lambda word : (word,1))
tupleRdd.take(10)
5、按照单词进行分组聚合
# 按照Key进行分组并且进行聚合
# tmp 是前面计算的总和,item是本次需要计算的值 10,1 == 11,下一次 tem = 11 ,item = 1
rsRdd = tupleRdd.reduceByKey(lambda tmp,item : tmp+item)
6、循环遍历
rsRdd.foreach(lambda kv : print(kv))
7、保存到本地
rsRdd.saveAsTextFile("/home/wcoutput")
以上代码可以连着写:
# 读取数据
inputRdd = sc.textFile("/home/data.txt")
# 转换数据
rsRdd = inputRdd.filter(lambda line : len(line.strip())> 0) .flatMap(lambda line :line.strip().split(r" ")).map(lambda word : (word,1)).reduceByKey(lambda tmp,item :tmp+item)
# 保存结果
rsRdd.saveAsTextFile("/home/wcoutput2")
案例三:求 PI
/opt/installs/spark/bin/spark-submit --master local[2] /opt/installs/spark/examples/src/main/python/pi.py 100
四、结语
通过本文对 Spark 的全面介绍,从其分布式思想、自身的发展历程、功能特点等,再到单机模式的搭建和测试,希望读者能够对 Spark 有一个清晰、深入的理解。Spark 作为大数据领域的重要工具,还有更多的潜力等待大家去挖掘和探索,希望这篇文章能成为大家在 Spark 学习和实践道路上的一个有力指引。