Nacos1.X中对NacosNamingService的实现

news2024/9/19 11:54:59

Nacos服务注册与发现的实现原理图

在这里插入图片描述
服务注册与发现的功能:

  • 服务实例启动时注册到服务注册表、关闭时则注销(服务注册)
  • 服务注册中心根据服务实例定时发送的心跳包,实现健康检测(健康检查BeatReactor中的BeatTask)
  • 服务消费者可以通过查询服务注册表来获得可用的实例(服务发现)
  • 服务消费者定时拉取服务注册中心的服务实例数据(HostReactor中的UpdateTask)
  • 服务注册中心检测到服务提供者异常,主动通过UDP协议推送更新给服务消费者(PushReceiver)

NacosNamingService

Nacos Client包中的NamingService实现类为NacosNamingService,通过封装好的SDK供用户使用,来调用nacos对外暴露的OpenAPI

SDK方式只是提供了一种访问的封装,在底层仍然是基于HTTP协议完成请求的。

NamingService提供了以下方法:

  • registerInstance:注册实例

  • deregisterInstance:注销实例

  • getAllInstances:获取某一服务的所有实例

  • selectInstances:获取某一服务健康或不健康的实例

  • selectOneHealthyInstance:根据权重选择一个健康的实例

  • getServerStatus:检测服务端健康状态

  • subscribe:注册对某个服务的监听

  • unsubscribe:注销对某个服务的监听

  • getSubscribeServices:获取被监听的服务

  • getServicesOfServer:获取命名空间(namespace)下的所有服务名

NacosNamingService还初始化了其他核心类,外提供的方法都是委托给其他核心类处理的。按顺序将依次初始化NamingProxy、BeatReactor、HostReactor

  • NamingProxy:用于与Nacos服务端通信,注册服务、注销服务、发送心跳等都经由NamingProxy来请求服务端
  • BeatReactor:本地实例心跳,用于向Nacos服务端发送本地服务的心跳
  • HostReactor:用于从注册中心获取、保存、更新各服务实例信息
public class NacosNamingService implements NamingService {
    private String namespace;
    private String endpoint;
    private String serverList;
    private String cacheDir;
    private String logName;
    private HostReactor hostReactor;
    //心跳包响应
    private BeatReactor beatReactor;
    //进行服务注册的带来,通过NamingProxy与Nacos Server进行最终的通信
    private NamingProxy serverProxy;

    public NacosNamingService(String serverList) throws NacosException {
        Properties properties = new Properties();
        properties.setProperty("serverAddr", serverList);
        this.init(properties);
    }

    public NacosNamingService(Properties properties) throws NacosException {
        this.init(properties);
    }

    private void init(Properties properties) throws NacosException {
        ValidatorUtils.checkInitParam(properties);
        this.namespace = InitUtils.initNamespaceForNaming(properties);
        InitUtils.initSerialization();
        this.initServerAddr(properties);
        InitUtils.initWebRootContext(properties);
        this.initCacheDir();
        this.initLogName(properties);
        //NamingService网络层代理
        this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);
        //心跳包检测线程池
        this.beatReactor = new BeatReactor(this.serverProxy, this.initClientBeatThreadCount(properties));
        this.hostReactor = new HostReactor(this.serverProxy, this.beatReactor, this.cacheDir, this.isLoadCacheAtStart(properties), this.isPushEmptyProtect(properties), this.initPollingThreadCount(properties));
    }
     ......省略......

    //注册服务,委托NamingProxy处理
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        //如果是临时节点
        if (instance.isEphemeral()) {
            //构造心跳包
            BeatInfo beatInfo = this.beatReactor.buildBeatInfo(groupedServiceName, instance);
            //将心跳包加到定时线程池中定时执行
            this.beatReactor.addBeatInfo(groupedServiceName, beatInfo);
        }
        //服务注册
        this.serverProxy.registerService(groupedServiceName, groupName, instance);
    }
    //注销服务,委托NamingProxy处理
    public void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        //如果是临时节点,则移除心跳包
        if (instance.isEphemeral()) {
            this.beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), instance.getIp(), instance.getPort());
        }
        //调用NamingProxy进行服务注销
        this.serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance);
    }

    //获取所有服务实例的方法,委托HostReactor处理
    public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {
        ServiceInfo serviceInfo;
        // 如果该消费者订阅了这个服务,那么会先从本地维护的服务列表中获取,本地为空再从服务注册中心获取服务
        if (subscribe) {
            serviceInfo = this.hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        } else {
            // 否则实例会从服务中心进行获取
            serviceInfo = this.hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        }

        List list;
        return (List)(serviceInfo != null && !CollectionUtils.isEmpty(list = serviceInfo.getHosts()) ? list : new ArrayList());
    }

    //获取健康(不健康)服务实例方法,委托HostReactor处理
    public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {
        ServiceInfo serviceInfo;
        // 如果该消费者订阅了这个服务,那么会在本地维护一个服务列表,服务从本地获取
        if (subscribe) {
            serviceInfo = this.hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        } else {
            // 否则实例会从服务中心进行获取
            serviceInfo = this.hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        }

        return this.selectInstances(serviceInfo, healthy);
    }

    private List<Instance> selectInstances(ServiceInfo serviceInfo, boolean healthy) {
        List list;
        if (serviceInfo != null && !CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
            Iterator iterator = list.iterator();

            while(true) {
                Instance instance;
                do {
                    if (!iterator.hasNext()) {
                        return list;
                    }

                    instance = (Instance)iterator.next();
                } while(healthy == instance.isHealthy() && instance.isEnabled() && instance.getWeight() > 0.0D);

                iterator.remove();
            }
        } else {
            return new ArrayList();
        }
    }

    //获取一个健康的实例,委托HostReactor处理
    public Instance selectOneHealthyInstance(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {
        return subscribe ? RandomByWeight.selectHost(this.hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","))) : RandomByWeight.selectHost(this.hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")));
    }

    //监听服务实例,委托HostReactor处理
    public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException {
        this.hostReactor.subscribe(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","), listener);
    }
    //取消监听服务
    public void unsubscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException {
        this.hostReactor.unSubscribe(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","), listener);
    }

    //查询服务列表
    public ListView<String> getServicesOfServer(int pageNo, int pageSize, String groupName, AbstractSelector selector) throws NacosException {
        return this.serverProxy.getServiceList(pageNo, pageSize, groupName, selector);
    }

    public List<ServiceInfo> getSubscribeServices() {
        return this.hostReactor.getSubscribeServices();
    }

    public String getServerStatus() {
        return this.serverProxy.serverHealthy() ? "UP" : "DOWN";
    }

    public BeatReactor getBeatReactor() {
        return this.beatReactor;
    }

    public void shutDown() throws NacosException {
        this.beatReactor.shutdown();
        this.hostReactor.shutdown();
        this.serverProxy.shutdown();
    }
}

NamingProxy

NamingProxy用于与Nacos服务端通信,注册服务、注销服务、发送心跳等都经由NamingProxy来请求服务端。

