前言
本文为笔者个人阅读Apache Impala源码时的笔记,仅代表我个人对代码的理解,个人水平有限,文章可能存在理解错误、遗漏或者过时之处。如果有任何错误或者有更好的见解,欢迎指正。
正文
本文顺承前文Impala3.4源码阅读笔记(二) data-cache的Lookup实现继续分析实现Store的具体流程和细节,其中使用的一些类和对象前文有讲不再赘述,因此建议先阅读完前文后再继续。
Store的实现相较于Lookup要复杂得多,因为其中牵扯了缓存条目的插入与逐出、缓存文件的写入和多线程并发写的问题。还是那张图:
我们还是沿着执行路径进行分析,先从DataCache::Partition::Store
入手,首先会查找缓存元数据中缓存键是否存在:
Cache::UniqueHandle handle(meta_cache_->Lookup(key, Cache::EXPECT_IN_CACHE));
if (handle.get() != nullptr) {
if (HandleExistingEntry(key, handle, buffer, buffer_len)) return false;
}
只有缓存键不存在或者buffer_len
大于原来的缓存长度,HandleExistingEntry
才会返回true
继续之后的流程。之后会进行并发数和重复待缓存的检查:
const bool exceed_concurrency =
pending_insert_set_.size() >= FLAGS_data_cache_write_concurrency;
if (exceed_concurrency || pending_insert_set_.find(key.ToString()) != pending_insert_set_.end()) {
...
return false;
}
其中pending_insert_set_
是一个字符串集合,保存了待插入或正在插入的CacheKey
字符串,如果其大小超过data_cache_write_concurrency
限制或CacheKey
已存在(说明别的线程正在插入该条目)则本次插入会被放弃。通过了以上检查之后,会调用CacheFile::Allocate
在当前缓存文件里申请一个插入位置并将key移进pending_insert_set_
准备插入:
cache_file = cache_files_.back().get();
insertion_offset = cache_file->Allocate(charge_len, partition_lock);
...
pending_insert_set_.emplace(key.ToString());
然后是一段比较有意思的代码,设置作用域退出触发器:
auto remove_from_pending_set = MakeScopeExitTrigger([this, &key]() {
std::unique_lock<SpinLock> partition_lock(lock_);
pending_insert_set_.erase(key.ToString());
});
return InsertIntoCache(key, cache_file, insertion_offset, buffer, buffer_len);
作用域退出触发器工作原理类似于lock_guard
,我们可以传递一个函数给MakeScopeExitTrigger
来设置作用域退出触发器,当触发器对象被析构时会执行该函数。此处我们传递一个lambda函数来设置触发器,Store
执行完时会离开remove_from_pending_set
的所属作用域,然后执行该lambda函数,该函数的功能很简单就是将执行完插入的CacheKey
移出pending_insert_set_
。Store
最后就是调用InsertIntoCache
完成插入,InsertIntoCache
首先会向meta_cache_
申请一块空间存放Handle
:
Cache::UniquePendingHandle pending_handle(
meta_cache_->Allocate(key, sizeof(CacheEntry), charge_len));
if (UNLIKELY(pending_handle.get() == nullptr)) return false;
然后是将数据写入缓存文件,是的,在逐出旧数据之前会先写入新数据,所以缓存分区大小的限制会被暂时忽略:
if (UNLIKELY(!cache_file->Write(insertion_offset, buffer, buffer_len))) {
return false;
}
此处文件写入的Write
和Lookup
中读文件的Read
方法一样最终都调用了系统API完成,因此不再展开,我们继续看接下来的缓存条目插入:
CacheEntry entry(cache_file, insertion_offset, buffer_len, checksum);
memcpy(meta_cache_->MutableValue(&pending_handle), &entry, sizeof(CacheEntry));
Cache::UniqueHandle handle(meta_cache_->Insert(std::move(pending_handle), this));
首先构造了缓存条目,然后直接用memcpy
将条目直接写入handle
,在Lookup
实现的介绍中说过可以将handle
理解为键值对,MutableValue
则会返回handle
中值位置的写入地址。然后就是关键的缓存元数据插入了,Insert
会将上文构造好的handle
插入缓存元数据,我们先来看Insert
方法的定义:
UniqueHandle Insert(UniquePendingHandle handle, Cache::EvictionCallback* eviction_callback) override {
HandleBase* h_in = reinterpret_cast<HandleBase*>(DCHECK_NOTNULL(handle.release()));
HandleBase* h_out = shards_[Shard(h_in->hash())]->Insert(h_in, eviction_callback);
return UniqueHandle(reinterpret_cast<Cache::Handle*>(h_out), Cache::HandleDeleter(this));
}
可以看见该方法还需要传入一个Cache::EvictionCallback
对象的指针,EvictionCallback
是一个接口类,只定义了一个回调函数,meta_cache_
会在逐出缓存条目时调用该回调函数:
class EvictionCallback {
public:
virtual void EvictedEntry(Slice key, Slice value) = 0;
virtual ~EvictionCallback() = default;
};
Partition
类继承了该接口并实现了EvictedEntry
方法,该方法中包括了缓存文件的打洞操作:
void DataCache::Partition::EvictedEntry(Slice key, Slice value) {
...
entry.file()->PunchHole(entry.offset(), eviction_len);
...
}
PunchHole
可以将缓存文件中的一段区间挖掉并将存储空间还给操作系统,通过这种方法删除缓存文件中被逐出的数据片段。PunchHole
也使用系统API fallocate
实现文件打洞功能,此处不再展开。因为逐出缓存条目时需要回调该函数,所以meta_cache_->Insert(std::move(pending_handle), this)
还需要传入当前对象(Partition
对象)指针this。meta_cache_->Insert
又调用了RLCacheShard::Insert
,RLCacheShard
类通过RLHandle
实现了一个循环双链表并包括一个哈希表HandleTable
,支持FIFO和LRU两种缓存置换策略,RLCacheShard::Insert
比较长,我们只看一步步看其关键部分,首先是插入双链表和哈希表:
RLHandle* to_remove_head = nullptr;
RL_Append(handle);
RLHandle* old = static_cast<RLHandle*>(table_.Insert(handle));
if (old != nullptr) {
RL_Remove(old);
if (Unref(old)) {
old->next = to_remove_head;
to_remove_head = old;
}
}
将被逐出的条目会先被串成一个链表,to_remove_head
作为链表头结点。RL_Append
会将条目插入循环双链表,table_.Insert(handle)
将条目插入哈希表,如果哈希表已存在该条目则会将其替换然后返回旧条目,否则返回nullptr
。如果有返回旧条目需要将其删除,包括通过RL_Remove
将其移出循环双链表、Unref
将其引用计数减一,若之后引用计数只剩一还需将其链接到to_remove_head
准备逐出。然后就是缓存容量的限制部分:
while (usage_ > capacity_ && rl_.next != &rl_) {
RLHandle* old = rl_.next;
RL_Remove(old);
table_.Remove(old->key(), old->hash());
if (Unref(old)) {
old->next = to_remove_head;
to_remove_head = old;
}
}
其中rl_
是循环双链表的虚头结点,其next
结点为最旧的结点,prev
为最新的结点,新旧根据置换策略FIFO或LRU决定。当使用量超过容量usage_ > capacity_
时会一直循环,不断删除旧条目,删除过程与上文相同。最后就是旧条目的逐出:
while (to_remove_head != nullptr) {
RLHandle* next = to_remove_head->next;
FreeEntry(to_remove_head);
to_remove_head = next;
}
此处调用了FreeEntry
方法:
void RLCacheShard<policy>::FreeEntry(RLHandle* e) {
DCHECK_EQ(e->refs.load(std::memory_order_relaxed), 0);
if (e->eviction_callback) {
e->eviction_callback->EvictedEntry(e->key(), e->value());
}
UpdateMemTracker(-static_cast<int64_t>(e->charge()));
Free(e);
}
可以发现其调用了先前传入的回调函数EvictedEntry
来将数据从缓存文件逐出。完成了旧条目的逐出之后,Store的过程也就结束了。
最后还有一个问题,置换策略FIFO或LRU是如何实现的?实际上这两个策略唯一的区别就是如何定义缓存条目的新旧,FIFO的逻辑是越早插入的越旧,而LRU的逻辑是越早插入且越久没有被Lookup的越旧。RLCacheShard
内的循环双链表本身就是按照插入顺序排序的,所以FIFO策略实际上不需要额外的实现,链表第一个结点就是最旧的,而LRU策略则需要在Lookup后将Lookup访问的结点移到链表尾部让它成为最新的,在RLCacheShard::Lookup
中可以看到:
e = static_cast<RLHandle*>(table_.Lookup(key, hash));
if (e != nullptr) {
e->refs.fetch_add(1, std::memory_order_relaxed);
RL_UpdateAfterLookup(e);
}
在Lookup最后执行了RL_UpdateAfterLookup
来更新结点位置,我们来看其实现:
template<>
void RLCacheShard<Cache::EvictionPolicy::FIFO>::RL_UpdateAfterLookup(RLHandle* /* e */) {
}
template<>
void RLCacheShard<Cache::EvictionPolicy::LRU>::RL_UpdateAfterLookup(RLHandle* e) {
RL_Remove(e);
RL_Append(e);
}
对应FIFO和LRU,RL_UpdateAfterLookup
有两个特化模板,其中FIFO不需要更新结点位置,所以是个空函数,而LRU的RL_UpdateAfterLookup
也十分简单,移出结点重新插入到链表尾就完成了结点位置更新。