【Nacos】Nacos配置中心服务端源码分析

news2025/1/15 16:54:34

在这里插入图片描述

上文说了Nacos配置中心客户端的源码流程,这篇介绍下Nacos配置中心服务端的源码。

服务端的启动

先来看服务启动时干了啥?

init()方法上面有@PostConstruct,该方法会在ExternalDumpService实例化后执行。
com.alibaba.nacos.config.server.service.dump.ExternalDumpService#init

@PostConstruct
@Override
protected void init() throws Throwable {
	dumpOperate(processor, dumpAllProcessor, dumpAllBetaProcessor, dumpAllTagProcessor);
}

dumpOperate()主要干了两件事:

  1. dumpConfigInfo(),这个方法里面也是调用的DumpAllTask
  2. 提交DumpAllTask的定时任务
    com.alibaba.nacos.config.server.service.dump.DumpService#dumpOperate
protected void dumpOperate(DumpProcessor processor, DumpAllProcessor dumpAllProcessor,
						   DumpAllBetaProcessor dumpAllBetaProcessor, DumpAllTagProcessor dumpAllTagProcessor) throws NacosException {
	String dumpFileContext = "CONFIG_DUMP_TO_FILE";
	TimerContext.start(dumpFileContext);
	try {
		LogUtil.DEFAULT_LOG.warn("DumpService start");

		Runnable dumpAll = () -> dumpAllTaskMgr.addTask(DumpAllTask.TASK_ID, new DumpAllTask());
		... ...
		try {
			// 转存配置
			// 执行一次DumpAllTask
			dumpConfigInfo(dumpAllProcessor);
			... ...
		} catch (Exception e) {
			LogUtil.FATAL_LOG
				.error("Nacos Server did not start because dumpservice bean construction failure :\n" + e
					   .toString());
			throw new NacosException(NacosException.SERVER_ERROR,
									 "Nacos Server did not start because dumpservice bean construction failure :\n" + e.getMessage(),
									 e);
		}
		if (!EnvUtil.getStandaloneMode()) {
			Runnable heartbeat = () -> {
				String heartBeatTime = TimeUtils.getCurrentTime().toString();
				// write disk
				try {
					DiskUtil.saveHeartBeatToDisk(heartBeatTime);
				} catch (IOException e) {
					LogUtil.FATAL_LOG.error("save heartbeat fail" + e.getMessage());
				}
			};

			ConfigExecutor.scheduleConfigTask(heartbeat, 0, 10, TimeUnit.SECONDS);

			long initialDelay = new Random().nextInt(INITIAL_DELAY_IN_MINUTE) + 10;
			LogUtil.DEFAULT_LOG.warn("initialDelay:{}", initialDelay);

			// 6个小时执行一次DumpAllTask
			ConfigExecutor.scheduleConfigTask(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);

			ConfigExecutor
				.scheduleConfigTask(dumpAllBeta, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);

			ConfigExecutor
				.scheduleConfigTask(dumpAllTag, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
		}

		ConfigExecutor.scheduleConfigTask(clearConfigHistory, 10, 10, TimeUnit.MINUTES);
	} finally {
		TimerContext.end(dumpFileContext, LogUtil.DUMP_LOG);
	}

}

dumpConfigInfo()里面还是执行了DumpAllTask。
com.alibaba.nacos.config.server.service.dump.DumpService#dumpConfigInfo

private void dumpConfigInfo(DumpAllProcessor dumpAllProcessor) throws IOException {
	int timeStep = 6;
	Boolean isAllDump = true;
	// initial dump all
	FileInputStream fis = null;
	Timestamp heartheatLastStamp = null;
	try {
		... ...
		if (isAllDump) {
			LogUtil.DEFAULT_LOG.info("start clear all config-info.");
			DiskUtil.clearAll();
			// 执行DumpAllTask
			dumpAllProcessor.process(new DumpAllTask());
		} else {
			... ...
		}
	} catch (IOException e) {
		LogUtil.FATAL_LOG.error("dump config fail" + e.getMessage());
		throw e;
	} finally {
		if (null != fis) {
			try {
				fis.close();
			} catch (IOException e) {
				LogUtil.DEFAULT_LOG.warn("close file failed");
			}
		}
	}
}

process()会分页查询出数据库的所有配置,然后一个一个调用ConfigCacheService.dump()。
com.alibaba.nacos.config.server.service.dump.processor.DumpAllProcessor#process

public boolean process(NacosTask task) {
	long currentMaxId = persistService.findConfigMaxId();
	long lastMaxId = 0;
	while (lastMaxId < currentMaxId) {
		// 分页查询出数据库的所有配置
		Page<ConfigInfoWrapper> page = persistService.findAllConfigInfoFragment(lastMaxId, PAGE_SIZE);
		if (page != null && page.getPageItems() != null && !page.getPageItems().isEmpty()) {
			for (ConfigInfoWrapper cf : page.getPageItems()) {
				long id = cf.getId();
				lastMaxId = id > lastMaxId ? id : lastMaxId;
				if (cf.getDataId().equals(AggrWhitelist.AGGRIDS_METADATA)) {
					AggrWhitelist.load(cf.getContent());
				}

				if (cf.getDataId().equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
					ClientIpWhiteList.load(cf.getContent());
				}

				if (cf.getDataId().equals(SwitchService.SWITCH_META_DATAID)) {
					SwitchService.load(cf.getContent());
				}

				// dump为文件
				boolean result = ConfigCacheService
					.dump(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(), cf.getLastModified(),
						  cf.getType());

				final String content = cf.getContent();
				final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
				LogUtil.DUMP_LOG.info("[dump-all-ok] {}, {}, length={}, md5={}",
									  GroupKey2.getKey(cf.getDataId(), cf.getGroup()), cf.getLastModified(), content.length(),
									  md5);
			}
			DEFAULT_LOG.info("[all-dump] {} / {}", lastMaxId, currentMaxId);
		} else {
			lastMaxId += PAGE_SIZE;
		}
	}
	return true;
}

dump()就是将数据库的配置,保存到本地,一个配置对应一个文件,这样客户端来查询配置,直接查的本地文件,而不是查数据库。
com.alibaba.nacos.config.server.service.ConfigCacheService#dump

public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
						   String type) {
	String groupKey = GroupKey2.getKey(dataId, group, tenant);
	CacheItem ci = makeSure(groupKey);
	ci.setType(type);
	final int lockResult = tryWriteLock(groupKey);
	assert (lockResult != 0);

	if (lockResult < 0) {
		DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);
		return false;
	}

	try {
		final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);

		if (md5.equals(ConfigCacheService.getContentMd5(groupKey))) {
			DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
						  + "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),
						  lastModifiedTs);
		} else if (!PropertyUtil.isDirectRead()) {
			// 写入磁盘
			DiskUtil.saveToDisk(dataId, group, tenant, content);
		}
		// 更新md5,发布LocalDataChangeEvent事件
		updateMd5(groupKey, md5, lastModifiedTs);
		return true;
	} catch (IOException ioe) {
		DUMP_LOG.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(), ioe);
		if (ioe.getMessage() != null) {
			String errMsg = ioe.getMessage();
			if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN) || errMsg
				.contains(DISK_QUATA_EN)) {
				// Protect from disk full.
				FATAL_LOG.error("磁盘满自杀退出", ioe);
				System.exit(0);
			}
		}
		return false;
	} finally {
		releaseWriteLock(groupKey);
	}
}