public class NamingProxy implements Closeable {
    //Nacos自定义的RestTemplate
    private final NacosRestTemplate nacosRestTemplate = NamingHttpClientManager.getInstance().getNacosRestTemplate();
    //默认服务端口
    private static final int DEFAULT_SERVER_PORT = 8848;
    private int serverPort = 8848;
    //命名空间
    private final String namespaceId;
    private final String endpoint;
    private String nacosDomain;
    private List<String> serverList;
    private List<String> serversFromEndpoint = new ArrayList();
    private final SecurityProxy securityProxy;
    private long lastSrvRefTime = 0L;
    private final long vipSrvRefInterMillis;
    private final long securityInfoRefreshIntervalMills;
    private Properties properties;
    //刷新定时任务
    private ScheduledExecutorService executorService;
    //最大重试次数,默认值是3
    private int maxRetry;

    public NamingProxy(String namespaceId, String endpoint, String serverList, Properties properties) {
        this.vipSrvRefInterMillis = TimeUnit.SECONDS.toMillis(30L);
        this.securityInfoRefreshIntervalMills = TimeUnit.SECONDS.toMillis(5L);
        this.securityProxy = new SecurityProxy(properties, this.nacosRestTemplate);
        this.properties = properties;
        this.setServerPort(8848);
        this.namespaceId = namespaceId;
        this.endpoint = endpoint;
        this.maxRetry = ConvertUtils.toInt(properties.getProperty("namingRequestDomainMaxRetryCount", String.valueOf(3)));
        if (StringUtils.isNotEmpty(serverList)) {
            this.serverList = Arrays.asList(serverList.split(","));
            if (this.serverList.size() == 1) {
                this.nacosDomain = serverList;
            }
        }

        this.initRefreshTask();
    }
    //注册服务
    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        
        Map<String, String> params = new HashMap(16);
        params.put("namespaceId", this.namespaceId);
        params.put("serviceName", serviceName);
        params.put("groupName", groupName);
        params.put("clusterName", instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
        // 把上述服务实例的一些必要参数保存到一个Map中,通过OpenAPI的方式发送注册请求
        this.reqApi(UtilAndComs.nacosUrlInstance, params, "POST");
    }
    //注销服务
    public void deregisterService(String serviceName, Instance instance) throws NacosException {
       
        Map<String, String> params = new HashMap(8);
        params.put("namespaceId", this.namespaceId);
        params.put("serviceName", serviceName);
        params.put("clusterName", instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        this.reqApi(UtilAndComs.nacosUrlInstance, params, "DELETE");
    }

    //更新服务
    public void updateInstance(String serviceName, String groupName, Instance instance) throws NacosException {
       
        Map<String, String> params = new HashMap(8);
        params.put("namespaceId", this.namespaceId);
        params.put("serviceName", serviceName);
        params.put("groupName", groupName);
        params.put("clusterName", instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enabled", String.valueOf(instance.isEnabled()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
        this.reqApi(UtilAndComs.nacosUrlInstance, params, "PUT");
    }

    public Service queryService(String serviceName, String groupName) throws NacosException {
        
        Map<String, String> params = new HashMap(3);
        params.put("namespaceId", this.namespaceId);
        params.put("serviceName", serviceName);
        params.put("groupName", groupName);
        String result = this.reqApi(UtilAndComs.nacosUrlService, params, "GET");
        return (Service)JacksonUtils.toObj(result, Service.class);
    }

    public void createService(Service service, AbstractSelector selector) throws NacosException {
        
        Map<String, String> params = new HashMap(6);
        params.put("namespaceId", this.namespaceId);
        params.put("serviceName", service.getName());
        params.put("groupName", service.getGroupName());
        params.put("protectThreshold", String.valueOf(service.getProtectThreshold()));
        params.put("metadata", JacksonUtils.toJson(service.getMetadata()));
        params.put("selector", JacksonUtils.toJson(selector));
        this.reqApi(UtilAndComs.nacosUrlService, params, "POST");
    }

    public boolean deleteService(String serviceName, String groupName) throws NacosException {
        
        Map<String, String> params = new HashMap(6);
        params.put("namespaceId", this.namespaceId);
        params.put("serviceName", serviceName);
        params.put("groupName", groupName);
        String result = this.reqApi(UtilAndComs.nacosUrlService, params, "DELETE");
        return "ok".equals(result);
    }

    public void updateService(Service service, AbstractSelector selector) throws NacosException {
       
        Map<String, String> params = new HashMap(6);
        params.put("namespaceId", this.namespaceId);
        params.put("serviceName", service.getName());
        params.put("groupName", service.getGroupName());
        params.put("protectThreshold", String.valueOf(service.getProtectThreshold()));
        params.put("metadata", JacksonUtils.toJson(service.getMetadata()));
        params.put("selector", JacksonUtils.toJson(selector));
        this.reqApi(UtilAndComs.nacosUrlService, params, "PUT");
    }

    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException {
        Map<String, String> params = new HashMap(8);
        params.put("namespaceId", this.namespaceId);
        params.put("serviceName", serviceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", NetUtils.localIP());
        params.put("healthyOnly", String.valueOf(healthyOnly));
        return this.reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, "GET");
    }

    //发送心跳信息,通过心跳机制向服务端报告实例的健康状态
    public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
        if (LogUtils.NAMING_LOGGER.isDebugEnabled()) {
            LogUtils.NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", this.namespaceId, beatInfo.toString());
        }

        Map<String, String> params = new HashMap(8);
        Map<String, String> bodyMap = new HashMap(2);
        if (!lightBeatEnabled) {
            bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
        }

        params.put("namespaceId", this.namespaceId);
        params.put("serviceName", beatInfo.getServiceName());
        params.put("clusterName", beatInfo.getCluster());
        params.put("ip", beatInfo.getIp());
        params.put("port", String.valueOf(beatInfo.getPort()));
        String result = this.reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, "PUT");
        return JacksonUtils.toObj(result);
    }

    public boolean serverHealthy() {
        try {
            String result = this.reqApi(UtilAndComs.nacosUrlBase + "/operator/metrics", new HashMap(2), "GET");
            JsonNode json = JacksonUtils.toObj(result);
            String serverStatus = json.get("status").asText();
            return "UP".equals(serverStatus);
        } catch (Exception var4) {
            return false;
        }
    }
    //查询服务列表
    public ListView<String> getServiceList(int pageNo, int pageSize, String groupName) throws NacosException {
        return this.getServiceList(pageNo, pageSize, groupName, (AbstractSelector)null);
    }

    public ListView<String> getServiceList(int pageNo, int pageSize, String groupName, AbstractSelector selector) throws NacosException {
        Map<String, String> params = new HashMap(4);
        params.put("pageNo", String.valueOf(pageNo));
        params.put("pageSize", String.valueOf(pageSize));
        params.put("namespaceId", this.namespaceId);
        params.put("groupName", groupName);
        if (selector != null) {
            switch(SelectorType.valueOf(selector.getType())) {
            case none:
            default:
                break;
            case label:
                ExpressionSelector expressionSelector = (ExpressionSelector)selector;
                params.put("selector", JacksonUtils.toJson(expressionSelector));
            }
        }

        String result = this.reqApi(UtilAndComs.nacosUrlBase + "/service/list", params, "GET");
        JsonNode json = JacksonUtils.toObj(result);
        ListView<String> listView = new ListView();
        listView.setCount(json.get("count").asInt());
        listView.setData((List)JacksonUtils.toObj(json.get("doms").toString(), new TypeReference<List<String>>() {
        }));
        return listView;
    }

