「实验记录」MIT 6.824 Raft Lab2B Log Replication

news2024/11/17 15:56:43

#Lab2B - Log Replication

  • I. Source
  • II. My Code
  • III. Motivation
  • IV. Solution
    • S1 - leader上任即初始化
    • S2 - leader发送AppendEntries
    • S3 - follower接收AppendEntries
    • S4 - leader收到AppendEntries 回信
    • S5 - candidate选举限制
    • S6 - defs.go约定俗成和实现Start()
  • V. Result

I. Source

  1. MIT-6.824 2020 课程官网
  2. Lab2: Raft 实验主页
  3. simviso 精品付费翻译 MIT 6.824 课程
  4. Paper - Raft extended version

II. My Code

  1. source code 的 Gitee 地址
  2. Lab2B: Log Replication 的 Gitee 地址

课程官网提供的 Lab 代码下载地址,我没有访问成功,于是我从 Github 其他用户那里 clone 到干净的源码,有需要可以访问我的 Gitee 获取

III. Motivation

提出 Raft 的主要目的,是为了解决容错问题,即使集群中有一些机器发生了故障,也不影响整体的运作(对外提供的服务)

我用一个 demo 来说明,假设我们的需求一直都是自己的 PC 能够顺利访问云端的资源(HTTP 或数据库)服务器。在服务器稳定在线的情况下,我们去访问它,一点问题都没有

但是,如果那唯一的一台服务器掉线了,那么我们将无法再访问,即对外的服务到此停止。这是我们无法忍受的,我们希望提供服务的一方能够保持稳定,时时刻刻为我提供访问服务。这就是我们的需求

好,现在问题摆在眼前,提供服务的一方怎样保证稳定性?让唯一的那台服务器永远维持稳定的状态,不允许宕机?这非常地不现实,就好比让一个人练成金刚不坏之身

所以,我们只能琢磨是否可以通过添加服务器的数量来确保对外服务的稳定。更近一步,即是现在服务器不再只有一台,扩充到 3 台,这 3 台中有一台是 primary 服务器,也主要由它对外提供服务;其他 2 台是 secondary 服务器(后备力量),拥有和 primary 服务器相同的数据内容

在 primary 服务器出现故障的时候,secondary 服务器顶上去,替代它的位置。这样就可以保持稳定的对外服务了

这就是我们应对资源服务器崩溃的最常用最有效的法子,但是想实现这个想法,首先要解决数据同步的问题,即如何确保 secondary 服务器拥有和 primary 服务器同样的内容?

这个同步问题,在学术上被称为共识算法,最经典的共识算法是 Paxos,但是它太难理解了。于是,斯坦福那帮人想出了更为简便的共识算法,即 Raft

通过 Raft 算法就可以同步集群中服务器的内容。要实现该算法,分三步走,5 - The Raft consensus algorithm 章节中的 Leader Election、Log Replication 和 Safety

本文主要针对第二步,Lab2B: Log Replication 展开讲解,如有 Lab2A: Leader Election 的需要,请移步

IV. Solution

在选举出 leader 之后,集群就可以正式开始对外提供服务了。不论什么类型的服务(数据库的 CRUD…),归根结底都可以变成读写操作 Write/Read。在系统软件级别都是用 log 来记录操作,将服务分解成一条条指令,然后交给自己的状态机执行。状态机之间只要确保 log 已相同顺序写入,那么就可以确保彼此的状态是相同的

log 的作用我可能一言两语概括不完,好多东西都需要自己悟的,可以多去看看 2 - Replicated state machines 中关于状态机和 log 的描述

好,回归正题。Lab2B: Log Replication 就是解决日志同步的问题,即如何确保集群中服务器拥有同样的数据

首先,让我们看一下 Raft 节点的工作流程

S1 - leader上任即初始化

在 Lab2A: Leader Election 中我们有详细讲解到 Raft 节点的 S1 - 角色转换 的规则,在本文中将不再赘述。当 candidate 获得过半选票后成为 leader,它第一件事就是要向集群中广播 AppendEntries 心跳包,即告诉所有人,它是 leader

在此之前还需要进行一些初始化的工作,尤其是 nextIdxs[]matchIdxs[] ,前者记录了下一次应该从日志的何处开始向第 i i i 位 follower 发送 AppendEntries ;而后者是 leader 自己的私密小抄本,用来记录第 i i i 位 follower 已经复制日志到何处了。且看一下 raft.go:run() 中新加的部分,

