Flink高手之路5-Table API SQL

news2025/1/3 4:41:50

文章目录

  • Flink 中的Table API & SQL
    • 一、Table API & SQL 介绍
      • 1. 为什么要Table API和SQL
      • 2. Table API & SQL的特点
      • 3. Table API& SQL发展历程
        • 3.1 架构升级
        • 3.2 查询处理器的选择
        • 3.3 了解-Blink planner和Flink Planner具体区别如下:
        • 3.4 了解-Blink planner和Flink Planner具体区别如下:
      • 4. 注意事项
    • 二.、API
    • 三、案例
      • 1. 案例一
        • 1)导入pom依赖
        • 2)新建包和类
        • 3)代码实现
        • 4)运行,查看结果
        • 5)表的合并操作
        • 6)合并结果

Flink 中的Table API & SQL

一、Table API & SQL 介绍

1. 为什么要Table API和SQL

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/overview/

image-20230420163500993

Flink的Table模块包括 Table API 和 SQL:

Table API 是一种类SQL的API,通过Table API,用户可以像操作表一样操作数据,非常直观和方便

SQL作为一种声明式语言,有着标准的语法和规范,用户可以不用关心底层实现即可进行数据的处理,非常易于上手

Flink Table API 和 SQL 的实现上有80%左右的代码是公用的。作为一个流批统一的计算引擎,Flink 的 Runtime 层是统一的。

2. Table API & SQL的特点

Flink之所以选择将 Table API & SQL 作为未来的核心 API,是因为其具有一些非常重要的特点:
在这里插入图片描述

  • 声明式:属于设定式语言,用户只要表达清楚需求即可,不需要了解底层执行;
  • 高性能:可优化,内置多种查询优化器,这些查询优化器可为 SQL 翻译出最优执行计划;
  • 简单易学:易于理解,不同行业和领域的人都懂,学习成本较低;
  • 标准稳定:语义遵循SQL标准,非常稳定,在数据库 30 多年的历史中,SQL 本身变化较少;
  • 流批统一:可以做到API层面上流与批的统一,相同的SQL逻辑,既可流模式运行,也可批模式运行,Flink底层Runtime本身就是一个流与批统一的引擎

3. Table API& SQL发展历程

3.1 架构升级

自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初将最终代码开源,也就是Blink。Blink 在原来的 Flink 基础上最显著的一个贡献就是 Flink SQL 的实现。随着版本的不断更新,API 也出现了很多不兼容的地方。

在 Flink 1.9 中,Table 模块迎来了核心架构的升级,引入了阿里巴巴Blink团队贡献的诸多功能

在这里插入图片描述

在Flink 1.9 之前,Flink API 层 一直分为DataStream API 和 DataSet API,Table API & SQL 位于 DataStream API 和 DataSet API 之上。可以看处流处理和批处理有各自独立的api (流处理DataStream,批处理DataSet)。而且有不同的执行计划解析过程,codegen过程也完全不一样,完全没有流批一体的概念,面向用户不太友好。

在Flink1.9之后新的架构中,有两个查询处理器:Flink Query Processor,也称作Old Planner和Blink Query Processor,也称作Blink Planner。为了兼容老版本Table及SQL模块,插件化实现了Planner,Flink原有的Flink Planner不变,后期版本会被移除。新增加了Blink Planner,新的代码及特性会在Blink planner模块上实现。批或者流都是通过解析为Stream Transformation来实现的,不像Flink Planner,批是基于Dataset,流是基于DataStream。

3.2 查询处理器的选择

查询处理器是 Planner 的具体实现,通过parser、optimizer、codegen(代码生成技术)等流程将 Table API & SQL作业转换成 Flink Runtime 可识别的 Transformation DAG,最终由 Flink Runtime 进行作业的调度和执行。

Flink Query Processor查询处理器针对流计算和批处理作业有不同的分支处理,流计算作业底层的 API 是 DataStream API, 批处理作业底层的 API 是 DataSet API。

Blink Query Processor查询处理器则实现流批作业接口的统一,底层的 API 都是Transformation,这就意味着我们和Dataset完全没有关系了。

Flink1.11之后Blink Query Processor查询处理器已经是默认的了:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/

在这里插入图片描述

3.3 了解-Blink planner和Flink Planner具体区别如下:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html

在这里插入图片描述

3.4 了解-Blink planner和Flink Planner具体区别如下:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html

在这里插入图片描述

