Hudi集成Spark(一)Spark Shell方式

news2025/1/11 2:35:25

文章目录

  • 环境准备
    • 安装 Spark
    • 启动 Hadoop(略)
  • spark-shell 方式
    • 启动 spark-shell
    • 插入数据
    • 查询数据
    • 查询数据
    • 更新数据
    • 增量查询
    • 指定时间点查询
    • 删除数据
    • 覆盖数据

环境准备

安装 Spark

1)Hudi 支持的 Spark 版本

HudiSupported Spark 3 version
0.12.x3.3.x,3.2.x,3.1.x
0.11.x3.2.x(default build, Spark bundle only),3.1.x
0.10.x3.1.x(default build), 3.0.x
0.7.0-0.9.03.0.x
0.6.0 and priorNot supported

Spark 安装

参考:https://blog.csdn.net/weixin_45417821/article/details/121323669

2)下载 Spark 安装包,解压

cd /opt/software/
wget https://dlcdn.apache.org/spark/spark-3.2.2/spark-3.2.2-binhadoop3.2.tgz
tar -zxvf spark-3.2.2-bin-hadoop3.2.tgz -C /opt/module/
mv /opt/module/spark-3.2.2-bin-hadoop3.2 /opt/module/spark-3.2.2

3)配置环境变量

sudo vim /etc/profile.d/my_env.sh
export SPARK_HOME=/opt/module/spark-3.2.2
export PATH=$PATH:$SPARK_HOME/bin
source /etc/profile.d/my_env.sh

4)拷贝编译好的包到 spark 的 jars 目录

cp /opt/software/hudi-0.12.0/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.12.0.jar /opt/module/spark-3.2.2/jars

启动 Hadoop(略)

建议大家用Spark-Yarn的方式接入Hudi ,生产环境中,基本都是Spark on Yarn 同步

spark-shell 方式

启动 spark-shell

1)启动命令

针对 Spark 3.2

spark-shell \
 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
 --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
 --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

出现如下图片提示,则代表已经成功!

在这里插入图片描述

2)设置表名,基本路径和数据生成器

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips_cow"
val basePath = "file:///opt/datas/hudi_trips_cow"
val dataGen = new DataGenerator

不需要单独的建表。如果表不存在,第一批写表将创建该表。

插入数据

新增数据,生成一些数据,将其加载到 DataFrame 中,然后将 DataFrame 写入 Hudi 表。

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
 options(getQuickstartWriteConfigs).
 option(PRECOMBINE_FIELD_OPT_KEY, "ts").
 option(RECORDKEY_FIELD_OPT_KEY, "uuid").
 option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
 option(TABLE_NAME, tableName).
 mode(Overwrite).
 save(basePath)

Mode(overwrite)将覆盖重新创建表(如果已存在)。可以检查/opt/datas/hudi_trps_cow 路径下是否有数据生成。

cd /opt/datas/hudi_trips_cow/
ls

在这里插入图片描述

数据文件的命名规则,源码如下:

在这里插入图片描述

查询数据

1)转换成 DF

val tripsSnapshotDF = spark.
 read.
 format("hudi").
 load(basePath)

tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

查询数据

1)转换成 DF

val tripsSnapshotDF = spark.
 read.
 format("hudi").
 load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

径需要按照分区目录拼接"",如:load(basePath + "////*"),当前版本不需要。
2)查询

spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()

查询后的效果图如下:

在这里插入图片描述

3)时间旅行查询

Hudi 从 0.9.0 开始就支持时间旅行查询。目前支持三种查询时间格式,如下所示。

spark.read.
 format("hudi").
 option("as.of.instant", "20210728141108100").
 load(basePath)
spark.read.
 format("hudi").
 option("as.of.instant", "2021-07-28 14:11:08.200").
 load(basePath)
// 表示 "as.of.instant = 2021-07-28 00:00:00"
spark.read.
 format("hudi").
 option("as.of.instant", "2021-07-28").
 load(basePath)

更新数据

类似于插入新数据,使用数据生成器生成新数据对历史数据进行更新。将数据加载到DataFrame 中并将 DataFrame 写入 Hudi 表中。

