大数据Flink(八十八):Interval Join(时间区间 Join)

news2024/11/24 15:46:17

文章目录

Interval Join(时间区间 Join)


Interval Join(时间区间 Join)

Interval Join 定义(支持 Batch\Streaming):Interval Join 在离线的概念中是没有的。Interval Join 可以让一条流去 Join 另一条流中前后一段时间内的数据。

应用场景:为什么有 Regular Join 还要 Interval Join 呢?刚刚的案例也讲了,Regular Join 会产生回撤流,但是在实时数仓中一般写入的 sink 都是类似于 Kafka 这样的消息队列,然后后面接 clickhouse 等引擎,这些引擎又不具备处理回撤流的能力。所以可以理解 Interval Join 就是用于消灭回撤流的。

Interval Join 包含以下几种(以 L 作为左流中的数据标识,R 作为右流中的数据标识):

  • Inner Interval Join:流任务中,只有两条流 Join 到(满足 Join on 中的条件:两条流的数据在时间区间 + 满足其他等值条件)才输出,输出 +[L, R]
  • Left Interval Join:流任务中,左流数据到达之后,如果没有 Join 到右流的数据,就会等待(放在 State 中等),如果之后右流之后数据到达之后,发现能和刚刚那条左流数据 Join 到,则会输出 +[L, R]。事件时间中随着 Watermark 的推进(也支持处理时间)。如果发现发现左流 State 中的数据过期了,就把左流中过期的数据从 State 中删除,然后输出 +[L, null],如果右流 State 中的数据过期了,就直接从 State 中删除。
  • Right Interval Join:和 Left Interval Join 执行逻辑一样,只不过左表和右表的执行逻辑完全相反
  • Full Interval Join:流任务中,左流或者右流的数据到达之后,如果没有 Join 到另外一条流的数据,就会等待(左流放在左流对应的 State 中等,右流放在右流对应的 State 中等),如果之后另一条流数据到达之后,发现能和刚刚那条数据 Join 到,则会输出 +[L, R]。事件时间中随着 Watermark 的推进(也支持处理时间),发现 State 中的数据能够过期了,就将这些数据从 State 中删除并且输出(左流过期输出 +[L, null],右流过期输出 -[null, R])

可以发现 Inner Interval Join 和其他三种 Outer Interval Join 的区别在于,Outer 在随着时间推移的过程中,如果有数据过期了之后,会根据是否是 Outer 将没有 Join 到的数据也给输出。

使用示例:

间隔联接需要至少一个等联接谓词和在两侧限制时间的联接条件。可以通过比较两个输入表中相同类型的时间属性(即处理时间或事件时间)的两个适当的范围谓词(<, <=, >=, >)BETWEEN谓词或单个相等谓词来定义这种条件。

SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId AND
o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime

如果订单在收到订单后四个小时内发货,则上面的示例会将所有订单与其相应的发货合并在一起。

实际案例:还是刚刚的案例,曝光日志关联点击日志筛选既有曝光又有点击的数据,条件是曝光关联之后发生 4 小时之内的点击,并且补充点击的扩展参数(show inner interval click):

下面为 Inner Interval Join:

 

Flink SQL> CREATE TABLE show_log_table (
    log_id BIGINT,
    show_params STRING,
    `timestamp` bigint,
    row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
    watermark for row_time as row_time 
) WITH (
  'connector' = 'socket',
  'hostname' = 'node1',        
  'port' = '8888',
  'format' = 'csv'
);

Flink SQL> CREATE TABLE click_log_table (
  log_id BIGINT,
  click_params     STRING,
  `timestamp` bigint,
  row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
  watermark for row_time as row_time 
)
WITH (
  'connector' = 'socket',
  'hostname' = 'node1',        
  'port' = '9999',
  'format' = 'csv'
);

Flink SQL> SELECT
    show_log_table.log_id as s_id,
    show_log_table.show_params as s_params,
    click_log_table.log_id as c_id,
    click_log_table.click_params as c_params
FROM show_log_table 
INNER JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id
AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '10' SECOND AND click_log_table.row_time;

开启netcat的8888端口,输入数据:

1,hadoop,1658300800 ->2022-07-20 15:06:40

开启netcat的9999端口,输入数据:

1,zhangsan,1658300805 ->2022-07-20 15:06:45

输出结果如下:

s_id                       s_params                 c_id                       c_params

 1                         hadoop                    1                         zhangsan

9999端口,继续输入数据:

1,zhangsan,1658300811 -> 2022-07-20 15:06:51

输出结果没有反应,因为这个时间:2022-07-20 15:06:51超过了时间区间下限。

如果是 Left Interval Join:

Flink SQL> SELECT
    show_log_table.log_id as s_id,
    show_log_table.show_params as s_params,
    click_log_table.log_id as c_id,
    click_log_table.click_params as c_params
