第八章 Flink集成Iceberg的DataStreamAPI、TableSQLAPI详解

news2024/11/25 12:26:20

在这里插入图片描述


1、概述

​ 目前Flink支持使用DataStream API 和SQL API方式实时读取和写入I=ceberg表,建议使用SQL API方式实时读取和写入Iceberg表。

  • Iceberg支持的Flink版本为1.11.x版本以上,以下为版本匹配关系
Flink版本Iceberg版本备注
Flink1.11.XIceberg0.11.1
Flink1.12.x ~ Flink1.13.xIceberg0.12.1SQL API有Bug
Flink1.14.xIceberg0.12.1SQL API有Bug

​ 本次学习以Flink和Iceberg整合使用Flink版本为1.14.5,Iceberg版本为0.12.1版本

2、DataStream API

2.1、实时写入Iceberg表

2.1.1、导入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flinkiceberg1</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <!-- flink 1.12.x -1.13.x  版本与Iceberg 0.12.1 版本兼容 ,不能与Flink 1.14 兼容-->
        <flink.version>1.13.5</flink.version>
        <!--<flink.version>1.12.1</flink.version>-->
        <!--<flink.version>1.14.2</flink.version>-->
        <!-- flink 1.11.x 与Iceberg 0.11.1 合适-->
        <!--<flink.version>1.11.6</flink.version>-->
        <hadoop.version>3.1.1</hadoop.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-iceberg</artifactId>
            <version>1.13-vvr-4.0.7</version>
            <exclusions>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava-parent</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Flink 操作Iceberg 需要的Iceberg依赖 -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-runtime</artifactId>
            <version>0.12.1</version>
            <!--<version>0.11.1</version>-->
        </dependency>
        <!-- java开发Flink所需依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink Kafka连接器的依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- 读取hdfs文件需要jar包-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>guava</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Flink SQL & Table-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>

        <!-- log4j 和slf4j 包,如果在控制台不想看到日志,可以将下面的包注释掉-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
            <version>1.7.25</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.5</version>
        </dependency>
    </dependencies>


</project>
2.1.2、创建Iceberg表
  • 核心:通过Flink创建Iceberg表
-- 1、创建catalog
 CREATE CATALOG hadoop_catalog WITH (
>   'type'='iceberg',
>   'catalog-type'='hadoop',
>   'warehouse'='hdfs://leidi01:8020/iceberg_catalog',
>   'property-version'='1'
> );

-- 2、创建databases
create database flink_iceberg;

-- 3、创建Sink表
CREATE TABLE hadoop_catalog.flink_iceberg.icebergdemo1 (
    id STRING,
    data STRING
); 
  • 运行结果

在这里插入图片描述

2.1.3、代码实现
public class FlinkIcebergDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.必须设置checkpoint ,Flink向Iceberg中写入数据时当checkpoint发生后,才会commit数据。
        env.enableCheckpointing(5000);

        //2.读取Kafka 中的topic 数据
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("192.168.6.102:6667")
                .setTopics("json")
                .setGroupId("my-group-id")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        //3.对数据进行处理,包装成RowData 对象,方便保存到Iceberg表中。
        SingleOutputStreamOperator<RowData> dataStream = kafkaSource.map(new MapFunction<String, RowData>() {
            @Override
            public RowData map(String s) throws Exception {
                System.out.println("s = "+s);
                String[] split = s.split(",");
                GenericRowData row = new GenericRowData(4);
                row.setField(0, Integer.valueOf(split[0]));
                row.setField(1, StringData.fromString(split[1]));
                row.setField(2, Integer.valueOf(split[2]));
                row.setField(3, StringData.fromString(split[3]));
                return row;
            }
        });

        //4.创建Hadoop配置、Catalog配置和表的Schema,方便后续向路径写数据时可以找到对应的表
        Configuration hadoopConf = new Configuration();
        Catalog catalog = new HadoopCatalog(hadoopConf,"hdfs://leidi01:8020/flinkiceberg/");

        //配置iceberg 库名和表名
        TableIdentifier name =
                TableIdentifier.of("icebergdb", "flink_iceberg_tbl");

        //创建Icebeng表Schema
        Schema schema = new Schema(
                Types.NestedField.required(1, "id", Types.IntegerType.get()),
                Types.NestedField.required(2, "nane", Types.StringType.get()),
                Types.NestedField.required(3, "age", Types.IntegerType.get()),
                Types.NestedField.required(4, "loc", Types.StringType.get()));

        //如果有分区指定对应分区,这里“loc”列为分区列,可以指定unpartitioned 方法不设置表分区
