Flink--9、双流联结(窗口联结、间隔联结)

news2025/1/22 16:12:38

在这里插入图片描述
                       星光下的赶路人star的个人主页

                      我还有改变的可能性,一想起这点,我就心潮澎湃

文章目录

  • 1、基于时间的合流——双流联结(Join)
    • 1.1 窗口联结(Window Join)
    • 1.2 间隔联结(Interval Join)

1、基于时间的合流——双流联结(Join)

可以发现,根据某个key合并两条流,与关系型数据库中的表的join操作非常近似。事实上,Flink中两条流的connect操作,就可以通过keyBy指定键进行分组后合并,实现了类似于SQL中的join操作;另外connect支持处理函数,可以使用自定义实现各种需求,其实已经能够处理双流join的大多数场景。

不过处理函数是底层接口,所以尽管connect能做的事情多,但在一些具体应用场景下还是显得太过抽象了。比如,如果我们希望统计固定时间内两条流数据的匹配情况,那就需要自定义来实现——其实这完全可以用窗口(window)来表示。为了更方便地实现基于时间的合流操作,Flink的DataStrema API提供了内置的join算子。

1.1 窗口联结(Window Join)

Flink为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。
1、窗口联结的调用
窗口联结在代码中的实现,首先需要调用DataStream的.join()方法来合并两条流,得到一个JoinedStreams;接着通过.where()和.equalTo()方法指定两条流中联结的key;然后通过.window()开窗口,并调用apply()方法传入联结窗口函数进行处理计算。通用调用形式如下:

stream1.join(stream2)
        .where(<KeySelector>)
        .equalTo(<KeySelector>)
        .window(<WindowAssigner>)
        .apply(<JoinFunction>)

上面代码中.where()的参数是键选择器(KeySelector),用来指定第一条流中的key;而.equalTo()传入的KeySelector则指定了第二条流中的key。两者相同的元素,如果在同一窗口中,就可以匹配起来,并通过一个“联结函数”(JoinFunction)进行处理了。
这里.window()传入的就是窗口分配器,之前讲到的三种时间窗口都可以用在这里:滚动窗口(tumbling window)、滑动窗口(sliding window)和会话窗口(session window)。
而后面调用.apply()可以看作实现了一个特殊的窗口函数。注意这里只能调用.apply(),没有其他替代的方法。
传入的JoinFunction也是一个函数类接口,使用时需要实现内部的.join()方法。这个方法有两个参数,分别表示两条流中成对匹配的数据。

其实仔细观察可以发现,窗口join的调用语法和我们熟悉的SQL中表的join非常相似:

SELECT * FROM table1 t1, table2 t2 WHERE t1.id = t2.id; 

这句SQL中where子句的表达,等价于inner join … on,所以本身表示的是两张表基于id的“内连接”(inner join)。而Flink中的window join,同样类似于inner join。也就是说,最后处理输出的,只有两条流中数据按key配对成功的那些;如果某个窗口中一条流的数据没有任何另一条流的数据匹配,那么就不会调用JoinFunction的.join()方法,也就没有任何输出了。

2、窗口联结实例

public class WindowJoinDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
                .fromElements(
                        Tuple2.of("a", 1),
                        Tuple2.of("a", 2),
                        Tuple2.of("b", 3),
                        Tuple2.of("c", 4)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
                );


        SingleOutputStreamOperator<Tuple3<String, Integer,Integer>> ds2 = env
                .fromElements(
                        Tuple3.of("a", 1,1),
                        Tuple3.of("a", 11,1),
                        Tuple3.of("b", 2,1),
                        Tuple3.of("b", 12,1),
                        Tuple3.of("c", 14,1),
                        Tuple3.of("d", 15,1)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple3<String, Integer,Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
                );

        // TODO window join
        // 1. 落在同一个时间窗口范围内才能匹配
        // 2. 根据keyby的key,来进行匹配关联
        // 3. 只能拿到匹配上的数据,类似有固定时间范围的inner join
        DataStream<String> join = ds1.join(ds2)
                .where(r1 -> r1.f0)  // ds1的keyby
                .equalTo(r2 -> r2.f0) // ds2的keyby
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                    /**
                     * 关联上的数据,调用join方法
                     * @param first  ds1的数据
                     * @param second ds2的数据
                     * @return
                     * @throws Exception
                     */
                    @Override
                    public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {
                        return first + "<----->" + second;
                    }
                });

        join.print();

        env.execute();
    }

