Flink Sql:四种Join方式详解(基于flink1.15官方文档)

news2024/11/27 3:41:20

JOINs

flink sql主要有四种连接方式,分别是Regular Joins、Interval Joins、Temporal Joins、lookup join

1、Regular Joins(常规连接 )

这种连接方式和hive sql中的join是一样的,包括inner join,left join,right join,full join

1、指定数据源建立students表
CREATE TABLE students (
  id STRING,
  name STRING,
  age INT,
  sex STRING,
  clazz STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'students', -- 指定topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表
  'properties.group.id' = 'testGroup', -- 指定消费者组
  'scan.startup.mode' = 'latest-offset', -- 指定读取数据的位置为最新生成的数据
  'format' = 'csv' -- 指定数据的格式
);

2、kafka生产students表数据
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
1500100001,施笑槐,22,女,文科六班
1500100002,吕金鹏,24,男,文科六班
1500100003,单乐蕊,22,女,理科六班


3、创建关联表scores
CREATE TABLE scores (
  sid STRING,   
  cid STRING,     --学科id
  score INT     
) WITH (
  'connector' = 'kafka',
  'topic' = 'scores', -- 指定topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表
  'properties.group.id' = 'testGroup', -- 指定消费者组
  'scan.startup.mode' = 'latest-offset', -- 指定读取数据的位置
  'format' = 'csv' -- 指定数据的格式
);

4、kafka生产scores数据
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
1500100001,1000001,98
1500100001,1000002,56
1500100002,1000001,139
1500100002,1000002,102
1500100004,1000001,42
1500100004,1000002,142

-- inner jion   两边数据都不为null的才会关联
select 
a.id,a.name,b.sid,b.score
from 
students as a
inner join
scores as b
on a.id=b.sid;

-- left join/right join    保证左边/右边数据的完整性
select 
a.id,a.name,b.sid,b.score
from 
students as a
right join
scores as b
on a.id=b.sid;

-- full join       保证两边数据的完整性
select 
a.id,a.name,b.sid,b.score
from 
students as a
full join
scores as b
on a.id=b.sid;



-- 注意:
-- 常规连接,会将两个表的数据一直保存在状态中,时间长了,状态会越来越大,导致任务执行失败,通常在批处理中使用,因为批处理没有状态这个概念。

为了避免状态过大可能会导致的任务失败问题,我们可以设置状态有效期
-- 状态有效期,状态在flink中保存的时间,但是如果sql中除了关联操作还有聚合这样也需要将数据保存在状态中的操作,状态有效期设置的太短可能会让聚合这样的操作失败,设置的太长延迟也会增加。所以,状态保留多久需要根据实际业务分析
SET 'table.exec.state.ttl' = '20000';
设置该参数后,那么只有在20秒内到达的数据才会被保存到状态中进行关联。

inner join结果:

left join 结果:

right join结果:

full join结果:

2、Interval Joins(间隔连接

Interval Joins:在一段时间内关联

对于流式查询,与常规连接相比,间隔连接仅支持具有时间属性的追加表。由于时间属性是拟单调递增的,因此 Flink 可以从其状态中删除旧值,而不会影响结果的正确性。

这种方式可以变相弥补Regular Joins中时间长了状态过大的问题。

CREATE TABLE students_proctime (
    id STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING,
    proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'students', -- 指定topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表
  'properties.group.id' = 'testGroup', -- 指定消费者组
  'scan.startup.mode' = 'latest-offset', -- 指定读取数据的位置
  'format' = 'csv' -- 指定数据的格式
);

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
1500100001,施笑槐,22,女,文科六班
1500100002,吕金鹏,24,男,文科六班
1500100003,单乐蕊,22,女,理科六班

CREATE TABLE scores_proctime (
    sid STRING,
    cid STRING,
    score INT,
    proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'scores', -- 指定topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表
  'properties.group.id' = 'testGroup', -- 指定消费者组
  'scan.startup.mode' = 'latest-offset', -- 指定读取数据的位置
  'format' = 'csv' -- 指定数据的格式
);
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
1500100001,1000001,98
1500100001,1000002,56
1500100002,1000001,139
1500100002,1000002,102
1500100004,1000001,42
1500100004,1000002,142


select a.id,a.name,b.sid,b.score from 
students_proctime a, scores_proctime b
where a.id=b.sid
-- a表的时间需要在b表时间10秒内或b表的时间需要在a表时间10秒内
and (
	a.proctime BETWEEN b.proctime - INTERVAL '10' SECOND AND b.proctime
    or b.proctime BETWEEN a.proctime - INTERVAL '10' SECOND AND a.proctime
);

3、Temporal Joins(时态连接)

这种关联方式是专门用来关联时态表的。

  • Temporal Joins(时态连接)是在流式计算或数据处理中,对两个或多个随时间变化的表(也称为动态表或时态表)进行连接的操作。这些表包含随时间变化的数据,并且行与一个或多个时态周期相关联。

在我们生活中最常见的时态表就是汇率表,汇率随着时间变化而变化。

 

案例:

例如,假设我们有一张订单表,每张订单的价格都采用不同的货币。为了正确地将此表标准化为单一货币(如美元),每张订单都需要与下订单时相应的货币兑换率相结合。

1、创建订单表
CREATE TABLE orders (
    order_id    STRING,
    price       DECIMAL(32,2),
    currency    STRING,    --币种
    order_time  TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders', -- 指定topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表
  'properties.group.id' = 'testGroup', -- 指定消费者组
  'scan.startup.mode' = 'latest-offset', -- 指定读取数据的位置
  'format' = 'csv' -- 指定数据的格式
);

2、订单表数据
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic orders
o_001,1,EUR,2024-06-06 12:00:00
o_002,100,EUR,2024-06-06 12:00:07
o_003,200,EUR,2024-06-06 12:00:16
o_004,10,EUR,2024-06-06 12:00:21
o_005,20,EUR,2024-06-06 12:00:25

3、创建汇率表
CREATE TABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32, 2),
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time,
    PRIMARY KEY(currency) NOT ENFORCED -- 主键,区分不同的汇率
) WITH (
  'connector' = 'kafka',
  'topic' = 'currency_rates1', -- 指定topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表
  'properties.group.id' = 'testGroup', -- 指定消费者组
  'scan.startup.mode' = 'earliest-offset', -- 指定读取数据的位置
  'format' = 'canal-json' -- 指定数据的格式
);

4、向汇率表中添加数据
insert into currency_rates
values
('EUR',0.12,TIMESTAMP'2024-06-06 12:00:00'),
('EUR',0.11,TIMESTAMP'2024-06-06 12:00:09'),
('EUR',0.15,TIMESTAMP'2024-06-06 12:00:17'),
('EUR',0.14,TIMESTAMP'2024-06-06 12:00:23');

kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic currency_rates

-- 使用常规关联方式关联时态表只能关联到最新的数据
select 
a.price,a.order_time,b.conversion_rate,b.update_time
from 
orders as a
join
currency_rates as b
on a.currency=b.currency;

-- 时态表join
-- FOR SYSTEM_TIME AS OF a.order_time: 使用a表的时间到b表中查询对应时间段的数据
select 
a.price,a.order_time,b.conversion_rate,b.update_time
from 
orders as a
join
currency_rates FOR SYSTEM_TIME AS OF a.order_time as b 
on a.currency=b.currency;

常规join结果:

时态join结果:

4、lookup join(查找连接

Lookup Join,也称为维表 Join,通常用于从外部系统查询的数据表。连接要求一个表具有处理时间属性,另一个表由查找源连接器支持。

具体来说:

lookup join用于流表(动态表)关联维度表

流表:动态表

维度表:不怎么变化的变,维度表的数据一般可以放在hdfs或者mysql等外部数据源


扩展:流表、事实表、维度表

-- 流表(动态表)
1、流表的数据来源通常是实时数据流,这些数据流可以来自各种数据源,如 Kafka、RabbitMQ、Kinesis 等。Flink可以通过数据源连接器(Source Connectors)将这些实时数据流接入到 Flink 系统中
2、与传统数据库中的表不同,流表的行是动态生成的,随着数据流的持续产生而不断增加


-- 维度表
1、主要提供数据的分析角度,包含了描述业务环境的属性信息,如时间、地理、产品等。
2、维度表:通常比较宽(包含多个属性列),但行数相对较少,因为维度表中的每一行通常代表一个具体的业务实体或类别,如一个商品、一个客户、一个日期等。
3、维度表与事实表之间通过外键相关联,共同构成了星型模型或雪花模型。事实表中的外键用于与维度表中的主键相匹配,从而提供数据的上下文和分类信息。
4、维度表存储的是对数据的描述性信息,这些信息通常不随时间变化,或者变化不频繁。例如,商品的品牌、型号、颜色等属性一旦确定后很少会发生变化。但在某些情况下,如新产品上市或促销活动,可能需要更新维度表以添加新的维度成员。

-- 事实表
1、存储了实际的数据度量值,如销售额、订单数量等。事实表是数据分析的核心,包含了所有用于分析的数据指标。
2、通常比较窄(包含较少的列),但行数非常多,因为事实表中的每一行通常代表一个具体的事件或交易,如一个订单、一次点击等。
3、事实表存储的是度量数据(即指标),这些数据会随时间变化,并且经常需要被汇总和分析。例如,销售额、订单数量、点击量等指标会随着业务活动的进行而不断更新。
4、事实表的数据更新频率通常较高,因为事实数据会随着业务活动的进行而不断产生。例如,每当有新的订单产生时,都需要在事实表中插入一条新的记录。

 

1、创建分数表
CREATE TABLE scores (
    sid INT,
    cid STRING,
    score INT,
    proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'scores', -- 指定topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表
  'properties.group.id' = 'testGroup', -- 指定消费者组
  'scan.startup.mode' = 'latest-offset', -- 指定读取数据的位置
  'format' = 'csv' -- 指定数据的格式
);

2、生产分数表数据
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
1500100001,1000001,98
1500100001,1000003,137

3、建立学生表,我们将学生表当作维度表放在mysql中
CREATE TABLE students_test (
    id INT,
    name STRING,
    age INT,
    gender STRING,
    clazz STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/bigdata29',
    'table-name' = 'students_test',
    'username' ='root',
    'password' = '123456',
    'lookup.cache.max-rows' = '1000', -- 最大缓存行数
    'lookup.cache.ttl' ='10000' -- 缓存过期时间
);


学生表数据
1500100001,施笑槐,22,女,文科六班



-- 使用常规关联方式
-- 维表的数据只在任务启动的时候读取一次,后面不再实时读取,
-- 只能关联到任务启动时读取的数据
-- 一旦mysql中的学生表更新数据,但是关联的学生表数据却是任务启动时从mysql读取的,这就有错误了,lookup join可以解决该问题。
select a.sid,a.score,b.id,b.name from
scores as a
left join
students_test  as b
on a.sid=b.id;

-- lookup join
-- 当流表每来一条数据时,使用关联字段到维表的数据源中查询
-- 优点:实时更新数据源,准确性高
-- 缺点:每一次都需要查询数据库,性能会降低
select a.sid,a.score,b.id,b.name from
scores as a
left join
students_test FOR SYSTEM_TIME AS OF a.proctime as b
on a.sid=b.id;

此时我们修改更新mysql中的学生表数据

修改之前

修改后:

常规关联结果:

look up关联结果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1796828.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

240508Scala笔记

240508Scala笔记 Scala概述: SCala是Java的拓展,在Java的基础上又拓展了一些语法,例如: 输出Hello World println("HelloWorld")System.out.println("Hello Scala from Java") 上面两段代码都可以输出内容. package chapter01 ​ /*object: 关键字,声明…

python_将二维列表转换成HTML格式_邮件相关

python_将二维列表转换成HTML_邮件相关 data[["理想","2"],["理想2","3"]]def list_to_html_table(data):"""将二维列表转换为HTML表格格式的字符串。参数:data -- 二维列表,表示表格的数据。返回:一个字符…

手机相册的排列方式探讨

不论你是不是程序员,你一定留意过一个问题:相册 App 基本都将图片裁剪成了居中的 1:1 正方形。那么手机相册 App,为什么要将图片切割成 1:1 正方形,然后以网格排列?是行业标准吗? 自适应图片宽度的图库&a…

在不受支持的 Mac 上安装 macOS Sonoma (OpenCore Legacy Patcher v1.5.0)

在不受支持的 Mac 上安装 macOS Sonoma (OpenCore Legacy Patcher v1.5.0) Install macOS on unsupported Macs 请访问原文链接:https://sysin.org/blog/install-macos-on-unsupported-mac/,查看最新版。原创作品,转载请保留出处。 作者主…

[AIGC] 详解Mockito - 简单易学的Java单元测试框架

在Java的世界中, 单元测试是一项非常重要的任务. Mockito作为一个强大灵活的mock框架,可以帮助我们有效的编写和管理我们的单元测试. 了解并掌握Mockito的使用对于提高我们的开发效率和保证我们的软件质量有着巨大的帮助. 文章目录 什么是Mockito?Mockito的核心API…

【2024】Java,jdk环境变量配置(Windows)

只写了需要配置的环境变量 注:从JDK1.5开始,配置Java环境变量时,不再需要配置CLASSPATH,只需要配置JAVA_HOME和Path 操作流程 jdk8(或者你有自定义的jre文件夹)执行步骤1、2、4jdk8以上(17或2…

内网快速传输工具

常见的有LANDrop,支持多种设备,如电脑、pad、手机等等之间互传。但本文介绍的这款是很小的电脑间互传工具。 特点是非常的快速,文件很小,不用安装解压就可用。

太阳能航空障碍灯在航空安全发挥什么作用_鼎跃安全

随着我国经济的快速发展,空域已经成为经济发展的重要领域。航空运输、空中旅游、无人机物流、飞行汽车等经济活动为空域经济发展提供了巨大潜力。然而,空域安全作为空域经济发展的关键因素,受到了广泛关注。 随着空域经济活动的多样化和密集…

【阿里云】在云服务器ECS 安装MySQL、本地远程连接或宝塔连接(手动部署)

目录 一、安装MySQL 二、配置MySQL 三、远程访问MySQL数据库 四、Navicat本地连接远程MySQL 五、宝塔连接MySQL 如果你是使用宝塔安装的MySQL请绕过,以下是通过命令行模式(手动部署)进行安装、配置及运行。 安装:MySQL8.0 …

LabVIEW软件开发人员的核心能力是什么

LabVIEW软件开发人员的核心能力包括以下几个方面: 1. LabVIEW编程技能 熟练掌握LabVIEW编程语言:包括虚拟仪器(VI)的创建、数据流编程、图形化编程技巧等。 模块化编程:能够设计和实现模块化的代码结构,便…

Mysql执行一条语句都有哪些操作

Mysql的执行流程 MySQL 的架构共分为两层:Server 层和存储引擎层, Server 层负责建立连接、分析和执行 SQL。MySQL 大多数的核心功能模块都在这实现,主要包括连接器,查询缓存、解析器、预处理器、优化器、执行器等。另外&#xf…

redsystems教程的基本使用之重置密码(忘记密码解决方法)

前言: 相信很多人都有疑惑,要是我不记得密码怎么办?如果你登录了,点击更改密码后,还是要你填写登录密码才能修改。为了解决这问题,博主通过了钻研成功搞出来了!!!&#…

C语言指针介绍其一

指针是什么? 指针是内存中一个最小单元(一个字节)的编号,也就是地址,每一个单元都有属于自己的地址。 平时我们说的指针一般说的是指针变量,用来存放内存地址的变量就叫指针变量。 指针变量 int main()…

Django学习二:配置mysql,创建model实例,自动创建数据库表,对mysql数据库表已经创建好的进行直接操作和实验。

文章目录 前言一、项目初始化搭建1、创建项目:test_models_django2、创建应用app01 二、配置mysql三、创建model实例,自动创建数据库表1、创建对象User类2、执行命令 四、思考问题(****)1、是否会生成新表呢(答案报错&…

Paddle实现单目标检测

单目标检测 单目标检测(Single Object Detection)是人工智能领域中的一个重要研究方向,旨在通过计算机视觉技术,识别和定位图像中的特定目标物体。单目标检测可以应用于各种场景,如智能监控、自动驾驶、医疗影像分析等…

03_初识Spring Cloud Gateway

文章目录 一、网关简介1.1 网关提出的背景1.2 网关在微服务中的位置1.3 网关的技术选型1.4 补充 二、Spring Cloud Gateway的简介2.1 核心概念:路由(Route)2.2 核心概念:断言(Predicate)2.3 核心概念&#…

Python怎么发邮件不会被拦?如何设置信息?

Python发邮件的注意事项?Python发邮件需要哪些库? 使用Python发送电子邮件是一个常见的需求。然而,有时候邮件可能会被拦截,要确保发送的邮件不被拦截,需要一些技巧和注意事项。AokSend将介绍如何使用Python发送邮件&…

stm32中如何实现EXTI线 0 ~ 15与对应IO口的配置呢?

STM32的EXTI控制器支持19 个外部中断/ 事件请求。每个中断设有状态位,每个中断/ 事件都有独立的触发和屏蔽设置。 STM32的19个外部中断对应着19路中断线,分别是EXTI_Line0-EXTI_Line18: 线0~15:对应外部 IO口的输入中断。 线16&…

十年数据分析经验分享

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗?订阅我们的简报,深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同,从行业内部的深度分析和实用指南中受益。不要错过这个机会,成为AI领…

小熊家务帮day10-day12 门户管理(缓存,主页,定时任务)

门户管理 1 门户介绍1.1 介绍1.2 常用技术方案 2 缓存技术方案2.1 需求分析2.1.1 C端用户界面原型2.1.2 缓存需求2.1.3 使用的工具 2.2 项目基础使用2.2.1 项目集成SpringCache2.2.2 测试Cacheable需求Service测试 2.1.3 缓存管理器(设置过期时间)2.1.4 …