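Table of Contents
- Preface
- I. Prelude:
- II. Establishing the client connection:
- 2.1 Creating NacosNamingService:
- 2.2 Initializing NacosNamingService:
- 2.3 NamingClientProxyDelegate: establishing the long-lived connection:
- 2.3.1 Creating the gRPC proxy object:
- 2.3.2 NamingGrpcClientProxy (gRPC):
- 2.3.2.1 createClient: creating the client:
- 2.3.2.2 start: establishing the long-lived connection:
- III. Registering the client instance:
- 3.1 NamingGrpcClientProxy#registerService:
- 3.2 Sending the registration request from the client:
- Summary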
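Preface
This article walks through how the Nacos client establishes a long-lived connection with the server at startup. Environment: client version 2.2.1, server version 3.0.13.
I. Prelude:
The gRPC channel between client and server is actually established during client instance registration: registering means sending a request to the server, so the gRPC channel has to be set up first. The following briefly describes the client instance registration flow.
Flow chart:
Explanation of the steps:
- Once the web application hosting the client has finished starting, it publishes a WebServerInitializedEvent;
- The onApplicationEvent method of AbstractAutoServiceRegistration receives the event and calls start();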
public void onApplicationEvent(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (!(context instanceof ConfigurableWebServerApplicationContext) || !"management".equals(((ConfigurableWebServerApplicationContext)context).getServerNamespace())) {
// Local web port
this.port.compareAndSet(0, event.getWebServer().getPort());
// Invoke the initialization method
this.start();
}
}
The start() method:
public void start() {
if (!this.isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Discovery Lifecycle disabled. Not starting");
}
} else {
if (!this.running.get()) {
// Publish the pre-registration event
this.context.publishEvent(new InstancePreRegisteredEvent(this, this.getRegistration()));
this.registrationLifecycles.forEach((registrationLifecycle) -> {
registrationLifecycle.postProcessBeforeStartRegister(this.getRegistration());
});
// Call the instance registration method
this.register();
this.registrationLifecycles.forEach((registrationLifecycle) -> {
registrationLifecycle.postProcessAfterStartRegister(this.getRegistration());
});
if (this.shouldRegisterManagement()) {
this.registrationManagementLifecycles.forEach((registrationManagementLifecycle) -> {
registrationManagementLifecycle.postProcessBeforeStartRegisterManagement(this.getManagementRegistration());
});
this.registerManagement();
this.registrationManagementLifecycles.forEach((registrationManagementLifecycle) -> {
registrationManagementLifecycle.postProcessAfterStartRegisterManagement(this.getManagementRegistration());
});
}
// Publish the registration-completed event
this.context.publishEvent(new InstanceRegisteredEvent(this, this.getConfiguration()));
this.running.compareAndSet(false, true);
}
}
}
- The register() method of AbstractAutoServiceRegistration calls the register method of NacosServiceRegistry;
- namingService.registerInstance registers the client;
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
} else {
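// Obtain the NamingService; this is where the client's long-lived connection is established
NamingService namingService = this.namingService();
String serviceId = registration.getServiceId();
String group = this.nacosDiscoveryProperties.getGroup();
// Build the client instance object
Instance instance = this.getNacosInstanceFromRegistration(registration);
try {
// Send the client instance registration request over gRPC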
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", new Object[]{group, serviceId, instance.getIp(), instance.getPort()});
} catch (Exception var7) {
if (this.nacosDiscoveryProperties.isFailFast()) {
log.error("nacos registry, {} register failed...{},", new Object[]{serviceId, registration.toString(), var7});
ReflectionUtils.rethrowRuntimeException(var7);
} else {
log.warn("Failfast is false. {} register failed...{},", new Object[]{serviceId, registration.toString(), var7});
}
}
}
}
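The startup flow above (web server ready, then start(), then register()) can be sketched without Spring as follows. This is only an illustration under stated assumptions: AutoRegistrationSketch and its methods are hypothetical stand-ins, not the Spring Cloud classes, and the "events" are just strings recorded in a list. It shows the two guards the original code relies on: the port is set once via compareAndSet, and the running flag keeps registration from happening twice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for AbstractAutoServiceRegistration; not the Spring Cloud class. */
class AutoRegistrationSketch {
    final AtomicInteger port = new AtomicInteger(0);
    final AtomicBoolean running = new AtomicBoolean(false);
    final List<String> trace = new ArrayList<>();

    /** Called when the (simulated) web server has finished starting. */
    void onWebServerInitialized(int webServerPort) {
        // Record the port only if it has not been set yet.
        port.compareAndSet(0, webServerPort);
        start();
    }

    void start() {
        if (running.get()) {
            return; // already registered, later events are ignored
        }
        trace.add("pre-registered");          // stands in for InstancePreRegisteredEvent
        trace.add("register:" + port.get());  // stands in for register()
        trace.add("registered");              // stands in for InstanceRegisteredEvent
        running.compareAndSet(false, true);
    }

    public static void main(String[] args) {
        AutoRegistrationSketch registration = new AutoRegistrationSketch();
        registration.onWebServerInitialized(8080);
        registration.onWebServerInitialized(9090); // second event changes nothing
        System.out.println(registration.trace + " port=" + registration.port.get());
    }
}
```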
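II. Establishing the client connection:
NamingService namingService = this.namingService();
This single line does a lot of work. The parts to focus on are the connection setup between client and server and the client's failover mechanism; the rest of this article covers the connection setup first.
2.1 Creating NacosNamingService: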
public static NamingService createNamingService(Properties properties) throws NacosException {
try {
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
// Invoke the NacosNamingService constructor, passing in the configuration properties
Constructor constructor = driverImplClass.getConstructor(Properties.class);
return (NamingService) constructor.newInstance(properties);
} catch (Throwable e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
}
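The reflective creation above follows a common pattern: load the implementation by class name, look up its (Properties) constructor, and instantiate it. A minimal self-contained sketch of that pattern is shown below. DemoService, DemoServiceImpl and the "serverAddr" default are hypothetical names used only for illustration; they are not part of the Nacos API.

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

class ReflectiveFactorySketch {
    /** Hypothetical service interface standing in for NamingService. */
    interface DemoService {
        String serverAddr();
    }

    /** Hypothetical implementation; it must expose a public (Properties) constructor. */
    public static class DemoServiceImpl implements DemoService {
        private final String serverAddr;

        public DemoServiceImpl(Properties properties) {
            this.serverAddr = properties.getProperty("serverAddr", "127.0.0.1:8848");
        }

        @Override
        public String serverAddr() {
            return serverAddr;
        }
    }

    /** Loads the class by name and calls its (Properties) constructor reflectively. */
    static DemoService create(String implClassName, Properties properties) {
        try {
            Class<?> implClass = Class.forName(implClassName);
            Constructor<?> constructor = implClass.getConstructor(Properties.class);
            return (DemoService) constructor.newInstance(properties);
        } catch (ReflectiveOperationException e) {
            // The original wraps failures in NacosException; a runtime exception is used here.
            throw new IllegalStateException("cannot create " + implClassName, e);
        }
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("serverAddr", "localhost:8848");
        DemoService service = create(DemoServiceImpl.class.getName(), properties);
        System.out.println(service.serverAddr());
    }
}
```

Loading by name keeps the caller free of a compile-time dependency on the implementation class, at the cost of moving errors (missing class, missing constructor) to run time.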
2.2 Initializing NacosNamingService:
public NacosNamingService(Properties properties) throws NacosException {
init(properties);
}
private void init(Properties properties) throws NacosException {
PreInitUtils.asyncPreLoadCostComponent();
// Custom properties
final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(properties);
ValidatorUtils.checkInitParam(nacosClientProperties);
// Namespace
this.namespace = InitUtils.initNamespaceForNaming(nacosClientProperties);
InitUtils.initSerialization();
InitUtils.initWebRootContext(nacosClientProperties);
// Set the log name property
initLogName(nacosClientProperties);
this.notifierEventScope = UUID.randomUUID().toString();
// InstancesChangeNotifier extends Subscriber<InstancesChangeEvent>
// Subscriber for InstancesChangeEvent (instance change events); when a change occurs, InstancesChangeNotifier is called back (onEvent)
this.changeNotifier = new InstancesChangeNotifier(this.notifierEventScope);
// Register the event publisher
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
// Register the subscriber
NotifyCenter.registerSubscriber(changeNotifier);
// Service info holder (used for failover)
this.serviceInfoHolder = new ServiceInfoHolder(namespace, this.notifierEventScope, nacosClientProperties);
// Client proxy
this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties,
changeNotifier);
}
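The publisher/subscriber registration in init can be pictured with a very small event bus. The sketch below is an assumption-laden simplification, not NotifyCenter itself: EventBusSketch and ChangeEvent are hypothetical, delivery is synchronous (no bounded queue or publisher thread), and the scope check inside the handler only mimics the idea that a notifier reacts to events carrying its own notifierEventScope.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class EventBusSketch {
    /** Hypothetical event; scope plays the role of notifierEventScope. */
    static final class ChangeEvent {
        final String scope;
        final String serviceName;

        ChangeEvent(String scope, String serviceName) {
            this.scope = scope;
            this.serviceName = serviceName;
        }
    }

    private final Map<Class<?>, List<Consumer<Object>>> subscribers = new ConcurrentHashMap<>();

    /** Registers a handler for one event type. */
    <T> void registerSubscriber(Class<T> eventType, Consumer<T> handler) {
        subscribers.computeIfAbsent(eventType, key -> new CopyOnWriteArrayList<>())
                .add(event -> handler.accept(eventType.cast(event)));
    }

    /** Delivers the event synchronously to every handler registered for its exact class. */
    void publish(Object event) {
        List<Consumer<Object>> handlers =
                subscribers.getOrDefault(event.getClass(), Collections.emptyList());
        for (Consumer<Object> handler : handlers) {
            handler.accept(event);
        }
    }

    public static void main(String[] args) {
        EventBusSketch bus = new EventBusSketch();
        String myScope = "scope-a";
        List<String> seen = new ArrayList<>();
        bus.registerSubscriber(ChangeEvent.class, event -> {
            if (myScope.equals(event.scope)) { // ignore events from other scopes
                seen.add(event.serviceName);
            }
        });
        bus.publish(new ChangeEvent("scope-a", "order-service"));
        bus.publish(new ChangeEvent("scope-b", "user-service"));
        System.out.println(seen);
    }
}
```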
2.3 NamingClientProxyDelegate: establishing the long-lived connection:
2.3.1 Creating the gRPC proxy object:
public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder,
NacosClientProperties properties, InstancesChangeNotifier changeNotifier) throws NacosException {
// Service info update service
this.serviceInfoUpdateService = new ServiceInfoUpdateService(properties, serviceInfoHolder, this,
changeNotifier);
// Server address list manager
this.serverListManager = new ServerListManager(properties, namespace);
this.serviceInfoHolder = serviceInfoHolder;
this.securityProxy = new SecurityProxy(this.serverListManager.getServerList(),
NamingHttpClientManager.getInstance().getNacosRestTemplate());
initSecurityProxy(properties);
// HTTP proxy
this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties);
// gRPC proxy
this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties,
serviceInfoHolder);
}
2.3.2 NamingGrpcClientProxy (gRPC):
public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,
NacosClientProperties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
super(securityProxy);
this.namespaceId = namespaceId;
this.uuid = UUID.randomUUID().toString();
// Request timeout
this.requestTimeout = Long.parseLong(properties.getProperty(CommonParams.NAMING_REQUEST_TIMEOUT, "-1"));
Map<String, String> labels = new HashMap<>();
// Source label: SDK
labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
// Module label: naming
labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);
labels.put(Constants.APPNAME, AppNameUtils.getAppName());
// Create the RPC client
this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels,
RpcClientTlsConfig.properties(properties.asProperties()));
this.redoService = new NamingGrpcRedoService(this, properties);
NAMING_LOGGER.info("Create naming rpc client for uuid->{}", uuid);
start(serverListFactory, serviceInfoHolder);
}
2.3.2.1 createClient: creating the client:
public static RpcClient createClient(String clientName, ConnectionType connectionType, Integer threadPoolCoreSize,
Integer threadPoolMaxSize, Map<String, String> labels, RpcClientTlsConfig tlsConfig) {
// Throw if the connection type is not gRPC
if (!ConnectionType.GRPC.equals(connectionType)) {
throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
}
// Create the client and cache it in Map<String, RpcClient> CLIENT_MAP
return CLIENT_MAP.computeIfAbsent(clientName, clientNameInner -> {
LOGGER.info("[RpcClientFactory] create a new rpc client of " + clientName);
return new GrpcSdkClient(clientNameInner, threadPoolCoreSize, threadPoolMaxSize, labels, tlsConfig);
});
}
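The effect of CLIENT_MAP.computeIfAbsent is that each client name maps to exactly one client object, even when createClient is called repeatedly. A minimal sketch of that caching behaviour, with a hypothetical DemoClient in place of GrpcSdkClient:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ClientCacheSketch {
    /** Hypothetical client type standing in for GrpcSdkClient. */
    static final class DemoClient {
        final String name;

        DemoClient(String name) {
            this.name = name;
        }
    }

    private static final Map<String, DemoClient> CLIENT_MAP = new ConcurrentHashMap<>();

    /** Returns the cached client for this name, creating it only on the first call. */
    static DemoClient createClient(String clientName) {
        return CLIENT_MAP.computeIfAbsent(clientName, DemoClient::new);
    }

    public static void main(String[] args) {
        DemoClient first = createClient("naming-client");
        DemoClient second = createClient("naming-client");
        DemoClient other = createClient("config-client");
        System.out.println(first == second); // same name, same instance
        System.out.println(first == other);  // different name, different instance
    }
}
```

On a ConcurrentHashMap the mapping function runs at most once per key, so concurrent callers with the same name still end up sharing one client.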
2.3.2.2 start: establishing the long-lived connection:
private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
// Server address list factory
rpcClient.serverListFactory(serverListFactory);
// Register the connection listener
rpcClient.registerConnectionListener(redoService);
// Register the server request handler
rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
// Start the client
rpcClient.start();
NotifyCenter.registerSubscriber(this);
}
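rpcClient.start() mainly does three things (the implementation details are covered in a follow-up article):
- Establishing the channel between client and server:
1) Create the communication channel to the Nacos server and build the bidirectional-stream gRPC stub;
2) Send a server check request and obtain the connectionId from the Nacos server;
3) Send the server a request confirming that the connection between client and server is established;
- Heartbeat monitoring between client and server:
1) In a while(true) loop, send a healthCheck() request; true means the heartbeat is alive (continue with the next iteration), false means the heartbeat is lost;
2) If the heartbeat is lost, mark the client as unhealthy instead of healthy;
3) Try to re-establish the connection to the Nacos server through the reconnect method;
- Reconnection after a disconnect:
1) Try to re-establish the connection to the Nacos server through connectToServer;
2) On success, mark the old connection as unusable, close it and release its resources, then publish a new connection-established event to the eventLinkedBlockingQueue queue;
3) On failure, increase the interval between connection attempts to the Nacos server;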
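The last reconnection step, widening the interval after each failed attempt, can be sketched as a capped linear backoff. This is a hypothetical illustration only: the step size (100 ms), the cap (5000 ms) and the names delayMillis and reconnect are assumptions of this sketch, not values or methods taken from the Nacos client. To stay deterministic, the sketch records the delays instead of sleeping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

class ReconnectBackoffSketch {
    /** Delay before the next attempt: grows by stepMillis per failure, capped at maxMillis. */
    static long delayMillis(int failedAttempts, long stepMillis, long maxMillis) {
        return Math.min(failedAttempts * stepMillis, maxMillis);
    }

    /**
     * Tries to connect up to maxAttempts times and returns the delays that would be
     * waited after each failure (a real client would sleep for each of them).
     */
    static List<Long> reconnect(BooleanSupplier connectToServer, int maxAttempts) {
        List<Long> waits = new ArrayList<>();
        int failed = 0;
        while (failed < maxAttempts) {
            if (connectToServer.getAsBoolean()) {
                return waits; // connected, stop retrying
            }
            failed++;
            waits.add(delayMillis(failed, 100L, 5_000L));
        }
        return waits;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated server: the first two attempts fail, the third succeeds.
        List<Long> waits = reconnect(() -> ++calls[0] >= 3, 10);
        System.out.println(waits); // prints [100, 200]
    }
}
```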
/**
* Start this client.
*/
public final void start() throws NacosException {
// CAS state transition (optimistic locking): INITIALIZED -> STARTING
boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
if (!success) {
return;
}
// Create the clientEventExecutor thread pool with 2 core threads
clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.remote.worker");
t.setDaemon(true);
return t;
});
// connection event consumer.
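// Task 1: consume connection events from eventLinkedBlockingQueue and notify listeners when the connection is established or lost (for example, when a Nacos server restart is detected by the heartbeat check and the client reconnects)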
clientEventExecutor.submit(() -> {
while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
ConnectionEvent take;
try {
take = eventLinkedBlockingQueue.take();
if (take.isConnected()) {
notifyConnected();
} else if (take.isDisConnected()) {
notifyDisConnected();
}
} catch (Throwable e) {
// Do nothing
}
}
});
clientEventExecutor.submit(() -> {
while (true) {
try {
if (isShutdown()) {
break;
}
// reconnectionSignal: the reconnect queue
ReconnectContext reconnectContext = reconnectionSignal
.poll(rpcClientConfig.connectionKeepAlive(), TimeUnit.MILLISECONDS);
if (reconnectContext == null) {
// Nothing was polled from the reconnect queue, so no reconnect has been requested
// check alive time: once the keep-alive interval has elapsed, send a healthCheck request again
if (System.currentTimeMillis() - lastActiveTimeStamp >= rpcClientConfig.connectionKeepAlive()) {
boolean isHealthy = healthCheck();
if (!isHealthy) {
// The health check failed
if (currentConnection == null) {
continue;
}
LoggerUtils.printIfInfoEnabled(LOGGER,
"[{}] Server healthy check fail, currentConnection = {}",
rpcClientConfig.name(), currentConnection.getConnectionId());
RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
break;
}
// Mark the client as UNHEALTHY
boolean statusFLowSuccess = RpcClient.this.rpcClientStatus
.compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
if (statusFLowSuccess) {
// The server may have failed, so leave ServerInfo null (no recommended server)
reconnectContext = new ReconnectContext(null, false);
} else {
continue;
}
} else {
lastActiveTimeStamp = System.currentTimeMillis();
continue;
}
} else {
continue;
}
}
if (reconnectContext.serverInfo != null) {
// On a connection reset, check the recommended server's IP and port against the server list
// clear recommend server if server is not in server list.
boolean serverExist = false;
for (String server : getServerListFactory().getServerList()) {
ServerInfo serverInfo = resolveServerInfo(server);
if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
serverExist = true;
reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
break;
}
}
if (!serverExist) {
LoggerUtils.printIfInfoEnabled(LOGGER,
"[{}] Recommend server is not in server list, ignore recommend server {}",
rpcClientConfig.name(), reconnectContext.serverInfo.getAddress());
reconnectContext.serverInfo = null;
}
}
// Reconnect to the server
reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
} catch (Throwable throwable) {
// Do nothing
}
}
});
// connect to server, try to connect to server sync retryTimes times, async starting if failed.
// First connection attempt to the Nacos server at client startup
Connection connectToServer = null;
rpcClientStatus.set(RpcClientStatus.STARTING);
int startUpRetryTimes = rpcClientConfig.retryTimes();
// Retry while attempts remain and no connection has been made
while (startUpRetryTimes > 0 && connectToServer == null) {
try {
startUpRetryTimes--;
// Get a Nacos server address from the server list
ServerInfo serverInfo = nextRpcServer();
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to connect to server on start up, server: {}",
rpcClientConfig.name(), serverInfo);
// Connect to the server
connectToServer = connectToServer(serverInfo);
} catch (Throwable e) {
LoggerUtils.printIfWarnEnabled(LOGGER,
"[{}] Fail to connect to server on start up, error message = {}, start up retry times left: {}",
rpcClientConfig.name(), e.getMessage(), startUpRetryTimes, e);
}
}
if (connectToServer != null) {
LoggerUtils
.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up, connectionId = {}",
rpcClientConfig.name(), connectToServer.serverInfo.getAddress(),
connectToServer.getConnectionId());
// Connected: put a CONNECTED event into eventLinkedBlockingQueue for consumption
this.currentConnection = connectToServer;
rpcClientStatus.set(RpcClientStatus.RUNNING);
eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
} else {
// Failed: put a reconnect context into the reconnectionSignal queue; consuming that queue enters the reconnect logic
switchServerAsync();
}
// Register the connection reset handler
registerServerRequestHandler(new ConnectResetRequestHandler());
// register client detection request.
registerServerRequestHandler(request -> {
if (request instanceof ClientDetectionRequest) {
return new ClientDetectionResponse();
}
return null;
});
}
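The second task in start() uses one blocking call for two purposes: poll with a timeout waits for an explicit reconnect signal, and a timeout with no signal doubles as the heartbeat timer. A simplified single-iteration sketch of that idea follows. KeepAliveLoopSketch, Action and step are hypothetical names, the reconnect context is reduced to a server address string, and the lastActiveTimeStamp check from the original is left out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

class KeepAliveLoopSketch {
    enum Action { RECONNECT_REQUESTED, STILL_HEALTHY, RECONNECT_AFTER_FAILED_CHECK, INTERRUPTED }

    /** Hypothetical stand-in for the reconnectionSignal queue; the String is a server address. */
    final BlockingQueue<String> reconnectionSignal = new LinkedBlockingQueue<>();

    /** One loop iteration: wait for a reconnect signal, otherwise run the health check. */
    Action step(long keepAliveMillis, BooleanSupplier healthCheck) {
        String signal;
        try {
            signal = reconnectionSignal.poll(keepAliveMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return Action.INTERRUPTED;
        }
        if (signal != null) {
            return Action.RECONNECT_REQUESTED; // someone asked for a reconnect explicitly
        }
        // Timed out without a signal: use the wake-up as a heartbeat tick.
        return healthCheck.getAsBoolean() ? Action.STILL_HEALTHY : Action.RECONNECT_AFTER_FAILED_CHECK;
    }

    public static void main(String[] args) {
        KeepAliveLoopSketch loop = new KeepAliveLoopSketch();
        System.out.println(loop.step(10, () -> true));
        System.out.println(loop.step(10, () -> false));
        loop.reconnectionSignal.offer("127.0.0.1:9848");
        System.out.println(loop.step(10, () -> true));
    }
}
```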
III. Registering the client instance:
Once the communication channel with the server is established, the Nacos client instance can be registered through namingService.registerInstance(serviceId, group, instance).
3.1 NamingGrpcClientProxy#registerService:
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
instance);
// Put the instance into ConcurrentMap<String, InstanceRedoData> registeredInstances; key: groupName@@serviceName
redoService.cacheInstanceForRedo(serviceName, groupName, instance);
// Send the registration request to the Nacos server, then mark the InstanceRedoData as registered
doRegisterService(serviceName, groupName, instance);
}
public void cacheInstanceForRedo(String serviceName, String groupName, Instance instance) {
// key: groupName@@serviceName
String key = NamingUtils.getGroupedName(serviceName, groupName);
// Build the redo data for the client instance and cache it in registeredInstances (initial state: not registered)
InstanceRedoData redoData = InstanceRedoData.build(serviceName, groupName, instance);
synchronized (registeredInstances) {
registeredInstances.put(key, redoData);
}
}
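The point of caching the instance before sending the request is that the client always knows which registrations still need to be (re)sent, for example after a reconnect. A minimal sketch of such a redo cache is given below. RedoCacheSketch is hypothetical: the instance is reduced to a string, and onDisconnected and findInstancesToRedo are names invented for this illustration rather than the Nacos redo service API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RedoCacheSketch {
    /** Hypothetical redo record: the instance plus whether the server has accepted it. */
    static final class RedoData {
        final String instance;
        volatile boolean registered;

        RedoData(String instance) {
            this.instance = instance;
        }
    }

    private final Map<String, RedoData> registeredInstances = new ConcurrentHashMap<>();

    /** Key format described in the article: groupName@@serviceName. */
    static String key(String serviceName, String groupName) {
        return groupName + "@@" + serviceName;
    }

    /** Step 1: cache the instance as "not registered" before any request is sent. */
    void cacheInstanceForRedo(String serviceName, String groupName, String instance) {
        registeredInstances.put(key(serviceName, groupName), new RedoData(instance));
    }

    /** Step 2: after the server accepted the request, mark the record as registered. */
    void instanceRegistered(String serviceName, String groupName) {
        RedoData data = registeredInstances.get(key(serviceName, groupName));
        if (data != null) {
            data.registered = true;
        }
    }

    /** Assumed behaviour for this sketch: a lost connection invalidates every registration. */
    void onDisconnected() {
        registeredInstances.values().forEach(data -> data.registered = false);
    }

    /** Keys whose registration must be sent (again), in sorted order. */
    List<String> findInstancesToRedo() {
        List<String> keys = new ArrayList<>();
        registeredInstances.forEach((key, data) -> {
            if (!data.registered) {
                keys.add(key);
            }
        });
        Collections.sort(keys);
        return keys;
    }

    public static void main(String[] args) {
        RedoCacheSketch cache = new RedoCacheSketch();
        cache.cacheInstanceForRedo("order-service", "DEFAULT_GROUP", "10.0.0.1:8080");
        cache.instanceRegistered("order-service", "DEFAULT_GROUP");
        System.out.println(cache.findInstancesToRedo()); // nothing pending
        cache.onDisconnected();
        System.out.println(cache.findInstancesToRedo()); // the instance is pending again
    }
}
```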
3.2 Sending the registration request from the client:
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
NamingRemoteConstants.REGISTER_INSTANCE, instance);
// Send the registration request to the server
requestToServer(request, Response.class);
// Request succeeded: mark this service instance as registered
redoService.instanceRegistered(serviceName, groupName);
}
requestToServer(request, Response.class);
private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass)
throws NacosException {
try {
request.putAllHeader(
getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
// Send the InstanceRequest through rpcClient over the established channel
Response response =
requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
throw new NacosException(response.getErrorCode(), response.getMessage());
}
if (responseClass.isAssignableFrom(response.getClass())) {
return (T) response;
}
NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'",
response.getClass().getName(), responseClass.getName());
} catch (NacosException e) {
throw e;
} catch (Exception e) {
throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
}
throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
}
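requestToServer performs two checks before handing the response back: the result code must indicate success, and the concrete response class must be compatible with the type the caller asked for. A small sketch of those two checks follows. Response, InstanceResponse, the SUCCESS value of 200 and the use of IllegalStateException are all assumptions of this sketch, chosen to keep it self-contained.

```java
class TypedResponseSketch {
    static final int SUCCESS = 200; // assumed success code for this sketch

    /** Hypothetical base response type. */
    static class Response {
        final int resultCode;
        final String message;

        Response(int resultCode, String message) {
            this.resultCode = resultCode;
            this.message = message;
        }
    }

    /** Hypothetical more specific response type. */
    static class InstanceResponse extends Response {
        InstanceResponse(int resultCode, String message) {
            super(resultCode, message);
        }
    }

    /** Rejects failed responses, then returns the response as the requested type. */
    static <T extends Response> T checked(Response response, Class<T> expectedType) {
        if (response.resultCode != SUCCESS) {
            throw new IllegalStateException("server error: " + response.message);
        }
        if (!expectedType.isAssignableFrom(response.getClass())) {
            throw new IllegalStateException("unexpected response type "
                    + response.getClass().getName() + ", expected " + expectedType.getName());
        }
        return expectedType.cast(response);
    }

    public static void main(String[] args) {
        Response raw = new InstanceResponse(SUCCESS, "ok");
        InstanceResponse typed = checked(raw, InstanceResponse.class);
        System.out.println(typed.message);
    }
}
```

Using expectedType.cast instead of an unchecked (T) cast means a mismatch would fail at the cast itself rather than later at the call site.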
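Summary
After the client application starts successfully, it publishes a WebServerInitializedEvent. The Nacos client then creates the communication channel to the server, sends a server check request and, on a normal response, obtains the connection id. It creates the bidirectional-stream gRPC stub and sends the request confirming that the connection is established. After that, it sends the client instance registration request to the Nacos server.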