Nacos Cluster State Synchronization: Source Code Analysis
ServerStatusReporter
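ServerStatusReporter is an inner class of ServerListManager.
ServerListManager is registered as a Spring bean through its @Component annotation,
and its init method is then run through @PostConstruct.
init() schedules a ServerStatusReporter task with a 2-second initial delay. The task's run method: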
private class ServerStatusReporter implements Runnable {
    @Override
    public void run() {
        try {
            if (EnvUtil.getPort() <= 0) {
                return;
            }
            int weight = Runtime.getRuntime().availableProcessors() / 2;
            if (weight <= 0) {
                weight = 1;
            }
            long curTime = System.currentTimeMillis();
            String status = LOCALHOST_SITE + "#" + EnvUtil.getLocalAddress() + "#" + curTime + "#" + weight
                    + "\r\n";
            // Get all members (server nodes) of the cluster
            List<Member> allServers = getServers();
            if (!contains(EnvUtil.getLocalAddress())) {
                Loggers.SRV_LOG.error("local ip is not in serverlist, ip: {}, serverlist: {}",
                        EnvUtil.getLocalAddress(), allServers);
                return;
            }
            if (allServers.size() > 0 && !EnvUtil.getLocalAddress()
                    .contains(IPUtil.localHostIP())) {
                for (Member server : allServers) {
                    // Skip the current node itself
                    if (Objects.equals(server.getAddress(), EnvUtil.getLocalAddress())) {
                        continue;
                    }
                    // This metadata information exists from 1.3.0 onwards "version"
                    if (server.getExtendVal(MemberMetaDataConstants.VERSION) != null) {
                        Loggers.SRV_LOG
                                .debug("[SERVER-STATUS] target {} has extend val {} = {}, use new api report status",
                                        server.getAddress(), MemberMetaDataConstants.VERSION,
                                        server.getExtendVal(MemberMetaDataConstants.VERSION));
                        continue;
                    }
                    Message msg = new Message();
                    msg.setData(status);
                    // Send this node's status to the other cluster node
                    synchronizer.send(server.getAddress(), msg);
                }
            }
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[SERVER-STATUS] Exception while sending server status", e);
        } finally {
            // Re-register the delayed task so the next status sync round runs
            GlobalExecutor
                    .registerServerStatusReporter(this, switchDomain.getServerStatusSynchronizationPeriodMillis());
        }
    }
}
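In short, the code above fetches all cluster members
and skips the current node, along with any node that carries "version" metadata (1.3.0 or later, which reports status through the newer API).
It then sends this node's status to each remaining node.
Finally, it re-registers the delayed task in the finally block so that status synchronization keeps running.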
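The reporting loop above can be sketched in plain Java. This is a minimal, hypothetical sketch, not Nacos code. `StatusReporterSketch`, `statusLine`, `weight` and `demo` are illustrative names, and a `ScheduledExecutorService` stands in for `GlobalExecutor`. The actual send step is reduced to a comment. The sketch reproduces the `site#address#timestamp#weight\r\n` status format and the re-scheduling in `finally`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the ServerStatusReporter pattern (not Nacos code).
class StatusReporterSketch implements Runnable {
    private final ScheduledExecutorService executor; // stand-in for GlobalExecutor
    private final String site;
    private final String localAddress;
    private final long periodMillis;
    private final int maxRounds; // demo-only stop condition; the real task runs forever
    private final AtomicInteger rounds = new AtomicInteger();
    volatile String lastStatus;

    StatusReporterSketch(ScheduledExecutorService executor, String site, String localAddress,
                         long periodMillis, int maxRounds) {
        this.executor = executor;
        this.site = site;
        this.localAddress = localAddress;
        this.periodMillis = periodMillis;
        this.maxRounds = maxRounds;
    }

    // Same rule as the source: half the CPU cores, but at least 1
    static int weight(int processors) {
        int w = processors / 2;
        return w <= 0 ? 1 : w;
    }

    // Status line format from the source: site#address#timestamp#weight\r\n
    static String statusLine(String site, String address, long timeMillis, int weight) {
        return site + "#" + address + "#" + timeMillis + "#" + weight + "\r\n";
    }

    @Override
    public void run() {
        try {
            lastStatus = statusLine(site, localAddress, System.currentTimeMillis(),
                    weight(Runtime.getRuntime().availableProcessors()));
            // The real task would send lastStatus to every eligible peer here
            rounds.incrementAndGet();
        } finally {
            // Re-schedule in finally so a failed round does not end the loop
            if (rounds.get() < maxRounds) {
                executor.schedule(this, periodMillis, TimeUnit.MILLISECONDS);
            } else {
                executor.shutdown();
            }
        }
    }

    // Runs the reporter until maxRounds rounds are done and returns the round count
    static int demo(int maxRounds, long periodMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        StatusReporterSketch reporter =
                new StatusReporterSketch(executor, "site", "127.0.0.1:8848", periodMillis, maxRounds);
        executor.schedule(reporter, 0, TimeUnit.MILLISECONDS);
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return reporter.rounds.get();
    }
}
```

Re-scheduling from `finally`, rather than using a fixed-rate schedule, has two effects. The next round is always queued even if sending throws, and rounds never overlap. The trade-off is that the effective period drifts by however long each round takes.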
Node synchronization interface
public class ServerStatusSynchronizer implements Synchronizer {
    @Override
    public void send(final String serverIP, Message msg) {
        if (StringUtils.isEmpty(serverIP)) {
            return;
        }
        final Map<String, String> params = new HashMap<String, String>(2);
        params.put("serverStatus", msg.getData());
        String url = "http://" + serverIP + ":" + EnvUtil.getPort() + EnvUtil.getContextPath()
                + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/operator/server/status";
        if (IPUtil.containsPort(serverIP)) {
            url = "http://" + serverIP + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT
                    + "/operator/server/status";
        }
        try {
            // Send the request asynchronously
            HttpClient.asyncHttpGet(url, null, params, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serverStatus, remote server: {}",
                                serverIP);
                    }
                }
                @Override
                public void onError(Throwable throwable) {
                    Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serverStatus, remote server: {}", serverIP, throwable);
                }
                @Override
                public void onCancel() {
                }
            });
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serverStatus, remote server: {}", serverIP, e);
        }
    }
    @Override
    public Message get(String server, String key) {
        return null;
    }
}
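The URL logic in send() comes down to one rule: append the local port unless the target address already carries one. Below is a hypothetical sketch of that logic. The `/nacos` context path and `/v1/ns` naming context are assumed default values; the real code reads them from `EnvUtil.getContextPath()` and `UtilsAndCommons.NACOS_NAMING_CONTEXT`. `containsPort` here is a simplified stand-in for `IPUtil.containsPort` that ignores IPv6. Encoding `serverStatus` as a query parameter is an assumption about how the GET parameters end up on the wire.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the status URL construction (not Nacos code).
class StatusUrlSketch {
    static final String CONTEXT_PATH = "/nacos";   // assumed default server context path
    static final String NAMING_CONTEXT = "/v1/ns"; // assumed value of NACOS_NAMING_CONTEXT

    // Simplified port check for IPv4 addresses and hostnames only (IPv6 not handled)
    static boolean containsPort(String address) {
        return address.lastIndexOf(':') > 0;
    }

    // Builds the target URL; falls back to the local port when the address has none
    static String statusUrl(String serverIp, int localPort, String serverStatus) {
        String host = containsPort(serverIp) ? serverIp : serverIp + ":" + localPort;
        return "http://" + host + CONTEXT_PATH + NAMING_CONTEXT + "/operator/server/status"
                + "?serverStatus=" + URLEncoder.encode(serverStatus, StandardCharsets.UTF_8);
    }
}
```

The status string contains `#` and `\r\n`, so it has to be percent-encoded. Otherwise the unencoded `#` would be read as the start of a URL fragment.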
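Synchronizing data additions and changes
When the Nacos server starts, it loads ServiceManager as a Spring bean and runs its init() method. init() creates a scheduled task that runs ServiceReporter every minute by default. ServiceReporter is the task that synchronizes service instance metadata between Nacos nodes.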
ServiceManager
The init method starts the scheduled task.
Now look at the run method:
private class ServiceReporter implements Runnable {
    @Override
    public void run() {
        try {
            // Get all service names, grouped by namespace
            Map<String, Set<String>> allServiceNames = getAllServiceNames();
            if (allServiceNames.size() <= 0) {
                //ignore
                return;
            }
            // Iterate over every namespace
            for (String namespaceId : allServiceNames.keySet()) {
                // The object to sync: checksums of the services in this namespace
                ServiceChecksum checksum = new ServiceChecksum(namespaceId);
                for (String serviceName : allServiceNames.get(namespaceId)) {
                    // Only the node responsible for this service (the one handling its heartbeats) adds it
                    if (!distroMapper.responsible(serviceName)) {
                        continue;
                    }
                    Service service = getService(namespaceId, serviceName);
                    if (service == null || service.isEmpty()) {
                        continue;
                    }
                    // Concatenate all instance info, compute its MD5 and store it as the service checksum
                    service.recalculateChecksum();
                    // Add it to the checksum object
                    checksum.addItem(serviceName, service.getChecksum());
                }
                Message msg = new Message();
                msg.setData(JacksonUtils.toJson(checksum));
                // Get all Nacos cluster members
                Collection<Member> sameSiteServers = memberManager.allMembers();
                if (sameSiteServers == null || sameSiteServers.size() <= 0) {
                    return;
                }
                // Send the message to every Nacos node except this one
                for (Member server : sameSiteServers) {
                    if (server.getAddress().equals(NetUtils.localServer())) {
                        continue;
                    }
                    synchronizer.send(server.getAddress(), msg);
                }
            }
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[DOMAIN-STATUS] Exception while sending service status", e);
        } finally {
            GlobalExecutor.scheduleServiceReporter(this, switchDomain.getServiceStatusSynchronizationPeriodMillis(),
                    TimeUnit.MILLISECONDS);
        }
    }
}
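In summary, for each namespaceId the task walks through the services in that namespace. For each serviceName, only the node responsible for that service's heartbeats processes it. That node recalculates the service's checksum (an MD5 over its instance information) and adds it to a ServiceChecksum object. The object is then wrapped in a Message and sent to every other Nacos node. This repeats until all namespaceIds have been processed.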
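The per-namespace checksum step can be sketched as follows. This is a hypothetical illustration, not the ServiceChecksum or Service#recalculateChecksum implementation. Each service's instances are represented as plain strings. They are sorted (an assumption of this sketch, so that ordering does not affect the result), joined and hashed with MD5. A `Predicate` stands in for `distroMapper.responsible`.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

// Hypothetical sketch of the checksum map ServiceReporter sends per namespace.
class ChecksumSketch {
    // MD5 of a string as 32 lowercase hex characters (leading zeros kept)
    static String md5Hex(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // Stand-in for recalculateChecksum: sort and join instance strings, then hash
    static String serviceChecksum(List<String> instances) {
        List<String> sorted = new ArrayList<>(instances);
        Collections.sort(sorted);
        return md5Hex(String.join(",", sorted));
    }

    // serviceName -> checksum, only for non-empty services this node is responsible for
    static Map<String, String> namespaceChecksums(Map<String, List<String>> services,
                                                  Predicate<String> responsible) {
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : services.entrySet()) {
            if (!responsible.test(e.getKey()) || e.getValue().isEmpty()) {
                continue;
            }
            result.put(e.getKey(), serviceChecksum(e.getValue()));
        }
        return result;
    }
}
```

Sending checksums rather than full instance lists keeps each periodic message small. A receiver can compare them with its own checksums to spot the services that differ.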
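1. How the cluster decides which node maintains a service's heartbeats: it hashes the serviceName and takes the result modulo the number of cluster nodes. The resulting node is responsible for all instances of that service.
2. Service data synchronization between nodes builds on the node chosen in step 1. Each node sends the other nodes the data (as checksums) of all the services whose heartbeats it maintains.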
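The hash-modulo assignment in point 1 can be sketched like this. It is a simplified, hypothetical version with illustrative method names. It assumes every node sees the same ordered node list; otherwise two nodes could disagree about who owns a service.

```java
import java.util.List;

// Hypothetical sketch of hash-modulo responsibility (not the DistroMapper source).
class DistroSketch {
    // Non-negative hash: x % MAX_VALUE is never Integer.MIN_VALUE, so Math.abs is safe
    static int distroHash(String serviceName) {
        return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
    }

    // The node responsible for serviceName, given the cluster's ordered node list
    static String responsibleNode(String serviceName, List<String> nodes) {
        return nodes.get(distroHash(serviceName) % nodes.size());
    }

    // What a node checks before handling a service
    static boolean responsible(String serviceName, List<String> nodes, String self) {
        return self.equals(responsibleNode(serviceName, nodes));
    }
}
```

Because ownership is `hash % nodeCount`, adding or removing a node changes the owner of most services. Heartbeat responsibility is therefore reshuffled across the cluster whenever membership changes.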
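Health check
Nacos starts health checking for a service when a client registers an instance with the server at startup.
Server-side entry point
Call chain of the registerInstance method: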
com.alibaba.nacos.naming.core.ServiceManager#registerInstance
com.alibaba.nacos.naming.core.ServiceManager#putServiceAndInit
com.alibaba.nacos.naming.core.Service#init
public void init() {
    // HealthCheckReactor.scheduleCheck(clientBeatCheckTask) schedules the clientBeatCheckTask.
    // Inside scheduleCheck, a scheduled task is started that runs ClientBeatCheckTask every 5 s.
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}
Now look at the scheduled task:
public static void scheduleCheck(ClientBeatCheckTask task) {
    futureMap.computeIfAbsent(task.taskKey(),
            k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}
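scheduleCheck uses `computeIfAbsent`, so each task key gets at most one scheduled task even if a service is initialized more than once. Below is a minimal sketch of that idea. It assumes a plain `ScheduledExecutorService` in place of `GlobalExecutor.scheduleNamingHealth` and uses a fixed-delay schedule. The source only shows the 5000 ms initial delay and period, so the fixed-delay choice is an assumption. `HealthSchedulerSketch` is a hypothetical name.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of one-scheduled-task-per-key (not HealthCheckReactor itself).
class HealthSchedulerSketch {
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();

    // Schedules the task every 5 s, but only if nothing is registered under this key yet
    ScheduledFuture<?> scheduleCheck(String taskKey, Runnable task) {
        return futureMap.computeIfAbsent(taskKey,
                k -> executor.scheduleWithFixedDelay(task, 5000, 5000, TimeUnit.MILLISECONDS));
    }

    int scheduledCount() {
        return futureMap.size();
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```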
Now look at the run method of ClientBeatCheckTask:
@Override
public void run() {
    try {
        if (!getDistroMapper().responsible(service.getName())) {
            return;
        }
        if (!getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }
        List<Instance> instances = service.allIPs(true);
        // first set health status of instances:
        for (Instance instance : instances) {
            // Last beat older than the heartbeat timeout (15 s by default): mark unhealthy
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        instance.setHealthy(false);
                        Loggers.EVT_LOG
                                .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                        instance.getIp(), instance.getPort(), instance.getClusterName(),
                                        service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                        instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                        // Notify clients of the change
                        getPushService().serviceChanged(service);
                        ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }
        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }
        // then remove obsolete instances:
        for (Instance instance : instances) {
            if (instance.isMarked()) {
                continue;
            }
            // Last beat older than the delete timeout (30 s by default): remove the instance
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // delete instance
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                        JacksonUtils.toJson(instance));
                deleteIp(instance);
            }
        }
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }
}
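ClientBeatCheckTask runs only on the node responsible for the service, and only when health checking is enabled. It does two things:
First, it iterates over all instances and checks whether the time since the last heartbeat exceeds the heartbeat timeout (15 s by default). If it does, the instance's healthy flag is set to false and clients are notified.
Then, if instance expiry is enabled, it iterates over all instances again and checks whether the time since the last heartbeat exceeds the delete timeout (30 s by default). If it does, the instance is removed from memory. Marked instances (isMarked()) are skipped in both passes.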
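The two passes can be condensed into a small, testable function. This is a hypothetical sketch. `Inst` is a stripped-down stand-in for Nacos's Instance. The timeouts are fixed at the 15 s and 30 s defaults described above instead of being read per instance. The client push and event publishing are reduced to a comment.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the two-pass beat check (not ClientBeatCheckTask itself).
class BeatCheckSketch {
    static final long HEARTBEAT_TIMEOUT_MS = 15_000; // default heartbeat timeout
    static final long DELETE_TIMEOUT_MS = 30_000;    // default delete timeout

    static final class Inst {
        final String id;
        final long lastBeat;
        final boolean marked; // stand-in for Instance#isMarked
        boolean healthy = true;

        Inst(String id, long lastBeat, boolean marked) {
            this.id = id;
            this.lastBeat = lastBeat;
            this.marked = marked;
        }
    }

    // Mutates the list in place and returns the IDs of removed instances
    static List<String> check(List<Inst> instances, long now, boolean expireInstance) {
        // Pass 1: mark timed-out, unmarked, healthy instances as unhealthy
        for (Inst i : instances) {
            if (now - i.lastBeat > HEARTBEAT_TIMEOUT_MS && !i.marked && i.healthy) {
                i.healthy = false; // the real task also pushes the change to clients here
            }
        }
        List<String> removed = new ArrayList<>();
        if (!expireInstance) {
            return removed;
        }
        // Pass 2: remove unmarked instances past the delete timeout
        Iterator<Inst> it = instances.iterator();
        while (it.hasNext()) {
            Inst i = it.next();
            if (!i.marked && now - i.lastBeat > DELETE_TIMEOUT_MS) {
                removed.add(i.id);
                it.remove();
            }
        }
        return removed;
    }
}
```

Passing `now` in explicitly, rather than calling `System.currentTimeMillis()` inside, makes both thresholds easy to test.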