玩转数据-大数据-Flink SQL 中的时间属性

news2025/1/13 13:51:25

一、说明

时间属性是大数据中的一个重要方面,像窗口(在 Table API 和 SQL )这种基于时间的操作,需要有时间信息。我们可以通过时间属性来更加灵活高效地处理数据,下面我们通过处理时间和事件时间来探讨一下Flink SQL 时间属性。

二、处理时间

2.1、准备WaterSensor类,方便使用

package com.lyh.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {
    private String id;
    private Long ts;
    private Integer vc;
}

2.2、DataStream 到 Table 转换时定义

处理时间属性可以在 schema 定义的时候用 .proctime 后缀来定义。时间属性一定不能定义在一个已有字段上,所以它新增一个字段。
代码段:

package com.lyh.flink12;

import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;

public class Flink_Sql_Proctime {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<WaterSensor> waterSensorStream =
                env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
                        new WaterSensor("sensor_1", 2000L, 20),
                        new WaterSensor("sensor_2", 3000L, 30),
                        new WaterSensor("sensor_1", 4000L, 40),
                        new WaterSensor("sensor_1", 5000L, 50),
                        new WaterSensor("sensor_2", 6000L, 60));
// 1. 创建表的执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 声明一个额外的字段来作为处理时间字段
        Table sensorTable = tableEnv.fromDataStream(waterSensorStream, $("id"), $("ts"), $("vc"), $("pt").proctime());
        sensorTable.execute().print();
    }
}

执行结果:
在这里插入图片描述

2.3、创建数据文件sensor.txt 数据,方便使用

sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60

2.4、在创建表的 DDL 中定义

package com.lyh.flink12;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink_Sql_ddl_Procetime {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql("create table sensor(id string,ts bigint,vc int,pt_time as PROCTIME()) with("
                + "'connector' = 'filesystem',"
                + "'path' = 'input/sensor.txt',"
                + "'format' = 'csv'"
                + ")");
        Table table = tableEnv.sqlQuery("select * from sensor");
        table.execute().print();
    }
}

运行结果:
在这里插入图片描述

三、事件时间

事件时间允许程序按照数据中包含的时间来处理,这样可以在有乱序或者晚到的数据的情况下产生一致的处理结果。它可以保证从外部存储读取数据后产生可以复现(replayable)的结果。
除此之外,事件时间可以让程序在流式和批式作业中使用同样的语法。在流式程序中的事件时间属性,在批式程序中就是一个正常的时间字段。
为了能够处理乱序的事件,并且区分正常到达和晚到的事件,Flink 需要从事件中获取事件时间并且产生 watermark(watermarks)。

3.1、DataStream 到 Table 转换时定义

事件时间属性可以用 .rowtime 后缀在定义 DataStream schema 的时候来定义。时间戳和 watermark 在这之前一定是在 DataStream 上已经定义好了。
在从 DataStream 到 Table 转换时定义事件时间属性有两种方式。取决于用 .rowtime 后缀修饰的字段名字是否是已有字段,事件时间字段可以是:
1、在 schema 的结尾追加一个新的字段
2、替换一个已经存在的字段。
不管在哪种情况下,事件时间字段都表示 DataStream 中定义的事件的时间戳。
代码:
援用上面WaterSensor类

package com.lyh.flink12;

import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

public class Flink_Sql_EventTime {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> waterSensorSource = env.fromElements(
                new WaterSensor("sensor_1", 1000L, 100),
                new WaterSensor("sensor_1", 1000L, 100),
                new WaterSensor("sensor_2", 1000L, 200),
                new WaterSensor("sensor_2", 1000L, 200)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner((element, recordtime) -> element.getTs()));
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.fromDataStream(waterSensorSource,$("id"),$("ts"),$("vc"),$("pt").rowtime())
                .execute().print();

    }
}

运行结果:
在这里插入图片描述

3.2、使用已有的字段作为时间属性

.fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));

3.3、在创建表的 DDL 中定义

事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段.

package com.lyh.flink12;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink_Sql_ddl_EventTime {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
         tableEnv.executeSql("create table sensor(" +
                "id string," +
                "ts bigint," +
                "vc int, " +
                "t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +
                "watermark for t as t - interval '5' second)" +
                "with("
                + "'connector' = 'filesystem',"
                + "'path' = 'input/sensor.txt',"
                + "'format' = 'csv'"
                + ")");
        tableEnv.sqlQuery("select * from sensor")
                .execute().print();
    }
}

