大数据之Flink(五)

news2024/9/17 2:17:48

15、Flink SQL

15.1、sql-client准备
  1. 启用Hadoop集群(在Hadoop100上)

    start-all.sh
    
  2. 启用yarn-session模式

    /export/soft/flink-1.13.0/bin/yarn-session.sh -d
    
  3. 启动sql-client

     bin/sql-client.sh embedded -s yarn-session
    

在这里插入图片描述
sql文件初始化

可以初始化模式、环境(流/批)、并行度、ttl、数据库

  1. 创建文件,可在文件中编写sql语句完成建表初始化

    vim conf/sql-client-init.sql
    
  2. 启动

    bin/sql-client.sh embedded -s yarn-session -i conf/sql-client-init.sql 
    
15.2、流处理的表
普通MYSQL流处理SQL
处理的数据对象有界集合无线序列
查询访问可以查询完整的数据无法访问到所有数据,持续等待输入
查询终止条件生成固定大小结果即终止永不停止,不断更新查询结果
15.2.1、动态表和持续查询
  1. 动态表

    当流中有数据来,初始的表会插入一行;基于这个表的查询应更新查询结果。这样得到的表会动态变化,即动态表

  2. 持续查询

    对动态表的查询永不停止
    在这里插入图片描述

15.2.2、流转动态表

每来一条数据向表中插入一条数据
在这里插入图片描述

15.2.3、SQL持续查询
  1. 更新查询

    随着数据不断到来,查询的结果需要不断更新,更新查询得到的结果表如要转成DataStream,必须用toChangelogStream()方法

  2. 追加查询

    开窗后的查询结果不会再变,只会随着窗口推移不断追加

15.2.4、动态表转流

动态表转流需要对更改操作编码,tableAPI和SQL支持三种编码方式:

  1. 仅追加流

    流中的发出的数据就是动态表新增的每一行,多在开窗条件下

  2. 撤回流

    撤回流调用toChangelogStream(),包含添加消息和撤回消息

    insert为add消息,delete为retract消息,update为被更改行的retract消息和更新后行的add消息,输出结果会膨胀

    在这里插入图片描述

  3. 更新插入流

    包含两种类型消息:更新插入消息和删除消息

    insert和update统一编码为upsert

    在这里插入图片描述

动态表转流只支持仅追加流和撤回流,连接外部系统才支持更新插入流

4、查询限制

在实际应用中,有些持续查询会因为计算代价太高而受到限制。所谓的“代价太高”,可能是由于需要维护的状态持续增长,也可能是由于更新数据的计算太复杂。

  • 状态大小

    用持续查询做流处理,往往会运行至少几周到几个月;所以持续查询处理的数据总量可能非常大。例如我们之前举的更新查询的例子,需要记录每个用户访问url 的次数。如果随着时间的推移用户数越来越大,那么要维护的状态也将逐渐增长,最终可能会耗尽存储空间导致查询失败

  • 更新计算

    对于有些查询来说,更新计算的复杂度可能很高。每来一条新的数据,更新结果的时候可能需要全部重新计算,并且对很多已经输出的行进行更新。一个典型的例子就是 RANK()函数, 它会基于一组数据计算当前值的排名。例如下面的 SQL 查询,会根据用户最后一次点击的时间为每个用户计算一个排名。当我们收到一个新的数据,用户的最后一次点击时间(lastAction) 就会更新,进而所有用户必须重新排序计算一个新的排名。当一个用户的排名发生改变时,被他超过的那些用户的排名也会改变;这样的更新操作无疑代价巨大,而且还会随着用户的增多越来越严重

15.3、DDL数据定义
15.3.1、数据库
  1. 建库

    create database db_flink;
    
  2. 查询

    show databases;
    
  3. 切换数据库

    use mydatabase;
    
15.3.2、表
  1. 建表

    使用kafka的元数据建表

    create table MyTable(
    	'user_id' string,
        'name' string,
        'record time' timestamp_ltz(3) metadata from 'timestamp'
    ) with (
    	'connector'='kafka'
    );
    

    其他现用现查

  2. 示例

    查看数据库

     show databases;
    

    切换数据库

    use mydatabase;
    

    建表

    create table test(id int,ts bigint,vc int) with ('connnector'='print');
    

    查看表

    show tables;
    

    使用like建表

     create table test1(name string) like test;
    

    查看表信息

    desc test1;
    
15.4、TableAPI
15.41、简单测试

引入依赖

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>

创建测试类

package table;

import java.sql.Timestamp;

/**
 * @Title: Event
 * @Author lizhe
 * @Package table
 * @Date 2024/6/17 19:01
 * @description:
 */
public class Event {
    public String user; public String url; public Long timestamp;

    public Event() {
    }

