Flink Watermark 源码分析

news2024/10/7 6:40:54

随着 flink 的快速发展与 API 的迭代导致新老版本差别巨大遂重拾 flink,在回顾到时间语义时对 watermark 有了不一样的理解。

一、如何生成

在 flink 1.12(第一次学习的版本)时 watermark 生成策略还有两种: punctuated 和 periodic,在 1.17 中 punctuated 已被标记过时。这里简单阐述一下这两种策略的不同

  • punctuated: 随事件进行生成,每条数据后插入一个根据当前事件解析出来的 watermark
  • periodic: 随时间进行生成,默认每 200ms 生成一次 watermark

当 flink 系统吞吐量巨大 punctuated 策略会导致系统数据量剧增甚至阻塞业务数据的流动(提前透露一下 watermark 本质也是一条数据);periodic 策略就很好的解决这个痛点,可能有人疑惑了: 当 flink 没有数据时 periodic 也会定时无限的生成 watermark 会不会有问题?显然是不会的,因为 flink 本身就是大数据处理框架这点 QPS 简直是洒洒水,其次下游对 watermark 的处理逻辑也是轻量级的(第三章)。下面是当前版本 flink 生成 watermark 的 api

package org.vital.eu.job.time;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class WaterMarkGenerateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("127.0.0.1", 1111)
                .map(x -> x)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<String>forMonotonousTimestamps()
                                .withTimestampAssigner((element, recordTimestamp) -> Long.parseLong(element)))
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, ProcessFunction<String, String>.Context ctx, Collector<String> out) {
                        System.out.println("当前 watermark: " + ctx.timerService().currentWatermark());
                        out.collect(value);
                    }
                })
                .print();

        env.execute();
    }
}

Tip: 这里的 map 算子很重要,是我能进行下去的重要依据

对于构造 watermark 的策略又有三种:

  1. forMonotonousTimestamps: 时间戳单调递增生成策略
  2. forBoundedOutOfOrderness: 为记录乱序的情况创建水印策略
  3. forGenerator: 自定义策略

这里不再赘述 forMonotonousTimestamps 和 forBoundedOutOfOrderness 应用场景,forGenerator 自定义生成也不再赘述(给的两种策略够用了)。下面是两种策略源码

static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
      return (ctx) -> new AscendingTimestampsWatermarks<>();
}

public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {

    /** Creates a new watermark generator with for ascending timestamps. */
    public AscendingTimestampsWatermarks() {
        super(Duration.ofMillis(0));
    }
}

static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
      return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
}

可以看出 forMonotonousTimestamps 本质上就是 forBoundedOutOfOrderness,只不过乱序程度为 0。

对于 watermark 生成策略在 flink 上是一个接口

public interface WatermarkGenerator<T> {

    // 数据来一条调用一次
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    // 定时调用,默认 200ms
    void onPeriodicEmit(WatermarkOutput output);
}

我们来看一下 forBoundedOutOfOrderness 是如何实现的

public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

    // 记录接收到的最大时间戳
    private long maxTimestamp;

    // 乱序程度
    private final long outOfOrdernessMillis;

    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");

        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();

        // 默认值为 Long 最小值 + 乱序程度 + 1
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // ------------------------------------------------------------------------

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // 每条数据都会更新最大值
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // 发送 watermark 逻辑
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}

需要注意的是new Watermark(maxTimestamp - outOfOrdernessMillis - 1),减去 outOfOrdernessMillis 好理解是为了修正乱序,减 1ms 是为了后续的开窗函数,保证窗口是一个左闭右开的状态,保证上层 flink 中刚好是窗口关闭时间的数据只会落在一个窗口,例如某个 flink 任务的窗口是 [0,5)、[5,10),保证 5s 的数据只在一个窗口

Tip: punctuated 策略其实就是将发送 watermark 的逻辑写到 onEvent 中

二、它的本质

2.1 assignTimestampsAndWatermarks 本质

