flink 的 State

news2025/3/1 22:03:06

目录

一、前言

二、什么是State

2.1:什么时候需要历史数据

2.2:为什么要容错,以及checkpoint如何进行容错

2.3:state basckend 又是什么

三、有哪些常见的是 State

四、 State的使用

五、State backend

5.1  MemoryStateBackend:

5.2  FsStatebackend:

5.3  RocksDBStateBackend:

六、Checkpoint

七、 Deep

7.1 Checkpoint Barries


 

一、前言

首先State是flink中的一个非常基本且重要的概念,本文将介绍什么是State ,如何使用State,

State的存储和原理。以及State衍生的一些概念和应用。

二、什么是State

一种为了满足算子计算时需要历史数据需求的,使用checkpoint机制进行容错,存储在state backend 的数据结构。

首先state 其实就是一种数据结构。然后上面定义中隐含了三个基本知识点:

2.1:什么时候需要历史数据

    去重:在流处理系统中,上游的系统数据可能会有重复,落到下游是希望把重复的数据去掉,此时就需要记录历史的数据。

    窗口计算:在触发窗口计算函数前,需要将窗口中手机的数据保存起来,等到触发时进行计算。

    机器学习/深度学习:如训练的模型以及当前模型的参数也是一种状态,机器学习可以每次都用一个数据集,需要在数据集上进行学习,对模型进行一个反馈。    

2.2:为什么要容错,以及checkpoint如何进行容错

2.3:state basckend 又是什么

三、有哪些常见的是 State

最常见的是Keyed State 应用于keyedStreamh上,必须在KeyBy操作之后使用。它的特点是 同一个sub task 上的同一个 key 共享一个 state 。 另外还有 operator state ,顾名思义每一个operator state 都只有一个operation 的实列绑定。常见的 operation state 是 source state ,列如记录当前source 的 offset 。它的特点是  同一个 sub task 共享一个  state 。另外还有一种特殊的 operation state 称为 broadcast state , 它的特点是 同一个算子的多个 sub task 共享一个 state   。

四、 State的使用

这里以常用的 Keyed State 进行举例。前面说了 State 本质上就是一种用来存储数据的数据结构,那么作为 Keyed State,都支持哪些数据结构呢?以下列举了常见的几种数据结构:

ValueState 存储单个值,比如 Wordcount,用 Word 当 Key,State 就是它的 Count。这里面的单个值可能是数值或者字符串,作为单个值,访问接口可能有两种,get 和 set。在 State 上体现的是 update(T) / T value()。

MapState 的状态数据类型是 Map,在 State 上有 put、remove等。需要注意的是在 MapState 中的 key 和 Keyed state 中的 key 不是同一个。

ListState 状态数据类型是 List,访问接口如 add、update 等

flink官网的State  :

Working with State | Apache FlinkWorking with State # In this section you will learn about the APIs that Flink provides for writing stateful programs. Please take a look at Stateful Stream Processing to learn about the concepts behind stateful stream processing.Keyed DataStream # If you want to use keyed state, you first need to specify a key on a DataStream that should be used to partition the state (and also the records in the stream themselves).https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/

class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

  private var sum: ValueState[(Long, Long)] = _
  
  /** 
  也可以使用 lazy 的方式对 state 进行初始化
  lazy private val sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
    ) 
  **/

  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {

    // access the state value
    val tmpCurrentSum = sum.value

    // If it hasn't been used before, it will be null
    val currentSum = if (tmpCurrentSum != null) {
      tmpCurrentSum
    } else {
      (0L, 0L)
    }

    // update the count
    val newSum = (currentSum._1 + 1, currentSum._2 + input._2)

    // update the state
    sum.update(newSum)

    // if the count reaches 2, emit the average and clear the state
    if (newSum._1 >= 2) {
      out.collect((input._1, newSum._2 / newSum._1))
      sum.clear()
    }
  }

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
    )
  }
}


