流数据湖平台Apache Paimon(三)Flink进阶使用

news2025/1/19 19:40:53

文章目录

    • 2.9 进阶使用
      • 2.9.1 写入性能
      • 2.9.2 读取性能
      • 2.9.3 多Writer并发写入
      • 2.9.4 表管理
      • 2.9.5 缩放Bucket
    • 2.10 文件操作理解
      • 2.10.1 插入数据
      • 2.10.2 删除数据
      • 2.10.3 Compaction
      • 2.10.4 修改表
      • 2.10.5 过期快照
      • 2.10.6 Flink 流式写入

2.9 进阶使用

2.9.1 写入性能

Paimon的写入性能与检查点密切相关,因此需要更大的写入吞吐量:

增加检查点间隔,或者仅使用批处理模式。

增加写入缓冲区大小。

启用写缓冲区溢出。

如果您使用固定存储桶模式,请重新调整存储桶数量。

2.9.1.1 并行度

建议sink的并行度小于等于bucket的数量,最好相等。

选项必需的默认类型描述
sink.parallelismNo(none)Integer定义sink的并行度。默认情况下,并行度由框架使用上游链式运算符的相同并行度来确定。

2.9.1.2 Compaction

当Sorted Run数量较少时,Paimon writer 将在单独的线程中异步执行压缩,因此记录可以连续写入表中。然而,为了避免Sorted Runs的无限增长,当Sorted Run的数量达到阈值时,writer将不得不暂停写入。下表属性确定阈值。

选项必需的默认类型描述
num-sorted-run.stop-triggerNo(none)Integer触发停止写入的Sorted Runs次数,默认值为 ‘num-sorted-run.compaction-trigger’ + 1。

当 num-sorted-run.stop-trigger 变大时,写入停顿将变得不那么频繁,从而提高写入性能。但是,如果该值变得太大,则查询表时将需要更多内存和 CPU 时间。如果您担心内存 OOM,请配置sort-spill-threshold。它的值取决于你的内存大小。

2.9.1.3 优先考虑写入吞吐量

如果希望某种模式具有最大写入吞吐量,则可以缓慢而不是匆忙地进行Compaction。可以对表使用以下策略

num-sorted-run.stop-trigger = 2147483647

sort-spill-threshold = 10

此配置将在写入高峰期生成更多文件,并在写入低谷期逐渐合并到最佳读取性能。

2.9.1.4 触发Compaction的Sorted Run数

Paimon使用LSM树,支持大量更新。 LSM 在多次Sorted Runs中组织文件。从 LSM 树查询记录时,必须组合所有Sorted Runs以生成所有记录的完整视图。

过多的Sorted Run会导致查询性能不佳。为了将Sorted Run的数量保持在合理的范围内,Paimon writers 将自动执行Compaction。下表属性确定触发Compaction的最小Sorted Run数。

选项必需的默认类型描述
num-sorted-run.compaction-triggerNo5Integer触发Compaction的Sorted Run数。包括 0 级文件(一个文件一级排序运行)和高级运行(一个一级排序运行)。

2.9.1.5 写入初始化

在write初始化时,bucket的writer需要读取所有历史文件。如果这里出现瓶颈(例如同时写入大量分区),可以使用write-manifest-cache缓存读取的manifest数据,以加速初始化。

2.9.1.6 内存

Paimon writer中主要占用内存的地方有3个:

Writer的内存缓冲区,由单个任务的所有Writer共享和抢占。该内存值可以通过 write-buffer-size 表属性进行调整。

合并多个Sorted Run以进行Compaction时会消耗内存。可以通过 num-sorted-run.compaction-trigger 选项进行调整,以更改要合并的Sorted Run的数量。

如果行非常大,在进行Compaction时一次读取太多行数据可能会消耗大量内存。减少 read.batch-size 选项可以减轻这种情况的影响。

写入列式(ORC、Parquet等)文件所消耗的内存,不可调。

2.9.2 读取性能

2.9.2.1 Full Compaction

配置“full-compaction.delta-commits”在Flink写入中定期执行full-compaction。并且可以确保在写入结束之前分区被完全Compaction。

注意:Paimon 默认处理小文件并提供良好的读取性能。请不要在没有任何要求的情况下配置此Full Compaction选项,因为它会对性能产生重大影响。

2.9.2.2 主键表

对于主键表来说,这是一种“MergeOnRead”技术。读取数据时,会合并多层LSM数据,并行数会受到桶数的限制。虽然Paimon的merge会高效,但是还是赶不上普通的AppendOnly表。

