Flink系列Table API和SQL之:动态表、持续查询、将流转换成动态表、更新查询、追加查询、将动态表转换为流、更新插入流(Upsert)

news2025/1/8 4:53:20

Flink系列Table API和SQL之:动态表、持续查询、将流转换成动态表、更新查询、追加查询、将动态表转换为流、更新插入流

  • 一、表和流的转换
  • 二、动态表
  • 三、持续查询
  • 四、将流转换成动态表
  • 五、更新查询
  • 六、追加查询
  • 七、将动态表转换为流
  • 八、更新插入流(Upsert)

一、表和流的转换

  • Flink中使用表和SQL基本上跟其他场景是一样的。不过对于表和流的转换,却稍显复杂。当我们将一个Table转换成DataStream时,有"仅插入流"(Insert-Only Streams)和"更新日志流"(Changelog Streams)两种不同的方式,具体使用哪种方式取决于表中是否存在更新操作。
  • 这种麻烦其实是不可避免的。Table API和SQL本质上都是基于关系型表的操作方式。关系型表本身是有界的,更适合批处理的场景。所以在MySQL、Hive这样的固定数据集中进行查询,使用SQL就会显得得心应手。而对于Flink这样的流处理框架来说,要处理的是源源不断到来的无界数据流,无法等到暑假都到齐再做查询,每来一条数据就应该更新一次结果。这时如果一定要使用表和SQL进行处理,就会显得别扭,需要引入一些特殊的概念。

可以将关系型表/SQL与流处理做一个对比。

关系型表/SQL流处理
处理的数据对象字段元组的有界集合字段元祖的无限序列
查询可以访问完整的数据输入无法访问到所有数据,必须持续等待流式输入
对数据的访问
查询终止条件生成固定大小的结果集合终止永不停止,根据持续收到的数据不断更新查询结果

流处理面对的数据是连续不断的,这导致了流处理中的表跟我们熟悉的关系型数据库中的表完全不同。基于表的查询操作,也就有了新的含义。
希望把流数据换成表的形式,那么这表中的数据就会不断增长。如果进一步基于表执行SQL查询,那么得到的结果就不是一成不变的,而是会随着新数据的到来持续更新。

二、动态表

  • 当流中有新数据到来,初始的表中会查入一行,而基于这个表定义的SQL查询,就应该在之前的基础上更新结果。这样得到的表就会不断地动态变化,被称为动态表(Dynamic Tables)。
  • 动态表是Flink在Table API和SQL中的核心概念,为流数据处理提供了表和SQL支持。所熟悉的表一般用来做批处理,面向的是固定的数据集,可以认为是"静态表"。而动态表则完全不同,它里面的数据会随时间变化。
  • 其实动态表的概念,在传统的关系型数据库中已经有所接触。数据库中的表,其实是一系列INSERT、UPDATE和DELETE语句执行的结果。在关系型数据库中,我们一般把它称为更新日志流(changelog stream)。如果我们保存了表在某一时刻的快照(snapshot),那么接下来只要读取更新日志流,就可以得到表之后的变化过程和最终结果了。在很多高级关系型数据库(比如Oracle、DB2)中都有物化视图的概念,可以用来缓存SQL查询的结果。它的更新其实就是不停地处理更新日志流的过程。
  • Flink中的动态表,就借鉴了物化视图的思想。

三、持续查询

  • 动态表可以像静态的批处理一样进行查询操作。由于数据在不断变化,因此基于它定义的SQL查询也不可能执行一次就得到最终结果。这样依一来,对动态表的查询也就永远不会停止,一直在随着新数据的到来而继续执行。这样的查询就被称作持续查询。对动态表定义的查询操作,都是持续查询,而持续查询的结果也会是一个动态表。
  • 由于每次暑假到来都会触发查询操作,因此可以认为一次查询面对的数据集,就是当前输入动态表中收到的所有数据。这相当于是对输入动态表做了一个快照(snapshot),当作有限数据集进行批处理。流式数据的到来会触发连续不断的快照查询,像动画一样连贯起来,就构成了持续查询。

