6 Flink 状态管理

news2025/2/2 15:08:58

6 Flink 状态管理

  • 1. State-Keyed State
  • 2. State-Operator State
  • 3. Broadcast State

我们前面写的 wordcount 的例子,没有包含状态管理。如果一个task在处理过程中挂掉了,那么它在内存中的状态都会丢失,所有的数据都需要重新计算。从容错和消息处理的语义上(at least once, exactly once),Flink引入了state和checkpoint。
因此可以说flink因为引入了state和checkpoint所以才支持的exactly once
首先区分一下两个概念:

state:
state一般指一个具体的task/operator的状态:
state数据默认保存在java的堆内存中,TaskManage节点的内存中。
operator表示一些算子在运行的过程中会产生的一些中间结果。

checkpoint:
checkpoint可以理解为checkpoint是把state数据定时持久化存储了,则表示了一个Flink Job在一个特定时刻的一份全局状态快照,即包含了所有task/operator的状态。
注意:task(subTask)是Flink中执行的基本单位。operator指算子(transformation)
State可以被记录,在失败的情况下数据还可以恢复。

Flink中有两种基本类型的State:
Keyed State
Operator State

Keyed State和Operator State,可以以两种形式存在:
原始状态(raw state)
托管状态(managed state)
托管状态是由Flink框架管理的状态。

我们说operator算子保存了数据的中间结果,中间结果保存在什么类型中,如果我们这里是托管状态,则由flink框架自行管理
原始状态由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据结构一无所知。
通常在DataStream上的状态推荐使用托管的状态,当实现一个用户自定义的operator时,会使用到原始状态。

1. State-Keyed State

基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,都对应一个state,比如:stream.keyBy(…)。KeyBy之后的Operator State,可以理解为分区过的Operator State。
保存state的数据结构:
ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值。
ListState:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值。
ReducingState:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值。
MapState<UK, UV>:即状态值为一个map。用户通过put或putAll方法添加元素。
需要注意的是,以上所述的State对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中。相当于我们只是持有了这个状态的句柄。

  1. ValueState
    使用ValueState保存中间结果对下面数据进行分组求和。
    开发步骤:
  2. 获取流处理执行环境
  3. 加载数据源
  4. 数据分组
  5. 数据转换,定义ValueState,保存中间结果
  6. 数据打印
  7. 触发执行
    ValueState:测试数据源:
 List(
   (1L, 4L),
   (2L, 3L),
   (3L, 1L),
   (1L, 2L),
   (3L, 2L),
   (1L, 2L),
   (2L, 2L),
   (2L, 9L)
)

示例代码:

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector

object TestKeyedState {
  class CountWithKeyedState extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
    /**
     * ValueState状态句柄. 第一个值为count,第二个值为sum。
     */
    private var sum: ValueState[(Long, Long)] = _
    override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
      // 获取当前状态值
      val tmpCurrentSum: (Long, Long) = sum.value
      // 状态默认值
      val currentSum = if (tmpCurrentSum != null) {
        tmpCurrentSum
      } else {
        (0L, 0L)
      }
      // 更新
      val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
      // 更新状态值
      sum.update(newSum)

      // 如果count >=3 清空状态值,重新计算
      if (newSum._1 >= 3) {
        out.collect((input._1, newSum._2 / newSum._1))
        sum.clear()
      }
    }
    override def open(parameters: Configuration): Unit = {
      sum = getRuntimeContext.getState(
        new ValueStateDescriptor[(Long, Long)]("average", // 状态名称
          TypeInformation.of(new TypeHint[(Long, Long)](){}) )// 状态类型
      )
    }
  }  
  def main(args: Array[String]): Unit = {
    //初始化执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //构建数据源
    val inputStream: DataStream[(Long, Long)] = env.fromCollection(
      List(
        (1L, 4L),
        (2L, 3L),
        (3L, 1L),
        (1L, 2L),
        (3L, 2L),
        (1L, 2L),
        (2L, 2L),
        (2L, 9L))
    )
    //执行数据处理
    inputStream.keyBy(0)
      .flatMap(new CountWithKeyedState)
      .setParallelism(1)
      .print
    //运行任务
    env.execute
  }
}  
  1. MapState
    使用MapState保存中间结果对下面数据进行分组求和:
  2. 获取流处理执行环境
  3. 加载数据源
  4. 数据分组
  5. 数据转换,定义MapState,保存中间结果
  6. 数据打印
  7. 触发执行
    MapState:测试数据源:
List(
   ("java", 1),
   ("python", 3),
   ("java", 2),
   ("scala", 2),
   ("python", 1),
   ("java", 1),
   ("scala", 2)
)  

示例代码:

