视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili
- 尚硅谷大数据技术Spark教程-笔记01【Spark(概述、快速上手、运行环境、运行架构)】
- 尚硅谷大数据技术Spark教程-笔记02【SparkCore(核心编程,RDD-核心属性-执行原理-基础编程-并行度与分区-转换算子)】
- 尚硅谷大数据技术Spark教程-笔记03【SparkCore(核心编程,RDD-转换算子-案例实操)】
- 尚硅谷大数据技术Spark教程-笔记04【SparkCore(核心编程,RDD-行动算子-序列化-依赖关系-持久化-分区器-文件读取与保存)】
- 尚硅谷大数据技术Spark教程-笔记05【SparkCore(核心编程,累加器-实现原理-基础编程)】
目录
01_尚硅谷大数据技术之SparkCore
第05章-Spark核心编程
P081【081.尚硅谷_SparkCore - 核心编程 - RDD - 行动算子 - 介绍】04:32
P082【082.尚硅谷_SparkCore - 核心编程 - RDD - 行动算子 - 算子演示】08:00
P083【083.尚硅谷_SparkCore - 核心编程 - RDD - 行动算子 - aggregate】04:25
P084【084.尚硅谷_SparkCore - 核心编程 - RDD - 行动算子 - countByKey & countByValue】04:45
P085【085.尚硅谷_SparkCore - 核心编程 - RDD - WordCount不同的实现方式 - (1-8)】10:16
P086【086.尚硅谷_SparkCore - 核心编程 - RDD - WordCount不同的实现方式 - (9-11)】06:03
P087【087.尚硅谷_SparkCore - 核心编程 - RDD - 行动算子 - save的方法】03:41
P088【088.尚硅谷_SparkCore - 核心编程 - RDD - 行动算子 - foreach】11:37
P089【089.尚硅谷_SparkCore - 核心编程 - RDD - 序列化 - 闭包检测】14:10
P090【090.尚硅谷_SparkCore - 核心编程 - RDD - 序列化 - 实际执行时的问题】12:04
P091【091.尚硅谷_SparkCore - 核心编程 - RDD - 序列化 - Kryo序列化Core介绍】10:07
P092【092.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 依赖 & 血缘关系介绍】05:17
P093【093.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 血缘关系 - 演示】11:36
P094【094.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 宽窄依赖】11:36
P095【095.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 阶段&分区&任务 - 概念解析 - 秋游了】09:41
P096【096.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 阶段划分源码解读】11:31
P097【097.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 任务划分源码解读】08:57
P098【098.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 任务分类】02:52
P099【099.尚硅谷_SparkCore - 核心编程 - RDD - 持久化 - cache & persist基本原理和演示】14:46
P100【100.尚硅谷_SparkCore - 核心编程 - RDD - 持久化 - 作用】05:19
P101【101.尚硅谷_SparkCore - 核心编程 - RDD - 持久化 - 检查点】03:00
P102【102.尚硅谷_SparkCore - 核心编程 - RDD - 持久化 - 区别】11:48
P103【103.尚硅谷_SparkCore - 核心编程 - RDD - 分区器 - 自定义数据分区规则】09:02
P104【104.尚硅谷_SparkCore - 核心编程 - RDD - 文件读取与保存】04:36
01_尚硅谷大数据技术之SparkCore
第05章-Spark核心编程
P081【081.尚硅谷_SparkCore - 核心编程 - RDD - 行动算子 - 介绍】04:32
5.1.4.5 RDD行动算子
package com.atguigu.bigdata.spark.core.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4))
// TODO - 行动算子
// 所谓的行动算子,其实就是触发作业(Job)执行的方法
// 底层代码调用的是环境对象的runJob方法
// 底层代码中会创建ActiveJob,并提交执行。
rdd.collect()
sc.stop()
}
}
P082【082.尚硅谷_SparkCore - 核心编程 - RDD - 行动算子 - 算子演示】08:00
5.1.4.5 RDD行动算子
- reduce
- collect
- count
- first
- take
- takeOrdered
package com.atguigu.bigdata.spark.core.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4))
// TODO - 行动算子
// 1、reduce
val i: Int = rdd.reduce(_ + _)
println(i) //10
// 2、collect:方法会将不同分区的数据按照分区顺序采集到Driver端内存中,形成数组
val ints000: Array[Int] = rdd.collect()
println(ints000.mkString(",")) //1,2,3,4
// 3、count:数据源中数据的个数
val cnt = rdd.count()
println(cnt) //4
// 4、first:获取数据源中数据的第一个
val first = rdd.first()
println(first) //1
// 5、take:获取N个数据
val ints: Array[Int] = rdd.take(3)
println(ints.mkString(",")) //1,2,3
// 6、takeOrdered:数据排序后,取N个数据
val rdd1 = sc.makeRDD(List(4, 2, 3, 1))
val ints1: Array[Int] = rdd1.takeOrdered(3)
println(ints1.mkString(",")) //1,2,3
sc.stop()
}
}
P083【083.尚硅谷_SparkCore - 核心编程 - RDD - 行动算子 - aggregate】04:25
5.1.4.5 RDD行动算子
7、aggregate,函数说明:分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合。
package com.atguigu.bigdata.spark.core.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark03_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
// TODO - 行动算子
// 10 + 13 + 17 = 40
// aggregateByKey:初始值只会参与分区内计算
// aggregate:初始值会参与分区内计算,并且和参与分区间计算
val result1 = rdd.aggregate(10)(_ + _, _ + _)
println(result1) //40
sc.stop()
}
}
P084【084.尚硅谷_SparkCore - 核心编程 - RDD - 行动算子 - countByKey & countByValue】04:45
5.1.4.5 RDD行动算子
8、fold
9、countByKey
package com.atguigu.bigdata.spark.core.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark03_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
// TODO - 行动算子
// 10 + 13 + 17 = 40
// aggregateByKey:初始值只会参与分区内计算
// aggregate:初始值会参与分区内计算,并且和参与分区间计算
val result1 = rdd.aggregate(10)(_ + _, _ + _)
println(result1) //40
val result2 = rdd.fold(10)(_ + _)
println(result2) //40
sc.stop()
}
}
package com.atguigu.bigdata.spark.core.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark04_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 1, 1, 4), 2)
val intToLong: collection.Map[Int, Long] = rdd.countByValue()
println(intToLong) //Map(4 -> 1, 1 -> 3)
val rdd2 = sc.makeRDD(List(
("a", 1), ("a", 2), ("a", 3)
))
val stringToLong: collection.Map[String, Long] = rdd2.countByKey()
println(stringToLong) //Map(a -> 3)
sc.stop()
}
}
P085【085.尚硅谷_SparkCore - 核心编程 - RDD - WordCount不同的实现方式 - (1-8)】10:16
package com.atguigu.bigdata.spark.core.wc
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
object Spark03_WordCount {
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(sparConf)
wordcount91011(sc)
sc.stop()
}
// groupBy
def wordcount1(sc: SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val group: RDD[(String, Iterable[String])] = words.groupBy(word => word)
val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
}
// groupByKey
def wordcount2(sc: SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val group: RDD[(String, Iterable[Int])] = wordOne.groupByKey()
val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
}
// reduceByKey
def wordcount3(sc: SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val wordCount: RDD[(String, Int)] = wordOne.reduceByKey(_ + _)
}
// aggregateByKey
def wordcount4(sc: SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val wordCount: RDD[(String, Int)] = wordOne.aggregateByKey(0)(_ + _, _ + _)
}
// foldByKey
def wordcount5(sc: SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val wordCount: RDD[(String, Int)] = wordOne.foldByKey(0)(_ + _)
}
// combineByKey
def wordcount6(sc: SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val wordCount: RDD[(String, Int)] = wordOne.combineByKey(
v => v,
(x: Int, y) => x + y,
(x: Int, y: Int) => x + y
)
}
// countByKey
def wordcount7(sc: SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val wordCount: collection.Map[String, Long] = wordOne.countByKey()
}
// countByValue
def wordcount8(sc: SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordCount: collection.Map[String, Long] = words.countByValue()
}
}
P086【086.尚硅谷_SparkCore - 核心编程 - RDD - WordCount不同的实现方式 - (9-11)】06:03
package com.atguigu.bigdata.spark.core.wc
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
object Spark03_WordCount {
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(sparConf)
wordcount91011(sc)
sc.stop()
}
// reduce, aggregate, fold
def wordcount91011(sc: SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
// 【(word, count),(word, count)】
// word => Map[(word,1)]
val mapWord = words.map(
word => {
mutable.Map[String, Long]((word, 1))
}
)
val wordCount = mapWord.reduce(
(map1, map2) => {
map2.foreach {
case (word, count) => {
val newCount = map1.getOrElse(word, 0L) + count
map1.update(word, newCount)
}
}
map1
}
)
println(wordCount)
}
}
P087【087.尚硅谷_SparkCore - 核心编程 - RDD - 行动算子 - save的方法】03:41
5.1.4.5 RDD行动算子
10、save相关算子
package com.atguigu.bigdata.spark.core.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark05_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// val rdd = sc.makeRDD(List(1, 1, 1, 4), 2)
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("a", 3)
))
// TODO - 行动算子
rdd.saveAsTextFile("output007")
rdd.saveAsObjectFile("output008")
// saveAsSequenceFile方法要求数据的格式必须为K-V类型
rdd.saveAsSequenceFile("output009")
sc.stop()
}
}
P088【088.尚硅谷_SparkCore - 核心编程 - RDD - 行动算子 - foreach】11:37
5.1.4.5 RDD行动算子
11、foreach
package com.atguigu.bigdata.spark.core.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark06_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4))
// foreach 其实是Driver端内存集合的循环遍历方法
rdd.collect().foreach(println)
println("******************")
// foreach 其实是Executor端内存数据打印
rdd.foreach(println)
// 算子 : Operator(操作)
// RDD的方法和Scala集合对象的方法不一样
// 集合对象的方法都是在同一个节点的内存中完成的。
// RDD的方法可以将计算逻辑发送到Executor端(分布式节点)执行
// 为了区分不同的处理效果,所以将RDD的方法称之为算子。
// RDD的方法外部的操作都是在Driver端执行的,而方法内部的逻辑代码是在Executor端执行。
sc.stop()
}
}
P089【089.尚硅谷_SparkCore - 核心编程 - RDD - 序列化 - 闭包检测】14:10
5.1.4.5 RDD行动算子
11、foreach
package com.atguigu.bigdata.spark.core.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark07_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List[Int]())
val user = new User()
// SparkException: Task not serializable
// NotSerializableException: com.atguigu.bigdata.spark.core.rdd.operator.action.Spark07_RDD_Operator_Action$User
// RDD算子中传递的函数是会包含闭包操作,那么就会进行检测功能
// 闭包检测
rdd.foreach(
num => {
println("age = " + (user.age + num))
}
)
sc.stop()
}
//class User extends Serializable {
// 样例类在编译时,会自动混入序列化特质(实现可序列化接口)
case class User() {// class User {
var age: Int = 30
}
}
P090【090.尚硅谷_SparkCore - 核心编程 - RDD - 序列化 - 实际执行时的问题】12:04
5.1.4.6 RDD序列化
1)闭包检查
从计算的角度,算子以外的代码都是在Driver端执行,算子里面的代码都是在 Executor 端执行。那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给Executor 端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。Scala2.12 版本后闭包编译方式发生了改变
2)序列化方法和属性
从计算的角度,算子以外的代码都是在Driver 端执行, 算子里面的代码都是在Executor端执行,看如下代码:
package com.atguigu.bigdata.spark.core.rdd.serial
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_Serial {
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(sparConf)
val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "atguigu"))
val search = new Search("h")
//search.getMatch1(rdd).collect().foreach(println)
search.getMatch2(rdd).collect().foreach(println)
sc.stop()
}
// 查询对象
// 类的构造参数其实是类的属性, 构造参数需要进行闭包检测,其实就等同于类进行闭包检测
class Search(query: String) {
def isMatch(s: String): Boolean = {
s.contains(this.query)
}
//函数序列化案例
def getMatch1(rdd: RDD[String]): RDD[String] = {
rdd.filter(isMatch)
}
// 属性序列化案例
def getMatch2(rdd: RDD[String]): RDD[String] = {
val s = query
rdd.filter(x => x.contains(s))
}
}
}
P091【091.尚硅谷_SparkCore - 核心编程 - RDD - 序列化 - Kryo序列化Core介绍】10:07
5.1.4.6 RDD序列化
3) Kryo 序列化框架
参考地址: https://github.com/EsotericSoftware/kryo
Java 的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。Spark 出于性能的考虑,Spark2.0 开始支持另外一种Kryo 序列化机制。Kryo 速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化。
注意:即使使用 Kryo 序列化,也要继承 Serializable 接口。
object serializable_Kryo {
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf()
.setAppName("SerDemo")
.setMaster("local[*]")
// 替换默认的序列化机制
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册需要使用 kryo 序列化的自定义类
.registerKryoClasses(Array(classOf[Searcher])) val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello atguigu", "atguigu", "hahah"), 2)
val searcher = new Searcher("hello")
val result: RDD[String] = searcher.getMatchedRDD1(rdd)
result.collect.foreach(println)
}
}
case class Searcher(val query: String) {
}
def isMatch(s: String) = { s.contains(query)
}
def getMatchedRDD1(rdd: RDD[String]) = { rdd.filter(isMatch)
}
def getMatchedRDD2(rdd: RDD[String]) = { val q = query rdd.filter(_.contains(q))
}
P092【092.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 依赖 & 血缘关系介绍】05:17
5.1.4.7 RDD依赖关系
1) RDD血缘关系
P093【093.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 血缘关系 - 演示】11:36
package com.atguigu.bigdata.spark.core.rdd.dep
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark01_RDD_Dep {
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(sparConf)
val lines: RDD[String] = sc.textFile("datas/word.txt")
println(lines.toDebugString)
println("*************************")
val words: RDD[String] = lines.flatMap(_.split(" "))
println(words.toDebugString)
println("*************************")
val wordToOne = words.map(word => (word, 1))
println(wordToOne.toDebugString)
println("*************************")
val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
println(wordToSum.toDebugString)
println("*************************")
val array: Array[(String, Int)] = wordToSum.collect()
array.foreach(println)
sc.stop()
}
}
P094【094.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 宽窄依赖】11:36
5.1.4.8 RDD持久化
2) RDD CheckPoint 检查点
3) RDD窄依赖
窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用,窄依赖我们形象的比喻为独生子女。
4) RDD宽依赖
宽依赖表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会引起 Shuffle,总结:宽依赖我们形象的比喻为多生。
P095【095.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 阶段&分区&任务 - 概念解析 - 秋游了】09:41
5.1.4.8 RDD持久化
5) RDD 阶段划分
P096【096.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 阶段划分源码解读】11:31
5.1.4.8 RDD持久化
6) RDD阶段划分源码
P097【097.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 任务划分源码解读】08:57
5.1.4.8 RDD持久化
7) RDD 任务划分
RDD任务切分中间分为:Application、Job、Stage 和 Task
- Application:初始化一个 SparkContext 即生成一个Application;
- Job:一个Action 算子就会生成一个Job;
- Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;
- Task:一个 Stage 阶段中,最后一个RDD 的分区个数就是Task 的个数。
注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。
P098【098.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 任务分类】02:52
5.1.4.8 RDD持久化
7) RDD 任务划分
8) RDD任务划分源码
val tasks: Seq[Task[_]] = try { stage match {
case stage: ShuffleMapStage => partitionsToCompute.map { id =>
val locs = taskIdToLocations(id) val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId)
}
case stage: ResultStage => partitionsToCompute.map { id =>
val p: Int = stage.partitions(id) val part = stage.rdd.partitions(p) val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
}
}
……
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
……
override def findMissingPartitions(): Seq[Int] = { mapOutputTrackerMaster
.findMissingPartitions(shuffleDep.shuffleId)
.getOrElse(0 until numPartitions)
}
P099【099.尚硅谷_SparkCore - 核心编程 - RDD - 持久化 - cache & persist基本原理和演示】14:46
5.1.4.8 RDD 持久化
1) RDD Cache 缓存
package com.atguigu.bigdata.spark.core.rdd.persist
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_Persist {
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(sparConf)
val list = List("Hello Scala", "Hello Spark")
val rdd = sc.makeRDD(list)
val flatRDD = rdd.flatMap(_.split(" "))
val mapRDD = flatRDD.map((_, 1))
val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
reduceRDD.collect().foreach(println)
println("**************************************")
val rdd1 = sc.makeRDD(list)
val flatRDD1 = rdd1.flatMap(_.split(" "))
val mapRDD1 = flatRDD1.map((_, 1))
val groupRDD = mapRDD1.groupByKey()
groupRDD.collect().foreach(println)
sc.stop()
}
}
package com.atguigu.bigdata.spark.core.rdd.persist
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_Persist {
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("Persist")
val sc = new SparkContext(sparConf)
val list = List("Hello Scala", "Hello Spark")
val rdd = sc.makeRDD(list)
val flatRDD = rdd.flatMap(_.split(" "))
val mapRDD = flatRDD.map(word => {
println("@@@@@@@@@@@@")
(word, 1)
})
val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
reduceRDD.collect().foreach(println)
println("**************************************")
val groupRDD = mapRDD.groupByKey()
groupRDD.collect().foreach(println)
sc.stop()
}
}
package com.atguigu.bigdata.spark.core.rdd.persist
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
object Spark03_RDD_Persist {
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("Persist")
val sc = new SparkContext(sparConf)
val list = List("Hello Scala", "Hello Spark")
val rdd = sc.makeRDD(list)
val flatRDD = rdd.flatMap(_.split(" "))
val mapRDD = flatRDD.map(word => {
println("@@@@@@@@@@@@")
(word, 1)
})
// cache默认持久化的操作,只能将数据保存到内存中,如果想要保存到磁盘文件,需要更改存储级别
// mapRDD.cache()
// 持久化操作必须在行动算子执行时完成的。
mapRDD.persist(StorageLevel.DISK_ONLY)
val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
reduceRDD.collect().foreach(println)
println("**************************************")
val groupRDD = mapRDD.groupByKey()
groupRDD.collect().foreach(println)
sc.stop()
}
}
P100【100.尚硅谷_SparkCore - 核心编程 - RDD - 持久化 - 作用】05:19
5.1.4.8 RDD 持久化
1) RDD Cache 缓存
P101【101.尚硅谷_SparkCore - 核心编程 - RDD - 持久化 - 检查点】03:00
5.1.4.8 RDD 持久化
2) RDD CheckPoint 检查点
package com.atguigu.bigdata.spark.core.rdd.persist
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
object Spark04_RDD_Persist {
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("Persist")
val sc = new SparkContext(sparConf)
sc.setCheckpointDir("cp")
val list = List("Hello Scala", "Hello Spark")
val rdd = sc.makeRDD(list)
val flatRDD = rdd.flatMap(_.split(" "))
val mapRDD = flatRDD.map(word => {
println("@@@@@@@@@@@@")
(word, 1)
})
// checkpoint 需要落盘,需要指定检查点保存路径
// 检查点路径保存的文件,当作业执行完毕后,不会被删除
// 一般保存路径都是在分布式存储系统中:HDFS
mapRDD.checkpoint()
val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
reduceRDD.collect().foreach(println)
println("**************************************")
val groupRDD = mapRDD.groupByKey()
groupRDD.collect().foreach(println)
sc.stop()
}
}
P102【102.尚硅谷_SparkCore - 核心编程 - RDD - 持久化 - 区别】11:48
5.1.4.8 RDD 持久化
3) 缓存和检查点区别
- 1)Cache 缓存只是将数据保存起来,不切断血缘依赖。Checkpoint 检查点切断血缘依赖。
- 2)Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint 的数据通常存储在 HDFS 等容错、高可用的文件系统,可靠性高。
- 3)建议对 checkpoint()的 RDD 使用 Cache 缓存,这样 checkpoint 的 job 只需从 Cache 缓存中读取数据即可,否则需要再从头计算一次 RDD。
package com.atguigu.bigdata.spark.core.rdd.persist
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark05_RDD_Persist {
def main(args: Array[String]): Unit = {
// cache : 将数据临时存储在内存中进行数据重用
// persist : 将数据临时存储在磁盘文件中进行数据重用
// 涉及到磁盘IO,性能较低,但是数据安全
// 如果作业执行完毕,临时保存的数据文件就会丢失
// checkpoint : 将数据长久地保存在磁盘文件中进行数据重用
// 涉及到磁盘IO,性能较低,但是数据安全
// 为了保证数据安全,所以一般情况下,会独立执行作业
// 为了能够提高效率,一般情况下,是需要和cache联合使用
val sparConf = new SparkConf().setMaster("local").setAppName("Persist")
val sc = new SparkContext(sparConf)
sc.setCheckpointDir("cp")
val list = List("Hello Scala", "Hello Spark")
val rdd = sc.makeRDD(list)
val flatRDD = rdd.flatMap(_.split(" "))
val mapRDD = flatRDD.map(word => {
println("@@@@@@@@@@@@")
(word, 1)
})
mapRDD.cache()
mapRDD.checkpoint()
val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
reduceRDD.collect().foreach(println)
println("**************************************")
val groupRDD = mapRDD.groupByKey()
groupRDD.collect().foreach(println)
sc.stop()
}
}
package com.atguigu.bigdata.spark.core.rdd.persist
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark06_RDD_Persist {
def main(args: Array[String]): Unit = {
// cache : 将数据临时存储在内存中进行数据重用
// 会在血缘关系中添加新的依赖。一旦,出现问题,可以重头读取数据
// persist : 将数据临时存储在磁盘文件中进行数据重用
// 涉及到磁盘IO,性能较低,但是数据安全
// 如果作业执行完毕,临时保存的数据文件就会丢失
// checkpoint : 将数据长久地保存在磁盘文件中进行数据重用
// 涉及到磁盘IO,性能较低,但是数据安全
// 为了保证数据安全,所以一般情况下,会独立执行作业
// 为了能够提高效率,一般情况下,是需要和cache联合使用
// 执行过程中,会切断血缘关系。重新建立新的血缘关系
// checkpoint等同于改变数据源
val sparConf = new SparkConf().setMaster("local").setAppName("Persist")
val sc = new SparkContext(sparConf)
sc.setCheckpointDir("cp")
val list = List("Hello Scala", "Hello Spark")
val rdd = sc.makeRDD(list)
val flatRDD = rdd.flatMap(_.split(" "))
val mapRDD = flatRDD.map(word => {
(word, 1)
})
//mapRDD.cache()
mapRDD.checkpoint()
println(mapRDD.toDebugString)
val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
reduceRDD.collect().foreach(println)
println("**************************************")
println(mapRDD.toDebugString)
sc.stop()
}
}
P103【103.尚硅谷_SparkCore - 核心编程 - RDD - 分区器 - 自定义数据分区规则】09:02
5.1.4.9 RDD分区器
package com.atguigu.bigdata.spark.core.rdd.part
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
object Spark01_RDD_Part {
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(sparConf)
val rdd = sc.makeRDD(List(
("nba", "xxxxxxxxx"),
("cba", "xxxxxxxxx"),
("wnba", "xxxxxxxxx"),
("nba", "xxxxxxxxx"),
), 3)
val partRDD: RDD[(String, String)] = rdd.partitionBy(new MyPartitioner)
partRDD.saveAsTextFile("output")
sc.stop()
}
/**
* 自定义分区器
* 1. 继承Partitioner
* 2. 重写方法
*/
class MyPartitioner extends Partitioner {
// 分区数量
override def numPartitions: Int = 3
// 根据数据的key值返回数据所在的分区索引(从0开始)
override def getPartition(key: Any): Int = {
key match {
case "nba" => 0
case "wnba" => 1
case _ => 2
}
}
}
}
P104【104.尚硅谷_SparkCore - 核心编程 - RDD - 文件读取与保存】04:36
5.1.4.10 RDD文件读取与保存
package com.atguigu.bigdata.spark.core.rdd.io
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_IO_Save {
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(sparConf)
val rdd = sc.makeRDD(
List(
("a", 1),
("b", 2),
("c", 3)
)
)
rdd.saveAsTextFile("output1111")
rdd.saveAsObjectFile("output2222")
rdd.saveAsSequenceFile("output3333")
sc.stop()
}
}
package com.atguigu.bigdata.spark.core.rdd.io
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_IO_Load {
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(sparConf)
val rdd = sc.textFile("output011")
println(rdd.collect().mkString(","))
val rdd1 = sc.objectFile[(String, Int)]("output012")
println(rdd1.collect().mkString(","))
val rdd2 = sc.sequenceFile[String, Int]("output013")
println(rdd2.collect().mkString(","))
sc.stop()
}
}