尚硅谷大数据技术Spark教程-笔记04【SparkCore(核心编程,RDD-行动算子-序列化-依赖关系-持久化-分区器-文件读取与保存)】

news2024/11/24 10:53:26

视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili

  1. 尚硅谷大数据技术Spark教程-笔记01【Spark(概述、快速上手、运行环境、运行架构)】
  2. 尚硅谷大数据技术Spark教程-笔记02【SparkCore(核心编程,RDD-核心属性-执行原理-基础编程-并行度与分区-转换算子)】
  3. 尚硅谷大数据技术Spark教程-笔记03【SparkCore(核心编程,RDD-转换算子-案例实操)】
  4. 尚硅谷大数据技术Spark教程-笔记04【SparkCore(核心编程,RDD-行动算子-序列化-依赖关系-持久化-分区器-文件读取与保存)】
  5. 尚硅谷大数据技术Spark教程-笔记05【SparkCore(核心编程,累加器-实现原理-基础编程)】

目录

01_尚硅谷大数据技术之SparkCore

第05章-Spark核心编程

P081【081.尚硅谷_SparkCore - 核心编程 - RDD - 行动算子 - 介绍】04:32

P082【082.尚硅谷_SparkCore - 核心编程 - RDD - 行动算子 - 算子演示】08:00

P083【083.尚硅谷_SparkCore - 核心编程 - RDD - 行动算子 - aggregate】04:25

P084【084.尚硅谷_SparkCore - 核心编程 - RDD - 行动算子 - countByKey & countByValue】04:45

P085【085.尚硅谷_SparkCore - 核心编程 - RDD - WordCount不同的实现方式 - (1-8)】10:16

P086【086.尚硅谷_SparkCore - 核心编程 - RDD - WordCount不同的实现方式 - (9-11)】06:03

P087【087.尚硅谷_SparkCore - 核心编程 - RDD - 行动算子 - save的方法】03:41

P088【088.尚硅谷_SparkCore - 核心编程 - RDD - 行动算子 - foreach】11:37

P089【089.尚硅谷_SparkCore - 核心编程 - RDD - 序列化 - 闭包检测】14:10

P090【090.尚硅谷_SparkCore - 核心编程 - RDD - 序列化 - 实际执行时的问题】12:04

P091【091.尚硅谷_SparkCore - 核心编程 - RDD - 序列化 - Kryo序列化Core介绍】10:07

P092【092.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 依赖 & 血缘关系介绍】05:17

P093【093.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 血缘关系 - 演示】11:36

P094【094.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 宽窄依赖】11:36

P095【095.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 阶段&分区&任务 - 概念解析 - 秋游了】09:41

P096【096.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 阶段划分源码解读】11:31

P097【097.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 任务划分源码解读】08:57

P098【098.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 任务分类】02:52

P099【099.尚硅谷_SparkCore - 核心编程 - RDD - 持久化 - cache & persist基本原理和演示】14:46

P100【100.尚硅谷_SparkCore - 核心编程 - RDD - 持久化 - 作用】05:19

P101【101.尚硅谷_SparkCore - 核心编程 - RDD - 持久化 - 检查点】03:00

P102【102.尚硅谷_SparkCore - 核心编程 - RDD - 持久化 - 区别】11:48

P103【103.尚硅谷_SparkCore - 核心编程 - RDD - 分区器 - 自定义数据分区规则】09:02

P104【104.尚硅谷_SparkCore - 核心编程 - RDD - 文件读取与保存】04:36


01_尚硅谷大数据技术之SparkCore

第05章-Spark核心编程

P081【081.尚硅谷_SparkCore - 核心编程 - RDD - 行动算子 - 介绍】04:32

5.1.4.5 RDD行动算子

 

package com.atguigu.bigdata.spark.core.rdd.operator.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4))

    // TODO - 行动算子
    // 所谓的行动算子,其实就是触发作业(Job)执行的方法
    // 底层代码调用的是环境对象的runJob方法
    // 底层代码中会创建ActiveJob,并提交执行。
    rdd.collect()

    sc.stop()
  }
}

P082【082.尚硅谷_SparkCore - 核心编程 - RDD - 行动算子 - 算子演示】08:00

5.1.4.5 RDD行动算子

  1. reduce
  2. collect
  3. count
  4. first
  5. take
  6. takeOrdered

