【Flink】Flink 中的时间和窗口之水位线(Watermark)

news2024/9/25 17:20:32

1. 时间语义

这里先介绍一下什么是时间语义时间语义在Flink中是一种很重要的概念,下面介绍的水位线就是基于时间语义来讲的。
在这里插入图片描述

在Flink中我们提到的时间语义一般指的是事件时间处理时间

  • 处理时间(Processing Time),一般指执行处理操作的系统时间,也就是Flink的窗口算子对该数据的操作时间。
  • 事件时间(Event Time), 一般指每个事件在对应设备上发生的时间,也就是数据的生成的时间。

Flink中之所以会出现这两种时间语义,是因为Flink的分布式系统会有网络传输延迟以及时钟飘逸,处理时间相对于事件时间会有所滞后,并且数据在网络以及Flink中的传输是是乱序的。Flink的1.12版本之前默认使用的是处理时间,之后已经将事件时间作为默认的时间语义。

对于 先产生的数据先被处理,这就要求需要保证数据的到达顺序,但是因为网络传输延迟不确定性,是无法保证数据的到达顺序,在这种情况下就不能简单使用数据自带的时间戳当做时钟,而是需要另外的标志来表达事件时间的进展,在Flink中这就是事件时间水位线

2. 水位线

Flink中的水位线(Watermark)是一种用来表示事件时间进展的机制,它可以帮助Flink系统及时处理延迟数据,并保证处理结果的正确性。具体来说,水位线是一个时间戳,表示对于所有时间戳小于该水位线的事件已经全部到达,可以进行相应的计算处理。

2.1 事件时间和窗口

在实际应用中,一般都会采用事件时间语义,而水位线就是基于事件时间提出的概念。和水位线相关的还有一个窗口概念,事件时间和窗口也是有很大的关系。

这里举一个例子,我们有一个班车系统,班车上装的都是带有生产时间的商品,如果有一辆车的开车时间是8点,我们需要这辆车到九点就开始发车。商品一个个到达车上,车上的商品时间从8点开始依次增长。当到达车上的商品生产时间是九点的时候,这时候车门关闭开始发车。
在这里插入图片描述
在这个过程中,我们说的班车就相当于窗口,定义了一个8点到九点的窗口。并且我们会定义一个逻辑时钟,这个逻辑时钟是基于事件时间来的,不会自动流逝。时钟的进展就是靠着新的数据上的事件时间时间戳来推动的。这样就不需要依赖系统的时间,无论什么时候进行统计处理,得到的结果都是正确的。

2.2 什么是水位线

在事件时间语义下,我们可以不依赖系统时间,而是基于数据自带的时间戳去定义了一个时钟,用来表示当前时间的进展。这样每个并行子任务都会有一个自己的逻辑时钟,它的前进是靠数据的时间戳来驱动的。

但是在分布式系统中,会存在一些问题,因为数据本身在处理转换过程中会发生变化,如果遇到窗口聚合的操作,呢么下游的数据就会变少,时间进度的控制就不精细了。另外,数据向下游任务传递时,一般只能传输给一个子任务(除广播外),这样其他的并行子任务的时钟就无法推进了。

所以时钟也需要以数据的形式传递出去,告诉下游任务当前时间的进展;而且时钟传递过程也不会因为运算而停滞。解决这个问题的想法就是在数据流种加入一个时间标记,记录当前的事件时间并且广播到下游,当下游收到这个标记就可以更新自己的时钟了。这种用来衡量事件时间进展的标记在Flink中就被称作水位线。
在这里插入图片描述
如图 6-5 所示,每个事件产生的数据,都包含了一个时间戳,我们直接用一个整数表示。这里没有指定单位,可以理解为秒或者毫秒(方便起见,下面讲述统一认为是秒)。当产生于2 秒的数据到来之后,当前的事件时间就是 2 秒;在后面插入一个时间戳也为 2 秒的水位线,随着数据一起向下游流动。而当 5 秒产生的数据到来之后,同样在后面插入一个水位线,时间戳也为 5,当前的时钟就推进到了 5 秒。这样,如果出现下游有多个并行子任务的情形,我们只要将水位线广播出去,就可以通知到所有下游任务当前的时间进度了。

水位线就像它的名字一样,是数据流中的一部分,随着数据一起流动,在不同任务之间传输。不过水位线也存在有序和乱序区分:

