Flink系列之:Generating Watermarks生成水印
- 一、水印策略简介
- 二、使用水印策略
- 三、处理闲置资源
- 四、水印对齐
- 五、编写水印生成器
- 六、编写周期性水印生成器
- 七、编写标点水印生成器
- 八、水印策略和 Kafka 连接器
- 九、Operators如何处理水印
- 十、已弃用的AssignerWithPeriodicWatermarks 和AssignerWithPunctuatedWatermarks
了解 Flink 提供的用于处理事件时间时间戳和水印的 API。
一、水印策略简介
为了使用事件时间,Flink 需要知道事件时间戳,这意味着流中的每个元素都需要分配其事件时间戳。这通常是通过使用 TimestampAssigner 从元素中的某些字段访问/提取时间戳来完成的。
时间戳分配与生成水印密切相关,水印告诉系统事件时间的进展情况。可以通过指定 WatermarkGenerator 来配置它。
Flink API 需要一个包含 TimestampAssigner 和 WatermarkGenerator 的 WatermarkStrategy。 WatermarkStrategy 上提供了许多现成的静态方法作为常用策略,但用户也可以在需要时构建自己的策略。
为了完整起见,以下是界面:
public interface WatermarkStrategy<T>
extends TimestampAssignerSupplier<T>,
WatermarkGeneratorSupplier<T>{
/**
* Instantiates a {@link TimestampAssigner} for assigning timestamps according to this
* strategy.
*/
@Override
TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
/**
* Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
*/
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
如前所述,通常不会自己实现此接口,而是使用 WatermarkStrategy 上的静态帮助器方法来实现常见的水印策略,或者将自定义 TimestampAssigner 与 WatermarkGenerator 捆绑在一起。例如,要使用有界无序水印和 lambda 函数作为时间戳分配器,您可以使用以下命令:
WatermarkStrategy
.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((event, timestamp) -> event.f0);
指定 TimestampAssigner 是可选的,在大多数情况下您实际上不想指定它。例如,当使用 Kafka 或 Kinesis 时,您将直接从 Kafka/Kinesis 记录中获取时间戳。
注意:时间戳和水印均指定为自 Java 纪元 1970-01-01T00:00:00Z 以来的毫秒数。
二、使用水印策略
Flink 应用程序中有两个地方可以使用 WatermarkStrategy:1)直接在源上,2)在非源操作之后。
第一个选项是更可取的,因为它允许来源利用有关水印逻辑中的分片/分区/分割的知识。然后,源通常可以更精细地跟踪水印,并且源生成的整体水印将更加准确。直接在源上指定 WatermarkStrategy 通常意味着您必须使用源特定接口/请参阅 Watermark Strategies 和 Kafka Connector 以了解它在 Kafka Connector 上的工作原理以及有关每个分区水印如何在其中工作的更多详细信息。
仅当您无法直接在源上设置策略时,才应使用第二个选项(在任意操作后设置 WatermarkStrategy):
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyEvent> stream = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter(), typeInfo);
DataStream<MyEvent> withTimestampsAndWatermarks = stream
.filter( event -> event.severity() == WARNING )
.assignTimestampsAndWatermarks(<watermark strategy>);
withTimestampsAndWatermarks
.keyBy( (event) -> event.getGroup() )
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
这段代码使用Apache Flink的DataStream API来执行一系列操作,将数据流从文件中读取并处理。
首先,使用StreamExecutionEnvironment.getExecutionEnvironment()
获取执行环境对象。StreamExecutionEnvironment
是Flink程序的入口点,用于配置和控制执行环境。
接下来,使用env.readFile()
方法从指定的文件路径中读取数据流。该方法接受多个参数,包括文件格式、文件路径、文件处理模式、文件处理间隔、文件路径过滤器和类型信息。这里的文件处理模式为PROCESS_CONTINUOUSLY
,表示持续处理文件中的数据。文件处理间隔为100毫秒,表示每隔100毫秒检查文件是否有新的数据可用。文件路径过滤器使用默认过滤器,可以自定义文件路径过滤逻辑。类型信息用于指定数据流中元素的类型。
然后,使用stream.filter()
方法过滤数据流中的元素,只保留severity
为WARNING
的事件。该方法接受一个lambda表达式作为参数,用于过滤数据流中的元素。
接着,使用assignTimestampsAndWatermarks()
方法为数据流中的元素分配事件时间戳和水印。该方法接受一个水印策略作为参数,用于生成水印。水印用于表示事件时间进展的估计,并用于触发时间窗口的计算。
然后,使用keyBy()
方法将数据流按照event.getGroup()
的值进行分组。该方法接受一个lambda表达式作为参数,用于指定分组的键。
接下来,使用window()
方法将分组后的数据流划分为时间窗口。这里使用了滚动的事件时间窗口,窗口大小为10秒。滚动窗口是一种固定大小的窗口,不会重叠。
然后,使用reduce()
方法对窗口中的元素进行聚合操作。该方法接受一个lambda表达式作为参数,用于指定如何将窗口中的元素聚合为一个结果。
最后,使用addSink()
方法将聚合结果写入指定的目标。该方法接受一个sink函数作为参数,用于指定如何处理聚合结果。在这里,...
表示需要根据实际需求替换为合适的sink函数。
综上所述,这段代码的作用是从文件中读取数据流,过滤出severity
为WARNING
的事件,为事件分配事件时间戳和水印,按照event.getGroup()
进行分组,对分组后的数据流进行滚动窗口计算,并将计算结果写入指定的目标。
以这种方式使用 WatermarkStrategy 获取流并生成带有时间戳元素和水印的新流。如果原始流已经具有时间戳和/或水印,则时间戳分配器将覆盖它们。
Python版本
env = StreamExecutionEnvironment.get_execution_environment()
# currently read_file is not supported in PyFlink
stream = env \
.read_text_file(my_file_path, charset) \
.map(lambda s: MyEvent.from_string(s))
with_timestamp_and_watermarks = stream \
.filter(lambda e: e.severity() == WARNING) \
.assign_timestamp_and_watermarks(<watermark strategy>)
with_timestamp_and_watermarks \
.key_by(lambda e: e.get_group()) \
.window(TumblingEventTimeWindows.of(Time.seconds(10))) \
.reduce(lambda a, b: a.add(b)) \
.add_sink(...)
三、处理闲置资源
如果输入分割/分区/分片之一在一段时间内不携带事件,这意味着 WatermarkGenerator 也不会获得任何作为水印基础的新信息。我们称之为空闲输入或空闲源。这是一个问题,因为某些分区可能仍然携带事件。在这种情况下,水印将被保留,因为它被计算为所有不同并行水印的最小值。
为了解决这个问题,您可以使用 WatermarkStrategy 来检测空闲状态并将输入标记为空闲。 WatermarkStrategy 为此提供了一个方便的助手:
WatermarkStrategy
.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withIdleness(Duration.ofMinutes(1));
四、水印对齐
在上一段中,我们讨论了分割/分区/分片或源空闲并且可能阻止增加水印的情况。另一方面,拆分/分区/分片或源可以非常快地处理记录,从而比其他源相对更快地增加其水印。这本身并不是问题。然而,对于使用水印来发出一些数据的下游运营商来说,这实际上可能会成为一个问题。
在这种情况下,与空闲源相反,此类下游运算符(如聚合上的窗口连接)的水印可以继续进行。然而,这样的运算符可能需要缓冲来自快速输入的过量数据,因为来自其所有输入的最小水印被滞后输入阻止。因此,快速输入发出的所有记录都必须在所述下游算子状态中进行缓冲,这可能导致算子状态的不可控制的增长。
为了解决这个问题,您可以启用水印对齐,这将确保没有源/分割/分片/分区将其水印增加得远远超出其他部分。您可以单独为每个源启用对齐:
WatermarkStrategy
.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1));
启用对齐时,您需要告诉 Flink,源应该属于哪个组。您可以通过提供一个标签(例如alignment-group-1)来将共享它的所有源绑定在一起。此外,您必须告诉属于该组的所有源的当前最小水印的最大漂移。第三个参数描述当前最大水印应该更新的频率。频繁更新的缺点是,TM 和 JM 之间会传输更多 RPC 消息。
为了实现对齐,Flink 将暂停源/任务的消费,这会生成太远的未来水印。与此同时,它将继续从其他源/任务读取记录,这可以将组合水印向前移动,从而解锁更快的水印。
五、编写水印生成器
TimestampAssigner 是一个从事件中提取字段的简单函数,因此我们不需要详细查看它们。另一方面,WatermarkGenerator 的编写稍微复杂一些,我们将在接下来的两节中介绍如何做到这一点。这是 WatermarkGenerator 接口:
/**
* The {@code WatermarkGenerator} generates watermarks either based on events or
* periodically (in a fixed interval).
*
* <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the
* {@code AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
*/
@Public
public interface WatermarkGenerator<T> {
/**
* Called for every event, allows the watermark generator to examine
* and remember the event timestamps, or to emit a watermark based on
* the event itself.
*/
void onEvent(T event, long eventTimestamp, WatermarkOutput output);
/**
* Called periodically, and might emit a new watermark, or not.
*
* <p>The interval in which this method is called and Watermarks
* are generated depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
*/
void onPeriodicEmit(WatermarkOutput output);
}
水印生成有两种不同的样式:周期式水印和间断式水印。
周期性生成器通常通过 onEvent() 观察传入事件,然后在框架调用 onPeriodicEmit() 时发出水印。
标点生成器将查看 onEvent() 中的事件并等待流中携带水印信息的特殊标记事件或标点符号。当它看到这些事件之一时,它会立即发出水印。通常,标点生成器不会从 onPeriodicEmit() 发出水印。
接下来我们将了解如何为每种样式实现生成器。
六、编写周期性水印生成器
周期性生成器定期观察流事件并生成水印(可能取决于流元素,或纯粹基于处理时间)。
生成水印的时间间隔(每 n 毫秒)是通过 ExecutionConfig.setAutoWatermarkInterval(…) 定义的。每次都会调用生成器的 onPeriodicEmit() 方法,如果返回的水印非空且大于前一个水印,则会发出新的水印。
这里我们展示了两个使用周期性水印生成的水印生成器的简单示例。请注意,Flink 附带了 BoundedOutOfOrdernessWatermarks,它是一个 WatermarkGenerator,其工作方式与下面所示的 BoundedOutOfOrdernessGenerator 类似。您可以在这里阅读有关使用它的信息。
/**
* This generator generates watermarks assuming that elements arrive out of order,
* but only to a certain degree. The latest elements for a certain timestamp t will arrive
* at most n milliseconds after the earliest elements for timestamp t.
*/
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {
private final long maxOutOfOrderness = 3500; // 3.5 seconds
private long currentMaxTimestamp;
@Override
public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// emit the watermark as current highest timestamp minus the out-of-orderness bound
output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
}
}
/**
* This generator generates watermarks that are lagging behind processing time
* by a fixed amount. It assumes that elements arrive in Flink after a bounded delay.
*/
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {
private final long maxTimeLag = 5000; // 5 seconds
@Override
public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
// don't need to do anything because we work on processing time
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
}
}
七、编写标点水印生成器
标点水印生成器将观察事件流,并在看到携带水印信息的特殊元素时发出水印。
这是实现标点生成器的方法,只要事件表明它带有特定标记,该生成器就会发出水印:
public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {
@Override
public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
if (event.hasWatermarkMarker()) {
output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
}
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// don't need to do anything because we emit in reaction to events above
}
}
八、水印策略和 Kafka 连接器
当使用 Apache Kafka 作为数据源时,每个 Kafka 分区可能有一个简单的事件时间模式(升序时间戳或有界无序)。然而,当从 Kafka 消费流时,多个分区通常会并行消费,从而交错来自分区的事件并破坏每个分区的模式(这是 Kafka 消费者客户端工作方式所固有的)。
在这种情况下,您可以使用 Flink 的 Kafka 分区感知水印生成功能。使用该功能,可以在 Kafka 消费者内部按 Kafka 分区生成水印,并且按分区水印的合并方式与在流 shuffle 上合并水印的方式相同。
例如,如果每个 Kafka 分区的事件时间戳严格升序,则使用升序时间戳水印生成器生成每个分区水印将产生完美的整体水印。请注意,我们在示例中没有提供 TimestampAssigner,而是使用 Kafka 记录本身的时间戳。
下图展示了如何使用每个 Kafka 分区的水印生成,以及在这种情况下水印如何通过流数据流传播。
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("my-topic")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(
kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "mySource");
这段代码使用了 Flink 的 KafkaSource,用于从 Kafka 中读取数据。具体解释如下:
定义 KafkaSource 对象
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("my-topic")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
这里使用了 KafkaSource 的 builder 方法,构建了一个 KafkaSource 对象。其中:
setBootstrapServers(brokers)
设置 Kafka 的地址,多个地址用逗号分隔。setTopics("my-topic")
设置要读取的 topic 名称。setGroupId("my-group")
设置消费者组的名称。setStartingOffsets(OffsetsInitializer.earliest())
设置起始偏移量,这里使用了最早的偏移量。setValueOnlyDeserializer(new SimpleStringSchema())
设置反序列化器,这里使用了简单的字符串反序列化器。
定义数据流对象
DataStream<String> stream = env.fromSource(
kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "mySource");
这里使用了 Flink 的 fromSource
方法,将 KafkaSource 对象转换为 Flink 的数据流对象。其中:
kafkaSource
表示要读取的 KafkaSource 对象。WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20))
表示使用事件时间,并设置了一个 20 秒的最大乱序度,用于处理乱序数据。
"mySource"
表示数据流的名称。
最终,这段代码可以用于从 Kafka 中读取数据,并将其转换为 Flink 的数据流对象。
Python代码
kafka_source = KafkaSource.builder()
.set_bootstrap_servers(brokers)
.set_topics("my-topic")
.set_group_id("my-group")
.set_starting_offsets(KafkaOffsetsInitializer.earliest())
.set_value_only_deserializer(SimpleStringSchema())
.build()
stream = env.from_source(
source=kafka_source,
watermark_strategy=WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(20)),
source_name="kafka_source")
九、Operators如何处理水印
作为一般规则,Operators需要在将给定水印转发到下游之前完全处理该水印。例如,WindowOperator将首先评估所有应该被触发的窗口,并且只有在产生由水印触发的所有输出之后,水印本身才会被发送到下游。换句话说,由于水印的出现而产生的所有元素都将在水印之前被发射。
相同的规则适用于 TwoInputStreamOperator。然而,在这种情况下,运算符的当前水印被定义为其两个输入的最小值。
此行为的详细信息由 OneInputStreamOperator#processWatermark、TwoInputStreamOperator#processWatermark1 和 TwoInputStreamOperator#processWatermark2 方法的实现定义。
十、已弃用的AssignerWithPeriodicWatermarks 和AssignerWithPunctuatedWatermarks
在引入当前的 WatermarkStrategy、TimestampAssigner 和 WatermarkGenerator 抽象之前,Flink 使用了AssignerWithPeriodicWatermarks 和AssignerWithPunctuatedWatermarks。仍然会在 API 中看到它们,但建议使用新界面,因为它们提供了更清晰的关注点分离,并且还统一了水印生成的周期和标点样式。