Flink的TopN

news2025/1/10 20:47:25

1.为什么定时器的时间设置为,窗口的end值+1ms就可以呢?

因为定时器是下游,水位线是取的多个上游的最小的, 水位线是跟在数据后面的,所以当定时器的时间到达时,上游一定计算完成了,并且数据已经在水位线之前到下游了,所以可以触发计算,就是当前窗口所有的数据,比如窗口是[8:00~9:00)

2.为什么要用定时器呢?

不用也可以计算,但是是来一条计算一条,假如有10万条,效率低,用定时器计算,可以在数据到齐时,一起计算,效率高。

3.不用window,但是一定要keyBy

不用window的原因:不是取有限的数据,而是取所有end是9:00的数据

一定要keyBy的原因:因为上游计算完成的有可能有[8:05~9:05)的数据,所以需要根据end分组

4.为什么读取文件,没到5分钟就触发计算了?

因为用的是事件时间

5.定义的flag变量,计算完需要置null吗?

不需要。每个key都有自己的ValueState 

6.定时器触发的时候,上游一定都计算完了吗?

一定计算完了。

因为上游是先keyBy,再window,计算的是A商品在 [8:00~9:00)时间段内的数据,B商品在 [8:00~9:00)时间段内的数据,当A的水位线到达9:00的时候,触发了计算,但是B的水位线才到8:30,这时候定时器会取上游最小的8:30,所以不会触发,当B的水位线推进到9:00的时候,现在最小的就是9:00,所以定时器会触发计算,这样,A和B都被计算了,没有丢失数据。

7.flink的定时器 如果重复注册相同的 会触发多次吗?

不会

“答案是不会,应为Flink内部使用的HeapPriorityQueueSet来存储定时器,一个注册请求到来时,其add()方法会检查是否已经存在,如果存在则不会加入。 ”

但是最好在外面手动控制,比如用一个Boolean值,只在第一个时注册。

8.下游定时器需要等所有上游时间都到达后计算,等的是哪些上游?

可以通过webui界面看上下游

对于topn,下游是process算子, 上游是aggregate算子,当A、B、C三个商品都完成之后,下游定时器计算。

9.对于水位线

源头是周期性产生的,但是之后是:水位线是跟在数据屁股后面的,所以等aggregate算子计算完后,定时器再计算。

package com.atguigu.flink.state;

import com.atguigu.flink.func.WaterSensorMapFunction;
import com.atguigu.flink.pojo.UserBehavior;
import com.atguigu.flink.pojo.WaterSensor;
import com.atguigu.flink.utils.MyUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
 * Created by Smexy on 2023/6/21
 *
 *  需求: 每隔5min输出最近1h内点击量(pv)最多的前3个商品
 *
 *  数据:    543462,1715,1464116,pv,1511658000
 *          userId,商品id,商品类别id,行为类型,ts
 *
 *   输入:  543462,1715,1464116,pv,1511658000
 *          粒度: 一个用户点击一个商品的一次是一行
 *
 *
 *   推理计算过程:   聚合,keyBy  商品id
 *          第一次聚合:  统计最近1h(窗口)内,各个商品的点击总次数
 *              size: 范围,1h
 *              slide: 计算时机,5min
 *                  滑动的时间窗口。
 *
 *              输入:  543462,1715,1464116,pv,1511658000
 *
 *              输出:
 *                    [8:00,9:00):
 *                          A---120
 *                    [8:05,9:05)
 *                          A---150
 *                    [8:00,9:00):
 *                          B---130
 *                    [8:00,9:00):
 *                          C---132
 *                    [8:00,9:00):
 *                          D---131
 *
 *          第二次聚合:  将每个时间段窗口中各个商品的点击量,排序再取前3
 *                  用不用开窗? 不用
 *                  需要keyBy,按照窗口的统计的时间范围keyBy
 *
 *                  等同一个窗口的所有数据全部到达后,再一次性计算。
 *                  如何知道当前要计算的数据已经全部到达,可以触发运算?
 *                      使用定时器,将窗口的endTime作为触发时间,只要下游的时间到了endTime证明上游endTime之前的所有数据都已经到达了下游,可以进行运算。
 *
 *
 *
 *    输出:
 *              [8:00,9:00):
 *                      A--120
 *                      B--119
 *                      C--118
 *              [8:05,9:05):
 *                       E--120
 *                       B--119
 *                       C--118
 */