服务启动过程中主要就是将数据库的配置全部保存到本地。

客户端来查询配置

客户端启动时会调用/v1/cs/configs来查询配置。

com.alibaba.nacos.config.server.controller.ConfigController#getConfig

@GetMapping
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void getConfig(HttpServletRequest request, HttpServletResponse response,
					  @RequestParam("dataId") String dataId, @RequestParam("group") String group,
					  @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
					  @RequestParam(value = "tag", required = false) String tag)
	throws IOException, ServletException, NacosException {
	// 读取配置的入口
	// check tenant
	ParamUtils.checkTenant(tenant);
	tenant = NamespaceUtil.processNamespaceParameter(tenant);
	// check params
	ParamUtils.checkParam(dataId, group, "datumId", "content");
	ParamUtils.checkParam(tag);

	final String clientIp = RequestUtil.getRemoteIp(request);
	inner.doGetConfig(request, response, dataId, group, tenant, tag, clientIp);
}

doGetConfig()直接找到文件,使用jdk的零拷贝传输直接将文件输入流转response输出流中。
com.alibaba.nacos.config.server.controller.ConfigServletInner#doGetConfig

public String doGetConfig(HttpServletRequest request, HttpServletResponse response, String dataId, String group,
						  String tenant, String tag, String clientIp) throws IOException, ServletException {
	final String groupKey = GroupKey2.getKey(dataId, group, tenant);
	String autoTag = request.getHeader("Vipserver-Tag");
	String requestIpApp = RequestUtil.getAppName(request);
	int lockResult = tryConfigReadLock(groupKey);

	final String requestIp = RequestUtil.getRemoteIp(request);
	boolean isBeta = false;
	if (lockResult > 0) {
		// LockResult > 0 means cacheItem is not null and other thread can`t delete this cacheItem
		FileInputStream fis = null;
		try {
			String md5 = Constants.NULL;
			long lastModified = 0L;
			CacheItem cacheItem = ConfigCacheService.getContentCache(groupKey);
			if (cacheItem.isBeta() && cacheItem.getIps4Beta().contains(clientIp)) {
				isBeta = true;
			}

			final String configType =
				(null != cacheItem.getType()) ? cacheItem.getType() : FileTypeEnum.TEXT.getFileType();
			response.setHeader("Config-Type", configType);
			FileTypeEnum fileTypeEnum = FileTypeEnum.getFileTypeEnumByFileExtensionOrFileType(configType);
			String contentTypeHeader = fileTypeEnum.getContentType();
			response.setHeader(HttpHeaderConsts.CONTENT_TYPE, contentTypeHeader);

			File file = null;
			ConfigInfoBase configInfoBase = null;
			PrintWriter out = null;
			if (isBeta) {
				md5 = cacheItem.getMd54Beta();
				lastModified = cacheItem.getLastModifiedTs4Beta();
				if (PropertyUtil.isDirectRead()) {
					configInfoBase = persistService.findConfigInfo4Beta(dataId, group, tenant);
				} else {
					file = DiskUtil.targetBetaFile(dataId, group, tenant);
				}
				response.setHeader("isBeta", "true");
			} else {
				if (StringUtils.isBlank(tag)) {
					if (isUseTag(cacheItem, autoTag)) {
					... ...
					} else {
						md5 = cacheItem.getMd5();
						lastModified = cacheItem.getLastModifiedTs();
						if (PropertyUtil.isDirectRead()) {
							// 单节点模式,直接读取数据库
							configInfoBase = persistService.findConfigInfo(dataId, group, tenant);
						} else {
							// 集群模式,读取磁盘文件
							file = DiskUtil.targetFile(dataId, group, tenant);
						}
						... ...
					}
				} else {
					... ...
				}
			}

			response.setHeader(Constants.CONTENT_MD5, md5);

			// Disable cache.
			response.setHeader("Pragma", "no-cache");
			response.setDateHeader("Expires", 0);
			response.setHeader("Cache-Control", "no-cache,no-store");
			if (PropertyUtil.isDirectRead()) {
				response.setDateHeader("Last-Modified", lastModified);
			} else {
				fis = new FileInputStream(file);
				response.setDateHeader("Last-Modified", file.lastModified());
			}

			if (PropertyUtil.isDirectRead()) {
				out = response.getWriter();
				out.print(configInfoBase.getContent());
				out.flush();
				out.close();
			} else {
				// 零拷贝
				fis.getChannel()
					.transferTo(0L, fis.getChannel().size(), Channels.newChannel(response.getOutputStream()));
			}

			LogUtil.PULL_CHECK_LOG.warn("{}|{}|{}|{}", groupKey, requestIp, md5, TimeUtils.getCurrentTimeStr());

			final long delayed = System.currentTimeMillis() - lastModified;

			// TODO distinguish pull-get && push-get
			/*
                Otherwise, delayed cannot be used as the basis of push delay directly,
                because the delayed value of active get requests is very large.
                */
			ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, lastModified,
											ConfigTraceService.PULL_EVENT_OK, delayed, requestIp);

		} finally {
			releaseConfigReadLock(groupKey);
			IoUtils.closeQuietly(fis);
		}
	} else if (lockResult == 0) {
... ...
	} else {
... ...
	}

	return HttpServletResponse.SC_OK + "";
}

