nacos集群源码解析-cp架构

news2024/11/16 18:47:22

目录

1 简介

1.1 什么是 CP 架构?

1.2 Nacos 中的 CP 架构特点

1.3 优缺点

1.4适用场景

2 cp架构的主节点选举

2.1 选举流程

2.2 总结

3 cp架构主节点的心跳发送

3.1 leader发送心跳

3.2 follower接收心跳

3.3 总结

4 cp架构的服务注册

4.1 注册流程

4.2 总结


1 简介

Nacos(Naming Configuration Service)是一款由阿里巴巴开源的服务发现和配置管理平台。它支持服务注册、发现和配置管理,并在微服务架构中被广泛应用。Nacos 支持两种主要的分布式一致性架构:CP 模式和 AP模式。这里主要介绍一下 CP 架构

1.1 什么是 CP 架构?

CP 架构遵循 CAP 理论中的 一致性(Consistency) 和 分区容错性(Partition Tolerance,在系统遇到网络分区问题时,CP 架构确保数据一致性,但可能会牺牲可用性(Availability)。这意味着在网络分区的情况下,系统会优先保证所有节点数据的一致性,即使需要暂停一些服务。

1.2 Nacos 中的 CP 架构特点

•  数据强一致性:CP 模式下,Nacos 集群中的数据保证强一致性,所有的写操作都需要通过大多数节点的确认才能生效。因此,当数据写入一个节点时,数据会被同步到集群中的大多数节点,确保一致性。

•  Raft 协议:Nacos 的 CP 架构实现基于 Raft 一致性协议,这是一个分布式一致性算法,确保数据在多个节点之间保持一致。Raft 协议通过选举出一个主节点(Leader)来管理写操作,其他节点作为从节点(Follower)负责复制数据。

•  主从架构:Nacos 在 CP 模式下采用主从架构,主节点处理所有写请求,并将这些请求复制给从节点。若主节点不可用,则会在剩余节点中进行新的选举。

•  数据可靠性:因为数据需要被大多数节点确认后才被写入,CP 模式提供了更高的数据可靠性,适用于对一致性要求高的场景,如金融或订单系统。

1.3 优缺点

优点

•  数据一致性:在分布式环境中保持数据的强一致性,减少数据不一致问题。

•  可靠性:保证在故障或网络分区情况下,数据不会出现不同步或冲突的现象。

缺点

•  可用性降低:在网络分区或节点故障时,服务可用性可能受到影响,无法及时处理请求。

•  性能开销:因为写操作需要多数节点确认,写入性能可能相对较低。

1.4适用场景

CP 架构适用于对一致性要求极高的应用,如:

•  金融系统:需要确保每笔交易的准确性。

•  订单处理系统:要求数据在订单创建、修改等操作中保持严格一致。

2 cp架构的主节点选举

2.1 选举流程

核心类:RaftCore.java 核心方法:init


public void init() throws Exception {
    Loggers.RAFT.info("initializing Raft sub-system");
    final long start = System.currentTimeMillis();

    raftStore.loadDatums(notifier, datums);

    setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));

    Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());

    initialized = true;

    Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
    //1 选举流程
    masterTask = GlobalExecutor.registerMasterElection(new MasterElection());
    //2 发心跳流程
    heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());

    versionJudgement.registerObserver(isAllNewVersion -> {
        stopWork = isAllNewVersion;
        if (stopWork) {
            try {
                shutdown();
                raftListener.removeOldRaftMetadata();
            } catch (NacosException e) {
                throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);
            }
        }
    }, 100);

    NotifyCenter.registerSubscriber(notifier);

    Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
            GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}


//MasterElection中的run方法
public void run() {
    try {
        if (stopWork) {
            return;
        }
        if (!peers.isReady()) {
            return;
        }
        //1 休眠一段时间
        RaftPeer local = peers.local();
        local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;

        if (local.leaderDueMs > 0) {
            return;
        }

        // reset timeout
        local.resetLeaderDue();
        local.resetHeartbeatDue();
        //2 发起投票
        sendVote();
    } catch (Exception e) {
        Loggers.RAFT.warn("[RAFT] error while master election {}", e);
    }

}


