46、Flink 的table api与sql之配项列表及示例

news2025/1/23 1:03:11

Flink 系列文章

1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接

13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和语法
19、Flink 的Table API 和 SQL 中的内置函数及示例(1)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(2)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(3)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(4)
20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上
21、Flink 的table API与DataStream API 集成(1)- 介绍及入门示例、集成说明
21、Flink 的table API与DataStream API 集成(2)- 批处理模式和inser-only流处理
21、Flink 的table API与DataStream API 集成(3)- changelog流处理、管道示例、类型转换和老版本转换示例
21、Flink 的table API与DataStream API 集成(完整版)
22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs(介绍、类型、java api和sql实现ddl、java api和sql操作catalog)-1
24、Flink 的table api与sql之Catalogs(java api操作数据库、表)-2
24、Flink 的table api与sql之Catalogs(java api操作视图)-3
24、Flink 的table api与sql之Catalogs(java api操作分区与函数)-4
25、Flink 的table api与sql之函数(自定义函数示例)
26、Flink 的SQL之概览与入门示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2)
27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)
31、Flink的SQL Gateway介绍及示例
32、Flink table api和SQL 之用户自定义 Sources & Sinks实现及详细示例
33、Flink 的Table API 和 SQL 中的时区
35、Flink 的 Formats 之CSV 和 JSON Format
36、Flink 的 Formats 之Parquet 和 Orc Format
41、Flink之Hive 方言介绍及详细示例
40、Flink 的Apache Kafka connector(kafka source的介绍及使用示例)-1
40、Flink 的Apache Kafka connector(kafka sink的介绍及使用示例)-2
40、Flink 的Apache Kafka connector(kafka source 和sink 说明及使用示例) 完整版
42、Flink 的table api与sql之Hive Catalog
43、Flink之Hive 读写及详细验证示例
44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的
45、Flink 的指标体系介绍及验证
46、Flink 的table api与sql之配项列表及示例


文章目录

  • Flink 系列文章
  • 一、Table 和 SQL API 的配置
    • 1、概览
    • 2、简单示例(java和SQL Client)
      • 1)、maven依赖
      • 2)、示例代码及运行结果
      • 3)、SQL Client示例
    • 3、执行配置
    • 4、优化器配置
    • 5、Planner 配置
    • 6、SQL Client 配置


本文简单介绍了table和SQL API在执行、优化器、planner和sql client几方面的配置属性以及以java代码示例性的演示属性的配置方式。
本文依赖kafka集群能正常使用。
本文分为6个部分,即maven依赖、概述、执行配置、优化器配置、planner配置、sql client配置和示例。
本文的示例是在Flink 1.17版本中运行。

一、Table 和 SQL API 的配置

Table 和 SQL API 的默认配置能够确保结果准确,同时也提供可接受的性能。

根据 Table 程序的需求,可能需要调整特定的参数用于优化。例如,无界流程序可能需要保证所需的状态是有限的(请参阅 15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置).

1、概览

当实例化一个 TableEnvironment 时,可以使用 EnvironmentSettings 来传递用于当前会话的所期望的配置项 —— 传递一个 Configuration 对象到 EnvironmentSettings。

此外,在每个 TableEnvironment 中,TableConfig 提供用于当前会话的配置项。

对于常见或者重要的配置项,TableConfig 提供带有详细注释的 getters 和 setters 方法。

对于更加高级的配置,用户可以直接访问底层的 key-value 配置项。以下章节列举了所有可用于调整 Flink Table 和 SQL API 程序的配置项。

因为配置项会在执行操作的不同时间点被读取,所以推荐在实例化 TableEnvironment 后尽早地设置配置项。

2、简单示例(java和SQL Client)

1)、maven依赖

	<properties>
		<encoding>UTF-8</encoding>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
		<java.version>1.8</java.version>
		<scala.version>2.12</scala.version>
		<flink.version>1.17.0</flink.version>
	</properties>

	<dependencies>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-csv</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

		<!-- flink连接器 -->
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka</artifactId>
			<version>${flink.version}</version>
		</dependency>
	</dependencies>

