Flink时间语义和时间窗口

news2024/11/28 2:31:30

前言

在实际的流计算业务场景中,我们会发现,数据和数据的计算往往都和时间具有相关性。

举几个例子:

  • 直播间右上角通常会显示观看直播的人数,并且这个数字每隔一段时间就会更新一次,比如10秒。
  • 电商平台的商品列表,会显示商品过去24小时的销量、或者总销量
  • 阅读CSDN博客会显示总的阅读量,并且会持续更新

归纳总结可以发现,这些和时间相关的数据计算可以统一用一个计算模型来描述:每隔一段时间,计算过去一段时间内的数据,并输出结果。这个计算模型,就是时间窗口。

时间窗口类型

时间窗口计算模型具备三个重要的属性:

  • 时间窗口的计算频次,即 隔多久计算一次
  • 时间窗口的大小,即 计算过去多久的数据
  • 时间窗口内数据的处理逻辑

举例来说,每隔1分钟计算商品过去24小时的销量。时间窗口的计算频次就是1分钟,时间窗口的大小是24小时,窗口数据的处理逻辑是 对商品销量求和。

Flink 提供了三种时间窗口的类型

滚动窗口(Tumble Window)

滚动窗口的特点是:时间窗口大小和计算频次相同!

顾名思义,滚动窗口就像一个车轮一样滚滚向前,因为窗口大小和计算频次相同,所以窗口是紧密相连的,窗口内的数据不会重复计算。

举个例子,每隔1分钟计算商品过去1分钟的销量。

如下示例程序,每隔5秒计算过去5秒的订单销售额:

public class TumblingWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.addSource(new SourceFunction<Order>() {
                    @Override
                    public void run(SourceContext<Order> sourceContext) throws Exception {
                        while (true) {
                            Threads.sleep(1000);
                            Order order = Order.mock();
                            sourceContext.collectWithTimestamp(order, order.createTime);
                            sourceContext.emitWatermark(new Watermark(System.currentTimeMillis()));
                        }
                    }

                    @Override
                    public void cancel() {

                    }
                }).keyBy(i -> i.itemId)
                .window(TumblingEventTimeWindows.of(Duration.ofSeconds(5L)))
                .sum("orderAmount")
                .print();
        environment.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        public String itemId;
        public long orderAmount;
        public long createTime;

        static Order mock() {
            return new Order("001", ThreadLocalRandom.current().nextLong(100), System.currentTimeMillis());
        }
    }
}

这里采用滚动窗口计算模型,窗口大小和计算频次均是5秒,运行作业后,控制台会每隔5秒输出一次总销售额

1> TumblingWindow.Order(itemId=001, orderAmount=250, createTime=1722344630342)
1> TumblingWindow.Order(itemId=001, orderAmount=270, createTime=1722344635388)
1> TumblingWindow.Order(itemId=001, orderAmount=147, createTime=1722344640407)
1> TumblingWindow.Order(itemId=001, orderAmount=253, createTime=1722344645430)
......

滑动窗口(Sliding Window)

滑动窗口的特点是:时间窗口大小和计算频次不相同,如果窗口大小大于计算频次,就会导致数据被重复计算;如果窗口大小小于计算频次,就会导致数据被漏计算;如果二者相等,那就是滚动窗口了。

举个例子,每隔1分钟计算商品过去1小时的销量。窗口大小为1小时,计算频次为1分钟,因此数据会被重复计算多次。

如下示例程序,每隔1秒计算过去5秒的订单销售额,部分订单会被重复计算多次:

public class SlidingWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.addSource(new SourceFunction<TumblingWindow.Order>() {
                    @Override
                    public void run(SourceContext<TumblingWindow.Order> sourceContext) throws Exception {
                        while (true) {
                            Threads.sleep(1000);
                            TumblingWindow.Order order = TumblingWindow.Order.mock();
                            sourceContext.collectWithTimestamp(order, order.createTime);
                            sourceContext.emitWatermark(new Watermark(System.currentTimeMillis()));
                        }
                    }

                    @Override
                    public void cancel() {

                    }
                }).keyBy(i -> i.itemId)
                .window(SlidingEventTimeWindows.of(Duration.ofSeconds(5L), Duration.ofSeconds(1L)))
                .sum("orderAmount")
                .print();
        environment.execute();
    }
}

作业运行后,控制台每秒会输出一次过去5秒的销售额。

会话窗口(Session Window)

会话窗口的窗口大小和计算频次非常灵活,可以动态改变,每次都不一样。当窗口隔一段时间没有接收到新的数据,Flink就认为会话可以关闭并计算了,等下一次有新的数据进来,就会开启一个新的会话。这里的“隔一段时间”就是值会话窗口的间隔(Gap),这个间隔可以固定设置也可以动态设置。

