【MIT6.824】lab2C-persistence, lab2D-log compaction 实现笔记

news2025/1/18 20:54:46

引言

lab2C的实验要求如下

Complete the functions persist() and readPersist() in raft.go by adding code to save and restore persistent state. You will need to encode (or “serialize”) the state as an array of bytes in order to pass it to the Persister. Use the labgob encoder; see the comments in persist() and readPersist(). labgob is like Go’s gob encoder but prints error messages if you try to encode structures with lower-case field names. For now, pass nil as the second argument to persister.Save(). Insert calls to persist() at the points where your implementation changes persistent state. Once you’ve done this, and if the rest of your implementation is correct, you should pass all of the 2C tests.

lab2D的实验要求如下

Implement Snapshot() and the InstallSnapshot RPC, as well as the changes to Raft to support these (e.g, operation with a trimmed log). Your solution is complete when it passes the 2D tests (and all the previous Lab 2 tests).

总体而言, lab2C需要我们实现关键数据的持久化,lab2D需要我们通过快照实现日志的压缩。代码可以在https://github.com/slipegg/MIT6.824中得到。所有代码均通过了1千次的测试。

lab2C 实现

在实验时测试2C时,测试代码将会尝试将某些节点从网络中断开,然后一段时间后再依据这些断开的节点的持久化的信息重新生成一个新的节点并加入到网络中,测试代码将会检测加入这个节点后是否与预期相同。

在初始化节点的时候,会传入一个Persister对象,这个对象充当一个硬盘的角色,用于持久化数据,后续在测试重新生成节点时,就需要传入旧节点的Persister对象,以便新节点能够从硬盘中读取旧节点的数据进行复原。

参考raft论文,我们需要持久化的数据有:

  • currentTerm
  • votedFor
  • log entries

在raft.go中,我们需要实现persist和readPersist函数,用于持久化和读取数据。

// persist saves Raft's persistent state to stable storage,
func (rf *Raft) persist() {
	rf.persister.Save(rf.encodeState(), rf.persister.snapshot)
}

func (rf *Raft) encodeState() []byte {
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	e.Encode(rf.currentTerm)
	e.Encode(rf.votedFor)
	e.Encode(rf.logs)
	return w.Bytes()
}
// readPersist restores previously persisted state.
func (rf *Raft) readPersist(data []byte) {
	if data == nil || len(data) < 1 { // bootstrap without any state?
		return
	}
	var currentTerm int
	var votedFor int
	var logs []LogEntry
	r := bytes.NewBuffer(data)
	d := labgob.NewDecoder(r)
	if d.Decode(&currentTerm) != nil ||
		d.Decode(&votedFor) != nil ||
		d.Decode(&logs) != nil {
		Debug(dError, "S%v failed to read persist", rf.me)
	} else {
		Debug(dInfo, "S%v read persist successfully", rf.me)
		rf.currentTerm = currentTerm
		rf.votedFor = votedFor
		rf.logs = logs
		rf.lastApplied = rf.getFirstIndex()
		rf.commitIndex = rf.getFirstIndex()
	}
}

然后我们需要在每次修改了持久化数据的地方调用persist函数,然后在初始化节点时调用readPersist函数来读取持久化数据,整体难度不大。

lab2D 实现

在实验时测试2D时,测试代码在接收到apply的命令id为9结尾时,就会调用节点的Snapshot函数进行快照,将日志压缩。代码需要做到在压缩日志后,仍然能够准确地运行。

首先需要完成快照生成的函数,如下所示,每次会传入需要快照到的日志index,以及当这个节点为止的状态机的快照数据,系统保证传入的日志index一定是已经apply过的。由于已经将状态机的内容放入到了snapshot中,所以其实包括index在内的前面的所有日志都可以删除了,但是由于在同步日志信息时,需要上一个日志的term信息,所以我们会单独保留id为index的日志的id和term信息,放在logs的第一位。

