数据湖Iceberg-SparkSQL集成(4)

news2024/9/21 10:38:40

文章目录

  • 数据湖Iceberg-SparkSQL集成
    • 一、环境准备
      • 安装Spark
    • 二、Spark配置Catalog
      • 2.1在配置文件中添加HiveCatalog与HadoopCatalog配置(一劳永逸)
      • 2.2使用spark-sql连接Hive Catalog
      • 2.3使用spark-sql连接Hadoop Catalog
    • 三、SQL操作
      • 3.1 创建表
        • 创建分区表
          • 分区表
          • 创建隐藏分区表
          • 使用CTAS(Create Table xxx As Select)语法建表
          • 使用Replace Table建表
      • 3.2删除表
      • 3.3修改表
        • 修改表名(不支持修改HadoopCatalog的表名)
        • 修改表属性
        • 删除表属性
        • 添加列
        • 修改列
          • **修改列名**
          • Alter Column修改类型(只允许安全的转换)
          • Alter Column 修改列的注释
          • Alter Column修改列的顺序
          • Alter Column修改列是否允许为null
        • 删除列
        • 添加分区(spark3,需要在配置文件中扩展)
        • 删除分区(Spark3,需要配置扩展)
        • 修改分区(Spark3,需要配置扩展)
        • 修改表的写入顺序
        • 按分区并行写入
      • 3.4插入数据
        • INSERT INTO
        • MERGE INTO 行级更新
      • 3.5查询数据
        • 普通查询
        • 查询元数据
      • 3.6存储过程
        • 语法
    • 四、DataFrame操作
      • 4.1环境准备
        • 创建Maven工程,pom.xml文件内容
        • 代码中配置Catalog
      • 4.2读取表
        • 加载表
        • 时间旅行:查询指定时间戳之前的数据
        • 时间旅行:查询指定快照 ID 的数据
        • 读取指定两个快照 ID 之间增量数据
      • 4.3检查表
        • 查询表元数据
        • 指定快照ID查询元数据
      • 4.4写入表
        • 创建替换表结构并写入数据
        • append添加数据
        • 动态分区覆盖
        • 静态分区覆盖
      • 4.5维护表
        • 获取Table对象
        • 快照过期清理
        • 删除无效文件
        • 合并小文件
    • 五、整体代码
      • 代码
      • 配置文件
    • 可能遇到的问题
      • 启动spark-sql异常` A read-only user or a user in a read-only database is not permitted to disable read-only mode on a connection.`

数据湖Iceberg-简介(1)
数据湖Iceberg-存储结构(2)
数据湖Iceberg-Hive集成Iceberg(3)
数据湖Iceberg-SparkSQL集成(4)
数据湖Iceberg-FlinkSQL集成(5)
数据湖Iceberg-FlinkSQL-kafka类型表数据无法成功写入(6)
数据湖Iceberg-Flink DataFrame集成(7)

数据湖Iceberg-SparkSQL集成

一、环境准备

Spark安装包下载地址:

https://mirrors.huaweicloud.com/apache/spark

iceberg官网:

https://iceberg.apache.org/releases/#110-release

安装Spark

1.Spark与Iceberg的版本对应关系如下

Spark 版本Iceberg 版本
2.40.7.0-incubating – 1.1.0
30.9.0 – 1.0.0
3.10.12.0 – 1.1.0
3.20.13.0 – 1.1.0
3.30.14.0 – 1.1.0

2.上传并解压Spark安装包

tar -zxvf spark-3.2.1-bin-hadoop2.7.tgz 

修改目录名为spark-3.2.1

3.配置环境变量

sudo vim /etc/profile.d/spark.sh

export SPARK_HOME=/opt/spark-3.2.1
export PATH=$PATH:$SPARK_HOME/bin

source /etc/profile.d/spark.sh

4.拷贝iceberg的jar包到Spark的jars目录
iceberg-sparkjar包下载地址:

https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark-runtime-3.2_2.12/1.2.0/iceberg-spark-runtime-3.2_2.12-1.2.0.jar

cp iceberg-spark-runtime-3.2_2.12-1.2.0.jar $SPARK_HOME/jars

5.修改spark目录为hdfs用户权限

平时使用hdfs提交任务,不设置一会用hdfs用户启动spark-sql会报错

chown -R hdfs.hdfs spark-3.2.1

二、Spark配置Catalog

Spark中支持两种Catalog的设置:hive和hadoop,Hive Catalog就是Iceberg表存储使用Hive默认的数据路径,Hadoop Catalog需要指定Iceberg格式表存储路径。

官方文档:https://iceberg.apache.org/docs/1.1.0/getting-started/

2.1在配置文件中添加HiveCatalog与HadoopCatalog配置(一劳永逸)

修改$spark/conf/spark-defaults.conf配置文件,添加以下内容

spark.sql.catalog.hive_prod=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type=hive
spark.sql.catalog.hive_prod.uri=thrift://bigdata-24-199:9083

spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = hdfs://bigdata-24-198:8020/warehouse/spark-iceberg

# 添加分区需要使用该配置
spark.sql.extensions = org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

直接启动spark-sql就可以使用配置的HiveCatalog和HadoopCatalog了

2.2使用spark-sql连接Hive Catalog

cd /opt/spark-3.2.1
spark-sql \
--conf spark.sql.catalog.hive_prod=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.hive_prod.type=hive \
--conf spark.sql.catalog.hive_prod.uri=thrift://bigdata-24-199:9083
配置含义
hive_prod我们要创建Hive catalog名称
spark.sql.catalog.hive_prod.urihive-site.xml配置的hive.metastore.uris值

执行该命令会在之前的当前目录生成一个metastore_db目录和derby.log日志文件

-rw-r--r-- 1 hdfs hadoop  659 Apr 13 16:42 derby.log
drwxr-xr-x 5 hdfs hadoop 4096 Apr 13 16:42 metastore_db

基础使用:

# 使用hive_prod catalog
spark-sql> use hive_prod;
# 进入default数据库
spark-sql> use default;
Time taken: 0.104 seconds
# 创建表
spark-sql> CREATE TABLE hive_prod.default.sample2 (
         >     id bigint COMMENT 'unique id',
         >     data string)
         > USING iceberg;
Time taken: 3.271 seconds
# 查看表,这里可以看到hive中的表
spark-sql> show tables;
sample2
Time taken: 0.729 seconds, Fetched 9 row(s)
# 写入数据
spark-sql> INSERT INTO hive_prod.default.sample2 VALUES(1,'数据1'),(2,'数据2');
Time taken: 6.256 seconds
# 查看数据
spark-sql> select * from  hive_prod.default.sample2;
1       数据1
2       数据2
Time taken: 2.117 seconds, Fetched 2 row(s)

2.3使用spark-sql连接Hadoop Catalog

cd /opt/spark-3.2.1
spark-sql \
--conf spark.sql.catalog.hadoop_prod=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.hadoop_prod.type=hadoop \
--conf spark.sql.catalog.hadoop_prod.warehouse=hdfs://bigdata-24-198:8020/warehouse/spark-iceberg

三、SQL操作

后面统一使用在2.1章节配置文件添加配置方式演示

3.1 创建表

# 使用catalog hadoop_prod
use hadoop_prod;
# 创建数据库,默认没有
create database default;
# 进入数据库
use default;
# 创建表
CREATE TABLE hadoop_prod.default.sample1 (
    id bigint COMMENT 'unique id',
    data string)
USING iceberg
  • PARTITIONED BY (partition-expressions) :配置分区
  • LOCATION ‘(fully-qualified-uri)’ :指定表路径
  • COMMENT ‘table documentation’ :配置表备注
  • TBLPROPERTIES (‘key’=‘value’, …) :配置表属性

表属性:https://iceberg.apache.org/docs/latest/configuration/

对Iceberg表的每次更改都会生成一个新的元数据文件(json文件)以提供原子性。默认情况下,旧元数据文件作为历史文件保存不会删除。

如果要自动清除元数据文件,在表属性中设置write.metadata.delete-after-commit.enabled=true。这将保留一些元数据文件(直到write.metadata.previous-versions-max),并在每个新创建的元数据文件之后删除旧的元数据文件。

创建分区表

分区表
CREATE TABLE hadoop_prod.default.sample2 (
    id bigint,
    data string,
    category string)
