JuiceFS__持久化缓存源码走读

news2024/11/17 4:55:17

JuiceFS__持久化缓存源码走读

JuiceFS 是一款高性能 POSIX 文件系统,针对云原生环境特别优化设计,在 Apache 2.0 开源协议下发布。使用 JuiceFS 存储数据,数据本身会被持久化在对象存储(例如 Amazon S3),而数据所对应的元数据可以根据场景需求被持久化在 Redis、MySQL、TiKV 等多种数据库引擎中。

本文主要对JuiceFS持久化缓存disk_cache的实现源码进行分析,

1.JuiceFS IO流程

在这里插入图片描述
上图是JuiceFS官方绘制的Read操作流程图,从上图可以看到:在进行读取操作时,应用程序首先会读取FUSE的page cache,如果没有命中则fuse 请求到达用户态后会先访问用户态维护的mem cache,也就是这里的ReadBuffer,如果仍然没有命中,则继续访问持久化缓存,也就是Block Cache Index,从本地读取收据,如果仍然没有找到对应数据缓存,才触发对象存储的读取。

2.代码实现

持久化缓存的代码实现位于juicefs-main\pkg\chunk\disk_cache.go下

2.1核心数据结构

//用于缓存块查找
type cacheKey struct {
	id   uint64
	indx uint32
	size uint32
}
//缓存块管理结构
type cacheItem struct {
	size  int32
	atime uint32 //通过比较atime可以用于淘汰,避免自己管理LRU结构
}
//待刷盘的缓存管理结构
type pendingFile struct {
	key  string
	page *Page
}
//持久化缓存管理结构,每个缓存目录对应一个,可配置多个缓存目录
type cacheStore struct {
	totalPages int64
	sync.Mutex
	dir          string
	mode         os.FileMode
	capacity     int64 
	freeRatio    float32
	hashPrefix   bool
	scanInterval time.Duration
	pending      chan pendingFile
	pages        map[string]*Page
	m            *cacheManagerMetrics

	used      int64
	keys      map[cacheKey]cacheItem
	scanned   bool
	stageFull bool
	rawFull   bool
	eviction  string
	checksum  string // checksum level
	uploader  func(key, path string, force bool) bool
}
//缓存管理器接口,memStore和cacheStore都需要实现该接口,可以根据配置决定
type CacheManager interface {
	cache(key string, p *Page, force bool)
	remove(key string)
	load(key string) (ReadCloser, error)
	uploaded(key string, size int)
	stage(key string, data []byte, keepCache bool) (string, error)
	removeStage(key string) error
	stagePath(key string) string
	stats() (int64, int64)
	usedMemory() int64
}

下面从第一个重要的函数看起:创建cacheStore:

func newCacheStore(m *cacheManagerMetrics, dir string, cacheSize int64, pendingPages int, config *Config, uploader func(key, path string, force bool) bool) *cacheStore {
	if config.CacheMode == 0 {
		config.CacheMode = 0600 // only owner can read/write cache
	}
	if config.FreeSpace == 0.0 {
		config.FreeSpace = 0.1 // 10%
	}
	c := &cacheStore{
		m:            m,
		dir:          dir,
		mode:         config.CacheMode,
		capacity:     cacheSize,
		freeRatio:    config.FreeSpace,
		eviction:     config.CacheEviction,
		checksum:     config.CacheChecksum,
		hashPrefix:   config.HashPrefix,
		scanInterval: config.CacheScanInterval,
		keys:         make(map[cacheKey]cacheItem),
		pending:      make(chan pendingFile, pendingPages),
		pages:        make(map[string]*Page),
		uploader:     uploader,
	}
	c.createDir(c.dir)
	br, fr := c.curFreeRatio()
	if br < c.freeRatio || fr < c.freeRatio {
		logger.Warnf("not enough space (%d%%) or inodes (%d%%) for caching in %s: free ratio should be >= %d%%", int(br*100), int(fr*100), c.dir, int(c.freeRatio*100))
	}
	logger.Infof("Disk cache (%s): capacity (%d MB), free ratio (%d%%), max pending pages (%d)", c.dir, c.capacity>>20, int(c.freeRatio*100), pendingPages)
	go c.flush()//持续将缓存刷到盘中
	go c.checkFreeSpace()//检查空闲空间是否充足
	go c.refreshCacheKeys()//扫描存在的缓存块,并加入到keys中
	go c.scanStaging()//遍历缓存目录,提交写缓存
	return c
}
//刷盘函数
func (cache *cacheStore) flush() {
	for {
		w := <-cache.pending
		path := cache.cachePath(w.key)
		//如果刷盘成功则加入keys map,用于后续查找
		if cache.capacity > 0 && cache.flushPage(path, w.page.Data) == nil {
			cache.add(w.key, int32(len(w.page.Data)), uint32(time.Now().Unix()))
		}
		cache.Lock()
		_, ok := cache.pages[w.key]
		delete(cache.pages, w.key)
		atomic.AddInt64(&cache.totalPages, -int64(cap(w.page.Data)))
		cache.Unlock()
		w.page.Release()
		if !ok {
			cache.remove(w.key)
		}
	}
}
//这里可以看到,缓存文件会写入缓存数据和校验和
func (cache *cacheStore) flushPage(path string, data []byte) (err error) {
	start := time.Now()
	cache.m.cacheWrites.Add(1)
	cache.m.cacheWriteBytes.Add(float64(len(data)))
	defer func() {
		cache.m.cacheWriteHist.Observe(time.Since(start).Seconds())
	}()
	cache.createDir(filepath.Dir(path))
	tmp := path + ".tmp"
	f, err := os.OpenFile(tmp, os.O_WRONLY|os.O_CREATE, cache.mode)
	if err != nil {
		logger.Warnf("Can't create cache file %s: %s", tmp, err)
		return err
	}
	defer func() {
		if err != nil {
			_ = os.Remove(tmp)
		}
	}()

	if _, err = f.Write(data); err != nil {
		logger.Warnf("Write to cache file %s failed: %s", tmp, err)
		_ = f.Close()
		return
	}
	if cache.checksum != CsNone {
		if _, err = f.Write(checksum(data)); err != nil {
			logger.Warnf("Write checksum to cache file %s failed: %s", tmp, err)
			_ = f.Close()
			return
		}
	}
	if err = f.Close(); err != nil {
		logger.Warnf("Close cache file %s failed: %s", tmp, err)
		return
	}
	if err = os.Rename(tmp, path); err != nil {
		logger.Warnf("Rename cache file %s -> %s failed: %s", tmp, path, err)
	}
	return
}

核心接口函数(1):从内存将缓存写入磁盘

func (cache *cacheStore) cache(key string, p *Page, force bool) {
	if cache.capacity == 0 {
		return
	}
	if cache.rawFull && cache.eviction == "none" {
		logger.Debugf("Caching directory is full (%s), drop %s (%d bytes)", cache.dir, key, len(p.Data))
		cache.m.cacheDrops.Add(1)
		return
	}
	cache.Lock()
	defer cache.Unlock()
	if _, ok := cache.pages[key]; ok {
		return
	}
	p.Acquire()//手动引用计数+1
	cache.pages[key] = p
	atomic.AddInt64(&cache.totalPages, int64(cap(p.Data)))
	select {
	case cache.pending <- pendingFile{key, p}://将缓存刷新到磁盘
	default:
		//如果是强制刷盘策略,则等待
		if force {
			cache.Unlock()
			cache.pending <- pendingFile{key, p}
			cache.Lock()
		} else {
			// 没有足够带宽写入磁盘则抛弃
			logger.Debugf("Caching queue is full (%s), drop %s (%d bytes)", cache.dir, key, len(p.Data))
			cache.m.cacheDrops.Add(1)
			delete(cache.pages, key)
			atomic.AddInt64(&cache.totalPages, -int64(cap(p.Data)))
			p.Release()//手动引用计数-1
		}
	}
}

核心接口函数(2):通过ReadCloser加载磁盘缓存到内存