    public String reqApi(String api, Map<String, String> params, String method) throws NacosException {
        return this.reqApi(api, params, Collections.EMPTY_MAP, method);
    }

    public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method) throws NacosException {
        return this.reqApi(api, params, body, this.getServerList(), method);
    }

    public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers, String method) throws NacosException {
        params.put("namespaceId", this.getNamespaceId());
        if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(this.nacosDomain)) {
            throw new NacosException(400, "no server available");
        } else {
            NacosException exception = new NacosException();
            if (StringUtils.isNotBlank(this.nacosDomain)) {
                int i = 0;

                while(i < this.maxRetry) {
                    try {
                        return this.callServer(api, params, body, this.nacosDomain, method);
                    } catch (NacosException var12) {
                        exception = var12;
                        if (LogUtils.NAMING_LOGGER.isDebugEnabled()) {
                            LogUtils.NAMING_LOGGER.debug("request {} failed.", this.nacosDomain, var12);
                        }

                        ++i;
                    }
                }
            } else {
                Random random = new Random(System.currentTimeMillis());
                int index = random.nextInt(servers.size());
                int i = 0;

                while(i < servers.size()) {
                    String server = (String)servers.get(index);

                    try {
                        return this.callServer(api, params, body, server, method);
                    } catch (NacosException var13) {
                        exception = var13;
                        if (LogUtils.NAMING_LOGGER.isDebugEnabled()) {
                            LogUtils.NAMING_LOGGER.debug("request {} failed.", server, var13);
                        }

                        index = (index + 1) % servers.size();
                        ++i;
                    }
                }
            }

            throw new NacosException(exception.getErrCode(), "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
        }
    }

    public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer) throws NacosException {
        return this.callServer(api, params, body, curServer, "GET");
    }

    public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer, String method) throws NacosException {
        long start = System.currentTimeMillis();
        long end = 0L;
        this.injectSecurityInfo(params);
        Header header = this.builderHeader();
        String url;
        if (!curServer.startsWith("https://") && !curServer.startsWith("http://")) {
            if (!IPUtil.containsPort(curServer)) {
                curServer = curServer + ":" + this.serverPort;
            }

            url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
        } else {
            url = curServer + api;
        }

        try {
            HttpRestResult<String> restResult = this.nacosRestTemplate.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
            end = System.currentTimeMillis();
            MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode())).observe((double)(end - start));
            if (restResult.ok()) {
                return (String)restResult.getData();
            } else if (304 == restResult.getCode()) {
                return "";
            } else {
                throw new NacosException(restResult.getCode(), restResult.getMessage());
            }
        } catch (Exception var13) {
            LogUtils.NAMING_LOGGER.error("[NA] failed to request", var13);
            throw new NacosException(500, var13);
        }
    }

    private void injectSecurityInfo(Map<String, String> params) {
        if (StringUtils.isNotBlank(this.securityProxy.getAccessToken())) {
            params.put("accessToken", this.securityProxy.getAccessToken());
        }

        String ak = this.getAccessKey();
        String sk = this.getSecretKey();
        params.put("app", AppNameUtils.getAppName());
        if (StringUtils.isNotBlank(ak) && StringUtils.isNotBlank(sk)) {
            try {
                String signData = getSignData((String)params.get("serviceName"));
                String signature = SignUtil.sign(signData, sk);
                params.put("signature", signature);
                params.put("data", signData);
                params.put("ak", ak);
            } catch (Exception var6) {
                LogUtils.NAMING_LOGGER.error("inject ak/sk failed.", var6);
            }
        }

    }

    public String getAccessKey() {
        return this.properties == null ? SpasAdapter.getAk() : TemplateUtils.stringEmptyAndThenExecute(this.properties.getProperty("accessKey"), new Callable<String>() {
            public String call() {
                return SpasAdapter.getAk();
            }
        });
    }

    public String getSecretKey() {
        return this.properties == null ? SpasAdapter.getSk() : TemplateUtils.stringEmptyAndThenExecute(this.properties.getProperty("secretKey"), new Callable<String>() {
            public String call() throws Exception {
                return SpasAdapter.getSk();
            }
        });
    }

}

serverAddr和endpoint两种方式配置NacosServer地址

  • serverAddr方式是直接告诉客户端nacos服务端的IP;
  • endpoint是告诉客户端一个endpoint,客户端通过HTTP请求到endpoint查询nacos服务端IP列表,这里endpoint不是NacosServer的地址,而是获取NacosServer地址的连接点。
    NamingProxy会启动1个,用于定期调用refreshSrvIfNeed()方法更新Nacos服务端地址,默认间隔为30秒。
    refreshSrvIfNeed()方法对Nacos服务端地址的更新仅在使用endpoint的时候才会进行实际更新,如果是通过serverAddr配置的Nacos服务端地址,refreshSrvIfNeed()方法将不会进行任何操作。
    在这里插入图片描述

NamingProxy中和endpoint方式有关的代码


//初始化刷新定时任务
    private void initRefreshTask() {
        //初始化线程池
        this.executorService = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.naming.updater");
                t.setDaemon(true);
                return t;
            }
        });
        this.refreshSrvIfNeed();
        this.securityProxy.login(this.getServerList());
        //设置定时刷新任务
        this.executorService.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                NamingProxy.this.refreshSrvIfNeed();
            }
        }, 0L, this.vipSrvRefInterMillis, TimeUnit.MILLISECONDS);
        //
        this.executorService.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                NamingProxy.this.securityProxy.login(NamingProxy.this.getServerList());
            }
        }, 0L, this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);
    }

    //
    public List<String> getServerListFromEndpoint() {
        try {
            String urlString = "http://" + this.endpoint + "/nacos/serverlist";
            Header header = this.builderHeader();
            HttpRestResult<String> restResult = this.nacosRestTemplate.get(urlString, header, Query.EMPTY, String.class);
            if (!restResult.ok()) {
                throw new IOException("Error while requesting: " + urlString + "'. Server returned: " + restResult.getCode());
            } else {
                String content = (String)restResult.getData();
                List<String> list = new ArrayList();
                Iterator var6 = IoUtils.readLines(new StringReader(content)).iterator();

                while(var6.hasNext()) {
                    String line = (String)var6.next();
                    if (!line.trim().isEmpty()) {
                        list.add(line.trim());
                    }
                }

                return list;
            }
        } catch (Exception var8) {
            var8.printStackTrace();
            return null;
        }
    }
    //进行刷新
    private void refreshSrvIfNeed() {
        try {
            if (!CollectionUtils.isEmpty(this.serverList)) {
                return;
            }
            if (System.currentTimeMillis() - this.lastSrvRefTime < this.vipSrvRefInterMillis) {
                return;
            }

            List<String> list = this.getServerListFromEndpoint();
            if (CollectionUtils.isEmpty(list)) {
                throw new Exception("Can not acquire Nacos list");
            }

            if (!CollectionUtils.isEqualCollection(list, this.serversFromEndpoint)) {
            }

            this.serversFromEndpoint = list;
            this.lastSrvRefTime = System.currentTimeMillis();
        } catch (Throwable var2) {
            LogUtils.NAMING_LOGGER.warn("failed to update server list", var2);
        }

    }
    private List<String> getServerList() {
        List<String> snapshot = this.serversFromEndpoint;
        if (!CollectionUtils.isEmpty(this.serverList)) {
            snapshot = this.serverList;
        }

        return snapshot;
    }

