Iceberg与SparkSQL写操作整合

news2024/9/21 3:14:53

前言

spark操作iceberg之前先要配置spark catalogs,详情参考Iceberg与Spark整合环境配置。

有些操作需要在spark3中开启iceberg sql扩展。

Iceberg使用Apache Spark的DataSourceV2 API来实现数据源和catalog。Spark DSv2是一个不断发展的API,在Spark版本中具有不同级别的支持:
在这里插入图片描述
Spark 3支持SQL INSERT INTO、MERGE INTO和INSERT OVERWRITE,以及新的DataFrameWriterV2 API来进行iceberg表的写操作,接下来我们进行详细讲解。

INSERT INTO

insert into是往iceberg表中插入新数据,主要有两种语法:

INSERT INTO prod.db.table VALUES (1, 'a'), (2, 'b')
INSERT INTO prod.db.table SELECT ...

这两种语法和其它组件如hive等没有太多区别,比较容易掌握。

MERGE INTO

Iceberg "merge into"语法可以对表数据进行行级更新或删除,在Spark3.x版本之后支持,其原理是重写包含需要删除和更新行数据所在的data files。"merge into"可以使用一个查询结果数据来更新目标表的数据,其语法通过类似join关联方式,根据指定的匹配条件对匹配的行数据进行相应操作。

  1. 语法
MERGE INTO tbl t -- 目标表
USING (SELECT ...) s -- 数据源表,也就是用数据源表查出的数据来更新或删除目标表
ON t.id = s.id  -- 关联条件,类似join的on条件
WHEN MATCHED AND ... THEN DELETE -- 删除直接用delete命令
WHEN MATCHED AND ... THEN UPDATE SET ... --更新用upate set
WHEN MATCHED AND ... AND ... THEN UPDATE SET ... --多条件更新
WHEN NOT MATCHED ADN ... THEN INSERT (col1,col2...) VALUES(s.col1,s.col2 ...) --匹配不上向目标表插入数据
  1. 示例
  • 创建两张表a和b
create table  hadoop_prod.default.a (id int,name string,age int) using iceberg;
create table  hadoop_prod.default.b (id int,name string,age int,tp string) using iceberg
  • 插入数据
insert into hadoop_prod.default.a values (1,"zs",18),(2,"ls",19),(3,"ww",20)
insert into hadoop_prod.default.b values (1,"zs",30,"delete"),(2,"李四",31,"update"),(4,"王五",32,"add")
  • 使用MERGE INTO 语法向目标表更新、删除、新增数据
    这里我们计划将b表与a表匹配id,如果b表中tp字段是"delete"那么a表中对应的id数据删除,如果b表中tp字段是"update",那么a表中对应的id数据其他字段进行更新,如果a表与b表id匹配不上,那么将b表中的数据插入到a表中,具体操作如下:
merge into hadoop_prod.default.a  t1  -- 目标表a
using (select id,name ,age,tp from hadoop_prod.default.b) t2 -- 数据源表b
on t1.id = t2.id -- 关联条件为id
when matched and t2.tp = 'delete' then delete -- 如果数据源表中tp字段为delete,则对目标表关联d对应的数据进行删除操作
when matched and t2.tp = 'update' then update set t1.name = t2.name,t1.age = t2.age -- 如果数据源表tp字段为update,则对目标表关联id对应数据用数据源表中name和age更新目标表对应字段
when not matched then insert (id,name,age) values (t2.id,t2.name,t2.age) -- 如果id关联不上,则直接把数据源表对应id这条数据插入到目标表中

注意:我们很多数据库都没有类似merge into的操作,为了便于初学者理解,每一行操作都有详细的注释。

  • 结果
    在这里插入图片描述
    id=1,可以匹配上,但数据源表tp为delete,因此会把目标表id=1对应的行删除;
    id=2,可以匹配上,但数据源表tp为update,因此会把目标表id=2对应的name和age用数据源表name和age进行更新;
    id=3,没有匹配上,需要把数据源表对应的这条数据插入到目标表,但是由于数据源中没有id=3的数据,因此没有插入数据,此时保留数据源表中id=3对应的数据;
    id=4,没有匹配上,需要把数据源表对应的这条数据插入到目标表;