private void sendVote() {

    RaftPeer local = peers.get(NetUtils.localServer());
    Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()),
            local.term);

    peers.reset();
    //1 选举周期加1
    local.term.incrementAndGet();
    //2 投票给自己
    local.voteFor = local.ip;
    //3 将自己改为候选者
    local.state = RaftPeer.State.CANDIDATE;

    Map<String, String> params = new HashMap<>(1);
    params.put("vote", JacksonUtils.toJson(local));
    //4 发起投票
    for (final String server : peers.allServersWithoutMySelf()) {
        final String url = buildUrl(server, API_VOTE);
        try {
            HttpClient.asyncHttpPost(url, null, params, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", result.getCode(), url);
                        return;
                    }

                    RaftPeer peer = JacksonUtils.toObj(result.getData(), RaftPeer.class);

                    Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer));
                    //5 解析选票 如果大于半数 将自己改为leader
                    peers.decideLeader(peer);

                }

                @Override
                public void onError(Throwable throwable) {
                    Loggers.RAFT.error("error while sending vote to server: {}", server, throwable);
                }

                @Override
                public void onCancel() {

                }
            });
        } catch (Exception e) {
            Loggers.RAFT.warn("error while sending vote to server: {}", server);
        }
    }
}
2.2 总结
  1. 首先所有节点休眠一个随机时间

  2. 当节点苏醒后 将自己的选举周期加一 投票给自己 将自己修改为候选者

  3. 然后发送投票给其他所有节点

  4. 半数节点以上同意就将自己修改为leader 如果没有选择 开启下一轮的选举

3 cp架构主节点的心跳发送

3.1 leader发送心跳

核心类:RaftCore.java 核心方法:init


public void init() throws Exception {
    Loggers.RAFT.info("initializing Raft sub-system");
    final long start = System.currentTimeMillis();

    raftStore.loadDatums(notifier, datums);

    setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));

    Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());

    initialized = true;

    Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
    //1 选举流程
    masterTask = GlobalExecutor.registerMasterElection(new MasterElection());
    //2 发心跳流程
    heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());

    versionJudgement.registerObserver(isAllNewVersion -> {
        stopWork = isAllNewVersion;
        if (stopWork) {
            try {
                shutdown();
                raftListener.removeOldRaftMetadata();
            } catch (NacosException e) {
                throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);
            }
        }
    }, 100);

    NotifyCenter.registerSubscriber(notifier);

    Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
            GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}


//HeartBeat的run方法
public void run() {
    try {
        if (stopWork) {
            return;
        }
        if (!peers.isReady()) {
            return;
        }

        RaftPeer local = peers.local();
        local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
        if (local.heartbeatDueMs > 0) {
            return;
        }

        local.resetHeartbeatDue();
        //发送心跳
        sendBeat();
    } catch (Exception e) {
        Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
    }

}


