视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili
- 尚硅谷大数据技术Spark教程-笔记01【SparkCore(概述、快速上手、运行环境、运行架构)】
- 尚硅谷大数据技术Spark教程-笔记02【SparkCore(核心编程,RDD-核心属性-执行原理-基础编程-并行度与分区-转换算子)】
- 尚硅谷大数据技术Spark教程-笔记03【SparkCore(核心编程,RDD-转换算子-案例实操)】
- 尚硅谷大数据技术Spark教程-笔记04【SparkCore(核心编程,RDD-行动算子-序列化-依赖关系-持久化-分区器-文件读取与保存)】
- 尚硅谷大数据技术Spark教程-笔记05【SparkCore(核心编程,累加器、广播变量)】
- 尚硅谷大数据技术Spark教程-笔记06【SparkCore(案例实操,电商网站)】
目录
01_尚硅谷大数据技术之SparkCore
第05章-Spark核心编程
P105【105.尚硅谷_SparkCore - 核心编程 - 数据结构 -累加器 - 原理及简单演示】15:49
P106【106.尚硅谷_SparkCore - 核心编程 - 数据结构 -累加器 - 问题】03:39
P107【107.尚硅谷_SparkCore - 核心编程 - 数据结构 -累加器 - 自定义实现】10:55
P108【108.尚硅谷_SparkCore - 核心编程 - 数据结构 -累加器 - 自定义实现 - 1】07:14
P109【109.尚硅谷_SparkCore - 核心编程 - 数据结构 - 广播变量】17:16
01_尚硅谷大数据技术之SparkCore
第05章-Spark核心编程
P105【105.尚硅谷_SparkCore - 核心编程 - 数据结构 -累加器 - 原理及简单演示】15:49
5.2 累加器
5.2.1 实现原理
累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。
package com.atguigu.bigdata.spark.core.acc
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_Acc {
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
val sc = new SparkContext(sparConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4))
//reduce:分区内计算,分区间计算
//val i: Int = rdd.reduce(_+_)
//println(i)
var sum = 0
rdd.foreach(
num => {
sum += num
}
)
println("sum = " + sum) // sum = 0
sc.stop()
}
}
package com.atguigu.bigdata.spark.core.acc
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_Acc {
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
val sc = new SparkContext(sparConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4))
// 获取系统累加器
// Spark默认就提供了简单数据聚合的累加器
val sumAcc = sc.longAccumulator("sum")
//sc.doubleAccumulator
//sc.collectionAccumulator
rdd.foreach(
num => {
// 使用累加器
sumAcc.add(num)
}
)
// 获取累加器的值
println(sumAcc.value) // 10
sc.stop()
}
}
P106【106.尚硅谷_SparkCore - 核心编程 - 数据结构 -累加器 - 问题】03:39
package com.atguigu.bigdata.spark.core.acc
import org.apache.spark.{SparkConf, SparkContext}
object Spark03_Acc {
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
val sc = new SparkContext(sparConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4))
// 获取系统累加器
// Spark默认就提供了简单数据聚合的累加器
val sumAcc = sc.longAccumulator("sum")
//sc.doubleAccumulator
//sc.collectionAccumulator
val mapRDD = rdd.map(
num => {
// 使用累加器
sumAcc.add(num)
num
}
)
// 获取累加器的值
// 少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
// 多加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
// 一般情况下,累加器会放置在行动算子进行操作
mapRDD.collect()
mapRDD.collect()
println(sumAcc.value) // 20
sc.stop()
}
}
P107【107.尚硅谷_SparkCore - 核心编程 - 数据结构 -累加器 - 自定义实现】10:55
package com.atguigu.bigdata.spark.core.acc
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
object Spark04_Acc_WordCount {
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
val sc = new SparkContext(sparConf)
val rdd = sc.makeRDD(List("hello", "spark", "hello"))
// 累加器 : WordCount
// 创建累加器对象
val wcAcc = new MyAccumulator()
// 向Spark进行注册
sc.register(wcAcc, "wordCountAcc")
rdd.foreach(
word => {
// 数据的累加(使用累加器)
wcAcc.add(word)
}
)
// 获取累加器累加的结果
println(wcAcc.value)
sc.stop()
}
/*
自定义数据累加器:WordCount
1. 继承AccumulatorV2, 定义泛型
IN : 累加器输入的数据类型 String
OUT : 累加器返回的数据类型 mutable.Map[String, Long]
2. 重写方法(6)
*/
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
}
}
P108【108.尚硅谷_SparkCore - 核心编程 - 数据结构 -累加器 - 自定义实现 - 1】07:14
package com.atguigu.bigdata.spark.core.acc
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
object Spark04_Acc_WordCount {
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
val sc = new SparkContext(sparConf)
val rdd = sc.makeRDD(List("hello", "spark", "hello"))
// 累加器 : WordCount
// 创建累加器对象
val wcAcc = new MyAccumulator()
// 向Spark进行注册
sc.register(wcAcc, "wordCountAcc")
rdd.foreach(
word => {
// 数据的累加(使用累加器)
wcAcc.add(word)
}
)
// 获取累加器累加的结果
println(wcAcc.value)
sc.stop()
}
/*
自定义数据累加器:WordCount
1. 继承AccumulatorV2, 定义泛型
IN : 累加器输入的数据类型 String
OUT : 累加器返回的数据类型 mutable.Map[String, Long]
2. 重写方法(6)
*/
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
private var wcMap = mutable.Map[String, Long]()
// 判断是否初始状态
override def isZero: Boolean = {
wcMap.isEmpty
}
override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
new MyAccumulator()
}
override def reset(): Unit = {
wcMap.clear()
}
// 获取累加器需要计算的值
override def add(word: String): Unit = {
val newCnt = wcMap.getOrElse(word, 0L) + 1
wcMap.update(word, newCnt)
}
// Driver合并多个累加器
override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
val map1 = this.wcMap
val map2 = other.value
map2.foreach {
case (word, count) => {
val newCount = map1.getOrElse(word, 0L) + count
map1.update(word, newCount)
}
}
}
// 累加器结果
override def value: mutable.Map[String, Long] = {
wcMap
}
}
}
P109【109.尚硅谷_SparkCore - 核心编程 - 数据结构 - 广播变量】17:16
5.3 广播变量
5.3.1 实现原理
广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务分别发送。
package com.atguigu.bigdata.spark.core.acc
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
object Spark05_Bc {
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
val sc = new SparkContext(sparConf)
val rdd1 = sc.makeRDD(List(
("a", 1), ("b", 2), ("c", 3)
))
// val rdd2 = sc.makeRDD(List(
// ("a", 4),("b", 5),("c", 6)
// ))
val map = mutable.Map(("a", 4), ("b", 5), ("c", 6))
// join会导致数据量几何增长,并且会影响shuffle的性能,不推荐使用
//val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
//joinRDD.collect().foreach(println)
// (a, 1), (b, 2), (c, 3)
// (a, (1,4)),(b, (2,5)),(c, (3,6))
rdd1.map {
case (w, c) => {
val l: Int = map.getOrElse(w, 0)
(w, (c, l))
}
}.collect().foreach(println)
//(a,(1,4))
//(b,(2,5))
//(c,(3,6))
sc.stop()
}
}
package com.atguigu.bigdata.spark.core.acc
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
object Spark06_Bc {
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
val sc = new SparkContext(sparConf)
val rdd1 = sc.makeRDD(List(
("a", 1), ("b", 2), ("c", 3)
))
val map = mutable.Map(("a", 4), ("b", 5), ("c", 6))
// 封装广播变量
val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)
rdd1.map {
case (w, c) => {
// 访问广播变量
val l: Int = bc.value.getOrElse(w, 0)
(w, (c, l))
}
}.collect().foreach(println)
//(a,(1,4))
//(b,(2,5))
//(c,(3,6))
sc.stop()
}
}