【Flink SQL】Flink SQL 基础概念:SQL 的时间属性

news2024/11/18 6:45:59

Flink SQL 基础概念:SQL 的时间属性

  • 1.Flink 三种时间属性简介
  • 2.Flink 三种时间属性的应用场景
    • 2.1 事件时间案例
    • 2.2 处理时间案例
    • 2.3 摄入时间案例
  • 3.SQL 指定时间属性的两种方式
  • 4.SQL 事件时间案例
  • 5.SQL 处理时间案例

与离线处理中常见的时间分区字段一样,在实时处理中,时间属性也是一个核心概念。Flink 支持 处理时间事件时间摄入时间 三种时间语义。

三种时间在生产环境的使用频次 事件时间(SQL 常用) > > > 处理时间(SQL 几乎不用,DataStream 少用) > > > 摄入时间(不用)。

1.Flink 三种时间属性简介

  • 事件时间:指的是数据本身携带的时间,这个时间是在事件产生时的时间,而且在 Flink SQL 触发计算时,也使用数据本身携带的时间。这就叫做事件时间。目前生产环境中用的最多。
  • 处理时间:指的是具体算子计算数据执行时的机器时间(例如在算子中 Java 取 System.currentTimeMillis()),在生产环境中用的次多。
  • 摄入时间:指的是数据从数据源进入 Flink 的时间。摄入时间用的最少,可以说基本不使用。

小伙伴们要注意到:

  • 上述的三种时间概念不是由于有了数据而诞生的,而是有了 Flink 之后根据实际的应用场景而诞生的。以事件时间举个例子,如果只是数据携带了时间,Flink 也消费了这个数据,但是在 Flink 中没有使用数据的这个时间作为计算的触发条件,也不能把这个 Flink 任务叫做事件时间的任务。
  • 其次,要认识到,一般一个 Flink 任务只会有一个时间属性,所以时间属性通常认为是一个任务粒度的。举例:我们可以说 A 任务是事件时间语义的任务,B 任务是处理时间语义的任务。当然了,一个任务也可以存在多个时间属性。

2.Flink 三种时间属性的应用场景

讲到这里,有人会问,博主上面写的 3 种时间属性到底对我们的任务有啥影响呢?3 种时间属性的应用场景是啥?

先说结论,在 Flink 中时间的作用:

  • 主要体现在包含时间窗口的计算中:用于标识任务的时间进度,来判断是否需要触发窗口的计算。比如常用的滚动窗口、滑动窗口等都需要时间推动触发。这些窗口的应用场景后续会详细介绍。
  • 次要体现在自定义时间语义的计算中:举个例子,比如用户可以自定义每隔 10s 的本地时间,或者消费到的数据的时间戳每增大 10s,就把计算结果输出一次,时间在此类应用中也是一种标识任务进度的作用。

博主以 滚动窗口 的聚合任务为例来介绍一下事件时间和处理时间的对比区别。

2.1 事件时间案例

还是以之前的 clicks 表拿来举例。

在这里插入图片描述
上面这个案例的窗口大小是 1 小时,需求方需要按照用户点击时间戳 cTime 划分数据(划分滚动窗口),然后计算出 Count 聚合结果(这样计算能反映出事件的真实发生时间),那么就需要把 cTime 设置为窗口的划分时间戳,即代码中 tumble(cTime, interval '1' hour)

上面这种就叫做事件时间。即用数据中自带的时间戳进行窗口的划分(点击操作真实的发生时间)。

后续 Flink SQL 任务在运行的过程中也会实际按照 cTime 的当前时间作为一小时窗口结束触发条件并计算一个小时窗口内的数据。

2.2 处理时间案例

还是以之前的 clicks 表拿来举例。

还是上面那个案例,但是这次需求方不需要按照数据上的时间戳划分数据(划分滚动窗口),只需要数据来了之后, 在 Flink 机器上的时间作为一小时窗口结束的触发条件并计算。

那么这种触发机制就是处理时间。

2.3 摄入时间案例

在 Flink 从外部数据源读取到数据时,给这条数据带上的当前数据源算子的本地时间戳。下游可以用这个时间戳进行窗口聚合,不过这种几乎不使用。