private void sendBeat() throws IOException, InterruptedException {
    RaftPeer local = peers.local();
    //1 如果不是leader 不能发送心跳
    if (EnvUtil.getStandaloneMode() || local.state != RaftPeer.State.LEADER) {
        return;
    }
    if (Loggers.RAFT.isDebugEnabled()) {
        Loggers.RAFT.debug("[RAFT] send beat with {} keys.", datums.size());
    }

    local.resetLeaderDue();

    // build data
    ObjectNode packet = JacksonUtils.createEmptyJsonNode();
    packet.replace("peer", JacksonUtils.transferToJsonNode(local));

    ArrayNode array = JacksonUtils.createEmptyArrayNode();

    if (switchDomain.isSendBeatOnly()) {
        Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", switchDomain.isSendBeatOnly());
    }
    //2 封装需要发送的信息 主要包含key和时间
    if (!switchDomain.isSendBeatOnly()) {
        for (Datum datum : datums.values()) {

            ObjectNode element = JacksonUtils.createEmptyJsonNode();

            if (KeyBuilder.matchServiceMetaKey(datum.key)) {
                element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
            } else if (KeyBuilder.matchInstanceListKey(datum.key)) {
                element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
            }
            element.put("timestamp", datum.timestamp.get());

            array.add(element);
        }
    }

    packet.replace("datums", array);
    // broadcast
    Map<String, String> params = new HashMap<String, String>(1);
    params.put("beat", JacksonUtils.toJson(packet));
    //3 压缩消息
    String content = JacksonUtils.toJson(params);

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    GZIPOutputStream gzip = new GZIPOutputStream(out);
    gzip.write(content.getBytes(StandardCharsets.UTF_8));
    gzip.close();

    byte[] compressedBytes = out.toByteArray();
    String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);

    if (Loggers.RAFT.isDebugEnabled()) {
        Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}", content.length(),
                compressedContent.length());
    }
    //4 将消息发送给其他节点
    for (final String server : peers.allServersWithoutMySelf()) {
        try {
            final String url = buildUrl(server, API_BEAT);
            if (Loggers.RAFT.isDebugEnabled()) {
                Loggers.RAFT.debug("send beat to server " + server);
            }
            HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}", result.getCode(), server);
                        MetricsMonitor.getLeaderSendBeatFailedException().increment();
                        return;
                    }

                    peers.update(JacksonUtils.toObj(result.getData(), RaftPeer.class));
                    if (Loggers.RAFT.isDebugEnabled()) {
                        Loggers.RAFT.debug("receive beat response from: {}", url);
                    }
                }

                @Override
                public void onError(Throwable throwable) {
                    Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server,
                            throwable);
                    MetricsMonitor.getLeaderSendBeatFailedException().increment();
                }

                @Override
                public void onCancel() {

                }
            });
        } catch (Exception e) {
            Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
            MetricsMonitor.getLeaderSendBeatFailedException().increment();
        }
    }

}
3.2 follower接收心跳

核心类:RaftController.java 核心方法:beat


public JsonNode beat(HttpServletRequest request, HttpServletResponse response) throws Exception {
    if (versionJudgement.allMemberIsNewVersion()) {
        throw new IllegalStateException("old raft protocol already stop");
    }
    String entity = new String(IoUtils.tryDecompress(request.getInputStream()), StandardCharsets.UTF_8);
    String value = URLDecoder.decode(entity, "UTF-8");
    value = URLDecoder.decode(value, "UTF-8");

    JsonNode json = JacksonUtils.toObj(value);
    //1 接受心跳
    RaftPeer peer = raftCore.receivedBeat(JacksonUtils.toObj(json.get("beat").asText()));

    return JacksonUtils.transferToJsonNode(peer);
}


