Flink状态存储-StateBackend

news2024/9/24 3:18:27

文章目录

  • 前言
  • 一、MemoryStateBackend
  • 二、FSStateBackend
  • 三、RocksDBStateBackend
  • 四、StateBackend配置方式
  • 五、状态持久化
  • 六、状态重分布
          • OperatorState 重分布
          • KeyedState 重分布
  • 七、状态过期


前言

Flink是一个流处理框架,它需要对数据流进行状态管理以支持复杂的计算逻辑。在Flink中,状态存储是指如何和在哪里存储这些状态数据。Flink提供了多种状态后端(State Backend)来实现这种存储,以满足不同的应用场景和性能需求。 StateBackend需要具备如下两种能力:
1、在计算过程中提供访问 State 的能力,开发者在编写业务逻辑中能够使用 StateBackend 的接口读写数据。
2、能够将 State 持久化到外部存储,提供容错能力。
根据使用场景的不同, Flink 内置了 3 种 StateBackend 。其体系结构如下图所示。
在这里插入图片描述
纯内存:MemoryStateBackend,适用于验证、测试,不推荐生产环境。
内存+文件:FsStateBackend,适用于长周期大规模的数据。
RocksDB:RocksDBStateBackend,适用于长周期大规模的数据。

在运行时,MemoryStateBackend 和 FsStateBackend 本地的 State 都保存在 TaskManager 的内存中,所以其底层依赖于 HeapKeyedStateBackend。HeapKeyedStateBackend 面向 Flink 引擎内部,使用者无须感知。


一、MemoryStateBackend

默认情况下,状态信息是通过MemoryStateBackend 存储在 TaskManager 的堆内存中的, KV 类型的State,窗口算子的 State 使用 HashTable 来保存数据、触发器等。执行检查点的时候,会把 State 的快照数据保存到 JobManager 进程的内存中。 MemoryStateBackend 可以使用异步的方式进行快照,(也可以同步),推荐异步,避免阻塞算子处理数据。

基于内存的 StateBackend 在生产环境下不建议使用,可以在本地开发调试测试 。
注意点如下 :

  • State 存储在 JobManager 的内存中,受限于 JobManager 的内存大小。
  • 每个 State 默认 5MB,可通过 MemoryStateBackend 构造函数调整。
  • 每个 State 不能超过 Akka Frame 大小。

二、FSStateBackend

文件型状态存储 FSStateBackend,运行时所需的 State 数据全部保存在 TaskManager 的内存中, 执行检查点的时候,会把 State 的快照数据保存到配置的文件系统中,如使用 HDFS 的路径为 “hdfs://namenode:40010/flink/checkpoints”,使用本地文件系统的路径为:“file:///data/flink/checkpoints”。

FSStateBackend 适用于处理大状态、长窗口,或大键值状态的有状态处理任务。
缺点:
状态大小受TaskManager内存限制(默认支持5M)
优点:
状态访问速度很快
状态信息不会丢失
用于: 生产,也可存储状态数据量大的情况

三、RocksDBStateBackend

RocksDBStateBackend 跟内存型和文件型 StateBackend 不同,其使用嵌入式的本地数据库 RocksDB 将流计算数据状态存储在本地磁盘中,不会受限于 TaskManager 的内存大小,在执行检查点的时候,再将整个 RocksDB 中保存的 State 数据全量或者增量持久化到配置的文件系统中,在 JobManager 内存中会存储少量的检查点元数据。RocksDB 克服了 State 受内存限制的问题,同时又能够持久化到远端文件系统中,比较适合在生产中使用。 但是 RocksDBStateBackend 相比基于内存的 StateBackcnd ,访问 State 的成本高很多,可能导致数据流的吞吐量剧烈下降,甚至可能降低为原来的 1/10。

适用场景:
最适合用于处理大状态、长窗口,或大键值状态的有状态处理任务。
RocksDBStateBackend 非常适合用于高可用方案。
RocksDBStateBackend 是目前唯一支持增量检查点的后端,增量检查点非常适用于超大状态的场景。
注意点

  • 总 State 大小仅限于磁盘大小,不受内存限制。
  • RocksDBStateBackend 也需要配置外部文件系统,集中保存 State。
  • RocksDB的 JNI API 基于byte数组,单 key 和单 Value 的大小不能超过 231 字节。
  • 对于使用具有合并操作状态的应用程序,如 ListState ,随着时间可能会累积到超过 231 字节大小,这将会导致在接下来的查询中失败。

