5 Paimon数据湖之表数据查询详解

news2025/1/12 22:52:28

更多Paimon数据湖内容请关注:https://edu.51cto.com/course/35051.html

虽然前面我们已经讲过如何查询Paimon表中的数据了,但是有一些细节的东西还需要详细分析一下。

  • 首先是针对Paimon中系统表的查询,例如snapshots\schemas\options等等这些系统表。
    其实简单理解就是我们可以通过sql的形式查询系统表来查看实体表的快照、schema等信息,这些信息我们也可以直接到hdfs中查看,只是不太方便。

  • 在查询数据的时候,可以细分为批量读取和流式读取,因为Paimon可以同时支持批处理和流处理。

  • 在查询数据的时候,如果想要从之前的某一个时间点开始查询数据,也就说任务启动的时候想要查询一些历史数据,则需要用到时间旅行这个特性,可以在SQL查询语句中通过动态表选项指定scan.mode参数来控制具体查询哪些历史数据。

Scan Mode的值可以有多种,不同的值代表不同的含义,下面我们来具体分析一下:
在这里插入图片描述

注意:在分析的时候,我们需要针对批处理和流处理这两种情况分别进行分析。

  • (1)default:如果我们在执行查询的时候,没有指定scan.mode参数,则默认是default。但是此时需要注意,如果我们也没有同时指定其他参数,例如:timestamp-millis\snapshot-id等scan相关的参数,那么默认会执行latest-full策略。
    所以说,我们在执行查询的时候,如果没有指定任何scan相关的参数,那么默认执行的策略就是latest-full。

  • (2)latest-full:和full是一样的效果,不过full这个参数已经被标记为过期了。针对批处理,表示只读取最新快照中的所有数据,读取完成以后任务就执行结束了。针对流处理,表示第一次启动时读取最新快照中的所有数据,然后继续读取后续新增的变更数据,这个任务会一直运行。

  • (3)latest:针对批处理,他的执行效果和latest-full是一样的,只会读取最新快照中的所有数据。但是针对流处理就不一样了,此时表示只读取最新的变更数据,也就是说任务启动之后,只读取新增的数据,之前的历史快照中的数据不读取。类似于kafka中消费者里面的latest消费策略。

  • (4)from-snapshot:使用此策略的时候,需要同时指定snapshot-id参数。针对批处理,表示只读取指定id的快照中的所有数据。针对流处理,表示从指定id的快照开始读取变更数据(注意:此时不是读取这个快照中的所有数据,而是读取此快照中的变更数据,也可以理解为这个快照和上一个快照相比新增的数据),当然,后续新增的变更数据也是可以读取到的,因为这个是流处理,他会一直执行读取操作。

  • (5)from-snapshot-full:使用此策略的时候,也需要同时指定snapshot-id参数。针对批处理,他的执行效果和from-snapshot是一样的。针对流处理,表示第一次启动时读取指定id的快照中的所有数据,然后继续读取后续新增的变更数据,此时任务会一直执行。

  • (6)from-timestamp:使用此策略的时候,需要同时指定timestamp-millis参数。针对批处理,表示只读取指定时间戳的快照中的所有数据。针对流处理,表示从指定时间戳的快照开始读取变更数据,(注意,这里也是读取这个快照中的变更数据,不是所有数据。),然后读取后续新增的变更数据。

  • (7)incremental:表示是增量查询,这个主要是针对批处理的,通过这种策略可以读取开始和结束快照之间的增量变化。开始和结束快照可以通过快照id或者是时间戳进行指定。
    如果是使用快照id,则需要通过incremental-between参数指定。
    如果是使用时间戳,则需要通过incremental-between-timestamp参数指定。

  • (8)compacted-full:想要使用这个参数有一个前提,Paimon表需要开启完全压缩(full compaction)。此时针对批处理,表示只读取最新完全压缩(full compaction)的快照中的所有数据。针对流处理,表示第一次启动时读取最新完全压缩(full compaction)的快照中的所有数据,然后继续读取后续新增的变更数据。

针对这里面的latest、latest-full、compacted-full这几种策略放在一起可能容易混淆,下面我们来通过一个图重新梳理一下:
在这里插入图片描述

