【SQL篇】一、Flink动态表与流的关系以及DDL语法

news2025/1/12 11:25:19

文章目录

  • 1、启动SQL客户端
  • 2、SQL客户端常用配置
  • 3、动态表和持续查询
  • 4、将流转为动态表
  • 5、用SQL持续查询
  • 6、动态表转为流
  • 7、时间属性
  • 8、DDL-数据库相关
  • 9、DDL-表相关

在这里插入图片描述

1、启动SQL客户端

启动Flink(基于yarn-session模式为例):

/opt/module/flink-1.17.0/bin/yarn-session.sh -d

再启动sql的客户端:

/opt/module/flink-1.17.0/bin/sql-client.sh embedded -s yarn-session

简单看下:

show databases;

2、SQL客户端常用配置

设置结果的显示模式,默认table,还可以设置为tableau、changelog

SET sql-client.execution.result-mode=changelog;

设置执行环境,默认streaming,也可以设置batch

SET execution.runtime-mode=streaming;

设置默认并行度:

SET parallelism.default=1;

设置状态TTL:

SET table.exec.state.ttl=1000;

通过SQL文件初始化,可以发现,exit退出客户端时,刚创建的库表都被清空了,这个SQL初始化文件就是在启动客户端时你想执行的SQL语句

# 创建SQL文件

vim conf/sql-client-init.sql

SET sql-client.execution.result-mode=tableau;
CREATE DATABASE mydatabase;

# 启动时,-i指定SQL文件

/opt/module/flink-1.17.0/bin/sql-client.sh embedded -s yarn-session -i conf/sql-client-init.sql

3、动态表和持续查询

和MySQL等关系型表不同的是,无限流下,会有源源不断的数据过来进入表中,即动态表,来一条数据,往表中插入一条数据。对应的,想获取最新结果就要写条SQL去不间断的查询,即持续查询(每次数据到来都会触发查询操作),持续查询的结果也是一个动态表。

关系型表流处理的动态表
处理的数据对象字段元组的有界集合字段元组的无限序列
查询时对数据的访问可以访问到完整的数据输入无法访问到所有数据,必须“持续”等待流式输入
查询终止条件生成固定大小的结果集后终止永不停止, 根据持续收到的数据不断更新查询结果

在这里插入图片描述

如图,持续查询的流程为:

  • 流(stream)被转换为动态表(dynamic table)
  • 对动态表进行持续查询(continuous query),生成新的动态表
  • 生成的动态表被转换成流

如此,就通过执行SQL实现了对数据流的处理。

4、将流转为动态表

把流看作一张表,来一条数据,insert一次,比如有个记录用户点击事件的无限流:

在这里插入图片描述

5、用SQL持续查询

代码中定义一条查询SQL:

Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user");

此时,结果表(动态),可能是简单的insert,如Bob这条数据,也可能是对旧数据的更新update,如Alice,这就是更新查询。 此时,结果表转DataStream调用toChangelogStream()方法。
在这里插入图片描述

修改查询SQL,使用TUMBLE加一个开窗,每个窗口触发时,输出结果,此时对结果表就只有insert追加数据,没有update,即追加查询。

在这里插入图片描述

6、动态表转为流

仅追加(Append-only)流

动态表仅仅通过insert来修改,转为流时,对应一个仅追加的流,流中的每条数据,就是动态表的每行数据。

撤回(Retract)流

流中有添加消息add和撤回消息retract两种,对应表中:

  • insert就是add消息
  • delete就是retract
  • update就是被改行的retract+新结果的add

在这里插入图片描述

更新插入(Upsert)流

流中有更新插入消息upsert和删除消息delete两种,对应表中:

  • update和insert是upsert消息
  • delete为delete消息

在这里插入图片描述

最后,注意,在代码里将动态表转换为DataStream时,只支持仅追加(append-only)和撤回(retract)流两种。

7、时间属性

在表中加个时间字段,数据类型为TimeStamp,分为事件时间和处理时间。事件时间通过watermark语句来定义:

CREATE TABLE EventTable(
  user STRING,
  url STRING,
  ts TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  ...
);

# TIMESTAMP后的3为精确度