3.SQL 指定时间属性的两种方式

如果要满足 Flink SQL 时间窗口类的聚合操作,SQL 或 Table API 中的 数据源表 就需要提供时间属性(相当于我们把这个时间属性在 数据源表 上面进行声明),以及支持时间相关的操作。

那么来看看 Flink SQL 为我们提供的两种指定时间戳的方式:

  • CREATE TABLE DDL 创建表的时候指定
  • 可以在 DataStream 中指定,在后续的 DataStream 转的 Table 中使用

一旦时间属性定义好,它就可以像普通列一样使用,也可以在时间相关的操作中使用。

4.SQL 事件时间案例

来看看 Flink 中如何指定事件时间。

  • CREATE TABLE DDL 指定时间戳的方式。
CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒
  -- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
-- 然后就可以在窗口算子中使用 user_action_time
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

从上面这条语句可以看到,如果想使用事件时间,那么我们的时间戳类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型。很多小伙伴会想到,我们的时间戳一般不都是秒或者是毫秒(BIGINT 类型)嘛,那这种情况怎么办?

解决方案必须要有啊,如下。

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  -- 1. 这个 ts 就是常见的毫秒级别时间戳
  ts BIGINT,
  -- 2. 将毫秒时间戳转换成 TIMESTAMP_LTZ 类型
  time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  -- 3. 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒
  -- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型
  WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
  • DataStream 中指定事件时间。

之前介绍了 TableDataStream 可以互转,那么 Flink 也提供了一个能力,就是在 Table 转为 DataStream 时,指定时间戳字段。如下案例:

public class DataStreamSourceEventTimeTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // 1. 分配 watermark
        DataStream<Row> r = env.addSource(new UserDefinedSource())
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(0L)) {
                    @Override
                    public long extractTimestamp(Row element) {
                        return (long) element.getField("f2");
                    }
                });
        // 2. 使用 f2.rowtime 的方式将 f2 字段指为事件时间时间戳
        Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2.rowtime");

        tEnv.createTemporaryView("source_table", sourceTable);

        // 3. 在 tumble window 中使用 f2
        String tumbleWindowSql =
                "SELECT TUMBLE_START(f2, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n"
                + "FROM source_table\n"
                + "GROUP BY TUMBLE(f2, INTERVAL '5' SECOND)"
                ;

        Table resultTable = tEnv.sqlQuery(tumbleWindowSql);

        tEnv.toDataStream(resultTable, Row.class).print();

        env.execute();
    }


    private static class UserDefinedSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {

        private volatile boolean isCancel;

        @Override
        public void run(SourceContext<Row> sourceContext) throws Exception {

            int i = 0;

            while (!this.isCancel) {

                sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis()));

                Thread.sleep(10L);
                i++;
            }

        }

        @Override
        public void cancel() {
            this.isCancel = true;
        }

        @Override
        public TypeInformation<Row> getProducedType() {
            return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class),
                    TypeInformation.of(Long.class));
        }
    }
}

5.SQL 处理时间案例

来看看 Flink SQL 中如何指定处理时间。

  • CREATE TABLE DDL 指定时间戳的方式。
CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  -- 使用下面这句来将 user_action_time 声明为处理时间
  user_action_time AS PROCTIME()
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
-- 然后就可以在窗口算子中使用 user_action_time
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
  • DataStream 中指定处理时间。
public class DataStreamSourceProcessingTimeTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // 1. 分配 watermark
        DataStream<Row> r = env.addSource(new UserDefinedSource());

        // 2. 使用 proctime.proctime 的方式将 f2 字段指为处理时间时间戳
        Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2, proctime.proctime");

        tEnv.createTemporaryView("source_table", sourceTable);

        // 3. 在 tumble window 中使用 f2
        String tumbleWindowSql =
                "SELECT TUMBLE_START(proctime, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n"
                + "FROM source_table\n"
                + "GROUP BY TUMBLE(proctime, INTERVAL '5' SECOND)"
                ;

        Table resultTable = tEnv.sqlQuery(tumbleWindowSql);

        tEnv.toDataStream(resultTable, Row.class).print();

        env.execute();
    }


    private static class UserDefinedSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {

        private volatile boolean isCancel;

        @Override
        public void run(SourceContext<Row> sourceContext) throws Exception {

            int i = 0;

            while (!this.isCancel) {

                sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis()));

                Thread.sleep(10L);
                i++;
            }

        }

        @Override
        public void cancel() {
            this.isCancel = true;
        }

        @Override
        public TypeInformation<Row> getProducedType() {
            return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class),
                    TypeInformation.of(Long.class));
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1516698.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

