Flink 窗口函数

news2024/12/25 1:30:04

一、Window 概述

Flink 流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而 window 是一种切割无线数据为有限块进行处理的手段。

二、Window 分类

Window 可以分为两类:

  • CountWindow(计数窗口):按照指定的数据条数生成一个Window,与时间无关;
  • TimeWindow(事件窗口):按照时间生成 Window;

对于TimeWindow ,可以根据窗口实现原理的不同分成三类:

  • 滚动窗口(Tumbling Window)
  • 滑动窗口(Sliding Window)
  • 会话窗口(Session Window)

2.1、滚动窗口(Tumbling Window)

将数据依据固定的窗口长度对数据进行切片;

特点:时间对其、窗口长度固定、没有重叠;
在这里插入图片描述
适用场景:适合做 BI 统计(每个时间段的聚合计算)。

2.2、滑动窗口(Sliding Window)

滑动窗口是固定的窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成;

特点:时间对齐、窗口长度固定,可以重叠;
在这里插入图片描述
适用场景:对最近一个时间段内的统计(求某接口最近 5min 的失败率来决定是否要报警)。

2.3、会话窗口(Session Window)

由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 web 应用的 session,也就是一段时间没有接收到新数据就会生成新的窗口。

特点:时间无法对齐;

session 窗口分配器通过 session 活动来对元素进行分组,session 窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不在收到元素,即非活动间隔产生,那么这个窗口就会关闭。一个 session 窗口通过一个 session 间隔来配置,这个 session间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的session将关闭并且后续的元素将被分配到新的 session 窗口中去。
在这里插入图片描述

三、Window API

3.1、CountWindow

3.1.1、滚动窗口

默认的 CountWindow 是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。

val minTempPerWindow: DataStream[(String, Double)] = dataStream
.map(r => (r.id, r.temperature))
.keyBy(_._1)
.countWindow(5)
.reduce((r1, r2) => (r1._1, r1._2.max(r2._2)))

3.1.2、滑动窗口

滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是 window_size,一个是 sliding_size。
下面代码中的 sliding_size 设置为了 2,也就是说,每收到两个相同 key 的数据就计算一次,每一次计算的 window 范围是 10 个元素。

val keyedStream: KeyedStream[(String, Int), Tuple] = dataStream.map(r => (r.id, r.temperature)).keyBy(0)
//每当某一个 key 的个数达到 2 的时候,触发计算,计算最近该 key 最近 10 个元素的内容
val windowedStream: WindowedStream[(String, Int), Tuple, GlobalWindow] = keyedStream.countWindow(10,2)
val sumDstream: DataStream[(String, Int)] = windowedStream.sum(1)

3.2、TimeWindow

3.1.1、滚动窗口

Flink 默认的时间窗口根据 Processing Time 进行窗口的划分,将 Flink 获取到的数据根据进入 Flink 的时间划分到不同的窗口中。

val minTempPerWindow = dataStream
.map(r => (r.id, r.temperature))
.keyBy(_._1)
.timeWindow(Time.seconds(15))
//或者指定TumblingEventTimeWindows
//.window(TumblingEventTimeWindows.of(Time.seconds(15)))
.reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))

3.1.2、滑动窗口

滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是 window_size,一个是 sliding_size。
下面代码中的 sliding_size 设置为了 5s,也就是说,每 5s 就计算输出结果一次,每一次计算的 window 范围是 15s 内的所有元素。

val minTempPerWindow: DataStream[(String, Double)] = dataStream
.map(r => (r.id, r.temperature))
.keyBy(_._1)
.timeWindow(Time.seconds(15), Time.seconds(5))
//或者指定SlidingEventTimeWindows
//.window(SlidingEventTimeWindows.of(Time.seconds(15),Time.seconds(5)
.reduce((r1, r2) => (r1._1, r1._2.min(r2._2))))

3.1.2、会话窗口

val minTempPerWindow: DataStream[(String, Double)] = dataStream
.map(r => (r.id, r.temperature))
.keyBy(_._1)
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.reduce((r1, r2) => (r1._1, r1._2.min(r2._2))))

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1832100.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

利用Python语言调用讯飞星火认知大模型接口实战指南

什么是API接口 API(应用程序编程接口)是一组规则,允许不同的软件系统相互通信。通过API,开发者可以访问外部系统的功能和数据,而无需了解其内部实现。 API接口就像一座桥梁,连接应用程序和服务。例如&…

车企高管组团“出道”,汽车营销已经Next level了?

汽车进入了“卷”老板、“卷”高管的时代! 谁能想到,雷军凭一己之力,在一定程度上重塑了汽车的竞争策略。价格战之外,车市又开启了流量之战。 云略曾在《雷军20天吸粉500w!……》一文中,提到继雷军之后&…

敏捷开发时代,彻底结束了

最近,我收到一位读者的私信,他最近“内耗”得非常厉害,他可能一时兴起把我的私信当作了吐槽箱。 他们公司一直实行敏捷的管理模式,复盘发现了一个问题:发布与迭代具有强相关性,一个迭代就发布一次&#xf…

网络安全 DVWA通关指南 SQL Injection(SQL注入)

DVWA SQL Injection 文章目录 DVWA SQL InjectionLowMediumHighImpossible SQL注入漏洞基本原理 Web应用程序对用户输入的数据校验处理不严或者根本没有校验,致使用户可以拼接执行SQL命令。 可能导致数据泄露或数据破坏,缺乏可审计性,甚至导致…

RockChip Android12 Settings一级菜单

