【大数据】详解 Flink 中的 WaterMark

news2024/12/25 12:29:52

详解 Flink 中的 WaterMark

  • 1.基础概念
    • 1.1 流处理
    • 1.2 乱序
    • 1.3 窗口及其生命周期
    • 1.4 Keyed vs Non-Keyed
    • 1.5 Flink 中的时间
  • 2.Watermark
    • 2.1 案例一
    • 2.2 案例二
    • 2.3 如何设置最大乱序时间
    • 2.4 延迟数据重定向
  • 3.在 DDL 中的定义
    • 3.1 事件时间
    • 3.2 处理时间

1.基础概念

1.1 流处理

流处理,最本质的是在处理数据的时候,接受一条处理一条数据。

批处理,则是累积数据到一定程度在处理。这是他们本质的区别。

在设计上 Flink 认为数据是流式的,批处理只是流处理的特例。同时对数据分为有界数据和无界数据。

  • 有界数据对应批处理,API 对应 DateSet
  • 无界数据对应流处理,API 对应 DataStream

1.2 乱序

什么是乱序呢?

可以理解为数据到达的顺序和其实际产生时间的排序不一致。导致这的原因有很多,比如延迟,消息积压,重试等等。

我们知道,流处理从事件产生,到流经 source,再到 operator,中间是有一个过程和时间的。虽然大部分情况下,流到 operator 的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order 或者说 late element)。

✅ 某数据源中的某些数据由于某种原因(如:网络原因,外部存储自身原因)会有 5 秒的延时,也就是在实际时间的第 1 秒产生的数据有可能在第 5 秒中产生的数据之后到来(比如到 Window 处理节点)。例如,有 1 ~ 10 个事件,乱序到达的序列是:2, 3, 4, 5, 1, 6, 3, 8, 9, 10, 7。

1.3 窗口及其生命周期

对于 Flink,如果来一条消息计算一条,这样是可以的,但是这样计算是非常频繁而且消耗资源,如果想做一些统计这是不可能的。所以对于 Spark 和 Flink 都产生了窗口计算。

比如,因为我们想看到过去一分钟或过去半小时的访问数据,这时候我们就需要窗口。

  • Window:Window 是处理无界流的关键,Window 将流拆分为一个个有限大小的 buckets,可以在每一个 buckets 中进行计算。
  • 当 Window 是时间窗口的时候,每个 Window 都会有一个开始时间(start_time)和结束时间(end_time)(左闭右开),这个时间是系统时间。

简而言之,只要属于此窗口的第一个元素到达,就会创建一个窗口,当时间(事件或处理时间)超过其结束时间戳加上用户指定的允许延迟时,窗口将被完全删除。

窗口有如下组件:

  • Window Assigner:用来决定某个元素被分配到哪个或哪些窗口中去。
  • Trigger:触发器。决定了一个窗口何时能够被计算或清除。触发策略可能类似于 “当窗口中的元素数量大于 4” 时,或 “当水位线通过窗口结束时”。
  • Evictor:驱逐器。Evictor 提供了在使用 WindowFunction 之前或者之后从窗口中删除元素的能力。

窗口还拥有函数,比如 ProcessWindowFunctionReduceFunctionAggregateFunctionFoldFunction。该函数将包含要应用于窗口内容的计算,而触发器指定窗口被认为准备好应用该函数的条件。

1.4 Keyed vs Non-Keyed

在定义窗口之前,要指定的第一件事是流是否需要 Keyed,使用 keyBy(...) 将无界流分成逻辑的 keyed stream。如果未调用 keyBy(...),则表示流不是 keyed stream

  • 对于 Keyed 流,可以将传入事件的任何属性用作 key。拥有 keyed stream 将允许窗口计算由多个任务并行执行,因为每个逻辑 Keyed 流可以独立于其余任务进行处理。相同 Key 的所有元素将被发送到同一个任务。
  • 在 Non-Keyed 流的情况下,原始流将不会被分成多个逻辑流,并且所有窗口逻辑将由单个任务执行,即并行性为 1。

1.5 Flink 中的时间

