leveldb架构
顺序写:level db不需要在各个level中去修改位置,而是只要放到最新的memTable中插入,所以在读取数据时如果在上层找到了数据或者数据flag是已删除就不需要继续寻找了。所以其能够提供比读更好的性能。
一、memTable
1.1 LSM-Tree VS B Tree
hash索引,需要将所有数据都读入到内存,对于大规模数据读不到内存里就不能使用hash索引。
- 从使用的角度上来说,B-tree等索引存储结构多用于OLTP型的数据库,因为这类数据库主要以事务,或是行级别的读取和存储为主的(比如Mysql)。换句话说,这种类型的数据库更多的操作是小批量或单行级别的更新或读取,并且可能还有事务方面的需求,这种类型正是B-tree结构所擅长的。
- 而 LSM-tree则多用于大规模数据情况下的检索分析和快速写入的情况。在写入的性能上,因为上直接写入内存再定期刷入到磁盘中,所以写入操作对用户的感知而言上非常迅速的。而检索速度也因为key顺序存储,可以快速定位到key对应的位置,因而具有较好的检索性能。
- 但是LSM-tree比较显著的应用方向还是在大规模分析这方面,在大规模分析(OLAP)场景下,数据通常都是列式存储,并且需要全表扫描。其中磁盘数据可以使用二进制进行压缩,读取的时候可以有效减少磁盘IO的处理时间(与之相比,B-tree等存储结构就无法充分压缩,因为每次都只处理小部分数据)。同时在存储文件中还能再进一步切分,比如将列式数据按照水平切分成不同的Page,同时存储一些简单的索引,用来指定不同Page大概范围,Hadoop的存储数据格式Parquet就是类似的设计。
memTable是基于跳表实现的结构。node格式,数据在memtable上是有序的,担当memtable到达一定大小后,就会转化为immutable之后放入到level 0,每一层level里面又有多个文件,但是在level 0中单个文件内是有序的,文件间可能是无序的有重叠,但是在其他level 层中文件间和文件内都是有序的。同时每个文件中也包含了多个block。(问题1:如果在immutable minor compact到sstable中的时候memtable马上又满了leveldb会停止写入数据吗?答案是会停止写入),
1.2 kv数据在memtable中的结构
使用protobuf varint编码,将internal_key_size和value_size由定长存储改为变长存储。
- 整数由定长改为变长存储
- 小整数仅占用1个字节,随着数值变大占用的字节数也变大,最多占用5个字节。
sequence:为该操作的编号数,编号越大表示该操作越新。
type:由于没有删除操作,所以通过type类型来判断是写入操作还是删除操作
MemTable
相关的有多种 key,本质上就是上图里三种:
- userkey: 用户传入的 key,即用户key.
- internal key: 增加了
sequence
type
,用于支持同一 key 的多次操作,以及 snapshot,即内部key. - memtable key: 增加了 varint32 encode 后的 internal key长度
1.3 memtable
explicit MemTable(const InternalKeyComparator& comparator); //初始化接收一个比较函数
//提供两种接口
void Add(SequenceNumber seq, ValueType type,
const Slice& key,
const Slice& value);
bool Get(const LookupKey& key, std::string* value, Status* s);
//通过迭代器形式 暴露底层skiplist
Iterator* MemTable::NewIterator() {
return new MemTableIterator(&table_);
}
struct KeyComparator {
const InternalKeyComparator comparator;
explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { }
int operator()(const char* a, const char* b) const;
};
friend class MemTableIterator;
friend class MemTableBackwardIterator;
typedef SkipList<const char*, KeyComparator> Table;
KeyComparator comparator_;
int refs_;
Arena arena_;
Table table_; //底层skiplist
1.4 数据比较
- userkey 按照指定的排序方式,默认字节大小排序,akey-userkey < bkey-userkey 则返回-1.
- 如果 userkey 相等,那么解析出 sequence,按照 sequence 大小逆序排序,即 akey-sequence > bkey-sequence 则返回-1.sequence越大则代表数据越新,这样的好处是越新的数据越排在前面。
add:通过比较来插入,组装memtable_key的过程
get:userkey不存在的两种情况,1:没有userkey 2:存在userkey,但是最大的seq对应的type是删除
二、日志
每个block的长度最大为0x8000 = 32k,根据图中日志结构可知日志是变长的,只能从前往后遍历block,type的作用就是当 当前block的空间写不下那么多data时来标识该data跨Block了。
日志写入类的关系,因为日志只有添加,所以用不到其他Close等函数。PosixW是WriteableFile的子类,PosixW和Log::Writer是聚合关系(代表整体与部分的关系,比如Writer的功能知识PosixW的一部分)
三、SkipList实现
3.1 placement new:在用户指定的内存位置上构建新的对象
Object * p = new (address) ClassConstruct(...);
//先分配一对内存
int* buff = new int;
memset(buff,0,sizeof(int));
//此处new的placement new,在buff的内存上构造int对象,不需要分配额外的内存
int *p = new (buff)int(3);
std::cout << *p << std::endl; //3
- 优点:
- 在已分配好的内存上进行对象的构建,构建速度快
- 已分配好的内存可以反复利用,有效的避免内存碎片问题
3.2 跳表简介
skiplist,即跳表是由William Pugh在1989年发明的,允许快速查询一个有序连续元素的数据链表,搜索、插入、删除的平均时间复杂度均为O(lgn)。根据推导,在理想情况下当跳表的高度为logn时其具有最佳性能。
第一层:n 第二层:n/2 第三层:n/4元素 但在实际实现环节要想一直保持如此高的性能,需要经常调整结构,所以在实现时使用抛硬币的方法,从height=1开始每抛一次正面增加一层,同时一般也会设置最大层数。因为memtable中不需要删除元素,所以在leveldb代码中只提供了插入和查找两个接口。
3.3 SkipList类
template<typename Key, class Comparator>
class SkipList //类模板
void Insert(const Key& key); //插入函数接口
bool Contains(const Key& key) const; //查询函数接口
//成员变量
Comparator const compare_; //比较函数
Arena* const arena_; //leveldb的内存池 在该处用于分配node
Node* const head_; //跳表中的节点 对应一列
port::AtomicPointer max_height_; //
Random rnd_; //随机数产生器
//构造函数
template<typename Key, class Comparator>
SkipList<Key,Comparator>::SkipList(Comparator cmp, Arena* arena)
: compare_(cmp),
arena_(arena),
head_(NewNode(0 /* any key will do */, kMaxHeight)),
max_height_(reinterpret_cast<void*>(1)), //当前最大高度为1
rnd_(0xdeadbeef) {
for (int i = 0; i < kMaxHeight; i++) {
head_->SetNext(i, nullptr); //初始化head_高度为最大高度 后面节点在高度大于最大高度时能够使用head_作为prev节点
}
}
3.4 Node && NewNode
template<typename Key, class Comparator>
struct SkipList<Key,Comparator>::Node {
explicit Node(const Key& k) : key(k) { }
Key const key;//数据本身
//获取该节点在第n层的后继节点
Node* Next(int n) {
assert(n >= 0);
return reinterpret_cast<Node*>(next_[n].Acquire_Load());
}
//设置该节点在第n层的后继节点
void SetNext(int n, Node* x) {
assert(n >= 0);
next_[n].Release_Store(x);
}
//不能保证线程安全
Node* NoBarrier_Next(int n) {
assert(n >= 0);
return reinterpret_cast<Node*>(next_[n].NoBarrier_Load());
}
void NoBarrier_SetNext(int n, Node* x) {
assert(n >= 0);
next_[n].NoBarrier_Store(x);
}
private:
// 作为Node的最后一个成员变量
// 由于Node通过placement new的方式构造,因此next_实际上是一个不定长的数组
// 数组长度即该节点的高度
// next_记录了该节点在所有层的后继节点,0是最底层链表。
port::AtomicPointer next_[1];
};
//所有的 Node 对象都通过NewNode构造出来:先通过arena_分配内存,然后通过 placement new 的方式调用 Node 的构造函数。
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node*
SkipList<Key,Comparator>::NewNode(const Key& key, int height) {
//额外存储(height - 1)个port::AtomicPointer
char* mem = arena_->AllocateAligned(
sizeof(Node) + sizeof(port::AtomicPointer) * (height - 1));
return new (mem) Node(key);
}
3.5 Insert
template<typename Key, class Comparator>
void SkipList<Key,Comparator>::Insert(const Key& key) {
Node* prev[kMaxHeight];
//prev记录每一层最后一个 < key的节点,也就是待插入节点的前驱节点
Node* x = FindGreaterOrEqual(key, prev);
// Our data structure does not allow duplicate insertion
assert(x == nullptr || !Equal(key, x->key));
//随机决定节点高度height
int height = RandomHeight();
//如果新的高度比当前所有节点高度都大,那么填充prev更高层为head_,同时更新max_height_
if (height > GetMaxHeight()) {
for (int i = GetMaxHeight(); i < height; i++) {
prev[i] = head_; //prev 保存的是node节点 也就是跳表的一整列
}
max_height_.NoBarrier_Store(reinterpret_cast<void*>(height));
}
//构造Node,高度为height
x = NewNode(key, height);
//插入节点x到prev及prev->next中间
for (int i = 0; i < height; i++) {
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
// 先修改x节点,再修改prev节点
prev[i]->SetNext(i, x);
}
}
template<typename Key, class Comparator> //有点楞 前面不是说1/2概率往上增加嘛 下面总结有解答
int SkipList<Key,Comparator>::RandomHeight() {
// Increase height with probability 1 in kBranching
static const unsigned int kBranching = 4;
int height = 1;
// 1/4概率继续增加height
while (height < kMaxHeight && ((rnd_.Next() % kBranching) == 0)) {
height++;
}
assert(height > 0);
assert(height <= kMaxHeight);
return height;
}
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindGreaterOrEqual(const Key& key, Node** prev)
const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
Node* next = x->Next(level);
if (KeyIsAfterNode(key, next)) {//如果next->key < key
// Keep searching in this list
x = next;
} else {//如果next->key >= key
//notes:如果单纯为了判断是否相等,这里可以加一个判断直接返回了,没必>要level--到0再返回,不过复杂度没有变化
if (prev != nullptr) prev[level] = x;//prev记录该level最后一个<key的节点
if (level == 0) {//到达最底层则返回next (next是第一个>=key的节点)
return next;
} else {
// Switch to next list
level--;
}
}
}
}
3.6 Contains
template<typename Key, class Comparator>
bool SkipList<Key,Comparator>::Contains(const Key& key) const {
//x记录第一个>= key的Node
//注意FindGreaterOrEqual是查找>=key的Node,因此会迭代直到level = 0才返回
//实际上可以实现一个接口直接查找==key的Node,这样会在level >=0 时就能返回,查找的时间复杂度不变,不过可以预期减少比较次数。
Node* x = FindGreaterOrEqual(key, nullptr);
//判断x->key == key
if (x != nullptr && Equal(key, x->key)) {
return true;
} else {
return false;
}
}
3.7 总结
- 除了insert和contains还提供了FindLast和FindLessThan函数实现起来都差不多就不介绍了
- MemTable在读取时使用的时SkipList::Iterator
- 之前介绍抛硬币的方法概率为1/2,这里使用的是1/4,结合
kBranching = 4
和kMaxHeight = 12
,不影响复杂度的情况下,可以最多支持4**11 = 4194304
个节点,因此在百万节点左右,这么设置参数效果最优。 - 读写并发:读操作不会修改内部数据,因此多个reader不存在竞争,并发没有问题;多个读单个写操作也没有问题,因为采用了原子变量以及
memory order
,以及Insert
里执行语句的前后顺序;多个写操作之间存在竞争关系,需要锁控制。 - 重点说明下
Insert
里设置max_height_
前的那段注释,读线程可能读到旧的或者新的值,无论是哪种值,写线程都可能在更新SkipList
,因为后面更新是从低往高更新,而读是从高往低读,所以当读到新的节点的时候,继续往下层,一定是能读到正确值的。
四、block
sstable则是由一系列block组成,有data block(键值对,同时为了增加读取速度增加了index 索引),filter block等等,index block就是在个data block后面保存一个大于该data block所有key的值(结构类似下面这种把entry改为data block,restart pointer改为索引行,这样就能直接使用二分快速查找了。
index block,meta index block 都采用相同的数据格式,由类 BlockBuilder 负责生成。
4.1 leveldb::BlockBuilder
BlockBuilder
用于格式化传入的 key:value
数据,采用了 share key 的手段来优化存储的数据大小。
注意BlockBuilder
本身并不与存储打交道,所有数据都格式化到了内存,通过Finish
接口返回数据,由更上层对象写入到文件。
其采用合并相同前缀来节省空间(和其前一个key相同的部分),但为了使读取更高效,每N条entry则不再应用该规则。 restart pointer则记录了每个没有使用合并前缀的entry地址,同时又因为entry是有序的,所以可以在restart pointer之间进行二分查找。
4.2 Filter Block
为了加快SST中数据查询的效率,在直接查询DataBlock中的内容之前,会先根据FilterBlock中的过滤数据判定DataBlock中是否有需要查询的数据,若判断不存在,则无需对整个DataBlock进行数据查找。
FilterBlock存储的是DataBlock数据的一些过滤信息,这些过滤数据一般指代布隆过滤器的数据,用于加快查询的速度。每个DataBlock在FilterBlock中对应一个FilterData。
2.3 数据写入操作
- 向SST中追加文件,会先将数据写入内存中的DataBlock,如果DataBlock数据量大于指定大小,会将内存中的DataBlock写入磁盘。在将DataBlock写入前,会写入RestartPoint信息,数据压缩,CRC校验。最后将DataBlock的索引信息写入IndexBlock
- 当SST写入完成,写入内存中的FilterBlock、MetaIndexBlock、 IndexBlock、Footer信息
2.4 数据读取操作
- 读取Footer字段,获取IndexBlockIndex、MetaIndexBlockIndex,根据offset和length读取IndexBlock、MeatIndexBlock数据
- 根据IndexBlock索引数据获取查询数据的DataBlock,然后根据FilterBlock数据判定查询数据是否在定位的DataBlock中,加速数据查询
- 读取定位到的DataBlock,根据RestartPoint数据定位查询的数据块中的Entry
五、sstable
block和filter block都是 sstable 的一个组件,负责构造部分数据格式。
5.1 sstable
sstable被设计用于存储大量的 {key:value} 数据,当我们在 leveldb 查找某个 key 时,可能需要逐层查找多个 sstable 文件。
因此,sstable 在文件格式设计上,主要考虑:
- 查找速度,通过建索引解决
- 文件大小,通过压缩解决
5.2 文件格式
data block用于存储原始数据,同时为了方便磁盘查找,每个data block被设定为固定大小默认值为4K。
同时每一个data block对应一行信息,记录3要素:
- offset:即 data block 的偏移量
- size:即 data block 的大小
- data_block_key:满足条件
>= block 内所有的 key
被称为index block,其存储格式也是key value
key = data_block_key
value = (offset + size)
filter block,目前使用的是布隆过滤器,在查找key时,先通过filter block判断是否存在,如果不存在直接跳过对应的data block。同时在设计level db时预计会包含其他很多索引block,但目前就只有filter block,所以meta block等价于filter block,其里面只包含一组{key:value}数据就是找到filter block
5.3 footer
footer中包含了以下信息:
index of data block'index //index block位置
index of mata block'index //meta index block位置
footer 需要首先读取、解析出来,然后才能“按图索骥”找到其他 block,因此 footer 是定长的,而且位置固定在文件尾部。之后得到index block和meta index block位置,之后就可以从index block读到data block位置,meta index block中读到meta block位置。
<beginning_of_file>
[data block 1]
[data block 2]
...
[data block N]
[meta block 1]
...
[meta block K]
[metaindex block]
[index block]
[Footer] (fixed size; starts at file_size - sizeof(Footer))
<end_of_file>
5.4 源码解析
5.5 sstable读取
六、bloom filter
1. 简介
bloom filter是一种数据结构,作用类似于 hash table,相对于后者,空间利用率更高。
不过这种高利用率是有代价的,当我们在 bloom filter 查找 key 时,有返回两种情况:
- key 不存在,那么 key 一定不存在。
- key 存在,那么 key 可能存在。
也就是说 bloom filter 具有一定的误判率。
2. 理论知识
先介绍下 bloom filter 的几个组成:
- n 个 key
- m bits 的空间 v,全部初始化为0
- k 个无关的 hash 函数:
h1, h2, ..., hk
,hash 结果为{1, 2, ..., m} or {0, 1, ..., m-1})
具体的,对于 key=a
,经过 k 个 hash 函数后结果为
h1(a), h2(a), ..., hk(a)
那么就将 v 对应的 bit 置为 1.
假定 k 为 4,对应的 bloom filter 为:
注:这里有一个js写的一篇博客,支持互动的查看 bloom filter,更形象一些.
当 key 越来越多,v 里置为 1 的 bits 越来越多。对于某个不存在的 key’,k 个 hash 函数对应的 bit 可能正好为1,此时就概率发生误判,更专业的术语称为 false positive,或者 false drop.
因此,我们称 bloom filter 是一种概率型的数据结构,当返回某个 key’ 存在时,只是说明可能存在。
m 越大,k 越大, n 越小,那么 false positive越小。
更进一步,bloom filter 是关于空间和 false positive 的 tradeoff,bloom filter 的算法其实并不复杂,其真正的艺术在于这种平衡。
我们先看下 tradeoff 的结论:
hash 函数 k 的最优个数为 ln2 * (m/n).
七、minor compaction
minor compaction就是将memtable中的数据转化为immetable并写入到sstable中,同时其不总是直接写入到level 0,如果imm中的数据和level 1层的数据没有交集也会插入到level 1层。同时由于将imm数据写入到level磁盘中需要时间,如果太慢则会导致mem又满了而阻塞写入。后台有线程会执行以下代码来检测是否需要compaction并且minor的优先级比major高:
//实际Compact
void DBImpl::BackgroundCompaction() {
mutex_.AssertHeld();
//如果immutable memtable存在,则本次先compact,即Minor Compaction
if (imm_ != nullptr) {
CompactMemTable();
return;
}
...
// major compaction
CompactMemTable
(即minor compaction)主要流程分为三部分:
WriteLevel0Table(imm_, &edit, base)
:imm_
落盘成为新的 sst 文件,文件信息记录到edit
versions_->LogAndApply(&edit, &mutex_)
:因为compaction会生成新文件,同时旧文件可能还有人使用所以不能删,所以会保存多个版本信息,在compaction后将本次文件更新信息versions_
,当前的文件(包含新的 sst 文件)作为数据库的一个最新状态,后续读写都会基于该状态,(具体作用请看版本管理)DeleteObsoleeteFiles
:删除一些无用文件
imm_
持久化为 sstable 文件后,文件的相关信息通过meta
返回
{
mutex_.Unlock();
//更新memtable中全部数据到xxx.ldb文件
//meta记录key range, file_size等sst信息
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
mutex_.Lock();
}
struct FileMetaData {
int refs;
int allowed_seeks; // Seeks allowed until compaction
uint64_t number;
uint64_t file_size; // File size in bytes
InternalKey smallest; // Smallest internal key served by table
InternalKey largest; // Largest internal key served by table
FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) { }
};
meta
包括文件的 key range,大小,文件的引用数(当引用数为0时会从磁盘删除)等。
查找合适的 level 将新文件记录到edit
:
//为新生成sstable选择合适的level(不一定总是0)
if (base != nullptr) {
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
}
//level及file meta记录到edit
edit->AddFile(level, meta.number, meta.file_size,
meta.smallest, meta.largest);
minor compaction 比较简单,因为只新增了一个 sstable 文件,加入后调用LogAndApply
生效到新版本。
// Replace immutable memtable with the generated Table
if (s.ok()) {
edit.SetPrevLogNumber(0);
edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
//应用edit
s = versions_->LogAndApply(&edit, &mutex_);
}
八、版本管理
因为通过compaction会增加文件,存在新文件和老文件同时存在的情况,如果老文件还在执行操作此时不能删除。只有当其彻底没人使用时才能将老文件删除。
VersionEdit
即 delta,最重要的两个成员变量就是新增与删除文件:
DeletedFileSet deleted_files_;//待删除文件
//新增文件,例如immutable memtable dump后就会添加到new_files_
std::vector< std::pair<int, FileMetaData> > new_files_;
VersionEdit在每次compaction后都会调用该接口将新文件放入。同时在老文件没人使用时也会放入。
Version
用于表示某次 compaction 后的数据库状态,管理当前的文件集合,因此最重要一个成员变量files_
表示每一层的全部 sstable 文件。
// List of files per level
std::vector<FileMetaData*> files_[config::kNumLevels];
8.1 PickLevelForMemTableOutput
就是为刚从 memtable 持久化的 sstable,选择一个合适的 level.
- level 0的 sstable 数量有严格的限制,因此尽可能尝试放到一个更大的 level.
- 大于 level 0的各层文件间是有序的,如果放到对应的层数会导致文件间不严格有序,会影响读取,则不再尝试。
- 如果放到 level + 1层,与 level + 2层的文件重叠很大,就会导致 compact 到该文件时,压力过大,则不再尝试。这算是一个预测,放到 level 层能够缓冲这一点。
- 最大返回 level 2,这大概是个经验值。
8.2 Builder
Builder是一个辅助类,实现Version + VersionEdit = Version‘,其中+ =分别对应Apply和SaveTo两个接口。
成员变量也是记录所有的 delta,levels_
存储了每一层的added_files
及deleted_files
:
typedef std::set<FileMetaData*, BySmallestKey> FileSet;
struct LevelState {
std::set<uint64_t> deleted_files;
FileSet* added_files;
};
VersionSet* vset_;
Version* base_;
LevelState levels_[config::kNumLevels];//每一层的新增及删除文件
**Apply:**将VersionEdit中记录的文件更新到levels_中
**SaveTo:**将levels_中的文件更新和base_合并生成一个新的Version v
,同时需要保证v中每一层文件之间的顺序。
8.3 VersionSet
随着Builder
不断执行,新的version
被构造出来。VersionSet
就负责管理多个版本,对应的变量全局唯一,在DBImpl
构造函数里初始化:
versions_(new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_)) {
管理一个双向链表
Version dummy_versions_; // 循环链表的头部.
Version* current_; // 最新版本
current_
指向最新的版本。
因此class Version
实际上还有三个重要的链表相关成员变量:
VersionSet* vset_; // Version中指向VersionSet
Version* next_; // Next version in linked list
Version* prev_; // Previous version in linked list
8.3.1 LogAndApply
Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu)
的主要做了几件事,这个函数会在compaction中使用:
- 将
edit
应用于current_
生成一个新的Version
- 计算新
Version
下,下次 major compact 的文件Finalize(v);
由这个函数计算 - 更新一些元信息管理文件,将current_和edit信息写入manifest文件,接着在CURRENT文件里明文写入manifest文件名。(应该是用来故障恢复的,比如重启,断电等)
- 将新
Version
添加到双向链表,current_ = 新Version
首先是生成新Version
:
Version* v = new Version(this);
{
Builder builder(this, current_);
builder.Apply(edit);
builder.SaveTo(v);
}
九、seek_compaction && size_compaction
对于major compaction有两种,一种是文件多次seek但是没有查找到数据(可能该key在这个文件里被删除了,或者在其他文件里,比如level 0是允许文件之间有重叠key范围的)。另外一种是当该level中文件过大时执行。
9.1 seek_compaction
// 1. 1次seek花费10ms
// 2. 1M读写花费10ms
// 3. 1M文件的compact需要25M IO(读写10-12MB的下一层文件),为什么10-12M?经验值?
// 因此1M的compact时间 = 25次seek时间 = 250ms
// 也就是40K的compact时间 = 1次seek时间,保守点取16KB,即t = 16K的compact时间 = 1次seek时间
// compact这个文件的时间: file_size / 16K
// 如果文件seek很多次但是没有找到key,时间和已经比compact时间要大,就应该compact了
// 这个次数记录到f->allowed_seeks
f->allowed_seeks = (f->file_size / 16384);//16KB
if (f->allowed_seeks < 100) f->allowed_seeks = 100;
9.2 size_compaction
compaction 另外一个直观的想法就是,当某一层文件变得很大,往往意味着冗余数据过多,应该 compact 以避免占用磁盘以及读取过慢的问题。
level 越大,我们可以认为数据越“冷”,读取的几率越小,因此大的 level,能“容忍”的程度就越高,给的文件大小阈值越大。
具体的,当产生新版本时,遍历所有的层,比较该层文件总大小与基准大小,得到一个最应当 compact 的层。
这个步骤,在VersionSet::Finalize完成。
//计算compact的level和score,更新到compaction_level_&&compaction_score_
void VersionSet::Finalize(Version* v) {
int best_level = -1;
double best_score = -1;
//level 0看文件个数,降低seek的次数,提高读性能,个数/4
//level >0看文件大小,减少磁盘占用,大小/(10M**level)
//例如:
//level 0 有4个文件,score = 1.0
//level 1 文件大小为9M,score = 0.9
//那么compact的level就是0,score = 1.0
for (int level = 0; level < config::kNumLevels-1; level++) {
double score;
if (level == 0) {
score = v->files_[level].size() /
static_cast<double>(config::kL0_CompactionTrigger);
} else {
// Compute the ratio of current size to size limit.
const uint64_t level_bytes = TotalFileSize(v->files_[level]);
score =
static_cast<double>(level_bytes) / MaxBytesForLevel(options_, level);
}
if (score > best_score) {
best_level = level;
best_score = score;
}
}
v->compaction_level_ = best_level;
v->compaction_score_ = best_score;
}
可以看到 level 0 与其他层不同,看的是文件个数,因为 level 0 的文件是重叠的,每次读取都需要遍历所有文件,所以文件个数更加影响性能。
每层的基准大小为10M << ${level - 1}
,level = 1 则MaxBytes = 10M
, level = 2 则MaxBytes=100M
,依次类推.
逐层比较后,得到最大的得分以及对应层数:compaction_score_ compaction_level_
major compact 选择文件时就会用到上述两个条件。
// We prefer compactions triggered by too much data in a level over
// the compactions triggered by seeks.
const bool size_compaction = (current_->compaction_score_ >= 1);//文件数过多
const bool seek_compaction = (current_->file_to_compact_ != nullptr);//seek了多次文件但是没有查到,记录到的file_to_compact_
因此,版本管理的作用之三,就是在新增或者遍历文件的过程中,为 major compact 筛选文件。
十、major compaction
作用
随着数据合并到更大的 level,一个明显的好处就是清理冗余数据。
如果不同 level 的 sst 文件里,存在相同的 key,那么更底层的数据就可以删除不再保留(不考虑 snapshot的情况下)。为了充分利用磁盘高性能的顺序写,删除数据也是顺序写入删除标记,而真正删除数据,是在 major compact 的过程中。
所以,一个作用是能够节省磁盘空间。
level 0 的数据文件之间是无序的,每次查找都需要遍历所有可能重叠的文件,而归并到 level 1 之后,数据变得有序,待查找的文件变少。
所以,另外一个作用是能够提高读效率。
10.1 当我们谈论筛选时,在谈论什么
leveldb 最为复杂的在 compaction,compaction 最为复杂的在 major compaction.面对磁盘上的众多 sstable 文件,应该怎么开始?
千里之行始于足下,首先需要找到最应该 compact 的一个文件。
“最应该”的判断条件,前面笔记已有介绍,有seek_compaction && size_compaction
,分别从读取和文件大小两个维度来判断。
筛选出这个文件后,还需要考虑一系列问题:
- 如果在 level 0,由于该层文件之间是无序的,如果只把这一个文件 compact 到 level 1 是否会导致读取错误?
- compact 到 level + 1后,会不会导致 level + 1 与 level + 2 的 compact 过于复杂?
- 这个文件应该与哪些文件 compact?
这些问题,都需要在PickCompaction
这个函数里解决。
10.2 Compaction
leveldb::Compaction
用来记录筛选文件的结果,其中inputs[2]
记录了参与 compact 的两层文件,是最重要的两个变量
// Each compaction reads inputs from "level_" and "level_+1"
std::vector<FileMetaData*> inputs_[2]; // The two sets of inputs
10.3 VersionSet::PickCompaction
Compaction* VersionSet::PickCompaction()
简言之,就是选取一层需要compact的文件列表,及相关的下层文件列表,记录在Compaction*
返回。
其主要过程如下:
- 根据
inputs_[0]
确定inputs_[1]
- 根据
inputs_[1]
反过来看下能否扩大inputs_[0]
inptus_[0]
扩大的话,记录到expanded0
- 根据
expanded[0]
看下是否会导致inputs_[1]
增大 - 如果
inputs[1]
没有增大,那就扩大 compact 的 level 层的文件范围
也就是:
在不增加 level + 1 层文件,同时不会导致 compact 的文件过大的前提下,尽量增加 level 层的文件数
10.3.1 size_compactionor
seek_compaction
首先是根据size_compaction seek_compaction
计算应当 compact 的文件。
只有compaction_score_ >= 1
时,触发 size compaction.
// We prefer compactions triggered by too much data in a level over
// the compactions triggered by seeks.
const bool size_compaction = (current_->compaction_score_ >= 1);//文件数过多
const bool seek_compaction = (current_->file_to_compact_ != nullptr);//seek了多次文件但是没有查到,记录到的file_to_compact_
如果size_compaction = true
,则找到该层一个满足条件的文件:
if (size_compaction) {
//该层第一个>compact_pointer_的文件,或者第一个文件
level = current_->compaction_level_;
assert(level >= 0);
assert(level+1 < config::kNumLevels);
c = new Compaction(options_, level);
// Pick the first file that comes after compact_pointer_[level]
for (size_t i = 0; i < current_->files_[level].size(); i++) {
FileMetaData* f = current_->files_[level][i];
if (compact_pointer_[level].empty() ||
icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
c->inputs_[0].push_back(f);
break;
}
}
if (c->inputs_[0].empty()) {
// Wrap-around to the beginning of the key space
c->inputs_[0].push_back(current_->files_[level][0]);
}
compact_pointer_
是 string 类型,记录了该层上次 compact 时文件的 largest key,初始值为空,也就是选择该层第一个文件。
// Per-level key at which the next compaction at that level should start.
// Either an empty string, or a valid InternalKey.
std::string compact_pointer_[config::kNumLevels];
如果seek_compaction = true
,则直接使用满足条件的文件。
到了这一步,inputs_[0]
里有且仅有一个文件。
10.3.2 level 0 输入文件的特殊处理
level 0 的文件之间是无序的,假设当前有 4 个文件,key range 分别是
[a, n]
[c, k]
[b, e]
[l, n]
本次选择了第3个文件,如果只是把[b, e]
更新到 level 1,那么就会导致读取时数据错误。因为多个文件之间数据是有重叠的,数据之间的先后无法判断,而更新到 level 1 就意味着认为数据更早。(将[b,e]更新到level 1,因为level 0是无序的,可能其他level 0中的文件有比这个文件更老的数据,直接更新到level 1后就导致level 1中有些文件比level 0更新了,后续读取会出错,所以需要将level 0层重叠文件都找出来)
对应的做法就是当选出文件后,判断还有哪些文件有重叠,把这些文件都加入进来,这个例子对应的就是把文件1 2都加进来。
代码上,先通过GetRange
获取输入文件的 key range,然后根据 key range 得到一个最全的文件列表。
// Files in level 0 may overlap each other, so pick up all overlapping ones
// 对level 0,获取当前已选择文件的key range: [smallest, largest]
// 然后选择level 0的其他与该key range有overlap的文件,组成新的key range
// 然后重新从头在level 0查找,直到key range固定下来
if (level == 0) {
InternalKey smallest, largest;
GetRange(c->inputs_[0], &smallest, &largest);
// Note that the next call will discard the file we placed in
// c->inputs_[0] earlier and replace it with an overlapping set
// which will include the picked file.
current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
assert(!c->inputs_[0].empty());
}
到了这一步,本质上是是的待 compact 的文件在各层都满足统一条件:inputs_[0]
的文件跟本层其他文件之间,没有 key 重叠
10.3.3 SetupOtherInputs
inputs_[0]
填充了第一层要参与 compact 的文件,接下来就是要计算下一层参与 compact 的文件,记录到inputs_[1]
。
基本的思想是:所有有重叠的 level + 1 层文件都要参与 compact,得到这些文件后,反过来看下,如果在不增加 level + 1 层文件的前提下,能否增加 level 层的文件?
也就是尽量增加 level 层的文件,贪心算法。
首先是计算下一层与inputs_[0]
key range 有重叠的所有 sstable files,记录到inputs_[1]
void VersionSet::SetupOtherInputs(Compaction* c) {
const int level = c->level();
InternalKey smallest, largest;
//inputs_[0]所有文件的key range -> [smallest, largest]
GetRange(c->inputs_[0], &smallest, &largest);
//inputs_[1]记录level + 1层所有与inputs_[0]有overlap的文件
current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1]);
// Get entire range covered by compaction
InternalKey all_start, all_limit;
//inputs_[0, 1]两层所有文件的key range -> [all_start, all_limit]
GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
根据inputs_[1]
反推下 level 层有多少 key range 有重叠的文件,记录到expanded0
:
// See if we can grow the number of inputs in "level" without
// changing the number of "level+1" files we pick up.
// 如果再不增加level + 1层文件的情况下,尽可能的增加level层的文件
if (!c->inputs_[1].empty()) {
std::vector<FileMetaData*> expanded0;
//level层与[all_start, all_limit]有overlap的所有文件,记录到expanded0
//expanded0 >= inputs_[0]
current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);
如果文件确实又增加,同时又不会增加太多文件(太多会导致 compact 压力过大)
const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);
const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
const int64_t expanded0_size = TotalFileSize(expanded0);
//1. level 层参与compact文件数有增加
//2. 但合并的文件总量在ExpandedCompactionByteSizeLimit之内(防止compact过多)
if (expanded0.size() > c->inputs_[0].size() &&
inputs1_size + expanded0_size <
ExpandedCompactionByteSizeLimit(options_)) {
那么就增加参与 compact 的文件,更新到inputs_
InternalKey new_start, new_limit;
//[new_start, new_limit]记录expand0的key range
GetRange(expanded0, &new_start, &new_limit);
std::vector<FileMetaData*> expanded1;
//如果level层文件从inputs_[0]扩展到expand0,key的范围变成[new_start, new_limit]
//看下level + 1层overlap的文件范围,记录到expand1
current_->GetOverlappingInputs(level+1, &new_start, &new_limit,
&expanded1);
//确保level + 1层文件没有增加,那么使用心得expand0, expand1
if (expanded1.size() == c->inputs_[1].size()) {
Log(options_->info_log,
"Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n",
level,
int(c->inputs_[0].size()),
int(c->inputs_[1].size()),
long(inputs0_size), long(inputs1_size),
int(expanded0.size()),
int(expanded1.size()),
long(expanded0_size), long(inputs1_size));
smallest = new_start;
largest = new_limit;
c->inputs_[0] = expanded0;
c->inputs_[1] = expanded1;
GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
}
到此,参与 compact 的文件集合就已经确定了,为了避免这些文件合并到 level + 1 层后,跟 level + 2 层有重叠的文件太多,届时合并 level + 1 和 level + 2 层压力太大,因此我们还需要记录下 level + 2 层的文件,后续 compact 时用于提前结束的判断:
// Compute the set of grandparent files that overlap this compaction
// (parent == level+1; grandparent == level+2)
// level + 2层有overlap的文件,记录到c->grandparents_
if (level + 2 < config::kNumLevels) {
//level + 2层overlap的文件记录到c->grandparents_
current_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
&c->grandparents_);
}
接着记录compact_pointer_
到c->edit_
,在后续PickCompaction
入口时使用。
// Update the place where we will do the next compaction for this level.
// We update this immediately instead of waiting for the VersionEdit
// to be applied so that if the compaction fails, we will try a different
// key range next time.
// 记录该层本次compact的最大key
compact_pointer_[level] = largest.Encode().ToString();
c->edit_.SetCompactPointer(level, largest);
最后一步,就是返回筛选的结果c
:
//选取一层需要compact的文件列表,及相关的下层文件列表,记录在Compaction*
Compaction* VersionSet::PickCompaction() {
Compaction* c;
...
return c;
}
10.4 BackgroundCompaction
//实际Compact
void DBImpl::BackgroundCompaction() {
mutex_.AssertHeld();
//如果immutable memtable存在,则本次先compact,即Minor Compaction
if (imm_ != nullptr) {
CompactMemTable();
return;
}
接着就是调用PickCompaction筛选合适的 level 及 文件。注意也可以手动指定 range,原理是类似的,不再赘述。
//如果immutable memtable不存在,则合并各层level的文件,称为Major Compaction
Compaction* c;
bool is_manual = (manual_compaction_ != nullptr);
InternalKey manual_end;
//手动指定compact
if (is_manual) {
ManualCompaction* m = manual_compaction_;
c = versions_->CompactRange(m->level, m->begin, m->end);
m->done = (c == nullptr);
if (c != nullptr) {
manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
}
Log(options_.info_log,
"Manual compaction at level-%d from %s .. %s; will stop at %s\n",
m->level,
(m->begin ? m->begin->DebugString().c_str() : "(begin)"),
(m->end ? m->end->DebugString().c_str() : "(end)"),
(m->done ? "(end)" : manual_end.DebugString().c_str()));
} else {
//自动compact,c记录了待参与compact的所有文件
c = versions_->PickCompaction();
}
//再接收到需要compaction的文件后 为了节省重新生成文件过程 有些情况可以直接使用源文件
// 1. level层只有一个文件
// 2. level + 1层没有文件
// 3. 跟level + 2层overlap的文件没有超过25M IsTrivialMove()在这个函数
Status status;
if (c == nullptr) {
// Nothing to do
} else if (!is_manual && c->IsTrivialMove()) {
// Move file to next level
// level + 1没有overlap的文件,不需要compact,直接从level层标记到level + 1层即可
assert(c->num_input_files(0) == 1);
FileMetaData* f = c->input(0, 0);
//直接把这个文件从level移动level + 1层
c->edit()->DeleteFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_);
if (!status.ok()) {
RecordBackgroundError(status);
}
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number),
c->level() + 1,
static_cast<unsigned long long>(f->file_size),
status.ToString().c_str(),
versions_->LevelSummary(&tmp));
} else {
CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact);
if (!status.ok()) {
RecordBackgroundError(status);
}
CleanupCompaction(compact);
c->ReleaseInputs();
DeleteObsoleteFiles();//删除旧文件,回收内存和磁盘空间
}
正常情况下,通过DoCompactionWork
完成文件的归并操作。
实现上,主要就是通过遍历所有的文件,实现多路归并,生成新的文件。
第一步,获取遍历所有文件用到的 Iterator*
. 因为level 0 是无序的所以level 0需要level 0.size个迭代器,其他层只需要1个迭代器。
iterator 返回的 key 全部有序,遍历过程可以清理掉一些 key。
由于多次Put/Delete,有些key会出现多次,在compact时丢弃。策略如下:
- 对于多次出现的user key,我们只关心最后写入的值 or >snapshot的值通过设置last_sequence_for_key = kMaxSequenceNumber以及跟compact->smallest_snapshot比较,可以分别保证这两点
- 如果是删除key && <= snapshot && 更高层没有该key,那么也可以忽略
同时跟上一节的思想类似,如果目前 compact 生成的文件,会导致接下来 level + 1 && level + 2 层 compact 压力过大,那么结束本次 compact.因此,每次都会调用ShouldStopBefore
来判断是否满足上述条件:
bool Compaction::ShouldStopBefore(const Slice& internal_key) {
const VersionSet* vset = input_version_->vset_;
// Scan to find earliest grandparent file that contains key.
const InternalKeyComparator* icmp = &vset->icmp_;
while (grandparent_index_ < grandparents_.size() &&
icmp->Compare(internal_key,
grandparents_[grandparent_index_]->largest.Encode()) >
0) {
if (seen_key_) {
overlapped_bytes_ += grandparents_[grandparent_index_]->file_size;
}
grandparent_index_++;
}
seen_key_ = true;
if (overlapped_bytes_ > MaxGrandParentOverlapBytes(vset->options_)) {
// Too much overlap for current output; start new output
overlapped_bytes_ = 0;
return true;
} else {
return false;
}
}
十一、写入和读取
1. Put
先用一张图片介绍下:
写入的key value
首先被封装到WriteBatch
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
//key,value数据更新到batch里
batch.Put(key, value);
return Write(opt, &batch);
}
WriterBatch
封装了数据,DBImpl::Writer
则继续封装了 mutex cond 等同步原语
// Information kept for every waiting writer
struct DBImpl::Writer {
Status status;
WriteBatch* batch;
bool sync;
bool done;
port::CondVar cv;
explicit Writer(port::Mutex* mu) : cv(mu) { }
};
写入流程实际上调用的是DBImpl::Write
//调用流程: DBImpl::Put -> DB::Put -> DBImpl::Write
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
//一次Write写入内容会首先封装到Writer里,Writer同时记录是否完成写入、触发Writer写入的条件变量等
Writer w(&mutex_);
w.batch = my_batch;
w.sync = options.sync;
w.done = false;
数据被写入到writers_
,直到满足两个条件:
- 其他线程已经帮忙完成了
w
的写入 - 抢到锁并且位于
writers_
首部
MutexLock l(&mutex_);//多个线程调用的写入操作通过mutex_串行化
writers_.push_back(&w);
//数据先放到queue里,如果不在queue顶部则等待
//这里是对数据流的一个优化,wirters_里Writer写入时,可能会把queue里其他Writer也完成写入
while (!w.done && &w != writers_.front()) {
w.cv.Wait(); //这边会释放锁 直到被唤醒
}
//如果醒来并且抢到了mutex_,检查是否已经完成了写入(by其他Writer),则直接返回写入status
if (w.done) {
return w.status;
}
接着查看是否有足够空间写入,例如mem_
是否写满,是否必须触发 minor compaction 等
// May temporarily unlock and wait.
Status status = MakeRoomForWrite(my_batch == nullptr);
取出writers_
的数据,统一记录到updates
uint64_t last_sequence = versions_->LastSequence();//本次写入的SequenceNumber
Writer* last_writer = &w;
if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions
//updates存储合并后的所有WriteBatch
WriteBatch* updates = BuildBatchGroup(&last_writer); //一次性把writers_里面的数据都保存在updates 一次性写入 此时last_writer等于writers_最后一个
WriteBatchInternal::SetSequence(updates, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(updates);
然后写入日志,写入内存:
//WriterBatch写入log文件,包括:sequence,操作count,每次操作的类型(Put/Delete),key/value及其长度
status = log_->AddRecord(WriteBatchInternal::Contents(updates));
bool sync_error = false;
if (status.ok() && options.sync) {
//log_底层使用logfile_与文件系统交互,调用Sync完成写入
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
//写入文件系统后不用担心数据丢失,继续插入MemTable
if (status.ok()) {
status = WriteBatchInternal::InsertInto(updates, mem_);
}
写入完成后,逐个唤醒等待的线程:
//last_writer记录了writers_里合并的最后一个Writer
//逐个遍历弹出writers_里的元素,并环形等待write的线程,直到遇到last_writer
while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}
// Notify new head of write queue
// 唤醒队列未写入的第一个Writer
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
2. Sequence
批量写入接口DB::Write(const WriteOptions& options, WriteBatch* updates)
调用也是DBImpl::Write
。
批量写入一个典型问题就是一致性,例如这么调用:
leveldb::WriteBatch batch;
batch.Put("company", "Google");
batch.Put(...);
batch.Delete("company");
db->Write(write_option, &batch);
我们肯定不希望读到company -> Google
这个中间结果,而效果的产生就在于sequence
:versions_
记录了单调递增的sequence
,对于相同 key,判断先后顺序依赖该数值。
写入时,sequence
递增的更新到 memtable,但是一次性的记录到versions_
:
uint64_t last_sequence = versions_->LastSequence();//本次写入的SequenceNumber
...
WriteBatchInternal::SetSequence(updates, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(updates);
...
versions_->SetLastSequence(last_sequence);
对于Get
操作(参考本文 Get 一节),看到的 sequence 只有两种可能: 批量提交get要不再batch.Put(“company”, “Google”);之前或者batch.Delete(“company”);之后执行。同时一个writebatch里的操作应该是一次全部写入的。
<= last_sequence
>= last_sequence + Count(updates)
因此读取时不会观察到中间状态。
3. WriteBatch
第一节介绍,写入的key/value
数据,都记录到了WriteBatch
,更具体的,记录到了:
//rep_存储了所有Put/Delete接口传入的数据
//按照一定格式记录了:sequence, count, 操作类型(Put or Delete),key/value的长度及key/value本身
std::string rep_; // See comment in write_batch.cc for the format of rep_
rep_
数据组织如下:一个writebatch可以包含多个操作