Nacos 删除过期实例也是注册中心的一个重要功能。今天我们从入口到结束完整分析一遍。首先要确定的是,删除逻辑的入口位于服务端注册接口的源码里,此处可以参考注册流程的源码分析。
一、注册入口
1、创建空服务
/**
 * Registers {@code instance} under the service identified by {@code namespaceId} and
 * {@code serviceName}, creating an empty service first when none exists.
 *
 * @param namespaceId namespace of the service
 * @param serviceName name of the service
 * @param instance    instance to register; its ephemeral flag is passed on to both steps
 * @throws NacosException with code {@code INVALID_PARAM} when the service cannot be found after
 *                        the creation step, or whatever the delegated calls throw
 */
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // Create the empty service if needed; per the original walkthrough this is also the entry
    // point that leads to expired-instance removal.
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());

    final boolean serviceMissing = getService(namespaceId, serviceName) == null;
    if (serviceMissing) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }

    // Core step: add the instance to the service.
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
点击 createEmptyService方法
/**
 * Creates a service without any initial cluster if none exists yet.
 *
 * <p>Delegates to {@code createServiceIfAbsent} with a {@code null} cluster.
 *
 * @param namespaceId namespace of the service
 * @param serviceName name of the service
 * @param local       forwarded unchanged to {@code createServiceIfAbsent}
 * @throws NacosException propagated from {@code createServiceIfAbsent}
 */
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
    final Cluster noInitialCluster = null;
    createServiceIfAbsent(namespaceId, serviceName, local, noInitialCluster);
}
继续进入
/**
 * Creates, validates and initialises a service when {@code getService} finds none for the given
 * namespace and name; does nothing otherwise.
 *
 * @param namespaceId namespace of the service
 * @param serviceName name of the service
 * @param local       when {@code false}, the new service is additionally passed to
 *                    {@code addOrReplaceService}
 * @param cluster     optional cluster to attach to the new service; may be {@code null}
 * @throws NacosException propagated from validation or initialisation
 */
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException {
    // Look the service up first; nothing to do when it already exists.
    if (getService(namespaceId, serviceName) != null) {
        return;
    }

    Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);

    final Service created = new Service();
    created.setName(serviceName);
    created.setNamespaceId(namespaceId);
    created.setGroupName(NamingUtils.getGroupName(serviceName));
    created.setLastModifiedMillis(System.currentTimeMillis());
    created.recalculateChecksum();

    if (cluster != null) {
        // Link the cluster and the service in both directions.
        cluster.setService(created);
        created.getClusterMap().put(cluster.getName(), cluster);
    }

    // Validate the service now; an exception is thrown when validation fails.
    created.validate();

    // Core step: store the service and initialise it.
    putServiceAndInit(created);

    if (!local) {
        // Per the walkthrough: listener handling has not happened yet for this case, so do it now.
        addOrReplaceService(created);
    }
}
点击putServiceAndInit(service);来到
/**
 * Stores the service, initialises it, and registers it as a listener on both of its
 * instance-list keys (built with the last flag {@code true} and {@code false}).
 *
 * @param service service to store and initialise
 * @throws NacosException propagated from the consistency service
 */
private void putServiceAndInit(Service service) throws NacosException {
    // Put the service into the in-memory map, then run its init() (schedules its check task).
    putService(service);
    service.init();

    final String keyWithFlagTrue =
            KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true);
    consistencyService.listen(keyWithFlagTrue, service);

    final String keyWithFlagFalse =
            KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false);
    consistencyService.listen(keyWithFlagFalse, service);

    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
点击putService(service);来到:
/**
 * Stores the service in {@code serviceMap}, which is keyed first by namespace id and then by
 * service name. The namespace-level map is created on first use.
 *
 * @param service service to store
 */
public void putService(Service service) {
    final String namespaceId = service.getNamespaceId();

    if (!serviceMap.containsKey(namespaceId)) {
        synchronized (putServiceLock) {
            // Check again while holding the lock before creating the first-level entry.
            if (!serviceMap.containsKey(namespaceId)) {
                serviceMap.put(namespaceId, new ConcurrentSkipListMap<>());
            }
        }
    }

    // Second level of the registry: service name -> service.
    serviceMap.get(namespaceId).put(service.getName(), service);
}
2、点击 service.init(); 来到 Service 类
/**
 * Schedules this service's client-beat check task and initialises every cluster in
 * {@code clusterMap}, pointing each one back at this service.
 */
public void init() {
    // Hand the beat-check task to the health-check reactor for scheduled execution.
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);

    for (Cluster cluster : clusterMap.values()) {
        cluster.setService(this);
        cluster.init();
    }
}
3、找到 clientBeatCheckTask 对应的任务类,点击进入其 run 方法
/**
 * Periodic client-beat check for one service.
 *
 * <p>Skips the run when this node is not responsible for the service or health checking is
 * disabled. Otherwise it first marks instances whose last beat is older than their heartbeat
 * timeout as unhealthy, and then, if instance expiry is enabled, deletes instances whose last
 * beat is older than their delete timeout. Instances flagged as "marked" are left alone in both
 * passes. Any exception is logged and swallowed.
 */
@Override
public void run() {
    try {
        if (!getDistroMapper().responsible(service.getName())
                || !getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }

        final List<Instance> candidates = service.allIPs(true);

        // Pass 1: set the health status. The walkthrough cites 15 seconds without a heartbeat
        // as the threshold; the actual value comes from getInstanceHeartBeatTimeOut().
        for (Instance each : candidates) {
            final boolean beatOverdue =
                    System.currentTimeMillis() - each.getLastBeat() > each.getInstanceHeartBeatTimeOut();
            if (!beatOverdue || each.isMarked() || !each.isHealthy()) {
                continue;
            }
            each.setHealthy(false);
            Loggers.EVT_LOG
                    .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                            each.getIp(), each.getPort(), each.getClusterName(),
                            service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                            each.getInstanceHeartBeatTimeOut(), each.getLastBeat());
            getPushService().serviceChanged(service);
            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, each));
        }

        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }

        // Pass 2: remove obsolete instances. The walkthrough cites 30 seconds without a heartbeat
        // as the threshold (value comes from getIpDeleteTimeout()), and notes that a removed
        // instance re-registers once its heartbeat resumes.
        for (Instance each : candidates) {
            if (!each.isMarked()
                    && System.currentTimeMillis() - each.getLastBeat() > each.getIpDeleteTimeout()) {
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                        JacksonUtils.toJson(each));
                deleteIp(each);
            }
        }
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }
}
点击 deleteIp(instance);
/**
 * Asks the local server, through an asynchronous HTTP DELETE on its instance endpoint, to
 * remove the given instance. Failures are logged; nothing is thrown to the caller.
 *
 * @param instance instance to delete
 */
private void deleteIp(Instance instance) {
    try {
        final NamingProxy.Request deleteRequest = NamingProxy.Request.newRequest();
        deleteRequest.appendParam("ip", instance.getIp())
                .appendParam("port", String.valueOf(instance.getPort()))
                .appendParam("ephemeral", "true")
                .appendParam("clusterName", instance.getClusterName())
                .appendParam("serviceName", service.getName())
                .appendParam("namespaceId", service.getNamespaceId());

        final String localServer =
                "http://" + IPUtil.localHostIP() + IPUtil.IP_PORT_SPLITER + EnvUtil.getPort();
        final String url = localServer + EnvUtil.getContextPath()
                + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + deleteRequest.toUrl();

        final Callback<String> onDeleteResult = new Callback<String>() {
            @Override
            public void onReceive(RestResult<String> result) {
                if (result.ok()) {
                    return;
                }
                Loggers.SRV_LOG
                        .error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
                                instance.toJson(), result.getMessage(), result.getCode());
            }

            @Override
            public void onError(Throwable throwable) {
                Loggers.SRV_LOG
                        .error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(),
                                throwable);
            }

            @Override
            public void onCancel() {
                // Nothing to do on cancellation.
            }
        };

        // Delete the instance asynchronously.
        HttpClient.asyncHttpDelete(url, null, null, onDeleteResult);
    } catch (Exception e) {
        Loggers.SRV_LOG
                .error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e);
    }
}
4、点击HttpClient.asyncHttpDelete
/**
 * Sends an asynchronous HTTP DELETE by delegating to {@code asyncHttpRequest}.
 *
 * <p>Per the walkthrough, for the instance-deletion URL this request ends up at
 * {@code InstanceController#deregister}.
 *
 * @param url         target URL
 * @param headers     extra headers; may be {@code null}
 * @param paramValues request parameters; may be {@code null}
 * @param callback    receives the result
 * @throws Exception propagated from {@code asyncHttpRequest}
 */
public static void asyncHttpDelete(String url, List<String> headers, Map<String, String> paramValues,
        Callback<String> callback) throws Exception {
    final String method = HttpMethod.DELETE;
    asyncHttpRequest(url, headers, paramValues, callback, method);
}
点击 asyncHttpRequest 方法
/**
 * Builds the query and headers for an asynchronous HTTP call and dispatches it by method.
 *
 * <p>GET and DELETE send the parameters as a query; POST and PUT send them as a form.
 *
 * @param url         target URL
 * @param headers     extra headers, added only when non-empty
 * @param paramValues request parameters
 * @param callback    receives the result
 * @param method      one of the {@code HttpMethod} GET/POST/PUT/DELETE constants
 * @throws RuntimeException when {@code method} is none of the supported values
 * @throws Exception        propagated from the underlying template
 */
