大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例

news2024/11/12 21:06:51

Flink SQL

Flink SQL 是 Apache Flink 提供的一种高层次的查询语言接口,它基于 SQL 标准,为开发者提供了处理流式数据和批处理数据的能力。Flink SQL 允许用户使用标准 SQL 查询语言在数据流和数据表上执行复杂的操作,适用于多种应用场景,如实时分析、数据流处理、机器学习等。下面是 Flink SQL 的一些重要概念和功能:

流与批统一的查询模式

Flink SQL 的一大特点是流处理和批处理的统一性。通过同一套 SQL 语法,用户可以同时处理静态数据(批处理)和动态数据(流处理)。这使得应用程序的开发更加简化,因为可以用相同的逻辑编写实时流数据处理和历史数据的查询。

动态表 (Dynamic Tables)

Flink SQL 通过动态表的概念将流数据建模为不断变化的表。这种动态表随着时间推移不断更新,数据的每个变化(插入、更新、删除)都会影响表的状态。通过动态表的概念,Flink 可以使用 SQL 查询连续的流数据,并在查询执行时获得不断更新的结果。

窗口操作 (Windowing)

在流式数据处理场景中,窗口操作非常重要。Flink SQL 提供了多种类型的窗口操作,包括:

  • 滚动窗口 (Tumbling Window):将数据按照固定长度分割成不重叠的窗口。
  • 滑动窗口 (Sliding Window):窗口之间存在重叠,数据可能被分配到多个窗口。
  • 会话窗口 (Session Window):窗口由活动间隔定义,不同的事件可能会聚合在一个窗口中。

连接操作 (Joins)

Flink SQL 支持多种连接操作:

  • 流与流的连接:允许用户将多个流结合在一起,基于时间或键进行匹配。
  • 流与表的连接:将静态表与流数据进行匹配,从而使流式数据处理能够结合历史数据或参考数据。
  • 时态表连接 (Temporal Table Join):用于将流数据与一个时态表进行连接,时态表会随着时间不断更新。

内置函数和自定义函数

Flink SQL 提供了丰富的内置函数,涵盖了字符串操作、数学运算、时间日期处理、聚合操作等。此外,Flink SQL 还支持用户自定义函数(UDF、UDTF、UDAF),用户可以根据具体需求扩展 SQL 的功能。

Table API 与 SQL API 的互操作性

Flink 提供了两种高级数据处理 API:

  • Table API:一种与关系代数类似的编程接口,支持链式调用,功能类似于 SQL。
  • SQL API:用户可以直接使用标准 SQL 语句进行数据处理。

Table API 和 SQL API 具有很高的互操作性,用户可以在同一个程序中混合使用这两者。例如,可以先用 Table API 进行表定义和部分操作,再通过 SQL 语句执行复杂的查询。

支持多种数据源和数据接收器

Flink SQL 支持连接多种数据源和数据接收器,如 Kafka、文件系统、数据库(如 MySQL、PostgreSQL)、Hive、HBase 等。通过 SQL 语法,用户可以轻松地将流数据写入这些外部系统,也可以从这些系统中读取数据进行处理。

状态管理与容错机制

Flink SQL 继承了 Flink 强大的状态管理和容错机制。在流处理任务中,Flink SQL 能够有效地处理有状态的计算,并保证在失败时自动恢复。基于 Flink 的检查点(Checkpointing)和保存点(Savepoint)机制,Flink SQL 提供了 Exactly-Once 的状态一致性保障。

实时分析与 ETL

Flink SQL 可以用于实时数据的分析与处理,常用于构建实时 ETL (Extract, Transform, Load) 流程。例如,用户可以通过 SQL 查询对从 Kafka、数据库等数据源接收到的流数据进行清洗、过滤、转换,并将结果写入到其他系统中(如 Elasticsearch、HDFS、JDBC)。

HelloWorld

添加依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table</artifactId>
    <type>pom</type>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.12</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

依赖说明:

  • flink-table-api-java-bridge_2.12:桥接器,主要负责 TableAPI 和 DataStream/DataSetAPI 的连接支持,按照语言分Java和Scala。
  • flink-table-planner-blink_2.12:计划期,是TableAPI最主要的部分,提供了运行时环境和生成程序执行计划的Planner。
  • 如果是生产环境,则已经有 planner,就只需要有bridge就可以了
  • flink-table:基础依赖

编写代码

package icu.wzk;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;