object MapState {

  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    /**
      * 使用MapState保存中间结果对下面数据进行分组求和
      * 1.获取流处理执行环境
      * 2.加载数据源
      * 3.数据分组
      * 4.数据转换,定义MapState,保存中间结果
      * 5.数据打印
      * 6.触发执行
      */
    val source: DataStream[(String, Int)] = env.fromCollection(List(
      ("java", 1),
      ("python", 3),
      ("java", 2),
      ("scala", 2),
      ("python", 1),
      ("java", 1),
      ("scala", 2)))
  
    source.keyBy(0)
      .map(new RichMapFunction[(String, Int), (String, Int)] {
        var mste: MapState[String, Int] = _

        override def open(parameters: Configuration): Unit = {
          val msState = new MapStateDescriptor[String, Int]("ms",
            TypeInformation.of(new TypeHint[(String)] {}),
            TypeInformation.of(new TypeHint[(Int)] {}))

          mste = getRuntimeContext.getMapState(msState)
        }
        override def map(value: (String, Int)): (String, Int) = {
          val i: Int = mste.get(value._1)
          mste.put(value._1, value._2 + i)
          (value._1, value._2 + i)
        }
      }).print()

    env.execute()
  }
}  

2. State-Operator State

与Key无关的State,与Operator绑定的state,整个operator只对应一个state。
保存state的数据结构:
ListState
举例来说,Flink中的 Kafka Connector,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射。
步骤:
1.获取执行环境
2.设置检查点机制:路径,重启策略
3.自定义数据源
需要继承并行数据源和CheckpointedFunction
设置listState,通过上下文对象context获取
数据处理,保留offset
制作快照
4.数据打印
5.触发执行
示例代码:

import java.util

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

object ListOperate {

  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)
    env.enableCheckpointing(5000)
    env.setStateBackend(new FsStateBackend("hdfs://node01:8020/tmp/check/8"))
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    //重启策略
    env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(1), Time.seconds(5)))

    //模拟kakfa偏移量
    env.addSource(new MyRichParrelSourceFun)
      .print()

    env.execute()
  }

}

class MyRichParrelSourceFun extends RichParallelSourceFunction[String]
  with CheckpointedFunction {

  var listState: ListState[Long] = _
  var offset: Long = 0L

  //任务运行
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {

    val iterState: util.Iterator[Long] = listState.get().iterator()

    while (iterState.hasNext) {
      offset = iterState.next()
    }

    while (true) {

      offset += 1
      ctx.collect("offset:"+offset)
      Thread.sleep(1000)
      if(offset > 10){
        1/0
      }
    }

  }

  //取消任务
  override def cancel(): Unit = ???

  //制作快照
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    listState.clear()
    listState.add(offset)

  }

  //初始化状态
  override def initializeState(context: FunctionInitializationContext): Unit = {

    listState = context.getOperatorStateStore.getListState(new ListStateDescriptor[Long](
      "listState", TypeInformation.of(new TypeHint[Long] {})
    ))
  }
}

3. Broadcast State

Broadcast State 是 Flink 1.5 引入的新特性。在开发过程中,如果遇到需要下发/广播配置、规则等低吞吐事件流到下游所有 task 时,就可以使用 Broadcast State 特性。下游的 task 接收这些配置、规则并保存为 BroadcastState, 将这些配置应用到另一个数据流的计算中 。

  1. API介绍
    通常,我们首先会创建一个Keyed或Non-Keyed的Data Stream,然后再创建一个Broadcasted Stream,最后通过Data Stream来连接(调用connect方法)到Broadcasted Stream上,这样实现将Broadcast State广播到Data Stream下游的每个Task中。
    如果Data Stream是Keyed Stream,则连接到Broadcasted Stream后,添加处理ProcessFunction时需要使用KeyedBroadcastProcessFunction来实现,下面是KeyedBroadcastProcessFunction的API,代码如下所示:
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
    public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
    public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
}

上面泛型中的各个参数的含义,说明如下:
KS:表示Flink程序从最上游的Source Operator开始构建Stream,当调用keyBy时所依赖的Key的类型;
IN1:表示非Broadcast的Data Stream中的数据记录的类型;
IN2:表示Broadcast Stream中的数据记录的类型;
OUT:表示经过KeyedBroadcastProcessFunction的processElement()和processBroadcastElement()方法处理后输出结果数据记录的类型。
如果Data Stream是Non-Keyed Stream,则连接到Broadcasted Stream后,添加处理ProcessFunction时需要使用BroadcastProcessFunction来实现,下面是BroadcastProcessFunction的API,代码如下所示:

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
  public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
  public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
    }

