Apache Iceberg 与 Spark整合-使用教程(Iceberg 官方文档解析)

news2024/11/20 19:32:19

官方文档链接(Spark整合Iceberg)

在这里插入图片描述

1.Getting Started

Spark 目前是进行 Iceberg 操作最丰富的计算引擎。官方建议从 Spark 开始,以理解 Iceberg 的概念和功能。

The latest version of Iceberg is 1.6.1.(2024年9月24日11:45:55)

在 Spark shell 中使用 Iceberg,需使用 --packages 选项:

spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1

Tips:
在这里插入图片描述

如果您想将 Iceberg 包含在 Spark 安装中,请将 iceberg-spark-runtime-3.5_2.12 Jar 添加到 Spark 的 jars 文件夹中。

Adding Catalogs 添加目录

Iceberg 提供了目录功能,使 SQL 命令能够管理表并通过名称加载它们。目录通过以下属性进行配置:spark.sql.catalog.(catalog_name)

创建一个名为 local 的基于路径的目录,用于管理 $PWD/warehouse 下的表,并为 Spark 的内置目录添加对 Iceberg 表的支持:

spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1 \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
    --conf spark.sql.catalog.spark_catalog.type=hive \
    --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.local.type=hadoop \
    --conf spark.sql.catalog.local.warehouse=$PWD/warehouse

创建 Iceberg 表

在 Spark 中创建第一个 Iceberg 表,可以使用 spark-sql shell 或 spark.sql(...) 来运行 CREATE TABLE 命令:
在这里插入图片描述

-- local 是上述定义的基于路径的目录
CREATE TABLE local.db.table (id bigint, data string) USING iceberg;

Iceberg 目录支持完整的 SQL DDL 命令,包括:

  • CREATE TABLE ... PARTITIONED BY
  • CREATE TABLE ... AS SELECT
  • ALTER TABLE
  • DROP TABLE

写入数据

创建表后,可以使用 INSERT INTO 向表中插入数据:

INSERT INTO local.db.table VALUES (1, 'a'), (2, 'b'), (3, 'c');
INSERT INTO local.db.table SELECT id, data FROM source WHERE length(data) = 1;

Iceberg 还支持行级 SQL 更新,包括 MERGE INTODELETE FROM

MERGE INTO local.db.target t 
USING (SELECT * FROM updates) u 
ON t.id = u.id
WHEN MATCHED THEN UPDATE SET t.count = t.count + u.count
WHEN NOT MATCHED THEN INSERT *;

此外,Iceberg 支持通过新的 v2 DataFrame 写入 API 写入 DataFrames:

spark.table("source").select("id", "data")
     .writeTo("local.db.table").append()

旧的写入 API 得到支持,但不推荐使用。

读取数据

要使用 SQL 读取数据,可以在 SELECT 查询中使用 Iceberg 表的名称:

SELECT count(1) as count, data
FROM local.db.table
GROUP BY data;

SQL 也是检查表的推荐方式。要查看表中的所有快照,可以使用快照元数据表:

SELECT * FROM local.db.table.snapshots;

输出:

+-------------------------+----------------+-----------+-----------+----------------------------------------------------+
| committed_at            | snapshot_id    | parent_id | operation | manifest_list                                      |
+-------------------------+----------------+-----------+-----------+----------------------------------------------------+
| 2019-02-08 03:29:51.215 | 57897183625154 | null      | append    | s3://.../table/metadata/snap-57897183625154-1.avro |
|                         |                |           |           |                                                    |
| ...                     | ...            | ...       | ...       | ...                                                |
+-------------------------+----------------+-----------+-----------+----------------------------------------------------+

DataFrame 读取也得到了支持,可以通过名称引用表:

val df = spark.table("local.db.table")
df.count()

这就是在 Spark 中使用 Iceberg 创建、写入和读取表的基本步骤。


Spark 和 Iceberg 的类型兼容性

Spark type to Iceberg type
Spark 类型Iceberg 类型备注
booleanboolean
shortinteger
byteinteger
integerinteger
longlong
floatfloat
doubledouble
datedate
timestamp带时区的时间戳
timestamp_ntz不带时区的时间戳
charstring
varcharstring
stringstring
binarybinary
decimaldecimal
structstruct
arraylist
mapmap

Tips:

  • 数字类型(integer、long、float、double、decimal)在写入时支持提升。例如,可以将 Spark 类型 shortbyteintegerlong 写入 Iceberg 类型 long
  • 可以使用 Spark 的 binary 类型写入 Iceberg 固定类型,但会进行长度验证。
Iceberg type to Spark type
Iceberg 类型Spark 类型备注
booleanboolean
integerinteger
longlong
floatfloat
doubledouble
datedate
time不支持
带时区的时间戳timestamp
不带时区的时间戳timestamp_ntz
stringstring
uuidstring
fixedbinary
binarybinary
decimaldecimal
structstruct
listarray
mapmap

