目录
0. 相关文章链接
1. 启动 spark-shell
2. 插入数据
3. 查询数据
3.1. 转换成DF
3.2. 查询
3.3. 时间旅行查询
4. 更新数据
5. 增量查询
5.1. 重新加载数据
5.2. 获取指定beginTime
5.3. 创建增量查询的表
5.4. 查询增量表
6. 指定时间点查询
7. 删除数据
8. 覆盖数据
0. 相关文章链接
Hudi文章汇总
1. 启动 spark-shell
- 启动命令:
#针对Spark 3.2
spark-shell \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
- 设置表名,基本路径和数据生成器(不需要单独的建表。如果表不存在,第一批写表将创建该表):
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
2. 插入数据
新增数据,生成一些数据,将其加载到DataFrame中,然后将DataFrame写入Hudi表。
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
Mode(overwrite)将覆盖重新创建表(如果已存在)。可以检查/tmp/hudi_trps_cow 路径下是否有数据生成。
数据文件的命名规则,源码如下:
3. 查询数据
3.1. 转换成DF
val tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
注意:该表有三级分区(区域/国家/城市),在0.9.0版本以前的hudi,在load中的路径需要按照分区目录拼接"*",如:load(basePath + "/*/*/*/*"),当前版本不需要。
3.2. 查询
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
3.3. 时间旅行查询
Hudi从0.9.0开始就支持时间旅行查询。目前支持三种查询时间格式,如下所示。
spark.read.
format("hudi").
option("as.of.instant", "20210728141108100").
load(basePath)
spark.read.
format("hudi").
option("as.of.instant", "2021-07-28 14:11:08.200").
load(basePath)
// 表示 "as.of.instant = 2021-07-28 00:00:00"
spark.read.
format("hudi").
option("as.of.instant", "2021-07-28").
load(basePath)
4. 更新数据
类似于插入新数据,使用数据生成器生成新数据对历史数据进行更新。将数据加载到DataFrame中并将DataFrame写入Hudi表中。
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
注意:保存模式现在是Append。通常,除非是第一次创建表,否则请始终使用追加模式。现在再次查询数据将显示更新的行程数据。每个写操作都会生成一个用时间戳表示的新提交。查找以前提交中相同的_hoodie_record_keys在该表的_hoodie_commit_time、rider、driver字段中的变化。
查询更新后的数据,要重新加载该hudi表:
val tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath)
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
5. 增量查询
Hudi还提供了增量查询的方式,可以获取从给定提交时间戳以来更改的数据流。需要指定增量查询的beginTime,选择性指定endTime。如果我们希望在给定提交之后进行所有更改,则不需要指定endTime(这是常见的情况)。
5.1. 重新加载数据
spark.
read.
format("hudi").
load(basePath).
createOrReplaceTempView("hudi_trips_snapshot")
5.2. 获取指定beginTime
val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2)
5.3. 创建增量查询的表
val tripsIncrementalDF = spark.read.format("hudi").
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
5.4. 查询增量表
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
这将过滤出beginTime之后提交且fare>20的数据。
利用增量查询,我们能在批处理数据上创建streaming pipelines。
6. 指定时间点查询
查询特定时间点的数据,可以将endTime指向特定时间,beginTime指向000(表示最早提交时间)
- 指定beginTime和endTime
val beginTime = "000"
val endTime = commits(commits.length - 2)
- 根据指定时间创建表
val tripsPointInTimeDF = spark.read.format("hudi").
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
option(END_INSTANTTIME_OPT_KEY, endTime).
load(basePath)
tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
- 查询
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
7. 删除数据
根据传入的HoodieKeys来删除(uuid + partitionpath),只有append模式,才支持删除功能。
- 获取总行数
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
- 取其中2条用来删除
val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
- 将待删除的2条数据构建DF
val deletes = dataGen.generateDeletes(ds.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
- 执行删除
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION_OPT_KEY,"delete").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
- 统计删除数据后的行数,验证删除是否成功
val roAfterDeleteViewDF = spark.
read.
format("hudi").
load(basePath)
roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
// 返回的总行数应该比原来少2行
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
8. 覆盖数据
对于表或分区来说,如果大部分记录在每个周期都发生变化,那么做upsert或merge的效率就很低。我们希望类似hive的 "insert overwrite "操作,以忽略现有数据,只用提供的新数据创建一个提交。
也可以用于某些操作任务,如修复指定的问题分区。我们可以用源文件中的记录对该分区进行'插入覆盖'。对于某些数据源来说,这比还原和重放要快得多。
Insert overwrite操作可能比批量ETL作业的upsert更快,批量ETL作业是每一批次都要重新计算整个目标分区(包括索引、预组合和其他重分区步骤)。
- 查看当前表的key
spark.
read.format("hudi").
load(basePath).
select("uuid","partitionpath").
sort("partitionpath","uuid").
show(100, false)
- 生成一些新的行程数据
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.
read.json(spark.sparkContext.parallelize(inserts, 2)).
filter("partitionpath = 'americas/united_states/san_francisco'")
- 覆盖指定分区
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION.key(),"insert_overwrite").
option(PRECOMBINE_FIELD.key(), "ts").
option(RECORDKEY_FIELD.key(), "uuid").
option(PARTITIONPATH_FIELD.key(), "partitionpath").
option(TBL_NAME.key(), tableName).
mode(Append).
save(basePath)
- 查询覆盖后的key,发生了变化
spark.
read.format("hudi").
load(basePath).
select("uuid","partitionpath").
sort("partitionpath","uuid").
show(100, false)
注:其他Hudi相关文章链接由此进 -> Hudi文章汇总