上面泛型中的各个参数的含义,与前面KeyedBroadcastProcessFunction的泛型类型中的后3个含义相同,只是没有调用keyBy操作对原始Stream进行分区操作,就不需要KS泛型参数。
注意事项:
1.Broadcast State 是Map类型,即K-V类型。
2.Broadcast State 只有在广播一侧的方法中processBroadcastElement可以修改;在非广播一侧方法中processElement只读。
3.Broadcast State在运行时保存在内存中。
2) 场景举例
1.动态更新计算规则: 如事件流需要根据最新的规则进行计算,则可将规则作为广播状态广播到下游Task中。
2.实时增加额外字段: 如事件流需要实时增加用户的基础信息,则可将用户的基础信息作为广播状态广播到下游Task中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2290823.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

第1章 量子暗网中的血色黎明

月球暗面的危机与阴谋 量子隧穿效应催生的幽蓝电弧&#xff0c;于环形山表面肆意跳跃&#xff0c;仿若无数奋力挣扎的机械蠕虫&#xff0c;将月球暗面的死寂打破&#xff0c;徒增几分诡异。艾丽伫立在被遗弃的“广寒宫”量子基站顶端&#xff0c;机械义眼之中&#xff0c;倒映着…

MCU内部ADC模块误差如何校准

本文章是笔者整理的备忘笔记。希望在帮助自己温习避免遗忘的同时&#xff0c;也能帮助其他需要参考的朋友。如有谬误&#xff0c;欢迎大家进行指正。 一、ADC误差校准引言 MCU 片内 ADC 模块的误差总包括了 5 个静态参数 (静态失调&#xff0c;增益误差&#xff0c;微分非线性…

【Rust自学】15.4. Drop trait:告别手动清理,释放即安全

喜欢的话别忘了点赞、收藏加关注哦&#xff0c;对接下来的教程有兴趣的可以关注专栏。谢谢喵&#xff01;(&#xff65;ω&#xff65;) 15.4.1. Drop trait的意义 类型如果实现了Drop trait&#xff0c;就可以让程序员自定义当值离开作用域时发生的操作。例如文件、网络资源…

【Block总结】CPCA,通道优先卷积注意力|即插即用

论文信息 标题: Channel Prior Convolutional Attention for Medical Image Segmentation 论文链接: arxiv.org 代码链接: GitHub 创新点 本文提出了一种新的通道优先卷积注意力&#xff08;CPCA&#xff09;机制&#xff0c;旨在解决医学图像分割中存在的低对比度和显著…

信息学奥赛一本通 1607:【 例 2】任务安排 2 | 洛谷 P10979 任务安排 2

【题目链接】 ybt 1607&#xff1a;【 例 2】任务安排 2 洛谷 P10979 任务安排 2 注&#xff1a;ybt1607中n最大达到 1 0 4 10^4 104&#xff0c;洛谷P10979中n最大达到 3 ∗ 1 0 5 3*10^5 3∗105&#xff0c;本题解统一认为n最大达到 3 ∗ 1 0 5 3*10^5 3∗105。 【题目考点…

OFDM系统仿真

1️⃣ OFDM的原理 1.1 介绍 OFDM是一种多载波调制技术&#xff0c;将输入数据分配到多个子载波上&#xff0c;每个子载波上可以独立使用 QAM、PSK 等传统调制技术进行调制。这些子载波之间互相正交&#xff0c;从而可以有效利用频谱并减少干扰。 1.2 OFDM的核心 多载波调制…

【Go语言圣经】第四节:复合数据类型

第四章&#xff1a;复合数据类型 本节主要讨论四种类型——数组、slice、map和结构体。 数组和结构体都是有固定内存大小的数据结构。相比之下&#xff0c;slice 和 map 则是动态的数据结构&#xff0c;它们可以根据需要动态增长。 4.1 数组 数组是一个定长的由特定类型元素…

完美还是完成?把握好度,辨证看待

完美还是完成&#xff1f; 如果说之前这个答案有争议&#xff0c;那么现在&#xff0c;答案毋庸置疑 ■为什么完美大于完成 ●时间成本&#xff1a; 做事不仅要考虑结果&#xff0c;还要考虑时间和精力&#xff0c;要说十年磨一剑的确质量更好&#xff0c;但是现实没有那么多…

Many Whelps! Handle It! (10 player) Many Whelps! Handle It! (25 player)

http://db.nfuwow.com/80/?achievement4403 http://db.nfuwow.com/80/?achievement4406 最少扣你50DKP! 第二阶段 当奥妮克希亚升空后&#xff0c;在10秒内引出50只奥妮克希亚雏龙&#xff0c;随后击败奥妮克希亚。 World of Warcraft [CLASSIC][80猎人][Grandel][最少扣你5…