package com.atguigu.bigdata.spark.core.rdd.operator.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4))

    // TODO - 行动算子

    // 1、reduce
    val i: Int = rdd.reduce(_ + _)
    println(i) //10

    // 2、collect:方法会将不同分区的数据按照分区顺序采集到Driver端内存中,形成数组
    val ints000: Array[Int] = rdd.collect()
    println(ints000.mkString(",")) //1,2,3,4

    // 3、count:数据源中数据的个数
    val cnt = rdd.count()
    println(cnt) //4

    // 4、first:获取数据源中数据的第一个
    val first = rdd.first()
    println(first) //1

    // 5、take:获取N个数据
    val ints: Array[Int] = rdd.take(3)
    println(ints.mkString(",")) //1,2,3

    // 6、takeOrdered:数据排序后,取N个数据
    val rdd1 = sc.makeRDD(List(4, 2, 3, 1))
    val ints1: Array[Int] = rdd1.takeOrdered(3)
    println(ints1.mkString(",")) //1,2,3

    sc.stop()
  }
}

P083【083.尚硅谷_SparkCore - 核心编程 - RDD - 行动算子 - aggregate】04:25

5.1.4.5 RDD行动算子

7、aggregate,函数说明:分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合。

package com.atguigu.bigdata.spark.core.rdd.operator.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark03_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

    // TODO - 行动算子

    // 10 + 13 + 17 = 40
    // aggregateByKey:初始值只会参与分区内计算
    // aggregate:初始值会参与分区内计算,并且和参与分区间计算
    val result1 = rdd.aggregate(10)(_ + _, _ + _)
    println(result1) //40

    sc.stop()
  }
}

P084【084.尚硅谷_SparkCore - 核心编程 - RDD - 行动算子 - countByKey & countByValue】04:45

5.1.4.5 RDD行动算子

8、fold

9、countByKey

package com.atguigu.bigdata.spark.core.rdd.operator.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark03_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

    // TODO - 行动算子

    // 10 + 13 + 17 = 40
    // aggregateByKey:初始值只会参与分区内计算
    // aggregate:初始值会参与分区内计算,并且和参与分区间计算
    val result1 = rdd.aggregate(10)(_ + _, _ + _)
    println(result1) //40

    val result2 = rdd.fold(10)(_ + _)
    println(result2) //40

    sc.stop()
  }
}
package com.atguigu.bigdata.spark.core.rdd.operator.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark04_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1, 1, 1, 4), 2)
    val intToLong: collection.Map[Int, Long] = rdd.countByValue()
    println(intToLong) //Map(4 -> 1, 1 -> 3)

    val rdd2 = sc.makeRDD(List(
      ("a", 1), ("a", 2), ("a", 3)
    ))
    val stringToLong: collection.Map[String, Long] = rdd2.countByKey()
    println(stringToLong) //Map(a -> 3)

    sc.stop()
  }
}

P085【085.尚硅谷_SparkCore - 核心编程 - RDD - WordCount不同的实现方式 - (1-8)】10:16

package com.atguigu.bigdata.spark.core.wc

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object Spark03_WordCount {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparConf)

    wordcount91011(sc)

    sc.stop()
  }

  // groupBy
  def wordcount1(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    val group: RDD[(String, Iterable[String])] = words.groupBy(word => word)
    val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
  }

  // groupByKey
  def wordcount2(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    val wordOne = words.map((_, 1))
    val group: RDD[(String, Iterable[Int])] = wordOne.groupByKey()
    val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
  }

  // reduceByKey
  def wordcount3(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    val wordOne = words.map((_, 1))
    val wordCount: RDD[(String, Int)] = wordOne.reduceByKey(_ + _)
  }

  // aggregateByKey
  def wordcount4(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    val wordOne = words.map((_, 1))
    val wordCount: RDD[(String, Int)] = wordOne.aggregateByKey(0)(_ + _, _ + _)
  }

  // foldByKey
  def wordcount5(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    val wordOne = words.map((_, 1))
    val wordCount: RDD[(String, Int)] = wordOne.foldByKey(0)(_ + _)
  }

  // combineByKey
  def wordcount6(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    val wordOne = words.map((_, 1))
    val wordCount: RDD[(String, Int)] = wordOne.combineByKey(
      v => v,
      (x: Int, y) => x + y,
      (x: Int, y: Int) => x + y
    )
  }

  // countByKey
  def wordcount7(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    val wordOne = words.map((_, 1))
    val wordCount: collection.Map[String, Long] = wordOne.countByKey()
  }

  // countByValue
  def wordcount8(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    val wordCount: collection.Map[String, Long] = words.countByValue()
  }
}