如果你想在某些场景下查询得足够快,但只能找到较旧的数据,你可以:

配置full-compaction.delta-commits,写入数据时(目前只有Flink)会定期进行full Compaction。

配置“scan.mode”为“compacted-full”,读取数据时,选择full-compaction的快照。读取性能良好。

2.9.2.3 仅追加表

小文件会降低读取速度并影响 DFS 稳定性。默认情况下,当单个存储桶中的小文件超过“compaction.max.file-num”(默认50个)时,就会触发compaction。但是当有多个桶时,就会产生很多小文件。

您可以使用full-compaction来减少小文件。full-compaction将消除大多数小文件。

2.9.2.4 格式

Paimon 对 parquet 读取进行了一些查询优化,因此 parquet 会比 orc 稍快一些。

2.9.3 多Writer并发写入

Paimon的快照管理支持向多个writer写入。

默认情况下,Paimon支持对不同分区的并发写入。推荐的方式是streaming job将记录写入Paimon的最新分区;同时批处理作业(覆盖)将记录写入历史分区。

img

如果需要多个Writer写到同一个分区,事情就会变得有点复杂。例如,不想使用 UNION ALL,那就需要有多个流作业来写入“partial-update”表。参考如下的“Dedicated Compaction Job”。

2.9.3.1 Dedicated Compaction Job

默认情况下,Paimon writer 在写入记录时会根据需要执行Compaction。这对于大多数用例来说已经足够了,但有两个缺点:

这可能会导致写入吞吐量不稳定,因为执行压缩时吞吐量可能会暂时下降。

Compaction会将某些数据文件标记为“已删除”(并未真正删除)。如果多个writer标记同一个文件,则在提交更改时会发生冲突。 Paimon 会自动解决冲突,但这可能会导致作业重新启动。

为了避免这些缺点,用户还可以选择在writer中跳过Compaction,并仅运行专门的作业来进行Compaction。由于Compaction仅由专用作业执行,因此writer可以连续写入记录而无需暂停,并且不会发生冲突。

选项必需的默认类型描述
write-onlyNofalseBoolean如果设置为 true,将跳过Compaction和快照过期。此选项与独立Compaction一起使用。

Flink SQL目前不支持compaction相关的语句,所以我们必须通过flink run来提交compaction作业。

<FLINK_HOME>/bin/flink run \

/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \

compact \

–warehouse \

–database \

–table \

[–partition ] \

[–catalog-conf [–catalog-conf …]] \

如果提交一个批处理作业(execution.runtime-mode:batch),当前所有的表文件都会被Compaction。如果您提交一个流作业(execution.runtime-mode: Streaming),该作业将持续监视表的新更改并根据需要执行Compaction。

2.9.4 表管理

2.9.4.1 管理快照

1)快照过期

Paimon Writer每次提交都会生成一个或两个快照。每个快照可能会添加一些新的数据文件或将一些旧的数据文件标记为已删除。然而,标记的数据文件并没有真正被删除,因为Paimon还支持时间旅行到更早的快照。它们仅在快照过期时被删除。

目前,Paimon Writer在提交新更改时会自动执行过期操作。通过使旧快照过期,可以删除不再使用的旧数据文件和元数据文件,以释放磁盘空间。

设置以下表属性:

选项必需的默认类型描述
snapshot.time-retainedNo1 hDuration已完成快照的最长时间保留。
snapshot.num-retained.minNo10Integer要保留的已完成快照的最小数量。
snapshot.num-retained.maxNoInteger.MAX_VALUEInteger要保留的已完成快照的最大数量。

注意,保留时间太短或保留数量太少可能会导致如下问题:

批量查询找不到该文件。例如,表比较大,批量查询需要10分钟才能读取,但是10分钟前的快照过期了,此时批量查询会读取到已删除的快照。

表文件上的流式读取作业(没有外部日志系统)无法重新启动。当作业重新启动时,它记录的快照可能已过期。 (可以使用Consumer Id来保护快照过期的小保留时间内的流式读取)。

2)回滚快照

<FLINK_HOME>/bin/flink run \

/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \

rollback-to \

–warehouse \

–database \

–table \

–snapshot \

[–catalog-conf [–catalog-conf …]]

2.9.4.2 管理分区

创建分区表时可以设置partition.expiration-time。 Paimon会定期检查分区的状态,并根据时间删除过期的分区。

