【flink状态管理(2)各状态初始化入口】状态初始化流程详解与源码剖析

news2024/9/24 9:22:58

文章目录

    • 1. 状态初始化总流程梳理
    • 2.创建StreamOperatorStateContext
    • 3. StateInitializationContext的接口设计。
    • 4. 状态初始化举例:UDF状态初始化

在TaskManager中启动Task线程后,会调用StreamTask.invoke()方法触发当前Task中算子的执行,在invoke()方法中会调用restoreInternal()方法,这中间包括创建和初始化算子中的状态数据。
另外在invoke中,可以通过判断任务状态来判断是否需要初始化状态。

        // Allow invoking method 'invoke' without having to call 'restore' before it.
        if (!isRunning) {
            LOG.debug("Restoring during invoke will be called.");
            restoreInternal();
        }

StreamTask调用initializeStateAndOpenOperators()方法对当前Task中所有算子的状态数据进行初始化。

RegularOperatorChain.
public void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {  
    Iterator var2 = this.getAllOperators(true).iterator();  
  
    while(var2.hasNext()) {  
        StreamOperatorWrapper<?, ?> operatorWrapper = (StreamOperatorWrapper)var2.next();  
        StreamOperator<?> operator = operatorWrapper.getStreamOperator();  
        operator.initializeState(streamTaskStateInitializer);  
        operator.open();  
    }  
  
}

 
找到了算子状态初始化的位置,我们继续了解状态是如何初始化的。

1. 状态初始化总流程梳理

AbstractStreamOperator.initializeState中描述了状态初始化的总体流程,如下代码以及注释:

# AbstractStreamOperator.initializeState

public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)  
        throws Exception {  
    //1. 获取类型序列化器
    final TypeSerializer<?> keySerializer =  
            config.getStateKeySerializer(getUserCodeClassloader());  
    //2. get containingTask
    final StreamTask<?, ?> containingTask = Preconditions.checkNotNull(getContainingTask());  
    final CloseableRegistry streamTaskCloseableRegistry =  
            Preconditions.checkNotNull(containingTask.getCancelables());  
   //3. create StreamOperatorStateContext
    final StreamOperatorStateContext context =  
            streamTaskStateManager.streamOperatorStateContext(  
                    getOperatorID(),  
                    getClass().getSimpleName(),  
                    getProcessingTimeService(),  
                    this,  
                    keySerializer,  
                    streamTaskCloseableRegistry,  
                    metrics,  
                    config.getManagedMemoryFractionOperatorUseCaseOfSlot(  
                            ManagedMemoryUseCase.STATE_BACKEND,  
                            runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),  
                            runtimeContext.getUserCodeClassLoader()),  
                    isUsingCustomRawKeyedState());  
   //4. create stateHandler
    stateHandler =  
            new StreamOperatorStateHandler(  
                    context, getExecutionConfig(), streamTaskCloseableRegistry);  
    timeServiceManager = context.internalTimerServiceManager();  
    //5. initialize OperatorState
    stateHandler.initializeOperatorState(this);  
    //6. set KeyedStateStore in runtimeContext
    runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));  
}

在StreamOperator初始化状态数据的过程中,首先从StreamTask中获取创建状态需要的组件,例如托管状态的管理后端KeyedStateBackend、OperatorStateBackend以及原生状态管理的KeyedStateInputs和OperatorStateInputs组件。

状态数据操作过程中使用的管理组件最终都会封装成StateInitializationContext并传递给子类使用,例如在AbstractUdfStreamOperator中,就会使用StateInitializationContext中的信息初始化用户定义的UDF中的状态数据。

2.创建StreamOperatorStateContext

接下来看如何在Task实例初始化时创建这些组件,并将其存储在StreamOperatorStateContext中供算子使用,如下代码:

StreamTaskStateInitializerImpl
@Override  
public StreamOperatorStateContext streamOperatorStateContext(  
        @Nonnull OperatorID operatorID,  
        @Nonnull String operatorClassName,  
        @Nonnull ProcessingTimeService processingTimeService,  
        @Nonnull KeyContext keyContext,  
        @Nullable TypeSerializer<?> keySerializer,  
        @Nonnull CloseableRegistry streamTaskCloseableRegistry,  
        @Nonnull MetricGroup metricGroup,  
        double managedMemoryFraction,  
        boolean isUsingCustomRawKeyedState)  
        throws Exception {  
    //1. 获取task实例信息
    TaskInfo taskInfo = environment.getTaskInfo();  
    OperatorSubtaskDescriptionText operatorSubtaskDescription =  
            new OperatorSubtaskDescriptionText(  
                    operatorID,  
                    operatorClassName,  
                    taskInfo.getIndexOfThisSubtask(),  
                    taskInfo.getNumberOfParallelSubtasks());  
  
    final String operatorIdentifierText = operatorSubtaskDescription.toString();  
  
    final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates =  
            taskStateManager.prioritizedOperatorState(operatorID);  
  
    CheckpointableKeyedStateBackend<?> keyedStatedBackend = null;  
    OperatorStateBackend operatorStateBackend = null;  
    CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null;  
    CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null;  
    InternalTimeServiceManager<?> timeServiceManager;  
  
    try {  
        // 创建keyed类型的状态后端
        // -------------- Keyed State Backend --------------  
        keyedStatedBackend =  
                keyedStatedBackend(  
                        keySerializer,  
                        operatorIdentifierText,  
                        prioritizedOperatorSubtaskStates,  
                        streamTaskCloseableRegistry,  
                        metricGroup,  
                        managedMemoryFraction);  
        //创建operator类型的状态后端
        // -------------- Operator State Backend --------------  
        operatorStateBackend =  
                operatorStateBackend(  
                        operatorIdentifierText,  
                        prioritizedOperatorSubtaskStates,  
                        streamTaskCloseableRegistry);  
        //创建原生类型状态后端
        // -------------- Raw State Streams --------------  
        rawKeyedStateInputs =  
                rawKeyedStateInputs(  
                        prioritizedOperatorSubtaskStates  
                                .getPrioritizedRawKeyedState()  
                                .iterator());  
        streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);  
  
        rawOperatorStateInputs =  
                rawOperatorStateInputs(  
                        prioritizedOperatorSubtaskStates  
                                .getPrioritizedRawOperatorState()  
                                .iterator());  
        streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);  
        //创建Internal Timer Service Manager
        // -------------- Internal Timer Service Manager --------------  
        if (keyedStatedBackend != null) {  
  
            // if the operator indicates that it is using custom raw keyed state,  
            // then whatever was written in the raw keyed state snapshot was NOT written            // by the internal timer services (because there is only ever one user of raw keyed            // state);            // in this case, timers should not attempt to restore timers from the raw keyed            // state.            final Iterable<KeyGroupStatePartitionStreamProvider> restoredRawKeyedStateTimers =  
                    (prioritizedOperatorSubtaskStates.isRestored()  
                                    && !isUsingCustomRawKeyedState)  
                            ? rawKeyedStateInputs  
                            : Collections.emptyList();  
  
            timeServiceManager =  
                    timeServiceManagerProvider.create(  
                            keyedStatedBackend,  
                            environment.getUserCodeClassLoader().asClassLoader(),  
                            keyContext,  
                            processingTimeService,  
                            restoredRawKeyedStateTimers);  
        } else {  
            timeServiceManager = null;  
        }  
  
        // -------------- Preparing return value --------------  
  
        return new StreamOperatorStateContextImpl(  
                prioritizedOperatorSubtaskStates.getRestoredCheckpointId(),  
                operatorStateBackend,  
                keyedStatedBackend,  
                timeServiceManager,  
                rawOperatorStateInputs,  
                rawKeyedStateInputs);  
    } catch (Exception ex) {  
  
。。。。
}

