大数据-126 - Flink State 03篇 状态原理和原理剖析:状态存储 Part1

news2024/11/13 9:55:26

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Flink 广播状态
  • 基本概念、代码案例、测试结果

在这里插入图片描述

状态存储

Flink的一个重要特性就是有状态计算(stateful processing),Flink提供了简单易用的API来存储和获取状态,但是我们还是要理解API背后的原理,才能更好的使用。

State存储方式

Flink为State提供了三种开箱即用的后端存储方式(state backend):

  • Memory State Backend
  • File System (FS)State Backend
  • RocksDB State Backend

MemoryStateBackend

MemoryStateBackend将工作状态数据保存在TaskManager的Java内存中。Key/Value状态和Window算子使用哈希表存储数值和触发器。进行快照时(CheckPointing),生成的快照数据将和 CheckPoint ACK消息一起发送给 JobManager,JobManager将收到的所有快照数据保存在Java内存中。
MemoryStateBackend现在被默认配置成异步的,这样避免阻塞主线程的pipline处理,MemoryStateBackend的状态存取的数据都非常快,但是不适合生产环境中使用。这是以为它有以下限制:

  • 每个state的默认大小被限制为5MB(这个值可以通过MemoryStateBackend构造方法设置)
  • 每个Task的所有State数据(一个Task可能包含一个Pipline中的多个的Operator)大小不能超过RPC系统的帧大小(akk.framesize 默认10MB)
  • JobManager收到的State数据总和不能超过JobManager内存

MemoryStateBackend适合的场景:

  • 本地开发和调试
  • 状态很小的作业

在这里插入图片描述

FsStateBackend

FsStateBackend 需要配置一个CheckPoint路径,例如:hdfs://xxxxxxx 或者 file:///xxxxx,我们一般都会配置HDFS的目录。
FsStateBackend将工作状态数据保存在TaskManager的Java内存中,进行快照时,再将快照数据写入上面的配置的路径,然后将写入的文件路径告知JobManager。JobManager中保存所有状态的元数据信息(在HA的模式下,元数据会写入CheckPoint目录)。
FsStateBackend 默认使用异步方式进行快照,防止阻塞主线程的Pipline处理,可以通过FsStateBackend构造函数取消该模式:

new FsStateBackend(path, false)

FsStateBackend 适合的场景:

  • 大状态、长窗口、大键值(键或者值很大)状态的作业
  • 适合高可用方案

在这里插入图片描述

RocksDBStateBackend

RocksDBStateBackend 也需要配置一个CheckPoint路径,例如:hdfs://xxx 或者 file:///xxx,一般是 HDFS 路径。
RocksDB是一种可嵌入的可持久型的 key-value 存储引擎,提供 ACID 支持。由Facebook基于LevelDB开发,使用LSM存储引擎,是内存和磁盘的混合存储。
RocksDBStateBackend将工作状态保存在TaskManager的RocksDB数据库中,CheckPoint时,RocksDB中的所有数据会被传输到配置的文件目录,少量元数据信息保存在JobManager内存中(HA模式下,会保存在CheckPoint目录)。
RocksDBStateBackend使用异步方式进行快照,RocksDBStateBackend的限制:

  • 由于RocksDB的JNI Bridge API是基于 byte[] 的,RocksDBStateBackend支持的每个Key或者每个Value的最大值不超过 2的31次方(2GB)
  • 要注意的是,有merge操作的状态(例如:ListState),可能会在运行过程中超过2的31次时,导致程序失败。
    RocksDBStateBackend适用于以下的场景:
  • 超大状态、超长窗口(天)、大键值状态的作业
  • 适合高可用模式

使用RocksDBStateBackend时,能够限制状态大小是TaskManager磁盘空间(相对于FsStateBackend状态大小限制与TaskManager内存)。这也导致RocksDBStateBackend的吞吐比其他两个要低一些,因为RocksDB的状态数据的读写都要经过反序列化/序列化。

RocksDBStateBackend时目前三者中唯一支持增量CheckPoint的。
在这里插入图片描述

三者吞吐量对比

在这里插入图片描述

KeyedState 和 Operator State

State分类

Operator State

