Nacos服务注册与发现的实现原理图
服务注册与发现的功能:
- 服务实例启动时注册到服务注册表、关闭时则注销(服务注册)
- 服务注册中心根据服务实例定时发送的心跳包,实现健康检测(健康检查BeatReactor中的BeatTask)
- 服务消费者可以通过查询服务注册表来获得可用的实例(服务发现)
- 服务消费者定时拉取服务注册中心的服务实例数据(HostReactor中的UpdateTask)
- 服务注册中心检测到服务提供者异常,主动通过UDP协议推送更新给服务消费者(PushReceiver)
NacosNamingService
Nacos Client包中的NamingService实现类为NacosNamingService,通过封装好的SDK供用户使用,来调用nacos对外暴露的OpenAPI
SDK方式只是提供了一种访问的封装,在底层仍然是基于HTTP协议完成请求的。
NamingService提供了以下方法:
-
registerInstance:注册实例
-
deregisterInstance:注销实例
-
getAllInstances:获取某一服务的所有实例
-
selectInstances:获取某一服务健康或不健康的实例
-
selectOneHealthyInstance:根据权重选择一个健康的实例
-
getServerStatus:检测服务端健康状态
-
subscribe:注册对某个服务的监听
-
unsubscribe:注销对某个服务的监听
-
getSubscribeServices:获取被监听的服务
-
getServicesOfServer:获取命名空间(namespace)下的所有服务名
NacosNamingService还初始化了其他核心类,外提供的方法都是委托给其他核心类处理的。按顺序将依次初始化NamingProxy、BeatReactor、HostReactor
- NamingProxy:用于与Nacos服务端通信,注册服务、注销服务、发送心跳等都经由NamingProxy来请求服务端
- BeatReactor:本地实例心跳,用于向Nacos服务端发送本地服务的心跳
- HostReactor:用于从注册中心获取、保存、更新各服务实例信息
public class NacosNamingService implements NamingService {
private String namespace;
private String endpoint;
private String serverList;
private String cacheDir;
private String logName;
private HostReactor hostReactor;
//心跳包响应
private BeatReactor beatReactor;
//进行服务注册的带来,通过NamingProxy与Nacos Server进行最终的通信
private NamingProxy serverProxy;
public NacosNamingService(String serverList) throws NacosException {
Properties properties = new Properties();
properties.setProperty("serverAddr", serverList);
this.init(properties);
}
public NacosNamingService(Properties properties) throws NacosException {
this.init(properties);
}
private void init(Properties properties) throws NacosException {
ValidatorUtils.checkInitParam(properties);
this.namespace = InitUtils.initNamespaceForNaming(properties);
InitUtils.initSerialization();
this.initServerAddr(properties);
InitUtils.initWebRootContext(properties);
this.initCacheDir();
this.initLogName(properties);
//NamingService网络层代理
this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);
//心跳包检测线程池
this.beatReactor = new BeatReactor(this.serverProxy, this.initClientBeatThreadCount(properties));
this.hostReactor = new HostReactor(this.serverProxy, this.beatReactor, this.cacheDir, this.isLoadCacheAtStart(properties), this.isPushEmptyProtect(properties), this.initPollingThreadCount(properties));
}
......省略......
//注册服务,委托NamingProxy处理
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
//如果是临时节点
if (instance.isEphemeral()) {
//构造心跳包
BeatInfo beatInfo = this.beatReactor.buildBeatInfo(groupedServiceName, instance);
//将心跳包加到定时线程池中定时执行
this.beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
//服务注册
this.serverProxy.registerService(groupedServiceName, groupName, instance);
}
//注销服务,委托NamingProxy处理
public void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException {
//如果是临时节点,则移除心跳包
if (instance.isEphemeral()) {
this.beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), instance.getIp(), instance.getPort());
}
//调用NamingProxy进行服务注销
this.serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance);
}
//获取所有服务实例的方法,委托HostReactor处理
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
// 如果该消费者订阅了这个服务,那么会先从本地维护的服务列表中获取,本地为空再从服务注册中心获取服务
if (subscribe) {
serviceInfo = this.hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
} else {
// 否则实例会从服务中心进行获取
serviceInfo = this.hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
}
List list;
return (List)(serviceInfo != null && !CollectionUtils.isEmpty(list = serviceInfo.getHosts()) ? list : new ArrayList());
}
//获取健康(不健康)服务实例方法,委托HostReactor处理
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
// 如果该消费者订阅了这个服务,那么会在本地维护一个服务列表,服务从本地获取
if (subscribe) {
serviceInfo = this.hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
} else {
// 否则实例会从服务中心进行获取
serviceInfo = this.hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
}
return this.selectInstances(serviceInfo, healthy);
}
private List<Instance> selectInstances(ServiceInfo serviceInfo, boolean healthy) {
List list;
if (serviceInfo != null && !CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
Iterator iterator = list.iterator();
while(true) {
Instance instance;
do {
if (!iterator.hasNext()) {
return list;
}
instance = (Instance)iterator.next();
} while(healthy == instance.isHealthy() && instance.isEnabled() && instance.getWeight() > 0.0D);
iterator.remove();
}
} else {
return new ArrayList();
}
}
//获取一个健康的实例,委托HostReactor处理
public Instance selectOneHealthyInstance(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {
return subscribe ? RandomByWeight.selectHost(this.hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","))) : RandomByWeight.selectHost(this.hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")));
}
//监听服务实例,委托HostReactor处理
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException {
this.hostReactor.subscribe(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","), listener);
}
//取消监听服务
public void unsubscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException {
this.hostReactor.unSubscribe(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","), listener);
}
//查询服务列表
public ListView<String> getServicesOfServer(int pageNo, int pageSize, String groupName, AbstractSelector selector) throws NacosException {
return this.serverProxy.getServiceList(pageNo, pageSize, groupName, selector);
}
public List<ServiceInfo> getSubscribeServices() {
return this.hostReactor.getSubscribeServices();
}
public String getServerStatus() {
return this.serverProxy.serverHealthy() ? "UP" : "DOWN";
}
public BeatReactor getBeatReactor() {
return this.beatReactor;
}
public void shutDown() throws NacosException {
this.beatReactor.shutdown();
this.hostReactor.shutdown();
this.serverProxy.shutdown();
}
}
NamingProxy
NamingProxy用于与Nacos服务端通信,注册服务、注销服务、发送心跳等都经由NamingProxy来请求服务端。
public class NamingProxy implements Closeable {
//Nacos自定义的RestTemplate
private final NacosRestTemplate nacosRestTemplate = NamingHttpClientManager.getInstance().getNacosRestTemplate();
//默认服务端口
private static final int DEFAULT_SERVER_PORT = 8848;
private int serverPort = 8848;
//命名空间
private final String namespaceId;
private final String endpoint;
private String nacosDomain;
private List<String> serverList;
private List<String> serversFromEndpoint = new ArrayList();
private final SecurityProxy securityProxy;
private long lastSrvRefTime = 0L;
private final long vipSrvRefInterMillis;
private final long securityInfoRefreshIntervalMills;
private Properties properties;
//刷新定时任务
private ScheduledExecutorService executorService;
//最大重试次数,默认值是3
private int maxRetry;
public NamingProxy(String namespaceId, String endpoint, String serverList, Properties properties) {
this.vipSrvRefInterMillis = TimeUnit.SECONDS.toMillis(30L);
this.securityInfoRefreshIntervalMills = TimeUnit.SECONDS.toMillis(5L);
this.securityProxy = new SecurityProxy(properties, this.nacosRestTemplate);
this.properties = properties;
this.setServerPort(8848);
this.namespaceId = namespaceId;
this.endpoint = endpoint;
this.maxRetry = ConvertUtils.toInt(properties.getProperty("namingRequestDomainMaxRetryCount", String.valueOf(3)));
if (StringUtils.isNotEmpty(serverList)) {
this.serverList = Arrays.asList(serverList.split(","));
if (this.serverList.size() == 1) {
this.nacosDomain = serverList;
}
}
this.initRefreshTask();
}
//注册服务
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
Map<String, String> params = new HashMap(16);
params.put("namespaceId", this.namespaceId);
params.put("serviceName", serviceName);
params.put("groupName", groupName);
params.put("clusterName", instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
// 把上述服务实例的一些必要参数保存到一个Map中,通过OpenAPI的方式发送注册请求
this.reqApi(UtilAndComs.nacosUrlInstance, params, "POST");
}
//注销服务
public void deregisterService(String serviceName, Instance instance) throws NacosException {
Map<String, String> params = new HashMap(8);
params.put("namespaceId", this.namespaceId);
params.put("serviceName", serviceName);
params.put("clusterName", instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
this.reqApi(UtilAndComs.nacosUrlInstance, params, "DELETE");
}
//更新服务
public void updateInstance(String serviceName, String groupName, Instance instance) throws NacosException {
Map<String, String> params = new HashMap(8);
params.put("namespaceId", this.namespaceId);
params.put("serviceName", serviceName);
params.put("groupName", groupName);
params.put("clusterName", instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enabled", String.valueOf(instance.isEnabled()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
this.reqApi(UtilAndComs.nacosUrlInstance, params, "PUT");
}
public Service queryService(String serviceName, String groupName) throws NacosException {
Map<String, String> params = new HashMap(3);
params.put("namespaceId", this.namespaceId);
params.put("serviceName", serviceName);
params.put("groupName", groupName);
String result = this.reqApi(UtilAndComs.nacosUrlService, params, "GET");
return (Service)JacksonUtils.toObj(result, Service.class);
}
public void createService(Service service, AbstractSelector selector) throws NacosException {
Map<String, String> params = new HashMap(6);
params.put("namespaceId", this.namespaceId);
params.put("serviceName", service.getName());
params.put("groupName", service.getGroupName());
params.put("protectThreshold", String.valueOf(service.getProtectThreshold()));
params.put("metadata", JacksonUtils.toJson(service.getMetadata()));
params.put("selector", JacksonUtils.toJson(selector));
this.reqApi(UtilAndComs.nacosUrlService, params, "POST");
}
public boolean deleteService(String serviceName, String groupName) throws NacosException {
Map<String, String> params = new HashMap(6);
params.put("namespaceId", this.namespaceId);
params.put("serviceName", serviceName);
params.put("groupName", groupName);
String result = this.reqApi(UtilAndComs.nacosUrlService, params, "DELETE");
return "ok".equals(result);
}
public void updateService(Service service, AbstractSelector selector) throws NacosException {
Map<String, String> params = new HashMap(6);
params.put("namespaceId", this.namespaceId);
params.put("serviceName", service.getName());
params.put("groupName", service.getGroupName());
params.put("protectThreshold", String.valueOf(service.getProtectThreshold()));
params.put("metadata", JacksonUtils.toJson(service.getMetadata()));
params.put("selector", JacksonUtils.toJson(selector));
this.reqApi(UtilAndComs.nacosUrlService, params, "PUT");
}
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException {
Map<String, String> params = new HashMap(8);
params.put("namespaceId", this.namespaceId);
params.put("serviceName", serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(udpPort));
params.put("clientIP", NetUtils.localIP());
params.put("healthyOnly", String.valueOf(healthyOnly));
return this.reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, "GET");
}
//发送心跳信息,通过心跳机制向服务端报告实例的健康状态
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
if (LogUtils.NAMING_LOGGER.isDebugEnabled()) {
LogUtils.NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", this.namespaceId, beatInfo.toString());
}
Map<String, String> params = new HashMap(8);
Map<String, String> bodyMap = new HashMap(2);
if (!lightBeatEnabled) {
bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
}
params.put("namespaceId", this.namespaceId);
params.put("serviceName", beatInfo.getServiceName());
params.put("clusterName", beatInfo.getCluster());
params.put("ip", beatInfo.getIp());
params.put("port", String.valueOf(beatInfo.getPort()));
String result = this.reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, "PUT");
return JacksonUtils.toObj(result);
}
public boolean serverHealthy() {
try {
String result = this.reqApi(UtilAndComs.nacosUrlBase + "/operator/metrics", new HashMap(2), "GET");
JsonNode json = JacksonUtils.toObj(result);
String serverStatus = json.get("status").asText();
return "UP".equals(serverStatus);
} catch (Exception var4) {
return false;
}
}
//查询服务列表
public ListView<String> getServiceList(int pageNo, int pageSize, String groupName) throws NacosException {
return this.getServiceList(pageNo, pageSize, groupName, (AbstractSelector)null);
}
public ListView<String> getServiceList(int pageNo, int pageSize, String groupName, AbstractSelector selector) throws NacosException {
Map<String, String> params = new HashMap(4);
params.put("pageNo", String.valueOf(pageNo));
params.put("pageSize", String.valueOf(pageSize));
params.put("namespaceId", this.namespaceId);
params.put("groupName", groupName);
if (selector != null) {
switch(SelectorType.valueOf(selector.getType())) {
case none:
default:
break;
case label:
ExpressionSelector expressionSelector = (ExpressionSelector)selector;
params.put("selector", JacksonUtils.toJson(expressionSelector));
}
}
String result = this.reqApi(UtilAndComs.nacosUrlBase + "/service/list", params, "GET");
JsonNode json = JacksonUtils.toObj(result);
ListView<String> listView = new ListView();
listView.setCount(json.get("count").asInt());
listView.setData((List)JacksonUtils.toObj(json.get("doms").toString(), new TypeReference<List<String>>() {
}));
return listView;
}
public String reqApi(String api, Map<String, String> params, String method) throws NacosException {
return this.reqApi(api, params, Collections.EMPTY_MAP, method);
}
public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method) throws NacosException {
return this.reqApi(api, params, body, this.getServerList(), method);
}
public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers, String method) throws NacosException {
params.put("namespaceId", this.getNamespaceId());
if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(this.nacosDomain)) {
throw new NacosException(400, "no server available");
} else {
NacosException exception = new NacosException();
if (StringUtils.isNotBlank(this.nacosDomain)) {
int i = 0;
while(i < this.maxRetry) {
try {
return this.callServer(api, params, body, this.nacosDomain, method);
} catch (NacosException var12) {
exception = var12;
if (LogUtils.NAMING_LOGGER.isDebugEnabled()) {
LogUtils.NAMING_LOGGER.debug("request {} failed.", this.nacosDomain, var12);
}
++i;
}
}
} else {
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
int i = 0;
while(i < servers.size()) {
String server = (String)servers.get(index);
try {
return this.callServer(api, params, body, server, method);
} catch (NacosException var13) {
exception = var13;
if (LogUtils.NAMING_LOGGER.isDebugEnabled()) {
LogUtils.NAMING_LOGGER.debug("request {} failed.", server, var13);
}
index = (index + 1) % servers.size();
++i;
}
}
}
throw new NacosException(exception.getErrCode(), "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
}
}
public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer) throws NacosException {
return this.callServer(api, params, body, curServer, "GET");
}
public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer, String method) throws NacosException {
long start = System.currentTimeMillis();
long end = 0L;
this.injectSecurityInfo(params);
Header header = this.builderHeader();
String url;
if (!curServer.startsWith("https://") && !curServer.startsWith("http://")) {
if (!IPUtil.containsPort(curServer)) {
curServer = curServer + ":" + this.serverPort;
}
url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
} else {
url = curServer + api;
}
try {
HttpRestResult<String> restResult = this.nacosRestTemplate.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
end = System.currentTimeMillis();
MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode())).observe((double)(end - start));
if (restResult.ok()) {
return (String)restResult.getData();
} else if (304 == restResult.getCode()) {
return "";
} else {
throw new NacosException(restResult.getCode(), restResult.getMessage());
}
} catch (Exception var13) {
LogUtils.NAMING_LOGGER.error("[NA] failed to request", var13);
throw new NacosException(500, var13);
}
}
private void injectSecurityInfo(Map<String, String> params) {
if (StringUtils.isNotBlank(this.securityProxy.getAccessToken())) {
params.put("accessToken", this.securityProxy.getAccessToken());
}
String ak = this.getAccessKey();
String sk = this.getSecretKey();
params.put("app", AppNameUtils.getAppName());
if (StringUtils.isNotBlank(ak) && StringUtils.isNotBlank(sk)) {
try {
String signData = getSignData((String)params.get("serviceName"));
String signature = SignUtil.sign(signData, sk);
params.put("signature", signature);
params.put("data", signData);
params.put("ak", ak);
} catch (Exception var6) {
LogUtils.NAMING_LOGGER.error("inject ak/sk failed.", var6);
}
}
}
public String getAccessKey() {
return this.properties == null ? SpasAdapter.getAk() : TemplateUtils.stringEmptyAndThenExecute(this.properties.getProperty("accessKey"), new Callable<String>() {
public String call() {
return SpasAdapter.getAk();
}
});
}
public String getSecretKey() {
return this.properties == null ? SpasAdapter.getSk() : TemplateUtils.stringEmptyAndThenExecute(this.properties.getProperty("secretKey"), new Callable<String>() {
public String call() throws Exception {
return SpasAdapter.getSk();
}
});
}
}
serverAddr和endpoint两种方式配置NacosServer地址
- serverAddr方式是直接告诉客户端nacos服务端的IP;
- endpoint是告诉客户端一个endpoint,客户端通过HTTP请求到endpoint查询nacos服务端IP列表,这里endpoint不是NacosServer的地址,而是获取NacosServer地址的连接点。
NamingProxy会启动1个,用于定期调用refreshSrvIfNeed()方法更新Nacos服务端地址,默认间隔为30秒。
refreshSrvIfNeed()方法对Nacos服务端地址的更新仅在使用endpoint的时候才会进行实际更新,如果是通过serverAddr配置的Nacos服务端地址,refreshSrvIfNeed()方法将不会进行任何操作。
NamingProxy中和endpoint方式有关的代码
//初始化刷新定时任务
private void initRefreshTask() {
//初始化线程池
this.executorService = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.naming.updater");
t.setDaemon(true);
return t;
}
});
this.refreshSrvIfNeed();
this.securityProxy.login(this.getServerList());
//设置定时刷新任务
this.executorService.scheduleWithFixedDelay(new Runnable() {
public void run() {
NamingProxy.this.refreshSrvIfNeed();
}
}, 0L, this.vipSrvRefInterMillis, TimeUnit.MILLISECONDS);
//
this.executorService.scheduleWithFixedDelay(new Runnable() {
public void run() {
NamingProxy.this.securityProxy.login(NamingProxy.this.getServerList());
}
}, 0L, this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);
}
//
public List<String> getServerListFromEndpoint() {
try {
String urlString = "http://" + this.endpoint + "/nacos/serverlist";
Header header = this.builderHeader();
HttpRestResult<String> restResult = this.nacosRestTemplate.get(urlString, header, Query.EMPTY, String.class);
if (!restResult.ok()) {
throw new IOException("Error while requesting: " + urlString + "'. Server returned: " + restResult.getCode());
} else {
String content = (String)restResult.getData();
List<String> list = new ArrayList();
Iterator var6 = IoUtils.readLines(new StringReader(content)).iterator();
while(var6.hasNext()) {
String line = (String)var6.next();
if (!line.trim().isEmpty()) {
list.add(line.trim());
}
}
return list;
}
} catch (Exception var8) {
var8.printStackTrace();
return null;
}
}
//进行刷新
private void refreshSrvIfNeed() {
try {
if (!CollectionUtils.isEmpty(this.serverList)) {
return;
}
if (System.currentTimeMillis() - this.lastSrvRefTime < this.vipSrvRefInterMillis) {
return;
}
List<String> list = this.getServerListFromEndpoint();
if (CollectionUtils.isEmpty(list)) {
throw new Exception("Can not acquire Nacos list");
}
if (!CollectionUtils.isEqualCollection(list, this.serversFromEndpoint)) {
}
this.serversFromEndpoint = list;
this.lastSrvRefTime = System.currentTimeMillis();
} catch (Throwable var2) {
LogUtils.NAMING_LOGGER.warn("failed to update server list", var2);
}
}
private List<String> getServerList() {
List<String> snapshot = this.serversFromEndpoint;
if (!CollectionUtils.isEmpty(this.serverList)) {
snapshot = this.serverList;
}
return snapshot;
}
BeatReactor
BeatReactor 是 Nacos 客户端用于维护服务实例心跳的核心组件,它的作用是确保客户端的服务实例在 Nacos 服务端保持可见和健康。
通过定期发送心跳请求,BeatReactor 可以让 Nacos 服务端知道某个服务实例依然在线,并在实例异常(例如断线或宕机)时及时清理掉不健康的实例。
成员变量Map<String, BeatInfo> dom2Beat中保存了需要发送的BeatInfo,key为{serviceName}#{ip}#{port},value为对应的BeatInfo。
BeatReactor维护一个定期执行线程任务的线程池,默认线程数为1~CPU核心数的一半,可由namingClientBeatThreadCount参数指定。
public class BeatReactor implements Closeable {
//定时任务线程池
private final ScheduledExecutorService executorService;
//与NacosServer通信的代理
private final NamingProxy serverProxy;
private boolean lightBeatEnabled;
//所有需要发送心跳的服务实例及其对应的心跳信息
public final Map<String, BeatInfo> dom2Beat;
public BeatReactor(NamingProxy serverProxy) {
this(serverProxy, UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);
}
public BeatReactor(NamingProxy serverProxy, int threadCount) {
this.lightBeatEnabled = false;
this.dom2Beat = new ConcurrentHashMap();
this.serverProxy = serverProxy;
this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.beat.sender");
return thread;
}
});
}
//添加心跳包到定时线程池中
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
//构造心跳包key
String key = this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat = null;
//从dom2Beat移除该key,如果key对应的value不为空的话,表明该beatInfo已经存在了,则停止心跳检测
if ((existBeat = (BeatInfo)this.dom2Beat.remove(key)) != null) {
//停止心跳检测
existBeat.setStopped(true);
}
//如果dom2Beat中不存在该key,则将key放到map中,并进行定时心跳检测
this.dom2Beat.put(key, beatInfo);
//定时任务线程池,定时执行里面的线程
this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());
}
public void removeBeatInfo(String serviceName, String ip, int port) {
//从心跳包map中删除对应的心跳包信息
BeatInfo beatInfo = (BeatInfo)this.dom2Beat.remove(this.buildKey(serviceName, ip, port));
//将心跳包状态设置为stop
if (beatInfo != null) {
beatInfo.setStopped(true);
MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());
}
}
//根据服务实例,构造心跳包
public BeatInfo buildBeatInfo(Instance instance) {
return this.buildBeatInfo(instance.getServiceName(), instance);
}
public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(groupedServiceName);
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
return beatInfo;
}
public String buildKey(String serviceName, String ip, int port) {
return serviceName + "#" + ip + "#" + port;
}
public void shutdown() throws NacosException {
String className = this.getClass().getName();
ThreadUtils.shutdownThreadPool(this.executorService, LogUtils.NAMING_LOGGER);
}
}
BeatInfo 心跳信息
代表一个服务实例的心跳信息,包括服务名、IP 地址、端口、权重等
public class BeatInfo {
private String serviceName;
private String ip;
private int port;
private double weight;
// 其他字段
}
BeatTask(BeatReactor的内部类)
负责定期执行心跳任务的类,每个 BeatTask 对应一个服务实例。
当任务运行时,它会调用 NamingProxy.sendBeat 向 Nacos 服务端发送心跳。
//心跳包发送线程
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}
public void run() {
//如果心跳包检查没有停止,则发送心跳包
if (!this.beatInfo.isStopped()) {
long nextTime = this.beatInfo.getPeriod();
try {
//发送心跳包
JsonNode result = BeatReactor.this.serverProxy.sendBeat(this.beatInfo, BeatReactor.this.lightBeatEnabled);
long interval = result.get("clientBeatInterval").asLong();
boolean lightBeatEnabled = false;
if (result.has("lightBeatEnabled")) {
lightBeatEnabled = result.get("lightBeatEnabled").asBoolean();
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
if (interval > 0L) {
nextTime = interval;
}
int code = 10200;
if (result.has("code")) {
code = result.get("code").asInt();
}
//如果该实例没有在注册中心注册,则进行注册
if (code == 20404) {
Instance instance = new Instance();
instance.setPort(this.beatInfo.getPort());
instance.setIp(this.beatInfo.getIp());
instance.setWeight(this.beatInfo.getWeight());
instance.setMetadata(this.beatInfo.getMetadata());
instance.setClusterName(this.beatInfo.getCluster());
instance.setServiceName(this.beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try {
//注册服务
BeatReactor.this.serverProxy.registerService(this.beatInfo.getServiceName(), NamingUtils.getGroupName(this.beatInfo.getServiceName()), instance);
} catch (Exception var15) {
//log日志
}
}
} catch (NacosException var16) {
//log日志
} catch (Exception var17) {
//log日志
} finally {
//将线程再次放到定时任务线程池中执行下次的心跳包发送
BeatReactor.this.executorService.schedule(BeatReactor.this.new BeatTask(this.beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
}
}
HostReactor
HostReactor用于获取、保存、更新各Service实例信息。
成员变量Map<String, ServiceInfo> serviceInfoMap中保存了已获取到的服务的信息,key为{服务名}@@{集群名}。
HostReactor会启动名为com.alibaba.nacos.client.naming.updater的线程来更新服务信息,默认线程数为1~CPU核心数的一半,可由namingPollingThreadCount参数指定。
定时任务UpdateTask会根据服务的cacheMillis值定时更新服务信息,默认值为10秒。该定时任务会在获取某一服务信息时创建,保存在成员变量Map<String, ScheduledFuture<?>> futureMap中。
public class HostReactor implements Closeable {
private static final long DEFAULT_DELAY = 1000L;
private static final long UPDATE_HOLD_INTERVAL = 5000L;
private final Map<String, ScheduledFuture<?>> futureMap;
// 本地已存在的服务列表,key是服务名称,value是ServiceInfo
private final Map<String, ServiceInfo> serviceInfoMap;
// 正在更新的实例集合
private final Map<String, Object> updatingMap;
// 接收NacosServer端主动推送服务信息端
private final PushReceiver pushReceiver;
// 与NacosServer保持心跳服务
private final BeatReactor beatReactor;
//底层与NacosServer端通信端代理类
private final NamingProxy serverProxy;
//
private final FailoverReactor failoverReactor;
//本地缓存端路径
private final String cacheDir;
private final boolean pushEmptyProtection;
// 定时任务(负责服务列表的实时更新)
private final ScheduledExecutorService executor;
//实例变化通知者,负责管理服务的订阅信息,并进行回调
private final InstancesChangeNotifier notifier;
public HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir) {
this(serverProxy, beatReactor, cacheDir, false, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT);
}
public HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir, boolean loadCacheAtStart, boolean pushEmptyProtection, int pollingThreadCount) {
this.futureMap = new HashMap();
this.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.client.naming.updater");
return thread;
}
});
this.beatReactor = beatReactor;
this.serverProxy = serverProxy;
this.cacheDir = cacheDir;
if (loadCacheAtStart) {
this.serviceInfoMap = new ConcurrentHashMap(DiskCache.read(this.cacheDir));
} else {
this.serviceInfoMap = new ConcurrentHashMap(16);
}
this.pushEmptyProtection = pushEmptyProtection;
this.updatingMap = new ConcurrentHashMap();
this.failoverReactor = new FailoverReactor(this, cacheDir);
this.pushReceiver = new PushReceiver(this);
this.notifier = new InstancesChangeNotifier();
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
NotifyCenter.registerSubscriber(this.notifier);
}
public Map<String, ServiceInfo> getServiceInfoMap() {
return this.serviceInfoMap;
}
//增加定时刷新服务任务
public synchronized ScheduledFuture<?> addTask(HostReactor.UpdateTask task) {
return this.executor.schedule(task, 1000L, TimeUnit.MILLISECONDS);
}
//订阅服务,serviceName服务名称,clusters集群列表,EventLintener回调Listener
public void subscribe(String serviceName, String clusters, EventListener eventListener) {
//给该服务增加监听器,服务发生变化后进行回调
this.notifier.registerListener(serviceName, clusters, eventListener);
//将该服务添加到HostReactor的定时任务中,定时刷新
this.getServiceInfo(serviceName, clusters);
}
//取消订阅服务,serviceName服务名称,clusters集群列表,EventLintener回调Listener
public void unSubscribe(String serviceName, String clusters, EventListener eventListener) {
this.notifier.deregisterListener(serviceName, clusters, eventListener);
}
public List<ServiceInfo> getSubscribeServices() {
return this.notifier.getSubscribeServices();
}
//处理从注册中心获取到的JSON格式的服务实例,并更新到本地serviceInfoMap中
public ServiceInfo processServiceJson(String json) {
ServiceInfo serviceInfo = (ServiceInfo)JacksonUtils.toObj(json, ServiceInfo.class);
String serviceKey = serviceInfo.getKey();
if (serviceKey == null) {
return null;
} else {
ServiceInfo oldService = (ServiceInfo)this.serviceInfoMap.get(serviceKey);
if (this.pushEmptyProtection && !serviceInfo.validate()) {
return oldService;
} else {
boolean changed = false;
if (oldService != null) {
if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
LogUtils.NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: " + serviceInfo.getLastRefTime());
}
this.serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
Map<String, Instance> oldHostMap = new HashMap(oldService.getHosts().size());
Iterator var7 = oldService.getHosts().iterator();
while(var7.hasNext()) {
Instance host = (Instance)var7.next();
oldHostMap.put(host.toInetAddr(), host);
}
Map<String, Instance> newHostMap = new HashMap(serviceInfo.getHosts().size());
Iterator var17 = serviceInfo.getHosts().iterator();
while(var17.hasNext()) {
Instance host = (Instance)var17.next();
newHostMap.put(host.toInetAddr(), host);
}
Set<Instance> modHosts = new HashSet();
Set<Instance> newHosts = new HashSet();
Set<Instance> remvHosts = new HashSet();
List<Entry<String, Instance>> newServiceHosts = new ArrayList(newHostMap.entrySet());
Iterator var12 = newServiceHosts.iterator();
while(true) {
Entry entry;
Instance host;
String key;
while(var12.hasNext()) {
entry = (Entry)var12.next();
host = (Instance)entry.getValue();
key = (String)entry.getKey();
if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(), ((Instance)oldHostMap.get(key)).toString())) {
modHosts.add(host);
} else if (!oldHostMap.containsKey(key)) {
newHosts.add(host);
}
}
var12 = oldHostMap.entrySet().iterator();
while(var12.hasNext()) {
entry = (Entry)var12.next();
host = (Instance)entry.getValue();
key = (String)entry.getKey();
if (!newHostMap.containsKey(key) && !newHostMap.containsKey(key)) {
remvHosts.add(host);
}
}
if (newHosts.size() > 0) {
changed = true;
}
if (remvHosts.size() > 0) {
changed = true;
}
if (modHosts.size() > 0) {
changed = true;
this.updateBeatInfo(modHosts);
}
serviceInfo.setJsonFromServer(json);
if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));
DiskCache.write(serviceInfo, this.cacheDir);
}
break;
}
} else {
changed = true;
this.serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));
serviceInfo.setJsonFromServer(json);
DiskCache.write(serviceInfo, this.cacheDir);
}
MetricsMonitor.getServiceInfoMapSizeMonitor().set((double)this.serviceInfoMap.size());
if (changed) {
//记录日志
}
return serviceInfo;
}
}
}
private void updateBeatInfo(Set<Instance> modHosts) {
Iterator var2 = modHosts.iterator();
while(var2.hasNext()) {
Instance instance = (Instance)var2.next();
String key = this.beatReactor.buildKey(instance.getServiceName(), instance.getIp(), instance.getPort());
if (this.beatReactor.dom2Beat.containsKey(key) && instance.isEphemeral()) {
BeatInfo beatInfo = this.beatReactor.buildBeatInfo(instance);
this.beatReactor.addBeatInfo(instance.getServiceName(), beatInfo);
}
}
}
//从本地缓存中获取服务实例信息
private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
String key = ServiceInfo.getKey(serviceName, clusters);
return (ServiceInfo)this.serviceInfoMap.get(key);
}
//直接从服务注册中心获取服务
public ServiceInfo getServiceInfoDirectlyFromServer(String serviceName, String clusters) throws NacosException {
String result = this.serverProxy.queryList(serviceName, clusters, 0, false);
return StringUtils.isNotEmpty(result) ? (ServiceInfo)JacksonUtils.toObj(result, ServiceInfo.class) : null;
}
//获取服务,先从本地获取,本地没有,则进行维护,并从注册中心更新最新服务信息
public ServiceInfo getServiceInfo(String serviceName, String clusters) {
String key = ServiceInfo.getKey(serviceName, clusters);
if (this.failoverReactor.isFailoverSwitch()) {
return this.failoverReactor.getService(key);
} else {
// 1.先通过serverName即服务名获得一个serviceInfo
ServiceInfo serviceObj = this.getServiceInfo0(serviceName, clusters);
if (null == serviceObj) {
//如果没有serviceInfo,则通过传进来的参数new出一个新的serviceInfo对象,并且同时维护到本地Map和更新Map
serviceObj = new ServiceInfo(serviceName, clusters);
this.serviceInfoMap.put(serviceObj.getKey(), serviceObj);
this.updatingMap.put(serviceName, new Object());
// 2.updateServiceNow(),立刻去Nacos服务端拉取该服务最新实例列表,更新serviceInfoMap
this.updateServiceNow(serviceName, clusters);
this.updatingMap.remove(serviceName);
} else if (this.updatingMap.containsKey(serviceName)) {
synchronized(serviceObj) {
try {
serviceObj.wait(5000L);
} catch (InterruptedException var8) {
}
}
}
// 3.定时更新实例信息
this.scheduleUpdateIfAbsent(serviceName, clusters);
// 4.最后返回服务实例数据(前面已经进行了更新)
return (ServiceInfo)this.serviceInfoMap.get(serviceObj.getKey());
}
}
//立即从注册中心拉取该服务最新实例列表,并更新到本地
private void updateServiceNow(String serviceName, String clusters) {
try {
this.updateService(serviceName, clusters);
} catch (NacosException var4) {
}
}
//通过定时任务,每10秒去更新一次数据
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
if (this.futureMap.get(ServiceInfo.getKey(serviceName, clusters)) == null) {
synchronized(this.futureMap) {
if (this.futureMap.get(ServiceInfo.getKey(serviceName, clusters)) == null) {
//创建一个UpdateTask的更新线程任务,每10秒去异步更新集合数据
ScheduledFuture<?> future = this.addTask(new HostReactor.UpdateTask(serviceName, clusters));
this.futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
}
}
}
}
//从注册中心拉取该服务最新实例列表,并更新到本地
//这里需要特别关注,client端通过该方法将client端udpPort传给Nacos Server端
public void updateService(String serviceName, String clusters) throws NacosException {
ServiceInfo oldService = this.getServiceInfo0(serviceName, clusters);
boolean var12 = false;
try {
var12 = true;
//从注册中心查询服务下的实例列表
String result = this.serverProxy.queryList(serviceName, clusters, this.pushReceiver.getUdpPort(), false);
if (StringUtils.isNotEmpty(result)) {
//处理从注册中心获取到的服务实例JSON数据,更新本地服务列表
this.processServiceJson(result);
var12 = false;
} else {
var12 = false;
}
} finally {
if (var12) {
if (oldService != null) {
synchronized(oldService) {
oldService.notifyAll();
}
}
}
}
if (oldService != null) {
synchronized(oldService) {
oldService.notifyAll();
}
}
}
//仅仅执行刷新,从Nacos注册中心获取服务,但不刷新本地列表
public void refreshOnly(String serviceName, String clusters) {
try {
this.serverProxy.queryList(serviceName, clusters, this.pushReceiver.getUdpPort(), false);
} catch (Exception var4) {
}
}
public void shutdown() throws NacosException {
String className = this.getClass().getName();
LogUtils.NAMING_LOGGER.info("{} do shutdown begin", className);
ThreadUtils.shutdownThreadPool(this.executor, LogUtils.NAMING_LOGGER);
this.pushReceiver.shutdown();
this.failoverReactor.shutdown();
NotifyCenter.deregisterSubscriber(this.notifier);
LogUtils.NAMING_LOGGER.info("{} do shutdown stop", className);
}
}
获取服务实例流程
更新服务实例流程UpdateTask
UpdateTask(Nacos拉模式)
HostReactor的内部类,用于定期从NacosServer端拉取最新的服务信息。
//更新任务线程
public class UpdateTask implements Runnable {
long lastRefTime = 9223372036854775807L;
private final String clusters;
private final String serviceName;
private int failCount = 0;
public UpdateTask(String serviceName, String clusters) {
this.serviceName = serviceName;
this.clusters = clusters;
}
private void incFailCount() {
int limit = 6;
if (this.failCount != limit) {
++this.failCount;
}
}
private void resetFailCount() {
this.failCount = 0;
}
public void run() {
long delayTime = 1000L;
try {
//从本地缓存中获取服务实例列表
ServiceInfo serviceObj = (ServiceInfo)HostReactor.this.serviceInfoMap.get(ServiceInfo.getKey(this.serviceName, this.clusters));
//如果服务为空,则更新本地服务列表
if (serviceObj == null) {
HostReactor.this.updateService(this.serviceName, this.clusters);
return;
}
// 过期服务(服务的最新更新时间小于等于缓存刷新时间),从注册中心重新查询
if (serviceObj.getLastRefTime() <= this.lastRefTime) {
HostReactor.this.updateService(this.serviceName, this.clusters);
serviceObj = (ServiceInfo)HostReactor.this.serviceInfoMap.get(ServiceInfo.getKey(this.serviceName, this.clusters));
} else {
HostReactor.this.refreshOnly(this.serviceName, this.clusters);
}
// 刷新更新时间
this.lastRefTime = serviceObj.getLastRefTime();
// 判断该注册的Service是否被订阅,如果没有订阅则不再执行
if (!HostReactor.this.notifier.isSubscribed(this.serviceName, this.clusters) && !HostReactor.this.futureMap.containsKey(ServiceInfo.getKey(this.serviceName, this.clusters))) {
return;
}
if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
this.incFailCount();
return;
}
// 下次更新缓存时间设置,默认为10秒
delayTime = serviceObj.getCacheMillis();
//任务运行成功,重置失败次数为0
this.resetFailCount();
} catch (Throwable var7) {
//任务执行失败,失败次数+1
this.incFailCount();
} finally {
//取delayTime<<failCount的值与60000L之间的小值,作为下次运行的时间间隔
//下次调度刷新时间,下次执行的时间与failCount有关
HostReactor.this.executor.schedule(this, Math.min(delayTime << this.failCount, 60000L), TimeUnit.MILLISECONDS);
}
}
}
PushReceiver(Nacos推模式)
PushReceiver用于接收Nacos服务端的推送,初始化时会创建DatagramSocket使用UDP的方式接收Nacos Server端推送的最新服务信息。
PushReceiver使用的UDP协议的客户端的端口udpPort会在UpdateTask任务中,调用updateService的方法中获取,并且传给NacosServer端;
NacosServer会保存该udpPort端口,当Client订阅的服务信息发生变化时,就会主动推送最新信息给Client端。
public class PushReceiver implements Runnable, Closeable {
private static final Charset UTF_8 = Charset.forName("UTF-8");
private static final int UDP_MSS = 65536;
private ScheduledExecutorService executorService;
private DatagramSocket udpSocket;
private HostReactor hostReactor;
private volatile boolean closed = false;
public static String getPushReceiverUdpPort() {
return System.getenv("push.receiver.udp.port");
}
public PushReceiver(HostReactor hostReactor) {
try {
this.hostReactor = hostReactor;
String udpPort = getPushReceiverUdpPort();
if (StringUtils.isEmpty(udpPort)) {
this.udpSocket = new DatagramSocket();
} else {
this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));
}
this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.push.receiver");
return thread;
}
});
this.executorService.execute(this);
} catch (Exception var3) {
LogUtils.NAMING_LOGGER.error("[NA] init udp socket failed", var3);
}
}
public void run() {
while(!this.closed) {
try {
byte[] buffer = new byte[65536];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
this.udpSocket.receive(packet);
String json = (new String(IoUtils.tryDecompress(packet.getData()), UTF_8)).trim();
LogUtils.NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
PushReceiver.PushPacket pushPacket = (PushReceiver.PushPacket)JacksonUtils.toObj(json, PushReceiver.PushPacket.class);
String ack;
if (!"dom".equals(pushPacket.type) && !"service".equals(pushPacket.type)) {
if ("dump".equals(pushPacket.type)) {
ack = "{\"type\": \"dump-ack\", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(this.hostReactor.getServiceInfoMap())) + "\"}";
} else {
ack = "{\"type\": \"unknown-ack\", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":\"\"}";
}
} else {
//调用HostReactor处理收到的json数据
this.hostReactor.processServiceJson(pushPacket.data);
ack = "{\"type\": \"push-ack\", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":\"\"}";
}
this.udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length, packet.getSocketAddress()));
} catch (Exception var6) {
if (this.closed) {
return;
}
LogUtils.NAMING_LOGGER.error("[NA] error while receiving push data", var6);
}
}
}
public void shutdown() throws NacosException {
String className = this.getClass().getName();
LogUtils.NAMING_LOGGER.info("{} do shutdown begin", className);
ThreadUtils.shutdownThreadPool(this.executorService, LogUtils.NAMING_LOGGER);
this.closed = true;
this.udpSocket.close();
LogUtils.NAMING_LOGGER.info("{} do shutdown stop", className);
}
public int getUdpPort() {
return this.udpSocket.getLocalPort();
}
public static class PushPacket {
public String type;
public long lastRefTime;
public String data;
public PushPacket() {
}
}
}
InstancesChangeNotifier
该对象负责保存服务实例的监听,当服务实例发生变化的时候,负责进行通知,回调监听器方法
public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
//监听器map
private final Map<String, ConcurrentHashSet<EventListener>> listenerMap = new ConcurrentHashMap();
private final Object lock = new Object();
public InstancesChangeNotifier() {
}
//注册服务监听器
public void registerListener(String serviceName, String clusters, EventListener listener) {
String key = ServiceInfo.getKey(serviceName, clusters);
ConcurrentHashSet<EventListener> eventListeners = (ConcurrentHashSet)this.listenerMap.get(key);
if (eventListeners == null) {
synchronized(this.lock) {
eventListeners = (ConcurrentHashSet)this.listenerMap.get(key);
if (eventListeners == null) {
eventListeners = new ConcurrentHashSet();
this.listenerMap.put(key, eventListeners);
}
}
}
eventListeners.add(listener);
}
//取消注册服务监听器
public void deregisterListener(String serviceName, String clusters, EventListener listener) {
String key = ServiceInfo.getKey(serviceName, clusters);
ConcurrentHashSet<EventListener> eventListeners = (ConcurrentHashSet)this.listenerMap.get(key);
if (eventListeners != null) {
eventListeners.remove(listener);
if (CollectionUtils.isEmpty(eventListeners)) {
this.listenerMap.remove(key);
}
}
}
//判断是否被订阅
public boolean isSubscribed(String serviceName, String clusters) {
String key = ServiceInfo.getKey(serviceName, clusters);
ConcurrentHashSet<EventListener> eventListeners = (ConcurrentHashSet)this.listenerMap.get(key);
return CollectionUtils.isNotEmpty(eventListeners);
}
public List<ServiceInfo> getSubscribeServices() {
List<ServiceInfo> serviceInfos = new ArrayList();
Iterator var2 = this.listenerMap.keySet().iterator();
while(var2.hasNext()) {
String key = (String)var2.next();
serviceInfos.add(ServiceInfo.fromKey(key));
}
return serviceInfos;
}
//服务实例变化事件
public void onEvent(InstancesChangeEvent event) {
String key = ServiceInfo.getKey(event.getServiceName(), event.getClusters());
//获取该服务的订阅者
ConcurrentHashSet<EventListener> eventListeners = (ConcurrentHashSet)this.listenerMap.get(key);
if (!CollectionUtils.isEmpty(eventListeners)) {
Iterator var4 = eventListeners.iterator();
while(true) {
//循环该服务的订阅者,调用订阅者的回调方法
while(var4.hasNext()) {
final EventListener listener = (EventListener)var4.next();
final Event namingEvent = this.transferToNamingEvent(event);
//如果该listener继承了Nacos定义的AbstractEventListener,并且executor不为空,则通过executor以新的线程的方式回调监听onEvent方法
if (listener instanceof AbstractEventListener && ((AbstractEventListener)listener).getExecutor() != null) {
((AbstractEventListener)listener).getExecutor().execute(new Runnable() {
public void run() {
listener.onEvent(namingEvent);
}
});
}
//否则直接调用该监听器的onEvent方法
else {
listener.onEvent(namingEvent);
}
}
//回调结束,返回
return;
}
}
}
//转换事件
private Event transferToNamingEvent(InstancesChangeEvent instancesChangeEvent) {
return new NamingEvent(instancesChangeEvent.getServiceName(), instancesChangeEvent.getGroupName(), instancesChangeEvent.getClusters(), instancesChangeEvent.getHosts());
}
public Class<? extends com.alibaba.nacos.common.notify.Event> subscribeType() {
return InstancesChangeEvent.class;
}
}