Flink系列之Flink中Window原理及实践

news2024/9/21 16:15:30

title: Flink系列


一、Flink Window 概述

官网链接:

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/

摘取一段话:

Windows are at the heart of processing infinite streams. Windows split the stream into “buckets” of finite size, over which we can apply computations. This document focuses on how windowing is performed in Flink and how the programmer can benefit to the maximum from its offered functionality.

聚合事件(比如计数、求和、求最值)在流上的工作方式与批处理不同。比如,对流中的所有元素进行计数是不可能的,因为通常流是无限的(无界的)。所以,流上的聚合需要由 window 来划定范围,比如 “计算过去的5分钟” ,或者 “最后100个元素的和” 。window 是一种可以把无限数据切割为有限数据块的手段。

  • 每隔 1个小时 做一次统计

  • 每接收到 500 条数据就做一次计算

​ 窗口可以是 时间驱动的 【Time Window】(比如:每30秒)或者 数据驱动的【Count Window】 (比如:每100个元素)

图片:

在这里插入图片描述

​ Flink 的 window 分为两种类型的 Window,分别是:Keyed Windows 和 Non-Keyed Windows,他们的使用方式不同:
Keyed Windows:

stream
	.keyBy(...) <- keyed versus non-keyed windows
	.window(...) <- required: "assigner"
	[.trigger(...)] <- optional: "trigger" (else default trigger)
	[.evictor(...)] <- optional: "evictor" (else no evictor)
	[.allowedLateness(...)] <- optional: "lateness" (else zero)
	[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
	.reduce/aggregate/apply() <- required: "function"
	[.getSideOutput(...)] <- optional: "output tag"

Non-Keyed Windows:

stream
	.windowAll(...) <- required: "assigner"
	[.trigger(...)] <- optional: "trigger" (else default trigger)
	[.evictor(...)] <- optional: "evictor" (else no evictor)
	[.allowedLateness(...)] <- optional: "lateness" (else zero)
	[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
	.reduce/aggregate/apply() <- required: "function"
	[.getSideOutput(...)] <- optional: "output tag"

关于 Window 的生命周期,官网有比较详细的描述,有以下几点:

01、当属于某个窗口的第一个元素到达的时候,就会创建一个窗口
02、当时间(event or processing time)超过 window 的结束时间戳加上用户指定的允许延迟(Allowed Lateness)时,窗口将被完全删除。
03、每个 Window 之上,都绑定有一个 Trigger 或者一个 Function(ProcessWindowFunction, ReduceFunction, or AggregateFunction)用来执行窗口内数据的计算。
04、可以给 Window 指定一个 Evictor,它能够在 after the trigger fires 以及 before and/or after the function is applied 从窗口中删除元素。

每隔 2 分钟,执行过去 5 分钟内的数据的计算,是有3分钟的重复的数据的。

20:10:00 【20:05:00 - 20:10:00】
20:12:00 【20:07:00 - 20:12:00】 和上一个窗口的共同部分:20:07:00 - 20:10:00, 删除 【20:07:00 - 20:09:00】
20:14:00 【20:09:00 - 20:14:00】 和上一个窗口的共同部分:20:09:00 - 20:12:00, 删除 【20:09:00 - 20:11:00】

二、Flink Window 类型

窗口通常被区分为不同的类型: 按照计算需求做分类:

tumbling windows:滚动窗口 【没有重叠】
sliding windows:滑动窗口 【有重叠】
session windows:会话窗口
global windows: 全局窗口,没有窗口

2.1 tumblingwindows:滚动窗口(没有重叠)

​ A tumbling windows assigner assigns each element to a window of a specified window size. Tumbling windows have a fixed size and do not overlap. For example, if you specify a tumbling window with a size of 5 minutes, the current window will be evaluated and a new window will be started every five minutes as illustrated by the following figure.

{% asset_img tumbling-windows.svg %}
在这里插入图片描述

2.2 slidingwindows:滑动窗口(有重叠)

​ The sliding windows assigner assigns elements to windows of fixed length. Similar to a tumbling windows assigner, the size of the windows is configured by the window size parameter. An additional window slide parameter controls how frequently a sliding window is started. Hence, sliding windows can be overlapping if the slide is smaller than the window size. In this case elements are assigned to multiple windows.

​ For example, you could have windows of size 10 minutes that slides by 5 minutes. With this you get every 5 minutes a window that contains the events that arrived during the last 10 minutes as depicted by the following figure.

{% asset_img sliding-windows.svg %}
在这里插入图片描述

2.3 session windows

The session windows assigner groups elements by sessions of activity. Session windows do not overlap and do not have a fixed start and end time, in contrast to tumbling windows and sliding windows. Instead a session window closes when it does not receive elements for a certain period of time, i.e., when a gap of inactivity occurred. A session window assigner can be configured with either a static session gap or with a session gap extractor function which defines how long the period of inactivity is. When this period expires, the current session closes and subsequent elements are assigned to a new session window.

{% asset_img session-windows.svg %}
在这里插入图片描述

2.4 global windows

A global windows assigner assigns all elements with the same key to the same single global window. This windowing scheme is only useful if you also specify a custom trigger. Otherwise, no computation will be performed, as the global window does not have a natural end at which we could process the aggregated elements.

{% asset_img non-windowed.svg %}
在这里插入图片描述

2.5 Flink Window 类型总结

2.5.1 图片对比

图片对比:

在这里插入图片描述

​ 一般来说,绝大部分需求,使用 countWindow 和 timeWindow 就能解决需求了,稍微偏一点的需求才需要自定义 Window

// 老版本的使用方式
dataStream.timeWindow()
dataStream.countWindow()

// 新版本的使用方式,WindowAssigner 有常用的四个子类
dataStream.Window(WindowAssigner)

2.5.2 代码样例

1、Flink01_Old_CountWindow

package com.aa.flinkjava.window2;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @Author AA
 * @Project bigdatapre
 * @Package com.aa.flinkjava.window2
 * 需求: 单词每出现三次统计一次
 *
 * CountWindow 的示例
 */
public class Flink01_Old_CountWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> dataTextStream = executionEnvironment.socketTextStream("hadoop12", 9999);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = dataTextStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });

        wordAndOne.keyBy(0)
                //本案例的核心逻辑点。 每过来三条数据统计一次。
                .countWindow(3)
                .sum(1)
                .print();

        executionEnvironment.execute("Flink01_Old_CountWindow");
    }
}