首先看中间这条线,表示是数据的时间轴,左边是历史数据,右边是最新产生的数据。

中间这条线上面是批处理,下面是流处理。

我们首先来看批处理:
如果我们指定了scan.modelatest-full或者是latest,则会读取最新的快照中的所有数据,也就是Last Snapshot中的数据。
如果我们指定了scan.modecompacted-full,则会读取最新的完全压缩(full compaction)的快照中的数据,也就是Last Compact Snapshot中的数据。

接下来看一下流处理:
如果我们指定了scan.modelatest-full,则会在任务第一次启动时读取最新快照中的所有数据,然后继续读取后续新增的变更数据。也就是第一次启动时先读取Last Snapshot中的所有数据,接着读取后续新产生的数据。
如果我们指定了scan.modelatest,则此时只读取最新的变更数据,不读取LastSnapshot快照中的数据。
如果我们指定了scan.modecompacted-full,则第一次启动时会读取最新完全压缩(full compaction)的快照中的所有数据,也就是Last Compact Snapshot中的数据,接着读取后续新产生的数据。

这就是这些策略在批处理和流处理中的执行流程。

(1)查询系统表

下面我们来通过具体的案例来演示一下前面提到的查询数据相关的用法。

首先创建一个向Paimon表中模拟写入数据的类,便于一会测试使用
创建package:tech.xuwei.paimon.query

创建object:FlinkSQLWriteToPaimon

代码如下:

package tech.xuwei.paimon.query

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 通过FlinkSQL 向Paimon中模拟写入数据
 * Created by xuwei
 */
object FlinkSQLWriteToPaimon {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //创建Paimon类型的表
    tEnv.executeSql(
      """
        |CREATE TABLE IF NOT EXISTS `query_table`(
        |    name STRING,
        |    age INT,
        |    PRIMARY KEY (name) NOT ENFORCED
        |)
        |""".stripMargin)

    //写入数据
    tEnv.executeSql("INSERT INTO query_table(name,age) VALUES('jack',18)")
    tEnv.executeSql("INSERT INTO query_table(name,age) VALUES('tom',19)")
    tEnv.executeSql("INSERT INTO query_table(name,age) VALUES('mick',20)")

  }

}

在idea中运行这个代码。

接下来创建一个类来查询一下Paimon中的系统表。

创建object:FlinkPaimonSystemTable

代码如下:

package tech.xuwei.paimon.query

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 查询Paimon中的系统表
 * Created by xuwei
 */
object FlinkPaimonSystemTable {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")


    //snapshot信息表,对应的其实就是hdfs中表的snapshot目录下的snapshot-*文件信息
    println("====================snapshot信息表===========================")
    tEnv.executeSql("SELECT * FROM query_table$snapshots").print()

    //schema信息表,对应的其实就是hdfs中表的schema目录下的schema-*文件信息
    println("====================schema信息表===========================")
    tEnv.executeSql("SELECT * FROM query_table$schemas").print()

    //manifest信息表,对应的其实就是hdfs中表的manifest目录下的manifest-*文件信息
    println("====================manifest信息表===========================")
    tEnv.executeSql("SELECT * FROM query_table$manifests").print()

    //file信息表,对应的其实就是hdfs中表的bucket-*目录下的data-*文件信息
    println("====================file信息表===========================")
    tEnv.executeSql("SELECT * FROM query_table$files").print()

    //option信息表,对应的就是建表语句中with里面指定的参数信息,在表的schema-*文件中也能看到option信息
    println("====================option信息表===========================")
    tEnv.executeSql("SELECT * FROM query_table$options").print()

    //consumer信息表,在查询数据的sql语句中指定了consumer-id之后才能看到
    println("====================consumer信息表===========================")
    tEnv.executeSql("SELECT * FROM query_table$consumers").print()


    //audit log信息表,相当于是表的审核日志,可以看到表中每条数据的rowkind,也就是+I\-U\+U\-D
    println("====================audit log信息表===========================")
    tEnv.executeSql("SELECT * FROM query_table$audit_log").print()


  }
}

运行代码。
注意:在本地执行flink sql中的print,会看到下面错误:

java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down.
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
	at org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1026)
	at org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:899)
	at org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:823)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.isJobTerminated(CollectResultFetcher.java:210)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:118)
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
	at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:219)
	at org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:120)
	at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:153)
	at tech.xuwei.paimon.query.FlinkPaimonSystemTable$.main(FlinkPaimonSystemTable.scala:35)
	at tech.xuwei.paimon.query.FlinkPaimonSystemTable.main(FlinkPaimonSystemTable.scala)

这个异常不影响程序执行,实际工作中我们不会写这种代码,一般都是在sql中写insert into select语句了,在这主要是为了方便测试,忽略这个异常即可。

如果感觉看起来比较乱,可以修改一下log4j.properties日志中的告警级别,改为error级别即可。

log4j.rootLogger=error,stdout

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

重新运行代码,可以看到如下结果:

====================snapshot信息表===========================
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+
| op |          snapshot_id |            schema_id |                    commit_user |    commit_identifier |                    commit_kind |             commit_time |   total_record_count |   delta_record_count | changelog_record_count |            watermark |
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+
| +I |                    1 |                    0 | 8f74d97b-bf6b-4ac7-bb47-3bb... |  9223372036854775807 |                         APPEND | 2023-07-28 17:35:22.859 |                    1 |                    1 |                      0 | -9223372036854775808 |
| +I |                    2 |                    0 | 49412497-1749-4566-8bf8-1c5... |  9223372036854775807 |                         APPEND | 2023-07-28 17:35:24.802 |                    2 |                    1 |                      0 | -9223372036854775808 |
| +I |                    3 |                    0 | e55e756d-e528-4b7c-97f0-a01... |  9223372036854775807 |                         APPEND | 2023-07-28 17:35:26.409 |                    3 |                    1 |                      0 | -9223372036854775808 |
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+
3 rows in set
====================schema信息表===========================
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| op |            schema_id |                         fields |                 partition_keys |                   primary_keys |                        options |                        comment |
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |                    0 | [{"id":0,"name":"name","typ... |                             [] |                       ["name"] |                             {} |                                |
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
1 row in set
====================manifest信息表===========================
+----+--------------------------------+----------------------+----------------------+----------------------+----------------------+
| op |                      file_name |            file_size |      num_added_files |    num_deleted_files |            schema_id |
+----+--------------------------------+----------------------+----------------------+----------------------+----------------------+
| +I | manifest-800ac729-22d3-494b... |                 1665 |                    1 |                    0 |                    0 |
| +I | manifest-61d14e4e-d2a0-42ac... |                 1675 |                    1 |                    0 |                    0 |
| +I | manifest-fd8e45b0-d456-467a... |                 1673 |                    1 |                    0 |                    0 |
+----+--------------------------------+----------------------+----------------------+----------------------+----------------------+
3 rows in set
====================file信息表===========================
+----+--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+-------------+----------------------+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+
| op |                      partition |      bucket |                      file_path |                    file_format |            schema_id |       level |         record_count |   file_size_in_bytes |                        min_key |                        max_key |              null_value_counts |                min_value_stats |                max_value_stats |           creation_time |
+----+--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+-------------+----------------------+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+
| +I |                             [] |           0 | data-6b23bcaf-3dbe-46c0-a67... |                            orc |                    0 |           0 |                    1 |                  566 |                         [jack] |                         [jack] |                {age=0, name=0} |            {age=18, name=jack} |            {age=18, name=jack} | 2023-07-28 17:35:22.453 |
| +I |                             [] |           0 | data-ce40f0df-aa2a-4682-8b6... |                            orc |                    0 |           0 |                    1 |                  581 |                         [mick] |                         [mick] |                {age=0, name=0} |            {age=20, name=mick} |            {age=20, name=mick} | 2023-07-28 17:35:26.257 |
| +I |                             [] |           0 | data-ac9bd895-2b8e-4efe-969... |                            orc |                    0 |           0 |                    1 |                  572 |                          [tom] |                          [tom] |                {age=0, name=0} |             {age=19, name=tom} |             {age=19, name=tom} | 2023-07-28 17:35:24.603 |
+----+--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+-------------+----------------------+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+
3 rows in set
====================option信息表===========================
Empty set
====================tag信息表===========================
Empty set
====================consumer信息表===========================
Empty set
====================audit log信息表===========================
+----+--------------------------------+--------------------------------+-------------+
| op |                        rowkind |                           name |         age |
+----+--------------------------------+--------------------------------+-------------+
| +I |                             +I |                           jack |          18 |
| +I |                             +I |                           mick |          20 |
| +I |                             +I |                            tom |          19 |
+----+--------------------------------+--------------------------------+-------------+
3 rows in set
(2)批量读取

下面演示一下如何在批量读取中使用时间旅行功能。

创建object:tech.xuwei.paimon.query.FlinkPaimonBatchQuery

代码如下:

package tech.xuwei.paimon.query

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 批量读取
 * Created by xuwei
 */
object FlinkPaimonBatchQuery {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //SET 'execution.runtime-mode' = 'batch';
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //批量查询数据
    tEnv.executeSql(
      """
        |SELECT * FROM query_table
        |-- /*+ OPTIONS('scan.mode'='latest-full') */ -- 默认策略,可以省略不写,只读取最新快照中的所有数据
        |-- /*+ OPTIONS('scan.mode'='latest') */ -- 在批处理模式下和latest-full的效果一致
        |-- /*+ OPTIONS('scan.mode'='from-snapshot','scan.snapshot-id' = '2') */ -- 只读取指定id的快照中的所有数据
        |-- /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '2') */ -- 在批处理模式下和from-snapshot的效果一致
        |-- /*+ OPTIONS('scan.mode'='from-timestamp','scan.timestamp-millis' = '1690536924802') */ -- 只读取指定时间戳的快照中的所有数据
        |-- /*+ OPTIONS('scan.mode'='incremental','incremental-between' = '1,3') */ -- 指定两个快照id,查询这两个快照之间的增量变化
        |-- /*+ OPTIONS('scan.mode'='incremental','incremental-between-timestamp' = '1690536922859,1690536926409') */ -- 指定两个时间戳,查询这两个快照之间的增量变化
        |""".stripMargin)
      .print()

  }
}

运行代码,查看每一种策略的数据结果。

注意:在演示compacted-full这种策略的时候需要给表开启full-compaction

所以重新创建一个新的表。

创建object:FlinkSQLWriteToPaimonForCompact

代码如下:

package tech.xuwei.paimon.query

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 通过FlinkSQL 向Paimon中模拟写入数据
 * Created by xuwei
 */
object FlinkSQLWriteToPaimonForCompact {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //创建Paimon类型的表
    tEnv.executeSql(
      """
        |CREATE TABLE IF NOT EXISTS `query_table_compact`(
        |    name STRING,
        |    age INT,
        |    PRIMARY KEY (name) NOT ENFORCED
        |)WITH(
        |    'changelog-producer' = 'full-compaction',
        |    'full-compaction.delta-commits' = '1'
        |)
        |""".stripMargin)

    //写入数据
    tEnv.executeSql("INSERT INTO query_table_compact(name,age) VALUES('jack',18)")
    tEnv.executeSql("INSERT INTO query_table_compact(name,age) VALUES('tom',19)")
    tEnv.executeSql("INSERT INTO query_table_compact(name,age) VALUES('mick',20)")

  }

}

运行代码。

再创建一个新的读取数据的类:

创建object:FlinkPaimonBatchQueryForCompact

代码如下:

package tech.xuwei.paimon.query

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 批量读取
 * Created by xuwei
 */
object FlinkPaimonBatchQueryForCompact {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //SET 'execution.runtime-mode' = 'batch';
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //批量查询数据
    tEnv.executeSql(
      """
        |SELECT * FROM query_table_compact
        |/*+ OPTIONS('scan.mode' = 'compacted-full') */ --表需要开启full-compaction,设置changelog-producer和full-compaction.delta-commits
        |""".stripMargin)
      .print()

  }
}

运行代码,可以看到如下结果:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           jack |          18 |
| +I |                           mick |          20 |
| +I |                            tom |          19 |
+----+--------------------------------+-------------+

由于目前每一次提交数据都会触发完全压缩,所以我们查询最新的完全压缩快照中的数据是可以获取到所有数据的。

此时可以通过系统表查看一下这个表的snapshot信息:
创建object:FlinkPaimonSystemTableForCompact

代码如下:

package tech.xuwei.paimon.query

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 查询Paimon中的系统表
 * Created by xuwei
 */
object FlinkPaimonSystemTableForCompact {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")


    //snapshot信息表,对应的其实就是hdfs中表的snapshot目录下的snapshot-*文件信息
    println("====================snapshot信息表===========================")
    tEnv.executeSql("SELECT * FROM query_table_compact$snapshots").print()

  }
}