BeatReactor

在这里插入图片描述

BeatReactor 是 Nacos 客户端用于维护服务实例心跳的核心组件,它的作用是确保客户端的服务实例在 Nacos 服务端保持可见和健康。
通过定期发送心跳请求,BeatReactor 可以让 Nacos 服务端知道某个服务实例依然在线,并在实例异常(例如断线或宕机)时及时清理掉不健康的实例。

成员变量Map<String, BeatInfo> dom2Beat中保存了需要发送的BeatInfo,key为{serviceName}#{ip}#{port},value为对应的BeatInfo。

BeatReactor维护一个定期执行线程任务的线程池,默认线程数为1~CPU核心数的一半,可由namingClientBeatThreadCount参数指定。

public class BeatReactor implements Closeable {
    //定时任务线程池
    private final ScheduledExecutorService executorService;
    //与NacosServer通信的代理
    private final NamingProxy serverProxy;
    private boolean lightBeatEnabled;
    //所有需要发送心跳的服务实例及其对应的心跳信息
    public final Map<String, BeatInfo> dom2Beat;

    public BeatReactor(NamingProxy serverProxy) {
        this(serverProxy, UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);
    }
    public BeatReactor(NamingProxy serverProxy, int threadCount) {
        this.lightBeatEnabled = false;
        this.dom2Beat = new ConcurrentHashMap();
        this.serverProxy = serverProxy;
        this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.beat.sender");
                return thread;
            }
        });
    }
    //添加心跳包到定时线程池中
    public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        //构造心跳包key
        String key = this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
        BeatInfo existBeat = null;
        //从dom2Beat移除该key,如果key对应的value不为空的话,表明该beatInfo已经存在了,则停止心跳检测
        if ((existBeat = (BeatInfo)this.dom2Beat.remove(key)) != null) {
            //停止心跳检测
            existBeat.setStopped(true);
        }
        //如果dom2Beat中不存在该key,则将key放到map中,并进行定时心跳检测
        this.dom2Beat.put(key, beatInfo);
        //定时任务线程池,定时执行里面的线程
        this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());
    }

    public void removeBeatInfo(String serviceName, String ip, int port) {
        //从心跳包map中删除对应的心跳包信息
        BeatInfo beatInfo = (BeatInfo)this.dom2Beat.remove(this.buildKey(serviceName, ip, port));
        //将心跳包状态设置为stop
        if (beatInfo != null) {
            beatInfo.setStopped(true);
            MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());
        }
    }
    //根据服务实例,构造心跳包
    public BeatInfo buildBeatInfo(Instance instance) {
        return this.buildBeatInfo(instance.getServiceName(), instance);
    }

    public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(groupedServiceName);
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
        return beatInfo;
    }

    public String buildKey(String serviceName, String ip, int port) {
        return serviceName + "#" + ip + "#" + port;
    }

    public void shutdown() throws NacosException {
        String className = this.getClass().getName();
        ThreadUtils.shutdownThreadPool(this.executorService, LogUtils.NAMING_LOGGER);
    }
    
}

BeatInfo 心跳信息

代表一个服务实例的心跳信息,包括服务名、IP 地址、端口、权重等

public class BeatInfo {
    private String serviceName;
    private String ip;
    private int port;
    private double weight;
    // 其他字段
}

BeatTask(BeatReactor的内部类)

负责定期执行心跳任务的类,每个 BeatTask 对应一个服务实例。
当任务运行时,它会调用 NamingProxy.sendBeat 向 Nacos 服务端发送心跳。

//心跳包发送线程
    class BeatTask implements Runnable {
        BeatInfo beatInfo;

        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }

        public void run() {
            //如果心跳包检查没有停止,则发送心跳包
            if (!this.beatInfo.isStopped()) {
                long nextTime = this.beatInfo.getPeriod();

                try {
                    //发送心跳包
                    JsonNode result = BeatReactor.this.serverProxy.sendBeat(this.beatInfo, BeatReactor.this.lightBeatEnabled);
                    long interval = result.get("clientBeatInterval").asLong();
                    boolean lightBeatEnabled = false;
                    if (result.has("lightBeatEnabled")) {
                        lightBeatEnabled = result.get("lightBeatEnabled").asBoolean();
                    }

                    BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                    if (interval > 0L) {
                        nextTime = interval;
                    }

                    int code = 10200;
                    if (result.has("code")) {
                        code = result.get("code").asInt();
                    }
                    //如果该实例没有在注册中心注册,则进行注册
                    if (code == 20404) {
                        Instance instance = new Instance();
                        instance.setPort(this.beatInfo.getPort());
                        instance.setIp(this.beatInfo.getIp());
                        instance.setWeight(this.beatInfo.getWeight());
                        instance.setMetadata(this.beatInfo.getMetadata());
                        instance.setClusterName(this.beatInfo.getCluster());
                        instance.setServiceName(this.beatInfo.getServiceName());
                        instance.setInstanceId(instance.getInstanceId());
                        instance.setEphemeral(true);

                        try {
                           //注册服务
             BeatReactor.this.serverProxy.registerService(this.beatInfo.getServiceName(), NamingUtils.getGroupName(this.beatInfo.getServiceName()), instance);
                        } catch (Exception var15) {
                           //log日志
                        }
                    }
                } catch (NacosException var16) {
                    //log日志
                } catch (Exception var17) {
                    //log日志
                } finally {
                    //将线程再次放到定时任务线程池中执行下次的心跳包发送
                    BeatReactor.this.executorService.schedule(BeatReactor.this.new BeatTask(this.beatInfo), nextTime, TimeUnit.MILLISECONDS);
                }
            }
        }
    }

HostReactor

在这里插入图片描述

HostReactor用于获取、保存、更新各Service实例信息。

成员变量Map<String, ServiceInfo> serviceInfoMap中保存了已获取到的服务的信息,key为{服务名}@@{集群名}。

HostReactor会启动名为com.alibaba.nacos.client.naming.updater的线程来更新服务信息,默认线程数为1~CPU核心数的一半,可由namingPollingThreadCount参数指定。

定时任务UpdateTask会根据服务的cacheMillis值定时更新服务信息,默认值为10秒。该定时任务会在获取某一服务信息时创建,保存在成员变量Map<String, ScheduledFuture<?>> futureMap中。

public class HostReactor implements Closeable {
    private static final long DEFAULT_DELAY = 1000L;
    private static final long UPDATE_HOLD_INTERVAL = 5000L;
    private final Map<String, ScheduledFuture<?>> futureMap;
    // 本地已存在的服务列表,key是服务名称,value是ServiceInfo
    private final Map<String, ServiceInfo> serviceInfoMap;
    // 正在更新的实例集合
    private final Map<String, Object> updatingMap;
    // 接收NacosServer端主动推送服务信息端
    private final PushReceiver pushReceiver;
    // 与NacosServer保持心跳服务
    private final BeatReactor beatReactor;
    //底层与NacosServer端通信端代理类
    private final NamingProxy serverProxy;
    //
    private final FailoverReactor failoverReactor;
    //本地缓存端路径
    private final String cacheDir;
    private final boolean pushEmptyProtection;
    // 定时任务(负责服务列表的实时更新)
    private final ScheduledExecutorService executor;
    //实例变化通知者,负责管理服务的订阅信息,并进行回调
    private final InstancesChangeNotifier notifier;