4. 注意事项

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/common/

All Flink Scala APIs are deprecated and will be removed in a future Flink version. You can still build your application in Scala, but you should move to the Java version of either the DataStream and/or Table API.
所有Flink Scala api都已弃用,并将在未来的Flink版本中删除。您仍然可以在Scala中构建应用程序,但是应该使用Java版本的DataStream和/或Table API。

二.、API

  • 创建表
  • 查询表
  • 输出表

三、案例

1. 案例一

将DataStream注册为Table和view并进行SQL统计

1)导入pom依赖

<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.12.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.30</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>1.12.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.12.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.12.7</version>
        </dependency>
    </dependencies>

注意本地scala版本:2.12为scala版本。

2)新建包和类

在这里插入图片描述

3)代码实现

package cn.edu.hgu.bigdata20.flink.table_demo;

import cn.edu.hgu.bigdata20.flink.mysql.Student;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Arrays;

import static org.apache.flink.table.api.Expressions.$;

/**
 * description:flink的TableAPI和SQL演示
 * author 王
 * date 2023/04/20
 */
public class FlinkTableDemo {
    public static void main(String[] args) throws Exception {
        // 1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2.table env
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 3.source
        DataStream<Student> StudentA = env.fromCollection(Arrays.asList(
                new Student(101, "张三", "男", 20, "14567457623", "唐山"),
                new Student(102, "李四", "女", 18, "17467457623", "张家口"),
                new Student(103, "王五", "男", 19, "18367457623", "承德")
        ));
        DataStream<Student> StudentB = env.fromCollection(Arrays.asList(
                new Student(201, "赵六", "女", 21, "18867457623", "秦皇岛"),
                new Student(202, "钱七", "男", 19, "17567457623", "邢台"),
                new Student(203, "孙八", "女", 18, "16767457623", "廊坊")
        ));

        // 3.注册表
        // 3.1 转换一个流为一个表
        Table tableA = tEnv.fromDataStream(StudentA, $("id"), $("sname"), $("sex"), $("age"), $("phone"), $("address"));
        // 3.2 注册一个流为表
        tEnv.createTemporaryView("StudentB", StudentB, $("id"), $("sname"), $("sex"), $("age"), $("phone"), $("address"));
        // 4.transformation
        Table tableB = tEnv.sqlQuery("select * from StudentB where age >= 19 ");//SQL

        // 5.sink
        // 把表转换为流
        DataStream<Student> resultA = tEnv.toAppendStream(tableA, Student.class);
        DataStream<Student> resultB = tEnv.toAppendStream(tableB, Student.class)
        // tableA.printSchema();
        // tableB.printSchema();
        // resultA.print();
         resultB.print();
        // 6.execute
        env.execute();
    }
}

4)运行,查看结果

在这里插入图片描述

5)表的合并操作

package cn.edu.hgu.bigdata20.flink.table;

import cn.edu.hgu.bigdata20.flink.mysql.Student;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Arrays;

import static org.apache.flink.table.api.Expressions.$;

/**
 * description:flink的TableAPI和SQL演示
 * author 王
 * date 2023/04/20
 */
public class FlinkTableDemo {
    public static void main(String[] args) throws Exception {
        // 1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2.table env
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 3.source
        DataStream<Student> StudentA = env.fromCollection(Arrays.asList(
                new Student(101, "张三", "男", 20, "14567457623", "唐山"),
                new Student(102, "李四", "女", 18, "17467457623", "张家口"),
                new Student(103, "王五", "男", 19, "18367457623", "承德")
        ));
        DataStream<Student> StudentB = env.fromCollection(Arrays.asList(
                new Student(201, "赵六", "女", 21, "18867457623", "秦皇岛"),
                new Student(202, "钱七", "男", 19, "17567457623", "邢台"),
                new Student(203, "孙八", "女", 18, "16767457623", "廊坊")
        ));

        // 3.注册表
        // 3.1 转换一个流为一个表
        Table tableA = tEnv.fromDataStream(StudentA, $("id"), $("sname"), $("sex"), $("age"), $("phone"), $("address"));
        // 3.2 注册一个流为表
        tEnv.createTemporaryView("StudentB", StudentB, $("id"), $("sname"), $("sex"), $("age"), $("phone"), $("address"));
        // 4.transformation
        Table tableB = tEnv.sqlQuery("select * from StudentB where age >= 19 ");//SQL
        // 合并
        Table unionTable = tEnv.sqlQuery("select * from " + tableA + " where age > 19 " +
                "union all " +
                "select * from StudentB where age >= 19");

        // 5.sink
        // 把表转换为流
        DataStream<Student> resultA = tEnv.toAppendStream(tableA, Student.class);
        DataStream<Student> resultB = tEnv.toAppendStream(tableB, Student.class);
        DataStream<Student> resultUnion = tEnv.toAppendStream(unionTable, Student.class);
        // tableA.printSchema();
        // tableB.printSchema();
        // resultA.print();
        // resultB.print();
        resultUnion.print();
        // 6.execute
        env.execute();
    }
}

