Nacos源码解读07——集群数据同步

news2025/1/13 15:42:07

Distro协议背景

Distro 协议是 Nacos 社区自研的⼀种 AP 分布式协议,是面向临时实例设计的⼀种分布式协议,
其保证了在某些 Nacos 节点宕机后,整个临时实例处理系统依旧可以正常工作。作为⼀种有状态
的中间件应用的内嵌协议,Distro 保证了各个 Nacos 节点对于海量注册请求的统⼀协调和存储。

设计思想

1.Nacos 每个节点是平等的都可以处理写请求,同时把新数据同步到其他节点。
2. 每个节点只负责部分数据,定时发送自己负责数据的校验值到其他节点来保持数据⼀致性。
3. 每个节点独立处理读请求,及时从本地发出响应。

原理

数据初始化

新加入的 Distro 节点会进行全量数据拉取。具体操作是轮询所有的 Distro 节点,通过向其他的机
器发送请求拉取全量数据。
在这里插入图片描述
在全量拉取操作完成之后,Nacos 的每台机器上都维护了当前的所有注册上来的非持久化实例数
据。

数据校验

在 Distro 集群启动之后,各台机器之间会定期的发送心跳。心跳信息主要为各个机器上的所有数据
的元信息(之所以使用元信息,是因为需要保证网络中数据传输的量级维持在⼀个较低水平)。这
种数据校验会以心跳的形式进行,即每台机器在固定时间间隔会向其他机器发起⼀次数据校验请求。
在这里插入图片描述
⼀旦在数据校验过程中,某台机器发现其他机器上的数据与本地数据不⼀致,则会发起⼀次全量拉
取请求,将数据补齐。

写操作

对于⼀个已经启动完成的 Distro 集群,在⼀次客户端发起写操作的流程中,当注册非持久化的实例
的写请求打到某台 Nacos 服务器时,Distro 集群处理的流程图如下
在这里插入图片描述
整个步骤包括几个部分(图中从上到下顺序):
 前置的 Filter 拦截请求,并根据请求中包含的 IP 和 port 信息计算其所属的 Distro 责任节点,
并将该请求转发到所属的 Distro 责任节点上。
 责任节点上的 Controller 将写请求进行解析。
 Distro 协议定期执行 Sync 任务,将本机所负责的所有的实例信息同步到其他节点上。

读操作

由于每台机器上都存放了全量数据,因此在每⼀次读操作中,Distro 机器会直接从本地拉取数据。
快速响应。
在这里插入图片描述
这种机制保证了 Distro 协议可以作为⼀种 AP 协议,对于读操作都进行及时的响应。在网络分区
的情况下,对于所有的读操作也能够正常返回;当网络恢复时,各个 Distro 节点会把各数据分片的
数据进行合并恢复。

Distro总结

Distro 协议是 Nacos 对于临时实例数据开发的⼀致性协议。其数据存储在缓存中,并且会在启动
时进行全量数据同步,并定期进行数据校验
在 Distro 协议的设计思想下,每个 Distro 节点都可以接收到读写请求。所有的 Distro 协议的请
求场景主要分为三种情况:

  1. 当该节点接收到属于该节点负责的实例的写请求时,直接写入。
  2. 当该节点接收到不属于该节点负责的实例的写请求时,将在集群内部路由,转发给对应的节点,
    从而完成读写。
  3. 当该节点接收到任何读请求时,都直接在本机查询并返回(因为所有实例都被同步到了每台机
    器上)。
    Distro 协议作为 Nacos 的内嵌临时实例⼀致性协议,保证了在分布式环境下每个节点上面的服务
    信息的状态都能够及时地通知其他节点,可以维持数十万量级服务实例的存储和⼀致性。

Distro寻址

什么是Distro寻址

寻址是指当检测到Naocs集群节点变化时能后及时更新节点信息

寻址模式

Nacos支持两种寻址模式分别为「文件寻址」和「地址服务器寻址」
文件寻址

默认为文件寻址,可以通过参数「nacos.core.member.lookup.type」设置取值为「file」或者「address-server」

文件寻址路径默认为 「${user.home}/nacos/conf/cluster.conf」

文件寻址cluster.conf配置文件的内容格式为「ip1:port,ip2:port」

地址寻址

地址服务器寻址默认为:http://jmenv.tbsite.net:8080/serverlist;其中域名、端口、url均可自定义

检测到集群节点变更时会更新缓存并发布MembersChangeEvent事件

为防止新节点没有初始化好,当检测到新节点加入时先设置该节点状态为DOWN,该节点不参与通信

过几秒通过节点之间通信将已初始化的新节点状态由DOWN设置为UP,该节点正式参与通信

Distro启动

DistroClientComponentRegistry和DistroHttpRegistry会在
Bean构造完成之后会执行doRegister方法进行Distro的注册

    // DistroClientComponentRegistry
    @PostConstruct
    public void doRegister() {
        // 创建distro处理器
        DistroClientDataProcessor dataProcessor = new DistroClientDataProcessor(clientManager, distroProtocol,
                upgradeJudgement);
        // 创建distro传输代理对象
        DistroTransportAgent transportAgent = new DistroClientTransportAgent(clusterRpcClientProxy,
                serverMemberManager);
        // 创建失败处理器,该处理器主要是添加失败重试任务
        DistroClientTaskFailedHandler taskFailedHandler = new DistroClientTaskFailedHandler(taskEngineHolder);
        // 注册Nacos:Naming:v2:ClientData类型数据的数据仓库实现
        componentHolder.registerDataStorage(DistroClientDataProcessor.TYPE, dataProcessor);
        // 注册Nacos:Naming:v2:ClientData类型的DistroData数据处理器
        componentHolder.registerDataProcessor(dataProcessor);
        // 注册Nacos:Naming:v2:ClientData类型数据的数据传输代理对象实现
        componentHolder.registerTransportAgent(DistroClientDataProcessor.TYPE, transportAgent);
        // 注册Nacos:Naming:v2:ClientData类型的失败任务处理器
        componentHolder.registerFailedTaskHandler(DistroClientDataProcessor.TYPE, taskFailedHandler);
    }
 
 
    // DistroHttpRegistry
    @PostConstruct
    public void doRegister() {
        // 创建distro数据存储对象
        componentHolder.registerDataStorage(KeyBuilder.INSTANCE_LIST_KEY_PREFIX,
                new DistroDataStorageImpl(dataStore, distroMapper));
        // 创建distro传输代理对象http
        componentHolder.registerTransportAgent(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, new DistroHttpAgent(memberManager));
        // 创建失败处理器,该处理器主要是添加失败重试任务
        componentHolder.registerFailedTaskHandler(KeyBuilder.INSTANCE_LIST_KEY_PREFIX,
                new DistroHttpCombinedKeyTaskFailedHandler(taskEngineHolder));
        // 添加distro的http延时任务处理器
        taskEngineHolder.registerNacosTaskProcessor(KeyBuilder.INSTANCE_LIST_KEY_PREFIX,
                new DistroHttpDelayTaskProcessor(globalConfig, taskEngineHolder));
        // 注册数据处理器
        componentHolder.registerDataProcessor(consistencyService);
    }

DistroTaskEngineHolder

Distro的任务执行引擎他去做Distro的执行
这个类中有一个延时执行引擎和一个立即执行引擎,在创建类时会给延时执行引擎设置一个默认的任务处理器。DistroHttpRegistry中会注册一个任务处理器,则v1版本的话不会使用默认的任务处理器。

@Component
public class DistroTaskEngineHolder {
    
    private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine();
    
    private final DistroExecuteTaskExecuteEngine executeWorkersManager = new DistroExecuteTaskExecuteEngine();
    
    public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
        // 延时任务引擎设置默认任务处理器
        DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
        delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
    }
    
    public DistroDelayTaskExecuteEngine getDelayTaskExecuteEngine() {
        return delayTaskExecuteEngine;
    }
    
    public DistroExecuteTaskExecuteEngine getExecuteWorkersManager() {
        return executeWorkersManager;
    }
    public void registerNacosTaskProcessor(Object key, NacosTaskProcessor nacosTaskProcessor) {
        this.delayTaskExecuteEngine.addProcessor(key, nacosTaskProcessor);
    }
}

DistroProtocol 加了@Component注解所以在bean实例化的时候会调用构造方法

    public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
            DistroTaskEngineHolder distroTaskEngineHolder) {
        this.memberManager = memberManager;
        this.distroComponentHolder = distroComponentHolder;
        this.distroTaskEngineHolder = distroTaskEngineHolder;
        //启动Distro任务
        startDistroTask();
    }
    private void startDistroTask() {
      // 判断是否是单机模式,如果是单机模式,不需要进行同步 直接设置初始化完成
        if (EnvUtil.getStandaloneMode()) {
            isInitialized = true;
            return;
        }
        // 开启验证任务
        startVerifyTask();
        // 开始加载全量拉取distro数据快照
        startLoadTask();
    }

Distro 定时校验同步节点数据

DistroProtocol

    private void startVerifyTask() {
        //开启一个定时线程池 定时五秒执行执行DistroVerifyTimedTask的run方法 去做校验同步
        GlobalExecutor.schedulePartitionDataTimedSync(new DistroVerifyTimedTask(memberManager, distroComponentHolder,
                        distroTaskEngineHolder.getExecuteWorkersManager()),
                DistroConfig.getInstance().getVerifyIntervalMillis());
    }

DistroVerifyTimedTask

    @Override
    public void run() {
        try {
          //获取集群其他节点
            List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("server list is: {}", targetServer);
            }
            //遍历数据类型  在Nacos server启动时初始化时两种类型HTTP和gRPC
            for (String each : distroComponentHolder.getDataStorageTypes()) {
                // 对dataStorage内的数据进行验证
                verifyForDataStorage(each, targetServer);
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
        }
    }
