Hudi(6):Hudi集成Spark之spark-shell 方式

news2024/11/16 2:33:22

目录

0. 相关文章链接

1. 启动 spark-shell

2. 插入数据

3. 查询数据

3.1. 转换成DF

3.2. 查询

3.3. 时间旅行查询

4. 更新数据

5. 增量查询

5.1. 重新加载数据

5.2. 获取指定beginTime

5.3. 创建增量查询的表

5.4. 查询增量表

6. 指定时间点查询

7. 删除数据

8. 覆盖数据


0. 相关文章链接

 Hudi文章汇总 

1. 启动 spark-shell

  • 启动命令:
#针对Spark 3.2
spark-shell \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
  • 设置表名,基本路径和数据生成器(不需要单独的建表。如果表不存在,第一批写表将创建该表):
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator

2. 插入数据

新增数据,生成一些数据,将其加载到DataFrame中,然后将DataFrame写入Hudi表。

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

Mode(overwrite)将覆盖重新创建表(如果已存在)。可以检查/tmp/hudi_trps_cow 路径下是否有数据生成。

数据文件的命名规则,源码如下:

3. 查询数据

3.1. 转换成DF

val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

注意:该表有三级分区(区域/国家/城市),在0.9.0版本以前的hudi,在load中的路径需要按照分区目录拼接"*",如:load(basePath + "/*/*/*/*"),当前版本不需要。

3.2. 查询

spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()

spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()

3.3. 时间旅行查询

Hudi从0.9.0开始就支持时间旅行查询。目前支持三种查询时间格式,如下所示。

spark.read.
  format("hudi").
  option("as.of.instant", "20210728141108100").
  load(basePath)

spark.read.
  format("hudi").
  option("as.of.instant", "2021-07-28 14:11:08.200").
  load(basePath)

// 表示 "as.of.instant = 2021-07-28 00:00:00"
spark.read.
  format("hudi").
  option("as.of.instant", "2021-07-28").
  load(basePath)

4. 更新数据

类似于插入新数据,使用数据生成器生成新数据对历史数据进行更新。将数据加载到DataFrame中并将DataFrame写入Hudi表中。

val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

        注意:保存模式现在是Append。通常,除非是第一次创建表,否则请始终使用追加模式。现在再次查询数据将显示更新的行程数据。每个写操作都会生成一个用时间戳表示的新提交。查找以前提交中相同的_hoodie_record_keys在该表的_hoodie_commit_time、rider、driver字段中的变化。

查询更新后的数据,要重新加载该hudi表:

val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()

5. 增量查询

        Hudi还提供了增量查询的方式,可以获取从给定提交时间戳以来更改的数据流。需要指定增量查询的beginTime,选择性指定endTime。如果我们希望在给定提交之后进行所有更改,则不需要指定endTime(这是常见的情况)。

5.1. 重新加载数据

spark.
  read.
  format("hudi").
  load(basePath).
  createOrReplaceTempView("hudi_trips_snapshot")

5.2. 获取指定beginTime

val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2) 

5.3. 创建增量查询的表

val tripsIncrementalDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

5.4. 查询增量表

spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()

这将过滤出beginTime之后提交且fare>20的数据。

利用增量查询,我们能在批处理数据上创建streaming pipelines。

6. 指定时间点查询

查询特定时间点的数据,可以将endTime指向特定时间,beginTime指向000(表示最早提交时间)

  • 指定beginTime和endTime
val beginTime = "000" 
val endTime = commits(commits.length - 2) 
  • 根据指定时间创建表
val tripsPointInTimeDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  option(END_INSTANTTIME_OPT_KEY, endTime).
  load(basePath)
tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
  • 查询
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()

7. 删除数据

根据传入的HoodieKeys来删除(uuid + partitionpath),只有append模式,才支持删除功能。

  • 获取总行数
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
  • 取其中2条用来删除
val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
  • 将待删除的2条数据构建DF
val deletes = dataGen.generateDeletes(ds.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
  • 执行删除
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION_OPT_KEY,"delete").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
  • 统计删除数据后的行数,验证删除是否成功
val roAfterDeleteViewDF = spark.
  read.
  format("hudi").
  load(basePath)

roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")

// 返回的总行数应该比原来少2行
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

