hudi数据湖万字全方位教程+应用示例

news2025/1/18 10:44:39

1时间轴(TimeLine

Hudi的核心是维护表上在不同的即时时间(instants执行的所有操作的时间轴(timeline,这有助于提供表的即时视图

一个instant由以下三个部分组成:

1Instant action:在表上执行的操作类型

COMMITS:一次commit表示将一批数据原子性地写入一个表。

CLEANS:清除表中不再需要的旧版本文件的后台活动。

DELTA_COMMIT:增量提交指的是将一批数据原子性地写入一个MergeOnRead类型的表,其中部分或所有数据可以写入增量日志。

COMPACTION:合并Hudi内部差异数据结构的后台活动,例如:将更新操作从基于行的log日志文件合并到列式存储的数据文件。在内部,COMPACTION体现为timeline上的特殊提交。

ROLLBACK:表示当commit/delta_commit不成功时进行回滚,其会删除在写入过程中产生的部分文件。

SAVEPOINT:将某些文件组标记为已保存,以便其不会被删除。在发生灾难需要恢复数据的情况下,它有助于将数据集还原到时间轴上的某个点。

2Instant time:时间

通常是一个时间戳(例如:20190117010349),它按照动作开始时间的顺序单调增加。

3State:状态

REQUESTED:表示某个action已经调度,但尚未执行。

INFLIGHT:表示action当前正在执行

        COMPLETED:表示timeline上的action已经完成

区分两个重要的时间概念:

Arrival time: 数据到达 Hudi 的时间,commit time。

Event time: record 中记录的时间。

10:20 来了一条 9:00 的数据,根据event time该数据仍然可以落到 9:00 对应的分区,通过 timeline 直接消费 10:00 (commit time)之后的增量更新(只消费有新 commits 的 group),那么这条延迟的数据仍然可以被消费到

2、文件布局

Hudi存储分为两个部分:

(1)元数据.hoodie目录对应着表的元数据信息,包括表的版本管理(Timeline)、归档目录(存放过时的instant也就是版本),一个instant记录了一次提交(commit)的行为、时间戳和状态,Hudi以时间轴的形式维护了在数据集上执行的所有操作的元数据;

(2)数据:和hive一样,以分区方式存放数据分区里面存放着Base File.parquet)和Log File.log.*

每个分区中,文件被组织成文件组,每个文件组包含几个文件片

每个文件片包含:

一个基本文件(.parquet):在某个commit/compaction即时时间(instant time)生成的(MOR可能没有)

多个日志文件(.log.*),这些日志文件包含自生成基本文件以来对基本文件的插入/更新(COW没有

索引(Index

给定的hoodie key(record key + partition path)与文件组id建立唯一映射。这种映射关系,数据第一次写入文件后保持不变

一个 FileGroup 包含了一批 record 的所有版本记录。Index 用于区分消息是 INSERT 还是 UPDATE。

索引选项

Bloom Index 默认配置,使用布隆过滤器来判断记录存在与否

Simple Index  性能比较差

HBase Index 把index存放在HBase里面,对于小批次的keys,查询效率高

Flink State-based

Index

Flink只有一种state based index(和bucket_index),其他index是Spark可选配置。

全局索引:全局索引在全表的所有分区范围下强制要求键的唯一性,但是随着表增大,update/delete 操作损失的性能越高,因此更适用于小表

非全局索引:默认的索引实现,只能保证数据在分区的唯一性。更适用于大表

HBase索引本质上是一个全局索引,bloom和simple index都有全局选项:

hoodie.index.type=GLOBAL_BLOOM

hoodie.index.type=GLOBAL_SIMPLE

对维度表的随机更删,使用简单索引对此场景更合适,如果额外的运维成本可以接受的话,也可以采用HBase索引

表类型

Copy On Write

在COW表中,只有数据文件/基本文件(.parquet,没有增量日志文件(.log.*

对每一个新批次写入都将创建相应数据文件的新版本(新的FileSlice),新版本文件包括旧版本文件的记录以及来自传入批次的记录(全量最新)。

data_file1 和 data_file2 都将创建更新的版本,data_file1 V2 data_file1 V1 的内容与data_file1 中传入批次匹配记录的记录合并。

由于在写入期间进行合并,COW 会产生一些写入延迟

Merge On Read

MOR表中,包含列存的基本文件(.parquet)和行存的增量日志文件(基于行的avro格式,.log.*

MOR表的合并成本在读取端。因此在写入期间我们不会合并或创建较新的数据文件版本。

每次的读取延迟都比较高

查询类型

1Snapshot Queries

快照查询,可以查询指定commit/delta commit即时操作后表的最新快照。

2Incremental Queries

增量查询,可以查询给定commit/delta commit即时操作以来新写入的数据。

3Read Optimized Queries

读优化查询,可查看给定的commit/compact即时操作的表的最新快照。

Read Optimized Queries是对Merge On Read表类型快照查询的优化。

写流程(INSERT

1Copy On Write

(1)先对 records 按照 record key 去重(可选)

(2)不会创建 Index

(3)如果有小的 base file 文件,merge base file,生成新的 FileSlice + base file,否则直接写新的 FileSlice + base file

2Merge On Read

(1)先对 records 按照 record key 去重(可选)

(2)不会创建 Index

(3)如果 log file 可索引,并且有小的 FileSlice,尝试追加或写最新的 log file;如果 log file 不可索引,写一个新的 FileSlice + base file

通过对写流程的梳理可以了解到 Apache Hudi 相对于其他数据湖方案的核心优势:

(1)写入过程充分优化了文件存储的小文件问题,Copy On Write 写会一直将一个 bucket (FileGroup)的 base 文件写到设定的阈值大小才会划分新的 bucket;Merge On Read 写在同一个 bucket 中,log file 也是一直 append 直到大小超过设定的阈值 roll over。

(2)对 UPDATE 和 DELETE 的支持非常高效,一条 record 的整个生命周期操作都发生在同一个 bucket,不仅减少小文件数量,也提升了数据读取的效率(不必要的 join 和 merge)。

Compaction

(1)没有 base file:走 copy on write insert 流程,直接 merge 所有的 log file 并写 base file

(2)有 base file:走 copy on write upsert 流程,先读 log file 建 index,再读 base file,最后读 log file 写新的 base file

Flink 和 Spark streaming 的 writer 都可以 apply 异步的 compaction 策略,按照间隔 commits 数或者时间来触发 compaction 任务,在独立的 pipeline 中执行。

flink操作hudi

插入数据

set sql-client.execution.result-mode=tableau;

-- 创建hudi

CREATE TABLE t1(

  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,

  name VARCHAR(10),

  age INT,

  ts TIMESTAMP(3),

  `partition` VARCHAR(20)

)

PARTITIONED BY (`partition`)

WITH (

  'connector' = 'hudi',

  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t1',

  'table.type' = 'MERGE_ON_READ' –- 默认是COW

);

-- 插入数据

INSERT INTO t1 VALUES

  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1');

查询数据

select * from t1;

更新数据

insert into t1 values

  ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');

编写代码

package com.atguigu.hudi.flink;

 

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;

import org.apache.flink.contrib.streaming.state.PredefinedOptions;

import org.apache.flink.streaming.api.CheckpointingMode;

import org.apache.flink.streaming.api.environment.CheckpointConfig;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

 

import java.util.concurrent.TimeUnit;

 

 

public class HudiDemo {

    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

 

        // 设置状态后端RocksDB

        EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);

        embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);

        env.setStateBackend(embeddedRocksDBStateBackend);

 

        // checkpoint配置

        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(30), CheckpointingMode.EXACTLY_ONCE);

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();

        checkpointConfig.setCheckpointStorage("hdfs://hadoop1:8020/ckps");

        checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(20));

        checkpointConfig.setTolerableCheckpointFailureNumber(5);

        checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));

                checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

 

        StreamTableEnvironment sTableEnv = StreamTableEnvironment.create(env);

 

        sTableEnv.executeSql("CREATE TABLE sourceT (\n" +

                "  uuid varchar(20),\n" +

                "  name varchar(10),\n" +

                "  age int,\n" +

                "  ts timestamp(3),\n" +

                "  `partition` varchar(20)\n" +

                ") WITH (\n" +

                "  'connector' = 'datagen',\n" +

                "  'rows-per-second' = '1'\n" +

                ")");

 

        sTableEnv.executeSql("create table t2(\n" +

                "  uuid varchar(20),\n" +

                "  name varchar(10),\n" +

                "  age int,\n" +

                "  ts timestamp(3),\n" +

                "  `partition` varchar(20)\n" +

                ")\n" +

                "with (\n" +

                "  'connector' = 'hudi',\n" +

                "  'path' = '/tmp/hudi_flink/t2',\n" +

                "  'table.type' = 'MERGE_ON_READ'\n" +

                ")");

 

        sTableEnv.executeSql("insert into t2 select * from sourceT");

 

    }

}

 

 

 

 

去重参数

通过如下语法设置主键:

-- 设置单个主键

create table hoodie_table (

  f0 int primary key not enforced,

  f1 varchar(20),

  ...

) with (

  'connector' = 'hudi',

  ...

)

 

-- 设置联合主键

create table hoodie_table (

  f0 int,

  f1 varchar(20),

  ...

  primary key(f0, f1) not enforced

) with (

  'connector' = 'hudi',

  ...

)

 

 

流读(Streaming Query

当前表默认是快照读取,即读取最新的全量快照数据并一次性返回。通过参数 read.streaming.enabled 参数开启流读模式,通过 read.start-commit 参数指定起始消费位置,支持指定 earliest 从最早消费。

CREATE TABLE t5(

  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,

  name VARCHAR(10),

  age INT,

  ts TIMESTAMP(3),

  `partition` VARCHAR(20)

) WITH (

  'connector' = 'hudi',

  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t5',

  'table.type' = 'MERGE_ON_READ',

  'read.streaming.enabled' = 'true',

  'read.streaming.check-interval' = '4'   -- 默认60s

);

 

 

insert into t5 select * from sourceT;

 

select * from t5;

增量读取(Incremental Query

0.10.0 开始支持。

如果有增量读取 batch 数据的需求,增量读取包含三种场景。

(1)Stream 增量消费,通过参数 read.start-commit 指定起始消费位置;

(2)Batch 增量消费,通过参数 read.start-commit 指定起始消费位置,通过参数 read.end-commit 指定结束消费位置,区间为闭区间,即包含起始、结束的 commit

(3)TimeTravelBatch 消费某个时间点的数据:通过参数 read.end-commit 指定结束消费位置即可(由于起始位置默认从最新,所以无需重复声明)

flink读取kafka数据并写入hudi数据湖

(1)创建kafka源表

create table stu3_binlog_source_kafka(

  id bigint not null,

  name string,

  school string,

  nickname string,

  age int not null,

  class_num int not null,

  phone bigint not null,

  email string,

  ip string

 ) with (

  'connector' = 'kafka',

  'topic' = 'cdc_mysql_stu3_sink',

  'properties.bootstrap.servers' = 'hadoop1:9092',

  'format' = 'json',

  'scan.startup.mode' = 'earliest-offset',

  'properties.group.id' = 'testGroup'

  );

(2)创建hudi目标表

 create table stu3_binlog_sink_hudi(

  id bigint not null,

  name string,

  `school` string,

  nickname string,

  age int not null,

  class_num int not null,

  phone bigint not null,

  email string,

  ip string,

  primary key (id) not enforced

)

 partitioned by (`school`)

 with (

  'connector' = 'hudi',

  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/stu3_binlog_sink_hudi',

  'table.type' = 'MERGE_ON_READ',

  'write.option' = 'insert',

  'write.precombine.field' = 'school'

  );

(3)将kafka数据写入到hudi中

insert into stu3_binlog_sink_hudi

select * from  stu3_binlog_source_kafka;

离线批量导入

如果存量数据来源于其他数据源,可以使用批量导入功能,快速将存量数据导成 Hoodie 表格式。

1)原理

(1)批量导入省去了 avro 的序列化以及数据的 merge 过程,后续不会再有去重操作,数据的唯一性需要自己来保证。

(2)bulk_insert 需要在 Batch Execuiton Mode 下执行更高效,Batch 模式默认会按照 partition path 排序输入消息再写入 Hoodie,避免 file handle 频繁切换导致性能下降。

SET execution.runtime-mode = batch;

SET execution.checkpointing.interval = 0;

(3)bulk_insert write task 的并发通过参数 write.tasks 指定,并发的数量会影响到小文件的数量,理论上,bulk_insert write task 的并发数就是划分的 bucket 数,当然每个 bucket 在写到文件大小上限(parquet 120 MB)的时候会 roll over 到新的文件句柄,所以最后:写文件数量 >= bulk_insert write task 数。

案例

Flink SQL client创建hudi表

 create table stu4_sink_hudi(

  id bigint not null,

  name string,

  `school` string,

  nickname string,

  age int not null,

 score decimal(4,2) not null,

  class_num int not null,

  phone bigint not null,

  email string,

  ip string,

  primary key (id) not enforced

)

 partitioned by (`school`)

 with (

  'connector' = 'hudi',

  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/stu4_sink_hudi',

  'table.type' = 'MERGE_ON_READ',

  'write.option' = 'bulk_insert',

  'write.precombine.field' = 'school'

  );

Flink SQL client执行mysql数据插入到hudi中

insert into stu4_sink_hudi select * from stu4;

全量接增量

如果已经有全量的离线 Hoodie 表,需要接上实时写入,并且保证数据不重复,可以开启 index bootstrap 功能。

如果觉得流程冗长,可以在写入全量数据的时候资源调大直接走流模式写,全量走完接新数据再将资源调小(或者开启限流功能)。

(1) CREATE TABLE 创建和 Hoodie 表对应的语句,注意 table type 要正确

(2)设置 index.bootstrap.enabled = true开启索引加载功能

(6)重启任务将 index.bootstrap.enabled 关闭,参数配置到合适的大小,如果RowDataToHoodieFunction 和 BootstrapFunction 并发不同,可以重启避免 shuffle

Changelog 模式

如果希望 Hoodie 保留消息的所有变更(I/-U/U/D),之后接上 Flink 引擎的有状态计算实现全链路近实时数仓生产(增量计算),Hoodie 的 MOR 表通过行存原生支持保留消息的所有变更(format 层面的集成),通过流读 MOR 表可以消费到所有的变更记录。

changelog.enabled

false

false

默认是关闭状态,即 UPSERT 语义,所有的消息仅保证最后一条合并消息中间的变更可能会被 merge 改成 true 支持消费所有变更

批(快照)读仍然会合并所有的中间结果,不管 format 是否已存储中间状态。

案例演示

(1)使用changelog

set sql-client.execution.result-mode=tableau;

 

CREATE TABLE t6(

  id int,

  ts int,

  primary key (id) not enforced

) WITH (

  'connector' = 'hudi',

  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t6',

  'table.type' = 'MERGE_ON_READ',

  'read.streaming.enabled' = 'true',

  'read.streaming.check-interval' = '4',

  'changelog.enabled' = 'true'

);

 

insert into t6 values (1,1);

insert into t6 values (1,2);

 

set table.dynamic-table-options.enabled=true;

select * from t6/*+ OPTIONS('read.start-commit'='earliest')*/;

select count(*) from t6/*+ OPTIONS('read.start-commit'='earliest')*/;

 

 

 

 

Hudi Catalog

从 0.12.0 开始支持,通过 catalog 可以管理 flink 创建的表避免每次使用都要重复建表操作,另外 hms 模式的 catalog 支持自动补全 hive 同步参数。

DFS 模式 Catalog SQL样例:

CREATE CATALOG hoodie_catalog

  WITH (

    'type'='hudi',

    'catalog.path' = '${catalog 的默认路径}',

    'mode'='dfs'

  );

Hms 模式 Catalog SQL 样例:

CREATE CATALOG hoodie_catalog

  WITH (

    'type'='hudi',

    'catalog.path' = '${catalog 的默认路径}',

    'hive.conf.dir' = '${hive-site.xml 所在的目录}',

    'mode'='hms' -- 支持 'dfs' 模式通过文件系统管理表属性

  );

 

离线 Compaction

MOR 表的 compaction 默认是自动打开的策略是 5 commits 执行一次压缩。 因为压缩操作比较耗费内存,和写流程放在同一个 pipeline,在数据量比较大的时候(10w+/s qps),容易干扰写流程,此时采用离线定时任务的方式执行 compaction 任务更稳定。

设置参数

compaction.async.enabled 为 false,关闭在线 compaction。

compaction.schedule.enabled 仍然保持开启,由写任务阶段性触发压缩 plan。

命令行的方式

./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://xxx:8020/table

离线 Clustering

异步的 clustering 相对于 online 的 async clustering 资源隔离,从而更加稳定。

5.14.1 设置参数

clustering.async.enabled 为 false,关闭在线 clustering。

clustering.schedule.enabled 仍然保持开启,由写任务阶段性触发 clustering plan。

命令行的方式

./bin/flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://xxx:8020/table

常见基础问题

5.15.1 存储一直看不到数据

如果是 streaming 写,请确保开启 checkpoint,Flink 的 writer 有 3 种刷数据到磁盘的策略:

当某个 bucket 在内存积攒到一定大小 (可配,默认 64MB)

当总的 buffer 大小积攒到一定大小(可配,默认 1GB)

当 checkpoint 触发,将内存里的数据全部 flush 出去

5.15.2 数据有重复

如果是 COW 写,需要开启参数 write.insert.drop.duplicates,COW 写每个 bucket 的第一个文件默认是不去重的,只有增量的数据会去重,全局去重需要开启该参数;MOR 写不需要开启任何参数,定义好 primary key 后默认全局去重。(注意:从 0.10 版本开始,该属性改名 write.precombine 并且默认为 true。)

如果需要多 partition 去重,需要开启参数: index.global.enabled 为 true。(注意:从 0.10 版本开始,该属性默认为 true。)

索引 index 是判断数据重复的核心数据结构,index.state.ttl 设置了索引保存的时间,默认为 1.5 天,对于长时间周期的更新,比如更新一个月前的数据,需要将 index.state.ttl 调大(单位天),设置小于 0 代表永久保存。(注意:从 0.10 版本开始,该属性默认为 0。)

5.15.3 Merge On Read 只有 log 文件

 Merge On Read 默认开启了异步的 compaction,策略是 5 commits 压缩一次,当条件满足参会触发压缩任务,另外,压缩本身因为耗费资源,所以不一定能跟上写入效率,可能会有滞后。

可以先观察 log,搜索 compaction 关键词,看是否有 compact 任务调度:

After filtering, Nothing to compact for 关键词说明本次 compaction strategy 是不做压缩。

集成 Hive

Hudi 源表对应一份 HDFS 数据,通过 Spark,Flink 组件或者 Hudi CLI,可以将 Hudi 表的数据映射为 Hive 外部表,基于该外部表, Hive可以方便的进行实时视图,读优化视图以及增量视图的查询。

6.1 集成步骤

以 hive3.1.2、hudi 0.12.0为例,其他版本类似。

1)拷贝编译好的jar

hudi-hadoop-mr-bundle-0.12.0.jar , hudi-hive-sync-bundle-0.12.0.jar 放到 hive 节点的lib目录下;

cp /opt/software/hudi-0.12.0/packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.12.0.jar /opt/module/hive/lib/

 

cp /opt/software/hudi-0.12.0/packaging/hudi-hive-sync-bundle/target/hudi-hive-sync-bundle-0.12.0.jar /opt/module/hive/lib/

2)配置完后重启 hive