func (rf *Raft) run() {
	for !rf.killed() {
		switch rf.role {
		case Follower:
			select {
			case <-rf.grantVoteCh:
			case <-rf.heartBeatCh:
			case <-time.After(randElectionTimeOut()):
				rf.role = Candidate
			}
			break
		case Candidate:
			rf.mu.Lock()
			rf.curTerm++
			rf.votedFor = rf.me
			rf.voteCount = 1
			rf.mu.Unlock()

			go rf.boatcastRV()
			select {
			case <-time.After(randElectionTimeOut()):
			case id := <-rf.heartBeatCh:
				/* 一定是收到了来自集群中同期 OR 任期比它大的 leader 的心跳包 */
				rf.mu.Lock()
				rf.role = Follower
				rf.votedFor = id.int /* 被动回滚至 follower */
				rf.mu.Unlock()
			case <-rf.leaderCh:
				/* 赢得了选举 */
				rf.mu.Lock()
				rf.role = Leader
				/*------------Lab2B Log Replication----------------*/
				rf.nextIdxs = make([]int, len(rf.peers))
				rf.matchIdxs = make([]int, len(rf.peers))
				for i := range rf.peers {
					rf.nextIdxs[i] = rf.lastLogIdx() + 1
					rf.matchIdxs[i] = 0
				}
				rf.mu.Unlock()
			}
			break
		case Leader:
			rf.boatcastAE()
			time.Sleep(fixedHeartBeatTimeOut())
			break
		}
		time.Sleep(10 * time.Millisecond)
	}
}

第 32 行之后就是初始化的相关工作,nextIdxs[] 赋值为最后一条目的下一个位置,为什么是下一个位置呢?因为待会 client 会调用 Start() 向日志中追加一新条目;matchIdxs[] 就默认为 0 即可,因为之后的每次 AppendEntries 都会一并更新 nextIdxs[]matchIdxs[] ,而且 matchIdxs[] 更多涉及的是 commit 日志相关的流程,与发送 AppendEntries 关系不大

另外,第 47 行通过休眠 10 ms 来暂缓了 Raft 节点的事件循环,我一开始并没有意识到,后来看到了 Lab2: Raft 实验主页 的提示才添加了这句话,原话是这样的,

Your code may have loops that repeatedly check for certain events. Don’t have these loops execute continuously without pausing, since that will slow your implementation enough that it fails tests. Use Go’s condition variables, or insert a time.Sleep(10 * time.Millisecond) in each loop iteration.

我翻译一下,即是不要让事件循环一直连续执行,因为这会减慢实现速度,导致测试失败。使用 Go 的条件变量 OR 选择在每个循环迭代中休眠 10 ms

这里要吐槽一下,虽然 golang 的并发模型很好用,但是还是没有达到特别精准的程度,其中还是有很多坑的

Raft 节点也添加了一些字段,其中的 commitCh 用来感知自己是否应该提交日志条目了,appliedIdxcommitIdx 是一对,可以理解为日志的固定窗口,

//
// A Go object implementing a single Raft peer.
//
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Your data here (2A, 2B, 2C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.
	/*------------Lab2A Leader Election----------------*/
	role      Role /* 三个角色 */
	voteCount int  /* 选票数 */
	curTerm   int  /* 任期号 */
	votedFor  int  /* 投给谁了 */

	grantVoteCh chan struct{}      /* 是否收到了拉票请求 */
	leaderCh    chan struct{}      /* 是否赢得了选举 */
	heartBeatCh chan struct{ int } /* 感知 leader 发来的心跳包 */

	/*------------Lab2B Log Replication----------------*/
	log        []LogEntry    /* 日志序列 */
	commitIdx  int           /* 最新一次提交的日志条目编号 */
	appliedIdx int           /* 最新一次回应 client 的日志条目编号 */
	nextIdxs   []int         /* 明确下一次应该发送给 followers 哪条日志条目 */
	matchIdxs  []int         /* 统计已提交该条目的 followers 个数 */
	commitCh   chan struct{} /* 感知是否应该提交日志条目 */
	applyCh    chan ApplyMsg /* 应用于状态机后需要告知 clients */
}

前者的下一个位置是窗口的起始位置,后者是窗口的末尾;applyCh 是对外的接口,Raft 节点将已经应用于状态机的条目返回给 client。需要注意,applyCh 是同步 channel,MIT 6.824 Lab2 QA 中提醒不可以在对其写入的过程中上锁,这样有可能导致死锁。讲到这里,就可以扯出 commit 的相关操作了,且看代码,

func (rf *Raft) commit() {
	for !rf.killed() {
		select {
		case <-rf.commitCh:
			rf.mu.Lock()
			for i := rf.appliedIdx + 1; i <= rf.commitIdx; i++ {
				msg := ApplyMsg{CommandIndex: i, CommandValid: true, Command: rf.log[i].Cmd}
				rf.mu.Unlock()
				rf.applyCh <- msg
				rf.mu.Lock()
				rf.appliedIdx = i
			}
			rf.mu.Unlock()
		}
	}
}

raft.go:Make() 中通过 go rf.commit() 开启提交协程,该协程一直在线,一直盯着 commitCh ,感知自己是否有新的日志条目可以提交。第 6 行即是固定窗口的手法,在第 8 行放锁确保写入 applyCh 不会阻塞

