文章目录
- 前言
- 一、项目大纲
- 二、Raft模块
- 1.Raft介绍
- 2.大致内容
- Leader与选举
- 日志同步、心跳
- raft日志的两个特点
- 3.主要流程
- 1. raft类的定义
- 2.启动初始化
- 3.竞选leader
- electionTimeOutTicker:
- doElection
- sendRequestVote
- RequestVote
- 4.日志复制、心跳
- leaderHearBeatTicker
- doHeartBeat
- sendAppendEntries
- AppendEntries
- 5.日志寻找匹配加速
- 6.其它
- 待续
前言
构建一种基于Raft一致性算法的分布式键值存储数据库,以确保数据的一致性、可用性和分区容错性
一、项目大纲
- raft节点:raft算法实现的核心层,负责与其他机器的raft节点沟通,达到 分布式共识 的目的。
- raftServer:负责raft节点与k-v数据库中间的协调服务;负责持久化k-v数据库的数据(可选)。
- 上层状态机(k-v数据库):负责数据存储。
- 持久层:负责相关数据的落盘,对于raft节点,根据共识算法要求,必须对一些关键数据进行落盘处理,以保证节点宕机后重启程序可以恢复关键数据;对于raftServer,可能会有一些k-v数据库的东西需要落盘持久化。
- RPC通信:在 领导者选举、日志复制、数据查询、心跳等多个Raft重要过程中提供多节点快速简单的通信能力。
二、Raft模块
1.Raft介绍
参考
文章: 两万字长文解析raft算法原理
视频: 解析分布式共识算法之Raft算法
本项目用Raft解决的问题:
- 一致性: 通过Raft算法确保数据的强一致性,使得系统在正常和异常情况下都能够提供一致的数据视图。
- 可用性: 通过分布式节点的复制和自动故障转移,实现高可用性,即使在部分节点故障的情况下,系统依然能够提供服务。
- 分区容错: 处理网络分区的情况,确保系统在分区恢复后能够自动合并数据一致性。
2.大致内容
详细的看上面参考网站
Leader与选举
Raft是一个强Leader 模型,可以粗暴理解成Leader负责统领follower,如果Leader出现故障,那么整个集群都会对外停止服务,直到选举出下一个Leader。
- 节点之间通过网络通信,其他节点(follower)如何知道leader出现故障?
leader会定时向集群中剩下的节点(follower)发送AppendEntry
(作为心跳,hearbeat )以通知自己仍然存活。
可以推知,如果follower在一段时间内没有接收leader发送的AppendEntry
,那么follower就会认为当前的leader 出现故障,从而发起选举。
判断心跳超时,可以用一个定时器和一个标志位来实现,每到定时时间检查这期间有无AppendEntry
即可。 - AppendEntry作用
- 心跳
- 携带日志entry及其辅助信息,以控制日志的同步和日志向状态机提交
- 通告leader的index和term等关键信息以便follower对比确认follower自己或者leader是否过期
- follower知道leader出现故障后如何选举出leader?
follower认为leader故障后只能通过:term增加,变成candidate,向其他节点发起RequestVoteRPC
申请其他follower的选票,过一段时间之后会发生如下情况:- 赢得选举,马上成为leader (此时term已经增加了)
- 发现有符合要求的leader,自己马上变成follower 了,这个符合要求包括:leader的term≥自己的term
- 一轮选举结束,无人变成leader,那么循环这个过程
- 为了防止在同一时间有太多的follower转变为candidate导致一直无法选出leader, Raft 采用了随机选举超时(randomized election timeouts)的机制, 每一个candidate 在发起选举后,都会随机化一个新的选举超时时间。
- 符合什么条件的节点可以成为leader?
也可以称为“选举限制”,有限制的目的是为了保证选举出的 leader 一定包含了整个集群中目前已 committed 的所有日志。
当 candidate 发送RequestVoteRPC
时,会带上最后一个 entry 的信息。 所有的节点收到该请求后,都会比对自己的日志,如果发现自己的日志更新一些,则会拒绝投票给该 candidate。
需要比较两个东西:最新日志entry的term和对应的index。index为日志entry在整个日志的索引。
if 两个节点最新日志entry的term不同
term大的日志更新
else
最新日志entry的index大的更新
end
这样的限制可以保证:成为leader的节点,其日志已经是多数节点中最完备的,即包含了整个集群的所有 committed entries。
日志同步、心跳
在RPC中 日志同步 和 心跳 是放在一个RPC函数AppendEntryRPC
中来实现的,原因为:
- 心跳RPC 可以看成是没有携带日志的特殊的日志同步RPC。
- 对于一个follower,如果leader认为其日志已经和自己匹配了,那么在
AppendEntryRPC
中不用携带日志(再携带日志属于无效信息了,但其他信息依然要携带),反之如果follower的日志只有部分匹配,那么就需要在AppendEntryRPC
中携带对应的日志。
- 为什么不直接让follower拷贝leader的日志 或者 leader发送全部的日志给follower?
leader发送日志的目的是让follower同步自己的日志,当然可以让leader发送自己全部的日志给follower,然后follower接收后就覆盖自己原有的日志,但是这样就会携带大量的无效的日志(因为这些日志follower本身就有)。
因此raft的方式是:先找到日志不匹配的那个点,然后只同步那个点之后的日志。 - leader如何知道follower的日志是否与自己完全匹配?
在AppendEntryRPC
中携带上 entry的index和对应的term(日志的term),可以通过比较最后一个日志的index和term来得出某个follower日志是否匹配。 - 如果发现不匹配,那么如何知道哪部分日志是匹配的,哪部分日志是不匹配的呢?
leader每次发送AppendEntryRPC
后,follower都会根据其entry的index和对应的term来判断某一个日志是否匹配。
在leader刚当选,会从最后一个日志开始判断是否匹配,如果匹配,那么后续发送AppendEntryRPC
就不需要携带日志entry了。
如果不匹配,那么下一次就发送 倒数第2个 日志entry的index和其对应的term来判断匹配,
如果还不匹配,那么依旧重复这个过程,直到遇到一个匹配的日志。
raft日志的两个特点
- 两个节点的日志中,有两个 entry 拥有相同的 index 和 term,那么它们一定记录了相同的内容/操作,即两个日志匹配
- 两个节点的日志中,有两个 entry 拥有相同的 index 和 term,那么它们前面的日志entry也相同
- 如何保证?
- 保证第一点:仅有 leader 可以生成 entry,保证一致性
- 保证第二点:leader 在通过
AppendEntriesRPC
和 follower 通讯时,除了带上自己的term等信息外,还会带上entry的index和对应的term等信息,follower在接收到后通过对比就可以知道自己与leader的日志是否匹配,不匹配则拒绝请求。
leader发现follower拒绝后就知道entry不匹配,那么下一次就会尝试匹配前一个entry,直到遇到一个entry匹配,并将不匹配的entry给删除(覆盖)。
3.主要流程
1. raft类的定义
class Raft :
{
private:
std::mutex m_mtx;
std::vector<std::shared_ptr< RaftRpc >> m_peers; //需要与其他raft节点通信,这里保存与其他结点通信的rpc入口
std::shared_ptr<Persister> m_persister; //持久化层,负责raft数据的持久化
int m_me; //raft是以集群启动,这个用来标识自己的的编号
int m_currentTerm; //记录当前的term
int m_votedFor; //记录当前term给谁投票过
std::vector<mprrpc:: LogEntry> m_logs; 日志条目数组,包含了状态机要执行的指令集,以及收到领导时的任期号
// 这两个状态所有结点都在维护,易失
int m_commitIndex;
int m_lastApplied; // 已经汇报给状态机(上层应用)的log 的index
// 这两个状态是由leader来维护,易失 ,这两个部分在内容补充的部分也会再讲解
// 这两个状态的下标1开始,因为通常commitIndex和lastApplied从0开始,应该是一个无效的index,因此下标从1开始
std::vector<int> m_nextIndex; //领导者使用 m_nextIndex 来确定需要发送给追随者的下一批日志条目。
std::vector<int> m_matchIndex; //追随者使用 m_matchIndex 来记录已经成功复制的日志条目,并向领导者发送确认信息。
enum Status
{
Follower,
Candidate,
Leader
};
// 保存当前身份
Status m_status;
std::shared_ptr<LockQueue<ApplyMsg>> applyChan; // client从这里取日志,client与raft通信的接口
// ApplyMsgQueue chan ApplyMsg // raft内部使用的chan,applyChan是用于和服务层交互,最后好像没用上
// 选举超时
std::chrono::_V2::system_clock::time_point m_lastResetElectionTime;
// 心跳超时,用于leader
std::chrono::_V2::system_clock::time_point m_lastResetHearBeatTime;
// 用于传入快照点
// 储存了快照中的最后一个日志的Index和Term
int m_lastSnapshotIncludeIndex;
int m_lastSnapshotIncludeTerm;
public:
void AppendEntries1(const mprrpc::AppendEntriesArgs *args, mprrpc::AppendEntriesReply *reply); //日志同步 + 心跳 rpc ,重点关注
void applierTicker(); //定期向状态机写入日志,非重点函数
bool CondInstallSnapshot(int lastIncludedTerm, int lastIncludedIndex, std::string snapshot); //快照相关,非重点
void doElection(); //发起选举
void doHeartBeat(); //leader定时发起心跳
// 每隔一段时间检查睡眠时间内有没有重置定时器,没有则说明超时了
// 如果有则设置合适睡眠时间:睡眠到重置时间+超时时间
void electionTimeOutTicker(); //监控是否该发起选举了
std::vector<ApplyMsg> getApplyLogs();
int getNewCommandIndex();
void getPrevLogInfo(int server, int *preIndex, int *preTerm);
void GetState(int *term, bool *isLeader); //看当前节点是否是leader
void InstallSnapshot( const mprrpc::InstallSnapshotRequest *args, mprrpc::InstallSnapshotResponse *reply);
void leaderHearBeatTicker(); //检查是否需要发起心跳(leader)
void leaderSendSnapShot(int server);
void leaderUpdateCommitIndex(); //leader更新commitIndex
bool matchLog(int logIndex, int logTerm); //对应Index的日志是否匹配,只需要Index和Term就可以知道是否匹配
void persist(); //持久化
void RequestVote(const mprrpc::RequestVoteArgs *args, mprrpc::RequestVoteReply *reply); //变成candidate之后需要让其他结点给自己投票
bool UpToDate(int index, int term); //判断当前节点是否含有最新的日志
int getLastLogIndex();
void getLastLogIndexAndTerm(int *lastLogIndex, int *lastLogTerm);
int getLogTermFromLogIndex(int logIndex);
int GetRaftStateSize();
int getSlicesIndexFromLogIndex(int logIndex); //设计快照之后logIndex不能与在日志中的数组下标相等了,根据logIndex找到其在日志数组中的位置
bool sendRequestVote(int server , std::shared_ptr<mprrpc::RequestVoteArgs> args , std::shared_ptr<mprrpc::RequestVoteReply> reply, std::shared_ptr<int> votedNum) ; // 请求其他结点的投票
bool sendAppendEntries(int server ,std::shared_ptr<mprrpc::AppendEntriesArgs> args , std::shared_ptr<mprrpc::AppendEntriesReply> reply , std::shared_ptr<int> appendNums ) ; //Leader发送心跳后,对心跳的回复进行对应的处理
//rf.applyChan <- msg //不拿锁执行 可以单独创建一个线程执行,但是为了同意使用std:thread ,避免使用pthread_create,因此专门写一个函数来执行
void pushMsgToKvServer(ApplyMsg msg); //给上层的kvserver层发送消息
void readPersist(std::string data);
std::string persistData();
void Start(Op command,int* newLogIndex,int* newLogTerm,bool* isLeader ) ; // 发布发来一个新日志
// 即kv-server主动发起,请求raft(持久层)保存snapshot里面的数据,index是用来表示snapshot快照执行到了哪条命令
void Snapshot(int index , std::string snapshot );
public:
void init(std::vector<std::shared_ptr< RaftRpc >> peers,int me,std::shared_ptr<Persister> persister,std::shared_ptr<LockQueue<ApplyMsg>> applyCh); //初始化
关键函数:
- Raft的主要流程:
- 领导选举(
sendRequestVote
,RequestVote
) - 日志同步、心跳(
sendAppendEntries
,AppendEntries
)
- 定时器的维护:
- Raft向状态机定时写入(
applierTicker
) - 心跳维护定时器(
leaderHearBeatTicker
) - 选举超时定时器(
electionTimeOutTicker
)
- 持久化相关:
- 哪些内容需要持久化,什么时候需要持久化(persist)
2.启动初始化
void Raft::init(std::vector<std::shared_ptr<RaftRpc>> peers, int me, std::shared_ptr<Persister> persister, std::shared_ptr<LockQueue<ApplyMsg>> applyCh) {
m_peers = peers; //与其他结点沟通的rpc类
m_persister = persister; //持久化类
m_me = me; //标记自己,毕竟不能给自己发送rpc吧
m_mtx.lock();
//applier
this->applyChan = applyCh; //与kv-server沟通
// rf.ApplyMsgQueue = make(chan ApplyMsg)
m_currentTerm = 0; //初始化term为0
m_status = Follower; //初始化身份为follower
m_commitIndex = 0;
m_lastApplied = 0;
m_logs.clear();
for (int i =0;i<m_peers.size();i++){
m_matchIndex.push_back(0);
m_nextIndex.push_back(0);
}
m_votedFor = -1; //当前term没有给其他人投过票就用-1表示
m_lastSnapshotIncludeIndex = 0;
m_lastSnapshotIncludeTerm = 0;
m_lastResetElectionTime = now();
m_lastResetHearBeatTime = now();
// initialize from state persisted before a crash
readPersist(m_persister->ReadRaftState());
if(m_lastSnapshotIncludeIndex > 0){
m_lastApplied = m_lastSnapshotIncludeIndex;
//rf.commitIndex = rf.lastSnapshotIncludeIndex 崩溃恢复不能读取commitIndex
}
m_mtx.unlock();
// start ticker 开始三个定时器
std::thread t(&Raft::leaderHearBeatTicker, this);
t.detach();
std::thread t2(&Raft::electionTimeOutTicker, this);
t2.detach();
std::thread t3(&Raft::applierTicker, this);
t3.detach();
}
从上面可以看到一共产生了三个定时器,分别维护:选举、日志同步和心跳、raft节点与kv-server的联系。相互之间是比较隔离的
3.竞选leader
在Raft算法中,每个节点(无论是追随者(follower)还是候选人(candidate))都有一个选举定时器。如果追随者在一定的时间内没有收到任何来自领导者或候选人的消息,它会认为当前没有有效的领导者,然后启动选举定时器。一旦定时器到期,追随者会转换为候选人状态,并开始新一轮的领导者选举。
electionTimeOutTicker
:负责查看是否该发起选举,如果该发起选举就执行doElection发起选举。doElection
:实际发起选举,构造需要发送的rpc,并多线程调用sendRequestVote处理rpc及其相应。sendRequestVote
:负责发送选举中的RPC,在发送完rpc后还需要负责接收并处理对端发送回来的响应。RequestVote
:接收别人发来的选举请求,主要检验是否要给对方投票。
electionTimeOutTicker:
选举定时器,选举超时由electionTimeOutTicker
维护。
void Raft::electionTimeOutTicker() {
// Check if a Leader election should be started.
while (true) {
m_mtx.lock();
auto nowTime = now(); //睡眠前记录时间
auto suitableSleepTime = getRandomizedElectionTimeout() + m_lastResetElectionTime - nowTime;
m_mtx.unlock();
if (suitableSleepTime.count() > 1) {
std::this_thread::sleep_for(suitableSleepTime);
}
if ((m_lastResetElectionTime - nowTime).count() > 0) { //说明睡眠的这段时间有重置定时器,那么就没有超时,再次睡眠
continue;
}
doElection();
}
}
在死循环中,
首先计算距离上次重置选举计时器的时间m_lastResetElectionTime
- nowTime
加上随机化的选举超时时间getRandomizedElectionTimeout
,
计算得到距离下一次超时应该睡眠的时间suitableSleepTime
,然后线程根据这个时间决定是否睡眠。
若超时时间未到,线程进入睡眠状态,若在此期间选举计时器被重置,则继续循环。
若超时时间已到,调用doElection()
函数启动领导者选举过程。
随机化的选举超时时间是为了避免多个追随者几乎同时成为候选人,导致选举失败
doElection
启动领导者选举过程
void Raft::doElection() {
lock_guard<mutex> g(m_mtx); //c11新特性,使用raii避免死锁
if (m_status != Leader) {
DPrintf("[ ticker-func-rf(%d) ] 选举定时器到期且不是leader,开始选举 \n", m_me);
//当选举的时候定时器超时就必须重新选举,不然没有选票就会一直卡住
//重竞选超时,term也会增加的
m_status = Candidate;
///开始新一轮的选举
m_currentTerm += 1; //无论是刚开始竞选,还是超时重新竞选,term都要增加
m_votedFor = m_me; //即是自己给自己投票,也避免candidate给同辈的candidate投
persist();
std::shared_ptr<int> votedNum = std::make_shared<int>(1); // 使用 make_shared 函数初始化
// 重新设置定时器
m_lastResetElectionTime = now();
// 发布RequestVote RPC
for (int i = 0; i < m_peers.size(); i++) {
if (i == m_me) {
continue;
}
int lastLogIndex = -1, lastLogTerm = -1;
getLastLogIndexAndTerm(&lastLogIndex, &lastLogTerm);//获取最后一个log的term和下标,以添加到RPC的发送
//初始化发送参数
std::shared_ptr<mprrpc::RequestVoteArgs> requestVoteArgs = std::make_shared<mprrpc::RequestVoteArgs>();
requestVoteArgs->set_term(m_currentTerm);
requestVoteArgs->set_candidateid(m_me);
requestVoteArgs->set_lastlogindex(lastLogIndex);
requestVoteArgs->set_lastlogterm(lastLogTerm);
std::shared_ptr<mprrpc::RequestVoteReply> requestVoteReply = std::make_shared<mprrpc::RequestVoteReply>();
//使用匿名函数执行避免其拿到锁
std::thread t(&Raft::sendRequestVote, this, i, requestVoteArgs, requestVoteReply,
votedNum); // 创建新线程并执行函数,并传递参数
t.detach();
}
}
}
sendRequestVote
根据调用RequestVote
得到的reply响应结果,对发起投票的候选者状态进行更新
bool Raft::sendRequestVote(int server, std::shared_ptr<mprrpc::RequestVoteArgs> args, std::shared_ptr<mprrpc::RequestVoteReply> reply,
std::shared_ptr<int> votedNum) {
bool ok = m_peers[server]->RequestVote(args.get(),reply.get());
if (!ok) {
return ok;//rpc通信失败就立即返回,避免资源消耗
}
lock_guard<mutex> lg(m_mtx);
if(reply->term() > m_currentTerm){
//回复的term比自己大,说明自己落后了,那么就更新自己的状态并且退出
m_status = Follower; //三变:身份,term,和投票
m_currentTerm = reply->term();
m_votedFor = -1; //term更新了,那么这个term自己肯定没投过票,为-1
persist(); //持久化
return true;
} else if ( reply->term() < m_currentTerm ) {
//回复的term比自己的term小,不应该出现这种情况
return true;
}
if(!reply->votegranted()){ //这个节点因为某些原因没给自己投票,没啥好说的,结束本函数
return true;
}
//给自己投票了
*votedNum = *votedNum + 1; //voteNum多一个
if (*votedNum >= m_peers.size()/2+1) {
//变成leader
*votedNum = 0; //重置voteDNum,如果不重置,那么就会变成leader很多次,是没有必要的,甚至是错误的!!!
// 第一次变成leader,初始化状态和nextIndex、matchIndex
m_status = Leader;
int lastLogIndex = getLastLogIndex();
for (int i = 0; i <m_nextIndex.size() ; i++) {
m_nextIndex[i] = lastLogIndex + 1 ;//有效下标从1开始,因此要+1
m_matchIndex[i] = 0; //每换一个领导都是从0开始,见论文的fig2
}
std::thread t(&Raft::doHeartBeat, this); //马上向其他节点宣告自己就是leader
t.detach();
persist();
}
return true;
}
RequestVote
得到投票请求后,节点根据传递来的信息进行判断是否对其投票,构造reply返回值
void Raft::RequestVote( const mprrpc::RequestVoteArgs *args, mprrpc::RequestVoteReply *reply) {
lock_guard<mutex> lg(m_mtx);
Defer ec1([this]() -> void { //应该先持久化,再撤销lock,因此这个写在lock后面
this->persist();
});
//对args的term的三种情况分别进行处理,大于小于等于自己的term都是不同的处理
//reason: 出现网络分区,该竞选者已经OutOfDate(过时)
if (args->term() < m_currentTerm) {
reply->set_term(m_currentTerm);
reply->set_votestate(Expire);
reply->set_votegranted(false);
return;
}
//论文fig2:右下角,如果任何时候rpc请求或者响应的term大于自己的term,更新term,并变成follower
if (args->term() > m_currentTerm) {
m_status = Follower;
m_currentTerm = args->term();
m_votedFor = -1;
// 重置定时器:收到leader的ae,开始选举,透出票
//这时候更新了term之后,votedFor也要置为-1
}
// 现在节点任期都是相同的(任期小的也已经更新到新的args的term了)
// 要检查log的term和index是不是匹配的了
int lastLogTerm = getLastLogIndex();
//只有没投票,且candidate的日志的新的程度 ≥ 接受者的日志新的程度 才会授票
if (!UpToDate(args->lastlogindex(), args->lastlogterm())) {
//日志太旧了
reply->set_term(m_currentTerm);
reply->set_votestate(Voted);
reply->set_votegranted(false);
return;
}
// 当因为网络质量不好导致的请求丢失重发就有可能!!!!
// 因此需要避免重复投票
if (m_votedFor != -1 && m_votedFor != args->candidateid()) {
reply->set_term(m_currentTerm);
reply->set_votestate(Voted);
reply->set_votegranted(false);
return;
} else {
//同意投票
m_votedFor = args->candidateid();
m_lastResetElectionTime = now();//认为必须要在投出票的时候才重置定时器,
reply->set_term(m_currentTerm);
reply->set_votestate(Normal);
reply->set_votegranted(true);
return;
}
}
4.日志复制、心跳
leaderHearBeatTicker
:负责查看是否该发送心跳了,如果该发起就执行doHeartBeat
。doHeartBeat
:实际发送心跳,判断到底是构造需要发送的rpc,并多线程调用sendRequestVote
处理rpc及其相应。sendAppendEntries
:负责发送日志的RPC,在发送完rpc后还需要负责接收并处理对端发送回来的响应。leaderSendSnapShot
:负责发送快照的RPC,在发送完rpc后还需要负责接收并处理对端发送回来的响应。AppendEntries
:接收leader发来的日志请求,主要检验用于检查当前日志是否匹配并同步leader的日志到本机。InstallSnapshot
:接收leader发来的快照请求,同步快照到本机。
leaderHearBeatTicker
void Raft::leaderHearBeatTicker() {
while (true) {
auto nowTime = now();
m_mtx.lock();
auto suitableSleepTime = std::chrono::milliseconds(HeartBeatTimeout) + m_lastResetHearBeatTime - nowTime;
m_mtx.unlock();
if (suitableSleepTime.count() < 1) {
suitableSleepTime = std::chrono::milliseconds(1);
}
std::this_thread::sleep_for(suitableSleepTime);
if ((m_lastResetHearBeatTime - nowTime).count() > 0) { //说明睡眠的这段时间有重置定时器,那么就没有超时,再次睡眠
continue;
}
doHeartBeat();
}
}
其基本逻辑和选举定时器electionTimeOutTicker
一模一样
不一样之处在于设置的休眠时间不同,这里是根据HeartBeatTimeout
来设置,固定时间。
而electionTimeOutTicker
中是根据getRandomizedElectionTimeout()
设置,随机一个时间。
doHeartBeat
void Raft::doHeartBeat() {
std::lock_guard<mutex> g(m_mtx);
if (m_status == Leader) {
auto appendNums = std::make_shared<int>(1); //正确返回的节点的数量
//对Follower(除了自己外的所有节点发送AE)
for (int i = 0; i < m_peers.size(); i++) {
if(i == m_me){ //不对自己发送AE
continue;
}
//日志压缩加入后要判断是发送快照还是发送AE
if (m_nextIndex[i] <= m_lastSnapshotIncludeIndex) {
//应该发送的日志已经被压缩成快照,必须发送快照了
std::thread t(&Raft::leaderSendSnapShot, this, i);
t.detach();
continue;
}
//发送心跳,构造发送值
int preLogIndex = -1;
int PrevLogTerm = -1;
getPrevLogInfo(i, &preLogIndex, &PrevLogTerm); //获取本次发送的一系列日志的上一条日志的信息,以判断是否匹配
std::shared_ptr<mprrpc::AppendEntriesArgs> appendEntriesArgs = std::make_shared<mprrpc::AppendEntriesArgs>();
appendEntriesArgs->set_term(m_currentTerm);
appendEntriesArgs->set_leaderid(m_me);
appendEntriesArgs->set_prevlogindex(preLogIndex);
appendEntriesArgs->set_prevlogterm(PrevLogTerm);
appendEntriesArgs->clear_entries();
appendEntriesArgs->set_leadercommit(m_commitIndex);
// 作用是携带上prelogIndex的下一条日志及其之后的所有日志
//leader对每个节点发送的日志长短不一,但是都保证从prevIndex发送直到最后
if (preLogIndex != m_lastSnapshotIncludeIndex) {
for (int j = getSlicesIndexFromLogIndex(preLogIndex) + 1; j < m_logs.size(); ++j) {
mprrpc::LogEntry *sendEntryPtr = appendEntriesArgs->add_entries();
*sendEntryPtr = m_logs[j];
}
} else {
for (const auto& item: m_logs) {
mprrpc::LogEntry *sendEntryPtr = appendEntriesArgs->add_entries();
*sendEntryPtr = item;
}
}
int lastLogIndex = getLastLogIndex();
//初始化返回值
const std::shared_ptr<mprrpc::AppendEntriesReply> appendEntriesReply = std::make_shared<mprrpc::AppendEntriesReply>();
std::thread t(&Raft::sendAppendEntries, this, i, appendEntriesArgs, appendEntriesReply,
appendNums); // 创建新线程并执行b函数,并传递参数
t.detach();
}
m_lastResetHearBeatTime = now(); //leader发送心跳,重置心跳时间,
}
}
sendAppendEntries
AppendEntries
5.日志寻找匹配加速
6.其它
待续
代码如下(示例):