    public HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir) {
        this(serverProxy, beatReactor, cacheDir, false, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT);
    }

    public HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir, boolean loadCacheAtStart, boolean pushEmptyProtection, int pollingThreadCount) {
        this.futureMap = new HashMap();
        this.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.client.naming.updater");
                return thread;
            }
        });
        this.beatReactor = beatReactor;
        this.serverProxy = serverProxy;
        this.cacheDir = cacheDir;
        if (loadCacheAtStart) {
            this.serviceInfoMap = new ConcurrentHashMap(DiskCache.read(this.cacheDir));
        } else {
            this.serviceInfoMap = new ConcurrentHashMap(16);
        }

        this.pushEmptyProtection = pushEmptyProtection;
        this.updatingMap = new ConcurrentHashMap();
        this.failoverReactor = new FailoverReactor(this, cacheDir);
        this.pushReceiver = new PushReceiver(this);
        this.notifier = new InstancesChangeNotifier();
        NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
        NotifyCenter.registerSubscriber(this.notifier);
    }

    public Map<String, ServiceInfo> getServiceInfoMap() {
        return this.serviceInfoMap;
    }
    //增加定时刷新服务任务
    public synchronized ScheduledFuture<?> addTask(HostReactor.UpdateTask task) {
        return this.executor.schedule(task, 1000L, TimeUnit.MILLISECONDS);
    }
    //订阅服务,serviceName服务名称,clusters集群列表,EventLintener回调Listener
    public void subscribe(String serviceName, String clusters, EventListener eventListener) {
        //给该服务增加监听器,服务发生变化后进行回调
        this.notifier.registerListener(serviceName, clusters, eventListener);
        //将该服务添加到HostReactor的定时任务中,定时刷新
        this.getServiceInfo(serviceName, clusters);
    }
    //取消订阅服务,serviceName服务名称,clusters集群列表,EventLintener回调Listener
    public void unSubscribe(String serviceName, String clusters, EventListener eventListener) {
        this.notifier.deregisterListener(serviceName, clusters, eventListener);
    }

    public List<ServiceInfo> getSubscribeServices() {
        return this.notifier.getSubscribeServices();
    }
    //处理从注册中心获取到的JSON格式的服务实例,并更新到本地serviceInfoMap中
    public ServiceInfo processServiceJson(String json) {
        ServiceInfo serviceInfo = (ServiceInfo)JacksonUtils.toObj(json, ServiceInfo.class);
        String serviceKey = serviceInfo.getKey();
        if (serviceKey == null) {
            return null;
        } else {
            ServiceInfo oldService = (ServiceInfo)this.serviceInfoMap.get(serviceKey);
            if (this.pushEmptyProtection && !serviceInfo.validate()) {
                return oldService;
            } else {
                boolean changed = false;
                if (oldService != null) {
                    if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
                        LogUtils.NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: " + serviceInfo.getLastRefTime());
                    }

                    this.serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
                    Map<String, Instance> oldHostMap = new HashMap(oldService.getHosts().size());
                    Iterator var7 = oldService.getHosts().iterator();

                    while(var7.hasNext()) {
                        Instance host = (Instance)var7.next();
                        oldHostMap.put(host.toInetAddr(), host);
                    }

                    Map<String, Instance> newHostMap = new HashMap(serviceInfo.getHosts().size());
                    Iterator var17 = serviceInfo.getHosts().iterator();

                    while(var17.hasNext()) {
                        Instance host = (Instance)var17.next();
                        newHostMap.put(host.toInetAddr(), host);
                    }

                    Set<Instance> modHosts = new HashSet();
                    Set<Instance> newHosts = new HashSet();
                    Set<Instance> remvHosts = new HashSet();
                    List<Entry<String, Instance>> newServiceHosts = new ArrayList(newHostMap.entrySet());
                    Iterator var12 = newServiceHosts.iterator();

                    while(true) {
                        Entry entry;
                        Instance host;
                        String key;
                        while(var12.hasNext()) {
                            entry = (Entry)var12.next();
                            host = (Instance)entry.getValue();
                            key = (String)entry.getKey();
                            if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(), ((Instance)oldHostMap.get(key)).toString())) {
                                modHosts.add(host);
                            } else if (!oldHostMap.containsKey(key)) {
                                newHosts.add(host);
                            }
                        }

                        var12 = oldHostMap.entrySet().iterator();

                        while(var12.hasNext()) {
                            entry = (Entry)var12.next();
                            host = (Instance)entry.getValue();
                            key = (String)entry.getKey();
                            if (!newHostMap.containsKey(key) && !newHostMap.containsKey(key)) {
                                remvHosts.add(host);
                            }
                        }

                        if (newHosts.size() > 0) {
                            changed = true;
                        }

                        if (remvHosts.size() > 0) {
                            changed = true;
                        }

                        if (modHosts.size() > 0) {
                            changed = true;
                            this.updateBeatInfo(modHosts);
                        }

                        serviceInfo.setJsonFromServer(json);
                        if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
                            NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));
                            DiskCache.write(serviceInfo, this.cacheDir);
                        }
                        break;
                    }
                } else {
                    changed = true;
     
                    this.serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
                    NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));
                    serviceInfo.setJsonFromServer(json);
                    DiskCache.write(serviceInfo, this.cacheDir);
                }

                MetricsMonitor.getServiceInfoMapSizeMonitor().set((double)this.serviceInfoMap.size());
                if (changed) {
                   //记录日志
                }
                return serviceInfo;
            }
        }
    }

    private void updateBeatInfo(Set<Instance> modHosts) {
        Iterator var2 = modHosts.iterator();

        while(var2.hasNext()) {
            Instance instance = (Instance)var2.next();
            String key = this.beatReactor.buildKey(instance.getServiceName(), instance.getIp(), instance.getPort());
            if (this.beatReactor.dom2Beat.containsKey(key) && instance.isEphemeral()) {
                BeatInfo beatInfo = this.beatReactor.buildBeatInfo(instance);
                this.beatReactor.addBeatInfo(instance.getServiceName(), beatInfo);
            }
        }

    }
    //从本地缓存中获取服务实例信息
    private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
        String key = ServiceInfo.getKey(serviceName, clusters);
        return (ServiceInfo)this.serviceInfoMap.get(key);
    }
    //直接从服务注册中心获取服务
    public ServiceInfo getServiceInfoDirectlyFromServer(String serviceName, String clusters) throws NacosException {
        String result = this.serverProxy.queryList(serviceName, clusters, 0, false);
        return StringUtils.isNotEmpty(result) ? (ServiceInfo)JacksonUtils.toObj(result, ServiceInfo.class) : null;
    }
    //获取服务,先从本地获取,本地没有,则进行维护,并从注册中心更新最新服务信息
    public ServiceInfo getServiceInfo(String serviceName, String clusters) {
        
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (this.failoverReactor.isFailoverSwitch()) {
            return this.failoverReactor.getService(key);
        } else {
            // 1.先通过serverName即服务名获得一个serviceInfo
            ServiceInfo serviceObj = this.getServiceInfo0(serviceName, clusters);
            if (null == serviceObj) {
                //如果没有serviceInfo,则通过传进来的参数new出一个新的serviceInfo对象,并且同时维护到本地Map和更新Map
                serviceObj = new ServiceInfo(serviceName, clusters);
                this.serviceInfoMap.put(serviceObj.getKey(), serviceObj);
                this.updatingMap.put(serviceName, new Object());
                // 2.updateServiceNow(),立刻去Nacos服务端拉取该服务最新实例列表,更新serviceInfoMap
                this.updateServiceNow(serviceName, clusters);
                this.updatingMap.remove(serviceName);
            } else if (this.updatingMap.containsKey(serviceName)) {
                synchronized(serviceObj) {
                    try {
                        serviceObj.wait(5000L);
                    } catch (InterruptedException var8) {
                        
                    }
                }
            }
            // 3.定时更新实例信息
            this.scheduleUpdateIfAbsent(serviceName, clusters);
            // 4.最后返回服务实例数据(前面已经进行了更新)
            return (ServiceInfo)this.serviceInfoMap.get(serviceObj.getKey());
        }
    }
    //立即从注册中心拉取该服务最新实例列表,并更新到本地
    private void updateServiceNow(String serviceName, String clusters) {
        try {
            this.updateService(serviceName, clusters);
        } catch (NacosException var4) {  
        }
    }
    //通过定时任务,每10秒去更新一次数据
    public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
        if (this.futureMap.get(ServiceInfo.getKey(serviceName, clusters)) == null) {
            synchronized(this.futureMap) {
                if (this.futureMap.get(ServiceInfo.getKey(serviceName, clusters)) == null) {
                    //创建一个UpdateTask的更新线程任务,每10秒去异步更新集合数据
                    ScheduledFuture<?> future = this.addTask(new HostReactor.UpdateTask(serviceName, clusters));
                    this.futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
                }
            }
        }
    }
    //从注册中心拉取该服务最新实例列表,并更新到本地
    //这里需要特别关注,client端通过该方法将client端udpPort传给Nacos Server端
    public void updateService(String serviceName, String clusters) throws NacosException {
        ServiceInfo oldService = this.getServiceInfo0(serviceName, clusters);
        boolean var12 = false;
        try {
            var12 = true;
            //从注册中心查询服务下的实例列表
            String result = this.serverProxy.queryList(serviceName, clusters, this.pushReceiver.getUdpPort(), false);
            if (StringUtils.isNotEmpty(result)) {
                //处理从注册中心获取到的服务实例JSON数据,更新本地服务列表
                this.processServiceJson(result);
                var12 = false;
            } else {
                var12 = false;
            }
        } finally {
            if (var12) {
                if (oldService != null) {
                    synchronized(oldService) {
                        oldService.notifyAll();
                    }
                }
            }
        }
        if (oldService != null) {
            synchronized(oldService) {
                oldService.notifyAll();
            }
        }
    }
    //仅仅执行刷新,从Nacos注册中心获取服务,但不刷新本地列表
    public void refreshOnly(String serviceName, String clusters) {
        try {
            this.serverProxy.queryList(serviceName, clusters, this.pushReceiver.getUdpPort(), false);
        } catch (Exception var4) {    
        }
    }

    public void shutdown() throws NacosException {
        String className = this.getClass().getName();
        LogUtils.NAMING_LOGGER.info("{} do shutdown begin", className);
        ThreadUtils.shutdownThreadPool(this.executor, LogUtils.NAMING_LOGGER);
        this.pushReceiver.shutdown();
        this.failoverReactor.shutdown();
        NotifyCenter.deregisterSubscriber(this.notifier);
        LogUtils.NAMING_LOGGER.info("{} do shutdown stop", className);
    }
}

