ProcessWindowFunction 结合自定义触发器的陷阱

news2024/11/26 10:32:25

背景:

flink中常见的需求如下:统计某个页面一天内的点击率,每10秒输出一次,我们如果采用ProcessWindowFunction 结合自定义触发器如何实现呢?如果这样实现问题是什么呢?

ProcessWindowFunction 结合自定义触发器实现统计点击率

关键代码:
在这里插入图片描述
在这里插入图片描述
完整代码参见:

package wikiedits.func;


import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import wikiedits.func.model.KeyCount;



public class ProcessWindowFunctionAndTiggerDemo {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 使用处理时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new FsStateBackend("file:///D:/tmp/flink/checkpoint/windowtrigger"));

        // 并行度为1
        env.setParallelism(1);
        // 设置数据源,一共三个元素
        DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                int xxxNum = 0;
                int yyyNum = 0;
                for (int i = 1; i < Integer.MAX_VALUE; i++) {
                    // 只有XXX和YYY两种name
                    String name = (0 == i % 2) ? "XXX" : "YYY";
                    // 更新aaa和bbb元素的总数
                    if (0 == i % 2) {
                        xxxNum++;
                    } else {
                        yyyNum++;
                    }
                    // 使用当前时间作为时间戳
                    long timeStamp = System.currentTimeMillis();
                    // 将数据和时间戳打印出来,用来验证数据
                    if(xxxNum % 2000==0){
                        System.out.println(String.format("source,%s, %s,    XXX total : %d,    YYY total : %d\n", name,
                                time(timeStamp), xxxNum, yyyNum));
                    }
                    // 发射一个元素,并且戴上了时间戳
                    ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, 1), timeStamp);
                    // 每发射一次就延时1秒
                    Thread.sleep(1);
                }
            }

            @Override
            public void cancel() {}
        });

        // 将数据用5秒的滚动窗口做划分,再用ProcessWindowFunction
        SingleOutputStreamOperator<String> mainDataStream = dataStream
                // 以Tuple2的f0字段作为key,本例中实际上key只有aaa和bbb两种
                .keyBy(value -> value.f0)
                // 5秒一次的滚动窗口
                .timeWindow(Time.minutes(5))
                // 10s触发一次计算,更新统计结果
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
                // 统计每个key当前窗口内的元素数量,然后把key、数量、窗口起止时间整理成字符串发送给下游算子
                .process(new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
                    // 自定义状态
                    private ValueState<KeyCount> state;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // 初始化状态,name是myState
                        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
                    }

                    public void clear(Context context) {
                        ValueState<KeyCount> contextWindowValueState = context.windowState().getState(new ValueStateDescriptor<>("myWindowState", KeyCount.class));
                        contextWindowValueState.clear();
                    }

                    @Override
                    public void process(String s, Context context, Iterable<Tuple2<String, Integer>> iterable,
                            Collector<String> collector) throws Exception {
                        // 从backend取得当前单词的myState状态
                        KeyCount current = state.value();
                        // 如果myState还从未没有赋值过,就在此初始化
                        if (current == null) {
                            current = new KeyCount();
                            current.key = s;
                            current.count = 0;
                        }
                        int count = 0;
                        // iterable可以访问该key当前窗口内的所有数据,
                        // 这里简单处理,只统计了元素数量
                        for (Tuple2<String, Integer> tuple2 : iterable) {
                            count++;
                        }
                        // 更新当前key的元素总数
                        current.count += count;
                        // 更新状态到backend
                        state.update(current);

                        ValueState<KeyCount> contextWindowValueState = context.windowState().getState(new ValueStateDescriptor<>("myWindowState", KeyCount.class));
                        KeyCount windowValue = contextWindowValueState.value();
                        if (windowValue == null) {
                            windowValue = new KeyCount();
                            windowValue.key = s;
                            windowValue.count = 0;
                        }
                        windowValue.count += count;
                        contextWindowValueState.update(windowValue);

                        // 将当前key及其窗口的元素数量,还有窗口的起止时间整理成字符串
                        String value = String.format("window, %s, %s - %s, %d, windowStateCount :%d,   total : %d",
                                // 当前key
                                s,
                                // 当前窗口的起始时间
                                time(context.window().getStart()),
                                // 当前窗口的结束时间
                                time(context.window().getEnd()),
                                // 当前key在当前窗口内元素总数
                                count,
                                // 当前key所在窗口的总数
                                contextWindowValueState.value().count,
                                // 当前key出现的总数
                                current.count);

                        // 发射到下游算子
                        collector.collect(value);
                    }
                });

        // 打印结果,通过分析打印信息,检查ProcessWindowFunction中可以处理所有key的整个窗口的数据
        mainDataStream.print();

        env.execute("processfunction demo : processwindowfunction");

    }



    public static String time(long timeStamp) {
        return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));
    }



}

