Flink窗口分类简介及示例代码

news2025/1/23 15:04:01

水善利万物而不争,处众人之所恶,故几于道💦

文章目录

      • 1. 流式计算
      • 2. 窗口
      • 3. 窗口的分类
        • ◆ 基于时间的窗口(时间驱动)
          • 1) 滚动窗口(Tumbling Windows)
          • 2) 滑动窗口(Sliding Windows)
          • 3) 会话窗口(Session Windows)
        • ◆ 基于元素个数的(数据驱动)
          • 1) 滚动窗口(Tumbling Windows)
          • 2) 滑动窗口(Sliding Windows)

1. 流式计算

  Flink作为一个流式处理引擎,被设计用来处理无限数据集,理论上来说,无限数据集是一种不断产生,源源不断的数据集,说白了就是你不知道这个数据流它啥时候结束,这就是无限数据集。

  流式计算的思想是每来一个数据我就直接处理,而不用等,因此他非常适合在实时性要求比较高的场景下使用。

2. 窗口

  在流处理的场景下,如果我们想要统计过去某个时间段或过去多少条数据的指标时,就需要用到窗口,在Flink中,窗口(window)可以将流划分为有限块进行处理,Flink将这些有限的块抽象为“存储桶(bucket)”,我们可以在这些所谓的桶上做计算,也就实现了无限数据的有限计算。

  窗口(Window)是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中,再对每个“桶”加以处理。

  窗口的声明周期是:一个窗口在第一个属于它的元素到达时就会被创建,然后在时间(event 或 processing time) 超过窗口的“结束时间戳 + 用户定义的 allowed lateness (可容忍的迟到时间)”时 被完全删除。Flink 仅保证删除基于时间的窗口,其他类型的窗口不做保证, 比如全局窗口(Global Windows)。 例如,对于一个基于 event time 且范围互不重合(滚动)的窗口策略, 如果窗口设置的时长为五分钟、可容忍的迟到时间(allowed lateness)为 1 分钟, 那么第一个元素落入 12:00 至 12:05 这个区间时,Flink 就会为这个区间创建一个新的窗口。 当 watermark 越过 12:06 时,这个窗口将被摧毁。关于窗口的详细介绍查看->官方对于窗口的介绍

3. 窗口的分类

◆ 基于时间的窗口(时间驱动)

1) 滚动窗口(Tumbling Windows)

window(TumblingProcessingTimeWindows.of(Time.seconds(10))),参数是时间滚动窗口大小。10秒滚动一个窗口

  滚动窗口将元素分发到指定大小的窗口。滚动窗口的大小是固定的,且个窗口之间没有空隙,不会重叠。比如说,如果你指定了滚动窗口的大小为5分钟,那么每5分钟就会有一个窗口被计算,且一个新的窗口被创建。如下图所示:
在这里插入图片描述
示例代码:

public class Flink01_Window_Time {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .socketTextStream("hadoop101",9999)
                .map(line->{
                    String[] datas = line.split(",");
                    return new WaterSensor(
                            datas[0],
                            Long.valueOf(datas[1]),
                            Integer.valueOf(datas[2])
                    );
                })
                .keyBy(WaterSensor::getId)
                // 定义一个长度为10s的滚动窗口 每隔10s滚动一次
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .process(
                        // 泛型的含义是:输入元素的类型,输出元素的类型,key的类型,窗口类型
                        new ProcessWindowFunction<WaterSensor, Object, String, TimeWindow>() {
                            // 在窗口关闭的时候触发一次
                            @Override
                            public void process(String s, // key ,keyBy之后的key
                                                Context context,  //上下文对象:里面封装了一些信息,比如窗口开始时间,结束时间,定时服务器...
                                                Iterable<WaterSensor> elements, // 存储了这个窗口内所有的元素
                                                Collector<Object> out) throws Exception {
                                // 把Iterable中所有的元素取出并存入到 list 集合中
                                List<WaterSensor> list = AnqclnUtil.toList(elements);
                                // 获取窗口的相关信息,窗口开始时间和结束时间
                                String startTime = AnqclnUtil.toDateTime(context.window().getStart());
                                String endTime = AnqclnUtil.toDateTime(context.window().getEnd());

                                out.collect("窗口:"+startTime+" "+endTime+" ,key:"+s + " ,list:"+list);
                            }
                        }
                )
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

自定义的工具类:

public class AnqclnUtil {
    // 要先声明泛型
    public static <T>List<T> toList(Iterable<T> elements) {
        List<T> list = new ArrayList<>();

        for (T t : elements) {
            list.add(t);
        }
        return list;
    }
	// 将long类型的时间转换为时间字符串
    public static String toDateTime(long ts) {
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(ts);
    }
}

运行结果:

在这里插入图片描述

2) 滑动窗口(Sliding Windows)