出现问题:Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered “<”,可能是符号有问题。

6)合并结果

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/442671.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

神采PromeAI 2.0版本上线,助你释放创作超能力

上个月&#xff0c;我们推出神采PromeAI 1.0版本&#xff0c;让用户可以免费体验AI草图渲染功能。神采作为设计师的提效工具和灵感源泉&#xff0c;深受用户的广大好评。于是&#xff0c;在经过算法优化后&#xff0c;神采PromeAI 2.0版本终于在本周上线了&#xff01; 我们提供…

【Vulnhub】之Symfonos2

一、 部署方法 在官网上下载靶机ova环境&#xff1a;https://download.vulnhub.com/symfonos/symfonos2.7z使用VMware搭建靶机环境攻击机使用VMware上搭建的kali靶机和攻击机之间使用NAT模式&#xff0c;保证靶机和攻击机放置于同一网段中。 二、 靶机下载安装 靶机下载与安…

ETCD(四)读请求处理过程

客户端通过etcdctl执行get命令 etcdctl get name --endpoints localhost:12379,192.158.00.32:12379client端 首先是client会解析这条命令&#xff0c;包括其中的get API方法&#xff0c;key值&#xff0c;请求server地址。解析完之后etcdctl会创建一个clientv3库对象&#xf…

Ubantu docker学习笔记(七)容器网络

文章目录 一、容器网络管理1.1查看容器网络1.2创建容器网络1.3 删除容器网络1.4 容器网络详细信息1.5 配置容器网络1.6 断开容器网络连接 二、none网络三、host网络四、bridge网络五、container网络六、容器连接外部网络6.1创建Overlay网络6.2创建Macvlan网络 一、容器网络管理…

研0进阶式学习---数据库配置

目录 最开始的问题&#xff1a;不同的连接名下面的数据库信息完全一样尝试新建用户名和密码&#xff0c;以此来建立新的连接 但这样建立的连接下面的数据库仍然是和之前的一模一样尝试改变xampp端口号&#xff0c;以此来建立新的连接 结论MySQL实例的数据库文件是与实例绑定的&…

完美解决丨+# TypeError: ‘dict_keys‘ object does not support indexing

结构 - 标题 - 问题描述 - 代码栗子 - 总结 目录 TypeError: dict_keys object does not support indexing 如何实现&#xff1f; python a {a: 1} b a.keys() c b[0] 异常描述 TypeError Traceback (most recent call last) <ipython-input-9-9dceb06f3f…

信号完整性分析基本概念之Retimer和Redriver

一两句话讲清楚版&#xff1a; Retimer 通过 其 Rx 端 CTLE/DFE (连续时间线性均衡/判断反馈均衡) 、CDR (时钟数据恢复) 及 Tx 端 EQ (均衡)&#xff0c;来够补偿信道损耗&#xff0c;消除信号抖动&#xff0c;提升信号完整性&#xff0c;从而增加传输距离。 Redriver 是放大…

多线程拉取+kafka推送

多线程拉取kafka推送 1 多线程 在本次需求中&#xff0c;多线程部分我主要考虑了一个点&#xff0c;就是线程池的配置如何最优。因为数据量级比较大&#xff0c;所以这个点要着重处理&#xff0c;否则拉取的时间会非常长或者是任务失败会比较频繁&#xff1b; 因为数据的量级…

Spring Security OAuth2.0(一)-----前言-授权码模式及代码实例

什么是 OAuth2 OAuth 是一个开放标准&#xff0c;该标准允许用户让第三方应用访问该用户在某一网站上存储的私密资源&#xff08;如头像、照片、视频等&#xff09;&#xff0c;而在这个过程中无需将用户名和密码提供给第三方应用。实现这一功能是通过提供一个令牌&#xff08…

