从0-1实现简易Raft分布式共识算法

news2025/1/15 16:50:26

一、Raft前置简介

Raft目前是最著名的分布式共识性算法,被广泛的应用在各种分布式框架、组件中,如Redis、RocketMq、Kafka、Nacos(CP)等

根据Raft论文,可将Raft拆分为如下4个功能模块

  • 领导者选举
  • 日志同步、心跳
  • 持久化
  • 日志压缩,快照(本文未实现)

这4个模块彼此并不完全独立,如日志的同步情况左右着领导者选举,快照也影响着日志同步等等;为了前后的递进性,对于一些功能的实现,可能会出现改动和优化,比如日志同步实现后,在数据持久化部分又会对同步做一些优化,提高主、从节点日志冲突解决的性能。

这里就不再过多的介绍了,看本文之前请先简单了解一下Raft算法,提供如下资料:

  • Raft算法在线动画演示
  • 图解分布式共识算法Paxos协议
  • 浅谈分布式一致性:Raft 与 SOFAJRaft
  • 深入剖析共识性算法 Raft
  • 深入解读Raft算法与etcd工程实现
  • Raft一致性算法论文-译文
  • SOFA-JRaft:蚂蚁金服的Raft算法实现库(JAVA版)

本文实现不完全和Raft论文一致,做了不少改动,核心思想不变,请悉知!!

二、功能流程简介

你看完上述资料,应该对Raft有一个基本了解了,本文我们实现了一个Raft算法下的简易版的KV存储,我将它拆分成一下几个角色:

RPC模块:复制各节点间的信息传递,如心跳、日志、选举等等

节点模块:节点有三种状态leader、follow、candidate,每种状态下所要做的事是不一样的

状态机:负责节点状态的变更,日志持久化一致性处理,投票一致性处理

定时任务:leader需要定时发送心跳,follow需要定时检测leader是否存活等

日志模块:日志需要持久化在本地文件,还需要给其他节点同步

以上几个角色相互配合,实现以下几个主要功能流程:

1.选举流程

实现细节下面深究,这里暂不过多介绍,简单了解一下大致流程,大体就是:

  1. Follow节点发现Leader节点挂了,则升级为Candidate节点发起投票
  2. 其他Follow节点收到投票请求后,根据条件判断是否投票给它,True或者False
  3. Candidate一旦收到的投票通过请求过半,则升级为Leader
  4. 升级Leader后发送心跳,阻止其他Follow变成Candidate

在这里插入图片描述

2.心跳流程

注意:这里和原文有区别,我将心跳和日志做了拆分,不再耦合了,因为我觉得在没有客户端请求的情况下,记录这些心跳日志没有意义,在没有数据日志或者说数据日志水平都是一样的情况下,谁做Leader我觉得都OK

实现细节下面深究,这里暂不过多介绍,简单了解一下大致流程,大体就是:

  1. Leader会定时发送心跳请求给Follow,告诉它我还活着,防止它篡位
  2. Follow收到心跳后返回一个心跳响应
  3. Leader收到的心跳响应没有过半则自动降级成为Follow停止对外服务

(为什么要心跳响应,还要自动降级?后面咱们细说)

在这里插入图片描述

3.KV客户端请求流程

因为我们要做的是一个简易版KV嘛,那肯定有客户端发送命令嘛对不对:

  1. 客户端发送SET或者GET命令,集群返回成功或者数据
  2. 发送SET命令,只有Leader会处理,同步给其他Follow,然后根据结果返回成功还是失败
  3. 发送GET命令,目前也只有Leader会处理,返回对应数据,没有就null(GET没有日志产生)

节点间日志的同步持久化后面细说,这里也看的出来为什么分布式体系下CAP不能共存了,你想要高可用,性能好,就必须在请求leader刷盘成功后返回甚至异步刷盘,这就必然导致可能存在数据丢失或者主从数据不一致的情况,如果你想要一致性,就必须在节点日志都同步完成后才返回(下面我们将日志同步流程)

在这里插入图片描述

4.日志同步流程

上面说过了,我们将心跳和日志做了拆分,只有客户端请求SET命令才会产生日志

  1. Leader收到客户端请求后,先预提交到内存中,后发送预提交命令给所有Follow
  2. Follow收到Leader的预提交命令同样先提交到内存,然后响应Leader
  3. Leader一旦收到超过半数的Follow响应则执行刷盘持久化,否则给客户端响应失败
  4. Leader刷盘成功后,给所有Follow发送刷盘请求,然后给客户端响应成功(无需关心Follow刷盘结果)

这就是很典型的CP流程,保持了一致性和数据不丢失,但大大降低了性能(发现没有尽管这样做,依旧可能存在Follow数据丢失的情况,比如:我是新加入的Follow节点、Follow节点刷盘失败等等情况,那该怎么办呢,我们下面接着来补充)

在这里插入图片描述

5.日志校验流程

正如上所说,日志依旧存在丢失的风险,我们需要做一个日志校验定时任务,定时校验日志是否丢失,由于这个和日志的设计息息相关,所以我们后面在细说,这里简单过一遍流程

  1. follow会有一个定时任务,定时Check日志文件,寻找缺失的日志
  2. 如果有则拿到缺失的日志发送拉取请求到Leader,获取对应的日志
  3. 然后填充进日志文件,这样就一定保持了和Leader日志数据对齐了

难道每次都要从头到尾扫描一次文件吗?当然不是,扫描过的不需要扫描,有checkPoint,每次只是从checkPoint扫描到lastLogIndex

在这里插入图片描述

三、模块简介

1.RPC模块

这里我们采用Netty框架来做,每个节点即是Client又是Server

按原Raft算法来说,一共有以下几种RPC类型的通信:

RequestVote RPC - 请求投票 RPC,由 Candidate 在选举期间发起。
AppendEntries RPC - 附加条目 RPC,由 Leader 发起,用来复制日志和提供一种心跳机制。

但是我将它进行了一个拆分,拆分的更细了:

  • RequestVoteRPC-请求投票 RPC,由 Candidate 在选举期间发起。
  • RequestVoteResult-投票响应RPC,由follow投票
  • HeartBeatRequest-心跳RPC,由leader定时不间断发起
  • HeartBeatResult-心跳响应RPC,由follow响应
  • AppendEntriesPreCommit-日志预提交RPC,由leader发起预提交
  • AppendEntriesPreCommitResult-日志预提交响应RPC,由follow响应
  • AppendEntriesCommit-日志提交RPC,预提交成功后,leader会发起真正提交的命令
  • LogIndexPull-日志拉取RPC,follow定时检测发现自身存在日志丢失,向leader主动拉取日志
  • LogIndexPullResult-日志拉取响应RPC,leader发现follow存在日志缺失,把日志发给follow
  • ClientRequest-客户端请求RPC,KV存储的客户端,向集群发出的命令
  • ClientResponse-客户端请求响应RPC,对客户端的响应

分别对应着一个实体类:

在这里插入图片描述

RPC整体的编解码设计,序列化等等,都和我之前写的RPC框架差不多,这里就不在过多的介绍了,有兴趣可以看看我的:如何从0-1手写一个RPC框架

这里只介绍一下相比原来做出的调整,原来RPC框架传输的实体是固定的,而现在多了很多,而且大量涉及到同步请求返回,所以相比原来新增了泛型的处理,如下示例,两行代码就搞定了一次请求:

RpcSession<ClientResponse, ClientRequest> rpcSession = RpcSessionFactory.<ClientResponse, ClientRequest>openSession(serverConfig, clientRequest);
ClientResponse clientResponse = rpcSession.<ClientResponse>syncSend(4000L);

同时支持:同步等待、超时等待、异步三种请求方式:

public interface RpcSession<R,T>{

    <R> R syncSend();

    <R> R syncSend(long timeout);

    void asyncSend();
}

感兴趣的建议自己看看,RPC所在目录和Netty所有Handler如下:

在这里插入图片描述

2.节点模块

节点有三种类型,leader、candidate、follow,所以我这抽象出一个节点接口,三种实现,一个统一对外服务,一个全局节点信息类

一个节点接口

public interface RaftNode {

    /** 客户端的请求,会产生日志 : 只有leader才会处理,follow返回leader地址,candidate拒绝 */
    ClientResponse clientRequestHandler(ClientRequest command,List<ServerConfig> serverConfigs) throws ExecutionException, InterruptedException;

    /** leader发来的log预处理:会先缓存 */
    AppendEntriesPreCommitResult logPreCommitHandler(AppendEntriesPreCommit appendEntriesPreCommit);

    /** leader发来的log提交请求 */
    void logCommitHandler(AppendEntriesCommit appendEntriesCommit);

    /** follow发来的log拉取请求 */
    LogIndexPullResult sendLogPullRequest(List<Long> pullLogIndex);

    /** leader要处理follow的拉取请求 */
    LogIndexPullResult logPullRequestHandler(LogIndexPull logIndexPull);

    /** 发起投票 : 只有候选者 才会发起 */
    boolean callVoteRequest(List<ServerConfig> serverConfigs) throws ExecutionException, InterruptedException;

    /** 投票请求处理 */
    RequestVoteResult voteRequestHandler(RequestVoteRPC voteRPC);

    /** 发起心跳 : 只有领导才会发起心跳 阻止其他节点成为候选人*/
    boolean callHeartBeatRequest(List<ServerConfig>serverConfigs) throws ExecutionException, InterruptedException;

    /** 心跳请求处理 : 只有追随者/候选人才会处理*/
    HeartBeatResult heartBeatHandler(HeartBeatRequest heartBeatRequest);
    

}

三种实现

在这里插入图片描述

一个对外服务

public class RaftNodeService {

    private static final Logger log = LoggerFactory.getLogger(RaftNodeService.class);

    // 心跳间隔时间
    private final static long INTERVAL_TIME = 1500L;

    private static Map<NodeStatusEnums, RaftNode> raftNodeMap = new ConcurrentHashMap<>(8);

    static {
        raftNodeMap.put(NodeStatusEnums.LEADER, new LeaderRaftNode());
        raftNodeMap.put(NodeStatusEnums.CANDIDATE, new CandidateRaftNode());
        raftNodeMap.put(NodeStatusEnums.FOLLOW, new FollowRaftNode());
    }

    /**
     * 节点信息初始化
     */
    public static void raftNodeInit(ServerConfig self, List<ServerConfig> clusterConfig) {
        RaftNodeInfo.getInstance().setSelf(self);
        RaftNodeInfo.getInstance().setClusterConfig(clusterConfig);
        RaftNodeInfo.getInstance().setCurrentNodeStatus(NodeStatusEnums.FOLLOW);
        createElectionTask();
    }

    /**
     * 发送心跳
     */
    public synchronized static void sendHeartBeat() {
        RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());
        boolean result = false;
        try {
            result = raftNode.callHeartBeatRequest(RaftNodeInfo.getInstance().getClusterConfig());
        } catch (ExecutionException e) {
            log.debug(" {}: 完了,作为leader发送心跳失败了:{}", RaftNodeInfo.getInstance().getSelf().toString(), e.getMessage(), e);
        } catch (InterruptedException e) {
            log.debug(" {}: 完了,作为leader发送心跳失败了:{}", RaftNodeInfo.getInstance().getSelf().toString(), e.getMessage(), e);
        }
        if (!result) {
            // 代表心跳失败了,状态已经变更了
            // 需要停止心跳,开启心跳检测
            heartBeatTestDestroy();
            createElectionTask();
        }
    }

    /**
     * 心跳处理
     */
    public static void heartBeatHandler(HeartBeatRequest request, Channel channel) {
        ThreadPoolUtils.nettyServerAsyncPool.execute(() -> {
            RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());
            channel.writeAndFlush(new RpcRemoteMsg<HeartBeatResult>(raftNode.heartBeatHandler(request)));
            // 收到了心跳,所以就要停止当前心跳的检测,然后重新开启一个检测任务
            createElectionTask();
        });
    }

    // 命令合规性校验 目前就get set 随便校验一下
    public static boolean commandCheck(String command) {
        if (command == null || !"SET_GET".contains(command.split(" ")[0]) || command.split(" ").length < 2) {
            return false;
        }
        return true;
    }

    /**
     * 客户端的请求,以KV为例 就是set命令 , 这里请求返回就简陋一点
     */
    public static void clientRequestHandler(ClientRequest request, Channel channel) {
        ThreadPoolUtils.nettyServerAsyncPool.execute(() -> {
            ClientResponse clientResponse = ClientResponse.builder().build();
            clientResponse.setRequestId(request.getRequestId());
            if (!commandCheck(request.getCommand())) {
                clientResponse.setCode(401);
                clientResponse.setMsg("命令格式不正确");
                channel.writeAndFlush(new RpcRemoteMsg<ClientResponse>(clientResponse));
                return;
            }
            // 只有set命令才需要发送日志,get命令直接取数据就行了
            String[] command = request.getCommand().split(" ");
            if (command[0].equals("SET")) {
                RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());
                try {
                    channel.writeAndFlush(new RpcRemoteMsg<ClientResponse>(raftNode.clientRequestHandler(request, RaftNodeInfo.getInstance().getClusterConfig())));
                    return;
                } catch (ExecutionException e) {
                    log.debug(" {}: 日志提交失败了:{}", request.getCommand(), e.getMessage(), e);
                    clientResponse.setCode(500);
                    clientResponse.setMsg(e.getMessage());
                    channel.writeAndFlush(new RpcRemoteMsg<ClientResponse>(clientResponse));
                } catch (InterruptedException e) {
                    log.debug(" {}: 日志提交失败了:{}", RaftNodeInfo.getInstance().getSelf().toString(), e.getMessage(), e);
                    clientResponse.setCode(500);
                    clientResponse.setMsg(e.getMessage());
                    channel.writeAndFlush(new RpcRemoteMsg<ClientResponse>(clientResponse));
                }
            } else {
                // get命令直接取值
                clientResponse.setCode(200);
                clientResponse.setData(RaftNodeInfo.getInstance().getLogManage().getDataByKey(command[1]));
                channel.writeAndFlush(new RpcRemoteMsg<ClientResponse>(clientResponse));
            }
        });
    }


    /**
     * Log预提交请求
     */
    public static void logPreCommitHandler(AppendEntriesPreCommit request, Channel channel) {
        ThreadPoolUtils.nettyServerAsyncPool.execute(() -> {
            RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());
            channel.writeAndFlush(new RpcRemoteMsg<AppendEntriesPreCommitResult>(raftNode.logPreCommitHandler(request)));
        });
    }

    /**
     * Log提交请求
     */
    public static void logCommitHandler(AppendEntriesCommit request) {
        ThreadPoolUtils.nettyServerAsyncPool.execute(() -> {
            RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());
            raftNode.logCommitHandler(request);
            // 收到了日志,所以就要停止当前心跳的检测,然后重新开启一个检测任务
            createElectionTask();
        });
    }

    /**
     * 发起投票
     */
    public synchronized static void sendCallVote() {
        RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());
        boolean result = false;
        try {
            result = raftNode.callVoteRequest(RaftNodeInfo.getInstance().getClusterConfig());
        } catch (ExecutionException e) {
            StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null);
            log.debug(" {}: 完了,作为candidate发起投票失败了:{}", RaftNodeInfo.getInstance().getSelf().toString(), e.getMessage(), e);
        } catch (InterruptedException e) {
            StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null);
            log.debug(" {}: 完了,作为candidate发起投票失败了:{}", RaftNodeInfo.getInstance().getSelf().toString(), e.getMessage(), e);
        }
        if (!result) {
            // 代表发起投票失败了,状态已经变更了
            // 需要重新开启一个检测任务
            createElectionTask();
            return;
        }
        // 投票成功了 需要开启心跳任务
        createHearBeatTask();
    }

    /**
     * 发起投票请求处理
     */
    public synchronized static void callVoteHandler(RequestVoteRPC requestVoteRPC, Channel channel) {
        ThreadPoolUtils.nettyServerAsyncPool.execute(() -> {
            RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());
            channel.writeAndFlush(new RpcRemoteMsg<RequestVoteResult>(raftNode.voteRequestHandler(requestVoteRPC)));
        });
    }

    /**
     * 发起Log拉取请求
     */
    public synchronized static LogIndexPullResult sendLogPullRequest(List<Long> pullLogIndex) {
        if (CollectionUtil.isNotEmpty(pullLogIndex)) {
            RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());
            return raftNode.sendLogPullRequest(pullLogIndex);
        }
        return null;
    }

    /**
     * 发起Log拉取请求处理
     */
    public synchronized static void logPullRequestHandler(LogIndexPull logIndexPull, Channel channel) {
        ThreadPoolUtils.nettyServerAsyncPool.execute(() -> {
            RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());
            log.debug("{}:拉取日志:{}",channel.remoteAddress(),JSONObject.toJSON(logIndexPull));
            channel.writeAndFlush(new RpcRemoteMsg<LogIndexPullResult>(raftNode.logPullRequestHandler(logIndexPull)));
        });
    }


    /**
     * 销毁并创建心跳检测任务
     */
    public static void createElectionTask() {
        long randomTime = getRandomTime();
        final long intervalTime = INTERVAL_TIME + randomTime;
        // 先销毁之前的
        electionTaskDestroy();
        //开启新的
        ScheduledFuture<?> schedule = ThreadPoolUtils.hearBeatAsyncPool.schedule(new ElectionTask(intervalTime), intervalTime, TimeUnit.MILLISECONDS);
        RaftNodeInfo.getInstance().setElectionTask(schedule);
    }

    /**
     * 销毁并创建心跳任务
     */
    public static void createHearBeatTask() {
        // 先销毁之前的
        heartBeatTestDestroy();
        //开启新的
        ScheduledFuture<?> schedule = ThreadPoolUtils.hearBeatAsyncPool.scheduleAtFixedRate(new HeartBeatTask(), 0L, INTERVAL_TIME, TimeUnit.MILLISECONDS);
        RaftNodeInfo.getInstance().setElectionTask(schedule);
    }


    public static long getRandomTime() {
        // 要比心跳慢一点
        return RandomUtil.randomLong(250L, 1000L);
    }

    /**
     * 销毁心跳检测任务
     */
    public static void electionTaskDestroy() {
        if (null != RaftNodeInfo.getInstance().getElectionTask()) {
            RaftNodeInfo.getInstance().getElectionTask().cancel(true);
            RaftNodeInfo.getInstance().setElectionTask(null);
        }
    }

    /**
     * 销毁心跳任务
     */
    public static void heartBeatTestDestroy() {
        if (null != RaftNodeInfo.getInstance().getHeartBeatTask()) {
            RaftNodeInfo.getInstance().getHeartBeatTask().cancel(true);
            RaftNodeInfo.getInstance().setHeartBeatTask(null);
        }
    }

