Flink状态的理解

news2024/11/25 11:33:07

Flink是一个带状态的数据处理系统;系统在处理数据的过程中,各算子所记录的状态会随着数据的处理而不断变化;

1. 状态

所谓状态State,一般指一个具体的 Task 的状态,即线程处理过程中需要保存的历史数据或历史累计数据,默认保存在 Java 的堆内存中。
在这里插入图片描述
根据算子是否存在按照Key进行分区,State可以划分为keyed state 和 Non-keyed state(Operator State、算子状态)

  • operator state是task级别的state,说白了就是每个task对应一个state, 在逻辑上,由算子task下所有subtask共享
    在这里插入图片描述
    Operator State的经常被用在Source或Sink算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。
  • keyed state 是基于KeyedStream上的状态,这个状态是跟特定的Key 绑定的。KeyedStream流上的每一个Key,都对应一个State
    在这里插入图片描述

2. 状态数据结构

状态数据由Flink内置状态机制管理。keyed state提供了5种数据结构

2.1 keyed state 数据结构

状态状态描述
ValueState保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的key
ListState保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索
MapState维护了一个映射列表
ReducingState保存一个单值,表示添加到状态的所有值的聚合
AggregateState保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与添加到状态的元素的类型不同

2.2 operator state

状态状态描述
ListStateListState的快照存储数据,系统重启后,list数据的重分配模式为: round-robin 轮询平均分配
UnionListStateUnionListState的快照存储数据,在系统重启后,list数据的重分配模式为: 广播模式; 在每个subtask上都拥有一份完整的数据;

3. 状态后端

默认情况下,state会保存在taskmanager的JVM堆内存,checkpoint会存储在JobManager的内存中。然而,状态数据的存储和checkpoint的存储位置可以改变,由state Backend(状态后端)配置实现

老版本(flink-1.12版及以前) Fsstatebackend MemoryStatebackend RocksdbStateBackend

flink1.12版本之后,可用的状态后端类型有两种
HashMapStateBackend、EmbeddedRocksDBStateBackend
而且其所生成的快照文件也统一了格式,因而在job重新部署或者版本升级时,可以任意替换statebackend

  • HashMapStateBackend
    ※ 状态数据是以java对象形式存储在heap内存中;
    ※ 内存空间不够时,也会溢出部分数据到本地磁盘文件;
    ※ 可以支撑大规模的状态数据;(只不过在状态数据规模超出内存空间时,读写效率就会明显降低)

  • EmbeddedRocksDBStateBackend

    ※ RocksDB使用一套日志结构的数据库引擎,它是Flink中内置的第三方状态管理器, 为了更好的性能,这套引擎是用C++编写的。 Key和value是任意大小的字节流。

    ※ 它需要配置一个远端的filesystem uri(一般是HDFS),在做checkpoint的时候,会把本地的数据直接复制到fileSystem中。fail over的时候从fileSystem中恢复到本地, RocksDB克服了state受内存限制的缺点,同时又能够持久化到远端文件系统中,比较适合在生产中使用。

    ※ 使用RocksDB + HDFS进行state存储:首先state先在taskManger的本地存储到RocksDB,然后异步写入到HDFS中,状态数量仅仅受限于本地磁盘容量限制
    在这里插入图片描述

4. 状态数据容错

Flink是一个stateful(带状态)的数据处理系统;系统在处理数据的过程中,各算子所记录的状态会随着数据的处理而不断变化;

一旦系统崩溃,需要重启后能够恢复出崩溃前的状态才能进行数据的接续处理;因此,必须要一种机制能对系统内的各种状态进行持久化容错;Flink用checkpoint机制实现状态数据的容错

4.1 checkpoint

Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。

  • checkpoint默认关闭,需要手工开启。开启后,默认Exactly-once快照模式。还有一种快照模式为At-least-once

  • checkPoint的位置设置flink-conf.yaml#state.checkpoints.dir

  • Flink支持不同的重启策略,以在故障发生时控制作业如何重启,集群在启动时会伴随一个默认的重启策略。配置参数flink-conf.yaml#restart-strategy