    public Event(String user, String url, Long timestamp) { this.user = user;
        this.url = url; this.timestamp = timestamp;
    }

    @Override
    public String toString() { return "Event{" +
            "user='" + user + '\'' +
            ", url='" + url + '\'' +
            ", timestamp=" + new Timestamp(timestamp) + '}';
    }
}

模拟数据生成

package table;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.awt.*;
import java.sql.Timestamp;
import java.util.Calendar;
import java.util.Random;

/**
 * @Title: ClickSource
 * @Author lizhe
 * @Package table
 * @Date 2024/6/17 13:50
 * @description:
 */
public class ClickSource implements SourceFunction<Event> {
    private Boolean running = true;
    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        Random random = new Random();	// 在指定的数据集中随机选取数据
        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        String[] urls =	{"./home",	"./cart",	"./fav",	"./prod?id=1","./prod?id=2"};
        while (running) {
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            long timestamp = Calendar.getInstance().getTimeInMillis();
            ctx.collect(new Event(
                    user,
                    url,
                    timestamp
        ));
        // 隔 1 秒生成一个点击事件,方便观测
            Thread.sleep(1000);
        }
    }




    @Override
    public void cancel() {
        running = false;

    }
}

测试用例

package table;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.Table;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;


/**
 * @Title: SimpleTableDemo
 * @Author lizhe
 * @Package table
 * @Date 2024/6/17 19:38
 * @description:
 */
public class SimpleTableDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


        SingleOutputStreamOperator<Event> eventDS = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );
        Table eventTable = tEnv.fromDataStream(eventDS);
        //使用TableAPI
        Table result1 = eventTable.select($("user"))
                .where($("user").isEqual("Alice"));
        //使用SQL
        Table result2 = tEnv.sqlQuery("select user,url from " + eventTable);
        tEnv.toDataStream(result1).print("result1");
        tEnv.toDataStream(result2).print("result2");
        env.execute();


    }
}



15.4.2、创建环境
  1. 创建表环境
  2. 创建输入表,连接外部系统读取数据
  3. 注册一个表,连接到外部系统用于输出
  4. 执行SQL对表进行查询转换得到新表(或者使用TableAPI对表进行查询转换得到新表)。
  5. 将结果写入输出表

TableEnvironment 是 Table API 和 SQL 的核心概念。它负责:

  • 在内部的 catalog 中注册 Table
  • 注册外部的 catalog
  • 加载可插拔模块
  • 执行 SQL 查询
  • 注册自定义函数 (scalar、table 或 aggregation)
  • DataStreamTable 之间的转换(面向 StreamTableEnvironment )

Table 总是与特定的 TableEnvironment 绑定。 TableEnvironment 可以通过静态方法 TableEnvironment.create() 创建。

package table;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Title: CommonAPI
 * @Author lizhe
 * @Package table
 * @Date 2024/6/19 21:20
 * @description:
 */
public class CommonAPITest {
    public static void main(String[] args) throws Exception {
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(1);
//        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //1、定义环境配置来创建表执行环境
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .useBlinkPlanner()
                .build();
        TableEnvironment tableEnvironment = TableEnvironment.create(environmentSettings);
    }
}

15.4.3、创建表

创建表方式:通过连接器创建和虚拟表创建

连接器表:通过连接器连接到外部系统,并定义出对应的表结构。

虚拟表:在 SQL 的术语中,Table API 的对象对应于视图(虚拟表)。 从传统数据库系统的角度来看,Table 对象与 VIEW 视图非常像。

如果多个查询都引用了同一个注册了的Table,那么它会被内嵌每个查询中并被执行多次, 也就是说注册了的Table的结果不会被共享。为了方便地查询表,表环境中会维护一个目录(Catalog)和表的对应关系。所以表都是通过目录(Catalog)来进行注册创建的。表在环境中有一个唯一的ID,由三部分组成:目录(catalog)名.数据库(database)名.表名。

测试数据input/clicks.txt

Mary, ./home,1000
Bob, ./cart,2000
Alice, ./prod?id=100,3000
Bob, ./home,3214
Bob, ./cart,2000
Bob, ./home,321
Bob, ./cart,532
Bob, ./home,2000
Bob, ./cart,43356
Bob, ./home,2000
Bob, ./cart,76533

代码

package table;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Title: CommonAPI
 * @Author lizhe
 * @Package table
 * @Date 2024/6/19 21:20
 * @description:
 */
