文章目录
- Spark on Yarn
- 两种模式
- client
- cluster
- spark-shell 和 spark-submit 的区别的理解
- spark-shell
- spark-submit
- WorldCount实现
- IDEA本地实现
- On Yarn 实现
- WorldCount图解
Spark on Yarn
spark on yarn 的两种模式是指 spark 应用程序的 driver 进程(负责控制和协调整个应用程序的运行)在哪里运行的问题。
两种模式
client
yarn-client 模式是指 driver 运行在客户端
上,通过 application master(负责向 yarn 的 resource manager 申请资源和监控应用程序状态)来管理集群中的 executor 进程(负责执行具体的任务)。这样做的好处是可以方便地获取日志和返回信息,因为 driver 在客户端。缺点是会占用客户端的资源,可能影响性能和并发。这种模式适合开发测试环境,因为可以方便地调试和交互。
cluster
yarn-cluster 模式是指driver 运行在 yarn 集群中的一个 worker 节点上
,这个节点会在 spark web UI 上显示为 driver 节点。这样做的好处是可以节省客户端(提交 spark 应用程序的机器)的资源,不受客户端的限制。缺点是不方便查看日志和返回信息,因为 driver 不在客户端。这种模式适合生产环境,因为可以保证应用程序的稳定性和效率。
spark-shell 和 spark-submit 的区别的理解
spark-shell
spark-shell 是一个交互式的 shell,可以用来运行 spark 代码和 SQL 语句,支持 Scala 和 Python 两种语言。spark-shell 可以在本地或者集群上运行,可以方便地进行数据分析和探索,也可以用来测试和调试 spark 应用程序。spark-shell 启动时会创建两个对象,一个是 sc,一个是 spark。sc 是 SparkContext 的实例,是 spark 的底层核心对象,负责和集群进行通信和协调。spark 是 SparkSession 的实例,是 spark 的高层封装对象,提供了更多的功能和便利,如 SQL, DataFrame, Dataset 等。
spark-submit
spark-submit 是一个用来在集群上提交和运行 spark 应用程序的脚本,支持 Scala, Java 和 Python 三种语言。spark-submit 可以指定各种选项和配置,如集群管理器,部署模式,资源分配,依赖包等。spark-submit 需要提供一个包含 spark 应用程序的 jar 包或者 py 文件,以及传递给应用程序的参数。spark-submit 适合运行生产环境的 spark 应用程序,可以保证应用程序的稳定性和效率。
spark-submit 的一般语法是:
./bin/spark-submit \
–class <main-class> \
–master <master-url> \
–deploy-mode <deploy-mode> \
–conf <key>=<value> \
… # 其他选项
<application-jar> \
[application-arguments]
其中,一些常用的选项是:
–class: 应用程序的入口类(例如 org.apache.spark.examples.SparkPi)
–master: 集群的 master URL(例如 yarn)
–deploy-mode: 是否在 worker 节点上部署 driver(cluster)或者在本地作为一个外部客户端(client)(默认是 client)
–conf: 任意的 spark 配置属性,以 key=value 的格式。如果值包含空格,需要用引号括起来(如下所示)。多个配置需要用不同的参数传递。(例如 --conf <key>=<value> --conf <key2>=<value2>)
application-jar: 包含应用程序和所有依赖的 jar 包的路径。这个 URL 必须在集群中全局可见,例如,一个 hdfs:// 路径或者一个 file:// 路径,且在所有节点上存在。
WorldCount实现
IDEA本地实现
在maven中导入相应依赖
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.10</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version> </dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.12</artifactId>
<version>2.4.17</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.12</artifactId>
<version>2.4.17</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 打包插件, 否则 scala 类不会编译并打包进去 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.6</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
代码实现
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
//准备sc/SparkContext/Spark上下文执行环境
/*这段代码的意思是创建一个 SparkConf 对象,用来配置 spark 应用程序的属性。SparkConf 对象可以传递给 SparkContext,用来创建 spark 环境。
这段代码设置了两个属性:
setAppName(“wc”): 设置 spark 应用程序的名称为 “wc”,这个名称会显示在 spark web UI 上,也可以用来标识应用程序。
setMaster("local[*]"): 设置 spark 应用程序的运行模式为 local 模式,即在本地运行。
local[*] 表示使用所有可用的 CPU 核数,也可以指定具体的数字,例如 local[2] 表示使用两个 CPU 核数。*/
val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
/*这段代码的意思是创建一个 SparkContext 对象,用来初始化 spark 环境。
SparkContext 是 spark 的核心对象,负责和集群进行通信和协调,创建 RDD, DataFrame, Dataset 等分布式数据集。
这段代码传递了一个 SparkConf 对象,用来指定 spark 应用程序的配置属性,例如应用程序的名称,运行模式,资源分配等。*/
val sc: SparkContext = new SparkContext(conf)
sc.setLogLevel("WARN")
//source/读取数据
//RDD:A Resilient Distributed Dataset (RDD):弹性分布式数据集,简单理解为分布式集合!使用起来和普通集合一样简单!
//从本地文件系统中读取 data/input/words.txt 文件,创建一个 RDD[String] 类型的数据集,每个元素是文件中的一行文本。
val lines: RDD[String] = sc.textFile("data/input/words.txt")
//transformation/数据操作/转换
//对 lines 数据集进行 flatMap 转换操作,将每行文本按空格切分成单词,然后扁平化成一个 RDD[String] 类型的数据集,每个元素是一个单词。
val words: RDD[String] = lines.flatMap(_.split(" "))
//对 words 数据集进行 map 转换操作,将每个单词映射成一个 (单词, 1) 的元组,创建一个 RDD[(String, Int)] 类型的数据集,每个元素是一个 (单词, 1) 的键值对。
// RDD[(单词, 1)]
val wordAndOnes: RDD[(String, Int)] = words.map((_,1))
//分组聚合:groupBy + mapValues(_.map(_._2).reduce(_+_)) ===>在Spark里面分组+聚合一步搞定:reduceByKey
// 对 wordAndOnes 数据集进行 reduceByKey 转换操作,
// 按照单词进行分组,然后对每组的值进行累加,创建一个 RDD[(String, Int)] 类型的数据集,每个元素是一个 (单词, 频数) 的键值对。
val result: RDD[(String, Int)] = wordAndOnes.reduceByKey(_+_)
//直接输出
result.foreach(println)
//收集为本地集合再输出
println(result.collect().toBuffer)
//输出到指定path(可以是文件/夹)
/*result.repartition(1):
对 result 数据集进行 repartition 操作,将数据集的分区数调整为 1,即将所有的数据合并到一个分区中。
这样做的目的是为了方便保存到一个文件中,但是可能会影响性能和并行度。
saveAsTextFile(“data/output/result”):
对 repartition 后的数据集进行 saveAsTextFile 操作,将数据集的内容以文本格式保存到 data/output/result 目录中。
每个分区对应一个文件,文件名为 part-00000, part-00001 等。
因为 repartition 之后只有一个分区,所以只会生成一个文件,即 part-00000。*/
result.repartition(1).saveAsTextFile("data/output/result")
result.repartition(2).saveAsTextFile("data/output/result2")
//为了便于查看Web-UI可以让程序睡一会
Thread.sleep(1000 * 60)
//关闭资源
sc.stop()
}
}
On Yarn 实现
代码实现
import org.apache.hadoop.fs.{FileSystem, Path} //导入Hadoop文件系统相关的类
import org.apache.spark.{SparkConf, SparkContext} //导入Spark相关的类
object WordCount1 {
def main(args: Array[String]): Unit = {
//这里不需要设置setMaster,因为在集群运行时,可以通过 spark-submit 命令的 --master 选项来指定 master URL,而不需要在代码中设置
val conf: SparkConf = new SparkConf().setAppName("WordCount1") //创建SparkConf对象,设置应用程序名称
val sc = new SparkContext(conf) //创建SparkContext对象
val fs = FileSystem.get(sc.hadoopConfiguration) //获取Hadoop文件系统
val outPutPath = new Path("/result") //设置输出路径
if (fs.exists(outPutPath)) //如果输出路径已经存在,则删除
fs.delete(outPutPath,true)
sc.textFile("/FirstDemo.txt") //读取文本文件
.flatMap(_.split(" ")) //将每一行按空格分割成单词
.map((_, 1)) //将每个单词映射为(key, value)对,value为1
.reduceByKey(_ + _) //按key进行聚合,统计每个单词出现的次数
.saveAsTextFile("/result") //将结果保存到输出路径
sc.stop() //停止SparkContext
}
}
打包
虚拟机和hdfs
提交任务
spark-submit --class com.qst.spark.WordCount1 --master yarn --deploy-mode client /home/project/sparkdemo-1.0-SNAPSHOT.jar
Yarn端查看
hdfs端查看
命令查看结果