大数据-玩转数据-Flink时间滚动动窗口

news2025/1/7 12:15:58

一、说明

时间窗口包含一个开始时间戳(包括)和结束时间戳(不包括), 这两个时间戳一起限制了窗口的尺寸.
在代码中, Flink使用TimeWindow这个类来表示基于时间的窗口. 这个类提供了key查询开始时间戳和结束时间戳的方法, 还提供了针对给定的窗口获取它允许的最大时间戳的方法(maxTimestamp())
时间窗口又分3种:滚动窗口、滑动窗口、会话窗口。

二、思路

滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙.比如,如果指定一个长度为5分钟的滚动窗口, 当前窗口开始计算, 每5分钟启动一个新的窗口.
滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口
1.时间间隔可以通过: Time.milliseconds(x), Time.seconds(x), Time.minutes(x),等等来指定.
2.我们传递给window函数的对象叫窗口分配器.

三、数据准备

准备一个WaterSensor类方便演示

package com.lyh.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {
    private String id;
    private Long ts;
    private Integer vc;
}

四、代码

package com.lyh.flink07;

import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class Window_s {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.socketTextStream("hadoop100",9999)
                .map(line -> {
                    String[] data = line.split(",");
                    return new WaterSensor(
                            data[0],
                            Long.valueOf(data[1]),
                            Integer.valueOf(data[2])
                    );
                })
                .keyBy(WaterSensor::getId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<WaterSensor, String,String, TimeWindow>() {
                    @Override
                    public void process(String key,
                                        Context ctx,
                                        Iterable<WaterSensor> elements,
                                        Collector<String> out) throws Exception {
                    List<WaterSensor> list  = toList(elements);
                        long starttime = ctx.window().getStart();
                        long endtime = ctx.window().getEnd();

                        out.collect("窗口:" + starttime + "  " + endtime + "  " + "key:" + key + "  " + "list:" + list);

                    }
                }).print();
        env.execute();
    }

    private static <T>List<T> toList(Iterable<T> it) {
        List<T>  list = new ArrayList<>();
        for (T t : it) {
            list.add(t);
            
        }
        return list;
    }
}

五、结果

在hadoop100 服务器
输入nc -lk 999
在这里插入图片描述

消费结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/923610.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

HTTP原理与实现

一、基本概念 一、基本原理* 1、全称&#xff1a; HyperText Transfer Protocol (超文本传输协议) 2、底层实现协议&#xff1a;建立在 TCP/IP 上的无状态连接。 3、基本作用&#xff1a;用于客户端与服务器之间的通信&#xff0c;规定客户端和服务器之间的通信格式。包括请…

PTS性能测试工具-使用记录

因为PTS使用是要收费的&#xff0c;所以文中会有大量图片记录&#xff0c;为我自己以后工作中&#xff0c;可能会再次使用PTS做个参照&#xff0c;以免时间长&#xff0c;容易忘记~ 目录 一、创建场景 二、填写一个压测节点 1、填写节点基本信息 2、Body / Header填写 …

牡丹宣言|对国潮化妆品品牌的理解

化妆品的国潮概念&#xff1f; ■ 是中国的时代潮流。 ■ 是传统元素与现代元素的碰撞。 ■ 是一股年轻的力量。 ■ 是大国崛起的象征。 ■ 是中国文化自信的体现。 如何正确认知化妆品&#xff1f; ■ 化妆品不是药品 ■ 化妆品是一种观念 ■ 化妆品是一种习惯 ■ 化…

自动化测试工具:Airtest入门教程

目录 1.什么是Airtest&#xff1f; 2.AirtestIDE下载安装 3.如何开始使用 4.Airtest入门特例教程 5.总结 1.什么是Airtest&#xff1f; Airtest是一款基于 Python 的、跨平台的UI自动化测试框架。因为它基于 图像识别 的原理&#xff0c;所以适用于所有 Android、 iOS和 …

Linux系统安装(虚拟机安装;系统分区;Linux系统安装;远程登录管理工具)

文章目录 1. VMware虚拟机安装与使用2. 系统分区2.1 磁盘分区2.2 格式化2.3 硬件设备文件名2.4 分区设备文件名2.5 挂载2.6 文件系统结构2.7 总结 3. Linux系统安装4. 远程登录管理工具 1. VMware虚拟机安装与使用 VMware是一个虚拟PC的软件&#xff0c;可以在现有的操作系统上…

opencv 车牌号的定位和识别+UI界面识别系统

目录 一、实现和完整UI视频效果展示 主界面&#xff1a; 识别结果界面&#xff1a;&#xff08;识别车牌颜色和车牌号&#xff09; 查看历史记录界面&#xff1a; 二、原理介绍&#xff1a; 车牌检测->图像灰度化->Canny边缘检测->膨胀与腐蚀 边缘检测及预处理…

【C++11】future和async等

C11的future和async等关键字 1.async和future的概念 std::async 和 std::future 是 C11 引入的标准库功能&#xff0c;用于实现异步编程&#xff0c;使得在多线程环境中更容易处理并行任务。它们可以帮助你在不同线程中执行函数&#xff0c;并且能够方便地获取函数的结果。 在…

mysql(八)事务隔离级别及加锁流程详解

