Nacos2.X 配置中心源码分析:客户端如何拉取配置、服务端配置发布客户端监听机制

news2025/1/12 9:04:45

文章目录

  • Nacos配置中心源码
    • 总流程图
    • NacosClient源码分析
      • 获取配置
      • 注册监听器
    • NacosServer源码分析
      • 配置dump
      • 配置发布

Nacos配置中心源码

总流程图

Nacos2.1.0源码分析在线流程图

在这里插入图片描述

源码的版本为2.1.0 ,并在配置了下面两个启动参数,一个表示单机启动,一个是指定nacos的工作目录,其中会存放各种运行文件方便查看

-Dnacos.standalone=true
-Dnacos.home=D:\nacos-cluster\nacos2.1.0standalone



NacosClient源码分析

在NacosClient端服务注册中心核心的接口是NamingService,而配置中心核心的接口是ConfigService

我们可以添加一个配置,然后查看这里的实例代码

在这里插入图片描述

/*
* Demo for Nacos
* pom.xml
    <dependency>
        <groupId>com.alibaba.nacos</groupId>
        <artifactId>nacos-client</artifactId>
        <version>${version}</version>
    </dependency>
*/
package com.alibaba.nacos.example;

import java.util.Properties;
import java.util.concurrent.Executor;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;

/**
 * Config service example
 *
 * @author Nacos
 *
 */
public class ConfigExample {

	public static void main(String[] args) throws NacosException, InterruptedException {
		String serverAddr = "localhost";
		String dataId = "nacos-config-demo.yaml";
		String group = "DEFAULT_GROUP";
		Properties properties = new Properties();
		properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
        //获取配置服务
		ConfigService configService = NacosFactory.createConfigService(properties);
        //获取配置
		String content = configService.getConfig(dataId, group, 5000);
		System.out.println(content);
        //注册监听器
		configService.addListener(dataId, group, new Listener() {
			@Override
			public void receiveConfigInfo(String configInfo) {
				System.out.println("recieve:" + configInfo);
			}

			@Override
			public Executor getExecutor() {
				return null;
			}
		});

        //发布配置
		//boolean isPublishOk = configService.publishConfig(dataId, group, "content");
		//System.out.println(isPublishOk);
        //发送properties格式
        configService.publishConfig(dataId,group,"common.age=30", ConfigType.PROPERTIES.getType());

		Thread.sleep(3000);
		content = configService.getConfig(dataId, group, 5000);
		System.out.println(content);

/*		boolean isRemoveOk = configService.removeConfig(dataId, group);
		System.out.println(isRemoveOk);
		Thread.sleep(3000);

		content = configService.getConfig(dataId, group, 5000);
		System.out.println(content);
		Thread.sleep(300000);*/

	}
}



获取配置

总结:

获取配置的主要方法是 NacosConfigService 类的 getConfig 方法,通常情况下该方法直接从本地文件中取得配置的值,如果本地文件不存在或者内容为空,则再通过grpc从远端拉取配置,并保存到本地快照中。

NacosServer端的处理是从磁盘读取配置文件./nacosHome/data/config-data/DEFAULT_GROUP/dataId,然后将读取到的content返回

在这里插入图片描述



接下来的源码就是这里一块的流程,它是如何调用到NacosConfigService 类的 getConfig ()方法

public interface ConfigService {
    String getConfig(String dataId, String group, long timeoutMs) throws NacosException;
    ......
}

在这里插入图片描述


还是一样,从spring.factiries文件中起步

在这里插入图片描述



进入到NacosConfigBootstrapConfiguration自动配置类的,这其中会创建一个NacosPropertySourceLocatorbean对象

@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(
    name = {"spring.cloud.nacos.config.enabled"},
    matchIfMissing = true
)
public class NacosConfigBootstrapConfiguration {
    public NacosConfigBootstrapConfiguration() {
    }

    @Bean
    @ConditionalOnMissingBean
    public NacosConfigProperties nacosConfigProperties() {
        return new NacosConfigProperties();
    }

    @Bean
    @ConditionalOnMissingBean
    public NacosConfigManager nacosConfigManager(NacosConfigProperties nacosConfigProperties) {
        return new NacosConfigManager(nacosConfigProperties);
    }

    // 核心bean
    @Bean
    public NacosPropertySourceLocator nacosPropertySourceLocator(NacosConfigManager nacosConfigManager) {
        return new NacosPropertySourceLocator(nacosConfigManager);
    }

    @Bean
    @ConditionalOnMissingBean(
        search = SearchStrategy.CURRENT
    )
    @ConditionalOnNonDefaultBehavior
    public ConfigurationPropertiesRebinder smartConfigurationPropertiesRebinder(ConfigurationPropertiesBeans beans) {
        return new SmartConfigurationPropertiesRebinder(beans);
    }
}




NacosPropertySourceLocator这个bean中,它的接口中的默认方法会调用locate()方法:

  • 加载共享配置文件,也就是shared-configs配置项指定的数组

  • 加载加载扩展的配置文件,也就是extension-configs配置项指定的数组

  • 加载和应用名相关的几个默认配置文件,比如order-service-dev.yml

  • 上面三个方法中都会各自调用到loadNacosDataIfPresent() --> loadNacosPropertySource(...) --> NacosPropertySourceBuilder.build()

public class NacosPropertySourceLocator implements PropertySourceLocator {
    
