Flink窗口(2)—— Window API

news2024/11/15 8:47:42

目录

窗口分配器

时间窗口

计数窗口

全局窗口

窗口函数

增量聚合函数

全窗口函数(full window functions)

增量聚合和全窗口函数的结合使用

Window API 主要由两部分构成:窗口分配器(Window Assigners)和窗口函数(Window Functions)

stream.keyBy(<key selector>)
 .window(<window assigner>) //指明窗口的类型
 .aggregate(<window function>) //定义窗口具体的处理逻辑

在window()方法中传入一个窗口分配器;

在aggregate()方法中传入一个窗口函数;

窗口分配器

指定窗口的类型,定义数据应该被“分配”到哪个窗口

方法:.window()

参数:WindowAssigner

返回值:WindowedStream

如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个WindowAssigner,返回的是 AllWindowedStream

时间窗口

滚动处理时间窗口

stream.keyBy(...)  
//1..of()方法需要传入一个 Time 类型的参数 size,表示滚动窗口的大小
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) //窗口大小
//2.通过设置偏移量offset 来调整起始点的时间戳
.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8))) //窗口大小,偏移量
.aggregate(...)

默认的窗口起始点时间戳是窗口大小的整倍数

如果我们定义 1 天的窗口,默认就从 0 点开始;如果定义 1 小时的窗口,默认就从整点开始

如果不想用默认值,就需要设置好偏移量

偏移量的作用:标准时间戳其实就是1970 年 1 月 1 日 0 时 0 分 0 秒 0 毫秒开始计算的一个毫秒数,而这个时间是以 UTC 时间,也就是 0 时区(伦敦时间)为标准的。我们所在的时区是东八区,也就是 UTC+8,跟 UTC 有 8小时的时差。我们定义 1 天滚动窗口时,如果用默认的起始点,那么得到就是伦敦时间每天 0点开启窗口,这时是北京时间早上 8 点。那怎样得到北京时间每天 0 点开启的滚动窗口呢?只要设置-8 小时的偏移量就可以了

滑动处理时间窗口

stream.keyBy(...)
//窗口大小,滑动步长(同样也可以设置偏移量)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)

处理时间会话窗口

stream.keyBy(...)
//超时时间
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)

以上是静态设置了超时时间,也可以动态设置:

.window(ProcessingTimeSessionWindows.withDynamicGap(newSessionWindowTimeGapExtractor<Tuple2<String, Long>>() {
 @Override
 public long extract(Tuple2<String, Long> element) { 
// 提取 session gap 值返回, 单位毫秒
//提取了数据元素的第一个字段,用它的长度乘以 1000 作为会话超时的间隔
 return element.f0.length() * 1000;
 }
}

滚动事件时间窗口

stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5))) 
.aggregate(...)

滑动事件时间窗口

stream.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)

事件时间会话窗口

stream.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)

处理时间和事件时间的逻辑完全相同

计数窗口

滚动计数窗口:.countWindow(10) //窗口大小

滑动计数窗口:.countWindow(10,3) //窗口大小,滑动步长

每个窗口统计 10 个数据,每隔 3 个数据就统计输出一次结果

全局窗口

.window(GlobalWindows.create());

需要自定义触发器

 

窗口函数

WindowedStream——>DataStream

增量聚合函数

像 DataStream 的简单聚合一样,每来一条数据就立即进行计算,中间只要保持一个简单的聚合状态就可以

区别在于不立即输出结果,而是要等到窗口结束时间

归约函数(ReduceFunction):和简单聚合时使用的ReduceFunction完全一样


聚合函数(AggregateFunction):取消类型一致的限制,直接基于 WindowedStream 调 用.aggregate()方法,不需要经过map处理;这个方法需要传入一个AggregateFunction 的实现类作为参数,源码如下:

@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {

    /**
     * Creates a new accumulator, starting a new aggregate.
     *
     * <p>The new accumulator is typically meaningless unless a value is added via {@link
     * #add(Object, Object)}.
     *
     * <p>The accumulator is the state of a running aggregation. When a program has multiple
     * aggregates in progress (such as per key and window), the state (per key and window) is the
     * size of the accumulator.
     *
     * @return A new accumulator, corresponding to an empty aggregate.
     */
    ACC createAccumulator();

    /**
     * Adds the given input value to the given accumulator, returning the new accumulator value.
     *
     * <p>For efficiency, the input accumulator may be modified and returned.
     *
     * @param value The value to add
     * @param accumulator The accumulator to add the value to
     * @return The accumulator with the updated state
     */
    ACC add(IN value, ACC accumulator);

    /**
     * Gets the result of the aggregation from the accumulator.
     *
     * @param accumulator The accumulator of the aggregation
     * @return The final aggregation result.
     */
    OUT getResult(ACC accumulator);

    /**
     * Merges two accumulators, returning an accumulator with the merged state.
     *
     * <p>This function may reuse any of the given accumulators as the target for the merge and
     * return that. The assumption is that the given accumulators will not be used any more after
     * having been passed to this function.
     *
     * @param a An accumulator to merge
     * @param b Another accumulator to merge
     * @return The accumulator with the merged state
     */
    ACC merge(ACC a, ACC b);
}

IN:输入数据类型

ACC:累加器类型

OUT:输出数据类型

AggregateFunction 接口中有四个方法:

除了继承AggregateFunction,自定义聚合函数之外,Flink为我们提供了一系列预定义的简单聚合方法,如sum()/max()/maxBy()/min()/minBy(),可以直接基于WindowedStream调用

全窗口函数(full window functions)

全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算

典型的批处理方式,适用于一些基于全部数据才能进行的运算等等

窗口函数(WindowFunction)

stream
 .keyBy(<key selector>)
 .window(<window assigner>)
    //基于 WindowedStream 调用.apply()方法,传入一个 WindowFunction 的实现类
 .apply(new MyWindowFunction());

WindowFunction的实现类中可以获取到包含窗口所有数据的可迭代集合(Iterable),还可以拿到窗口本身的信息

功能可以被 ProcessWindowFunction(处理窗口函数,见下) 全覆盖


处理窗口函数(ProcessWindowFunction)

增强版的 WindowFunction

基于 WindowedStream 调用.process()方法,传入一个 ProcessWindowFunction 的实现类

ProcessWindowFunction的泛型:ProcessWindowFunction<IN,OUT,KEY,W>

分别是输入数据类型,输出数据类型,分区键的类型,Window类型(比如,是时间窗口,就是TimeWindow)

process()方法的定义:

示例代码如下,自定义窗口处理函数来处理数据:

public class UvCountByWindowExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));

        // 将数据全部发往同一分区,按窗口统计UV
        stream.keyBy(data -> true)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new UvCountByWindow())
                .print();

        env.execute();
    }

     //自定义窗口处理函数
    public static class UvCountByWindow extends ProcessWindowFunction<Event, String, Boolean, TimeWindow>{
        @Override
        public void process(Boolean aBoolean, Context context, Iterable<Event> elements, Collector<String> out) throws Exception {
            HashSet<String> userSet = new HashSet<>();
            // 遍历所有数据,放到Set里去重
            for (Event event: elements){
                userSet.add(event.user);
            }
            // 结合窗口信息,包装输出内容
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            out.collect("窗口: " + new Timestamp(start) + " ~ " + new Timestamp(end)
                    + " 的独立访客数量是:" + userSet.size());
        }
    }


}

这里的Event是一个POJO类,ClickSource是自定义的数据源,其代码如下:
Event.java:

public class Event {
    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{" +
                "user='" + user + '\'' +
                ", url='" + url + '\'' +
                ", timestamp=" + new Timestamp(timestamp) +
                '}';
    }
}

ClickSource.java: 

public class ClickSource implements SourceFunction<Event> {
    // 声明一个布尔变量,作为控制数据生成的标识位
    private Boolean running = true;
    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        Random random = new Random();    // 在指定的数据集中随机选取数据
        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};

        while (running) {
            ctx.collect(new Event(
                    users[random.nextInt(users.length)],
                    urls[random.nextInt(urls.length)],
                    Calendar.getInstance().getTimeInMillis()
            ));
            // 隔1秒生成一个点击事件,方便观测
            Thread.sleep(1000);
        }
    }
    @Override
    public void cancel() {
        running = false;
    }

}