FROM show_log_table 
LEFT JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id
AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '5' SECOND AND click_log_table.row_time + INTERVAL '5' SECOND;

开启netcat的8888端口,输入数据:

1,hadoop,1658300800		->2022-07-20 15:06:40

开启netcat的9999端口,输入数据:

1,zhangsan,1658300805	->2022-07-20 15:06:45

输出结果如下:

s_id                       s_params                 c_id                       c_params
  1                         hadoop                    1                       zhangsan

8888端口,继续输入数据:

1,hadoop,1658300801		->2022-07-20 15:06:41
1,hadoop,1658300811		->2022-07-20 15:06:51

输出结果如下:

s_id                       s_params                 c_id                       c_params
 1                         hadoop                    1                       zhangsan
 1                         hadoop                    1                       zhangsan

2022-07-20 15:06:51这条数据不会有任何的输出,因为已经超过了右表的边界。

如果是 Full Interval Join:

Flink SQL> SELECT
    show_log_table.log_id as s_id,
    show_log_table.show_params as s_params,
    click_log_table.log_id as c_id,
    click_log_table.click_params as c_params
FROM show_log_table 
FULL JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id
AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '5' SECOND AND click_log_table.row_time + INTERVAL '5' SECOND;

开启netcat的8888端口,输入数据:

1,hadoop,1658300800		->2022-07-20 15:06:40
1,hadoop,1658300801		->2022-07-20 15:06:41

 开启netcat的9999端口,输入数据:

1,zhangsan,1658300805	->2022-07-20 15:06:45
1,zhangsan,1658300811	->2022-07-20 15:06:51

输出结果如下:

 s_id                       s_params                 c_id                       c_params
   1                         hadoop                    1                       zhangs
 1                         hadoop                    1                       zhangsan
  • 关于 Interval Join 的注意事项:

实时 Interval Join 可以不是 等值 join。等值 join 和 非等值 join 区别在于,等值 join 数据 shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游;非等值 join 数据 shuffle 策略是 Global,所有数据发往一个并发,然后将满足条件的数据进行关联输出 


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1050973.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

kafka伪集群部署,使用docker环境拷贝模式

线上启动容器的方式是复制容器的运行环境出来&#xff0c;然后进行运行脚本的形式 1&#xff1a;在home/kafka目录下创建如下目录 2&#xff1a;复制kafka1容器内的数据/bitnami/kafka/data&#xff0c;直接放在1992_data里面&#xff0c;同理,复制kafka2容器内的数据/bitnami/…

GitHub配置SSH key

GitHub配置SSH key Git配置信息并生成密钥 设置用户名和密码 设置用户名 git config --global user.name "用户名" 设置邮箱 git confir --global user.email "邮箱" 生成密钥 ssh-keygen -t rsa -C "邮箱" 查看密钥 到密钥所保存的位置 复…

cgroup限制cpu使用率

写一段代码&#xff0c;如下所示&#xff0c;可以看到是单核拉满情况 #include <stdio.h> #include <pthread.h>int main() { int i 0; for(;;)i; return 0; }运行起来&#xff0c;通过top命令可以看到cpu使用率大致是100%&#xff0c;如下图所示&#xff1a; …

Python相关知识点

读取zip文件 import zipfilelistAipInfo zipfile.ZipFile(xxx.zip, r) print(listZipInfo.namelist()) # 以列表列出所有压缩文件列出所有被压缩的文件&#xff0c;以及文件名、文件大小和压缩结果大小。 for info in listZipInfo.infolist():print(info.filename, info.…

数据分析三剑客之一:Numpy详解及实战

1 NumPy介绍 NumPy 软件包是Python生态系统中数据分析、机器学习和科学计算的主力军。它极大地简化了向量和矩阵的操作处理。Python的一些主要软件包&#xff08;如 scikit-learn、SciPy、pandas 和 tensorflow&#xff09;都以 NumPy 作为其架构的基础部分。除了能对数值数据…

VMware下的ubuntu虚拟机,实现虚拟机与本地硬盘间的文件互传

本次安装vmware tools工具&#xff0c;以实现二者间的文件互传。 1、打开VMware软件&#xff0c;运行Ubuntu系统虚拟机 安装过程需在ubuntu虚拟机启动的情况下&#xff0c;才能进行安装&#xff1b; 2、安装VMware Tools 在VM主菜单栏中&#xff0c;点击 “虚拟机&#xff0…

【图论C++】Floyd算法(多源最短路径长 及 完整路径)