目录 MySQL 锁简介什么是锁锁的作用锁的种类共享排他锁共享锁排它锁 粒度锁全局锁表级锁页级锁行级锁种类 意向锁间隙临键记录锁记录锁间隙锁 加锁的流程锁的内存结构加锁的基本流程根据主键加锁根据二级索引加锁根据非索引字段查询加锁加锁规律 锁信息查看查看锁的sql语句 数据…

npm install sentry-cli失败的问题

1. 目前报错 2. 终端运行 npm set ENTRYCLI_CDNURLhttps://cdn.npm.taobao.org/dist/sentry-cli npm set sentrycli_cdnurlhttps://cdn.npm.taobao.org/dist/sentry-cli3. 再安装 npx sentry/wizardlatest -i nextjs即可成功

信创测试的应用是什么

信创测试作为评估创意和创新项目的工具&#xff0c;为企业的发展提供了重要的支持和指导。它能够帮助企业降低风险、优化资源配置&#xff0c;促进创意与创新的迭代和改进。其具体应用&#xff0c;小编带大家一起来看看详情吧! 一、产品和服务创新 信创测试可以用于评估新产品和…

linux并发服务器 —— 动态库和静态库实战(一)

-E 预处理指定源文件 -S 编译指定源文件 -c 汇编指定源文件 -o 生成可执行文件 -I directory 指定Include包含文件的搜索目录 -g 编译的时候生成调试信息 -D 在程序编译时指定一个宏 -w 不生成任何的警告信息 -Wall 生成所有警告 -On n:0~3&#xff1b;表示编译器的优…

Kubernetes(K8S)使用PV和PVC做存储安装mysql

Kubernetes使用PV和PVC做存储安装mysql 环境准备什么是PV和PVC环境准备配置nfs安装nfs配置nfs服务端 创建命名空间配置pv和pvcpv的yaml文件pvc的yaml文件 部署mysql创建mysql的root密码的secret创建mysql部署的yaml部署mysql链接mysql外部链接内部链接 环境准备 首先你需要一个…

【路由器】小米 WR30U 解锁并刷机

文章目录 解锁 ssh环境准备解锁过程 刷入 mt798x uboot简介刷入流程 刷入 ImmortalWrt简介刷入流程 刷为原厂固件参考资料 本文主要记录个人对小米 WR30U 路由器的解锁和刷机过程&#xff0c;整体步骤与 一般安装流程 类似&#xff0c;但是由于 WR30U 的解锁 ssh 和刷机的过程中…

Docker打包JDK20镜像

文章目录 Docker 打包 JDK 20镜像步骤1.下载 jdk20 压缩包2.编写 dockerfile3.打包4.验证5.创建并启动容器6.检查 Docker 打包 JDK 20镜像 步骤 1.下载 jdk20 压缩包 https://www.oracle.com/java/technologies/downloads/ 2.编写 dockerfile #1.指定基础镜像&#xff0c;并…

MongoDB Long 类型 shell 查询

场景 1、某数据ID为Long类型&#xff0c;JAVA 定义实体类 Id Long id 2、查询数据库&#xff0c;此数据存在 3、使用 shell 查询&#xff0c;查不到数据 4、JAVA代码查询Query.query 不受任何影响 分析 尝试解决&#xff08;一&#xff09; long 在 mongo中为 int64 类型…

ARM-汇编指令

一&#xff0c;map.lds文件 链接脚本文件 作用&#xff1a;给编译器进行使用&#xff0c;告诉编译器各个段&#xff0c;如何进行分布 /*输出格式&#xff1a;32位可执行程序&#xff0c;小端对齐*/ OUTPUT_FORMAT("elf32-littlearm", "elf32-littlearm",…

ELFNet: Evidential Local-global Fusion for Stereo Matching

论文地址&#xff1a;https://arxiv.org/pdf/2308.00728.pdf 源码地址&#xff1a;https://github.com/jimmy19991222/ELFNet 概述 针对现有立体匹配模型面临可靠性和跨域泛化的问题&#xff0c;本文提出了Evidential Local-global Fusion&#xff08;ELF&#xff09;框架&…

十一、内部类(2)

本章概要 为什么需要内部类 闭包与回调内部类与控制框架 继承内部类内部类可以被重写么&#xff1f;局部内部类内部类标识符 为什么需要内部类 至此&#xff0c;我们已经看到了许多描述内部类的语法和语义&#xff0c;但是这并不能同答“为什么需要内部类”这个问题。那么&a…

几个nlp的小项目(文本分类)

几个nlp的小项目(文本分类) 导入加载数据类、评测类查看数据集精确展示数据测评方法设置参数tokenizer,token化的解释对数据集进行预处理加载预训练模型进行训练设置训练模型的参数一个根据任务名获取,测评方法的函数创建预训练模型开始训练本项目的工作完成了什么任务?导…

(AcWing) 任务安排(I,II,III)

任务安排I: 有 N 个任务排成一个序列在一台机器上等待执行&#xff0c;它们的顺序不得改变。 机器会把这 N 个任务分成若干批&#xff0c;每一批包含连续的若干个任务。 从时刻 0 开始&#xff0c;任务被分批加工&#xff0c;执行第 i 个任务所需的时间是 Ti。 另外&#x…