目录
- Nacos 2.1.X
- 注册实例
- 入口
- 接口流程
- Client 注册 事件处理
- 服务订阅
- 入口
Nacos 2.1.X
注册实例
入口
com.alibaba.nacos.naming.remote.rpc.handler.InstanceRequestHandler#handle
Service service = Service.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
switch (request.getType()) {
case NamingRemoteConstants.REGISTER_INSTANCE:
return registerInstance(service, request, meta);
}
接口流程
registerInstance(Service service, Instance instance, String clientId):
clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
NotifyCenter.publishEvent(new RegisterInstanceTraceEvent());
registerInstance(Service service, Instance instance, String clientId):
//获取一个服务实例
//如果 singletonRepository 没有service 则发布 MetadataEvent.ServiceMetadataEvent 并设置到Map中
//ConcurrentHashMap<Service, Service> singletonRepository
//ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps
Service singleton = ServiceManager.getInstance().getSingleton(service);
if (!singleton.isEphemeral()) {
//2.1.x 开始 临时实例走 gRpc 持久实例还是走http
//也就是只优化了常用的临时实例 不常用的就没有优化
throw new NacosRuntimeException();
}
//根据 clientId 获取一个 client
//这个对象就是对应一个连接端信息 一个连接就有一个 clientId 对应有一个 client
//ConcurrentMap<String, IpPortBasedClient> clients
Client client = clientManager.getClient(clientId);
InstancePublishInfo instanceInfo = getPublishInfo(instance);
//维护 ConcurrentHashMap<Service, InstancePublishInfo> publishers
client.addServiceInstance(singleton, instanceInfo);
client.setLastUpdatedTime();
client.recalculateRevision();
//给 client 赋值
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent();
NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent();
这个主流程可以看出来,其实就做了2件事就是
- 组装各种对象 放入各种 Map 中
- 发布各种事件
我们看下发布的事件:
RegisterInstanceTraceEvent
ServiceMetadataEvent
ClientRegisterServiceEvent
InstanceMetadataEvent
如果要分析注册 那么应该看 ClientRegisterServiceEvent
事件处理
Client 注册 事件处理
com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#handleClientOperation
//ConcurrentMap<Service, Set<String>> publisherIndexes : Set<String> By service Map
publisherIndexes.computeIfAbsent(service, key -> new ConcurrentHashSet<>());
publisherIndexes.get(service).add(clientId);
//这里发布了服务变更事件 也就是说注册已经完成了 也就是说注册其实就是写入这个Map
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
那这些 Map 到底有什么用呢?可以梳理一下查询服务逻辑看一下是怎么关联的
服务订阅
入口
com.alibaba.nacos.naming.remote.rpc.handler.SubscribeServiceRequestHandler#handle
Service service = Service.newService(namespaceId, groupName, serviceName, true);
Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),
namespaceId, groupedServiceName, 0, request.getClusters());
// 这一句比较关键 下边详细分析
ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),
metadataManager.getServiceMetadata(service).orElse(null), subscriber.getCluster(), false,
true, subscriber.getIp());
if (request.isSubscribe()) {
clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
NotifyCenter.publishEvent(new SubscribeServiceTraceEvent(System.currentTimeMillis(),
meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
}
//从这里看一看出服务列表就是在 serviceInfo 中
return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),
metadataManager.getServiceMetadata(service).orElse(null), subscriber.getCluster(), false,
true, subscriber.getIp()):
serviceStorage.getData(service):
// 看下缓存有不 没有就创建一个
return serviceDataIndexes.containsKey(service) ? serviceDataIndexes.get(service) : getPushData(service);
getPushData(service):
ServiceInfo result = emptyServiceInfo(service);
//ConcurrentHashMap<Service, Service> singletonRepository 这个有了
Service singleton = ServiceManager.getInstance().getSingleton(service);
//根据 Service 获取所有 Instances
result.setHosts(getAllInstancesFromIndex(singleton));
serviceDataIndexes.put(singleton, result);
return result;
List<Instance> getAllInstancesFromIndex(Service service):
//ConcurrentMap<Service, Set<String>> publisherIndexes 根据service获取 Set<clientId>
for (String each : serviceIndexesManager.getAllClientsRegisteredService(service)) {
//ConcurrentMap<String, IpPortBasedClient> clients
//ConcurrentHashMap<Service, InstancePublishInfo> publishers
Optional<InstancePublishInfo> instancePublishInfo = getInstanceInfo(each, service);
if (instancePublishInfo.isPresent()) {
InstancePublishInfo publishInfo = instancePublishInfo.get();
if (publishInfo instanceof BatchInstancePublishInfo) {
BatchInstancePublishInfo batchInstancePublishInfo = (BatchInstancePublishInfo) publishInfo;
List<Instance> batchInstance = parseBatchInstance(service, batchInstancePublishInfo, clusters);
result.addAll(batchInstance);
} else {
Instance instance = parseInstance(service, instancePublishInfo.get());
result.add(instance);
clusters.add(instance.getClusterName());
}
}
}
//维护 ConcurrentMap<Service, Set<String>> serviceClusterIndex
serviceClusterIndex.put(service, clusters);
return new LinkedList<>(result);