服务注册就是在微服务启动时自动注册进nacos注册中心,核心逻辑就是在启动时调用nacos-server端的http接口:/nacos/v1/ns/instance,具体参考nacos官方文档。
我们打开nacos源码结构查看
上图为Nacos2.2的源码结构.其中比较核心的就是这几个包:
nacos-api: 核心api,主要提供一些核心的接口和类,如ConfigService、ConfigFactory、Listener等;
nacos-client: Nacos客户端与服务端交互的包;
nacos-common: 通用包,提供一些通用的常量类、工具类、数据模型以及一些核心的接口等等;
nacos-config: 配置管理的服务端包,采用SpringMVC对外提供配置发布、获取、删除、监听等一系列操作接口;
nacos-consistency: 数据一致性协议相关接口,如APProtocol、CPProcotol、ConsistencyProtocol等;
nacos-core: Nacos内核包,非常核心,如nacos-consistency中一致性协议的实现、集群管理等等
nacos-naming: Nacos注册中心服务端包,采用SpringMVC对外提供关于服务注册与发现等一系列操作接口
在上面说到,客户端微服务调用了一个注册服务的接口发往客户端,那么服务端这边是怎么处理的呢
Nacos服务器使用了一个双重Map结构来保存实例数据,具体为ServiceManager类中的属性serviceMap
这里设计到nacos的相关参数概念
Namespace:比较好理解,主要用于环境隔离,如dev、prod等,nacos中对应数据模型:
com.alibaba.nacos.console.model.Namespace;
Group:nacos注册中心中没有Group的数据模型,我觉得这是个逻辑概念,处于同一个Namespace下的服务还可以以Group进行隔离,可以在Namespace的基础上提供更小粒度的隔离;
Service:服务信息,可以对照具体的微服务(如帐号服务、订单服务)来理解,里面封装了集群列表,而集群又封装了实例列表,nacos中对应数据模型:com.alibaba.nacos.naming.core.Service;
Cluster:集群信息,每个服务可以部署多个集群(如多机房部署,一个机房就是一个集群),nacos中对应数据模型:com.alibaba.nacos.naming.core.Cluster;
Instance:具体的服务实例信息,如果单个账号服务实例,如果是单集群部署,就会对应一个实例列表,一个实例列表都处于一个Cluster中;如果是多集群部署,那么就会存在多个Cluster,每个Cluster都会存储这样一个实例列表。nacos中对应实例的数据模型:com.alibaba.nacos.naming.core.Instance;
接着上面客户端发送Grpc注册服务请求后通过一系列调用会来到DistroConsistencyServiceImpl类中的put方法
public void put(String key, Record value) throws NacosException {
#核心注册逻辑
this.onPut(key, value);
if (!((UpgradeJudgement)ApplicationUtils.getBean(UpgradeJudgement.class)).isUseGrpcFeatures()) {
#同步Instances给其他nacos集群 通过异步任务来实现,DistroProtocol类是Distro协议的其中一个核心实现
this.distroProtocol.sync(new DistroKey(key, "com.alibaba.nacos.naming.iplist."), DataOperation.CHANGE, DistroConfig.getInstance().getSyncDelayMillis());
}
}
其onPut方法中会调用notifier.addTask往阻塞队列新增Instances
public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum();
datum.value = (Instances)value;
datum.key = key;
datum.timestamp.incrementAndGet();
this.dataStore.put(key, datum);
}
if (this.listeners.containsKey(key)) {
# 调用本类的线程池新增
this.notifier.addTask(key, DataOperation.CHANGE);
}
}
接下来我们进入addTask具体的实现,这也是nacos服务注册的核心逻辑,通过线程put进阻塞队列的设计。
public class Notifier implements Runnable {
private ConcurrentHashMap<String, String> services = new ConcurrentHashMap(10240);
#阻塞队列
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue(1048576);
public Notifier() {
}
public void addTask(String datumKey, DataOperation action) {
if (!this.services.containsKey(datumKey) || action != DataOperation.CHANGE) {
if (action == DataOperation.CHANGE) {
this.services.put(datumKey, "");
}
#BlockingQueue接口方法 往队列尾部添加元素,不会阻塞
#至此服务注册流程已经结束
this.tasks.offer(Pair.with(datumKey, action));
}
}
既然是队列存储,那一定是会有地方从此队列取出来进行消费出来,我们可以找到在DistroConsistencyServiceImpl类的init()方法中发现,且该方法还标注了一个@PostConstruct注解
我们继续查看Notifier线程的run方法
public void run() {
Loggers.DISTRO.info("distro notifier started");
#死循环从阻塞队列取数据处理。
while(true) {
while(true) {
try {
#BlockingQueue接口take方法 获取队列头部第1个元素,会一直阻塞直到取得元素或当前线程中断。
Pair<String, DataOperation> pair = (Pair)this.tasks.take();
this.handle(pair);
} catch (Throwable var2) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", var2);
}
}
}
}
相当于这个Notifier线程只干了一件事,在线程启动后就死循环扫描该队列,并获取数据处理,我们再次点击进去handler方法查看数据如何处理的。
private void handle(Pair<String, DataOperation> pair) {
try {
String datumKey = (String)pair.getValue0();
#获取此处数据的操作方式
DataOperation action = (DataOperation)pair.getValue1();
this.services.remove(datumKey);
int count = 0;
if (!DistroConsistencyServiceImpl.this.listeners.containsKey(datumKey)) {
return;
}
Iterator var5 = ((ConcurrentLinkedQueue)DistroConsistencyServiceImpl.this.listeners.get(datumKey)).iterator();
while(var5.hasNext()) {
#获取监听器列表
RecordListener listener = (RecordListener)var5.next();
++count;
try {
if (action == DataOperation.CHANGE) {
#向各监听器调用更新注册表
listener.onChange(datumKey, DistroConsistencyServiceImpl.this.dataStore.get(datumKey).value);
} else if (action == DataOperation.DELETE) {
listener.onDelete(datumKey);
}
} catch (Throwable var8) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, var8);
}
}
debug可以发现最终到了Service类的onChange方法
这里主要是更改Instance相对于的权重属性
public void onChange(String key, Instances value) throws Exception {
Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
Iterator var3 = value.getInstanceList().iterator();
while(var3.hasNext()) {
Instance instance = (Instance)var3.next();
if (instance == null) {
throw new RuntimeException("got null instance " + key);
}
if (instance.getWeight() > 10000.0) {
instance.setWeight(10000.0);
}
if (instance.getWeight() < 0.01 && instance.getWeight() > 0.0) {
instance.setWeight(0.01);
}
}
#核心更新逻辑
this.updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
this.recalculateChecksum();
}
我们直接进入到updateIPs方法中,这里使用了copyonwrite的思想更新注册表,关键拷贝的不是整个注册表而是注册表中的Cluster实例列表,因此如果注册表中数据很多,最终拷贝的也只是很小一部分不会产生什么影响
Nacos如何解决多节点读写并发冲突
copyonwrite:适合读多写少的情况,最大程度的提高读的效率,牺牲空间,换取性能的思想