执行代码,可以看到如下结果

====================snapshot信息表===========================
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+
| op |          snapshot_id |            schema_id |                    commit_user |    commit_identifier |                    commit_kind |             commit_time |   total_record_count |   delta_record_count | changelog_record_count |            watermark |
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+
| +I |                    1 |                    0 | 38d8f3a4-aeb3-4072-90cf-421... |  9223372036854775807 |                         APPEND | 2023-07-28 17:57:07.293 |                    1 |                    1 |                      0 | -9223372036854775808 |
| +I |                    2 |                    0 | 38d8f3a4-aeb3-4072-90cf-421... |  9223372036854775807 |                        COMPACT | 2023-07-28 17:57:08.211 |                    3 |                    2 |                      1 | -9223372036854775808 |
| +I |                    3 |                    0 | 84203720-0e42-40a6-8202-642... |  9223372036854775807 |                         APPEND | 2023-07-28 17:57:09.423 |                    4 |                    1 |                      0 | -9223372036854775808 |
| +I |                    4 |                    0 | 84203720-0e42-40a6-8202-642... |  9223372036854775807 |                        COMPACT | 2023-07-28 17:57:09.641 |                    8 |                    4 |                      1 | -9223372036854775808 |
| +I |                    5 |                    0 | 25d0f600-076a-407f-a07a-caf... |  9223372036854775807 |                         APPEND | 2023-07-28 17:57:11.500 |                    9 |                    1 |                      0 | -9223372036854775808 |
| +I |                    6 |                    0 | 25d0f600-076a-407f-a07a-caf... |  9223372036854775807 |                        COMPACT | 2023-07-28 17:57:12.130 |                   15 |                    6 |                      1 | -9223372036854775808 |
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+

此时可以看到在commit_kind这一列中显示的有APPENDCOMPACT,表示这个快照是追加产生的还是完全压缩产生的。

由于我们配置的每一次提交数据都会触发完全压缩,所以对应的有3个完全压缩产生的快照。

为了便于验证,我们可以把最新的那个完全压缩的快照删除掉,再执行查询,看看结果是什么样的:

删除最新的完全压缩的快照:

[root@bigdata04 ~]# hdfs dfs -rm -r /paimon/default.db/query_table_compact/snapshot/snapshot-6

注意:这个删除操作建议大家在命令行执行,不要在web页面执行,在web页面删除可能会直接把这个表的目录删除掉!!!!!

然后再执行FlinkPaimonBatchQueryForCompact,结果如下:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           jack |          18 |
| +I |                            tom |          19 |
+----+--------------------------------+-------------+
2 rows in set

注意:此时最新的完全压缩的快照就是snapshot-4了,这个快照中只有2条数据。

这就是批量读取中时间旅行参数的使用。

(3)流式读取

下面演示一下如何在流式读取中使用时间旅行功能。

创建object:FlinkPaimonStreamingQuery

代码如下:

package tech.xuwei.paimon.query

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 流式读取
 * Created by xuwei
 */
object FlinkPaimonStreamingQuery {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //SET 'execution.runtime-mode' = 'streaming';
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)//使用流处理模式
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //流式查询数据
    tEnv.executeSql(
      """
        |SELECT * FROM query_table
        |-- /*+ OPTIONS('scan.mode'='latest-full') */ -- 默认策略,可以省略不写,第一次启动时读取最新快照中的所有数据,然后继续读取后续新增的变更数据
        |-- /*+ OPTIONS('scan.mode'='latest') */ -- 只读取最新的变更数据
        |-- /*+ OPTIONS('scan.mode'='from-snapshot','scan.snapshot-id' = '2') */ -- 从指定id的快照开始读取变更数据(包含后续新增)
        |-- /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '2') */ -- 第一次启动时读取指定id的快照中的所有数据,然后继续读取后续新增的变更数据
        |-- /*+ OPTIONS('scan.mode'='from-timestamp','scan.timestamp-millis' = '1690536924802') */ -- 从指定时间戳的快照开始读取变更数据(包含后续新增)
        |""".stripMargin)
      .print()
  }
}
(4)Consumer ID