客户端长轮询监听配置

客户端启动成功后,会调用Http接口/v1/cs/configs/listener长轮询来监听配置的变更。

com.alibaba.nacos.config.server.controller.ConfigController#listener

@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
	throws ServletException, IOException {
	// 监听配置更新的入口
	request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
	String probeModify = request.getParameter("Listening-Configs");
	if (StringUtils.isBlank(probeModify)) {
		throw new IllegalArgumentException("invalid probeModify");
	}

	probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);

	Map<String, String> clientMd5Map;
	try {
		clientMd5Map = MD5Util.getClientMd5Map(probeModify);
	} catch (Throwable e) {
		throw new IllegalArgumentException("invalid probeModify");
	}

	// 长轮询
	// do long-polling
	inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}

doPollingConfig()会判断是否支持长轮询,依据是header是否包含Long-Pulling-Timeout属性。
com.alibaba.nacos.config.server.controller.ConfigServletInner#doPollingConfig

public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
							  Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {

	// Long polling.
	if (LongPollingService.isSupportLongPolling(request)) {
		// 支持长轮询
		longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
		return HttpServletResponse.SC_OK + "";
	}
... ...
}

addLongPollingClient()会将客户端保存起来,方便后面有配置变更时找到客户端并进行响应。
com.alibaba.nacos.config.server.service.LongPollingService#addLongPollingClient

public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
								 int probeRequestSize) {

	String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
	String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
	String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
	String tag = req.getHeader("Vipserver-Tag");
	int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);

	// Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
	long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
	if (isFixedPolling()) {
		timeout = Math.max(10000, getFixedPollingInterval());
		// Do nothing but set fix polling timeout.
	} else {
		long start = System.currentTimeMillis();
		// 校验md5
		List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
		if (changedGroups.size() > 0) {
			// 如果有变更立马返回
			generateResponse(req, rsp, changedGroups);
			LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",
									RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
									changedGroups.size());
			return;
		} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
			// 如果是初始化请求,直接返回,不挂起
			LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
									RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
									changedGroups.size());
			return;
		}
	}
	String ip = RequestUtil.getRemoteIp(req);

	// Must be called by http thread, or send response.
	final AsyncContext asyncContext = req.startAsync();

	// AsyncContext.setTimeout() is incorrect, Control by oneself
	asyncContext.setTimeout(0L);

	// 如果md5是一样的,异步执行ClientLongPolling
	ConfigExecutor.executeLongPolling(
		new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}

