Raft分布式存储

news2025/1/21 6:38:36

文章目录

  • 前言
  • 一、项目大纲
  • 二、Raft模块
    • 1.Raft介绍
    • 2.大致内容
      • Leader与选举
      • 日志同步、心跳
      • raft日志的两个特点
    • 3.主要流程
      • 1. raft类的定义
      • 2.启动初始化
      • 3.竞选leader
        • electionTimeOutTicker:
        • doElection
        • sendRequestVote
        • RequestVote
      • 4.日志复制、心跳
        • leaderHearBeatTicker
        • doHeartBeat
        • sendAppendEntries
        • AppendEntries
      • 5.日志寻找匹配加速
      • 6.其它
  • 待续


前言

构建一种基于Raft一致性算法的分布式键值存储数据库,以确保数据的一致性、可用性和分区容错性

一、项目大纲

在这里插入图片描述

  1. raft节点:raft算法实现的核心层,负责与其他机器的raft节点沟通,达到 分布式共识 的目的。
  2. raftServer:负责raft节点与k-v数据库中间的协调服务;负责持久化k-v数据库的数据(可选)。
  3. 上层状态机(k-v数据库):负责数据存储。
  4. 持久层:负责相关数据的落盘,对于raft节点,根据共识算法要求,必须对一些关键数据进行落盘处理,以保证节点宕机后重启程序可以恢复关键数据;对于raftServer,可能会有一些k-v数据库的东西需要落盘持久化。
  5. RPC通信:在 领导者选举、日志复制、数据查询、心跳等多个Raft重要过程中提供多节点快速简单的通信能力。

二、Raft模块

1.Raft介绍

参考
文章: 两万字长文解析raft算法原理
视频: 解析分布式共识算法之Raft算法

本项目用Raft解决的问题

  • 一致性: 通过Raft算法确保数据的强一致性,使得系统在正常和异常情况下都能够提供一致的数据视图。
  • 可用性: 通过分布式节点的复制和自动故障转移,实现高可用性,即使在部分节点故障的情况下,系统依然能够提供服务。
  • 分区容错: 处理网络分区的情况,确保系统在分区恢复后能够自动合并数据一致性。

2.大致内容

详细的看上面参考网站

Leader与选举

Raft是一个强Leader 模型,可以粗暴理解成Leader负责统领follower,如果Leader出现故障,那么整个集群都会对外停止服务,直到选举出下一个Leader。

  1. 节点之间通过网络通信,其他节点(follower)如何知道leader出现故障?
    leader会定时向集群中剩下的节点(follower)发送AppendEntry(作为心跳,hearbeat )以通知自己仍然存活。
    可以推知,如果follower在一段时间内没有接收leader发送的AppendEntry,那么follower就会认为当前的leader 出现故障,从而发起选举。
    判断心跳超时,可以用一个定时器和一个标志位来实现,每到定时时间检查这期间有无AppendEntry 即可。
  2. AppendEntry作用
    • 心跳
    • 携带日志entry及其辅助信息,以控制日志的同步和日志向状态机提交
    • 通告leader的index和term等关键信息以便follower对比确认follower自己或者leader是否过期
  3. follower知道leader出现故障后如何选举出leader?
    follower认为leader故障后只能通过:term增加,变成candidate,向其他节点发起RequestVoteRPC申请其他follower的选票,过一段时间之后会发生如下情况:
    • 赢得选举,马上成为leader (此时term已经增加了)
    • 发现有符合要求的leader,自己马上变成follower 了,这个符合要求包括:leader的term≥自己的term
    • 一轮选举结束,无人变成leader,那么循环这个过程
  4. 为了防止在同一时间有太多的follower转变为candidate导致一直无法选出leader, Raft 采用了随机选举超时(randomized election timeouts)的机制, 每一个candidate 在发起选举后,都会随机化一个新的选举超时时间。
  5. 符合什么条件的节点可以成为leader?
    也可以称为“选举限制”,有限制的目的是为了保证选举出的 leader 一定包含了整个集群中目前已 committed 的所有日志。
    当 candidate 发送 RequestVoteRPC 时,会带上最后一个 entry 的信息。 所有的节点收到该请求后,都会比对自己的日志,如果发现自己的日志更新一些,则会拒绝投票给该 candidate。
    需要比较两个东西:最新日志entry的term和对应的index。index为日志entry在整个日志的索引。
if 两个节点最新日志entry的term不同
	term大的日志更新
else
	最新日志entry的index大的更新
end

这样的限制可以保证:成为leader的节点,其日志已经是多数节点中最完备的,即包含了整个集群的所有 committed entries。