private void verifyForDataStorage(String type, List<Member> targetServer) {
        // 根据类型拿到对应的数据存储器  这里以GRPC为例
        DistroDataStorage dataStorage = distroComponentHolder.findDataStorage(type);
        // 判断该存储器有没有初始化完成(在启动时会全量拉取数据,拉去完则会设置FinishInitial为true)
        if (!dataStorage.isFinishInitial()) {
            Loggers.DISTRO.warn("data storage {} has not finished initial step, do not send verify data",
                    dataStorage.getClass().getSimpleName());
            return;
        }
        // 拿到要校验的distro数据集合
        List<DistroData> verifyData = dataStorage.getVerifyData();
        if (null == verifyData || verifyData.isEmpty()) {
            return;
        }
        for (Member member : targetServer) {
            // 获取到对应的传输代理http or rpc
            DistroTransportAgent agent = distroComponentHolder.findTransportAgent(type);
            if (null == agent) {
                continue;
            }
            // 获取到distro传输代理对象,依次添加任务到立即执行引擎
            executeTaskExecuteEngine.addTask(member.getAddress() + type,
                    new DistroVerifyExecuteTask(agent, verifyData, member.getAddress(), type));
        }
    }
@Override
public List<DistroData> getVerifyData() {
    List<DistroData> result = new LinkedList<>(); // 一组DistroData
    for (String each : clientManager.allClientId()) {
        Client client = clientManager.getClient(each);
        if (null == client || !client.isEphemeral()) { // 无效client或者非临时节点
            continue;
        }
        // 判断client是否为本几点负责的逻辑为ClientManagerDelegate#isResponsibleClient。即:属于ConnectionBasedClient并且isNative为true表示该client是直连到该节点的。
        if (clientManager.isResponsibleClient(client)) {
            // 构造Verify Data 主要信息为clientId,还有一个版本信息作为保留字段,目前都是0。
            DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(), 0);
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            DistroData data = new DistroData(distroKey,
                    ApplicationUtils.getBean(Serializer.class).serialize(verifyData)); // 序列化校验数据
            data.setType(DataOperation.VERIFY);
            result.add(data);
        }
    }
    return result;
}

DistroVerifyExecuteTask

    @Override
    public void run() {
        //遍历要校验的distro数据集合
        for (DistroData each : verifyData) {
            try {
                 // // 判断是否支持回调传输数据  grpc支持回调
                if (transportAgent.supportCallbackTransport()) {
                 // 发送验证数据带回调
                    doSyncVerifyDataWithCallback(each);
                } else {
                 // 发送验证数据
                    doSyncVerifyData(each);
                }
            } catch (Exception e) {
                Loggers.DISTRO
                        .error("[DISTRO-FAILED] verify data for type {} to {} failed.", resourceType, targetServer, e);
            }
        }
    }
    private void doSyncVerifyData(DistroData data) {
        transportAgent.syncVerifyData(data, targetServer);
    }
    @Override
    public boolean syncVerifyData(DistroData verifyData, String targetServer) {
        //判断是否存在目标服务器
        if (isNoExistTarget(targetServer)) {
            return true;
        }
        // replace target server as self server so that can callback.
        verifyData.getDistroKey().setTargetServer(memberManager.getSelf().getAddress());
        //创建请求
        DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
        //获取要发送请求的成员
        Member member = memberManager.find(targetServer);
        //节点状态检查需UP状态,即:可通信状态
        if (checkTargetServerStatusUnhealthy(member)) {
            Loggers.DISTRO.warn("[DISTRO] Cancel distro verify caused by target server {} unhealthy", targetServer);
            return false;
        }
        try {
            //发送请求
            Response response = clusterRpcClientProxy.sendRequest(member, request);
            return checkResponse(response);
        } catch (NacosException e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] Verify distro data failed! ", e);
        }
        return false;
    }

发送RPC请求

    public Response sendRequest(Member member, Request request, long timeoutMills) throws NacosException {
        RpcClient client = RpcClientFactory.getClient(memberClientKey(member));
        if (client != null) {
            return client.request(request, timeoutMills);
        } else {
            throw new NacosException(CLIENT_INVALID_PARAM, "No rpc client related to member: " + member);
        }
    }

客户端处理校验请求

DistroDataRequestHandler

    @Override
    public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
        try {
            switch (request.getDataOperation()) {
                //校验请求 
                case VERIFY:
                    return handleVerify(request.getDistroData(), meta);
                 ......
                default:
                    return new DistroDataResponse();
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
            DistroDataResponse result = new DistroDataResponse();
            result.setErrorCode(ResponseCode.FAIL.getCode());
            result.setMessage("handle distro request with exception");
            return result;
        }
    }
    private DistroDataResponse handleVerify(DistroData distroData, RequestMeta meta) {
        DistroDataResponse result = new DistroDataResponse();
        //如果缓存存在client则校验成功,刷新client保鲜时间,否则校验失败。
        if (!distroProtocol.onVerify(distroData, meta.getClientIp())) {
            result.setErrorInfo(ResponseCode.FAIL.getCode(), "[DISTRO-FAILED] distro data verify failed");
        }
        return result;
    }

