Iceberg与SparkSQL整合DDL操作

news2024/9/20 5:39:52

前言

使用SparkSql操作Iceberg表之前我们得先配置好catalog,配置方式参考这篇博客。

创建非分区表

Spark3使用USING iceberg来创建表:

CREATE TABLE prod.db.sample (
    id bigint NOT NULL COMMENT 'unique id',
    data string)
USING iceberg;

这里的数据类型,我们就用Spark的数据类型,iceberg会自动转成对应的iceberg类型。其实基本上一模一样,可以参考官网查看。

参数:

  • PARTITIONED BY (partition-expressions) :配置分区
  • LOCATION ‘(fully-qualified-uri)’ :指定表路径
  • COMMENT ‘table documentation’ :配置表备注
  • TBLPROPERTIES (‘key’=‘value’, …) :配置表属性

对 Iceberg 表的每次更改都会生成一个新的元数据文件(json 文件)以提供原子性。默认情况下,旧元数据文件作为历史文件保存不会删除。如 果 要 自 动 清 除 元 数 据 文 件 , 在 表 属 性 中 设 置
write.metadata.delete-after-commit.enabled=true 。 这 将 保 留 一 些 元 数 据 文 件 ( 直 到
write.metadata.previous-versions-max),并在每个新创建的元数据文件之后删除旧的元数据文件。

创建分区表

分区使用PARTITIONED BY来指定。

  1. 非隐藏分区
CREATE TABLE prod.db.sample (
    id bigint,
    data string,
    category string)
USING iceberg
PARTITIONED BY (category);
  1. 隐藏分区
CREATE TABLE prod.db.sample (
    id bigint,
    data string,
    category string,
    ts timestamp)
USING iceberg
PARTITIONED BY (bucket(16, id), days(ts), category);

支持的隐藏分区转换函数有:

  • year(ts):按年划分
  • month(ts):按月划分
  • day(ts)或 date(ts):等效于 dateint 分区
  • hour(ts)或 date_hour(ts):等效于 dateint 和 hour 分区
  • bucket(N, col):按哈希值划分 mod N 个桶
  • truncate(L, col):按截断为 L 的值划分,字符串被截断为给定的长度;整型和长型截断为 bin: truncate(10, i)生成分区 0,10,20,30,…。

老的函数years(ts), months(ts), days(ts) and hours(ts)也支持。

CATS(CREATE TABLE … AS SELECT)建表

当使用SparkCatalog时,Iceberg支持将CTAS作为原子操作。但在使用SparkSessionCatalog时不是原子的。所以我们一般最好用SparkCatalog。否则会有一些潜在的风险。

CREATE TABLE prod.db.sample
USING iceberg
AS SELECT ...

新创建的表不会继承源表的分区和属性,可以使用CTAS中的PARTITIONED BY和TBLPROPERTIES来声明新表的分区和属性。

CREATE TABLE prod.db.sample
USING iceberg
PARTITIONED BY (part)
TBLPROPERTIES ('key'='value')
AS SELECT ...

RTAS(REPLACE TABLE … AS SELECT)建表

当使用SparkCatalog时,Iceberg支持将RTAS作为原子操作。但在使用SparkSessionCatalog时不是原子性的。所以我们一般最好用SparkCatalog。否则会有一些潜在的风险。

替换的表会根据select查询的结果创建新的快照,但是会保留原表的历史记录。

REPLACE TABLE prod.db.sample
USING iceberg
AS SELECT ...
REPLACE TABLE prod.db.sample
USING iceberg
PARTITIONED BY (part)
TBLPROPERTIES ('key'='value')
AS SELECT ...
CREATE OR REPLACE TABLE prod.db.sample
USING iceberg
AS SELECT ...

如果我们仅仅是替换表中的数据,而不改变表的结构或属性,那么用INSERT OVERWRITE来替换REPLACE

删除表

  1. 删除表
DROP TABLE prod.db.sample;
  1. 删除表和数据
DROP TABLE prod.db.sample PURGE;

在 0.14 之前,运行 DROP TABLE 将从 catalog 中删除表并删除表内容。
从 0.14 开始,DROP TABLE 只会从 catalog 中删除表,不会删除数据。为了删除表内容,应该使用 DROP table PURGE

修改表

