Flink系列之:Savepoints

news2024/11/24 13:59:09

Flink系列之:Savepoints

  • 一、Savepoints
  • 二、分配算子ID
  • 三、Savepoint 状态
  • 四、算子
  • 五、触发Savepoint
  • 六、Savepoint 格式
  • 七、触发 Savepoint
  • 八、使用 YARN 触发 Savepoint
  • 九、使用 Savepoint 停止作业
  • 十、从 Savepoint 恢复
  • 十一、跳过无法映射的状态恢复
  • 十二、Restore 模式
  • 十三、NO_CLAIM (默认的)
  • 十四、CLAIM
  • 十五、LEGACY
  • 十六、删除 Savepoint
  • 十七、配置

一、Savepoints

Savepoint 是依据 Flink checkpointing 机制所创建的流作业执行状态的一致镜像。 你可以使用 Savepoint 进行 Flink 作业的停止与重启、fork 或者更新。 Savepoint 由两部分组成:稳定存储(列入 HDFS,S3,…) 上包含二进制文件的目录(通常很大),和元数据文件(相对较小)。 稳定存储上的文件表示作业执行状态的数据镜像。 Savepoint 的元数据文件以(相对路径)的形式包含(主要)指向作为 Savepoint 一部分的稳定存储上的所有文件的指针。

二、分配算子ID

强烈建议你按照本节所述调整你的程序,以便将来能够升级你的程序。主要通过 uid(String) 方法手动指定算子 ID 。这些 ID 将用于恢复每个算子的状态。

DataStream<String> stream = env.
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

这段代码使用Apache Flink框架创建了一个名为stream的数据流(DataStream类型)。代码中的每一个操作都代表了流处理的一步。

首先,代码通过调用addSource方法添加一个名为StatefulSource的有状态源。这个源可以是任何有状态的数据源,比如Kafka。uid(“source-id”)方法用于给源操作符指定一个唯一的ID。

接下来,代码调用shuffle方法对流进行一次shuffle操作,该操作可用于重新分区数据。

然后,代码调用map方法,使用一个名为StatefulMapper的有状态映射函数对流进行映射操作。同样地,uid(“mapper-id”)方法用于给映射操作符指定一个唯一的ID。

最后,代码调用print方法通过无状态的打印sink将流的内容输出到控制台。这个操作会自动生成一个ID。

整个代码段构建了一个有状态的数据流处理图,其中包含了源操作符、映射操作符和一个打印sink。该数据流会根据源生成的数据进行一系列的转换和处理,并将结果打印输出。

如果不手动指定 ID ,则会自动生成 ID 。只要这些 ID 不变,就可以从 Savepoint 自动恢复。生成的 ID 取决于程序的结构,并且对程序更改很敏感。因此,强烈建议手动分配这些 ID 。

三、Savepoint 状态

可以将 Savepoint 想象为每个有状态的算子保存一个映射“算子 ID ->状态”:

Operator ID | State
------------+------------------------
source-id   | State of StatefulSource
mapper-id   | State of StatefulMapper

在上面的示例中,print sink 是无状态的,因此不是 Savepoint 状态的一部分。默认情况下,我们尝试将 Savepoint 的每个条目映射回新程序。

四、算子

你可以使用命令行客户端来触发 Savepoint,触发 Savepoint 并取消作业,从 Savepoint 恢复,以及删除 Savepoint。

从 Flink 1.2.0 开始,还可以使用 webui 从 Savepoint 恢复。

五、触发Savepoint