2)、示例代码及运行结果

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
public class TestTableAndSQLConfigurationDemo {
	final static List<User> userList = Arrays.asList(
			new User(1L, "alan", 18, 1698742358391L), 
			new User(2L, "alan", 19, 1698742359396L), 
			new User(3L, "alan", 25, 1698742360407L),
			new User(4L, "alanchan", 28, 1698742361409L), 
			new User(5L, "alanchan", 29, 1698742362424L)
			);
	
	@Data
	@NoArgsConstructor
	@AllArgsConstructor
	public static class User {
		private long id;
		private String name;
		private int balance;
		private Long rowtime;
	}
	
	public static void testConfiguration() throws Exception {
		// instantiate table environment
		Configuration configuration = new Configuration();
		// set low-level key-value options
		configuration.setString("table.exec.mini-batch.enabled", "true");
		configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
		configuration.setString("table.exec.mini-batch.size", "5000");
		
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().withConfiguration(configuration).build();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);

		// access flink configuration after table environment instantiation
		TableConfig tableConfig = tenv.getConfig();
		// set low-level key-value options
		tableConfig.set("table.exec.mini-batch.enabled", "true");
		tableConfig.set("table.exec.mini-batch.allow-latency", "5 s");
		tableConfig.set("table.exec.mini-batch.size", "5000");

		DataStream<User> users = env.fromCollection(userList)
				.assignTimestampsAndWatermarks(
						WatermarkStrategy
						.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1))
						.withTimestampAssigner((user, recordTimestamp) -> user.getRowtime())
						);
		
		Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("balance"),$("rowtime").rowtime());
				
		//按属性、时间窗口分组后的互异(互不相同、去重)聚合
		Table groupByWindowResult = usersTable
			    .window(Tumble
			            .over(lit(5).minutes())
			            .on($("rowtime"))
			            .as("w")
			    )
			    .groupBy($("name"), $("w"))
			    .select($("name"), $("balance").sum().distinct().as("sum_balance"));
		DataStream<Tuple2<Boolean, Row>> result2DS = tenv.toRetractStream(groupByWindowResult, Row.class);
		result2DS.print("result2DS:");
//		result2DS::2> (true,+I[alan, 62])
//		result2DS::16> (true,+I[alanchan, 57])
		
		//使用分组窗口结合单个或者多个分组键对表进行分组和聚合。
		Table result = usersTable
			    .window(Tumble.over(lit(5).minutes()).on($("rowtime")).as("w")) // 定义窗口
			    .groupBy($("name"), $("w")) // 按窗口和键分组
			    // 访问窗口属性并聚合
			    .select(
			        $("name"),
			        $("w").start(),
			        $("w").end(),
			        $("w").rowtime(),
			        $("balance").sum().as("sum(balance)")
			    );
				
		DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
		resultDS.print("resultDS:");
//		resultDS::16> (true,+I[alanchan, 2023-10-31T08:50, 2023-10-31T08:55, 2023-10-31T08:54:59.999, 57])
//		resultDS::2> (true,+I[alan, 2023-10-31T08:50, 2023-10-31T08:55, 2023-10-31T08:54:59.999, 62])
		
		env.execute();
	}

	public static void main(String[] args) throws Exception {
		testConfiguration();
	}

}

3)、SQL Client示例

该部分的配置选项在文章 :23、Flink 的table api与sql之流式聚合(性能调优)有说明。

SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5s';
SET 'table.exec.mini-batch.size' = '5000';

Flink SQL> SET 'table.exec.mini-batch.enabled' = 'true';
[INFO] Execute statement succeed.

Flink SQL> SET 'table.exec.mini-batch.allow-latency' = '5s';
[INFO] Execute statement succeed.

Flink SQL> SET 'table.exec.mini-batch.size' = '5000';
[INFO] Execute statement succeed.

3、执行配置

以下选项可用于优化查询执行的性能。
本选项采用了中英文对照的方式,避免翻译的不准确。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

4、优化器配置

以下配置可以用于调整查询优化器的行为以获得更好的执行计划。
有些优化属性是流批都可以用的,有些用于流式,有些用于批处理。在default中没有特别说明的是流批均可,如果标明了则表示只适用于流或批。
在这里插入图片描述
在这里插入图片描述

5、Planner 配置

