文章目录
- 1.ServiceManager#init()
- 1.1 定时发送任务
- 1.2 定时更新状态任务
- 1.3 定时清除空service任务
1.ServiceManager#init()
@PostConstruct
public void init() {
// 启动了一个定时任务:每60s当前Server会向其它Nacos Server发送一次本机注册表
// 本机注册表是以各个服务的checksum(字串拼接)形式被发送的
GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);
// 从其它Nacos Server获取到注册表中的所有instance的最新状态并更新到本地注册表
GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor());
if (emptyServiceAutoClean) {
Loggers.SRV_LOG.info("open empty service auto clean job, initialDelay : {} ms, period : {} ms",
cleanEmptyServiceDelay, cleanEmptyServicePeriod);
// delay 60s, period 20s;
// This task is not recommended to be performed frequently in order to avoid
// the possibility that the service cache information may just be deleted
// and then created due to the heartbeat mechanism
// 启动了一个定时任务:每30s清理一次注册表中的空service
// 空service,即没有任何instance的service
GlobalExecutor.scheduleServiceAutoClean(new EmptyServiceAutoClean(), cleanEmptyServiceDelay,
cleanEmptyServicePeriod);
}
try {
Loggers.SRV_LOG.info("listen for service meta change");
consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);
} catch (NacosException e) {
Loggers.SRV_LOG.error("listen for service meta change failed!");
}
}
- 启动了一个定时任务:每60s当前Server会向其它Nacos Server发送一次本机注册表
- 从其它Nacos Server获取到注册表中的所有instance的最新状态并更新到本地注册表
- 启动了一个定时任务:每30s清理一次注册表中的空service
1.1 定时发送任务
启动了一个定时任务:每60s当前Server会向其它Nacos Server发送一次本机注册表
ServiceReporter#run()
@Override
public void run() {
try {
// map的key为namespaceId,value为一个Set集合,集合中存放的是当前
// namespace中所有service的名称
// 这个map中存放的是当前注册表中所有服务的名称
Map<String, Set<String>> allServiceNames = getAllServiceNames();
if (allServiceNames.size() <= 0) {
//ignore
return;
}
// 遍历所有的namespace
for (String namespaceId : allServiceNames.keySet()) {
ServiceChecksum checksum = new ServiceChecksum(namespaceId);
// 遍历当前namespace中的所有服务名称
for (String serviceName : allServiceNames.get(namespaceId)) {
// 若当前服务不归当前Server负责,则直接跳过
if (!distroMapper.responsible(serviceName)) {
continue;
}
// 从注册表中获取到当前遍历的服务
Service service = getService(namespaceId, serviceName);
if (service == null || service.isEmpty()) {
continue;
}
// 重新计算当前service的checksum
service.recalculateChecksum();
// 将计算好的checksum写入到map
checksum.addItem(serviceName, service.getChecksum());
} // end-内层for
Message msg = new Message();
// 将当前namespace中的所有服务的checksum写入到msg中,
// 将来将msg发送给其它nacos
msg.setData(JacksonUtils.toJson(checksum));
// 获取到所有nacos
Collection<Member> sameSiteServers = memberManager.allMembers();
if (sameSiteServers == null || sameSiteServers.size() <= 0) {
return;
}
// 遍历所有nacos,要将msg发送出去
for (Member server : sameSiteServers) {
// 若当前遍历的server是当前server,则直接跳过
if (server.getAddress().equals(NetUtils.localServer())) {
continue;
}
// 将msg发送给当前遍历的server
synchronizer.send(server.getAddress(), msg);
}
} // end-外层for
} catch (Exception e) {
Loggers.SRV_LOG.error("[DOMAIN-STATUS] Exception while sending service status", e);
} finally {
// 开启下一次定时执行
GlobalExecutor.scheduleServiceReporter(this, switchDomain.getServiceStatusSynchronizationPeriodMillis(),
TimeUnit.MILLISECONDS);
}
}
}
-
获取当前注册表中所有服务的名称
allServiceNames
-
遍历所有的namespace, 遍历当前namespace中的所有服务名称
- 从注册表中获取到当前遍历的服务, 重新计算当前service的checksum, 保持至map中
- 获取并遍历所有nacos,要将msg发送出去, msg就是上面的checksum的封装
1.2 定时更新状态任务
从其它Nacos Server获取到注册表中的所有instance的最新状态并更新到本地注册表
UpdatedServiceProcessor#run() -> ServiceUpdater#run() -> ServiceUpdater#updatedHealthStatus()
@Override
public void run() {
ServiceKey serviceKey = null;
try {
// 运行一个无限循环
while (true) {
try {
// 从队列中取出一个元素
// toBeUpdatedServicesQueue 中存放的是来自于其它Server的服务状态发生变更的服务
serviceKey = toBeUpdatedServicesQueue.take();
} catch (Exception e) {
Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while taking item from LinkedBlockingDeque.");
}
if (serviceKey == null) {
continue;
}
// 另外启用一个线程来完成ServiceUpdater任务
GlobalExecutor.submitServiceUpdate(new ServiceUpdater(serviceKey));
}
} catch (Exception e) {
Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while update service: {}", serviceKey, e);
}
}
}
@Override
public void run() {
try {
updatedHealthStatus(namespaceId, serviceName, serverIP);
} catch (Exception e) {
Loggers.SRV_LOG
.warn("[DOMAIN-UPDATER] Exception while update service: {} from {}, error: {}", serviceName,
serverIP, e);
}
}
}
public void updatedHealthStatus(String namespaceId, String serviceName, String serverIP) {
// 从其它server获取指定服务的数据
Message msg = synchronizer.get(serverIP, UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
JsonNode serviceJson = JacksonUtils.toObj(msg.getData());
ArrayNode ipList = (ArrayNode) serviceJson.get("ips");
// 这个map中存放的是来自于其它Nacos中的、当前服务所包含的所有instance的健康状态
// map的key为ip:port,value为healthy
Map<String, String> ipsMap = new HashMap<>(ipList.size());
// 遍历ipList
for (int i = 0; i < ipList.size(); i++) {
// 这个ip字符串的格式是: ip:port_healthy
String ip = ipList.get(i).asText();
String[] strings = ip.split("_");
// 将当前遍历instance的地址及健康状态写入到map
ipsMap.put(strings[0], strings[1]);
}
// 从注册表中获取当前服务
Service service = getService(namespaceId, serviceName);
if (service == null) {
return;
}
boolean changed = false;
// 获取到注册表中当前服务的所有instance
List<Instance> instances = service.allIPs();
// 遍历注册表中当前服务的所有instance
for (Instance instance : instances) {
// 获取来自于其它nacos的当前遍历instance的健康状态
boolean valid = Boolean.parseBoolean(ipsMap.get(instance.toIpAddr()));
// 若当前instance在注册表中记录的状态与外来的状态不一致,则以外来的为准
if (valid != instance.isHealthy()) {
changed = true;
// 将注册表中的instance状态修改为外来的状态
instance.setHealthy(valid);
Loggers.EVT_LOG.info("{} {SYNC} IP-{} : {}:{}@{}", serviceName,
(instance.isHealthy() ? "ENABLED" : "DISABLED"), instance.getIp(), instance.getPort(),
instance.getClusterName());
}
}
// 只要有一个instance的状态发生了变更,那么这个changed的值就为true
if (changed) {
// 发布状态变更事件
pushService.serviceChanged(service);
if (Loggers.EVT_LOG.isDebugEnabled()) {
StringBuilder stringBuilder = new StringBuilder();
List<Instance> allIps = service.allIPs();
for (Instance instance : allIps) {
stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
}
Loggers.EVT_LOG
.debug("[HEALTH-STATUS-UPDATED] namespace: {}, service: {}, ips: {}", service.getNamespaceId(),
service.getName(), stringBuilder.toString());
}
}
}
- 从其它server获取指定服务的数据msg
- 获取到注册表中当前服务的所有instance, 遍历, 获取来自于其它nacos的当前遍历instance的健康状态, 如果当前instance在注册表中记录的状态与外来的状态不一致,则以外来的为准, 将注册表中的instance状态修改为外来的状态
- 只要有一个instance的状态发生了变更, 发布状态变更事件, 在上一节的Nacos Server与Client的UDP通信中有这个方法的详情分析
1.3 定时清除空service任务
启动了一个定时任务:每30s清理一次注册表中的空service, 即没有任何instance的service
EmptyServiceAutoClean#run()
@Override
public void run() {
// Parallel flow opening threshold
// 这是一个并行流开启阈值:当一个namespace中包含的service的数量超过100时,
// 会将注册创建为一个并行流,否则就是一个串行流
int parallelSize = 100;
// 遍历注册表
// stringServiceMap 就是注册表的内层map
serviceMap.forEach((namespace, stringServiceMap) -> {
Stream<Map.Entry<String, Service>> stream = null;
// 若当前遍历的元素(namespace)中包含的服务的数量超出了阈值,
// 则生成一个并行流
if (stringServiceMap.size() > parallelSize) {
// 并行流
stream = stringServiceMap.entrySet().parallelStream();
} else {
// 串行流
stream = stringServiceMap.entrySet().stream();
}
stream.filter(entry -> {
final String serviceName = entry.getKey();
// 只要当前遍历的服务需要当前server负责,则通过过滤
return distroMapper.responsible(serviceName);
// 这里的forEach遍历的元素一定是最终需要由当前server处理的服务
}).forEach(entry -> stringServiceMap.computeIfPresent(entry.getKey(), (serviceName, service) -> {
// 空的service
if (service.isEmpty()) {
// To avoid violent Service removal, the number of times the Service
// experiences Empty is determined by finalizeCnt, and if the specified
// value is reached, it is removed
// 若当前服务为空的次数超出了最大允许值,则删除这个服务,防止暴力删除
if (service.getFinalizeCount() > maxFinalizeCount) {
Loggers.SRV_LOG.warn("namespace : {}, [{}] services are automatically cleaned", namespace,
serviceName);
try {
// 删除服务
easyRemoveService(namespace, serviceName);
} catch (Exception e) {
Loggers.SRV_LOG.error("namespace : {}, [{}] services are automatically clean has "
+ "error : {}", namespace, serviceName, e);
}
}
// 计数器 + 1
service.setFinalizeCount(service.getFinalizeCount() + 1);
Loggers.SRV_LOG
.debug("namespace : {}, [{}] The number of times the current service experiences "
+ "an empty instance is : {}", namespace, serviceName,
service.getFinalizeCount());
} else {
// 如果当前service不空instance的话
// 将计数器归零
service.setFinalizeCount(0);
}
return service;
}));
});
}
- 遍历注册表, 如果遍历的元素中的服务数量超过阈值则注册并行流, 如果没有则生成串行流, 去解决清除任务
- 先判断是否是空的service, 如果是的话则若当前服务为空的次数超出了最大允许值,则删除这个服务,防止暴力删除, 并删除服务