3 Paimon数据湖中的表类型详解

news2025/1/12 22:03:30

更多Paimon数据湖内容请关注:https://edu.51cto.com/course/35051.html

Paimon中支持多种表类型,下面我们来看一下。

3.1 全局维度

首先从全局维度来看,Paimon中的表类型可以大致划分为4种:
内部表、外部表、分区表和临时表

这些表类型,从名字上来看,和Hive中的表类型是类似的,下面我们来具体分析一下:

3.1.1 内部表

首先是内部表,我们前面创建的表其实都是内部表。

内部表是由Paimon Catalog负责管理的表,当内部表被删除的时候,表文件也会被一起删除。
表文件其实就是表中存储数据的文件,目前是存储在HDFS上面。

注意:在删除内部表之前需要先停止写入数据的任务,否则无法完全删除表文件。

内部表的建表语法是这样的:

CREATE TABLE Inner_Table(...) WITH (...)

内部表我们前面已经使用过了,在这里就不再演示了。

3.1.2 外部表

接下来我们来看一下外部表。

Paimon中的外部表是由其他Catalog负责记录,但是不由他们管理。
也就是说我们可以在任何非Paimon Catalog里面创建Paimon类型的外部表,但是表中的数据不是由他们进行管理的,最终是由Paimon Catalog来管理的。

所以当我们在其他Catalog中删除Paimon外部表时,不会删除表中的数据文件,只会把表的元数据删掉。

Paimon外部表可以在任何Catalog中使用。
其实Paimon外部表的存在主要是为了便于在其他Catalog里面去操作Paimon表,这样就不需要每次都创建Paimon Catalog了

它的典型应用场景是这样的:
举个例子:我们在使用Flink SQL的时候默认使用的是default Catalog,如果此时我们想要操作Paimon中的表,这个时候一般有两种解决方案

  • 1:创建Paimon Catalog,这样就可以操作Paimon表了,咱们前面在操作Paimon表的时候使用的就是这种方案。
  • 2:使用Paimon 外部表,此时就不需要创建Paimon Catalog了,在default Catalog中可以直接读写Paimon外部表。

Paimon外部表的建表语句是这样的:

CREATE TABLE External_Table (
    id INT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'paimon',
    'path' = 'hdfs://.../table_path',
    'auto-create' = 'true' -- 如果表目录不存在,则自动创建
)

Paimon外部表需要在建表语句中的WITH中通过connectorpath表属性来指定。

下面我们来通过具体的案例来演示一下Paimon外部表的使用。

首先来看一下如何在Flink SQL中通过Paimon外部表来读取数据。

创建package:tech.xuwei.paimon.tabletype
创建object:FlinkSQLReadPaimonExternalTable

代码如下:

package tech.xuwei.paimon.tabletype

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 使用FlinkSQL从Paimon外部表中读取数据
 * Created by xuwei
 */
object FlinkSQLReadPaimonExternalTable {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon外部表
    tEnv.executeSql(
      """
        |CREATE TABLE paimon_external_user (
        |    name STRING,
        |    age INT,
        |    PRIMARY KEY (name) NOT ENFORCED
        |) WITH (
        |    'connector' = 'paimon',
        |    'path' = 'hdfs://bigdata01:9000/paimon/default.db/user',
        |    'auto-create' = 'true' -- 如果表目录不存在,则自动创建
        |)
        |""".stripMargin)

    //执行查询,并且打印输出结果
    tEnv.executeSql(
      """
        |SELECT * FROM `default_catalog`.`default_database`.`paimon_external_user`
        |""".stripMargin)
      .print()

  }

}

此时我们不需要定义Paimon Catalog,直接使用Flink SQL中的默认catalog即可读取Paimon表中的数据。

执行代码,可以看到如下结果:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +U |                           jack |          11 |
| +I |                            tom |          20 |

接下来我们来看一下如何在Flink SQL中向Paimon外部表写入数据。

创建object:FlinkSQLWritePaimonExternalTable

代码如下:

package tech.xuwei.paimon.tabletype

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 使用FlinkSQL向Paimon外部表中写入数据
 * Created by xuwei
 */
object FlinkSQLWritePaimonExternalTable {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon外部表
    tEnv.executeSql(
      """
        |CREATE TABLE paimon_external_user2 (
        |    name STRING,
        |    age INT,
        |    PRIMARY KEY (name) NOT ENFORCED
        |) WITH (
        |    'connector' = 'paimon',
        |    'path' = 'hdfs://bigdata01:9000/paimon/default.db/user2',
        |    'auto-create' = 'true' -- 如果表目录不存在,则自动创建
        |)
        |""".stripMargin)

    //执行数据写入
    tEnv.executeSql(
      """
        |INSERT INTO paimon_external_user2(name,age) VALUES('jack',18)
        |""".stripMargin)
  }

}

执行代码。

然后复制一份FlinkSQLReadFromPaimon代码到tabletype包下面。
修改FlinkSQLReadFromPaimon里面读取的表名,改为user2

//读取Paimon表中的数据,并且打印输出结果
tEnv.executeSql(
  """
    |SELECT * FROM  `paimon_catalog`.`default`.`user2`
    |""".stripMargin)
  .print()

注意:此时表名需要使用user2,而不是paimon_external_user2!!!

因为paimon_external_user2这个表名是存储在Flink SQL中默认的基于内存的catalog中,当任务执行结束之后,paimon_external_user2这个表名就不存在了。
最终Paimon中存储的表名是user2,这个表名来源于path路径中的名称。

执行修改后的代码FlinkSQLReadFromPaimon,可以看到如下结果:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           jack |          18 |

注意:此时我们是以内部表的形式读取的数据。

3.1.3 分区表

Paimon中的分区表和Hive中的分区表的功能类似,主要也是为了提高查询效率。

分区表中的分区字段可以支持1个或者多个

注意:如果表中定义了主键,分区字段则必须是主键的子集。

Paimon分区表的建表语句是这样的:

CREATE TABLE Partition_Table (
    id INT,
    name STRING,
    dt STRING,
    hh STRING,
    PRIMARY KEY (id, dt, hh) NOT ENFORCED
) PARTITIONED BY (dt, hh) WITH(
    ...
)

核心语法是PARTITIONED BY (....)

下面我们来通过具体的案例来演示一下Paimon分区表的使用。

首先来看一下如何在Flink SQL中向Paimon分区表中写入数据。

创建object:FlinkSQLWritePaimonPartitionTable

代码如下:

package tech.xuwei.paimon.tabletype

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 使用FlinkSQL向Paimon分区表中写入数据
 * Created by xuwei
 */
object FlinkSQLWritePaimonPartitionTable {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //创建Paimon分区表
    tEnv.executeSql(
      """
        |CREATE TABLE IF NOT EXISTS user_par (
        |    id INT,
        |    name STRING,
        |    dt STRING,
        |    hh STRING,
        |    PRIMARY KEY (id, dt, hh) NOT ENFORCED
        |) PARTITIONED BY (dt, hh)
        |""".stripMargin)

    //向Paimon分区表中写入数据
    tEnv.executeSql(
      """
        |INSERT INTO user_par(id,name,dt,hh)
        |VALUES (1,'jack','20230101','10'),(2,'tom','20230101','11')
        |""".stripMargin)

  }

}

执行代码。

此时到hdfs中可以看到这个表的底层数据是按照分区字段作为目录名称来存储的。

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/user_par/dt=20230101
Found 4 items
drwxr-xr-x   - yehua supergroup          0 2028-11-28 17:08 /paimon/default.db/user_par/dt=20230101/hh=10
drwxr-xr-x   - yehua supergroup          0 2028-11-28 17:08 /paimon/default.db/user_par/dt=20230101/hh=11

接下来我们来看一下如何在Flink SQL中从Paimon分区表中读取数据。
其实读取数据这一块是没什么特殊之处的,不管是内部表、外部表还是分区表。

只是针对分区表,如果想要提高查询效率,则需要在where子句中指定分区字段。

创建object:FlinkSQLReadPaimonPartitionTable

代码如下:

package tech.xuwei.paimon.tabletype

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 使用FlinkSQL从Paimon分区表中读取数据
 * Created by xuwei
 */
object FlinkSQLReadPaimonPartitionTable {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)


    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //执行查询,并且打印输出结果
    tEnv.executeSql(
      """
        |SELECT
        |    *
        |FROM `paimon_catalog`.`default`.`user_par`
        |WHERE dt = '20230101' AND hh in ('10','11')
        |""".stripMargin)
      .print()


  }

}

执行代码,可以看到如下结果:

+----+-------------+--------------------------------+--------------------------------+--------------------------------+
| op |          id |                           name |                             dt |                             hh |
+----+-------------+--------------------------------+--------------------------------+--------------------------------+
| +I |           1 |                           jack |                       20230101 |                             10 |
| +I |           2 |                            tom |                       20230101 |                             11 |

这就是Paimon分区表的使用了。

3.1.4 临时表

最后我们来看一下临时表,前面我们已经用过临时表了。

临时表是由Paimon Catalog临时进行记录,但是不由它管理。删除临时表时也不会删除表中的数据文件。

也就是说这个临时表的元数据信息只会临时存储在Paimon Catalog里面,当Flink SQL任务执行结束之后,这个临时表的元数据信息就会被删除了。

注意:其实这个临时表的特性是Flink SQL提供的,所以目前只能在Flink SQL中使用临时表。

临时表的典型应用场景是这样的,就是我们想要在Paimon Catalog里面使用其他类型的表。
因为在Paimon Catalog里面定义表的时候是不允许指定connector属性的,所以说如果我们想要通过connector指定kafka或者其他类型的数据存储系统,就需要定义临时表了。

下面我们来看一个例子,加深我们对临时表应用场景的理解:

首先我们创建了Paimon Catalog,并且进入了这个Catalog里面。

CREATE CATALOG paimon_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'hdfs:///path/to/warehouse'
);

USE CATALOG paimon_catalog;

然后我们在Paimon Catalog里面创建了两个表:t1t2

CREATE TABLE t1(...) WITH (...);

CREATE TEMPORARY TABLE t2(
    id INT,
    name STRING
) WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs://.../data.json',
    'format' = 'json'
);

注意:t1是Paimon类型的表。t2是临时表。

t2表中的数据来源于hdfs中的json数据文件,所以此时我们在Paimon Catalog里面创建t2表的时候就需要使用临时表了,因为我们需要通过connector指定数据的存储位置为filesystem。
如果不使用临时表,在Paimon Catalog里面创建t2的时候就不能使用connector属性了。

最后就可以在Paimon Catalog里面直接操作这两个表了。

SELECT t1.id,t2.name,t1.age FROM t1 JOIN t2 ON t1.id = t2.id;

如果我不想使用临时表,但是还想实现这个需求,应该怎么做呢?

其实也很简单,我们只需要在默认的default catalog里面创建表t2即可。

但是需要注意,后续我们在同时使用表t1t2的时候,就需要指定表的完整名称了。
类似于FlinkSQLWriteToPaimon这个案例。

//向目的地表中写入数据
tEnv.executeSql(
  """
    |INSERT INTO `paimon_catalog`.`default`.`wc_sink_sql`
    |SELECT
    |    word,
    |    COUNT(*) as cnt
    |FROM `default_catalog`.`default_database`.`word_source`
    |GROUP BY word
    |""".stripMargin).print()
  • 如果是在paimon catalog里面执行这个SQL,则需要给word_source这个表指定完整名称。
  • 如果是在的default catalog里面执行这个SQL,则需要给wc_sink_sql这个表指定完整名称。

这就是临时表的典型应用场景。

3.2 存储维度

从存储维度来看,Paimon中的表可以分为两种:

  • Primary Key表,也可以称之为主键表。
  • Append Only表,也可以称之为仅追加表。

这两种表其实很好区分,如果表中定义的有主键字段,则是主键表;如果表中没有定义主键字段,则是仅追加表。

下面我们来详细分析一下。

3.2.1 Primary Key表(主键表)

主键表中包含主键字段,可以支持新增、更新和删除表中的数据。

注意:主键可以由一个或者多个字段组成。

主键表其实我们前面已经使用过了,就是在建表语句中通过PRIMARY KEY来指定主键字段。

主键表中还包含了多个高级特性:

  • Bucket
  • Changelog Producers
  • Merge Engines
  • Sequence Field

下面我们来具体看一下这些高级特性。

3.2.1.1 Bucket

Bucket:可以翻译为桶。

Bucket是表中数据读写的最小存储单元,所以Bucket的数量限制了读写的并行度,最终会影响读写性能,每个Bucket目录内部会包含一棵LSM树。