四、StateBackend配置方式

  • 单任务调整
修改当前任务代码
public static void main(String[] args) throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStateBackend(new
	FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
	或者new MemoryStateBackend()
	或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依赖】
}
  • 全局调整(不建议)
修改flink-conf.yaml
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
注意:state.backend的值可以是下面几种:jobmanager(MemoryStateBackend),
filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)

五、状态持久化

StateBackend 中的数据最终需要持久化到第三方存储中,确保集群故障或者作业故障能够恢复。 HeapSnapshotStrategy 策略对应于 HeapKeyedStateBackend,RocksDBStateBackend 的持久化策略有两种:全量持久化策略(RocksFullSnapshotStrategy)和 增量持久化策略 (RocksIncementalSnapshotStrategy)。

1、全量持久化策略
全盘持久化,也就是说每次把全量的 Slate 写人到状态存储中 (如 HDFS)。内存型、文件型、 RocksDB 类型的 StatcBackend 支持全量持久化策略。 在执行持久化策略的时候,使用异步机制,每个算子启动 1 个独立的线程,将自身的状态写入分布式存储中。在做持久化的过程中,状态可能会被持续修改,基于内存的状态后端使用 CopyOnWriteStateTable 来保证线程安全,RocksDBStateBackend 则使用 RocksDB 的快照机制,使用快照来保证线程安全。

2、增量持久化策略
增量持久化就是每次持久化增量的 State,只有 RocksDBStateBackend 支持增量持久化。Flink 增量式的检查点以 RocksDB 为基础, RocksDB 是一个基于 LSM-Tree 的 KV 存储。新的数据保存在内存中, 称为 memtable。如果 Key 相同,后到的数据将覆盖之前的数据,一旦 memtable 写满了,RocksDB 就会将数据压缩并写入磁盘。memtable 的数据持久化到磁盘后,就变成了不可变的 sstable。

因为 sstable 是不可变的,Flink 对比前一个检查点创建和删除的 RocksDB sstable 文件就可以计算出状态有哪些发生改变。

为了确保 sstable 是不可变的,Flink 会在 RocksDB 触发刷新操作,强制将 memtable 刷新到磁盘上 。在 Flink 执行检查点时,会将新的 sstable 持久化到 HDFS 中,同时保留引用。这个过程中 Flink 并不会持久化本地所有的 sstable,因为本地的一部分历史 sstable 在之前的检查点中已经持久化到存储中了,只需增加对 sstable 文件的引用次数就可以。 RocksDB 会在后台合并 sstable 并删除其中重复的数据。然后在 RocksDB 删除原来的 sstable,替换成新合成的 sstable.。新的 sstable 包含了被删除的 sstable中的信息,通过合并历史的 sstable 会合并成一个新的 sstable,并删除这些历史sstable。可以减少检查点的历史文件,避免大量小文件的产生。

六、状态重分布

在实际的生产环绕中,作业预先设置的并行度很多时候并不合理,太多则浪费资源,太少则资源不足,可能导致数据积压延迟变大或者处理时间太长,所以在运维过程中,需要根据作业的运行监控数据调整其并行度。调整并行度的关键是处理 State。回想一下前文中的内容,State 位于算子内,改变了并行度,则意味着算子个数改变了,需要将 State 重新分配给算子。下面从 OperatorState 和 KeyedState 两种 State 角度,介绍如何将 State 重新分配给算子。

OperatorState 重分布

1、ListState
并行度在改变的时候,会将并发上的每个 List 都取出,然后把这些 List 合并到一个新的 List,根据元素的个数均匀分配给新的 Task。

2、UnionListState
比 ListState 更加灵活, 把划分的方式交给用户去做,当改变并发的时候,会将原来的 List 拼接起来,然后不做划分,直接交给用户。

3、BroadcastState
操作 BroadcastState 的 UDF 需要保证不可变性,所以各个算子的同一个 BroadcastState 完全一样。在改变并发的时候,把这些数据分发到新的 Task 即可。

KeyedState 重分布

基于 Key-Group ,每个 Key 隶属于唯一的 Key-Group。Key Group 分配给 Task 实例,每个 Task 至少有 一个 Key-Group 。 Key-Group 数量取决于最大并行度 (MaxParallism) 。 KeyedStream 并发的上限是 Key-Group 的数量,等于最大并行度。

七、状态过期