2.Spark DDL

(1)CREATE TABLE 创建表

Spark 3 can create tables in any Iceberg catalog with the clause USING iceberg:

CREATE TABLE prod.db.sample (
    id bigint NOT NULL COMMENT '唯一ID',
    data string)
USING iceberg;

在这里插入图片描述

Iceberg会将Spark中的列类型转换为相应的Iceberg类型。

创建表的命令(包括CTAS和RTAS)支持一系列Spark创建选项,包括:

  • PARTITIONED BY (partition-expressions):配置分区。
  • LOCATION ‘(fully-qualified-uri)’:设置表的位置。
  • COMMENT ‘table documentation’:设置表描述。
  • TBLPROPERTIES (‘key’=‘value’, …):设置表配置。

Tips:CREATE TABLE ... LIKE ...语法不受支持。

(2)PARTITIONED BY 分区表的创建

在这里插入图片描述

CREATE TABLE prod.db.sample (
    id bigint,
    data string,
    category string)
USING iceberg
PARTITIONED BY (category);

PARTITIONED BY子句支持转换表达式以创建隐藏分区

CREATE TABLE prod.db.sample (
    id bigint,
    data string,
    category string,
    ts timestamp)
USING iceberg
PARTITIONED BY (bucket(16, id), days(ts), category);

支持的分区转换表达式:

  • year(ts):按年分区
  • month(ts):按月分区
  • day(ts)date(ts):等同于按日期整数分区
  • hour(ts)date_hour(ts):等同于按日期整数和小时分区
  • bucket(N, col):按哈希值取模N的分区
  • truncate(L, col):按截断值分区(字符串按照给定长度截断;整数和长整型按区间截断,例如 truncate(10, i) 生成分区 0, 10, 20, 30, …)

注:为了向后兼容,旧语法 years(ts)months(ts)days(ts)hours(ts) 也被支持。


(3)创建表(CTAS)

Iceberg 支持使用 SparkCatalog 进行原子操作的 CREATE TABLE AS SELECT(CTAS)。在使用 SparkSessionCatalog 时,CTAS 是不原子的。

基本语法:

CREATE TABLE prod.db.sample
USING iceberg
AS SELECT ...;
  • 新创建的表不会继承源表的分区规范和表属性。可以使用 PARTITIONED BYTBLPROPERTIES 来声明新表的分区规范和表属性

示例:

CREATE TABLE prod.db.sample
USING iceberg
PARTITIONED BY (part)
TBLPROPERTIES ('key'='value')
AS SELECT ...;

(4)替换表(RTAS)

  • 原子替换表操作会创建一个新快照,保留表的历史记录。

基本语法:

REPLACE TABLE prod.db.sample
USING iceberg
AS SELECT ...;

示例:

REPLACE TABLE prod.db.sample
USING iceberg
PARTITIONED BY (part)
TBLPROPERTIES ('key'='value')
AS SELECT ...;

(5)创建或替换表

CREATE OR REPLACE TABLE prod.db.sample
USING iceberg
AS SELECT ...;
  1. 替换表时的影响

    • 如果使用 REPLACE TABLE 命令来替换一个表,并且新的查询结果的模式(schema)或分区规范(partition spec)发生了变化,那么原有的模式和分区会被新的内容替换。
  2. 如何避免修改

    • 如果想保持表的现有模式和分区不变,可以使用 INSERT OVERWRITE 命令,而不是 REPLACE TABLE。这样做可以更新表的数据,但不会影响表的结构或分区设置。
  3. 表属性的处理

    • REPLACE TABLE 命令中,如果你定义了新的表属性,这些新属性会与现有的属性合并。
    • 如果新属性与现有属性相同,则保持不变;如果不同,则会更新现有的属性。
  • 使用 REPLACE TABLE 会改变表的结构和分区。
  • 使用 INSERT OVERWRITE 可以仅更新数据而不影响结构。
  • 新的表属性会与旧的表属性合并,存在冲突时会更新。

(6)删除表

  • 在 0.14 之前,运行 DROP TABLE 将从目录中移除表,并删除表内容。
  • 从 0.14 开始,DROP TABLE 仅从目录中移除表。要删除表内容,需使用 DROP TABLE PURGE

基本语法:

  • 删除表:
DROP TABLE prod.db.sample;
  • 删除表及其内容:
DROP TABLE prod.db.sample PURGE;

(7)修改表

1.重命名表 ALTER TABLE … RENAME TO:

ALTER TABLE prod.db.sample RENAME TO prod.db.new_name;

