一. 前言
本文计划通过走读代码来理解Presto(其实是OpenLookeng)中BloomFilter索引的建立以及Presto中利用BloomFilter索引对查询进行优化的执行流程。OpenLookeng BloomFilter索引的基本资料可以参考官网介绍:openLooKeng documentation。
二. BloomFilterIndex
BloomFilter原理相关的资料网上很多,本章不重复叙述。本章主要简述Presto中BloomFilter的几个重要接口,因为BloomFilter的几个接口是Presto中整个BloomFilter索引优化的核心所在。理解BloomFilter对于理解三、四章节有较大的帮助。
BloomFilter最只要的接口主要有2个,一个是add接口,一个是test接口。add接口用于往BloomFilter的二进制池中添加数据,test接口用于测试数据是否存在。
Presto中创建BloomFilter索引就是利用对索引列的所有数据调用一遍add接口构建BloomFilter BitSet,然后在BloomFilter序列化发hdfs上。在查询的时候先发序列化成BloomFilter,然后使用Bloom索引test接口对Split中是否有匹配数据进行测试,如果split中没有匹配数据,则将整个split裁剪掉。
三. 创建BloomFilter索引代码走读
创建 BloomFilter索引是在CreateIndexOperator的算子中实现的。其数据入口为addInput方法,执行流程为
addInput
values.computeIfAbsent // 将tableScan的数据保存起来
// 一个tale的文件将产生一个索引文件
levelWriter.computeIfAbsent(filePath, getIndexWriter)
levelWriter.get(filePath).addData(values)
// indexPages的key值为stripe偏移,value为一个HashMap,
// HashMap的key值为索引列名称,value为索引列的所有的数据
indexPages.get(stripeOffset)...add(values.getValue())
// stripe的所有数据都已经收到
if (pageCountExpected.get(stripeOffset).get() == 0) {
persistStripe
index.addValues
BloomIndex.addValues
getFilter().add //调用了第二章节的add接口
// 将BloomFilter bitsets数据序列化后保存
// 到临时文件中
LOCAL_FS_CLIENT.newOutputStream
index.serialize(os)
}
当所有page都处理完毕时候,会调用到finish接口将本地文件物化到hdfs上:
finish
entry.getValue().persist // getValue()的值为FileIndexWriter
// 将索引数据从本地文件搬迁到hdfs
IndexServiceUtils.writeToHdfs(LOCAL_FS_CLIENT, fs, tmpPath, tarPath)
四. 利用BloomFilter索引进行查询优化
要想使用BloomFilter进行优化,首先需要将第三章节序列化物化到hdfs上的索引文件反序列化成BloomFilter,其代码执行流程如下所示,入口为IndexCache中的executor.scheduleAtFixedRate定时器:
cache.get(filterKey)
IndexCacheLoader.loadSplitIndex
indexClient.readSplitIndex
readIndexMap
index.deserialize(new CloseShieldInputStream(i)) // 反序列化章节三中序列化后的索引文件
构造出BloomFilter后,便可以利用where 条件 + BloomFilter进行split过滤。比如where id = 1,Presto会利用每个split所对应的BloomFilter测试一下该split中id中是否包含数值1,如果不包含,则将整个split裁剪掉,如下为其执行流程:
filterUsingForwardIndex
inputSplits.parallelStream().filter
indexCache.getIndices // 获取到反序列化的索引
indexerManager.getIndexFilter(allIndices).matches(expression)
HeuristicIndexFilter.matches
indexMetadata.getIndex().matches
BloomIndex.matches
getFilter().test // 最终调用章节二所示的test接口进行split过滤