以下配置可以用于调整 planner 的行为。
有些配置属性是流批都可以用的,有些用于流式,有些用于批处理。在default中没有特别说明的是流批均可,如果标明了则表示只适用于流或批。
在这里插入图片描述
在这里插入图片描述

6、SQL Client 配置

以下配置可以用于调整 sql client 的行为。
有些配置属性是流批都可以用的,有些用于流式,有些用于批处理。在default中没有特别说明的是流批均可,如果标明了则表示只适用于流或批。
在这里插入图片描述

以上,本文简单介绍了table和SQL API在执行、优化器、planner和sql client几方面的配置属性以及以java代码示例性的演示属性的配置方式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1246124.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

opencv-GrabCut 图像分割算法

GrabCut 是一种图像分割算法&#xff0c;通过迭代优化的方式将图像分割为前景和背景。这种算法最初由Carsten Rother、Vladimir Kolmogorov和Andrew Blake于2004年提出。 GrabCut 算法的基本思想是通过用户**提供的一个矩形区域&#xff08;称为"掩模"&#xff09;*…

【C++】:多态

朋友们、伙计们&#xff0c;我们又见面了&#xff0c;本期来给大家解读一下有关多态的知识点&#xff0c;如果看完之后对你有一定的启发&#xff0c;那么请留下你的三连&#xff0c;祝大家心想事成&#xff01; C 语 言 专 栏&#xff1a;C语言&#xff1a;从入门到精通 数据结…

MFC所有控件介绍及基本使用

一、前言 本篇文档介绍了MFC控件的基本使用&#xff0c;同时提供了关于MFC控件使用的工程代码&#xff0c;程序界面如下图&#xff0c;有兴趣的可以到文档最后的链接处进行下载。 二、控件介绍 2.1 Button &#xff08;按钮&#xff09; 2.2 CheckBox&#xff08;复选框&am…

【算法】链表-20231124

这里写目录标题 一、83. 删除排序链表中的重复元素二、206. 反转链表三、234. 回文链表 一、83. 删除排序链表中的重复元素 简单 1.1K 相关企业 给定一个已排序的链表的头 head &#xff0c; 删除所有重复的元素&#xff0c;使每个元素只出现一次 。返回 已排序的链表 。 示例…

Android相机性能提高50%

文章目录 应用举例&#xff08;可以不看这一part&#xff0c;直接跳过看具体怎么做&#xff09;&#xff1a;Snapchat 通过 Camera2 Extensions API 将新相机功能的集成速度提高了 50%**Camera2 扩展 API 可以访问高级功能更多设备上的更多机会 正文&#xff1a;开始使用扩展架…

甄知燕千云ITAM,您的IT资产管理专家

IT 资产是实现企业持续发展的重要资源之一&#xff0c;也是反观企业数字化转型发展的缩影。通常情况下&#xff0c;IT 资产指组织拥有、租赁或使用的任何信息技术设备和资源&#xff0c;这些设备和资源对组织的业务运行起到支持作用。包括硬件设备&#xff08;如服务器、计算机…

Rust语言入门教程(一) - 简介及Cargo使用

Rust编程入门 为什么学习Rust 我本人是一个DevOps工程师&#xff0c;并不是专职的开发人员&#xff0c;但需要了解各种各样的语言的基本知识和特性&#xff0c;以便在不同的项目中帮助开发人员设计软件架构&#xff0c;部署流程以及进行错误排查和调试。但是对任何新生的优秀…

大语言模型概述(三):基于亚马逊云科技的研究分析与实践

上期介绍了基于亚马逊云科技的大语言模型相关研究方向&#xff0c;以及大语言模型的训练和构建优化。本期将介绍大语言模型训练在亚马逊云科技上的最佳实践。 大语言模型训练在亚马逊云科技上的最佳实践 本章节内容&#xff0c;将重点关注大语言模型在亚马逊云科技上的最佳训…

数据结构算法-贪心算法

引言 贪心&#xff1a;人只要有 “需求“ &#xff0c;都会有有点“贪“&#xff0c; 这种“贪“是一种选择&#xff0c;或者“”取舍“ RTS&#xff08;即时战略&#xff09;游戏&#xff1a; 帝国时代里 首先确保拥有足够的人口 足够的粮食&#xff0c;足够的战略资源 足够的…