一个全局节点信息类

public class RaftNodeInfo {

    /**
     * 自己
     */
    private ServerConfig self;

    /**
     * 集群其他节点信息
     */
    private List<ServerConfig> clusterConfig;

    /**
     * 当前节点状态 默认FOLLOW
     */
    private volatile NodeStatusEnums currentNodeStatus = NodeStatusEnums.FOLLOW;

    /**
     * 当前节点任期
     */
    private volatile long currentTerm = 0L;

    /**
     * 当前leader
     */
    private volatile String currentLeaderId;

    /**
     * 最后日志索引 已提交的
     */
    private volatile long lastLogIndex = 0L;

    /**
     * 最后的日志任期 这我这没用到
     */
    private volatile long lastLogTerm = 0L;

    /**
     * 当前任期给谁投过票
     */
    private volatile String voteFor;

    /**
     * 最近更新时间  心跳或者日志更新
     **/
    private volatile long lastUpdateTime = 0L;

    /**
     * 心跳任务
     **/
    private ScheduledFuture heartBeatTask;

    /**
     * 心跳检测任务
     **/
    private ScheduledFuture electionTask;

    /**
     * 日志管理
     **/
    private LogManage logManage;

    /**
     * 日志文件
     **/
    private String logPath;
}

3.状态机

提供节点状态变更、心跳结果处理、投票结果处理、日志一致性处理

public class StateMachines {
    private static final Logger log = LoggerFactory.getLogger(StateMachines.class);

    /** 候选人-》leader */
    public static void becomeLeader(){
        // 变为leader
        RaftNodeInfo.getInstance().setCurrentNodeStatus(NodeStatusEnums.LEADER);
        // leader设置为自己
        RaftNodeInfo.getInstance().setCurrentLeader(RaftNodeInfo.getInstance().getSelf().toString());
        // 票清了
        RaftNodeInfo.getInstance().setVoteFor(null);
    }

    /** follow-》候选人 */
    public static void becomeCandidate(){
        // 变为候选人
        RaftNodeInfo.getInstance().setCurrentNodeStatus(NodeStatusEnums.CANDIDATE);
        // 任期+1
        RaftNodeInfo.getInstance().setCallVoteTerm();
        // 给自己投一票
        RaftNodeInfo.getInstance().setVoteFor(RaftNodeInfo.getInstance().getSelf().toString());
    }

    /** 候选人、leader->follow */
    public static void becomeFollow(long term,String leaderId,String voteFor){
        RaftNodeInfo.getInstance().setCurrentNodeStatus(NodeStatusEnums.FOLLOW);
        RaftNodeInfo.getInstance().setCurrentLeader(leaderId);
        RaftNodeInfo.getInstance().setCurrentTerm(term);
        RaftNodeInfo.getInstance().setVoteFor(voteFor);
        RaftNodeInfo.getInstance().setLastUpdateTime(System.currentTimeMillis());
    }


