Flink学习:Flink如何打印窗口的开始时间和结束时间

news2025/1/12 20:40:14

Window

    • 一、简介
    • 二、代码实现
    • 三、测试

一、简介

大家知道,Flink用水位线和窗口机制配合来处理乱序事件,保证窗口计算数据的正确性,当水位线超过窗口结束时间的时候,就会触发窗口计算

  • 水位线是动态生成的,根据进入窗口的最大事件时间-允许延迟时间

那么窗口的开始时间和结束时间是怎么计算的呢?这里不讨论计数窗口,因为数量统计很容易知道,只针对时间窗口的计算

  • 滚动时间窗口:按照固定的时间长度对数据进行分组,窗口之间没有重叠,例如,5秒的滚动窗口。开始时间为当前窗口大小的整数倍,结束时间为开始时间加上窗口大小,比如
  • 滑动时间窗口:按照固定的时间长度对数据进行分组,窗口之间有重叠,例如,5秒的滑动窗口,每2秒钟滑动一次。开始时间为当前窗口大小的整数倍加上窗口滑动步长的整数倍,结束时间为开始时间加上窗口大小。
  • 会话窗口:按照数据的时间间隔进行分组,当两个数据之间的时间间隔大于指定的间隔时间时,就认为前一个窗口结束,后一个窗口开始。开始时间为第一个数据的时间戳,结束时间为最后一个数据的时间戳加上间隔时间

看完上面可能还有些疑惑,没关系,下面会有具体示例

二、代码实现

自定义一个Mywindowfunction继承WindowFunction,可以在流上调用这个function从而打印

class MyWindowFunction extends WindowFunction[User, String, Tuple, TimeWindow] {
  override def apply(key: Tuple, window: TimeWindow, input: Iterable[User], out: Collector[String]): Unit = {
    val startTime = window.getStart
    val endTime = window.getEnd
    val result = s"Window start time: $startTime, end time: $endTime"
    out.collect(result)
  }
}

三、测试

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.api.java.tuple._
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class User(id:Int,name:String,age:Int,timestamp:Long)
object WaterMarkTest {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(streamEnv)

    //指定时间类型为事件时间
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream = streamEnv.fromElements(
      User(1, "nie", 22, 1511658001000L),
      User(2, "xiao", 19, 1511658005000L)
    ).assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[User](Time.seconds(2)) {
        override def extractTimestamp(t: User): Long = t.timestamp
      }
    )

    stream.keyBy(0)
      .timeWindow(Time.seconds(2))
      .apply(new MyWindowFunction)
      .print()

    streamEnv.execute("test")

  }
}

class MyWindowFunction extends WindowFunction[User, String, Tuple, TimeWindow] {
  override def apply(key: Tuple, window: TimeWindow, input: Iterable[User], out: Collector[String]): Unit = {
    val startTime = window.getStart
    val endTime = window.getEnd
    val result = s"Window start time: $startTime, end time: $endTime"
    out.collect(result)
  }
}

比如上面的两条数据时间戳分别为1511658001000L、1511658005000L,转换成日期格式为2017-11-26 9:0:1,2017-11-26 9:0:5,设置为滚动窗口,窗口间隔为2s,上面说到滚动窗口窗口开始时间为当前窗口大小的整数倍,结束时间为开始时间加上窗口大小,那么可以推断窗口为2017-11-26 9:0:0(整数倍) ~ 2017-11-26 9:0:2、2017-11-26 9:0:4 ~ 2017-11-26 9:0:6,运行下代码试下:
在这里插入图片描述
确实是一样的,窗口开始时间为事件时间往下推,直到找到窗口大小的整数倍,窗口结束时间就是开始时间加上时间间隔
这时候我们改成滑动窗口,窗口间隔时间为5s,滑动距离为2s,这时候修改一下上述代码

    stream.keyBy(0)
      .timeWindow(Time.seconds(5),Time.seconds(3))
      .apply(new MyWindowFunction)
      .print()

