flink学习(7)——window

news2024/12/1 10:26:16

 概述

窗口的长度(大小): 决定了要计算最近多长时间的数据

窗口的间隔: 决定了每隔多久计算一次

举例:每隔10min,计算最近24h的热搜词,24小时是长度,每隔10分钟是间隔。

窗口的分类

1、根据window前是否调用keyBy分为键控窗口和非键控窗口

2、根据window中参数的配置分为基于时间的,基于条数的,会话窗口

SlidingProcessingTimeWindows —— 滑动窗口,按照处理时间

TumblingProcessingTimeWindows —— 滚动窗口,按照处理时间

ProcessingTimeSessionWindows —— 会话窗口

 Keyed Window --键控窗口

// Keyed Window
stream
        .keyBy(...)              <-  按照一个Key进行分组
        .window(...)            <-  将数据流中的元素分配到相应的窗口中
        [.trigger(...)]            <-  指定触发器Trigger(可选)
        [.evictor(...)]            <-  指定清除器Evictor(可选)
        .reduce/aggregate/process/apply()      <-  窗口处理函数Window Function

Non-Keyed Window

// Non-Keyed Window
stream
        .windowAll(...)         <-  不分组,将数据流中的所有元素分配到相应的窗口中
        [.trigger(...)]            <-  指定触发器Trigger(可选)
        [.evictor(...)]            <-  指定清除器Evictor(可选)
        .reduce/aggregate/process()      <-  窗口处理函数Window Function

方括号([…]) 中的命令是可选的。

首先:我们要决定是否对一个DataStream按照Key进行分组,这一步必须在窗口计算之前进行。

经过keyBy的数据流将形成多组数据,下游算子的多个实例可以并行计算。

windowAll不对数据流进行分组,所有数据将发送到下游算子单个实例上。

决定是否分组之后,窗口的后续操作基本相同。

经过windowAll的算子是不分组的窗口(Non-Keyed Window),它们的原理和操作与Keyed Window类似,唯一的区别在于所有数据将发送给下游的单个实例,或者说下游算子的并行度为1

Flink窗口的骨架结构中有两个必须的两个操作:

  • 使用窗口分配器(WindowAssigner)将数据流中的元素分配到对应的窗口。

  • 当满足窗口触发条件后,对窗口内的数据使用窗口处理函数(Window Function)进行处理,常用的Window Function有reduce、aggregate、process。

其他的trigger、evictor则是窗口的触发和销毁过程中的附加选项,主要面向需要更多自定义的高级编程者,如果不设置则会使用默认的配置。

 

 

基于时间的窗口 

滚动窗口- TumblingWindow概念

package com.bigdata.day04;

public class _01_windows {
    /**
     * 1、实时统计每个红绿灯通过的汽车数量
     * 2、实时统计每个红绿灯每个1分钟,统计最近1分钟通过的汽车数量 ——滚动
     * 3、实时统计每个红绿灯每个1分钟,统计最近2分钟通过的汽车数量 ——滑动
     */

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source-加载数据
        DataStreamSource<String> socketStream = env.socketTextStream("localhost", 8889);
        //3. transformation-数据处理转换
        socketStream.map(new MapFunction<String, Tuple2<Integer,Integer>>() {
            @Override
            public Tuple2<Integer,Integer> map(String line) throws Exception {
                String[] words = line.split(" ");
                return new Tuple2<>(Integer.parseInt(words[0]),Integer.parseInt(words[1]));
            }
        }).keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
            @Override
            public Integer getKey(Tuple2<Integer, Integer> value) throws Exception {
                return value.f0;
            }
        })
        // 基于这个部分实现 滚动窗口 每一分钟 统计前一分钟的数据
        .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
        .sum(1).print();

        env.execute();
    }
}

滑动窗口– SlidingWindow概念 

package com.bigdata.day04;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;


/**
 * @基本功能:
 * @program:flinkProject
 * @author: jinnian
 * @create:2024-11-25 10:13:46
 **/
public class _01_windows {
    /**
     * 1、实时统计每个红绿灯通过的汽车数量
     * 2、实时统计每个红绿灯每个1分钟,统计最近1分钟通过的汽车数量 ——滚动
     * 3、实时统计每个红绿灯每个1分钟,统计最近2分钟通过的汽车数量 ——滑动
     */

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source-加载数据
        DataStreamSource<String> socketStream = env.socketTextStream("localhost", 8889);
        //3. transformation-数据处理转换
        socketStream.map(new MapFunction<String, Tuple2<Integer,Integer>>() {
            @Override
            public Tuple2<Integer,Integer> map(String line) throws Exception {
                String[] words = line.split(" ");
                return new Tuple2<>(Integer.parseInt(words[0]),Integer.parseInt(words[1]));
            }
        }).keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
            @Override
            public Integer getKey(Tuple2<Integer, Integer> value) throws Exception {
                return value.f0;
            }
        })
        // 基于这一部分实现,每30秒统计前一分钟的数据,大的在前,小的在后
         .window(SlidingProcessingTimeWindows.of(Time.minutes(1),Time.seconds(30)))
        .sum(1).print();



        //5. execute-执行
        env.execute();
    }
}
如何显示窗口时间——apply