Iceberg 在 Spark 3 中完全支持 ALTER TABLE,包括:

  • 重命名表
  • 设置或删除表属性
  • 添加、删除和重命名列
  • 添加、删除和重命名嵌套字段
  • 重新排序顶级列和嵌套结构字段
  • 扩大 int、float 和 decimal 字段的类型
  • 将必选列变为可选

此外,还可以使用 SQL 扩展来添加对分区演变的支持和设置表的写顺序。

  1. 修改表名(ALTER TABLE … RENAME TO)
ALTER TABLE prod.db.sample RENAME TO prod.db.new_name;
  1. 修改表属性(ALTER TABLE … SET(UNSET) TBLPROPERTIES)
ALTER TABLE prod.db.sample SET TBLPROPERTIES (
    'read.split.target-size'='268435456'
);

包括修改comment

ALTER TABLE prod.db.sample SET TBLPROPERTIES (
    'comment' = 'A table comment.'
);

USET可以移除属性

ALTER TABLE prod.db.sample UNSET TBLPROPERTIES ('read.split.target-size');
  1. 添加列(ALTER TABLE … ADD COLUMN)
ALTER TABLE hadoop_prod.default.sample
ADD COLUMNS (
category string comment 'new_column'
)
-- 添加 struct 类型的列
ALTER TABLE hadoop_prod.default.sample
ADD COLUMN point struct<x: double, y: double>;
-- 往 struct 类型的列中添加字段
ALTER TABLE hadoop_prod.default.sample
ADD COLUMN point.z double
-- 创建 struct 的嵌套数组列
ALTER TABLE hadoop_prod.default.sample
ADD COLUMN points array<struct<x: double, y: double>>;
-- 在数组中的结构中添加一个字段。使用关键字'element'访问数组的元素列。
ALTER TABLE hadoop_prod.default.sample
ADD COLUMN points.element.z double
-- 创建一个包含 Map 类型的列,key 和 value 都为 struct 类型
ALTER TABLE hadoop_prod.default.sample
ADD COLUMN pointsm map<struct<x: int>, struct<a: int>>;
-- 在 Map 类型的 value 的 struct 中添加一个字段。
ALTER TABLE hadoop_prod.default.sample
ADD COLUMN pointsm.value.b int

在 Spark 2.4.4 及以后版本中,可以通过添加 FIRST 或 AFTER 子句在任何位置添加

ALTER TABLE hadoop_prod.default.sample
ADD COLUMN new_column bigint AFTER id


ALTER TABLE hadoop_prod.default.sample
ADD COLUMN new_column bigint FIRST
  1. 修改列名(ALTER TABLE … RENAME COLUMN)
ALTER TABLE prod.db.sample RENAME COLUMN data TO payload;
ALTER TABLE prod.db.sample RENAME COLUMN location.lat TO latitude;
  1. 修改类型(ALTER TABLE … ALTER COLUMN)

注意:只允许安全的转换

ALTER TABLE prod.db.sample ALTER COLUMN measurement TYPE double;
  1. 修改注释(ALTER TABLE … ALTER COLUMN)
ALTER TABLE prod.db.sample ALTER COLUMN measurement TYPE double COMMENT 'unit is bytes per second';
ALTER TABLE prod.db.sample ALTER COLUMN measurement COMMENT 'unit is kilobytes per second';
  1. 修改列顺序
ALTER TABLE prod.db.sample ALTER COLUMN col FIRST;
ALTER TABLE prod.db.sample ALTER COLUMN nested.col AFTER other_col;
  1. 修改列是否允许为NULL
ALTER TABLE prod.db.sample ALTER COLUMN id DROP NOT NULL;

ALTER COLUMN 不用于更新 struct 类型。使用 ADD COLUMN 和 DROP COLUMN 添加或删除 struct 类型的字段。

  1. 删除列(ALTER TABLE … DROP COLUMN)
ALTER TABLE prod.db.sample DROP COLUMN id;
ALTER TABLE prod.db.sample DROP COLUMN point.z;
  1. 添加分区(Spark3,需要配置扩展,ALTER TABLE … ADD PARTITION FIELD)
    扩展配置,我们这篇博客也做了简单介绍,一般我们都会一次性修改Spark配置:
vim spark-default.conf
spark.sql.extensions =org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
ALTER TABLE prod.db.sample ADD PARTITION FIELD catalog; -- identity transform

修改分区转换:

ALTER TABLE prod.db.sample ADD PARTITION FIELD bucket(16, id);
ALTER TABLE prod.db.sample ADD PARTITION FIELD truncate(4, data);
ALTER TABLE prod.db.sample ADD PARTITION FIELD year(ts);
-- use optional AS keyword to specify a custom name for the partition field 
ALTER TABLE prod.db.sample ADD PARTITION FIELD bucket(16, id) AS shard;

注意添加分区字段是元数据操作,不会更改任何现有表数据。新数据将使用新分区写入,但现有数据将保留在旧分区布局中。对于元数据表中的新分区字段,旧数据文件将具有空值。

当表的分区发生变化时,动态分区覆盖行为也会发生变化,因为动态覆盖会隐式地替换分区。要显式覆盖,请使用新的DataFrameWriterV2 API。

重要
在这里插入图片描述

当你想要改变数据的分区粒度时,比如从每天的数据分区细化到每小时的数据分区,你可以利用transforms(转换)来实现这一点,而不需要删除原有的按天分区的字段。这么做的好处就是,历史的任务可能很多都是通过天粒度进行查看的,后面任务才会用小时查看,因此天分区不要删除了。这个很有用。

危险
在这里插入图片描述

当分区发生变化时,动态分区覆盖行为将发生变化。例如,如果你按天划分分区并改为按小时划分分区,覆盖将覆盖每小时分区,而不再覆盖按天分区。

  1. 删除分区(Spark3,需要配置扩展,ALTER TABLE … DROP PARTITION FIELD)
ALTER TABLE prod.db.sample DROP PARTITION FIELD catalog;
ALTER TABLE prod.db.sample DROP PARTITION FIELD bucket(16, id);
ALTER TABLE prod.db.sample DROP PARTITION FIELD truncate(4, data);
ALTER TABLE prod.db.sample DROP PARTITION FIELD year(ts);
ALTER TABLE prod.db.sample DROP PARTITION FIELD shard;

注意,尽管删除了分区,但列仍然存在于表结构中。
删除分区字段是元数据操作,不会改变任何现有的表数据。新数据将被写入新的分区,但现有数据将保留在旧的分区布局中。

当分区发生变化时,动态分区覆盖行为也会发生变化。例如,如果按天划分分区,而改为按小时划分分区,那么覆盖将覆盖每小时划分的分区,而不再覆盖按天划分的分区。

删除分区字段时要小心,可能导致元数据查询失败或产生不同的结果。

  1. 修改分区(Spark3,需要配置扩展,ALTER TABLE … REPLACE PARTITION FIELD)
ALTER TABLE prod.db.sample REPLACE PARTITION FIELD ts_day WITH day(ts);
-- use optional AS keyword to specify a custom name for the new partition field 
ALTER TABLE prod.db.sample REPLACE PARTITION FIELD ts_day WITH day(ts) AS day_of_ts;
  1. 修改表的写入顺序(ALTER TABLE … WRITE ORDERED BY)
ALTER TABLE prod.db.sample WRITE ORDERED BY category, id
-- use optional ASC/DEC keyword to specify sort order of each field (default ASC)
ALTER TABLE prod.db.sample WRITE ORDERED BY category ASC, id DESC
-- use optional NULLS FIRST/NULLS LAST keyword to specify null order of each field (default FIRST)
ALTER TABLE prod.db.sample WRITE ORDERED BY category ASC NULLS LAST, id DESC NULLS FIRST

表写顺序不能保证查询的数据顺序。它只影响数据写入表的方式。
WRITE ORDERED BY 设置了一个全局排序,即跨任务的行排序,就像在 INSERT 命令中使用 ORDER BY 一样

INSERT INTO prod.db.sample
SELECT id, data, category, ts FROM another_table
ORDER BY ts, category

要在每个任务内排序,而不是跨任务排序,使用 local ORDERED BY:

ALTER TABLE prod.db.sample WRITE LOCALLY ORDERED BY category, id
  1. 按分区并行写入(ALTER TABLE … WRITE DISTRIBUTED BY PARTITION)
ALTER TABLE prod.db.sample WRITE DISTRIBUTED BY PARTITION
ALTER TABLE prod.db.sample WRITE DISTRIBUTED BY PARTITION LOCALLY ORDERED BY category, id

参考文献

Spark DDL

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2112094.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

伦敦银ATR策略

ATR这个技术指标由J.Welles Wilder发明&#xff0c;‌主要用来衡量伦敦银的价格波动&#xff0c;它虽然‌不能直接反映银价走向及其趋势稳定性&#xff0c;但‌ATR指标价值越高&#xff0c;‌趋势改变的可能性就越高&#xff1b;‌价值越低&#xff0c;‌趋势的移动性就越弱。 …

麒麟安全加固工具,为系统打造坚固“金钟罩”!

当今数字化时代&#xff0c;系统安全的重要性不言而喻。为应对网络安全风险、满足用户高等级安全诉求&#xff0c;麒麟软件打造了满足用户高等级安全诉求的 “麒麟安全加固工具”&#xff0c;实现服务器操作系统安全配置的规范化、标准化、制度化&#xff0c;为系统安全打造坚固…

node.js、php、Java、python校园点餐与数据分析系统 校园食堂订餐系统(源码、调试、LW、开题、PPT)

&#x1f495;&#x1f495;作者&#xff1a;计算机源码社 &#x1f495;&#x1f495;个人简介&#xff1a;本人 八年开发经验&#xff0c;擅长Java、Python、PHP、.NET、Node.js、Android、微信小程序、爬虫、大数据、机器学习等&#xff0c;大家有这一块的问题可以一起交流&…

uni-app--》打造个性化壁纸预览应用平台(三)

&#x1f3d9;️作者简介&#xff1a;大家好&#xff0c;我是亦世凡华、渴望知识储备自己的一名前端工程师 &#x1f304;个人主页&#xff1a;亦世凡华、 &#x1f306;系列专栏&#xff1a;uni-app &#x1f307;座右铭&#xff1a;人生亦可燃烧&#xff0c;亦可腐败&#xf…

微服务注册中心都有哪些

在微服务架构中&#xff0c;注册中心扮演着至关重要的角色&#xff0c;用于服务的注册与发现。以下是一些常见的注册中心&#xff1a; Eureka&#xff1a; Eureka是Netflix开发的服务发现框架&#xff0c;后来贡献给了Spring Cloud。它主要用于AWS云&#xff0c;但也可以在其他…

【MySQL超详细安装步骤】Centos7安装MySQL8

文章目录 1.卸载2.修改yum源为阿里源2.1首先检查是否安装wget2.2 备份 yum 源文件2.3 下载阿里云yum源文件2.4 清理yum缓存 3.安装mysql源3.1 下载mysql源3.2 安装mysql源3.3 检查是否安装成功 4. 安装MySQL4.1 使用yum安装4.2 启动MySQL 5.配置防火墙5.1 开放3306端口 6.登录M…

服务器数据恢复—Raid磁盘阵列故障类型和常见故障原因

出于尽可能避免数据灾难的设计初衷&#xff0c;RAID解决了3个问题&#xff1a;容量问题、IO性能问题、存储安全(冗余)问题。从数据恢复的角度讨论RAID的存储安全问题。 常见的起到存储安全作用的RAID方案有RAID1、RAID5及其变形。基本设计思路是相似的&#xff1a;当部分数据异…

go切片的深入学习以及context库的使用

Go切片专项学习 go切片扩容机制 go1.18 之前&#xff1a; 1.如果期望容量大于当前容量的两倍就会使用期望容量&#xff1b; 2.如果当前切片的长度小于 1024 就会将容量翻倍&#xff1b; 3.如果当前切片的长度大于 1024 就会每次增加 25% 的容量&#xff0c;直到新容量大于期…

JavaScript - 对象编程之详解DOM对象

1. 文档对象模型&#xff08;DOM&#xff09; HTML DOM全称为HTML Document Object Model&#xff0c;专门适用于HTML/XHTML文档的对象模型。可以将HTML DOM理解为网页的API&#xff0c;将网页中的各种元素都看作一个对象&#xff0c;从而使网页中的元素也可以被计算机语言获取…

git的简单学习

