【入门Flink】- 08Flink时间语义和窗口概念

news2024/12/27 12:03:35

Flink-Windows

是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。

注意:Flink 中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。【事件驱动,没有数据到达永远都不会创建窗口】

1)窗口分类

(1)按照驱动类型分

(1)时间窗口

时间窗口以时间点来定义窗口的开始(start)和结束(end),截取出的就是某一时间段的数据。

(2)计数窗口

计数窗口基于元素的个数截取数据,到达固定的个数时就触发计算并关闭窗口。

(2)按照窗口分配数据的规则分类

根据分配数据的规则,窗口的具体实现可以分为 4 类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window),以及全局窗口(Global Window)。

(1)滚动窗口(Tumbling Windows)

滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是
“首尾相接”的状态。这是最简单的窗口形式,每个数据都会被分配到一个窗口,而且只会属于一个窗口

滚动窗口应用非常广泛,可以对每个时间段做聚合统计,很多BI分析指标都可以用它来实现。

(2)滑动窗口(Sliding Windows)

滑动窗口的大小也是固定的。但是窗口之间并不是首尾相接的,而是可以“错开”一定的位置。定义滑动窗口的参数有两个:除去窗口大小(window size)之外,还有一个“滑动步长”(window slide),它其实就代表了窗口计算的频率。窗口在结束时间触发计算输出结果,那么滑动步长就代表了计算频率

滚动窗口也可以看作是一种特殊的滑动窗口一一窗口大小等于滑动步长(size=slide)
滑动窗口适合计算结果更新频率非常高的场景。

(3)会话窗口(Session Windows)

会话窗口,是基于“会话”(session)来来对数据进行分组的。会话窗口只能基于时间来定义。
会话窗口中,最重要的参数就是会话的超时时间,也就是两个会话窗口之间的最小距离。如果相邻两个数据到
来的时间间隔(gap)小于指定的大小(size),那说明还在保持会话,它们就属于同一个窗口;如果gap大于size,
那么新来的数据就应该属于新的会话窗口,而前一个窗口就应该关闭了。

会话窗口之间一定是不会重叠的,而且会留有至少为size的间隔(session)

在一些类似保持会话的场景下,可以使用会话窗口来进行数据的处理统计。

(4)全局窗口(Global Windows)

“全局窗口”,这种窗口全局有效,会把相同key的所有数据都分配到同一个窗口中。这种窗口没有结束的时侯, 默认是不会做触发计算的,如果希望它能对数据进行计算处理,还需要自定义“触发器”(Trigger)。

2)窗口 API

(1)按键分区(Keyed)和非按键分区(Non-Keyed)

(1)按键分区窗口(Keyed Windows)

经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这就是 KeyedStream。

stream.keyBy(...)
.window(...)

(2)非按键分区(Non-Keyed Windows)

如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。

stream.windowAll(...)

注意:对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作。

(2)窗口分配器(Window Assigners)和窗口函数(WindowFunctions)

stream.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(<window function>)

窗口分配器

(1)时间窗口

滚动处理时间窗口

stream.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .aggregate(...)

.of()还有一个重载方法,可以传入两个 Time 类型的参数:size 和offset。第一个参数当然还是窗口大小,第二个参数则表示窗口起始点的偏移量。

滑动处理时间窗口

stream.keyBy(...)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)

滑动窗口同样可以追加第三个参数,用于指定窗口起始点的偏移量,用法与滚动窗口完全一致。

处理时间会话窗口

stream.keyBy(...)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)

还可以调用 withDynamicGap()方法定义 session gap 的动态提取逻辑。

滚动事件时间窗口

stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5))).aggregate(...)

滑动事件时间窗口

stream.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)

事件时间会话窗口

stream.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.seconds(10))).aggregate(...)

(2)计数窗口

滚动计数窗口

stream.keyBy(...)
.countWindow(10)

滑动计数窗口

stream.keyBy(...)
.countWindow(10, 3)

全局窗口

stream.keyBy(...)
.window(GlobalWindows.create());

注意:使用全局窗口,必须自行定义触发器才能实现窗口计算,否则起不到任何作用。

窗口函数

(1)增量聚合函数(ReduceFunction / AggregateFunction)

归约函数(ReduceFunction)

类似Reduce算子,只不过固定时间才会输出

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> stream = env.socketTextStream("124.222.253.33", 7777);

        stream.map(new WaterSensorMapFunction())
                .keyBy(WaterSensor::getId)
                // 设置滚动事件时间窗口
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce(new ReduceFunction<WaterSensor>() {
                    @Override
                    public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                        System.out.println("调用reduce 方法,之前的结果:" + value1 + ",现在来的数据:" + value2);
                        return new WaterSensor(value1.getId(), System.currentTimeMillis(), value1.getVc() + value2.getVc());
                    }
                })
                .print();
        env.execute();

