文章目录
- 基本概念
- 时间轴(Timeline)
- 文件布局(File Layout)
- 索引(Index)
- 索引原理
- 索引类型
- 索引的选择策略
- 表类型(Table Types)
- 查询类型(Query Types)
- 写操作(Write Operations)
- 写流程
基本概念
时间轴(Timeline)
Hudi 的核心是维护表上在不同的即时时间Instants
执行的所有操作的时间轴timeline
. 有助于提供表的即时视图,还有效支持按照到达顺序检索数据。
一个Instant
由以下三个部分组成
-
Instant Action
: 在表上执行的操作类型- COMMITS: 一次commit表示将一批数据原子性地写入一个表
- CLEANS: 清除表中不再需要的旧版本文件的后台活动
- DELTA_COMMIT: 增量提交指的是将一批数据原子性地写入一个
MergeOnRead
类型的表,其中部分或所有数据可以写入增量日志 - COMPACTION: 合并hudi内部差异数据结构的后台活动,例如将更新操作从基于
行
的log日志文件合并到列
式存储的数据文件。在内部,COMPACTION
体现为timeline
上的特殊提交 - ROLLBACK: 表示当
commit
/delta_commit
不成功时进行回滚,其会删除在写入过程中产生的部分文件 - SAVEPOINT: 将某些
文件组
标记为已保存,以便其不会被删除。在发生灾难时需要恢复数据的情况下,它有助于将数据集还原到时间轴上的某个点
-
Instant time
: 是一个时间戳(例如20221229140047416
),按照动作开始的时间顺序单调递增 -
State
:Instant
的当前状态- REQUESTED: 表示某个action已经被调度,但尚未执行
- INFLIGHT: 表示action当前正在执行
- COMPLETED: 表示timeline上的action已经完成
两个时间概念:
Arrival time
: 数据到达hudi的时间,commit timeEvent time
: record中记录的时间
官网示例如下:
上图中采用时间(小时)
作为分区字段,从10:00开始陆续产生各种commits, 10:20来了一条09:00的数据(这就是延迟数据),根据event time
该数据仍然可以落到09:00对应的分区,通过timeline直接消费10:00之后的增量更新(即10点之后的commits),那么这条延迟的数据仍然可以被消费到的。
文件布局(File Layout)
hudi 将一个表映射为如下结构
一个hudi表分为如下两部分
-
元数据:
.hoodie
目录对应着表的元数据信息,包括表的版本管理(timeline)、归档目录(archived,存放过时的Instant)。一个Instant记录了一个commit的行为、时间戳和状态。Hudi以时间轴的形式维护了在数据集上执行的所有操作的元数据
-
数据区:和hive一样,以分区方式存放数据;分区里面存放着
Base File
(.parquet
)和Log File
(.log.*
)
如果是一个分区表(默认就是分区,如果用
Sparksql可以创建非分区表
,那么数据区下面就直接存放的是数据文件BaseFile
),每个分区路径下面有多个文件组(File Group
),每个文件组有唯一的文件ID(File Id
)标识,每个文件组有多个文件片(File Slice
),每个文件片由一个数据文件Base File(.parquet)
和多个日志文件(.log.*
)组成. 像cow类型的表是没有.log
文件的(后面表类型再谈)
- Hudi将数据表组织成分布式文件系统基本路径(basepath)下的目录结构
- 表被划分为多个分区,这些分区是包含该分区的数据文件的文件夹,非常类似于Hive表
- 每个分区路径下面有多个文件组(
File Group
),每个文件组有唯一的文件ID(File Id
)标识 - 每个文件组有多个文件片(
File Slice
) - 每个文件片由一个数据文件
Base File(.parquet)
和多个日志文件(.log.*
)组成- 一个数据文件也叫一个基本文件:在某个instant通过commit或compaction生成。
mor
类型的表在未compaction前可能没有 - 多个日志文件(
.log.*
):这些日志文件包含自生成基本文件以来对基本文件的插入更新,cow
类型的表没有日志文件
- 一个数据文件也叫一个基本文件:在某个instant通过commit或compaction生成。
- huid采用了多版本并发控制(
Multiversion Concurrency Control, MVCC
)- compaction操作:合并日志和基本文件以产生新的文件片
- clean操作:清除不适用的或旧的文件片以回收文件系统上的空间
- Hudi的
base file
(parquet 文件
)在 footer 的 meta 区记录了 record key 组成的 BloomFilter,用于在 file based index 的实现中实现高效率的 key contains 检测 - Hudi 的
log
(avro 文件
)是自己编码的,通过积攒数据 buffer 以 LogBlock 为单位写出,每个 LogBlock 包含 magic number、size、content、footer 等信息,用于数据读、校验和过滤
索引(Index)
索引原理
hudi 通过索引机制提供高效的upserts,具体是将给定的hoodie key (record key + partition path)
与文件ID(文件组
)建立唯一映射。这种映射关系,在数据第一次写入文件后永远保持不变,换言之,一个File Group
包含了一批Record
的所有版本记录。Index
用于区分是Insert
还是Update
。
对于Copy-On-Write
类型的表,这种索引机制通过避免连接整个数据集来确定哪些数据文件需要被重写,可以实现快速的upserts/delete
操作。
对于Merge-On-Read
类型的表,对于任何给定的基本文件只需要根据这些基本文件的record记录key的更新来进行合并即可。
相比之下,没有索引的设计如(Hive ACID
),对于传入的所有更新/删除记录都不得不去和所有的基本文件进行合并。
上图是官网的例子,Hudi 为了消除不必要的读写,引入了索引的实现。在有了索引之后,更新的数据可以快速被定位到对应的 File Group
。上图为例,白色是基本文件,黄色是更新数据,有了索引机制,可以做到:避免读取不需要的文件、避免更新不必要的文件、无需将更新数据与历史数据做分布式关联,只需要在File Group
内做合并。
索引类型
Index类型 | 原理 | 优点 | 缺点 |
---|---|---|---|
Bloom Index (默认) | 使用布隆过滤器来判断记录存在与否,也可选使用record key的范围裁剪 需要的文件 | 效率高,不依赖外部系统,数据和索引保持一致性 | 因假阳性问题(hash冲突)还需回溯源文件再查找以便 |
Simple Index | 把update/delete操作的新数据和老数据进行join | 实现最简单,无需额外的资源 | 性能比较差 |
HBase Index | 把index存放在HBase里面。在插入 File Group定位阶段所有task向HBase发送 Batch Get 请求,获取 Record Key 的 Mapping 信息 | 对于小批次的keys,查询效率高 | 需要外部的系统,增加了运维压力 |
Flink State-based Index | HUDI 在 0.8.0 版本中实现的 Flink witer,采用了 Flink 的 state 作为底层的 index 存储,每个 records 在写入之前都会先计算目标 bucket ID | 不同于 BloomFilter Index,避免了每次重复的文件 index 查找 |
注意:Flink只有一种state based index(和bucket_index),其他index是Spark可选配置。
全局索引与非全局索引
- 全局索引: 全局索引在全表的所有分区范围下强制要求键的唯一性,也就是确保对给定的键有且只有一个对应的记录。全局索引提供了更强的保证,但是随着表增大,update/delete 操作损失的性能越高,因此更适用于小表。
- 非全局索引:默认的索引实现,只能保证数据在分区的唯一性。非全局索引依靠写入器为同一个记录的update/delete提供一致的分区路径,同时大幅提高了效率,更适用于大表。
从index的维护成本和写入性能的角度考虑,维护一个global index的难度更大,对写入性能的影响也更大,所以需要non-global index。
HBase索引
本质上是一个全局索引,Bloom Index
和Simple Index
都有全局选项:
hoodie.index.type=GLOBAL_BLOOM
hoodie.index.type=GLOBAL_SIMPLE
索引的选择策略
一、对事实表的延迟更新
许多公司会在NoSQL数据存储中存放大量的交易数据。例如共享出行的行程表、股票买卖记录的表、和电商的订单表。这些表通常一直在增长,且大部分的更新随机发生在较新的记录上,而对旧记录有着长尾分布型的更新。这通常是源于交易关闭或者数据更正的延迟性。换句话说,大部分更新会发生在最新的几个分区上而小部分会在旧的分区。
对于这样的作业模式,Bloom Index
就能表现地很好,因为查询索引可以靠设置得当的布隆过滤器来裁剪很多数据文件。另外,如果生成的键可以以某种顺序排列,参与比较的文件数会进一步通过范围裁剪而减少。Hudi用所有文件的键域来构造区间树,这样能来高效地依据输入的更删记录的键域来排除不匹配的文件。
为了高效地把记录键和布隆过滤器进行比对,即尽量减少过滤器的读取和均衡执行器间的工作量,Hudi缓存了输入记录并使用了自定义分区器和统计规律来解决数据的偏斜。有时,如果布隆过滤器的假阳性率过高,查询会增加数据的打乱操作。Hudi支持动态布隆过滤器(设置hoodie.bloom.index.filter.type=DYNAMIC_V0
)。它可以根据文件里存放的记录数量来调整大小从而达到设定的假阳性率。
二、对事件表的去重
事件流无处不在。从Apache Kafka或其他类似的消息总线发出的事件数通常是事实表大小的10-100倍。事件通常把时间(到达时间、处理时间)作为首类处理对象,比如物联网的事件流、点击流数据、广告曝光数等等。由于这些大部分都是仅追加的数据,插入和更新只存在于最新的几个分区中。由于重复事件可能发生在整个数据管道的任一节点,在存放到数据湖前去重是一个常见的需求。
总的来说,低消耗去重是一个非常有挑战的工作。虽然可以用一个键值存储来实现去重(即HBase索引),但索引存储的消耗会随着事件数增长而线性增长以至于变得不可行。事实上,有范围裁剪功能的布隆索引是最佳的解决方案。我们可以利用作为首类处理对象的时间来构造由事件时间戳和事件id(event_ts+event_id)组成的键,这样插入的记录就有了单调增长的键。这会在最新的几个分区里大幅提高裁剪文件的效益。
三、对维度表的随机更删
正如之前提到的,如果范围比较不能裁剪许多文件的话,那么布隆索引并不能带来很好的效益。在这样一个随机写入的作业场景下,更新操作通常会触及表里大多数文件从而导致布隆过滤器依据输入的更新对所有文件标明阳性。最终会导致,即使采用了范围比较,也还是检查了所有文件。使用简单索引对此场景更合适,因为它不采用提前的裁剪操作,而是直接和所有文件的所需字段连接。如果额外的运维成本可以接受的话,也可以采用HBase索引,其对这些表能提供更加优越的查询效率。
当使用全局索引时,也可以考虑通过设置hoodie.bloom.index.update.partition.path=true
或hoodie.simple.index.update.partition.path=true
来处理 的情况;例如对于以所在城市分区的用户表,会有用户迁至另一座城市的情况。这些表也非常适合采用Merge-On-Read
表型。
表类型(Table Types)
一、Copy-On-Write
类型的表
在COW
表中,只有列式存储的数据文件.parquet
,没有行式存储的日志文件.log.*
对每一个新批次写入都将创建相应数据文件的新版本(新的File Slice
),新版本文件就包括旧版本文件的记录以及来自传入批次的记录(即全量最新)
假设我们有 3 个文件组,其中包含如下数据文件。
我们进行一批新的写入,在索引后,我们发现这些记录与File group 1 和File group 2 匹配,然后有新的插入,我们将为其创建一个新的文件组(File group 4)。
因此data_file1 和 data_file2 都将创建更新的版本,data_file1 V2 是data_file1 V1 的内容与data_file1 中传入批次匹配记录的记录合并。
由于在写入期间进行合并,COW 会产生一些写入延迟,有写放大
的问题。但是COW 的优势在于它的简单性,不需要其他表服务(如压缩),也相对容易调试。
二、Merge-On-Read
类型的表
MOR表包含列存的基本文件(.parquet
)和行存的增量日志文件(基于行的avro
格式,.log.*
)。增量更新会被记录到日志文件中,然后以同步或异步的方式进行合并compaction
来生成新的基本文件(.parquet
)。
MOR表的合并成本在读取端。因此在写入期间我们不会合并或创建较新的数据文件版本。标记/索引完成后,对于具有要更新记录的现有数据文件,Hudi 创建增量日志文件并适当命名它们,以便它们都属于一个文件组。
读取端将实时合并基本文件及其各自的增量日志文件。每次的读取延迟都比较高(因为查询时进行合并),所以 Hudi 使用压缩机制来将数据文件和日志文件合并在一起并创建更新版本的数据文件。
用户可以选择内联或异步模式运行压缩。Hudi也提供了不同的压缩策略供用户选择,最常用的一种是基于提交的数量。例如可以将压缩的最大增量日志配置为 4。这意味着在进行 4 次增量写入后,将对数据文件进行压缩并创建更新版本的数据文件。压缩完成后,读取端只需要读取最新的数据文件,而不必关心旧版本文件。
MOR表的写入行为,依据 index
的不同会有细微的差别:
- 对于
BloomFilter
这种无法对 log file 生成 index 的索引方案,对于 INSERT 消息仍然会写 base file (parquet format),只有 UPDATE 消息会 append log 文件(因为 base file 已经记录了该 UPDATE 消息的 FileGroup ID)。 - 对于可以对 log file 生成 index 的索引方案,例如 Flink writer 中基于 state 的索引,每次写入都是 log format,并且会不断追加和 roll over。
三、COW表和MOR表的比较
Trade-Off | COW | MOR |
---|---|---|
数据延迟 | 高 | 低 |
查询延迟 | 低 | 高 |
更新IO开销 | 高(重写所有的数据文件) | 低(追加到日志文件即可) |
Parquet文件大小 | 单个全量使用parquet文件存储,占用空间小 | 单个全量存储使用parquet+avro格式存储,占用空间相对大 |
写放大 | 高 | 低(依赖于合并策略) |
查询类型(Query Types)
表类型 | 支持的查询类型 |
---|---|
COW | Snapshot Queries + Incremental Queries |
MOR | Snapshot Queries + Incremental Queries + Read Optimized Queries |
-
Snapshot Queries
快照查询,可以查询指定commit
或compaction
操作后表的最新快照。
对于MOR
类型的表,它通过即时合并最新文件片的基本文件和增量文件来提供近实时的表数据(分钟级)。
对于COW
类型的表,它可以替代现有的parquet表(或相同基本文件类型的表),同时提供upsert/delete和其他写入方面的功能,可以理解为查询最新版本的Parquet数据文件。下图是官方提供的
COW
类型的表的快照查询示意图
-
Incremental Queries
增量查询,可以查询给定commit
或compaction
操作后的新写入的数据。有效地提供变更流来启用增量数据管道。 -
Read Optimized Queries
读优化查询,MOR
类型的表独有的查询,相比于MOR
表的快照查询,读优化查询主要是为了保证查询效率,只提供表的基本文件的查询而不合并表的增量日志文件。下图是
MOR
表快照查询与读优化查询的对比。
Read Optimized Queries
是对Merge On Read
表类型快照查询的优化。
快照查询 | 读优化查询 | |
---|---|---|
数据延迟 | 低 | 高 |
查询延迟 | 高(合并列式的基本文件和行式的增量日志文件) | 低(只返回列式的基本文件) |
写操作(Write Operations)
下面几种hudi的写操作都能在每次commit
前去设置。知道这些写操作的区别就能知道在哪种场景选择更加合适的操作。
-
UPSERT
默认行为,记录records
会首先通过Index
打标是insert
还是update
,有一些启发式算法决定消息的组织以优化文件的大小。使用UPSERT
操作类型写入的hudi表不会有重复数据。 -
INSERT
类似于upsert
,但是跳过了Index
打标步骤,性能上比upsert
要好。如果只是需要 Hudi 的事务写
/增量拉取数据
/存储管理
的能力,并且可以容忍重复数据
,那么可以选择 INSERT 操作。 -
BULK_INSERT
实现了基于排序的数据写入算法,对hudi表大数据量初始化很友好。该操作性能是最高的,但是无法控制小文件,而UPSERT和INSERT操作使用启发式方法可以很好的控制小文件。
在确定数据都为新数据时建议使用
INSERT
,当存在更新数据时建议使用UPSERT
,当初始化数据集时建议使用BULK_INSERT
。
DELETE
hudi 对hudi表中的数据有两种类型的删除方式- 软删除:将除
hoodie key (record key + partition path)
之外的所有字段都置为null
值即可 - 物理删除:
- 在使用
DataSource
时,将OPERATION_OPT_KEY
设置为DELETE_OPERATION_OPT_VAL
- 在使用
DataSource
时,将PAYLOAD_CLASS_OPT_KEY
设置为org.apache.hudi.EmptyHoodieRecordPayload
- 在使用
DataSource
或DeltaStreamer
时,给数据集添加一个_hoodie_is_deleted
字段,其中要删除的数据该字段设置为true
,要保留的数据该字段设置为false
或null
- 在使用
- 软删除:将除
写流程
- 去重:如果同一批数据有重复的
key
值,则需要去重保留最新的记录 - 索引查询:查询该批数据是属于哪个
File Group
并打上Insert
或Update
的标记 - 文件大小:hudi会基于上次
commit
的数据大小,尽量往小文件上添加足够多的数据以达到文件最大阈值 - 分区:决定该批数据属于哪一些分区或者需要新建分区
- 写IO:此时进行数据写入,要么生成一个新的数据文件,要么写入到日志文件中,要么对数据文件更新一个版本
- 更新索引:数据在写入完成后需要更新索引
- 提交:
commit
所有的变更动作 - 清理(若必要):
commit
之后如有必要可做一些清理动作 - 合并:如果是
MOR
表,compaction
动作会同步或异步执行
10.归档:执行archive
动作将老版本的文件归档到指定归档目录