客户端篇:
1、NacosAutoServiceRegistration类继承了AbstractAutoServiceRegistration类,AbstractAutoServiceRegistration类实现了ApplicationListener,实现了ApplicationListener接口的类都必须实现一个onApplicationEvent方法,然后spring容器启动时会调用处理事件方法,源码中的调用链:
1、Spring启动AbstractAutoServiceRegistration类中的onApplicationEvent方法
2、执行方法中的bind(event) 绑定事件方法
3、执行start()方法,该方法用于发布事件,publishEvent就是发布事件,源码内没有对InstancePreRegisteredEvent做监听,开发者可以在此进行扩展,核心代码:
if (!this.running.get()) {
this.context.publishEvent(
new InstancePreRegisteredEvent(this, getRegistration()));
register();
if (shouldRegisterManagement()) {
registerManagement();
}
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, getConfiguration()));
this.running.compareAndSet(false, true);
}
4、register()方法详解:方法会调用到NacosServiceRegistry类的register方法,该方法中会调用NamingService的registerInstance方法,该方法用于:使用当前服务的实例属性向nacos服务注册实例并添加一个延时执行的定时心跳任务BeatTask,用于给nacos发送当前服务的心还跳着的信息,这样nacos就不会将该服务下掉。
心跳任务创建核心代码:
executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);解释:
创建并执行在给定延迟后启用的单次动作
参数1:要执行的任务
参数2:从现在开始延迟执行的事件
参数3:延迟参数的事件单位
new BeatTask(beatInfo)解释:
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}
@Override
public void run() {
if (beatInfo.isStopped()) {
return;
}
long result = serverProxy.sendBeat(beatInfo);
long nextTime = result > 0 ? result : beatInfo.getPeriod();
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
BeatTask是个实现了Runnable的线程类,里面的run方法会在上述步骤参数2延迟时间到了之后进行执行,执行的主要内容就是:serverProxy.sendBeat(beatInfo);,该方法内容:
public long sendBeat(BeatInfo beatInfo) {
try {
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
}
Map<String, String> params = new HashMap<String, String>(4);
params.put("beat", JSON.toJSONString(beatInfo));
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
String result = reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/beat", params, HttpMethod.PUT);
JSONObject jsonObject = JSON.parseObject(result);
if (jsonObject != null) {
return jsonObject.getLong("clientBeatInterval");
}
} catch (Exception e) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: " + JSON.toJSONString(beatInfo), e);
}
return 0L;
}
核心代码就是:String result = reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/beat", params, HttpMethod.PUT);,向nacos服务发送了一个:/nacos/v1/ns/instance/beat的put请求,请求体内携带了当前客户端的心跳信息
客户端实例注册方法核心代码:serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
该方法调用了nacos服务的实例注册接口:/nacos/v1/ns/instance,请求体包含:客户端的实例信息,ip、端口号、服务名称等
nacos服务端篇:
/nacos/v1/ns/instance对应的接口就在nacos项目中的InstanceController类中,接下来我们对/nacos/v1/ns/instance方法来进行一个详细的分析,该方法就用于客户端的服务注册,将客户端的实例信息异步的放入一个双重map中进行服务注册,异步的原因是为了给客户端一个快速的响应:
1、接口过来后,程序会执行InstanceController类中的register方法,该方法中的核心方法:serviceManager.registerInstance(namespaceId, serviceName, instance);,接下来就一步步的对这个方法进行详细的解析
2、registerInstance方法内的createEmptyService方法首先会用于将当前客户端的实例信息放入到nacos的双重map内存注册表中去,将结构创好,里面对应的数据还没有填入,核心代码:
public void putService(Service service) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
synchronized (putServiceLock) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
}
}
}
serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}
然后createEmptyService方法链中,会执行一个init()方法,也是核心,代码如下:
public void init() {
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);解释:线程池执行器,创建一个延时任务,初始化5秒后执行,然后以后每过5秒钟执行一次:
futureMap.putIfAbsent(task.taskKey(), GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
↓
GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS):方法内代码如下:
public static ScheduledFuture<?> scheduleNamingHealth(Runnable command, long initialDelay, long delay,
TimeUnit unit) {
/**
* 线程池的执行器
* NAMING_HEALTH_EXECUTOR:延时线程池
* scheduleWithFixedDelay:执行延时线程任务
* 创建并执行一个周期性操作,该操作在给定的初始延迟之后首先启用,然后在一次执行终止和下一次执行开始之间的给定延迟之后启用。
*/
return NAMING_HEALTH_EXECUTOR.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
该方法会创建一个基于传来的:ClientBeatCheckTask task线程类,没过一段时间执行一次ClientBeatCheckTask类中的run方法,该线程类的run方法的主要作用如下:
1:心跳检测run方法
2:用服务名称hash后对机器数去模,然后选择集群里的一台机器执行任务
3:如果某个实例超过15秒没有收到心跳,则将它的healthy属性值设为false
4:如果某个实例超过30秒没有收到心跳,直接剔除该实例(被剔除的实例如果恢复发送心跳则会重新注册)
然后方法内还有个addOrReplaceService方法,该方法是基于持久化存储的时候才会调用,暂不分析
3、registerInstance方法内的第二个核心方法:addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);,该方法为添加客户端实例核心方法,接下来进行解析:
3.1:
// 通过namespaceId、serviceName、ephemeral生成一个key
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
3.2:
// 通过namespaceId和serviceName获取客户端实例
Service service = getService(namespaceId, serviceName);
3.3:核心同步方法:
synchronized (service) {
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// instances里有新注册的实例
/**
* consistencyService的put方法有6个实现类
* 看的方法:1:猜 (经验)
* 2:在该行代码打个断点,然后F7进入内部即可知道具体调用了那个实现类 (慢)
* 3:看consistencyService在此类是如何注入的,可以看到是注入了一个
* @Resource(name = "consistencyDelegate")的bean,就可以由
* consistencyDelegate来猜实现类的名称就是:DelegateConsistencyServiceImpl
*/
consistencyService.put(key, instances);
}
3.3.1:同步方法内的: List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);为核心代码;该addIpAddresses方法用于将客户端的实例和老的注册表实例放在一个List<Instance>中,然后进行返回,最后instanceList的值就是:老的注册表实例 + 客户端新传来的实例
3.3.2:同步方法内的: consistencyService.put(key, instances);方法:该consistencyService有多个实现类,需要看一下这个接口注入的实现类是哪一个,然后去看这个实现类的put方法:
@Resource(name = "consistencyDelegate")
private ConsistencyService consistencyService;
可以看到,该接口注入的是Bean为consistencyDelegate的实例,然后查找一下该接口的实现类,就找到了对应的实现类DelegateConsistencyServiceImpl,该类在Spring容器中的Bean名称为consistencyDelegate,那么刚才的put方法就是执行的此类中的put方法:
@DependsOn("ProtocolManager")
@Service("consistencyDelegate")
public class DelegateConsistencyServiceImpl implements ConsistencyService
该类的put方法代码:
/**
* Instances实现了Record接口,所以可以使用Record接口来接收
* @param key key of data, this key should be globally unique
* @param value value of data
* @throws NacosException
*/
@Override
public void put(String key, Record value) throws NacosException {
/**
* mapConsistencyService(key)方法返回的ConsistencyService对象的put也有6个实现类
* 当前如果是临时实例:
* 返回的就是:ephemeralConsistencyService,ephemeralConsistencyService的实现类就是:DistroConsistencyServiceImpl
* 当前如果是持久化实例:
* 返回的就是:persistentConsistencyService,persistentConsistencyService的实现类就是:PersistentConsistencyServiceDelegateImpl
*/
mapConsistencyService(key).put(key, value);
}
因为 Instances实现了Record接口,所以可以使用Record接口来接收,然后该方法内调用了mapConsistencyService(key)来返回一个实例,然后执行这个实例的put方法,看下mapConsistencyService(key)方法:
private ConsistencyService mapConsistencyService(String key) {
/**
* 判断当前key是否以固定的值打头的,是的话就返回的true,不是的话就返回的false
* AP架构
* ephemeralConsistencyService来注册的话:临时实例的注册
* persistentConsistencyService来注册的话:持久实例的注册
*/
return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}
该方法用于判断该客户端实例是使用临时实例注册还是持久实例注册,当前流程是使用的临时实例注册,也就是注册在内存中,所以返回的是:ephemeralConsistencyService
那么也就是说会调用ephemeralConsistencyService对应实例的put方法,该put方法代码:
public void put(String key, Record value) throws NacosException {
// 核心注册逻辑 onPut方法 将客户端传来的信息封装到一个内存队列当中
onPut(key, value);
// 做一些同步操作,因为方法有sync
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}
方法内的核心方法是onPut方法,onPut方法用于将客户端传来的客户端实例信息放入一个阻塞队列当中,代码如下:
public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
// 强转成Instances,然后放入Datum里面
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
notifier.addTask(key, DataOperation.CHANGE);
}
核心方法为:notifier.addTask(key, DataOperation.CHANGE);,该方法是客户端注册方法调用链路中最后执行的一个方法,该方法内用于将客户端的一些信息放入:private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);这个阻塞队列中,用于Notifier线程处理任务run方法取出来,然后异步的将当前注册的客户端的实例放入双重map中进行注册,注册的行为是异步的,在Notifier线程类中,该线程类的run方法代码如下:
public void run() {
Loggers.DISTRO.info("distro notifier started");
/**
* 死循环,不断的从tasks中取客户端信息,读出来后就进行客户端的注册,这个线程不能停止
*/
for (; ; ) {
try {
//从tasks阻塞队列中去取出一个一个的实例信息,然后直接handler方法
// 只有在tasks阻塞队列中有数据取出来的时候,才会往下执行,否则这里一直会处于阻塞状态,不会占据cpu的资源
Pair<String, DataOperation> pair = tasks.take();
// 进行客户端的注册,将客户端信息放入到注册表的map的value中
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
该run方法内的究极核心方法:handle(pair);,该方法就是最终客户端进行注册的地方,将客户端信息放入到真正的注册表中,也就是双重map中,在handler方法的调用链当中,最核心的方法就是:Cluster.updateIps方法,updateIps方法的核心流程就是:
1:将真正的注册表copy一份放到副本注册表中,用于该方法所有的操作,这个注册表只是某个group组下的stockService内部的实例列表
2:将真正的注册表替换成处理好的注册表副本,注册表副本就是处理好的,直接将处理好的注册表副本直接赋值给真正的注册表,在赋值给真正的注册表这一大段代码之间,是没有用到真正的注册表的,所以这就实现了:copy on write 写时赋值思想
流程图: