二次开发Flink-coGroup算子支持迟到数据通过测输出流提取

news2025/1/11 2:55:34

目录

1.背景

2.coGroup算子源码分析

2.1完整的coGroup算子调用流程

2.2coGroup方法入口

2.3 CoGroupedStreams对象分析

2.4WithWindow内部类分析

2.5CoGroupWindowFunction函数分析

3.修改源码支持获取迟到数据测输出流

3.1复制CoGroupedStreams

3.2新增WithWindow.sideOutputLateData方法

3.3新增WithWindow构造方法

3.4修改apply方法

3.5开放UnionTypeInfo类的public权限

3.7项目中查看maven是否已经刷新为最新代码

4.测试


1.背景

coGroup算子开窗到时间关闭之后,迟到数据无法通过测输出流提取,intervalJoin算子提供了api,因为join算子底层就是coGroup算子,所以Join算子也不行。

flink版本 v1.17.1

2.coGroup算子源码分析

2.1完整的coGroup算子调用流程

    input1.coGroup(input2)
    .where(keySelector1)
    .equalTo(keySelector2)
    .window(windowAssigner)
    .trigger(trigger)
    .evictor(evictor)
    .allowedLateness(allowedLateness)
    .apply(cgroupFunction)

通过上述代码可以看到没有sideOutputLateData的相关方法,用来提取窗口关闭之后的迟到数据

2.2coGroup方法入口

其中创建了一个CoGroupedStreams流对象

    /**
     * Creates a join operation. See {@link CoGroupedStreams} for an example of how the keys and
     * window can be specified.
     */
    public <T2> CoGroupedStreams<T, T2> coGroup(DataStream<T2> otherStream) {
        return new CoGroupedStreams<>(this, otherStream);
    }

2.3 CoGroupedStreams对象分析

他可以理解为构造设计模式的一个Builder类,通过where方法配置第一条流的KeySelector,再返回一个CoGroupedStreams的内部类Where,再通过equalTo方法配置第二条流的KeySelector,再返回EqualTo内部类,window方法配置窗口划分器,返回WithWindow内部类,后续都是窗口的配置 trigger,evictor,allowedLateness配置窗口参数,最后调用apply方法传送用户业务函数

2.4WithWindow内部类分析

WithWindow是最终保存所有配置的内部类包括两条流,窗口配置,key提取器的配置,最终会用户调用apply方法触发CoGroup的业务,在apply方法中通过union联合两条流,然后通过keyby转为KeyedStream,再通过window配置窗口,最终调用窗口函数的apply方法,传入WindowFunction,做CoGroup的业务与用户业务。

具体代码如下已写好备注


    /**
     * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs as
     * well as a {@link WindowAssigner}.
     *
     * @param <T1> Type of the elements from the first input
     * @param <T2> Type of the elements from the second input
     * @param <KEY> Type of the key. This must be the same for both inputs
     * @param <W> Type of {@link Window} on which the co-group operation works.
     */
    @Public
    public static class WithWindow<T1, T2, KEY, W extends Window> {
        //第一条流
        private final DataStream<T1> input1;
        //第二条流
        private final DataStream<T2> input2;
        //第一个key提取器
        private final KeySelector<T1, KEY> keySelector1;
        //第二个Key提取器
        private final KeySelector<T2, KEY> keySelector2;
        //Key的类型
        private final TypeInformation<KEY> keyType;
        //窗口分配器
        private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;
        //窗口出发计算器
        private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;

        private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;

        private final Time allowedLateness;

        private WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowedStream;
        //构造函数给上面对象赋值
        protected WithWindow(
                DataStream<T1> input1,
                DataStream<T2> input2,
                KeySelector<T1, KEY> keySelector1,
                KeySelector<T2, KEY> keySelector2,
                TypeInformation<KEY> keyType,
                WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
                Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
                Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
                Time allowedLateness) {
            this.input1 = input1;
            this.input2 = input2;

            this.keySelector1 = keySelector1;
            this.keySelector2 = keySelector2;
            this.keyType = keyType;

            this.windowAssigner = windowAssigner;
            this.trigger = trigger;
            this.evictor = evictor;

            this.allowedLateness = allowedLateness;
        }

        /**
         * Completes the co-group operation with the user function that is executed for windowed
         * groups.
         *
         * <p>Note: This method's return type does not support setting an operator-specific
         * parallelism. Due to binary backwards compatibility, this cannot be altered. Use the
         * {@link #with(CoGroupFunction, TypeInformation)} method to set an operator-specific
         * parallelism.
         */
        public <T> DataStream<T> apply(
                CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            // clean the closure
            function = input1.getExecutionEnvironment().clean(function);
			//创建合并两个流的公共TypeInfo,UnionTypeInfo最终会将Input1,Input2的数据通过map算子转换为该类型
            UnionTypeInfo<T1, T2> unionType =
                    new UnionTypeInfo<>(input1.getType(), input2.getType());
			//转换成union的KeySelector
            UnionKeySelector<T1, T2, KEY> unionKeySelector =
                    new UnionKeySelector<>(keySelector1, keySelector2);
			//将taggedInput1的数据类容map成UnionTypeInfo<T1, T2>类型
            SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput1 =
                    input1.map(new Input1Tagger<T1, T2>());
            taggedInput1.getTransformation().setParallelism(input1.getParallelism(), false);
            taggedInput1.returns(unionType);
	       //将taggedInput2的数据类容map成UnionTypeInfo<T1, T2>类型
            SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput2 =
                    input2.map(new Input2Tagger<T1, T2>());
            taggedInput2.getTransformation().setParallelism(input2.getParallelism(), false);
            taggedInput2.returns(unionType);
			//将两个流进行union
            DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);
			//keyBy并且开窗
            windowedStream =
                    new KeyedStream<TaggedUnion<T1, T2>, KEY>(
                                    unionStream, unionKeySelector, keyType)
                            .window(windowAssigner);
			//配置窗口触发器
            if (trigger != null) {
                windowedStream.trigger(trigger);
            }
			//配置移除器
            if (evictor != null) {
                windowedStream.evictor(evictor);
            }
			//配置allowedLateness
            if (allowedLateness != null) {
                windowedStream.allowedLateness(allowedLateness);
            }
			//创建CoGroupWindowFunction ,并把用户函数传入进去
            return windowedStream.apply(
                    new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
        }

        /**
         * Completes the co-group operation with the user function that is executed for windowed
         * groups.
         *
         * <p><b>Note:</b> This is a temporary workaround while the {@link #apply(CoGroupFunction,
         * TypeInformation)} method has the wrong return type and hence does not allow one to set an
         * operator-specific parallelism
         *
         * @deprecated This method will be removed once the {@link #apply(CoGroupFunction,
         *     TypeInformation)} method is fixed in the next major version of Flink (2.0).
         */
        @PublicEvolving
        @Deprecated
        public <T> SingleOutputStreamOperator<T> with(
                CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            return (SingleOutputStreamOperator<T>) apply(function, resultType);
        }
		
        @VisibleForTesting
        Time getAllowedLateness() {
            return allowedLateness;
        }
		//获取窗口包装流,但是标记为VisibleForTesting,用户无法调用,如果可以调用的话可以通过该方法获取包装流之后通过窗口流获取迟到数据的测输出流
        @VisibleForTesting
        WindowedStream<TaggedUnion<T1, T2>, KEY, W> getWindowedStream() {
            return windowedStream;
        }
    }

2.5CoGroupWindowFunction函数分析

CoGroupWindowFunction也是CoGroupedStreams内部类,负责做CoGroup的业务,最终将数据封装好转发给用户函数(也就是2.1中apply中的cgroupFunction)

   private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
            extends WrappingFunction<CoGroupFunction<T1, T2, T>>
            implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {

        private static final long serialVersionUID = 1L;

        public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
            super(userFunction);
        }

        @Override
        public void apply(KEY key, W window, Iterable<TaggedUnion<T1, T2>> values, Collector<T> out)
                throws Exception {
			//缓存当前窗口里1号流的数据
            List<T1> oneValues = new ArrayList<>();
			//缓存当前窗口里2号流的数据
            List<T2> twoValues = new ArrayList<>();

            for (TaggedUnion<T1, T2> val : values) {
                if (val.isOne()) {
                    oneValues.add(val.getOne());
                } else {
                    twoValues.add(val.getTwo());
                }
            }
			//传入到用户函数中
            wrappedFunction.coGroup(oneValues, twoValues, out);
        }
    }

3.修改源码支持获取迟到数据测输出流

思路 复制CoGroupedStreams新增一个NewCoGroupedStreams,在WithWindow函数中增加方法sideOutputLateData,让用户传入outputTag,用于提取窗口关闭后的测输出流。

3.1复制CoGroupedStreams

3.2新增WithWindow.sideOutputLateData方法

新增该方法,传入outputTag,下图WithWindow构造方法是3.3新增的

    @PublicEvolving
        public WithWindow<T1, T2, KEY, W> sideOutputLateData(
                OutputTag<TaggedUnion<T1, T2>> outputTag) {
            return new WithWindow<>(
                    input1,
                    input2,
                    keySelector1,
                    keySelector2,
                    keyType,
                    windowAssigner,
                    trigger,
                    evictor,
                    allowedLateness,
                    outputTag
            );
        }