【React】PureComponent 和 Component 的区别

前言 在 React 中&#xff0c;PureComponent 和 Component 都是用于创建组件的基类&#xff0c;但它们有一个主要的区别&#xff1a;PureComponent 会给类组件默认加一个shouldComponentUpdate周期函数。在此周期函数中&#xff0c;它对props 和 state (新老的属性/状态)会做一…

MongoDb user自定义 role 添加 action(collStats, EstimateDocumentCount)

使用 mongosh cd mongsh_bin_path mongosh “mongodb://user:passip:port/db”这样就直接进入了对应的db 直接输入&#xff1a; 这样 role “read_only_role" 就获得了3个 action&#xff0c; 分别是 查询&#xff0c;列举集合&#xff0c;集合元数据查询 P.S: 如果没有 …

蓝桥杯刷题DAY2:二维前缀和 一维前缀和 差分数组

闪耀的灯光 &#x1f4cc; 题目描述 蓝桥公园是一个适合夜间散步的好地方&#xff0c;公园可以被视为由 n m 个矩形区域构成。每个区域都有一盏灯&#xff0c;初始亮度为 a[i][j]。 小蓝可以选择一个大的矩形区域&#xff0c;并按下开关一次&#xff0c;这将使得该区域内每盏…

C++初阶 -- 手撕string类(模拟实现string类)

目录 一、string类的成员变量 二、构造函数 2.1 无参版本 2.2 有参版本 2.3 缺省值版本 三、析构函数 四、拷贝构造函数 五、c_str函数 六、operator重载 七、size函数 八、迭代器iterator 8.1 正常版本 8.2 const版本 九、operator[] 9.1 正常版本 9.2 const版…

【Unity3D】实现2D角色/怪物死亡消散粒子效果

核心&#xff1a;这是一个Unity粒子系统自带的一种功能&#xff0c;可将粒子生成控制在一个Texture图片网格范围内&#xff0c;并且粒子颜色会自动采样图片的像素点颜色&#xff0c;之后则是粒子编辑出消散效果。 Particle System1物体&#xff08;爆发式随机速度扩散10000个粒…

85.[1] 攻防世界 WEB easyphp

进入靶场 属于代码审计 <?php // 高亮显示当前 PHP 文件的源代码&#xff0c;常用于调试或展示代码 highlight_file(__FILE__);// 初始化两个标志变量&#xff0c;用于后续条件判断 $key1 0; $key2 0;// 从 GET 请求中获取参数 a 和 b $a $_GET[a]; $b $_GET[b];// 检…

pytorch图神经网络处理图结构数据

人工智能例子汇总&#xff1a;AI常见的算法和例子-CSDN博客 图神经网络&#xff08;Graph Neural Networks&#xff0c;GNNs&#xff09;是一类能够处理图结构数据的深度学习模型。图结构数据由节点&#xff08;vertices&#xff09;和边&#xff08;edges&#xff09;组成&a…

海外问卷调查,最常用到的渠道查有什么特殊之处

市场调研&#xff0c;包含市场调查和市场研究两个步骤&#xff0c;是企业和机构根据经营方向而做出的决策问题&#xff0c;最终通过海外问卷调查中的渠道查&#xff0c;来系统地设计、收集、记录、整理、分析、研究市场反馈的工作流程。 市场调研的工作流程包括&#xff1a;确…

【Uniapp-Vue3】解决uni-popup弹窗在安全区显示透明问题

我们在使用uni-popup时&#xff0c;如果想要给弹出内容添加一个背景颜色&#xff0c;我们会发现在安全区域是不显示该背景颜色的。 首先根据如下的目录结构找到uni-popup.vue文件 在该文件中找到bottom配置&#xff0c;将红箭头所指代码注释掉 下面的安全区域就没有了&#xff…

项目练习:重写若依后端报错cannot be cast to com.xxx.model.LoginUser

文章目录 一、情景说明二、解决办法 一、情景说明 在重写若依后端服务的过程中 使用了Redis存放LoginUser对象数据 那么&#xff0c;有存就有取 在取值的时候&#xff0c;报错 二、解决办法 方法1、在TokenService中修改如下 getLoginUser 方法中&#xff1a;LoginUser u…

核心集:DeepCore: A Comprehensive Library for CoresetSelection in Deep Learning

目录 一、TL&#xff1b;DR 二、为什么研究核心集&#xff1f; 三、问题定义和如何做 3.1 问题定义 3.2 业界方法 3.2.1 基于几何的方法 3.2.2 基于不确定性的方法 3.2.3 基于误差/损失的方法 3.2.5 GraNd 和 EL2N 分数 3.2.6 重要性采样 3.2.7 基于决策边界的办法 …