S2 - leader发送AppendEntries

leader 在 raft.go:run() 中会调用 raft.go:boatcastAE() 向集群广播 AppendEntries 心跳包,且看具体实现,

func (rf *Raft) boatcastAE() {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	/* 所有 peers 应该收到相同的 AE 包 */
	for i, _ := range rf.peers {
		if i != rf.me && rf.role == Leader {
			go func(id int) {
				args := AppendEntriesArgs{
					Term:     rf.curTerm,
					LeaderId: rf.me,
					/*------------Lab2B Log Replication----------------*/
					PrevLogIdx:   rf.nextIdxs[id] - 1,
					LeaderCommit: rf.commitIdx,
				}
				args.PrevLogTerm = rf.log[args.PrevLogIdx].Term
				if rf.nextIdxs[id] <= rf.lastLogIdx() { /* rejoin test 不加以限制可能会越界 */
					args.Entries = make([]LogEntry, len(rf.log[rf.nextIdxs[id]:]))
					copy(args.Entries, rf.log[rf.nextIdxs[id]:])
				}

				reply := AppendEntriesReply{}
				rf.sendAppendEntries(id, &args, &reply)
			}(i)
		}
	}
}

要确保发送给每位 follower 的心跳包其背后依据的日志是一样,只有日志相同,才能保证数据的一致性,即心跳包的内容是同等程度新的。这就需要通过上锁来保证

PrevLogIdx 设置为下一次发送的条目的前一个位置,即 rf.nextIdxs[id]-1PrevLogTerm 是对应位置上的任期;特别需要注意,不能直接将 rf.log[rf.nextIdxs[id]:] 赋值给 args.Entries ,这样做是浅拷贝,在并发的环境下会带来致命的错误。这里需要重新创建副本,调用 make()copy() 进行日志条目的深拷贝

S3 - follower接收AppendEntries

当收到 AppendEntries 之后,会做一些常规的判断,例如发送方的任期是否较新,对应论文图 2 中 AppendEntries RPC 的 Receiver 的第一条,

没我新,则直接拒绝。且看一下 appendEntries.go:AppendEntries() 代码,有点长,

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	reply.Success = false
	reply.Term = rf.curTerm
	/*----backup 未优化的手段----*/
	//reply.NextIdx = rf.lastLogIdx() + 1

	if args.Term < rf.curTerm {
		return
	}

	/* 心跳包只对 follower 和 candidate 管用,leader 是不会响应它的 */
	rf.heartBeatCh <- struct{ int }{args.LeaderId}
	/* 主要为了让旧 leader 收到了新 leader 的心跳包后而被迫退位 */
	if args.Term > rf.curTerm {
		rf.curTerm = args.Term
		rf.role = Follower
		rf.votedFor = NoBody
	}

	/*------------Lab2B Log Replication----------------*/
	if rf.lastLogIdx() < args.PrevLogIdx { /* 违法下标,越界了 */
		reply.XTerm = -1
		reply.XLen = len(rf.log)
		return
	}

	/* 相同 index 但 term 不同 */
	if rf.log[args.PrevLogIdx].Term != args.PrevLogTerm {
		/*----backup 未优化的手段----*/
		/* 删掉冲突点及其之后的所有条目 */
		//rf.log = rf.log[:args.PrevLogIdx]
		//reply.NextIdx = rf.lastLogIdx() + 1

		reply.XTerm = rf.log[args.PrevLogIdx].Term
		reply.XIdx = args.PrevLogIdx
		for i := args.PrevLogIdx; i >= 0; i-- { /* 从当前位置开始向前扫 */
			/* 定位到 XTerm 的第一个日志条目 */
			if rf.log[i].Term != reply.XTerm {
				reply.XIdx = i + 1
				break
			}
		}
		return
	}

	rf.votedFor = args.LeaderId /* 臣服于 leader */
	reply.Success = true

	for i, entry := range args.Entries { /* 寻找重叠部分的冲突点 */
		index := args.PrevLogIdx + i + 1
		if index > rf.lastLogIdx() {
			rf.log = append(rf.log, entry)
		} else {
			/* 冲突点,相同 index 不同 term */
			if rf.log[index].Term != entry.Term {
				rf.log = rf.log[:index]
				rf.log = append(rf.log, entry)
			}
			/* 相同 index 处的日志条目的 term 相同,说明存储的命令是相同的 AND 之前的所有日志也是相同的 */
		}
	}

	/* follower 通过 LeaderCommit 来感知日志条目已提交到何处 AND 始终应该比 leader 慢一点 */
	if args.LeaderCommit > rf.commitIdx { /* 对应论文 Figure 2 中的 AppendEntries Receiver 的第 5 条 */
		rf.commitIdx = args.LeaderCommit
		if args.LeaderCommit > rf.lastLogIdx() {
			rf.commitIdx = rf.lastLogIdx()
		}
		rf.commitCh <- struct{}{} /* 告知主线程,现在可以提交了 因为 leader 都提交了,follower 跟着交就行了 */
	}
}