增量聚合和全窗口函数的结合使用

增量聚合函数处理计算会更高效;而全窗口函数的优势在于提供了更多的信息

我们之前在调用 WindowedStream 的.reduce()和.aggregate()方法时,只是简单地直接传入了一个 ReduceFunction 或 AggregateFunction 进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是 WindowFunction 或者 ProcessWindowFunction

处理机制:

基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了 Iterable 类型的输入。一般情况下,这时的可迭代集合中就只有一个元素了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1380886.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

世邦spon IP网络对讲广播系统任意文件上传漏洞

产品介绍 世邦通信IP网络对讲广播系统采用领先的IPAudio™技术,将音频信号以数据包形式在局域网和广域网上进行传送,是一套纯数字传输系统。 漏洞描述 spon IP网络对讲广播系统存在任意文件上传漏洞&#xff0c;攻击者可以通过构造特殊请求包上传恶意后门文件&#xff0c;从…

软件测试|解决Github port 443 : Timed out连接超时的问题

前言 GitHub是全球最大的开源代码托管平台之一&#xff0c;许多开发者和团队使用它来管理和协作开源项目。但在当下&#xff0c;我们在clone或者提交代码时会经常遇到"GitHub Port 443: Timed Out"错误&#xff0c;这意味着我们的电脑无法建立与GitHub服务器的安全连…

UISegmentedControl控件定制

1.在xib中设计如下: 背景颜色: 段标题与数量 : 2.在代码中控制 关联控件 注册控件事件 控件事件处理函数实现: 定制Title颜色 4 --- > UIControlStateSelected 0 --- > UIControlStateNormal 最终实现效果: 取控件选中时的索引与显示文本: 输出:

理想汽车迎来新算力平台负责人,内部化名为张一粟;王者荣耀在抖音直播全面开放;陈楚生等人现身央视春晚彩排

今日精选 • 理想汽车迎来新算力平台负责人,内部化名为张一粟。目前理想内部暂未公布其内部职级• 王者荣耀在抖音直播全面开放• 陈楚生等人现身央视春晚彩排 投融资 • 2023年12月份&#xff0c;中国社会融资规模增量为1.94万亿元• OpenAI 支持的人形机器人公司 1X 完成 …

【C++】- 类和对象(构造函数!!explicit关键字stastic关键字!!详解)

类和对象④ 构造函数初始化列表explicit关键字static成员 构造函数初始化列表 我们已经初步了解了构造函数------->类和对象②那么调用构造函数就是给了对象中各个成员变量一个合适的初始值。 但实际上&#xff0c;我们想要做的是初始化成员变量&#xff0c;在构造函数中对…

【STM32】STM32学习笔记-FlyMCU串口下载和STLINK Utility(30)

00. 目录 文章目录 00. 目录01. 串口简介02. 串口连接电路图03. FlyMCU软件下载程序04. 串口下载原理05. FlyMCU软件其它操作06. STLINK Utility软件07. 软件下载08. 附录 01. 串口简介 串口通讯(Serial Communication)是一种设备间非常常用的串行通讯方式&#xff0c;因为它简…

【MySQL】MySQL表的约束-空属性/默认值/列属性/zerofill/主键/自增长/唯一键/外键

文章目录 表的约束1.空属性 --null && not null2.默认值 -- default3.列描述4.zerofill5.主键6.自增长7.唯一键8.外键 表的约束 表的约束&#xff1a;表中一定要有各种约束&#xff0c;通过约束&#xff0c;让我们未来插入数据库表中的数据是符合预期的。约束的本质是…

成就动机测试

成就动机测试广泛应用在职业发展领域&#xff0c;如&#xff1a;企业Hr人力资源管理部门&#xff0c;用于评估分析员工的潜能和价值&#xff0c;适用场景有人才招聘&#xff0c;岗位晋升&#xff0c;绩效考评等等。在大学生做职业规划&#xff0c;求职应聘中&#xff0c;应用成…

UG装配体组件重命名与导出组件

在一个装配文件中&#xff0c;如果我们想对其中一个零件的名称进行更改&#xff0c;可以打开单独文件然后另存为改名&#xff0c;或者直接改名后在装配体中进行替换&#xff0c;但是这样这样都是比较麻烦 我们可以使用零组件更名及导出命令 菜单-GC工具箱-GC数据规范-其他工具…