2、Flink02_Old_TimeWindow

package com.aa.flinkjava.window2;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * @Author AA
 * @Project bigdatapre
 * @Package com.aa.flinkjava.window2
 * 需求:每3秒统计一次。
 */
public class Flink02_Old_TimeWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //老版本的这个必须设置一下,否则报错。
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        DataStreamSource<String> dataTextStream = executionEnvironment.socketTextStream("hadoop12", 9999);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = dataTextStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });

        wordAndOne.keyBy(0)
                //本案例的核心逻辑点。 每3秒钟统计一次。
                .timeWindow(Time.seconds(3))
                .sum(1)
                .print();

        executionEnvironment.execute("Flink02_Old_TimeWindow");
    }
}

三、Flink Window 操作使用

3.1 Keyed Windows 和 Non-Keyed Windows

Keyed Windows 语法:

stream
	.keyBy(...) <- keyed versus non-keyed windows
	.window(...) <- required: "assigner"
	[.trigger(...)] <- optional: "trigger" (else default trigger)
	[.evictor(...)] <- optional: "evictor" (else no evictor)
	[.allowedLateness(...)] <- optional: "lateness" (else zero)
	[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
	.reduce/aggregate/apply() <- required: "function"
	[.getSideOutput(...)] <- optional: "output tag"

Non-Keyed Windows 语法:

stream
	.windowAll(...) <- required: "assigner"
	[.trigger(...)] <- optional: "trigger" (else default trigger)
	[.evictor(...)] <- optional: "evictor" (else no evictor)
	[.allowedLateness(...)] <- optional: "lateness" (else zero)
	[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
	.reduce/aggregate/apply() <- required: "function"
	[.getSideOutput(...)] <- optional: "output tag"

3.2 Fink Window Function

3.2.1 Tumbling window 和 sliding window

演示程序:

wordAndOne.keyBy(0)
                //基于 EventTime 每隔5s 做一次计算
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum(1)
                .print();


wordAndOne.keyBy(0)
                // 每间隔 2 秒 ,计算过去4秒的数据
                .window(SlidingProcessingTimeWindows.of(Time.seconds(4),Time.seconds(2)))
                .sum(1)
                .print();

3.2.2 Session Window

1、小需求

一个需求

实时计算单词出现的次数,但是并不是每次接受到单词以后就输出单词出现的次数,而是当过了 5秒 以后没收到这个单词,就输出这个单词的次数

扩展:

思考一下,假设没有 session window ,有什么方式可以实现呢?Flink State 编程 + 定时器

解决问题的思路:

01、利用 state 存储 key,count 和 key 到达的时间, 定义一个 POJO 实体对象来保存这三个信息,然后通过一个 ValueState 来维护这个状态。
02、每接收到一个单词,更新状态中的数据。
03、对于每个 key 都注册一个【定时器】,如果过了 5秒 没接收到这个 key 的话,那么就触发这个定时器,这个定时器就判断当前的 time 是否等于这个 key 的最后修改时间 + 5s,如果等于则输出 key 以及对应的 count 。

2、SessionWindow实现

发现如果使用这样的方式来实现这个需求,还挺复杂的,其实Flink 给我们提供了专门的 window API 可以非常快速方便的进行求解

package com.aa.flinkjava.window2;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * @Author AA
 * @Project bigdatapre
 * @Package com.aa.flinkjava.window2
 */
public class Flink04_WordCount_SessionWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> dataTextStream = executionEnvironment.socketTextStream("hadoop12", 9999);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = dataTextStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });

        wordAndOne.keyBy(0)
                //wordAndOne.keyBy(tuple -> tuple.f0)
                //本案例的核心逻辑点。 使用 SessionWindow 。5秒没有再次接受到数据的话,就执行计算。
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
                .sum(1)
                .print();

        executionEnvironment.execute("Flink04_WordCount_SessionWindow");
    }
}

