nacos源码分析
- 一、环境构建
- 二、源码分析
- 1. 启动类
- 2. 源码的中的案例
- 3.. 服务订阅流程梳理
- 3.1. 从 NamingFactory.createNamingService(properties);说起
- 3.2. 服务订阅总结
- 4. 服务注册流程梳理
- 4.1. 从 naming.registerInstance("nacos.test.3", "11.11.11.11", 8888);说起
- 4.2. 服务注册流程总结
一、环境构建
源码需要 jdk1.8 进行构建
- 代码下载地址:https://github.com/alibaba/nacos/tree/develop
- git 下载:git clone https://github.com/alibaba/nacos.git
- 找到 console --> src --> main --> resources --> META-INF 下的derby-schema.sql 导入到本地数据库
- 修改 console --> src --> main --> resources --> 下的application.properties 配置文件
- 先在 idea 安装 Protobuf 插件,安装成功后将consistency工程进行编译 mvn compile
- 将所有的工程进行编译
- 配置 vm 启动参数 -Dnacos.standalone=true
二、源码分析
客户端与注册中心服务端的交互,主要集中在服务注册、服务下线、服务发现、订阅某个服务,其实使用最多的就是服务注册和服务发现
1. 启动类
启动类可以看到@EnableScheduling 注解,说明有很多东西都是由定时任务完成的
@SpringBootApplication
@ComponentScan(basePackages = "com.alibaba.nacos", excludeFilters = {
@Filter(type = FilterType.CUSTOM, classes = {NacosTypeExcludeFilter.class}),
@Filter(type = FilterType.CUSTOM, classes = {TypeExcludeFilter.class}),
@Filter(type = FilterType.CUSTOM, classes = {AutoConfigurationExcludeFilter.class})})
@ServletComponentScan
@EnableScheduling
public class Nacos {
public static void main(String[] args) {
SpringApplication.run(Nacos.class, args);
}
}
2. 源码的中的案例
- 找到 nacos-example 工程
- 查看 App.class 文件
public class App {
public static void main(String[] args) throws NacosException {
Properties properties = new Properties();
properties.setProperty("serverAddr", "localhost:8848");
properties.setProperty("namespace", "quickStart");
NamingService naming = NamingFactory.createNamingService(properties);
// 服务注册
naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "TEST1");
naming.registerInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");
// 服务发现
System.out.println("[Instances after register] " + naming.getAllInstances("nacos.test.3", Lists.newArrayList("TEST1")));
System.out.println("[Instances after register] " + naming.getAllInstances("nacos.test.3", Lists.newArrayList("DEFAULT")));
}
}
- 将 main 函数进行修改
public static void main(String[] args) throws NacosException {
Properties properties = new Properties();
// nacos的ip地址
properties.setProperty("serverAddr", "localhost:8848");
// 命名空间
properties.setProperty("namespace", "Test_study");
// 用户名
properties.setProperty("username", "nacos");
// 密码
properties.setProperty("password", "nacos");
// 通过工厂模式根据传入的properties属性创建NamingService实例的方法
NamingService naming = NamingFactory.createNamingService(properties);
// 服务注册
naming.registerInstance("nacos.test.3", "11.11.11.11", 8888);
}
- 查看 NamingExample.class 文件
public class NamingExample {
private static final String INSTANCE_SERVICE_NAME = "nacos.test.3";
private static final String INSTANCE_IP = "11.11.11.11";
private static final int INSTANCE_PORT = 8888;
private static final String INSTANCE_CLUSTER_NAME = "TEST1";
public static void main(String[] args) throws NacosException, InterruptedException {
Properties properties = new Properties();
properties.setProperty("serverAddr", System.getProperty("serverAddr", "localhost"));
properties.setProperty("namespace", System.getProperty("namespace", "public"));
NamingService naming = NamingFactory.createNamingService(properties);
// 服务注册
naming.registerInstance(INSTANCE_SERVICE_NAME, INSTANCE_IP, INSTANCE_PORT, INSTANCE_CLUSTER_NAME);
// 服务订阅
Executor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
runnable -> {
Thread thread = new Thread(runnable);
thread.setName("test-thread");
return thread;
});
naming.subscribe(INSTANCE_SERVICE_NAME, new AbstractEventListener() {
//EventListener onEvent is sync to handle, If process too low in onEvent, maybe block other onEvent callback.
//So you can override getExecutor() to async handle event.
@Override
public Executor getExecutor() {
return executor;
}
@Override
public void onEvent(Event event) {
System.out.println("[serviceName] " + ((NamingEvent) event).getServiceName());
System.out.println("[instances from event] " + ((NamingEvent) event).getInstances());
}
});
Thread.sleep(1000);
System.out.println("[instances after register] " + naming.getAllInstances(INSTANCE_SERVICE_NAME));
Thread.sleep(1000);
// 服务下线
naming.deregisterInstance(INSTANCE_SERVICE_NAME, INSTANCE_IP, INSTANCE_PORT, INSTANCE_CLUSTER_NAME);
Thread.sleep(1000);
System.out.println("[instances after deregister] " + naming.getAllInstances(INSTANCE_SERVICE_NAME));
Thread.sleep(1000);
}
}
3… 服务订阅流程梳理
3.1. 从 NamingFactory.createNamingService(properties);说起
- 打断点启动
- f7 进入NamingFactory.createNamingService(properties)方法
public static NamingService createNamingService(Properties properties) throws NacosException {
try {
// 通过反射获取NacosNamingService类
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
// 通过getConstructor()方法获取该类带有Properties参数的构造函数对象
Constructor constructor = driverImplClass.getConstructor(Properties.class);
// 通过newInstance()方法使用指定的参数实例化该类的对象
return (NamingService) constructor.newInstance(properties);
} catch (Throwable e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
}
- 进入导航 NacosNamingService 类,可以看到构造方法
- 构造方法打断点,重新启动
- f7 进入init(properties)方法
private void init(Properties properties) throws NacosException {
// 可忽略:异步加载 createEmptyJsonNode()和SpasAdapter.getAk() 组件
PreInitUtils.asyncPreLoadCostComponent();
// 可忽略:获取配置Nacos的相关参数如:地址、端口、命名空间等
final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(properties);
// 可忽略:打印日志
NAMING_LOGGER.info(ParamUtil.getInputParameters(nacosClientProperties.asProperties()));
// 可忽略:检查必要配置的属性
ValidatorUtils.checkInitParam(nacosClientProperties);
// 可忽略:获取命名空间
this.namespace = InitUtils.initNamespaceForNaming(nacosClientProperties);
// 可忽略:序列化
InitUtils.initSerialization();
// 可忽略:初始化上下文
InitUtils.initWebRootContext(nacosClientProperties);
// 可忽略:初始化日志
initLogName(nacosClientProperties);
// 可忽略:获取一个uuid
this.notifierEventScope = UUID.randomUUID().toString();
// 可忽略:给监听对象赋值
this.changeNotifier = new InstancesChangeNotifier(this.notifierEventScope);
// 注册一个事件监听器,并且指定该监听器的事件队列容量为16384
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
// 通知中心注册一个订阅者对象,以便接收通知。当有通知发布时,订阅者会收到通知并执行相应操作
NotifyCenter.registerSubscriber(changeNotifier);
this.serviceInfoHolder = new ServiceInfoHolder(namespace, this.notifierEventScope, nacosClientProperties);
// -----------------------------主要看这个-----------------------------------
// Nacos服务器进行通信,实现服务的注册与发现等功能
this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties, changeNotifier);
}
- f7 进入 NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties, changeNotifier);方法
public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder,
NacosClientProperties properties, InstancesChangeNotifier changeNotifier) throws NacosException {
// 在需要时可以更新服务信息
this.serviceInfoUpdateService = new ServiceInfoUpdateService(properties, serviceInfoHolder, this,
changeNotifier);
// 可以获取服务器列表、监控服务器状态、更新服务器列表等功能
this.serverListManager = new ServerListManager(properties, namespace);
this.serviceInfoHolder = serviceInfoHolder;
// 用于处理安全性相关的操作。SecurityProxy类可能是用于验证用户身份、管理权限等安全功能
this.securityProxy = new SecurityProxy(this.serverListManager.getServerList(),
NamingHttpClientManager.getInstance().getNacosRestTemplate());
// 该函数用于初始化安全代理
initSecurityProxy(properties);
// 初始化一个客户端代理对象,用于与命名服务进行交互
this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties);
this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties,
serviceInfoHolder);
}
- f7 进入 new ServiceInfoUpdateService(properties, serviceInfoHolder, this, changeNotifier);方法
public ServiceInfoUpdateService(NacosClientProperties properties, ServiceInfoHolder serviceInfoHolder,
NamingClientProxy namingClientProxy, InstancesChangeNotifier changeNotifier) {
this.asyncQuerySubscribeService = isAsyncQueryForSubscribeService(properties);
// 这创建一个定时任务的线程池
this.executor = new ScheduledThreadPoolExecutor(initPollingThreadCount(properties),
new NameThreadFactory("com.alibaba.nacos.client.naming.updater"));
this.serviceInfoHolder = serviceInfoHolder;
this.namingClientProxy = namingClientProxy;
this.changeNotifier = changeNotifier;
}
- 既然是有线程池,那么当前类肯定有 run()方法,下下找
- 主要的代码解释
- this.serviceKey = ServiceInfo.getKey(groupedServiceName, clusters); -----> 获取服务唯一的 key
- ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey); -----> 通过 key 获取服务的信息
- serviceInfoHolder.processServiceInfo(serviceObj); -----> 反序列化操作
- 查看processServiceInfo(ServiceInfo serviceInfo)方法
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
// 获取服务的key
String serviceKey = serviceInfo.getKey();
// 如果为空就返回null
if (serviceKey == null) {
NAMING_LOGGER.warn("process service info but serviceKey is null, service host: {}",
JacksonUtils.toJson(serviceInfo.getHosts()));
return null;
}
// 通过key获取jvm缓存的服务信息
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
// 判断serviceInfo是否为空或错误推送,错误就返回旧服务
if (isEmptyOrErrorPush(serviceInfo)) {
//empty or error push, just ignore
NAMING_LOGGER.warn("process service info but found empty or error push, serviceKey: {}, "
+ "pushEmptyProtection: {}, hosts: {}", serviceKey, pushEmptyProtection, serviceInfo.getHosts());
return oldService;
}
// 写入jvm缓存
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
// 计算两个服务实例的差异
InstancesDiff diff = getServiceInfoDiff(oldService, serviceInfo);
if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
}
// 将本地缓存的大小设置给监控指标对象
MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
if (diff.hasDifferent()) {
NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
JacksonUtils.toJson(serviceInfo.getHosts()));
if (!failoverReactor.isFailoverSwitch(serviceKey)) {
NotifyCenter.publishEvent(
new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts(), diff));
}
// 将jvm缓存中的服务信息,写入到本地缓存中
DiskCache.write(serviceInfo, cacheDir);
}
return serviceInfo;
}
3.2. 服务订阅总结
- 创建NacosNamingService 对象
- NamingFactory.createNamingService(); -----> 创建NacosNamingService
- 通过反射 NacosNamingService 实例,通过构造函数执行 init()方法
- init()方法
- NamingClientProxyDelegate() -----> 实现服务的注册与发现等功能
- 创建 ServiceInfoUpdateService()对象 -----> 定时拉取服务列表数据
- run()方法
- serviceInfoHolder.getServiceInfoMap().get(serviceKey); -----> 获取服务列表数据
- serviceInfoHolder.processServiceInfo(serviceObj); -----> 反序列化
- serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); -----> 服务数据存储到serviceInfoMap 中服务的数据变更,会将数据拉取到serviceInfoMap
- run()方法
- 创建 ServiceInfoUpdateService()对象 -----> 定时拉取服务列表数据
- NamingClientProxyDelegate() -----> 实现服务的注册与发现等功能
4. 服务注册流程梳理
4.1. 从 naming.registerInstance(“nacos.test.3”, “11.11.11.11”, 8888);说起
- 打断点启动
- f7 进入registerInstance()方法,再次 按 f7 直到进入到下面方法
/**
* 服务注册
* @param serviceName 服务名称
* @param groupName 分组名称
* @param ip 服务ip
* @param port 服务端口
* @param clusterName 集群名称
* @throws NacosException
*/
public void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName)
throws NacosException {
// nacos的配置信息对象
Instance instance = new Instance();
// ip
instance.setIp(ip);
// 端口
instance.setPort(port);
// 权重
instance.setWeight(1.0);
// 集群名称
instance.setClusterName(clusterName);
// 注册方法 ----- 主要的方法
registerInstance(serviceName, groupName, instance);
}
- 打开Instance 类查看配置类里有哪些属性
@JsonInclude(Include.NON_NULL)
public class Instance implements Serializable {
/**
* 此实例的唯一id
*/
private String instanceId;
/**
* ip
*/
private String ip;
/**
* 端口
*/
private int port;
/**
* 权重.
*/
private double weight = 1.0D;
/**
* 健康状态.
*/
private boolean healthy = true;
/**
* 启用以接受请求
*/
private boolean enabled = true;
/**
* 临时.
*
* @since 1.0.0
*/
private boolean ephemeral = true;
/**
* 集群名字.
*/
private String clusterName;
/**
* 服务名称.
*/
private String serviceName;
/**
* user extended attributes.
*/
private Map<String, String> metadata = new HashMap<>();
..... get/set
... toString
.....
}
- f7 进入registerInstance(serviceName, groupName, instance);方法
/**
* 注册
* @param serviceName 服务名称
* @param groupName 组名称
* @param instance 配置信息
* @throws NacosException
*/
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
// 校验服务的合法性
NamingUtils.checkInstanceIsLegal(instance);
// 检查并移除服务实例的分组名称前缀
checkAndStripGroupNamePrefix(instance, groupName);
// 将服务实例注册到Nacos注册中心 ----- 主要的方法
clientProxy.registerService(serviceName, groupName, instance);
}
- f7 进入clientProxy.registerService(serviceName, groupName, instance)方法
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
}
- f7 进入registerService(serviceName, groupName, instance);方法
- registerService 是一个接口,实现类有 4 个,nacos2.x 的默认实现类为 grpc 所以最终进入的是NamingGrpcClientProxy 类
/**
* 校验是否为临时服务
* @param serviceName 服务名称
* @param groupName 服务组名
* @param instance 配置信息
* @throws NacosException
*/
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
instance);
if (instance.isEphemeral()) {
// 默认会走这里 原因:Instance类中 ephemeral属性为true
registerServiceForEphemeral(serviceName, groupName, instance);
} else {
doRegisterServiceForPersistent(serviceName, groupName, instance);
}
}
- f7 进入registerServiceForEphemeral(serviceName, groupName, instance);方法
private void registerServiceForEphemeral(String serviceName, String groupName, Instance instance)
throws NacosException {
// 缓存配置信息 就是一个map集合,还有一个定时任务的心跳机制
redoService.cacheInstanceForRedo(serviceName, groupName, instance);
// 服务注册
doRegisterService(serviceName, groupName, instance);
}
redoService.cacheInstanceForRedo(serviceName, groupName, instance); 的 的心跳机制
public NamingGrpcRedoService(NamingGrpcClientProxy clientProxy, NacosClientProperties properties) {
setProperties(properties);
// 定时任务的线程池
this.redoExecutor = new ScheduledThreadPoolExecutor(redoThreadCount, new NameThreadFactory(REDO_THREAD_NAME));
this.redoExecutor.scheduleWithFixedDelay(new RedoScheduledTask(clientProxy, this), redoDelayTime, redoDelayTime,
TimeUnit.MILLISECONDS);
}
- f7 进入doRegisterService(serviceName, groupName, instance)方法
/**
* 将提供的服务实例注册到指定的服务和分组中
*
* @param serviceName 服务名
* @param groupName 组名
* @param instance 配置信息
* @throws NacosException nacos exception
*/
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
// 请求的一些属性
InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
NamingRemoteConstants.REGISTER_INSTANCE, instance);
// 将请求发送到服务器,并获取响应对象
requestToServer(request, Response.class);
// 通知系统服务实例已成功注册 // 就是改了一个标志
redoService.instanceRegistered(serviceName, groupName);
}
redoService.instanceRegistered(serviceName, groupName);
/**
* 如果{@code true}表示缓存的数据已经成功注册到服务器。
*/
private volatile boolean registered;
/**
* 如果{@code true}表示缓存的数据正在从服务器注销。
*/
private volatile boolean unregistering;
public void registered() {
this.registered = true;
this.unregistering = false;
}
- 进入requestToServer(request, Response.class);方法
private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass)
throws NacosException {
Response response = null;
try {
// 添加请求头
request.putAllHeader(
getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
// 发送请求
response = requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
// .... 根据类型抛异常
...
} catch (NacosException e) {
// 抛异常
}
}
- f7 进入rpcClient.request(request)方法,再次按 f7 进入public Response request(Request request, long timeoutMills)
public Response request(Request request, long timeoutMills) throws NacosException {
int retryTimes = 0;
Response response;
Throwable exceptionThrow = null;
long start = System.currentTimeMillis();
while (retryTimes <= rpcClientConfig.retryTimes() && (timeoutMills <= 0
|| System.currentTimeMillis() < timeoutMills + start)) {
boolean waitReconnect = false;
try {
... 异常校验
response = this.currentConnection.request(request, timeoutMills);
... 异常校验
// return response.
lastActiveTimeStamp = System.currentTimeMillis();
// 返回响应数据
return response;
} catch (Throwable e) {
... 处理异常
}
retryTimes++;
}
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
// 用于在请求失败时切换服务器
switchServerAsyncOnRequestFail();
}
... 异常校验
}
- f7 进入this.currentConnection.request(request, timeoutMills);方法
@Override
public Response request(Request request, long timeouts) throws NacosException {
// 转grpc类型
Payload grpcRequest = GrpcUtils.convert(request);
// 异步请求
ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcRequest);
Payload grpcResponse;
try {
// 获取返回的数据
if (timeouts <= 0) {
grpcResponse = requestFuture.get();
} else {
grpcResponse = requestFuture.get(timeouts, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
throw new NacosException(NacosException.SERVER_ERROR, e);
}
// 再次转换回来,并且强制转为Response
return (Response) GrpcUtils.parse(grpcResponse);
}
4.2. 服务注册流程总结
- 创建NacosNamingService 对象
- clientProxy.registerService(serviceName, groupName, instance); —> grpc 协议进行注册
- registerServiceForEphemeral(serviceName, groupName, instance); ----> 临时注册
- redoService.cacheInstanceForRedo(serviceName, groupName, instance); —> 缓存 + 心跳
- doRegisterService(serviceName, groupName, instance); ----> 远程调用
- registerServiceForEphemeral(serviceName, groupName, instance); ----> 临时注册
- clientProxy.registerService(serviceName, groupName, instance); —> grpc 协议进行注册
- doRegisterService(serviceName, groupName, instance); ----> 远程调用
- requestToServer(request, Response.class);
- rpcClient.request(request)
- while (retryTimes <= rpcClientConfig.retryTimes() && (timeoutMills <= 0 || System.currentTimeMillis() < timeoutMills + start)) 发送请求条件
- this.currentConnection.request(request, timeoutMills); —> 发送注册请求
- requestToServer(request, Response.class);