注意:LSM树是一种数据结构,Paimon采用了LSM树作为其文件存储的数据结构。

主键表目前支持两种Bucket mode(模式):

  • 1:Fixed Bucket mode:属于固定Bucket数量模式,也就是需要我们手工指定Bucket的数量。我们在建表时默认使用的就是这种模式,Bucket参数的值默认为1。我们只需要给Bucket设置一个大于0的数值即可。但是需要注意:Bucket数量过大会导致小文件过多,影响读取性能;Bucket数量过小会影响写入性能。一般情况下,每个Bucket中的数据量推荐为1G左右。
  • 2:Dynamic Bucket mode:属于动态Bucket数量模式,也就是说Bucket的数量是动态变化的。此时我们需要在建表时指定'bucket' = '-1',此时会由Paimon动态维护索引,将每个Bucket中的数据条数控制在2000000(2百万)以下,这个数值是由dynamic-bucket.target-row-num这个参数控制的。但是需要注意,目前这种模式属于实验性质,暂时不建议在生产环境下使用。

下面我们通过一个案例来具体感受一下Bucket。

创建package:tech.xuwei.paimon.bucket
创建object:BucketDemo

代码如下:

package tech.xuwei.paimon.bucket

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 验证Bucket特性
 * Created by xuwei
 */
object BucketDemo {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //创建Paimon表
    tEnv.executeSql(
      """
        |CREATE TABLE IF NOT EXISTS bucket_test (
        |    word STRING,
        |    cnt BIGINT,
        |    PRIMARY KEY (word) NOT ENFORCED
        |)WITH(
        |    'bucket' = '2' -- 手工指定bucket的值,默认为1
        |)
        |""".stripMargin)

    //查看最完整的建表语句
    tEnv.executeSql("SHOW CREATE TABLE bucket_test").print()

    //向表中添加数据
    tEnv.executeSql(
      """
        |INSERT INTO bucket_test(word,cnt)
        |VALUES('a',1) , ('b',2) , ('c',1) , ('d',3)
        |""".stripMargin)

  }

}

执行代码,可以看到输出的完整建表语句信息:

CREATE TABLE `paimon_catalog`.`default`.`bucket_test` (
  `word` VARCHAR(2147483647) NOT NULL,
  `cnt` BIGINT,
  CONSTRAINT `53dc473d-3e56-4e13-ac8b-a4f3c9abb72b` PRIMARY KEY (`word`) NOT ENFORCED
) WITH (
  'bucket' = '2',
  'path' = 'hdfs://bigdata01:9000/paimon/default.db/bucket_test'
)

接下来我们到hdfs中查看一个这个表的Bucket信息:

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/bucket_test
Found 5 items
drwxr-xr-x   - yehua supergroup          0 2028-11-29 17:54 /paimon/default.db/bucket_test/bucket-0
drwxr-xr-x   - yehua supergroup          0 2028-11-29 17:54 /paimon/default.db/bucket_test/bucket-1
drwxr-xr-x   - yehua supergroup          0 2028-11-29 17:54 /paimon/default.db/bucket_test/manifest
drwxr-xr-x   - yehua supergroup          0 2028-11-29 17:54 /paimon/default.db/bucket_test/schema
drwxr-xr-x   - yehua supergroup          0 2028-11-29 17:54 /paimon/default.db/bucket_test/snapshot

此时可以看到,这个表中包含2个bucket目录,这两个bucket目录中存储的就是这个表中的所有数据。

bucket目录内部都是一些data数据文件,里面就是真实的数据内容了:

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/bucket_test/bucket-0
Found 1 items
-rw-r--r--   3 yehua supergroup        545 2028-11-29 17:54 /paimon/default.db/bucket_test/bucket-0/data-8475e141-1725-489f-bd09-13606fd2302f-0.orc

咱们之前创建的表没有手工指定bucket,那么bucket默认为1,可以到表wc_sink_sql里面看一下,这个表里面存储了多条数据,但是只有1个bucket:

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/wc_sink_sql
Found 4 items
drwxr-xr-x   - yehua supergroup          0 2028-11-29 11:53 /paimon/default.db/wc_sink_sql/bucket-0
drwxr-xr-x   - yehua supergroup          0 2028-11-29 11:53 /paimon/default.db/wc_sink_sql/manifest
drwxr-xr-x   - yehua supergroup          0 2028-11-29 11:44 /paimon/default.db/wc_sink_sql/schema
drwxr-xr-x   - yehua supergroup          0 2028-11-29 11:53 /paimon/default.db/wc_sink_sql/snapshot

所以在实际工作中,设置合适的bucket数量就是非常重要的了。
但是在不同的时期,bucket的数量大概率是需要调整的,因为合适的bucket数量不可能是一成不变的,随着数据量的增加,bucket的数量也需要增大。

官方支持后期手工调整表中的bucket数量,但是想要对bucket中的数据进行重新分布,则只能通过离线流程来完成,也就是说需要跑一个离线任务来重新对bucket中的数据进行分布。

3.2.1.2 Changelog Producers

Changelog Producers:可以翻译为变更日志生产者。

Paimon表中存储数据的时候,除了存储数据本身,还可以选择存储数据的变更日志,也就是Changelog。

Changelog 主要应用在流读场景,在构建实时数据仓库的过程中,我们需要通过流读取上游的数据写入到下游,完成数仓各层之间的数据同步,让整个数仓的数据实时地流动起来。

如果上游数据来源于MySQL的 Binlog 日志,这样是可以直接提供完整的 Changelog 以供流来读取的。

但是针对湖仓一体架构,数仓分层是在Paimon里面实现的,数据会以表格的形式存储在文件系统中。
如果下游的Flink任务要通过流读取Paimon表中的数据,则需要Paimon的存储系统帮助生成 Changelog,以便下游流读。
此时就需要我们在建表时指定参数changelog-producer来决定在何时以何种方式生成Changelog。

如果不指定Changelog Producer则不会向Paimon表中写入数据的时候生成 Changelog,那么下游任务需要在流读时生成一个Changelog Normalize物化节点来产生Changelog。
这种方式的成本相对比较高,并且官方也不建议这样使用,因为下游任务会在状态中维护一份全量的数据,也就是说每条数据都需要保存在状态中便于任务在执行时生成Changelog。

可能大家在这会有一个疑问,为什么一定需要Changelog呢?

因为通过Changelog可以记录数据的中间变化,针对某些计算逻辑,我们需要知道数据之前的历史值是什么,这样才能得到正确的结果。

例如:我们接收到的数据中包含了相同主键的多条 INSERT 数据,这样会导致下游的流聚合任务有问题,因为相同主键的多条数据应该被认为是更新,而不是重复累加计算。

Paimon 支持的 Changelog Produers主要包括这几种:None、Input、Lookup和Full Compaction

下面我们来详细分析一下:

(1)None

如果不指定changelog-producer,默认就是 none,此时存储数据的时候不会存储数据的Changelog,后期读取数据时会动态生成Changelog,成本较高,不建议使用。
在这里插入图片描述

看这个图,此时这个数据源可以是任意类型的数据源,假设数据源依次产生了+I,+U,-D类型的数据,其实这里面缺少了-U类型的数据,我们通过Paimon 的SinkWriter组件将这些数据写入到了Paimon表中。

注意:此时这个Paimon表中配置的changelog-producer参数的值为none

此时在向Paimon表中写入数据的时候,这个表只会存储数据本身,不会存储数据的Changelog。

当我们再通过一个任务从这个Paimon表中读取数据的时候,这个任务只能读取到+I、+U和-D类型的数据,但是这个任务会产生一个Changelog Normalize物化节点来自己生成数据的Changelog,但是这个操作是非常昂贵的,因为它需要在状态中维护数据的所有历史变化情况来生成数据的Changelog。最终是可以获取到完整的+I、-U、+U、-D类型的数据的。

下面我们来通过一个案例具体演示一下建表语句中指定changelog-producer=none时的效果。

创建package:tech.xuwei.paimon.changelogproducer.none

创建object:FlinkDataStreamWriteToPaimonForNone

这个Object负责向Paimon表中模拟写入数据。

代码如下:

package tech.xuwei.paimon.changelogproducer.none

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}

/**
 * 使用Flink DataStream API向Paimon表中写入数据
 * Created by xuwei
 */
object FlinkDataStreamWriteToPaimonForNone {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)


    //手工构造一个Changelog DataStream 数据流
    val dataStream = env.fromElements(
      Row.ofKind(RowKind.INSERT, "jack", Int.box(10))//+I
      //Row.ofKind(RowKind.UPDATE_AFTER, "jack", Int.box(11))//+U
      //Row.ofKind(RowKind.DELETE, "jack", Int.box(11))//-D
    )(Types.ROW_NAMED(Array("name", "age"),Types.STRING,Types.INT))


    //将DataStream转换为Table
    val schema = Schema.newBuilder()
      .column("name", DataTypes.STRING().notNull())//主键非空
      .column("age", DataTypes.INT())
      .primaryKey("name")//指定主键
      .build()
    val table = tEnv.fromChangelogStream(dataStream,schema,ChangelogMode.all())

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //注册临时表
    tEnv.createTemporaryView("t1",table)

    //创建Paimon类型的表
    tEnv.executeSql(
      """
        |-- 注意:这里的表名使用反引号进行转义,否则会导致SQL DDL语句解析失败。
        |CREATE TABLE IF NOT EXISTS `changelog_none` (
        |    name STRING,
        |    age INT,
        |    PRIMARY KEY (name) NOT ENFORCED
        |) WITH (
        |    'changelog-producer' = 'none' -- 注意:值为none时这一行配置可以省略不写
        |)
        |""".stripMargin)

    //向Paimon表中写入数据
    tEnv.executeSql(
      """
        |INSERT INTO `changelog_none`
        |SELECT name,age FROM t1
        |""".stripMargin)
  }

}

注意:在执行代码的时候通过修改env.fromElements(...)中的注释来实现实时产生多种类型数据的效果。

接下来创建Object:FlinkDataStreamReadFromPaimonForNone

这个Object负责从Paimon表中实时读取数据。

注意:为了便于在本地观察Flink读取数据任务中自动生成的Changelog Normalize物化节点,所以我们需要在代码中开启本地WebUI功能。

先在pom.xml中引入相关的依赖:

<!-- Flink 本地Web页面 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web</artifactId>
    <version>1.15.0</version>
    <!--<scope>provided</scope>-->
</dependency>

代码如下:

package tech.xuwei.paimon.changelogproducer.none

import java.time.ZoneId

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 使用Flink DataStream API从Paimon表中读取数据
 * Created by xuwei
 */
object FlinkDataStreamReadFromPaimonForNone {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    //指定WebUI界面的访问端口,默认就是8081
    conf.setString(RestOptions.BIND_PORT,"8081")
    //为了便于在本地通过页面观察任务执行情况,所以开启本地WebUI功能
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)

    //禁用Chain,把多个算子拆分开单独执行,便于在开发和测试阶段观察,正式执行时不需要禁用Chain
    env.disableOperatorChaining()

    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //执行SQL查询,打印输出结果
    val execSql =
      """
        |SELECT * FROM `changelog_none` -- 此时默认只能查到数据的最新值
        |-- /*+ OPTIONS('scan.mode'='from-snapshot','scan.snapshot-id' = '1') */ -- 通过动态表选项来指定数据读取(扫描)模式,以及从哪里开始读取
        |""".stripMargin
    val table = tEnv.sqlQuery(execSql)
    table.execute().print()

  }

}

接下来先运行FlinkDataStreamWriteToPaimonForNone向Paimon表中写入+I类型的数据。

再运行FlinkDataStreamReadFromPaimonForNone负责读取数据。
此时可以看到控制台输出如下结果:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           jack |          10 |

修改FlinkDataStreamWriteToPaimonForNone中的代码,继续执行,向Paimon表中写入+U类型的数据。

//手工构造一个Changelog DataStream 数据流
val dataStream = env.fromElements(
  //Row.ofKind(RowKind.INSERT, "jack", Int.box(10))//+I
  Row.ofKind(RowKind.UPDATE_AFTER, "jack", Int.box(11))//+U
  //Row.ofKind(RowKind.DELETE, "jack", Int.box(11))//-D
)(Types.ROW_NAMED(Array("name", "age"),Types.STRING,Types.INT))

此时可以在FlinkDataStreamReadFromPaimonForNone的控制台看到如下结果:

| -U |                           jack |          10 |
| +U |                           jack |          11 |

注意:虽然我们向Paimon表中只写入了+U类型的数据,但是此时从Paimon表中读取数据的时候是可以产生-U类型的数据的。

