badgerdb 压缩合并

news2025/1/20 20:07:15

压缩合并原因

  •         badgerdb是lsm tree派系的数据库,put,delete接口都是通过追加写日志的方式来保存的,日志如果一直不清理,会导致读性能越来越差,占用的存储空间也越来越大,badgerdb为了解决这些问题,就有了日志合并,日志的合并的实现方式主要有参考rockdb和pebbledb这两个数据库。

压缩合并的协程启动

  •  加快压缩

        在open的时候,默认启动4个协程来进行压缩,启动4个协程进行压缩的目的是为了加快压缩,每个协程启动的时候还特意让协程启动的时间不一致,经可能的避开多个压缩写成同时操作同一层的同一个table。同一层的同一个table只能是一个压缩协程占用。levelsController.cstatus结构体会记录各层的table.

func (s *levelsController) startCompact(lc *z.Closer) {
    n := s.kv.opt.NumCompactors//默认是4
    lc.AddRunning(n - 1)
    for i := 0; i < n; i++ {
        go s.runCompactor(i, lc)//启动4个协程
    }
}
  • 优先压缩第0层

        其中编号为0的压缩协程会把L0层的的的优先级提高,编号为0的压缩协程不检查得分是否高于1,直接进行压缩。

	runOnce := func() bool {
		prios := s.pickCompactLevels()//选择压缩的sst
		if id == 0 {
			// Worker ID zero prefers to compact L0 always.
			prios = moveL0toFront(prios)
		}
		for _, p := range prios {
			if id == 0 && p.level == 0 {
				// Allow worker zero to run level 0, irrespective of its adjusted score.
			} else if p.adjusted < 1.0 {
				break
			}
			if run(p) {
				return true
			}
		}

		return false
	}

选择合并的层数

        第0层的得分的计算方式是第0层的table表的总数除以配置的第0层的大小;

        非0层的得分的计算方式是该层所有的table表的总大小除以目标大小;

	addPriority := func(level int, score float64) {
		pri := compactionPriority{
			level:    level,
			score:    score,
			adjusted: score,
			t:        t,
		}
		prios = append(prios, pri)
	}

	// Add L0 priority based on the number of tables.
	addPriority(0, float64(s.levels[0].numTables())/float64(s.kv.opt.NumLevelZeroTables))//opt.NumLevelZeroTables默认是5

	// All other levels use size to calculate priority.
	for i := 1; i < len(s.levels); i++ {
		// Don't consider those tables that are already being compacted right now.
		delSize := s.cstatus.delSize(i)

		l := s.levels[i]
		sz := l.getTotalSize() - delSize
		addPriority(i, float64(sz)/float64(t.targetSz[i]))//L1->L6层剩余的size
	}

        目标层的大小有一个比例系数LevelSizeMultiplier,默认是10,最后构成的目标大小如下图

 

 

非0层选择合并层的table表

        先按提交的时间把合并层的tbale表排序,提交时间早的优先被挑选到,挑选出来的table有最大和最小的key,在待合并的下一层找到和这个范围有交集的table表。在挑选的时候会放弃其他压缩协程正在处理的压缩table,重新进行选择

	// We pick tables, so we compact older tables first. This is similar to
	// kOldestLargestSeqFirst in RocksDB.
	s.sortByHeuristic(tables, cd)

	for _, t := range tables {
		cd.thisSize = t.Size()
		cd.thisRange = getKeyRange(t)
		// If we're already compacting this range, don't do anything.
		if s.cstatus.overlapsWith(cd.thisLevel.level, cd.thisRange) {
			continue
		}
		cd.top = []*table.Table{t}
		left, right := cd.nextLevel.overlappingTables(levelHandlerRLocked{}, cd.thisRange)

		cd.bot = make([]*table.Table, right-left)
		copy(cd.bot, cd.nextLevel.tables[left:right])

		if len(cd.bot) == 0 {
			cd.bot = []*table.Table{}
			cd.nextRange = cd.thisRange
			if !s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd) {
				continue
			}
			return true
		}
		cd.nextRange = getKeyRange(cd.bot...)

		if s.cstatus.overlapsWith(cd.nextLevel.level, cd.nextRange) {
			continue
		}
		if !s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd) {
			continue
		}
		return true
	}
	return false