object ExampleCountWindowAverage extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.fromCollection(List(
    (1L, 3L),
    (1L, 5L),
    (1L, 7L),
    (1L, 4L),
    (1L, 2L)
  )).keyBy(_._1)
    .flatMap(new CountWindowAverage())
    .print()
  // the printed output will be (1,4) and (1,5)

  env.execute("ExampleKeyedState")
}

以上代码的功能是对输入的流数据进行平均数计算,当输入的数据大于等于 2 个时,触发计算。这里有几点需要注意:

  • 因为 state 的初始化需要用到运行时上下文,所以定义的类需要继承 RichXXFunction
  • state 有两种初始化方式,一种是在成员变量初定义并在 open 函数中初始化。另一种是直接在成员变量处通过 lazy 的方式进行定义和初始化。
  • 这里的例子中使用的是 ValueState,他的 get 和 put 方法分别是 .value().update()
  • state 除了需要我们自己维护状态更新,状态的删除也需要在合适的时间点通过调用 clear
    方法实现。

使用 state 除了继承 RichXXFunction 外还可以直接使用系统提供的函数。如 keyBy 之后直接使用 flatMapWithState

五、State backend

前面介绍了 State 的类型和常见的数据结构,那么这些 state 存储的介质有哪些呢? Flink 提供了三种存储 State 的介质:

5.1  MemoryStateBackend:

  • 构造方法: MemoryStateBackend( int maxStateSize, boolean asynchronousSnapshots )
  • 存储方式:
    • State: TaskManager 内存
    • Checkpoint: Jobmanager 内存
  • 使用场景:本地测试用,不推荐生产场景使用

5.2  FsStatebackend:

  • 构造方法: FaStateBackend( URI checkpointDataUri, boolean asynchronousSnapshots )

  • 存储方式:

    • State:Taskmanager 内存
    • Checkpoint: 外部文件系统( 本地或 HDFS )
  • 使用场景:常规使用 State 的作业,可以在生产中使用

5.3  RocksDBStateBackend:

  • 构造方法:RocksDBStateBackend( URI checkpointDataUri, boolean enableIncrementalCheckpointing )
  • 存储方式:
    • State: TaskManager 上的 KV 数据库(实际使用内存 + 磁盘)

    • Checkpoint: 外部文件系统(本地或 HDFS )

  • 使用场景:超大状态作业,对性能要求不高的生产场景

六、Checkpoint

前面对 State 的使用中没有考虑容错的问题,当集群出现故障时进行恢复时,State 的值肯定不会从头开始计算,这就需要进行容错。State 使用 Checkpoint 机制进行容错。简单来说就是定时制作分布式快照,当出现故障需要进行恢复时,将所有 Task 恢复到最近一次成功的 Checkpoint 状态中,然后从那个点开始继续处理。Checkpoint 通过 Barries 对齐机制保证了恰好一次的一致性语义,关于 Barries 的原理后面将进行详细说明。

七、 Deep

7.1 Checkpoint Barries

checkpoint 是 jobmanager 从 source 触发到下游所有节点完成的一次全局操作。checkpoint barriers 和 watermark 类似,都是一种特殊的事件。对某一 subtask 而言,checkpoint 表示所有 subtask 恰好处理完(不能多处理,也不能少处理。为了状态恢复时保持一致性)某个相同数据。watermark 表示这之前的数据已经接收完毕。
watermark在多 subtask 上游向下游传递时,是广播 + 取上游最小 watermark 作为当前 task 的watermark,不取最小 watermark 会丢数据。
checkpoint barriers 在多 subtask 上游向下游传递时,是广播 + checkpoint barriers 对齐(alignment)。所谓对齐就是下游 subtask 会等待他上游所有分区的 subtask 的 checkpoint barriers 都到达才进行 checkpoint。上游已经到达 checkpoint barriers 的 substask 后续数据会缓存,没有到达 checkpoint barriers 的 subtask 数据会继续处理直到 checkpoint barriers 到达。

 

