99-spark-核心编程-持久化-分区-io:
RDD持久化
1) RDD Cache 缓存 Spark02_RDD_Persist
RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在 JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算子时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。
RDD中不存储数据
如果一个RDD需要重复使用,那么需要从头再次执行来获取数据
RDD对象可以重用的,但是数据无法重用
RDD对象的持久化操作不一定是为了重用
在数据执行较长,或数据比较重要的场合也可以采用持久化操作
缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD 的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于 RDD 的一系列转换,丢失的数据会被重算,由于 RDD 的各个 Partition 是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部 Partition。
Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用 persist 或 cache。
RDD CheckPoint 检查点 Spark02_RDD_Persist
检查点就是通过将 RDD 中间结果写入磁盘,由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点
之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。对 RDD 进行 checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。
缓存和检查点区别
1)Cache 缓存只是将数据保存起来,不切断血缘依赖。Checkpoint 检查点切断血缘依赖。
2)Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint 的数据通常存储在 HDFS 等容错、高可用的文件系统,可靠性高。
3)建议对 checkpoint()的 RDD 使用 Cache 缓存,这样 checkpoint 的 job 只需从 Cache 缓存中读取数据即可,否则需要再从头计算一次 RDD。
RDD分区器
Spark 目前支持 Hash 分区和 Range 分区,和用户自定义分区。Hash 分区为当前的默认分区。分区器直接决定了 RDD 中分区的个数、RDD 中每条数据经过 Shuffle 后进入哪个分区,进而决定了 Reduce 的个数。
➢ 只有 Key-Value 类型的 RDD 才有分区器,非 Key-Value 类型的 RDD 分区的值是 None
➢ 每个 RDD 的分区 ID 范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。
- Hash分区:对于给定的 key,计算其 hashCode,并除以分区个数取余
class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be
negative.")
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
}
- Range 分区:将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序
class RangePartitioner[K : Ordering : ClassTag, V](
partitions: Int,
rdd: RDD[_ <: Product2[K, V]],
private var ascending: Boolean = true)
extends Partitioner {
// We allow partitions = 0, which happens when sorting an empty RDD under the
default settings.
require(partitions >= 0, s"Number of partitions cannot be negative but found
$partitions.")
private var ordering = implicitly[Ordering[K]]
// An array of upper bounds for the first (partitions - 1) partitions
private var rangeBounds: Array[K] = {
...
}
def numPartitions: Int = rangeBounds.length + 1
private var binarySearch: ((Array[K], K) => Int) =
CollectionsUtils.makeBinarySearch[K]
def getPartition(key: Any): Int = {
val k = key.asInstanceOf[K]
var partition = 0
if (rangeBounds.length <= 128) {
// If we have less than 128 partitions naive search
while (partition < rangeBounds.length && ordering.gt(k,
rangeBounds(partition))) {
partition += 1
}
} else {
// Determine which binary search method to use only once.
partition = binarySearch(rangeBounds, k)
// binarySearch either returns the match location or -[insertion point]-1
if (partition < 0) {
partition = -partition-1
}
if (partition > rangeBounds.length) {
partition = rangeBounds.length
}
}
if (ascending) {
partition
} else {
rangeBounds.length - partition
}
}
override def equals(other: Any): Boolean = other match {
...
}
override def hashCode(): Int = {
...
}
@throws(classOf[IOException])
private def writeObject(out: ObjectOutputStream): Unit =
Utils.tryOrIOException {
...
}
@throws(classOf[IOException])
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException
{
...
} }
自定义分区器 Spark01_RDD_Part
package spark.core.com.zh.operator.part
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
/**
* 分区器
*/
object Spark01_RDD_Part {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local").setAppName("hPartitioner") //appname 应用名称
val sc = new SparkContext(sparkConf)
val rdd: RDD[(String, String)] = sc.makeRDD(List(
("nba", "xx-nba-xxx"),
("cba", "xx-cba-xxx"),
("wnba", "xx-wnba-xxx"),
("nba", "xx-nba-xxx")
), 3)
val partRdd: RDD[(String, String)] = rdd.partitionBy(new MyPartitioner)
partRdd.saveAsTextFile("part-ouput ")
sc.stop()
}
//自定义分区器
//1.继承Partitioner
//2.实现方法
class MyPartitioner extends Partitioner {
//分区数量
override def numPartitions: Int = 3
//根据数据的key值返回数据的分区索引,从0开始,
override def getPartition(key: Any): Int = {
key match {
case "nba" => 0
case "wnba" => 1
case _ => 0
}
}
}
}
RDD 文件读取与保存
Spark 的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。
文件格式分为:text 文件、csv 文件、sequence 文件以及 Object 文件;
文件系统分为:本地文件系统、HDFS、HBASE 以及数据库。
➢ text 文件
// 读取输入文件
val inputRDD: RDD[String] = sc.textFile("input/1.txt")
// 保存数据
inputRDD.saveAsTextFile("output")
➢ sequence 文件
SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的一种平面文件(Flat File)。在 SparkContext 中,可以调用 sequenceFilekeyClass, valueClass。
// 保存数据为 SequenceFile
dataRDD.saveAsSequenceFile("output")
// 读取 SequenceFile 文件
sc.sequenceFile[Int,Int]("output").collect().foreach(println)
➢ object 对象文件
对象文件是将对象序列化后保存的文件,采用 Java 的序列化机制。可以通过 objectFileT: ClassTag函数接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调用saveAsObjectFile()实现对对象文件的输出。因为是序列化所以要指定类型。
// 保存数据
dataRDD.saveAsObjectFile("output")
// 读取数据
sc.objectFile[Int]("output").collect().foreach(println)
➢ RDD : 弹性分布式数据集
➢ 累加器:分布式共享只写变量
➢ 广播变量:分布式共享只读变量
累加器:用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge。注意累加器的少加和多加问题。(Spark02_Acc)
累加器实现wordcount Spark03_Acc_wordcount
未使用累加器图解,虚线表示sum并不会返回
使用累加器图解,会将executor中的sumacc进行返回,在driver进行merge
实现原理
广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用。如果需要向所有节点发送一个较大的只读查询表,广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务分别发送。
广播变量的原因: Spark04_Broadcast
闭包数据,都是以Task为 单位发送的,每个任务中包含闭包数据
这样可能会导致,一个Executor中含有大量重复的数据,并且占用大量的内存
Executor其实就一个 JVM,所以在启动时,会自动分配内存
完全可以将任务中的闭包数据放置在Executor的内存中,达到共享的目的
Spark中的广播变量就可以将闭包的数据保存到Executor的内存中
Spark中的广播变量不能够更改:分布式共享只读变量
Spark案例测试 来源借鉴学习于(https://www.bilibili.com/video/BV11A411L7CK?p=110)
上面的数据图是从数据文件中截取的一部分内容,表示为电商网站的用户行为数据,主要包含用户的 4 种行为:搜索,点击,下单,支付。数据规则如下:
➢ 数据文件中每行数据采用下划线分隔数据
➢ 每一行数据表示用户的一次行为,这个行为只能是 4 种行为的一种
➢ 如果搜索关键字为 null,表示数据不是搜索数据
➢ 如果点击的品类 ID 和产品 ID 为-1,表示数据不是点击数据
➢ 针对于下单行为,一次可以下单多个商品,所以品类 ID 和产品 ID 可以是多个,id 之间采用逗号分隔,如果本次不是下单行为,则数据采用 null 表示
➢ 支付行为和下单行为类似
需求 1:Top10热门品类 Spark02_Req1_HotCategoryTop10Analysis2
需求 2:Top10热门品类中每个品类的Top10活跃 Session 统计 Spark03_Req2_HotCategoryTop10SessionAnalysis
需求 3:页面单跳转换率统计图解 Spark04_Req3_PageflowAnalysis
工程化代码:架构模式-三层架构
三层架构:controller(控制层),service(服务层),dao(持久层)
ThreadLocal可以对线程的内存进行控制,存储数据,共享数据
路径:big-data-study\Spark-demo\src\main\java\spark\core\com\zh\operator\framework
学习路径:https://space.bilibili.com/302417610/,如有侵权,请联系q进行删除:3623472230