【Flink状态管理(八)】Checkpoint:CheckpointBarrier对齐后Checkpoint的完成、通知与对学习状态管理源码的思考

news2025/1/6 19:20:19

文章目录

  • 一. 调用StreamTask执行Checkpoint操作
    • 1. 执行Checkpoint总体代码流程
      • 1.1. StreamTask.checkpointState()
      • 1.2. executeCheckpointing
      • 1.3. 将算子中的状态快照操作封装在OperatorSnapshotFutures中
      • 1.4. 算子状态进行快照
      • 1.5. 状态数据快照持久化
  • 二. CheckpointCoordinator管理Checkpoint
    • 1. Checkpoint执行完毕后的确认过程
    • 2. 触发并完成Checkpoint操作
    • 3. 通知CheckpointComplete给TaskExecutor
  • 三. 状态管理学习小结

上文介绍了CheckpointBarrier的对齐操作,当CheckpointBarrier完成对齐操作后,接下来就是通过notifyCheckpoint()方法触发StreamTask节点的Checkpoint操作。

一. 调用StreamTask执行Checkpoint操作

如下代码,notifyCheckpoint()方法主要包含如下逻辑。

> 1. 判断toNotifyOnCheckpoint不为空。
> 2. 创建CheckpointMetaDataCheckpointMetrics实例,CheckpointMetaData用于存储
> Checkpoint的元信息,CheckpointMetrics用于记录和监控Checkpoint监控指标。
> 3. 触发StreamTask中算子的Checkpoint操作。
protected void notifyCheckpoint(CheckpointBarrier checkpointBarrier, 
                                long bufferedBytes, 
                                long alignmentDurationNanos) throws Exception {
   if (toNotifyOnCheckpoint != null) {
      // 创建CheckpointMetaData对象用于存储Meta信息
      CheckpointMetaData checkpointMetaData =
         new CheckpointMetaData(checkpointBarrier.getId(), 
                                checkpointBarrier.getTimestamp());
            // 创建CheckpointMetrics对象用于记录监控指标
      CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
         .setBytesBufferedInAlignment(bufferedBytes)
         .setAlignmentDurationNanos(alignmentDurationNanos);
      // 调用toNotifyOnCheckpoint.triggerCheckpointOnBarrier()方法触发Checkpoint
        操作
      toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
         checkpointMetaData,
         checkpointBarrier.getCheckpointOptions(),
         checkpointMetrics);
   }
}

注意:StreamTask是唯一实现了Checkpoint方法的子类,即只有StreamTask才能触发当前Task实例中的Checkpoint操作。

 

接下来具体看Checkpoint执行细节

1. 执行Checkpoint总体代码流程

Checkpoint触发过程分为两种情况:一种是CheckpointCoordinator周期性地触发数据源节点中的Checkpoint操作;另一种是下游算子通过对齐CheckpointBarrier事件触发本节点算子的Checkpoint操作。

不管是哪种方式触发Checkpoint,最终都是调用StreamTask.performCheckpoint()方法实现StreamTask实例中状态数据的持久化操作。

 

在StreamTask.performCheckpoint()方法中,首先判断当前的Task是否运行正常,然后使用actionExecutor线程池执行Checkpoint操作,Checkpoint的实际执行过程如下。

  1. Checkpoint执行前的准备操作,让OperatorChain中所有的Operator执行Pre-barrier工作。
  2. 将CheckpointBarrier事件发送到下游的节点中。
  3. 算子状态数据进行快照

执行checkpointState()方法,对StreamTask中OperatorChain的所有算子进行状态数据的快照操作,该过程为异步非阻塞过程,不影响数据的正常处理进程,执行完成后会返回True到CheckpointInputGate中。

  1. task挂掉情况处理:
  • 如果isRunning的条件为false,表明Task不在运行状态,此时需要给OperatorChain中的所有算子发送CancelCheckpointMarker消息,这里主要借助recordWriter.broadcastEvent(message)方法向下游算子进行事件广播。
  • 当且仅当OperatorChain中的算子还没有执行完Checkpoint操作的时候,下游的算子接收到CancelCheckpointMarker消息后会立即取消Checkpoint操作。