8. 覆盖数据

        对于表或分区来说,如果大部分记录在每个周期都发生变化,那么做upsert或merge的效率就很低。我们希望类似hive的 "insert overwrite "操作,以忽略现有数据,只用提供的新数据创建一个提交。

        也可以用于某些操作任务,如修复指定的问题分区。我们可以用源文件中的记录对该分区进行'插入覆盖'。对于某些数据源来说,这比还原和重放要快得多。

        Insert overwrite操作可能比批量ETL作业的upsert更快,批量ETL作业是每一批次都要重新计算整个目标分区(包括索引、预组合和其他重分区步骤)。

  • 查看当前表的key
spark.
  read.format("hudi").
  load(basePath).
  select("uuid","partitionpath").
  sort("partitionpath","uuid").
  show(100, false)
  • 生成一些新的行程数据
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.
  read.json(spark.sparkContext.parallelize(inserts, 2)).
  filter("partitionpath = 'americas/united_states/san_francisco'")
  • 覆盖指定分区
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION.key(),"insert_overwrite").
  option(PRECOMBINE_FIELD.key(), "ts").
  option(RECORDKEY_FIELD.key(), "uuid").
  option(PARTITIONPATH_FIELD.key(), "partitionpath").
  option(TBL_NAME.key(), tableName).
  mode(Append).
  save(basePath)
  • 查询覆盖后的key,发生了变化
spark.
  read.format("hudi").
  load(basePath).
  select("uuid","partitionpath").
  sort("partitionpath","uuid").
  show(100, false)

注:其他Hudi相关文章链接由此进 ->  Hudi文章汇总 


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/132663.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python学习基础笔记六十二——反射2

1、 isinstanace(obj, cls) # 检查是否obj是否是类cls的对象: class Foo(object):passobj Foo()print(isinstance(obj, Foo)) 结果返回:True。 issubclass(sub, super) # 检查sub类是否是 super 类的派生类 class Foo(object):passclass Bar(F…

python详解(5)——类,类,还是类

目录 🏆一、前言 🏆二、类 🚩1、面向对象到底是什么 🚩2、数据成员and访问,汉堡店大升级(超难) 👍①、类变量(超难) 👍②、实例变量 &#x1f6a9…

A Latent Transformer for Disentangled Face Editing in Images and Videos翻译

点击下载论文 代码地址 图1 我们将真实图像投影到StyleGAN生成器的潜空间,并在编码的潜码上实现连续的解纠缠属性编辑。从原始图像和投影图像中,我们可以连续编辑一系列属性,例如:“微笑”、“刘海”、“拱形眉毛”、“年龄”、…

分布式对象存储设计原理

保存像图片、音视频这类大文件就是对象存储。不仅有很好的大文件读写性能,还可通过水平扩展实现近乎无限容量,并兼顾服务高可用、数据高可靠。 对象存储“全能”,主要因,对象存储是原生分布式存储系统,相对于MySQL、R…

[Linux]yum安装工具和vim编辑器

🥁作者: 华丞臧. 📕​​​​专栏:【LINUX】 各位读者老爷如果觉得博主写的不错,请诸位多多支持(点赞收藏关注)。如果有错误的地方,欢迎在评论区指出。 推荐一款刷题网站 👉 LeetCode刷题网站 文…

每日一问-ChapGPT-20230101-关于新年的规划

文章目录每日一问-ChapGPT系列起因每日一问-ChapGPT-20230101-关于新年的规划优秀的人,新年之初做哪些规划疫情时代,如何更好的保护好自己有哪些运动可以提升抵抗力冥想的具体实现步骤为什么制定了年度规划但往往完成不了如何克服看手机刷视频的习惯当日…

聊聊八卦,当年的顶流明星事件是如何把公司的缓存架构“击垮”的?

V-xin:ruyuan0330 获得600页原创精品文章汇总PDF 目录 一、为什么要用缓存集群二、20万用户同时访问一个热点缓存的问题三、基于流式计算技术的缓存热点自动发现四、动加载为JVM本地缓存五、限流熔断保护六、总结 一、为什么要用缓存集群 这篇文章,咱…

数值优化之基本概念

本文ppt来自深蓝学院《机器人中的数值优化》 目录 1 推荐书单 2 优化问题的基本范式 3 数值优化在机器人领域的应用 1 推荐书单 对于英语阅读有困难的同学可以看第一本书,对于最优化的介绍也是比较详细的。 这是第一本书的ppt链接最优化:建模、算法…

【LDF】线性判别函数(三)