2.设置或移除表属性:

  • 设置属性 ALTER TABLE … SET TBLPROPERTIES
    ALTER TABLE prod.db.sample SET TBLPROPERTIES (
        'read.split.target-size'='268435456'
    );
    
  • 移除属性 ALTER TABLE … UNSET TBLPROPERTIES
    ALTER TABLE prod.db.sample UNSET TBLPROPERTIES ('read.split.target-size');
    
  • 设置表注释
    ALTER TABLE prod.db.sample SET TBLPROPERTIES (
        'comment' = 'A table comment.'
    );
    

在这里插入图片描述

  1. 添加、删除和重命名列
    • 添加列 ALTER TABLE … ADD COLUMN
      ALTER TABLE prod.db.sample ADD COLUMNS (
          new_column string comment 'new_column docs'
      );
      

在这里插入图片描述

  1. 添加嵌套字段

    • 创建结构列
      ALTER TABLE prod.db.sample ADD COLUMN point struct<x: double, y: double>;
      
    • 向结构中添加字段
      ALTER TABLE prod.db.sample ADD COLUMN point.z double;
      
    • 创建嵌套数组列
      ALTER TABLE prod.db.sample ADD COLUMN points array<struct<x: double, y: double>>;
      
    • 向数组中的结构添加字段
      ALTER TABLE prod.db.sample ADD COLUMN points.element.z double;
      
    • 创建映射列
      ALTER TABLE prod.db.sample ADD COLUMN points map<struct<x: int>, struct<a: int>>;
      
    • 向映射值结构中添加字段
      ALTER TABLE prod.db.sample ADD COLUMN points.value.b int;
      
  2. 调整列的位置
    在这里插入图片描述

    • 在其他列后添加新列
      ALTER TABLE prod.db.sample ADD COLUMN new_column bigint AFTER other_column;
      
    • 在最前面添加新列
      ALTER TABLE prod.db.sample ADD COLUMN nested.new_column bigint FIRST;
      
  • Note: Altering a map ‘key’ column by adding columns is not allowed. Only map values can be updated.
  • 修改map的“键”列以添加列是不允许的,只能更新map的“值”。

6.重命名列 RENAME COLUMN
使用 RENAME COLUMN 可以重命名任何字段:

ALTER TABLE prod.db.sample RENAME COLUMN data TO payload;
ALTER TABLE prod.db.sample RENAME COLUMN location.lat TO latitude;

在 Iceberg 中,嵌套重命名命令只会影响到最底层的字段(即叶子字段)。

location.lat 重命名为 location.latitude

location
└─ lat
└─ long

执行命令后,结构变为:

location
└─ latitude
└─ long

这里的 location 仍然是父结构,但 lat 字段被重命名为 latitude,而其他字段保持不变。这就是嵌套重命名的含义。

7.修改列类型 ALTER COLUMN
使用 ALTER COLUMN 来修改列的类型,前提是这种更新是安全的。安全更新包括:

  • intbigint
  • floatdouble
  • decimal(P,S)decimal(P2,S) (当 P2 > P 时)

示例:

ALTER TABLE prod.db.sample ALTER COLUMN measurement TYPE double;

8.更新列注释

ALTER TABLE prod.db.sample ALTER COLUMN measurement TYPE double COMMENT 'unit is bytes per second';
ALTER TABLE prod.db.sample ALTER COLUMN measurement COMMENT 'unit is kilobytes per second';

9. 重新排序列
使用 FIRSTAFTER 子句,可以重新排序顶级列或结构中的列:

ALTER TABLE prod.db.sample ALTER COLUMN col FIRST;
ALTER TABLE prod.db.sample ALTER COLUMN nested.col AFTER other_col;

10. 更改列的可空性
对于非可空列,可以使用 DROP NOT NULL 更改可空性:

ALTER TABLE prod.db.sample ALTER COLUMN id DROP NOT NULL;

注意:不能通过 SET NOT NULL 将可空列更改为非可空列,因为 Iceberg 无法确保是否存在空值。

11. 删除列
要删除列,可以使用 DROP COLUMN

ALTER TABLE prod.db.sample DROP COLUMN id;
ALTER TABLE prod.db.sample DROP COLUMN point.z;

ALTER TABLE SQL 扩展

Iceberg 支持使用 ADD PARTITION FIELD 命令向分区规范中添加新的分区字段:

ALTER TABLE prod.db.sample ADD PARTITION FIELD catalog; -- 身份变换

也可以使用不同的分区变换,例如:

ALTER TABLE prod.db.sample ADD PARTITION FIELD bucket(16, id);
ALTER TABLE prod.db.sample ADD PARTITION FIELD truncate(4, data);
ALTER TABLE prod.db.sample ADD PARTITION FIELD year(ts);

使用可选的 AS 关键字可以为分区字段指定自定义名称:

ALTER TABLE prod.db.sample ADD PARTITION FIELD bucket(16, id) AS shard;
  • 添加分区字段是一个元数据操作,不会更改现有表的数据。新数据将按照新的分区方式写入,但现有数据仍将保持在旧的分区布局中。在元数据表中,旧数据文件的新的分区字段将显示为 null 值。

当表的分区发生变化时,动态分区覆盖行为将发生变化,因为动态覆盖会隐式替换分区。要显式覆盖,请使用新的 DataFrameWriterV2 API。

在这里插入图片描述

如果从按天分区迁移到按小时分区,动态分区覆盖行为将有所不同。例如,如果原本是按天分区,改为按小时分区,覆盖操作将只覆盖小时分区,而不再覆盖天分区。

如果需要从日分区迁移到小时分区,建议保留日分区字段,以确保现有元数据表查询能够继续正常工作。


3.Spark Queries

选择表中的所有记录

SELECT * FROM prod.db.table;
  • prod 是目录,db 是命名空间,table 是表名。

访问元数据表:

可以使用 Iceberg 表名作为命名空间查询元数据表,例如,要读取特定 Iceberg 表的文件元数据:

在这里插入图片描述

使用 DataFrame 查询

要将 Iceberg 表加载为 Spark 中的 DataFrame,可以使用以下命令:

val df = spark.table("prod.db.table")

使用此命令可以利用 Spark DataFrame 操作的全范围对 DataFrame 进行操作。

加载 DataFrame 后,可以执行各种操作。例

  1. 显示 DataFrame:

    df.show()
    
  2. 过滤记录:

    val filteredDf = df.filter($"columnName" === "someValue")
    
  3. 分组和聚合:

    val aggregatedDf = df.groupBy("columnName").count()
    
  4. 写回 Iceberg 表:

    filteredDf.write.format("iceberg").mode("append").save("prod.db.table")
    

Iceberg 表的时间旅行(Time Travel)

从 Spark 3.3 开始,Iceberg 支持在 SQL 查询中使用 TIMESTAMP AS OFVERSION AS OF 子句进行时间旅行。

时间旅行查询示例:

  1. 基于时间戳的查询

    • 时间旅行到1986年10月26日01:21:00
      SELECT * FROM prod.db.table TIMESTAMP AS OF '1986-10-26 01:21:00';
      
  2. 基于快照 ID 的查询

    • 时间旅行到快照 ID 为 10963874102873 的快照
      SELECT * FROM prod.db.table VERSION AS OF 10963874102873;
      
  3. 基于分支的查询

    • 时间旅行到 audit-branch 的最新快照
      SELECT * FROM prod.db.table VERSION AS OF 'audit-branch';
      
  4. 基于标签的查询

    • 时间旅行到 historical-snapshot 标签引用的快照
      SELECT * FROM prod.db.table VERSION AS OF 'historical-snapshot';
      

时间戳也可以以 Unix 时间戳(秒)提供:

  • 使用 Unix 时间戳查询
    SELECT * FROM prod.db.table TIMESTAMP AS OF 499162860;
    SELECT * FROM prod.db.table FOR SYSTEM_TIME AS OF 499162860;
    

可以使用类似于元数据表的语法指定分支或标签:

  • 指定分支

    SELECT * FROM prod.db.table.`branch_audit-branch`;
    
  • 指定标签

    SELECT * FROM prod.db.table.`tag_historical-snapshot`;
    

(包含“-”的标识符无效,因此必须使用反引号转义。)

不同的时间旅行查询可以使用快照的架构或表的架构:

  • 使用快照的架构(基于时间戳和快照 ID 的查询)

    SELECT * FROM prod.db.table TIMESTAMP AS OF '1986-10-26 01:21:00';
    SELECT * FROM prod.db.table VERSION AS OF 10963874102873;
    SELECT * FROM prod.db.table VERSION AS OF 'historical-snapshot';
    SELECT * FROM prod.db.table.`tag_historical-snapshot`;
    
  • 使用表的架构(基于分支的查询)

    SELECT * FROM prod.db.table VERSION AS OF 'audit-branch';
    SELECT * FROM prod.db.table.`branch_audit-branch`;
    

Iceberg 中的DataFrame时间旅行

Iceberg 支持在 DataFrame API 中使用四个 Spark 读取选项来选择特定的表快照或特定时间的快照:

  1. snapshot-id: 选择特定的表快照。
  2. as-of-timestamp: 选择某个时间点的当前快照,单位为毫秒。
  3. branch: 选择指定分支的最新快照。注意,目前不能将分支与时间戳结合使用。
  4. tag: 选择与指定标签关联的快照。标签也不能与时间戳结合使用。

