tinykv 项目地址:https://github.com/talent-plan/tinykv
本博客提供一个参考思路,未必是正确答案(但能够通过测试集),请注意甄别。欢迎在评论区讨论。
文章目录
- 修改 raft.go 中的函数
- 修改创建新 Raft 的代码
- 修改 sendRequestVote 函数
- 实现 sendAppend 函数
- 增设拒绝过半回退机制
- 增加 Step 函数对 MsgPropose 的处理
- 增设 Step 函数对 MsgAppendResponce 的处理
- 修改 handleRequestVote 函数
- 修改 handleHeartbeat 函数
- 完成 handleAppendEntries 函数
- 完成 log.go 中的函数
- 关于测试集
Project2a 的整体目标是实现 Raft 算法。作为其子任务,Project2ab 实现其中的日志复制(Log Replication)部分。Project2ab 的工程量适中,但是 tinykv 提供了很多很多测试样例,要想一次全部通过几乎不可能。在 debug 的过程中,我们可以完善自己的代码,同时对 Raft 共识算法获得更深的理解。
需要修改的文件:
raft/raft.go
raft/log.go
修改 raft.go 中的函数
由于 Project2ab 实现了日志复制,我们自然要在 Project2aa 不完善的代码上进行改动。
修改创建新 Raft 的代码
修改后newRaft
函数的代码如下所示。这里重点增加了这些成员的初始化:
Term
和Vote
:在 Raft 内部的 Storage 里面,存储着 Raft 的硬状态(hard state),该状态包含了 Raft 当前的Term
和当前的Vote
。Project2ab 并未对这个硬状态作过多解释,但是我猜测当一个 Raft 宕机时,所有易失存储丢失,而 Storage 中的这些硬状态还留着;那么它恢复(重启)的时候就会继承这个 Term 或者 Vote。不过我并没有在代码中实现 Raft 写其硬状态,估计后面还需要实现这个,在恰当的时机将 Raft 当前的状态写入硬状态。RaftLog
:通过配置c
中的存储来生成一个新的 RaftLog。函数newLog
在后面会实现。Prs
:这个成员存储了 cluster 中其他结点的一些状态,包括 Match 和 Next。只有 Leader 状态需要维护这个成员。Match 指的是某一成员的 RaftLog 和自己的 RaftLog 最后匹配的 index;Next 是自己将要向他们发送 AppendEntry RPC 时,Entry 条目的起始索引。Leader 通过看 Match 来判断一个 Entry 有没有成功复制到其他结点上。
// newRaft return a raft peer with the given config
func newRaft(c *Config) *Raft {
if err := c.validate(); err != nil {
panic(err.Error())
}
// Your Code Here (2A).
hstate, _, _ := c.Storage.InitialState()
raft := &Raft{id: c.ID,
State: StateFollower,
Term: hstate.Term,
Vote: hstate.Vote,
heartbeatTimeout: c.HeartbeatTick,
electionTimeout: c.ElectionTick,
electionElapsed: -rand.Intn(c.ElectionTick),
Prs: make(map[uint64]*Progress),
votes: make(map[uint64]bool),
RaftLog: newLog(c.Storage),
}
for _, pr := range c.peers {
raft.votes[pr] = false
raft.Prs[pr] = &Progress{Match: 0, Next: 1}
}
return raft
}
修改 sendRequestVote 函数
Candidate 在给其他结点发送投票请求 RPC 时,还需要说明自己的 RaftLog 中最后的 Index 和 Term。当 candidate 的 RaftLog 比投票人自己的 RaftLog 还要旧时,投票人会拒绝这个请求。具体改动后如下:
// newly added: sendRequestVote sends a RequestVote RPC to the given peer.
func (r *Raft) sendRequestVote(to uint64) {
if r.State != StateCandidate {
return
}
r.msgs = append(r.msgs, pb.Message{
MsgType: pb.MessageType_MsgRequestVote,
From: r.id,
To: to,
Term: r.Term,
Index: r.RaftLog.LastIndex(), // the last Index in the log
LogTerm: r.RaftLog.LastTerm(), // the last LogTerm in the log
})
}
实现 sendAppend 函数
把要发送给某个成员的 Entries 封装在 MsgAppend 请求中,保存在自己的邮箱里。要发哪些 Entries,看这个成员的 Next 值。如果自己没有 index 大于等于 Next 的 Entry,这说明我们无需向该成员发送 AppendEntry RPC。
当然,一旦消息产生,Leader 就自动更新这个成员的 Next,而不管它实际上是否收到了这条消息。消息在网络中丢失或损坏导致成员没有正确收到,后面自有办法补回这些 Entries。
// sendAppend sends an append RPC with new entries (if any) and the
// current commit index to the given peer. Returns true if a message was sent.
func (r *Raft) sendAppend(to uint64) bool {
// Your Code Here (2A).
logTerm, err := r.RaftLog.Term(r.Prs[to].Next - 1)
if err != nil {
return false
}
entries := r.RaftLog.entries[r.Prs[to].Next-r.RaftLog.entries[0].Index:]
if len(entries) == 0 || entries == nil {
return false
}
pentries := make([]*pb.Entry, len(entries))
for i, _ := range entries {
pentries[i] = &entries[i]
}
r.msgs = append(r.msgs, pb.Message{
MsgType: pb.MessageType_MsgAppend,
From: r.id,
To: to,
Term: r.Term,
Index: r.Prs[to].Next - 1,
LogTerm: logTerm,
Commit: r.RaftLog.committed,
Entries: pentries,
})
r.Prs[to].Next = r.RaftLog.LastIndex() + 1
return true
}
增设拒绝过半回退机制
在 Project2aa 中,我为了偷懒没有实现“一个 candidate 在收到一半的拒绝时应该退回到 follower”的功能,然后被 Project2ab 的测试集制裁了。事实上,专门有一个文档raft/doc.go
供我们了解 tinykv 中 Raft 共识算法的实现细节。这个文档中说道(246-247行):
If candidate receives majority of votes of denials, it reverts back to follower.
首先为了实现这个细节,我们给Raft
结构体新增了两位成员。它们的意思显而易见,用于统计收到的支持票数和拒绝票数。
type Raft struct {
accepts uint64
rejects uint64
}
因此,我们需要修改becomeCandidate
函数。当一个结点成为候选人时,它需要重置自己的accepts
和rejects
:
// becomeCandidate transform this peer's state to candidate
func (r *Raft) becomeCandidate() {
// Your Code Here (2A).
r.Term++
r.Vote = r.id
r.State = StateCandidate
for pr, _ := range r.votes {
if pr == r.id {
r.votes[pr] = true
} else {
r.votes[pr] = false
}
}
r.accepts = 1
r.rejects = 0
r.electionElapsed = -rand.Intn(r.electionTimeout)
// extreme case: when the cluster has only 1 raft, it become leader immediately.
if len(r.votes) == 1 {
r.becomeLeader()
}
}
同时,我们也不再需要包装canBecomeLeader
这个函数了,当 candidate 收到选票(MsgVoteResponce)时,直接这样处理:
if m.Term == r.Term {
r.votes[m.From] = !m.Reject
if m.Reject {
r.rejects++
if 2*r.rejects >= uint64(len(r.votes)) {
r.becomeFollower(r.Term, r.Lead)
}
} else {
r.accepts++
if 2*r.accepts > uint64(len(r.votes)) {
r.becomeLeader()
}
}
}
增加 Step 函数对 MsgPropose 的处理
在文档raft/doc.go
中有这样一段描述(201-216行):
When ‘MessageType_MsgPropose’ is passed to the leader’s ‘Step’ method, the leader first calls the ‘appendEntry’ method to append entries to its log, and then calls ‘bcastAppend’ method to send those entries to its peers. When passed to candidate, ‘MessageType_MsgPropose’ is dropped. When passed to follower, ‘MessageType_MsgPropose’ is stored in follower’s mailbox(msgs) by the send method. It is stored with sender’s ID and later forwarded to the leader by rafthttp package.
意思是,leader 处理 MsgPropose 是先将里面的 Entries 拿出来,添加到自己的 log 里面,在转发给其他的 followers;candidate 不处理这个消息;follower 会把这个消息暂存在自己的信箱中,并把消息的To
重定向为 leader 的 id,以便后续将消息转发给 leader。
因此,Leader 处理这个消息的方式是:
if m.Entries == nil {
return nil
}
// first append the entries to raftlog
for _, e := range m.Entries {
r.RaftLog.AppendEntry(&pb.Entry{
EntryType: e.EntryType,
Term: r.Term,
Index: r.RaftLog.LastIndex() + 1,
Data: e.Data,
})
}
// renew local progress
r.Prs[r.id].Match = r.RaftLog.LastIndex()
r.Prs[r.id].Next = r.Prs[r.id].Match + 1
if len(r.votes) == 1 {
// entry appended markd as committed immediately if the cluster has only 1 raft
r.renewCommitted()
}
// then broadcast them
for pr, _ := range r.votes {
if pr != r.id {
r.sendAppend(pr)
}
}
而 Follower 处理的方式则很简单,如下:
// forward this message
r.msgs = append(r.msgs, pb.Message{
MsgType: pb.MessageType_MsgPropose,
From: m.From,
To: r.Lead,
Entries: m.Entries,
})
这上面的renewCommitted
是我自定义的函数,目的是让 Leader 根据 Prs 的 Match 值判断日志的复制情况,并更新自己的 commit index。定义如下:
// newly added: renewCommitted marks those entries with most rafts having a replica as committed, returning true if committed is changed
func (r *Raft) renewCommitted() bool {
ret := false
r.Prs[r.id].Match = r.RaftLog.LastIndex()
r.Prs[r.id].Next = r.Prs[r.id].Match + 1
for count, ncommit := 0, r.RaftLog.committed+1; ; count = 0 {
term, err := r.RaftLog.Term(ncommit)
if err != nil {
return ret
}
// only log entry during current Term can be committed
if term != r.Term {
ncommit++
continue
}
for pr, _ := range r.Prs {
if r.Prs[pr].Match >= ncommit {
count++
}
}
if 2*count > len(r.Prs) {
r.RaftLog.committed = ncommit
ret = true
} else {
return ret
}
ncommit++
}
}
增设 Step 函数对 MsgAppendResponce 的处理
一个负责任的 leader 应该十分关注其 followers 返回过来的 AppendResponce RPC。
- 如果 follower 拒绝了,说明 leader 从 Next 开始发送的 Entries 和 follower 的并不匹配,这个 Next 需要回退,然后 leader 重发 Append RPC,直到这个 follower 接受为止。
- 如果 follower 接受了,更新响应的 Progress。同时,leader 还要看看有没有新增的可提交 Entry,从而决定自己的 commit index 是否要更新。
综上,处理过程如下:
if m.Reject {
r.Prs[m.From].Next--
r.sendAppend(m.From) // resend the entries
} else {
r.Prs[m.From].Match = m.Index
r.Prs[m.From].Next = m.Index + 1
r.renewCommitted()
}
修改 handleRequestVote 函数
前面提到,一个投票人收到 RequestVote RPC 时,如果 candidate 的日志比自己的旧,这个投票人是会选择拒绝的。而日志“新”的定义在 Raft 论文中有:
If the logs have last entries with different terms, then the log with the later term is more up-to-date. If the logs end with the same term, then whichever log is longer is more up-to-date
如果两个日志最后一项的 Term 不同,那 Term 大的更新。如果两个日志最后一项的 Term 恰好相同,那么更长的日志更新。
所以我们可以这样修改 handleRequestVote 函数:
// newly added: handleRequestVote handles RequestVote RPC request
func (r *Raft) handleRequestVote(m pb.Message) {
if m.MsgType != pb.MessageType_MsgRequestVote {
return
}
if m.Term < r.Term || m.Term == r.Term && (r.Vote != 0 && r.Vote != m.From) || m.LogTerm < r.RaftLog.LastTerm() || (m.LogTerm == r.RaftLog.LastTerm() && m.Index < r.RaftLog.LastIndex()) {
if m.Term > r.Term {
r.becomeFollower(m.Term, r.Lead)
}
r.msgs = append(r.msgs, pb.Message{
MsgType: pb.MessageType_MsgRequestVoteResponse,
From: r.id,
To: m.From,
Term: r.Term,
Reject: true,
})
} else {
r.becomeFollower(m.Term, 0)
r.Vote = m.From
r.msgs = append(r.msgs, pb.Message{
MsgType: pb.MessageType_MsgRequestVoteResponse,
From: r.id,
To: m.From,
Term: m.Term,
})
}
}
修改 handleHeartbeat 函数
Follower 收到 leader 的 Heartbeat RPC 后,会返回一个 Responce。这个 Responce 现在需要囊括 follower RaftLog 的新旧信息,以便 leader 统筹掌控。修改后的函数如下:
// handleHeartbeat handle Heartbeat RPC request
func (r *Raft) handleHeartbeat(m pb.Message) {
// Your Code Here (2A).
if m.Term < r.Term {
r.msgs = append(r.msgs, pb.Message{
MsgType: pb.MessageType_MsgHeartbeatResponse,
From: r.id,
To: m.From,
Term: r.Term,
Reject: true,
})
} else {
r.becomeFollower(m.Term, m.From)
r.msgs = append(r.msgs, pb.Message{
MsgType: pb.MessageType_MsgHeartbeatResponse,
From: r.id,
To: m.From,
Term: m.Term,
Index: r.RaftLog.LastIndex(),
LogTerm: r.RaftLog.LastTerm(),
})
}
}
完成 handleAppendEntries 函数
这个函数规定一个 follower 收到来自 leader 的 AppendEntry RPC 之后,如何更新自己的 RaftLog。具体而言,follower 会看 MsgAppend 中的Index
和LogTerm
,如果自己的 RaftLog 中对应的Index
上的 Entry 的 Term 不是Term
(即不匹配),则拒绝这个 RPC,并告诉 leader 自己的 RaftLog 目前最后一项的 Index 和 Term 是什么;否则,同意这个 RPC。
一旦同意 RPC,follower 就会查看自己的RaftLog.entries
和 RPC 消息中的m.Entries
。会有三种情况:
- 两者不冲突,且
m.Entries
中最后一项已经囊括在RaftLog.entries
中(m.Entries
为空也算这种情况)。此时 follower 自己的RaftLog.entries
不会改变。 - 两者不冲突,但是
m.Entries
中最后一项并不囊括在RaftLog.entries
中。那么m.Entries
中多出来的那些项就被天教导RaftLog.entries
后面。 - 两者冲突。冲突的含义是,
RaftLog.entries
和m.Entries
中分别存在一项 Entry,这两个 Entry 的Index
相同但是Term
不同。此时应该找到RaftLog.entries
中第一个冲突项的位置,该位置及其之后的 Entry 全部被删除;同时将m.Entries
中的新 Entry 加进来。
然后,follower 本身的 commit index 也会更新。论文中是这么说的:
If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
所谓的 index of last new entry 实际上就是m.Entries
中最后一项的 Index。如果m.Entries
为空,那么这个 index of last new entry 就是m.Index
。
据此,我们的代码可以完成如下:
// handleAppendEntries handle AppendEntries RPC request
func (r *Raft) handleAppendEntries(m pb.Message) {
// Your Code Here (2A).
if m.MsgType != pb.MessageType_MsgAppend {
return
}
if m.Term < r.Term {
r.msgs = append(r.msgs, pb.Message{
MsgType: pb.MessageType_MsgAppendResponse,
From: r.id,
To: m.From,
Term: r.Term,
Reject: true,
})
} else {
r.becomeFollower(m.Term, m.From)
if term, err := r.RaftLog.Term(m.Index); err == nil && term == m.LogTerm {
index := m.Index + 1
lastEntryIndex := m.Index
if m.Entries != nil {
lastEntryIndex = m.Entries[len(m.Entries)-1].Index
for index <= r.RaftLog.LastIndex() && index <= m.Entries[len(m.Entries)-1].Index {
if term, _ = r.RaftLog.Term(index); term == m.Entries[index-m.Entries[0].Index].Term {
index++
} else {
r.RaftLog.storage.(*MemoryStorage).Append(r.RaftLog.entries[:index-r.RaftLog.entries[0].Index])
r.RaftLog.stabled = index - 1
r.RaftLog.entries = r.RaftLog.entries[:index-r.RaftLog.entries[0].Index]
for idx := index; idx <= m.Entries[len(m.Entries)-1].Index; idx++ {
r.RaftLog.entries = append(r.RaftLog.entries, *m.Entries[idx-m.Entries[0].Index])
}
break
}
}
if index == r.RaftLog.LastIndex()+1 {
r.RaftLog.storage.(*MemoryStorage).Append(r.RaftLog.entries[:index-r.RaftLog.entries[0].Index])
r.RaftLog.stabled = index - 1
for idx := index; idx <= m.Entries[len(m.Entries)-1].Index; idx++ {
r.RaftLog.entries = append(r.RaftLog.entries, *m.Entries[idx-m.Entries[0].Index])
}
}
}
if m.Commit > r.RaftLog.committed {
r.RaftLog.committed = min(m.Commit, lastEntryIndex)
}
r.msgs = append(r.msgs, pb.Message{
MsgType: pb.MessageType_MsgAppendResponse,
From: r.id,
To: m.From,
Term: m.Term,
Index: r.RaftLog.LastIndex(),
LogTerm: r.RaftLog.LastTerm(),
Commit: r.RaftLog.committed,
})
} else {
r.msgs = append(r.msgs, pb.Message{
MsgType: pb.MessageType_MsgAppendResponse,
From: r.id,
To: m.From,
Term: m.Term,
Reject: true,
})
}
}
}
完成 log.go 中的函数
首先,我们分析一下RaftLog
的结构。结构体的定义如下:
type RaftLog struct {
// storage contains all stable entries since the last snapshot.
storage Storage
// committed is the highest log position that is known to be in
// stable storage on a quorum of nodes.
committed uint64
// applied is the highest log position that the application has
// been instructed to apply to its state machine.
// Invariant: applied <= committed
applied uint64
// log entries with index <= stabled are persisted to storage.
// It is used to record the logs that are not persisted by storage yet.
// Everytime handling `Ready`, the unstabled logs will be included.
stabled uint64
// all entries that have not yet compact.
entries []pb.Entry
// the incoming unstable snapshot, if any.
// (Used in 2C)
pendingSnapshot *pb.Snapshot
// Your Data Here (2A).
}
说白了这个 RaftLog 实际上是 Raft 结点位于内存上的一个日志数据,而RaftLog.storage
则是硬盘上的日志数据。结构体前面有一段说明文字:
// RaftLog manage the log entries, its struct look like:
//
// snapshot/first.....applied....committed....stabled.....last
// --------|------------------------------------------------|
// log entries
//
// for simplify the RaftLog implement should manage all log entries
// that not truncated
这个committed
就是我们之前说的 commit index,当 leader 确信大多数结点都有 Index 为Index
的 Entry 副本时,committed
就会被更新为Index
,表示这个 Entry 在 leader 这边处于已提交的状态。然后这个 commit index 会随着 AppendEntry RPC 发送给 followers,followers 收到了之后也会更新自己的 commit index,并告诉 leader 自己更新后的 commit index 是多少。Leader 随时监控所有结点的 commit index,一旦一个 Entry 在大多数结点上提交之后,leader 就会把这个 Entry 应用(apply)在自己的状态机上,并且理应告知其他 follower 也去 apply 响应的 Entry。
不过,我们 Project2ab 中暂时不需要 leader 更新自己的applied
。至于这个stabled
,我忘记是在哪看到的了,一个 Raft 一旦接受了新的 Entry,就要把它添加到自己的 storage 里面这下新的 Entry 就处于稳定状态了。可以在handleAppendEntries
函数里面看到这一行为:
r.RaftLog.storage.(*MemoryStorage).Append(r.RaftLog.entries[:index-r.RaftLog.entries[0].Index])
r.RaftLog.stabled = index - 1
然后,我们就可以开始完成函数了。首先实现newLog
函数。storage
里面本身存了一些 Entries(未压缩),然后有的 Entries 已经被压缩了;所以未压缩的这些 Entries,其首 Entry 的 Index 未必是 0。但是storage
里面未压缩的最后一个 Entry 肯定是最后一个稳定的 Entry,所以stabled
就是它的 Index。committed
存在于storage
的硬状态里面。applied
应该暂时不用管,并且我也不知道是否应该是snapshot.Metadata.Index
。
需要注意的是,storage.Entries()
方法是无法获得storage.ents[0]
的(因为这是一个 dummy entry),所以我们需要手动添加这一项。由于它是一个 dummy entry,所以我们只需关心这个 Entry 的 Index 和 Term 就行了。
// newLog returns log using the given storage. It recovers the log
// to the state that it just commits and applies the latest snapshot.
func newLog(storage Storage) *RaftLog {
// Your Code Here (2A).
hstate, _, _ := storage.InitialState()
snapshot, _ := storage.Snapshot()
fidx, _ := storage.FirstIndex()
lidx, _ := storage.LastIndex()
entries, _ := storage.Entries(fidx, lidx+1)
return &RaftLog{
storage: storage,
committed: hstate.Commit,
applied: snapshot.Metadata.Index,
stabled: lidx,
entries: append([]pb.Entry{{
Index: snapshot.Metadata.Index,
Term: snapshot.Metadata.Term,
}}, entries...),
// pendingSnapshot: , // waiting for complement
}
}
随后的函数都很好完成。注意RaftLog.entries[0]
是一个 dummy entry,这个 entry 仅仅起到定位的作用,即RaftLog.entries[i]
对应的 entry,其实际的 Index 值为i + RaftLog.entries[0].Index
。并且,RaftLog.Term()
的实现可以参考raft.storage.go
中MemoryStorage.Entries()
的实现。
// allEntries return all the entries not compacted.
// note, exclude any dummy entries from the return value.
// note, this is one of the test stub functions you need to implement.
func (l *RaftLog) allEntries() []pb.Entry {
// Your Code Here (2A).
return l.entries[1:]
}
// unstableEntries return all the unstable entries
func (l *RaftLog) unstableEntries() []pb.Entry {
// Your Code Here (2A).
return l.entries[l.stabled+1-l.entries[0].Index:]
}
// nextEnts returns all the committed but not applied entries
func (l *RaftLog) nextEnts() (ents []pb.Entry) {
// Your Code Here (2A).
return l.entries[l.applied+1-l.entries[0].Index : l.committed+1-l.entries[0].Index]
}
// LastIndex return the last index of the log entries
func (l *RaftLog) LastIndex() uint64 {
// Your Code Here (2A).
return l.entries[len(l.entries)-1].Index
}
// Term return the term of the entry in the given index
func (l *RaftLog) Term(i uint64) (uint64, error) {
// Your Code Here (2A).
offset := l.entries[0].Index
if i < offset {
return 0, ErrCompacted
}
if int(i-offset) >= len(l.entries) {
return 0, ErrUnavailable
}
return l.entries[i-offset].Term, nil
}
同时,自定义了一些新的函数,以方便代码编写过程。
// newly added: LastTerm returns the last term of the log entries
func (l *RaftLog) LastTerm() uint64 {
lastTerm, _ := l.Term(l.LastIndex())
return lastTerm
}
// newly added: AppendEntry appends an entry to the log
func (l *RaftLog) AppendEntry(e *pb.Entry) {
l.entries = append(l.entries, *e)
}
至此,Project2ab 也就完成了。
关于测试集
事实上,如果完成了上面的代码,测试集不能完全通过。问题在于下面的讨论:
Leader 更新自己的 commit index 值后,是立刻给 followers 发送 AppendEntry RPC 以告知其最新的 committed,还是等待下一次客户端请求到来时随新的 AppendEntry RPC 一起发送?
Project2ab 的测试集中,TestLeaderSyncFollowerLog2AB
和TestLogReplication2AB
的要求是前者,而TestLeaderCommitEntry2AB
的要求是后者。测试集本身自相矛盾,除非面向结果编程,否则无论怎么处理都无法通过全部测试集。Raft 原论文中采取的实现方式是后者,因此我也遵照原论文的实现方式,从而测试集中的TestLeaderSyncFollowerLog2AB
和TestLogReplication2AB
无法通过,期待 PingCAP 能够更新相应的测试逻辑。
而我认为从 tinykv 中学到东西是最重要的,我们不应拘泥于具体细节,两种实现方式都应是可接受的。