Hudi-IDEA编程

news2025/1/16 0:24:08

项目

一、Hudi+Spark+Kafka(Scala)

配置详见【1.Scala配置】

依赖详见【1.Hudi+Spark+Kafka依赖】

1-1 构建SparkSession对象

  def main(args: Array[String]): Unit = {
    //1.构建SparkSession对象
    val spark: SparkSession = SparkUtils.createSparkSession(this.getClass);
    //2.从Kafka实时消费数据
    val kafkaStreamDF: DataFrame = readFromKafka(spark, "order-topic")
    //3.提取数据,转换数据类型
    val streamDF: DataFrame = process(kafkaStreamDF);
    //4.保存数据至Hudi表中:MOR(读取时保存)
    saveToHudi(streamDF);
    //5.流式应用启动以后,等待终止
    spark.streams.active.foreach(query => println(s"Query: ${query.name} is Running ............."))
    spark.streams.awaitAnyTermination()
  }

1-2 从Kafka/CSV文件读取数据

  /**
   * 指定Kafka topic名称,实时消费数据
   *
   * @param spark
   * @param topicName
   * @return
   */
  def readFromKafka(spark: SparkSession, topicName: String): DataFrame = {
    spark.readStream
      .format("kafka") //指定Kafka
      .option("kafka.bootstrap.servers", "node1.itcast.cn:9099") //指定Kafka的服务IP和端口
      .option("subscribe", topicName) //订阅Kafka的topic的名称
      .option("startingOffsets", "latest") //从最新消费
      .option("maxOffsetsPerTrigger", 100000) //每次最多处理10万条数据
      .option("failOnDataLoss", value = false) //如果数据丢失是否失败
      .load()
  }

  /**
   * 读取CSV格式文本文件数据,封装到DataFrame数据集
   */
  def readCsvFile(spark: SparkSession, path: String): DataFrame = {
    spark.read
      // 设置分隔符为\t
      .option("sep", "\\t")
      // 文件首行为列名称
      .option("header", "true")
      // 依据数值自动推断数据类型
      .option("inferSchema", "true")
      // 指定文件路径
      .csv(path)
  }

1-3 ETL转换后存储至Hudi表中

  /**
   * 对Kafka获取数据,进行转换操作,获取所有字段的值,转换为String,以便保存到Hudi表
   * @param streamDF
   * @return
   */
  def process(streamDF: DataFrame): DataFrame = {
    streamDF
      //选择字段
      .selectExpr(
        "CAST(key AS STRING) order_id",
        "CAST(value AS STRING) AS message",
        "topic","partition","offset","timestamp"
      )
      //解析message数据,提取字段值
      .withColumn("user_id",get_json_object(col("message"),"$.userId"))
      .withColumn("order_time",get_json_object(col("message"),"$.orderTime"))
      .withColumn("ip",get_json_object(col("message"),"$.ip"))
      .withColumn("order_money",get_json_object(col("message"),"$.orderMoney"))
      .withColumn("order_status",get_json_object(col("message"),"$.orderStatus"))
      //删除message字段
      .drop(col("message"))
      //转换订单日期时间格式为Long类型,作为Hudi表中合并数据字段
      .withColumn("ts",to_timestamp(col("order_time"),"yyyy-MM-dd HH:mm:ss.SSS"))
      //订单日期时间提取分区日期:yyyyMMdd
      .withColumn("day",substring(col("order_time"),0,10))
  }

  /**
   * 将流式数据集DataFrame保存至Hudi表,表类型可选:COW和MOR
   */
  def saveToHudi(streamDF: DataFrame): Unit = {
    streamDF.writeStream
      .outputMode(OutputMode.Append())
      .queryName("query-hudi-streaming")
      // 针对每微批次数据保存
      .foreachBatch((batchDF: Dataset[Row], batchId: Long) => {
        println(s"============== BatchId: ${batchId} start ==============")
        writeHudiMor(batchDF) // TODO:表的类型MOR
      })
      .option("checkpointLocation", "/datas/hudi-spark/struct-ckpt-1001")
      .start()
  }

  /**
   * 将数据集DataFrame保存到Hudi表中,表的类型:MOR(读取时合并)
   */
  def writeHudiMor(dataframe: DataFrame): Unit = {
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._
    import org.apache.hudi.keygen.constant.KeyGeneratorOptions._

    dataframe.write
      .format("hudi")
      .mode(SaveMode.Append)
      // 表的名称
      .option(TBL_NAME.key, "tbl_hudi_order")
      // 设置表的类型
      .option(TABLE_TYPE.key(), "MERGE_ON_READ")
      // 每条数据主键字段名称
      .option(RECORDKEY_FIELD_NAME.key(), "order_id")
      // 数据合并时,依据时间字段
      .option(PRECOMBINE_FIELD_NAME.key(), "ts")
      // 分区字段名称
      .option(PARTITIONPATH_FIELD_NAME.key(), "day")
      // 分区值对应目录格式,是否与Hive分区策略一致
      .option(HIVE_STYLE_PARTITIONING_ENABLE.key(), "true")
      // 插入数据,产生shuffle时,分区数目
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      // 表数据存储路径
      .save("/hudi-warehouse/tbl_hudi_order")
  }