聚合函数(AggregateFunction)

ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样

image-20231109192227819

有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。

输入类型IN 就是输入流中元素的数据类型;累加器类型 ACC 是进行聚合的中间状态类型;而输出类型OUT是最终计算结果的类型。

接口中有四个方法:

  • createAccumulator():创建一个累加器,为聚合创建了一个初始状态,每个聚合任务只会调用一次。
  • add():将输入的元素添加到累加器中。
  • getResult():从累加器中提取聚合的输出结果。
  • merge():合并两个累加器,并将合并后的状态作为一个累加器返回。

AggregateFunction 的工作原理:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次 add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用 getResult()方法得到计算结果。很明显,与 ReduceFunction 相同,AggregateFunction 也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("124.222.253.33", 7777)
                .map(new WaterSensorMapFunction());
        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);

        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        SingleOutputStreamOperator<String> aggregate = sensorWS.aggregate(
                new AggregateFunction<WaterSensor, Integer, String>() {
                    @Override
                    public Integer createAccumulator() {
                        System.out.println("创建累加器");
                        return 0;
                    }

                    @Override
                    public Integer add(WaterSensor value, Integer accumulator) {
                        System.out.println(" 调用add方法,value=" + value);
                        return accumulator + value.getVc();
                    }

                    @Override
                    public String getResult(Integer accumulator) {
                        System.out.println("调用getResult方法");
                        return accumulator.toString();
                    }

                    @Override
                    public Integer merge(Integer a, Integer b) {
                        System.out.println("调用merge方法");
                        return null;
                    }
                }
        );
        aggregate.print();
        env.execute();

(2)全窗口函数(full window functions)

基于全部的数据计算

全窗口函数有两种:WindowFunction ProcessWindowFunction

窗口函数(WindowFunction)

基于 WindowedStream 调用.apply()方法,传入一个 WindowFunction 的实现类。

stream
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction());

该类中可以获取到包含窗口所有数据的可迭代集合(Iterable),还可以拿到窗口(Window)本身的信息。

不过 WindowFunction 能提供的上下文信息较少,也没有更高级的功能。事实上,它的作用可以被 ProcessWindowFunction 全覆盖,所以之后可能会逐渐弃用

处理窗口函数(ProcessWindowFunction)

ProcessWindowFunction 还可以获取到一个“上下文对象”(Context)。上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。

时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("124.222.253.33", 7777)
                .map(new WaterSensorMapFunction());
        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        SingleOutputStreamOperator<String> process = sensorWS.process(
                new ProcessWindowFunction<WaterSensor,
                        String, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                        long count = elements.spliterator().estimateSize();
                        long windowStartTs = context.window().getStart();
                        long windowEndTs = context.window().getEnd();
                        String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");
                        String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");
                        out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ") 包含 " + count + " 条数据===>" + elements);
                    }
                }
        );
        process.print();
        env.execute();

增量聚合和全窗口函数结合使用

// ReduceFunction 与 WindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(
ReduceFunction<T> reduceFunction,WindowFunction<TRKW>function)
// ReduceFunction 与 ProcessWindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(
ReduceFunction<T> reduceFunction,ProcessWindowFunction<TRKW> function)
    
// AggregateFunction 与 WindowFunction 结合
public <ACCVR> SingleOutputStreamOperator<R> aggregate(AggregateFunction<TACCV> aggFunction,WindowFunction<VRKW> windowFunction)
// AggregateFunction 与 ProcessWindowFunction 结合
public <ACCVR> SingleOutputStreamOperator<R> aggregate(AggregateFunction<TACCV> aggFunction,
ProcessWindowFunction<VRKW>     

结合使用

public class WindowAggregateAndProcessDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("124.222.253.33", 7777)
                .map(new WaterSensorMapFunction());

        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 2. 窗口函数:
    /*
      增量聚合 Aggregate + 全窗口 process
      1、增量聚合函数处理数据: 来一条计算一条
      2、窗口触发时, 增量聚合的结果(只有一条)传递给全窗口函数
      3、经过全窗口函数的处理包装后,输出

      结合两者的优点:
      1、增量聚合: 来一条计算一条,存储中间的计算结果,占用的空间少
      2、全窗口函数: 可以通过 上下文 实现灵活的功能
    */
        // sensorWS.reduce() //也可以传两个
        SingleOutputStreamOperator<String> result = sensorWS.aggregate(
                new MyAgg(),
                new MyProcess()
        );
        result.print();
        env.execute();
    }

    public static class MyAgg implements AggregateFunction

            <WaterSensor, Integer, String> {
        @Override
        public Integer createAccumulator() {
            System.out.println("创建累加器");
            return 0;
        }

        @Override
        public Integer add(WaterSensor value, Integer accumulator) {
            System.out.println("调用 add 方法,value=" + value);
            return accumulator + value.getVc();
        }

        @Override
        public String getResult(Integer accumulator) {
            System.out.println("调用 getResult 方法");
            return accumulator.toString();
        }

        @Override
        public Integer merge(Integer a, Integer b) {
            System.out.println("调用 merge 方法");
            return null;
        }
    }

    // 全窗口函数的输入类型 = 增量聚合函数的输出类型
    public static class MyProcess extends ProcessWindowFunction<String, String, String, TimeWindow> {
        @Override

        public void process(String s, Context context, Iterable<String> elements, Collector<String> out) throws Exception {
            long startTs = context.window().getStart();
            long endTs = context.window().getEnd();
            String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
            String windowEnd = DateFormatUtils.format(endTs, "yyyyMM-dd HH:mm:ss.SSS");
            long count = elements.spliterator().estimateSize();
            out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements);
        }
    }
}