public class TableApiDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        DataStreamSource<Tuple2<String, Integer>> data = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                while (true) {
                    ctx.collect(new Tuple2<>("name", 10));
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {

            }
        });

        // =======================
        // Table 方式
        Table table = tableEnvironment.fromDataStream(data, $("name"), $("age"));
        // 对Table的数据查询
        Table name = table.select($("name"));
        // 将数据输出到控制台
        DataStream<Tuple2<Boolean, Row>> result = tableEnvironment.toRetractStream(name, Row.class);
        result.print();
        System.out.println("=========================");
        // =======================
        // SQL 方式
        tableEnvironment.createTemporaryView("users",data, $("name"), $("age"));
        String sql = "select name from users";
        table = tableEnvironment.sqlQuery(sql);
        result = tableEnvironment.toRetractStream(table, Row.class);
        result.print();
        System.out.println("=========================");
        env.execute("TableApiDemo");
    }

}

运行代码

控制台会一直不间断的输出如下的内容:

=========================
=========================
1> (true,name)
6> (true,name)
2> (true,name)
7> (true,name)
3> (true,name)
8> (true,name)
4> (true,name)
1> (true,name)
5> (true,name)
2> (true,name)
6> (true,name)
3> (true,name)

控制台的运行结果如下所示:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2128060.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何模拟一个小程序项目打包的流程

一、Uni-app 执行 yarn run dev:mp-weixin后会发生什么 &#xff08;一&#xff09;准备工作 克隆项目&#xff1a;创建以 typescript 开发的工程&#xff08;如命令行创建失败&#xff0c;请直接访问 https://gitee.com/dcloud/uni-preset-vue/repository/archive/vite-ts.z…

htop、free -h对于可用内存显示不同的区别

htop中Mem包含了缓存和缓存区&#xff0c; free -h查看 used free buff/cache 上面htop显示的mem&#xff0c; 1、我看我还能用多少内存&#xff0c;看哪里 看free -h 中的free 2、buff/cache 是啥 缓存缓存区占用&#xff0c;htop显示的效果是把这个也算在一块了&#…

C# WinForm:禁用Panel容器滚动条自动移动位置的功能

1.在WinForm项目中新建一个类&#xff1a; 2.类里面的内容&#xff0c;重写Panel的这个方法 3.编译后这个控件就出现在工具箱了 4.然后用这个新Panel控件就好了 5.完事大吉。

【Python机器学习系列】建立super learner模型预测心脏疾病(案例+源码)

这是我的第353篇原创文章。 一、引言 Super learner 是 Vander Laan et al.&#xff08;2007&#xff09;提出的一种基于损失函数的组合预测的学习算法。Super learner算法基于交叉验证理论&#xff0c;通过加权的方式组合多种候选算法&#xff0c;从而构造一种最小交叉验证风…

Hadoop集群开启后使用jps命令查看发现没有NameNode、SecondaryNameNode、DataNode、NodeManager进程,缺少进程。

今天安装Hadoop集群,安装完成使用jps命令查看发现没有NameNode进程,别人jps后都有6个在跑,我就两个。看到别人的 我的👉。都看懵了。。。 处理NameNode不启动的问题 检查ip地址是否是namenode所在节点的ip。 要检查 IP 地址是否是 NameNode 所在节点的 IP 地址,你可以通…

大数据之Spark(二)

9.4.3、RDD持久化 RDD之间进行相互迭代计算&#xff08;Transformation的转换&#xff09;&#xff0c;当执行开启&#xff0c;新RDD的生成代表旧RDD消失。如果有的rdd需要重复使用就需要将rdd缓存&#xff0c;rdd.cache()或rdd.persist()。清理缓存rdd.unpersist() 缓存特点&…

Python项目虚拟环境(超详细讲解)

课 程 推 荐我 的 个 人 主 页&#xff1a;&#x1f449;&#x1f449; 失心疯的个人主页 &#x1f448;&#x1f448;入 门 教 程 推 荐 &#xff1a;&#x1f449;&#x1f449; Python零基础入门教程合集 &#x1f448;&#x1f448;虚 拟 环 境 搭 建 &#xff1a;&#x1…

android 生SHH,并配置

1. ssh-keygen -t rsa -b 4096 -C "XXXXxx.com" 2. vim ~/.ssh/config 新建一个文件&#xff1a;~/.ssh/config&#xff1a;并将下列的内容放入&#xff1a; Host * HostKeyAlgorithms ssh-rsa PubkeyAcceptedKeyTypes ssh-rsa 4.得到XXX.pub去添加ssh 5.克隆

【Java】方法1_定义方法,完整格式,原理

文章目录 前言 一、方法是什么&#xff1f; 方法的完整格式 1、有返回值的函数 2、无返回值的函数 二、方法使用常见的问题三、方法在计算机中执行的原理总结 前言 学习记录方法 一、方法是什么&#xff1f; 方法是一种语法结构&#xff0c;它可以把一段代码封装成一个功能…