当触发 Savepoint 时,将创建一个新的 Savepoint 目录,其中存储数据和元数据。可以通过配置默认目标目录或使用触发器命令指定自定义目标目录(参见:targetDirectory参数来控制该目录的位置。

注意: 目标目录必须是 JobManager(s) 和 TaskManager(s) 都可以访问的位置,例如分布式文件系统(或者对象存储系统)上的位置。

以 FsStateBackend 或 RocksDBStateBackend 为例:

# Savepoint 目标目录
/savepoint/

# Savepoint 目录
/savepoint/savepoint-:shortjobid-:savepointid/

# Savepoint 文件包含 Checkpoint元数据
/savepoint/savepoint-:shortjobid-:savepointid/_metadata

# Savepoint 状态
/savepoint/savepoint-:shortjobid-:savepointid/...

从 1.11.0 开始,你可以通过移动(拷贝)savepoint 目录到任意地方,然后再进行恢复。

在如下两种情况中不支持 savepoint 目录的移动:1)如果启用了 entropy injection :这种情况下,savepoint 目录不包含所有的数据文件,因为注入的路径会分散在各个路径中。 由于缺乏一个共同的根目录,因此 savepoint 将包含绝对路径,从而导致无法支持 savepoint 目录的迁移。2)作业包含了 task-owned state(比如 GenericWriteAhreadLog sink)。

和 savepoint 不同,checkpoint 不支持任意移动文件,因为 checkpoint 可能包含一些文件的绝对路径。

如果你使用 MemoryStateBackend 的话,metadata 和 savepoint 的数据都会保存在 _metadata 文件中,因此不要因为没看到目录下没有数据文件而困惑。

注意: 不建议移动或删除正在运行作业的最后一个 Savepoint ,因为这可能会干扰故障恢复。因此,Savepoint 对精确一次的接收器有副作用,为了确保精确一次的语义,如果在最后一个 Savepoint 之后没有 Checkpoint ,那么将使用 Savepoint 进行恢复。

六、Savepoint 格式

你可以在 savepoint 的两种二进制格式之间进行选择:

  • 标准格式 - 一种在所有 state backends 间统一的格式,允许你使用一种状态后端创建 savepoint 后,使用另一种状态后端恢复这个 savepoint。这是最稳定的格式,旨在与之前的版本、模式、修改等保持最大兼容性。
  • 原生格式 - 标准格式的缺点是它的创建和恢复速度通常很慢。原生格式以特定于使用的状态后端的格式创建快照(例如 RocksDB 的 SST 文件)。

以原生格式创建 savepoint 的能力在 Flink 1.15 中引入,在那之前 savepoint 都是以标准格式创建的。

七、触发 Savepoint

$ bin/flink savepoint :jobId [:targetDirectory]

这将触发 ID 为 :jobId 的作业的 Savepoint,并返回创建的 Savepoint 路径。 你需要此路径来恢复和删除 Savepoint 。你也可以指定创建 Savepoint 的格式。如果没有指定,会采用标准格式创建 Savepoint。

$ bin/flink savepoint --type [native/canonical] :jobId [:targetDirectory]

八、使用 YARN 触发 Savepoint

$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId

这将触发 ID 为 :jobId 和 YARN 应用程序 ID :yarnAppId 的作业的 Savepoint,并返回创建的 Savepoint 的路径。

九、使用 Savepoint 停止作业

$ bin/flink stop --type [native/canonical] --savepointPath [:targetDirectory] :jobId

这将自动触发 ID 为 :jobid 的作业的 Savepoint,并停止该作业。此外,你可以指定一个目标文件系统目录来存储 Savepoint 。该目录需要能被 JobManager(s) 和 TaskManager(s) 访问。你也可以指定创建 Savepoint 的格式。如果没有指定,会采用标准格式创建 Savepoint。

十、从 Savepoint 恢复

$ bin/flink run -s :savepointPath [:runArgs]

这将提交作业并指定要从中恢复的 Savepoint 。 你可以给出 Savepoint 目录或 _metadata 文件的路径。

十一、跳过无法映射的状态恢复

默认情况下,resume 操作将尝试将 Savepoint 的所有状态映射回你要还原的程序。 如果删除了运算符,则可以通过 --allowNonRestoredState(short:-n)选项跳过无法映射到新程序的状态:

十二、Restore 模式

Restore 模式 决定了在 restore 之后谁拥有Savepoint 或者 externalized checkpoint的文件的所有权。在这种语境下 Savepoint 和 externalized checkpoint 的行为相似。 这里我们将它们都称为“快照”,除非另有明确说明。

如前所述,restore 模式决定了谁来接管我们从中恢复的快照文件的所有权。快照可被用户或者 Flink 自身拥有。如果快照归用户所有,Flink 不会删除其中的文件,而且 Flink 不能依赖该快照中文件的存在,因为它可能在 Flink 的控制之外被删除。

