插入数据
查询数据
更新数据
删除数据
覆盖数据
修改表结构
修改分区
插入数据
默认情况下,如果提供了preCombineKey,则insert into的写操作类型为upsert,否则使用insert。
向非分区表插入数据
insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20;
向分区表动态分区插入数据
insert into hudi_cow_pt_tbl partition (dt, hh)
select 1 as id, 'a1' as name, 1000 as ts, '2021-12-09' as dt, '10' as hh;
向分区表静态分区插入数据
insert into hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='11') select 2, 'a2', 1000;
使用bulk_insert插入数据
非严格模式的开启:
set hoodie.sql.insert.mode=non-strict;
bulk_insert的开启
set hoodie.sql.bulk.insert.enable=true;
插入数据:
insert into hudi_mor_tbl select 1, 'a1_2', 20, 1002;
select id, name, price, ts from hudi_mor_tbl;
1 a1_2 20.0 1002
查询数据
普通查询
select fare, begin_lon, begin_lat, ts from hudi_tb where fare > 20.0;
时间旅行查询
Hudi从0.9.0开始就支持时间旅行查询。Spark SQL方式要求Spark版本 3.2及以上。
-- 关闭前面开启的bulk_insert
set hoodie.sql.bulk.insert.enable=false;
create table hudi_cow_pt_tbl1 (
id bigint,
name string,
ts bigint,
dt string,
hh string
) using hudi
tblproperties (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts'
)
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl1';
-- 插入一条id为1的数据
insert into hudi_cow_pt_tbl1 select 1, 'a0', 1000, '2021-12-09', '10';
select * from hudi_cow_pt_tbl1;
-- 修改id为1的数据
insert into hudi_cow_pt_tbl1 select 1, 'a1', 1001, '2021-12-09', '10';
select * from hudi_cow_pt_tbl1;
-- 基于第一次提交时间进行时间旅行
select * from hudi_cow_pt_tbl1 timestamp as of '20220307091628793' where id = 1;
-- 其他时间格式的时间旅行写法
select * from hudi_cow_pt_tbl1 timestamp as of '2022-03-07 09:16:28.100' where id = 1;
select * from hudi_cow_pt_tbl1 timestamp as of '2022-03-08' where id = 1;
更新数据
update语法 :
更新操作需要指定preCombineField
update 表名 set 字段 = 值 where 过滤条件
执行更新:
update hudi_mor_tbl set price = price * 2, ts = 1111 where id = 1;
MergeInto语法
启动hiveserver2
hiveserver2 &
与join语法类似需要指定一个主表和一个源数据表然后通过匹配来执行修改表数据的语法
-- 准备source表:非分区的hudi表,插入数据
create table merge_source (id int, name string, price double, ts bigint) using hudi
tblproperties (primaryKey = 'id', preCombineField = 'ts');
insert into merge_source values (1, "old_a1", 22.22, 2900), (2, "new_a2", 33.33, 2000), (3, "new_a3", 44.44, 2000);
merge into hudi_mor_tbl as target
using merge_source as source
on target.id = source.id
when matched then update set *
when not matched then insert * ;
-- 准备source表:分区的parquet表,插入数据
create table merge_source2 (id int, name string, flag string, dt string,hh string) using parquet;
insert into merge_source2 values (1, "new_a1", 'update', '2021-12-09', '10'), (2, "new_a2", 'delete', '2021-12-09', '11'), (3, "new_a3", 'insert', '2021-12-09', '12');
merge into hudi_cow_pt_tbl1 as target
using (
select id, name, '2000' as ts, flag, dt, hh from merge_source2
) source
on target.id = source.id
when matched and flag != 'delete' then
update set id = source.id, name = source.name, ts = source.ts, dt = source.dt, hh = source.hh
when matched and flag = 'delete' then delete
when not matched then
insert (id, name, ts, dt, hh) values(source.id, source.name, source.ts, source.dt, source.hh);
删除数据
语法
DELETE FROM 表名 where 过滤条件;
delete from hudi_cow_nonpcf_tbl where uuid = 1;
覆盖数据
- 使用INSERT_OVERWRITE类型的写操作覆盖分区表
- 使用INSERT_OVERWRITE_TABLE类型的写操作插入覆盖非分区表或分区表(动态分区)
insert overwrite 非分区表
insert overwrite hudi_mor_tbl select 99, 'a99', 20.0, 900;
通过动态分区insert overwrite table到分区表
会根据最后的一个字段作为分区字段去匹配
insert overwrite table hudi_cow_pt_tbl1 select 10, 'a10', 1100, '2021-12-09';
通过静态分区insert overwrite 分区表
insert overwrite hudi_cow_pt_tbl1 partition(dt = '2021-12-09') select 13, 'a13', 1100;
修改表结构
修改表名
ALTER TABLE oldTableName RENAME TO newTableName
添加字段
ALTER TABLE 表名 ADD COLUMNS(字段名 类型)
改变列
ALTER TABLE 表名 CHANGE COLUMN 旧列名 新列名 类型
修改参数属性
ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value')
修改分区
show partition结果是基于文件系统表路径的。删除整个分区数据或直接删除某个分区目录并不精确。
查看表分区
show partitions 表名;
删除分区
alter table 表名 drop partition (dt='2021-12-09');