2.2.1 有序流中的水位线

在理想状态下,数据按照顺序排好队进入数据流,并且处理的过程遵循先来后到的原则,这样每个数据中提取的时间戳就会保持从小到大的增长,而水位线也会不断增长、事件时钟也不断向前推进。

但是在实际应用中,数据量非常大,还有可能好多数据的事件时间戳都是相同的,每一个都插入水位线是做了很多无用功的。所以为了提高效率一般每隔一段时间生成一个水位线,这个水位线的时间戳就是当前最新数据的时间戳,就如下图所示:

在这里插入图片描述
这里注意水位线插入的周期本身也是时间概念,并且这个周期时间是指处理时间也就是系统时间,而不是事件时间,如果使用事件时间就陷入了死循环了。

2.2.2 乱序流中的水位线

有序流的处理非常简单,但是在分布式系统中,会因为数据在节点的传输以及网络传输的延迟不确定性导致顺序发生改变,这就是乱序数据。

这里所说的乱序是指数据的先后顺序不一致,主要就是基于数据的产生时间而言的。如图 6-7 所示,一个 7 秒时产生的数据,生成时间自然要比 9 秒的数据早;但是经过数据缓存和传输之后,处理任务可能先收到了 9 秒的数据,之后 7 秒的数据才姗姗来迟。这时如果我们希望插入水位线,来指示当前的事件时间进展,又该怎么做呢?

在这里插入图片描述解决思路也很简单:我们插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线,如图 6-8 所示。也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。
在这里插入图片描述
考虑到大量数据同时到来的处理效率,我们同样可以周期性的生成水位线。这时只需要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新的水位线,如图 6-9 所示。
在这里插入图片描述但是又如何处理迟到的数据?其实也很简单,那就是等待几秒,比如上图当收到9秒的数据,等待2秒也就是7秒,这时候7秒的数据到来,数据就到齐了。
在这里插入图片描述
下图是我们使用周期性的方式生成等待2秒的水位线
在这里插入图片描述
另外需要注意的是,这里一个窗口所收集的数据,并不是之前所有已经到达的数据。因为数据属于哪个窗口,是由数据本身的时间戳决定的,一个窗口只会收集真正属于它的那些数据。也就是说,上图中尽管水位线 W(20)之前有时间戳为 22 的数据到来,10~20 秒的窗口中也不会收集这个数据,进行计算依然可以得到正确的结果。

2.2.3 水位线的特性

  • 水位线是插入到数据流中的一种标记,可以认为是一种特殊的数据
  • 水位线主要内容是一个时间戳,用来表示当前事件时间的进展
  • 水位线是基于数据的时间戳生成的
  • 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
  • 水位线可以通过设置延迟,来保证正确处理乱序数据
  • 一个水位线Watermark(t) 表示在当前流中事件时间已经达到了时间戳t,这代表t之前的所有数据都到齐了,之后流中不会出现时间戳t’<= t的数据

水位线是Flink流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理。

2.3 如何生成水位线

水位线是保证窗口处理结果的正确性的,如果不能正确处理所有乱序数据,可以尝试调大延迟时间,但是在实际应用中不是随便调整的。

2.3.1 生成水位线的总体原则

只能尽量保证水位线的正确,如果想对结果正确性要求很高,那就需要设置等待时间长一点,但是等待的时间越长,漏掉数据的概率越低。但是这样做的代价就是处理的实时性降低了,所以我们希望能处理的更快,更实时,就必须将水位线设置的低一些。而对于漏掉的数据,Flink提供了窗口处理迟到数据。

Flink中的水位线其实就是流处理中对延迟和结果正确性的一个权衡机制,而且把控制权交给了程序员,我们可以在代码中定义水位线的生成策略。

2.3.2 水位线的生成策略(Watermark Strategies)

FlinkDataStream API中,提供了用于生成水位线的方法.assignTimestampsAndWatermarks(),主要是用来为流中的数据分配时间戳,并生成水位线来指示事件时间,

// 为数据流中的元素分配时间戳并生成水位线以表示事件时间进度。给定的 WatermarkStrategy 用于创建 TimestampAssigner 和 WatermarkGenerator。
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy)

具体使用时直接用DataSream调用该方法即可,与普通transform方法完全一样