//        PartitionSpec spec = PartitionSpec.unpartitioned();
        PartitionSpec spec = PartitionSpec.builderFor(schema).identity("loc").build();

        //指定Iceberg表数据格式化为Parquet存储
        Map<String, String> props =
                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());
        Table table = null;

        // 通过catalog判断表是否存在,不存在就创建,存在就加载
        if (!catalog.tableExists(name)) {
            table = catalog.createTable(name, schema, spec, props);
        }else {
            table = catalog.loadTable(name);
        }

        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://leidi01:8020/flinkiceberg//icebergdb/flink_iceberg_tbl", hadoopConf);

        //5.通过DataStream Api 向Iceberg中写入数据
        FlinkSink.forRowData(dataStream)
                //这个 .table 也可以不写,指定tableLoader 对应的路径就可以。
                .table(table)
                .tableLoader(tableLoader)
                //默认为false,追加数据。如果设置为true 就是覆盖数据
                .overwrite(false)
                .build();

        env.execute("DataStream Api Write Data To Iceberg");
    }
}
  • 注意事项:

(1)需要设置Checkpoint,Flink向Iceberg中写入Commit数据时,只有Checkpoint成功之后才会Commit数据,否则后期在Hive中查询不到数据

(2)读取Kafka数据后需要包装成RowData或者Row对象,才能向Iceberg表中写出数据。写出数据时默认是追加数据,如果指定overwrite就是全部覆盖数据。

(3)在向Iceberg表中写数据之前需要创建对应的Catalog、表Schema,否则写出时只指定对应的路径会报错找不到对应的Iceberg表。

(4)不建议使用DataStream API 向Iceberg中写数据,建议使用SQL API。

2.1.4、Kafka消费者启动
bin/kafka-console-producer.sh --topic json  --broker-list leidi01:6667

bin/kafka-console-consumer.sh --bootstrap-server  leidi01:6667 --topic json --from-beginning
  • 生产数据

在这里插入图片描述

  • 运行结果:data中有两个分区

在这里插入图片描述

2.1.5、查询表结果
  • 说明:在Flink SQL中创建Hadoop Catalog
-- 1、创建Hadoop Catalog
CREATE CATALOG flinkiceberg WITH (
    'type'='iceberg',
    'catalog-type'='hadoop',
    'warehouse'='hdfs://leidi01:8020/flinkiceberg/',
    'property-version'='1'
);

-- 2、查询表中数据
use catalog flinkiceberg;
use icebergdb;
select * from flink_iceberg_tbl;
  • 运行结果

在这里插入图片描述

2.2、批量/实时读取Iceberg表

  • 核心:DataStream API 读取Iceberg表又分为批量读取和实时读取,通过方法“streaming(true/false)”来控制。
2.2.1、批量读取
  • 说明:设置方法“streaming(false)
  • 代码实现
public class FlinkIcebergRead {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //1.配置TableLoader
        Configuration hadoopConf = new Configuration();
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://leidi01:8020/flinkiceberg//icebergdb/flink_iceberg_tbl", hadoopConf);

        //2.从Iceberg中读取全量/增量读取数据
        DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
                .tableLoader(tableLoader)
                //默认为false,整批次读取,设置为true 为流式读取
                .streaming(false)
                .build();

        batchData.map(new MapFunction<RowData, String>() {
            @Override
            public String map(RowData rowData) throws Exception {
                int id = rowData.getInt(0);
                String name = rowData.getString(1).toString();
                int age = rowData.getInt(2);
                String loc = rowData.getString(3).toString();
                return id+","+name+","+age+","+loc;
            }
        }).print();

        env.execute("DataStream Api Read Data From Iceberg");

    }
}
  • 运行结果

在这里插入图片描述

2.2.2、实时读取
  • 说明:设置方法“streaming(true)

  • 代码实现

DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
    .tableLoader(tableLoader)
    //默认为false,整批次读取,设置为true 为流式读取
    .streaming(true)
    .build();
  • Flink SQL插入数据
insert into flink_iceberg_tbl values (5,'s1',30,'guangzhou'),(6,'s2',31,'tianjin');
  • 运行结果

在这里插入图片描述

2.2.3、指定基于快照实时增量读取数据
  • 核心:设置方法StartSnapshotId(快照编号)

(1)查看快照编号

在这里插入图片描述

(2)代码实现

//2.从Iceberg中读取全量/增量读取数据
DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
    .tableLoader(tableLoader)
    //基于某个快照实时增量读取数据,快照需要从元数据中获取
    .startSnapshotId(1738199999360637062L)
    //默认为false,整批次读取; 设置为true为流式读取
    .streaming(true)
    .build();

(3)运行结果

  • 说明:*只读取到指定快照往后的数据*

在这里插入图片描述

2.2.4、合并Data Flies
  • 说明:Iceberg提供Api通过定期提交任务将小文件合并成大文件,可以通过Flink 批任务来执行

(1)未处理文件

  • 说明:Iceberg每提交一次数据都会产生一个Data File。

在这里插入图片描述

(2)代码实现

public class RewrietDataFiles {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 1、配置TableLoader
        Configuration hadoopConf = new Configuration();
        //2.创建Hadoop配置、Catalog配置和表的Schema,方便后续向路径写数据时可以找到对应的表
        Catalog catalog = new HadoopCatalog(hadoopConf,"hdfs://leidi01:8020/flinkiceberg/");

        //3.配置iceberg 库名和表名并加载表
        TableIdentifier name = TableIdentifier.of("icebergdb", "flink_iceberg_tbl");
        Table table = catalog.loadTable(name);

        //4..合并 data files 小文件
        RewriteDataFilesActionResult result = Actions.forTable(table)
                .rewriteDataFiles()
                //默认 512M ,可以手动通过以下指定合并文件大小,与Spark中一样。
                .targetSizeInBytes(536870912L)
                .execute();

    }
}

(3)运行结果

在这里插入图片描述

3、SQL API

3.1、创建表并插入数据

(1)代码实现

public class SQLAPIWriteIceberg {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
        env.enableCheckpointing(1000);

        //1.创建Catalog
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://leidi01:8020/flinkiceberg')");

        //2.使用当前Catalog
        tblEnv.useCatalog("hadoop_iceberg");

        //3.创建数据库
        tblEnv.executeSql("create database iceberg_db");

        //4.使用数据库
        tblEnv.useDatabase("iceberg_db");

        //5.创建iceberg表 flink_iceberg_tbl
        tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl2(id int,name string,age int,loc string) partitioned by (loc)");

        //6.写入数据到表 flink_iceberg_tbl
        tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 values (1,'zs',18,'beijing'),(2,'ls',19,'shanghai'),(3,'ww',20,'guangzhou')");
    }
}

(2)运行结果

  • 说明:通过HDFS查看文件是否生成。

在这里插入图片描述

(3)查看数据

  • 说明:通过FlinkSQL查看表中数据
-- 1、创建Catalog
 CREATE CATALOG flinkiceberg WITH (
>     'type'='iceberg',
>     'catalog-type'='hadoop',
>     'warehouse'='hdfs://leidi01:8020/flinkiceberg/',
>     'property-version'='1'
> );

-- 2、查询数据
use catalog flinkiceberg
use iceberg_db;
select * from flink_iceberg_tbl2;
  • 查看结果

在这里插入图片描述

3.2、批量查询表数据

  • 说明:SQL API批量查询表中数据,直接查询显示即可

(1)代码逻辑

在这里插入图片描述

(2)代码实现

public class SQLAPIReadIceberg {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

        env.enableCheckpointing(1000);

//1.创建Catalog
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://leidi01:8020/flinkiceberg')");
//2.批量读取表数据
        TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 ");

        tableResult.print();
    }
}
  • 运行结果