public boolean onVerify(DistroData distroData, String sourceAddress) {
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO] Receive verify data type: {}, key: {}", distroData.getType(),
                    distroData.getDistroKey());
        }
        String resourceType = distroData.getDistroKey().getResourceType();
        DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
        if (null == dataProcessor) {
            Loggers.DISTRO.warn("[DISTRO] Can't find verify data process for received data {}", resourceType);
            return false;
        }
        // 使用DistroData处理器来处理校验请求
        return dataProcessor.processVerifyData(distroData, sourceAddress);
    }

DistroClientDataProcessor

    @Override
    public boolean processVerifyData(DistroData distroData, String sourceAddress) {
        //获取Distro客户端校验信息
        DistroClientVerifyInfo verifyData = ApplicationUtils.getBean(Serializer.class)
                .deserialize(distroData.getContent(), DistroClientVerifyInfo.class);
         //根据clientId来验证不同的客户端       
        if (clientManager.verifyClient(verifyData.getClientId())) {
            return true;
        }
        Loggers.DISTRO.info("client {} is invalid, get new client from {}", verifyData.getClientId(), sourceAddress);
        return false;
    }

ConnectionBasedClientManager

    @Override
    public boolean verifyClient(String clientId) {
        ConnectionBasedClient client = clients.get(clientId);
         // 如果已经存在,则更新最近一次有效连接时间
        if (null != client) {
            client.setLastRenewTime();
            return true;
        }
        //如果不存在返回false
        return false;
    }

Distro定时校验总结

节点之间发送校验数据是在全量同步后进行的;发送校验的频率默认为5秒钟一次;校验数据包括clientId和version,其中version为保留字段当前为0;接受到校验数据后如果缓存中存在该client表示校验成功,同时更新保鲜时间,否则校验失败

节点刚加入全量拉取数据快照

启动拉取任务

    private void startLoadTask() {
        DistroCallback loadCallback = new DistroCallback() {
            @Override
            public void onSuccess() {
                isInitialized = true;
            }
            
            @Override
            public void onFailed(Throwable throwable) {
                isInitialized = false;
            }
        };
        GlobalExecutor.submitLoadDataTask(
                new DistroLoadDataTask(memberManager, distroComponentHolder, DistroConfig.getInstance(), loadCallback));
    }

DistroLoadDataTask

数据初始化读取任务

    @Override
    public void run() {
        try {
         //从集群中其他节点全量加载数据
            load();
            //如果没有加载成功延迟30秒钟重新执行一次,可以通过参数「nacos.core.protocol.distro.data.load_retry_delay_ms」指定
            if (!checkCompleted()) {
                GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
            } else {
                loadCallback.onSuccess();
                Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
            }
        } catch (Exception e) {
            loadCallback.onFailed(e);
            Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
        }
    }

从其他节点拉取全量数据

    private void load() throws Exception {
        while (memberManager.allMembersWithoutSelf().isEmpty()) {
            Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
            TimeUnit.SECONDS.sleep(1);
        }
        while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
            Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
            TimeUnit.SECONDS.sleep(1);
        }
        //不同的数据类型缓存快照,此处有gRPC和http两类数据类型
        for (String each : distroComponentHolder.getDataStorageTypes()) {
            if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
                loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
            }
        }
    }
private boolean loadAllDataSnapshotFromRemote(String resourceType) {
    DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == transportAgent || null == dataProcessor) {
        return false;
    }
    //获取集群中除了本节点的其他节点,循环重试获取快照,直到有成功节点返回快照,成功后设置状态状态完成初始化
    for (Member each : memberManager.allMembersWithoutSelf()) { 
        try {
            //获取集群其他节点数据
           DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
           //处理数据同步到本地
            boolean result = dataProcessor.processSnapshot(distroData);
            if (result) {
                distroComponentHolder.findDataStorage(resourceType).finishInitial(); // 设置为完成初始化
                return true;
            }
        } catch (Exception e) {
           
        }
    }
    return false;
}

发起读取数据快照请求

@Override
public DistroData getDatumSnapshot(String targetServer) {
    Member member = memberManager.find(targetServer);
    if (checkTargetServerStatusUnhealthy(member)) {
        throw new DistroException(
                String.format("[DISTRO] Cancel get snapshot caused by target server %s unhealthy", targetServer));
    }
    DistroDataRequest request = new DistroDataRequest();
   // 设置请求操作为SNAPSHOT
    request.setDataOperation(DataOperation.SNAPSHOT); 
    try {
       // 发起请求快照数据
        Response response = clusterRpcClientProxy.sendRequest(member, request);
        if (checkResponse(response)) {
            return ((DistroDataResponse) response).getDistroData();
        } else {
            throw new DistroException(
                    String.format("[DISTRO-FAILED] Get snapshot request to %s failed, code: %d, message: %s",
                            targetServer, response.getErrorCode(), response.getMessage()));
        }
    } catch (NacosException e) {
        throw new DistroException("[DISTRO-FAILED] Get distro snapshot failed! ", e);
    }
}