日志同步、心跳

在RPC中 日志同步 和 心跳 是放在一个RPC函数AppendEntryRPC中来实现的,原因为:

  • 心跳RPC 可以看成是没有携带日志的特殊的日志同步RPC。
  • 对于一个follower,如果leader认为其日志已经和自己匹配了,那么在AppendEntryRPC中不用携带日志(再携带日志属于无效信息了,但其他信息依然要携带),反之如果follower的日志只有部分匹配,那么就需要在AppendEntryRPC中携带对应的日志。
  1. 为什么不直接让follower拷贝leader的日志 或者 leader发送全部的日志给follower?
    leader发送日志的目的是让follower同步自己的日志,当然可以让leader发送自己全部的日志给follower,然后follower接收后就覆盖自己原有的日志,但是这样就会携带大量的无效的日志(因为这些日志follower本身就有)。
    因此raft的方式是:先找到日志不匹配的那个点,然后只同步那个点之后的日志。
  2. leader如何知道follower的日志是否与自己完全匹配?
    AppendEntryRPC中携带上 entry的index和对应的term(日志的term),可以通过比较最后一个日志的index和term来得出某个follower日志是否匹配。
  3. 如果发现不匹配,那么如何知道哪部分日志是匹配的,哪部分日志是不匹配的呢?
    leader每次发送AppendEntryRPC后,follower都会根据其entry的index和对应的term来判断某一个日志是否匹配。
    在leader刚当选,会从最后一个日志开始判断是否匹配,如果匹配,那么后续发送AppendEntryRPC就不需要携带日志entry了。
    如果不匹配,那么下一次就发送 倒数第2个 日志entry的index和其对应的term来判断匹配,
    如果还不匹配,那么依旧重复这个过程,直到遇到一个匹配的日志。

raft日志的两个特点

  1. 两个节点的日志中,有两个 entry 拥有相同的 index 和 term,那么它们一定记录了相同的内容/操作,即两个日志匹配
  2. 两个节点的日志中,有两个 entry 拥有相同的 index 和 term,那么它们前面的日志entry也相同
  3. 如何保证?
  • 保证第一点:仅有 leader 可以生成 entry,保证一致性
  • 保证第二点:leader 在通过 AppendEntriesRPC 和 follower 通讯时,除了带上自己的term等信息外,还会带上entry的index和对应的term等信息,follower在接收到后通过对比就可以知道自己与leader的日志是否匹配,不匹配则拒绝请求。
    leader发现follower拒绝后就知道entry不匹配,那么下一次就会尝试匹配前一个entry,直到遇到一个entry匹配,并将不匹配的entry给删除(覆盖)。

3.主要流程

1. raft类的定义

class Raft :
{
private:
    std::mutex m_mtx;
    std::vector<std::shared_ptr< RaftRpc >> m_peers; //需要与其他raft节点通信,这里保存与其他结点通信的rpc入口
    std::shared_ptr<Persister> m_persister;   //持久化层,负责raft数据的持久化
    int m_me;             //raft是以集群启动,这个用来标识自己的的编号
    int m_currentTerm;    //记录当前的term
    int m_votedFor;       //记录当前term给谁投票过
    std::vector<mprrpc:: LogEntry> m_logs;  日志条目数组,包含了状态机要执行的指令集,以及收到领导时的任期号
    // 这两个状态所有结点都在维护,易失
    int m_commitIndex;
    int m_lastApplied; // 已经汇报给状态机(上层应用)的log 的index

    // 这两个状态是由leader来维护,易失 ,这两个部分在内容补充的部分也会再讲解
    // 这两个状态的下标1开始,因为通常commitIndex和lastApplied从0开始,应该是一个无效的index,因此下标从1开始
    std::vector<int> m_nextIndex; //领导者使用 m_nextIndex 来确定需要发送给追随者的下一批日志条目。
    std::vector<int> m_matchIndex; //追随者使用 m_matchIndex 来记录已经成功复制的日志条目,并向领导者发送确认信息。
    enum Status
    {
        Follower,
        Candidate,
        Leader
    };
    // 保存当前身份
    Status m_status;

    std::shared_ptr<LockQueue<ApplyMsg>> applyChan;     // client从这里取日志,client与raft通信的接口
    // ApplyMsgQueue chan ApplyMsg // raft内部使用的chan,applyChan是用于和服务层交互,最后好像没用上
	
    // 选举超时
    std::chrono::_V2::system_clock::time_point m_lastResetElectionTime;
    // 心跳超时,用于leader
    std::chrono::_V2::system_clock::time_point m_lastResetHearBeatTime;

