Flink DataStream读写Hudi

news2025/1/19 17:21:05

一、pom依赖

测试案例中,pom依赖如下,根据需要自行删减。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test</groupId>
    <artifactId>Examples</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <hadoop.version>2.6.0</hadoop.version>
        <flink.version>1.14.5</flink.version>
        <kafka.version>2.0.0</kafka.version>
        <hbase.version>1.2.0</hbase.version>
        <hudi.version>0.12.0</hudi.version>
    </properties>
    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-flink1.14-bundle</artifactId>
            <version>${hudi.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <classifier>core</classifier>
            <version>2.3.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j</artifactId>
                    <groupId>log4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j</artifactId>
                    <groupId>log4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <!--下面是打印日志的,可以不加-->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <scope>provided</scope>
            <version>2.17.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <scope>provided</scope>
            <version>2.17.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <scope>provided</scope>
            <version>2.17.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.alibaba.fastjson2/fastjson2 -->
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.16</version>
        </dependency>

        <!--restTemplate启动器-->
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-rest -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-rest</artifactId>
            <version>2.7.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- 配置管理 -->
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.2.1</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>${pom.artifactId}-${pom.version}</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.test.main.Examples</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <excludes>
                    <exclude>environment/dev/*</exclude>
                    <exclude>environment/test/*</exclude>
                    <exclude>environment/smoke/*</exclude>
                    <exclude>environment/pre/*</exclude>
                    <exclude>environment/online/*</exclude>
                    <exclude>application.properties</exclude>
                </excludes>
            </resource>
            <resource>
                <directory>src/main/resources/environment/${environment}</directory>
                <targetPath>.</targetPath>
            </resource>
        </resources>
    </build>

    <profiles>
        <profile>
            <!-- 开发环境 -->
            <id>dev</id>
            <properties>
                <environment>dev</environment>
            </properties>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
        </profile>
        <profile>
            <!-- 测试环境 -->
            <id>test</id>
            <properties>
                <environment>test</environment>
            </properties>
        </profile>
        <profile>
            <!-- 冒烟环境 -->
            <id>smoke</id>
            <properties>
                <environment>smoke</environment>
            </properties>
        </profile>
        <profile>
            <!-- 生产环境 -->
            <id>online</id>
            <properties>
                <environment>online</environment>
            </properties>
        </profile>
    </profiles>
</project>

Hudi官网文档链接:

Flink Guide | Apache Hudi

二、DataStream API方式读写Hudi

2.1 写Hudi

package com.test.hudi;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;

import java.util.HashMap;
import java.util.Map;

public class FlinkDataStreamWrite2HudiTest {
    public static void main(String[] args) throws Exception {
        // 1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.必须开启checkpoint 默认有5个checkpoint后,hudi目录下才会有数据;不然只有一个.hoodie目录
        String checkPointPath = "hdfs://hw-cdh-test02:8020/flinkinfo/meta/savepoints/FlinkDataStreamWrite2HudiTest";
        StateBackend backend = new EmbeddedRocksDBStateBackend(true);
        env.setStateBackend(backend);
        CheckpointConfig conf = env.getCheckpointConfig();
        // 任务流取消和故障应保留检查点
        conf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        conf.setCheckpointInterval(1000);//milliseconds
        conf.setCheckpointTimeout(10 * 60 * 1000);//milliseconds
        conf.setMinPauseBetweenCheckpoints(2 * 1000);//相邻两次checkpoint之间的时间间隔
        conf.setCheckpointStorage(checkPointPath);

        // 3.准备数据
        DataStreamSource<Student> studentDS = env.fromElements(
                new Student(101L, "Johnson", 17L, "swimming"),
                new Student(102L, "Lin", 15L, "shopping"),
                new Student(103L, "Tom", 5L, "play"));

        // 4.创建Hudi数据流
        // 4.1 Hudi表名和路径
        String studentHudiTable = "ods_student_table";
        String studentHudiTablePath = "hdfs://hw-cdh-test02:8020/user/hive/warehouse/lake/" + studentHudiTable;
        Map<String, String> studentOptions = new HashMap<>();
        studentOptions.put(FlinkOptions.PATH.key(), studentHudiTablePath);
        studentOptions.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());

        HoodiePipeline.Builder studentBuilder = HoodiePipeline.builder(studentHudiTable)
                .column("id BIGINT")
                .column("name STRING")
                .column("age BIGINT")
                .column("hobby STRING")
                .pk("id")
//                .pk("id,age")// 可以设置联合主键,用逗号分隔
                .options(studentOptions);

        // 5.转成RowData流
        DataStream<RowData> studentRowDataDS = studentDS.map(new MapFunction<Student, RowData>() {
            @Override
            public RowData map(Student value) throws Exception {
                try {
                    Long id = value.id;
                    String name = value.name;
                    Long age = value.age;
                    String hobby = value.hobby;

                    GenericRowData row = new GenericRowData(4);
                    row.setField(0, Long.valueOf(id));
                    row.setField(1, StringData.fromString(name));
                    row.setField(2, Long.valueOf(age));
                    row.setField(3, StringData.fromString(hobby));

                    return row;
                } catch (Exception e) {
                    e.printStackTrace();
                    return null;
                }
            }
        });

        studentBuilder.sink(studentRowDataDS, false);

        env.execute("FlinkDataStreamWrite2HudiTest");
    }

    public static class Student{
        public Long id;
        public String name;
        public Long age;
        public String hobby;

        public Student() {
        }

        public Student(Long id, String name, Long age, String hobby) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.hobby = hobby;
        }

        public Long getId() {
            return id;
        }

        public void setId(Long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Long getAge() {
            return age;
        }

        public void setAge(Long age) {
            this.age = age;
        }

        public String getHobby() {
            return hobby;
        }

        public void setHobby(String hobby) {
            this.hobby = hobby;
        }

        @Override
        public String toString() {
            return "Student{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    ", age=" + age +
                    ", hobby='" + hobby + '\'' +
                    '}';
        }
    }
}

案例中,通过env.fromElements造三条数据写入Hudi,通过查询,可证明3条数据写入成功:

 在实际开发中,需要切换数据源,比如从kafka读取数据,写入Hudi,将上面的数据源进行替换,并完成RowData转换即可。(切记,一定要开启checkpoint,否则只有一个,hoodie目录。本人在这里踩过坑,调了一个下午,数据都没有写入成功,只有一个hoodie目录,后来经过研究才知道需要设置checkpoint。本案例中,由于是造的三条数据,跑完之后程序就停了,不设置checkpoint,数据也会写入hudi表;但是如果正在的流计算,从kafka读数据,写入hudi,如果不设置checkpoint,数据最终无法写入hudi表)。

2.2 读Hudi

package com.test.hudi;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;

import java.util.HashMap;
import java.util.Map;

public class FlinkDataStreamReadFromHudiTest {
    public static void main(String[] args) throws Exception {
        // 1. 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.创建Hudi数据流
        String studentHudiTable = "ods_student_table";
        String studentHudiTablePath = "hdfs://hw-cdh-test02:8020/user/hive/warehouse/lake/" + studentHudiTable;
        Map<String, String> studentOptions = new HashMap<>();
        studentOptions.put(FlinkOptions.PATH.key(), studentHudiTablePath);
        studentOptions.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        studentOptions.put(FlinkOptions.READ_AS_STREAMING.key(), "true");// this option enable the streaming read
        studentOptions.put(FlinkOptions.READ_START_COMMIT.key(), "16811748000000");// specifies the start commit instant time
        studentOptions.put(FlinkOptions.READ_STREAMING_CHECK_INTERVAL.key(), "4");//
        studentOptions.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");//
        HoodiePipeline.Builder studentBuilder = HoodiePipeline.builder(studentHudiTable)
                .column("id BIGINT")
                .column("name STRING")
                .column("age BIGINT")
                .column("hobby STRING")
                .pk("id")
                .options(studentOptions);
        DataStream<RowData> studentRowDataDS = studentBuilder.source(env);

        // 3. 数据转换与输出
        DataStream<Student> studentDS = studentRowDataDS.map(new MapFunction<RowData, Student>() {
            @Override
            public Student map(RowData value) throws Exception {
                try {
                    String rowKind = value.getRowKind().name();
                    Long id = value.getLong(0);
                    String name = value.getString(1).toString();
                    Long age = value.getLong(2);
                    String hobby = value.getString(3).toString();

                    Student student = new Student(id, name, age, hobby, rowKind);
                    return student;
                } catch (Exception e) {
                    e.printStackTrace();
                    return null;
                }
            }
        });
        studentDS.print();

        env.execute("FlinkDataStreamReadFromHudiTest");
    }

    public static class Student{
        public Long id;
        public String name;
        public Long age;
        public String hobby;
        public String rowKind;

        public Student() {
        }

        public Student(Long id, String name, Long age, String hobby, String rowKind) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.hobby = hobby;
            this.rowKind = rowKind;
        }

        public Long getId() {
            return id;
        }

        public void setId(Long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Long getAge() {
            return age;
        }

        public void setAge(Long age) {
            this.age = age;
        }

        public String getHobby() {
            return hobby;
        }

        public void setHobby(String hobby) {
            this.hobby = hobby;
        }

        public String getRowKind() {
            return rowKind;
        }

        public void setRowKind(String rowKind) {
            this.rowKind = rowKind;
        }

        @Override
        public String toString() {
            return "Student{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    ", age=" + age +
                    ", hobby='" + hobby + '\'' +
                    ", rowKind='" + rowKind + '\'' +
                    '}';
        }
    }
}

输出结果:

 其中,rowKind,是对行的描述,有 INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE,分别对应op的 +I, -U, +U, -D,表示 插入、更新前、更新后、删除 操作。

三、Table API方式读写Hudi

3.1 写Hudi

3.1.1 数据来自DataStream

package com.test.hudi;

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkDataStreamSqlWrite2HudiTest {
    public static void main(String[] args) throws Exception {
        // 1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

        // 2.必须开启checkpoint 默认有5个checkpoint后,hudi目录下才会有数据;不然只有一个.hoodie目录
        String checkPointPath = "hdfs://hw-cdh-test02:8020/flinkinfo/meta/savepoints/FlinkDataStreamWrite2HudiTest";
        StateBackend backend = new EmbeddedRocksDBStateBackend(true);
        env.setStateBackend(backend);
        CheckpointConfig conf = env.getCheckpointConfig();
        // 任务流取消和故障应保留检查点
        conf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        conf.setCheckpointInterval(1000);//milliseconds
        conf.setCheckpointTimeout(10 * 60 * 1000);//milliseconds
        conf.setMinPauseBetweenCheckpoints(2 * 1000);//相邻两次checkpoint之间的时间间隔
        conf.setCheckpointStorage(checkPointPath);

        // 3.准备数据,真实环境中,这里可以替换成从kafka读取数据
        DataStreamSource<Student> studentDS = env.fromElements(
                new Student(201L, "zhangsan", 117L, "eat"),
                new Student(202L, "lisi", 115L, "drink"),
                new Student(203L, "wangwu", 105L, "sleep"));
        // 由于后续没有DataStream的执行算子,可以会报错:
        // Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
        // 不过不影响数据写入Hudi
        // 当然,也可以加一步DataStream的执行算子,比如 print
//        studentDS.print("DataStream: ");

        // 4.通过DataStream创建表
        // 4.1 第一个参数:表名;第二个参数:DataStream;第三个可选参数:指定列名,可以指定DataStream中的元素名和列名的匹配关系,比如 "userId as user_id, name, age, hobby"
        tabEnv.registerDataStream("tmp_student_table", studentDS, "id, name, age, hobby");

        // 5.准备Hudi表的数据流,并将数据写入Hudi表
        tabEnv.executeSql("" +
                "CREATE TABLE out_ods_student_table(\n" +
                "    id BIGINT COMMENT '学号',\n" +
                "    name STRING\t COMMENT '姓名',\n" +
                "    age BIGINT  COMMENT '年龄',\n" +
                "    hobby STRING    COMMENT '爱好',\n" +
                "    PRIMARY KEY (id) NOT ENFORCED\n" +
                ")\n" +
                "WITH(\n" +
                "    'connector' = 'hudi',\n" +
                "    'path' = 'hdfs://hw-cdh-test02:8020/user/hive/warehouse/lake/ods_student_table',\n" +
                "    'table.type' = 'MERGE_ON_READ',\n" +
                "    'compaction.async.enabled' = 'true',\n" +
                "    'compaction.tasks' = '1',\n" +
                "    'compaction.trigger.strategy' = 'num_commits',\n" +
                "    'compaction.delta_commits' = '3',\n" +
                "    'hoodie.cleaner.policy'='KEEP_LATEST_COMMITS',\n" +
                "    'hoodie.cleaner.commits.retained'='30',\n" +
                "    'hoodie.keep.min.commits'='35' ,\n" +
                "    'hoodie.keep.max.commits'='40'\n" +
                ")");
        tabEnv.executeSql("insert into out_ods_student_table select id,name,age,hobby from tmp_student_table");


        env.execute("FlinkDataStreamSqlWrite2HudiTest");
    }

    public static class Student{
        public Long id;
        public String name;
        public Long age;
        public String hobby;

        public Student() {
        }

        public Student(Long id, String name, Long age, String hobby) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.hobby = hobby;
        }

        public Long getId() {
            return id;
        }

        public void setId(Long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Long getAge() {
            return age;
        }

        public void setAge(Long age) {
            this.age = age;
        }

        public String getHobby() {
            return hobby;
        }

        public void setHobby(String hobby) {
            this.hobby = hobby;
        }

        @Override
        public String toString() {
            return "Student{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    ", age=" + age +
                    ", hobby='" + hobby + '\'' +
                    '}';
        }
    }
}

通过查看Hudi表,证明3条数据写入成功:

3.1.2 数据来自Table

package com.test.hudi;

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkValuesSqlWrite2HudiTest {
    public static void main(String[] args) throws Exception {
        // 1. 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

        // 2.必须开启checkpoint 默认有5个checkpoint后,hudi目录下才会有数据;不然只有一个.hoodie目录
        String checkPointPath = "hdfs://hw-cdh-test02:8020/flinkinfo/meta/savepoints/FlinkDataStreamWrite2HudiTest";
        StateBackend backend = new EmbeddedRocksDBStateBackend(true);
        env.setStateBackend(backend);
        CheckpointConfig conf = env.getCheckpointConfig();
        // 任务流取消和故障应保留检查点
        conf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        conf.setCheckpointInterval(1000);//milliseconds
        conf.setCheckpointTimeout(10 * 60 * 1000);//milliseconds
        conf.setMinPauseBetweenCheckpoints(2 * 1000);//相邻两次checkpoint之间的时间间隔
        conf.setCheckpointStorage(checkPointPath);

        // 3.准备Hudi表的数据流,并将数据写入Hudi表
        tabEnv.executeSql("" +
                "CREATE TABLE out_ods_student_table(\n" +
                "    id BIGINT COMMENT '学号',\n" +
                "    name STRING\t COMMENT '姓名',\n" +
                "    age BIGINT  COMMENT '年龄',\n" +
                "    hobby STRING    COMMENT '爱好',\n" +
                "    PRIMARY KEY (id) NOT ENFORCED\n" +
                ")\n" +
                "WITH(\n" +
                "    'connector' = 'hudi',\n" +
                "    'path' = 'hdfs://hw-cdh-test02:8020/user/hive/warehouse/lake/ods_student_table',\n" +
                "    'table.type' = 'MERGE_ON_READ',\n" +
                "    'compaction.async.enabled' = 'true',\n" +
                "    'compaction.tasks' = '1',\n" +
                "    'compaction.trigger.strategy' = 'num_commits',\n" +
                "    'compaction.delta_commits' = '3',\n" +
                "    'hoodie.cleaner.policy'='KEEP_LATEST_COMMITS',\n" +
                "    'hoodie.cleaner.commits.retained'='30',\n" +
                "    'hoodie.keep.min.commits'='35' ,\n" +
                "    'hoodie.keep.max.commits'='40'\n" +
                ")");
        tabEnv.executeSql("" +
                "insert into out_ods_student_table values\n" +
                "    (301, 'xiaoming', 201, 'read'),\n" +
                "    (302, 'xiaohong', 202, 'write'),\n" +
                "    (303, 'xiaogang', 203, 'sing')");

        env.execute("FlinkValuesSqlWrite2HudiTest");
    }
}

通过查看Hudi表,证明3条数据写入成功: 

3.2 读Hudi

package com.test.hudi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSqlReadFromHudiTest {
    public static void main(String[] args) throws Exception {
        // 1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

        // 2.准备Hudi表的数据流,并从Hudi表读取数据
        tabEnv.executeSql("" +
                "CREATE TABLE out_ods_student_table(\n" +
                "    id BIGINT COMMENT '学号',\n" +
                "    name STRING\t COMMENT '姓名',\n" +
                "    age BIGINT  COMMENT '年龄',\n" +
                "    hobby STRING    COMMENT '爱好',\n" +
                "    PRIMARY KEY (id) NOT ENFORCED\n" +
                ")\n" +
                "WITH(\n" +
                "    'connector' = 'hudi',\n" +
                "    'path' = 'hdfs://hw-cdh-test02:8020/user/hive/warehouse/lake/ods_student_table',\n" +
                "    'table.type' = 'MERGE_ON_READ',\n" +
                "    'compaction.async.enabled' = 'true',\n" +
                "    'compaction.tasks' = '1',\n" +
                "    'compaction.trigger.strategy' = 'num_commits',\n" +
                "    'compaction.delta_commits' = '3',\n" +
                "    'hoodie.cleaner.policy'='KEEP_LATEST_COMMITS',\n" +
                "    'hoodie.cleaner.commits.retained'='30',\n" +
                "    'hoodie.keep.min.commits'='35' ,\n" +
                "    'hoodie.keep.max.commits'='40'\n" +
                ")");
        tabEnv.executeSql("select id,name,age,hobby from out_ods_student_table").print();

        env.execute("FlinkSqlReadFromHudiTest");
    }
}

输出结果:

四、补充

在Flink Table操作Hudi的时候,可能会涉及到联合组件,可以在SQL中加入联合主键。比如:

tabEnv.executeSql("" +
        "CREATE TABLE out_ods_userinfo_table_test(\n" +
        "    province_id BIGINT COMMENT '省份编号',\n" +
        "    user_id BIGINT COMMENT '用户编号',\n" +
        "    name STRING\t COMMENT '姓名',\n" +
        "    age BIGINT COMMENT '年龄',\n" +
        "    hobby STRING COMMENT '爱好',\n" +
        "    PRIMARY KEY (province_id,user_id) NOT ENFORCED\n" +
        ")\n" +
        "WITH(\n" +
        "    'connector' = 'hudi',\n" +
        "    'path' = 'hdfs://hw-cdh-test02:8020/user/hive/warehouse/lake/ods_userinfo_table_test',\n" +
        "    'table.type' = 'MERGE_ON_READ',\n" +
        "    'hoodie.datasource.write.keygenerator.class'='org.apache.hudi.keygen.ComplexKeyGenerator',\n" +
        "    'hoodie.datasource.write.recordkey.field'= 'province_id,user_id',\n" +
        "    'compaction.async.enabled' = 'true',\n" +
        "    'compaction.tasks' = '1',\n" +
        "    'compaction.trigger.strategy' = 'num_commits',\n" +
        "    'compaction.delta_commits' = '3',\n" +
        "    'hoodie.cleaner.policy'='KEEP_LATEST_COMMITS',\n" +
        "    'hoodie.cleaner.commits.retained'='30',\n" +
        "    'hoodie.keep.min.commits'='35' ,\n" +
        "    'hoodie.keep.max.commits'='40'\n" +
        ")");

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/422564.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

HTML+CSS+JS 学习笔记(一)———HTML(下)

&#x1f331;博客主页&#xff1a;大寄一场. &#x1f331;系列专栏&#xff1a;前端 &#x1f331;往期回顾&#xff1a;HTMLCSSJS 学习笔记&#xff08;一&#xff09;———HTML(上) HTMLCSSJS 学习笔记&#xff08;一&#xff09;———HTML(中) &#x1f618;博客制作不易…

Linux工具——gcc和gdb

&#x1f3c0;博主主页 &#x1f3c0;gitee主页 目录&#x1f3c0;Linux编译器-gcc⚽️gcc使用⚽️函数库&#x1f3c0;Linux调试器-gdb⚽️简介⚽️gdb使用&#x1f3c0;Linux项目自动化构建工具-make/Makefile⚽️简介⚽️依赖关系⚽️make/Makefile实现原理⚽️项目清理&…

证明电压电流相位差的余弦值和功率因数相等

证明&#xff1a;“电压电流相位差的余弦值”和“功率因数”相等。 电压电流相位差的余弦值和功率因数相等&#xff0c;这在《电路分析》中给出过结论&#xff0c;但没有给出详细的证明过程。其次&#xff0c;在电气工程师考试中&#xff0c;也会经常遇到。 电压电流相位差&am…

【Linux】虚拟机的克隆

【想要克隆虚拟机&#xff0c;被克隆的虚拟机必须是关机状态&#xff1b;】 一、克隆虚拟机 1、右击想要克隆的虚拟机 2、进入到这个页面后点击“下一步” 3、进入到这个页面后点击“下一步” 4、进入这个页面后选“创建完整克隆”&#xff0c;再点击下一步 5、最好将位置改成…

入门IC必读书目,你想知道的都在这里

在IC行业&#xff0c;技术和经验都很重要&#xff0c;为了更好的学习&#xff0c;现为大家整理了各岗位的学习书目。 通用基础类 《半导体物理学》 这本书被国内大部分高校都采用为半导体物理课程的教材。同时&#xff0c;也是部分高校推荐使用的微电子专业硕士生初试参考书。…

【cmake学习】搭建一个简单的cmake工程(优化版)

之前搭建了一个基本的cmake工程&#xff0c;仅使用了一个 CMakeLists.txt 文件来管理整个工程&#xff0c;实际上一个工程里可以包含多个 CMakeLists.txt 文件&#xff0c;这样做的目的是把引入所需文件、生成执行文件/库文件 这两个工作交由两个 CMakeLists.txt 分别实现。 【…

接口自动化【一】(抓取后台登录接口+postman请求通过+requests请求通过+json字典区别)

文章目录 前言一、requests库的使用二、json和字典的区别三、后端登录接口-请求数据生成四、接口自动化-对应电商项目中的功能五、来自postman的代码-后端登录总结前言 记录&#xff1a;json和字典的区别&#xff0c;json和字段的相互转化&#xff1b;postman发送请求与Python…

Python:清华ChatGLM-6B中文对话模型部署

1、简介 ChatGLM-6B 是一个开源的、支持中英双语的对话语言模型&#xff0c;基于 General Language Model (GLM) 架构&#xff0c;具有 62 亿参数。结合模型量化技术&#xff0c;用户可以在消费级的显卡上进行本地部署&#xff08;INT4 量化级别下最低只需 6GB 显存&#xff0…

SpringBoot程序运行时动态修改主数据库配置(不需要改配置,不需要重启)

SpringBoot程序运行时修改主数据库配置&#xff08;不需要改配置&#xff0c;不需要重启&#xff09;搞事背景心路历程搞事背景 在面试某家单位的时候&#xff0c;碰到了一家单位线上考试&#xff0c;要求开发一个springboot后台。一眼看去都是正常的需求&#xff0c;突然我在…

Raft: 基于 Log 复制的共识算法

References Raft 演示 In Search of an Understandable Consensus Algorithm (Extended Version) 1. Raft 是什么 1.1 目标: 复制 Log 在讲解 Raft 协议的具体行为之前我们需要明白 Raft 的目标是什么&#xff1f;在一些情况下我们需要保证分布式集群中的机器拥有相同的数…

IOC容器——Bean

IOC容器——BeanBean配置name别名属性Bean作用范围scopeBean的实例化构造方法示例化静态工厂实例化实例工厂与FactoryBean实例工厂FactoryBeanbean的生命周期Bean配置 name别名属性 Bean ID 唯一&#xff0c;而关于Spring别名&#xff0c;我们可以在配置文件中使用name来定义&…

Google Play管理中心和ASO的重要性

Android Vitals 是我们应用优化的重要组成部分&#xff0c;能够显示应用的运行状况。一般来说&#xff0c;如果应用具有良好的体验&#xff0c;它会更容易在Google Play中被用户发现&#xff0c;从而获得更好的排名和更多的安装量。 从开发者的角度来看&#xff0c;Android Vi…

JAVA8新特性stream流收集为Map,value为null导致空指针的问题

jdk8 新特性stream深受喜爱&#xff0c;平时使用比较多&#xff0c;其中有&#xff1a; Map<String, String> collect2 list.stream().collect(Collectors.toMap(Book::getName, Book::getIdNO,(pre, after) -> pre)); 现象如下&#xff1a; package MainTest.str…

HTML5 <nav> 标签、HTML5 <noscript> 标签

HTML5 <nav> 标签 实例 HTML5 <nav>标签用于表示HTML页面中的导航&#xff0c;可以是页与页之间导航&#xff0c;也可以是页内的段与段之间导航。 一个导航链接实例&#xff1a; <nav> <a href"/html/">HTML</a> | <a href&qu…

关于pinduoduo开放接口测试

什么是接口测试 接口测试是测试系统组件间接口的一种方式&#xff0c;接口测试主要用于检测外部系统与系统之间以及内部各个子系统之间的交互点。测试的重点是检查数据的增删改查操作&#xff0c;以及系统之间的逻辑关系等。 接口测试作为集成测试的一部分&#xff0c;通过直接…

归并排序(非递归实现) 计数排序

上一期我们说了归并排序的递归是如何实现的&#xff0c;但是递归如果层次太多的话容易栈溢出&#xff0c;所以我们还需要掌握非递归的实现&#xff0c;但是我们非递归需要如何实现&#xff1f; 下面我们就来看一下非递归的实现 归并排序的非递归实现他并不需要栈队列这些东西…

No.042<软考>《(高项)备考大全》【第26章】法律法规(合同法、招投标法、政府采购法、著作权法)

【第26章】法律法规&#xff08;合同法、招投标法、政府采购法、著作权法&#xff09;1 考试相关2 合同法练习题参考答案3 招投标法3.1 法规时间总结3.2 招投标流程3.3 招标3.4 投标3.5 评标3.6 练习题参考答案3.7 论文写作3.8 投标文件的编写应该注意哪些事项4 著作权法4.1 练…

找漏洞赚外快?给ChatGPT挑毛病,最高奖励14万

反正闲着也是闲着&#xff0c;不如来给ChatGPT找漏洞&#xff1f;毕竟&#xff0c;万一真的找到漏洞了还能赚一笔外快。 当地时间 4 月 11 日&#xff0c;OpenAI 宣布推出漏洞赏金计划。该公司将根据报告问题的严重性和影响提供现金奖励&#xff0c;奖励范围从 200 美元到 200…

Spring经典扩展接口应用:BeanPostProcessor

备注&#xff1a;新进行基本思路总结&#xff0c;四五月总结完 一、BeanPostProcessor基本知识总结 BeanPostProcessor是Bean级处理器&#xff0c;用于在bean实例化后、初始化后自定义修改bean实例&#xff0c;如属性校验、针对自定义bean做统一处理等。 BeanPostProcessor接…

实战:向人工智能看齐用Docker部署一个ChatGPT

文章目录前言鉴赏chatgpt环境要求开始搭建云安装docker从docker仓库拉取chatgpt-web镜像创建容器并运行chatgpt-web创建容器启动chatgpt-web访问自己的chatgpt总结前言 目前GPT-4都官宣步入多模态大型语言模型领域了&#xff0c;大佬竟然还没有体验GPT么。作为一个资深搬砖人士…