第 16 行和 Lab2A: Leader Election 一样,主要为了让旧 leader 收到了新 leader 的心跳包后而被迫退位,这个部分并没有做修整;接着看第 24 行之后的新增内容,这部分主要就是首先寻找冲突点,然后,删除其后的条目,接着再强制复制来自 leader 的条目,对应论文图 2 中 AppendEntries RPC 的 Receiver 的第三条和第四条

在代码中对应第 52~64 行,通过遍历 args.Entries 与 Raft 节点的日志进行比对,如果相同位置处的条目任期不同,则认为该点为冲突点;解决冲突点问题,即是将冲突点之后的条目通通删除,然后再将 args.Entries 中冲突点之后的条目复制到 Raft 节点日志中

其中,需要记住两条原则性的常识,

  1. 如果两条日志的相同索引位置存储的条目的任期是相同的,那么可以认定它们存储的命令也是相同的;
  2. 如果两条日志的相同索引位置存储的条目的任期是相同的,那么在此之前的所有条目也都是相同的

论文中说是这么说的,但是它说得很隐晦,很绕,并没有教我们怎么做。我这样写,也是参考大多数人的做法。但是,我自己的想法是与其一个一个的比较,不如不找冲突点了,在 PrevLogTermrf.log[PrevLogIdx].Term 对上的情况下,直接将所有 args.Entries 复制到日志中,

/* 找到了结合点 BUT 只能在其后 Append-Only 日志条目 */
rf.log = rf.log[:args.PrevLogIdx+1]
rf.log = append(rf.log, args.Entries...)

替换第 52~64 行,我认为是可行的,而且测试结果也是相同的

最后,进行 commit,对应论文图 2 中 AppendEntries RPC 的 Receiver 的第五条,选择较小的值更新自己的 commitIdx 。在这套流程中,follower 需要及时提交已复制的条目,具体是通过 leader 发来的下一次的心跳包来判断自己是否应该提交上一次复制的条目。对应在代码中,

if args.LeaderCommit > rf.commitIdx { /* 对应论文 Figure 2 中的 AppendEntries Receiver 的第 5 条 */
	rf.commitIdx = args.LeaderCommit
	if args.LeaderCommit > rf.lastLogIdx() {
		rf.commitIdx = rf.lastLogIdx()
	}
	rf.commitCh <- struct{}{} /* 告知主线程,现在可以提交了 因为 leader 都提交了,follower 跟着交就行了 */
}

查看 leader 的提交进度,如果 leader 已经回应了 client(提交条目并应用到状态机),那么 follower 要赶紧提交上一次复制的条目。总结下来,follower 的提交进度应该比 leader 慢一拍

第 24 行用来判断 leader 发来的 PrevLogIdx 是否越界了,如果超出了日志的范围,那么就直接返回同步失败的讯息,

if rf.lastLogIdx() < args.PrevLogIdx {
	return
}

告诉 leader,将下次再发送前一条日志试一试,看看是否能匹配上,对应在 appendEntries.go:sendAppendEntries()rf.nextIdxs[i]--

这里我采用了 backup 优化策略,稍后再提及。目前,我们先按照论文中所介绍的方法,逐一递减 nextIdx

第 31 行遇到相同位置 BUT 任期不同的情况,可将该点视为冲突点,采用对待冲突的方法,即是将冲突点之后的条目通通删掉,然后再复制上传过来的条目,

if rf.log[args.PrevLogIdx].Term != args.PrevLogTerm {
		/*----backup 未优化的手段----*/
		/* 删掉冲突点及其之后的所有条目 */
		rf.log = rf.log[:args.PrevLogIdx]
		reply.NextIdx = rf.lastLogIdx() + 1
		return
	}

这样,一整套将 follower 接收到 AppendEntries 的流程跑了下来。从理论上是对的,但是从实践的角度出发,达成共识(日志复制)的速度还是不够快,尤其是在 backup 测试中

于是,我们想办法是否能将冲突点 nextIdx 递减的速度加快,让 leader 选择下一次发送时更为准确的条目,而不是通过递减的手段进行一一试错来判断该条目是否合适

课程中也提到了优化的手法,先看一下 AppendEntries RPC 的结构体定义,

type AppendEntriesArgs struct {
	Term         int
	LeaderId     int
	PrevLogTerm  int
	PrevLogIdx   int
	Entries      []LogEntry
	LeaderCommit int
}

type AppendEntriesReply struct {
	Term    int
	Success bool
	//NextIdx int
	XTerm int /* 冲突点条目的任期 */
	XIdx  int /* XTerm 期间的第一条目位置 */
	XLen  int /* follower 所拥有的日志的长度 */
}

