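Raft Algorithm: Leader Election
I. Overview of Leader Election
Raft uses a heartbeat mechanism to trigger Leader election. When a server starts, it is initialized as a Follower. The Leader periodically sends heartbeats to all Followers. If a Follower receives no heartbeat from the Leader within its election timeout, it starts a Leader election. The election timeout is randomized per node so that nodes rarely time out at the same moment.
To start an election, the Follower increments its current term and becomes a Candidate. It first votes for itself and then sends RequestVote RPCs to the other servers in the cluster. There are three possible outcomes:
1. It wins a majority of the votes and becomes the Leader.
2. It receives a message from a Leader, which means another server has already won the election, and it returns to the Follower role.
3. No server wins a majority of the votes (a split vote), so the election fails. Once the election timeout elapses again, a new election starts.
Once elected, the Leader maintains its authority by periodically sending heartbeats to all Followers. If a Follower receives no heartbeat from the Leader for a full election timeout, it assumes the Leader may have failed and starts a new election.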
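The timer behaviour described above can be sketched as a small simulation. This is a toy model under stated assumptions, not etcd's code: the `Node` type, its fields, and the choice of a timeout in the range [base, 2*base-1] ticks are all illustrative.

```go
package main

import (
	"fmt"
	"math/rand"
)

// Role is a simplified node role; the names are illustrative, not etcd's.
type Role int

const (
	Follower Role = iota
	Candidate
)

// Node is a toy model of one server's election timer (hypothetical type).
type Node struct {
	role    Role
	term    uint64
	base    int // base election timeout in ticks, must be > 0
	limit   int // randomized timeout in [base, 2*base-1]
	elapsed int // ticks since the last heartbeat
	rng     *rand.Rand
}

// NewNode starts a node as a Follower with a randomized election timeout.
func NewNode(base int, seed int64) *Node {
	n := &Node{role: Follower, base: base, rng: rand.New(rand.NewSource(seed))}
	n.resetTimer()
	return n
}

// resetTimer clears the elapsed ticks and picks a fresh randomized timeout.
func (n *Node) resetTimer() {
	n.elapsed = 0
	n.limit = n.base + n.rng.Intn(n.base)
}

// Heartbeat records that a heartbeat from the Leader arrived.
func (n *Node) Heartbeat() { n.elapsed = 0 }

// Tick advances the logical clock by one tick and reports whether
// the node started an election on this tick.
func (n *Node) Tick() bool {
	n.elapsed++
	if n.elapsed < n.limit {
		return false
	}
	n.term++ // every election begins a new term
	n.role = Candidate
	n.resetTimer()
	return true
}

func main() {
	n := NewNode(10, 42)
	ticks := 1
	for !n.Tick() {
		ticks++
	}
	fmt.Printf("election started after %d ticks, term=%d\n", ticks, n.term)
}
```

Because each node draws its own timeout, two Followers that lose the Leader at the same time usually become Candidates at different ticks, which makes a split vote less likely.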
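II. Leader Election in etcd's raft Module
Most Raft implementations have a monolithic design that includes storage handling, message serialization, and network transport. The raft module in etcd follows a minimalistic design philosophy and implements only the core of the Raft algorithm.
First, look at the entry point that starts a node in the raft module:
// File: raft/node.go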
// StartNode returns a new Node given configuration and a list of raft peers.
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
//
// Peers must not be zero length; call RestartNode in that case.
func StartNode(c *Config, peers []Peer) Node {
if len(peers) == 0 {
panic("no peers given; use RestartNode instead")
}
rn, err := NewRawNode(c) // ref-1: create the initial node (a RawNode)
if err != nil {
panic(err)
}
rn.Bootstrap(peers) // ref-2: bootstrap the node with the initial peers
n := newNode(rn)
go n.run() // ref-3: start the node's event loop in its own goroutine
return &n
}
The code at ref-1 creates the initial node. The details are as follows:
// File: raft/rawnode.go
// NewRawNode instantiates a RawNode from the given configuration.
//
// See Bootstrap() for bootstrapping an initial state; this replaces the former
// 'peers' argument to this method (with identical behavior). However, it is
// recommended that instead of calling Bootstrap, applications bootstrap their
// state manually by setting up a Storage that has a first index > 1 and which
// stores the desired ConfState as its InitialState.
func NewRawNode(config *Config) (*RawNode, error) {
r := newRaft(config) // ref-4: create the raft struct, which holds this node's Raft state (term, vote, log, role)
rn := &RawNode{
raft: r,
}
rn.prevSoftSt = r.softState()
rn.prevHardSt = r.hardState()
return rn, nil
}
The code at ref-4 creates the raft instance. The details are as follows:
// File: raft/raft.go
func newRaft(c *Config) *raft {
// ... (code omitted)
r.becomeFollower(r.Term, None) // ref-5: switch this node's role to Follower
var nodesStrs []string
for _, n := range r.prs.VoterNodes() {
nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
}
r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
return r
}
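A node can be a Follower, a Candidate, or a Leader (etcd also has a PreCandidate role, used when pre-vote is enabled). The code at ref-5 switches the role to Follower and passes None as the second argument, which is the ID of the known leader. In other words, the node starts out as a Follower that does not know of any leader yet. This is the initial state.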
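The role changes can be sketched with a toy state struct. This is a simplified model, not etcd's implementation: `toyRaft`, its fields, and the rule that the vote is cleared whenever the term changes are assumptions made for illustration. Only the call shape `becomeFollower(term, lead)` is taken from the excerpts.

```go
package main

import "fmt"

// none is a placeholder ID meaning "no node", like None in the excerpts.
const none uint64 = 0

type role string

const (
	follower  role = "follower"
	candidate role = "candidate"
	leader    role = "leader"
)

// toyRaft keeps only the fields needed to illustrate role changes (hypothetical type).
type toyRaft struct {
	role role
	term uint64
	vote uint64 // the node we voted for in the current term
	lead uint64 // the leader we currently know of
}

// becomeFollower mirrors the call shape becomeFollower(term, lead).
func (r *toyRaft) becomeFollower(term, lead uint64) {
	if r.term != term {
		r.term = term
		r.vote = none // assumption: a vote is only valid within one term
	}
	r.lead = lead
	r.role = follower
}

// becomeCandidate starts an election: new term, vote for self, no known leader.
func (r *toyRaft) becomeCandidate(self uint64) {
	r.term++
	r.vote = self
	r.lead = none
	r.role = candidate
}

func main() {
	r := &toyRaft{}
	r.becomeFollower(0, none) // initial state: follower without a leader
	fmt.Printf("%+v\n", *r)
	r.becomeCandidate(1) // election timeout fired on node 1
	fmt.Printf("%+v\n", *r)
	r.becomeFollower(2, 3) // a message from leader 3 at term 2 arrived
	fmt.Printf("%+v\n", *r)
}
```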
Next, look at the code at ref-2 to see how the initial node is bootstrapped:
// File: raft/bootstrap.go
// Bootstrap initializes the RawNode for first use by appending configuration
// changes for the supplied peers. This method returns an error if the Storage
// is nonempty.
//
// It is recommended that instead of calling this method, applications bootstrap
// their state manually by setting up a Storage that has a first index > 1 and
// which stores the desired ConfState as its InitialState.
func (rn *RawNode) Bootstrap(peers []Peer) error {
if len(peers) == 0 {
return errors.New("must provide at least one peer to Bootstrap")
}
lastIndex, err := rn.raft.raftLog.storage.LastIndex()
if err != nil {
return err
}
if lastIndex != 0 {
return errors.New("can't bootstrap a nonempty Storage")
}
// We've faked out initial entries above, but nothing has been
// persisted. Start with an empty HardState (thus the first Ready will
// emit a HardState update for the app to persist).
rn.prevHardSt = emptyState
// TODO(tbg): remove StartNode and give the application the right tools to
// bootstrap the initial membership in a cleaner way.
rn.raft.becomeFollower(1, None) // ref-6: become a follower at term 1
ents := make([]pb.Entry, len(peers))
for i, peer := range peers {
cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
data, err := cc.Marshal()
if err != nil {
return err
}
ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
}
rn.raft.raftLog.append(ents...)
// Now apply them, mainly so that the application can call Campaign
// immediately after StartNode in tests. Note that these nodes will
// be added to raft twice: here and when the application's Ready
// loop calls ApplyConfChange. The calls to addNode must come after
// all calls to raftLog.append so progress.next is set after these
// bootstrapping entries (it is an error if we try to append these
// entries since they have already been committed).
// We do not set raftLog.applied so the application will be able
// to observe all conf changes via Ready.CommittedEntries.
//
// TODO(bdarnell): These entries are still unstable; do we need to preserve
// the invariant that committed < unstable?
rn.raft.raftLog.committed = uint64(len(ents))
for _, peer := range peers {
rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}.AsV2())
}
return nil
}
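The code at ref-6 changes the role to follower. Note that the term passed is 1, which matches the Term: 1 assigned to the bootstrap entries appended right after it.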
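The shape of the bootstrap log can be sketched as follows. This is a minimal model that only mirrors the indexing visible in the excerpt (one entry per peer, Term 1, Index i+1, commit index equal to the number of entries). The `entry`, `toyLog`, and `bootstrap` names are hypothetical, and real entries carry a marshalled ConfChange rather than a bare node ID.

```go
package main

import (
	"errors"
	"fmt"
)

// entry is a stripped-down log entry (hypothetical type).
type entry struct {
	Term   uint64
	Index  uint64
	NodeID uint64 // stands in for the marshalled ConfChangeAddNode payload
}

// toyLog is a minimal in-memory log with a commit index (hypothetical type).
type toyLog struct {
	entries   []entry
	committed uint64
}

// bootstrap appends one "add node" entry per peer and marks them all committed.
func bootstrap(l *toyLog, peers []uint64) error {
	if len(peers) == 0 {
		return errors.New("must provide at least one peer")
	}
	if len(l.entries) != 0 {
		return errors.New("can't bootstrap a nonempty log")
	}
	for i, id := range peers {
		l.entries = append(l.entries, entry{Term: 1, Index: uint64(i + 1), NodeID: id})
	}
	l.committed = uint64(len(l.entries))
	return nil
}

func main() {
	l := &toyLog{}
	if err := bootstrap(l, []uint64{1, 2, 3}); err != nil {
		fmt.Println("bootstrap failed:", err)
		return
	}
	for _, e := range l.entries {
		fmt.Printf("term=%d index=%d add node %d\n", e.Term, e.Index, e.NodeID)
	}
	fmt.Println("committed:", l.committed)
}
```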
Next, look at the code at ref-3 to see how the node runs:
// File: raft/node.go
func (n *node) run() {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
var rd Ready
r := n.rn.raft
lead := None
for {
if advancec != nil {
readyc = nil
} else if n.rn.HasReady() {
// Populate a Ready. Note that this Ready is not guaranteed to
// actually be handled. We will arm readyc, but there's no guarantee
// that we will actually send on it. It's possible that we will
// service another channel instead, loop around, and then populate
// the Ready again. We could instead force the previous Ready to be
// handled first, but it's generally good to emit larger Readys plus
// it simplifies testing (by emitting less frequently and more
// predictably).
rd = n.rn.readyWithoutAccept()
readyc = n.readyc
}
if lead != r.lead { // ref-7: check whether the known leader has changed
if r.hasLeader() { // a leader is currently known
if lead == None { // no leader was known before, so one has just been elected
r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
} else { // the leader changed from one node to another
r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
}
propc = n.propc
} else { // this node no longer knows of a leader
r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
propc = nil
}
lead = r.lead
}
select {
// TODO: maybe buffer the config propose if there exists one (the way
// described in raft dissertation)
// Currently it is dropped in Step silently.
case pm := <-propc:
m := pm.m // the proposal message received from propc
m.From = r.id
err := r.Step(m) // ref-8
if pm.result != nil {
pm.result <- err
close(pm.result)
}
case m := <-n.recvc:
// filter out response message from unknown From.
if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {
r.Step(m) // ref-9
}
case cc := <-n.confc:
_, okBefore := r.prs.Progress[r.id]
cs := r.applyConfChange(cc)
// If the node was removed, block incoming proposals. Note that we
// only do this if the node was in the config before. Nodes may be
// a member of the group without knowing this (when they're catching
// up on the log and don't have the latest config) and we don't want
// to block the proposal channel in that case.
//
// NB: propc is reset when the leader changes, which, if we learn
// about it, sort of implies that we got readded, maybe? This isn't
// very sound and likely has bugs.
if _, okAfter := r.prs.Progress[r.id]; okBefore && !okAfter {
var found bool
outer:
for _, sl := range [][]uint64{cs.Voters, cs.VotersOutgoing} {
for _, id := range sl {
if id == r.id {
found = true
break outer
}
}
}
if !found {
propc = nil
}
}
select {
case n.confstatec <- cs:
case <-n.done:
}
case <-n.tickc:
n.rn.Tick()
case readyc <- rd:
n.rn.acceptReady(rd)
advancec = n.advancec
case <-advancec:
n.rn.Advance(rd)
rd = Ready{}
advancec = nil
case c := <-n.status:
c <- getStatus(r)
case <-n.stop:
close(n.done)
return
}
}
}
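The loop above enables and disables proposals by assigning either `n.propc` or `nil` to its local `propc`. This relies on a Go language rule: receiving from a nil channel blocks forever, so a `select` case on a nil channel is never chosen. The sketch below isolates that trick. The `poll` helper and the channel names are hypothetical, and a `default` branch is added only so the example never blocks.

```go
package main

import "fmt"

// poll makes one non-blocking pass over the two channels, in the spirit of
// a single iteration of the select in run(). A nil propc disables its case.
func poll(propc <-chan string, tickc <-chan struct{}) string {
	select {
	case p := <-propc:
		return "proposal: " + p
	case <-tickc:
		return "tick"
	default:
		return "idle"
	}
}

func main() {
	proposals := make(chan string, 1)
	ticks := make(chan struct{}, 1)
	proposals <- "put k=v" // a client proposal is already queued

	// No leader is known: the local propc stays nil, so the queued
	// proposal is left in the channel.
	var propc chan string
	fmt.Println(poll(propc, ticks)) // prints "idle"

	// A leader is known: point propc at the real channel again.
	propc = proposals
	fmt.Println(poll(propc, ticks)) // prints "proposal: put k=v"
}
```

The benefit of this design is that the `select` statement itself never changes. Back-pressure on proposers is applied simply by swapping a channel variable.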
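Note the r.Step(m) calls at ref-8 and ref-9. Step is where every message is processed, including vote requests and vote responses. The function is as follows:
// File: raft/raft.go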
func (r *raft) Step(m pb.Message) error {
// Handle the message term, which may result in our stepping down to a follower.
switch { // this switch handles the term carried by the message
case m.Term == 0:
// local message
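case m.Term > r.Term:
// With checkQuorum enabled, a vote request that arrives within the election timeout of
// hearing from a current leader (the lease) is ignored: we neither update our term nor
// grant our vote, unless the request is forced by a leadership transfer.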
if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
force := bytes.Equal(m.Context, []byte(campaignTransfer))
inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
if !force && inLease {
// If a server receives a RequestVote request within the minimum election timeout
// of hearing from a current leader, it does not update its term or grant its vote
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
return nil
}
}
switch {
case m.Type == pb.MsgPreVote:
// Never change our term in response to a PreVote
case m.Type == pb.MsgPreVoteResp && !m.Reject: // pre-vote granted: keep our term for now; a rejected pre-vote response falls through to default
// We send pre-vote requests with a term in our future. If the
// pre-vote is granted, we will increment our term when we get a
// quorum. If it is not, the term comes from the node that
// rejected our vote so we should become a follower at the new
// term.
default:
r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
r.id, r.Term, m.Type, m.From, m.Term)
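// Step down to follower at the higher term. The sender is recorded as leader
// only for message types that a leader sends (MsgApp, MsgHeartbeat, MsgSnap).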
if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
r.becomeFollower(m.Term, m.From)
} else {
r.becomeFollower(m.Term, None)
}
}
case m.Term < r.Term:
if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
// We have received messages from a leader at a lower term. It is possible
// that these messages were simply delayed in the network, but this could
// also mean that this node has advanced its term number during a network
// partition, and it is now unable to either win an election or to rejoin
// the majority on the old term. If checkQuorum is false, this will be
// handled by incrementing term numbers in response to MsgVote with a
// higher term, but if checkQuorum is true we may not advance the term on
// MsgVote and must generate other messages to advance the term. The net
// result of these two features is to minimize the disruption caused by
// nodes that have been removed from the cluster's configuration: a
// removed node will send MsgVotes (or MsgPreVotes) which will be ignored,
// but it will not receive MsgApp or MsgHeartbeat, so it will not create
// disruptive term increases, by notifying leader of this node's activeness.
// The above comments also true for Pre-Vote
//
// When follower gets isolated, it soon starts an election ending
// up with a higher term than leader, although it won't receive enough
// votes to win the election. When it regains connectivity, this response
// with "pb.MsgAppResp" of higher term would force leader to step down.
// However, this disruption is inevitable to free this stuck node with
// fresh election. This can be prevented with Pre-Vote phase.
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
} else if m.Type == pb.MsgPreVote {
// Before Pre-Vote enable, there may have candidate with higher term,
// but less log. After update to Pre-Vote, the cluster may deadlock if
// we drop messages with a lower term.
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true})
} else {
// ignore other cases
r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
r.id, r.Term, m.Type, m.From, m.Term)
}
return nil
}
switch m.Type { // this switch dispatches on the message type
case pb.MsgHup:
if r.preVote {
r.hup(campaignPreElection) // ref-10: start a pre-vote campaign
} else {
r.hup(campaignElection) // ref-11: start a regular election campaign
}
case pb.MsgVote, pb.MsgPreVote:
// We can vote if this is a repeat of a vote we've already cast...
canVote := r.Vote == m.From ||
// ...we haven't voted and we don't think there's a leader yet in this term...
(r.Vote == None && r.lead == None) ||
// ...or this is a PreVote for a future term...
(m.Type == pb.MsgPreVote && m.Term > r.Term)
// ...and we believe the candidate is up to date.
if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
// Note: it turns out that learners must be allowed to cast votes.
// This seems counter-intuitive but is necessary in the situation in which
// a learner has been promoted (i.e. is now a voter) but has not learned
// about this yet.
// For example, consider a group in which id=1 is a learner and id=2 and
// id=3 are voters. A configuration change promoting 1 can be committed on
// the quorum `{2,3}` without the config change being appended to the
// learner's log. If the leader (say 2) fails, there are de facto two
// voters remaining. Only 3 can win an election (due to its log containing
// all committed entries), but to do so it will need 1 to vote. But 1
// considers itself a learner and will continue to do so until 3 has
// stepped up as leader, replicates the conf change to 1, and 1 applies it.
// Ultimately, by receiving a request to vote, the learner realizes that
// the candidate believes it to be a voter, and that it should act
// accordingly. The candidate's config may be stale, too; but in that case
// it won't win the election, at least in the absence of the bug discussed
// in:
// https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263.
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
// When responding to Msg{Pre,}Vote messages we include the term
// from the message, not the local term. To see why, consider the
// case where a single node was previously partitioned away and
// its local term is now out of date. If we include the local term
// (recall that for pre-votes we don't update the local term), the
// (pre-)campaigning node on the other end will proceed to ignore
// the message (it ignores all out of date messages).
// The term in the original message and current local term are the
// same in the case of regular votes, but different for pre-votes.
r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
if m.Type == pb.MsgVote {
// Only record real votes.
r.electionElapsed = 0
r.Vote = m.From
}
} else {
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
}
default:
err := r.step(r, m)
if err != nil {
return err
}
}
return nil
}
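The vote decision in the MsgVote/MsgPreVote branch can be isolated into a small predicate. This sketch copies the three `canVote` conditions from the excerpt. The `voter` and `voteRequest` types are hypothetical, and `isUpToDate` is an assumption: it follows the usual Raft rule (compare the last log term first, then the last index), since the body of `raftLog.isUpToDate` is not shown above.

```go
package main

import "fmt"

// none is a placeholder ID meaning "no node", like None in the excerpts.
const none uint64 = 0

// voter holds the local state consulted when a vote request arrives (hypothetical type).
type voter struct {
	term        uint64
	vote        uint64 // whom we already voted for in this term
	lead        uint64 // the leader we currently know of
	lastLogTerm uint64
	lastIndex   uint64
}

// voteRequest models the fields of MsgVote / MsgPreVote used in the decision.
type voteRequest struct {
	from        uint64
	term        uint64
	preVote     bool
	lastLogTerm uint64
	lastIndex   uint64
}

// isUpToDate applies the usual Raft log comparison (assumed, see lead-in).
func (v voter) isUpToDate(index, logTerm uint64) bool {
	return logTerm > v.lastLogTerm || (logTerm == v.lastLogTerm && index >= v.lastIndex)
}

// grants reports whether the vote would be granted.
func (v voter) grants(req voteRequest) bool {
	canVote := v.vote == req.from || // repeat of a vote we already cast
		(v.vote == none && v.lead == none) || // no vote cast and no leader known
		(req.preVote && req.term > v.term) // pre-vote for a future term
	return canVote && v.isUpToDate(req.lastIndex, req.lastLogTerm)
}

func main() {
	v := voter{term: 2, vote: 2, lastLogTerm: 2, lastIndex: 5}
	vote := voteRequest{from: 3, term: 2, lastLogTerm: 2, lastIndex: 5}
	fmt.Println("second candidate in the same term:", v.grants(vote))

	pre := voteRequest{from: 3, term: 3, preVote: true, lastLogTerm: 2, lastIndex: 5}
	fmt.Println("pre-vote for the next term:", v.grants(pre))
}
```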
The hup function called at ref-10 and ref-11 is shown below:
// File: raft/raft.go
func (r *raft) hup(t CampaignType) {
if r.state == StateLeader {
r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
return
}
if !r.promotable() {
r.logger.Warningf("%x is unpromotable and can not campaign", r.id)
return
}
ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
if err != nil {
r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
}
if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
return
}
r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
r.campaign(t) // ref-12
}
The campaign function called at ref-12 is shown below:
// File: raft/raft.go
// campaign transitions the raft instance to candidate state. This must only be
// called after verifying that this is a legitimate transition.
func (r *raft) campaign(t CampaignType) {
if !r.promotable() {
// This path should not be hit (callers are supposed to check), but
// better safe than sorry.
r.logger.Warningf("%x is unpromotable; campaign() should have been called", r.id)
}
var term uint64
var voteMsg pb.MessageType
if t == campaignPreElection { // phase one: pre-vote
r.becomePreCandidate()
voteMsg = pb.MsgPreVote
// PreVote RPCs are sent for the next term before we've incremented r.Term.
term = r.Term + 1
} else { // the actual election (phase two when pre-vote is enabled)
r.becomeCandidate()
voteMsg = pb.MsgVote
term = r.Term
}
// Record our own vote and check whether it already forms a quorum.
if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
// We won the election after voting for ourselves (which must mean that
// this is a single-node cluster). Advance to the next state.
if t == campaignPreElection {
r.campaign(campaignElection)
} else {
r.becomeLeader()
}
return
}
var ids []uint64
{
idMap := r.prs.Voters.IDs()
ids = make([]uint64, 0, len(idMap))
for id := range idMap {
ids = append(ids, id)
}
sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
}
for _, id := range ids {
if id == r.id {
continue
}
r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)
var ctx []byte
if t == campaignTransfer {
ctx = []byte(t)
}
// Send the vote request.
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}
}
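Two details of campaign are worth isolating: how votes are counted, and which term goes into the outgoing request. The sketch below is a simplified majority tally, not etcd's quorum package. `tally` and `campaignTerms` are hypothetical helpers, and the assumption that a regular candidate increments its own term before sending follows from the overview and from the "before we've incremented r.Term" comment in the excerpt.

```go
package main

import "fmt"

type voteResult string

const (
	votePending voteResult = "pending"
	voteLost    voteResult = "lost"
	voteWon     voteResult = "won"
)

// tally counts the responses received so far. votes maps a voter ID to
// true (granted) or false (rejected); voters without an entry have not replied.
func tally(voters []uint64, votes map[uint64]bool) voteResult {
	granted, missing := 0, 0
	for _, id := range voters {
		g, ok := votes[id]
		switch {
		case !ok:
			missing++
		case g:
			granted++
		}
	}
	quorum := len(voters)/2 + 1
	switch {
	case granted >= quorum:
		return voteWon
	case granted+missing >= quorum:
		return votePending // still winnable if the missing voters grant
	default:
		return voteLost
	}
}

// campaignTerms returns the node's own term after starting the campaign and
// the term carried by the request. A pre-vote asks about the next term
// without changing the local term.
func campaignTerms(currentTerm uint64, preVote bool) (localTerm, msgTerm uint64) {
	if preVote {
		return currentTerm, currentTerm + 1
	}
	return currentTerm + 1, currentTerm + 1
}

func main() {
	voters := []uint64{1, 2, 3}
	votes := map[uint64]bool{1: true} // the candidate's own vote
	fmt.Println(tally(voters, votes))
	votes[2] = true // one peer grants its vote
	fmt.Println(tally(voters, votes))
	// A single-node cluster wins with its own vote alone.
	fmt.Println(tally([]uint64{1}, map[uint64]bool{1: true}))
}
```

The single-node case is why campaign can return early after polling its own vote: with one voter, the self-vote is already a majority.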