Flink SQL之Temporal Joins

news2025/1/11 2:45:34

1.Temporal Joins(时态JOIN)

时态表是一个随时间演变的表,在Flink中也称为动态表。

时态表中的行与一个或多个时态周期相关联,并且所有Flink表都是时态的(动态的)。时态表包含一个或多个版本化的表快照,它可以是跟踪更改的更改历史表(例如数据库更改日志,包含所有快照),也可以是具体化更改的维表(例如包含最新快照的数据库表)。

时态表可以分为版本表普通表

  • 版本表:如果时态表中的记录可以追踪和并访问它的历史版本,这种表我们称之为版本表,来自数据库的 changelog (如mysql binlog)可以定义成版本表,版本表内的数据始终不会自动清理,只能通过upsert触发。
  • 普通表:如果时态表中的记录仅仅可以追踪并和它的最新版本,这种表我们称之为普通表,来自数据库 或 HBase 、redis的表可以定义成普通表。

特征:

  • 只支持INNER JOIN、LEFT JOIN
  • 只有左流触发更新
  • 输出流保留时间属性

时态join类型

  • JOIN Lookup
  • JOIN 版本表

2.语法

使用FOR SYSTEM_TIME AS OF table1.proctime表示当左边表的记录与右边的维表join时,只匹配当前处理时间维表所对应的的快照数据(即关联维表当前最新的状态)

SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.column-name1

注意:Temporary table(临时表)和Temporal table(时态表)是两个不同概念。Temporary table是临时的表对象,属于当前Session,随着Session的结束而消失,该表不属于Catalog和DB。

3.JOIN Lookup

概念:Lookup join是针对于由作业流表触发,关联右侧维表来补全数据的场景 。默认情况下,在流表有数据变更,都会触发维表查询(可以通过设置维表是否缓存,来减轻查询压力),由于不保存状态,因此对内存占用较小。

特性:

  • 左侧为流表、右侧为维表
  • 流表需要指定处理时间
  • 具备lookup能力的外部系统
  • 自己实现LookupTableSource接口connector

示例:

-- 维表  
CREATE TEMPORARY TABLE users (
  `user_id` STRING,
  `name` STRING,
  `age` INT,
  `gmt_time` TIMESTAMP(3)
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/flink',
   'table-name' = 'user',
   'username' = 'root',
   'password' = '123456'
);

-- 流表
CREATE TABLE orders (
    order_id    STRING,
    price       DECIMAL(32,2),
    user_id    STRING,
    order_time  TIMESTAMP(3),
    proctime AS PROCTIME()) WITH (
  'connector' = 'kafka',
  'topic' = 'order',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

 -- 使用FOR SYSTEM_TIME AS OF table1.proc_time表示当左边表的记录与右边的维表join时,只匹配当前处理时间维表所对应的的快照数据(即关联维表当前最新的状态)
select orders.order_id,orders.price,orders.order_time,c.name  
FROM orders 
 LEFT  JOIN users FOR SYSTEM_TIME AS OF orders.proctime  AS c 
ON orders.user_id = c.user_id;

4.JOIN 版本表

版本表是可以追溯数据历史版本的表,如:数据库changelog,数据源有:mysql-binlog、kafka-upsert、oracle-cdc等。需要具备事件时间和主键两个属性。

特性:

与双流join不同,尽管构建端发生了更改,但之前的临时表结果不会受到影响。与间隔join相比,时态表join没有定义join记录的时间窗口。左侧表的记录总是在时间属性指定的时间与右侧表的版本连接。因此,构建端的行可能任意陈旧。

  • 左侧为流表、右侧为版本表
  • 两侧表都需要指定事件时间
  • 版本表的数据会持续增加

满足场景:

  • 左输入表为流表,右输入表为版本表( Changelog 动态表,即 Upsert、Retract 数据流,而非 Append 数据流)
  • 两侧表都需要设置watermark,版本表需要设置主键,主键必须包含在 JOIN 等值条件中
  • 版本表发生变更,不会触发查询结果输出,会根据主键更新临时表

示例:

用户在下订单时,需要根据订单时间的汇率,计算订单金额,其中下单是以不同的货币,需要将他输出到特定货币(CNY)

# 订单表(普通表)
CREATE TABLE orders (
    order_id    STRING,
    price       DECIMAL(32,2),
    currency    STRING,
    order_time   timestamp(3),
    WATERMARK FOR order_time AS order_time
) WITH (
  'connector' = 'kafka',
  'topic' = 'order',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'orders2ConsumerGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);


-- 汇率表 (版本表)

CREATE TABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32, 2),
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time,
    PRIMARY KEY(currency) NOT ENFORCED
) WITH  (
  'connector' = 'kafka',
  'topic' = 'currency_rates',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'currencyRatesGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'debezium-json',
  'debezium-json.schema-include' = 'true'
);

