Hudi集成Spark(二)Spark SQL方式

news2025/1/12 12:02:34

文章目录

  • 环境准备
  • 创建表
  • 插入数据
  • 查询数据
  • 更新数据
  • 删除数据
  • 覆盖数据
  • 修改表结构(Alter Table)
  • 修改分区
  • 存储过程(Procedures)

Catalog:可以和Spark或者Flink中做一个共享,共享之后,计算引擎才可以去读取计算Hive引擎

环境准备

将如下配置内容放入hive-site.xml配置文件中

<!-- 指定存储元数据要连接的地址  -->
  <property>
   	<name>hive.metastore.uris</name>
	<value>thrift://hadoop102:9083</value>
  </property>
<!--Hive开启元数据-->
    <property>
        <name>datanucleus.schema.autoCreateAll</name>
        <value>true</value>
     </property>

并将hudi的jar包hudi-hive-sync-bundle-0.12.0.jar 放到hive的lib中,方便hive查询

cp /opt/software/hudi/hudi-0.12.0/packaging/hudi-hive-sync-bundle/targethudi-hive-sync-bundle-0.12.0.jar /opt/module/hive/lib

启动 Hive 的 Metastore

[root@hadoop102 spark-3.2.2]# nohup hive --service metastore &
[1] 10796

查看进程,有如下情况,则成功!

[root@hadoop102 spark-3.2.2]# netstat -anp|grep 9083
tcp6       0      0 :::9083                 :::*                    LISTEN      10796/java 

启动 spark-sql

#针对 Spark 3.2
spark-sql \
 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
 --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
 --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

如果没有配置 hive 环境变量,手动拷贝 hive-site.xml 到 spark 的 conf 下

在这里插入图片描述

创建表

1)建表参数

参数名默认值说明
primaryKeyuuid表的主键名,多个字段用逗号分隔。同 hoodie.datasource.write.recordkey.field
preCombineField表的预合并字段。同 hoodie.datasource.write.precombine.field
typecow创建的表类型: type = ‘cow’ type = 'mor’同 hoodie.datasource.write.table.type

2)创建非分区表
(1)创建一个 cow 表,默认 primaryKey ‘uuid’,不提供 preCombineField

create database spark_hudi;

use spark_hudi;

create table hudi_cow_nonpcf_tbl (
 uuid int,
 name string,
 price double
) using hudi;

(2)创建一个 mor 非分区表

create table hudi_mor_tbl (
 id int,
 name string,
 price double,
 ts bigint
) using hudi
tblproperties (
 type = 'mor',
 primaryKey = 'id',
 preCombineField = 'ts'
);

3)创建分区表
创建一个 cow 分区外部表,指定 primaryKey 和 preCombineField

此刻数据在hdfs上

create table hudi_cow_pt_tbl (
 id bigint,
 name string,
 ts bigint,
 dt string,
 hh string
) using hudi
tblproperties (
 type = 'cow',
 primaryKey = 'id',
 preCombineField = 'ts'
)
partitioned by (dt, hh)
location '/opt/hudi/hudi_cow_pt_tbl';

4)在已有的 hudi 表上创建新表,不需要指定模式和非分区列(如果存在)之外的任何属性,Hudi 可以自动识别模式和配置。
(1)非分区表

create table hudi_existing_tbl0 using hudi
location 'file:///opt/datas/hudi/dataframe_hudi_nonpt_table';

(2)分区表

create table hudi_existing_tbl1 using hudi
partitioned by (dt, hh)
location 'file:///opt/datas/dataframe_hudi_pt_table';

5)通过 CTAS (Create Table As Select)建表为了提高向 hudi 表加载数据的性能,CTAS 使用批量插入作为写操作。
(1)通过 CTAS 创建 cow 非分区表,不指定 preCombineField

create table hudi_ctas_cow_nonpcf_tbl
using hudi
tblproperties (primaryKey = 'id')
as
select 1 as id, 'a1' as name, 10 as price;

(2)通过 CTAS 创建 cow 分区表,指定 preCombineField

create table hudi_ctas_cow_pt_tbl
using hudi
tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 
'ts')
partitioned by (dt)
as
select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-12-
01' as dt;

(3)通过 CTAS 从其他表加载数据