ClientLongPolling()会启动一个延时30执行的任务,如果30s内配置没有变更,任务就会执行,对客户端进行响应,如果30s内配置发生了变更,此任务就会被取消。
com.alibaba.nacos.config.server.service.LongPollingService.ClientLongPolling#run

public void run() {
	// 延时30s执行
	asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
		@Override
		public void run() {
			try {
				getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());

				// Delete subsciber's relations.
				allSubs.remove(ClientLongPolling.this);

				if (isFixedPolling()) {
					... ...
				} else {
					LogUtil.CLIENT_LOG
						.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout",
							  RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
							  "polling", clientMd5Map.size(), probeRequestSize);
					// 超时直接返回
					sendResponse(null);
				}
			} catch (Throwable t) {
				LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());
			}

		}

	}, timeoutTime, TimeUnit.MILLISECONDS);

	// 将客户端端缓存至队列中
	allSubs.add(this);
}

sendResponse()对客户端进行响应,如果配置有变更,就会取消上面创建的任务。
com.alibaba.nacos.config.server.service.LongPollingService.ClientLongPolling#sendResponse

void sendResponse(List<String> changedGroups) {

	// Cancel time out task.
	if (null != asyncTimeoutFuture) {
		// 取消任务
		asyncTimeoutFuture.cancel(false);
	}
	generateResponse(changedGroups);
}

generateResponse()会将变更配置的dataId和group新信息返回给客户端,并不会返回具体的配置内容,内容会由客户端来查询。
com.alibaba.nacos.config.server.service.LongPollingService.ClientLongPolling#generateResponse

void generateResponse(List<String> changedGroups) {
	if (null == changedGroups) {

		// Tell web container to send http response.
		asyncContext.complete();
		return;
	}

	HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();

	try {
		// 封装更新的配置,返回客户端
		final String respString = MD5Util.compareMd5ResultString(changedGroups);

		// Disable cache.
		response.setHeader("Pragma", "no-cache");
		response.setDateHeader("Expires", 0);
		response.setHeader("Cache-Control", "no-cache,no-store");
		response.setStatus(HttpServletResponse.SC_OK);
		response.getWriter().println(respString);
		asyncContext.complete();
	} catch (Exception ex) {
		PULL_LOG.error(ex.toString(), ex);
		asyncContext.complete();
	}
}

配置变更通知客户端

当在Nacos管理后台修改了配置后,会调用/v1/cs/configs来更新配置。

publishConfig()会将配置保存到数据库中,并发布ConfigDataChangeEvent事件。
com.alibaba.nacos.config.server.controller.ConfigController#publishConfig

@PostMapping
@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
							 @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
							 @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
							 @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
							 @RequestParam(value = "appName", required = false) String appName,
							 @RequestParam(value = "src_user", required = false) String srcUser,
							 @RequestParam(value = "config_tags", required = false) String configTags,
							 @RequestParam(value = "desc", required = false) String desc,
							 @RequestParam(value = "use", required = false) String use,
							 @RequestParam(value = "effect", required = false) String effect,
							 @RequestParam(value = "type", required = false) String type,
							 @RequestParam(value = "schema", required = false) String schema) throws NacosException {

	// 修改配置入口
	final String srcIp = RequestUtil.getRemoteIp(request);
	final String requestIpApp = RequestUtil.getAppName(request);
	srcUser = RequestUtil.getSrcUserName(request);
	//check type
	if (!ConfigType.isValidType(type)) {
		type = ConfigType.getDefaultType().getType();
	}
	// check tenant
	ParamUtils.checkTenant(tenant);
	ParamUtils.checkParam(dataId, group, "datumId", content);
	ParamUtils.checkParam(tag);
	Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10);
	MapUtils.putIfValNoNull(configAdvanceInfo, "config_tags", configTags);
	MapUtils.putIfValNoNull(configAdvanceInfo, "desc", desc);
	MapUtils.putIfValNoNull(configAdvanceInfo, "use", use);
	MapUtils.putIfValNoNull(configAdvanceInfo, "effect", effect);
	MapUtils.putIfValNoNull(configAdvanceInfo, "type", type);
	MapUtils.putIfValNoNull(configAdvanceInfo, "schema", schema);
	ParamUtils.checkParam(configAdvanceInfo);

	if (AggrWhitelist.isAggrDataId(dataId)) {
		LOGGER.warn("[aggr-conflict] {} attempt to publish single data, {}, {}", RequestUtil.getRemoteIp(request),
					dataId, group);
		throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");
	}

	final Timestamp time = TimeUtils.getCurrentTime();
	String betaIps = request.getHeader("betaIps");
	ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
	configInfo.setType(type);
	if (StringUtils.isBlank(betaIps)) {
		if (StringUtils.isBlank(tag)) {
			// 插入数据库
			persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);
			// 发布ConfigDataChangeEvent事件
			/**
                 * AsyncNotifyService监听了ConfigDataChangeEvent事件
                 * @see AsyncNotifyService#AsyncNotifyService(com.alibaba.nacos.core.cluster.ServerMemberManager)
                 */
			ConfigChangePublisher
				.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
		} else {
			persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);
			ConfigChangePublisher.notifyConfigChange(
				new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
		}
	} else {
		// beta publish
		persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, true);
		ConfigChangePublisher
			.notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
	}
	ConfigTraceService
		.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIP(),
							 ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
	return true;
}