判断分区是否过期:将分区中提取的时间与当前时间进行比较,看生存时间是否超过partition.expiration-time。比如:

CREATE TABLE T (…) PARTITIONED BY (dt) WITH (

‘partition.expiration-time’ = ‘7 d’,

‘partition.expiration-check-interval’ = ‘1 d’,

‘partition.timestamp-formatter’ = ‘yyyyMMdd’

);

选项默认类型描述
partition.expiration-check-interval1 hDuration分区过期的检查间隔。
partition.expiration-time(none)Duration分区的过期时间间隔。如果分区的生命周期超过此值,则该分区将过期。分区时间是从分区值中提取的。
partition.timestamp-formatter(none)String用于格式化字符串时间戳的格式化程序。它可以与“partition.timestamp-pattern”一起使用来创建使用指定值的格式化程序。> 默认格式化程序为“yyyy-MM-dd HH:mm:ss”和“yyyy-MM-dd”。> 支持多个分区字段,例如“ y e a r − year- yearmonth-$day $hour:00:00”。> 时间戳格式化程序与 Java 的 DateTimeFormatter 兼容。
partition.timestamp-pattern(none)String可以指定一种模式来从分区获取时间戳。格式化程序模式由“partition.timestamp-formatter”定义。> 默认情况下,从第一个字段读取。> 如果分区中的时间戳是名为“dt”的单个字段,则可以使用“ d t ”。 > 如果它分布在年、月、日和小时的多个字段中,则可以使用“ dt”。> 如果它分布在年、月、日和小时的多个字段中,则可以使用“ dt>如果它分布在年、月、日和小时的多个字段中,则可以使用year- m o n t h − month- monthday h o u r : 00 : 00 ”。 > 如果时间戳位于 d t 和 h o u r 字段中,则可以使用“ hour:00:00”。> 如果时间戳位于 dt 和 hour 字段中,则可以使用“ hour:00:00”>如果时间戳位于dthour字段中,则可以使用dt $hour:00:00”。

2.9.4.3 管理小文件

小文件可能会导致:

稳定性问题:HDFS中小文件过多,NameNode会承受过大的压力。

成本问题:HDFS中的小文件会暂时使用最小1个Block的大小,例如128MB。

查询效率:小文件过多查询效率会受到影响。

1)Flink Checkpoint的影响

使用Flink Writer,每个checkpoint会生成 1-2 个快照,并且checkpoint会强制在 DFS 上生成文件,因此checkpoint间隔越小,会生成越多的小文件。

默认情况下,不仅checkpoint会导致文件生成,writer的内存(write-buffer-size)耗尽也会将数据flush到DFS并生成相应的文件。可以启用 write-buffer-spillable 在 writer 中生成溢出文件,从而在 DFS 中生成更大的文件。

所以,可以设置如下:

增大checkpoint间隔

增加 write-buffer-size 或启用 write-buffer-spillable

2)快照的影响

Paimon维护文件的多个版本,文件的Compaction和删除是逻辑上的,并没有真正删除文件。文件只有在 Snapshot 过期后才会被真正删除,因此减少文件的第一个方法就是减少 Snapshot 过期的时间。 Flink writer 会自动使快照过期。

分区和分桶的影响

img

表数据会被物理分片到不同的分区,里面有不同的桶,所以如果整体数据量太小,单个桶中至少有一个文件,建议你配置较少的桶数,否则会出现也有很多小文件。

3)主键表LSM的影响

LSM 树将文件组织成Sorted Runs的运行。Sorted Runs由一个或多个数据文件组成,并且每个数据文件恰好属于一个Sorted Runs。

img

默认情况下,Sorted Runs数取决于 num-sorted-run.compaction-trigger,这意味着一个桶中至少有 5 个文件。如果要减少此数量,可以保留更少的文件,但写入性能可能会受到影响。

4)仅追加表的文件的影响

默认情况下,Append-Only 还会进行自动Compaction以减少小文件的数量

对于分桶的 Append-only 表,为了排序会对bucket内的文件行Compaction,可能会保留更多的小文件。

5)Full-Compaction的影响

主键表是5个文件,但是Append-Only表(桶)可能单个桶里有50个小文件,这是很难接受的。更糟糕的是,不再活动的分区还保留了如此多的小文件。

建议配置Full-Compaction,在Flink写入时配置‘full-compaction.delta-commits’定期进行full-compaction。并且可以确保在写入结束之前分区被full-compaction。

2.9.5 缩放Bucket

