一、Spark的三种持久化机制
1、cache
它是persist的一种简化方式,作用是将RDD缓存到内存中,以便后续快速访问,提高计算效率。cache操作是懒执行的,即执行action算子时才会触发。
2、persist
它提供了不同的存储级别(仅磁盘、仅内存、内存或磁盘、内存或磁盘+副本数、序列化后存入内存或磁盘、堆外)可以根据不同的应用场景进行选择。
3、checkpoint
它将数据永久保存,用于减少长血缘关系带来的容错成本。checkpoint不仅保存了数据,还保存了计算该数据的算子操作。当需要恢复数据时,可以通过这些操作重新计算,而不仅仅是依赖于原始数据。且在作业完成后仍然保留,可以用于后续的计算任务。
二、用法示例
1、cache
//制作数据
val data: RDD[Int] = sc.parallelize( 1 to 10000)
//简单加工
val tempRdd: RDD[(String, Int)] = data.map(num=>if(num%2==0)("even",num)else("odd",num))
//缓存
tempRdd.cache()
//调用action算子运行
tempRdd.foreach(println)
我们看下tempRdd的存储情况:
2、persist
//制作数据
val data: RDD[Int] = sc.parallelize( 1 to 10000)
//简单加工
val tempRdd: RDD[(String, Int)] = data.map(num=>if(num%2==0)("even",num)else("odd",num))
//持久化
tempRdd.persist(StorageLevel.MEMORY_AND_DISK)
//调用action算子运行
tempRdd.foreach(println)
3、checkpoint
//使用checkpoint之前需要用sc先设置检查点目录
sc.setCheckpointDir("./local-spark/checkpoint-data")
//制作数据
val data:RDD[Int] = sc.parallelize( 1 to 10000)
//简单加工
val tempRdd:RDD[(String, Int)] = data.map(num=>if(num%2==0)("even",num)else("odd",num))
//持久化
tempRdd.persist(StorageLevel.MEMORY_AND_DISK)
//创建checkpoint 会触发job
tempRdd.checkpoint()
//调用action算子运行
tempRdd.foreach(println)
从历史服务界面可以观察到,该程序启动了两个job(在源码分析中我们就会知道原因)
我们再看下两个job的DAG
发现重复的计算跑了两次,因此我们在使用checkpoint前一般都会添加一个persist来进行加速
下面是添加完persist后再进行checkpoint的DAG,虽然也是两个Job,但是tempRdd上的那个点变了颜色,这意味着tempRdd之前的步骤就不用重复计算了
三、源码分析
1、cache
//使用默认存储级别(`MEMORY_ONLY`)持久化此RDD
def cache(): this.type = persist()
//其实背后就是使用的persist
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
2、persist
RDD
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {
//设置此RDD的存储级别,以便在第一次计算后跨操作持久化其值。
/只有当RDD尚未设置存储级别时,这才能用于分配新的存储级别。本地检查点是一个例外。
def persist(newLevel: StorageLevel): this.type = {
if (isLocallyCheckpointed) {
//这意味着用户之前调用了localCheckpoint(),它应该已经将此RDD标记为持久化。
//在这里,我们应该用用户明确请求的存储级别(在将其调整为使用磁盘后)覆盖旧的存储级别。
persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
} else {
persist(newLevel, allowOverride = false)
}
}
//标记此RDD以使用指定级别进行持久化
//newLevel 目标存储级别
//allowOverride 是否用新级别覆盖任何现有级别
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
// 如果想要重新调整一个RDD的存储级别,就必须将allowOverride 置为 true
if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
throw new UnsupportedOperationException(
"Cannot change storage level of an RDD after it was already assigned a level")
}
// 如果这是第一次将此RDD标记为持久化,请在SparkContext中注册它以进行清理和核算。只做一次。
if (storageLevel == StorageLevel.NONE) {
sc.cleaner.foreach(_.registerRDDForCleanup(this))
//注册此RDD以持久化在内存和/或磁盘存储中
sc.persistRDD(this)
}
//设置该RDD的storageLevel 以便在Task计算时直接获取数据,来加速计算
storageLevel = newLevel
this
}
//迭代器嵌套计算,如果该RDD是持久化的,就直接获取数据封装成iterator给后续RDD使用
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
getOrCompute(split, context)
} else {
computeOrReadCheckpoint(split, context)
}
}
//获取或计算RDD分区
private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
val blockId = RDDBlockId(id, partition.index)
var readCachedBlock = true
// 此方法在executors上调用,因此需要调用SparkEnv.get而不是sc.env 获取blockManager
//接下来我们看下BlockManager的getOrElseUpdate方法
//最后一个参数是一个匿名函数,如果缓存中没有块,需要调用它来获取块
SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
readCachedBlock = false
computeOrReadCheckpoint(partition, context)
}) match {
// Block hit.
case Left(blockResult) =>
if (readCachedBlock) {
val existingMetrics = context.taskMetrics().inputMetrics
existingMetrics.incBytesRead(blockResult.bytes)
new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
override def next(): T = {
existingMetrics.incRecordsRead(1)
delegate.next()
}
}
} else {
new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
}
// Need to compute the block.
case Right(iter) =>
new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
}
}
//当缓存中没有块时调用它来制作块
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
if (isCheckpointedAndMaterialized) {
//如果checkpointed和materialized 那么直接返回
firstParent[T].iterator(split, context)
} else {
//继续计算,通过迭代器嵌套计算,知道读取到有持久化的块或者进行Shuffle或者最初的数据源
compute(split, context)
}
}
}
SparkContext
class SparkContext(config: SparkConf) extends Logging {
//跟踪所有持久的RDD
private[spark] val persistentRdds = {
val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
map.asScala
}
private[spark] def persistRDD(rdd: RDD[_]) {
persistentRdds(rdd.id) = rdd
}
}
BlockManager
private[spark] class BlockManager(
val executorId: String,
rpcEnv: RpcEnv,
val master: BlockManagerMaster,
val serializerManager: SerializerManager,
val conf: SparkConf,
memoryManager: MemoryManager,
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager,
val blockTransferService: BlockTransferService,
securityManager: SecurityManager,
externalBlockStoreClient: Option[ExternalBlockStoreClient])
extends BlockDataManager with BlockEvictionHandler with Logging {
//如果给定的块存在,则检索它,
//否则调用提供的`makeIterator `方法来计算该块,持久化它,并返回其值。
def getOrElseUpdate[T](
blockId: BlockId,
level: StorageLevel,
classTag: ClassTag[T],
makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
// 尝试从本地或远程存储读取块。如果它存在,那么我们就不需要通过local-get-or-put路径。
get[T](blockId)(classTag) match {
case Some(block) =>
return Left(block)
case _ =>
// 没有获取到块,需要计算,如果该RDD设置了持久化就对其持久化
}
// 最初,我们在这个块上没有锁.
doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
case None =>
// doPut() 没有将工作交还给我们,因此该块已经存在或已成功存储。
//因此,我们现在在块上持有读取锁。
val blockResult = getLocalValues(blockId).getOrElse {
// 由于我们在doPut()和get()调用之间保持了读取锁,因此该块不应该被驱逐,因此get()不返回该块表示存在一些内部错误
releaseLock(blockId)
throw new SparkException(s"get() failed for block $blockId even though we held a lock")
}
// 我们已经通过doPut()调用在块上持有读取锁,getLocalValue()再次获取锁,因此我们需要在这里调用releaseLock(),这样锁获取的净次数为1(因为调用者只会调用release())一次)。
releaseLock(blockId)
Left(blockResult)
case Some(iter) =>
// put失败,可能是因为数据太大,无法放入内存,无法放入磁盘。因此,我们需要将输入迭代器传递回调用者,以便他们可以决定如何处理这些值(例如,在不缓存的情况下处理它们)。
Right(iter)
}
}
//根据给定级别将给定块放入其中一个块存储中,必要时复制值
//如果该块已存在,则此方法不会覆盖它。
private def doPutIterator[T](
blockId: BlockId,
iterator: () => Iterator[T],
level: StorageLevel,
classTag: ClassTag[T],
tellMaster: Boolean = true,
keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = {
doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
val startTimeNs = System.nanoTime()
var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator[T]] = None
// 块的大小(字节)
var size = 0L
//如果RDD持久化选择有内存
if (level.useMemory) {
// 先把它放在内存中,即使它也将useDisk设置为true;如果内存存储无法容纳它,我们稍后会将其放入磁盘。
//如果RDD持久化选择需要反序列化
if (level.deserialized) {
//尝试将给定块作为值放入内存存储中
memoryStore.putIteratorAsValues(blockId, iterator(), level.memoryMode, classTag) match {
case Right(s) =>
size = s
case Left(iter) =>
// 没有足够的空间展开此块;如果持久化也选择了磁盘,请下载到磁盘
if (level.useDisk) {
logWarning(s"Persisting block $blockId to disk instead.")
diskStore.put(blockId) { channel =>
val out = Channels.newOutputStream(channel)
serializerManager.dataSerializeStream(blockId, out, iter)(classTag)
}
size = diskStore.getSize(blockId)
} else {
iteratorFromFailedMemoryStorePut = Some(iter)
}
}
} else { // RDD持久化没有选择反序列化
//尝试将给定块作为字节放入内存存储中
memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, level.memoryMode) match {
case Right(s) =>
size = s
case Left(partiallySerializedValues) =>
// 没有足够的空间展开此块;如果持久化也选择了磁盘,请下载到磁盘
if (level.useDisk) {
logWarning(s"Persisting block $blockId to disk instead.")
diskStore.put(blockId) { channel =>
val out = Channels.newOutputStream(channel)
partiallySerializedValues.finishWritingToStream(out)
}
size = diskStore.getSize(blockId)
} else {
iteratorFromFailedMemoryStorePut = Some(partiallySerializedValues.valuesIterator)
}
}
}
//RDD持久化时也选择了磁盘
} else if (level.useDisk) {
diskStore.put(blockId) { channel =>
val out = Channels.newOutputStream(channel)
serializerManager.dataSerializeStream(blockId, out, iterator())(classTag)
}
size = diskStore.getSize(blockId)
}
val putBlockStatus = getCurrentBlockStatus(blockId, info)
val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
if (blockWasSuccessfullyStored) {
// 现在该块位于内存或磁盘存储中,请将其告知主机
info.size = size
if (tellMaster && info.tellMaster) {
reportBlockStatus(blockId, putBlockStatus)
}
addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
logDebug(s"Put block $blockId locally took ${Utils.getUsedTimeNs(startTimeNs)}")
//如果RDD持久化选择的副本数大于1
if (level.replication > 1) {
val remoteStartTimeNs = System.nanoTime()
val bytesToReplicate = doGetLocalBytes(blockId, info)
val remoteClassTag = if (!serializerManager.canUseKryo(classTag)) {
scala.reflect.classTag[Any]
} else {
classTag
}
try {
replicate(blockId, bytesToReplicate, level, remoteClassTag)
} finally {
bytesToReplicate.dispose()
}
logDebug(s"Put block $blockId remotely took ${Utils.getUsedTimeNs(remoteStartTimeNs)}")
}
}
assert(blockWasSuccessfullyStored == iteratorFromFailedMemoryStorePut.isEmpty)
iteratorFromFailedMemoryStorePut
}
}
}
3、checkpoint
RDD
//将此RDD标记为检查点。它将被保存到使用`SparkContext#setCheckpointDir`设置的检查点目录中的一个文件中,并且对其父RDD的所有引用都将被删除。必须在此RDD上执行任何作业之前调用此函数。强烈建议将此RDD持久化在内存中,否则将其保存在文件上将需要重新计算。
def checkpoint(): Unit = RDDCheckpointData.synchronized {
// 注意:由于下游的复杂性,我们在这里使用全局锁来确保子RDD分区指向正确的父分区。今后我们应该重新考虑这个问题。
if (context.checkpointDir.isEmpty) {
//SparkContext中尚未设置检查点目录 , 因此使用之前需要用sc先设置检查点目录
throw new SparkException("Checkpoint directory has not been set in the SparkContext")
} else if (checkpointData.isEmpty) {
checkpointData = Some(new ReliableRDDCheckpointData(this))
}
}
ReliableRDDCheckpointData
private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
extends RDDCheckpointData[T](rdd) with Logging {
//........省略..........
//将此RDD具体化,并将其内容写入可靠的DFS。在该RDD上调用的第一个action 完成后立即调用。
protected override def doCheckpoint(): CheckpointRDD[T] = {
//将RDD写入检查点文件,并返回表示RDD的ReliableCheckpointRDD
val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)
// 如果引用超出范围,可以选择清理检查点文件
if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
rdd.context.cleaner.foreach { cleaner =>
cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
}
}
logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
newRDD
}
}
ReliableCheckpointRDD
private[spark] object ReliableCheckpointRDD extends Logging {
def writeRDDToCheckpointDirectory[T: ClassTag](
originalRDD: RDD[T],
checkpointDir: String,
blockSize: Int = -1): ReliableCheckpointRDD[T] = {
val checkpointStartTimeNs = System.nanoTime()
val sc = originalRDD.sparkContext
// 为检查点创建输出路径
val checkpointDirPath = new Path(checkpointDir)
val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
if (!fs.mkdirs(checkpointDirPath)) {
throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
}
// 保存到文件,并将其重新加载为RDD
val broadcastedConf = sc.broadcast(
new SerializableConfiguration(sc.hadoopConfiguration))
// 这很昂贵,因为它不必要地再次计算RDD ,因此一般都会在检查点前调用持久化
sc.runJob(originalRDD,
writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
if (originalRDD.partitioner.nonEmpty) {
//将分区器写入给定的RDD检查点目录。这是在尽最大努力的基础上完成的;写入分区器时的任何异常都会被捕获、记录并忽略。
writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
}
val checkpointDurationMs =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs)
logInfo(s"Checkpointing took $checkpointDurationMs ms.")
//从以前写入可靠存储的检查点文件中读取的RDD
val newRDD = new ReliableCheckpointRDD[T](
sc, checkpointDirPath.toString, originalRDD.partitioner)
if (newRDD.partitions.length != originalRDD.partitions.length) {
throw new SparkException(
s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +
s"number of partitions from original RDD $originalRDD(${originalRDD.partitions.length})")
}
newRDD
}
}
什么时候对RDD进行checkpoint
当该RDD所属的Job执行后再对该RDD进行checkpoint
class SparkContext(config: SparkConf) extends Logging {
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
//执行任务
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
//递归调用父RDD查看是否要进行checkpoint
rdd.doCheckpoint()
}
}
abstract class RDD[T: ClassTag](... ) extends Serializable with Logging {
//递归函数
private[spark] def doCheckpoint(): Unit = {
RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
if (!doCheckpointCalled) {
doCheckpointCalled = true
if (checkpointData.isDefined) {
if (checkpointAllMarkedAncestors) {
// 我们可以收集所有需要检查点的RDD,然后并行检查它们。首先检查父母,因为我们的血统在检查自己后会被截断
dependencies.foreach(_.rdd.doCheckpoint())
}
checkpointData.get.checkpoint()
} else {
dependencies.foreach(_.rdd.doCheckpoint())
}
}
}
}
}
总结
1、RDD执行checkpoint方法,对该RDD进行标记
2、RDD所在的Job执行
3、执行完会用这个Job最后的RDD递归向父寻找,找到所有的被标记需要checkpoint的RDD,再次调用runJob启动任务,将这个RDD进行checkpoint
所以我们在对RDD进行checkpoint前一般会对其persist