持续查询的过程,可以清晰地看到流、动态表和持续查询的关系:

在这里插入图片描述

持续查询的步骤如下:

  1. 流stream被转换为动态表dynamic table
  2. 对动态表进行持续查询(continuous query),生成新的动态表
  3. 生成的动态表被转换成流。

这样,只要API将流和动态表的转换封装起来,就可以直接在数据流上执行SQL查询,用处理表的方式来做流处理了。

四、将流转换成动态表

  • 为了能够使用SQL来做流处理,必须先把流(stream)转换成动态表
  • 如果把流看作一张表,那么流中每个数据的到来,都应该看作是对表的一次插入(Insert)操作,会在表的末尾添加一行数据。因为流是连续不断的,而且之前的输出结果无法改变、只能在后面追加。所以我们其实是通过一个只有插入操作(insert-only)的更新日志(changelog)流,来构建一个表。

流转换成动态表的过程:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> eventStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );

        //创建表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        //将dataStream转换成table
        Table eventTable = tableEnv.fromDataStream(eventStream);
        //直接写sql进行转换
        Table resultTable = tableEnv.sqlQuery("select user,url from " + eventTable);
        //基于table直接转换
        Table resultTable2 = eventTable.select($("user"), $("url"))
                .where($("user").isEqual("Alice"));

        //转换成流打印输出
        tableEnv.toDataStream(resultTable2).print("result2");
        tableEnv.toDataStream(resultTable).print("result");

        env.execute();

在这里插入图片描述

五、更新查询

在代码中定义了一个SQL查询

Table urlCountTable = tableEnv.sqlQuery("SELECT user,COUNT(url) as cnt FROM EnentTable GROUP BY user")

这个查询很简答,主要是分组聚合统计每个用户点击次数。把原始的动态表注册为EventTable,经过查询转换后得到urlCountTable。这个结果动态表中包含两个字断,具体定义如下:

[
user: VARCHAR,  //用户名
cnt: BIGINT。//用户访问url的次数
]

当原始动态表不停地插入新的数据时,查询得到的urlCountTable会持续地进行更改。由于count数量可能会叠加增长,因此这里的更改操作可以是简单的插入(Insert),也可以是对之前数据的更新(Update)。换句话说,用来定义结果表的更新日志(changelog)流中,包含了INSERT和UPDATE两种操作。这种持续查询被称为更新查询(Update Query),更新查询得到的结果表如果想要转换成DataStream,必须调用toChangelogStream()方法。
在这里插入图片描述

六、追加查询

  • 查询过程用到了分组聚合,结果表中就会产生更新操作。如果我们执行一个简单的条件查询,结果表中就会像原始表EventTable一样,只有插入Insert操作了。
Table aliceVisitTable = tableEnv.sqlQuery("SELECT url,user FROM EventTable WHERE user = 'Cary' ");
  • 这样的持续查询,就被称为追加查询(Append Query),定义的结果表的更新日志(changelog)流中只有INSERT操作。追加查询得到的结果表,转换成DataStream调用方法没有限制,可以直接用toDataStream(),也可以像更新查询一样调用toChangeStream()。
  • 这样看来似乎可以总结一个规律:只要用到了聚合,在之前的结果上有叠加,就会产生更新操作,就是一个更新查询。但是事实上,更新查询的判断标准是结果表中的数据是否会有UPDATE操作,如果聚合的结果不再改变,那么同样也不是更新查询。

可以考虑开一个滚动窗口,统计每一小时内所有用户的点击次数,并在接过表中增加一个endT字段,表示当前统计窗口的结束时间。这时结果表的字段定义如下:

[
	user: VARCHAR, //用户名
	endT: TIMESTAMP, //窗口结束时间
	cnt: BIGINT. //用户访问url的次数

]
  • 与之前的分组聚合一样,当原始动态表不停地插入新的数据时,查询得到的结果result会持续地进行更改。比如时间戳在12:00:00到12:59:59之间有四条数据,其中Alice三次点击、Bob一次点击,所以当水位线达到13:00:00时窗口关闭,输出到结果表中的就是新增两条数据[Alice,13:00:00,3]和[Bob,13:00:00,1]。同理,当下一个小时的窗口关闭时,也会将统计结果追加到result表后面,而不会更新之前的数据。
  • 我们发现,由于窗口的统计结果是一次性写入结果表的,所以结果表的更新日志流中只会包含插入INSERT操作,而没有更新UPDATE操作,所以这里的持续查询,依然是一个追加(Append)查询。结果表result如果转换成DataStream,可以直接调用toDataStream()方法。

七、将动态表转换为流

  • 与关系型数据库中的表一样,动态表也可以通过插入(Insert)、更新(Update)、删除(Delete)操作,进行持续的更改。将动态表转换为流或将其写入外部系统时,就需要对这些更改操作进行编码,通过发送编码消息的方式告诉外部系统要执行的操作。
  • 在Flink中,Table API和SQL支持三种编码方式:
  • 仅追加流(Append-only):仅通过插入Insert更改来修改的动态表,可以直接转换为仅追加流。这个流中发出的数据,其实就是动态表中新增的每一行。
  • 撤回流:撤回流是包含两类消息的流,添加(add)消息和撤回(retract)消息。
  • 具体的编码规则是:INSERT插入操作编码为add消息。DELETE删除操作编码为retract消息。而UPDATE更新操作则编码为被更改行的retract消息,和更新后行的add消息。这样,我们可以通过编码后的消息指明所有的增删改操作,一个动态表就可以转换为撤回流了。
  • 可以看到,更新操作对于撤回流来说,对应着两个消息,之前数据的撤回(删除)和新数据的插入。

将动态表转换为撤回流的过程:
在这里插入图片描述

  • 用+代表add消息(对应插入INSERT操作)
  • 用-代表retract消息(对应删除DELETE操作)
  • 当Alice的第一个点击事件到来时,结果表新增一条数据[Alice,1]
  • 当Alice的第二个点击事件到来时,结果表会将[Alice,1]更新为[Alice,2],对应的编码就是删除[Alice,1]、插入[Alice,2]。这样当一个外部系统收到这样的两条消息时,就知道要对Alice的点击统计次数进行更新了。

八、更新插入流(Upsert)

  • 更新插入流只包含两种类型的消息:更新插入(upsert)和删除(delete)消息。
  • 所谓upsert其实就是update和insert的合成词,所以对于更新插入流来说,INSERT插入操作和UPDATE更新操作,统一被编码为upsert消息。
  • DELETE删除操作则被编码为delete消息。

既然更新插入流中不区分插入(insert)和更新(update),自然会想到一个问题,如果希望更新一行数据时,怎么保证最后做的操作不是插入呢?

这就需要动态表中必须有唯一的键(key)。通过这个key进行查询,如果存在对应的数据就做更新(update),如果不存在就直接插入(insert)。这是一个动态表可以转换为更新插入流的必要条件。当然,收到这条流中数据的外部系统,也需要知道这唯一的键(key),这样才能正确地处理消息。

将动态表转换为更新插入流的过程:
在这里插入图片描述

  • 可以看到,更新插入流跟撤回流的主要区别在于,更新操作由于有key的存在,

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/132248.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

综合能源系统电压稳定研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

【锟斤拷�⊠是怎样炼成的】——两分钟帮你彻底弄懂计算机的编码原理