DataStream<String> stringDataStreamSource = env.fromCollection(list, BasicTypeInfo.STRING_TYPE_INFO);
        DataStream<String> withTimestampsAndWatermarks = stringDataStreamSource.assignTimestampsAndWatermarks(<watermark strategy>);

.assignTimestampsAndWatermarks()方法需要传入一个WatermarkStrategy作为参数,这个参数就是水位线生成策略WatermarkStrategy中包含了一个时间戳分配器(TimestampAssigner)和一个水位线生成器(WatermarkGenerator)

@Public
public interface WatermarkStrategy<T>
        extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {
@Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
@Override
    default TimestampAssigner<T> createTimestampAssigner(
            TimestampAssignerSupplier.Context context) {
        return new RecordTimestampAssigner<>();
    }
}
  • TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。
  • WatermarkGenerator:主要负责按照既定的方式,基于时间戳生成水位线。在WatermarkGenerator接口中,主要又有两个方法:onEvent()onPeriodicEmit()
  • onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件,时间戳以及允许发出水位线的一个WatermarkOutput,可以基于事件做各种操作。
  • onPeriodicEmit:周期性调用的方法,可以由WatermarkOutput发出水位线。周期时间为处理时间,可以调用环境配置的.setAutoWatermarkInterval()方法来设置,默认为200ms
@Public
public interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

也可以通过设置周期时间
env.getConfig().setAutoWatermarkInterval(60 * 1000L);

2.3.3 Flink内置水位线生成器

Flink提供了两种内置的水位线生成器(WatermarkGenerator),分别对应着有序流和乱序流的场景。

2.3.3.1 有序流

对于有序流主要特点就是时间戳单调增长,所以永远不会出现迟到的数据问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStategy.forMonotonousTimestamps()方法就可以实现,简单说就是拿当前最大时间戳作为水位线就可以了。

static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
        return (ctx) -> new AscendingTimestampsWatermarks<>();
    }

源码可以看到有序流返回的水位线生成器策略是AscendingTimestampsWatermarks对象:
示例代码

ArrayList<Event> list = new ArrayList<>();
        list.add(new Event("ming","www.baidu1.com",1200L));
        list.add(new Event("xiaohu","www.baidu5.com",1267L));
        list.add(new Event("ming","www.baidu7.com",4200L));
        list.add(new Event("xiaohu","www.baidu8.com",5500L));

        DataStream<Event> stringDataStreamSource = env.fromCollection(list, BasicTypeInfo.of(Event.class));
        // 使用forMonotonousTimestamps
        DataStream<Event> watermarks = stringDataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                })
        );

上面的代码中我们调用的.withTimestampAssigner()方法,将数据中的timestamp字段提取出来作为时间戳分配给数据元素;然后使用内置有序流水位线生成器构造出了生成策略,这样提取出来的数据时间戳就是我们处理计算时间的事件时间

要注意的是这里的时间戳和水位线的单位必须都是毫秒。

2.3.3.2 乱序流

由于乱序流中需要等待迟到的数据到齐,所以必须设置一个固定量的延迟时间(Fixed Amount of Lateness)。这时生成的水位线的时间戳就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。

static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
        return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
    }

不过这个方法需要传一个maxOutOfOrderness参数,表示最大乱序程度, 它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,呢么设置对应时间长度的延迟就可以等到所有的乱序数据了。
示例代码

DataStream<Event> stringDataStreamSource = env.fromCollection(list, BasicTypeInfo.of(Event.class));
DataStream<Event> watermarks = stringDataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofMillis(5))
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                })
        );

上面的代码提取了timestamp字段作为时间戳,并且以5毫秒的延迟时间创建了处理乱序流的水位线生成器。

查看有序流的水位线生成器和乱序流的水位线生成器的源码可以发现,其实有序流的水位线生成器本质上和乱序流是一样的,相当于延迟为0秒的乱序流水位线生成器。

@Public
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {
    public AscendingTimestampsWatermarks() {
        super(Duration.ofMillis(0));
    }
}

@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
    private long maxTimestamp;
    private final long outOfOrdernessMillis;
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}

这里还要注意一下,乱序流中生成的水位线真正的时间戳,其实是当前最大时间戳 - 延迟时间 - 1,这里的单位是毫秒。为什么要减去1毫秒是以为Flink中的窗口都是左开右闭的(8,7] 表示7点到8点的数据。