P086【086.尚硅谷_SparkCore - 核心编程 - RDD - WordCount不同的实现方式 - (9-11)】06:03

package com.atguigu.bigdata.spark.core.wc

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object Spark03_WordCount {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparConf)

    wordcount91011(sc)

    sc.stop()
  }

  // reduce, aggregate, fold
  def wordcount91011(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))

    // 【(word, count),(word, count)】
    // word => Map[(word,1)]
    val mapWord = words.map(
      word => {
        mutable.Map[String, Long]((word, 1))
      }
    )

    val wordCount = mapWord.reduce(
      (map1, map2) => {
        map2.foreach {
          case (word, count) => {
            val newCount = map1.getOrElse(word, 0L) + count
            map1.update(word, newCount)
          }
        }
        map1
      }
    )

    println(wordCount)
  }
}

P087【087.尚硅谷_SparkCore - 核心编程 - RDD - 行动算子 - save的方法】03:41

5.1.4.5 RDD行动算子

10、save相关算子

package com.atguigu.bigdata.spark.core.rdd.operator.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark05_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // val rdd = sc.makeRDD(List(1, 1, 1, 4), 2)
    val rdd = sc.makeRDD(List(
      ("a", 1), ("a", 2), ("a", 3)
    ))

    // TODO - 行动算子
    rdd.saveAsTextFile("output007")
    rdd.saveAsObjectFile("output008")
    // saveAsSequenceFile方法要求数据的格式必须为K-V类型
    rdd.saveAsSequenceFile("output009")

    sc.stop()
  }
}

P088【088.尚硅谷_SparkCore - 核心编程 - RDD - 行动算子 - foreach】11:37

5.1.4.5 RDD行动算子

11、foreach

 

package com.atguigu.bigdata.spark.core.rdd.operator.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark06_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4))

    // foreach 其实是Driver端内存集合的循环遍历方法
    rdd.collect().foreach(println)
    println("******************")
    // foreach 其实是Executor端内存数据打印
    rdd.foreach(println)

    // 算子 : Operator(操作)
    //    RDD的方法和Scala集合对象的方法不一样
    //    集合对象的方法都是在同一个节点的内存中完成的。
    //    RDD的方法可以将计算逻辑发送到Executor端(分布式节点)执行
    //    为了区分不同的处理效果,所以将RDD的方法称之为算子。
    //    RDD的方法外部的操作都是在Driver端执行的,而方法内部的逻辑代码是在Executor端执行。
    sc.stop()
  }
}

P089【089.尚硅谷_SparkCore - 核心编程 - RDD - 序列化 - 闭包检测】14:10

5.1.4.5 RDD行动算子

11、foreach

package com.atguigu.bigdata.spark.core.rdd.operator.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark07_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List[Int]())

    val user = new User()

    // SparkException: Task not serializable
    // NotSerializableException: com.atguigu.bigdata.spark.core.rdd.operator.action.Spark07_RDD_Operator_Action$User

    // RDD算子中传递的函数是会包含闭包操作,那么就会进行检测功能
    // 闭包检测
    rdd.foreach(
      num => {
        println("age = " + (user.age + num))
      }
    )
    sc.stop()
  }

  //class User extends Serializable {
  // 样例类在编译时,会自动混入序列化特质(实现可序列化接口)
  case class User() {// class User {
    var age: Int = 30
  }
}

P090【090.尚硅谷_SparkCore - 核心编程 - RDD - 序列化 - 实际执行时的问题】12:04

5.1.4.6 RDD序列化

1)闭包检查

从计算的角度,算子以外的代码都是在Driver端执行算子里面的代码都是在 Executor 端执行。那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给Executor 端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测Scala2.12 版本后闭包编译方式发生了改变

2)序列化方法和属性

从计算的角度算子以外的代码都是在Driver 端执行, 算子里面的代码都是在Executor端执行,看如下代码:

