Flink SQL Checkpoint 学习总结

news2025/1/11 18:46:34

前言

学习总结Flink SQL Checkpoint的使用,主要目的是为了验证Flink SQL流式任务挂掉后,重启时还可以继续从上次的运行状态恢复。

验证方式

Flink SQL流式增量读取Hudi表然后sink MySQL表,任务启动后处于running状态,先查看sink表有数据,然后将对应的yarn kill掉,再通过设置的checkpoint重启任务,任务重启后验证sink表的数据量。Flink SQL流式增量读取Hudi表可以参考:Flink SQL增量查询Hudi表

版本

  • Flink 1.14.3
  • Hudi 0.13.0

Checkpoint 参数

一般需要设置的常用参数

-- checkpoint间隔时间,单位毫秒,没有默认值,如果想开启checkpoint,需要将该参数设置一个大于0的数值
-- 如果想提升sink性能,比如写hudi,需要将该值设置大一点,因为间隔时间决定了批次大小
-- checkpoint间隔时间不能设置太短也不能设置太长,太短影响写入性能,太长影响数据及时性。
set execution.checkpointing.interval=1000;
-- 保存checkpoint文件的目录
set state.checkpoints.dir=hdfs:///flink/checkpoints/hudi2mysql;
-- 任务取消后保留checkpoint,默认值NO_EXTERNALIZED_CHECKPOINTS,
-- 可选值NO_EXTERNALIZED_CHECKPOINTS、DELETE_ON_CANCELLATION、RETAIN_ON_CANCELLATION
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;

从checkpoint恢复

set execution.savepoint.path=hdfs:///flink/checkpoints/hudi2mysql/255bdd01cee7486113feb1cbe8b45ee0/chk-1314;

其他参数

-- checkpoint模式,默认值EXACTLY_ONCE,可选值:EXACTLY_ONCE、AT_LEAST_ONCE
-- 要想支持EXACTLY_ONCE,需要sink端支持事务
set execution.checkpointing.mode=EXACTLY_ONCE;
-- checkpoint超时时间,默认10分钟
set execution.checkpointing.timeout=600000;
-- checkpoint文件保留数,默认1
set state.checkpoints.num-retained=3;

Checkpoint 目录结构

/user-defined-checkpoint-dir
    /{job-id}
        |
        + --shared/
        + --taskowned/
        + --chk-1/
        + --chk-2/
        + --chk-3/
        ...   

验证

创建Hudi和MySQL物理表

Hudi表

CREATE TABLE hudi_source (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts bigint,
  dt string
)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_source'
);

MySQL表