在这里插入图片描述

3.3、实时查询表数据

  • 说明:link SQL API 实时查询Iceberg表数据时需要设置参数**“table.dynamic-table-options.enabled”为true**,以支持SQL语法中的“OPTIONS”选项

(1)代码逻辑

在这里插入图片描述

(2)代码实现

public class SQLStreamReadIceberg {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

        env.enableCheckpointing(1000);

        Configuration configuration = tblEnv.getConfig().getConfiguration();
        // 支持SQL语法中的 OPTIONS 选项
        configuration.setBoolean("table.dynamic-table-options.enabled", true);

        //1.创建Catalog
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://leidi01:8020/flinkiceberg')");

        //2.从Iceberg表当前快照读取所有数据,并继续增量读取数据
        // streaming指定为true支持实时读取数据,monitor_interval 监控数据的间隔,默认1s
        TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");

        tableResult.print();
    }
}
  • 运行结果:

在这里插入图片描述

(3)测试验证

  • FlinkSQL插入数据
insert into flink_iceberg_tbl2 values (5,'s1',30,'guangzhou'),(6,'s2',31,'tianjin');
  • 运行结果:在IDEA的控制台可以看到新增数据

在这里插入图片描述

3.4、基于快照实时增量读取数据

  • 说明:基于某个snapshot-id来继续实时获取数据

(1)代码逻辑

在这里插入图片描述

(2)代码实现

  • FlinkSQL插入数据
insert into flink_iceberg_tbl2 values (7,'s11',30,'beijing'),(8,'s22',31,'beijing');
  • snapshot-id如下:

在这里插入图片描述

  • 代码实现
public class SQLSnapshotReadIceberg {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
        env.enableCheckpointing(1000);

        Configuration configuration = tblEnv.getConfig().getConfiguration();
        // 支持SQL语法中的 OPTIONS 选项
        configuration.setBoolean("table.dynamic-table-options.enabled", true);

        //1.创建Catalog
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://leidi01:8020/flinkiceberg')");

        //2.从Iceberg 指定的快照继续实时读取数据,快照ID从对应的元数据中获取
        //start-snapshot-id :快照ID
        TableResult tableResult2 = tblEnv.executeSql("SELECT * FROM hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='8334669420406375204')*/");
        tableResult2.print();
    }
}

(3)运行结果

在这里插入图片描述

4、常见报错

4.1、window远程连接hadoop环境变量找不到

  • 报错日志
HADOOP_HOME and hadoop.home.dir are unset.
  • 报错原因:本地远程连接Hadoop系统时需要在本地配置相关的Hadoop变量,主要包括hadoop.dll 与 winutils.exe 等。
winutils:由于hadoop主要基于linux编写,**winutil.exe主要用于模拟linux下的目录环境**。当Hadoop在windows下运行或调用远程Hadoop集群的时候,需要该辅助程序才能运行。winutils是Windows中的二进制文件,适用于不同版本的Hadoop系统并构建在Windows VM上,该VM用以在Windows系统中测试Hadoop相关的应用程序。
  • 解决方案:

(1)下载hadoop集群对应winutils版本

  • 注意事项:如果你安装的hadoop版本是:3.1.2或者3.2.0 就用winutils-master里面的hadoop-3.0.0配置环境变量吧!
https://github.com/steveloughran/winutils

(2)将环境变量%HADOOP_HOME%设置为指向包含WINUTILS.EXE的BIN目录上方的目录

在这里插入图片描述

4.2、guava包版本冲突

  • 报错日志
com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
  • 报错原因:guava包版本冲突
  • 解决方案:使用Maven Helper插件解决冲突

①第一步:在pom界面点击Dependency Analyzer

在这里插入图片描述

②第二步:查看Dependency Analyzer功能界面

在这里插入图片描述

Ⅰ、显示冲突的jar包

Ⅱ、以列表形式显示所有依赖

Ⅲ、以数的形式显示所有依赖

③第三步:逐个解决conflicts列表中的jar包冲突问题,以guava为例:

​ 点击guava,找到右侧部分红色字体,即依赖冲突的地方,下图显示当前guava版本是24.0,但是有两个依赖的guava版本分别是27.0.0.1和16.0.1。