1-4 SparkSQL加载Hudi表数据并分析

  /**
   * 从Hudi表加载数据,指定数据存在路径
   */
  def readFromHudi(spark: SparkSession, path: String): DataFrame = {
    // a. 指定路径,加载数据,封装至DataFrame
    val didiDF: DataFrame = spark.read.format("hudi").load(path);

    // b. 选择字段
    didiDF
      // 选择字段
      .select(
        "order_id", "product_id", "type", "traffic_type", 
        "pre_total_fee", "start_dest_distance", "departure_time" 
      )
  }

  /**
   * 订单类型统计,字段:product_id
   */
  def reportProduct(dataframe: DataFrame): Unit = {
    val reportDF: DataFrame = dataframe.groupBy("product_id").count();

    val to_name = udf(
      (product_id: Int) => {
        product_id match {
          case 1 => "滴滴专车"
          case 2 => "滴滴企业专车"
          case 3 => "滴滴快车"
          case 4 => "滴滴企业快车"
        }
      }
    )
    val resultDF: DataFrame = reportDF.select(
      to_name(col("product_id")).as("order_type"), //
      col("count").as("total") //
    )
    resultDF.printSchema();
    resultDF.show(10, truncate = false);
  }

二、Hudi+Flink+Kafka(Java)

依赖详见【2.Hudi+Flink+Kafka依赖】

2-1 从Kafka消费数据

第1步获取表执行环境无需赘述。

第2步创建输入表:指定了Kafka的服务IP和端口、topic等信息,从这里读取数据

第3步中转换数据为Hudi表中需要的格式(添加两个必须字段:数据合并字段ts,分区字段partition_day)

package cn.itcast.hudi;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class FlinkSQLKafkaDemo {

    public static void main(String[] args) {
        //1.获取表执行环境
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode() //流式
                .build();
        TableEnvironment tableEnvironment = TableEnvironment.create(settings);
        //2.创建输入表:从Kafka消费数据
        tableEnvironment.executeSql(
                "CREATE TABLE order_kafka_source (\n" +
                        "  orderId STRING,\n" +
                        "  userId STRING,\n" +
                        "  orderTime STRING,\n" +
                        "  ip STRING,\n" +
                        "  orderMoney DOUBLE,\n" +
                        "  orderStatus INT\n" +
                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +
                        "  'topic' = 'order-topic',\n" +
                        "  'properties.bootstrap.servers' = 'node1.itcast.cn:9099',\n" +
                        "  'properties.group.id' = 'gid-1001',\n" +
                        "  'scan.startup.mode' = 'latest-offset',\n" +
                        "  'format' = 'json',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true'\n" +
                        ")"
        );
        //3.转换数据:可以使用SQL,也可以是Table api
        Table table = tableEnvironment.from("order_kafka_source")
                //添加字段:hudi表数据合并字段,"orderId":"20211122103434136000001" -> 20211122103434136
                .addColumns(
                        $("orderId").substring(0, 17).as("ts")
                )
                //添加字段:hudi表中分区字段,"orderTime":"2021-11-22 10:34:34.136" -> 2021-11-22
                .addColumns(
                        $("orderTime").substring(0, 10).as("partition_day")
                );
        tableEnvironment.createTemporaryView("view_order",table);
        //4.创建输出表:将结果数据输出
        tableEnvironment.executeSql("select * from view_order").print();
    }
}