func (rf *Raft) Snapshot(index int, snapshot []byte) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	if index <= rf.getFirstIndex() {
		Debug(dSnap, "S%v ignores the snapshot request with end index %v, because the index is not bigger than the first index %v", rf.me, index, rf.getFirstIndex())
		return
	}

	rf.logs = append([]LogEntry{{index, rf.logs[index-rf.getFirstIndex()].Term, nil}}, rf.logs[index-rf.getFirstIndex()+1:]...)
	rf.persister.Save(rf.encodeState(), snapshot)
	Debug(dSnap, "S%v applies the snapshot with end index %v, now the len(logs)=%v", rf.me, index, len(rf.logs))
}

由于快照的引入,现在logs中的第一个日志可能不再是0了,所以之前代码中所有从logs中依据日志index获取日志的代码都要修改为:rf.logs[index-rf.getFirstIndex()]

同时快照的引入还会导致在leader与follower进行日志同步时,需要的同步的日志可能已经没有了,所以这时候需要直接将整个日志发送给对方。

需要发送的快照请求如下:

func (rf *Raft) genInstallSnapshotRequest() *InstallSnapshotRequest {
	return &InstallSnapshotRequest{
		Term:             rf.currentTerm,
		LeaderId:         rf.me,
		LastIncludeIndex: rf.getFirstIndex(),
		LastIncludeTerm:  rf.logs[0].Term,
		Data:             rf.persister.ReadSnapshot(),
	}
}

follower接收到快照请求后,需要进行如下处理,主要就是检查这个快照有没有过期,是不是真的比自己当前commit的日志还要新,如果是的话,就将自己的日志全部删除,只保留快照中给的最后一个日志,作为logs中的第一个日志,然后再唤起applyCond进行快照的apply。

func (rf *Raft) InstallSnapshot(request *InstallSnapshotRequest, reply *InstallSnapshotReply) {
	rf.mu.Lock()
	Debug(dSnap, "S%v {term: %v, commitIndex: %v}, received from S%v with InstallSnapshotRequest {%v} ", rf.me, rf.currentTerm, rf.commitIndex, request.LeaderId, request)
	defer rf.mu.Unlock()

	reply.Term = rf.currentTerm
	if request.Term < rf.currentTerm {
		return
	}
	if request.Term > rf.currentTerm {
		rf.currentTerm = request.Term
		rf.votedFor = -1
		rf.persist()
	}
	rf.changeState(Follower)

	if request.LastIncludeIndex <= rf.commitIndex {
		return
	}

	rf.persister.Save(rf.encodeState(), request.Data)
	rf.commitIndex = request.LastIncludeIndex
	rf.logs = []LogEntry{{request.LastIncludeIndex, request.LastIncludeTerm, nil}} //2D遇到的bug所在
	Debug(dSnap, "S%v installs snapshot from S%v, now the commitIndex is %v", rf.me, request.LeaderId, rf.commitIndex)

	rf.waitApplySnapshotRequest = *request
	rf.applyCond.Signal()
}

如果leader接收到回复表示快照已经更新成功了,那么就更新这个节点的nextIndex和matchIndex。

func (rf *Raft) handleInstallSnapshotReply(peer int, request *InstallSnapshotRequest, reply *InstallSnapshotReply) {
	if reply.Term > rf.currentTerm {
		rf.changeState(Follower)
		rf.currentTerm = reply.Term
		rf.votedFor = -1
		rf.persist()
		Debug(dWarn, "S%v found higher term %v in InstallSnapshotReply %v from S%v, changes to follower", rf.me, reply.Term, reply, peer)
	} else {
		rf.nextIndex[peer] = request.LastIncludeIndex + 1
		rf.matchIndex[peer] = request.LastIncludeIndex
		Debug(dLog, "S%v has installed snapshot to S%v, now the S%v's nextIndex is %v", rf.me, peer, peer, rf.nextIndex[peer])
		rf.updateCommitIndexForLeader()
	}
}

注意为了能够有序地进行快照的apply,对原本的applier函数进行了修改,同时增加了waitApplySnapshotRequest来记录最新需要apply的快照请求。

