关于flink重新提交任务,重复消费kafka的坑

news2024/11/17 3:37:10

异常现象1

按照以下方式设置backend目录和checkpoint目录,fsbackend目录有数据,checkpoint目录没数据

env.getCheckpointConfig().setCheckpointStorage(PropUtils.getValueStr(Constant.ENV_FLINK_CHECKPOINT_PATH));
env.setStateBackend(new FsStateBackend(PropUtils.getValueStr(Constant.ENV_FLINK_STATEBACKEND_PATH)));

原因

我以为checkpoint和fsbackend要同时设置,其实,1.14.3版本,setCheckpointStorage和stateBackend改成了分着设置

我上边代码这样设置,相当于首先指定了以下checkpoint按照默认的backend存储,然后又指定了按照fsbackend存储,因此首先指定的checkpoint目录没有数据。

正解

env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(PropUtils.getValueStr(Constant.ENV_FLINK_CHECKPOINT_PATH));

State Backends | Apache Flink

异常现象2

开启checkpoint eos,开启容错,每次任务重新提交都会重新消费kafka已经完成了checkpoint的数据

原因

我以为只要开启这两个配置就可以保证已经checkpoint的kafka数据不会被重复消费,其实不然

checkpoint是flink内部的容错机制,他能保证在设置了失败重启策略之后(setRestartStrategy),如果发生异常导致失败重试之后自动从最新checkpoint恢复。不是手动重启。。。手动重启默认不会进行加载状态数据,所以每次都会从头消费

正解

flink任务 -s 指定恢复点提交,这个恢复点可以是checkpoint也可以时savepoint。

# 启动
/home/cuadmin/flink-1.14.3/bin/flink run -d  \
-c cn.flink.ApplicationMaster \
 /home/cuadmin/portal-flink-2021.0.1-SNAPSHOT-shaded.jar

# 备份,创建savepoint
/home/cuadmin/flink-1.14.3/bin/flink savepoint 19f4bb5d103ea8695712d4d1a797893f /home/cuadmin/flink-1.14.3/savepoint

# 指定savepoint启动
/home/cuadmin/flink-1.14.3/bin/flink run -d  \
-c cn.flink.ApplicationMaster \
-s  file:/home/cuadmin/flink-1.14.3/savepoint/savepoint-033556-251a9e55ed25  \
/home/cuadmin/portal-flink-2021.0.1-SNAPSHOT-shaded.jar

异常现象4

这是错误的

# 指定savepoint启动
/home/cuadmin/flink-1.14.3/bin/flink run -d  \
-c cn.flink.ApplicationMaster \

/home/cuadmin/portal-flink-2021.0.1-SNAPSHOT-shaded.jar
-s  file:/home/cuadmin/flink-1.14.3/savepoint/savepoint-033556-251a9e55ed25  \

按照上述命令执行,这个地方显示恢复点的加载情况,这里没显示,代表恢复点没有执行成功

原因

-s的位置有问题,我之前以为没有顺序,把-s 放到了命令最后,结果没报错,也没识别。。

正解

-s 位置要正确

# 指定savepoint启动
/home/cuadmin/flink-1.14.3/bin/flink run -d  \
-c cn.flink.ApplicationMaster \
-s  file:/home/cuadmin/flink-1.14.3/savepoint/savepoint-033556-251a9e55ed25  \
/home/cuadmin/portal-flink-2021.0.1-SNAPSHOT-shaded.jar

异常现象3

我记得savepoint和checkpoint是都可以用来flink -s 进行恢复点恢复的。但是每次都提示恢复失败,提示文件找不到,savepoint就可以。。。

原因

cancel job会将 checkpoint的数据删掉。。。

正解

测试的时候,直接stop-cluster,这样checkpoint数据就不会被删除了

保留 Checkpoint 

Checkpoint 在默认的情况下仅用于恢复失败的作业,并不保留,当程序取消时 checkpoint 就会被删除。当然,你可以通过配置来保留 checkpoint,这些被保留的 checkpoint 在作业失败或取消时不会被清除。这样,你就可以使用该 checkpoint 来恢复失败的作业。

CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

ExternalizedCheckpointCleanup 配置项定义了当作业取消时,对作业 checkpoint 的操作:

  • ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:当作业取消时,保留作业的 checkpoint。注意,这种情况下,需要手动清除该作业保留的 checkpoint。
  • ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:当作业取消时,删除作业的 checkpoint。仅当作业失败时,作业的 checkpoint 才会被保留。

总结

1、savepoint的数据要比checkpoint更加稳定,比如你可以通过移动(拷贝)savepoint 目录到任意地方,然后再进行恢复。checkpoint就不可以,因为他有很多相对路径配置。

2、savepoint和checkpoint一般都能作为恢复点使用,例外情况是使用 RocksDB 状态后端的增量 Checkpoint。他们使用了一些 RocksDB 内部格式,而不是 Flink 的本机 Savepoint 格式。这使他们成为了与 Savepoint 相比,更轻量级的 Checkpoint 机制的第一个实例。

3、任务因为偶然原因内部重启(task级别),通过失败重试机制+checkpoint自动进行重放,任务因重启、断电、死机等外部因素(cluster级别),通过-s 指定checkpoint/savepoint恢复点进行手动重放。这样就可以保证状态数据的稳定

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1073517.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

漏洞复现--泛微E-Office前台文件读取漏洞

免责声明: 文章中涉及的漏洞均已修复,敏感信息均已做打码处理,文章仅做经验分享用途,切勿当真,未授权的攻击属于非法行为!文章中敏感信息均已做多层打马处理。传播、利用本文章所提供的信息而造成的任何直…

Matplotlib绘图基础详细教程

1、GPT引领前沿与应用突破之GPT4科研实践技术与AI绘图高级培训班 2、全流程R语言Meta分析核心技术 3、最新基于Citespace、vosviewer、R语言的文献计量学可视化分析技术及全流程文献可视化SCI论文高效写作方法 导入模块 import matplotlib as mpl import matplotlib.pyplot…

云架构师学习------云存储白皮书深入理解

云架构师学习------云存储白皮书深入理解 云架构师学习------云存储白皮书深入理解行业描述数字经济时代的到来1、基础设施的云化2、核心技术互联网化3、应用数据化和智能化 存储行业的变革1、存储服务网络的巨变2、云原生对云存储的新要求3、智能与存储的交相辉映 技术产品&am…

使用CountDownLatch解决接口循环网络请求造成的耗时问题

背景:由于查询接口调用第三方平台,需要通过http请求获得设备数据,但由于第三方接口获取数据的限制,只能通过某些接口获取机柜与机房、机柜与设备关系后再查询对应设备的信息,单线程执行效果慢造成了网络io耗时较长的问…

高并发 发送请求(asyncio)

在接手这个项目之前,关于数据存储的代码逻辑如上图,看起来按部就班,也很合理。(本人觉得这就像个玩具车) 在最后一步发送HTTP request响应足够快的话,其实速度说不上快但稳定,可以接受。但偏偏…

segment方案解决VXLAN分布式网关DCI间互联

segment概念: segment方案是在需要互联的两个DCI间建立3条VXLAN隧道实现两个DCI间的二层和三层间互通需求,常用于大型的DCI间互联,无需考虑两个DCI内的VXLAN参数规划的不同,其中二层互通可以采用映射VNI或局部VNI的方式进行解决&…

运营商大数据,三网融合大数据,联通大数据,移动大数据

有许多公司和企业依靠电话营销和短信营销。对于他们来说,客户资源就是维生素和维生素,客户资源的及时性和准确性是这些公司和企业最关心的问题。长期使用低质量、大量无效的客户资源,是对时间的浪费,是对人力物力财力的浪费&#…

基于Springboot实现房屋租赁租房平台系统项目【项目源码+论文说明】

基于Springboot实现房屋租赁租房平台系统演示 摘要 在网络高速发展的时代,众多的软件被开发出来,给用户带来了很大的选择余地,而且人们越来越追求更个性的需求。在这种时代背景下,房东只能以用户为导向,所以开发租房网…