# 创建内部表
create table parquet_mngd using parquet location 
'file:///opt/datas/parquet_dataset/*.parquet';
# 通过 CTAS 加载数据
create table hudi_ctas_cow_pt_tbl2 using hudi location 
'file://opt/datas/hudi/hudi_tbl/' options (
 type = 'cow',
 primaryKey = 'id',
 preCombineField = 'ts'
)
partitioned by (datestr) as select * from parquet_mngd;

插入数据

默认情况下,如果提供了 preCombineKey,则 insert into 的写操作类型为 upsert,否则使用 insert

1)向非分区表插入数据

insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20;
insert into hudi_mor_tbl select 1, 'a1', 20, 1000;

2)向分区表动态分区插入数据

insert into hudi_cow_pt_tbl partition (dt, hh)
select 1 as id, 'a1' as name, 1000 as ts, '2021-12-09' as dt, '10' as hh;

3)向分区表静态分区插入数据

insert into hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='11') 
select 2, 'a2', 1000;

4)使用 bulk_insert 插入数据
hudi 支持使用 bulk_insert 作为写操作的类型,只需要设置两个配置:hoodie.sql.bulk.insert.enable 和 hoodie.sql.insert.mode。

-- 向指定 preCombineKey 的表插入数据,则写操作为 upsert
insert into hudi_mor_tbl select 1, 'a1_1', 20, 1001;
select id, name, price, ts from hudi_mor_tbl;
1 a1_1 20.0 1001
-- 向指定 preCombineKey 的表插入数据,指定写操作为 bulk_insert 
set hoodie.sql.bulk.insert.enable=true;
set hoodie.sql.insert.mode=non-strict;

insert into hudi_mor_tbl select 1, 'a1_2', 20, 1002;
select id, name, price, ts from hudi_mor_tbl;
1 a1_1 20.0 1001
1 a1_2 20.0 1002

查询数据

1)查询

select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0;

2)时间旅行查询
Hudi 从 0.9.0 开始就支持时间旅行查询。Spark SQL 方式要求 Spark 版本 3.2 及以上。

-- 关闭前面开启的 bulk_insert
set hoodie.sql.bulk.insert.enable=false;
-- 数据写入到hdfs上
create table hudi_cow_pt_tbl1 (
 id bigint,
 name string,
 ts bigint,
 dt string,
 hh string
) using hudi
tblproperties (
 type = 'cow',
 primaryKey = 'id',
 preCombineField = 'ts'
)
partitioned by (dt, hh)
location '/opt/datas/hudi/hudi_cow_pt_tbl1';
-- 插入一条 id 为 1 的数据
insert into hudi_cow_pt_tbl1 select 1, 'a0', 1000, '2021-12-09', '10';
select * from hudi_cow_pt_tbl1;
-- 修改 id 为 1 的数据
insert into hudi_cow_pt_tbl1 select 1, 'a1', 1001, '2021-12-09', '10';
select * from hudi_cow_pt_tbl1;
-- 基于第一次提交时间进行时间旅行
select * from hudi_cow_pt_tbl1 timestamp as of '20220307091628793' where id = 1;
-- 其他时间格式的时间旅行写法
select * from hudi_cow_pt_tbl1 timestamp as of '2022-03-07 09:16:28.100' where id = 1;
select * from hudi_cow_pt_tbl1 timestamp as of '2022-03-08' where id = 1;

更新数据

1)update
更新操作需要指定 preCombineField。
(1)语法

UPDATE tableIdentifier SET column = EXPRESSION(,column = EXPRESSION) [ WHERE boolExpression]

(2)执行更新

update hudi_mor_tbl set price = price * 2, ts = 1111 where id = 1;
update hudi_cow_pt_tbl1 set name = 'a1_1', ts = 1001 where id = 1;
-- update using non-PK field
update hudi_cow_pt_tbl1 set ts = 1111 where name = 'a1_1';

2)MergeInto
(1)语法

MERGE INTO tableIdentifier AS target_alias
USING (sub_query | tableIdentifier) AS source_alias
ON <merge_condition>
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action> ]
<merge_condition> =A equal bool condition 
<matched_action> =
 DELETE |
 UPDATE SET * |
 UPDATE SET column1 = expression1 [, column2 = expression2 ...]
