安装Nacos源码
上一篇文章我们了解了《Nacos服务注册》客户端源码,本篇文章我们来看一下服务注册Nacos服务端的源码执行情况。首先需要下载Nacos源码, https://github.com/alibaba/nacos/releases/tag/1.4.3 ,
解压之后使用IDEA工具导入即可。
但是编译过后发现代码会报错,主要是缺少实体类,比如:
安装protobuf
这主要是应该nacos数据通信底层使用到protobuf进行序列化(与JSON类似),是Google提供的一种数据序列化协议
Protocol Buffers 是一种轻便高效的结构化数据存储格式,可以用于结构化数据序列化,很适合做数据存储或 RPC 数据交换格式。它可用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。
所以这里我们需要安装protobuf ,先去下载 https://github.com/protocolbuffers/protobuf/releases,下载window版本如下:
- 下载之后解压
- 然后需要配置环境变量
- 找到consistency模块,进入src/main
- 进入main目录,执行cmd命令
protoc --java_out=./java ./proto/consistency.proto
protoc --java_out=./java ./proto/Data.proto
效果如下:
启动Nacos
找到console控制台,启动Nacos,第一次启动会报错,因为默认是以集群方式启动,会出现jdbc.properties找不到的错误
- 然后指定为单机启动,指定VM参数
- 启动成功
- 访问 http://localhost:8848/nacos/index.html 进入控制台
- 到这里,nacos服务端的源码就启动成功了,那么我们尝试启动nacos-client程序,让他注册到nacos-server
- 查看控制台,nacos-client成功注册到服务端
服务注册
在上一章节《Nacos源码分析-服务注册(客户端)》我们有分析到,nacos-client提交注册的地址是post /nacos/v1/ns/instance
,那么我们在nacos-server源码中找到该接口,它位于 naming 模块中的/controllers包下的InstanceController
接口中。源码如下
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
@Autowired
private SwitchDomain switchDomain;
@Autowired
private PushService pushService;
@Autowired
private ServiceManager serviceManager;
...省略...
/**
注册一个新的实例
* Register new instance.
*
* @param request http request
* @return 'ok' if success
* @throws Exception any error during register
*/
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
//request请求对象中包括了注册的服务的port,namespaceId,groupName,serviceName,ip,集群名等等
public String register(HttpServletRequest request) throws Exception {
//拿到注册的服务的:namespaceId,默认是public
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
//拿到注册的服务的:serviceName服务名会把组名加在前面,比如:DEFAULT_GROUP@@nacos-client
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
//检查服务名的格式:groupName@@serviceName
NamingUtils.checkServiceNameFormat(serviceName);
//解析请求参数,封装服务实例对戏,把注册的服务封装为Instance,其中包括IP,端口,服务名等
final Instance instance = parseInstance(request);
//使用ServiceManger注册服务实例
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
//解析要注册的服务实例
private Instance parseInstance(HttpServletRequest request) throws Exception {
//拿到服务名 DEFAULT_GROUP@@nacos-client
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
//拿到app,没配置就是:unknown
String app = WebUtils.optional(request, "app", "DEFAULT");
//拿到注册服务的:IP,是否开启服务,权重,健康状况,等封装为Instance 对象
Instance instance = getIpAddress(request);
instance.setApp(app);
instance.setServiceName(serviceName);
// Generate simple instance id first. This value would be updated according to
// 生成实例的ID:192.168.174.1#8080#DEFAULT#DEFAULT_GROUP@@nacos-client
instance.setInstanceId(instance.generateInstanceId());
//设置最后的心跳时间为当前时间
instance.setLastBeat(System.currentTimeMillis());
String metadata = WebUtils.optional(request, "metadata", StringUtils.EMPTY);
if (StringUtils.isNotEmpty(metadata)) {
instance.setMetadata(UtilsAndCommons.parseMetadata(metadata));
}
//验证实例
instance.validate();
return instance;
}
register方法中会从请求对象中拿到注册的参数比如IP,是否开启服务,权重,健康状况等,然后封装为 instance对象,交给 serviceManager.registerInstance 去注册,下面是 serviceManager.registerInstance的源码
缓存和初始化serivce
@Component
public class ServiceManager implements RecordListener<Service> {
/**
* Map(namespace, Map(group::serviceName, Service)).
*/
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
...省略部分代码...
//注册服务实例
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
//1.会尝试从serviceMap(服务注册表)中获取到服务实例,如果没有就会创建一个Service,
// 并设置好属性:GroupName,namespaceId,serviceName。然后存储到ServiceManager的一个ConcurrentHashMap中
// 服务注册表的结构是Map<String,Map<String,Service>>
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
//从注册表中获取服务,注册表是一个Map<String,Map<String,Service>>结构,
// 先根据namespaceId取得到Map<String,Service>,然后再根据serviceName取Service
Service service = getService(namespaceId, serviceName);
//参数无效,没有找到服务
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
//添加 instance 服务实例到注册表
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
...省略部分代码...
//二.创建service,并初始化
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException {
Service service = getService(namespaceId, serviceName);
//如果服务不存在就创建一个service
if (service == null) {
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
service.validate();
//保存service和初始化service
putServiceAndInit(service);
if (!local) {
addOrReplaceService(service);
}
}
}
//保存service和初始化service
private void putServiceAndInit(Service service) throws NacosException {
//保存service
putService(service);
service = getService(service.getNamespaceId(), service.getName());
//初始化service
service.init();
//consistencyService.listen实现数据一致性监听
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
//保存service到注册表中
public void putService(Service service) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
synchronized (putServiceLock) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
}
}
}
//把注册的服务存储到Map中
serviceMap.get(service.getNamespaceId()).putIfAbsent(service.getName(), service);
}
registerInstance做了三个事情
-
通过putService()方法将服务缓存到内存
-
service.init()建立心跳机制
-
consistencyService.listen实现数据一致性监听
registerInstance方法会尝试从ServiceManager#serviceMap(服务注册表)中获取到服务实例,如果没有就会创建一个Service,并设置好属性:GroupName,namespaceId,serviceName。然后存储到ServiceManager#serviceMap中。
该Map是一个ConcurrentHashMap,结构是Map<String,Map<String,Service>>。第一个Key是NamespaceId 如:public ,第二个key是服务名,如 : DEFAULT_GROUP@@nacos-client
这就是nacos中的的服务注册表,用来存放注册的服务实例的Map.
注意:service和instance的关系是,一个service中包含一个 Map<String, Cluster> , 一个Cluster中包含一个 Set。
- service代表一个服务:比如用户服务
- Cluster代表服务集群,比如2个用户服务形成一个集群
- 而一个集群中有多个服务实例,所以Cluster中有了Set 来保存服务实例
除此之外还会调用 com.alibaba.nacos.naming.core.Service#init 方法对service进行初始化,下面是init方法的源码
public void init() {
//clientBeatCheckTask 是一个Runnable,它持有service,它的作用是
//检查并更新临时实例的状态,如果它们已过期,则将其删除
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}
//定时任务:定时检查服务的健康状况,5S一次
public static void scheduleCheck(ClientBeatCheckTask task) {
futureMap.computeIfAbsent(task.taskKey(),
k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}
service.init 初始化方法中主要是把service封装到 ClientBeatCheckTask 对象中,ClientBeatCheckTask 是一个Runnable线程对象,然后使用定时任务5s执行一次健康检查。 ClientBeatCheckTask 的作用是 : 检查并更新临时实例的状态,如果它们已过期,则将其删除
下面是 com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask#run 线程对象的源码
public void run() {
try {
if (!getDistroMapper().responsible(service.getName())) {
return;
}
if (!getSwitchDomain().isHealthCheckEnabled()) {
return;
}
//拿到服务中的所有实例
List<Instance> instances = service.allIPs(true);
// first set health status of instances:
for (Instance instance : instances) {
//当前系统时间 - 实例最后心跳时间 > 默认15s,就意味着超时
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
if (!instance.isMarked()) {
if (instance.isHealthy()) {
//健康状态设置为false
instance.setHealthy(false);
Loggers.EVT_LOG
.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
instance.getIp(), instance.getPort(), instance.getClusterName(),
service.getName(), UtilsAndCommons.LOCALHOST_SITE,
instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
//发布时间:服务状态改变
getPushService().serviceChanged(service);
//发布时间:服务实例心跳超时事件
ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
}
}
}
}
if (!getGlobalConfig().isExpireInstance()) {
return;
}
// then remove obsolete instances:
for (Instance instance : instances) {
if (instance.isMarked()) {
continue;
}
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
// delete instance
Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
JacksonUtils.toJson(instance));
deleteIp(instance);
}
}
} catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}
}
run方法中会拿到当前service的所有instance,然后循环 , 如果:当前系统时间 - 实例最后心跳时间 > 默认15s,就意味着超时,然后会改变instance的Healthy健康状态Wie false; 并抛出 服务实例心跳超时事件
getPushService().serviceChanged(service):方法很有意思,他的作用是通知 nacos-client该服务已经下线(UDP协议 push),这样的话nacos-client就会从本地剔除掉下线的服务。这就是它和eureka不一样的地方,eureka使用的是pull.而 nacos采用pull + push模式。 具体源码见: PushService#onApplicationEvent
public void onApplicationEvent(ServiceChangeEvent event) {
Service service = event.getService();
String serviceName = service.getName();
String namespaceId = service.getNamespaceId();
//使用定时任务 1s 一次
Future future = GlobalExecutor.scheduleUdpSender(() -> {
try {
//服务改变,添加到 push队列
Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
ConcurrentMap<String, PushClient> clients = clientMap
.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
if (MapUtils.isEmpty(clients)) {
return;
}
Map<String, Object> cache = new HashMap<>(16);
long lastRefTime = System.nanoTime();
for (PushClient client : clients.values()) {
if (client.zombie()) {
Loggers.PUSH.debug("client is zombie: " + client.toString());
clients.remove(client.toString());
Loggers.PUSH.debug("client is zombie: " + client.toString());
continue;
}
Receiver.AckEntry ackEntry;
Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
byte[] compressData = null;
Map<String, Object> data = null;
if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
compressData = (byte[]) (pair.getValue0());
data = (Map<String, Object>) pair.getValue1();
Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
}
if (compressData != null) {
ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
} else {
ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
if (ackEntry != null) {
cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
}
}
Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
client.getServiceName(), client.getAddrStr(), client.getAgent(),
(ackEntry == null ? null : ackEntry.key));
//使用udp协议push
udpPush(ackEntry);
}
} catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
} finally {
futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
}
}, 1000, TimeUnit.MILLISECONDS);
futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
添加instance
到这里,service的缓存和初始化就看完了,代码回到 com.alibaba.nacos.naming.core.ServiceManager#registerInstance 。接下来就是分析 addInstance方法
//添加一个instance到Add instance to service.
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
//拿到key: com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@nacos-client
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
//拿到service
Service service = getService(namespaceId, serviceName);
//对service加同步锁,避免并发修改
synchronized (service) {
//拿到该service中的所有instance
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
//把实例列表封装到Instances 对象中
Instances instances = new Instances();
instances.setInstanceList(instanceList);
//调用consistencyService.put()方法完成Nacos集群的数据同步,保证集群一致性
consistencyService.put(key, instances);
}
}
addInstance方法中会拿到service中的List<Instance>
实例列表,然后设置到 Instances 中,调用 consistencyService去同步到nacos集群。
这里采用了CopyOnWrite方案。对于 addIPAddress方法会拷贝旧的实例列表添加到新实例到列表中。在同步完nacos集群后,完成对实例状态更新后,则会用新列表直接覆盖旧实例列表。而在更新过程中,旧实例列表不受影响,用户依然可以读取。
这样在更新列表状态过程中,无需阻塞用户的读操作,也不会导致用户读取到脏数据,性能比较好。这种方案称为CopyOnWrite方案
consistencyService是用作service同步的。代表集群一致性的接口。
下面看一下 consistencyService.put 方法,底层会调用 DistroConsistencyServiceImpl#put 方法,源码如下
@Override
public void put(String key, Record value) throws NacosException {
//根据key确定是用ephemeralConsistencyService或者persistentConsistencyService
mapConsistencyService(key).put(key, value);
}
private ConsistencyService mapConsistencyService(String key) {
//key以 ephemeral 开头就是临时实例
// 临时实例选择 ephemeralConsistencyService,也就是 DistroConsistencyServiceImpl类
// 持久实例选择 persistentConsistencyService,也就是PersistentConsistencyServiceDelegateImpl
return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}
//初始化方法,
@PostConstruct
public void init() {
//把notifier提交给线程池
GlobalExecutor.submitDistroNotifyTask(notifier);
}
@Override
public void put(String key, Record value) throws NacosException {
//把实例保存到本地实例表
onPut(key, value);
//使用distro协议同步到集群
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}
put方法中,会先先根据服务的key判断使用临时同步服务ephemeralConsistencyService ,或者持久同步服务persistentConsistencyService。然后会做2个事情
- 调用onPut :把实例保存到本地实例列表 。
- 调用distroProtocol.sync把实例同步到集群
更新服务列表
对于onPut 方法中做了2个事情.
- 一个是把实例封装到Datum对象中,然后交给dataStore存储起来。
- 另一个是通过notifier.addTask 把key放入阻塞队列,然后会通过线程池异步去执行阻塞队列
public void onPut(String key, Record value) {
//判断是否是临时实例
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
//把数据存储到dataStore,内部维护了一个Map
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
//这里是把key放入一个阻塞队列,然后会用线程池异步去执行队列
notifier.addTask(key, DataOperation.CHANGE);
}
public class Notifier implements Runnable {
private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
//一个阻塞队列
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
public void addTask(String datumKey, DataOperation action) {
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == DataOperation.CHANGE) {
//如果是change,就把key放入一个map中
services.put(datumKey, StringUtils.EMPTY);
}
//加入阻塞队列
tasks.offer(Pair.with(datumKey, action));
}
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
for (; ; ) {
try {
//从阻塞队列中取出任务
Pair<String, DataOperation> pair = tasks.take();
//处理任务更新服务列表
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
Notifier是一个Runnable,其中维护了一个tasks(ArrayBlockingQueue)用来存储服务列表的变更事件。他的run方法中是一个死循环,不停的从阻塞队列中取出任务交给handle方法去处理。下面是 DistroConsistencyServiceImpl.Notifier#handle方法
private void handle(Pair<String, DataOperation> pair) {
try {
String datumKey = pair.getValue0();
DataOperation action = pair.getValue1();
services.remove(datumKey);
int count = 0;
ConcurrentLinkedQueue<RecordListener> recordListeners = listeners.get(datumKey);
if (recordListeners == null) {
Loggers.DISTRO.info("[DISTRO-WARN] RecordListener not found, key: {}", datumKey);
return;
}
//拿到有change的service,RecordListener 就是 service的接口
for (RecordListener listener : recordListeners) {
count++;
try {
//如果是change事件
if (action == DataOperation.CHANGE) {
//取出服务
Datum datum = dataStore.get(datumKey);
if (datum != null) {
//执行linster的change事件。更新服务列表
listener.onChange(datumKey, datum.value);
} else {
Loggers.DISTRO.info("[DISTRO-WARN] data not found, key: {}", datumKey);
}
continue;
}
//处理服务的delete事件
if (action == DataOperation.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO
.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
datumKey, count, action.name());
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
handle方法中会找到有变化的RecordListener,其实就是service(change 或者 delete事件)然后,触发onChange方法,其实就是调用 com.alibaba.nacos.naming.core.Service#onChange方法。
public void onChange(String key, Instances value) throws Exception {
Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
//遍历service中的所有实例instance
for (Instance instance : value.getInstanceList()) {
if (instance == null) {
// Reject this abnormal instance list:
throw new RuntimeException("got null instance " + key);
}
if (instance.getWeight() > 10000.0D) {
//设置权重
instance.setWeight(10000.0D);
}
if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
instance.setWeight(0.01D);
}
}
//修改IP
updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
recalculateChecksum();
}
该方法中会调用updateIPS去更新服务实例,源码如下
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
// 准备一个HashMap,key是cluster,值是集群下的Instance集合
Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
// 获取集群名称存储到map中,key是集群名
for (String clusterName : clusterMap.keySet()) {
ipMap.put(clusterName, new ArrayList<>());
}
// 遍历要更新的实例
for (Instance instance : instances) {
try {
if (instance == null) {
Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
continue;
}
// 判断实例是否包含clusterName,没有的话用默认cluster
if (StringUtils.isEmpty(instance.getClusterName())) {
instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}
// 判断cluster是否存在,不存在则创建新的cluster
if (!clusterMap.containsKey(instance.getClusterName())) {
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
Cluster cluster = new Cluster(instance.getClusterName(), this);
cluster.init();
getClusterMap().put(instance.getClusterName(), cluster);
}
// 获取当前cluster实例的集合,不存在则创建新的
List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
if (clusterIPs == null) {
clusterIPs = new LinkedList<>();
ipMap.put(instance.getClusterName(), clusterIPs);
}
// 添加新的实例到 Instance 集合
clusterIPs.add(instance);
} catch (Exception e) {
Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
}
}
for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
//make every ip mine
List<Instance> entryIPs = entry.getValue();
// 这里就是在更新注册表
clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
}
//设置最后修改时间
setLastModifiedMillis(System.currentTimeMillis());
// 发布服务变更的通知消息
getPushService().serviceChanged(this);
StringBuilder stringBuilder = new StringBuilder();
for (Instance instance : allIPs()) {
stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
}
Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
stringBuilder.toString());
}
上面代码中 ,clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral); 就是 在更新服务注册表,因为service#clusterMap 是一个Map<String, Cluster> 结构,cluster中就是服务实例。然后会调用 .Cluster#updateIps去更新实例。源码如下
public void updateIps(List<Instance> ips, boolean ephemeral) {
Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
//拿到旧的服务列表,
HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
for (Instance ip : toUpdateInstances) {
oldIpMap.put(ip.getDatumKey(), ip);
}
...省略部分代码...
// 检查新加入实例的状态
List<Instance> newIPs = subtract(ips, oldIpMap.values());
if (newIPs.size() > 0) {
Loggers.EVT_LOG
.info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
getName(), newIPs.size(), newIPs.toString());
for (Instance ip : newIPs) {
//重置服务的健康状态
HealthCheckStatus.reset(ip);
}
}
// 移除要删除的实例
List<Instance> deadIPs = subtract(oldIpMap.values(), ips);
if (deadIPs.size() > 0) {
Loggers.EVT_LOG
.info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
getName(), deadIPs.size(), deadIPs.toString());
for (Instance ip : deadIPs) {
//移除
HealthCheckStatus.remv(ip);
}
}
toUpdateInstances = new HashSet<>(ips);
if (ephemeral) {
// 直接覆盖旧实例列表
ephemeralInstances = toUpdateInstances;
} else {
persistentInstances = toUpdateInstances;
}
}
同步服务到集群
接下来回到 DistroConsistencyServiceImpl#put方法中。刚才说到该方法做了2个事情
- onPut(key, value) : 更新服务列表
- distroProtocol.sync :同步服务到集群
我们现在来看一下sync方法是怎么做的,下面是方法的源码
/**
* 开始同步数据到所有的远程服务
* Start to sync data to all remote server.
*
* @param distroKey distro key of sync data
* @param action the action of data operation
*/
public void sync(DistroKey distroKey, DataOperation action, long delay) {
//拿到除开自己以外的所有nacos集群中的成员
for (Member each : memberManager.allMembersWithoutSelf()) {
//构建一个key
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
each.getAddress());
//构建一个延迟任务对象
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
//交给线程池去执行,维护了一个DistroDelayTaskExecuteEngine
//任务交给 NacosDelayTaskExecuteEngine 引擎 其中维护了一个ScheduledExecutorService线程池
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
}
}
}
该方法中会找到所有的nacos集群的成员(除开自己),然后会拿到服务的key(DistroKey )构建一个DistroDelayTask任务对象,交给线程池去执行同步。
这里维护了一个 DelayTaskExecuteEngine 延迟任务执行引擎NacosDelayTaskExecuteEngine,任务的执行通过引擎的 processTasks方法完成com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngine#processTasks
protected void processTasks() {
//拿到所有任务
Collection<Object> keys = getAllTaskKeys();
for (Object taskKey : keys) {
AbstractDelayTask task = removeTask(taskKey);
if (null == task) {
continue;
}
//任务执行器
NacosTaskProcessor processor = getProcessor(taskKey);
if (null == processor) {
getEngineLog().error("processor not found for task, so discarded. " + task);
continue;
}
try {
// ReAdd task if process failed
//执行任务,任务失败会重试
if (!processor.process(task)) {
retryFailedTask(taskKey, task);
}
} catch (Throwable e) {
getEngineLog().error("Nacos task execute error : " + e.toString(), e);
//重试失败的任务
retryFailedTask(taskKey, task);
}
}
}
总结
文章有点长,下面做个总结,从大的流程上来说分为如下几个步骤
- instanceController接口: nacos服务点接受到注册请求后会把请求解析为Instance,紧接着会执行serviceManager#registerInstance方法注册实例
- serviceManager#registerInstance方法中会先尝试创建Service对象,并缓存到一个Map<String, Map<String, Service>> 结构的服务注册表中,然后对每个service做初始化,主要是使用线程池10s一次检查服务是否健康状态,过期的服务会删除掉。
- serviceManager#registerInstance第二个事情就是执行addInstances方法添加实例,该方法会触发服务列表的更新以及把服务同步到其他nacos集群中。
文章到这里就结束了,如果文章对你有所帮助,请给个好评,你的鼓励是我最大的动力