(或 non-keyed state):
每个Operator State绑定一个并行的Operator实例,KafkaConnector是使用OperatorState的典型示例:每个并行的Kafka Consumer实例维护了每个Kafka Topic分区和该分区Offset的映射关系,并将这个映射关系保存为OperatorState。
在算子并行度改变时,OperatorState也会重新分配。

Keyed State

这种State只存在于KeyedStream上的函数和操作中,比如Keyed UDF(KeyedProcessFunction)Window State。可以把Keyed State想象成被分区的OperatorState。每个KeyedState在逻辑上可以看成与一个 <parallel-operator-instance, key> 绑定,由于一个key肯定只存在于一个Operator实例,所以我们可以简单的的认为一个 <operator, key>对应一个 KeyedState。

每个KeyedState在逻辑上还会被分配一个KeyGroup,分配方法如下:

MathUtils.murmurHash(key.hashCode()) % maxParallelism;

其中maxParallelism是Flink程序的最大并行度,这个值一般我们不会去手动设置,使用默认的值(128)就好,这里注意下,maxParallelism和我们运行程序时指定的算子并行度(parallelism)不同,parallelism不能大于maxParallelism,最多两者相等。

为什么会有 Key Group这个概念呢?
我们通常写程序,会给算子指定一个并行度,运行一段时间后,积累了一些State,这时候数据量大了,需要增加并行度。我们修改并行度后重新提交,那这些已经存在的State该如何分配到各个Operator呢?这就有了最大并行度和KeyGroup的概念。
上面的计算公式也说明了KeyGroup的个数最多是maxParallelism个。当并行度改变之后,我们在计算这个Key被分配到的Operator:

keyGroupId * paralleism / maxParallelism;

可以看到,一个KeyGroupId会对应一个Operator,当并行度更改时,新的Operator会去拉取对应的KeyGroup的KeyedState,这样就把KeyedState尽量均匀的分配给所有的Operator了。
根据State数据是否被Flink托管,Flink又将State分类为:

  • Managed State:被Flink托管,保存为内部的哈希表或者RocksDB,CheckPoint时,Flink将State进行序列化编码。例如:ValueState ListState
  • Row State:Operator 自行管理的数据结构, Checkpoint时,它们只能以byte数据写入CheckPoint。

建议使用 Managed State,当使用 Managed State时,Flink会帮助我们更改并行度时重新分配State,优化内存。

使用ManageKeyedState

如何创建?
上面提到,KeyedState只能在KeyedStream上使用,可以通过Stream.keyBy创建KeyedStream,我们可以创建以下几种:

  • ValueState
  • ListState
  • ReducingState
  • AggregatingState<IN,OUT>
  • MapState<UK,UV>
  • FoldingState<T,ACC>

每种State都对应各种的描述符,通过描述符RuntimeContext中获取对应的State,而RuntimeContext只有RichFunction才能获取,所以想要使用KeyedState,用户编写的类必须继承RichFunction或者其他子类。

  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • AggregationState<IN,OUT> getAggregatingState(AggregatingStateDescriptor<IN,ACC,OUT>)
  • FoldingState<T,ACC> getFoldingState(FoldingStateDescriptor<T,ACC>)
  • MapState<UK,UV> getMapState(MapStateDescriptor<UK,UV>)

给KeyedState设置过期时间
在Flink 1.6.0 以后,还可以给KeyedState设置 TTL(Time-To-Live),当某一个Key的State数据过期时,会被StateBackend尽力删除。
官方给出了示例:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1)) // 状态存活时间
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // TTL 何时被更新,这里配置的 state 创建和写入时
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();// 设置过期的 state 不被读取
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("textstate", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

State的TTL何时被更新?
可以进行以下配置,默认只是key的state被modify(创建或者更新)的时候才更新TTL:

  • StateTtlConfig.UpdateType.OnCreateAndWrite:只在一个key的state创建和写入时更新TTL(默认)
  • StateTtlConfig.UpdateType.onReadAndWrite:读取state时仍然更新TTL

当State过期但是还未删除时,这个状态是否还可见?
可以进行以下配置,默认是不可见的:

  • StateTtlConfig.StateVisibility.NerverReturnExpired:不可见(默认)
  • StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp:可见

注意:

  • 状态的最新访问时会和状态数据保存在一起,所以开启TTL特性增大State的大小,Heap State Backend会额外存储一个包括用户状态以及时间戳的Java8对象,RocksDB StateBackend会在每个状态值(list或者map的每个元素)序列化后增加8个字节。
  • 暂时只支持基于 Processing Time的TTL。
  • 尝试从CheckPoint/SavePoint进行恢复时,TTL的状态(是否开启)必须和之前保存一致,否则会遇到:StateMigrationException。
  • TTL的配置并不会保存在CheckPoint/SavePoint中,仅对当前的Job有效。
  • 当前开启TTL的MapState仅在用户序列化支持NULL的情况下,才支持用户值为NULL,如果用户值序列化器不支持NULL,可以用NullableSerializer包装一层。

使用ManageOperatorState

(这里以及后续放到下一篇:大数据-127 Flink)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2118225.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来&#xff0c;一站式有声阅读平台听书系统 &#x1f31f; 开篇&#xff1a;遇见未来&#xff0c;从“智听”开始 在这个快节奏的时代&#xff0c;你是否渴望在忙碌的间隙&#xff0c;找到一片属于自己的宁静角落&#xff1f;是否梦想着能随时随地&#xff0c;沉浸在知…

海外盲盒系统开发搭建,助力盲盒出海!

随着盲盒全球化发展&#xff0c;盲盒出口海外已经成为了大多数盲盒企业的选择。在互联网时代中&#xff0c;盲盒出口主要通过跨境电商和独立海外盲盒系统。而在目前市场发展趋势中&#xff0c;对于企业来说&#xff0c;一个独立的盲盒系统至关重要。海外盲盒APP可以帮助盲盒企业…

【ACM出版-高录用EI稳检索!九大高校联合举办】2024年人工智能、数字媒体技术与交互设计国际学术会议(ICADI2024)

​【ACM出版&#xff0c;EI稳定检索&#xff0c;九大高校联合举办, IEEE Fellow支持】 2024年人工智能、数字媒体技术与交互设计国际学术会议&#xff08;ICADI2024&#xff09; 2024 International Conference on artificial intelligence, digital media technology and …

期权开户攻略:期权开户主要的流程是什么?

今天期权懂带你了解期权开户攻略&#xff1a;期权开户主要的流程是什么&#xff1f;交易期权可以为投资者提供多种灵活性和机会&#xff0c;但同时也伴随风险。因此&#xff0c;了解期权的基本概念和策略&#xff0c;结合自身的风险承受能力&#xff0c;才能更好地进行投资决策…

解决el-date-picker切换类型type时错位

vue代码如下 样式效果如下 切换日期类型时&#xff0c;立马点日期会出现错位&#xff0c;检查发现是日期的展开面板并没有插入到body中导致的错位 解决方法 给el-date-picker加上唯一key值就可以

智慧园区,为园区多场景提供智能化赋能

智慧园区解决方案旨在通过现代信息技术手段&#xff0c;为园区空间多场景提供智能化赋能&#xff0c;以提升园区的运营效率、管理水平和服务质量。以下是对智慧园区解决方案的详细阐述&#xff1a; 一、智慧园区解决方案的核心目标 智慧园区解决方案的核心目标是实现园区的“…

ruoyi若伊项目(vue前后端分离)下载搭建

介绍 &#x1f4a1; RuoYi-Vue ​ RuoYi-Vue 是一款开源的后台管理系统&#xff0c;是一个 Java EE 企业级快速开发平台&#xff0c;基于经典技术组合&#xff08;Spring Boot、Spring Security、MyBatis、Jwt、Vue&#xff09;&#xff0c;内置模块如&#xff1a;部门管理、…

Gemma 2大模型的训练范式解析

咱们聊聊大型语言模型&#xff08;LLMs&#xff09;的训练范式吧&#xff0c;这可是个大话题。从最早的GPT模型到现在的复杂开放权重LLMs&#xff0c;这一路走来&#xff0c;变化可真不少。记得最开始&#xff0c;LLMs的训练就只关注预训练&#xff0c;但现在&#xff0c;这事儿…

C++设计模式——Strategy策略模式

一&#xff0c;策略模式简介 策略模式是一种行为型设计模式&#xff0c;策略模式在软件开发场景中定义了一系列的算法&#xff0c;并将每个算法单独封装在可替换的对象中&#xff0c;使应用程序在运行时可以根据具体的上下文来动态地选择和切换算法&#xff0c;同时保持原有的…

【运维监控】influxdb 2.0+grafana 监控java 虚拟机以及方法耗时情况(2)

关于java应用的监控本系列有文章如下&#xff1a; 【运维监控】influxdb 2.0telegraf 监控tomcat 8.5运行情况 【运维监控】influxdb 2.0grafana 监控java 虚拟机以及方法耗时情况 【运维监控】Prometheusgrafana监控tomcat运行情况 【运维监控】Prometheusgrafana监控spring b…

【即时通讯】轮询方式实现

技术栈 LayUI、jQuery实现前端效果。django4.2、django-ninja实现后端接口。 代码仓 - 后端 代码仓 - 前端 实现功能 首次访问页面并发送消息时需要设置昵称发送内容为空时要提示用户不能发送空消息前端定时获取消息&#xff0c;然后展示在页面上。 效果展示 首次发送需要…

【java入门】八大基本数据类型与变量的声明与使用,超详细讲解!

&#x1f680; 个人简介&#xff1a;某大型国企资深软件开发工程师&#xff0c;信息系统项目管理师、CSDN优质创作者、阿里云专家博主&#xff0c;华为云云享专家&#xff0c;分享前端后端相关技术与工作常见问题~ &#x1f49f; 作 者&#xff1a;码喽的自我修养&#x1f9…

【PyCharm使用教程】PyCharm的基本使用教程,适合完全零基础,小白快速上手!(Python+PyCharm安装包)

如果你正在学习Python&#xff0c;但是找不到方向的话可以试试我这一份学习方法和籽料呀&#xff01;点击 [领取籽料]&#xff08;不要米米&#xff09; Pycharm的基本使用教程 【一】PIP换源 ①问题描述 在使用Python时需要经常用到pip安装第三方包。在某些情况下由于网络速…

基于C++实现(控制台)学生成绩管理系统

学生成绩管理系统 一、系统需求分析 一个巨大的学校有数以万计的教工、学生和相应的资料需要管理。一个好的学生成绩管理系统可以协助管理员管理巨大的数据库&#xff0c;允许管理员、教师跟学生这三种用户登录进行相应的操作。 管理员具有管理数据库的一切权限。管理员负责…

兔英语语法体系——观后笔记

目录 一、视频链接 二、视频前言 三、简单句(Simple Sentences) 1. 可独立完成的动作 2. 有1个动作的承受者 3. 有两个动作承受者 4. 只有一个动作承受者(但需补充) 5. 非 “动作” 6. 总结 四、五大基本句型 五、句子成分 6. 定语 7. 状语 8. 同位语 9. 总结 …

[SWPUCTF 2022 新生赛]

目录 [SWPUCTF 2022 新生赛]ez_rce 什么是poc&#xff1f; [SWPUCTF 2022 新生赛]where_am_i [SWPUCTF 2022 新生赛]js_sign [SWPUCTF 2022 新生赛]xff ​[SWPUCTF 2022 新生赛]numgame call_user_func()函数 ::双冒号运算符 [SWPUCTF 2022 新生赛]ez_sql [SWPUCTF 2…

Anylogic比较运行实验

比较运行实验案例&#xff1a; 设置好参数后&#xff0c;点击左下角的开始&#xff0c;即可运算出结果 设置图例参数&#xff0c;在界面上图例显示为改变的变量值&#xff1a;

雕虫小技:解决VSCode中extern “C“的代码缩进问题

问题现象 创建一个标准的C语言头文件&#xff1a;foo.h #ifndef _FOO_H_ #define _FOO_H_#ifdef __cplusplus extern "C" { #endif/************************************************************************** * Include Files …

Security(lt2)

some basic terminology • plaintext - original message • ciphertext - coded message • cipher - algorithm for transforming plaintext to ciphertext • key - info used in cipher known only to sender/receiver • encipher (encrypt) - converting plaintext to …

从fasta文件中提取指定长度序列构建矩阵

要从 FASTA 文件中提取指定长度的序列并构建矩阵&#xff0c;你可以使用 BioPython 库&#xff0c;它可以方便地处理生物序列数据。你可以通过从 FASTA 文件中读取序列&#xff0c;然后将每个序列拆分成指定长度的子序列&#xff0c;最终构建矩阵。 以下是一个示例代码&#x…