前言
本文为笔者个人阅读Apache Impala源码时的笔记,仅代表我个人对代码的理解,个人水平有限,文章可能存在理解错误、遗漏或者过时之处。如果有任何错误或者有更好的见解,欢迎指正。
基本信息
data-cache是impala在本地的数据缓存,采用LRU策略存储频繁使用的表数据,避免每次使用都要从HDFS再次读取,从而加快数据读取速度。data-cache可以包括多个缓存分区,分区大小配额和存放路径通过参数配置,可参考Data Cache for Remote Reads。
模块结构
data-cache功能由DataCache
类实现,这个类被定义在be\src\runtime\io\data-cache.h中。DiskIoMgr
类是一个管理器类,负责为所有磁盘和远程文件系统上的所有查询进行IO调度,DiskIoMgr
对象是DataCache
对象唯一的管理者。HdfsFileReader
类则是DataCache
对象的唯一实际调用者,HdfsFileReader
通过DiskIoMgr
获取到DataCache
对象的指针并使用。
读取HDFS数据由HdfsFileReader::ReadFromPos
函数负责,ReadFromPos
会尝试从data-cache中读取数据,缓存MISS的数据部分再去HDFS中读取,全部读取结束后若有缓存MISS发生ReadFromPos
会尝试将本次读取的数据写入data-cache。
data-cache通过Partition
类管理缓存分区,一个Partition
对象主要包括一份缓存元数据和若干缓存文件。当向一个Partition
中插入新数据时,数据将被追加到该Partition
当前使用的备份文件中。每条缓存数据的存储消耗计入该分区的总大小,当一个分区大小达到其配额时,该分区中最近最少使用的数据将被逐出(LRU策略)。
一个缓存文件由一个CacheFile
对象负责管理,包括文件的创建删除、读写和PunchHole操作。缓存文件名格式为“impala-cache-file-” + UUID,缓存文件大小受到data_cache_file_max_size_bytes参数限制,默认1TB,插入数据时若超过该限制时会创建一个新的缓存文件来插入。数据逐出则是通过PunchHole来实现的,PunchHole按照给定偏移量和数据长度在文件中打洞,文件中的所有洞都不消耗存储空间,具体可以参考用fallocate进行"文件预留"或"文件打洞"。
每个分区都有一份缓存元数据meta_cache_,其类型为ShardedCache
类的独占指针,底层使用的HandleTable
类实现了一个开链哈希表,以Handle
为中介记录缓存键CacheKey
到缓存条目CacheEntry
的映射。
data-cache通过CacheKey
来唯一标识一条缓存数据,DataCache
也通过CacheKey
的Hash值来决定该条目插入哪个缓存分区。CacheKey
包括filename(数据对应的HDFS文件完整路径),mtime(该文件的修改时间)和offset(数据在该文件的起始偏移量)三部分,这三部分表明了数据来自哪个版本的哪个文件的哪个部分,只有这三部分完全匹配才能保证缓存数据与HDFS文件数据一致。CacheKey
仅包括offset而不包括数据长度使得data-cache目前有一个缺陷,就是会数据重复缓存且不支持子范围查找。举个例子,假如data-cache缓存了某文件[10,50]的数据,也就是文件第10~50个字节,其缓存键的offset=10。此时若要查找[20,30]的数据时则会MISS,即使[10,50]包含了[20,30],这是因为没有offset=20的缓存条目,同理插入[20,30]的数据也不会与[10,50]合并,它们的offset不同所以CacheKey
不同,不被认为是同一条目。
与CacheKey
一一对应的是缓存条目CacheEntry
类,CacheEntry
并不保存缓存数据,而是保存数据所在的CacheFile
对象指针、数据在缓存文件中的偏移量、数据长度和校验和。通过CacheEntry
对象获取缓存文件指针等参数才能去缓存文件中读取缓存数据。
此外data-cache中还有Tracer
和DataCacheTest
两个类,分别负责记录data-cache工作情况到日志和data-cache的测试。
工作流程
我们从HdfsFileReader::ReadFromPos
开始分析data-cache的工作流程,这个函数会被ScanRange
对象调用去读取HDFS文件,下面贴出了部分关键代码和注释说明:
Status HdfsFileReader::ReadFromPos(...) {
...
// 首先,尝试从datacache读取数据
// 从DiskIoMgr拿到datacache对象指针,并进行检查是否可用
DataCache* remote_data_cache = io_mgr->remote_data_cache();
bool try_data_cache = scan_range_->UseDataCache() && remote_data_cache != nullptr;
int64_t cached_read = 0;
if (try_data_cache) {
// ReadDataCache函数调用DataCache的API读取缓存数据到buffer,然后返回读取的字节数
cached_read = ReadDataCache(remote_data_cache, file_offset, buffer, bytes_to_read);
DCHECK_GE(cached_read, 0);
*bytes_read = cached_read;
}
// *bytes_read为已经读取的字节数,bytes_to_read为需要读取的字节数
// 通过循环去hdfs读取数据直到读取完所有需要的数据
while (*bytes_read < bytes_to_read) {
int bytes_remaining = bytes_to_read - *bytes_read;
...
int64_t position_in_file = file_offset + *bytes_read;
// ReadFromPosInternal从hdfs读取数据到buffer
status = ReadFromPosInternal(hdfs_file, queue, position_in_file,
buffer + *bytes_read, bytes_remaining, ¤t_bytes_read);
...
*bytes_read += current_bytes_read;
}
// 计算缓存miss的字节数,尝试将miss的数据写入缓存
int64_t cached_bytes_missed = *bytes_read - cached_read;
if (try_data_cache && status.ok() && cached_bytes_missed > 0) {
DCHECK_LE(*bytes_read, bytes_to_read);
// WriteDataCache函数调用DataCache的API尝试将buffer数据写入缓存
WriteDataCache(remote_data_cache, file_offset, buffer, *bytes_read,
cached_bytes_missed);
}
}
return status;
}
可以发现HdfsFileReader::ReadFromPos
通过ReadDataCache
和WriteDataCache
来读写data-cache,这两个函数比较简单,下面贴出了两个函数的代码:
int64_t HdfsFileReader::ReadDataCache(DataCache* remote_data_cache, int64_t file_offset,
uint8_t* buffer, int64_t bytes_to_read) {
// 直接调用datacache的Lookup函数读取数据到buffer,并返回读取字节数
int64_t cached_read = remote_data_cache->Lookup(*scan_range_->file_string(),
scan_range_->mtime(), file_offset, bytes_to_read, buffer);
// 下面都是一些与功能无关的metric更新
...
return cached_read;
}
// WriteDataCache没有返回值是因为缓存插入并非一定成功的,多线程并发写、数据条目过大都可能导致失败
void HdfsFileReader::WriteDataCache(DataCache* remote_data_cache, int64_t file_offset,
const uint8_t* buffer, int64_t buffer_len, int64_t bytes_missed) {
// 直接调用datacache的Store函数将buffer数据写入到缓存
remote_data_cache->Store(*scan_range_->file_string(), scan_range_->mtime(),
file_offset, buffer, buffer_len);
// 下面都是一些与功能无关的metric更新
...
}
接下来我对DataCache
的两个关键读写函数进行分析,首先是负责查找并读取缓存的Lookup
及其相关函数:
int64_t DataCache::Lookup(const string& filename, int64_t mtime, int64_t offset,
int64_t bytes_to_read, uint8_t* buffer) {
...
// 构造一个缓存键,并计算其哈希值以确定其对应条目所在的分区索引。
const CacheKey key(filename, mtime, offset);
int idx = key.Hash() % partitions_.size();
// 转而调用对应分区的Lookup函数,完成数据读取
int64_t bytes_read = partitions_[idx]->Lookup(key, bytes_to_read, buffer);
...
return bytes_read;
}
int64_t DataCache::Partition::Lookup(const CacheKey& cache_key, int64_t bytes_to_read,
uint8_t* buffer) {
// 从缓存元数据meta_cache_中拿到CacheKey对应的Handle
Slice key = cache_key.ToSlice();
Cache::UniqueHandle handle(meta_cache_->Lookup(key, Cache::EXPECT_IN_CACHE));
// Handle为空说明缓存MISS了,记录到追踪文件(如果开启了对应功能)并退出
if (handle.get() == nullptr) {
if (tracer_ != nullptr) {
tracer_->Trace(Tracer::MISS, cache_key, bytes_to_read, /*entry_len=*/-1);
}
return 0;
}
// 将Handle转换为CacheEntry
CacheEntry entry(meta_cache_->Value(handle));
if (tracer_ != nullptr) {
tracer_->Trace(Tracer::HIT, cache_key, bytes_to_read, entry.len());
}
// 从CacheEntry拿到数据所在的CacheFile对象指针
CacheFile* cache_file = entry.file();
bytes_to_read = min(entry.len(), bytes_to_read);
...
// 调用CacheFile的Read读取缓存文件数据,如果失败会删除该条目并退出
if (UNLIKELY(!cache_file->Read(entry.offset(), buffer, bytes_to_read))) {
meta_cache_->Erase(key);
return 0;
}
// 验证校验和是否启用,若启用会检查校验和并删除校验和不匹配的条目并退出
if (FLAGS_data_cache_checksum && bytes_to_read == entry.len() &&
!VerifyChecksum("read", entry, buffer, bytes_to_read)) {
meta_cache_->Erase(key);
return 0;
}
return bytes_to_read;
}
Lookup过程的示意图如下所示(示意缓存HIT的情况并省略了缓存分区等细节):
然后是负责写入缓存的Store
及其相关函数:
bool DataCache::Store(const string& filename, int64_t mtime, int64_t offset,
const uint8_t* buffer, int64_t buffer_len) {
...
// 构造一个缓存键,同样计算其哈希值以确定其对应条目需要放入的分区索引。
const CacheKey key(filename, mtime, offset);
int idx = key.Hash() % partitions_.size();
bool start_reclaim;
// 转而调用对应分区的Store函数,完成数据读取
bool stored = partitions_[idx]->Store(key, buffer, buffer_len, &start_reclaim);
...
return stored;
}
bool DataCache::Partition::Store(const CacheKey& cache_key, const uint8_t* buffer,
int64_t buffer_len, bool* start_reclaim) {
...
// 检查是否已经缓存了该条目,只有该条目未被缓存或比原有缓存条目更大才继续缓存
{
Cache::UniqueHandle handle(meta_cache_->Lookup(key, Cache::EXPECT_IN_CACHE));
if (handle.get() != nullptr) {
if (HandleExistingEntry(key, handle, buffer, buffer_len)) return false;
}
}
// 然后是一些写缓存文件的准备工作
...
// 执行InsertIntoCache函数开始写入缓存
return InsertIntoCache(key, cache_file, insertion_offset, buffer, buffer_len);
}
bool DataCache::Partition::InsertIntoCache(const Slice& key, CacheFile* cache_file,
int64_t insertion_offset, const uint8_t* buffer, int64_t buffer_len) {
...
// 分配缓存handle
Cache::UniquePendingHandle pending_handle(
meta_cache_->Allocate(key, sizeof(CacheEntry), charge_len));
if (UNLIKELY(pending_handle.get() == nullptr)) return false;
// 计算校验和(如果开启了对应功能)
int64_t checksum = FLAGS_data_cache_checksum ? Checksum(buffer, buffer_len) : 0;
// 调用CacheFile的Write将数据写入缓存文件
if (UNLIKELY(!cache_file->Write(insertion_offset, buffer, buffer_len))) {
return false;
}
// 构造对应的缓存条目并转换为handle
CacheEntry entry(cache_file, insertion_offset, buffer_len, checksum);
memcpy(meta_cache_->MutableValue(&pending_handle), &entry, sizeof(CacheEntry));
// Insert按照LRU策略将handle插入缓存元数据meta_cache_
Cache::UniqueHandle handle(meta_cache_->Insert(std::move(pending_handle), this));
...
return true;
}
Store过程的示意图如下所示(示意缓存条目不存在的情况并省略了缓存分区、多线程并发等细节):
至此,data-cache的工作流程就分析完了。
高级配置项
在impalad.conf中配置-data_cache_enable_tracing=true可以开启data-cache追踪功能,开启后data-cache的工作情况会被记录在data-cache目录的impala-cache-trace.txt中:
通过阅读源代码,我们可以知道其中各项的含义:
- ts为timestamp,时间戳;
- s为status,cache状态,可能的值有H(命中)、M(未命中)、S(存储成功)和F(存储失败)四种;
- f为filename,请求文件的完整HDFS路径;
- m为mtime,请求文件的修改时间;
- o为offset,请求区域在文件中的起始偏移量;
- lLen为lookup_len,查找长度,也就是期望从cache读取的字节数;
- eLen为entry_len,缓存数据长度,也就是实际读取或写入缓存的字节数;