示例:

  1. 时间旅行到1986年10月26日01:21:00

    spark.read
        .option("as-of-timestamp", "499162860000")
        .format("iceberg")
        .load("path/to/table")
    
  2. 时间旅行到快照 ID 为 10963874102873 的快照

    spark.read
        .option("snapshot-id", 10963874102873L)
        .format("iceberg")
        .load("path/to/table")
    
  3. 时间旅行到标签 historical-snapshot

    spark.read
        .option(SparkReadOptions.TAG, "historical-snapshot")
        .format("iceberg")
        .load("path/to/table")
    
  4. 时间旅行到 audit-branch 的最新快照

    spark.read
        .option(SparkReadOptions.BRANCH, "audit-branch")
        .format("iceberg")
        .load("path/to/table")
    

Iceberg增量读取

  1. start-snapshot-id: 用于增量扫描的起始快照 ID(不包括该快照)。
  2. end-snapshot-id: 用于增量扫描的结束快照 ID(包括该快照)。这是可选的。如果省略,将默认为当前快照。

示例代码

// 获取在 start-snapshot-id (10963874102873L) 之后追加的数据,直到 end-snapshot-id (63874143573109L)
spark.read
  .format("iceberg")
  .option("start-snapshot-id", "10963874102873")
  .option("end-snapshot-id", "63874143573109")
  .load("path/to/table")
  • 当前仅支持从追加操作中获取数据,无法支持替换、覆盖或删除操作。
  • 增量读取适用于 V1 和 V2 格式版本。
  • Spark 的 SQL 语法不支持增量读取。

在这里插入图片描述


Iceberg 支持使用元数据表来检查表的历史和快照。元数据表通过在原始表名称后添加元数据表名来识别。例如,查看 db.table 的历史可以使用 db.table.history

表历史查询

要显示表的历史,可以执行以下查询:

SELECT * FROM prod.db.table.history;
made_current_atsnapshot_idparent_idis_current_ancestor
2019-02-08 03:29:51.2155781947118336215154NULLtrue
2019-02-08 03:47:55.94851792995261850568305781947118336215154true
2019-02-09 16:24:30.132964100402475335445179299526185056830false
2019-02-09 16:32:47.33629998756080624373305179299526185056830true
2019-02-09 19:42:03.91989245587860605834792999875608062437330true
2019-02-09 19:49:16.34365367338231819750458924558786060583479true

元数据日志条目查询

要查看表的元数据日志条目,可以执行以下查询:

SELECT * FROM prod.db.table.metadata_log_entries;
timestampfilelatest_snapshot_idlatest_schema_idlatest_sequence_number
2022-07-28 10:43:52.93s3://…/table/metadata/00000-9441e604-b3c2-498a-a45a-6320e8ab9006.metadata.jsonnullnullnull
2022-07-28 10:43:57.487s3://…/table/metadata/00001-f30823df-b745-4a0a-b293-7532e0c99986.metadata.json17026083367764530001
2022-07-28 10:43:58.25s3://…/table/metadata/00002-2cc2837a-02dc-4687-acc1-b4d86ea486f4.metadata.json95890649397670977402

快照查询

SELECT * FROM prod.db.table.snapshots;

示例结果:

committed_atsnapshot_idparent_idoperationmanifest_listsummary
2019-02-08 03:29:51.21557897183625154nullappends3://…/table/metadata/snap-57897183625154-1.avro{ added-records -> 2478404, total-records -> 2478404, added-data-files -> 438, total-data-files -> 438, spark.app.id -> application_1520379288616_155055 }

Apache Iceberg 是一个高性能的表格式,用于大数据处理。下面我将根据你提到的各个方面,概述 Iceberg 的使用。

Entries 快照的详细信息

Entries 表提供了有关表中每个快照的详细信息。查询示例如下:

SELECT * FROM my_catalog.db.my_table.entries;

4.Spark Write

1. 特性支持

Iceberg 利用 Apache Spark 的 DataSourceV2 API,支持多种写入方式。不同版本的 Spark 对某些功能的支持程度不同:

功能Spark 3备注
SQL 插入✔️⚠ 需要 spark.sql.storeAssignmentPolicy=ANSI(自 Spark 3.0 默认)
SQL 合并✔️⚠ 需要 Iceberg Spark 扩展
SQL 覆盖插入✔️⚠ 需要 spark.sql.storeAssignmentPolicy=ANSI
SQL 删除✔️⚠ 行级删除需要 Iceberg Spark 扩展
SQL 更新✔️⚠ 需要 Iceberg Spark 扩展
DataFrame 附加写入✔️
DataFrame 覆盖写入✔️
DataFrame CTAS 和 RTAS✔️⚠ 需要 DSv2 API

2. 使用 SQL 写入

Spark 3 支持 SQL 的 INSERT INTOMERGE INTOINSERT OVERWRITE 操作。

INSERT INTO

用于向表中追加新数据:

INSERT INTO prod.db.table VALUES (1, 'a'), (2, 'b');
INSERT INTO prod.db.table SELECT ...;

MERGE INTO

支持对目标表进行行级更新。Iceberg 通过重写包含需要更新行的数据文件来实现 MERGE INTO

推荐使用 MERGE INTO 而不是 INSERT OVERWRITE,因为它只替换受影响的数据文件,避免了因分区变化导致的数据覆盖不一致问题。

MERGE INTO prod.db.target t   -- 目标表
USING (SELECT ...) s          -- 源更新
ON t.id = s.id                -- 用于找到更新的条件
WHEN MATCHED AND s.op = 'delete' THEN DELETE
WHEN MATCHED AND t.count IS NULL AND s.op = 'increment' THEN UPDATE SET t.count = 0
WHEN MATCHED AND s.op = 'increment' THEN UPDATE SET t.count = t.count + 1
WHEN NOT MATCHED THEN INSERT *;

可以根据条件添加多个 WHEN MATCHED 子句。如果源数据中的多条记录匹配同一目标表的行,将会抛出错误。


INSERT OVERWRITE

在 Iceberg 中,INSERT OVERWRITE 允许用查询结果替换表中的数据。此操作是原子的,确保数据的一致性。

1. 覆盖行为

Iceberg 支持两种覆盖模式:静态模式动态模式

  • 静态覆盖模式(默认):

    • 通过将 PARTITION 子句转换为过滤器来确定要覆盖的分区。
    • 如果省略了 PARTITION 子句,将替换表中的所有分区。
  • 动态覆盖模式(推荐):

    • 仅替换由 SELECT 查询产生的行所在的分区。
    • 通过设置 spark.sql.sources.partitionOverwriteMode=dynamic 来启用。
2. 示例表结构

以下是一个示例日志表的 DDL 定义:

CREATE TABLE prod.my_app.logs (
    uuid string NOT NULL,
    level string NOT NULL,
    ts timestamp NOT NULL,
    message string)
USING iceberg
PARTITIONED BY (level, hours(ts))
3. 动态覆盖示例

当 Spark 的覆盖模式为动态时,以下查询会替换所有包含查询结果的分区:

INSERT OVERWRITE prod.my_app.logs
SELECT uuid, first(level), first(ts), first(message)
FROM prod.my_app.logs
WHERE cast(ts as date) = '2020-07-01'
GROUP BY uuid

在动态模式下,仅会替换 2020 年 7 月 1 日的小时分区。

4. 静态覆盖示例

在静态模式下,如果没有 PARTITION 子句,将替换所有现有行:

INSERT OVERWRITE prod.my_app.logs
SELECT uuid, first(level), first(ts), first(message)
FROM prod.my_app.logs
WHERE cast(ts as date) = '2020-07-01'
GROUP BY uuid

这将删除表中所有行,只写入 7 月 1 日的日志。

要仅覆盖特定分区,可以添加 PARTITION 子句:

INSERT OVERWRITE prod.my_app.logs
PARTITION (level = 'INFO')
SELECT uuid, first(level), first(ts), first(message)
FROM prod.my_app.logs
WHERE level = 'INFO'
GROUP BY uuid

注意:静态模式无法像动态模式那样替换小时分区,因为 PARTITION 子句只能引用表列,而不能引用隐藏分区。


DELETE

DELETE FROM 查询允许根据条件过滤来删除表中的行。

  1. 删除指定时间范围内的记录:

    DELETE FROM prod.db.table
    WHERE ts >= '2020-05-01 00:00:00' AND ts < '2020-06-01 00:00:00'
    
  2. 删除all_events表中 session_time 小于good_events表中的最小 session_time 的记录:

    DELETE FROM prod.db.all_events
    WHERE session_time < (SELECT MIN(session_time) FROM prod.db.good_events)
    
  3. 删除orders表中存在于returned_orders表的订单:

    DELETE FROM prod.db.orders AS t1
    WHERE EXISTS (SELECT oid FROM prod.db.returned_orders WHERE t1.oid = oid)
    

注意:

  • 如果删除条件匹配整个分区,Iceberg 将执行元数据仅删除(metadata-only delete)。
  • 如果删除条件匹配单个行,Iceberg 将仅重写受影响的数据文件。

UPDATE

  1. 更新指定时间范围内的记录的字段值:

    UPDATE prod.db.table
    SET c1 = 'update_c1', c2 = 'update_c2'
    WHERE ts >= '2020-05-01 00:00:00' AND ts < '2020-06-01 00:00:00'
    
  2. 更新all_events表中 session_time 小于good_events表中的最小 session_time 的记录:

    UPDATE prod.db.all_events
    SET session_time = 0, ignored = true
    WHERE session_time < (SELECT MIN(session_time) FROM prod.db.good_events)
    
  3. 更新orders表中存在于returned_orders表的订单状态:

    UPDATE prod.db.orders AS t1
    SET order_status = 'returned'
    WHERE EXISTS (SELECT oid FROM prod.db.returned_orders WHERE t1.oid = oid)
    