——apply将reduce替代

kafka生产数据
package com.bigdata.day04;

public class _02_kafka生产数据 {
    public static void main(String[] args) throws InterruptedException {
        // Properties 它是map的一种
        Properties properties = new Properties();
        // 设置连接kafka集群的ip和端口
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        String[] arr = {"联通换猫","遥遥领先","恒大歌舞团","恒大足球队","郑州烂尾楼"};
        Random random = new Random();
        for (int i = 0; i < 5000; i++) {
            int index = random.nextInt(arr.length);
            ProducerRecord<String, String> record = new ProducerRecord<>("edu", arr[index]);
            producer.send(record);
            Thread.sleep(30);
        }
    }
}
flink消费数据
package com.bigdata.day04;

public class _02_flink消费数据 {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata01:9092");
        properties.setProperty("group.id", "gw2");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>("edu",new SimpleStringSchema(),properties);
        DataStreamSource<String> source = env.addSource(consumer);
        source.map(new MapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String,Integer> map(String value) throws Exception {
                return Tuple2.of(value,1);
            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }

        }).window(SlidingProcessingTimeWindows.of(Time.minutes(1),Time.seconds(30)))
                //.window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
         /*
         *
         *
         */
                .apply(new WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
            @Override
            public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
                StringBuilder sb = new StringBuilder();
                long start = window.getStart();
                long end = window.getEnd();
                String startStr = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss");
                String endStr = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss");
                int sum = 0;
                for (Tuple2<String, Integer> tuple2 : input) {
                    sum +=tuple2.f1;
                }
                sb.append("开始时间:"+startStr+",").append("结束时间:"+endStr+",").append("key: "+key+ ",").append("数量:"+sum);

                out.collect(sb.toString());

            }
        }).print();

        env.execute();
    }
}

基于条数的窗口——countWindow

package com.bigdata.day04;

public class _04_agg函数 {
    public static final Tuple3[] ENGLISH = new Tuple3[] {
            Tuple3.of("class1", "张三", 100L),
            Tuple3.of("class1", "李四", 40L),
            Tuple3.of("class1", "王五", 60L),
            Tuple3.of("class2", "赵六", 20L),
            Tuple3.of("class2", "小七", 30L),
            Tuple3.of("class2", "小八", 50L),
    };

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //2. source-加载数据
        DataStreamSource<Tuple3<String,String,Long>> dataStreamSource = env.fromElements(ENGLISH);
        // 此时我要获取每个班级的平均成绩
        // 输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)
        // IN——Tuple3<String, String, Long>
        // ACC——Tuple3<String, Integer,Long> 第一个是班级(key)第二个是数量,第三个是总的成绩
        // OUT —— Tuple2<String,Double> 第一个是班级 第二个是平均成绩
        dataStreamSource.countWindowAll(3).aggregate(new AggregateFunction<Tuple3<String, String, Long>, Tuple3<String, Integer,Long>, Tuple2<String,Double>>() {
        
            // 初始化一个 累加器
            @Override
            public Tuple3<String, Integer, Long> createAccumulator() {
                return Tuple3.of(null,0,0L);
            }


            // 累加器和输入的值进行累加
            // Tuple3<String, String, Long> value 第一个是传入的值
            // Tuple3<String, Integer, Long> accumulator 第二个是累加器的值
            @Override
            public Tuple3<String, Integer, Long> add(Tuple3<String, String, Long> value, Tuple3<String, Integer, Long> accumulator) {

                return Tuple3.of(value.f0,accumulator.f1+1,accumulator.f2+value.f2);
            }

            // 获取结果——在不同节点的结果进行汇总后实现
            @Override
            public Tuple2<String, Double> getResult(Tuple3<String, Integer, Long> accumulator) {

                return Tuple2.of(accumulator.f0, (double) accumulator.f2 / accumulator.f1);
            }


            // 由于flink是分布式,所以在别的节点也会进行累加 ,该方法是不同节点的结果进行汇总
            // 即累加器之间的累加
            @Override
            public Tuple3<String, Integer, Long> merge(Tuple3<String, Integer, Long> a, Tuple3<String, Integer, Long> b) {
                return Tuple3.of(a.f0,a.f1+b.f1,a.f2+b.f2);
            }
        }).print();
        //4. sink-数据输出


