Hudi系列6:使用pyspark操作Hudi

news2025/1/20 19:12:25

文章目录

  • 前言
  • 一. pyspark连接hudi
  • 二. 创建表
  • 三. 插入数据
  • 四. 查询数据
  • 五. Time Travel查询
  • 六. 更新数据
  • 七. 增量查询
  • 八. 基于时间点查询
  • 九. 删除数据
    • 9.1 软删除
    • 9.2 硬删除
  • 十. 插入覆盖
  • 十一. Spark其它命令
    • 11.1 Alter Table
    • 11.2 Partition SQL Command
  • 参考:

前言

软件版本
Python3.8
Hadoop3.3.2
Spark3.3.1
Hudi0.12.0

Hudi官网demo提供了3种通过Spark操作Hudi的方法:
image.png

这里我们选择通过pyspark来操作

一. pyspark连接hudi

pyspark连接hudi:

# Spark 3.3
export PYSPARK_PYTHON=$(which python3)
pyspark \
--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

设置表名、基本路径和数据生成器:

# pyspark
tableName = "hudi_trips_cow"
basePath = "hdfs://hp5:8020/tmp/hudi_trips_cow"
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()

image.png

二. 创建表

spark中不需要单独的create table命令。如果表不存在,第一批写入操作将创建该表。

三. 插入数据

生成一些新的trip,将它们加载到DataFrame中,并将DataFrame写入Hudi表中.

# pyspark
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.datasource.write.table.name': tableName,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2
}

df.write.format("hudi"). \
    options(**hudi_options). \
    mode("overwrite"). \
    save(basePath)

image.png

四. 查询数据

# pyspark
tripsSnapshotDF = spark. \
  read. \
  format("hudi"). \
  load(basePath)
# load(basePath) use "/partitionKey=partitionValue" folder structure for Spark auto partition discovery

tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()

image.png

五. Time Travel查询

Hudi从0.9.0开始支持Time Travel查询。目前支持三种查询时间格式,如下所示。

#pyspark
spark.read. \
  format("hudi"). \
  option("as.of.instant", "20210728141108"). \
  load(basePath)



# It is equal to "as.of.instant = 2021-07-28 00:00:00"
spark.read. \
  format("hudi"). \
  option("as.of.instant", "2021-07-28"). \
  load(basePath)

image.png

六. 更新数据

这类似于插入新数据。使用数据生成器生成现有行程的更新,加载到DataFrame中,并将DataFrame写入hudi表。

# pyspark
updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi"). \
  options(**hudi_options). \
  mode("append"). \
  save(basePath)

image.png

七. 增量查询

Hudi还提供了获取自给定提交时间戳以来更改的记录流的功能。这可以通过使用Hudi的增量查询来实现,并提供需要流化更改的开始时间。如果我们希望在给定的提交之后进行所有更改(通常是这样),则不需要指定endTime。

# pyspark
# reload data
spark. \
  read. \
  format("hudi"). \
  load(basePath). \
  createOrReplaceTempView("hudi_trips_snapshot")

commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").limit(50).collect()))
beginTime = commits[len(commits) - 2] # commit time we are interested in

# incrementally query data
incremental_read_options = {
  'hoodie.datasource.query.type': 'incremental',
  'hoodie.datasource.read.begin.instanttime': beginTime,
}

tripsIncrementalDF = spark.read.format("hudi"). \
  options(**incremental_read_options). \
  load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()

image.png

八. 基于时间点查询

# pyspark
beginTime = "000" # Represents all commits > this time.
endTime = commits[len(commits) - 2]

# query point in time data
point_in_time_read_options = {
  'hoodie.datasource.query.type': 'incremental',
  'hoodie.datasource.read.end.instanttime': endTime,
  'hoodie.datasource.read.begin.instanttime': beginTime
}

tripsPointInTimeDF = spark.read.format("hudi"). \
  options(**point_in_time_read_options). \
  load(basePath)

tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()

image.png

九. 删除数据

Apache Hudi支持两种类型的删除:
(1)软删除:保留记录键,只清除所有其他字段的值(软删除中为空的记录始终保存在存储中,而不会删除);
(2)硬删除:从表中物理删除记录的任何痕迹。详细信息请参见写入数据页面的删除部分。

