【入门Flink】- 10基于时间的双流联合(join)

news2025/1/16 3:59:50

统计固定时间内两条流数据的匹配情况,需要自定义来实现——可以用窗口(window)来表示。为了更方便地实现基于时间的合流操作,Flink 的 DataStrema API 提供了内置的 join 算子。

窗口联结(Window Join)

一段时间的双流合并

定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。

stream1.join(stream2)
    .where(<KeySelector>) // stream1 的 keyBy
    .equalTo(<KeySelector>) // stream2 的 keyBy
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)
public class WindowJoinDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
                .fromElements(
                        Tuple2.of("a", 1),
                        Tuple2.of("a", 2),
                        Tuple2.of("b", 3),
                        Tuple2.of("c", 4)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String,
                                        Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
                );

        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env
                .fromElements(
                        Tuple3.of("a", 1, 1),
                        Tuple3.of("a", 11, 1),
                        Tuple3.of("b", 2, 1),
                        Tuple3.of("b", 12, 1),
                        Tuple3.of("c", 14, 1),
                        Tuple3.of("d", 15, 1)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple3<String,
                                        Integer, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
                );

        DataStream<String> join = ds1.join(ds2)
                .where(r1 -> r1.f0) // ds1 的keyby
                .equalTo(r2 -> r2.f0) // ds2 的keyby
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                    /**
                     * 关联上的数据,调用 join 方法
                     * @param first ds1 的数据
                     * @param second ds2 的数据
                     */
                    @Override
                    public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {
                        return first + "<----->" + second;
                    }
                });
        join.print();
        env.execute();
    }
}

输出:

image-20231112153403293

window join:

  1. 两条流落在同一个时间窗口范围内才能匹配
  2. 根据 keyBy 的 key,来进行匹配关联
  3. 只能拿到匹配上的数据,类似有固定时间范围的inner join

间隔联结(Interval Join)

存在如下场景:两条流匹配的两个数据有可能刚好“卡在”窗口边缘两侧,窗口内就都没有匹配了,可以使用“间隔联结”(interval join)来解决。

原理

给定两个时间点,分别叫作间隔的“上界”(upperBound)“下界”(lowerBound);可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp +upperBound], 即以 a 的时间戳为中心,下至下界点、上至上界点的一个闭区间:这段时间作为可以匹配另一条流数据的“窗口”范围。

匹配的条件为:

a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

image-20231112154002415

stream1
.keyBy(<KeySelector>)
 // KeyedStream 调用   
.intervalJoin(stream2.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String(){
    @Override
    public void processElement(Integer left, Integer right,Context ctx, Collector<String> out){
    	out.collect(left + "," + right);
    }
});

处理迟到数据,可以使用左右侧输出流

完整代码:

public class IntervalJoinWithLateDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
                .socketTextStream("hadoop102", 7777)
                .map((MapFunction<String, Tuple2<String, Integer>>) value -> {
                    String[] datas = value.split(",");
                    return Tuple2.of(datas[0], Integer.valueOf(datas[1]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String,
                                        Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
                );


        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env
                .socketTextStream("hadoop102", 8888)
                .map((MapFunction<String, Tuple3<String, Integer, Integer>>) value -> {
                    String[] datas = value.split(",");
                    return Tuple3.of(datas[0], Integer.valueOf(datas[1]), Integer.valueOf(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple3<String, Integer, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
                );

        /**
         * 【Interval join】
         * 1、只支持事件时间
         * 2、指定上界、下界的偏移,负号代表时间往前,正号代表时间往后
         * 3、process 中,只能处理 join 上的数据
         * 4、两条流关联后的 watermark,以两条流中最小的为准
         * 5、如果 当前数据的事件时间 < 当前的 watermark,就是迟到数据,主流的 process 不处理
         * => between 后,可以指定将 左流 或 右流的迟到数据放入侧输出流
         * */
        //1. 分别做 keyby,key 其实就是关联条件
        KeyedStream<Tuple2<String, Integer>, String> ks1 = ds1.keyBy(r1 -> r1.f0);
        KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = ds2.keyBy(r2 -> r2.f0);
        //2. 调用 interval join
        // 左右测输出流迟到标签
        OutputTag<Tuple2<String, Integer>> ks1LateTag = new OutputTag<>("ks1-late", Types.TUPLE(Types.STRING, Types.INT));
        OutputTag<Tuple3<String, Integer, Integer>> ks2LateTag = new OutputTag<>("ks2-late", Types.TUPLE(Types.STRING, Types.INT, Types.INT));
        SingleOutputStreamOperator<String> process = ks1
                .intervalJoin(ks2)
                .between(Time.seconds(-2), Time.seconds(2)) // 指定上下界
                .sideOutputLeftLateData(ks1LateTag) // 将ks1的迟到数据,放入侧输出流
                .sideOutputRightLateData(ks2LateTag) // 将ks2的迟到数据,放入侧输出流
                .process(
                        new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                            /**
                             * 两条流的数据匹配上,才会调用这个方法
                             * @param left ks1 的数据
                             * @param right ks2 的数据
                             * @param ctx 上下文
                             * @param out 采集器
                             */
                            @Override
                            public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception {
                                // 进入这个方法,是关联上的数据
                                out.collect(left + "<------>" + right);
                            }
                        });
        process.print("主流");
        process.getSideOutput(ks1LateTag).printToErr("ks1迟到数据");
        process.getSideOutput(ks2LateTag).printToErr("ks2迟到数据");
        env.execute();

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1200251.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Acer宏碁Aspire A715-75G笔记本工厂模式原厂Windows10预装OEM系统2004带恢复功能

下载链接&#xff1a;https://pan.baidu.com/s/1nJFd25lElc1VAPf_RqSDYA?pwdd05h 提取码&#xff1a;d05h 原装出厂系统自带所有驱动、Office办公软件、出厂主题壁纸、系统属性Acer宏基专属的LOGO标志、 Acer Care Center、Quick Access等预装程序 所需要工具&#xff1a…

Linux文件系统(1)

Linux文件系统(1) &#x1f4df;作者主页&#xff1a;慢热的陕西人 &#x1f334;专栏链接&#xff1a;Linux &#x1f4e3;欢迎各位大佬&#x1f44d;点赞&#x1f525;关注&#x1f693;收藏&#xff0c;&#x1f349;留言 本博客主要内容从系统层面重新认识我们的文件系统 文…

关于值传递和引用传递的问题记录

目录 1. 问题概述 1.1 测试 1.2 结果 2. ArrayList和Arrays.ArrayList 1. 问题概述 最近忙着写论文很久没更新了&#xff0c;趁现在有时间简单记录一下最近遇到的一个坑。 对于Java中的List<>类型的对象&#xff0c;按我以前理解是引用传递&#xff0c;但有一点要注…

Java12新增特性

前言 前面的文章&#xff0c;我们对Java9、Java10、Java11的特性进行了介绍&#xff0c;对应的文章如下 Java9新增特性 Java10新增特性 Java11新增特性 今天我们来介绍一下Java12版本的新增特性 版本介绍 Java 12是Java SE的第12个版本&#xff0c;于2019年3月19日发布。这个…

fastANI-基因组平均核酸一致性(ANI)计算

文章目录 简介安装使用Many to Man-使用基因组路径作为输入One to One 结果其他参数说明可视化两个基因组之间的保守区域并行化 简介 FastANI 是为快速计算全基因组平均核苷酸同一性&#xff08;Average Nucleotide Identity&#xff0c;ANI&#xff09;而开发的&#xff0c;无…

【第七章】软件设计师 之 程序设计语言与语言程序处理程序基础

文章底部有个人公众号&#xff1a;热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享&#xff1f; 踩过的坑没必要让别人在再踩&#xff0c;自己复盘也能加深记忆。利己利人、所谓双赢。 1、前言 正规式 2、编译过程 编译型&…

深度解析找不到msvcp120.dll相关问题以及解决方法

​在计算机使用过程中&#xff0c;我们经常会遇到一些错误提示&#xff0c;其中之一就是“msvcp120.dll丢失”。这个错误通常会导致某些应用程序无法正常运行&#xff0c;给用户带来很大的困扰。那么&#xff0c;如何解决msvcp120.dll丢失的问题呢&#xff1f;本文将为大家介绍…

xml schema中的sequence的含义

https://www.w3.org/TR/xmlschema-1/#element-sequence xml schema中的sequence的含义&#xff1a;包含的元素必须按规定的顺序出现。通过属性maxOccurs和minOccurs可以定义最多、最少出现的次数。最多可以定义不限制次数&#xff0c;最少可以定义0次。默认是最少出现1次&…

pyTorch Hub 系列#4:PGAN — GAN 模型

一、主题描述 2014 年生成对抗网络的诞生及其对任意数据分布进行有效建模的能力席卷了计算机视觉界。两人范例的简单性和推理时令人惊讶的快速样本生成是使 GAN 成为现实世界中实际应用的理想选择的两个主要因素。 然而&#xff0c;在它们出现后的很长一段时间内&#xff0c;GA…

02:2440---时钟体系

目录 一:时钟控制 1:基本概念 2:时钟结构图 3:结构图分析 4:总线 5:寄存器 A:FCLK--MPLLCON B:HCLK和PCLK--CLKDIVN C:注意 二:上电复位 1:上电复位 2:时钟选择 三:代码 一:时钟控制 1:基本概念 S3C2440A中的时钟控制逻辑可以产生所需的时钟信号&#xff0c;包括C…

node插件express(路由)的插件使用(二)——cookie 和 session的基本使用区别

文章目录 前言一、express 框架中的 cookie0、cookie的介绍和作用1. 设置cookie2.删除cookie3.获取cookie&#xff08;1&#xff09;安装cookie-parser&#xff08;2&#xff09;导入cookie-parser&#xff08;3&#xff09;注册中间件&#xff08;4&#xff09;获取cookie&…

类和对象(3):拷贝构造函数

引入&#xff1a; class Stack { public:Stack(int capacity 3){_a (int*)malloc(sizeof(int) * capacity);if (nullptr _a){perror("malloc");exit(-1);}_top 0;_capacity capacity;}~Stack(){free(_a);_top _capacity 0;_a nullptr;}private:int* _a;int _…

终止进程,GPU显存仍被占用 | kill -9彻底杀死进程

问题描述&#xff1a;在Linux终端把进程终止后&#xff0c;发现显存没有被释放出来&#xff01; 显示所有进程 ps aux|grep python杀死单个进程 kill -9 PID杀死多个进程 kill -9 PID PID PID...结果如下&#xff0c;显存已经被释放出来了&#xff01;

【操作系统】1.1 操作系统的基础概念、功能以及特性

&#x1f4e2;&#xff1a;如果你也对机器人、人工智能感兴趣&#xff0c;看来我们志同道合✨ &#x1f4e2;&#xff1a;不妨浏览一下我的博客主页【https://blog.csdn.net/weixin_51244852】 &#x1f4e2;&#xff1a;文章若有幸对你有帮助&#xff0c;可点赞 &#x1f44d;…

gpt支持json格式的数据返回(response_format: ‘json_object‘)

Api.h5.chatCreateChatCompletion({model: gpt-3.5-turbo-1106,token: sk-f4fe8b67-fcbe-46fd-8cc9-fd1dac5d6d59,messages: [{role: user,content:使用json格式返回十二生肖&#xff0c;包含中文名和英文名&#xff0c;[{id:"1", enName:"", cnName: &quo…

Python 多进程多线程

多任务 并发&#xff1a;在一段时间内交替执行多个任务 并行&#xff1a;在一段时间内同事一起执行多个任务 进程 Process 进程&#xff1a;一个程序运行在系统之上&#xff0c; 便称这个程序喂一个运行进程&#xff0c;并分配进程ID方便系统管理。操作系统进行资源分配和调…

Android开发之apk瘦身计划

为什么apk越来越大&#xff1f; 1.项目不断发展&#xff0c;功能越多&#xff0c;代码量增加的同时&#xff0c;资源文件也在不断的增多。 2.app支持的主流dpi越来越多&#xff0c;如ldpi、mdpi、hdpi、xh xxh xxxh等等&#xff0c;间接导致资源增多。 3.引入的第三方sdk或开…

Linux进程空间地址

程序地址空间回顾 问题引入 ---------------明天再写0.0

Linux提取RPM包文件

在讲解如何从 RPM 包中提取文件之前&#xff0c;先来系统学习一下 cpio 命令。cpio 命令用于从归档包中存入和读取文件&#xff0c;换句话说&#xff0c;cpio 命令可以从归档包中提取文件&#xff08;或目录&#xff09;&#xff0c;也可以将文件&#xff08;或目录&#xff09…