♦ 如果没有启用 checkpointing,则使用无重启 (no restart) 策略 ♦ 如果启用了 checkpointing,但没有配置重启策略,则使用固定 间隔 (fixed-delay) 策略,尝试重启次数默认值是:Integer.MAX_VALUE。
♦ 另一种重启策略为Failure rate,某时间段内失败了N次就重启

全局配置

# 每隔3s重启一次,重试间隔为10s
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
#5分钟内若失败了3次则认为该job失败,重试间隔为10s
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
# 不重启
restart-strategy: none

单个JOB内配置

Configuration conf = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
//开启状态检查点机制(它将会定期对整个系统中各个task的状态进行快照持久化,以便失败重启后还能从失败之前的状态恢复)
 env.enableCheckpointing(1000,CheckpointingMode.EXACTLY_ONCE);
// checkpoint机制触发后,持久化保存各task状态数据的存储位置(生产中用hdfs
env.getCheckpointConfig().setCheckpointStorage("hdfs://node01:8020/tmp/flink/state");
// 指定状态后端存储(内存)
env.setStateBackend(new HashMapStateBackend());
// 开启自动重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.milliseconds(2000)));
// env.setRestartStrategy(RestartStrategies.failureRateRestart(3, org.apache.flink.api.common.time.Time.seconds(100), org.apache.flink.api.common.time.Time.seconds(10)));
// env.setRestartStrategy(RestartStrategies.noRestart())
  • checkpoint个数默认保留最近成功生成的一个,支持保留多个,通过参数flink-conf.yaml#state.checkpoints.num-retained控制,如果希望回退到某个Checkpoint点,只需要指定对应的某个Checkpoint路径即可实现
flink run -t yarn-per-job  -yjm 1024 -ytm 1024 -s hdfs://node01:8020/tmp/flink/state/715b120fe8736a3af7842ea0a5264c46/chk-6/_metadata

4.2 savePoint

savePoint是检查点一种特殊实现,底层其实也是使用Checkpoint的机制。

savePoint是用户以手工命令的方式触发checkpoint,并将结果持久化到指定的存储目录中。

作业升级、代码修改、任务迁移和维护,都可以使用savePoint

  1. savePoint的存储位置
    savePoint的存储位置flink-conf.yaml#state.savepoints.dir,不是必须设置,但设置了后, 后面创建指定Job的Savepoint时,可以不用在手动执行命令时指定Savepoint的位置。

  2. savePoint的手动触发:

#【针对on standAlone模式】
bin/flink savepoint jobId [targetDirectory]

#【针对on yarn模式需要指定-yid参数】
bin/flink savepoint jobId [targetDirectory] -yid yarnAppId

jobId 需要触发savepoint的jobId编号
targetDirectory 指定savepoint存储数据目录
-yid 指定yarnAppId
例如: flink savepoint 84e766231bbe4b9ff3667f9a0d80b867 -yid application_1619059559839_0001

  1. 查看HDFS上savepoint目录

#Savepoint directory /flink/savepoints/savepoint-:shortjobid-:savepointid/

#Savepoint file contains the checkpoint meta data /savepoints/savepoint-:shortjobid-:savepointid/_metadata

在这里插入图片描述
4. 触发savepoint并且停止作业

##语法: bin/flink stop jobId -yid yarnAppId

##例如: flink stop 84e766231bbe4b9ff3667f9a0d80b867 -yid application_1619059559839_0001

  1. 从指定的savepoint启动job

##语法: bin/flink run -s savepointPath [runArgs]

##例如: flink run -t yarn-per-job -yjm 1024 -ytm 1024
-s hdfs://node01:8020/flink/savepoints/savepoint-84e766-0591f3377ad0
-c com.loess.checkpoint.TestCheckPoint flink-study-1.0-SNAPSHOT.jar

  1. 清除savepoint数据

bin/flink savepoint -d savepointPath

##也可以手动删除某个savepoint,这通过常规的文件系统操作就可以做到,不影响其它的savepoints和checkpoints

  1. savePoint使用建议

为了能够在作业的不同版本之间以及 Flink 的不同版本之间顺利升级,推荐通过 uid(String) 方法手动的给算子赋予 ID,这些
ID 将用于确定每一个算子的状态范围。