public RaftPeer receivedBeat(JsonNode beat) throws Exception {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    final RaftPeer local = peers.local();
    final RaftPeer remote = new RaftPeer();
    JsonNode peer = beat.get("peer");
    remote.ip = peer.get("ip").asText();
    remote.state = RaftPeer.State.valueOf(peer.get("state").asText());
    remote.term.set(peer.get("term").asLong());
    remote.heartbeatDueMs = peer.get("heartbeatDueMs").asLong();
    remote.leaderDueMs = peer.get("leaderDueMs").asLong();
    remote.voteFor = peer.get("voteFor").asText();
    //1 只处理leader的心跳
    if (remote.state != RaftPeer.State.LEADER) {
        Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}", remote.state,
                JacksonUtils.toJson(remote));
        throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
    }

    if (local.term.get() > remote.term.get()) {
        Loggers.RAFT
                .info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}",
                        remote.term.get(), local.term.get(), JacksonUtils.toJson(remote), local.leaderDueMs);
        throw new IllegalArgumentException(
                "out of date beat, beat-from-term: " + remote.term.get() + ", beat-to-term: " + local.term.get());
    }
    //2 如果之前不是follower 将自己修改为follower节点
    if (local.state != RaftPeer.State.FOLLOWER) {

        Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JacksonUtils.toJson(remote));
        // mk follower
        local.state = RaftPeer.State.FOLLOWER;
        local.voteFor = remote.ip;
    }

    final JsonNode beatDatums = beat.get("datums");
    local.resetLeaderDue();
    local.resetHeartbeatDue();

    peers.makeLeader(remote);

    if (!switchDomain.isSendBeatOnly()) {

        Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());

        for (Map.Entry<String, Datum> entry : datums.entrySet()) {
            //3 将自己内存的数据标记为0
            receivedKeysMap.put(entry.getKey(), 0);
        }

        // now check datums
        List<String> batch = new ArrayList<>();

        int processedCount = 0;
        if (Loggers.RAFT.isDebugEnabled()) {
            Loggers.RAFT
                    .debug("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}",
                            beatDatums.size(), datums.size(), remote.ip, remote.term, local.term);
        }
        //4 开始解析远程发来的key
        for (Object object : beatDatums) {
            processedCount = processedCount + 1;

            JsonNode entry = (JsonNode) object;
            String key = entry.get("key").asText();
            final String datumKey;

            if (KeyBuilder.matchServiceMetaKey(key)) {
                datumKey = KeyBuilder.detailServiceMetaKey(key);
            } else if (KeyBuilder.matchInstanceListKey(key)) {
                datumKey = KeyBuilder.detailInstanceListkey(key);
            } else {
                // ignore corrupted key:
                continue;
            }

            long timestamp = entry.get("timestamp").asLong();
            //5 标记是leader发来的key
            receivedKeysMap.put(datumKey, 1);

            try {
                //6 如果本地包含这个key 并且时间戳大于等于leader的时间戳 不处理
                if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp
                        && processedCount < beatDatums.size()) {
                    continue;
                }
                //7 如果不包含这个key或者本地key不是新的 存储起来
                if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
                    batch.add(datumKey);
                }
                //8 如果没有达到批量处理条件 继续处理
                if (batch.size() < 50 && processedCount < beatDatums.size()) {
                    continue;
                }

                String keys = StringUtils.join(batch, ",");

                if (batch.size() <= 0) {
                    continue;
                }

                Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}"
                                + ", datums' size is {}, RaftCore.datums' size is {}", getLeader().ip, batch.size(),
                        processedCount, beatDatums.size(), datums.size());

                // update datum entry
                //9 调用leader的接口 获取不是最新key的信息
                String url = buildUrl(remote.ip, API_GET);
                Map<String, String> queryParam = new HashMap<>(1);
                queryParam.put("keys", URLEncoder.encode(keys, "UTF-8"));
                HttpClient.asyncHttpGet(url, null, queryParam, new Callback<String>() {
                    @Override
                    public void onReceive(RestResult<String> result) {
                        if (!result.ok()) {
                            return;
                        }

                        List<JsonNode> datumList = JacksonUtils
                                .toObj(result.getData(), new TypeReference<List<JsonNode>>() {
                                });

                        for (JsonNode datumJson : datumList) {
                            Datum newDatum = null;
                            OPERATE_LOCK.lock();
                            try {

                                Datum oldDatum = getDatum(datumJson.get("key").asText());

                                if (oldDatum != null && datumJson.get("timestamp").asLong() <= oldDatum.timestamp
                                        .get()) {
                                    Loggers.RAFT
                                            .info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}",
                                                    datumJson.get("key").asText(),
                                                    datumJson.get("timestamp").asLong(), oldDatum.timestamp);
                                    continue;
                                }

                                if (KeyBuilder.matchServiceMetaKey(datumJson.get("key").asText())) {
                                    Datum<Service> serviceDatum = new Datum<>();
                                    serviceDatum.key = datumJson.get("key").asText();
                                    serviceDatum.timestamp.set(datumJson.get("timestamp").asLong());
                                    serviceDatum.value = JacksonUtils
                                            .toObj(datumJson.get("value").toString(), Service.class);
                                    newDatum = serviceDatum;
                                }

                                if (KeyBuilder.matchInstanceListKey(datumJson.get("key").asText())) {
                                    Datum<Instances> instancesDatum = new Datum<>();
                                    instancesDatum.key = datumJson.get("key").asText();
                                    instancesDatum.timestamp.set(datumJson.get("timestamp").asLong());
                                    instancesDatum.value = JacksonUtils
                                            .toObj(datumJson.get("value").toString(), Instances.class);
                                    newDatum = instancesDatum;
                                }

                                if (newDatum == null || newDatum.value == null) {
                                    Loggers.RAFT.error("receive null datum: {}", datumJson);
                                    continue;
                                }

                                raftStore.write(newDatum);

                                datums.put(newDatum.key, newDatum);
                                notifier.notify(newDatum.key, DataOperation.CHANGE, newDatum.value);

                                local.resetLeaderDue();

                                if (local.term.get() + 100 > remote.term.get()) {
                                    getLeader().term.set(remote.term.get());
                                    local.term.set(getLeader().term.get());
                                } else {
                                    local.term.addAndGet(100);
                                }

                                raftStore.updateTerm(local.term.get());

                                Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",
                                        newDatum.key, newDatum.timestamp, JacksonUtils.toJson(remote), local.term);

                            } catch (Throwable e) {
                                Loggers.RAFT
                                        .error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum,
                                                e);
                            } finally {
                                OPERATE_LOCK.unlock();
                            }
                        }
                        try {
                            TimeUnit.MILLISECONDS.sleep(200);
                        } catch (InterruptedException e) {
                            Loggers.RAFT.error("[RAFT-BEAT] Interrupted error ", e);
                        }
                        return;
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader", throwable);
                    }

                    @Override
                    public void onCancel() {

                    }

                });

                batch.clear();

            } catch (Exception e) {
                Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);
            }

        }

        List<String> deadKeys = new ArrayList<>();
        //10 将leader已经删除的key删除
        for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
            if (entry.getValue() == 0) {
                deadKeys.add(entry.getKey());
            }
        }

        for (String deadKey : deadKeys) {
            try {
                deleteDatum(deadKey);
            } catch (Exception e) {
                Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);
            }
        }

    }

    return local;
}
3.3 总结
  1. 首先是leader每隔一定时间会发送心跳给follower节点 心跳的数据主要包含key值和时间戳

  2. follower接收到心跳后 将与本地的key和时间戳做对比 将过期的数据重新向leader节点拉取一份更新到本地

  3. 如果本地的key值在leader发来的心跳中没有出现 说明这个key在leader节点被删除了 需要删除

