[尚硅谷 flink] 基于时间的合流——双流联结(Join)

news2024/11/27 16:53:16

文章目录

      • 8.1 窗口联结(Window Join)
      • 8.2 **间隔联结(Interval Join)**

8.1 窗口联结(Window Join)

Flink为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。

package org.example.watermark;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowJoinDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
            .fromElements(
                Tuple2.of("a", 1),
                Tuple2.of("a", 2),
                Tuple2.of("b", 3),
                Tuple2.of("c", 4)
            )
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<Tuple2<String, Integer>>forMonotonousTimestamps()
                    .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
            );


        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env
            .fromElements(
                Tuple3.of("a", 1, 1),
                Tuple3.of("a", 11, 1),
                Tuple3.of("b", 2, 1),
                Tuple3.of("b", 12, 1),
                Tuple3.of("c", 14, 1),
                Tuple3.of("d", 15, 1)
            )
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
                    .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
            );

        // TODO window join
        // 1. 落在同一个时间窗口范围内才能匹配
        // 2. 根据keyby的key,来进行匹配关联
        // 3. 只能拿到匹配上的数据,类似有固定时间范围的inner join
        DataStream<String> join = ds1.join(ds2)
            .where(r1 -> r1.f0)  // ds1的keyby
            .equalTo(r2 -> r2.f0) // ds2的keyby
            .window(TumblingEventTimeWindows.of(Time.seconds(10)))
            .apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                /**
                 * 关联上的数据,调用join方法
                 * @param first  ds1的数据
                 * @param second ds2的数据
                 * @return
                 * @throws Exception
                 */
                @Override
                public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {
                    return first + "<----->" + second;
                }
            });

        join.print();

        env.execute();
    }
}

其实仔细观察可以发现,窗口join的调用语法和我们熟悉的SQL中表的join非常相似:
SELECT * FROM table1 t1, table2 t2 WHERE t1.id = t2.id;
这句SQL中where子句的表达,等价于inner join … on,所以本身表示的是两张表基于id的“内连接”(inner join)。而Flink中的window join,同样类似于inner join。也就是说,最后处理输出的,只有两条流中数据按key配对成功的那些;如果某个窗口中一条流的数据没有任何另一条流的数据匹配,那么就不会调用JoinFunction的.join()方法,也就没有任何输出了。

8.2 间隔联结(Interval Join)

  • 间隔联结的原理

间隔联结具体的定义方式是,我们给定两个时间点,分别叫作间隔的“上界”(upperBound)和“下界”(lowerBound);于是对于一条流(不妨叫作A)中的任意一个数据元素a,就可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp + upperBound],即以a的时间戳为中心,下至下界点、上至上界点的一个闭区间:我们就把这段时间作为可以匹配另一条流数据的“窗口”范围。所以对于另一条流(不妨叫B)中的数据元素b,如果它的时间戳落在了这个区间范围内,a和b就可以成功配对,进而进行计算输出结果。所以匹配的条件为:
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
这里需要注意,做间隔联结的两条流A和B,也必须基于相同的key;下界lowerBound应该小于等于上界upperBound,两者都可正可负;间隔联结目前只支持事件时间语义。
如下图所示,我们可以清楚地看到间隔联结的方式:
image.png
下方的流A去间隔联结上方的流B,所以基于A的每个数据元素,都可以开辟一个间隔区间。我们这里设置下界为-2毫秒,上界为1毫秒。于是对于时间戳为2的A中元素,它的可匹配区间就是[0, 3],流B中有时间戳为0、1的两个元素落在这个范围内,所以就可以得到匹配数据对(2, 0)和(2, 1)。同样地,A中时间戳为3的元素,可匹配区间为[1, 4],B中只有时间戳为1的一个数据可以匹配,于是得到匹配数据对(3, 1)。

所以我们可以看到,间隔联结同样是一种内连接(inner join)。与窗口联结不同的是,interval join做匹配的时间段是基于流中数据的,所以并不确定;而且流B中的数据可以不只在一个区间内被匹配。

