大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动基于事件驱动

news2024/12/23 17:54:45

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Flink DataSet
  • Flink DataSet 转换操作
  • Flink DataSet 输出
  • 容错机制、对比、发展方向

在这里插入图片描述

Flink Window 背景

Flink认为Batch是Streaming的一个特例,因此Flink底层引擎是一个流式引擎,去上面实现了流处理和批处理,而Window就是从Streaming到Batch的桥梁。

通俗讲,Window是用来对一个无限的流的设置一个有限的集合,从而有界数据集上进行操作的一种机制,流上的集合由Window来划定范围,比如“计算过去10分钟”或者“最后50个元素的和”。
Window可以由时间(TimeWindow)比如30秒或者数据,(CountWindow)比如100个元素驱动。
DataStreamAPI提供了Time和Count的Window。

Flink Window 总览

基本概念

  • Window 是Flink处理无限流的核心,Windows将流拆分为有限大小“桶”,我们可以在其上应用计算。
  • Flink 认为Batch是Streaming的一个特例,所以Flink底层引擎是一个流式引擎,在上面实现了流处理和批处理。
  • 而Window窗口是从Streaming到Batch的一个桥梁。
  • Flink提供了非常完善的窗口机制
  • 在流处理中,数据是连续不断的,因此我们不可能等到所有等到所有数据都到了再开始处理。
  • 当然我们可以每来一个消息就处理一次,但是有时候我们需要做一些聚合操作,例如:在过去一分钟内有多少用户点击了我们的网页
  • 在这种情况下,我们必须定义一个窗口,用来收集最近的一分钟内的数据,并对这个窗口的内数据进行计算
  • 窗口可以基于时间驱动、也可以基于事件驱动
  • 同样基于不同事件驱动的可以分为:翻滚窗口(TumblingWindow 无重叠)、滑动窗口(Sliding Window 有重叠)、会话窗口(SessionWindow 活动间隙)、全局窗口
  • Flink要操作窗口,先要将StreamSource转换成WindowedStream

转换步骤

  • 获取流数据源
  • 获取窗口
  • 操作窗口数据
  • 输出窗口数据

滚动时间窗口

在这里插入图片描述

类型特点

将数据依据固定的窗口长度对数据进行切分:

  • 时间对齐
  • 窗口长度固定,没有重叠

Flink 的滚动时间窗口(Tumbling Window)是一种常见的基于时间的窗口机制,可以通过事件驱动进行计算。滚动窗口的特点是时间窗口是固定长度的,窗口之间没有重叠,每个事件只能进入一个窗口。

在 Flink 中,滚动时间窗口可以基于事件时间(Event Time)或者处理时间(Processing Time)来定义。为了基于事件时间驱动,可以使用 EventTimeSessionWindows 或者 TumblingEventTimeWindows 来进行定义。

关键点

  • 事件时间和水印 (Watermark): 通过 assignTimestampsAndWatermarks 来指定事件时间,并使用水印确保窗口计算不会遗漏延迟的事件。
  • 窗口定义: 使用 TumblingEventTimeWindows.of(Time.seconds(x)) 定义滚动窗口。窗口长度为 x 秒。
  • 触发器: 采用 EventTimeTrigger 触发计算,确保窗口是基于事件时间的。

基于时间驱动

场景:我们需要统计每一分钟用户购买商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被叫做 翻滚时间窗口(Tumbling Time Window)。
启动的主类:

package icu.wzk;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.text.SimpleDateFormat;
import java.util.Random;


public class TumblingWindow {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.getJavaEnv().socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        long timeMillis = System.currentTimeMillis();
                        int random = new Random().nextInt(10);
                        System.out.println("value: " + value + ", random: " + random + ", time: " + format.format(timeMillis));
                        return Tuple2.of(value, random);
                    }
                });

        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream
                .keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {
                    @Override
                    public Tuple getKey(Tuple2<String, Integer> value) throws Exception {
                        return Tuple1.of(value.f0);
                    }
                });

        // 基于时间驱动 每隔 10秒 划分一个窗口
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream
                .timeWindow(Time.seconds(10));
        timeWindow.apply(new MyTimeWindowFunction()).print();
        env.execute("TumblingWindow");

    }

}

我们实现一个 MyTimeWindowFunction,滚动时间窗口:

package icu.wzk;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;

public class MyTimeWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow> {

    /**
     * 场景:我们需要统计每一分钟用户购买商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被叫做 翻滚时间窗口(Tumbling Time Window)
     * @author wzk
     * @date 16:58 2024/7/26

    **/
    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        int sum = 0;
        for (Tuple2<String, Integer> tuple2 : input) {
            sum += tuple2.f1;
        }
        out.collect("key: " + tuple.getField(0) + ", value: " + sum  +
                ", window start: " + format.format(window.getStart()) + ", window end: " + format.format(window.getEnd()));
    }
}

基于事件驱动

场景:当我们想要每100个用户的购买行为作为驱动,那么每当窗口中填满了100个“相同”元素,就会对窗口进行计算。
编写一个启动类:

package icu.wzk;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.text.SimpleDateFormat;
import java.util.Random;


public class TumblingWindow {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.getJavaEnv().socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        long timeMillis = System.currentTimeMillis();
                        int random = new Random().nextInt(10);
                        System.out.println("value: " + value + ", random: " + random + ", time: " + format.format(timeMillis));
                        return Tuple2.of(value, random);
                    }
                });

        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream
                .keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {
                    @Override
                    public Tuple getKey(Tuple2<String, Integer> value) throws Exception {
                        return Tuple1.of(value.f0);
                    }
                });

        // 基于时间驱动 每隔 10秒 划分一个窗口
        WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> globalWindow = keyedStream
                .countWindow(3);
        globalWindow.apply(new MyCountWindowFuntion());
        env.execute("TumblingWindow");

    }

}

编写一个事件驱动的类:MyCountWindowFuntion

package icu.wzk;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;


import java.text.SimpleDateFormat;


public class MyCountWindowFuntion implements WindowFunction<Tuple2<String, Integer>, String, Tuple, GlobalWindow> {

    /**
     * 场景:当我们想要每100个用户的购买行为作为驱动,那么每当窗口中填满了100个“相同”元素,就会对窗口进行计算。
     * @author wzk
     * @date 17:11 2024/7/26
    **/
    @Override
    public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        int sum = 0;
        for (Tuple2<String, Integer> tuple2 : input) {
            sum += tuple2.f1;
        }
        // 无用的时间戳:默认值是:Long.MAX_VALUE,在事件驱动下,基于计数的情况,不关心时间
        long maxTimestamp = window.maxTimestamp();
        out.collect("key:" + tuple.getField(0) + ", value: " + sum + ", maxTimestamp :"
                + maxTimestamp + "," + format.format(maxTimestamp));
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2115252.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

揭秘 AMD GPU 上 PyTorch Profiler 的性能洞察

Unveiling performance insights with PyTorch Profiler on an AMD GPU — ROCm Blogs 2024年5月29日&#xff0c;作者&#xff1a;Phillip Dang。 在机器学习领域&#xff0c;优化性能通常和改进模型架构一样重要。在本文中&#xff0c;我们将深入探讨 PyTorch Profiler&#…

小白建立个人网站初步尝试

一、VScode 代码是在VScode上运行的&#xff0c;可以看作者另一篇文章&#xff1a;http://t.csdnimg.cn/mOmdF 二、代码基本框架 代码解释<!DOCTYPE html>声明为HTML5文档<html><head>头部元素&#xff0c;不显示在页面<meta charset"utf-8"…

数学建模强化宝典(14)Fisher 最优分割法

前言 Fisher最优分割法是一种对有序样品进行聚类的方法&#xff0c;它在分类过程中不允许打破样品的顺序。这种方法的目标是找到一种分割方式&#xff0c;使得各段内样品之间的差异最小&#xff0c;而各段之间的差异最大。以下是关于Fisher最优分割法的详细介绍&#xff1a; 一…

【LeetCode热题100】前缀和

这篇博客共记录了8道前缀和算法相关的题目&#xff0c;分别是&#xff1a;【模版】前缀和、【模版】二维前缀和、寻找数组的中心下标、除自身以外数组的乘积、和为K的子数组、和可被K整除的子数组、连续数组、矩阵区域和。 #include <iostream> #include <vector> …

【408数据结构】散列 (哈希)知识点集合复习考点题目

苏泽 “弃工从研”的路上很孤独&#xff0c;于是我记下了些许笔记相伴&#xff0c;希望能够帮助到大家 知识点 1. 散列查找 散列查找是一种高效的查找方法&#xff0c;它通过散列函数将关键字映射到数组的一个位置&#xff0c;从而实现快速查找。这种方法的时间复杂度平均为…

自我指导:提升语言模型自我生成指令的能力

人工智能咨询培训老师叶梓 转载标明出处 传统的语言模型&#xff0c;尤其是经过指令微调的大型模型&#xff0c;虽然在零样本&#xff08;zero-shot&#xff09;任务泛化上表现出色&#xff0c;但它们高度依赖于人类编写的指令数据。这些数据往往数量有限、多样性不足&#xf…

配置Java(JDK)环境变量

一、配置JDK环境变量 将JDK-22压缩包加压缩到指定目录下面&#xff0c;本机路径是&#xff1a;C:\Program Files\Java&#xff08;可以加压缩到自己的指定路径&#xff0c;记住这个路径&#xff0c;配置环境变量时候要使用&#xff09;。 鼠标右键“此电脑”&#xff0c;点击“…

独立按键单击检测(延时消抖+定时器扫描)

目录 独立按键简介 按键抖动 模块接线 延时消抖 Key.h Key.c 定时器扫描按键代码 Key.h Key.c main.c 思考 MultiButton按键驱动 独立按键简介 ​ 轻触按键相当于一种电子开关&#xff0c;按下时开关接通&#xff0c;松开时开关断开&#xff0c;实现原理是通过轻触…

Spring框架-----ioc

基本概念 Spring 是一个轻量级的,IOC和AOP的一站式Java 开发框架&#xff0c;是为了简化企业级应用开发而生的 轻量级&#xff1a;框架核心模块体积小 IOC:Inversion of Control&#xff08;控制反转&#xff09;把创建对象的控制权反转给Spring框架管理 以前我们程序中需要…

Linux下构建Docker镜像

Docker在Linux构建镜像 Docker是一种轻量级的容器化技术&#xff0c;可以让开发者将应用程序及其所有依赖项打包到一个独立的容器中&#xff0c;从而实现跨平台和快速部署&#xff0c;在Linux系统上&#xff0c;我们可以使用D0cker来构建自己的镜像&#xff0c;并且可以通过简…

RocketMQ学习(三)

文章目录 1. 高级功能1.1 消息存储1.1.1 存储介质关系型数据库DB文件系统 1.1.2 性能对比1.1.3 消息的存储和发送1&#xff09;消息存储2&#xff09;消息发送 1.1.4 消息存储结构1.1.5 刷盘机制1&#xff09;同步刷盘2&#xff09;异步刷盘3&#xff09;配置 1.2 高可用性机制…

软件设计之JavaWeb(1)

软件设计之JavaWeb(1) 此篇应在MySQL之后进行学习: 路线图推荐&#xff1a; 【Java学习路线-极速版】【Java架构师技术图谱】 尚硅谷全新JavaWeb教程&#xff0c;企业主流javaweb技术栈 资料可以去尚硅谷官网免费领取 此章节最好学完JDBC观看 学习内容&#xff1a; XML概述T…

excel翻译软件有哪些?如何高效提翻译?

你是否曾在面对满屏的英文Excel表格时感到头疼&#xff1f;项目报告、数据分析、财务报表... 当这些重要的信息被语言壁垒阻挡时&#xff0c;效率和理解度都会大打折扣。别担心&#xff0c;只需3分钟&#xff0c;我将带你轻松解锁excel翻译成中文的秘籍。 无论是职场新人还是…

解决浏览器自动将http网址转https

删除浏览器自动使用https的方式 在浏览器地址栏输入&#xff1a;chrome://net-internals/#hsts PS:如果是edge浏览器可输入&#xff1a;edge://net-internals/#hsts 在Delete domain security policies搜索框下&#xff0c;输入要删除的域名,然后点击delete 解决方法&#…

VMware中共享文件夹没了怎么办?

1.进入root su root 需要提前设置密码 sudo passwd root 2.创建一个hgfs文件夹&#xff0c;share就在这里面 sudo mkdir /mnt/hgfs/ 3.输入下面的命令 sudo mount -t fuse.vmhgfs-fuse .host:/ /mnt/hgfs -o allow_other 4.然后就能找到share文件夹了&#xff0c;注意每…

CCF推荐B类会议和期刊总结(计算机网络领域)

CCF推荐B类会议和期刊总结&#xff08;计算机网络领域&#xff09; 在计算机网络领域&#xff0c;中国计算机学会&#xff08;CCF&#xff09;推荐的B类会议和期刊代表了该领域的较高水平。以下是对所有B类会议和期刊的总结&#xff0c;包括全称、出版社、dblp文献网址以及所属…

串口通信协议(UART)

简介 uart通讯协议&#xff0c;是一种成本低、容易使用、通信线路简单&#xff0c;可实现两个设备的互相通信的协议&#xff1b;是一种全双工&#xff0c;设备点对点通信的协议。下面从硬件电路、电平标准和串口参数等方面来了解uart通信协议。 硬件电路 硬件电路非常简单&am…

如何限制与管控员工上网行为?五个管控方法让员工效率倍增!

在现代企业中&#xff0c;互联网是工作中不可或缺的工具&#xff0c;但与此同时&#xff0c;员工在工作时间浏览与工作无关的网站、进行网络娱乐等行为&#xff0c;也成为了影响企业生产力和效率的主要因素之一。如何有效限制和管控员工的上网行为&#xff0c;从而提升工作效率…

利士策分享,逆境破局关键:精准策略

利士策分享&#xff0c;逆境破局关键&#xff1a;精准策略 在人生的征途上&#xff0c;逆境如同试炼场&#xff0c;考验着我们的智慧与勇气。 为了在这片试炼场上稳健前行&#xff0c;我们需要一套具体而精准的应对策略。 以下&#xff0c;是结合实践经验与智慧总结的应对策略…