    public PropertySource<?> locate(Environment env) {
        this.nacosConfigProperties.setEnvironment(env);
        // 获取配置中心服务ConfigService
        ConfigService configService = this.nacosConfigManager.getConfigService();
        if (null == configService) {
            log.warn("no instance of config service found, can't load config from nacos");
            return null;
        } else {
            long timeout = (long)this.nacosConfigProperties.getTimeout();
            this.nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService, timeout);
            String name = this.nacosConfigProperties.getName();
            String dataIdPrefix = this.nacosConfigProperties.getPrefix();
            if (StringUtils.isEmpty(dataIdPrefix)) {
                dataIdPrefix = name;
            }

            if (StringUtils.isEmpty(dataIdPrefix)) {
                dataIdPrefix = env.getProperty("spring.application.name");
            }

            CompositePropertySource composite = new CompositePropertySource("NACOS");
            // 加载共享配置文件
            this.loadSharedConfiguration(composite);
            // 加载扩展的配置文件
            this.loadExtConfiguration(composite);
            // 加载当前应用配置文件
            // 在该方法中会进行下面三行的逻辑
            /*
            dataIdPrefix
            dataIdPrefix + "." + fileExtension
            dataIdPrefix + "-" + profile + "." + fileExtension
            */
            this.loadApplicationConfiguration(composite, dataIdPrefix, this.nacosConfigProperties, env);
            return composite;
        }
    }
    
    // 可以详细看一下,NacosClient启动时,是怎么根据微服务名去取配置文件的
    private void loadApplicationConfiguration() {
        String fileExtension = properties.getFileExtension();
        String nacosGroup = properties.getGroup();
        // 最先使用微服务名 调用下面的loadNacosDataIfPresent()方法
        this.loadNacosDataIfPresent(compositePropertySource, dataIdPrefix, nacosGroup, fileExtension, true);
        // 接下来是使用微服务名+文件后缀名的方式 调用下面的loadNacosDataIfPresent()方法
      this.loadNacosDataIfPresent(compositePropertySource, dataIdPrefix + "." + fileExtension, nacosGroup, fileExtension, true);
        String[] var7 = environment.getActiveProfiles();
        int var8 = var7.length;

        for(int var9 = 0; var9 < var8; ++var9) {
            String profile = var7[var9];
            // 第三次使用 微服务名+profile + 文件后缀名 调用下面的loadNacosDataIfPresent()方法
            String dataId = dataIdPrefix + "-" + profile + "." + fileExtension;
            this.loadNacosDataIfPresent(compositePropertySource, dataId, nacosGroup, fileExtension, true);
        }

    }
    
    // 在上面三个加载共享、扩展、当前应用名方法中,最终都会调用到下面的loadNacosDataIfPresent(...) 方法中
    private void loadNacosDataIfPresent(...) {
        if (null != dataId && dataId.trim().length() >= 1) {
            if (null != group && group.trim().length() >= 1) {
                // 调用loadNacosPropertySource()方法
                NacosPropertySource propertySource = this.loadNacosPropertySource(dataId, group, fileExtension, isRefreshable);
                this.addFirstPropertySource(composite, propertySource, false);
            }
        }
    }
    
    // loadNacosDataIfPresent(...) ---> loadNacosPropertySource(...)
    private NacosPropertySource loadNacosPropertySource(...) {
        return NacosContextRefresher.getRefreshCount() != 0L && !isRefreshable ? 	
            			NacosPropertySourceRepository.getNacosPropertySource(dataId, group) : 
        				// 这里会进入到NacosPropertySourceBuilder类的build方法
        				this.nacosPropertySourceBuilder.build(dataId, group, fileExtension, isRefreshable);
    }
    
}




NacosPropertySourceBuilder类的代码调用流程:

  • 这里就会调用到核心接口configService接口实现类的getConfig()方法
public class NacosPropertySourceBuilder {
    ......

    NacosPropertySource build(String dataId, String group, String fileExtension, boolean isRefreshable) {
        // 这里会先调用loadNacosData()方法
        List<PropertySource<?>> propertySources = this.loadNacosData(dataId, group, fileExtension);
        NacosPropertySource nacosPropertySource = new NacosPropertySource(propertySources, group, dataId, new Date(), isRefreshable);
        NacosPropertySourceRepository.collectNacosPropertySource(nacosPropertySource);
        return nacosPropertySource;
    }
    
    // build(...) ---> loadNacosData(...)
    private List<PropertySource<?>> loadNacosData(String dataId, String group, String fileExtension) {
        String data = null;

        try {
            // 这里进入到了configService接口实现类的getConfig()方法
            data = this.configService.getConfig(dataId, group, this.timeout);
            if (StringUtils.isEmpty(data)) {
                log.warn("Ignore the empty nacos configuration and get it based on dataId[{}] & group[{}]", dataId, group);
                return Collections.emptyList();
            }

            if (log.isDebugEnabled()) {
                log.debug(String.format("Loading nacos data, dataId: '%s', group: '%s', data: %s", dataId, group, data));
            }

            return NacosDataParserHandler.getInstance().parseNacosData(dataId, data, fileExtension);
        } catch (NacosException var6) {
            log.error("get data from Nacos error,dataId:{} ", dataId, var6);
        } catch (Exception var7) {
            log.error("parse data from Nacos error,dataId:{},data:{}", new Object[]{dataId, data, var7});
        }

        return Collections.emptyList();
    }
    
}