举个例子,读书类APP都会有的一个功能,就是统计用户的阅读时长。用户必须有持续的动作,APP才会认为用户是真的在阅读,反之用户长时间没有操作,APP会认为用户已经离开,此时不会再统计阅读时长。

如下示例,随机5秒内模拟一次用户行为,会话窗口间隔设置为3秒,超过3秒认为用户离开,关闭窗口并统计用户阅读时长。

public class SessionWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.addSource(new SourceFunction<UserAction>() {
                    @Override
                    public void run(SourceContext<UserAction> ctx) throws Exception {
                        while (true) {
                            UserAction userAction = UserAction.mock();
                            ctx.collectWithTimestamp(userAction, userAction.time);
                            ctx.emitWatermark(new Watermark(System.currentTimeMillis()));
                            // 随机5秒内 用户才会有新的操作
                            Threads.sleep(ThreadLocalRandom.current().nextLong(0L, 5000L));
                        }
                    }

                    @Override
                    public void cancel() {

                    }
                })
                // 超过三秒没有收到用户新的动作,认为用户离开,关闭窗口并计算
                .windowAll(EventTimeSessionWindows.withGap(Duration.ofSeconds(3L)))
                .aggregate(new AggregateFunction<UserAction, UserReadingTime, UserReadingTime>() {
                    @Override
                    public UserReadingTime createAccumulator() {
                        return new UserReadingTime();
                    }

                    @Override
                    public UserReadingTime add(UserAction userAction, UserReadingTime userReadingTime) {
                        // 记录窗口内的用户阅读开始和结束时间
                        userReadingTime.userId = userAction.userId;
                        if (userReadingTime.startTime == 0L) {
                            userReadingTime.startTime = userAction.time;
                        }
                        userReadingTime.endTime = userAction.time;
                        return userReadingTime;
                    }

                    @Override
                    public UserReadingTime getResult(UserReadingTime userReadingTime) {
                        return userReadingTime;
                    }

                    @Override
                    public UserReadingTime merge(UserReadingTime userReadingTime, UserReadingTime acc1) {
                        return null;
                    }
                }).addSink(new SinkFunction<UserReadingTime>() {
                    @Override
                    public void invoke(UserReadingTime value, Context context) throws Exception {
                        System.err.println("用户" + value.userId + " 阅读了 " + (value.endTime - value.startTime) + " ms");
                    }
                });
        environment.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class UserAction {
        public Long userId;
        public long time;

        public static UserAction mock() {
            return new UserAction(1L, System.currentTimeMillis());
        }
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class UserReadingTime {
        public Long userId;
        public long startTime;
        public long endTime;
    }
}

运行Flink作业,控制台随机输出用户的阅读时长

用户1 阅读了 3240 ms
用户1 阅读了 9414 ms
用户1 阅读了 138 ms
用户1 阅读了 2960 ms

时间语义

时间语义和时间窗口息息相关。

Flink 提供了三种不同的时间语义,分别是:处理时间、事件时间、摄入时间。

在不同的时间语义下,针对同样的数据,Flink 分配的时间窗口是不一样的。

举个例子,我们要统计某个商品过去1分钟的销量,这是个典型的一分钟大小的时间窗口。用户在 09:00:50 下了一笔订单,中间由于网络延时等原因,Flink 在 09:01:01 才收到这笔订单数据,恰巧此时 Flink 因为自身作业压力宕机崩溃,在 09:02:10 才恢复作业,该笔订单数据随即被 keyBy 分组发送给下游算子处理。

这个例子中的三个时间点,刚好对应了 Flink 的三种时间语义:

  • 事件时间:事件发生的时间,通常数据本身会携带一个时间戳,即例子中的 09:00:50
  • 摄入时间:Flink 数据源接收数据的subTask算子本地时间,即例子中的 09:01:01
  • 处理时间:Flink 算子处理数据的机器本地时间,即例子中的 09:02:10

事件时间

事件时间是最常用的,在事件时间语义下,数据本身通常会携带一个时间戳,Flink 会根据该时间戳为数据分配正确的时间窗口。

因为事件时间是不会改变的,所以在事件时间语义下,Flink 窗口计算的结果始终是一致的,数据是清晰明确的。

但是,事件时间语义 会带来另一个问题。事件的产生是顺序的,但是数据在传输过程中,可能会因为网络拥塞等种种原因,到达 Flink 时乱序了。此时,Flink 如何处理这些乱序数据就是个麻烦事儿了。