public static void asyncHttpRequest(String url, List<String> headers, Map<String, String> paramValues,
        Callback<String> callback, String method) throws Exception {
    final Query query = Query.newInstance().initParams(paramValues);
    query.addParam("encoding", "UTF-8");
    query.addParam("nofix", "1");

    final Header header = Header.newInstance();
    if (CollectionUtils.isNotEmpty(headers)) {
        header.addAll(headers);
    }
    header.addParam(HttpHeaderConsts.ACCEPT_CHARSET, "UTF-8");
    AuthHeaderUtil.addIdentityToHeader(header);

    // Dispatch to the matching asynchronous operation.
    if (method.equals(HttpMethod.GET)) {
        ASYNC_REST_TEMPLATE.get(url, header, query, String.class, callback);
    } else if (method.equals(HttpMethod.POST)) {
        ASYNC_REST_TEMPLATE.postForm(url, header, paramValues, String.class, callback);
    } else if (method.equals(HttpMethod.PUT)) {
        ASYNC_REST_TEMPLATE.putForm(url, header, paramValues, String.class, callback);
    } else if (method.equals(HttpMethod.DELETE)) {
        // The delete path followed by this walkthrough.
        ASYNC_REST_TEMPLATE.delete(url, header, query, String.class, callback);
    } else {
        throw new RuntimeException("not supported method:" + method);
    }
}
5、点击 ASYNC_REST_TEMPLATE.delete(url, header, query, String.class, callback);
来到 NacosAsyncRestTemplate extends AbstractNacosRestTemplate 类
/**
 * Issues an asynchronous HTTP DELETE.
 *
 * @param url          target URL
 * @param header       request headers
 * @param query        query parameters
 * @param responseType type used to select the response handler
 * @param callback     receives the result or the error
 * @param <T>          response body type
 */
public <T> void delete(String url, Header header, Query query, Type responseType, Callback<T> callback) {
    final RequestHttpEntity requestEntity = new RequestHttpEntity(header, query);
    execute(url, HttpMethod.DELETE, requestEntity, responseType, callback);
}
点击 execute 方法
/**
 * Builds the request URI, selects a response handler for {@code type}, and hands the request to
 * {@code clientRequest}. Exceptions are reported through {@code callback.onError} instead of
 * being thrown.
 */
private <T> void execute(String url, String httpMethod, RequestHttpEntity requestEntity, Type type,
        Callback<T> callback) {
    try {
        final URI target = HttpUtils.buildUri(url, requestEntity.getQuery());
        if (logger.isDebugEnabled()) {
            logger.debug("HTTP method: {}, url: {}, body: {}", httpMethod, target, requestEntity.getBody());
        }

        final ResponseHandler<T> handler = super.selectResponseHandler(type);

        // The actual call.
        clientRequest.execute(target, httpMethod, requestEntity, handler, callback);
    } catch (Exception e) {
        // Pass the failure to the callback rather than throwing it to the caller.
        callback.onError(e);
    }
}
最终调到com.alibaba.nacos.naming.controllers.InstanceController#deregister
二、删除入口
来到com.alibaba.nacos.naming.controllers.InstanceController#deregister方法
/**
 * HTTP DELETE handler that removes an instance from a service.
 *
 * <p>Returns {@code "ok"} both when the instance was removed and when the service does not
 * exist (the latter is only logged as a warning).
 *
 * @param request incoming request carrying the instance, namespace id and service name
 * @return always {@code "ok"} unless an exception is thrown
 * @throws Exception propagated from parameter parsing, validation or removal
 */
@CanDistro
@DeleteMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String deregister(HttpServletRequest request) throws Exception {
    final Instance target = getIpAddress(request);
    final String namespaceId =
            WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);

    if (serviceManager.getService(namespaceId, serviceName) == null) {
        Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", serviceName);
    } else {
        // The actual removal.
        serviceManager.removeInstance(namespaceId, serviceName, target.isEphemeral(), target);
    }
    return "ok";
}
1、点击 serviceManager.removeInstance 来到
ServiceManager implements RecordListener<Service>
/**
 * Removes the given instances from a service while holding the service's monitor.
 *
 * @param namespaceId namespace of the service
 * @param serviceName name of the service
 * @param ephemeral   whether the ephemeral or the persistent instance list is updated
 * @param ips         instances to remove
 * @throws NacosException with code {@code INVALID_PARAM} when the service does not exist, or
 *                        whatever the removal itself throws
 */