服务端接收读取快照请求

DistroDataRequestHandler

    @Override
    public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
        try {
            switch (request.getDataOperation()) {
                   ......
                case SNAPSHOT:
                    return handleSnapshot();
                   ......
                default:
                    return new DistroDataResponse();
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
            DistroDataResponse result = new DistroDataResponse();
            result.setErrorCode(ResponseCode.FAIL.getCode());
            result.setMessage("handle distro request with exception");
            return result;
        }
    }
    private DistroDataResponse handleSnapshot() {
        DistroDataResponse result = new DistroDataResponse();
        //读取快照数据
        DistroData distroData = distroProtocol.onSnapshot(DistroClientDataProcessor.TYPE);
        result.setDistroData(distroData);
        return result;
    }
@Override
public DistroData getDatumSnapshot() {
    List<ClientSyncData> datum = new LinkedList<>();
    // 把本节点的所有client数据全部封装
    for (String each : clientManager.allClientId()) {
       // 获取对应客户端
        Client client = clientManager.getClient(each);
        if (null == client || !client.isEphemeral()) {
            continue;
        }
        // 针对客户端生成数据
        datum.add(client.generateSyncData());
    }
     // 封装响应数据
    ClientSyncDatumSnapshot snapshot = new ClientSyncDatumSnapshot();
    snapshot.setClientSyncDataList(datum);
    // 将数据序列化成json字符串 再转成字节数据
    byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(snapshot); 
    // 返回响应数据
    return new DistroData(new DistroKey(DataOperation.SNAPSHOT.name(), TYPE), data);
}

生成服务数据

client数据信息,命名空间、分组名称、服务名称、节点Instance信息(IP、端口等等)。

public ClientSyncData generateSyncData() {
    List<String> namespaces = new LinkedList<>();
    List<String> groupNames = new LinkedList<>();
    List<String> serviceNames = new LinkedList<>();
    List<InstancePublishInfo> instances = new LinkedList<>();
    for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {
        namespaces.add(entry.getKey().getNamespace());
        groupNames.add(entry.getKey().getGroup());
        serviceNames.add(entry.getKey().getName());
        instances.add(entry.getValue());
    }
    return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances);
}

总结

集群中每个节点都拥有所有的快照数据;在节点启动时会从集群中其他节点中的一个节点同步快照数据并缓存在Map中;缓存的数据类型分类两类分别为HTTP和gRPC;具体数据即客户端注册节点信息含命名空间、分组名称、服务名称、节点Instance信息等。

增量数据同步

当某台机器发现其他机器上的数据与本地数据不⼀致 会触发一次

校验数据失败处理

    @Override
    public void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback) {
        if (isNoExistTarget(targetServer)) {
            callback.onSuccess();
            return;
        }
        DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
        Member member = memberManager.find(targetServer);
        try {
            //注意这里有一个DistroVerifyCallbackWrapper的回调类当调用结束之后会调用到onResponse方法
            DistroVerifyCallbackWrapper wrapper = new DistroVerifyCallbackWrapper(targetServer,
                    verifyData.getDistroKey().getResourceKey(), callback, member);
            clusterRpcClientProxy.asyncRequest(member, request, wrapper);
        } catch (NacosException nacosException) {
            callback.onFailed(nacosException);
        }
    }
    @Override
        public void onResponse(Response response) {
            
            if (checkResponse(response)) {
                NamingTpsMonitor.distroVerifySuccess(member.getAddress(), member.getIp());
                distroCallback.onSuccess();
            } else {
             //校验失败处理
                Loggers.DISTRO.info("Target {} verify client {} failed, sync new client", targetServer, clientId);
                //发送一个校验失败的事件 
                NotifyCenter.publishEvent(new ClientEvent.ClientVerifyFailedEvent(clientId, targetServer));
                //更新为失败 
                NamingTpsMonitor.distroVerifyFail(member.getAddress(), member.getIp());
                distroCallback.onFailed(null);
            }
        }

DistroClientDataProcessor 订阅了
DistroClientDataProcessor会监听到ClientVerifyFailedEvent事件

    @Override
    public void onEvent(Event event) {
        // 必须是集群
        if (EnvUtil.getStandaloneMode()) {
            return;
        }
        //必须是grpc请求
        if (!upgradeJudgement.isUseGrpcFeatures()) {
            return;
        }
        //校验失败事件
        if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
            syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
        } else {
        //同步事件
            syncToAllServer((ClientEvent) event);
        }
    }

校验失败任务客户端处理