核心方法,NacosClient向NacosServer发送请求,拉取配置的方法。

前面的调用栈如果不会,可以直接在下面getConfig()方法出打一个断点,然后从debug中看调用栈。方法具体的实现:

  • NacosClient端,这里首先会读取本地文件,本地是有一个缓存的
  • 如果本地缓存中没有我们需要的配置,那么就需要从NacosServer端拉取配置了
  • 发送请求,获取响应数据
  • 将数据在本地文件中缓存一份
// 只是方法调用
public String getConfig(String dataId, String group, long timeoutMs) throws NacosException {
    return getConfigInner(namespace, dataId, group, timeoutMs);
}


private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
    group = blank2defaultGroup(group);
    ParamUtils.checkKeyParam(dataId, group);
    ConfigResponse cr = new ConfigResponse();

    cr.setDataId(dataId);
    cr.setTenant(tenant);
    cr.setGroup(group);

    // use local config first
    // NacosClient端,这里首先会读取本地文件,本地是有一个缓存的
    String content = LocalConfigInfoProcessor.getFailover(worker.getAgentName(), dataId, group, tenant);
    if (content != null) {
        LOGGER.warn(..);
        cr.setContent(content);
        String encryptedDataKey = LocalEncryptedDataKeyProcessor
            .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
        cr.setEncryptedDataKey(encryptedDataKey);
        configFilterChainManager.doFilter(null, cr);
        content = cr.getContent();
        return content;
    }

    // 如果本地缓存中没有我们需要的配置,那么就需要从NacosServer端拉取配置了
    try {
        // 从服务端获取配置
        ConfigResponse response = worker.getServerConfig(dataId, group, tenant, timeoutMs, false);
        cr.setContent(response.getContent());
        cr.setEncryptedDataKey(response.getEncryptedDataKey());
        configFilterChainManager.doFilter(null, cr);
        content = cr.getContent();

        return content;
    } catch (NacosException ioe) {
        if (NacosException.NO_RIGHT == ioe.getErrCode()) {
            throw ioe;
        }
        LOGGER.warn(..);
    }

    LOGGER.warn(..);
    // 再从本地文件缓存中找
    content = LocalConfigInfoProcessor.getSnapshot(worker.getAgentName(), dataId, group, tenant);

    cr.setContent(content);
    String encryptedDataKey = LocalEncryptedDataKeyProcessor
        .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
    cr.setEncryptedDataKey(encryptedDataKey);
    configFilterChainManager.doFilter(null, cr);
    content = cr.getContent();
    return content;
}



//--------------------------
public ConfigResponse getServerConfig(String dataId, String group, String tenant, long readTimeout, boolean notify)
    throws NacosException {
    // 默认组名
    if (StringUtils.isBlank(group)) {
        group = Constants.DEFAULT_GROUP;
    }
    // 从服务端查询配置
    return this.agent.queryConfig(dataId, group, tenant, readTimeout, notify);
}

// 向NacosServer发送请求,拉取配置数据,并在本地文件中缓存一份
public ConfigResponse queryConfig(String dataId, String group, String tenant, long readTimeouts, boolean notify)
    throws NacosException {
    ConfigQueryRequest request = ConfigQueryRequest.build(dataId, group, tenant);
    request.putHeader(NOTIFY_HEADER, String.valueOf(notify));
    RpcClient rpcClient = getOneRunningClient();
    if (notify) {
        CacheData cacheData = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
        if (cacheData != null) {
            rpcClient = ensureRpcClient(String.valueOf(cacheData.getTaskId()));
        }
    }
    // 发送请求,获取响应数据
    ConfigQueryResponse response = (ConfigQueryResponse) requestProxy(rpcClient, request, readTimeouts);

    ConfigResponse configResponse = new ConfigResponse();
    if (response.isSuccess()) {
        // 将数据在本地文件中缓存一份
        LocalConfigInfoProcessor.saveSnapshot(this.getName(), dataId, group, tenant, response.getContent());
        configResponse.setContent(response.getContent());
        String configType;
        if (StringUtils.isNotBlank(response.getContentType())) {
            configType = response.getContentType();
        } else {
            configType = ConfigType.TEXT.getType();
        }
        configResponse.setConfigType(configType);
        String encryptedDataKey = response.getEncryptedDataKey();
        LocalEncryptedDataKeyProcessor
            .saveEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant, encryptedDataKey);
        configResponse.setEncryptedDataKey(encryptedDataKey);
        return configResponse;
    } else if (response.getErrorCode() == ConfigQueryResponse.CONFIG_NOT_FOUND) {
        LocalConfigInfoProcessor.saveSnapshot(this.getName(), dataId, group, tenant, null);
        LocalEncryptedDataKeyProcessor.saveEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant, null);
        return configResponse;
    } else if (response.getErrorCode() == ConfigQueryResponse.CONFIG_QUERY_CONFLICT) {
        ...

    }
}



注册监听器

结论:

  • 从spring.facotries文件中开始,其中一个bean会监听spring容器启动完成的事件
  • 然后它会为当前应用添加监听器:遍历每个dataId,添加监听器。
  • 当nacosServer端更改了配置,这里监听器中的方法就会运行,这里都会发布一个RefreshEvent事件
  • 处理RefreshEvent事件的方法中会
    • 刷新环境变量
    • 销毁@RefreshScope注解修改的bean实例