    /** 投票结果一致性处理 */
    public static boolean voteResultHandler(List<Future<RequestVoteResult>> taskList,Integer nodeNum) throws ExecutionException, InterruptedException {
        int voteNum = 0;
        for (Future<RequestVoteResult> future : taskList) {
            RequestVoteResult voteResult = future.get();
            // 判断leader是否还存活 存活的话肯定要把我给否了呀
            if (leaderIsLive(voteResult)) {
                return false;
            }
            if(voteResult!=null){
                log.debug("投票结果,我的term:{} ,结果:{}",RaftNodeInfo.getInstance().getCurrentTerm(), JSONObject.toJSON(voteResult));
            }
            if (null != voteResult && voteResult.isVoteGranted()) {
                voteNum++;
            }
        }

        if (voteNum != 0 && voteNum >= (nodeNum / 2)) {
            // 投票通过 升级为leader
            StateMachines.becomeLeader();

            log.debug(" {}: 哈哈哈,我升级为leader啦", RaftNodeInfo.getInstance().getSelf().toString());
            return true;
        } else {
            // 投票不通过,退成follow 继续苟着
            StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null);

            log.debug(" {}: 完了,这帮人不支持我,等待机会再试", RaftNodeInfo.getInstance().getSelf().toString());
            return false;
        }
    }

    // 判断leader是否还存活 存活的话肯定要把我给否了呀
    private static boolean leaderIsLive(RequestVoteResult voteResult) {
        if (null != voteResult && StrUtil.isNotEmpty(voteResult.getLeaderId())) {
            // 被leader一票否决,退成follow 继续苟着
            StateMachines.becomeFollow(voteResult.getTerm(), voteResult.getLeaderId(), null);
            return true;
        }
        return false;
    }

    /** 心跳结果一致性处理 */
    public static boolean heartBeatResultHandler(List<Future<HeartBeatResult>> taskList,Integer nodeNum) throws ExecutionException, InterruptedException {
        int responseNum = 0;
        for (Future<HeartBeatResult> future : taskList) {
            HeartBeatResult heartBeatResult = future.get();
            if (null != heartBeatResult) {
                responseNum++;
            }
        }
        if (responseNum != 0 && responseNum >= (nodeNum / 2)) {
            log.debug("{}: 万众一心,我再接再厉", RaftNodeInfo.getInstance().getSelf().toString());
            return true;
        } else {
            // 没有应答或者应答数量小于一半 就退化为候选者,并停止对外提供服务
            // 状态变更
            StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null);
            log.debug("{}: 我找不到追随者了,我暂时停止对外服务", RaftNodeInfo.getInstance().getSelf().toString());
            return false;
        }
    }

    /** 日志预提交结果 */
    public static boolean logPreCommitHandler(List<Future<AppendEntriesPreCommitResult>> taskList, Integer nodeNum) throws ExecutionException, InterruptedException {
        int responseNum = 0;
        for (Future<AppendEntriesPreCommitResult> future : taskList) {
            AppendEntriesPreCommitResult preCommitResult = future.get();
            if (null != preCommitResult && preCommitResult.isSuccess()) {
                responseNum++;
            }
        }
        return responseNum != 0 && responseNum >= (nodeNum / 2);
    }
}

4.日志模块

public interface LogManage extends ResourceLifeCycle{

    /** leader预提交 */
    long preCommitLog(LogEntity logEntity);

    /** follow预提交 */
    void preCommitLog(long preCommitLogId,LogEntity logEntity);

    /** 缓存移除 */
    void cacheLogRemove(long cacheLogId);

    /** leader日志提交 */
    long commitLog(long cacheLogId);

    /** follow日志提交 */
    void commitLog(long cacheLogId,long logIndex);

    /** follow日志Check */
    void logIndexCheck();

    /** 根据日志索引获取日志内容 */
    LogEntity getLogEntityByIndex(long logIndex, RandomAccessFile file);

    /** 命令数据处理 */
    void dataHandler(String command);

    /** 根据Key获取数据 */
    String getDataByKey(String key);
}

5.定时任务

在这里插入图片描述

  • ElectionTask:心跳检测任务,不通过则升级为Candidate
  • HeartBeatTask:心跳任务,不断给Follow发送心跳,阻止其成为Candidate
  • LogIndexCheckTask:Follow日志Check定时任务

四、核心流程介绍

其实流程图已经很清楚了,这里挑部分来聊聊

1.选举

目前心跳设置的时间为1500ms,心跳检测的时间为1750ms+0-750ms随机数(之前随机数设置的很短,算上网络延迟等因素,导致两个Candidate同任期的几率非常之高),follow收到心跳会更新lastUpdateTIme,而心跳检测则会检测这个时间到当前时间是否超过检测时间间隔,超过了则会变成candidate发起选举

在这里插入图片描述

CandidateRaftNode:发起选举RPC

选举RPC实体类

public class RequestVoteRPC extends RpcMsgId implements Serializable {

    /** 候选人的任期号  */
    private long term;

    /** 请求选票的候选人的 Id(ip:selfPort) */
    private String candidateId;

    /** 候选人的最后日志条目的索引值 */
    private long lastLogIndex;

    /** 候选人最后日志条目的任期号  */
    private long lastLogTerm;

}

选举方法

    public boolean callVoteRequest(List<ServerConfig> serverConfigs) throws ExecutionException, InterruptedException {
        if (CollectionUtil.isEmpty(serverConfigs)) {
            StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null);
            log.error("只有一个节点,还发起什么投票?");
            return false;
        }
        // candidate 会发起投票请求
        RaftNodeInfo instance = RaftNodeInfo.getInstance();

        // 投票过程中 可能又收到了心跳或者日志,状态已经变为follow
        if (!NODE_TYPE.equals(RaftNodeInfo.getInstance().getCurrentNodeStatus())) {
            return false;
        }
        log.debug(" {}: 哈哈哈,我发起了投票", RaftNodeInfo.getInstance().getSelf().toString());
        List<Future<RequestVoteResult>> taskList = new ArrayList<>(serverConfigs.size());

        // 加上自己的一票 需要 大于= n/2+1
        // 所以直接 >= n/2 就算通过了
        // 但是注意此时如果已经存在leader,日志数又不比当前leader大,所以leader还是leader 具有一票否决权
        for (ServerConfig serverConfig : serverConfigs) {
            Future<RequestVoteResult> voteResultFuture = ThreadPoolUtils.sendAsyncMsgPool.submit(() -> {
                // 构建投票
                RequestVoteRPC voteRPC = RequestVoteRPC.builder().candidateId(instance.getSelf().toString())
                        .term(instance.getCurrentTerm())  // 成为候选 的时候任期就+1了
                        .lastLogIndex(instance.getLastLogIndex()).build();
                RpcSession<RequestVoteResult, RequestVoteRPC> voteRPCRpcSession = RpcSessionFactory.<RequestVoteResult, RequestVoteRPC>openSession(serverConfig, voteRPC);
                return voteRPCRpcSession == null ? null : voteRPCRpcSession.syncSend(1000L);
            });
            taskList.add(voteResultFuture);
        }
        // 投票过程中 可能状态又已经变为follow
        if (!NODE_TYPE.equals(RaftNodeInfo.getInstance().getCurrentNodeStatus())) {
            return false;
        }

        return StateMachines.voteResultHandler(taskList, serverConfigs.size());

    }