stream1
    .keyBy(<KeySelector>)
    .intervalJoin(stream2.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process (new ProcessJoinFunction<Integer, Integer, String(){

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });
  • 处理迟到数据
 //2. 调用 interval join
OutputTag<Tuple2<String, Integer>> ks1LateTag = new OutputTag<>("ks1-late", Types.TUPLE(Types.STRING, Types.INT));
OutputTag<Tuple3<String, Integer, Integer>> ks2LateTag = new OutputTag<>("ks2-late", Types.TUPLE(Types.STRING, Types.INT, Types.INT));

SingleOutputStreamOperator<String> process = ks1.intervalJoin(ks2)
        .between(Time.seconds(-2), Time.seconds(2))
        .sideOutputLeftLateData(ks1LateTag)  // 将 ks1的迟到数据,放入侧输出流
        .sideOutputRightLateData(ks2LateTag) // 将 ks2的迟到数据,放入侧输出流
        .process(
                new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                    /**
                     * 两条流的数据匹配上,才会调用这个方法
                     * @param left  ks1的数据
                     * @param right ks2的数据
                     * @param ctx   上下文
                     * @param out   采集器
                     * @throws Exception
                     */
                    @Override
                    public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception {
                        // 进入这个方法,是关联上的数据
                        out.collect(left + "<------>" + right);
                    }
                });

process.print("主流");
process.getSideOutput(ks1LateTag).printToErr("ks1迟到数据");
process.getSideOutput(ks2LateTag).printToErr("ks2迟到数据");



env.execute();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1577247.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

虚拟主机WordPress网站安装教程

一般的企业官网&#xff0c;简站WordPress小编都推荐使用虚拟主机&#xff0c;用虚拟主机搭建一般的WordPress企业官网足够用了。最主要的好处是使用虚拟主机可以省去了主机维护的成本。 下面是以简站WordPress主题在虚拟主机搭建企业官网为例子&#xff0c;写的一个教程&…

无人机详细操作方法:

不同型号的无人机操作方法会有所区别&#xff0c;以云卓无人机为例&#xff0c;为你介绍其操作方法&#xff1a; 1. 打开机臂&#xff0c;安装护架、红外避障头&#xff0c;盖上后盖&#xff1b; 2. 打开飞机和遥控器&#xff0c;将两个油门的外八节进行校准&#xff1b; 3.…

Echarts-实现地图并轮播地图信息