NacosServer端如果修改了配置,就会发布一个事件,而在NacosClient端这边就会有一个EventListener去监听该事件并进行相应的处理。

ConfigService接口中,有三个和监听器相关的方法

public interface ConfigService {

    String getConfigAndSignListener(String dataId, String group, long timeoutMs, Listener listener)
        throws NacosException;

    void addListener(String dataId, String group, Listener listener) throws NacosException;

    void removeListener(String dataId, String group, Listener listener);

}

在这里插入图片描述



接下来进入源码中,入口是NacosConfigAutoConfiguration自动配置的NacosContextRefresherbean 对象

@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(
    name = {"spring.cloud.nacos.config.enabled"},
    matchIfMissing = true
)
public class NacosConfigAutoConfiguration {
	...

    @Bean
    public NacosContextRefresher nacosContextRefresher(NacosConfigManager nacosConfigManager, NacosRefreshHistory nacosRefreshHistory) {
        return new NacosContextRefresher(nacosConfigManager, nacosRefreshHistory);
    }
    
	...
}

该类它监听了ApplicationReadyEvent事件,

  • 在spring容器启动完成后就会调用该类的onApplicationEvent()方法

  • 给当前应用注册nacos监听器

  • 为每个 dataId注册监听器

  • 当某个dataId发生了更改,这里都会发布一个RefreshEvent事件

public class NacosContextRefresher implements ApplicationListener<ApplicationReadyEvent> , ApplicationContextAware {
 
    // 在spring容器启动完成后就会调用该类的onApplicationEvent()方法
     public void onApplicationEvent(ApplicationReadyEvent event) {
        if (this.ready.compareAndSet(false, true)) {
            // 给当前应用注册nacos监听器
            this.registerNacosListenersForApplications();
        }
    }
    
    
    
    // 给当前应用注册nacos监听器
    private void registerNacosListenersForApplications() {
        // 是否刷新配置,默认为true
        if (this.isRefreshEnabled()) {
            Iterator var1 = NacosPropertySourceRepository.getAll().iterator();
			// 遍历每个dataId
            while(var1.hasNext()) {
                NacosPropertySource propertySource = (NacosPropertySource)var1.next();
                if (propertySource.isRefreshable()) {
                    String dataId = propertySource.getDataId();
                    // 为每个 dataId注册监听器
                    this.registerNacosListener(propertySource.getGroup(), dataId);
                }
            }
        }
    }
    
    
    
    // 为每个 dataId注册监听器
    private void registerNacosListener(final String groupKey, final String dataKey) {
        String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
        // 定义一个监听器
        Listener listener = (Listener)this.listenerMap.computeIfAbsent(key, (lst) -> {
            return new AbstractSharedListener() {
                public void innerReceive(String dataId, String group, String configInfo) {
                    NacosContextRefresher.refreshCountIncrement();
                    // 配置的历史记录
                    NacosContextRefresher.this.nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
                    // 发布一个RefreshEvent事件,会在处理该事件的位置真正进行刷新配置项
                    NacosContextRefresher.this.applicationContext.publishEvent(new RefreshEvent(this, (Object)null, "..."));

                }
            };
        });

        try {
            
            // 调用configService接口的addListener()添加监听器
            this.configService.addListener(dataKey, groupKey, listener);
        } catch (NacosException var6) {
            log.warn(String.format("register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey, groupKey), var6);
        }
    }
}



当NacosServer端某个配置文件改动后,就会回调上面监听器的innerReceive()方法,在该方法中就会发布RefreshEvent事件,处理该事件的是RefreshEventListener类中的onApplicationEvent()方法:

  • 直接调用refresh()方法
public class RefreshEventListener implements SmartApplicationListener {
	...

    public void onApplicationEvent(ApplicationEvent event) {
        if (event instanceof ApplicationReadyEvent) {
            this.handle((ApplicationReadyEvent)event);
        } else if (event instanceof RefreshEvent) {
            // 处理RefreshEvent事件,调用handler()方法
            this.handle((RefreshEvent)event);
        }

    }

    public void handle(ApplicationReadyEvent event) {
        this.ready.compareAndSet(false, true);
    }

    public void handle(RefreshEvent event) {
        if (this.ready.get()) {
            // 这里就会调用refresh()方法进行刷新
            Set<String> keys = this.refresh.refresh();
        }

    }
}




接下来就进入到了ContextRefresher类的refresh()方法:

  • 刷新环境变量
  • 销毁@RefreshScope注解修改的bean实例
public synchronized Set<String> refresh() {
    // 刷新环境变量
    Set<String> keys = this.refreshEnvironment();
    // 销毁@RefreshScope注解修改的bean实例
    this.scope.refreshAll();
    return keys;
}



NacosServer源码分析

配置dump

服务端启动时就会依赖 DumpService 的 init 方法,从数据库中 load 配置存储在本地磁盘上,并将一些重要的元信息例如 MD5 值缓存在内存中。服务端会根据心跳文件中保存的最后一次心跳时间,来判断到底是从数据库 dump 全量配置数据还是部分增量配置数据(如果机器上次心跳间隔是 6h 以内的话)。

全量 dump 当然先清空磁盘缓存,然后根据主键 ID 每次捞取一千条配置刷进磁盘和内存。增量 dump 就是捞取最近六小时的新增配置(包括更新的和删除的),先按照这批数据刷新一遍内存和文件,再根据内存里所有的数据全量去比对一遍数据库,如果有改变的再同步一次,相比于全量 dump 的话会减少一定的数据库 IO 和磁盘 IO 次数。



配置发布

在这里插入图片描述


结论:

  • 更改数据库中的数据,持久化信息到mysql

  • 触发一个ConfigDataChangeEvent事件。至此请求结束。

  • 接下来就处理上面的事件:

    • 遍历Nacos集群下的所有节点,包括自己

    • 生成一个http/rpc的任务对象去执行,这里就直接看rpc任务对象的处理

    • 判断是不是当前节点,如果是就调用dump()方法去处理

      • 将更改的数据保存至本地磁盘中

      • 生成md5,并通过一个key将md5存入cache中,再发布一个LocalDataChangeEvent事件,该事件存了key

        处理上方事件的方法中会开启一个任务,在任务的run()方法中会真正调用客户端发送grpc请求,发送一个ConfigChangeNotifyRequest请求对象

    • 如果不是当前节点就发送grpc请求为其他节点同步修改配置项



NacosClient端的处理

  • 接收到ConfigChangeNotifyRequest请求对象,然后就放入了一个阻塞队列中。
  • 客户端while死循环,队列中有任务了/每隔5s 从队列中获取任务/null,去执行配置监听器方法
  • 根据CacheData对象远程获取配置内容,进行md5的比较
  • 如果有变化就通知监听器去处理,这就回到了nacosClient端获取配置中的流程了



我们接下来分析,在NacosServer端修改了配置,点击发布配置,NacosClient怎么就能接收到是哪一个dataId修改了嘞

发布配置官方接口文档

这里实际上是调用的NacosServer的/nacos/v2/cs/config接口,处理该请求的是ConfigController.publishConfig()方法

在这一次请求中其实就是做了两件事:将更新写入数据库中,然后发布一个事件,将事件添加进队列中,此时请求就结束了。

在controller方法中有两行核心的方法

// 进入service层,核心方法
// 持久化配置信息到数据库
persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);

// 触发ConfigDataChangeEvent事件,这是客户端能感知配置更新的根本原因
ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));




