【Flink】窗口(Window)

news2025/1/18 1:59:20

窗口理解

窗口(Window)是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中,再对每个“桶”加以处理。 本文的重心将放在 Flink 如何进行窗口操作以及开发者如何尽可能地利用 Flink 所提供的功能。

对窗口的正确理解
我们将窗口理解为一个一个的水桶,数据流(stream)就像水流,每个数据都会分发到对应的桶中,当达到结束时间时,对每个桶中收集的数据进行计算处理
在这里插入图片描述

Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口

窗口的分类

按照驱动类型分

时间窗口(Time Window)

以时间来定义窗口的开始和结束,获取某一段时间内的数据(类比于我们的定时发车

计数窗口(Count Window)

计数窗口是基于元素的个数来获取窗口,达到固定个数时就计算并关闭窗口。(类比于我们的人齐才发车

按照窗口分配数据的规则分类

滚动窗口(Tumbling Window)

窗口之间没有重叠,也不会有间隔的首尾相撞状态,这样,每个数据都会被分到一个窗口,而且只会属于一个窗口。
滚动窗口的应用非常广泛,它可以对每个时间段做聚合统计,很多BI分析指标都可以用它来实现。
在这里插入图片描述

DataStream<T> input = ...;

// 滚动 event-time 窗口
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// 滚动 processing-time 窗口
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// 长度为一天的滚动 event-time 窗口, 偏移量为 -8 小时。
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);

滑动窗口(Sliding Windows)

滑动窗口大小也是固定的,但是窗口之间并不是首尾相接的,而是重叠的。
在这里插入图片描述

DataStream<T> input = ...;

// 滑动 event-time 窗口
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// 滑动 processing-time 窗口
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// 滑动 processing-time 窗口,偏移量为 -8 小时
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);

会话窗口(Session Windows)

会话窗口,是基于“会话”(session)来对数据进行分组的,会话窗口只能基于时间来定义。
在这里插入图片描述

DataStream<T> input = ...;

// 设置了固定间隔的 event-time 会话窗口
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
    
// 设置了动态间隔的 event-time 会话窗口
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap((element) -> {
        // 决定并返回会话间隔
    }))
    .<windowed transformation>(<window function>);

// 设置了固定间隔的 processing-time session 窗口
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
    
// 设置了动态间隔的 processing-time 会话窗口
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
        // 决定并返回会话间隔
    }))
    .<windowed transformation>(<window function>);

全局窗口

这种窗口对全局有效,会把相同的key的所有数据分配到同一个窗口中,这种窗口没有结束时间,默认不会触发计算,如果希望对数据进行处理,需要自定义“触发器”。
在这里插入图片描述

DataStream<T> input = ...;

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>);

计数窗口

计数窗口概念非常简单,本身底层是基于全局窗口(Global Window)实现的。Flink为我们提供了非常方便的接口:直接调用.countWindow()方法

滚动计数窗口

滚动计数窗口只需要传入一个长整型的参数size,表示窗口的大小。

stream.keyBy(...)
       .countWindow(10)
滑动计数窗口

与滚动计数窗口类似,不过需要在.countWindow()调用时传入两个参数:size和slide,前者表示窗口大小,后者表示滑动步长。

stream.keyBy(...)
       .countWindow(103)

窗口函数(Window Functions)

定义了 window assigner 之后,我们需要指定当窗口触发之后,我们如何计算每个窗口中的数据, 这就是 window function 的职责了
窗口函数有三种:ReduceFunction、AggregateFunction 或 ProcessWindowFunction。

ReduceFunction

ReduceFunction 指定两条输入数据如何合并起来产生一条输出数据,输入和输出数据的类型必须相同。 Flink 使用 ReduceFunction 对窗口中的数据进行增量聚合。

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
    //v1 和v2是 2个相同类型的输入参数
      public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
      }
    });

AggregateFunction

ReduceFunction 是 AggregateFunction 的特殊情况。 AggregateFunction 接收三个类型:输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)。

/**
 * The accumulator is used to keep a running sum and a count. The {@code getResult} method
 * computes the average.
 */
private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }

  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }

  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }

  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate());

接口中有四个方法:

  • createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
  • add():将输入的元素添加到累加器中。
  • getResult():从累加器中提取聚合的输出结果。
  • merge():合并两个累加器,并将合并后的状态作为一个累加器返回。

可以看到,AggregateFunction的工作原理是:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用getResult()方法得到计算结果。很明显,与ReduceFunction相同,AggregateFunction也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。

ProcessWindowFunction

ProcessWindowFunction 有能获取包含窗口内所有元素的 Iterable, 以及用来获取时间和状态信息的 Context 对象,比其他窗口函数更加灵活。 ProcessWindowFunction 的灵活性是以性能和资源消耗为代价的, 因为窗口中的数据无法被增量聚合,而需要在窗口触发前缓存所有数据

