目录
一、简介
二、健康检查流程
2.1、健康检查
2.2、客户端释放连接事件
2.3、客户端断开连接事件
2.4、小结
2.5、总结图
三、服务剔除
一、简介
Nacos作为注册中心不止提供了服务注册和服务发现的功能,还提供了服务可用性检测的功能,在Nacos 1.x的版本中,临时实例走的是distro协议,客户端向注册中心发送心跳来维持自身的健康(healthy)状态,持久实例则走的是Raft协议存储。
主要有两种检测机制:
- 1)、客户端主动上报机制:主动向Nacos服务端发送心跳,告诉Nacos服务端是否自己还活着。
- 2)、服务器端主动下探机制:Nacos服务端主动向每个Nacos客户端发起探活,如果探活成功,说明客户端还活着,如果探活失败,则服务端将会剔除客户端。
对于Nacos健康检测机制,主要是有两种服务实例类型:
- 临时实例:客户端主动上报机制
- 持久实例:服务端主动下探机制
在1.x版本中,临时实例每隔5秒会主动上报自己的健康状态,发送心跳,如果发送心跳的间隔时间超过15秒,Nacos服务器端会将服务标记为亚健康状态,如果超过30S没有发送心跳,那么服务实例会被从服务列表中剔除。
在Nacos 2.x版本以后,持久实例不变,还是通过服务端主动下探机制,但是临时实例变成通过长连接来判断实例是否健康。
- 长连接: 一个连接上可以连续发送多数据包,在连接保持期间,如果没有数据包发送,需要双方发链路检测包,在Nacos 2.x之后,使用Grpc协议代替了http协议,长连接会保持客户端和服务端发送的状态,在源码中ConnectionManager 管理所有客户端的长连接。ConnectionManager每3秒检测所有超过20S内没有发生过通讯的客户端,向客户端发起ClientDetectionRequest探测请求,如果客户端在指定时间内成功响应,则检测通过,否则执行unregister方法移除Connection。
如果客户端持续和服务端进行通讯,服务端是不需要主动下探的,只有当客户端没有一直和服务端通信的时候,服务端才会主动下探操作。
二、健康检查流程
2.1、健康检查
在Nacos2.0之后,使用Grpc协议代替了http协议,Grpc是一个长连接的,长连接会保持客户端和服务端发送的状态,在Nacos源码中ConnectionManager 管理所有客户端的长连接。
ConnectionManager每隔3秒检测所有超过20S内没有发生过通讯的客户端,向客户端发起ClientDetectionRequest探测请求,如果客户端在指定时间内成功响应,则检测通过,否则执行unregister方法移除Connection。
我们从ConnectionManager类的源码开始分析:
ConnectionManager内部有一个map用于存放当前所有客户端的长连接信息:
/**
* 连接集合
* key: ConnectionId
* value: Connection
*/
Map<String, Connection> connections = new ConcurrentHashMap<>();
当我们启动一个nacos客户端的时候,就会往connections里面保存这个连接信息。
在ConnectionManager类内部,我们发现了存在一个使用@PostConstruct注解标识的方法,说明构造方法执行后就会触发执行start():
/**
* 应用启动的时候执行,首次执行延迟1s,运行中周期为3秒执行一次
* Start Task:Expel the connection which active Time expire.
*/
@PostConstruct
public void start() {
// 初始化runtimeConnectionEjector为NacosRuntimeConnectionEjector
initConnectionEjector();
// 开始执行不健康连接的剔除任务
RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(() -> {
// 调用com.alibaba.nacos.core.remote.NacosRuntimeConnectionEjector.doEject
runtimeConnectionEjector.doEject();
}, 1000L, 3000L, TimeUnit.MILLISECONDS);
}
可以看到,start()方法创建了一个定时任务,首次执行延迟1s,后面每隔3s执行一次,实际上就是执行不健康连接的剔除任务。
我们查看runtimeConnectionEjector.doEject()方法:
public void doEject() {
try {
Loggers.CONNECTION.info("Connection check task start");
Map<String, Connection> connections = connectionManager.connections;
int totalCount = connections.size();
MetricsMonitor.getLongConnectionMonitor().set(totalCount);
int currentSdkClientCount = connectionManager.currentSdkClientCount();
Loggers.CONNECTION.info("Long connection metrics detail ,Total count ={}, sdkCount={},clusterCount={}",
totalCount, currentSdkClientCount, (totalCount - currentSdkClientCount));
// 超时的连接集合
Set<String> outDatedConnections = new HashSet<>();
long now = System.currentTimeMillis();
for (Map.Entry<String, Connection> entry : connections.entrySet()) {
Connection client = entry.getValue();
// client.getMetaInfo().getLastActiveTime(): 客户端最近一次活跃时间
// 客户端最近一次活跃时间距离当前时间超过20s的客户端,服务端会发起请求探活,如果失败或者超过指定时间内未响应则剔除服务。
if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {
outDatedConnections.add(client.getMetaInfo().getConnectionId());
}
}
// check out date connection
Loggers.CONNECTION.info("Out dated connection ,size={}", outDatedConnections.size());
if (CollectionUtils.isNotEmpty(outDatedConnections)) {
// 记录成功探活的客户端连接的集合
Set<String> successConnections = new HashSet<>();
final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
for (String outDateConnectionId : outDatedConnections) {
try {
Connection connection = connectionManager.getConnection(outDateConnectionId);
if (connection != null) {
// 创建一个客户端检测请求
ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public long getTimeout() {
return 5000L;
}
@Override
public void onResponse(Response response) {
latch.countDown();
if (response != null && response.isSuccess()) {
// 探活成功,更新最近活跃时间,然后加入到探活成功的集合中
connection.freshActiveTime();
successConnections.add(outDateConnectionId);
}
}
@Override
public void onException(Throwable e) {
latch.countDown();
}
});
Loggers.CONNECTION.info("[{}]send connection active request ", outDateConnectionId);
} else {
latch.countDown();
}
} catch (ConnectionAlreadyClosedException e) {
latch.countDown();
} catch (Exception e) {
Loggers.CONNECTION.error("[{}]Error occurs when check client active detection ,error={}",
outDateConnectionId, e);
latch.countDown();
}
}
latch.await(5000L, TimeUnit.MILLISECONDS);
Loggers.CONNECTION.info("Out dated connection check successCount={}", successConnections.size());
for (String outDateConnectionId : outDatedConnections) {
// 不在探活成功的集合,说明探活失败,执行注销连接操作
if (!successConnections.contains(outDateConnectionId)) {
Loggers.CONNECTION.info("[{}]Unregister Out dated connection....", outDateConnectionId);
// 注销过期连接
connectionManager.unregister(outDateConnectionId);
}
}
}
Loggers.CONNECTION.info("Connection check task end");
} catch (Throwable e) {
Loggers.CONNECTION.error("Error occurs during connection check... ", e);
}
}
如上代码,比较容易看懂,总体逻辑就是:
- 1、拿到当前所有的连接;
- 2、循环判断每个连接,判断下最近一次活跃时间距离当前时间,是不是超过20s,如果超过20s,将连接ID加入到一个过期连接集合中放着;
- 3、循环过期连接集合中的每个连接,Nacos服务端主动发起一个探活,如果探活成功,将连接ID加入到探活成功的集合中;
- 4、比较过期连接集合、探活成功集合,两者的差集,就是真正探活失败,需要剔除的那些连接,将会执行注销连接操作;
针对探活失败的那些连接,需要执行注销连接,具体代码如下:
// 注销过期连接
connectionManager.unregister(outDateConnectionId);
public synchronized void unregister(String connectionId) {
// 根据connectionId从连接集合中移除这个连接
// Map<String, Connection> connections = new ConcurrentHashMap<>();
Connection remove = this.connections.remove(connectionId);
// 移除成功
if (remove != null) {
String clientIp = remove.getMetaInfo().clientIp;
AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
if (atomicInteger != null) {
int count = atomicInteger.decrementAndGet();
if (count <= 0) {
connectionForClientIp.remove(clientIp);
}
}
remove.close();
LOGGER.info("[{}]Connection unregistered successfully. ", connectionId);
// 通知其它客户端,这个连接断开了
clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
}
}
unregister()方法首先根据connectionId从连接集合中移除这个连接,然后通知其它客户端,这个连接断开了。
继续跟踪clientConnectionEventListenerRegistry.notifyClientDisConnected(remove)的源码:
public void notifyClientDisConnected(final Connection connection) {
for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) {
try {
clientConnectionEventListener.clientDisConnected(connection);
} catch (Throwable throwable) {
Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener {}",
clientConnectionEventListener.getName(), throwable);
}
}
}
ClientConnectionEventListener其实就是客户端连接事件的一些监听器,看下其类图:
ClientConnectionEventListener主要有三个子类,这里我们关注ConnectionBasedClientManager。
我们查看ConnectionBasedClientManager#clientDisConnected()的源码:
public void clientDisConnected(Connection connect) {
clientDisconnected(connect.getMetaInfo().getConnectionId());
}
public boolean clientDisconnected(String clientId) {
Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
ConnectionBasedClient client = clients.remove(clientId);
if (null == client) {
return true;
}
client.release();
boolean isResponsible = isResponsibleClient(client);
// 发布客户端释放连接事件
/**
* 具体处理是在:{@link com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager.onEvent}
* 主要做了下面几个事情:
* 1、从订阅者列表中移除所有服务对这个客户端的引用
* 2、从发布者列表中移除所有服务对这个客户端的引用
*/
NotifyCenter.publishEvent(new ClientOperationEvent.ClientReleaseEvent(client, isResponsible));
// 发布客户端断开连接事件
/**
* 具体处理是在:{@link com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataManager.onEvent}
* 主要做了下面几个事情:
* 1、将服务实例元数据添加到过期集合中
*/
NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client, isResponsible));
return true;
}
可以看到,关键的逻辑就是发布了两个事件:客户端释放连接事件、客户端断开连接事件。
2.2、客户端释放连接事件
具体处理是在com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager.onEvent:
public void onEvent(Event event) {
if (event instanceof ClientOperationEvent.ClientReleaseEvent) {
// 处理客户端释放连接事件
handleClientDisconnect((ClientOperationEvent.ClientReleaseEvent) event);
} else if (event instanceof ClientOperationEvent) {
// 处理排除ClientReleaseEvent后的其它客户端操作事件
handleClientOperation((ClientOperationEvent) event);
}
}
private void handleClientDisconnect(ClientOperationEvent.ClientReleaseEvent event) {
Client client = event.getClient();
for (Service each : client.getAllSubscribeService()) {
// 从订阅者列表中移除所有服务对这个客户端的引用
// private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
// key: Service value: 客户端ID集合
removeSubscriberIndexes(each, client.getClientId());
}
DeregisterInstanceReason reason = event.isNative()
? DeregisterInstanceReason.NATIVE_DISCONNECTED : DeregisterInstanceReason.SYNCED_DISCONNECTED;
long currentTimeMillis = System.currentTimeMillis();
for (Service each : client.getAllPublishedService()) {
// 从发布者列表中移除所有服务对这个客户端的引用
removePublisherIndexes(each, client.getClientId());
InstancePublishInfo instance = client.getInstancePublishInfo(each);
NotifyCenter.publishEvent(new DeregisterInstanceTraceEvent(currentTimeMillis,
"", false, reason, each.getNamespace(), each.getGroup(), each.getName(),
instance.getIp(), instance.getPort()));
}
}
主要做了两件事情:
- 1、从订阅者列表中移除所有服务对这个客户端的引用;
- 2、从发布者列表中移除所有服务对这个客户端的引用;
2.3、客户端断开连接事件
具体处理是在com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataManager.onEvent:
public void onEvent(Event event) {
if (event instanceof MetadataEvent.InstanceMetadataEvent) {
// 处理实例元数据事件
handleInstanceMetadataEvent((MetadataEvent.InstanceMetadataEvent) event);
} else if (event instanceof MetadataEvent.ServiceMetadataEvent) {
// 处理服务元数据事件
handleServiceMetadataEvent((MetadataEvent.ServiceMetadataEvent) event);
} else {
// 处理客户端断开连接事件
handleClientDisconnectEvent((ClientEvent.ClientDisconnectEvent) event);
}
}
private void handleClientDisconnectEvent(ClientEvent.ClientDisconnectEvent event) {
for (Service each : event.getClient().getAllPublishedService()) {
String metadataId = event.getClient().getInstancePublishInfo(each).getMetadataId();
if (containInstanceMetadata(each, metadataId)) {
// 实例已过期,将实例元数据添加到过期集合中
updateExpiredInfo(true, ExpiredMetadataInfo.newExpiredInstanceMetadata(each, metadataId));
}
}
}
主要做了一件事情:
- 1、判断实例元数据是否存在,存在的话,将它标志已过期,添加到过期集合中;
2.4、小结
以上就是Nacos服务端健康检查的整体流程,总结一下:
- 1、入口在ConnectionManager.start()方法,该方法有注解@PostConstruct;
- 2、start()方法启动了一个定时任务,3s定时调度一次(每次结束后延迟3s);
- 3、判断哪些客户端最近一次活跃时间已经超过20s,如果超过,判断为连接过期,并把过期的client存放到过期集合中;
- 4、Nacos服务端会对过期的client进行一次探活操作,如果失败或者指定时间内还没有响应,直接剔除该客户端;
- 5、剔除客户端的过程,发布了两个事件:客户端释放连接事件、客户端断开连接事件。拿到订阅者列表、发布者列表,移除掉所有服务对这个client的引用,保证服务不会引用到过期的client;
2.5、总结图
三、服务剔除
前面健康检查我们主要分析了ConnectionBasedClientManager这个类,细心的朋友可能会发现ConnectionBasedClientManager的构造方法其实启动了一个定时任务,如下所示:
public ConnectionBasedClientManager() {
// 启动了一个定时任务,无延迟,每隔5s执行一次
// 具体就是执行ExpiredClientCleaner.run()方法
GlobalExecutor
.scheduleExpiredClientCleaner(new ExpiredClientCleaner(this), 0, Constants.DEFAULT_HEART_BEAT_INTERVAL,
TimeUnit.MILLISECONDS);
}
这个定时任务,每隔5s就会执行一次,具体就是执行ExpiredClientCleaner.run()方法:
private static class ExpiredClientCleaner implements Runnable {
private final ConnectionBasedClientManager clientManager;
public ExpiredClientCleaner(ConnectionBasedClientManager clientManager) {
this.clientManager = clientManager;
}
@Override
public void run() {
long currentTime = System.currentTimeMillis();
for (String each : clientManager.allClientId()) {
// 判断客户端是否超时
ConnectionBasedClient client = (ConnectionBasedClient) clientManager.getClient(each);
if (null != client && client.isExpire(currentTime)) {
// 超时连接处理
clientManager.clientDisconnected(each);
}
}
}
}
上面这个clientManager.clientDisconnected(each)超时连接处理,我们在前面已经分析过了,这里不再分析,关键的逻辑就是发布了两个事件:客户端释放连接事件、客户端断开连接事件。