public class CommonAPITest {
    public static void main(String[] args) throws Exception {
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(1);
//        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //1、定义环境配置来创建表执行环境
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .useBlinkPlanner()
                .build();
        TableEnvironment tableEnvironment = TableEnvironment.create(environmentSettings);
        //创建表
        String createDDl="create table clickTable(" +
                "user_name STRING," +
                "url STRING," +
                "ts BIGINT " +
                ")with (" +
                "'connector'='filesystem'," +
                "'path'='input/clicks.txt'," +
                "'format'='csv')";
        tableEnvironment.executeSql(createDDl);
        //调用TableAPI进行表的查询转换
        Table clickTable = tableEnvironment.from("clickTable");
        Table resultTable = clickTable.where($("user_name").isEqual("Bob"))
                .select($("user_name"), $("url"));
        tableEnvironment.createTemporaryView("result2",resultTable);
        //执行sql进行表的查询转换
        Table resultTable2 = tableEnvironment.sqlQuery("select user_name,url from result2");
        //创建一张用于输出的表
        String createOutDDl="create table clickOutTable(" +
                "user_name STRING," +
                "url STRING" +
//                "ts BIGINT " +
                ")with (" +
                "'connector'='filesystem'," +
                "'path'='output'," +
                "'format'='csv')";
        tableEnvironment.executeSql(createOutDDl);
        //输出表
        resultTable.executeInsert("clickOutTable");

    }
}

输出output
在这里插入图片描述
单个文件内容

Bob," ./home"
Bob," ./cart"

使用控制台输出

package table;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Title: CommonAPI
 * @Author lizhe
 * @Package table
 * @Date 2024/6/19 21:20
 * @description:
 */
public class CommonAPITest {
    public static void main(String[] args) throws Exception {
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(1);
//        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //1、定义环境配置来创建表执行环境
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .useBlinkPlanner()
                .build();
        TableEnvironment tableEnvironment = TableEnvironment.create(environmentSettings);
        //创建表
        String createDDl="create table clickTable(" +
                "user_name STRING," +
                "url STRING," +
                "ts BIGINT " +
                ")with (" +
                "'connector'='filesystem'," +
                "'path'='input/clicks.txt'," +
                "'format'='csv')";
        tableEnvironment.executeSql(createDDl);
        //调用TableAPI进行表的查询转换
        Table clickTable = tableEnvironment.from("clickTable");
        Table resultTable = clickTable.where($("user_name").isEqual("Bob"))
                .select($("user_name"), $("url"));
        tableEnvironment.createTemporaryView("result2",resultTable);
        //执行sql进行表的查询转换
        Table resultTable2 = tableEnvironment.sqlQuery("select user_name,url from result2");

       
        //创建一张用于输出的表
        String createOutDDl="create table clickOutTable(" +
                "user_name STRING," +
                "url STRING" +
//                "ts BIGINT " +
                ")with (" +
                "'connector'='filesystem'," +
                "'path'='output'," +
                "'format'='csv')";
        tableEnvironment.executeSql(createOutDDl);

        //创建一张用于控制台打印的输出表
        String createPrintOutDDl="create table printOutTable(" +
                "user_name STRING," +
                "url STRING" +
//                "ts BIGINT " +
                ")with (" +
                "'connector'='print')";
        tableEnvironment.executeSql(createPrintOutDDl);

        //输出表
//        resultTable.executeInsert("clickOutTable");
        resultTable2.executeInsert("printOutTable");

    }
}

聚合函数

package table;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Title: CommonAPI
 * @Author lizhe
 * @Package table
 * @Date 2024/6/19 21:20
 * @description:
 */
public class CommonAPITest {
    public static void main(String[] args) throws Exception {
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(1);
//        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //1、定义环境配置来创建表执行环境
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .useBlinkPlanner()
                .build();
        TableEnvironment tableEnvironment = TableEnvironment.create(environmentSettings);
        //创建表
        String createDDl="create table clickTable(" +
                "user_name STRING," +
                "url STRING," +
                "ts BIGINT " +
                ")with (" +
                "'connector'='filesystem'," +
                "'path'='input/clicks.txt'," +
                "'format'='csv')";
        tableEnvironment.executeSql(createDDl);
        //调用TableAPI进行表的查询转换
        Table clickTable = tableEnvironment.from("clickTable");
        Table resultTable = clickTable.where($("user_name").isEqual("Bob"))
                .select($("user_name"), $("url"));
        tableEnvironment.createTemporaryView("result2",resultTable);
        //执行sql进行表的查询转换
        Table resultTable2 = tableEnvironment.sqlQuery("select user_name,url from result2");

        //执行聚合计算的查询转换
        Table aggRes = tableEnvironment.sqlQuery("select user_name ,count(url) as cnt from clickTable group by user_name");

        //创建一张用于输出的表
        String createOutDDl="create table clickOutTable(" +
                "user_name STRING," +
                "url STRING" +
//                "ts BIGINT " +
                ")with (" +
                "'connector'='filesystem'," +
                "'path'='output'," +
                "'format'='csv')";
        tableEnvironment.executeSql(createOutDDl);

        //创建一张用于控制台打印的输出表
        String createPrintOutDDl="create table printOutTable(" +
                "user_name STRING," +
                "cnt BIGINT" +
//                "ts BIGINT " +
                ")with (" +
                "'connector'='print')";
        tableEnvironment.executeSql(createPrintOutDDl);

        //输出表
//        resultTable.executeInsert("clickOutTable");
//        resultTable2.executeInsert("printOutTable");
        aggRes.executeInsert("printOutTable");

    }
}