持久化配置信息到数据库就没必须继续看下去了,我们接下来看看notifyConfigChange()方法的实现:

  • 该方法就是单纯的一层一层方法调用
public class ConfigChangePublisher {

    public static void notifyConfigChange(ConfigDataChangeEvent event) {
        if (PropertyUtil.isEmbeddedStorage() && !EnvUtil.getStandaloneMode()) {
            return;
        }
        // 该方法继续调用
        NotifyCenter.publishEvent(event);
    }
}

public static boolean publishEvent(final Event event) {
    try {
        // 该方法继续调用
        return publishEvent(event.getClass(), event);
    } catch (Throwable ex) {
        LOGGER.error("There was an exception to the message publishing : ", ex);
        return false;
    }
}

private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
    if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
        return INSTANCE.sharePublisher.publish(event);
    }

    final String topic = ClassUtils.getCanonicalName(eventType);

    EventPublisher publisher = INSTANCE.publisherMap.get(topic);
    if (publisher != null) {
        // 该方法继续调用
        return publisher.publish(event);
    }
    return false;
}



这里就会进入到DefaultPublisher类的publish(event)方法中。该类非常重要,Nacos很多功能都用的这统一的一套事件发布与订阅。

public boolean publish(Event event) {
    checkIsStart();
    // 如果队列中写满了,那么就返回false,下面就直接处理了
    // 该类的run()方法中会死循环从队列中取任务执行
    boolean success = this.queue.offer(event);
    if (!success) {
        LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
        // 处理事件
        receiveEvent(event);
        return true;
    }
    return true;
}




此时,本次http请求就已经结束了,这里将事件放入队列中后就会有其他的订阅者来异步处理事件。

这样的设计也实现了发布任务与处理任务之间的解耦

此时队列中有了任务,在NacosServer中任务订阅者此时还需要做两件事:

  • 通知集群其他Nacos节点进行更新
  • 通知NacosClient端配置发生了更改
public void notifySubscriber(final Subscriber subscriber, final Event event) {
    
    LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);

    // 订阅者需要去处理事件
    // 主要做两件事 通知集群其他Nacos节点进行更新、通知NacosClient端配置发生了更改
    final Runnable job = () -> subscriber.onEvent(event);
    final Executor executor = subscriber.executor();
    
    if (executor != null) {
        executor.execute(job);
    } else {
        try {
            job.run();
        } catch (Throwable e) {
            LOGGER.error("Event callback exception: ", e);
        }
    }
}




这里会进入到AsyncNotifyService的构造方法中:

  • 遍历集群环境下的所有节点
  • 创建任务添加进http/grpc的队列中
  • 从http/grpc的队列中取任务执行