>>>竞赛算法 /*** file * author jUicE_g2R(qq:3406291309)————彬(bin-必应)* 一个某双流一大学通信与信息专业大二在读 * * brief 一直在算法竞赛学习的路上* * copyright 2023.9* COPYRIGHT 原创技术笔记&#xff…

【AI视野·今日NLP 自然语言处理论文速览 第四十四期】Fri, 29 Sep 2023

AI视野今日CS.NLP 自然语言处理论文速览 Fri, 29 Sep 2023 Totally 45 papers &#x1f449;上期速览✈更多精彩请移步主页 Daily Computation and Language Papers MindShift: Leveraging Large Language Models for Mental-States-Based Problematic Smartphone Use Interve…

【单片机】11-步进电机和直流电机

1.直流电机 1.什么是电机 电能转换为动能 2.常见电机 &#xff08;1&#xff09;交流电机【大功率】&#xff1a;两相【200W左右】&#xff0c;三相【1000W左右】 &#xff08;2&#xff09;直流电机【小功率】&#xff1a;永磁【真正的磁铁】&#xff0c;励磁【电磁铁】 &…

Mysql 安装搭建

文章目录 Mysql 搭建一、安装包下载二、创建用户组用户和修改权限三、配置my.cnf Mysql 搭建 一、安装包下载 mysql 下载地址&#xff1a;https://downloads.mysql.com/archives/community/ 这里有所有的mysql的版本&#xff0c;下载自己需要的版本&#xff0c;我们这里下载 …

知识储备--基础算法篇-回溯法

1.回溯法介绍 1.1递归和回溯 每一个递归都包含回溯&#xff0c;回溯是一种纯暴力搜索方法。每个回溯法都可以抽象为一种N叉树。树的宽度为子集的个数&#xff0c;深度为递归返回的条件。二叉树中的递归都会有回溯算法&#xff0c;只不过有些题目用到了&#xff0c;有些没有用…

计算机竞赛 深度学习实现行人重识别 - python opencv yolo Reid

文章目录 0 前言1 课题背景2 效果展示3 行人检测4 行人重识别5 其他工具6 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; **基于深度学习的行人重识别算法研究与实现 ** 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c…

R语言进行孟德尔随机化+meta分析(2)----基于R和stata

目前不少文章用到了孟德尔随机化meta分析&#xff0c;在上一章咱们简单介绍了一下meta分析的基础知识。咱们今天来介绍一篇11分文章&#xff0c;由文章看看孟德尔随机化meta分析如何进行&#xff0c;文章的题目是&#xff1a;Appraising the causal role of smoking in multipl…

大数据Doris(二):Doris原理篇

文章目录 Doris原理篇 一、Doris 特点 1、支持标准SQL接口 2、列式存储引擎

百度统计配置详细图文教程包含siteId、百度统计AccessToken、百度统计代码获取步骤教程

一、前言 很多网友开发者都不知道百度统计siteId、百度统计token怎么获取&#xff0c;在网上找的教程都是几年前老的教程&#xff0c;因此给大家出一期详细百度统计siteId、百度统计token、百度统计代码获取详细步骤教程。 二、登录到百度统计 1.1 登录到百度统计官网 使用个…

【利用冒泡排序的思想模拟实现qsort函数】

1.qsort函数 1.1qsort函数的介绍 资源来源于cplusplus网站 1.2qsort函数的主要功能 对数组的元素进行排序 对数组中由 指向的元素进行排序&#xff0c;每个元素字节长&#xff0c;使用该函数确定顺序。 此函数使用的排序算法通过调用指定的函数来比较元素对&#xff0c;并将指…

测试用例的编写(面试常问)

作者&#xff1a;爱塔居 专栏&#xff1a;软件测试 作者简介&#xff1a;不断总结&#xff0c;才能变得更好~踩过的坑&#xff0c;不能再踩~ 文章简介&#xff1a;常见的几个测试用例。 一、淘宝购物车 二、登录页面 三、三角形测试用例 abc结果346普通三角形333等边三角形334…

文件I/O与标准I/O

如果不知道inode&#xff0c;请看这篇文章inode 我们知道当打开一个文件时&#xff0c;OS会先使用inode编号在磁盘文件系统里面去寻找这个文件&#xff0c;找到以后根据文件的属性为其创建一个内核层面的结构体来描述这个文件&#xff0c;该结构体里面含有文件的属性信息&#…

我的创作纪念日 不忘初心,砥砺前行

机缘 本来我只是记录一些自己平时安装各种软件或者组件的教程&#xff0c;以及记录平时遇到的一些bug。 没想到一些教程收到了各位同学的喜爱。 收获 这篇VMware虚拟机安装Linux教程(超详细) 深受大家喜爱。写这篇文章的初衷一是为了记录&#xff0c;二是为了分享。自己一步…

操作符详解——(比特课件)

操作符怎么搞&#xff1f;没办法掌握基础知识就可以 ** 需要课件完整版的可以关注私信我&#xff01;&#xff01;&#xff01; 你的支持就是我更新的最大动力 **