2.3.4 自定义水位线策略

WatermarkStrategy中,时间戳分配器TimestampAssigner都是大同小异的,指定字段提取时间戳就可以了;不同的点在于WatermarkGenerator的实现,整体来说,有两种生成水位线的方式:一种是周期性的(Periodic),另一种是断点式的(Punctuated)

创建自定义水位线离不开WatermarkGeneratoronEvent() onPeriodicEmit(),前者是每个事件到来时调用,后者由框架周期性的调用。周期性调用的方法中onPeriodicEmit()发出水位线就是周期性生成水位线,由事件触发的 方法中onEvent()发出的水位线就是断点式生成的水位线。两种方式的不同就集中体现在这两个方法上。

2.3.4.1 周期性水位线生成器(Periodic Generator)

周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。
示例代码:

public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. 从集合中读取数据
        ArrayList<Event> list = new ArrayList<>();
        list.add(new Event("ming","www.baidu1.com",1200L));
        list.add(new Event("xiaohu","www.baidu5.com",1267L));
        list.add(new Event("ming","www.baidu7.com",4200L));
        list.add(new Event("xiaohu","www.baidu8.com",5500L));

        DataStream<Event> stringDataStreamSource = env.fromCollection(list, BasicTypeInfo.of(Event.class));
        DataStream<Event> watermarks = stringDataStreamSource.assignTimestampsAndWatermarks(new WatermarkStrategy<Event>() {
            @Override
            public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                return new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                };
            }

            @Override
            public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new WatermarkGenerator<Event>() {
                    private Long delayTime = 5000L; // 延迟时间

                    private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳
                    @Override
                    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
                        // 每来一条数据就调用一次
                        maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳
                    }

                    @Override
                    public void onPeriodicEmit(WatermarkOutput output) {
                        // 发射水位线,默认 200ms 调用一次 可以使用 env.getConfig().setAutoWatermarkInterval(60 * 1000L); 调整周期时间
                        output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
                    }
                };
            }
        });
    }

2.3.4.2 断点式水位线生成器(Punctuated Generator)

断点式生成器会不停的检测onEvent()事件,当发现带有水位线信息的特殊事件时,就立即发出水位线。一般来说,断点式生成器不会通过onPeriodicEmit()发出水位线。
示例代码

public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. 从集合中读取数据
        ArrayList<Event> list = new ArrayList<>();
        list.add(new Event("ming","www.baidu1.com",1200L));
        list.add(new Event("xiaohu","www.baidu5.com",1267L));
        list.add(new Event("ming","www.baidu7.com",4200L));
        list.add(new Event("xiaohu","www.baidu8.com",5500L));

        DataStream<Event> stringDataStreamSource = env.fromCollection(list, BasicTypeInfo.of(Event.class));
        DataStream<Event> watermarks = stringDataStreamSource.assignTimestampsAndWatermarks(new WatermarkStrategy<Event>() {
            @Override
            public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                return new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                };
            }

            @Override
            public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new WatermarkGenerator<Event>() {
                    @Override
                    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
                        // 只有在遇到特定的 itemId 时,才发出水位线
                        if (event.getUser().equals("Mary")) {
                            output.emitWatermark(new Watermark(event.getTimestamp() - 1));
                        }
                    }

                    @Override
                    public void onPeriodicEmit(WatermarkOutput output) {
                        // 不需要做任何事情,因为我们在 onEvent 方法中发射了水位线
                    }
                };
            }
        });
    }

我们在 onEvent()中判断当前事件的 user 字段,只有遇到“Mary”这个特殊的值时,才调用output.emitWatermark()发出水位线。这个过程是完全依靠事件来触发的,所以水位线的生成一定在某个数据到来之后。

2.3.5 在自定义数据源中发送水位线

在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。
示例代码

