flinksql的滚动窗口实现

news2024/12/26 19:32:07

滚动窗口在flinksql中是TUMBLE

eventTime

package com.bigdata.day08;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


public class _01_flinkSql_eventTime_tumble {
    /**
     * eventTime + 滚动窗口 60秒 + 3秒的水印
     * 
     * 
     * 数据格式
     * {"username":"zs","price":20,"event_time":"2023-07-18 12:12:04"}
     * {"username":"zs","price":20,"event_time":"2023-07-18 12:13:00"}
     * {"username":"zs","price":20,"event_time":"2023-07-18 12:13:03"}
     * {"username":"zs","price":20,"event_time":"2023-07-18 12:14:03"}
     */

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        //2. 创建表
        tenv.executeSql("CREATE TABLE table1 (\n" +
                "  `username` String,\n" +
                "  `price` int,\n" +
                "  `event_time` TIMESTAMP(3),\n" +
                "   watermark for event_time as event_time - interval '3' second\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'topic1',\n" +
                "  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',\n" +
                "  'properties.group.id' = 'testGroup1',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json'\n" +
                ")");
        //3. 通过sql语句统计结果

        tenv.executeSql("select \n" +
                "   window_start,\n" +
                "   window_end,\n" +
                "   username,\n" +
                "   count(1) zongNum,\n" +
                "   sum(price) totalMoney \n" +
                "   from table(TUMBLE(TABLE table1, DESCRIPTOR(event_time), INTERVAL '60' second))\n" +
                "group by window_start,window_end,username").print();
        //4. sink-数据输出



        //5. execute-执行
        env.execute();
    }
}

processTime

package com.bigdata.day08;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


public class _03_flinkSql_processTime_tumble {
    /**
     * process + 滚动窗口60秒
     * 
     * 数据格式
     * {"username":"zs","price":20}
     * {"username":"lisi","price":15}
     * {"username":"lisi","price":20}
     * {"username":"zs","price":20}
     * {"username":"zs","price":20}
     * {"username":"zs","price":20}
     * {"username":"zs","price":20}
     */

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        //2. 创建表
        tenv.executeSql("CREATE TABLE table1 (\n" +
                "  `username` String,\n" +
                "  `price` int,\n" +
                "  `event_time` as proctime()\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'topic1',\n" +
                "  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',\n" +
                "  'properties.group.id' = 'testGroup1',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json'\n" +
                ")");
        //3. 通过sql语句统计结果

        tenv.executeSql("select \n" +
                "   window_start,\n" +
                "   window_end,\n" +
                "   username,\n" +
                "   count(1) zongNum,\n" +
                "   sum(price) totalMoney \n" +
                "   from table(TUMBLE(TABLE table1, DESCRIPTOR(event_time), INTERVAL '60' second))\n" +
                "group by window_start,window_end,username").print();
        //4. sink-数据输出