Follow选举响应

  1. 任期比我大我就同意
  2. 任期跟我一样,记录的日志比我多而且我没有投过票我也同意

(Follow同一个任期内只能投一票)

    public RequestVoteResult voteRequestHandler(RequestVoteRPC voteRPC) {
        // follow 需要处理投票请求
        RaftNodeInfo instance = RaftNodeInfo.getInstance();
        RequestVoteResult voteResult = RequestVoteResult.builder().term(instance.getCurrentTerm()).build();
        voteResult.setRequestId(voteRPC.getRequestId());
        // 1.任期比我大,我直接就同意
        if (voteRPC.getTerm() > instance.getCurrentTerm()) {
            return agreeVote(voteResult, voteRPC);
        }
        // 2.任期跟我一样,记录的日志比我多 而且 我没有投过票
        // 我只能投一票
        if ((voteRPC.getTerm() == instance.getCurrentTerm() && voteRPC.getLastLogIndex() >= instance.getLastLogIndex())
                && (instance.getVoteFor() == null || instance.getVoteFor().equals(voteRPC.getCandidateId()))) {
            return agreeVote(voteResult, voteRPC);
        }
        voteResult.setTerm(instance.getCurrentTerm());
        voteResult.setVoteGranted(false);
        log.info(" {}: 我身为现任Follow,我不认可你的实力,我不能给你投票:{}", instance.getSelf().toString(), voteRPC.getCandidateId());
        return voteResult;
    }

    private RequestVoteResult agreeVote(RequestVoteResult voteResult, RequestVoteRPC voteRPC) {

        voteResult.setTerm(RaftNodeInfo.getInstance().getCurrentTerm());
        voteResult.setVoteGranted(true);

        RaftNodeInfo.getInstance().setCurrentTerm(voteRPC.getTerm());
        RaftNodeInfo.getInstance().setVoteFor(voteRPC.getCandidateId());

        log.info(" {}: 我身为现任Follow,我认可你的实力,我给你投票:{}", RaftNodeInfo.getInstance().getSelf().toString(), voteRPC.getCandidateId());
        return voteResult;
    }

Leader响应

leader有没有可能收到投票?有可能!假设某一个Follow延迟收到心跳或者没有收到心跳就会发起,那leader就会收到它发起的投票,那怎么办?判断任期和日志,任期和日志都比Leader大则Leader需要退位,否则Leader应该具有一票否决权(这样就防止了某个follow无限发起投票,任期无限+1这种情况)

一个candidate任期非常大的时候,其他follow必然会给他投票,那这样就升为leader就导致了同时存在两个leader的情况,所以这时候的当期leader应该具有一票否决权

    public RequestVoteResult voteRequestHandler(RequestVoteRPC voteRPC) {
        // leader 有可能收到 候选者的投票申请
        RaftNodeInfo instance = RaftNodeInfo.getInstance();
        RequestVoteResult requestVoteResult = RequestVoteResult.builder().build();
        requestVoteResult.setRequestId(voteRPC.getRequestId());
        // 候选人的任期比我大 而且日志还比我大 说明我已经out了,我需要退位
        if (voteRPC.getTerm() >= instance.getCurrentTerm() && voteRPC.getLastLogIndex() > instance.getLastLogIndex()) {
            // 状态变更
            StateMachines.becomeFollow(voteRPC.getTerm(), voteRPC.getCandidateId(), null);

            requestVoteResult.setTerm(voteRPC.getTerm());
            requestVoteResult.setVoteGranted(true);
            log.info(" {}: 我身为现任leader,我认可你的实力,我下位让贤:{}", instance.getSelf().toString(), voteRPC.getCandidateId());
            return requestVoteResult;
        }
        log.info(" {}: 我身为现任leader,不同你的上任请求:{}", instance.getSelf().toString(), voteRPC.getCandidateId());
        // 否则就不同意,而且你还得给我老实点
        requestVoteResult.setTerm(instance.getCurrentTerm());
        requestVoteResult.setVoteGranted(false);
        requestVoteResult.setLeaderId(instance.getSelf().toString());
        return requestVoteResult;
    }

2.心跳

在这里插入图片描述

心跳这里我做了一个响应降级的操作,其实正常是不需要的,我这的目的是防止网络分区!

假设原本是这样:

在这里插入图片描述

一旦网络分区则会变成这样,导致两个leader的出现,所以这时候心跳的响应就至关重要,一旦响应少于半数,则leader应该自动降级

在这里插入图片描述

LeaderRaftNode:发起心跳

