启动spark-shell
spark-shell \
> --jars /opt/software/hudi-spark3.1-bundle_2.12-0.12.0.jar \
> --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'\
> --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
2
hudi内置数据生成器,生成10条json数据
scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord
val tableName="hudi_trips_cow"
val basePath ="file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
val inserts=convertToStringList(dataGen.generateInserts(10))
3加载到DF,写入hudi,实现简单etl处理
scala> :paste
// Entering paste mode (ctrl-D to finish)
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
4读取存储数据及注册临时表
scala> :paste
// Entering paste mode (ctrl-D to finish)
val tripsSnapshotDF = spark.read.format("hudi").load(basePath + "/*/*/*/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()