5. 状态

news2025/1/16 18:00:55

在这里插入图片描述

一、状态是什么

  • 由一个任务维护,并且用来计算某个结果的所有数据,都属于这个任务的状态
  • 可以认为状态就是一个本地变量,可以被任务的业务逻辑访问
  • Flink 会进行状态管理,包括状态一致性、故障处理以及高效存储和访问,以 便开发人员可以专注于应用程序的逻辑

二、三种状态

  1. **KeyedState:**根据数据流中定义的 key 来维护和访问,有如下的数据结构:

    • ValueState<T>: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。
    • ListState<T>: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List<T>) 进行添加元素,通过 Iterable<T> get() 获得整个列表。还可以通过 update(List<T>) 覆盖当前的列表。
    • ReducingState<T>: 保存一个单值,表示添加到状态的所有值的聚合。接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。
    • AggregatingState<IN, OUT>: 保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。
    • MapState<UK, UV>: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries()keys()values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。
  2. **OperateState:**算子状态的作用范围限定为算子任务,只有一种数据结构:ListState

    在大部分的 Flink 程序中不需要 Operator State,它一般用于实现一个有状态的 source 或者 sink,并且数据无法按照某一个 key 分区的时候。

    通过实现 CheckPointedFunction 接口来使用 operator state,需要实现下面两个方法:

    void snapshotState(FunctionSnapshotContext context) throws Exception;
    
    void initializeState(FunctionInitializationContext context) throws Exception;
    

    在进行 checkpoint 的时候会调用 snapshotState 方法。initializeState 包括了第一次自定义函数初始化和从之前的 checkpoint 恢复,因此,initializeState 中不仅要定义初始化的逻辑,还要定义状态恢复的逻辑。

  3. **BroadcastState:**广播状态是一种特殊类型的 OperatorState,它用于当下流任务都需要同一份上流的 state 的情形。broadcastState 可以支持 MapState。

    在使用 broadcast 的时候,需要用非广播流来 connect 广播流,如下:

    DataStream<String> output = colorPartitionedStream
                     .connect(ruleBroadcastStream)
                     .process(
                         
                         // KeyedBroadcastProcessFunction 中的类型参数表示:
                         //   1. key stream 中的 key 类型
                         //   2. 非广播流中的元素类型
                         //   3. 广播流中的元素类型
                         //   4. 结果的类型,在这里是 string
                         
                         new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
                             // 模式匹配逻辑
                         }
                     );
    

    在 KeyedBroadcastProcessFunction 中,需要重写两个方法:

    public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {
    
        public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
    
        public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
    
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
    }
    

    使用 broadcast 需要注意:

    • 没有跨 task 通讯:如上所述,这就是为什么只有(Keyed)-BroadcastProcessFunction 中处理广播流元素的方法里可以更改 broadcast state 的内容。 同时,用户需要保证所有 task 对于 broadcast state 的处理方式是一致的,否则会造成不同 task 读取 broadcast state 时内容不一致的情况,最终导致结果不一致。
    • **broadcast state 在不同的 task 的事件顺序可能是不同的:**虽然广播流中元素的过程能够保证所有的下游 task 全部能够收到,但在不同 task 中元素的到达顺序可能不同。 所以 broadcast state 的更新不能依赖于流中元素到达的顺序
    • 所有的 task 均会对 broadcast state 进行 checkpoint:虽然所有 task 中的 broadcast state 是一致的,但当 checkpoint 来临时所有 task 均会对 broadcast state 做 checkpoint。 这个设计是为了防止在作业恢复后读文件造成的文件热点。当然这种方式会造成 checkpoint 一定程度的写放大,放大倍数为 p(=并行度)。Flink 会保证在恢复状态/改变并发的时候数据没有重复没有缺失。 在作业恢复时,如果与之前具有相同或更小的并发度,所有的 task 读取之前已经 checkpoint 过的 state。在增大并发的情况下,task 会读取本身的 state,多出来的并发(p_new - p_old)会使用轮询调度算法读取之前 task 的 state。
    • 不使用 RocksDB state backend: broadcast state 在运行时保存在内存中,需要保证内存充足。这一特性同样适用于所有其他 Operator State。

