FlinkSQL的常用语言

news2025/4/15 20:09:10

FlinkSQL 常用语言指南

FlinkSQL 是 Apache Flink 提供的 SQL 接口,允许用户使用标准 SQL 或扩展的 SQL 语法来处理流式和批式数据。以下是 FlinkSQL 的常用语言元素和操作:

  1. 基本查询
-- 选择查询
SELECT * FROM table_name;

-- 带条件的查询
SELECT column1, column2 FROM table_name WHERE condition;

-- 分组聚合
SELECT user_id, COUNT(*) as cnt 
FROM orders 
GROUP BY user_id;
  1. 时间属性定义
-- 定义处理时间
CREATE TABLE orders (
    order_id STRING,
    product STRING,
    amount DOUBLE,
    order_time TIMESTAMP(3),
    -- 声明处理时间属性
    proc_time AS PROCTIME()
) WITH (...);

-- 定义事件时间和水位线
CREATE TABLE orders (
    order_id STRING,
    product STRING,
    amount DOUBLE,
    order_time TIMESTAMP(3),
    -- 声明事件时间属性
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (...);
  1. 窗口操作
-- 滚动窗口
SELECT 
    window_start, 
    window_end, 
    SUM(amount) as total_amount
FROM TABLE(
    TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end;

-- 滑动窗口
SELECT 
    window_start, 
    window_end, 
    user_id,
    SUM(amount) as total_amount
FROM TABLE(
    HOP(TABLE orders, DESCRIPTOR(order_time), INTERVAL '5' MINUTES, INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end, user_id;

-- 会话窗口
SELECT 
    window_start, 
    window_end, 
    user_id,
    COUNT(*) as event_count
FROM TABLE(
    SESSION(TABLE orders, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)
)
GROUP BY window_start, window_end, user_id;
  1. 连接操作
-- 常规连接
SELECT o.order_id, o.product, u.user_name
FROM orders AS o
JOIN users AS u ON o.user_id = u.user_id;

-- 时间区间连接
SELECT 
    o.order_id, 
    p.promotion_name,
    o.order_time,
    o.amount
FROM orders o
JOIN promotions p 
ON o.product_id = p.product_id
AND o.order_time BETWEEN p.start_time AND p.end_time;

-- 窗口连接
SELECT 
    o.order_id,
    s.shipment_id,
    o.order_time,
    s.ship_time,
    TIMESTAMPDIFF(HOUR, o.order_time, s.ship_time) as hours_to_ship
FROM orders o
JOIN shipments s
ON o.order_id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '1' HOUR AND s.ship_time;
  1. 常用函数

标量函数

-- 字符串函数
SELECT LOWER(name), SUBSTRING(email, 1, 5) FROM users;

-- 数学函数
SELECT ABS(amount), ROUND(price, 2) FROM products;

-- 时间函数
SELECT 
    DATE_FORMAT(order_time, 'yyyy-MM-dd'),
    TIMESTAMPDIFF(DAY, order_time, CURRENT_TIMESTAMP)
FROM orders;

聚合函数

SELECT 
    COUNT(*) as total_orders,
    SUM(amount) as total_amount,
    AVG(amount) as avg_amount,
    MAX(amount) as max_amount,
    MIN(amount) as min_amount
FROM orders;

窗口函数

SELECT 
    product_id,
    order_time,
    amount,
    ROW_NUMBER() OVER (PARTITION BY product_id ORDER BY order_time) as row_num,
    SUM(amount) OVER (PARTITION BY product_id ORDER BY order_time ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) as moving_sum
FROM orders;
  1. DDL 语句
-- 创建表
CREATE TABLE orders (
    order_id STRING,
    product_id STRING,
    amount DECIMAL(10, 2),
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json'
);

-- 创建视图
CREATE VIEW large_orders AS
SELECT * FROM orders WHERE amount > 1000;

-- 创建函数
CREATE FUNCTION my_udf AS 'com.example.MyUDF';
  1. DML 语句
-- 插入数据
INSERT INTO target_table
SELECT * FROM source_table WHERE amount > 100;

-- 更新数据 (Flink 1.12+ 支持有限)
UPDATE orders SET amount = 200 WHERE order_id = '123';

-- 删除数据 (Flink 1.12+ 支持有限)
DELETE FROM orders WHERE order_id = '456';
  1. 模式匹配 (MATCH_RECOGNIZE)
SELECT *
FROM orders
MATCH_RECOGNIZE (
    PARTITION BY user_id
    ORDER BY order_time
    MEASURES
        START_ROW.order_id AS start_order,
        LAST(PRICE_DOWN.order_id) AS bottom_order,
        LAST(PRICE_UP.order_id) AS end_order
    ONE ROW PER MATCH
    AFTER MATCH SKIP TO LAST PRICE_UP
    PATTERN (START_ROW PRICE_DOWN+ PRICE_UP+)
    DEFINE
        PRICE_DOWN AS (LAST(PRICE_DOWN.amount, 1) IS NULL AND PRICE_DOWN.amount < START_ROW.amount) 
            OR PRICE_DOWN.amount < LAST(PRICE_DOWN.amount, 1),
        PRICE_UP AS PRICE_UP.amount > LAST(PRICE_DOWN.amount, 1)
) MR;
  1. 配置参数
-- 设置参数
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5 s';
SET 'table.exec.mini-batch.size' = '1000';
  1. 常用连接器配置
-- Kafka 源表
CREATE TABLE kafka_source (
    id INT,
    name STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'input_topic',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

-- JDBC 结果表
CREATE TABLE jdbc_sink (
    id INT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql:3306/mydb',
    'table-name' = 'sink_table',
    'username' = 'user',
    'password' = 'password'
);

FlinkSQL 不断演进,这里只是举例一些常用的语句,参考官方文档可以获取最新语法和功能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2334498.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DeepSeek轻松入门教程——从入门到精通

大家好&#xff0c;我是吾鳴。 今天吾鳴要给大家分享一份DeepSeek小白轻松入门指导手册——《DeepSeek 15天指导手册&#xff0c;从入门到精通》。指导手册分为基础入门对话篇、效率飞跃篇、场景实战篇、高手进化篇等&#xff0c;按照指导手册操作&#xff0c;DeepSeek从入门到…

Vue2 老项目升级 Vue3 深度解析教程

Vue2 老项目升级 Vue3 深度解析教程 摘要 Vue3 带来了诸多改进和新特性&#xff0c;如性能提升、组合式 API、更好的 TypeScript 支持等&#xff0c;将 Vue2 老项目升级到 Vue3 可以让项目获得这些优势。本文将深入解析升级过程&#xff0c;涵盖升级前的准备工作、具体升级步骤…

WXJ196微机小电流接地选线装置使用简单方便无需维护

WXJ196微机小电流接地选线装置&#xff0c;能在系统发生单相接地时&#xff0c;准确、迅速地选出接地线路母 线。使用简单方便&#xff0c;无需维护&#xff0c;可根据用户需要将相关信息通过通信接口传给上级监控系统&#xff0c; 适用于无人值守变电站。 2 功能及特点 全新的…

Java第四节:idea在debug模式夏改变变量的值

作者往期文章 Java第一节&#xff1a;debug如何调试程序&#xff08;附带源代码&#xff09;-CSDN博客 Java第二节&#xff1a;debug如何调试栈帧链&#xff08;附带源代码&#xff09;-CSDN博客 Java第三节&#xff1a;新手如何用idea创建java项目-CSDN博客 步骤一 在需要修改…

门极驱动器DRV8353M设计(二)

目录 13.3.4.4 MOSFET VDS 感测 (SPI Only) 13.3.5 Gate Driver保护回路 13.3.5.1 VM 电源和 VDRAIN 欠压锁定 (UVLO) 13.3.5.2 VCP 电荷泵和 VGLS 稳压器欠压锁定 (GDUV) 13.3.5.3 MOSFET VDS过流保护 (VDS_OCP) 13.3.5.3.1 VDS Latched Shutdown (OCP_MODE 00b) 13.…

学点概率论,打破认识误区

概率论是统计分析和机器学习的核心。掌握概率论对于理解和开发稳健的模型至关重要&#xff0c;因为数据科学家需要掌握概率论。本博客将带您了解概率论中的关键概念&#xff0c;从集合论的基础知识到高级贝叶斯推理&#xff0c;并提供详细的解释和实际示例。 目录 简介 基本集合…

NVIDIA AI Aerial

NVIDIA AI Aerial 适用于无线研发的 NVIDIA AI Aerial 基础模组Aerial CUDA 加速 RANAerial Omniverse 数字孪生Aerial AI 无线电框架 用例构建商业 5G 网络加速 5G生成式 AI 和 5G 数据中心 加速 6G 研究基于云的工具 优势100% 软件定义通过部署在数字孪生中进行测试6G 标准化…

OpenCV 关键点定位

一、Opencv关键点定位介绍 关键点定位在计算机视觉领域占据着核心地位&#xff0c;它能够精准识别图像里物体的关键特征点。OpenCV 作为功能强大的计算机视觉库&#xff0c;提供了多种实用的关键点定位方法。本文将详细阐述关键点定位的基本原理&#xff0c;深入探讨 OpenCV 中…

arm_math.h、arm_const_structs.h 和 arm_common_tables.h

在 ​​FOC&#xff08;Field-Oriented Control&#xff0c;磁场定向控制&#xff09;​​ 中&#xff0c;arm_math.h、arm_const_structs.h 和 arm_common_tables.h 是 CMSIS-DSP 库的核心组件&#xff0c;用于实现高效的数学运算、预定义结构和查表操作。以下是它们在 FOC 控…

buuctf sql注入类练习

BUU SQL COURSE 1 1 实例无法访问 / Instance cant be reached at that time | BUUCTF但是这个地方很迷惑就是这个 一个 # 我们不抓包就不知道这个是sql注入类的判断是 get 类型的sql注入直接使用sqlmap我们放入到1.txt中 目的是 优先检测 ?id1>python3 sqlmap.py -r 1.t…

具身导航中的视觉语言注意力蒸馏!Vi-LAD:实现动态环境中的社会意识机器人导航

作者&#xff1a;Mohamed Elnoor 1 ^{1} 1, Kasun Weerakoon 1 ^{1} 1, Gershom Seneviratne 1 ^{1} 1, Jing Liang 2 ^{2} 2, Vignesh Rajagopal 3 ^{3} 3, and Dinesh Manocha 1 , 2 ^{1,2} 1,2单位&#xff1a; 1 ^{1} 1马里兰大学帕克分校电气与计算机工程系&#xff0c; 2…

全局前置守卫与购物车页面鉴权

在很多应用里&#xff0c;并非所有页面都能随意访问。例如购物车页面&#xff0c;用户需先登录才能查看。这时可以利用全局前置守卫来实现这一鉴权功能。 全局前置守卫的书写位置在 router/index.js 文件中&#xff0c;在创建 router 对象之后&#xff0c;暴露 router 对象之前…

layui 弹窗-调整窗口的缩放拖拽几次就看不到标题、被遮挡了怎么解决

拖拽几次&#xff0c;调整窗口的缩放&#xff0c;就出现了弹出的页面&#xff0c;右上角叉号调不出来了&#xff0c;窗口关不掉 废话不多说直入主题: 在使用layer.alert layer.confirm layer.msg 等等弹窗时&#xff0c;发现看不到弹窗&#xff0c;然后通过控制台检查代码发现…

网络空间安全(57)K8s安全加固

一、升级K8s版本和组件 原因&#xff1a;K8s新版本通常会引入一系列安全功能&#xff0c;提供关键的安全补丁&#xff0c;能够补救已知的安全风险&#xff0c;减少攻击面。 操作&#xff1a;将K8s部署更新到最新稳定版本&#xff0c;并使用到达stable状态的API。 二、启用RBAC&…

2025蓝桥杯C++A组省赛 题解

昨天打完蓝桥杯本来想写个 p y t h o n python python A A A 组的题解&#xff0c;结果被队友截胡了。今天上课把 C A CA CA 组的题看了&#xff0c;感觉挺简单的&#xff0c;所以来水一篇题解。 这场 B B B 是一个爆搜&#xff0c; C C C 利用取余的性质比较好写&#…

论文学习:《通过基于元学习的图变换探索冷启动场景下的药物-靶标相互作用预测》

原文标题&#xff1a;Exploring drug-target interaction prediction on cold-start scenarios via meta-learning-based graph transformer 原文链接&#xff1a;https://www.sciencedirect.com/science/article/pii/S1046202324002470 药物-靶点相互作用&#xff08;DTI&…

十八、TCP多线程、多进程并发服务器

1、TCP多线程并发服务器 服务端&#xff1a; #include<stdio.h> #include <arpa/inet.h> #include<stdlib.h> #include<string.h> #include <sys/types.h> /* See NOTES */ #include <sys/socket.h> #include <pthread.h>…

AIGC-文生图与图生图

在之前的文章中&#xff0c;我们知道了如何通过Web UI和Confy UI两种SD工具来进行图片生成&#xff0c;今天进一步地讲解其中的参数用处及如何调节。 文生图 参数详解 所谓文生图&#xff0c;就是通过文字描述我们想要图片包含的内容。初学的话&#xff0c;还是以Web UI为例…

量化交易 - 聚宽joinquant - 多因子入门研究 - 源码开源

先看一下我们的收益&#xff1a; JoinQuant直达这里看看 下面讲解原理和代码。 目录 一、是否为st 二、是否停牌 三、市值小、roe大 四、编写回测代码 今天来研究一下多因子回测模型&#xff0c;这里以‘市值’、‘roe’作为例子。 几个标准&#xff1a;沪深300里选股&am…

FPGA 37 ,FPGA千兆以太网设计实战:RGMII接口时序实现全解析( RGMII接口时序设计,RGMII~GMII,GMII~RGMII 接口转换 )

目录 前言 一、设计流程 1.1 需求理解 1.2 模块划分 1.3 测试验证 二、模块分工 2.1 RGMII→GMII&#xff08;接收方向&#xff0c;rgmii_rx 模块&#xff09; 2.2 GMII→RGMII&#xff08;发送方向&#xff0c;rgmii_tx 模块&#xff09; 三、代码实现 3.1 顶层模块 …