它的本质是一个算子

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
          WatermarkStrategy<T> watermarkStrategy) {
      final WatermarkStrategy<T> cleanedStrategy = clean(watermarkStrategy);
      // match parallelism to input, to have a 1:1 source -> timestamps/watermarks relationship
      // and chain
      final int inputParallelism = getTransformation().getParallelism();
      final TimestampsAndWatermarksTransformation<T> transformation =
              new TimestampsAndWatermarksTransformation<>(
                      "Timestamps/Watermarks",
                      inputParallelism,
                      getTransformation(),
                      cleanedStrategy,
                      false);
      getExecutionEnvironment().addOperator(transformation);
      return new SingleOutputStreamOperator<>(getExecutionEnvironment(), transformation);
  }

更直观的可以使用 flink web ui,对 assignTimestampsAndWatermarks 应用禁用算子链策略(调用 disableChaining)

image-20230515173312021

至于这个算子到底执行了什么逻辑定义在 TimestampsAndWatermarksOperator 中,其生命周期 open 方法

public void open() throws Exception {
      super.open();

      timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup);
      watermarkGenerator =
              emitProgressiveWatermarks
                      ? watermarkStrategy.createWatermarkGenerator(this::getMetricGroup)
                      : new NoWatermarksGenerator<>();

      wmOutput = new WatermarkEmitter(output);

      watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
      if (watermarkInterval > 0 && emitProgressiveWatermarks) {
          final long now = getProcessingTimeService().getCurrentProcessingTime();
          getProcessingTimeService().registerTimer(now + watermarkInterval, this);
      }
  }

首先从配置中获取定时生成 watermark 间隔参数并创建当前时间(处理时间)+间隔的定时器定义了第一个 watermark 是如何生成的。定时器会自动执行 onProcessingTime 方法

public void onProcessingTime(long timestamp) throws Exception {
      watermarkGenerator.onPeriodicEmit(wmOutput);

      final long now = getProcessingTimeService().getCurrentProcessingTime();
      getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}

可以看到 onPeriodicEmit 在这里被调用发送一次 watermark,随后再次创建下一次的定时器(这种思路可以在开发定时器时借鉴一下),作为一个算子肯定会接受数据并进行处理,即 processElement 方法

public void processElement(final StreamRecord<T> element) throws Exception {
      final T event = element.getValue();
      final long previousTimestamp =
              element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE;
      final long newTimestamp = timestampAssigner.extractTimestamp(event, previousTimestamp);

      element.setTimestamp(newTimestamp);
      output.collect(element);
      watermarkGenerator.onEvent(event, newTimestamp, wmOutput);
}

从方法的入参可以看出来 flink 算子间的数据流动是 StreamRecord 对象。它对数据的处理逻辑是什么都不做直接向下游发送,然后调用 onEvent 记录最大时间戳,也就是说:flink 是先发送数据再生成 watermark,watermark 永远在生成它的数据之后。

image-20230515191729954

总结: watermark 生成器本质上是一个算子,在生命周期方法 open 中注册定时器并在定时器中发送记录的最大时间戳的 watermark 并继续注册定时器;算子对业务数据不做任务处理直接发送给下游后记录当前数据的时间与记录的最大时间作比较(即使是事件时间也不可回溯)

2.2 Watermark 本质

探究一下output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));做了什么,首先 output 是一个 WatermarkOutput 对象同样是在 open 方法中被定义

wmOutput = new WatermarkEmitter(output);

发送 watermark 的方法如下

@Override
public void emitWatermark(Watermark watermark) {
    final long ts = watermark.getTimestamp();

    if (ts <= currentWatermark) {
        return;
    }

    currentWatermark = ts;

    markActive();

    output.emitWatermark(new org.apache.flink.streaming.api.watermark.Watermark(ts));
}

最终时间会被封装成org.apache.flink.streaming.api.watermark.Watermark

public final class Watermark extends StreamElement {

}

继承自 StreamElement,这个类就有意思了从第三章的分析可以得出结论,flink 算子见的数据流动统一是 StreamElement(用于 checkpoint 的 barrier 不在其中,它是分布式快照机制具有对应的操作,普通数据是没有操作的)。对于 StreamElement 有四个子类分别是:

  1. StreamRecord: 业务数据
  2. Watermark: 用于表示事件时间的特殊数据
  3. LatencyMarker: 特殊记录数据,记录创建时间、算子id、subtask编号
  4. WatermarkStatus: 用于标记是否为空闲流,即:IDLE 和 ACTIVE