val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 
2))
df.write.format("hudi").
 options(getQuickstartWriteConfigs).
 option(PRECOMBINE_FIELD_OPT_KEY, "ts").
 option(RECORDKEY_FIELD_OPT_KEY, "uuid").
 option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
 option(TABLE_NAME, tableName).
 mode(Append).
 save(basePath)

注意:保存模式现在是 Append。通常,除非是第一次创建表,否则请始终使用追加模式。现在再次查询数据将显示更新的行程数据。每个写操作都会生成一个用时间戳表示的新提交。查找以前提交中相同的_hoodie_record_keys 在该表的_hoodie_commit_time、rider、driver 字段中的变化。

查询更新后的数据,要重新加载该 hudi 表:

val tripsSnapshotDF1 = spark.
 read.
 format("hudi").
 load(basePath)

tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot1")

spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot1").show()

查询到两个不一样的commit_time

在这里插入图片描述

进入文件中查看

cd /opt/datas/hudi_trips_cow/americas/brazil/sao_paulo

发现有两拨更新文件了,下面是新文件,上面是旧文件

在这里插入图片描述

如果查看旧文件也很简单,基于查询章节中,基于时间查询即可,举例:

val tripsSnapshotDF2 = spark.read.
 format("hudi").
 option("as.of.instant", "20230111143334593").
 load(basePath)

tripsSnapshotDF2.createOrReplaceTempView("hudi_trips_snapshot2")

spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot2").show()

这样就能看到清一色的以前的文件了

在这里插入图片描述

增量查询

Hudi 还提供了增量查询的方式,可以获取从给定提交时间戳以来更改的数据流。需要指定增量查询的 beginTime,选择性指定 endTime。如果我们希望在给定提交之后进行所有更改,则不需要指定 endTime(这是常见的情况)。
1)重新加载数据

spark.
 read.
 format("hudi").
 load(basePath).
 createOrReplaceTempView("hudi_trips_snapshot")

2)获取指定 beginTime

val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)

val beginTime = commits(commits.length - 2) 

3)创建增量查询的表

val tripsIncrementalDF = spark.read.format("hudi").
 option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
 option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
 load(basePath)

tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

4)查询增量表

spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()

这将过滤出 beginTime 之后提交且 fare>20 的数据。利用增量查询,我们能在批处理数据上创建 streaming pipelines。

在这里插入图片描述

指定时间点查询

查询特定时间点的数据,可以将 endTime 指向特定时间,beginTime 指向 000(表示最早提交时间)
1)指定 beginTime 和 endTime

val beginTime = "000" 
val endTime = commits(commits.length - 2) 

2)根据指定时间创建表

val tripsPointInTimeDF = spark.read.format("hudi").
 option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
 option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
 option(END_INSTANTTIME_OPT_KEY, endTime).
 load(basePath)
tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")

3)查询

spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()

查询到最早的时间

在这里插入图片描述

删除数据

根据传入的 HoodieKeys 来删除(uuid + partitionpath),只有 append 模式,才支持删除
功能。

1)获取总行数

spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

2)取其中 2 条用来删除

val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)

3)将待删除的 2 条数据构建 DF

val deletes = dataGen.generateDeletes(ds.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))

4)执行删除

df.write.format("hudi").
 options(getQuickstartWriteConfigs).
 option(OPERATION_OPT_KEY,"delete").
 option(PRECOMBINE_FIELD_OPT_KEY, "ts").
 option(RECORDKEY_FIELD_OPT_KEY, "uuid").
 option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
 option(TABLE_NAME, tableName).
 mode(Append).
 save(basePath)

5)统计删除数据后的行数,验证删除是否成功

val roAfterDeleteViewDF = spark.
 read.
 format("hudi").
 load(basePath)

roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")


// 返回的总行数应该比原来少 2 行
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

覆盖数据

对于表或分区来说,如果大部分记录在每个周期都发生变化,那么做 upsert 或 merge 的效率就很低。我们希望类似 hive 的 "insert overwrite "操作,以忽略现有数据,只用提供的新数据创建一个提交。