1、DataStream 中状态过期
可以对 每一个 State 设置 清理策略 StateTtlConfig,可以设置的内容如下:
过期时间:超过多长时间未访问,视为 State 过期,类似于缓存。
过期时间更新策略:创建和写时更新、读取和写时更新。
State 可见性:未清理可用,超时则不可用。

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
    
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

2、Flink SQL 中状态过期
Flink SQL 在流 Join、聚合类的场景中,使用了 State,如果 State 不定时清理。 则可能会导致 State 过多,内存溢出。 为了稳妥起见,最好为每个 FLink SQL 作业提供 State 清理的策略。如果定时清理 State,则存在可能因为 State 被清理而导致计算结果不完全准确的风险。FLink 的 Table API 和 SQL 接口中提供了参数设置选项,能够让使用者在精确和资源消耗做折中。

StreamQueryConfig qConfig = ... 
//设置过期时间为 min = 12 小时 ,max = 24 小时 
qConfig.withIdleStateRetentionTime(Time.hours(12)Time.hours(24));

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1483686.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

猫头虎分享已解决Bug || AttributeError: ‘str‘ object has no attribute ‘decode‘

博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》 — 面试准备的宝典&#xff01;《IDEA开发秘籍》 — 提升你的IDEA技能&#xff01;《100天精通鸿蒙》 …

pycharm专业版本的安装

一 、到官网下载对应的pycharm安装包 也可以把安装软件&#xff08;用物理机下载到共享文件夹&#xff09; 然后进入Ubuntu系统把下载大的安装包剪贴到目标路径 1 在ubuntu中创建一个用来存放pycharm安装包的文件夹 rootzmq-virtual-machine:/home/zmq/Desktop# mkdir pycha…

基于YOLOv8/YOLOv7/YOLOv6/YOLOv5的教室人员检测与计数(Python+PySide6界面+训练代码)

摘要&#xff1a;开发教室人员检测与计数系统对于优化教学资源和提升教学效率具有重要意义。本篇博客详细介绍了如何利用深度学习构建此系统&#xff0c;并提供了完整的实现代码。该系统基于强大的YOLOv8算法&#xff0c;并对比了YOLOv7、YOLOv6、YOLOv5的性能&#xff0c;展示…

初识Maven

介绍&#xff1a; web后端开发技术ApacheMaven是一个项目管理和构建工具&#xff0c;它基于项目对象模型&#xff08;POM&#xff09;的概念&#xff0c;通过一小段描述信息来管理项目的构建。安装&#xff1a;http://maven.apache.org/ Apache软件基金会&#xff0c;成立于19…

Leetcode42. 接雨水 -hot100

题目&#xff1a; 代码(首刷自解 2024年3月2日 有3个案例超时&#xff09;&#xff1a; 不算完全做出来&#xff0c;看了答案了&#xff0c;等以后二刷吧 class Solution { public:int helper(const vector<int>& height,const int high) {for(int i 0; i < hei…

对单元测试的思考(稳定性建设)

单测是很常见的技术的名词&#xff0c;但背后的逻辑和原理你是否清楚&#xff0c;让我们一起review一下。 1. 单测是什么&#xff1f;&#x1f914; 单测是单元测试,主要是测试一个最小逻辑块。比如一个函数、一个react、vue 组件。 2.为什么要写单测&#xff1f;&#x1f9…

扩展学习|大数据分析的现状和分类

文献来源&#xff1a;[1] Mohamed A , Najafabadi M K , Wah Y B ,et al.The state of the art and taxonomy of big data analytics: view from new big data framework[J].Artificial Intelligence Review: An International Science and Engineering Journal, 2020(2):53. 下…

windows系统下安装RabbitMQ

一、RabbitMQ安装软件资源准备 因为RabbitMQ是Erlang语言开发的&#xff0c;因此安装Erlang环境在进行安装RbbitMQ的操作&#xff0c;选择两者版本时一定要参考版本的兼容性 1.RabbitMQ国内下载地址&#xff0c;因官网下载比较缓慢&#xff0c;还是国内的稍微快些 https://r…

CTFHUB--文件包含漏洞--RCE

文件包含漏洞 文件包含漏洞也是一种注入型漏洞&#xff0c;其本质就是输入一段用户能够控制的脚本或者代码&#xff0c;并让服务端执行。有时候由于网站功能需求&#xff0c;会让前端用户选择要包含的文件&#xff0c;而开发人员又没有对要包含的文件进行安全考虑&#xff0c;…

