上一篇讲到客户端发送请求到服务端进行服务注册,注册后,服务端会发出两个事件,第一个事件会触发另一个ServiceChangedEvent,这个事件被com.alibaba.nacos.naming.push.v2.NamingSubscriberServiceV2Impl#onEvent
监听,监听到后将该客户端最新的信息推送给订阅该客户端的其他客户端。
这个推送是怎么完成的?
首先,客户端必须告诉服务端它订阅了哪些客户端
试了下貌似是A首次调用B之后,之后A中定时执行的UpdateTask才会去拉取被调用的B服务的信息,意思是A调用一次B之后就认为订阅了这个服务。
比如我这里ruoyi system的定时拉取ruoyi file的信息
此时ruoyi file的端口是3000
此时如果我停掉ruoyi file 并更改下端口重启,ruoyi system下次拉取到的应该会变,但实际上是我一停止ruoyi file, ruoyi system就会收到来自服务端的通知,收到通知后,依然会有UpdateTask去拉取ruoyi file的信息
ruoyi file的端口改成9301之后重启,ruoyi system会再次收到来自服务端的推送
ruoyi system的下一次UpdateTask就会获取到ruoyi file新的端口
此时我如果再次将ruoyi file停掉,服务端应该会产生一个ServiceChangedEvent并且被com.alibaba.nacos.naming.push.v2.NamingSubscriberServiceV2Impl#onEvent监听到,
最终向订阅ruoyi file的其他客户端推送消息是在
com.alibaba.nacos.naming.push.v2.task.PushExecuteTask#run
@Override
public void run() {
try {
PushDataWrapper wrapper = generatePushData();
ClientManager clientManager = delayTaskEngine.getClientManager();
//或者需要推送给哪些客户端
for (String each : getTargetClientIds()) {
Client client = clientManager.getClient(each);
if (null == client) {
// means this client has disconnect
continue;
}
Subscriber subscriber = clientManager.getClient(each).getSubscriber(service);
//推送
delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
new ServicePushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));
}
} catch (Exception e) {
Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e);
delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));
}
}
com.alibaba.nacos.naming.push.v2.executor.PushExecutorRpcImpl#doPushWithCallback
com.alibaba.nacos.core.remote.RpcPushService#pushWithCallback
可以发现,不管被订阅者上线或者下线,都会给订阅者推送,区别就是request中的ServiceInfo的hosts中存的IP端口变了,我这只有一个实例,现在后hosts就是空的
被订阅者上线,通知订阅者,hosts属性中带上了IP和端口
总结
服务端接收到服务注册请求后,发布了ClientRegisterServiceEvent客户端注册事件
ClientRegisterServiceEvent事件被ClientServiceIndexesManager订阅后发布ServiceChangedEvent服务变更事件
ServiceChangedEvent被NamingSubscriberServiceV2Impl订阅并创建PushDelayTask并被PushExecuteTask执行,负责向订阅该服务的客户端发起推送serviceInfo请求
推送的请求被NamingPushRequestHandler处理并发布InstancesChangeEvent,最终回调AbstractEventListener
————————————————
版权声明:本文为CSDN博主「宽仔的代码之路」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/guntun8987/article/details/125570748