前言
使用SparkSql操作Iceberg表之前我们得先配置好catalog,配置方式参考这篇博客。
创建非分区表
Spark3使用USING iceberg来创建表:
CREATE TABLE prod.db.sample (
id bigint NOT NULL COMMENT 'unique id',
data string)
USING iceberg;
这里的数据类型,我们就用Spark的数据类型,iceberg会自动转成对应的iceberg类型。其实基本上一模一样,可以参考官网查看。
参数:
- PARTITIONED BY (partition-expressions) :配置分区
- LOCATION ‘(fully-qualified-uri)’ :指定表路径
- COMMENT ‘table documentation’ :配置表备注
- TBLPROPERTIES (‘key’=‘value’, …) :配置表属性
对 Iceberg 表的每次更改都会生成一个新的元数据文件(json 文件)以提供原子性。默认情况下,旧元数据文件作为历史文件保存不会删除。如 果 要 自 动 清 除 元 数 据 文 件 , 在 表 属 性 中 设 置
write.metadata.delete-after-commit.enabled=true 。 这 将 保 留 一 些 元 数 据 文 件 ( 直 到
write.metadata.previous-versions-max),并在每个新创建的元数据文件之后删除旧的元数据文件。
创建分区表
分区使用PARTITIONED BY来指定。
- 非隐藏分区
CREATE TABLE prod.db.sample (
id bigint,
data string,
category string)
USING iceberg
PARTITIONED BY (category);
- 隐藏分区
CREATE TABLE prod.db.sample (
id bigint,
data string,
category string,
ts timestamp)
USING iceberg
PARTITIONED BY (bucket(16, id), days(ts), category);
支持的隐藏分区转换函数有:
- year(ts):按年划分
- month(ts):按月划分
- day(ts)或 date(ts):等效于 dateint 分区
- hour(ts)或 date_hour(ts):等效于 dateint 和 hour 分区
- bucket(N, col):按哈希值划分 mod N 个桶
- truncate(L, col):按截断为 L 的值划分,字符串被截断为给定的长度;整型和长型截断为 bin: truncate(10, i)生成分区 0,10,20,30,…。
老的函数years(ts), months(ts), days(ts) and hours(ts)也支持。
CATS(CREATE TABLE … AS SELECT)建表
当使用SparkCatalog时,Iceberg支持将CTAS作为原子操作。但在使用SparkSessionCatalog时不是原子的。所以我们一般最好用SparkCatalog。否则会有一些潜在的风险。
CREATE TABLE prod.db.sample
USING iceberg
AS SELECT ...
新创建的表不会继承源表的分区和属性,可以使用CTAS中的PARTITIONED BY和TBLPROPERTIES来声明新表的分区和属性。
CREATE TABLE prod.db.sample
USING iceberg
PARTITIONED BY (part)
TBLPROPERTIES ('key'='value')
AS SELECT ...
RTAS(REPLACE TABLE … AS SELECT)建表
当使用SparkCatalog时,Iceberg支持将RTAS作为原子操作。但在使用SparkSessionCatalog时不是原子性的。所以我们一般最好用SparkCatalog。否则会有一些潜在的风险。
替换的表会根据select查询的结果创建新的快照,但是会保留原表的历史记录。
REPLACE TABLE prod.db.sample
USING iceberg
AS SELECT ...
REPLACE TABLE prod.db.sample
USING iceberg
PARTITIONED BY (part)
TBLPROPERTIES ('key'='value')
AS SELECT ...
CREATE OR REPLACE TABLE prod.db.sample
USING iceberg
AS SELECT ...
如果我们仅仅是替换表中的数据,而不改变表的结构或属性,那么用INSERT OVERWRITE来替换REPLACE
删除表
- 删除表
DROP TABLE prod.db.sample;
- 删除表和数据
DROP TABLE prod.db.sample PURGE;
在 0.14 之前,运行 DROP TABLE 将从 catalog 中删除表并删除表内容。
从 0.14 开始,DROP TABLE 只会从 catalog 中删除表,不会删除数据。为了删除表内容,应该使用 DROP table PURGE
修改表
Iceberg 在 Spark 3 中完全支持 ALTER TABLE,包括:
- 重命名表
- 设置或删除表属性
- 添加、删除和重命名列
- 添加、删除和重命名嵌套字段
- 重新排序顶级列和嵌套结构字段
- 扩大 int、float 和 decimal 字段的类型
- 将必选列变为可选
此外,还可以使用 SQL 扩展来添加对分区演变的支持和设置表的写顺序。
- 修改表名(ALTER TABLE … RENAME TO)
ALTER TABLE prod.db.sample RENAME TO prod.db.new_name;
- 修改表属性(ALTER TABLE … SET(UNSET) TBLPROPERTIES)
ALTER TABLE prod.db.sample SET TBLPROPERTIES (
'read.split.target-size'='268435456'
);
包括修改comment
ALTER TABLE prod.db.sample SET TBLPROPERTIES (
'comment' = 'A table comment.'
);
USET可以移除属性
ALTER TABLE prod.db.sample UNSET TBLPROPERTIES ('read.split.target-size');
- 添加列(ALTER TABLE … ADD COLUMN)
ALTER TABLE hadoop_prod.default.sample
ADD COLUMNS (
category string comment 'new_column'
)
-- 添加 struct 类型的列
ALTER TABLE hadoop_prod.default.sample
ADD COLUMN point struct<x: double, y: double>;
-- 往 struct 类型的列中添加字段
ALTER TABLE hadoop_prod.default.sample
ADD COLUMN point.z double
-- 创建 struct 的嵌套数组列
ALTER TABLE hadoop_prod.default.sample
ADD COLUMN points array<struct<x: double, y: double>>;
-- 在数组中的结构中添加一个字段。使用关键字'element'访问数组的元素列。
ALTER TABLE hadoop_prod.default.sample
ADD COLUMN points.element.z double
-- 创建一个包含 Map 类型的列,key 和 value 都为 struct 类型
ALTER TABLE hadoop_prod.default.sample
ADD COLUMN pointsm map<struct<x: int>, struct<a: int>>;
-- 在 Map 类型的 value 的 struct 中添加一个字段。
ALTER TABLE hadoop_prod.default.sample
ADD COLUMN pointsm.value.b int
在 Spark 2.4.4 及以后版本中,可以通过添加 FIRST 或 AFTER 子句在任何位置添加
ALTER TABLE hadoop_prod.default.sample
ADD COLUMN new_column bigint AFTER id
ALTER TABLE hadoop_prod.default.sample
ADD COLUMN new_column bigint FIRST
- 修改列名(ALTER TABLE … RENAME COLUMN)
ALTER TABLE prod.db.sample RENAME COLUMN data TO payload;
ALTER TABLE prod.db.sample RENAME COLUMN location.lat TO latitude;
- 修改类型(ALTER TABLE … ALTER COLUMN)
注意:只允许安全的转换
ALTER TABLE prod.db.sample ALTER COLUMN measurement TYPE double;
- 修改注释(ALTER TABLE … ALTER COLUMN)
ALTER TABLE prod.db.sample ALTER COLUMN measurement TYPE double COMMENT 'unit is bytes per second';
ALTER TABLE prod.db.sample ALTER COLUMN measurement COMMENT 'unit is kilobytes per second';
- 修改列顺序
ALTER TABLE prod.db.sample ALTER COLUMN col FIRST;
ALTER TABLE prod.db.sample ALTER COLUMN nested.col AFTER other_col;
- 修改列是否允许为NULL
ALTER TABLE prod.db.sample ALTER COLUMN id DROP NOT NULL;
ALTER COLUMN 不用于更新 struct 类型。使用 ADD COLUMN 和 DROP COLUMN 添加或删除 struct 类型的字段。
- 删除列(ALTER TABLE … DROP COLUMN)
ALTER TABLE prod.db.sample DROP COLUMN id;
ALTER TABLE prod.db.sample DROP COLUMN point.z;
- 添加分区(Spark3,需要配置扩展,ALTER TABLE … ADD PARTITION FIELD)
扩展配置,我们这篇博客也做了简单介绍,一般我们都会一次性修改Spark配置:
vim spark-default.conf
spark.sql.extensions =org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
ALTER TABLE prod.db.sample ADD PARTITION FIELD catalog; -- identity transform
修改分区转换:
ALTER TABLE prod.db.sample ADD PARTITION FIELD bucket(16, id);
ALTER TABLE prod.db.sample ADD PARTITION FIELD truncate(4, data);
ALTER TABLE prod.db.sample ADD PARTITION FIELD year(ts);
-- use optional AS keyword to specify a custom name for the partition field
ALTER TABLE prod.db.sample ADD PARTITION FIELD bucket(16, id) AS shard;
注意:添加分区字段是元数据操作,不会更改任何现有表数据。新数据将使用新分区写入,但现有数据将保留在旧分区布局中。对于元数据表中的新分区字段,旧数据文件将具有空值。
当表的分区发生变化时,动态分区覆盖行为也会发生变化,因为动态覆盖会隐式地替换分区。要显式覆盖,请使用新的DataFrameWriterV2 API。
重要
:
当你想要改变数据的分区粒度时,比如从每天的数据分区细化到每小时的数据分区,你可以利用transforms(转换)来实现这一点,而不需要删除原有的按天分区的字段。这么做的好处就是,历史的任务可能很多都是通过天粒度进行查看的,后面任务才会用小时查看,因此天分区不要删除了。这个很有用。
危险
当分区发生变化时,动态分区覆盖行为将发生变化。例如,如果你按天划分分区并改为按小时划分分区,覆盖将覆盖每小时分区,而不再覆盖按天分区。
- 删除分区(Spark3,需要配置扩展,ALTER TABLE … DROP PARTITION FIELD)
ALTER TABLE prod.db.sample DROP PARTITION FIELD catalog;
ALTER TABLE prod.db.sample DROP PARTITION FIELD bucket(16, id);
ALTER TABLE prod.db.sample DROP PARTITION FIELD truncate(4, data);
ALTER TABLE prod.db.sample DROP PARTITION FIELD year(ts);
ALTER TABLE prod.db.sample DROP PARTITION FIELD shard;
注意,尽管删除了分区,但列仍然存在于表结构中。
删除分区字段是元数据操作,不会改变任何现有的表数据。新数据将被写入新的分区,但现有数据将保留在旧的分区布局中。
当分区发生变化时,动态分区覆盖行为也会发生变化。例如,如果按天划分分区,而改为按小时划分分区,那么覆盖将覆盖每小时划分的分区,而不再覆盖按天划分的分区。
删除分区字段时要小心,可能导致元数据查询失败或产生不同的结果。
- 修改分区(Spark3,需要配置扩展,ALTER TABLE … REPLACE PARTITION FIELD)
ALTER TABLE prod.db.sample REPLACE PARTITION FIELD ts_day WITH day(ts);
-- use optional AS keyword to specify a custom name for the new partition field
ALTER TABLE prod.db.sample REPLACE PARTITION FIELD ts_day WITH day(ts) AS day_of_ts;
- 修改表的写入顺序(ALTER TABLE … WRITE ORDERED BY)
ALTER TABLE prod.db.sample WRITE ORDERED BY category, id
-- use optional ASC/DEC keyword to specify sort order of each field (default ASC)
ALTER TABLE prod.db.sample WRITE ORDERED BY category ASC, id DESC
-- use optional NULLS FIRST/NULLS LAST keyword to specify null order of each field (default FIRST)
ALTER TABLE prod.db.sample WRITE ORDERED BY category ASC NULLS LAST, id DESC NULLS FIRST
表写顺序不能保证查询的数据顺序。它只影响数据写入表的方式。
WRITE ORDERED BY 设置了一个全局排序,即跨任务的行排序,就像在 INSERT 命令中使用 ORDER BY 一样
INSERT INTO prod.db.sample
SELECT id, data, category, ts FROM another_table
ORDER BY ts, category
要在每个任务内排序,而不是跨任务排序,使用 local ORDERED BY:
ALTER TABLE prod.db.sample WRITE LOCALLY ORDERED BY category, id
- 按分区并行写入(ALTER TABLE … WRITE DISTRIBUTED BY PARTITION)
ALTER TABLE prod.db.sample WRITE DISTRIBUTED BY PARTITION
ALTER TABLE prod.db.sample WRITE DISTRIBUTED BY PARTITION LOCALLY ORDERED BY category, id
参考文献
Spark DDL