【Flink】Flink 中的时间和窗口之窗口其他API的使用

news2024/9/27 12:17:07

1. 窗口的其他API简介

对于一个窗口算子而言,窗口分配器和窗口函数是必不可少的。除此之外,Flink 还提供了其他一些可选的 API,可以更加灵活地控制窗口行为。

1.1 触发器(Trigger)

触发器主要是用来控制窗口什么时候触发计算。所谓的"触发计算"本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。

基于WindowsStream调用.trigger()方法,参数传入一个自定义的窗口触发器(Trigger)。

stream.keyBy(...)
 .window(...)
 .trigger(new MyTrigger())

Trigger 是窗口算子的内部属性,每个窗口分配器(WindowAssigner)都会对应一个默认的触发器;对于 Flink 内置的窗口类型,它们的触发器都已经做了实现。例如,所有事件时间窗口,默认的触发器都是 EventTimeTrigger;类似还有 ProcessingTimeTriggerCountTrigger。所以一般情况下是不需要自定义触发器的。

Trigger是一个抽象类,自定义时必须实现以下四个抽象方法:

  • onElement():窗口中每到来一个元素,都会调用这个方法。
  • onEventTime():当注册的事件时间定时触发时,将调用这个方法。
  • onProcessingTime ():当注册的处理时间定时器触发时,将调用这个方法。
  • clear():当窗口关闭销毁时,调用这个方法。一般用来清除自定义的状态。

除了 clear()比较像生命周期方法,其他三个方法其实都是对某种事件的响应。onElement()是对流中数据元素到来的响应;而另两个则是对时间的响应。这些方法参数中都有一个“触发器上下文”(TriggerContext)对象,可以用来注册定时器回调(callback)。对于时间窗口(TimeWindow)而言,就是在窗口的结束时间设定了一个定时器,这样到时间就可以触发窗口的计算输出了。

另外这三个方法的返回类型都是 TriggerResult,这是一个枚举类型(enum),其中定义了对窗口进行操作的四种类型

  • CONTINUE(继续):什么都不做
  • FIRE(触发):触发计算,输出结果
  • PURGE(清除):清空窗口中的所有数据,销毁窗口
  • FIRE_AND_PURGE(触发并清除):触发计算输出结果,并清除窗口

Trigger 除了可以控制触发计算,还可以定义窗口什么时候关闭(销毁)。并且TriggerResult的返回结果可以让计算输出结果和关闭窗口分开执行。

示例:
在日常业务场景中,我们经常会开比较大的窗口来计算每个窗口的pv 或者 uv 等数据。但窗口开的太大,会使我们看到计算结果的时间间隔变长。所以我们可以使用触发器,来隔一段时间触发一次窗口计算。我们在代码中计算了每个 url 在 10 秒滚动窗口的 pv 指标,然后设置了触发器,每隔 1 秒钟触发一次窗口的计算。

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 创建自定义数据源的实时流
        DataStream<Event> stringDataStreamSource = env.addSource(new CustomSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.getTimestamp();
                            }
                        })
                );
        stringDataStreamSource.keyBy(Event::getUrl)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .trigger(new MyTrigger())
                .process(new WindowResult())
                .print();

        stringDataStreamSource.print("data");
        env.execute();
    }

    public static class WindowResult extends ProcessWindowFunction<Event, UrlViewCount, String, TimeWindow> {

        @Override
        public void process(String s, ProcessWindowFunction<Event, UrlViewCount, String, TimeWindow>.Context context, Iterable<Event> elements, Collector<UrlViewCount> out) throws Exception {
            out.collect(new UrlViewCount(s, elements.spliterator().getExactSizeIfKnown(), context.window().getStart(), context.window().getEnd()));
        }
    }

    private static class UrlViewCount {
        private String s;
        private long size;
        private long start;
        private long end;

        public UrlViewCount(String s, long size, long start, long end) {
            this.s = s;
            this.size = size;
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "UrlViewCount{" +
                    "s='" + s + '\'' +
                    ", size=" + size +
                    ", start='" + start + '\'' +
                    ", end='" + end + '\'' +
                    '}';
        }
    }

    public static class MyTrigger extends Trigger<Event, TimeWindow> {

        @Override
        public TriggerResult onElement(Event element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
            ValueState<Boolean> isFirstEvent = ctx.getPartitionedState(new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN));
            if (isFirstEvent.value() == null) {
                for (long i = window.getStart(); i < window.getEnd() ; i = i + 1000L) {
                    ctx.registerEventTimeTimer(i);
                }
                isFirstEvent.update(true);
            }
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.FIRE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
            ValueState<Boolean> isFirstEvent = ctx.getPartitionedState(new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN));
            isFirstEvent.clear();
        }
    }