9.1 软删除

# pyspark
from pyspark.sql.functions import lit
from functools import reduce

spark.read.format("hudi"). \
  load(basePath). \
  createOrReplaceTempView("hudi_trips_snapshot")
# fetch total records count
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
# fetch two records for soft deletes
soft_delete_ds = spark.sql("select * from hudi_trips_snapshot").limit(2)

# prepare the soft deletes by ensuring the appropriate fields are nullified
meta_columns = ["_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", \
  "_hoodie_partition_path", "_hoodie_file_name"]
excluded_columns = meta_columns + ["ts", "uuid", "partitionpath"]
nullify_columns = list(filter(lambda field: field[0] not in excluded_columns, \
  list(map(lambda field: (field.name, field.dataType), soft_delete_ds.schema.fields))))

hudi_soft_delete_options = {
  'hoodie.table.name': tableName,
  'hoodie.datasource.write.recordkey.field': 'uuid',
  'hoodie.datasource.write.partitionpath.field': 'partitionpath',
  'hoodie.datasource.write.table.name': tableName,
  'hoodie.datasource.write.operation': 'upsert',
  'hoodie.datasource.write.precombine.field': 'ts',
  'hoodie.upsert.shuffle.parallelism': 2, 
  'hoodie.insert.shuffle.parallelism': 2
}

soft_delete_df = reduce(lambda df,col: df.withColumn(col[0], lit(None).cast(col[1])), \
  nullify_columns, reduce(lambda df,col: df.drop(col[0]), meta_columns, soft_delete_ds))

# simply upsert the table after setting these fields to null
soft_delete_df.write.format("hudi"). \
  options(**hudi_soft_delete_options). \
  mode("append"). \
  save(basePath)

# reload data
spark.read.format("hudi"). \
  load(basePath). \
  createOrReplaceTempView("hudi_trips_snapshot")

# This should return the same total count as before
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
# This should return (total - 2) count as two records are updated with nulls
spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()

image.png

9.2 硬删除

# pyspark
# fetch total records count
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
# fetch two records to be deleted
ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)

# issue deletes
hudi_hard_delete_options = {
  'hoodie.table.name': tableName,
  'hoodie.datasource.write.recordkey.field': 'uuid',
  'hoodie.datasource.write.partitionpath.field': 'partitionpath',
  'hoodie.datasource.write.table.name': tableName,
  'hoodie.datasource.write.operation': 'delete',
  'hoodie.datasource.write.precombine.field': 'ts',
  'hoodie.upsert.shuffle.parallelism': 2, 
  'hoodie.insert.shuffle.parallelism': 2
}

from pyspark.sql.functions import lit
deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
hard_delete_df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))
hard_delete_df.write.format("hudi"). \
  options(**hudi_hard_delete_options). \
  mode("append"). \
  save(basePath)

# run the same read query as above.
roAfterDeleteViewDF = spark. \
  read. \
  format("hudi"). \
  load(basePath) 
roAfterDeleteViewDF.createOrReplaceTempView("hudi_trips_snapshot")
# fetch should return (total - 2) records
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

image.png

十. 插入覆盖

生成一些新的trip,覆盖输入中出现的所有分区。对于批处理ETL作业,此操作比upsert快,批处理ETL作业一次重新计算整个目标分区(与增量更新目标表相反)。这是因为,我们能够完全绕过索引、预合并和upsert写路径中的其他重分区步骤。

# pyspark
self.spark.read.format("hudi"). \
    load(basePath). \
    select(["uuid", "partitionpath"]). \
    sort(["partitionpath", "uuid"]). \
    show(n=100, truncate=False) 
    
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10)) 
df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)). \
    filter("partitionpath = 'americas/united_states/san_francisco'")
hudi_insert_overwrite_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.datasource.write.table.name': tableName,
    'hoodie.datasource.write.operation': 'insert_overwrite',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2
}
df.write.format("hudi").options(**hudi_insert_overwrite_options).mode("append").save(basePath)
spark.read.format("hudi"). \
    load(basePath). \
    select(["uuid", "partitionpath"]). \
    sort(["partitionpath", "uuid"]). \
    show(n=100, truncate=False)

十一. Spark其它命令

