大数据-玩转数据-Flink Sql 窗口

news2025/1/22 21:10:49

一、说明

时间语义,要配合窗口操作才能发挥作用。最主要的用途,当然就是开窗口然后根据时间段做计算了。Table API和SQL中,主要有两种窗口:分组窗口(Group Windows)和 含Over字句窗口(Over Windows)。

二、Group Windows

分组窗口(Group Windows)会根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数。Table API中的Group Windows都是使用.window(w:GroupWindow)子句定义的,并且必须由as子句指定一个别名。为了按窗口对表进行分组,窗口的别名必须在group by子句中,像常规的分组字段一样引用。

2.1、分组窗口中的滚动窗口

代码示例:

package com.lyh.flink12;

import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

public class Sql_Group_Windows_List {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> waterSensorStream = env.fromElements(new WaterSensor("sensor_1", 1000L, 100),
                new WaterSensor("sensor_1", 1000L, 100),
                new WaterSensor("sensor_1", 1000L, 100),
                new WaterSensor("sensor_2", 1000L, 100),
                new WaterSensor("sensor_2", 1000L, 100))
                .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((element, recordtime) -> element.getTs()));
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv
                .fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));

        table
                .window(Tumble.over(lit(10).second()).on($("ts")).as("w"))  // 定义滚动窗口并给窗口起一个别名
                .groupBy($("id"), $("w")) // 窗口必须出现的分组字段中
                .select($("id"), $("w").start(), $("w").end(), $("vc").sum())
                .execute()
                .print();

        env.execute();

    }
}

运行结果:
在这里插入图片描述

2.2、分组窗口中的滑动窗口

.window(Slide.over(lit(10).second()).every(lit(5).second()).on($("ts")).as("w"))

运行结果:
在这里插入图片描述

2.3、分组窗口中的会话窗口

.window(Session.withGap(lit(6).second()).on($("ts")).as("w"))

运行结果:
在这里插入图片描述

三、Over windows

Over window聚合是标准SQL中已有的(Over子句),可以在查询的SELECT子句中定义。Over window 聚合,会针对每个输入行,计算相邻行范围内的聚合。Table API提供了Over类,来配置Over窗口的属性。可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定义Over windows。无界的over window是使用常量指定的。也就是说,时间间隔要指定UNBOUNDED_RANGE,或者行计数间隔要指定UNBOUNDED_ROW。而有界的over window是用间隔的大小指定的。

3.1、Unbounded Over Windows

代码示例:

package com.lyh.flink12;

import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.*;

public class Sql_Over_windows {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> waterSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 100),
                new WaterSensor("sensor_1", 1000L, 100),
                new WaterSensor("sensor_2", 4000L, 200),
                new WaterSensor("sensor_2", 2000L, 200),
                new WaterSensor("sensor_2", 3000L, 200),
                new WaterSensor("sensor_2", 5000L, 200),
                new WaterSensor("sensor_2", 6000L, 200))
                .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((element, stamptime) -> element.getTs()));
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(waterSource, $("id"), $("ts").rowtime(), $("vc"));
        table.window(Over.partitionBy($("id")).orderBy($("ts")).preceding(UNBOUNDED_ROW).as("w"))
                .select($("id"), $("ts"), $("vc").sum().over($("w")).as("sum_vc"))
                .execute()
                .print();
        env.execute();

    }
}

运行结果:
在这里插入图片描述

# 使用UNBOUNDED_RANGE
.window(Over.partitionBy($("id")).orderBy($("ts")).preceding(UNBOUNDED_RANGE).as("w"))

在这里插入图片描述

3.2、Bounded Over Windows

当事件时间向前算3s得到一个窗口

.window(Over.partitionBy($("id")).orderBy($("ts")).preceding(lit(3).second()).as("w"))

运行结果:
在这里插入图片描述

当行向前推算2行算一个窗口

.window(Over.partitionBy($("id")).orderBy($("ts")).preceding(rowInterval(2L)).as("w"))

四、总结