流程梳理:

  1. 从environment中获取TaskInfo,并基于Task实例创建OperatorSubtaskDescriptionText。Operator中Task实例的描述信息包含OperatorID、OperatorClassName等,最终用于创建OperatorStateBackend的状态存储后端。
  2. 创建KeyedStateBackend,KeyedStateBackend是KeyedState的状态管理后端,提供创建和管理KeyedState的方法。
  3. 创建OperatorStateBackend,OperatorStateBackend是OperatorState的状态管理后端,提供获取和管理OperatorState的接口。
  4. 创建KeyGroupStatePartitionStreamProvider实例,提供创建和获取原生KeyedState的方法。
  5. 创建StatePartitionStreamProvider实例,提供创建和获取原生OperatorState的方法。
  6. 将所有创建出来的托管状态管理后端keyedStatedBackend和operatorStateBackend、原生状态存储后端rawKeyedStateInputs和rawOperatorStateInputs及timeServiceManager实例,全部封装在StreamOperatorStateContextImpl上下文对象中,并返回给AbstractStreamOperator使用。

 
小结
StreamTaskStateInitializer.streamOperatorStateContext()方法包含创建托管状态和原生状态管理后端的全过程。StreamOperator的实现类能够从StreamOperatorStateContext中获取这些状态管理组件并使用它们创建指定类型的状态,最终状态数据会存储在状态管理后端指定的物理介质上,例如堆内存或RocksDB。

StateInitializationContext会被用于算子和UserDefinedFunction中,实现算子或函数中的状态数据操作。

 

3. StateInitializationContext的接口设计。

StateInitializationContext接口同时继承了ManagedInitializationContext接口和FunctionInitializationContext接口。StateInitializationContext接口的默认实现类为StateInitializationContextImpl。
在这里插入图片描述

  1. ManagedInitializationContext接口提供了托管状态使用的KeyedStateStore和OperatorStateStore获取方法,即KeyedStateBackend和OperatorStateBackend的封装类。算子进行初始化时,会通过KeyedStateStore和OperatorStateStore提供的方法创建和管理指定类型的托管状态。

  2. FunctionInitializationContext提供了用户自定义函数状态数据初始化需要的方法。它和ManagedInitializationContext保持一致,这主要是为了和算子使用的上下文进行区分,但两者的操作基本一致。

  3. StateInitializationContext提供了对托管状态数据的管理,并在内部继承和拓展了获取及管理原生状态数据的方法,如getRawOperatorStateInputs()、getRawKeyedStateInputs()等

  4. StateInitializationContextImpl具备操作管理状态和原生状态的能力。基于它可以获取不同类型的状态管理后端,并基于状态管理操作状态数据。

在这里插入图片描述

 

4. 状态初始化举例:UDF状态初始化

在AbstractStreamOperator中调用initializeState(StateInitializationContext context)抽象方法初始化Operator中的状态。这里以AbstractUdfStreamOperator为例说明具体算子、UDF是如何进行状态初始化的。

AbstractUdfStreamOperator.initializeState()方法实际上调用了StreamingFunctionUtils.restoreFunctionState()方法对User-Defined Function中的状态数据进行初始化和恢复,实际上就是将上文创建的StateInitializationContext上下文信息提供给Function接口使用。

public void initializeState(StateInitializationContext context) throws Exception {
   super.initializeState(context);
   StreamingFunctionUtils.restoreFunctionState(context, userFunction);
}

恢复函数内部的状态数据涉及Checkpoint的实现,我们会在之后介绍如何在StreamingFunctionUtils.restoreFunctionState()方法中恢复函数中的状态数据。

 
《Flink设计与实现:核心原理与源码解析》张利兵

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1439423.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

嵌入式学习之Linux入门篇笔记——15,Linux编写第一个自己的命令

配套视频学习链接&#xff1a;http://【【北京迅为】嵌入式学习之Linux入门篇】 https://www.bilibili.com/video/BV1M7411m7wT/?p4&share_sourcecopy_web&vd_sourcea0ef2c4953d33a9260910aaea45eaec8 1.什么是命令&#xff1f; 命令就是可执行程序。 比如 ls -a…

专栏《数据结构与算法:初学者入门指南》序言