3.2.3 Global window

Global window + Trigger 一起配合才能使用

需求:单词每出现三次统计一次

package com.aa.flinkjava.window2;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.util.Collector;

/**
 * @Author AA
 * @Project bigdatapre
 * @Package com.aa.flinkjava.window2
 * GlobalWindow 的测试
 */
public class Flink05_WordCount_GlobalWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> dataTextStream = executionEnvironment.socketTextStream("hadoop12", 9999);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = dataTextStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });

        wordAndOne.keyBy(0)
                //wordAndOne.keyBy(tuple -> tuple.f0)
                //本案例的核心逻辑点。 指定使用
                .window(GlobalWindows.create())
                //调用定义的trigger。每间隔3条元素执行一次计算。CountTrigger 是flink提供的常用实现
                .trigger(CountTrigger.of(3))
                .sum(1)
                .print();

        executionEnvironment.execute("Flink05_WordCount_GlobalWindow");
    }
}

总结:效果跟 CountWindow(3)很像,但又有点不像,因为如果是 CountWindow(3),单词每次出现的都是 3 次,不会包含之前的次数,而我们刚刚的这个每次都包含了之前的次数。

3.3 Flink Window Trigger

需求:自定义一个 CountWindow:使用 Trigger 自己实现一个类似 CountWindow 的效果

CountWindow:Count 的滚动窗口: 每隔多少条数据执行一次计算。

package com.aa.flinkjava.window2;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

/**
 * @Author AA
 * @Project bigdatapre
 * @Package com.aa.flinkjava.window2
 *
 * 需求: 通过自定义的Trigger的方式,实现CountWindow的效果。
 */