以 even 流的 Sum 算子为例,从图中可以看到先接收到 Source1 的 barrier 后接收到 Source2 的 barrier( 分别对应蓝色和黄色的三角 )。所以在接收到 Source1 的 barrier 后,对后面值为 4 的蓝流数据进行了缓存没有进行下一步计算,因为这个数据属于下一个 checkpoint。而在接收到 Source2 的 barrier 之前,对值为 4 的黄流数据照常进行计算直到接收到 Source2 的 barrier 为止。这就是所谓的 barriers alignment。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/412349.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

进程,线程,调度和调度算法基本知识

进程 我们编写的代码只是一个存储在硬盘的静态文件,通过编译后就会生成二进制可执行文件,当我们运行这个可执行文件后,它会被装载到内存中,接着 CPU 会执行程序中的每一条指令,那么这个运行中的程序,就被称…

【C++】内联函数理解

内联函数 内联函数的使用是对于C语言中宏函数的一种改进,他继承了宏的优点并避免了宏的缺点。 宏的优点:a. 代码复用性高 b. 宏函数减少栈帧建立,提高效率 宏的缺点:a. 可读性差 b. 没有类型安全检查 c. 不方便调试 C基本不再建议…

银行数字化转型导师坚鹏:金融数据治理、数据安全政策解读

金融数据治理、数据安全政策解读及大数据应用课程背景: 很多银行存在以下问题: 不知道如何准确理解金融数据治理及数据安全相关政策 不清楚金融数据治理及数据安全相关政策对银行有什么影响? 不清楚如何有效应用金融数据治理及数据安全相关…

软考软件设计师 下午试题二笔记

E-R图基本图形元素 实体 一个实体的存在要以另一个实体存在为前提,这个就是弱实体,比如家属和职工,家属的存在就是依赖于职工 属性 属性带下划线的是主键 联系 三个实体之间的联系 试题二问题一例题 问题二 将er图转成关系模式就是问题二答…

Cell Discovery:人类特异基因促进大脑皮层折叠新机制

在人类进化过程中,新皮层的扩张与智力的提高和认知功能的改善密切相关。这种扩张的一个关键方面是大脑皮层沟回的形成,它使扩张的皮质表面积能够适应有限的颅骨空间。这些进化特征主要依赖于多种神经干细胞和祖细胞亚型及其神经源性分裂产生的更多数量的…

《计算机网络-自顶向下》05. 网络层-控制平面

文章目录路由控制方式每路由控制逻辑集中式控制路由选择算法LS —— 链路状态路由选择算法DV —— 距离向量路由选择算法LS 和 DV 算法的比较自治系统内部路由协议RIPOSPF自治系统外部路由协议:BGP通告 BGP 路由信息选择最好的路由相关术语热土豆选择路由选择算法&a…

Swagger教程

Swagger 目标 Swagger简介【了解】 Springboot整合swagger【掌握】 Swagger 常用注解【掌握】 一、Swagger简介 ​ Swagger 是一系列 RESTful API 的工具,通过 Swagger 可以获得项目的⼀种交互式文档,客户端 SDK 的自 动生成等功能。 ​ Swagger …

TryHackMe-Year of the Owl(Windows渗透测试)

Year of the Owl 当迷宫在你面前,你迷失了方向时,有时跳墙思考是前进的方向。 端口扫描 循例 nmap SMB枚举 smbmap enum4linux也什么都没有 Web枚举 80端口 gobuster扫到一堆403,并没有什么有用的信息 443端口与80端口一致 47001端口依…

【SQL】公网远程访问局域网SQL Server数据库【无公网IP内网穿透】