public class Flink12_TopN
{
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(2);

        FileSource<String> fileSource = FileSource.forRecordStreamFormat(
            new TextLineInputFormat(StandardCharsets.UTF_8.name())
            ,
            new Path("input/UserBehavior.csv")
        ).build();

         WatermarkStrategy<UserBehavior> watermarkStrategy = WatermarkStrategy
                     .<UserBehavior>forMonotonousTimestamps()
                     .withTimestampAssigner( (e, ts) -> e.getTimestamp() * 1000);

         //1.读数据,封装bean,过滤pv,生成水印
        SingleOutputStreamOperator<UserBehavior> ds = env
            .fromSource(fileSource, WatermarkStrategy.noWatermarks(), "source")
            .map(line -> {
                String[] words = line.split(",");
                return new UserBehavior(
                    Long.valueOf(words[0]),
                    Long.valueOf(words[1]),
                    Integer.valueOf(words[2]),
                    words[3],
                    Long.valueOf(words[4])
                );
            })
            .filter(bean -> "pv".equals(bean.getBehavior()))
            .assignTimestampsAndWatermarks(watermarkStrategy);

        /*
            2.开窗,统计每种商品的点击次数

         */
        SingleOutputStreamOperator<HotItem> ds1 = ds
            .keyBy(UserBehavior::getItemId)
            .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
            .aggregate(new AggregateFunction<UserBehavior, Long, HotItem>()
            {
                @Override
                public Long createAccumulator() {
                    return 0l;
                }

                @Override
                public Long add(UserBehavior value, Long accumulator) {
                    return accumulator + 1;
                }

                @Override
                public HotItem getResult(Long accumulator) {
                    return new HotItem(null, null, null, accumulator);
                }

                @Override
                public Long merge(Long a, Long b) {
                    return null;
                }
            }, new ProcessWindowFunction<HotItem, HotItem, Long, TimeWindow>()
            {
                @Override
                public void process(Long key, ProcessWindowFunction<HotItem, HotItem, Long, TimeWindow>.Context context, Iterable<HotItem> iterable, Collector<HotItem> collector) throws Exception {
                    HotItem hotItem = iterable.iterator().next();
                    TimeWindow window = context.window();
                    //赋值
                    hotItem.setStart(window.getStart());
                    hotItem.setEnd(window.getEnd());
                    hotItem.setItemId(key);
                    collector.collect(hotItem);
                }
            });

        //3.在下游按照窗口的时间范围分组,top3统计。使用定时器触发运算。
        ds1
            .keyBy(HotItem::getStart)
            .process(new KeyedProcessFunction<Long, HotItem, String>()
            {
                private ValueState<Boolean> flag;
                private ListState<HotItem> listState;
                /*
                    没来一条数据,先存起来,等定时器到点了,再触发top3
                 */

                @Override
                public void open(Configuration parameters) throws Exception {
                    listState = getRuntimeContext().getListState(new ListStateDescriptor<>("hot3", HotItem.class));
                    flag = getRuntimeContext().getState(new ValueStateDescriptor<>("flag", Boolean.class));
                }

                //进行top3计算
                @Override
                public void onTimer(long timestamp, KeyedProcessFunction<Long, HotItem, String>.OnTimerContext ctx, Collector<String> out) throws Exception {

                    List<HotItem> top3 = StreamSupport.stream(listState.get().spliterator(), true)
                                                         .sorted((h1, h2) -> -h1.getCount().compareTo(h2.getCount()))
                                                         .limit(3)
                                                         .collect(Collectors.toList());

                    //整理数据的格式
                    String resultStr = top3.stream().map(item -> item.getItemId() + ":" + item.getCount()).collect(Collectors.joining(","));

                    String timeStr = MyUtil.parseTimeWindow(new TimeWindow(top3.get(0).getStart(), top3.get(0).getEnd()));

                    out.collect(timeStr + ": top3  : " + resultStr);

                }

                @Override
                public void processElement(HotItem hotItem, KeyedProcessFunction<Long, HotItem, String>.Context context, Collector<String> collector) throws Exception {

                    listState.add(hotItem);
                    //在当前组中第一条数据来的时候,定定时器
                    if (flag.value() == null){
                        //定定时器
                        context.timerService().registerEventTimeTimer(hotItem.getEnd());
                        flag.update(false);
                    }

                }
            })
            .print();




