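The previous article covered how the client pulls configuration from the remote Nacos server at startup. This one explains how a configuration change on the Nacos server reaches the client in near real time. The first step is registering listeners.
Registering listeners
NacosContextRefresher listens for the ApplicationReadyEvent that is published once the application has started, and registers the configuration listeners in response.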
com.alibaba.cloud.nacos.refresh.NacosContextRefresher#onApplicationEvent
public void onApplicationEvent(ApplicationReadyEvent event) {
// many Spring context
if (this.ready.compareAndSet(false, true)) {
this.registerNacosListenersForApplications();
}
}
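The ready.compareAndSet(false, true) call makes listener registration run at most once, even though ApplicationReadyEvent can arrive more than once when several Spring contexts exist (the "many Spring context" comment). A minimal sketch of the same once-only guard; OnceGuard is a hypothetical stand-in for NacosContextRefresher, not part of either library.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for NacosContextRefresher's "ready" flag:
// the registration action runs only for the first event received.
class OnceGuard {
    private final AtomicBoolean ready = new AtomicBoolean(false);
    private final AtomicInteger registrations = new AtomicInteger();

    // Called once per ApplicationReadyEvent; only the first caller wins the CAS.
    boolean onEvent() {
        if (ready.compareAndSet(false, true)) {
            registrations.incrementAndGet(); // listener registration would happen here
            return true;
        }
        return false;
    }

    int registrations() {
        return registrations.get();
    }
}
```

Because compareAndSet is atomic, the guard also holds if two contexts publish the event from different threads at the same time.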
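registerNacosListenersForApplications() first checks whether automatic refresh is enabled, and only then registers listeners. As mentioned in the previous article, the configurations pulled at startup are cached in NacosPropertySourceRepository; here the method takes every cached property source from that repository and registers a listener for each one in a loop (a source whose refresh flag is set to false in the configuration file gets no listener).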
com.alibaba.cloud.nacos.refresh.NacosContextRefresher#registerNacosListenersForApplications
private void registerNacosListenersForApplications() {
if (isRefreshEnabled()) {
for (NacosPropertySource propertySource : NacosPropertySourceRepository
.getAll()) {
if (!propertySource.isRefreshable()) {
continue;
}
String dataId = propertySource.getDataId();
registerNacosListener(propertySource.getGroup(), dataId);
}
}
}
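As we can see, one listener is registered per configuration, i.e. per dataId + group + namespace (the namespace comes from the ConfigService in use, so the local map key only needs dataId and group). This listener is called back whenever that configuration is later updated.
The listener's logic boils down to three steps:
- Increment REFRESH_COUNT (via refreshCountIncrement()); this counter was used earlier in loadNacosPropertySource().
- Add a refresh record to NacosRefreshHistory#records.
- Publish a RefreshEvent. This event is provided by Spring Cloud and exists to trigger a refresh of the environment.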
com.alibaba.cloud.nacos.refresh.NacosContextRefresher#registerNacosListener
private void registerNacosListener(final String groupKey, final String dataKey) {
String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
Listener listener = listenerMap.computeIfAbsent(key,
lst -> new AbstractSharedListener() {
@Override
public void innerReceive(String dataId, String group,
String configInfo) {
refreshCountIncrement();
nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
// publish a RefreshEvent
// todo feature: support single refresh for listening
applicationContext.publishEvent(
new RefreshEvent(this, null, "Refresh Nacos config"));
if (log.isDebugEnabled()) {
log.debug(String.format(
"Refresh Nacos config group=%s,dataId=%s,configInfo=%s",
group, dataId, configInfo));
}
}
});
try {
configService.addListener(dataKey, groupKey, listener);
}
catch (NacosException e) {
log.warn(String.format(
"register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey,
groupKey), e);
}
}
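Because listeners are cached in listenerMap via computeIfAbsent, registering the same dataId/group twice reuses one listener instead of stacking callbacks, and every callback performs the three steps listed above. The sketch below models that bookkeeping with plain Java; RefresherSketch, its Consumer<String> listeners, the "dataId,group" key format and the two lists are hypothetical substitutes for the Nacos Listener, NacosRefreshHistory and Spring's RefreshEvent.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical model of the bookkeeping in NacosContextRefresher#registerNacosListener.
class RefresherSketch {
    private final Map<String, Consumer<String>> listenerMap = new ConcurrentHashMap<>();
    private final AtomicLong refreshCount = new AtomicLong();
    // Stand-ins for NacosRefreshHistory records and published RefreshEvents.
    private final List<String> history = new CopyOnWriteArrayList<>();
    private final List<String> events = new CopyOnWriteArrayList<>();

    // Returns the listener for dataId/group, creating it only on first registration.
    Consumer<String> register(String group, String dataId) {
        String key = dataId + "," + group; // assumed key format
        return listenerMap.computeIfAbsent(key, k -> (String configInfo) -> {
            refreshCount.incrementAndGet();                         // REFRESH_COUNT++
            history.add(dataId + "@" + group + ": " + configInfo); // refresh record
            events.add("Refresh Nacos config");                     // "publish" a RefreshEvent
        });
    }

    long refreshCount() { return refreshCount.get(); }
    int listenerCount() { return listenerMap.size(); }
    int eventCount() { return events.size(); }
}
```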
Registering the listener is in turn delegated to ConfigService.
com.alibaba.nacos.client.config.NacosConfigService#addListener
public void addListener(String dataId, String group, Listener listener) throws NacosException {
worker.addTenantListeners(dataId, group, Arrays.asList(listener));
}
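Registration is handled by ClientWorker, which creates a CacheData object. CacheData manages the listeners of one configuration and is one of the most important classes in this flow.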
com.alibaba.nacos.client.config.impl.ClientWorker#addTenantListeners
public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)
throws NacosException {
group = blank2defaultGroup(group);
String tenant = agent.getTenant();
CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
for (Listener listener : listeners) {
cache.addListener(listener);
}
}
The important fields of CacheData are:
// Filter chain that can intercept config content, e.g. for encryption/decryption
private final ConfigFilterChainManager configFilterChainManager;
public final String dataId;
public final String group;
public final String tenant;
// Listeners interested in this configuration
private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;
// Used to detect whether this configuration has changed
private volatile String md5;
/**
* whether use local config.
*/
private volatile boolean isUseLocalConfig = false;
/**
* last modify time.
*/
private volatile long localConfigLastModified;
// The configuration content
private volatile String content;
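The md5 field is what later lets the client tell whether the content changed: comparing two short hex digests is cheaper than comparing full contents for every listener. Nacos computes it with its own utility; as a stand-in, the sketch below computes a lowercase hex MD5 with the JDK's MessageDigest, assuming UTF-8 content. ContentMd5 is a hypothetical helper.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: lowercase hex MD5 of config content, used for change detection.
final class ContentMd5 {
    private ContentMd5() {}

    static String md5Hex(String content) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(content.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                sb.append(String.format("%02x", b & 0xff));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to support MD5, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    // Content counts as changed when its digest differs from the last known one.
    static boolean changed(String lastMd5, String newContent) {
        return !md5Hex(newContent).equals(lastMd5);
    }
}
```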
addCacheDataIfAbsent() stores the newly created CacheData in a map (cacheMap) inside ClientWorker; this map is used later on.
com.alibaba.nacos.client.config.impl.ClientWorker#addCacheDataIfAbsent
public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
String key = GroupKey.getKeyTenant(dataId, group, tenant);
CacheData cacheData = cacheMap.get(key);
if (cacheData != null) {
return cacheData;
}
cacheData = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
// multiple listeners on the same dataid+group and race condition
CacheData lastCacheData = cacheMap.putIfAbsent(key, cacheData);
if (lastCacheData == null) {
//fix issue # 1317
if (enableRemoteSyncConfig) {
ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L);
cacheData.setContent(response.getContent());
}
int taskId = cacheMap.size() / (int) ParamUtil.getPerTaskConfigSize();
cacheData.setTaskId(taskId);
lastCacheData = cacheData;
}
// reset so that server not hang this check
lastCacheData.setInitializing(true);
LOGGER.info("[{}] [subscribe] {}", agent.getName(), key);
MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.size());
return lastCacheData;
}
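Two details of addCacheDataIfAbsent() matter later: putIfAbsent settles the race when several threads subscribe to the same key, and taskId = cacheMap.size() / perTaskConfigSize splits the CacheData entries into batches, one long-polling task per batch. A simplified sketch; CacheRegistry, Entry and the "+"-joined key are hypothetical stand-ins for ClientWorker, CacheData and GroupKey, and the batch size is passed in rather than read from ParamUtil.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified model of ClientWorker#addCacheDataIfAbsent.
class CacheRegistry {
    static final class Entry {
        final String key;
        volatile int taskId;
        Entry(String key) { this.key = key; }
    }

    private final ConcurrentMap<String, Entry> cacheMap = new ConcurrentHashMap<>();
    private final int perTaskConfigSize;

    CacheRegistry(int perTaskConfigSize) { this.perTaskConfigSize = perTaskConfigSize; }

    Entry addIfAbsent(String dataId, String group, String tenant) {
        String key = dataId + "+" + group + (tenant == null ? "" : "+" + tenant); // assumed key format
        Entry existing = cacheMap.get(key);
        if (existing != null) {
            return existing;
        }
        Entry created = new Entry(key);
        // Only the thread whose putIfAbsent succeeds initializes the entry.
        Entry raced = cacheMap.putIfAbsent(key, created);
        if (raced != null) {
            return raced;
        }
        // Size is read after insertion, as in the original code.
        created.taskId = cacheMap.size() / perTaskConfigSize;
        return created;
    }
}
```

Since the size is read after the insert, with a batch size of 2 the second entry already lands in task 1; the test below checks exactly that.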
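At this point, once the service has started, every configuration that supports hot reload has a listener registered that watches for remote changes and reacts to them.
Fetching updated configuration
ClientWorker is created in the ConfigService constructor.
The ClientWorker constructor creates two thread pools: executor runs checkConfigInfo() with a fixed delay of 10 ms between runs to dispatch configuration-change checks, and executorService runs the long-polling requests themselves.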
com.alibaba.nacos.client.config.impl.ClientWorker#ClientWorker
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
final Properties properties) {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
// Initialize the timeout parameter
init(properties);
this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
t.setDaemon(true);
return t;
}
});
this.executorService = Executors
.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
t.setDaemon(true);
return t;
}
});
this.executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
// Check configuration info
checkConfigInfo();
} catch (Throwable e) {
LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
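The pattern above is a single-thread ScheduledExecutorService with a named daemon ThreadFactory, so the checker never keeps the JVM alive. The try/catch around checkConfigInfo() matters: if a task passed to scheduleWithFixedDelay throws, all its subsequent runs are suppressed. A JDK-only sketch; WorkerThreads and the agentName parameter are illustrative.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Hypothetical helper reproducing ClientWorker's scheduler setup with JDK classes only.
final class WorkerThreads {
    private WorkerThreads() {}

    // Named daemon threads, so they never block JVM shutdown.
    static ThreadFactory daemonFactory(String name) {
        return r -> {
            Thread t = new Thread(r);
            t.setName(name);
            t.setDaemon(true);
            return t;
        };
    }

    // First run after 1 ms, then 10 ms after each previous run finishes.
    static ScheduledExecutorService startChecker(String agentName, Runnable check) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(
                1, daemonFactory("com.alibaba.nacos.client.Worker." + agentName));
        executor.scheduleWithFixedDelay(() -> {
            try {
                check.run();
            } catch (Throwable e) {
                // Swallow errors: an escaping exception would cancel all future runs.
                System.err.println("rotate check error: " + e);
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
        return executor;
    }
}
```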
checkConfigInfo() is responsible for submitting the long-polling tasks.
com.alibaba.nacos.client.config.impl.ClientWorker#checkConfigInfo
public void checkConfigInfo() {
// Dispatch tasks.
int listenerSize = cacheMap.size();
// Round up the longingTaskCount.
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
// The task list is no order.So it maybe has issues when changing.
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}
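checkConfigInfo() only ever adds tasks: it computes how many batches the current number of CacheData entries needs, ceil(listenerSize / perTaskConfigSize), and starts the ones with ids from currentLongingTaskCount up to that number. A sketch of that arithmetic; TaskDispatch is hypothetical and the batch size is a parameter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper reproducing the task-count arithmetic of ClientWorker#checkConfigInfo.
final class TaskDispatch {
    private TaskDispatch() {}

    // Number of long-polling tasks needed, rounded up.
    static int longPollingTaskCount(int listenerSize, double perTaskConfigSize) {
        return (int) Math.ceil(listenerSize / perTaskConfigSize);
    }

    // Ids of the tasks to start this round: [current, needed); running tasks are left alone.
    static List<Integer> newTaskIds(int currentTaskCount, int listenerSize, double perTaskConfigSize) {
        int needed = longPollingTaskCount(listenerSize, perTaskConfigSize);
        List<Integer> ids = new ArrayList<>();
        for (int i = currentTaskCount; i < needed; i++) {
            ids.add(i);
        }
        return ids;
    }
}
```

For example, with a batch size of 3000, growing from 1 running task to 6001 entries starts tasks 1 and 2.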
What a long-polling task does on each run:
com.alibaba.nacos.client.config.impl.ClientWorker.LongPollingRunnable#run
public void run() {
List<CacheData> cacheDatas = new ArrayList<CacheData>();
List<String> inInitializingCacheList = new ArrayList<String>();
try {
// check failover config
for (CacheData cacheData : cacheMap.values()) {
if (cacheData.getTaskId() == taskId) {
cacheDatas.add(cacheData);
try {
checkLocalConfig(cacheData);
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception e) {
LOGGER.error("get local config info error", e);
}
}
}
// check server config
// Ask the server which configurations have changed
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
if (!CollectionUtils.isEmpty(changedGroupKeys)) {
LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
}
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
// Fetch the latest content of this dataId from the server
ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(response.getContent());
cache.setEncryptedDataKey(response.getEncryptedDataKey());
if (null != response.getConfigType()) {
cache.setType(response.getConfigType());
}
LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
agent.getName(), dataId, group, tenant, cache.getMd5(),
ContentUtils.truncateContent(response.getContent()), response.getConfigType());
} catch (NacosException ioe) {
String message = String
.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
agent.getName(), dataId, group, tenant);
LOGGER.error(message, ioe);
}
}
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
// Check whether the md5 changed; notify the listeners if it did
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
executorService.execute(this);
} catch (Throwable e) {
// If the rotation training task is abnormal, the next execution time of the task will be punished
LOGGER.error("longPolling error : ", e);
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
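The task never loops in place: after each round it hands itself back to the pool with executorService.execute(this), and after a failure it reschedules itself after taskPenaltyTime instead of retrying immediately. A minimal sketch of that resubmission pattern; RoundTask is hypothetical, and maxRounds and failOnRound are demo-only knobs so the example terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of LongPollingRunnable's resubmission: one round per run,
// resubmitted immediately on success, or after a penalty delay on failure.
class RoundTask implements Runnable {
    private final ScheduledExecutorService pool;
    private final long penaltyMillis;
    private final int maxRounds;   // demo-only stop condition
    private final int failOnRound; // demo-only simulated failure
    private final AtomicInteger rounds = new AtomicInteger();
    final CountDownLatch finished = new CountDownLatch(1);

    RoundTask(ScheduledExecutorService pool, long penaltyMillis, int maxRounds, int failOnRound) {
        this.pool = pool;
        this.penaltyMillis = penaltyMillis;
        this.maxRounds = maxRounds;
        this.failOnRound = failOnRound;
    }

    @Override
    public void run() {
        int round = rounds.incrementAndGet();
        if (round > maxRounds) {
            finished.countDown();
            return;
        }
        try {
            if (round == failOnRound) {
                throw new IllegalStateException("simulated long-polling failure");
            }
            // ... one long-polling round would happen here ...
            pool.execute(this);
        } catch (Throwable e) {
            // Penalize the next attempt instead of hammering the server.
            pool.schedule(this, penaltyMillis, TimeUnit.MILLISECONDS);
        }
    }

    int rounds() { return rounds.get(); }
}
```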
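checkUpdateDataIds() concatenates every dataId (together with its group, md5 and, if present, tenant) into a single string in a fixed format, skipping entries currently served from a local failover file, and sends it to the server as a long-polling request. The Long-Pulling-Timeout defaults to 30 s: if no configuration has changed, the server holds the request until the timeout; if something changes, it returns immediately with the list of changed dataIds.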
com.alibaba.nacos.client.config.impl.ClientWorker#checkUpdateDataIds
List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws Exception {
StringBuilder sb = new StringBuilder();
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isUseLocalConfigInfo()) {
sb.append(cacheData.dataId).append(WORD_SEPARATOR);
sb.append(cacheData.group).append(WORD_SEPARATOR);
if (StringUtils.isBlank(cacheData.tenant)) {
sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
} else {
sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
}
if (cacheData.isInitializing()) {
// It updates when cacheData occurs in cacheMap by first time.
// Record CacheData that is still initializing
inInitializingCacheList
.add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
}
}
}
boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
// Check the configuration on the server side
return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
}
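The probe string is one line per CacheData: dataId, group, md5 and, when present, tenant, joined by a word separator, with each line terminated by a line separator. In the Nacos 1.x client these separators are, to my knowledge, the control characters char 2 (Constants.WORD_SEPARATOR) and char 1 (Constants.LINE_SEPARATOR); the sketch hard-codes them under that assumption. ProbeString and Item are hypothetical.

```java
import java.util.List;

// Hypothetical sketch of the probe string built by ClientWorker#checkUpdateDataIds.
final class ProbeString {
    // Assumed to match Nacos Constants.WORD_SEPARATOR (char 2) and LINE_SEPARATOR (char 1).
    static final String WORD_SEPARATOR = Character.toString((char) 2);
    static final String LINE_SEPARATOR = Character.toString((char) 1);

    static final class Item {
        final String dataId;
        final String group;
        final String md5;
        final String tenant; // may be null or blank

        Item(String dataId, String group, String md5, String tenant) {
            this.dataId = dataId;
            this.group = group;
            this.md5 = md5;
            this.tenant = tenant;
        }
    }

    private ProbeString() {}

    static String build(List<Item> items) {
        StringBuilder sb = new StringBuilder();
        for (Item it : items) {
            sb.append(it.dataId).append(WORD_SEPARATOR);
            sb.append(it.group).append(WORD_SEPARATOR);
            if (it.tenant == null || it.tenant.trim().isEmpty()) {
                sb.append(it.md5).append(LINE_SEPARATOR);   // no tenant: line ends after md5
            } else {
                sb.append(it.md5).append(WORD_SEPARATOR);
                sb.append(it.tenant).append(LINE_SEPARATOR); // tenant is the fourth field
            }
        }
        return sb.toString();
    }
}
```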
checkUpdateConfigStr() calls the HTTP endpoint /v1/cs/configs/listener.
com.alibaba.nacos.client.config.impl.ClientWorker#checkUpdateConfigStr
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {
Map<String, String> params = new HashMap<String, String>(2);
params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
Map<String, String> headers = new HashMap<String, String>(2);
// Use long polling
headers.put("Long-Pulling-Timeout", "" + timeout);
// told server do not hang me up if new initializing cacheData added in
if (isInitializingCacheList) {
headers.put("Long-Pulling-Timeout-No-Hangup", "true");
}
if (StringUtils.isBlank(probeUpdateString)) {
return Collections.emptyList();
}
try {
// In order to prevent the server from handling the delay of the client's long task,
// increase the client's read timeout to avoid this problem.
long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
// Call the remote listener endpoint
HttpRestResult<String> result = agent
.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(),
readTimeoutMs);
if (result.ok()) {
setHealthServer(true);
return parseUpdateDataIdResponse(result.getData());
} else {
setHealthServer(false);
LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(),
result.getCode());
}
} catch (Exception e) {
setHealthServer(false);
LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);
throw e;
}
return Collections.emptyList();
}
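On the request side, the long poll comes down to two headers plus a read timeout of 1.5 times the hold time (timeout + timeout >> 1), so the socket does not give up before the server answers; with the default 30 s Long-Pulling-Timeout that is a 45 s read timeout. LongPollRequest is a hypothetical helper; only the header names come from the code above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper reproducing the headers and read timeout of ClientWorker#checkUpdateConfigStr.
final class LongPollRequest {
    private LongPollRequest() {}

    static Map<String, String> headers(long timeoutMs, boolean hasInitializingCache) {
        Map<String, String> headers = new HashMap<>();
        // How long the server may hold the request when nothing has changed.
        headers.put("Long-Pulling-Timeout", String.valueOf(timeoutMs));
        if (hasInitializingCache) {
            // Newly added CacheData: ask the server to answer without hanging.
            headers.put("Long-Pulling-Timeout-No-Hangup", "true");
        }
        return headers;
    }

    // Client read timeout = 1.5 x the server hold time.
    static long readTimeoutMs(long timeoutMs) {
        return timeoutMs + (timeoutMs >> 1);
    }
}
```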
checkListenerMd5() compares the current md5 with the md5 each listener was last notified with (lastCallMd5); when the two differ, it calls safeNotifyListener().
com.alibaba.nacos.client.config.impl.CacheData#checkListenerMd5
void checkListenerMd5() {
for (ManagerListenerWrap wrap : listeners) {
if (!md5.equals(wrap.lastCallMd5)) {
// Configuration changed: notify the listener
safeNotifyListener(dataId, group, content, type, md5, encryptedDataKey, wrap);
}
}
}
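Each listener wrapper remembers the md5 it was last notified with, so a listener is called once per distinct version, no matter how many times the check runs. A sketch of that per-listener bookkeeping; Md5Notifier and ListenerWrap are hypothetical simplifications of CacheData and ManagerListenerWrap, notification is synchronous, and the assumption that a new wrapper starts from the current md5 reflects my reading of CacheData#addListener.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical simplification of CacheData#checkListenerMd5.
class Md5Notifier {
    static final class ListenerWrap {
        final Consumer<String> listener;
        volatile String lastCallMd5;

        ListenerWrap(Consumer<String> listener, String md5) {
            this.listener = listener;
            this.lastCallMd5 = md5;
        }
    }

    private final List<ListenerWrap> listeners = new CopyOnWriteArrayList<>();
    private volatile String md5 = "";
    private volatile String content = "";

    // Assumption: a new wrapper starts at the current md5, so only later changes are delivered.
    void addListener(Consumer<String> listener) {
        listeners.add(new ListenerWrap(listener, md5));
    }

    // What the long-polling task does after fetching new content.
    void setContent(String content, String md5) {
        this.content = content;
        this.md5 = md5;
    }

    // Notify only listeners whose last delivered md5 differs from the current one.
    void checkListenerMd5() {
        for (ListenerWrap wrap : listeners) {
            if (!md5.equals(wrap.lastCallMd5)) {
                wrap.listener.accept(content);
                wrap.lastCallMd5 = md5; // recorded only after the callback returns normally
            }
        }
    }
}
```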
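safeNotifyListener() calls the listener's receiveConfigInfo() method and then updates the listener wrapper's lastCallMd5 field (and, for an AbstractConfigChangeListener, its lastContent field as well).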
com.alibaba.nacos.client.config.impl.CacheData#safeNotifyListener
private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
final String md5, final String encryptedDataKey, final ManagerListenerWrap listenerWrap) {
final Listener listener = listenerWrap.listener;
Runnable job = new Runnable() {
@Override
public void run() {
ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
ClassLoader appClassLoader = listener.getClass().getClassLoader();
try {
if (listener instanceof AbstractSharedListener) {
AbstractSharedListener adapter = (AbstractSharedListener) listener;
adapter.fillContext(dataId, group);
LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
}
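// Before the callback, set the thread context classloader to the webapp's own classloader, so that SPI calls inside the callback do not fail or pick the wrong implementation (this only matters when several applications are deployed together).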
Thread.currentThread().setContextClassLoader(appClassLoader);
ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setGroup(group);
cr.setContent(content);
cr.setEncryptedDataKey(encryptedDataKey);
configFilterChainManager.doFilter(null, cr);
String contentTmp = cr.getContent();
/**
* @see com.alibaba.cloud.nacos.refresh.NacosContextRefresher#registerNacosListener(java.lang.String, java.lang.String)
*/
listener.receiveConfigInfo(contentTmp);
// compare lastContent and content
if (listener instanceof AbstractConfigChangeListener) {
Map data = ConfigChangeHandler.getInstance()
.parseChangeData(listenerWrap.lastContent, content, type);
ConfigChangeEvent event = new ConfigChangeEvent(data);
((AbstractConfigChangeListener) listener).receiveConfigChange(event);
listenerWrap.lastContent = content;
}
listenerWrap.lastCallMd5 = md5;
LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,
listener);
} catch (NacosException ex) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",
name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
} catch (Throwable t) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,
group, md5, listener, t.getCause());
} finally {
Thread.currentThread().setContextClassLoader(myClassLoader);
}
}
};
final long startNotify = System.currentTimeMillis();
try {
if (null != listener.getExecutor()) {
listener.getExecutor().execute(job);
} else {
job.run();
}
} catch (Throwable t) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,
group, md5, listener, t.getCause());
}
final long finishNotify = System.currentTimeMillis();
LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
name, (finishNotify - startNotify), dataId, group, md5, listener);
}
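The classloader handling in safeNotifyListener() is the standard save/swap/restore idiom: the callback runs with the listener's own classloader as thread context classloader, and finally restores the original even if the callback throws. A JDK-only sketch; ContextClassLoaderRunner is hypothetical, and like the original it runs the job on the listener's executor when one is supplied, otherwise inline.

```java
import java.util.concurrent.Executor;

// Hypothetical helper showing the save/swap/restore idiom around listener callbacks.
final class ContextClassLoaderRunner {
    private ContextClassLoaderRunner() {}

    static void runWith(ClassLoader loader, Runnable callback, Executor executor) {
        Runnable job = () -> {
            Thread current = Thread.currentThread();
            ClassLoader previous = current.getContextClassLoader();
            try {
                current.setContextClassLoader(loader);
                callback.run();
            } catch (Throwable t) {
                // Log and swallow, so one failing listener does not break the notifier.
                System.err.println("notify-error: " + t);
            } finally {
                current.setContextClassLoader(previous);
            }
        };
        if (executor != null) {
            executor.execute(job);
        } else {
            job.run();
        }
    }
}
```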
We have already seen what the listener does: it publishes a RefreshEvent. This is where the Nacos side of the flow ends; the RefreshEvent is handled by Spring Cloud classes.
Handling RefreshEvent
RefreshEvent is handled by RefreshEventListener.
org.springframework.cloud.endpoint.event.RefreshEventListener#onApplicationEvent
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof ApplicationReadyEvent) {
handle((ApplicationReadyEvent) event);
}
else if (event instanceof RefreshEvent) {
handle((RefreshEvent) event);
}
}
RefreshEventListener delegates to ContextRefresher, which refreshes the configuration held in the container.
org.springframework.cloud.endpoint.event.RefreshEventListener#handle(org.springframework.cloud.endpoint.event.RefreshEvent)
public void handle(RefreshEvent event) {
if (this.ready.get()) { // don't handle events before app is ready
log.debug("Event received " + event.getEventDesc());
Set<String> keys = this.refresh.refresh();
log.info("Refresh keys changed: " + keys);
}
}
org.springframework.cloud.context.refresh.ContextRefresher#refresh
public synchronized Set<String> refresh() {
Set<String> keys = refreshEnvironment();
this.scope.refreshAll();
return keys;
}
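refreshEnvironment() refreshes the Spring Environment, but the actual work is done by addConfigFilesToEnvironment(). The idea is to build a new, temporary Spring context, copy the property sources of its environment into the existing Spring environment, and then close the temporary context. Once all changed keys are known, an EnvironmentChangeEvent is published.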
org.springframework.cloud.context.refresh.ContextRefresher#refreshEnvironment
public synchronized Set<String> refreshEnvironment() {
Map<String, Object> before = extract(
this.context.getEnvironment().getPropertySources());
addConfigFilesToEnvironment();
Set<String> keys = changes(before,
extract(this.context.getEnvironment().getPropertySources())).keySet();
this.context.publishEvent(new EnvironmentChangeEvent(this.context, keys));
return keys;
}
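The set of changed keys is a plain map diff between the flattened properties before and after the reload: keys that were added, removed, or whose value changed. A sketch of that diff, assuming both sides are already flattened into Map<String, Object> (which is what extract() produces); PropertyDiff is a hypothetical helper, not the Spring Cloud method itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper: diff of flattened properties before and after a refresh.
final class PropertyDiff {
    private PropertyDiff() {}

    // Returns changed keys mapped to their new value (null when the key was removed).
    static Map<String, Object> changes(Map<String, Object> before, Map<String, Object> after) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, Object> e : before.entrySet()) {
            String key = e.getKey();
            if (!after.containsKey(key)) {
                result.put(key, null);                    // removed
            } else if (!Objects.equals(e.getValue(), after.get(key))) {
                result.put(key, after.get(key));          // modified
            }
        }
        for (Map.Entry<String, Object> e : after.entrySet()) {
            if (!before.containsKey(e.getKey())) {
                result.put(e.getKey(), e.getValue());     // added
            }
        }
        return result;
    }
}
```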
/* For testing. */ ConfigurableApplicationContext addConfigFilesToEnvironment() {
ConfigurableApplicationContext capture = null;
try {
StandardEnvironment environment = copyEnvironment(
this.context.getEnvironment());
SpringApplicationBuilder builder = new SpringApplicationBuilder(Empty.class)
.bannerMode(Mode.OFF).web(WebApplicationType.NONE)
.environment(environment);
// Just the listeners that affect the environment (e.g. excluding logging
// listener because it has side effects)
builder.application()
.setListeners(Arrays.asList(new BootstrapApplicationListener(),
new ConfigFileApplicationListener()));
capture = builder.run();
if (environment.getPropertySources().contains(REFRESH_ARGS_PROPERTY_SOURCE)) {
environment.getPropertySources().remove(REFRESH_ARGS_PROPERTY_SOURCE);
}
MutablePropertySources target = this.context.getEnvironment()
.getPropertySources();
String targetName = null;
for (PropertySource<?> source : environment.getPropertySources()) {
String name = source.getName();
if (target.contains(name)) {
targetName = name;
}
if (!this.standardSources.contains(name)) {
if (target.contains(name)) {
target.replace(name, source);
}
else {
if (targetName != null) {
target.addAfter(targetName, source);
// update targetName to preserve ordering
targetName = name;
}
else {
// targetName was null so we are at the start of the list
target.addFirst(source);
targetName = name;
}
}
}
}
}
finally {
ConfigurableApplicationContext closeable = capture;
while (closeable != null) {
try {
closeable.close();
}
catch (Exception e) {
// Ignore;
}
if (closeable.getParent() instanceof ConfigurableApplicationContext) {
closeable = (ConfigurableApplicationContext) closeable.getParent();
}
else {
break;
}
}
}
return capture;
}
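The merge loop in addConfigFilesToEnvironment() preserves the ordering of property sources: a source that already exists in the target is replaced in place, a new one is inserted right after the last source that also exists in the target (or at the front if none has been seen yet), and standard sources such as system properties are never touched. The sketch replays that loop on a list of name/value pairs; SourceMerge, NamedSource and the sample names are hypothetical, and the standard set plays the role of standardSources.

```java
import java.util.List;
import java.util.Set;

// Hypothetical replay of the property-source merge loop in addConfigFilesToEnvironment().
final class SourceMerge {
    static final class NamedSource {
        final String name;
        final String value;
        NamedSource(String name, String value) { this.name = name; this.value = value; }
    }

    private SourceMerge() {}

    private static int indexOf(List<NamedSource> list, String name) {
        for (int i = 0; i < list.size(); i++) {
            if (list.get(i).name.equals(name)) {
                return i;
            }
        }
        return -1;
    }

    // Merges 'fresh' into 'target' in place, keeping the relative order of sources.
    static void merge(List<NamedSource> target, List<NamedSource> fresh, Set<String> standard) {
        String targetName = null;
        for (NamedSource source : fresh) {
            String name = source.name;
            int existing = indexOf(target, name);
            if (existing >= 0) {
                targetName = name;
            }
            if (standard.contains(name)) {
                continue; // standard sources are left alone
            }
            if (existing >= 0) {
                target.set(existing, source);                         // replace in place
            } else if (targetName != null) {
                target.add(indexOf(target, targetName) + 1, source);  // after the last known source
                targetName = name;
            } else {
                target.add(0, source);                                // nothing seen yet: add first
                targetName = name;
            }
        }
    }
}
```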
org.springframework.cloud.context.scope.refresh.RefreshScope#refreshAll
public void refreshAll() {
super.destroy();
this.context.publishEvent(new RefreshScopeRefreshedEvent());
}
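For an @Value field to be hot-reloaded, its bean must also be annotated with @RefreshScope. A @RefreshScope bean lives in the refresh scope: it is not stored in the Spring container's first-level (singleton) cache but in the cache field of GenericScope. When configuration changes, the objects in that cache are cleared, so the bean is recreated the next time it is used and reads the latest values from the Environment.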
org.springframework.cloud.context.scope.GenericScope#destroy()
public void destroy() {
List<Throwable> errors = new ArrayList<Throwable>();
// Clear the cache
Collection<BeanLifecycleWrapper> wrappers = this.cache.clear();
for (BeanLifecycleWrapper wrapper : wrappers) {
try {
Lock lock = this.locks.get(wrapper.getName()).writeLock();
lock.lock();
try {
wrapper.destroy();
}
finally {
lock.unlock();
}
}
catch (RuntimeException e) {
errors.add(e);
}
}
if (!errors.isEmpty()) {
throw wrapIfNecessary(errors.get(0));
}
this.errors.clear();
}
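The effect of destroy() can be modelled with a tiny cache: refresh-scoped beans are created lazily from the current configuration and then cached, and clearing the cache forces the next lookup to build a fresh instance that reads the new value. MiniRefreshScope is hypothetical, not Spring's GenericScope; it omits the per-bean locks and destruction callbacks, and in Spring the injected reference is a scoped proxy, so callers reach the new instance automatically.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical model of a refresh scope: beans stay cached per name until refreshAll().
class MiniRefreshScope {
    private final Map<String, Object> cache = new ConcurrentHashMap<>();

    // Returns the cached bean, creating it from the factory on first access after a refresh.
    @SuppressWarnings("unchecked")
    <T> T get(String name, Supplier<T> factory) {
        return (T) cache.computeIfAbsent(name, n -> factory.get());
    }

    // Counterpart of GenericScope#destroy(): drop every cached instance.
    void refreshAll() {
        cache.clear();
    }
}
```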