rviz上不显示机器人模型(模型只有白色)

文档中的是base_footprint&#xff0c;需要根据自己所设的坐标系更改&#xff0c;我的改为base_link 如何查看自己设的坐标系&#xff1a; 这些parent父坐标系就是 同时打开rviz后需要更改成base_link

初识Python语言-课堂练习【pyhton123题库】

初识Python语言-课堂练习【pyhton123题库】 一、单项选择题 1、Guido van Rossum正式对外发布Python版本的年份是&#xff1a; A 2008B 1998C 1991D 2002 【答案】C 【解析】暂无解析2、下面不是Python语言特点的是&#xff1a;‪‬‪‬‪‬‪‬‪‬‮‬‪‬‫‬‪‬‪‬‪…

新上线的coze知识库好用吗?看完你就知道了

近年来&#xff0c;知识库工具逐渐成为企业管理和个人工作的有力辅助&#xff0c;而其中&#xff0c;新上线的coze就引起了广泛关注。相对于其他同类产品&#xff0c;coze有一些显著优势。首先&#xff0c;它简洁优雅的用户界面人性化布局&#xff0c;给用户提供了一流的使用体…

c++指针的定义和使用

1、定义一个指针 int a10; //定义指针的语法&#xff1a;数据类型 * 指针变量名&#xff1a;int * p&#xff1b; //让指针记录变量a的地址&#xff1a;p &a; int a 10;int* p; p &a; cout << "a的地址为&#xff1a;" << &a <<…

python中的常用模块

os和sys模块 os和sys是Python标准库中两个非常重要的模块&#xff0c;它们提供了丰富的方法来与Python解释器以及操作系统交互。 os模块 os模块提供了许多函数&#xff0c;用于处理文件和目录等操作系统任务&#xff0c;如路径管理、执行命令、获取进程信息等。 常用方法&a…

【话题】2024年AI辅助研发趋势,有那些应用领域

大家好&#xff0c;我是全栈小5&#xff0c;欢迎阅读文章&#xff01; 此篇是【话题达人】系列文章&#xff0c;这一次的话题是《2024年AI辅助研发趋势》 目录 背景概念实践医药领域汽车设计领域展望未来文章推荐 背景 随着人工智能技术的持续发展与突破&#xff0c;2024年AI辅…

如何实现接口

类实现接口 用关键字implements声明自己实现一个或 多个接口 实现 多个接口&#xff0c;用 逗号分隔开 重写接口中的方法 要求&#xff1a;类实现某个接口&#xff0c;类必须重写该接口的所有方法。 重写规则&#xff1a; 去掉public abstact修饰符 给出方法体&#xff08;具…

2024Python二级

1. 2. 前序遍历首先访问根节点再访问左子树和右子树 3. 4. sub不属于保留字 5. 6. 7. 8. continue是再重新开始进行循环&#xff0c;不是题目中所规定字母的话就对它进行输出 9. Python没有主函数的说法 10. 未转化为数据所要求的形式&#xff0c;应首先考虑eval 11. l…

力扣日记3.14-【贪心算法篇】376. 摆动序列

力扣日记&#xff1a;【贪心算法篇】376. 摆动序列 日期&#xff1a;2024.3.14 参考&#xff1a;代码随想录、力扣 376. 摆动序列 题目描述 难度&#xff1a;中等 如果连续数字之间的差严格地在正数和负数之间交替&#xff0c;则数字序列称为 摆动序列 。第一个差&#xff08;…

【动态规划】代码随想录算法训练营第五十七天 |647. 回文子串, 516.最长回文子序列,动态规划总结篇 (待补充)