为什么会这样呢?
此时这个Paimon表中设置的'changelog-producer' = 'none',我们在向这个表中写入数据的时候,他只会存储数据本身,不会存储changelog数据,可以到这个表对应的hdfs 数据目录中确认一下:

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/changelog_none/bucket-0
Found 2 items
-rw-r--r--   3 yehua supergroup        582 2028-12-10 17:19 /paimon/default.db/changelog_none/bucket-0/data-2449a680-5a0b-4496-970a-5a0fdac78cfc-0.orc
-rw-r--r--   3 yehua supergroup        566 2028-12-10 17:16 /paimon/default.db/changelog_none/bucket-0/data-c3e8ff17-6c86-4c0b-95d7-82a1f2091a5e-0.orc

注意:此时只能看到两个data开头的数据文件,没有changelog开头的数据文件,这说明我们的配置生效了。

咱们之前在创建user表的时候指定了'changelog-producer' = 'input',这种情况下是会在数据目录中保存changelog数据文件的,可以到user表的hdfs目录中看一下:

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/user/bucket-0     Found 2 items
-rw-r--r--   3 yehua supergroup        617 2028-11-28 17:50 /paimon/default.db/user/bucket-0/changelog-eb48583e-f8c9-424f-9000-05a2bbce7a2b-0.orc
-rw-r--r--   3 yehua supergroup        581 2028-11-28 17:50 /paimon/default.db/user/bucket-0/data-eb48583e-f8c9-424f-9000-05a2bbce7a2b-1.orc

注意:这里面有一个changelog开头的文件,所以这里面存储的就是changelog数据。

那我们现在创建的changelog_none这个表设置的是'changelog-producer' = 'none',所以它不会存储changelog数据,那为什么在读取数据的时候也可以读取到changelog数据呢?

咱们前面解释过,如果changelog-producer设置为none或者不设置,那么下游任务会在流读时生成一个Changelog Normalize物化节点来产生Changelog。

其实这个Changelog Normalize物化节点我们也可以到Flink任务的Web UI界面中查看验证一下:
在这里插入图片描述

在这可以看到,Flink任务中确实会产生一个Changelog Normalize物化节点,所以此时我们看到的-U类型的Changelog变更数据就是这个物化节点产生的。

他具体是如何实现这种效果的呢?
其实也很简单,他只需要在状态中维护接收到的每一条历史数据即可,如果接收到了相同主键的多条数据,那么它就知道是发生了数据更新这种行为,对应的就会补全-U和+U这种形式的数据。但是我们前面说了,这种方式成本较高,不建议使用,因为它需要在状态中维护数据的所有历史值。

接下来我们继续运行FlinkDataStreamWriteToPaimonForNone向Paimon表中写入-D类型的数据。

val dataStream = env.fromElements(
  //Row.ofKind(RowKind.INSERT, "jack", Int.box(10))//+I
  //Row.ofKind(RowKind.UPDATE_AFTER, "jack", Int.box(11))//+U
  Row.ofKind(RowKind.DELETE, "jack", Int.box(11))//-D
)(Types.ROW_NAMED(Array("name", "age"),Types.STRING,Types.INT))

此时可以在FlinkDataStreamReadFromPaimonForNone的控制台看到如下结果:

| -D |                           jack |          11 |

到目前为止,我们向这个Paimon表中写入了3条数据,+I,+U和-D

下面我们停止FlinkDataStreamReadFromPaimonForNone这个实时读取任务。

停止了之后,再重新运行FlinkDataStreamReadFromPaimonForNone这个实时读取任务,可以看到如下结果:

结果什么也没有看到,为什么呢?

因为此时我们执行的SQL查询语句默认只能查到数据的最新值,但是数据最新的情况是被删除了,所以什么也没有查到,这是正常的,也是正确的。

如果想要从头查看,需要指定scan相关的参数:

val execSql =
  """
    |SELECT * FROM `changelog_none` -- 此时默认只能查到数据的最新值
    |/*+ OPTIONS('scan.mode'='from-snapshot','scan.snapshot-id' = '1') */ -- 通过动态表选项来指定数据读取(扫描)模式,以及从哪里开始读取
    |""".stripMargin

重新运行FlinkDataStreamReadFromPaimonForNone这个实时读取任务,此时可以看到如下结果:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           jack |          10 |
| -U |                           jack |          10 |
| +U |                           jack |          11 |
| -D |                           jack |          11 |

因为是从头读取的数据,所以借助于Changelog Normalize物化节点,在状态中维护历史接收到的数据,这样就可以获取到完整的Changelog数据了。

(2)Input

如果将changelog-producer指定为input,表示在向Paimon表中存储数据的时候会将数据源中的Changelog也存储到Paimon表中的Changelog文件中。

典型应用场景是这样的:数据源是MySQL的binlog日志,此时数据源中具有完整的Changelog,所以可以完全依赖数据源中的Changelog,并且后续可以将这份Changelog提供给下游任务读取时使用。这样下游任务读取数据时就不需要产生Changelog Normalize物化节点了。

注意:如果我们把MySQL的binlog日志实时写入到了Kafka中,那么Kafka中存储的数据也相当于具有了完整的Changelog,此时在从Kafka这个数据源中读取数据的时候也是可以将changelog-producer设置为input的。
在这里插入图片描述

看这个图,当我们通过Flink CDC去采集数据库中的数据的时候,是可以获取到数据库中的所有Changelog变更日志数据的,所以里面会包含完整的+I、-U、+U、-D这些类型的数据。

此时在Paimon中创建表的时候,就可以指定changelog-producer=input,这样在存储数据的时候就会单独存储一份Changelog File。

下游任务在从Paimon表中读取数据的时候就不需要再产生Changelog Normalize物化节点生成Changelog了,直接从Paimon表中读取Changelog File即可获取到完整的Changelog数据。

下面我们来具体演示一下建表语句中指定changelog-producer=input时的效果

创建package:tech.xuwei.paimon.changelogproducer.input

创建object:FlinkDataStreamWriteToPaimonForInput

这个Object负责向Paimon表中模拟写入数据。

代码如下:

package tech.xuwei.paimon.changelogproducer.input

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}

/**
 * 使用Flink DataStream API向Paimon表中写入数据
 * Created by xuwei
 */
object FlinkDataStreamWriteToPaimonForInput {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)


    //手工构造一个Changelog DataStream 数据流
    val dataStream = env.fromElements(
      Row.ofKind(RowKind.INSERT, "jack", Int.box(10))//+I
      //Row.ofKind(RowKind.UPDATE_BEFORE, "jack", Int.box(10))//-U
      //Row.ofKind(RowKind.UPDATE_AFTER, "jack", Int.box(11))//+U
      //Row.ofKind(RowKind.DELETE, "jack", Int.box(11))//-D
    )(Types.ROW_NAMED(Array("name", "age"),Types.STRING,Types.INT))


    //将DataStream转换为Table
    val schema = Schema.newBuilder()
      .column("name", DataTypes.STRING().notNull())//主键非空
      .column("age", DataTypes.INT())
      .primaryKey("name")//指定主键
      .build()
    val table = tEnv.fromChangelogStream(dataStream,schema,ChangelogMode.all())

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //注册临时表
    tEnv.createTemporaryView("t1",table)

    //创建Paimon类型的表
    tEnv.executeSql(
      """
        |-- 注意:这里的表名使用反引号进行转义,否则会导致SQL DDL语句解析失败。
        |CREATE TABLE IF NOT EXISTS `changelog_input` (
        |    name STRING,
        |    age INT,
        |    PRIMARY KEY (name) NOT ENFORCED
        |) WITH (
        |    'changelog-producer' = 'input'
        |)
        |""".stripMargin)

    //向Paimon表中写入数据
    tEnv.executeSql(
      """
        |INSERT INTO `changelog_input`
        |SELECT name,age FROM t1
        |""".stripMargin)
  }

}

注意:在执行代码的时候通过修改env.fromElements(...)中的注释来实现实时产生多种类型数据的效果。

接下来创建Object:FlinkDataStreamReadFromPaimonForInput

这个Object负责从Paimon表中实时读取数据。

代码如下:

package tech.xuwei.paimon.changelogproducer.input

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 使用Flink DataStream API从Paimon表中读取数据
 * Created by xuwei
 */
object FlinkDataStreamReadFromPaimonForInput {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    //指定WebUI界面的访问端口,默认就是8081
    conf.setString(RestOptions.BIND_PORT,"8081")
    //为了便于在本地通过页面观察任务执行情况,所以开启本地WebUI功能
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)


    //禁用Chain,把多个算子拆分开单独执行,便于在开发和测试阶段观察,正式执行时不需要禁用Chain
    env.disableOperatorChaining()

    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //执行SQL查询,打印输出结果
    val execSql =
      """
        |SELECT * FROM `changelog_input` -- 此时默认只能查到数据的最新值
        |-- /*+ OPTIONS('scan.mode'='from-snapshot','scan.snapshot-id' = '1') */ -- 通过动态表选项来指定数据读取(扫描)模式,以及从哪里开始读取
        |""".stripMargin
    val table = tEnv.sqlQuery(execSql)
    table.execute().print()

  }

}

接下来先运行FlinkDataStreamWriteToPaimonForInput向Paimon表中写入+I类型的数据。

再运行FlinkDataStreamReadFromPaimonForInput负责读取数据。
此时可以看到控制台输出如下结果:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           jack |          10 |

来看一下这个Flink任务的Web UI界面
在这里插入图片描述

在这可以发现,此时这个任务中没有产生Changelog Normalize物化节点,因为我们在Paimon表中指定了changelog-producer=input,所以这个Paimon表内部会自己存储Changelog数据。

此时到这个Paimon表的hdfs数据目录中查看一下:

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/changelog_input/bucket-0
Found 2 items
-rw-r--r--   3 yehua supergroup        566 2028-12-11 11:14 /paimon/default.db/changelog_input/bucket-0/changelog-bc3740e4-6adf-4e94-9d4e-c1ece10ed114-0.orc
-rw-r--r--   3 yehua supergroup        566 2028-12-11 11:14 /paimon/default.db/changelog_input/bucket-0/data-bc3740e4-6adf-4e94-9d4e-c1ece10ed114-1.orc

在这里可以发现里面有两个文件,一个以data开头的文件,里面存储的是数据自身。还有一个以changelog开头的文件,里面存储的是changelog变更数据。

修改FlinkDataStreamWriteToPaimonForInput中的代码,继续执行,向Paimon表中写入-U类型的数据。

//手工构造一个Changelog DataStream 数据流
val dataStream = env.fromElements(
  //Row.ofKind(RowKind.INSERT, "jack", Int.box(10))//+I
  Row.ofKind(RowKind.UPDATE_BEFORE, "jack", Int.box(10))//-U
  //Row.ofKind(RowKind.UPDATE_AFTER, "jack", Int.box(11))//+U
  //Row.ofKind(RowKind.DELETE, "jack", Int.box(11))//-D
)(Types.ROW_NAMED(Array("name", "age"),Types.STRING,Types.INT))

此时可以在FlinkDataStreamReadFromPaimonForInput的控制台看到如下结果:

| -U |                           jack |          10 |

再修改FlinkDataStreamWriteToPaimonForInput中的代码,继续执行,向Paimon表中写入+U类型的数据。

//手工构造一个Changelog DataStream 数据流
val dataStream = env.fromElements(
  //Row.ofKind(RowKind.INSERT, "jack", Int.box(10))//+I
  //Row.ofKind(RowKind.UPDATE_BEFORE, "jack", Int.box(10))//-U
  Row.ofKind(RowKind.UPDATE_AFTER, "jack", Int.box(11))//+U
  //Row.ofKind(RowKind.DELETE, "jack", Int.box(11))//-D
)(Types.ROW_NAMED(Array("name", "age"),Types.STRING,Types.INT))

此时可以在FlinkDataStreamReadFromPaimonForInput的控制台看到如下结果:

| +U |                           jack |          11 |

再修改FlinkDataStreamWriteToPaimonForInput中的代码,继续执行,向Paimon表中写入-D类型的数据。

//手工构造一个Changelog DataStream 数据流
val dataStream = env.fromElements(
  //Row.ofKind(RowKind.INSERT, "jack", Int.box(10))//+I
  //Row.ofKind(RowKind.UPDATE_BEFORE, "jack", Int.box(10))//-U
  //Row.ofKind(RowKind.UPDATE_AFTER, "jack", Int.box(11))//+U
  Row.ofKind(RowKind.DELETE, "jack", Int.box(11))//-D
)(Types.ROW_NAMED(Array("name", "age"),Types.STRING,Types.INT))