每一次压缩过程,都是上层只有一个table表,下层至少有0个table表,最多可能是所有的table表

 


选择合并第0层table表

        第0层的table表和table之间的排列不是按照sst的范围来排的,只是按照写入的时间顺序来排,非0层table表之间按照sst的范围来排,非0层选择待压缩的table就直接从第0个开始,即写入时间最旧的,如果和下一个table表有交集就也需要加入,下一个table表的范围没有交集就退出。

        也就是会造成第0层待合入的table的个数会至少是一个,下一层也是至少是0个,最大是整层。

	top := cd.thisLevel.tables
	if len(top) == 0 {
		return false
	}

	var out []*table.Table
	if len(cd.dropPrefixes) > 0 {//正常业务这里就是0
		// Use all tables if drop prefix is set. We don't want to compact only a
		// sub-range. We want to compact all the tables.
		out = top

	} else {
		var kr keyRange
		// cd.top[0] is the oldest file. So we start from the oldest file first.
		for _, t := range top {
			dkr := getKeyRange(t)
			if kr.overlapsWith(dkr) {
				out = append(out, t)
				kr.extend(dkr)
			} else {
				break
			}
		}
	}
	cd.thisRange = getKeyRange(out...)//L0 层的tables[0]起到没有交集的tables[i]
	cd.top = out

	left, right := cd.nextLevel.overlappingTables(levelHandlerRLocked{}, cd.thisRange)//找出cd.thisRange.left,cd.thisRange.right在cd.nextLevel.tables中的索引
	cd.bot = make([]*table.Table, right-left)
	copy(cd.bot, cd.nextLevel.tables[left:right])

	if len(cd.bot) == 0 {//cd.thisRange范围在cd.nextLevel的一个Table中
		cd.nextRange = cd.thisRange
	} else {
		cd.nextRange = getKeyRange(cd.bot...)
	}

 合并过程