目录 ./map-geojson/jinhua.json./CenterMap.vue./center.vue 使用地图组件效果 ./map-geojson/jinhua.json {"type":"FeatureCollection","features":[{"type":"Feature","properties":{"adcode":330…

ZLMediaKit ubantu 下编译

1、获取代码 #国内用户推荐从同步镜像网站gitee下载 git clone --depth 1 https://gitee.com/xia-chu/ZLMediaKit cd ZLMediaKit #千万不要忘记执行这句命令 git submodule update --init二、依赖库 Debian系(包括ubuntu&#xff09;系统下安装依赖的方法&#xff1a; #除了…

什么是 SD NAND?

什么是 CS 创世 SD NAND 呢&#xff1f;很多的朋友一直想知道这个问题。 什么是 CS 创世 SD NAND 呢&#xff1f;很多的朋友一直想知道这个问题。今天我们雷龙也精心准备了 SD NAND 的一个介绍。其实很多工程师朋友对 CS 创世 SD NAND 有很多称呼。比如&#xff1a;贴片式 T 卡…

Ethernet 汇总

Ethernet系统 硬件最小系统 CPU:可以是复杂的芯片,也可以是小的单片机DMA:用于减轻CPU负担,搬运数据系统Memory<->FIFOMAC:可以集成在芯片里面,用于CPU和PHY之间的通信MII:接口用于MAC和PHY的通信,包括控制MDIO和数据DataPHY:模拟器件,最底层,数据收发源头软件…

AI智慧医疗:探索机器学习在医疗保健中的应用与进展

&#x1f9d1; 作者简介&#xff1a;阿里巴巴嵌入式技术专家&#xff0c;深耕嵌入式人工智能领域&#xff0c;具备多年的嵌入式硬件产品研发管理经验。 &#x1f4d2; 博客介绍&#xff1a;分享嵌入式开发领域的相关知识、经验、思考和感悟,欢迎关注。提供嵌入式方向的学习指导…

TEMU平台重要合规内容!关于 RSL Report 铅镉/ RSL Phthalate邻苯

TEMU平台重要合规内容&#xff01;关于 RSL Report 铅镉 / RSL Phthalate 邻苯 的资质认证&#xff01; 欧盟对进口和在欧盟内销售的电子产品都有明确的安全、环保、健康等相关标准要求。 对于可能存在的产品有害物质&#xff0c;欧盟是明确禁止销售的。 因此拼多多跨境平台…

停用net stop mysql 服务名无效。

net stop mysql服务名 错误截图 解决方案 1. 按下 Win R 键&#xff0c;然后输入 services.msc 并按下 Enter 键&#xff0c;打开服务管理器 &#xff0c;查找与 MySQL 相关的服务 重新运行 net stop mysql服务名

python学习笔记——类

1. 类和对象**** 类、类属性、类方法不需要实例化就可以直接访问 实例相关&#xff0c;如实例属性、实例方法必须实例化后才可以访问 1.1. 类、类属性、实例属性、私有属性**** 1.1.1. 定义**** 类就是拥有相同属性和功能对象的集合 动物&#xff1a;猫、狗、鸡 人类&…

PCA算法(Principal Component Analysis)揭秘

经典PCA算法 PCA算法的应用包括降维、有损数据压缩、特征抽取、数据可视化等。目前PCA算法有两个通用定义&#xff0c;能殊途同归&#xff0c;得到相同的结果。一方面&#xff0c;我们可以用正交投影来定义PCA&#xff0c;即将数据投影到更低维的线性子空间&#xff0c;也被称…

【LAMMPS学习】八、基本知识的讨论(1.2)可视化 LAMMPS 快照

8. 基本知识的讨论 此部分描述了如何使用 LAMMPS 为用户和开发人员执行各种任务。术语表页面还列出了 MD 术语&#xff0c;以及相应 LAMMPS 手册页的链接。 LAMMPS 源代码分发的 examples 目录中包含的示例输入脚本以及示例脚本页面上突出显示的示例输入脚本还展示了如何设置和…

【单】Unity _RPG项目中的问题

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a; ⭐…

CVE-2023-2928 DedeCMS 文件包含漏洞getshell 漏洞分析

DedeCMS&#xff08;也称为织梦CMS&#xff09;是一款基于PHPMySQL的开源内容管理系统。 在 DedeCMS 5.7.106 及之前的版本中发现一个漏洞。它已被宣布为关键。受此漏洞影响的是未知功能的文件uploads/dede/article_allowurl_edit.php。对参数 allurls 的操作会导致代码注入。…

用three.js 制作鸡你太美游戏——只因大逃杀

看到网上有好多关于坤坤的游戏&#xff0c;自己突发奇想&#xff0c;用前端的three.js制作一款同人游戏&#xff0c;因是初学three.js&#xff0c;开发的游戏内容相对较少&#xff0c;但作为一个真正的man必须得玩一下 链接&#xff1a; 只因大逃杀https://www.jaron.top/app…

【ZBrush】制作章鱼练习 01——头部

目录 本篇效果 步骤 一、准备工作 二、制作头部外形 三、制作眼睛 本篇效果 步骤 一、准备工作 我们需要制作的模型如下 首先创建一个球体 点击编辑对象 点击“生成 多边形网格物体” 选择“MatCap Gray”材质&#xff0c;颜色设置为白色 要打开“Mrgb”通道和“绘制”选…

MySQL学习笔记------多表查询

目录 多表关系 一对多 多对多 一对一 多表查询 概述 分类 内连接&#xff08;交集&#xff09; 隐式内连接 显式内连接 ​编辑 外连接&#xff08;from后为左表&#xff0c;join后为右表&#xff09; 左外连接 右外连接 自连接 联合查询&#xff08;union&#…

怎么寻找免费3D模型网站?

无论是建筑设计、产品设计还是动画制作&#xff0c;3D模型都发挥着不可或缺的作用。然而&#xff0c;寻找高质量且免费的3D模型并非易事。那么怎么寻找免费3D模型网站? 1.网络搜索&#xff1a;在搜索引擎中输入“免费3D模型网站”或类似的关键词&#xff0c;可以得到一系列相关…

【GEE】下载研究区的Landsat8去云清晰影像

如果你要绘制研究区概况或者是做一个产品的比较 需要整个研究区的Landsat8影像的话 你可以使用下面代码 // 定义研究区域 var table ee.FeatureCollection("projects/ee-1261423515/assets/EHEimage/EHE"); function rmCloud (image){var qa image.select(pixel_…

顺子日期(StringBuffer)

题目 public class Main {static int[] date new int[] {0,31,28,31,30,31,30,31,31,30,31,30,31};public static boolean res(StringBuffer s) {String ss s.toString();//yyrrfor(int i0;i<2;i) {int x Integer.parseInt(s.charAt(i)"");int y Integer.par…