1)说明

由于总桶数对性能影响很大,Paimon 允许用户通过 ALTER TABLE 命令调整桶数,并通过 INSERT OVERWRITE 重新组织数据布局,而无需重新创建表/分区。当执行覆盖作业时,框架会自动扫描旧桶号的数据,并根据当前桶号对记录进行哈希处理。

– rescale number of total buckets

ALTER TABLE table_identifier SET (‘bucket’ = ‘…’)

– reorganize data layout of table/partition

INSERT OVERWRITE table_identifier [PARTITION (part_spec)]

SELECT …

FROM table_identifier

[WHERE part_spec]

注意:

ALTER TABLE 仅修改表的元数据,不会重新组织或重新格式化现有数据。重新组织现有数据必须通过INSERT OVERWRITE来实现。

重新缩放桶数不会影响读取和正在运行的写入作业。

一旦存储桶编号更改,任何新安排的 INSERT INTO 作业写入未重新组织的现有表/分区将抛出 TableException ,并显示如下类似异常:

Try to write table/partition … with a new bucket num …,

but the previous bucket num is … Please switch to batch mode,

and perform INSERT OVERWRITE to rescale current data layout first.

对于分区表,不同的分区可以有不同的桶号。例如:

ALTER TABLE my_table SET (‘bucket’ = ‘4’);

INSERT OVERWRITE my_table PARTITION (dt = ‘2022-01-01’)

SELECT * FROM …;

ALTER TABLE my_table SET (‘bucket’ = ‘8’);

INSERT OVERWRITE my_table PARTITION (dt = ‘2022-01-02’)

SELECT * FROM …;

在覆盖期间,确保没有其他作业写入同一表/分区。

注意:对于启用日志系统的表(例如Kafka),请重新调整主题的分区以保持一致性。

重新缩放存储桶有助于处理吞吐量的突然峰值。假设有一个每日流式ETL任务来同步交易数据。该表的DDL和管道如下所示。

2)官方示例:

​ 如下是正在跑的一个作业:

– 建表

CREATE TABLE verified_orders (

trade_order_id BIGINT,

item_id BIGINT,

item_price DOUBLE,

dt STRING,

PRIMARY KEY (dt, trade_order_id, item_id) NOT ENFORCED

) PARTITIONED BY (dt)

WITH (

‘bucket’ = ‘16’

);

– kafka表

CREATE temporary TABLE raw_orders(

trade_order_id BIGINT,

item_id BIGINT,

item_price BIGINT,

gmt_create STRING,

order_status STRING

) WITH (

‘connector’ = ‘kafka’,

‘topic’ = ‘…’,

‘properties.bootstrap.servers’ = ‘…’,

‘format’ = ‘csv’

);

– 流式插入16个分桶

INSERT INTO verified_orders

SELECT trade_order_id,

​ item_id,

​ item_price,

​ DATE_FORMAT(gmt_create, ‘yyyy-MM-dd’) AS dt

FROM raw_orders

WHERE order_status = ‘verified’;

过去几周运行良好。然而,最近数据量增长很快,作业的延迟不断增加。为了提高数据新鲜度,用户可以执行如下操作缩放分桶:

(1)使用保存点暂停流作业

$ ./bin/flink stop \

–savepointPath /tmp/flink-savepoints \

$JOB_ID

(2)增加桶数

ALTER TABLE verified_orders SET (‘bucket’ = ‘32’);

(3)切换到批处理模式并覆盖流作业正在写入的当前分区

SET ‘execution.runtime-mode’ = ‘batch’;

– 假设今天是2022-06-22

– 情况1:没有更新历史分区的延迟事件,因此覆盖今天的分区就足够了

INSERT OVERWRITE verified_orders PARTITION (dt = ‘2022-06-22’)

SELECT trade_order_id,

​ item_id,

​ item_price

FROM verified_orders

WHERE dt = ‘2022-06-22’;

- 情况2:有更新历史分区的延迟事件,但范围不超过3天

INSERT OVERWRITE verified_orders

SELECT trade_order_id,

​ item_id,

​ item_price,

​ dt

FROM verified_orders

WHERE dt IN (‘2022-06-20’, ‘2022-06-21’, ‘2022-06-22’);

(4)覆盖作业完成后,切换回流模式,从保存点恢复(可以增加并行度=新bucket数量)。

SET ‘execution.runtime-mode’ = ‘streaming’;

SET ‘execution.savepoint.path’ = ;

INSERT INTO verified_orders