注意更新数据时,在查询的数据中只能有一条匹配的数据更新到目标表,否则将报错。

INSERT OVERWRITE

"insert overwrite"可以覆盖Iceberg表中的数据,这种操作会将表中全部数据替换掉,建议如果有部分数据替换操作可以使用"merge into"操作。

对于Iceberg分区表使用"insert overwrite"操作时,有两种情况,第一种是“动态覆盖”,第二种是“静态覆盖”。

  1. 动态分区覆盖
    动态覆盖会全量将原有数据覆盖,并将新插入的数据根据Iceberg表分区规则自动分区,类似Hive中的动态分区。

  2. 静态分区覆盖
    静态覆盖需要在向Iceberg中插入数据时需要手动指定分区,如果当前Iceberg表存在这个分区,那么只有这个分区的数据会被覆盖,其他分区数据不受影响,如果Iceberg表不存在这个分区,那么相当于给Iceberg表增加了个一个分区。

  3. 示例

  • 创建三张表并插入数据
    创建test1分区表、test2普通表、test3普通表三张表,并插入数据,每张表字段相同,但是插入数据不同。
-- test1为分区表
create table  hadoop_prod.default.test1 (id int,name string,loc string)
using iceberg
partitioned by (loc);

-- 插入数据
insert into hadoop_prod.default.test1 values (1,"zs","beijing"),(2,"ls","shanghai");
-- test2为普通无分区表
create table  hadoop_prod.default.test2 (id int,name string,loc string)
using iceberg;
-- 插入数据
insert into hadoop_prod.default.test2 values (10,"x1","shandong"),(11,"x2","hunan");
-- test3为普通无分区表
create table  hadoop_prod.default.test3 (id int,name string,loc string)
using iceberg;
-- 插入数据
insert into hadoop_prod.default.test3 values (3,"ww","beijing"),(4,"ml","shanghai"),(5,"tq","guangzhou");
  • 使用insert overwrite 读取test3表中的数据覆盖到test2表中
-- 使用insert overwrite 读取test3 表中的数据覆盖到test2 普通表中
insert overwrite hadoop_prod.default.test2 select id,name,loc from  hadoop_prod.default.test3;
-- 查询test2表数据
select * from hadoop_prod.default.test2;

此时test2表中的结果如下:
在这里插入图片描述
说明此时insert overwrite操作是把test2表的数据全部删除,然后把test3表的所有数据插入到test2表。

  • 使用insert overwrite 读取test3表数据,动态分区方式覆盖到表test1
-- 使用insert overwrite 读取test3表数据 动态分区方式覆盖到表 test1
insert overwrite hadoop_prod.default.test1 select id,name,loc from  hadoop_prod.default.test3;
-- 查询 test1 表数据
select * from hadoop_prod.default.test1;

此时test1表中的数据如下:
在这里插入图片描述
说明此时insert overwrite操作是把test1表的数据全部删除,然后把test3表的所有数据插入到test1表,并且分区字段loc按照动态分区的方式进行分区。

  • 静态分区方式,将iceberg表test3的数据覆盖到Iceberg表test1中
    这里可以将test1表删除,然后重新创建,加载数据,也可以直接读取test3中的数据静态分区方式更新到test1。另外,使用insert overwrite 语法覆盖静态分区方式时,查询的语句中就不要再次写入分区列,否则会重复。
-- 删除表test1,重新创建表test1 分区表,并插入数据
drop table hadoop_prod.default.test1;
-- 重建test1分区表
create table  hadoop_prod.default.test1 (id int,name string,loc string) using iceberg partitioned by (loc);
-- 插入数据
insert into hadoop_prod.default.test1 values (1,"zs","beijing"),(2,"ls","shanghai");
-- 查询test1表数据
select * from hadoop_prod.default.test1;

在这里插入图片描述

