flink的窗口

news2025/3/12 23:44:25

目录

窗口分类

1.按照驱动类型分类

1. 时间窗口(Time window)

2.计数窗口(Count window)

2.按照窗口分配数据的规则分类

窗口API分类

API调用

窗口分配器器:

窗口函数

增量聚合函数:

全窗口函数

flink sql 窗口函数


窗口是flink中重要的概念,为了方便高效的处理无界流,将数据切成有限的数据块进行处理;

窗口 | Apache Flink

窗口分类

1.按照驱动类型分类

1. 时间窗口(Time window)

    时间窗口以时间点定义窗口的开始和结束,因此截取出就是某一段时间的数据。当到达结束时间时窗口不在接受数据,触发计算输出结果,并关闭销毁窗口。

flink有一个专门的类用来表示时间窗口TimeWindow,这个类只有两个私有属性;窗口的方法获取最大时间戳为end-1,因此窗口[start,end)  左开右闭;

@PublicEvolving
public class TimeWindow extends Window {

    private final long start;
    private final long end;

    public TimeWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }
    @Override
    public long maxTimestamp() {
        return end - 1;
    }
2.计数窗口(Count window)

计数窗口是基于元素个数截取,在到达固定个数是就触发计算并关闭窗口。

3.全局窗口(Global Windows)

是计数窗口的底层实现,窗口分配器由GlobalWindows类提供,需要自定义触发器实现窗口的计算;

 stream.keyBy(data -> true)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
//                .max()
                .aggregate(new AvgPv())
                .print();

查看源代码,windou函数后见windowStrream时获取默认的触发器
@PublicEvolving
    public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {

        this.input = input;

        this.builder =
                new WindowOperatorBuilder<>(
                        windowAssigner,
                        windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()),  //湖区触发器
                        input.getExecutionConfig(),
                        input.getType(),
                        input.getKeySelector(),
                        input.getKeyType());
    }

// 计数窗口底层采用全局窗口加计数器来实现

    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
        return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
    }

    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
        return window(GlobalWindows.create())
                .evictor(CountEvictor.of(size))
                .trigger(CountTrigger.of(slide));
    }

2.按照窗口分配数据的规则分类

滚动窗口(Tumbling Window):窗口大小固定,窗口没有重叠;

滑动窗口 (Sliding Window):滑动窗口有重叠,也可以没有重叠,如果窗口size和滑动size相等,等于滚动窗口;

会话窗口 (Session Window):基于会话对窗口进行分组,与其他两个不同的是,会话窗口是借用会话窗口的超时失效机触发窗口计算,当数据到来后会开启一个窗口,如果在超时时间内有数据陆续到来,窗口不会关闭,反之会关闭;极端情况,如果数据总能在窗口超时时间到达前远远不断的到来,该窗口会一直开启不会关闭;

全局窗口 (Global Window):比较通用的窗口,该窗口会把数据分配到一个窗口中,窗口为全局有效,会把相同key的数据分配到同一个窗口中,默认不会触发计算,跟没有窗口一样,需要自定义触发器才能使用;

窗口API分类

窗口大的分类可以分为按键分区和非按键分区两种:按键分需要经过keyby操作,会把数据进行分发,实现负载均分,可以并行处理更大的数据量。而非按键分区窗口,相当于并行度为1,使用上直接调用windowall(),因此一般并不推荐使用;

API调用

窗口操作分为窗口分配器(window Assigners)和窗口函数(window function)两部分,窗口分配器用于构建窗口,确定窗口类型,确定数据划分哪一个窗口,窗口函数制定数据的计算规则;

窗口分配器器:

窗口按照时间可以划分为:滚动、滑动和session,三种类型窗口;

窗口计数划分:滚动和滑动两种类型;

  eventStream.keyBy(data -> data.url)
                        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                        .aggregate();
窗口函数

窗口定义分配器后,知道数据属于哪一个窗口,窗口函数制定数据计算的规则;

增量聚合函数:

数据进入窗口会参与计算,窗口结束前只需要保留一个聚合后的状态值,内存压力小。

1.规约函数(ReduceFunction):数据保存留一个状态,输入类型和输出类型必须一致,来一条数据会处理将数据合并到状态中;

 stream.keyBy(r -> r.f0)
                // 设置滚动事件时间窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        // 定义累加规则,窗口闭合时,向下游发送累加结果
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                })
                .print();