public void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    Service service = getService(namespaceId, serviceName);
    // synchronized (null) would fail with a bare NullPointerException; report the missing
    // service explicitly instead, with the same error registerInstance uses.
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    // Serialize removals on the same service.
    synchronized (service) {
        removeInstance(namespaceId, serviceName, ephemeral, service, ips);
    }
}
点击 removeInstance(namespaceId, serviceName, ephemeral, service, ips);
/**
 * Computes the instance list that remains after removing {@code ips} and writes it to the
 * consistency service under the service's instance-list key.
 */
private void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Service service,
        Instance... ips) throws NacosException {
    final String instanceListKey = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

    // Work out which instances are left once the given ones are taken away.
    final List<Instance> remaining = substractIpAddresses(service, ephemeral, ips);

    final Instances updated = new Instances();
    updated.setInstanceList(remaining);

    // Core step: publish the updated list.
    consistencyService.put(instanceListKey, updated);
}
点击 substractIpAddresses(service, ephemeral, ips);
/**
 * Returns the service's instance list with {@code ips} removed, by delegating to
 * {@code updateIpAddresses} with the remove action.
 */
private List<Instance> substractIpAddresses(Service service, boolean ephemeral, Instance... ips)
        throws NacosException {
    final String removeAction = UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE;
    return updateIpAddresses(service, removeAction, ephemeral, ips);
}
来到
/**
 * Computes the new instance list of a service after adding or removing the given instances.
 *
 * <p>The starting point is the datum stored in the consistency service for the service's
 * instance-list key (passed through {@code setValid} together with the service's current
 * instances), or an empty map when no datum value exists. Each entry of {@code ips} is then
 * removed from, or put into, that map by its datum key. A cluster named by an instance but
 * missing from the service is created and initialised on the fly.
 *
 * @param service   service whose instance list is updated
 * @param action    {@code UPDATE_INSTANCE_ACTION_REMOVE} removes; any other value adds/replaces
 * @param ephemeral selects which instance list (and key) is used
 * @param ips       instances to add or remove
 * @return a new list holding the resulting instances
 * @throws IllegalArgumentException when the result is empty and the action is
 *                                  {@code UPDATE_INSTANCE_ACTION_ADD}
 * @throws NacosException           propagated from the calls made here
 */
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
throws NacosException {
Datum datum = consistencyService
.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
List<Instance> currentIPs = service.allIPs(ephemeral);
// Index the service's current instances by ip address and collect their instance ids.
Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
Set<String> currentInstanceIds = Sets.newHashSet();
for (Instance instance : currentIPs) {
currentInstances.put(instance.toIpAddr(), instance);
currentInstanceIds.add(instance.getInstanceId());
}
Map<String, Instance> instanceMap;
if (datum != null && null != datum.value) {
// NOTE(review): setValid is not shown here; presumably it reconciles the stored list with the
// current instances and returns them keyed by datum key — confirm.
instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
} else {
instanceMap = new HashMap<>(ips.length);
}
for (Instance instance : ips) {
// Create the instance's cluster with default configuration when the service lacks it.
if (!service.getClusterMap().containsKey(instance.getClusterName())) {
Cluster cluster = new Cluster(instance.getClusterName(), service);
cluster.init();
service.getClusterMap().put(instance.getClusterName(), cluster);
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
}
if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
instanceMap.remove(instance.getDatumKey());// remove the instance
} else {
// Reuse the id of an already-known instance; otherwise generate a fresh one.
Instance oldInstance = instanceMap.get(instance.getDatumKey());
if (oldInstance != null) {
instance.setInstanceId(oldInstance.getInstanceId());
} else {
instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
}
instanceMap.put(instance.getDatumKey(), instance);// add or replace the instance
}
}
// An add must never leave the service without instances.
if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
throw new IllegalArgumentException(
"ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
.toJson(instanceMap.values()));
}
return new ArrayList<>(instanceMap.values());
}
2、点击 consistencyService.put(key, instances);进入
com.alibaba.nacos.naming.consistency.DelegateConsistencyServiceImpl#put
/**
 * Routes the write to the consistency service selected for {@code key}.
 *
 * @throws NacosException propagated from the selected consistency service
 */
