04-240606Spark笔记
1.行动算子-2
-
save相关算子:
格式:
def saveAsTextFile(path: String): Unit def saveAsObjectFile(path: String): Unit def saveAsSequenceFile( path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
例子:
val rdd = sc.makeRDD(List( ("a",1),("a",2),("a",3) )) rdd.saveAsTextFile("output") rdd.saveAsObjectFile("output1") // saveAsSequenceFile方法要求数据的格式必须为K-V类型 rdd.saveAsSequenceFile("output2")
输出结果:
-
foreach
格式:
def foreach(f: T => Unit): Unit = withScope { val cleanF = sc.clean(f) sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) }
例子:
val rdd = sc.makeRDD(List(1,2,3,4)) //foreach 其实是Driver端内存集合的循环遍历方法 rdd.collect().foreach(println) //Driver println("***************") // foreach 其实是Executor端内存数据打印 rdd.foreach(println) // Executor // 算子 : Operator(操作) // RDD的方法和Scala集合对象的方法不一样 // 集合对象的方法都是在同一个节点的内存中完成的。 // RDD的方法可以将计算逻辑发送到Executor端(分布式节点)执行 // 为了区分不同的处理效果,所以将RDD的方法称之为算子。 // RDD的方法外部的操作都是在Driver端执行的,而方法内部的逻辑代码是在Executor端执行。
输出结果:
2. 序列化
2.1 闭包检测
-
闭包检测
因为Driver需要给两个Executor共享User方法,共享就需要序列化
案例:
def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) val rdd = sc.makeRDD(List[Int]()) val user = new User() // SparkException: Task not serializable // NotSerializableException: com.atguigu.bigdata.spark.core.rdd.operator.action.Spark07_RDD_Operator_Action$User // RDD算子中传递的函数是会包含闭包操作,那么就会进行检测功能 // 闭包检测 rdd.foreach( num => { println("age = " + (user.age + num)) } ) sc.stop() } //class User extends Serializable { // 样例类在编译时,会自动混入序列化特质(实现可序列化接口) //case class User() { class User { var age : Int = 30 }
-
RDD 的分区器
自己来写分区器:
def main(args: Array[String]): Unit = { val sparConf = new SparkConf().setMaster("local").setAppName("WordCount") val sc = new SparkContext(sparConf) val rdd = sc.makeRDD(List( ("nba", "xxxxxxxxx"), ("cba", "xxxxxxxxx"), ("wnba", "xxxxxxxxx"), ("nba", "xxxxxxxxx"), ),3) val partRDD: RDD[(String, String)] = rdd.partitionBy( new MyPartitioner ) partRDD.saveAsTextFile("output") sc.stop() }
自定义的分区器:
class MyPartitioner extends Partitioner{ // 分区数量 override def numPartitions: Int = 3 // 根据数据的key值返回数据所在的分区索引(从0开始) override def getPartition(key: Any): Int = { key match { case "nba" => 0 case "wnba" => 1 case _ => 2 } } }
* 自定义分区器 * 1. 继承Partitioner * 2. 重写方法
输出结果:
-
RDD 文件读取与保存
案例1:
def main(args: Array[String]): Unit = { val sparConf = new SparkConf().setMaster("local").setAppName("WordCount") val sc = new SparkContext(sparConf) val rdd = sc.textFile("output1") println(rdd.collect().mkString(",")) val rdd1 = sc.objectFile[(String, Int)]("output2") println(rdd1.collect().mkString(",")) val rdd2 = sc.sequenceFile[String, Int]("output3") println(rdd2.collect().mkString(",")) sc.stop() }
输出结果:
案例2:
def main(args: Array[String]): Unit = { val sparConf = new SparkConf().setMaster("local").setAppName("WordCount") val sc = new SparkContext(sparConf) val rdd = sc.makeRDD( List( ("a", 1), ("b", 2), ("c", 3) ) ) rdd.saveAsTextFile("output1") rdd.saveAsObjectFile("output2") rdd.saveAsSequenceFile("output3") sc.stop() }
输出结果:
1. 数据结构:
-
累加器
累加器用来把 Executor 端变量信息聚合到 Driver 端。
![image-20240605202228850](E:\Files2\Typictures\image-20240605202228850.png
Acc,累加器可以把Excutor端的数据返回到Driver中去:
案例:
def main(args: Array[String]): Unit = { val sparConf = new SparkConf().setMaster("local").setAppName("Acc") val sc = new SparkContext(sparConf) val rdd = sc.makeRDD(List(1,2,3,4)) // reduce : 分区内计算,分区间计算 //val i: Int = rdd.reduce(_+_) //println(i) var sum = 0 rdd.foreach( num => { sum += num } ) println("sum = " + sum) sc.stop() }
-
系统累加器
案例:
def main(args: Array[String]): Unit = { val sparConf = new SparkConf().setMaster("local").setAppName("Acc") val sc = new SparkContext(sparConf) val rdd = sc.makeRDD(List(1,2,3,4)) // 获取系统累加器 // Spark默认就提供了简单数据聚合的累加器 val sumAcc = sc.longAccumulator("sum") //sc.doubleAccumulator //sc.collectionAccumulator rdd.foreach( num => { // 使用累加器 sumAcc.add(num) } ) // 获取累加器的值 println(sumAcc.value) sc.stop() }
累加器的一些特殊情况:
少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行 多加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行 一般情况下,累加器会放置在行动算子进
def main(args: Array[String]): Unit = { val sparConf = new SparkConf().setMaster("local").setAppName("Acc") val sc = new SparkContext(sparConf) val rdd = sc.makeRDD(List(1,2,3,4)) // 获取系统累加器 // Spark默认就提供了简单数据聚合的累加器 val sumAcc = sc.longAccumulator("sum") //sc.doubleAccumulator //sc.collectionAccumulator val mapRDD = rdd.map( num => { // 使用累加器 sumAcc.add(num) num } ) // 获取累加器的值 // 少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行 // 多加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行 // 一般情况下,累加器会放置在行动算子进行操作 mapRDD.collect() mapRDD.collect() println(sumAcc.value) sc.stop() }
-
自定义累加器
分布式共享只写变量
案例:
def main(args: Array[String]): Unit = { val sparConf = new SparkConf().setMaster("local").setAppName("Acc") val sc = new SparkContext(sparConf) val rdd = sc.makeRDD(List("hello", "spark", "hello")) // 累加器 : WordCount // 创建累加器对象 val wcAcc = new MyAccumulator() // 向Spark进行注册 sc.register(wcAcc, "wordCountAcc") rdd.foreach( word => { // 数据的累加(使用累加器) wcAcc.add(word) } ) // 获取累加器累加的结果 println(wcAcc.value) sc.stop() } /* 自定义数据累加器:WordCount 1. 继承AccumulatorV2, 定义泛型 IN : 累加器输入的数据类型 String OUT : 累加器返回的数据类型 mutable.Map[String, Long] 2. 重写方法(6) */ class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] { private var wcMap = mutable.Map[String, Long]() // 判断是否初始状态 override def isZero: Boolean = { wcMap.isEmpty } override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = { new MyAccumulator() } override def reset(): Unit = { wcMap.clear() } // 获取累加器需要计算的值 override def add(word: String): Unit = { val newCnt = wcMap.getOrElse(word, 0L) + 1 wcMap.update(word, newCnt) } // Driver合并多个累加器 override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = { val map1 = this.wcMap val map2 = other.value map2.foreach{ case ( word, count ) => { val newCount = map1.getOrElse(word, 0L) + count map1.update(word, newCount) } } } // 累加器结果 override def value: mutable.Map[String, Long] = { wcMap } }
-
广播变量
实现原理:
广播变量用来高效分发较大的对象。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务
分别发送。
案例:
def main(args: Array[String]): Unit = { val sparConf = new SparkConf().setMaster("local").setAppName("Acc") val sc = new SparkContext(sparConf) val rdd1 = sc.makeRDD(List( ("a", 1),("b", 2),("c", 3) )) // val rdd2 = sc.makeRDD(List( // ("a", 4),("b", 5),("c", 6) // )) val map = mutable.Map(("a", 4),("b", 5),("c", 6)) // join会导致数据量几何增长,并且会影响shuffle的性能,不推荐使用 //val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2) //joinRDD.collect().foreach(println) // (a, 1), (b, 2), (c, 3) // (a, (1,4)),(b, (2,5)),(c, (3,6)) rdd1.map { case (w, c) => { val l: Int = map.getOrElse(w, 0) (w, (c, l)) } }.collect().foreach(println) sc.stop() }
join会导致数据量几何增长,并且会影响shuffle的性能,不推荐使用
Spark 中的广播变量就可以将闭包的数据保存到Executor的内存中
Spark 中的广播变量不能更改 : 分布式共享只读变量
封装广播变量1
案例:
def main(args: Array[String]): Unit = { val sparConf = new SparkConf().setMaster("local").setAppName("Acc") val sc = new SparkContext(sparConf) val rdd1 = sc.makeRDD(List( ("a", 1),("b", 2),("c", 3) )) val map = mutable.Map(("a", 4),("b", 5),("c", 6)) // 封装广播变量 val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map) rdd1.map { case (w, c) => { // 方法广播变量 val l: Int = bc.value.getOrElse(w, 0) (w, (c, l)) } }.collect().foreach(println) sc.stop() }