private boolean performCheckpoint(
      CheckpointMetaData checkpointMetaData,
      CheckpointOptions checkpointOptions,
      CheckpointMetrics checkpointMetrics,
      boolean advanceToEndOfTime) throws Exception {
   LOG.debug("Starting checkpoint ({}) {} on task {}",
             checkpointMetaData.getCheckpointId(), 
             checkpointOptions.getCheckpointType(), 
             getName());
   final long checkpointId = checkpointMetaData.getCheckpointId();
   if (isRunning) {
      // 使用actionExecutor执行Checkpoint逻辑
      actionExecutor.runThrowing(() -> {
         if (checkpointOptions.getCheckpointType().isSynchronous()) {
             setSynchronousSavepointId(checkpointId);
             if (advanceToEndOfTime) {
                 advanceToEndOfEventTime();
            }
         }
         //Checkpoint操作的准备工作
         operatorChain.prepareSnapshotPreBarrier(checkpointId);
         //将checkpoint barrier发送到下游的stream中
         operatorChain.broadcastCheckpointBarrier(
               checkpointId,
               checkpointMetaData.getTimestamp(),
               checkpointOptions);
         //对算子中的状态进行快照操作,此步骤是异步操作,
         //不影响streaming拓扑中数据的正常处理
         checkpointState(checkpointMetaData, checkpointOptions, 
            checkpointMetrics);
      });
      return true;
   } else {
      // 如果Task处于其他状态,则向下游广播CancelCheckpointMarker消息
      actionExecutor.runThrowing(() -> {
         final CancelCheckpointMarker message = 
             new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
         recordWriter.broadcastEvent(message);
      });
      return false;
   }
}

 

1.1. StreamTask.checkpointState()

接下来我们看StreamTask.checkpointState()方法的具体实现,如下代码。

  1. 创建CheckpointStateOutputStream实例。主要有如下两种实现类:
    • FsCheckpointStateOutputStream:文件类型系统
    • MemoryCheckpointOutputStream:内存的数据流输出。
  2. 创建CheckpointingOperation实例,CheckpointingOperation封装了Checkpoint执行的具体操作流程,以及checkpointMetaData、checkpointOptions、storage和checkpointMetrics等Checkpoint执行过程中需要的环境配置信息。
  3. 调用CheckpointingOperation.executeCheckpointing()方法执行Checkpoint操作。
private void checkpointState(
      CheckpointMetaData checkpointMetaData,
      CheckpointOptions checkpointOptions,
      CheckpointMetrics checkpointMetrics) throws Exception {
     // 创建CheckpointStreamFactory实例
   CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorag
      eLocation(
         checkpointMetaData.getCheckpointId(),
         checkpointOptions.getTargetLocation());
     // 创建CheckpointingOperation实例
   CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
      this,
      checkpointMetaData,
      checkpointOptions,
      storage,
      checkpointMetrics);
   // 执行Checkpoint操作
   checkpointingOperation.executeCheckpointing();
}

 

1.2. executeCheckpointing

如代码所示,CheckpointingOperation.executeCheckpointing()方法主要包含如下逻辑。

  1. 遍历所有StreamOperator算子,然后调用checkpointStreamOperator()方法为每个算子创建OperatorSnapshotFuture对象。这一步将所有算子的快照操作存储在OperatorSnapshotFutures集合中。
  2. 将OperatorSnapshotFutures存储到operatorSnapshotsInProgress的键值对集合中,其中Key为OperatorID,Value为该算子执行状态快照操作对应的OperatorSnapshotFutures对象
  3. 创建AsyncCheckpointRunnable线程对象,AsyncCheckpointRunnable实例中包含了创建好的OperatorSnapshotFutures集合。
  4. 调用StreamTask.asyncOperationsThreadPool线程池运行asyncCheckpointRunnable线程,执行operatorSnapshotsInProgress集合中算子的异步快照操作。
public void executeCheckpointing() throws Exception {
   //通过算子创建执行快照操作的OperatorSnapshotFutures对象
   for (StreamOperator<?> op : allOperators) {
      checkpointStreamOperator(op);
   }
   // 此处省略部分代码
   startAsyncPartNano = System.nanoTime();
   checkpointMetrics.setSyncDurationMillis(
      (startAsyncPartNano - startSyncPartNano) / 1_000_000);
   AsyncCheckpointRunnable asyncCheckpointRunnable = new 
      AsyncCheckpointRunnable(
      owner,
      operatorSnapshotsInProgress,
      checkpointMetaData,
      checkpointMetrics,
      startAsyncPartNano);
   // 注册Closeable操作
   owner.cancelables.registerCloseable(asyncCheckpointRunnable);
   // 执行asyncCheckpointRunnable
         owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);
 }

 

1.3. 将算子中的状态快照操作封装在OperatorSnapshotFutures中

如下代码,AbstractStreamOperator.snapshotState()方法将当前算子的状态快照操作封装在OperatorSnapshotFutures对象中,然后通过asyncOperationsThreadPool线程池异步触发所有的OperatorSnapshotFutures操作,方法主要步骤如下。

  1. 创建OperatorSnapshotFutures对象,封装当前算子对应的状态快照操作。
  2. 创建snapshotContext上下文对象,存储快照过程需要的上下文信息,并调用snapshotState()方法执行快照操作。