也就是说 watermark 本质上和业务数据没有什么区别,都是作为 StreamElement 在算子间流动,只不过下游分发策略是广播

三、如何传递

在 process 算子中打上断点进行 debug,通过分析调用方法的堆栈发现核心方法 processElement

image-20230515193832049

源码如下:

private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
    if (recordOrMark.isRecord()) {
        output.emitRecord(recordOrMark.asRecord());
    } else if (recordOrMark.isWatermark()) {
        statusWatermarkValve.inputWatermark(
                recordOrMark.asWatermark(), flattenedChannelIndices.get(lastChannel), output);
    } else if (recordOrMark.isLatencyMarker()) {
        output.emitLatencyMarker(recordOrMark.asLatencyMarker());
    } else if (recordOrMark.isWatermarkStatus()) {
        statusWatermarkValve.inputWatermarkStatus(
                recordOrMark.asWatermarkStatus(),
                flattenedChannelIndices.get(lastChannel),
                output);
    } else {
        throw new UnsupportedOperationException("Unknown type of StreamElement");
    }
}

这里则验证上面的说明,算子间数据传递都被封装成 StreamElement,并在 processElement 中进行判断

3.1 StreamRecord 处理逻辑

StreamRecord 作为业务数据的封装,在后续被 ProcessOperator 中被 processElement 调用

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    collector.setTimestamp(element);
    context.element = element;
    userFunction.processElement(element.getValue(), context, collector);
    context.element = null;
}

最终将数据传递给用户函数 userFunction 即我们再算子中定义的逻辑

3.2 Watermark 处理逻辑

我们知道当算子接收到 Watermark 时首先会进行对其操作并发送接收到的最小的 Watermark 到下游,也就是在多并行下 Watermark 传递规则是发送接收到的最小的 Watermark。

public void inputWatermark(Watermark watermark, int channelIndex, DataOutput<?> output)
        throws Exception {
    // ignore the input watermark if its input channel, or all input channels are idle (i.e.
    // overall the valve is idle).
    if (lastOutputWatermarkStatus.isActive()
            && channelStatuses[channelIndex].watermarkStatus.isActive()) {
        long watermarkMillis = watermark.getTimestamp();

        // if the input watermark's value is less than the last received watermark for its input
        // channel, ignore it also.
        if (watermarkMillis > channelStatuses[channelIndex].watermark) {
            channelStatuses[channelIndex].watermark = watermarkMillis;

            if (channelStatuses[channelIndex].isWatermarkAligned) {
                adjustAlignedChannelStatuses(channelStatuses[channelIndex]);
            } else if (watermarkMillis >= lastOutputWatermark) {
                // previously unaligned input channels are now aligned if its watermark has
                // caught up
                markWatermarkAligned(channelStatuses[channelIndex]);
            }

            // now, attempt to find a new min watermark across all aligned channels
            findAndOutputNewMinWatermarkAcrossAlignedChannels(output);
        }
    }
}

接收 Watermark 的前提条件是上游是活跃状态即不处于 IDLE,注意 channelIndex 代表当前通道的索引,channel 是 flink 算子内部维护了一组输入和输出通道,用于实现数据流的输入和输出

img

实现接收上游数据的通道在 flink 中由 InputChannel 实现(跑题了)

回到对 Watermark 的处理逻辑上,首先判断接收到的 Watermark 与保存的 Watermark 大小,如果小于保存的则什么都不做否则将保存接收到的 Watermark;随后进行通道对其,关于内部的对其逻辑博主还需要花时间继续研究一下,从方法名和参数来看初步判断是使用 PriorityQueue 来实现的(后续研究透了再来水一篇)。

3.3 WatermarkStatus 处理逻辑

WatermarkStatus 主要是将当前 channel 状态进行转换,从 idle -> active 或 active -> idle,除了状态上的变化还会修改其对其状态等

3.4 LatencyMarker 处理机制

这个最简单直接进行透传