把一次压缩过程,拆成多个片段,方便起多个迭代器进行并发的合并。创建新的table,并top和bot的表做为一个迭代器打开,从迭代器里面读出所有的kv对,过滤掉过期的,再插入到新table里面去。

	newIterator := func() []y.Iterator {
		// Create iterators across all the tables involved first.
		var iters []y.Iterator
		switch {
		case lev == 0:
			iters = append(iters, iteratorsReversed(topTables, table.NOCACHE)...)
		case len(topTables) > 0:
			y.AssertTrue(len(topTables) == 1)
			iters = []y.Iterator{topTables[0].NewIterator(table.NOCACHE)}
		}
		// Next level has level>=1 and we can use ConcatIterator as key ranges do not overlap.
		return append(iters, table.NewConcatIterator(valid, table.NOCACHE))
	}
	addKeys := func(builder *table.Builder) {
		timeStart := time.Now()
		var numKeys, numSkips uint64
		var rangeCheck int
		var tableKr keyRange
		for ; it.Valid(); it.Next() {
			// See if we need to skip the prefix.
			if len(cd.dropPrefixes) > 0 && hasAnyPrefixes(it.Key(), cd.dropPrefixes) {
				numSkips++
				updateStats(it.Value())
				continue
			}

			// See if we need to skip this key.
			if len(skipKey) > 0 {
				if y.SameKey(it.Key(), skipKey) {
					numSkips++
					updateStats(it.Value())
					continue
				} else {
					skipKey = skipKey[:0]
				}
			}

			if !y.SameKey(it.Key(), lastKey) {
				firstKeyHasDiscardSet = false
				if len(kr.right) > 0 && y.CompareKeys(it.Key(), kr.right) >= 0 {
					break
				}
				if builder.ReachedCapacity() {
					// Only break if we are on a different key, and have reached capacity. We want
					// to ensure that all versions of the key are stored in the same sstable, and
					// not divided across multiple tables at the same level.
					break
				}
				lastKey = y.SafeCopy(lastKey, it.Key())
				numVersions = 0
				firstKeyHasDiscardSet = it.Value().Meta&BitDiscardEarlierVersions > 0

				if len(tableKr.left) == 0 {
					tableKr.left = y.SafeCopy(tableKr.left, it.Key())
				}
				tableKr.right = lastKey

				rangeCheck++
				if rangeCheck%5000 == 0 {
					// This table's range exceeds the allowed range overlap with the level after
					// next. So, we stop writing to this table. If we don't do this, then we end up
					// doing very expensive compactions involving too many tables. To amortize the
					// cost of this check, we do it only every N keys.
					if exceedsAllowedOverlap(tableKr) {
						// s.kv.opt.Debugf("L%d -> L%d Breaking due to exceedsAllowedOverlap with
						// kr: %s\n", cd.thisLevel.level, cd.nextLevel.level, tableKr)
						break
					}
				}
			}

			vs := it.Value()
			version := y.ParseTs(it.Key())

			isExpired := isDeletedOrExpired(vs.Meta, vs.ExpiresAt)

			// Do not discard entries inserted by merge operator. These entries will be
			// discarded once they're merged
			if version <= discardTs && vs.Meta&bitMergeEntry == 0 {
				// Keep track of the number of versions encountered for this key. Only consider the
				// versions which are below the minReadTs, otherwise, we might end up discarding the
				// only valid version for a running transaction.
				numVersions++
				// Keep the current version and discard all the next versions if
				// - The `discardEarlierVersions` bit is set OR
				// - We've already processed `NumVersionsToKeep` number of versions
				// (including the current item being processed)
				lastValidVersion := vs.Meta&BitDiscardEarlierVersions > 0 ||
					numVersions == s.kv.opt.NumVersionsToKeep

				if isExpired || lastValidVersion {
					// If this version of the key is deleted or expired, skip all the rest of the
					// versions. Ensure that we're only removing versions below readTs.
					skipKey = y.SafeCopy(skipKey, it.Key())

					switch {
					// Add the key to the table only if it has not expired.
					// We don't want to add the deleted/expired keys.
					case !isExpired && lastValidVersion:
						// Add this key. We have set skipKey, so the following key versions
						// would be skipped.
					case hasOverlap:
						// If this key range has overlap with lower levels, then keep the deletion
						// marker with the latest version, discarding the rest. We have set skipKey,
						// so the following key versions would be skipped.
					default:
						// If no overlap, we can skip all the versions, by continuing here.
						numSkips++
						updateStats(vs)
						continue // Skip adding this key.
					}
				}
			}
			numKeys++
			var vp valuePointer
			if vs.Meta&bitValuePointer > 0 {
				vp.Decode(vs.Value)
			}
			switch {
			case firstKeyHasDiscardSet:
				// This key is same as the last key which had "DiscardEarlierVersions" set. The
				// the next compactions will drop this key if its ts >
				// discardTs (of the next compaction).
				builder.AddStaleKey(it.Key(), vs, vp.Len)
			case isExpired:
				// If the key is expired, the next compaction will drop it if
				// its ts > discardTs (of the next compaction).
				builder.AddStaleKey(it.Key(), vs, vp.Len)
			default:
				builder.Add(it.Key(), vs, vp.Len)
			}
		}

合并完成后

        合并后需要删除sst文件和创建新的sst,删除和创建需要原子性操作,要记录到mainfest文件里面。删除上层top的sst和下层bot的sst,再加上nextlevel层的sst 文件

        

func buildChangeSet(cd *compactDef, newTables []*table.Table) pb.ManifestChangeSet {
	changes := []*pb.ManifestChange{}
	for _, table := range newTables {
		changes = append(changes,
			newCreateChange(table.ID(), cd.nextLevel.level, table.KeyID(), table.CompressionType()))
	}
	for _, table := range cd.top {
		// Add a delete change only if the table is not in memory.
		if !table.IsInmemory {
			changes = append(changes, newDeleteChange(table.ID()))
		}
	}
	for _, table := range cd.bot {
		changes = append(changes, newDeleteChange(table.ID()))
	}
	return pb.ManifestChangeSet{Changes: changes}
}

 https://github.com/facebook/rocksdb/wiki/Leveled-Compaction

LSM Tree-Based存储引擎的compaction策略(feat. RocksDB)_LittleMagics的博客-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/925516.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深入解析:树结构及其应用

文章目录 学习树的基本概念理解树的遍历方式学习堆和优先队列的应用案例分析&#xff1a;使用堆进行Top K元素的查找结论 &#x1f389;欢迎来到数据结构学习专栏~深入解析&#xff1a;树结构及其应用 ☆* o(≧▽≦)o *☆嗨~我是IT陈寒&#x1f379;✨博客主页&#xff1a;IT陈…

提高生产力的强大开发工具

在当今快速发展的软件开发领域&#xff0c;提高生产效率和质量是每个开发团队追求的目标。JNPF&#xff08;Java Non-Enterprise Application Framework&#xff09;作为一种灵活且强大的开发工具&#xff0c;旨在帮助开发团队实现这一目标。本文将深入探讨JNPF如何提高生产力&…

非凸联合创始人李佐凡受邀出席复旦DSBA项目座谈会

8月17日&#xff0c;非凸科技联合创始人&CTO李佐凡受邀参加复旦管院数据科学与商业分析专业硕士&#xff08;DS&BA&#xff09;项目发展座谈会&#xff0c;与学校教授、老师在生源背景、课程教学、职业发展、学生培养和企业合作方面进行深入交流&#xff0c;旨在更好地…

【衍射光栅】用于Matlab的交互式衍射光栅模型研究

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

【C++STL基础入门】vector运算和遍历、排序、乱序算法

文章目录 前言一、vector运算符1.1 比较运算符vector有哪些比较运算符&#xff1f;示例代码注意 1.2 下标运算符 二、算法2.1 算法需要的头文件2.2 遍历算法2.3 排序算法从大到小从小到大 2.4 乱序算法 总结 前言 C标准库提供了丰富的容器和算法&#xff0c;其中vector是最常用…

基本概念【算术、 关系、逻辑、位、字符串、条件、优先级等运算符】(三)-全面详解(学习总结---从入门到深化)

文章目录 运算符(operator) 算术运算符 赋值及其扩展赋值运算符 关系运算符 逻辑运算符 位运算符 字符串连接符 条件运算符 运算符优先级的问题 数据类型的转换 自动类型转换 强制类型转换 Scanner 处理键盘输入 运算符(operator) 计算机的基本用途就是执行数学运…

16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)

Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…