sum、max、min等底层都是通过同名AggregateFunction实现(非下面的聚合函数),本质还是实现ReduceFunction结构重写了reduce方法;

2.聚合函数(AggrateFunction):在规约函数基础上进行完善。解决输出和输入类型必须一致的限制问题。实现应用更灵活;

  // 所有数据设置相同的key,发送到同一个分区统计PV和UV,再相除
        stream.keyBy(data -> true)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
                .aggregate(new AvgPv())
                .print();
 public static class AvgPv implements AggregateFunction<Event, Tuple2<HashSet<String>, Long>, Double> {
        @Override
        public Tuple2<HashSet<String>, Long> createAccumulator() {
            // 创建累加器
            return Tuple2.of(new HashSet<String>(), 0L);
        }

        @Override
        public Tuple2<HashSet<String>, Long> add(Event value, Tuple2<HashSet<String>, Long> accumulator) {
            // 属于本窗口的数据来一条累加一次,并返回累加器
            accumulator.f0.add(value.user);
            return Tuple2.of(accumulator.f0, accumulator.f1 + 1L);
        }

        @Override
        public Double getResult(Tuple2<HashSet<String>, Long> accumulator) {
            // 窗口闭合时,增量聚合结束,将计算结果发送到下游
            return (double) accumulator.f1 / accumulator.f0.size();
        }

        @Override
        public Tuple2<HashSet<String>, Long> merge(Tuple2<HashSet<String>, Long> a, Tuple2<HashSet<String>, Long> b) {
            return null;
        }
    }
全窗口函数

全窗口函数会将进入窗口的数据先进行缓存,然后在窗口关闭时一起计算,缓存数据会占用内存资源,如果一个窗口数据量太大时,可能出现内存溢出的问题;

全窗口函数可以划分窗口函数(windowFunction)和处理窗口函数(processWindowFunction)两种;

窗口函数(windowFunction):老版本通用窗口接口,window()后调用apply(),传入实现windowFunction接口; 缺点是不能获取上下文信息,也没有更高级的功能。因为在功能上可以被processWindowFunction全覆盖,因此主键被弃用

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .apply(new MyWindowFunction());



@Public
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {

    /**
     * Evaluates the window and outputs none or several elements.
     *
     * @param key The key for which this window is evaluated.
     * @param window The window that is being evaluated.
     * @param input The elements in the window being evaluated.
     * @param out A collector for emitting elements.
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}

处理窗口函数(processWindowFunction):是窗口API中最底层通用的窗口函数接口,可以获取到上问对象(context),实现为调用process方法传入自定义继承ProcessWindowFunction类;

input
  .keyBy(t -> t.f0)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .process(new MyProcessWindowFunction());

/* ... */

public class MyProcessWindowFunction 
    extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

  @Override
  public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
    long count = 0;
    for (Tuple2<String, Long> in: input) {
      count++;
    }
    out.collect("Window: " + context.window() + "count: " + count);
  }
}

注意:一般增量窗口函数和全量窗口函数可以一起使用,window().aggregate()方法可以传入两个函数,第一个采用增量聚合函数,第二个传入全量函数,这样数据在进入窗口会触发增量计算,窗口不会缓存数据。当窗口关闭触发计算时,结果数据穿度到全量计算,参数Iterable中一般只有一个数据;

aggregate(acct1,acct2)

flink sql 窗口函数

flink sql 窗口也包含常见的滚动窗口、滑动窗口、session窗口,但还有一种累计窗口。

在flink1.13版本后flinksql支持累计窗口CUMULATE,可以实现没5分钟触发一次计算,输出当天的累计数据,使用样例

SELECT 
    cast(PROCTIME() as timestamp_ltz) as window_end_time,
    manufacturer_name,
    event_id,
    case when state is null then -1 else state end ,
    cast(sum(agg)as string ) as agg
FROM TABLE(CUMULATE(
       TABLE dm_cumulate
       , DESCRIPTOR(ts1)
       , INTERVAL '5' MINUTES
       , INTERVAL '1' DAY(9)))
