文章目录
- 1.InstanceController#beat()
- 1.1 serviceManager.registerInstance()
- 1.2 serviceManager.getService()
- 1.3 处理本次心跳
1.InstanceController#beat()
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
// 创建一个JSON Node,该方法的返回值就是它,后面的代码就是对这个Node进行各种初始化
ObjectNode result = JacksonUtils.createEmptyJsonNode();
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
// 从请求中获取到beat,即client端的beatInfo
String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
RsInfo clientBeat = null;
// 将beat构建为clientBeat
if (StringUtils.isNotBlank(beat)) {
clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
}
String clusterName = WebUtils
.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
// 获取到客户端传递来的client的port,其将来用于UDP通信
int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
if (clientBeat != null) {
if (StringUtils.isNotBlank(clientBeat.getCluster())) {
clusterName = clientBeat.getCluster();
} else {
// fix #2533
clientBeat.setCluster(clusterName);
}
ip = clientBeat.getIp();
port = clientBeat.getPort();
}
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
checkServiceNameFormat(serviceName);
Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
// 从注册表中获取当前发送请求的client对应的instance,Ip port对应
Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
// 处理注册表中不存在该client的instance的情况
if (instance == null) {
// 若请求中没有携带心跳数据,则直接返回
if (clientBeat == null) {
result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
return result;
}
Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
+ "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
// 下面处理的情况是,注册表中没有该client的instance,但其发送的请求中具有心跳数据。
// 在client的注册请求还未到达时(网络抖动等原因),第一次心跳请求先到达了server,会出现这种情况
// 处理方式是,使用心跳数据构建出一个instance,注册到注册表
instance = new Instance();
instance.setPort(clientBeat.getPort());
instance.setIp(clientBeat.getIp());
instance.setWeight(clientBeat.getWeight());
instance.setMetadata(clientBeat.getMetadata());
instance.setClusterName(clusterName);
instance.setServiceName(serviceName);
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(clientBeat.isEphemeral());
// 注册
serviceManager.registerInstance(namespaceId, serviceName, instance);
}
// 从注册表中获取service
Service service = serviceManager.getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.SERVER_ERROR,
"service not found: " + serviceName + "@" + namespaceId);
}
// 从请求中获取到beat为null
if (clientBeat == null) {
clientBeat = new RsInfo();
clientBeat.setIp(ip);
clientBeat.setPort(port);
clientBeat.setCluster(clusterName);
}
// 处理本次心跳
service.processClientBeat(clientBeat);
result.put(CommonParams.CODE, NamingResponseCode.OK);
if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
}
result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
return result;
}
- 创建一个JSON Node (result), 该方法的返回值就是它,后面的代码就是对这个Node进行各种初始化
- 从请求中获取到beat,即client端的beatInfo, 将beat构建为clientBeat
- 获取到客户端传递来的client的port,其将来用于UDP通信
- 从注册表中获取当前发送请求的client对应的instance,Ip port对应
- 处理注册表中不存在该client的instance的情况, 若请求中没有携带心跳数据,则直接返回
- 注册表中没有该client的instance,但其发送的请求中具有心跳数据。在client的注册请求还未到达时(网络抖动等原因),第一次心跳请求先到达了server,会出现这种情况, 使用心跳数据构建出一个instance,注册到注册表
- 从注册表中获取service, 并调用service.processClientBeat方法处理本次心跳
1.1 serviceManager.registerInstance()
注册instance到service中。
之前的处理注册请求中分析过此代码。
- 创建一个空service
- 从注册表中获取到service
- instance写入到service,即写入到了注册表
1.2 serviceManager.getService()
从注册表中获取service。
public Service getService(String namespaceId, String serviceName) {
if (serviceMap.get(namespaceId) == null) {
return null;
}
return chooseServiceMap(namespaceId).get(serviceName);
}
public Map<String, Service> chooseServiceMap(String namespaceId) {
return serviceMap.get(namespaceId);
}
serviceMap为Server端的注册表。
1.3 处理本次心跳
- 创建一个处理器,其是一个任务
- 开启一个立即执行的任务,即执行clientBeatProcessor任务的run()
public void run() {
Service service = this.service;
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
}
String ip = rsInfo.getIp();
String clusterName = rsInfo.getCluster();
int port = rsInfo.getPort();
Cluster cluster = service.getClusterMap().get(clusterName);
// 获取当前服务的所有临时实例
List<Instance> instances = cluster.allIPs(true);
// 遍历所有这些临时实例,从中查找当前发送心跳的instance
for (Instance instance : instances) {
// 只要ip与port与当前心跳的instance的相同,就是了
if (instance.getIp().equals(ip) && instance.getPort() == port) {
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
}
// 修改最后心跳时间戳
instance.setLastBeat(System.currentTimeMillis());
// 修改该instance的健康状态
// 当instance被标记时,即其marked为true时,其是一个持久实例
if (!instance.isMarked()) {
// instance的healthy才是临时实例健康状态的表示
// 若当前instance健康状态为false,但本次是其发送的心跳,说明这个instance“起死回生”了,
// 我们需要将其health变为true
if (!instance.isHealthy()) {
instance.setHealthy(true);
Loggers.EVT_LOG
.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
cluster.getService().getName(), ip, port, cluster.getName(),
UtilsAndCommons.LOCALHOST_SITE);
// 发布服务变更事件(其对后续我们要分析的UDP通信非常重要)
getPushService().serviceChanged(service);
}
}
}
}
}
- 获取当前服务的所有临时实例, 只有临时实例才会有心跳
- 遍历所有这些临时实例,从中查找当前发送心跳的instance, 只要ip与port与当前心跳的instance的相同,就是了
- 修改最后心跳时间戳, 这个为主要目的
- 修改该instance的健康状态, 如果没有被标记且不健康的实例为临时实例, 则修改健康状态为健康的
- 发布服务变更事件, 对于后面的UDP通信很重要