也可以用于某些操作任务,如修复指定的问题分区。我们可以用源文件中的记录对该分区进行’插入覆盖’。对于某些数据源来说,这比还原和重放要快得多。Insert overwrite 操作可能比批量 ETL 作业的 upsert 更快,批量 ETL 作业是每一批次都要重新计算整个目标分区(包括索引、预组合和其他重分区步骤)。

1)查看当前表的 key

spark.
 read.format("hudi").
 load(basePath).
 select("uuid","partitionpath").
 sort("partitionpath","uuid").
 show(100, false)

2)生成一些新的行程数据

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.
 read.json(spark.sparkContext.parallelize(inserts, 2)).
 filter("partitionpath = 'americas/united_states/san_francisco'")

3)覆盖指定分区

df.write.format("hudi").
 options(getQuickstartWriteConfigs).
 option(OPERATION.key(),"insert_overwrite").
 option(PRECOMBINE_FIELD.key(), "ts").
 option(RECORDKEY_FIELD.key(), "uuid").
 option(PARTITIONPATH_FIELD.key(), "partitionpath").
 option(TBL_NAME.key(), tableName).
 mode(Append).
 save(basePath)

4)查询覆盖后的 key,发生了变化

spark.
 read.format("hudi").
 load(basePath).
 select("uuid","partitionpath").
 sort("partitionpath","uuid").
 show(100, false)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/157107.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

131页8万字数字化矿山整体解决方案

【版权声明】本资料来源网络,知识分享,仅供个人学习,请勿商用。 【侵删致歉】如有侵权请联系小编,将在收到信息后第一时间删除! 完整资料领取见文末,部分资料内容: 目 录 1、煤矿综合自动化系统…

两台 mac 通过 scp 命令快速传输数据

这两天由于电脑进水了,所以申请换了一台 mac 电脑,所以想把老电脑的数据拷贝到新电脑,折腾了半天,最后还是发现 scp 命令最好用。 使用 「scp 命令方式」之前尝试的其他方法 1、隔空投送 刚开始使用那个隔空投送功能,但…

数据结构与算法(一)——时间复杂度和空间复杂度

时间复杂度 1、概念引入 先说结论:时间复杂度是用来估计算法运行时间的一个式子(单位)。 例如:这四组代码,哪组运行时间最短? q:我们该用什么方式来体现算法运行的快慢? a&#…

安卓apk包破解

安卓apk包破解前言一、前置工作二、正式开始前言 拿到安卓的apk包如何,如何破解呢。流程如下 一、前置工作 拿到一个apk文件,修改其后缀为zip 例如: test.apk > test.zip 使用解压工具进行解压。解压后如图 获取到原始文件目录&…

Django项目——通过APIView实现API访问,增删改查数据库

前提 该文章在已有项目的基础上进行修改 https://blog.csdn.net/qq_38122800/article/details/128583379?spm1001.2014.3001.5502 1、配置序列化器 序列化器包含序列化和反序列化两个过程,简单点理解就是 序列化 : 将从数据库中查的数据变为前端页面可以接受的json数据 反…

机器学习的逻辑回归(Logistic)的实例————预测学生是否被录取

目录要求代码1. 导入模块2. 导入数据3. 求解theat的最优值,画出样本的位置和决策边界。4. 画出迭代函数随迭代次数变化的曲线,代价函数最终的收敛值5.比较三种学习率的代价函数随迭代次数变化的曲线5.1 学习率为0.00035.2 学习率为0.00055.3 学习率为0.00001要求 代码 1. 导入…

PySpark和RDD对象详解

目录 一.了解Spark、PySpark Spark是什么 Python on Spark Pyspark 小结 二.构建PySpark执行环境入口对象 PySpark的编程模型 小结 三.RDD对象 python数据容器转RDD对象 注意 演示 读取文件转RDD对象 演示 一.了解Spark、PySpark Spark是什么 定义:Apache Spark是用…

计算机组成原理【2】

文章目录一 计算机系统的层次结构1.1 思维导图1.2 计算机机器的五个层次1.3 三种级别的语言1.4 区分计算机组成原理和计算机体系二 计算机性能指标2.1 思维导图2.2 主存储器性能指标2.3 CPU性能指标2.4 系统整体的性能指标2.5 系统整体性能指标【动态测试】三 进位计数制3.1 思…

