Flink Table API and Flink SQL: A Detailed Usage Guide


1. Table API & Flink SQL - Core Concepts

Apache Flink offers two relational APIs for unified stream and batch processing:

  • Table API

    • The Table API is a query API for Scala and Java. It lets you compose relational operators such as select, filter, and join in a very intuitive way.
  • Flink SQL

    • Flink SQL is standard SQL implemented on top of Apache Calcite.

Apache Calcite is a foundational framework that provides standard SQL, a range of query optimizations, and connectors to many data sources. It lets users plug in data sources with little effort and query them with SQL. Calcite also provides query engines for OLAP and stream processing.

Queries in the Table API and Flink SQL have the same semantics for batch (DataSet) and streaming (DataStream) inputs, and they produce the same results.

The Table API and Flink SQL are tightly integrated with each other, as well as with the DataStream and DataSet APIs. You can switch between these APIs, and between libraries built on top of them.

For example, you can first run pattern matching over a DataStream with CEP (Flink's complex event processing library) and then analyze the matches with the Table API; or you can scan, filter, and aggregate a batch table with SQL and then run a Gelly graph algorithm on the preprocessed data (Gelly is Flink's graph API library).

The Table API and Flink SQL are still under active development and do not yet implement every feature; not every combination of [Table API, SQL] and [stream, batch] is supported.

1-1. Dynamic Tables and Continuous Queries

Dynamic tables are the core concept of Flink's Table API and SQL support for streaming data. Unlike the static tables that represent batch data, dynamic tables change over time, yet they can be queried just like static batch tables. Querying a dynamic table yields a continuous query. A continuous query never terminates and produces a dynamic table as its result; the query continuously updates its dynamic result table to reflect the changes on its dynamic input table.

Note that the result of a continuous query is always semantically equivalent to the result of the same query executed in batch mode on a snapshot of the input table.

The workflow:

  1. The stream is converted into a dynamic table.
  2. A continuous query is evaluated on the dynamic table, producing a new dynamic table.
  3. The resulting dynamic table is converted back into a stream.

1-2. Defining Dynamic Tables on a Stream

To process a stream with relational queries, it must be converted into a Table. Conceptually, each record of the stream is interpreted as an INSERT operation on the result table.

Assume data in the following format:

user:  VARCHAR   // the user name
cTime: TIMESTAMP // the time the URL was accessed
url:   VARCHAR   // the URL the user accessed

Each click event of the stream becomes an INSERT into the table. As more click-stream records arrive, the result table keeps growing.

  • Continuous queries

A continuous query is evaluated on the dynamic table and produces a new dynamic table. Unlike a batch query, a continuous query never terminates and updates its result table according to the updates on its input table. At any point in time, the result of a continuous query is semantically equivalent to the result of the same query executed in batch mode on a snapshot of the input table.

The continuous query proceeds as follows:

  1. When the query starts and the clicks table is empty, the result of the SELECT query is empty.
  2. When the first row flows into the clicks table, the SELECT query starts computing the result table: once the first row [ Mary, ./home ] arrives, the result table contains its first row [ Mary, 1 ].
  3. When the second row [ Bob, ./cart ] is inserted into clicks, the query updates the result table and inserts a new row [ Bob, 1 ].
  4. The third row [ Mary, ./prod?id=1 ] updates an already computed result row: [ Mary, 1 ] becomes [ Mary, 2 ].
  5. When the fourth row is appended to clicks, the query inserts a third row [ Liz, 1 ] into the result table.
  6. clicks then waits for new records; as new data flows in, the query keeps computing results and updating the table on the right. (A runnable sketch of this query follows the list.)
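
A minimal, hedged sketch of this continuous query using the Java bridge API (the class name and the in-memory rows are illustrative, not from the original post; `user` needs backticks because it is a reserved word in Flink SQL):

package com.zenitera.bigdata.flinksql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.row;

/**
 * Hypothetical sketch of the clicks continuous query described above.
 * The clicks table is built from in-memory rows for illustration; cTime is omitted.
 */
public class Flink00_ClicksContinuousQuery {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register an illustrative clicks table with columns (user, url)
        tableEnv.createTemporaryView("clicks", tableEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("user", DataTypes.STRING()),
                        DataTypes.FIELD("url", DataTypes.STRING())),
                row("Mary", "./home"),
                row("Bob", "./cart"),
                row("Mary", "./prod?id=1"),
                row("Liz", "./home")));

        // The continuous query: count clicks per user
        Table result = tableEnv.sqlQuery(
                "select `user`, count(url) as cnt from clicks group by `user`");

        // The result is an updating table, so encode it as a retract stream
        tableEnv.toRetractStream(result, Row.class).print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}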

2. Flink Table API Development

2-1. Common Dependencies

Common dependency configuration for Table API development:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-compress</artifactId>
    <version>1.21</version>
</dependency>

${flink.version} and ${scala.binary.version} are property placeholders declared in the <properties> section of the pom.xml, for example:

    <properties>
        <flink.version>1.13.6</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

You can also write the version numbers directly.

2-2. Mixing Tables and DataStreams

Code example:

package com.zenitera.bigdata.flinksql;

import com.zenitera.bigdata.bean.WaterSensor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class Flink01_TableApi_test01 {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        DataStreamSource<WaterSensor> waterSensorDataStreamSource = env.fromElements(
                new WaterSensor("sensor_1", 1000L, 10),
                new WaterSensor("sensor_1", 2000L, 20),
                new WaterSensor("sensor_2", 3000L, 30),
                new WaterSensor("sensor_1", 4000L, 40),
                new WaterSensor("sensor_1", 5000L, 50),
                new WaterSensor("sensor_2", 6000L, 60));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(waterSensorDataStreamSource);
        Table resultTable = table
                .where($("id").isEqual("sensor_1"))
                .select($("*"));
        DataStream<Row> toAppendStream = tableEnv.toAppendStream(resultTable, Row.class);
        toAppendStream.print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}
/*
+I[sensor_1, 1000, 10]
+I[sensor_1, 2000, 20]
+I[sensor_1, 4000, 40]
+I[sensor_1, 5000, 50]
 */

2-3. Aggregation Operations

Code example:

package com.zenitera.bigdata.flinksql;

import com.zenitera.bigdata.bean.WaterSensor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class Flink01_TableApi_test02 {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        DataStreamSource<WaterSensor> waterSensorDataStreamSource = env.fromElements(
                new WaterSensor("sensor_1", 1000L, 10),
                new WaterSensor("sensor_1", 2000L, 20),
                new WaterSensor("sensor_2", 3000L, 30),
                new WaterSensor("sensor_1", 4000L, 40),
                new WaterSensor("sensor_1", 5000L, 50),
                new WaterSensor("sensor_2", 6000L, 60));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(waterSensorDataStreamSource);

        Table resultTable = table
                .where($("vc").isGreaterOrEqual(20))
                .groupBy($("id"))
                .aggregate($("vc").sum().as("vc_sum"))
                .select($("id"), $("vc_sum"));

        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(resultTable, Row.class);
        retractStream.print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}
/*
(true,+I[sensor_1, 20])
(true,+I[sensor_2, 30])
(false,-U[sensor_1, 20])
(true,+U[sensor_1, 60])
(false,-U[sensor_1, 60])
(true,+U[sensor_1, 110])
(false,-U[sensor_2, 30])
(true,+U[sensor_2, 90])
 */

2-4. Table-to-Stream Conversion

A dynamic table can be continuously modified by INSERT, UPDATE, and DELETE, just like a regular database table. It might be a table with a single row that is constantly updated, an insert-only table (without UPDATE or DELETE changes), or anything in between.
When converting a dynamic table into a stream, or writing it to an external system, these changes need to be encoded. Flink's Table API and SQL support three ways to encode the changes of a dynamic table:

  • Append-only streams
  • Retract streams
  • Upsert streams

2-4-1. Append-only Streams

A dynamic table that is modified only by INSERT operations can be converted into a stream by emitting the inserted rows (this is exactly what toAppendStream produces in section 2-2).

2-4-2. Retract Streams

A retract stream contains two types of messages: add messages and retract messages. A dynamic table is converted into a retract stream by encoding an INSERT as an add message, a DELETE as a retract message, and an UPDATE as a retract message for the previous row plus an add message for the updated row. The mapping is:

  • Operation - insert
    • encoding: add
  • Operation - update
    • encoding: retract & add
  • Operation - delete
    • encoding: retract

The output of the aggregation example in section 2-3 shows this encoding: each update appears as a (false,-U[...]) retract message followed by a (true,+U[...]) add message.

2-4-3. Upsert Streams

An upsert stream contains two types of messages: upsert messages and delete messages. A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key. The table is converted by encoding INSERT and UPDATE operations as upsert messages and DELETE operations as delete messages. The operator consuming the stream needs to know the unique key attribute in order to apply the messages correctly. The main difference from a retract stream is that an UPDATE is encoded with a single message, which makes upsert streams more efficient. The mapping is (a connector sketch follows the list):

  • Operation - insert
    • encoding: upsert
  • Operation - update
    • encoding: upsert
  • Operation - delete
    • encoding: delete
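
In Flink 1.13 the Java bridge API has no direct "toUpsertStream" method; upsert encoding typically comes into play when an updating table is written to a connector that declares a primary key, such as upsert-kafka. A hedged DDL sketch (table name, topic, and broker list are placeholders, not from the original post):

// Hypothetical sketch: write an updating table as an upsert stream via the
// upsert-kafka connector; the primary key is the unique key the encoding needs.
tableEnv.executeSql("create table sink_upsert_sensor(" +
        "id string, vc_sum int, " +
        "primary key (id) not enforced" +
        ") with(" +
        "'connector' = 'upsert-kafka'," +
        "'topic' = 'topic_upsert_sensor'," +
        "'properties.bootstrap.servers' = 'hdt-dmcp-ops01:9092'," +
        "'key.format' = 'json'," +
        "'value.format' = 'json'" +
        ")");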

2-5. Reading Data via a Connector Declaration

Here the dynamic table connects directly to the data. Note that the examples below use the legacy connect()/descriptor API, which was deprecated in later Flink releases; the SQL DDL style shown in section 3-2 is the recommended alternative.

2-5-1. File Source

Code example:

package com.zenitera.bigdata.flinksql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;


public class Flink01_TableApi_FileSource {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        Schema schema = new Schema()
                .field("id", DataTypes.STRING())
                .field("ts", DataTypes.BIGINT())
                .field("vc", DataTypes.INT());

        StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
        streamTableEnvironment.connect(new FileSystem().path("input/sensor.txt"))
                .withFormat(new Csv().fieldDelimiter(',').lineDelimiter("\n"))
                .withSchema(schema)
                .createTemporaryTable("sensor");

        Table sensor = streamTableEnvironment.from("sensor");
        Table resultTable = sensor
                .groupBy($("id"))
                .select($("id"), $("id").count().as("id_count"));

        DataStream<Tuple2<Boolean, Row>> resultStream = streamTableEnvironment.toRetractStream(resultTable, Row.class);


        resultStream.print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

/*
input/sensor.txt
sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60

result:
(true,+I[sensor_1, 1])
(false,-U[sensor_1, 1])
(true,+U[sensor_1, 2])
(true,+I[sensor_2, 1])
(false,-U[sensor_1, 2])
(true,+U[sensor_1, 3])
(false,-U[sensor_2, 1])
(true,+U[sensor_2, 2])
(false,-U[sensor_2, 2])
(true,+U[sensor_2, 3])
 */

2-5-2. Kafka Source

Code example:

package com.zenitera.bigdata.flinksql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class Flink01_TableApi_KafkaSource {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        Schema schema = new Schema()
                .field("id", DataTypes.STRING())
                .field("ts", DataTypes.BIGINT())
                .field("vc", DataTypes.INT());

        StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
        streamTableEnvironment
                .connect(new Kafka()
                        .version("universal")
                        .topic("sensor")
                        .startFromLatest()
                        .property("group.id", "bigdata")
                        .property("bootstrap.servers", "hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092"))
                .withFormat(new Json())
                .withSchema(schema)
                .createTemporaryTable("sensor");

        Table sensorTable = streamTableEnvironment.from("sensor");
        Table resultTable = sensorTable.groupBy($("id"))
                .select($("id"), $("id").count().as("id_count"));
        DataStream<Tuple2<Boolean, Row>> resultStream = streamTableEnvironment.toRetractStream(resultTable, Row.class);
        resultStream.print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

/*
[wangting@hdt-dmcp-ops01 ~]$ kafka-console-producer.sh --bootstrap-server hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092 --topic sensor
>{"id": "sensor_1", "ts": 1000, "vc": 10}
>{"id": "sensor_2", "ts": 2000, "vc": 20}
>{"id": "sensor_1", "ts": 3000, "vc": 30}
>{"id": "sensor_1", "ts": 3000, "vc": 30}
>{"id": "sensor_1", "ts": 3000, "vc": 30}
>{"id": "sensor_2", "ts": 2000, "vc": 20}
>{"id": "sensor_2", "ts": 3000, "vc": 30}
--------------------------------------------------
(true,+I[sensor_1, 1])
(true,+I[sensor_2, 1])
(false,-U[sensor_1, 1])
(true,+U[sensor_1, 2])
(false,-U[sensor_1, 2])
(true,+U[sensor_1, 3])
(false,-U[sensor_1, 3])
(true,+U[sensor_1, 4])
(false,-U[sensor_2, 1])
(true,+U[sensor_2, 2])
(false,-U[sensor_2, 2])
(true,+U[sensor_2, 3])
 */

2-6. Writing Data via a Connector Declaration

2-6-1. File Sink

Code example:

package com.zenitera.bigdata.flinksql;

import com.zenitera.bigdata.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

import static org.apache.flink.table.api.Expressions.$;

public class Flink01_TableApi_FileSink {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<WaterSensor> waterSensorStream =
                env.fromElements(
                        new WaterSensor("sensor_1", 1000L, 10),
                        new WaterSensor("sensor_1", 2000L, 20),
                        new WaterSensor("sensor_2", 3000L, 30),
                        new WaterSensor("sensor_1", 4000L, 40),
                        new WaterSensor("sensor_1", 5000L, 50),
                        new WaterSensor("sensor_2", 6000L, 60));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(waterSensorStream);
        Table resultTable = table
                .where($("id").isEqual("sensor_1"))
                .select($("id"), $("ts"), $("vc"));

        Schema schema = new Schema()
                .field("id", DataTypes.STRING())
                .field("ts", DataTypes.BIGINT())
                .field("vc", DataTypes.INT());

        tableEnv
                .connect(new FileSystem().path("output/sensor_0407.csv"))
                .withFormat(new Csv())
                .withSchema(schema)
                .createTemporaryTable("sensor");

        resultTable.executeInsert("sensor");

    }
}

/*
output/sensor_0407.csv
sensor_1,1000,10
sensor_1,2000,20
sensor_1,4000,40
sensor_1,5000,50
 */

2-6-2. Kafka Sink

Code example:

package com.zenitera.bigdata.flinksql;

import com.zenitera.bigdata.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

import static org.apache.flink.table.api.Expressions.$;


public class Flink01_TableApi_KafkaSink {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<WaterSensor> waterSensorDataStreamSource = env.fromElements(
                new WaterSensor("sensor_1", 1000L, 10),
                new WaterSensor("sensor_1", 2000L, 20),
                new WaterSensor("sensor_2", 3000L, 30),
                new WaterSensor("sensor_1", 4000L, 40),
                new WaterSensor("sensor_1", 5000L, 50),
                new WaterSensor("sensor_2", 6000L, 60));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table sensorTable = tableEnv.fromDataStream(waterSensorDataStreamSource);
        Table resultTable = sensorTable
                .where($("id").isEqual("sensor_1"))
                .select($("*"));

        Schema schema = new Schema()
                .field("id", DataTypes.STRING())
                .field("ts", DataTypes.BIGINT())
                .field("vc", DataTypes.INT());

        tableEnv
                .connect(
                        new Kafka()
                                .version("universal")
                                .topic("sink_sensor")
                                .sinkPartitionerRoundRobin()
                                .property("bootstrap.servers", "hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092"))
                .withFormat(new Json())
                .withSchema(schema)
                .createTemporaryTable("sensor");

        resultTable.executeInsert("sensor");


    }
}

/*
[wangting@hdt-dmcp-ops01 bin]$ consumer sink_sensor
{"id":"sensor_1","ts":1000,"vc":10}
{"id":"sensor_1","ts":2000,"vc":20}
{"id":"sensor_1","ts":4000,"vc":40}
{"id":"sensor_1","ts":5000,"vc":50}
 */

3. Flink SQL Development

3-1. Basic Usage of Flink SQL

3-1-1. Querying an Unregistered Table with SQL

Code example:

package com.zenitera.bigdata.flinksql;

import com.zenitera.bigdata.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Query an unregistered table with SQL
 */
public class Flink02_flinksql_test01 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<WaterSensor> waterSensorDataStreamSource = env.fromElements(
                new WaterSensor("sensor_1", 1000L, 10),
                new WaterSensor("sensor_1", 2000L, 20),
                new WaterSensor("sensor_2", 3000L, 30),
                new WaterSensor("sensor_1", 4000L, 40),
                new WaterSensor("sensor_1", 5000L, 50),
                new WaterSensor("sensor_2", 6000L, 60));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(waterSensorDataStreamSource);
        Table resultTable = tableEnv.sqlQuery("select * from " + table + " where id = 'sensor_1'");
        tableEnv
                .toRetractStream(resultTable, Row.class)
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}
/*
(true,+I[sensor_1, 1000, 10])
(true,+I[sensor_1, 2000, 20])
(true,+I[sensor_1, 4000, 40])
(true,+I[sensor_1, 5000, 50])
 */

3-1-2. Querying a Registered Table with SQL

Code example:

package com.zenitera.bigdata.flinksql;

import com.zenitera.bigdata.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Query a registered table with SQL
 */
public class Flink02_flinksql_test02 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<WaterSensor> waterSensorDataStreamSource = env.fromElements(
                new WaterSensor("sensor_1", 1000L, 10),
                new WaterSensor("sensor_1", 2000L, 20),
                new WaterSensor("sensor_2", 3000L, 30),
                new WaterSensor("sensor_1", 4000L, 40),
                new WaterSensor("sensor_1", 5000L, 50),
                new WaterSensor("sensor_2", 6000L, 60));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(waterSensorDataStreamSource);
        // Register the table as a temporary view
        tableEnv.createTemporaryView("sensor", table);
        Table resultTable = tableEnv.sqlQuery("select * from sensor where id = 'sensor_1'");
        tableEnv
                .toRetractStream(resultTable, Row.class)
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}
/*
(true,+I[sensor_1, 1000, 10])
(true,+I[sensor_1, 2000, 20])
(true,+I[sensor_1, 4000, 40])
(true,+I[sensor_1, 5000, 50])
 */

3-2. Flink SQL: Reading from Kafka and Writing to Kafka

Code example:

package com.zenitera.bigdata.flinksql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink02_flinksql_KafkaToKafka {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv
                .executeSql("create table source_sensor (id string, ts bigint, vc int) with("
                        + "'connector' = 'kafka',"
                        + "'topic' = 'topic_source_sensor',"
                        + "'properties.bootstrap.servers' = 'hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092',"
                        + "'properties.group.id' = 'wangting',"
                        + "'scan.startup.mode' = 'latest-offset',"
                        + "'format' = 'json'"
                        + ")");
        tableEnv
                .executeSql("create table sink_sensor(id string, ts bigint, vc int) with("
                        + "'connector' = 'kafka',"
                        + "'topic' = 'topic_sink_sensor',"
                        + "'properties.bootstrap.servers' = 'hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092',"
                        + "'format' = 'json'"
                        + ")");

        tableEnv
                .executeSql("insert into sink_sensor select * from source_sensor where id='sensor_1'");

    }
}

/*
Produce messages into the topic_source_sensor topic:
[wangting@hdt-dmcp-ops01 bin]$ producer topic_source_sensor
>{"id": "sensor_1", "ts": 1000, "vc": 10}
>{"id": "sensor_2", "ts": 2000, "vc": 20}
>{"id": "sensor_1", "ts": 3000, "vc": 30}
>{"id": "sensor_2", "ts": 4000, "vc": 40}
>{"id": "sensor_1", "ts": 5000, "vc": 50}
>{"id": "sensor_2", "ts": 6000, "vc": 60}
>
---------------------------------------------------
Consume messages from the topic_sink_sensor topic:
[wangting@hdt-dmcp-ops02 ~]$ consumer topic_sink_sensor
{"id":"sensor_1","ts":1000,"vc":10}
{"id":"sensor_1","ts":3000,"vc":30}
{"id":"sensor_1","ts":5000,"vc":50}

 */

4. Table API & Flink SQL - Time Attributes

Time-based operations in the Table API and Flink SQL, such as windows, need time information. Tables in the Table API therefore provide logical time attributes that represent time and support time-related operations.

Time attributes come in two flavors:

  • Processing time
  • Event time

4-1. Processing Time - PROCTIME()

Code example:

package com.zenitera.bigdata.flinksql;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


/**
 * Processing time
 * Declare an additional computed column as the processing-time field
 */
public class Flink03_TimeAttributes_01 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql("create table sensor(id string,ts bigint,vc int,pt as PROCTIME()) with(" +
                "'connector' = 'filesystem'," +
                "'path' = 'input/sensor.txt'," +
                "'format' = 'csv'" +
                ")");
        TableResult tableResult = tableEnv.executeSql("select * from sensor");
        tableResult.print();
    }
}

/*
input/sensor.txt
sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60
--------------------------------
+----+--------------------------------+----------------------+-------------+-------------------------+
| op |                             id |                   ts |          vc |                      pt |
+----+--------------------------------+----------------------+-------------+-------------------------+
| +I |                       sensor_1 |                    1 |          10 | 2023-04-11 17:13:53.840 |
| +I |                       sensor_1 |                    2 |          20 | 2023-04-11 17:13:53.842 |
| +I |                       sensor_2 |                    4 |          30 | 2023-04-11 17:13:53.842 |
| +I |                       sensor_1 |                    4 |         400 | 2023-04-11 17:13:53.842 |
| +I |                       sensor_2 |                    5 |          50 | 2023-04-11 17:13:53.842 |
| +I |                       sensor_2 |                    6 |          60 | 2023-04-11 17:13:53.842 |
+----+--------------------------------+----------------------+-------------+-------------------------+
 */
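
Besides the DDL computed column, a processing-time attribute can also be declared while converting a DataStream, by appending a new field with the .proctime() suffix. A minimal sketch, assuming the waterSensorDataStreamSource and tableEnv from the section 2 examples are in scope:

// Hypothetical sketch: append a processing-time attribute during conversion
Table tableWithProcTime = tableEnv.fromDataStream(
        waterSensorDataStreamSource,
        $("id"), $("ts"), $("vc"),
        $("pt").proctime());   // "pt" is a new field carrying the processing time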

4-2. Event Time - rowtime()

Event time allows a program to process data according to timestamps contained in the data itself. This produces consistent results even when events are out of order or arrive late, and it guarantees replayable results when reading data from external storage.
In addition, event time gives you the same syntax in streaming and batch jobs: an event-time attribute in a streaming program is just a regular time field in a batch program.
To handle out-of-order events and to distinguish on-time from late events, Flink needs to extract an event time from each event and generate watermarks.

An event-time attribute can be defined with the .rowtime suffix while defining the schema of a DataStream-to-Table conversion; timestamps and watermarks must already be assigned on the DataStream before that.
There are two ways to define the event-time attribute when converting a DataStream to a Table. Depending on whether the field named with the .rowtime suffix already exists, the event-time field can be defined by:

  • appending a new field at the end of the schema, or

  • replacing an existing field

    In either case, the event-time field carries the timestamp of the event as defined on the DataStream. (A sketch of this conversion-based approach follows the DDL example below.)

Code example:

package com.zenitera.bigdata.flinksql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Event time - rowtime()
 * Declare an additional computed column as the event-time field
 */
public class Flink03_TimeAttributes_02 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql("create table sensor(" +
                "id string," +
                "ts bigint," +
                "vc int, " +
                "rowtime as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +
                "watermark for rowtime as rowtime - interval '5' second)" +
                "with("
                + "'connector' = 'filesystem',"
                + "'path' = 'input/sensor.txt',"
                + "'format' = 'csv'"
                + ")");

        tableEnv.sqlQuery("select * from sensor").execute().print();
    }
}

/*
input/sensor.txt
sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60
--------------------------------
+----+--------------------------------+----------------------+-------------+-------------------------+
| op |                             id |                   ts |          vc |                 rowtime |
+----+--------------------------------+----------------------+-------------+-------------------------+
| +I |                       sensor_1 |                    1 |          10 | 1970-01-01 08:00:00.000 |
| +I |                       sensor_1 |                    2 |          20 | 1970-01-01 08:00:00.000 |
| +I |                       sensor_2 |                    4 |          30 | 1970-01-01 08:00:00.000 |
| +I |                       sensor_1 |                    4 |         400 | 1970-01-01 08:00:00.000 |
| +I |                       sensor_2 |                    5 |          50 | 1970-01-01 08:00:00.000 |
| +I |                       sensor_2 |                    6 |          60 | 1970-01-01 08:00:00.000 |
+----+--------------------------------+----------------------+-------------+-------------------------+
 */
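
The DataStream-based alternative referenced above: a hedged sketch in which the .rowtime() suffix turns the existing ts field into the event-time attribute. Timestamps and watermarks must be assigned on the stream first; this mirrors the conversions used in section 5, and env/tableEnv are assumed to be in scope:

// Hypothetical sketch: define the event-time attribute during conversion
SingleOutputStreamOperator<WaterSensor> stream = env
        .fromElements(new WaterSensor("sensor_1", 1000L, 10))
        .assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((element, recordTimestamp) -> element.getTs()));

// Replace the existing ts field with the event-time attribute
Table tableWithRowtime = tableEnv.fromDataStream(stream, $("id"), $("ts").rowtime(), $("vc"));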

5. Table API & Flink SQL - Windows

The Table API and Flink SQL offer two main kinds of windows:

  • Group Windows
  • Over Windows

5-1. Using Windows in the Table API

5-1-1. Table API - Group Windows

Group windows aggregate rows into finite groups based on a time or row-count interval and evaluate an aggregation function once per group.
In the Table API, group windows are defined with the .window(w: GroupWindow) clause and must be given an alias with the as clause. To group the table by window, the window alias must be referenced in the groupBy clause like a regular grouping field.

  • Table API - Group Windows - tumbling windows
package com.zenitera.bigdata.flinksql;

import com.zenitera.bigdata.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

/**
 * Table API - Group Windows - tumbling window
 * Tumble
 */
public class Flink04_TableApi_GroupWindow01 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> waterSensorDataStreamSource = env.fromElements(
                new WaterSensor("sensor_1", 1000L, 10),
                new WaterSensor("sensor_1", 2000L, 20),
                new WaterSensor("sensor_2", 3000L, 30),
                new WaterSensor("sensor_1", 4000L, 40),
                new WaterSensor("sensor_1", 5000L, 50),
                new WaterSensor("sensor_2", 6000L, 60))

                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
                );

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(waterSensorDataStreamSource, $("id"), $("ts").rowtime(), $("vc"));

        table
                .window(Tumble.over(lit(10).second()).on($("ts")).as("timestamp"))
                .groupBy($("id"), $("timestamp"))
                .select($("id"), $("timestamp").start().as("start_time"), $("timestamp").end().as("end_time"), $("vc").sum().as("vc_sum"))
                .execute()
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

/*
+----+--------------------------------+-------------------------+-------------------------+-------------+
| op |                             id |              start_time |                end_time |      vc_sum |
+----+--------------------------------+-------------------------+-------------------------+-------------+
| +I |                       sensor_1 | 1970-01-01 00:00:00.000 | 1970-01-01 00:00:10.000 |         120 |
| +I |                       sensor_2 | 1970-01-01 00:00:00.000 | 1970-01-01 00:00:10.000 |          90 |
+----+--------------------------------+-------------------------+-------------------------+-------------+
 */
  • Table API - Group Windows - sliding windows
package com.zenitera.bigdata.flinksql;

import com.zenitera.bigdata.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Slide;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

/**
 * Table API - Group Windows - sliding window
 * Slide
 */
public class Flink04_TableApi_GroupWindow02 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> waterSensorDataStreamSource = env.fromElements(
                new WaterSensor("sensor_1", 1000L, 10),
                new WaterSensor("sensor_1", 2000L, 20),
                new WaterSensor("sensor_2", 3000L, 30),
                new WaterSensor("sensor_1", 4000L, 40),
                new WaterSensor("sensor_1", 5000L, 50),
                new WaterSensor("sensor_2", 6000L, 60))

                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
                );

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(waterSensorDataStreamSource, $("id"), $("ts").rowtime(), $("vc"));

        table
                .window(Slide.over(lit(10).second()).every(lit(5).second()).on($("ts")).as("timestamp"))
                .groupBy($("id"), $("timestamp"))
                .select($("id"), $("timestamp").start().as("start_time"), $("timestamp").end().as("end_time"), $("vc").sum().as("vc_sum"))
                .execute()
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}
/*
+----+--------------------------------+-------------------------+-------------------------+-------------+
| op |                             id |              start_time |                end_time |      vc_sum |
+----+--------------------------------+-------------------------+-------------------------+-------------+
| +I |                       sensor_1 | 1969-12-31 23:59:55.000 | 1970-01-01 00:00:05.000 |          70 |
| +I |                       sensor_2 | 1969-12-31 23:59:55.000 | 1970-01-01 00:00:05.000 |          30 |
| +I |                       sensor_1 | 1970-01-01 00:00:00.000 | 1970-01-01 00:00:10.000 |         120 |
| +I |                       sensor_2 | 1970-01-01 00:00:00.000 | 1970-01-01 00:00:10.000 |          90 |
| +I |                       sensor_2 | 1970-01-01 00:00:05.000 | 1970-01-01 00:00:15.000 |          60 |
| +I |                       sensor_1 | 1970-01-01 00:00:05.000 | 1970-01-01 00:00:15.000 |          50 |
+----+--------------------------------+-------------------------+-------------------------+-------------+
 */
  • Table API - Group Windows - session windows
package com.zenitera.bigdata.flinksql;

import com.zenitera.bigdata.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Session;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

/**
 * Table API - Group Windows - session window
 * Session
 */
public class Flink04_TableApi_GroupWindow03 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> waterSensorDataStreamSource = env.fromElements(
                new WaterSensor("sensor_1", 1000L, 10),
                new WaterSensor("sensor_1", 2000L, 20),
                new WaterSensor("sensor_2", 3000L, 30),
                new WaterSensor("sensor_1", 4000L, 40),
                new WaterSensor("sensor_1", 5000L, 50),
                new WaterSensor("sensor_2", 6000L, 60))

                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
                );

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(waterSensorDataStreamSource, $("id"), $("ts").rowtime(), $("vc"));

        table
                .window(Session.withGap(lit(6).second()).on($("ts")).as("timestamp"))
                .groupBy($("id"), $("timestamp"))
                .select($("id"), $("timestamp").start().as("start_time"), $("timestamp").end().as("end_time"), $("vc").sum().as("vc_sum"))
                .execute()
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}
/*
+----+--------------------------------+-------------------------+-------------------------+-------------+
| op |                             id |              start_time |                end_time |      vc_sum |
+----+--------------------------------+-------------------------+-------------------------+-------------+
| +I |                       sensor_1 | 1970-01-01 00:00:01.000 | 1970-01-01 00:00:11.000 |         120 |
| +I |                       sensor_2 | 1970-01-01 00:00:03.000 | 1970-01-01 00:00:12.000 |          90 |
+----+--------------------------------+-------------------------+-------------------------+-------------+
 */

5-1-2. Table API - Over Windows

Over-window aggregations are known from standard SQL (the OVER clause) and are defined in the SELECT clause of a query. For every input row, an over window computes an aggregate over a range of neighboring rows.
The Table API provides the Over class to configure the window's properties. Over windows can be defined on event time or processing time, with the range given either as a time interval or as a row count.
An unbounded over window is specified with a constant: UNBOUNDED_RANGE for a time-interval range, or UNBOUNDED_ROW for a row-count range. A bounded over window is specified with the size of its interval.

  • Table API - Over Windows - Unbounded Over Windows
package com.zenitera.bigdata.flinksql;

import com.zenitera.bigdata.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.UNBOUNDED_ROW;

/**
 * Table API - Over Windows - Unbounded Over Windows
 */
public class Flink05_TableApi_OverWindow01 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> waterSensorStreamSource = env
                .fromElements(
                        new WaterSensor("sensor_1", 1000L, 10),
                        new WaterSensor("sensor_1", 4000L, 40),
                        new WaterSensor("sensor_1", 2000L, 20),
                        new WaterSensor("sensor_2", 3000L, 30),
                        new WaterSensor("sensor_1", 5000L, 50),
                        new WaterSensor("sensor_2", 6000L, 60))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
                );

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(waterSensorStreamSource, $("id"), $("ts").rowtime(), $("vc"));
        table
                .window(Over.partitionBy($("id")).orderBy($("ts")).preceding(UNBOUNDED_ROW).as("w"))
                .select($("id"), $("ts"), $("vc").sum().over($("w")).as("vc_sum"))
                .execute()
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

/*
+----+--------------------------------+-------------------------+-------------+
| op |                             id |                      ts |      vc_sum |
+----+--------------------------------+-------------------------+-------------+
| +I |                       sensor_1 | 1970-01-01 00:00:01.000 |          10 |
| +I |                       sensor_1 | 1970-01-01 00:00:02.000 |          30 |
| +I |                       sensor_1 | 1970-01-01 00:00:04.000 |          70 |
| +I |                       sensor_1 | 1970-01-01 00:00:05.000 |         120 |
| +I |                       sensor_2 | 1970-01-01 00:00:03.000 |          30 |
| +I |                       sensor_2 | 1970-01-01 00:00:06.000 |          90 |
+----+--------------------------------+-------------------------+-------------+
 */
  • Table API - Over Windows - Bounded Over Windows
package com.zenitera.bigdata.flinksql;

import com.zenitera.bigdata.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.*;

/**
 * Table API - Over Windows - Bounded Over Windows
 * The window reaches back a fixed time interval from the current row, e.g. preceding(lit(3).second())
 */
public class Flink05_TableApi_OverWindow02 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> waterSensorStreamSource = env
                .fromElements(
                        new WaterSensor("sensor_1", 1000L, 10),
                        new WaterSensor("sensor_1", 4000L, 40),
                        new WaterSensor("sensor_1", 2000L, 20),
                        new WaterSensor("sensor_2", 3000L, 30),
                        new WaterSensor("sensor_1", 5000L, 50),
                        new WaterSensor("sensor_2", 6000L, 60))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
                );

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(waterSensorStreamSource, $("id"), $("ts").rowtime(), $("vc"));
        table
                .window(Over.partitionBy($("id")).orderBy($("ts")).preceding(lit(3).second()).as("w"))
                .select($("id"), $("ts"), $("vc").sum().over($("w")).as("vc_sum"))
                .execute()
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

/*
+----+--------------------------------+-------------------------+-------------+
| op |                             id |                      ts |      vc_sum |
+----+--------------------------------+-------------------------+-------------+
| +I |                       sensor_1 | 1970-01-01 00:00:01.000 |          10 |
| +I |                       sensor_1 | 1970-01-01 00:00:02.000 |          30 |
| +I |                       sensor_2 | 1970-01-01 00:00:03.000 |          30 |
| +I |                       sensor_1 | 1970-01-01 00:00:04.000 |          70 |
| +I |                       sensor_1 | 1970-01-01 00:00:05.000 |         110 |
| +I |                       sensor_2 | 1970-01-01 00:00:06.000 |          90 |
+----+--------------------------------+-------------------------+-------------+
 */

5-2. Using Windows in Flink SQL

5-2-1. Flink SQL - Group Windows

  • TUMBLE(time_attr, interval)

Defines a tumbling window. A tumbling window assigns rows to non-overlapping, contiguous windows of a fixed duration (interval). For example, a 5-minute tumbling window groups rows into 5-minute intervals. Tumbling windows can be defined on event time (batch, streaming) or processing time (streaming).

  • HOP(time_attr, interval, interval)

Defines a hopping time window (called a sliding window in the Table API). A hopping window has a fixed duration (the second interval parameter) and hops by a slide interval (the first interval parameter). If the slide is smaller than the window size, hopping windows overlap, so rows can be assigned to multiple windows. For example, a sliding window of 15 minutes with a 5-minute slide assigns each row to three different 15-minute windows. Hopping windows can be defined on event time (batch, streaming) or processing time (streaming).

  • SESSION(time_attr, interval)

Defines a session time window. Session windows do not have a fixed duration; their bounds are defined by an interval of inactivity, i.e., a session window closes when no event appears within the defined gap. For example, with a 30-minute gap, a new session window starts when a record is observed after 30 minutes of inactivity (otherwise the row is added to the current window), and the window closes if no new record arrives within the following 30 minutes. Session windows can work with event time (batch, streaming) or processing time (streaming). (A SESSION sketch follows the sliding-window example below.)

Flink SQL - Group Windows - tumbling window example:

package com.zenitera.bigdata.tableapiflinksql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL - Group Windows
 * Tumbling window - TUMBLE(time_attr, interval)
 */
public class Flink06_FlinkSQL_GroupWindow01 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql("create table sensor(" +
                "id string," +
                "ts bigint," +
                "vc int," +
                "t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +
                "watermark for t as t - interval '5' second)" +
                "with(" +
                "'connector' = 'filesystem'," +
                "'path' = 'input/sensor.txt'," +
                "'format' = 'csv'" +
                ")");

        tableEnv.sqlQuery("SELECT " +
                " id, " +
                " TUMBLE_START(t, INTERVAL '1' minute) as wStart, " +
                " TUMBLE_END(t, INTERVAL '1' minute) as wEnd, " +
                " SUM(vc) sum_vc " +
                " from sensor " +
                " GROUP BY TUMBLE(t, INTERVAL '1' minute), id ")
                .execute()
                .print();

    }
}
/*
+----+-------------+-------------------------+-------------------------+-------------+
| op |          id |                  wStart |                    wEnd |      sum_vc |
+----+-------------+-------------------------+-------------------------+-------------+
| +I |    sensor_1 | 1970-01-01 08:00:00.000 | 1970-01-01 08:01:00.000 |         430 |
| +I |    sensor_2 | 1970-01-01 08:00:00.000 | 1970-01-01 08:01:00.000 |         140 |
+----+-------------+-------------------------+-------------------------+-------------+
 */

Flink SQL - Group Windows - sliding window example:

package com.zenitera.bigdata.tableapiflinksql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL - Group Windows
 * Sliding window - HOP(time_attr, interval, interval)
 */
public class Flink06_FlinkSQL_GroupWindow02 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql("create table sensor(" +
                "id string," +
                "ts bigint," +
                "vc int," +
                "t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +
                "watermark for t as t - interval '5' second)" +
                "with(" +
                "'connector' = 'filesystem'," +
                "'path' = 'input/sensor.txt'," +
                "'format' = 'csv'" +
                ")");

        tableEnv.sqlQuery("SELECT " +
                " id, " +
                "  hop_start(t, INTERVAL '10' minute, INTERVAL '1' hour) as WatermarkStart,  " +
                "  hop_end(t, INTERVAL '10' minute, INTERVAL '1' hour) as WatermarkEnd,  " +
                " SUM(vc) vc_sum_value " +
                " from sensor " +
                "GROUP BY hop(t, INTERVAL '10' minute, INTERVAL '1' hour), id")
                .execute()
                .print();

    }
}
/*
+----+--------------+-------------------------+-------------------------+--------------+
| op |           id |          WatermarkStart |            WatermarkEnd | vc_sum_value |
+----+--------------+-------------------------+-------------------------+--------------+
| +I |     sensor_1 | 1970-01-01 07:10:00.000 | 1970-01-01 08:10:00.000 |          430 |
| +I |     sensor_2 | 1970-01-01 07:10:00.000 | 1970-01-01 08:10:00.000 |          140 |
| +I |     sensor_2 | 1970-01-01 07:20:00.000 | 1970-01-01 08:20:00.000 |          140 |
| +I |     sensor_1 | 1970-01-01 07:20:00.000 | 1970-01-01 08:20:00.000 |          430 |
| +I |     sensor_2 | 1970-01-01 07:30:00.000 | 1970-01-01 08:30:00.000 |          140 |
| +I |     sensor_1 | 1970-01-01 07:30:00.000 | 1970-01-01 08:30:00.000 |          430 |
| +I |     sensor_2 | 1970-01-01 07:40:00.000 | 1970-01-01 08:40:00.000 |          140 |
| +I |     sensor_1 | 1970-01-01 07:40:00.000 | 1970-01-01 08:40:00.000 |          430 |
| +I |     sensor_2 | 1970-01-01 07:50:00.000 | 1970-01-01 08:50:00.000 |          140 |
| +I |     sensor_1 | 1970-01-01 07:50:00.000 | 1970-01-01 08:50:00.000 |          430 |
| +I |     sensor_1 | 1970-01-01 08:00:00.000 | 1970-01-01 09:00:00.000 |          430 |
| +I |     sensor_2 | 1970-01-01 08:00:00.000 | 1970-01-01 09:00:00.000 |          140 |
+----+--------------+-------------------------+-------------------------+--------------+
 */
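
A session-window query follows the same pattern. A hedged sketch, reusing the sensor table DDL from the examples above (the 30-second gap is illustrative):

// Hypothetical sketch: SESSION(time_attr, interval) over the same sensor table
tableEnv.sqlQuery("SELECT " +
        " id, " +
        " SESSION_START(t, INTERVAL '30' second) as wStart, " +
        " SESSION_END(t, INTERVAL '30' second) as wEnd, " +
        " SUM(vc) sum_vc " +
        " from sensor " +
        " GROUP BY SESSION(t, INTERVAL '30' second), id")
        .execute()
        .print();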

5-2-2. Flink SQL - Over Windows

Variant 1 - an inline OVER clause applied directly in the SELECT list:

sum(vc) over(partition by id order by t rows between 1 PRECEDING and current row)

package com.zenitera.bigdata.tableapiflinksql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink07_FlinkSQL_OverWindow01 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql("create table sensor(" +
                "id string," +
                "ts bigint," +
                "vc int," +
                "t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +
                "watermark for t as t - interval '5' second)" +
                "with(" +
                "'connector' = 'filesystem'," +
                "'path' = 'input/sensor.txt'," +
                "'format' = 'csv'" +
                ")");

        tableEnv.sqlQuery("select " +
                "id," +
                "vc," +
                "sum(vc) over(partition by id order by t rows between 1 PRECEDING and current row) vc_sum_value " +
                "from sensor")
                .execute()
                .print();

    }
}
/*
input/sensor.txt
sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60
#####################################################
+----+---------------+-------------+--------------+
| op |            id |          vc | vc_sum_value |
+----+---------------+-------------+--------------+
| +I |      sensor_1 |          10 |           10 |
| +I |      sensor_1 |          20 |           30 |
| +I |      sensor_1 |         400 |          420 |
| +I |      sensor_2 |          30 |           30 |
| +I |      sensor_2 |          50 |           80 |
| +I |      sensor_2 |          60 |          110 |
+----+---------------+-------------+--------------+
 */

Variant 2 - a named window, defined once in a WINDOW clause and reused by several aggregates:

window w as (partition by id order by t rows between 1 PRECEDING and current row)

package com.zenitera.bigdata.tableapiflinksql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink07_FlinkSQL_OverWindow02 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql("create table sensor(" +
                "id string," +
                "ts bigint," +
                "vc int," +
                "t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +
                "watermark for t as t - interval '5' second)" +
                "with(" +
                "'connector' = 'filesystem'," +
                "'path' = 'input/sensor.txt'," +
                "'format' = 'csv'" +
                ")");

        tableEnv.sqlQuery("select " +
                "id," +
                "vc," +
                "count(vc) over w vc_count_value, " +
                "sum(vc) over w vc_sum_value " +
                "from sensor " +
                "window w as (partition by id order by t rows between 1 PRECEDING and current row)")
                .execute()
                .print();

    }
}
/*
input/sensor.txt
sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60
#####################################################
+----+---------------+-------------+----------------------+--------------+
| op |            id |          vc |       vc_count_value | vc_sum_value |
+----+---------------+-------------+----------------------+--------------+
| +I |      sensor_1 |          10 |                    1 |           10 |
| +I |      sensor_1 |          20 |                    2 |           30 |
| +I |      sensor_1 |         400 |                    2 |          420 |
| +I |      sensor_2 |          30 |                    1 |           30 |
| +I |      sensor_2 |          50 |                    2 |           80 |
| +I |      sensor_2 |          60 |                    2 |          110 |
+----+---------------+-------------+----------------------+--------------+
 */