这里采用ProcessWindowFunction 结合ContinuousProcessingTimeTrigger的方式确实可以实现统计至今为止某个页面点击率的目的,不过这其中需要注意点的点是:
每隔10s触发public void process(String s, Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<String> collector)方法时,iterable对象是包含了一天的窗口内收到的所有消息,也就是当前触发时iterable集合是前10s触发时iterable集合的超集,包含前10s触发时的所有的消息集合。
到这里所引起的问题也自然而然的出来了:对于ProcessWindowFunction 实现而言,flink内部是通过ListState的形式保存窗口内收到的所有消息的,注意这里flink内部会使用ListState保存每一条分配到以天为单位的窗口内的消息,这会导致状态膨胀,想一下,一天内所有的消息都会当成状态保存起来,这对于状态后端的压力是有多大!这些保存在ListState中的消息只有在窗口结束后才会清理:具体参见WindowOperator.clearAllState,那有解决方案吗?使用Agg/Reduce处理函数替ProcessWindowFunction作为处理函数可以实现吗?请看下一篇文章

参考文章:
https://www.cnblogs.com/Springmoon-venn/p/13667023.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/980120.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于YOLOv8模型的安全背心目标检测系统(PyTorch+Pyside6+YOLOv8模型)

摘要&#xff1a;基于YOLOv8模型的安全背心目标检测系统可用于日常生活中检测与定位安全背心目标&#xff0c;利用深度学习算法可实现图片、视频、摄像头等方式的目标检测&#xff0c;另外本系统还支持图片、视频等格式的结果可视化与结果导出。本系统采用YOLOv8目标检测算法训…

Zabbix 利用 Grafana 进行图形展示

安装插件 配置数据源 导入模版 查看 1.安装 wget https://mirrors.tuna.tsinghua.edu.cn/grafana/yum/rpm/Packages/grafana-10.0.0-1.x86_64.rpm [rootrocky8 apps]# yum install grafana-10.0.0-1.x86_64.rpm [rootrocky8 apps]# systemctl start grafana-server.service …

Mac 手动安装 sshpass

1. 下载安装包 https://sourceforge.net/projects/sshpass/ 解压并进入到安装包目录 tar -zxvf sshpass-xx.xx.tar.gz cd sshpass-xx.xx2. 检验环境&#xff0c;编译源码安装 ./configuremake&&make install3. 检测安装是否成功 ▶ sshpass Usage: sshpass [-f|-…

freertos之资源管理

中断屏蔽 屏蔽中断函数 在任务中使用 taskENTER_CRITICA()/taskEXIT_CRITICAL() 在中断中使用 taskENTER_CRITICAL_FROM_ISR()/taskEXIT_CRITICAL_FROM_ISR() 功能介绍 使用上述函数&#xff0c;进入临界中断&#xff0c;任务不会切换&#xff0c;且中断优先级处于con…

邮件钓鱼的防守策略

一、攻击背景 在历年的实战攻防演练中&#xff0c;人的漏洞是网络安全最大的脆弱点&#xff0c;而钓鱼攻击就是从内部攻破堡垒至关重要的手段。攻击者通过伪装成可信来源发送虚假邮件&#xff0c;诱导接收者点击恶意链接、提供敏感信息或执行恶意附件&#xff0c;从而获取机密…

python解压gz包

import gzipdef un_gz(file_name):# 获取文件的名称&#xff0c;去掉后缀名f_name file_name.replace(".gz", "")# 开始解压g_file gzip.GzipFile(file_name)#读取解压后的文件&#xff0c;并写入去掉后缀名的同名文件&#xff08;即得到解压后的文件&am…

聊聊操作系统中 进程 and 线程中哪些事??

操作系统&#xff08;英语&#xff1a;Operating System&#xff0c;缩写&#xff1a;OS&#xff09;是一组主管并控制计算机操作、运用和运行硬件、软件资源和提供公共服务来组织用户交互的相互关联的系统软件程序。根据运行的环境&#xff0c;操作系统可以分为桌面操作系统&a…

java 身份证号码验证

需要编号文件 编号文件部分内容如下 11:北京市 1101:市辖区 110101:东城区 110102:西城区 110105:朝阳区 110106:丰台区 110107:石景山区 110108:海淀区 ...... 编号文件内容比较多 csdn点击 下载地址 java代码如下 import java.io.*; import java.text.ParseException; im…

Win10怎么设置默认看图软件?Win10设置自带看图软件为默认程序操作方法