Flink 在流处理程序支持不同的时间概念。分别为是 事件时间Event Time)、处理时间Processing Time)、提取时间Ingestion Time)。

从时间序列角度来说,发生的先后顺序是:事件时间提取时间处理时间

  • Event Time 是事件在现实世界中发生的时间,它通常由事件中的时间戳描述。
  • Ingestion Time 是数据进入 Apache Flink 流处理系统的时间,也就是 Flink 读取数据源时间。
  • Processing Time 是数据流入到具体某个算子 (消息被计算处理) 时候相应的系统时间。也就是 Flink 程序处理该事件时当前系统时间。

2.Watermark

Watermark 是 Apache Flink 为了处理 EventTime 窗口计算提出的一种机制,本质上也是一种时间戳。Watermark 是用于处理乱序事件或延迟数据的,这通常用 Watermark 机制结合 Window 来实现(Watermarks 用来触发 Window 窗口计算)。

2.1 案例一

public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrderness = 3000; // 3.0 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        // 生成 watermark
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

在这里插入图片描述
上图中是一个 10s 大小的窗口,1000020000 为一个窗口。当 EventTime 为 23000 的数据到来,生成的 WaterMark 的时间戳为 20000,大于等于 window_end_time,会触发窗口计算。

2.2 案例二

public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxTimeLag = 3000; // 3 seconds

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current time minus the maximum time lag
        return new Watermark(System.currentTimeMillis() - maxTimeLag);
    }
}

在这里插入图片描述
只是简单的用当前系统时间减去最大延迟时间生成 Watermark ,当 WaterMark 为 20000 时,大于等于窗口的结束时间,会触发 1000020000 窗口计算。再当 EventTime 为 19500 的数据到来,它本应该是属于窗口 1000020000 窗口的,但这个窗口已经触发计算了,所以此数据会被丢弃。

2.3 如何设置最大乱序时间

虽说水位线表明着早于它的事件不应该再出现,接收到水位线以前的的消息是不可避免的,这就是所谓的 迟到事件。实际上迟到事件是乱序事件的特例,和一般乱序事件不同的是它们的乱序程度超出了水位线的预计,导致窗口在它们到达之前已经关闭。

迟到事件出现时窗口已经关闭并产出了计算结果,因此处理的方法有 3 种:

  • 重新激活已经关闭的窗口并重新计算以修正结果。将迟到事件收集起来另外处理。将迟到事件视为错误消息并丢弃。Flink 默认的处理方式是直接丢弃,其他两种方式分别使用 Side OutputAllowed Lateness
  • Side Output 机制 可以将迟到事件单独放入一个数据流分支,这会作为 Window 计算结果的副产品,以便用户获取并对其进行特殊处理。
  • Allowed Lateness 机制 允许用户设置一个允许的最大迟到时长。Flink 会在窗口关闭后一直保存窗口的状态直至超过允许迟到时长,这期间迟到的事件不会被丢弃,而是默认会触发窗口重新计算。因为保存窗口状态需要额外内存,并且如果窗口计算使用了 ProcessWindowFunction API 还可能使得每个迟到事件触发一次窗口的全量计算,代价比较大,所以允许迟到时长不宜设得太长,迟到事件也不宜过多,否则应该考虑降低水位线提高的速度或者调整算法。

这里总结机制为:

  • 窗口 Window 的作用是为了周期性的获取数据。
  • WaterMark 的作用是防止数据出现乱序(经常),事件时间内获取不到指定的全部数据,而做的一种保险方法。
  • allowLateNess 是将窗口关闭时间再延迟一段时间。
  • sideOutPut 是最后兜底操作,所有过期延迟数据,指定窗口已经彻底关闭了,就会把数据放到侧输出流。
public class TumblingEventWindowExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
//        env.getConfig().setAutoWatermarkInterval(100);
        DataStream<String> socketStream = env.socketTextStream("localhost", 9999);
        DataStream<Tuple2<String, Long>> resultStream = socketStream
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(3)) {
                    @Override
                    public long extractTimestamp(String element) {
                        long eventTime = Long.parseLong(element.split(" ")[0]);
                        System.out.println(eventTime);
                        return eventTime;
                    }
                })
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        return Tuple2.of(value.split(" ")[1], 1L);
                    }
                })
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .allowedLateness(Time.seconds(2)) // 允许延迟处理2秒
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                    }
                });
        resultStream.print();
        env.execute();
    }
}