@Override
public void put(String key, Record value) throws NacosException {
    final ConsistencyService delegate = mapConsistencyService(key);
    delegate.put(key, value);
}
点击 mapConsistencyService(key)
/**
 * Picks the consistency service for a key: the ephemeral one when the key matches an ephemeral
 * key, the persistent one otherwise.
 *
 * <p>Per the walkthrough, the ephemeral service is Alibaba's own AP-mode Distro protocol and
 * the persistent service handles persisted data.
 */
private ConsistencyService mapConsistencyService(String key) {
    if (KeyBuilder.matchEphemeralKey(key)) {
        return ephemeralConsistencyService;
    }
    return persistentConsistencyService;
}
点击 put 方法,对应以下两个实现类
3、来到
com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put
临时实例处理
/**
 * Applies the value locally through {@code onPut} and then schedules a Distro sync of the key
 * to the other members, delayed by half of the configured task dispatch period.
 *
 * @throws NacosException declared by the interface
 */
@Override
public void put(String key, Record value) throws NacosException {
    // Update the local, in-memory (ephemeral) data first.
    onPut(key, value);

    // Then propagate the change to the other server nodes.
    final DistroKey distroKey = new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
    distroProtocol.sync(distroKey, DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);
}
点击 onPut(key, value);
/**
 * Stores the value in {@code dataStore} when the key is an ephemeral instance-list key, and
 * queues a CHANGE notification when a listener is registered for the key.
 *
 * @param key   data key
 * @param value new value; cast to {@code Instances} for ephemeral instance-list keys
 */
public void onPut(String key, Record value) {
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        final Datum<Instances> stored = new Datum<>();
        stored.key = key;
        stored.value = (Instances) value;
        stored.timestamp.incrementAndGet();
        dataStore.put(key, stored);
    }

    if (listeners.containsKey(key)) {
        notifier.addTask(key, DataOperation.CHANGE);
    }
}
点击 distroProtocol.sync
/**
 * Queues one delayed sync task per other cluster member for the given key.
 *
 * @param distroKey key of the data to sync
 * @param action    operation to propagate
 * @param delay     delay handed to each {@code DistroDelayTask}
 */
public void sync(DistroKey distroKey, DataOperation action, long delay) {
    for (Member member : memberManager.allMembersWithoutSelf()) {
        final DistroKey targetedKey = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                member.getAddress());
        final DistroDelayTask syncTask = new DistroDelayTask(targetedKey, action, delay);

        // Hand the task to the delay engine; it performs the sync to that member.
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(targetedKey, syncTask);

        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, member.getAddress());
        }
    }
}
点击addTask
/**
 * Adds a delayed task under {@code key}. When a task already exists for the key, the new task
 * merges it before replacing it. The whole update runs under {@code lock}.
 */
@Override
public void addTask(Object key, AbstractDelayTask newTask) {
    lock.lock();
    try {
        final AbstractDelayTask pending = tasks.get(key);
        if (pending != null) {
            newTask.merge(pending);
        }
        // Store (or replace) the task in the map.
        tasks.put(key, newTask);
    } finally {
        lock.unlock();
    }
}
以上是临时实例的处理流程。
4、来到持久化实例的处理
com.alibaba.nacos.naming.consistency.persistent.raft.RaftConsistencyServiceImpl#put
/**
 * Publishes the value through {@code raftCore}; per the walkthrough this is Alibaba's own
 * CP-mode raft implementation.
 *
 * @throws NacosException with code {@code SERVER_ERROR}, wrapping the cause, when publishing fails
 */
@Override
public void put(String key, Record value) throws NacosException {
    checkIsStopWork();
    try {
        raftCore.signalPublish(key, value);
    } catch (Exception e) {
        Loggers.RAFT.error("Raft put failed.", e);
        final String message = "Raft put failed, key:" + key + ", value:" + value;
        throw new NacosException(NacosException.SERVER_ERROR, message, e);
    }
}
进入com.alibaba.nacos.naming.consistency.persistent.raft.RaftCore#signalPublish
/**
 * Publishes a key/value update through the raft implementation.
 *
 * <p>On a non-leader node the request is forwarded to the leader with
 * {@code raftProxy.proxyPostLarge} and the method returns. On the leader, under
 * {@code OPERATE_LOCK}, a datum is built (timestamp 1 for a new key, otherwise the existing
 * datum's timestamp incremented), applied locally via {@code onPublish}, and posted
 * asynchronously to every other server. The method then waits up to
 * {@code UtilsAndCommons.RAFT_PUBLISH_TIMEOUT} milliseconds for {@code peers.majorityCount()}
 * acknowledgements, the leader counting as one.
 *
 * @param key   data key
 * @param value new value
 * @throws IllegalStateException when {@code stopWork} is set, or when a majority does not
 *                               acknowledge within the timeout
 * @throws Exception             propagated from forwarding, {@code onPublish} or the HTTP client
 */