最后我们在流式读取这里扩展一个知识点:Consumer ID,这个功能是针对流式读取设计的。

相当于我们在kafka消费者中指定一个groupid,这样可以通过groupid维护消费数据的偏移量信息,便于任务停止以后重启的时候继续基于之前的进度进行查询。

在这里Consumer ID的主要作用是为了方便记录每次查询到的数据快照的位置,他会把下一个还未读取的快照id记录到hdfs文件中。
当之前的任务停止以后,新启动的任务可以基于之前任务记录的快照id继续查询数据,不需要从状态中恢复位置信息。

这个特性目前属于实验特性,还没有经过大量生产环境的验证,大家可以先提前了解一下。

下面来结合一个案例演示一下:
具体的思路是这样的:

  • 1:首先使用Consumer ID查询一次query_table表中的数据。
  • 2:然后停止之前的查询任务,向query_table表中模拟产生1条数据。
  • 3:重新启动第1步骤中的任务,验证一下是否只读取到了新增的那1条数据

创建object:FlinkPaimonStreamingQueryForConsumerid

代码如下:

package tech.xuwei.paimon.query

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 流式读取
 * Created by xuwei
 */
object FlinkPaimonStreamingQueryForConsumerid {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //SET 'execution.runtime-mode' = 'streaming';
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)//使用流处理模式
    val tEnv = StreamTableEnvironment.create(env)

    //注意:在流处理模式中,操作Paimon表时需要开启Checkpoint。
    env.enableCheckpointing(5000)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //流式查询数据
    tEnv.executeSql(
      """
        |SELECT * FROM query_table
        |/*+ OPTIONS('consumer-id'='con-1') */ -- 指定消费者id
        |""".stripMargin)
      .print()
  }
}

注意:在这需要开启checkpoint,否则Consumer ID的功能无法正常触发。

第一次执行此代码,可以看到如下结果:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           jack |          18 |
| +I |                           mick |          20 |
| +I |                            tom |          19 |

停止此代码。

此时其实可以到hdfs中查看一下维护的Consumer ID信息:

[root@bigdata04 ~]# hdfs dfs -cat /paimon/default.db/query_table/consumer/consumer-con-1
{
  "nextSnapshot" : 4
}

这里面记录的是下一次需要读取的快照id,数值为4,此时最新的快照id是3,因为快照id为3的快照已经读取过了,下一个快照id就是4了。

其实直接查询consumer系统表也是可以看到这些信息的。

创建object:FlinkPaimonSystemTableForConsumerid

代码如下:

package tech.xuwei.paimon.query

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 查询Paimon中的系统表
 * Created by xuwei
 */
object FlinkPaimonSystemTableForConsumerid {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")


    //consumer信息表,在查询数据的sql语句中指定了consumer-id之后才能看到
    println("====================consumer信息表===========================")
    tEnv.executeSql("SELECT * FROM query_table$consumers").print()

  }
}

执行代码,可以看到如下结果:

====================consumer信息表===========================
+----+--------------------------------+----------------------+
| op |                    consumer_id |     next_snapshot_id |
+----+--------------------------------+----------------------+
| +I |                          con-1 |                    4 |
+----+--------------------------------+----------------------+
1 row in set

从这可以看出来,next_snapshot_id4,查出来的结果是一样的。

接下来我们向query_table中新增一条数据。

创建object:FlinkSQLWriteToPaimonForConsumerid

代码如下:

package tech.xuwei.paimon.query

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 通过FlinkSQL 向Paimon中模拟写入数据
 * Created by xuwei
 */
object FlinkSQLWriteToPaimonForConsumerid {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //创建Paimon类型的表
    tEnv.executeSql(
      """
        |CREATE TABLE IF NOT EXISTS `query_table`(
        |    name STRING,
        |    age INT,
        |    PRIMARY KEY (name) NOT ENFORCED
        |)
        |""".stripMargin)

