调研Iceberg中表的原地演变
文章目录
- 调研Iceberg中表的原地演变
- 原生非分区表
- 文件关系图
- 表的原地演变之表schema演变
- 新增字段new_column
- 文件关系变化图
- 为新增字段写入数据
- 文件关系变化图
- 删除新增字段
- 文件关系变化图
- 新增字段new_column2
- 文件关系变化图
- 删除数据
- 文件关系变化图
- 原生分区表
- Iceberg支持如下几种分区转换
- 文件关系变化图
- 表的原地演变之分区演变
- 新增分区
- 文件关系变化图
- 删除分区
- 删除数据
- 文件关系变化图
- 小结
以《基于spark3.4.2+iceberg1.6.1搭建本地阅读调试环境》为基础环境,调研原地演变特性
工程中iceberg_warehouse
是spark.sql.catalog.local.warehouse
指定了 Iceberg 数据文件和元数据文件的存放路径。
原生非分区表
创建非分区原生表,并插入数据。
// 1.创建库
spark.sql("create database iceberg_db");
// 2.新建表
spark.sql("CREATE TABLE local.iceberg_db.table1 (id bigint, data string) USING iceberg ");
// 3.第1次新增数据
spark.sql("INSERT INTO local.iceberg_db.table1 VALUES (1, 'a'), (2, 'b'), (3, 'c')");
// 4.第2次新增数据
spark.sql("INSERT INTO local.iceberg_db.table1 VALUES (4, 'd'), (5, 'e'), (6, 'f')");
// 5.第3次新增数据
spark.sql("INSERT INTO local.iceberg_db.table1 VALUES (7, 'g'), (8, 'h'), (9, 'i')");
文件关系图
新建表时,会触发元数据的变化,此时是没有数据文件的,所以只有v1.metadata.json
文件。
snap-
开头的是清单列表文件(manifest list)- 紧接着snap之后的数字开头的是清单文件(manifest file)
表的原地演变之表schema演变
新增字段new_column
// 6.新增字段new_column
spark.sql("ALTER TABLE local.iceberg_db.table1 " +
"ADD COLUMNS ( new_column string comment 'new_column docs' )");
文件关系变化图
v5.metadata.json
中 schemas
以数组的形式记录了不同的表schema。以schema-id
区分。new_column
字段上有对应的字段id3
。current-schema-id
中是当前生效的schema-id
。
"current-schema-id" : 1,
"schemas" : [ {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "id",
"required" : false,
"type" : "long"
}, {
"id" : 2,
"name" : "data",
"required" : false,
"type" : "string"
} ]
}, {
"type" : "struct",
"schema-id" : 1,
"fields" : [ {
"id" : 1,
"name" : "id",
"required" : false,
"type" : "long"
}, {
"id" : 2,
"name" : "data",
"required" : false,
"type" : "string"
}, {
"id" : 3,
"name" : "new_column",
"required" : false,
"type" : "string",
"doc" : "new_column docs"
} ]
} ]
为新增字段写入数据
// 7.为新增字段new_column增加数据
spark.sql("INSERT INTO local.iceberg_db.table1 VALUES (10, 'j','new1'), (11, 'k','new2'), (12, 'l','new3')");
Dataset<Row> result = spark.sql("select * from local.iceberg_db.table1");
result.show();
查询结果
表的schema中新增字段在之前的记录以null
填充展示。
+---+----+----------+
| id|data|new_column|
+---+----+----------+
| 7| g| null|
| 8| h| null|
| 9| i| null|
| 1| a| null|
| 2| b| null|
| 3| c| null|
| 10| j| new1|
| 11| k| new2|
| 12| l| new3|
| 4| d| null|
| 5| e| null|
| 6| f| null|
+---+----+----------+
文件关系变化图
删除新增字段
// 8.删除字段new_column
spark.sql("ALTER TABLE local.iceberg_db.table1 DROP COLUMNS new_column");
Dataset<Row> result = spark.sql("select * from local.iceberg_db.table1");
result.show();
查询结果
表schema的删除字段在,之前的记录全部除了删除字段,全部可以查询展示。
+---+----+
| id|data|
+---+----+
| 4| d|
| 5| e|
| 6| f|
| 1| a|
| 2| b|
| 3| c|
| 10| j|
| 11| k|
| 12| l|
| 7| g|
| 8| h|
| 9| i|
+---+----+
文件关系变化图
v7.metadata.json
中current-schema-id
中是当前生效的schema-id
改为了0
。
新增字段new_column2
// 9.删除字段new_column
spark.sql("ALTER TABLE local.iceberg_db.table1 " +
"ADD COLUMNS ( new_column2 string comment 'new_column2 docs' )");
文件关系变化图
v8.metadata.json
中schemas
的变化,删除字段new_column
的id3
,不会再之后新增的new_column2
不会再使用了。
"current-schema-id" : 2,
"schemas" : [ {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "id",
"required" : false,
"type" : "long"
}, {
"id" : 2,
"name" : "data",
"required" : false,
"type" : "string"
} ]
}, {
"type" : "struct",
"schema-id" : 1,
"fields" : [ {
"id" : 1,
"name" : "id",
"required" : false,
"type" : "long"
}, {
"id" : 2,
"name" : "data",
"required" : false,
"type" : "string"
}, {
"id" : 3,
"name" : "new_column",
"required" : false,
"type" : "string",
"doc" : "new_column docs"
} ]
}, {
"type" : "struct",
"schema-id" : 2,
"fields" : [ {
"id" : 1,
"name" : "id",
"required" : false,
"type" : "long"
}, {
"id" : 2,
"name" : "data",
"required" : false,
"type" : "string"
}, {
"id" : 4,
"name" : "new_column2",
"required" : false,
"type" : "string",
"doc" : "new_column2 docs"
} ]
} ]
删除数据
// 10.删除字段new_column
spark.sql("DELETE FROM local.iceberg_db.table1 where id in (2,5,10)");
Dataset<Row> result = spark.sql("select * from local.iceberg_db.table1");
result.show();
文件关系变化图
不同版本的metadata
文件会使用不同的清单文件指向相同的数据文件,清单文件(manifest file)中的status
字段取值说明,值1
代表add,值2
代表删除。
原生分区表
Iceberg支持如下几种分区转换
转换名称 | 描述 | 源字段类型 | 结果类型 |
---|---|---|---|
identity | id值,默认没有转换函数。注意:如果用时间戳做为分区的话,每个时间戳是一个分区,随着数据的写入,元数据很快会崩溃 | Any | Source type |
bucket[N] | 哈希值],模N | int , long , decimal , date , time , timestamp , timestamptz , timestamp_ns , timestamptz_ns , string , uuid , fixed , binary | int |
truncate[W] | 将字段按宽度截取 | int , long , decimal , string , binary | 与源字段类型一致,如果源字段是字符串则截取W长度,如果是int/long则相除W倍后取整 |
year | 将时间转换为年 | date , timestamp , timestamptz , timestamp_ns , timestamptz_ns | int |
month | 将时间转换为月 | date , timestamp , timestamptz , timestamp_ns , timestamptz_ns | int |
day | 将时间转换为日 | date , timestamp , timestamptz , timestamp_ns , timestamptz_ns | int |
hour | 将时间转换为小时 | timestamp , timestamptz , timestamp_ns , timestamptz_ns | int |
void | Always produces null | Any | Source type or int |
创建分区原生表,使用分区转换进行隐藏分区,并插入数据。
// 1.创建分区表,以month方法进行隐藏式分区
spark.sql("CREATE TABLE local.iceberg_db.table2( id bigint, data string, ts timestamp) USING iceberg PARTITIONED BY (month(ts))");
// 2.新增数据
spark.sql("INSERT INTO local.iceberg_db.table2 VALUES (1, 'a', cast(1727601585 as timestamp)),(2, 'b', cast(1724923185 as timestamp)),(3, 'c', cast(1724919585 as timestamp))");
文件关系变化图
Iceberg 通过获取列值并对其进行可选转换来生成分区值。建表时,ts
字段类型是使用timestamp
,默认使用带时区的timestamptz
。
在v1.metadata.json
中partition-specs
以数组的形式记录了不同的表分区规则,以spec-id
区分。default-spec-id
中是当前生效的spec-id
。
"default-spec-id" : 0,
"partition-specs" : [ {
"spec-id" : 0,
"fields" : [ {
"name" : "ts_month",
"transform" : "month",
"source-id" : 3,
"field-id" : 1000
} ]
} ]
表的原地演变之分区演变
新增分区
// 3.以day()方法新增分区
spark.sql("ALTER TABLE local.iceberg_db.table2 ADD PARTITION FIELD day(ts)");
// 4.新增数据
spark.sql("INSERT INTO local.iceberg_db.table2 VALUES (4, 'd', cast(1727605185 as timestamp)),(5, 'e', cast(1725963585 as timestamp)),(6, 'f', cast(1726827585 as timestamp))");
文件关系变化图
v3.metadata.json
中partition-specs
的变化,default-spec-id
采用了新的分区组合spec-id
为1
。
"default-spec-id" : 1,
"partition-specs" : [ {
"spec-id" : 0,
"fields" : [ {
"name" : "ts_month",
"transform" : "month",
"source-id" : 3,
"field-id" : 1000
} ]
}, {
"spec-id" : 1,
"fields" : [ {
"name" : "ts_month",
"transform" : "month",
"source-id" : 3,
"field-id" : 1000
}, {
"name" : "ts_day",
"transform" : "day",
"source-id" : 3,
"field-id" : 1001
} ]
} ]
可以发现:
- 由
v3.metadata.json
发现分区演变是一种元数据操作,并不急于重写文件。 - 表分区可以在现有表中更新
- 多个分区的共同存在。
删除分区
spark.sql("ALTER TABLE local.iceberg_db.table2 DROP PARTITION FIELD month(ts)");
v5.metadata.json
中partition-specs
的变化,default-spec-id
采用了新的分区组合spec-id
为2
。
"default-spec-id" : 2,
"partition-specs" : [ {
"spec-id" : 0,
"fields" : [ {
"name" : "ts_month",
"transform" : "month",
"source-id" : 3,
"field-id" : 1000
} ]
}, {
"spec-id" : 1,
"fields" : [ {
"name" : "ts_month",
"transform" : "month",
"source-id" : 3,
"field-id" : 1000
}, {
"name" : "ts_day",
"transform" : "day",
"source-id" : 3,
"field-id" : 1001
} ]
}, {
"spec-id" : 2,
"fields" : [ {
"name" : "ts_day",
"transform" : "day",
"source-id" : 3,
"field-id" : 1001
} ]
} ]
删除数据
spark.sql("DELETE FROM local.iceberg_db.table2 where id in (2)");
文件关系变化图
删除数据操作会触发数据文件的变化,此时目录ts_day=2024-08-29
已经于ts_month=2024-08
平级。ts_day=2024-08-29
中的数据文件会保留删除之后的数据。
由于分区的变化后,旧的分区规则产生的数据文件发生了数据变化,会产生一个新清单文件(maifest file)中的,会对旧的数据文件进行索引,以上述为例,v6.metadata.json
对应的清单列表文件(maifest list)中存储了一个清单文件(maifest file)即虚线框展示的,其中存储了两个datafile的引用,status=2
代表删除,status=0
代表文件已经存在。
小结
- 每一个操作都会产生一个新的元数据文件(
metadata.json
),需要配置自动清理元数据文件 - 所有一个文件都伴有一个
.crc
文件,小文件的问题怎么办? - Iceberg使用唯一的id来跟踪表中的每一列。添加列时,将为其分配一个新ID,以便不会错误地使用现有数据。
- 分区演变时,是元数据的操作,数据文件的操作是滞后的,有数据变动时才会进行文件的重写。