        //5. execute-执行
        env.execute();
    }
}

会话窗口

package com.bigdata.day04;

public class _03_会话窗口 {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);
        DataStreamSource<String> source = env.socketTextStream("localhost", 8889);

        source.map(new MapFunction<String, Tuple2<String,Integer>>() {

            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] s = value.split(" ");

                return Tuple2.of(s[0],Integer.valueOf(s[1]));
            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
            // 1、主要就是 ProcessingTimeSessionWindows 参数的使用
            // 2、使用 EventTimeSessionWindows的时候,若没有水印就不会有结果
        }).window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                return Tuple2.of(value1.f0,value1.f1+value2.f1);
            }
        }).print();

        env.execute();

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2251049.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring Boot 的 WebClient 实践教程

什么是 WebClient&#xff1f; 在 Spring Boot 中&#xff0c;WebClient 是 Spring WebFlux 提供的一个非阻塞、响应式的 HTTP 客户端&#xff0c;用于与 RESTful 服务或其他 HTTP 服务交互。相比于传统的 RestTemplate&#xff0c;WebClient 更加现代化&#xff0c;具有异步和…

二叉搜索树讲解

二叉搜索树概念和定义 二叉搜索树是一个二叉树&#xff0c;其中每个节点的值都满足以下条件&#xff1a; 节点的左子树只包含小于当前节点值的节点。节点的右子树只包含大于当前节点值的节点。左右子树也必须是二叉搜索树。 二叉树搜索树性质 从上面的二叉搜索树定义中可以了…

FinalShell工具数据备份升级、密码解密方法

前言 FinalShell 作为国产的服务器管理工具和远程终端软件。一个一体化的运维工具&#xff0c;在国内运维人员中还是比较受欢迎。它整合了多个常用功能&#xff0c;界面友好&#xff0c;使用方便。不过它是一个闭源的商业软件&#xff0c;虽然提供免费版本&#xff0c;但部分高…

241130_昇思MindSpore函数式自动微分

241130_昇思MindSpore函数式自动微分 函数式自动微分是Mindspore学习框架所特有的&#xff0c;更偏向于数学计算的习惯。这里也是和pytorch差距最大的部分&#xff0c;具体体现在训练部分的代码&#xff0c;MindSpore是把各个梯度计算、损失函数计算 在这幅图中&#xff0c;右…

菱形打印(Python)

“以块组合块”&#xff0c;以行凝结循环打印。 (笔记模板由python脚本于2024年11月30日 19:55:22创建&#xff0c;本篇笔记适合正在学习python循环的coder翻阅) 【学习的细节是欢悦的历程】 Python 官网&#xff1a;https://www.python.org/ Free&#xff1a;大咖免费“圣经”…

【QT入门到晋级】QT项目打生产环境包--(Linux和window)

前言 使用QTcreator完成正常编译后&#xff0c;在构建目录中有可执行程序生成&#xff0c;如果直接把可执行程序拷贝到干净的生产环境上是无法运行成功的&#xff0c;使用ldd&#xff08;查看程序依赖包&#xff09;会发现缺失很多QT的特性包&#xff0c;以及将介绍国产Linux桌…

Super Vlan与Mux Vlan

SuperVlan VLAN Aggregation&#xff0c; 也称 Super-VLAN : 指 在一个物理网络内&#xff0c;用多个 VLAN &#xff08;称为 Sub-VLAN &#xff09;隔离 广播域&#xff0c;并将这些 Sub-VLAN 聚合成一个逻辑的 VLAN &#xff08;称为 Super-VLAN &#xff09;&#xff0c;这…

蓝牙定位的MATLAB程序,四个锚点、三维空间

这段代码通过RSSI信号强度实现了在三维空间中的蓝牙定位&#xff0c;展示了如何使用锚点位置和测量的信号强度来估计未知点的位置。代码涉及信号衰减模型、距离计算和最小二乘法估计等基本概念&#xff0c;并通过三维可视化展示了真实位置与估计位置的关系。 目录 程序描述 运…

Hutool 秒速实现 2FA 两步验证

前言 随着网络安全威胁的日益复杂&#xff0c;传统的用户名和密码认证方式已不足以提供足够的安全保障。为了增强用户账户的安全性&#xff0c;越来越多的应用和服务开始采用多因素认证&#xff08;MFA&#xff09;。基于时间的一次性密码&#xff08;TOTP, Time-based One-Ti…

