MIT6.824 2022 Raft

MIT6.824 2022 Raft

  • Raft
    • leader election
    • log
    • persistence
    • log compaction
    • 整体测试


leader election

if rf.state != Leader {
args := AppendEntriesArgs{Term: rf.currentTerm, LeaderId:}

flag := (rf.state == Leader)
args := AppendEntriesArgs{Term: rf.currentTerm, LeaderId:}
if !flag {

RequestVote RPC中 at least as up-to-date对应于:

Raft determines which of two logs is more up-to-date by comparing the index and term of the last entries in the logs. If the logs have last entries with different terms, then the log with the later term is more up-to-date. If the logs end with the same term, then whichever log is longer is more up-to-date.

由于是at least as,还包括相等的情况。

votedFor:candidateId that received vote in current term (or null if none)
当前任期接受到的选票的候选者 ID(初值为 null)

If election timeout elapses without receiving AppendEntries RPC from current leader or granting vote to candidate: convert to candidate
如果选举定时器超时时,没有收到 leader 的的追加日志请求 或者没有投票给候选者,该机器转化为候选者。
注意不要忽略granting vote to candidate这一条
MIT 6.824 Spring 2020 Lab2 Raft 实现笔记


2B阶段实现起来没用多久,但一直在调试。发现vscode自带的调试功能挺好用的(go test插件?)
currentTerm: latest term server has seen (initialized to 0 on first boot, increases monotonically)
If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower
votedFor:candidateId that received vote in current term (or null if none)
log[]:log entries; each entry contains command for state machine, and term when entry was received by leader (first index is 1)
日志记录,起始索引为1,可以在初始化时加入term为0的假日志,便于程序统一处理(lastLogTerm prevLogTerm)
AppendEntries RPC
rule 5:index of last new entry而不是本地日志最大索引值,prevLogIndex+len(entries)
RequestVote RPC

	condition1 := (rf.votedFor == InvalidId) || (rf.votedFor == args.CandidateId)
	condition2 := (args.LastLogTerm > lastLogTerm) || ((args.LastLogTerm == lastLogTerm) && (args.LastLogIndex >= lastLogIndex))

问题原因:写2A的时候没有对lastLogIndex lastLogTerm赋值,RequestVote在对比时一直通不过。

问题表现:测试刚显示通过,然后程序就runtime error,单独运行某个测试样例又无法复现

在明显的问题解决完后后,运行一次go test -run 2B -race显示全部通过,但执行脚本运行100次有时候却出现了几次错误。这种一定几率出现的问题就比较麻烦了。

for i in {1..100}; do go test -run 2B -race; done
Test (2B): concurrent Start()s ...
--- FAIL: TestConcurrentStarts2B (31.27s)
    config.go:549: only 1 decided for index 6; wanted 3
Test (2B): RPC counts aren't too high ...
--- FAIL: TestCount2B (32.34s)
    config.go:549: only 2 decided for index 11; wanted 3


Students’ Guide to Raft
Raft Q&A
Debugging by Pretty Printing
Instructors’ Guide to Raft
发现我遇到的一些问题都在Students’ Guide to Raft提到了,不过我也对这些点记忆深刻了。
Students’ Guide to Raft记录
1 论文中的表述大多时候是must而不是should,例如不是说每当server接收RPC调用时都应该重置选举定时器,而是在receiving AppendEntries RPC from current leader or granting vote to candidate时候才重置选举定时器。
2 心跳信息不应该被视作特殊的一种消息,当follower接收了该心跳消息,则隐式地表明当前日志已于该server匹配,而后进行错误的提交。
3 当接收心跳消息后,简单地切断server日志prevLogIndex的部分,这同样是不对的,论文中的表述为If an existing entry conflicts with a new one (same index but different terms), delete the existing entry and all that follow it,以上为一个条件语句。
4 当选举定时器过期后,即使上一任选举尚未完成,也应该进行下次的选举。
5 reply false意味着立即返回
6 AppendEntries处理过程中即使prevLogIndex对应位置条目不存在,同样视作日志不匹配
7 检查AppendEntries函数,即使entries为空也同样能执行成功,同样考虑本地日志越界的情况
8 认真理解last new entry与at least as up-to-date的含义
9 注意更新commitIndex时log[N].term == currentTerm
10 nextIndex是乐观估计,matchIndex是悲观估计,即使在很多时候赋值都为nextIndex = matchIndex + 1
11 在接收的旧的RPC回复时,比较当前term与arg中term,如果不一样,则直接返回,不进行处理


1 对于心跳消息可以直接复用Start调用的协商函数,心跳消息同样可以用来同步各server的日志
2 在判断RPC是否成功之后,reply处理之前比对当前Term与参数Term,如果不一致则直接返回
3 避免matchIndex的后退

rf.matchIndex[server] = Max(rf.matchIndex[server], args.PrevLogIndex+len(args.Entries))
rf.nextIndex[server] = rf.matchIndex[server] + 1

4 我认为论文中重试不是立即重试,而是等到定时器过期时再进行重试。
选举时RPC失败,则选举定时器过期,进行下一次选举时重试RequestVote RPC
协商时RPC失败,则心跳定时器过期,同步日志时重试AppendEntries RPC

在执行1000次go test时,以上的两个问题都没再出现,而是出现data race的警告

Test (2B): leader backs up quickly over incorrect follower logs ...
Write at 0x00c0005b0988 by goroutine 277:
      /usr/lib/go-1.13/src/runtime/slice.go:197 +0x0
      /root/mit6.824/6.824/src/raft/raft.go:291 +0x663
      /usr/lib/go-1.13/src/runtime/asm_amd64.s:539 +0x3a
      /usr/lib/go-1.13/src/reflect/value.go:321 +0xd3
      /root/mit6.824/6.824/src/labrpc/labrpc.go:496 +0x811
      /root/mit6.824/6.824/src/labrpc/labrpc.go:420 +0x607
      /root/mit6.824/6.824/src/labrpc/labrpc.go:240 +0x93

Previous read at 0x00c0005b0988 by goroutine 323:
      /usr/lib/go-1.13/src/reflect/value.go:976 +0x1df
      /usr/lib/go-1.13/src/encoding/gob/encode.go:328 +0x436
      /usr/lib/go-1.13/src/encoding/gob/encode.go:581 +0xf0
      /usr/lib/go-1.13/src/encoding/gob/encode.go:351 +0x26f
      /usr/lib/go-1.13/src/encoding/gob/encode.go:551 +0x1a3
      /usr/lib/go-1.13/src/encoding/gob/encode.go:328 +0x436
      /usr/lib/go-1.13/src/encoding/gob/encode.go:701 +0x1fe
      /usr/lib/go-1.13/src/encoding/gob/encoder.go:251 +0x666
      /usr/lib/go-1.13/src/encoding/gob/encoder.go:176 +0x5b
      /root/mit6.824/6.824/src/labgob/labgob.go:36 +0x7b
      /root/mit6.824/6.824/src/labrpc/labrpc.go:93 +0x198
      /root/mit6.824/6.824/src/raft/raft.go:336 +0xd5
      /root/mit6.824/6.824/src/raft/raft.go:388 +0x562

Goroutine 277 (running) created at:
      /root/mit6.824/6.824/src/labrpc/labrpc.go:239 +0x174

Goroutine 323 (running) created at:
      /root/mit6.824/6.824/src/raft/raft.go:376 +0xa3
--- FAIL: TestBackup2B (16.85s)
    testing.go:853: race detected during execution of test


// 错误的
args.Entries = rf.log[rf.nextIndex[server]:]
// 正确的
args.Entries = append(args.Entries, rf.log[rf.nextIndex[server]:]...)



2C编写的代码比较简单,只需要照着persist/readPersist的example实现对应函数,然后在3个持久状态改变后调用persist函数。AppendEntries RPC的next index优化参照guidance中提供的方法

The accelerated log backtracking optimization is very underspecified, probably because the authors do not see it as being necessary for most deployments. It is not clear from the text exactly how the conflicting index and term sent back from the client should be used by the leader to determine what nextIndex to use. We believe the protocol the authors probably want you to follow is:
If a follower does not have prevLogIndex in its log, it should return with conflictIndex = len(log) and conflictTerm = None.
If a follower does have prevLogIndex in its log, but the term does not match, it should return conflictTerm = log[prevLogIndex].Term, and then search its log for the first index whose entry has term equal to conflictTerm.
Upon receiving a conflict response, the leader should first search its log for conflictTerm. If it finds an entry in its log with that term, it should set nextIndex to be the one beyond the index of the last entry in that term in its log.
If it does not find an entry with that term, it should set nextIndex = conflictIndex.

the one beyond the index不怎么能理解,就直接设置成最后一个等于该term的索引。next index只影响效率,不怎么影响正确性。如果2B没啥问题,2C也应该没啥问题,如果2C有问题,就应该运行几千次2B测试,验证一下。


Test (2C): basic persistence ...
  ... Passed --   4.1  3  216   46172    6
Test (2C): more persistence ...
2022/10/30 06:44:20 next index retry. cur term:5 me:3 server:1 value:2   reply:term:-1  index:2
2022/10/30 06:44:20 next index retry. cur term:5 me:3 server:2 value:2   reply:term:-1  index:2
2022/10/30 06:44:22 next index retry. cur term:9 me:1 server:4 value:5   reply:term:-1  index:5
2022/10/30 06:44:22 next index retry. cur term:9 me:1 server:0 value:5   reply:term:-1  index:5
2022/10/30 06:44:24 next index retry. cur term:13 me:4 server:3 value:8   reply:term:-1  index:8
2022/10/30 06:44:24 next index retry. cur term:13 me:4 server:2 value:8   reply:term:-1  index:8
2022/10/30 06:44:29 next index retry. cur term:16 me:2 server:1 value:11   reply:term:-1  index:11
2022/10/30 06:44:29 next index retry. cur term:16 me:2 server:0 value:11   reply:term:-1  index:11
2022/10/30 06:44:31 next index retry. cur term:20 me:0 server:4 value:14   reply:term:-1  index:14
2022/10/30 06:44:31 next index retry. cur term:20 me:0 server:3 value:14   reply:term:-1  index:14
  ... Passed --  16.0  5  982  207944   16
Test (2C): partitioned leader and one follower crash, leader restarts ...
2022/10/30 06:44:35 next index retry. cur term:3 me:2 server:1 value:2   reply:term:-1  index:2
  ... Passed --   1.9  3   51   12120    4
Test (2C): Figure 8 ...
2022/10/30 06:44:37 next index retry. cur term:4 me:0 server:1 value:3   reply:term:-1  index:3
2022/10/30 06:44:41 next index retry. cur term:11 me:2 server:1 value:9   reply:term:-1  index:9
2022/10/30 06:44:41 next index retry. cur term:12 me:1 server:4 value:4   reply:term:-1  index:4
2022/10/30 06:44:41 next index retry. cur term:12 me:1 server:3 value:5   reply:term:-1  index:5
2022/10/30 06:44:43 next index retry. cur term:13 me:3 server:2 value:9   reply:term:8  index:9
2022/10/30 06:44:43 next index retry. cur term:14 me:4 server:1 value:10   reply:term:-1  index:10
2022/10/30 06:44:44 next index retry. cur term:15 me:2 server:3 value:11   reply:term:-1  index:11
2022/10/30 06:44:45 next index retry. cur term:16 me:3 server:0 value:10   reply:term:-1  index:10
2022/10/30 06:44:45 next index retry. cur term:16 me:3 server:0 value:9   reply:term:8  index:9
2022/10/30 06:44:46 next index retry. cur term:17 me:0 server:2 value:13   reply:term:-1  index:13
2022/10/30 06:44:47 next index retry. cur term:19 me:1 server:4 value:12   reply:term:-1  index:12
2022/10/30 06:44:48 next index retry. cur term:20 me:2 server:3 value:14   reply:term:-1  index:14
2022/10/30 06:44:49 next index retry. cur term:22 me:4 server:0 value:16   reply:term:-1  index:16
2022/10/30 06:44:50 next index retry. cur term:24 me:3 server:2 value:19   reply:term:-1  index:19
2022/10/30 06:44:52 next index retry. cur term:27 me:3 server:0 value:23   reply:term:-1  index:23
2022/10/30 06:44:52 next index retry. cur term:27 me:3 server:2 value:23   reply:term:-1  index:23
2022/10/30 06:44:53 next index retry. cur term:29 me:3 server:2 value:25   reply:term:-1  index:25
2022/10/30 06:44:53 next index retry. cur term:29 me:3 server:0 value:25   reply:term:-1  index:25
2022/10/30 06:44:54 next index retry. cur term:30 me:0 server:1 value:17   reply:term:-1  index:17
2022/10/30 06:44:54 next index retry. cur term:31 me:2 server:4 value:21   reply:term:-1  index:21
2022/10/30 06:44:55 next index retry. cur term:34 me:4 server:2 value:29   reply:term:-1  index:29
2022/10/30 06:44:55 next index retry. cur term:34 me:4 server:1 value:29   reply:term:-1  index:29
2022/10/30 06:44:57 next index retry. cur term:36 me:2 server:0 value:28   reply:term:-1  index:28
2022/10/30 06:44:58 next index retry. cur term:37 me:1 server:3 value:27   reply:term:-1  index:27
2022/10/30 06:44:58 next index retry. cur term:39 me:3 server:0 value:33   reply:term:-1  index:33
2022/10/30 06:44:58 next index retry. cur term:39 me:3 server:4 value:32   reply:term:-1  index:32
2022/10/30 06:44:59 next index retry. cur term:40 me:4 server:2 value:33   reply:term:-1  index:33
2022/10/30 06:45:00 next index retry. cur term:41 me:2 server:1 value:34   reply:term:-1  index:34
2022/10/30 06:45:01 next index retry. cur term:42 me:0 server:3 value:35   reply:term:-1  index:35
2022/10/30 06:45:02 next index retry. cur term:43 me:1 server:4 value:35   reply:term:40  index:35
2022/10/30 06:45:02 next index retry. cur term:43 me:1 server:3 value:35   reply:term:-1  index:35
race: limit on 8128 simultaneously alive goroutines is exceeded, dying

协程数目超出限制,参照Kill函数的注释,在每个定时任务中加入 !rf.killed()判断,以便在测试完成后退出协程。

// the tester doesn't halt goroutines created by Raft after each test,
// but it does call the Kill() method. your code can use killed() to
// check whether Kill() has been called. the use of atomic avoids the
// need for a lock.
// the issue is that long-running goroutines use memory and may chew
// up CPU time, perhaps causing later tests to fail and generating
// confusing debug output. any goroutine with a long-running loop
// should call killed() to check whether it should stop.
func (rf *Raft) Kill() {
	atomic.StoreInt32(&rf.dead, 1)
	// Your code here, if desired.
// 向各节点发送心跳消息
func (rf *Raft) heartBeatTask() {
	for !rf.killed() {
		flag := (rf.state == Leader)
		if !flag {
		go rf.agreementTask()
// 向server提交命令
func (rf *Raft) Submitter() {
	for !rf.killed() {
		if rf.lastApplied < rf.commitIndex {
			msg := ApplyMsg{CommandValid: true, Command: rf.log[rf.lastApplied].Command, CommandIndex: rf.lastApplied}
			rf.applyCh <- msg
		time.Sleep(10 * time.Millisecond)

log compaction


rf.log[index]    ->  rf.log[index-X]
len(rf.log)      ->  X+len(rf.log)
len(rf.log) - 1  ->  X+len(rf.log)-1


  1. 日志永远不能为空
  2. follower日志lastIncludeIndex/commitIndex之前的term默认与leader的term匹配
  3. 在AppendEntries函数中ConflictIndex应大于lastIncludedIndex
  4. 在AppendEntriesRPC处理重传时,find same term的范围应该大于lastIncludedIndex
  5. 在同一个协程中apply快照与日志

第一条很容易理解,无论如何,日志都应该存在一条lastIncludeIndex日志,故在InstallSnapshot的step7中虽然写的是discard the entire log,也要加上一条快照对应的lastIncludeIndex日志,Snapshot函数中,少删除一条日志,保证0位为lastIncludeIndex日志。

第二条 leader拥有所有已提交的日志,提交日志相同索引的日志内容一致,所以没必要检查term(可用于AppendEntries中step2 prelog 匹配与step3 different term)

Leader Completeness: if a log entry is committed in a given term, then that entry will be present in the logs of the leaders for all higher-numbered terms.

第三第四条实际上描述的事情差不多,日志0位的term最好只在RequestVote RPC相关的LastLogTerm被用到,preLogTerm可根据第二条规则默认匹配,nextIndex相关的操作不访问日志0位。


Previously, this lab recommended that you implement a function called CondInstallSnapshot to avoid the requirement that snapshots and log entries sent on applyCh are coordinated. This vestigal API interface remains, but you are discouraged from implementing it: instead, we suggest that you simply have it return true.

func snapcommon(t *testing.T, name string, disconnect bool, reliable bool, crash bool) {
	iters := 30
	servers := 3
	cfg := make_config(t, servers, !reliable, true)
	defer cfg.cleanup()

	cfg.begin(name), servers, true)
	leader1 := cfg.checkOneLeader()

	for i := 0; i < iters; i++ {
		victim := (leader1 + 1) % servers
		sender := leader1
		if i%3 == 1 {
			sender = (leader1 + 1) % servers
			victim = leader1
		DPrintf("%v time,vicitm is %v\n", i, victim)
		if disconnect {
			cfg.disconnect(victim), servers-1, true)
		if crash {
			DPrintf("%v crash", victim), servers-1, true)

		// perhaps send enough to get a snapshot
		nn := (SnapShotInterval / 2) + (rand.Int() % SnapShotInterval)
		for i := 0; i < nn; i++ {

		// let applier threads catch up with the Start()'s
		if disconnect == false && crash == false {
			// make sure all followers have caught up, so that
			// an InstallSnapshot RPC isn't required for
			// TestSnapshotBasic2D()., servers, true)
		} else {, servers-1, true)

		if cfg.LogSize() >= MAXLOGSIZE {
			cfg.t.Fatalf("Log size too large")
		if disconnect {
			// reconnect a follower, who maybe behind and
			// needs to rceive a snapshot to catch up.
			cfg.connect(victim), servers, true)
			leader1 = cfg.checkOneLeader()
		if crash {
			cfg.start1(victim, cfg.applierSnap)
			DPrintf("%v restart\n", victim)
			cfg.connect(victim), servers, true)
			leader1 = cfg.checkOneLeader()

在test_test.go添加打印后发现,vicitm server一直卡在crash和restart中间阶段,检查代码发现卡死在Make函数中发送快照至service的操作。故修改实现,接收到 InstallSnapshot RPC调用或重启时只修改commitIndex,不提交至service,只在一个协程中修改lastApplied。

一: 快照编码错误
在刚开始实现的时候,我将lastIncludeIndex和lastIncludeTerm持久化到Persister的snapshot成员中,一到crash测试就显示snapshot decode error。

Test (2D): install snapshots (crash) ...
2022/11/19 05:26:22 0 become leader
2022/11/19 05:26:24 2 become leader
2022/11/19 05:26:24 snapshot decode error
exit status 1


// 生成快照
if (m.CommandIndex+1)%SnapShotInterval == 0 {
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	var xlog []interface{}
	for j := 0; j <= m.CommandIndex; j++ {
		xlog = append(xlog, cfg.logs[i][j])
	rf.Snapshot(m.CommandIndex, w.Bytes())
// 读取快照
if cfg.saved[i] != nil {
	cfg.saved[i] = cfg.saved[i].Copy()

	snapshot := cfg.saved[i].ReadSnapshot()
	if snapshot != nil && len(snapshot) > 0 {
		// mimic KV server and process snapshot now.
		// ideally Raft should send it up on applyCh...
		err := cfg.ingestSnap(i, snapshot, -1)
		if err != "" {

func (cfg *config) ingestSnap(i int, snapshot []byte, index int) string {
	if snapshot == nil {
		log.Fatalf("nil snapshot")
		return "nil snapshot"
	r := bytes.NewBuffer(snapshot)
	d := labgob.NewDecoder(r)
	var lastIncludedIndex int
	var xlog []interface{}
	if d.Decode(&lastIncludedIndex) != nil ||
		d.Decode(&xlog) != nil {
		log.Fatalf("snapshot decode error")
		return "snapshot Decode() error"
	if index != -1 && index != lastIncludedIndex {
		err := fmt.Sprintf("server %v snapshot doesn't match m.SnapshotIndex", i)
		return err
	cfg.logs[i] = map[int]interface{}{}
	for j := 0; j < len(xlog); j++ {
		cfg.logs[i][j] = xlog[j]
	cfg.lastApplied[i] = lastIncludedIndex
	return ""


二:InstallSnapshot RPC理解错误

If existing log entry has same index and term as snapshot’s last included entry, retain log entries following it and reply

对于InstallSnapshot RPC的step 6理解错误,理解成是否存在相同的快照,比对lastIncludeIndex与lastIncludeTerm去了。实际上直接检查相同index term是否一致即可,commit之前默认一致。

logIndex := args.LastIncludedIndex - rf.lastIncludedIndex
if logIndex < 0 {
	WPrintf("server snapshot is newer. server id:%v lastIncludedIndex:%v  log len:%v\n",, rf.lastIncludedIndex, len(rf.log))
if logIndex < len(rf.log) && rf.log[logIndex].Term == args.LastIncludedTerm { // step 5
	DPrintf("same log entry. server id:%v lastIncludedIndex:%v  log len:%v\n",, rf.lastIncludedIndex, len(rf.log))




1 垃圾回收

Raft must discard old log entries in a way that allows the Go garbage collector to free and re-use the memory; this requires that there be no reachable references (pointers) to the discarded log entries.


// rf.log = rf.log[index-rf.lastIncludedIndex:]
rf.log = append([]logEntry{}, rf.log[index-rf.lastIncludedIndex:]...)



If, when the server comes back up, it reads the updated snapshot, but the outdated log, it may end up applying some log entries that are already contained within the snapshot. This happens since the commitIndex and lastApplied are not persisted, and so Raft doesn’t know that those log entries have already been applied. The fix for this is to introduce a piece of persistent state to Raft that records what “real” index the first entry in Raft’s persisted log corresponds to. This can then be compared to the loaded snapshot’s lastIncludedIndex to determine what elements at the head of the log to discard.

Students’ Guide to Raft

func (rf *Raft) compareStateAndSnapshot() {
	if rf.snapshot == nil || len(rf.snapshot) < 1 {
	r := bytes.NewBuffer(rf.snapshot)
	d := labgob.NewDecoder(r)
	var lastIncludedIndex int
	if d.Decode(&lastIncludedIndex) != nil {
		log.Panicf("compareStateAndSnapshot: snapshot decode error")
	if rf.lastIncludedIndex != lastIncludedIndex {
		WPrintf("snapshot lastIncludedIndex is different. snapshot:%v  state:%v\n", lastIncludedIndex, rf.lastIncludedIndex)
		rf.log = append([]logEntry{}, rf.log[lastIncludedIndex-rf.lastIncludedIndex:]...)
		rf.lastIncludedIndex = lastIncludedIndex
		rf.lastIncludedTerm = rf.log[0].Term // 日志0位的term
		rf.commitIndex = rf.lastIncludedIndex


在2D测试跑通后跑全部的测试,其中几次在2D的各个测试出现了panic: test timed out after 10m0s问题,看了看打印的goroutine堆栈,发现许多goroutine都在获取锁,并且apply协程阻塞在通道发送那一步(在当时实现中此时该线程持有锁),觉得有可能是service没有接收ApplyMsg,导致server一直无法推进,故修改提交协程实现,在发送消息时不持有锁。

// 向service发送命令
func (rf *Raft) applyTask() {
	for !rf.killed() {
		var msg ApplyMsg
		sendMsg := false
		if rf.lastApplied < rf.commitIndex {
			sendMsg = true
			if rf.lastApplied < rf.lastIncludedIndex {
				msg = ApplyMsg{CommandValid: false, SnapshotValid: true, Snapshot: rf.snapshot, SnapshotIndex: rf.lastIncludedIndex, SnapshotTerm: rf.lastIncludedTerm}
				DPrintf("applySnapshot. server id:%v commitIndex:%v  lastApplied:%v  lastIncludedIndex:%v\n",, rf.commitIndex, rf.lastApplied, rf.lastIncludedIndex)
				rf.lastApplied = rf.lastIncludedIndex
			} else {
				msg = ApplyMsg{CommandValid: true, Command: rf.log[rf.lastApplied-rf.lastIncludedIndex].Command, CommandIndex: rf.lastApplied}
		if sendMsg {
			rf.applyCh <- msg
		time.Sleep(10 * time.Millisecond)

这个实验最麻烦的一点就在于永远不知道是否真正解决了某个问题,在2B阶段有时候测试到一千多次才出现错误,都有点无从调试,只能看看raft论文,Students’ Guide,自我完善一下代码,并时时检查一些关键变量,打印一些异常情况。


6.824 Lab 2: Raft
Students’ Guide to Raft
Raft Q&A
Raft Locking Advice
Raft Structure Advice
Debugging by Pretty Printing
Lab guidance


Rule 1: Whenever you have data that more than one goroutine uses, and at least one goroutine might modify the data, the goroutines should use locks to prevent simultaneous use of the data.

Rule 2: Whenever code makes a sequence of modifications to shared data, and other goroutines might malfunction if they looked at the data midway through the sequence, you should use a lock around the whole sequence.
Rule 3: Whenever code does a sequence of reads of shared data (orreads and writes), and would malfunction if another goroutine modified the data midway through the sequence, you should use a lock around the whole sequence.
Rule 4: It’s usually a bad idea to hold a lock while doing anything that might wait: reading a Go channel, sending on a channel, waiting for a timer, calling time.Sleep(), or sending an RPC (and waiting for the reply).
Rule 5: Be careful about assumptions across a drop and re-acquire of a lock. One place this can arise is when avoiding waiting with locks held.

commitIndex is volatile because Raft can figure out a correct value for it after a reboot using just the persistent state. Once a leader successfully gets a new log entry committed, it knows everything before that point is also committed. A follower that crashes and comes back up will be told about the right commitIndex whenever the current leader sends it an AppendEntries RPC.

lastApplied starts at zero after a reboot because the Figure 2 design assumes the service (e.g., a key/value database) doesn’t keep any persistent state. Thus its state needs to be completely recreated by replaying all log entries. If the service does keep persistent state, it is expected to persistently remember how far in the log it has executed, and to ignore entries before that point. Either way it’s safe to start with lastApplied = 0 after a reboot.



Instead, the best approach is usually to work backwards and narrow down the size of phase 2 until it is as small as possible, so that the location of the fault is readily apparent. This is done by expanding the instrumentation of your code to surface errors sooner, and thereby spend less time in phase 2. This generally involves adding additional debugging statements and/or assertions to your code.

When possible, consider writing your code to “fail loudly”. Instead of trying to tolerate unexpected states, try to explicitly detect states that should never be allowed to happen, and immediately report these errors. Consider even immediately calling the Go ‘panic’ function in these cases to fail especially loudly. See also the Wikipedia page on Offensive programming techniques. Remember that the longer you allow errors to remain latent, the longer it will take to narrow down the true underlying fault.

When you’re failing a test, and it’s not obvious why, it’s usually worth taking the time to understand what the test is actually doing, and which part of the test is observing the problem. It can be helpful to add print statements to the test code so that you know when events are happening.

每个server commit index之前的日志都一模一样且无法更改

2D时为解决panic: test timed out after 10m0s问题,向service发送消息时不再持有锁

// 向service发送命令
func (rf *Raft) applyTask() {
	for !rf.killed() {
		var msg ApplyMsg
		sendMsg := false
		if rf.lastApplied < rf.commitIndex {
			sendMsg = true
			if rf.lastApplied < rf.lastIncludedIndex {
				msg = ApplyMsg{CommandValid: false, SnapshotValid: true, Snapshot: rf.snapshot, SnapshotIndex: rf.lastIncludedIndex, SnapshotTerm: rf.lastIncludedTerm}
				DPrintf("applySnapshot. server id:%v commitIndex:%v  lastApplied:%v  lastIncludedIndex:%v\n",, rf.commitIndex, rf.lastApplied, rf.lastIncludedIndex)
				rf.lastApplied = rf.lastIncludedIndex
			} else {
				msg = ApplyMsg{CommandValid: true, Command: rf.log[rf.lastApplied-rf.lastIncludedIndex].Command, CommandIndex: rf.lastApplied}
		if sendMsg {
			rf.applyCh <- msg
		time.Sleep(10 * time.Millisecond)


config.go:628: one(6739062427422661052) failed to reach agreement


// do a complete agreement.
// it might choose the wrong leader initially,
// and have to re-submit after giving up.
// entirely gives up after about 10 seconds.
// indirectly checks that the servers agree on the
// same value, since nCommitted() checks this,
// as do the threads that read from applyCh.
// returns index.
// if retry==true, may submit the command multiple
// times, in case a leader fails just after Start().
// if retry==false, calls Start() only once, in order
// to simplify the early Lab 2B tests.
func (cfg *config) one(cmd interface{}, expectedServers int, retry bool) int {
	t0 := time.Now()
	starts := 0
	for time.Since(t0).Seconds() < 10 && cfg.checkFinished() == false {
		// try all the servers, maybe one is the leader.
		index := -1
		for si := 0; si < cfg.n; si++ {
			starts = (starts + 1) % cfg.n
			var rf *Raft
			if cfg.connected[starts] {
				rf = cfg.rafts[starts]
			if rf != nil {
				index1, _, ok := rf.Start(cmd)
				if ok {
					index = index1
		if index != -1 {
			// somebody claimed to be the leader and to have
			// submitted our command; wait a while for agreement.
			t1 := time.Now()
			for time.Since(t1).Seconds() < 2 {
				nd, cmd1 := cfg.nCommitted(index)
				if nd > 0 && nd >= expectedServers {
					// committed
					if cmd1 == cmd {
						// and it was the command we submitted.
						return index
				time.Sleep(20 * time.Millisecond)
			if retry == false {
				cfg.t.Fatalf("one(%v) failed to reach agreement", cmd)
		} else {
			time.Sleep(50 * time.Millisecond)

	if cfg.checkFinished() == false {
		for si := 0; si < cfg.n; si++ {
			starts = (starts + 1) % cfg.n
			var rf *Raft
			if cfg.connected[starts] {
				rf = cfg.rafts[starts]
			if rf != nil {
				DPrintf("%+v", rf)
		log.Panicf("one(%v) failed to reach agreement", cmd)
		cfg.t.Fatalf("one(%v) failed to reach agreement", cmd)
	return -1
func internalChurn(t *testing.T, unreliable bool) {

	servers := 5
	cfg := make_config(t, servers, unreliable, false)
	defer cfg.cleanup()

	if unreliable {
		cfg.begin("Test (2C): unreliable churn")
	} else {
		cfg.begin("Test (2C): churn")

	stop := int32(0)

	// create concurrent clients
	cfn := func(me int, ch chan []int) {
		var ret []int
		ret = nil
		defer func() { ch <- ret }()
		values := []int{}
		for atomic.LoadInt32(&stop) == 0 {
			x := rand.Int()
			index := -1
			ok := false
			for i := 0; i < servers; i++ {
				// try them all, maybe one of them is a leader
				rf := cfg.rafts[i]
				if rf != nil {
					index1, _, ok1 := rf.Start(x)
					if ok1 {
						ok = ok1
						index = index1
			if ok {
				// maybe leader will commit our value, maybe not.
				// but don't wait forever.
				for _, to := range []int{10, 20, 50, 100, 200} {
					nd, cmd := cfg.nCommitted(index)
					if nd > 0 {
						if xx, ok := cmd.(int); ok {
							if xx == x {
								values = append(values, x)
						} else {
							cfg.t.Fatalf("wrong command type")
					time.Sleep(time.Duration(to) * time.Millisecond)
			} else {
				time.Sleep(time.Duration(79+me*17) * time.Millisecond)
		ret = values

	ncli := 3
	cha := []chan []int{}
	for i := 0; i < ncli; i++ {
		cha = append(cha, make(chan []int))
		go cfn(i, cha[i])

	for iters := 0; iters < 20; iters++ {
		if (rand.Int() % 1000) < 200 {
			i := rand.Int() % servers

		if (rand.Int() % 1000) < 500 {
			i := rand.Int() % servers
			if cfg.rafts[i] == nil {
				cfg.start1(i, cfg.applier)

		if (rand.Int() % 1000) < 200 {
			i := rand.Int() % servers
			if cfg.rafts[i] != nil {

		// Make crash/restart infrequent enough that the peers can often
		// keep up, but not so infrequent that everything has settled
		// down from one change to the next. Pick a value smaller than
		// the election timeout, but not hugely smaller.
		time.Sleep((RaftElectionTimeout * 7) / 10)

	for i := 0; i < servers; i++ {
		if cfg.rafts[i] == nil {
			cfg.start1(i, cfg.applier)

	atomic.StoreInt32(&stop, 1)

	values := []int{}
	for i := 0; i < ncli; i++ {
		vv := <-cha[i]
		if vv == nil {
			t.Fatal("client failed")
		values = append(values, vv...)


	lastIndex :=, servers, true)

在出错前打印的DPrintf(“%+v”, rf)语句显示信息中可以看到

commitIndex:1153 lastApplied:996 nextIndex:[] matchIndex:[] state:0 leaderId:2
commitIndex:1153 lastApplied:1153 nextIndex:[] matchIndex:[] state:0 leaderId:2
commitIndex:1153 lastApplied:1153 nextIndex:[] matchIndex:[] state:0 leaderId:2
commitIndex:1153 lastApplied:1153 nextIndex:[] matchIndex:[] state:0 leaderId:2
commitIndex:1153 lastApplied:1153 nextIndex:[1154 1154 1154 1154 1154] matchIndex:[1153 1153 1153 1153 1153] state:2 leaderId:2

实际上有一个server commitIndex已经变成了1153,只不过还没来得及发送至service,故简单修改一下applyTask,不再是发送一次就休眠,变成本次成功发送则再进行一次尝试。

// 向service发送命令
func (rf *Raft) applyTask() {
	for !rf.killed() {
		var msg ApplyMsg
		sendMsg := false
		needNextApply := false
		if rf.lastApplied < rf.commitIndex {
			sendMsg = true
			needNextApply = true
			if rf.lastApplied < rf.lastIncludedIndex {
				rf.lastApplied = rf.lastIncludedIndex
				msg = ApplyMsg{CommandValid: false, SnapshotValid: true, Snapshot: rf.snapshot, SnapshotIndex: rf.lastIncludedIndex, SnapshotTerm: rf.lastIncludedTerm}
				DPrintf("applySnapshot. server id:%v commitIndex:%v  lastApplied:%v  lastIncludedIndex:%v\n",, rf.commitIndex, rf.lastApplied, rf.lastIncludedIndex)
			} else {
				msg = ApplyMsg{CommandValid: true, Command: rf.log[rf.lastApplied-rf.lastIncludedIndex].Command, CommandIndex: rf.lastApplied}
		if sendMsg {
			rf.applyCh <- msg

		if !needNextApply {
			time.Sleep(10 * time.Millisecond)