package com.atguigu.bigdata.spark.core.rdd.serial

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Serial {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparConf)

    val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "atguigu"))

    val search = new Search("h")

    //search.getMatch1(rdd).collect().foreach(println)
    search.getMatch2(rdd).collect().foreach(println)

    sc.stop()
  }

  // 查询对象
  // 类的构造参数其实是类的属性, 构造参数需要进行闭包检测,其实就等同于类进行闭包检测
  class Search(query: String) {
    def isMatch(s: String): Boolean = {
      s.contains(this.query)
    }

    //函数序列化案例
    def getMatch1(rdd: RDD[String]): RDD[String] = {
      rdd.filter(isMatch)
    }

    // 属性序列化案例
    def getMatch2(rdd: RDD[String]): RDD[String] = {
      val s = query
      rdd.filter(x => x.contains(s))
    }
  }
}

P091【091.尚硅谷_SparkCore - 核心编程 - RDD - 序列化 - Kryo序列化Core介绍】10:07

5.1.4.6 RDD序列化

3) Kryo 序列化框架

参考地址: https://github.com/EsotericSoftware/kryo

Java 的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。Spark 出于性能的考虑,Spark2.0 开始支持另外一种Kryo 序列化机制。Kryo 速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化。

注意:即使使用 Kryo 序列化,也要继承 Serializable 接口。

object serializable_Kryo {
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf()
.setAppName("SerDemo")
.setMaster("local[*]")
// 替换默认的序列化机制
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册需要使用 kryo 序列化的自定义类
.registerKryoClasses(Array(classOf[Searcher])) val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello atguigu", "atguigu", "hahah"), 2)

val searcher = new Searcher("hello")
val result: RDD[String] = searcher.getMatchedRDD1(rdd)

result.collect.foreach(println)
}
}

case class Searcher(val query: String) {
}

def isMatch(s: String) = { s.contains(query)
}

def getMatchedRDD1(rdd: RDD[String]) = { rdd.filter(isMatch)
}

def getMatchedRDD2(rdd: RDD[String]) = { val q = query rdd.filter(_.contains(q))
}

P092【092.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 依赖 & 血缘关系介绍】05:17

5.1.4.7 RDD依赖关系

1) RDD血缘关系

P093【093.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 血缘关系 - 演示】11:36

package com.atguigu.bigdata.spark.core.rdd.dep

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Spark01_RDD_Dep {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparConf)

    val lines: RDD[String] = sc.textFile("datas/word.txt")
    println(lines.toDebugString)
    println("*************************")

    val words: RDD[String] = lines.flatMap(_.split(" "))
    println(words.toDebugString)
    println("*************************")

    val wordToOne = words.map(word => (word, 1))
    println(wordToOne.toDebugString)
    println("*************************")

    val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
    println(wordToSum.toDebugString)
    println("*************************")

    val array: Array[(String, Int)] = wordToSum.collect()
    array.foreach(println)

    sc.stop()
  }
}

P094【094.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 宽窄依赖】11:36

5.1.4.8 RDD持久化

2) RDD CheckPoint 检查点

   

3) RDD窄依赖

窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用,窄依赖我们形象的比喻为独生子女。

4) RDD宽依赖

宽依赖表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会引起 Shuffle,总结:宽依赖我们形象的比喻为多生。

P095【095.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 阶段&分区&任务 - 概念解析 - 秋游了】09:41

5.1.4.8 RDD持久化

5) RDD 阶段划分

P096【096.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 阶段划分源码解读】11:31

5.1.4.8 RDD持久化

6) RDD阶段划分源码

P097【097.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 任务划分源码解读】08:57

5.1.4.8 RDD持久化

7) RDD 任务划分

RDD任务切分中间分为:Application、Job、Stage 和 Task

  • Application:初始化一个 SparkContext 即生成一个Application;
  • Job:一个Action 算子就会生成一个Job;
  • Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;
  • Task:一个 Stage 阶段中,最后一个RDD 的分区个数就是Task 的个数。

注意:Application->Job->Stage->Task 每一层都是 1 n 的关系。

 

P098【098.尚硅谷_SparkCore - 核心编程 - RDD - 依赖关系 - 任务分类】02:52

5.1.4.8 RDD持久化

7) RDD 任务划分

8) RDD任务划分源码

