Hudi系列7:使用SparkSQL操作Hudi

news2025/1/8 5:17:10

文章目录

  • 一. SparkSQL连接Hudi
    • 1.1 Hive配置
    • 1.2 SparkSQL连接Hudi
  • 二. 创建表
    • 2.1 常规的建表
    • 2.2 CTAS
  • 三. 插入数据
  • 四. 查询数据
  • 五. 更新数据
    • 5.1 普通
    • 5.2 MergeInto
  • 六. 删除数据
  • 七. Insert Overwrite
  • 参考:

一. SparkSQL连接Hudi

1.1 Hive配置

我们需要将Hive 的 metastore服务独立出来

-- 目前只指定一个节点,也可以只用zookeeper做个高可用
cd $HIVE_HOME/conf
vi hive-site.xml
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://hp5:9083</value>
    </property>

然后启动hive metastore 服务

nohup hive --service metastore &
netstat -an | grep 9083

image.png

1.2 SparkSQL连接Hudi

# Spark 3.3
spark-sql --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'

image.png

二. 创建表

创建表的时候有如下3个需要注意:

  1. 表类型
    Hudi的两种表类型,即写时复制(COW)和读时合并(MOR),都可以使用Spark SQL创建。在创建表时,可以使用type选项指定表的类型:type = 'cow’或type = ‘mor’。

  2. 分区表和非分区表
    用户可以在Spark SQL中创建分区表或非分区表。要创建分区表,需要使用partitioned by语句指定分区列以创建分区表。当没有使用create table命令进行分区的语句时,该表被认为是一个非分区表。

  3. Managed表和External表
    通常,Spark SQL支持两种表,即Managed表和External表。如果使用location语句或使用create external table显式地创建表来指定一个位置,则它是一个外部表,否则它被认为是一个托管表。你可以在这里关于外部vs托管表的信息。

2.1 常规的建表

语法:
创建表的时候需要指定路径,不指定路径创建到本地了,Spark启用的是集群,其它节点访问不到,会产生报错。

-- 创建数据库
create database spark_hudi;
use spark_hudi;


--  创建一个表,不指定参数
create table hudi_cow_nonpcf_tbl (
  uuid int,
  name string,
  price double
) using hudi
location '/user/hudi/hudi_cow_nonpcf_tbl';


-- 创建一个MOR的非分区表
-- preCombineField 预聚合列 当id相同的时候,保留ts更大的那一条
create table hudi_mor_tbl (
  id int,
  name string,
  price double,
  ts bigint
) using hudi
tblproperties (
  type = 'mor',
  primaryKey = 'id',
  preCombineField = 'ts'
)
location '/user/hudi/hudi_mor_tbl';

-- 创建一个预聚合分区的COW表
create table hudi_cow_pt_tbl (
  id bigint,
  name string,
  ts bigint,
  dt string,
  hh string
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 )
partitioned by (dt, hh)
location '/user/hudi/hudi_cow_pt_tbl';

测试记录:
image.png

image.png

2.2 CTAS

代码:

-- CTAS: create a non-partitioned cow table without preCombineField
create table hudi_ctas_cow_nonpcf_tbl
using hudi
tblproperties (primaryKey = 'id')
location '/user/hudi/hudi_ctas_cow_nonpcf_tbl'
as
select 1 as id, 'a1' as name, 10 as price;


create table hudi_ctas_cow_pt_tbl
using hudi
tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts')
partitioned by (dt)
location '/user/hudi/hudi_ctas_cow_pt_tbl'
as
select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-12-01' as dt;

测试记录:
虽然建表过程看到有报错,但是依旧是成功的
image.png

image.png

三. 插入数据

-- insert into non-partitioned table
insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20;
insert into hudi_mor_tbl select 1, 'a1', 20, 1000;

-- insert dynamic partition
insert into hudi_cow_pt_tbl partition (dt, hh)
select 1 as id, 'a1' as name, 1000 as ts, '2021-12-09' as dt, '10' as hh;

-- insert static partition
insert into hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='11') select 2, 'a2', 1000;

-- upsert mode for preCombineField-provided table
insert into hudi_mor_tbl select 1, 'a1_1', 20, 1001;
select id, name, price, ts from hudi_mor_tbl;
1   a1_1    20.0    1001