647. 回文子串 1、题目链接&#xff1a;. - 力扣&#xff08;LeetCode&#xff09; 2、文章讲解&#xff1a;代码随想录 3、题目&#xff1a; 给定一个字符串&#xff0c;你的任务是计算这个字符串中有多少个回文子串。 具有不同开始位置或结束位置的子串&#xff0c;即使…

html--彩虹马

文章目录 htmljscss 效果 html <!DOCTYPE html> <html lang"en" > <head> <meta charset"UTF-8"> <title>Rainbow Space Unicorn</title> <link rel"stylesheet" href"css/style.css"> &l…

基于智慧灯杆的智慧城市解决方案(2)

功能规划 智慧照明功能 智慧路灯的基本功能仍然是道路照明, 因此对照明功能的智慧化提升是最基本的一项要求。 对道路照明管理进行智慧化提升, 实施智慧照明, 必然将成为智慧城市中道路照明发展的主要方向之一。 智慧照明是集计算机网络技术、 通信技术、 控制技术、 数据…

Kubernetes弃用Dockershim,转向Containerd:影响及如何应对

Kubernetes1.24版本发布时&#xff0c;正式宣布弃用Dockershim&#xff0c;转向Containerd作为默认的容器运行环境。Kubernetes以CRI(Container Runtime Interface)容器运行时接口制定接入准则&#xff0c;用户可以使用Containerd、CRI-O、CRI- Dockerd及其他容器运行时作为Kub…

在分布式环境中使用状态机支持数据的一致性

简介 在本文中&#xff0c;我们将介绍如何在分布式系统中使用transaction以及分布式系统中transaction的局限性。然后我们通过一个具体的例子&#xff0c;介绍了一种通过设计状态机来避免使用transaction的方法。 什么是数据库transaction Transaction是关系型数据普遍支持的…

如何利用WebRTC构建点对点的即时通讯工具

在当今竞争激烈的商业环境中&#xff0c;企业越来越需要构建自己的即时通讯工具来提升内部沟通效率和信息安全&#xff0c;减少第三方工具依赖带来的潜在风险&#xff0c;并能与自身的行业业务深入融合。 拥有专用的通讯平台能够加快信息的流动&#xff0c;提升工作协同和任务执…

【C语言】【时间复杂度】Leetcode 153. 寻找旋转排序数组中的最小值

文章目录 题目时间复杂度概念时间复杂度的计算 解题思路代码呈现 题目 链接: link 时间复杂度 概念 时间复杂度是一种函数&#xff0c;定量地描述了该算法运行的时间。既然是一种函数&#xff0c;就涉及到自变量与因变量。因变量代表是时间复杂的规模&#xff0c;自变量是…

HTTP代理的特性、功能作用是什么样的?

在当今互联网时代&#xff0c;HTTP代理作为网络通信中的一项重要技术&#xff0c;在各行各业都有着广泛的应用。然而&#xff0c;对于许多人来说&#xff0c;HTTP代理的特性和功能作用并不十分清晰。在本文中&#xff0c;我们将深入探讨HTTP代理的各种特性和功能&#xff0c;帮…

Linux系统Docker部署Plik系统结合内网穿透实现公网访问本地文件

文章目录 1. Docker部署Plik2. 本地访问Plik3. Linux安装Cpolar4. 配置Plik公网地址5. 远程访问Plik6. 固定Plik公网地址7. 固定地址访问Plik 本文介绍如何使用Linux docker方式快速安装Plik并且结合Cpolar内网穿透工具实现远程访问&#xff0c;实现随时随地在任意设备上传或者…

jenkins+maven+gitlab自动化构建打包、部署

Jenkins自动化部署实现原理 环境准备 1、jenkins已经安装好 docker安装jenkins 2、gitlab已经安装好 docker安装gitlab 一、Jenkins系统配置 1.Global Tool Configuration 任务构建所用到的编译环境等配置&#xff0c;配置参考&#xff1a; jdk配置&#xff08;jenkins自带…

C# ListView 控件使用

1.基本设置 listView1.Columns.Add("序号", 60); //向 listView1控件中添加1列 同时设置列名称和宽度listView1.Columns.Add("温度", 100); //下同listView1.Columns.Add("偏移", 100);listView1.Columns.Add("分割", 50);listView1…