AsyncNotifyService监听了ConfigDataChangeEvent事件,然后提交了AsyncTask任务来对Nacos集群中的节点进行通知配置的变化。
com.alibaba.nacos.config.server.service.notify.AsyncNotifyService#AsyncNotifyService

public AsyncNotifyService(ServerMemberManager memberManager) {
	this.memberManager = memberManager;

	// Register ConfigDataChangeEvent to NotifyCenter.
	NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);

	// Register A Subscriber to subscribe ConfigDataChangeEvent.
	NotifyCenter.registerSubscriber(new Subscriber() {

		@Override
		public void onEvent(Event event) {
			// Generate ConfigDataChangeEvent concurrently
			if (event instanceof ConfigDataChangeEvent) {
				// 监听ConfigDataChangeEvent事件
				ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
				long dumpTs = evt.lastModifiedTs;
				String dataId = evt.dataId;
				String group = evt.group;
				String tenant = evt.tenant;
				String tag = evt.tag;
				Collection<Member> ipList = memberManager.allMembers();

				// In fact, any type of queue here can be
				// 遍历集群中的所有节点,封装NotifySingleTask
				Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
				for (Member member : ipList) {
					queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
												   evt.isBeta));
				}
				// 提交AsyncTask任务,AsyncTask中包含了NotifySingleTask
				/**
                     * @see AsyncTask#run()
                     */
				ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));
			}
		}

		@Override
		public Class<? extends Event> subscribeType() {
			return ConfigDataChangeEvent.class;
		}
	});
}

AsyncTask.run()会调用Nacos集群中的所有节点(包含自己)的Http接口/v1/cs/communication/dataChange来通知配置的变化。
com.alibaba.nacos.config.server.service.notify.AsyncNotifyService.AsyncTask#run

@Override
public void run() {
	executeAsyncInvoke();
}

private void executeAsyncInvoke() {
	// 遍历所有的NotifySingleTask任务
	while (!queue.isEmpty()) {
		NotifySingleTask task = queue.poll();
		String targetIp = task.getTargetIP();
		if (memberManager.hasMember(targetIp)) {
			// start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
			boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp);
			if (unHealthNeedDelay) {
				// target ip is unhealthy, then put it in the notification list
				ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
												  task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
												  0, task.target);
				// get delay time and set fail count to the task
				asyncTaskExecute(task);
			} else {
				Header header = Header.newInstance();
				header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));
				header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());
				if (task.isBeta) {
					header.addParam("isBeta", "true");
				}
				AuthHeaderUtil.addIdentityToHeader(header);
				// 调用/v1/cs/communication/dataChange接口
				/**
                         * @see CommunicationController#notifyConfigInfo(javax.servlet.http.HttpServletRequest, java.lang.String, java.lang.String, java.lang.String, java.lang.String)
                         */
				restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));
			}
		}
	}
}

notifyConfigInfo()主要负责将变化的配置从数据库中查询出来,然后更新本地的文件。
com.alibaba.nacos.config.server.controller.CommunicationController#notifyConfigInfo

@GetMapping("/dataChange")
public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,
								@RequestParam("group") String group,
								@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
								@RequestParam(value = "tag", required = false) String tag) {
	// 通知配置数据变更的入口
	dataId = dataId.trim();
	group = group.trim();
	String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
	long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
	String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
	String isBetaStr = request.getHeader("isBeta");
	if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
		dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
	} else {
		// 转存数据
		dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
	}
	return true;
}

dump()操作又提交了一个DumpTask任务。
com.alibaba.nacos.config.server.service.dump.DumpService#dump(java.lang.String, java.lang.String, java.lang.String, java.lang.String, long, java.lang.String)

public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp) {
	dump(dataId, group, tenant, tag, lastModified, handleIp, false);
}

public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
				 boolean isBeta) {
	String groupKey = GroupKey2.getKey(dataId, group, tenant);
	String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);
	// 添加DumpTask任务
	/**
         * @see DumpProcessor#process(com.alibaba.nacos.common.task.NacosTask)
         */
	dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
	DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
}