// 按照需求选择合适的方式重启

nohup hive --service metastore &

nohup hive --service hiveserver2 &

 Hive 同步

6.2.1 Flink 同步Hive

1)使用方式

Flink hive sync 现在支持两种 hive sync mode, 分别是 hms 和 jdbc 模式。 其中 hms 只需要配置 metastore uris;而 jdbc 模式需要同时配置 jdbc 属性 和 metastore uris,具体配置模版如下:

## hms mode 配置

 

CREATE TABLE t1(

  uuid VARCHAR(20),

  name VARCHAR(10),

  age INT,

  ts TIMESTAMP(3),

  `partition` VARCHAR(20)

)

PARTITIONED BY (`partition`)

with(

  'connector'='hudi',

  'path' = 'hdfs://xxx.xxx.xxx.xxx:9000/t1',

  'table.type'='COPY_ON_WRITE',        -- MERGE_ON_READ方式在没生成 parquet 文件前,hive不会有输出

  'hive_sync.enable'='true',           -- required,开启hive同步功能

  'hive_sync.table'='${hive_table}',              -- required, hive 新建的表名

  'hive_sync.db'='${hive_db}',             -- required, hive 新建的数据库名

  'hive_sync.mode' = 'hms',            -- required, hive sync mode设置为hms, 默认jdbc

  'hive_sync.metastore.uris' = 'thrift://ip:9083' -- required, metastore的端口

);