public class Flink06_WordCount_ByUDTrigger {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> dataTextStream = executionEnvironment.socketTextStream("hadoop12", 9999);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = dataTextStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });

        wordAndOne.keyBy(0)
                //wordAndOne.keyBy(tuple -> tuple.f0)
                //本案例的核心逻辑点。 指定使用 GlobalWindows
                .window(GlobalWindows.create())
                //Trigger是一个抽象类 。 CountTrigger 是flink提供的常用实现 。
                //自己自定义的trigger 。比如说还是实现 计算逻辑是:每间隔 3 条元素执行一次计算 。 只不过这个计算逻辑是自己写的代码。
                .trigger(new MyCountTrigger(3))
                .sum(1)
                .print();

        executionEnvironment.execute("Flink06_WordCount_ByUDTrigger");
    }

    /**
     * 自定义触发器,每3个元素触发一次
     * Tuple2<String,Integer> 输入数据的类型
     * GlobalWindow 窗口的数据类型
     * 里面有四个抽象的方法,需要去实现: 其中 onElement 表示每次接收到这个window的输入,就调用一次。
     */
    static class MyCountTrigger extends Trigger<Tuple2<String, Integer>, GlobalWindow> {
        //表示指定的元素的最大的数量
        private long maxCount;

        //用于存储每个key所对应的count的值
        //一般情况下,当遇到复杂的需求的时候,一般都是使用 mapstate 和 liststate
        private ReducingStateDescriptor<Long> stateDescriptor = new ReducingStateDescriptor<Long>("count",
                new ReduceFunction<Long>() {
                    @Override
                    public Long reduce(Long v1, Long v2) throws Exception {
                        return v1 + v2;
                    }
                }, Long.class
        );

        //定义一个有参构造器
        public MyCountTrigger(long maxCount) {
            this.maxCount = maxCount;
        }

        /**
         * 当一个元素进入到window的时候就会调用这个方法
         * @param element  元素
         * @param timestamp  进来的时间点
         * @param window  元素所属的窗口
         * @param ctx  上下文对象
         * @return  TriggerResult 有四种结果:  点击到源码中,可以看到有四种
         *      TriggerResult.CONTINUE: 表示对的window不做任何处理
         *      TriggerResult.FIRE_AND_PURGE:  先触发window计算,然后删除window中的数据
         *      TriggerResult.FIRE: 表示触发window的计算
         *      TriggerResult.PURGE: 表示清空window中的数据
         *
         * @throws Exception
         */
        @Override
        public TriggerResult onElement(Tuple2<String, Integer> element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {

            //拿到当前的key对应的count的状态的值, 并累加
            ReducingState<Long> count = ctx.getPartitionedState(stateDescriptor);
            count.add(1L);

            //下面是业务逻辑判断了,当单词的个数到了3个的时候输出
            if (count.get() == maxCount){
                //清空状态个数
                count.clear();
                //先触发计算,然后删除窗口内的数据
                return TriggerResult.FIRE_AND_PURGE;
            }else {
                return TriggerResult.CONTINUE;
            }
        }

        //一般也不用定时器,这个方法里面不用写逻辑了。
        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
            return null;
        }

        //一般也不用定时器,这个方法里面不用写逻辑了。
        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
            return null;
        }

        //这个里面清楚状态的值。
        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
            ctx.getPartitionedState(stateDescriptor).clear();
        }
    }

}

经过测试,其实最终的效果和 CountWindow 是一样的。

3.4 Flink Window Evictor

需求:实现每隔 2 个单词,计算最近 3 个单词。类似于使用 Evictor 自己实现一个类似 CountWindow(3,2) 的效果

直接看代码: com.aa.flinkjava.window2.Flink07_WordCount_ByUDEvictor

3.5 Flink Window 增量聚合

Flink Window 增量聚合:窗口中每进入一条数据,就进行一次计算,等窗口结束时间到了,就输出最后的结果

常用的聚合算子:

1、reduce(ReduceFunction)       其实ReduceFunction 是 AggregateFunction 的一个特例 
2、aggregate(AggregateFunction)   AggregateFunction 是一个通用实现,代表聚合
3、sum(),min(),max()           其实sum(),min(),max() 是 reduce 的特例

下图的聚合逻辑:求和

在这里插入图片描述

3.5.1 增量案例1

增量代码演示实现:

package com.aa.flinkjava.window2;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * @Author AA
 * @Date 2022/3/16 14:17
 * @Project bigdatapre
 * @Package com.aa.flinkjava.window2
 *
 * 增量聚合演示案例
 */
public class Flink08_ReduceFunction_Incremental {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> dataTextStream = executionEnvironment.socketTextStream("hadoop12", 9999);

        //给传递的数据转化成为整数
        SingleOutputStreamOperator<Integer> data1 = dataTextStream.map(number -> Integer.valueOf(number));

        /**
         * data2是一个元素是整型的数据流 。
         *
         * 统计的都是这一个窗口内的数据,可以给窗口 改 多点进行测试。
         */
        AllWindowedStream<Integer, TimeWindow> data2 = data1.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(3)));

        /**
         * ReduceFunction的参数泛型 就是输入的数据流的 数据的类型
         * 增量聚合逻辑,窗口中每次进入一个元素,就调用reduce执行一次聚合
         */
        SingleOutputStreamOperator<Integer> result = data2.reduce(new ReduceFunction<Integer>() {
            /**
             * @param value1 临时的累加的值
             * @param value2 新过来的一条新值
             * @return
             * @throws Exception
             */
            @Override
            public Integer reduce(Integer value1, Integer value2) throws Exception {
                System.out.println("value1: " + value1);
                System.out.println("value2: " + value2);
                return value1 + value2;
            }
        });

        result.print();

        executionEnvironment.execute(Flink08_ReduceFunction_Incremental.class.getSimpleName());
    }
}

3.5.2 增量案例2

需求:求每隔窗口里面的数据的平均值

package com.aa.flinkjava.window2;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * @Author AA
 * @Date 2022/3/16 14:29
 * @Project bigdatapre
 * @Package com.aa.flinkjava.window2
 *
 * 需求: 自定义的 MyAggregateFunction 实现平均值
 */
public class Flink09_AggregateFunction_WindowAvg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> dataTextStream = executionEnvironment.socketTextStream("hadoop12", 9999);

        //给传递的数据转化成为整数
        SingleOutputStreamOperator<Integer> data1 = dataTextStream.map(number -> Integer.valueOf(number));

        /**
         * data2是一个元素是整型的数据流 。
         */
        AllWindowedStream<Integer, TimeWindow> data2 = data1.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)));

        /**
         * 调用自定义的 MyAggregateFunction 实现平均值
         */
        SingleOutputStreamOperator<Double> result = data2.aggregate(new MyAggregateFunction());

        result.print();

        executionEnvironment.execute(Flink09_AggregateFunction_WindowAvg.class.getSimpleName());
    }

    /**
     * AggregateFunction<IN, ACC, OUT>
     * Type parameters:
     * <IN> – The type of the values that are aggregated (input values)  输入数据类型
     * <ACC> – The type of the accumulator (intermediate aggregate state). 自定义的中间状态类型
     * <OUT> – The type of the aggregated result  输出的数据类型
     */
    static class MyAggregateFunction implements AggregateFunction<Integer, Tuple2<Integer,Integer>, Double>{

        /**
         * 初始化的累加器, 辅助变量
         * @return
         */
        @Override
        public Tuple2<Integer, Integer> createAccumulator() {
            return new Tuple2<>(0,0);
        }

        /**
         * 针对于数据的计算
         * 也就是聚合  临时的结果 + 新的一条数据
         * @param value  新的数据
         * @param accumulator  临时的中间的状态结果  泛型Tuple2<Integer, Integer> 中的第一个参数是元素个数,第二个是累加的结果值
         * @return
         */
        @Override
        public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
            return new Tuple2<>(accumulator.f0 + 1, accumulator.f1 + value);
        }

        /**
         * 返回结果,获取最终的值
         * @param accumulator
         * @return
         */
        @Override
        public Double getResult(Tuple2<Integer, Integer> accumulator) {
            return (double) accumulator.f1 / accumulator.f0;
        }

        /**
         * 临时的结果进行合并
         * @param a
         * @param b
         * @return
         */
        @Override
        public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
            return Tuple2.of(a.f0 + b.f0 ,a.f1 + b.f1);
        }
    }
}