private void syncToVerifyFailedServer(ClientEvent.ClientVerifyFailedEvent event) {
        // 判断客户端是否存在
        Client client = clientManager.getClient(event.getClientId());
        if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
            return;
        }
        // 如果存在,则开始进行同步
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        // 这里也是发送的 action为add的事件 注意后续的处理流程 他们最后和syncToAllServer方法一样最后都由DistroDelayTaskProcessor处理
        distroProtocol.syncToTarget(distroKey, DataOperation.ADD, event.getTargetServer(), 0L);

public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
        DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                targetServer);
        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
        // 添加到延时任务执行引擎来执行     
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
        }
    }

同步集群的其他节点

private void syncToAllServer(ClientEvent event) {
    Client client = event.getClient();
    // Only ephemeral data sync by Distro, persist client should sync by raft.
    if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
        return;
    }
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
       // 当客户端断开连接事件ClientDisconnectEvent时,向其他节点同步DELETE操作
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.DELETE);
    } else if (event instanceof ClientEvent.ClientChangedEvent) {
       // 当客户端变更事件ClientChangedEvent时,向其他节点同步CHANGE操作
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.CHANGE);
    }
}
    public void sync(DistroKey distroKey, DataOperation action, long delay) {
        //遍历所有需要同步的节点
        for (Member each : memberManager.allMembersWithoutSelf()) {
            //执行数据增量更新
            syncToTarget(distroKey, action, each.getAddress(), delay);
        }
    }
    /**
     * Start to sync to target server.
     *
     * @param distroKey    distro key of sync data
     * @param action       the action of data operation
     * @param targetServer target server
     * @param delay        delay time for sync
     */
    public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
        DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                targetServer);
        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
        //添加任务让执行器去做执行
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
        }
    }

后续看任务队列执行同步数据操作

任务队列执行同步数据操作

DistroDelayTaskProcessor会去执行处理延时任务 因为之前发的Action是ADD所以会走ADD的逻辑

    @Override
    public boolean process(NacosTask task) {
         // 不处理非延迟任务
        if (!(task instanceof DistroDelayTask)) {
            return true;
        }
        DistroDelayTask distroDelayTask = (DistroDelayTask) task;
        DistroKey distroKey = distroDelayTask.getDistroKey();
        switch (distroDelayTask.getAction()) {
            case DELETE:
                DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
                distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
                return true;
            case CHANGE:
            case ADD:
            // 添加任务到立即执行任务引擎     
                DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
                distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
                return true;
            default:
                return false;
        }
    }

DistroSyncChangeTask继承了AbstractDistroExecuteTask而AbstractDistroExecuteTask又继承AbstractExecuteTask ,AbstractExecuteTask 内容实现了Runnable所以会执行AbstractDistroExecuteTask的run方法

    @Override
    public void run() {
        String type = getDistroKey().getResourceType();
        DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);
        if (null == transportAgent) {
            Loggers.DISTRO.warn("No found transport agent for type [{}]", type);
            return;
        }
        Loggers.DISTRO.info("[DISTRO-START] {}", toString());
         grpc支持回调
        if (transportAgent.supportCallbackTransport()) {
            doExecuteWithCallback(new DistroExecuteCallback());
        } else {
            executeDistroTask();
        }
    }
    @Override
    protected void doExecuteWithCallback(DistroCallback callback) {
        String type = getDistroKey().getResourceType();
        //获取DistroData 
        DistroData distroData = getDistroData(type);
        if (null == distroData) {
            Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
            return;
        }
        //执行同步数据
        getDistroComponentHolder().findTransportAgent(type)
                .syncData(distroData, getDistroKey().getTargetServer(), callback);
    }

DistroClientTransportAgent

@Override
public boolean syncData(DistroData data, String targetServer) {
    if (isNoExistTarget(targetServer)) {
        return true;
    }
    // 构造请求数据并设置数据类型
    DistroDataRequest request = new DistroDataRequest(data, data.getType());
    // 查找目标节点缓存数据
    Member member = memberManager.find(targetServer);
    // 节点状态检查需UP状态,即:可通信状态
    if (checkTargetServerStatusUnhealthy(member)) {
        Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy", targetServer);
        return false;
    }
    try {
        // 向目标节点发送数据
        Response response = clusterRpcClientProxy.sendRequest(member, request);
        return checkResponse(response);
    } catch (NacosException e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! ", e);
    }
    return false;
}

客户端节点处理增量同步请求

DistroDataRequestHandler ADD、CHANGE、DELETE都是由handleSyncData方法进行处理

    @Override
    public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
        try {
            switch (request.getDataOperation()) {
                ......
                case ADD:
                case CHANGE:
                case DELETE:
                    return handleSyncData(request.getDistroData());
                ...... 
                default:
                    return new DistroDataResponse();
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
            DistroDataResponse result = new DistroDataResponse();
            result.setErrorCode(ResponseCode.FAIL.getCode());
            result.setMessage("handle distro request with exception");
            return result;
        }
    }
