目录
一、Checkpoint 剖析
State 与 Checkpoint 概念区分
设置 Checkpoint 实战
执行代码所需的服务与遇到的问题
二、重启策略解读
重启策略意义
代码示例与效果展示
三、SavePoint
与 Checkpoint 异同
操作步骤详解
四、总结
在大数据流式处理领域,Apache Flink 凭借其卓越的性能和强大的功能占据重要地位。而理解 Flink 中的 Checkpoint(检查点)、重启策略以及 SavePoint(保存点)这些关键概念,对于保障流处理任务的稳定性、容错性以及可维护性至关重要。本文将深入剖析它们的原理、用法,并结合实际代码示例展示其效果,希望能帮助大家更好地掌握 Flink 相关知识。
一、Checkpoint 剖析
State 与 Checkpoint 概念区分
State(状态)
在 Flink 中,State 代表某一个 Operator(算子)在某一时刻的状态,像常见的聚合算子 maxBy
、sum
等操作过程中就会维护状态信息。比如在对数据流按某个字段做 sum
聚合时,它需要记住历史数据以便持续累加计算,并且这些状态数据默认存于内存之中,为算子的持续、准确运行提供依据。
Checkpoint(检查点 / 快照点)
它是 Flink 中所有有状态的 Operator 在某一个特定时刻的 State 快照信息汇总,也就是 State 的存档记录。可以简单理解为对整个作业运行时状态拍了一张 “照片”,定格所有相关算子彼时的状态,方便后续在故障恢复等场景使用。
设置 Checkpoint 实战
以下是一段设置 Checkpoint 的 Flink Java 代码示例:
package com.bigdata.day06;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class _01CheckPointDemo {
public static void main(String[] args) throws Exception {
//1. env-准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 在windows运行,将数据提交hdfs,会出现权限问题,使用这个语句解决。
System.setProperty("HADOOP_USER_NAME", "root");
// 在这个基础之上,添加快照
// 第一句:开启快照,每隔1s保存一次快照
env.enableCheckpointing(1000);
// 第二句:设置快照保存的位置
env.setStateBackend(new FsStateBackend("hdfs://bigdata01:9820/flink/checkpoint"));
// 第三句: 通过webui的cancel按钮,取消flink的job时,不删除HDFS的checkpoint目录
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//2. source-加载数据
DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
String[] arr = s.split(",");
return Tuple2.of(arr[0], Integer.valueOf(arr[1]));
}
});
//3. transformation-数据处理转换
SingleOutputStreamOperator<Tuple2<String, Integer>> result = mapStream.keyBy(0).sum(1);
result.print();
//4. sink-数据输出
//5. execute-执行
env.execute();
}
}
执行代码所需的服务与遇到的问题
启动本地的nc, 启动hdfs服务。
启动代码,发现有权限问题:
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=admin, access=WRITE, inode="/flink":root:supergroup:drwxr-xr-x
解决方案:
System.setProperty("HADOOP_USER_NAME", "root");
在设置检查点之前,设置一句这样带权限的语句,如果是集群运行中,不存在该问题。可以不设置!!!
查看快照情况:
运行,刷新查看checkpoint保存的数据,它会先生成一个新的文件夹,然后再删除老的文件夹,在某一时刻,会出现两个文件夹同时存在的情况。
启动HDFS、Flink
start-dfs.sh
start-cluster.sh
数据是保存了,但是并没有起作用,想起作用需要在集群上运行,以下演示集群上的效果:
第一次运行的时候
在本地先clean, 再package ,再Wagon一下:
flink run -c 全类名 /opt/app/flink-test-1.0-SNAPSHOT.jar
flink run -c com.bigdata.day06._01CheckPointDemo /opt/app/FlinkDemo-1.0-SNAPSHOT.jar
记得,先启动nc ,再启动任务,否则报错!
通过nc -lk 9999 输入以下内容:
想查看运行结果,可以通过使用的slot数量判断一下:
取消flink job的运行
查看一下这次的单词统计到哪个数字了:
第二次运行的时候
flink run -c 全类名 -s hdfs://hadoop10:8020/flink-checkpoint/293395ef7e496bda2eddd153a18d5212/chk-34 /opt/app/flink-test-1.0-SNAPSHOT.jar
启动:
flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink/checkpoint/bf416df7225b264fc34f8ff7e3746efe/chk-603 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar
-s 指定从checkpoint目录恢复状态数据 ,注意每个人都不一样
从上一次离开时,截止的checkpoint目录
观察数据:输入一个hello,1 得到新的结果hello,8
二、重启策略解读
重启策略意义
流式数据如同永不干涸的河流持续流淌,一旦因某条错误数据致使程序异常退出,后续海量数据丢失风险极高,对企业而言,这意味着数据资产受损、业务分析结果偏差等严重后果,重启策略应运而生。它作为独立策略,与 Checkpoint 虽无必然绑定关系(即便没配置 Checkpoint 也能单独配置重启策略),却在保障程序持续运行层面协同发挥关键作用。
一个流在运行过程中,假如出现了程序异常问题,可以进行重启,比如,在代码中人为添加一些异常:
进行wordcount时,输入了一个bug,1 人为触发异常。
注意:此时如果有checkpoint ,是不会出现异常的,需要将checkpoint的代码关闭,再重启程序。会发现打印了异常,那为什么checkpoint的时候不打印,因为并没有log4j的配置文件,需要搞一个这样的配置文件才行。
程序中添加log4j.properties的代码:
# Global logging configuration
# Debug info warn error
log4j.rootLogger=debug, stdout
# MyBatis logging configuration...
log4j.logger.org.mybatis.example.BlogMapper=TRACE
# Console output...
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n
开启检查点之后,报错了程序还在运行是因为开启检查点之后,程序会进行自动重启(无限重启【程序错了才重启】)。
//开启checkpoint,默认是无限重启,可以设置为不重启
//env.setRestartStrategy(RestartStrategies.noRestart());
//重启3次,重启时间间隔是10s
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
//2分钟内重启3次,重启时间间隔是5s
env.setRestartStrategy(
RestartStrategies.failureRateRestart(3,
Time.of(2,TimeUnit.MINUTES),
Time.of(5,TimeUnit.SECONDS))
);
env.execute("checkpoint自动重启"); //最后一句execute可以设置jobName,显示在8081界面
程序如果上传至服务器端运行,可以看到重启状态
代码示例与效果展示
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.concurrent.TimeUnit;
public class Demo02 {
public static void main(String[] args) throws Exception {
//1. env-准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 代码中不能有checkpoint,不是说checkpoint不好,而是太好了,它已经自带重试机制了。而且是无限重启的
// 通过如下方式可将重试机制关掉
// env.setRestartStrategy(RestartStrategies.noRestart());
//
// 两种办法
// 第一种办法:重试3次,每一次间隔10S
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
// 第二种写法:在2分钟内,重启3次,每次间隔10s
env.setRestartStrategy(
RestartStrategies.failureRateRestart(3,
Time.of(2,TimeUnit.MINUTES),
Time.of(5,TimeUnit.SECONDS))
);
//2. source-加载数据
DataStreamSource<String> streamSource = env.socketTextStream("bigdata01", 8899);
streamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
String[] arr = value.split(",");
String word = arr[0];
if(word.equals("bug")){
throw new Exception("有异常,服务会挂掉.....");
}
// 将一个字符串变为int类型
int num = Integer.valueOf(arr[1]);
// 第二种将字符串变为数字的方法
System.out.println(Integer.parseInt(arr[1]));
Tuple2<String, Integer> tuple2 = new Tuple2<>(word,num);
// 还有什么方法? 第二种创建tuple的方法
Tuple2<String, Integer> tuple2_2 = Tuple2.of(word,num);
return tuple2;
}
}).keyBy(tuple->tuple.f0).sum(1).print();
//3. transformation-数据处理转换
//4. sink-数据输出
//5. execute-执行
env.execute();
}
}
在此代码中人为在 map
函数里设置异常触发点(输入包含 “bug” 的数据时抛出异常)。若开启 Checkpoint,因它自带重试机制(默认无限重启),异常可能被掩盖,需关闭 Checkpoint 相关代码才能看到异常打印情况。同时,要完整看到重启策略效果(如按设定的次数、间隔重启),需打包代码上传至集群运行,本地测试难以呈现完整现象,且提交时务必确认使用的类名准确无误。
三、SavePoint
与 Checkpoint 异同
相同点
本质都是对 Flink 作业状态的一种保存方式,以便后续恢复作业时复用状态,保障数据处理连贯性。
不同点
Checkpoint 是 Flink 自动按设定规则周期性完成 State 快照保存,旨在应对故障自动恢复场景;而 SavePoint 是手动触发的快照操作,提供更灵活的作业状态管理时机,比如在版本升级、业务规则调整需暂停并后续重启作业场景发挥优势。
操作步骤详解
提交作业并输入数据
提交含重启策略代码打包成的 jar
包运行作业(类似 flink run -c 全类名 /opt/app/flink-test-1.0-SNAPSHOT.jar
),输入数据观察单词对应数字变化。
执行 SavePoint 操作
以下是 --> 停止flink job,并且触发savepoint操作
flink stop --savepointPath hdfs://bigdata01:9820/flink-savepoint 152e493da9cdeb327f6cbbad5a7f8e41
后面的序号为Job 的ID
以下是 --> 不会停止flink的job,只是完成savepoint操作
flink savepoint 79f53c5c0bb3563b6b6ed3011176c411 hdfs://bigdata01:9820/flink-savepoint
备注:如何正确停止一个 flink 的任务
flink stop 6a27b580aa5c6b57766ae6241d9270ce(任务编号)
查看与重启作业
查看最近完成作业对应的 SavePoint,之后依据之前保存路径重启作业(flink run -c 全类名 -s hdfs://hadoop10:8020/flink-savepoint/savepoint-79f53c-64b5d94771eb /opt/app/flink-test-1.0-SNAPSHOT.jar
),再次输入数据可看到基于之前状态的累加效果。
此外,在集群运行 Flink 程序时,默认并行度常为 1,它不会按照机器的CPU核数,而是按照配置文件中的一个默认值运行的。比如:flink-conf.yaml
web-ui 界面提交作业:
这个图形化界面,跟我们使用如下命令是一个效果:
flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink/checkpoint/bf416df7225b264fc34f8ff7e3746efe/chk-603 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar
四、总结
通过对 Flink 中 Checkpoint、重启策略和 SavePoint 的详细解读与代码实践展示,我们明晰它们各自在保障流处理任务稳定、容错与灵活运维层面的独特价值。合理运用这些机制,能助我们打造更健壮、高效的 Flink 大数据处理应用,从容应对复杂多变的业务需求与运行环境挑战,后续大家可在实际项目中深入实践优化,挖掘其更大潜力。