表和流的转换

1、表转流

toDataStream()针对只插入数据的流

tEnv.toDataStream(result1).print("result1");

toChangelogStream()针对有更新操作的流,可以替代toDataStream()方法

package table;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.Table;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;


/**
 * @Title: SimpleTableDemo
 * @Author lizhe
 * @Package table
 * @Date 2024/6/17 19:38
 * @description:
 */
public class SimpleTableDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


        SingleOutputStreamOperator<Event> eventDS = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );
        Table eventTable = tEnv.fromDataStream(eventDS);
        //使用TableAPI
        Table result1 = eventTable.select($("user"))
                .where($("user").isEqual("Alice"));
        //使用SQL
        Table result2 = tEnv.sqlQuery("select user,url from " + eventTable);
        tEnv.toDataStream(result1).print("result1");
        tEnv.toDataStream(result2).print("result2");

        //聚合转换
        tEnv.createTemporaryView("clickTable",eventTable);
        Table aggRes = tEnv.sqlQuery("select user ,count(url) as cnt from clickTable group by user");
        tEnv.toChangelogStream(aggRes).print("aggRes");
        env.execute();



    }
}


2、流转表

调用 fromDataStream()方法

// 读取数据源
SingleOutputStreamOperator<Event> eventStream = env.addSource(...)

// 将数据流转换成表,可以提取流中某些字段
Table eventTable = tableEnv.fromDataStream(eventStream, $("timestamp").as("ts"),
$("url")
);

调用createTemporaryView()方法

调用 fromDataStream()方法简单直观,可以直接实现DataStream 到 Table 的转换;不过如果我们希望直接在 SQL 中引用这张表,就还需要调用表环境的 createTemporaryView()方法来创建虚拟视图了。

tableEnv.createTemporaryView("EventTable", eventStream,$("timestamp").as("ts"),$("url"));

调用 fromChangelogStream ()方法,可以将一个更新日志流转换成表

DataStream<Row> dataStream = env.fromElements(
Row.ofKind(RowKind.INSERT, "Alice", 12),
Row.ofKind(RowKind.INSERT, "Bob", 5),
Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));

// 将更新日志流转换为表
Table table = tableEnv.fromChangelogStream(dataStream);
15.5、时间属性

基于时间的操作(如开窗)要定义相关的时间语义和时间数据来源的信息。在tableAPI和SQL中会给表单独提供一个逻辑上的时间字段专用指示时间。

时间属性可以在建表时指定也可流转表时定义,时间属性的数据类型为timestamp

15.5.1、事件时间

通过WaterMark来定义事件时间属性

create table eventTable(
	user string,
    url string,
    ts timestamp(3),
    watermark for ts as ts -interval '5' second
) with (...);

上面的语句将ts字段定义为事件时间属性,并基于ts设置了5s的水位延迟

package table;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Title: TimeAndWindowTest
 * @Author lizhe
 * @Package table
 * @Date 2024/6/20 21:21
 * @description:
 */
public class TimeAndWindowTest {
    public static void main(String[] args)  throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        //在建表的DDL中直接定义时间属性
        String createDDl="create table clickTable(" +
                "user_name STRING," +
                "url STRING," +
                "ts BIGINT ," +
                "et AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000)),"+
                "WATERMARK FOR et AS et - INTERVAL '1' SECOND"+
                ")with (" +
                "'connector'='filesystem'," +
                "'path'='input/clicks.txt'," +
                "'format'='csv')";
        tableEnvironment.executeSql(createDDl);
        //在流转换成table时定义时间属性
        SingleOutputStreamOperator<Event> clickStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));
        Table clickTable = tableEnvironment.fromDataStream(clickStream, $("user"), $("url"), $("timestamp").as("ts"), $("et").rowtime());
        clickTable.printSchema();

    }
}

时间戳必须是timestamp类型,类型转换

ts BIGINT,
time_ltz as to_timestamp_ltz(ts,3),
15.5.2、处理时间

处理时间属性的定义也有两种方式:创建表 DDL 中定义,或者在数据流转换成表时定义。

1、在创建表的DDL 中定义

在创建表的 DDL(CREATE TABLE 语句)中,可以增加一个额外的字段,通过调用系统内置的 PROCTIME()函数来指定当前的处理时间属性,返回的类型是TIMESTAMP_LTZ。