其主要思想是每次唤起applyCond时,先检查是否有新的快照请求,即waitApplySnapshotRequest的Term是否为-1,如果不为-1,那么就进行快照的apply,快照apply了之后再把waitApplySnapshotRequest的Term设置为-1。如果没有新的快照请求,那么就进行日志的apply。

func (rf *Raft) applier() {
	for !rf.killed() {
		rf.mu.Lock()
		for rf.lastApplied >= rf.commitIndex {
			rf.applyCond.Wait()
		}

		if rf.waitApplySnapshotRequest.Term != -1 {
			if rf.lastApplied < rf.waitApplySnapshotRequest.LastIncludeIndex {
				rf.mu.Unlock()

				rf.applyCh <- ApplyMsg{ //Question: two applyCh update way, how to update orderly?
					SnapshotValid: true,
					Snapshot:      rf.waitApplySnapshotRequest.Data,
					SnapshotTerm:  rf.waitApplySnapshotRequest.LastIncludeTerm,
					SnapshotIndex: rf.waitApplySnapshotRequest.LastIncludeIndex,
				}

				rf.mu.Lock()
				rf.lastApplied = rf.waitApplySnapshotRequest.LastIncludeIndex
				Debug(dSnap, "S%v applies snapshot from S%v, now the lastApplied is %v", rf.me, rf.waitApplySnapshotRequest.LeaderId, rf.lastApplied)

			}
			rf.waitApplySnapshotRequest = InstallSnapshotRequest{Term: -1}
			rf.mu.Unlock()
		} else {
			commitIndex, lastApplied := rf.commitIndex, rf.lastApplied
			if rf.getFirstIndex() != 0 && lastApplied+1-rf.getFirstIndex() <= 0 {
				Debug(dWarn, "S%v has no log to apply, because lastApplied %v < firstIndex %v", rf.me, lastApplied, rf.getFirstIndex())
				rf.mu.Unlock()
				continue
			}
			entries := make([]LogEntry, commitIndex-lastApplied)
			Debug(dInfo, "S%v pre to apply log entries. LastApplied: %v, FirstIndex: %v, commitIndex: %v)",
				rf.me, lastApplied, rf.getFirstIndex(), commitIndex)
			copy(entries, rf.logs[lastApplied+1-rf.getFirstIndex():commitIndex+1-rf.getFirstIndex()])
			rf.mu.Unlock()

			for _, entry := range entries {
				rf.applyCh <- ApplyMsg{
					CommandValid: true,
					Command:      entry.Command,
					CommandIndex: entry.Index,
					CommandTerm:  entry.Term,
				}
			}

			rf.mu.Lock()
			Debug(dInfo, "S%v finishes applying log entries(startId: %v, length: %v), now rf.lastApplied = %v",
				rf.me, lastApplied+1, len(entries), rf.lastApplied)
			rf.lastApplied = commitIndex
			rf.mu.Unlock()
		}
	}
}

问题记录

当时写的时候也感觉不是特别复杂,但是后面测试的时候发现这里还是有很多需要注意的点,容易导致错误。快照的引入导致的一个重要的问题是我们现在有两种方式来更新状态机的数据,一种是通过日志的apply,一种是通过快照的apply。

一开始的写法是在接收到快照请求进行InstallSnapshot的处理的时候新起了一个go协程来直接对快照进行apply,但是这会导致一系列的问题。

一开始我们对这两者的并发做什么限制,那么这就有可能出现下面这种情况:

  1. follower节点接受到快照同步请求,并且开启一个协程开始进行快照的apply
  2. 在快照的apply之前,follower节点接收到下一个日志的同步的请求,开始进行日志的apply

这两个apply的顺序其实是不确定的,很有可能就会出现先进行日志的apply,然后再进行快照的apply,这样就会导致状态机的数据不一致,所以需要控制在快照进行apply的时候,不允许进行日志的apply。