作为大数据工程师,我们最为熟悉的数据统计方式,当然就是写 SQL 了。SQL 是结构化查询语言(Structured Query Language)的缩写,是我们对关系型数据库进行查询和修改的通用编程语言。在关系型数据库中,数据是以表(table)的形式组织起来的,所以也可以认为 SQL 是用来对表进行处理的工具语言。无论是传统架构中进行数据存储的MySQL、PostgreSQL,还是大数据应用中的 Hive,都少不了 SQL 的身影;而 Spark 作为大数据处理引擎,为了更好地支持在 Hive 中的 SQL 查询,也提供了 Spark SQL 作为入口。Flink 同样提供了对于“表”处理的支持,这就是更高层级的应用 API,在 Flink 中被称为Table API 和 SQL。Table API 顾名思义,就是基于“表”(Table)的一套 API,它是内嵌在 Java、Scala 等语言中的一种声明式领域特定语言(DSL),也就是专门为处理表而设计的;在此基础上,Flink 还基于 Apache Calcite 实现了对 SQL 的支持。这样一来,我们就可以在 Flink 程序中直接写 SQL 来实现处理需求了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1054475.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

S32K144 GPIO编程

前面的文章介绍了如何在MDK-Keil下面进行S32K144的开发&#xff0c;下面就使用该工程模板进行GPIO LED的编程试验。 1. 开发环境 S32K144EVB-Q100开发板MDK-Keil Jlink 2. 硬件连接 S32K144EVB-Q100开发板关于LED的原理图如下&#xff1a; 也就是具体连接关系如下&#xf…

【知识点随笔分析 | 第五篇】简单介绍什么是QUIC

前言&#xff1a; 随着互联网的快速发展&#xff0c;传统的基于TCP的协议开始显现出一些局限性。TCP在连接建立和拥塞控制方面存在一定的延迟&#xff0c;这可能导致用户在访问网页、观看视频或玩网络游戏时感受到不必要的等待时间。而QUIC作为一种新兴的传输协议&#xff0c;试…

Java编程技巧:swagger2、knif4j

目录 1、springbootswagger2knif4j2、springbootswagger3knif4j3、springcloudswagger2knif4j 1、springbootswagger2knif4j 2、springbootswagger3knif4j 3、springcloudswagger2knif4j 注意点&#xff1a; Api注解&#xff1a;Controller类上的Api注解需要添加tags属性&a…

P1311 [NOIP2011 提高组] 选择客栈(小小的也很可爱【指小动规】)

[NOIP2011 提高组] 选择客栈 题目描述 丽江河边有 n n n 家很有特色的客栈&#xff0c;客栈按照其位置顺序从 1 1 1 到 n n n 编号。每家客栈都按照某一种色调进行装饰&#xff08;总共 k k k 种&#xff0c;用整数 0 ∼ k − 1 0 \sim k-1 0∼k−1 表示&#xff09;&am…

stm32 - 中断

stm32 - 中断 中断向量表NVIC 嵌套中断向量控制器优先级 中断向量表 自定义的中断服务函数&#xff0c;由编译器随机指定函数地址 stm32的中断&#xff0c;由于硬件的限制&#xff0c;只能跳到固定的地址执行程序 为了能让硬件跳转到一个不固定的中断函数中&#xff0c; 需要在…

基于Qt Creator开发的坦克大战小游戏

目录 介绍开发环境技术介绍安装说明项目目录设计思想项目介绍运行演示知识点记录Gitee源码链接 介绍 &#xff01;&#xff01;&#xff01;资源图片是从网上免费下载&#xff0c;源码都是原创&#xff0c;供个人学习使用&#xff0c;非盈利&#xff01;&#xff01;&#xff…

Acwing 836. 合并集合

Acwing 836. 合并集合 题目描述思路讲解代码展示 题目描述 思路讲解 并查集&#xff1a;代码短&#xff0c;思路精巧&#xff0c;面试常见。 近乎O&#xff08;1&#xff09;的时间复杂度。 代码展示 #include<iostream>using namespace std;const int N 100010; in…

Python海洋专题四之水深地图图像修饰

Python海洋专题四之水深地图图像修饰 海洋与大气科学 上期会修改画布大小、坐标轴字体 没有对数据本身进行修改 本期内容 1&#xff1a;修改colormap&#xff01; 2&#xff1a;倒置colormap 3&#xff1a;加上colorbar、调整其显示位置和字体大小 1&#xff1a;修改colo…

OpenCV实现视频的追踪(meanshift、Camshift)

目录 1&#xff0c;meanshift 1.1 算法流程 1.2 算法实现 1.3 代码实现 1.4 结果展示 1&#xff0c;meanshift 1.1 算法流程 1.2 算法实现 1.3 代码实现 import numpy as np import cv2 as cv# 读取视频 cap cv.VideoCapture(video.mp4)# 检查视频是否成功打开 if n…