此时可以在FlinkDataStreamReadFromPaimonForInput的控制台看到如下结果:

| -D |                           jack |          11 |

下面我们停止FlinkDataStreamReadFromPaimonForInput这个实时读取任务。

停止了之后,修改一下代码,因为默认只会读取最新的数据快照

val execSql =
  """
    |SELECT * FROM `changelog_input` -- 此时默认只能查到数据的最新值
    |/*+ OPTIONS('scan.mode'='from-snapshot','scan.snapshot-id' = '1') */ -- 通过动态表选项来指定数据读取(扫描)模式,以及从哪里开始读取
    |""".stripMargin

再重新运行FlinkDataStreamReadFromPaimonForNone这个实时读取任务,可以看到这个结果:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           jack |          10 |
| -U |                           jack |          10 |
| +U |                           jack |          11 |
| -D |                           jack |          11 |

注意:此时可以看到完整的数据变更情况,这是依赖于Paimon表中存储的changelog文件实现的,没有依赖于Flink任务中的Changelog Normalize物化节点。

所以说,如果我们数据源中可以提供完整的changelog数据,那么建议给存储数据的Paimon表设置changelog-producer=input,这样下游任务读取这个Paimon表的时候就可以直接从表中changelog文件里面获取变更数据了,不需要自己维护,效率比较高。

(3)Lookup

如果数据源中没有提供完整的 Changelog,并且我们也不想让下游任务在读取数据时通过Changelog Normalize物化节点来生成,那么这个时候我们可以考虑在Paimon表中配置 changelog-producer=lookup

这样可以通过Lookup(查找)的方式在向Paimon表中写入数据的时候生成 Changelog。

但是需要注意:Lookup这种方式目前处于实验阶段,还没有经过大量的生产环境验证。

在这里插入图片描述

看这个图,此时这个数据源中没有提供完整的Changelog,这个数据源可以是任意类型的数据源,数据源中可能只有+I、+U、-D类型的数据,缺少了-U类型的数据。

但是由于我们在Paimon表中设置了changelog-producer=lookup,所以在通过SinkWriter向Paimon表中写入数据的时候,底层会通过Lookup的方式查找表中已有的数据,自动生成Changelog File,补全-U类型的变更日志。

这样下游任务在读取这个Paimon表的时候就可以直接从表对应的Changelog File中读取到完整的+I、-U、+U、-D类型的数据了。

下面我们来具体演示一下建表语句中指定changelog-producer=lookup时的效果

创建package:tech.xuwei.paimon.changelogproducer.lookup

基于创建Object:FlinkDataStreamWriteToPaimonForLookup

这个Object负责向Paimon表中模拟写入数据。

代码如下:

package tech.xuwei.paimon.changelogproducer.lookup

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}

/**
 * 使用Flink DataStream API向Paimon表中写入数据
 * Created by xuwei
 */
object FlinkDataStreamWriteToPaimonForLookup {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)


    //手工构造一个Changelog DataStream 数据流
    val dataStream = env.fromElements(
      Row.ofKind(RowKind.INSERT, "jack", Int.box(10))//+I
      //Row.ofKind(RowKind.UPDATE_AFTER, "jack", Int.box(11))//+U
      //Row.ofKind(RowKind.DELETE, "jack", Int.box(11))//-D
    )(Types.ROW_NAMED(Array("name", "age"),Types.STRING,Types.INT))


    //将DataStream转换为Table
    val schema = Schema.newBuilder()
      .column("name", DataTypes.STRING().notNull())//主键非空
      .column("age", DataTypes.INT())
      .primaryKey("name")//指定主键
      .build()
    val table = tEnv.fromChangelogStream(dataStream,schema,ChangelogMode.all())

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //注册临时表
    tEnv.createTemporaryView("t1",table)

    //创建Paimon类型的表
    tEnv.executeSql(
      """
        |-- 注意:这里的表名使用反引号进行转义,否则会导致SQL DDL语句解析失败。
        |CREATE TABLE IF NOT EXISTS `changelog_lookup` (
        |    name STRING,
        |    age INT,
        |    PRIMARY KEY (name) NOT ENFORCED
        |) WITH (
        |    'changelog-producer' = 'lookup'
        |)
        |""".stripMargin)

    //向Paimon表中写入数据
    tEnv.executeSql(
      """
        |INSERT INTO `changelog_lookup`
        |SELECT name,age FROM t1
        |""".stripMargin)
  }

}

注意:在执行代码的时候通过修改env.fromElements(...)中的注释来实现实时产生多种类型数据的效果。

接下来创建Object:FlinkDataStreamReadFromPaimonForLookup

这个Object负责从Paimon表中实时读取数据。

代码如下:

package tech.xuwei.paimon.changelogproducer.lookup

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 使用Flink DataStream API从Paimon表中读取数据
 * Created by xuwei
 */
object FlinkDataStreamReadFromPaimonForLookup {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    //指定WebUI界面的访问端口,默认就是8081
    conf.setString(RestOptions.BIND_PORT,"8081")
    //为了便于在本地通过页面观察任务执行情况,所以开启本地WebUI功能
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)


    //禁用Chain,把多个算子拆分开单独执行,便于在开发和测试阶段观察,正式执行时不需要禁用Chain
    env.disableOperatorChaining()

    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //执行SQL查询,打印输出结果
    val execSql =
      """
        |SELECT * FROM `changelog_lookup` -- 此时默认只能查到数据的最新值
        |-- /*+ OPTIONS('scan.mode'='from-snapshot','scan.snapshot-id' = '1') */ -- 通过动态表选项来指定数据读取(扫描)模式,以及从哪里开始读取
        |""".stripMargin
    val table = tEnv.sqlQuery(execSql)
    table.execute().print()

  }

}

接下来先运行FlinkDataStreamWriteToPaimonForLookup向Paimon表中写入+I类型的数据。

再运行FlinkDataStreamReadFromPaimonForLookup负责读取数据。
此时可以看到控制台输出如下结果:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           jack |          10 |

来看一下这个Flink任务的Web UI界面
在这里插入图片描述

在这可以发现,此时这个任务中没有产生Changelog Normalize物化节点,因为我们在Paimon表中指定了changelog-producer=lookup,Changelog数据会在我们向Paimon表中写入数据的时候通过Lookup产生。

到这个Paimon表的hdfs数据目录里面查看一下:

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/changelog_lookup/bucket-0
Found 3 items
-rw-r--r--   3 yehua supergroup        566 2028-12-11 12:01 /paimon/default.db/changelog_lookup/bucket-0/changelog-edb23cdc-09be-4437-b2ac-716e06e25c6d-1.orc
-rw-r--r--   3 yehua supergroup        566 2028-12-11 12:01 /paimon/default.db/changelog_lookup/bucket-0/data-edb23cdc-09be-4437-b2ac-716e06e25c6d-0.orc
-rw-r--r--   3 yehua supergroup        566 2028-12-11 12:01 /paimon/default.db/changelog_lookup/bucket-0/data-f07e00b5-a815-4d64-b8d6-1b8a2e64dab6-0.orc

在这可以发现,里面有1个changelog开头的文件,这个就是Lookup产生的。

修改FlinkDataStreamWriteToPaimonForLookup中的代码,继续执行,向Paimon表中写入+U类型的数据。

//手工构造一个Changelog DataStream 数据流
val dataStream = env.fromElements(
  //Row.ofKind(RowKind.INSERT, "jack", Int.box(10))//+I
  Row.ofKind(RowKind.UPDATE_AFTER, "jack", Int.box(11))//+U
  //Row.ofKind(RowKind.DELETE, "jack", Int.box(11))//-D
)(Types.ROW_NAMED(Array("name", "age"),Types.STRING,Types.INT))

此时可以在FlinkDataStreamReadFromPaimonForLookup的控制台看到如下结果:

| -U |                           jack |          10 |
| +U |                           jack |          11 |

注意:虽然我们向Paimon表中只写入了+U类型的数据,但是Lookup在生成changelog的时候会自动补全-U类型的数据。

后面的-D类型的数据就不再演示了,效果和前面是一样的。

所以说,Lookup这种方式属于一种折中方案,数据源里面无法提供完整的changelog变更日志,所以无法使用Input,但是我们还想摆脱昂贵的Changelog Normalize物化节点,这个时候就可以考虑Lookup了。

最后还需要注意,Lookup这种方式虽然不需要产生Changelog Normalize物化节点,但是他在生成Changelog的时候依然会消耗一部分资源的,因为它需要触发数据查找这个过程,只不过消耗的资源比Changelog Normalize物化节点这种方式低一些。

(4)Full Compaction

如果你的数据源无法提供完整的changelog变更日志数据,并且你觉得Lookup这种方式还是比较消耗资源,此时可以考虑使用Full Compaction这种方式,在创建Paimon表的时候指定changelog-producer=full-compaction

Full Compaction这种方式可以解耦写入数据和生成changelog这两个步骤。
也就是说我们会先把数据写入到Paimon表中,当表中的数据触发完全压缩之后,Paimon 会比较两次完全压缩之间的结果并生成差异作为changelog(变更日志),生成changelog的延迟会受到完全压缩频率的影响。

通过指定full-compaction.delta-commits表属性,表示在增量提交Checkpoint后将会触发完全压缩。默认情况下值为1,所以每次提交Checkpoint都会进行完全压缩并生成changelog。
这样其实对生成changelog的延迟没有特别大的影响。

Full Compaction这种方式可以为任何类型的数据源生成完整的changelog变更日志。但是它没有Input方式的效率高,并且生成changelog的延迟可能会比较高。

不过Full Compaction这种方式解耦了写入数据和生成changelog这两个步骤,他的资源消耗比Lookup这种方式要低一些。
在这里插入图片描述

看这个图,此时这个数据源中没有提供完整的Changelog,这个数据源可以是任意类型的数据源,数据源中可能只有+I、+U、-D的数据,缺少了-U类型的数据。

但是由于我们在Paimon表中设置了changelog-producer=full-compaction,所以Paimon会周期性的比较两次完全压缩(Full Compaction)之间的结果并生成差异作为changelog(变更日志),并且在Changelog中补全缺失的变更日志。

这样下游任务在读取这个Paimon表的时候就可以从表对应的Changelog File中读取到完整的+I、-U、+U、-D类型的数据了。

下面我们来具体演示一下建表语句中指定changelog-producer=full-compaction时的效果

创建package:tech.xuwei.paimon.changelogproducer.fullcompaction

创建object:FlinkDataStreamWriteToPaimonForFullcompaction

这个Object负责向Paimon表中模拟写入数据。

代码如下:

package tech.xuwei.paimon.changelogproducer.fullcompaction

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}

/**
 * 使用Flink DataStream API向Paimon表中写入数据
 * Created by xuwei
 */
object FlinkDataStreamWriteToPaimonForFullcompaction {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)


    //手工构造一个Changelog DataStream 数据流
    val dataStream = env.fromElements(
      Row.ofKind(RowKind.INSERT, "jack", Int.box(10))//+I
      //Row.ofKind(RowKind.UPDATE_AFTER, "jack", Int.box(11))//+U
      //Row.ofKind(RowKind.DELETE, "jack", Int.box(11))//-D
    )(Types.ROW_NAMED(Array("name", "age"),Types.STRING,Types.INT))


    //将DataStream转换为Table
    val schema = Schema.newBuilder()
      .column("name", DataTypes.STRING().notNull())//主键非空
      .column("age", DataTypes.INT())
      .primaryKey("name")//指定主键
      .build()
    val table = tEnv.fromChangelogStream(dataStream,schema,ChangelogMode.all())

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //注册临时表
    tEnv.createTemporaryView("t1",table)

    //创建Paimon类型的表
    tEnv.executeSql(
      """
        |-- 注意:这里的表名使用反引号进行转义,否则会导致SQL DDL语句解析失败。
        |CREATE TABLE IF NOT EXISTS `changelog_fullcompaction` (
        |    name STRING,
        |    age INT,
        |    PRIMARY KEY (name) NOT ENFORCED
        |) WITH (
        |    'changelog-producer' = 'full-compaction',
        |    'full-compaction.delta-commits' = '1'
        |)
        |""".stripMargin)

    //向Paimon表中写入数据
    tEnv.executeSql(
      """
        |INSERT INTO `changelog_fullcompaction`
        |SELECT name,age FROM t1
        |""".stripMargin)
  }

}

注意:在执行代码的时候通过修改env.fromElements(...)中的注释来实现实时产生多种类型数据的效果。