create table eventTable(
	user string,
    url string,
    ts as proctime()
) with (...);

可以用一个 AS 语句来在表中产生数据中不存在的列, 并且可以利用原有的列、各种运算符及内置函数。在前面事件时间属性的定义中,将 ts 字段转换成 TIMESTAMP_LTZ 类型的 ts_ltz,也是计算列的定义方式

2、在数据流转换为表时定义

处理时间属性同样可以在将 DataStream 转换为表的时候来定义。 我们调用fromDataStream()方法创建表时,可以用.proctime()后缀来指定处理时间属性字段。由于处理时间是系统时间,原始数据中并没有这个字段,所以处理时间属性一定不能定义在一个已有字段上,只能定义在表结构所有字段的最后,作为额外的逻辑字段出现。

Table table = tEnv.fromDataStream(stream, $("user"), $("url"),$("ts").proctime());
15.5.3、窗口

以滚动窗口为例:

这里的 ts 是定义好的时间属性字段,窗口大小用“时间间隔”INTERVAL 来定义。在进行窗口计算时,分组窗口是将窗口本身当作一个字段对数据进行分组的,可以对组内的数据进行聚合。基本使用方式如下

Table result = tableEnv.sqlQuery(
"SELECT " +
"user, " +
"TUMBLE_END(ts, INTERVAL '1' HOUR) as endT, " +
"COUNT(url) AS cnt " + "FROM EventTable " +
"GROUP BY " +	// 使用窗口和用户名进行分组
"user, " +
"TUMBLE(ts, INTERVAL '1' HOUR)" // 定义 1 小时滚动窗口
);

分组窗口的功能比较有限,只支持窗口聚合,所以目前已经处于弃用(deprecated)的状态。

1.13 版本开始,Flink 开始使用窗口表值函数(Windowing table-valued functions, Windowing TVFs)来定义窗口。直接调用 TUMBLE()、HOP()、CUMULATE()就可以实现滚动、滑动和累积窗口,不过传入的参数会有所不同。

  • 滚动窗口:TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL ‘1’ HOUR)。这里基于时间字段 ts,对表 EventTable 中的数据开了大小为 1 小时的滚动窗口。窗口会将表中的每一行数据,按照它们 ts 的值分配到一个指定的窗口中。
  • 滑动窗口:HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL ‘5’ MINUTES, INTERVAL ‘1’ HOURS));这里我们基于时间属性 ts,在表 EventTable 上创建了大小为 1 小时的滑动窗口,每 5 分钟滑动一次。需要注意的是,紧跟在时间属性字段后面的第三个参数是步长(slide),第四个参数才是窗口大小(size)。
  • 累积窗口:CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL ‘1’ HOURS, INTERVAL ‘1’ DAYS));累积窗口中有两个核心的参数:最大窗口长度(max window size)和累积步长(step)
15.5.4、聚合查询
15.5.4.1、分组聚合
Table aggTable = tableEnvironment.sqlQuery("select user_name ,count(url) from clickTable group by user_name");
15.5.4.2、时间窗口聚合

滚动窗口

        Table tumbleWindowResult = tableEnvironment.sqlQuery(
                "SELECT " +
                        "user_name, " +
                        "window_end AS endT, " + "COUNT(url) AS cnt " +
                        "FROM TABLE( " +
                        "TUMBLE( TABLE clickTable, " + "DESCRIPTOR(et), "  + "INTERVAL '10' SECOND)) " +
                        "GROUP BY user_name, window_start, window_end "
        );

滑动窗口

 Table hopWindowResult = tableEnvironment.sqlQuery(
                "SELECT " +
                        "user_name, " +
                        "window_end AS endT, " + "COUNT(url) AS cnt " +
                        "FROM TABLE( " +
                        "HOP( TABLE clickTable, " + "DESCRIPTOR(et),INTERVAL '5' SECOND ,"  + "INTERVAL '10' SECOND)) " +
                        "GROUP BY user_name, window_start, window_end "
        );

累积窗口

        Table cumulateWindowResult = tableEnvironment.sqlQuery(
                "SELECT " +
                        "user_name, " +
                        "window_end AS endT, " + "COUNT(url) AS cnt " +
                        "FROM TABLE( " +
                        "CUMULATE( TABLE clickTable, " + "DESCRIPTOR(et),INTERVAL '5' SECOND ,"  + "INTERVAL '10' SECOND)) " +
                        "GROUP BY user_name, window_start, window_end "
        );
15.5.4.3、开窗聚合

