1、Bloom Index
Bloom Index (default) 使用根据记录键构建的bloom过滤器,也可以使用记录键范围修剪候选文件.原理为计算RecordKey的hash值然后将其存储到bitmap中,为避免hash冲突一般选择计算3次
-
HoodieKey 主键信息:主要包含recordKey 和patitionPath 。recordkey 是由hoodie.datasource.write.recordkey.field 配置项根据列名从记录中获取的主键值。patitionPath 是分区路径。Hudi 会根据hoodie.datasource.write.partitionpath.field 配置项的列名从记录中获取的值作为分区路径。
-
https://llimllib.github.io/bloomfilter-tutorial/zh_CN/
原理:计算RecordKey的hash值然后将其存储到bitmap中去,key值做hash可能出现hash 碰撞的问题,为了较少hash 值的碰撞使用多个hash算法进行计算后将hash值存入BitMap,一般三次hash最佳
查找步骤:
1、提取所有的分区路径和主键值,然后计算每个分区路径中需要根据主键查找的索引的数量。
2、有了需要加载的分区后,调用LoadInvolvedFiles 方法加载分区下所有的parquet 文件。在加载paquet文件只是加载文件中的页脚信息,页脚存放的有布隆过滤器、记录最小值、记录最大值。对于布隆过滤器其实是存放的是bitmap序列化的对象。
3、加载好parquet 的页脚信息后会根据最大值和最小值构造线段树。
4、据Rdd 中RecordKey 进行数据匹配查找数据属于那个parqeut 文件中,对于RecordKey查找只有符合最大值和最小值范围才会去查找布隆过滤器中的bitmap ,RecordKey小于最小值找左子树,RecordKey大于最大值的key找右子树。递归查询后如果查找到节点为空说明RecordKey在当前分区中不存在,当前Recordkey是新增数据。查找索引时spark会自定义分区避免大量数据在一个分区查找导致分区数据倾斜。查找到RecordKey位置信息后会构造<HoodieKey,HoodieRecordLocation> Rdd 对象。
查找步骤,以Spark举例:
tagLocation:
-
从Spark Rdd中提取partitionPath以及recordKey,构建partitionRecordKeyPairRDD对象
-
调用lookupIndex方法,获取文件位置,返回值为:JavaPairRDD<HoodieKey, HoodieRecordLocation>
-
//1、将从Rdd中提取的一批值根据partition,进行分组 //key:partition value:数据数量 Map<String, Long> recordsPerPartition = partitionRecordKeyPairRDD.countByKey(); //2、根据recordsPerPartition,将对应partition下的全部parquet文件加载上来 //tuple2.t1:partitionPath,tuple2.t2:BloomIndexFileInfo->fileId、parquet footer max min List<Tuple2<String, BloomIndexFileInfo>> fileInfoList = loadInvolvedFiles(affectedPartitionPathList, context, hoodieTable); //3、根据partitionPath,对parquet元数据进行分组 final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo = fileInfoList.stream().collect(groupingBy(Tuple2::_1, mapping(Tuple2::_2, toList()))); //4、record与parquet文件进行匹配 fileid对应有HoodieKey的数据 -- 根据配置决定构建线段树还是暴力查找 //返回值 Tuple2 t1:fileId t2:partitionPath and recordKey 形成对应关系 JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD = explodeRecordRDDWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairRDD); //5、计算对每个文件组执行的布隆过滤器比较的估计数量 //返回值:key:fileId value:数据数量 Map<String, Long> comparisonsPerFileGroup = computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, fileComparisonsRDD, context); //6、将partition数量与配置项 config.getBloomIndexParallelism()对比 //索引查找并行度 int joinParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism()); //7、根据key值获取位置信息 //返回值:JavaPairRDD<HoodieKey, HoodieRecordLocation> //String instantTime; String fileId; findMatchingFilesForRecordKeys(fileComparisonsRDD, joinParallelism, hoodieTable, comparisonsPerFileGroup);
-
加载好parquet 的页脚信息后会根据最大值和最小值构造线段树
-
根据Rdd 中RecordKey 进行数据匹配查找数据属于那个parqeut 文件中,对于RecordKey查找只有符合最大值和最小值范围才会去查找布隆过滤器中的bitmap ,RecordKey小于最小值找左子树,RecordKey大于最大值的key找右子树。递归查询后如果查找到节点为空说明RecordKey在当前分区中不存在,当前Recordkey是新增数据。查找索引时spark会自定义分区避免大量数据在一个分区查找导致分区数据倾斜。查找到RecordKey位置信息后会构造<HoodieKey,HoodieRecordLocation> Rdd 对象。
-
加载paquet文件只是加载文件中的页脚信息,页脚存放的有布隆过滤器、记录最小值、记录最大值。对于布隆过滤器其实是存放的是bitmap序列化的对象。递归查询后如果查找到节点为空说明RecordKey在当前分区中不存在,当前Recordkey是新增数据。
-
-
2、 Global Bloom Index
全局布隆索引,与布隆索引的差异点在查找每个RecordKey 属于那个parquet 文件中,会加载所有parquet文件的页脚信息构造线段树,然后在去查询索引。因为Hudi需要加载所有的parquet文件和线段树节点变多对于查找性能会比普通的布隆索引要差。
但是对于分区字段的值发生了修改,如果还是使用普通的布隆索引会导致在当前分区查询不到当成新增数据写入Hudi表。这样我们的数据就重复了,在很多业务场景是不被允许的。所以在选择那个字段做分区列时,尽量选择列值永远不会发生变更的,这样我们使用普通布隆索引就可以了。
3、Simple Index
简易索引与布隆索引的不同是直接加载分区中所有的parquet数据然后在与当前的数据比较是否存在,实现比较简单。
- 1、提取所有的分区路径和主键值
- 2、根据分区路径加载所有涉及的分区路径内的parquet文件数据,主要加载HooieKey和fileID两列数据
- 3、同布隆索引一样,将原始数据与包含位置信息的查询数据进行做关联,提取位置信息赋值至原始数据上
4、Global Simple Index
简易全局索引同布隆全局索引一样,需要加载所有分区的parquet 文件数据,构造<HoodieKey,HoodieRecordLocation>Rdd然后进行关联。在简易索引中hoodie.simple.index.update.partition.path
配置项也是可以选择是否允许分区数据变更。数据文件比较多数据量很大,这个过程会很耗时。
5、HBase Index
将索引映射存储在外部hbase表中,为全局索引。在HBase索引中,文件和索引是分开在特定的情况下可能有一致性问题
HBase索引实现步骤如下:
-
1、连接HBase数据库
-
2、批量请求Hbase数据库
-
3、检查get获取数据是否为有效索引,这时Hudi会连接元数据检查commit时间是否有效,若无效currentLocation将不会被赋值。检查是否为有效索引的目的是当索引更新一半,导致Hbase宕机导致任务失败,保证不会加载过期的索引。避免Hbase索引和数据不一致导致数据进入错误的分区。
-
检查是否开启允许分区变更
-
另一个值得理解的关键方面是全局索引和非全局索引之间的区别。布隆和简单索引都有全局选项 - hoodie.index.type=GLOBAL_BLOOM 和 hoodie.index.type=GLOBAL_SIMPLE。 HBase 索引本质上是一个全局索引。 -
全局索引:全局索引强制跨表的所有分区的键的唯一性,即保证表中对于给定的记录键恰好存在一条记录。 全局索引提供了更强的保证,但更新/删除成本随着表 O(表大小)的大小而增长,这对于较小的表可能仍然是可以接受的。
-
非全局索引:另一方面,默认索引实现仅在特定分区内强制执行此约束。 可以想象,非全局索引依赖于编写器在更新/删除期间为给定的记录键提供相同的一致分区路径,但可以提供更好的性能,因为索引查找操作变为 O(更新/删除的记录数) 并且可以很好地扩展写入量。
6、InMemoryHashIndex(仅适用测试环境)
内存索引目前Spark的实现只是构造一个ConcurrentMap在内存中,不会加载parquet文件中的索引,当调用tagLocation方法会在map中判断key值是否存在
7、Bucket Index (0.11.0版本引入)
字节跳动引入,引入原因:初始使用bloom过滤器,数据量达到30TB,约5千亿条记录分布在40000个file Group中,bloom Filter Index假阳性很频繁。Hudi 需要确定该 Record Key 是否真的存在这个操作需要读取文件里的实际数据一条一条做对比,而实际数据量规模很大,这会导致查询 Record Key 跟 File ID 的映射关系代价非常大,因此造成了索引的性能下滑。
为什么没有使用Hbase替代?
业务方不希望引入 HBase 这一额外依赖,且担心运维 Hbase 过程中存在新的问题,认为 Hbase Index 整体不够轻量,因此在整个业务场景中也无法作为 Bloom Filter 索引的替代。
设计原理
Bucket Index 是一种基于哈希的索引,借鉴了数据库里的 Hash Index。给定 n 个桶, 用 Hash 函数决定某个记录属于哪个桶。最终所有分区被分成 N 个桶,每个桶对应一个 File Group。
相比较 Bloom Filter Index 来说,Hash Index 在逻辑层面提供了 Record Key 跟 File Group 的映射关系,不存在假阳性问题。相同 key 的数据一定是落在同一个桶里面。最终一分区内的结构如下,目前一个 Partition 里面 Bucket 和 File Group 是一一对应的关系。
数据写入原理
Bucket Index 的实际写入流程可以参考下面的过程示意图。以下面的实时插入场景为例,某业务批次新增了 5 条记录,并且需要 Upsert 到已有的分区 partition=20220203 中,对已有数据根据主键 Record 做一个更新,保留最新的数据。整个过程可以用下面的示意图表示:
- 在建表时先预估表的单个分区数据存储大小,设置一个分桶数 numBuckets。
- 在数据插入前,首先生成 n 个 File ID, 将 File ID 的前 8 位替换成 bucketId 的数字
- 00000000-e929-4327-8b0c-7d0d66091321
- 00000001-e3cd-4756-b311-863803a6cdaf
- 00000002-c4ed-4418-90d4-6e348f380636
- 00000003-c7bd-4916-78c5-6g787g090636
在插入过程中,最重要的一步就是标记每条新插入的记录属于哪个文件 File Group,然后找到对应的 File Group 去更新或者合并。在目前的设计中, 分桶数跟 File Group 是一一对应的映射关系,因此找到每条 Record 对应的桶 ID ,即可确定 Record Key 跟 File Group 的映射关系。
在具体实现中,我们会对更新数据的索引键计算哈希,再对分桶数取模快速定位到每个 Record 对应的桶,整个过程如下面的 Hash 函数所示:
hashKeyFields.hashCode() & Integer.MAX_VALUE) % numBuckets
其中 hashKeyFields 可以由用户指定,是 Record Key 的一个子集,当默认不指定时,会以 Record Key 本身作为 hash 键。在计算好后,每条记录即可知道即将写入的桶。
经过索引层之后,每条数据都会带有一个 File ID,引擎会根据 File ID 进行一次 Shuffle,将相同 File ID 的数据导入到同一个子任务中。对于 COW 表而言,更新 Update 部分需要和已有的 BaseFile 合并生成新的 BaseFile。而 MOR 表将 Update 的数据直接写入对应 File Group 的 delta log,Insert 部分生成新的 BaseFile,最终完成该批次数据的 Upsert。
由此可见,整个过程中 Bucket Index 不需要对现有的数据进行扫描组成类似 Bloom Filter 一样的过滤器,因此可以省去整个定位 File Group 的查询时间,定位 File Group 的时间也不会随着已有 Record 条数的增加而导致性能下降。同时分桶操作会在每个桶内对分桶列排序,排序后的数据一般能获得更高的压缩率,也能节省存储。