目录
- 介绍
- 架构分析
- 添加实例-同步信息给其他集群服务
- 添加实例-提交同步任务
- 添加实例-执行同步任务
- 实例健康状态监控
介绍
Nacos 启动默认会使用集群模式,也就是没有带有-m standalone
的时候就是用的简单集群模式
另外我们再分析单机模式注册实例的时候最后一部分是把本次注册同步给集群其他服务
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX),
DataOperation.CHANGE,globalConfig.getTaskDispatchPeriod() / 2);
本文章就来分析下简单集群模式的各种设计和源码
架构分析
简单集群模式下,首先还是实例注册
在集群模式下实例可以注册到任意一台Nacos服务上
Nacos服务自己把本次更新同步给同集群的其他服务
添加实例-同步信息给其他集群服务
sync(DistroKey distroKey, DataOperation action, long delay):
//获取除自身外其他的服务Member
for (Member each : memberManager.allMembersWithoutSelf()) {
//每个 Member在启动的时候都会启动一些列的接口 作为集群 Member 之间通信用
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), each.getAddress());
//组装一个任务 任务内容是 送到哪里 啥作用 延时时间
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
//添加任务 等待执行
//其实就是把任务加到一个 ConcurrentHashMap<Object, AbstractDelayTask> tasks 中
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
}
添加实例-提交同步任务
有 addTask 肯定就有获取任务并执行的地方
distroTaskEngineHolder.getDelayTaskExecuteEngine():
//创建一个定时任务 默认延时 100 ms
processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
ProcessRunnable#run
processTasks():
//keys = keys.addAll(tasks.keySet());
Collection<Object> keys = getAllTaskKeys();
for (Object taskKey : keys) {
//获取一个任务 DistroDelayTask
AbstractDelayTask task = removeTask(taskKey);
//获取处理方式
//这个处理方式 在 DistroHttpRegistry Bean 注册的时候会在@PostConstruct 方法中设置
//taskEngineHolder.registerNacosTaskProcessor(KeyBuilder.INSTANCE_LIST_KEY_PREFIX,
// new DistroHttpDelayTaskProcessor(globalConfig, taskEngineHolder));
//但是这里设置的key是String类型的 之前设置任务的时候添加的是DistroKey 所以没有匹配的
//就用的默认的NacosTaskProcessor : new DistroDelayTaskProcessor(this, distroComponentHolder);
NacosTaskProcessor processor = getProcessor(taskKey);
//这里会根据返回值判断是否不需要重试 重试就是把任务重新添加回去
processor.process(task);
}
DistroDelayTaskProcessor#process(NacosTask task):
if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {
//这里就是把 key 和 DistroSyncChangeTask 建立关系了
DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
//DistroExecuteTaskExecuteEngine.addTask(distroKey, executeTask);
//distroKey : DistroKey
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, executeTask);
}
DistroExecuteTaskExecuteEngine#addTask(Object tag, AbstractExecuteTask task):
//这里为空的理由和上面的一样
NacosTaskProcessor processor = getProcessor(tag);
if (null != processor) {
processor.process(task);
return;
}
TaskExecuteWorker worker = getWorker(tag);
//这里做的也就是把 task 在放在一个 Queue中 TaskExecuteWorker@BlockingQueue<Runnable> queue
worker.process(task);
添加实例-执行同步任务
正如前边说的:有存放就有获取
在 TaskExecuteWorker 构造函数中就有 new InnerWorker(name).start();
Runnable task = queue.take();
task.run();
DistroSyncChangeTask#run
//type = KeyBuilder.INSTANCE_LIST_KEY_PREFIX
String type = getDistroKey().getResourceType();
//new DistroHttpAgent(memberManager) 中获取了 Datum (也就是 Instances)
DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
distroData.setType(DataOperation.CHANGE);
//syncData 就是调用 http 发送信息了 /ns/distro/datum 这个不是一个openApi
boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
实例健康状态监控
我们知道实例注册后是需要监控健康状态的
但是现在是集群了,集群中只需要有一个服务监控就可以了