window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5))),参数是时间滑动窗口大小和滑动距离。5秒滑动一个窗口,每个窗口最多放10个元素

  与滚动窗口类似,滑动窗口的 assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(window slide)参数来控制生成新窗口的频率。 因此,如果 slide 小于窗口大小,滑动窗口可以允许窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。

  比如说,你设置了大小为 10 分钟,滑动距离 5 分钟的窗口,你会在每 5 分钟得到一个新的窗口, 里面包含之前 10 分钟到达的数据(如下图所示)。

在这里插入图片描述

示例代码:

public class Flink01_Window_Time {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .socketTextStream("hadoop101",9999)
                .map(line->{
                    String[] datas = line.split(",");
                    return new WaterSensor(
                            datas[0],
                            Long.valueOf(datas[1]),
                            Integer.valueOf(datas[2])
                    );
                })
                .keyBy(WaterSensor::getId)
                // 定义一个长度为10s的滚动窗口 每隔10s滚动一次
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                // 定义一个滑动窗口:窗口长度是10s,滑动间隔5s   这种情况一个元素可能出现在多个窗口,因为有滑动
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
                .process(
                        // 泛型的含义是:输入元素的类型,输出元素的类型,key的类型,窗口类型
                        new ProcessWindowFunction<WaterSensor, Object, String, TimeWindow>() {
                            // 在窗口关闭的时候触发一次
                            @Override
                            public void process(String s, // key ,keyBy之后的key
                                                Context context,  //上下文对象:里面封装了一些信息,比如窗口开始时间,结束时间,定时服务器...
                                                Iterable<WaterSensor> elements, // 存储了这个窗口内所有的元素
                                                Collector<Object> out) throws Exception {
                                // 把Iterable中所有的元素取出并存入到 list 集合中
                                List<WaterSensor> list = AnqclnUtil.toList(elements);
                                // 获取窗口的相关信息,窗口开始时间和结束时间
                                String startTime = AnqclnUtil.toDateTime(context.window().getStart());
                                String endTime = AnqclnUtil.toDateTime(context.window().getEnd());

                                out.collect("窗口:"+startTime+" "+endTime+" ,key:"+s + " ,list:"+list);
                            }
                        }
                )
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:
在这里插入图片描述

3) 会话窗口(Session Windows)

window(ProcessingTimeSessionWindows.withGap(Time.seconds(4))),参数是会话间隔,也就是多久没有活跃就关闭当前会话。4秒不活跃就关闭窗口。

  会话窗口的 assigner 会把数据按活跃的会话分组。 与滚动窗口和滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭,即在一段不活跃的间隔之后。 会话窗口的 assigner 可以设置固定的会话间隔(session gap)或 用 session gap extractor 函数来动态地定义多长时间算作不活跃。 当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。

在这里插入图片描述

示例代码:

