背景
算子的联合列表状态是平时使用的比较少的一种状态,本文通过kafka的消费者实现来看一下怎么使用算子列表联合状态
算子联合列表状态
首先我们看一下算子联合列表状态的在进行故障恢复或者从某个保存点进行扩缩容启动应用时状态的恢复情况
算子联合列表状态主要由这两个方法处理:
1初始化方法
public final void initializeState(FunctionInitializationContext context) throws Exception {
OperatorStateStore stateStore = context.getOperatorStateStore();
// 在初始化方法中获取联合列表状态
this.unionOffsetStates =
stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME,
createStateSerializer(getRuntimeContext().getExecutionConfig())));
if (context.isRestored()) {
restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
// 把联合列表状态的数据都恢复成类的本地变量中
// populate actual holder for restored state
for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
}
LOG.info(
"Consumer subtask {} restored state: {}.",
getRuntimeContext().getIndexOfThisSubtask(),
restoredState);
} else {
LOG.info(
"Consumer subtask {} has no restore state.",
getRuntimeContext().getIndexOfThisSubtask());
}
}
2.开始通知检查点开始的方法:
public final void snapshotState(FunctionSnapshotContext context) throws Exception {
if (!running) {
LOG.debug("snapshotState() called on closed source");
} else {
unionOffsetStates.clear();
final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
if (fetcher == null) {
// the fetcher has not yet been initialized, which means we need to return the
// originally restored offsets or the assigned partitions
for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :
subscribedPartitionsToStartOffsets.entrySet()) {
// 进行checkpoint时,把数据保存到联合列表状态中进行保存
unionOffsetStates.add(
Tuple2.of(
subscribedPartition.getKey(), subscribedPartition.getValue()));
}
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// the map cannot be asynchronously updated, because only one checkpoint call
// can happen
// on this function at a time: either snapshotState() or
// notifyCheckpointComplete()
pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
}
} else {
HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// the map cannot be asynchronously updated, because only one checkpoint call
// can happen
// on this function at a time: either snapshotState() or
// notifyCheckpointComplete()
pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
}
for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :
currentOffsets.entrySet()) {
unionOffsetStates.add(
Tuple2.of(
kafkaTopicPartitionLongEntry.getKey(),
kafkaTopicPartitionLongEntry.getValue()));
}
}
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// truncate the map of pending offsets to commit, to prevent infinite growth
while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
pendingOffsetsToCommit.remove(0);
}
}
}
}