        //5. execute-执行
        env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2253821.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

物联网——WatchDog(监听器)

看门狗简介 独立看门狗框图 看门狗原理:定时器溢出,产生系统复位信号;若定时‘喂狗’则不产生系统复位信号 定时中断基本结构(对比) IWDG键寄存器 独立看门狗超时时间 WWDG(窗口看门狗) WWDG特性 WWDG超时时间 由于…

医疗挂号|基于springBoot的医疗挂号管理设计与实现(附项目源码+论文+数据库)

目录 一、摘要 二、相关技术 三、系统设计 四、数据库设计 五、核心代码 六、论文参考 七、源码获取 一、摘要 在如今社会上,关于信息上面的处理,没有任何一个企业或者个人会忽视,如何让信息急速传递,并且归档储存…

lobeChat安装

一、安装Node.js version > v18.17.0 二、下载 cd F:\AITOOLS\LobeChat git clone https://github.com/lobehub/lobe-chat.git (下载要是失败就手动下:https://codeload.github.com/lobehub/lobe-chat/zip/refs/heads/main) npm install …

电子应用设计方案-38:智能语音系统方案设计

智能语音系统方案设计 一、引言 智能语音系统作为一种便捷、自然的人机交互方式,正逐渐在各个领域得到广泛应用。本方案旨在设计一个高效、准确、功能丰富的智能语音系统。 二、系统概述 1. 系统目标 - 实现高准确率的语音识别和自然流畅的语音合成。 - 支持多种语…

AWS创建ec2实例并连接成功

aws创建ec2实例并连接 aws创建ec2并连接 1.ec2创建前准备 首先创建一个VPC隔离云资源并且有公有子网 2.创建EC2实例 1.启动新实例或者创建实例 2.创建实例名 3.选择AMI使用linux(HVM) 4.选择实例类型 5.创建密钥对下载到本地并填入密钥对名称 6.选择自己创建的VPC和公有子网…

数字逻辑——二进制

目录 1 信息与编码 1.1 什么是信息? 1.2 什么是编码? 2 数制和码制 2.1 数制 3 一些基本概念 3.1 位(bit) 3.2 字节(byte) 3.3 数据量的大小表示符号 4 二进制 4.1 二进制简介 4.2 二进制的…

初识TCP(编写回显服务器)

目录 初识TCP(编写回显服务器)TCP相关的API服务器代码实现客户端代码实现部分代码解释注意事项效果展示 初识TCP(编写回显服务器) TCP相关的API ServerSocket : 这是socket类,对应到网卡,但是…

ElasticSearch7.x入门教程之全文搜索聚合分析(十)

文章目录 前言一、指标聚合1、统计最大值:Max Aggregation2、统计最小值:Min Aggregation3、统计平均值:Avg Aggregation4、求和:Sum Aggregation5、Cardinality Aggregation6、基本统计:Stats Aggregation7、高级统计…

【Linux】DNS服务配置

DNS DNS是什么 DNS是Domain Name System的缩写,即域名系统。它是一种用来将域名转化为IP地址的系统。在互联网中,每个网站都有一个唯一的IP地址,但是人们更习惯使用简单易记的域名来访问网站。DNS的作用就是将这些域名转化为对应的IP地址,使得人们可以通过域名来访问网站…

第31天:安全开发-JS应用WebPack打包器第三方库JQuery安装使用安全检测

时间轴: 演示案例: 打包器-WebPack-使用&安全 第三方库-JQuery-使用&安全 打包器-WebPack-使用&安全 参考:https://mp.weixin.qq.com/s/J3bpy-SsCnQ1lBov1L98WA Webpack 是一个模块打包器。在 Webpack 中会将前端的所有资源…

开发者如何使用GCC提升开发效率Opencv操作

看此篇前请先阅读 https://blog.csdn.net/qq_20330595/article/details/144134160?spm=1001.2014.3001.5502 https://blog.csdn.net/qq_20330595/article/details/144134160?spm=1001.2014.3001.5502 https://blog.csdn.net/qq_20330595/article/details/144216351?spm=1001…

使用PaddlePaddle实现线性回归模型

目录 ​编辑 引言 PaddlePaddle简介 线性回归模型的构建 1. 准备数据 2. 定义模型 3. 准备数据加载器 4. 定义损失函数和优化器 5. 训练模型 6. 评估模型 7. 预测 结论 引言 线性回归是统计学和机器学习中一个经典的算法,用于预测一个因变量&#xff0…

图像处理网络中的模型水印

论文信息:Jie Zhang、Han Fang、Weiming Zhang、Wenbo Zhou、Hao Cui、Hao Cui、Nenghai Yu:Model Watermarking for Image Processing Networks 本文首次提出了图像处理网络中深度水印问题,将知识产权问题引入图像处理模型 提出了第一个深…

全面UI组件库Telerik 2024 Q4全新发布——官方宣布支持.NET 9

Telerik DevCraft包含一个完整的产品栈来构建您下一个Web、移动和桌面应用程序。它使用HTML和每个.NET平台的UI库,加快开发速度。Telerik DevCraft提供最完整的工具箱,用于构建现代和面向未来的业务应用程序,目前提供UI for ASP.NET MVC、Ken…

计算机毕业设计hadoop+spark民宿推荐系统 民宿数据分析可视化大屏 民宿爬虫 民宿大数据 知识图谱 机器学习 大数据毕业设计

温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 作者简介:Java领…

工业—使用Flink处理Kafka中的数据_ChangeRecord1

使用 Flink 消费 Kafka 中 ChangeRecord 主题的数据,当某设备 30 秒状态连续为 “ 预警 ” ,输出预警 信息。当前预警信息输出后,最近30

【Android】从事件分发开始:原理解析如何解决滑动冲突

【Android】从事件分发开始:原理解析如何解决滑动冲突 文章目录 【Android】从事件分发开始:原理解析如何解决滑动冲突Activity层级结构浅析Activity的setContentView源码浅析AppCompatActivity的setContentView源码 触控三分显纷争,滑动冲突…

消息中间件-Kafka2-3.9.0源码构建

消息中间件-Kafka2-3.9.0源码构建 1、软件环境 JDK Version 1.8Scala Version 2.12.0Kafka-3.9.0 源码包 下载地址:https://downloads.apache.org/kafka/3.9.0/kafka-3.9.0-src.tgzGradle Version > 8.8Apache Zookeeper 3.7.0 2、源码编译 打开源码根目录修改…

【ElasticSearch】倒排索引与ik分词器

ElasticSearch,简称ES(后文将直接使用这一简称),是一款卓越的开源分布式搜索引擎。其独特之处在于其近乎实时的数据检索能力,为用户提供了迅速、高效的信息查询体验。 它能够解决全文检索,模糊查询、数据分析等问题。那么它的搜索…

【开源免费】基于Vue和SpringBoot的洗衣店订单管理系统(附论文)

博主说明:本文项目编号 T 068 ,文末自助获取源码 \color{red}{T068,文末自助获取源码} T068,文末自助获取源码 目录 一、系统介绍二、演示录屏三、启动教程四、功能截图五、文案资料5.1 选题背景5.2 国内外研究现状5.3 可行性分析…