其中的 Args 是致敬原文图 2 中的结构,并未做删改添加;之后的 Reply 添加了 XTermXIdxXLen 三个字段,分别记录 follower 日志中冲突点的任期、该任期内的第一条目的位置以及 follower 此时的日志长度。三者的作用且许我慢慢道来

原先如果 PrevLogIdx 越界了,我们会告诉 leader:下一次递减一位再发来;现在优化过后,

if rf.lastLogIdx() < args.PrevLogIdx { /* 违法下标,越界了 */
  reply.XTerm = -1
  reply.XLen = len(rf.log)
  return
}

告诉 leader:我当前日志的长度,即可;原先如果 PrevLogIdx 处有冲突,则采取直接删除的策略,现在优化成,

/* 相同 index 但 term 不同 */
if rf.log[args.PrevLogIdx].Term != args.PrevLogTerm {
  /*----backup 未优化的手段----*/
  /* 删掉冲突点及其之后的所有条目 */
  //rf.log = rf.log[:args.PrevLogIdx]
  //reply.NextIdx = rf.lastLogIdx() + 1

  reply.XTerm = rf.log[args.PrevLogIdx].Term
  reply.XIdx = args.PrevLogIdx
  for i := args.PrevLogIdx; i >= 0; i-- { /* 从当前位置开始向前扫 */
    /* 定位到 XTerm 的第一个日志条目 */
    if rf.log[i].Term != reply.XTerm {
      reply.XIdx = i + 1
      break
    }
  }
  return
}

不采用一股脑的删除策略了,而是去 follower 日志中寻找冲突点任期内的第一条目并记下其下标。我采用较为暴力的法子,即是从当前位置开始向前扫,直到找到一条不为当前任期的条目。这里,也可以采用二分查找法,我懒得去实现了,读者可以试一下。为什么可以采用二分查找法?因为日志是有顺序的,任期从小到大,符合二分查找的条件

以上就是 follower 在接收到 AppendEntries RPC 后该有的反应

S4 - leader收到AppendEntries 回信

在代码中对应的是 appendEntries.go:sendAppendEntries() ,我们先看一下代码吧,

func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
	ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)

	rf.mu.Lock()
	defer rf.mu.Unlock()

	if !ok {
		return ok
	}

	term := rf.curTerm
	/* 自身过期的情况下,不需要在维护 nextIdx 了 */
	if rf.role != Leader || args.Term != term {
		return ok
	}

	/* 仅仅是被动退位,不涉及到需要投票给谁 */
	if reply.Term > term {
		rf.curTerm = reply.Term
		rf.role = Follower /* 主动回滚至 follower */
		rf.votedFor = NoBody
		return ok
	}

	/*------------Lab2B Log Replication----------------*/
	if reply.Success {
		//rf.nextIdxs[server] += len(args.Entries)
		rf.nextIdxs[server] = args.PrevLogIdx + len(args.Entries) + 1
		/* 目前第 i 号 follower 的日志已复制到 nextIdx-1 处  */
		rf.matchIdxs[server] = rf.nextIdxs[server] - 1

		/* 对应论文 Figure 2 的 Rules for Servers 的 Leaders 最后一条 */
		N := rf.commitIdx
		for i := N + 1; i <= rf.lastLogIdx(); i++ {
			count := 1
			for id := range rf.peers {
				/* leader 只会统计自己任期内所追加的日志条目,这样设计划分了新旧 leader 之间的职责,即每一任的 leader 顾好自己任期内的事 */
				if id != rf.me && rf.matchIdxs[id] >= i && rf.log[i].Term == rf.curTerm {
					count++
				}
			}

			if count > len(rf.peers)/2 { /* 大多数 followers 已经复制了第 i 号之前(包括)的日志条目 */
				N = i
				break
			}
		}

		if N > rf.commitIdx {
			rf.commitIdx = N
			/* 在发送 AE 包之前,要将上次那些已经被大多数 followers 复制的条目提交上去 */
			rf.commitCh <- struct{}{}
		}
	} else {
		/*----backup 未优化的手段----*/
		//rf.nextIdxs[server] = reply.NextIdx

		if reply.XTerm == -1 {
			rf.nextIdxs[server] = reply.XLen
		} else {
			termNotExist := true

			for i := rf.nextIdxs[server] - 1; i >= 1; i-- { /* 从后往前扫,寻找 XTerm 的最后一个条目 */
				if rf.log[i].Term == reply.XTerm {
					termNotExist = false
					rf.nextIdxs[server] = i
					break
				}

				if rf.log[i].Term < reply.XTerm {
					break
				}
			}

			if termNotExist {
				rf.nextIdxs[server] = reply.XIdx
			}
		}
	}

	return ok
}

从第 25 行开始才是 Lab2B: Log Replication 新增的内容,如果 follower 成功复制了发过去的日志条目,则更新 nextIdxmatchIdx ,S1 - leader 上任初始化 中已经讲述了两者的作用