USING iceberg
PARTITIONED BY (category)

看看插入数据数据存储结构如何

INSERT INTO hadoop_prod.default.sample2 VALUES(1,'数据1','分类1'),(2,'数据2','分类1'),(3,'数据3','分类2')

查看数据存储目录

在这里插入图片描述

查看元数据目录

在这里插入图片描述

发现会根据分区指定字段按照目录存储

创建隐藏分区表
CREATE TABLE hadoop_prod.default.sample3 (
    id bigint,
    data string,
    category string,
    ts timestamp)
USING iceberg
PARTITIONED BY (bucket(16, id), days(ts), category);

支持的转换有:

  • years(ts):按年划分

  • months(ts):按月划分

  • days(ts)或date(ts):等效于dateint分区

  • hours(ts)或date_hour(ts):等效于dateint和hour分区

  • bucket(N, col):按哈希值划分mod N个桶

  • truncate(L, col):按截断为L的值划分,字符串被截断为给定的长度,整型和长型截断为bin: truncate(10, i)生成分区0,10,20,30,…

看看插入数据存储结构如何

INSERT INTO hadoop_prod.default.sample3 VALUES(1,'数据1','分类1',cast(from_unixtime(1649826749) as timestamp)),
(2,'数据2','分类1',cast(from_unixtime(1681362749) as timestamp)),
(3,'数据3','分类2',cast(from_unixtime(1681449149) as timestamp));

查看数据存储目录

在这里插入图片描述

发现,目录顺序与创建隐藏分区书序相同

使用CTAS(Create Table xxx As Select)语法建表

新表会包含源表数据

CREATE TABLE hadoop_prod.default.sample4
USING iceberg
AS SELECT * from hadoop_prod.default.sample3

注意:使用CTAS方法创建,如果新表不指定分区字段默认创建的表是不带分区字段的。表属性也相同

所以,创建新表时需要带上分区、属性配置:

CREATE TABLE hadoop_prod.default.sample5
USING iceberg
PARTITIONED BY (bucket(8, id), hours(ts), category)
TBLPROPERTIES ('key'='value')
AS SELECT * from hadoop_prod.default.sample3
使用Replace Table建表

新表会包含源表数据

REPLACE TABLE hadoop_prod.default.sample5
USING iceberg
AS SELECT * from hadoop_prod.default.sample3;

同样使用Replace Table替换表结构也不会带分区字段和表属性

REPLACE TABLE hadoop_prod.default.sample5
USING iceberg
PARTITIONED BY (category)
TBLPROPERTIES ('key'='value')
AS SELECT * from hadoop_prod.default.sample3;

如果Replace的表不存在则会报错,需要使用CREATE OR REPLACE TABLE 语法创建即可

分区和表属性同样不会带进去

CREATE OR REPLACE TABLE hadoop_prod.default.sample6
USING iceberg
AS SELECT * from hadoop_prod.default.sample3

3.2删除表

对于HadoopCatalog而言,运行DROP TABLE将从catalog中删除表并删除表内容(整个存储目录均删除了)。

# 创建表
CREATE EXTERNAL TABLE hadoop_prod.default.sample15 (
    id bigint COMMENT 'unique id',
    data string)
USING iceberg;

# 添加数据
INSERT INTO hadoop_prod.default.sample15 values(1,'a');
# 删除表
DROP TABLE hadoop_prod.default.sample15;

对于HiveCatalog而言:

  • 在0.14之前,运行DROP TABLE将从catalog中删除表并删除表内容。
  • 从0.14开始,DROP TABLE只会从catalog中删除表,不会删除数据。为了删除表内容,应该使用DROP table PURGE。
CREATE TABLE hive_prod.default.sample16 (
    id bigint COMMENT 'unique id',
    data string)
USING iceberg;

INSERT INTO hive_prod.default.sample16 values(1,'a');

添加数据之后,hdfs文件分布

在这里插入图片描述

删除表

DROP TABLE hive_prod.default.sample16;

此时在spark-sql中show tables已经看不到表名了,但是在hdfs中表的数据都存在

删除表和HDFS上的数据

我测试时候HDFS数据也没有删除~

记录一下,后续再验证 hdfs dfs -ls /warehouse/tablespace/managed/hive/sample16/metadata

DROP TABLE hive_prod.default.sample16 PURGE;

3.3修改表

Iceberg在Spark 3中完全支持ALTER TABLE,包括:

  • 重命名表
  • 设置或删除表属性
  • 添加、删除和重命名列
  • 添加、删除和重命名嵌套字段
  • 重新排序顶级列和嵌套结构字段
  • 扩大int、float和decimal字段的类型
  • 将必选列变为可选列

此外,还可以使用SQL扩展来添加对分区演变的支持和设置表的写顺序。

创建一个表做验证使用

hive catalog 表

CREATE TABLE hive_prod.default.sample_alert (
    id bigint COMMENT 'unique id',
    data string)
USING iceberg;

hadoop catalog 表

CREATE TABLE hadoop_prod.default.sample_alert (
    id bigint COMMENT 'unique id',
    data string)
USING iceberg;

修改表名(不支持修改HadoopCatalog的表名)

修改表名后,存储在HDFS的目录名是不变的

ALTER TABLE hive_prod.default.sample_alert RENAME TO hive_prod.default.sample_alert2;

修改表属性

# 修改表属性
ALTER TABLE hive_prod.default.sample_alert2 SET TBLPROPERTIES (
    'read.split.target-size'='268435456'
);
# 添加表描述
ALTER TABLE hive_prod.default.sample_alert2 SET TBLPROPERTIES (
    'comment' = 'A table comment.'
);

删除表属性

ALTER TABLE hive_prod.default.sample_alert2 UNSET TBLPROPERTIES ('read.split.target-size');

添加列

ALTER TABLE hive_prod.default.sample_alert2
ADD COLUMNS (
    category string comment 'new_column'
  )

-- 添加struct类型的列
ALTER TABLE hive_prod.default.sample_alert2
ADD COLUMN point struct<x: double, y: double>;

-- 往struct类型的列中添加字段
ALTER TABLE hive_prod.default.sample_alert2
ADD COLUMN point.z double

-- 创建struct的嵌套数组列
ALTER TABLE hive_prod.default.sample_alert2
ADD COLUMN points array<struct<x: double, y: double>>;

-- 在数组中的结构中添加一个字段。使用关键字'element'访问数组的元素列。
ALTER TABLE hive_prod.default.sample_alert2
ADD COLUMN points.element.z double

-- 创建一个包含Map类型的列,key和value都为struct类型
ALTER TABLE hive_prod.default.sample_alert2
ADD COLUMN pointsm map<struct<x: int>, struct<a: int>>;

-- 在Map类型的value的struct中添加一个字段。
ALTER TABLE hive_prod.default.sample_alert2
ADD COLUMN pointsm.value.b int

在Spark 2.4.4及以后版本中(只针对Hadoop Catalog可以这么添加),可以通过添加FIRST或AFTER子句在任何位置添加列:

ALTER TABLE hadoop_prod.default.sample_alert
ADD COLUMN new_column1 bigint AFTER id

ALTER TABLE hadoop_prod.default.sample_alert
ADD COLUMN new_column2 bigint FIRST

修改列

修改列名

修改列名后,根据新的列名就可以查询到数据

ALTER TABLE hadoop_prod.default.sample_alert RENAME COLUMN data TO data1;
Alter Column修改类型(只允许安全的转换)
ALTER TABLE hadoop_prod.default.sample_alert
ADD COLUMNS (
    idd int
  );
  
ALTER TABLE hadoop_prod.default.sample_alert ALTER COLUMN idd TYPE bigint;
Alter Column 修改列的注释
ALTER TABLE hadoop_prod.default.sample_alert ALTER COLUMN ee COMMENT 'a';
ALTER TABLE hadoop_prod.default.sample_alert ALTER COLUMN id COMMENT 'b';
Alter Column修改列的顺序
ALTER TABLE hadoop_prod.default.sample_alert ALTER COLUMN id FIRST;

ALTER TABLE hadoop_prod.default.sample_alert ALTER COLUMN new_column2 AFTER new_column1;
Alter Column修改列是否允许为null
ALTER TABLE hadoop_prod.default.sample_alert ALTER COLUMN id DROP NOT NULL;