-- bulk_insert mode for preCombineField-provided table
set hoodie.sql.bulk.insert.enable=true;
set hoodie.sql.insert.mode=non-strict;

insert into hudi_mor_tbl select 1, 'a1_2', 20, 1002;
select id, name, price, ts from hudi_mor_tbl;
1   a1_1    20.0    1001
1   a1_2    20.0    1002

测试记录:
虽然比insert hive_table快一些,但是感觉速度依旧不行
image.png

四. 查询数据

代码:

# 普通查询
 select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0;

# 基于时间线查询
create table hudi_cow_pt_tbl (
  id bigint,
  name string,
  ts bigint,
  dt string,
  hh string
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 )
partitioned by (dt, hh)
location '/user/hudi/hudi_cow_pt_tbl';

insert into hudi_cow_pt_tbl select 1, 'a0', 1000, '2021-12-09', '10';
select * from hudi_cow_pt_tbl;

-- record id=1 changes `name`
insert into hudi_cow_pt_tbl select 1, 'a1', 1001, '2021-12-09', '10';
select * from hudi_cow_pt_tbl;

-- time travel based on first commit time, assume `20221118154519532`
select * from hudi_cow_pt_tbl timestamp as of '20221118154519532' where id = 1;
-- time travel based on different timestamp formats
select * from hudi_cow_pt_tbl timestamp as of '2022-11-18 15:45:19.532' where id = 1;
select * from hudi_cow_pt_tbl timestamp as of '2022-03-08' where id = 1;

image.png

五. 更新数据

5.1 普通

语法:

UPDATE tableIdentifier SET column = EXPRESSION(,column = EXPRESSION) [ WHERE boolExpression]

代码:

update hudi_mor_tbl set price = price * 2, ts = 1111 where id = 1;

update hudi_cow_pt_tbl set name = 'a1_1', ts = 1001 where id = 1;

-- update using non-PK field
update hudi_cow_pt_tbl set ts = 1001 where name = 'a1';

image.png

5.2 MergeInto

语法:

MERGE INTO tableIdentifier AS target_alias
USING (sub_query | tableIdentifier) AS source_alias
ON <merge_condition>
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN NOT MATCHED [ AND <condition> ]  THEN <not_matched_action> ]

<merge_condition> =A equal bool condition 
<matched_action>  =
  DELETE  |
  UPDATE SET *  |
  UPDATE SET column1 = expression1 [, column2 = expression2 ...]
<not_matched_action>  =
  INSERT *  |
  INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])

代码:

-- source table using hudi for testing merging into non-partitioned table
create table hudi_merge_source (id int, name string, price double, ts bigint) using hudi
tblproperties (primaryKey = 'id', preCombineField = 'ts')
location '/user/hudi/hudi_merge_source';
insert into hudi_merge_source values (1, "old_a1", 22.22, 900), (2, "old_a2", 33.33, 2000), (3, "old_a3", 44.44, 2000);


create table hudi_merge_source2 (id int, name string, price double, ts bigint) using hudi
tblproperties (primaryKey = 'id', preCombineField = 'ts')
location '/user/hudi/hudi_merge_source2';
insert into hudi_merge_source2 values (2, "new_a2", 22.22, 900), (3, "new_a3", 33.33, 2000), (4, "new_a4", 44.44, 2000);


merge into hudi_merge_source as target
using (
  select * from hudi_merge_source2
) source
on target.id = source.id
when matched then
 update set name = source.name, price = source.price, ts = source.ts
when not matched then
 insert (id, name, price, ts) values(source.id, source.name, source.price, source.ts);

测试记录:

spark-sql> 
         > create table hudi_merge_source (id int, name string, price double, ts bigint) using hudi
         > tblproperties (primaryKey = 'id', preCombineField = 'ts')
         > location '/user/hudi/hudi_merge_source';
22/11/25 11:33:55 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
22/11/25 11:33:55 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
22/11/25 11:33:58 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
Time taken: 3.65 seconds
spark-sql> insert into hudi_merge_source values (1, "old_a1", 22.22, 900), (2, "old_a2", 33.33, 2000), (3, "old_a3", 44.44, 2000);
00:27  WARN: Timeline-server-based markers are not supported for HDFS: base path hdfs://hp5:8020/user/hudi/hudi_merge_source.  Falling back to direct markers.
00:32  WARN: Timeline-server-based markers are not supported for HDFS: base path hdfs://hp5:8020/user/hudi/hudi_merge_source.  Falling back to direct markers.
Time taken: 25.452 seconds
spark-sql> create table hudi_merge_source2 (id int, name string, price double, ts bigint) using hudi
         > tblproperties (primaryKey = 'id', preCombineField = 'ts')
         > location '/user/hudi/hudi_merge_source2';