接下来创建Object:FlinkDataStreamReadFromPaimonForFullcompaction

这个Object负责从Paimon表中实时读取数据。

代码如下:

package tech.xuwei.paimon.changelogproducer.fullcompaction

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 使用Flink DataStream API从Paimon表中读取数据
 * Created by xuwei
 */
object FlinkDataStreamReadFromPaimonForFullcompaction {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    //指定WebUI界面的访问端口,默认就是8081
    conf.setString(RestOptions.BIND_PORT,"8081")
    //为了便于在本地通过页面观察任务执行情况,所以开启本地WebUI功能
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)


    //禁用Chain,把多个算子拆分开单独执行,便于在开发和测试阶段观察,正式执行时不需要禁用Chain
    env.disableOperatorChaining()

    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //执行SQL查询,打印输出结果
    val execSql =
      """
        |SELECT * FROM `changelog_fullcompaction` -- 此时默认只能查到数据的最新值
        |--/*+ OPTIONS('scan.mode'='from-snapshot','scan.snapshot-id' = '1') */ -- 通过动态表选项来指定数据读取(扫描)模式,以及从哪里开始读取
        |""".stripMargin
    val table = tEnv.sqlQuery(execSql)
    table.execute().print()

  }

}

接下来先运行FlinkDataStreamWriteToPaimonForFullcompaction向Paimon表中写入+I类型的数据。

再运行FlinkDataStreamReadFromPaimonForFullcompaction负责读取数据。
此时可以看到控制台输出如下结果:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           jack |          10 |

来看一下这个Flink任务的Web UI界面
在这里插入图片描述

在这可以发现,此时这个任务中没有产生Changelog Normalize物化节点,其实只有我们把Changelog Producer设置为none的时候Flink任务才会产生Changelog Normalize物化节点。

那此时我们到这个Paimon表的hdfs数据目录里面查看一下有没有产生changelog文件:

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/changelog_fullcompaction/bucket-0
Found 3 items
-rw-r--r--   3 yehua supergroup        566 2028-12-11 16:20 /paimon/default.db/changelog_fullcompaction/bucket-0/changelog-264c4b74-10dd-493d-95e0-8f5760e90dc8-1.orc
-rw-r--r--   3 yehua supergroup        566 2028-12-11 16:20 /paimon/default.db/changelog_fullcompaction/bucket-0/data-264c4b74-10dd-493d-95e0-8f5760e90dc8-0.orc
-rw-r--r--   3 yehua supergroup        566 2028-12-11 16:20 /paimon/default.db/changelog_fullcompaction/bucket-0/data-d7adcc2a-804a-4a13-876a-fb77dc4a0952-0.orc

在这可以发现,里面有1个changelog开头的文件,这个就是Full Compaction这种方式产生的。

修改FlinkDataStreamWriteToPaimonForFullcompaction中的代码,继续执行,向Paimon表中写入+U类型的数据。

//手工构造一个Changelog DataStream 数据流
val dataStream = env.fromElements(
  //Row.ofKind(RowKind.INSERT, "jack", Int.box(10))//+I
  Row.ofKind(RowKind.UPDATE_AFTER, "jack", Int.box(11))//+U
  //Row.ofKind(RowKind.DELETE, "jack", Int.box(11))//-D
)(Types.ROW_NAMED(Array("name", "age"),Types.STRING,Types.INT))

此时可以在FlinkDataStreamReadFromPaimonForFullcompaction的控制台看到如下结果:

| -U |                           jack |          10 |
| +U |                           jack |          11 |

注意:这块可能会有一些延迟,具体的延迟程度要看完全压缩触发的频率,我们前面指定了full-compaction.delta-commits的值为1,表示在每次提交Checkpoint都会进行完全压缩并生成changelog,所以目前的延迟是比较低的。

但是我们需要注意:完全压缩是一个资源密集型的过程,会消耗一定的CPU磁盘IO,因此过于频繁的完全压缩可能会导致写入速度变慢,所以这块也需要均衡考虑。

后面的-D类型的数据就不再演示了,效果和前面是一样的。

(5)总结
咱们前面一共讲了4种Changelog Producer。

  • 在实际工作中None这种方式基本上是不使用的,成本太高。
  • 如果数据源是完整的CDC数据,直接使用Input这种方式即可,成本最低,效率最高。
  • 如果数据源中无法提供完整的Changelog,此时可以考虑使用Lookup和Full Compaction。
  • 如果你觉得使用Lookup来实时生成 Changelog 成本过大,可以考虑通过Full Compaction和对应较大的延迟,以非常低的成本生成 Changelog。
3.2.1.3 Merge Engines

Merge Engines:可以翻译为合并引擎。

针对多条相同主键的数据,Paimon主键表收到之后,应该如何进行合并处理?

针对这块的处理逻辑,Paimon提供了参数merge-engine,通过这个参数来指定如何合并数据。

merge-engine一共支持3种取值:

  • deduplicate:默认值,表示去重,也就是说主键表默认只会保留相同主键最新的数据。
  • partial-update:表示局部更新,通过相同主键的多条数据来更新不同字段的值。
  • aggregation:表示聚合,可以对相同主键的多条数据根据指定的字段进行聚合。

下面我们来详细分析一下这几种合并引擎。

(1)Deduplicate

如果我们在Paimon中创建主键表时不指定merge-engine参数,那么默认值就是deduplicate

此时只保留主键最新的数据,之前表中相同主键的数据会被丢弃。

注意:如果主键最新的数据是-D类型的,那么这个主键的所有数据都会被删除。

下面我们来具体演示一下。
核心的思路是这样的:我们通过数据源模拟产生2条相同主键的+I类型的数据,依次写入到主键表中,最终发现主键表中只会保留最新的那一条数据。

创建package:tech.xuwei.paimon.mergeengine.deduplicate
创建object:FlinkDataStreamWriteToPaimonForDeduplicate

这个Object负责向Paimon表中模拟写入数据。
代码如下:

package tech.xuwei.paimon.mergeengine.deduplicate

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}

/**
 * 使用Flink DataStream API向Paimon表中写入数据
 * Created by xuwei
 */
object FlinkDataStreamWriteToPaimonForDeduplicate {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)


    //手工构造一个Changelog DataStream 数据流
    val dataStream = env.fromElements(
      Row.ofKind(RowKind.INSERT, "jack", Int.box(10))//+I
      //Row.ofKind(RowKind.INSERT, "jack", Int.box(12))//+I
    )(Types.ROW_NAMED(Array("name", "age"),Types.STRING,Types.INT))


    //将DataStream转换为Table
    val schema = Schema.newBuilder()
      .column("name", DataTypes.STRING().notNull())//主键非空
      .column("age", DataTypes.INT())
      .primaryKey("name")//指定主键
      .build()
    val table = tEnv.fromChangelogStream(dataStream,schema,ChangelogMode.all())

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //注册临时表
    tEnv.createTemporaryView("t1",table)

    //创建Paimon类型的表
    tEnv.executeSql(
      """
        |-- 注意:这里的表名使用反引号进行转义,否则会导致SQL DDL语句解析失败。
        |CREATE TABLE IF NOT EXISTS `merge_engine_deduplicate` (
        |    name STRING,
        |    age INT,
        |    PRIMARY KEY (name) NOT ENFORCED
        |) WITH (
        |    'merge-engine' = 'deduplicate' -- 注意:值为deduplicate时这一行配置可以省略不写
        |)
        |""".stripMargin)

    //向Paimon表中写入数据
    tEnv.executeSql(
      """
        |INSERT INTO `merge_engine_deduplicate`
        |SELECT name,age FROM t1
        |""".stripMargin)
  }

}

注意:在执行代码的时候通过修改env.fromElements(...)中的注释来实现实时产生多条+I类型数据的效果。

接下来创建Object:FlinkDataStreamReadFromPaimonForDeduplicate

这个Object负责从Paimon表中实时读取数据。

代码如下:

package tech.xuwei.paimon.mergeengine.deduplicate

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 使用Flink DataStream API从Paimon表中读取数据
 * Created by xuwei
 */
object FlinkDataStreamReadFromPaimonForDeduplicate {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    //指定WebUI界面的访问端口,默认就是8081
    conf.setString(RestOptions.BIND_PORT,"8081")
    //为了便于在本地通过页面观察任务执行情况,所以开启本地WebUI功能
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)


    //禁用Chain,把多个算子拆分开单独执行,便于在开发和测试阶段观察,正式执行时不需要禁用Chain
    env.disableOperatorChaining()

    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //执行SQL查询,打印输出结果
    val execSql =
      """
        |SELECT * FROM `merge_engine_deduplicate` -- 此时默认只能查到数据的最新值
        |-- /*+ OPTIONS('scan.mode'='from-snapshot','scan.snapshot-id' = '1') */ -- 通过动态表选项来指定数据读取(扫描)模式,以及从哪里开始读取
        |""".stripMargin
    val table = tEnv.sqlQuery(execSql)
    table.execute().print()

  }

}

接下来先运行FlinkDataStreamWriteToPaimonForDeduplicate向Paimon表中写入一条+I类型的数据。

再运行FlinkDataStreamReadFromPaimonForDeduplicate负责读取数据。
此时可以看到控制台输出如下结果:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           jack |          10 |

修改FlinkDataStreamWriteToPaimonForDeduplicate中的代码,继续执行,向Paimon表中写入第2条+I类型的数据。
注意:这两条数据的主键是相同的。

//手工构造一个Changelog DataStream 数据流
val dataStream = env.fromElements(
  //Row.ofKind(RowKind.INSERT, "jack", Int.box(10))//+I
//Row.ofKind(RowKind.INSERT, "jack", Int.box(12))//+I
)(Types.ROW_NAMED(Array("name", "age"),Types.STRING,Types.INT))

此时可以在FlinkDataStreamReadFromPaimonForDeduplicate的控制台看到如下结果:

| -U |                           jack |          10 |
| +U |                           jack |          12 |

从这可以看出来,之前的数据被删除了,新增了一条年龄为12的数据。
所以deduplicate这种表引擎只会保留相同主键最新的数据。

(2)Partial Update

如果我们在Paimon中创建主键表时指定merge-engine的值为partial-update,那么就可以实现局部更新数据字段的效果。

举个例子:使用多个 Flink流任务去更新同一张表,每个流任务只更新一张表的部分列,最终实现一行完整数据的更新。对于需要构建宽表的业务场景,使用partial-update是非常合适的,并且构建宽表的操作也比较简单。

注意:这里所说的多个Flink 流任务并不是指多个Flink Job并发写同一张Paimon表,这样比较麻烦。目前推荐的是将多个Flink流任务 UNION ALL 起来,最终启动一个Flink Job 向Paimon表中写入数据。

还有一点需要注意的是:partial-update这种表引擎不支持流读,需要结合Lookup或者full-compaction变更日志生产者一起使用才可以支持流读。

同时由于partial-update不能接收和处理DELETE消息,为了避免接收到DELETE消息报错,需要在建表语句中配置partial-update.ignore-delete= true表示忽略 DELETE消息。

下面我们来具体演示一下:

核心思路是这样的,准备模拟产生3条+I类型的数据,数据内容大致是这样的。

<jack, 10, 175, null>
<jack, null, null, beijing>
<jack, 11, null, null>

将这3条数据写入到Paimon主键表之后,会得到什么结果呢?
结果是这样的:<jack, 11, 175, beijing>

为什么呢?因为null字段不会覆盖更新字段的值。

创建package:tech.xuwei.paimon.mergeengine.partialupdate
创建object:FlinkDataStreamWriteToPaimonForPartialupdate

这个Object负责向Paimon表中模拟写入数据。
代码如下:

package tech.xuwei.paimon.mergeengine.partialupdate

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}

/**
 * 使用Flink DataStream API向Paimon表中写入数据
 * Created by xuwei
 */