&#x1f389;&#x1f389;欢迎光临我的CSDN主页&#xff01;&#x1f389;&#x1f389; &#x1f3c5;我是苏泽&#xff0c;一位对技术充满热情的探索者和分享者。&#x1f680;&#x1f680; &#x1f31f;特别推荐给大家我的最新专栏《数据结构与算法&#xff1a;初学者入…

解决flex gap兼容性问题

前言 一个项目写下来&#xff0c;在网页端预览的时候正常&#xff0c;结果到产品经理手上。 设计稿样式 实际产品手机上样式 产品&#xff1a;“你这玩意儿怎么没间距&#xff1f;” 我&#xff1a;“为什么我的正常&#xff1f;&#xff1f;&#xff1f;呐呐呐你看我手机&a…

CDN相关和HTTP代理

CDN相关和HTTP代理 参考&#xff1a; 《透视 HTTP 协议》——chrono 把这两个放在一起是因为容易搞混&#xff0c;我一开始总以为CDN就是HTTP代理&#xff0c;但是看了极客时间里透视HTTP协议的讲解&#xff0c;感觉又不仅于此&#xff0c;于是专门写下来。 先说结论&#xf…

Redis篇之redis是单线程

一、redis是单线程 Redis是单线程的&#xff0c;但是为什么还那么快&#xff1f;主要原因有下面3点原因&#xff1a; 1. Redis是纯内存操作&#xff0c;执行速度非常快。 2. 采用单线程&#xff0c;避免不必要的上下文切换可竞争条件&#xff0c;多线程还要考虑线程安全问题。 …

YOLOv8改进 | 利用训练好权重文件计算YOLOv8的FPS、推理每张图片的平均时间(科研必备)

一、本文介绍 本文给大家带来的改进机制是利用我们训练好的权重文件计算FPS,同时打印每张图片所利用的平均时间,模型大小(以MB为单位),同时支持batch_size功能的选择,对于轻量化模型的读者来说,本文的内容对你一定有帮助,可以清晰帮你展示出模型速度性能的提升以及轻量…

python coding with ChatGPT 打卡第18天| 二叉树:从中序与后序遍历序列构造二叉树、最大二叉树

相关推荐 python coding with ChatGPT 打卡第12天| 二叉树&#xff1a;理论基础 python coding with ChatGPT 打卡第13天| 二叉树的深度优先遍历 python coding with ChatGPT 打卡第14天| 二叉树的广度优先遍历 python coding with ChatGPT 打卡第15天| 二叉树&#xff1a;翻转…

在Visual Studio中引用和链接OpenSceneGraph (OSG) 库

在Visual Studio中引用和链接OpenSceneGraph (OSG) 库&#xff0c;按照以下步骤操作&#xff1a; 构建或安装OSG库 下载OpenSceneGraph源代码&#xff08;如3.0版本&#xff09;并解压。使用CMake配置项目&#xff0c;为Visual Studio生成解决方案文件。通常您需要设置CMake中的…

ctfshow-命令执行(web73-web77)

web73 用不了上一题的通用poc了 因为禁用了strlen 但是可以改一个函数自定义一个函数只要是能实现strlen效果即可 cvar_export(scandir(/));exit(0); 根目录下有一个flagc.txt文件 cinclude(/flagc.txt);exit(0); web74 禁用了scandir函数 那就使用web72的glob协议 查看目录下…

Wireshark不显示Thrift协议

使用Wireshark对thrift协议进行抓包&#xff0c;但是只显示了传输层的tcp协议&#xff1a; "右键" -> "Decode As" 选择thrift的tcp端口 将“当前”修改为Thrift&#xff0c;然后点击“确定” 设置后&#xff0c;可以发现Wireshark里面显示的协议从Tcp变…

【每日一题】LeetCode——链表的中间结点

&#x1f4da;博客主页&#xff1a;爱敲代码的小杨. ✨专栏&#xff1a;《Java SE语法》 | 《数据结构与算法》 | 《C生万物》 ❤️感谢大家点赞&#x1f44d;&#x1f3fb;收藏⭐评论✍&#x1f3fb;&#xff0c;您的三连就是我持续更新的动力❤️ &#x1f64f;小杨水平有…

