客户端
客户端的配置有两种方式来维持,一是客户端主动拉取,而是客户端长轮询更新
配置文件的种类
1、本地配置文件: 本地就已经存在的配置文件
2、 本地缓存文件: 从服务端获取的保存在了本地 (本地生成了文件)
3、 cacheData 缓存数据: 内存中缓存的配置文件数据
客户端主动获取
从源码的ConfigExample中,可以看到获取配置的方法:
NacosConfigService.getConfig
1、优先从本地配置中获取
2、本地配置中没有则会去服务端获取
3、 服务端异常且异常不是因为鉴权失败,则从本地缓存文件中获取
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
group = null2defaultGroup(group);
ParamUtils.checkKeyParam(dataId, group);
ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setTenant(tenant);
cr.setGroup(group);
// 1、优先使用本地配置
String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
if (content != null) {
LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", agent.getName(),
dataId, group, tenant, ContentUtils.truncateContent(content));
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
}
try {
// 2、本地配置没有则会从服务端获取
String[] ct = worker.getServerConfig(dataId, group, tenant, timeoutMs);
cr.setContent(ct[0]);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
} catch (NacosException ioe) {
if (NacosException.NO_RIGHT == ioe.getErrCode()) {
throw ioe;
}
LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}",
agent.getName(), dataId, group, tenant, ioe.toString());
}
LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", agent.getName(),
dataId, group, tenant, ContentUtils.truncateContent(content));
// 3、如果服务端获取异常,而异常不是403(鉴权失败),则从本地缓存中获取
content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
}
下面分析下从服务器端获取配置文件的方法
public String[] getServerConfig(String dataId, String group, String tenant, long readTimeout)
throws NacosException {
String[] ct = new String[2];
if (StringUtils.isBlank(group)) {
group = Constants.DEFAULT_GROUP;
}
HttpResult result = null;
try {
List<String> params = null;
if (StringUtils.isBlank(tenant)) {
params = new ArrayList<String>(Arrays.asList("dataId", dataId, "group", group));
} else {
params = new ArrayList<String>(Arrays.asList("dataId", dataId, "group", group, "tenant", tenant));
}
// 发起请求:/v1/cs/configs
result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
} catch (IOException e) {
String message = String.format(
"[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(),
dataId, group, tenant);
LOGGER.error(message, e);
throw new NacosException(NacosException.SERVER_ERROR, e);
}
switch (result.code) {
case HttpURLConnection.HTTP_OK:
// 成功时候创建的本地缓存文件, 内容存的是返回的是result.content
LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);
ct[0] = result.content;
if (result.headers.containsKey(CONFIG_TYPE)) {
ct[1] = result.headers.get(CONFIG_TYPE).get(0);
} else {
ct[1] = ConfigType.TEXT.getType();
}
return ct;
case HttpURLConnection.HTTP_NOT_FOUND:
// 失败时候,创建的本地缓存,内容存的是 null
LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
return ct;
case HttpURLConnection.HTTP_CONFLICT: {
LOGGER.error(
"[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, "
+ "tenant={}", agent.getName(), dataId, group, tenant);
throw new NacosException(NacosException.CONFLICT,
"data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
}
case HttpURLConnection.HTTP_FORBIDDEN: {
LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(), dataId,
group, tenant);
throw new NacosException(result.code, result.content);
}
default: {
LOGGER.error("[{}] [sub-server-error] dataId={}, group={}, tenant={}, code={}", agent.getName(), dataId,
group, tenant, result.code);
throw new NacosException(result.code,
"http error, code=" + result.code + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
}
}
}
服务端
ConfigController
进行参数的解析
@GetMapping
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void getConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam("dataId") String dataId, @RequestParam("group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY)
String tenant,
@RequestParam(value = "tag", required = false) String tag)
throws IOException, ServletException, NacosException {
// check params
ParamUtils.checkParam(dataId, group, "datumId", "content");
ParamUtils.checkParam(tag);
final String clientIp = RequestUtil.getRemoteIp(request);
inner.doGetConfig(request, response, dataId, group, tenant, tag, clientIp);
}
/**
* 同步配置获取接口
*/
public String doGetConfig(HttpServletRequest request, HttpServletResponse response, String dataId, String group,
String tenant, String tag, String clientIp) throws IOException, ServletException {
final String groupKey = GroupKey2.getKey(dataId, group, tenant);
String autoTag = request.getHeader("Vipserver-Tag");
String requestIpApp = RequestUtil.getAppName(request);
// 读取读锁,获取失败会自旋重复获取10次
int lockResult = tryConfigReadLock(groupKey);
final String requestIp = RequestUtil.getRemoteIp(request);
boolean isBeta = false;
if (lockResult > 0) {
FileInputStream fis = null;
try {
String md5 = Constants.NULL;
long lastModified = 0L;
CacheItem cacheItem = ConfigService.getContentCache(groupKey);
if (cacheItem != null) {
if (cacheItem.isBeta()) {
if (cacheItem.getIps4Beta().contains(clientIp)) {
isBeta = true;
}
}
String configType = cacheItem.getType();
response.setHeader("Config-Type", (null != configType) ? configType : "text");
}
File file = null;
ConfigInfoBase configInfoBase = null;
PrintWriter out = null;
if (isBeta) {
md5 = cacheItem.getMd54Beta();
lastModified = cacheItem.getLastModifiedTs4Beta();
if (STANDALONE_MODE && !PropertyUtil.isStandaloneUseMysql()) {
configInfoBase = persistService.findConfigInfo4Beta(dataId, group, tenant);
} else {
file = DiskUtil.targetBetaFile(dataId, group, tenant);
}
response.setHeader("isBeta", "true");
} else {
if (StringUtils.isBlank(tag)) {
if (isUseTag(cacheItem, autoTag)) {
if (cacheItem != null) {
if (cacheItem.tagMd5 != null) {
md5 = cacheItem.tagMd5.get(autoTag);
}
if (cacheItem.tagLastModifiedTs != null) {
lastModified = cacheItem.tagLastModifiedTs.get(autoTag);
}
}
if (STANDALONE_MODE && !PropertyUtil.isStandaloneUseMysql()) {
configInfoBase = persistService.findConfigInfo4Tag(dataId, group, tenant, autoTag);
} else {
file = DiskUtil.targetTagFile(dataId, group, tenant, autoTag);
}
response.setHeader("Vipserver-Tag",
URLEncoder.encode(autoTag, StandardCharsets.UTF_8.displayName()));
} else {
md5 = cacheItem.getMd5();
lastModified = cacheItem.getLastModifiedTs();
if (STANDALONE_MODE && !PropertyUtil.isStandaloneUseMysql()) {
configInfoBase = persistService.findConfigInfo(dataId, group, tenant);
} else {
file = DiskUtil.targetFile(dataId, group, tenant);
}
if (configInfoBase == null && fileNotExist(file)) {
// FIXME CacheItem
// 不存在了无法简单的计算推送delayed,这里简单的记做-1
ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, -1,
ConfigTraceService.PULL_EVENT_NOTFOUND, -1, requestIp);
// pullLog.info("[client-get] clientIp={}, {},
// no data",
// new Object[]{clientIp, groupKey});
response.setStatus(HttpServletResponse.SC_NOT_FOUND);
response.getWriter().println("config data not exist");
return HttpServletResponse.SC_NOT_FOUND + "";
}
}
} else {
if (cacheItem != null) {
if (cacheItem.tagMd5 != null) {
md5 = cacheItem.tagMd5.get(tag);
}
if (cacheItem.tagLastModifiedTs != null) {
Long lm = cacheItem.tagLastModifiedTs.get(tag);
if (lm != null) {
lastModified = lm;
}
}
}
if (STANDALONE_MODE && !PropertyUtil.isStandaloneUseMysql()) {
configInfoBase = persistService.findConfigInfo4Tag(dataId, group, tenant, tag);
} else {
file = DiskUtil.targetTagFile(dataId, group, tenant, tag);
}
if (configInfoBase == null && fileNotExist(file)) {
// FIXME CacheItem
// 不存在了无法简单的计算推送delayed,这里简单的记做-1
ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, -1,
ConfigTraceService.PULL_EVENT_NOTFOUND,
-1, requestIp);
// pullLog.info("[client-get] clientIp={}, {},
// no data",
// new Object[]{clientIp, groupKey});
response.setStatus(HttpServletResponse.SC_NOT_FOUND);
response.getWriter().println("config data not exist");
return HttpServletResponse.SC_NOT_FOUND + "";
}
}
}
response.setHeader(Constants.CONTENT_MD5, md5);
/**
* 禁用缓存
*/
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
if (STANDALONE_MODE && !PropertyUtil.isStandaloneUseMysql()) {
response.setDateHeader("Last-Modified", lastModified);
} else {
fis = new FileInputStream(file);
response.setDateHeader("Last-Modified", file.lastModified());
}
if (STANDALONE_MODE && !PropertyUtil.isStandaloneUseMysql()) {
out = response.getWriter();
out.print(configInfoBase.getContent());
out.flush();
out.close();
} else {
fis.getChannel().transferTo(0L, fis.getChannel().size(),
Channels.newChannel(response.getOutputStream()));
}
LogUtil.pullCheckLog.warn("{}|{}|{}|{}", groupKey, requestIp, md5, TimeUtils.getCurrentTimeStr());
final long delayed = System.currentTimeMillis() - lastModified;
// TODO distinguish pull-get && push-get
// 否则无法直接把delayed作为推送延时的依据,因为主动get请求的delayed值都很大
ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, lastModified,
ConfigTraceService.PULL_EVENT_OK, delayed,
requestIp);
} finally {
// 释放锁
releaseConfigReadLock(groupKey);
if (null != fis) {
fis.close();
}
}
} else if (lockResult == 0) {
// FIXME CacheItem 不存在了无法简单的计算推送delayed,这里简单的记做-1
ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, -1,
ConfigTraceService.PULL_EVENT_NOTFOUND, -1, requestIp);
response.setStatus(HttpServletResponse.SC_NOT_FOUND);
response.getWriter().println("config data not exist");
return HttpServletResponse.SC_NOT_FOUND + "";
} else {
pullLog.info("[client-get] clientIp={}, {}, get data during dump", clientIp, groupKey);
response.setStatus(HttpServletResponse.SC_CONFLICT);
response.getWriter().println("requested file is being modified, please try later.");
return HttpServletResponse.SC_CONFLICT + "";
}
return HttpServletResponse.SC_OK + "";
}
在上面的方法里主要做了以下几件事
1、获取读锁,获取不到就自旋重复获取10次
2、根据beta、tag、autoTag来判断读什么配置
3、PropertyUtil.isDirectRead() 判断是读 mysql 还是配置文件
4、使用jdk的零拷贝传输直接将文件输入流转response输出流
5、释放读取