每日一题 113路径总和||(递归)

题目 给你二叉树的根节点 root 和一个整数目标和 targetSum &#xff0c;找出所有 从根节点到叶子节点 路径总和等于给定目标和的路径。 叶子节点 是指没有子节点的节点。 示例 1&#xff1a; 输入&#xff1a;root [5,4,8,11,null,13,4,7,2,null,null,5,1], targetSum 22…

脱离束缚:数字化工厂中ARM控制器的革命性应用!

近年来&#xff0c;中国数字经济体系已进入高速增长阶段。制造业作为中国经济高质量发展的重要支撑力量&#xff0c;在面临生产成本不断上涨、关键装备和核心零部件“受制于人”等挑战时&#xff0c;建设数字化工厂已成必然。 数字化工厂数据采集出现的问题 在数字工厂的建设…

两个步骤让图片动起来!

在当今数字时代&#xff0c;动态图片已经成为了网页设计和移动应用设计的标配之一。动态图片能够吸引用户的注意力&#xff0c;提高用户体验和页面交互性。那么&#xff0c;图片怎么动起来&#xff1f;有什么好用的方法呢&#xff1f;下面我们来一起探讨一下。 通常我们认知的动…

【分布式技术专题】「OSS中间件系列」从0到1的介绍一下开源对象存储MinIO技术架构