每种 restore 模式都有特定的用途。尽管如此,我们仍然认为默认的 NO_CLAIM 模式在大多数情况下是一个很好的折中方案,因为它在提供明确的所有权归属的同时只给恢复后第一个 checkpoint 带来较小的代价。

你可以通过如下方式指定 restore 模式:

$ bin/flink run -s :savepointPath -restoreMode :mode -n [:runArgs]

十三、NO_CLAIM (默认的)

在 NO_CLAIM 模式下,Flink 不会接管快照的所有权。它会将快照的文件置于用户的控制之中,并且永远不会删除其中的任何文件。该模式下可以从同一个快照上启动多个作业。

为保证 Flink 不会依赖于该快照的任何文件,它会强制第一个(成功的) checkpoint 为全量 checkpoint 而不是增量的。这仅对state.backend: rocksdb 有影响,因为其他 backend 总是创建全量 checkpoint。

一旦第一个全量的 checkpoint 完成后,所有后续的 checkpoint 会照常创建。所以,一旦一个 checkpoint 成功制作,就可以删除原快照。在此之前不能删除原快照,因为没有任何完成的 checkpoint,Flink 会在故障时尝试从初始的快照恢复。

在这里插入图片描述

十四、CLAIM

另一个可选的模式是 CLAIM 模式。该模式下 Flink 将声称拥有快照的所有权,并且本质上将其作为 checkpoint 对待:控制其生命周期并且可能会在其永远不会被用于恢复的时候删除它。因此,手动删除快照和从同一个快照上启动两个作业都是不安全的。Flink 会保持配置数量的 checkpoint。

在这里插入图片描述
注意:

Retained checkpoints 被存储在 <checkpoint_dir>/<job_id>/chk- 这样的目录中。Flink 不会接管 <checkpoint_dir>/<job_id> 目录的所有权,而只会接管 chk- 的所有权。Flink 不会删除旧作业的目录。

Native 格式支持增量的 RocksDB savepoints。对于这些 savepoints,Flink 将所有 SST 存储在 savepoints 目录中。这意味着这些 savepoints 是自包含和目录可移动的。然而,在 CLAIM 模式下恢复时,后续的 checkpoints 可能会复用一些 SST 文件,这反过来会阻止在 savepoints 被清理时删除 savepoints 目录。 Flink 之后运行期间可能会删除复用的SST 文件,但不会删除 savepoints 目录。因此,如果在 CLAIM 模式下恢复,Flink 可能会留下一个空的 savepoints 目录。

十五、LEGACY

Legacy 模式是 Flink 在 1.15 之前的工作方式。该模式下 Flink 永远不会删除初始恢复的 checkpoint。同时,用户也不清楚是否可以删除它。导致该的问题原因是, Flink 会在用来恢复的 checkpoint 之上创建增量的 checkpoint,因此后续的 checkpoint 都有可能会依赖于用于恢复的那个 checkpoint。总而言之,恢复的 checkpoint 的所有权没有明确的界定。

在这里插入图片描述

十六、删除 Savepoint

$ bin/flink savepoint -d :savepointPath

这将删除存储在 :savepointPath 中的 Savepoint。

请注意,还可以通过常规文件系统操作手动删除 Savepoint ,而不会影响其他 Savepoint 或 Checkpoint(请记住,每个 Savepoint 都是自包含的)。 在 Flink 1.2 之前,使用上面的 Savepoint 命令执行是一个更乏味的任务。

十七、配置

你可以通过 state.savepoints.dir 配置 savepoint 的默认目录。 触发 savepoint 时,将使用此目录来存储 savepoint。 你可以通过使用触发器命令指定自定义目标目录来覆盖缺省值

# 默认 Savepoint 目标目录
state.savepoints.dir: hdfs:///flink/savepoints

如果既未配置缺省值也未指定自定义目标目录,则触发 Savepoint 将失败。

注意: 目标目录必须是 JobManager(s) 和 TaskManager(s) 可访问的位置,例如,分布式文件系统上的位置。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1327847.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