ALTER COLUMN不用于更新struct类型。使用ADD COLUMN和DROP COLUMN添加或删除struct类型的字段。

删除列

ALTER TABLE hadoop_prod.default.sample_alert DROP COLUMN idd;
ALTER TABLE hadoop_prod.default.sample_alert DROP COLUMN point.y;

添加分区(spark3,需要在配置文件中扩展)

添加配置文件

spark.sql.extensions = org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
CREATE TABLE hadoop_prod.default.sample1 (
    id bigint COMMENT 'unique id',
    category string,
    ts timestamp,
    data string)
USING iceberg;

ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD category;

ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD bucket(16, id);
ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD truncate(data, 4);
ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD years(ts);

ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD bucket(16, category) AS shard;

验证一下修改分区数据存储的目录

1.创建表无分区时添加数据

insert into hadoop_prod.default.sample1 values(1,'1',null,'1');

在这里插入图片描述

2.添加分区字段category后添加数据

insert into hadoop_prod.default.sample1 values(1,'1',null,'1');

发现新创建了一个目录目录下有数据文件,之前的文件也还在,分析:之前的数据不受影响,新数据会按照最新的配置进行

在这里插入图片描述

查询时刚刚写入的两条数据也都有

spark-sql> select * from  hadoop_prod.default.sample1;
1       1       NULL    1
1       1       NULL    1

3.添加分区bucket(16, id)后添加数据

insert into hadoop_prod.default.sample1 values(1,'1',null,'1');

发现和刚刚一样,新数据按照新的分区规则写入,老的数据还是不变,后面就不一一测试了,我们可以确认刚刚分析的结论:之前的数据不受影响,新数据会按照最新的配置进行

在这里插入图片描述

删除分区(Spark3,需要配置扩展)

ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD category;
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD bucket(16, id);
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD truncate(data, 4);
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD years(ts);
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD shard;

注意,尽管删除了分区,但列仍然存在于表结构中。

删除分区字段是元数据操作,不会改变任何现有的表数据。新数据将被写入新的分区,但现有数据将保留在旧的分区布局中。

当分区发生变化时,动态分区覆盖行为也会发生变化。例如,如果按天划分分区,而改为按小时划分分区,那么覆盖将覆盖每小时划分的分区,而不再覆盖按天划分的分区。

删除分区字段时要小心,可能导致元数据查询失败或产生不同的结果。

修改分区(Spark3,需要配置扩展)

ALTER TABLE hadoop_prod.default.sample1 REPLACE PARTITION FIELD bucket(16, id) WITH bucket(8, id)

修改表的写入顺序

ALTER TABLE hadoop_prod.default.sample1 WRITE ORDERED BY category, id

ALTER TABLE hadoop_prod.default.sample1 WRITE ORDERED BY category ASC, id DESC

ALTER TABLE hadoop_prod.default.sample1 WRITE ORDERED BY category ASC NULLS LAST, id DESC NULLS FIRST

表写顺序不能保证查询的数据顺序。它只影响数据写入表的方式。

WRITE ORDERED BY设置了一个全局排序,即跨任务的行排序,就像在INSERT命令中使用ORDER BY一样:

INSERT INTO hadoop_prod.default.sample1
SELECT id, data, category, ts FROM another_table
ORDER BY ts, category

要在每个任务内排序,而不是跨任务排序,使用local ORDERED BY:

ALTER TABLE hadoop_prod.default.sample1 WRITE LOCALLY ORDERED BY category, id

按分区并行写入

ALTER TABLE hadoop_prod.default.sample1 WRITE DISTRIBUTED BY PARTITION

ALTER TABLE hadoop_prod.default.sample1 WRITE DISTRIBUTED BY PARTITION LOCALLY ORDERED BY category, id

3.4插入数据

测试表

CREATE TABLE hadoop_prod.default.a (
    id bigint,
    count bigint)
USING iceberg;

CREATE TABLE hadoop_prod.default.b (
    id bigint,
count bigint,
flag string)
USING iceberg;

INSERT INTO

INSERT INTO hadoop_prod.default.a VALUES (1, 1), (2, 2), (3, 3);
INSERT INTO hadoop_prod.default.b VALUES (1, 1, 'a'), (2, 2, 'b'), (4, 4, 'd');

MERGE INTO 行级更新

MERGE INTO hadoop_prod.default.a t 
USING (SELECT * FROM hadoop_prod.default.b) u ON t.id = u.id
WHEN MATCHED AND u.flag='b' THEN UPDATE SET t.count = t.count + u.count
WHEN MATCHED AND u.flag='a' THEN DELETE
WHEN NOT MATCHED THEN INSERT (id,count) values (u.id,u.count)

语句分析:

表a与表b通过id字段关联,当表b的flag='b’时,修改表a的count值=表a count+表b count;

当表b的flag='a’时,删除表a中的数据;

当匹配不上时,新增该条数据;

执行流程与执行结果

spark-sql> select * from  hadoop_prod.default.a;
1       1
2       2
3       3
Time taken: 0.451 seconds, Fetched 3 row(s)
spark-sql> select * from  hadoop_prod.default.b;
1       1       a
2       2       b
4       4       d
Time taken: 0.542 seconds, Fetched 3 row(s)
spark-sql> MERGE INTO hadoop_prod.default.a t 
         > USING (SELECT * FROM hadoop_prod.default.b) u ON t.id = u.id
         > WHEN MATCHED AND u.flag='b' THEN UPDATE SET t.count = t.count + u.count
         > WHEN MATCHED AND u.flag='a' THEN DELETE
         > WHEN NOT MATCHED THEN INSERT (id,count) values (u.id,u.count);
Time taken: 4.833 seconds
spark-sql> select * from  hadoop_prod.default.a;
4       4
2       4
3       3
Time taken: 0.281 seconds, Fetched 3 row(s)

3.5查询数据

普通查询

SELECT count(1) as count, data
FROM local.db.table
GROUP BY data

查询元数据

// 查询表快照
SELECT * FROM hadoop_prod.default.a.snapshots

// 查询数据文件信息
SELECT * FROM hadoop_prod.default.a.files

// 查询表历史
SELECT * FROM hadoop_prod.default.a.history

// 查询 manifest
SELECT * FROM hadoop_prod.default.a.manifests

3.6存储过程

Procedures可以通过CALL从任何已配置的Iceberg Catalog中使用。所有Procedures都在namespace中。

语法

按照参数名传参

CALL catalog_name.system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1)

当按位置传递参数时,如果结束参数是可选的,则只有结束参数可以省略。

CALL catalog_name.system.procedure_name(arg_1, arg_2, ... arg_n)

四、DataFrame操作

4.1环境准备

创建Maven工程,pom.xml文件内容

  <properties>
        <scala.binary.version>2.12</scala.binary.version>
        <spark.version>3.2.1</spark.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Spark的依赖引入 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <!--            <scope>provided</scope>-->
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <!--            <scope>provided</scope>-->
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <!--            <scope>provided</scope>-->
            <version>${spark.version}</version>
        </dependency>

        <!--fastjson <= 1.2.80 存在安全漏洞,-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark-runtime-3.3 -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-spark-runtime-3.2_2.12</artifactId>
            <version>1.1.0</version>
        </dependency>


    </dependencies>

    <build>
        <plugins>
            <!-- assembly打包插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <archive>
                        <manifest>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>

            <!--Maven编译scala所需依赖-->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