获取服务实例流程

在这里插入图片描述

更新服务实例流程UpdateTask

在这里插入图片描述

UpdateTask(Nacos拉模式)

HostReactor的内部类,用于定期从NacosServer端拉取最新的服务信息。

//更新任务线程
    public class UpdateTask implements Runnable {
        long lastRefTime = 9223372036854775807L;
        private final String clusters;
        private final String serviceName;
        private int failCount = 0;

        public UpdateTask(String serviceName, String clusters) {
            this.serviceName = serviceName;
            this.clusters = clusters;
        }

        private void incFailCount() {
            int limit = 6;
            if (this.failCount != limit) {
                ++this.failCount;
            }
        }

        private void resetFailCount() {
            this.failCount = 0;
        }

        public void run() {
            long delayTime = 1000L;

            try {
                //从本地缓存中获取服务实例列表
                ServiceInfo serviceObj = (ServiceInfo)HostReactor.this.serviceInfoMap.get(ServiceInfo.getKey(this.serviceName, this.clusters));
                //如果服务为空,则更新本地服务列表
                if (serviceObj == null) {
                    HostReactor.this.updateService(this.serviceName, this.clusters);
                    return;
                }
                // 过期服务(服务的最新更新时间小于等于缓存刷新时间),从注册中心重新查询
                if (serviceObj.getLastRefTime() <= this.lastRefTime) {
                    HostReactor.this.updateService(this.serviceName, this.clusters);
                    serviceObj = (ServiceInfo)HostReactor.this.serviceInfoMap.get(ServiceInfo.getKey(this.serviceName, this.clusters));
                } else {
                    HostReactor.this.refreshOnly(this.serviceName, this.clusters);
                }
                 // 刷新更新时间 
                this.lastRefTime = serviceObj.getLastRefTime();
                // 判断该注册的Service是否被订阅,如果没有订阅则不再执行
                if (!HostReactor.this.notifier.isSubscribed(this.serviceName, this.clusters) && !HostReactor.this.futureMap.containsKey(ServiceInfo.getKey(this.serviceName, this.clusters))) {
                    return;
                }

                if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                    this.incFailCount();
                    return;
                }
                // 下次更新缓存时间设置,默认为10秒
                delayTime = serviceObj.getCacheMillis();
                //任务运行成功,重置失败次数为0
                this.resetFailCount();
            } catch (Throwable var7) {
                //任务执行失败,失败次数+1
                this.incFailCount();
            } finally {
                //取delayTime<<failCount的值与60000L之间的小值,作为下次运行的时间间隔
                //下次调度刷新时间,下次执行的时间与failCount有关 
                HostReactor.this.executor.schedule(this, Math.min(delayTime << this.failCount, 60000L), TimeUnit.MILLISECONDS);
            }

        }
    }

PushReceiver(Nacos推模式)

PushReceiver用于接收Nacos服务端的推送,初始化时会创建DatagramSocket使用UDP的方式接收Nacos Server端推送的最新服务信息。
PushReceiver使用的UDP协议的客户端的端口udpPort会在UpdateTask任务中,调用updateService的方法中获取,并且传给NacosServer端;
NacosServer会保存该udpPort端口,当Client订阅的服务信息发生变化时,就会主动推送最新信息给Client端。

public class PushReceiver implements Runnable, Closeable {
    private static final Charset UTF_8 = Charset.forName("UTF-8");
    private static final int UDP_MSS = 65536;
    private ScheduledExecutorService executorService;
    private DatagramSocket udpSocket;
    private HostReactor hostReactor;
    private volatile boolean closed = false;