11.1 Alter Table

语法:

-- Alter table name
ALTER TABLE oldTableName RENAME TO newTableName

-- Alter table add columns
ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*)

-- Alter table column type
ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType

-- Alter table properties
ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value')

案例:

--rename to:
ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2;

--add column:
ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string);

--change column:
ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint;

--set properties;
alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');

11.2 Partition SQL Command

语法:

-- Drop Partition
ALTER TABLE tableIdentifier DROP PARTITION ( partition_col_name = partition_col_val [ , ... ] )

-- Show Partitions
SHOW PARTITIONS tableIdentifier

案例:

--show partition:
show partitions hudi_cow_pt_tbl;

--drop partition:
alter table hudi_cow_pt_tbl drop partition (dt='2021-12-09', hh='10');

参考:

  1. https://hudi.apache.org/docs/0.12.0/quick-start-guide

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/158084.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

低成本MEMS惯导系统的捷联惯导解算MATLAB仿真

低成本MEMS惯导系统的捷联惯导解算MATLAB仿真一、姿态角转换为四元数二、四元数转换为姿态角三、反对称阵四、位置更新五、姿态更新六、程序及数据主程序:子程序:数据及完整程序之前将高成本的捷联惯导忽略地球自转、圆锥曲线运动以及划桨运动等化简为可…

【学习笔记之Linux】工具之make/Makefile与git

make/Makefile: 背景知识: 一个工程中的源文件不计数,按类型、功能、模块分别放在若干个目录中,Makefile定义了一系列的规则来指定,哪些文件需要先编译,哪些文件需要后编译,那些文件需要重新编…

电源《龙珠超:超级人造人》观后感

上周看了动画电影《龙珠超:超级人造人》,《龙珠》这个系列同《火影》、《死神》、《海贼王》和《名侦探柯南》等都存在了很长时间,不断在更新,都是非常好的IP,伴随着很多人走过童年,也是因为时间太长了,记得…

品牌打假,假货治理,有什么好的方法

品牌打假,清除渠道假货,可以提高消费者对品牌的满意度与忠诚度,增强经销商的经销信心,维护稳定的价格体系及经销体系,树立良好的品牌形象。 但是品牌在打假的过程中,由于经验、时间、方法、技术等方面的局…

测试开发 | 接口测试之HTTP 协议讲解

本文节选自霍格沃兹测试开发学社内部教材HTTP 协议是一种用于分布式、协作式和超媒体信息系统的应用层协议。HTTP 是万维网的数据通信的基础。客户端向服务端发送 HTTP 请求,服务端则会在响应中返回所请求的数据。了解了 HTTP 协议,才能对接口测试进行更…

sql实现字段分割一行转多行的示例代码

先看一下数据结构,我这里字段比较少,只弄了最重要的部分 根据我们上次学到的LEFT()函数进行分组 SELECT LEFT(provinces,6),COUNT(1) FROM region_map_copy GROUP BY LEFT(provinces,6) 得到的结果如下: 这样的效果并不是我们想要的&#x…

必贝特科创板IPO过会:预计2025年前实现商业化,钱长庚为实控人

2023年1月10日,上海证券交易所披露的信息显示,广州必贝特医药股份有限公司(下称“必贝特”)获得上市委会议审核通过。据贝多财经了解,必贝特于2022年6月29日在科创板递交上市申请。 公开信息显示,必贝特是一…

SwiftUI之深入解析如何使用组合矩形GeometryReader创建条形(柱状)图

一、图表布局 条形(柱状)图以矩形条的形式呈现数据的类别,其宽度和高度与它们表示的值成比例。SwiftUI 对探索不同布局和预览实时视图结果是很友好的,很容易将部分内容提取到子视图中,以便每个部分都很小且易于维护。…

给程序提速 | 多进程与多线程

目录 一、背景 1.1、前言 1.2、说明 二、线程与进程 2.1、什么是进程 2.2、什么是线程 2.3、进程与线程的关系 2.4、多进程与多线程的最佳使用条件 2.5、线程与进程的锁 2.6、特别注意 三、第一个线程、线程池 3.1、线程测试 3.2、执行结果 3.3、线程池测试 3.4…

华中科技大学计算机组成原理-计算机数据表示实验(全部通关)

