文章目录
- 数据湖Iceberg-SparkSQL集成
- 一、环境准备
- 安装Spark
- 二、Spark配置Catalog
- 2.1在配置文件中添加HiveCatalog与HadoopCatalog配置(一劳永逸)
- 2.2使用spark-sql连接Hive Catalog
- 2.3使用spark-sql连接Hadoop Catalog
- 三、SQL操作
- 3.1 创建表
- 创建分区表
- 分区表
- 创建隐藏分区表
- 使用CTAS(Create Table xxx As Select)语法建表
- 使用Replace Table建表
- 3.2删除表
- 3.3修改表
- 修改表名(不支持修改HadoopCatalog的表名)
- 修改表属性
- 删除表属性
- 添加列
- 修改列
- **修改列名**
- Alter Column修改类型(只允许安全的转换)
- Alter Column 修改列的注释
- Alter Column修改列的顺序
- Alter Column修改列是否允许为null
- 删除列
- 添加分区(spark3,需要在配置文件中扩展)
- 删除分区(Spark3,需要配置扩展)
- 修改分区(Spark3,需要配置扩展)
- 修改表的写入顺序
- 按分区并行写入
- 3.4插入数据
- INSERT INTO
- MERGE INTO 行级更新
- 3.5查询数据
- 普通查询
- 查询元数据
- 3.6存储过程
- 语法
- 四、DataFrame操作
- 4.1环境准备
- 创建Maven工程,pom.xml文件内容
- 代码中配置Catalog
- 4.2读取表
- 加载表
- 时间旅行:查询指定时间戳之前的数据
- 时间旅行:查询指定快照 ID 的数据
- 读取指定两个快照 ID 之间增量数据
- 4.3检查表
- 查询表元数据
- 指定快照ID查询元数据
- 4.4写入表
- 创建替换表结构并写入数据
- append添加数据
- 动态分区覆盖
- 静态分区覆盖
- 4.5维护表
- 获取Table对象
- 快照过期清理
- 删除无效文件
- 合并小文件
- 五、整体代码
- 代码
- 配置文件
- 可能遇到的问题
- 启动spark-sql异常` A read-only user or a user in a read-only database is not permitted to disable read-only mode on a connection.`
数据湖Iceberg-简介(1)
数据湖Iceberg-存储结构(2)
数据湖Iceberg-Hive集成Iceberg(3)
数据湖Iceberg-SparkSQL集成(4)
数据湖Iceberg-FlinkSQL集成(5)
数据湖Iceberg-FlinkSQL-kafka类型表数据无法成功写入(6)
数据湖Iceberg-Flink DataFrame集成(7)
数据湖Iceberg-SparkSQL集成
一、环境准备
Spark安装包下载地址:
https://mirrors.huaweicloud.com/apache/spark
iceberg官网:
https://iceberg.apache.org/releases/#110-release
安装Spark
1.Spark与Iceberg的版本对应关系如下
Spark 版本 | Iceberg 版本 |
---|---|
2.4 | 0.7.0-incubating – 1.1.0 |
3 | 0.9.0 – 1.0.0 |
3.1 | 0.12.0 – 1.1.0 |
3.2 | 0.13.0 – 1.1.0 |
3.3 | 0.14.0 – 1.1.0 |
2.上传并解压Spark安装包
tar -zxvf spark-3.2.1-bin-hadoop2.7.tgz
修改目录名为spark-3.2.1
3.配置环境变量
sudo vim /etc/profile.d/spark.sh
export SPARK_HOME=/opt/spark-3.2.1
export PATH=$PATH:$SPARK_HOME/bin
source /etc/profile.d/spark.sh
4.拷贝iceberg的jar包到Spark的jars目录
iceberg-sparkjar包下载地址:
https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark-runtime-3.2_2.12/1.2.0/iceberg-spark-runtime-3.2_2.12-1.2.0.jar
cp iceberg-spark-runtime-3.2_2.12-1.2.0.jar $SPARK_HOME/jars
5.修改spark目录为hdfs用户权限
平时使用hdfs提交任务,不设置一会用hdfs用户启动spark-sql会报错
chown -R hdfs.hdfs spark-3.2.1
二、Spark配置Catalog
Spark中支持两种Catalog的设置:hive和hadoop,Hive Catalog就是Iceberg表存储使用Hive默认的数据路径,Hadoop Catalog需要指定Iceberg格式表存储路径。
官方文档:https://iceberg.apache.org/docs/1.1.0/getting-started/
2.1在配置文件中添加HiveCatalog与HadoopCatalog配置(一劳永逸)
修改$spark/conf/spark-defaults.conf
配置文件,添加以下内容
spark.sql.catalog.hive_prod=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type=hive
spark.sql.catalog.hive_prod.uri=thrift://bigdata-24-199:9083
spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = hdfs://bigdata-24-198:8020/warehouse/spark-iceberg
# 添加分区需要使用该配置
spark.sql.extensions = org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
直接启动spark-sql就可以使用配置的HiveCatalog和HadoopCatalog了
2.2使用spark-sql连接Hive Catalog
cd /opt/spark-3.2.1
spark-sql \
--conf spark.sql.catalog.hive_prod=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.hive_prod.type=hive \
--conf spark.sql.catalog.hive_prod.uri=thrift://bigdata-24-199:9083
配置 | 含义 |
---|---|
hive_prod | 我们要创建Hive catalog名称 |
spark.sql.catalog.hive_prod.uri | hive-site.xml配置的hive.metastore.uris值 |
执行该命令会在之前的当前目录生成一个
metastore_db
目录和derby.log
日志文件-rw-r--r-- 1 hdfs hadoop 659 Apr 13 16:42 derby.log drwxr-xr-x 5 hdfs hadoop 4096 Apr 13 16:42 metastore_db
基础使用:
# 使用hive_prod catalog
spark-sql> use hive_prod;
# 进入default数据库
spark-sql> use default;
Time taken: 0.104 seconds
# 创建表
spark-sql> CREATE TABLE hive_prod.default.sample2 (
> id bigint COMMENT 'unique id',
> data string)
> USING iceberg;
Time taken: 3.271 seconds
# 查看表,这里可以看到hive中的表
spark-sql> show tables;
sample2
Time taken: 0.729 seconds, Fetched 9 row(s)
# 写入数据
spark-sql> INSERT INTO hive_prod.default.sample2 VALUES(1,'数据1'),(2,'数据2');
Time taken: 6.256 seconds
# 查看数据
spark-sql> select * from hive_prod.default.sample2;
1 数据1
2 数据2
Time taken: 2.117 seconds, Fetched 2 row(s)
2.3使用spark-sql连接Hadoop Catalog
cd /opt/spark-3.2.1
spark-sql \
--conf spark.sql.catalog.hadoop_prod=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.hadoop_prod.type=hadoop \
--conf spark.sql.catalog.hadoop_prod.warehouse=hdfs://bigdata-24-198:8020/warehouse/spark-iceberg
三、SQL操作
后面统一使用在2.1章节配置文件添加配置方式演示
3.1 创建表
# 使用catalog hadoop_prod
use hadoop_prod;
# 创建数据库,默认没有
create database default;
# 进入数据库
use default;
# 创建表
CREATE TABLE hadoop_prod.default.sample1 (
id bigint COMMENT 'unique id',
data string)
USING iceberg
- PARTITIONED BY (partition-expressions) :配置分区
- LOCATION ‘(fully-qualified-uri)’ :指定表路径
- COMMENT ‘table documentation’ :配置表备注
- TBLPROPERTIES (‘key’=‘value’, …) :配置表属性
表属性:https://iceberg.apache.org/docs/latest/configuration/
对Iceberg表的每次更改都会生成一个新的元数据文件(json文件)以提供原子性。默认情况下,旧元数据文件作为历史文件保存不会删除。
如果要自动清除元数据文件,在表属性中设置write.metadata.delete-after-commit.enabled=true。这将保留一些元数据文件(直到write.metadata.previous-versions-max),并在每个新创建的元数据文件之后删除旧的元数据文件。
创建分区表
分区表
CREATE TABLE hadoop_prod.default.sample2 (
id bigint,
data string,
category string)
USING iceberg
PARTITIONED BY (category)
看看插入数据数据存储结构如何
INSERT INTO hadoop_prod.default.sample2 VALUES(1,'数据1','分类1'),(2,'数据2','分类1'),(3,'数据3','分类2')
查看数据存储目录
查看元数据目录
发现会根据分区指定字段按照目录存储
创建隐藏分区表
CREATE TABLE hadoop_prod.default.sample3 (
id bigint,
data string,
category string,
ts timestamp)
USING iceberg
PARTITIONED BY (bucket(16, id), days(ts), category);
支持的转换有:
-
years(ts):按年划分
-
months(ts):按月划分
-
days(ts)或date(ts):等效于dateint分区
-
hours(ts)或date_hour(ts):等效于dateint和hour分区
-
bucket(N, col):按哈希值划分mod N个桶
-
truncate(L, col):按截断为L的值划分,字符串被截断为给定的长度,整型和长型截断为bin: truncate(10, i)生成分区0,10,20,30,…
看看插入数据存储结构如何
INSERT INTO hadoop_prod.default.sample3 VALUES(1,'数据1','分类1',cast(from_unixtime(1649826749) as timestamp)),
(2,'数据2','分类1',cast(from_unixtime(1681362749) as timestamp)),
(3,'数据3','分类2',cast(from_unixtime(1681449149) as timestamp));
查看数据存储目录
发现,目录顺序与创建隐藏分区书序相同
使用CTAS(Create Table xxx As Select)语法建表
新表会包含源表数据
CREATE TABLE hadoop_prod.default.sample4
USING iceberg
AS SELECT * from hadoop_prod.default.sample3
注意:使用CTAS方法创建,如果新表不指定分区字段默认创建的表是不带分区字段的。表属性也相同
所以,创建新表时需要带上分区、属性配置:
CREATE TABLE hadoop_prod.default.sample5
USING iceberg
PARTITIONED BY (bucket(8, id), hours(ts), category)
TBLPROPERTIES ('key'='value')
AS SELECT * from hadoop_prod.default.sample3
使用Replace Table建表
新表会包含源表数据
REPLACE TABLE hadoop_prod.default.sample5
USING iceberg
AS SELECT * from hadoop_prod.default.sample3;
同样使用Replace Table替换表结构也不会带分区字段和表属性
REPLACE TABLE hadoop_prod.default.sample5
USING iceberg
PARTITIONED BY (category)
TBLPROPERTIES ('key'='value')
AS SELECT * from hadoop_prod.default.sample3;
如果Replace的表不存在则会报错,需要使用CREATE OR REPLACE TABLE
语法创建即可
分区和表属性同样不会带进去
CREATE OR REPLACE TABLE hadoop_prod.default.sample6
USING iceberg
AS SELECT * from hadoop_prod.default.sample3
3.2删除表
对于HadoopCatalog而言,运行DROP TABLE将从catalog中删除表并删除表内容(整个存储目录均删除了)。
# 创建表
CREATE EXTERNAL TABLE hadoop_prod.default.sample15 (
id bigint COMMENT 'unique id',
data string)
USING iceberg;
# 添加数据
INSERT INTO hadoop_prod.default.sample15 values(1,'a');
# 删除表
DROP TABLE hadoop_prod.default.sample15;
对于HiveCatalog而言:
- 在0.14之前,运行DROP TABLE将从catalog中删除表并删除表内容。
- 从0.14开始,DROP TABLE只会从catalog中删除表,不会删除数据。为了删除表内容,应该使用DROP table PURGE。
CREATE TABLE hive_prod.default.sample16 (
id bigint COMMENT 'unique id',
data string)
USING iceberg;
INSERT INTO hive_prod.default.sample16 values(1,'a');
添加数据之后,hdfs文件分布
删除表
DROP TABLE hive_prod.default.sample16;
此时在spark-sql中show tables
已经看不到表名了,但是在hdfs中表的数据都存在
删除表和HDFS上的数据
我测试时候HDFS数据也没有删除~
记录一下,后续再验证 hdfs dfs -ls /warehouse/tablespace/managed/hive/sample16/metadata
DROP TABLE hive_prod.default.sample16 PURGE;
3.3修改表
Iceberg在Spark 3中完全支持ALTER TABLE,包括:
- 重命名表
- 设置或删除表属性
- 添加、删除和重命名列
- 添加、删除和重命名嵌套字段
- 重新排序顶级列和嵌套结构字段
- 扩大int、float和decimal字段的类型
- 将必选列变为可选列
此外,还可以使用SQL扩展来添加对分区演变的支持和设置表的写顺序。
创建一个表做验证使用
hive catalog 表
CREATE TABLE hive_prod.default.sample_alert (
id bigint COMMENT 'unique id',
data string)
USING iceberg;
hadoop catalog 表
CREATE TABLE hadoop_prod.default.sample_alert (
id bigint COMMENT 'unique id',
data string)
USING iceberg;
修改表名(不支持修改HadoopCatalog的表名)
修改表名后,存储在HDFS的目录名是不变的
ALTER TABLE hive_prod.default.sample_alert RENAME TO hive_prod.default.sample_alert2;
修改表属性
# 修改表属性
ALTER TABLE hive_prod.default.sample_alert2 SET TBLPROPERTIES (
'read.split.target-size'='268435456'
);
# 添加表描述
ALTER TABLE hive_prod.default.sample_alert2 SET TBLPROPERTIES (
'comment' = 'A table comment.'
);
删除表属性
ALTER TABLE hive_prod.default.sample_alert2 UNSET TBLPROPERTIES ('read.split.target-size');
添加列
ALTER TABLE hive_prod.default.sample_alert2
ADD COLUMNS (
category string comment 'new_column'
)
-- 添加struct类型的列
ALTER TABLE hive_prod.default.sample_alert2
ADD COLUMN point struct<x: double, y: double>;
-- 往struct类型的列中添加字段
ALTER TABLE hive_prod.default.sample_alert2
ADD COLUMN point.z double
-- 创建struct的嵌套数组列
ALTER TABLE hive_prod.default.sample_alert2
ADD COLUMN points array<struct<x: double, y: double>>;
-- 在数组中的结构中添加一个字段。使用关键字'element'访问数组的元素列。
ALTER TABLE hive_prod.default.sample_alert2
ADD COLUMN points.element.z double
-- 创建一个包含Map类型的列,key和value都为struct类型
ALTER TABLE hive_prod.default.sample_alert2
ADD COLUMN pointsm map<struct<x: int>, struct<a: int>>;
-- 在Map类型的value的struct中添加一个字段。
ALTER TABLE hive_prod.default.sample_alert2
ADD COLUMN pointsm.value.b int
在Spark 2.4.4及以后版本中(只针对Hadoop Catalog可以这么添加),可以通过添加FIRST或AFTER子句在任何位置添加列:
ALTER TABLE hadoop_prod.default.sample_alert
ADD COLUMN new_column1 bigint AFTER id
ALTER TABLE hadoop_prod.default.sample_alert
ADD COLUMN new_column2 bigint FIRST
修改列
修改列名
修改列名后,根据新的列名就可以查询到数据
ALTER TABLE hadoop_prod.default.sample_alert RENAME COLUMN data TO data1;
Alter Column修改类型(只允许安全的转换)
ALTER TABLE hadoop_prod.default.sample_alert
ADD COLUMNS (
idd int
);
ALTER TABLE hadoop_prod.default.sample_alert ALTER COLUMN idd TYPE bigint;
Alter Column 修改列的注释
ALTER TABLE hadoop_prod.default.sample_alert ALTER COLUMN ee COMMENT 'a';
ALTER TABLE hadoop_prod.default.sample_alert ALTER COLUMN id COMMENT 'b';
Alter Column修改列的顺序
ALTER TABLE hadoop_prod.default.sample_alert ALTER COLUMN id FIRST;
ALTER TABLE hadoop_prod.default.sample_alert ALTER COLUMN new_column2 AFTER new_column1;
Alter Column修改列是否允许为null
ALTER TABLE hadoop_prod.default.sample_alert ALTER COLUMN id DROP NOT NULL;
ALTER COLUMN不用于更新struct类型。使用ADD COLUMN和DROP COLUMN添加或删除struct类型的字段。
删除列
ALTER TABLE hadoop_prod.default.sample_alert DROP COLUMN idd;
ALTER TABLE hadoop_prod.default.sample_alert DROP COLUMN point.y;
添加分区(spark3,需要在配置文件中扩展)
添加配置文件
spark.sql.extensions = org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
CREATE TABLE hadoop_prod.default.sample1 (
id bigint COMMENT 'unique id',
category string,
ts timestamp,
data string)
USING iceberg;
ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD category;
ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD bucket(16, id);
ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD truncate(data, 4);
ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD years(ts);
ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD bucket(16, category) AS shard;
验证一下修改分区数据存储的目录
1.创建表无分区时添加数据
insert into hadoop_prod.default.sample1 values(1,'1',null,'1');
2.添加分区字段category
后添加数据
insert into hadoop_prod.default.sample1 values(1,'1',null,'1');
发现新创建了一个目录目录下有数据文件,之前的文件也还在,分析:之前的数据不受影响,新数据会按照最新的配置进行
查询时刚刚写入的两条数据也都有
spark-sql> select * from hadoop_prod.default.sample1;
1 1 NULL 1
1 1 NULL 1
3.添加分区bucket(16, id)
后添加数据
insert into hadoop_prod.default.sample1 values(1,'1',null,'1');
发现和刚刚一样,新数据按照新的分区规则写入,老的数据还是不变,后面就不一一测试了,我们可以确认刚刚分析的结论:之前的数据不受影响,新数据会按照最新的配置进行
删除分区(Spark3,需要配置扩展)
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD category;
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD bucket(16, id);
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD truncate(data, 4);
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD years(ts);
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD shard;
注意,尽管删除了分区,但列仍然存在于表结构中。
删除分区字段是元数据操作,不会改变任何现有的表数据。新数据将被写入新的分区,但现有数据将保留在旧的分区布局中。
当分区发生变化时,动态分区覆盖行为也会发生变化。例如,如果按天划分分区,而改为按小时划分分区,那么覆盖将覆盖每小时划分的分区,而不再覆盖按天划分的分区。
删除分区字段时要小心,可能导致元数据查询失败或产生不同的结果。
修改分区(Spark3,需要配置扩展)
ALTER TABLE hadoop_prod.default.sample1 REPLACE PARTITION FIELD bucket(16, id) WITH bucket(8, id)
修改表的写入顺序
ALTER TABLE hadoop_prod.default.sample1 WRITE ORDERED BY category, id
ALTER TABLE hadoop_prod.default.sample1 WRITE ORDERED BY category ASC, id DESC
ALTER TABLE hadoop_prod.default.sample1 WRITE ORDERED BY category ASC NULLS LAST, id DESC NULLS FIRST
表写顺序不能保证查询的数据顺序。它只影响数据写入表的方式。
WRITE ORDERED BY设置了一个全局排序,即跨任务的行排序,就像在INSERT命令中使用ORDER BY一样:
INSERT INTO hadoop_prod.default.sample1
SELECT id, data, category, ts FROM another_table
ORDER BY ts, category
要在每个任务内排序,而不是跨任务排序,使用local ORDERED BY:
ALTER TABLE hadoop_prod.default.sample1 WRITE LOCALLY ORDERED BY category, id
按分区并行写入
ALTER TABLE hadoop_prod.default.sample1 WRITE DISTRIBUTED BY PARTITION
ALTER TABLE hadoop_prod.default.sample1 WRITE DISTRIBUTED BY PARTITION LOCALLY ORDERED BY category, id
3.4插入数据
测试表
CREATE TABLE hadoop_prod.default.a (
id bigint,
count bigint)
USING iceberg;
CREATE TABLE hadoop_prod.default.b (
id bigint,
count bigint,
flag string)
USING iceberg;
INSERT INTO
INSERT INTO hadoop_prod.default.a VALUES (1, 1), (2, 2), (3, 3);
INSERT INTO hadoop_prod.default.b VALUES (1, 1, 'a'), (2, 2, 'b'), (4, 4, 'd');
MERGE INTO 行级更新
MERGE INTO hadoop_prod.default.a t
USING (SELECT * FROM hadoop_prod.default.b) u ON t.id = u.id
WHEN MATCHED AND u.flag='b' THEN UPDATE SET t.count = t.count + u.count
WHEN MATCHED AND u.flag='a' THEN DELETE
WHEN NOT MATCHED THEN INSERT (id,count) values (u.id,u.count)
语句分析:
表a与表b通过id字段关联,当表b的flag='b’时,修改表a的count值=表a count+表b count;
当表b的flag='a’时,删除表a中的数据;
当匹配不上时,新增该条数据;
执行流程与执行结果
spark-sql> select * from hadoop_prod.default.a;
1 1
2 2
3 3
Time taken: 0.451 seconds, Fetched 3 row(s)
spark-sql> select * from hadoop_prod.default.b;
1 1 a
2 2 b
4 4 d
Time taken: 0.542 seconds, Fetched 3 row(s)
spark-sql> MERGE INTO hadoop_prod.default.a t
> USING (SELECT * FROM hadoop_prod.default.b) u ON t.id = u.id
> WHEN MATCHED AND u.flag='b' THEN UPDATE SET t.count = t.count + u.count
> WHEN MATCHED AND u.flag='a' THEN DELETE
> WHEN NOT MATCHED THEN INSERT (id,count) values (u.id,u.count);
Time taken: 4.833 seconds
spark-sql> select * from hadoop_prod.default.a;
4 4
2 4
3 3
Time taken: 0.281 seconds, Fetched 3 row(s)
3.5查询数据
普通查询
SELECT count(1) as count, data
FROM local.db.table
GROUP BY data
查询元数据
// 查询表快照
SELECT * FROM hadoop_prod.default.a.snapshots
// 查询数据文件信息
SELECT * FROM hadoop_prod.default.a.files
// 查询表历史
SELECT * FROM hadoop_prod.default.a.history
// 查询 manifest
SELECT * FROM hadoop_prod.default.a.manifests
3.6存储过程
Procedures可以通过CALL从任何已配置的Iceberg Catalog中使用。所有Procedures都在namespace中。
语法
按照参数名传参
CALL catalog_name.system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1)
当按位置传递参数时,如果结束参数是可选的,则只有结束参数可以省略。
CALL catalog_name.system.procedure_name(arg_1, arg_2, ... arg_n)
四、DataFrame操作
4.1环境准备
创建Maven工程,pom.xml文件内容
<properties>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.2.1</spark.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!-- Spark的依赖引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<!-- <scope>provided</scope>-->
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<!-- <scope>provided</scope>-->
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<!-- <scope>provided</scope>-->
<version>${spark.version}</version>
</dependency>
<!--fastjson <= 1.2.80 存在安全漏洞,-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark-runtime-3.3 -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-runtime-3.2_2.12</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- assembly打包插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<archive>
<manifest>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
<!--Maven编译scala所需依赖-->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
代码中配置Catalog
var spark: SparkSession = SparkSession.builder()
.master("local")
.appName(this.getClass.getSimpleName)
//指定hive catalog, catalog名称为iceberg_hive
.config("spark.sql.catalog.iceberg_hive", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.iceberg_hive.type", "hive")
.config("spark.sql.catalog.iceberg_hive.uri", "thrift://172.16.24.199:9083")
// .config("iceberg.engine.hive.enabled", "true")
//指定hadoop catalog,catalog名称为iceberg_hadoop
.config("spark.sql.catalog.iceberg_hadoop", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.iceberg_hadoop.type", "hadoop")
.config("spark.sql.catalog.iceberg_hadoop.warehouse", "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code")
.getOrCreate()
4.2读取表
加载表
/**
* 使用 Spark SQL 的方式展示指定表格的数据
*
* @param hadoopCatalogAndNamespace Hadoop Catalog 和 namespace,格式为 "catalog.namespace"
* @param tableName 表格名称
*/
def showTableData1(hadoopCatalogAndNamespace: String, tableName: String) = {
// 切换到指定的 namespace
spark.sql("use " + hadoopCatalogAndNamespace)
// 获取指定表格的全部数据
spark.table(tableName)
}
/**
* 使用 Spark SQL 的方式展示指定表格的数据
*
* @param hadoopCatalogAndNamespace Hadoop Catalog 和 namespace,格式为 "catalog.namespace"
* @param tableName 表格名称
*/
def showTableData2(hadoopCatalogAndNamespace: String, tableName: String) = {
// 切换到指定的 namespace
spark.sql("use " + hadoopCatalogAndNamespace)
// 获取指定表格的全部数据
spark.table(tableName)
}
/**
* 使用 Iceberg 的方式展示指定表格的数据
*
* @param hadoopNamespace Hadoop 的 namespace
* @param tableName 表格名称
*/
def showTableData3(hadoopNamespace: String, tableName: String) = {
// 拼接表格在 HDFS 上的路径
val tableHDFS = "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code/" + hadoopNamespace + "/" + tableName
// 通过 Iceberg 的方式获取指定表格的全部数据
spark.read
.format("iceberg")
.load(tableHDFS)
}
时间旅行:查询指定时间戳之前的数据
/**
* 查询指定时间戳之前的数据
*
* @param hadoopNamespace Hadoop 的 namespace
* @param tableName 表格名称
* @return DataFrame 对象
*/
def queryDataBeforeTimestamp(hadoopNamespace: String, tableName: String) = {
// 拼接表格在 HDFS 上的路径
val tableHDFS = "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code/" + hadoopNamespace + "/" + tableName
// 使用 Iceberg 的方式获取指定时间戳之前的数据
spark.read
.option("as-of-timestamp", "1681804793000")
.format("iceberg")
.load(tableHDFS)
}
时间旅行:查询指定快照 ID 的数据
/**
* 查询指定快照 ID 的数据
*
* @param hadoopNamespace Hadoop 的名称空间
* @param tableName 表的名称
* @return DataFrame 对象
*/
def querySnapshotData(hadoopNamespace: String, tableName: String) = {
// 构建表在 HDFS 上的路径
val tableHDFS = "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code/" + hadoopNamespace + "/" + tableName
// 使用 Iceberg 的格式加载指定快照 ID 的数据,并返回 DataFrame 对象
spark.read
.option("snapshot-id", 1150221491570194004L)
.format("iceberg")
.load(tableHDFS)
}
读取指定两个快照 ID 之间增量数据
查询的表只能是append的方式写数据,不支持replace, overwrite, delete操作。
/**
* 读取指定两个快照 ID 之间增量数据并返回 DataFrame 对象
*
* @param hadoopNamespace Hadoop 的名称空间
* @param tableName 表的名称
* @return DataFrame 对象
*/
def getTwoSnapshotsIncrementalData(hadoopNamespace: String, tableName: String): DataFrame = {
// 构建表在 HDFS 上的路径
val tableHDFS = s"hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code/$hadoopNamespace/$tableName"
// 使用 Iceberg 的格式加载指定两个快照 ID 之间的增量数据,并返回 DataFrame 对象
spark.read
.format("iceberg")
.option("start-snapshot-id", 4809403494114685238L)
.option("end-snapshot-id", 1150221491570194004L)
.load(tableHDFS)
}
4.3检查表
查询表元数据
/**
* 加载指定快照 ID 下的 Iceberg 表元数据并返回 DataFrame
*
* @return DataFrame 对象
*/
def loadSnapshotDataFiles() = {
spark.read.format("iceberg")
.option("snapshot-id", 4809403494114685238L)
.load("iceberg_hadoop.default.sample1.files")
}
/**
* 加载指定 Iceberg 表的元数据并返回 DataFrame
*
* @return DataFrame 对象
*/
def loadIcebergTableFiles() = {
spark.read.format("iceberg").load("hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code/default/sample1#files")
}
指定快照ID查询元数据
/**
* 加载指定快照 ID 下的 Iceberg 表元数据并返回 DataFrame
*
* @return DataFrame 对象
*/
def loadSnapshotDataFiles() = {
spark.read.format("iceberg")
.option("snapshot-id", 4809403494114685238L)
.load("iceberg_hadoop.default.sample1.files")
}
4.4写入表
创建替换表结构并写入数据
// 创建或替换表
def createOrReplaceTable(tableName: String) = {
val spark = this.spark // 获取 SparkSession 对象
// 创建包含数据的 DataFrame 对象。
val dataFrame: DataFrame = spark.createDataFrame(
Seq(Sample1(1, new Date().toLocaleString, "a"),
Sample1(2, new Date().toLocaleString, "b"),
Sample1(3, new Date().toLocaleString, "c")))
import spark.implicits._
dataFrame.writeTo(tableName) // 写入表
.tableProperty("write.format.default", "orc") // 表设置
//$ 符号是 Scala 语言中用于将对象转换为列的操作符,例如 $"category" 就表示将字符串 "category" 转换为 DataFrame 中的列
.partitionedBy($"category") // 分区
.createOrReplace() // 创建或替换表
}
append添加数据
/**
* 向指定的 Iceberg 表追加 Sample1 类型的数据。
*
* @param tableName 要追加数据的表名称。
*/
def appendData(tableName: String) = {
spark.sql("select count(1) from " + tableName).show()
// 创建包含数据的 DataFrame 对象。
val dataFrame: DataFrame = spark.createDataFrame(
Seq(Sample1(1, new Date().toLocaleString, "a"),
Sample1(2, new Date().toLocaleString, "b"),
Sample1(3, new Date().toLocaleString, "c")))
// 将 DataFrame 持久化到指定表,并调用 append() 方法将数据追加到指定表中。
dataFrame.writeTo(tableName).append()
spark.sql("select count(1) from " + tableName).show()
}
动态分区覆盖
写入的数据,根据分区字段,自动覆盖写入,数据不存在的分区就不覆盖了
// 覆盖指定表的分区数据
def overwritePartitionsData(tableName: String) = {
// 使用 SQL 查询统计表的记录数并在控制台输出结果
spark.sql("select count(1) from " + tableName).show()
// 创建包含数据的 DataFrame 对象。
val dataFrame: DataFrame = spark.createDataFrame(
Seq(Sample1(1, new Date().toLocaleString, "a"),
Sample1(2, new Date().toLocaleString, "b")))
// 将 DataFrame 持久化到指定表,并调用 append() 方法将数据追加到指定表中。
dataFrame.writeTo(tableName).overwritePartitions()
// 使用 SQL 查询统计表的记录数并在控制台输出结果
spark.sql("select count(1) from " + tableName).show()
}
静态分区覆盖
// 覆盖指定表的指定分区数据,其他分区直接追加
def overwritePartitionsData2(tableName: String) = {
// 获取 SparkSession 对象并为其创建一个变量 spark。
val spark: SparkSession = this.spark
// 使用 SQL 查询统计表的记录数并在控制台输出结果。
spark.sql("select count(1) from " + tableName).show()
import spark.implicits._
val dataFrame: DataFrame = spark.createDataFrame(
Seq(Sample1(5, new Date().toLocaleString, "a"),
Sample1(6, new Date().toLocaleString, "b")))
// 将指定 DataFrame 对象的数据覆盖到 tableName 表中,其中仅覆盖 category 字段等于 a 的分区。
dataFrame.writeTo(tableName).overwrite($"category" === "a")
// 使用 SQL 查询统计表的记录数并在控制台输出结果。
spark.sql("select count(1) from " + tableName).show()
}
4.5维护表
获取Table对象
- HadoopCatalog
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
val conf = new Configuration()
val catalog = new HadoopCatalog(conf,"hdfs://hadoop1:8020/warehouse/spark-iceberg")
val table: Table = catalog.loadTable(TableIdentifier.of("db","table1"))
- HiveCatalog
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
val catalog = new HiveCatalog()
catalog.setConf(spark.sparkContext.hadoopConfiguration)
val properties = new util.HashMap[String,String]()
properties.put("warehouse", "hdfs://hadoop1:8020/warehouse/spark-iceberg")
properties.put("uri", "thrift://hadoop1:9083")
catalog.initialize("hive", properties)
val table: Table = catalog.loadTable(TableIdentifier.of("db", "table1"))
快照过期清理
每次写入Iceberg表都会创建一个表的新快照或版本。快照可以用于时间旅行查询,或者可以将表回滚到任何有效的快照。建议设置快照过期时间,过期的旧快照将从元数据中删除(不再可用于时间旅行查询)。
/**
* 对样例表进行快照过期清理
*/
def snapshotOverdueClean() = {
// 创建 Hadoop Configuration 对象
val conf = new Configuration()
// 基于 Hadoop Catalog API 加载表格
val catalog = new HadoopCatalog(conf, "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code")
val table: Table = catalog.loadTable(TableIdentifier.of("default", "sample1"))
// 设置过期时间(这里设置7天前的时间)
val tsToExpire: Long = System.currentTimeMillis() - (1000 * 60 * 60 * 24)
// 过期快照并提交更改
table.expireSnapshots()
.expireOlderThan(tsToExpire)
.commit()
}
或使用SparkActions来设置过期:
/**
* 对样例表进行快照过期清理2
*/
def snapshotOverdueClean2() = {
// 创建 Hadoop Configuration 对象
val conf = new Configuration()
// 基于 Hadoop Catalog API 加载表格
val catalog = new HadoopCatalog(conf, "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code")
val table: Table = catalog.loadTable(TableIdentifier.of("default", "sample1"))
// 设置过期时间(这里设置7天前的时间)
val tsToExpire: Long = System.currentTimeMillis() - (1000 * 60 * 60 * 24)
SparkActions.get()
.expireSnapshots(table)
.expireOlderThan(tsToExpire)
.execute()
}
删除无效文件
在Spark和其他分布式处理引擎中,任务或作业失败可能会留下未被表元数据引用的文件,在某些情况下,正常的快照过期可能无法确定不再需要并删除该文件。
/**
* 删除无效文件
*/
def deleteInvalidFile() = {
// 创建 Hadoop Configuration 对象
val conf = new Configuration()
// 基于 Hadoop Catalog API 加载表格
val catalog = new HadoopCatalog(conf, "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code")
val table: Table = catalog.loadTable(TableIdentifier.of("default", "sample1"))
SparkActions
.get()
.deleteOrphanFiles(table)
.execute()
}
合并小文件
数据文件过多会导致更多的元数据存储在清单文件中,而较小的数据文件会导致不必要的元数据量和更低效率的文件打开成本。
/**
* 合并小文件
*/
def mergeSmallFiles() = {
val conf = new Configuration()
val catalog = new HadoopCatalog(conf, "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code")
// 加载表对象
val table: Table = catalog.loadTable(TableIdentifier.of("default", "sample1"))
SparkActions
.get() //使用SparkActions获取操作对象实例
.rewriteDataFiles(table) //将表中的数据重写成查询引擎可读的格式
.filter(Expressions.equal("category", "a")) //使用Iceberg表达式引擎过滤category列等于a的记录
.option("target-file-size-bytes", 1024L.toString) //设置目标文件大小为1KB
.execute() //执行合并小文件操作并生成新版本
}
五、整体代码
代码
package wiki.hadoop.iceberg.spark
import java.util.Date
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.Table
import org.apache.iceberg.catalog.{Namespace, TableIdentifier}
import org.apache.iceberg.expressions.Expressions
import org.apache.iceberg.hadoop.HadoopCatalog
import org.apache.iceberg.spark.actions.SparkActions
import org.apache.spark.sql.{DataFrame, SparkSession}
import wiki.hadoop.iceberg.spark.Example.Sample1
/** *
*
* @date 2023/4/19 下午3:51
* @description
* 1. descTableDesc(tableName: String): 显示指定表的元数据信息。
* 2. createOrReplaceTable(tableName: String): 创建表,如果表存在则直接Replace表结构。
* 3. appendData(tableName: String): 向指定的 Iceberg 表追加 Sample1 类型的数据。
* 4. tableOrNameSpaceExists(): 判断表和数据库是否存在。
* 5. loadSnapshotDataFiles(): 加载指定快照 ID 下的 Iceberg 表元数据并返回 DataFrame。
* 6. loadIcebergTableFiles(fullTableName: String): 加载指定 Iceberg 表的元数据并返回 DataFrame。
* 7. getTwoSnapshotsIncrementalData(hadoopNamespace: String, tableName: String): 读取指定两个快照 ID 之间增量数据并返回 DataFrame 对象。
* 8. querySnapshotData(hadoopNamespace: String, tableName: String): 查询指定快照 ID 的数据。
* 9. queryDataBeforeTimestamp(hadoopNamespace: String, tableName: String): 查询指定时间戳之前的数据。
* 10. showTableData1(hadoopCatalogAndNamespace: String, tableName: String): 使用 Spark SQL 的方式展示指定表格的数据。
* 11. showTableData2(hadoopCatalogAndNamespace: String, tableName: String): 使用 Spark SQL 的方式展示指定表格的数据。
* 12. showTableData3(hadoopNamespace: String, tableName: String): 使用 Iceberg 的方式展示指定表格的数据。
* 13. printDataFrame(dataFrame: DataFrame): 打印 DataFrame 中的数据,格式为每行数据按列名分割,行与行之间使用分隔符分割。
* 14. showTables(): 查看指定库的所有表。
* 15. createTable(catalogAndNamespace: String, tableName: String): 创建表。
* 16. createDatabase(catalogAndNamespace: String): 如果不存在创建数据库。
* @author Jast
*/
object HadoopIcebergExample {
//指定一个特定的 Hadoop 用户运行应用程序
System.setProperty("HADOOP_USER_NAME", "hdfs")
//样例类,表结构
case class HadoopIcebergSample(id: Long, data: String, category: String)
var spark: SparkSession = _
def main(args: Array[String]): Unit = {
spark = SparkSession.builder()
.master("local")
.appName(this.getClass.getSimpleName)
//指定hive catalog, catalog名称为iceberg_hive
.config("spark.sql.catalog.iceberg_hive", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.iceberg_hive.type", "hive")
.config("spark.sql.catalog.iceberg_hive.uri", "thrift://172.16.24.199:9083")
// .config("iceberg.engine.hive.enabled", "true")
//指定hadoop catalog,catalog名称为iceberg_hadoop
.config("spark.sql.catalog.iceberg_hadoop", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.iceberg_hadoop.type", "hadoop")
.config("spark.sql.catalog.iceberg_hadoop.warehouse", "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code")
.getOrCreate()
//
val hadoopCatalog = "iceberg_hadoop"
val hadoopNamespace = "default"
val hadoopCatalogAndNamespace = hadoopCatalog + "." + hadoopNamespace
val tableName = "sample1"
val fullTableName = hadoopCatalogAndNamespace + "." + tableName
// 创建 namespace
// createDatabase(hadoopCatalogAndNamespace)
// 创建表
// createTable(hadoopCatalogAndNamespace, tableName)
// 查看所有表
// showTables()
// 查看表数据
// printDataFrame(showTableData1(hadoopCatalogAndNamespace, tableName))
// printDataFrame(showTableData2(hadoopCatalogAndNamespace, tableName))
// printDataFrame(showTableData3(hadoopNamespace, tableName))
// 查询指定时间戳之前的数据
// printDataFrame(queryDataBeforeTimestamp(hadoopNamespace, tableName))
// 查询指定快照 ID 的数据
// printDataFrame(querySnapshotData(hadoopNamespace, tableName))
// 读取指定两个快照 ID 之间增量数据
// printDataFrame(getTwoSnapshotsIncrementalData(hadoopNamespace, tableName))
// 加载指定 Iceberg 表的元数据
// printDataFrame(loadIcebergTableFiles(fullTableName))
// printDataFrame(loadIcebergTableFiles)
// 加载指定快照 ID 下的 Iceberg 表元数据
// printDataFrame(loadSnapshotDataFiles)
// 判断表和数据库是否存在
// tableOrNameSpaceExists()
// 显示指定表的元数据信息
// descTableDesc(fullTableName)
// 创建表,如果表存在则直接Replace表结构
// createOrReplaceTable(fullTableName)
// append添加数据
// appendData(fullTableName)
//覆盖指定表的分区数据,根据分区字段动态覆盖
overwritePartitionsData(fullTableName)
// 手动指定分区进行覆盖
overwritePartitionsData2(fullTableName)
// 快照过期清理
// snapshotOverdueClean
// snapshotOverdueClean2
// 合并小文件
// mergeSmallFiles
}
/**
* 显示指定表的元数据信息。
*
* @param tableName 要查看元数据的表名称。
*/
def descTableDesc(tableName: String) = {
// 使用 Spark SQL 调用 desc formatted 命令来查看表的元数据信息,并使用 show() 方法显示结果。
spark.sql(s"desc formatted $tableName").show(true)
}
// 创建或替换表
def createOrReplaceTable(tableName: String) = {
val spark = this.spark // 获取 SparkSession 对象
// 创建包含数据的 DataFrame 对象。
val dataFrame: DataFrame = spark.createDataFrame(
Seq(Sample1(1, new Date().toLocaleString, "a"),
Sample1(2, new Date().toLocaleString, "b"),
Sample1(3, new Date().toLocaleString, "c")))
import spark.implicits._
dataFrame.writeTo(tableName) // 写入表
.tableProperty("write.format.default", "orc") // 表设置
//$ 符号是 Scala 语言中用于将对象转换为列的操作符,例如 $"category" 就表示将字符串 "category" 转换为 DataFrame 中的列
.partitionedBy($"category") // 分区
.createOrReplace() // 创建或替换表
}
/**
* 向指定的 Iceberg 表追加 Sample1 类型的数据。
*
* @param tableName 要追加数据的表名称。
*/
def appendData(tableName: String) = {
spark.sql("select count(1) from " + tableName).show()
// 创建包含数据的 DataFrame 对象。
val dataFrame: DataFrame = spark.createDataFrame(
Seq(Sample1(1, new Date().toLocaleString, "a"),
Sample1(2, new Date().toLocaleString, "b"),
Sample1(3, new Date().toLocaleString, "c")))
// 将 DataFrame 持久化到指定表,并调用 append() 方法将数据追加到指定表中。
dataFrame.writeTo(tableName).append()
spark.sql("select count(1) from " + tableName).show()
}
// 覆盖指定表的分区数据,写入的数据,根据分区字段,自动覆盖写入,数据不存在的分区就不覆盖了
def overwritePartitionsData(tableName: String) = {
// 使用 SQL 查询统计表的记录数并在控制台输出结果
spark.sql("select count(1) from " + tableName).show()
// 创建包含数据的 DataFrame 对象。
val dataFrame: DataFrame = spark.createDataFrame(
Seq(Sample1(5, new Date().toLocaleString, "a"),
Sample1(6, new Date().toLocaleString, "b")))
// 将 DataFrame 持久化到指定表,并调用 append() 方法将数据追加到指定表中。
dataFrame.writeTo(tableName).overwritePartitions()
// 使用 SQL 查询统计表的记录数并在控制台输出结果
spark.sql("select count(1) from " + tableName).show()
}
// 覆盖指定表的指定分区数据,其他分区直接追加
def overwritePartitionsData2(tableName: String) = {
// 获取 SparkSession 对象并为其创建一个变量 spark。
val spark: SparkSession = this.spark
// 使用 SQL 查询统计表的记录数并在控制台输出结果。
spark.sql("select count(1) from " + tableName).show()
import spark.implicits._
val dataFrame: DataFrame = spark.createDataFrame(
Seq(Sample1(5, new Date().toLocaleString, "a"),
Sample1(6, new Date().toLocaleString, "b")))
// 将指定 DataFrame 对象的数据覆盖到 tableName 表中,其中仅覆盖 category 字段等于 a 的分区。
dataFrame.writeTo(tableName).overwrite($"category" === "a")
// 使用 SQL 查询统计表的记录数并在控制台输出结果。
spark.sql("select count(1) from " + tableName).show()
}
/**
* 判断表是否存在
*/
def tableOrNameSpaceExists() = {
val conf = new Configuration()
val catalog = new HadoopCatalog(conf, "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code")
println("表是否存在:" + catalog.tableExists(TableIdentifier.of("default", "sample1")))
println("表是否存在:" + catalog.tableExists(TableIdentifier.of("default", "sample2")))
println("数据库是否存在:" + catalog.namespaceExists(Namespace.of("default")))
println("数据库是否存在:" + catalog.namespaceExists(Namespace.of("default8")))
}
/**
* 对样例表进行快照过期清理
*/
def snapshotOverdueClean() = {
// 创建 Hadoop Configuration 对象
val conf = new Configuration()
// 基于 Hadoop Catalog API 加载表格
val catalog = new HadoopCatalog(conf, "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code")
val table: Table = catalog.loadTable(TableIdentifier.of("default", "sample1"))
// 设置过期时间(这里设置7天前的时间)
val tsToExpire: Long = System.currentTimeMillis() - (1000 * 60 * 60 * 24)
// 过期快照并提交更改
table.expireSnapshots()
.expireOlderThan(tsToExpire)
.commit()
}
/**
* 对样例表进行快照过期清理2
*/
def snapshotOverdueClean2() = {
// 创建 Hadoop Configuration 对象
val conf = new Configuration()
// 基于 Hadoop Catalog API 加载表格
val catalog = new HadoopCatalog(conf, "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code")
val table: Table = catalog.loadTable(TableIdentifier.of("default", "sample1"))
// 设置过期时间(这里设置7天前的时间)
val tsToExpire: Long = System.currentTimeMillis() - (1000 * 60 * 60 * 24)
SparkActions.get()
.expireSnapshots(table)
.expireOlderThan(tsToExpire)
.execute()
}
/**
* 删除无效文件
*/
def deleteInvalidFile() = {
// 创建 Hadoop Configuration 对象
val conf = new Configuration()
// 基于 Hadoop Catalog API 加载表格
val catalog = new HadoopCatalog(conf, "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code")
val table: Table = catalog.loadTable(TableIdentifier.of("default", "sample1"))
SparkActions
.get()
.deleteOrphanFiles(table)
.execute()
}
/**
* 合并小文件
*/
def mergeSmallFiles() = {
val conf = new Configuration()
val catalog = new HadoopCatalog(conf, "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code")
// 加载表对象
val table: Table = catalog.loadTable(TableIdentifier.of("default", "sample1"))
SparkActions
.get() //使用SparkActions获取操作对象实例
.rewriteDataFiles(table) //将表中的数据重写成查询引擎可读的格式
.filter(Expressions.equal("category", "a")) //使用Iceberg表达式引擎过滤category列等于a的记录
.option("target-file-size-bytes", 1024L.toString) //设置目标文件大小为1KB
.execute() //执行合并小文件操作并生成新版本
}
/**
* 加载指定快照 ID 下的 Iceberg 表元数据并返回 DataFrame
*
* @return DataFrame 对象
*/
def loadSnapshotDataFiles() = {
spark.read.format("iceberg")
.option("snapshot-id", 4809403494114685238L)
.load("iceberg_hadoop.default.sample1.files")
}
/**
* 加载指定 Iceberg 表的元数据并返回 DataFrame
*
* @param fullTableName 完整的表名,包括数据库名称和表名称
* @return DataFrame 对象
*/
def loadIcebergTableFiles(fullTableName: String) = {
spark.read.format("iceberg").load(fullTableName + ".files")
}
/**
* 加载指定 Iceberg 表的元数据并返回 DataFrame
*
* @return DataFrame 对象
*/
def loadIcebergTableFiles() = {
spark.read.format("iceberg").load("hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code/default/sample1#files")
}
/**
* 读取指定两个快照 ID 之间增量数据并返回 DataFrame 对象
*
* @param hadoopNamespace Hadoop 的名称空间
* @param tableName 表的名称
* @return DataFrame 对象
*/
def getTwoSnapshotsIncrementalData(hadoopNamespace: String, tableName: String): DataFrame = {
// 构建表在 HDFS 上的路径
val tableHDFS = s"hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code/$hadoopNamespace/$tableName"
// 使用 Iceberg 的格式加载指定两个快照 ID 之间的增量数据,并返回 DataFrame 对象
spark.read
.format("iceberg")
.option("start-snapshot-id", 4809403494114685238L)
.option("end-snapshot-id", 1150221491570194004L)
.load(tableHDFS)
}
/**
* 查询指定快照 ID 的数据
*
* @param hadoopNamespace Hadoop 的名称空间
* @param tableName 表的名称
* @return DataFrame 对象
*/
def querySnapshotData(hadoopNamespace: String, tableName: String) = {
// 构建表在 HDFS 上的路径
val tableHDFS = "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code/" + hadoopNamespace + "/" + tableName
// 使用 Iceberg 的格式加载指定快照 ID 的数据,并返回 DataFrame 对象
spark.read
.option("snapshot-id", 1150221491570194004L)
.format("iceberg")
.load(tableHDFS)
}
/**
* 查询指定时间戳之前的数据
*
* @param hadoopNamespace Hadoop 的 namespace
* @param tableName 表格名称
* @return DataFrame 对象
*/
def queryDataBeforeTimestamp(hadoopNamespace: String, tableName: String) = {
// 拼接表格在 HDFS 上的路径
val tableHDFS = "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code/" + hadoopNamespace + "/" + tableName
// 使用 Iceberg 的方式获取指定时间戳之前的数据
spark.read
.option("as-of-timestamp", "1681804793000")
.format("iceberg")
.load(tableHDFS)
}
/**
* 使用 Spark SQL 的方式展示指定表格的数据
*
* @param hadoopCatalogAndNamespace Hadoop Catalog 和 namespace,格式为 "catalog.namespace"
* @param tableName 表格名称
*/
def showTableData1(hadoopCatalogAndNamespace: String, tableName: String) = {
// 切换到指定的 namespace
spark.sql("use " + hadoopCatalogAndNamespace)
// 获取指定表格的全部数据
spark.table(tableName)
}
/**
* 使用 Spark SQL 的方式展示指定表格的数据
*
* @param hadoopCatalogAndNamespace Hadoop Catalog 和 namespace,格式为 "catalog.namespace"
* @param tableName 表格名称
*/
def showTableData2(hadoopCatalogAndNamespace: String, tableName: String) = {
// 切换到指定的 namespace
spark.sql("use " + hadoopCatalogAndNamespace)
// 获取指定表格的全部数据
spark.table(tableName)
}
/**
* 使用 Iceberg 的方式展示指定表格的数据
*
* @param hadoopNamespace Hadoop 的 namespace
* @param tableName 表格名称
*/
def showTableData3(hadoopNamespace: String, tableName: String) = {
// 拼接表格在 HDFS 上的路径
val tableHDFS = "hdfs://172.16.24.198:8020/warehouse/spark-iceberg-code/" + hadoopNamespace + "/" + tableName
// 通过 Iceberg 的方式获取指定表格的全部数据
spark.read
.format("iceberg")
.load(tableHDFS)
}
/**
* 打印 DataFrame 中的数据,格式为每行数据按列名分割,行与行之间使用分隔符分割
*
* @param dataFrame DataFrame 对象
*/
def printDataFrame(dataFrame: DataFrame): Unit = {
// 获取 DataFrame 的 schema 信息
val schema = dataFrame.schema.fields
// 遍历 DataFrame 中的每一行数据,使用 "getAs()" 方法获取每一列的值
dataFrame.foreach(row => {
for (i <- 0 until schema.length) {
println(schema(i).name + ":" + row.getAs(schema(i).name))
}
// 使用分隔符分割不同行
println("----------------------------------------------")
})
}
/**
* 查看指定库的所有表
*/
def showTables(): Unit = {
spark.sql("use iceberg_hadoop.default")
spark.sql("show tables").show(false)
}
/**
* 创建表
*
* @param catalogAndNamespace
* @param tableName
*/
def createTable(catalogAndNamespace: String, tableName: String): Unit = {
spark.sql("CREATE TABLE if not exists " + catalogAndNamespace + "." + tableName + " ( id bigint COMMENT 'unique id', data string) USING iceberg;")
}
/**
* 如果不存在创建数据库
*/
def createDatabase(catalogAndNamespace: String): Unit = {
spark.sql("create database if not exists " + catalogAndNamespace)
}
}
配置文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spark-iceberg</artifactId>
<properties>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.2.1</spark.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!-- Spark的依赖引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<!-- <scope>provided</scope>-->
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<!-- <scope>provided</scope>-->
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<!-- <scope>provided</scope>-->
<version>${spark.version}</version>
</dependency>
<!--fastjson <= 1.2.80 存在安全漏洞,-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark-runtime-3.3 -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-runtime-3.2_2.12</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- assembly打包插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<archive>
<manifest>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
<!--Maven编译scala所需依赖-->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
可能遇到的问题
启动spark-sql异常 A read-only user or a user in a read-only database is not permitted to disable read-only mode on a connection.
完整内容:
23/04/13 15:36:51 ERROR PoolWatchThread: Error in trying to obtain a connection. Retrying in 7000ms
java.sql.SQLException: A read-only user or a user in a read-only database is not permitted to disable read-only mode on a connection.
at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown Source)
at org.apache.derby.impl.jdbc.TransactionResourceImpl.wrapInSQLException(Unknown Source)
at org.apache.derby.impl.jdbc.TransactionResourceImpl.handleException(Unknown Source)
at org.apache.derby.impl.jdbc.EmbedConnection.handleException(Unknown Source)
at org.apache.derby.impl.jdbc.EmbedConnection.setReadOnly(Unknown Source)
at com.jolbox.bonecp.ConnectionHandle.setReadOnly(ConnectionHandle.java:1324)
at com.jolbox.bonecp.ConnectionHandle.<init>(ConnectionHandle.java:262)
at com.jolbox.bonecp.PoolWatchThread.fillConnections(PoolWatchThread.java:115)
at com.jolbox.bonecp.PoolWatchThread.run(PoolWatchThread.java:82)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: ERROR 25505: A read-only user or a user in a read-only database is not permitted to disable read-only mode on a connection.
at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)
at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)
at org.apache.derby.impl.sql.conn.GenericAuthorizer.setReadOnlyConnection(Unknown Source)
at org.apache.derby.impl.sql.conn.GenericLanguageConnectionContext.setReadOnly(Unknown Source)
解决方法:
删除当前执行所在目录的metastore_db/db.lck
文件
show current namespace;