上文说了Nacos配置中心客户端的源码流程,这篇介绍下Nacos配置中心服务端的源码。
服务端的启动
先来看服务启动时干了啥?
init()方法上面有@PostConstruct,该方法会在ExternalDumpService实例化后执行。
com.alibaba.nacos.config.server.service.dump.ExternalDumpService#init
@PostConstruct
@Override
protected void init() throws Throwable {
dumpOperate(processor, dumpAllProcessor, dumpAllBetaProcessor, dumpAllTagProcessor);
}
dumpOperate()主要干了两件事:
- dumpConfigInfo(),这个方法里面也是调用的DumpAllTask
- 提交DumpAllTask的定时任务
com.alibaba.nacos.config.server.service.dump.DumpService#dumpOperate
protected void dumpOperate(DumpProcessor processor, DumpAllProcessor dumpAllProcessor,
DumpAllBetaProcessor dumpAllBetaProcessor, DumpAllTagProcessor dumpAllTagProcessor) throws NacosException {
String dumpFileContext = "CONFIG_DUMP_TO_FILE";
TimerContext.start(dumpFileContext);
try {
LogUtil.DEFAULT_LOG.warn("DumpService start");
Runnable dumpAll = () -> dumpAllTaskMgr.addTask(DumpAllTask.TASK_ID, new DumpAllTask());
... ...
try {
// 转存配置
// 执行一次DumpAllTask
dumpConfigInfo(dumpAllProcessor);
... ...
} catch (Exception e) {
LogUtil.FATAL_LOG
.error("Nacos Server did not start because dumpservice bean construction failure :\n" + e
.toString());
throw new NacosException(NacosException.SERVER_ERROR,
"Nacos Server did not start because dumpservice bean construction failure :\n" + e.getMessage(),
e);
}
if (!EnvUtil.getStandaloneMode()) {
Runnable heartbeat = () -> {
String heartBeatTime = TimeUtils.getCurrentTime().toString();
// write disk
try {
DiskUtil.saveHeartBeatToDisk(heartBeatTime);
} catch (IOException e) {
LogUtil.FATAL_LOG.error("save heartbeat fail" + e.getMessage());
}
};
ConfigExecutor.scheduleConfigTask(heartbeat, 0, 10, TimeUnit.SECONDS);
long initialDelay = new Random().nextInt(INITIAL_DELAY_IN_MINUTE) + 10;
LogUtil.DEFAULT_LOG.warn("initialDelay:{}", initialDelay);
// 6个小时执行一次DumpAllTask
ConfigExecutor.scheduleConfigTask(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
ConfigExecutor
.scheduleConfigTask(dumpAllBeta, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
ConfigExecutor
.scheduleConfigTask(dumpAllTag, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
}
ConfigExecutor.scheduleConfigTask(clearConfigHistory, 10, 10, TimeUnit.MINUTES);
} finally {
TimerContext.end(dumpFileContext, LogUtil.DUMP_LOG);
}
}
dumpConfigInfo()里面还是执行了DumpAllTask。
com.alibaba.nacos.config.server.service.dump.DumpService#dumpConfigInfo
private void dumpConfigInfo(DumpAllProcessor dumpAllProcessor) throws IOException {
int timeStep = 6;
Boolean isAllDump = true;
// initial dump all
FileInputStream fis = null;
Timestamp heartheatLastStamp = null;
try {
... ...
if (isAllDump) {
LogUtil.DEFAULT_LOG.info("start clear all config-info.");
DiskUtil.clearAll();
// 执行DumpAllTask
dumpAllProcessor.process(new DumpAllTask());
} else {
... ...
}
} catch (IOException e) {
LogUtil.FATAL_LOG.error("dump config fail" + e.getMessage());
throw e;
} finally {
if (null != fis) {
try {
fis.close();
} catch (IOException e) {
LogUtil.DEFAULT_LOG.warn("close file failed");
}
}
}
}
process()会分页查询出数据库的所有配置,然后一个一个调用ConfigCacheService.dump()。
com.alibaba.nacos.config.server.service.dump.processor.DumpAllProcessor#process
public boolean process(NacosTask task) {
long currentMaxId = persistService.findConfigMaxId();
long lastMaxId = 0;
while (lastMaxId < currentMaxId) {
// 分页查询出数据库的所有配置
Page<ConfigInfoWrapper> page = persistService.findAllConfigInfoFragment(lastMaxId, PAGE_SIZE);
if (page != null && page.getPageItems() != null && !page.getPageItems().isEmpty()) {
for (ConfigInfoWrapper cf : page.getPageItems()) {
long id = cf.getId();
lastMaxId = id > lastMaxId ? id : lastMaxId;
if (cf.getDataId().equals(AggrWhitelist.AGGRIDS_METADATA)) {
AggrWhitelist.load(cf.getContent());
}
if (cf.getDataId().equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
ClientIpWhiteList.load(cf.getContent());
}
if (cf.getDataId().equals(SwitchService.SWITCH_META_DATAID)) {
SwitchService.load(cf.getContent());
}
// dump为文件
boolean result = ConfigCacheService
.dump(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(), cf.getLastModified(),
cf.getType());
final String content = cf.getContent();
final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
LogUtil.DUMP_LOG.info("[dump-all-ok] {}, {}, length={}, md5={}",
GroupKey2.getKey(cf.getDataId(), cf.getGroup()), cf.getLastModified(), content.length(),
md5);
}
DEFAULT_LOG.info("[all-dump] {} / {}", lastMaxId, currentMaxId);
} else {
lastMaxId += PAGE_SIZE;
}
}
return true;
}
dump()就是将数据库的配置,保存到本地,一个配置对应一个文件,这样客户端来查询配置,直接查的本地文件,而不是查数据库。
com.alibaba.nacos.config.server.service.ConfigCacheService#dump
public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
String type) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
CacheItem ci = makeSure(groupKey);
ci.setType(type);
final int lockResult = tryWriteLock(groupKey);
assert (lockResult != 0);
if (lockResult < 0) {
DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);
return false;
}
try {
final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
if (md5.equals(ConfigCacheService.getContentMd5(groupKey))) {
DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
+ "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),
lastModifiedTs);
} else if (!PropertyUtil.isDirectRead()) {
// 写入磁盘
DiskUtil.saveToDisk(dataId, group, tenant, content);
}
// 更新md5,发布LocalDataChangeEvent事件
updateMd5(groupKey, md5, lastModifiedTs);
return true;
} catch (IOException ioe) {
DUMP_LOG.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(), ioe);
if (ioe.getMessage() != null) {
String errMsg = ioe.getMessage();
if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN) || errMsg
.contains(DISK_QUATA_EN)) {
// Protect from disk full.
FATAL_LOG.error("磁盘满自杀退出", ioe);
System.exit(0);
}
}
return false;
} finally {
releaseWriteLock(groupKey);
}
}
服务启动过程中主要就是将数据库的配置全部保存到本地。
客户端来查询配置
客户端启动时会调用/v1/cs/configs
来查询配置。
com.alibaba.nacos.config.server.controller.ConfigController#getConfig
@GetMapping
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void getConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam("dataId") String dataId, @RequestParam("group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
@RequestParam(value = "tag", required = false) String tag)
throws IOException, ServletException, NacosException {
// 读取配置的入口
// check tenant
ParamUtils.checkTenant(tenant);
tenant = NamespaceUtil.processNamespaceParameter(tenant);
// check params
ParamUtils.checkParam(dataId, group, "datumId", "content");
ParamUtils.checkParam(tag);
final String clientIp = RequestUtil.getRemoteIp(request);
inner.doGetConfig(request, response, dataId, group, tenant, tag, clientIp);
}
doGetConfig()直接找到文件,使用jdk的零拷贝传输直接将文件输入流转response输出流中。
com.alibaba.nacos.config.server.controller.ConfigServletInner#doGetConfig
public String doGetConfig(HttpServletRequest request, HttpServletResponse response, String dataId, String group,
String tenant, String tag, String clientIp) throws IOException, ServletException {
final String groupKey = GroupKey2.getKey(dataId, group, tenant);
String autoTag = request.getHeader("Vipserver-Tag");
String requestIpApp = RequestUtil.getAppName(request);
int lockResult = tryConfigReadLock(groupKey);
final String requestIp = RequestUtil.getRemoteIp(request);
boolean isBeta = false;
if (lockResult > 0) {
// LockResult > 0 means cacheItem is not null and other thread can`t delete this cacheItem
FileInputStream fis = null;
try {
String md5 = Constants.NULL;
long lastModified = 0L;
CacheItem cacheItem = ConfigCacheService.getContentCache(groupKey);
if (cacheItem.isBeta() && cacheItem.getIps4Beta().contains(clientIp)) {
isBeta = true;
}
final String configType =
(null != cacheItem.getType()) ? cacheItem.getType() : FileTypeEnum.TEXT.getFileType();
response.setHeader("Config-Type", configType);
FileTypeEnum fileTypeEnum = FileTypeEnum.getFileTypeEnumByFileExtensionOrFileType(configType);
String contentTypeHeader = fileTypeEnum.getContentType();
response.setHeader(HttpHeaderConsts.CONTENT_TYPE, contentTypeHeader);
File file = null;
ConfigInfoBase configInfoBase = null;
PrintWriter out = null;
if (isBeta) {
md5 = cacheItem.getMd54Beta();
lastModified = cacheItem.getLastModifiedTs4Beta();
if (PropertyUtil.isDirectRead()) {
configInfoBase = persistService.findConfigInfo4Beta(dataId, group, tenant);
} else {
file = DiskUtil.targetBetaFile(dataId, group, tenant);
}
response.setHeader("isBeta", "true");
} else {
if (StringUtils.isBlank(tag)) {
if (isUseTag(cacheItem, autoTag)) {
... ...
} else {
md5 = cacheItem.getMd5();
lastModified = cacheItem.getLastModifiedTs();
if (PropertyUtil.isDirectRead()) {
// 单节点模式,直接读取数据库
configInfoBase = persistService.findConfigInfo(dataId, group, tenant);
} else {
// 集群模式,读取磁盘文件
file = DiskUtil.targetFile(dataId, group, tenant);
}
... ...
}
} else {
... ...
}
}
response.setHeader(Constants.CONTENT_MD5, md5);
// Disable cache.
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
if (PropertyUtil.isDirectRead()) {
response.setDateHeader("Last-Modified", lastModified);
} else {
fis = new FileInputStream(file);
response.setDateHeader("Last-Modified", file.lastModified());
}
if (PropertyUtil.isDirectRead()) {
out = response.getWriter();
out.print(configInfoBase.getContent());
out.flush();
out.close();
} else {
// 零拷贝
fis.getChannel()
.transferTo(0L, fis.getChannel().size(), Channels.newChannel(response.getOutputStream()));
}
LogUtil.PULL_CHECK_LOG.warn("{}|{}|{}|{}", groupKey, requestIp, md5, TimeUtils.getCurrentTimeStr());
final long delayed = System.currentTimeMillis() - lastModified;
// TODO distinguish pull-get && push-get
/*
Otherwise, delayed cannot be used as the basis of push delay directly,
because the delayed value of active get requests is very large.
*/
ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, lastModified,
ConfigTraceService.PULL_EVENT_OK, delayed, requestIp);
} finally {
releaseConfigReadLock(groupKey);
IoUtils.closeQuietly(fis);
}
} else if (lockResult == 0) {
... ...
} else {
... ...
}
return HttpServletResponse.SC_OK + "";
}
客户端长轮询监听配置
客户端启动成功后,会调用Http接口/v1/cs/configs/listener
长轮询来监听配置的变更。
com.alibaba.nacos.config.server.controller.ConfigController#listener
@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
// 监听配置更新的入口
request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
String probeModify = request.getParameter("Listening-Configs");
if (StringUtils.isBlank(probeModify)) {
throw new IllegalArgumentException("invalid probeModify");
}
probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
Map<String, String> clientMd5Map;
try {
clientMd5Map = MD5Util.getClientMd5Map(probeModify);
} catch (Throwable e) {
throw new IllegalArgumentException("invalid probeModify");
}
// 长轮询
// do long-polling
inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
doPollingConfig()会判断是否支持长轮询,依据是header是否包含Long-Pulling-Timeout
属性。
com.alibaba.nacos.config.server.controller.ConfigServletInner#doPollingConfig
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
// Long polling.
if (LongPollingService.isSupportLongPolling(request)) {
// 支持长轮询
longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
return HttpServletResponse.SC_OK + "";
}
... ...
}
addLongPollingClient()会将客户端保存起来,方便后面有配置变更时找到客户端并进行响应。
com.alibaba.nacos.config.server.service.LongPollingService#addLongPollingClient
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
int probeRequestSize) {
String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
String tag = req.getHeader("Vipserver-Tag");
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
// Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
if (isFixedPolling()) {
timeout = Math.max(10000, getFixedPollingInterval());
// Do nothing but set fix polling timeout.
} else {
long start = System.currentTimeMillis();
// 校验md5
List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
if (changedGroups.size() > 0) {
// 如果有变更立马返回
generateResponse(req, rsp, changedGroups);
LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",
RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
changedGroups.size());
return;
} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
// 如果是初始化请求,直接返回,不挂起
LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
changedGroups.size());
return;
}
}
String ip = RequestUtil.getRemoteIp(req);
// Must be called by http thread, or send response.
final AsyncContext asyncContext = req.startAsync();
// AsyncContext.setTimeout() is incorrect, Control by oneself
asyncContext.setTimeout(0L);
// 如果md5是一样的,异步执行ClientLongPolling
ConfigExecutor.executeLongPolling(
new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
ClientLongPolling()会启动一个延时30执行的任务,如果30s内配置没有变更,任务就会执行,对客户端进行响应,如果30s内配置发生了变更,此任务就会被取消。
com.alibaba.nacos.config.server.service.LongPollingService.ClientLongPolling#run
public void run() {
// 延时30s执行
asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
@Override
public void run() {
try {
getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
// Delete subsciber's relations.
allSubs.remove(ClientLongPolling.this);
if (isFixedPolling()) {
... ...
} else {
LogUtil.CLIENT_LOG
.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout",
RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
"polling", clientMd5Map.size(), probeRequestSize);
// 超时直接返回
sendResponse(null);
}
} catch (Throwable t) {
LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());
}
}
}, timeoutTime, TimeUnit.MILLISECONDS);
// 将客户端端缓存至队列中
allSubs.add(this);
}
sendResponse()对客户端进行响应,如果配置有变更,就会取消上面创建的任务。
com.alibaba.nacos.config.server.service.LongPollingService.ClientLongPolling#sendResponse
void sendResponse(List<String> changedGroups) {
// Cancel time out task.
if (null != asyncTimeoutFuture) {
// 取消任务
asyncTimeoutFuture.cancel(false);
}
generateResponse(changedGroups);
}
generateResponse()会将变更配置的dataId和group新信息返回给客户端,并不会返回具体的配置内容,内容会由客户端来查询。
com.alibaba.nacos.config.server.service.LongPollingService.ClientLongPolling#generateResponse
void generateResponse(List<String> changedGroups) {
if (null == changedGroups) {
// Tell web container to send http response.
asyncContext.complete();
return;
}
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
try {
// 封装更新的配置,返回客户端
final String respString = MD5Util.compareMd5ResultString(changedGroups);
// Disable cache.
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().println(respString);
asyncContext.complete();
} catch (Exception ex) {
PULL_LOG.error(ex.toString(), ex);
asyncContext.complete();
}
}
配置变更通知客户端
当在Nacos管理后台修改了配置后,会调用/v1/cs/configs
来更新配置。
publishConfig()会将配置保存到数据库中,并发布ConfigDataChangeEvent事件。
com.alibaba.nacos.config.server.controller.ConfigController#publishConfig
@PostMapping
@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
@RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
@RequestParam(value = "appName", required = false) String appName,
@RequestParam(value = "src_user", required = false) String srcUser,
@RequestParam(value = "config_tags", required = false) String configTags,
@RequestParam(value = "desc", required = false) String desc,
@RequestParam(value = "use", required = false) String use,
@RequestParam(value = "effect", required = false) String effect,
@RequestParam(value = "type", required = false) String type,
@RequestParam(value = "schema", required = false) String schema) throws NacosException {
// 修改配置入口
final String srcIp = RequestUtil.getRemoteIp(request);
final String requestIpApp = RequestUtil.getAppName(request);
srcUser = RequestUtil.getSrcUserName(request);
//check type
if (!ConfigType.isValidType(type)) {
type = ConfigType.getDefaultType().getType();
}
// check tenant
ParamUtils.checkTenant(tenant);
ParamUtils.checkParam(dataId, group, "datumId", content);
ParamUtils.checkParam(tag);
Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10);
MapUtils.putIfValNoNull(configAdvanceInfo, "config_tags", configTags);
MapUtils.putIfValNoNull(configAdvanceInfo, "desc", desc);
MapUtils.putIfValNoNull(configAdvanceInfo, "use", use);
MapUtils.putIfValNoNull(configAdvanceInfo, "effect", effect);
MapUtils.putIfValNoNull(configAdvanceInfo, "type", type);
MapUtils.putIfValNoNull(configAdvanceInfo, "schema", schema);
ParamUtils.checkParam(configAdvanceInfo);
if (AggrWhitelist.isAggrDataId(dataId)) {
LOGGER.warn("[aggr-conflict] {} attempt to publish single data, {}, {}", RequestUtil.getRemoteIp(request),
dataId, group);
throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");
}
final Timestamp time = TimeUtils.getCurrentTime();
String betaIps = request.getHeader("betaIps");
ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
configInfo.setType(type);
if (StringUtils.isBlank(betaIps)) {
if (StringUtils.isBlank(tag)) {
// 插入数据库
persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);
// 发布ConfigDataChangeEvent事件
/**
* AsyncNotifyService监听了ConfigDataChangeEvent事件
* @see AsyncNotifyService#AsyncNotifyService(com.alibaba.nacos.core.cluster.ServerMemberManager)
*/
ConfigChangePublisher
.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
} else {
persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);
ConfigChangePublisher.notifyConfigChange(
new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
}
} else {
// beta publish
persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, true);
ConfigChangePublisher
.notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
}
ConfigTraceService
.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIP(),
ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
return true;
}
AsyncNotifyService监听了ConfigDataChangeEvent事件,然后提交了AsyncTask任务来对Nacos集群中的节点进行通知配置的变化。
com.alibaba.nacos.config.server.service.notify.AsyncNotifyService#AsyncNotifyService
public AsyncNotifyService(ServerMemberManager memberManager) {
this.memberManager = memberManager;
// Register ConfigDataChangeEvent to NotifyCenter.
NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
// Register A Subscriber to subscribe ConfigDataChangeEvent.
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
// Generate ConfigDataChangeEvent concurrently
if (event instanceof ConfigDataChangeEvent) {
// 监听ConfigDataChangeEvent事件
ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
long dumpTs = evt.lastModifiedTs;
String dataId = evt.dataId;
String group = evt.group;
String tenant = evt.tenant;
String tag = evt.tag;
Collection<Member> ipList = memberManager.allMembers();
// In fact, any type of queue here can be
// 遍历集群中的所有节点,封装NotifySingleTask
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
for (Member member : ipList) {
queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
evt.isBeta));
}
// 提交AsyncTask任务,AsyncTask中包含了NotifySingleTask
/**
* @see AsyncTask#run()
*/
ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));
}
}
@Override
public Class<? extends Event> subscribeType() {
return ConfigDataChangeEvent.class;
}
});
}
AsyncTask.run()会调用Nacos集群中的所有节点(包含自己)的Http接口/v1/cs/communication/dataChange来通知配置的变化。
com.alibaba.nacos.config.server.service.notify.AsyncNotifyService.AsyncTask#run
@Override
public void run() {
executeAsyncInvoke();
}
private void executeAsyncInvoke() {
// 遍历所有的NotifySingleTask任务
while (!queue.isEmpty()) {
NotifySingleTask task = queue.poll();
String targetIp = task.getTargetIP();
if (memberManager.hasMember(targetIp)) {
// start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp);
if (unHealthNeedDelay) {
// target ip is unhealthy, then put it in the notification list
ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
0, task.target);
// get delay time and set fail count to the task
asyncTaskExecute(task);
} else {
Header header = Header.newInstance();
header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));
header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());
if (task.isBeta) {
header.addParam("isBeta", "true");
}
AuthHeaderUtil.addIdentityToHeader(header);
// 调用/v1/cs/communication/dataChange接口
/**
* @see CommunicationController#notifyConfigInfo(javax.servlet.http.HttpServletRequest, java.lang.String, java.lang.String, java.lang.String, java.lang.String)
*/
restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));
}
}
}
}
notifyConfigInfo()主要负责将变化的配置从数据库中查询出来,然后更新本地的文件。
com.alibaba.nacos.config.server.controller.CommunicationController#notifyConfigInfo
@GetMapping("/dataChange")
public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,
@RequestParam("group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
@RequestParam(value = "tag", required = false) String tag) {
// 通知配置数据变更的入口
dataId = dataId.trim();
group = group.trim();
String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
String isBetaStr = request.getHeader("isBeta");
if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
} else {
// 转存数据
dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
}
return true;
}
dump()操作又提交了一个DumpTask任务。
com.alibaba.nacos.config.server.service.dump.DumpService#dump(java.lang.String, java.lang.String, java.lang.String, java.lang.String, long, java.lang.String)
public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp) {
dump(dataId, group, tenant, tag, lastModified, handleIp, false);
}
public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
boolean isBeta) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);
// 添加DumpTask任务
/**
* @see DumpProcessor#process(com.alibaba.nacos.common.task.NacosTask)
*/
dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
}
process()会将变化的配置从数据库中查询出来,交于DumpConfigHandler.configDump()处理配置。
com.alibaba.nacos.config.server.service.dump.processor.DumpProcessor#process
public boolean process(NacosTask task) {
// 处理DumpTask
final PersistService persistService = dumpService.getPersistService();
DumpTask dumpTask = (DumpTask) task;
String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey());
String dataId = pair[0];
String group = pair[1];
String tenant = pair[2];
long lastModified = dumpTask.getLastModified();
String handleIp = dumpTask.getHandleIp();
boolean isBeta = dumpTask.isBeta();
String tag = dumpTask.getTag();
ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId)
.group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp);
if (isBeta) {
。。。。。。
} else {
if (StringUtils.isBlank(tag)) {
// 从数据库查询配置数据
ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant);
build.remove(Objects.isNull(cf));
build.content(Objects.isNull(cf) ? null : cf.getContent());
build.type(Objects.isNull(cf) ? null : cf.getType());
// 转存配置数据
return DumpConfigHandler.configDump(build.build());
} else {
。。。。。。
}
}
}
configDump()又调用了ConfigCacheService.dump(),这个方法在服务端启动时保存所有的配置文件时也使用了。
com.alibaba.nacos.config.server.service.dump.DumpConfigHandler#configDump
public static boolean configDump(ConfigDumpEvent event) {
final String dataId = event.getDataId();
final String group = event.getGroup();
final String namespaceId = event.getNamespaceId();
final String content = event.getContent();
final String type = event.getType();
final long lastModified = event.getLastModifiedTs();
if (event.isBeta()) {
。。。。。。
}
if (StringUtils.isBlank(event.getTag())) {
if (dataId.equals(AggrWhitelist.AGGRIDS_METADATA)) {
AggrWhitelist.load(content);
}
if (dataId.equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
ClientIpWhiteList.load(content);
}
if (dataId.equals(SwitchService.SWITCH_META_DATAID)) {
SwitchService.load(content);
}
boolean result;
if (!event.isRemove()) {
// dump数据
result = ConfigCacheService.dump(dataId, group, namespaceId, content, lastModified, type);
if (result) {
ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
content.length());
}
} else {
。。。。。。
}
return result;
} else {
。。。。。。
}
}
dump()会将新的配置写入磁盘文件,更新md5,然后发布LocalDataChangeEvent事件。
com.alibaba.nacos.config.server.service.ConfigCacheService#dump
public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
String type) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
CacheItem ci = makeSure(groupKey);
ci.setType(type);
final int lockResult = tryWriteLock(groupKey);
assert (lockResult != 0);
if (lockResult < 0) {
DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);
return false;
}
try {
final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
if (md5.equals(ConfigCacheService.getContentMd5(groupKey))) {
DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
+ "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),
lastModifiedTs);
} else if (!PropertyUtil.isDirectRead()) {
// 写入磁盘
DiskUtil.saveToDisk(dataId, group, tenant, content);
}
// 更新md5,发布LocalDataChangeEvent事件
updateMd5(groupKey, md5, lastModifiedTs);
return true;
} catch (IOException ioe) {
DUMP_LOG.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(), ioe);
if (ioe.getMessage() != null) {
String errMsg = ioe.getMessage();
if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN) || errMsg
.contains(DISK_QUATA_EN)) {
// Protect from disk full.
FATAL_LOG.error("磁盘满自杀退出", ioe);
System.exit(0);
}
}
return false;
} finally {
releaseWriteLock(groupKey);
}
}
updateMd5()会更新md5,然后发布LocalDataChangeEvent事件。
com.alibaba.nacos.config.server.service.ConfigCacheService#updateMd5
public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
CacheItem cache = makeSure(groupKey);
if (cache.md5 == null || !cache.md5.equals(md5)) {
cache.md5 = md5;
cache.lastModifiedTs = lastModifiedTs;
// 发布LocalDataChangeEvent事件
/**
* LongPollingService监听了LocalDataChangeEvent事件
* @see LongPollingService#LongPollingService()
*/
NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
}
}
LongPollingService会监听LocalDataChangeEvent事件,然后提交DataChangeTask。
com.alibaba.nacos.config.server.service.LongPollingService#LongPollingService
public LongPollingService() {
allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();
ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);
// Register LocalDataChangeEvent to NotifyCenter.
NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);
// Register A Subscriber to subscribe LocalDataChangeEvent.
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
if (isFixedPolling()) {
// Ignore.
} else {
// 监听LocalDataChangeEvent事件
if (event instanceof LocalDataChangeEvent) {
LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
// 提交DataChangeTask任务
/**
* @see DataChangeTask#run()
*/
ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
}
}
}
@Override
public Class<? extends Event> subscribeType() {
return LocalDataChangeEvent.class;
}
});
}
DataChangeTask会找到监听这个配置的客户端,然后进行通知。
com.alibaba.nacos.config.server.service.LongPollingService.DataChangeTask#run
public void run() {
try {
ConfigCacheService.getContentBetaMd5(groupKey);
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPolling clientSub = iter.next();
// 找到监听这个配置的客户端
if (clientSub.clientMd5Map.containsKey(groupKey)) {
// If published tag is not in the beta list, then it skipped.
if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) {
continue;
}
// If published tag is not in the tag list, then it skipped.
if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
continue;
}
getRetainIps().put(clientSub.ip, System.currentTimeMillis());
iter.remove(); // Delete subscribers' relationships.
LogUtil.CLIENT_LOG
.info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance",
RequestUtil
.getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),
"polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
// 通知客户端配置更新了
clientSub.sendResponse(Arrays.asList(groupKey));
}
}
} catch (Throwable t) {
LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));
}
}