【Flink状态管理(二)各状态初始化入口】状态初始化流程详解与源码剖析

news2025/1/13 13:11:24

文章目录

    • 1. 状态初始化总流程梳理
    • 2.创建StreamOperatorStateContext
    • 3. StateInitializationContext的接口设计。
    • 4. 状态初始化举例:UDF状态初始化

在TaskManager中启动Task线程后,会调用StreamTask.invoke()方法触发当前Task中算子的执行,在invoke()方法中会调用restoreInternal()方法,这中间包括创建和初始化算子中的状态数据。
另外在invoke中,可以通过判断任务状态来判断是否需要初始化状态。

        // Allow invoking method 'invoke' without having to call 'restore' before it.
        if (!isRunning) {
            LOG.debug("Restoring during invoke will be called.");
            restoreInternal();
        }

StreamTask调用initializeStateAndOpenOperators()方法对当前Task中所有算子的状态数据进行初始化。

RegularOperatorChain.
public void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {  
    Iterator var2 = this.getAllOperators(true).iterator();  
  
    while(var2.hasNext()) {  
        StreamOperatorWrapper<?, ?> operatorWrapper = (StreamOperatorWrapper)var2.next();  
        StreamOperator<?> operator = operatorWrapper.getStreamOperator();  
        operator.initializeState(streamTaskStateInitializer);  
        operator.open();  
    }  
  
}

 
找到了算子状态初始化的位置,我们继续了解状态是如何初始化的。

1. 状态初始化总流程梳理

AbstractStreamOperator.initializeState中描述了状态初始化的总体流程,如下代码以及注释:

# AbstractStreamOperator.initializeState

public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)  
        throws Exception {  
    //1. 获取类型序列化器
    final TypeSerializer<?> keySerializer =  
            config.getStateKeySerializer(getUserCodeClassloader());  
    //2. get containingTask
    final StreamTask<?, ?> containingTask = Preconditions.checkNotNull(getContainingTask());  
    final CloseableRegistry streamTaskCloseableRegistry =  
            Preconditions.checkNotNull(containingTask.getCancelables());  
   //3. create StreamOperatorStateContext
    final StreamOperatorStateContext context =  
            streamTaskStateManager.streamOperatorStateContext(  
                    getOperatorID(),  
                    getClass().getSimpleName(),  
                    getProcessingTimeService(),  
                    this,  
                    keySerializer,  
                    streamTaskCloseableRegistry,  
                    metrics,  
                    config.getManagedMemoryFractionOperatorUseCaseOfSlot(  
                            ManagedMemoryUseCase.STATE_BACKEND,  
                            runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),  
                            runtimeContext.getUserCodeClassLoader()),  
                    isUsingCustomRawKeyedState());  
   //4. create stateHandler
    stateHandler =  
            new StreamOperatorStateHandler(  
                    context, getExecutionConfig(), streamTaskCloseableRegistry);  
    timeServiceManager = context.internalTimerServiceManager();  
    //5. initialize OperatorState
    stateHandler.initializeOperatorState(this);  
    //6. set KeyedStateStore in runtimeContext
    runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));  
}

在StreamOperator初始化状态数据的过程中,首先从StreamTask中获取创建状态需要的组件,例如托管状态的管理后端KeyedStateBackend、OperatorStateBackend以及原生状态管理的KeyedStateInputs和OperatorStateInputs组件。

状态数据操作过程中使用的管理组件最终都会封装成StateInitializationContext并传递给子类使用,例如在AbstractUdfStreamOperator中,就会使用StateInitializationContext中的信息初始化用户定义的UDF中的状态数据。

2.创建StreamOperatorStateContext

接下来看如何在Task实例初始化时创建这些组件,并将其存储在StreamOperatorStateContext中供算子使用,如下代码:

StreamTaskStateInitializerImpl
@Override  
public StreamOperatorStateContext streamOperatorStateContext(  
        @Nonnull OperatorID operatorID,  
        @Nonnull String operatorClassName,  
        @Nonnull ProcessingTimeService processingTimeService,  
        @Nonnull KeyContext keyContext,  
        @Nullable TypeSerializer<?> keySerializer,  
        @Nonnull CloseableRegistry streamTaskCloseableRegistry,  
        @Nonnull MetricGroup metricGroup,  
        double managedMemoryFraction,  
        boolean isUsingCustomRawKeyedState)  
        throws Exception {  
    //1. 获取task实例信息
    TaskInfo taskInfo = environment.getTaskInfo();  
    OperatorSubtaskDescriptionText operatorSubtaskDescription =  
            new OperatorSubtaskDescriptionText(  
                    operatorID,  
                    operatorClassName,  
                    taskInfo.getIndexOfThisSubtask(),  
                    taskInfo.getNumberOfParallelSubtasks());  
  
    final String operatorIdentifierText = operatorSubtaskDescription.toString();  
  
    final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates =  
            taskStateManager.prioritizedOperatorState(operatorID);  
  
    CheckpointableKeyedStateBackend<?> keyedStatedBackend = null;  
    OperatorStateBackend operatorStateBackend = null;  
    CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null;  
    CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null;  
    InternalTimeServiceManager<?> timeServiceManager;  
  
    try {  
        // 创建keyed类型的状态后端
        // -------------- Keyed State Backend --------------  
        keyedStatedBackend =  
                keyedStatedBackend(  
                        keySerializer,  
                        operatorIdentifierText,  
                        prioritizedOperatorSubtaskStates,  
                        streamTaskCloseableRegistry,  
                        metricGroup,  
                        managedMemoryFraction);  
        //创建operator类型的状态后端
        // -------------- Operator State Backend --------------  
        operatorStateBackend =  
                operatorStateBackend(  
                        operatorIdentifierText,  
                        prioritizedOperatorSubtaskStates,  
                        streamTaskCloseableRegistry);  
        //创建原生类型状态后端
        // -------------- Raw State Streams --------------  
        rawKeyedStateInputs =  
                rawKeyedStateInputs(  
                        prioritizedOperatorSubtaskStates  
                                .getPrioritizedRawKeyedState()  
                                .iterator());  
        streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);  
  
        rawOperatorStateInputs =  
                rawOperatorStateInputs(  
                        prioritizedOperatorSubtaskStates  
                                .getPrioritizedRawOperatorState()  
                                .iterator());  
        streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);  
        //创建Internal Timer Service Manager
        // -------------- Internal Timer Service Manager --------------  
        if (keyedStatedBackend != null) {  
  
            // if the operator indicates that it is using custom raw keyed state,  
            // then whatever was written in the raw keyed state snapshot was NOT written            // by the internal timer services (because there is only ever one user of raw keyed            // state);            // in this case, timers should not attempt to restore timers from the raw keyed            // state.            final Iterable<KeyGroupStatePartitionStreamProvider> restoredRawKeyedStateTimers =  
                    (prioritizedOperatorSubtaskStates.isRestored()  
                                    && !isUsingCustomRawKeyedState)  
                            ? rawKeyedStateInputs  
                            : Collections.emptyList();  
  
            timeServiceManager =  
                    timeServiceManagerProvider.create(  
                            keyedStatedBackend,  
                            environment.getUserCodeClassLoader().asClassLoader(),  
                            keyContext,  
                            processingTimeService,  
                            restoredRawKeyedStateTimers);  
        } else {  
            timeServiceManager = null;  
        }  
  
        // -------------- Preparing return value --------------  
  
        return new StreamOperatorStateContextImpl(  
                prioritizedOperatorSubtaskStates.getRestoredCheckpointId(),  
                operatorStateBackend,  
                keyedStatedBackend,  
                timeServiceManager,  
                rawOperatorStateInputs,  
                rawKeyedStateInputs);  
    } catch (Exception ex) {  
  
。。。。
}