public class Flink01_Window_Time {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .socketTextStream("hadoop101",9999)
                .map(line->{
                    String[] datas = line.split(",");
                    return new WaterSensor(
                            datas[0],
                            Long.valueOf(datas[1]),
                            Integer.valueOf(datas[2])
                    );
                })
                .keyBy(WaterSensor::getId)
                // 定义一个长度为10s的滚动窗口 每隔10s滚动一次
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                // 定义一个滑动窗口:窗口长度是10s,滑动间隔5s   这种情况一个元素可能出现在多个窗口,因为有滑动
//                .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
                // 定义一个session窗口,时间间隔为4s  对于session窗口来说,不同的key出发时间不同,每个key都维护自己的session
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(4)))
                .process(
                        // 泛型的含义是:输入元素的类型,输出元素的类型,key的类型,窗口类型
                        new ProcessWindowFunction<WaterSensor, Object, String, TimeWindow>() {
                            // 在窗口关闭的时候触发一次
                            @Override
                            public void process(String s, // key ,keyBy之后的key
                                                Context context,  //上下文对象:里面封装了一些信息,比如窗口开始时间,结束时间,定时服务器...
                                                Iterable<WaterSensor> elements, // 存储了这个窗口内所有的元素
                                                Collector<Object> out) throws Exception {
                                // 把Iterable中所有的元素取出并存入到 list 集合中
                                List<WaterSensor> list = AnqclnUtil.toList(elements);
                                // 获取窗口的相关信息,窗口开始时间和结束时间
                                String startTime = AnqclnUtil.toDateTime(context.window().getStart());
                                String endTime = AnqclnUtil.toDateTime(context.window().getEnd());

                                out.collect("窗口:"+startTime+" "+endTime+" ,key:"+s + " ,list:"+list);
                            }
                        }
                )
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:
在这里插入图片描述

◆ 基于元素个数的(数据驱动)

1) 滚动窗口(Tumbling Windows)

countWindow(3),参数是个数滚动窗口大小。3个元素滚动一个窗口

  每来多少个元素就滚动一次

示例代码:

public class Flink02_Window_Count {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .socketTextStream("hadoop101",9999)
                .map(line -> {
                    String[] data = line.split(",");

                    return new WaterSensor(
                            data[0],
                            Long.valueOf(data[1]),
                            Integer.valueOf(data[2])
                    );
                })
                .keyBy(WaterSensor::getId)
                // 定义长度为3的基于个数的滚动窗口 因为是keyBy之后的,所以key一样的不到三条不会触发
                .countWindow(3)
                .process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {
                    @Override
                    public void process(String s,
                                        Context context,
                                        Iterable<WaterSensor> elements,
                                        Collector<String> out) throws Exception {
                        List<WaterSensor> list = AnqclnUtil.toList(elements);

                        out.collect(" key: "+s+" "+list);
                    }
                })
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:
在这里插入图片描述

2) 滑动窗口(Sliding Windows)

countWindow(3,2),参数是个数滑动窗口大小和滑动步长。每两个元素产生一个新的窗口,每个窗口最多放3个元素。

  就比滚动的多了个参数,滑动步长。步长是生成新窗口的条件,而窗口大小是指这个窗口最多能放多少个元素

示例代码:

public class Flink02_Window_Count {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .socketTextStream("hadoop101",9999)
                .map(line -> {
                    String[] data = line.split(",");

                    return new WaterSensor(
                            data[0],
                            Long.valueOf(data[1]),
                            Integer.valueOf(data[2])
                    );
                })
                .keyBy(WaterSensor::getId)
                // 定义长度为3的基于个数的滚动窗口 因为是keyBy之后的,所以key一样的不到三条不会触发
//                .countWindow(3)
                // 定义一个长度为3(窗口内元素的最大个数)  每来两个2个元素滑动一次,
                .countWindow(3,2)
                .process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {
                    @Override
                    public void process(String s,
                                        Context context,
                                        Iterable<WaterSensor> elements,
                                        Collector<String> out) throws Exception {
                        List<WaterSensor> list = AnqclnUtil.toList(elements);

                        out.collect(" key: "+s+" "+list);
                    }
                })
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/856432.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

