上篇简单看了下Nacos客户端在服务注册时做了什么。
本篇开始分析Nacos
在服务注册时,服务端的相关逻辑。
建议先阅读这篇文章:支持 gRPC 长链接,深度解读 Nacos 2.0 架构设计及新模型
回顾一下,上篇我们看了Nacos
在服务注册时,客户端的相关源码。Nacos2.X通过grpc
支持了长链接,那么客户端发起调用,肯定就有一个grpc的服务端在接收请求。那么就从这个grpc的相关代码看起~
grpc server
abstract class BaseRpcServer
是nacos-core
中一个抽象类,有一个@PostConstruct
修饰的start方法。
@PostConstruct
public void start() throws Exception {
String serverName = getClass().getSimpleName();
String tlsConfig = JacksonUtils.toJson(grpcServerConfig);
Loggers.REMOTE.info("Nacos {} Rpc server starting at port {} and tls config:{}", serverName, getServicePort(), tlsConfig);
//启动grpc服务端
startServer();
Loggers.REMOTE.info("Nacos {} Rpc server started at port {} and tls config:{}", serverName, getServicePort(), tlsConfig);
//钩子函数:处理退出信号
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Loggers.REMOTE.info("Nacos {} Rpc server stopping", serverName);
try {
BaseRpcServer.this.stopServer();
Loggers.REMOTE.info("Nacos {} Rpc server stopped successfully...", serverName);
} catch (Exception e) {
Loggers.REMOTE.error("Nacos {} Rpc server stopped fail...", serverName, e);
}
}));
}
这个startServer()
是一个抽象方法,我们看下其实现。
/**
* Start sever.
*
* @throws Exception exception throw if start server fail.
*/
public abstract void startServer() throws Exception;
追踪代码发现,这个方法是当前类BaseRpcServer
的子类BaseGrpcServer
实现的
public abstract class BaseGrpcServer extends BaseRpcServer
看下startServer()
的代码:
@Override
public void startServer() throws Exception {
final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
//注册服务
addServices(handlerRegistry, new GrpcConnectionInterceptor());
NettyServerBuilder builder = NettyServerBuilder.forPort(getServicePort()).executor(getRpcExecutor());
if (grpcServerConfig.getEnableTls()) {
if (grpcServerConfig.getCompatibility()) {
builder.protocolNegotiator(new OptionalTlsProtocolNegotiator(getSslContextBuilder()));
} else {
builder.sslContext(getSslContextBuilder());
}
}
server = builder.maxInboundMessageSize(getMaxInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
.compressorRegistry(CompressorRegistry.getDefaultInstance())
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
.addTransportFilter(new AddressTransportFilter(connectionManager))
.keepAliveTime(getKeepAliveTime(), TimeUnit.MILLISECONDS)
.keepAliveTimeout(getKeepAliveTimeout(), TimeUnit.MILLISECONDS)
.permitKeepAliveTime(getPermitKeepAliveTime(), TimeUnit.MILLISECONDS)
.build();
//启动grpc的server服务
server.start();
}
研读框架源码时,先不要陷入细节当中,第一遍梳理清楚整个框架的即可,关于grpc-server
就先看到这里。
上文中我们提到了抽象类BaseRpcServer
,简单分析下这个类。
BaseRpcServer
BaseRpcServer
和BaseGrpcServer
都是抽象类,GrpcClusterServer
和GrpcSdkServer
都是抽象实现类,并且这两个实现类都有@Service
注解标注,那么就意味着这两个类会被注册为spring bean 。
上文我们提过BaseRpcServer.start()
有一个@PostConstruct
注解,那么也就意味着具体调用时使用了GrpcClusterServer
和GrpcSdkServer
的任何一个类,都会去调用BaseRpcServer.start()
方法去启动grpc-server
。
GrpcClusterServer
和GrpcSdkServer
的区别
从名字上可以看出,一个是Cluster服务调用,一个是SDK调用。
那么客户端注册使用的是哪个Server?
BaseRpcServer
中定义了一个获取端口偏移量的方法:
/**
* the increase offset of nacos server port for rpc server port.
*
* @return delta port offset of main port.
*/
public abstract int rpcPortOffset();
GrpcSdkServer
对此给出的实现是返回一个常量定义:
public static final Integer SDK_GRPC_PORT_DEFAULT_OFFSET = 1000;
GrpcClusterServer
给出的常量中定义的端口是CLUSTER_GRPC_PORT_DEFAULT_OFFSET = 1001
即然定义了端口,那么这个常量在创建这两个server的时候肯定会用到,我们通过这个常量去寻找到调用方,自然而然也就找到了server创建的逻辑,进而找到这两个server使用上的区别。
通过常量Constants.SDK_GRPC_PORT_DEFAULT_OFFSET
发现一个新的类GrpcSdkClient
中rpcPortOffset()
方法使用了这个常量。
通过观察构造方法的调用,我们找到了RpcClient的创建工厂RpcClientFactory
//本地缓存,存储client,key为clientName
private static final Map<String, RpcClient> CLIENT_MAP = new ConcurrentHashMap<>();
public static RpcClient createClient(String clientName, ConnectionType connectionType, Integer threadPoolCoreSize,
Integer threadPoolMaxSize, Map<String, String> labels, RpcClientTlsConfig tlsConfig) {
if (!ConnectionType.GRPC.equals(connectionType)) {
throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
}
//如果当前clientName不存在,那么执行GrpcSdkClient的创建逻辑
return CLIENT_MAP.computeIfAbsent(clientName, clientNameInner -> {
LOGGER.info("[RpcClientFactory] create a new rpc client of " + clientName);
try {
return new GrpcSdkClient(clientNameInner, threadPoolCoreSize, threadPoolMaxSize, labels, tlsConfig);
} catch (Throwable throwable) {
LOGGER.error("Error to init GrpcSdkClient for client name :" + clientName, throwable);
throw throwable;
}
});
}
那么再看下上边源码中createClient
的调用方是谁,查看代码得知分别有两个调用方,ClientWorker
和NamingGrpcClientProxy
,第二个调用方是不是有点熟悉?
对的,我们在上篇文章看客户端注册服务的代码时,看到过如下代码:
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
}
private NamingClientProxy getExecuteClientProxy(Instance instance) {
return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
}
那么这个grpcClientProxy
是哪一个实现呢?
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
clientProxy.registerService(serviceName, groupName, instance);
}
再次观察创建服务实例的代码,我们可以看到在实际的注册服务时,使用了一个客户端代理clientProxy
来做处理,我们来看下这个代理的是怎么创建的。
private NamingClientProxy clientProxy;
private void init(Properties properties) throws NacosException {
final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(properties);
ValidatorUtils.checkInitParam(nacosClientProperties);
this.namespace = InitUtils.initNamespaceForNaming(nacosClientProperties);
InitUtils.initSerialization();
InitUtils.initWebRootContext(nacosClientProperties);
initLogName(nacosClientProperties);
this.notifierEventScope = UUID.randomUUID().toString();
this.changeNotifier = new InstancesChangeNotifier(this.notifierEventScope);
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
NotifyCenter.registerSubscriber(changeNotifier);
this.serviceInfoHolder = new ServiceInfoHolder(namespace, this.notifierEventScope, nacosClientProperties);
//一目了然
this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties, changeNotifier);
}
this.clientProxy = new NamingClientProxyDelegate
,可以看到这个代理实际上是一个创建了一个委托类,那继续看下这个委托类的构造方法。
public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, NacosClientProperties properties,
InstancesChangeNotifier changeNotifier) throws NacosException {
this.serviceInfoUpdateService = new ServiceInfoUpdateService(properties, serviceInfoHolder, this,
changeNotifier);
this.serverListManager = new ServerListManager(properties, namespace);
this.serviceInfoHolder = serviceInfoHolder;
this.securityProxy = new SecurityProxy(this.serverListManager.getServerList(),
NamingHttpClientManager.getInstance().getNacosRestTemplate());
initSecurityProxy(properties);
this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties);
//终于,我们找到了NamingGrpcClientProxy
this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties,
serviceInfoHolder);
}
回过头来再次看NacosNamingServie
的registerInstance
方法。
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
clientProxy.registerService(serviceName, groupName, instance);
}
我们已经知道了clientProxy
是NamingClientProxyDelegate
,那么就看下它是如何实现registerService
即可。
别着急,谜底马上揭开😎
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
}
private NamingClientProxy getExecuteClientProxy(Instance instance) {
return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
}
instance.
封装了客户端要注册的服务,ephemeral
默认为true
。所以默认会选择grpcClientProxy
去注册服务。
this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties,
serviceInfoHolder);
回答我们自己提出的问题:GrpcClusterServer
和GrpcSdkServer
的区别?
后者是客户端注册使用的,那么前者肯定就是服务内部调用使用的了。
总结
梳理关键类和方法
- 客户端
- NacosDiscoveryAutoRegister
- 是一个
ApplicationListener
,监听WebServerInitializedEvent
事件后,注册当前实例
- 是一个
- NacosNamingService
- private void init(Properties properties) throws NacosException
- 初始化时,定义了创建客户端的代理委托类
clientProxy
为NamingClientProxyDelegate
- 初始化时,定义了创建客户端的代理委托类
- registerInstance
- 使用
clientProxy
注册服务
- 使用
- private void init(Properties properties) throws NacosException
- NamingClientProxyDelegate
- 构造方法注入:
grpcClientProxy = new NamingGrpcClientProxy
instance.isEphemeral() ? grpcClientProxy : httpClientProxy
- 构造方法注入:
- NamingGrpcClientProxy
registerService
requestToServer
this.rpcClient.request(request)
这里的rpcClient其实就是GrpcSdkClient
- NacosDiscoveryAutoRegister
- 服务端
- RpcClientFactory
createClient
:返回GrpcSdkClient
- GrpcSdkClient
- 未完待续…
- RpcClientFactory
本篇很大篇幅上讲了服务注册时,服务端grpc-server
相关的一些逻辑。从中可以看到服务端使用了很多委托类、代理类来抽象、封装相关业务逻辑,所以刚开始看框架源码如果一头雾水的时候,不要着急。抓大放小,先建立整体认知后,再回过头来深入细节。
下篇从源码上深入服务端在注册服务时的业务细节。
下班!🕶️