public void signalPublish(String key, Record value) throws Exception {
if (stopWork) {
throw new IllegalStateException("old raft protocol already stop work");
}
// Only the leader publishes; other nodes forward the request.
if (!isLeader()) {
ObjectNode params = JacksonUtils.createEmptyJsonNode();
params.put("key", key);
params.replace("value", JacksonUtils.transferToJsonNode(value));
Map<String, String> parameters = new HashMap<>(1);
parameters.put("key", key);
// NOTE(review): getLeader() is dereferenced without a null check — confirm it cannot be null here.
final RaftPeer leader = getLeader();
// Forward the request to the leader node.
raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
return;
}
OPERATE_LOCK.lock();
try {
final long start = System.currentTimeMillis();
final Datum datum = new Datum();
datum.key = key;
datum.value = value;
if (getDatum(key) == null) {
datum.timestamp.set(1L);
} else {
// Bumps the stored datum's timestamp and reuses the new value.
datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
}
ObjectNode json = JacksonUtils.createEmptyJsonNode();
json.replace("datum", JacksonUtils.transferToJsonNode(datum));
json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
// Apply the datum on this node first (per the walkthrough: to memory and to the disk file).
onPublish(datum, peers.local());
final String content = json.toString();
final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
for (final String server : peers.allServersIncludeMyself()) {
if (isLeader(server)) {
// The leader's own write counts towards the majority.
latch.countDown();
continue;
}
// Per the original note, API_ON_PUB is UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft/datum/commit".
final String url = buildUrl(server, API_ON_PUB);
// A CountDownLatch gives a simple raft-style write: the caller only gets success once the
// required number of nodes has acknowledged the write.
HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Loggers.RAFT
.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
datum.key, server, result.getCode());
return;
}
latch.countDown();
}
@Override
public void onError(Throwable throwable) {
Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
}
@Override
public void onCancel() {
}
});
}
if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
// only majority servers return success can we consider this update success
Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
}
long end = System.currentTimeMillis();
Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
} finally {
OPERATE_LOCK.unlock();
}
}
点击 proxyPostLarge 方法com.alibaba.nacos.naming.consistency.persistent.raft.RaftProxy#proxyPostLarge
/**
 * Posts {@code content} to {@code api} on the given server, appending this node's port when the
 * server address carries none.
 *
 * @param server  target server address, with or without port
 * @param api     API path appended after the context path
 * @param content request body
 * @param headers request headers
 * @throws IllegalStateException when the response is not ok
 * @throws Exception             propagated from the HTTP client
 */
public void proxyPostLarge(String server, String api, String content, Map<String, String> headers)
        throws Exception {
    final String target = IPUtil.containsPort(server)
            ? server
            : server + IPUtil.IP_PORT_SPLITER + EnvUtil.getPort();
    final String url = "http://" + target + EnvUtil.getContextPath() + api;

    final RestResult<String> response = HttpClient.httpPostLarge(url, headers, content);
    if (!response.ok()) {
        throw new IllegalStateException("leader failed, caused by: " + response.getMessage());
    }
}
点击 onPublish(datum, peers.local());
/**
 * Applies a published datum on this node.
 *
 * <p>After validating the datum and its source, the method resets the local leader-due timer,
 * writes the datum to {@code raftStore} when its key is a persistent key, stores it in
 * {@code datums}, advances and persists the local term, and publishes a
 * {@code ValueChangeEvent} with action CHANGE for the key.
 *
 * @param datum  datum to apply; its value must not be {@code null}
 * @param source peer that published the datum; must be the leader
 * @throws IllegalStateException when {@code stopWork} is set, the datum value is {@code null},
 *                               the source is not the leader, or the source term is behind the
 *                               local term
 * @throws Exception             propagated from the store or the calls made here
 */