可根据行数进行开窗,开窗选择的范围可以基于时间,也可以基于数据的数量,以每一行数据为基准,计算它之前 1 小时内所有数据的平均值;也可以计算它之前 10 个数的平均值。开窗函数的聚合与之前两种聚合有本质的不同:分组聚合、窗口 TVF聚合都是“多对一”的关系,将数据分组之后每组只会得到一个聚合结果;而开窗函数是对每行都要做一次开窗聚合,因此聚合之后表中的行数不会有任何减少,是一个“多对多”的关系。

SELECT
<聚合函数> OVER (
[PARTITION BY <字段 1>[, <字段 2>, ...]]
ORDER BY <时间属性字段>
<开窗范围>),
...
FROM ...

PARTITION BY(可选)用来指定分区的键(key),类似于 GROUP BY 的分组

开窗范围:还有一个必须要指定的就是开窗的范围,也就是到底要扩展多少行来做聚合。这个范围是由BETWEEN <下界> AND <上界> 来定义的,也就是“从下界到上界” 的范围。目前支持的上界只能是 CURRENT ROW

BETWEEN ... PRECEDING AND CURRENT ROW
  • 范围间隔

    范围间隔以RANGE 为前缀,就是基于ORDER BY 指定的时间字段去选取一个范围,一般就是当前行时间戳之前的一段时间。当前行之前 1 小时的数据:

    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
    
  • 行间隔

    行间隔以 ROWS 为前缀,就是直接确定要选多少行,由当前行出发向前选取

    开窗范围选择当前行之前的 5 行数据(含当前行):

    ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
    
        Table overWindowResult = tableEnvironment.sqlQuery("SELECT user_name,avg(ts) OVER( partition BY user_name ORDER BY et ROWS BETWEEN 3 PRECEDING AND CURRENT ROW )AS avg_ts FROM clickTable" );

整体代码

package table;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Title: TimeAndWindowTest
 * @Author lizhe
 * @Package table
 * @Date 2024/6/20 21:21
 * @description:
 */
public class TimeAndWindowTest {
    public static void main(String[] args)  throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        //在建表的DDL中直接定义时间属性
        String createDDl="create table clickTable (" +
                "user_name STRING," +
                "url STRING," +
                "ts BIGINT ," +
                "et AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000)),"+
                "WATERMARK FOR et AS et - INTERVAL '1' SECOND"+
                ")with (" +
                "'connector'='filesystem'," +
                "'path'='input/clicks.txt'," +
                "'format'='csv')";
        tableEnvironment.executeSql(createDDl);
        //在流转换成table时定义时间属性
        SingleOutputStreamOperator<Event> clickStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));
        Table clickTable = tableEnvironment.fromDataStream(clickStream, $("user"), $("url"), $("timestamp").as("ts"), $("et").rowtime());
        //聚合查询
        Table aggTable = tableEnvironment.sqlQuery("select user_name ,count(url) from clickTable group by user_name");
        //窗口聚合
        Table tumbleWindowResult = tableEnvironment.sqlQuery(
                "SELECT " +
                        "user_name, " +
                        "window_end AS endT, " + "COUNT(url) AS cnt " +
                        "FROM TABLE( " +
                        "TUMBLE( TABLE clickTable, " + "DESCRIPTOR(et), "  + "INTERVAL '10' SECOND)) " +
                        "GROUP BY user_name, window_start, window_end "
        );
        //滑动窗口
        Table hopWindowResult = tableEnvironment.sqlQuery(
                "SELECT " +
                        "user_name, " +
                        "window_end AS endT, " + "COUNT(url) AS cnt " +
                        "FROM TABLE( " +
                        "HOP( TABLE clickTable, " + "DESCRIPTOR(et),INTERVAL '5' SECOND ,"  + "INTERVAL '10' SECOND)) " +
                        "GROUP BY user_name, window_start, window_end "
        );
        //累积窗口
        Table cumulateWindowResult = tableEnvironment.sqlQuery(
                "SELECT " +
                        "user_name, " +
                        "window_end AS endT, " + "COUNT(url) AS cnt " +
                        "FROM TABLE( " +
                        "CUMULATE( TABLE clickTable, " + "DESCRIPTOR(et),INTERVAL '5' SECOND ,"  + "INTERVAL '10' SECOND)) " +
                        "GROUP BY user_name, window_start, window_end "
        );
        //开窗聚合
        Table overWindowResult = tableEnvironment.sqlQuery("SELECT user_name,avg(ts) OVER( partition BY user_name ORDER BY et ROWS BETWEEN 3 PRECEDING AND CURRENT ROW )AS avg_ts FROM clickTable" );


        clickTable.printSchema();
        tableEnvironment.toChangelogStream(aggTable).print("aggTable");
        tableEnvironment.toChangelogStream(tumbleWindowResult).print("tumbleWindowResult");
        tableEnvironment.toChangelogStream(hopWindowResult).print("hopWindowResult");
        tableEnvironment.toChangelogStream(cumulateWindowResult).print("cumulateWindowResult");
        tableEnvironment.toChangelogStream(overWindowResult).print("overWindowResult");

        env.execute();

    }
}