env.addSource(new SourceFunction<Event>() {
            private boolean running = true;
            @Override
            public void run(SourceContext<Event> ctx) throws Exception {
                Random random = new Random();
                String[] userArr = {"Mary", "Bob", "Alice"};
                String[] urlArr = {"./home", "./cart", "./prod?id=1"};
                while (running) {
                    long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒时间戳
                    String username = userArr[random.nextInt(userArr.length)];
                    String url = urlArr[random.nextInt(urlArr.length)];
                    Event event = new Event(username, url, currTs);
                    // 使用 collectWithTimestamp 方法将数据发送出去,并指明数据中的时间戳的字段
                    ctx.collectWithTimestamp(event, event.timestamp);
                    // 发送水位线
                    ctx.emitWatermark(new Watermark(event.timestamp - 1L));
                    Thread.sleep(1000L);
                }
            }
            @Override
            public void cancel() {
                this.running = false;
            }
        });

ctx.collectWithTimestamp(event, event.timestamp); 这里发出的时间戳,其实就是我们经常使用的内置水位线生成器的方法中的recordTimestamp

return new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                };

在自定义水位线中生成水位线相比 assignTimestampsAndWatermarks 方法更加灵活,可以任意的产生周期性的、非周期性的水位线,以及水位线的大小也完全由我们自定义。所以非常适合用来编写 Flink 的测试程序,测试 Flink 的各种各样的特性。

2.4 水位线的传递

在每一个分区中都有一个分区水位线。
在这里插入图片描述
如图 6-12 所示,当前任务的上游,有四个并行子任务,所以会接收到来自四个分区的水位线;而下游有三个并行子任务,所以会向三个分区发出水位线。具体过程如下:

  1. 上游并行子任务发来不同的水位线,当前任务会为每一个分区设置一个“分区水位线”(Partition Watermark),这是一个分区时钟;而当前任务自己的时钟,就是所有分区时钟里最小的那个。
  2. 当有一个新的水位线(第一分区的 4)从上游传来时,当前任务会首先更新对应的分区时钟;然后再次判断所有分区时钟中的最小值,如果比之前大,说明事件时间有了进展,当前任务的时钟也就可以更新了。这里要注意,更新后的任务时钟,并不一定是新来的那个分区水位线,比如这里改变的是第一分区的时钟,但最小的分区时钟是第三分区的 3,于是当前任务时钟就推进到了 3。当时钟有进展时,当前任务就会将自己的时钟以水位线的形式,广播给下游所有子任务。
  3. 再次收到新的水位线(第二分区的 7)后,执行同样的处理流程。首先将第二个分区时钟更新为 7,然后比较所有分区时钟;发现最小值没有变化,那么当前任务的时钟也不变,也不会向下游任务发出水位线。
  4. 同样道理,当又一次收到新的水位线(第三分区的 6)之后,第三个分区时钟更新为6,同时所有分区时钟最小值变成了第一分区的 4,所以当前任务的时钟推进到 4,并发出时间戳为 4 的水位线,广播到下游各个分区任务。

水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟,就可以保证窗口处理的结果总是正确的。

2.5 水位线的总结

水位线在事件时间的世界里面,承担了时钟的角色。也就是说在事件时间的流中,水位线是唯一的时间尺度。如果想要知道现在几点,就要看水位线的大小。后面讲到的窗口的闭合,以及定时器的触发都要通过判断水位线的大小来决定是否触发。

水位线是一种特殊的事件,由程序员通过编程插入的数据流里面,然后跟随数据流向下游流动。水位线的默认计算公式:水位线 = 观察到的最大事件时间 – 最大延迟时间 – 1 毫秒。
所以这里涉及到一个问题,就是不同的算子看到的水位线的大小可能是不一样的。因为下游的算子可能并未接收到来自上游算子的水位线,导致下游算子的时钟要落后于上游算子的时钟。比如 map->reduce 这样的操作,如果在 map 中编写了非常耗时间的代码,将会阻塞水位线的向下传播,因为水位线也是数据流中的一个事件,位于水位线前面的数据如果没有处理完毕,那么水位线不可能弯道超车绕过前面的数据向下游传播,也就是说会被前面的数据阻塞。这样就会影响到下游算子的聚合计算,因为下游算子中无论由窗口聚合还是定时器的操作,都需要水位线才能触发执行。这也就告诉了我们,在编写 Flink 程序时,一定要谨慎的编写每一个算子的计算逻辑,尽量避免大量计算或者是大量的 IO 操作,这样才不会阻塞水位线的向下传递。