不手动给各算子指定 ID,则会由 Flink 自动给每个算子生成一个 ID。只要这些 ID 没有改变就能从保存点
(savepoint)将程序恢复回来。而这些自动生成的 ID
依赖于程序的结构,并且对代码的更改敏感。当程序改变时,ID会随之变化,所以建议用户手动设置 ID

DataStream<String> stream = env.
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

4.3 savePoint与checkPoint的区别

  • checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从checkpoint来恢复运行,且不影响作业逻辑的准确性。而savepoint的侧重点是“维护”,即Flink作业需要在人工干预下手动重启、升级、迁移或A/B测试时,先将状态整体写入可靠存储,维护完毕之后再从savepoint恢复现场。
  • savepoint是通过checkpoint机制创建的,所以savepoint本质上是特殊的checkpoint。
  • checkpoint面向Flink Runtime本身,由Flink的各个TaskManager定时触发快照并自动清理,一般不需要用户干预;savepoint面向用户,完全根据用户的需要触发与清理。
  • checkpoint是支持增量的(通过RocksDB),特别是对于超大状态的作业而言可以降低写入成本。savepoint并不会连续自动触发,所以savepoint没有必要支持增量。
    在这里插入图片描述

5. checkpoint机制原理

checkPoint是所有 Operator / Task 的状态在某个时间点的一份拷贝(一份快照), 这个时间点应该是所有 Operator / Task 任务都恰好处理完一个相同的输入数据的时候。
在这里插入图片描述
若某个subTask挂了,则此时的状态都被清空,从checkpoint恢复最近一次的状态,重新启动应用程序,计算输入流

5.1 Barrier机制

Barrier是一种特殊事件,用来作为快照信号,由checkpoint 协调器向数据流中注入该信号,subtask任务收到该信号后,就会执行状态的快照。
在这里插入图片描述

  1. 首先是JobManager中的checkpoint Coordinator(协调器) 向任务中的所有source Task周期性发送barrier(栅栏)进行快照请求。
  2. source Task接受到barrier后, 会把当前自己的状态进行snapshot(可以保存在HDFS上)。
  3. source向checkpoint coordinator确认snapshot已经完成。
  4. source继续向下游transformation operator发送 barrier。
  5. transformation operator重复source的操作,直到sink operator向协调器确认snapshot完成。
  6. coordinator确认完成本周期的snapshot已经完成。

5.2 Barrier对齐

对于下游算子来说,可能有多个与之相连的上游输入,我们将算子之间的边 称为通道。Source要将一个ID为n的Checkpoint Barrier向所有下游算子广播,这也意味着下游算子的多个输入里都有同一个Checkpoint Barrier,而且 不同输入里Checkpoint Barrier的流入进度可能不同。因此Checkpoint Barrier传播的过程需要进行对齐(Barrier Alignment)
在这里插入图片描述
算子对齐分为四部:
(1). 算子子任务在某个输入通道中收到第一个ID为n的Checkpoint Barrier,但是其他输入通道中ID为n的Checkpoint Barrier还未到达,该算子子任务开始准备进行对齐

(2). 算子子任务将第一个输入通道的数据缓存下来,同时继续处理其他输入通道的数据,这个过程被称为对齐。

(3). 第二个输入通道的Checkpoint Barrier抵达该算子子任务,该算子子任务执行快照,将状态写入State Backend,然后将ID为n的Checkpoint Barrier向下游所有输出通道广播。

(4). 对于这个算子子任务,快照执行结束,继续处理各个通道中新流入数据,包括刚才缓存起来的数据。
在这里插入图片描述

6.快照性能优化方案

每次进行Checkpoint前,都需要暂停处理新流入数据,然后开始执行快照,假如状态比较大,一次快照可能长达几秒甚至几分钟。

Checkpoint Barrier对齐时,必须等待所有上游通道都处理完,假如某个上游通道处理很慢,这可能造成整个数据流堵塞。

两种优化方案
① Flink提供了异步快照(Asynchronous Snapshot)的机制。当实际执行快照时,Flink可以立即向下广播Checkpoint Barrier,表示自己已经执行完自己部分的快照。一旦数据同步完成,再给Checkpoint Coordinator发送确认信息。通过基于 Chandy-Lamport 算法的分布式快照,将检查点的保存和数据处理分离开,不暂停整个应用。
② Flink允许跳过对齐这一步,或者说一个算子子任务不需要等待所有上游通道的Checkpoint Barrier,直接将Checkpoint Barrier广播,执行快照并继续处理后续流入数据。为了保证数据一致性,Flink必须将那些较慢的数据流中的元素也一起快照,一旦重启,这些元素会被重新处理一遍

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/795694.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot集成Redis的环境部署以及操作Redis