private DistroDataResponse handleSyncData(DistroData distroData) {
    DistroDataResponse result = new DistroDataResponse();
    if (!distroProtocol.onReceive(distroData)) {
        result.setErrorCode(ResponseCode.FAIL.getCode());
        result.setMessage("[DISTRO-FAILED] distro data handle failed");
    }
    return result;
}
    public boolean onReceive(DistroData distroData) {
        Loggers.DISTRO.info("[DISTRO] Receive distro data type: {}, key: {}", distroData.getType(),
                distroData.getDistroKey());
        String resourceType = distroData.getDistroKey().getResourceType();
        DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
        if (null == dataProcessor) {
            Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
            return false;
        }
        //同步信息
        return dataProcessor.processData(distroData);
    }

DistroClientDataProcessor

    @Override
    public boolean processData(DistroData distroData) {
        switch (distroData.getType()) {
            case ADD:
            case CHANGE:
                ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
                        .deserialize(distroData.getContent(), ClientSyncData.class);
                //将同步过来的Client信息进行缓存        
                handlerClientSyncData(clientSyncData);
                return true;
                //响应删除操作,从clients缓存中移除。
            case DELETE:
                String deleteClientId = distroData.getDistroKey().getResourceKey();
                Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
                clientManager.clientDisconnected(deleteClientId);
                return true;
            default:
                return false;
        }
    }

移除信息

@Override
public boolean clientDisconnected(String clientId) {
    Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
    ConnectionBasedClient client = clients.remove(clientId);
    if (null == client) {
        return true;
    }
    client.release();
    NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
    return true;
}