如何治理“网络暴力” 在人类文明不断发展向前的进程中,大数据时代应运而来。数学建模解题步骤,愚见而已,欢迎指错和探讨呀~

题目可见文章&#xff1a;(20条消息) 如何治理“网络暴力” 在人类文明不断发展向前的进程中&#xff0c;大数据时代应运而来。 数学建模&#xff0c;90%成品论文&#xff0c;附附件、原题、代码 注&#xff0c;水平有限&#xff0c;非广告&#xff0c;仅供交流参考&#xff0c…

6、ThingsBoard使用jar包自己构建镜像部署

1、概述 这一节主要讲解你自己使用jar包构建镜像,一般在很多企业中,都是使用Jenkins配置流水线,自动打包,然后拷贝程序在target目录下生成的jar包,然后使用Dockerfile文件进行构建镜像,其实我这一节讲的也是类似,只是不使用Jenkins来实现自动,原理都一样,估计也是很多…

C++ MySQL存储二进制数据、存储照片

版权声明&#xff1a;本文为CSDN博主「intfre」的原创文章&#xff0c;遵循CC 4.0 BY-SA版权协议&#xff0c;转载请附上原文出处链接及本声明。 原文链接&#xff1a;https://blog.csdn.net/nibiru_holmes/article/details/51387047 0x01 首先MySQL支持二进制的类型有Blob: …

Doris-1.2.0升级到Doris-1.2.4

0 背景 在使用doris-1.2.0版本时发现BE节点无故宕机&#xff0c;自己尝试解决无果后再官网寻找解决方案&#xff0c;发现在doris-1.2.0版本中存在这样的隐患bug导致BE节点宕机。 而在咨询社区之后建议对doris进行升级&#xff0c;升级版本doris-1.2.4。该版本是解决1.2.x问题…

Springboot集成neo4j实现知识图谱关系图

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、neo4j是什么&#xff1f;二、安装步骤1.启动2.使用2.简单命令 二、使用springboot集成neo4j1.引入依赖2.功能实现3.查询关系节点4. 查询指定评委和指定选手…

基于matlab使用光线追踪自定义 CDL 通道模型

一、前言 此示例演示如何使用光线追踪分析的输出自定义 CDL 通道模型参数。该示例演示如何&#xff1a; 指定发射器和接收器在 3D 环境中的位置。 使用光线追踪来计算通道的几何方面&#xff1a;光线数量、角度、延迟和衰减。 使用光线追踪分析的结果配置 CDL 通道模型。 使用相…

KDYZ-YM晶闸管伏安特性测试仪

一、概述 晶闸管的伏安特性是晶闸管的基本特性&#xff0c;这项特性的好坏&#xff0c;直接影响到器件在整机上的正常使用。因此&#xff0c;检测晶闸管的伏安特性在晶闸管器件的生产、经销及使用过程中都是十分重要的。 测试方法符合国标JB/T7624-94《整流二极管测试方法》和J…

深入理解Go语言中的函数【单元测试】14

文章目录 go test工具测试函数测试函数的格式测试函数示例 测试组子测试测试覆盖率基准测试基准测试函数格式基准测试示例性能比较函数重置时间并行测试 Setup与TearDownTestMain子测试的Setup与Teardown 示例函数示例函数示例 go test工具 Go语言中的测试依赖go test命令。编…

Three.js+TypeScript+Webpack学习记录(一)

使用环境参考 Node.js v16.19.1 VSCode 插件&#xff1a;Live Server 正文 初始化新工程 安装好 node 环境后&#xff0c;新建一个空项目文件夹&#xff0c;执行 npm init 一路回车即可。 然后配置 npm 所需要的包&#xff0c;直接列一下 package.json {"name":…

SpringBoot集成模板引擎Thymeleaf

本博文重点内容共3个方面&#xff0c;分别是&#xff1a; 在SpringBoot中使用thymeleaf自定义thymeleaf工具类导出静态网页thymeleaf常用的标签 一、在SpringBoot中使用thymeleaf pom.xml <!--Thymeleaf 启动器--><dependency><groupId>org.springframewo…

java基础总结-java技术栈快速复习

java基础 java基础概念 java概述和语言背景 java语言是没有sun公司&#xff08;Stanford University Network&#xff1a;斯坦福大学网络&#xff09;在1995年推出的计算机语言java之父&#xff1a;詹姆斯高斯林&#xff08;James Gosling&#xff09;2009年&#xff0c;sun公…