文章目录 Spring Boot 集成Redis1.环境配置 redis连接配置信息不写默认wei6379&#xff0c;数据库为02.操作Redis2.1 代码形式操作Redis2.2 使用注解方式操作Redis Spring Boot 集成Redis 1.环境配置 添加redis依赖 在老项目添加&#xff0c;可以在pom.xml文件直接添加&#…

DAMO-YOLO 论文学习

1. 解决了什么问题&#xff1f; 工业界追求高性能、低延迟的目标检测算法&#xff0c;研究人员于是聚焦于单阶段目标检测&#xff0c;探索高效的网络结构和训练策略。YOLOv5/v6/v7、YOLOX 和 PP-YOLOE 在 COCO 数据集上实现了不错的精度-速度平衡&#xff0c;得到广泛应用&…

超标量处理器寄存器rename

1.相关性介绍 在CPU中&#xff0c;一段程序会被编译成一连串的汇编指令&#xff0c;指令与指令之间可能会具有相关性&#xff08;dependency&#xff09;。所谓相关性&#xff0c;即一条指令的执行会依赖于另一条指令的结果&#xff0c;相关性可以分为&#xff1a;① 数据相关性…

el-table树形表格实现复选框多选效果

2023.7.26今天我学习了如何使用树形表格的时候进行复选框的多选效果。 当我们使用树形结构表格需要进行多选功能操作的时候会发现点击全选的时候&#xff0c;只有一级表格数据会被选中&#xff0c;问题如图&#xff1a; 我们需要实现的是点击全选的不管是几级表格数据都可以被…

ElasticSearch之IK分词器安装以及使用介绍

文章目录 一、IK 分词器简介1. 支持细粒度分词&#xff1a;2. 支持多种分词模式&#xff1a;3. 支持自定义词典&#xff1a;4. 支持拼音分词&#xff1a;5. 易于集成和使用&#xff1a; 二、安装步骤1、下载 IK 分词器插件&#xff1a;2、安装 IK 分词器插件&#xff1a;3. 安装…

各种知名游戏的技术分析

介绍一个GitHub&#xff0c;里面包括了市面上的各种游戏的技术分析&#xff0c;包括渲染管线、工作流、技术文章等等&#xff0c;在做某个类型的游戏的时候&#xff0c;可以针对某个游戏去进行技术参考&#xff0c;特别实用。 GitHub - OTFCG/Awesome-Game-Analysis: a compre…

C++设计模式之模板方法、策略模式、观察者模式

面向对象设计模式是”好的面向对象设计“&#xff0c;所谓”好的面向对象设计“指的是可以满足”应对变化&#xff0c;提高复用“的设计。 现代软件设计的特征是”需求的频繁变化“。设计模式的要点是”寻求变化点&#xff0c;然后在变化点处应用设计模式&#xff0c;从而更好地…

力扣天天练--week3-LeetCode75

topic75-9-t443:压缩字符串 题目描述&#xff1a; 给你一个字符数组 chars &#xff0c;请使用下述算法压缩&#xff1a; 从一个空字符串 s 开始。对于 chars 中的每组 连续重复字符 &#xff1a; 如果这一组长度为 1 &#xff0c;则将字符追加到 s 中。 否则&#xff0c;需…

Spring Boot中整合MyBatis(基于xml方式基于注解实现方式)

一、前提准备 在Spring Boot中整合MyBatis时&#xff0c;你需要导入JDBC&#xff08;不需要手动添加&#xff09;和Druid的相关依赖。 JDBC依赖&#xff1a;在Spring Boot中整合MyBatis时&#xff0c;并不需要显式地添加JDBC的包依赖。这是因为&#xff0c;当你添加mybatis-sp…

会捷通云视讯 list 目录文件泄露漏洞

