大数据-玩转数据-Flink窗口

news2025/1/12 0:51:36

一、Flink 窗口 理解

在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。

流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而Window窗口是一种切割无限数据为有限块进行处理的手段。

在Flink中, 窗口(window)是处理无界流的核心. 窗口把流切割成有限大小的多个"存储桶"(bucket), 我们在这些桶上进行计算.

时间窗口
时间窗口包含一个开始时间戳(包括)和结束时间戳(不包括), 这两个时间戳一起限制了窗口的尺寸。在代码中, Flink使用TimeWindow这个类来表示基于时间的窗口. 这个类提供了key查询开始时间戳和结束时间戳的方法, 还提供了针对给定的窗口获取它允许的最大时间戳的方法(maxTimestamp()),时间窗口又分3种:滚动窗口、滑动窗口、会话窗口。

二、数据准备

准备一个WaterSensor类方便演示

package com.lyh.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {
    private String id;
    private Long ts;
    private Integer vc;
}

三、时间滚动窗口

滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙.比如,如果指定一个长度为5分钟的滚动窗口, 当前窗口开始计算, 每5分钟启动一个新的窗口。滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。

滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙.比如,如果指定一个长度为5分钟的滚动窗口, 当前窗口开始计算, 每5分钟启动一个新的窗口.
滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口
1.时间间隔可以通过: Time.milliseconds(x), Time.seconds(x), Time.minutes(x),等等来指定,2.我们传递给window函数的对象叫窗口分配器.

时间滚动窗口代码

package com.lyh.flink07;

import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class Window_s {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.socketTextStream("hadoop100",9999)
                .map(line -> {
                    String[] data = line.split(",");
                    return new WaterSensor(
                            data[0],
                            Long.valueOf(data[1]),
                            Integer.valueOf(data[2])
                    );
                })
                .keyBy(WaterSensor::getId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<WaterSensor, String,String, TimeWindow>() {
                    @Override
                    public void process(String key,
                                        Context ctx,
                                        Iterable<WaterSensor> elements,
                                        Collector<String> out) throws Exception {
                    List<WaterSensor> list  = toList(elements);
                        long starttime = ctx.window().getStart();
                        long endtime = ctx.window().getEnd();

                        out.collect("窗口:" + starttime + "  " + endtime + "  " + "key:" + key + "  " + "list:" + list);

                    }
                }).print();
        env.execute();
    }

    private static <T>List<T> toList(Iterable<T> it) {
        List<T>  list = new ArrayList<>();
        for (T t : it) {
            list.add(t);
            
        }
        return list;
    }
}

运行结果
在hadoop100 服务器
输入nc -lk 999 启动socket
在这里插入图片描述

消费结果:
在这里插入图片描述

四、时间滑动窗口

与滚动窗口一样, 滑动窗口也是有固定的长度. 另外一个参数我们叫滑动步长, 用来控制滑动窗口启动的频率.
所以, 如果滑动步长小于窗口长度, 滑动窗口会重叠. 这种情况下, 一个元素可能会被分配到多个窗口中
例如, 滑动窗口长度10分钟, 滑动步长5分钟, 则, 每5分钟会得到一个包含最近10分钟的数据。

时间滑动窗口代码

package com.lyh.flink07;

import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class Window_s {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.socketTextStream("hadoop100",9999)
                .map(line -> {
                    String[] data = line.split(",");
                    return new WaterSensor(
                            data[0],
                            Long.valueOf(data[1]),
                            Integer.valueOf(data[2])
                    );
                })
                .keyBy(WaterSensor::getId)
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2)))
                .process(new ProcessWindowFunction<WaterSensor, String,String, TimeWindow>() {
                    @Override
                    public void process(String key,
                                        Context ctx,
                                        Iterable<WaterSensor> elements,
                                        Collector<String> out) throws Exception {
                    List<WaterSensor> list  = toList(elements);
                        long starttime = ctx.window().getStart();
                        long endtime = ctx.window().getEnd();

                        out.collect("窗口:" + starttime + "  " + endtime + "  " + "key:" + key + "  " + "list:" + list);

                    }
                }).print();
        env.execute();
    }

    private static <T>List<T> toList(Iterable<T> it) {
        List<T>  list = new ArrayList<>();
        for (T t : it) {
            list.add(t);
            
        }
        return list;
    }
}