-- 注意:指定静态分区"jiangsu",静态分区下,就不要在查询 “loc" 列了,否则重复
insert overwrite hadoop_prod.default.test1 partition (loc = "jiangsu") select id,name from  hadoop_prod.default.test3;
-- 查询 test1 表数据
select * from hadoop_prod.default.test1;

此时test1表的数据如下:
在这里插入图片描述
我们可以看到test1表原来没有jiangsu分区,采用静态分区指定jiangsu分区的时候,并不影响非jiangsu的数据,只是从test3中读取所有数据,并存放到loc=jiangsu这个分区目录下。

注意:使用insert overwrite 读取test3表数据 静态分区方式覆盖到表 test1,表中其他分区数据不受影响,只会覆盖指定的静态分区数据。

至此,我相信我们已经完全掌握了merge into的用法。

DELETE FROM

Spark3.x版本之后支持"Delete from"可以根据指定的where条件来删除表中数据。如果where条件匹配Iceberg表一个分区的数据,Iceberg仅会修改元数据,如果where条件匹配的表的单个行,则Iceberg会只重写受影响行所在的data files。

-- 创建表 delete_tbl ,并加载数据
create table hadoop_prod.default.delete_tbl (id int,name string,age int) using iceberg;
insert into hadoop_prod.default.delete_tbl values (1,"zs",18),(2,"ls",19),(3,"ww",20),(4,"ml",21),(5,"tq",22),(6,"gb",23);
-- 根据条件范围删除表 delete_tbl 中的数据
delete from hadoop_prod.default.delete_tbl where id >3 and id <6;
-- 查询数据
select * from hadoop_prod.default.delete_tbl;

删除了id大于3和小于6之间的所有数据:
在这里插入图片描述

-- 根据条件删除表 delete_tbl 中的一条数据
delete from hadoop_prod.default.delete_tbl where id = 2;
-- 查询数据
select * from hadoop_prod.default.delete_tbl;

删除了id=2的数据:
在这里插入图片描述

删除操作和其它数据库完全一样,操作很简单,但是得理解底层删除数据的原理。

UPDATE

Spark3.x+版本支持了update更新数据操作,可以根据匹配的条件进行数据更新操作。

-- 创建表 update_tbl ,并加载数据
create table hadoop_prod.default.update_tbl (id int,name string,age int) using iceberg;
-- 插入数据
insert into hadoop_prod.default.update_tbl values (1,"zs",18),(2,"ls",19),(3,"ww",20),(4,"ml",21),(5,"tq",22),(6,"gb",23);

insert into hadoop_prod.default.update_tbl values (1,“zs”,18),(2,“ls”,19),(3,“ww”,20),(4,“ml”,21),(5,“tq”,22),(6,“gb”,23),操作如下:

-- 更新 delete_tbl 表
update hadoop_prod.default.update_tbl set name = 'zhangsan' ,age = 30 where id <=3;
-- 查询数据
select * from hadoop_prod.default.update_tbl;

把id小于等于3的,name全部改成zhangshan,age全部改成30:
在这里插入图片描述
update操作和其它数据库一模一样,非常简单。

注意:UPDATE 更加专注于单一记录的修改,而 MERGE INTO 则是一个更全面的操作,可以同时处理多个数据状态的变化。因此一些复杂的操作直接用MERGE INTO,比如:

  • 同步外部数据源:如果你有一个外部数据库系统,你可能希望定期将更改(包括插入、更新和删除)同步到你的数据湖中的表。MERGE INTO 可以用来比较两个表,并根据匹配条件执行更新,对于没有匹配记录的新数据则执行插入。
  • 数据集成:当需要合并多个来源的数据到一个目标表中时,MERGE INTO 可以有效地处理这种情况。它可以检查数据是否已经存在,并决定是更新还是添加新的记录。
  • 高效的数据处理:在处理大量数据时,MERGE INTO 可以减少数据处理的时间,因为它只需要一次操作就可以完成更新和插入。

参考文献

Spark Write
https://bbs.huaweicloud.com/blogs/364273

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2119170.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

12. GIS地图制图工程师岗位职责、技术要求和常见面试题