发现 Kubernetes 集群受到攻击

Aqua Security 的研究团队 Aqua Nautilus 发现数百个组织的 Kubernetes 集群受到攻击。 这位云原生安全专家宣布&#xff0c;一项为期三个月的调查显示&#xff0c;属于 350 多个组织、开源项目和个人的 Kubernetes 集群可公开访问且不受保护。 一个值得注意的集群子集与大型…

交流有效值,峰值和平均值关系

1&#xff0c;交流有效值&#xff0c;峰值和平均值关系&#xff1a; 2&#xff0c;根据负载&#xff0c;确定变压器满载时的输出电压&#xff1a; 1&#xff09;&#xff0c;为了使稳压芯片MIC29302输出4V&#xff0c;LDO压差 0.4V&#xff0c;整流桥压降为1V&#xff0c; 则…

新品发布会上出现国风数字人?写实数字人定制技术助推品牌引领年轻消费市场潮流

在小牧优品新品发布会上推出首位国风数字人潇沐&#xff0c;这是为了让年轻化、时尚化品牌特质更加呈现出来&#xff0c;聚焦年轻消费市场的一大战略。品牌结合虚拟形象3d建模技术&#xff0c;打造出符合品牌专属数字人&#xff0c;并且结合了动作捕捉技术打破行业交流壁垒&…

深度优先搜索与动态规划|543, 124, 687

深度优先搜索与动态规划|543. 二叉树的直径&#xff0c;124. 二叉树中的最大路径和&#xff0c;687. 最长同值路径 二叉树的直径二叉树中的最大路径和最长同值路径 二叉树的直径 好久没写二叉树了&#xff0c;主要还是看遍历的顺序是什么样的。 # Definition for a binary tr…

模拟出栈的所有顺序(dfs+回溯)

题目&#xff1a; 已知某一个字母序列&#xff0c;把序列中的字母按出现顺序压入一个栈&#xff0c;在入栈的任意过程中&#xff0c;允许栈中的字母出栈&#xff0c;求所有可能的出栈顺序 示例&#xff1a; 输入abc 输出abc、acb、bac、bca、cba 代码如下 #define _CRT_SECURE…

人工智能术语翻译(六)

文章目录 摘要UVWXYZ 摘要 人工智能术语翻译第六部分&#xff0c;包括U、V、W、X、Y、Z开头的词汇&#xff01; U 英文术语中文翻译常用缩写备注Ugly Duckling Theorem丑小鸭定理Unbiased无偏Unbiased Estimate无偏估计Unbiased Sample Variance无偏样本方差Unconstrained …

微服务与Nacos概述-2

微服务间消息传递 微服务是一种软件开发架构&#xff0c;它将一个大型应用程序拆分为一系列小型、独立的服务。每个服务都可以独立开发、部署和扩展&#xff0c;并通过轻量级的通信机制进行交互。 应用开发 common模块中包含服务提供者和服务消费者共享的内容 provider模块是…

【数学】CF1796 C

Problem - 1796C - Codeforces 题意&#xff1a; 思路&#xff1a; 模拟一下样例可以发现一些规律 Code&#xff1a; #include <bits/stdc.h>#define int long longusing i64 long long;constexpr int N 1e6 10; constexpr int mod 998244353;void solve() {int l…

搭建 Java 部署环境

yum 认识 yum yum(Yellow dog Updater, Modified)是Linux下非常常用的一种包管理器. 主要应用在Fedora, RedHat, Centos等发行版上. 包管理器就好比 "应用商店", 我们可以在应用商店上下载一些 app. yum 起到的功能和 Maven 的依赖管理功能类似. 使用 Maven 能帮…

kubernetes pod 资源限制 探针

资源限制 当定义 Pod 时可以选择性地为每个容器设定所需要的资源数量。 最常见的可设定资源是 CPU 和内存大小&#xff0c;以及其他类型的资源。 当为 Pod 中的容器指定了 request 资源时&#xff0c;代表容器运行所需的最小资源量&#xff0c;调度器就使用该信息来决定将 Pod …