流程梳理:

  1. 从environment中获取TaskInfo,并基于Task实例创建OperatorSubtaskDescriptionText。Operator中Task实例的描述信息包含OperatorID、OperatorClassName等,最终用于创建OperatorStateBackend的状态存储后端。
  2. 创建KeyedStateBackend,KeyedStateBackend是KeyedState的状态管理后端,提供创建和管理KeyedState的方法。
  3. 创建OperatorStateBackend,OperatorStateBackend是OperatorState的状态管理后端,提供获取和管理OperatorState的接口。
  4. 创建KeyGroupStatePartitionStreamProvider实例,提供创建和获取原生KeyedState的方法。
  5. 创建StatePartitionStreamProvider实例,提供创建和获取原生OperatorState的方法。
  6. 将所有创建出来的托管状态管理后端keyedStatedBackend和operatorStateBackend、原生状态存储后端rawKeyedStateInputs和rawOperatorStateInputs及timeServiceManager实例,全部封装在StreamOperatorStateContextImpl上下文对象中,并返回给AbstractStreamOperator使用。

 
小结
StreamTaskStateInitializer.streamOperatorStateContext()方法包含创建托管状态和原生状态管理后端的全过程。StreamOperator的实现类能够从StreamOperatorStateContext中获取这些状态管理组件并使用它们创建指定类型的状态,最终状态数据会存储在状态管理后端指定的物理介质上,例如堆内存或RocksDB。

StateInitializationContext会被用于算子和UserDefinedFunction中,实现算子或函数中的状态数据操作。

 

3. StateInitializationContext的接口设计。

StateInitializationContext接口同时继承了ManagedInitializationContext接口和FunctionInitializationContext接口。StateInitializationContext接口的默认实现类为StateInitializationContextImpl。
在这里插入图片描述

  1. ManagedInitializationContext接口提供了托管状态使用的KeyedStateStore和OperatorStateStore获取方法,即KeyedStateBackend和OperatorStateBackend的封装类。算子进行初始化时,会通过KeyedStateStore和OperatorStateStore提供的方法创建和管理指定类型的托管状态。

  2. FunctionInitializationContext提供了用户自定义函数状态数据初始化需要的方法。它和ManagedInitializationContext保持一致,这主要是为了和算子使用的上下文进行区分,但两者的操作基本一致。

  3. StateInitializationContext提供了对托管状态数据的管理,并在内部继承和拓展了获取及管理原生状态数据的方法,如getRawOperatorStateInputs()、getRawKeyedStateInputs()等

  4. StateInitializationContextImpl具备操作管理状态和原生状态的能力。基于它可以获取不同类型的状态管理后端,并基于状态管理操作状态数据。

在这里插入图片描述

 

4. 状态初始化举例:UDF状态初始化

在AbstractStreamOperator中调用initializeState(StateInitializationContext context)抽象方法初始化Operator中的状态。这里以AbstractUdfStreamOperator为例说明具体算子、UDF是如何进行状态初始化的。

AbstractUdfStreamOperator.initializeState()方法实际上调用了StreamingFunctionUtils.restoreFunctionState()方法对User-Defined Function中的状态数据进行初始化和恢复,实际上就是将上文创建的StateInitializationContext上下文信息提供给Function接口使用。

public void initializeState(StateInitializationContext context) throws Exception {
   super.initializeState(context);
   StreamingFunctionUtils.restoreFunctionState(context, userFunction);
}

恢复函数内部的状态数据涉及Checkpoint的实现,我们会在之后介绍如何在StreamingFunctionUtils.restoreFunctionState()方法中恢复函数中的状态数据。

 
《Flink设计与实现:核心原理与源码解析》张利兵

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1440127.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何从 iPhone 上恢复永久删除的照片

您的 iPhone 上缺少照片吗&#xff1f;讽刺的是&#xff0c;iPhone 的许多高级功能可能正是这个问题如此普遍的原因。幸运的是&#xff0c;还有很多方法可以从 iPhone 恢复已删除的照片&#xff0c;具体取决于您设备的设置方式。 本文涵盖了所有这些内容。该过程根据您的具体情…

