一、引言
本章将是我们第二阶段,开始学习集群模式下,Nacos 是怎么去操作的 ?
本章重点:
- 在Nacos服务端当中,会去开启健康心跳检查定时任务。如果是在Nacos集群下,大家思考一下,有没有必要所有的集群实例都去执行开启健康心跳检查定时任务?还是只有当中一个实例去执行健康心跳检查定时任务,然后把结果同步给其他集群实例的节点 ?大家可以思考一下~
- 既然Nacos有健康心跳检查定时任务,如果微服务健康实例状态发生了改变,这个时候Nacos是怎么把健康实例同步给其他Nacos 集群节点的 ?代码怎么实现的 ?
带着这些问题我们一起往下看吧 ~
二、目录
目录
一、引言
二、目录
三、集群心跳健康检查架构分析
四、集群心跳健康检查选举源码分析
五、集群实例健康状态同步源码分析
六、本章总结
三、集群心跳健康检查架构分析
我们先来分析第一问题。 在Nacos集群下,所有的集群实例都去执行开启健康心跳检查定时任务?还是只有当中一个实例去执行健康心跳检查定时任务,然后把结果同步给其他集群实例的节点 ?
- 如果是在Nacos集群下,所有的集群实例都去执行开启健康心跳检查定时任务。那么就会出现跑出来结果不一致的问题,那么以哪个集群实例结果为准呢 ?很明显这种方式很不合理。
- 那么就是第二种方式的了,只有当中一个实例去执行健康心跳检查定时任务,然后把结果同步给其他集群实例的节点 。
第二种方式明显更加靠谱点,逻辑也更加简洁。在Nacos集群当中也是这么做的,所有集群实例都会开启健康心跳检查任务,但是真正执行健康心跳任务检查逻辑的只有一个实例,在执行完成后。会有一个定时任务,把结果同步给其他集群节点。
那我们接下来看看源码当中,Nacos 是怎么去实现的~
四、集群心跳健康检查选举源码分析
既然是 ” 心跳健康检查 “ ,我们还是要看服务端实例注册接口中的 ClientBeatCheckTask 任务:
那我们直接看 ClientBeatCheckTask 当中的 run 方法,一开始有两个 if 判断方法:
// 集群下,判断自身节点是否需要执行心跳健康检查任务,如果不需要,直接 return
if (!getDistroMapper().responsible(service.getName())) {
return;
}
// 判断是否需要开启健康任务检查,默认为: true
if (!getSwitchDomain().isHealthCheckEnabled()) {
return;
}
那么集群下是如何保证只有一台节点去执行定时任务的,关键点就在于第一个判断当中 responsible方法,那我们具体来看下代码:
public boolean responsible(String serviceName) {
// 获取集群节点的数量
final List<String> servers = healthyList;
// 如果为单机模式,就直接返回为 true
if (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {
return true;
}
// 没有可用健康集群的节点,就直接返回 false
if (CollectionUtils.isEmpty(servers)) {
// means distro config is not ready yet
return false;
}
int index = servers.indexOf(EnvUtil.getLocalAddress());
int lastIndex = servers.lastIndexOf(EnvUtil.getLocalAddress());
if (lastIndex < 0 || index < 0) {
return true;
}
// 把 serviceName 的进行 hash操作,然后和 servers.size() 取模,最终只有一个集群节点能够返回 true
int target = distroHash(serviceName) % servers.size();
return target >= index && target <= lastIndex;
}
通过这个方法我们可以得知,在Nacos集群下,只会有一个节点去执行定时任务。那么该节点定时执行完,怎么把结果同步给其他集群节点的呢 ?
我们一起来往下接着看~
五、集群实例健康状态同步源码分析
本节重点:在Nacos集群下,只会有一个节点去执行定时任务。那么该节点定时执行完,怎么把结果同步给其他集群节点的呢 ?
在 ServiceManager 类中,init() 方法被 @PostConstruct 注解修饰,在Spring 创建 Bean的时候,会去执行 init()方法。在这个方法当中,会去开启心跳健康检查同步的定时任务,我们一起来看下~
@PostConstruct
public void init() {
// 同步心跳健康检查异结果异步任务
GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);
// 处理 同步心跳健康检查异结果异步任务 内存队列 + 异步任务
GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor());
// 省略部分代码
}
那我们先来看下 同步心跳健康检查异结果异步任务代码,ServiceReporter 当中的 run() 方法:
我们可以把这块代码分成三个部分,这样看更容易理解:
第一部分:获取当前所有服务,key:命名空间 value:服务名称
// 获取全部服务,key:命名空间 value:服务名称
Map<String, Set<String>> allServiceNames = getAllServiceNames();
if (allServiceNames.size() <= 0) {
//ignore
return;
}
第二部分:遍历 allServiceNames 中的每一个命名空间 ,封装请求参数 ,准备同步健康心跳检查结果
// 遍历 allServiceNames 中的每一个命名空间 ,封装请求参数 ,准备同步健康心跳检查结果
for (String namespaceId : allServiceNames.keySet()) {
ServiceChecksum checksum = new ServiceChecksum(namespaceId);
// 遍历每一个命名空间对应 serviceName 服务名称
for (String serviceName : allServiceNames.get(namespaceId)) {
if (!distroMapper.responsible(serviceName)) {
continue;
}
Service service = getService(namespaceId, serviceName);
if (service == null || service.isEmpty()) {
continue;
}
service.recalculateChecksum();
// 添加请求参数
checksum.addItem(serviceName, service.getChecksum());
}
// 封装 Message 对象数据,把请求对象转换成JSON
Message msg = new Message();
msg.setData(JacksonUtils.toJson(checksum));
Collection<Member> sameSiteServers = memberManager.allMembers();
if (sameSiteServers == null || sameSiteServers.size() <= 0) {
return;
}
第三部分:同步结果到其他集群节点
for (Member server : sameSiteServers) {
// 判断是否是当前集群的节点,如果是就跳过
if (server.getAddress().equals(NetUtils.localServer())) {
continue;
}
// 重点:同步其他集群节点
synchronizer.send(server.getAddress(), msg);
}
在 synchronizer.send(server.getAddress(), msg); 这个方法当中,会通过HTTP 方式给其他集群节点同步心跳任务健康检查结果:
@Override
public void send(final String serverIP, Message msg) {
if (serverIP == null) {
return;
}
// 创建请求参数
Map<String, String> params = new HashMap<String, String>(10);
params.put("statuses", msg.getData());
params.put("clientIP", NetUtils.localServer());
// 拼接 url 地址
String url = "http://" + serverIP + ":" + EnvUtil.getPort() + EnvUtil.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";
if (IPUtil.containsPort(serverIP)) {
url = "http://" + serverIP + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT
+ "/service/status";
}
try {
// 异步发送 http 请求,请求地址:http://ip/v1/ns/service/status , 同步心跳健康检查结果
HttpClient.asyncHttpPostLarge(url, null, JacksonUtils.toJson(params), new Callback<String>() {
// 代码省略
});
} catch (Exception e) {
Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIP, e);
}
}
通过代码可以得知,最终也是通过 HTTP 的方式来进行数据同步的,也能够看出请求地址是v1/ns/service/status。接下来我们一起来看下请求地址对应的接口代码逻辑,其实代码很好找,看下图:
这块代码就不细讲了,主要逻辑就是 判断服务状态是否有变动 ,有变动的话就 包装 ServiceKey 对象, 放入到 toBeUpdatedServicesQueue 阻塞队列当中。
代码如下:
public void addUpdatedServiceToQueue(String namespaceId, String serviceName, String serverIP, String checksum) {
lock.lock();
try {
// 包装 ServiceKey 对象, 放入到 toBeUpdatedServicesQueue 阻塞队列当中
toBeUpdatedServicesQueue.offer(new ServiceKey(namespaceId, serviceName, serverIP, checksum), 5, TimeUnit.MILLISECONDS);
} catch (Exception e) {
toBeUpdatedServicesQueue.poll();
toBeUpdatedServicesQueue.add(new ServiceKey(namespaceId, serviceName, serverIP, checksum));
Loggers.SRV_LOG.error("[DOMAIN-STATUS] Failed to add service to be updated to queue.", e);
} finally {
lock.unlock();
}
}
我们刚刚分析的在 ServiceManager类中的 init 方法(代码如下),第一个线程任务就是同步心跳健康检查结果的异步任务,那么我们接下来分析第二个线程任务。
@PostConstruct
public void init() {
// 同步心跳健康检查异结果异步任务
GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);
// 处理 同步心跳健康检查异结果异步任务 内存队列 + 异步任务
GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor());
// 省略部分代码
}
第二个线程任务类是:UpdatedServiceProcessor,我们从run 方法中(代码如下),能够看出是一个 while 循环,并且是没有结束条件的。在循环的逻辑当中,会从toBeUpdatedServicesQueue阻塞队列中一直取任务,取到任务之后,又是提交了一个线程池任务。
@Override
public void run() {
ServiceKey serviceKey = null;
try {
while (true) {
try {
// 从阻塞队列当中一直获取任务
serviceKey = toBeUpdatedServicesQueue.take();
} catch (Exception e) {
Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while taking item from LinkedBlockingDeque.");
}
if (serviceKey == null) {
continue;
}
// 把任务提交到线程池执行
GlobalExecutor.submitServiceUpdate(new ServiceUpdater(serviceKey));
}
} catch (Exception e) {
Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while update service: {}", serviceKey, e);
}
}
那我们接着看 ServiceUpdater 当中的 run 方法,代码如下:
@Override
public void run() {
try {
// 调用更改健康状态方法
updatedHealthStatus(namespaceId, serviceName, serverIP);
} catch (Exception e) {
Loggers.SRV_LOG
.warn("[DOMAIN-UPDATER] Exception while update service: {} from {}, error: {}", serviceName,
serverIP, e);
}
}
public void updatedHealthStatus(String namespaceId, String serviceName, String serverIP) {
Message msg = synchronizer.get(serverIP, UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
// 解析参数
JsonNode serviceJson = JacksonUtils.toObj(msg.getData());
ArrayNode ipList = (ArrayNode) serviceJson.get("ips");
Map<String, String> ipsMap = new HashMap<>(ipList.size());
for (int i = 0; i < ipList.size(); i++) {
String ip = ipList.get(i).asText();
String[] strings = ip.split("_");
ipsMap.put(strings[0], strings[1]);
}
Service service = getService(namespaceId, serviceName);
if (service == null) {
return;
}
// 是否改变标识
boolean changed = false;
// 遍历全部实例信息,更新健康状态
List<Instance> instances = service.allIPs();
for (Instance instance : instances) {
boolean valid = Boolean.parseBoolean(ipsMap.get(instance.toIpAddr()));
if (valid != instance.isHealthy()) {
changed = true;
instance.setHealthy(valid);
Loggers.EVT_LOG.info("{} {SYNC} IP-{} : {}:{}@{}", serviceName,
(instance.isHealthy() ? "ENABLED" : "DISABLED"), instance.getIp(), instance.getPort(),
instance.getClusterName());
}
}
// 如果实例健康状态改变了,那么就发布 服务改变事件,使用 upd 的方式通知客户端
if (changed) {
pushService.serviceChanged(service);
if (Loggers.EVT_LOG.isDebugEnabled()) {
StringBuilder stringBuilder = new StringBuilder();
List<Instance> allIps = service.allIPs();
for (Instance instance : allIps) {
stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
}
Loggers.EVT_LOG
.debug("[HEALTH-STATUS-UPDATED] namespace: {}, service: {}, ips: {}", service.getNamespaceId(),
service.getName(), stringBuilder.toString());
}
}
}
在上面代码中,注意是先解析我们的 msg.getData()参数,然后获取注册表中全部的 Instance 实例列,进行遍历,在健康状态有变动的情况下,会直接更改它的 healthy 属性。在方法的最后,如果有更新 healthy属性的情况下,最终也会发布服务改变事件来通知客户端进行更新。
六、本章总结
在本章节我们首先知道了,在Nacos集群下,是只有一个集群节点去执行心跳健康检查定时任务的,然后把结果同步给其他集群的节点。那么是怎么同步给其他集群节点的呢 ?
在Nacos 服务端是有一个定时任务,来和其他集群节点进行数据同步的。通过源码分析,我们知道最终也是通过 HTTP 的方式进行同步的,采用了 异步任务 + 阻塞队列的方式 的设计架构。这样的好处就是快,先把任务都给接受放入到阻塞队列当中,就立马返回。然后后台会开启一条线程不断从阻塞队列当中获取任务进行处理。
把本章流程图补充完整: