Flink窗口分配器WindowAssigner

news2024/11/25 14:56:36

前言

Flink 数据流经过 keyBy 分组后,下一步就是 WindowAssigner。

WindowAssigner 定义了 stream 中的元素如何被分发到各个窗口,元素可以被分发到一个或多个窗口中,Flink 内置了常用的窗口分配器,包括:tumbling windows、 sliding windows、 session windows 和 global windows。除了 global windows ,其它分配器都是基于时间来分发数据的。

当然,你也可以继承 WindowAssigner 抽象类实现自定义的窗口分配逻辑。

WindowAssigner

先看一下 WindowAssigner 抽象类的定义:

@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
    private static final long serialVersionUID = 1L;

    public WindowAssigner() {
    }

    public abstract Collection<W> assignWindows(T var1, long var2, WindowAssignerContext var4);

    public Trigger<T, W> getDefaultTrigger() {
        return this.getDefaultTrigger(new StreamExecutionEnvironment());
    }

    /** @deprecated */
    @Deprecated
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment var1);

    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig var1);

    public abstract boolean isEventTime();

    @PublicEvolving
    public abstract static class WindowAssignerContext {
        public WindowAssignerContext() {
        }

        public abstract long getCurrentProcessingTime();
    }
}

四个方法,作用如下:

  • assignWindows 将元素 element 分发到一个或多个窗口,返回值是窗口集合
  • getDefaultTrigger 返回默认的窗口触发器 Trigger
  • getWindowSerializer 返回窗口序列化器(窗口也要在算子间传输)
  • isEventTime 是否基于事件时间语义

Flink 内置的 WindowAssigner 实现类关系图如下:

首先,可以按照基于何种时间语义划分出三大类:

  • 基于事件时间语义
  • 基于处理时间语义
  • 不基于时间语义 --> GlobalWindows

在基于时间语义的大类下面,又可以按照时间窗口算法划分为三个具体实现:

  • 滚动窗口分配算法 tumbling windows
  • 滑动窗口分配算法 sliding windows
  • 会话窗口分配算法 session windows

定义窗口Window

窗口对象被 Flink 统一封装为抽象类org.apache.flink.streaming.api.windowing.windows.Window,Flink 内置了两种实现,分别是:

  • TimeWindow 基于时间范围的窗口,包含开始时间戳和结束时间戳
  • GlobalWindow 全局窗口,与时间无关的窗口

如果内置的这两种窗口无法满足你的需求,你也可以自定义窗口。需要注意的是,窗口本身是要在算子间传输的,所以你在自定义窗口的同时,还必须提供一个窗口序列化器,以便于 Flink 可以将你的窗口对象序列化传输。

如下示例,我们定义了一个基于数字范围的 NumberWindow,可以将一个数字划分到对应的数字范围窗口内。

public class NumberWindow extends Window {
    private final int min;
    private final int max;

    public NumberWindow(int min, int max) {
        this.min = min;
        this.max = max;
    }

    public int getMin() {
        return min;
    }

    public int getMax() {
        return max;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        NumberWindow that = (NumberWindow) o;
        return min == that.min && max == that.max;
    }

    @Override
    public int hashCode() {
        return Objects.hash(min, max);
    }

    @Override
    public long maxTimestamp() {
        return Long.MAX_VALUE;
    }
}

Window 实现还必须配套一个序列化器,主要是实现 两个int变量到窗口对象的转换。

public static class Serializer extends TypeSerializerSingleton<NumberWindow> {

    @Override
    public boolean isImmutableType() {
        return true;
    }

    @Override
    public NumberWindow createInstance() {
        return new NumberWindow(0, 0);
    }

    @Override
    public NumberWindow copy(NumberWindow numberWindow) {
        return numberWindow;
    }

    @Override
    public NumberWindow copy(NumberWindow numberWindow, NumberWindow t1) {
        return numberWindow;
    }

    @Override
    public int getLength() {
        return 8;
    }

    @Override
    public void serialize(NumberWindow numberWindow, DataOutputView dataOutputView) throws IOException {
        dataOutputView.writeInt(numberWindow.getMin());
        dataOutputView.writeInt(numberWindow.getMax());
    }

    @Override
    public NumberWindow deserialize(DataInputView dataInputView) throws IOException {
        return new NumberWindow(dataInputView.readInt(), dataInputView.readInt());
    }

