Flink——Flink检查点(checkpoint)、保存点(savepoint)的区别与联系

news2024/9/28 19:22:50

Flink checkpoint

Checkpoint是Flink实现容错机制最核心的功能,能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot,从而将这些状态数据定期持久化存储下来,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些Snapshot进行恢复,从而修正因为故障带来的程序数据状态中断。

  1. Checkpoint指定触发生成时间间隔后,每当需要触发Checkpoint时,会向Flink程序运行时的多个分布式的Stream Source中插入一个Barrier标记
  2. 当一个Operator接收到一个Barrier时,它会暂停处理Steam中新接收到的数据记录
  3. 每个Stream中都会存在对应的Barrier,该Operator要等到所有的输入Stream中的Barrier都到达。当所有Stream中的Barrier都已经到达该Operator,这时所有的Barrier在时间上看来是同一个时刻点(表示已经对齐)
  4. 该Operator会将数据记录(Outgoing Records)发射(Emit)出去,作为下游Operator的输入
  5. 最后将Barrier对应Snapshot发射(Emit)出去作为此次Checkpoint的结果数据

开启checkpoint

 

        //1.1 开启CK
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointTimeout(10000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc-test/ck"));

ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION,表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint处理。

上面代码配置了执行Checkpointing的时间间隔为1分钟。

保存多个checkpoint
默认情况下,如果设置了Checkpoint选项,则Flink只保留最近成功生成的1个Checkpoint

Flink可以支持保留多个Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的个数:

state.checkpoints.num-retained: 20

如果希望会退到某个Checkpoint点,只需要指定对应的某个Checkpoint路径即可实现。

从checkpoint 恢复
如果Flink程序异常失败,或者最近一段时间内数据处理错误,我们可以将程序从某一个Checkpoint点,比如chk-860进行回放,执行如下命令

bin/flink run -s hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860/_metadata flink-app-jobs.jar
  • 所有的Checkpoint文件都在以Job ID为名称的目录里面
  • 当Job停掉后,重新从某个Checkpoint点(chk-860)进行恢复时,重新生成Job ID
  • Checkpoint编号会从该次运行基于的编号继续连续生成:chk-861、chk-862、chk-863

checkpoint的建议

  • Checkpoint 间隔不要太短
    • 过短的间对于底层分布式文件系统而言,会带来很大的压力。
    • Flink 作业处理 record 与执行 checkpoint 存在互斥锁,过于频繁的checkpoint,可能会影响整体的性能。
  • 合理设置超时时间

Flink savepoint

Savepoint会在Flink Job之外存储自包含(self-contained)结构的Checkpoint,它使用Flink的Checkpointing机制来创建一个非增量的Snapshot,里面包含Streaming程序的状态,并将Checkpoint的数据存储到外部存储系统中

Flink程序中包含两种状态数据:

用户定义的状态(User-defined State)是基于Flink的Transformation函数来创建或者修改得到的状态数据
系统状态(System State),是指作为Operator计算一部分的数据Buffer等状态数据,比如在使用Window Function时,在Window内部缓存Streaming数据记录
Flink提供了API来为程序中每个Operator设置ID,这样可以在后续更新/升级程序的时候,可以在Savepoint数据中基于Operator ID来与对应的状态信息进行匹配,从而实现恢复。

设置Operator ID:

DataStream<String> stream = env.
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

创建Savepoint

创建一个Savepoint,需要指定对应Savepoint目录,有两种方式来指定

需要配置Savepoint的默认路径,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,设置Savepoint存储目录

state.savepoints.dir: hdfs://namenode01.td.com/flink/flink-savepoints

手动执行savepoint命令的时候,指定Savepoint存储目录

bin/flink savepoint :jobId [:targetDirectory]

使用默认配置

bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038

为正在运行的Flink Job指定一个目录存储Savepoint数据

bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038 hdfs://namenode01.td.com/tmp/flink/savepoints

从Savepoint恢复

bin/flink run -s :savepointPath [:runArgs]

以上面保存的Savepoint为例,恢复Job运行

bin/flink run -s hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f flink-app-jobs.jar

会启动一个新的Flink Job,ID为cdbae3af1b7441839e7c03bab0d0eefd

Savepoint 目录结构

1bbc5是Flink Job ID字符串前6个字符,后面bd967f90709b是随机生成的字符串
_metadata文件包含了Savepoint的元数据信息
其他文件内容都是序列化的状态信息

总结

checkpoint和savepoint是Flink为我们提供的作业快照机制,它们都包含有作业状态的持久化副本。

用几句话总结一下。

  1. checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从早先打下的checkpoint恢复运行,且不影响作业逻辑的准确性。而savepoint的侧重点是“维护”,即Flink作业需要在人工干预下手动重启、升级、迁移或A/B测试时,先将状态整体写入可靠存储,维护完毕之后再从savepoint恢复现场。

  2. savepoint是“通过checkpoint机制”创建的,所以savepoint本质上是特殊的checkpoint。

  3. checkpoint面向Flink Runtime本身,由Flink的各个TaskManager定时触发快照并自动清理,一般不需要用户干预;savepoint面向用户,完全根据用户的需要触发与清理。

  4. checkpoint的频率往往比较高(因为需要尽可能保证作业恢复的准确度),所以checkpoint的存储格式非常轻量级,但作为trade-off牺牲了一切可移植(portable)的东西,比如不保证改变并行度和升级的兼容性。savepoint则以二进制形式存储所有状态数据和元数据,执行起来比较慢而且“贵”,但是能够保证portability,如并行度改变或代码升级之后,仍然能正常恢复。

  5. checkpoint是支持增量的(通过RocksDB),特别是对于超大状态的作业而言可以降低写入成本。savepoint并不会连续自动触发,所以savepoint没有必要支持增量。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1025909.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C语言】进阶——字符串和内存函数

目录 一&#xff1a;非限制字符串函数 1.strlen &#x1f44a;模拟实现 方法1&#xff1a;计算器法 方法2.指针-指针 方法3.函数调用 2.strcpy &#x1f44a;模拟实现 3.strcat &#x1f44a;模拟实现 4.strcmp &#x1f44a;模拟实现 二&#xff1a;可限制字符串函…

Echarts 旭日图的详细配置过程

文章目录 旭日图 简介配置过程简易示例 旭日图 简介 Echarts旭日图是一种数据可视化图表类型&#xff0c;用于展示层次关系数据的分布情况。旭日图通过不同的环形区域和扇形区域来表示数据的层次和大小关系&#xff0c;从而形成一个太阳的形状&#xff0c;因此得名旭日图。 E…

WebGL 计算平行光、环境光下的漫反射光颜色

目录 光照原理 光源类型 平行光 点光源 环境光 反射类型 漫反射 漫反射光颜色 计算公式 环境反射 环境反射光颜色 表面的反射光颜色&#xff08;漫反射和环境反射同时存在时&#xff09;计算公式 平行光下的漫反射 根据光线和法线方向计算入射角θ&#xff08;以便…

Intellij IDEA 提效小技巧

快速找到Controller方法 如果你的项目里有非常多的controller&#xff0c;里面有非常多的http或者resful方法。如何快速找到这些方法呢&#xff1f;这个时候&#xff0c;ctrlaltshiftn就可以派上用场了。 比如说&#xff0c;你依稀记得入账单相关的接口&#xff0c;都有个bil…

让Pegasus天马座开发板用上OLED屏

继上篇《让Pegasus天马座开发板吃上STM8S标准库》移植完标准库之后&#xff0c;于是我又想为天马座开发板添加一块屏幕。终于在我的零件箱底下找到了沉入箱底多年的0.96OLED屏幕。 屏幕介绍 这个是128x64像素的屏幕模块&#xff0c;其使用的SSD1306的驱动IC。而目前该模组&…

软件测试为什么外包更好?权威软件测试外包公司应该具备的资质

软件测试外包公司是一家专门从事软件测试服务的企业&#xff0c;其主要任务是帮助公司或个人进行软件产品的测试工作。相比较于自行开设测试部门或雇佣全职测试人员&#xff0c;外包软件测试具有成本更低、灵活性更高的优势。同时&#xff0c;外包公司通常拥有丰富的测试经验和…

Java高级-Junit单元测试框架

单元测试框架 1.介绍2.案例、断言机制3.常见注解 1.介绍 单元测试 就是在针对最小的功能单元方法&#xff0c;编写测试代码对其正确性测试 Junit单元测试框架 可以对方法进行测试&#xff0c;是第三方公式开源出来的 优点 可以灵活的编写测试代码&#xff0c;可以针对某个…

Stable Diffusion 参数介绍及用法

大模型 CheckPoint 介绍 作用&#xff1a;定调了作图风格&#xff0c;可以理解为指挥者 安装路径&#xff1a;models/Stable-diffusion 推荐&#xff1a; AnythingV5Ink_v32Ink.safetensors cuteyukimixAdorable_midchapter2.safetensors manmaruMix_v10.safetensors counterf…

leetcode刷题笔记——位运算

C/C语言中逻辑右移和算数右移共享同一个运算符>> 如果运算数类型是unsigned则采用逻辑右移&#xff0c;而signed则采用算数右移。对于signed类型的数据&#xff0c;如果需要使用算数右移&#xff0c;或者unsigned类型的数据需要使用逻辑右移&#xff0c;都需要进行类型转…

JAVASE---认识异常

在Java中&#xff0c;将程序执行过程中发生的不正常行为称为异常。 1.算数异常 2.数组越界异常 3.空指针异常 java中不同类型的异常&#xff0c;都有与其对应的类来进行描述。 异常的体系结构 1. Throwable&#xff1a;是异常体系的顶层类&#xff0c;其派生出两个重要的子…

阿里云服务器租用费用价格表(2023新版报价)

租用阿里云服务器怎么收费&#xff1f;阿里云服务器配置不同一年价格也不同&#xff0c;阿里云2核2G3M带宽108元一年、2核4G4M带宽297.98元12个月&#xff0c;云服务器u1公网带宽可选1M到5M&#xff0c;系统盘为ESSD云盘40GB起&#xff0c;CPU内存配置可选2核2G、2核4G、4核8G、…

Python爬虫在电商数据获取与分析中的应用

前言 随着电商平台的兴起&#xff0c;越来越多的人开始在网上购物。而对于电商平台来说&#xff0c;商品信息、价格、评论等数据是非常重要的。因此&#xff0c;抓取电商平台的商品信息、价格、评论等数据成为了一项非常有价值的工作。本文将介绍如何使用Python编写爬虫程序&a…

BaseRecyclerView - 一个强大的RecyclerAdapter框架

官网 BRVAH 项目介绍 高效的使用RecyclerView应对项目中的常见需求的Adapter&#xff0c;RecycleView从未如此简单&#xff01; BRVAH官方使用指南 BRVAH官方使用指南&#xff08;持续更新&#xff09; - 简书

智慧交通:连接城市未来的纽带

在当今快节奏的现代生活中&#xff0c;交通问题一直是城市面临的重要挑战之一。拥堵、事故和空气污染等问题不仅影响着居民的日常生活&#xff0c;也对经济和环境产生了负面影响。为了解决这些问题&#xff0c;智慧交通作为一项重要的技术和社会创新出现在我们的视野中。 智慧交…

docker镜像相关

docker镜像相关 docker镜像相关理解解释unionFS&#xff08;联合文件系统&#xff09;镜像加载原理docker镜像要采用这种分层结构 重点理解docker镜像commit 操作实例案例演示总结 docker镜像相关理解 解释 镜像是一种轻量级&#xff0c;可执行的独立软件包&#xff0c;它包含…

软考高级之系统架构师之企业应用集成EAI

概述 在企业信息化建设的过程中&#xff0c;由于缺乏统一规划和总体布局&#xff0c;往往形成多个信息孤岛。信息孤岛使数据的一致性无法得到保证&#xff0c;信息无法共享和反馈&#xff0c;需要重复多次的采集和输入。信息孤岛是企业信息化一个重要的负面因素&#xff0c;其…

SpringBoot结合Vue.js+axios框架实现增删改查功能+网页端实时显示数据库数据(包括删除多条数据)

本文适用对象&#xff1a;已有基础的同学&#xff0c;知道基础的SpringBoot配置和Vue操作。 在此基础上本文实现基于SpringBoot和Vue.js基础上的增删改查和数据回显、刷新等。 一、实时显示数据库数据 实现步骤&#xff1a; 第1步&#xff1a;编写动态请求响应类&#xff1…

由河北吉力宝战略发展规划看中国品牌商业发展新方向

当今时代&#xff0c;一个经济体的发展和崛起背后&#xff0c;往往是一批民族品牌在提供强力的支撑。中国作为全世界最大的发展中国家&#xff0c;在经济建设中取得了举世瞩目的发展成就&#xff0c;各个行业涌现出一批优秀的国民品牌。 随着高质量发展成为各行各业的广泛共识…

docker alpine:3.16 root权限安装Anaconda3-2020.07-Linux-x86_64和jdk

首先查看系统版本: rootfv-az454-287:/tmp# uname -a Linux fv-az454-287 5.15.0-1046-azure #53~20.04.1-Ubuntu SMP Mon Aug 28 14:17:23 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux rootfv-az454-287:/tmp# grep NA /etc/os-release NAME"Ubuntu" PRETTY_NAME&q…

笔记本电脑没有麦克风,声音无法找到输入设备

新买的电脑没有扬声器&#xff0c;电脑声音没有输入设备&#xff0c;在开腾讯会议的时候才发现竟然有这个问题。 网上找原因&#xff0c;哎&#xff0c;找了一大堆每一个靠谱的 这让我想起来上次电脑没有热键的问题&#xff0c;所有问题的终极解决方案&#xff0c;都在源头那里…