三、使用

  1. KeyedState

    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor, ReducingState, ReducingStateDescriptor, ValueState, ValueStateDescriptor}
    import org.apache.flink.api.scala.{createTypeInformation, getCallLocationName}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.util.Collector
    
    object KeyedStateTest extends App {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
    
      val source = env.fromCollection(List(
        (1L, 3L),
        (1L, 5L),
        (1L, 7L),
        (1L, 4L),
        (1L, 2L),
        (1L, 4L)
      )).keyBy(_._1)
        .flatMap(new CountWindowAverage())
        .print()
      // the printed output will be (1,4), (1,5), (1,3)
    
      env.execute("ExampleKeyedState")
    }
    
    //创建一个类继承自富函数
    class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
      //申明状态,在open方法中初始化
      private var sum: ValueState[(Long, Long)] = _
    
      override def flatMap(value: (Long, Long), out: Collector[(Long, Long)]): Unit = {
        val tmpCurrentSum = sum.value()
    
        val currentSum = if(tmpCurrentSum != null){
          tmpCurrentSum
        }  else {
          (0L, 0L)
        }
    
        val newSum = (currentSum._1 + 1, currentSum._2 + value._2)
    
        sum.update(newSum)
    
        if(newSum._1 >= 2) {
          out.collect((value._1, newSum._2 / newSum._1))
          sum.clear()
        }
      }
    
      //初始化状态
      override def open(parameters: Configuration): Unit = {
        sum = getRuntimeContext.getState(
          new ValueStateDescriptor[(Long, Long)]("average", classOf[(Long, Long)])
        )
      }
    }
    
  2. OperateState

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueStateDescriptor}
    import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    
    import scala.collection.mutable.ListBuffer
    
    object OperateStateTest {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        val source = env.socketTextStream("localhost", 9999)
    
        val stateStream = source.map( new MyMapState(2) )
    
        stateStream.print
    
        env.execute()
      }
    
    }
    
    //创建OperateState要实现CheckpointedFunction接口,并且状态必须是序列化的
    class MyMapState(threshold: Int = 1) extends RichMapFunction[String, String] with CheckpointedFunction {
    
      //申明状态,并申明为可序列化的
      @transient
      private var checkpointedState: ListState[String] = _
    
      private val bufferedElements = ListBuffer[String]()
    
      override def map(value: String): String = {
        bufferedElements += value
    
        if(bufferedElements.size == threshold) {
          val ret = new StringBuilder()
    
          for(element <- bufferedElements) {
            ret.append(element)
          }
    
          bufferedElements.clear()
          ret.toString()
        } else {
          "not reach threshold"
        }
      }
    
    
      //初始化状态
      override def initializeState(context: FunctionInitializationContext): Unit = {
        val descriptor = new ListStateDescriptor[String](
          "buffered-elements",
          TypeInformation.of(new TypeHint[String]() {})
        )
    
        checkpointedState = context.getOperatorStateStore.getListState(descriptor)
    
        //如果使用下面这种方式,创建的是KeyedState
        //chechkpointedState = getRuntimeContext.getListState(new ListStateDescriptor[String]("listState", classOf[String]))
    
      }
    
      override def snapshotState(context: FunctionSnapshotContext): Unit = {
        checkpointedState.clear()
        for (elem <- bufferedElements) {
          checkpointedState.add(elem)
        }
      }
    }
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/992301.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

go通过pprof定位groutine泄漏

日常开发中除了会出现Panic、ErrorInfo等通过日志很容易捕捉到的错误&#xff0c;还会出现内存泄漏、CPU激增等日志难以捕捉的问题。今天小老虎就给大家介绍下如何使用pprof去捕捉内存、CPU这些日志难以排查到的问题。 pprof的访问 pprof是Golang的性能分析工具&#xff0c;可…

Inno Setup 打包的文件以管理员权限运行

在 Inno Setup 安装目录中找到文件 SetupLdr.e32&#xff0c;用软件 ResourceHacker 打开。如下图&#xff0c;点开清单&#xff0c;找到 <requestedExecutionLevel level"asInvoker" uiAccess"false"/></requestedPrivileges>改为 <requ…

Nomad系列-Nomad网络模式

系列文章 Nomad 系列文章 概述 Nomad 的网络和 Docker 的也有很大不同, 和 K8s 的有很大不同. 另外, Nomad 不同版本(Nomad 1.3 版本前后)或是否集成 Consul 及 CNI 等不同组件也会导致网络模式各不相同. 本文详细梳理一下 Nomad 的主要几种网络模式 在Nomad 1.3发布之前&a…

CSS_文字渐变

/* 定义渐变背景样式 */ .gradient-text {background-image: linear-gradient(to right, #ff0000, #00ff00); /* 渐变色范围 */background-clip: text; /* 应用渐变背景到文本 */-webkit-background-clip: text; /* Safari 和 Chrome 的前缀 */color: transparent; /* 将文本颜…

ADS1115 模拟IIC

ADS1115是16位ADC&#xff0c;基准源内部可选&#xff0c;PGA 可提供从 256mV 到 6.144V 的输入范围。 地址可由ADDR引脚决定&#xff0c;一般接地&#xff0c;地址为0x90 写寄存器地址为0x90&#xff0c;读寄存器地址为0x91 ADS1115有4个控制寄存器&#xff0c;0x00,0x01,0x0…

debian apt安装mysqlodbc

mysql的deb包下载地址 下载后上传到linux后&#xff0c; #安装deb包 apt install ./mysql-apt-config_0.8.26-1_all.deb #更新源 apt-get update #搜索包 apt search odbc #安装包 apt-get install mysql-connector-odbc

3. 自定义datasource

一、自定义DataSource ​ 自定义DataSource有两大类&#xff1a;单线程的DataSource和多线程的DataSource 单线程&#xff1a;继承 SourceFunction 多线程&#xff1a;继承 ParallelSourceFunction&#xff0c;继承 RichParallelSourceFunction&#xff08;可以有其他的很多操…

origin中optimal cluster安装报错解决

1.在安装之后程序运行出错&#xff0c;报错信息为缺少numpy包。解决办法&#xff1a;打开窗口-脚本窗口&#xff0c;用pip安装numpy&#xff0c;其他缺少的包可用同样方法解决。 2.有的包在外部python中才有&#xff0c;通过origin无法下载。解决办法&#xff1a;在连接-python…

WIFI版本云音响设置教程阿里云平台版本

文章目录 WIFI本云音响设置教程介绍一、申请设备三元素1.登录阿里云物联网平台2.创建产品3.设置产品参数4.添加设备5.获取三元素 二、设置设备三元素1.打开MQTTConfigTools2.计算MQTT参数3.使用windows电脑的WIFI连接到设备热点4.设置参数5.配置设备连接路由器 三、阿里云物联网…

有始有终!

作者 | 磊哥 来源 | Java中文社群&#xff08;ID&#xff1a;javacn666&#xff09; 转载请联系授权&#xff08;微信ID&#xff1a;GG_Stone&#xff09; 开始是立秋之日&#xff08;8.8 号&#xff09;&#xff0c;结束是白露之时&#xff08;9.8 号&#xff09;。 为期一月&…

Day60|leetcode 84.柱状图中最大的矩形

leetcode 84.柱状图中最大的矩形 题目链接&#xff1a;84. 柱状图中最大的矩形 - 力扣&#xff08;LeetCode&#xff09; 视频链接&#xff1a;单调栈&#xff0c;又一次经典来袭&#xff01; LeetCode&#xff1a;84.柱状图中最大的矩形_哔哩哔哩_bilibili 题目概述 给定 n 个…

CSS3技巧36:backdrop-filter 背景滤镜

CSS3 有 filter 滤镜属性&#xff0c;能给内容&#xff0c;尤其是图片&#xff0c;添加各种滤镜效果。 filter 滤镜详见博文&#xff1a;CSS3中强大的filter(滤镜)属性_css3滤镜_stones4zd的博客-CSDN博客 后续&#xff0c;CSS3 又新增了 backdrop-filter 背景滤镜。 backdr…

卷积概念理解

卷积(convolution)最容易理解的解释_一点一点的进步的博客-CSDN博客 图像处理之卷积模式及C实现_利用卷积模型分类图片 c_扫地工的博客-CSDN博客 卷积的重要的物理意义是&#xff1a;一个函数&#xff08;如&#xff1a;单位响应&#xff09;在另一个函数&#xff08;如&…

进程

目录 进程定义 进程与程序对比 进程分类 系统进程 用户进程 交互进程 批处理进程 守护进程 进程状态 进程组成 ​编辑正文段&#xff08;text&#xff09;和用户数据段 用户数据段 正文段 PCB进程控制块 进程标识信息 处理机状态 进程调度信息 进程控制信息 …

通达信自定义副图行业指标K线指标 HYZS_QD

行业指数:HY_INDEXC,NODRAW; DRAWKLINE(HY_INDEXH,HY_INDEXO,HY_INDEXL,HY_INDEXC); MA5:MA(HY_INDEXC,5),COLORWHITE; {MA10:MA(HY_INDEXC,10),COLORYELLOW,LINETHICK2}; DRAWTEXT_FIX(1,1,1,1,STRCAT(STRCAT(CON2STR(HY_INDEXADV,0),/),STRCAT(CON2STR(HY_INDEXDEC,0), ))),…

06_瑞萨GUI(LVGL)移植实战教程之驱动EC11旋转编码器(GPIO)

本系列教程配套出有视频教程&#xff0c;观看地址&#xff1a;https://www.bilibili.com/video/BV1gV4y1e7Sg 6. 驱动EC11旋转编码器(GPIO) 本次实验我们驱动EC11旋转编码器。 6.1 复制工程 上次实验得出的工程我们可以通过复制在原有的基础上得到一个新的工程。 如果你不清…

XCE18T4K1P40-FJJP40、F4Z1P40规格书(泰兴创航)

关于XCE18T4K1P40-FJJP40、F4Z1P40电连接器规格书 主要性能指标 工作温度&#xff1a;-55℃~200℃相对湿度&#xff1a;温度40℃2℃时达98%振动&#xff1a;频率10-2000Hz&#xff0c;加速度196m/s2冲击&#xff1a;加速度980m/s2机械寿命&#xff1a;5000次壳体材料&#xff1…

05_瑞萨GUI(LVGL)移植实战教程之添加LVGL库,对接显示和触摸驱动

本系列教程配套出有视频教程&#xff0c;观看地址&#xff1a;https://www.bilibili.com/video/BV1gV4y1e7Sg 5. 添加LVGL库&#xff0c;对接显示和触摸驱动 本次实验我们会融合前面实验的成果&#xff0c;添加LVGL库&#xff0c;对接显示和触摸驱动&#xff0c;让屏幕能显示…

金蝶云星空与泛微OA集成的方案落地与实践

打破信息孤岛&#xff0c;泛微OA集成的方案落地与实践 在现代企业内部&#xff0c;不同类型的业务系统和泛微OA平台层出不穷。企业需要找到一种高效的方法来整合和协同这些多样化的系统&#xff0c;同时将它们与泛微OA平台融合&#xff0c;以实现资源整合和高度协同的办公环境…

Win10下python的命令行启动和调用问题

Win10下python的命令行启动和调用问题 Win10下Python的启动问题解决办法 Win10下Python的启动问题 Win10下安装了python&#xff0c;但是命令行启动仍然显示Windows商店界面 同时在C:\Users\用户名\AppData\Local\Microsoft\WindowsApps目录下发现空的python3.exe文件 即便在…