本系列文章目录&#xff1a; 1. GIS开发工程师岗位职责、技术要求和常见面试题 2. GIS数据工程师岗位职责、技术要求和常见面试题 3. GIS后端工程师岗位职责、技术要求和常见面试题 4. GIS前端工程师岗位职责、技术要求和常见面试题 5. GIS工程师岗位职责、技术要求和常见面试…

Mac强制删除文件,碰上“拖拽到废纸篓”无法删除时怎么办?

我们都特别喜欢Mac&#xff0c;不仅是因为它漂亮的外观&#xff0c;还有它的运行顺畅、界面友好。然而&#xff0c;就像所有技术产品一样&#xff0c;有时它也会让我们头疼——比如&#xff0c;当某个文件无论如何都删不掉时。你可能遇到过这样的情况&#xff1a;尝试删除一个文…

亿道三防AI加固平板电脑首亮相,工业级AI PC开启行业新纪元!

8月28日至30日&#xff0c;亿道三防在第22届国际物联网展深圳站上隆重发布了多款AI加固平板电脑和户外三防新品&#xff0c;首次亮相便赢得了现场观众的热烈好评。此外&#xff0c;还有三防平板电脑、工业平板电脑、车载平板电脑以及防爆平板等众多行业类明星产品也悉数登场&am…

【Leetcode算法面试题】-1. 两数之和

文章目录 算法练习题目思路参考答案算法1算法2算法3 算法练习 面试经常会遇到算法题目&#xff0c;今天开启算法专栏&#xff0c;常用算法解析 题目 ** 给定一个整数数组 nums 和一个整数目标值 target&#xff0c;请你在该数组中找出 和为目标值 target 的那 两个 整数&…

【服务器第一期】Xshell、Xftp下载及连接

服务器环境配置 1 Xshell 和 Xftp 的下载安装与使用2 连接服务器2.1. Xshell 连接服务器2.2 文件传输 参考 1 Xshell 和 Xftp 的下载安装与使用 进入 Xshell 下载页面&#xff0c;点击下载 官网-XSHELL-NetSarang Website 选择免费授权页面&#xff1a; 直接下载即可。 PS&…

目标检测从入门到精通——常见iou及变体算法介绍

目标检测中的 IoU 算法及其变体 绪论 在计算机视觉领域&#xff0c;目标检测是一个重要的研究方向&#xff0c;广泛应用于自动驾驶、安防监控、图像搜索等多个场景。为了评估目标检测模型的性能&#xff0c;Intersection over Union&#xff08;IoU&#xff09;作为一种常用的…

SpringBoot OAuth2自定义登陆/授权页

背景 5 月份的时候&#xff0c;我实践并整理了一篇博客&#xff1a;SpringBoot搭建OAuth2&#xff0c;该博客完成之后&#xff0c;很长一段时间里我都有种意犹未尽的感觉。诚然&#xff0c;我把OAuth2搭起来了&#xff0c;各种场景的用例也跑通了&#xff0c;甚至源码也看了&am…

HTTP请求⽅法

HTTP请求⽅法 1. GET &#xff1a;申请获取资源&#xff0c;不对服务器产⽣影响 2. POST &#xff1a; POST 请求通常⽤于发送数据&#xff0c;例如提交表单数据、上传⽂件等&#xff0c;会影响服务器&#xff0c;服务器可能动态创建新的资源或更新原有资源。 3. HEAD &#…

GPU 计算 CMPS224 2021 学习笔记 02

并行类型 &#xff08;1&#xff09;任务并行 &#xff08;2&#xff09;数据并行 CPU & GPU CPU和GPU拥有相互独立的内存空间&#xff0c;需要在两者之间相互传输数据。 &#xff08;1&#xff09;分配GPU内存 &#xff08;2&#xff09;将CPU上的数据复制到GPU上 &…

UE4_后期处理_后期处理材质四—场景物体描边

一、效果如下图&#xff1a; 二、分析&#xff1a; 回顾复习&#xff1a;在后期处理材质三中&#xff0c;我们通过计算开启自定义深度通道物体的像素点上下左右4个像素SceneTextureCustomDepth深度之和来判断物体的外部&#xff08;包含物体的边&#xff09;和内部&#xff0c…