执行结果
在hadoop100 服务器
输入nc -lk 999 启动socket
在这里插入图片描述
消费结果
在这里插入图片描述

五、时间会话窗口

会话窗口分配器会根据活动的元素进行分组. 会话窗口不会有重叠, 与滚动窗口和滑动窗口相比, 会话窗口也没有固定的开启和关闭时间.
如果会话窗口有一段时间没有收到数据, 会话窗口会自动关闭, 这段没有收到数据的时间就是会话窗口的gap(间隔)
我们可以配置静态的gap, 也可以通过一个gap extractor 函数来定义gap的长度. 当时间超过了这个gap, 当前的会话窗口就会关闭, 后序的元素会被分配到一个新的会话窗口。

时间会话窗口代码

package com.lyh.flink07;

import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class Window_s {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.socketTextStream("hadoop100",9999)
                .map(line -> {
                    String[] data = line.split(",");
                    return new WaterSensor(
                            data[0],
                            Long.valueOf(data[1]),
                            Integer.valueOf(data[2])
                    );
                })
                .keyBy(WaterSensor::getId)
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
//                .window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2)))
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(3)))
                .process(new ProcessWindowFunction<WaterSensor, String,String, TimeWindow>() {
                    @Override
                    public void process(String key,
                                        Context ctx,
                                        Iterable<WaterSensor> elements,
                                        Collector<String> out) throws Exception {
                    List<WaterSensor> list  = toList(elements);
                        long starttime = ctx.window().getStart();
                        long endtime = ctx.window().getEnd();

                        out.collect("窗口:" + starttime + "  " + endtime + "  " + "key:" + key + "  " + "list:" + list);

                    }
                }).print();
        env.execute();
    }

    private static <T>List<T> toList(Iterable<T> it) {
        List<T>  list = new ArrayList<>();
        for (T t : it) {
            list.add(t);
            
        }
        return list;
    }
}

运行结果

在hadoop100 服务器
输入nc -lk 999 启动socket
在这里插入图片描述
消费结果
在这里插入图片描述
因为会话窗口没有固定的开启和关闭时间, 所以会话窗口的创建和关闭与滚动,滑动窗口不同. 在Flink内部, 每到达一个新的元素都会创建一个新的会话窗口, 如果这些窗口彼此相距比较定义的gap小, 则会对他们进行合并. 为了能够合并, 会话窗口算子需要合并触发器和合并窗口函数: ReduceFunction, AggregateFunction, or ProcessWindowFunction 。

六、基于元素个数的滚动窗口

默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。
实例代码
.countWindow(3)
说明:哪个窗口先达到3个元素, 哪个窗口就关闭. 不影响其他的窗口.

基于元素个数的滚动窗口代码

package com.lyh.flink07;

import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class Window_s_n {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.socketTextStream("hadoop100",9999)
                .map(line -> {
                    String[] data = line.split(",");
                    return new WaterSensor(
                            data[0],
                            Long.valueOf(data[1]),
                            Integer.valueOf(data[2])
                    );
                })
                .keyBy(WaterSensor::getId)
                .countWindow(2)
                .process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {
                    @Override
                    public void process(String key,
                                        Context ctx,
                                        Iterable<WaterSensor> elements,
                                        Collector<String> out) throws Exception {
                        List<WaterSensor> list  = toList(elements);
                        out.collect("窗口:" + "key:" + key + "  " + "list:" + list);
                    }
                }).print();

        env.execute();
    }

    private static <T>List<T> toList(Iterable<T> it) {
        List<T>  list = new ArrayList<>();
        for (T t : it) {
            list.add(t);

        }
        return list;
    }
}

运行结果
在这里插入图片描述
在这里插入图片描述

七、基于元素个数的滑动窗口

滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。下面代码中的sliding_size设置为了2,也就是说,每收到两个相同key的数据就计算一次,每一次计算的window范围最多是3个元素。
实例代码
.countWindow(3, 2)

package com.lyh.flink07;