计算机数据表示实验(HUST) 计算机数据表示目录 [建议收藏]计算机数据表示实验(HUST)第1关 汉字国标码转区位码实验第2关 汉字机内码获取实验第3关 偶校验编码设计第4关 偶校验解码电路设计第5关 16位海明编码电路设计第6关 16位海明解码电路设计第7关 海明编码流水传输实验第8关…

Leetcode:700. 二叉搜索树中的搜索(C++)

目录 问题描述: 实现代码与解析: 递归: 原理思路: 迭代: 原理思路: 问题描述: 给定二叉搜索树(BST)的根节点 root 和一个整数值 val。 你需要在 BST 中找到节点值…

CHAPTER 4 Docker仓库

docker仓库4.1 Docker Hub公共镜像市场4.2 第三方镜像市场4.2.1 daocloud4.2.2 阿里云4.3 *搭建本地私有仓库仓库(Repository)是集中存放镜像的地方,又分公共仓库和私有仓库。有时候容易把仓库与注册服务器(Registory)…

逆向-还原代码之continue (Interl 64)

// source code #include <stdio.h> int main() { int i; for (i 0; i < 10; i) { if (i 5) continue; printf("%d\n", i); } }

那年我双手离桌,被《剑指offer》打的还不了手(第八天)

跟着博主一起刷题 这里使用的是题库&#xff1a; https://leetcode.cn/problem-list/xb9nqhhg/?page1 目录剑指 Offer 55 - II. 平衡二叉树剑指 Offer 56 - I. 数组中数字出现的次数剑指 Offer 56 - II. 数组中数字出现的次数 II剑指 Offer 55 - II. 平衡二叉树 剑指 Offer 55…

缓存一致性问题解决方案(超全超易懂)

文章目录1、缓存模型和思路2、缓存更新策略3、两种解决方案3.1、先删除缓存&#xff0c;再更新数据库3.1.1延时双删&#xff08;解决先删除缓存&#xff0c;再更新数据库产生的缓存不一致问题&#xff09;1、什么是延时双删2、为什么要进行延迟双删&#xff1f;3、如何实现延迟…

【 uniapp - 黑马优购 | 购物车页面(2)】如何实现收货地址区域功能、常见问题解决方案

个人名片&#xff1a; &#x1f43c;作者简介&#xff1a;一名大二在校生&#xff0c;讨厌编程&#x1f38b; &#x1f43b;‍❄️个人主页&#x1f947;&#xff1a;小新爱学习. &#x1f43c;个人WeChat&#xff1a;见文末 &#x1f54a;️系列专栏&#xff1a;&#x1f5bc;…

JVM—类加载与字节码技术

目录一、类文件结构1、魔术2、版本3、常量池二、字节码指令1、javap工具2、图解方法执行流程3、通过字节码指令来分析问题4、构造方法5、方法调用6、多态原理——HSDB7、异常处理四、类加载阶段五、类加载器六、运行期优化一、类文件结构 以一个简单的HelloWord.java程序为例 …

聊聊VMware的三种网络模式

聊聊VMware的三种网络模式1.Bridged&#xff08;桥接模式&#xff09;2.NAT&#xff08;地址转换模式&#xff09;3.Host-Only&#xff08;仅主机模式&#xff09;VMware有三种虚拟网络工作方式&#xff0c;即&#xff1a; Briged&#xff08;桥接模式&#xff09;NAT&#xf…

实现内核线程

文章目录前言前置知识实验操作实验一实验二实验三前言 博客记录《操作系统真象还原》第九章实验的操作~ 实验环境&#xff1a;ubuntu18.04VMware &#xff0c; Bochs下载安装 实验内容&#xff1a; 在内核空间实现线程。实现双向链表。实现多线程在调度器的调度下轮流执行。…

【Nginx】Nginx配置实例-反向代理

1. 反向代理实例一 实现过程 1. 启动一个 tomcat&#xff0c;浏览器地址栏输入 127.0.0.1:8080&#xff0c;出现如下界面2. 通过修改本地 host 文件&#xff0c;将 www.123.com 映射到 127.0.0.13. 在 nginx.conf 配置文件中增加如下配置 2. 反向代理实例二 实现过程 1.准备两…