Unity3d Shader篇(五)— Phong片元高光反射着色器

文章目录 前言一、Phong片元高光反射着色器是什么&#xff1f;1. Phong片元高光反射着色器的工作原理2. Phong片元高光反射着色器的优缺点优点缺点 二、使用步骤1. Shader 属性定义2. SubShader 设置3. 渲染 Pass4. 定义结构体和顶点着色器函数5. 片元着色器函数 三、效果四、总…

vue对于安装依赖时不好习惯的反省

因为一个不好的习惯&#xff0c;我总是喜欢–save去安装依赖包&#xff0c;然后发现最后打包后的内容总是很大。就想着怎么能让包小一些&#xff0c;就发现我遗漏了vue安装依赖的一个小知识点 安装依赖的时候可以-s -d -g去安装&#xff0c;要根据使用的内容选择去安装&#xf…

第一个 Angular 项目 - 静态页面

第一个 Angular 项目 - 静态页面 之前的笔记&#xff1a; [Angular 基础] - Angular 渲染过程 & 组件的创建 [Angular 基础] - 数据绑定(databinding) [Angular 基础] - 指令(directives) 这是在学完了上面这三个内容后能够完成的项目&#xff0c;目前因为还没有学到数…

【漏洞复现】多语言药房管理系统MPMS文件上传漏洞

Nx01 产品简介 多语言药房管理系统 (MPMS) 是用 PHP 和 MySQL 开发的, 该软件的主要目的是在药房和客户之间提供一套接口&#xff0c;客户是该软件的主要用户。该软件有助于为药房业务创建一个综合数据库&#xff0c;并根据到期、产品等各种参数提供各种报告。 Nx02 漏洞描述 …

【服务器数据恢复】HP EVA虚拟化磁盘阵列数据恢复原理方案

EVA存储结构&原理&#xff1a; EVA是虚拟化存储&#xff0c;在工作过程中&#xff0c;EVA存储中的数据会不断地迁移&#xff0c;再加上运行在EVA上的应用都比较繁重&#xff0c;磁盘负载高&#xff0c;很容易出现故障。EVA是通过大量磁盘的冗余空间和故障后rss冗余磁盘动态…

笔记---dp---数字三角形模型

所谓数字三角形模型&#xff0c;即是从数字三角形这一题衍生出来的 题目为经典题目&#xff0c;不再赘述&#xff0c;此笔记根据AcWing算法提高课来进行对数字三角形模型衍生例题的记录 题目关系如下&#xff08;见AcWing里的AcSaber&#xff09;&#xff1a; AcWing.1015.摘…

Nature Machine Intelligence 使用机器学习驱动的可拉伸智能纺织手套捕捉复杂的手部动作和物体交互

研究背景 对灵巧手运动的精确实时跟踪在人机交互、元宇宙、机器人和远程医疗等领域有着广泛的应用。当前的可穿戴设备中的大多数仅用于检测精度有限的特定手势&#xff0c;并且没有解决与设备的可靠性、准确性和可清洗相关的挑战。对传感器直接放置在用户的手上有严格的要求&am…

自然人如何代开发票

1&#xff1a;登录国家税务总局深圳市电子税务局 地址&#xff1a;国家税务总局深圳市电子税务局 2&#xff1a;个人所得税APP 扫描登录 或 身份证登录 3&#xff1a;选择 自然人代开增值税电子普通发票 4&#xff1a;申请代开 5&#xff1a;人脸识别 6&#xff1a;画框的…

Ubuntu20.04更新Cmake版本详解

最近在跑一个融合惯导定位的slam框架ins_eskf_kitti&#xff0c;在框架的安装过程中&#xff0c;需要对从GitHub上克隆下来的glog进行编译。其命令如下&#xff1a; glog&#xff1a; git clone https://github.com/google/glog.git cd glog mkdir build cd build cmake .. m…