git安装配置

1、下载安装 下载地址 2、配置git用户 git config --global user.name "yw" git config --global user.email "88888qq.com" 3、git init 初始化 4、生成ssh密钥 mkdir .ssh //创建文件夹cd .ssh //进入新建文件夹 ssh-keygen -t rsa // 输入密钥文…

计算机服务器中了halo勒索病毒如何处理,halo勒索病毒解密数据恢复

网络技术的不断发展与应用&#xff0c;为企业的生产生活提供了极大便利&#xff0c;但网络数据安全威胁无处不在&#xff0c;近日&#xff0c;云天数据恢复中心接到某连锁超市求助&#xff0c;企业计算机服务器被halo勒索病毒攻击&#xff0c;导致计算机系统瘫痪&#xff0c;无…

将xyz格式的GRACE数据转成geotiff格式

我们需要将xyz格式的文件转成geotiff便于成图&#xff0c;或者geotiff转成xyz用于数据运算&#xff0c;下面介绍如何实现这一操作&#xff0c;采用GMT和matlab两种方法。 1.GMT转换 我们先准备一个xyz文件&#xff0c;这里是一个降水文件。在gmt中采用以下的语句实现xyz转grd…

【Spring】GoF 之工厂模式

一、GoF 23 设计模式简介 设计模式&#xff1a;一种可以被重复利用的解决方案 GoF&#xff08;Gang of Four&#xff09;&#xff0c;中文名——四人组 《Design Patterns: Elements of Reusable Object-Oriented Software》&#xff08;即《设计模式》一书&#xff09;&…

TCP 传输控制协议

1 TCP 1.1 TCP 最主要的特点 1.TCP 是面向连接的运输层协议。 2.每一条 TCP 连接只能有两个端点 (endpoint)&#xff0c;每一条 TCP 连接只能是点对点的&#xff08;一对一&#xff09;。 3.TCP 提供可靠交付的服务。 4.TCP 提供全双工通信。 5.面向字节流 TCP 中的“流…

【Qt】Android上运行keeps stopping, Desktop上正常

文章目录 问题 & 背景背景问题 解决方案One More ThingTake Away 问题 & 背景 背景 在文章【Qt】最详细教程&#xff0c;如何从零配置Qt Android安卓环境中&#xff0c;我们在Qt中配置了安卓开发环境&#xff0c;并且能够正常运行。 但笔者在成功配置并完成上述文章…

Git、github与gitee码云

1.git核心是两个仓库&#xff1a;本地仓库和远程仓库 主要用于团队合作和代码版本控制&#xff08;个人现有版本代码出错可回溯上个提交版本的代码&#xff09; 远程仓库国际主流githut&#xff0c;但外网速度问题&#xff0c;国内可使用码云gitee github&#xff1a;https:…

Springboot拦截器中跨域失效的问题、同一个接口传入参数不同,一个成功,一个有跨域问题、拦截器和@CrossOrigin和@Controller

Springboot拦截器中跨域失效的问题 一、概述 1、具体场景 起因&#xff1a; 同一个接口&#xff0c;传入不同参数进行值的修改时&#xff0c;一个成功&#xff0c;另一个竟然失败&#xff0c;而且是跨域问题拦截器内的request参数调用getHeader方法时&#xff0c;获取不到前端…

CSS:九宫格布局

九宫格布局效果如下&#xff1a; HTML 结构&#xff1a; <div class"container"><div class"item">1</div><div class"item">2</div><div class"item">3</div><div class"item&q…

机器学习系列——(十六)回归模型的评估

引言 在机器学习领域&#xff0c;回归模型是一种预测连续数值输出的重要工具。无论是预测房价、股票价格还是天气温度&#xff0c;回归模型都扮演着不可或缺的角色。然而&#xff0c;构建模型只是第一步&#xff0c;评估模型的性能是确保模型准确性和泛化能力的关键环节。本文…