snapshotState()方法由StreamOperator子类实现,例如在AbstractUdfStreamOperator中会调用StreamingFunctionUtils.snapshotFunctionState(context,getOperatorStateBackend(),
userFunction)方法执行函数中的状态快照操作。

  1. 向snapshotInProgress中指定KeyedStateRawFuture和OperatorStateRawFuture,专门用于处理原生状态数据的快照操作
  • 如果operatorStateBackend不为空,则将operatorStateBackend.snapshot()方法块设定到OperatorStateManagedFuture中,并注册到snapshotInProgress中等待执行。
  • 如果keyedStateBackend不为空,则将keyedStateBackend.snapshot()方法块设定到KeyedStateManagedFuture中,并注册到snapshotInProgress中等待执行。
  1. 返回创建的snapshotInProgress异步Future对象,snapshotInProgress中封装了当前算子需要执行的所有快照操作。
public final OperatorSnapshotFutures snapshotState(long checkpointId, 
                                                   long timestamp, 
                                                   CheckpointOptions 
                                                   checkpointOptions,
                                                   CheckpointStreamFactory factory
                                                   ) throws Exception {
      // 获取KeyGroupRange
   KeyGroupRange keyGroupRange = null != keyedStateBackend ?
         keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_
            RANGE;
      // 创建OperatorSnapshotFutures处理对象
   OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
      // 创建snapshotContext上下文对象
   StateSnapshotContextSynchronousImpl snapshotContext = 
   new StateSnapshotContextSynchronousImpl(
      checkpointId,
      timestamp,
      factory,
      keyGroupRange,
      getContainingTask().getCancelables());
   try {
      snapshotState(snapshotContext);
      // 设定KeyedStateRawFuture和OperatorStateRawFuture
      snapshotInProgress
      .setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
      snapshotInProgress
      .setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
            // 如果operatorStateBackend不为空,设定OperatorStateManagedFuture
      if (null != operatorStateBackend) {
         snapshotInProgress.setOperatorStateManagedFuture(
            operatorStateBackend
            .snapshot(checkpointId, timestamp, factory, checkpointOptions));
      }
      // 如果keyedStateBackend不为空,设定KeyedStateManagedFuture
      if (null != keyedStateBackend) {
         snapshotInProgress.setKeyedStateManagedFuture(
            keyedStateBackend
            .snapshot(checkpointId, timestamp, factory, checkpointOptions));
      }
   } catch (Exception snapshotException) {
    // 此处省略部分代码
   }
   return snapshotInProgress;
}

这里可以看出,原生状态和管理状态的RunnableFuture对象会有所不同

  • RawState主要通过从snapshotContext中获取的RawFuture对象 管理状态的快照操作
  • ManagedState主要通过operatorStateBackend和keyedStateBackend进行状态的管理,并根据StateBackend的不同实现将状态数据写入内存或外部文件系统中。

 

1.4. 算子状态进行快照

我们知道所有的状态快照操作都会被封装到OperatorStateManagedFuture对象中,最终通过AsyncCheckpointRunnable线程触发执行。

下面我们看AsyncCheckpointRunnable线程的定义。如代码所示,AsyncCheckpointRunnable.run()方法主要逻辑如下。

  1. 调用FileSystemSafetyNet.initializeSafetyNetForThread()方法为当前线程初始化文件系统安全网,确保数据能够正常写入。
  2. 创建TaskStateSnapshot实例:

创建jobManagerTaskOperatorSubtaskStates和localTaskOperatorSubtaskStates对应的TaskStateSnapshot实例,其中jobManagerTaskOperatorSubtaskStates用于存储和记录发送给JobManager的Checkpoint数据,localTaskOperatorSubtaskStates用于存储TaskExecutor本地的状态数据。

  1. 执行所有状态快照线程操作

遍历operatorSnapshotsInProgress集合,获取OperatorSnapshotFutures并创建OperatorSnapshotFinalizer实例,用于执行所有状态快照线程操作。在OperatorSnapshotFinalizerz中会调用FutureUtils.runIfNotDoneAndGet()方法执行KeyedState和OperatorState的快照操作。

  1. 从finalizedSnapshots中获取JobManagerOwnedState和TaskLocalState,分别存储在jobManagerTaskOperatorSubtaskStates和localTaskOperatorSubtaskStates集合中。
  2. 调用checkpointMetrics对象记录Checkpoint执行的时间并汇总到Metric监控系统中。
  3. 如果AsyncCheckpointState为COMPLETED状态,则调用reportCompletedSnapshotStates()方法向JobManager汇报Checkpoint的执行结果。
  4. 如果出现其他异常情况,则调用handleExecutionException()方法进行处理。