举个例子,还是统计商品过去1分钟的销量,Flink 先是接收到事件时间为 09:00:30 的订单数据,此时将其分配到 [09:00,09:01] 窗口缓存起来,接着接收到了 09:01:30 的订单数据,此时 [09:00,09:01] 窗口可以关闭并计算了吗?显然不能,因为数据乱序到达的原因,谁也不能保证 Flink 待会不会收到 09:00 分钟产生的订单。

那怎么办呢?[09:00,09:01] 窗口总不能一直不关闭吧。为了解决这个问题,Flink 引入了 Watermark 机制,这里不做介绍。

使用事件时间对应的窗口分配器是:

  • TumblingEventTimeWindows 基于事件时间的滚动窗口
  • SlidingEventTimeWindows 基于事件时间的滑动窗口
  • EventTimeSessionWindows 基于事件时间的会话窗口

如下示例,每秒生成一个带时间戳的随机数,数据用 Flink 自带的 Tuple2 封装,同时用 TumblingEventTimeWindows 让 Flink 基于事件时间语义来分配 5秒 的滚动窗口。运行 Flink 作业,控制台每隔5秒会输出前5秒的随机数之和。

public class TumblingWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.addSource(new SourceFunction<Tuple2<Long, Long>>() {
                    @Override
                    public void run(SourceContext<Tuple2<Long, Long>> sourceContext) throws Exception {
                        while (true) {
                            Threads.sleep(1000);
                            // f0是随机数 f1是时间戳
                            Tuple2<Long, Long> tuple2 = new Tuple2<>(ThreadLocalRandom.current().nextLong(100), System.currentTimeMillis());
                            sourceContext.collectWithTimestamp(tuple2, tuple2.f1);
                            sourceContext.emitWatermark(new Watermark(System.currentTimeMillis()));
                        }
                    }

                    @Override
                    public void cancel() {

                    }
                }).windowAll(TumblingEventTimeWindows.of(Duration.ofSeconds(5L)))
                .sum(0)
                .print();
        environment.execute();
    }
}

控制台输出

// subTask任务ID 数字和 时间戳
19> (108,1722432788302)
20> (308,1722432790305)
21> (324,1722432795346)

总结一下,如果业务要按照事件发生的时间计算结果或分析数据,那么只能选事件时间语义。通常情况下,事件时间也确实更有价值。例如,利用Flink分析用户的行为日志,用户具体在什么时间点做了哪些行为,会更有分析价值,至于 Flink 是什么时候处理这些日志的,对业务方来说并不重要。因为事件时间具有不变性,所以基于事件时间统计的结果总是清晰明确的,缺点是数据到达Flink是乱序的,处理迟到数据会给Flink带来一定的压力。

摄入时间

摄入时间是指数据到达 Flink Source 算子的本地机器时间,它为处理数据流提供了一种相对简单而直观的时间参考,算是在 事件时间 和 处理时间 中间做了一个折中。

摄入时间具备一定的优势。一方面,它避免了事件时间的乱序问题,相较于事件时间具备更高的处理效率;另一方面,相较于处理时间而言,它具备不变性,计算产生的结果也会更加准确。

摄入时间适用于那些对时间精度要求不是特别高,但又希望时间能够相对反映数据进入系统先后顺序的场景。

如下示例,使用摄入时间语义计算过去5秒窗口生成的随机数之和。因为用的是摄入时间,所以无须发送 Watermark,数据本身也无须携带时间戳。

public class IngestionTimeFeature {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // 采用摄入时间语义
        environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        environment.addSource(new SourceFunction<Tuple1<Long>>() {
                    @Override
                    public void run(SourceContext<Tuple1<Long>> sourceContext) throws Exception {
                        while (true) {
                            Threads.sleep(1000);
                            sourceContext.collect(new Tuple1<>(ThreadLocalRandom.current().nextLong(100)));
                        }
                    }

                    @Override
                    public void cancel() {

                    }
                }).keyBy(IN -> "all").timeWindow(Time.of(5L, TimeUnit.SECONDS))
                .sum(0)
                .print();
        environment.execute();
    }
}

处理时间

处理时间语义是指数据实际被处理的时间,也就是数据到达Window算子时subTask机器的本地时间。

因为 处理时间语义 完全依靠算子的机器本地时间,所以时间窗口在划分数据和触发计算,都只需要依靠本地时间来驱动,性能是最好的,延迟低,适用于对高性能和延迟敏感的业务。