VMware vShere download

VMware 前言 VMware vSphere 是 VMware 的虚拟化平台,可将数据中心转换为包括 CPU、存储和网络资源的聚合计算基础架构。vSphere 将这些基础架构作为一个统一的运行环境进行管理,并为您提供工具来管理加入该环境的数据中心。 vSphere 的两个核心组件是 ESXi 和 vCenter Ser…

前端必学——实现电商图片放大镜效果(附代码)

放大镜可以说是前端人必须学会的程序之一,今天的案例为大家展示一下怎么实现放大镜的效果&#xff01; 效果图展示 整个效果就是当鼠标放到展示图上的时候&#xff0c;会出现一个遮罩层以及弹出来一个框展示一个详情图&#xff0c;并且鼠标移动的时候详情图跟着移动&#xff0c…

老外对开发信的评价是什么?如何写开发信?

老外对开发信的评价和态度怎么样&#xff1f;国外客户喜欢的开发信类型有哪些&#xff1f; 许多中国公司和个人都在与老外打交道时&#xff0c;不可避免地需要发送开发信。但是&#xff0c;老外对开发信的评价究竟如何呢&#xff1f;在这篇文章中&#xff0c;蜂邮将深入探讨老…

【代码可视化实践】代码变更影响分析 | 京东云技术团队

1.前言 笔者前文“浅析代码可视化”中讲述了代码可视化的基本实现原理&#xff0c;并给出了一些业界的应用场景。由于涉及原理和技术范围较广&#xff0c;以笔者能力难以做到面面俱到&#xff0c;为了减少信息传递偏差&#xff0c;便给出了一些信息来源供读者深入阅读。不过针…

css图片缩放属性object-fit说明

object-fit 属性可以设置以下值&#xff1a; 属性值说明例子fill填充容器&#xff0c;可能会改变图片的比例。object-fit: fill;contain保持图片的原始比例&#xff0c;确保图片完全包含在容器内。object-fit: contain;cover保持图片的原始比例&#xff0c;确保图片覆盖整个容…

微信小程序开发资源汇总

本文收集了微信小程序开发过程中会使用到的资料、问题以及第三方组件库。本文不是一篇关于如何学习微信小程序的入门指南&#xff0c;也非参考手册&#xff0c;只是一些资料的整理。 本仓库中的资料整理自网络&#xff0c;也有一些来自网友的推荐。 官方文档 小程序设计指南…

【超详细】手搓一个微信日记本

&#x1f380; 文章作者&#xff1a;二土电子 &#x1f338; 关注公众号获取更多资料&#xff01; &#x1f438; 期待大家一起学习交流&#xff01; 这里对之前的微信记事本小程序进行了重新编写&#xff0c;增加了更加详细的步骤描述&#xff0c;将全部图片都改成了本地图…

文心一言 VS 讯飞星火 VS chatgpt (142)-- 算法导论12.1 2题

二、用go语言&#xff0c;二叉搜索树性质与最小堆性质(见 6.1 节)之间有什么不同?能使用最小堆性质在 O(n)时间内按序输出一棵有 n 个结点树的关键字吗?可以的话&#xff0c;请说明如何做&#xff0c;否则解释理由。 文心一言&#xff1a; 在二叉搜索树&#xff08;Binary …

openEuler 22.03 LTS x86_64 cephadm 部署ceph18.2.0 未完成 笔记

环境 准备三台虚拟机 10.47.76.94 node-1 10.47.76.95 node-2 10.47.76.96 node-3 下载cephadm [rootnode-1 ~]# yum install cephadm Last metadata expiration check: 0:11:31 ago on Tue 21 Nov 2023 10:00:20 AM CST. Dependencies resolved. Package …

6 个有效且可用的顶级 Android 数据恢复工具

经过测试 42 种数据恢复软件产品&#xff0c;发现奇客数据恢复安卓版是 Android 设备的最佳选择。 过去几十年来&#xff0c;我一直在科技行业工作&#xff0c;经常帮助人们应对计算机灾难&#xff0c;包括丢失数据。 Android 数据恢复应用程序不在您的设备上运行&#xff0c…