3.3新增WithWindow构造方法

新增属性laterDataOutputTag,用来保存构造函数中传入的laterOutputTag

   protected WithWindow(
                DataStream<T1> input1,
                DataStream<T2> input2,
                KeySelector<T1, KEY> keySelector1,
                KeySelector<T2, KEY> keySelector2,
                TypeInformation<KEY> keyType,
                WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
                Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
                Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
                Time allowedLateness,
                OutputTag<TaggedUnion<T1, T2>> laterOutputTag
        ) {
            this(
                    input1,
                    input2,
                    keySelector1,
                    keySelector2,
                    keyType,
                    windowAssigner,
                    trigger,
                    evictor,
                    allowedLateness);
            this.lateDataOutputTag = laterOutputTag;

        }

3.4修改apply方法

判断lateDataOutputTag 是否为null,如果不为null则调用windowedStream的sideOutputLateData设置迟到数据tag

 /**
         * Completes the co-group operation with the user function that is executed for windowed
         * groups.
         *
         * <p>Note: This method's return type does not support setting an operator-specific
         * parallelism. Due to binary backwards compatibility, this cannot be altered. Use the
         * {@link #with(CoGroupFunction, TypeInformation)} method to set an operator-specific
         * parallelism.
         */
        public <T> DataStream<T> apply(
                CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            // clean the closure
            function = input1.getExecutionEnvironment().clean(function);

            UnionTypeInfo<T1, T2> unionType =
                    new UnionTypeInfo<>(input1.getType(), input2.getType());
            UnionKeySelector<T1, T2, KEY> unionKeySelector =
                    new UnionKeySelector<>(keySelector1, keySelector2);

            SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput1 =
                    input1.map(new Input1Tagger<T1, T2>());
            taggedInput1.getTransformation().setParallelism(input1.getParallelism(), false);
            taggedInput1.returns(unionType);

            SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput2 =
                    input2.map(new Input2Tagger<T1, T2>());
            taggedInput2.getTransformation().setParallelism(input2.getParallelism(), false);
            taggedInput2.returns(unionType);

            DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);

            // we explicitly create the keyed stream to manually pass the key type information in
            windowedStream =
                    new KeyedStream<TaggedUnion<T1, T2>, KEY>(
                            unionStream, unionKeySelector, keyType)
                            .window(windowAssigner);
            if (trigger != null) {
                windowedStream.trigger(trigger);
            }
            if (evictor != null) {
                windowedStream.evictor(evictor);
            }
            if (allowedLateness != null) {
                windowedStream.allowedLateness(allowedLateness);
            }
            //判断lateDataOutputTag是否为NULL,如果不为NULL,则调用windowedStream
            //的sideOutputLateData方法,传入lateDataOutputTag让迟到数据输出到测输出流中
            if (lateDataOutputTag != null) {
                windowedStream.sideOutputLateData(lateDataOutputTag);
            }
            return windowedStream.apply(
                    new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
        }

3.5开放UnionTypeInfo类的public权限

该类就是union之后的公共类的类型 oneType代表Input1流的数据类型,TwoType代表Input2流的数据类型

进入到flink-streaming-java所在磁盘目录输入以下命令编译

mvn clean install -DskipTests -Dfast

编译成功

3.7项目中查看maven是否已经刷新为最新代码

编译之后,可以看到导入的maven包已经有了新增的NewCoGroupedStreams类了,注意项目中的maven依赖中的flink版本,要与编译源码的版本一致,否则无法引入到。

4.测试