输出结果:
在这里插入图片描述

1.2 移除器(Evictor)

移除器主要用来定义移除某些数据的逻辑。基于 WindowedStream 调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)Evictor 是一个接口,不同的窗口类型都有各自预实现的移除器。

Evictor 接口定义了两个方法:

  • evictBefore():定义执行窗口函数之前的移除数据操作
  • evictAfter():定义执行窗口函数之后的以处数据操作

默认情况下,预实现的移除器都是在执行窗口函数(window fucntions)之前移除数据的。

1.3 允许延迟(Allowed Lateness)

在事件时间语义下,窗口会出现迟到的数据。之所以出现迟到数据是因为在乱序流中,水位线并一定能保证时间戳更早的所有数据不会再出现,当水位线已经到达窗口的结束时间时,窗口触发计算并输出结果,这时一般就要销毁窗口了;如果还有本该属于这个窗口的数据到达,默认情况下会被丢弃。

大多数情况下直接丢弃数据会导致统计结果不准,为了解决迟到数据的问题,Flink提供了一个特殊接口,可以为窗口算子设置一个"允许最大延迟"(Allowed Lateness)。也就是说,我们可以设定允许延迟一段时间,在这段时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算。直到水位线推进到了 窗口结束时间 + 延迟时间,才真正将窗口的内容清空,正式关闭窗口。

基于 WindowedStream 调用.allowedLateness()方法,传入一个Time类型的延迟时间,就可以表示允许这段时间内的延迟数据。

stream.keyBy(...)
 .window(TumblingEventTimeWindows.of(Time.hours(1)))
 .allowedLateness(Time.minutes(1))

比如上面的代码中,我们定义了 1 小时的滚动窗口,并设置了允许 1 分钟的延迟数据。也就是说,在不考虑水位线延迟的情况下,对于 8 点~9 点的窗口,本来应该是水位线到达 9 点整触发计算并关闭窗口;现在允许延迟 1 分钟,那么 9 点整就只是触发一次计算并输出结果,并不会关窗。后续到达的数据,只要属于 8 点~9 点窗口,依然可以在之前统计的基础上继续叠加,并且再次输出一个更新后的结果。直到水位线到达了 9 点零 1 分,这时就真正清空状态、关闭窗口,之后再来的迟到数据就会被丢弃了。

从这里可以看到,窗口的触发计算(Fire)清除(Purge)操作确实可以分开。不过在默认情况下,允许的延迟是 0,这样一旦水位线到达了窗口结束时间就会触发计算并清除窗口,两个操作看起来就是同时发生了。当窗口被清除(关闭)之后,再来的数据就会被丢弃。

1.4 将迟到的数据放入侧输出流