大小&#xff1a;4.19 GB类别&#xff1a;雨林木风系统 Win10怎么设置默认看图软件&#xff1f;有用户在使用电脑去浏览图片内容的时候&#xff0c;总是使用第三方的看图软件自动打开&#xff0c;不使用系统自带的看图软件来开启图片。那么怎么设置系统自带的看图软件为默认程序…

【嵌入式软件C编程】主函数free子函数malloc地址的两种方式以及注意事项

本文档主要记录嵌入式C语言在子函数中应用malloc函数的方式&#xff0c;在实际项目中内存管理特别重要 一般在主函数中&#xff08;main&#xff09;使用malloc函数&#xff0c;然后在通过free函数进行释放内存&#xff0c;但有时候如果必须在子函数长调用malloc函数该怎样进行…

[HDCTF 2023]YamiYami

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言涉及知识点解题详细过程session伪造反弹shell 前言 从暑假末尾一直搁置&#xff0c;当时卡在反弹shell搞得离flag就差一步。不过最近一两天学习完反弹shell的知…

面试算法-数据结构二

大厂算法面试 1&#xff09; 图论 2&#xff09; 大数据 3&#xff09;动态规划 优秀的算法往往取决于你采取那种数据结构 高级数据结构 1&#xff09;优先队列 2&#xff09;图 3&#xff09;前缀树 4&#xff09;线段树 5&#xff09;树状数组 在分析问题的时候&a…

OB Cloud 初体验

文章来源&#xff1a;韩锋频道 韩锋 数据库行业资深从业者&#xff0c;著有《SQL 优化最佳实践》、《数据库高效优化》等数据库相关著作。 OceanBase&#xff08;下文简称OB&#xff09; 作为国内一款优秀的分布式数据库&#xff0c;这些年来发展很快&#xff0c;在金融、电商…

UDP协议结构及其注意事项

UDP报文结构 UDP报文结构主要是由两个部分组成的&#xff1a;UDP头部和数据部分。 UDP头部 源端口号&#xff1a;16位字段&#xff0c;指示数据发送方的端口号。目的端口号&#xff1a;16位字段&#xff0c;指示数据接收方端口号。UDP报文长度&#xff1a;16位字段&#xff0…

new HashMap{{put(“a“,“b“)}}

如题&#xff0c;这是什么鬼&#xff1f; Runnable r new Runnable(){Overridepublic void run() {}}; 上面这种写法大家不陌生吧&#xff0c;实际上 就是 定义了一个匿名内部类。 比如&#xff1a;下面这个就是 定义了一个 匿名的Test子类&#xff0c;旁边那个向下的箭头就…

Lua02——应用场景及环境安装

应用场景 是当今游戏领域使用最广泛的脚本语言之一。 搭配 OpenResty 使用&#xff0c;可以扩展Nginx服务器的功能&#xff0c;使用者仅需要编写Lua代码就能轻松完成业务逻辑。 与 Redis 结合。 Adobe Photoshop Lightroom 搭配 Lua 编写插件。 与游戏结合&#xff1a; C/…

DNS域名解析 不同网段的 实验

路漫漫其修远兮&#xff0c;吾将上下而求索 前言 知识应该是 有难宜简&#xff0c;用简单的语言讲清楚&#xff0c;而不是干拧螺丝的事&#xff0c;却说着造火箭的理论。 &#xff08;尤其是大学讲台上的那一部分人&#xff0c;想想人家的学费&#xff0c;你的工资&#xff…

震惊!可视化大屏都卷成这样了!

如果你还认为可视化大屏只是一个屏幕展示几个数据、图表&#xff0c;那就大错特错了。进入2023年&#xff0c;可视化大屏都内卷到可以实时更新、多维动态、模板一键套用了。 奥威BI系统可视化大屏特点&#xff1a; 1、实时更新 所有取数都基于底层数据&#xff0c;当底层数据…

java开源 VR全景商城 saas商城 b2b2c商城 o2o商城 积分商城 秒杀商城 拼团商城 分销商城 短视频商城 小程序商城搭建

1. 涉及平台 平台管理、商家端&#xff08;PC端、手机端&#xff09;、买家平台&#xff08;H5/公众号、小程序、APP端&#xff08;IOS/Android&#xff09;、微服务平台&#xff08;业务服务&#xff09; 2. 核心架构 Spring Cloud、Spring Boot、Mybatis、Redis 3. 前端框架…

【Node.js】—基本知识点总结

【Node.js】—基本知识总结 一、命令行常用操作 二、Node.js注意点 Node.js中不能使用BOM和DOM操作 总结 三、Buffer buffer是一个类似于数组的对象&#xff0c;用于表示固定长度的字节序列buffer的本质是一段内存空间&#xff0c;专门用来处理二进制数据 特点&#xff1a;…