JS逆向系列之猿人学爬虫第8题-验证码-图文点选

题目地址 https://match.yuanrenxue.cn/match/8本题的难点就在于验证码的识别,没啥js加密,只要识别对了携带坐标就给返回数据 回过头来看验证码 这里复杂的字体比较多,人看起来都有点费劲(感觉可能对红绿色盲朋友不太又好)&#x

为什么说Java是值传递?

值传递、引用传递 首要我们需要明确什么是值传递、什么是引用传递。 值传递&#xff1a;形参接收的是实参的拷贝&#xff08;副本&#xff09;。因此对形参的修改&#xff0c;不一定会影响实参。引用传递&#xff1a;形参接收的是实参本身&#xff0c;不会创建副本。因此对形…

flutter 手写日历组件

先看效果 直接上代码 calendar_popup_view.dart import package:flutter/material.dart; import package:intl/intl.dart;import custom_calendar.dart; import hotel_app_theme.dart;class CalendarPopupView extends StatefulWidget {const CalendarPopupView({required th…

翻出了我当时学习的笔记来了html

php&#xff1a;高级语言 web应用程序 万维网 浏览器中查看 apache&#xff1a;服务器 mysql&#xff1a;数据库 html 标签 css&#xff1a;层叠样式表 javascript&#xff1a;客户端脚本 js jquery mysql数据库基础 php语法基础 面向对象&#xff08;物件&#xff09; smar…

WebRTC | 实现数据流的一对一通信

目录 一、浏览器对WebRTC的支持 二、MediaStream与MediaStreamTrack 三、RTCPeerConnection 1. RTCPeerConnection与本地音视频数据绑定 2. 媒体协商SDP 3. ICE &#xff08;1&#xff09;Candidate信息 &#xff08;2&#xff09;WebRTC收集Candidate &#xff08;3&…

LOTO示波器实测过压保护芯片LP5300工作效果

过压保护电路是电子产品设置中经常要用到的&#xff0c;以前都是用分立元件搭的各种经典电路&#xff0c;最近LOTO虚拟示波器客户推荐了一款很便宜的集成的过压保护芯片LP5300&#xff0c;体积很小&#xff0c;使用简单&#xff0c;外接两个电容就可以了&#xff0c;下图是它的…

linux自定义网络访问规则

1.更改防火墙默认区域为trusted firewall-cmd --set-default-zonetrusted 2.新建一个zone&#xff0c;将想要访问本机80端口的ip&#xff0c;如&#xff1a;192.168.3.99 &#xff0c;添加的这个zone中&#xff0c;同时在这个zone中放行80端口。 firewall-cmd --permanent --ne…

django中使用bootstrap-datepicker时间插件

1、插件的下载 Bootstrap Datepicker是一款基 于Bootstrap框架的日期选择控件&#xff0c;可以方便地在Web应用中添加可交互的日期选择功能。Bootstrap Datepicker拥有丰富的选项和API,支持多种日期格式&#xff0c;可以自定义样式并支持各种语言。 Bootstrap Datepicker 依赖…

DolphinScheduler集群搭建详细笔记

1.DolphinScheduler Cluster部署 1.1 集群部署规划 集群模式下&#xff0c;可配置多个Master及多个Worker。通常可配置2~3个Master&#xff0c;若干个Worker。由于集群资源有限&#xff0c;此处配置一个Master&#xff0c;三个Worker&#xff0c;集群规划如下。 主机名ip服务…

Hybrid App 技术发展的趋势解读

Hybrid这个词&#xff0c;在App开发领域&#xff0c;相信大家都不陌生。Hybrid App是指介于web-app、native-app这两者之间的app&#xff0c;它虽然看上去是一个Native App&#xff0c;但只有一个UI WebView&#xff0c;里面访问的是一个Web App。Hybrid在移动领域的发展&#…