Time taken: 0.541 seconds
spark-sql> insert into hudi_merge_source2 values (2, "new_a2", 22.22, 900), (3, "new_a3", 33.33, 2000), (4, "new_a4", 44.44, 2000);
00:58  WARN: Timeline-server-based markers are not supported for HDFS: base path hdfs://hp5:8020/user/hudi/hudi_merge_source2.  Falling back to direct markers.
01:02  WARN: Timeline-server-based markers are not supported for HDFS: base path hdfs://hp5:8020/user/hudi/hudi_merge_source2.  Falling back to direct markers.
Time taken: 11.574 seconds
spark-sql> merge into hudi_merge_source as target
         > using (
         >   select * from hudi_merge_source2
         > ) source
         > on target.id = source.id
         > when matched then
         >  update set name = source.name, price = source.price, ts = source.ts
         > when not matched then
         >  insert (id, name, price, ts) values(source.id, source.name, source.price, source.ts);
01:18  WARN: Timeline-server-based markers are not supported for HDFS: base path hdfs://hp5:8020/user/hudi/hudi_merge_source.  Falling back to direct markers.
01:21  WARN: Timeline-server-based markers are not supported for HDFS: base path hdfs://hp5:8020/user/hudi/hudi_merge_source.  Falling back to direct markers.
Time taken: 14.218 seconds
spark-sql> 
spark-sql> 
         > select * from hudi_merge_source2 ;
20221125113448990       20221125113448990_0_0   id:3            e3dec8f3-1c73-42dd-b1fa-b8d0c01748f5-0_0-64-2460_20221125113448990.parquet      3new_a3   33.33   2000
20221125113448990       20221125113448990_0_1   id:2            e3dec8f3-1c73-42dd-b1fa-b8d0c01748f5-0_0-64-2460_20221125113448990.parquet      2new_a2   22.22   900
20221125113448990       20221125113448990_0_2   id:4            e3dec8f3-1c73-42dd-b1fa-b8d0c01748f5-0_0-64-2460_20221125113448990.parquet      4new_a4   44.44   2000
Time taken: 0.781 seconds, Fetched 3 row(s)
spark-sql> select * from hudi_merge_source;
20221125113508944       20221125113508944_0_0   id:3            8ac8139e-0e9c-41f3-8046-24bf1b99aa9d-0_0-111-3707_20221125113508944.parquet     3new_a3   33.33   2000
20221125113412110       20221125113412110_0_1   id:1            8ac8139e-0e9c-41f3-8046-24bf1b99aa9d-0_0-111-3707_20221125113508944.parquet     1old_a1   22.22   900
20221125113412110       20221125113412110_0_2   id:2            8ac8139e-0e9c-41f3-8046-24bf1b99aa9d-0_0-111-3707_20221125113508944.parquet     2old_a2   33.33   2000
20221125113508944       20221125113508944_0_3   id:4            8ac8139e-0e9c-41f3-8046-24bf1b99aa9d-0_0-111-3707_20221125113508944.parquet     4new_a4   44.44   2000
Time taken: 1.231 seconds, Fetched 4 row(s)
spark-sql> 

六. 删除数据

Apache Hudi支持两种类型的删除:
(1)软删除:保留记录键,只清除所有其他字段的值(软删除中为空的记录始终保存在存储中,而不会删除);
(2)硬删除:从表中物理删除记录的任何痕迹。详细信息请参见写入数据页面的删除部分。

Spark SQL目前只支持硬删除

语法:

DELETE FROM tableIdentifier [ WHERE BOOL_EXPRESSION]

代码:

delete from hudi_merge_source where id = 1;

七. Insert Overwrite

代码:

-- insert overwrite non-partitioned table
insert overwrite hudi_mor_tbl select 99, 'a99', 20.0, 900;
insert overwrite hudi_cow_nonpcf_tbl select 99, 'a99', 20.0;