public void run() {
   FileSystemSafetyNet.initializeSafetyNetForThread();
   try {
      // 创建TaskStateSnapshot
      TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
         new TaskStateSnapshot(operatorSnapshotsInProgress.size());
      TaskStateSnapshot localTaskOperatorSubtaskStates =
         new TaskStateSnapshot(operatorSnapshotsInProgress.size());
      for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : 
           operatorSnapshotsInProgress.entrySet()) {
         OperatorID operatorID = entry.getKey();
         OperatorSnapshotFutures snapshotInProgress = entry.getValue();
         // 创建OperatorSnapshotFinalizer对象
         OperatorSnapshotFinalizer finalizedSnapshots =
            new OperatorSnapshotFinalizer(snapshotInProgress);
         jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
            operatorID,
            finalizedSnapshots.getJobManagerOwnedState());
         localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
            operatorID,
            finalizedSnapshots.getTaskLocalState());
      }
      final long asyncEndNanos = System.nanoTime();
      final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000L;
      checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
      if (asyncCheckpointState.compareAndSet(
          CheckpointingOperation.AsyncCheckpointState.RUNNING,
         CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {
         reportCompletedSnapshotStates(
            jobManagerTaskOperatorSubtaskStates,
            localTaskOperatorSubtaskStates,
            asyncDurationMillis);
      } else {
         LOG.debug("{} - asynchronous part of checkpoint {} could not be 
            completed because it was closed before.",
            owner.getName(),
            checkpointMetaData.getCheckpointId());
      }
   } catch (Exception e) {
      handleExecutionException(e);
   } finally {
      owner.cancelables.unregisterCloseable(this);
      FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
   }
}

至此,算子状态数据快照的逻辑基本完成,算子中的托管状态主要借助KeyedStateBackend和OperatorStateBackend管理。

KeyedStateBackend和OperatorStateBackend都实现了SnapshotStrategy接口,提供了状态快照的方法。SnapshotStrategy根据不同类型存储后端,主要有HeapSnapshotStrategy和RocksDBSnapshotStrategy两种类型。

 

1.5. 状态数据快照持久化

这里我们以HeapSnapshotStrategy为例,介绍在StateBackend中对状态数据进行状态快照持久化操作的步骤。如代码所示,

HeapSnapshotStrategy.processSnapshotMetaInfoForAllStates()方法中定义了对KeyedState以及OperatorState的状态处理逻辑。

  1. 遍历每个StateSnapshotRestore。
  2. 调用StateSnapshotRestore.stateSnapshot()方法,此时会创建StateSnapshot对象。
  3. 将创建的StateSnapshot添加到metaInfoSnapshots和cowStateStableSnapshots集合中,完成堆内存存储类型KvState的快照操作。
private void processSnapshotMetaInfoForAllStates(
   List metaInfoSnapshots,
   Map<StateUID, StateSnapshot> cowStateStableSnapshots,
   Map<StateUID, Integer> stateNamesToId,
   Map<String, ? extends StateSnapshotRestore> registeredStates,
   StateMetaInfoSnapshot.BackendStateType stateType) {
   for (Map.Entry<String, ? extends StateSnapshotRestore> kvState :
        registeredStates.entrySet()) {
      final StateUID stateUid = StateUID.of(kvState.getKey(), stateType);
      stateNamesToId.put(stateUid, stateNamesToId.size());
      StateSnapshotRestore state = kvState.getValue();
      if (null != state) {
         final StateSnapshot stateSnapshot = state.stateSnapshot();
         metaInfoSnapshots.add(stateSnapshot.getMetaInfoSnapshot());
         cowStateStableSnapshots.put(stateUid, stateSnapshot);
      }
   }
}

 

二. CheckpointCoordinator管理Checkpoint

1. Checkpoint执行完毕后的确认过程

当StreamTask中所有的算子完成状态数据的快照操作后,Task实例会立即将TaskStateSnapshot消息发送到管理节点的CheckpointCoordinator中,并在CheckpointCoordinator中完成后续的操作。如图所示,Checkpoint执行完毕后的确认过程如下。

在这里插入图片描述

  1. 调用StreamTask.reportCompletedSnapshotStates

