Add the dependency to the POM file
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
Inspect the auto-configuration file spring.factories shipped in the dependency
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
com.alibaba.cloud.nacos.NacosConfigBootstrapConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.nacos.NacosConfigAutoConfiguration,\
com.alibaba.cloud.nacos.endpoint.NacosConfigEndpointAutoConfiguration
org.springframework.boot.diagnostics.FailureAnalyzer=\
com.alibaba.cloud.nacos.diagnostics.analyzer.NacosConnectionFailureAnalyzer
org.springframework.boot.env.PropertySourceLoader=\
com.alibaba.cloud.nacos.parser.NacosJsonPropertySourceLoader,\
com.alibaba.cloud.nacos.parser.NacosXmlPropertySourceLoader
Core classes
- NacosConfigBootstrapConfiguration: how configuration is pulled at startup.
- NacosConfigAutoConfiguration: how configuration changes are refreshed dynamically.
NacosConfigBootstrapConfiguration
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true)
public class NacosConfigBootstrapConfiguration {
@Bean
@ConditionalOnMissingBean
public NacosConfigProperties nacosConfigProperties() {
return new NacosConfigProperties();
}
@Bean
@ConditionalOnMissingBean
public NacosConfigManager nacosConfigManager(
NacosConfigProperties nacosConfigProperties) {
return new NacosConfigManager(nacosConfigProperties);
}
@Bean
public NacosPropertySourceLocator nacosPropertySourceLocator(
NacosConfigManager nacosConfigManager) {
return new NacosPropertySourceLocator(nacosConfigManager);
}
}
NacosPropertySourceLocator
Responsible for loading configuration from Nacos.
NacosConfigManager
Its constructor creates the ConfigService. The actual work is done by NacosFactory#createConfigService(), which instantiates the implementation class by reflection.
public static ConfigService createConfigService(Properties properties) throws NacosException {
try {
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
Constructor constructor = driverImplClass.getConstructor(Properties.class);
ConfigService vendorImpl = (ConfigService)constructor.newInstance(properties);
return vendorImpl;
} catch (Throwable var4) {
throw new NacosException(-400, var4);
}
}
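The reflective creation above can be tried in isolation. The following is a minimal sketch, not Nacos code: DemoConfigService, DemoConfigServiceImpl and ReflectiveFactoryDemo are hypothetical names, and the only thing borrowed from the snippet above is the pattern of looking up a class by name and calling its public Properties constructor.

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

final class ReflectiveFactoryDemo {
    // Looks up the implementation by name and calls its (Properties) constructor,
    // mirroring the Class.forName + getConstructor + newInstance sequence above.
    static DemoConfigService create(String implClassName, Properties properties) {
        try {
            Class<?> implClass = Class.forName(implClassName);
            Constructor<?> constructor = implClass.getConstructor(Properties.class);
            return (DemoConfigService) constructor.newInstance(properties);
        } catch (ReflectiveOperationException e) {
            // The original wraps failures in NacosException; this sketch uses an unchecked one.
            throw new IllegalStateException("cannot create " + implClassName, e);
        }
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("serverAddr", "localhost:8848");
        DemoConfigService service = create(DemoConfigServiceImpl.class.getName(), properties);
        System.out.println(service.serverAddr());
    }
}

// Hypothetical stand-in for the ConfigService interface.
interface DemoConfigService {
    String serverAddr();
}

// Hypothetical implementation exposing a public constructor that takes Properties.
class DemoConfigServiceImpl implements DemoConfigService {
    private final String serverAddr;

    public DemoConfigServiceImpl(Properties properties) {
        this.serverAddr = properties.getProperty("serverAddr", "127.0.0.1:8848");
    }

    @Override
    public String serverAddr() {
        return serverAddr;
    }
}
```

Because the caller only names the implementation as a string, the API module does not need a compile-time dependency on the implementation module, which is presumably why the factory is written this way.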
NacosConfigService
Its constructor initializes agent and worker. The agent sends HTTP requests to the Nacos server; the ClientWorker long-polls the Nacos server to detect configuration updates.
public NacosConfigService(Properties properties) throws NacosException {
ValidatorUtils.checkInitParam(properties);
String encodeTmp = properties.getProperty("encode");
if (StringUtils.isBlank(encodeTmp)) {
this.encode = "UTF-8";
} else {
this.encode = encodeTmp.trim();
}
this.initNamespace(properties);
this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
this.agent.start();
this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
}
ClientWorker
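Its constructor creates two thread pools. The first (executor) runs the checkConfigInfo method on a fixed delay; the second (executorService) runs the long-polling tasks. checkConfigInfo submits LongPollingRunnable instances to the second pool, which then executes their run method.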
public ClientWorker(final HttpAgent agent, ConfigFilterChainManager configFilterChainManager, Properties properties) {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
this.init(properties);
this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
t.setDaemon(true);
return t;
}
});
this.executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
t.setDaemon(true);
return t;
}
});
this.executor.scheduleWithFixedDelay(new Runnable() {
public void run() {
try {
ClientWorker.this.checkConfigInfo();
} catch (Throwable var2) {
ClientWorker.LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", var2);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
public void checkConfigInfo() {
int listenerSize = this.cacheMap.size();
int longingTaskCount = (int)Math.ceil((double)listenerSize / ParamUtil.getPerTaskConfigSize());
if ((double)longingTaskCount > this.currentLongingTaskCount) {
for(int i = (int)this.currentLongingTaskCount; i < longingTaskCount; ++i) {
this.executorService.execute(new ClientWorker.LongPollingRunnable(i));
}
this.currentLongingTaskCount = (double)longingTaskCount;
}
}
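The arithmetic in checkConfigInfo is easier to see with numbers. This is a small sketch under one assumption: the batch size of 3000 is what ParamUtil.getPerTaskConfigSize() is believed to return by default and is hard-coded here for illustration only. LongPollingTaskMath is a hypothetical class name.

```java
final class LongPollingTaskMath {
    // Assumption: stands in for ParamUtil.getPerTaskConfigSize(); the real value is configurable.
    static final double ASSUMED_PER_TASK_CONFIG_SIZE = 3000.0;

    // Same formula as checkConfigInfo: one long-polling task per batch of listened configs.
    static int taskCount(int listenerSize, double perTaskConfigSize) {
        return (int) Math.ceil(listenerSize / perTaskConfigSize);
    }

    public static void main(String[] args) {
        for (int listeners : new int[] {0, 1, 3000, 3001, 9000}) {
            System.out.println(listeners + " listened configs -> "
                    + taskCount(listeners, ASSUMED_PER_TASK_CONFIG_SIZE) + " task(s)");
        }
    }
}
```

With no listened configs the count is 0, so no LongPollingRunnable is submitted; a new task is only added once the number of cached configs crosses the next multiple of the batch size.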
LongPollingRunnable
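- Check whether anything has changed: /v1/cs/configs/listener, ClientWorker#checkUpdateConfigStr.
- If something changed, pull the configuration from the server: /v1/cs/configs, ClientWorker#getServerConfig.
- If something changed, fire the callbacks: cacheData.checkListenerMd5();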
public void run() {
List<CacheData> cacheDatas = new ArrayList();
ArrayList inInitializingCacheList = new ArrayList();
try {
Iterator var3 = ClientWorker.this.cacheMap.values().iterator();
while(var3.hasNext()) {
CacheData cacheData = (CacheData)var3.next();
if (cacheData.getTaskId() == this.taskId) {
cacheDatas.add(cacheData);
try {
ClientWorker.this.checkLocalConfig(cacheData);
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception var13) {
ClientWorker.LOGGER.error("get local config info error", var13);
}
}
}
List<String> changedGroupKeys = ClientWorker.this.checkUpdateDataIds(cacheDatas, inInitializingCacheList);
if (!CollectionUtils.isEmpty(changedGroupKeys)) {
ClientWorker.LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
}
Iterator var16 = changedGroupKeys.iterator();
while(var16.hasNext()) {
String groupKey = (String)var16.next();
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
String[] ct = ClientWorker.this.getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = (CacheData)ClientWorker.this.cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(ct[0]);
if (null != ct[1]) {
cache.setType(ct[1]);
}
ClientWorker.LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}", new Object[]{ClientWorker.this.agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(ct[0]), ct[1]});
} catch (NacosException var12) {
String message = String.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", ClientWorker.this.agent.getName(), dataId, group, tenant);
ClientWorker.LOGGER.error(message, var12);
}
}
var16 = cacheDatas.iterator();
while(true) {
CacheData cacheDatax;
do {
if (!var16.hasNext()) {
inInitializingCacheList.clear();
ClientWorker.this.executorService.execute(this);
return;
}
cacheDatax = (CacheData)var16.next();
} while(cacheDatax.isInitializing() && !inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheDatax.dataId, cacheDatax.group, cacheDatax.tenant)));
cacheDatax.checkListenerMd5();
cacheDatax.setInitializing(false);
}
} catch (Throwable var14) {
ClientWorker.LOGGER.error("longPolling error : ", var14);
ClientWorker.this.executorService.schedule(this, (long)ClientWorker.this.taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
cacheData.checkListenerMd5() ends up calling listener.receiveConfigInfo(contentTmp).
The listener itself is defined in NacosContextRefresher#registerNacosListener; when invoked, it publishes a RefreshEvent.
private void registerNacosListener(final String groupKey, final String dataKey) {
String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
Listener listener = listenerMap.computeIfAbsent(key,
lst -> new AbstractSharedListener() {
@Override
public void innerReceive(String dataId, String group,
String configInfo) {
refreshCountIncrement();
nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
// todo feature: support single refresh for listening
applicationContext.publishEvent(
new RefreshEvent(this, null, "Refresh Nacos config"));
if (log.isDebugEnabled()) {
log.debug(String.format(
"Refresh Nacos config group=%s,dataId=%s,configInfo=%s",
group, dataId, configInfo));
}
}
});
try {
configService.addListener(dataKey, groupKey, listener);
}
catch (NacosException e) {
log.warn(String.format(
"register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey,
groupKey), e);
}
}
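The idea behind checkListenerMd5 can be sketched without any Nacos classes: each listener remembers the MD5 of the content it was last called with, and is only notified when the current MD5 differs. The code below is a simplified illustration, not the CacheData implementation; Md5ListenerDemo and ListenerState are hypothetical names, and a plain Consumer replaces the Nacos Listener interface.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class Md5ListenerDemo {
    // Pairs a callback with the MD5 of the content it last received.
    private static final class ListenerState {
        final Consumer<String> callback;
        String lastCallMd5;

        ListenerState(Consumer<String> callback, String lastCallMd5) {
            this.callback = callback;
            this.lastCallMd5 = lastCallMd5;
        }
    }

    private String content = "";
    private String md5 = md5Hex("");
    private final List<ListenerState> listeners = new ArrayList<>();

    // A new listener starts out in sync with the current content.
    void addListener(Consumer<String> callback) {
        listeners.add(new ListenerState(callback, md5));
    }

    void setContent(String newContent) {
        content = newContent;
        md5 = md5Hex(newContent);
    }

    // Notifies only the listeners whose remembered MD5 is stale.
    void checkListenerMd5() {
        for (ListenerState state : listeners) {
            if (!md5.equals(state.lastCallMd5)) {
                state.callback.accept(content);
                state.lastCallMd5 = md5;
            }
        }
    }

    static String md5Hex(String text) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(text.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Md5ListenerDemo cache = new Md5ListenerDemo();
        List<String> received = new ArrayList<>();
        cache.addListener(received::add);
        cache.checkListenerMd5();      // content unchanged, no callback
        cache.setContent("timeout=5");
        cache.checkListenerMd5();      // MD5 differs, callback fires
        cache.checkListenerMd5();      // already in sync, no callback
        System.out.println(received);
    }
}
```

Comparing digests rather than full content keeps the check cheap and makes repeated calls idempotent, which matters because the long-polling loop calls it on every round.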
NacosPropertySourceLocator
PropertySourceBootstrapConfiguration implements ApplicationContextInitializer<ConfigurableApplicationContext>, so its overridden initialize method runs during startup. It iterates over the PropertySourceLocator beans in the container to load configuration; in the Nacos config client, that PropertySourceLocator is NacosPropertySourceLocator.
public void initialize(ConfigurableApplicationContext applicationContext) {
List<PropertySource<?>> composite = new ArrayList<>();
AnnotationAwareOrderComparator.sort(this.propertySourceLocators);
boolean empty = true;
ConfigurableEnvironment environment = applicationContext.getEnvironment();
for (PropertySourceLocator locator : this.propertySourceLocators) {
Collection<PropertySource<?>> source = locator.locateCollection(environment);
if (source == null || source.size() == 0) {
continue;
}
List<PropertySource<?>> sourceList = new ArrayList<>();
for (PropertySource<?> p : source) {
if (p instanceof EnumerablePropertySource) {
EnumerablePropertySource<?> enumerable = (EnumerablePropertySource<?>) p;
sourceList.add(new BootstrapPropertySource<>(enumerable));
}
else {
sourceList.add(new SimpleBootstrapPropertySource(p));
}
}
logger.info("Located property source: " + sourceList);
composite.addAll(sourceList);
empty = false;
}
if (!empty) {
MutablePropertySources propertySources = environment.getPropertySources();
String logConfig = environment.resolvePlaceholders("${logging.config:}");
LogFile logFile = LogFile.get(environment);
for (PropertySource<?> p : environment.getPropertySources()) {
if (p.getName().startsWith(BOOTSTRAP_PROPERTY_SOURCE_NAME)) {
propertySources.remove(p.getName());
}
}
insertPropertySources(propertySources, composite);
reinitializeLoggingSystem(environment, logConfig, logFile);
setLogLevels(applicationContext, environment);
handleIncludedProfiles(environment);
}
}
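Configuration from the Nacos config center is loaded in NacosPropertySourceLocator#locate, which loads the shared configuration, the ext configuration, and the application configuration, in that order.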
@Override
public PropertySource<?> locate(Environment env) {
nacosConfigProperties.setEnvironment(env);
ConfigService configService = nacosConfigManager.getConfigService();
if (null == configService) {
log.warn("no instance of config service found, can't load config from nacos");
return null;
}
long timeout = nacosConfigProperties.getTimeout();
nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService,
timeout);
String name = nacosConfigProperties.getName();
String dataIdPrefix = nacosConfigProperties.getPrefix();
if (StringUtils.isEmpty(dataIdPrefix)) {
dataIdPrefix = name;
}
if (StringUtils.isEmpty(dataIdPrefix)) {
dataIdPrefix = env.getProperty("spring.application.name");
}
CompositePropertySource composite = new CompositePropertySource(
NACOS_PROPERTY_SOURCE_NAME);
loadSharedConfiguration(composite);
loadExtConfiguration(composite);
loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);
return composite;
}
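The application configuration is loaded in NacosPropertySourceLocator#loadApplicationConfiguration. It first loads the dataId without a file extension, then the dataId with the extension, and finally one dataId per active profile. Each later one has higher priority than those loaded before it.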
private void loadApplicationConfiguration(
CompositePropertySource compositePropertySource, String dataIdPrefix,
NacosConfigProperties properties, Environment environment) {
String fileExtension = properties.getFileExtension();
String nacosGroup = properties.getGroup();
// load directly once by default
loadNacosDataIfPresent(compositePropertySource, dataIdPrefix, nacosGroup,
fileExtension, true);
// load with suffix, which have a higher priority than the default
loadNacosDataIfPresent(compositePropertySource,
dataIdPrefix + DOT + fileExtension, nacosGroup, fileExtension, true);
// Loaded with profile, which have a higher priority than the suffix
for (String profile : environment.getActiveProfiles()) {
String dataId = dataIdPrefix + SEP1 + profile + DOT + fileExtension;
loadNacosDataIfPresent(compositePropertySource, dataId, nacosGroup,
fileExtension, true);
}
}
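The dataIds that loadApplicationConfiguration tries can be listed with a few lines of code. This is a sketch, not the library method: DataIdOrderDemo and candidateDataIds are hypothetical names, and the separators "-" and "." are assumed to be the values of the SEP1 and DOT constants used above.

```java
import java.util.ArrayList;
import java.util.List;

final class DataIdOrderDemo {
    // Returns the dataIds in load order; later entries take priority over earlier ones.
    static List<String> candidateDataIds(String prefix, String fileExtension,
                                         String... activeProfiles) {
        List<String> ids = new ArrayList<>();
        ids.add(prefix);                                   // no extension
        ids.add(prefix + "." + fileExtension);             // with extension
        for (String profile : activeProfiles) {
            ids.add(prefix + "-" + profile + "." + fileExtension); // per profile
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(candidateDataIds("order-service", "yaml", "dev"));
    }
}
```

For a prefix of order-service, extension yaml and active profile dev, the list is order-service, order-service.yaml, order-service-dev.yaml.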
NacosPropertySourceLocator#loadNacosDataIfPresent
private void loadNacosDataIfPresent(final CompositePropertySource composite,
final String dataId, final String group, String fileExtension,
boolean isRefreshable) {
if (null == dataId || dataId.trim().length() < 1) {
return;
}
if (null == group || group.trim().length() < 1) {
return;
}
NacosPropertySource propertySource = this.loadNacosPropertySource(dataId, group,
fileExtension, isRefreshable);
this.addFirstPropertySource(composite, propertySource, false);
}
private NacosPropertySource loadNacosPropertySource(final String dataId,
final String group, String fileExtension, boolean isRefreshable) {
if (NacosContextRefresher.getRefreshCount() != 0) {
if (!isRefreshable) {
return NacosPropertySourceRepository.getNacosPropertySource(dataId,
group);
}
}
return nacosPropertySourceBuilder.build(dataId, group, fileExtension,
isRefreshable);
}
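Why a later load wins follows from addFirstPropertySource: each new source is put at the front, and lookups return the first source that has the key. The sketch below imitates that behaviour with plain maps; FirstWinsDemo is a hypothetical class and is not CompositePropertySource.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

final class FirstWinsDemo {
    // Head of the deque is consulted first, so the most recently added source wins.
    private final Deque<Map<String, String>> sources = new ArrayDeque<>();

    void addFirst(Map<String, String> source) {
        sources.addFirst(source);
    }

    // Returns the value from the first source that defines the key, or null.
    String getProperty(String key) {
        for (Map<String, String> source : sources) {
            String value = source.get(key);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, String> base = new HashMap<>();
        base.put("timeout", "1000");
        base.put("name", "order-service");

        FirstWinsDemo composite = new FirstWinsDemo();
        composite.addFirst(base);                                       // loaded first
        composite.addFirst(Collections.singletonMap("timeout", "3000")); // loaded later

        System.out.println(composite.getProperty("timeout"));
        System.out.println(composite.getProperty("name"));
    }
}
```

Here timeout resolves to 3000 from the later source, while name still falls through to the earlier one, which is the same override-with-fallback behaviour the profile-specific dataId has over the plain one.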
NacosPropertySourceBuilder#build
calls loadNacosData, which in turn calls configService.getConfig(dataId, group, timeout).
NacosPropertySource build(String dataId, String group, String fileExtension,
boolean isRefreshable) {
List<PropertySource<?>> propertySources = loadNacosData(dataId, group,
fileExtension);
NacosPropertySource nacosPropertySource = new NacosPropertySource(propertySources,
group, dataId, new Date(), isRefreshable);
NacosPropertySourceRepository.collectNacosPropertySource(nacosPropertySource);
return nacosPropertySource;
}
private List<PropertySource<?>> loadNacosData(String dataId, String group,
String fileExtension) {
String data = null;
try {
data = configService.getConfig(dataId, group, timeout);
if (StringUtils.isEmpty(data)) {
log.warn(
"Ignore the empty nacos configuration and get it based on dataId[{}] & group[{}]",
dataId, group);
return Collections.emptyList();
}
if (log.isDebugEnabled()) {
log.debug(String.format(
"Loading nacos data, dataId: '%s', group: '%s', data: %s", dataId,
group, data));
}
return NacosDataParserHandler.getInstance().parseNacosData(dataId, data,
fileExtension);
}
catch (NacosException e) {
log.error("get data from Nacos error,dataId:{} ", dataId, e);
}
catch (Exception e) {
log.error("parse data from Nacos error,dataId:{},data:{}", dataId, data, e);
}
return Collections.emptyList();
}
NacosConfigService#getConfigInner
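Two calls matter here. LocalConfigInfoProcessor.getFailover(this.agent.getName(), dataId, group, tenant) reads the local failover file; if it exists, its content is returned directly and no HTTP request is made. Otherwise this.worker.getServerConfig(dataId, group, tenant, timeoutMs) fetches the configuration from the server. If that call fails with anything other than a 403, the local snapshot from LocalConfigInfoProcessor.getSnapshot is returned instead.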
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
group = this.null2defaultGroup(group);
ParamUtils.checkKeyParam(dataId, group);
ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setTenant(tenant);
cr.setGroup(group);
String content = LocalConfigInfoProcessor.getFailover(this.agent.getName(), dataId, group, tenant);
if (content != null) {
LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", new Object[]{this.agent.getName(), dataId, group, tenant, ContentUtils.truncateContent(content)});
cr.setContent(content);
this.configFilterChainManager.doFilter((IConfigRequest)null, cr);
content = cr.getContent();
return content;
} else {
try {
String[] ct = this.worker.getServerConfig(dataId, group, tenant, timeoutMs);
cr.setContent(ct[0]);
this.configFilterChainManager.doFilter((IConfigRequest)null, cr);
content = cr.getContent();
return content;
} catch (NacosException var9) {
if (403 == var9.getErrCode()) {
throw var9;
} else {
LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}", new Object[]{this.agent.getName(), dataId, group, tenant, var9.toString()});
LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", new Object[]{this.agent.getName(), dataId, group, tenant, ContentUtils.truncateContent(content)});
content = LocalConfigInfoProcessor.getSnapshot(this.agent.getName(), dataId, group, tenant);
cr.setContent(content);
this.configFilterChainManager.doFilter((IConfigRequest)null, cr);
content = cr.getContent();
return content;
}
}
}
}
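The lookup order in getConfigInner (failover file, then server, then snapshot) can be condensed into a small sketch. Everything here is hypothetical scaffolding: ConfigFallbackDemo and DemoServerException are made-up names, the three sources are passed in as suppliers instead of real file and HTTP access, and the config filter chain is left out.

```java
import java.util.function.Supplier;

final class ConfigFallbackDemo {
    // Mirrors the order above: local failover first, then the server, then the snapshot.
    static String getConfig(Supplier<String> failover,
                            Supplier<String> server,
                            Supplier<String> snapshot) {
        String content = failover.get();
        if (content != null) {
            return content;              // failover present: the server is never contacted
        }
        try {
            return server.get();
        } catch (DemoServerException e) {
            if (e.errCode == 403) {
                throw e;                 // authorization failures are not masked by the snapshot
            }
            return snapshot.get();       // any other server error falls back to the snapshot
        }
    }

    public static void main(String[] args) {
        System.out.println(getConfig(() -> null, () -> "remote", () -> "snap"));
        System.out.println(getConfig(() -> null,
                () -> { throw new DemoServerException(500); },
                () -> "snap"));
    }
}

// Hypothetical unchecked stand-in for NacosException, carrying only an error code.
class DemoServerException extends RuntimeException {
    final int errCode;

    DemoServerException(int errCode) {
        super("server error " + errCode);
        this.errCode = errCode;
    }
}
```

The first call returns the server value and the second returns the snapshot value; a 403 from the server would propagate to the caller instead.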
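Summary
- At startup, NacosPropertySourceLocator.locate is executed to fetch the configuration.
- At startup, a scheduled task is also started that issues long-polling requests to watch the configuration. Once the configuration changes, a request to fetch it is triggered and the listeners are executed.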