④将低版本依赖都排除掉

在这里插入图片描述

选中红色字体显示的内容->右键->Exclude,完成上述步骤结果如下:

在这里插入图片描述

⑤重新加载依赖配置

在这里插入图片描述

-------------------------------------------------------------------分割线-------------------------------------------------------------------------------

以上guava包冲突解决后依旧报错,将Hadoop版本从3.2.2降低到3.1.1不报错。

​ 注意hive-3.1.2依赖的Hadoop版本是3.1.0 [3],一般不建议runtime的Hadoop版本高于hive依赖的版本。

Ⅰ、解决方法一是在hive-exec里对guava做迁移,这个需要自己手动给hive-exec重新打包

Ⅱ、解决方法二是降低Hadoop版本,这里不一定要降低集群的Hadoop版本,而只是降低flink和hive这边用到的Hadoop版本,相对于用老的Hadoop客户端去访问新的Hadoop服务器,这个小版本的包容性一般来说是没有问题的。

<hadoop.version>3.2.2</hadoop.version>
<!-->将hadoop版本由3.2.2版本降低为3.1.1<-->
<hadoop.version>3.1.1</hadoop.version>
4.4、log4j2配置文件报错
  • 报错日志
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console. Set system property 'org.apache.logging.log4j.simplelog.StatusLogger.level' to TRACE to show Log4j2 internal initialization logging.
  • 报错原因:没有发现log4j2配置文件
  • 解决方案:添加配置log4j2.xml文件,对应org.apache.logging.log4j.Logger
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
	<Properties>
		<property name="log_level" value="info" />
		<Property name="log_dir" value="log" />
		<property name="log_pattern"
			value="[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%p] - [%t] %logger - %m%n" />
		<property name="file_name" value="test" />
		<property name="every_file_size" value="100 MB" />
	</Properties>
	<Appenders>
		<Console name="Console" target="SYSTEM_OUT">
			<PatternLayout pattern="${log_pattern}" />
		</Console>
		<RollingFile name="RollingFile"
			filename="${log_dir}/${file_name}.log"
			filepattern="${log_dir}/$${date:yyyy-MM}/${file_name}-%d{yyyy-MM-dd}-%i.log">
			<ThresholdFilter level="DEBUG" onMatch="ACCEPT"
				onMismatch="DENY" />
			<PatternLayout pattern="${log_pattern}" />
			<Policies>
				<SizeBasedTriggeringPolicy
					size="${every_file_size}" />
				<TimeBasedTriggeringPolicy modulate="true"
					interval="1" />
			</Policies>
			<DefaultRolloverStrategy max="20" />
		</RollingFile>
 
		<RollingFile name="RollingFileErr"
			fileName="${log_dir}/${file_name}-warnerr.log"
			filePattern="${log_dir}/$${date:yyyy-MM}/${file_name}-%d{yyyy-MM-dd}-warnerr-%i.log">
			<ThresholdFilter level="WARN" onMatch="ACCEPT"
				onMismatch="DENY" />
			<PatternLayout pattern="${log_pattern}" />
			<Policies>
				<SizeBasedTriggeringPolicy
					size="${every_file_size}" />
				<TimeBasedTriggeringPolicy modulate="true"
					interval="1" />
			</Policies>
		</RollingFile>
	</Appenders>
	<Loggers>
		<Root level="${log_level}">
			<AppenderRef ref="Console" />
			<AppenderRef ref="RollingFile" />
			<appender-ref ref="RollingFileErr" />
		</Root>
	</Loggers>
</Configuration>
4.5、Flink Hive Catalog报错
  • 报错日志
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.calcite.sql.parser.SqlParser.config()Lorg/apache/calcite/sql/parser/SqlParser$Config;
  • 报错原因:依赖报错

  • 解决方案:将所有依赖切换到2.12,切换flink-table-api-java-bridgeflink-table-api-scala-bridge_2.12

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/336693.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Windows 安装 Docker