然后我采用的方法是控制节点的lastApplied值,即在开启协程进行快照的apply前将lastApplied值设置为-1,然后在快照的apply结束后再将lastApplied设置为快照的index值,然后在日志进行apply的时候,对lastApplied进行判断,如果lastApplied值为-1,那么就进行锁等待,直到lastApplied值不为-1,然后再进行日志的apply。但是这种方法在测试的时候会发现,进行1000次测试大约会有0~3次的可能出现错误,错误的原因是在进行日志的apply的时候,需要apply的日志已经在logs中没有了,导致了取值的错误,也就是并发控制没有成功,在进行了快照的apply后,日志的apply依旧在进行。

经过debug发现这是由于出现了如下这种情况:

  1. followe节点接收到日志同步的请求,开启一个协程进行日志的apply
  2. leader节点已经进行了快照,然后由于超时又给该follower节点发送了日志同步的请求
  3. follower节点接收到快照同步的请求,设置lastApplied为-1,然后开启一个协程进行快照的apply
  4. follower节点结束了日志的apply,将lastApplied设置为日志的index,然后follower节点继续检查,发现lastApplied不为-1,且lastApplied小于commitIndex,所以继续进行日志的apply,然后在logs中取日志时发现该日志已经没有了,导致错误。

所以通过lastApplied进行并发控制并不可行,最后采用的方法是添加了snapApplyCount变量,每次在进行快照的apply时,将snapApplyCount加1,快照的apply结束后将snapApplyCount减1,然后在进行日志的apply时,如果snapApplyCount不为0,那么就进入锁等待。

注意在完成快照的apply后,有可能节点已经接收到了leader同步来的其他日志,所以需要在结束后检查是否有新的日志需要apply,如果需要就唤起日志的apply。最后处理快照同步请求的代码如上述的InstallSnapshot所示,日志apply的代码如下:

func (rf *Raft) applier() {
	for !rf.killed() {
		rf.mu.Lock()
		for rf.snapApplyCount != 0 || rf.lastApplied >= rf.commitIndex {
			rf.applyCond.Wait()
		}

		commitIndex, lastApplied := rf.commitIndex, rf.lastApplied
		if rf.getFirstIndex() != 0 && lastApplied+1-rf.getFirstIndex() <= 0 {
			rf.mu.Unlock()
			continue
		}
		entries := make([]LogEntry, commitIndex-lastApplied)
		Debug(dInfo, "S%v pre to apply log entries. LastApplied: %v, FirstIndex: %v, commitIndex: %v)",
			rf.me, lastApplied, rf.getFirstIndex(), commitIndex)
		copy(entries, rf.logs[lastApplied+1-rf.getFirstIndex():commitIndex+1-rf.getFirstIndex()])
		rf.mu.Unlock()

		for _, entry := range entries {
			rf.applyCh <- ApplyMsg{
				CommandValid: true,
				Command:      entry.Command,
				CommandIndex: entry.Index,
				CommandTerm:  entry.Term,
			}
		}

		rf.mu.Lock()
		Debug(dInfo, "S%v finishes applying log entries(startId: %v, length: %v), now rf.lastApplied = %v",
			rf.me, lastApplied+1, len(entries), rf.lastApplied)
		rf.lastApplied = commitIndex
		rf.mu.Unlock()
	}
}

但是上述方法后面经过测试发现也还是有少量的bug,bug的主要原因在于如下这种情况:

  1. follower节点接收到最后日志为x的快照同步请求,开启一个协程进行快照的apply
  2. follower节点又接收到最后日志为x+10的快照同步请求,开启一个协程进行快照的apply
  3. follower先完成了x+10的快照的apply,然后才完成了x的快照的apply,但是这时候它会将lastApplied设置为x,同时apply的顺序也出现了错误。

纵观上面的问题的一大根源在于我们出现了多个apply的协程,而没有对协程进行很好的并发控制,所以最后采取了上述的发型,将所有的apply都放在一个协程中进行,优先进行快照的apply,进测试可以准确地通过。

实验结果

最终对lab2中所有的测试进行了1000次的测试,全部通过。

请添加图片描述

总结