哪里能找到好用的PPT模板?12个免费模板网站让你畅快办公!

你是否有过这样的经历&#xff0c;在准备重要会议或者演讲的时候&#xff0c;为找不到合适的PPT模板而困扰&#xff1f;或是在网上漫无目的地搜寻&#xff0c;结果收获的是设计平淡无奇的PPT模板&#xff1f; 如果你有同样的疑问&#xff0c;那么你来对地方了&#xff01;在这…

PADS 改变图纸和图页边界大小

PADS 改变图纸和图页边界大小 有时候画一画原理图发现画布不够用了&#xff0c;可改变图纸大小&#xff0c;对应的改变图页边界 若图页边界怎么选择都改变不了&#xff0c;可将途中图页边界删除&#xff0c;重新加载 选择对应的图页边距就好啦 分类: PADS

【LeetCode:30. 串联所有单词的子串 | 滑动窗口 + 哈希表】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

嵌套的CMake

hehedalinux:~/Linux/multi-v1$ tree . ├── calc │ ├── add.cpp │ ├── CMakeLists.txt │ ├── div.cpp │ ├── mult.cpp │ └── sub.cpp ├── CMakeLists.txt ├── include │ ├── calc.h │ └── sort.h ├── sort │ ├── …

(三)CMake为什么几乎一统C++跨平台构建?

先看几个简单的例子再回头来看这个问题 回想一下当我们用windows写C第一个Hello World!的步骤&#xff0c;先用VS IDE 创建一个控制台的工程&#xff0c;IDE 会自动生成一个 cpp 文件&#xff0c;里面有一句 输出"Hello World!" 代码&#xff0c;这个时候按下F5 就可…

如何生成文本: 通过 Transformers 用不同的解码方法生成文本

如何生成文本: 通过 Transformers 用不同的解码方法生成文本 假设 $p0.92$&#xff0c;Top-p 采样对单词概率进行降序排列并累加&#xff0c;然后选择概率和首次超过 $p92%$ 的单词集作为采样池&#xff0c;定义为 $V_{\text{top-p}}$。在 $t1$ 时 $V_{\text{top-p}}$ 有 9 个…

GULP 案例 4:如何计算热力学性质(热容、熵、焓、自由能等)?

---------------------------------------------------------------------- 物体的热力学性质是指物质处于平衡状态下压力 P、体积 V、温度 T、组成以及其他的热力学函数之间的变化规律。一般将材料的压力 P、体积 V、温度 T、内能 U、焓 H、熵 S 等统称为物体热力学性质。 热…

Android Lint的使用

代码检查方式一&#xff1a; Android Studio使用Lint进行代码检查 找到Analyze目录下的Inspect Code检查代码选项点击然后弹出下面这个框框&#xff0c;在这个列表选项中我们可以选择Inspect Code的范围&#xff0c;点击OK 待分析完毕后&#xff0c;我们可以在Inspection栏目中…

mysql数据库被黑恢复—应用层面delete删除---惜分飞

客户的mysql被人从应用层面攻击,并且删除了一些数据,导致业务无法正常使用,通过底层分析binlog确认类似恢复操作 确认这类的业务破坏是通过delete操作实现的,客户那边不太幸,客户找了多人进行恢复,现场严重破坏,老库被删除,并且还原了历史的备份文件(非故障第一现场),通过底层…

iOS Universal Links(通用链接)详细教程

一&#xff1a;Universal Links是用来做什么的&#xff1f; iOS9.0推出的用于应用之间跳转的一种机&#xff0c; 通过一个https的链接启动app。如果手机有安装需要启动的app&#xff0c;可实现无缝跳转。如果没有安装&#xff0c;会打开网页。 实现场景&#xff1a;微信链接无…

分布式系统的三字真经CAP

文章目录 前言C&#xff08;Consistency 数据一致性&#xff09;A&#xff08;Availability 服务可用性&#xff09;P&#xff08;Partition Tolerance 分区容错性&#xff09;CAP理论最后 前言 你好&#xff0c;我是醉墨居士&#xff0c;我一起探索一下分布式系统的三字真经C…