Flink 使用 HiveCatalog

6.3.1 直接使用Hive Catalog

1)上传hive connectorflinklib

hive3.1.3的connector存在guava版本冲突,需要解决:官网下载connector后,用压缩软件打开jar包,删除/com/google文件夹。处理完后上传flink的lib中。

2)解决与hadoop的冲突   

避免与hadoop的冲突,拷贝hadoop-mapreduce-client-core-3.1.3.jar到flink的lib中(5.2.1已经做过)

3)创建catalog

CREATE CATALOG hive_catalog

  WITH (

    'type' = 'hive',

    'default-database' = 'default',

    'hive-conf-dir' = '/opt/module/hive/conf',

'hadoop-conf-dir'='/opt/module/hadoop-3.1.3/etc/hadoop'

  );

 

use catalog hive_catalog;

 

-- hive-connector内置了hive module,提供了hive自带的系统函数

load module hive with ('hive-version'='3.1.2');

show modules;

show functions;

 

-- 可以调用hivesplit函数

select split('a,b', ',');

查询 Hive 外表

6.5.1 设置参数

使用 Hive 查询 Hudi 表前,需要通过set命令设置 hive.input.format,否则会出现数据重复,查询异常等错误,如下面这个报错就是典型的没有设置 hive.input.format 导致的:

java.lang.IllegalArgumentException: HoodieRealtimeReader can oly work on RealTimeSplit and not with xxxxxxxxxx

除此之外对于增量查询,还需要 set 命令额外设置3个参数。

set hoodie.mytableName.consume.mode=INCREMENTAL;

set hoodie.mytableName.consume.max.commits=3;

set hoodie.mytableName.consume.start.timestamp=commitTime;

注意这3个参数是表级别参数。

COW 表查询

这里假设同步的 Hive 外表名为 hudi_cow。

1实时视图

设置 hive.input.format 为以下两个之一:

org.apache.hadoop.hive.ql.io.HiveInputFormat

org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat

像普通的hive表一样查询即可:

set hive.input.format= org.apache.hadoop.hive.ql.io.HiveInputFormat;

select count(*) from hudi_cow;

2增量视图

除了要设置 hive.input.format,还需要设置上述的3个增量查询参数,且增量查询语句中的必须添加 where 关键字并`_hoodie_commit_time > 'startCommitTime' 作为过滤条件(这地方主要是hudi的小文件合并会把新旧commit的数据合并成新数据,hive是没法直接从parquet文件知道哪些是新数据哪些是老数据)

set hive.input.format= org.apache.hadoop.hive.ql.io.HiveInputFormat;

set hoodie.hudicow.consume.mode= INCREMENTAL;

set hoodie.hudicow.consume.max.commits=3;

set hoodie.hudicow.consume.start.timestamp= xxxx;

select count(*) from hudicow where `_hoodie_commit_time`>'xxxx'

-- (这里注意`_hoodie_commit_time` 的引号是反引号(tab键上面那个)不是单引号, 'xxxx'是单引号)

6.5.3 MOR 表查询

这里假设 MOR 类型 Hudi 源表的表名为hudi_mor,映射为两张 Hive 外部表hudi_mor_ro(ro表)和 hudi_mor_rt(rt表)。

1实时视图

设置了 hive.input.format 之后,即可查询到Hudi源表的最新数据

set hive.input.format= org.apache.hadoop.hive.ql.io.HiveInputFormat;

select * from hudicow_rt;

2读优化视图

ro 表全称 read oprimized table,对于 MOR 表同步的 xxx_ro 只暴露压缩后的 parquet。其查询方式和COW表类似。设置完 hiveInputFormat 之后 和普通的 Hive 表一样查询即可

3)增量视图

这个增量查询针对的rt,不是ro表。同 COW 表的增量查询类似:

set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat; // 这地方指定为HoodieCombineHiveInputFormat

set hoodie.hudimor.consume.mode=INCREMENTAL;

set hoodie.hudimor.consume.max.commits=-1;

set hoodie.hudimor.consume.start.timestamp=xxxx;

select * from hudimor_rt where `_hoodie_commit_time`>'xxxx';// 这个表名要是rt

索引

说明:

set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;最好只用于 rt 表的增量查询 当然其他种类的查询也可以设置为这个,这个参数会影响到普通的hive表查询,因此在rt表增量查询完成后,应该设置 set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; 或者改为默认值set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 用于其他表的查询。

set hoodie.mytableName.consume.mode=INCREMENTAL; 仅用于该表的增量查询模式,若要对该表切换为其他查询模式,应设置set hoodie.hudisourcetablename.consume.mode=SNAPSHOT;

 

湖仓一体的优势如下:

流数仓架构本质上有两个痛点实时/离线计算层不统一;实时/离线存储层不统一。

减少数据冗余:湖仓一体提单一的数据存储平台减少了数据的冗余和重复,避免维护多个存储系统的成本和时间。

成本效益:湖仓一体利用低成本对象存储实现高效益数据存储,降低了存储成本,并避免了维护多个数据存储系统的成本。

事务支持:湖仓体支持 ACID 事务,确保了多方同时读取或写入数据的一致性。

Schema的实施和治理:湖仓一体支持Schema的实施和演化,确保数据的完整性,并提供了强大的治理和审计机制。

开放性:湖仓一体采用开放和标准化的存储格式,如Parquet,可以让各种工具和引擎直接访问数据。

存算分离:湖仓一体将存储和计算解耦,可以横向扩展到更大规模和更多并发用户。

支持多种工作负载:湖仓一体支持数据科学、机器学习、SQL和数据分析等各种工作负载,减少了需要维护多个工具的成本。

端到端的流计算支持:湖仓一体支持流计算,实时/离线存储层统一,实现实时报告的需求,避免了使用单独系统来实时数据应用程序的需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1920226.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

YOLOv10改进 | Conv篇 | RCS-OSA替换C2f实现暴力涨点(减少通道的空间对象注意力机制)

一、本文介绍 本文给大家带来的改进机制是RCS-YOLO提出的RCS-OSA模块,其全称是"Reduced Channel Spatial Object Attention",意即"减少通道的空间对象注意力"。这个模块的主要功能是通过减少特征图的通道数量,同时关注空…

Android使用AndServer在安卓设备上搭建服务端(Java)(Kotlin)两种写法

一直都是通过OkHttp远程服务端进行数据交互,突发奇想能不能也通过OkHttp在局域网的情况下对两个安卓设备或者手机进行数据交互呢? 这样一方安卓设备要当做服务端与另一个安卓设备通过OkHttp进行数据交互即可 当然还可以通过 socket 和 ServerSocket 通…

IC后端设计中的shrink系数设置方法

我正在「拾陆楼」和朋友们讨论有趣的话题,你⼀起来吧? 拾陆楼知识星球入口 在一些成熟的工艺节点通过shrink的方式(光照过程中缩小特征尺寸比例)得到了半节点,比如40nm从45nm shrink得到,28nm从32nm shrink得到,由于半节点的性能更优异,成本又低,漏电等不利因素也可以…

旷野之间5 - AI基础代理决策的范式转变

介绍 让我们来谈谈最近在人工智能领域引起轰动的一件事——基础代理及其彻底改变我们所知的决策的潜力。现在,我知道你可能会想,“另一天,又一个人工智能突破,乏味无趣。”但相信我,这是一个改变游戏规则的突破,值得你关注。 如果您一直在关注人工智能和人工智能代理的…

JupyterNotebook中导出当前环境,并存储为requirements.txt

​使用Anaconda管理Python环境时,可以轻松地导出环境配置,以便在其他机器或环境中重新创建相同的环境。可以通过生成一个environment.yml文件实现的,该文件包含了环境中安装的所有包及其版本。但是,常常在一些课程中JupyterNotebo…

synchronized关键字详解(全面分析)

目录 synchronized关键字详解1、synchronized关键字简介2、synchronized作用和使用场景作用使用场景①、用在代码块上(类级别同步)②、用在代码块上(对象级别同步)③、用在普通方法上(对象级别同步)④、用在静态方法上(类级别同步)总结: 3、synchronized底层原理&am…

记录些Redis题集(1)

为什么Redis要有淘汰机制? 淘汰机制的存在是必要的,因为Redis是一种基于内存的数据库,所有数据都存储在内存中。然而,内存资源是有限的。在Redis的配置文件redis.conf中,有一个关键的配置项: # maxmemory…

vue3<script setup>自定义指令

main.ts // 自定义指令 app.directive(color,(el,binding) > {el.style.color binding.value })这段代码定义了一个名为color的自定义指令,并将其注册到Vue应用实例app上。自定义指令接收两个参数:el和binding。el是绑定指令的元素,而bi…

240711_昇思学习打卡-Day23-LSTM+CRF序列标注(2)

240711_昇思学习打卡-Day23-LSTMCRF序列标注(2) 今天记录LSTMCRF序列标注的第二部分。仅作简单记录 Score计算 首先计算正确标签序列所对应的得分,这里需要注意,除了转移概率矩阵𝐏外,还需要维护两个大小…

解决鸿蒙开发中克隆项目无法签名问题

文章目录 问题描述问题分析解决方案 问题描述 在一个风和日丽的早晨,这是我学习鸿蒙开发的第四天,把文档过了一遍的我准备看看别人的项目学习一下,于是就用git去clone了一个大佬的开源项目,在签名的时候遇到了问题: h…

Codeforces Round 957 (Div. 3)(A~E题解)

这次比赛只能用抽象来形容,前五道题都没有什么算法,都是思维加模拟都能过,然后第四题卡住了,第五题不知道为什么做出来的人那么少,就是纯暴力就能过,但是没抓住上分的机会,有些可惜,…

Pytorch(笔记8神经网络nn)

1、nn.Module torch.nn是专门为深度学习而设计的模块。torch.nn的核心数据结构是Module,它是一个抽象的概念,既可以表示神经网络中的某个层(layer),也可以表示一个包含很多层的神经网络。在实际使用中,最常…

可视化学习:如何用WebGL绘制3D物体

在之前的文章中,我们使用WebGL绘制了很多二维的图形和图像,在学习2D绘图的时候,我们提过很多次关于GPU的高效渲染,但是2D图形的绘制只展示了WebGL部分的能力,WebGL更强大的地方在于,它可以绘制各种3D图形&a…

一行命令快速导出、导入Python的依赖环境(Python)

文章目录 一、pip1、导出2、导入 二、Conda(简)1、导出1、导入 一、pip 1、导出 在Pycharm的Terminal窗口输入如下命令,即可将环境导出至文件requirements.txt。 pip freeze > C:\Users\sdl\Deskto\requirements.txt也可以在DOS界面执行…

python:sympy 求解一元五次方程式

pip install sympy 或者 本人用的 anaconda 3 自带 sympy 在北大数学训练营,韦东奕 用卡丹公式 巧妙 求解一元五次方程式: \latex $x^510*x^320*x-4 0$ from sympy import *x symbols(x) expr x**5 10*x**3 20*x -4# 用卡丹公式 尝试化简 a sym…

【操作系统】进程管理——用信号量机制解决问题,以生产者-消费者问题为例(个人笔记)

学习日期:2024.7.10 内容摘要:利用信号量机制解决几个经典问题模型 目录 引言 问题模型 生产者-消费者问题(经典) 多生产者-多消费者问题 吸烟者问题 读者写者问题(难点) 哲学家进餐问题&#xff0…

如何在vue的项目中导入阿里巴巴图标库

阿里巴巴矢量图标库官网:iconfont-阿里巴巴矢量图标库 选择你喜欢的图标,添加入库 点击添加至项目,并新建文件夹,点击确定 选择font-class,点击生成代码 代码生成后,在网站上打开 全选复制到style 点击复制…

Agents 要点

一、Agents概念 人类是这个星球上最强大的 Agent。Agent是一个能感知并自主地采取行动的实体,这里的自主性极其关键,Agent要能够实现设定的目标,其中包括具备学习和获取知识的能力以提高自身性能。 关键点:感知环境、自主决策、具…

SpringBoot新手快速入门系列教程十一:基于Docker Compose部署一个最简单分部署服务项目

如果您还对于Docker或者Docker Compose不甚了解,可以劳烦移步到我之前的教程: SpringBoot新手快速入门系列教程九:基于docker容器,部署一个简单的项目 SpringBoot新手快速入门系列教程十:基于Docker Compose&#xf…

CSS特效:pointer-events: none;的一种特殊应用

一、需求描述 今天看到一个设计需求:需要在弹框中显示如下界面,其中有两个效果: 1.顶部点击项目,下面的内容能相应滚动定位,同时滚动的时候顶部项目也能相应激活显示 2.顶部右侧有一个模糊渐变效果,并且要…