Hudi-集成Spark之spark-shell 方式

news2024/9/20 5:51:49

Hudi集成Spark之spark-shell 方式

启动 spark-shell

(1)启动命令

#针对Spark 3.2
spark-shell \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

(2)设置表名,基本路径和数据生成器(不需要单独的建表。如果表不存在,第一批写表将创建该表):

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator

插入数据

新增数据,生成一些数据,将其加载到DataFrame中,然后将DataFrame写入Hudi表。

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

Mode(overwrite)将覆盖重新创建表(如果已存在)。可以检查/tmp/hudi_trps_cow 路径下是否有数据生成。

数据文件的命名规则,源码如下:

查询数据

查询

(1)转换成DF

val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

注意:该表有三级分区(区域/国家/城市),在0.9.0版本以前的hudi,在load中的路径需要按照分区目录拼接"*",如:load(basePath + “////”),当前版本不需要。

(2)查询

spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()

spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()

时间旅行查询

Hudi从0.9.0开始就支持时间旅行查询。目前支持三种查询时间格式,如下所示。

spark.read.
  format("hudi").
  option("as.of.instant", "20210728141108100").
  load(basePath)
 
spark.read.
  format("hudi").
  option("as.of.instant", "2021-07-28 14:11:08.200").
  load(basePath)
 
// 表示 "as.of.instant = 2021-07-28 00:00:00"
spark.read.
  format("hudi").
  option("as.of.instant", "2021-07-28").
  load(basePath)

增量查询

Hudi还提供了增量查询的方式,可以获取从给定提交时间戳以来更改的数据流。需要指定增量查询的beginTime,选择性指定endTime。如果我们希望在给定提交之后进行所有更改,则不需要指定endTime(这是常见的情况)。

(1)重新加载数据

spark.
  read.
  format("hudi").
  load(basePath).
  createOrReplaceTempView("hudi_trips_snapshot")

(2)获取指定beginTime

val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2) 

(3)创建增量查询的表

val tripsIncrementalDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

(4)查询增量表

spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()

这将过滤出beginTime之后提交且fare>20的数据。

利用增量查询,我们能在批处理数据上创建streaming pipelines。

指定时间点查询

查询特定时间点的数据,可以将endTime指向特定时间,beginTime指向000(表示最早提交时间)

(1)指定beginTime和endTime

val beginTime = "000" 
val endTime = commits(commits.length - 2) 

(2)根据指定时间创建表

val tripsPointInTimeDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  option(END_INSTANTTIME_OPT_KEY, endTime).
  load(basePath)
tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")

(3)查询

spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()

更新数据

类似于插入新数据,使用数据生成器生成新数据对历史数据进行更新。将数据加载到DataFrame中并将DataFrame写入Hudi表中。

val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

注意:保存模式现在是Append。通常,除非是第一次创建表,否则请始终使用追加模式。现在再次查询数据将显示更新的行程数据。每个写操作都会生成一个用时间戳表示的新提交。查找以前提交中相同的_hoodie_record_keys在该表的_hoodie_commit_time、rider、driver字段中的变化。

查询更新后的数据,要重新加载该hudi表:

val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
 
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()

删除数据

根据传入的HoodieKeys来删除(uuid + partitionpath),只有append模式,才支持删除功能。

(1)获取总行数

spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

(2)取其中2条用来删除

val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)

(3)将待删除的2条数据构建DF

val deletes = dataGen.generateDeletes(ds.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))

(4)执行删除

df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION_OPT_KEY,"delete").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

(5)统计删除数据后的行数,验证删除是否成功

val roAfterDeleteViewDF = spark.
  read.
  format("hudi").
  load(basePath)
 
roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
 
// 返回的总行数应该比原来少2行
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

覆盖数据

对于表或分区来说,如果大部分记录在每个周期都发生变化,那么做upsert或merge的效率就很低。我们希望类似hive的 "insert overwrite "操作,以忽略现有数据,只用提供的新数据创建一个提交。

也可以用于某些操作任务,如修复指定的问题分区。我们可以用源文件中的记录对该分区进行’插入覆盖’。对于某些数据源来说,这比还原和重放要快得多。

Insert overwrite操作可能比批量ETL作业的upsert更快,批量ETL作业是每一批次都要重新计算整个目标分区(包括索引、预组合和其他重分区步骤)。

