flink的AggregateFunction,merge方法作用范围

news2025/1/22 12:44:47

背景

AggregateFunction接口是我们经常用的窗口聚合函数,其中有一个merge方法,我们一般情况下也是实现了的,但是你知道吗,其实这个方法只有在你使用会话窗口需要进行窗口合并的时候才需要实现

AggregateFunction.merge方法调用时机

AggregateFunction.merge方法其实只有在使用会话窗口进行窗口合并的时候才会用到,如下所示
在这里插入图片描述

对应的源码首先查看WindowOperator.processElement方法对要合并的窗口的状态进行合并

public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows =
                windowAssigner.assignWindows(
                        element.getValue(), element.getTimestamp(), windowAssignerContext);
 
        // if element is handled by none of assigned elementWindows
        boolean isSkippedElement = true;
 
        final K key = this.<K>getKeyedStateBackend().getCurrentKey();
 
        if (windowAssigner instanceof MergingWindowAssigner) {
            MergingWindowSet<W> mergingWindows = getMergingWindowSet();
 
            for (W window : elementWindows) {
 
                // adding the new window might result in a merge, in that case the actualWindow
                // is the merged window and we work with that. If we don't merge then
                // actualWindow == window
                W actualWindow =
                        mergingWindows.addWindow(
                                window,
                                new MergingWindowSet.MergeFunction<W>() {
                                    @Override
                                    public void merge(
                                            W mergeResult,
                                            Collection<W> mergedWindows,
                                            W stateWindowResult,
                                            Collection<W> mergedStateWindows)
                                            throws Exception {
 
                                        triggerContext.key = key;
                                        triggerContext.window = mergeResult;
 
                                        triggerContext.onMerge(mergedWindows);
 
                                        for (W m : mergedWindows) {
                                            triggerContext.window = m;
                                            triggerContext.clear();
                                            deleteCleanupTimer(m);
                                        }
 
                                        // 合并窗口的状态
                                        windowMergingState.mergeNamespaces(
                                                stateWindowResult, mergedStateWindows);
                                    }
                                });

继续查看AbstractHeapMergingState.mergeNamespaces方法,

public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
    if (sources == null || sources.isEmpty()) {
        return; // nothing to do
    }
 
    final StateTable<K, N, SV> map = stateTable;
 
    SV merged = null;
 
    // merge the sources
    for (N source : sources) {
 
        // get and remove the next source per namespace/key
        SV sourceState = map.removeAndGetOld(source);
 
        if (merged != null && sourceState != null) {
            //此处合并状态并调用AggregateFunction.merge方法
            merged = mergeState(merged, sourceState);
        } else if (merged == null) {
            merged = sourceState;
        }
    }
 
    // merge into the target, if needed
    if (merged != null) {
        map.transform(target, merged, mergeTransformation);
    }
}
 
//真正调用AggregateFunction.merge方法合并自定义的状态
@Override
protected ACC mergeState(ACC a, ACC b) {
    return aggregateTransformation.aggFunction.merge(a, b);
}

这样AggregateFunction.merge的调用过程就清楚了,实际应用中,我们只需要在使用会话窗口时才需要实现这个方法,其他的基于时间窗口的方式不需要实现这个方法,当然实现了也不会有错

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1182258.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Redis 扩展 RedisBloom 插件,解决缓存击穿、穿透

文章目录 一、概述二、编译准备2.1 升级 make2.2 安装 Python3 三、编译 RedisBloom四、测试 RedisBloom五、应用场景5.1 缓存击穿5.2 缓存穿透5.3 原理总结 六、存在的问题 如果您对Redis的了解不够深入请关注本栏目&#xff0c;本栏目包括Redis安装&#xff0c;Redis配置文件…

计算机基础知识45

JS的RegExp对象(正则) text: 正则校验数据 # T/F match: 匹配 # (3) [s, s, s] //定义 var reg1 new RegExp("^[a-zA-Z][a-zA-Z0-9]{5,11}"); var reg2 /^[a-zA-Z][a-zA-Z0-9]{5,9}$/; //正则校验数据 var res reg1.test(jason666); console.log(res…

二叉树的前序、中序、后序、层序遍历

参考内容&#xff1a; 五分钟让你彻底理解二叉树的非递归遍历 Python实现二叉树的非递归遍历 二叉树遍历——深度优先&#xff08;前中后序&#xff09;广度优先&#xff08;层序遍历&#xff09; 构造二叉树 定义二叉树结构如下 struct node {int data;node *left;node *rig…

移远通信蝉联“年度杰出创新企业”大奖,以核心技术实力永攀行业高峰

11月2日&#xff0c;“国际集成电路展览会暨研讨会”&#xff08;IIC Shenzhen 2023&#xff09;在深圳大中华交易广场重磅启幕。业界领袖共探国内外创新技术与产品成果&#xff0c;并对推动全球电子产业创新做出贡献的企业进行了表彰。其中&#xff0c;全球领先的物联网整体解…

Android 使用.9图 NinePatchDrawable实现动态聊天气泡

最近一段时间&#xff0c;在做一个需求&#xff0c;需要实现一个聊天气泡的动画效果&#xff0c;如下图所示&#xff1a; GitHub源码demo &#xff0c;建议下载demo&#xff0c;运行查看。 动态聊天气泡动画 静态聊天气泡 经过一段时间调研&#xff0c;实现方案如下: 实现方…

使用Redis实现缓存及对应问题解决

一、为什么需要Redis作缓存&#xff1f; 在业务场景中&#xff0c;如果有些数据需要极高频的存取&#xff0c;每次都要在mysql中查询的话代价太大&#xff0c;假如有一个存在于客户端和mysql之间的存储空间&#xff0c;每次可以在这空间中进行存取操作&#xff0c;就会减轻mys…

Docker配置Nginx反向代理

文章目录 1.部署微程序到docker中1.1 dockerfile文件1.2 依据自定义的dockerfile文件创建docker镜像1.3 创建容器1.4 测试 2.在docker中安装Nginx2.1 安装Nginx镜像2.2 获取Nginx配置文件并将其同步到宿主电脑指定位置中安装nginx容器删除nginx容器 2.3 安装Nginx容器并数据挂载…

C++: 类和对象(中) (构造函数, 析构函数, 拷贝构造函数, 赋值重载, 取地址重载)

文章目录 1. 类的6个默认成员函数2. 构造函数构造函数概念构造函数特性特性1,2,3,4特性5特性6特性7 3. 析构函数析构函数概念析构函数特性特性1,2,3,4特性5特性6 4. 拷贝构造函数拷贝构造函数概念拷贝构造函数特性特性1,2特性3特性4特性5 5. 运算符重载一般运算符重载赋值运算符…

mysql安装成功

先在官网下载 地址&#xff1a;MySQL :: Download MySQL Community Server下载的 下载的这个 解压后 zip格式是自己解压&#xff0c;解压缩之后其实MySQL就可以使用了&#xff0c;但是要进行环境变量配置 我的电脑->属性->高级->环境变量->系统变量 选择Path,在其…

java版直播商城免费搭建平台规划及常见的营销模式+电商源码+小程序+三级分销+二次开发

1. 涉及平台 平台管理、商家端&#xff08;PC端、手机端&#xff09;、买家平台&#xff08;H5/公众号、小程序、APP端&#xff08;IOS/Android&#xff09;、微服务平台&#xff08;业务服务&#xff09; 2. 核心架构 Spring Cloud、Spring Boot、Mybatis、Redis 3. 前端框架…

二十、泛型(3)

本章概要 构建复杂模型泛型擦除 C 的方式迁移兼容性擦除的问题边界处的动作 构建复杂模型 泛型的一个重要好处是能够简单安全地创建复杂模型。例如&#xff0c;我们可以轻松地创建一个元组列表&#xff1a; TupleList.java import java.util.ArrayList;public class TupleL…

深入理解强化学习——多臂赌博机:乐观初始值

分类目录&#xff1a;《深入理解强化学习》总目录 目前为止我们讨论的所有方法都在一定程度上依赖于初始动作值 Q 1 ( a ) Q_1(a) Q1​(a)的选择。从统计学角度来说&#xff0c;这些方法&#xff08;由于初始估计值&#xff09;是有偏的。对于采样平均法来说&#xff0c;当所有…

软件专业毕业生的如何找工作?——加速度jsudo

据统计&#xff0c;2023届全国高校毕业生预计达到1158万人&#xff0c;同比增长82万人。根据某大学的统计数据&#xff0c;IT专业的就业率在过去五年中保持了稳定增长的趋势&#xff0c;平均超过90%。 IT行业的薪资水平相对较高&#xff0c;也让很多高校和培训机构愿意投入更多…

Sybase连接详解

Sybase连接详解 Sybase连接详解摘要一、JDBC基础1.1 JDBC简介1.2 JDBC驱动程序 二、配置Sybase JDBC连接2.1 连接Sybase数据库2.2 验证Sybase JDBC连接2.3 获取Sybase数据库表信息和注释2.4 根据表名获取Sybase字段信息和注释2.5 执行SQL查询2.6 插入数据2.7 执行Sybase存储过程…

ElasticSearch离线安装

1. 上传和解压软件 将elasticsearch-7.11.2-linux-x86_64.tar.gz和kibana-7.11.2-linux-x86_64.tar.gz 上传到/data/es目录 解压文件 tar -zxvf elasticsearch-7.11.2-linux-x86_64.tar.gz tar -zxvf kibana-7.11.2-linux-x86_64.tar.gz 2. 创建es用户 因为安全问题&#xff…

windows好玩的cmd命令

颜色 后边的数字查表吧,反正我是喜欢一个随机的数字 color 01MAC getmac /v更新主机IP地址 通过DHCP更新 ipconfig /release ipconfig /renew改标题 title code with 你想要的标题

如何实现Word文档中的书签双向定位

工作中&#xff0c;经常需要拟定合同&#xff0c;一般都有固定的模板&#xff0c;在特定的位置填写内容。通过zOffice编辑合同文件时&#xff0c;可以在模板需要填写的位置预设书签&#xff0c;配合zOffice SDK使用&#xff0c;利用zOffice书签双向定位的特性&#xff0c;更方便…

3.5、Linux:命令行git的使用

个人主页&#xff1a;Lei宝啊 愿所有美好如期而遇 在Linux Centos7.6下安装git yum -y install git 注册一个gitee账号 进去注册就好&#xff0c;记住自己的用户名和密码。 创建一个仓库 点击复制&#xff0c;接着就可以在Linux上使用了 git clone git clone 刚才复制的地…

《UML和模式应用(原书第3版)》2024新修订译本部分截图

DDD领域驱动设计批评文集 做强化自测题获得“软件方法建模师”称号 《软件方法》各章合集 机械工业出版社即将在2024春节前后推出《UML和模式应用&#xff08;原书第3版&#xff09;》的典藏版。 受出版社委托&#xff0c;UMLChina审校了原中译本并做了一些修订。同比来说&a…

Qwt QwtThermo绘制温度计

1.简介 QwtThermo 是一个基于 Qt 框架的类库&#xff0c;用于创建温度计控件。它提供了一些方便的功能来展示和处理温度计相关的数据。 QwtThermo 添加了特定于温度计的功能。 使用 QwtThermo&#xff0c;可以实现以下功能&#xff1a; 设置温度范围&#xff1a;可以通过设置…