基于springboot实现准妈妈孕期交流平台项目【项目源码+论文说明】分享

基于springboot实现准妈妈孕期交流平台演示 摘要 随着科学技术的飞速发展,社会的方方面面、各行各业都在努力与现代的先进技术接轨,通过科技手段来提高自身的优势,准妈妈孕期交流平台当然也不能排除在外。准妈妈孕期交流平台是以实际运用为开…

加密的重要性,MySQL加密有哪些好处?

加密是一种将信息转化为无法直接读取的格式的技术,从而保护信息安全。在当今数字化的世界中,数据已成为企业的重要资产,因此加密的重要性不言而喻。在这篇文章中,我们将探讨MySQL加密的好处以及如何选择合适的加密算法。 MySQL加密…

Windows系统安装

安装Windows系统有很多方法、而Windows系统也有不同的版本,下面主要介绍两种方法安装系统,第一种是使用微软官方提供的镜像文件安装Win10系统,第二种是使用微PE工具箱来安装Win10系统 准备工作:内存大于8G的空U盘,Win1…

软件测试「转行」答疑(未完更新中)

⭐ 专栏简介 软件测试行业「转行」答疑: 如果你对于互联网的职业了解一知半解!不知道行业的前景如何?对于众说纷纭的引流博主说法不知所措!不确定这个行业到底适不适合自己? 那么这一篇文章可以告诉你所有真实答案&a…

10-Node.js入门

01.什么是 Node.js 目标 什么是 Node.js,有什么用,为何能独立执行 JS 代码,演示安装和执行 JS 文件内代码 讲解 Node.js 是一个独立的 JavaScript 运行环境,能独立执行 JS 代码,因为这个特点,它可以用来…

使用docker搭建nacos单机、集群 + mysql

单机搭建 1 拉取mysql镜像 docker pull mysql:5.7.40 2 启动mysql容器 docker run -d --namemysql-server -p 3306:3306 -v mysql-data:/var/lib/mysql -e MYSQL_ROOT_PASSWORD123456 mysql:5.7.40 3 执行nacos的数据库脚本 /* * Copyright 1999-2018 Alibaba Group Holding L…

树的基本概念及二叉树

目录 一、树的基本概念 (1)树的结点 (2)度 (3)结点层次 (4)树的高度 树的特点: 二、二叉树 (1)满二叉树 (2)完…

nodejs 16版本

Index of /download/release/latest-v16.x/

医院内网多台主机中毒流量分析案例

背景 最近医院的医生多次反馈网络出现慢和卡顿现象。医院十分重视这个问题,并将之反馈给网络部门同事进行处理。经过多次排查和分析,并没有发现网络中的异常情况。为了更好地解决这一问题,我们推荐安装NetInside流量分析系统。这个系统可以对…

c++视觉图像----扩充边界

图像扩充边界 #include <opencv2/opencv.hpp> #include <opencv2/highgui/highgui.hpp>int main() {// 读取图像cv::Mat image cv::imread("1.jpg", cv::IMREAD_COLOR);if (image.empty()) {std::cerr << "Could not open or find the imag…

【JVM--StringTable字符串常量池】

文章目录 1. String 的基本特性2. 字符串拼接操作3. intern()的使用4. StringTable 的垃圾回收 1. String 的基本特性 String 声明为 final 的&#xff0c;不可被继承String 实现了 Serializable 接口&#xff1a;表示字符串是支持序列化的。String 实现了 Comparable 接口&am…

求臻人故事 | 在求臻医学的沃土中,我像竹子般茁壮成长

在这个快节奏的社会中&#xff0c;我们时常忽略了身边的“小人物”&#xff0c;他们或许默默无闻&#xff0c;或许平凡无奇&#xff0c;但他们的经历、奋斗和成就&#xff0c;却能给我们带来深深的启示。让我们一起走进每个平凡的求臻人世界&#xff0c;聆听他们的“大故事”&a…