GROUP BY
          window_end
         ,window_start
         ,manufacturer_name
         ,event_id
         ,case when state is null then -1 else state end

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1867948.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

llamafactory-llama3微调中文数据集

一、定义 https://github.com/SmartFlowAI/Llama3-Tutorial/tree/main 基准模型测试opencompass 离线测评数据准备微调训练合并测试人工审核对比 二、实现 基准模型测试 基准模型 llama3-8b https://zhuanlan.zhihu.com/p/694818596? https://github.com/SmartFlowAI/Llam…

什么样的台灯适合学生使用?五款暑假必入护眼大路灯分享

什么样的台灯适合学生使用&#xff1f;现在近视越来越低龄化&#xff0c;戴眼镜的小朋友越来越多&#xff0c;每每看着自己孩子眼睛贴到作业本上写作业&#xff0c;我的心都会提到嗓子眼。去医院一检查&#xff0c;果然&#xff0c;远视储备即将告罄&#xff0c;必须要防护了&a…

深度剖析:前端如何驾驭海量数据,实现流畅渲染的多种途径

文章目录 一、分批渲染1、setTimeout定时器分批渲染2、使用requestAnimationFrame()改进渲染2.1、什么是requestAnimationFrame2.2、为什么使用requestAnimationFrame而不是setTimeout或setInterval2.3、requestAnimationFrame的优势和适用场景 二、滚动触底加载数据三、Elemen…

【项目实训】解决前后端跨域问题

由于前端框架使用vue&#xff0c;后端使用flask&#xff0c;因此需要解决前后端通信问题 在vue.config.js中修改 module.exports defineConfig({transpileDependencies: true,lintOnSave:false, }) // 跨域配置 module.exports {devServer: { //记住&#x…

2024 年适用于 Windows 11/10/8/7 的最佳 SSD 磁盘克隆软件

磁盘克隆软件对于用户在发生数据灾难时保证数据/系统安全至关重要。克隆软件可以创建驱动器的副本并保持数据相同。如果发生数据灾难&#xff0c;您可以设置克隆驱动器以克隆回数据/驱动器。或者您可以直接使用克隆的驱动器继续工作。 除了传统的 HDD&#xff0c;Windows 11/1…

使用nvm切换node版本时报错:exit status 1解决办法

作者介绍&#xff1a;计算机专业研究生&#xff0c;现企业打工人&#xff0c;从事Java全栈开发 主要内容&#xff1a;技术学习笔记、Java实战项目、项目问题解决记录、AI、简历模板、简历指导、技术交流、论文交流&#xff08;SCI论文两篇&#xff09; 上点关注下点赞 生活越过…

C++学习/复习20--继承的权限/向上转换/重定义/默认成员函数/友元/静态成员/菱形虚拟继承/组合

一、继承的概念 二、继承的权限 三、向上转换 四、重定义(隐藏) 五、派生类的默认成员函数 六、继承与友元 七、继承与静态成员 八、菱形继承 数据冗余与二义性 虚拟继承(virtual) 九、继承组合

【ARM】MCU和SOC的区别

【更多软件使用问题请点击亿道电子官方网站】 1、 文档目标 了解SOC芯片和MCU芯片的区别 2、 问题场景 用于了解SOC芯片和MCU芯片的区别&#xff0c;内部结构上的区别。 3、软硬件环境 1&#xff09;、软件版本&#xff1a;无 2&#xff09;、电脑环境&#xff1a;无 3&am…

Java洗鞋预约小程序源码

&#x1f4a5;洗鞋神器来袭&#xff01;轻松预约&#xff0c;让你的鞋子焕然一新&#x1f45f; &#x1f389; 告别洗鞋烦恼&#xff0c;洗鞋预约小程序来啦&#xff01; 你是不是常常为洗鞋而烦恼&#xff1f;手洗太累&#xff0c;送去洗衣店又贵又麻烦。现在&#xff0c;好…

数据恢复篇:如何从 Mac 硬盘安全恢复丢失的文件

Mac RAID 阵列用于大存储。Mac RAID 上的数据丢失可能很复杂。一般来说&#xff0c;从 Mac RAID 硬盘恢复已删除的文件并不困难。但如果​​您想从 Mac RAID 硬盘恢复由于格式化、病毒感染、硬盘故障而丢失的文件&#xff0c;情况就会发生变化。您必须找到一个功能强大的 Mac R…