CREATE TABLE `sink_mysql` (
  `id` int(11) NOT NULL,
  `name` text,
  `price` double DEFAULT NULL,
  `ts` int(11) DEFAULT NULL,
  `dt` text,
  `insert_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

造数

insert into hudi_source values(1,'hudi1',11.1,1000,'20230301');
insert into hudi_source values(2,'hudi2',22.2,1000,'20230301');
......

流读Hudi写MySQL

hudi2mysql.sql

set yarn.application.name=hudi2mysql;
set execution.checkpointing.interval=1000;
set state.checkpoints.dir=hdfs:///flink/checkpoints/hudi2mysql;
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;

CREATE TABLE hudi_source_incr (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts bigint,
  dt string
)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_source',
  'read.streaming.enabled' = 'true', 
  'read.start-commit' = '202302', 
  'read.streaming.check-interval' = '4'
);

create table sink_mysql (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts bigint,
  dt string
) with (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8',
 'username' = 'root',
 'password' = 'password',
 'table-name' = 'sink_mysql'
);

insert into sink_mysql select * from hudi_source_incr;

执行上面的SQL

bin/sql-client.sh -f sql/hudi2mysql.sql

这样我们启动了一个常任务,在Flink界面上可以看到checkpoint的相关信息,如下图显示了checkpoint具体文件地址

可以用hdfs命令看一下checkpoint路径下有哪些文件

drwxr-xr-x   - hive hdfs          0 2023-03-01 14:47 hdfs:///flink/checkpoints/hudi2mysql/255bdd01cee7486113feb1cbe8b45ee0/chk-589
drwxr-xr-x   - hive hdfs          0 2023-03-01 14:36 hdfs:///flink/checkpoints/hudi2mysql/255bdd01cee7486113feb1cbe8b45ee0/shared
drwxr-xr-x   - hive hdfs          0 2023-03-01 14:36 hdfs:///flink/checkpoints/hudi2mysql/255bdd01cee7486113feb1cbe8b45ee0/taskowned

其中255bdd01cee7486113feb1cbe8b45ee0为flink的jobid

将yarn任务kill

yarn app -kill application_1676855463066_0177

再看一下,发现checkpoint文件还在

hdfs:///flink/checkpoints/hudi2mysql/255bdd01cee7486113feb1cbe8b45ee0/chk-1314

重启任务验证checkpoint效果

需要先在hudi2mysql.sql,添加下面的配置

-- 从该checkpoint文件对应的状态恢复
set execution.savepoint.path=hdfs:///flink/checkpoints/hudi2mysql/255bdd01cee7486113feb1cbe8b45ee0/chk-1314;

重启flink sql任务

bin/sql-client.sh -f sql/hudi2mysql.sql

我们可以在新启动的yarn界面上看到,最新的恢复点,和我们设置的一样,这样代表我们设置恢复点生效

最后再造几条新的增量数据,在MySQL里看验证以下数据量是否一致

insert into hudi_source values(3,'hudi3',33.3,1000,'20230301');

MySQL数据量一致,且更新时间和插入时间一致,代表id=1、2的数据重启时没有重复消费,达到了预期效果。(也可以对MySQL表不设置主键,直接通过验证数据量验证效果)

这样我们通过一个简单的示例,了解了checkpoint的具体使用。大致过程
1、设置开启checkpoint和保存的路径,
2、任务运行时会根据设置的时间间隔不断生成新的ckp文件,
3、等任务挂掉后,重启任务时先设置execution.savepoint.path为我们最后一次保存的ckp文件
这样就达到了任务重启时继续从上次的运行状态恢复。

Checkpoint和Hudi

流任务写hudi时,必须设置checkpoint,不然不会生成commit,感觉像是卡住一样,具体表现为只生成.commit.requested.inflight,然后不写文件、不生成.commit也不报错,对于新手来说很费劲,很难找到解决方法。
大概原因是因为写文件、生成commit的动作是在coordinator里面,只有当checkpoint完成后才会调用coordinator,所以不设置checkpoint就不会生成commit,这里的逻辑是在Hudi源码里(具体没看),也就是说checkpoint和生成hudi commit是绑定一起的,这样才能保证流写Hudi的事务性,从而保证checkpoint的EXACTLY_ONCE。

StateBackend

在启动 CheckPoint 机制时,状态会随着 CheckPoint 而持久化,以防止数据丢失、保障恢复时的一致性。 状态内部的存储格式、状态在 CheckPoint 时如何持久化以及持久化在哪里均取决于选择的 State Backend。

在学习Flink SQL Checkpoint时,发现网上的资料有下面的这个配置,本来以为这样设置后,就会将checkpoint文件保存到文件系统中,后来发现并不是这样。并且官网文档和源码描述的也不是很清楚,所以专门研究了一下这一块

set state.backend=filesystem;

从 Flink 1.13 版本开始,社区改进了 state backend 的公开类,进而帮助用户更好理解本地状态存储和 checkpoint 存储的区分。 这个变化并不会影响 state backend 和 checkpointing 过程的运行时实现和机制,仅仅是为了更好地传达设计意图。 用户可以将现有作业迁移到新的 API,同时不会损失原有 state。
旧版本的 MemoryStateBackend 等价于使用 HashMapStateBackend 和 JobManagerCheckpointStorage。

新版本的有两个参数state.backendstate.checkpoint-storage

state.backend可选参数:hashmap、roksdb,另外也支持filesystem(弃用)和jobmanager(弃用),官方文档并没有说明filesystem和jobmanager已经弃用

只设置state.backend:

state.backendCheckpoint StorageState Backend
默认JobManagerCheckpointStorageHashMapStateBackend
hashmapJobManagerCheckpointStorageHashMapStateBackend
filesystem(弃用)JobManagerCheckpointStorageHashMapStateBackend
roksdbJobManagerCheckpointStorageEmbeddedRocksDBStateBackend
jobmanager (弃用)MemoryStateBackend(弃用)MemoryStateBackend (弃用)

总结:对于State Backend,只有HashMapStateBackend和EmbeddedRocksDBStateBackend,另外还有一个弃用的MemoryStateBackend

state.checkpoint-storage可选参数:jobmanager、filesystem,当设置了state.checkpoints.dir,flink会自动使用filesystem对应的FileSystemCheckpointStorage

只设置state.checkpoint-storage:

state.checkpoint-storageCheckpoint StorageState Backend
默认JobManagerCheckpointStorageHashMapStateBackend
jobmanagerJobManagerCheckpointStorageHashMapStateBackend
filesystemFileSystemCheckpointStorageHashMapStateBackend
设置state.checkpoints.dirFileSystemCheckpointStorageHashMapStateBackend
总结:对于Checkpoint Storage只有JobManagerCheckpointStorage和FileSystemCheckpointStorage
另外,当设置state.checkpoint-storage=filesystem时,必须同时设置state.checkpoints.dir,否则会有异常:
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot create the file system state backend: The configuration does not specify the checkpoint directory 'state.checkpoints.dir'

其实可以不设置state.checkpoint-storage,当设置了state.checkpoints.dir时Checkpoint Storage 自动使用FileSystemCheckpointStorage,不设置的话就使用默认的JobManagerCheckpointStorage

一开始对于默认的JobManagerCheckpointStorage、HashMapStateBackend不是很理解,不明白这样的checkpoint有啥用,因为是保存到内存中,不是保存到文件系统中,所以任务挂掉后就没办法恢复。
后来发现这种默认保存在内存中的checkpoint可以用于flink作业失败时自动恢复,而不是任务挂掉后手动恢复,另外默认情况下,程序取消时也不保存checkpoint

其他总结

  • 对于flink sql读取mysql,设置checkpoint恢复不生效(不是flink cdc)
  • checkpoint 一个时间间隔内只有一个批次,这样才能保证eos,时间间隔大小影响写入性能
  • 对于kafka2hudi的场景,checkpoint时间间隔如果比较小(1s),会因为时间不够导致第一个批次卡住,等超时(默认10分钟)后才会报错,所以需要间隔时间设置大一点,10s以上即可
  • 默认情况,只有全部任务running才会生成checkpoint,可以通过参数修改:execution.checkpointing.checkpoints-after-tasks-finish.enabled=true

pipeline.operator-chaining

set pipeline.operator-chaining=false;

将该参数设置为false,实现将多个算子拆分,利于观察每个任务的运行情况。对于上面说的kafka2hudi的场景,本来只是为了观察任务卡住的原因,但是发现设置了该参数后,任务不卡了
原因是虽然官方文档说的是将该参数设置为false后,会影响性能,但是我测试的kafka2hudi的场景反而提升了性能~,所以不卡了(不增加checkpoint时间间隔的情况)
下面是我测试的结果,总数据量:1000万,checkpoint间隔:10s

pipeline.operator-chaining第一个批次的数据量第二个批次的数据量第三个批次的数据量总用时
false7742701409025155219566s
true838610896459124514279s

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/384969.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

IDEA 断点总是进入class文件没有进入源文件解决

前言 idea 断点总是进入class文件没有进入源文件解决 问题 在源文件里打了断点,断点模式启动时却进入了class文件里的断点,而没有进入到java源文件里的断点。 比如:我在 A.java 里打了断点,调试时却进入到了 jar 包里的 A.clas…

基于ensp的小型局域网网络搭建及需求分析

一 需求分析本实验的目的在于建立小型局域网。由于公司由财政部、人事部、科技部三个部门组成,分布在同一个交换机下。设计以下网络:三个个部门使用两台交换机连接,然后连接到汇聚交换机,再通过路由器与外网以及其他部门网络相连。…

java TCP Socket 数据传输,服务端与客户端

java TCP Socket 数据传输,服务端与客户端 1. socket通信基本原理 socket 通信是基于TCP/IP 网络层上的一种传送方式,我们通常把TCP和UDP称为传输层。 如上图,在七个层级关系中,我们讲的socket属于传输层,其中UDP是…

单位冲激函数与单位阶跃函数

目录 单位阶跃函数 单位冲激函数 狄拉克定义: 冲激函数的性质 对时间的积分等于单位阶跃函数 筛分性质 尺度特性 冲击偶的定义:单位冲击函数的导数 各阶导数 离散的阶跃信号与冲激信号 (1)阶跃信号,其定义为…

vi编辑器操作指令分享

vi编辑器是所有Unix及Linux系统下标准的编辑器,它的强大不逊色于任何最新的文本编辑器,这里只是简单地介绍一下它的用法和一小部分指令。由于对Unix及Linux系统的任何版本,vi编辑器是完全相同的,因此您可以在其他任何介绍vi的地方…

国内vs国外:外贸建站该如何选择?

外贸建站找国内还是国外? 答案是:国内。 随着互联网的发展,越来越多的企业开始意识到在网络上进行商业活动的重要性。 其中,建立一个专业的外贸网站是企业在国际市场上拓展业务的关键。 然而,对于选择国内还是国外…

电脑出问题了怎么重装系统修好

电脑在使用过程中经常会出现各种各样的问题,如系统崩溃、蓝屏、病毒感染等。这些问题如果不能及时得到解决,将会给用户带来很多麻烦和损失。小白一键重装系统是一个功能强大的工具,可以帮助用户快速解决电脑常见问题。下面我们就来详细介绍如…

Vulnhub靶场----8、DC-8

文章目录一、环境搭建二、渗透流程三、思路总结一、环境搭建 DC-8下载地址:https://download.vulnhub.com/dc/DC-8.zip kali:192.168.144.148 DC-8:192.168.144.156 二、渗透流程 1、信息收集nmap -T5 -A -p- -sV -sT 192.168.144.156思路&am…

每日分享(四合一即时通讯聊天源码APP群聊、私聊、朋友圈)

demo软件园每日更新资源,请看到最后就能获取你想要的: 1.Python整洁编程 完整版PDF Python 与其他语言的不同之处在于,它是一种简单而有深度的语言。因为简单,所以谨慎编写代码要重要得多,尤其是在大项目中,因为代码很容易变得复…

ros2创建一个工程

第一步:创建src目录 $ mkdir ros2-demo $ cd ros2-demo/ $ mkdir src $ cd src/第二步:创建功能包cd src$ ros2 pkg create --build-type ament_cmake ros2_demo --dependencies rclcpp std_msgsros2 pkg create --build-type ament_python learning_pkg…

打地鼠游戏-第14届蓝桥杯STEMA测评Scratch真题精选

[导读]:超平老师的《Scratch蓝桥杯真题解析100讲》已经全部完成,后续会不定期解读蓝桥杯真题,这是Scratch蓝桥杯真题解析第102讲。 蓝桥杯选拔赛现已更名为STEMA,即STEM 能力测试,是蓝桥杯大赛组委会与美国普林斯顿多…

外贸建站多少钱?不同预算对应的建站方案!

外贸建站多少钱? 答案是:3000左右。 作为一个外贸企业的经营者,我们深知一个优质的外贸网站对于企业的重要性。 然而,建立一个优质的外贸网站需要耗费大量的时间和资金,因此我们需要在预算有限的情况下,…

代码随想录算法训练营第三天| 203. 移除链表、707. 设计链表、206.反转链表

977 有序数组的平方题目链接:203 移除链表元素介绍给你一个链表的头节点 head 和一个整数 val ,请你删除链表中所有满足 Node.val val 的节点,并返回 新的头节点 。思路一如果要被删除的元素是非头节点,只需要找到前一个节点&…

MySQL之幻读问题

MySQL之幻读问题 导读 在进入今天的主题之前必须先了解事务的四大特性ACID、MVCC、事务隔离级别(具体的自行查询),其中I(Isolation)隔离性所产生的问题涉及到的事务隔离分为不同级别,包括读未提交&#x…

python+pytest接口自动化(7)-cookie绕过登录(保持登录状态)

在编写接口自动化测试用例或其他脚本的过程中,经常会遇到需要绕过用户名/密码或验证码登录,去请求接口的情况,一是因为有时验证码会比较复杂,比如有些图形验证码,难以通过接口的方式去处理;再者&#xff0c…

Java多线程——线程安全、synchronized、volatile关键字以及多线程案例

文章目录前言一、线程安全—多线程不可避免的风险!1、线程不安全的示例2、线程不安全的原因二、synchronized关键字1.synchronized的特性1)互斥2)刷新内存3)可重入2、synchronized使用示例3、Java标准库中的线程安全类三、volatil…

【巨人的肩膀】JAVA面试总结(四)

💪、JVM 目录💪、JVM1、说一下JVM的主要组成部分及其作用2、什么是JVM内存结构(谈谈对运行时数据区的理解)3、堆和栈的区别是什么4、堆中存什么?栈中存什么?5、为什么不把基本类型放堆中呢?6、为…

理论上BI软件适配任何行业,但为什么有些行业做不了?

BI商业智能是一种通用的数据类技术解决方案。理论上来说,BI商业智能的核心是数据。只要企业有数据累积,就可以在BI软件上展开一系列的数据开发,获取决策所需的数据依据。但在现实中,却会发现有些BI软件对特定的行业束手无策&#…

【站外SEO】如何利用外部链接来提高你的网站排名

随着互联网的快速发展,越来越多的企业开始注重SEO优化,以提升自己的网站排名,增加流量和曝光度。 而站外SEO作为SEO的重要组成部分,对于提升网站排名具有不可忽视的作用。 站外SEO主要是通过外部链接来提高网站的排名。而GPB外链…

4EVERLAND 的 IPFS Pinning 服务:4EVER Pin

我们很高兴地宣布 4EVERLAND Storage 的一个令人兴奋的补充,即 4EVER Pin。什么是 4EVER Pin?您可能已经知道星际文件系统或IPFS是一个分布式存储网络,来自世界各地的计算机组成节点共享数据。通常,在IPFS中获取一条数据时&#x…