新建两个流,通过new NewCoGroupedStreams创建对象,在allowedLateness之后通过sideOutputLateData设置outputTag,然后通过with方法触发业务,with底层也是调用了apply,只不过他帮我们把返回的流转为了SingleOutputStreamOperator类型,可以用于提取测输出流。最后通过with.getSideOutput(outputTag)提取测输出流,最后通过map转换为 Tuple2<Integer, WaterSensor> 类型进行打印

    OutputTag<NewCoGroupedStreams.TaggedUnion<WaterSensor, WaterSensor>> outputTag = new OutputTag<>("later",
                new NewCoGroupedStreams.UnionTypeInfo<>(Types.POJO(WaterSensor.class), Types.POJO(WaterSensor.class)));
        
        NewCoGroupedStreams<WaterSensor, WaterSensor> newCgroupStream = new NewCoGroupedStreams<>(ds1, ds2);
        
        SingleOutputStreamOperator<String> with = newCgroupStream.where((x) -> x.getId()).equalTo(x -> x.getId()).window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .allowedLateness(Time.seconds(3))
                .sideOutputLateData(outputTag)
                .with(new RichCoGroupFunction<WaterSensor, WaterSensor, String>() {
                    @Override
                    public void coGroup(Iterable<WaterSensor> first, Iterable<WaterSensor> second, Collector<String> out) throws Exception {
                        out.collect(first.toString() + "======" + second.toString());
                    }
                });
        with.print();
        with.getSideOutput(outputTag).map(new MapFunction<NewCoGroupedStreams.TaggedUnion<WaterSensor, WaterSensor>, Tuple2<Integer, WaterSensor>>() {
            @Override
            public Tuple2<Integer, WaterSensor> map(NewCoGroupedStreams.TaggedUnion<WaterSensor, WaterSensor> value) throws Exception {
                return value.isOne() ? Tuple2.of(1, value.getOne()) : Tuple2.of(2, value.getTwo());
            }
        }).print();

可以看到下图结果,ts代表时间戳,第一个打印是RichCoGroupFunction打印,代表关闭了1~10s的时间窗,后面我们在输入,WaterSensor{id='a', ts=1, vc=1} 就通过测输出流打印为二元组了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1542629.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

YiYi-Web项目介绍

YiYi-Web项目介绍 1. 简介2. 使用2.1 后端开发环境2.2 前端开发环境 3. 测试环境&#xff1a;4. 更新日志5. 打包情况6.项目截图 本项目前端是html、css、js、jQuery基础技术。 后端都是最新的SpringBoot技术&#xff0c;不分离版本&#xff0c; 是最基础的项目开发教程&#x…

Spark Map 和 FlatMap 的比较

Spark Map 和 FlatMap 的比较 本节将介绍Spark中map(func)和flatMap(func)两个函数的区别和基本使用。 函数原型 map(func) 将原数据的每个元素传给函数func进行格式化&#xff0c;返回一个新的分布式数据集。 flatMap(func) 跟map(func)类似&#xff0c;但是每个输入项和…

Flink GateWay、HiveServer2 和 hive on spark

Flink SQL Gateway简介 从官网的资料可以知道Flink SQL Gateway是一个服务&#xff0c;这个服务支持多个客户端并发的从远程提交任务。Flink SQL Gateway使任务的提交、元数据的查询、在线数据分析变得更简单。 Flink SQL Gateway的架构如下图&#xff0c;它由插件化的Endpoi…

孩子看书用白光还是暖白光好呢?五大护眼台灯推荐,必选!

孩子看书时&#xff0c;选择合适的光线是至关重要的。白光与暖白光各有优劣&#xff0c;白光亮度高、色彩还原性好&#xff0c;适合需要高度集中注意力和精细操作的场合&#xff1b;而暖白光则更加柔和、不刺眼&#xff0c;有助于营造轻松的阅读氛围。为了帮助家长更好地为孩子…

coco creator 3.x: 居中展示

coco creator 3.x 里面有很多需要居中处理&#xff0c;下面积累记录一些日常的实验。 下面一张图如下&#xff0c;在这张图里面创建了一个3x3的展示。但是这个展示过程并不在居中展示。而它的容器节点 恰恰就在中心点位置。所以在创建后&#xff0c;布局预制体元素并不能实现居…

Docker容器初始

华子目录 docker简介虚拟化技术硬件级虚拟化硬件级虚拟化历史操作系统虚拟化历史基于服务的云计算模式 什么是dockerDocker和传统虚拟化方式的不同之处为什么要使用docker&#xff1f;Docker 在如下几个方面具有较大的优势 对比传统虚拟机总结docker应用场景docker改变了什么 基…

iOS开发 - 转源码 - __weak问题解决

iOS开发 - 转源码 - __weak问题解决 在使用clang转换OC为C代码时&#xff0c;可能会遇到以下问题 cannot create __weak reference in file using manual reference 原因 __weak弱引用是需要runtime支持的&#xff0c;如果我们还只是使用静态编译&#xff0c;是无法正常转换的…

MultiArch与Ubuntu/Debian 的交叉编译

返回&#xff1a;OpenCV系列文章目录&#xff08;持续更新中......&#xff09; 上一篇&#xff1a;基于ARM 的Linux系统的交叉编译 下一篇&#xff1a;MultiArch与Ubuntu/Debian 的交叉编译 警告&#xff1a; 本教程可能包含过时的信息。 什么是“MultiArch” OpenCV 可能…

家乡特色推荐系统设计与实现|SpringBoot+ Mysql+Java+ B/S结构(可运行源码+数据库+设计文档)