之后就是 commit 环节,要严格遵照论文图 2 中 Rules for Servers 的 Leaders 最后一条,其大意是统计一下有多少 followers 已成功复制了当前任期内发送的日志条目

如果过半 followers 已成功复制,leader 则顺理成章 commit 条目,应用状态机以及回应 client,这一套一气呵成

另外,在论文 5.4.2 - Committing entries from previous terms 提到 leader 应该只统计自己任期内所追加的日志条目。这样设计划分了新旧 leader 之间的职责,即每一任的 leader 顾好自己任期内的事。当前任期内的日志条目成功 commit,从而间接保证了之前任期的条目也顺利 commit 了

这个道理需要慢慢想,我也是琢磨了很久,现在也不是很透彻。但是它就是这么规定的,我们在学习中会经常遇到一些自己当时不太能理解的问题,其实与其去刨根问底 OR 纠缠不清,不如记住这个道理,使用即可,不必知道为什么。待自己境界到了,自然也就可以悟出其中的道理了

有个非常重要的规则,即是已 commit 的日志条目不可以被覆盖!!!已 commit 意味着 leader 已将该条目应用至状态机,并积极回应了 client。试想,如果被覆盖了,那么 client 可能会在同一个位置看到多份不同的条目,这是彻头彻尾的错误

接下来,要重点讲讲同步失败的情况,S3 - follower 接收 AppendEntries 提到论文中是采用 nextIdx 递减的策略来一一试错,速度很慢

我们采用了 XTerm 的方法记录冲突点的相关讯息,leader 收到回信后,如果发现 follower 的日志较短,远远没到达 PrevLogIdx 所在的位置,那么就会将 nextIdx 置为 follower 日志的长度,即日志的最后一条目的下一个位置。对应代码中的第 58 行

第二种情况,确认回信中提到的冲突点的任期在 leader 日志中是否存在。如果存在,则将 nextIdx 置为该任期内最后一条目的位置;反之,置为回信的 XIdx

这里也可以采用二分查找法,我为了省事选择了暴力法,读者可以进行调整

至此,差不多讲完了 leader 收到 AppendEntries 回信后的反应

S5 - candidate选举限制

在 5.4.1 - Election restriction 中提及到日志何为最新,有两个评判标准,

  1. 如果两条日志的最新条目任期不同,那么任期号大的日志是最新的;
  2. 如果日志以相同的任期结束,那么哪个日志越长,哪个日志就越最新

体现在 requestVote.go:RequestVote() 中即是 up2date 布尔值,

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (2A, 2B).
	rf.mu.Lock()
	defer rf.mu.Unlock()

	/* 默认不投票 */
	reply.VoteGranted = false
	reply.Term = rf.curTerm

	if args.Term < rf.curTerm {
		return
	}

	if args.Term > rf.curTerm { /* 也可能是为了镇压任期较旧的 leader */
		rf.curTerm = args.Term
		rf.role = Follower
		rf.votedFor = NoBody /* 为臣服做准备 */
	}

	idx := rf.lastLogIdx()
	term := rf.lastLogTerm()
	up2date := false

	if args.LastLogTerm > term { /* 日志任期大的新 */
		up2date = true
	}

	if args.LastLogTerm == term && args.LastLogIdx >= idx { /* 任期相同,日志长的新 */
		up2date = true
	}

	/* 对应论文中 5.4.1 - Election restriction */
	if (rf.votedFor == NoBody || rf.votedFor == args.CandidateId) && up2date {
		rf.role = Follower
		rf.votedFor = args.CandidateId /* 臣服于 leader */
		reply.VoteGranted = true
		rf.grantVoteCh <- struct{}{} /* 如果投票给他人,那么就需要重置自己的 ElectionTimeOut */
	}
}

这部分比较简单,需要修改的地方也不多,重点即在第 24~30 行,读者可自行品鉴

S6 - defs.go约定俗成和实现Start()

defs.go 中定义了两个常用的函数 lastLogIdx()lastLogTerm()

/* Raft 节点最后一条日志的编号 */
func (rf *Raft) lastLogIdx() int {
	return rf.log[len(rf.log)-1].Idx
}

/* Raft 节点最后一条日志的任期号 */
func (rf *Raft) lastLogTerm() int {
	return rf.log[len(rf.log)-1].Term
}

通过前者可以获取日志最后一条目的下标;而后者返回最后一条目的任期

raft.go:Start() 是对外的接口,与 client 进行交互。client 向 leader 发送一条命令请求,leader 需要将其追加其日志中,并将其同步至集群。代码如下,

func (rf *Raft) Start(command interface{}) (int, int, bool) {
	// Your code here (2B).
	rf.mu.Lock()
	defer rf.mu.Unlock()

	index := -1
	term := rf.curTerm
	isLeader := rf.role == Leader

	if isLeader {
		rf.log = append(rf.log, LogEntry{Idx: rf.lastLogIdx() + 1, Term: term, Cmd: command})
		index = rf.lastLogIdx()
	}

	return index, term, isLeader
}