2-2 将数据输出到hudi表中

第4步创建输出表:指定了输出Hudi表路径(本地路径、Hadoop等)、表类型、数据合并字段、分组字段等,数据输出到这里

第5步将数据插入到输出Hudi表中

package cn.itcast.hudi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;


/**
 * 基于Flink SQL Connector实现:实时消费Topic中数据,转换处理后,实时存储Hudi表中
 */
public class FlinkSQLHudiDemo {

    public static void main(String[] args) {
        //1.获取表执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(5000);//由于增量将数据写入到Hudi表,所以需要启动Flink CheckPoint检查点
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode() //流式
                .build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env,settings);
        //2.创建输入表:从Kafka消费数据
        tableEnvironment.executeSql(
                "CREATE TABLE order_kafka_source (\n" +
                        "  orderId STRING,\n" +
                        "  userId STRING,\n" +
                        "  orderTime STRING,\n" +
                        "  ip STRING,\n" +
                        "  orderMoney DOUBLE,\n" +
                        "  orderStatus INT\n" +
                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +
                        "  'topic' = 'order-topic',\n" +
                        "  'properties.bootstrap.servers' = 'node1.itcast.cn:9099',\n" +
                        "  'properties.group.id' = 'gid-1001',\n" +
                        "  'scan.startup.mode' = 'latest-offset',\n" +
                        "  'format' = 'json',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true'\n" +
                        ")"
        );
        //3.转换数据:可以使用SQL,也可以是Table api
        Table table = tableEnvironment.from("order_kafka_source")
                //添加字段:hudi表数据合并字段,"orderId":"20211122103434136000001" -> 20211122103434136
                .addColumns(
                        $("orderId").substring(0, 17).as("ts")
                )
                //添加字段:hudi表中分区字段,"orderTime":"2021-11-22 10:34:34.136" -> 2021-11-22
                .addColumns(
                        $("orderTime").substring(0, 10).as("partition_day")
                );
        tableEnvironment.createTemporaryView("view_order", table);
        //4.创建输出表:将数据输出到hudi表中
        tableEnvironment.executeSql(
                "CREATE TABLE order_hudi_sink (\n" +
                        "  orderId STRING PRIMARY KEY NOT ENFORCED,\n" +
                        "  userId STRING,\n" +
                        "  orderTime STRING,\n" +
                        "  ip STRING,\n" +
                        "  orderMoney DOUBLE,\n" +
                        "  orderStatus INT,\n" +
                        "  ts STRING,\n" +
                        "  partition_day STRING\n" +
                        ")\n" +
                        "PARTITIONED BY (partition_day) \n" +
                        "WITH (\n" +
                        "  'connector' = 'hudi',\n" +
                        "  'path' = 'file:///D:/flink_hudi_order',\n" +
                        "  'table.type' = 'MERGE_ON_READ',\n" +
                        "  'write.operation' = 'upsert',\n" +
                        "  'hoodie.datasource.write.recordkey.field' = 'orderId',\n" +
                        "  'write.precombine.field' = 'ts',\n" +
                        "  'write.tasks'= '1'\n" +
                        ")"
        );
        // 5.通过子查询方式,将数据写入输出表(注意,字段顺序要一致)
        tableEnvironment.executeSql(
                "INSERT INTO order_hudi_sink\n" +
                        "SELECT\n" +
                        "  orderId, userId, orderTime, ip, orderMoney, orderStatus, ts, partition_day\n" +
                        "FROM view_order"
        );
    }
}