以上,ts字段为事件时间属性,且基于ts设置5s的水位线延迟,注意,延迟秒数5必须加单引号。时间戳类型需要转为秒或者毫秒时,可:

# ...
ts BIGINT,
time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
# ...
3即精确到毫秒

定义处理时间属性用procTime函数:

CREATE TABLE EventTable(
  user STRING,
  url STRING,
  ts AS PROCTIME()
) WITH (
  ...
);

8、DDL-数据库相关

建库:

CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name
  [COMMENT database_comment]
  WITH (key1=val1, key2=val2, ...)

查询所有库:

SHOW DATABASES

查当前库:

SHOW CURRENT DATABASE

修改库的某些属性:

ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)

删库:

DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]

注意,

  • RESTRICT:删除非空数据库会触发异常。默认启用
  • CASCADE:删除非空数据库也会删除所有相关的表和函数,慎用

切换当前库:

USE database_name;

9、DDL-表相关

建表:

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  # 字段
  (
    { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
    # 定义watermark
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )   
  # 注释  
  [COMMENT table_comment]
  # 分区
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  # Flink特色
  WITH (key1=val1, key2=val2, ...)
  [ LIKE source_table [( <like_options> )] | AS select_query ]

关于表中的字段:physical_column就是常规列。metadata_column是元数据列,可访问到数据源本身的一些元数据,必须加METADATA关键字标识,如:读取数据写入Kafka时,Kafka引擎给数据打上的时间戳标记:

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING,
  `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'  # !!!
) WITH (
  'connector' = 'kafka'
  ...
);

自定义的列名称和 Connector 中定义 metadata 字段的名称一样时,后面的FROM省略:

CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
`timestamp` TIMESTAMP_LTZ(3) METADATA  # !!!
) WITH (
'connector' = 'kafka'
...
);

自定义列的数据类型和 Connector 中定义的 metadata 字段的数据类型不一致时,会自动强转,因此这两个类型必须可以强转:

CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 将时间戳强转为 BIGINT
`timestamp` BIGINT METADATA
) WITH (
'connector' = 'kafka'
...
);

默认metadata_column列可读可写,加VIRTUAL表示只读:

CREATE TABLE MyTable (
  `timestamp` BIGINT METADATA, 
  `offset` BIGINT METADATA VIRTUAL,  # !!!!
  `user_id` BIGINT,
  `name` STRING,
) WITH (
  'connector' = 'kafka'
  ...
);

computed_column即计算列,把几列的计算结果做为新列,这在关系型SQL中一般在查询语句中完成,而不存成一个新列。

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `price` DOUBLE,
  `quantity` DOUBLE,
  `cost` AS price * quanitity   # !!!
) WITH (
  'connector' = 'kafka'
  ...
);

主键的定义,只支持 not enforced:

CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
PARYMARY KEY(user_id) not enforced   # !!!
) WITH (
'connector' = 'kafka'
...
);

with子句,用于指定这个表相关的外部系统的相关配置,如Kafka:

CREATE TABLE KafkaTable (
`user_id` BIGINT,
`name` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)

like子句,即在现有表的基础上,创建另一种表:

CREATE TABLE Orders (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3)
) WITH ( 
    'connector' = 'kafka',
    'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE Orders_with_watermark (
    -- Add watermark definition
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    -- Overwrite the startup-mode
    'scan.startup.mode' = 'latest-offset'
)
LIKE Orders;  # !!!!

举例:新表中的value字段加偏引号是因为value和关键字冲突了

CREATE TABLE test(
    id INT, 
    ts BIGINT, 
    vc INT
) WITH (
'connector' = 'print'
);

CREATE TABLE test1 (
    `value` STRING
)
LIKE test;

create-table-as-select,即CTAS语句,通过查询结果创建表:

CREATE TABLE my_ctas_table
WITH (
    'connector' = 'kafka',
    ...
)
AS SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;

# 注意此时不能自己来定义列

查所有表:

SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE <sql_like_pattern> ]

查某张表信息:

{ DESCRIBE | DESC } [catalog_name.][db_name.]table_name

修改表名:

ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name

修改表属性:

ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)

删表:

DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1178855.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PubDef:使用公共模型防御迁移攻击

对抗性攻击对机器学习系统的可靠性和安全性构成了严重威胁。通过对输入进行微小的变动&#xff0c;攻击者就可以导致模型生成完全错误的输出。防御这种攻击是一个很活跃的研究领域&#xff0c;但大多数提议的防御措施都存在重大的缺点。 这篇来自加州大学伯克利分校研究人员的…

全光谱大面积氙光灯太阳光模拟器老化测试

氙灯光源太阳光模拟器广泛应用于光解水产氢、光化学催化、二氧化碳制甲醇、光化学合成、光降解污染物、 水污染处理、生物光照,光学检测、太阳能电池研究、荧光材料测试(透射、反射、吸收) 太阳能电池特性测试&#xff0c;光热转化&#xff0c;光电材料特性测试&#xff0c;生物…

Docker Compose安装milvus向量数据库单机版-milvus基本操作

目录 安装Ubuntu 22.04 LTS在power shell启动milvus容器安装docker desktop下载yaml文件启动milvus容器Milvus管理软件Attu python连接milvus配置下载wget示例导入必要的模块和类与Milvus数据库建立连接创建名为"hello_milvus"的Milvus数据表插入数据创建索引基于向量…

docker容器技术基础入门

docker容器技术基础入门 容器(Container) 容器是一种基础工具&#xff1b;泛指任何可以用于容纳其他物品的工具&#xff0c;可以部分或完全封闭&#xff0c;被用于容纳、储存、运输物品&#xff1b;物体可以被放置在容器中&#xff0c;而容器则可以保护内容物&#xff1b;容器…

AVL树性质和实现

AVL树 AVL是两名俄罗斯数学家的名字&#xff0c;以此纪念 与二叉搜索树的区别 AVL树在二叉搜索树的基础上增加了新的限制&#xff1a;需要时刻保证每个树中每个结点的左右子树高度之差的绝对值不超过1 因此&#xff0c;当向树中插入新结点后&#xff0c;即可降低树的高度&…

数据结构:AVL树的实现和全部图解

文章目录 为什么要有AVL树什么是AVL树AVL树的实现元素的插入平衡因子的更新AVL树的旋转 AVL树的检查完整实现 本篇总结的是AVL树中的全部内容&#xff0c;配有详细的图解过程 为什么要有AVL树 前面对map/multimap/set/multiset进行了简单的介绍&#xff0c;在其文档介绍中发现…

计算机毕业设计java+springboot+vue的旅游攻略平台

项目介绍 本系统结合计算机系统的结构、概念、模型、原理、方法&#xff0c;在计算机各种优势的情况下&#xff0c;采用JAVA语言&#xff0c;结合SpringBoot框架与Vue框架以及MYSQL数据库设计并实现的。员工管理系统主要包括个人中心、用户管理、攻略管理、审核信息管理、积分…

Go 接口-契约介绍

Go 接口-契约介绍 文章目录 Go 接口-契约介绍一、接口基本介绍1.1 接口类型介绍1.2 为什么要使用接口1.3 面向接口编程1.4 接口的定义 二、空接口2.1 空接口的定义2.2 空接口的应用2.2.1 空接口作为函数的参数2.2.2 空接口作为map的值 2.3 接口类型变量2.4 类型断言 三、尽量定…

Day22力扣打卡

打卡记录 替换子串得到平衡字符串&#xff08;滑动窗口&#xff09; 链接 由于是以后统计替换的子串&#xff0c;不可以直接使用hash表统计的每个次数大于 n / 4 的字符&#xff0c;再将其次数减去平衡数来得到答案&#xff0c;根据字符串的连贯性&#xff0c;使用 滑动窗口 …

MySQL 8.0 如何修改密码安全策略!!!

目录 安全策略参数和常见等级:1.Mysql8.X常见安全策略参数指定密码的强度验证等级validate_password.policy 取值&#xff1a; 解决步骤1.登录mysql2.修改安全策略(1)语法如下:(2)修改完可以看一下&#xff1a; 3.改完密码策略&#xff0c;就可以根据自己修改的策略&#xff0c…

pytorch复现_UNet

什么是UNet U-Net由收缩路径和扩张路径组成。收缩路径是一系列卷积层和汇集层&#xff0c;其中要素地图的分辨率逐渐降低。扩展路径是一系列上采样层和卷积层&#xff0c;其中特征地图的分辨率逐渐增加。 在扩展路径中的每一步&#xff0c;来自收缩路径的对应特征地图与当前特征…

什么是分治算法?

分治算法(divide and conquer algorithm)是指把大问题分割成多个小问 题&#xff0c;然后把每个小问题分割成多个更小的问题&#xff0c;直到问题的规模小到能够 轻易解决。这种算法很适合用递归实现&#xff0c;因为把问题分割成多个与自身相 似的小问题正对应递归情况&#x…

Java —— 类和对象(一)

目录 1. 面向对象的初步认知 1.1 什么是面向对象 1.2 面向对象与面向过程 2. 类定义和使用 2.1 认识类 2.2 类的定义格式 3. 类的实例化(如何产生对象) 3.1 什么是实例化 3.2 访问对象的成员 3.3 类和对象的说明 4. this引用 4.1 为什么要有this引用 4.2 什么是this引用 4.3 th…

无线发射芯片解决方案在智能家居中的应用

随着物联网的发展&#xff0c;智能家居已经成为一个热门话题。智能家居利用无线技术来实现设备之间的互联互通&#xff0c;提供更智能、更便利的生活体验。无线发射芯片解决方案在智能家居中扮演着关键的角色&#xff0c;它们为智能家居设备之间的通信提供了稳定、高效的连接&a…

stm32f103+HC-SR04+ssd1306实现超声波测距

&#x1f64c;秋名山码民的主页 &#x1f602;oi退役选手&#xff0c;Java、大数据、单片机、IoT均有所涉猎&#xff0c;热爱技术&#xff0c;技术无罪 &#x1f389;欢迎关注&#x1f50e;点赞&#x1f44d;收藏⭐️留言&#x1f4dd; 获取源码&#xff0c;添加WX 目录 前言HC…

【江协科技-用0.96寸OLED播放知名艺人打篮球视频】

Python进行视频图像处理&#xff0c;通过串口发送给stm32&#xff0c;stm32接收数据&#xff0c;刷新OLED进行显示。 步骤&#xff1a; 1.按照接线图连接好硬件 2.把Keil工程的代码下载到STM32中 3.运行Python代码&#xff0c;通过串口把处理后的数据发送给STM32进行显示 …

Spark 新特性+核心回顾

Spark 新特性核心 本文来自 B站 黑马程序员 - Spark教程 &#xff1a;原地址 1. 掌握Spark的Shuffle流程 1.1 Spark Shuffle Map和Reduce 在Shuffle过程中&#xff0c;提供数据的称之为Map端&#xff08;Shuffle Write&#xff09;接收数据的称之为Reduce端&#xff08;Sh…

Leetcode刷题详解——组合

1. 题目链接&#xff1a;77. 组合 2. 题目描述&#xff1a; 给定两个整数 n 和 k&#xff0c;返回范围 [1, n] 中所有可能的 k 个数的组合。 你可以按 任何顺序 返回答案。 示例 1&#xff1a; 输入&#xff1a;n 4, k 2 输出&#xff1a; [[2,4],[3,4],[2,3],[1,2],[1,3],[…

vue3拖拽排序——vuedraggable

文章目录 安装代码效果拖拽前拖拽时拖拽后 vue3 的拖拽排序博主用的是 vuedraggable 安装 安装 npm i vuedraggable4.1.0 --save 引用 import Draggable from vuedraggable;代码 html <van-checkbox-group v-model"dataMap.newsActionChecked"><van-cell…

LazyVim: 将 Neovim 升级为完整 IDE | 开源日报 No.67

curl/curl Stars: 31.5k License: NOASSERTION Curl 是一个命令行工具&#xff0c;用于通过 URL 语法传输数据。 核心优势和关键特点包括&#xff1a; 可在命令行中方便地进行数据传输支持多种协议 (HTTP、FTP 等)提供丰富的选项和参数来满足不同需求 kubernetes/ingress-n…