Iceberg 分支写入指南

  • 分支存在性:在执行任何写入操作之前,分支必须已经存在。可以使用 Spark DDL 命令创建分支。
  • 模式验证:在向分支写入数据时,将验证表的当前模式。

通过 SQL 写入

  1. 插入到审计分支

    INSERT INTO prod.db.table.branch_audit VALUES (1, 'a'), (2, 'b');
    
  2. 合并到审计分支

    MERGE INTO prod.db.table.branch_audit t 
    USING (SELECT ...) s        
    ON t.id = s.id          
    WHEN ...
    
  3. 更新审计分支

    UPDATE prod.db.table.branch_audit AS t1
    SET val = 'c';
    
  4. 从审计分支删除

    DELETE FROM prod.db.table.branch_audit WHERE id = 2;
    
  5. WAP 分支写入

    SET spark.wap.branch = audit-branch;
    INSERT INTO prod.db.table VALUES (3, 'c');
    

通过 DataFrame 写入

  1. 插入到审计分支

    val data: DataFrame = ...
    data.writeTo("prod.db.table.branch_audit").append()
    
  2. 覆盖审计分支

    val data: DataFrame = ...
    data.writeTo("prod.db.table.branch_audit").overwritePartitions()
    

后续继续更新~

列位可以移步博主Apache Iceberg专栏,或许对您理解Iceberg有所帮助😊

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2165905.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何在云端使用 Browserless 进行网页抓取?

云浏览器是什么&#xff1f; 云浏览器是一种基于云的组合&#xff0c;它将网页浏览器应用程序与一个虚拟化的容器相结合&#xff0c;实现了远程浏览器隔离的概念。开发人员可以使用流行的工具&#xff08;如 Playwright 和​ Puppeteer​&#xff09;来自动化网页浏览器&#…

repo 查看指定日期内,哪些仓库有修改,具体的修改详情

文章目录 想看指定时间段内仓库中修改了哪些具体的文件&#xff0c;是谁修改的&#xff0c;commit的备注信息等详情只想看某段时间内有更改的仓库的修改详情&#xff0c;其他没有修改的仓库不显示。 想看指定时间段内仓库中修改了哪些具体的文件&#xff0c;是谁修改的&#xf…

VSCode#include头文件时找不到头文件:我的解决方法

0.前言 1.在学习了Linux之后&#xff0c;我平常大部分都使用本地的XShell或者VSCode连接远程云服务器写代码&#xff0c;CentOS的包管理器为我省去了不少繁琐的事情&#xff0c;今天使用vscode打开本地目录想写点代码发现#include头文件后&#xff0c;下方出现了波浪线&#…

SparkSQL-初识

一、概览 Spark SQL and DataFrames - Spark 3.5.2 Documentation 我们先看下官网的描述&#xff1a; SparkSQL是用于结构化数据处理的Spark模块&#xff0c;与基本的Spark RDD API不同。Spark SQL提供的接口为Spark提供了更多关于正在执行的数据和计算结构的信息。在内部&a…

C++中vector类的使用