object FlinkDataStreamWriteToPaimonForPartialupdate {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)


    //手工构造一个Changelog DataStream 数据流
    val dataStream = env.fromElements(
      Row.ofKind(RowKind.INSERT, "jack", Int.box(10),Int.box(175),null)//+I
      //Row.ofKind(RowKind.INSERT, "jack", null,null,"beijing")//+I
      //Row.ofKind(RowKind.INSERT, "jack", Int.box(11),null,null)//+I
    )(Types.ROW_NAMED(Array("name", "age", "height", "city"),Types.STRING,Types.INT,Types.INT,Types.STRING))


    //将DataStream转换为Table
    val schema = Schema.newBuilder()
      .column("name", DataTypes.STRING().notNull())//主键非空
      .column("age", DataTypes.INT())
      .column("height", DataTypes.INT())
      .column("city", DataTypes.STRING())
      .primaryKey("name")//指定主键
      .build()
    val table = tEnv.fromChangelogStream(dataStream,schema,ChangelogMode.all())

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //注册临时表
    tEnv.createTemporaryView("t1",table)

    //创建Paimon类型的表
    tEnv.executeSql(
      """
        |-- 注意:这里的表名使用反引号进行转义,否则会导致SQL DDL语句解析失败。
        |CREATE TABLE IF NOT EXISTS `merge_engine_partialupdate` (
        |    name STRING,
        |    age INT,
        |    height INT,
        |    city STRING,
        |    PRIMARY KEY (name) NOT ENFORCED
        |) WITH (
        |    'merge-engine' = 'partial-update',
        |    'partial-update.ignore-delete' = 'true'
        |)
        |""".stripMargin)

    //向Paimon表中写入数据
    tEnv.executeSql(
      """
        |INSERT INTO `merge_engine_partialupdate`
        |SELECT name,age,height,city FROM t1
        |""".stripMargin)
  }

}

注意:在执行代码的时候通过修改env.fromElements(...)中的注释来实现实时产生多条+I类型数据的效果。

接下来创建Object:FlinkDataStreamReadFromPaimonForPartialupdate

这个Object负责从Paimon表中实时读取数据。

代码如下:

package tech.xuwei.paimon.mergeengine.partialupdate

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 使用Flink DataStream API从Paimon表中读取数据
 * Created by xuwei
 */
object FlinkDataStreamReadFromPaimonForPartialupdate {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    //指定WebUI界面的访问端口,默认就是8081
    conf.setString(RestOptions.BIND_PORT,"8081")
    //为了便于在本地通过页面观察任务执行情况,所以开启本地WebUI功能
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)


    //禁用Chain,把多个算子拆分开单独执行,便于在开发和测试阶段观察,正式执行时不需要禁用Chain
    env.disableOperatorChaining()

    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //执行SQL查询,打印输出结果
    val execSql =
      """
        |SELECT * FROM `merge_engine_partialupdate` -- 此时默认只能查到数据的最新值
        |-- /*+ OPTIONS('scan.mode'='from-snapshot','scan.snapshot-id' = '1') */ -- 通过动态表选项来指定数据读取(扫描)模式,以及从哪里开始读取
        |""".stripMargin
    val table = tEnv.sqlQuery(execSql)
    table.execute().print()

  }

}

接下来先运行FlinkDataStreamWriteToPaimonFoPartialupdate向Paimon表中写入一条+I类型的数据。

再运行FlinkDataStreamReadFromPaimonForPartialupdate负责读取数据。
结果发现代码报错了,错误日志如下:

Exception in thread "main" java.lang.RuntimeException: Partial update streaming reading is not supported. You can use 'lookup' or 'full-compaction' changelog producer to support streaming reading.
	at org.apache.paimon.flink.utils.TableScanUtils.streamingReadingValidate(TableScanUtils.java:45)
	at org.apache.paimon.flink.source.FlinkSourceBuilder.build(FlinkSourceBuilder.java:170)
	at org.apache.paimon.flink.source.AbstractDataTableSource.configureSource(AbstractDataTableSource.java:233)
	at org.apache.paimon.flink.source.AbstractDataTableSource.lambda$getScanRuntimeProvider$0(AbstractDataTableSource.java:210)
	at org.apache.paimon.flink.PaimonDataStreamScanProvider.produceDataStream(PaimonDataStreamScanProvider.java:44)

通过错误日志可以看出来,Partial update表引擎默认不支持流读,我们现在在代码中指定了运行模式为STREAMING,就是流式读取的意思。

我们可以在表中指定使用lookup或者full-compaction变更日志生产者来支持流读。

注意:如果不需要流读的话,可以在代码中指定运行模式为BATCH,此时执行是不报错的。

如果想要使用流读,就需要在建表语中修改变更日志生产者了。

//创建Paimon类型的表
tEnv.executeSql(
  """
    |-- 注意:这里的表名使用反引号进行转义,否则会导致SQL DDL语句解析失败。
    |CREATE TABLE IF NOT EXISTS `merge_engine_partialupdate` (
    |    name STRING,
    |    age INT,
    |    height INT,
    |    city STRING,
    |    PRIMARY KEY (name) NOT ENFORCED
    |) WITH (
    |    'changelog-producer' = 'lookup',-- 注意:partial-update表引擎需要和lookup或者full-compaction一起使用时才支持流读
    |    'merge-engine' = 'partial-update',
    |    'partial-update.ignore-delete' = 'true'
    |)
    |""".stripMargin)

merge_engine_partialupdate这个表我们已经创建过了,所以我们需要删除这个表。其实有一种快捷方式,我们直接到HDFS中删除这个表对应的目录其实就可以了。
在这里插入图片描述

接下来继续重新运行FlinkDataStreamWriteToPaimonForPartialupdate向Paimon表中写入一条+I类型的数据。

再运行FlinkDataStreamReadFromPaimonForPartialupdate负责读取数据。
此时可以看到控制台输出如下结果:

+----+--------------------------------+-------------+-------------+--------------------------------+
| op |                           name |         age |      height |                           city |
+----+--------------------------------+-------------+-------------+--------------------------------+
| +I |                           jack |          10 |         175 |                         <NULL> |

注意:此时city字段的值为null

修改FlinkDataStreamWriteToPaimonForDeduplicate中的代码,继续执行,向Paimon表中写入第2条+I类型的数据。
注意:这条数据的主键和前面的数据是相同的。

//手工构造一个Changelog DataStream 数据流
val dataStream = env.fromElements(
  //Row.ofKind(RowKind.INSERT, "jack", Int.box(10),Int.box(175),null)//+I
  Row.ofKind(RowKind.INSERT, "jack", null,null,"beijing")//+I
  //Row.ofKind(RowKind.INSERT, "jack", Int.box(11),null,null)//+I
)(Types.ROW_NAMED(Array("name", "age", "height", "city"),Types.STRING,Types.INT,Types.INT,Types.STRING))

此时可以在FlinkDataStreamReadFromPaimonForPartialupdate的控制台看到如下结果:

| -U |                           jack |          10 |         175 |                         <NULL> |
| +U |                           jack |          10 |         175 |                        beijing |

在这里看到最新数据中的city字段有值了,其实刚才这一条数据相当于局部更新了city字段的值。
注意:其他几个为null的字段不会覆盖之前的字段的值,那也就意味着,如果我们指定了字段的值为null,说明不需要覆盖更新这个字段的值。

修改FlinkDataStreamWriteToPaimonForDeduplicate中的代码,继续执行,向Paimon表中写入第3条+I类型的数据。
注意:这条数据的主键和前面的数据是相同的。

//手工构造一个Changelog DataStream 数据流
val dataStream = env.fromElements(
  //Row.ofKind(RowKind.INSERT, "jack", Int.box(10),Int.box(175),null)//+I
  //Row.ofKind(RowKind.INSERT, "jack", null,null,"beijing")//+I
  Row.ofKind(RowKind.INSERT, "jack", Int.box(11),null,null)//+I
)(Types.ROW_NAMED(Array("name", "age", "height", "city"),Types.STRING,Types.INT,Types.INT,Types.STRING))

此时可以在FlinkDataStreamReadFromPaimonForPartialupdate的控制台看到如下结果:

| -U |                           jack |          10 |         175 |                        beijing |
| +U |                           jack |          11 |         175 |                        beijing |

在这里可以发现,age字段的值被修改为了11,其他字段的值没变。

这样我们就实现了局部更新数据字段的效果,这种业务场景和构建宽表的场景是非常类似的,所以Partial Update这个表引擎适合用于构建宽表的业务场景。

(3)Aggregation

如果我们在Paimon中创建主键表时指定merge-engine的值为aggregation,那么就可以实现指定列数据预聚合的效果了。

此时可以通过聚合函数做一些预聚合,除了主键以外的每个列都可以指定一个聚合函数,相同主键的数据就可以按照列上指定的聚合函数进行相应的预聚合;

常见的聚合函数包括sum、max、min等。

如果没有给列指定聚合函数,则默认使用last-non-null-value这个聚合函数,此时表示只保存最新非空值,空值不会覆盖。

注意:除了sum这个聚合函数,其他的聚合函数都不支持读取回撤数据,为了避免接收到DELETE和UPDATE BEFORE类型的消息报错,我们需要在建表语句中给指定字段进行配置fields.${field_name}.ignore-retract = true 忽略回撤数据。

还有一点需要注意:Aggregation表引擎也需要和Lookup或者full-compaction变更日志生产者一起使用。

下面我们来具体演示一下:

核心思路是这样的,准备模拟产生2条+I类型的数据,数据内容大致是这样的。

<1, 3.4, 10>
<1, 2.12, 15>

解释:<商品id(id),商品价格(price),商品数量(count)>

针对商品价格,我们希望统计最大值,针对商品数量,我们希望统计累加的和。

将这2条数据写入到Paimon主键表之后,会得到什么结果呢?
结果是这样的:<1,3.4,25>

创建package:tech.xuwei.paimon.mergeengine.Aggregation
创建object:FlinkDataStreamWriteToPaimonForAggregation

这个Object负责向Paimon表中模拟写入数据。
代码如下:

package tech.xuwei.paimon.mergeengine.aggregation

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}

/**
 * 使用Flink DataStream API向Paimon表中写入数据
 * Created by xuwei
 */
object FlinkDataStreamWriteToPaimonForAggregation {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)


    //手工构造一个Changelog DataStream 数据流
    val dataStream = env.fromElements(
      Row.ofKind(RowKind.INSERT, "1", Double.box(3.4), Int.box(10))//+I
      //Row.ofKind(RowKind.INSERT, "1", Double.box(2.12), Int.box(15))//+I
    )(Types.ROW_NAMED(Array("id", "price", "count"),Types.STRING,Types.DOUBLE,Types.INT))


    //将DataStream转换为Table
    val schema = Schema.newBuilder()
      .column("id", DataTypes.STRING().notNull())//主键非空
      .column("price", DataTypes.DOUBLE())
      .column("count", DataTypes.INT())
      .primaryKey("id")//指定主键
      .build()
    val table = tEnv.fromChangelogStream(dataStream,schema,ChangelogMode.all())

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //注册临时表
    tEnv.createTemporaryView("t1",table)

    //创建Paimon类型的表
    tEnv.executeSql(
      """
        |-- 注意:这里的表名使用反引号进行转义,否则会导致SQL DDL语句解析失败。
        |CREATE TABLE IF NOT EXISTS `merge_engine_aggregation` (
        |    id STRING,
        |    price DOUBLE,
        |    `count` INT,
        |    PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |    'changelog-producer' = 'lookup',-- 注意:aggregation表引擎需要和lookup或者full-compaction一起使用时才支持流读
        |    'merge-engine' = 'aggregation',
        |    'fields.price.aggregate-function' = 'max',
        |    'fields.count.aggregate-function' = 'sum',
        |    'fields.price.ignore-retract' = 'true'
        |)
        |""".stripMargin)

    //向Paimon表中写入数据
    tEnv.executeSql(
      """
        |INSERT INTO `merge_engine_aggregation`
        |SELECT id,price,`count` FROM t1
        |""".stripMargin)
  }

}

注意:count字段需要加反引号转义,否则SQL会报错。

注意:在执行代码的时候通过修改env.fromElements(...)中的注释来实现实时产生多条+I类型数据的效果。

接下来创建Object:FlinkDataStreamReadFromPaimonForAggregation

这个Object负责从Paimon表中实时读取数据。

代码如下:

package tech.xuwei.paimon.mergeengine.aggregation

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 使用Flink DataStream API从Paimon表中读取数据
 * Created by xuwei
 */
object FlinkDataStreamReadFromPaimonForAggregation {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    //指定WebUI界面的访问端口,默认就是8081
    conf.setString(RestOptions.BIND_PORT,"8081")
    //为了便于在本地通过页面观察任务执行情况,所以开启本地WebUI功能
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)


    //禁用Chain,把多个算子拆分开单独执行,便于在开发和测试阶段观察,正式执行时不需要禁用Chain
    env.disableOperatorChaining()

    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //执行SQL查询,打印输出结果
    val execSql =
      """
        |SELECT * FROM `merge_engine_aggregation` -- 此时默认只能查到数据的最新值
        |-- /*+ OPTIONS('scan.mode'='from-snapshot','scan.snapshot-id' = '1') */ -- 通过动态表选项来指定数据读取(扫描)模式,以及从哪里开始读取
        |""".stripMargin
    val table = tEnv.sqlQuery(execSql)
    table.execute().print()

  }

}