    // 用于传入快照点
    // 储存了快照中的最后一个日志的Index和Term
    int m_lastSnapshotIncludeIndex;
    int m_lastSnapshotIncludeTerm;

public:
    
    void AppendEntries1(const mprrpc::AppendEntriesArgs *args, mprrpc::AppendEntriesReply *reply); //日志同步 + 心跳 rpc ,重点关注
    void applierTicker();     //定期向状态机写入日志,非重点函数
    
    bool CondInstallSnapshot(int lastIncludedTerm, int lastIncludedIndex, std::string snapshot);    //快照相关,非重点
    void doElection();    //发起选举
    void doHeartBeat();    //leader定时发起心跳
    // 每隔一段时间检查睡眠时间内有没有重置定时器,没有则说明超时了
// 如果有则设置合适睡眠时间:睡眠到重置时间+超时时间
    void electionTimeOutTicker();   //监控是否该发起选举了
    std::vector<ApplyMsg> getApplyLogs();
    int getNewCommandIndex();
    void getPrevLogInfo(int server, int *preIndex, int *preTerm);
    void GetState(int *term, bool *isLeader);  //看当前节点是否是leader
    void InstallSnapshot( const mprrpc::InstallSnapshotRequest *args, mprrpc::InstallSnapshotResponse *reply);  
    void leaderHearBeatTicker(); //检查是否需要发起心跳(leader)
    void leaderSendSnapShot(int server);  
    void leaderUpdateCommitIndex();  //leader更新commitIndex
    bool matchLog(int logIndex, int logTerm);  //对应Index的日志是否匹配,只需要Index和Term就可以知道是否匹配
    void persist();   //持久化
    void RequestVote(const mprrpc::RequestVoteArgs *args, mprrpc::RequestVoteReply *reply);    //变成candidate之后需要让其他结点给自己投票
    bool UpToDate(int index, int term);   //判断当前节点是否含有最新的日志
    int getLastLogIndex();
    void getLastLogIndexAndTerm(int *lastLogIndex, int *lastLogTerm);
    int getLogTermFromLogIndex(int logIndex);
    int GetRaftStateSize();
    int getSlicesIndexFromLogIndex(int logIndex);   //设计快照之后logIndex不能与在日志中的数组下标相等了,根据logIndex找到其在日志数组中的位置


    bool sendRequestVote(int server , std::shared_ptr<mprrpc::RequestVoteArgs> args ,  std::shared_ptr<mprrpc::RequestVoteReply> reply,   std::shared_ptr<int> votedNum) ; // 请求其他结点的投票
    bool sendAppendEntries(int server ,std::shared_ptr<mprrpc::AppendEntriesArgs> args , std::shared_ptr<mprrpc::AppendEntriesReply> reply , std::shared_ptr<int> appendNums ) ;  //Leader发送心跳后,对心跳的回复进行对应的处理


    //rf.applyChan <- msg //不拿锁执行  可以单独创建一个线程执行,但是为了同意使用std:thread ,避免使用pthread_create,因此专门写一个函数来执行
    void pushMsgToKvServer(ApplyMsg msg);  //给上层的kvserver层发送消息
    void readPersist(std::string data);    
    std::string persistData();
    void Start(Op command,int* newLogIndex,int* newLogTerm,bool* isLeader ) ;   // 发布发来一个新日志
// 即kv-server主动发起,请求raft(持久层)保存snapshot里面的数据,index是用来表示snapshot快照执行到了哪条命令
    void Snapshot(int index , std::string snapshot );

public:
    void init(std::vector<std::shared_ptr< RaftRpc >> peers,int me,std::shared_ptr<Persister> persister,std::shared_ptr<LockQueue<ApplyMsg>> applyCh);		//初始化

关键函数:

  1. Raft的主要流程:
  • 领导选举(sendRequestVoteRequestVote
  • 日志同步、心跳(sendAppendEntries AppendEntries
  1. 定时器的维护:
  • Raft向状态机定时写入(applierTicker
  • 心跳维护定时器(leaderHearBeatTicker
  • 选举超时定时器(electionTimeOutTicker
  1. 持久化相关:
  • 哪些内容需要持久化,什么时候需要持久化(persist)

2.启动初始化

void Raft::init(std::vector<std::shared_ptr<RaftRpc>> peers, int me, std::shared_ptr<Persister> persister, std::shared_ptr<LockQueue<ApplyMsg>> applyCh) {
    m_peers = peers;     //与其他结点沟通的rpc类
    m_persister = persister;   //持久化类
    m_me = me;    //标记自己,毕竟不能给自己发送rpc吧

    m_mtx.lock();

    //applier
    this->applyChan = applyCh;   //与kv-server沟通
//    rf.ApplyMsgQueue = make(chan ApplyMsg)
    m_currentTerm = 0;   //初始化term为0
    m_status = Follower;   //初始化身份为follower
    m_commitIndex = 0;  
    m_lastApplied = 0;
    m_logs.clear();
    for (int i =0;i<m_peers.size();i++){
        m_matchIndex.push_back(0);
        m_nextIndex.push_back(0);
    }
    m_votedFor = -1;    //当前term没有给其他人投过票就用-1表示

    m_lastSnapshotIncludeIndex = 0;
    m_lastSnapshotIncludeTerm = 0;
    m_lastResetElectionTime = now();
    m_lastResetHearBeatTime = now();

    // initialize from state persisted before a crash
    readPersist(m_persister->ReadRaftState());
    if(m_lastSnapshotIncludeIndex > 0){
        m_lastApplied = m_lastSnapshotIncludeIndex;
        //rf.commitIndex = rf.lastSnapshotIncludeIndex 崩溃恢复不能读取commitIndex
    }

    m_mtx.unlock();
    // start ticker  开始三个定时器
    std::thread t(&Raft::leaderHearBeatTicker, this);
    t.detach();

    std::thread t2(&Raft::electionTimeOutTicker, this);
    t2.detach();

    std::thread t3(&Raft::applierTicker, this);
    t3.detach();
}

从上面可以看到一共产生了三个定时器,分别维护:选举、日志同步和心跳、raft节点与kv-server的联系。相互之间是比较隔离的

3.竞选leader

在Raft算法中,每个节点(无论是追随者(follower)还是候选人(candidate))都有一个选举定时器。如果追随者在一定的时间内没有收到任何来自领导者或候选人的消息,它会认为当前没有有效的领导者,然后启动选举定时器。一旦定时器到期,追随者会转换为候选人状态,并开始新一轮的领导者选举。
在这里插入图片描述

  • electionTimeOutTicker:负责查看是否该发起选举,如果该发起选举就执行doElection发起选举。
  • doElection:实际发起选举,构造需要发送的rpc,并多线程调用sendRequestVote处理rpc及其相应。
  • sendRequestVote:负责发送选举中的RPC,在发送完rpc后还需要负责接收并处理对端发送回来的响应。
  • RequestVote:接收别人发来的选举请求,主要检验是否要给对方投票。
electionTimeOutTicker:

选举定时器,选举超时由electionTimeOutTicker 维护。

void Raft::electionTimeOutTicker() {
    // Check if a Leader election should be started.
    while (true) {
        m_mtx.lock();
        auto nowTime = now(); //睡眠前记录时间
        auto suitableSleepTime = getRandomizedElectionTimeout() + m_lastResetElectionTime - nowTime;
        m_mtx.unlock();
        if (suitableSleepTime.count() > 1) {
            std::this_thread::sleep_for(suitableSleepTime);
        }

        if ((m_lastResetElectionTime - nowTime).count() > 0) {  //说明睡眠的这段时间有重置定时器,那么就没有超时,再次睡眠
            continue;
        }
        doElection();
    }
}

在死循环中,
首先计算距离上次重置选举计时器的时间m_lastResetElectionTime - nowTime加上随机化的选举超时时间getRandomizedElectionTimeout
计算得到距离下一次超时应该睡眠的时间suitableSleepTime,然后线程根据这个时间决定是否睡眠。
若超时时间未到,线程进入睡眠状态,若在此期间选举计时器被重置,则继续循环。
若超时时间已到,调用doElection() 函数启动领导者选举过程。

随机化的选举超时时间是为了避免多个追随者几乎同时成为候选人,导致选举失败

doElection

启动领导者选举过程

void Raft::doElection() {
    lock_guard<mutex> g(m_mtx); //c11新特性,使用raii避免死锁

    if (m_status != Leader) {
        DPrintf("[       ticker-func-rf(%d)              ]  选举定时器到期且不是leader,开始选举 \n", m_me);
        //当选举的时候定时器超时就必须重新选举,不然没有选票就会一直卡住
        //重竞选超时,term也会增加的
        m_status = Candidate;
        ///开始新一轮的选举
        m_currentTerm += 1;  //无论是刚开始竞选,还是超时重新竞选,term都要增加
        m_votedFor = m_me; //即是自己给自己投票,也避免candidate给同辈的candidate投
        persist();   
        std::shared_ptr<int> votedNum = std::make_shared<int>(1); // 使用 make_shared 函数初始化 
        //	重新设置定时器
        m_lastResetElectionTime = now();
        //	发布RequestVote RPC
        for (int i = 0; i < m_peers.size(); i++) {
            if (i == m_me) {
                continue;
            }
            int lastLogIndex = -1, lastLogTerm = -1;
            getLastLogIndexAndTerm(&lastLogIndex, &lastLogTerm);//获取最后一个log的term和下标,以添加到RPC的发送

            //初始化发送参数
            std::shared_ptr<mprrpc::RequestVoteArgs> requestVoteArgs = std::make_shared<mprrpc::RequestVoteArgs>();
            requestVoteArgs->set_term(m_currentTerm);
            requestVoteArgs->set_candidateid(m_me);
            requestVoteArgs->set_lastlogindex(lastLogIndex);
            requestVoteArgs->set_lastlogterm(lastLogTerm);
            std::shared_ptr<mprrpc::RequestVoteReply> requestVoteReply = std::make_shared<mprrpc::RequestVoteReply>();

            //使用匿名函数执行避免其拿到锁

            std::thread t(&Raft::sendRequestVote, this, i, requestVoteArgs, requestVoteReply,
                          votedNum); // 创建新线程并执行函数,并传递参数
            t.detach();
        }
    }
}
sendRequestVote

根据调用RequestVote得到的reply响应结果,对发起投票的候选者状态进行更新

bool Raft::sendRequestVote(int server, std::shared_ptr<mprrpc::RequestVoteArgs> args, std::shared_ptr<mprrpc::RequestVoteReply> reply,
                           std::shared_ptr<int> votedNum) {


    bool ok = m_peers[server]->RequestVote(args.get(),reply.get());

    if (!ok) {
        return ok;//rpc通信失败就立即返回,避免资源消耗
    }

    lock_guard<mutex> lg(m_mtx);
    if(reply->term() > m_currentTerm){
        //回复的term比自己大,说明自己落后了,那么就更新自己的状态并且退出
        m_status = Follower; //三变:身份,term,和投票
        m_currentTerm = reply->term();
        m_votedFor = -1;  //term更新了,那么这个term自己肯定没投过票,为-1
        persist(); //持久化
        return true;
    } else if ( reply->term()   < m_currentTerm   ) {
        //回复的term比自己的term小,不应该出现这种情况
        return true;
    }

    if(!reply->votegranted()){  //这个节点因为某些原因没给自己投票,没啥好说的,结束本函数
        return true;
    }
    
  //给自己投票了
    *votedNum = *votedNum + 1; //voteNum多一个
    if (*votedNum >=  m_peers.size()/2+1) {
        //变成leader
        *votedNum = 0;   //重置voteDNum,如果不重置,那么就会变成leader很多次,是没有必要的,甚至是错误的!!!

        //	第一次变成leader,初始化状态和nextIndex、matchIndex
        m_status = Leader;
        int lastLogIndex =   getLastLogIndex();
        for (int i = 0; i <m_nextIndex.size()  ; i++) {
            m_nextIndex[i] = lastLogIndex + 1 ;//有效下标从1开始,因此要+1
            m_matchIndex[i] = 0;               //每换一个领导都是从0开始,见论文的fig2
        }
        std::thread t(&Raft::doHeartBeat, this); //马上向其他节点宣告自己就是leader
        t.detach();

        persist();  
    }
    return true;
}
RequestVote

得到投票请求后,节点根据传递来的信息进行判断是否对其投票,构造reply返回值

void Raft::RequestVote( const mprrpc::RequestVoteArgs *args, mprrpc::RequestVoteReply *reply) {
    lock_guard<mutex> lg(m_mtx);

    Defer ec1([this]() -> void { //应该先持久化,再撤销lock,因此这个写在lock后面
        this->persist();
    });
    //对args的term的三种情况分别进行处理,大于小于等于自己的term都是不同的处理
    //reason: 出现网络分区,该竞选者已经OutOfDate(过时)
    if (args->term() < m_currentTerm) {
        reply->set_term(m_currentTerm);
        reply->set_votestate(Expire);
        reply->set_votegranted(false);
        return;
    }
    //论文fig2:右下角,如果任何时候rpc请求或者响应的term大于自己的term,更新term,并变成follower
    if (args->term() > m_currentTerm) {

        m_status = Follower;
        m_currentTerm = args->term();
        m_votedFor = -1;

        //	重置定时器:收到leader的ae,开始选举,透出票
        //这时候更新了term之后,votedFor也要置为-1
    }

    //	现在节点任期都是相同的(任期小的也已经更新到新的args的term了)
    //	要检查log的term和index是不是匹配的了
    int lastLogTerm = getLastLogIndex();
    //只有没投票,且candidate的日志的新的程度 ≥ 接受者的日志新的程度 才会授票
    if (!UpToDate(args->lastlogindex(), args->lastlogterm())) {

        //日志太旧了
        reply->set_term(m_currentTerm);
        reply->set_votestate(Voted);
        reply->set_votegranted(false);
        return;
    }
   
//    当因为网络质量不好导致的请求丢失重发就有可能!!!!
//    因此需要避免重复投票
    if (m_votedFor != -1 && m_votedFor != args->candidateid()) {
        reply->set_term(m_currentTerm);
        reply->set_votestate(Voted);
        reply->set_votegranted(false);
        return;
    } else {
        //同意投票
        m_votedFor = args->candidateid();
        m_lastResetElectionTime = now();//认为必须要在投出票的时候才重置定时器,
        reply->set_term(m_currentTerm);
        reply->set_votestate(Normal);
        reply->set_votegranted(true);
        return;
    }
}

4.日志复制、心跳

在这里插入图片描述

  • leaderHearBeatTicker:负责查看是否该发送心跳了,如果该发起就执行doHeartBeat
  • doHeartBeat:实际发送心跳,判断到底是构造需要发送的rpc,并多线程调用sendRequestVote处理rpc及其相应。
  • sendAppendEntries:负责发送日志的RPC,在发送完rpc后还需要负责接收并处理对端发送回来的响应。
  • leaderSendSnapShot:负责发送快照的RPC,在发送完rpc后还需要负责接收并处理对端发送回来的响应。
  • AppendEntries:接收leader发来的日志请求,主要检验用于检查当前日志是否匹配并同步leader的日志到本机。
  • InstallSnapshot:接收leader发来的快照请求,同步快照到本机。
leaderHearBeatTicker
void Raft::leaderHearBeatTicker() {
    while (true) {

        auto nowTime = now();
        m_mtx.lock();
        auto suitableSleepTime = std::chrono::milliseconds(HeartBeatTimeout) + m_lastResetHearBeatTime - nowTime;
        m_mtx.unlock();
        if (suitableSleepTime.count() < 1) {
            suitableSleepTime = std::chrono::milliseconds(1);
        }
        std::this_thread::sleep_for(suitableSleepTime);
        if ((m_lastResetHearBeatTime - nowTime).count() > 0) { //说明睡眠的这段时间有重置定时器,那么就没有超时,再次睡眠
            continue;
        }
        doHeartBeat();
    }
}

其基本逻辑和选举定时器electionTimeOutTicker一模一样
不一样之处在于设置的休眠时间不同,这里是根据HeartBeatTimeout来设置,固定时间。
electionTimeOutTicker中是根据getRandomizedElectionTimeout() 设置,随机一个时间。

doHeartBeat
void Raft::doHeartBeat() {
    std::lock_guard<mutex> g(m_mtx);
    if (m_status == Leader) {
        auto appendNums = std::make_shared<int>(1); //正确返回的节点的数量
        //对Follower(除了自己外的所有节点发送AE)
        for (int i = 0; i < m_peers.size(); i++) {
            if(i == m_me){ //不对自己发送AE
                continue;
            }
            //日志压缩加入后要判断是发送快照还是发送AE
            if (m_nextIndex[i] <= m_lastSnapshotIncludeIndex) {
				//应该发送的日志已经被压缩成快照,必须发送快照了
                std::thread t(&Raft::leaderSendSnapShot, this, i); 
                t.detach();
                continue;
            }
            //发送心跳,构造发送值
            int preLogIndex = -1;
            int PrevLogTerm = -1;
            getPrevLogInfo(i, &preLogIndex, &PrevLogTerm);  //获取本次发送的一系列日志的上一条日志的信息,以判断是否匹配
            std::shared_ptr<mprrpc::AppendEntriesArgs> appendEntriesArgs = std::make_shared<mprrpc::AppendEntriesArgs>();
            appendEntriesArgs->set_term(m_currentTerm);
            appendEntriesArgs->set_leaderid(m_me);
            appendEntriesArgs->set_prevlogindex(preLogIndex);
            appendEntriesArgs->set_prevlogterm(PrevLogTerm);
            appendEntriesArgs->clear_entries();
            appendEntriesArgs->set_leadercommit(m_commitIndex);
            // 作用是携带上prelogIndex的下一条日志及其之后的所有日志
            //leader对每个节点发送的日志长短不一,但是都保证从prevIndex发送直到最后
            if (preLogIndex != m_lastSnapshotIncludeIndex) {
                for (int j = getSlicesIndexFromLogIndex(preLogIndex) + 1; j < m_logs.size(); ++j) {
                    mprrpc::LogEntry *sendEntryPtr = appendEntriesArgs->add_entries();
                    *sendEntryPtr = m_logs[j];  
                }
            } else {
                for (const auto& item: m_logs) {
                    mprrpc::LogEntry *sendEntryPtr = appendEntriesArgs->add_entries();
                    *sendEntryPtr = item;  
                }
            }
            int lastLogIndex = getLastLogIndex();
            //初始化返回值
            const std::shared_ptr<mprrpc::AppendEntriesReply> appendEntriesReply = std::make_shared<mprrpc::AppendEntriesReply>();

            std::thread t(&Raft::sendAppendEntries, this, i, appendEntriesArgs, appendEntriesReply,
                          appendNums); // 创建新线程并执行b函数,并传递参数
            t.detach();
        }
        m_lastResetHearBeatTime = now(); //leader发送心跳,重置心跳时间,
    }
}

sendAppendEntries

AppendEntries

5.日志寻找匹配加速

6.其它

待续

代码如下(示例):


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2033679.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

华媒舍:6种明星代言推广策略,轻松吸引消费者目光!

1. 背书代言 背书代言是最常见的明星代言策略之一&#xff0c;也是最直接有效的一种方式。背书代言通过让明星以自己的名义、形象和声誉来推荐特定产品或服务&#xff0c;以吸引消费者的关注和购买意愿。这种策略依托于明星在社交媒体、电视广告等渠道的影响力&#xff0c;可以…

【npm】如何将自己的插件发布到npm上

前言 简单说下 npm 是什么&#xff1a; npm 是一个 node 模块管理工具&#xff0c;也是全球最大的共享源。 npm 工具与 nodejs 配套发布&#xff0c;便利开发人员共享代码。npm 主要包括 npm 官方网站、CLI&#xff08;控制台命令行工具&#xff09;、和 registry&#xff08;…

【可能是全网最丝滑的LangChain教程】二十、LangChain进阶之Chains

我们笑着说再见&#xff0c;却深知再见遥遥无期。 01 Chain介绍 在LangChain 中&#xff0c;“Chain” 是指一系列可以串联起来执行特定任务的组件或模型。这些链条可以包括预处理、模型调用、后处理等步骤&#xff0c;它们共同工作以完成一个复杂的语言处理任务。 咱说点人话…

3:svgicon的使用的整体步骤

1&#xff1a;在src下创建icons文件放入svg文件的icon&#xff0c;并切创建index.js, 来处理icon 主要创建&#xff1a;1&#xff1a;src/icons/svg/svg格式icon 2&#xff1a;src/icons/index.js 2&#xff1a;src/icons/index.js 写入代码如下&#xff08;注释比较明确&#…

【数据结构】六、图:5.图的最小生成树MST(普里姆(Prim)算法、克鲁斯卡尔(Kruskal)算法、Boruvka 算法)

2.最小生成树MST 文章目录 2.最小生成树MST2.1 普里姆(Prim)算法算法思路 2.2 克鲁斯卡尔(Kruskal)算法算法思路 2.3 Boruvka 算法2.3.1基本原理2.3.2基本过程 一个图可以有多个生成树&#xff0c;我们定义无向连通图的 最小生成树&#xff08;Minimum Spanning Tree&#xff…

PHP餐厅点餐系统小程序源码

&#x1f37d;️【餐厅点餐新纪元&#xff0c;点餐系统让用餐更便捷&#xff01;】&#x1f4f1; &#x1f50d; 一键浏览&#xff0c;菜单尽在掌握 &#x1f4f1; 走进餐厅&#xff0c;无需再担心找不到服务员或菜单被抢光&#xff01;餐厅点餐系统让你轻松扫描桌上的二维码…

机器学习笔记:门控循环单元的建立

目录 介绍 结构 模型原理 重置门与更新门 候选隐状态 输出隐状态 模型实现 引入数据 初始化参数 定义模型 训练与预测 简洁实现GRU 思考 介绍 门控循环单元&#xff08;Gated Recurrent Unit&#xff0c;简称GRU&#xff09;是循环神经网络一种较为复杂的构成形式…

轻量级的灰度配置平台|得物技术

一、前言 随着近几年得物的业务和技术的快速发展&#xff0c;我们不管是在面向C端场景还是B端供应链&#xff1b;业务版本的迭代更新&#xff0c;技术架构的不断升级&#xff1b;不管是业务稳定性还是架构稳定性&#xff0c;业务灰度的能力对我们来说都是一项重要的技术保障&a…

x264 编码器 PSNR算法源码分析

PSNR PSNR(Peak Signal-to-Noise Ratio,峰值信噪比)是一种常用的图像质量评价指标,用于衡量图像或视频的清晰度和质量。PSNR是基于信号的最大可能功率与影响信号的噪声功率之间的比率。在图像处理领域,PSNR通常用来评估图像压缩或图像增强算法的效果。 PSNR的计算公式是…

思科CCNP最新考证流程

CCNP CCNP全称思科网络高级工程师认证&#xff08;Cisco Certified Network Professional&#xff09;&#xff0c;是Cisco思科认证中的中级认证。获得ccnp证书表示着资深网络工程师具有对100个节点到超过500个节点的融合局域网和广域网进行安装、配置和故障排除的能力。能够管…

LeetCode257 二叉树的所有路径

前言 题目&#xff1a; 257. 二叉树的所有路径 文档&#xff1a; 代码随想录——二叉树的所有路径 编程语言&#xff1a; C 解题状态&#xff1a; 没思路&#xff0c;简单题强度好高… 思路 本题利用了递归加回溯的思路。 这道题目要求从根节点到叶子的路径&#xff0c;所以需…

一个Indie Hacker的微SaaS技术栈

如今&#xff0c;可用的技术非常多&#xff0c;我们每个月都会看到各种新的 JS 框架发布&#xff0c;有时&#xff0c;如果你一开始没有选择正确的技术堆栈&#xff0c;以后扩展起来就会很困难。因此&#xff0c;在今天的文章中&#xff0c;我将与你分享我用于开发微型 SaaS 的…

vue使用富文本编辑器+自由伸缩图片

首先要下载依赖&#xff0c;下方是本人使用的package.json&#xff0c;下载完依赖如果有启动项目失败的情况&#xff0c;建议将依赖版本降低或使用和下方一样的版本 package.json代码 {"name": "l","version": "0.1.0","privat…

Linux中线程常用接口(创建,等待,退出,取消)

pthread_create #include <pthread.h> int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg); Compile and link with -pthread. 编译时应注意。 #include<iostream> #in…

使用Playwright解决reCAPTCHA的分步指南

您是否在您的网络爬虫中遇到过CAPTCHA&#xff1f;许多网站使用CAPTCHA系统&#xff08;最常见的是reCAPTCHA&#xff09;来防止自动化访问。但是&#xff0c;本文将指导您使用Playwright&#xff08;一种强大的浏览器自动化工具&#xff09;和CapSolver&#xff08;一个设计用…

# 利刃出鞘_Tomcat 核心原理解析(二)

利刃出鞘_Tomcat 核心原理解析&#xff08;二&#xff09; 一、 Tomcat专题 - Tomcat架构 - HTTP工作流程 1、Http 工作原理 HTTP 协议&#xff1a;是浏览器与服务器之间的数据传送协议。作为应用层协议&#xff0c;HTTP 是基于 TCP/IP 协议来传递数据的&#xff08;HTML文件…

AI 的偏见来自数据集,而数据集的偏见来自人类 | Open AGI Forum

作者 | Annie Xu 采访、责编 | Eric Wang 出品丨GOSIM 开源创新汇 Richard Vencu&#xff0c;现任 Stability AI 机器学习运维负责人、LAION 工程负责人兼创始人&#xff0c;他的人生可谓十分精彩。 已过知天命之年的他是个中国通&#xff0c;极其热爱中国的武术、茶叶、诱人…

BugKu CTF Misc:被勒索了 disordered_zip simple MQTT 请攻击这个压缩包

前言 BugKu是一个由乌云知识库&#xff08;wooyun.org&#xff09;推出的在线漏洞靶场。乌云知识库是一个致力于收集、整理和分享互联网安全漏洞信息的社区平台。 BugKu旨在提供一个实践和学习网络安全的平台&#xff0c;供安全爱好者和渗透测试人员进行挑战和练习。它包含了…

03. 剑指offer刷题-二叉树篇(第二部分)

class Solution { public:TreeNode* Convert(TreeNode* pRootOfTree) {if(pRootOfTree nullptr) return nullptr;vector<TreeNode*> cur traversal(pRootOfTree);return cur[0];}// 这道题需要用到「分解问题」的思维&#xff0c;想把整棵链表&#xff0c;可以先把左右…

[upload]-做题笔记

项目下载地址&#xff1a;https://github.com/c0ny1/upload-labs 第一关 查看源代码&#xff0c;可以看到是前端js限制上传jpg,png,gif后缀文件 function checkFile() {var file document.getElementsByName(upload_file)[0].value;if (file null || file "") …