<not_matched_action> =
 INSERT * |
 INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])

(2)执行案例

执行前开启hive的hiveservice2

[root@hadoop102 bin]# ./hiveserver2 start
-- 1、准备 source 表:非分区的 hudi 表,插入数据
create table merge_source (id int, name string, price double, ts 
bigint) using hudi tblproperties (primaryKey = 'id', preCombineField = 'ts');

insert into merge_source values (1, "old_a1", 22.22, 2900), (2, 
"new_a2", 33.33, 2000), (3, "new_a3", 44.44, 2000);

merge into hudi_mor_tbl as target using merge_source as source on target.id = source.id when matched then update set * when not matched then insert *;
-- 2、准备 source 表:分区的 parquet 表,插入数据
create table merge_source2 (id int, name string, flag string, dt 
string, hh string) using parquet;

insert into merge_source2 values (1, "new_a1", 'update', '2021-12-
09', '10'), (2, "new_a2", 'delete', '2021-12-09', '11'), (3, 
"new_a3", 'insert', '2021-12-09', '12');

merge into hudi_cow_pt_tbl1 as target
using (
 select id, name, '2000' as ts, flag, dt, hh from merge_source2
) source
on target.id = source.id
when matched and flag != 'delete' then
update set id = source.id, name = source.name, ts = source.ts, dt 
= source.dt, hh = source.hh
when matched and flag = 'delete' then delete
when not matched then
insert (id, name, ts, dt, hh) values(source.id, source.name, 
source.ts, source.dt, source.hh);

mergeInto会发生的报错:

Could not sync using the meta sync class org.apache.hudi.hive.HiveSyncTool


java.sql.SQLException: Could not open client transport with JDBC Uri: jdbc:hive2://localhost:10000: Failed to open new session: java.lang.RuntimeException: org.apache.hadoop.security.AccessControlException: Permission denied: user=hive, access=EXECUTE, inode="/tmp":root:supergroup:drwxrwx---

解决方案:https://blog.csdn.net/weixin_45417821/article/details/128651942

删除数据

1)语法

DELETE FROM tableIdentifier [ WHERE BOOL_EXPRESSION]

2)案例

delete from hudi_cow_nonpcf_tbl where uuid = 1;
delete from hudi_mor_tbl where id % 2 = 0;

-- 使用非主键字段删除
delete from hudi_cow_pt_tbl1 where name = 'a1_1';

覆盖数据

  • 使用 INSERT_OVERWRITE 类型的写操作覆盖分区表
  • 使用 INSERT_OVERWRITE_TABLE 类型的写操作插入覆盖非分区表或分区表(动态分区)

1)insert overwrite 非分区表

insert overwrite hudi_mor_tbl select 99, 'a99', 20.0, 900;
insert overwrite hudi_cow_nonpcf_tbl select 99, 'a99', 20.0;

2)通过动态分区 insert overwrite table 到分区表

insert overwrite table hudi_cow_pt_tbl1 select 10, 'a10', 1100, '2021-12-09', '11';

3)通过静态分区 insert overwrite 分区表

insert overwrite hudi_cow_pt_tbl1 partition(dt = '2021-12-09', hh='12') select 13, 'a13', 1100;

修改表结构(Alter Table)

1)语法

-- Alter table name
ALTER TABLE oldTableName RENAME TO newTableName
-- Alter table add columns
ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*)
-- Alter table column type
ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType
-- Alter table properties
ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value')

2)案例

--rename to:
ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2;
--add column:
ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string);
--change column:
ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid int;
--set properties;
alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');

修改分区

1)语法

-- Drop Partition
ALTER TABLE tableIdentifier DROP PARTITION ( partition_col_name = partition_col_val [ , ... ] )
-- Show Partitions
SHOW PARTITIONS tableIdentifier

2)案例

--show partition:
show partitions hudi_cow_pt_tbl1;
--drop partition:
alter table hudi_cow_pt_tbl1 drop partition (dt='2021-12-09', hh='10');

注意:show partition 结果是基于文件系统表路径的。删除整个分区数据或直接删除某个分区目录并不精确。

存储过程(Procedures)

1)语法