运行截图:
在这里插入图片描述

1.2 间隔联结(Interval Join)

在有些场景下,我们要处理的时间间隔可能并不是固定的。这时显然不应该用滚动窗口或滑动窗口来处理——因为匹配的两个数据有可能刚好“卡在”窗口边缘两侧,于是窗口内就都没有匹配了;会话窗口虽然时间不固定,但也明显不适合这个场景。基于时间的窗口联结已经无能为力了。
为了应对这样的需求,Flink提供了一种叫作“间隔联结”(interval join)的合流操作。顾名思义,间隔联结的思路就是针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔,看这期间是否有来自另一条流的数据匹配。

1、间隔联结的原理
间隔联结具体的定义方式是,我们给定两个时间点,分别叫作间隔的“上界”(upperBound)和“下界”(lowerBound);于是对于一条流(不妨叫作A)中的任意一个数据元素a,就可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp + upperBound],即以a的时间戳为中心,下至下界点、上至上界点的一个闭区间:我们就把这段时间作为可以匹配另一条流数据的“窗口”范围。所以对于另一条流(不妨叫B)中的数据元素b,如果它的时间戳落在了这个区间范围内,a和b就可以成功配对,进而进行计算输出结果。所以匹配的条件为:
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
这里需要注意,做间隔联结的两条流A和B,也必须基于相同的key;下界lowerBound应该小于等于上界upperBound,两者都可正可负;间隔联结目前只支持事件时间语义
如下图所示,我们可以清楚地看到间隔联结的方式:
在这里插入图片描述
下方的流A去间隔联结上方的流B,所以基于A的每个数据元素,都可以开辟一个间隔区间。我们这里设置下界为-2毫秒,上界为1毫秒。于是对于时间戳为2的A中元素,它的可匹配区间就是[0, 3],流B中有时间戳为0、1的两个元素落在这个范围内,所以就可以得到匹配数据对(2, 0)和(2, 1)。同样地,A中时间戳为3的元素,可匹配区间为[1, 4],B中只有时间戳为1的一个数据可以匹配,于是得到匹配数据对(3, 1)。
所以我们可以看到,间隔联结同样是一种内连接(inner join)。与窗口联结不同的是,interval join做匹配的时间段是基于流中数据的,所以并不确定;而且流B中的数据可以不只在一个区间内被匹配。

2、间隔联结的调用
间隔联结在代码中,是基于KeyedStream的联结(join)操作。DataStream在keyBy得到KeyedStream之后,可以调用.intervalJoin()来合并两条流,传入的参数同样是一个KeyedStream,两者的key类型应该一致;得到的是一个IntervalJoin类型。后续的操作同样是完全固定的:先通过.between()方法指定间隔的上下界,再调用.process()方法,定义对匹配数据对的处理操作。调用.process()需要传入一个处理函数,这是处理函数家族的最后一员:“处理联结函数”ProcessJoinFunction。
通用调用形式如下:

stream1
    .keyBy(<KeySelector>)
    .intervalJoin(stream2.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process (new ProcessJoinFunction<Integer, Integer, String(){

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });

可以看到,抽象类ProcessJoinFunction就像是ProcessFunction和JoinFunction的结合,内部同样有一个抽象方法.processElement()。与其他处理函数不同的是,它多了一个参数,这自然是因为有来自两条流的数据。参数中left指的就是第一条流中的数据,right则是第二条流中与它匹配的数据。每当检测到一组匹配,就会调用这里的.processElement()方法,经处理转换之后输出结果。
3、间隔连接实例
案例需求:在电商网站中,某些用户行为往往会有短时间内的强关联。我们这里举一个例子,我们有两条流,一条是下订单的流,一条是浏览数据的流。我们可以针对同一个用户,来做这样一个联结。也就是使用一个用户的下订单的事件和这个用户的最近十分钟的浏览数据进行一个联结查询。
(1)代码实现:正常使用

public class IntervalJoinDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
                .fromElements(
                        Tuple2.of("a", 1),
                        Tuple2.of("a", 2),
                        Tuple2.of("b", 3),
                        Tuple2.of("c", 4)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
                );


        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env
                .fromElements(
                        Tuple3.of("a", 1, 1),
                        Tuple3.of("a", 11, 1),
                        Tuple3.of("b", 2, 1),
                        Tuple3.of("b", 12, 1),
                        Tuple3.of("c", 14, 1),
                        Tuple3.of("d", 15, 1)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
                );

        // TODO interval join
        //1. 分别做keyby,key其实就是关联条件
        KeyedStream<Tuple2<String, Integer>, String> ks1 = ds1.keyBy(r1 -> r1.f0);
        KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = ds2.keyBy(r2 -> r2.f0);

        //2. 调用 interval join
        ks1.intervalJoin(ks2)
                .between(Time.seconds(-2), Time.seconds(2))
                .process(
                        new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                            /**
                             * 两条流的数据匹配上,才会调用这个方法
                             * @param left  ks1的数据
                             * @param right ks2的数据
                             * @param ctx   上下文
                             * @param out   采集器
                             * @throws Exception
                             */
                            @Override
                            public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception {
                                // 进入这个方法,是关联上的数据
                                out.collect(left + "<------>" + right);
                            }
                        })
                .print();



        env.execute();
    }
}

(2)代码实现,处理迟到的数据

public class IntervalJoinWithLateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
                .socketTextStream("hadoop102", 7777)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] datas = value.split(",");
                        return Tuple2.of(datas[0], Integer.valueOf(datas[1]));
                    }
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
                );


        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env
                .socketTextStream("hadoop102", 8888)
                .map(new MapFunction<String, Tuple3<String, Integer, Integer>>() {
                    @Override
                    public Tuple3<String, Integer, Integer> map(String value) throws Exception {
                        String[] datas = value.split(",");
                        return Tuple3.of(datas[0], Integer.valueOf(datas[1]), Integer.valueOf(datas[2]));
                    }
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple3<String, Integer, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
                );

        /**
         * TODO Interval join
         * 1、只支持事件时间
         * 2、指定上界、下界的偏移,负号代表时间往前,正号代表时间往后
         * 3、process中,只能处理 join上的数据
         * 4、两条流关联后的watermark,以两条流中最小的为准
         * 5、如果 当前数据的事件时间 < 当前的watermark,就是迟到数据, 主流的process不处理
         *  => between后,可以指定将 左流 或 右流 的迟到数据 放入侧输出流
         */

        //1. 分别做keyby,key其实就是关联条件
        KeyedStream<Tuple2<String, Integer>, String> ks1 = ds1.keyBy(r1 -> r1.f0);
        KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = ds2.keyBy(r2 -> r2.f0);

        //2. 调用 interval join
        OutputTag<Tuple2<String, Integer>> ks1LateTag = new OutputTag<>("ks1-late", Types.TUPLE(Types.STRING, Types.INT));
        OutputTag<Tuple3<String, Integer, Integer>> ks2LateTag = new OutputTag<>("ks2-late", Types.TUPLE(Types.STRING, Types.INT, Types.INT));
        SingleOutputStreamOperator<String> process = ks1.intervalJoin(ks2)
                .between(Time.seconds(-2), Time.seconds(2))
                .sideOutputLeftLateData(ks1LateTag)  // 将 ks1的迟到数据,放入侧输出流
                .sideOutputRightLateData(ks2LateTag) // 将 ks2的迟到数据,放入侧输出流
                .process(
                        new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                            /**
                             * 两条流的数据匹配上,才会调用这个方法
                             * @param left  ks1的数据
                             * @param right ks2的数据
                             * @param ctx   上下文
                             * @param out   采集器
                             * @throws Exception
                             */
                            @Override
                            public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception {
                                // 进入这个方法,是关联上的数据
                                out.collect(left + "<------>" + right);
                            }
                        });

        process.print("主流");
        process.getSideOutput(ks1LateTag).printToErr("ks1迟到数据");
        process.getSideOutput(ks2LateTag).printToErr("ks2迟到数据");



        env.execute();
    }
}

在这里插入图片描述
                      您的支持是我创作的无限动力

在这里插入图片描述
                      希望我能为您的未来尽绵薄之力

