1) RDD Cache 缓存
- RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存
在 JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算
子时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。
// cache 操作会增加血缘关系,不改变原有的血缘关系
println(wordToOneRdd.toDebugString)
// 数据缓存。
wordToOneRdd.cache()
// 可以更改存储级别
//mapRdd.persist(StorageLevel.MEMORY_AND_DISK_2)
存储级别
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
- 缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD 的缓存容错机
制保证了即使缓存丢失也能保证计算的正确执行。通过基于 RDD 的一系列转换,丢失的数
据会被重算,由于 RDD 的各个 Partition 是相对独立的,因此只需要计算丢失的部分即可,
并不需要重算全部 Partition。 - Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如:reduceByKey)。这样
做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是,在实际使用的时
候,如果想重用数据,仍然建议调用 persist 或 cache。
2) RDD CheckPoint 检查点
所谓的检查点其实就是通过将 RDD 中间结果写入磁盘
由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点
之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。
对 RDD 进行 checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。
// 设置检查点路径
sc.setCheckpointDir("./checkpoint1")
// 创建一个 RDD,读取指定位置文件:hello atguigu atguigu
val lineRdd: RDD[String] = sc.textFile("input/1.txt")
// 业务逻辑
val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))
val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {
word => {
(word, System.currentTimeMillis())
}
}
// 增加缓存,避免再重新跑一个 job 做 checkpoint
wordToOneRdd.cache()
// 数据检查点:针对 wordToOneRdd 做检查点计算
wordToOneRdd.checkpoint()
// 触发执行逻辑
wordToOneRdd.collect().foreach(println)
3) 缓存和检查点区别
1)Cache 缓存只是将数据保存起来,不切断血缘依赖。Checkpoint 检查点切断血缘依赖。
2)Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint 的数据通常存
储在 HDFS 等容错、高可用的文件系统,可靠性高。
3)建议对 checkpoint()的 RDD 使用 Cache 缓存,这样 checkpoint 的 job 只需从 Cache 缓存
中读取数据即可,否则需要再从头计算一次 RDD。
缓存(Caching)和检查点(Checkpointing)是在数据处理和计算中常见的技术,它们有以下区别:
-
目的和使用场景:
- 缓存:缓存是将计算过程中的中间结果存储在内存或其他高速存储设备中,以便在后续的计算中重复使用。它主要用于优化计算性能,减少重复计算的开销。
- 检查点:检查点是将整个数据集的状态存储在持久化存储介质中,以便在系统故障或需要恢复时能够快速恢复数据。它主要用于提供容错性和可恢复性。
-
存储位置:
- 缓存:缓存通常将数据存储在内存或其他高速存储设备中,以便快速访问。
- 检查点:检查点将数据存储在持久化存储介质(如磁盘或分布式文件系统)中,以确保数据的持久性和可靠性。
-
存储内容:
- 缓存:缓存通常存储计算过程中的中间结果或经常访问的数据,以便在后续的计算中重复使用。
- 检查点:检查点存储整个数据集的状态,包括所有的数据和计算中间结果。
-
开销和效率:
- 缓存:缓存可以减少重复计算的开销,但需要消耗额外的内存或存储资源。对于大规模数据或计算密集型任务,过多的缓存可能导致内存压力或性能问题。
- 检查点:检查点需要将整个数据集写入磁盘或持久化存储介质,因此可能会有一定的存储和IO开销。对于大规模数据集或频繁的检查点操作,需要考虑存储和IO资源的管理和性能优化。
总的来说,缓存主要用于优化计算性能和减少重复计算,而检查点主要用于提供容错性和数据恢复能力。在实际应用中,可以根据具体的场景和需求来选择使用缓存还是检查点,或者两者结合使用,以实现更好的性能和可靠性。