15.5.5、联结查询

待续

15.5.6、函数

待续

15.6、连接外部系统

官网链接:https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/overview/

16、容错机制

16.1、检查点

数据流

在这里插入图片描述

保存点为红色的hello

将之前某个时间点所有的状态保存下来,这个存档就是检查点
在这里插入图片描述
遇到故障可从检查点恢复,从而不用再从头开始统计

16.1.1、检查点保存
  1. 周期性保存
  2. 保存的时间点:所有任务都恰好处理完一个相同数据之后保存状态,从而实现一个数据被完整处理。
  3. 保存的流程:关键是要等所有任务将同一个数据处理完毕
16.1.2、从检查点恢复

出现故障
在这里插入图片描述
恢复步骤:

  1. 重启应用:所有任务状态都会清空

    在这里插入图片描述

  2. 读取检查点,重置状态:找到检查点,恢复快照并填充到对应状态

    在这里插入图片描述

  3. 重置偏移量:从检查点之后开始处理数据,要更改偏移量

    在这里插入图片描述

  4. 继续处理数据

    在这里插入图片描述

16.1.3、检查点算法
  1. 检查点分界线Barrier
  2. 分布式快照算法(Barrier精准一次)
  3. 分布式快照算法(Barrier至少一次)
  4. 分布式快照算法(非Barrier精准一次)

总结:

  • Barrier对齐:一个Task收到所有上游同一个编号的Barrier后,才会对自己的本地状态做备份
    • 精准一次:对齐过程中,Barrier后的数据阻塞等待,不会越过Barrier
    • 至少一次:对齐过程中,先到的Barrier其后的数据不阻塞,接着计算
  • 非Barrier对齐:一个Task收到第一个Barrier时开始执行备份,能保证精准一次
    • 先到的Barrier将本地状态备份,后面的数据接着计算输出
    • 未到的Barrier其前面的数据接着计算输出,同时也保存到备份中
    • 最后一个Barrier到达该Task时,这个Task的备份结束

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2116369.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

BUUCTF(34)特殊的 BASE64

使用pycharm时&#xff0c;如果想把代码撤销到之前的状态可以用 Ctrlz 如果不小心撤销多了&#xff0c;可以用 CtrlShiftZ 还原&#xff0c; 别傻傻的重新敲了 BUUCTF在线评测 (buuoj.cn) 查看字符串&#xff0c;想到base64的变表 这里用的c的标准程序库中的string&#xff0…

JS_循环结构