在数据流开始之前,Flink 会插入一个大小是负无穷大(在 Java 中是-Long.MAX_VALUE)的水位线,而在数据流结束时,Flink 会插入一个正无穷大(Long.MAX_VALUE)的水位线,保证所有的窗口闭合以及所有的定时器都被触发。

对于离线数据集,Flink 也会将其作为流读入,也就是一条数据一条数据的读取。在这种情况下,Flink 对于离线数据集,只会插入两次水位线,也就是在最开始处插入负无穷大的水位线,在结束位置插入一个正无穷大的水位线。因为只需要插入两次水位线,就可以保证计算的正确,无需在数据流的中间插入水位线了。

水位线的重要性在于它的逻辑时钟特性,而逻辑时钟这个概念可以说是分布式系统里面最为重要的概念之一了,理解透彻了对理解各种分布式系统非常有帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/688980.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

git介绍和安装/git,github,gitee,gitlab区别/git使用流程/ git常用命令/git忽略文件

git介绍和安装 # 版本管理软件-1 对代码版本进行管理---》首页功能完成---》课程功能完成---》可以回退到某个版本-2 协同开发--》多人开发--》合并代码---》可能会有冲突&#xff0c;解决冲突# 版本管理软件&#xff1a;主流就两个-git&#xff1a;现在用的最多&#xff08;学…

100天精通Golang(基础入门篇)——第10天:Go语言中的数组

&#x1f337; 博主 libin9iOak带您 Go to Golang Language.✨ &#x1f984; 个人主页——libin9iOak的博客&#x1f390; &#x1f433; 《面试题大全》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &#x1f30a; 《I…

python与adb无缝衔接控制手机(手机截屏实例)

目录 连接 常用操作 截图到PC端 使用pure-python-adb库可以实现python控制手机&#xff0c;支持input输入&#xff0c;支持shell命令&#xff0c;支持pull/push上传下载文件等。 安装库&#xff1a;pip install pure-python-adb 电脑端需要安装adb程序&#xff0c;在命令行…

技术管理第二板斧建团队-建机制

1.机制作用&#xff1f; 站在团队的角度&#xff0c;建机制尤为重要&#xff0c;你要通过机制让团队有统一的行为与规则&#xff0c;让组织像人一样&#xff0c;言行举止有规律可循。 听起来很容易&#xff0c;可要设计一个有效、持续发挥作用的机制并不简单。你不但清晰地认识…

10 Debug功能及方法简述

常见的Debug方法 原文链接&#xff1a;MDK5 Debug调试方法总结_keil5debug调试_小z不会累&#xff5e;的博客-CSDN博客 连接好硬件DAP之后&#xff0c;需要进行一些设置 开始仿真调试 Debug调试按钮分别对应的功能 按钮1->reset复位按钮按钮2->run按钮&#xff0c;程序运…

Keil V5版本开发STM32缺少PACKDFP解决办法 (Error Flash Downloadfailed-“Cortex-M7)

Keil V5版本开发STM32缺少PACKDFP解决办法 Error Flash Downloadfailed-“Cortex-M7 这个网址里有各个固件包的最新版&#xff0c;下载速度快&#xff0c;滋溜一下就下好啦 固件包

CISP-PTE-考前综合题记录

CISP-PTE-考前综合题记录 http://10.1.10.71/&#xff08;文件上传传不上去&#xff09; 获取key6 端口扫描只扫到80端口开放 输入用户名密码&#xff0c;抓包 对Authorization Basic的值进行base64解码&#xff0c;为爆破做准备 猜测用户名为admin,所有留下admin:这一段…

设计模式第15讲——模板模式(Template)

目录 一、什么是模板模式 二、角色组成 三、优缺点 四、应用场景 4.1 生活场景 4.2 java场景 五、代码实现 5.0 代码结构 5.1 OrderFood——抽象类&#xff08;Abstract&#xff09; 5.2 具体类&#xff08;Concrete Class&#xff09; 5.3 testTemplate 六、总结…

JMeter工具使用

1. Jmeter设置语言为简体中文 2. Jmeter添加线程组 3. Jmeter添加http请求 4. Jmeter添加数据统计结果 5. JMeterAddress Already in use 错误解决 windows本身提供的端口访问机制的问题。 Windows提供给 TCP/IP链接的端口为1024-5000&#xff0c;并且要四分钟来循环回收他们。…

PostgreSQL