    //写入数据
    tEnv.executeSql("INSERT INTO query_table(name,age) VALUES('jessic',30)")

  }

}

执行代码。

最后,我们再重新启动FlinkPaimonStreamingQueryForConsumerid,可以看到如下结果:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                         jessic |          30 |

能看到这个结果,说明这个consumer id生效了,当我们第二次使用相同的consumer id读取这个表的时候,是可以基于之前的进度继续读取的。

停止此任务。

此时再执行FlinkPaimonSystemTableForConsumerid,查看最新的next_snapshot_id

====================consumer信息表===========================
+----+--------------------------------+----------------------+
| op |                    consumer_id |     next_snapshot_id |
+----+--------------------------------+----------------------+
| +I |                          con-1 |                    5 |
+----+--------------------------------+----------------------+
1 row in set

此时next_snapshot_id变成了5,这是正确的。

更多Paimon数据湖内容请关注:https://edu.51cto.com/course/35051.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1192634.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【每日一题】—— C. Anonymous Informant(Codeforces Round 908 (Div. 2))(思维题)

🌏博客主页:PH_modest的博客主页 🚩当前专栏:每日一题 💌其他专栏: 🔴 每日反刍 🟡 C跬步积累 🟢 C语言跬步积累 🌈座右铭:广积粮,缓称…

(动手学习深度学习)第7章 残差网络---ResNet

目录 ResNet总结 ResNet代码实现ResNet的梯度计算 ResNet 总结 残差块使得很深的网络更加容易训练 甚至可以训练一千层的网络 残差网络对随后的深层神经网络设计产生了深远影响,无论是卷积类网络还是全连接类网络。 ResNet代码实现 导入相关库 import torch fro…

K8S容器内安装cur/telnet命令(Alpine Linux离线环境安装curl/telnet或其他工具)

背景 需求: 微服务的基础是镜像,通常在最小化的Linux镜像中安装jdk,然后运行编译好的java程序。将镜像运行到K8S上就得到了微服务Pod,Pod通常使用安装K8S时配置的私有网段,与宿主机不同。很多时候需要排查从Pod网段内…

概念解析 |移动群智感知:智能设备时代的数据革命

注1:本文系“概念解析”系列之一,致力于简洁清晰地解释、辨析复杂而专业的概念。本次辨析的概念是:移动群智感知(Mobile Crowd Sensing, MCS)。 移动群智感知:智能设备时代的数据革命 移动群智感知示意图 背景介绍 随着智能手机和物联网设备的普及,我们进入了一个全新的…

《未来之路:技术探索与梦想的追逐》

创作纪念日 日期:2023年07月05日文章标题:《从零开始-与大语言模型对话学技术-gradio篇(1)》成为创作者第128天 在这个平凡的一天,我撰写了自己的第一篇技术博客,题为《从零开始-与大语言模型对话学技术-…

【Docker】iptables命令的使用

iptables是一个非常强大的Linux防火墙工具,你可以使用它来控制网络流量的访问和转发。 前面已经学习了iptables的基本原理,四表五链的基本概念,也已经安装好了iptables,下面我们主要学习iptables命令的基本使用。 可以使用iptable…

Redis持久化之RDB和AOF操作

文章目录 前言一、什么是RDB?1.与持久化相关的一些配置2.触发机制3.如何恢复rdb文件4.优点5.缺点 二、什么是AOF?1.重写规则说明2.优点3.缺点 总结 前言 无论是面试还是工作,持久化都是重点! Redis是内存数据库,如果不将内存中的…

阿里云AIGC小说生成【必得京东卡】

任务步骤 此文真实可靠不做虚假宣传,绝对真实,可截图为证。 领取任务 链接(复制到wx打开):#小程序://ITKOL/1jw4TX4ZEhykWJd 教程实践 打开函数计算控制台 应用->创建应用->人工智能->通义千问 AI 助手-…

【字符串】【双指针翻转字符串+快慢指针】Leetcode 151 反转字符串中单词【好】