python绘制3d建筑

import matplotlib.pyplot as plt import numpy as np from mpl_toolkits.mplot3d.art3d import Poly3DCollection# 随机生成建筑块数据 def generate_building_blocks(num_blocks, grid_size100, height_range(5, 50), base_size_range(10, 30)):buildings []for _ in range(…

<<编码>>第 11 章 逻辑门电路--开关电路 示例

网络电路 info::操作说明 鼠标单击开关切换开合状态 primary::在线交互操作链接 https://cc.xiaogd.net/?startCircuitLinkhttps://book.xiaogd.net/code-hlchs-examples/assets/circuit/code-hlchs-ch11-01-network-circuit.txt 继电器开关电路 info::操作说明 鼠标单击开关切…

python-游戏自动化(二)(OpenCV图像运用基础)

OpenCV OpenCV简介 首先我们来了解一下&#xff0c;OpenCV是什么&#xff1f; OpenCV 是计算机视觉中经典的专用库&#xff0c;其支持多语言、跨平台&#xff0c;功能强大。 OpenCV现在支持与计算 机视觉和机器学习有关的多种算法&#xff0c;并且正在日益扩展…

基于vue框架的宠爱有佳宠物医疗管理系统4x10z(程序+源码+数据库+调试部署+开发环境)系统界面在最后面。

系统程序文件列表 项目功能&#xff1a;用户,宠物信息,医生,用户挂号,病历记录,科室信息,药物信息 开题报告内容 基于Vue框架的宠爱有佳宠物医疗管理系统开题报告 一、引言 随着现代社会生活节奏的加快&#xff0c;宠物已成为许多家庭不可或缺的一员。宠物不仅带来了欢乐与…

海康威视相机在QTcreate上的使用教程

文章目录 前言&#xff1a;基础夯实&#xff1a;效果展示&#xff1a;图片展示&#xff1a;视频展示&#xff1a; 参考的资料&#xff1a;遇到问题&#xff1a;问题1&#xff1a;int64 does not问题2&#xff1a;LNK2019配置思路(这个很重要)配置关键图片&#xff1a;配置具体过…

HyperWorks二维网格划分与单元连续性

自动网格划分 HyperWorks中为零件定义几何曲面是创建零件壳单元网格的最佳方式。HyperMesh 创建二维网格最有效的方法是使用 Automesh 面板直接在零件的表面创建网格。 Automesh 面板是 HyperMesh 重要的网格划分工具&#xff0c;通过 automesh 可实现单元尺寸、单元密度、单…

TopN问题

100亿个integer数据&#xff0c;如何找到前k个最小值。 也就是问的如何排序最快 堆排序最快 完全二叉树 堆结构其实就是一颗完全二叉树 大根堆和小根堆 大根堆&#xff1a;每一个根节点都大于它的叶子结点 小根堆&#xff1a;每一个根节点都小于它的叶子结点 通过建立大根…

【最新顶刊综述】【多模态学习】Vision + X:A Survey on Multimodal Learning in the Light of Data

VisionX&#xff1a;基于数据的多模态学习综述 论文链接 0.论文摘要和信息 摘要 摘要——我们以多感官的方式感知世界并与世界交流&#xff0c;不同的信息源由人脑的不同部分复杂地处理和解释&#xff0c;构成一个复杂但和谐统一的感知系统。为了赋予机器真正的智能&#x…

核心系统用PG了,抠脑壳的权限,搞晕了!

作者&#xff1a;IT邦德 中国DBA联盟(ACDU)成员&#xff0c;10余年DBA工作经验&#xff0c; Oracle、PostgreSQL ACE CSDN博客专家及B站知名UP主&#xff0c;全网粉丝10万 擅长主流Oracle、MySQL、PG、 高斯及Greenplum备份恢复&#xff0c; 安装迁移&#xff0c;性能优化、故障…

ThinkPHP8出租屋管理系统

有需要请加文章底部Q哦 可远程调试 ThinkPHP8出租屋管理系统 一 介绍 此出租屋管理系统基于ThinkPHP8框架开发&#xff0c;数据库mysql&#xff0c;前端Vue3&#xff0c;前后端不分离&#xff0c;系统主要角色为管理员。房租计算器&#xff0c;房东记账收租管理&#xff0c;房…

使用 Prism 框架实现导航.NET 6.0 + WPF

动动您的手指关注下公众号&#xff0c;获取更多优质文章 前言 Prism 一个开源的框架&#xff0c;专门用于开发可扩展、模块化和可测试的企业级 XAML 应用程序&#xff0c;适用于 WPF&#xff08;Windows Presentation Foundation&#xff09;和 Xamarin Forms 等平台。 Prism…