process()会将变化的配置从数据库中查询出来,交于DumpConfigHandler.configDump()处理配置。
com.alibaba.nacos.config.server.service.dump.processor.DumpProcessor#process

public boolean process(NacosTask task) {
	// 处理DumpTask
	final PersistService persistService = dumpService.getPersistService();
	DumpTask dumpTask = (DumpTask) task;
	String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey());
	String dataId = pair[0];
	String group = pair[1];
	String tenant = pair[2];
	long lastModified = dumpTask.getLastModified();
	String handleIp = dumpTask.getHandleIp();
	boolean isBeta = dumpTask.isBeta();
	String tag = dumpTask.getTag();

	ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId)
		.group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp);

	if (isBeta) {
。。。。。。
	} else {
		if (StringUtils.isBlank(tag)) {
			// 从数据库查询配置数据
			ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant);

			build.remove(Objects.isNull(cf));
			build.content(Objects.isNull(cf) ? null : cf.getContent());
			build.type(Objects.isNull(cf) ? null : cf.getType());

			// 转存配置数据
			return DumpConfigHandler.configDump(build.build());
		} else {
。。。。。。
		}
	}
}

configDump()又调用了ConfigCacheService.dump(),这个方法在服务端启动时保存所有的配置文件时也使用了。
com.alibaba.nacos.config.server.service.dump.DumpConfigHandler#configDump

public static boolean configDump(ConfigDumpEvent event) {
	final String dataId = event.getDataId();
	final String group = event.getGroup();
	final String namespaceId = event.getNamespaceId();
	final String content = event.getContent();
	final String type = event.getType();
	final long lastModified = event.getLastModifiedTs();
	if (event.isBeta()) {
		。。。。。。
	}
	if (StringUtils.isBlank(event.getTag())) {
		if (dataId.equals(AggrWhitelist.AGGRIDS_METADATA)) {
			AggrWhitelist.load(content);
		}

		if (dataId.equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
			ClientIpWhiteList.load(content);
		}

		if (dataId.equals(SwitchService.SWITCH_META_DATAID)) {
			SwitchService.load(content);
		}

		boolean result;
		if (!event.isRemove()) {
			// dump数据
			result = ConfigCacheService.dump(dataId, group, namespaceId, content, lastModified, type);

			if (result) {
				ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
												ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
												content.length());
			}
		} else {
			。。。。。。
		}
		return result;
	} else {
		。。。。。。
	}

}

dump()会将新的配置写入磁盘文件,更新md5,然后发布LocalDataChangeEvent事件。
com.alibaba.nacos.config.server.service.ConfigCacheService#dump

public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
						   String type) {
	String groupKey = GroupKey2.getKey(dataId, group, tenant);
	CacheItem ci = makeSure(groupKey);
	ci.setType(type);
	final int lockResult = tryWriteLock(groupKey);
	assert (lockResult != 0);

	if (lockResult < 0) {
		DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);
		return false;
	}

	try {
		final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);

		if (md5.equals(ConfigCacheService.getContentMd5(groupKey))) {
			DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
						  + "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),
						  lastModifiedTs);
		} else if (!PropertyUtil.isDirectRead()) {
			// 写入磁盘
			DiskUtil.saveToDisk(dataId, group, tenant, content);
		}
		// 更新md5,发布LocalDataChangeEvent事件
		updateMd5(groupKey, md5, lastModifiedTs);
		return true;
	} catch (IOException ioe) {
		DUMP_LOG.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(), ioe);
		if (ioe.getMessage() != null) {
			String errMsg = ioe.getMessage();
			if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN) || errMsg
				.contains(DISK_QUATA_EN)) {
				// Protect from disk full.
				FATAL_LOG.error("磁盘满自杀退出", ioe);
				System.exit(0);
			}
		}
		return false;
	} finally {
		releaseWriteLock(groupKey);
	}
}

updateMd5()会更新md5,然后发布LocalDataChangeEvent事件。
com.alibaba.nacos.config.server.service.ConfigCacheService#updateMd5

public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
	CacheItem cache = makeSure(groupKey);
	if (cache.md5 == null || !cache.md5.equals(md5)) {
		cache.md5 = md5;
		cache.lastModifiedTs = lastModifiedTs;
		// 发布LocalDataChangeEvent事件
		/**
             * LongPollingService监听了LocalDataChangeEvent事件
             * @see LongPollingService#LongPollingService()
             */
		NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
	}
}

LongPollingService会监听LocalDataChangeEvent事件,然后提交DataChangeTask。
com.alibaba.nacos.config.server.service.LongPollingService#LongPollingService