同样的,处理时间语义也有它的劣势。因为采用的是subTask算子的本地时间,所以数据的时间其实是具备不确定性的。举个例子,订单数据在 09:00:01 被算子接收,它会被分配到 [09:00,09:01]窗口,假设此时该subTask作业故障宕机,等到 09:10:00 才恢复,Flink 重新消费这条数据,它又会被分配到 [09:10,09:11] 窗口,产出的数据就会不一致。因此在使用处理时间语义时,要保证业务方能接受这种因为异常情况导致的计算结果不符合预期的场景。

如下示例,采用处理时间语义,因为是采用subTask本地时间,所以同样也不需要发送 Watermark。

public class ProcessTimeFeature {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // 采用处理时间语义
        environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        environment.addSource(new SourceFunction<Tuple1<Long>>() {
                    @Override
                    public void run(SourceContext<Tuple1<Long>> sourceContext) throws Exception {
                        while (true) {
                            Threads.sleep(1000);
                            sourceContext.collect(new Tuple1<>(ThreadLocalRandom.current().nextLong(100)));
                        }
                    }

                    @Override
                    public void cancel() {

                    }
                }).windowAll(TumblingProcessingTimeWindows.of(Duration.ofSeconds(5L)))
                .sum(0)
                .print();
        environment.execute();
    }
}

尾巴

Flink 具有丰富的时间语义,包括事件时间、处理时间和摄入时间。事件时间基于数据本身携带的时间戳,处理时间基于系统处理数据的本地时钟,摄入时间则是数据进入 Flink Source算子的时间。

时间窗口是 Flink 处理流式数据的重要方式,Flink 提供了 滚动窗口、滑动窗口、会话窗口 三种窗口类型。滚动窗口有固定大小且不重叠,滑动窗口大小固定且可重叠,会话窗口根据数据间隔来划分。合理选择时间语义和时间窗口,能更准确有效地处理和分析流式数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2219163.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

算法笔记day05

目录 1.最小公倍数 2.最长连续的子序列 3.字母收集 1.最小公倍数 求最小公倍数_牛客题霸_牛客网 算法思路&#xff1a; 这就是一道数学题&#xff0c;a,b的最小公倍数 a * b / 最大公约数。 使用辗转相除法&#xff0c;求a&#xff0c;b的最大公约数。 #include <iostre…

比亚迪车机安装第三方应用教程

比亚迪车机安装第三方应用教程 比亚迪车机U盘安装APP&#xff0c; 无论是dlink3.0还是4.0都是安卓系统&#xff0c;因此理论上安卓应用是都可以安装的&#xff0c;主要就是横屏和竖屏的区别。在比亚迪上安装软件我主要推荐两种方法。 第一种&#xff0c;直接从电脑端下载安装布…

一次使用LD_DEBUG定位问题的经历

在实际工作中&#xff0c;当遇到段错误&#xff0c;我们会很容易的想到这是非法访问内存导致的&#xff0c;比如访问了已经释放的内存&#xff0c;访问数据越界&#xff0c;尝试写没有写权限的内存等。使用gdb进行调试&#xff0c;查看出异常的调用栈&#xff0c;往往可以定位到…

RTThread-Nano学习二-RT-Thread启动流程

一、简介 上一章&#xff0c;我们已经了解了如何通过MDK来移植RTT&#xff0c;不熟悉的可以看如下链接&#xff1a;RTThread-Nano学习一-基于MDK移植-CSDN博客本章我们就来继续了解一下&#xff0c;RTT的启动流程。 二、启动流程 官方给了一幅非常清晰的启动流程图&am…

11.学生成绩管理系统(Java项目基于SpringBoot + Vue)

目录 1.系统的受众说明 2 总体设计 2.1 需求概述 2.2 软件结构 3 模块设计 3.1 模块基本信息 3.2 功能概述 3.3 算法 3.4 模块处理逻辑 4 数据库设计 4.1 E-R图 4.2 表设计 4.2.1 管理员信息表 4.2.2 课程基本信息表 4.2.3 课程扩展信息表 4.2.4 专业信…

Cuda By Example - 8 (性能测量)

时间戳记录API 使用constant内存&#xff0c;究竟带来多少性能提升&#xff0c;如何尽可能精确的测量GPU完成某项任务所花的时间&#xff1f;CUDA提供了cudaEvent_t 以及 CUDA event API来做运行时间的测量。 cudaError_t cudaEventCreate(cudaEvent_t *event); cudaError_t c…

架构设计笔记-22-论文

1.论企业应用系统的数据持久层架构设计 2.论企业信息化规划的实施与应用 3.论企业应用系统的分层架构风格 4.论分布式存储架构系统设计 5.论云原生架构及其应用 6.论企业集成架构设计及应用 7.论数据湖技术及其应用 8.论系统安全架构设计及其应用 9.论企业集成平台的理解与应用…