【字符串】【双指针翻转字符串快慢指针】Leetcode 151 反转字符串中单词 解法1 双指针翻转字符串快慢指针更新数组大小 ---------------🎈🎈题目链接🎈🎈------------------- ---------------🎈🎈解答链接…

矩阵起源荣获第八届“创客中国”深圳市中小企业创新创业大赛三等奖

近日,2023年第八届“创客中国”深圳市中小企业创新创业大赛圆满落下帷幕,矩阵起源(深圳)信息科技有限公司凭借项目”MatrixOne 新一代超融合异构云原生数据库”荣获企业组三等奖。 本届大赛由深圳市工业和信息化局、深圳市中小企业…

Blazor之Router入门

前言:博主文章仅用于学习、研究和交流目的,不足和错误之处在所难免,希望大家能够批评指出,博主核实后马上更改。 概述: Router 组件允许路由到 Razor 组件。 Router 组件在 App 组件 (App.razor) 中使用。编译带有 p…

鲁大师电动车智能化测评报告第二十三期:实测续航95km,九号Q90兼顾个性与实用

鲁大师第二十三期智能化电动车测评排行榜数据来源于鲁大师智慧实验室,测评的车型均为市面上主流品牌的主流车型。截止目前,鲁大师智能化电动车测评的车型高达130余台,且还在不断增加和丰富中。 一、测评依据 鲁大师电动车智能化测评体系包含车辆的状态采集与管理硬件系统、车辆…

【MySQL】手把手教你centos7下载MySQL

centos7下载MySQL 前言正式开始卸载不需要的环境(如果你之前没有安装过数据库相关的东西可以跳过)下载mysql登录mysql登陆⽅法⼀【不⾏就下⼀个】登陆⽅法⼆【不⾏就下⼀个】登录方式三 前言 安装和卸载MySQL都用系统的root权限,更方便一点&…

pyqt5学习-01 UI界面创建以及生成python代码

前提 环境搭建 打开designer 选择创建主窗体,拖入一个按钮 保存主窗体UI文件为firstMainWin.ui 将UI文件转化为python文件 # 可以把E:\Python\envs\pyqt5stu\Scripts\pyuic5.exe添加到环境变量中 E:\Python\envs\pyqt5stu\Scripts\pyuic5.exe -o firstMainWin.…

新零售时代,传统便利店如何转型?

在零售批发业,如何降低各环节成本、提高业务运转效率、更科学地了解客户服务客户,是每家企业在激烈竞争中需要思考的课题。 对零售批发企业来说,这些问题或许由来已久: (1)如何对各岗位的员工进行科学的考…

【nginx】使用arthas协助定位 nginx 499

看到这个499 到服务端 通过arthas查看 并没有耗时很长的 心跳接口 看都是很快的 通过 monitor 命令 通过watch 定位看到这个现象: watch -x 3 在nginx配置文件中添加 在nginx 中查看 没有499 了 再看nginx 中有存在 401 这个是业务问题 剩下的是检测器同事定位…

Flink之SQL查询操作

SQL查询 基本SELECT查询生成测试数据WITHWHEREDISTINCTORDER BYLIMIT 窗口函数概述创建数据表滚动窗口 TUMBLE滑动窗口 HOP累积窗口 CUMULATE窗口偏移 聚合窗口聚合分组聚合OVER聚合 TOP-N普通Top-N窗口Top-N 联结Join查询内部等连接外部等连接间隔联结 集合操作UNION 和 UNION…

内存对齐规则

前言 求结构体的大小是很热门的考点,无论你是学C还是C,都会遇到这样的问题,在面试中也很受欢迎,所以我们先思考这样一个问题:计算结构体,联合体和类的大小应该怎么去计算呢?我们知道&#xff0c…

105.am40刷机(linux)折腾记1-前期的准备工作1

前段时间在某鱼上逛的时候,发现一款3399的盒子只要150大洋,内心就开始澎拜,一激动就下手了3台,花了450大洋(现在想想,心都碎了一地)。 然后自己又来来回回折腾了几天,目前能跑上fire…

C# 异步日志记录类,方便下次使用,不用重复造轮子

先定义接口类: using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks;namespace 异常 {internal interface ILog{Task WriteErrorLog(string message);Task WriteInfoLog(string message);Task W…