SELECT trade_order_id,

item_id,

item_price,

DATE_FORMAT(gmt_create, ‘yyyy-MM-dd’) AS dt

FROM raw_orders

WHERE order_status = ‘verified’;

2.10 文件操作理解

2.10.1 插入数据

当我们执行INSERT INTO

CREATE CATALOG paimon WITH (

‘type’ = ‘paimon’,

‘warehouse’ = ‘file:///tmp/paimon’

);

USE CATALOG paimon;

CREATE TABLE T (

id BIGINT,

a INT,

b STRING,

dt STRING COMMENT ‘timestamp string in format yyyyMMdd’,

PRIMARY KEY(id, dt) NOT ENFORCED

) PARTITIONED BY (dt);

INSERT INTO T VALUES (1, 10001, ‘varchar00001’, ‘20230501’);

一旦Flink作业完成,记录就会通过成功提交写入Paimon表中。用户可以通过执行查询 SELECT * FROM T 来验证这些记录的可见性,该查询将返回单行。提交过程创建位于路径 /tmp/paimon/default.db/T/snapshot/snapshot-1 的快照。 snapshot-1 处生成的文件布局如下所述:

img

snapshot-1 的内容包含快照的元数据,例如清单列表(manifest list)和schema ID:

{

“version” : 3,

“id” : 1,

“schemaId” : 0,

“baseManifestList” : “manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0”,

“deltaManifestList” : “manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1”,

“changelogManifestList” : null,

“commitUser” : “7d758485-981d-4b1a-a0c6-d34c3eb254bf”,

“commitIdentifier” : 9223372036854775807,

“commitKind” : “APPEND”,

“timeMillis” : 1684155393354,

“logOffsets” : { },

“totalRecordCount” : 1,

“deltaRecordCount” : 1,

“changelogRecordCount” : 0,

“watermark” : -9223372036854775808

}

清单列表包含快照的所有更改,baseManifestList 是应用 deltaManifestList 中的更改的基础文件。第一次提交将生成 1 个清单文件(manifest file),并创建 2 个清单列表(manifest list):

./T/manifest:

–deltaManifestList:包含对数据文件执行操作的清单条目列表(上图中的 manifest-list-1-delta)

manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1

–baseManifestList:空的(上图中的 manifest-list-1-base)

manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0

–清单文件:存储快照中数据文件的信息(上图中的manifest-1-0)

manifest-2b833ea4-d7dc-4de0-ae0d-ad76eced75cc-0

跨不同分区插入一批记录:

INSERT INTO T VALUES

(2, 10002, ‘varchar00002’, ‘20230502’),

(3, 10003, ‘varchar00003’, ‘20230503’),

(4, 10004, ‘varchar00004’, ‘20230504’),

(5, 10005, ‘varchar00005’, ‘20230505’),

(6, 10006, ‘varchar00006’, ‘20230506’),

(7, 10007, ‘varchar00007’, ‘20230507’),

(8, 10008, ‘varchar00008’, ‘20230508’),

(9, 10009, ‘varchar00009’, ‘20230509’),

(10, 10010, ‘varchar00010’, ‘20230510’);

第二次提交发生,执行 SELECT * FROM T 将返回 10 行。创建一个新快照,即 snapshot-2,并为我们提供以下物理文件布局:

% ls -atR .

./T:

dt=20230501

dt=20230502

dt=20230503

dt=20230504

dt=20230505

dt=20230506

dt=20230507

dt=20230508

dt=20230509

dt=20230510

snapshot

schema

manifest

./T/snapshot:

LATEST

snapshot-2

EARLIEST

snapshot-1

./T/manifest:

manifest-list-9ac2-5e79-4978-a3bc-86c25f1a303f-1 # delta manifest list for snapshot-2

manifest-list-9ac2-5e79-4978-a3bc-86c25f1a303f-0 # base manifest list for snapshot-2

manifest-f1267033-e246-4470-a54c-5c27fdbdd074-0 # manifest file for snapshot-2

manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1 # delta manifest list for snapshot-1

manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0 # base manifest list for snapshot-1

manifest-2b833ea4-d7dc-4de0-ae0d-ad76eced75cc-0 # manifest file for snapshot-1

./T/dt=20230501/bucket-0:

data-b75b7381-7c8b-430f-b7e5-a204cb65843c-0.orc

# each partition has the data written to bucket-0

./T/schema:

schema-0

截至 snapshot-2 的新文件布局如下所示:

img

2.10.2 删除数据

执行如下删除:

DELETE FROM T WHERE dt >= ‘20230503’;

第三次提交发生,它为我们提供了 snapshot-3。现在,列出表下的文件,您会发现没有分区被删除。相反,会为分区 20230503 到 20230510 创建一个新的数据文件:

./T/dt=20230510/bucket-0:

data-b93f468c-b56f-4a93-adc4-b250b3aa3462-0.orc # newer data file created by the delete statement

data-0fcacc70-a0cb-4976-8c88-73e92769a762-0.orc # older data file created by the insert statement

因为我们在第二次提交中插入一条记录(由 +I[10, 10010, ‘varchar00010’, ‘20230510’] 表示),然后在第三次提交中删除该记录。执行 SELECT * FROM T 将返回 2 行,即:

+I[1, 10001, ‘varchar00001’, ‘20230501’]

+I[2, 10002, ‘varchar00002’, ‘20230502’]

截至 snapshot-3 的新文件布局如下所示

img

manifest-3-0包含8个ADD操作类型的manifest条目,对应8个新写入的数据文件。

2.10.3 Compaction

小文件的数量会随着连续快照的增加而增加,这可能会导致读取性能下降。因此,需要进行full compaction以减少小文件的数量。

现在触发full-compaction:

./bin/flink run \

./lib/paimon-flink-action-0.5-SNAPSHOT.jar \

compact \

–path file:///tmp/paimon/default.db/T

所有当前表文件将被压缩,并创建一个新快照,即 snapshot-4,并包含以下信息:

{

“version” : 3,

“id” : 4,

“schemaId” : 0,

“baseManifestList” : “manifest-list-9be16-82e7-4941-8b0a-7ce1c1d0fa6d-0”,

“deltaManifestList” : “manifest-list-9be16-82e7-4941-8b0a-7ce1c1d0fa6d-1”,

“changelogManifestList” : null,

“commitUser” : “a3d951d5-aa0e-4071-a5d4-4c72a4233d48”,

“commitIdentifier” : 9223372036854775807,

“commitKind” : “COMPACT”,

“timeMillis” : 1684163217960,

“logOffsets” : { },

“totalRecordCount” : 38,

“deltaRecordCount” : 20,

“changelogRecordCount” : 0,

“watermark” : -9223372036854775808

}

截至 snapshot-4 的新文件布局如下所示

img

manifest-4-0 包含 20 个清单条目(18 个 DELETE 操作和 2 个 ADD 操作):

对于分区20230503到20230510,对两个数据文件进行两次DELETE操作

对于分区20230501到20230502,对同一个数据文件进行1次DELETE操作和1次ADD操作。

2.10.4 修改表

执行以下语句来配置full-compaction:

ALTER TABLE T SET (‘full-compaction.delta-commits’ = ‘1’);

它将为 Paimon 表创建一个新schema,即 schema-1,但在下一次提交之前还没有快照实际使用该schema。

2.10.5 过期快照

在快照过期的过程中,首先确定快照的范围,然后将这些快照内的数据文件标记为删除。仅当存在引用特定数据文件的类型为 DELETE 的清单条目时,数据文件才会被标记为删除。此标记可确保该文件不会被后续快照使用并可以安全删除。

假设上图中的所有 4 个快照都即将过期。过期流程如下:

它首先删除所有标记的数据文件,并记录任何更改的存储桶。

然后它会删除所有更改日志文件和关联的清单。

最后,它删除快照本身并写入最早的提示文件。

如果删除过程后有任何目录留空,它们也将被删除。

假设创建了另一个快照 snapshot-5 并触发了快照过期。 snapshot-1 到 snapshot-4 被删除。为简单起见,我们将只关注以前快照中的文件,快照过期后的最终布局如下所示:

img

结果,分区20230503至20230510被物理删除。

2.10.6 Flink 流式写入

用 CDC 摄取的示例来说明 Flink Stream Write。本节将讨论更改数据的捕获和写入 Paimon,以及异步Compaction和快照提交和过期背后的机制。

CDC 摄取工作流程以及所涉及的每个组件所扮演的独特角色:

img

(1)MySQL CDC Source统一读取快照和增量数据,分别由SnapshotReader读取快照数据和BinlogReader读取增量数据。

(2)Paimon Sink将数据写入桶级别的Paimon表中。其中的CompactManager将异步触发Compaction。

(3)Committer Operator 是一个单例,负责提交和过期快照。

端到端数据流:

img

MySQL Cdc Source读取快照和增量数据,并在规范化后将它们发送到下游:

img

Paimon Sink 首先将新记录缓冲在基于堆的 LSM 树中,并在内存缓冲区满时将它们刷新到磁盘。请注意,写入的每个数据文件都是Sorted Run。此时,还没有创建清单文件和快照。在 Flink 检查点发生之前,Paimon Sink 将刷新所有缓冲记录并向下游发送可提交消息,该消息在检查点期间由 Committer Operator 读取并提交:

img

在检查点期间,Committer Operator 将创建一个新快照并将其与清单列表关联起来,以便该快照包含有关表中所有数据文件的信息:

img

稍后可能会发生异步Compaction,CompactManager 生成的提交表包含有关先前文件和合并文件的信息,以便 Committer Operator 可以构造相应的清单条目。在这种情况下,Committer Operator 可能会在 Flink 检查点期间生成两个快照:

一个用于写入数据(Append 类型的快照),

另一个用于compact(Compact 类型的快照)。

如果在检查点间隔期间没有写入数据文件,则只会创建 Compact 类型的快照。 Committer Operator 将检查快照是否过期并执行标记数据文件的物理删除。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/809006.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

c++ 类的特殊成员函数:拷贝构造函数(四)

1. 简介 拷贝构造是一种特殊的构造函数&#xff0c;用于创建一个对象&#xff0c;该对象是从同一类中的另一个对象复制而来的。拷贝构造函数通常采用引用参数来接收要复制的对象&#xff0c;并使用该对象的副本来创建一个新对象。 2. 结构 class MyClass { public:MyClass(c…

一种新的基于区域的在线活动轮廓模型研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

SpringBoot热部署的开启与关闭

1、 开启热部署 &#xff08;1&#xff09;导入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId> </dependency>&#xff08;2&#xff09;设置 此时就搞定了。。。 2、…

TCP网络通信编程之网络上传文件

【图片】 【思路解析】 【客户端代码】 import java.io.*; import java.net.InetAddress; import java.net.Socket; import java.net.UnknownHostException;/*** ProjectName: Study* FileName: TCPFileUploadClient* author:HWJ* Data: 2023/7/29 18:44*/ public class TCPFil…

解决在云服务器开放端口号以后telnet还是无法连接的问题

这里用阿里云服务器举例&#xff0c;在安全组开放了对应的TCP端口以后。使用windows的cmd下的telnet命令&#xff0c;还是无法正常连接。 telnet IP地址 端口号解决方法1&#xff1a; 在轻量服务器控制台的防火墙规则中添加放行端口。 阿里云-管理防火墙 如图&#xff0c;开放…

Windows 11 下 OpenFace 2.2.0 的安装

写在前面 最近需要做关于面部的东西&#xff0c;所以需要使用到OpenFace这个工具&#xff0c;本文仅用来记录本人安装过程以供后续复现&#xff0c;如果可以帮助到读者也是非常荣幸。 安装过程 不编译直接使用 这种方法可以直接从官方下载下来编译好的exe以及gui进行使用&a…

1000Wqps生产级IM,怎么架构?

前言 在40岁老架构师 尼恩的读者社区(50)中&#xff0c;很多小伙伴拿高薪&#xff0c;完成架构的升级&#xff0c;进入架构师赛道&#xff0c;打开薪酬天花板。 然后&#xff0c;在架构师的面试过程中&#xff0c;常常会遇到IM架构的问题&#xff1a; 如果要你从0到1做IM架构…

python与深度学习(十):CNN和cifar10二

目录 1. 说明2. cifar10的CNN模型测试2.1 导入相关库2.2 加载数据和模型2.3 设置保存图片的路径2.4 加载图片2.5 图片预处理2.6 对图片进行预测2.7 显示图片 3. 完整代码和显示结果4. 多张图片进行测试的完整代码以及结果 1. 说明 本篇文章是对上篇文章训练的模型进行测试。首…

Flutter 使用texture_rgba_renderer实现桌面端渲染视频

Flutter视频渲染系列 第一章 Android使用Texture渲染视频 第二章 Windows使用Texture渲染视频 第三章 Linux使用Texture渲染视频 第四章 全平台FFICustomPainter渲染视频 第五章 Windows使用Native窗口渲染视频 第六章 桌面端使用texture_rgba_renderer渲染视频&#xff08;本…

MySQL高级篇第3章(用户与权限管理)