    @Override
    public NumberWindow deserialize(NumberWindow numberWindow, DataInputView dataInputView) throws IOException {
        return this.deserialize(dataInputView);
    }

    @Override
    public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
        dataOutputView.writeInt(dataInputView.readInt());
        dataOutputView.writeInt(dataInputView.readInt());
    }

    @Override
    public TypeSerializerSnapshot<NumberWindow> snapshotConfiguration() {
        return new TimeWindowSerializerSnapshot();
    }

    public static final class TimeWindowSerializerSnapshot extends SimpleTypeSerializerSnapshot<NumberWindow> {

        public TimeWindowSerializerSnapshot() {
            super(Serializer::new);
        }
    }
}

自定义WindowAssigner

窗口对象定义好了,接下来就是定义窗口分配对象。

简单原则,我们把数字划分为三个窗口,分别是:小数窗口、中位数窗口、大数窗口。
如下示例,继承 WindowAssigner 类,重写 assignWindows 方法,把数字划分到对应的窗口中。

public static class MyWindowAssigner extends WindowAssigner<Integer, NumberWindow> {

    private final int startingMedian;
    private final int startingLarge;

    public MyWindowAssigner(int startingMedian, int startingLarge) {
        this.startingMedian = startingMedian;
        this.startingLarge = startingLarge;
    }

    @Override
    public Collection<NumberWindow> assignWindows(Integer element, long timestamp, WindowAssignerContext windowAssignerContext) {
        // 将数字划分到 小数、中位数、大数 窗口
        NumberWindow window;
        if (element < startingMedian) {
            window = new NumberWindow(Integer.MIN_VALUE, startingMedian - 1);
        } else if (element < startingLarge) {
            window = new NumberWindow(startingMedian, startingLarge - 1);
        } else {
            window = new NumberWindow(startingLarge, Integer.MAX_VALUE);
        }
        return List.of(window);
    }

    @Override
    public Trigger<Integer, NumberWindow> getDefaultTrigger(StreamExecutionEnvironment streamExecutionEnvironment) {
        return null;
    }

    @Override
    public TypeSerializer<NumberWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new NumberWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return false;
    }
}

把流程串起来

窗口对象和窗口分配的逻辑都有了,接下来就是把整个流程给串起来。