--Call procedure by positional arguments
CALL system.procedure_name(arg_1, arg_2, ... arg_n)
--Call procedure by named arguments
CALL system.procedure_name(arg_name_2 => arg_2, arg_name_1 => 
arg_1, ... arg_name_n => arg_n)

2)案例

可用的存储过程:https://hudi.apache.org/docs/procedures/

--show commit's info
call show_commits(table => 'hudi_cow_pt_tbl1', limit => 10);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/156678.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

跨域与Nginx总结

跨源资源共享&#xff08;CORS&#xff09;是一种机制&#xff0c;他使浏览器可以访问由其他域提供的Web资源。通常&#xff0c;浏览器会使用同源策略来限制从脚本中发出的HTTP请求。例如&#xff0c;如果一个网站的资源位于https://haha.com&#xff0c;那么它就不能发出对htt…

使用ResNet50实现CIFAR10数据集的训练

如果对你有用的话&#xff0c;希望能够点赞支持一下&#xff0c;这样我就能有更多的动力更新更多的学习笔记了。&#x1f604;&#x1f604; 使用ResNet进行CIFAR-10数据集进行测试&#xff0c;这里使用的是将CIFAR-10数据集的分辨率扩大到32X32&#xff0c;因为算力相关的…

C语言常用内存函数的深度解析

文章目录前言memcpymemcpy函数的使用memcpy函数的自我实现memmovememmove函数的使用memmove函数的自我实现memcmpmemcmp函数的使用memcmp函数的自我实现memsetmemset函数的使用memset函数的自我实现写在最后前言 内存函数的使用广泛度大于常用字符串函数的使用广泛度&#xff0…

教程- VTK.js的基本介绍

VTK.js的核心是标准可视化工具包(VTK)库的JavaScript移植&#xff0c;这是一个c库&#xff0c;旨在促进数据可视化&#xff0c;在此基础上构建了科学可视化应用程序Paraview。VTK.js没有使用OpenGL&#xff0c;而是利用WebGL&#xff0c;主要关注几何和体渲染。因此&#xff0c…

JavaFx程序使用Gloun打包成Android平台App教程

0. 提要 !!! 适合有Maven基础&#xff0c;对JavaFx或JavaFX移动端感兴趣的朋友 提示必须在Linux环境下进行&#xff0c;可以使用虚拟机 推荐使用CentOS系统进行,虚拟机硬盘大小推荐最少给30G 不要像我一样,搞一半又去给文件系统根目录扩大容量 如果容量不够可以看篇博客: http…

C++模板(第二版)笔记之第十八章:模板的多态性

文章目录一、动态多态&#xff08;dynamic polymorphism&#xff09;二、静态多态三、静态多态VS动态多态1.术语2.优点和缺点3.结合两种多态形式&#xff1a;CRTP四、使用concepts五、新形势的设计模式六、泛型编程七、总结一、动态多态&#xff08;dynamic polymorphism&#…

【C语言】内存函数介绍

它们所在的头文件&#xff1a; &#xff08;这里出现的arr都为char类型数组&#xff09;strlen作用&#xff1a;计算一个字符串的长度本质&#xff1a;历经千辛找一个 \0 &#xff0c;找到 \0 就立马停止。&#xff08;就是找 \0 &#xff09;易错&#xff1a;strlen 返回值为 …

物联网无线通信技术中蓝牙和WIFI有哪些区别?

在物联网快速发展的时代&#xff0c;联网运行的设备越来越多&#xff0c;无线通信技术在物联网中发挥着举足轻重的作用&#xff0c;无线通信技术的发展改变了信息传输的方式&#xff0c;人们在任何时间、任何地点都可以访问设备&#xff0c;目前最常用的两种无线通信技术分别是…

云服务器CentOS前后端部署流程记录

部署流程记录 购买云服务ecs服务器&#xff0c;建立CentOS系统 通过xftpxshell访问远程服务 doker部署&#xff08;https://www.runoob.com/docker/centos-docker-install.html&#xff09; docker docker部署环境&#xff08;mysql&#xff09; docker常用命令 1. docker i…

【Linux】进程状态与优先级

文章目录进程状态概念Linux中的进程状态R(running)状态S(sleeping)状态D(disk sleep)状态T(stopped)状态t(tracing stop)状态X(dead)状态Z(zombie)状态特殊的孤儿进程进程优先级进程性质补充进程状态概念 《现代操作系统》中给出的进程状态的定义如下&#xff1a; 进程状态反映…