        try {
                            env.execute();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }

    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class HotItem{

        //定义窗口范围 时间窗口
        private Long start;
        private Long end;
        //定义统计的指标
        private Long itemId;
        private Long count;

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/685749.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LLM - 搭建 DrugGPT 结合药物化学分子知识的 ChatGPT 系统

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://blog.csdn.net/caroline_wendy/article/details/131384199 论文&#xff1a;DrugChat: Towards Enabling ChatGPT-Like Capabilities on Drug Molecule Graphs DrugChat&#xff0c;基…

便携式明渠流量计比对装置的基本要求

便携式明渠流量计比对装置的基本要求有哪些&#xff1f; 符合国标要求中对流量监测单元的比对&#xff08;每2分钟采集一次数据&#xff0c;连续记录 6次数据与在线流量计数据进行液位误差比对&#xff0c;测量10分钟内的流量数据与在线流量计数据进行流量误差比对&#xff09;…

springboot服务时间筛选问题,同一服务部署在本地和Linux服务器,同样的时间筛选数据不一致

项目场景&#xff1a; springboot项目通过时间筛选数据、 问题描述 时间筛选问题&#xff0c;同一服务部署在本地和Linux服务器&#xff0c;同样的时间筛选数据不一致 接收的参数 JsonFormat(pattern "yyyy-MM-dd") private Date queryDate;服务器打印日志出来的…

Linux常用命令——fsck命令

在线Linux命令查询工具 fsck 检查并且试图修复文件系统中的错误 补充说明 fsck命令被用于检查并且试图修复文件系统中的错误。当文件系统发生错误四化&#xff0c;可用fsck指令尝试加以修复。 语法 fsck(选项)(参数)选项 -a&#xff1a;自动修复文件系统&#xff0c;不询…

JavaScript数组遍历的各种方式

目录 第一种方式&#xff08;for循环&#xff09; 第二种方式&#xff08;forEach 遍历&#xff09; 第三种方式&#xff08;for...in&#xff09; 第四种方式&#xff08;for...of&#xff09; 第五种方式&#xff08;map方法遍历&#xff09; 六、性能对比 第一种方式&…

【yolov5系列】将yolov5s模型部署到SigmaStar的9383芯片上

今年年初&#xff0c;接触了星宸科技&#xff08;sigmastar&#xff09;的芯片9383&#xff0c;将深度学习模型进行部署&#xff0c;使用sigmastar的深度学习加速硬件IPU进行模型推理。这里简单记录下sigmastar相关内容。 补充说明&#xff0c;之前使用的是瑞芯微的芯片&#x…

大数据从0到1的完美落地之Flume案例_3

案例演示 案例演示&#xff1a;SyslogtcpMemLogger Syslogtcp: syslog广泛应用于系统日志。syslog日志消息既可以记录在本地文件中&#xff0c;也可以通过网络发送到接收syslog的服务器。接收syslog的服务器可以对多个设备的syslog消息进行统一的存储&#xff0c;或者解析其中…

经济统计类的实证论文解剖

整理来源 UP主&#xff1a;Michaelscholar https://space.bilibili.com/550661456/video 截图 1.读什么文献 国内&#xff1a;中文CSSCI 期刊&#xff1a;中国社会科学&#xff0c;经济研究&#xff0c;经济学&#xff08;季刊&#xff09;&#xff0c;管理世界&#xff0…

MySQL数据库——主从复制

目录 前言一、读写分离概述1. 什么是读写分离&#xff1f;2. 为什么要读写分离呢&#xff1f;3. 什么时候要读写分离&#xff1f;4. 主从复制与读写分离5. mysq支持的复制类型6. 主从复制的工作过程7. MySQL主从复制延迟 二、主从复制配置方法 前言 在实际的生产环境中&#x…

c++ GoogleTest编译使用

编译 1.下载zip源码 2.解压&#xff0c;使用cmake生成工程 第一处填解压生成的文件夹 第二处的build路径可以不存在&#xff0c;点击configure会cmake提示创建 弹出界面中要选择自己的vs版本&#xff0c;选择finish。 然后点击generate&#xff0c;然后点击open Project。 生…

使用kubesphere搭建k8s集群

目录 1准备3台虚拟机 2 每台虚拟机更新yum的软件包&#xff0c;时间设置等 3 关闭防火墙 4 添加三台服务器的域名设置 5 设置三台服务器之间免密 6 安装kubesphere必要依赖&#xff0c;每个节点都要装&#xff0c;不然报错&#xff1a;socat not found in system path 7…

校园广播对讲音柱话筒

校园广播对讲话筒 校园广播对讲话筒&#xff1a;增强校园沟通的利器 提起校园广播对讲话筒&#xff0c;你或许会想到它在紧急情况下的应用。然而&#xff0c;这个功能强大的设备在校园内的广泛运用&#xff0c;不仅仅局限于危机解决。它成为了促进校园内部沟通、加强团队协作…

Tensorflow:from tensorflow.keras import layers 报错

执行代码&#xff1a; from tensorflow.keras import layers 报错&#xff1a; keras模块不存在 查&#xff0c;有其他博客表示keras包在tensorflow模块的pyhon包中&#xff1b; 于是&#xff1a; from tensorflow.python.keras import layers 在类似的如此调用keras的其…

chatgpt赋能python:Python能自动化办公吗?

Python能自动化办公吗&#xff1f; 在当今信息时代&#xff0c;办公工作中我们不可避免地要使用电脑&#xff0c;进行各种办公处理&#xff0c;比如文字处理、数据处理、图表制作等。这些操作看起来简单&#xff0c;但从事这些工作的人员都知道&#xff0c;日积月累之后&#…

【MySQL】不就是MySQL——多表查询综合练习

前言 嗨咯大家好&#xff01;我们学习完毕了多表查询&#xff0c;今天我们就要对我们所学的成果进行测验&#xff0c;本期主要是对多表查询相关内容的练习课程。可以先试着自己敲&#xff0c;遇到不会可以查看查考代码。 目录 前言 目录 练习题 1.查询员工的姓名、年龄、职位…

HDFS常见的Shell操作

HDFS常见的Shell操作 文章目录 HDFS常见的Shell操作写在前面基本语法HDFS命令大全常用命令实操准备工作上传文件下载文件HDFS直接操作 写在前面 Hadoop版本&#xff1a;Hadoop-3.xLinux版本&#xff1a;CentOS7.5 HDFS的Shell操作是入门Hadoop开发的学习重点 基本语法 Shell客…

分布式批处理:MapReduce初探

大家好&#xff0c;我是方圆。《数据密集型应用系统设计》第十章中有介绍到 MapReduce 相关的内容&#xff0c;当时觉得看得意犹未尽&#xff0c;所以便找了一些资料又看了一下。随着深入发现能扩展的东西实在太多&#xff0c;考虑时间有限&#xff0c;准备先把 MapReduce 基础…

基于smardaten无代码快速开发智慧城管系统

0️⃣需求背景 现代城市管理的面临着一系列问题&#xff1a;如执法人员不足、信息化手段应用少和时间处理不及时等&#xff0c;开发一个智慧城管回访系统的需求与日俱增… 通过引入智慧城管回访系统&#xff0c;可以提高城市管理的科学性、智能化和透明度&#xff0c;为城市发…

【软考网络管理员】2023年软考网管初级常见知识考点(14)- linux命令及目录相关详解

涉及知识点 Linux 目录结构&#xff0c; Linux 常用命令&#xff0c; Linux 下的文件基本属性&#xff0c; Linux 的启动与关闭 软考网络管理员常考知识点&#xff0c;软考网络管理员网络安全&#xff0c;网络管理员考点汇总。 原创于&#xff1a;CSDN博主-《拄杖盲学轻声码》…

HTML5 游戏开发实战 | 推箱子

经典的推箱子是一个来自日本的古老游戏&#xff0c;目的是在训练玩家的逻辑思考能力。在一个狭小的仓库中&#xff0c;要求把木箱放到指定的位置&#xff0c;稍不小心就会出现箱子无法移动或者通道被堵住的情况&#xff0c;所以需要巧妙地利用有限的空间和通道&#xff0c;合理…