public boolean callHeartBeatRequest(List<ServerConfig> serverConfigs) throws ExecutionException, InterruptedException {

        if (CollectionUtil.isEmpty(serverConfigs)) {
            StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null);
            log.debug(" {}: 只有一个leader,还发什么心跳?", RaftNodeInfo.getInstance().getSelf().toString());
            return false;
        }

        List<Future<HeartBeatResult>> taskList = new ArrayList<>(serverConfigs.size());

        // leader 需要发送心跳 防止网络分区,一旦心跳返回不足 n/2 则自动降级
        for (ServerConfig serverConfig : serverConfigs) {
            Future<HeartBeatResult> heartBeatResultFuture = ThreadPoolUtils.sendAsyncMsgPool.submit(() -> {
                HeartBeatRequest build = HeartBeatRequest.builder()
                        .leaderId(RaftNodeInfo.getInstance().getSelf().toString())
                        .leaderLastCommitIndex(RaftNodeInfo.getInstance().getLastLogIndex())
                        .term(RaftNodeInfo.getInstance().getCurrentTerm()).build();
                RpcSession<HeartBeatResult, HeartBeatRequest> heartBeatRequestRpcSession = RpcSessionFactory.<HeartBeatResult, HeartBeatRequest>openSession(serverConfig, build);
                return heartBeatRequestRpcSession == null ? null : heartBeatRequestRpcSession.syncSend(200L);
            });
            taskList.add(heartBeatResultFuture);
        }
        // 响应结果处理
        return StateMachines.heartBeatResultHandler(taskList, serverConfigs.size());
    }

3.日志

日志设计的非常之简陋,就不做过多的介绍了,本文目的还是以实现Raft为主,性能问题暂不考虑,不过还是说一下测试结果,因为KV存储,项目启动需要读取数据放入内存,目前读取50m左右文件10w条日志需要8s左右,肯定是不合理的,目前并没有做日志压缩和快照,也没有用零拷贝技术,因为不想搞的太过复杂

在这里插入图片描述

关于日志check,这里放上两种测试常见的结果

在这里插入图片描述

1.新的节点加入,需要拉取一次所有数据

在这里插入图片描述

2.日志中间缺失

在这里插入图片描述

两种情况都是没问题的!

五、遗留的问题

注意:尽管这样还是有几率导致数据丢失的!!!!

再次强调:本文不完全和Raft论文对标,加了不少个人的想法进去,所以在这个过程中都是遇到问题、思考问题、解决问题,这本就是一个学习的过程,目前最大的一个问题就是:

新加入的节点已经收到了Leader的数据,更新的lastCommitIndex,但是还没来得及向Leader同步以前的数据,而这时Leader挂了,所以这时候这个节点就有几率通过投票成为Leader,这时候数据就有几率丢失文章中可能看不太出来,具体得看看代码,这算是一个很严重的BUG,各位想想可以怎么解决,而Raft又是怎么解决的?

当然可能还有其他问题,各位大佬如果知道的也可以提出来

六、总结

只有深入本质才能顺应发展,在分布式体系下,共识算法是必不可少的,光看不实践就容易眼高手低,当初我看Raft的时候也感觉挺简单的,不就是三种状态做不同的事,然后状态变更嘛,真正一做起来就发现好多细节都需要考虑,这还只是个demo,回头想想RocketMq和kafka的存储设计是真的厉害,做完这个又收获不少

七、项目\个人博客地址

项目地址

个人博客 : 无八股,全干货

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/822607.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

蓝桥云课ROS机器人旧版实验报告-04三维建模与仿真

项目名称 实验四 3D建模与仿真 成绩 内容&#xff1a;自定义机器人3D模型&#xff0c;创建一个URDF文件、xacro文件、ROS2[Kinetic/Melodic/Noetic]仿真 实验记录&#xff08;70分&#xff09; 从头开始构建使用 URDF 的可视化机器人模型&#xff1a; 先尝试两个案例&a…

合合信息上会在即:“排队”耗时近两年,能否交出IPO答卷?

撰稿|行星 来源|贝多财经 近日&#xff0c;上海合合信息科技股份有限公司&#xff08;下称“合合信息”&#xff09;在上海证券交易所科创板递交招股书&#xff08;上会稿&#xff09;。据贝多财经了解&#xff0c;合合信息于2021年9月27日递交招股书&#xff0c;将于2023年8…

今日头条面试真题及答案,软件测试工程师面试秘籍

试题1&#xff0e;在浏览器地址栏里输入一个网址&#xff0c;接下来会发生什么&#xff1f; 答案&#xff1a;发生的操作如下。 &#xff08;1&#xff09;浏览器查找该网址的IP地址。 &#xff08;2&#xff09;浏览器根据解析得到的IP地址向Web服务器发送一个HTTP请求。 &am…

CFI技术新探索,struct_san今日登场

一、背景 C/C开发的应用程序&#xff0c;长久以来存在内存破坏类的安全问题。当攻击者掌握了目标程序的漏洞后&#xff0c;就可以开发漏洞利用程序劫持目标程序的控制流。早期的漏洞利用是采用代码注入的方式&#xff0c;通过在缓冲区置入一段代码&#xff08;shellcode&#…

在 Tinkercad 中加快设计的 22 个技巧

在 Tinkercad 中加快设计的 22 个技巧 原文 Everyone knows that Tinkercad is the easiest way to get started in 3D design. Once you get the hang of it, you realize that it’s one of the fastest design tools available. With no software to launch or complex me…

Pytest学习教程_测试报告生成pytest-html(三)

前言 pytest-html 是一个用于生成漂亮的 HTML 测试报告的 pytest 插件。它可以方便地将 pytest 运行的测试结果转换为易于阅读和理解的 HTML 报告&#xff0c;提供了丰富的测试结果展示功能和交互性。 一、安装 # 版本查看命令 pytest版本&#xff1a; pytest --version pyte…