Flink-Time

  • Event Time:事件时间,一个是数据产生的时间(时间戳Timestamp)
  • Processing time:处理时间,数据真正被处理的时间

image-20231108081425604

事件时间在实际应用中更为广泛,从Flink 1.12版本开始,Flink已经将事件时间作为默认的时间语义

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1194017.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

BI 数据可视化平台建设(1)—交叉表组件演变实战

作者&#xff1a;vivo 互联网大数据团队 - Zhu Jianchen 本文是vivo互联网大数据团队《BI数据可视化平台建设》系列文章第1篇 - 交叉表组件。 交叉表在数据分析里应用广泛&#xff0c;通过本文&#xff0c;你将了解到&#xff1a; 交叉表的基本概念&#xff0c;以及BI可视化平…

canal实操应用

一、MySQL的binlog日志 1.1、binlog的分类 binlog一般分为三类&#xff1a;statement语句级&#xff0c;记录一条一条的SQL&#xff0c;一条SQL可能更改多行&#xff0c;且SQL语句中如果用到now()函数或者random()函数&#xff0c;会存在数据不一致的问题。row行级&#xff0…

大容量疯了!居然想把磁带放到硬盘,100TB+是否可以实现?

1.引言 上一篇关于大容量硬盘的文章&#xff08;HDD最后的冲刺&#xff1a;大容量硬盘的奋力一搏&#xff09;中&#xff0c;我们针对大容量硬盘研发状态&#xff0c;小编最近又有了新发现。WDC希望可以通过HDD和磁带结合&#xff0c;把盘的容量提升到100TB。 2.数据大爆炸的…

C# Socket通信从入门到精通(7)——单个异步TCP服务器监听单个客户端C#代码实现

前言: 我们在开发TCP服务器程序的时候,有的时候需要一些异步的应用,比如我读取客户端发送的数据,但是服务器程序不能一直等待客户端数据发送过来,服务器要先做一些别的事情,这个时候C# Socket通信从入门到精通(5)——单个同步TCP服务器监听一个客户端C#代码实现这篇文…

低代码平台受欢迎度排行榜:揭秘市场热门之选

对于企业而言&#xff0c;低代码平台不仅仅是一个开发工具&#xff0c;它更是一个加速器&#xff0c;推动了企业的数字化转型进程。传统的开发模式下&#xff0c;业务部门与IT部门之间常常存在沟通障碍&#xff0c;导致需求难以实现或实现速度缓慢。而低代码平台打破了这种障碍…

C++学习贴---C++预处理器

文章目录 前言预处理器#define预处理条件编译#ifdef#ifndef#if、#elif、#else 和 #endif #和##运算符 预定义宏 前言 预处理器 预处理器是指一些指示编译器在实际编译之前所需要完成的指令。 预处理器负责处理以**井号&#xff08;#&#xff09;**开头的预处理指令&#xff0…

为啥$p(w|D)=p(y|X,w)$?

为啥 p ( w ∣ D ) p ( y ∣ X , w ) p(w|D)p(y|X,w) p(w∣D)p(y∣X,w)&#xff1f; p ( w ∣ X , y ) p ( w ∣ D ) p(w|X,y)p(w|D) p(w∣X,y)p(w∣D), p ( w ∣ D ) p ( D , w ) / p ( D ) p(w|D)p(D,w)/p(D) p(w∣D)p(D,w)/p(D)为啥 p ( D ∣ w ) p ( y ∣ X , w ) p(D|…

PLC开放式以太网通信网络状态查看工具netstat

在进行PLC的开放式以太网通信时,为了查看网络状态我们可以利用ping这个强有力的工具,还可以使用netstat这个工具。 博途PLC开放式以太网通信 UDP通信 博途PLC 1200/1500PLC开放式以太网通信TSEND_C通信(UDP)_RXXW_Dor的博客-CSDN博客文章浏览阅读1.7k次。开放式TSEND_C通信…