import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class Window_s_n {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.socketTextStream("hadoop100",9999)
                .map(line -> {
                    String[] data = line.split(",");
                    return new WaterSensor(
                            data[0],
                            Long.valueOf(data[1]),
                            Integer.valueOf(data[2])
                    );
                })
                .keyBy(WaterSensor::getId)
//                .countWindow(2)
                .countWindow(3,2)
                .process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {
                    @Override
                    public void process(String key,
                                        Context ctx,
                                        Iterable<WaterSensor> elements,
                                        Collector<String> out) throws Exception {
                        List<WaterSensor> list  = toList(elements);
                        out.collect("窗口:" + "key:" + key + "  " + "list:" + list);
                    }
                }).print();

        env.execute();
    }

    private static <T>List<T> toList(Iterable<T> it) {
        List<T>  list = new ArrayList<>();
        for (T t : it) {
            list.add(t);

        }
        return list;
    }
}

运行结果
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/938699.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

医疗器械行业的MES系统解决方案

医疗器械行业的MES系统&#xff08;制造执行系统&#xff09;解决方案是为医疗器械制造企业提供的一种集成化的信息技术系统&#xff0c;用于管理和监控制造过程&#xff0c;提高生产效率&#xff0c;确保产品质量&#xff0c;以及优化整个生产流程。MES系统通常涵盖了生产计划…

No118.精选前端面试题,享受每天的挑战和学习

文章目录 为什么说HTTP是无状态的协议&#xff1f;HTTP 报文结构是怎样的&#xff1f;HTTP1.1 中如何解决 HTTP 的队头阻塞问题&#xff1f;HTTP 中如何处理表单数据的提交&#xff1f;说下application/x-www-form-urlencoded 和 multipart/form-data对于定长和不定长的数据&am…

影视公司技术流程设计之Pipeline数据管理

使用场景 1 设计师画好设计稿后怎样让导演审查&#xff1f; 叫到位子上看&#xff1f; 放个文件夹会议室统一看&#xff1f;意见怎么记录&#xff0c;怎么反馈&#xff1f; 2 材质&#xff0c;绑定文件怎么与模型同步&#xff0c; 很多时间是绑定不小心改了&#xff0c; 去了灯…

WinRAR<6.23 远程代码执行漏洞【Poc公开】(CVE-2023-38831) [有POC]

漏洞类型代码注入发现时间2023-08-25漏洞等级高危MPS编号MPS-bw2s-d0rvCVE编号CVE-2023-38831漏洞影响广度广 漏洞危害 OSCS 描述WinRAR 是一款适用于 Windows 系统的压缩包管理器。 WinRAR 6.2.3之前版本打开压缩文件时会调用 ShellExecute 函数匹配文件名&#xff0c;如果目…

软件设计师(十一)标准化和软件知识产权基本知识

一、标准化基础知识 标准(Standard)是对重复性事物和概念所做的统一规定 标准化工作的特征包括横向综合性、政策性和统一性 1、基本概念 &#xff08;1&#xff09;标准的分类 国际标准&#xff1a;ISO、IEC等国际标准化组织国家标准&#xff1a;GB一中国、ANSI一美国、BS一…

AIGC领航,智能AI赋能乡村教育,梦想扬帆远航

一位扎根深山支教十年的湖北教师袁辉的故事曾经感受无数人&#xff0c;新华社、人民日报都撰文为他点赞。在他带过的学生中&#xff0c;有一位名叫青青的女学生患有成骨不全症&#xff0c;俗称“瓷娃娃”&#xff0c;学校离家十几公里山路&#xff0c;上学对她来说&#xff0c;…

Java注解与反射

&#x1f497;wei_shuo的个人主页 &#x1f4ab;wei_shuo的学习社区 &#x1f310;Hello World &#xff01; Java注解与反射 Java注解和反射是Java语言中两个强大的特性&#xff0c;它们可以一起使用以实现动态的、灵活的编程和元数据处理 注解 Java注解&#xff08;Annotatio…

振弦采集读数模块开发原理详细介绍

飞讯教学篇&#xff1a;振弦采集读数模块开发原理详细介绍 振弦采集读数模块是一种用于采集弦振信息的模块&#xff0c;其原理是通过传感器感知弦的振动&#xff0c;将其转化为电信号&#xff0c;然后经过模拟处理和数字化处理&#xff0c;最终输出为可供后续处理的数字信号。…

Python照片压缩教程:如何轻松减小图片大小