代码中配置Catalog

 var spark: SparkSession = SparkSession.builder()
      .master("local")
      .appName(this.getClass.getSimpleName)
      //指定hive catalog, catalog名称为iceberg_hive
      .config("spark.sql.catalog.iceberg_hive", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.iceberg_hive.type", "hive")
      .config("spark.sql.catalog.iceberg_hive.uri", "thrift://172.16.24.199:9083")
      //    .config("iceberg.engine.hive.enabled", "true")
      //指定hadoop catalog,catalog名称为iceberg_hadoop
      .config("spark.sql.catalog.iceberg_hadoop", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.iceberg_hadoop.type", "hadoop")
      .config("spark.sql.catalog.iceberg_hadoop.warehouse", "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code")
      .getOrCreate()

4.2读取表

加载表

 /**
   * 使用 Spark SQL 的方式展示指定表格的数据
   *
   * @param hadoopCatalogAndNamespace Hadoop Catalog 和 namespace,格式为 "catalog.namespace"
   * @param tableName                 表格名称
   */
  def showTableData1(hadoopCatalogAndNamespace: String, tableName: String) = {
    // 切换到指定的 namespace
    spark.sql("use " + hadoopCatalogAndNamespace)
    // 获取指定表格的全部数据
    spark.table(tableName)
  }

  /**
   * 使用 Spark SQL 的方式展示指定表格的数据
   *
   * @param hadoopCatalogAndNamespace Hadoop Catalog 和 namespace,格式为 "catalog.namespace"
   * @param tableName                 表格名称
   */
  def showTableData2(hadoopCatalogAndNamespace: String, tableName: String) = {
    // 切换到指定的 namespace
    spark.sql("use " + hadoopCatalogAndNamespace)
    // 获取指定表格的全部数据
    spark.table(tableName)
  }

  /**
   * 使用 Iceberg 的方式展示指定表格的数据
   *
   * @param hadoopNamespace Hadoop 的 namespace
   * @param tableName       表格名称
   */
  def showTableData3(hadoopNamespace: String, tableName: String) = {
    // 拼接表格在 HDFS 上的路径
    val tableHDFS = "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code/" + hadoopNamespace + "/" + tableName
    // 通过 Iceberg 的方式获取指定表格的全部数据
    spark.read
      .format("iceberg")
      .load(tableHDFS)
  }

时间旅行:查询指定时间戳之前的数据

  /**
   * 查询指定时间戳之前的数据
   *
   * @param hadoopNamespace Hadoop 的 namespace
   * @param tableName       表格名称
   * @return DataFrame 对象
   */
  def queryDataBeforeTimestamp(hadoopNamespace: String, tableName: String) = {
    // 拼接表格在 HDFS 上的路径
    val tableHDFS = "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code/" + hadoopNamespace + "/" + tableName
    // 使用 Iceberg 的方式获取指定时间戳之前的数据
    spark.read
      .option("as-of-timestamp", "1681804793000")
      .format("iceberg")
      .load(tableHDFS)
  }

时间旅行:查询指定快照 ID 的数据

  /**
   * 查询指定快照 ID 的数据
   *
   * @param hadoopNamespace Hadoop 的名称空间
   * @param tableName       表的名称
   * @return DataFrame 对象
   */
  def querySnapshotData(hadoopNamespace: String, tableName: String) = {
    // 构建表在 HDFS 上的路径
    val tableHDFS = "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code/" + hadoopNamespace + "/" + tableName
    // 使用 Iceberg 的格式加载指定快照 ID 的数据,并返回 DataFrame 对象
    spark.read
      .option("snapshot-id", 1150221491570194004L)
      .format("iceberg")
      .load(tableHDFS)
  }

读取指定两个快照 ID 之间增量数据

查询的表只能是append的方式写数据,不支持replace, overwrite, delete操作。

/**
   * 读取指定两个快照 ID 之间增量数据并返回 DataFrame 对象
   *
   * @param hadoopNamespace Hadoop 的名称空间
   * @param tableName       表的名称
   * @return DataFrame 对象
   */
  def getTwoSnapshotsIncrementalData(hadoopNamespace: String, tableName: String): DataFrame = {
    // 构建表在 HDFS 上的路径
    val tableHDFS = s"hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code/$hadoopNamespace/$tableName"
    // 使用 Iceberg 的格式加载指定两个快照 ID 之间的增量数据,并返回 DataFrame 对象
    spark.read
      .format("iceberg")
      .option("start-snapshot-id", 4809403494114685238L)
      .option("end-snapshot-id", 1150221491570194004L)
      .load(tableHDFS)
  }

4.3检查表

查询表元数据

 /**
   * 加载指定快照 ID 下的 Iceberg 表元数据并返回 DataFrame
   *
   * @return DataFrame 对象
   */
  def loadSnapshotDataFiles() = {
    spark.read.format("iceberg")
      .option("snapshot-id", 4809403494114685238L)
      .load("iceberg_hadoop.default.sample1.files")
  }

  /**
   * 加载指定 Iceberg 表的元数据并返回 DataFrame
   *
   * @return DataFrame 对象
   */
  def loadIcebergTableFiles() = {
    spark.read.format("iceberg").load("hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code/default/sample1#files")
  }

指定快照ID查询元数据

 /**
   * 加载指定快照 ID 下的 Iceberg 表元数据并返回 DataFrame
   *
   * @return DataFrame 对象
   */
  def loadSnapshotDataFiles() = {
    spark.read.format("iceberg")
      .option("snapshot-id", 4809403494114685238L)
      .load("iceberg_hadoop.default.sample1.files")
  }

4.4写入表

创建替换表结构并写入数据

// 创建或替换表
def createOrReplaceTable(tableName: String) = {
  val spark = this.spark // 获取 SparkSession 对象
  // 创建包含数据的 DataFrame 对象。
  val dataFrame: DataFrame = spark.createDataFrame(
    Seq(Sample1(1, new Date().toLocaleString, "a"),
      Sample1(2, new Date().toLocaleString, "b"),
      Sample1(3, new Date().toLocaleString, "c")))
  import spark.implicits._
  dataFrame.writeTo(tableName) // 写入表
    .tableProperty("write.format.default", "orc") // 表设置
    //$ 符号是 Scala 语言中用于将对象转换为列的操作符,例如 $"category" 就表示将字符串 "category" 转换为 DataFrame 中的列
    .partitionedBy($"category") // 分区
    .createOrReplace() // 创建或替换表
}

append添加数据

/**
   * 向指定的 Iceberg 表追加 Sample1 类型的数据。
   *
   * @param tableName 要追加数据的表名称。
   */
  def appendData(tableName: String) = {
    spark.sql("select count(1) from " + tableName).show()
    // 创建包含数据的 DataFrame 对象。
    val dataFrame: DataFrame = spark.createDataFrame(
      Seq(Sample1(1, new Date().toLocaleString, "a"),
        Sample1(2, new Date().toLocaleString, "b"),
        Sample1(3, new Date().toLocaleString, "c")))
    // 将 DataFrame 持久化到指定表,并调用 append() 方法将数据追加到指定表中。
    dataFrame.writeTo(tableName).append()
    spark.sql("select count(1) from " + tableName).show()
  }

动态分区覆盖

写入的数据,根据分区字段,自动覆盖写入,数据不存在的分区就不覆盖了

	// 覆盖指定表的分区数据
  def overwritePartitionsData(tableName: String) = {
    // 使用 SQL 查询统计表的记录数并在控制台输出结果
    spark.sql("select count(1) from " + tableName).show()
    // 创建包含数据的 DataFrame 对象。
    val dataFrame: DataFrame = spark.createDataFrame(
      Seq(Sample1(1, new Date().toLocaleString, "a"),
        Sample1(2, new Date().toLocaleString, "b")))
    // 将 DataFrame 持久化到指定表,并调用 append() 方法将数据追加到指定表中。
    dataFrame.writeTo(tableName).overwritePartitions()
    // 使用 SQL 查询统计表的记录数并在控制台输出结果
    spark.sql("select count(1) from " + tableName).show()
  }

静态分区覆盖

 // 覆盖指定表的指定分区数据,其他分区直接追加
  def overwritePartitionsData2(tableName: String) = {
    // 获取 SparkSession 对象并为其创建一个变量 spark。
    val spark: SparkSession = this.spark
    // 使用 SQL 查询统计表的记录数并在控制台输出结果。
    spark.sql("select count(1) from " + tableName).show()
    import spark.implicits._
    val dataFrame: DataFrame = spark.createDataFrame(
      Seq(Sample1(5, new Date().toLocaleString, "a"),
        Sample1(6, new Date().toLocaleString, "b")))
    // 将指定 DataFrame 对象的数据覆盖到 tableName 表中,其中仅覆盖 category 字段等于 a 的分区。
    dataFrame.writeTo(tableName).overwrite($"category" === "a")
    // 使用 SQL 查询统计表的记录数并在控制台输出结果。
    spark.sql("select count(1) from " + tableName).show()
  }

4.5维护表

获取Table对象

  • HadoopCatalog
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;

val conf = new Configuration()
val catalog = new HadoopCatalog(conf,"hdfs://hadoop1:8020/warehouse/spark-iceberg")
val table: Table = catalog.loadTable(TableIdentifier.of("db","table1"))
  • HiveCatalog
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;

val catalog = new HiveCatalog()
catalog.setConf(spark.sparkContext.hadoopConfiguration)

val properties = new util.HashMap[String,String]()
properties.put("warehouse", "hdfs://hadoop1:8020/warehouse/spark-iceberg")
properties.put("uri", "thrift://hadoop1:9083")

catalog.initialize("hive", properties)
val table: Table = catalog.loadTable(TableIdentifier.of("db", "table1"))

快照过期清理

每次写入Iceberg表都会创建一个表的新快照或版本。快照可以用于时间旅行查询,或者可以将表回滚到任何有效的快照。建议设置快照过期时间,过期的旧快照将从元数据中删除(不再可用于时间旅行查询)。

/**
   * 对样例表进行快照过期清理
   */
  def snapshotOverdueClean() = {
    // 创建 Hadoop Configuration 对象
    val conf = new Configuration()
    // 基于 Hadoop Catalog API 加载表格
    val catalog = new HadoopCatalog(conf, "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code")
    val table: Table = catalog.loadTable(TableIdentifier.of("default", "sample1"))
    // 设置过期时间(这里设置7天前的时间)
    val tsToExpire: Long = System.currentTimeMillis() - (1000 * 60 * 60 * 24)
    // 过期快照并提交更改
    table.expireSnapshots()
      .expireOlderThan(tsToExpire)
      .commit()
  }

或使用SparkActions来设置过期:

 /**
   * 对样例表进行快照过期清理2
   */
  def snapshotOverdueClean2() = {
    // 创建 Hadoop Configuration 对象
    val conf = new Configuration()
    // 基于 Hadoop Catalog API 加载表格
    val catalog = new HadoopCatalog(conf, "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code")
    val table: Table = catalog.loadTable(TableIdentifier.of("default", "sample1"))
    // 设置过期时间(这里设置7天前的时间)
    val tsToExpire: Long = System.currentTimeMillis() - (1000 * 60 * 60 * 24)
    SparkActions.get()
      .expireSnapshots(table)
      .expireOlderThan(tsToExpire)
      .execute()
  }

删除无效文件

在Spark和其他分布式处理引擎中,任务或作业失败可能会留下未被表元数据引用的文件,在某些情况下,正常的快照过期可能无法确定不再需要并删除该文件。

/**
   * 删除无效文件
   */
  def deleteInvalidFile() = {
    // 创建 Hadoop Configuration 对象
    val conf = new Configuration()
    // 基于 Hadoop Catalog API 加载表格
    val catalog = new HadoopCatalog(conf, "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code")
    val table: Table = catalog.loadTable(TableIdentifier.of("default", "sample1"))
    SparkActions
      .get()
      .deleteOrphanFiles(table)
      .execute()
  }

合并小文件

数据文件过多会导致更多的元数据存储在清单文件中,而较小的数据文件会导致不必要的元数据量和更低效率的文件打开成本。


  /**
   * 合并小文件
   */
  def mergeSmallFiles() = {
    val conf = new Configuration()
    val catalog = new HadoopCatalog(conf, "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code")

    // 加载表对象
    val table: Table = catalog.loadTable(TableIdentifier.of("default", "sample1"))

    SparkActions
      .get() //使用SparkActions获取操作对象实例
      .rewriteDataFiles(table) //将表中的数据重写成查询引擎可读的格式
      .filter(Expressions.equal("category", "a")) //使用Iceberg表达式引擎过滤category列等于a的记录
      .option("target-file-size-bytes", 1024L.toString) //设置目标文件大小为1KB
      .execute() //执行合并小文件操作并生成新版本
  }

五、整体代码

代码

package wiki.hadoop.iceberg.spark

import java.util.Date

import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.Table
import org.apache.iceberg.catalog.{Namespace, TableIdentifier}
import org.apache.iceberg.expressions.Expressions
import org.apache.iceberg.hadoop.HadoopCatalog
import org.apache.iceberg.spark.actions.SparkActions
import org.apache.spark.sql.{DataFrame, SparkSession}
import wiki.hadoop.iceberg.spark.Example.Sample1

/** *
 *
 * @date 2023/4/19 下午3:51
 * @description
 * 1. descTableDesc(tableName: String): 显示指定表的元数据信息。
 * 2. createOrReplaceTable(tableName: String): 创建表,如果表存在则直接Replace表结构。
 * 3. appendData(tableName: String): 向指定的 Iceberg 表追加 Sample1 类型的数据。
 * 4. tableOrNameSpaceExists(): 判断表和数据库是否存在。
 * 5. loadSnapshotDataFiles(): 加载指定快照 ID 下的 Iceberg 表元数据并返回 DataFrame。
 * 6. loadIcebergTableFiles(fullTableName: String): 加载指定 Iceberg 表的元数据并返回 DataFrame。
 * 7. getTwoSnapshotsIncrementalData(hadoopNamespace: String, tableName: String): 读取指定两个快照 ID 之间增量数据并返回 DataFrame 对象。
 * 8. querySnapshotData(hadoopNamespace: String, tableName: String): 查询指定快照 ID 的数据。
 * 9. queryDataBeforeTimestamp(hadoopNamespace: String, tableName: String): 查询指定时间戳之前的数据。
 * 10. showTableData1(hadoopCatalogAndNamespace: String, tableName: String): 使用 Spark SQL 的方式展示指定表格的数据。
 * 11. showTableData2(hadoopCatalogAndNamespace: String, tableName: String): 使用 Spark SQL 的方式展示指定表格的数据。
 * 12. showTableData3(hadoopNamespace: String, tableName: String): 使用 Iceberg 的方式展示指定表格的数据。
 * 13. printDataFrame(dataFrame: DataFrame): 打印 DataFrame 中的数据,格式为每行数据按列名分割,行与行之间使用分隔符分割。
 * 14. showTables(): 查看指定库的所有表。
 * 15. createTable(catalogAndNamespace: String, tableName: String): 创建表。
 * 16. createDatabase(catalogAndNamespace: String): 如果不存在创建数据库。
 * @author Jast
 */
object HadoopIcebergExample {

  //指定一个特定的 Hadoop 用户运行应用程序
  System.setProperty("HADOOP_USER_NAME", "hdfs")

  //样例类,表结构
  case class HadoopIcebergSample(id: Long, data: String, category: String)

  var spark: SparkSession = _

  def main(args: Array[String]): Unit = {
    spark = SparkSession.builder()
      .master("local")
      .appName(this.getClass.getSimpleName)
      //指定hive catalog, catalog名称为iceberg_hive
      .config("spark.sql.catalog.iceberg_hive", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.iceberg_hive.type", "hive")
      .config("spark.sql.catalog.iceberg_hive.uri", "thrift://172.16.24.199:9083")
      //    .config("iceberg.engine.hive.enabled", "true")
      //指定hadoop catalog,catalog名称为iceberg_hadoop
      .config("spark.sql.catalog.iceberg_hadoop", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.iceberg_hadoop.type", "hadoop")
      .config("spark.sql.catalog.iceberg_hadoop.warehouse", "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code")
      .getOrCreate()


    //
    val hadoopCatalog = "iceberg_hadoop"
    val hadoopNamespace = "default"
    val hadoopCatalogAndNamespace = hadoopCatalog + "." + hadoopNamespace
    val tableName = "sample1"
    val fullTableName = hadoopCatalogAndNamespace + "." + tableName

    // 创建 namespace
    //        createDatabase(hadoopCatalogAndNamespace)

    // 创建表
    //        createTable(hadoopCatalogAndNamespace, tableName)

    // 查看所有表
    //    showTables()

    // 查看表数据
    //    printDataFrame(showTableData1(hadoopCatalogAndNamespace, tableName))
    //    printDataFrame(showTableData2(hadoopCatalogAndNamespace, tableName))
    //    printDataFrame(showTableData3(hadoopNamespace, tableName))

    // 查询指定时间戳之前的数据
    //    printDataFrame(queryDataBeforeTimestamp(hadoopNamespace, tableName))

    // 查询指定快照 ID 的数据
    //    printDataFrame(querySnapshotData(hadoopNamespace, tableName))

    // 读取指定两个快照 ID 之间增量数据
    //    printDataFrame(getTwoSnapshotsIncrementalData(hadoopNamespace, tableName))

    // 加载指定 Iceberg 表的元数据
    //    printDataFrame(loadIcebergTableFiles(fullTableName))
    //    printDataFrame(loadIcebergTableFiles)

    // 加载指定快照 ID 下的 Iceberg 表元数据
    //    printDataFrame(loadSnapshotDataFiles)

    // 判断表和数据库是否存在
    //    tableOrNameSpaceExists()

    // 显示指定表的元数据信息
    //    descTableDesc(fullTableName)

    // 创建表,如果表存在则直接Replace表结构
    //    createOrReplaceTable(fullTableName)

    // append添加数据
    //    appendData(fullTableName)

    //覆盖指定表的分区数据,根据分区字段动态覆盖
    overwritePartitionsData(fullTableName)
    // 手动指定分区进行覆盖
    overwritePartitionsData2(fullTableName)

    // 快照过期清理
    //    snapshotOverdueClean
    //    snapshotOverdueClean2

    // 合并小文件
    //    mergeSmallFiles
  }

  /**
   * 显示指定表的元数据信息。
   *
   * @param tableName 要查看元数据的表名称。
   */
  def descTableDesc(tableName: String) = {
    // 使用 Spark SQL 调用 desc formatted 命令来查看表的元数据信息,并使用 show() 方法显示结果。
    spark.sql(s"desc formatted $tableName").show(true)
  }


  // 创建或替换表
  def createOrReplaceTable(tableName: String) = {
    val spark = this.spark // 获取 SparkSession 对象
    // 创建包含数据的 DataFrame 对象。
    val dataFrame: DataFrame = spark.createDataFrame(
      Seq(Sample1(1, new Date().toLocaleString, "a"),
        Sample1(2, new Date().toLocaleString, "b"),
        Sample1(3, new Date().toLocaleString, "c")))
    import spark.implicits._
    dataFrame.writeTo(tableName) // 写入表
      .tableProperty("write.format.default", "orc") // 表设置
      //$ 符号是 Scala 语言中用于将对象转换为列的操作符,例如 $"category" 就表示将字符串 "category" 转换为 DataFrame 中的列
      .partitionedBy($"category") // 分区
      .createOrReplace() // 创建或替换表
  }


  /**
   * 向指定的 Iceberg 表追加 Sample1 类型的数据。
   *
   * @param tableName 要追加数据的表名称。
   */
  def appendData(tableName: String) = {
    spark.sql("select count(1) from " + tableName).show()
    // 创建包含数据的 DataFrame 对象。
    val dataFrame: DataFrame = spark.createDataFrame(
      Seq(Sample1(1, new Date().toLocaleString, "a"),
        Sample1(2, new Date().toLocaleString, "b"),
        Sample1(3, new Date().toLocaleString, "c")))
    // 将 DataFrame 持久化到指定表,并调用 append() 方法将数据追加到指定表中。
    dataFrame.writeTo(tableName).append()
    spark.sql("select count(1) from " + tableName).show()
  }

  // 覆盖指定表的分区数据,写入的数据,根据分区字段,自动覆盖写入,数据不存在的分区就不覆盖了
  def overwritePartitionsData(tableName: String) = {
    // 使用 SQL 查询统计表的记录数并在控制台输出结果
    spark.sql("select count(1) from " + tableName).show()
    // 创建包含数据的 DataFrame 对象。
    val dataFrame: DataFrame = spark.createDataFrame(
      Seq(Sample1(5, new Date().toLocaleString, "a"),
        Sample1(6, new Date().toLocaleString, "b")))
    // 将 DataFrame 持久化到指定表,并调用 append() 方法将数据追加到指定表中。
    dataFrame.writeTo(tableName).overwritePartitions()
    // 使用 SQL 查询统计表的记录数并在控制台输出结果
    spark.sql("select count(1) from " + tableName).show()
  }

  // 覆盖指定表的指定分区数据,其他分区直接追加
  def overwritePartitionsData2(tableName: String) = {
    // 获取 SparkSession 对象并为其创建一个变量 spark。
    val spark: SparkSession = this.spark
    // 使用 SQL 查询统计表的记录数并在控制台输出结果。
    spark.sql("select count(1) from " + tableName).show()
    import spark.implicits._
    val dataFrame: DataFrame = spark.createDataFrame(
      Seq(Sample1(5, new Date().toLocaleString, "a"),
        Sample1(6, new Date().toLocaleString, "b")))
    // 将指定 DataFrame 对象的数据覆盖到 tableName 表中,其中仅覆盖 category 字段等于 a 的分区。
    dataFrame.writeTo(tableName).overwrite($"category" === "a")
    // 使用 SQL 查询统计表的记录数并在控制台输出结果。
    spark.sql("select count(1) from " + tableName).show()
  }


  /**
   * 判断表是否存在
   */
  def tableOrNameSpaceExists() = {
    val conf = new Configuration()
    val catalog = new HadoopCatalog(conf, "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code")
    println("表是否存在:" + catalog.tableExists(TableIdentifier.of("default", "sample1")))
    println("表是否存在:" + catalog.tableExists(TableIdentifier.of("default", "sample2")))
    println("数据库是否存在:" + catalog.namespaceExists(Namespace.of("default")))
    println("数据库是否存在:" + catalog.namespaceExists(Namespace.of("default8")))
  }

  /**
   * 对样例表进行快照过期清理
   */
  def snapshotOverdueClean() = {
    // 创建 Hadoop Configuration 对象
    val conf = new Configuration()
    // 基于 Hadoop Catalog API 加载表格
    val catalog = new HadoopCatalog(conf, "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code")
    val table: Table = catalog.loadTable(TableIdentifier.of("default", "sample1"))
    // 设置过期时间(这里设置7天前的时间)
    val tsToExpire: Long = System.currentTimeMillis() - (1000 * 60 * 60 * 24)
    // 过期快照并提交更改
    table.expireSnapshots()
      .expireOlderThan(tsToExpire)
      .commit()
  }

  /**
   * 对样例表进行快照过期清理2
   */
  def snapshotOverdueClean2() = {
    // 创建 Hadoop Configuration 对象
    val conf = new Configuration()
    // 基于 Hadoop Catalog API 加载表格
    val catalog = new HadoopCatalog(conf, "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code")
    val table: Table = catalog.loadTable(TableIdentifier.of("default", "sample1"))
    // 设置过期时间(这里设置7天前的时间)
    val tsToExpire: Long = System.currentTimeMillis() - (1000 * 60 * 60 * 24)
    SparkActions.get()
      .expireSnapshots(table)
      .expireOlderThan(tsToExpire)
      .execute()
  }

  /**
   * 删除无效文件
   */
  def deleteInvalidFile() = {
    // 创建 Hadoop Configuration 对象
    val conf = new Configuration()
    // 基于 Hadoop Catalog API 加载表格
    val catalog = new HadoopCatalog(conf, "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code")
    val table: Table = catalog.loadTable(TableIdentifier.of("default", "sample1"))
    SparkActions
      .get()
      .deleteOrphanFiles(table)
      .execute()
  }

  /**
   * 合并小文件
   */
  def mergeSmallFiles() = {
    val conf = new Configuration()
    val catalog = new HadoopCatalog(conf, "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code")

    // 加载表对象
    val table: Table = catalog.loadTable(TableIdentifier.of("default", "sample1"))

    SparkActions
      .get() //使用SparkActions获取操作对象实例
      .rewriteDataFiles(table) //将表中的数据重写成查询引擎可读的格式
      .filter(Expressions.equal("category", "a")) //使用Iceberg表达式引擎过滤category列等于a的记录
      .option("target-file-size-bytes", 1024L.toString) //设置目标文件大小为1KB
      .execute() //执行合并小文件操作并生成新版本
  }


  /**
   * 加载指定快照 ID 下的 Iceberg 表元数据并返回 DataFrame
   *
   * @return DataFrame 对象
   */
  def loadSnapshotDataFiles() = {
    spark.read.format("iceberg")
      .option("snapshot-id", 4809403494114685238L)
      .load("iceberg_hadoop.default.sample1.files")
  }

  /**
   * 加载指定 Iceberg 表的元数据并返回 DataFrame
   *
   * @param fullTableName 完整的表名,包括数据库名称和表名称
   * @return DataFrame 对象
   */
  def loadIcebergTableFiles(fullTableName: String) = {
    spark.read.format("iceberg").load(fullTableName + ".files")
  }

  /**
   * 加载指定 Iceberg 表的元数据并返回 DataFrame
   *
   * @return DataFrame 对象
   */
  def loadIcebergTableFiles() = {
    spark.read.format("iceberg").load("hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code/default/sample1#files")
  }

  /**
   * 读取指定两个快照 ID 之间增量数据并返回 DataFrame 对象
   *
   * @param hadoopNamespace Hadoop 的名称空间
   * @param tableName       表的名称
   * @return DataFrame 对象
   */
  def getTwoSnapshotsIncrementalData(hadoopNamespace: String, tableName: String): DataFrame = {
    // 构建表在 HDFS 上的路径
    val tableHDFS = s"hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code/$hadoopNamespace/$tableName"
    // 使用 Iceberg 的格式加载指定两个快照 ID 之间的增量数据,并返回 DataFrame 对象
    spark.read
      .format("iceberg")
      .option("start-snapshot-id", 4809403494114685238L)
      .option("end-snapshot-id", 1150221491570194004L)
      .load(tableHDFS)
  }


  /**
   * 查询指定快照 ID 的数据
   *
   * @param hadoopNamespace Hadoop 的名称空间
   * @param tableName       表的名称
   * @return DataFrame 对象
   */
  def querySnapshotData(hadoopNamespace: String, tableName: String) = {
    // 构建表在 HDFS 上的路径
    val tableHDFS = "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code/" + hadoopNamespace + "/" + tableName
    // 使用 Iceberg 的格式加载指定快照 ID 的数据,并返回 DataFrame 对象
    spark.read
      .option("snapshot-id", 1150221491570194004L)
      .format("iceberg")
      .load(tableHDFS)
  }


  /**
   * 查询指定时间戳之前的数据
   *
   * @param hadoopNamespace Hadoop 的 namespace
   * @param tableName       表格名称
   * @return DataFrame 对象
   */
  def queryDataBeforeTimestamp(hadoopNamespace: String, tableName: String) = {
    // 拼接表格在 HDFS 上的路径
    val tableHDFS = "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code/" + hadoopNamespace + "/" + tableName
    // 使用 Iceberg 的方式获取指定时间戳之前的数据
    spark.read
      .option("as-of-timestamp", "1681804793000")
      .format("iceberg")
      .load(tableHDFS)
  }


  /**
   * 使用 Spark SQL 的方式展示指定表格的数据
   *
   * @param hadoopCatalogAndNamespace Hadoop Catalog 和 namespace,格式为 "catalog.namespace"
   * @param tableName                 表格名称
   */
  def showTableData1(hadoopCatalogAndNamespace: String, tableName: String) = {
    // 切换到指定的 namespace
    spark.sql("use " + hadoopCatalogAndNamespace)
    // 获取指定表格的全部数据
    spark.table(tableName)
  }

  /**
   * 使用 Spark SQL 的方式展示指定表格的数据
   *
   * @param hadoopCatalogAndNamespace Hadoop Catalog 和 namespace,格式为 "catalog.namespace"
   * @param tableName                 表格名称
   */
  def showTableData2(hadoopCatalogAndNamespace: String, tableName: String) = {
    // 切换到指定的 namespace
    spark.sql("use " + hadoopCatalogAndNamespace)
    // 获取指定表格的全部数据
    spark.table(tableName)
  }

  /**
   * 使用 Iceberg 的方式展示指定表格的数据
   *
   * @param hadoopNamespace Hadoop 的 namespace
   * @param tableName       表格名称
   */
  def showTableData3(hadoopNamespace: String, tableName: String) = {
    // 拼接表格在 HDFS 上的路径
    val tableHDFS = "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code/" + hadoopNamespace + "/" + tableName
    // 通过 Iceberg 的方式获取指定表格的全部数据
    spark.read
      .format("iceberg")
      .load(tableHDFS)
  }


  /**
   * 打印 DataFrame 中的数据,格式为每行数据按列名分割,行与行之间使用分隔符分割
   *
   * @param dataFrame DataFrame 对象
   */
  def printDataFrame(dataFrame: DataFrame): Unit = {
    // 获取 DataFrame 的 schema 信息
    val schema = dataFrame.schema.fields
    // 遍历 DataFrame 中的每一行数据,使用 "getAs()" 方法获取每一列的值
    dataFrame.foreach(row => {
      for (i <- 0 until schema.length) {
        println(schema(i).name + ":" + row.getAs(schema(i).name))
      }
      // 使用分隔符分割不同行
      println("----------------------------------------------")
    })
  }


  /**
   * 查看指定库的所有表
   */
  def showTables(): Unit = {
    spark.sql("use iceberg_hadoop.default")
    spark.sql("show tables").show(false)
  }

  /**
   * 创建表
   *
   * @param catalogAndNamespace
   * @param tableName
   */
  def createTable(catalogAndNamespace: String, tableName: String): Unit = {
    spark.sql("CREATE TABLE if not exists " + catalogAndNamespace + "." + tableName + " ( id bigint COMMENT 'unique id', data string) USING iceberg;")
  }

  /**
   * 如果不存在创建数据库
   */
  def createDatabase(catalogAndNamespace: String): Unit = {
    spark.sql("create database if not exists " + catalogAndNamespace)
  }


}

配置文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spark-iceberg</artifactId>

    <properties>
        <scala.binary.version>2.12</scala.binary.version>
        <spark.version>3.2.1</spark.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Spark的依赖引入 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <!--            <scope>provided</scope>-->
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <!--            <scope>provided</scope>-->
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <!--            <scope>provided</scope>-->
            <version>${spark.version}</version>
        </dependency>

        <!--fastjson <= 1.2.80 存在安全漏洞,-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark-runtime-3.3 -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-spark-runtime-3.2_2.12</artifactId>
            <version>1.1.0</version>
        </dependency>


    </dependencies>

    <build>
        <plugins>
            <!-- assembly打包插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <archive>
                        <manifest>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>

            <!--Maven编译scala所需依赖-->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

可能遇到的问题

启动spark-sql异常 A read-only user or a user in a read-only database is not permitted to disable read-only mode on a connection.

完整内容:

23/04/13 15:36:51 ERROR PoolWatchThread: Error in trying to obtain a connection. Retrying in 7000ms
java.sql.SQLException: A read-only user or a user in a read-only database is not permitted to disable read-only mode on a connection.
        at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
        at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown Source)
        at org.apache.derby.impl.jdbc.TransactionResourceImpl.wrapInSQLException(Unknown Source)
        at org.apache.derby.impl.jdbc.TransactionResourceImpl.handleException(Unknown Source)
        at org.apache.derby.impl.jdbc.EmbedConnection.handleException(Unknown Source)
        at org.apache.derby.impl.jdbc.EmbedConnection.setReadOnly(Unknown Source)
        at com.jolbox.bonecp.ConnectionHandle.setReadOnly(ConnectionHandle.java:1324)
        at com.jolbox.bonecp.ConnectionHandle.<init>(ConnectionHandle.java:262)
        at com.jolbox.bonecp.PoolWatchThread.fillConnections(PoolWatchThread.java:115)
        at com.jolbox.bonecp.PoolWatchThread.run(PoolWatchThread.java:82)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: ERROR 25505: A read-only user or a user in a read-only database is not permitted to disable read-only mode on a connection.
        at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)
        at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)
        at org.apache.derby.impl.sql.conn.GenericAuthorizer.setReadOnlyConnection(Unknown Source)
        at org.apache.derby.impl.sql.conn.GenericLanguageConnectionContext.setReadOnly(Unknown Source)

解决方法:

删除当前执行所在目录的metastore_db/db.lck文件

show current namespace;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/459342.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一个 24 通道 100Msps 逻辑分析仪

这是一个创建非常便宜的逻辑分析仪的项目&#xff0c;但其功能可与昂贵的商业分析仪相媲美。该分析仪可以以每秒 1 亿个样本的最高速度对多达 24 个通道进行采样&#xff0c;并且可以通过单个通道中的极性变化或多达 16 个通道形成的模式来触发。 该项目不仅包含硬件&#xff0…

去银行还是干嵌入式?

晚上要睡觉的时候&#xff0c;一个读者给我发消息 说是最近拿到了4个offer&#xff0c;现在犹豫不决 听说&#xff0c;最近嵌入式突然就火起来了。 不过&#xff0c;嵌入式很多人的薪资还是低了&#xff0c;而且&#xff0c;工作很多年后&#xff0c;嵌入式的工作&#xff0c;那…

蛋白冠™蛋白质组学技术实现快速深入精确地解析血浆蛋白质图谱

文章标题&#xff1a;Rapid, deep and precise profiling of the plasma proteome with multi-nanoparticle protein corona 发表期刊&#xff1a;Nature Communications 影响因子&#xff1a;17.694 作者单位&#xff1a;哈佛医学院&#xff1b;Seer&#xff0c;美国&#…

opencv (六十四)监督学习聚类(k近邻原理、支持向量机原理、k近邻(KNN)手写字识别、支持向量机数据分类)

文章目录 1 k近邻原理介绍2 支持向量机原理3 K近邻(KNN)手写字识别训练模型4 手写字识别5 支持向量机 进行数据分类6 源代码及数据文件下载1 k近邻原理介绍 k最近邻(k-Nearest Neighbor)算法是比较简单的机器学习算法。它采用测量不同特征值之间的距离方法进行分类。它的思想…

算法刷题知识点总结

因为数组的在内存空间的地址是连续的&#xff0c;所以我们在删除或者增添元素的时候&#xff0c;就难免要移动其他元素的地址。 二分法&#xff1a;采用两个指针&#xff0c;注意他们的区间划分&#xff1b; 双指针法&#xff0c;用于查找排序&#xff1a;双指针将一个两层循…

BUUCTF warmup_csaw_2016

小白垃圾做题笔记而已&#xff0c;不建议阅读。 唉&#xff0c;本来以为是让写shellcode的打了半天没打通&#xff0c;后来发现疏忽了sub_40060D函数。 前两行(6,7)没啥就是把那个字符串写到屏幕上。 然后是第八行&#xff0c; sprintf(): 这个函数用于将格式化的字符串写入…

Linux安装MongoDB数据库,实现外网远程连接访问

文章目录 1. 配置Mongodb源2. 安装MongoDB3. 局域网连接测试4. 安装cpolar内网穿透5. 配置公网访问地址6. 公网远程连接7. 固定连接公网地址8. 使用固定地址连接 简单几步实现Linux安装mongoDB数据库并且结合cpolar内网穿透实现在公网环境下的远程连接。 1. 配置Mongodb源 进…

聊聊开源类ChatGPT工作——MOSS

自从ChatGPT发布以来&#xff0c;它的“三步走方案”就好比《九阴真经》流落到AI江湖中&#xff0c;各大门派练法不一&#xff0c;有人像郭靖一样正着练&#xff0c;循序渐进&#xff1b;有人像欧阳锋一样反着练&#xff0c;守正出奇&#xff1b;也有像梅超风一样仅练就半部《九…

5、认真学习枚举类型

1、数字枚举 // 这里你的TSLint可能会报一个&#xff1a;枚举声明只能与命名空间或其他枚举声明合并。这样的错误&#xff0c;这个不影响编译 enum Status {Uploading,Success,Failed } console.log(Status.Uploading); // 0 console.log(Status["Success"]); // 1 …

智能骨传导蓝牙耳机该如何选,分享几款不错的骨传导蓝牙耳机

骨传导耳机是一种通过骨骼传递声音的耳机。相比于传统的耳塞和头戴式耳机&#xff0c;它有许多优点&#xff0c;例如&#xff1a; 1.安全。因为无需通过耳膜进行声音传递&#xff0c;所以对听力影响较小。 2.对耳朵没有伤害。 3.舒适。 4.节省时间。由于无需通过耳膜传递声音&a…

Codefi基于区块链的开发框架

&#x1f497;wei_shuo的个人主页 &#x1f4ab;wei_shuo的学习社区 &#x1f310;Hello World &#xff01; Codefi基于区块链的开发框架 Codefi技术是一种基于区块链的开发框架&#xff0c;它提供了一系列工具和服务&#xff0c;帮助开发者轻松构建和管理去中心化应用程序。C…

【Access】win 10 / win 11:Access 下载、安装、使用教程(「管理信息系统」实践专用软件)

目录 一、前言 二、卸载 Office 三、下载 Office Tool Plus 四、安装 Office&#xff08;内含 Access&#xff09; &#xff08;1&#xff09;启动 Office Tool Plus &#xff08;2&#xff09;部署 &#xff08;3&#xff09;安装 Office&#xff08;内含 Access&#…

C++STL详解(10) -- 使用哈希表封装unordered_set和unordered_map

文章目录 哈希表模板参数改造针对模板参数V改造增加仿函数获取具体数据类型. 哈希表的正向迭代器正向迭代器中的内置成员:正向迭代器的成员函数 哈希表插入函数的修改(适用于unordered_map)一个类型K去做set和unordered_set他的模板参数的必备条件.unordered_set的模拟实现(完整…

看完这篇文章你就彻底懂啦{保姆级讲解}-----(LeetCode刷题242有效的字母异位词) 2023.4.25

目录 前言算法题&#xff08;LeetCode刷题242有效的字母异位词&#xff09;—&#xff08;保姆级别讲解&#xff09;分析题目&#xff1a;有效的字母异位词代码&#xff1a;算法思想&#xff1a; 结束语 前言 本文章一部分内容参考于《代码随想录》----如有侵权请联系作者删除…

详解js跨页面传参以及API的解释

详解js跨页面传参 前言什么是跨页面传参&#xff1f;跨页面传参本质是什么&#xff1f;常见的跨页面传参方法URL参数传递localStorage和sessionStorage参数传递Cookie传递 经常听到API&#xff0c;那么到底的什么是API&#xff1f; 前几天有粉丝私信我&#xff0c;希望能把js跨…

超越YOLOv8,飞桨推出精度最高的实时检测器RT-DETR!

‍‍ 众所周知&#xff0c;实时目标检测( Real-Time Object Detection )一直由 YOLO 系列模型主导。 飞桨在去年 3 月份推出了高精度通用目标检测模型 PP-YOLOE &#xff0c;同年在 PP-YOLOE 的基础上提出了 PP-YOLOE 。后者在训练收敛速度、下游任务泛化能力以及高性能部署能力…

搞懂 API ,地图 API 制作方法分享

地图 API 是一种基于 Web 开发的应用程序编程接口&#xff0c;可以用于创建和展示地图及地理信息。以下是一些地图 API 制作的方法&#xff1a; 选择地图 API 平台&#xff1a;目前市场上有很多地图 API 平台供选择&#xff0c;比如 Google Maps API、百度地图 API、高德地图 A…

Chess.com:象棋社区网站每月访问量达 2.8 亿,年收入在 5000 万至 1 亿之间

Chess.com是世界领先的国际象棋社区。它始于 2007 年&#xff0c;目前年收入超过 5000 万美元。 核心功能 Live Chess 花了 5 个多月才发布。到那时&#xff0c;该网站已经拥有近100,000名会员。Chess.com 域名的重要性他们 80% 的用户来自过去 4 年 Chess.com的故事是如何开…

[算法前沿]--004-transformer的前世今生

文章目录 1.transformer介绍1.1 注意力机制1.2 Transformer架构1.2.1编码器1.2.2解码器 2. Transformer中的模块2.1 注意模块2.1.1 缩放点积注意事项2.1.2 多头注意 2.2 Transformer中的注意事项2.2.1 自注意2.2.2 掩蔽的自注意&#xff08;自回归或因果注意&#xff09;2.2.3 …

027:Mapbox GL加载circle样式图层,用data-driven风格绘制圆形

第027个 点击查看专栏目录 本示例的目的是介绍演示如何在vue+mapbox中加载circle样式图层。圆形样式图层在地图上呈现一个或多个实心圆。 您可以使用圆形图层来配置矢量切片中点或点集合要素的视觉外观。 圆形层渲染其半径以屏幕单位测量的圆形。 直接复制下面的 vue+mapbox源…