一. PostgreSQL 简介 1 资料来源&#xff1a; 中文手册&#xff1a;http://www.postgres.cn/docs/14/index.html 知乎链接&#xff1a;https://www.zhihu.com/column/c_1452567507496689664 视频链接&#xff1a;https://www.bilibili.com/video/BV1uW4y1m7pD/?spm_id_frompa…

win下docker安装和使用

安装 下载安装包&#xff1a;https://docs.docker.com/desktop/install/windows-install/ 下载 Linux 内核更新包 适用于 x64 计算机的 WSL2 Linux 内核更新包 解决docker下载镜像速度慢问题 阿里云镜像加速器&#xff1a;https://阿里ID.mirror.aliyuncs.com 复制上面镜像…

轮廓检测,高斯模糊及功能

轮廓检测 一、实验介绍 1. 实验内容 本实验将学习轮廓检测及功能。 2. 实验要点 生成二进制图像来查找轮廓找到并画出轮廓轮廓特征边界矩形 3. 实验环境 Python 3.6.6numpymatplotlibcv2 二、实验步骤 1 导入资源并显示图像 import numpy as np import matplotlib.py…

rabbitmq第四课-RabbitMQ高可用集群架构详解以及生产环境最佳实践

一、如何保证RabbitMQ服务高可用 1、RabbitMQ如何保证消息安全 之前通过单机环境搭建起来的RabbitMQ服务有一个致命的问题&#xff0c;那就是服务不稳定的问题。如果只是单机RabbitMQ的服务崩溃了&#xff0c;那还好&#xff0c;大不了重启下服务就是了。 但是如果是服务器的…

MySQL 逻辑备份mysqldump

逻辑备份mysqldump MySQL 自带的逻辑备份工具。可以保证数据的一致性和服务的可用性原理是通过协议连接到 MySQL 数据库&#xff0c;将需要备份的数据查询出来&#xff0c;将查询出的数据转换成对应的 insert 语句&#xff0c;当我们需要还原这些数据时&#xff0c;只要执行这些…

红帽“背叛”开源:限制RHEL源码访问,突袭下游发行版

红帽决定停止公开提供其企业发行版 Red Hat Enterprise Linux (RHEL) 源代码。 从现在开始&#xff0c;CentOS Stream 将成为公共 RHEL 相关源代码发布的唯一仓库&#xff0c;付费客户和合作伙伴可通过 Red Hat Customer Portal 访问到源代码。 CentOS Stream 是由 Red Hat 公…

UNIX环境高级编程——网络IPC:套接字

16.1 引言 本章将考察不同计算机&#xff08;通过网络相连&#xff09;上的进程相互通信的机制&#xff1a;网络进程间通信&#xff08;network IPC&#xff09;。 16.2 套接字描述符 为创建一个套接字&#xff0c;调用socket函数&#xff1a; #include <sys/socket.h&g…

图像增强之图像锐化(边缘增强)之robot算子

目录 note code test note matx (-1,0;1,0) maty (0,-1;1,0) code // 图像增强之图像锐化(边缘增强)之robot算子 void GetRobot(Mat& robotX, Mat& robotY) {robotX (Mat_<int>(2,2) << -1,0,1,0);robotY (Mat_<int>(2,2) << 0,-1,1…

bug汇集-二

1、多个表格 设置 只让当前选中行对应的表格行--高亮 问题&#xff1a;只能设置一个表格高亮&#xff0c;选中一个表格某行高亮&#xff0c;另一行就不高亮 解决&#xff1a; 1、在 表格属性配置里&#xff0c; 把 current-changecurrentChange 写成 current-change"…

多账号矩阵管理系统技术嫁接开发源代码

多账号矩阵管理系统技术嫁接开发源代码 文章目录 一、剪辑部分源代码开发示例二、发布投放部分源代码示例 1.账号绑定一码多扫技术应用开发代码示例2.定时挂载投放源代码示例 一、剪辑部分源代码开发示例 创建工程项目 */ public function createProjectAction() { …

windows电脑如何设置通电自启

原来电脑是需要摁启动摁扭才能开机&#xff0c;现在需要给服务器设置成通电自启 开机后摁del或者F2键进入bois设置&#xff0c;再摁F7进入高级设置&#xff0c;进入Advanced---APM Configuration 进入后 Restore AC Power Loss选择开启。