MinIO背景介绍 MinIO创始者是Anand Babu Periasamy, Harshavardhana&#xff08;戒日王&#xff09;等人&#xff0c; Anand是GlusterFS的初始开发者、Gluster公司的创始人与CTO&#xff0c;Harshavardhana曾经是GlusterFS的开发人员&#xff0c;直到2011年红帽收购了Gluster公…

Web安全测试(三):SQL注入漏洞

一、前言 结合内部资料&#xff0c;与安全渗透部门同事合力整理的安全测试相关资料教程&#xff0c;全方位涵盖电商、支付、金融、网络、数据库等领域的安全测试&#xff0c;覆盖Web、APP、中间件、内外网、Linux、Windows多个平台。学完后一定能成为安全大佬&#xff01; 全部…

IDEA启动两个Tomcat服务的方式 使用nginx进行反向代理 JMeter测试分布式情况下synchronized锁失效

目录 引出IDEA启动Tomcat两个端口的方式1.编辑配置2.添加新的端口-Dserver.port80833.service里面管理4.启动后进行测试 使用nginx进行反向代理反向代理多个端口运行日志查看启动关闭重启 分布式情况下synchronized失效synchronized锁代码启动tomcat两个端口nginx反向代理JMete…

实验七 Linux 内核移植

【实验目的】 掌握 Linux 内核配置和编译的基本方法 【实验环境】 ubuntu 14.04 发行版FS4412 实验平台交叉编译工具&#xff1a;arm-none-linux-gnueabi- 【注意事项】 实验步骤中以“$”开头的命令表示在 ubuntu 环境下执行&#xff0c;以“#”开头的命令表 示在开发板下…

C++信息学奥赛1139:整理药名

#include <iostream> #include <string> using namespace std; int main() {int n;// 输入整数ncin>>n;cin.ignore();string arr[n];// 循环读取n行字符串for (int i 0; i<n ;i){getline(cin,arr[i]);}for (int i 0; i<n ;i){for(int j0;j<arr[i]…

【JSDocvscode】使用JSDoc、在vscode中开启node调试、使用vscode编写运行Python程序

JSDoc JSDoc是JavaScript的一种注释语法&#xff0c;同时通过JSDoc注释也可以规避js弱类型中不进行代码提示的问题 图形展示JSDoc的效果&#xff1a; 上述没有进行JSDoc&#xff0c;然后我们a点什么 是没有任何提示的 上述就是加上 JSDoc的效果 常用的 vscode 其实内置了 js…

IBM Spectrum LSF License Scheduler

LSF License Scheduler 提供了两个版本: Basic Edition 和 Standard Edition。 LSF License Scheduler Basic Edition 随附于 LSF Standard Edition 和 Advanced Edition &#xff0c;并非旨在应用有关如何在集群或项目之间共享许可证的策略。 相反&#xff0c; LSF License S…

[JavaWeb]【十四】web后端开发-MAVEN高级

目录 一、分模块设计与开发 1.1 分模块设计 1.2 分模块设计-实践​编辑 1.2.1 复制老项目改为spring-boot-management 1.2.2 新建maven模块runa-pojo 1.2.2.1 将原项目pojo复制到runa-pojo模块 1.2.2.2 runa-pojo引入新依赖 1.2.2.3 删除原项目pojo包 1.2.2.4 在spring-…

postman 调用webservice

有个外部接口需要提供古老的webservice 格式接口。 1 设置格式 按照xml 格式设置。 2 消息体xml 封装 不加envelope: <soap:Envelope xmlns:soap"" target"_blank">http://schemas.xmlsoap.org/soap/envelope/"> <soap:Body> <soap…