public void onPublish(Datum datum, RaftPeer source) throws Exception {
if (stopWork) {
throw new IllegalStateException("old raft protocol already stop work");
}
RaftPeer local = peers.local();
if (datum.value == null) {
Loggers.RAFT.warn("received empty datum");
throw new IllegalStateException("received empty datum");
}
// Only the leader may publish.
if (!peers.isLeader(source.ip)) {
Loggers.RAFT
.warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),
JacksonUtils.toJson(getLeader()));
throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
}
// Reject a publish whose term is older than the local term.
if (source.term.get() < local.term.get()) {
Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),
JacksonUtils.toJson(local));
throw new IllegalStateException(
"out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
}
local.resetLeaderDue();
// if data should be persisted, usually this is true:
if (KeyBuilder.matchPersistentKey(datum.key)) {
raftStore.write(datum);// write the datum to the store synchronously (a file, per the walkthrough)
}
datums.put(datum.key, datum);
if (isLeader()) {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
} else {
// Follower: adopt the source term when the local increment would pass it, otherwise increment.
if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
//set leader term:
getLeader().term.set(source.term.get());
local.term.set(getLeader().term.get());
} else {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
}
}
raftStore.updateTerm(local.term.get());
// Publish a ValueChangeEvent; per the walkthrough its listeners update the in-memory registry.
NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}
点击 NotifyCenter.publishEvent来到 com.alibaba.nacos.common.notify.NotifyCenter#publishEvent(com.alibaba.nacos.common.notify.Event)
/**
 * Publishes an event to the publisher registered for its runtime class.
 *
 * @param event event to publish
 * @return the publisher's result, or {@code false} when publishing throws
 */
public static boolean publishEvent(final Event event) {
    try {
        final Class<? extends Event> eventType = event.getClass();
        // Triggers the listeners for this event type.
        return publishEvent(eventType, event);
    } catch (Throwable ex) {
        LOGGER.error("There was an exception to the message publishing : {}", ex);
        return false;
    }
}
点击
/**
 * Publishes the event through the shared publisher when its type is a {@code SlowEvent},
 * otherwise through the publisher registered under the type's canonical name.
 *
 * @return the publisher's result, or {@code false} when no publisher is registered
 */
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
    if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
        return INSTANCE.sharePublisher.publish(event);
    }

    final String topic = ClassUtils.getCanonicalName(eventType);
    final EventPublisher registered = INSTANCE.publisherMap.get(topic);
    if (registered == null) {
        LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
        return false;
    }
    return registered.publish(event);
}
来到com.alibaba.nacos.common.notify.DefaultPublisher#publish
/**
 * Offers the event to the queue; when the queue rejects it, the event is delivered
 * synchronously on the calling thread instead.
 *
 * @return always {@code true}
 */
@Override
public boolean publish(Event event) {
    checkIsStart();
    final boolean queued = this.queue.offer(event);
    if (!queued) {
        LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
        // Fall back to notifying the subscribers directly.
        receiveEvent(event);
    }
    return true;
}
来到
/**
 * Delivers the event to every subscriber, skipping subscribers that ignore expired events when
 * the event's sequence is lower than {@code lastEventSequence}.
 */
void receiveEvent(Event event) {
    final long sequence = event.sequence();

    for (Subscriber subscriber : subscribers) {
        final boolean expiredForSubscriber =
                subscriber.ignoreExpireEvent() && lastEventSequence > sequence;
        if (expiredForSubscriber) {
            LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
                    event.getClass());
        } else {
            // smartSubscriber and subscriber are unified, so no further type check is done here.
            notifySubscriber(subscriber, event);
        }
    }
}
进入
/**
 * Invokes {@code subscriber.onEvent(event)}, on the subscriber's executor when it provides one
 * and otherwise on the calling thread, where any failure is logged.
 *
 * <p>In this walkthrough the subscriber reached from here is {@code PersistentNotifier}.
 */
@Override
public void notifySubscriber(final Subscriber subscriber, final Event event) {
    LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);

    final Runnable delivery = () -> subscriber.onEvent(event);

    final Executor executor = subscriber.executor();
    if (executor != null) {
        executor.execute(delivery);
        return;
    }

    try {
        delivery.run();
    } catch (Throwable e) {
        LOGGER.error("Event callback exception : {}", e);
    }
}
来到com.alibaba.nacos.naming.consistency.persistent.PersistentNotifier#onEvent
/**
 * Event listener: looks up the current value for the event's key with {@code find} and
 * notifies the record listeners.
 */
@Override
public void onEvent(ValueChangeEvent event) {
    notify(
            event.getKey(),
            event.getAction(),
            find.apply(event.getKey()));
}
进入
/**
 * Notifies record listeners of a change or deletion.
 *
 * <p>Listeners registered under {@code SERVICE_META_KEY_PREFIX} are notified first when the key
 * is a service-meta key and not the switch key; then the listeners registered for the exact key
 * are notified. A failing listener is logged and does not stop the others.
 *
 * @param key    key that changed
 * @param action CHANGE calls {@code onChange}; DELETE calls {@code onDelete}
 * @param value  current value, passed to {@code onChange}
 * @param <T>    record type
 */