3.6 Flink Window 全量聚合

​ Flink Window 全量聚合:等属于窗口的数据到齐,才开始进行聚合计算【可以实现对窗口内的数据进行排序等需求】
​ 常用的聚合算子:

apply(WindowFunction)
process(ProcessWindowFunction)

ProcessWindowFunction 比 WindowFunction提供了更多的上下文信息。类似于 map 和 RichMap 的关系

在这里插入图片描述

3.6.1 全量案例1

代码如下:

com.aa.flinkjava.window2.Flink10_AggregateFunction_Full

3.7 Flink Window Join

3.7.0 官网链接

官网链接:

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/joining/

两个 window 之间可以进行 join,join 操作只支持三种类型的 window:滚动窗口,滑动窗口,会话窗口

具体使用方式:

select a.*, b.* from a join b on a.id = b.id;
1、指定两张表
2、指定这两张表的链接字段

具体使用方式:

// 在 Flink 中对两个 DataStream 做 Join
// 1、指定两个流
// 2、指定这两个流的链接字段
	stream.join(otherStream) //两个流进行关联
	.where(<KeySelector>) //选择第一个流的key作为关联字段
	.equalTo(<KeySelector>) //选择第二个流的key作为关联字段
	.window(<WindowAssigner>) //设置窗口的类型
	.apply(<JoinFunction>) //对结果做操作 process处理   apply 类似于 foreach

重点理解清楚这个语法即可!

3.7.1 Tumbling Window Join

When performing a tumbling window join, all elements with a common key and a common tumbling window are joined as pairwise combinations and passed on to a JoinFunction or FlatJoinFunction. Because this behaves like an inner join, elements of one stream that do not have elements from another stream in their tumbling window are not emitted!

{% asset_img tumbling-window-join.svg %}
在这里插入图片描述