【力扣】快乐数,哈希集合 + 快慢指针 + 数学

快乐数原题地址 方法一&#xff1a;哈希集合 定义函数 getNext(n) &#xff0c;返回 n 的所有位的平方和。一直执行 ngetNext(n) &#xff0c;最终只有 2 种可能&#xff1a; n 停留在 1 。无限循环且不为 1 。 证明&#xff1a;情况 1 是存在的&#xff0c;如力扣的示例一…

(十八)springboot实战——spring securtity注解方式的授权流程源码解析

前言 在上一节内容中&#xff0c;我们介绍了如何在FilterSecurityInterceptor过滤器中处理用户的授权流程&#xff0c;并分析了其源码&#xff0c;spring security还提供了方法级别的授权方式&#xff0c;通过EnableMethodSecurity注解启用权限认证流程&#xff0c;只需要在方…

每日一题——LeetCode1422.分割字符串的最大得分

方法一 暴力枚举 枚举所有分割点的情况&#xff0c;取最大得分 var maxScore function(s) {let res 0;const n s.length;for (let i 1; i < n; i) {let score 0;for (let j 0; j < i; j) {if (s[j] 0) {score;}}for (let j i; j < n; j) {if (s[j] 1) {sco…

分享86个行业PPT,总有一款适合您

分享86个行业PPT&#xff0c;总有一款适合您 86个行业PPT下载链接&#xff1a;https://pan.baidu.com/s/1avbzwqK8ILLWYIOylK1aRQ?pwd8888 提取码&#xff1a;8888 Python采集代码下载链接&#xff1a;采集代码.zip - 蓝奏云 学习知识费力气&#xff0c;收集整理更不易…

TI的电量计驱动在卸载时导致Linux卡死

背景 最近移植TI电量计芯片bq40z50的驱动&#xff0c;移植完毕后&#xff0c;能正常读取电池信息了&#xff0c;但是无意中发现驱动卸载会导致Linux卡死&#xff0c;死前终端闪过大量打印&#xff0c;将putty的缓冲区都耗尽了&#xff0c;必须启用syslog转发并用visual syslog…

【数据结构】链表OJ面试题4(题库+解析)

1.前言 前五题在这http://t.csdnimg.cn/UeggB 后三题在这http://t.csdnimg.cn/gbohQ 给定一个链表&#xff0c;判断链表中是否有环。http://t.csdnimg.cn/Rcdyc 记录每天的刷题&#xff0c;继续坚持&#xff01; 2.OJ题目训练 10. 给定一个链表&#xff0c;返回链表开始…

Qt PCL学习(二):点云读取与保存

注意事项 版本一览&#xff1a;Qt 5.15.2 PCL 1.12.1 VTK 9.1.0前置内容&#xff1a;Qt PCL学习&#xff08;一&#xff09;&#xff1a;环境搭建 0. 效果演示 1. pcl_open_save.pro QT core guigreaterThan(QT_MAJOR_VERSION, 4): QT widgets// 添加下行代码&#…

外贸邮件群发如何做?外贸邮件群发靠谱吗?

外贸邮件群发有哪些平台&#xff1f;外贸群发邮件用什么邮箱&#xff1f; 外贸邮件群发是许多企业在开展国际贸易时常用的营销手段&#xff0c;它不仅能够快速地将产品信息和促销活动传达给目标客户&#xff0c;还能够有效地建立和维护客户关系。下面&#xff0c;就让蜂邮探讨…

yolov8自制数据训练集

目录 1.YOLOv8是啥 2.系统环境 3.安装labelimg 3.1安装 3.2启动 labelimg 4.自制分类图片 4.1 YOLO数据集要求 4.2 图片保存目录 4.3 利用labelimg进行标注 4.4 存储图片 4.5 标注文件 5.数据集训练 5.1yaml文件 5.2训练命令 5.3查看训练过程 5.3.1启动tensorb…