本项目包含可运行源码数据库LW&#xff0c;文末可获取本项目的所有资料。 推荐阅读100套最新项目 最新ssmjava项目文档视频演示可运行源码分享 最新jspjava项目文档视频演示可运行源码分享 最新Spring Boot项目文档视频演示可运行源码分享 2024年56套包含java&#xff0c;…

flink join的分类

带窗口的join 下图是固定窗口,同样的还有滑动窗口和会话窗口join DataStream<Integer> orangeStream = ...; DataStream<Integer> greenStream = .

物联网应用技术中的stm32该怎么学,该从哪入手?

物联网应用技术中的stm32该怎么学&#xff0c;该从哪入手&#xff1f; STM32是只物联网中的一部分&#xff0c;单纯的学个STM32是没法满足物联网开发需求的&#xff0c;实际产品开发过程中会考虑成本等多种因素选择合适的方案&#xff0c;比如使用单片机还是stm32或是更高端的芯…

上位机图像处理和嵌入式模块部署(qmacvisual点线测量)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 上面一篇文章&#xff0c;我们完成了直线的拟合操作。在实际场景中&#xff0c;拟合之后更多地是需要进行长度的测量。既然是测量&#xff0c;那么…

Linux:传输层和UDP的传输原理

文章目录 端口号端口号理解端口号的划分 netstatUDP协议 之前结束了对于应用层的理解&#xff0c;那么从本篇开始往后&#xff0c;将会深入到传输层当中进行理解&#xff0c;尝试打通整个网络的协议栈 从对于之前的理解来说&#xff0c;在应用层涉及到的知识体系是相当庞大的&…

力扣450 删除二叉搜索树中的节点 Java版本

文章目录 题目描述思路代码 题目描述 给定一个二叉搜索树的根节点 root 和一个值 key&#xff0c;删除二叉搜索树中的 key 对应的节点&#xff0c;并保证二叉搜索树的性质不变。返回二叉搜索树&#xff08;有可能被更新&#xff09;的根节点的引用。 一般来说&#xff0c;删除…

力扣100热题[哈希]:最长连续序列

原题&#xff1a;128. 最长连续序列 题解&#xff1a; 官方题解&#xff1a;. - 力扣&#xff08;LeetCode&#xff09;题解&#xff0c;最长连续序列 &#xff1a;哈希表 官方解题思路是先去重&#xff0c;然后判断模板长度的数值是否存在&#xff0c;存在就刷新&#xff0c…

算法打卡day15

今日任务&#xff1a; 1&#xff09;110.平衡二叉树 2&#xff09;257. 二叉树的所有路径 3&#xff09;404.左叶子之和 110.平衡二叉树 题目链接&#xff1a;110. 平衡二叉树 - 力扣&#xff08;LeetCode&#xff09; 给定一个二叉树&#xff0c;判断它是否是高度平衡的二叉树…

爬楼梯C语言

方法一&#xff1a;动态规划 int climbStairs(int n) {int f[100] {0};f[0] 0;f[1] 1;f[2] 2;for(int i 3;i<n;i)f[i] f[i-1] f[i-2];//可能是从i-1阶爬上第i阶&#xff0c;也有可能是从i-2阶 return f[n]; } 方法二&#xff1a;滚动数组 int climbStairs(int n){int…

DCDC60V80V100V转12V5V1A2A降压恒压芯片 惠海半导体原厂

H4020是一种内置40V耐压MOS&#xff0c;并且能够实现精确恒压以及恒流的同步降压型DC-DC转换器&#xff1b;支持1A持续输出电流输出电压可调&#xff0c;最大可支持 100%占空比&#xff1b;通过调节 FB 端口的分压电阻&#xff0c;可以输出 2.5V到 24V 的稳定电压 。H4020 具有…

【Science】:配位不饱和 Al3+ 中心作为 γ-Al2O3 上铂活性相催化剂的结合位点

在许多非均相催化剂中&#xff0c;金属颗粒与其氧化物载体的相互作用可以改变金属的电子属性&#xff0c;并且在确定颗粒形态和保持分散性方面发挥关键作用。我们结合使用了超高磁场、固态魔角旋转核磁共振光谱技术和高角环形暗场扫描透射电子显微镜技术&#xff0c;配合密度泛…

array go 语言的数组 /切片

内存地址通过& package mainimport "fmt"func main() {var arr [2][3]int16fmt.Println(arr)fmt.Printf("arr的地址是: %p \n", &arr)fmt.Printf("arr[0]的地址是 %p \n", &arr[0])fmt.Printf("arr[0][0]的地址是 %p \n"…