2-3 从hudi表中加载数据

创建输入表,加载Hudi表查询数据即可。

package cn.itcast.hudi;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * 基于Flink SQL Connector实现:从Hudi表中加载数据,编写SQL查询
 */
public class FlinkSQLReadDemo {

    public static void main(String[] args) {
        //1.获取表执行环境
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnvironment = TableEnvironment.create(settings);
        //2.创建输入表,加载Hudi表查询数据
        tableEnvironment.executeSql(
                "CREATE TABLE order_hudi(\n" +
                        "  orderId STRING PRIMARY KEY NOT ENFORCED,\n" +
                        "  userId STRING,\n" +
                        "  orderTime STRING,\n" +
                        "  ip STRING,\n" +
                        "  orderMoney DOUBLE,\n" +
                        "  orderStatus INT,\n" +
                        "  ts STRING,\n" +
                        "  partition_day STRING\n" +
                        ")\n" +
                        "PARTITIONED BY (partition_day)\n" +
                        "WITH (\n" +
                        "  'connector' = 'hudi',\n" +
                        "  'path' = 'file:///D:/flink_hudi_order',\n" +
                        "  'table.type' = 'MERGE_ON_READ',\n" +
                        "  'read.streaming.enabled' = 'true',\n" +
                        "  'read.streaming.check-interval' = '4'\n" +
                        ")"
        );
        //3.执行查询语句,流式读取Hudi数据
        tableEnvironment.executeSql(
                "SELECT orderId, userId, orderTime, ip, orderMoney, orderStatus, ts ,partition_day FROM order_hudi"
        ).print();
    }
}

附:依赖

1.Hudi+Spark+Kafka依赖

<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>http://repository.jboss.com/nexus/content/groups/public</url>
    </repository>
</repositories>

<properties>
    <scala.version>2.12.10</scala.version>
    <scala.binary.version>2.12</scala.binary.version>
    <spark.version>3.0.0</spark.version>
    <hadoop.version>2.7.3</hadoop.version>
    <hudi.version>0.9.0</hudi.version>
</properties>