4 cp架构的服务注册

4.1 注册流程

核心类:RaftConsistencyServiceImpl.java 核心方法:put


public void put(String key, Record value) throws NacosException {
    checkIsStopWork();
    try {
        //1 注册服务信息
        raftCore.signalPublish(key, value);
    } catch (Exception e) {
        Loggers.RAFT.error("Raft put failed.", e);
        throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,
                e);
    }
}


public void signalPublish(String key, Record value) throws Exception {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    //1 判断节点是不是leader
    if (!isLeader()) {
        ObjectNode params = JacksonUtils.createEmptyJsonNode();
        params.put("key", key);
        params.replace("value", JacksonUtils.transferToJsonNode(value));
        Map<String, String> parameters = new HashMap<>(1);
        parameters.put("key", key);

        final RaftPeer leader = getLeader();
        //2 如果不是leader 转发请求给leader
        raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
        return;
    }

    OPERATE_LOCK.lock();
    try {
        final long start = System.currentTimeMillis();
        final Datum datum = new Datum();
        datum.key = key;
        datum.value = value;
        if (getDatum(key) == null) {
            datum.timestamp.set(1L);
        } else {
            datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
        }

        ObjectNode json = JacksonUtils.createEmptyJsonNode();
        json.replace("datum", JacksonUtils.transferToJsonNode(datum));
        json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
        //3 将改动存储到本地文件并发布事件通知
        onPublish(datum, peers.local());

        final String content = json.toString();
        //4 通知其他节点变更 满足半数以上即可
        final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
        for (final String server : peers.allServersIncludeMyself()) {
            if (isLeader(server)) {
                latch.countDown();
                continue;
            }
            final String url = buildUrl(server, API_ON_PUB);
            HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        Loggers.RAFT
                                .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                        datum.key, server, result.getCode());
                        return;
                    }
                    latch.countDown();
                }

                @Override
                public void onError(Throwable throwable) {
                    Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
                }

                @Override
                public void onCancel() {

                }
            });

        }
        //5 不满足半数直接抛异常
        if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
            // only majority servers return success can we consider this update success
            Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
            throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
        }

        long end = System.currentTimeMillis();
        Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
    } finally {
        OPERATE_LOCK.unlock();
    }
}
4.2 总结
  1. 当客户端发送服务注册的消息时 如果发送到follower节点 follower节点会把消息转发给leader节点

  2. leader节点手动服务注册的消息时 首先将服务信息存储到本地文件 并且广播给follower节点存储服务信息

  3. 然后leader节点通知follower节点将服务信息存储提交 满足半数以上响应成功

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2241678.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【电脑】解决DiskGenius调整分区大小时报错“文件使用的簇被标记为空闲或与其它文件有交叉”