整个lab2中感觉难度最大的还是lab2B,因为需要实现的功能比较多,需要多多参考raft论文中的论文,最为印象深刻的就是lab2D中的并发问题了,这种问题确实在一开始实现的时候比较难想到,需要通过实验发现,而这种1000次测试才出现一两次错误的问题就更加难发现了,需要有全面的日志记录和多次重复实验的系统才行,后面有机会也分享一下有关日志记录和重复实验相关的内容。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1608268.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

《游戏系统设计十二》灵活且简单的条件检查系统

目录 1、序言 2、需求 3、实现 3.1 思路 3.2 代码实现 4、总结 1、序言 每个游戏都有一些检查性的任务&#xff0c;在做一些判断的时候&#xff0c;判断等级是不是满足需求。 比如如下场景&#xff1a;在进入副本的时候需要检查玩家等级是否满足&#xff0c;满足之后才…

配置linux的oracle 21c启停服务

一、配置启停 1、使用root用户登陆 su - root 2、修改oratab文件 修改oratab文件&#xff0c;将红框里面的N改为“Y”&#xff0c;使启停脚本能够生效 vi /etc/oratab 3、验证 配置好后就能够使用 dbshut 停止服务 和 dbstart 启动服务 了 2.1启动服务 su - oracle dbstart…

现代商业中首席人工智能官(CAIO)的角色与影响

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

Spark01

Spark01 一. Spark概述二. Spark环境部署 - Local三. Spark环境部署 - Standalone1. Standalone集群概述2. Standalone环境部署3. 测试环境 四. Spark环境部署 - Standalone-HA1. 安装部署Zookeeper1. 下载2. zookeeper安装3. 配置StandAlone-HA集群 五. Spark On YARN -- 重点…

深入挖掘C语言 ---- 文件操作

目录 1. 文件的打开和关闭1.1 流和标准流1.1.1流1.1.2标准流 1.2 文件指针1.3 文件的打开和关闭 2. 顺序读写3. 随机读写3.1 fseek3.2 ftell3.3 rewind 4. 读取结束判定 正文开始 1. 文件的打开和关闭 1.1 流和标准流 1.1.1流 我们程序的数据需要输出到各种外部设备, 也需要…

小白也能看懂的BEV感知技术(二)

1. 引言 在自动驾驶的领域中&#xff0c;BEV&#xff08;Birds Eye View&#xff0c;鸟瞰图&#xff09;感知技术扮演着至关重要的角色。它允许自动驾驶车辆从上帝视角“看到”周围的环境&#xff0c;就像一只鸟从空中俯瞰地面一样。这项技术对于理解车辆周围的复杂场景至关重…

【Linux系统】地址空间 Linux内核进程调度队列

1.进程的地址空间 1.1 直接写代码&#xff0c;看现象 1 #include<stdio.h>2 #include<unistd.h>3 4 int g_val 100;5 6 int main()7 {8 int cnt 0;9 pid_t id fork();10 if(id 0)11 {12 while(1)13 {14 printf(&…

javaagent使用

Java Agent是什么&#xff1f; Java Agent是Java平台提供的一个强大工具&#xff0c;它可以在运行时修改或增强Java应用程序的行为。是在JDK1.5以后引入的&#xff0c;它能够在不影响正常编译的情况下修改字节码&#xff0c;相当于是在main方法执行之前的拦截器&#xff0c;也叫…

Python | Leetcode Python题解之第32题最长有效括号

题目&#xff1a; 题解&#xff1a; class Solution:def longestValidParentheses(self, s: str) -> int:stack[]maxL0nlen(s)tmp[0]*n #标记数组cur0for i in range(n):if s[i](:stack.append(i)else:if stack:jstack.pop()if s[j](:tmp[i],tmp[j]1,1 #匹配成…

【C++】:C++关键字,命名空间,输入输出,缺省参数

目录 一&#xff0c;C关键字(C98)二&#xff0c;命名空间2.1 命名冲突2.2 关键字namespace2.2.1 命名空间中可以定义变量/函数/类型2.2.2 命名空间可以嵌套2.2.3 同一个工程中允许存在多个相同名称的命名空间,编译器最后会合成同一个命名空间中。 2.3 命名空间的使用2.3.1 指定…

