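Nacos 2.0 Architecture Design and the New Model
Reference: https://zhuanlan.zhihu.com/p/344572647
Flowchart: registering an ephemeral instance over gRPC
Spring Boot auto-configuration
Injecting the beans for service registration
Listening for the Tomcat startup event
NacosAutoServiceRegistration extends AbstractAutoServiceRegistration, which implements the ApplicationListener interface. Once Tomcat has started, a WebServerInitializedEvent is published. AbstractAutoServiceRegistration listens for that event and then carries out the registration steps that follow.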
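The listen-then-register pattern can be sketched in plain Java. This is only an illustration, not the Spring Cloud code: WebServerReadyEvent, Listener and AutoRegistration are hypothetical stand-ins for WebServerInitializedEvent, ApplicationListener and AbstractAutoServiceRegistration, and the run-once guard is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified stand-ins, NOT the Spring types.
class AutoRegistrationSketch {

    /** Hypothetical event carrying the port the web server bound to. */
    static final class WebServerReadyEvent {
        final int port;

        WebServerReadyEvent(int port) {
            this.port = port;
        }
    }

    /** Hypothetical listener contract, playing the role of ApplicationListener. */
    interface Listener {
        void onEvent(WebServerReadyEvent event);
    }

    /** Registers the service the first time the web server reports ready. */
    static final class AutoRegistration implements Listener {
        private final AtomicBoolean running = new AtomicBoolean(false);
        final List<Integer> registeredPorts = new ArrayList<>();

        @Override
        public void onEvent(WebServerReadyEvent event) {
            // Assumed guard: repeated events must not register twice.
            if (running.compareAndSet(false, true)) {
                register(event.port);
            }
        }

        private void register(int port) {
            // A real client would call the registry here; the sketch only records the port.
            registeredPorts.add(port);
        }
    }

    public static void main(String[] args) {
        AutoRegistration registration = new AutoRegistration();
        registration.onEvent(new WebServerReadyEvent(8080));
        registration.onEvent(new WebServerReadyEvent(8080)); // ignored by the guard
        System.out.println(registration.registeredPorts);    // prints [8080]
    }
}
```

The point of the sketch is only the ordering: registration is driven by the "web server is ready" event, so the port is known before the instance is announced.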
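Starting service registration
The constructed Instance information
Ephemeral instances register over gRPC
The code checks here whether the instance is ephemeral; the ephemeral flag defaults to true. An ephemeral instance goes through the register method of NamingGrpcClientProxy and uses gRPC, which is the default protocol for ephemeral instances in Nacos 2.x. Persistent instances communicate over HTTP requests.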
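The choice between the two protocols can be pictured as a small delegating proxy. The following is a rough sketch under stated assumptions: ClientProxy, GrpcProxy, HttpProxy and DelegateProxy are hypothetical names made up for illustration, and the returned strings merely stand in for real network calls.

```java
class ProxySelectionSketch {

    /** Hypothetical minimal instance: only the fields the sketch needs. */
    static final class Instance {
        final String ip;
        final int port;
        final boolean ephemeral;

        Instance(String ip, int port, boolean ephemeral) {
            this.ip = ip;
            this.port = port;
            this.ephemeral = ephemeral;
        }
    }

    /** Hypothetical common contract for both transports. */
    interface ClientProxy {
        String register(String serviceName, Instance instance);
    }

    static final class GrpcProxy implements ClientProxy {
        @Override
        public String register(String serviceName, Instance instance) {
            return "grpc:" + serviceName; // placeholder for a gRPC request
        }
    }

    static final class HttpProxy implements ClientProxy {
        @Override
        public String register(String serviceName, Instance instance) {
            return "http:" + serviceName; // placeholder for an HTTP request
        }
    }

    /** Routes ephemeral instances to gRPC and persistent instances to HTTP. */
    static final class DelegateProxy implements ClientProxy {
        private final ClientProxy grpc = new GrpcProxy();
        private final ClientProxy http = new HttpProxy();

        @Override
        public String register(String serviceName, Instance instance) {
            ClientProxy target = instance.ephemeral ? grpc : http;
            return target.register(serviceName, instance);
        }
    }

    public static void main(String[] args) {
        DelegateProxy proxy = new DelegateProxy();
        System.out.println(proxy.register("demo", new Instance("10.0.0.1", 8080, true)));  // prints grpc:demo
        System.out.println(proxy.register("demo", new Instance("10.0.0.1", 8080, false))); // prints http:demo
    }
}
```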
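Caching the instance information
Starting to register the service information
Sending the registration request
Marking the service information as registered
The server receives the registration request
InstanceRequestHandler extends RequestHandler, so when RequestHandler runs its handle method, the handle method of InstanceRequestHandler is executed to process the request.
clientOperationService is a proxy in front of the ephemeral and persistent implementations; it manages the registration behavior of each kind of instance.
The Client model
Nacos 2.x introduces the Client model. Each client gRPC long-lived connection corresponds to one Client, and every Client has its own unique id (clientId). A Client manages the service instance registrations (Publish) and the service subscriptions (Subscribe) of one client. The model itself is simply an interface: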
public interface Client {
// Client id / gRPC connectionId
String getClientId();
// Whether this is an ephemeral client
boolean isEphemeral();
// Last update time of the client
void setLastUpdatedTime();
long getLastUpdatedTime();
// Register / deregister / query service instances
boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo);
InstancePublishInfo removeServiceInstance(Service service);
InstancePublishInfo getInstancePublishInfo(Service service);
Collection<Service> getAllPublishedService();
// Subscribe / unsubscribe / query subscriptions
boolean addServiceSubscriber(Service service, Subscriber subscriber);
boolean removeServiceSubscriber(Service service);
Subscriber getSubscriber(Service service);
Collection<Service> getAllSubscribeService();
// Generate the client data that is synchronized to other nodes
ClientSyncData generateSyncData();
// Whether the client has expired
boolean isExpire(long currentTime);
// Release resources
void release();
}
Creating the Service and Instance information
@Override
public void registerInstance(Service service, Instance instance, String clientId) {
// Cache the service, and the relationship between the namespace and the service
Service singleton = ServiceManager.getInstance().getSingleton(service);
if (!singleton.isEphemeral()) {
throw new NacosRuntimeException(NacosException.INVALID_PARAM,
String.format("Current service %s is persistent service, can't register ephemeral instance.",
singleton.getGroupedServiceName()));
}
// The connection id doubles as the client id; look up the client
Client client = clientManager.getClient(clientId);
// Check that the client is legal: it must exist and must be ephemeral
if (!clientIsLegal(client, clientId)) {
return;
}
// Build the instance information stored on the server and record it on the Client
InstancePublishInfo instanceInfo = getPublishInfo(instance);
// Add the service-to-instance relationship to the client
client.addServiceInstance(singleton, instanceInfo);
client.setLastUpdatedTime();
// Publish the service registration event; see the event handling walkthrough below
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
// Publish the metadata event; see the metadata management source walkthrough below
NotifyCenter
.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
ServiceManager
The container for Service objects is ServiceManager, specifically the one in the com.alibaba.nacos.naming.core.v2 package. Every Service held in the container is a singleton.
public class ServiceManager {
private static final ServiceManager INSTANCE = new ServiceManager();
// Singleton Services; see the equals and hashCode methods of Service
private final ConcurrentHashMap<Service, Service> singletonRepository;
// All services under a namespace
private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
.....
}
So when the registration method is called, it is ServiceManager that manages the Service singletons:
// Store the singleton Service in a Map
public Service getSingleton(Service service) {
singletonRepository.putIfAbsent(service, service);
Service result = singletonRepository.get(service);
namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) -> new ConcurrentHashSet<>());
namespaceSingletonMaps.get(result.getNamespace()).add(result);
return result;
}
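The canonical-instance idea behind getSingleton can be shown with a small standalone example. This is a sketch, not the Nacos class: ServiceKey is a hypothetical value type whose equals and hashCode cover namespace, group and name, and the two maps mirror the roles of singletonRepository and namespaceSingletonMaps.

```java
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class SingletonRepositorySketch {

    /** Hypothetical value type: equality is defined by namespace, group and name. */
    static final class ServiceKey {
        final String namespace;
        final String group;
        final String name;

        ServiceKey(String namespace, String group, String name) {
            this.namespace = namespace;
            this.group = group;
            this.name = name;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof ServiceKey)) {
                return false;
            }
            ServiceKey other = (ServiceKey) o;
            return namespace.equals(other.namespace) && group.equals(other.group) && name.equals(other.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(namespace, group, name);
        }
    }

    private final ConcurrentHashMap<ServiceKey, ServiceKey> singletons = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Set<ServiceKey>> byNamespace = new ConcurrentHashMap<>();

    /** Returns the first object ever stored for an equal key, storing the candidate if none exists. */
    ServiceKey getSingleton(ServiceKey candidate) {
        ServiceKey existing = singletons.putIfAbsent(candidate, candidate);
        ServiceKey result = existing != null ? existing : candidate;
        byNamespace.computeIfAbsent(result.namespace, ns -> ConcurrentHashMap.newKeySet()).add(result);
        return result;
    }

    int countIn(String namespace) {
        Set<ServiceKey> services = byNamespace.get(namespace);
        return services == null ? 0 : services.size();
    }

    public static void main(String[] args) {
        SingletonRepositorySketch repo = new SingletonRepositorySketch();
        ServiceKey first = new ServiceKey("public", "DEFAULT_GROUP", "order-service");
        ServiceKey second = new ServiceKey("public", "DEFAULT_GROUP", "order-service");
        System.out.println(repo.getSingleton(first) == first);  // prints true
        System.out.println(repo.getSingleton(second) == first); // prints true: the equal key maps to the first object
        System.out.println(repo.countIn("public"));             // prints 1
    }
}
```

Because callers always get the same object back for equal keys, that object can safely be used as a map key elsewhere.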
ClientManager
ClientManager is an interface; the implementation to look at here is ConnectionBasedClientManager, which manages the mapping between long-lived connection clientIds and Client models.
// Look up a Client by clientId
public Client getClient(String clientId) {
return clients.get(clientId);
}
Adding service instance information
@Override
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
return super.addServiceInstance(service, parseToHealthCheckInstance(instancePublishInfo));
}
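AbstractClient, the base Client implementation, stores the current client's service registry, that is, the mapping from Service to Instance. Note that a single client can register only one instance for a given service.
@Override
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
// Cache the service-to-instance mapping in this client's container; one client corresponds to one connection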
if (null == publishers.put(service, instancePublishInfo)) {
MetricsMonitor.incrementInstanceCount();
}
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
return true;
}
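The original figure showed the same service started as two instances on different ports. Each instance had a different client id, so the publishers map inside each Client held exactly one instance per service.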
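The "one instance per service per client" rule follows directly from using a map keyed by service. Here is a minimal sketch, assuming plain strings in place of Service and InstancePublishInfo; the Client class below is hypothetical and only mimics the publishers map.

```java
import java.util.concurrent.ConcurrentHashMap;

class ClientPublishersSketch {

    /** Hypothetical per-connection client: service name -> the single published address. */
    static final class Client {
        final String clientId;
        private final ConcurrentHashMap<String, String> publishers = new ConcurrentHashMap<>();

        Client(String clientId) {
            this.clientId = clientId;
        }

        /** Returns true when this is the first instance the client publishes for the service. */
        boolean addServiceInstance(String service, String address) {
            // put replaces any earlier entry, so at most one instance per service survives.
            return publishers.put(service, address) == null;
        }

        String getInstance(String service) {
            return publishers.get(service);
        }

        int publishedCount() {
            return publishers.size();
        }
    }

    public static void main(String[] args) {
        // Two processes of the same service hold two connections, hence two clients.
        Client first = new Client("conn-1");
        Client second = new Client("conn-2");
        first.addServiceInstance("order-service", "192.168.0.10:8080");
        second.addServiceInstance("order-service", "192.168.0.10:8081");

        // Registering again on the same client overwrites the earlier instance.
        first.addServiceInstance("order-service", "192.168.0.10:9090");
        System.out.println(first.getInstance("order-service"));  // prints 192.168.0.10:9090
        System.out.println(first.publishedCount());              // prints 1
        System.out.println(second.getInstance("order-service")); // prints 192.168.0.10:8081
    }
}
```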
Persistent instance registration
In Nacos 2.0, a persistent instance is registered through NamingHttpClientProxy, which sends an HTTP request.
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
if (instance.isEphemeral()) {
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
// Assemble the request parameters
final Map<String, String> params = new HashMap<String, String>(32);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, groupedServiceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put(IP_PARAM, instance.getIp());
params.put(PORT_PARAM, String.valueOf(instance.getPort()));
params.put(WEIGHT_PARAM, String.valueOf(instance.getWeight()));
params.put(REGISTER_ENABLE_PARAM, String.valueOf(instance.isEnabled()));
params.put(HEALTHY_PARAM, String.valueOf(instance.isHealthy()));
params.put(EPHEMERAL_PARAM, String.valueOf(instance.isEphemeral()));
params.put(META_PARAM, JacksonUtils.toJson(instance.getMetadata()));
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}
public String reqApi(String api, Map<String, String> params, String method) throws NacosException {
return reqApi(api, params, Collections.EMPTY_MAP, method);
}
public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method)
throws NacosException {
return reqApi(api, params, body, serverListManager.getServerList(), method);
}
Sending the HTTP request
public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,
String method) throws NacosException {
params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
if (CollectionUtils.isEmpty(servers) && !serverListManager.isDomain()) {
throw new NacosException(NacosException.INVALID_PARAM, "no server available");
}
NacosException exception = new NacosException();
if (serverListManager.isDomain()) {
String nacosDomain = serverListManager.getNacosDomain();
for (int i = 0; i < maxRetry; i++) {
try {
return callServer(api, params, body, nacosDomain, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
}
}
}
} else {
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
for (int i = 0; i < servers.size(); i++) {
String server = servers.get(index);
try {
return callServer(api, params, body, server, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", server, e);
}
}
index = (index + 1) % servers.size();
}
}
NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(),
exception.getErrMsg());
throw new NacosException(exception.getErrCode(),
"failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
}
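The non-domain branch of reqApi boils down to "start at a random server, try each one once, wrap around". The sketch below isolates that loop under simplifying assumptions: callWithFailover is a hypothetical helper, failures are plain RuntimeExceptions rather than NacosException, and the start index is passed in so the behavior is easy to reproduce.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.function.Function;

class FailoverSketch {

    /** Tries every server once, beginning at startIndex and wrapping around the list. */
    static <T> T callWithFailover(List<String> servers, int startIndex, Function<String, T> call) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("no server available");
        }
        RuntimeException last = null;
        int index = Math.floorMod(startIndex, servers.size());
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return call.apply(server);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next server
            }
            index = (index + 1) % servers.size();
        }
        throw new IllegalStateException("all servers failed: " + servers, last);
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("a:8848", "b:8848", "c:8848");
        int start = new Random().nextInt(servers.size());
        // In this demo only server "c" answers; the others throw.
        String answeredBy = callWithFailover(servers, start, server -> {
            if (!server.startsWith("c")) {
                throw new IllegalStateException(server + " is down");
            }
            return server;
        });
        System.out.println(answeredBy); // prints c:8848 whatever the random start was
    }
}
```

A random start spreads load across servers, while the wrap-around guarantees every server gets one attempt before the call gives up.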
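The server receives the HTTP request and registers the instance
According to the official documentation, the server-side API address that handles HTTP registration (a POST request) is:
http://ip:port/nacos/v1/ns/instance
InstanceController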
/**
* Register new instance.
*
* @param request http request
* @return 'ok' if success
* @throws Exception any error during register
*/
@CanDistro
@PostMapping
@Secured(action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
// Read the namespace from the request; fall back to the default namespace if it is absent
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
// Read the service name from the request; it is a required parameter
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
// Check that the service name is well formed
NamingUtils.checkServiceNameFormat(serviceName);
// Build an Instance object from the request
final Instance instance = HttpRequestInstanceBuilder.newBuilder()
.setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
// Pick the operator according to gRPC support; when gRPC features are in use, InstanceOperatorClientImpl is called
getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
return "ok";
}
The operator is chosen by checking whether gRPC features are supported: if they are, the V2 implementation is used, otherwise V1.
private InstanceOperator getInstanceOperator() {
return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1;
}
Registering through the V1 path
The V1 path calls registerInstance of InstanceOperatorServiceImpl to register:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
// Create the service if it does not exist, along with the service-cluster relationship
// Initializing the service also creates the server-side heartbeat check task
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
// Fetch the service again
Service service = getService(namespaceId, serviceName);
// Verify that the service is not null
checkServiceIsNull(service, namespaceId, serviceName);
// Add the instance
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
Creating the service information
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException {
// Fetch the service from the cache
Service service = getService(namespaceId, serviceName);
if (service == null) {
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
// Initialize the service
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
// Link the service and the cluster
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
// Validate the service name and related fields
service.validate();
// Initialize the service and create the heartbeat check task
putServiceAndInit(service);
if (!local) {
// Consistency handling for services that are not local-only
addOrReplaceService(service);
}
}
}
The core logic of putServiceAndInit is to put the service into the cache and call its init method, which starts the server-side heartbeat check task. That task tracks changes to the instances under the service and notifies clients when something changes.
public void init() {
// Start the heartbeat check task for this service
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}
Adding instance information
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
// Build the key
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
// Fetch the service from the cache
Service service = getService(namespaceId, serviceName);
// Lock on the service
synchronized (service) {
// This call is the core of the namespace -> service -> cluster -> instance model;
// the in-memory registration against that data structure happens here
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// This is where the Distro protocol takes effect, through the Distro put logic:
// the instance data is also kept in memory, and a delayed asynchronous sync task
// is scheduled to replicate the data
consistencyService.put(key, instances);
}
}
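The core job of addIpAddresses is to build these relationships. The listing below is updateIpAddresses, which takes the action (add or remove) as a parameter: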
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
throws NacosException {
Datum datum = consistencyService
.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
List<Instance> currentIPs = service.allIPs(ephemeral);
Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
Set<String> currentInstanceIds = CollectionUtils.set();
for (Instance instance : currentIPs) {
currentInstances.put(instance.toIpAddr(), instance);
currentInstanceIds.add(instance.getInstanceId());
}
Map<String, Instance> instanceMap;
if (datum != null && null != datum.value) {
instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
} else {
instanceMap = new HashMap<>(ips.length);
}
for (Instance instance : ips) {
if (!service.getClusterMap().containsKey(instance.getClusterName())) {
Cluster cluster = new Cluster(instance.getClusterName(), service);
cluster.init();
service.getClusterMap().put(instance.getClusterName(), cluster);
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
}
if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
instanceMap.remove(instance.getDatumKey());
} else {
Instance oldInstance = instanceMap.get(instance.getDatumKey());
if (oldInstance != null) {
instance.setInstanceId(oldInstance.getInstanceId());
} else {
instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
}
instanceMap.put(instance.getDatumKey(), instance);
}
}
if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
throw new IllegalArgumentException(
"ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
.toJson(instanceMap.values()));
}
return new ArrayList<>(instanceMap.values());
}
When this method returns, the namespace -> service -> cluster -> instance storage structure is in place.
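The namespace -> service -> cluster -> instance hierarchy can be pictured as nested maps. The sketch below is a deliberately simplified model, not the V1 data structures: RegistryTreeSketch is a hypothetical class, instances are reduced to "ip:port" strings, and there is no health state or Distro replication.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class RegistryTreeSketch {

    // namespace -> service -> cluster -> set of "ip:port" addresses
    private final Map<String, Map<String, Map<String, Set<String>>>> tree = new ConcurrentHashMap<>();

    /** Creates any missing level on the way down, then records the address. */
    void addInstance(String namespace, String service, String cluster, String address) {
        tree.computeIfAbsent(namespace, k -> new ConcurrentHashMap<>())
                .computeIfAbsent(service, k -> new ConcurrentHashMap<>())
                .computeIfAbsent(cluster, k -> ConcurrentHashMap.newKeySet())
                .add(address);
    }

    /** Returns the addresses under one cluster, or an empty set if any level is missing. */
    Set<String> instances(String namespace, String service, String cluster) {
        Map<String, Map<String, Set<String>>> services = tree.get(namespace);
        if (services == null) {
            return Collections.<String>emptySet();
        }
        Map<String, Set<String>> clusters = services.get(service);
        if (clusters == null) {
            return Collections.<String>emptySet();
        }
        Set<String> result = clusters.get(cluster);
        return result == null ? Collections.<String>emptySet() : result;
    }

    public static void main(String[] args) {
        RegistryTreeSketch registry = new RegistryTreeSketch();
        registry.addInstance("public", "DEFAULT_GROUP@@order-service", "DEFAULT", "192.168.0.10:8080");
        registry.addInstance("public", "DEFAULT_GROUP@@order-service", "DEFAULT", "192.168.0.10:8081");
        System.out.println(registry.instances("public", "DEFAULT_GROUP@@order-service", "DEFAULT").size()); // prints 2
        System.out.println(registry.instances("public", "missing-service", "DEFAULT").isEmpty());           // prints true
    }
}
```

Creating a missing cluster on the fly mirrors what updateIpAddresses does when the instance names a cluster the service does not have yet.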
Registering through the V2 path
InstanceOperatorClientImpl
@Override
public void registerInstance(String namespaceId, String serviceName, Instance instance) {
// Whether the instance is ephemeral
boolean ephemeral = instance.isEphemeral();
// Derive the client id
String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
createIpPortClientIfAbsent(clientId);
// Fetch the service
Service service = getService(namespaceId, serviceName, ephemeral);
// Register the instance
clientOperationService.registerInstance(service, instance, clientId);
}
Persistent instances are registered by PersistentClientOperationServiceImpl, which uses the JRaft protocol to write the data to the Raft cluster.
@Override
public void registerInstance(Service service, Instance instance, String clientId) {
Service singleton = ServiceManager.getInstance().getSingleton(service);
if (singleton.isEphemeral()) {
throw new NacosRuntimeException(NacosException.INVALID_PARAM,
String.format("Current service %s is ephemeral service, can't register persistent instance.",
singleton.getGroupedServiceName()));
}
final InstanceStoreRequest request = new InstanceStoreRequest();
request.setService(service);
request.setInstance(instance);
request.setClientId(clientId);
final WriteRequest writeRequest = WriteRequest.newBuilder().setGroup(group())
.setData(ByteString.copyFrom(serializer.serialize(request))).setOperation(DataOperation.ADD.name())
.build();
try {
protocol.write(writeRequest);
} catch (Exception e) {
throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);
}
}
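Handling the service mapping events
In the flow above, the notification center is used to publish two events:
new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId)
new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false)
The purpose is to build the relationship between Service and Client, so that the final Instance list of a target service can be filtered out. The relationship exists to speed up queries.
ClientServiceIndexesManager handles these events. It maintains two indexes:
Service to publishing clientIds
Service to subscribing clientIds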
private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
private void handleClientOperation(ClientOperationEvent event) {
Service service = event.getService();
String clientId = event.getClientId();
if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
addPublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
removePublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
addSubscriberIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
removeSubscriberIndexes(service, clientId);
}
}
// Build the relationship between a Service and its publishing Client
private void addPublisherIndexes(Service service, String clientId) {
publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
publisherIndexes.get(service).add(clientId);
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}
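Why the index speeds up queries is easiest to see in isolation. The following is a minimal sketch, assuming service names as plain strings: PublisherIndexSketch is hypothetical, and the choice to drop an index entry once its last publisher is removed belongs to the sketch, not to the Nacos source shown here.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class PublisherIndexSketch {

    // service -> ids of the clients that publish an instance of it
    private final ConcurrentMap<String, Set<String>> publisherIndexes = new ConcurrentHashMap<>();

    void addPublisher(String service, String clientId) {
        publisherIndexes.computeIfAbsent(service, key -> ConcurrentHashMap.newKeySet()).add(clientId);
    }

    void removePublisher(String service, String clientId) {
        // Returning null from computeIfPresent removes the entry (a choice made by this sketch).
        publisherIndexes.computeIfPresent(service, (key, ids) -> {
            ids.remove(clientId);
            return ids.isEmpty() ? null : ids;
        });
    }

    /** One map lookup instead of scanning every connected client. */
    Set<String> publishersOf(String service) {
        return publisherIndexes.getOrDefault(service, Collections.emptySet());
    }

    public static void main(String[] args) {
        PublisherIndexSketch index = new PublisherIndexSketch();
        index.addPublisher("order-service", "conn-1");
        index.addPublisher("order-service", "conn-2");
        index.removePublisher("order-service", "conn-1");
        System.out.println(index.publishersOf("order-service")); // prints [conn-2]
        System.out.println(index.publishersOf("unknown"));       // prints []
    }
}
```

With the client ids in hand, the instance list of a service can be assembled by asking each of those clients for the instance it published.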
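The source of ClientServiceIndexesManager shows that it subscribes to five events:
client register service, client deregister service, client subscribe service, client unsubscribe service, and client disconnect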
@Override
public List<Class<? extends Event>> subscribeTypes() {
List<Class<? extends Event>> result = new LinkedList<>();
result.add(ClientOperationEvent.ClientRegisterServiceEvent.class);
result.add(ClientOperationEvent.ClientDeregisterServiceEvent.class);
result.add(ClientOperationEvent.ClientSubscribeServiceEvent.class);
result.add(ClientOperationEvent.ClientUnsubscribeServiceEvent.class);
result.add(ClientEvent.ClientDisconnectEvent.class);
return result;
}
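Once the index is built, a ServiceChangedEvent is also fired to signal that the service registry has changed. Two things follow a registry change:
1. Notify the subscribed clients.
2. Synchronize the data across the Nacos cluster.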