public LongPollingService() {
	allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();

	ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);

	// Register LocalDataChangeEvent to NotifyCenter.
	NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);

	// Register A Subscriber to subscribe LocalDataChangeEvent.
	NotifyCenter.registerSubscriber(new Subscriber() {

		@Override
		public void onEvent(Event event) {
			if (isFixedPolling()) {
				// Ignore.
			} else {
				// 监听LocalDataChangeEvent事件
				if (event instanceof LocalDataChangeEvent) {
					LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
					// 提交DataChangeTask任务
					/**
                         * @see DataChangeTask#run()
                         */
					ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
				}
			}
		}

		@Override
		public Class<? extends Event> subscribeType() {
			return LocalDataChangeEvent.class;
		}
	});

}

DataChangeTask会找到监听这个配置的客户端,然后进行通知。
com.alibaba.nacos.config.server.service.LongPollingService.DataChangeTask#run

public void run() {
	try {
		ConfigCacheService.getContentBetaMd5(groupKey);
		for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
			ClientLongPolling clientSub = iter.next();
			// 找到监听这个配置的客户端
			if (clientSub.clientMd5Map.containsKey(groupKey)) {
				// If published tag is not in the beta list, then it skipped.
				if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) {
					continue;
				}

				// If published tag is not in the tag list, then it skipped.
				if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
					continue;
				}

				getRetainIps().put(clientSub.ip, System.currentTimeMillis());
				iter.remove(); // Delete subscribers' relationships.
				LogUtil.CLIENT_LOG
					.info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance",
						  RequestUtil
						  .getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),
						  "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
				// 通知客户端配置更新了
				clientSub.sendResponse(Arrays.asList(groupKey));
			}
		}
	} catch (Throwable t) {
		LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));
	}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/335752.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

第三方电容笔怎么样?开学适合买的电容笔

随着科学技术的进步&#xff0c;很多新型的电子产品和数码设备都出现了。比如手机&#xff0c;IPAD&#xff0c;蓝牙耳机&#xff0c;电容笔等等。实际上&#xff0c;如果你想要更好的使用ipad&#xff0c;那么你就需要一支电容笔。比如ipad&#xff0c;我们用ipad来做笔记&…

面向战场的cesium基础到进阶的案例展示(我相信VIP总是有原因的)