val tasks: Seq[Task[_]] = try { stage match {
case stage: ShuffleMapStage => partitionsToCompute.map { id =>
val locs = taskIdToLocations(id) val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary,	part,	locs,	stage.latestInfo.taskMetrics,	properties, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId)
}

case stage: ResultStage => partitionsToCompute.map { id =>
val p: Int = stage.partitions(id) val part = stage.rdd.partitions(p) val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
}
}

……

val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

……

override def findMissingPartitions(): Seq[Int] = { mapOutputTrackerMaster
.findMissingPartitions(shuffleDep.shuffleId)
.getOrElse(0 until numPartitions)
}

P099【099.尚硅谷_SparkCore - 核心编程 - RDD - 持久化 - cache & persist基本原理和演示】14:46

5.1.4.8 RDD 持久化

1) RDD Cache 缓存

  

package com.atguigu.bigdata.spark.core.rdd.persist

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Persist {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparConf)

    val list = List("Hello Scala", "Hello Spark")

    val rdd = sc.makeRDD(list)

    val flatRDD = rdd.flatMap(_.split(" "))

    val mapRDD = flatRDD.map((_, 1))

    val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)

    reduceRDD.collect().foreach(println)
    println("**************************************")

    val rdd1 = sc.makeRDD(list)

    val flatRDD1 = rdd1.flatMap(_.split(" "))

    val mapRDD1 = flatRDD1.map((_, 1))

    val groupRDD = mapRDD1.groupByKey()

    groupRDD.collect().foreach(println)

    sc.stop()
  }
}
package com.atguigu.bigdata.spark.core.rdd.persist

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_Persist {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local").setAppName("Persist")
    val sc = new SparkContext(sparConf)

    val list = List("Hello Scala", "Hello Spark")

    val rdd = sc.makeRDD(list)

    val flatRDD = rdd.flatMap(_.split(" "))

    val mapRDD = flatRDD.map(word => {
      println("@@@@@@@@@@@@")
      (word, 1)
    })

    val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
    reduceRDD.collect().foreach(println)
    println("**************************************")
    val groupRDD = mapRDD.groupByKey()
    groupRDD.collect().foreach(println)

    sc.stop()
  }
}
package com.atguigu.bigdata.spark.core.rdd.persist

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

object Spark03_RDD_Persist {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local").setAppName("Persist")
    val sc = new SparkContext(sparConf)

    val list = List("Hello Scala", "Hello Spark")

    val rdd = sc.makeRDD(list)

    val flatRDD = rdd.flatMap(_.split(" "))

    val mapRDD = flatRDD.map(word => {
      println("@@@@@@@@@@@@")
      (word, 1)
    })
    // cache默认持久化的操作,只能将数据保存到内存中,如果想要保存到磁盘文件,需要更改存储级别
    // mapRDD.cache()

    // 持久化操作必须在行动算子执行时完成的。
    mapRDD.persist(StorageLevel.DISK_ONLY)

    val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
    reduceRDD.collect().foreach(println)
    println("**************************************")
    val groupRDD = mapRDD.groupByKey()
    groupRDD.collect().foreach(println)

    sc.stop()
  }
}

P100【100.尚硅谷_SparkCore - 核心编程 - RDD - 持久化 - 作用】05:19

5.1.4.8 RDD 持久化

1) RDD Cache 缓存

P101【101.尚硅谷_SparkCore - 核心编程 - RDD - 持久化 - 检查点】03:00

5.1.4.8 RDD 持久化

2) RDD CheckPoint 检查点

package com.atguigu.bigdata.spark.core.rdd.persist

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

object Spark04_RDD_Persist {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local").setAppName("Persist")
    val sc = new SparkContext(sparConf)
    sc.setCheckpointDir("cp")

    val list = List("Hello Scala", "Hello Spark")

    val rdd = sc.makeRDD(list)

    val flatRDD = rdd.flatMap(_.split(" "))

    val mapRDD = flatRDD.map(word => {
      println("@@@@@@@@@@@@")
      (word, 1)
    })
    // checkpoint 需要落盘,需要指定检查点保存路径
    // 检查点路径保存的文件,当作业执行完毕后,不会被删除
    // 一般保存路径都是在分布式存储系统中:HDFS
    mapRDD.checkpoint()

    val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
    reduceRDD.collect().foreach(println)
    println("**************************************")
    val groupRDD = mapRDD.groupByKey()
    groupRDD.collect().foreach(println)

    sc.stop()
  }
}