Qt+C++窗体界面中英文多语言切换

程序示例精选 QtC窗体界面中英文语言切换 如需安装运行环境或远程调试&#xff0c;见文章底部个人微信名片&#xff0c;由专业技术人员远程协助&#xff01; 前言 这篇博客针对<<QtC窗体界面中英文语言切换>>编写代码&#xff0c;代码整洁&#xff0c;规则&#x…

【Linux】软件包管理器 yum

目录 一、什么是软件包 二、如何进行软件安装 1、yum 的使用 2、yum 配置 一、什么是软件包 在Linux下安装软件&#xff0c;一个通常的办法是下载到程序的源代码&#xff0c;并进行编译&#xff0c;得到可执行程序。但是这样太麻烦了&#xff0c;于是有些人把一些常用的软…

InnoDB数据存储结构

InnoDB数据存储结构 本专栏学习内容来自尚硅谷宋红康老师的视频 有兴趣的小伙伴可以点击视频地址观看 1. 数据库的存储结构&#xff1a;页 索引结构给我们提供了高效的索引方式&#xff0c;不过索引信息以及数据记录都是保存在文件上的&#xff0c;确切来说是存储在页结构中。…

不讨论颜色的前提下,如何证明自己不是色盲?神奇的零知识证明

0x01 一个小故事 《阿里巴巴与四十大盗》中有这样一段小故事&#xff1a; 阿里巴巴会芝麻开门的咒语&#xff0c;强盗向他拷问打开山洞石门的咒语&#xff0c;他不想让人听到咒语&#xff0c;又要向强盗证明他知道这个咒语。 那应该怎么办呢&#xff1f; 便对强盗说&#xf…

基于KVM安装部署RHCOS操作系统

参考&#xff1a;Openshift 4.4 静态 IP 离线安装系列&#xff1a;初始安装 - 米开朗基杨 - 博客园 一、Openshift OCP集群安装架构示意图 RHCOS 的默认用户是 core 如果安装有问题会进入 emergency shell&#xff0c;检查网络、域名解析是否正常&#xff0c;如果正常一般是以…

重修JAVA

程序员的差距是在构思上&#xff1a;思想决定了深度&#xff0c;思想的精髓高深是很多人学不来的&#xff01; 每一门语言都有它的特点&#xff0c;有优势也有劣势&#xff0c; 所以不必拘泥于招式&#xff0c;掌握底层原理即可&#xff01; 每一们语言实际上都是一个“工具”&…

如何在您的香港主机帐户上注册多个域名

注册多个域名非常普遍。事实上&#xff0c;香港主机服务提供商鼓励这样做&#xff0c;因为它既有意义又是必要的。下面将介绍决定为什么您可能需要在香港主机上注册多个域名的几个因素。注册多个域名的原因是什么?方便多个项目如果香港主机帐户的所有者在网络上有多个不同域名…

优化vue项目后, 启动编译项目过程中 报 javaScript heap out of memory 错误 及 nodejs内存溢出

项目场景&#xff1a; 提示&#xff1a;这里简述项目相关背景&#xff1a; 1、优化vue项目后&#xff0c;运行npm run serve 启动编译项目过程中 报 javaScript heap out of memory 错误 2、项目启动时&#xff0c;出现 nodejs 内存溢出错误 问题描述 提示&#xff1a;遇到…

分布式事务的背景和解决方案

在常用的关系型数据库&#xff0c;都是具备事务特性的。 那什么是事务呢&#xff1f;事务是数据库运行的一个逻辑工作单元&#xff0c;在这个工作单元内的一系列SQL命令具有原子性操作的特点&#xff0c;也就是说这一系列SQL指令要么全部执行成功&#xff0c;要么全部回滚不执…

经典算法之深度优先搜索(DFS)

&#x1f451;专栏内容&#xff1a;算法学习笔记⛪个人主页&#xff1a;子夜的星的主页&#x1f495;座右铭&#xff1a;日拱一卒&#xff0c;功不唐捐。 目录一、前言二、基本概念1.简单介绍2. 官方概念三、动图分析四、模板框架五、例题分析组合问题题干描述&#xff1a;思路…