对于处理迟到数据,仅仅提供延迟时间还是会出现迟到的数据,所以Flink提供了另外一种方式处理迟到数据。可以将迟到数据放到侧输出流(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”,这个流中单独放置那些错过了该上的车、本该被丢弃的数据。

基于 WindowedStream 调用.sideOutputLateData() 方法,就可以实现这个功能。方法需要传入一个“输出标签”(OutputTag),用来标记分支的迟到数据流。因为保存的就是流中的原始数据,所以OutputTag的类型与流中数据类型相同。

DataStream<Event> stream = env.addSource(...);
OutputTag<Event> outputTag = new OutputTag<Event>("late") {};
stream.keyBy(...)
 .window(TumblingEventTimeWindows.of(Time.hours(1)))
.sideOutputLateData(outputTag)

将迟到数据放入侧输出流之后,还可以将它提取出来。基于窗口处理完成之后的DataStream,调用.getSideOutput()方法,传入对应的输出标签,就可以获取到迟到数据所在的流了。

SingleOutputStreamOperator<AggResult> winAggStream = stream.keyBy(...)
 .window(TumblingEventTimeWindows.of(Time.hours(1)))
.sideOutputLateData(outputTag)
.aggregate(new MyAggregateFunction())
DataStream<Event> lateStream = winAggStream.getSideOutput(outputTag);

这里注意,getSideOutput()SingleOutputStreamOperator的方法,获取到的侧输出流数据
类型应该和 OutputTag 指定的类型一致,与窗口聚合之后流中的数据类型可以不同。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1538905.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

算法系列--动态规划--子序列(2)

&#x1f495;"你可以说我贱&#xff0c;但你不能说我的爱贱。"&#x1f495; 作者&#xff1a;Mylvzi 文章主要内容&#xff1a;算法系列–动态规划–子序列(2) 今天带来的是算法系列--动态规划--子序列(2),包含了关于子序列问题中较难的几道题目(尤其是通过二维状…

uni-app打包证书android

Android平台打包发布apk应用&#xff0c;需要使用数字证书&#xff08;.keystore文件&#xff09;进行签名&#xff0c;用于表明开发者身份。 Android证书的生成是自助和免费的&#xff0c;不需要审批或付费。 可以使用JRE环境中的keytool命令生成。 以下是windows平台生成证…

springboot实现文件上传

SpringBoot默认静态资源访问方式 首先想到的就是可以通过SpringBoot通常访问静态资源的方式&#xff0c;当访问&#xff1a;项目根路径 / 静态文件名时&#xff0c;SpringBoot会依次去类路径下的四个静态资源目录下查找&#xff08;默认配置&#xff09;。 在资源文件resour…

极大提高工作效率的 Linux 命令

作为一名软件开发人员&#xff0c;掌握 Linux 命令是必不可少的技能。即使你使用 Windows 或 macOS&#xff0c;你总会遇到需要使用 Linux 命令的场合。例如&#xff0c;大多数 Docker 镜像都基于 Linux 系统。要进行 DevOps 工作&#xff0c;你需要熟悉Linux&#xff0c;至少要…

Redis中的缓存穿透

缓存穿透 缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在&#xff0c;导致这些请求直接到了数据库上&#xff0c;对数据库造成了巨大的压力&#xff0c;可能造成数据库宕机。 常见的解决方案&#xff1a; 1&#xff09;缓存无效 key 如果缓存和数据库中都查不到某…

【漏洞复现】WordPress Plugin NotificationX 存在sql注入CVE-2024-1698

漏洞描述 WordPress和WordPress plugin都是WordPress基金会的产品。WordPress是一套使用PHP语言开发的博客平台。该平台支持在PHP和MySQL的服务器上架设个人博客网站。WordPress plugin是一个应用插件。 WordPress Plugin NotificationX 存在安全漏洞,该漏洞源于对用户提供的…

校招免费资料大集合

通过以下资料&#xff0c;你可以免费获取到大量的校招资料和相关信息&#xff0c;帮助你更好地准备校园招聘。 学习交流群&#xff1a;进行计算机知识分享和交流&#xff0c;提供内推机会&#xff0c;QQ群号&#xff1a;325280438 夏沫Coding&#xff1a;致力于分享计算机干货…

STM32利用串口标准库发送字节,发送数组,发送字符串,发送数字,实现printf功能。

早晨到现在刚刚完成的功能&#xff1a;发送字节&#xff0c;发送数组&#xff0c;发送字符串&#xff0c;发送数字&#xff0c;实现printf功能。 当然这是建立在昨天学习使用串口发送数据的基础上&#xff0c;新建立的功能函数&#xff0c;咱们先来看看这次实验的结果吧&#…

AIGC:让生成式AI成为自己的外脑

前言 在数字化浪潮席卷全球的今天&#xff0c;人工智能&#xff08;AI&#xff09;已经渗透到了我们生活的方方面面。其中&#xff0c;生成式AI以其独特的魅力&#xff0c;正逐渐改变我们与世界的交互方式。AIGC&#xff08;人工智能生成内容&#xff09;作为生成式AI的重要应用…

LeetCode 热题 100 | 堆(二)

目录 1 什么是优先队列 1.1 优先队列与堆的关系 1.2 如何定义优先队列 1.3 如何使用优先队列 1.4 如何设置排序规则 2 347. 前 K 个高频元素 2.1 第 2 步的具体实现 2.2 举例说明 2.3 完整代码 3 215. 数组中的第 K 个最大元素 - v2 菜鸟做题&#xff0c;语…

cesium Clock JulianDate 日照分析

cesium在初始化的时候会自动把Clock对象挂载到容器上Clock内部以JulianDate维护时间&#xff0c;比北京时间慢8个小时&#xff0c;想显示北京时间需要计算时差JulianDate的日期部分和秒数部分是分开的 julianDayNumber&#xff1a;指整数天&#xff0c;记录从公元前4713年正午以…

基于SpringBoot实现WebSocket实时通讯的服务端和客户端

实现功能 服务端注册的客户端的列表&#xff1b;服务端向客户端发送广播消息&#xff1b;服务端向指定客户端发送消息&#xff1b;服务端向多个客户端发送消息&#xff1b;客户端给服务端发送消息&#xff1b; 效果&#xff1a; 环境 jdk&#xff1a;1.8 SpringBoot&#x…

社区热议!54.8k Star开源项目,GPT-4Free : 让GPT4免费不是梦

Hello&#xff0c;我是Aitrainee&#xff0c;GPT4Free就是最近传得沸沸扬扬的那个GPT4项目。大家都知道&#xff0c;虽然ChatGPT是免费的&#xff0c;但如果你想用到那些功能更强大的大模型&#xff0c;比如GPT-4、gemini-pro、claude&#xff0c;那就只能选择付费了。 但现在&…

在Linux搭建Emlog博客结合内网穿透实现公网访问本地个人网站

文章目录 前言1. 网站搭建1.1 Emolog网页下载和安装1.2 网页测试1.3 cpolar的安装和注册 2. 本地网页发布2.1 Cpolar临时数据隧道2.2.Cpolar稳定隧道&#xff08;云端设置&#xff09;2.3.Cpolar稳定隧道&#xff08;本地设置&#xff09; 3. 公网访问测试总结 前言 博客作为使…

【2024最新版,redis7】redis底层的10种数据结构

前言&#xff1a;本文redis版本&#xff1a;7.2.4 本文语雀原文地址&#xff08;首发更新&#xff09;&#xff1a;https://www.yuque.com/wzzz/redis/xg2cp37kx1s4726y 本文CSDN转载地址&#xff1a; https://blog.csdn.net/u013625306/article/details/136842107 1. 常见的数…

烯冷新能源邀您参观2024长三角快递物流展

参加企业介绍 宁波戈雷贝拓科技有限公司&#xff08;宁波烯冷新能源科技有限公司&#xff09;宁波烯冷新能源科技有限公司于2022年初成立&#xff0c;依托中国科学院宁波材料技术与工程研究所和国家石墨烯创新中心&#xff0c;公司开发产品包括&#xff1a;新能源制冷系统和集…

Mysql学习--深入探究索引和事务的重点要点与考点

꒰˃͈꒵˂͈꒱ write in front ꒰˃͈꒵˂͈꒱ ʕ̯•͡˔•̯᷅ʔ大家好&#xff0c;我是xiaoxie.希望你看完之后,有不足之处请多多谅解&#xff0c;让我们一起共同进步૮₍❀ᴗ͈ . ᴗ͈ აxiaoxieʕ̯•͡˔•̯᷅ʔ—CSDN博客 本文由xiaoxieʕ̯•͡˔•̯᷅ʔ 原创 CSDN …

一键入门Ubuntu22!

目录 一、安装 二、常用目录 三、常用指令 四、用户指令 五、ssh与scp 六、服务相关 七、Python与Pycharm 八、Vim编辑器 九、Ubuntu22下使用Mysql 十、Ubuntu22下使用mongodb 十一、Ubuntu22下使用redis Ubuntu是一个基于Debian的开源操作系统&#xff0c;由Canoni…

LeetCode每日一题——x 的平方根

x 的平方根OJ链接&#xff1a;69. x 的平方根 - 力扣&#xff08;LeetCode&#xff09; 题目&#xff1a; 思路&#xff1a; 乍一看题目只需要算一个数的平方根&#xff0c;根据我们之前学的C语言我们能很快的想到使用sqrt&#xff0c;pow这类的<math.h>库函数&#xf…

【计算机网络篇】数据链路层(2)封装成帧和透明传输

文章目录 &#x1f95a;封装成帧和透明传输&#x1f388;封装成帧&#x1f388;透明传输&#x1f5d2;️面向字节的物理链路使用字节填充的方法实现透明传输。&#x1f5d2;️面向比特的物理链路使用比特填充的方法实现透明传输。 &#x1f6f8;练习 &#x1f95a;封装成帧和透…