在这里插入图片描述
                      如有错误,谢谢指正;若有收获,谢谢赞美

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1063399.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【数据恢复篇】浅谈FTK Imager数据恢复功能

【数据恢复篇】浅谈FTK Imager数据恢复功能 日常取证工作中&#xff0c;常用FTK Imager制作磁盘镜像、挂载镜像等&#xff0c;但FTK Imager的数据恢复功能也是非常强大的&#xff0c;某些数据的恢复效果不输专业的数据恢复软件&#xff0c;甚至略胜一筹—【蘇小沐】 文章目录 …

项目设计:YOLOv5目标检测+机构光相机(intel d455和d435i)测距

1.介绍 1.1 Intel D455 Intel D455 是一款基于结构光&#xff08;Structured Light&#xff09;技术的深度相机。 与ToF相机不同&#xff0c;结构光相机使用另一种方法来获取物体的深度信息。它通过投射可视光谱中的红外结构光图案&#xff0c;然后从被拍摄物体表面反射回来…

【C++】:类和对象(2)

朋友们、伙计们&#xff0c;我们又见面了&#xff0c;本期来给大家解读一下有关Linux的基础知识点&#xff0c;如果看完之后对你有一定的启发&#xff0c;那么请留下你的三连&#xff0c;祝大家心想事成&#xff01; C 语 言 专 栏&#xff1a;C语言&#xff1a;从入门到精通 数…

投资理财:利率下行时代应该怎样存钱?

大家好&#xff0c;我是财富智星&#xff0c;今天跟大家分享一下当下利率下行的时代&#xff0c;钱应该怎样存&#xff0c;存哪里的问题。 一、 银行利率下行 在过去的三十年里&#xff0c;您已经逐渐适应了不断下降的利率&#xff0c;从10%到现在的1.65%。而在未来&#xff0c…

vue3 中使用echarts图表——柱状图

柱状图是比较常用的图形结构&#xff0c;所以我先收集一些精美的柱状图 一、柱状图&#xff1a;设置圆角和颜色 <template><div class"box" ref"chartDom"></div> </template> <script setup> import { ref, onMounted } fr…

C中volatile总结

在CPU处理过程中&#xff0c;需要将内存中的数据载入到寄存器中才能计算&#xff0c;所以可能涉及到一个问题&#xff0c;如果内存中的数据被更改了&#xff0c;但是寄存器还是使用的旧数据&#xff0c;这样就会造成数据的不同步。 一、volatile关键字的作用 使用volatile关键…

中级工程师职称评审中业绩材料具体有哪些呢?甘建二告诉你

十年职称甘建二&#xff0c;一门心思只聊工程师职称评定。 关注我&#xff0c;轻松拿到职称。 中级工程师职称评审中需要准备的评审材料很多&#xff0c;业绩、论文、技术总结、评审表、社保、单位等各类材料&#xff0c;其中难倒大家的就是业绩&#xff0c;业绩到底指的是什么…

【初识Linux】Linux环境配置、Linux的基本指令 一