    public static String getPushReceiverUdpPort() {
        return System.getenv("push.receiver.udp.port");
    }

    public PushReceiver(HostReactor hostReactor) {
        try {
            this.hostReactor = hostReactor;
            String udpPort = getPushReceiverUdpPort();
            if (StringUtils.isEmpty(udpPort)) {
                this.udpSocket = new DatagramSocket();
            } else {
                this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));
            }

            this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("com.alibaba.nacos.naming.push.receiver");
                    return thread;
                }
            });
            this.executorService.execute(this);
        } catch (Exception var3) {
            LogUtils.NAMING_LOGGER.error("[NA] init udp socket failed", var3);
        }

    }

    public void run() {
        while(!this.closed) {
            try {
                byte[] buffer = new byte[65536];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                this.udpSocket.receive(packet);
                String json = (new String(IoUtils.tryDecompress(packet.getData()), UTF_8)).trim();
                LogUtils.NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
                PushReceiver.PushPacket pushPacket = (PushReceiver.PushPacket)JacksonUtils.toObj(json, PushReceiver.PushPacket.class);
                String ack;
                if (!"dom".equals(pushPacket.type) && !"service".equals(pushPacket.type)) {
                    if ("dump".equals(pushPacket.type)) {
                        ack = "{\"type\": \"dump-ack\", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(this.hostReactor.getServiceInfoMap())) + "\"}";
                    } else {
                        ack = "{\"type\": \"unknown-ack\", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":\"\"}";
                    }
                } else {
                    //调用HostReactor处理收到的json数据
                    this.hostReactor.processServiceJson(pushPacket.data);
                    ack = "{\"type\": \"push-ack\", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":\"\"}";
                }

                this.udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length, packet.getSocketAddress()));
            } catch (Exception var6) {
                if (this.closed) {
                    return;
                }

                LogUtils.NAMING_LOGGER.error("[NA] error while receiving push data", var6);
            }
        }

    }

    public void shutdown() throws NacosException {
        String className = this.getClass().getName();
        LogUtils.NAMING_LOGGER.info("{} do shutdown begin", className);
        ThreadUtils.shutdownThreadPool(this.executorService, LogUtils.NAMING_LOGGER);
        this.closed = true;
        this.udpSocket.close();
        LogUtils.NAMING_LOGGER.info("{} do shutdown stop", className);
    }

    public int getUdpPort() {
        return this.udpSocket.getLocalPort();
    }

    public static class PushPacket {
        public String type;
        public long lastRefTime;
        public String data;

        public PushPacket() {
        }
    }
}

InstancesChangeNotifier

该对象负责保存服务实例的监听,当服务实例发生变化的时候,负责进行通知,回调监听器方法

public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
    //监听器map
    private final Map<String, ConcurrentHashSet<EventListener>> listenerMap = new ConcurrentHashMap();
    private final Object lock = new Object();

    public InstancesChangeNotifier() {
    }
    //注册服务监听器
    public void registerListener(String serviceName, String clusters, EventListener listener) {
        String key = ServiceInfo.getKey(serviceName, clusters);
        ConcurrentHashSet<EventListener> eventListeners = (ConcurrentHashSet)this.listenerMap.get(key);
        if (eventListeners == null) {
            synchronized(this.lock) {
                eventListeners = (ConcurrentHashSet)this.listenerMap.get(key);
                if (eventListeners == null) {
                    eventListeners = new ConcurrentHashSet();
                    this.listenerMap.put(key, eventListeners);
                }
            }
        }

        eventListeners.add(listener);
    }
    //取消注册服务监听器
    public void deregisterListener(String serviceName, String clusters, EventListener listener) {
        String key = ServiceInfo.getKey(serviceName, clusters);
        ConcurrentHashSet<EventListener> eventListeners = (ConcurrentHashSet)this.listenerMap.get(key);
        if (eventListeners != null) {
            eventListeners.remove(listener);
            if (CollectionUtils.isEmpty(eventListeners)) {
                this.listenerMap.remove(key);
            }

        }
    }
    //判断是否被订阅
    public boolean isSubscribed(String serviceName, String clusters) {
        String key = ServiceInfo.getKey(serviceName, clusters);
        ConcurrentHashSet<EventListener> eventListeners = (ConcurrentHashSet)this.listenerMap.get(key);
        return CollectionUtils.isNotEmpty(eventListeners);
    }

    public List<ServiceInfo> getSubscribeServices() {
        List<ServiceInfo> serviceInfos = new ArrayList();
        Iterator var2 = this.listenerMap.keySet().iterator();

        while(var2.hasNext()) {
            String key = (String)var2.next();
            serviceInfos.add(ServiceInfo.fromKey(key));
        }

        return serviceInfos;
    }
    //服务实例变化事件
    public void onEvent(InstancesChangeEvent event) {
        String key = ServiceInfo.getKey(event.getServiceName(), event.getClusters());
        //获取该服务的订阅者
        ConcurrentHashSet<EventListener> eventListeners = (ConcurrentHashSet)this.listenerMap.get(key);
        if (!CollectionUtils.isEmpty(eventListeners)) {
            Iterator var4 = eventListeners.iterator();

            while(true) {
                //循环该服务的订阅者,调用订阅者的回调方法
                while(var4.hasNext()) {
                    final EventListener listener = (EventListener)var4.next();
                    final Event namingEvent = this.transferToNamingEvent(event);
                    //如果该listener继承了Nacos定义的AbstractEventListener,并且executor不为空,则通过executor以新的线程的方式回调监听onEvent方法
                    if (listener instanceof AbstractEventListener && ((AbstractEventListener)listener).getExecutor() != null) {
                        ((AbstractEventListener)listener).getExecutor().execute(new Runnable() {
                            public void run() {
                                listener.onEvent(namingEvent);
                            }
                        });
                    } 
                    //否则直接调用该监听器的onEvent方法
                    else {
                        listener.onEvent(namingEvent);
                    }
                }
                //回调结束,返回
                return;
            }
        }
    }
    //转换事件
    private Event transferToNamingEvent(InstancesChangeEvent instancesChangeEvent) {
        return new NamingEvent(instancesChangeEvent.getServiceName(), instancesChangeEvent.getGroupName(), instancesChangeEvent.getClusters(), instancesChangeEvent.getHosts());
    }

    public Class<? extends com.alibaba.nacos.common.notify.Event> subscribeType() {
        return InstancesChangeEvent.class;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2135809.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

打造最佳自闭症患全寄宿学校:为孩子的未来保驾护航

在自闭症儿童教育的广阔领域里&#xff0c;寻找一所能够全方位关注孩子成长、为他们的未来奠定坚实基础的学校&#xff0c;是许多家庭的心愿。广州的星贝育园自闭症儿童寄宿制学校&#xff0c;正是这样一所致力于成为最佳的自闭症儿童全寄宿学校&#xff0c;它以独特的教育理念…

软件测试学习笔记丨Postman基础使用

本文转自测试人社区&#xff0c;原文链接&#xff1a;https://ceshiren.com/t/topic/32096 一、Postman基础使用 1.1 简介 Postman是一款流行的API测试工具和开发环境&#xff0c;旨在简化API开发过程、测试和文档编制。优势&#xff1a; Postman可以快速构建请求&#xff0c…

使用 React Testing Library 测试自定义 React Hooks

自定义 React hooks为开发人员提供了在多个组件之间提取和重用常见功能的能力。然而&#xff0c;测试这些 hooks可能会有些棘手&#xff0c;特别是对于测试新手来说。在本文中&#xff0c;我们将探讨如何使用 React Testing Library 测试自定义 React hook。 测试 React组件 首…

录屏工具大揭秘:录屏快捷键、工具使用感受与建议

在繁忙的工作中&#xff0c;我们常常需要记录下一些重要的操作步骤或精彩瞬间&#xff0c;这时录屏工具就派上了用场&#xff1b;接下来&#xff0c;我将以轻松愉悦的方式&#xff0c;为大家介绍四款常用的录屏工具及其快捷键&#xff0c;并分享使用感受与建议&#xff1a; 录…

OM6626国产低功耗蓝牙对比NRF52832/NRF52810

OM6626 是一款超低功耗的蓝牙soc 主要特性&#xff1a; 支持BLE5.3支持SIG Mesh支持2.4G长包主频64Mhz&#xff0c;80KB RAM主要应用在esl电子价签&#xff0c;IoT模组、CGM、高报告率HID设备 PUM特点 1.71~3.6v供电电压1秒间隔广播平均电流&#xff1a;9uA&#xff1b;1秒间…

架构设计 - 常用日志收集方案选型对比与推荐

目录 1. 常用组合1.1 ELK Stack -> Elastic Stack1.2 EFK Stack1.3 Graylog1.4 PLG 日志系统1.5 Splunk1.6 Filebeat ELK1.7 AWS CloudWatch Logs1.8 阿里云日志服务1.9 腾讯云 CLS&#xff08;日志服务&#xff09; 2. 推荐 日志收集是系统监控和调试中的关键环节。常见的…

心觉:以终为始,帮你精准实现目标

Hi&#xff0c;我是心觉&#xff0c;与你一起玩转潜意识、脑波音乐和吸引力法则&#xff0c;轻松掌控自己的人生&#xff01; 挑战每日一省写作169/1000天 假设你的目标是 一年内赚到150万。我们可以通过“以终为始”和“以始为终”的结合来帮助你实现这个目标 以下是完整的…

VuePress搭建文档网站/个人博客(简单配置、易上手)

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

sourcetree配置ssh连接gitee

使用PuttyGen.exe生成的公钥私钥格式和git文档方法生成的不一样&#xff0c; SSH 公钥设置 | Gitee 帮助中心 gitee方法生成的公钥类似&#xff1a; ssh-ed25519 AAAA***5B Gitee SSH Key PuttyGen.exe生成的&#xff1a; 公钥 ---- BEGIN SSH2 PUBLIC KEY ---- Comment:…

15个顶级ChatGPT学术提示词指令,让学术研究与撰写论文,轻松上手,效率翻倍

大家好,我将通过这篇文章详细介绍如何有效利用ChatGPT来增强学术写作的各个环节。从论文陈述的精炼到数据的深入分析,从研究主题的探索到论文各部分的撰写,为大家一步一步展示如何通过这些工具来提升研究工作的质量和效率。 1.完善论文陈述: 输入你的初步论文陈述,ChatG…

【YashanDB知识库】数据库获取时间和服务器时间不一致

本文转自YashanDB官网&#xff0c;具体内容可见数据库获取时间和服务器时间不一致 【问题分类】功能使用 【关键字】服务器时间、数据库时间 【问题描述】数据库获取的时间和服务器时间不一致。 【问题原因分析】YashanDB并没有时区的概念&#xff0c;数据库的时间以数据库启…

红黑树的删除

文章目录 前言一.删除的节点左子树右子树都有二.删除的节点只有左/右子树删除调整操作 三.删除的节点没有孩子1.删除的节点为红色2.删除的节点为黑色1).兄弟节点为黑色(1).兄弟节点至少有一个红色的孩子节点LL型RR型RL型LR型 (2).兄弟节点没有孩子或所有孩子为黑色 2).兄弟节点…