func (cache *cacheStore) load(key string) (ReadCloser, error) {
	cache.Lock()
	defer cache.Unlock()
	if p, ok := cache.pages[key]; ok {
		return NewPageReader(p), nil
	}
	//将key反序列化用于查找缓存
	k := cache.getCacheKey(key)
	if cache.scanned && cache.keys[k].atime == 0 {
		return nil, errors.New("not cached")
	}
	cache.Unlock()
	f, err := openCacheFile(cache.cachePath(key), parseObjOrigSize(key), cache.checksum)
	cache.Lock()
	if err == nil {
		if it, ok := cache.keys[k]; ok {
			// update atime
			//成功找到缓存文件就更新文件的访问时间
			cache.keys[k] = cacheItem{it.size, uint32(time.Now().Unix())}
		}
	} else if it, ok := cache.keys[k]; ok {
		if it.size > 0 {
			cache.used -= int64(it.size + 4096)
		}
		delete(cache.keys, k)
	}
	return f, err
}

(3)缓存空间清理,由于磁盘IO较慢,不能再添加缓存文件就执行清理,这样效率较低,所以这里考虑采用定时清理的机制

func (cache *cacheStore) checkFreeSpace() {
	for {
		br, fr := cache.curFreeRatio()
		cache.stageFull = br < cache.freeRatio/2 || fr < cache.freeRatio/2
		cache.rawFull = br < cache.freeRatio || fr < cache.freeRatio
		if cache.rawFull && cache.eviction != "none" {
			logger.Tracef("Cleanup cache when check free space (%s): free ratio (%d%%), space usage (%d%%), inodes usage (%d%%)", cache.dir, int(cache.freeRatio*100), int(br*100), int(fr*100))
			cache.Lock()
			cache.cleanup()
			cache.Unlock()
			br, fr = cache.curFreeRatio()
			cache.rawFull = br < cache.freeRatio || fr < cache.freeRatio
		}
		if cache.rawFull {
			cache.uploadStaging()
		}
		time.Sleep(time.Second)
	}
}

func (cache *cacheStore) cleanup() {
	goal := cache.capacity * 95 / 100
	num := len(cache.keys) * 99 / 100
	// make sure we have enough free space after cleanup
	br, fr := cache.curFreeRatio()
	if br < cache.freeRatio {
		total, _, _, _ := getDiskUsage(cache.dir)
		toFree := int64(float32(total) * (cache.freeRatio - br))
		if toFree > cache.used {
			goal = 0
		} else if cache.used-toFree < goal {
			goal = cache.used - toFree
		}
	}
	if fr < cache.freeRatio {
		_, _, files, _ := getDiskUsage(cache.dir)
		toFree := int(float32(files) * (cache.freeRatio - fr))
		if toFree > len(cache.keys) {
			num = 0
		} else {
			num = len(cache.keys) - toFree
		}
	}

	var todel []cacheKey
	var freed int64
	var cnt int
	var lastK cacheKey
	var lastValue cacheItem
	var now = uint32(time.Now().Unix())
	// for each two random keys, then compare the access time, evict the older one
	for k, value := range cache.keys {
		if value.size < 0 {
			continue // staging
		}
		if cnt == 0 || lastValue.atime > value.atime {
			lastK = k
			lastValue = value
		}
		cnt++
		if cnt > 1 {
			delete(cache.keys, lastK)
			freed += int64(lastValue.size + 4096)
			cache.used -= int64(lastValue.size + 4096)
			todel = append(todel, lastK)
			logger.Debugf("remove %s from cache, age: %d", lastK, now-lastValue.atime)
			cache.m.cacheEvicts.Add(1)
			cnt = 0
			if len(cache.keys) < num && cache.used < goal {
				break
			}
		}
	}
	if len(todel) > 0 {
		logger.Debugf("cleanup cache (%s): %d blocks (%d MB), freed %d blocks (%d MB)", cache.dir, len(cache.keys), cache.used>>20, len(todel), freed>>20)
	}
	cache.Unlock()
	for _, k := range todel {
		_ = os.Remove(cache.cachePath(cache.getPathFromKey(k)))
	}
	cache.Lock()
}

3.总结

总体来说,JuiceFS的持久化缓存实现方案比较简洁清楚,每个缓存块对应一个文件,通过定时清理空间,使可用空间保持在总容量5%以上,淘汰策略通过遍历keys哈希表来随机两两对比,清除atime较小的那个缓存块文件。该种淘汰策略相比经典的LRU策略更加简单,定时批量清理,留出足够的空间。这种方式清理缓存没有严格按照atime排序进行,可能在清理的准确性上稍弱。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/493173.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