public class WindowProcessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("127.0.0.1", 7777).map(new WaterSensorMapFunction());
        KeyedStream<WaterSensor, String> keyedStream = sensorDS.keyBy(WaterSensor::getId);
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
            @Override
            public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) {
                // 上下文可以拿到window对象,还有其他东西:侧输出流 等等
                long startTs = context.window().getStart();
                long endTs = context.window().getEnd();
                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                long count = elements.spliterator().estimateSize();

                out.collect("key=" + key + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements);
            }
        }).print();
        env.execute();

    }
}

增量聚合和全窗口函数的结合使用

在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。
我们之前在调用WindowedStream的.reduce()和.aggregate()方法时,只是简单地直接传入了一个ReduceFunction或AggregateFunction进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是WindowFunction或者ProcessWindowFunction。

// ReduceFunction与WindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(
        ReduceFunction<T> reduceFunction,WindowFunction<TRKW> function) 

// ReduceFunction与ProcessWindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(
        ReduceFunction<T> reduceFunction,ProcessWindowFunction<TRKW> function)

// AggregateFunction与WindowFunction结合
public <ACCVR> SingleOutputStreamOperator<R> aggregate(
        AggregateFunction<TACCV> aggFunction,WindowFunction<VRKW> windowFunction)

// AggregateFunction与ProcessWindowFunction结合
public <ACCVR> SingleOutputStreamOperator<R> aggregate(
        AggregateFunction<TACCV> aggFunction,
        ProcessWindowFunction<VRKW> windowFunction)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1228621.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Django-DRF用法】多年积累md笔记,第3篇:Django-DRF的序列化和反序列化详解

本文从分析现在流行的前后端分离Web应用模式说起&#xff0c;然后介绍如何设计REST API&#xff0c;通过使用Django来实现一个REST API为例&#xff0c;明确后端开发REST API要做的最核心工作&#xff0c;然后介绍Django REST framework能帮助我们简化开发REST API的工作。 全…

Ps:变换

可以向选区、整个图层、多个图层或图层蒙版应用变换 Transform&#xff0c;还可以向路径、矢量形状、矢量蒙版、选区边界或 Alpha 通道应用变换。 若要变换栅格&#xff08;像素&#xff09;图像&#xff0c;建议先将其转换为智能对象&#xff0c;以便进行非破坏性的变换。 Ps菜…

【Flink】核心概念:并行度与算子链

并行度&#xff08;Parallelism&#xff09; 当要处理的数据量非常大时&#xff0c;我们可以把一个算子操作&#xff0c;“复制”多份到多个节点&#xff0c;数据来了之后就可以到其中任意一个执行。这样一来&#xff0c;一个算子任务就被拆分成了多个并行的“子任务”&#x…

计算机毕业设计 基于SpringBoot的健身房管理系统的设计与实现 Java实战项目 附源码+文档+视频讲解目录

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…

3-docker安装centos7

CentOS7.9下安装完成docker后&#xff0c;后续我们可以在其上安装centos7系统。具体操作如下&#xff1a; 1.以root用户登录CentOS7.9服务器&#xff0c;拉取centos7 images 命令&#xff1a; docker pull centos:centos7 2.加载centos7 images并登录验证 命令&#xff1a;…

Linux调试器---gdb的使用

顾得泉&#xff1a;个人主页 个人专栏&#xff1a;《Linux操作系统》 《C/C》 键盘敲烂&#xff0c;年薪百万&#xff01; 一、gdb的背景 gdb&#xff0c;全称为GNU调试器&#xff08;GNU Debugger&#xff09;&#xff0c;是一个功能强大的源代码级调试工具&#xff0c;主要…

【C++】【Opencv】霍夫直线检测即cv::HoughLinesP()函数详解和示例

cv::HoughLinesP()&#xff08;函数霍夫直线&#xff09;功能分析是一种用于检测图像中直线的算法&#xff0c;它基于霍夫变换的原理。通过该算法&#xff0c;我们可以从图像中提取出直线信息&#xff0c;从而对图像进行分析和处理。主要经理边缘检测和霍夫直线处理两个步骤。本…

【LeetCode刷题-树】--100.相同的树