一、Docker 官网 Dockerhttps://hub.docker.com/下载地址 window Docker 下载地址https://desktop.docker.com/win/main/amd64/Docker%20Desktop%20Installer.exe 二、安装 检测是否已经安装了docker docker -v 如下图 &#xff1a; 如果没有安装&#xff0c;则在第一点下…

【Spark分布式内存计算框架——Spark Core】5. RDD 函数补充:关联函数与练习

关联函数 当两个RDD的数据类型为二元组Key/Value对时&#xff0c;可以依据Key进行关联Join。 首先回顾一下SQL JOIN&#xff0c;用Venn图表示如下&#xff1a; RDD中关联JOIN函数都在PairRDDFunctions中&#xff0c;具体截图如下&#xff1a; 具体看一下join&#xff08;等…

1624_MIT 6.828 stabs文档信息整理_上

全部学习汇总&#xff1a; GreyZhang/g_unix: some basic learning about unix operating system. (github.com) 前面为了完成MIT 6.828的堆栈信息解析大概看了一下这个文档&#xff0c;现在把看过的信息简单整理一下。 1. stabs主要是为调试器提供调试信息用的描述数据&#x…

11.XMLHttpRequest的进阶用法

目录 1 设置HTTP的请求时限 2 Formdata对象 2.1 简单使用 2.2 Formdata直接获取表单数据 2.3 上传文件 2.4 上传文件的进度 3 定义API根路径 4 请求出错 上面我们用的都是旧版的XMLHttpRequest&#xff0c;旧版有两个缺点 无法上传文件没有传送数据的进度信息…

Docker部署ddns-go,动态域名解析公网IPv6地址

Docker部署ddns-go&#xff0c;动态域名解析公网IPv6地址 ddns-go&#xff0c;自动获得你的公网 IPv4 或 IPv6 地址&#xff0c;并解析到对应的域名服务。 嫌IPv6地址太难记&#xff1f;ddns来解决&#xff0c;将家里的公网IPv6地址用起来吧&#xff01; 前言 为什么需要DDNS …

Java设计模式——装饰器模式

目录 1.什么是装饰器模式 2.装饰器模式优缺点 3.装饰器模式结构 4.代码示例 1.什么是装饰器模式 装饰器模式&#xff0c;指在不改变原有对象结构的基础情况下&#xff0c;动态地给该对象增加一些额外功能的职责。装饰器模式相比生成子类更加灵活。它属于对象结构型模式。 …

DCGAN