基于Java微信小程序校园自助打印系统设计和实现(源码+LW+调试文档+讲解等)

&#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN作者、博客专家、全栈领域优质创作者&#xff0c;博客之星、平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌&#x1f497; &#x1f31f;文末获取源码数据库&#x1f31f;感兴趣的可以先收藏起来&#xff0c;还…

如何制作自己的网站

制作自己的网站可以帮助个人或组织在互联网上展示自己的品牌、作品、产品或服务。随着技术的发展&#xff0c;现在制作网站变得越来越简单。下面是一个简单的步骤指南&#xff0c;帮助你制作自己的网站。 1. 确定你的网站需求和目标 在开始之前&#xff0c;你需要明确你的网站的…

【STM32】看门狗

1.看门狗简介 看门狗起始就是一个定时器&#xff0c;从功能上说它可以让微控制器在程序发生意外&#xff08;程序进入死循环或跑飞&#xff09;的时候&#xff0c;能重新恢复到系统刚上电状态&#xff0c;以保障系统出问题的时候可以重启一次。说的简单一点&#xff0c;看门狗…

【机器学习】K-means++: 一种改进的聚类算法详解

&#x1f308;个人主页: 鑫宝Code &#x1f525;热门专栏: 闲话杂谈&#xff5c; 炫酷HTML | JavaScript基础 ​&#x1f4ab;个人格言: "如无必要&#xff0c;勿增实体" 文章目录 K-means: 一种改进的聚类算法详解引言1. K-means算法回顾1.1 基本概念1.2 局限性…

2024智能驾驶兴趣人群研究报告

来源&#xff1a;百分点舆情中心 近期历史回顾&#xff1a; 劳动力效能提升指引白皮书》人效研究院.pdf 【标准】企业ESG管理体系(T-CERDS 5—2023).pdf 【实用标准】GB_T 43868-2024 电化学储能电站启动验收规程.pdf 【实用模板】用户侧新型储能项目管理流程图及备案资料清单…

搭建大型分布式服务(四十)SpringBoot 整合多个kafka数据源-支持生产者

系列文章目录 文章目录 系列文章目录前言一、本文要点二、开发环境三、原项目四、修改项目五、测试一下五、小结 前言 本插件稳定运行上百个kafka项目&#xff0c;每天处理上亿级的数据的精简小插件&#xff0c;快速上手。 <dependency><groupId>io.github.vipjo…

【数据同步】什么是ETL增量抽取?

目录 一、什么是ETL增量抽取 二、企业如何应用ETL增量抽取 三、如何进行ETL增量抽取 1.基于时间戳的增量抽取 2.基于主键的增量抽取 在当今信息化时代&#xff0c;数据的快速增长和多样化使得企业面临着巨大的数据管理挑战。为了高效地处理和利用数据&#xff0c;ETL&#xff0…

每日一题——Python实现PAT乙级1058 选择题(举一反三+思想解读+逐步优化)6千字好文

一个认为一切根源都是“自己不够强”的INTJ 个人主页&#xff1a;用哲学编程-CSDN博客专栏&#xff1a;每日一题——举一反三Python编程学习Python内置函数 Python-3.12.0文档解读 目录 我的写法 代码点评 时间复杂度分析 空间复杂度分析 总结 我要更强 空间复杂度优…

【学习】开发板接口

工作用到机器的开发板 有如上三个接口 。最右是仿真器&#xff0c;中间是RS232串口&#xff0c;最左是电源线 仿真器 这个是仿真器 接入机器那端用的是SWD模式&#xff0c;另一端通过USB接电脑&#xff08;这小肥手拍的怪好看&#xff09;仿真口连接了四条线分别是 VCC&#…

gsap动画库对threejs模型的应用

前言 公司的一个3D编辑器项目&#xff0c;要在three模型上加一些补间动画。做了一些调研&#xff0c;最终选择了gsap&#xff0c;其丰富的缓动函数&#xff0c;强大的动画效果和兼容性&#xff0c;更适合公司的需求。 查看gsap文档&#xff0c;发现所有的例子都是针对dom元素…