多模态AnyGPT——整合图像、语音和文本多模态大规模语言模型算法原理与实践

概述 大规模语言模型在理解和生成人类语言方面具有非凡的能力&#xff0c;但迄今为止&#xff0c;它们的能力主要局限于文本处理。然而&#xff0c;现实世界是一个多模式的环境&#xff0c;信息通过视觉、听觉和触觉等多种感官进行交换。融入这种多样性是开发下一代系统的主要…

桥接模式【结构型模式C++】

1.概述 桥接模式是一种结构型设计模式&#xff0c;是用于把抽象化与实现化解耦&#xff0c;使得二者可以独立变化。这种类型的设计模式属于结构型模式&#xff0c;它通过提供抽象化和实现化之间的桥接结构&#xff0c;来实现二者的解耦。 这种模式涉及到一个作为桥接的接口&am…

高斯溅射融合之路(一)- webgl渲染3d gaussian splatting

大家好&#xff0c;我是山海鲸的技术负责人。之前已经写了一个GIS融合系列。其实CesiumJS的整合有相当的难度&#xff0c;同时也有很多方面的工作&#xff0c;很难在几篇文章内写完&#xff0c;整个山海鲸团队也是投入了接近两年的时间&#xff0c;才把周边整套工具链进行了完善…

算法复杂度分析笔记

基本定义间的关系 算法介绍 算法分析 时间复杂度 用数量级刻画&#xff1a;忽略所有低次幂项和系数 eg1: eg2: eg3: eg4: 小结 空间复杂度 eg: 总结

Vue3从入门到实践:深度了解新组件

1.Teleport 概念&#xff1a;Teleport&#xff08;传送门&#xff09;是一个新的特性&#xff0c;用于在DOM中的任意位置渲染组件。它允许你将组件的内容渲染到DOM中的另一个位置&#xff0c;而不受组件层次结构的限制。 下面举出例子解释&#xff1a; 1.新建App.vue文件作…

YOLOv9改进策略 | Neck篇 | 2024.1最新MFDS-DETR的HS-FPN改进特征融合层(轻量化Neck、全网独家首发)

一、本文介绍 本文给大家带来的改进机制是最近这几天最新发布的改进机制MFDS-DETR提出的一种HS-FPN结构&#xff0c;其是一种为白细胞检测设计的网络结构&#xff0c;主要用于解决白细胞数据集中的多尺度挑战。它的基本原理包括两个关键部分&#xff1a;特征选择模块和特征融合…

vue快速入门(三十一)vscod开发vue需要下载的插件

步骤很详细&#xff0c;直接上教程 上一篇 暂时就这两样足矣&#xff0c;有新的以后再更新&#xff08;别下载太多&#xff0c;可能会冲突&#xff09; 测试一下&#xff1a; 提示功能&#xff1a; 代码补全功能&#xff1a;

基于弹簧鞘复合纱和迁移学习算法的可穿戴人体重构和智能试衣系统

研究背景 在信息时代和元宇宙的背景下&#xff0c;虚拟服装设计对满足服装行业的个性化需求至关重要。与传统方法不同&#xff0c;虚拟试衣节省时间、方便客户&#xff0c;并提供多样化的款式。准确得测量人体围度并重构出人体的模型是虚拟试衣的关键。为了实现动态人体重构&a…

第Y7周:训练自己的数据集

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 | 接辅导、项目定制&#x1f680; 文章来源&#xff1a;K同学的学习圈子 目录 一、下载YOLOv8 二、配置环境 三、准备工作 四、运行 出现报错&#xff1a;…

《QT实用小工具·三十一》基于QT开发的访客管理平台demo2

1、概述 源码放在文章末尾 该项目为访客管理平台demo&#xff0c;包含主界面、系统设置、警情查询、调试帮助、用户退出功能。 项目部分代码如下&#xff1a; #pragma execution_character_set("utf-8")#include "frmmain.h" #include "ui_frmmain…