&#x1f4e2;&#x1f4e2;&#x1f4e2;&#x1f4e3;&#x1f4e3;&#x1f4e3; &#x1f33b;&#x1f33b;&#x1f33b;Hello&#xff0c;大家好&#xff0c;我是天寒雨落&#xff0c;一名有趣的博主&#xff0c;小白一枚&#xff0c;多多关照&#x1f61c;&#x1f61c…

解决vue-cli项目打包出现空白页和路径错误的问题

今天为大家分享一篇解决vue-cli(&#xff08;vue-cli2.x版本&#xff09;项目打包出现空白页和路径错误的问题。具有很好的参考价值。希望对大家有所帮助。 vue-cli项目打包&#xff1a; 1. 命令行输入&#xff1a;npm run build 打包出来后项目中就会多了一个文件夹dist&am…

k8s1.23.15版本二进制部署/扩容及高可用架构详解

前言 众所周知&#xff0c;kubernetes在2020年的1.20版本时就提出要移除docker。这次官方消息表明在1.24版本中彻底移除了dockershim&#xff0c;即移除docker。但是在1.24之前的版本中还是可以正常使用docker的。考虑到可能并不是所有项目环境都紧跟新版换掉了docker&#xff…

五、树和二叉树

一、定义及基本术语 详见书本P111~113 二叉树不是树的特殊情况&#xff0c;它们是两个概念&#xff0c;但有关树的基本术语对二叉树都适用。 二叉树的子树一定要区分左子树还是右子树&#xff0c;即使只有一棵子树也一定要说明是左子树还是右子树&#xff0c;树只有一个孩子的…

事务隔离:为什么你改了我还看不见?

提到事务&#xff0c;你肯定不陌生&#xff0c;和数据库打交道的时候&#xff0c;我们总是会用到事务。最经典的例子就是转账&#xff0c;你要给朋友小王转 100 块钱&#xff0c;而此时你的银行卡只有 100 块钱。 转账过程具体到程序里会有一系列的操作&#xff0c;比如查询余…

迎接2023,用JAVA演奏“新年”

&#x1f60a;你好&#xff0c;我是小航&#xff0c;一个正在变秃、变强的文艺倾年。 &#x1f514;2023年快要到来啦&#xff0c;再此祝大家诸事顺遂&#xff0c;所见所盼皆如愿。 &#x1f514;本文讲解如何使用Java演奏一首歌曲&#xff0c;一起卷起来叭&#xff01; 众所周…

【复习】计算机网络学习笔记

前言 本篇笔记方便本人用于复习回顾知识点&#xff0c;内容庞杂&#xff0c;见谅。含有目录方便大家跳转复习&#xff01; 此复习笔记总结于 湖科大教书匠出品&#xff1a;深入浅出计算机网络 微课视频 此笔记尚未完结&#xff0c;持续更新中… 文章目录前言第一章 概述1.1 …

高并发系统设计 -- 服务限流算法

常见的限流算法 漏桶算法 漏桶法的关键点在于漏桶始终按照固定的速率运行&#xff0c;但是它并不能很好的处理有大量突发请求的场景&#xff0c;毕竟在某些场景下我们可能需要提高系统的处理效率&#xff0c;而不是一味的按照固定速率处理请求。 关于漏桶的实现&#xff0c;u…

快速入门 .NET nanoFramework 开发 ESP32-Pico 应用

本文是一篇适合初学者的 .NET nanoFramework 保姆级入门教程&#xff0c;并提供了基本的入门程序并介绍了微雪的 ESP32-S2-Pico 使用 .NET nanoFramework 开发过程的基础知识。 目录 1. 背景 1.1 .NET IOT 与 .NET nanoFramework 1.2 微控制器 1.3 实验板介绍 2. 搭建 .NET…

移动Web【空间转换[空间位移、透视、空间旋转、立体呈现、3D导航、空间缩放]、动画、综合案例】

文章目录一、空间转换1.1 空间位移1.2 透视1.3 空间旋转1.4 立体呈现1.5 3D导航1.6 空间缩放二、动画2.1 动画的实现步骤2.2 动画属性三、综合案例2.1 走马灯一、空间转换 空间&#xff1a;是从坐标轴角度定义的。 x 、y 和z三条坐标轴构成了一个立体空间&#xff0c;z轴位置与…

Android实战进阶 - 拉取项目代码后多处报红?如资源找不到该如何处理?

近期参与了一个我很感兴趣的项目&#xff0c;项目内用到了很多新东西&#xff0c;例如组件化、模块化、ARouter路由、MVI框架、Kt高阶用法等等&#xff0c;感觉可以学一段时间… Gradle相关Blog Android Gradle - Gradle、Gradle plugin 基础认知Android Gradle - AndroidStud…

函数极限定义的理解

回顾一下非正式的极限定义法。当x从任意一侧(自左向右或自右向左)接近常量 c时&#xff0c;如果f(x)变得任意接近一个单独的值L, 则当x接近c时f(x)的极限值是L, 写作 咋一看&#xff0c;这个定义似乎非常技术化。即使这样&#xff0c;它仍然是非正式的&#xff0c;因为它没有给…

三、Django -视图

Django 提示&#xff1a;本文根据b站黑马python课整理 链接指引 > 黑马程序员python企业级开发项目-手把手从0到1开发《美多商城》 提示&#xff1a;写完文章后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录Django视图介绍和项目准备视图…

【数据集6】全球人工不透水面积GAIA(清华数据)

全球人工不透水面积&#xff08;lobal artificial impervious area, GAIA&#xff09; 人工不透水区是表征建成区和城市范围的重要覆盖类型&#xff0c;特别是在较细的空间分辨率下。 1 简介 原理&#xff1a; 由Landsat卫星图像和辅助数据集生成&#xff0c;如夜间灯光数据…

健康码识别[QT+OpenCV]

&#x1f482; 个人主页:风间琉璃&#x1f91f; 版权: 本文由【风间琉璃】原创、在CSDN首发、需要转载请联系博主&#x1f4ac; 如果文章对你有帮助、欢迎关注、点赞、收藏(一键三连)和订阅专栏哦目录 一、识别原理 1.二维码定位 2.颜色识别 二、部分源码 一、识别原理 二维…

matlab实现基本相位调制

相位调制&#xff08;PM&#xff09;是将信息编码为载波的瞬时相位变化的一种调制模式。 调相的基本表达式如下&#xff1b; 载波c(t)是一个标准正弦信号&#xff1b;m(t)是调制信号&#xff1b;调制以后是把m(t)的变化附加到了载波的相位变化上&#xff1b; 调相的基本示意如…

WPF中iconfont图标库的使用

总目录 文章目录总目录前言一、查找项目需要的图标二、图标的使用1.将下载的文件解压缩2.将ttf文件复制粘贴到自己的项目中3.使用总结前言 本文主要介绍在WPF中iconfont图标库的使用 一、查找项目需要的图标 首先进入阿里巴巴矢量图标库网站&#xff0c;登录自己的账号&#…

MySQL快速生成大量测试数据 (脚本一键生成分表数据)

生成128个分表的测试数据敲到手累&#xff1b; 生成的测试数据虽然有离散分布&#xff0c;但随着时间的增长数据量不增反降&#xff0c;不符合大多数线上业务的增长趋势&#xff1b; 生成的测试数据部分超过当前日期。 具体表现如下图所示&#xff1a; 我们直接看下脚本的用法…

月入8000+的steam/csgo搬砖项目(详细拆解)

大家好&#xff0c;我是阿阳 今天就给大家带来一个在steam游戏搬砖项目的拆解&#xff0c;目前这个项目我们团队也一直在带队实操&#xff0c;已经跑通了项目的整个流程&#xff0c;提炼出了完整的赚钱体系。 先给大家看看近期的收益情况&#xff1a; 近期的出售记录&#xf…