再运行一下:
在这里插入图片描述
上面说到滑动窗口开始时间为当前窗口大小的整数倍加上窗口滑动步长的整数倍,可以理解为窗口大小为5s,滑动距离为2s,事件事件分别为1511658001000L、1511658005000L,相对于1511658001000L,5s的整数倍可以是1511657995000,1511657990000,这时候加上滑动距离的整数倍,可以是1511657992000、1511657994000、1511657998000、1511657997000,为什么窗口最开始为1511657998000,肯定是综合考量的结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/428926.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

力扣70爬楼梯:思路分析+优化思路+代码实现+补充思考

文章目录第一部分:题目描述第二部分:思路分析2.1 初步分析2.2 问题描述2.3 优化思路第三部分:代码实现第四部分:补充思考第一部分:题目描述 🏠 链接:70. 爬楼梯 - 力扣(LeetCode&am…

“衰老标志物”重磅综述:细胞衰老、器官衰老、衰老时钟及其应用

大家好,这里是专注表观组学十余年,领跑多组学科研服务的易基因。 随着人口老龄化程度不断加深,实现“健康老龄化(healthy aging)”已成为我国乃至世界迫切需要解决的重大社会和科学问题。据测算,我国60岁及…

LVGL界面开发之模拟器环境搭建

前言 通常我们在使用 LVGL 进行界面开发时,会先在PC上搭建模拟器环境,而不是直接烧录到硬件板子上,使用模拟器是百利而无一害的,而且它是跨平台的,任何Windows,Linux或macOS系统都可以运行PC模拟器。每当界…

网上投票系统的设计与实现(论文+源码)_kaic

摘要 随着全球Internet的迅猛发展和计算机应用的普及,特别是近几年无线网络的广阔覆盖以及无线终端设备的爆炸式增长,使得人们能够随时随地的访问网络,以获取最新信息、参与网络活动、和他人在线互动。为了能及时地了解民情民意,把…

【高项】项目风险管理与采购管理(十大管理)

【高项】项目风险管理与采购管理(十大管理) 文章目录1、风险管理1.1 什么是风险管理?1.2 规划风险管理 & 识别风险(规划)1.3 实施定性风险分析(规划)1.4 实施定量风险分析(规划&…

分布式缓存之Redis(持久化、主从、哨兵、分片集群)

更多内容请参考官网:https://redis.io/Redis持久化Redis有两种持久化方案:RDB持久化和AOF持久化。RDB持久化RDB全称Redis Database Backup file(Redis数据备份文件),也被叫做Redis数据快照。简单来说就是把内存中的所有…

HTML5 表单属性

文章目录HTML5 表单属性HTML5 新的表单属性\<form> / \<input> autocomplete 属性\<form> novalidate 属性\<input> autofocus 属性\<input> form 属性\<input> formaction 属性\<input> formenctype 属性\<input> formmethod…

批量给TXT文档插入带标题合成图片-Chatgpt生成TXT文档配图神器

1、我们用《Chatgpt 3.5-turbo软件》批量生成txt文档&#xff0c;但是这样txt文档里不带图片&#xff0c;直接发布到网站上&#xff0c;光有文字没有图片&#xff0c;效果也不是很理想&#xff0c;就需要一款配图软件。 2、提高文章的可读性和吸引力&#xff1a;插入图片可以丰…

Mybatis(四):自定义映射resultMap

自定义映射resultMap前言一、处理字段和属性的映射关系问题&#xff1a;方案一&#xff1a;使用别名方案二&#xff1a;在mybatis-config.xml中设置mapUnderscoreToCamelCase方案三&#xff1a;在映射文件中设置redultMap二、多对一映射处理问题&#xff1a;方案一&#xff1a;…

Windows10系统安装Redis教程

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录前言一、下载Redis二、安装或解压三、基本配置设置四、启动四、Redis详细配置前言 由于工作中的项目需要用到Redis&#xff0c;那么记录一下Windows11系统安装Redis…

微服务学习-SpringCloud -Nacos (服务注册源码学习)

文章目录源码版本及下载服务注册核心流程图&#xff08;看不清请双击打开大图&#xff09;源码详解客户端注册源码服务端注册源码源码版本及下载 此次源码版本为1.4.1&#xff0c;2.x版本在服务请求时使用了grpc的方式&#xff0c;所以先以1.4.1版本学习&#xff0c;后续再看2…

uni-app--》如何实现网上购物小程序(中上)?

&#x1f3cd;️作者简介&#xff1a;大家好&#xff0c;我是亦世凡华、渴望知识储备自己的一名在校大学生 &#x1f6f5;个人主页&#xff1a;亦世凡华、 &#x1f6fa;系列专栏&#xff1a;uni-app &#x1f6b2;座右铭&#xff1a;人生亦可燃烧&#xff0c;亦可腐败&#xf…

Jenkins终极部署详细版

&#xff08;一&#xff09;首先你需要配置好虚拟机的JDK环境和Maven环境 1、配置JDK环境 &#xff08;1&#xff09;上传安装包&#xff0c;然后解压 &#xff08;2&#xff09;修改Linux环境变量 具体参考&#xff1a; https://blog.csdn.net/u010227042/article/details/1…

腾讯云轻量应用服务器可以修改镜像,但有限制!

腾讯云轻量应用服务器镜像可以更换或修改吗&#xff1f;可以&#xff01;镜像可以修改&#xff0c;镜像是指轻量服务器的预装操作系统&#xff0c;轻量服务器创建成功后镜像也是可以更换的&#xff0c;如下图&#xff1a; 腾讯云轻量应用服务器镜像可以修改 目录 轻量服务器修…

自适应模糊PID控制算法

一、自适应模糊PID控制 自适应模糊PID控制将模糊控制与传统PID控制相结合&#xff0c;将两种控制方式进行结合&#xff0c;取长补短&#xff0c;对传统的算法进行优化&#xff0c;形成一种新的控制算法&#xff0c;自适应模糊PID控制可以用于很多场景&#xff0c;比如温度控制&…

stm32下载代码到单片机上需要调节BOOT为什么模式

一、BOOT模式选择图解 二、BOOT模式介绍 所谓启动&#xff0c;一般来说就是指下好程序后&#xff0c;重启芯片时&#xff0c;SYSCLK的第4个上升沿&#xff0c;BOOT引脚的值将被锁存。用户可以通过设置BOOT1和BOOT0引脚的状态&#xff0c;来选择在复位后的启动模式。 A. Mai…

【读书笔记】《MySQL技术NM InnoDB存储引擎》第一章 MySQL体系结构和存储引擎

文章目录第一章 MySQL体系结构和存储引擎前言1.1 定义数据库和实例1.2MySQL体系结构1.3MySQL存储引擎1.3.1InnoDB存储引擎1.3.2MyISAM存储引擎1.3.3NDB存储引擎1.3.4 Memory存储引擎1.3.5其他存储引擎1.4各存储引擎之间的比较1.5连接MySQL1.5.1 TCP/IP1.5.2命名管道和共享内存1…

np.concatenate函数和np.append函数用于数组拼接

一&#xff1a;np.concatenate() 函数介绍&#xff1a;np.concatenate((a, b), axis0)参数意思&#xff1a;a和b都为数组&#xff0c;axis可以选择大小&#xff0c;axis0 按照行拼接。axis1 按照列拼接。 对于一维数组&#xff0c;情况如下&#xff1a; import numpy as np a…

客户关系管理系统的设计与实现(论文+源码)_kaic

摘 要 近些年来&#xff0c;由于信息科技的不断进步&#xff0c;网络也越来越深入到了各行各业中&#xff0c;信息量呈现的方式各种各样。我们所处的时代社会不管在经济体制、方式&#xff0c;或是在居民消费构成上都产生了巨大的变化&#xff0c;然而现代科技不仅仅为人们生…

fastadmin弹窗添加二级类别

在程序开发中,经常遇上有一、二级表格情况,例如ask和answer,一个ask中,就有很多个answer,如果在后台中分两个列表很容易实现,但很不直观,现通过代码,实现在ask列表中,每个item添加一个查看answer按钮,点击该按钮弹窗显示对应的answer列表,在该弹窗中实现增删改查操作…