public <T extends Record> void notify(final String key, final DataOperation action, final T value) {
    final boolean notifyServiceMetaListeners = listenerMap.containsKey(KeyBuilder.SERVICE_META_KEY_PREFIX)
            && KeyBuilder.matchServiceMetaKey(key)
            && !KeyBuilder.matchSwitchKey(key);
    if (notifyServiceMetaListeners) {
        for (RecordListener listener : listenerMap.get(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
            try {
                if (action == DataOperation.CHANGE) {
                    listener.onChange(key, value);
                } else if (action == DataOperation.DELETE) {
                    listener.onDelete(key);
                }
            } catch (Throwable e) {
                Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {}", key, e);
            }
        }
    }

    if (!listenerMap.containsKey(key)) {
        return;
    }

    for (RecordListener listener : listenerMap.get(key)) {
        try {
            if (action == DataOperation.CHANGE) {
                // In this walkthrough the listener is the Service, so this reaches Service#onChange.
                listener.onChange(key, value);
            } else if (action == DataOperation.DELETE) {
                listener.onDelete(key);
            }
        } catch (Throwable e) {
            Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {}", key, e);
        }
    }
}
进入com.alibaba.nacos.naming.core.Service#onChange
/**
 * Reacts to a changed instance list: clamps out-of-range weights, applies the list to this
 * service through {@code updateIPs}, and recalculates the checksum.
 *
 * @param key   instance-list key that changed
 * @param value new instance list
 * @throws RuntimeException when the list contains a {@code null} instance
 * @throws Exception        declared by the listener interface
 */
@Override
public void onChange(String key, Instances value) throws Exception {
    Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);

    final double maxWeight = 10000.0D;
    final double minPositiveWeight = 0.01D;

    for (Instance each : value.getInstanceList()) {
        if (each == null) {
            // Reject this abnormal instance list:
            throw new RuntimeException("got null instance " + key);
        }
        if (each.getWeight() > maxWeight) {
            each.setWeight(maxWeight);
        }
        if (each.getWeight() < minPositiveWeight && each.getWeight() > 0.0D) {
            each.setWeight(minPositiveWeight);
        }
    }

    // Apply the instances to the in-memory registry structure.
    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
    recalculateChecksum();
}
点击
/**
 * Replaces this service's instances with the given collection, grouped by cluster.
 *
 * <p>Every known cluster starts with an empty list, so a cluster that receives no instance is
 * updated with an empty list. Instances with an empty cluster name are assigned
 * {@code UtilsAndCommons.DEFAULT_CLUSTER_NAME}, and a cluster missing from {@code clusterMap}
 * is created and initialised. After each cluster is updated, the last-modified time is set, the
 * push service is told that the service changed, and the resulting instances are logged.
 *
 * @param instances new instances; {@code null} elements are logged and skipped
 * @param ephemeral forwarded to each cluster's {@code updateIps}
 */
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
for (String clusterName : clusterMap.keySet()) {
ipMap.put(clusterName, new ArrayList<>());
}
for (Instance instance : instances) {
try {
if (instance == null) {
Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
continue;
}
if (StringUtils.isEmpty(instance.getClusterName())) {
instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}
if (!clusterMap.containsKey(instance.getClusterName())) {
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
Cluster cluster = new Cluster(instance.getClusterName(), this);
cluster.init();
getClusterMap().put(instance.getClusterName(), cluster);
}
// A cluster created just above has no list in ipMap yet, so create one on demand.
List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
if (clusterIPs == null) {
clusterIPs = new LinkedList<>();
ipMap.put(instance.getClusterName(), clusterIPs);
}
clusterIPs.add(instance);
} catch (Exception e) {
Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
}
}
for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
//make every ip mine
List<Instance> entryIPs = entry.getValue();
// Hand the list to its cluster. Per the walkthrough, ephemeral instances end up in the cluster's
// ephemeralInstances attribute, which service discovery reads from memory.
clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
}
setLastModifiedMillis(System.currentTimeMillis());
getPushService().serviceChanged(this);// tell the push service that this service changed
StringBuilder stringBuilder = new StringBuilder();
for (Instance instance : allIPs()) {
stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
}
Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
stringBuilder.toString());
}
来到com.alibaba.nacos.naming.push.PushService#serviceChanged
/**
 * Publishes a {@code ServiceChangeEvent} for the service unless {@code futureMap} already holds
 * an entry for it, which merges bursts of changes and reduces the push frequency.
 *
 * <p>Per the walkthrough, handling the event notifies subscribed clients over UDP.
 *
 * @param service service that changed
 */
public void serviceChanged(Service service) {
    final boolean pushAlreadyPending = futureMap
            .containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()));
    if (!pushAlreadyPending) {
        this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
    }
}
到此,删除过期实例的源码分析完成,下篇分析获取服务列表的源码,敬请期待!