public AsyncNotifyService(ServerMemberManager memberManager) {
        ...
        
        // Register A Subscriber to subscribe ConfigDataChangeEvent.
        NotifyCenter.registerSubscriber(new Subscriber() {
            
            @Override
            public void onEvent(Event event) {
                // Generate ConfigDataChangeEvent concurrently
                if (event instanceof ConfigDataChangeEvent) {
                    ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
                    long dumpTs = evt.lastModifiedTs;
                    String dataId = evt.dataId;
                    String group = evt.group;
                    String tenant = evt.tenant;
                    String tag = evt.tag;
                    // 获取nacos集群下的各个节点
                    Collection<Member> ipList = memberManager.allMembers();
                    
                    // In fact, any type of queue here can be
                    Queue<NotifySingleTask> httpQueue = new LinkedList<NotifySingleTask>();
                    Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<NotifySingleRpcTask>();
                    
                    for (Member member : ipList) {
                        // 使用http/rpc的方式通知各节点,具体的dataId被修改了
                        // 这里先添加进队列,下面的if中处理
                        if (!MemberUtil.isSupportedLongCon(member)) {
                            httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
                                    evt.isBeta));
                        } else {
                            rpcQueue.add(
                                    new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));
                        }
                    }
                    // 处理队列中的任务
                    if (!httpQueue.isEmpty()) {
                        ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue));
                    }
                    if (!rpcQueue.isEmpty()) {
                        // 直接看AsyncRpcTask类中的run()方法
                        ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));
                    }
                    
                }
            }
            
            ......
        });
    }




我们这里是nacos2.X的版本,所以我这里就自己看AsyncRpcTask类的run()方法:

  • 调用dump()方法
  • 发送请求,同步其他节点数据变化
public void run() {
    while (!queue.isEmpty()) {
        NotifySingleRpcTask task = queue.poll();

        ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest();
        syncRequest.setDataId(task.getDataId());
        syncRequest.setGroup(task.getGroup());
        syncRequest.setBeta(task.isBeta);
        syncRequest.setLastModified(task.getLastModified());
        syncRequest.setTag(task.tag);
        syncRequest.setTenant(task.getTenant());
        Member member = task.member;
        // 判断member是不是当前节点
        if (memberManager.getSelf().equals(member)) {
            // 如果是当前节点就直接调用dump()方法
            // 这里又会经过服务注册那边当服务更改后为订阅者进行推送的流程中,这里最终是会到DumpProcessor.process()方法
            if (syncRequest.isBeta()) {
                dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
                                 syncRequest.getLastModified(), NetUtils.localIP(), true);
            } else {
                dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
                                 syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());
            }
            continue;
        }

        // 其他节点
        if (memberManager.hasMember(member.getAddress())) {
            boolean unHealthNeedDelay = memberManager.isUnHealth(member.getAddress());
            if (unHealthNeedDelay) {
                ...
            } else {

                if (!MemberUtil.isSupportedLongCon(member)) {
                    asyncTaskExecute(
                        new NotifySingleTask(task.getDataId(), task.getGroup(), task.getTenant(), task.tag,
                                             task.getLastModified(), member.getAddress(), task.isBeta));
                } else {
                    try {
                        // 为nacos集群中的其他节点进行同步配置变化
                        configClusterRpcClientProxy.syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task));
                    } catch (Exception e) {
                        MetricsMonitor.getConfigNotifyException().increment();
                        asyncTaskExecute(task);
                    }
                }

            }
        } else {
            //No nothig if  member has offline.
        }

    }
}



dump()这里又会经过服务注册那边当服务更改后为订阅者进行推送的流程中:先将task存入一个队列中 --> 去队列中的任务 --> 各自的任务处理类去处理。这里最终是会到DumpProcessor.process()方法:

  • 方法调用process() --> configDump() —>dump()
  • 将配置保存在磁盘文件中
  • 配置发生变化,更新md5
  • 发布LocalDataChangeEvent事件
    目的告诉NacosClient端,配置发生了改变。处理该事件的RpcConfigChangeNotifier.onEvent()
public boolean process(NacosTask task) {
    ...
        // 直接看这里最后的configDump()方法
        return DumpConfigHandler.configDump(build.build());
}



public static boolean configDump(ConfigDumpEvent event){
    ......
        if (StringUtils.isBlank(event.getTag())) {
            ......

                boolean result;
            if (!event.isRemove()) {
                // 核心方法,进入到这里
                // 写入磁盘
                result = ConfigCacheService
                    .dump(dataId, group, namespaceId, content, lastModified, type, encryptedDataKey);

                if (result) {
                    ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
                                                    ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
                                                    content.length());
                }
            } else {
                ......
            }
            return result;
        } else {
            .......

        }

    public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
                               String type, String encryptedDataKey) {
        ......

        try {
            // 根据content配置内容生成一个md5。content中的内容有变化那么生成的md5也肯定是不一样的
            final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
            // 配置的最后一次更新时间
            if (lastModifiedTs < ConfigCacheService.getLastModifiedTs(groupKey)) {
                DUMP_LOG.warn(...);
                return true;
            }
            if (md5.equals(ConfigCacheService.getContentMd5(groupKey)) && DiskUtil.targetFile(dataId, group, tenant).exists()) {
                DUMP_LOG.warn(...);
            } else if (!PropertyUtil.isDirectRead()) {
                // 将配置保存在磁盘文件中
                DiskUtil.saveToDisk(dataId, group, tenant, content);
            }
            // 配置发生变化,更新md5
            // 继续跟入该方法
            updateMd5(groupKey, md5, lastModifiedTs, encryptedDataKey);
            return true;
        } catch (IOException ioe) {
            ......
            return false;
        } finally {
            releaseWriteLock(groupKey);
        }
    }
    
    
    public static void updateMd5(String groupKey, String md5, long lastModifiedTs, String encryptedDataKey) {
        // 根据groupKey,将md5数据保存在缓存中
        CacheItem cache = makeSure(groupKey, encryptedDataKey, false);
        if (cache.md5 == null || !cache.md5.equals(md5)) {
            cache.md5 = md5;
            cache.lastModifiedTs = lastModifiedTs;
            // 发布LocalDataChangeEvent事件,包含groupKey
            // 目的告诉NacosClient端,配置发生了改变。处理该事件的RpcConfigChangeNotifier.onEvent()
            NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
        }
    }