P102【102.尚硅谷_SparkCore - 核心编程 - RDD - 持久化 - 区别】11:48

5.1.4.8 RDD 持久化

3) 缓存和检查点区别

  • 1)Cache 缓存只是将数据保存起来,不切断血缘依赖。Checkpoint 检查点切断血缘依赖。
  • 2)Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint 的数据通常存储在 HDFS 等容错、高可用的文件系统,可靠性高。
  • 3)建议对 checkpoint()的 RDD 使用 Cache 缓存,这样 checkpoint 的 job 只需从 Cache 缓存中读取数据即可,否则需要再从头计算一次 RDD。
package com.atguigu.bigdata.spark.core.rdd.persist

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark05_RDD_Persist {
  def main(args: Array[String]): Unit = {
    // cache : 将数据临时存储在内存中进行数据重用
    // persist : 将数据临时存储在磁盘文件中进行数据重用
    //           涉及到磁盘IO,性能较低,但是数据安全
    //           如果作业执行完毕,临时保存的数据文件就会丢失
    // checkpoint : 将数据长久地保存在磁盘文件中进行数据重用
    //           涉及到磁盘IO,性能较低,但是数据安全
    //           为了保证数据安全,所以一般情况下,会独立执行作业
    //           为了能够提高效率,一般情况下,是需要和cache联合使用

    val sparConf = new SparkConf().setMaster("local").setAppName("Persist")
    val sc = new SparkContext(sparConf)
    sc.setCheckpointDir("cp")

    val list = List("Hello Scala", "Hello Spark")

    val rdd = sc.makeRDD(list)

    val flatRDD = rdd.flatMap(_.split(" "))

    val mapRDD = flatRDD.map(word => {
      println("@@@@@@@@@@@@")
      (word, 1)
    })
    mapRDD.cache()
    mapRDD.checkpoint()
    val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
    reduceRDD.collect().foreach(println)
    println("**************************************")
    val groupRDD = mapRDD.groupByKey()
    groupRDD.collect().foreach(println)
    sc.stop()
  }
}
package com.atguigu.bigdata.spark.core.rdd.persist

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark06_RDD_Persist {
  def main(args: Array[String]): Unit = {
    // cache : 将数据临时存储在内存中进行数据重用
    //         会在血缘关系中添加新的依赖。一旦,出现问题,可以重头读取数据
    // persist : 将数据临时存储在磁盘文件中进行数据重用
    //           涉及到磁盘IO,性能较低,但是数据安全
    //           如果作业执行完毕,临时保存的数据文件就会丢失
    // checkpoint : 将数据长久地保存在磁盘文件中进行数据重用
    //           涉及到磁盘IO,性能较低,但是数据安全
    //           为了保证数据安全,所以一般情况下,会独立执行作业
    //           为了能够提高效率,一般情况下,是需要和cache联合使用
    //           执行过程中,会切断血缘关系。重新建立新的血缘关系
    //           checkpoint等同于改变数据源

    val sparConf = new SparkConf().setMaster("local").setAppName("Persist")
    val sc = new SparkContext(sparConf)
    sc.setCheckpointDir("cp")

    val list = List("Hello Scala", "Hello Spark")

    val rdd = sc.makeRDD(list)

    val flatRDD = rdd.flatMap(_.split(" "))

    val mapRDD = flatRDD.map(word => {
      (word, 1)
    })
    //mapRDD.cache()
    mapRDD.checkpoint()
    println(mapRDD.toDebugString)
    val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
    reduceRDD.collect().foreach(println)
    println("**************************************")
    println(mapRDD.toDebugString)

    sc.stop()
  }
}

P103【103.尚硅谷_SparkCore - 核心编程 - RDD - 分区器 - 自定义数据分区规则】09:02

5.1.4.9 RDD分区器

package com.atguigu.bigdata.spark.core.rdd.part

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

object Spark01_RDD_Part {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparConf)

    val rdd = sc.makeRDD(List(
      ("nba", "xxxxxxxxx"),
      ("cba", "xxxxxxxxx"),
      ("wnba", "xxxxxxxxx"),
      ("nba", "xxxxxxxxx"),
    ), 3)
    val partRDD: RDD[(String, String)] = rdd.partitionBy(new MyPartitioner)

    partRDD.saveAsTextFile("output")

    sc.stop()
  }

  /**
   * 自定义分区器
   * 1. 继承Partitioner
   * 2. 重写方法
   */
  class MyPartitioner extends Partitioner {
    // 分区数量
    override def numPartitions: Int = 3

    // 根据数据的key值返回数据所在的分区索引(从0开始)
    override def getPartition(key: Any): Int = {
      key match {
        case "nba" => 0
        case "wnba" => 1
        case _ => 2
      }
    }
  }
}