阿里云大模型数据存储解决方案,为 AI 创新提供推动力

云布道师 随着国内首批大模型产品获批名单问世&#xff0c;百“模”大战悄然开启。在这场百“模”大战中&#xff0c;每一款大模型产品的诞生&#xff0c;都离不开数据的支撑。如何有效存储、管理和处理海量多模态数据集&#xff0c;并提升模型训练、推理的效率&#xff0c;保…

【湖仓一体尝试】MYSQL和HIVE数据联合查询

爬了两天大大小小的一堆坑&#xff0c;今天把一个简单的单机环境的流程走通了&#xff0c;记录一笔。 先来个完工环境照&#xff1a; mysqlhadoophiveflinkicebergtrino 得益于IBM OPENJ9的优化&#xff0c;完全启动后的内存占用&#xff1a; 1&#xff09;执行联合查询后的…

【Java探索之旅】我与Java的初相识(二):程序结构与运行关系和JDK,JRE,JVM的关系

&#x1f3a5; 屿小夏 &#xff1a; 个人主页 &#x1f525;个人专栏 &#xff1a; Java入门到精通 &#x1f304; 莫道桑榆晚&#xff0c;为霞尚满天&#xff01; 文章目录 &#x1f4d1;前言一. 第一个Java程序1.1 main方法1.2 Java的程序结构 二. Java程序的运行三. JDK、JR…

【零基础入门】凸优化1:怎么培养研究能力,从模型+优化开始!

凸优化1 优化问题的形式优化问题类别1&#xff1a;凸函数 和 非凸函数优化问题类别2&#xff1a;带条件 和 无条件优化问题类别3&#xff1a;离散 和 连续优化问题类别4&#xff1a;平滑 和 非平滑如何判断一个目标函数是凸函数&#xff0c;还是非凸函数&#xff1f;怎么设计模…

Exynos4412 移植Linux-6.1(九)移植tiny4412_backlight驱动的过程及问题解决

系列文章目录 Exynos4412 移植Linux-6.1&#xff08;一&#xff09;下载、配置、编译Linux-6.1 Exynos4412 移植Linux-6.1&#xff08;二&#xff09;SD卡驱动——解决无法挂载SD卡的根文件系统 Exynos4412 移植Linux-6.1&#xff08;三&#xff09;SD卡驱动——解决mmc0: Ti…

解决 elementPlus 组件内容显示为英文的问题

解决 elementPlus 组件内容显示为英文的问题 一、问题描述 刚开始用 ElementPlus 发现默认的组件内容都是英文的 二、解决办法 找了找&#xff0c;发现是国际化的问题&#xff0c;默认就是显示英文&#xff0c;如果要显示中文需要配置中文显示。 关于显示中文的官方说明&a…

Windows11系统下如何通过.cab文件更新PL2303串口驱动?

Windows11系统下如何通过.cab文件更新PL2303串口驱动? 首先,在微软官方网站上下载所需版本的.cab文件,具体链接如下: https://www.catalog.update.microsoft.com/Search.aspx?q=Prolific%20USB-to-Serial%20Comm%20Port 如下图所示,进入该网站后,找到自己所需的驱动版…

神经网络可视化新工具:TorchExplorer

TorchExplorer是一个交互式探索神经网络的可视化工具&#xff0c;他的主要功能如下&#xff1a; TorchExplorer是一款创新的人工智能工具&#xff0c;专为使用非常规神经网络架构的研究人员设计。可以在本地或者wandb中生成交互式Vega自定义图表&#xff0c;提供网络结构的模块…

掌握Apache Kylin:工作原理、设置指南及实际应用全解析

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

设计模式(4)--对象行为(1)--职责链

1. 意图 使多个对象都有机会处理请求&#xff0c;从而避免请求的发送者和接收者之间的耦合关系。 将这些对象连成一条链&#xff0c;并沿着这条链传递该请求&#xff0c;直到有一个对象处理它为止。 2. 两种角色 抽象处理者(Handler)、具体处理者(Concrete Handler) 3. 优点 …

直播怎么录制视频?轻松提升视频质量!