在这里插入图片描述
watermark 为 21000 时,触发了 [10000, 20000) 窗口计算,由于设置了 allowedLateness(Time.seconds(2)),即允许两秒延迟处理,watermark < window_end_time + lateTime 公式得到满足,因此随后 10000 和 12000 进入窗口时,依然能触发窗口计算;随后 watermark 增加到 22000,watermark < window_end_time + lateTime 不再满足,因此 11000 再次进入窗口时,窗口不再进行计算。

2.4 延迟数据重定向

流的返回值必须是 SingleOutputStreamOperator,其是 DataStream 的子类。通过 getSideOutput 方法获取延迟数据。可以将延迟数据重定向到其他流或者进行输出。

public class TumblingEventWindowExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        DataStream<String> socketStream = env.socketTextStream("localhost", 9999);
        //保存被丢弃的数据
        OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data"){};
        //注意,由于getSideOutput方法是SingleOutputStreamOperator子类中的特有方法,所以这里的类型,不能使用它的父类dataStream。
        SingleOutputStreamOperator<Tuple2<String, Long>> resultStream = socketStream
                // Time.seconds(3)有序的情况修改为0
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(3)) {
                    @Override
                    public long extractTimestamp(String element) {
                        long eventTime = Long.parseLong(element.split(" ")[0]);
                        System.out.println(eventTime);
                        return eventTime;
                    }
                })
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        return Tuple2.of(value.split(" ")[1], 1L);
                    }
                })
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .sideOutputLateData(outputTag) // 收集延迟大于2s的数据
                .allowedLateness(Time.seconds(2)) //允许2s延迟
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                    }
                });
        resultStream.print();
        //把迟到的数据暂时打印到控制台,实际中可以保存到其他存储介质中
        DataStream<Tuple2<String, Long>> sideOutput = resultStream.getSideOutput(outputTag);
        sideOutput.print();
        env.execute();
    }
}

3.在 DDL 中的定义

3.1 事件时间

事件时间属性是通过 CREATE TABLE DDL 语句中的 WATERMARK 语句定义的。水印语句在现有事件时间字段上定义 水印生成表达式,将事件时间字段标记为事件时间属性。

Flink SQL 支持在 TIMESTAMPTIMESTAMP_LTZ 列上定义事件时间属性。如果源中的时间戳数据以 年-月-日-时-分-秒 表示,通常是不含时区信息的字符串值,例如 2020-04-15 20:13:40.564,建议将事件-时间属性定义为 TIMESTAMP 列。

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- Declare the user_action_time column as an event-time attribute
  -- and use a 5-seconds-delayed watermark strategy.
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

如果数据源中的时间戳数据以纪元时间表示,通常是一个长值,例如 1618989564564,建议将事件时间属性定义为 TIMESTAMP_LTZ 列。

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  ts BIGINT,
  time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  -- Declare the time_ltz column as an event-time attribute
  -- and use a 5-seconds-delayed watermark strategy.
  WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);

3.2 处理时间

处理时间能让表格程序根据本地机器的时间产生结果。这是最简单的时间概念,但会产生非确定性结果。处理时间不需要提取时间戳或生成水印。

CREATE TABLE DDL 语句中,使用系统 PROCTIME() 函数将处理时间属性定义为计算列。函数返回类型为 TIMESTAMP_LTZ

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  -- Declare an additional field as a processing-time attribute.
  user_action_time AS PROCTIME()
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1415914.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

1.【Vue3】前端开发引入、Vue 简介

1. 前端开发引入 1.1 前端开发前置知识 通过之前的学习&#xff0c;已经通过 SpringBoot 和一些三方技术完成了大事件项目的后端开发。接下来开始学习大事件项目的前端开发&#xff0c;前端部分借助两个框架实现&#xff1a; Vue3&#xff08;一个 JS 框架&#xff09;基于 …

