一、状态是什么
- 由一个任务维护,并且用来计算某个结果的所有数据,都属于这个任务的状态
- 可以认为状态就是一个本地变量,可以被任务的业务逻辑访问
- Flink 会进行状态管理,包括状态一致性、故障处理以及高效存储和访问,以 便开发人员可以专注于应用程序的逻辑
二、三种状态
-
**KeyedState:**根据数据流中定义的 key 来维护和访问,有如下的数据结构:
ValueState<T>
: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过update(T)
进行更新,通过T value()
进行检索。ListState<T>
: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过add(T)
或者addAll(List<T>)
进行添加元素,通过Iterable<T> get()
获得整个列表。还可以通过update(List<T>)
覆盖当前的列表。ReducingState<T>
: 保存一个单值,表示添加到状态的所有值的聚合。接口与ListState
类似,但使用add(T)
增加元素,会使用提供的ReduceFunction
进行聚合。AggregatingState<IN, OUT>
: 保留一个单值,表示添加到状态的所有值的聚合。和ReducingState
相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与ListState
类似,但使用add(IN)
添加的元素会用指定的AggregateFunction
进行聚合。MapState<UK, UV>
: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用put(UK,UV)
或者putAll(Map<UK,UV>)
添加映射。 使用get(UK)
检索特定 key。 使用entries()
,keys()
和values()
分别检索映射、键和值的可迭代视图。你还可以通过isEmpty()
来判断是否包含任何键值对。
-
**OperateState:**算子状态的作用范围限定为算子任务,只有一种数据结构:ListState
在大部分的 Flink 程序中不需要 Operator State,它一般用于实现一个有状态的 source 或者 sink,并且数据无法按照某一个 key 分区的时候。
通过实现 CheckPointedFunction 接口来使用 operator state,需要实现下面两个方法:
void snapshotState(FunctionSnapshotContext context) throws Exception; void initializeState(FunctionInitializationContext context) throws Exception;
在进行 checkpoint 的时候会调用 snapshotState 方法。initializeState 包括了第一次自定义函数初始化和从之前的 checkpoint 恢复,因此,initializeState 中不仅要定义初始化的逻辑,还要定义状态恢复的逻辑。
-
**BroadcastState:**广播状态是一种特殊类型的 OperatorState,它用于当下流任务都需要同一份上流的 state 的情形。broadcastState 可以支持 MapState。
在使用 broadcast 的时候,需要用非广播流来 connect 广播流,如下:
DataStream<String> output = colorPartitionedStream .connect(ruleBroadcastStream) .process( // KeyedBroadcastProcessFunction 中的类型参数表示: // 1. key stream 中的 key 类型 // 2. 非广播流中的元素类型 // 3. 广播流中的元素类型 // 4. 结果的类型,在这里是 string new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() { // 模式匹配逻辑 } );
在 KeyedBroadcastProcessFunction 中,需要重写两个方法:
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> { public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception; public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception; public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception; }
使用 broadcast 需要注意:
- 没有跨 task 通讯:如上所述,这就是为什么只有在
(Keyed)-BroadcastProcessFunction
中处理广播流元素的方法里可以更改 broadcast state 的内容。 同时,用户需要保证所有 task 对于 broadcast state 的处理方式是一致的,否则会造成不同 task 读取 broadcast state 时内容不一致的情况,最终导致结果不一致。 - **broadcast state 在不同的 task 的事件顺序可能是不同的:**虽然广播流中元素的过程能够保证所有的下游 task 全部能够收到,但在不同 task 中元素的到达顺序可能不同。 所以 broadcast state 的更新不能依赖于流中元素到达的顺序。
- 所有的 task 均会对 broadcast state 进行 checkpoint:虽然所有 task 中的 broadcast state 是一致的,但当 checkpoint 来临时所有 task 均会对 broadcast state 做 checkpoint。 这个设计是为了防止在作业恢复后读文件造成的文件热点。当然这种方式会造成 checkpoint 一定程度的写放大,放大倍数为 p(=并行度)。Flink 会保证在恢复状态/改变并发的时候数据没有重复且没有缺失。 在作业恢复时,如果与之前具有相同或更小的并发度,所有的 task 读取之前已经 checkpoint 过的 state。在增大并发的情况下,task 会读取本身的 state,多出来的并发(
p_new
-p_old
)会使用轮询调度算法读取之前 task 的 state。 - 不使用 RocksDB state backend: broadcast state 在运行时保存在内存中,需要保证内存充足。这一特性同样适用于所有其他 Operator State。
- 没有跨 task 通讯:如上所述,这就是为什么只有在
三、使用
-
KeyedState
import org.apache.flink.api.common.functions.RichFlatMapFunction import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor, ReducingState, ReducingStateDescriptor, ValueState, ValueStateDescriptor} import org.apache.flink.api.scala.{createTypeInformation, getCallLocationName} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.util.Collector object KeyedStateTest extends App { val env = StreamExecutionEnvironment.getExecutionEnvironment val source = env.fromCollection(List( (1L, 3L), (1L, 5L), (1L, 7L), (1L, 4L), (1L, 2L), (1L, 4L) )).keyBy(_._1) .flatMap(new CountWindowAverage()) .print() // the printed output will be (1,4), (1,5), (1,3) env.execute("ExampleKeyedState") } //创建一个类继承自富函数 class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] { //申明状态,在open方法中初始化 private var sum: ValueState[(Long, Long)] = _ override def flatMap(value: (Long, Long), out: Collector[(Long, Long)]): Unit = { val tmpCurrentSum = sum.value() val currentSum = if(tmpCurrentSum != null){ tmpCurrentSum } else { (0L, 0L) } val newSum = (currentSum._1 + 1, currentSum._2 + value._2) sum.update(newSum) if(newSum._1 >= 2) { out.collect((value._1, newSum._2 / newSum._1)) sum.clear() } } //初始化状态 override def open(parameters: Configuration): Unit = { sum = getRuntimeContext.getState( new ValueStateDescriptor[(Long, Long)]("average", classOf[(Long, Long)]) ) } }
-
OperateState
import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueStateDescriptor} import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation} import org.apache.flink.api.scala.createTypeInformation import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import scala.collection.mutable.ListBuffer object OperateStateTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val source = env.socketTextStream("localhost", 9999) val stateStream = source.map( new MyMapState(2) ) stateStream.print env.execute() } } //创建OperateState要实现CheckpointedFunction接口,并且状态必须是序列化的 class MyMapState(threshold: Int = 1) extends RichMapFunction[String, String] with CheckpointedFunction { //申明状态,并申明为可序列化的 @transient private var checkpointedState: ListState[String] = _ private val bufferedElements = ListBuffer[String]() override def map(value: String): String = { bufferedElements += value if(bufferedElements.size == threshold) { val ret = new StringBuilder() for(element <- bufferedElements) { ret.append(element) } bufferedElements.clear() ret.toString() } else { "not reach threshold" } } //初始化状态 override def initializeState(context: FunctionInitializationContext): Unit = { val descriptor = new ListStateDescriptor[String]( "buffered-elements", TypeInformation.of(new TypeHint[String]() {}) ) checkpointedState = context.getOperatorStateStore.getListState(descriptor) //如果使用下面这种方式,创建的是KeyedState //chechkpointedState = getRuntimeContext.getListState(new ListStateDescriptor[String]("listState", classOf[String])) } override def snapshotState(context: FunctionSnapshotContext): Unit = { checkpointedState.clear() for (elem <- bufferedElements) { checkpointedState.add(elem) } } }