select o.order_id,o.price,o.order_time,c.currency  
FROM orders AS o 
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time  AS c 
ON o.currency = c.currency;

-- 汇率表(版本视图)

CREATE TABLE ratesHistory (
    currency STRING,
    conversion_rate DECIMAL(32, 2),
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time
) WITH  (
  'connector' = 'kafka',
  'topic' = 'currency_rates',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'currencyRatesGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

CREATE VIEW versionedRates AS
SELECT currency,conversion_rate,update_time 
FROM (
	SELECT * ,ROW_NUMBER() OVER(PARTITION BY currency
                              ORDER BY update_time DESC) AS rowNum 
  FROM ratesHistory)
where rowNum=1;

select o.order_id,o.price,o.order_time,c.currency  
FROM orders AS o 
LEFT JOIN versionedRates FOR SYSTEM_TIME AS OF o.order_time  AS c 
ON o.currency = c.currency;

总结:因为实际项目中一些表可能没有事件时间或主键,因此JOIN版本表使用的相对少一些。而在关联维表时JOIN Lookup会经常使用。需要注意维表设置缓存时间,需要根据具体业务可接受延迟时间确定缓存时间。当维表经常变化时,取到的缓存数据会有误差,需要根据具体的业务场景确定是否使用该种方式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/701472.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Oracle数据库中的包的介绍及示例

Oracle的包是一种封装存储过程&#xff0c;函数&#xff0c;变量和游标等代码对象的方法。包可以视为一组相关的程序单元&#xff0c;它们共享相同的命名空间和存储空间。包可以被看做是一个数据库程序库&#xff0c;它包含一个或多个程序单元&#xff0c;这些单元可以被视为一…

图片加载失败捕获上报及处理

图片加载失败捕获上报及处理 前端页面中加载最多的静态资源之一就是图片了&#xff0c;当出现图片加载失败时&#xff0c;非常影响用户体验。这时候我们就需要对图片是否成功加载进行判断&#xff0c;并对图片加载失败进行处理。 图片加载监听 单个捕获 HTML中的img标签可以…

vue 组件简单实例及传参交互

前言:vue 可以比较灵活的使用 html的片段&#xff0c;并将html的片段进行数据隔离&#xff0c;参数也可以互相传递&#xff0c;组件与组件之间也可以进行数据的交互 合理的使用组件可以避免重复代码或者很方便的调用第三方组件库 vue组件 简单实例组件传参实际应用父子组件交互…

SpringBoot2+Vue2实战(四)进行组件内容拆分及路由实现

一、拆分 新建包&#xff1a; Aside和Header都是组件 User为视图 Aside.vue&#xff1a; <template><el-menu :default-openeds"[1, 3]" style"min-height: 100%; overflow-x: hidden"background-color"rgb(48, 65, 86)"text-color…

I2C总线协议详解

I2C总线物理拓扑结构 I2C总线物理拓扑图 I2C 总线在物理连接上非常简单&#xff0c;分别由SDA(串行数据线)和SCL(串行时钟线)及上拉电阻组成。通信原理是通过对SCL和SDA线高低电平时序的控制&#xff0c;来 产生I2C总线协议所需要的信号进行数据的传递。在总线空闲状态时&#…

linux下使用pyqt5的QMediaPlayer制作简易播放器(存在进度条、前进、后退、暂停、打开、播放等操作)

目录 1. 问题2. 解决3. 效果 1. 问题 关于pyqt5的qmediaplayer模块制作简易播放器&#xff0c;网上很多博客都是win下的&#xff0c;放在linux系统会出问题&#xff1b;另外&#xff0c;部分博客缺少文件&#xff0c;无法直接使用。 参考博客《基于pyqt5的QMediaPlayer实现视…

android studio git使用

pull代码 我们从远程仓库拉取代码时&#xff0c;一般有下面的两个选项 当使用Android Studio拉取代码时&#xff0c;有两种常见的选项&#xff1a;合并&#xff08;merge&#xff09;传入的更改到当前分支和变基&#xff08;rebase&#xff09;。 合并&#xff08;Merge&…

【Django学习】(九)自定义校验器_单字段_多字段校验_模型序列化器类

之前学习了视图集里运用序列化器进行序列化和反序列化操作&#xff0c;定义序列化器类&#xff0c;需要继承Serializer基类或者Serializer的子类&#xff1b; 这次我们将学习如何自定义校验器、如何进行单字段或者多字段校验&#xff0c;最后初步使用模型序列化器 一、自定义…

OPPO手机无网络可支持3km通信,对讲机将被淘汰?

OPPO在2019世界移动通信大会即MWC上海发布了一项新技术&#xff0c;被称为“无网络通信技术”&#xff08;MeshTalk&#xff09;。这是OPPO自主研发的一项去中心化通讯技术&#xff0c;能够在没有蜂窝网络、Wi-Fi、蓝牙的情况下&#xff0c;实现3km内通讯。 无网通信技术 不过…

Java教程-Java异常抛出

在Java中&#xff0c;异常允许我们编写高质量的代码&#xff0c;可以在编译时检查错误而不是在运行时&#xff0c;并且我们可以创建自定义异常&#xff0c;使代码的恢复和调试更加容易。 Java的throw关键字 Java的throw关键字用于显式地抛出异常。 我们指定要抛出的异常对象。异…

华为breeze ideploy部署流程示例

https://www.cnblogs.com/withfeel/p/11640877.html 华为breeze ideploy部署流程示例

Canal对MySQL进行数据迁移

Canal简单介绍 贴个官方网址&#xff1a;阿里巴巴MySQL binlog 增量订阅&消费组件 架构图&#xff1a; 基于日志增量订阅和消费的业务包括 数据库镜像数据库实时备份索引构建和实时维护(拆分异构索引、倒排索引等)业务 cache 刷新带业务逻辑的增量数据处理 当前的 cana…

CSS知识点汇总(十一)--回流重绘

文章目录 怎么理解回流跟重绘&#xff1f;什么场景下会触发&#xff1f;1、回流和重绘是什么&#xff1f;2、如何触发回流和重绘3、如何避免回流和重绘的发生 怎么理解回流跟重绘&#xff1f;什么场景下会触发&#xff1f; 1、回流和重绘是什么&#xff1f; 在HTML中&#xf…

二十、socket套接字编程(二)——TCP

文章目录 一、TCP套接字接口&#xff08;一&#xff09;inet_aton &#xff08;和inet_addr一样&#xff0c;换一种方式而已&#xff09;&#xff08;二&#xff09;socket()&#xff08;三&#xff09;bind()&#xff08;四&#xff09;listen()&#xff08;五&#xff09;acc…

ASP.NET Core MVC -- 入门

先决条件&#xff08;开发配置二选一&#xff09;&#xff1a; 带有 ASP.NET 和 Web 开发工作负载的Visual Studio Visual Studio Code Visual Studio Code用于 Visual Studio Code 的 C#&#xff08;最新版本&#xff09;.NET 7.0 SDK 创建Web应用 visual studio ctrl F5 …

云原生之深入解析Kubernetes网络流量的流转路径

一、Kubernetes 网络要求 Kubernetes 网络模型定义了一组基本规则&#xff1a; 在不使用网络地址转换 (NAT) 的情况下&#xff0c;集群中的 Pod 能够与任意其他 Pod 进行通信&#xff1b; 在不使用网络地址转换 (NAT) 的情况下&#xff0c;在集群节点上运行的程序能与同一节点…

王道《计算机网络》思维导图汇总

第一章 1.1.1 概念与功能 1.1.2 组成与分类 1.1.3 标准化工作及相关组织 1.1.4 性能指标 速率 带宽 吞吐量 时延 时延带宽积 往返时延RTT 利用率 1.2.1 分层结构、协议、接口、服务 1.2.2 OSI参考模型 应用层 表示层 会话层 传输层 网络层 数据链路层 物理层 1.2.4 TCP/IP 参…

内核角度看IO模型

聊聊Netty那些事儿之从内核角度看IO模型 网络包接收流程 当网络数据帧通过网络传输到达网卡时&#xff0c;网卡会将网络数据帧通过DMA的方式放到环形缓冲区RingBuffer中。RingBuffer是网卡在启动的时候分配和初始化的环形缓冲队列。当RingBuffer满的时候&#xff0c;新来的数据…

【AUTOSAR】BMS开发实际项目讲解(十三)----电池管理系统碰撞安全功能和SFR

SG-BMS-7 : BMS系统应避免碰撞保护功能异常引起的安全事故&#xff08;ASIL A&#xff09; 功能框图&#xff08;SG-BMS-7&#xff09; 功能组件说明 功能组件ID 功能组件名称 描述 ASIL等级 FSC-FC-05 Relay Drive 驱动继电器开启和关断 ASIL A FSC-FC-11 Detection …

【vue】可选链运算符(?.)和空值合并运算符(??):

文章目录 一、问题一:二、问题二:三、使用:【1】空值合并运算符&#xff08;??&#xff09;【2】可选链运算符&#xff08;?.&#xff09; 一、问题一: http://www.codebaoku.com/question/question-sd-1010000042870944.html //1、npm安装 npm install babel/plugin-propo…