PHP代码审计--理论

提供资料&#xff1a; php 基础 : https://www.runoob.com/php/php-tutorial.html php是什么&#xff1f; PHP 是服务器端脚本语言。 首先在学习PHP前需要对HTML 和CSS有一定的认识 PHP 能做什么&#xff1f; PHP 可以生成动态页面内容PHP 可以创建、打开、读取、写入、关…

InnoDB引擎底层逻辑讲解——架构之磁盘架构

1. System Tablespaces区域 系统表空间是change buffer&#xff08;更改缓冲区&#xff09;的存放区域&#xff0c;这是在8.0之后重新规划的&#xff0c;在5.x版本的时候&#xff0c;系统表空间还会存放innodb的数据字典undolog日志等信息&#xff0c;在8.0之后主要主要存放更…

【程序猿周末如何才能获得充分的休息】

工作以后常常容易感到疲于奔命&#xff0c;即使在周末也没有得到高质量的休息。打工人/学生党如何过周末&#xff1f;你有哪些延长周末和下班时间的好方法吗&#xff1f;你可以选择从以下几个方向谈谈你的想法和观点。 一&#xff1a;周末的时间规划 周末双休 二&#xff1a;提…

springboot 自定义starter项目Unable to read meta-data for class

springboot 自定义starter包&#xff0c;在项目中引用&#xff0c;启动报错。 org.springframework.boot.SpringApplication [SpringApplication.java:843] Application run failed java.lang.IllegalStateException: Unable to read meta-data for class com.hxg.mail.spring…

找好听的配乐、BGM就上这6个网站,免费商用。

推荐几个音乐素材网站给你&#xff0c;各种类似、风格的都有&#xff0c;而且免费下载&#xff0c;还可以商用&#xff0c;建议收藏起来~ 菜鸟图库 https://www.sucai999.com/audio.html?vNTYxMjky 站内有上千首音效素材&#xff0c;网络流行的音效素材这里都能找到&#xf…

一起学算法(双指针篇)

概念&#xff1a; 通过两个指针&#xff0c;不断的调整区间&#xff0c;从而求出问题最优解的算法就叫“尺取法”&#xff0c;由于利用的是两个双指针&#xff0c;所以也叫作“双指针”算法&#xff0c;这里的“尺”的含义&#xff0c;主要是因为这类问题&#xff0c;最终要求解…

刷题笔记 day2

力扣 1089 复写零 思路&#xff1a;双指针 第一步&#xff1a;利用指针 cur 去记录最后一位要复写的数 &#xff0c; 利用指针 dest 指向最后一位数所要复写的位置&#xff1b; 实现过程&#xff1a;最开始 cur 指向0&#xff0c;dest 指向 -1 &#xff0c; 当arr[cur] ! …

高并发与性能优化的神奇之旅

作为公司的架构师或者程序员&#xff0c;你是否曾经为公司的系统在面对高并发和性能瓶颈时感到手足无措或者焦头烂额呢&#xff1f;笔者在出道那会为此是吃尽了苦头的&#xff0c;不过也得感谢这段苦&#xff0c;让笔者从头到尾去探索&#xff0c;找寻解决之法。 目录 第一站&…

深入理解设计模式之模板方法模式

深入理解设计模式之模板方法模式 什么是模板方法模式&#xff1f; 模板方法模式是一种行为型设计模式&#xff0c;它定义了一个算法的骨架&#xff0c;将一些步骤的具体实现延迟到子类中。模板方法模式通过将算法的通用部分抽象出来&#xff0c;以模板方法的形式提供给子类&am…

express学习笔记7 - docker跟mysql篇

安装Docker和Navicat Docker 进官⽹https://docs.docker.com/get-docker/ 选择机型安装即可。 Navicat&#xff08;也可以在网上找个破解版本&#xff09; 进官⽹https://www.navicat.com/en/products/navicat-premium 安装完之后连接新建⼀个数据库连接 然后再⾥⾯新建⼀个数…

【编程语言 · C语言 · 通讯录管理系统】

【编程语言 C语言 通讯录管理系统】https://mp.weixin.qq.com/s?__bizMzg4NTE5MDAzOA&mid2247491539&idx1&sn02173f15bbff6d5f01a3426a1ecf7120&chksmcfade32af8da6a3cb187ecde99fe0519c4d67ef05488754ab2196fab0915262c260ccc68b304&payreadticketHEsQ…

MacOS使用brew如何下载Nginx

首先&#xff0c;第一步切换源&#xff1a; 切换 brew.git 仓库地址&#xff1a; cd "$(brew --repo)" git remote set-url origin https://mirrors.aliyun.com/homebrew/brew.git 替换 homebrew-core.git 仓库地址: cd "$(brew --repo)/Library/Taps/home…

无线蓝牙耳机有什么值得耳机买的?几款值得买的口碑品牌盘点

蓝牙耳机是一种无线耳机&#xff0c;其通过蓝牙技术与其他设备进行连接&#xff0c;例如手机、电脑、平板电脑等。蓝牙耳机使得用户可以在不受线缆限制的情况下享受音频体验&#xff0c;而且还可以方便地进行通话&#xff0c;目前市场上有许多不同种类和品牌的蓝牙耳机&#xf…

大厂原来都这么使用IDEA远程调试的!

远程调试是一项重要的技术&#xff0c;特别是对于使用IDEA开发的开发者来说。在本篇技术博客中&#xff0c;我们将探讨如何使用IDEA进行远程调试。 1 IDEA 配置 首先&#xff0c;我们需要确保我们的开发环境已经准备就绪。我们需要在远程服务器上安装并配置好调试器&#xff…