&#xff08;这个模块本来是会用的&#xff0c;但是了解并不是那么深入&#xff0c;因此需要继续学习一下&#xff09; 1.下载安装 下载网址&#xff1a;https://git-scm.com/download/win/ 一直next就可以了。 2.检查 winr&#xff1a;cmd &#xff08;不建议&#xff09;…

零基础Opencv学习(四)

一、查找并绘制轮廓 /// 载入原始图&#xff0c;必须以二值图模式载入cv::Mat image cv::imread("E:/OpencvStudyTest/4.png", cv::ImreadModes::IMREAD_GRAYSCALE);cv::imshow("image", image);/// 初始化结果图cv::Mat dstImage cv::Mat::zeros(image.…

005:VTK世界坐标系中的相机和物体

VTK医学图像处理---世界坐标系中的相机和物体 左侧是成像结果 右侧是世界坐标系中的相机与被观察物体 简介 上图右侧的图像是模拟的世界坐标系和世界坐标系中相机以及被观察物体&#xff1b; 左侧是在右侧世界坐标系中相机…

黑神话:游戏的诞生

&#x1f6f0; 前言 近期&#xff0c;国产 3A 大作《黑神话&#xff1a;悟空》给我们带来了一波惊喜。相信各位或多或少都有所了解。看见如此激动人心的产品我们除了欣喜&#xff0c;也不禁让我们思考起来游戏是如何实现的&#xff1f;我们能否开发一款属于自己的游戏&#xff…

3D 场景模拟 2D 碰撞玩法的方案

目录 方法概述顶点到平面的垂直投影求解最小降维 OBB主成分分析&#xff08;PCA&#xff09;协方差矩阵求矩阵特征值Jacobi 方法 OBB 拉伸方法 对于类似《密特罗德 生存恐惧》和《暗影火炬城》这样 3D 场景&#xff0c;但玩法还是 2D 卷轴动作平台跳跃&#xff08;类银河恶魔城…

[项目][CMP][Central Cache]详细讲解

目录 1.设计&结构2.申请内存3.释放内存4.框架 1.设计&结构 Central Cache也是一个哈希桶结构&#xff0c;它的哈希桶的映射关系跟Thread Cache是一样的不同的是它的每个哈希桶位置挂的是SpanList链表结构(带头双向循环链表)&#xff0c;不过每个映射桶下面的span中的大…

链式栈、队列

1、链式栈&#xff1a; 声明&#xff1a; #ifndef _STACK_H #define _STACK_H #include<stdlib.h>typedef int DataType;typedef struct snode //节点 {DataType data;struct snode *pnext; }SNode_t;typedef struct stack //链表 {SNode_t *ptop;int clen; }St…

Patlibc———更快捷的更换libc

起初是为了简化做pwn题目时&#xff0c;来回更换libc的麻烦&#xff0c;为了简化命令&#xff0c;弄了一个小脚本&#xff0c;可以加入到/usr/local/bin中&#xff0c;当作一个快捷指令&#x1f522; 这个写在了tools库&#xff08;git clone https://github.com/CH13hh/tools…

C++利用jsoncpp库实现写入和读取json文件(含中文处理)

C利用jsoncpp库实现写入和读取json文件 1 jsoncpp常用类1.1 Json::Value1.2 Json::Reader1.3 Json::Writer 2 json文件3 写json文件3.1 linux存储结果3.2 windows存储结果 3 读json文件4 读json字符串参考文章 在C中使用跨平台的开源库JsonCpp&#xff0c;实现json的序列化和反…

【有啥问啥】大模型应用中的哈希链推理任务

大模型应用中的哈希链推理任务 随着人工智能技术的快速发展&#xff0c;尤其是大模型&#xff08;如GPT、BERT、Vision Transformer等&#xff09;的广泛应用&#xff0c;确保数据处理和模型推理的透明性与安全性变得愈发重要。哈希链推理任务作为一种技术手段&#xff0c;能够…

会员营销如何利用JSON发送短信

在当今这个数字化时代&#xff0c;企业间的竞争日益激烈&#xff0c;如何高效地触达并维护用户群体&#xff0c;提升用户粘性和忠诚度&#xff0c;成为了每个企业都必须面对的重要课题。在众多营销手段中&#xff0c;会员营销因其精准性和个性化而备受青睐。而在会员营销的策略…