java小记 2023-05-05

public class Test {/*** 谓类的方法就是指类中用static 修饰的方法&#xff08;非static 为实例方法&#xff09;&#xff0c;比如main 方法&#xff0c;那么可以以main* 方法为例&#xff0c;可直接调用其他类方法&#xff0c;必须通过实例调用实例方法&#xff0c;this 关键…

7.3 有源滤波电路(2)

四、开关电容滤波器 开关电容电路由受时钟脉冲信号控制的模拟开关、电容器和运算放大电路三部分组成。这种电路的特性与电容器的精度无关&#xff0c;而仅与各电容器电容量之比的准确性有关。在集成电路中&#xff0c;可以通过均匀地控制硅片上氧化层的介电常数及其厚度&#…

国产版ChatGPT大盘点

我们看到,最近,国内大厂开始密集发布类ChatGPT产品。 一方面,是因为这是最近10年最大的趋势和机会。 另一方面,国内的AI,不能别国外卡了脖子。 那在类ChatGPT赛道上,哪些中国版的ChatGPT能快速顶上?都各有哪些困境需要突破呢?本文给诸位带来各个玩家的最新进展。 *…

大数据Doris(十二):Unique数据模型

文章目录 Unique数据模型 一、读时合并 二、写时合并 Unique数据模型 在某些多维分析场景下,用户更关注的是如何保证 Key 的唯一性,即如何获得 Primary Key 唯一性约束。因此,我们引入了 Unique 数据模型,该模型可以根据相同的Primary Key 来保留后插入的数据,确保数据…

Day962.如何更好地重构和组织后端代码 -遗留系统现代化实战

如何更好地重构和组织后端代码 Hi&#xff0c;我是阿昌&#xff0c;今天学习记录是关于如何更好地重构和组织后端代码的内容。 如果说在气泡上下文中开发新的需求&#xff0c;类似于老城区旁边建设一个新城区&#xff0c;那么在遗留系统中开发新的需求&#xff0c;就类似于在…

c++的构造函数与析构函数

构造函数是一种特殊的成员函数&#xff0c;用于在对象创建时初始化对象的成员变量。它的名称与类名相同&#xff0c;没有返回类型&#xff0c;可以有参数。当创建对象时&#xff0c;构造函数会自动调用&#xff0c;以初始化对象的成员变量。如果没有定义构造函数&#xff0c;编…

华为OD机试真题-24点运算【2023】【JAVA】

一、题目描述 计算24点是一种扑克牌益智游戏&#xff0c;随机抽出4张扑克牌&#xff0c;通过加()&#xff0c;减(-)&#xff0c;乘(*), 除(/)四种运算法则计算得到整数24&#xff0c;本问题中&#xff0c;扑克牌通过如下字符或者字符串表示&#xff0c;其中&#xff0c;小写jo…

PCL1.12.0+Vtk7.1.1安装

1. qt4&#xff1a;Ubuntu 20.04 LTS 安装qt4 library_ubuntu20.04安装qt4 2.本文下载过程可参考1&#xff1a;ubuntu20.04下安装pcl_ubuntu安装pcl_Yuannau_jk的博客-CSDN博客 参考2&#xff1a;Ubuntu 20.04.05安装PCL-1.12.0_no package metslib found_zhiTjun的博客-CSDN…

解决 IDEA中的Tomcat服务器控制台乱码

解决 IDEA中的Tomcat服务器控制台乱码 问题描述&#xff1a;当我们使用idea编辑器部署web程序到tomcat服务器上&#xff0c;当我们运行tomcat的时候控制台出现服务器输出内容乱码的情况&#xff0c;这个问题可能是由于编码不一致引起的。在IDEA中&#xff0c;如果项目的编码方…

HttpServletRequest在Spring中的获取和注入 @Autowired注入Request

问题描述&#xff1a; 在最近一次团队review代码时&#xff0c;团队成员发现有将HttpServletRequest 直接通过Autowired注入的情况&#xff0c;于是大家产生了一个疑问&#xff0c;HttpServletRequest并非Spring中的类&#xff0c;且在没有手动通过Bean的方式注入&#xff0c;…

Oracle数据库、实例、用户、表空间、表之间的关系

数据库&#xff1a; Oracle数据库是数据的物理存储。这就包括&#xff08;数据文件ORA或者DBF、控制文件、联机日志、参数文件&#xff09;。其实Oracle数据库的概念和其它数据库不一样&#xff0c;这里的数据库是一个操作系统只有一个库。可以看作是Oracle就只有一个大数据库。…

Vue核心 绑定样式 条件渲染

1.11.绑定样式 class样式&#xff1a; 写法&#xff1a;:class“xxx”&#xff0c;xxx 可以是字符串、数组、对象:style“[a,b]” 其中a、b是样式对象**:style“{fontSize: xxx}”**其中 xxx 是动态值 字符串写法适用于&#xff1a;类名不确定&#xff0c;要动态获取数组写法…

HTB靶机07-Cronos-WP

cronos IP&#xff1a;10.10.10.13 scan ┌──(xavier㉿kali)-[~] └─$ sudo nmap -sSV -T4 10.10.10.13 Starting Nmap 7.93 ( https://nmap.org ) at 2023-04-06 23:19 CST Nmap scan report for 10.10.10.13 Host is up (0.23s latency). Not shown: 997 closed tcp por…

SpringCloud全面学习笔记之进阶篇

目录 前言微服务保护初识Sentinel雪崩问题及解决方案雪崩问题超时处理仓壁模式熔断降级流量控制总结 服务保护技术对比Sentinel介绍和安装微服务整合Sentinel 流量控制快速入门流控模式关联模式链路模式小结 流控效果warm up排队等待 热点参数限流全局参数限流热点参数限流案例…

算法记录 | Day52 动态规划

300.最长递增子序列 思路&#xff1a; 1.dp[i]的定义:以 nums[i] 结尾的最长递增子序列长度。 2.状态转移方程:位置i的最长升序子序列等于j从0到i-1各个位置的最长升序子序列 1 的最大值。 if (nums[i] > nums[j]) dp[i] max(dp[i], dp[j] 1); 注意这里不是要dp[i] …

基于AT89C52单片机的电子秒表设计与仿真

点击链接获取Keil源码与Project Backups仿真图&#xff1a; https://download.csdn.net/download/qq_64505944/87755619?spm1001.2014.3001.5503 源码获取 主要内容&#xff1a; 本设计以AT89C52单片机为核心&#xff0c;采用常用电子器件设计&#xff0c;包括电源开关、按键…

网络安全 等级保护 网络设备、安全设备知识点汇总

网络设备、安全设备知识点汇总 1、防火墙&#xff08;Firewall) 定义:相信大家都知道防火墙是干什么用的&#xff0c; 我觉得需要特别提醒一下&#xff0c;防火墙抵御的是外部的攻击&#xff0c;并不能对内部的病毒 ( 如ARP病毒 ) 或攻击没什么太大作用。 功能:防火墙的功能…

coturn中turnutils_peer和turnutils_uclient使用说明

coturn的作用有两个&#xff1a;寻找反射地址以及流转发&#xff0c;本人写过webrtc janus服务器部署在公网&#xff0c;coturn转发媒体流 coturn下面的工具turnutils_stunclient用于查找反射地址。 而turnutils_peer和turnutils_uclient用于测试转发功能&#xff0c;再次给以…

STL中priority_queue自定义类型使用和源码简单分析

priority_queue使用 这里说一下优先级队列的其他的用法,这里我们先看默认的究竟是建立大堆还是小堆? #include <iostream> #include <queue>int main() {int arr[] { 10, 2, 1, 3, 5, 4, 0 };std::priority_queue<int> q;for (size_t i 0; i < sizeo…

基于springboot的私人健身与教练预约管理系统

摘 要 随着信息技术和网络技术的飞速发展&#xff0c;人类已进入全新信息化时代&#xff0c;传统管理技术已无法高效&#xff0c;便捷地管理信息。为了迎合时代需求&#xff0c;优化管理效率&#xff0c;各种各样的管理系统应运而生&#xff0c;各行各业相继进入信息管理时代…