P104【104.尚硅谷_SparkCore - 核心编程 - RDD - 文件读取与保存】04:36

5.1.4.10 RDD文件读取与保存

package com.atguigu.bigdata.spark.core.rdd.io

import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_IO_Save {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparConf)

    val rdd = sc.makeRDD(
      List(
        ("a", 1),
        ("b", 2),
        ("c", 3)
      )
    )

    rdd.saveAsTextFile("output1111")
    rdd.saveAsObjectFile("output2222")
    rdd.saveAsSequenceFile("output3333")

    sc.stop()
  }
}
package com.atguigu.bigdata.spark.core.rdd.io

import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_IO_Load {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparConf)

    val rdd = sc.textFile("output011")
    println(rdd.collect().mkString(","))

    val rdd1 = sc.objectFile[(String, Int)]("output012")
    println(rdd1.collect().mkString(","))

    val rdd2 = sc.sequenceFile[String, Int]("output013")
    println(rdd2.collect().mkString(","))

    sc.stop()
  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/461642.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

加强人工智能共性技术研发与产业化协同发展

央视网消息:“以5G为代表的新一代信息技术与制造业、交通、旅游等实体经济重要领域深度融合。”4月20日下午,国新办举行一季度工业和信息化发展情况新闻发布会,相关部门负责人在答问时表示,将用好融合应用这把金钥匙,开…

ReactHook学习(第一篇-N)

文章目录 Hook简介概述class组件的不足什么是 Hook?Hook 使用规则 state的研究(useState)State:组件的记忆(响应式数据)当普通的变量无法满足时添加一个 state 变量遇见你的第一个 Hook剖析 useState 赋予一个组件多个…

【C++】面向对象

文章目录 3.1 类与对象3.1.1 类成员的访问控制3.1.2 类的成员函数对象的访问方式成员函数的实现内联成员函数 3.1.3 构造函数复制构造函数调用复制构造函数的三种情况深复制与浅复制? 析构函数类的组合 3.1.4 前向引用声明3.1.5 结构体与类对比3.1.6 UML类图属性表示…

IMX6ULL裸机篇之按键消抖实验

一. 按键消抖 在之前的 按键中断实验时,我们讲了如何使用中断的方式驱动按键或GPIO。如果通过中断的方式处理按键的话,按键是需要消抖处理的。 而在之前 按键中断实验中,在中断处理函数中对按键进行消抖,调用了 delay 延时函数。…

剑指 Offer 32 - II. 从上到下打印二叉树 II

目录 题目思路BFS 题目来源 剑指 Offer 32 - II. 从上到下打印二叉树 II 题目思路 I. 按层打印: 题目要求的二叉树的 从上至下 打印(即按层打印),又称为二叉树的 广度优先搜索(BFS)。BFS 通常借助 队列 的…

Midjourney v4 | 如何结合参考图像来生成AI艺术图

网址:midjourney.com 首页展示 首页如下图: 第一步:进入社群 点击首页右下角“Join the Beta”,进入如下页面: 点击“接受邀请”,验证之后进入 可以点击认证账号,进行注册: 应该不…

Redis三种集群模式

一、引言 Redis有三种集群模式,第一个就是主从模式,第二种“哨兵”模式,第三种是Cluster集群模式,第三种的集群模式是在Redis 3.x以后的版本才增加进来的,我们今天就来说一下Redis第一种集群模式:主从集群模…

【英语】100个句子记完7000个托福单词

其实主要的7000词其实是在主题归纳里面,不过过一遍100个句子也挺好的,反正也不多。 文章目录 Sentence 01Sentence 02Sentence 03Sentence 04Sentence 05Sentence 06Sentence 07Sentence 08Sentence 09Sentence 10Sentence 11Sentence 12Sentence 13Sent…

Redis的底层数据结构

Redis的底层数据结构 Redis的底层数据类型(对比)Redis的底层数据结构Redis数据类型和底层数据结构的对应关系Redis的使用 Redis的底层数据类型(对比) String(字符串)List(列表)Hash…