目录标题 while结构for循环foreach循环 while结构 几乎和JAVA一致 代码 <script> /* 打印99 乘法表 */ var i 1; while(i < 9){ var j 1; while(j < i){ document.write(j"*"i""i*j" "); j; } document.write("<hr/…

《机器学习》数据预处理 删除、替换、填充 案例解析及实现

目录 一、了解数据清洗 1、什么是数据清洗 2、数据清洗步骤 1&#xff09;缺失值处理 2&#xff09;异常值处理 3&#xff09;重复值处理 4&#xff09;格式修正 5&#xff09;数据一致性检查 6&#xff09;数据类型转换 二、数据清洗用法 1、有如下文件内容 2、完…

前向渲染路径

1、前向渲染路径处理光照的方式 前向渲染路径中会将光源分为以下3种处理方式&#xff1a; 逐像素处理&#xff08;需要高等质量处理的光&#xff09;逐顶点处理&#xff08;需要中等质量处理的光&#xff09;球谐函数&#xff08;SH&#xff09;处理&#xff08;需要低等质量…

线程的四种操作

所属专栏&#xff1a;Java学习 1. 线程的开启 start和run的区别&#xff1a; run&#xff1a;描述了线程要执行的任务&#xff0c;也可以称为线程的入口 start&#xff1a;调用系统函数&#xff0c;真正的在系统内核中创建线程&#xff08;创建PCB&#xff0c;加入到链…

C++17: 用折叠表达式实现一个IsAllTrue函数

前言 让我们实现一个 IsAllTrue 函数&#xff0c;支持变长参数&#xff0c;可传入多个表达式&#xff0c;必须全部计算为true&#xff0c;该函数才返回true。 本文记录了逐步实现与优化该函数的思维链&#xff0c;用到了以下现代C新特性知识&#xff0c;适合对C进阶知识有一定…

2025最新:如何打造公司财务管理系统?Java SpringBoot实现,一步到位管理企业财务!

✍✍计算机毕业编程指导师** ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java…

STM32F407VET6开发板RT-Thread的移植适配

前言 最近购买了一块 STM32F407VET6开发板【魔女】&#xff0c;http://www.stm32er.com/ 通过原理图了解到&#xff0c;开发板板载 CMSIS-DAP 调试下载口&#xff0c;例程部分大部分以裸机程序为主 目标&#xff1a;打算移植适配到 RT-Thread 适配 RT-Thread RT-Thread 支持…

基于准静态自适应环型缓存器(QSARC)的taskBus万兆吞吐实现

文章目录 概要整体架构流程技术名词解释技术细节1. 数据结构2. 自适应计算队列大小3. 生产者拼接缓存4. 高效地通知消费者 小结1. 性能表现情况2. 主要改进3. 源码和发行版 概要 准静态自适应环形缓存器&#xff08;Quasi-Static Adaptive Ring Cache&#xff09;是taskBus用于…

【Python报错已解决】 No Python at ‘C:\Users...\Python Python39\python.exe’

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 文章目录 前言一、问题描述1.1 报错示例1.2 报错分析1.3 解决思路 二、解决方法2.1 方法一&#xff1a;检查Python安装路径2.2 …

苍穹外卖随记(一)

黑马苍穹外卖逻辑和细节的问题和解决 1.后端查询到员工的日期信息&#xff0c;将信息进行json化传给前端时发生&#xff1a;前端收到的是不标准的日期json串。 解决&#xff1a;1.注解进行json格式化&#xff08;JsonFormat&#xff09;2. 在spring MVC中&#xff0c;通过消息…

如何验证VMWare WorkStation的安装?

如何验证VMWare WorkStation的安装&#xff1f; 右击"网络"&#xff0c;点击 打开"网络和Internet设置"&#xff0c;点击更改适配器选项&#xff0c;如果出现VMNet1和VMNet8&#xff0c;则说明安装成功。

内网穿透(cpolar实现)

目录 一、介绍 二、内网穿透工具cpolar实现 1.下载与安装 2.cpolar指定authtoken 3.获取临时域名 4.验证临时域名有效性 一、介绍 内网穿透&#xff0c;即 NAT 穿透&#xff08;Network Address Translation Traversal&#xff09;&#xff0c;是一种网络技术&#xff0…

Python爬虫使用实例-wallpaper

1/ 排雷避坑 &#x1f95d; 中文乱码问题 print(requests.get(urlurl,headersheaders).text)出现中文乱码 原因分析&#xff1a; <meta charset"gbk" />解决方法&#xff1a; 法一&#xff1a; response requests.get(urlurl,headersheaders) response.en…

java基础-IO(4)管道流 PipedOutputStream、PipedInputStream、PipedReader、PipedWriter

管道 提到管道&#xff0c;第一印象就是水管或者地下管道&#xff0c;一节接着一节&#xff0c;形如下图。 管道流 "流"从上一个管道 ——-> 下一个管道。又细分为字节流和字符流。 字节流&#xff08;PipedOutputStream、PipedInputStream&#xff09; 如果…

SSM框架介绍

SSM通常指的是三个开源框架的组合&#xff0c;即Spring、SpringMVC&#xff08;Spring Web MVC&#xff09;和MyBatis&#xff0c;这三个框架经常一起使用来开发Java企业级应用&#xff0c;特别是在Web应用开发中非常流行。 SSM框架介绍 Spring 简介&#xff1a;Spring是一个…

谷粒商城のNginx

文章目录 前言一、Nginx1、安装Nginx2、相关配置2.1、配置host2.2、配置Nginx2.3、配置网关 前言 本篇重点介绍项目中的Nginx配置。 一、Nginx 1、安装Nginx 首先需要在本地虚拟机执行&#xff1a; mkdir -p /mydata/nginx/html /mydata/nginx/logs /mydata/nginx/conf在项目…

ModuleNotFoundError: No module named ‘mmcv.transforms‘

不得已的解决方法&#xff1a; mmcv升级到2.0.0即可解决 升级后自然又面临一系列不兼容问题&#xff01; 官方文档查漏补缺

【STM32】呼吸灯实现

对应pwm概念可以去看我的博客51实现的呼吸灯 根据对应图我们可知预分频系数为999&#xff0c;重装载值为2000&#xff0c;因为设置内部时钟晶振频率为100MHZ &#xff0c;1s跳 100 000000次 &#xff0c;跳一次需要1/100 000000s 20ms0.02s 对应跳的次数为 我们使用通用定时器…

九,自定义转换器详细操作(附+详细源码解析)

九&#xff0c;自定义转换器详细操作&#xff08;附详细源码解析&#xff09; 文章目录 九&#xff0c;自定义转换器详细操作&#xff08;附详细源码解析&#xff09;1. 基本介绍2. 准备工作3. 自定义转换器操作4. 自定义转换器的注意事项和细节5. 总结&#xff1a;6. 最后&…