<dependencies>
    <!-- 依赖Scala语言 -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>

    <!-- Spark Core 依赖 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Spark SQL 依赖 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Structured Streaming + Kafka  依赖 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <!-- Hadoop Client 依赖 -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>

    <!-- hudi-spark3 -->
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-spark3-bundle_2.12</artifactId>
        <version>${hudi.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-avro_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <!-- Spark SQL 与 Hive 集成 依赖 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpcore</artifactId>
        <version>4.4.13</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.12</version>
    </dependency>

</dependencies>

<build>
    <outputDirectory>target/classes</outputDirectory>
    <testOutputDirectory>target/test-classes</testOutputDirectory>
    <resources>
        <resource>
            <directory>${project.basedir}/src/main/resources</directory>
        </resource>
    </resources>
    <!-- Maven 编译的插件 -->
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

2.Hudi+Flink+Kafka依赖

<repositories>
    <repository>
        <id>nexus-aliyun</id>
        <name>Nexus aliyun</name>
        <url>http://maven.aliyun.com/nexus/content/groups/public</url>
    </repository>
    <repository>
        <id>central_maven</id>
        <name>central maven</name>
        <url>https://repo1.maven.org/maven2</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
            <enabled>false</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.binary.version>2.12</scala.binary.version>
    <flink.version>1.12.2</flink.version>
    <hadoop.version>2.7.3</hadoop.version>
    <mysql.version>8.0.16</mysql.version>
</properties>

<dependencies>
    <!-- Flink Client -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Flink Table API & SQL -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-flink-bundle_${scala.binary.version}</artifactId>
        <version>0.9.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop-2-uber</artifactId>
        <version>2.7.5-10.0</version>
    </dependency>

    <!-- MySQL/FastJson/lombok -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.68</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.12</version>
    </dependency>

    <!-- slf4j及log4j -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>

</dependencies>

<build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <testSourceDirectory>src/test/java</testSourceDirectory>
    <plugins>
        <!-- 编译插件 -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.5.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <!--<encoding>${project.build.sourceEncoding}</encoding>-->
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.18.1</version>
            <configuration>
                <useFile>false</useFile>
                <disableXmlReport>true</disableXmlReport>
                <includes>
                    <include>**/*Test.*</include>
                    <include>**/*Suite.*</include>
                </includes>
            </configuration>
        </plugin>
        <!-- 打jar包插件(会包含所有依赖) -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!-- <mainClass>com.itcast.flink.batch.FlinkBatchWordCount</mainClass> -->
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

附:报错

1.运行报错

【报错代码】

Could not locate executable null\bin\winutils.exe in the Hadoop binaries.

【原因】

windows下运行时需要安装Windows下运行的支持插件:hadoop2.7-common-bin

网址:https://gitcode.net/mirrors/cdarlint/winutils?utm_source=csdn_github_accelerator

选择需要版本的包下载,配置环境变量HADOOP_HOME和path,重启idea再运行就不会报错了

cd hudi/server/hadoop

./bin/hadoop checknative

2.运行报错

【报错】

NoSuchFieldError: INSTANCE

【原因】

由于代码中的httpclient和httpcore版本过高, 而hadoop中的版本过低导致(<4.3)

【解决】

将&HADOOP_HOME/share/hadoop/common/lib 下和 &HADOOP_HOME/share/hadoop/tools/lib/下的httpclient和httpcore替换成高版本(>4.3)

cd /home/zhangheng/hudi/server/hadoop/share/hadoop/common/lib
rm httpclient-4.2.5.jar
rm httpcore-4.2.5.jar
cd /home/zhangheng/hudi/server/hadoop/share/hadoop/tools/lib
rm httpclient-4.2.5.jar
rm httpcore-4.2.5.jar
scp -r D:\Users\zh\Desktop\Hudi\compressedPackage\httpclient-4.4.jar zhangheng@10.8.4.212:/home/zhangheng/hudi/server/hadoop/share/hadoop/common/lib
scp -r D:\Users\zh\Desktop\Hudi\compressedPackage\httpcore-4.4.jar zhangheng@10.8.4.212:/home/zhangheng/hudi/server/hadoop/share/hadoop/common/lib
scp -r D:\Users\zh\Desktop\Hudi\compressedPackage\httpclient-4.4.jar zhangheng@10.8.4.212:/home/zhangheng/hudi/server/hadoop/share/hadoop/tools/lib
scp -r D:\Users\zh\Desktop\Hudi\compressedPackage\httpcore-4.4.jar zhangheng@10.8.4.212:/home/zhangheng/hudi/server/hadoop/share/hadoop/tools/lib

3.运行警告

【警告】

WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped

【原因】

spark版本太高,最开始选的spark版本为v3.0.0,但是不太合适,改成v2.4.6,就ok了。

【解决】

官方网址:https://archive.apache.org/dist/spark/spark-2.4.6/
下载安装配置环境变量:spark-2.4.6-bin-hadoop2.7.tgz   

附:配置

1.Scala配置

1.Windows安装Scala:https://www.scala-lang.org/
安装完成后配置环境变量SCALA_HOME、path
输入scala -version查看是否安装成功
2.idea安装Scala插件:plugins搜索scala直接安装
重启之后,找到file(工具)——>project structure,找到左下角Glob libararies,然后点击中间 + 号,选择最后一个 Scala SDK,找到自己安装scala的版本,点击ok即可

2.idea中虚拟机配置

Tools -> Deployment -> Browse Remote Host
配置自己虚拟机的SSH configuration、Root path、Web server URL。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1603342.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

社交媒体数据恢复:YY语音

YY语音数据恢复指南 在我们的日常生活中&#xff0c;数据丢失是一种常见的现象。有时候&#xff0c;我们可能会不小心删除了重要的文件&#xff0c;或者因为硬件故障而导致数据丢失。在这种情况下&#xff0c;数据恢复软件可以帮助我们找回丢失的数据。本文将重点介绍如何使用Y…

手机拍照技术

拍照技巧 说明: 本文将主要介绍摄影和手机常见技巧&#xff1b; 1. 摄影的基本知识 **说明&#xff1a;**关于摄影&#xff0c;手机和相机的原理都是相同的&#xff0c;不同的是相机在很多方面优于手机&#xff0c;但是专业的设备对于我们这种的非专业的人来说&#xff0c;刚…

MAC上如何将某个目录制作成iso格式磁盘文件,iso文件本质是什么?以及挂载到ParallelDesktop中?(hdiutil makehybrid )

背景 ParallelsDesktop没有安装ParallelsTools的无法共享目录&#xff0c;可以通过ParallelsDesktop提供CD磁盘的方式共享进去 命令 # 准备文档 mkdir mytestdir cp xxx mytestdir# 生成iso hdiutil makehybrid -o output.iso mytestdir -iso -joliethdiutil是MAC提供的磁盘…

linux 修改 root 密码

1、先重启 2、看到下面的界面&#xff0c;按上下箭头&#xff0c;然后按 e 键。 3、进入该界面&#xff0c;按界面步骤操作 4、按ctrlx&#xff0c;进入到下面的界面&#xff0c;依次输入下面的指令即可 mount -o remount,rw /sysroot #让sysroot 能读写chroot /sysroot #切换到…

校园综合服务平台V3.9.2 源码修复大部分已知BUG

校园综合服务平台&#xff0c;版本更新至V3.9.1 &#xff0c;源码功能强大&#xff0c;ui 精美&#xff0c; 功能包含但不限于校园跑腿&#xff0c;外卖&#xff0c;组局&#xff0c;圈子&#xff0c;商城&#xff0c;抽奖&#xff0c;投票&#xff0c;团购&#xff0c;二手市场…

Python中2种常用数据可视化库:Bokeh和Altair

本文分享自华为云社区《探究数据可视化&#xff1a;Bokeh vs. Altair》&#xff0c;作者&#xff1a;柠檬味拥抱。 在数据科学和数据分析领域&#xff0c;数据可视化是一种强大的工具&#xff0c;可以帮助我们更好地理解数据、发现模式和趋势。Python作为一种流行的数据科学工…

【QT进阶】Qt Web混合编程之QWebEngineView基本用法

往期回顾 【QT入门】Qt自定义控件与样式设计之自定义QTabWidget实现tab在左&#xff0c;文本水平的效果-CSDN博客【QT进阶】Qt Web混合编程之CEF、QCefView简单介绍-CSDN博客 【QT进阶】Qt Web混合编程之VS2019 CEF的编译与使用-CSDN博客 【QT进阶】Qt Web混合编程之QWebEngi…

(十四)C++自制植物大战僵尸游戏windows平台视频播放实现

植物大战僵尸游戏开发教程专栏地址http://t.csdnimg.cn/8UFMs VLC库 在Cocos2d-x游戏开发框架中&#xff0c;没有实现windows平台视频播放的功能&#xff0c;需要自定义实现。在本项目中使用vlc库实现windows平台的视频播放功能。 vlc官网&#xff1a;网址 下载完成后&#x…

.net反射(Reflection)

文章目录 一.概念&#xff1a;二.反射的作用&#xff1a;三.代码案例&#xff1a;四.运行结果&#xff1a; 一.概念&#xff1a; .NET 反射&#xff08;Reflection&#xff09;是指在运行时动态地检查、访问和修改程序集中的类型、成员和对象的能力。通过反射&#xff0c;你可…

分布式搭载博客网站

一.运行环境&#xff1a; IP主机名系统服务192.168.118.128Server-WebLinuxWeb192.168.118.131Server-NFS-DNSLinuxNFS/DNS 二.基础配置 1. 配置主机名&#xff0c;hosts映射 [rootserver ~]# hostnamectl set-hostname Server-Web [rootserver ~]# hostname Server-Web [r…

每日算法4/17

1552. 两球之间的磁力 题目 在代号为 C-137 的地球上&#xff0c;Rick 发现如果他将两个球放在他新发明的篮子里&#xff0c;它们之间会形成特殊形式的磁力。Rick 有 n 个空的篮子&#xff0c;第 i 个篮子的位置在 position[i] &#xff0c;Morty 想把 m 个球放到这些篮子里&…

目标检测——行人交通信号灯数据集

一、重要性及意义 行人交通信号灯检测的重要性及意义主要体现在以下几个方面&#xff1a; 首先&#xff0c;行人交通信号灯检测对于提高道路安全性至关重要。通过准确识别交通信号灯的状态&#xff0c;行人可以更加清晰地了解何时可以安全地过马路&#xff0c;从而避免与车辆…

《ElementPlus 与 ElementUI 差异集合》el-popconfirm 气泡确认框之插槽写法有差异

ElementUI 直接在 el-button 上配置属性 slot&#xff1b; <el-popconfirm title"确定删除吗&#xff1f;请谨慎操作&#xff01;" confirm"delete"><el-button slot"reference" size"small" type"danger">删…

C++stack oj题目详解

1. 最小栈 155. 最小栈 设计一个支持 push &#xff0c;pop &#xff0c;top 操作&#xff0c;并能在常数时间内检索到最小元素的栈。 实现 MinStack 类: MinStack() 初始化堆栈对象。 void push(int val) 将元素val推入堆栈。 void pop() 删除堆栈顶部的元素。 int top() 获取…

【Git】git命令大全(持续更新)

本文架构 0.描述git简介术语 1.常用命令2. 信息管理新建git库命令更改存在库设置获取当前库信息 3.工作空间相关将工作空间文件添加到缓存区&#xff08;增&#xff09;从工作空间中移除文件&#xff08;删&#xff09;撤销提交 4.远程仓库相关同步远程仓库分支 &#xff08;持…

IP爬虫代理服务器是什么以及为什么使用爬虫代理?

在网络抓取领域&#xff0c;爬虫代理发挥着关键作用。 但它们到底是什么&#xff1f; 从本质上讲&#xff0c;爬虫代理是位于网络抓取工具和目标网站之间的中间服务器。 该中间服务器充当盾牌&#xff0c;提供匿名性&#xff0c;并允许您访问网站并提取数据&#xff0c;而无需透…

Unity应用开机自启动

使用说明 以代码设置的方式设置Unity应用开机自启动。 将下面脚本挂载到场景物体&#xff0c;通过UI按钮开启应用自启动和取消应用自启动&#xff0c;设置下次运行应用生效。 所用到的Dll下载地址&#xff1a;Interop.IWshRuntimeLibrary 脚本代码 using System; using Syst…

考研数学|「基础」和「强化」阶段分别怎么做?

从目前考研数学的趋势来看&#xff0c;更加注重数学基础的理解和计算量。也就是基础知识和计算&#xff0c;如何锻炼这两种能力就显得尤为重要。希望我的复习经验可以给到读者一些启发。 数学规划 从备考过程来看&#xff0c;数学的复习可以分为三个阶段&#xff1a;1、基础阶…

社区论坛小圈子小程序源码系统:自定义小程序管理社区圈子软件圈子系统系统开发 - 做社区圈子

制作阶段 1.确定需求&#xff1a;首先&#xff0c;要明确小程序的功能需求。例如&#xff0c;用户注册、登录、发表动态、浏览动态、评论、点赞等。同时&#xff0c;要确定页面的布局和设计风格。 2.设计界面&#xff1a;根据需求&#xff0c;进行界面设计。包括首页、个人中…

Android开发——实现简易登录功能

登录界面&#xff08;mainActivity.xml) <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas.android.com/apk/res/android"android:layout_width"match_parent" android:layout_height&…