[一]使用Spark-shell
1-配置hudi Jar包
[root@cdp73-1 ~]# for i in $(seq 1 6); do scp /opt/software/hudi-1.0.0/packaging/hudi-spark-bundle/target/hudi-spark3.4-bundle_2.12-1.0.0.jar cdp73-$i:/opt/cloudera/parcels/CDH/lib/spark3/jars/; done
hudi-spark3.4-bundle_2.12-1.0.0.jar 100% 105MB 418.2MB/s 00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar 100% 105MB 304.8MB/s 00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar 100% 105MB 365.0MB/s 00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar 100% 105MB 406.1MB/s 00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar 100% 105MB 472.7MB/s 00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar 100% 105MB 447.1MB/s 00:00
[root@cdp73-1 ~]#
2-进入Spark-shell
spark-shell --packages org.apache.hudi:hudi-spark3.4-bundle_2.12:1.0.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
3-初始化项目
// spark-shell
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import spark.implicits._
val tableName = "trips_table"
val basePath = "hdfs:///tmp/trips_table"
4-创建表
首次提交将自动初始化表,如果指定的基本路径中尚不存在该表。
5-出入数据
// spark-shell
val columns = Seq("ts","uuid","rider","driver","fare","city")
val data =
Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
(1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),
(1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),
(1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo" ),
(1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"));
var inserts = spark.createDataFrame(data).toDF(columns:_*)
inserts.write.format("hudi").
option("hoodie.datasource.write.partitionpath.field", "city").
option("hoodie.embed.timeline.server", "false").
option("hoodie.table.name", tableName).
mode(Overwrite).
save(basePath)
【映射到Hudi写操作】Hudi提供了多种写操作——包括批量和增量写操作——以将数据写入Hudi表,这些操作具有不同的语义和性能。当未配置记录键(请参见下面的键)时,将选择bulk_insert作为写操作,这与Spark的Parquet数据源的非默认行为相匹配。
6-查询数据
// spark-shell
val tripsDF = spark.read.format("hudi").load(basePath)
tripsDF.createOrReplaceTempView("trips_table")
spark.sql("SELECT uuid, fare, ts, rider, driver, city FROM trips_table WHERE fare > 20.0").show()
spark.sql("SELECT _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare FROM trips_table").show()
7-更新数据
// Lets read data from target Hudi table, modify fare column for rider-D and update it.
val updatesDf = spark.read.format("hudi").load(basePath).filter($"rider" === "rider-D").withColumn("fare", col("fare") * 10)
updatesDf.write.format("hudi").
option("hoodie.datasource.write.operation", "upsert").
option("hoodie.embed.timeline.server", "false").
option("hoodie.datasource.write.partitionpath.field", "city").
option("hoodie.table.name", tableName).
mode(Append).
save(basePath)
8-合并数据
// spark-shell
val adjustedFareDF = spark.read.format("hudi").
load(basePath).limit(2).
withColumn("fare", col("fare") * 10)
adjustedFareDF.write.format("hudi").
option("hoodie.datasource.write.payload.class","com.payloads.CustomMergeIntoConnector").
mode(Append).
save(basePath)
// Notice Fare column has been updated but all other columns remain intact.
spark.read.format("hudi").load(basePath).show()