同步信息

    private void handlerClientSyncData(ClientSyncData clientSyncData) {
        Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
        //Client信息缓存 需要的是从其他节点通过过来的Client信息,ConnectionBasedClient属性isNative为false表示该连接时从其他节点同步过来的;true表示该连接客户端直接连接的。
        clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
        //获取Client信息
        Client client = clientManager.getClient(clientSyncData.getClientId());
        //更新Client的Service以及Instance信息
        upgradeClient(client, clientSyncData);
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1294551.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

柏林噪声C++

柏林噪声 随机噪声 如上图所示随机噪声没有任何规律可言&#xff0c;我们希望生成有一些意义的局部连续的随机图案 一维柏林噪声 假设希望生成一段局部连续的随机曲线&#xff0c;可以采用插值的方式&#xff1a;在固定点随机分配y值&#xff08;一般是整数点&#xff09;&a…

docker安装elasticsearch8.5.0和kibana

服务器环境&#xff0c;centos7 一、安装elasticsearch 1. 创建一个es和kibana通用的网络 docker network create es-net 2. 拉取es镜像&#xff0c;这里选择8.5.0版本 docker pull elasticsearch:8.5.03. 创建挂载目录&#xff0c;并授权 mkdir /usr/local/install/ela…

WebStorm:Mac/Win上强大的JavaScript开发工具

WebStorm是JetBrains公司开发的针对Mac和Windows系统的JavaScript开发工具。它为开发者提供了一站式的代码编辑、调试、测试和版本控制等功能&#xff0c;帮助你更高效地进行Web开发。新版本的WebStorm 2023在性能和用户体验方面都做出了重大改进&#xff0c;让你的JavaScript开…

机器学习实战:预测波士顿房价

前言&#xff1a; Hello大家好&#xff0c;我是Dream。 今天来学习一下机器学习中一个非常经典的案例&#xff1a;预测波士顿房价&#xff0c;在此过程中也会补充很多重要的知识点&#xff0c;欢迎大家一起前来探讨学习~ 一、导入数据 在这个项目中&#xff0c;我们利用马萨诸…

富时中国A50指数查询方法详解

富时中国A50指数&#xff0c;是指衡量中国A股市场50家具有代表性的上市公司股票表现的指数。对于投资者来说&#xff0c;了解和查询A50指数的走势对于制定投资策略至关重要。那么&#xff0c;如何轻松地查询富时中国A50指数呢&#xff1f; 1. 百度搜索 百度搜索引擎是最简便的…

【Node.js后端架构:MVC模式】基于expres讲解

Node.js后端架构&#xff1a;MVC模式 什么是MVC MVC (Model-View-Controller) 是一种软件设计模式&#xff0c;用于将应用程序的逻辑分离成三个不同的组件&#xff1a;模型、视图和控制器。 模型&#xff08;Model&#xff09;负责处理应用程序的数据逻辑。它负责从数据库或其…

使用Netty实现文件传输的HTTP服务器和客户端

现在我们来用netty实现文件传输的HTTP服务器和客户端 pom依赖文件&#xff1a; <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-insta…

国内好用的CRM系统你知道哪些?看看这篇吧

选择CRM系统需要考虑许多因素&#xff0c;其中一个重要的标准是用户数量。使用广泛、受到用户认可的CRM系统&#xff0c;无论是在功能、易用性、灵活性还是价格上&#xff0c;都会给企业带来更多的益处。有什么国内用得比较多的CRM系统吗&#xff1f; 以Zoho CRM为例&#xff…

Linux学习教程(第十四章 Linux系统服务管理)一

第十四章 Linux系统服务管理&#xff08;一&#xff09; 什么是系统服务&#xff1f;服务是在后台运行的应用程序&#xff0c;并且可以提供一些本地系统或网络的功能。 那么&#xff0c;Linux 中常见的服务有那些&#xff0c;这些服务怎么分类&#xff0c;服务如何启动&#x…

视频处理关键知识

1 引言 视频技术发展到现在已经有100多年的历史&#xff0c;虽然比照相技术历史时间短&#xff0c;但在过去很长一段时间之内都是最重要的媒体。由于互联网在新世纪的崛起&#xff0c;使得传统的媒体技术有了更好的发展平台&#xff0c;应运而生了新的多媒体技术。而多媒体技术…

为什么感染HPV的人越来越多?劲松中西医结合医院专家发表看法

近年来&#xff0c;HPV感染率在我国呈现上升趋势&#xff0c;引起了社会的广泛关注。HPV是一种人乳头瘤病毒&#xff0c;主要通过性接触传播&#xff0c;也是引起宫颈癌的主要原因之一。那么&#xff0c;为什么我国的HPV感染率如此高呢&#xff1f; 首先&#xff0c;我们需要了…

手把手教你写 Compose 动画 -- 讲的不能再细的 AnimationSpec 动画规范

前面我们聊过 animateDpAsState 和 Animatable 两种动画 API 的用法&#xff0c;但只是简单了解了&#xff0c;这两个函数内部都有一个共同的核心参数&#xff1a;AnimationSpec。 Composable fun animateDpAsState(targetValue: Dp,animationSpec: AnimationSpec<Dp> …

友思特 Neuro-R:快速部署实时推理API

科技赋能&#xff0c;视觉技术在缺陷检测方面的应用为生产制造自动化程度带来了重大影响&#xff0c;有效缩短了检测时间&#xff0c;提升了生产效率&#xff0c;创造出更多的经济效益。通过视觉技术的精准分析和快速判断&#xff0c;缺陷检测变得更加准确、高效&#xff0c;无…

Java二十一章 网络通信

1 网络程序设计基础 网络程序设计编写的是与其他计算机进行通信的程序。 局域网与互联网 服务器是指提供信息的计算机或程序&#xff0c;客户机是指请求信息的计算机或程序。网络用于连接服务器与客户机&#xff0c;实现两者间的相互通信。 网络协议 网络协议规定了计算机…

echarts绘制一个柱状折线图

其他echarts&#xff1a; echarts绘制一个环形图 echarts绘制一个饼图 echarts绘制一个环形图2 效果&#xff1a; 代码&#xff1a; <template><div class"wrapper"><!-- 柱状图 --><div v-if"type new_bar" ref"barChar…

2023最新八股文前端面试题

第一章 Css 1.说一下CSS的盒模型。 在HTML页面中的所有元素都可以看成是一个盒子盒子的组成:内容content、内边距padding、边框border、外边距margin盒模型的类型: 标准盒模型 margin border padding content IE盒模型 margin content(border padding) 控制盒模型的模式…

如何选择靠谱的软件测试外包公司?CMA、CNAS软件测试报告获取

作为信息科技产业的代表之一&#xff0c;软件公司受到了越来越多的关注&#xff0c;它们的发展为我国的科技创新提供了强大的战略支撑。软件测试作为提升软件产品质量的后盾&#xff0c;日益成为一个专业化、标准化和规范化的行业&#xff0c;软件测试外包公司就是这种背景下成…

每日汇评:由于非农就业数据可能低迷,黄金恐再次测试2050美元上方

周五早间&#xff0c;金价在2,030美元附近扩大区间&#xff1b; 美元正在企稳&#xff0c;美债收益率坚持复苏收益&#xff1b; 由于美国就业大概率疲软&#xff0c;黄金日线图倾向于看涨&#xff1b; 周五早间&#xff0c;金价连续第四个交易日延续区间波动走势&#xff0c;因…

员工持股平台模式有哪几种?

员工持股平台模式 目前在现有的市场环境下持股平台的模式主要有公司型的持股平台以及有限合伙企业的持股平台。 &#xff08;一&#xff09;公司型员工持股平台 设立公司型的员工持股平台的唯一目的是为了让平台公司受让母公司的股权&#xff0c;从而实现员工间接持有母公司股权…

外贸行业的CRM系统和其它CRM有什么区别?

外贸行业对客户管理的追求日益提高&#xff0c;为了应对客户需求的变化和多元性&#xff0c;外贸企业需要借助CRM管理系统实现智能管理。下面&#xff0c;我们将详细探讨外贸CRM的概念、特点和具体应用。 什么是外贸CRM&#xff1f; 外贸CRM是指针对外贸行业的客户关系管理系…