总结: 从 processElement 中可以看出在算子内部数据、Watermark是串行处理的,当业务数据没有处理完是不会处理后面的数据,这也就是为什么我们通过上下文对象拿个当前的 Watermark 时都是会慢与当前数据,即使你在函数中等待 Watermark 触发定时也拿不到,因为此时 Watermark 还有没进入算子中没有被处理,算子在 processElement 中被阻塞了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/529651.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于自建靶场三层网络的内网渗透

注意&#xff1a;一切内容仅用于安全技术的分享&#xff0c;切勿用于其他用途&#xff0c;产生严重的后果与作者无关 前言介绍&#xff1a; 网络拓扑图&#xff1a; 为了方便起见&#xff0c;我在每个服务器放有webshell&#xff0c;这里主要是让我们熟悉sock代理的使用。 这…

修剪二叉搜索树

1题目 给你二叉搜索树的根节点 root &#xff0c;同时给定最小边界low 和最大边界 high。通过修剪二叉搜索树&#xff0c;使得所有节点的值在[low, high]中。修剪树 不应该 改变保留在树中的元素的相对结构 (即&#xff0c;如果没有被移除&#xff0c;原有的父代子代关系都应当…

10 常见网站安全攻击手段及防御方法

在某种程度上&#xff0c;互联网上的每个网站都容易遭受安全攻击。从人为失误到网络罪犯团伙发起的复杂攻击均在威胁范围之内。 网络攻击者最主要的动机是求财。无论你运营的是电子商务项目还是简单的小型商业网站&#xff0c;潜在攻击的风险就在那里。 知己知彼百战不殆&…

【一起撸个深度学习框架】6 折与曲的相会——激活函数

CSDN个人主页&#xff1a;清风莫追欢迎关注本专栏&#xff1a;《一起撸个DL框架》GitHub获取源码&#xff1a;https://github.com/flying-forever/OurDLblibli视频合集&#xff1a;https://space.bilibili.com/3493285974772098/channel/series 文章目录 6 折与曲的相会——激活…

史蒂夫·青木主题的 Game Jam

准备好潜入史蒂夫青木的脑海中&#xff0c;创造一个探索他内心思想的游戏吧&#xff01;史蒂夫青木主题 Game Jam 正式推出&#xff0c;这是一场为期两周的游戏制作比赛&#xff0c;鼓励参赛者创造和史蒂夫青木内心世界有关的游戏。 探索这位传奇艺术家和 DJ 潜意识&#xff0c…

nginx压测记录

nginx压测记录 1 概述2 原理3 环境3.1 设备与部署3.2 nginx配置/服务器配置 4 netty服务5 步骤6 结果7 写在最后 1 概述 都说nginx的负载均衡能力很强&#xff0c;最近出于好奇对nginx的实际并发能力进行了简单的测试&#xff0c;主要测试了TCP/IP层的长链接负载均衡 2 原理 …

Python 与数据科学实验(Exp9)

实验9 多分类手写数字识别实验 1.实验数据 &#xff08;1&#xff09;训练集 所给数据一共有42000张灰度图像&#xff08;分辨率为28*28&#xff09;&#xff0c;目前以train_data.csv文件给出. 图像内容涵盖了10个手写数字0-9。 图像示例如图所示&#xff1a; train_data.…

算法(一)—— 回溯(4)困难题

文章目录 1 37 解数独2 51 N 皇后 1 37 解数独 首先明确需要两个for循环&#xff0c;这样才可以遍历整个9*9的表。 此题数字的选取逻辑再次展现了回溯的暴力性。 此题需要拥有返回值&#xff0c;与数据结构&#xff08;六&#xff09;—— 二叉树&#xff08;5&#xff09;中…

物联网和云计算:如何将设备数据和云端服务相结合

第一章&#xff1a;引言 物联网和云计算是当今IT领域中的两个重要概念&#xff0c;它们的结合为企业和个人带来了巨大的机遇和挑战。物联网通过连接各种设备和传感器&#xff0c;实现了设备之间的互联互通&#xff0c;而云计算则提供了强大的计算和存储能力。本文将深入探讨如何…

MySQL学习(基础篇1.0)