运行结果:
在这里插入图片描述
说明:
1.把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是一个计算列。
2.严格递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column。
3.递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND。
乱序时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1052853.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

信号类型(雷达)——脉冲雷达(四)

系列文章目录 《信号类型&#xff08;雷达通信&#xff09;》 《信号类型&#xff08;雷达&#xff09;——雷达波形认识&#xff08;一&#xff09;》 《信号类型&#xff08;雷达&#xff09;——连续波雷达&#xff08;二&#xff09;》 《信号类型&#xff08;雷达&…

浏览器输入 URL 并回车发生了什么

本文节选自我的博客&#xff1a;浏览器输入 URL 并回车发生了什么 &#x1f496; 作者简介&#xff1a;大家好&#xff0c;我是MilesChen&#xff0c;偏前端的全栈开发者。&#x1f4dd; CSDN主页&#xff1a;爱吃糖的猫&#x1f525;&#x1f4e3; 我的博客&#xff1a;爱吃糖…

建站软件WordPress和phpcms体验

一、网站程序 什么是网站程序? 网站程序是由程序员编写的一个网站安装包,程序是网站内容的载体。 常见的网站程序有: dedecms , phpcms ,帝国cms ,米拓cms , WordPress , discuz , ECShop ,shopex , z-blog等,根据不同类型的网站我们来选择不同的网站程序。 比如说搭建一个…

格拉姆角场GAF将时序数据转换为图像,可以应用于故障诊断等多个领域

效果 2.代码(这里用随机生成的数据来模拟一维振动信号,利用格拉姆角场GAF将时序数据转换为图像,并划分为训练集和测试集,最后利用SVM分类) # -*- coding: utf-8 -*- """ Created on Sat Sep 30 21:35:36 2023@author: pony """import nump…

5自由度雄克机械臂仿真描点

5自由度雄克机械臂仿真描点 任务 建立雄克机械臂的坐标系和D-H参数表&#xff0c;使用Matlab机器人工具箱&#xff08;Robotics Toolbox&#xff09;&#xff0c;用机械臂末端执行器触碰8个红色的目标点。 代码 %% 机器人学 format compact close all clear clc%% DH参数 L…

算法基础课第二部分

