Flink AggregateFunction window functions: when does merge() run?

2024/11/25 10:36:19

1. Introduction

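When writing business logic with the Flink DataStream API, the aggregate() operator and AggregateFunction are used all the time. Implementing an AggregateFunction requires four methods; here is the interface source: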

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.common.functions;

import org.apache.flink.annotation.PublicEvolving;

import java.io.Serializable;

/**
 * The {@code AggregateFunction} is a flexible aggregation function, characterized by the following
 * features:
 *
 * <ul>
 *   <li>The aggregates may use different types for input values, intermediate aggregates, and
 *       result type, to support a wide range of aggregation types.
 *   <li>Support for distributive aggregations: Different intermediate aggregates can be merged
 *       together, to allow for pre-aggregation/final-aggregation optimizations.
 * </ul>
 *
 * <p>The {@code AggregateFunction}'s intermediate aggregate (in-progress aggregation state) is
 * called the <i>accumulator</i>. Values are added to the accumulator, and final aggregates are
 * obtained by finalizing the accumulator state. This supports aggregation functions where the
 * intermediate state needs to be different than the aggregated values and the final result type,
 * such as for example <i>average</i> (which typically keeps a count and sum). Merging intermediate
 * aggregates (partial aggregates) means merging the accumulators.
 *
 * <p>The AggregationFunction itself is stateless. To allow a single AggregationFunction instance to
 * maintain multiple aggregates (such as one aggregate per key), the AggregationFunction creates a
 * new accumulator whenever a new aggregation is started.
 *
 * <p>Aggregation functions must be {@link Serializable} because they are sent around between
 * distributed processes during distributed execution.
 *
 * <h1>Example: Average and Weighted Average</h1>
 *
 * <pre>{@code
 * // the accumulator, which holds the state of the in-flight aggregate
 * public class AverageAccumulator {
 *     long count;
 *     long sum;
 * }
 *
 * // implementation of an aggregation function for an 'average'
 * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
 *
 *     public AverageAccumulator createAccumulator() {
 *         return new AverageAccumulator();
 *     }
 *
 *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
 *         a.count += b.count;
 *         a.sum += b.sum;
 *         return a;
 *     }
 *
 *     public AverageAccumulator add(Integer value, AverageAccumulator acc) {
 *         acc.sum += value;
 *         acc.count++;
 *         return acc;
 *     }
 *
 *     public Double getResult(AverageAccumulator acc) {
 *         return acc.sum / (double) acc.count;
 *     }
 * }
 *
 * // implementation of a weighted average
 * // this reuses the same accumulator type as the aggregate function for 'average'
 * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
 *
 *     public AverageAccumulator createAccumulator() {
 *         return new AverageAccumulator();
 *     }
 *
 *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
 *         a.count += b.count;
 *         a.sum += b.sum;
 *         return a;
 *     }
 *
 *     public AverageAccumulator add(Datum value, AverageAccumulator acc) {
 *         acc.count += value.getWeight();
 *         acc.sum += value.getValue();
 *         return acc;
 *     }
 *
 *     public Double getResult(AverageAccumulator acc) {
 *         return acc.sum / (double) acc.count;
 *     }
 * }
 * }</pre>
 *
 * @param <IN> The type of the values that are aggregated (input values)
 * @param <ACC> The type of the accumulator (intermediate aggregate state).
 * @param <OUT> The type of the aggregated result
 */
@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {

    /**
     * Creates a new accumulator, starting a new aggregate.
     *
     * <p>The new accumulator is typically meaningless unless a value is added via {@link
     * #add(Object, Object)}.
     *
     * <p>The accumulator is the state of a running aggregation. When a program has multiple
     * aggregates in progress (such as per key and window), the state (per key and window) is the
     * size of the accumulator.
     *
     * @return A new accumulator, corresponding to an empty aggregate.
     */
    ACC createAccumulator();

    /**
     * Adds the given input value to the given accumulator, returning the new accumulator value.
     *
     * <p>For efficiency, the input accumulator may be modified and returned.
     *
     * @param value The value to add
     * @param accumulator The accumulator to add the value to
     * @return The accumulator with the updated state
     */
    ACC add(IN value, ACC accumulator);

    /**
     * Gets the result of the aggregation from the accumulator.
     *
     * @param accumulator The accumulator of the aggregation
     * @return The final aggregation result.
     */
    OUT getResult(ACC accumulator);

    /**
     * Merges two accumulators, returning an accumulator with the merged state.
     *
     * <p>This function may reuse any of the given accumulators as the target for the merge and
     * return that. The assumption is that the given accumulators will not be used any more after
     * having been passed to this function.
     *
     * @param a An accumulator to merge
     * @param b Another accumulator to merge
     * @return The accumulator with the merged state
     */
    ACC merge(ACC a, ACC b);
}

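The first three methods are easy to understand, but the fourth one, merge(), is more puzzling: when exactly do two accumulators need to be merged? A reader asked me this very question recently. In the DataStream window API, this method exists to serve session windows (more generally, any window assigner whose windows can merge). Let's take a closer look at session windows.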
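To make the question concrete, here is a minimal, stdlib-only sketch of how a session-window operator could keep one accumulator per session, and of the moment it would have to call merge(). It is a simplified model, not Flink's WindowOperator or MergingWindowSet: the interface `AggFn`, the classes `SessionMergeDemo`, `Session`, `AvgAcc` and `Average`, and the sample events are all hypothetical. `AggFn` only mirrors the four AggregateFunction methods, and the window rule (initial window [timestamp, timestamp + gap), overlapping windows are merged) follows the session-window description in the next section.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SessionMergeDemo {

    // Hypothetical stand-in mirroring the four methods of Flink's AggregateFunction.
    interface AggFn<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC acc);
        OUT getResult(ACC acc);
        ACC merge(ACC a, ACC b);
    }

    // Accumulator for an average: running count and sum.
    static final class AvgAcc {
        long count;
        long sum;
    }

    // Average over Integer inputs, same logic as the Javadoc example; also counts merge() calls.
    static final class Average implements AggFn<Integer, AvgAcc, Double> {
        int mergeCalls = 0;

        public AvgAcc createAccumulator() { return new AvgAcc(); }

        public AvgAcc add(Integer value, AvgAcc acc) {
            acc.sum += value;
            acc.count++;
            return acc;
        }

        public Double getResult(AvgAcc acc) { return acc.sum / (double) acc.count; }

        public AvgAcc merge(AvgAcc a, AvgAcc b) {
            mergeCalls++;
            a.count += b.count;
            a.sum += b.sum;
            return a;
        }
    }

    // One session: time range [start, end) plus its accumulator.
    static final class Session {
        long start;
        long end;
        AvgAcc acc;

        Session(long start, long end, AvgAcc acc) {
            this.start = start;
            this.end = end;
            this.acc = acc;
        }
    }

    final List<Session> sessions = new ArrayList<>();
    final Average fn = new Average();
    final long gap;

    SessionMergeDemo(long gap) { this.gap = gap; }

    void process(long timestamp, int value) {
        long start = timestamp;
        long end = timestamp + gap; // initial window [timestamp, timestamp + gap)
        Session target = null;
        Iterator<Session> it = sessions.iterator();
        while (it.hasNext()) {
            Session s = it.next();
            // Overlap test intended to match TimeWindow.intersects()
            if (s.start <= end && s.end >= start) {
                start = Math.min(start, s.start);
                end = Math.max(end, s.end);
                if (target == null) {
                    target = s; // the first overlapping session keeps its accumulator
                } else {
                    target.acc = fn.merge(target.acc, s.acc); // two existing sessions collapse into one
                    it.remove();
                }
            }
        }
        if (target == null) {
            target = new Session(start, end, fn.createAccumulator());
            sessions.add(target);
        }
        target.start = start;
        target.end = end;
        target.acc = fn.add(value, target.acc);
    }

    public static void main(String[] args) {
        SessionMergeDemo demo = new SessionMergeDemo(5);
        demo.process(1, 10);  // session [1, 6)
        demo.process(10, 20); // separate session [10, 15)
        demo.process(6, 30);  // [6, 11) touches both sessions, so they merge into [1, 15)
        Session s = demo.sessions.get(0);
        // prints: 1 session(s), [1, 15), avg=20.0, merge() calls=1
        System.out.println(demo.sessions.size() + " session(s), [" + s.start + ", " + s.end
                + "), avg=" + demo.fn.getResult(s.acc) + ", merge() calls=" + demo.fn.mergeCalls);
    }
}
```

In this model, an event that only extends one existing session never triggers merge(); it is called only when an event lands in the gap between two sessions (for example an out-of-order event) and two non-empty accumulators have to be combined. Flink's real bookkeeping is more involved, so treat this as an intuition rather than a specification of exactly when the framework calls merge().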

2. Session Window & MergingWindowAssigner

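// key the stream by userId, then assign event-time session windows that close after `gap` seconds without events
stream.keyBy("userId").window(EventTimeSessionWindows.withGap(Time.seconds(gap)))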


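In ordinary tumbling and sliding windows, each window covers a fixed time interval; sliding windows may overlap, but every window is processed independently and does not affect the others. Session windows are different: their extent is not fixed but is determined by whether the time between events exceeds the gap (an interval longer than the gap starts a new window). Therefore, every event entering the session window operator is first assigned an initial window that starts at the event's own timestamp (event time is assumed here) and ends at that timestamp plus the gap. If the initial windows of two events do not intersect, the events belong to different sessions; if they do intersect, the events belong to the same session, and the two initial windows are merged into a new session window. With more events the same rule is applied repeatedly, eventually producing the picture shown in the figure above.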
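The assignment and overlap rules just described can be sketched in a few lines. This is a simplified model: the `Window` class and the `assign()` helper are hypothetical stand-ins for Flink's `TimeWindow` and the session assigners' window assignment. The overlap test is intended to match `TimeWindow.intersects()` (start <= other.end && end >= other.start), so two windows that merely touch, where one ends exactly where the other starts, also count as intersecting.

```java
public class InitialWindowDemo {

    // Hypothetical stand-in for Flink's TimeWindow: the range [start, end), end exclusive.
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        // Overlap rule intended to match TimeWindow.intersects(): touching windows also overlap.
        boolean intersects(Window other) {
            return start <= other.end && end >= other.start;
        }

        // Smallest window covering both, in the spirit of TimeWindow.cover().
        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    // Initial window for one event: [timestamp, timestamp + gap).
    static Window assign(long timestamp, long gap) {
        return new Window(timestamp, timestamp + gap);
    }

    public static void main(String[] args) {
        long gap = 5;
        Window a = assign(1, gap);  // [1, 6)
        Window b = assign(3, gap);  // [3, 8)
        Window c = assign(10, gap); // [10, 15)

        System.out.println(a.intersects(b));          // true: same session
        System.out.println(a.cover(b));               // [1, 8)
        System.out.println(a.cover(b).intersects(c)); // false: c starts a new session
    }
}
```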

To support merging session windows, their WindowAssigner is different too: it is a MergingWindowAssigner, as shown in the class diagram below.

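MergingWindowAssigner is an abstract class with very little code: it defines the mergeWindows() method, which decides which windows to merge, and the MergeCallback interface, through which it reports each merge.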

package org.apache.flink.streaming.api.windowing.assigners;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.windowing.windows.Window;

import java.util.Collection;

/**
 * A {@code WindowAssigner} that can merge windows.
 *
 * @param <T> The type of elements that this WindowAssigner can assign windows to.
 * @param <W> The type of {@code Window} that this assigner assigns.
 */
@PublicEvolving
public abstract class MergingWindowAssigner<T, W extends Window> extends WindowAssigner<T, W> {
    private static final long serialVersionUID = 1L;

    /**
     * Determines which windows (if any) should be merged.
     *
     * @param windows The window candidates.
     * @param callback A callback that can be invoked to signal which windows should be merged.
     */
    public abstract void mergeWindows(Collection<W> windows, MergeCallback<W> callback);

    /**
     * Callback to be used in {@link #mergeWindows(Collection, MergeCallback)} for specifying which
     * windows should be merged.
     */
    public interface MergeCallback<W> {

        /**
         * Specifies that the given windows should be merged into the result window.
         *
         * @param toBeMerged The list of windows that should be merged into one window.
         * @param mergeResult The resulting merged window.
         */
        void merge(Collection<W> toBeMerged, W mergeResult);
    }
}

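All of Flink's built-in MergingWindowAssigner implementations (the session window assigners) share the same mergeWindows() implementation: they simply call TimeWindow.mergeWindows().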
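Roughly, TimeWindow.mergeWindows() sorts the candidate windows by start time, sweeps through them once while growing the current merged window as long as the next candidate intersects it, and invokes the callback only for groups that contain more than one window. The sketch below reproduces that idea without Flink on the classpath; `Window`, `MergeCallback` and `MergeWindowsDemo` are hypothetical stand-ins rather than the Flink classes, and the code illustrates the algorithm instead of copying Flink's source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class MergeWindowsDemo {

    // Hypothetical stand-in for TimeWindow: [start, end), end exclusive.
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        boolean intersects(Window other) {
            return start <= other.end && end >= other.start;
        }

        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    // Same shape as MergingWindowAssigner.MergeCallback.
    interface MergeCallback<W> {
        void merge(Collection<W> toBeMerged, W mergeResult);
    }

    // Sort by start, sweep once, grow the current group while windows overlap,
    // and report only groups that actually contain more than one window.
    static void mergeWindows(Collection<Window> windows, MergeCallback<Window> callback) {
        List<Window> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong(w -> w.start));

        Window current = null;
        Set<Window> group = new LinkedHashSet<>();
        for (Window candidate : sorted) {
            if (current != null && current.intersects(candidate)) {
                current = current.cover(candidate);
                group.add(candidate);
            } else {
                if (group.size() > 1) {
                    callback.merge(group, current);
                }
                current = candidate;
                group = new LinkedHashSet<>();
                group.add(candidate);
            }
        }
        if (group.size() > 1) {
            callback.merge(group, current);
        }
    }

    public static void main(String[] args) {
        List<Window> windows = Arrays.asList(
                new Window(10, 15), new Window(1, 6), new Window(3, 8), new Window(20, 25));
        List<String> merges = new ArrayList<>();
        mergeWindows(windows, (toBeMerged, result) -> merges.add(toBeMerged + " -> " + result));
        System.out.println(merges); // [[[1, 6), [3, 8)] -> [1, 8)]
    }
}
```

The LinkedHashSet is only there to make the printed output deterministic. In Flink, merges reported through this callback eventually lead the window operator to merge the state of the affected windows, which is where AggregateFunction.merge() comes into play.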