CRE66365 应用资料

CRE66365是一款高度集成的电流模式PWM控制IC,为高性能、低待机功耗和低成本的隔离型反激转换器。在正常负载条件下,AC输入高电压下工作在QR模式。为了最大限度地减少开关损耗,QR 模式下的最大开关频率被内部限制为 77kHz。当负载较低时&#…

Dcoekr 部署前后端分离项目SpringBoot +Vue

1.docker 部署vue docker 安装 nginx的镜像 niginx 配置文件 nginx.conf #user nobody; worker_processes 1;#error_log logs/error.log; #error_log logs/error.log notice; #error_log logs/error.log info;#pid logs/nginx.pid;events {worker_connections…

给大家分享一个比Top更好用的Linux进程管理工具htop

一、前言 相信用过Linux操作系统的同学对Top应该都不陌生,我们通过Top命令可以查看CPU的占用率以及每个进程的详细信息,但是今天我要给大家分享一个比Top更好用的进程管理工具htop(High Top)。 二、htop功能介绍 htop 是一个高…

Shell编程规范及变量

这里写目录标题 一、Shell脚本编程概述1.1 shell脚本的概念1.2Shell脚本应用场景1.3 shell的作用1.4 linux中有哪些shell 二、 shell脚本的使用2.1shell脚本的构成2.2 运行脚本2.3 重定向和管道操作2.31交互式硬件设备2.32 重定向操作2.33 管道符号 三、shell脚本变量3.1 shell…

【FPGA-DSP】第九期:音频信号处理

从本文开始将记录一些简单的音频信号处理算法在System Generator中的实现方法。本文将介绍如何搭建音频信号的采集与输出模型。 音频信号属于一维信号,一些基本概念如下: 采样频率:根据奈奎斯特采样定理,采样频率Fs应该不低于声…

Vite vue 使用cdn引入element-plus

vite-plugin-cdn-import:cdn的引入插件 npm i vite-plugin-cdn-import or pnpm i vite-plugin-cdn-import vite.config.js import AutoImport from unplugin-auto-import/viteexport default defineConfig({ plugins: [vue({reactivityTransform: true}),importT…

0401概述-最短路径-加权有向图-数据结构和算法(Java)

文章目录 1 最短路径2 最短路径的性质3 加权有向图的数据结构3.1 加权有向边3.2 加权有向图 4 最短路径4.1 最短路径API4.2 最短路径的数据结构4.3 边的松弛4.4 顶点的松弛 结语 1 最短路径 如图1-1所示,一幅加权有向图和其中的一条最短路径: 定义&…

事务—MySQL

文章目录 1.事务的四大特性1.1原子性1.2一致性1.3隔离性1.4持久性 2.并发访问中存在的一些问题2.1丢失更新2.2脏读2.3不可重复读2.4幻读 3.隔离级别解决一致性的问题3.1未提交读3.2提交读3.3可重复读3.4可串行化 4.不同隔离级别可以解决的问题 1.事务的四大特性 1.1原子性 事…

BBR原版/魔改/plus/锐速/七合一脚本linux加速脚本/硬盘挂载/cc防御/宝塔

BBR原版/魔改/plus/锐速七合一脚本linux加速脚本/硬盘挂载/CC防御/宝塔 新云分享的七合一脚本,包含原版BBR、魔改BBR、bbrplus以及锐速可选。 在vultr上Centos 7, Debian 8/9, Ubuntu 16/18测试通过,不支持ovz。 安装指令:复制下面命令在s…

Openswan安装和简单配置

Openswan安装和简单配置 安装环境: 操作系统:Ubuntu20.0.4TLS 用户权限:root下载Openswan: wget https://github.com/xelerance/Openswan/archive/refs/tags/v3.0.0.zip安装Openswan: 解压Openswan:(PS&#xff1a…

[golang gin框架] 26.Gin 商城项目-前台自定义商品列表模板, 商品详情数据渲染,Markdown语法使用

一.前台自定义商品列表模板 当在首页分类点击进入分类商品列表页面时,可以根据后台分类中的分类模板跳转到对应的模板商品列表页面 1.管理后台商品分类模板设置如下图所示 2.代码展示 (1).商品控制器方法Category()完善 修改controllers/frontend/productController…