MySQL概述&#xff08;基础&#xff09; SQL 全称Structured Query Language,结构化察浑语言。操作关系型数据库的编程语言&#xff0c;定义了一套操作关系型数据库的统一标准。 SQL通用语法 SQL语言的统统用语法&#xff1a; SQL语句可以单行或多行书写&#xff0c;以分号…

论文阅读|基于图神经网络的配电网故障定位方法

来源&#xff1a;北京交通大学硕士学位论文&#xff0c;2022 摘要 电网拓扑形态多样&#xff0c;重构场景频繁&#xff0c;&#xff0c;传统故障定位方法的单一阈值设定无法满足要求&#xff0c;基于人工智能的配电网故障定位技术具有很大的应用潜力&#xff0c;但仍存在着拓…

HTML概述及常用语法

什么是 HTML HTML 用来描述网页的一种语言 HTML -- hyper text markup language 超文本标记语言 超文本包括&#xff1a;文字、图片、音频、视频、动画等等 标记语言&#xff1a;是一套标记标签&#xff0c; HTML 使用标记标签来 描述 网页 <> HTML 发展史 HTML5 …

Web基础 ( 二 ) CSS

2.CSS 2.1.概念与基础 2.1.1.什么是CSS Cascading Style Sheets 全称层叠样式单 简称样式表。 是告诉浏览器如何来显示HTML的元素的特殊标记 2.1.2.编写方式 2.1.2.1.外部文件 在html文件的<head>中加入<link>结点来引入外部的文件 <link rel"stylesh…

Go Wails Docker图形界面管理工具 (5)

文章目录 1. 前言2. 效果图3. 代码 1. 前言 接上篇&#xff0c;本次添加Docker存储卷功能 待优化: 优化分页效果添加存储卷大小查看功能 2. 效果图 3. 代码 直接调用官方库 app.go func (a *App) VolumeList() ([]*volume.Volume, error) {resp, err : Cli.VolumeList(context…

Linux中关于时间修改的命令

目录 Linux中关于时间修改的命令 data命令 语法格式 示例 date命令中的参数以及作用 常用格式示例 timedatectl命令 语法格式 timedatectl 命令中的参数以及作用 常用格式 Linux中关于时间修改的命令 data命令 data --- 用于显示或设置系统的时间与日期 用户只需在强…

干货丨警惕!14个容易导致拒稿的常见错误

Hello,大家好&#xff01; 这里是壹脑云科研圈&#xff0c;我是喵君姐姐~ 从做研究、到写论文、再到投稿&#xff0c;每一步都是巨大的挑战。以下列举了一些在这些过程中可能导致拒稿的常见错误&#xff0c;希望能帮助大家避开。 01 格式问题 1.没有遵守投稿须知 期刊提供了…

oracle基于时间点恢复遇到ORA-10877错误

一次给客户进行基于时间点恢复的时候,出现报错ORA-10877,如下: 这里很奇怪,这个归档日志有的,当前全库的备份是05-14 23点的,所以应该是可以恢复的,检查一下alter日志: 这里报错,指定的时间scn不属于当前的incarnation,那么检查一下当前的incarnation: 这里当前的incarnation是…

Linux实操篇---常用的基本命令3(用户(组)管理命令、文件权限类、搜索查找类、压缩解压类)

一、用户管理命令 Linux是一个多用户&#xff0c;多任务的分时操作系统。甚至有可能同时登录&#xff0c;同时操作。所以给用户不同的账号。 useradd添加新用户 基本语法&#xff1a; 只能用root进行操作。 useradd 用户名 添加新用户 useradd -g 组名 用户名 添加新用…

MyBatis Plus 代码生成器

一、引入POM依赖 <dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.3.1</version></dependency><dependency><groupId>com.baomidou</groupId&g…

3ds Max云渲染平台哪个好?

3ds Max云渲染平台哪个好&#xff1f; 3ds Max是一款包含建模、动画、粒子动力学等强大功能的三维动画制作软件&#xff0c;3ds Max对特定如游戏建模、特效制作、产品模型设计等领域都具备了过硬的专业能力&#xff0c;同时3ds Max也是很多CGer青睐的CG软件。 作为支持3ds Ma…