100.相同的树 /*** Definition for a binary tree node.* public class TreeNode {* int val;* TreeNode left;* TreeNode right;* TreeNode() {}* TreeNode(int val) { this.val val; }* TreeNode(int val, TreeNode left, TreeNode right) {* …

C/C++统计数 2021年12月电子学会青少年软件编程(C/C++)等级考试一级真题答案解析

目录 C/C统计数 一、题目要求 1、编程实现 2、输入输出 二、算法分析 三、程序编写 四、程序说明 五、运行结果 六、考点分析 C/C统计数 2021年12月 C/C编程等级考试一级编程题 一、题目要求 1、编程实现 给定一个数的序列S&#xff0c;以及一个区间[L, R], 求序列…

Codeforces Round 910 (Div. 2)(D~F)

1898D - Absolute Beauty 题意&#xff1a;给定长度为n的数组a和b&#xff0c;定义b数组的价值为&#xff0c;现可以交换一次b数组中的任意两个元素&#xff0c;求b数组的价值最大值。 思路&#xff1a;绝对值问题可以放在数轴上去解决。绝对值即为区间长度 观察上述三种情况&…

认识.NET Aspire:高效构建云原生应用的利器

简介 在几天前的.NET 8发布会上&#xff0c;来自微软的Glenn Condron和David Fowler为我们演示了.NET Aspire&#xff0c;在Visual Studio的帮助下&#xff0c;它展现出了惊人的开发效率。 短短的十分钟内&#xff0c;David现场演示了如何轻松创建了一个具有服务发现&#xf…

汽车标定技术--A2L格式分析

目录 1.A2L由来 2.A2L格式 2.1 PROJECT 2.2 MODULE中包含的内容 3. INCA和CANape兼容吗&#xff1f; 最近有朋友用Vector ASAP2Editor编译的A2L文件在INCA7.4中无法识别&#xff0c;我记得以前做的时候是可以识别的&#xff0c;难不成最近有什么变动吗&#xff1f;出于好…

JavaScript的学习,就这一篇就OK了!(超详细)

目录 Day27 JavaScript(1) 1、JS的引入方式 2、ECMAScript基本语法 3、ECMAScript 基本数据类型​编辑 3.1 数字类型 3.2 字符串 3.3 布尔值 3.4 空值&#xff08;Undefined和Null&#xff09; 3.5 类型转换 3.6 原始值和引用值 4、运算符 5、流程控制语句 5.1 分…

云原生微服务-理论篇

文章目录 分布式应用的需求分布式架构治理模式演进ESB 是什么&#xff1f;微服务架构 MSA微服务实践细节微服务治理框架sidercar 什么是service mesh&#xff1f;康威定律微服务的扩展性什么是MSA 架构&#xff1f;中台战略和微服务微服务总体架构组件微服务网关服务发现与路由…

ps找不到msvcp140.dll怎么办?亲测5个有效的修复方法分享

运行Photoshop时提示找不到MSVCP140.dll&#xff0c;这是因为计算机MSVCP140.dll文件丢失或者损坏。msvcp140.dll是微软Visual C 2015运行库的一部分&#xff0c;它包含了许多用于支持C运行的函数和类。当我们在使用某些程序时&#xff0c;如果这个程序依赖于msvcp140.dll&…

【c++随笔13】多态

【c随笔13】多态 多态性&#xff08;Polymorphism&#xff09;在面向对象编程中是一个重要概念&#xff0c;它允许以统一的方式处理不同类型的对象&#xff0c;并在运行时动态确定实际执行的方法或函数。一、什么是多态性&#xff1f;1、关键概念&#xff1a;C的多态性2、多态定…

构造函数,原型对象,实例对象

1.构造函数、原型对象、实例对象三者分别是什么&#xff1f; 构造函数&#xff1a;用来创建对象的函数&#xff0c;创建实例对象的模板 。构造函数的函数名尽量首字母大写(为了区分普通函数和构造函数)原型对象&#xff1a;每一个函数在创建的时候&#xff0c;系统都会给分配一…

【Android Jetpack】DataStore的介绍

DataStore Jetpack DataStore是一种数据存储解决方案&#xff0c;允许您使用协议缓冲区存储键值对或类型化对象。DataStore使用Kotlin协程和Flow以异步、一致的事务方式存储数据。 注意&#xff1a;如果您需要支持大型或复杂数据集、部分更新或参照完整性&#xff0c;请考虑使…

【算法挨揍日记】day28——413. 等差数列划分、978. 最长湍流子数组

413. 等差数列划分 413. 等差数列划分 题目描述&#xff1a; 如果一个数列 至少有三个元素 &#xff0c;并且任意两个相邻元素之差相同&#xff0c;则称该数列为等差数列。 例如&#xff0c;[1,3,5,7,9]、[7,7,7,7] 和 [3,-1,-5,-9] 都是等差数列。 给你一个整数数组 nums…

【wp】2023第七届HECTF信息安全挑战赛 Web

伪装者 考点&#xff1a;http协议flask的session伪造ssrf读取文件 首先根据题目要求就行伪造HTTP 这里不多说&#xff0c;比较基础 然后下面得到是个登入 页面&#xff0c;我们输入zxk1ing 得到 说什么要白马王子 &#xff0c;一眼session伪造 看到ey开头感觉是jwt 输入看看 得…