JuiceFS Persistent Cache: A Source Code Walkthrough
JuiceFS is a high-performance POSIX file system designed specifically for cloud-native environments and released under the Apache 2.0 license. When data is stored with JuiceFS, the data itself is persisted in object storage (e.g. Amazon S3), while the corresponding metadata can be persisted in a variety of database engines such as Redis, MySQL, or TiKV, depending on the scenario.
This article walks through the implementation of JuiceFS's persistent cache (disk_cache).
1. JuiceFS IO Flow
The Read flow diagram from the official JuiceFS documentation shows the following: when a read is issued, the application first hits the page cache maintained for FUSE. On a miss, the request reaches user space and first checks the in-memory cache maintained there, i.e. the ReadBuffer. If that also misses, the request falls through to the persistent cache, the Block Cache Index, and the data is read from local disk. Only when no cached copy is found does the read go to object storage.
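To make the layering concrete, below is a minimal sketch of that lookup order in Go. All names here (blockReader and its fields) are illustrative placeholders, not the actual JuiceFS types:

// Illustrative only: memory pages first, then the local disk cache, and only
// on a miss does the read fall through to object storage.
type blockReader struct {
    mem  map[string][]byte                // in-memory pages (ReadBuffer)
    disk func(key string) ([]byte, bool)  // local disk cache lookup
    obj  func(key string) ([]byte, error) // fetch from object storage
    put  func(key string, data []byte)    // populate the local disk cache
}

func (r *blockReader) readBlock(key string) ([]byte, error) {
    if data, ok := r.mem[key]; ok {
        return data, nil
    }
    if data, ok := r.disk(key); ok {
        return data, nil
    }
    data, err := r.obj(key)
    if err == nil {
        r.put(key, data) // cache the block for subsequent reads
    }
    return data, err
}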
2. Code Implementation
The persistent cache is implemented in juicefs-main/pkg/chunk/disk_cache.go.
2.1 Core Data Structures
// cacheKey identifies a cached block for lookups
type cacheKey struct {
    id   uint64
    indx uint32
    size uint32
}

// cacheItem holds the metadata of a cached block
type cacheItem struct {
    size  int32
    atime uint32 // eviction compares atime, so no separate LRU structure has to be maintained
}

// pendingFile is a cached block waiting to be flushed to disk
type pendingFile struct {
    key  string
    page *Page
}
// cacheStore manages the persistent cache; there is one per cache directory, and multiple cache directories can be configured
type cacheStore struct {
    totalPages int64
    sync.Mutex
    dir          string
    mode         os.FileMode
    capacity     int64
    freeRatio    float32
    hashPrefix   bool
    scanInterval time.Duration
    pending      chan pendingFile
    pages        map[string]*Page
    m            *cacheManagerMetrics
    used         int64
    keys         map[cacheKey]cacheItem
    scanned      bool
    stageFull    bool
    rawFull      bool
    eviction     string
    checksum     string // checksum level
    uploader     func(key, path string, force bool) bool
}
// CacheManager is the cache manager interface; both memStore and cacheStore implement it, and which one is used is decided by configuration
type CacheManager interface {
    cache(key string, p *Page, force bool)
    remove(key string)
    load(key string) (ReadCloser, error)
    uploaded(key string, size int)
    stage(key string, data []byte, keepCache bool) (string, error)
    removeStage(key string) error
    stagePath(key string) string
    stats() (int64, int64)
    usedMemory() int64
}
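As a quick illustration of how this interface is exercised, here is a hypothetical helper (not from the JuiceFS source) that writes a block into the cache and immediately reads it back; io.ReadAll is from the standard library, everything else comes from the interface above:

// requires "io"
func cacheAndRead(cm CacheManager, key string, p *Page) ([]byte, error) {
    cm.cache(key, p, true)  // force=true: block until the write is queued
    rc, err := cm.load(key) // returns a ReadCloser over the cached block
    if err != nil {
        return nil, err
    }
    defer rc.Close()
    return io.ReadAll(rc)
}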
Let's start with the first important function: creating a cacheStore.
func newCacheStore(m *cacheManagerMetrics, dir string, cacheSize int64, pendingPages int, config *Config, uploader func(key, path string, force bool) bool) *cacheStore {
    if config.CacheMode == 0 {
        config.CacheMode = 0600 // only owner can read/write cache
    }
    if config.FreeSpace == 0.0 {
        config.FreeSpace = 0.1 // 10%
    }
    c := &cacheStore{
        m:            m,
        dir:          dir,
        mode:         config.CacheMode,
        capacity:     cacheSize,
        freeRatio:    config.FreeSpace,
        eviction:     config.CacheEviction,
        checksum:     config.CacheChecksum,
        hashPrefix:   config.HashPrefix,
        scanInterval: config.CacheScanInterval,
        keys:         make(map[cacheKey]cacheItem),
        pending:      make(chan pendingFile, pendingPages),
        pages:        make(map[string]*Page),
        uploader:     uploader,
    }
    c.createDir(c.dir)
    br, fr := c.curFreeRatio()
    if br < c.freeRatio || fr < c.freeRatio {
        logger.Warnf("not enough space (%d%%) or inodes (%d%%) for caching in %s: free ratio should be >= %d%%", int(br*100), int(fr*100), c.dir, int(c.freeRatio*100))
    }
    logger.Infof("Disk cache (%s): capacity (%d MB), free ratio (%d%%), max pending pages (%d)", c.dir, c.capacity>>20, int(c.freeRatio*100), pendingPages)
    go c.flush()            // keep writing pending pages to disk
    go c.checkFreeSpace()   // periodically check whether free space is sufficient
    go c.refreshCacheKeys() // scan existing cache blocks and add them to the keys map
    go c.scanStaging()      // walk the staging directory and submit staged blocks for upload
    return c
}
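For context, here is a hypothetical wiring example. Only parameters visible in the signature above are used; the cache directory, the 10 GiB capacity, the queue length of 32 and the stub uploader are illustrative values, and the "2-random" eviction string is an assumption about the non-"none" mode:

func newExampleStore(m *cacheManagerMetrics) *cacheStore {
    cfg := &Config{
        FreeSpace:     0.2,        // require 20% free space and inodes
        CacheEviction: "2-random", // assumption: the non-"none" eviction mode
        CacheChecksum: CsNone,     // skip checksums in this example
    }
    uploader := func(key, path string, force bool) bool { return true } // stub uploader
    return newCacheStore(m, "/var/jfsCache/demo", 10<<30, 32, cfg, uploader)
}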
// flush keeps writing pending pages to disk
func (cache *cacheStore) flush() {
    for {
        w := <-cache.pending
        path := cache.cachePath(w.key)
        // if the page is flushed successfully, add it to the keys map so later lookups can find it
        if cache.capacity > 0 && cache.flushPage(path, w.page.Data) == nil {
            cache.add(w.key, int32(len(w.page.Data)), uint32(time.Now().Unix()))
        }
        cache.Lock()
        _, ok := cache.pages[w.key]
        delete(cache.pages, w.key)
        atomic.AddInt64(&cache.totalPages, -int64(cap(w.page.Data)))
        cache.Unlock()
        w.page.Release()
        if !ok {
            cache.remove(w.key)
        }
    }
}
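cachePath, which maps an object key to a file path under the cache directory, is referenced here but not listed. A sketch of the idea follows; the "raw" subdirectory name and the FNV hash are assumptions for illustration, and the actual JuiceFS implementation may shard differently:

// requires "fmt", "hash/fnv" and "path/filepath"
func cachePathSketch(dir, key string, hashPrefix bool) string {
    if hashPrefix {
        // shard keys into 256 sub-directories to avoid one huge flat directory
        h := fnv.New32a()
        _, _ = h.Write([]byte(key))
        return filepath.Join(dir, "raw", fmt.Sprintf("%02X", h.Sum32()&0xFF), key)
    }
    return filepath.Join(dir, "raw", key)
}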
// flushPage writes both the cached data and its checksum into the cache file
func (cache *cacheStore) flushPage(path string, data []byte) (err error) {
    start := time.Now()
    cache.m.cacheWrites.Add(1)
    cache.m.cacheWriteBytes.Add(float64(len(data)))
    defer func() {
        cache.m.cacheWriteHist.Observe(time.Since(start).Seconds())
    }()
    cache.createDir(filepath.Dir(path))
    tmp := path + ".tmp"
    f, err := os.OpenFile(tmp, os.O_WRONLY|os.O_CREATE, cache.mode)
    if err != nil {
        logger.Warnf("Can't create cache file %s: %s", tmp, err)
        return err
    }
    defer func() {
        if err != nil {
            _ = os.Remove(tmp)
        }
    }()
    if _, err = f.Write(data); err != nil {
        logger.Warnf("Write to cache file %s failed: %s", tmp, err)
        _ = f.Close()
        return
    }
    if cache.checksum != CsNone {
        if _, err = f.Write(checksum(data)); err != nil {
            logger.Warnf("Write checksum to cache file %s failed: %s", tmp, err)
            _ = f.Close()
            return
        }
    }
    if err = f.Close(); err != nil {
        logger.Warnf("Close cache file %s failed: %s", tmp, err)
        return
    }
    if err = os.Rename(tmp, path); err != nil {
        logger.Warnf("Rename cache file %s -> %s failed: %s", tmp, path, err)
    }
    return
}
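Note the write-to-temp-file-then-rename pattern: readers never observe a partially written cache file. On the read side, the trailing checksum lets openCacheFile detect corruption. The sketch below shows the idea, assuming a single CRC32 appended after the whole block; the actual checksum layout used by JuiceFS may differ:

// requires "encoding/binary", "fmt", "hash/crc32" and "os"
func readAndVerifySketch(path string, dataLen int) ([]byte, error) {
    buf, err := os.ReadFile(path)
    if err != nil {
        return nil, err
    }
    if len(buf) < dataLen+4 {
        return nil, fmt.Errorf("cache file %s is truncated", path)
    }
    data, sum := buf[:dataLen], buf[dataLen:dataLen+4]
    if crc32.ChecksumIEEE(data) != binary.BigEndian.Uint32(sum) {
        return nil, fmt.Errorf("checksum mismatch for %s", path)
    }
    return data, nil
}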
Core interface function (1): writing a cached page from memory to disk.
func (cache *cacheStore) cache(key string, p *Page, force bool) {
    if cache.capacity == 0 {
        return
    }
    if cache.rawFull && cache.eviction == "none" {
        logger.Debugf("Caching directory is full (%s), drop %s (%d bytes)", cache.dir, key, len(p.Data))
        cache.m.cacheDrops.Add(1)
        return
    }
    cache.Lock()
    defer cache.Unlock()
    if _, ok := cache.pages[key]; ok {
        return
    }
    p.Acquire() // manually increase the reference count
    cache.pages[key] = p
    atomic.AddInt64(&cache.totalPages, int64(cap(p.Data)))
    select {
    case cache.pending <- pendingFile{key, p}: // hand the page to the flush goroutine
    default:
        // if force is set, wait until there is room in the queue
        if force {
            cache.Unlock()
            cache.pending <- pendingFile{key, p}
            cache.Lock()
        } else {
            // not enough bandwidth to write it to disk, so drop it
            logger.Debugf("Caching queue is full (%s), drop %s (%d bytes)", cache.dir, key, len(p.Data))
            cache.m.cacheDrops.Add(1)
            delete(cache.pages, key)
            atomic.AddInt64(&cache.totalPages, -int64(cap(p.Data)))
            p.Release() // manually decrease the reference count
        }
    }
}
Core interface function (2): loading a cached block from disk through a ReadCloser.
func (cache *cacheStore) load(key string) (ReadCloser, error) {
    cache.Lock()
    defer cache.Unlock()
    if p, ok := cache.pages[key]; ok {
        return NewPageReader(p), nil
    }
    // parse the key into a cacheKey so it can be looked up in the keys map
    k := cache.getCacheKey(key)
    if cache.scanned && cache.keys[k].atime == 0 {
        return nil, errors.New("not cached")
    }
    cache.Unlock()
    f, err := openCacheFile(cache.cachePath(key), parseObjOrigSize(key), cache.checksum)
    cache.Lock()
    if err == nil {
        if it, ok := cache.keys[k]; ok {
            // the cache file was found, so refresh its access time
            cache.keys[k] = cacheItem{it.size, uint32(time.Now().Unix())}
        }
    } else if it, ok := cache.keys[k]; ok {
        // the file is gone or unreadable; drop it from the index and the usage counter
        if it.size > 0 {
            cache.used -= int64(it.size + 4096)
        }
        delete(cache.keys, k)
    }
    return f, err
}
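getCacheKey and parseObjOrigSize are not listed above; they rely on the block key itself encoding the slice id, block index and block size, so no extra stat is needed. A sketch of the parsing, assuming a key layout like chunks/.../<id>_<indx>_<size> (an assumption about the key format, for illustration only):

// requires "fmt", "strconv" and "strings"
func parseCacheKeySketch(key string) (cacheKey, error) {
    base := key[strings.LastIndexByte(key, '/')+1:] // e.g. "123_0_4194304"
    parts := strings.Split(base, "_")
    if len(parts) != 3 {
        return cacheKey{}, fmt.Errorf("unexpected key format: %s", key)
    }
    id, err1 := strconv.ParseUint(parts[0], 10, 64)
    indx, err2 := strconv.ParseUint(parts[1], 10, 32)
    size, err3 := strconv.ParseUint(parts[2], 10, 32)
    if err1 != nil || err2 != nil || err3 != nil {
        return cacheKey{}, fmt.Errorf("unexpected key format: %s", key)
    }
    return cacheKey{id: id, indx: uint32(indx), size: uint32(size)}, nil
}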
(3) Cache space cleanup. Because disk IO is slow, triggering cleanup only at the moment a new cache file can no longer be added would be inefficient, so a periodic cleanup mechanism is used instead:
func (cache *cacheStore) checkFreeSpace() {
    for {
        br, fr := cache.curFreeRatio()
        cache.stageFull = br < cache.freeRatio/2 || fr < cache.freeRatio/2
        cache.rawFull = br < cache.freeRatio || fr < cache.freeRatio
        if cache.rawFull && cache.eviction != "none" {
            logger.Tracef("Cleanup cache when check free space (%s): free ratio (%d%%), space usage (%d%%), inodes usage (%d%%)", cache.dir, int(cache.freeRatio*100), int(br*100), int(fr*100))
            cache.Lock()
            cache.cleanup()
            cache.Unlock()
            br, fr = cache.curFreeRatio()
            cache.rawFull = br < cache.freeRatio || fr < cache.freeRatio
        }
        if cache.rawFull {
            cache.uploadStaging()
        }
        time.Sleep(time.Second)
    }
}
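curFreeRatio and getDiskUsage are used here but not listed; they report the fraction of free bytes and free inodes on the file system holding the cache directory. A minimal Linux-only sketch of the same idea using syscall.Statfs (the real helper lives elsewhere in the JuiceFS code and is portable):

// requires "syscall" (Linux)
func freeRatiosSketch(dir string) (bytesFree, inodesFree float32) {
    var st syscall.Statfs_t
    if err := syscall.Statfs(dir, &st); err != nil || st.Blocks == 0 || st.Files == 0 {
        return 0, 0
    }
    bytesFree = float32(st.Bavail) / float32(st.Blocks)
    inodesFree = float32(st.Ffree) / float32(st.Files)
    return
}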
func (cache *cacheStore) cleanup() {
    goal := cache.capacity * 95 / 100
    num := len(cache.keys) * 99 / 100
    // make sure we have enough free space after cleanup
    br, fr := cache.curFreeRatio()
    if br < cache.freeRatio {
        total, _, _, _ := getDiskUsage(cache.dir)
        toFree := int64(float32(total) * (cache.freeRatio - br))
        if toFree > cache.used {
            goal = 0
        } else if cache.used-toFree < goal {
            goal = cache.used - toFree
        }
    }
    if fr < cache.freeRatio {
        _, _, files, _ := getDiskUsage(cache.dir)
        toFree := int(float32(files) * (cache.freeRatio - fr))
        if toFree > len(cache.keys) {
            num = 0
        } else {
            num = len(cache.keys) - toFree
        }
    }

    var todel []cacheKey
    var freed int64
    var cnt int
    var lastK cacheKey
    var lastValue cacheItem
    var now = uint32(time.Now().Unix())
    // for each two random keys, then compare the access time, evict the older one
    for k, value := range cache.keys {
        if value.size < 0 {
            continue // staging
        }
        if cnt == 0 || lastValue.atime > value.atime {
            lastK = k
            lastValue = value
        }
        cnt++
        if cnt > 1 {
            delete(cache.keys, lastK)
            freed += int64(lastValue.size + 4096)
            cache.used -= int64(lastValue.size + 4096)
            todel = append(todel, lastK)
            logger.Debugf("remove %s from cache, age: %d", lastK, now-lastValue.atime)
            cache.m.cacheEvicts.Add(1)
            cnt = 0
            if len(cache.keys) < num && cache.used < goal {
                break
            }
        }
    }
    if len(todel) > 0 {
        logger.Debugf("cleanup cache (%s): %d blocks (%d MB), freed %d blocks (%d MB)", cache.dir, len(cache.keys), cache.used>>20, len(todel), freed>>20)
    }
    cache.Unlock()
    for _, k := range todel {
        _ = os.Remove(cache.cachePath(cache.getPathFromKey(k)))
    }
    cache.Lock()
}
3. Summary
Overall, the persistent cache in JuiceFS is implemented in a simple and clear way: every cached block corresponds to one file on disk, and space is reclaimed periodically so that usage stays below roughly 95% of the configured capacity and the free space/inode ratios stay above the configured threshold. Eviction iterates over the keys hash map, compares pairs of randomly ordered entries, and deletes the block file with the older atime. Compared with a classic LRU, this strategy is simpler: it reclaims space periodically and in batches, leaving enough headroom, but because blocks are not evicted in strict atime order, its accuracy is somewhat weaker.