(1)查看当前表的key

spark.
  read.format("hudi").
  load(basePath).
  select("uuid","partitionpath").
  sort("partitionpath","uuid").
  show(100, false)

(2)生成一些新的行程数据

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.
  read.json(spark.sparkContext.parallelize(inserts, 2)).
  filter("partitionpath = 'americas/united_states/san_francisco'")

(3)覆盖指定分区

df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION.key(),"insert_overwrite").
  option(PRECOMBINE_FIELD.key(), "ts").
  option(RECORDKEY_FIELD.key(), "uuid").
  option(PARTITIONPATH_FIELD.key(), "partitionpath").
  option(TBL_NAME.key(), tableName).
  mode(Append).
  save(basePath)

(4)查询覆盖后的key,发生了变化

spark.
  read.format("hudi").
  load(basePath).
  select("uuid","partitionpath").
  sort("partitionpath","uuid").
  show(100, false)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/343993.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

leaflet 本地上传shp文件,在map上解析显示图形(058)

第058个 点击查看专栏目录 本示例的目的是介绍演示如何在vue+leaflet中本地上传shp文件,利用shapefile读取shp数据,并在地图上显示图形。 直接复制下面的 vue+openlayers源代码,操作2分钟即可运行实现效果 文章目录 示例效果加载shapefile.js方式配置方式示例源代码(共126…

GBDT+LR

为什么需要GBDTLR 协同过滤和矩阵分解存在问题: 仅利用了用户与物品相互行为信息进行推荐, 忽视了用户自身特征, 物品自身特征以及上下文信息等,导致生成的结果往往会比较片面。 FFM 存在问题 FFM特征交叉能力有限:虽然 FFM 模型…

Excel里数字太长显示为科学计数法如何显示完整数字

Excel里数字太长显示为科学计数法如何显示完整数字 注意:以下测试都是在macos的Microsoft Excel for Mac的16.53版本中实际测试的,在windows中应该也是一样的。 一、问题描述 数字太长在Excel中会显示为E形式 有些值,比如身份证号、银行卡…

编译原理(第3版-王生原)课后习题答案-第三章

1.构造下列正规式相应的 DFA。(1)1(0|1) *101(2)1(1010* |1(010)*1) *0(3)a((a|b)* |ab*a)*b(4)b((ab)* bb)*ab答案:(2)(3)(4)略。 写1个(1)体现解题思路。2.已知 NFA ((x,yz),{0,1}M,{x},{z}),其中:M(x,0){z}, M(y,0){x,y}, M(z,0){x,z}, M(x,1){x}, M(…

Linux文件默认权限:umask

umask就是指定目前用户在建立文件或目录时候的权限默认值 查看方式有两种:一种可以直接输入umask,就可以看到数字类型的权限设置值,一种则是加入umask后加入-S(Symbolic)选项,就会以符号类型的方式来显示出…

HJY-E1A/4D AC220V数字式交流【电压继电器】

系列型号 HJY-2B-2H2D DC220V欠电压继电器 HJY-1A-2H2D DC220V过压继电器 一、用途 本系列电压继电器为瞬时动作特性,用于发电机,变压器,输电线路的继电保护装置中作为过压或欠压的闭锁启动元件。 二、特点 (1).采用拨盘设定;或数码管显…

Spring Batch ItemReader组件-读数据库

目录 引言 数据准备 游标方式 分页方式 转视频版 引言 接着上篇:Spring Batch ItemReader组件-Json文件,了解Spring Batch 读取Json文件后,接下来一起学习一下Spring Batch 如何读数据库中的数据 数据准备 下面是一张用户表user&…

UnityShader35:光晕光效

一、光晕逻辑 光晕的逻辑很简单,就是在屏幕上画上一个一个方形的 Mesh,然后采样带 Alpha 通道的光晕贴图,效果就出来了,其中方形 Mesh 的大小、位置、纹理表现全部都由美术配置,因此效果好坏主要取决于光晕贴图以及是…

ESP8266点亮 0.96 英寸 OLED 显示屏,基于Arduino IDE

本指南介绍如何使用 Arduino IDE 将 0.96 英寸 SSD1306 OLED 显示屏与 ESP8266 结合使用。我们将向您展示如何编写文本、设置不同的字体、绘制形状和显示位图图像。安装 SSD1306 OLED 库 – ESP8266有几个库可用于使用 ESP8266 控制 OLED 显示屏。在本教程中,我们将…

IP地址与用户行为

IP地址能够解决网络风险和提高网络安全的原因是:所有的网络请求都会带有IP信息,是访问者的独立标识,另外ip地址的分配和管理比较严格,难以造假。另外ip属于网络层,可以轻松的对其进行阻断。现有的各种网络安全、负载均…

操作系统开发:BIOS/MBR基础与调试

这里在实验之前需要下载 Bochs-win32-2.6.11 作者使用的是Linux版本的,在Linux写代码不太舒服,所以最好在Windows上做实验,下载好虚拟机以后还需要下载Nasm汇编器,以及GCC编译器,为了能够使用DD命令实现磁盘拷贝&#…

树莓派 安装 宝塔linux面板5.9. 2023-2-14

一.环境 1.硬件环境: 树莓派3b , 8GB tf卡 ,micro usb电源 2.网络环境: 网线直连路由器 , 可访问互联网 3.软件环境: 树莓派操作系统 CentOS-Userland-7-armv7hl-RaspberryPI-Minimal-2009-sda(linux) 系统刻录工具 Win32DiskImager (win) ip扫描工具 Advanced IP Scanne…

公司招聘:33岁以上的和两年一跳的不要,开出工资我还以为看错了...

导读:对于公司来说,肯定是希望花最少的钱招到最优秀的员工,但事实上这个想法是不太现实的,虽然如今互联网不太好找工作,但要员工降薪去入职,相信还是有很大难度的,很多人宁可在家休息&#xff0…

【Linux】进程的虚拟地址空间

文章目录现象引入进程地址空间进程地址空间的描述进程地址空间是怎么产生的进程地址空间的好处对开篇问题的解释现象引入 我们运行下面一段代码&#xff1a; #include <stdio.h> #include <unistd.h>int global_val 100;int main() {pid_t id fork();int count…

根据 Jupyter-lab 源码实现 notebook(.ipynb)在页面中的渲染

前言 最近因为工作项目的需要&#xff0c;要在项目中尽可能的还原notebook渲染效果。由于网上没找到相关的指导文章&#xff0c;所以只能生啃JupyterLab源码&#xff0c;独自摸索实现。经过一段时间“跌跌撞撞”的摸索尝试&#xff0c;总算勉强实现了。 因此编写此文章做一下…

转转微服务容量管理实践

1 背景2 容量管理的目标3 发展阶段4 容量管理4.1 容量水位4.2 资源容量优化4.3 集群容量4.4 压测指标4.5 压测标准5 扩容、缩容6 总结1 背景 随着转转业务的不断发展和用户不断增长&#xff0c;公司持续增加对硬件和基础设施的投入&#xff0c;用于满足业务发展的需要&#xff…

计算机网络8-在浏览器中输入URL后会发生什么

参考&#xff1a; 在浏览器中输入URL并按下回车后会发生什么&#xff1f; DNS域名详细解析过程 1.URL解析拿到域名 当用户输入URL并回车后&#xff0c;浏览器对拿到的URL进行识别&#xff0c;抽取出域名字段&#xff0c;比如https://www.baidu.com,它的域名就是www.baidu.com…

SQL数据库根据需求发送邮件

一、启用数据库邮件 手动启用数据库邮件功能&#xff0c;需执行以下脚本&#xff1a; exec sp_configure show advanced options,1 RECONFIGURE exec sp_configure Database Mail XPs,1 RECONFIGURE With Override 二、邮件服务器设置 1.邮箱启用设置-POP3/IMAP/SMTP/Exch…

DAMA数据管理知识体系指南之数据质量管理

第12章 数据质量管理 12.1 简介 数据质量管理是组织变革管理中一项关键的支撑流程。业务重点的变化、公司的业务整合战略&#xff0c;以及并购与合作&#xff0c;都对IT职能提出了更高要求&#xff0c;包括整合数据源、创建一致的数据副本、交互提供数据或整合数据。与遗留系…

SpringAOP理解实现方式

Aop 什么是Aop&#xff1f; AOP就是面向切面编程&#xff0c;通过预编译方式以及运行期间的动态代理技术来实现程序的统一维护功能。 什么是切面&#xff0c;我理解的切面就是两个方法之间&#xff0c;两个对象之间&#xff0c;两个模块之间就是一个切面。假设在两个模块之间…