当StreamTask中的所有算子都完成快照操作后,会调用StreamTask.reportCompletedSnapshotStates()方法将TaskStateSnapshot等Ack消息发送给TaskStateManager。TaskStateManager封装了CheckpointCoordinatorGateway,因此可以直接和CheckpointCoordinator组件进行RPC通信。

  1. 消息传递
  • 将消息传递给CheckpointCoordinatorGateway
    TaskStateManager通过CheckpointResponder.acknowledgeCheckpoint()方法将acknowledgedTaskStateSnapshot消息传递给CheckpointCoordinatorGateway接口实现者,实际上就是JobMasterRPC服务。
  • 消息传递给CheckpointCoordinator
    JobMaster接收到RpcCheckpointResponder返回的Ack消息后,会调用SchedulerNG.acknowledgeCheckpoint()方法将消息传递给调度器。调度器会将Ack消息封装成AcknowledgeCheckpoint,传递给CheckpointCoordinator组件继续处理。
  1. 管理PendingCheckpoint

当CheckpointCoordinator接收到AcknowledgeCheckpoint后,会从pendingCheckpoints集合中获取对应的PendingCheckpoint,然后判断当前Checkpoint中是否收到AcknowledgedTasks集合所有的Task实例发送的Ack确认消息。
如果notYetAcknowledgedTasks为空,则调用completePendingCheckpoint()方法完成当前PendingCheckpoint操作,并从pendingCheckpoints集合中移除当前的PendingCheckpoint。

  1. 添加CompletedCheckpoint:

紧接着,PendingCheckpoint会转换成CompletedCheckpoint,此时CheckpointCoordinator会在completedCheckpointStore集合中添加CompletedCheckpoint。

  1. 通知Checkpoint操作结束。

CheckpointCoordinator会遍历tasksToCommitTo集合中的ExecutionVertex节点并获取Execution对象,然后通过Execution向TaskManagerGateway发送CheckpointComplete消息,通知所有的Task实例本次Checkpoint操作结束。

  1. 通知同步

当TaskExecutor接收到CheckpointComplete消息后,会从TaskSlotTable中获取对应的Task实例,向Task实例中发送CheckpointComplete消息。所有实现CheckpointListener监听器的组件或算子都会获取Checkpoint完成的消息,然后完成各自后续的处理操作。

 

2. 触发并完成Checkpoint操作

CheckpointCoordinator组件接收到Task实例的Ack消息(快照完成了?)后,会触发并完成Checkpoint操作。如代码PendingCheckpoint.finalizeCheckpoint()方法的具体实现如下。

1)向sharedStateRegistry中注册operatorStates。
2)结束pendingCheckpoint中的Checkpoint操作并生成CompletedCheckpoint3)将completedCheckpoint添加到completedCheckpointStore中,
4)从pendingCheckpoint中移除checkpointId对应的PendingCheckpoint,
并触发队列中的Checkpoint请求。
5)向所有的ExecutionVertex节点发送CheckpointComplete消息,
通知Task实例本次Checkpoint操作完成。



private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) 
   throws CheckpointException {
   final long checkpointId = pendingCheckpoint.getCheckpointId();
   final CompletedCheckpoint completedCheckpoint;
   // 首先向sharedStateRegistry中注册operatorStates
   Map<OperatorID, OperatorState> operatorStates = 
      pendingCheckpoint.getOperatorStates();
   sharedStateRegistry.registerAll(operatorStates.values());
   // 对pendingCheckpoint中的Checkpoint做结束处理并生成CompletedCheckpoint
   try {
      try {
         completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
         failureManager.handleCheckpointSuccess(pendingCheckpoint.
            getCheckpointId());
      }
      catch (Exception e1) {
         // 如果出现异常则中止运行并抛出CheckpointExecution
         if (!pendingCheckpoint.isDiscarded()) {
             failPendingCheckpoint(pendingCheckpoint,
                                   CheckpointFailureReason.FINALIZE_CHECKPOINT_
                                        FAILURE, e1);
         }
         throw new CheckpointException("Could not finalize the pending 
                                       checkpoint " +
                                       checkpointId + '.',
                                       CheckpointFailureReason
                                       .FINALIZE_CHECKPOINT_FAILURE, e1);
      }
      // 当完成finalization后,PendingCheckpoint必须被丢弃
      Preconditions.checkState(pendingCheckpoint.isDiscarded() 
                               && completedCheckpoint != null);
      // 将completedCheckpoint添加到completedCheckpointStore中
      try {
         completedCheckpointStore.addCheckpoint(completedCheckpoint);
      } catch (Exception exception) {
         // 如果completed checkpoint存储出现异常则进行清理
         executor.execute(new Runnable() {
            @Override
            public void run() {
               try {
                  completedCheckpoint.discardOnFailedStoring();
               } catch (Throwable t) {
                  LOG.warn("Could not properly discard completed checkpoint {}.",
                           completedCheckpoint.getCheckpointID(), t);
               }
            }
         });
         throw new CheckpointException("Could not complete the pending 
                                       checkpoint " + 
                                       checkpointId + '.', 
                                       CheckpointFailureReason.
                                       FINALIZE_CHECKPOINT_FAILURE, exception);
      }
   } finally {
      // 最后从pendingCheckpoints中移除checkpointId对应的PendingCheckpoint
      pendingCheckpoints.remove(checkpointId);
      // 触发队列中的Checkpoint请求
      triggerQueuedRequests();
   }
   // 记录checkpointId
   rememberRecentCheckpointId(checkpointId);
   // 清除之前的Checkpoints
   dropSubsumedCheckpoints(checkpointId);
   // 计算和前面Checkpoint操作之间的最低延时
   lastCheckpointCompletionRelativeTime = clock.relativeTimeMillis();
   LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", 
            checkpointId, job,
            completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());
   // 通知所有的ExecutionVertex节点Checkpoint操作完成
   final long timestamp = completedCheckpoint.getTimestamp();
   for (ExecutionVertex ev : tasksToCommitTo) {
      Execution ee = ev.getCurrentExecutionAttempt();
      if (ee != null) {
          ee.notifyCheckpointComplete(checkpointId, timestamp);
      }
   }
}

 