至此,已然讲明白了 Lab2B: Log Replication 整个一套流程

V. Result

golang 比较麻烦,它有 GOPATH 模式,也有 GOMODULE 模式,6.824-golabs-2020 采用的是 GOPATH,所以在运行之前,需要将 golang 默认的 GOMODULE 关掉,

$ export GO111MODULE="off"

随后,就可以进入 src/raft 中开始运行测试程序,

$ go test -run 2B

仅此一次的测试远远不够,可以通过 shell 循环,让测试跑个千把次,

$ for i in {1..1000}; go test -run 2B    

这样,如果还没错误,那应该是真的通过了。分布式的很多 bug 需要通过反复模拟才能复现出来的,它不像单线程程序那样,永远是幂等的情况。也可以用我写的脚本 test_2b.sh,

for i in {1..1000}
do
  echo "+++++++++++++epoch $i+++++++++++++"
  go test -run 2B
done

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/546635.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

yolov7结构改进策略解析

论文链接&#xff1a;https://arxiv.org/abs/2207.02696 代码链接&#xff1a;https://github.com/WongKinYiu/yolov7 具体分割如何训练&#xff0c;请参考我之前的博客论文&#xff1a; https://blog.csdn.net/qq_41920323/article/details/129464115?spm1001.2014.3001.5502…

联用多个插件可以让 GPT-4 的能力更加强大,实现更加复杂的操作

&#x1f680; 联用多个插件可以让 GPT-4 的能力更加强大&#xff0c;实现更加复杂的操作。 联用多个插件可以让 GPT-4 的能力更加强大&#xff0c;实现更加复杂的操作。 不过&#xff0c;使用插件和联网功能也有一些要注意的地方。 首先是安全性问题&#xff0c;特别是像购…

Burpsuite模块—-Intruder模块详解

一、简介 Burp Intruder是一个强大的工具&#xff0c;用于自动对Web应用程序自定义的攻击&#xff0c;Burp Intruder 是高度可配置的&#xff0c;并被用来在广范围内进行自动化攻击。你可以使用 Burp Intruder 方便地执行许多任务&#xff0c;包括枚举标识符&#xff0c;获取有…

Kali-linux密码在线破解

为了使用户能成功登录到目标系统&#xff0c;所以需要获取一个正确的密码。在Kali中&#xff0c;在线破解密码的工具很多&#xff0c;其中最常用的两款分别是Hydra和Medusa。本节将介绍使用Hydra和Medusa工具实现密码在线破解。 8.1.1 Hydra工具 Hydra是一个相当强大的暴力密…

浅谈分布式事物解决方案

目录 背景 1 XA规范分布式事物方案 1.1 俩阶段提交&#xff08;2PC&#xff09; 1.2 三阶段提交&#xff08;3PC&#xff09; 2 补偿事务&#xff08;TCC&#xff09; 3 可靠消息最终一致性方案 4 可靠消息最终一致性方案 5 SAGA事物 6 Seata AT 模式 背景 分布式事务出现…

使用Git-lfs上传超过100m的大文件到GitHub

文章目录 1. 安装 git-lfs2. 在Git中安装git-ifs3. 找到工程中的所有大文件4.执行完这行命令&#xff0c;项目目录下会生成文件 .gitattributes&#xff0c;此时Git push将 .gitattributes 提交到远程仓库。 5. 需要注意的事 1. 安装 git-lfs Git Large File Storage | Git La…

Day44【动态规划】完全背包、518.零钱兑换 II、377.组合总和 Ⅳ

完全背包 文章讲解 视频讲解 有N件物品和一个最多能背重量为W的背包。第i件物品的重量是weight[i]&#xff0c;得到的价值是value[i] 。每件物品都有无限个&#xff08;也就是可以放入背包多次&#xff09;&#xff0c;求解将物品装入背包里的最大价值 完全背包和01背包问题…

F103ZET6使用FSMC和HAL点亮ILI9341

前言 将标准库下的ILI9341驱动移植到使用CubeMX生成的HAL库环境&#xff0c;并成功运行。 一、STM32CubeMX生成框架 &#xff08;一&#xff09;配置RCC、SYS和时钟树 参见常规配置。 &#xff08;二&#xff09;配置FSMC 1、原理图引脚定义 LCD8080接口使用的引脚主要分…

【数据结构】线性表 ⑤ ( 双循环链表 | 双循环链表特点 | 双循环链表插入操作处理 | 代码示例 - 使用 Java 实现 双循环链表 )

文章目录 一、双循环链表二、双循环链表特点三、双循环链表插入操作处理四、代码示例 - 使用 Java 实现 双循环链表 一、双循环链表 " 双循环链表 " 是 在 单循环链表 的基础上 , 在每个 节点 中 , 新增一个 指针 , 指向 该节点 的 前驱节点 ; 双向循环链表 每个 节…