目录 1.vector类常用接口说明 1.1默认成员函数 1.1.1构造函数(constructor) 1.1.2 赋值运算符重载(operator()) 2. vector对象的访问及遍历操作(Iterators and Element access) 3.vector类对象的容量操作(Capacity) 4. vector类对象的修改及相关操作(Modifiers and Stri…

【Java数据结构】 ---对象的比较

乐观学习&#xff0c;乐观生活&#xff0c;才能不断前进啊&#xff01;&#xff01;&#xff01; 我的主页&#xff1a;optimistic_chen 我的专栏&#xff1a;c语言 &#xff0c;Java 欢迎大家访问~ 创作不易&#xff0c;大佬们点赞鼓励下吧~ 前言 上图中&#xff0c;线性表、堆…

[Redis][主从复制][上]详细讲解

目录 0.前言1.配置1.建立复制2.断开复制3.安全性4.只读5.传输延迟 2.拓扑1.一主一从结构2.一主多从结构2.树形主从结构 0.前言 说明&#xff1a;该章节相关操作不需要记忆&#xff0c;理解流程和原理即可&#xff0c;用的时候能自主查到即可主从复制&#xff1f; 分布式系统中…

PyTorch自定义学习率调度器实现指南

在深度学习训练过程中&#xff0c;学习率调度器扮演着至关重要的角色。这主要是因为在训练的不同阶段&#xff0c;模型的学习动态会发生显著变化。 在训练初期&#xff0c;损失函数通常呈现剧烈波动&#xff0c;梯度值较大且不稳定。此阶段的主要目标是在优化空间中快速接近某…

ResNet残差网络:深度学习的里程碑

引言 在深度学习领域&#xff0c;卷积神经网络&#xff08;CNN&#xff09;的发展一直推动着图像识别、目标检测等任务的进步。然而&#xff0c;随着网络层数的增加&#xff0c;传统的CNN面临着梯度消失和梯度爆炸等难题&#xff0c;限制了深层网络的训练效果。为了克服这些挑…

oracle direct path read处理过程

文章目录 缘起处理过程1.AWR Report 分析2.调查direct path read发生的table3.获取sql text4.解释sql并输出执行计划&#xff1a; 结论&#xff1a;补充direct path read等待事件说明 缘起 记录direct path read处理过程 处理过程 1.AWR Report 分析 问题发生时间段awr如下…

FortiGate OSPF动态路由协议配置

1.目的 本文档针对 FortiGate 的 OSPF 动态路由协议说明。OSPF 路由协议是一种 典型的链路状态(Link-state)的路由协议,一般用于同一个路由域内。在这里,路由 域是指一个自治系统,即 AS,它是指一组通过统一的路由政策或路由协议互相交 换路由信息的网络。在这个 AS 中,所有的 …

基于JSP+Servlet+Layui实现的博客系统

> 这是一个使用 Java 和 JSP 开发的博客系统&#xff0c;并使用 Layui 作为前端框架。 > 它包含多种功能&#xff0c;比如文章发布、评论管理、用户管理等。 > 它非常适合作为 Java 初学者的练习项目。 一、项目演示 - 博客首页 - 加载动画 - 右侧搜索框可以输入…

开源服务器管理软件Nexterm

什么是 Nexterm &#xff1f; Nexterm 是一款用于 SSH、VNC 和 RDP 的开源服务器管理软件。 安装 在群晖上以 Docker 方式安装。 在注册表中搜索 nexterm &#xff0c;选择第一个 germannewsmaker/nexterm&#xff0c;版本选择 latest。 本文写作时&#xff0c; latest 版本对…

【STM32】RTT-Studio中HAL库开发教程七:IIC通信--EEPROM存储器FM24C04

文章目录 一、简介二、模拟IIC时序三、读写流程四、完整代码五、测试验证 一、简介 FM24C04D&#xff0c;4K串行EEPROM&#xff1a;内部32页&#xff0c;每个16字节&#xff0c;4K需要一个11位的数据字地址进行随机字寻址。FM24C04D提供4096位串行电可擦除和可编程只读存储器&a…

Excel 设置自动换行

背景 版本&#xff1a;office 专业版 11.0 表格内输入长信息&#xff0c;发现默认状态时未自动换行的&#xff0c;找了很久设置按钮&#xff0c;遂总结成经验帖。 操作 1&#xff09;选中需设置的单元格/区域/行/列。 2&#xff09;点击【开始】下【对齐方式】中的【自动换…

HAproxy,nginx实现七层负载均衡

环境准备&#xff1a; 192.168.88.25 &#xff08;client&#xff09; 192.168.88.26 &#xff08;HAproxy&#xff09; 192.168.88.27 &#xff08;web1&#xff09; 192.168.88.28 (web2) 192.168.88.29 &#xff08;php1&#xff09; 192.168.88.30…

基于微信小程序的教学质量评价系统ssm(lw+演示+源码+运行)

摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了基于微信小程序的教学质量评价系统的开发全过程。通过分析基于微信小程序的教学质量评价系统管理的不足&#xff0c;创建了一个计算机管理基于微信小程序的教学…

【Anti-UAV410】论文阅读

摘要 无人机在红外视频中的感知&#xff0c;对于有效反无人机是很重要的。现有的跟踪数据集存在目标大小和环境问题&#xff0c;不能完全表示复杂的逼真场景。因此作者就提出了Anti-UAV410数据集&#xff0c;该数据集总共410个视频和超过438K个标注框。为了应对复杂环境无人机跟…

丹摩智算(damodel)部署stable diffusion实验

名词解释&#xff1a; 丹摩智算&#xff08;damodel&#xff09;&#xff1a;是一款带有RTX4090&#xff0c;Tesla-P40等显卡的公有云服务器。 stable diffusion&#xff1a;是一个大模型&#xff0c;可支持文生图&#xff0c;图生图&#xff0c;文生视频等功能 一.实验目标 …

Linux-TCP重传

问题描述&#xff1a; 应用系统进行切换&#xff0c;包含业务流量切换&#xff08;即TongWeb主备切换&#xff09;和MYSQL数据库主备切换。首先进行流量切换&#xff0c;然后进行数据库主备切换。切换后发现备机TongWeb上有两批次慢请求&#xff0c;第一批慢请求响应时间在133…