-- insert overwrite partitioned table with dynamic partition
insert overwrite table hudi_cow_pt_tbl select 10, 'a10', 1100, '2021-12-09', '10';

-- insert overwrite partitioned table with static partition
insert overwrite hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='12') select 13, 'a13', 1100;

参考:

  1. https://hudi.apache.org/docs/quick-start-guide/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/160670.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Ubuntu18.04系统开启防火墙】

【Ubuntu18.04系统开启防火墙】1 查看防火墙状态2 开启防火墙3 关闭防火墙4 允许开启防火墙时&#xff0c;ssh连接和22端口许可4.1 允许tcp 22端口通过防火墙4.2 允许SSH服务4.3 防火墙规则重启4.4 验证端口号是否开启1 查看防火墙状态 sudo ufw status2 开启防火墙 sudo ufw…

如何重装windows10系统(超详细图文版)

目录1.&#xff08;制作装机盘&#xff09;准备好装机U盘2. (下载驱动软件&#xff09;(※这步很重要&#xff09;3.&#xff08;下载镜像&#xff09;准备好要安装的新操作系统镜像4.&#xff08;查询bios快捷键&#xff09;查询你的主板品牌&#xff0c;找到你主板品牌进入bi…

医用球囊和导管制造中的精确压力控制

摘要&#xff1a;在医用导管和球囊成型过程中对压力控制有非常严格要求&#xff0c;如高精度和宽量程的控制能力&#xff0c;需具备可编程、自动手动切换和外接压力传感器功能&#xff0c;还需具备可用于球囊泄漏、爆破和疲劳性能测试的多功能性。本文介绍了可满足这些要求的压…

连Pycharm都不知道怎么用,学什么Python?(doge))

python初始设置日常使用一、设置Python 解释器1.1 远程配置2、调整字体及其大小2.1 调整编辑器字体及其大小2.2 调整控制台的字体及其大小3、设置编码4、修改文件背景颜色5、设置Git 和Github5.1 配置git5.2 配置github5.3 下载仓库内容6 、新建.py文件时默认添加信息7、恢复代…

[ 环境搭建篇 ] 安装 java 环境并配置环境变量(附 JDK1.8 安装包)

&#x1f36c; 博主介绍 &#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 _PowerShell &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【数据通信】 【通讯安全】 【web安全】【面试分析】 &#x1f389;点赞➕评论➕收藏 养成习…

Demo演示:ARM+FPGA主流嵌入式架构板卡-HDMI显示摄像画面

各位工程师小伙伴们&#xff0c;大家好&#xff0c;ARMFPGA 作为一种主流的嵌入式系统的处理架构。相对于单纯的的ARM开发或单纯的FPGA开发&#xff0c;ARM加FPGA能够带来功耗、性能、成本等组合优势。 米尔新推出的MYD-JX8MMXA7开发板基于ARMFPGA架构&#xff0c;集成i.MX 8M…

多线程理解之3

1.怎么解决多线程并发访问临界资源的产生的问题&#xff1f; 用锁 pthread_mutex_t mtx解决。 2.mtx锁的作用具体是什么&#xff1f; 先把临界资源锁起来&#xff0c;再把它打开&#xff0c;这样一来&#xff0c;多个执行流想要同时执行临界资源就不可以了&#xff0c;只能一个…

[C++]string的使用及模拟实现

&#x1f941;作者&#xff1a; 华丞臧 &#x1f4d5;​​​​专栏&#xff1a;【C】 各位读者老爷如果觉得博主写的不错&#xff0c;请诸位多多支持(点赞收藏关注)。如果有错误的地方&#xff0c;欢迎在评论区指出。 推荐一款刷题网站 &#x1f449;LeetCode 文章目录一、stri…

Android Compose——Paging3

Paging3效果视频简述HiltRetrofit访问接口网络实例PagingSourceViewModelView效果视频 简述 本Demo采用HiltRetrofitPaging3完成&#xff0c;主要为了演示paging3分页功能的使用&#xff0c;下列为Demo所需要的相关依赖 //retrofitimplementation com.squareup.retrofit2:retr…

多线程之内功精修

文章目录一、常见的锁策略&#xff08;一&#xff09;悲观锁和乐观锁&#xff08;二&#xff09;读写锁和互斥锁&#xff08;三&#xff09;重量级锁和轻量级锁&#xff08;四&#xff09;挂起等待锁和自旋锁&#xff08;五&#xff09;公平锁和非公平锁&#xff08;六&#xf…