接下来看看RpcConfigChangeNotifier.onEvent()的方法逻辑:

  • 遍历各个客户端
  • 发送grpc请求
@Override
public void onEvent(LocalDataChangeEvent event) {
    ...
    configDataChanged(groupKey, dataId, group, tenant, isBeta, betaIps, tag);

}


public void configDataChanged(String groupKey, String dataId, String group, String tenant, boolean isBeta,
                              List<String> betaIps, String tag) {

    // 其实就代表着Client集合
    Set<String> listeners = configChangeListenContext.getListeners(groupKey);
    if (CollectionUtils.isEmpty(listeners)) {
        return;
    }
    int notifyClientCount = 0;
    // 遍历各个客户端
    for (final String client : listeners) {
        Connection connection = connectionManager.getConnection(client);
        if (connection == null) {
            continue;
        }

        ConnectionMeta metaInfo = connection.getMetaInfo();
        //一些检查校验
        String clientIp = metaInfo.getClientIp();
        String clientTag = metaInfo.getTag();
        if (isBeta && betaIps != null && !betaIps.contains(clientIp)) {
            continue;
        }
        if (StringUtils.isNotBlank(tag) && !tag.equals(clientTag)) {
            continue;
        }

        // 封装一个请求对象ConfigChangeNotifyRequest
        ConfigChangeNotifyRequest notifyRequest = ConfigChangeNotifyRequest.build(dataId, group, tenant);

        // 创建rpc推送任务,在RpcPushTask.run()方法中推送客户端
        RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequest, 50, client, clientIp, metaInfo.getAppName());
        push(rpcPushRetryTask);
        notifyClientCount++;
    }
    
}



public void run() {
    tryTimes++;
    if (!tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH, connectionId, clientIp)) {
        push(this);
    } else {
        // 这里推送客户端,客户端再进行refresh操作
        rpcPushService.pushWithCallback(connectionId, notifyRequest, new AbstractPushCallBack(3000L) {
            @Override
            public void onSuccess() {
                tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH_SUCCESS, connectionId, clientIp);
            }

            @Override
            public void onFail(Throwable e) {
                tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH_FAIL, connectionId, clientIp);
                Loggers.REMOTE_PUSH.warn("Push fail", e);
                push(RpcPushTask.this);
            }

        }, ConfigExecutor.getClientConfigNotifierServiceExecutor());

    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1911764.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++初探究(2)

引用 对于一个常量&#xff0c;想要将其进行引用&#xff0c;则使用普通的引用相当于权限扩大&#xff08;常量为只读&#xff0c;但此处的引用参数为可读可写&#xff09;&#xff0c;C编译器会报错. 例如&#xff1a; const int a 10;int& ra a;//权限放大&#xff0…

使用Mplayer实现MP3功能

核心功能 1. 界面设计 项目首先定义了一个clearscreen函数&#xff0c;用于清空屏幕&#xff0c;为用户界面的更新提供了便利。yemian函数负责显示主菜单界面&#xff0c;提供了包括查看播放列表、播放控制、播放模式选择等在内的9个选项。 2. 文件格式支持 is_supported_f…

详解TCP和UDP通信协议

目录 OSI的七层模型的主要功能 tcp是什么 TCP三次握手 为什么需要三次握手&#xff0c;两次握手不行吗 TCP四次挥手 挥手会什么需要四次 什么是TCP粘包问题&#xff1f;发生的原因 原因 解决方案 UDP是什么 TCP和UDP的区别 网络层常见协议 利用socket进行tcp传输代…

淮北在选择SCADA系统时,哪些因素会影响其稳定性?

关键字:LP-SCADA系统, 传感器可视化, 设备可视化, 独立SPC系统, 智能仪表系统,SPC可视化,独立SPC系统 在选择SCADA系统时&#xff0c;稳定性是一个关键因素&#xff0c;因为它直接影响到生产过程的连续性和安全性。以下是一些影响SCADA系统稳定性的因素&#xff1a; 硬件质量…

如何在 CentOS 上配置本地 YUM 源

引言 CentOS 作为一个流行的企业级 Linux 发行版&#xff0c;依赖 YUM&#xff08;Yellowdog Updater, Modified&#xff09;来管理软件包。YUM 源&#xff08;Repository&#xff09;是软件包存储和分发的中心&#xff0c;它们通常位于互联网上。然而&#xff0c;在某些情况下…

使用clion刷leetcode

如何优雅的使用clion刷leetcode 安装插件&#xff1a;LeetCode Editor) 插件配置&#xff1a; 这样我们每打开一个项目&#xff0c;就会创建类似的文件 我们的项目结构&#xff1a; 我们在题解文件中导入头文件myHeader.h并将新建的文件添加到cmakelists.txt文件&#xff0c;…

