文章目录
- 一 Flink中的状态管理
- 1 有状态的算子和应用程序
- (1)算子状态(operator state)
- (2)键控状态(keyed state)
- 2 状态后端
- 3 选择一个状态后端
- 二 Flink中的容错机制
- 1 一致性检查点
- (1)程序执行
- (2)从检查点恢复状态
- (3)重启应用
- (4)从检查点恢复状态
- (5)重新消费
一 Flink中的状态管理
流式计算分为无状态和有状态两种情况。无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收温度读数,并在温度超过90度时发出警告。有状态的计算则会基于多个事件输出结果。以下是一些例子。
- 所有类型的窗口。例如,计算过去一小时的平均温度,就是有状态的计算。
- 所有用于复杂事件处理的状态机。例如,若在一分钟内收到两个相差20度以上的温度读数,则发出警告,这是有状态的计算。
- 流与流之间的所有关联操作,以及流与静态表或动态表之间的关联操作,都是有状态的计算。
下图展示了无状态流处理和有状态流处理的主要区别。无状态流处理分别接收每条数据记录(图中的黑条),然后根据最新输入的数据生成输出数据(白条)。有状态流处理会维护状态(根据每条输入记录进行更新),并基于最新输入的记录和当前的状态值生成输出记录(灰条)。
上图中输入数据由黑条表示。无状态流处理每次只转换一条输入记录,并且仅根据最新的输入记录输出结果(白条)。有状态 流处理维护所有已处理记录的状态值,并根据每条新输入的记录更新状态,因此输出记录(灰条)反映的是综合考虑多个事件之后的结果。
尽管无状态的计算很重要,但是流处理对有状态的计算更感兴趣。事实上,正确地实现有状态的计算比实现无状态的计算难得多。旧的流处理系统并不支持有状态的计算,而新一代的流处理系统则将状态及其正确性视为重中之重。
Flink中的状态
- 由一个任务维护,并且用来计算某个结果的所有数据,都属于这个任务的状态。
- 可以认为状态就是一个本地变量,可以被任务的业务逻辑访问。
- Flink 会进行状态管理,包括状态一致性、故障处理以及高效存储和访问,以便开发人员可以专注于应用程序的逻辑。
- 在 Flink 中,状态始终与特定算子相关联。
- 为了使运行时的 Flink 了解算子的状态,算子需要预先注册其状态。
1 有状态的算子和应用程序
Flink内置的很多算子,数据源source,数据存储sink都是有状态的,流中的数据都是buffer records,会保存一定的元素或者元数据。例如: ProcessWindowFunction会缓存输入流的数据,ProcessFunction会保存设置的定时器信息等等。
在Flink中,状态始终与特定算子相关联。总的来说,有两种类型的状态:
- 算子状态(operator state):可见范围为当前任务槽(一个并行任务)。
- 键控状态(keyed state):可将范围为当前key。
(1)算子状态(operator state)
算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问。
Flink为算子状态提供三种基本数据结构:
- 列表状态(List state):将状态表示为一组数据的列表。
- 联合列表状态(Union list state):也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。
- 广播状态(Broadcast state):如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。
(2)键控状态(keyed state)
键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。Keyed State很类似于一个分布式的key-value map数据结构,只能用于KeyedStream(keyBy算子处理之后)。
Flink的Keyed State支持以下数据类型:
- 值状态:ValueState[T]保存单个的值,值的类型为T。
- get操作: ValueState.value()
- set操作: ValueState.update(value: T)
- 列表状态:ListState[T]保存一个列表,列表里的元素的数据类型为T。基本操作如下:
- ListState.add(value: T)
- ListState.addAll(values: java.util.List[T])
- ListState.get()返回Iterable[T]
- ListState.update(values: java.util.List[T])
- 字典状态:MapState[K, V]保存Key-Value对。
- MapState.get(key: K)
- MapState.put(key: K, value: V)
- MapState.contains(key: K)
- MapState.remove(key: K)
State.clear()是清空操作,减少内存、HDFS、状态后端的存储压力。
2 状态后端
- 每传入一条数据,有状态的算子任务都会读取和更新状态。
- 由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务都会在本地维护其状态,以确保快速的状态访问。
- 状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(state backend)。
- 状态后端主要负责两件事:本地的状态管理(当前任务机器上面的JVM堆,其实就是内存),以及将检查点(checkpoint)状态写入远程存储(HDFS、RocksDB 之类的)。
3 选择一个状态后端
-
MemoryStateBackend
内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在TaskManager的JVM堆上;而将checkpoint存储在JobManager的内存中。
特点:快速、低延迟,但不稳定。
-
FsStateBackend
将checkpoint存到远程的持久化文件系统(FileSystem)上。而对于本地状态,跟MemoryStateBackend一样,也会存在TaskManager的JVM堆上。
同时拥有内存级的本地访问速度,和更好的容错保证。
-
RocksDBStateBackend
将所有状态序列化后,存入本地的RocksDB中存储。
RocksDB 是一个硬盘 KV 数据库,LevelDB,RocketDB。
自己设定一个检查点:
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 第二个参数设置为异步保存还是同步保存
env.setStateBackend(new FsStateBackend("file:\\E:\\develop\\MyWork\\flink2022tutorial\\src\\main\\resources\\checkpoint",false));
// 间隔10s保存一次检查点,默认只保存最近一次的检查点
// 因为使用最近一次的检查点就可以恢复程序
env.enableCheckpointing(10 * 1000L);
env
.addSource(new Example1.ClickSource())
.print();
env.execute();
}
二 Flink中的容错机制
1 一致性检查点
Flink 故障恢复机制的核心,就是应用状态的一致性检查点。
有状态流应用的一致检查点,其实就是所有任务的状态,在某个时间点的一份拷贝(一份快照);这个时间点,应该是所有任务都恰好处理完一个相同的输入数据(其实就是检查点屏障)的时候。
(1)程序执行
例子:分别求1-5中奇偶数的和。
env
.fromElements(1,2,3,4,5)
.keyBy(r -> r % 2)
.sum(0)
.print();
下图的上游是一个kafka消息队列,偏移量的值就是元素的值。对应的一致性检查点如下图:
(2)从检查点恢复状态
如果消费完7,在向奇数相加的任务槽传递过程中,机器出现了问题,如下图。
在执行流应用程序期间,Flink 会定期保存状态的一致检查点,如果发生故障,Flink 将会使用最近的检查点来一致恢复应用程序的状态,并重新启动处理流程。
(3)重启应用
遇到故障之后,第一步就是重启应用,重启应用后,Flink会清空其内部的所有状态。
(4)从检查点恢复状态
第二步是从 checkpoint 中读取状态,根据状态变量中状态描述符的定义字符串的名字恢复,将状态重置。
从检查点重新启动应用程序后,其内部状态与检查点完成时的状态完全相同。
此时注意从上游已经消费完7,下一个数据消费8,但恢复完之后,又变为原始状态,所以这就需要上游的存储设备是一个可重置读取偏移量的持久化设备,可以回退读取位置。
(5)重新消费
第三步:开始消费并处理检查点到发生故障之间的所有数据。
这种检查点的保存和恢复机制可以为应用程序内部状态提供“精确一次”(exactly-once,每一个数据只会被计算一次,遇到回退现象,需要将加上的数字再减去)的一致性,因为所有算子都会保存检查点并恢复其所有状态,这样一来所有的输入流就都会被重置到检查点完成时的位置。