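Table of Contents
- InstanceController.register()
- 1. Reading the specified parameters from the request
- 2. Assembling an instance from the request parameters
- 3. Writing the instance to the registry
- 3.1 Creating an empty service for an ephemeral instance
- 3.1.1 Recalculating the checksum
- 3.1.2 Writing the service to the registry
- 3.1.2.1 Putting the service into the registry
- 3.1.2.2 Initializing the service's internal health check tasks
- 3.1.2.2.1 The scheduled task that removes expired instances
- 3.1.2.2.2 The cluster health check task
- 3.1.3 Synchronizing the service to the other Nacos servers
- 3.2 Writing the instance to the service
- 4. Summary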
InstanceController.register()
InstanceController#register(): handles the registration request sent by a Nacos client.
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
// Read the namespace ID from the request, falling back to the default namespace
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
// Read the mandatory service name from the request
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
// Check that the service name is well-formed
checkServiceNameFormat(serviceName);
// Assemble an instance from the request parameters
final Instance instance = parseInstance(request);
// Write the instance to the registry
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
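- Read the namespaceId and serviceName parameters from the request, then check that serviceName is well-formed
- Assemble an instance from the request parameters
- Finally, write the instance to the registry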
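To make the input of register() concrete, the sketch below builds the kind of URL a client might POST to. The path /nacos/v1/ns/instance, the address 127.0.0.1:8848, and the parameter values are assumptions for illustration (they follow the usual Nacos 1.x defaults), and RegisterRequestSketch is a hypothetical helper, not part of Nacos.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

final class RegisterRequestSketch {

    // Builds a registration URL from a server address and request parameters.
    // The path below is an assumption based on the Nacos 1.x naming API.
    static String buildRegisterUrl(String serverAddr, Map<String, String> params) {
        String query = params.entrySet().stream()
                .map(e -> URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8) + "="
                        + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
        return "http://" + serverAddr + "/nacos/v1/ns/instance?" + query;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the parameters in insertion order.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("serviceName", "DEFAULT_GROUP@@demo-service"); // groupName@@serviceName
        params.put("ip", "10.0.0.5");
        params.put("port", "8080");
        params.put("ephemeral", "true");
        System.out.println(buildRegisterUrl("127.0.0.1:8848", params));
    }
}
```

Only serviceName, ip, and port are mandatory in the code discussed in this article; every other parameter falls back to a default on the server side.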
1. Reading the specified parameters from the request
Read the namespaceId and serviceName parameters from the request.
WebUtils#optional()
public static String optional(final HttpServletRequest req, final String key, final String defaultValue) {
// If the parameter map does not contain the key, or its value is null, return the given default
if (!req.getParameterMap().containsKey(key) || req.getParameterMap().get(key)[0] == null) {
return defaultValue;
}
// Read the parameter value; if it is blank (whitespace only), return the given default
String value = req.getParameter(key);
if (StringUtils.isBlank(value)) {
return defaultValue;
}
String encoding = req.getParameter("encoding");
// Decode the value using the requested character encoding
return resolveValue(value, encoding);
}
private static String resolveValue(String value, String encoding) {
if (StringUtils.isEmpty(encoding)) {
encoding = StandardCharsets.UTF_8.name();
}
try {
// Decode
value = HttpUtils.decode(new String(value.getBytes(StandardCharsets.UTF_8), encoding), encoding);
} catch (UnsupportedEncodingException ignore) {
}
return value.trim();
}
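- If the parameter map does not contain the key, or its value is null, return the given default (public for namespaceId)
- Read the parameter value; if it is blank, return the given default (public for namespaceId)
- Decode the value using the requested character encoding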
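The fallback rules above can be sketched without a servlet container. This is a minimal illustration, assuming the request parameters are available as a plain Map<String, String[]> (the shape that getParameterMap() returns); OptionalParamSketch is a hypothetical name and the decoding step is left out.

```java
import java.util.HashMap;
import java.util.Map;

final class OptionalParamSketch {

    // Returns the first value for key, or defaultValue when the key is
    // missing, null, or contains only whitespace.
    static String optional(Map<String, String[]> params, String key, String defaultValue) {
        String[] values = params.get(key);
        if (values == null || values.length == 0 || values[0] == null) {
            return defaultValue;
        }
        String value = values[0];
        if (value.trim().isEmpty()) {
            return defaultValue;
        }
        return value.trim();
    }

    public static void main(String[] args) {
        Map<String, String[]> params = new HashMap<>();
        params.put("namespaceId", new String[] {"   "});
        params.put("app", new String[] {" orders "});
        System.out.println(optional(params, "namespaceId", "public"));
        System.out.println(optional(params, "app", "DEFAULT"));
        System.out.println(optional(params, "cluster", "DEFAULT"));
    }
}
```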
WebUtils#required()
public static String required(final HttpServletRequest req, final String key) {
// If the request does not contain the parameter (or it is empty), throw an exception
String value = req.getParameter(key);
if (StringUtils.isEmpty(value)) {
throw new IllegalArgumentException("Param '" + key + "' is required.");
}
String encoding = req.getParameter("encoding");
// Decode
return resolveValue(value, encoding);
}
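If the request does not contain the specified parameter, or its value is empty, an exception is thrown.
checkServiceNameFormat(): checks that serviceName is well-formed; it must consist of two strings joined by @@.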
private void checkServiceNameFormat(String combineServiceName) {
// The name must be two strings joined by @@
String[] split = combineServiceName.split(Constants.SERVICE_INFO_SPLITER);
if (split.length <= 1) {
throw new IllegalArgumentException(
"Param 'serviceName' is illegal, it should be format as 'groupName@@serviceName");
}
}
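A small standalone sketch of the same check shows which names pass and which fail. The literal "@@" is assumed here to be the value of Constants.SERVICE_INFO_SPLITER, and ServiceNameFormatSketch is a hypothetical class name.

```java
final class ServiceNameFormatSketch {

    // Assumed value of Constants.SERVICE_INFO_SPLITER.
    static final String SPLITTER = "@@";

    // Splits a combined name into its parts, rejecting names without a separator.
    static String[] parse(String combinedServiceName) {
        String[] parts = combinedServiceName.split(SPLITTER);
        if (parts.length <= 1) {
            throw new IllegalArgumentException(
                    "serviceName should be formatted as 'groupName@@serviceName'");
        }
        return parts;
    }

    public static void main(String[] args) {
        String[] parts = parse("DEFAULT_GROUP@@demo-service");
        System.out.println(parts[0] + " / " + parts[1]);
        try {
            parse("demo-service");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because String.split drops trailing empty strings, a name such as "group@@" is rejected, while "@@service" yields an empty group name and passes this particular check.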
2. Assembling an instance from the request parameters
parseInstance()
private Instance parseInstance(HttpServletRequest request) throws Exception {
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
String app = WebUtils.optional(request, "app", "DEFAULT");
// Assemble an instance from the data in the request
Instance instance = getIpAddress(request);
// Initialize the instance
instance.setApp(app);
instance.setServiceName(serviceName);
// Generate simple instance id first. This value would be updated according to
// INSTANCE_ID_GENERATOR.
instance.setInstanceId(instance.generateInstanceId());
instance.setLastBeat(System.currentTimeMillis());
String metadata = WebUtils.optional(request, "metadata", StringUtils.EMPTY);
if (StringUtils.isNotEmpty(metadata)) {
instance.setMetadata(UtilsAndCommons.parseMetadata(metadata));
}
// Validate the instance
instance.validate();
return instance;
}
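- Assemble an instance from the data in the request with getIpAddress(request)
- Initialize the instance by setting its properties: app, serviceName, instanceId, the last heartbeat time, and the metadata
- Validate the instance
Building an instance from the request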
private Instance getIpAddress(HttpServletRequest request) {
// Read the individual parameters from the request
final String ip = WebUtils.required(request, "ip");
final String port = WebUtils.required(request, "port");
String cluster = WebUtils.optional(request, CommonParams.CLUSTER_NAME, StringUtils.EMPTY);
if (StringUtils.isBlank(cluster)) {
cluster = WebUtils.optional(request, "cluster", UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}
String enabledString = WebUtils.optional(request, "enabled", StringUtils.EMPTY);
boolean enabled;
if (StringUtils.isBlank(enabledString)) {
enabled = BooleanUtils.toBoolean(WebUtils.optional(request, "enable", "true"));
} else {
enabled = BooleanUtils.toBoolean(enabledString);
}
boolean ephemeral = BooleanUtils.toBoolean(
WebUtils.optional(request, "ephemeral", String.valueOf(switchDomain.isDefaultInstanceEphemeral())));
String weight = WebUtils.optional(request, "weight", "1");
boolean healthy = BooleanUtils.toBoolean(WebUtils.optional(request, "healthy", "true"));
// Assemble an instance from the values read above
Instance instance = new Instance();
instance.setPort(Integer.parseInt(port));
instance.setIp(ip);
instance.setWeight(Double.parseDouble(weight));
instance.setClusterName(cluster);
instance.setHealthy(healthy);
instance.setEnabled(enabled);
instance.setEphemeral(ephemeral);
return instance;
}
The parameters are read from the request and then set on the instance.
instance.validate(): validates the instance
3. Writing the instance to the registry
ServiceManager#registerInstance()
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
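// Create an empty service if it does not exist yet.
// Note: the third argument is instance.isEphemeral(), which is true for an ephemeral instance
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
// Fetch the service from the registry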
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
// Write the instance to the service, which is what writes it to the registry
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
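- Create an empty service (the flag passed is true for an ephemeral instance)
- Fetch the service from the registry
- Write the instance to the service, which is what writes it to the registry
3.1 Creating an empty service for an ephemeral instance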
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException {
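// Fetch the service from the registry
Service service = getService(namespaceId, serviceName);
// If the instance being registered is the first one for its service, the registry
// does not contain that service yet, so a Service object is created here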
if (service == null) {
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
// Recalculate the checksum
service.recalculateChecksum();
if (cluster != null) {
// Link the cluster to the service
// (cluster to service is n:1)
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
service.validate();
// Write the service to the registry
putServiceAndInit(service);
// For persistent instances (local == false), synchronize the service to the other servers
if (!local) {
addOrReplaceService(service);
}
}
}
// Fetch a Service from the server-side registry
public Service getService(String namespaceId, String serviceName) {
if (serviceMap.get(namespaceId) == null) {
return null;
}
return chooseServiceMap(namespaceId).get(serviceName);
}
public Map<String, Service> chooseServiceMap(String namespaceId) {
return serviceMap.get(namespaceId);
}
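- Service#recalculateChecksum(): recalculates the checksum
- ServiceManager#putServiceAndInit(): writes the service to the registry
- ServiceManager#addOrReplaceService(): synchronizes the service to the other Nacos servers
3.1.1 Recalculating the checksum
Service#recalculateChecksum(): recalculates the checksum.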
public synchronized void recalculateChecksum() {
// Get the list of all instances the current service contains
List<Instance> ips = allIPs();
StringBuilder ipsString = new StringBuilder();
// Append the service's own data to ipsString
ipsString.append(getServiceString());
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("service to json: " + getServiceString());
}
if (CollectionUtils.isNotEmpty(ips)) {
Collections.sort(ips);
}
// Iterate over all instances and append their data
for (Instance ip : ips) {
String string = ip.getIp() + ":" + ip.getPort() + "_" + ip.getWeight() + "_" + ip.isHealthy() + "_" + ip
.getClusterName();
ipsString.append(string);
ipsString.append(",");
}
// The string now holds all of the service's data; its MD5 digest is assigned to checksum
checksum = MD5Utils.md5Hex(ipsString.toString(), Constants.ENCODE);
}
// Service.java
public List<Instance> allIPs() {
List<Instance> result = new ArrayList<>();
// Iterate over all clusters the current service contains
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
// Add every instance in the cluster being visited to result
result.addAll(entry.getValue().allIPs());
}
return result;
}
// Cluster.java
public List<Instance> allIPs() {
List<Instance> allInstances = new ArrayList<>();
// Add the persistent instances
allInstances.addAll(persistentInstances);
// Add the ephemeral instances
allInstances.addAll(ephemeralInstances);
return allInstances;
}
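- Service.allIPs() -> Cluster.allIPs(): get the list of all instances the current service contains
- Iterate over all clusters the current service contains
- Add every instance in the cluster being visited to result, then return result
- Iterate over all instances and append their data; the resulting string holds all of the service's data, and its MD5 digest is assigned to checksum
checksum is a field of Service. It is the MD5 digest of the string concatenation of the service's service, cluster, and instance (SCI) information.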
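The idea behind the checksum can be sketched with the JDK alone: sort the instance strings so the result does not depend on iteration order, concatenate them after the service data, and hash the result with MD5. ChecksumSketch and its string-based inputs are simplifications assumed for this example; the real method works on Instance objects and uses MD5Utils.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class ChecksumSketch {

    // Concatenates the service data with the sorted instance strings and hashes the result.
    static String checksum(String serviceString, List<String> instanceStrings) {
        List<String> sorted = new ArrayList<>(instanceStrings);
        Collections.sort(sorted); // makes the checksum independent of input order
        StringBuilder data = new StringBuilder(serviceString);
        for (String instance : sorted) {
            data.append(instance).append(",");
        }
        return md5Hex(data.toString());
    }

    // Returns the MD5 digest of text as 32 lowercase hex characters.
    static String md5Hex(String text) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(text.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    public static void main(String[] args) {
        List<String> instances = new ArrayList<>();
        instances.add("10.0.0.2:8080_1.0_true_DEFAULT");
        instances.add("10.0.0.1:8080_1.0_true_DEFAULT");
        System.out.println(checksum("demo-service", instances));
    }
}
```

Two servers holding the same service data produce the same checksum, so comparing checksums is a cheap way to detect that their copies differ.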
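3.1.2 Writing the service to the registry
ServiceManager#putServiceAndInit(): writes the service to the registry
- Put the service into the registry
- Initialize the service's internal health check tasks
- Register listeners for the persistent and ephemeral instance lists of this service in the Nacos cluster
3.1.2.1 Putting the service into the registry
ServiceManager#putService(): puts the service into the registry map serviceMap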
public void putService(Service service) {
// Double-checked locking (DCL)
if (!serviceMap.containsKey(service.getNamespaceId())) {
synchronized (putServiceLock) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
serviceMap.put(service.getNamespaceId(), new ConcurrentHashMap<>(16));
}
}
}
// Put the service into the registry map
serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}
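The registry is a two-level map: namespace ID to service name to service. As an illustration only, the sketch below gets the same "create the inner map once" effect with ConcurrentHashMap.computeIfAbsent instead of the double-checked lock used above. RegistrySketch is hypothetical, and services are represented as plain strings to keep it self-contained.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class RegistrySketch {

    // namespaceId -> (serviceName -> service); the service is a String in this sketch.
    private final Map<String, Map<String, String>> serviceMap = new ConcurrentHashMap<>();

    // Creates the per-namespace map atomically on first use, then stores the service.
    void putService(String namespaceId, String serviceName, String service) {
        serviceMap.computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<>(16))
                .put(serviceName, service);
    }

    // Returns null when either the namespace or the service is unknown.
    String getService(String namespaceId, String serviceName) {
        Map<String, String> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(serviceName);
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        registry.putService("public", "DEFAULT_GROUP@@demo", "demo-service-object");
        System.out.println(registry.getService("public", "DEFAULT_GROUP@@demo"));
        System.out.println(registry.getService("dev", "DEFAULT_GROUP@@demo"));
    }
}
```

Both approaches guarantee that concurrent registrations in a new namespace end up sharing one inner map; computeIfAbsent simply moves the locking into the map implementation.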
3.1.2.2 Initializing the service's internal health check tasks
Service#init(): initializes the service's internal health check tasks
public void init() {
// Start the scheduled task that removes expired instances
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
// Start the health check task of every cluster the service contains
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
// Start the health check task of the current cluster:
// it periodically adds the heartbeat-check tasks of the cluster's
// persistent instances to a task queue, taskQueue
entry.getValue().init();
}
}
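init() first starts the scheduled task that removes expired instances, then starts the health check task of every cluster the service contains.
3.1.2.2.1 The scheduled task that removes expired instances
Service#init() schedules clientBeatCheckTask.
ClientBeatCheckTask#run(): the body of the scheduled task that removes expired instances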
@Override
public void run() {
try {
// If this server is not responsible for the service, return immediately
if (!getDistroMapper().responsible(service.getName())) {
return;
}
// If health checking is disabled, return immediately
if (!getSwitchDomain().isHealthCheckEnabled()) {
return;
}
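// Get all ephemeral instances of the service
List<Instance> instances = service.allIPs(true);
// first set health status of instances:
// Iterate over all ephemeral instances of the service
for (Instance instance : instances) {
// If the time since the last heartbeat exceeds the heartbeat timeout (15s by default), mark the instance unhealthy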
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
// If the marked flag is not true, the instance may be an ephemeral one;
// if marked is true, the instance must be a persistent one
if (!instance.isMarked()) {
// Set the healthy flag to false
if (instance.isHealthy()) {
instance.setHealthy(false);
Loggers.EVT_LOG
.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
instance.getIp(), instance.getPort(), instance.getClusterName(),
service.getName(), UtilsAndCommons.LOCALHOST_SITE,
instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
// Publish the status change event
getPushService().serviceChanged(service);
ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
}
}
}
}
if (!getGlobalConfig().isExpireInstance()) {
return;
}
// then remove obsolete instances:
// Iterate over all ephemeral instances
for (Instance instance : instances) {
// If the instance is marked, treat it as an expired persistent instance and skip it
if (instance.isMarked()) {
continue;
}
// If the time since the last heartbeat exceeds the delete timeout (30s by default), remove the instance
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
// delete instance
Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
JacksonUtils.toJson(instance));
// Remove it
deleteIp(instance);
}
}
} catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}
}
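- If this server is not responsible for the service, return immediately
- If health checking is disabled, return immediately
- Get all ephemeral instances of the service and iterate over them; if an instance's last heartbeat is older than the heartbeat timeout (15s by default) and the instance is not marked, set its healthy flag to false
- Iterate over the ephemeral instances again, skipping marked ones; if an instance's last heartbeat is older than the delete timeout (30s by default), remove it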
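The two thresholds can be summarized as a small classification function. This is a simplified sketch: BeatTimeoutSketch is hypothetical, the 15s and 30s constants are the defaults mentioned above rather than values read from an instance, and the marked flag and the isExpireInstance() switch are ignored.

```java
final class BeatTimeoutSketch {

    enum State { HEALTHY, UNHEALTHY, EXPIRED }

    // Defaults assumed from the comments in the original code.
    static final long HEARTBEAT_TIMEOUT_MS = 15_000L;
    static final long IP_DELETE_TIMEOUT_MS = 30_000L;

    // Classifies an instance by how long it has been silent.
    static State classify(long nowMs, long lastBeatMs) {
        long silence = nowMs - lastBeatMs;
        if (silence > IP_DELETE_TIMEOUT_MS) {
            return State.EXPIRED;   // would be removed from the registry
        }
        if (silence > HEARTBEAT_TIMEOUT_MS) {
            return State.UNHEALTHY; // kept, but flagged as not healthy
        }
        return State.HEALTHY;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(classify(now, now - 5_000L));
        System.out.println(classify(now, now - 20_000L));
        System.out.println(classify(now, now - 40_000L));
    }
}
```

Both comparisons are strict, so an instance that has been silent for exactly 15s is still considered healthy.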
deleteIp()
deleteIp(): removes the current instance.
private void deleteIp(Instance instance) {
try {
// Build and initialize a request
NamingProxy.Request request = NamingProxy.Request.newRequest();
request.appendParam("ip", instance.getIp()).appendParam("port", String.valueOf(instance.getPort()))
.appendParam("ephemeral", "true").appendParam("clusterName", instance.getClusterName())
.appendParam("serviceName", service.getName()).appendParam("namespaceId", service.getNamespaceId());
// Build a URL that targets this server itself
String url = "http://127.0.0.1:" + ApplicationUtils.getPort() + ApplicationUtils.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl();
// delete instance asynchronously:
// Submit the request with Nacos's own HttpClient,
// which wraps Apache's asynchronous HTTP client
HttpClient.asyncHttpDelete(url, null, null, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Loggers.SRV_LOG
.error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
instance.toJson(), result.getMessage(), result.getCode());
}
}
@Override
public void onError(Throwable throwable) {
Loggers.SRV_LOG
.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(),
throwable);
}
@Override
public void onCancel() {
}
});
} catch (Exception e) {
Loggers.SRV_LOG
.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e);
}
}
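Using the request and URL built above, Nacos's own HttpClient submits an asynchronous DELETE request to the local server (127.0.0.1).
3.1.2.2.2 The cluster health check task
Cluster#init(): initializes the cluster's internal health check task.
The health check task is created first and then scheduled.
HealthCheckTask#run(): performs the health check.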
@Override
public void run() {
try {
if (distroMapper.responsible(cluster.getService().getName()) && switchDomain
.isHealthCheckEnabled(cluster.getService().getName())) {
// Process this task
healthCheckProcessor.process(this);
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG
.debug("[HEALTH-CHECK] schedule health check task: {}", cluster.getService().getName());
}
}
} catch (Throwable e) {
Loggers.SRV_LOG
.error("[HEALTH-CHECK] error while process health check for {}:{}", cluster.getService().getName(),
cluster.getName(), e);
} finally {
if (!cancelled) {
// Schedule the next run
HealthCheckReactor.scheduleCheck(this);
// worst == 0 means never checked
if (this.getCheckRtWorst() > 0 && switchDomain.isHealthCheckEnabled(cluster.getService().getName())
&& distroMapper.responsible(cluster.getService().getName())) {
// TLog doesn't support float so we must convert it into long
long diff =
((this.getCheckRtLast() - this.getCheckRtLastLast()) * 10000) / this.getCheckRtLastLast();
this.setCheckRtLastLast(this.getCheckRtLast());
Cluster cluster = this.getCluster();
if (Loggers.CHECK_RT.isDebugEnabled()) {
Loggers.CHECK_RT.debug("{}:{}@{}->normalized: {}, worst: {}, best: {}, last: {}, diff: {}",
cluster.getService().getName(), cluster.getName(), cluster.getHealthChecker().getType(),
this.getCheckRtNormalized(), this.getCheckRtWorst(), this.getCheckRtBest(),
this.getCheckRtLast(), diff);
}
}
}
}
}
- The try block processes the task
- The finally block schedules the next run
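The try/finally structure is what keeps the check running: even when one round throws, the task still re-submits itself unless it has been cancelled. The sketch below isolates that pattern. SelfReschedulingSketch, Scheduler, and CheckTask are hypothetical names, and the scheduler is reduced to a one-method interface so the behaviour can be observed without real timers.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class SelfReschedulingSketch {

    // Stand-in for a real scheduler such as a ScheduledExecutorService.
    interface Scheduler {
        void schedule(Runnable task);
    }

    static final class CheckTask implements Runnable {
        private final Scheduler scheduler;
        private final Runnable work;
        volatile boolean cancelled;
        int runs;

        CheckTask(Scheduler scheduler, Runnable work) {
            this.scheduler = scheduler;
            this.work = work;
        }

        @Override
        public void run() {
            try {
                runs++;
                work.run();
            } catch (Throwable t) {
                // A failed round must not stop future rounds; a real task would log here.
            } finally {
                if (!cancelled) {
                    scheduler.schedule(this); // queue the next round
                }
            }
        }
    }

    public static void main(String[] args) {
        Deque<Runnable> queue = new ArrayDeque<>();
        CheckTask task = new CheckTask(queue::add, () -> System.out.println("checking"));
        task.run();            // first round re-queues the task
        task.cancelled = true;
        queue.poll().run();    // second round runs but does not re-queue
        System.out.println("runs=" + task.runs + ", queued=" + queue.size());
    }
}
```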
public void process(HealthCheckTask task) {
// Get the persistent instances of the current cluster
List<Instance> ips = task.getCluster().allIPs(false);
if (CollectionUtils.isEmpty(ips)) {
return;
}
// Iterate over all persistent instances
for (Instance ip : ips) {
// Skip the instance if it is marked
if (ip.isMarked()) {
if (SRV_LOG.isDebugEnabled()) {
SRV_LOG.debug("tcp check, ip is marked as to skip health check, ip:" + ip.getIp());
}
continue;
}
if (!ip.markChecking()) {
SRV_LOG.warn("tcp check started before last one finished, service: " + task.getCluster().getService()
.getName() + ":" + task.getCluster().getName() + ":" + ip.getIp() + ":" + ip.getPort());
healthCheckCommon
.reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task, switchDomain.getTcpHealthParams());
continue;
}
// Create a heartbeat (Beat) object
Beat beat = new Beat(ip, task);
// Add the Beat to the queue
taskQueue.add(beat);
MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet();
}
}
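- Get the persistent instances of the current cluster
- Iterate over all persistent instances, skipping any that are marked
- Create a Beat for each instance and add it to the task queue taskQueue; in other words, the heartbeat tasks of the cluster's persistent instances are added to taskQueue
3.1.3 Synchronizing the service to the other Nacos servers
ServiceManager#addOrReplaceService(): synchronizes the service to the other Nacos servers
// Consistency service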
@Resource(name = "consistencyDelegate")
private ConsistencyService consistencyService;
public void addOrReplaceService(Service service) throws NacosException {
// Synchronize this service to the other Nacos servers
consistencyService.put(KeyBuilder.buildServiceMetaKey(service.getNamespaceId(), service.getName()), service);
}
3.2 Writing the instance to the service
ServiceManager#updateIpAddresses(): updates the instance list of the current service by adding or removing instances
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
throws NacosException {
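// Fetch this service's instance-list datum from the consistency service
// (ephemeral or persistent data, depending on the ephemeral flag)
Datum datum = consistencyService
.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
// Get the service's instances of the same kind from the local registry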
List<Instance> currentIPs = service.allIPs(ephemeral);
Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
Set<String> currentInstanceIds = Sets.newHashSet();
// Iterate over the instances taken from the registry
for (Instance instance : currentIPs) {
// Put the instance into the map: key is ip:port, value is the instance
currentInstances.put(instance.toIpAddr(), instance);
// Add the instance ID to a set
currentInstanceIds.add(instance.getInstanceId());
}
Map<String, Instance> instanceMap;
if (datum != null) {
// Overwrite the incoming instance data with the local registry's data for the same host
instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
} else {
instanceMap = new HashMap<>(ips.length);
}
for (Instance instance : ips) {
// If the service has no cluster matching the instance being registered, create one
if (!service.getClusterMap().containsKey(instance.getClusterName())) {
Cluster cluster = new Cluster(instance.getClusterName(), service);
// Initialize the cluster's health check task
cluster.init();
service.getClusterMap().put(instance.getClusterName(), cluster);
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
}
// For a remove action, delete the instance from instanceMap;
// otherwise it is an add action, so put the instance into instanceMap
if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
instanceMap.remove(instance.getDatumKey());
} else {
instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
instanceMap.put(instance.getDatumKey(), instance);
}
}
if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
throw new IllegalArgumentException(
"ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
.toJson(instanceMap.values()));
}
return new ArrayList<>(instanceMap.values());
}
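- Fetch this service's instance-list datum from the consistency service (ephemeral data when registering an ephemeral instance)
- Get the service's instances of the same kind from the local registry
- Iterate over the instances taken from the registry, putting each instance into a map and each instanceId into a set
- Overwrite the incoming instance data with the local registry's data for the same host
- If the service has no cluster matching the instance being registered, create one and initialize its health check task
- Return the updated instance list
setValid() is the method that overwrites the incoming instance data with the local registry's data for the same host.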
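The add/remove part of updateIpAddresses() comes down to merging a set of changes into a copy of the current instance map. The sketch below shows only that merge, assuming instances are keyed by a string such as ip:port and reduced to their weight. InstanceMergeSketch and the "add"/"remove" literals are placeholders for this example.

```java
import java.util.HashMap;
import java.util.Map;

final class InstanceMergeSketch {

    // Placeholder action names for this sketch.
    static final String ADD = "add";
    static final String REMOVE = "remove";

    // Returns a new map with the changes applied; the input map is not modified.
    static Map<String, Double> update(Map<String, Double> current, String action,
                                      Map<String, Double> changes) {
        Map<String, Double> merged = new HashMap<>(current);
        for (Map.Entry<String, Double> change : changes.entrySet()) {
            if (REMOVE.equals(action)) {
                merged.remove(change.getKey());
            } else {
                // Adding an existing key replaces the old entry.
                merged.put(change.getKey(), change.getValue());
            }
        }
        if (merged.isEmpty() && ADD.equals(action)) {
            throw new IllegalArgumentException("ip list can not be empty");
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Double> current = new HashMap<>();
        current.put("10.0.0.1:8080", 1.0);
        Map<String, Double> changes = new HashMap<>();
        changes.put("10.0.0.2:8080", 2.0);
        System.out.println(update(current, ADD, changes).size());
        System.out.println(update(current, REMOVE, current).isEmpty());
    }
}
```

Because the method returns the complete merged list rather than a delta, the caller can hand that list to the consistency service as the new value for the service's key.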
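4. Summary
- Create an empty Service
- Write the instance to the service, which is what writes it to the registry
Creating an empty Service: the service has to be put into the registry and its internal health check tasks initialized. That initialization starts the scheduled task that removes expired instances and the health check tasks of the service's clusters.
Writing the instance to the service: this is the step that actually writes the instance to the registry.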