录制直播视频是保存和分享游戏过程、教程或其他在线活动的好方法。随着直播行业的兴起&#xff0c;许多用户都希望能够录制自己的直播内容以供日后观看或与他人分享。可是直播怎么录制视频呢&#xff1f;本文将详细介绍两种直播录制视频的方法&#xff0c;希望通过具体的步骤讲…

Redis-Day3实战篇-商户查询缓存(缓存的添加和更新, 缓存穿透/雪崩/击穿, 缓存工具封装)

Redis-Day3实战篇-商户查询缓存 什么是缓存添加Redis缓存业务流程项目实现练习 - 给店铺类型查询业务添加缓存 缓存更新策略最佳实践方案案例 - 给查询商铺的缓存添加超时剔除和主动更新 缓存穿透/雪崩/击穿缓存穿透概述项目实现 - 商铺查询缓存 缓存雪崩缓存击穿概述互斥锁逻辑…

百模大战中的AI行业:新趋势与未来发展

文章目录 每日一句正能量前言技术进步应用拓展行业变革人才竞争后记 每日一句正能量 人生最重要的价值是心灵的幸福&#xff0c;而不是任何身外之物。 前言 随着科技的迅猛发展&#xff0c;人工智能&#xff08;AI&#xff09;已经成为引领技术革命的重要驱动力之一。在当前的…

物业服务投诉反馈建议建议二维码

为高效处理物业方面的投诉问题&#xff0c;进一步提升居住品质。凡尔码平台推出“二维码”便民投诉、反馈方式&#xff0c;如有群租扰民、占用堵塞消防通道或私拉乱建等问题&#xff0c;可以立即扫码或进入“凡尔码”小程序进行投诉或反馈。 如电梯出现故障物业服务企业未及时维…

助力智能车辆检测计数,基于官方YOLOv8全系列[n/s/m/l/x]开发构建道路交通场景下不同参数量级车流检测计数系统

在很多道路交通卡口都有对车流量的统计计算需要&#xff0c;有时候一些特殊时段、特殊节日等时间下对于车流的监测预警更为重要&#xff0c;恶劣特殊天气下的提早监测、预警、限流对于保证乘客、驾驶员的安全是非常重要的措施&#xff0c;本文的主要目的就是想要开发构建道路交…

最后一公里物流:发展历程与未来趋势

导言 最后一公里物流&#xff0c;作为物流体系中的关键环节&#xff0c;一直是行业关注的焦点。本文将深入研究最后一公里物流的发展历程、遇到的问题及解决过程&#xff0c;探讨未来的可用范围、在各国的应用和未来的研究趋势&#xff0c;并分析在哪些方面能取胜、在哪些方面发…

HarmonyOS引导页登陆页以及tabbar的代码说明 底部的Tabs功能3

效果 代码说明 这一功能实现起来还是麻烦&#xff0c;需要自己实现&#xff0c;在uniapp中的pages.json底部加上就能实现&#xff0c;在这里需要自己写 引入三个内容页 Home,Car,Setting &#xff0c;说明界面模块也行。引入 private tabsController: TabsController new Tab…

逆波兰计算器的完整代码

前置知识&#xff1a; 将中缀表达式转为List方法&#xff1a; //将一个中缀表达式转成中缀表达式的List//即&#xff1a;(3042)*5-6 》[(, 30, , 42, ), *, 5, -, 6]public static List<String> toIndixExpressionList(String s) {//定义一个List&#xff0c;存放中缀表达…

[Unity]接入Firebase 并且关联支付埋点

首先 在这个下一下FireBase的资源 firebase11.0.6 然后导入Analytics Auth Crashlytics 其他的看着加就行 然后直接丢到Unity里面 接下来需要去Firebase里面下载 Google json 丢到 这个下面 然后就是脚本代码了 using System.Collections; using System.Collection…

html/css实现简易圣诞贺卡

一、前言 HTML&#xff0c;全称HyperText Markup Language&#xff0c;即超文本标记语言&#xff0c;是用于创建网页的标准标记语言。HTML是一种标记语言&#xff0c;由一系列的元素标签组成&#xff0c;用于描述网页的结构和内容。 CSS&#xff0c;全称是“层叠样式表”&#…