Linux基本指令一 一、学习前提(环境配置&#xff09;①安装Xshell和云服务器推荐②Xshell用途如下图③打开Xshell 二、 Linux基本指令①whoami和who指令②pwd、ls、ls -l三个指令ls指令扩充 ③cd指令前提了解有了上面的认识&#xff0c;我们就可以开始cd指令的学习了 ④tree指令…

从零开始的C++(五)

1.类和对象的补充 当对象是const修饰的常量时&#xff0c;形参中的this是隐含的&#xff0c;那么该如何写函数才能传常量对象呢&#xff1f;如果还是按照正常的方式写&#xff0c;则会出现实参是const修饰的&#xff0c;形参没有&#xff0c;出现了权限的扩大&#xff0c;无法…

基于卷积神经网络的法线贴图生成器

在本文中&#xff0c;我们将学习如何训练卷积神经网络从彩色图像生成法线贴图。 推荐&#xff1a;用 NSDT编辑器 快速搭建可编程3D场景 1、数据和工具 我们正着手训练神经网络从彩色图像生成法线贴图。 我们将以“成对”的方式做到这一点。 这意味着我们将显示相应图像的网络对…

程序人生 / 散文分享 / 生活感悟——【追光的日子】《爷爷的12本日历》,若你也共情,欢迎在评论区分享你的故事、观点、感悟和思考!

在一切变好之前,我们总要经历一些不开心的日子,这段日子也许很长,也许只是一觉醒来。有时候,选择快乐,更需要勇气。 🎯作者主页: 追光者♂🔥 🌸个人简介: 💖[1] 计算机专业硕士研究生💖 🌿[2] 2023年城市之星领跑者TOP1(哈尔滨)🌿 🌟[3]…

Spring Cloud OpenFeign 性能优化的4个方法

OpenFeign 是 Spring 官方推出的一种声明式服务调用和负载均衡组件。它的出现就是为了替代已经进入停更维护状态的 Netflix Feign&#xff0c;是目前微服务间请求的常用通讯组件。 1.超时设置 OpenFeign 底层依赖Ribbon 框架&#xff0c;并且使用了 Ribbon 的请求连接超时时间…

打表找规律与分析判断:ARC144C

https://atcoder.jp/contests/arc144/tasks/arc144_c?langen 一开始我猜的结论是前后 k k k 个预处理&#xff0c;中间贪心。 通过打表&#xff1a; 可以发现是前面 2 k 2k 2k 连续块直接暴配&#xff0c;最后一段再用我想的贪心。 究其原因&#xff0c;其实是我们本质上…

安卓开发中遇到的奇奇怪怪的问题(四)

好久没有写这个系列了&#xff0c;感觉还是需要把日常开发中遇到的问题做一个记录总结&#xff0c;因为有些问题我当时遇到时&#xff0c;搜都搜不到&#xff0c;只能慢慢摸索。帮助他人的同时也能给自己留个备忘录。话不多说&#xff0c;凡是近一年的奇怪问题&#xff0c;我想…

Rust中的枚举和模式匹配

专栏简介&#xff1a;本专栏作为Rust语言的入门级的文章&#xff0c;目的是为了分享关于Rust语言的编程技巧和知识。对于Rust语言&#xff0c;虽然历史没有C、和python历史悠远&#xff0c;但是它的优点可以说是非常的多&#xff0c;既继承了C运行速度&#xff0c;还拥有了Java…

24 mysql all 查询

前言 这里主要是 探究一下 explain $sql 中各个 type 诸如 const, ref, range, index, all 的查询的影响, 以及一个初步的效率的判断 这里会调试源码来看一下 各个类型的查询 需要 lookUp 的记录 以及 相关的差异 此系列文章建议从 mysql const 查询 开始看 测试表结构…

【网络】路由器和交换机的区别

&#x1f341; 博主 "开着拖拉机回家"带您 Go to New World.✨&#x1f341; &#x1f984; 个人主页——&#x1f390;开着拖拉机回家_Linux,大数据运维-CSDN博客 &#x1f390;✨&#x1f341; &#x1fa81;&#x1f341; 希望本文能够给您带来一定的帮助&#x1…

小谈设计模式(21)—迭代器模式

小谈设计模式&#xff08;21&#xff09;—迭代器模式 专栏介绍专栏地址专栏介绍 迭代器模式对象分析聚合对象&#xff08;Aggregate&#xff09;迭代器对象&#xff08;Iterator&#xff09; Java程序示例程序分析12 优缺点分析优点简化了聚合对象的接口统一的遍历方式增加了代…

20秒基于Chat GPT完成工作中的小程序

1. 写在前面 GPT自从去年爆发以来&#xff0c;各大公司在大模型方面持续发力&#xff0c;行业大模型也如雨后春笋一般发展迅速&#xff0c;日常工作中比较多的应用场景还是问答模式&#xff0c;作为写程序的辅助也偶尔使用。今天看到一篇翻译的博客“我用 ChatGPT&#xff0c;…

更新Xcode 版本后运行项目出现错误 Unable to boot the Simulator 解决方法

错误截图 出现 Unable to boot the Simulator 错误原因很多&#xff0c;以下方法不一定都适用&#xff0c;我是通过以下方法解决的 打开命令终端输入以下命令&#xff0c;可能需要你输入开机密码 sudo rm -rf ~/Library/Developer/CoreSimulator/Caches