一:概述 在之前的文章中对Android8.1 Settings的流程进行了说明,本章将针对Android12 Settings一级菜单的加载逻辑进行详细说明,Settings版本之间的差异不是很大,有兴趣的同学可自行学习,本文不在做赘述。 Android8.1 Settings说明:RockChip Android8.1 Settings-CSDN博…

浏览器开发公司Brave 将自己的搜索结果与其 Leo AI 助手集成

Brave Software是一家开发浏览器的公司,其主要产品是Brave浏览器。Brave浏览器基于Chromium项目开发,具有高性能和隐私保护的特点。此外,Brave浏览器还提供了“off record”模式,允许用户在不记录浏览历史的情况下使用浏览器。关于…

Cisco Packet Tracer实验(五)不同vlan间的通信简单配置

1.单臂路由(图) 环境:一台路由器,一台二层交换机,两台pc机 单臂路由(Single Arm Routing)是指在网络架构中,只有一个物理接口(单臂)连接到路由器三层交换机,而…

电脑微信聊天记录监控要怎么做?找谁找?

电脑微信聊天记录的监控通常涉及到使用特定的监控软件,这些软件设计用于企业管理和网络监控,以确保工作场所的通信安全和提高工作效率。以下是进行电脑微信聊天记录监控的一般步骤和建议: 如何进行监控: 1.明确目的与合法性&…

计算机组成原理之存储器(二)

文章目录 随机读写存储器RAM静态MOS存储单元与存储芯片动态MOS存储单元与存储芯片 半导体存储器逻辑设计存储器的读写以及刷新存储器的读写动态存储芯片的刷新 随机读写存储器RAM 静态MOS存储单元与存储芯片 静态RAM用半导体管的导通和截止来记忆,只要不掉电&#x…

Transformer中的Self-Attention和Multi-Head Attention

2017 Google 在Computation and Language发表 当时主要针对于自然语言处理(之前的RNN模型记忆长度有限且无法并行化,只有计算完ti时刻后的数据才能计算ti1时刻的数据,但Transformer都可以做到) 文章提出Self-Attention概念&…

python学习笔记-06

函数进阶 1.无参数无返回值:这类函数往往用于提示信息打印 2.无参数有返回值:这类函数往往用于数据采集过程中 3.有参数有返回值:这类函数一般是计算型的 4.有参数无返回值:这类函数多用于设置某些不需要返回值的参数设置1.局部变…

实验2:RIPv2的配置

由于RIPv1是有类别的路由协议,路由更新不携带子网信息,不支持不连续子网、VLSM、手工汇总和验证等,本书重点讨论RIPv2。 1、实验目的 通过本实验可以掌握: RIPv1和 RIPv2的区别。在路由器上启动RIPv2路由进程。激活参与RIPv2路由协议的接口。auto-sum…

一个提问高下立见?国产AI大模型冲上扣子广场PK

以“国产GPTs”出名的扣子,做出了GPT没有的功能。 6月12日,字节跳动旗下的AI应用开发平台“扣子”(Coze国内版)悄悄上线了新功能“模型广场”。 扣子是AI应用开发平台,无论用户是否有编程基础,都可以在扣子…

OpenTiny CCF开源创新大赛赛事指南,助力你赢取10W赛事奖金

第七届CCF开源创新大赛在国家自然科学基金委信息科学部的指导下,由中国计算机学会(CCF)主办,长沙理工大学、CCF 开源发展委员会联合承办。大赛面向国家“十四五”开源生态发展战略布局,聚焦“卡脖子”软件领域以及人工…

clickhouse学习笔记(四)库、表、分区相关DDL操作

目录 一、数据库操作 1、创建数据库 2、查询及选择数据库 3、删除数据库 二、数据表操作 1、创建表 2、删除表 3、基本操作 ①追加新字段 ②修改字段类型或默认值 ③修改字段注释 ④删除已有字段 ⑤移动数据表(重命名) ⑥清空表 三、默认值…

【leetcode刷题】面试经典150题 , 27. 移除元素

leetcode刷题 面试经典150 27. 移除元素 难度:简单 文章目录 一、题目内容二、自己实现代码2.1 方法一:直接硬找2.1.1 实现思路2.1.2 实现代码2.1.3 结果分析 2.2 方法二:排序整体删除再补充2.1.1 实现思路2.1.2 实现代码2.1.3 结果分析 三、…

day12--150. 逆波兰表达式求值+239. 滑动窗口最大值+ 347. 前 K 个高频元素

一、150. 逆波兰表达式求值 题目链接:https://leetcode.cn/problems/evaluate-reverse-polish-notation/description/ 文章讲解:https://programmercarl.com/0150.%E9%80%86%E6%B3%A2%E5%85%B0%E8%A1%A8%E8%BE%BE%E5%BC%8F%E6%B1%82%E5%80%BC.html 视频…

QT 的文件

QT 和C、linux 一样,也有自带的文件系统. 它的操作和C、c差不多,不过也需要我们来了解一下。 输入输出设备类 QObject 有一个子类,名为 QIODevice 类,如其名字,该类是管理所有输入输出设备的类。 比如文件、网络套…

Java获取本机IP地址的方法(内网、公网)

起因是公司一个springboot项目启动类打印了本机IP地址加端口号,方便访问项目页面,但是发现打印出来的不是“无线局域网”的ip而是“以太网适配器”ip,如下图所示 这样就导致后续本地起项目连接xxl-job注册节点的时候因为不在同个局域网下ping…

Arcgis投影问题

今天下载数据,右键查看属性,发现只有地理坐标系,在arcgis里面进行展示有点丑 怎么变成下面的? 步骤1:加载数据 打开ArcGIS Pro或ArcMap。在目录窗口中,右键点击“文件夹连接”或“文件夹”选项&#xff0c…