3. 通知CheckpointComplete给TaskExecutor

当TaskExecutor接收到来自CheckpointCoordinator的CheckpointComplete消息后,会调用Task.notifyCheckpointComplete()方法将消息传递到指定的Task实例中。Task线程会将CheckpointComplete消息通知给StreamTask中的算子。

如下代码,

/**
将notifyCheckpointComplete()转换成RunnableWithException线程并提交到Mailbox中运行,且在MailboxExecutor线程模型中获取和执行的优先级是最高的。
最终notifyCheckpointComplete()方法会在MailboxProcessor中运行。
**/

public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
   return mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).submit(
      () -> notifyCheckpointComplete(checkpointId),
      "checkpoint %d complete", checkpointId);
}

继续具体看StreamTask.notifyCheckpointComplete(),如下代码:

1)获取当前Task中算子链的算子,并发送Checkpoint完成的消息。
2)获取TaskStateManager对象,向其通知Checkpoint完成消息,这里主要调用
TaskLocalStateStore清理本地无用的Checkpoint数据。
3)如果当前Checkpoint是同步的Savepoint操作,直接完成并终止当前Task实例,并调用
resetSynchronousSavepointId()方法将syncSavepointId重置为空。

private void notifyCheckpointComplete(long checkpointId) {
   try {
      boolean success = actionExecutor.call(() -> {
         if (isRunning) {
            LOG.debug("Notification of complete checkpoint for task {}", 
               getName());
            // 获取当前Task中operatorChain所有的Operator,并通知每个Operator 
               Checkpoint执行成功的消息
            for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
               if (operator != null) {
                  operator.notifyCheckpointComplete(checkpointId);
               }
            }
            return true;
         } else {
            LOG.debug("Ignoring notification of complete checkpoint for 
               not-running task {}", getName());
            return true;
         }
      });
      // 获取TaskStateManager,并通知Checkpoint执行完成的消息
      getEnvironment().getTaskStateManager().notifyCheckpointComplete(checkpointId);
      // 如果是同步的Savepoint操作,则直接完成当前Task
      if (success && isSynchronousSavepointId(checkpointId)) {
         finishTask();
         // Reset to "notify" the internal synchronous savepoint mailbox loop.
         resetSynchronousSavepointId();
      }
   } catch (Exception e) {
      handleException(new RuntimeException("Error while confirming checkpoint", e));
   }
}

算子接收到Checkpoint完成消息后,会根据自身需要进行后续的处理,默认在AbstractStreamOperator基本实现类中会通知keyedStateBackend进行后续操作。

对于AbstractUdfStreamOperator实例,会判断当前userFunction是否实现了CheckpointListener,如果实现了,则向UserFucntion通知Checkpoint执行完成的信息

例如在FlinkKafkaConsumerBase中会通过获取到的Checkpoint完成信息,将Offset提交至Kafka集群,确保消费的数据已经完成处理,详细实现可以参考FlinkKafkaConsumerBase.notifyCheckpointComplete()方法。

