状态数据结构升级
a)概述
Flink 流应用通常被设计为永远或者长时间运行,与所有长期运行的服务一样,应用程序需要随着业务的迭代而进行调整,应用所处理的数据 schema 也会随着进行变化。
升级状态类型的数据 schema ,对不同类型的状态结构(ValueState
、ListState
等)有不同的限制;只适用于 Flink 自己生成的状态序列化器;即在声明状态时,状态描述符不可以配置为使用特定的 TypeSerializer 或 TypeInformation , 此时 Flink 会推断状态类型的信息:
ListStateDescriptor<MyPojoType> descriptor =
new ListStateDescriptor<>(
"state-name",
MyPojoType.class);
checkpointedState = getRuntimeContext().getListState(descriptor);
在内部,状态是否可以进行升级取决于用于读写持久化状态字节的序列化器,状态数据结构只有在其序列化器正确支持时才能升级;这一过程是被 Flink 的类型序列化框架生成的序列化器透明处理的。
b)升级状态数据结构
对状态类型升级,步骤如下:
- 对 Flink 流作业进行 savepoint 操作。
- 升级程序中的状态类型(例如:修改 Avro 的结构)。
- 从 savepoint 恢复作业。当第一次访问状态数据时,Flink 会判断状态数据 schema 是否已经改变,并进行必要的迁移。
适应状态结构的改变而进行的状态迁移过程是自动发生的,并且状态之间是互相独立的。
Flink 内部首先会检查新的序列化器相对比之前的序列化器是否有不同的状态结构;如果有, 那么之前的序列化器用来读取状态数据字节到对象,然后使用新的序列化器将对象回写为字节。
c)数据结构升级支持的数据类型
目前,仅支持 POJO 和 Avro 类型的 schema 升级。
POJO 类型
Flink 基于下面的规则来支持 POJO 类型结构的升级:
- 可以删除字段。一旦删除,被删除字段的前值将会在将来的 checkpoints 以及 savepoints 中删除。
- 可以添加字段。新字段会使用类型对应的默认值进行初始化。
- 不可以修改字段的声明类型。
- 不可以改变 POJO 类型的类名,包括类的命名空间。
注意:只有从 1.8.0 及以上版本的 Flink 生产的 savepoint 进行恢复时,POJO 类型的状态才可以进行升级;对 1.8.0 版本之前的 Flink 是没有办法进行 POJO 类型升级的。
Avro 类型
Flink 支持 Avro 状态类型的升级,只要数据结构的修改是被 Avro 的数据结构解析规则认为兼容的即可。
除非新的 Avro 数据 schema 生成的类无法被重定位或者使用了不同的命名空间,在作业恢复时状态数据会被认为是不兼容的。
d)Schema 迁移限制
Flink 的 Schema 迁移有一些限制,这些限制是确保正确性所必需的;对于需要绕过这些限制并理解它们在特定用例中是安全的用户,可以考虑使用自定义序列化程序或状态处理器 API。
不支持 key 的 schema 演变
无法迁移 key 的 schema,因为这可能导致不确定性行为;例如,如果一个POJO 被用作 key,并且一个字段被丢弃,那么可能会突然出现多个现在相同的单独键,Flink无法合并相应的值。
此外,RocksDB 状态后端依赖于二进制对象标识,而不是 hashCode 方法,对 key 的对象结构的任何更改都可能导致不确定性行为。
Kryo 不能用于 schema 演变
当使用 Kryo 时,框架不能验证是否进行了不兼容的更改;如果包含给定类型的数据结构通过 Kryo 进行序列化,那么所包含的类型就不能进行 schema 进化。
例如,如果一个 POJO 包含一个 List<SometherPojo>,那么该 List 及其内容是通过 Kryo 序列化的,SometherPojo 不支持模式演化。