接下来首先运行FlinkDataStreamWriteToPaimonForAggregation向Paimon表中写入一条+I类型的数据。

再运行FlinkDataStreamReadFromPaimonForAggregation负责读取数据。
此时可以看到控制台输出如下结果:

+----+--------------------------------+--------------------------------+-------------+
| op |                             id |                          price |       count |
+----+--------------------------------+--------------------------------+-------------+
| +I |                              1 |                            3.4 |          10 |

修改FlinkDataStreamWriteToPaimonForAggregation中的代码,继续执行,向Paimon表中写入第2条+I类型的数据。
注意:这条数据的主键和前面的数据是相同的。

//手工构造一个Changelog DataStream 数据流
val dataStream = env.fromElements(
  //Row.ofKind(RowKind.INSERT, "1", Double.box(3.4), Int.box(10))//+I
  Row.ofKind(RowKind.INSERT, "1", Double.box(2.12), Int.box(15))//+I
)(Types.ROW_NAMED(Array("id", "price", "count"),Types.STRING,Types.DOUBLE,Types.INT))

此时可以在FlinkDataStreamReadFromPaimonForPartialupdate的控制台看到如下结果:

| -U |                              1 |                            3.4 |          10 |
| +U |                              1 |                            3.4 |          25 |

此时可以发现,price还是3.4,因为price字段是取的最大值;count字段的值变成了25,因为count字段取的是和。

通过Aggregation这个表引擎,可以实现数据写入的时候直接预聚合,可以简化数据聚合统计这块的操作。

3.2.1.4 Sequence Field

默认情况下,Paimon中的主键表会根据数据的输入顺序确定数据的合并顺序,最后输入的数据会在最后进行合并,但是在分布式计算程序中,肯定会存在数据乱序问题的,这样可能会导致数据合并的结果并不是我们期望的。

在Flink中,针对数据乱序问题是通过watermark解决的。

在Paimon的主键表中,可以通过Sequence Field(序列字段)来解决。

针对咱们前面讲到的使用Partial Update表引擎构建宽表的案例,如果数据的写入顺序出现了错乱,肯定会导致结果异常的。

针对这个需求,这3条数据是这样的,默认情况下这3条数据是按照时间先后顺序产生的

1:<jack, 10, 175, null>
2:<jack, null, null, 'beijing'>
3:<jack, 11, null, null>

如果他们按照1、2、3的顺序写入Paimon主键表中,那么可以得到我们期望的结果:<jack, 11, 175, 'beijing'>

如果先写入了第3条数据,再写入1、2条数据,那么结果就是这样的了:<jack, 10, 175, 'beijing'>,这样就不是我们期望看到的结果了。

所以,针对这种问题,可以使用Sequence Field来解决。
我们可以在建表语句中通过参数sequence.field来指定序列字段,一般建议使用时间字段作为序列字段。

这样就算顺序乱了,也不影响最终合并的结果,因为底层在合并数据的时候会把最大值的数据作为最后合并的结果。

下面我们来具体演示一下:

创建package:tech.xuwei.paimon.sequencefield

创建object:FlinkDataStreamWriteToPaimonForSequencefield

代码中需要用到这几条数据的时间戳:

2023-10-01 10:01:00     1696125660000
2023-10-01 10:01:01     1696125661000
2023-10-01 10:01:02     1696125662000

这个Object负责向Paimon表中模拟写入数据。
代码如下:

package tech.xuwei.paimon.sequencefield


import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}

/**
 * 使用Flink DataStream API向Paimon表中写入数据
 * Created by xuwei
 */
object FlinkDataStreamWriteToPaimonForSequencefield {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)


    //手工构造一个Changelog DataStream 数据流
    val dataStream = env.fromElements(
      Row.ofKind(RowKind.INSERT, "jack", Int.box(11),null,null,Long.box(1696125662000L))//+I
      //Row.ofKind(RowKind.INSERT, "jack", Int.box(10),Int.box(175),null,Long.box(1696125660000L))//+I
      //Row.ofKind(RowKind.INSERT, "jack", null,null,"beijing",Long.box(1696125661000L))//+I
    )(Types.ROW_NAMED(Array("name", "age", "height", "city","ts_millis"),Types.STRING,Types.INT,Types.INT,Types.STRING,Types.LONG))


    //将DataStream转换为Table
    val schema = Schema.newBuilder()
      .column("name", DataTypes.STRING().notNull())//主键非空
      .column("age", DataTypes.INT())
      .column("height", DataTypes.INT())
      .column("city", DataTypes.STRING())
      .column("ts_millis", DataTypes.BIGINT())
      .primaryKey("name")//指定主键
      .build()
    val table = tEnv.fromChangelogStream(dataStream,schema,ChangelogMode.all())

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //注册临时表
    tEnv.createTemporaryView("t1",table)

    //创建Paimon类型的表
    tEnv.executeSql(
      """
        |-- 注意:这里的表名使用反引号进行转义,否则会导致SQL DDL语句解析失败。
        |CREATE TABLE IF NOT EXISTS `sequence_field` (
        |    name STRING,
        |    age INT,
        |    height INT,
        |    city STRING,
        |    ts_millis BIGINT,
        |    PRIMARY KEY (name) NOT ENFORCED
        |) WITH (
        |    'changelog-producer' = 'lookup',-- 注意:partial-update表引擎需要和lookup或者full-compaction一起使用时才支持流读
        |    'merge-engine' = 'partial-update',
        |    'partial-update.ignore-delete' = 'true',
        |    'sequence.field' = 'ts_millis',
        |    'sequence.auto-padding' = 'millis-to-micro' -- 将序列字段的精度补足到微妙
        |)
        |""".stripMargin)

    //向Paimon表中写入数据
    tEnv.executeSql(
      """
        |INSERT INTO `sequence_field`
        |SELECT name,age,height,city,ts_millis FROM t1
        |""".stripMargin)
  }

}

注意:在执行代码的时候通过修改env.fromElements(...)中的注释来实现实时产生多条+I类型数据的效果。

接下来创建Object:FlinkDataStreamReadFromPaimonForSequencefield

这个Object负责从Paimon表中实时读取数据。

代码如下:

package tech.xuwei.paimon.sequencefield

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * 使用Flink DataStream API从Paimon表中读取数据
 * Created by xuwei
 */
object FlinkDataStreamReadFromPaimonForSequencefield {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    //指定WebUI界面的访问端口,默认就是8081
    conf.setString(RestOptions.BIND_PORT,"8081")
    //为了便于在本地通过页面观察任务执行情况,所以开启本地WebUI功能
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)


    //禁用Chain,把多个算子拆分开单独执行,便于在开发和测试阶段观察,正式执行时不需要禁用Chain
    env.disableOperatorChaining()

    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //执行SQL查询,打印输出结果
    val execSql =
      """
        |SELECT * FROM `sequence_field` -- 此时默认只能查到数据的最新值
        |-- /*+ OPTIONS('scan.mode'='from-snapshot','scan.snapshot-id' = '1') */ -- 通过动态表选项来指定数据读取(扫描)模式,以及从哪里开始读取
        |""".stripMargin
    val table = tEnv.sqlQuery(execSql)
    table.execute().print()

  }

}

接下来首先运行FlinkDataStreamWriteToPaimonForSequencefield向Paimon表中写入一条+I类型的数据。

再运行FlinkDataStreamReadFromPaimonForSequencefield负责读取数据。
此时可以看到控制台输出如下结果:

+----+--------------------------------+-------------+-------------+--------------------------------+----------------------+
| op |                           name |         age |      height |                           city |            ts_millis |
+----+--------------------------------+-------------+-------------+--------------------------------+----------------------+
| +I |                           jack |          11 |      <NULL> |                         <NULL> |        1696125662000 |

修改FlinkDataStreamWriteToPaimonForSequencefield中的代码,继续执行,向Paimon表中写入第2条+I类型的数据。
注意:这条数据的主键和前面的数据是相同的。

//手工构造一个Changelog DataStream 数据流
val dataStream = env.fromElements(
  //Row.ofKind(RowKind.INSERT, "jack", Int.box(11),null,null,Long.box(1696125662000L))//+I
  Row.ofKind(RowKind.INSERT, "jack", Int.box(10),Int.box(175),null,Long.box(1696125660000L))//+I
  //Row.ofKind(RowKind.INSERT, "jack", null,null,"beijing",Long.box(1696125661000L))//+I
)(Types.ROW_NAMED(Array("name", "age", "height", "city","ts_millis"),Types.STRING,Types.INT,Types.INT,Types.STRING,Types.LONG))

此时可以在FlinkDataStreamReadFromPaimonForSequencefield的控制台看到如下结果:

| -U |                           jack |          11 |      <NULL> |                         <NULL> |        1696125662000 |
| +U |                           jack |          11 |         175 |                         <NULL> |        1696125662000 |

注意:此时age字段的值没有被更新,因为这一条数据的时间没有上一条数据的时间大,因为我们指定了序列字段是ts_millis,所以ts_millis时间最大值的数据将是最后合并的结果。

其他为null的字段的值是可以被覆盖更新的。

修改FlinkDataStreamWriteToPaimonForSequencefield中的代码,继续执行,向Paimon表中写入第3条+I类型的数据。
注意:这条数据的主键和前面的数据是相同的。

//手工构造一个Changelog DataStream 数据流
val dataStream = env.fromElements(
  //Row.ofKind(RowKind.INSERT, "jack", Int.box(11),null,null,Long.box(1696125662000L))//+I
  //Row.ofKind(RowKind.INSERT, "jack", Int.box(10),Int.box(175),null,Long.box(1696125660000L))//+I
  Row.ofKind(RowKind.INSERT, "jack", null,null,"beijing",Long.box(1696125661000L))//+I
)(Types.ROW_NAMED(Array("name", "age", "height", "city","ts_millis"),Types.STRING,Types.INT,Types.INT,Types.STRING,Types.LONG))

此时可以在FlinkDataStreamReadFromPaimonForSequencefield的控制台看到如下结果:

| -U |                           jack |          11 |         175 |                         <NULL> |        1696125662000 |
| +U |                           jack |          11 |         175 |                        beijing |        1696125662000 |

这样最终得到的结果就是我们期望的了,这就是Sequence Field的典型应用场景了。

3.3 Append Only表(仅追加表)

仅追加表很好理解,只要没有定义主键的表就是仅追加表。

仅追加表采用追加写入的方式,只能支持新增数据,不能更新和删除。

想要创建仅追加表很简单,我们只需要在建表语句中指定这个参数即可: write-mode = append-only

仅追加表主要用于无需更新数据的场景,例如数据仓库中ODS层的数据,不需要进行修改,保留数据原始的样子即可,此时推荐采用Paimon中的仅追加表。

仅追加表可以自动压缩表中的小文件,并且提供有序流式读取,我们也可以通过它来替代消息队列。

在实际工作中,除了数据库中的Binlog这种数据,还有大量的日志数据。
日志数据其实就属于Append Only数据,这种数据的数据量会非常大,一般情况下,我们会把这些日志数据存储在HDFS这种分布式文件系统中。

当我们在Paimon中使用仅追加表来存储这种数据的时候,数据可以实时写入,并且还可以实时读取;而且对实时资源的消耗也比较低,完全可以替代部分消息队列的场景。

最后,我们还需要注意一点,由于仅追加表没有主键,所以建议在使用的时候指定bucket-key,其实就是指定数据分桶(Bucket)的字段。
因为Bucket的范围是由数据中一个或多个列的哈希值确定的,我们可以通过bucket-key参数来指定分桶字段。
如果没有指定分桶字段,则默认会使用整行数据作为分桶字段,这样效率会比较低。

注意:如果是主键表,就算我们没有指定bucket-key,也不会使用整行数据作为分桶字段,因为主键表中有主键字段,默认会使用主键字段作为分桶字段。

下面我们来通过一个案例感受一下仅追加表的使用:

创建package:tech.xuwei.paimon.appendonlytable

创建object:FlinkDataStreamWriteToPaimonForAppendonly

代码如下:

package tech.xuwei.paimon.appendonlytable

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}

/**
 * 仅追加表
 * Created by xuwei
 */
