In the previous article we mainly focused on some of the code around the gRPC design of Nacos service registration on the server side. In this article we dig into the registration logic itself and walk through it in more detail to get a fuller picture of the process.
NamingGrpcClientProxy.registerService
Let's start from the registerService method of NamingGrpcClientProxy. If you are not sure why we start here, please see the previous article.
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
            instance);
    redoService.cacheInstanceForRedo(serviceName, groupName, instance);
    doRegisterService(serviceName, groupName, instance);
}
1.cacheInstanceForRedo
Let's look at the implementation of redoService.cacheInstanceForRedo(serviceName, groupName, instance):
private final ConcurrentMap<String, InstanceRedoData> registeredInstances = new ConcurrentHashMap<>();

public void cacheInstanceForRedo(String serviceName, String groupName, Instance instance) {
    String key = NamingUtils.getGroupedName(serviceName, groupName);
    InstanceRedoData redoData = InstanceRedoData.build(serviceName, groupName, instance);
    synchronized (registeredInstances) {
        registeredInstances.put(key, redoData);
    }
}
As you can see, this simply puts the current service instance into a map keyed by the grouped service name; the map represents the instances this client has registered.
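To make the cache a bit more tangible, here is a minimal sketch of the idea, not the real InstanceRedoData class. The class and field names are my own, and I am assuming the key produced by NamingUtils.getGroupedName has the usual "group@@service" shape:

// Illustrative sketch only -- not Nacos source. Assumes the grouped-name key format "group@@service".
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class RedoCacheSketch {

    // What this client wants to be registered, keyed by grouped service name.
    final ConcurrentMap<String, InstanceRedoDataSketch> registeredInstances = new ConcurrentHashMap<>();

    void cacheInstanceForRedo(String serviceName, String groupName, Object instance) {
        String key = groupName + "@@" + serviceName;   // assumed key format
        registeredInstances.put(key, new InstanceRedoDataSketch(serviceName, groupName, instance));
    }

    // Minimal stand-in for InstanceRedoData: the payload to replay plus a "registered" flag.
    static class InstanceRedoDataSketch {
        final String serviceName;
        final String groupName;
        final Object instance;
        volatile boolean registered;   // flipped to true once the server confirms the registration

        InstanceRedoDataSketch(String serviceName, String groupName, Object instance) {
            this.serviceName = serviceName;
            this.groupName = groupName;
            this.instance = instance;
        }
    }
}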
2.doRegisterService
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
    // build the request object
    InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
            NamingRemoteConstants.REGISTER_INSTANCE, instance);
    requestToServer(request, Response.class);
    redoService.instanceRegistered(serviceName, groupName);
}
requestToServer is obviously the code that actually asks the server to create the client instance. The next line, redoService.instanceRegistered(serviceName, groupName), should look familiar: it is the same redoService we saw above. cacheInstanceForRedo added the current client instance to a map, and instanceRegistered marks that entry as registered (this.registered = true).
From a consistency point of view this ordering is safe: if requestToServer runs into any unexpected case it throws an exception, so redoService.instanceRegistered is never executed and no inconsistency can arise.
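Why keep a registered flag at all? The name redoService hints at the answer: a background task can scan the cache and replay any registration that never completed, for example after a reconnect. Below is only my simplified sketch of that idea, reusing the hypothetical InstanceRedoDataSketch type from above; it is not the actual NamingGrpcRedoService code:

// Simplified sketch of the redo idea -- not Nacos source. "reRegister" is a hypothetical
// callback standing in for whatever actually replays the registration request.
import java.util.Collection;
import java.util.function.Consumer;

class RedoTaskSketch implements Runnable {

    private final Collection<RedoCacheSketch.InstanceRedoDataSketch> redoData;
    private final Consumer<RedoCacheSketch.InstanceRedoDataSketch> reRegister;

    RedoTaskSketch(Collection<RedoCacheSketch.InstanceRedoDataSketch> redoData,
            Consumer<RedoCacheSketch.InstanceRedoDataSketch> reRegister) {
        this.redoData = redoData;
        this.reRegister = reRegister;
    }

    @Override
    public void run() {
        for (RedoCacheSketch.InstanceRedoDataSketch data : redoData) {
            if (!data.registered) {
                // Cached but never confirmed by the server: replay the registration.
                reRegister.accept(data);
            }
        }
    }
}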
OK, next we only need to look at requestToServer.
private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass)
        throws NacosException {
    try {
        request.putAllHeader(
                getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
        // use the rpcClient to call the server
        Response response =
                requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
        if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
            throw new NacosException(response.getErrorCode(), response.getMessage());
        }
        if (responseClass.isAssignableFrom(response.getClass())) {
            return (T) response;
        }
        NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'",
                response.getClass().getName(), responseClass.getName());
    } catch (NacosException e) {
        throw e;
    } catch (Exception e) {
        throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
    }
    throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
}
Let's focus on the code that actually sends the request to the server, rpcClient.request:
/**
 * send request.
 *
 * @param request request.
 * @return response from server.
 */
public Response request(Request request, long timeoutMills) throws NacosException {
    // counter used for request retries
    int retryTimes = 0;
    Response response;
    Throwable exceptionThrow = null;
    long start = System.currentTimeMillis();
    while (retryTimes < rpcClientConfig.retryTimes() && System.currentTimeMillis() < timeoutMills + start) {
        boolean waitReconnect = false;
        try {
            if (this.currentConnection == null || !isRunning()) {
                waitReconnect = true;
                throw new NacosException(NacosException.CLIENT_DISCONNECT,
                        "Client not connected, current status:" + rpcClientStatus.get());
            }
            // the code that actually issues the request
            response = this.currentConnection.request(request, timeoutMills);
            if (response == null) {
                throw new NacosException(SERVER_ERROR, "Unknown Exception.");
            }
            if (response instanceof ErrorResponse) {
                if (response.getErrorCode() == NacosException.UN_REGISTER) {
                    synchronized (this) {
                        waitReconnect = true;
                        if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                            LoggerUtils.printIfErrorEnabled(LOGGER,
                                    "Connection is unregistered, switch server, connectionId = {}, request = {}",
                                    currentConnection.getConnectionId(), request.getClass().getSimpleName());
                            switchServerAsync();
                        }
                    }
                }
                throw new NacosException(response.getErrorCode(), response.getMessage());
            }
            // return response.
            lastActiveTimeStamp = System.currentTimeMillis();
            return response;
        } catch (Throwable e) {
            if (waitReconnect) {
                try {
                    // wait client to reconnect.
                    Thread.sleep(Math.min(100, timeoutMills / 3));
                } catch (Exception exception) {
                    // Do nothing.
                }
            }
            LoggerUtils.printIfErrorEnabled(LOGGER,
                    "Send request fail, request = {}, retryTimes = {}, errorMessage = {}", request, retryTimes,
                    e.getMessage());
            exceptionThrow = e;
        }
        retryTimes++;
    }
    if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
        switchServerAsyncOnRequestFail();
    }
    if (exceptionThrow != null) {
        throw (exceptionThrow instanceof NacosException) ? (NacosException) exceptionThrow
                : new NacosException(SERVER_ERROR, exceptionThrow);
    } else {
        throw new NacosException(SERVER_ERROR, "Request fail, unknown Error");
    }
}
The method is fairly long, but most of it is retry plumbing: the loop makes at most rpcClientConfig.retryTimes() attempts and gives up once the overall deadline (start + timeoutMills) has passed.
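Stripped of the Nacos specifics, the control flow is the familiar "bounded retries within a deadline" pattern. A generic sketch of just that pattern (my own illustration, not Nacos code):

// Generic "bounded retries within a deadline" sketch -- not Nacos code.
import java.util.concurrent.Callable;

final class RetrySketch {

    static <T> T callWithRetry(Callable<T> call, int maxRetries, long timeoutMillis) throws Exception {
        long start = System.currentTimeMillis();
        Exception last = null;
        int attempt = 0;
        // Give up after maxRetries attempts or once the overall deadline has passed, whichever comes first.
        while (attempt < maxRetries && System.currentTimeMillis() < start + timeoutMillis) {
            try {
                return call.call();   // first success wins
            } catch (Exception e) {
                last = e;             // remember the last failure and try again
            }
            attempt++;
        }
        throw (last != null) ? last : new IllegalStateException("request never attempted");
    }
}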
The actual request is issued by this.currentConnection.request(request, timeoutMills). So where does currentConnection come from?
/**
 * Start this client.
 */
public final void start() throws NacosException {
    boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
    if (!success) {
        return;
    }
    clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
        Thread t = new Thread(r);
        t.setName("com.alibaba.nacos.client.remote.worker");
        t.setDaemon(true);
        return t;
    });
    // connection event consumer.
    clientEventExecutor.submit(() -> {
        while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
            ConnectionEvent take;
            try {
                take = eventLinkedBlockingQueue.take();
                if (take.isConnected()) {
                    notifyConnected();
                } else if (take.isDisConnected()) {
                    notifyDisConnected();
                }
            } catch (Throwable e) {
                // Do nothing
            }
        }
    });
    clientEventExecutor.submit(() -> {
        while (true) {
            try {
                if (isShutdown()) {
                    break;
                }
                ReconnectContext reconnectContext = reconnectionSignal
                        .poll(rpcClientConfig.connectionKeepAlive(), TimeUnit.MILLISECONDS);
                if (reconnectContext == null) {
                    // check alive time.
                    if (System.currentTimeMillis() - lastActiveTimeStamp >= rpcClientConfig.connectionKeepAlive()) {
                        boolean isHealthy = healthCheck();
                        if (!isHealthy) {
                            if (currentConnection == null) {
                                continue;
                            }
                            LoggerUtils.printIfInfoEnabled(LOGGER,
                                    "[{}] Server healthy check fail, currentConnection = {}",
                                    rpcClientConfig.name(), currentConnection.getConnectionId());
                            RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
                            if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
                                break;
                            }
                            boolean statusFLowSuccess = RpcClient.this.rpcClientStatus
                                    .compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
                            if (statusFLowSuccess) {
                                reconnectContext = new ReconnectContext(null, false);
                            } else {
                                continue;
                            }
                        } else {
                            lastActiveTimeStamp = System.currentTimeMillis();
                            continue;
                        }
                    } else {
                        continue;
                    }
                }
                if (reconnectContext.serverInfo != null) {
                    // clear recommend server if server is not in server list.
                    boolean serverExist = false;
                    for (String server : getServerListFactory().getServerList()) {
                        ServerInfo serverInfo = resolveServerInfo(server);
                        if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
                            serverExist = true;
                            reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
                            break;
                        }
                    }
                    if (!serverExist) {
                        LoggerUtils.printIfInfoEnabled(LOGGER,
                                "[{}] Recommend server is not in server list, ignore recommend server {}",
                                rpcClientConfig.name(), reconnectContext.serverInfo.getAddress());
                        reconnectContext.serverInfo = null;
                    }
                }
                reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
            } catch (Throwable throwable) {
                // Do nothing
            }
        }
    });
    // connect to server, try to connect to server sync retryTimes times, async starting if failed.
    Connection connectToServer = null;
    rpcClientStatus.set(RpcClientStatus.STARTING);
    int startUpRetryTimes = rpcClientConfig.retryTimes();
    while (startUpRetryTimes > 0 && connectToServer == null) {
        try {
            startUpRetryTimes--;
            ServerInfo serverInfo = nextRpcServer();
            LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to connect to server on start up, server: {}",
                    rpcClientConfig.name(), serverInfo);
            connectToServer = connectToServer(serverInfo);
        } catch (Throwable e) {
            LoggerUtils.printIfWarnEnabled(LOGGER,
                    "[{}] Fail to connect to server on start up, error message = {}, start up retry times left: {}",
                    rpcClientConfig.name(), e.getMessage(), startUpRetryTimes, e);
        }
    }
    if (connectToServer != null) {
        LoggerUtils
                .printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up, connectionId = {}",
                        rpcClientConfig.name(), connectToServer.serverInfo.getAddress(),
                        connectToServer.getConnectionId());
        this.currentConnection = connectToServer;
        rpcClientStatus.set(RpcClientStatus.RUNNING);
        eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
    } else {
        switchServerAsync();
    }
    registerServerRequestHandler(new ConnectResetRequestHandler());
    // register client detection request.
    registerServerRequestHandler(request -> {
        if (request instanceof ClientDetectionRequest) {
            return new ClientDetectionResponse();
        }
        return null;
    });
}
We can see that currentConnection is assigned inside the start() method. Tracing the code further back, all of this is ultimately triggered by a call made from the constructor of NamingGrpcClientProxy.
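For orientation, the wiring looks roughly like this. It is a deliberately simplified sketch of the initialization path, not the real NamingGrpcClientProxy constructor; the names below are stand-ins and exact signatures differ between Nacos versions:

// Rough sketch of how the proxy ends up starting its RpcClient -- NOT the real constructor.
// ClientFactory stands in for the factory that builds the underlying gRPC RpcClient.
class NamingGrpcClientProxySketch {

    private final RpcClientSketch rpcClient;

    NamingGrpcClientProxySketch(ClientFactory factory) throws Exception {
        this.rpcClient = factory.create();   // build the underlying gRPC client
        this.rpcClient.start();              // start() connects and assigns currentConnection
    }

    interface ClientFactory {
        RpcClientSketch create();
    }

    interface RpcClientSketch {
        void start() throws Exception;
    }
}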
Back to the connectToServer(serverInfo) method: let's see how the connection is actually obtained.
// abstract method, implemented by subclasses
public abstract Connection connectToServer(ServerInfo serverInfo) throws Exception;

// GrpcClient's implementation
@Override
public Connection connectToServer(ServerInfo serverInfo) {
    try {
        if (grpcExecutor == null) {
            // create a thread pool
            this.grpcExecutor = createGrpcExecutor(serverInfo.getServerIp());
        }
        int port = serverInfo.getServerPort() + rpcPortOffset();
        // create a ManagedChannel (an io.grpc class) from the ip and port
        ManagedChannel managedChannel = createNewManagedChannel(serverInfo.getServerIp(), port);
        RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(managedChannel);
        if (newChannelStubTemp != null) {
            Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
            if (response == null || !(response instanceof ServerCheckResponse)) {
                shuntDownChannel(managedChannel);
                return null;
            }
            BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc.newStub(
                    newChannelStubTemp.getChannel());
            GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
            grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());
            // create stream request and bind connection event to this connection.
            StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);
            // stream observer to send response to server
            grpcConn.setPayloadStreamObserver(payloadStreamObserver);
            grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
            grpcConn.setChannel(managedChannel);
            // send a setup request.
            ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
            conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
            conSetupRequest.setLabels(super.getLabels());
            conSetupRequest.setAbilities(super.clientAbilities);
            conSetupRequest.setTenant(super.getTenant());
            grpcConn.sendRequest(conSetupRequest);
            // wait to register connection setup
            Thread.sleep(100L);
            return grpcConn;
        }
        return null;
    } catch (Exception e) {
        LOGGER.error("[{}]Fail to connect to server!,error={}", GrpcClient.this.getName(), e);
    }
    return null;
}
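One small note on createNewManagedChannel: it is plain grpc-java plumbing. Building a channel generally looks like the sketch below (a generic grpc-java example, not the actual Nacos helper, which also configures executors, keep-alive, and transport security):

// Generic grpc-java sketch of what a helper like createNewManagedChannel typically does --
// not the actual Nacos implementation.
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

final class ChannelSketch {

    static ManagedChannel newChannel(String serverIp, int port) {
        return ManagedChannelBuilder.forAddress(serverIp, port)
                .usePlaintext()   // plaintext here only for illustration
                .build();
    }
}

Once the channel is up, the code above performs a serverCheck round trip, binds the bidirectional stream, and finally sends a ConnectionSetupRequest; per the in-code comment, the Thread.sleep(100L) simply gives the server a moment to finish registering the new connection before it is used.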
Summary
This article fills in the gap left by the first client-side source-code analysis, focusing on the code along the path that issues the gRPC request. Overall, the flow is fairly clear and easy to follow.