如下示例程序,我们定义了一个一秒内生成10个一百以内随机数的数据源Source,然后将这些数字流分为一组,并为其指定我们自定义的 MyWindowAssigner 窗口分配策略,策略中划分了三个窗口,数字小于20的归为小数一档、20到80的归为中位数一档、大于80的归为大数一档,根本数字分配对应的窗口。然后我们自定义了 Trigger,当窗口内积攒的数字达到十个,就触发窗口计算并关闭窗口。最终 ProcessWindowFunction 打印窗口内的数字并求和。

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.addSource(new SourceFunction<Integer>() {
                @Override
                public void run(SourceContext<Integer> sourceContext) throws Exception {
                    while (true) {
                        Threads.sleep(100);
                        sourceContext.collect(ThreadLocalRandom.current().nextInt(100));
                    }
                }

                @Override
                public void cancel() {

                }
            }).keyBy(i -> "all")
            .window(new MyWindowAssigner(20, 80))
            .trigger(new Trigger<Integer, NumberWindow>() {

                @Override
                public TriggerResult onElement(Integer element, long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {
                    ValueState<Integer> countState = triggerContext.getPartitionedState(new ValueStateDescriptor<>("count", Integer.class));
                    Integer count = Optional.ofNullable(countState.value()).orElse(0) + 1;
                    if (count < 10) {
                        countState.update(count);
                        return TriggerResult.CONTINUE;
                    }
                    countState.update(0);
                    return TriggerResult.FIRE_AND_PURGE;
                }

                @Override
                public TriggerResult onProcessingTime(long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {
                    return null;
                }

                @Override
                public TriggerResult onEventTime(long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {
                    return null;
                }

                @Override
                public void clear(NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {

                }
            }).process(new ProcessWindowFunction<Integer, Object, String, NumberWindow>() {
                @Override
                public void process(String key, ProcessWindowFunction<Integer, Object, String, NumberWindow>.Context context, Iterable<Integer> iterable, Collector<Object> collector) throws Exception {
                    StringBuilder builder = new StringBuilder("[" + context.window().getMin() + " - " + context.window().getMax() + "] [");
                    int sum = 0;
                    for (Integer value : iterable) {
                        builder.append(value + ",");
                        sum += value;
                    }
                    builder.append("] sum=" + sum);
                    System.err.println(builder.toString());
                }
            });
    environment.execute();
}

运行 Flink 作业,控制台输出:

[20 - 79] [30,32,24,66,63,37,] sum=252
[20 - 79] [71,48,41,55,75,79,] sum=369
[80 - 2147483647] [99,90,88,98,85,99,] sum=559
[20 - 79] [74,30,56,70,36,78,] sum=344

尾巴

Flink 的 WindowAssigner 在数据处理中发挥着关键作用。它决定了如何将源源不断的数据流切分成不同的窗口,以便进行有针对性的聚合、计算和分析。
通过合理配置 WindowAssigner,我们能够根据时间、数量或自定义的逻辑来划分数据,灵活地适应各种业务场景。这使得 Flink 能够对海量的实时数据进行高效且精准的处理,帮助我们从数据中提取有价值的信息和洞察。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2220070.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JAVA线程的多种状态

线程的状态图 new状态&#xff08;新建状态&#xff09;&#xff1a; 创建了一个线程的对象&#xff0c;但是这个线程没有启动start,那么此时这个线程的状态就是NEW也就是新建状态 此时线程对象就是一个普通的JAVA对象&#xff0c;CPU还没有给其分配资源 public class Main16 {…

微信小程序案例:计算器(含代码)

✅作者简介&#xff1a;2022年博客新星 第八。热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏…

动态中的守候:滑动窗口与距离的诗篇

公主请阅 1. 长度最小的子数组1.1 题目说明 示例 1 示例 1 示例 2 示例 3 1.2 题目分析1.3 代码部分1.4 代码分析 2. 无重复字符的最长子串2.1 题目说明示例 1示例 1示例 2示例 3 2.2 题目分析2.3 代码部分2.4 代码分析2.5 代码深度分析 1. 长度最小的子数组 题目传送门 1.1 题…

2020年计算机网络408真题解析

第一题&#xff1a; 解析&#xff1a;OSI参考模型网络协议的三要素 网络协议的三要素&#xff1a;语法 &#xff0c;语义&#xff0c;同步&#xff08;时序&#xff09; 语法&#xff1a;定义收发双方所交换信息的格式 语法&#xff1a;定义收发双方所要完成的操作 网页的加载 …

「iOS」——YYModel学习

iOS学习 前言优势使用方法简单的Model与JSON互转多样化的数据类型交换容器类数据交换 model中包含其他model白名单与黑名单 总结 前言 YYModel是YYKit的高效组件之一&#xff0c;在实际场景中的非常实用&#xff0c;在项目中使用MVC架构时&#xff0c;可以简化数据处理。在性能…

Tailwind Starter Kit 一款极简的前端快速启动模板

Tailwind Starter Kit 是基于TailwindCSS实现的一款开源的、使用简单的极简模板扩展。会用Tailwincss就可以快速入手使用。Tailwind Starter Kit 是免费开源的。它不会在原始的TailwindCSS框架中更改或添加任何CSS。它具有多个HTML元素&#xff0c;并附带了ReactJS、Vue和Angul…

tensorflow c++ api + windwos + vs部署 详细避坑

文章目录 前言一、安装MSYS2二、选择tensorflow的版本三、安装Bazel四、配置一个anconda的tensorflow环境五、生成dll,lib,include六、在vs2019中配置项目七、测试并针对性修补问题 前言 不能使用vs2022配置tensorflow c api&#xff0c;即使要安装 2.10.0版本&#xff0c;也尽…

【Next.js 项目实战系列】02-创建 Issue

原文链接 CSDN 的排版/样式可能有问题&#xff0c;去我的博客查看原文系列吧&#xff0c;觉得有用的话&#xff0c;给我的库点个star&#xff0c;关注一下吧 上一篇【Next.js 项目实战系列】01-创建项目 创建 Issue 配置 MySQL 与 Prisma​ 在数据库中可以找到相关内容&…

机器学习篇-day09-支持向量机SVM

一. 支持向量机介绍 支持向量机 介绍 SVM全称是Supported Vector Machine&#xff08;支持向量机&#xff09; 即寻找到一个超平面使样本分成两类&#xff0c;并且间隔最大。 是一种监督学习算法&#xff0c;主要用于分类&#xff0c;也可用于回归 与逻辑回归和决策树等其…

Android摄像头Camera2和Camera1的一些总结

Android 系统对摄像头的同时使用有限制&#xff0c;不能同时使用摄像头进行预览或者录制音视频。 例如&#xff1a;界面上有两个SurfaceView, 这两个SurfaceView不能同时预览或者录制音视频&#xff0c;只能有一个正常工作&#xff08;一个SurfaceView预览前置摄像头&#xff…

Linux 问题故障定位的技巧大全

1、背景 有时候会遇到一些疑难杂症,并且监控插件并不能一眼立马发现问题的根源。这时候就需要登录服务器进一步深入分析问题的根源。那么分析问题需要有一定的技术经验积累,并且有些问题涉及到的领域非常广,才能定位到问题。所以,分析问题和踩坑是非常锻炼一个人的成长和提…

Mybatis day 1020

ok了这周学习了mybatis框架&#xff0c;今天最后一天&#xff0c;加油各位&#xff01;&#xff01;&#xff01;(接上文) 八.MyBatis扩展 8.1 Mapper批量映射优化 需求 Mapper 配置文件很多时&#xff0c;在全局配置文件中一个一个注册太 麻烦&#xff0c;希望有一个办法…

MFC工控项目实例二十六创建数据库

承接专栏《MFC工控项目实例二十五多媒体定时计时器》 用选取的型号为文件名建立文件夹&#xff0c;再在下面用测试的当天的时间创建文件夹&#xff0c;在这个文件中用测试的时/分/秒为数据库名创建Adcess数据库。 1、在StdAfx.h文件最下面添加代码 #import "C:/Program F…

Ubuntu下安装Bochs2.7

文章目录 前言下载安装在Bochs实现最简单的操作系统创建软盘编写并编译汇编指令编写bochs配置文件将操作系统写入到软盘启动操作系统 前言 通过自带软件库sudo apt-get install bochs bochs-x安装的Bochs运行时不显示任何内容&#xff0c;这里选用源码安装方式。 下载安装 …

Atlas800昇腾服务器(型号:3000)—AIPP加速前处理(四)

服务器配置如下&#xff1a; CPU/NPU&#xff1a;鲲鹏 CPU&#xff08;ARM64&#xff09;A300I pro推理卡 系统&#xff1a;Kylin V10 SP1【下载链接】【安装链接】 驱动与固件版本版本&#xff1a; Ascend-hdk-310p-npu-driver_23.0.1_linux-aarch64.run【下载链接】 Ascend-…

CSS 居中那些事

一、父子元素高度确定 简单粗暴, 直接通过设置合适的 padding 或 margin 实现居中 <style>.p {padding: 20px 0;background: rgba(255, 0, 0, 0.1);}.c {width: 40px;height: 20px;background: blue;} </style> <div class"p"><div class"…

服务器模块测试

目录 测试逻辑 测试工具 测试 测试逻辑 我们可以使用一个简单的业务处理逻辑来进行测试。 最简单的&#xff0c;我们业务逻辑就直接返回一个固定的字符串 void Message(const PtrConnection&con,Buffer* inbuffer) //模拟用户新数据回调 {inbuffer->MoveReadOf…

Vite 前端开发的超级加速器 - 从入门到精通

大家好&#xff01;今天我们来聊聊前端开发中的一个革命性工具 - Vite。如果你觉得你的前端开发速度慢得像蜗牛爬&#xff0c;那么Vite就是为你量身打造的超级加速器&#xff01; 一、什么是Vite&#xff1f; Vite&#xff08;法语意为"快速"&#xff09;是一个现代化…

LDR6500芯片:引领USB-C拓展坞转接器新风

在当今这个数字化浪潮汹涌澎湃的时代&#xff0c;手机和电脑已然深深融入我们生活的每一个角落&#xff0c;成为了不可或缺的关键工具。然而&#xff0c;不得不承认的是&#xff0c;它们所配备的接口数量往往有限&#xff0c;难以充分满足我们日益多样化、丰富化的需求。正因如…

5G 现网信令参数学习(1) - MIB

MIB消息中的参数 systemFrameNumber 000101B, subCarrierSpacingCommon scs30or120, ssb-SubcarrierOffset 6, dmrs-TypeA-Position pos2, pdcch-ConfigSIB1 { controlResourceSetZero 10, searchSpaceZero 4 }, cellBarred notBarred, intraFreqReselection allowed, sp…