核心代码:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 
...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply (new JoinFunction<Integer, Integer, String> (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

3.7.2 Sliding Window Join

When performing a sliding window join, all elements with a common key and common sliding window are joined as pairwise combinations and passed on to the JoinFunction or FlatJoinFunction. Elements of one stream that do not have elements from the other stream in the current sliding window are not emitted! Note that some elements might be joined in one sliding window but not in another!

{% asset_img sliding-window-join.svg %}
在这里插入图片描述

核心代码:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
    .apply (new JoinFunction<Integer, Integer, String> (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

3.7.3 Session Window Join

When performing a session window join, all elements with the same key that when “combined” fulfill the session criteria are joined in pairwise combinations and passed on to the JoinFunction or FlatJoinFunction. Again this performs an inner join, so if there is a session window that only contains elements from one stream, no output will be emitted!

{% asset_img session-window-join.svg %}
在这里插入图片描述

核心代码:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 
...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
    .apply (new JoinFunction<Integer, Integer, String> (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

3.7.4 Interval Join

{% asset_img interval-join.svg %}
在这里插入图片描述

核心代码:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process (new ProcessJoinFunction<Integer, Integer, String(){

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(first + "," + second);
        }
    });

小总结:

Flink 的容错: state checkpoint savepoint statebackend restartStrategy failoverstartergy

Flink 处理乱序数据:Window + Time + Watermark

Flink Window 详细使用:定义window(WindowAssigner + Trigger + Evictor) + 定义window的计算逻辑:reduce / aggregate / process / apply

在企业环境中,一般情况下,使用 Flink 实现需求,就是要通过以上这些技能来完成就行了。其实还差一个技能:部署 Application 的时候,需要指定资源。需要搞清楚 Task 运行的时候内存使用。其实也就是Flink 的内存模型。



声明:
        文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除。感谢。转载请注明出处,感谢。


By luoyepiaoxue2014

B站: https://space.bilibili.com/1523287361 点击打开链接
微博地址: http://weibo.com/luoyepiaoxue2014 点击打开链接

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/67807.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于蒙特卡洛法的规模化电动车有序充放电及负荷预测(PythonMatlab实现)

&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️❤️&#x1f4a5;&#x1f4a5;&#x1f4a5; &#x1f389;作者研究&#xff1a;&#x1f3c5;&#x1f3c5;&#x1f3c5;主要研究方向是电力系统和智能算法、机器学…

刷爆力扣之三个数的最大乘积

刷爆力扣之三个数的最大乘积 HELLO&#xff0c;各位看官大大好&#xff0c;我是阿呆 &#x1f648;&#x1f648;&#x1f648; 今天阿呆继续记录下力扣刷题过程&#xff0c;收录在专栏算法中 &#x1f61c;&#x1f61c;&#x1f61c; 该专栏按照不同类别标签进行刷题&#x…

运维实战100:CDH5.16.2升级至CDH6.3.2

本期来分享一个cdh企业运维实战案例 背景 为适应公司业务发展需求&#xff0c;提高相关大数据组件版本&#xff0c;解决开发中的一些技术问题和代码优化&#xff0c;需要将现有集群CDH版本由5.x版本升级为6.3.x版本&#xff0c;也是为了适配如Flink、Doris等一些计算引擎。由…

ArcGIS Pro从0到1入门实战教程 书籍淘宝线上销售,免费下载数据和视频

网址&#xff1a;https://m.tb.cn/h.USz9rbD?tkcu0Vd2cABAV 购书后五星好评&#xff0c;加下面微信&#xff0c;截图发给我们&#xff1a;送Python电子书&#xff0c;下面是我们的微信 关注翎树文化&#xff0c;获得更多好书信息 翎树文化 翎树文化致力于图书出版|科技文化|视…

leetcode:1203. 项目管理【双topo:组间topo + 组内topo】

目录题目截图题目分析ac code总结题目截图 题目分析 没有第一个条件&#xff0c;就是简单topo排序有了第一个条件&#xff0c;每个小组都需要完全隔开&#xff0c;因此不同小组间也需要一个topo排step1&#xff1a;对于group为-1的自成一组step2&#xff1a;建图&#xff0c;组…

什么是信息摘要?

信息摘要就是原数据通过某个算法生成的一个固定长度的单向Hash散列值&#xff08;PS:常用来生成信息摘要的算法有MD5与SHA算法)。固定长度得意思就是不论原文内容多大&#xff0c;其生成的信息摘要都是固定长度的。单向的意思是过程不可逆&#xff0c;即只能通过原始数据生成Ha…

Mybatis用到的设计模式

虽然我们都知道有26个设计模式&#xff0c;但是大多停留在概念层面&#xff0c;真实开发中很少遇到&#xff0c;Mybatis源码中使用了大量的设计模式&#xff0c;阅读源码并观察设计模式在其中的应用&#xff0c;能够更深入的理解设计模式。 Mybatis至少遇到了以下的设计模式的…

提高组比赛分析(1)

停更n个月&#xff0c;我又来了&#xff01; 今天打了场模拟赛&#xff0c;差点就AK IOI了 废话不多说 正片开始 题目一&#xff1a;#1751. 第 T 个数 Description 给定一个 n(0<n≤10000) 个整数构成的序列&#xff0c;每个数 a[i] 都是小于 210^9 的非负整数 &#x…

[附源码]Python计算机毕业设计SSM家居购物系统(程序+LW)

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

yalmip和cplex安装步骤(Matlab)

&#x1f4cb;&#x1f4cb;&#x1f4cb;本文目录如下&#xff1a;⛳️⛳️⛳️ ​ 目录 1 yalmip和cplex的安装 1.1 yalmip安装 1.2 cplex安装过程 1 yalmip和cplex的安装 链接&#xff1a;https://pan.baidu.com/s/13One78qt1uSz92zNC6Xvlg 提取码&#xff1a;bicr --来…

websocket实践与浅入浅出

websocket实践与浅入浅出websocket与http的区别&#xff1f;websocket的应用场景&#xff1f;websocket通信方式websocket协议结构分布式下IM多端同步的实现方案TIP1. 心跳2. 多端同步3. wss4. otherwebsocket与http的区别&#xff1f; Http&#xff1a;请求与响应的模式&…

2023最新SSM计算机毕业设计选题大全(附源码+LW)之java校园招聘信息管理系统64f99

这个选题的话其实有很多的&#xff0c;就看你自己能接受怎么样的&#xff0c;比如可以做网站类、系统类、小程序类、安卓app、大数据类等等&#xff0c;这个也要看你个人能力和技术问题&#xff0c;如果技术小白或者有一点点基础的话建议选择网站类和系统类的&#xff0c;如果有…

关于NDK

libc_shared.so 在目前ndk的最新版本25.1.8937393中有4个libc_shared.so&#xff0c;用Everything搜索结果如下&#xff1a; 可以看到&#xff0c;大小最小的有4M多。 对于libc库&#xff0c;官方介绍在此&#xff0c;摘取一些片段如下&#xff1a; LLVM 的 libc 是 C 标准库…

[附源码]Python计算机毕业设计Django养生药膳推荐系统

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;…

Java学习多态之向下转型

目录 语法 注意事项 一、 二、 三、 举例说明 注意事项第二条的解释 总结 向下转型&#xff1a;解决向上转型中不能调用子类特有成员的问题 语法 子类类型 引用名 &#xff08;子类类型&#xff09;父类引用&#xff1b; 注意事项 一、 只能强转父类的引用&#x…

Windows子系统WSL2 (ubuntu安装 docker、nvidia-docker)

文章目录一、准备二、安装WSL2三、安装docker nvidia-docker附录&#xff1a;WSL与linux路径映射一、准备 第一步&#xff1a;【win R】输入winver 检查你的 Windows 版本&#xff0c;验证内部版本是否低于19041, 升级系统选择Dev 渠道 第二步&#xff1a;【控制面板】>…

基于RRT算法的最优动力学路径规划(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f468;‍&#x1f4bb;4 Matlab代码 &#x1f4a5;1 概述 RRT是Steven M. LaValle和James J. Kuffner Jr.提出的一种通过随机构建Space Filling Tree实现对非凸高维空间快速搜索的算法。…

这款台灯,不仅能护眼,还能点读和互动

疫情反复&#xff0c;孩子不能正常返校 天天在家上网课、写作业 长时间用眼引发视疲劳 用眼健康需要格外关注 想要改善孩子的用眼环境 CTWing物联网市场推荐使用 好记星智能学习台灯 国AA级护眼标准&#xff0c;能点读&#xff0c;会说话 这款智能学习台灯好在哪里&…

旅游网站毕业设计,旅游网站网页设计设计源码,旅游网站设计毕业论文

项目背景和意义 目的&#xff1a;本课题主要目标是设计并能够实现一个基于web网页的景区景点购票系统&#xff0c;整个网站项目使用了B/S架构&#xff0c;基于python的Django框架下开发&#xff1b;管理员通过后台录入信息、管理信息&#xff0c;设置网站信息&#xff0c;管理会…

186页13万字智慧能源大数据分析平台建设方案

目录 智慧能源大数据分析平台及能源集团数字化平台建设方案 目录 一、相关项目背景 二、需求理解 2.1 需求理解 三、方案设计 3.1 整体方案设计 3.3.1 整体架构 3.3.2 解决方案说明 3.3.3 需求应答 3.2 数据仓库 3.2.1 数据仓库架构 3.2.2 数据仓库产品说明 3.2.3…