数据湖之Hudi基础:入门介绍和编译部署

news2025/1/12 19:55:15

主要记录下Hudi的概述和打包编译等内容,方便参考

文章目录

  • 简介
    • 官网
    • 发展历史
    • Hudi特性
    • 使用场景
  • 安装部署
    • 编译环境准备
  • 编译hudi
    • 1.源码包上传到服务器
    • 2.修改pom文件
    • 3.修改源码兼容hadoop3
    • 4.手动安装kafka依赖(非必须)
    • 5.解决spark模块依赖冲突
    • 6.执行编译
    • 7.测试hudi-client
  • 简单测试编译后spark包可用性

简介

Apache Hudi(Hadoop Upserts Delete and Incremental)是下一代流数据湖平台。Apache Hudi将核心仓库和数据库功能直接引入数据湖。Hudi提供了表、事务、高效的upserts/delete、高级索引、流摄取服务、数据集群/压缩优化和并发,同时保持数据的开源文件格式。

Apache Hudi不仅非常适合于流工作负载,而且还允许创建高效的增量批处理管道。

Apache Hudi可以轻松地在任何云存储平台上使用。Hudi的高级性能优化,使分析工作负载更快的任何流行的查询引擎,包括Apache Spark、Flink、Presto、Trino、Hive等。

在这里插入图片描述
在这里插入图片描述

官网

https://hudi.apache.org/

Apache Hudi(Hadoop Upserts Delete and Incremental)是下一代流数据湖平台。Apache Hudi将核心仓库和数据库功能直接引入数据湖。Hudi提供了表、事务、高效的upserts/delete、高级索引、流摄取服务、数据集群/压缩优化和并发,同时保持数据的开源文件格式。

Apache Hudi不仅非常适合于流工作负载,而且还允许创建高效的增量批处理管道。

Apache Hudi可以轻松地在任何云存储平台上使用。Hudi的高级性能优化,使分析工作负载更快的任何流行的查询引擎,包括Apache Spark、Flink、Presto、Trino、Hive等。

发展历史

2015 年:发表了增量处理的核心思想/原则(O’reilly 文章)。

2016 年:由 Uber 创建并为所有数据库/关键业务提供支持。

2017 年:由 Uber 开源,并支撑 100PB 数据湖。

2018 年:吸引大量使用者,并因云计算普及。

2019 年:成为 ASF 孵化项目,并增加更多平台组件。

2020 年:毕业成为 Apache 顶级项目,社区、下载量、采用率增长超过 10 倍。

2021 年:支持 Uber 500PB 数据湖,SQL DML、Flink 集成、索引、元服务器、缓存。

Hudi特性

  • 可插拔索引机制支持快速Upsert/Delete。

  • 支持增量拉取表变更以进行处理。

  • 支持事务提交及回滚,并发控制。

  • 支持Spark、Presto、Trino、Hive、Flink等引擎的SQL读写。

  • 自动管理小文件,数据聚簇,压缩,清理。

  • 流式摄入,内置CDC源和工具。

  • 内置可扩展存储访问的元数据跟踪。

  • 向后兼容的方式实现表结构变更的支持。

使用场景

近实时写入

  • 减少碎片化工具的使用。

  • CDC 增量导入 RDBMS 数据。

  • 限制小文件的大小和数量。

近实时分析

  • 相对于秒级存储(Druid, OpenTSDB),节省资源。

  • 提供分钟级别时效性,支撑更高效的查询。

  • Hudi作为lib,非常轻量。

增量 pipeline

  • 区分arrivetime和event time处理延迟数据。
  • 更短的调度interval减少端到端延迟(小时 -> 分钟) => Incremental Processing。

增量导出

  • 替代部分Kafka的场景,数据导出到在线服务存储 e.g. ES。

安装部署

hudi是以lib包的形式提供功能,不同版本对spark、flink支持的依赖包不一样,具体要看官网对应版本的版本支持说明

本文会做的测试的环境如下

Linux Centos7

组件版本
Hudi0.12.1
Hadoop3.2.4
Hive3.1.3
Flink1.14 scala-2.12
Spark3.2.2 scala-2.12

hudi官网只提供了源码,需要自己编译

编译环境准备

linux环境编译

部署maven并配置环境变量

这个简单就不贴了

maven版本最好3.6以上别太低

这里贴下指定阿里仓库