object FlinkDataStreamWriteToPaimonForAppendonly {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)


    //手工构造一个Changelog DataStream 数据流
    val dataStream = env.fromElements(
      Row.ofKind(RowKind.INSERT, "jack", Int.box(10)),//+I
      Row.ofKind(RowKind.INSERT, "tom", Int.box(10)),//+I
    )(Types.ROW_NAMED(Array("name", "age"),Types.STRING,Types.INT))


    //将DataStream转换为Table
    val schema = Schema.newBuilder()
      .column("name", DataTypes.STRING())
      .column("age", DataTypes.INT())
      .build()
    val table = tEnv.fromChangelogStream(dataStream,schema,ChangelogMode.insertOnly())//仅追加类型的数据

    //创建Paimon类型的Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH (
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    //注册临时表
    tEnv.createTemporaryView("t1",table)

    //创建Paimon类型的表
    tEnv.executeSql(
      """
        |-- 注意:这里的表名使用反引号进行转义,否则会导致SQL DDL语句解析失败。
        |CREATE TABLE IF NOT EXISTS `append_only` (
        |    name STRING,
        |    age INT
        |) WITH (
        |    'write-mode' = 'append-only',
        |    'bucket' = '2'
        |)
        |""".stripMargin)

    //向Paimon表中写入数据
    tEnv.executeSql(
      """
        |INSERT INTO `append_only`
        |SELECT name,age FROM t1
        |""".stripMargin)
  }

}

注意:write-mode的值可以有三种,auto、change-log和append-only,对应的源码路径是这个:org.apache.paimon.WriteMode
默认值是auto,他会自动识别,如果表中有主键,则定义为change-log,否则定义为append-only

  • change-log:表示这个表可以接收新增、更新和删除类型的数据,
  • append-only:表示这个表只能接收新增类型的数据。

运行代码,然后到hdfs中确认这个表底层产生了几个bucket目录:

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/append_only
Found 5 items
drwxr-xr-x   - yehua supergroup          0 2028-12-25 14:40 /paimon/default.db/append_only/bucket-0
drwxr-xr-x   - yehua supergroup          0 2028-12-25 14:40 /paimon/default.db/append_only/bucket-1
drwxr-xr-x   - yehua supergroup          0 2028-12-25 14:40 /paimon/default.db/append_only/manifest
drwxr-xr-x   - yehua supergroup          0 2028-12-25 14:40 /paimon/default.db/append_only/schema
drwxr-xr-x   - yehua supergroup          0 2028-12-25 14:40 /paimon/default.db/append_only/snapshot

此时可以看到产生了2个bucket目录,每个bucket目录里面有1条数据。

咱们前面分析过,如果我们创建的仅追加表没有指定bucket-key,这个时候会根据整行数据取哈希值,然后把数据分配到对应的bucket里面。

其实底层的计算公式是类似这样的:hash(数据)%bucket数量 ,此时返回的值就相当于是bucket的编号。

如果我们指定了bucket-key,在计算哈希值的时候就会根据bucket-key字段的值进行计算,下面我们验证一下:

首先删除刚才创建的表:
在这里插入图片描述

修改代码,在建表语句指定bucket-keyage

//创建Paimon类型的表
tEnv.executeSql(
  """
    |-- 注意:这里的表名使用反引号进行转义,否则会导致SQL DDL语句解析失败。
    |CREATE TABLE IF NOT EXISTS `append_only` (
    |    name STRING,
    |    age INT
    |) WITH (
    |    'write-mode' = 'append-only',
    |    'bucket' = '2',
    |    'bucket-key' = 'age'
    |)
    |""".stripMargin)

重新运行代码,到hdfs中查看bucket目录:

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/append_only
Found 4 items
drwxr-xr-x   - yehua supergroup          0 2028-12-25 14:54 /paimon/default.db/append_only/bucket-0
drwxr-xr-x   - yehua supergroup          0 2028-12-25 14:54 /paimon/default.db/append_only/manifest
drwxr-xr-x   - yehua supergroup          0 2028-12-25 14:54 /paimon/default.db/append_only/schema
drwxr-xr-x   - yehua supergroup          0 2028-12-25 14:54 /paimon/default.db/append_only/snapshot

此时发现只产生了1个bucket目录。
我们在建表语句中其实定义了bucket的数量是2,由于目前所有的数据都写入了bucket-0这个目录,所以另一个bucket目录还没有被创建,当后续有数据写入的时候会自动创建。

由于目前两条数据的age都是10,所以根据age计算出来的哈希值是一样的,最终这两条数据都会写入到相同的bucket目录里面。
这就是bucket-key这个参数的作用。

如果不指定bucket-key,则默认会使用整行数据来计算哈希值,整行数据如果有上百个字段,那就需要根据这上百个字段的值计算哈希值,所以效率会降低。

注意:bucket-key也支持指定多个字段,多个字段的话使用逗号分隔开即可。

更多Paimon数据湖内容请关注:https://edu.51cto.com/course/35051.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1183999.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AI对抗中的AI:技术展望与应用研究

随着人工智能&#xff08;AI&#xff09;技术的迅猛发展&#xff0c;AI系统之间的对抗性冲突也逐渐崭露头角。本文从AI技术的发展与应用入手&#xff0c;探讨如何利用AI技术来打败AI系统。我们将关注领域包括对抗学习、对抗生成网络、强化学习等&#xff0c;并分析潜在应用领域…

第三章:java的三大特征

系列文章目录 文章目录 系列文章目录前言一、封装二、继承三、多态总结 前言 面向对象编程有三大特征&#xff1a; 封装、 继承和多态。 一、封装 封装&#xff08;encapsulation&#xff09;就是把抽象出的数据【属性】和对数据的操作【方法】封装在一起&#xff0c;数据被保…

RabbitMQ 消息中间件

消息中间件 1、简介 消息中间件也可以称消息队列&#xff0c;是指用高效可靠的消息传递机制进行与平台无关的数据交流&#xff0c;并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型&#xff0c;可以在分布式环境下扩展进程的通信。 当下主流的消息中间…

用这个方法简直开挂!精密空调轻松拿捏

在当今现代社会&#xff0c;空调系统已成为生活和商业环境中不可或缺的一部分。随着气温的波动和能源效率的关注不断增加&#xff0c;精密空调监控技术变得至关重要。 精密空调监控系统可以帮助用户实现更高的能源效率、更稳定的温度控制、降低维护成本&#xff0c;并提供更高水…

做不好数据可视化不丢人,去奥威BI下载模板

有UI&#xff0c;自然可以很大程度上确保BI数据可视化报表审美在线&#xff0c;那没有UI呢&#xff1f;别怕&#xff0c;还有BI数据可视化报表模板&#xff0c;由资深UI亲自打磨&#xff0c;即保留了数据分析、数据可视化的优点&#xff0c;又做到了美观炫酷的要求。一键下载替…

带您了解流程中的人工活动处理方式

这次咱们来介绍 O2OA (翱途) 开发平台流程引擎中的人工活动的处理方式和逻辑&#xff0c;O2OA (翱途) 主要采用拖拽可视化开发的方式完成流程的设计和配置&#xff0c;不需要过多的代码编写&#xff0c;业务人员可以直接进行修改操作。 例如&#xff0c;咱们做一个 “报销申请…

南大通用数据库-Gbase-8a-报错集锦-07-图型化管理工具中存过不可见

目录 一、测试版本 二、排查过程 1、问题现象 2、查看用户权限 3、赋予系统视图权限 一、测试版本 名称值CPUIntel(R) Core(TM) i5-1035G1 CPU 1.00GHz操作系统CentOS Linux release 7.9.2009 (Core)内存3G逻辑核数2Gbase8a版本8.6.2-R43图型化管理工具版本9.5.2.0 二、…

缓冲流详解

缓冲流概述 缓冲流也称为高效流、或者高级流。之前学习的字节流可以称为原始流。 作用&#xff1a;缓冲流自带缓冲区、可以提高原始字节流、字符流读写数据的性能。 字节缓冲流 字节缓冲流性能优化原理&#xff1a; 字节缓冲输入流自带了8KB缓冲池&#xff0c;以后我们直接…

城市内涝积水预防,万宾科技内涝监测仪如何预警?

近几年来城市内涝所引发的安全隐患极为突出&#xff0c;影响着城市道路安全&#xff0c;而且也让市民心中多有惶恐。一旦城市内涝问题出现背后不仅是路面积水问题&#xff0c;更会导致城市无法正常运行&#xff0c;导致市民日常生活和工作受到影响。所以对于排水防涝设施的建设…

gwIvy for 3dMax插件教程

gwIvy for 3dMax插件教程 常春藤旨在在真实世界规模的场景中生长&#xff01; 常春藤在 500m 的树上看起来不太好...所以&#xff0c;尝试使用这个最大功能... 种植常春藤实际上就像编写插件一样简单:)&#xff09; 如果您阅读下面写的内容&#xff0c;那就更容易了... 1) 种植…

Leetcode—125.验证回文串【简单】

2023每日刷题&#xff08;二十三&#xff09; Leetcode—125.验证回文串 实现代码 class Solution { public:bool isPalindrome(string s) {int n s.size();if(n 1 && s[0] ) {return true;}int left 0, right 0;for(right 0; right < n; right) {if(s[rig…

地表水与地下水耦合丨基于QSWATMOD的SWAT-MODFLOW模拟丨模型率定丨案例分析

耦合模型被应用到很多科学和工程领域来改善模型的性能、效率和结果&#xff0c;SWAT作为一个地表水模型可以较好的模拟主要的水文过程&#xff0c;包括地表径流、降水、蒸发、风速、温度、渗流、侧向径流等&#xff0c;但是对于地下水部分的模拟相对粗糙&#xff0c;考虑到SWAT…

C# TabControl实现为每一个TabPage添加关闭按钮

默认情况下TabControl是无法通过界面关闭TabPage的 有些情况下我们需要手动关闭任意一个TabPage&#xff0c;如下图所示 TabControl控件自带属性是无法满足以上需求&#xff0c;下面简单介绍实现过程 1、首先需要对TabPage进行重绘&#xff0c;其目的是为了在TabPage上画出…

千兆光模块和万兆光模块的发展前景与市场分析

随着互联网技术的不断发展&#xff0c;千兆光模块和万兆光模块作为网络传输的核心部件&#xff0c;在数据传输领域已得到广泛的应用。本文将从发展历程、市场前景和应用案例三个方面详细分析千兆光模块和万兆光模块的优势和未来发展前景。 一、千兆光模块和万兆光模块的发展历…

Ubuntu配置网络与静态IP地址的常见方法

这里的环境是虚拟机中的Ubuntu18版本的网络配置&#xff0c;使用虚拟机比较常见&#xff0c;因为主机一般是有线直连&#xff0c;配置很简单 1、NAT模式 虚拟机最简单的联网就是勾选“NAT模式”&#xff0c;不需要任何配置&#xff0c;主机能上网&#xff0c;虚拟机就可以上网…

电脑丢失dll文件一键修复的方法,一分钟快速解决dll问题

动态链接库&#xff08;DLL&#xff09;在Windows操作系统中起到至关重要的作用。DLL文件能够在程序的运行期间提供所需的代码和数据&#xff0c;以参与和影响程序的执行。如果电脑中的某个或多个DLL文件丢失&#xff0c;或遭到破坏&#xff0c;那么涉及到这些文件的程序可能会…

windows cmake x86 x64 下载与安装

cmake 下载路径&#xff1a;cmake 下载选择&#xff1a; 界面下拉选取适合自己的版本 这里是windows x86 x64 &#xff08;x86是32位系统&#xff1b;x64是64位系统&#xff09; 安装&#xff1a; 点击安装。 此处选择添加环境变量 命令提示符 验证查看 cmake 桌面可以…

在Python中使用deepfakes实现AI换脸功能

目录 一、Deepfakes技术原理 二、Deepfakes技术实现方法 三、Deepfakes技术应用与实现代码 四、结论 近年来&#xff0c;深度学习技术在图像处理、计算机视觉和人工智能领域取得了显著的进步。其中&#xff0c;Deepfakes技术是一种基于深度学习的图像合成技术&#xff0c;可…

After Effects 2024 v24.0.2(AE2024)

After Effects 2024是视频特效和动态图形设计软件。以下是After Effects 2024的主要功能和特点&#xff1a; 支持创建各种令人惊叹的视觉效果&#xff0c;例如粒子系统、合成特效、绿屏抠像等。支持动画制作&#xff0c;包括关键帧动画、形状动画、运动跟踪等工具&#xff0c;…

若依框架详细教程

一、若依下载以及配置启动 1、下载地址 2、打开样式 3、数据库引入及配置 导入数据库 配置后端数据库 同时还要redis redis安装教程&#xff1a;安装教程 要是修改端口的话都需要改 打开前端package.json下载这个会提示 4、启动 后端启动 前端启动