springboot整合opencv进行灰度图像与RGB图像互转

问题: 在开发过程中遇到一个问题,需要在图片上加上数据(原卷留痕),由于图片是灰度的,无法进行彩色编辑,需要将灰度图片转成RGB图片,才能进行彩色编辑,于是想到用opencv进…

Grafana 告警模块介绍

Grafana 系列文章,版本:OOS v9.3.1 Grafana 的介绍和安装Grafana监控大屏配置参数介绍(一)Grafana监控大屏配置参数介绍(二)Grafana监控大屏可视化图表Grafana 查询数据和转换数据Grafana 告警模块介绍 Gra…

Java对接JeePay支付、转账实现以及回调函数

最近公司对接了第三方支付平台JeePay,看到网上文章比较少,给大家发一篇对接微信支付的吧,支付宝也一样,更换里面的参数即可,官方文档地址:系统介绍 - 计全文档,具体的服务需要大家去搭建&#x…

为ABP新增手机验证模块

当前手机验证基本是标配,但Abp自身并没有实现这个功能,于是有了通过自定义模块实现的想法。 经过研究,发现要实现这个,只要重写和替换包含ReplaceEmailToUsernameOfInputIfNeeds方法的类就可以了。但要实现这个,首先要…

sql server提供三种常用截取字符串方法,LEFT()、RIGHT()、SUBSTRING()

一、sql server提供了三种常用截取字符串方法,LEFT()、RIGHT()、SUBSTRING() 1、LEFT()函数语法:LEFT(character,integer) 注释:参数1:要截取的字符串,参数2:截取字符个数说明:返回从字符串左边…

你的 VS Code 扩展值得信赖吗?

Aqua Nautilus 研究人员最近发现,攻击者可以轻松地冒充流行的 Visual Studio Code 扩展并诱骗不知情的开发人员下载它们。VSCode 是迄今为止最受欢迎的 IDE;StackOverflow 的一项调查指出,其目前已被 74.48% 的开发人员所使用。VSCode 的强大…

SPDK技术浅析

目录SPDK基础知识SPDK架构SPDK使用rpc后台启动基础机制分析后端vhost异步I/O写该篇的来由是因为翻阅到了TriCache: A User-Transparent Block Cache Enabling High-Performance Out-of-Core Processing with In-Memory Programs文章,其中对SPDK的运用的炉火纯青&…

数据结构(1)并查集

(4条消息) 第五课、Trie树、并查集、堆和堆排序_yan__kai_的博客-CSDN博客 活动 - AcWing 并查集作用:一群元素将可以归类到一个代表元素上。可以维护元素到根节点的距离。可以维护每个并查集的大小。 基本操作回顾基础课,特别是“食物链”那道题 目录…

【Django项目开发】部门管理模块的开发(八)

文章目录一、模型类设计二、视图设计1.都有哪些接口三、序列化器类设计四.分页操作1.utils工具中定义pagination.py2.视图类中使用五.路由配置一、模型类设计 一个部门下面可能会有很多子部门,一个子部门上面可能会有父部门;即部门内部之间进行关联&…

国科大模式识别与机器学习2022年期末总结

我根据本学期老师说的考试重点和我自身的情况总结的,希望能帮助到你,如有错误欢迎指正 目录第三章 判别函数Fisher线性判别感知机算法第四章 特征选择和提取K-L变换第五章 统计学习学习基础损失函数风险正则化过拟合欠拟合泛化误差第六章 有监督学习有监…

【jQuery】常用API——jQuery内容文本值

要针对元素的内容还有表单的值操作。 普通元素内容 html()&#xff08;相当于原生 inner HTML) html(); // 获取元素的内容html(内容); // 设置元素的内容<script src"../jquery.min.js"></script> </head><body><div><span>我是…

118页4万字智慧检务大数据平台解决方案

【版权声明】本资料来源网络&#xff0c;知识分享&#xff0c;仅供个人学习&#xff0c;请勿商用。【侵删致歉】如有侵权请联系小编&#xff0c;将在收到信息后第一时间删除&#xff01;完整资料领取见文末&#xff0c;部分资料内容&#xff1a; 目录 第1章 前言 1.1、 政策背…