文章目录 1、用户管理1.1 登录MySQL服务器1.2 创建用户1.3 修改用户1.4 删除用户1.5 设置当前用户密码1.6 修改其他用户密码1.7 MySQL8密码管理 2、权限管理2.1 全新列表2.2 授予权限的原则2.3 授予权限2.4 查看权限2.5 收回权限 3、权限表3.1 user表3.2 db表3.3 tables_priv表…

ssti总结转载

一、初识SSTI 1、什么是SSTI&#xff1f; SSTI就是服务器端模板注入(Server-Side Template Injection)&#xff0c;实际上也是一种注入漏洞。 可能SSTI对大家而言不是很熟悉&#xff0c;但是相信大家很熟悉SQL注入。实际上这两者的思路都是相同的&#xff0c;因此可以类比来分…

【嵌入式Linux系统开发】——系统移植概述

目录 &#x1f349;&#x1f349;一、什么是嵌入式系统 &#x1f349;&#x1f349;二、嵌入式系统操作 &#x1f349;&#x1f349;三、嵌入式Linux的特点 &#x1f349;&#x1f349;四、嵌入式系统的组成 1、硬件和软件 2、硬件层 3、中间层 4、软件层 5、 功能层与执…

手动创建一张“资产负债表”和“利润表”

1. 前言 了解了“复式记账法”&#xff0c;以及“增值税”等概念后&#xff0c;让我们通过一个简化的例子&#xff0c;来手动创建一张资产负债表和利润表&#xff0c;进而加深对于记账和这两种报表的理解。 2. 手动创建财务报表 2.1 期初余额 假设某公司的2022年度期初余额…

找不到vcruntime140.dll无法继续执行代码怎么办?(详解)

1.vcruntime140.dll是什么&#xff1f;有什么作用&#xff1f; vcruntime140.dll是Windows操作系统中的一个动态链接库文件&#xff0c;它属于Microsoft Visual C Redistributable的一部分。DLL是Dynamic Link Library的缩写&#xff0c;它包含了一系列函数和资源&#xff0c;…

C计数问题---2023河南萌新联赛第(三)场:郑州大学

解析&#xff1a; n 可以分成两个数&#xff0c;记录每个数的因子对数&#xff0c;乘起来即可。 注意当因子相同时&#xff0c;只1 #include<bits/stdc.h> using namespace std; int n,res; int main(){cin>>n;for(int i1;i<n;i){int xi,yn-i;int cnt10,cnt20;…

【周末闲谈】剑指offer,了解面试,学会面试

我们在找工作时&#xff0c;需要结合自己的现状&#xff0c;针对意向企业做好充分准备。作为程序员&#xff0c;你有哪些面试IT技术岗的技巧&#xff1f; 你可以从一下几个方向谈谈你的想法和观点。 个人主页&#xff1a;【&#x1f60a;个人主页】 系列专栏&#xff1a;【❤️…

DHCP中继代理原理(第二十八课)

当客户机和DHCP服务器不在一个广播域时,DHCP服务器无法接收到客户机的DHCP discover广播数据包,客户机就无法获得IP地址 第一步配置DHCP服务器的信息 <Huawei>u t m //清除日志 Info: Current terminal monitor is off. <Huawei>sys [Huawei]sysname DHCP-R…

RBAC三级菜单实现(从前端到后端)未完待续

1、菜单设计 2、前端路由 根据不同的用户id显示不同的菜单 一个是找 一个是路由 3、多级菜单 展示所有权限&#xff0c;并且根据当前用户id展示它所属的角色的所有菜单。 前端树状展示 太牛逼了&#xff01; 思路&#xff1a; 后端&#xff1a;传给前端map&#xff0…

Android AIDL 使用

工程目录图 请点击下面工程名称&#xff0c;跳转到代码的仓库页面&#xff0c;将工程 下载下来 Demo Code 里有详细的注释 代码&#xff1a;LearnAIDL代码&#xff1a;AIDLClient. 参考文献 安卓开发学习之AIDL的使用android进阶-AIDL的基本使用Android AIDL 使用使用 AIDL …

Matlab Image Processing toolbox 下载安装方法

当安装好Matlab之后&#xff0c;发现没有Image Processing toolbox这个图像处理工具箱 从新安装一遍&#xff0c; 选上 Image Processing toolbox 但是不用选matlab即可 1.找到之前安装时的Setup安装程序包&#xff0c;按照之前安装Matlab步骤&#xff0c;到选择需要安装的Ma…