数据结构双向循环链表

主程序 #include "fun.h" int main(int argc, const char *argv[]) { double_p Hcreate_head(); insert_head(H,10); insert_head(H,20); insert_head(H,30); insert_head(H,40); insert_tail(H,50); show_link(H); del_tail(H); …

c++内存管理(上)

目录 引入 分析 说明 C语言中动态内存管理方式 C内存管理方式 new/delete操作内置类型 new和delete操作自定义类型 引入 我们先来看下面的一段代码和相关问题 int globalVar 1; static int staticGlobalVar 1; void Test() { static int staticVar 1; int localVar 1…

影视行业的人工智能与-【机器学习】:案例分析

欢迎关注小知&#xff1a;知孤云出岫 目录 引言AI和ML在影视行业的当前应用AI和ML对影视行业的未来影响案例研究&#xff1a;AI生成动画视频目标工具和库数据收集模型训练视频生成 结论参考文献 引言 人工智能&#xff08;AI&#xff09;和机器学习&#xff08;ML&#xff09…

window.matchMedia

matchMedia() 返回一个新的 MediaQueryList 对象&#xff0c;表示指定的媒体查询字符串解析后的结果。 const width ref(); const myFunction (x) > {if (x.matches) {// 媒体查询document.body.style.backgroundColor "yellow";width.value "yellow&quo…

JavaScript 作用域 与 var、let、const关键字

目录 一、JavaScript 作用域 1、全局作用域 2、函数作用域 3、块级作用域 4、综合示例 5、总结 二、var、let、const 1、var 关键字 2、let 关键字 3、const 关键字 4、总结 5、使用场景 一、JavaScript 作用域 在JavaScript中&#xff0c;作用域是指程序中可访问…

网络编程:TCP

一、tcp编程 注意 1.数据本身有顺序 2.发送和接收次数不需要对应 3. 1. C/S 模式 》服务器/客户端模型 server:socket()-->bind()--->listen()-->accept()-->recv()-->close() client:socket()-->connect()-->send()-->close(); int on 1; setso…

如何学好C++?

首先&#xff0c;对于零基础的想学习C的同学&#xff0c;我想要你们先明白一件事&#xff1a;C是一门极为复杂且难以掌握的编程语言。因此推荐在学习C之前可以先去学习C语言&#xff0c;在拥有了一定的知识储备和编程能力后再学习C会更加的高效和相对轻松。 下面推荐从三个方面…

源码编译安装 LAMP

目录 2.1Apache 网站服务基础 2.1.1 Apache 简介 1.Apache 的起源 2.Apache 的主要特点 2.1.2 安装 httpd 服务器 1.准备工作 2.源码编译及安装 3.确认安装结果​编辑 4.优化执行路径 5.添加 httpd 系统服务 2.2 httpd 服务器的基本配置 2.2.1 Web 站点的部…

锅总反驳李彦宏说的“不要卷模型,要卷应用”

李彦宏的观点是大家不要卷模型&#xff0c;要卷应用&#xff0c;但我认为这种看法是荒谬的。以下是24条反驳李彦宏观点的论点和论据&#xff1a; 模型的准确性直接决定应用的质量和用户体验&#xff1a; 论据&#xff1a;在自然语言处理、计算机视觉等领域&#xff0c;模型的准…

安全求交集PSI

安全求交集定义 求交集的PSI&#xff1a;交集可以被两方看见或其中一方看见&#xff0c;非交集进行保护有两方的PSI半诚实的PSI&#xff1a;攻击者要严格遵守协议&#xff0c;在此基础上得到他人的秘密是做不到的 Two-Party Semi-Honest PSI 挑战一&#xff1a;隐藏非交集元素…

纯前端如何实现Gif暂停、倍速播放

前言 GIF 我相信大家都不会陌生&#xff0c;由于它被广泛的支持&#xff0c;所以我们一般用它来做一些简单的动画效果。一般就是设计师弄好了之后&#xff0c;把文件发给我们。然后我们就直接这样使用&#xff1a; <img src"xxx.gif"/>这样就能播放一个 GIF …

offer题目33:判断是否是二叉搜索树的后序遍历序列

题目描述&#xff1a;输入一个整数数组&#xff0c;判断该数组是不是某二叉搜索树的后序遍历结果。如果是则返回true,否则返回false。假设输入的数组的任意两个数字都互不相同。例如&#xff0c;输入数组{5,7,6,9,11,10,8},则返回true,&#xff0c;因为这个整数是下图二叉搜索树…

作业/数据结构/2024/7/8

链表的相关操作作业&#xff1a; 1】 按值修改 2】按值查找&#xff0c;返回当前节点的地址 &#xff08;先不考虑重复&#xff0c;如果有重复&#xff0c;返回第一个&#xff09; 3】 逆置(反转) 4】释放链表 main.c #include "head.h"int main(int argc, con…

6.Python学习:异常和日志

1.异常的抓取 1.1异常的概念 使用异常前&#xff1a; print(1/0)使用异常后&#xff1a;错误提示更加友好&#xff0c;不影响程序继续往下运行 try:print(10/0) except ZeroDivisionError:print("0不能作为分母")1.2异常的抓取 第一种&#xff1a;如果提前知道可…