意大利储能公司【Energy Dome】完成1500万欧元融资

来源&#xff1a;猛兽财经 作者&#xff1a;猛兽财经 猛兽财经获悉&#xff0c;总部位于意大利米兰的储能公司Energy Dome今日宣布已完成1500万欧元B轮融资。 本轮融资完成后&#xff0c;Energy Dome的融资总额已经达到了5500万欧元&#xff0c;本轮融资的参与者包括阿曼创新发…

【Java 进阶篇】JDBC插入数据详解

在Java应用程序中&#xff0c;与数据库交互是一项常见的任务。其中&#xff0c;插入数据操作是一种基本的数据库操作之一。本文将详细介绍如何使用Java JDBC&#xff08;Java Database Connectivity&#xff09;来执行插入数据操作。无论您是初学者还是有一定经验的开发人员&am…

好题分享

1.Problem - G - Codeforces &#xff08;1&#xff09;题意 &#xff08;2&#xff09;思路 因为最多13次&#xff0c;那么不如我们就问13次&#xff0c;然后考虑把每一个位置重新按二进制拆分成一个下标&#xff0c;因为C(13,6) > 1000,因此在数量上是满足得&#xff0c;我…

【centos7】centos7卸载gitlab

一、GitLab安装 1. 安装依赖包 yum install -y curl policycoreutils-python openssh-server 2. 安装lrzsz&#xff08;如已经安装可忽略&#xff09; yum -y install lrzsz 3. 下载rpm包 cd /usr/local wget https://mirrors.tuna.tsinghua.edu.cn/gitlab-ce/yum/el7/gi…

spring security(二)--授权

零.前情提要 这篇文章主要借鉴B站三更大大关于spring security的教程&#xff0c;这篇文章的大部分内容也来自于那个教程&#xff0c;写这个的主要目的是记录加强印象&#xff0c;总结&#xff0c;并且在文章中我也有穿插自己的想法。 前面的文章【spring security教程&#…

java Spring Boot 将日志写入文件中记录

我们之前的一套操作来讲 日志都是在控制台上的 但 如果你的项目在正式环境上跑 运维人员突然告诉你说日志报错了&#xff0c;但你日志只在控制台上&#xff0c;那公司项目如果访问量很大 那你是很难在控制台上找到某一条日志的 这时 我们就可以用文件把它记下来 我们打开项目 …

python使用mitmproxy和mitmdump抓包在电脑上抓包(二)

在我的上篇文章中&#xff0c;主要记录如何安装mitmproxy和抓取https流量。参考链接&#xff1a; python使用mitmproxy和mitmdump抓包在电脑上抓包-CSDN博客 本篇主要使用python配合mitmdump来抓包和处理返回包&#xff0c;更加灵活&#xff0c;这也是mitmproxy(mitmdump)的最…

快看看你的手机有没有:谷歌Android全面封杀此类软件!

谷歌坐不住了&#xff0c;因为Android应用商店中&#xff0c;充斥着大量可窃取用户数据的应用&#xff0c;所以必然要出手整治了。 一款名叫“SonicSpy”软件是整个事情的导火索&#xff0c;而该应用是典型的窃取用户数据的应用&#xff0c;其除了可以从手机中提取个人数据外&…

一维数组和二维数组的使用(一)

目录 导读1. 一维数组1.1 一维数组的创建1.2 数组的初始化1.3 一维数组的使用1.4 一维数组在内存中的存储 2. 二维数组2.1 二维数组的创建2.2 二维数组的初始化2.3 二维数组的使用2.4 二维数组在内存中的存储 博主有话说 导读 本篇主要讲解一维数组和二维数组的创建和使用&…

【Redis】redis基本数据类型详解(String、List、Hash、Set、ZSet)

目录 RedisString(字符串)List(列表)Hash(字典)Set(集合)ZSet(有序集合) Redis Redis有5种基本的数据结构&#xff0c;分别为&#xff1a;string&#xff08;字符串&#xff09;、list&#xff08;列表&#xff09;、set&#xff08;集合&#xff09;、hash&#xff08;哈希&a…

【Django笔记】认证系统

使用Django 的认证系统实现用户信息验证 在前面使用migrate 命令进行数据迁移时候&#xff0c;生成了auth_user表 ,该表中存放了用户信息可以用来登录Django自带的Admin 管理后台。 创建Admin 后台管理账号 python manage.py createuperuser E:\data\python\djaongo_prj\gu…