Linux yum安装pgsql出现Bad GPG signature错误

官方文档&#xff1a;https://www.postgresql.org/download/linux/redhat/ sudo yum install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-7-x86_64/pgdg-redhat-repo-latest.noarch.rpm sudo yum install -y postgresql12-server sudo /usr/pgsql-12/bin/…

Angular 由一个bug说起之四:jsonEditor使用不当造成的bug

一&#xff1a;问题 项目中使用了一个JSON第三方库&#xff1a; GitHub - josdejong/jsoneditor: A web-based tool to view, edit, format, and validate JSON 当用户编辑JSON格式的数据&#xff0c;查找替换时&#xff1a; 用户的期望结果是&#xff1a;$$ 被替换为$$_text&a…

使用 Azure 部署静态网页

Author&#xff1a;AXYZdong 硕士在读 工科男 有一点思考&#xff0c;有一点想法&#xff0c;有一点理性&#xff01; 定个小小目标&#xff0c;努力成为习惯&#xff01;在最美的年华遇见更好的自己&#xff01; CSDNAXYZdong&#xff0c;CSDN首发&#xff0c;AXYZdong原创 唯…

ROS开发基础-Linux基础第四部(开发板设置本地IP)

一 、网线连接设备 使用网线连接jetson NX与机械臂&#xff0c;如下图所示&#xff1a; 二、 修改上位机IPV4 IP ①测试是否可连接。网线连接机械臂之后&#xff0c;在桌面打开终端输入命令“ping 192.168.1.18”,如不可正常通信&#xff0c;可按照下述步骤进行设置。 ②在U…

React富文本编辑器开发(一)

这是一个系统的完整的教程&#xff0c;每一节文章的内容都很重要。这个教程学完后自己可以开发出一个相当完美的富文本编辑器了。下面就开始我们今天的内容&#xff1a; 安装 是的&#xff0c;我们的开发是基于Slate的开发基础&#xff0c;所以要安装它&#xff1a; yarn ad…

Go 互斥锁的实现原理?

Go sync包提供了两种锁类型&#xff1a;互斥锁sync.Mutex 和 读写互斥锁sync.RWMutex&#xff0c;都属于悲观锁。 概念 Mutex是互斥锁&#xff0c;当一个 goroutine 获得了锁后&#xff0c;其他 goroutine 不能获取锁&#xff08;只能存在一个写者或读者&#xff0c;不能同时…

Neural Network Diffusion论文解读

详细解读&#xff1a;扩散模型生成神经网络参数&#xff0c;速度快了44倍&#xff1a;Neural Network Diffusion论文解读 摘要&#xff1a; 扩散模型在图像和视频生成方面取得了显著的成功。在这项工作中&#xff0c;我们证明了扩散模型也可以生成高性能的神经网络参数。我们…

程序员的金三银四求职宝典!

目录 ​编辑 程序员的金三银四求职宝典 一、为什么金三银四是程序员求职的黄金时期&#xff1f; 二、如何准备金三银四求职&#xff1f; 1. 完善简历 2. 增强技术能力 3. 提前考虑目标公司 4. 提前准备面试 三、程序员求职的常见面试题 1. 数据结构和算法 2. 数据库 …

在原有pytorch环境下安装DGL库和其对应的CUDA【自用】

前段时间看到一篇AAAI2024的论文Patch-wise Graph Contrastive Learning for Image Translation&#xff0c;它采用GNN的思想来进行image-to-image translation的任务&#xff0c;非常的新颖&#xff0c;但我进行复现的时候&#xff0c;发现直接下载它里面需要的DGL库是无法运行…

旧的Spring Security OAuth已停止维护,全面拥抱新解决方案Spring SAS

Spring Authorization Server 替换 Shiro 指引 背景 Spring 团队正式宣布 Spring Security OAuth 停止维护&#xff0c;该项目将不会再进行任何的迭代 目前 Spring 生态中的 OAuth2 授权服务器是 Spring Authorization Server 已经可以正式生产使用作为 SpringBoot 3.0 的最新…

C#中什么是非托管代码?托管代码和非托管代码有什么区别

在C#中&#xff0c;托管代码和非托管代码是两种不同类型的代码&#xff0c;它们在内存管理和执行环境上有所不同。 托管代码&#xff08;Managed Code&#xff09;&#xff1a; 托管代码是由.NET运行时&#xff08;CLR&#xff0c;Common Language Runtime&#xff09;管理和执…