目录 1.前言 2.本地安装和设置SQL Server 2.1 SQL Server下载 2.2 SQL Server本地连接测试 2.3 Cpolar内网穿透的下载和安装 2.3 Cpolar内网穿透的注册 3.1 Cpolar云端设置 3.2 Cpolar本地设置 4.公网访问测试 5.结语 转发自CSDN远程穿透的文章:[无需公网IP&am…

详解以太坊

以太坊原理 以太坊通过建立终极的抽象的基础层-内置有图灵完备编程语言的区块链-使得任何人都能够创建合约和去中心化应用,并在其中设立他们自由定义的所有权规则、交易方式和状态转换函数。 图灵完备:能够运行非常复杂的运算,最简单的理解…

基于共享储能电站的工业用户日前优化经济调度

目录 1 主要内容 共享电站示意图 目标函数 2 部分程序 3 程序结果 4 程序链接 1 主要内容 该程序方法复现《基于共享储能电站的工业用户日前优化经济调度》算例2和算例3,根据共享储能电站的商业运营模式,将共享储能电站应用到工业用户经济优化调度…

〖Python网络爬虫实战⑨〗- 正则表达式基本原理

订阅:新手可以订阅我的其他专栏。免费阶段订阅量1000 python项目实战 Python编程基础教程系列(零基础小白搬砖逆袭) 说明:本专栏持续更新中,目前专栏免费订阅,在转为付费专栏前订阅本专栏的,可以免费订阅付…

【Linux】用户命令(创建,修改,切换,删除,密码)

目录 1.创建 查看用户信息 查看id 2.修改 修改用户名 修改用户uid 操作前: 操作后 修改组名 操作前: 操作后: 修改组id 操作前: 操作后: 操作前: 操作后: 3.切换用户 4.删除 操作前: 操作…

如何在Spring Boot中使用Spring MVC

目录 1.MVC 2.Spring MVC 3.Spring Boot中使用Spring MVC 3.1.配置 3.1.1.文件配置 3.1.2.代码配置 3.2.使用 3.2.1.映射处理器 3.2.2.传参 3.2.3.参数转换 3.2.4.数据校验 3.2.5.数据模型 3.2.6.视图和解析器 3.2.7.拦截器 1.MVC MVC 是一种常见的软件设计模式…

企业级信息系统开发讲课笔记2.4 利用MyBatis实现条件查询

文章目录零、本节学习目标一、查询需求二、打开MyBatisDemo项目三、对学生表实现条件查询(一)创建学生映射器配置文件(二)在MyBatis配置文件里注册学生映射器配置文件(三)创建学生映射器接口(四…

macOS Ventura 13.3.1 (22E261) Boot ISO 原版可引导镜像

本站下载的 macOS 软件包,既可以拖拽到 Applications(应用程序)下直接安装,也可以制作启动 U 盘安装,或者在虚拟机中启动安装。另外也支持在 Windows 和 Linux 中创建可引导介质。 macOS Ventura 13.3.1 为 Mac 提供下…

os库的使用与第三方库安装脚本

os库基本介绍 os 顾名思义,就是与操作系统相关的标准库。如:文件,目录,执行系统命令等。 os库是Python标准库,包含几百个函数 常用路径操作、进程管理、环境参数等几类 路径操作:os.path子库&#xff0…

安捷伦34970A

18320918653 34970A Agilent 34970A 数据采集器|安捷伦数据采集器|34970A 您可信任的测量: 我们把销售良好数字多用表测量引擎嵌入在3槽主机箱中。您能获得优异的测量能力,带有内置信号调整的通用输入,模块化的灵活性,低廉的售…

Ubuntu20.04配置CuckooSandbox环境

Ubuntu20.04配置CuckooSandbox环境 因为最近要做恶意软件分析,阅读论文发现动态分析的效果普遍比静态分析的效果要好一些,所以需要搭建一个动态分析的环境,查阅资料发现Cuckoo Sandbox是不错的自动化分析环境,但是搭建起来还是比…

机器学习相关建议

1、开发机器学习系统或者优化的路径 训练数据量的增加对机器学习系统的影响增加特征集或减少特征集改变正则化参数尝试增加多项式特征 | 高偏差、欠拟合 | 高方差、过拟合 | | ------------------ | ------------------------------ | | 尝试增加多项式特征 | 尝试减少特征的数…