Vue-Router: 如何使用路由元信息来管理路由?

Vue-Router是Vue.js官方的路由管理器&#xff0c;它可以帮助我们快速构建单页应用程序&#xff08;SPA&#xff09;。除了常见的路由功能外&#xff0c;Vue-Router还支持使用路由元信息来管理和控制路由。路由元信息是可以附加到路由上的自定义属性&#xff0c;它可以帮助我们实…

LandrayOA内存调优 / JAVA内存调优 / Tomcat web.xml 超时时间调优实战

目录 一、背景说明 二、LandrayOA / Tomcat 内存调优 2.1 \win64\tomcat\conf\web.xml 文件调优 2.2 \win64\tomcat\bin\catalina64.bat 文件调优 一、背景说明 随着系统的使用时间越来越长&#xff0c;数据量越多&#xff0c;发现系统的有些功能越来越慢&…

在腾讯云上部署幻兽帕鲁,实现游戏自由!

在帕鲁的世界&#xff0c;你可以选择与神奇的生物「帕鲁」一同享受悠闲的生活&#xff0c;也可以投身于与偷猎者进行生死搏斗的冒险。帕鲁可以进行战斗、繁殖、协助你做农活&#xff0c;也可以为你在工厂工作。你也可以将它们进行售卖&#xff0c;或肢解后食用。引用自&#xf…

第17章_反射机制(理解Class类并获取Class实例,类的加载与ClassLoader的理解,反射的基本应用,读取注解信息,体会反射的动态性)