public void notifyCheckpointComplete(long checkpointId) throws Exception {
   super.notifyCheckpointComplete(checkpointId);
   if (userFunction instanceof CheckpointListener) {
      ((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);
   }
}

 

三. 状态管理学习小结

通过学习状态管理的源码,我们可以再来思考下如下几个场景问题,是不是有一点“庖丁解牛”的意思!

flink中状态存在的意义是什么,涉及到哪些场景。

  1. 实时聚合:比如,计算过去一小时内的平均销售额。这时,你会需要使用到Flink的状态来存储过去一小时内的所有销售数据。
  2. 窗口操作:Flink SQL支持滚动窗口、滑动窗口、会话窗口等。这些窗口操作都需要Flink的状态来存储在窗口期限内的数据。
  3. 状态的持久化与任务恢复:实时任务挂掉之后,为了快速从上一个点恢复任务,可以使用savepoint和checkpoint。
  4. 多流join:Flink至少存储一个流中的数据,以便于在新的记录到来时进行匹配。

 

其次通过学习Flink状态管理相关源码,可以进一步了解状态管理的细节操作,为解决更加复杂的问题打下理论基础

  1. 深入理解任务运行过程中,各算子状态的流转机制;
  2. 快速定位问题:在遇到实际问题时,能够快速反应出是哪块逻辑出现了问题;
  3. 应对故障:状态管理和Flink容错机制相关,可以了解Flink发生故障时如何保证状态的一致性和可恢复性
  4. 二次开发:可以自定义状态后端,或者拓展优化已有的例如RocksDB状态后端等;
  5. 性能优化:了解了Flink是如何有效的处理和管理状态,就可以优化任务性能,减少资源消耗。

 

参考:《Flink设计与实现:核心原理与源码解析》–张利兵

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1460234.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于PSO优化的CNN多输入回归预测(Matlab)粒子群算法优化卷积神经网络回归预测

目录 一、程序及算法内容介绍&#xff1a; 基本内容&#xff1a; 亮点与优势&#xff1a; 二、实际运行效果&#xff1a; 三、部分代码&#xff1a; 四、完整程序下载&#xff1a; 一、程序及算法内容介绍&#xff1a; 基本内容&#xff1a; 本代码基于Matlab平台编译&am…

Stable Diffusion 绘画入门教程(webui)-图生图

通过之前的文章相信大家对文生图已经不陌生了&#xff0c;那么图生图是干啥的呢&#xff1f; 简单理解就是根据我们给出的图片做为参考进行生成图片。 一、能干啥 这里举两个例子 1、二次元头像 真人转二次元&#xff0c;或者二次元转真人都行&#xff0c; 下图为真人转二次…

电脑任务栏一直转圈圈怎么办 电脑底部任务栏卡死桌面没事的解决办法

最近有一些用户反映&#xff0c;自己的电脑底部任务看卡死桌面没事&#xff0c;不知道是什么原因&#xff0c;也不清楚应该如何解决&#xff0c;是由于资讯和兴趣页面加载时卡死导致的&#xff0c;将其关闭后即可解决&#xff0c;以下是小编提供的电脑任务栏一直转圈圈的解决方…

c语言结构体与共用体

前面我们介绍了基本的数据类型 在c语言中 有一种特殊的数据类型 由程序员来定义类型 目录 一结构体 1.1概述 1.2定义结构体 1.3 结构体变量的初始化 1.4 访问结构体的成员 1.5结构体作为函数的参数 1.6指向结构的指针 1.7结构体大小的计算 二共用体 2.1概述 2.2 访…

网络编程知识整理

目录 1.1 引言 1.2 分层 1.3 TCP/IP的分层 1.4 互联网的地址 1.5 域名服务 1.6 封装 1.7 分用 1.8 端口号 1.1 引言 很多不同的厂家生产各种型号的计算机&#xff0c;它们运行完全不同的操作系统&#xff0c;但 T C P / I P协议族允许它们互相进行通信。这一点很让人感…

小红书家居种草商品笔记,需要注意哪些?

家居赛道一直都是小红书的强势赛道之一。那么如果想要成为一个家居博主&#xff0c;或者家居品牌想要入驻小红书&#xff0c;提升影响力&#xff0c;该如何进行推广呢?今天我们就从家居商品笔记的角度&#xff0c;和大家探讨下小红书家居种草商品笔记&#xff0c;需要注意哪些…

PHP分析二维数据表(长度|数字字段|空值|纯姓名|英文用户名|科学计数|是否等长|是否唯一)

先看图&#xff0c;后有完整代码 <?php $t "Excel数据转Sql查询系统字段半智能分析"; $s "Excel复制过来的二维结构表内容,分析查询条件&#xff01;"; $x "字段|最大长度|长度有|数字字段|空值存在|纯姓名|英文用户名|科学计数|是否等长|是否…

pandas DataFrame 导出到Excel格式美化

默认情况下&#xff0c;DataFrame 的 to_excel() 方法导出到 Excel 格式是比较丑的&#xff0c;本篇的代码演示了用一种比较简单的方法进行美化。要点&#xff1a; 使用 Excel 的 Table &#xff08;经常被称为超级表&#xff09;自动列宽 美化前&#xff0c;输出的格式如下&…

高级统计方法 第2次作业

概念 1. &#xff08;a&#xff09; 光滑度高的好&#xff0c;样本足够多光滑度越高就越能表征真实情况&#xff0c;也能对预测变量更好的预测。 &#xff08;b&#xff09; 光滑度低的好&#xff0c;因为可能“过拟合”&#xff0c;一些误差大的数可能会较大的影响到预测…

微信小程序video 点击自动全屏播放

//因为这个地址可能是图片也可能是视频 点击 图片可以预览&#xff0c;点击视频可放大全屏自动播放。 代码如下 <view v-else :class{contentImg: x.picture.length0} style"margin-top: 10px;"v-for"(x1, y1) in x.picture" :key"y"><…

【Docker】Linux主机部署Docker

Docker部署 1.二进制文件部署 到如下地址&#xff0c;下载二进制包。 Docker官网&#xff1a;https://docs.docker.com/engine/install/binaries/ 网易镜像源&#xff1a;https://mirrors.163.com/docker-ce/linux/static/stable/x86_64/ 下载好的二进制包上传到主机&#xf…

【LeetCode】递归精选8题——基础递归、链表递归

目录 基础递归问题&#xff1a; 1. 斐波那契数&#xff08;简单&#xff09; 1.1 递归求解 1.2 迭代求解 2. 爬楼梯&#xff08;简单&#xff09; 2.1 递归求解 2.2 迭代求解 3. 汉诺塔问题&#xff08;简单&#xff09; 3.1 递归求解 4. Pow(x, n)&#xff08;中等&…

(每日持续更新)信息系统项目管理(第四版)(高级项目管理)考试重点整理第11章 项目成本管理(一)

博主2023年11月通过了信息系统项目管理的考试&#xff0c;考试过程中发现考试的内容全部是教材中的内容&#xff0c;非常符合我学习的思路&#xff0c;因此博主想通过该平台把自己学习过程中的经验和教材博主认为重要的知识点分享给大家&#xff0c;希望更多的人能够通过考试&a…

消息队列-RabbitMQ:workQueues—工作队列、消息应答机制、RabbitMQ 持久化、不公平分发(能者多劳)

4、Work Queues Work Queues— 工作队列 (又称任务队列) 的主要思想是避免立即执行资源密集型任务&#xff0c;而不得不等待它完成。我们把任务封装为消息并将其发送到队列&#xff0c;在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时&#xff0c;这些工作…

python51-Python流程控制if分支之不要随意缩进

需要说明的是,虽然Python 语法允许代码块随意缩进N个空格,但同一个代码块内的代码必须保持相同的缩进,不能一会缩进2个空格,一会缩进4个空格。例如如下代码。 上面程序中第二条print语句缩进了5个空格,在这样的情况下,Python解释器认为这条语句与前一条语句(缩进了4个空格…

用html编写的招聘简历

用html编写的招聘简历 相关代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</tit…

没有代码签名证书会怎么样?

随着Windows的SmartScreen功能的普及&#xff0c;如果一个软件发布的时候没有通过代码签名证书进行数字签名&#xff0c;那这个软件从发布&#xff0c;下载&#xff0c;安装&#xff0c;运行等&#xff0c;基本都会遭到系统的风险警告&#xff0c;运行拦截。其目的在于警示用户…

MapGIS 10.6 Pro前端开发低代码,快速构建WebGIS应用

随着实景三维、CIM、数字孪生等的快速发展&#xff0c;相关应用开发需求的市场增长对企业IT交付能力的要求越来越高&#xff0c;为了确保质量并实现提效降本&#xff0c;并让专业开发者更加专注于更具有价值和创新型的工作&#xff0c;低代码开发技术成为大家的优先选择。 为了…

工作入职必备:一寸照片尺寸要求及自拍换底色方法

踏入职场的第一步&#xff0c;往往从一张小小的一寸照片开始。这张看似不起眼的照片&#xff0c;却是你给新同事、新领导的第一印象。今天&#xff0c;我们就来深入探讨一寸照片的尺寸要求&#xff0c;以及如何巧妙地通过自拍来更换背景颜色&#xff0c;让你的入职照片既专业又…

【Java EE初阶二十一】关于http(二)

2. 深入学习http 2.5 关于referer Referer 描述了当前页面是从哪个页面跳转来的&#xff0c;如果是直接在地址栏输入 url(或者点击收藏夹中的按钮) 都是没有 Referer。如下图所示&#xff1a; HTTP 最大的问题在于"明文传输”,明文传输就容易被第三方获取并篡改. …