DCGAN的论文地址[https://arxiv.org/pdf/1511.06434.pdf]。DCGAN是GAN的一个变体&#xff0c;DCGAN就是将CNN和原始的GAN结合到一起&#xff0c;生成网络和鉴别网络都运用到了深度卷积神经网络。DCGAN提高了基础GAN的稳定性和生成结果质量。DCGAN主要是在网络架构上改进了原始的…

JVET-AC0315:用于色度帧内预测的跨分量Merge模式

ECM采用了许多跨分量的预测&#xff08;Cross-componentprediction&#xff0c;CCP&#xff09;模式&#xff0c;包括跨分量包括跨分量线性模型&#xff08;CCLM&#xff09;、卷积跨分量模型&#xff08;CCCM&#xff09;和梯度线性模型&#xff08;GLM&#xff09;&#xff0…

软件太多?1 秒找到您想要的应用

您的手机页面是怎样的呢&#xff1f; 也许像这样&#xff0c;所有 APP 平铺在一起~ 亦或是这样&#xff0c;将所有 APP 分类整理好~ 在整理的过程中&#xff0c;我们免不了要进行 “拖拽” 。 平铺式的呈现方式&#xff0c;如果 APP 数量众多&#xff0c;我们免不了要进行搜索…

基于Java的题库管理系统的设计与实现

技术&#xff1a;Java、JSP等摘要&#xff1a;随着我国社会的进步&#xff0c;社会的各个领域的创新改革都在高速发展的信息时代下突显出来。在信息时代这个大的背景下&#xff0c;对软件开发的要求越来越高&#xff0c;对考试的管理更需要加强&#xff0c;所以迫使我们不得不对…

【设计模式之美 设计原则与思想:面向对象】12丨实战一(下):如何利用基于充血模型的DDD开发一个虚拟钱包系统?

上一节课&#xff0c;我们做了一些理论知识的铺垫性讲解&#xff0c;讲到了两种开发模式&#xff0c;基于贫血模型的传统开发模式&#xff0c;以及基于充血模型的 DDD 开发模式。今天&#xff0c;我们正式进入实战环节&#xff0c;看如何分别用这两种开发模式&#xff0c;设计实…

疫情开发,软件测试行情趋势是怎么样的?

如果说&#xff0c;2022年对于全世界来说&#xff0c;都是一场极大的挑战的话&#xff1b;那么&#xff0c;2023年绝对是机遇多多的一年。众所周知&#xff0c;随着疫情在全球范围内逐步得到控制&#xff0c;无论是国际还是国内的环境&#xff0c;都会呈现逐步回升的趋势&#…

每天一个摸鱼小技巧之「理解代码评审」

每个人的代码风格不同&#xff0c;在需要团队协作的项目里&#xff0c;如果没有统一的编程规范&#xff0c;那么会出现各式各样的代码&#xff0c;这对于团队成员来讲是个「灾难」。在需要对接协作模块时&#xff0c;要花费大量的时间去阅读代码&#xff0c;如果注释写的不明确…

观点分享 | 冲量在线联合创始人CTO陈浩栋:基于可信执行环境构建更安全的数据流通方案

2022年12月28日&#xff0c;由中国信息通信研究院&#xff08;以下简称中国信通院&#xff09;、中国通信标准化协会指导&#xff0c;隐私计算联盟、中国通信标准化协会大数据技术标准推进委员会联合主办的2022可信隐私计算峰会在北京召开&#xff0c;本次峰会的主题为“推进隐…

IO多路转接 —— poll和epoll

文章目录1. poll1.1 poll的函数接口1.2 poll的简单测试程序1.3 poll的优缺点分析2. epoll2.1 epoll的函数接口2.2 epoll的工作原理2.3 epoll的工作模式(LT,ET)2.4 epoll的简易服务器实现(默认是LT工作模式)前言&#xff1a; 接上文讲述的select&#xff0c;它有缺点&#xff0c…

一个UML的例子

一、需求分析 1.概念定义 需求分析&#xff0c;要明确以下4个概念的具体内容&#xff1a; 资源是指使用或产生的对象&#xff0c;如人、物料、信息、产品等。 目标是指希望资源处于什么样的状态。 过程是指被执行的活动&#xff0c;这些活动会改变资源的状态。 规则是指在某些…

2023-02-09 - 3 Elasticsearch基础操作

本章主要介绍ES的基础操作&#xff0c;具体包括索引、映射和文档的相关操作。其中&#xff0c;在文档操作中将分别介绍单条操作和批量操作。在生产实践中经常会通过程序对文档进行操作&#xff0c;因此在介绍文档操作时会分别介绍DSL请求形式和Java的高级REST编码形式。 1 索引…

VIF原理

文章目录一、VIF公式和原理对于R方一般回归模型皮尔逊相关系数中的方差VIF原理&#xff1a;一、VIF公式和原理 所谓VIF方法&#xff0c;计算难度并不高。在线性回归方法里&#xff0c;应用最广泛的就是最小二乘法&#xff08;OLS&#xff09;&#xff0c;只不过我们对每个因子…

锁的优化机制了解嘛?请进!

点个关注&#xff0c;必回关 文章目录自旋锁&#xff1a;自适应锁&#xff1a;锁消除&#xff1a;锁粗化&#xff1a;偏向锁&#xff1a;轻量级锁&#xff1a;从JDK1.6版本之后&#xff0c;synchronized本身也在不断优化锁的机制&#xff0c;有些情况下他并不会是一个很重量级的…

Open AI登录,可以使用ChatGPT!

目录 第一步&#xff1a;准备好上网工具 1、确认自己的IP 二、登录 三、无法登录的问题 第一步&#xff1a;准备好上网工具 1、确认自己的IP 先查看自己的电脑的IP&#xff0c;使用https://www.ip138.com/,看看&#xff0c;然后就可以将自己电脑的IP改成国外的IP&#x…