文章目录 第17章_反射机制本章专题与脉络1. 反射(Reflection)的概念1.1 反射的出现背景1.2 反射概述1.3 Java反射机制研究及应用1.4 反射相关的主要API1.5 反射的优缺点 2. 理解Class类并获取Class实例2.1 理解Class2.1.1 理论上2.1.2 内存结构上 2.2 获取Class类的实例(四种方…

Linux系统优化要义

这里不敢说 linux优化奥义&#xff0c;主要是本文比较浅显&#xff0c;适合普通开发相关人员去读 linux作为服务器系统的王者&#xff0c;以稳定性著称&#xff0c;但对于不同的“应用场景”&#xff0c;相关配置还需调整&#xff0c;才能保证业务稳定性。以下是相关总结 IO优…

函数入门.

函数入门 1. 初识函数2. 函数的参数2.1 参数2.2 默认参数2.3 动态参数 3. 函数返回值总结作业 1. 初识函数 函数到底是个什么东西&#xff1f; 函数&#xff0c;可以当做是一大堆功能代码的集合。 def 函数名():函数内编写代码......函数名()例如&#xff1a; # 定义名字叫in…

Linux 驱动开发基础知识—— 具体单板的 LED 驱动程序(五)

个人名片&#xff1a; &#x1f981;作者简介&#xff1a;一名喜欢分享和记录学习的在校大学生 &#x1f42f;个人主页&#xff1a;妄北y &#x1f427;个人QQ&#xff1a;2061314755 &#x1f43b;个人邮箱&#xff1a;2061314755qq.com &#x1f989;个人WeChat&#xff1a;V…

THM学习笔记——john

John the Ripper是目前最好的哈希破解工具之一。 John基本语法&#xff1a; john [options] [path to file] john&#xff1a;调用John the Ripper程序。 [path to file]&#xff1a;包含你要尝试破解的哈希的文件&#xff0c;如果它们在同一个目录中&#xff0c;你就不需要命名…

S275 4G网络IO模块:智能酒店的理想选择

行业背景 随着物联网技术的发展&#xff0c;酒店服务也变得更加“智能”——自动灯光效果、室内温湿度控制、各种人性化操作等贴心服务&#xff0c;带给顾客真正的宾至如归之感。 同时&#xff0c;智慧酒店更为管理者提供了高效的管理手段&#xff0c;将酒店物耗、能耗、人员…

CSS探索浏览器兼容性

学习如何探索浏览器的兼容性对于编写跨浏览器兼容的CSS代码非常重要。以下是一些学习CSS兼容性的方法&#xff1a; MDN文档&#xff1a;Mozilla开发者网络&#xff08;MDN&#xff09;提供了广泛而详细的CSS文档&#xff0c;其中包含有关CSS属性、选择器和功能的信息。在MDN上…

解决 PDF.js v2.3.200 (build: 4ae3f9fc) 信息:PDFDocument: Stream must have data

文章标题 问题描述&#xff1a;思考分析&#xff1a;解决方案&#xff1a;参考资料 问题描述&#xff1a; 项目中使用PDF.js去预览已上传的附件文件时&#xff0c;加载PDF文件的时候报了以下的错误 错误信息如下 PDF.js v2.3.200 (build: 4ae3f9fc) 信息&#xff1a;PDFDocu…

基于QC-LDPC编码的循环移位网络的FPGA实现

一、桶式移位寄存器(barrel shifter) 八位桶式移位寄存器的VHDL实现如下&#xff0c;由于每一层结构相似&#xff0c;于是采用生成语句for_generate实现&#xff0c;使用该代码实现的RTL级分析和理论的结构一致&#xff0c;仿真结果也符合预期。 entity barrel_shift isGENE…

降低文件增长和失真的零系数JPEG图像可逆信息隐藏

一、研究概述和意义 随着多媒体技术和网络的迅猛发展&#xff0c;数字媒体的应用越来越广泛。在网络上传输的数字 媒体如音频、文本、视频和图像的内容及数据安全问题也随之而来。伴随着计算机技术发展与普及&#xff0c;数字媒体的机密性、完整性经常受到非法活动的威胁&…

OJ_完数和盈数

题干 c实现 #include<stdio.h> #include<vector> using namespace std;int IsWanOrYing(int a) {int sum 1;for (int i 2; i < a; i) {if (a % i 0) {sum i;}}if (sum a) {return 1;}else if (sum > a) {return 2;}return -1; }int main() {vector<…

DjangoURL调度器(二)

一、默认值与额外参数 1.1、默认值 1.1.1、urls.py from django.urls import pathfrom . import viewsurlpatterns [# http://127.0.0.1:8000/polls/blog/ 等同于 # http://127.0.0.1:8000/polls/blog/1/path(blog/, views.page),# http://127.0.0.1:8000/polls/blo…

【鸿蒙】大模型对话应用(二):对话界面设计与实现

Demo介绍 本demo对接阿里云和百度的大模型API&#xff0c;实现一个简单的对话应用。 DecEco Studio版本&#xff1a;DevEco Studio 3.1.1 Release HarmonyOS SDK版本&#xff1a;API9 关键点&#xff1a;ArkTS、ArkUI、UIAbility、网络http请求、列表布局、层叠布局 对话页…

Java笔记 --- 一、双列集合

一、双列集合 双列集合的特点 Map 创建Map对象时&#xff0c;要规定键和值的泛型 Map是一个接口&#xff0c;不能直接创建&#xff0c;要创建实例化对象 Map的遍历 通过键找值 先获取到键的对象&#xff0c;并放到一个单列集合中&#xff08;map.KeySet()方法&#xff09;…

如何预防服务器IP被劫持,危害有什么?

服务器IP被劫持是一种严重的网络安全问题&#xff0c;攻击者通过篡改服务器的IP地址&#xff0c;将网络流量重定向到恶意服务器或网站&#xff0c;导致用户无法正常访问目标服务器&#xff0c;并可能面临数据泄露、恶意软件感染等安全风险。了解服务器IP被劫持的危害和预防措施…

【数据结构与算法】6.栈

&#x1f4da;博客主页&#xff1a;爱敲代码的小杨. ✨专栏&#xff1a;《Java SE语法》 ❤️感谢大家点赞&#x1f44d;&#x1f3fb;收藏⭐评论✍&#x1f3fb;&#xff0c;您的三连就是我持续更新的动力❤️ &#x1f64f;小杨水平有限&#xff0c;欢迎各位大佬指点&…