算法基础课 第四讲 数学知识AcWing1381. 阶乘(同余&#xff0c;因式分解) 质数AcWing 866. 质数的判定---试除法AcWing 868. 质数的判定---埃氏筛AcWing867. 分解质因数---试除法AcWing 197. 阶乘---分解质因数---埃式筛 约数AcWing 869. 求约数---试除法AcWing 870. 约数个数-…

JUnit介绍

JUnit是用于编写和运行可重复的自动化测试的开源测试框架&#xff0c; 这样可以保证我们的代码按预期工作。JUnit可广泛用于工业和作为支架(从命令行)或IDE(如Eclipse)内单独的Java程序。 JUnit提供&#xff1a; 断言测试预期结果。 测试功能共享通用的测试数据。 测试套件轻…

【Django 笔记】第一个demo

1. pip 安装 2. django 指令 D:\software\python3\anconda3\Lib\site-packages\django\bin>django-adminType django-admin help <subcommand> for help on a specific subcommand.Available subcommands:[django]checkcompilemessagescreatecachetabledbshelldiff…

【C++学习】多态

目录 一、多态的概念 1. 概念 二、多态的定义及实现 2.1 多态的构成条件 2.2 虚函数 2.3 虚函数的重写 2.4 C11 override 和 final 2.5 重载、覆盖(重写)、隐藏(重定义)的对比 三、抽象类 3.1 概念 3.2 接口继承和实现继承 四、多态的原理 4.1 虚函数表 4.2 多态的…

【JavaEE初阶】 计算机是如何工作的

文章目录 &#x1f332;计算机发展史&#x1f38b;冯诺依曼体系&#xff08;Von Neumann Architecture&#xff09;&#x1f38d;CPU 基本工作流程&#x1f4cc;逻辑门&#x1f388;电子开关 —— 机械继电器(Mechanical Relay)&#x1f388;门电路(Gate Circuit)NOT GATE&…

排序算法之【快速排序】

&#x1f4d9;作者简介&#xff1a; 清水加冰&#xff0c;目前大二在读&#xff0c;正在学习C/C、Python、操作系统、数据库等。 &#x1f4d8;相关专栏&#xff1a;C语言初阶、C语言进阶、C语言刷题训练营、数据结构刷题训练营、有感兴趣的可以看一看。 欢迎点赞 &#x1f44d…

关于RabbitMQ你了解多少?

关于RabbitMQ你了解多少&#xff1f; 文章目录 关于RabbitMQ你了解多少&#xff1f;基础篇同步和异步MQ技术选型介绍和安装数据隔离SpringAMQP快速入门Work queues交换机Fanout交换机Direct交换机Topic交换机 声明队列和交换机MQ消息转换器 高级篇消息可靠性问题发送者的可靠性…

泡泡玛特城市乐园即将开园 解锁“文化+科技”潮流空间

近年来&#xff0c;泡泡玛特以潮玩IP为核心&#xff0c;不断拓展业务版图&#xff0c;推进国际化布局同时实现集团化运营&#xff0c;而泡泡玛特首个城市乐园即将开业。 据了解&#xff0c;泡泡玛特城市乐园是由泡泡玛特精心打造的沉浸式IP主题乐园&#xff0c;占地约4万平方米…

四、浏览器渲染过程,DOM,CSSDOM,渲染,布局,绘制详细介绍

知识点&#xff1a; 1、为什么不能先执行 js文件&#xff1f;&#xff1f; 我们不能先执行JS文件&#xff0c;必须等到CSSOM构建完成了才能执行JS文件&#xff0c;因为前面已经说过渲染树是需要DOM和CSSOM构建完成了以后才能构建&#xff0c;而且JS是可以操控CSS样式的&#…

【Java 进阶篇】深入理解 JDBC:Java 数据库连接详解

数据库是现代应用程序的核心组成部分之一。无论是 Web 应用、移动应用还是桌面应用&#xff0c;几乎都需要与数据库交互以存储和检索数据。Java 提供了一种强大的方式来实现与数据库的交互&#xff0c;即 JDBC&#xff08;Java 数据库连接&#xff09;。本文将深入探讨 JDBC 的…

Linux系统编程(七):线程同步

参考引用 UNIX 环境高级编程 (第3版)黑马程序员-Linux 系统编程 1. 同步概念 所谓同步&#xff0c;即同时起步、协调一致。不同的对象&#xff0c;对 “同步” 的理解方式略有不同 设备同步&#xff0c;是指在两个设备之间规定一个共同的时间参考数据库同步&#xff0c;是指让…

从 低信噪比陆上地震记录 解决办法收集 到 走时层析反演中的折射层析调研

目录 (前言1) 关于背景的回答:(前言2) 现有的降低噪声, 提高信噪比的一些特有方法的论文资料 (传统策略):1. 关于波形反演与走时层析反演2. 折射层析3. 用一个合成数据来解释折射层析反演的思路4. 其他层析反演方法:5. 关于层析反演的一些TIPS (可补充)参考文献: 降噪有关资料参…

ElementUI之CUD+表单验证

目录 前言&#xff1a; 增删改查 表单验证 前言&#xff1a; 继上篇博客来写我们的增删改以及表单验证 增删改查 首先先定义接口 数据样式&#xff0c;我们可以去elementUI官网去copy我们喜欢的样式 <!-- 编辑窗体 --><el-dialog :title"title" :visib…

国庆《乡村振兴战略下传统村落文化旅游设计》许少辉八一新书行将售罄

国庆《乡村振兴战略下传统村落文化旅游设计》许少辉八一新书行将售罄 国庆《乡村振兴战略下传统村落文化旅游设计》许少辉八一新书行将售罄

leetcode-----二叉树习题

目录 前言 1. 二叉树的中序遍历 2. 相同的树 3. 二叉树的最大深度 4. 二叉树的最小深度 5.二叉树的前序遍历 6. 二叉树的后序遍历 7. 对称二叉树 前言 前面我们学习过了二叉树的相关知识点&#xff0c;那么今天我们就做做练习&#xff0c;下面我会介绍几道关于二叉树的…