【漏洞利用】2018年-2024年HVV 6000+个漏洞 POC 合集分享

此份poc 集成了Zabbix、用友、通达、Wordpress、Thinkcmf、Weblogic、Tomcat等 下载链接: 链接: https://pan.quark.cn/s/1cd7d8607b8a

Java小白一文讲清Java中集合相关的知识点(七)

LinkedHashSet LinkedHashSet是HashSet的子类 LinkedHashSet底层是一个LinkedHashMap,底层维护了一个数组双向链表 而在之前讲的HashSet中的链表是单向的哈&#xff0c;注意区分&#xff01; LinkedHashSet根据元素的hashcode值来决定元素的存储位置&#xff0c;同时使用链表…

从搜索热度上看Arcgis的衰退

Arcgis已被qgis快速赶上 google trends是一个google综合了每日的搜索情况的统计网站&#xff0c;可以追踪从2004年开始各个关键字的搜索热度。 我用arcgis和qgis作为对比&#xff0c;简单探索了arcgis和qgis的全球相关热度。 假设&#xff0c;搜索arcgis越高的区域&#xff…

机器学习 第8章 集成学习

目录 个体与集成BoostingBagging与随机森林Bagging随机森林 结合策略平均法投票法学习法 个体与集成 定义&#xff1a;集成学习&#xff0c;也叫多分类器系统、基于委员会的学习等&#xff0c;它是一种通过结合多个学习器来构建一个更强大的学习器的技术。如下图所示 在这里&a…

轨道交通系统详解,以及地铁如何精准停靠站台

ATC系统 全称“自动列车控制系统”&#xff0c;Automatic Train Control&#xff0c;ATC ATC是地铁运行的核心系统&#xff0c;它包括列车自动防护&#xff08;ATP&#xff09;、列车自动运行&#xff08;ATO&#xff09;和列车自动监控&#xff08;ATS&#xff09;三个子系统。…

嵌入式day41

哈希表 将要存储的数据的关键字和位置建立对应的关系&#xff0c;通过哈希函数&#xff08;散列函数&#xff09;将数据映射到存储的位置&#xff0c;方便快速查找 哈希冲突/哈希矛盾&#xff1a; key1 ! key2 f(key1) f(key2) 解决方法&#xff1a; 链地址法 算法 解决…

都2024年了还不明白Redis持久化?RDB文件、AOF文件、AOF重写

都2024年了&#xff0c;不会还有人不知道redis的RDB和Aof吧&#xff1f;不知道没关系&#xff0c;看完这篇文章我相信你就会有个大概的了解和认识了 1. Redis持久化 1.1 持久化概念 Redis本身是一个基于内存的数据库&#xff0c;它提供了RDB持久化、AOF持久化两种方式&#…

黑神话,XSKY 星飞全闪单卷性能突破310万

当下&#xff0c;云计算仍然是企业主要的基础架构&#xff0c;随着关键业务的逐步虚拟化和云化&#xff0c;对于块存储的性能要求也日益提高。企业对于低延迟、高稳定性的存储解决方案的需求日益迫切。为了满足这些日益增长的 IO 密集型应用场景&#xff0c;众多云服务提供商正…

大奖收割机!望繁信科技荣获年度技术创新和应用品牌奖

2023年8月14日&#xff0c;第七届GAIR全球人工智能与机器人大会在新加坡如期举行。 会上公布了「GAIR 2023 GPT Pioneer 5」榜单&#xff0c;望繁信科技凭借完全自主研发的流程智能平台&#xff0c;以及一系列在头部企业中的成功实践案例&#xff0c;与百度智能云、阿里云、知…

vector 容器基础操作及用法

目录 vector 容器基础操作及用法 一&#xff1a;定义及初始化 二&#xff1a;添加数据 三&#xff1a;删除数据 vector 容器基础操作及用法 CSTL是一个非常强大的容器库&#xff0c;其中 vector 是最为常用也较为方便的容器之一&#xff0c;下面主要介绍一下 vector 的一些…