大数据毕业设计选题推荐-污水处理大数据平台-Hadoop-Spark-Hive

✨作者主页&#xff1a;IT研究室✨ 个人简介&#xff1a;曾从事计算机专业培训教学&#xff0c;擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇⬇⬇ Java项目 Python…

【Java】记一次服务内实现排队消费模式

主要是记录一下实现过程和实现的过程中遇到的坑。 我的业务 系统中有一个接口&#xff0c;是从大数据那边拉数据&#xff0c;之前的做法是&#xff0c;开个线程池&#xff0c;让SQL去执行&#xff0c;可是如果大量的慢SQL同时&#xff0c;请求数据库的话会适得其反。并且还有…

Python语法基础(字符串 列表 元组 字典 集合)

目录 字符串(str)字符串的创建特殊情况字符串的转义字符字符串的运算符字符串常用方法求字符串长度去掉多余空格是否包含某子串分割字符串合并字符串替换字符串统计统计字符串出现的次数 练习&#xff1a;判断字符串是否为回文串 列表(list)列表的创建列表常用方法遍历列表列表…

小程序如何设置下单提示语句

下单提示会展示在购物车和提交订单页面&#xff0c;它可以帮助商家告知客户事项&#xff0c;提高用户体验和减少错误操作。例如提示&#xff1a;商品是否包邮、某些区域是否发货、商品送达时间等等。 在小程序管理员后台->配送设置处&#xff0c;填写下单提示。在设置下单提…

基于ssm的高校失物招领管理系统

基于ssm的高校失物招领管理系统 摘要 失物招领管理系统是一种利用现代信息技术&#xff0c;为高校提供高效、便捷的失物招领服务的平台。本系统基于SSM框架&#xff08;Spring SpringMVC MyBatis&#xff09;&#xff0c;充分利用了各框架的优势&#xff0c;实现了系统的稳定…

1.微服务与SpringCloud

微服务和SpringCloud 文章目录 微服务和SpringCloud1.什么是微服务2.SpringCloud3. 微服务 VS SpringCloud4. SpringCloud 组件5.参考文档6.版本要求 1.什么是微服务 微服务是将一个大型的、单一的应用程序拆分成多个小型服务&#xff0c;每个服务实现特定的业务功能&#xff…

C#上位机序列10: Winform上位机通用框架

C#上位机序列1: 多线程&#xff08;线程同步&#xff0c;事件触发&#xff0c;信号量&#xff0c;互斥锁&#xff0c;共享内存&#xff0c;消息队列&#xff09; C#上位机序列2: 同步异步(async、await) C#上位机序列3: 流程控制&#xff08;串行&#xff0c;并行&#xff0c…

防火防盗防小人 使用 Jasypt 库来加密配置文件

⚔️ 项目配置信息存放在哪&#xff1f; 在日常开发工作中&#xff0c;我们经常需要使用到各种敏感配置&#xff0c;如数据库密码、各厂商的 SecretId、SecretKey 等敏感信息。 通常情况下&#xff0c;我们会将这些敏感信息明文放到配置文件中&#xff0c;或者放到配置中心中。…

原厂监视综合控制继电器 ZZS-7/1 AC220V 凸出端子固定安装

ZZS-7/11分闸、合闸、电源监视综合控制装置&#xff1b; ZZS-7/12分闸、合闸、电源监视综合控制装置&#xff1b; ZZS-7/13分闸、合闸、电源监视综合控制装置&#xff1b; ZZS-7/14分闸、合闸、电源监视综合控制装置&#xff1b; ZZS-7/102分闸、合闸、电源监视综合控制装置…

GIT的安装与常见命令

Git的介绍 Git是一个开源的分布式版本控制系统&#xff0c;最初由Linus Torvalds在2005年创建用于管理Linux内核的开发&#xff0c;现在已成为全球最流行的版本控制工具之一。 Git可以跟踪代码的修改&#xff0c;记录开发历程&#xff0c;保证多人合作开发时代码的一致性&…

关于maven读取settings.xml文件的优先级问题

今天在IDEA中配置maven的setting.xml文件路径指向的.m2路径下的setting_a.xml文件&#xff0c;同时&#xff0c;我的maven3.6.3也放在.m2中。 [1] .m2文件夹 [2] apache-maven-3.6.3文件夹 然后&#xff0c;在IDEA中打包发布时发现&#xff0c;无论如何都读取不到指定的setti…

【Linux】Linux常用命令—磁盘管理、压缩包管理

创作不易&#xff0c;本篇文章如果帮助到了你&#xff0c;还请点赞 关注支持一下♡>&#x16966;<)!! 主页专栏有更多知识&#xff0c;如有疑问欢迎大家指正讨论&#xff0c;共同进步&#xff01; &#x1f525;c系列专栏&#xff1a;C/C零基础到精通 &#x1f525; 给大…