nerdctl容器管理工具

nerdctl容器管理工具nerdctl简介nerdctl的两个版本安装nerdctl1.配置nerdctl自动补全2.将nerdctl设别名为dockernerdctl使用方法1、运行/计入容器2、容器管理3、镜像管理4、镜像构建nerdctl简介 k8s1.22版本及以上强制安装containerd,要求卸载Docker. 虽然Docker能干的事Conta…

嗨格式数据恢复的 10 种最佳替代方法

当您意识到自己删除了错误的文件时&#xff0c;您是否有过那种恐惧和无助的感觉&#xff1f;或者&#xff0c;也许您的计算机在一项重要任务到期的前一天死机了——您所有的辛勤工作突然消失了。 嗨格式数据恢复是一款流行的数据恢复软件应用程序&#xff0c;它为找回这些文件…

《人月神话》浅读一下吧(上)

1.焦油坑 1.什么是焦油坑 焦油坑是作者用来形容大型系统开发的一个概念。史前时代&#xff0c;恐龙、猛犸象、剑齿虎这些大型食肉动物碰到焦油坑也是没有办法挣脱的&#xff0c;而且越用力就越容易被沉入坑底。 而在项目中好像没有任何一个单独的问题会导致困难&#xff0c;每个…

Spring基础(一)

Spring基础&#xff08;一&#xff09;Spring是什么下载地址IOCAop导入对象创建Spring是什么 Spring是开源的J2EE应用程序框架&#xff0c;针对bean的生命周期进行管理的轻量级容器&#xff0c;其中轻量级是指jar包数量较少。 下载地址 https://repo.spring.io/ui/native/re…

堆与优先级队列

目录 一、堆 1、简介 2、堆的模拟实现 a、向下调整堆 b、向上调整堆 c、插入元素 d、删除堆的根结点 e、获得堆顶元素 二、优先级队列 1、简介 2、常用方法 3、Top-k问题 一、堆 1、简介 堆也是一种数据结构&#xff0c;将一组数据集合按照完全二叉树的方式存储…

C++ 算法进阶系列之从 Brute Force 到 KMP 字符串匹配算法的优化之路

1. 字符串匹配算法 所谓字符串匹配算法&#xff0c;简单地说就是在一个目标字符串中查找是否存在另一个模式字符串。如在字符串 ABCDEFG 中查找是否存在 EF 字符串。 可以把字符串 ABCDEFG 称为原始&#xff08;目标&#xff09;字符串&#xff0c;EF 称为子字符串或模式字符…

Docker搭建SonarQube服务 - Linux

Docker搭建SonarQube服务 - Linux 本文介绍如何在Linux服务器上使用docker简便并快速的搭建SonarQube服务。 参考文档&#xff1a; Prerequisites and Overview&#xff5c;SonarQube Docs Installing SonarQube from the Docker Image | SonarQube Docs 本文使用的镜像版本…

假期来临,Steam内容文件锁定怎么办?

忙忙碌碌又一年&#xff0c;春节假期终于进入倒计时了&#xff01;已经能想象到Steam将迎来一波玩家的狂欢。 不过小编想起不少Windows用户反映过的一个问题&#xff1a;Steam更新游戏时不断收到报错&#xff0c;提示内容文件锁定&#xff0c;怎么办&#xff1f; 为了不妨碍大…

研发与环境的那些事儿

文章目录影响开发效率的环境问题研发需要的环境环境的演变测试单体环境到多环境的演变单体环境上线流程多环境上线流程提供高效研发环境环境是开发工作的核心步骤之一&#xff0c;对研发的开发测试是有影响的。研发与环境之间的关系是非常重要的&#xff0c;研发环境的质量直接…

完美解决了报错:app.js:249 Uncaught TypeError: Cannot redefine property: $router

场景&#xff1a; 项目打包优化阶段&#xff0c;为了解决打包成功后&#xff0c;单文件体积过大的问题 &#xff0c;可以通过 webpack 的 externals 节点&#xff0c;来配置并加载外部的 CDN 资源 原因&#xff1a;报错的原因就是重新定义了$router&#xff0c;因为在项目中安装…