修改setting.xml,指定为阿里仓库地址

vim $MAVEN_HOME/conf/settings.xml 

<!-- 添加阿里云镜像-->
<mirror>
        <id>nexus-aliyun</id>
        <mirrorOf>central</mirrorOf>
        <name>Nexus aliyun</name>
        <url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>

编译hudi

1.源码包上传到服务器

源码下载:https://dlcdn.apache.org/hudi/0.12.1/hudi-0.12.1.src.tgz

hudi-0.12.1.src.tgz上传到/opt/software,并解压

tar -zxvf /opt/software/hudi-0.12.1.src.tgz -C /opt/software

2.修改pom文件

vim /opt/software/hudi-0.12.1/pom.xml
  • 新增repository加速依赖下载
<repository>
        <id>nexus-aliyun</id>
        <name>nexus-aliyun</name>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
</repository>
  • 修改Hive/Hadoop依赖的组件版本
<hadoop.version>3.2.4</hadoop.version>
<hive.version>3.1.3</hive.version>

3.修改源码兼容hadoop3

要兼容hadoop3,除了修改版本,还需要修改如下代码:

vim /opt/software/hudi-0.12.1/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java

修改第110行,增加一个入参 null

vim 进入末行模式,然后输入set nu回车就可以看到行号

110行原先代码try (FSDataOutputStream outputStream = new FSDataOutputStream(baos)) {

修改后:try (FSDataOutputStream outputStream = new FSDataOutputStream(baos, null)) {

110     try (FSDataOutputStream outputStream = new FSDataOutputStream(baos, null)) {
111       try (HoodieParquetStreamWriter<IndexedRecord> parquetWriter = new HoodieParquetStreamWriter<>(outputStream, avroParquetConfig)) {
112         for (IndexedRecord record : records) {
113           String recordKey = getRecordKey(record).orElse(null);
114           parquetWriter.writeAvro(recordKey, record);
115         }
116         outputStream.flush();
117       }
118     }

4.手动安装kafka依赖(非必须)

0.12.0似乎是有这个问题,我这0.12.1编译没有这个问题

如果编译报错:common-utils-5.3.4.jarcommon-config-5.3.4.jarkafka-avro-serializer-5.3.4.jarkafka-schema-registry-client-5.3.4.jar这几个jar找不到,那就要单独下载并install到你的maven仓库。

下载地址:http://packages.confluent.io/archive/5.3/confluent-5.3.4-2.12.zip

解压后找到以上报错找不到的jar包,上传服务器,并install

mvn install:install-file -DgroupId=io.confluent -DartifactId=common-config -Dversion=5.3.4 -Dpackaging=jar -Dfile=./common-config-5.3.4.jar
mvn install:install-file -DgroupId=io.confluent -DartifactId=common-utils -Dversion=5.3.4 -Dpackaging=jar -Dfile=./common-utils-5.3.4.jar
mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-avro-serializer -Dversion=5.3.4 -Dpackaging=jar -Dfile=./kafka-avro-serializer-5.3.4.jar
mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=5.3.4 -Dpackaging=jar -Dfile=./kafka-schema-registry-client-5.3.4.jar

5.解决spark模块依赖冲突

修改了Hive版本为3.1.3,其携带的jetty是0.9.3,hudi本身用的jetty是0.9.4,存在依赖冲突。

不改可以编译通过,但是运行spark向hudi里插入数据会报错

个人测试是编译OK,但是执行插入数据就报如下错误

java.lang.NoSuchMethodError: org.apache.hudi.org.apache.jetty.server.session.SessionHandler.setHttpOnly(Z)V
  • 修改hudi-spark-bundlepom文件,排除低版本jetty,添加hudi指定版本的jetty

    vim /opt/software/hudi-0.12.1/packaging/hudi-spark-bundle/pom.xml

    大概369行位置开始的hive-servicehive-jdbchive-metastorehive-common

    增加下方的<exclusions>...</exclusions>部分

<!-- Hive -->
    <dependency>
      <groupId>${hive.groupid}</groupId>
      <artifactId>hive-service</artifactId>
      <version>${hive.version}</version>
      <scope>${spark.bundle.hive.scope}</scope>
      <exclusions>
        <exclusion>
          <artifactId>guava</artifactId>
          <groupId>com.google.guava</groupId>
        </exclusion>
        <exclusion>
          <groupId>org.eclipse.jetty</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.pentaho</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

 <dependency>
      <groupId>${hive.groupid}</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>${hive.version}</version>
      <scope>${spark.bundle.hive.scope}</scope>
      <exclusions>
        <exclusion>
          <groupId>javax.servlet</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>javax.servlet.jsp</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.eclipse.jetty</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

 <dependency>
      <groupId>${hive.groupid}</groupId>
      <artifactId>hive-metastore</artifactId>
      <version>${hive.version}</version>
      <scope>${spark.bundle.hive.scope}</scope>
      <exclusions>
        <exclusion>
          <groupId>javax.servlet</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.datanucleus</groupId>
          <artifactId>datanucleus-core</artifactId>
        </exclusion>
        <exclusion>
          <groupId>javax.servlet.jsp</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <artifactId>guava</artifactId>
          <groupId>com.google.guava</groupId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
        <groupId>${hive.groupid}</groupId>
        <artifactId>hive-common</artifactId>
        <version>${hive.version}</version>
        <scope>${spark.bundle.hive.scope}</scope>
        <exclusions>
            <exclusion>
                <groupId>org.eclipse.jetty.orbit</groupId>
                <artifactId>javax.servlet</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.eclipse.jetty</groupId>
                <artifactId>*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

在此文件增加依赖

<!-- 增加hudi配置版本的jetty -->
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-server</artifactId>
      <version>${jetty.version}</version>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-util</artifactId>
      <version>${jetty.version}</version>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-webapp</artifactId>
      <version>${jetty.version}</version>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-http</artifactId>
      <version>${jetty.version}</version>
    </dependency>
  • 修改hudi-utilities-bundlepom文件,排除低版本jetty,添加hudi指定版本的jetty

    解决的是:使用DeltaStreamer工具向hudi表插入数据时,也会报Jetty的错误使用DeltaStreamer工具向hudi表插入数据时,也会报Jetty的错误

vim /opt/software/hudi-0.12.1/packaging/hudi-utilities-bundle/pom.xml

hudi依赖相关:搜索找到hudi-common位置

hudi-0.12.1中,此包使用的是maven-shade-plugin插件进行include hudi相关依赖,故我们也是用相同方式进行exclude

在的下一级(与同级)增加

<excludes>
    <exclude>org.eclipse.jetty:*</exclude>
</excludes>

另外:如果是hudi-0.12.0版本,可能不是使用maven-shade-plugin插件进行include hudi相关依赖,而使用的是正常depency的依赖引入,那么需要做如下几个依赖的exclude

hudi-commonhudi-client-common增加exclude项【hudi-0.12.1不用此操作】

<exclusions>
        <exclusion>
          <groupId>org.eclipse.jetty</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>

Hive依赖相关:搜索hive-service的依赖位置,对如下几个依赖进行处理

hive-service增加exclude项

<exclusions>
		<exclusion>
          <artifactId>servlet-api</artifactId>
          <groupId>javax.servlet</groupId>
        </exclusion>
        <exclusion>
          <artifactId>guava</artifactId>
          <groupId>com.google.guava</groupId>
        </exclusion>
        <exclusion>
          <groupId>org.eclipse.jetty</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.pentaho</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>

hive-jdbc增加exclude

	<exclusions>
        <exclusion>
          <groupId>javax.servlet</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>javax.servlet.jsp</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.eclipse.jetty</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>

hive-metastore增加exclude项

<exclusions>
        <exclusion>
          <groupId>javax.servlet</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.datanucleus</groupId>
          <artifactId>datanucleus-core</artifactId>
        </exclusion>
        <exclusion>
          <groupId>javax.servlet.jsp</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <artifactId>guava</artifactId>
          <groupId>com.google.guava</groupId>
        </exclusion>
      </exclusions>

hive-common

<exclusions>
        <exclusion>
          <groupId>org.eclipse.jetty.orbit</groupId>
          <artifactId>javax.servlet</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.eclipse.jetty</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>

增加jetty单独依赖

<!-- 增加hudi配置版本的jetty -->
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-server</artifactId>
      <version>${jetty.version}</version>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-util</artifactId>
      <version>${jetty.version}</version>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-webapp</artifactId>
      <version>${jetty.version}</version>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-http</artifactId>
      <version>${jetty.version}</version>
    </dependency>

6.执行编译

mvn clean package -DskipTests -Dspark3.2 -Dflink1.14 -Dscala-2.12 -Dhadoop.version=3.2.4 -Pflink-bundle-shade-hive3

编译完成后,相关的包在packaging目录的各个模块中的target里:

[root@m1 packaging]# pwd
/opt/software/hudi-0.12.1/packaging
[root@m1 packaging]# ls
hudi-aws-bundle           hudi-hadoop-mr-bundle      hudi-presto-bundle           hudi-utilities-bundle
hudi-datahub-sync-bundle  hudi-hive-sync-bundle      hudi-spark-bundle            hudi-utilities-slim-bundle
hudi-flink-bundle         hudi-integ-test-bundle     hudi-timeline-server-bundle  README.md
hudi-gcp-bundle           hudi-kafka-connect-bundle  hudi-trino-bundle

7.测试hudi-client

/opt/software/hudi-0.12.1/hudi-cli/hudi-cli.sh

出现如下即OK

Main called
===================================================================
*         ___                          ___                        *
*        /\__\          ___           /\  \           ___         *
*       / /  /         /\__\         /  \  \         /\  \        *
*      / /__/         / /  /        / /\ \  \        \ \  \       *
*     /  \  \ ___    / /  /        / /  \ \__\       /  \__\      *
*    / /\ \  /\__\  / /__/  ___   / /__/ \ |__|     / /\/__/      *
*    \/  \ \/ /  /  \ \  \ /\__\  \ \  \ / /  /  /\/ /  /         *
*         \  /  /    \ \  / /  /   \ \  / /  /   \  /__/          *
*         / /  /      \ \/ /  /     \ \/ /  /     \ \__\          *
*        / /  /        \  /  /       \  /  /       \/__/          *
*        \/__/          \/__/         \/__/    Apache Hudi CLI    *
*                                                                 *
===================================================================
10137 [main] INFO  org.apache.hudi.cli.Main [] - Starting Main v0.12.1 using Java 1.8.0_181 on m1 with PID 6681 (/opt
/software/hudi-0.12.1/hudi-cli/target/hudi-cli-0.12.1.jar started by root in /opt/software/hudi-0.12.1/packaging)
10145 [main] INFO  org.apache.hudi.cli.Main [] - No active profile set, falling back to 1 default profile: "default"
Table command getting loaded
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/software/hudi-0.12.1/hudi-cli/target/lib/log4j-slf4j-impl-2.17.2.jar!/org/slf4
j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/software/hudi-0.12.1/hudi-cli/target/lib/slf4j-reload4j-1.7.35.jar!/org/slf4j/
impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
11689 [main] WARN  org.jline [] - The Parser of class org.springframework.shell.jline.ExtendedDefaultParser does not 
support the CompletingParsedLine interface. Completion with escaped or quoted words won't work correctly.
11768 [main] INFO  org.apache.hudi.cli.Main [] - Started Main in 2.031 seconds (JVM running for 11.806)
hudi->

简单测试编译后spark包可用性

需要有hadoop环境和spark

1.部署好hadoop集群、spark组件

这里不过多赘述如何安装这俩,spark只需要解压就行

spark下载:https://dlcdn.apache.org/spark/spark-3.2.2/spark-3.2.2-bin-hadoop3.2.tgz

2.拷贝编译好的包到spark的jars目录

cp /opt/software/hudi-0.12.1/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.12.1.jar /opt/module/spark-3.2.2/jars

3.启动hadoop

4.spark-shell方式测试

  • 启动spark-shell

    spark-shell \
      --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
      --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
      --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
    
  • 执行如下scala代码

    // 设置表名,基本路径和数据生成器
    import org.apache.hudi.QuickstartUtils._
    import scala.collection.JavaConversions._
    import org.apache.spark.sql.SaveMode._
    import org.apache.hudi.DataSourceReadOptions._
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._
    
    val tableName = "hudi_trips_cow"
    val basePath = "file:///tmp/hudi_trips_cow"
    val dataGen = new DataGenerator
    
    // 插入数据
    val inserts = convertToStringList(dataGen.generateInserts(10))
    val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
    df.write.format("hudi").
      options(getQuickstartWriteConfigs).
      option(PRECOMBINE_FIELD_OPT_KEY, "ts").
      option(RECORDKEY_FIELD_OPT_KEY, "uuid").
      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
      option(TABLE_NAME, tableName).
      mode(Overwrite).
      save(basePath)
    
    // 查询数据
    val tripsSnapshotDF = spark.
      read.
      format("hudi").
      load(basePath)
    tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
    spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
    

    也可以去/tmp/hudi_trips_cow/目录下查看是否有数据文件

  • 执行示例

    [root@m3 spark3]# bin/spark-shell   --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'   --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'   --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    2023-01-16 01:40:40,221 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Spark context Web UI available at http://m3:4040
    Spark context available as 'sc' (master = local[*], app id = local-1673851241535).
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 3.2.2
          /_/
    
    Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala> import org.apache.hudi.QuickstartUtils._
    import org.apache.hudi.QuickstartUtils._
    
    scala> import scala.collection.JavaConversions._
    import scala.collection.JavaConversions._
    
    scala> import org.apache.spark.sql.SaveMode._
    import org.apache.spark.sql.SaveMode._
    
    scala> import org.apache.hudi.DataSourceReadOptions._
    import org.apache.hudi.DataSourceReadOptions._
    
    scala> import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.DataSourceWriteOptions._
    
    scala> import org.apache.hudi.config.HoodieWriteConfig._
    import org.apache.hudi.config.HoodieWriteConfig._
    
    scala> val tableName = "hudi_trips_cow"
    tableName: String = hudi_trips_cow
    
    scala> val basePath = "file:///tmp/hudi_trips_cow"
    basePath: String = file:///tmp/hudi_trips_cow
    
    scala> val dataGen = new DataGenerator
    dataGen: org.apache.hudi.QuickstartUtils.DataGenerator = org.apache.hudi.QuickstartUtils$DataGenerator@2b2bcb4a
    
    scala> val inserts = convertToStringList(dataGen.generateInserts(10))
    inserts: java.util.List[String] = [{"ts": 1673839191417, "uuid": "0b652f6a-1349-444e-8442-976fc149b589", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.4726905879569653, "begin_lon": 0.46157858450465483, "end_lat": 0.754803407008858, "end_lon": 0.9671159942018241, "fare": 34.158284716382845, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 1673565741975, "uuid": "20e0932b-5baa-4c74-a423-8e72a3c1dcef", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.6100070562136587, "begin_lon": 0.8779402295427752, "end_lat": 0.3407870505929602, "end_lon": 0.5030798142293655, "fare": 43.4923811219014, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 1673405567377, "uuid": "2417e9e6-c5a7-4399-b7a1-4b6e2fb90372", "rider": "rider-213", "driver"...
    
    scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
    warning: one deprecation (since 2.12.0)
    warning: one deprecation (since 2.2.0)
    warning: two deprecations in total; for details, enable `:setting -deprecation' or `:replay -deprecation'
    df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields]
    
    scala> df.write.format("hudi").
         |   options(getQuickstartWriteConfigs).
         |   option(PRECOMBINE_FIELD_OPT_KEY, "ts").
         |   option(RECORDKEY_FIELD_OPT_KEY, "uuid").
         |   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
         |   option(TABLE_NAME, tableName).
         |   mode(Overwrite).
         |   save(basePath)
    warning: one deprecation; for details, enable `:setting -deprecation' or `:replay -deprecation'
    2023-01-16 01:41:57,422 WARN config.DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
    2023-01-16 01:41:57,443 WARN config.DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
    2023-01-16 01:41:57,471 WARN hudi.HoodieSparkSqlWriter$: hoodie table at file:/tmp/hudi_trips_cow already exists. Deleting existing data & overwriting with new data.
    2023-01-16 01:41:58,404 WARN metadata.HoodieBackedTableMetadata: Metadata table was not found at path file:/tmp/hudi_trips_cow/.hoodie/metadata
    
    scala> val tripsSnapshotDF = spark.
         |   read.
         |   format("hudi").
         |   load(basePath)
    tripsSnapshotDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 13 more fields]
    
    scala> tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
    
    scala> spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
    +------------------+-------------------+-------------------+-------------+
    |              fare|          begin_lon|          begin_lat|           ts|
    +------------------+-------------------+-------------------+-------------+
    | 64.27696295884016| 0.4923479652912024| 0.5731835407930634|1673405567377|
    | 93.56018115236618|0.14285051259466197|0.21624150367601136|1673491795155|
    | 27.79478688582596| 0.6273212202489661|0.11488393157088261|1673772916404|
    | 33.92216483948643| 0.9694586417848392| 0.1856488085068272|1673347004963|
    |34.158284716382845|0.46157858450465483| 0.4726905879569653|1673839191417|
    | 66.62084366450246|0.03844104444445928| 0.0750588760043035|1673613988097|
    |  43.4923811219014| 0.8779402295427752| 0.6100070562136587|1673565741975|
    | 41.06290929046368| 0.8192868687714224|  0.651058505660742|1673298206656|
    +------------------+-------------------+-------------------+-------------+
    
    

如果插入时报错:java.lang.NoSuchMethodError: org.apache.hudi.org.apache.jetty.server.session.SessionHandler.setHttpOnly(Z)V

去看下上文:解决spark依赖冲突小节解决

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/166650.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【基础篇】4 # 链表(上):如何实现LRU缓存淘汰算法?

说明 【数据结构与算法之美】专栏学习笔记 链表结构 数组需要一块连续的内存空间来存储&#xff0c;对内存的要求比较高&#xff0c; 而链表并不需要一块连续的内存空间&#xff0c;它通过指针将一组零散的内存块串联起来使用。 结点&#xff1a;指的是内存块后继指针 next…

Postgresql源码(98)lex与yacc的定制交互方式

1 背景知识一&#xff1a;LEX %option prefix Postgresql中使用%option prefix"core_yy"&#xff0c;影响范围&#xff1a;yy_create_buffer,yy_delete_buffer,yy_flex_debug,yy_init_buffer,yy_flush_buffer,yy_load_buffer_state,yy_switch_to_buffer,yyin,yyleng…

【并发编程十一】c++线程同步——future

【并发编程十一】c线程同步——future一、互斥二、条件变量三、future1、promise1.1、子线程设值&#xff0c;主线程获取1.2、主线程设置值&#xff0c;子线程获取1.3、shared_future2、async2.1、不开新线程的async2.2、开新线程的async3、packaged_task3.1、不使用bind3.2、提…

Kafka-概述

一、Kafka是什么 1.定义 Apache Kafka 是一款开源的消息引擎系统。 消息引擎系统是一组规范。企业利用这组规范在不同系统之间传递语义准确的消息&#xff0c;实现松耦合的异步式数据传递。 二、消息队列的使用场景 传统消息队列的应用场景包括 缓存/削峰、解耦、异步通信 …

vue(透传属性,$attrs)

官方文档 https://cn.vuejs.org/guide/components/attrs.html 案例 <FirstLevel class"attr-test-class" name"zs" age"18"></FirstLevel>FirstLevel组件没有用props去申明name和age&#xff0c;所以这两个属性会透视传递。 <…

RT-Thread系列--组件初始化

一、目的RT-Thread里面有个特别有意思的软件设计叫做组件自动初始化。有些小伙伴可能是第一次听说&#xff0c;所以这边我解释一下&#xff0c;请看下面的代码片段static void clock_init() {// 时钟初始化 } static void uart_init() {// 串口初始化 } static void i2c_init()…

SpringBoot自定义拦截器

&#x1f341;博客主页&#xff1a;&#x1f449;不会压弯的小飞侠 ✨欢迎关注&#xff1a;&#x1f449;点赞&#x1f44d;收藏⭐留言✒ ✨系列专栏&#xff1a;&#x1f449;SpringBoot专栏 &#x1f525;欢迎大佬指正&#xff0c;一起学习&#xff01;一起加油&#xff01; …

JavaScript 浏览器的重排和重绘

文章目录JavaScript 浏览器的重排和重绘概述浏览器解析过程重排重绘优化将多次改变样式的属性操作合并为一次需要多次重排的元素设置为绝对定位减少DOM操作复杂元素处理先设置display为none处理完后再显示缓存频繁操作的属性减少使用table布局使用事件委托绑定事件处理程序利用…

上海亚商投顾:沪指重返3200点 牛市旗手回归!

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。市场情绪三大指数今日继续走强&#xff0c;沪指重返3200点上方&#xff0c;创业板指午后一度涨近3%&#xff0c;随后涨幅有所…

2023.1. Stimulsoft 报告和仪表板的新版本:Crack

2023.1. Stimulsoft 报告和仪表板的新版本。 发布时间&#xff1a;2022 年 12 月 9 日 我们很高兴地宣布发布 Stimulsoft Reports and Dashboards 2023.1 版&#xff01;我们为 .NET Core 组件添加了对Razor Pages的支持&#xff0c;为PHP和Blazor平台更新了组件。此外&#x…

【Linux】基础:进程间通信

【Linux】基础&#xff1a;进程间通信 摘要&#xff1a;本文主要介绍进程间通信的基础知识&#xff0c;首先将会对进程间通信进行简单概述&#xff0c;其中包括本质目的和方法分类。再介绍对于方法的实现过程&#xff0c;其中有三大类方法&#xff08;管道、System V、POSIX&am…

Kotlin 中变量,类型,表达式,函数详解

博主前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住也分享一下给大家 &#x1f449;点击跳转到教程 一、变量&#xff0c;编译时变量 1、要声明可修改变量&#xff0c;使用var关键字。 2、要声明只读变量&#xff0c;使用…

35岁高龄程序员的 4 条出路,提早布局,避免出局!

目录 一、40岁回首往事&#xff1a;自己竟没有任何核心优势二、公司遇到危机时40岁大龄程序员会怎么样三、适合大龄程序员的几条职业发展路线四、最后的寄语 这篇文章&#xff0c;给大家聊聊Java工程师的职业发展规划的一些思考&#xff0c;同时也给不少20多岁、30多岁&#…

Spark 运行架构

文章目录Spark 运行架构一、运行架构二、核心组件1、Driver2、Executor3、Master & Worker4、ApplicationMaster三、核心概念1、Exuecutor 和 Core2、并行度&#xff08;Parallelism&#xff09;3、有向无环图&#xff08;DAG&#xff09;4、提交流程Yarn Client 模式Spark…

Spring Cloud Gateway(黑马springcloud笔记)

Gateway 目录Gateway一、为什么需要网关二、gateway入门三、断言工厂四、过滤器工厂五、全局过滤1. 实现2. 过滤器执行顺序六、跨域问题一、为什么需要网关 不能让外部能够直接访问微服务&#xff0c;而是需要通过网关访问&#xff1a; 网关的作用&#xff1a; 身份认证和权限…

数据结构与算法基础(王卓)(8):线性表的应用(并集和有序表合并)

PPT&#xff1a;第二章P173&#xff1b; 并集集合&#xff1a;线性表的合并&#xff08;无需有序&#xff0c;不能重复&#xff09; 线性表&#xff1a; Status Union(Sqlist& A, Sqlist& B)//并集 {int len_A A.length;int len_B B.length;for (int i 1; i < …

研究生如何能(较快)找出某领域(去噪)已有算法的创新点或者引入其他领域的新算法?

广义上说&#xff0c;滤波就是给不同的信号分量分配不同的权重&#xff0c;较为复杂的维纳滤波, 是根据信号的统计量设计权重。狭义上说&#xff0c;降噪/去噪&#xff0c;可以看成滤波的一种。降噪的目的在于突出信号本身而抑制噪声影响。从这个角度&#xff0c;降噪就是给信号…

C/C++ 调用规则

平栈&#xff1a;清理参数对调用栈的操作步骤&#xff1a;参数传递三种调用约定&#xff1a;cdecl &#xff08;C调用约定&#xff09;:从右往左传参&#xff0c;参数通过栈传递&#xff0c;调用方(caller)负责平参&#xff08;支持类似printf的不定参&#xff09;stdcall (标准…

hadoop简介

文章目录1&#xff1a;hadoop简介2&#xff1a;Hadoop系统2.1&#xff1a;hadoop架构1&#xff1a;MapReduce2&#xff1a;YARN架构3&#xff1a;HDFS2.2&#xff1a;HDFS、YARN、MapReduce三者关系1&#xff1a;hadoop简介 Hadoop是一个由Apache基金会所开发的分布式系统基础…

如何快速删除CSV、Excel、Markdown表格的重复行?

如果你正在使用 CSV、Excel 或 Markdown 表格&#xff0c;你可能会遇到重复行的问题。这可能是因为你手动输入了重复的数据&#xff0c;或者是因为你从其他源导入了重复的数据。无论原因是什么&#xff0c;删除重复行是一项重要的数据清理任务。本文将向你展示如何使用几种不同…