【电脑】解决DiskGenius调整分区大小时报错“文件使用的簇被标记为空闲或与其它文件有交叉” 零、报错 在使用DiskGenius对磁盘分区进行调整时&#xff0c;DiskGenius检查出磁盘报错&#xff0c;报错信息&#xff1a;文件使用的簇被标记为空闲或与其它文件有交叉&#xff0c;…

YOLOv11 C++ TensorRT

引用 YOLOv11 C TensorRT项目是一个用C实现并使用NVIDIA TensorRT进行优化的高性能对象检测解决方案。该项目利用 YOLOv11 模型提供快速准确的对象检测&#xff0c;并利用 TensorRT 最大限度地提高推理效率和性能。 &#x1f4e2; 更新 主要特点&#xff1a; 模型转换&#x…

产品思维如何颠覆我的开发与盈利观-营销自己

之前&#xff0c;我独自一人开发了一个名为“心情追忆”的小程序&#xff0c;旨在帮助用户记录日常的心情变化及重要时刻。从项目的构思、设计、前端&#xff08;小程序&#xff09;开发、后端搭建到最终部署&#xff0c;所有环节都由我一人包办。经过一个月的努力&#xff0c;…

Spring Boot框架:电商系统的快速构建

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本网上商城系统就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短时间内处理完毕庞大的数据信息&…

【网页设计】CSS3 进阶(动画篇)

1. CSS3 2D 转换 转换&#xff08;transform&#xff09;是CSS3中具有颠覆性的特征之一&#xff0c;可以实现元素的位移、旋转、缩放等效果 转换&#xff08;transform&#xff09;你可以简单理解为变形 移动&#xff1a;translate旋转&#xff1a;rotate缩放&#xf…

Android12的ANR解析

0. 参考&#xff1a; ANR分析 深入理解 Android ANR 触发原理以及信息收集过程 1.ANR的触发分类: ANR分为4类&#xff1a; InputDispatchTimeout&#xff1a;输入事件分发超时5s,包括按键和触摸事件。BroadcastTimeout&#xff1a;比如前台广播在10s内未执行完成&#xff0…

【eNSP】路由基础与路由来源——静态路由实验

路由是数据包从源地址到目的地址的传输路径&#xff0c;静态路由是指网络管理员手动配置的路由条目&#xff0c;用于指定数据包从源地址到目的地址的固定路径。以下是关于静态路由的详细介绍。 一、路由的基础知识点 路由的定义&#xff1a; 路由是指在计算机网络中&#xff…

【AI声音克隆整合包及教程】第二代GPT-SoVITS V2:创新与应用

一、引言 随着科技的迅猛发展&#xff0c;声音克隆技术已经成为一个炙手可热的研究领域。SoVITS&#xff08;Sound Voice Intelligent Transfer System&#xff09;&#xff0c;作为该领域的先锋&#xff0c;凭借其卓越的性能和广泛的适用性&#xff0c;正在为多个行业带来前所…

VScode-Java开发常用插件

中文——界面易读 字体主题——代码可观 头注释——项目信息明了 java开发包——java必备 git协作开发——版本控制

jmeter常用配置元件介绍总结之逻辑控制器

系列文章目录 安装jmeter jmeter常用配置元件介绍总结之逻辑控制器 逻辑控制器1.IF控制器2.事务控制器3.循环控制器4.While控制器5.ForEach控制器6.Include控制器7.Runtime控制器8.临界部分控制器9.交替控制器10.仅一次控制器11.简单控制器12.随机控制器13.随机顺序控制器14.吞…