松弛方法 学习准则 在感知函数准则中, 目标函数中采用了 −aTy-\mathbf{a}^T \mathbf{y}−aTy 的形式。实际上有很多其它准则也可以用于感知函数的学习。线性准则 Jp(a)∑y∈Y(−aTy)J_p(\mathbf{a})\sum_{\mathbf{y} \in Y}\left(-\mathbf{a}^T \mathbf{y}\right) Jp​(a)y∈…

MP中定义全局常量用于xml的判断

1.普通方式 mybatis-plus.configuration.variables.secretFilterSwitch0 yml的方式 mybatis: mapper-locations: classpath:mapper/*.xml type-aliases-package: com.demo configuration: variables: userId: 456132465 userName: 李四 配置完成后在代码…

NLP论文RoFormer(含源码)中文解读:具有旋转式位置嵌入的增强型transformer模型(一场相对革命)

目录 1、论文与源码2、摘要介绍3、展开解读3.1、匹配或超过了目前可用于将位置信息注入变换器的所有其他方法3.2、模型思想3.3、公式推导3.4、源码解释GPT-NeoX(PyTorch)网状变压器 JAX (JAX)4、 对比实验参考文献1、论文与源码 RoFormer匹配或超过了目前可用于将位置信息注…

分享76个PHP源码,总有一款适合您

链接:https://pan.baidu.com/s/1dC6_-CLs_qSyNnKEmno0Pg?pwd6666 提取码:6666 下面是文件的名字,我放了一些图片,文章里不是所有的图主要是放不下...,大家下载后可以看到。 UTForum社区论坛 v2.5 响应式健身房信息展…

小波分析—— 3. 实现一个简单的Haar小波

由于小波在应用形式上与卷积很相似,所以如果你有需要,可以查看我以前写过的内容: 信号采样基本概念 —— 冲激函数卷积计算——1. 关于卷积的基本概念卷积计算——2. 一些常用于图像的卷积核与应用 另外常见的信号处理工具,傅里…

Unity脚本(三)

视频教程:https://www.bilibili.com/video/BV12s411g7gU?p128 目录 Time Prefab Animation Time Time.time:自应用程序启动以来,每帧的开始时间(只读) Time.deltaTime:每帧间隔,或说完…

SpringData

文档:D:\springdata SpringData是一个用来简化dao层开发的框架.在保证了各个底层存储特性同时,提供了一套统一的数据访问API.它可以很好的支持常用的关系型数据库和非关系型数据库. 使用SpringData做为dao层开发技术,将大大简化代码,而且其API比各个技…

【力扣周赛#326】6279.数组乘积中的不同质因数数目+6196.将字符串分割成值不超过K的子字符串+6280.范围内最接近的两个质数

目录 6278.统计能整除数字的位数 - 简单ac 6279.数组乘积中的不同质因数数目 - 质因数 6196.将字符串分割成值不超过K的子字符串 - 贪心 6280.范围内最接近的两个质数 - 质数筛 贪心 6278.统计能整除数字的位数 - 简单ac 6278. 统计能整除数字的位数 class Solution {pu…

aws codebuild 配置codecommit更新触发和squid正向代理

本文主要讨论如何通过监听codecommit仓库自动触发codebuild的构建,以及为codebuild配置正向代理 通过codecommit更新触发codebuild codecommit触发器相关 每个codecommit最多配置10个触发器 sns触发器 为sns创建lambda函数订阅,在lambda日志中查看s…

一个例题,了解包装类

下列代码输出什么,为什么? public class Test3 {public static void main(String[] args) {Integer a1 100;Integer a2 Integer.valueOf(100);Integer a3 new Integer(100);System.out.println(a1 a2);System.out.println(a1 a3);System.out.println(a1.equals(a3));Sys…

分离编译、类型萃取、变参模板

分离编译 一个程序由若干个源文件共同实现,每个源文件单独编译生成目标文件,最后将所有的目标文件链接起来形成单一可执行文件的过程称之为分离编译模式。模板不支持分离编译 编译器报的这种错误属于链接性错误,也就是当程序预处理、编译、汇…

Sutherland–Hodgman 算法介绍(简单易懂)

目录 一、算法介绍 二、算法描述 三、计算细节补充 四、算法总结 一、算法介绍 我们使用Sutherland–Hodgman算法来裁剪多边形的边,一般是给你一个多边形顶点序列(P1,P2,P3,P4,…Pn)让你裁剪,最终裁剪掉裁剪多边形的外部部分(下图黑框就是裁剪多边形…