Redis实现发布/订阅功能(实战篇)

前言 博主在学习 Redis 实现发布订阅功能的时候&#xff0c;踩了太多的坑。 不是讲解不详细&#xff0c;看的一知半解&#xff1b;就是代码有问题&#xff0c;实际压根跑不起来&#xff01; 于是博主萌生了自己写一个最新版且全程无错的博客供各位参考。希望各位不要把我才过…

C语言浮点型数据在内存中的存储(23)

文章目录 前言一、浮点数在内存中的存储练习引入浮点数的存储浮点数存的过程 二、浮点数取的过程E不全为0或不全为1E全为0E全为1 三、再回顾练习总结 前言 哎&#xff0c;之前写了一篇&#xff0c;可是中途退出没保存&#xff0c;只能再写一遍了~   浮点数在内存中的存储跟整…

智汇创想pytest接口自动化测试框架

本测试框架是基于pytest搭建的接口自动化框架&#xff0c;对象为深圳智汇创想官方网站。深圳智汇创想科技有限责任公司&#xff08;深圳智汇创想科技有限责任公司&#xff09;&#xff0c;是一家专注于跨境电子商务的集团公司&#xff0c;全球电商平台多品类多品牌的零售商&…

什么是APT攻击,有哪些防御策略

在数字化时代&#xff0c;网络安全已成为国家、企业和个人不可忽视的重要议题。其中&#xff0c;高级持续性威胁&#xff08;APT&#xff09;攻击以其隐蔽性强、攻击规模大、持续时间长等特点&#xff0c;成为网络安全领域最为棘手的问题之一。面对APT攻击的严峻挑战&#xff0…

Unity 场景优化(1) game视口的Statistics 内容介绍

Unity的 Statistics &#xff08;stats&#xff09; Unity是多线程的。但是控制使用unity的api必须在主线程中&#xff0c;比如控制物体的transform信息。 Audio Level&#xff1a; DSP Load&#xff1a;数字信号处理&#xff08;Digital Signal Processing&#xff09;负载&…

空间视频化趋势理解

「视频空间化」这个趋势不是从现在开始&#xff0c;而是潜在发展了很多年了&#xff0c;而且我个人觉得「视频空间化」的背后其实对应的是「空间视频化」的趋势&#xff0c;所以未来我们还是要注重自己的技术栈中对视频相关处理技术的吸收以及整合&#xff0c;下面是我的几个理…

Jenkins生成html报告

下载插件 1.需要下载插件 html Publisher plugins 2.下载Groovy(设置css样式&#xff09;&#xff0c;默认没有css样式 在Job配置页面&#xff0c;增加构建步骤Execute system Groovy script&#xff0c;在Groovy Command中输入上面命令&#xff0c;即可&#xff1a; System.…

清理C盘缓存的垃圾,专业清理C盘缓存垃圾与优化运行内存的策略

专业清理C盘缓存垃圾与优化运行内存的策略 一、清理C盘缓存垃圾 在Windows操作系统中&#xff0c;C盘通常作为系统盘&#xff0c;其健康状况直接影响到系统的整体性能。定期清理C盘中的缓存和垃圾文件是维护系统性能的重要步骤。以下是一些专业建议&#xff1a; 1.使用磁盘清…