【继承】—— 我与C++的不解之缘(十九)

前言&#xff1a; 面向对象编程语言的三大特性&#xff1a;封装、继承和多态 本篇博客来学习C中的继承&#xff0c;加油&#xff01; 一、什么是继承&#xff1f; ​ 继承(inheritance)机制是⾯向对象程序设计使代码可以复⽤的最重要的⼿段&#xff0c;它允许我们在保持原有类…

【目标跟踪】Anti-UAV数据集详细介绍

Anti-UAV数据集是在2021年公开的专用于无人机跟踪的数据集&#xff0c;该数据集采用RGB-T图像对的形式来克服单个类型视频的缺点&#xff0c;包含了318个视频对&#xff0c;并提出了相应的评估标准&#xff08;the state accurancy, SA)。 文章链接&#xff1a;https://arxiv.…

偏差-方差权衡(Bias–Variance Tradeoff):理解监督学习中的核心问题

偏差-方差权衡&#xff08;Bias–Variance Tradeoff&#xff09;&#xff1a;理解监督学习中的核心问题 在机器学习中&#xff0c;我们希望构建一个能够在训练数据上表现良好&#xff0c;同时对未见数据也具有强大泛化能力的模型。然而&#xff0c;模型的误差&#xff08;尤其…

Figma入门-原型交互

Figma入门-原型交互 前言 在之前的工作中&#xff0c;大家的原型图都是使用 Axure 制作的&#xff0c;印象中 Figma 一直是个专业设计软件。 最近&#xff0c;很多产品朋友告诉我&#xff0c;很多原型图都开始用Figma制作了&#xff0c;并且很多组件都是内置的&#xff0c;对…

Windows系统怎么把日历添加在桌面上用来记事?

在众多电脑操作系统中&#xff0c;Windows系统以其广泛的用户基础和强大的功能&#xff0c;成为许多人的首选。对于习惯于在电脑前工作和学习的用户来说&#xff0c;能够直接在桌面上查看和记录日历事项&#xff0c;无疑会大大提高工作效率和生活便利性。今天&#xff0c;就为大…

蓝桥杯备赛笔记(一)

这里的笔记是关于蓝桥杯关键知识点的记录&#xff0c;有别于基础语法&#xff0c;很多内容只要求会用就行&#xff0c;无需深入掌握。 文章目录 前言一、编程基础1.1 C基础格式和版本选择1.2 输入输出cin和cout&#xff1a; 1.3 string以下是字符串的一些简介&#xff1a;字符串…

大数据新视界 -- 大数据大厂之 Hive 数据压缩:优化存储与传输的关键(上)(19/ 30)

&#x1f496;&#x1f496;&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎你们来到 青云交的博客&#xff01;能与你们在此邂逅&#xff0c;我满心欢喜&#xff0c;深感无比荣幸。在这个瞬息万变的时代&#xff0c;我们每个人都在苦苦追寻一处能让心灵安然栖息的港湾。而 我的…

RNN And CNN通识

CNN And RNN RNN And CNN通识一、卷积神经网络&#xff08;Convolutional Neural Networks&#xff0c;CNN&#xff09;1. 诞生背景2. 核心思想和原理&#xff08;1&#xff09;基本结构&#xff1a;&#xff08;2&#xff09;核心公式&#xff1a;&#xff08;3&#xff09;关…

求整数的和与均值

求整数的和与均值 C语言代码C 代码Java代码Python代码 &#x1f490;The Begin&#x1f490;点点关注&#xff0c;收藏不迷路&#x1f490; 读入n&#xff08;1 < n < 10000&#xff09;个整数&#xff0c;求它们的和与均值。 输入 输入第一行是一个整数n&#xff0c;…

配置idea环境进行scala编程

这里用的jdk是jdk-8u161,scala版本是2.12.0 在d盘新建一个本地仓库用来存放下载的maven包&#xff0c;在里面创建如下两个文件 更改settings文件为下面的样子 点击左下角的设置&#xff0c;更改maven本地仓库的位置&#xff08;默认在c盘用户目录下的.m2文件中&#xff0c;更改…

WSL简介与安装流程(Windows 下的 Linux 子系统)

目录 1.wsl安装 1.1 WSL简介 1.1.1 WSL 的主要功能 1.1.2 WSL 的版本 1.1.3 为什么使用 WSL&#xff1f; 1.1.4 WSL 的工作原理 1.1.5 WSL 的常见使用场景 1.1.6 与虚拟机的区别 1.1.7 适合使用 WSL 的人群 1.2 启用 WSL 1.2.1 打开 PowerShell&#xff08;管理员模…