劳动永远是医治精神创伤的良药。 漏洞描述 会捷通云视讯某个文件 list参数 存在目录文件泄露漏洞&#xff0c;攻击者通过漏洞可以获取一些敏感信息 漏洞复现 构造payload访问漏洞url&#xff1a; /him/api/rest/V1.0/system/log/list?filePath../漏洞证明&#xff1a; 文…

Mendix 创客访谈录|综合业务展示大屏应用开发

本期创客 刘书智 西门子工业领域专家 我在西门子工厂自动化工程有限公司工作。一直从事SCADA产品的技术支持工作&#xff0c;已经过去17个年头了。赶上数字化发展的浪潮&#xff0c;不断学习各种IT技术&#xff0c;践行 IT与OT融合&#xff0c;希望借助自己的IT知识助力OT的发…

编程实战班--C语言和Python语言实现五子棋游戏的代码

文章目录 下面分别是C语言和Python语言实现五子棋游戏的代码&#xff1a;C语言实现Python语言实现总结 下面分别是C语言和Python语言实现五子棋游戏的代码&#xff1a; C语言实现 在使用C语言实现五子棋游戏时&#xff0c;可以使用SDL2图形库来实现图形界面和图形绘制等功能&…

华为华三思科 交换机基础配置一览

console密码修改 华为 user-interface console 0 authentication-mode password set authentication password cipher XXXXXXXXX华三 line aux 0 authentication-mode password set auth pass simple XXX思科 en configure terminal line console 0 password 123 login忘记…

打开英雄联盟提示d3dcompiler47.dll缺失怎么修复

1.d3dcompiler_47.dll缺失的原因 损坏的文件&#xff1a;d3dcompiler_47.dll文件可能由于某些原因损坏&#xff0c;如病毒感染、意外删除等。 不兼容的操作系统&#xff1a;某些应用程序要求特定版本的d3dcompiler_47.dll文件&#xff0c;如果操作系统不兼容&#xff0c;则可能…

前端实现导出excel表格(单行表头)

需求&#xff1a;实现勾选行导出为表格 一、安装插件 npm install --save file-saver xlsx运行项目报如下警告的话 运行npm install xlsx0.16.0 --save 来降低版本号&#xff08;最初我安装的版本号是0.18.16的版本&#xff09;再次运行项目就不会报如下警告了 二、新建一个ex…

语音分帧简述

目录 1. 分帧 1.1 非整齐分帧 1.2 整齐分帧 2. 示例代码 1. 分帧 问题1&#xff1a;总帧数如何计算&#xff1f; 记符号N为语音总长度&#xff0c;FRAME_LEN为帧长&#xff0c;OVERLAP_LEN为帧与帧之间的重叠部分&#xff0c;STEP_LEN为帧移(步长)。则总帧数N_Frames计算…

kotlin 编写一个简单的天气预报app(二)

增加界面显示openweathermap返回的信息。 在activity_main.xml里增加输入框来输入城市&#xff0c;在输入款旁边增加搜索按钮来进行查询。 然后原来显示helloworld的TextView用来显示结果。 1. 增加输入城市名字的EditText <EditTextandroid:id"id/editTextCity"…

AcrelEMS企业微电网能效管理平台实现用户侧智能配电和智能用电管理-安科瑞黄安南

摘要&#xff1a;随着科技的发展&#xff0c;电力系统正逐步向智能化、数字化、互联网化迈进。智能配电与智能用电是电力产业发展的重要方向&#xff0c;将为传统电力系统带来革命性的变革。本文将对智能配电和智能用电的概念、特点、关键技术及应用进行详细介绍。 1、智能配电…

Rust vs Go:常用语法对比(八)

题目来自 Golang vs. Rust: Which Programming Language To Choose in 2023?[1] 141. Iterate in sequence over two lists Iterate in sequence over the elements of the list items1 then items2. For each iteration print the element. 依次迭代两个列表 依次迭代列表项1…

【linux基础(一)】Linux基本指令(上)

&#x1f493;博主CSDN主页:杭电码农-NEO&#x1f493;   ⏩专栏分类:Linux从入门到开通⏪   &#x1f69a;代码仓库:NEO的学习日记&#x1f69a;   &#x1f339;关注我&#x1faf5;带你学更多操作系统知识   &#x1f51d;&#x1f51d; 这里写目录标题 1. 前言1. 创…