介绍 在日常的编程工作中&#xff0c;我们经常需要处理图像&#xff0c;例如上传、下载、显示、编辑等。有时候&#xff0c;我们需要对图像进行压缩&#xff0c;以减少占用的空间和带宽&#xff0c;提高加载速度和用户体验。那么&#xff0c;如何用Python来实现图像压缩呢&…

【数据分析】统计量

1. 均值、众数描述数据的集中趋势度量&#xff0c;四分位差、极差描述数据的离散程度。 2. 标准差、四分位差、异众比率度量离散程度&#xff0c;协方差是度量相关性。 期望值分别为E[X]与E[Y]的两个实随机变量X与Y之间的协方差Cov(X,Y)定义为&#xff1a; 从直观上来看&…

开源vue动态表单组件

一、项目简介 vueelement的动态表单组件&#xff0c;拖拽组件到面板即可实现一个表单 二、实现功能 支持拖拽 支持输入框 支持文本框 支持数字输入框 支持下拉选择器 支持多选框 支持日期控件 支持开关 支持动态表格 支持上传图片 支持上传文件 支持标签 支持ht…

无涯教程-Python机器学习 - Semi-supervised Learning函数

Python机器学习 中的 Semi - 无涯教程网无涯教程网提供https://www.learnfk.com/python-machine-learning/machine-learning-with-python-semi-supervised-learning.html

图像翻拍检测——反射分量分离的特征融合

随着计算机技术的迅速发展&#xff0c;需要建立人与信息一一对应的安保认证技术&#xff0c;通过建立完整的映射网络体系&#xff0c;从而确保每个人的人身、财产、隐私等的安全.与指纹、基因等人体生物特征识别系统相比&#xff0c;人脸识别系统更加友好&#xff0c;不需要人的…

【C语言】位操作符的一些题目与技巧

初学者在学完位操作符之后&#xff0c;总是不能很好的掌握&#xff0c;因此这篇文章旨在巩固对位操作符的理解与使用。 有的题目可能会比较难以接受&#xff0c;但是看完一定会有收获 目录 位操作符&#xff1a;一些题目&#xff1a;不创建临时变量交换整数整数转换二进制中1的…

【校招VIP】产品设计分析之思维整体性

考点介绍&#xff1a; 对于产品分析设计时需要全面的分析用户需求&#xff0c;而产品思维方式的核心是“以问题为核心”&#xff0c;即先多花时间搞清楚要解决的问题究竟是什么&#xff0c;要深入、全面的思考。 『产品设计分析之思维整体性』相关题目及解析内容可点击文章末尾…

揭秘偏向锁的升级

今天开始&#xff0c;我会和大家一起深入学习synchronized的原理&#xff0c;原理部分会涉及到两篇&#xff1a; 偏向锁升级到轻量级锁的过程轻量级锁升级到重量级锁的过程 今天我们先来学习偏向锁升级到轻量级锁的过程。因为涉及到大量HotSpot源码&#xff0c;会有单独的一篇…

从钉钉到金蝶云星空通过接口配置打通数据

从钉钉到金蝶云星空通过接口配置打通数据 对接系统钉钉 钉钉&#xff08;DingTalk&#xff09;是阿里巴巴集团打造的企业级智能移动办公平台&#xff0c;是数字经济时代的企业组织协同办公和应用开发平台。钉钉将IM即时沟通、钉钉文档、钉闪会、钉盘、Teambition、OA审批、智能…

python | 将pdf文件转换为图片,这一招就够了

一、背景 部分情况下&#xff0c;需要将 PDF 页面转换为图片&#xff0c;例如 PNG 或 JPEG 格式。 python 的开源库 pdfplumber&#xff0c;提供了将 pdf 文件转换为图片的方法。 如果之前还没有安装和使用过pdfplumber库&#xff0c;pdfplumber的安装及基础使用&#xff0c;可…

【React学习】—SetState的使用(九)

【React学习】—SetState的使用&#xff08;九&#xff09; state的简写方式 state属性总结

PCD格式点云文件结构及在线查看工具

本文档描述了 PCD&#xff08;点云数据&#xff09;文件格式及其在点云库&#xff08;PCL&#xff09;中的使用方式。可以使用NSDT 3DConvert 在线预览查看PCD格式的点云数据文件。 推荐&#xff1a;用 NSDT编辑器 快速搭建可编程3D场景 1、为何定义新的点云数据文件格式&#…