探索 HTML 和 CSS 实现的蜡烛火焰

效果演示 这段代码是一个模拟蜡烛火焰的HTML和CSS代码。它创建了一个具有动态效果的蜡烛火焰动画&#xff0c;包括火焰的摆动、伸缩和光晕的闪烁。 HTML <div class"holder"><div class"candle"><div class"blinking-glow"&g…

机器学习 - 为 Jupyter Notebook 安装新的 Kernel

https://ipython.readthedocs.io/en/latest/install/kernel_install.html 当使用jupyter-notebook --no-browser 启动一个 notebook 时&#xff0c;默认使用了该 jupyter module 所在的 Python 环境作为 kernel&#xff0c;比如 C:\devel\Python\Python311。 如果&#xff0c…

SwiftUI-基础入门

开发OS X 图形应用界面时有三种实现方式&#xff1a;XIB、Storyboard、SwiftUI。Storyboard基于XIB做了优化&#xff0c;但XIB基本被放弃了&#xff0c;而SwiftUI是苹果公司后来开发的一套编程语言&#xff0c;用来平替Objective-C。虽然现在Swift 6 还是有些不完善的地方&…

androidstudio入门到放弃配置

b站视频讲解传送门 android_studio安装包&#xff1a;https://developer.android.google.cn/studio?hlzh-cn 下载安装 开始创建hello-world 1.删除缓存 文件 下载gradle文件压缩&#xff1a;gradle-8.9用自己创建项目时自动生成的版本即可&#xff0c;不用和我一样 https://…

如何在pycharm中 判断是否成功安装pytorch环境

1、在电脑开始端&#xff0c;找到 2、打开后 在base环境下 输入conda env list 目前我的环境中没有pytorch 学习视频&#xff1a;【Anaconda、Pytorch、Pycharm到底是什么关系?什么是环境?什么是包?】https://www.bilibili.com/video/BV1CN411s7Ue?vd_sourcefad0750b8c6…

昆明华厦眼科医院举办中外专家眼科技术研讨会

9月13日&#xff0c;“睿智迭代&#xff0c;增效赋能”Menicon Z Night中外专家研讨会在昆明华厦眼科医院成功举办。此次会议由目立康公司与昆明华厦眼科医院携手共筑&#xff0c;标志着双方合作迈向新的高度。 昆明华厦眼科医院总经理王若镜首先发表了热情洋溢的致辞&#xff…

HarmonyOS ArkUI(基于ArkTS) 开发布局 (上)

一 ArkUI(基于ArkTS)概述 基于ArkTS的声明式开发范式的方舟开发框架是一套开发极简、高性能、支持跨设备的UI开发框架&#xff0c;提供了构建应用UI所必需的能力 点击详情 特点 开发效率高&#xff0c;开发体验好 代码简洁&#xff1a;通过接近自然语义的方式描述UI&#x…

【STM32】项目实战——OV7725/OV2604摄像头颜色识别检测(开源)

本篇文章分享关于如何使用STM32单片机对彩色摄像头&#xff08;OV7725/OV2604&#xff09;采集的图像数据进行分析处理&#xff0c;最后实现颜色的识别和检测。 目录 一、什么是颜色识别 1、图像采集识别的一些基本概念 1. 像素&#xff08;Pixel&#xff09; 2. 分辨率&am…

SpringCloud-使用FFmpeg对视频压缩处理

在现代的视频处理系统中&#xff0c;压缩视频以减小存储空间、加快传输速度是一项非常重要的任务。FFmpeg作为一个强大的开源工具&#xff0c;广泛应用于音视频的处理&#xff0c;包括视频的压缩和格式转换等。本文将通过Java代码示例&#xff0c;向您展示如何使用FFmpeg进行视…

大数据学习14之Scala面向对象--至简原则

1.类和对象 1.1基本概念 面向对象&#xff08;Object Oriented&#xff09;是一种编程思想&#xff0c;面向对象主要是把事物给对象化&#xff0c;包括其属性和行为。面向对象编程更贴近实际生活的思想&#xff0c;总体来说面向对象的底层还是面向过程&#xff0c;面向过程抽象…