cesium 前置说明(友情提示,关注重点代码,其他影响复现的都可以删除或者替换数值解决) 这里面用到了cesium的模型加载、图片加载、着色器、实时改变模型状态、模型删除等知识点,这需要你自己去观摩下述会包含所有相关代码,他们的联系其实在代码中能看到(比如飞机操作类会…

告别传统繁杂的采购合同管理 打造企业自动化采购管理模式

随着企业竞争日趋激烈&#xff0c;采购成本压力剧增&#xff0c;企业对于采购合同管理更加严格&#xff0c;从而把控物资成本。对于任何一家企业采购来说&#xff0c;规范化合同的全面管理&#xff0c;是采购活动中重要的一个环节。 但在如今&#xff0c;依旧有很多企业采购合…

Windows截取gif动态图的软件 ScreenToGif 的安装、使用教程

一、概述 &#x1f449;GIF&#xff08;Graphics Interchange Format&#xff09;&#xff0c;又称图形交换格式&#xff0c;是一种公用的图像文件格式标准&#xff0c;于1987年由Compu Serve公司成功研发并推出。 &#x1f449;GIF用于以超文本标志语言方式显示索引彩色图像&a…

【FFMPEG源码分析】从ffplay源码摸清ffmpeg框架(二)

demux模块 从前面一篇文章中可以得知&#xff0c;demux模块的使用方法大致如下: 分配AVFormatContext通过avformat_open_input(…)传入AVFormatContext指针和文件路径&#xff0c;启动demux通过av_read_frame(…) 从AVFormatContext中读取demux后的audio/video/subtitle数据包…

LKWA靶场通关和源码分析

文章目录一、Blind RCE&#xff1f;二、XSSI三、PHP Object Injection四、PHP Object Injection(cookie)五、PHP Object Injection(Referer)六、PHAR七、SSRF八、Variables总结一、Blind RCE&#xff1f; 源码&#xff1a; <?php include("sidebar.php"); /***…

【程序化天空盒】过程记录01:日月 天空渐变 大气散射

1 日月 SunAndMoon 昼夜的话肯定少不了太阳和月亮&#xff0c;太阳和月亮实现的道理是一样的&#xff0c;只不过是月亮比太阳多了一个需要控制月牙程度&#xff08;or添加贴图&#xff09;的细节~ 1.1 Sun 太阳的话很简单&#xff0c;直接在shader里实现一个太阳跟随平行光旋…

Ubuntu18.04中安装Pycharm2023

下载安装包访问 Jetbrains官方网站 下载 Linux的安装包点击 Download 后下载文件名为 pycharm-community-2022.3.2.tar.gz解压安装启动终端&#xff0c;cd Downloads 进入Downloads目录&#xff08;默认下载路径&#xff09;解压压缩包 tar -xzvf pycharm-community-2020.2.2.t…

【Nacos】Nacos配置中心客户端启动源码分析

SpringCloud项目启动过程中会解析bootstrop.properties、bootstrap.yaml配置文件&#xff0c;启动父容器&#xff0c;在子容器启动过程中会加入PropertySourceBootstrapConfiguration来读取配置中心的配置。 PropertySourceBootstrapConfiguration#initialize PropertySource…

实现复选框全选和全不选的切换

今天&#xff0c;复看了一下JS的菜鸟教程&#xff0c;发现评论里面都是精华呀&#xff01;&#xff01; 看到函数这一节&#xff0c;发现就复选框的全选和全不选功能展开了讨论。我感觉挺有意思的&#xff0c;尝试实现了一下。 1. 全选、全不选&#xff0c;两个按钮&#xff…

CentOS8联网部署Ceph-Quincy集群

文章目录1.环境准备1.1 关闭selinux1.2 关闭防火墙1.3 配置免密1.4 设置yum源1.5 安装依赖1.6 设置时间同步1.7 安装docker2.安装Ceph2.1 安装cephadm2.2 部署ceph集群2.3 集群添加节点2.4 部署MON2.5 部署OSD2.6 部署MGR2.7 集群状态3.问题3.1 failed to retrieve runc versio…

腾讯云对象存储+企业网盘 打通数据链“最后一公里

对云厂商和企业用户来说&#xff0c;随着数据规模的快速增长&#xff0c;企业除了对存储功能和性能的要求不断增加&#xff0c;也越来越注重数据分发的效率。在传统数据分发的过程中&#xff0c;数据管理员往往需要先在存储桶下载对应的客户方案/交付资料&#xff0c;再使用微信…

【前言】嵌入式系统简介

随手拍拍&#x1f481;‍♂️&#x1f4f7; 日期: 2022.12.01 地点: 杭州 介绍: 2022.11.30下午两点时&#xff0c;杭州下了一场特别大的雪。隔天的12月路过食堂时&#xff0c;边上的井盖上发现了这个小雪人。此时边上的雪已经融化殆尽&#xff0c;只有这个雪人依旧维持着原状⛄…

【FLASH存储器系列十九】固态硬盘掉电后如何恢复掉电前状态?

掉电分两种&#xff0c;一种是正常掉电&#xff0c;另一种是异常掉电。不管是哪种原因导致的掉电&#xff0c;我们都希望&#xff0c;重新上电后&#xff0c;SSD都需要能从掉电中恢复过来&#xff0c;继续正常工作。正常掉电恢复&#xff0c;这个好理解&#xff0c;主机通知SSD…

Linux(centOS7)虚拟机中配置 vim

&#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是小童&#xff0c;Java开发工程师&#xff0c;CSDN博客博主&#xff0c;Java领域新星创作者 &#x1f4d5;系列专栏&#xff1a;前端、Java、Java中间件大全、微信小程序、微信支付、若依框架、Spring全家桶 &#x1f4…

数据库实践LAB大纲 05 JDBC 连接

概述 Java DataBase Connectivity&#xff0c;Java 数据库连接 执行SQL的Java API 为多种关系型数据提供统一访问 FUNCTION 建立与数据库的连接向数据库发送 SQL 语句处理从数据库返回的结果 四种常见JDBC驱动程序 JDBC-ODBC Bridge drivernative-API, partly Java driver…

LeetCode题目笔记——1.两数之和

文章目录题目描述题目难度——简单方法一&#xff1a;暴力代码/Python方法二&#xff1a;哈希表代码/Python代码/C总结题目描述 这道题可以说是力扣的入坑题了&#xff0c;很经典&#xff0c;好像还是面试的经典题。 给定一个整数数组 nums 和一个整数目标值 target&#xff0c…

Zookeeper技术认知

目录概念理解工作原理文件系统通知系统zookeeper在kakfa中的作用概念理解 Zookeeper是一个开源的分布式的&#xff0c;为分布式框架提供协调服务的Apache项目。 工作原理 Zookeeper 作为一个分布式的服务框架&#xff0c;主要用来解决分布式集群中应用系统的一致性问题&…

Android IO 框架 Okio 的实现原理,到底哪里 OK?

本文已收录到 AndroidFamily&#xff0c;技术和职场问题&#xff0c;请关注公众号 [彭旭锐] 提问。 前言 大家好&#xff0c;我是小彭。 今天&#xff0c;我们来讨论一个 Square 开源的 I/O 框架 Okio&#xff0c;我们最开始接触到 Okio 框架还是源于 Square 家的 OkHttp 网络…

C4--Vivado添加列表中不存在的FLash器件2023-02-10

以华邦SPI FLASH W25Q128JVEIQ为例进行说明。&#xff08;其他Flash添加步骤一致&#xff09; 1.本地vivado安装目录D:\Softwares\xlinx_tools\Vivado\2020.2\data\xicom下&#xff0c;找到xicom_cfgmem_part_table.csv文件&#xff0c;这个表与vivado hardware manager中的器…