【双指针算法】快乐数

1.题目解析 2.算法分析 由图可知&#xff0c;不管是最后可以变成1的还是不可以变成1的都相当于形成环了&#xff0c;只是成环处值不一样 问题转变成&#xff0c;判断链表是否有环 采用双指针&#xff0c;快慢指针算法 1.定义快慢指针2.慢指针每次向后移动一步&#xff0c;快…

ES-入门-javaApi-文档-新增-删除

新增指定索引的文档数据的代码如下&#xff1a; package com.atgulgu.es.test;import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.http.HttpHost; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRe…

UNI VFX Missiles Explosions for Visual Effect Graph

Unity URP和HDRP的通用视觉效果 使用在视觉效果图中制作的高性能GPU粒子系统。 无需进入视觉效果图编辑器即可轻松自定义VFX。 使用(VFX)事件——一个游戏对象可存储多个效果,这些效果可通过C#或视觉脚本触发。 总共32个事件(不包括“停止”事件)。 ❓ 什么是(VFX)事件?…

STM32Cubemx 配置ADC(HAL库)

一、ADC几种模式 1、扫描模式&#xff1a; 使用STM32CUBEMX配置了多通道后&#xff0c;这一项默认开启且无法设置成关闭。这个模式就是自动扫描你开启的所有通道进行转换&#xff0c;直至转换完。例如你开启了CH0、CH1、CH2、CH3这四个通道&#xff0c;启动转换后ADC会自动将这…

动态规划原理及算法题(1)

课程规划会分为四个阶段进行&#xff1a; 1.题目解析 2.讲解算法原理(动态规划的原理) 3.编写代码 4.空间优化 1. 第 N 个泰波那契数&#xff08;easy&#xff09; 泰波那契数相当于斐波那契数的孪生兄弟&#xff0c;是它的加强版。 1.题目解析 2.讲解算法原理 如果用动态规…

Java中的一些名词概念

**函数式接口:** 概念&#xff1a;一个接口中的抽象方法只有一个&#xff0c;那么这个接口就是一个函数式接口。形参: 形参变量是**功能函数里的变量**&#xff0c;只有<u>在被调用的时候才分配内存单元</u>&#xff0c;<u>调用结束后立即释放</u>。…

Unity使用Git及GitHub进行项目管理

git: 工作区,暂存区(存放临时要存放的内容),代码仓库区1.初始化 git init 此时展开隐藏项目,会出现.git文件夹 2.减小项目体积 touch .gitignore命令 创建.gitignore文件夹 gitignore文件夹的内容 gitignore中添加一下内容 # This .gitignore file should be place…

Java项目-基于Springboot的应急救援物资管理系统项目(源码+说明).zip

作者&#xff1a;计算机学长阿伟 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、ElementUI等&#xff0c;“文末源码”。 开发运行环境 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringBoot、Vue、Mybaits Plus、ELementUI工具&#xff1a;IDEA/…

Ubuntu20.04编译安卓aosp 15源码编译到模拟器运行

背景&#xff1a; aosp15也开始悄悄在各个手机厂商开始酝酿了&#xff0c;感叹时间很快&#xff0c;今天也准备针对aosp15进行一下源码环境的搭建&#xff0c;整体aosp15的搭建和13/14其实没啥大的差别&#xff0c;只不过在lunch目标这个地方确实很大不同&#xff0c;还有就是…

HCIP-HarmonyOS Application Developer 习题(十四)

&#xff08;多选&#xff09;1、HarmonyOs为应用提供丰富的Al(Artificial Intelligence)能力&#xff0c;支持开箱即用。下列哪些是它拥有的AI能力? A、通用文字识别 B、词性标注 C、实体识别 D、语音播报 答案&#xff1a;ABCD 分析&#xff1a; AI能力简介二维码生成根据开…

工业级三防平板在工厂极端环境下保障稳定运行

在现代工业环境中&#xff0c;尤其是在工厂车间&#xff0c;设备和技术的稳定性直接关系到生产效率与产品质量。然而&#xff0c;极端的工作条件常常给电子设备的使用带来了不小的挑战。为此&#xff0c;市场上出现了一种专为工业应用设计大尺寸手持三防平板电脑。这种设备以其…

2024年十大前沿图像分割模型汇总:工作机制、优点和缺点介绍

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…

antd vue 输入框高亮设置关键字

<highlight-textareaplaceholder"请输入主诉"type"textarea"v-model"formModel.mainSuit":highlightKey"schema.componentProps.highlightKey"></highlight-textarea> 参考链接原生input&#xff0c;textarea demo地址 …