头歌计算机组成原理实验—运算器设计(8)第8关:乘法流水线设计

第8关&#xff1a;乘法流水线设计 实验目的 学生掌握运算流水线基本概念&#xff0c;理解将复杂运算步骤细分成子过程的思想&#xff0c;能够实现简单的乘法运算流水线。 视频讲解 实验内容 在 Logisim 中打开 alu.circ 文件&#xff0c;在6位补码阵列乘法器中利用5位阵列乘…

React学习笔记三-模块与组件的理解

此文章是本人在学习React的时候&#xff0c;写下的学习笔记&#xff0c;在此纪录和分享。此为第三篇&#xff0c;主要介绍react中的模块与组件。 目录 1.模块与组件 1.1模块 1.2组件 1.3模块化 1.4组件化 2.React面向组件编程 2.1函数式组件 2.2类组件 2.2.1类知识的复…

防火墙(一)

防火墙知识 一、iptables概述二、四表五链四表五链iptables防火墙的使用方法 三、示例操作四、规则的匹配通用匹配&#xff1a;隐含匹配&#xff1a;端口匹配&#xff1a; --sport源端口、--dport目的端口TCP标志位匹配&#xff1a;ICMP类型匹配&#xff1a;显示匹配&#xff1…

Windows 安装MySQL 8.0 超详细教程(mysql 8.0.30)

目录 一、删除以前安装的MySQL服务 1、查找以前是否装有mysql 2、删除mysql &#xff08;1&#xff09;停止mysql服务&#xff1a; &#xff08;2&#xff09;删除mysql服务&#xff1a; 3.检查mysql是否已删除 二、下载mysql二进制包 三、解压二进制包&#xff0c;编辑…

一图看懂 setuptools 模块:一个功能齐全、积极维护且稳定的库,旨在方便打包Python项目,资料整理+笔记(大全)

本文由 大侠(AhcaoZhu)原创&#xff0c;转载请声明。 链接: https://blog.csdn.net/Ahcao2008 一图看懂 setuptools 模块&#xff1a;一个功能齐全、积极维护且稳定的库&#xff0c;旨在方便打包Python项目&#xff0c;资料整理笔记&#xff08;大全&#xff09; &#x1f9ca;…

第一个 Rust 程序

目录 必要知识代码示例 Cargo 教程[Rust 输出到命令行](https://www.runoob.com/rust/rust-println.html)资料 必要知识 Rust 语言代码文件后缀名为 .rs 使用 rustc 命令编译 .rs 文件 rustc runoob.rs # 编译 runoob.rs 文件编译后会生成 可执行文件 例如&#xff1a; …

『python爬虫』26. selenium与超级鹰处理复杂验证码的处理(保姆级图文)

目录 1. 图片选择类验证码2. 滑块验证码3. 滑块出错&#xff0c;不加载总结 欢迎关注 『python爬虫』 专栏&#xff0c;持续更新中 欢迎关注 『python爬虫』 专栏&#xff0c;持续更新中 1. 图片选择类验证码 我们这里查看超级鹰文档 图片验证码返回的是一个 dic 结构为 x1,y1…

【SAM系列】An Alternative to WSSS? An Empirical Study of SAM on WSSS Problems

论文链接&#xff1a;https://arxiv.org/pdf/2305.01586.pdf 论文代码&#xff1a;暂无 目的 WSSS旨在弱标签的情况下&#xff0c;生成高质量的分割伪标签&#xff0c;然后用于全监督的语义分割训练。本文探索用SAM来生成伪标签来替代WSSS方案。 为什么不直接用SAM分割而利用…

《面试1v1》synchronized

源码都背下来了&#xff0c;你给我看这 我是 javapub&#xff0c;一名 Markdown 程序员从&#x1f468;‍&#x1f4bb;&#xff0c;八股文种子选手。 面试官&#xff1a; 你好&#xff0c;我看到你的简历上写着你熟悉 Java 中的 “synchronized” 关键字。你能给我讲讲它的作…

chatgpt赋能Python-python3反转字符串

Python3反转字符串技巧&#xff1a;让你的代码更高效&#xff01; 你是否曾经在编程时需要对字符串进行反转&#xff0c;但却不知从何入手&#xff1f;Python3提供了简单易用的方法&#xff0c;帮助你更快地反转字符串。本文将介绍Python3中字符串反转的方法以及如何在代码中利…

chatgpt赋能Python-python3如何安装numpy

如何安装numpy&#xff1f; 介绍 在Python编程中&#xff0c;NumPy是使用最广泛的库之一。NumPy是数学和科学计算中的核心模块&#xff0c;主要用于处理数字数据&#xff0c;包括数组计算、线性代数、傅里叶变换、随机数生成等任务。在Python编程中&#xff0c;使用NumPy可以…