本文将详细介绍 Nacos 客户端在启动时进行自动注册原理, 以及Nacos服务器是如何处理客户端的注册与订阅请求的;
本文会附带源码解读, 但不会死抠每一行代码, 主要是梳理整个流程, 过程中的关键步骤, 都会由思维导图的形式展现出来;
如果在阅读过程中对文中提到的 SpringBoot 启动过程以及扩展机制不太了解, 参考这篇文章 SpringBoot启动流程与配置类处理机制详解, 附源码与思维导图, 强烈建议学习后再来读本文;
Nacos注册中心
Nacos 1.X 版本中, 客户端通过发送HTTP请求进行服务注册与发现; 通过一个POST请求进行服务注册, 通过一个GET请求进行服务发现;
2.X 版本增加了grpc 的通信方式, 默认通过 gRPC 进行服务注册发现;
以下分析使用2.0.4版本
在 spring-cloud 的高版本中, 不需要加@EnableDiscoveryClient注解也能正常使用 Nacos 注册中心, 例如 spring-cloud 2021.0.4 版本中, @EnableDiscoveryClient 注解通过 @Import 注解引入了一个ImportSelector, 注册了一个配置类, 但是这个配置类基本是个空的, 只是注册了一个参数类, Nacos 还用不上;
在 SpringCloud 标准中, 定义了 DiscoveryClient 接口, 用于与服务注册中心通信; Nacos 提供了这个接口的实现类, 用于与Nacos服务注册中心通信;
在 Nacos 提供的自动配置类 NacosDiscoveryClientConfiguration
中, 向 Spring 容器注册 NacosDiscoveryClient
对象; 所以才能在代码中直接注入;
注册中心领域模型
-
当一个服务进行服务发现时, 只能发现与自己相同命名空间, 相同Group 的服务;
-
划分集群是为了就近访问和容灾; 通过配置文件可以配置自己所属的集群, 拉取服务时可以使用
NamingService
拉取特定集群下的实例; -
1.X 版本中在Nacos Server 内部采用一个多层的 Map 结构来保存这种模型; 外层Map的 key 是命名空间; 内层 Map 的 key 是分组名 + 服务名, value 是一个 Service 对象, 对应一个具体服务
在1.X 版本中
- 一个 Service 对象中采用
Map<String, Cluster>
保存了这个服务下的所有集群 Cluster; - 一个 Cluster 对象中采用两个
Set<Instance>
保存了集群下所有的临时实例和持久实例 Instance; - 一个 Instance 对象就与一个服务实例相对应;
在2.X版本中, 是将 Service 和 ClientId( 随机数 + IP + 端口) 保存在
ClientServiceIndexesManager
- 一个 Service 对象中采用
-
注册服务时, 可以添加一些元信息, 例如版本号; 这些信息会以Map形式保存;
服务器启动
Tomcat启动
Tomcat 默认启动在 8848 端口; 启动完成后由各种 Controller , 例如 InstanceController
, 处理客户端的HTTP请求;
gRPC启动
2.X 版本, 新增了 gRPC 的通信方式, gRPC 监听 8848 + 1000 的端口;
原理是在一个 BeanPostProcessor 中织入了 gRPC server 启动的代码, startServer 方法;
9848 端口监听客户端的请求, 9849用于集群之间的数据同步;
Nacos gRPC Server启动| ProcessOn免费在线作图,在线流程图,在线思维导图
客户端自动注册
NamingService
是 Nacos 提供的一个核心接口,用于服务发现和服务注册。导入 nacos-client 包后就可以使用这个接口;
public static void main(String[] args) throws NacosException, IOException {
Properties properties = new Properties();
properties.setProperty("serverAddr","192.168.0.108:8848");
NamingService naming = NamingFactory.createNamingService(properties);
// 服务注册
System.out.println("-----服务注册-----");
naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "TEST1");
// 服务发现
System.out.println("-----服务发现-----");
System.out.println(naming.getAllInstances("nacos.test.3"));
System.in.read();
}
引入了 Nacos 客户端依赖后, SpringBoot 启动的什么阶段去发送服务注册和发现的请求?
使用的是ApplicationListener
这个扩展点, 事件监听机制;
Spring Boot启动调用Nacos API| ProcessOn免费在线作图,在线流程图,在线思维导图
-
在 SpringBoot 内嵌的 Servlet 容器初始化完成并完成启动 (onRefresh方法内启动) 时, SpringBoot 发布
WebServerInitializedEvent
事件, Nacos-Client 定义的监听器AbstractAutoServiceRegistration
监听到这个事件, 执行onApplicationEvent
回调方法onRefresh 方法是在初始化事件发布器和注册 ApplicationListener 之间调用的, 还没注册 ApplicationListener 之前就发布了
WebServerInitializedEvent
?是的, 事件会先暂存到 Multicaster;
-
回调方法内会层层调用, 来到一个
ServiceRegistry
的register
方法; -
Nacos 提供了这个接口的实现类 (SPI机制) , 在其
register
方法中, 会创建NacosNamingService
实例, 调用其rigisterInstance
方法, 方法要传递的参数从配置文件读取; 从而实现服务的注册;为什么不直接从容器取
NacosNamingService
?因为这时候还进行到实例化 Bean的阶段, 取不到的;
registerInstance
-
在 2.X 版本中,
registerInstance
方法默认使用的 gRPC 的方式与 Nacos Server 通信, 通过 gRPC 长连接, 发送一个 InstanceRequest, 注册当前服务; -
使用 gRPC 还是 HTTP, 是根据当前 Client 是否是临时结点来定的, 临时结点则使用 gRPC, 否则使用 HTTP;
临时结点是在描述 Nacos Client 的类型, 临时结点指服务挂掉后, Nacos Server 会把这个服务的 URL 删除;
-
在 1.X 的版本中都是 HTTP 的方式了; 用一个 HTTP 工具类发送一个 POST请求到注册中心, 路径是
/nacos/v1/ns/instance
;
服务端处理注册请求
1.X
Nacos 1.x服务端处理服务注册| ProcessOn免费在线作图,在线流程图,在线思维导图
2.X
Nacos 2.x服务端处理服务注册| ProcessOn免费在线作图,在线流程图,在线思维导图
以临时结点注册为例:
-
gRPC Request 到达后, 在服务器端的
InstanceRequestHandler
的handle
方法中处理, 在handle
方法中会提取出Service
对象, 包括命名空间: 分组: 服务名; -
如果是一个
REGISTER_INSTANCE
的请求(对应客户端发送的是 InstanceRequest), 就调用registerInstance
方法; 以下均在registerInstance
方法内部; -
在该方法内部, 尝试从缓存中获取对应的 Service , 如果有, 说明这个服务之前已经有实例注册过; 如果没有, 说明这是第一次出现这个服务的实例, 需要新创建 Service, 放到缓存中;
-
然后根据请求的 ConnectionId, 获取与之对应的 Client 对象; 一个 Client 对象就对应一个 Nacos 客户端实例;
-
Client 内保存了客户端的 ClientId ( 随机数 + IP + 端口), 连接状态, 最后一次心跳的时间, 用 两个 Map 保存了这个客户端发布了哪些服务实例, 订阅了哪些服务 (订阅就对应了 NamingService的 subscribe 方法);
public abstract class AbstractClient implements Client { // 当前客户端发布的服务以及对应的实例; protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1); // 当前客户端订阅的服务以及客户端作为订阅者的信息; protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1); }
-
然后将本次注册实例封装成一个 PublishInfo, 将 Service 和 PublishInfo 添加到与之对应的 Client 中并进行处理;
在 Client中, 用一个 Map, 叫 publishers 保存被添加的 Service 和 Instance ; 然后会发布
ClientChangeEvent
, 用于集群之间的数据同步, 从这里可以看出是临时节点的注册信息同步是AP模式, 使用的是Distro协议;如果是持久结点, 是CP模式, 基于Raft 协议
-
更新这个Client 的
LastUpdateTime
; -
发布
ClientRegisterServiceEvent
事件, 表示有服务注册行为发生;由
ClientServiceIndexesManager
去处理, 在这个类里, 用 Map 保存了所有服务和实例, 具体是一个Map<Service, Set<String>>; 这里的String 实际上是实例实际上是 ClientId;
还保存了一个服务有哪些订阅者,
subscriverIndexes
;所以在 2.X 版本中, 服务以及实例是通过这种方式保存的
ClientServiceIndexesManager
将本次注册对应的客户端和服务添加到自己的Map中保存;然后发布一个
ServiceChangedEvent
;ServiceChangedEvent
事件在NamingSubscriberServiceV2Impl
的onEvent
方法中被处理, Nacos 通过 Push 引擎把最新的实例信息推送给所有订阅了这个服务的客户端;因为服务发生变更了, 多了一个实例, 所以要通知所有订阅的客户端;
这里和服务订阅的时候有些差异, 服务订阅的时候只把被订阅的服务的实例信息推送给单个订阅方一次, 不会推给别人
if (event instanceof ServiceEvent.ServiceChangedEvent) { // If service changed, push to all subscribers. ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event; Service service = serviceChangedEvent.getService(); delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay())); } else if (event instanceof ServiceEvent.ServiceSubscribedEvent) { // If service is subscribed by one client, only push this client. ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event; Service service = subscribedEvent.getService(); delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(), subscribedEvent.getClientId())); }
订阅了相关服务的客户端收到注册信息后, 将信息保存在自己的缓存中;
-
发布
InstanceMetaDataEvent
事件, 用于更新元数据;
总结下来是这么几步:
- 提取命名空间, 分组, 服务名等信息, 封装为 Service;
- 尝试从服务器缓存中取出对应的 Service, 如果有, 说明不是第一次注册该服务的实例; 如果没有, 需要把新的 Service 放到缓存中;
- 根据客户端信息拿到对应的 Client 对象(这个客户端是第一次连接就新建一个 Client), Client 中保存了 ClientID, 这个客户端发布了哪些实例, 这个客户端订阅了哪些实例;
- 将发布实例的信息封装成 PublishInfo, 保存到 Client 中;
- 更新 Client 的心跳时间;
- 通过事件发布订阅机制, 进行集群结点之间的信息同步, AP模式;
- 将发布信息, 保存到 ClientServiceIndexedManager
- 将最新的实例信息, 推送给所有订阅了该服务的客户端;
客户端服务发现
1.X
Nacos 1.x服务发现| ProcessOn免费在线作图,在线流程图,在线思维导图
2.X
Nacos 2.x服务发现与订阅及通知客户端| ProcessOn免费在线作图,在线流程图,在线思维导图
发现与订阅机制
-
通过 OpenFeign 也好, Ribbon 也好, NacosDiscoveryClient 也好, 拉取服务的实例信息时, 都会调用
NacosNamingService
的selectInstances
方法, 并且默认将subscribe
参数设置为 true; -
NacosNamingService
的selectInstances
方法用来获取一个服务下的所有实例, 该方法有subcribe
参数, 如果设置为true
, 表示我要订阅而不是仅仅查询一次服务注册表;subscribe = false
表示只是查询一次, 实现上也是只发送一个 gRPC Request, 实现了单次的拉取; -
回到
subcribe
参数设置为true
的情况, 会尝试从本地缓存中获取 ServiceInfo , 如果有, 说明之前订阅过, 就直接取本地缓存中的要获取的服务的所有实例, 返回即可;本地缓存是一个
ServiceInfoHolder
, 结构如下ConcurrentMap<String, ServiceInfo> serviceInfoMap; ServiceInfo:: private List<Instance> hosts = new ArrayList<Instance>();
-
如果本地缓存
serviceInfoMap
中没有对应的ServiceInfo
, 说明是第一次尝试获取, 要向服务器发起订阅; -
发起订阅请求之前, 先创建一个周期任务, 用于定时向服务器拉取我这次要订阅的这个服务的实例信息;
所以服务订阅是Pull机制(除了实例变更时会Push)
周期任务做了什么:
从服务器拉取服务的实例信息; 保存在本地的 ServiceInfoHolder 中;
周期是多少:
初始延时是1S; 第一次执行以后:
有一个失败计数, 拉取失败计数值 + 1, 上限是6; 初始为0;
如果拉取成功, 会重置失败计数, 并且本次的延迟时间会设置为 6S;
如果拉取失败, 失败计数++, 本次延迟时间在( 1S << 失败计数, 60S)之间选小的; 2, 4, 8, 16, 32, 64;
例如当前失败了 6次, 失败计数为6, 那么延迟时间在 ( 1S << 6 = 64S , 60S) 之间选小的, 即60S;
-
然后通过
NamingGrpcClientProxy
向服务器发送订阅请求, 订阅服务, 通过响应获取要订阅的服务的最新实例信息, 保存到本地缓存ServiceInfoHolder
中; 这里会获取一次实例信息, 服务器还会推送一次, 双重保险;
总结下来是这么几点
- 无论是 DiscoverClient 还是 LoadBanlancer 还是 OpenFeign, 最终都是调用 NamingService 的 selectInstances 方法完成服务发现与订阅;
- 调用 selectInstances 方法时候, 默认 subscribe 参数为 true, 表示要订阅而非只拉取一次;
- selectInstances 首先检查本地缓存有没有, 如果有, 就直接返回
- 如果没有, 开启一个定时任务, 每 6s pull 一次实例信息;
- 通过 gRPC 发起订阅请求;
自动订阅
并没有在服务器启动的时候自动订阅服务; 因为我启动的时候也不知道你到底会用什么服务, 我怎么帮你订阅;
是在使用DiscoveryClient
的 getInstances
方法的时候去调用的NacosNamingService
的订阅方法, 才确定你要订阅什么服务;
服务端处理订阅请求
1.X
Nacos 1.x服务发现| ProcessOn免费在线作图,在线流程图,在线思维导图
2.X
Nacos 2.x服务发现与订阅及通知客户端| ProcessOn免费在线作图,在线流程图,在线思维导图
-
服务端对应的 gRPC 的 Handler处理订阅请求;
-
根据请求构造出订阅对应的 Service 对象, 并且把发起订阅的客户端封装成一个 Subscriber;
-
然后根据请求获取对应的 Client 对象, 将订阅信息到 Client 对象中; 在Client 中, 用一个Map 保存订阅关系, 被订阅的服务作为 key, Subscriber 作为 value ( 订阅者其实就是 Client 自己 );
public abstract class AbstractClient implements Client { protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1); @Override public boolean addServiceSubscriber(Service service, Subscriber subscriber) { if (null == subscribers.put(service, subscriber)) { MetricsMonitor.incrementSubscribeCount(); } return true; } }
-
更新这个 Client 的
LastUpdateTime
, 发布ClientSubscribeServiceEvent
服务订阅事件; -
这个事件将被
ClientServiceIndexesManager
处理, 将新增的订阅关系保存到自己的 Map 中;public class ClientServiceIndexesManager extends SmartSubscriber { private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>(); private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
-
ClientServiceIndexesManager
完成添加以后又发布一个ServiceSubscribedEvent
, 这个事件在NamingSubscriberServiceV2Impl
对象中被处理, 处理逻辑是创建一个延时任务, 延时 0.5S, 然后将被订阅的服务的实例信息推送给订阅方, 如果推送失败, 1S后再尝试;public class PushExecuteTask extends AbstractExecuteTask { public void run() { try { PushDataWrapper wrapper = generatePushData(); for (String each : getTargetClientIds()) { Client client = delayTaskEngine.getClientManager().getClient(each); if (null == client) { // means this client has disconnect continue; } Subscriber subscriber = delayTaskEngine.getClientManager().getClient(each).getSubscriber(service); delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper, new NamingPushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll())); } } catch (Exception e) { Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e); delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L)); } } }
-
最后, 将被订阅的服务的最新的实例信息响应给客户端;
总结下来是这么几点
- 将订阅信息保存到 Client
- 更新 Client 心跳时间
- 将订阅信息保存到 ClientServiceIndexedManager
- 定时任务将被订阅的服务的实例信息推送给订阅者;
心跳与健康检查
临时节点才需要发送心跳;
1.X心跳发送
Nacos 1.x心跳机制与健康检查| ProcessOn免费在线作图,在线流程图,在线思维导图
一句话总结: NacosNamingService 在 registerInstance 方法中开启一个定时任务, 每 5S 发送一次心跳;
- 在 SpringBoot 启动进行服务注册时, 在
NamingSerice
的registerInstance
方法中: - 发起注册请求之前, 会先生成心跳信息
BeatInfo
, 封装当前服务的信息; - 然后在
BeatReactor
中通过线程池ScheduledThreadPoolExecutor
, 做一个周期任务BeatTask
每隔 5S 执行一次; - 在 BeatTask 这个定时任务里, 发送一个 POST 请求, 携带
BeatInfo
;
1.X健康检查
Nacos 1.x心跳机制与健康检查| ProcessOn免费在线作图,在线流程图,在线思维导图
一句话总结: 服务器端每个服务上有一个周期 5S 的定时任务, 超过15秒没有收到心跳, 设置为不健康状态, 能对运维起到警示作用; 超过30S没有收到, 删除对应实例;
-
发送心跳的请求到达
InstanceController
的beat
方法; -
在 beat 方法中, 尝试从缓存中获取这个服务的实例;
-
如果没获取到, 说明是没注册的实例, 收到了其心跳包, 那么会进行服务注册;
调用
ServiceManager
的registerInstance
, 服务端处理注册请求也是用的这个方法;在方法内, 尝试从缓存中获取这个实例对应的服务, 如果没有, 说明是这个服务的第一个实例, 那么创建这个服务对象, 并且在这个服务上去开启一个周期任务, 时间间隔为5S;
这个周期任务会遍历当前服务下的所有实例, 检查他们的 LastBeat 时间, 如果距离当前已经过去15S, 标记为不健康, 如果已经过去30S, 构造一个DELETE请求, 发给自己, 删除实例;
-
如果获取到了实例, 那么更新这个实例的最新心跳时间 (LastBeat) 为当前时间(服务器端的时间), 所以服务器和客户端的时钟要同步;
2.X心跳发送
Nacos 2.x心跳机制与健康检查| ProcessOn免费在线作图,在线流程图,在线思维导图
一句话总结: registerInstance 方法内需要创建 GrpcClientProxy 来发送gRPC请求, 在其构造方法中开启定时任务, 5S 发送一次心跳
-
在 2.X 版本中,
registerInstance
方法默认使用的 gRPC 的方式与 Nacos Server 通信, 通过 gRPC 长连接(GrpcSdkClient ), 发送一个 InstanceRequest, 注册当前服务; -
在
registerInstance
方法内部, 会创建一个NamingGrpcClientProxy
对象, 用这个对象去发送注册服务的请求;在这个对象的构造函数中, 调用
start()
方法来让这个NamingGrpcClientProxy
运行起来;方法内会用
ScheduledExecutorService
创建一个任务, 每 5S 发送一次心跳这个任务是个死循环, 通过调用一个阻塞队列的poll方法来实现周期性唤醒, poll方法的超时时间被设置为5S;
这是个很巧妙的设计:
这个阻塞队列的大小被设置为1; 那么一般情况下每隔5S这个任务被唤醒, 发送一个心跳请求;
但是这个任务不止做了发送心跳这一件事; 他还负责异步地切换Nacos服务器: 当客户端发现当前连接的 Nacos 服务器不可用或者响应异常时,需要切换到其他可用的 Nacos 服务器;
客户端发现 Nacos服务器不可用的时候, 就会往这个阻塞队列里放一个对象; 这样, 这个任务就会被立刻唤醒;
这可能导致两次唤醒之间的时间间隔小于 5S, 只需要记录上次发送的时间, 然后在发送心跳的逻辑上加个判断, 距离上次是否超过 5S, 就可以了;
2.X健康检查
2.X 版本服务器对心跳请求的处理很简单, 就是更新对应的客户端的活跃时间; 健康检查的重点是在周期任务;
Nacos 2.x心跳机制与健康检查| ProcessOn免费在线作图,在线流程图,在线思维导图
一句话总结: 服务器有定时任务 3S 执行一次, 超过 20S 没有心跳的 Client, 主动发送探查请求, 没有响应就删除
-
在服务器启动时, 在 ConnectionManager.start() 方法中, 创建一个周期任务, 这个周期任务每3S执行一次, 在周期任务中:
-
检查 Client 的上次活跃时间, 如果已经过去20S以上, 将会把对应的 Client 的 ID 放到过期集合中; 控制台里过期的会变红;
-
遍历过期集合中的 ClientId, 主动发送一个 ClientDetectionRequest;
-
如果有响应, 更新其活跃时间, 并将对应的 ClientId 放到一个成功集合中 ;
-
再次遍历过期集合, 删除其中不在成功集合中的 Client;
-
发布Disconnect事件, 调用所有订阅者的回调函数; 观察者模式;
public void notifyClientDisConnected(final Connection connection) { for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) clientConnectionEventListener.clientDisConnected(connection); }