Apache Paimon 文件操作

news2024/11/18 7:31:19

本文旨在澄清不同文件操作对文件的影响。

本页面提供具体示例和实用技巧,以有效地管理这些操作。此外,通过对提交(commit)和压实(compact)等操作的深入探讨,我们旨在提供有关文件创建和更新的见解。

前提

对以下几篇有了解:
1、Apache Paimon 介绍
2、Apache Paimon 基础概念
3、Apache Paimon 文件布局设计
4、知道如何在 Flink 中使用 Paimon

创建 catalog

在 Flink lib 中放入 paimon-flink 依赖包,执行  ./sql-client.sh 启动 Flink SQL Client,然后执行下面的命令去创建 Paimon catlog:

CREATE CATALOG paimon WITH (
'type' = 'paimon',
'warehouse' = 'file:///tmp/paimon'
);

USE CATALOG paimon;

执行完后会在 file:///tmp/paimon 路径下创建目录 default.db

创建 Table

执行下面的命令会创建一个带有 3 个属性的 Paimon 表:

CREATE TABLE T (
  id BIGINT,
  a INT,
  b STRING,
  dt STRING COMMENT 'timestamp string in format yyyyMMdd',
  PRIMARY KEY(id, dt) NOT ENFORCED
) PARTITIONED BY (dt);

执行后,Paimon 表 T 会在 /tmp/paimon/default.db/T 目录下生成目录,它的 schema 会存放在目录 /tmp/paimon/default.db/T/schema/schema-0 下。

c37214dd4edc3b3e73604a12c1d91d38.png

写入数据到 Table

INSERT INTO T VALUES (1, 10001, 'varchar00001', '20230501');
5ee958a74e2c7719d687bf90f4242668.png

用户可以通过执行查询 SELECT * FROM T 来验证这些记录的可见性,该查询将返回一行结果。提交过程会创建一个位于路径 /tmp/paimon/default.db/T/snapshot/snapshot-1 的快照。快照-1 的文件布局如下所述:

一旦任务运行完成变成 finished 时,提交成功后数据就写入到 Paimon 表中。用户可以通过执行查询 SELECT * FROM T 来验证数据的可见性,该查询将返回一行结果。

另外可以发现目录的结构发生如下变化,新增 dt=20230501manifestsnapshot三个目录。三个的目录结构如下:

10beb60936705a7907e03ffa1169a724.png

查询目录下的数据如下所示:

4ac9fbff8bda08899c8c86c09a8ca923.png

这是因为提交过程中会创建一个位于 /tmp/paimon/default.db/T/snapshot/snapshot-1 的快照。snapshot-1 的文件布局如下所述:

ac5165245eb8114a939e11e1b51262b7.png
snapshot-1的内容包含了这个 snapshot 的元数据,比如 manifest list  和 schema id:

{
  "version" : 3,
  "id" : 1,
  "schemaId" : 0,
  "baseManifestList" : "manifest-list-989f0744-4419-48dd-b0fd-955343e6e803-0",
  "deltaManifestList" : "manifest-list-989f0744-4419-48dd-b0fd-955343e6e803-1",
  "changelogManifestList" : null,
  "commitUser" : "5132ef16-41ec-4172-8bf4-01a304507b36",
  "commitIdentifier" : 9223372036854775807,
  "commitKind" : "APPEND",
  "timeMillis" : 1697080282120,
  "logOffsets" : { },
  "totalRecordCount" : 1,
  "deltaRecordCount" : 1,
  "changelogRecordCount" : 0,
  "watermark" : -9223372036854775808
}

需要提醒的是,manifest list 包含了 snapshot 的所有更改,baseManifestList 是应用在 deltaManifestList 中的更改所基于的基本文件。第一次提交将导致生成 1 个清单文件,并创建了 2 个清单列表(文件名可能与你的实验中的不同):

➜  T 
manifest-232271e3-f294-4556-8947-ee87483c3bfd-0
manifest-list-989f0744-4419-48dd-b0fd-955343e6e803-0
manifest-list-989f0744-4419-48dd-b0fd-955343e6e803-1
6636815cd4bff6ef359c6e0e46705656.png
image.png

manifest-232271e3-f294-4556-8947-ee87483c3bfd-0: 如前面文件布局图中的 manifest-1-0,存储了关于 snapshot 中数据文件信息的 manifest。

cf9bbbbb14fdab1b0ffdb68af60e2b09.png

manifest-list-989f0744-4419-48dd-b0fd-955343e6e803-0: 是基础的 baseManifestList,如前面文件布局图中的 manifest-list-1-base,实际上是空的。

00210ba45c64d5d1a01f54ce88389212.png

manifest-list-989f0744-4419-48dd-b0fd-955343e6e803-1:是deltaManifestList,如前面文件布局图中的 manifest-list-1-delta,其中包含一系列对数据文件执行操作的清单条目,而在这种情况下,清单条目为 manifest-1-0

5254d85be145f7145c1a4d0176519df7.png

在不同分区中插入一批记录,在 Flink SQL 中执行以下语句:

INSERT INTO T VALUES 
(2, 10002, 'varchar00002', '20230502'),
(3, 10003, 'varchar00003', '20230503'),
(4, 10004, 'varchar00004', '20230504'),
(5, 10005, 'varchar00005', '20230505'),
(6, 10006, 'varchar00006', '20230506'),
(7, 10007, 'varchar00007', '20230507'),
(8, 10008, 'varchar00008', '20230508'),
(9, 10009, 'varchar00009', '20230509'),
(10, 10010, 'varchar00010', '20230510');
78c1c0510ee7690053425a27d2f2c475.png

等任务执行完成并提交快照后,执行 SELECT * FROM T 将返回 10 行数据。

53935a35ecbf59ebf67b1e59f904d4ac.png

创建了一个新的快照,即 snapshot-2,并给出了以下物理文件布局:

ac71d4fd4c53aa07e60a8f4e0b80d2f8.png

dt=20230501
dt=20230502
dt=20230503
dt=20230504
dt=20230505
dt=20230506
dt=20230507
dt=20230508
dt=20230509
dt=20230510
manifest
schema
snapshot


➜  T ll snapshot
EARLIEST
LATEST
snapshot-1
snapshot-2

➜  T ll manifest
manifest-232271e3-f294-4556-8947-ee87483c3bfd-0 //snapshot-1 manifest file
manifest-list-989f0744-4419-48dd-b0fd-955343e6e803-0 //snapshot-1 baseManifestList
manifest-list-989f0744-4419-48dd-b0fd-955343e6e803-1 //snapshot-1 deltaManifestList

manifest-ed94ba0c-e71f-4a01-863c-479b34af2551-0 //snapshot-2 manifest file
manifest-list-03bad670-05ae-48b2-b88c-de631ad14333-0 //snapshot-2 baseManifestList
manifest-list-03bad670-05ae-48b2-b88c-de631ad14333-1 snapshot-2 baseManifestList

新的快照文件布局图如下:

d83d7ca3ede10fcff688245a1649159a.png

删除数据

接下来删除满足条件 dt>=20230503 的数据。在 Flink SQL Client 中,执行以下语句:

DELETE FROM T WHERE dt >= '20230503';

注意⚠️:

1、需使用 Flink 1.17 及以上版本,否则不支持该语法

Flink SQL> DELETE FROM T WHERE dt >= '20230503';
>
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Unsupported query: DELETE FROM T WHERE dt >= '20230503';
3f6e34d0e4fa8108ee248f35d9e333e0.png

2、使用 batch 模式,否则报错不支持

Flink SQL> DELETE FROM T WHERE dt >= '20230503';
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: DELETE statement is not supported for streaming mode now.
8cba985e4a6077c4c8d03e0fcd7ccae1.png

需要设置 SET 'execution.runtime-mode' = 'batch';

f6ae65f4d88ef90106d7f7a91be18d20.png

cfe389a547190d230108158d2d4bdc7f.png

第三次提交完成,并生成了快照 snapshot-3。现在,表下的目录,会发现没有分区被删除。相反,为分区 20230503 到 20230510 创建了一个新的数据文件:

71d27104f729d5dafd9f49a3bc265894.png

➜  T ll dt=20230510/bucket-0

data-0531fa1e-6ff1-47ed-aeea-3a01896c9698-0.orc # newer data file created by the delete statement 
data-0c8a6a16-13c5-4049-b4ed-e968d34e20ae-0.orc # older data file created by the insert statement

这是有道理的,因为我们在第二次提交中插入了一条数据(为 +I[10, 10010, 'varchar00010', '20230510'] ),然后在第三次提交中删除了数据。现在再执行一下 SELECT * FROM T只能查询到两条数据。

+I[1, 10001, 'varchar00001', '20230501']
+I[2, 10002, 'varchar00002', '20230502']
9e2bfece666a555f0c787c08db76be83.png

f54388d8ebd37539035300e9f3b61db4.png

snapshot-3后新的文件布局如下图:

cf60933c659066f1353336aa7d6d9654.png

9625cbe157d4995a7c90f8214bbc9a69.png

4dfa9936a847f42d9e550daadae4c426.png

请注意,manifest-3-0 包含了 8 个 ADD 操作类型的清单条目,对应着 8 个新写入的数据文件。

ae60db9dd79c4b573cedd100e82d1320.png

Compact Table

你可能已经注意到的,随着连续 snapshot 的增加,小文件的数量会增加,这可能会导致读取性能下降。因此,需要进行全量压缩以减少小文件的数量。

通过 flink run 运行一个专用的合并小文件任务:

<FLINK_HOME>/bin/flink run \
    -D execution.runtime-mode=batch \
    ./paimon-flink-action-0.6-SNAPSHOT.jar \
    compact \
    --warehouse <warehouse-path> \
    --database <database-name> \ 
    --table <table-name> \
    [--partition <partition-name>] \
    [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
    [--table-conf <paimon-table-dynamic-conf> [--table-conf <paimon-table-dynamic-conf>] ...]

例如(提前下载好压缩任务 jar 在 Flink 客户端下):

./bin/flink run \
   -D execution.runtime-mode=batch \
    ./paimon-flink-action-0.6-20231012.001913-36.jar \
    compact \
    --path file:///tmp/paimon/default.db/T
e5563a9678ae637da9037c652ed4d9fc.png

d3e949f396eadd31eb1e8bc113d5b85d.png

压缩任务结束后,再来看下 snapshot-4 文件结构:

{
  "version" : 3,
  "id" : 4,
  "schemaId" : 0,
  "baseManifestList" : "manifest-list-00af18ef-486d-4046-be60-25533e078333-0",
  "deltaManifestList" : "manifest-list-00af18ef-486d-4046-be60-25533e078333-1",
  "changelogManifestList" : null,
  "commitUser" : "ab094a14-17e0-4215-8e79-cd0651436dee",
  "commitIdentifier" : 9223372036854775807,
  "commitKind" : "COMPACT",
  "timeMillis" : 1697102418177,
  "logOffsets" : { },
  "totalRecordCount" : 2,
  "deltaRecordCount" : -16,
  "changelogRecordCount" : 0,
  "watermark" : -9223372036854775808
}
dae13a84c0deafcf77d7b3459948e9b7.png

16d2ce5a4f4b9c41ea7bbb10b07c9f4e.png

manifest-4-0 包含 20 个清单条目(18 个 DELETE 操作和 2 个 ADD 操作):

  • 对于分区 20230503 到 20230510,有两个删除操作,对应两个数据文件

  • 对于分区 20230501 到 20230502,有一个删除操作和一个添加操作,对应同一个数据文件。

Alter Table

执行以下语句来配置全量压缩:

ALTER TABLE T SET ('full-compaction.delta-commits' = '1');

这将为 Paimon 表创建一个新的 schema,即 schema-1,但在下一次提交之前,不会有任何 snapshot 使用此模式。

3e105c1f595d8d263e76ea29b28e59ff.png

过期 snapshot

请注意,标记为删除的数据文件直到 snapshot 过期且没有任何消费者依赖于该 snapshot 时才会真正被删除。参考 Manage Snapshots 可以查阅更多信息。

在 snapshot 过期过程中,首先确定 snapshot 的范围,然后标记这些 snapshot 内的数据文件以进行删除。只有当存在引用特定数据文件的 DELETE 类型的清单条目时,才会标记该数据文件进行删除。这种标记确保文件不会被后续的 snapshot 使用,并且可以安全地删除。

假设上图中的所有 4 个 snapshot 即将过期。过期过程如下:

1、首先删除所有标记为删除的数据文件,并记录任何更改的 bucket。
2、然后删除所有的 changelog 文件和关联的 manifests。
3、最后,删除快照本身并写入最早的提示文件。

如果删除过程后留下的空目录,也将被删除。

假设创建了另一个快照 snapshot-5,并触发了快照过期。将删除 snapshot-1 到 snapshot-4。为简单起见,我们只关注以前快照的文件,快照过期后的最终布局如下:

20b502dfeebe97fd3c1763b3935a4233.png
因此,分区 20230503 到 20230510 的数据被物理删除了。

Flink 流式写入

我们通过利用 CDC 数据的示例来测试 Flink 流式写入,将介绍将变更数据捕获并写入 Paimon 的过程,以及异步压缩、快照提交和过期的机制背后的原理。帮我们更详细地了解 CDC 数据摄取的工作流程以及每个参与组件所扮演的独特角色。

d0a88a3857ccb937c6ce88ebb9d27e83.png

1、MySQL CDC Source 统一读取快照数据和增量数据,其中 SnapshotReader 读取快照数据,而 BinlogReader 读取增量数据。
2、Paimon Sink 将数据按 Bucket 级别写入 Paimon 表中。其中的 CompactManager 将异步触发压缩操作。
3、Committer Operator 是一个单例,负责提交和过期快照。

接下来,我们将逐步介绍端到端的数据流程:

0c81aca22d6de1f13cd71dd5e312c7de.png

MySQL CDC Source 首先读取快照数据和增量数据,然后对它们进行规范化处理,并将其发送到下游。

f1c5797ae530bdb31755ef8f5d4d07e6.png

Paimon Sink 首先将新记录缓存在基于堆的 LSM 树中,并在内存缓冲区满时将其 flush 到磁盘上。请注意,每个写入的数据文件都是一个 sorted run。在这个阶段,还没有创建 manifest 文件和 snapshot。在 Flink 执行 Checkpoint 之前,Paimon Sink 将 flush 所有缓冲的记录并发送可提交的消息到下游,下游会在 Checkpoint 期间由 Committer Operator 读取并提交。

b439f35f502c32b49c12c9d4e318a091.png

在 Checkpoint 期间,Committer Operator 将创建一个新的 snapshot,并将其与 manifest lists 关联,以便snapshot 包含表中所有数据文件的信息。

a91a006ef76561aab533def80691a5c8.png

稍等一会后,可能会进行异步压缩,CompactManager 生成的可提交消息包含有关先前文件和合并文件的信息,以便 Committer Operator 可以构建相应的 manifest entries。在这种情况下,Committer Operator 在Flink Checkpoint 期间可能会生成两个 snapshot,一个用于写入的数据(类型为 Append 的快照),另一个用于压缩(类型为 Compact 的快照)。如果在 Checkpoint 间隔期间没有写入数据文件,则只会创建类型为Compact 的快照。Committer Operator 将检查 snapshot 的过期情况,并对标记为删除的数据文件执行物理删除操作。

b2863a1c2486c6419bf27704d9ec5ff4.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1438078.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

006集——where语句进行属性筛选——arcgis

在arcgis中&#xff0c; dBASE 文件除了 WHERE 语句以外&#xff0c;不支持 其它 SQL 命令。选择窗口如下&#xff1a; 首先&#xff0c;我们了解下什么是where语句。 WHERE语句是SQL语言中使用频率很高的一种语句。它的作用是从数据库表中选择一些特定的记录行来进行操作。WHE…

第二证券:沪指涨近1%收复2800点,券商等板块拉升,稀土板块爆发

7日早盘&#xff0c;两市股指延续昨日强势&#xff0c;再度拉升。沪指涨近1%克复2800点&#xff0c;深成指、科创50指数大涨约3%&#xff1b;两市半日成交超6000亿元&#xff0c;北向资金净买入超20亿元。 截至午间收盘&#xff0c;沪指涨0.91%报2814.89点&#xff0c;深成指涨…

第1章 认识Flask

学习目标 了解Flask框架&#xff0c;能够说出Flask框架的发展史以及特点 熟悉隔离Python环境的创建方式&#xff0c;能够独立在计算机上创建隔离的Python环境 掌握Flask的安装方式&#xff0c;能够独立在计算机上安装Flask框架 掌握PyCharm配置隔离环境的方式&#xff0c;能…

电脑文件误删除怎么办?8个恢复软件解决电脑磁盘数据可能的误删

您是否刚刚发现您的电脑磁盘数据丢失了&#xff1f;不要绝望&#xff01;无论分区是否损坏、意外格式化或配置错误&#xff0c;存储在其上的文件都不一定会丢失到数字深渊。 我们已经卷起袖子&#xff0c;深入研究电脑分区恢复软件的广阔领域&#xff0c;为您带来一系列最有效…

如何在 emacs 上开始使用 Tree-Sitter (archlinux)

文章目录 如何在emacs上开始使用Tree-Sitter&#xff08;archlinux&#xff09; 如何在emacs上开始使用Tree-Sitter&#xff08;archlinux&#xff09; 在archlinux上使用比windows上不知道要方便多少倍&#xff01; $ sudo pacman -S emacs $ sudo pacman -S tree-sitter这里…

国内首个openEuler师训营圆满结营! 麒麟信安助力培养国产操作系统高质量师资人才

2024年1月22日&#xff0c;全国首个openEuler师训营圆满结营&#xff01;旨在深化产教融合&#xff0c;加速开源教育走进高校&#xff0c;提高师资队伍openEuler专业能力及实践教学水平。 本次师训营由长沙市大数据产业链、长沙市新一代自主安全计算系统产业链指导&#xff0c…

RxJava Subject

目录 AsyncSubjectBehaviorSubjectPublishSubjectReplaySubjectSerializedSubjectUnicastSubject 在Rxjava中&#xff0c; Subject可以同时表示Observer和Observable, 允许从单个源到多个子观察者multiple child Observers。 除了 onSubscribe(io.reactivex.disposables.Dispos…

云计算运维1

1、企业服务器LNMP环境搭建 集群&#xff1a;多台服务器在一起作同样的事 。分布式 &#xff1a;多台服务器在一起作不同的事 。 环境准备&#xff1a; 1、设置静态ip&#xff08;NAT模式网关为.2&#xff09; # cat /etc/sysconfig/network-scripts/ifcfg-ens33 TYPE"E…

【C生万物】C语言分支和循环语句

&#x1f4da;博客主页&#xff1a;爱敲代码的小杨. ✨专栏&#xff1a;《Java SE语法》 | 《数据结构与算法》 | 《C生万物》 ❤️感谢大家点赞&#x1f44d;&#x1f3fb;收藏⭐评论✍&#x1f3fb;&#xff0c;您的三连就是我持续更新的动力❤️ &#x1f64f;小杨水平有…

uniapp /微信小程序 使用map组件实现手绘地图方案

获取地图范围 点图拾取坐标-地图开放平台|腾讯位置服务 获取需要手绘地图左下角和右上角GPS坐标 以北京故宫为例&#xff1a; 截取需要手绘地图进行手绘地图制作 ​​​​​​​​​​​​​​ 素材处理 由于地图素材文件比较大&#xff0c;小程序又限制包大小<2M,无…

51单片机基础:定时器

1.定时器介绍 51单片机通常有两个定时器&#xff1a;定时器 0/1&#xff0c;好一点的可能有定时器3。 在介绍定时器之前我们先科普下几个知识&#xff1a; 1&#xff0c;CPU 时序的有关知识 ①振荡周期&#xff1a;为单片机提供定时信号的振荡源的周期&#xff08;晶振周期或…

RAPTOR:树组织检索的递归抽象处理

RAPTOR: RECURSIVE ABSTRACTIVE PROCESSING FOR TREE-ORGANIZED RETRIEVAL Title&#xff1a;树组织检索的递归抽象处理 https://arxiv.org/pdf/2401.18059.pdf 摘要 检索增强语言模型可以更好的融入长尾问题&#xff0c;但是现有的方法只检索短的连续块&#xff0c;限制了整…

深度测评:ONLYOFFICE 桌面编辑器 v8.0新功能

目录 前言 一、PDF表单处理&#xff1a;提升办公效率 二、RTL&#xff08;从右到左&#xff09;支持&#xff1a;满足不同语言习惯 三、Moodle集成&#xff1a;教育行业的新助力 四、本地界面主题&#xff1a;个性化办公体验 五、性能优化与稳定性提升 六、性能与稳定性…

Ubuntu Linux使用PL2302串口和minicom进行开发板调试

调试远程的服务器上面的BMC&#xff0c;服务器上面安装了Ubuntu&#xff0c;想着可以在服务器接个串口到BMC&#xff0c;然后SSH到服务器的Ubuntu&#xff0c;用minicom来查看串口信息。 准备&#xff1a; 服务器Ubuntu安装mimicom 本机可以ssh到Ubuntu 串口工具PL2302 或者CH3…

React+Antd+tree实现树多选功能(选中项受控+支持模糊检索)

1、先上效果 树型控件&#xff0c;选中项形成一棵新的树&#xff0c;若父选中&#xff0c;子自动选中&#xff0c;子取消&#xff0c;父不取消&#xff0c;子选中&#xff0c;所有的父节点自动取消。同时支持模糊检索&#xff0c;会检索出所有包含该内容的关联节点。 2、环境准…

【iOS ARKit】人形遮挡

人形遮挡简介 在 AR系统中&#xff0c;计算机通过对设备摄像头采集的图像进行视觉处理和组织&#xff0c;建立起实景空间&#xff0c;然后将生成的虚拟对象依据几何一致性原理嵌入到实景空间中&#xff0c;形成虚实融合的增强现实环境&#xff0c;再输出到显示系统中呈现给使用…

6.electron之上下文隔离,预加载JS脚本

如果可以实现记得点赞分享&#xff0c;谢谢老铁&#xff5e; Electron是一个使用 JavaScript、HTML 和 CSS 构建桌面应用程序的框架。 Electron 将 Chromium 和 Node.js 嵌入到了一个二进制文件中&#xff0c;因此它允许你仅需一个代码仓库&#xff0c;就可以撰写支持 Windows、…

白嫖10款游戏加速器,一年都不用开会员!

过年期间你们是走亲串戚还是窝家玩游戏、追剧&#xff1f;相信很多小伙伴都不会放过这个难得的假期&#xff0c;肯定是会百忙之中来两把的&#xff0c;那么人一多玩游戏肯定就会拥堵&#xff0c;有延迟。解决延迟最好的办法就是用加速器&#xff0c;当你的网络比别人强时&#…

Rust通用代码生成器莲花发布红莲尝鲜版二十一,前端代码生成物有巨大改进

Rust通用代码生成器莲花发布红莲尝鲜版二十一&#xff0c;前端代码生成物有巨大改进 Rust通用代码生成器莲花已发布红莲尝鲜版二十一&#xff0c;此版本采用了新的前端代码生成引擎&#xff1a;时空之门前端代码生成器6.2.0。此引擎支持Nodejs 21,Nodejs 18和Nodejs 14。消除了…

开源软件:技术创新与应用的推动力量

文章目录 每日一句正能量前言开源软件如何推动技术创新开源软件的历史开源软件的开发模式开源软件与闭源软件源代码和开发许可维护特点、支持和成本开源软件的优势减少开支可定制性快速创新发展透明度和安全性 开源软件的应用 常见问题后记 每日一句正能量 不好等待运气降临&am…