Nacos 配置中心源码讲解

news2024/9/25 1:16:21

目录

1. 配置中心的优点

2. 配置模型结构

3. 配置中心 Server 端实现原理

3.1 新建配置 / 发布配置

3.2 查询配置

4. 配置中心 Client 端实现原理

4.1 发布配置

4.2 查询配置

4.3 监听机制 Listener


1. 配置中心的优点

  1. 运行时动态修改系统参数配置,不用重启服务

  2. 方便运维人员修改系统参数,不直接改代码,安全性高,防止代码改坏了2

  3. 微服务多,配置统一管理

2. 配置模型结构

3. 配置中心 Server 端实现原理

3.1 新建配置 / 发布配置

配置通过后台管理可以新建,当点击发布按钮时,将会调用接口 /nacos/v1/cs/configs 完成发布

接口 /nacos/v1/cs/configs 定义在 com.alibaba.nacos.config.server.controller.ConfigController#publishConfig

接下来看看这个接口如何实现的

ConfigController.publishConfig

publishConfig 接口主要做的就是数据的组装、字段的非法性校验。

@RestController
@RequestMapping(Constants.CONFIG_CONTROLLER_PATH) // CONFIG_CONTROLLER_PATH = /v1/cs/configs
public class ConfigController {
    
    
    @PostMapping
    public Boolean publishConfig( HttpServletRequest request, HttpServletResponse response,
                                  @RequestParam String dataId, @RequestParam String group 
                                  // 省略其他字段                                
                                ) {
        
        // 省略非关键代码
        
        // 校验字段是否合法
        ParamUtils.checkTenant(tenant);
        ParamUtils.checkParam(dataId, group, "datumId", content);
        ParamUtils.checkParam(tag);
        
        // 从请求中组装数据到 ConfigForm 对象中
        ConfigForm configForm = new ConfigForm();
        configForm.setDataId(dataId);
        configForm.setGroup(group);
        // ... 省略 configForm 其他字段的 set 
    
    
        // 构建一个 ConfigRequestInfo 请求对象
        ConfigRequestInfo configRequestInfo = new ConfigRequestInfo(); 
        configRequestInfo.setSrcIp(RequestUtil.getRemoteIp(request));
        configRequestInfo.setRequestIpApp(RequestUtil.getAppName(request));
        configRequestInfo.setBetaIps(request.getHeader("betaIps"));
    
        String encryptedDataKey = pair.getFirst();
       
        return configOperationService.publishConfig(configForm, configRequestInfo, encryptedDataKey);
    }
    
}

ConfigOperationService.publishConfig

publishConfig 接口最后调用到了 configOperationService.publishConfig

接下来看看这个 service 如何实现,以下代码省略了 betaIps 与 tag 的分支,这里只关注单机情况的当前的主流程。

@Service
public class ConfigOperationService {
    
    private ConfigInfoPersistService configInfoPersistService;
    
    public Boolean publishConfig(ConfigForm configForm, ConfigRequestInfo configRequestInfo, String encryptedDataKey) {
        
        // 省略非关键代码
        
        Map<String, Object> configAdvanceInfo = getConfigAdvanceInfo(configForm);
       
        
        // 将 configForm 转成 与数据库对应的 ConfigInfo 对象
        ConfigInfo configInfo = new ConfigInfo(configForm.getDataId(), ...);        
        configInfo.setEncryptedDataKey(encryptedDataKey);
        
        // 调用持久化 service 执行 insertOrUpdate 插入或更新数据(因为 publishConfig 接口可以同时做新增和编辑)
        configInfoPersistService.insertOrUpdate(configRequestInfo.getSrcIp(), 
                                                configForm.getSrcUser(),
                                                configInfo, 
                                                TimeUtils.getCurrentTime(), 
                                                configAdvanceInfo, 
                                                false);
​
        return true;
    }    
}

可见,上述代码中,最后调用了 ConfigInfoPersistService.insertOrUpdate 方法。

ConfigInfoPersistService.insertOrUpdate

ConfigInfoPersistService,这个类名也能猜出它的作用了,Persist 就是持久化,这个类就是负责持久化数据。

ConfigInfoPersistService 是一个接口代表持久化,而 ExternalConfigInfoPersistServiceImpl 为实现类,代表采用外部的持久化(MySQL)

看看如何实现:

@Service
public class ExternalConfigInfoPersistServiceImpl implements ConfigInfoPersistService {
    
    @Override
    public void insertOrUpdate(String srcIp, String srcUser, ConfigInfo configInfo, Timestamp time,
            Map<String, Object> configAdvanceInfo, boolean notify) {
        try {
            // 添加配置
            addConfigInfo(srcIp, srcUser, configInfo, time, configAdvanceInfo, notify);
        } catch (DataIntegrityViolationException ive) { // Unique constraint conflict
            updateConfigInfo(configInfo, srcIp, srcUser, time, configAdvanceInfo, notify);
        }
    }
    
    
    // 添加配置
    @Override
    public void addConfigInfo(final String srcIp, final String srcUser, final ConfigInfo configInfo,
            final Timestamp time, final Map<String, Object> configAdvanceInfo, final boolean notify) {
        
        // 添加配置
        long configId = addConfigInfoAtomic(-1, srcIp, srcUser, configInfo, time, configAdvanceInfo);
        
        // 省略非关键代码           
    }
    
    
    // addConfigInfoAtomic 完成实际的数据插入
    @Override
    public long addConfigInfoAtomic(final long configId, final String srcIp, final String srcUser,
            final ConfigInfo configInfo, final Timestamp time, Map<String, Object> configAdvanceInfo) {
 
        // 省略非关键代码    
        
        // 根据文件内容使用指定编码(UTF-8) 计算 md5 值
        final String md5Tmp = MD5Utils.md5Hex(configInfo.getContent(), Constants.ENCODE);
        
        KeyHolder keyHolder = new GeneratedKeyHolder();
        
        // 找到 ConfigInfoMapper 
        ConfigInfoMapper configInfoMapper =             mapperManager.findMapper(dataSourceService.getDataSourceType(),TableConstant.CONFIG_INFO);
        
        // 构建 SQL
        final String sql = configInfoMapper.insert(
                Arrays.asList("data_id", "group_id", "tenant_id", "app_name", "content", 
                              "md5", "src_ip", "src_user","gmt_create", "gmt_modified",
                              "c_desc", "c_use", "effect", "type", 
                              "c_schema","encrypted_data_key"));
​
        String[] returnGeneratedKeys = configInfoMapper.getPrimaryKeyGeneratedKeys();
     
            // 使用 JDBC Template 执行 SQL
            jt.update(new PreparedStatementCreator() {
                @Override
                public PreparedStatement createPreparedStatement(Connection connection) throws SQLException {
                    // 构建预编译执行模式,传入对应参数
                    PreparedStatement ps = connection.prepareStatement(sql, returnGeneratedKeys);
                    ps.setString(1, configInfo.getDataId());
                    ps.setString(2, configInfo.getGroup());
                    ps.setString(3, tenantTmp);
                    ps.setString(4, appNameTmp);
                    ps.setString(5, configInfo.getContent());
                    ps.setString(6, md5Tmp);
                    ps.setString(7, srcIp);
                    ps.setString(8, srcUser);
                    ps.setTimestamp(9, time);
                    ps.setTimestamp(10, time);
                    ps.setString(11, desc);
                    ps.setString(12, use);
                    ps.setString(13, effect);
                    ps.setString(14, type);
                    ps.setString(15, schema);
                    ps.setString(16, encryptedDataKey);
                    return ps;
                }
            }, keyHolder);
        
        
            // 生成一个 config id 返回
            Number nu = keyHolder.getKey();
            return nu.longValue();
    }    
}

可以看出,配置的发布主要流程就是向数据库添加了数据。

3.2 查询配置

在 Nacos 后台管理,点击配置详情,观察控制台浏览器发送了一条查询详情接口 GET /nacos/v1/cs/configs

该接口位置在 com.alibaba.nacos.config.server.controller.ConfigController#detailConfigInfo

@RestController
@RequestMapping(Constants.CONFIG_CONTROLLER_PATH)
public class ConfigController {
    
    private ConfigInfoPersistService configInfoPersistService;
    
    @GetMapping
    public ConfigAllInfo detailConfigInfo(String dataId, String group,String tenant) {
​
        // 参数校验
        ParamUtils.checkTenant(tenant);
        ParamUtils.checkParam(dataId, group, "datumId", "content");
        
        // 查询配置详情
        ConfigAllInfo configAllInfo = configInfoPersistService.findConfigAllInfo(dataId, group, tenant);
        
        // 省略非关键代码
        return configAllInfo;
    }
}

configInfoPersistService.findConfigAllInfo 最终调用以下方法

@Service
public class ExternalConfigInfoPersistServiceImpl implements ConfigInfoPersistService {
    
    protected JdbcTemplate jt;
    
    @Override
    public ConfigAllInfo findConfigAllInfo(final String dataId, final String group, final String tenant) {
        
        // 省略非关键代码
        
        // 获取到 mapper
        ConfigInfoMapper configInfoMapper = mapperManager.findMapper(
                                                dataSourceService.getDataSourceType(),
                                                TableConstant.CONFIG_INFO);
                
        
        // 使用 JdbcTemplate 执行 SQL 查询数据
        ConfigAllInfo configAdvance = this.jt.queryForObject(
                
            configInfoMapper.select(
                            Arrays.asList("id", "data_id", "group_id", "tenant_id", 
                                          "app_name", "content", "md5", "gmt_create",
                                        "gmt_modified", "src_user", "src_ip", "c_desc",
                                          "c_use", "effect", "type", "c_schema",
                                    "encrypted_data_key"),
                            Arrays.asList("data_id", "group_id", "tenant_id")),
            
                    new Object[] {dataId, group, tenantTmp}, CONFIG_ALL_INFO_ROW_MAPPER);
        
            return configAdvance;
    }
}

可见,查询配置详情就是从数据库查询一条记录。

4. 配置中心 Client 端实现原理

Server 端看起来很简单,复杂度其实都在 Client 端。

4.1 发布配置

我们使用官方提供的代码示例来研究,代码模块在 nacos/example

客户端调用的发布配置方法其实就是向 server 端发送了一个请求进行发布。而 server 端如何处理上面已经介绍过了。

Nacos 2.x 版本 Client 是使用 RPC 发送的消息。后台管理则是使用 HTTP 接口调用的。

可见最终采用 GRPC 发送请求。

4.2 查询配置

NacosClient 获取配置调用如下方法

接下来,研究 getConfig 方法

public class NacosConfigService implements ConfigService {
    
    @Override
    public String getConfig(String dataId, String group, long timeoutMs) throws NacosException {
        return getConfigInner(namespace, dataId, group, timeoutMs);
    }
    
    
    private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) {
        
        // 省略非关键代码
 
        ConfigResponse cr = new ConfigResponse();
        
        cr.setDataId(dataId);
        cr.setTenant(tenant);
        cr.setGroup(group);
         
        // 读取本地故障切换文件(如果存在)
        String content = LocalConfigInfoProcessor.getFailover(worker.getAgentName(), dataId, group, tenant);
        if (content != null) {
            // 本地故障切换文件存在,就读取内容并返回
​
            cr.setContent(content);            
            String encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
            cr.setEncryptedDataKey(encryptedDataKey);          
            return content;
        }
        
        // 故障切换文件不存在,发送一个 RPC 请求来查询 server 端的配置文件内容
        try {
            ConfigResponse response = worker.getServerConfig(dataId, group, tenant, timeoutMs, false);
            cr.setContent(response.getContent());
            cr.setEncryptedDataKey(response.getEncryptedDataKey());
            content = cr.getContent();
            
            return content;
        } catch (NacosException ioe) {
            if (NacosException.NO_RIGHT == ioe.getErrCode()) {
                throw ioe;
            }
        }
        
        // 走到这里代表请求 server 端失败了,那么就读取本地的快照文件获取配置内容
        content = LocalConfigInfoProcessor.getSnapshot(worker.getAgentName(), dataId, group, tenant);
        
        cr.setContent(content);
        String encryptedDataKey = LocalEncryptedDataKeyProcessor
                .getEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant);
        cr.setEncryptedDataKey(encryptedDataKey);
        configFilterChainManager.doFilter(null, cr);
        content = cr.getContent();
        return content;
    }
    
}

整体流程图

故障切换文件是什么?

该文件由用户手动维护,Nacos 不负责创建。

该功能设计可发现,当我们为 NacosClient 所在的服务器的文件系统中创建一个故障切换文件后,NacosClient 将从该文件中读取配置项,而不再请求服务端数据。

根据源码注释可知,该功能可用于当 NacosServer 关闭的同一时间,客户端需要同时更改配置(因为启动需要时间,这个时间段直接读取本地的故障切换文件)

本地快照文件是什么

本地快照文件由每次 RPC 请求远程 Server 文件获取到返回结果时,将结果存储起来到一个快照文件。

后续 Server 访问不通了,就使用本地快照文件。流程见下图中的红色区域。

存储快照的原理就是 将配置的内容写入到本地指定文件中。

com.alibaba.nacos.client.config.impl.LocalConfigInfoProcessor#saveSnapshot

4.3 监听机制 Listener

监听机制代表 NacosClient 监听某个配置,当配置内容发生变更时,NacosClient 能够知道并拿到最新的配置内容。

NacosClient 使用监听功能的代码如下:

接下来研究下该功能如何实现。

入口是:添加监听器 addListener

public class NacosConfigService implements ConfigService {
    
    private final ClientWorker worker;
    
    @Override
    public void addListener(String dataId, String group, Listener listener) throws NacosException {
        worker.addTenantListeners(dataId, group, Arrays.asList(listener));
    }
}  

看出添加监听器调用了 ClientWorker.addListener

ClientWorker.addListener 源码如下:

public class ClientWorker {
    
    private ConfigTransportClient agent;
    
    public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) {
       
        // 根据 dataId,group 包装为 CacheData 放入缓存
        CacheData cache = addCacheDataIfAbsent(dataId, group, agent.getTenant());
        
        synchronized (cache) {
            for (Listener listener : listeners) {
                // 添加监听器
                cache.addListener(listener);
            }
            // 通知监听器读取最新配置
            agent.notifyListenConfig();
        }   
        
        // 省略非关键代码
    }
}   

该方法主要完成两点

  1. 构建一个 CacheData 对象,放入缓存

  2. 通知监听器读取最新配置

我们分开来看

1. 构建一个 CacheData 对象,放入缓存

完成这个功能的方法为 ClientWorker.addCacheDataIfAbsent(),下面单独研究这个方法:

/**
 * groupKey -> cacheData.
 */
private final AtomicReference<Map<String, CacheData>> cacheMap =
    															new AtomicReference<>(new HashMap<>());

public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) {
    
     	// 先从缓存中获取
        CacheData cache = getCache(dataId, group, tenant); // cacheMap.get(key)
        if (null != cache) {
            // 能够获取到直接返回
            return cache;
        }
    	
    	// 下面开始构建 CacheData 
    
        // 构建一个 Key
        String key = GroupKey.getKeyTenant(dataId, group, tenant);
        
		CacheData cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
		int taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize();
		cache.setTaskId(taskId);
              
		
    	// 将此次构建的 CacheData 放入缓存 Map
    	Map<String, CacheData> copy = new HashMap<>(this.cacheMap.get());
		copy.put(key, cache);
		cacheMap.set(copy);
       
        return cache;
}

再来看看 CacheData 的字段

public class CacheData {
      
    // 配置的 dataId
    public final String dataId;
    // 配置 group 
    public final String group;
    
    // 当前"配置" 的监听器列表
    private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;
    
    // 配置内容
    private volatile String content;

	// 配置内容 MD5 值
    private volatile String md5;
    
    // 本地缓存修改时间
    private volatile AtomicLong lastModifiedTs = new AtomicLong(0);
    
    private int taskId;
  
    private String type;
    
    // 省略部分字段,省略部分 setter getter
    
    
	public void setContent(String content) {
        this.content = content;
        // 每次设置配置内容都会重新计算 它的MD5 值
        this.md5 = getMd5String();
    }
	public String getMd5String() {
        return (null == content) ? Constants.NULL : MD5Utils.md5Hex(content, Constants.ENCODE);
    }
  1. 通知监听器读取最新配置

    在添加监听器代码的最后调用了 agent.notifyListenConfig(); 代码如下

public class ClientWorker implements Closeable {
   	        
   
	public class ConfigRpcTransportClient extends ConfigTransportClient {
           
		private final BlockingQueue<Object> listenExecutebell = new ArrayBlockingQueue<>(1);

        private Object bellItem = new Object();
           
		@Override
		public void notifyListenConfig() {
			listenExecutebell.offer(bellItem);
		}
	}
       
}

可见源码,只是向 ArrayBlockingQueue 中加入了一个 数据。加入的这个 bellItem 对象只是一个占位符,对象无实际意义。

再来看看 ArrayBlockingQueue 取数据的地方,(还是在当前类)

public class ClientWorker implements Closeable {
   	
   
	public class ConfigRpcTransportClient extends ConfigTransportClient {
		     
		private final BlockingQueue<Object> listenExecutebell = new ArrayBlockingQueue<>(1);
        
        @Override
        public void startInternal() {
            // executor 是 ScheduledExecutorService executor
            executor.schedule(() -> {
                while (
                        // 线程池没有关闭
                        !executor.isShutdown()
                        // 并且 线程池没有执行完毕:有任务正在执行
                        && !executor.isTerminated()) {
                    
                    
                        // 从 listenExecutebell 取出一个数据,取到数据立即返回,没有的话等待5秒再返回
                        listenExecutebell.poll(5L, TimeUnit.SECONDS);
                       
                        
                        // 执行配置监听
                        executeConfigListen();
                   
                }
            }, 0L, TimeUnit.MILLISECONDS);
            
        }
    }
}

可见这段代码,其实最返回值没有接收,返回值其实不重要,也就是队列中的数据内容本身不重要。

重要的是有数据就够了,那么这段代码代表什么意思呢?

这里相当于一个不断无限循环,如果有数据了就理解执行后面的 执行配置监听,没有数据了就每隔5秒执行一次。

其实,这里将队列用作了一种时间控制手段,然后向队列插入一条数据代表着立即执行一次后面的方法。否则就走正常的每隔5秒执行一次。

看看 executeConfigListen 干了什么吧

代码有点长,所以我将代码做了一些精简,省略一些次重要的代码。这个方法要做的事情也有点多,先总结一下:

该方法做的事情就是 读取到最新的配置内容,如有变更,就回调当前客户端的 监听器。

public class ConfigRpcTransportClient extends ConfigTransportClient {
    
	@Override
	public void executeConfigListen() {
		// 有监听组
		// taskId -> List<CacheData>: 存储的是  不需要同步数据 或者 需要同步数据但是 不需要全部同步数据
		Map<String, List<CacheData>> listenCachesMap = new HashMap<>(16);
  
		// 是否需要全量同步(5 分钟全量同步一次)
		boolean needAllSync = System.currentTimeMillis() - lastAllSyncTime >= ALL_SYNC_INTERNAL;
            
		// 遍历当前全部 Cache
		for (CacheData cache : cacheMap.get().values()) {
			// 判断该 CacheData 是否需要检查, 如果 isSyncWithServer == false,必定进行检查
			
			if (cache.isSyncWithServer()) { // 本来是 false 什么时候变成 true 了
				// 检查 CacheData.MD5 与 Listenter.MD5 比对,如果不等于的话直接通知
				cache.checkListenerMd5();
				
                if (!needAllSync) {
					// 不需要全部数据同步,直接循环下一个
					// 上面已经同步了单个
        
					// 是否需要全量同步, 如果未达到全量同步时间或距离上次全量同步小于五分钟,则跳过这个 CacheData:本次的 CacheData 无需更换
					continue;
				}
			}
                    
			// 没同步过数据下来,   或者 需要全量同步的下来
                    
            // 维护 listenCachesMap
            List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId()));
            // 将 CacheData 标记起来
            cacheDatas.add(cache);
		}
            
		boolean hasChangedKeys = false;
 
                
		for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
			String taskId = entry.getKey();
                    
			// GroupKey.getKeyTenant -> 上一次修改时间
			Map<String, Long> timestampMap = new HashMap<>(listenCachesMap.size() * 2);
                    
			List<CacheData> listenCaches = entry.getValue();
			for (CacheData cacheData : listenCaches) {
				timestampMap.put(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant), cacheData.getLastModifiedTs().longValue());
            }
                    
			// 构建 RPC 请求对象
            ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);
            // 获取一个 RPC Client
			RpcClient rpcClient = ensureRpcClient(taskId);

            // 发送请求
            ConfigChangeBatchListenResponse configChangeBatchListenResponse =  	(ConfigChangeBatchListenResponse) requestProxy(rpcClient, configChangeListenRequest);
            
     		// 已改变的的配置
			Set<String> changeKeys = new HashSet<>();
                            
                           
			if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) {
				
                // 已有变更
				hasChangedKeys = true;
                                
				for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse.getChangedConfigs()) {
                                    
					String changeKey = GroupKey.getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(), changeConfig.getTenant());
					
                    changeKeys.add(changeKey);
                                    
					// 通知改变 刷新内容并且检查、触发 Listener 回调
					refreshContentAndCheck(changeKey);
				}
                                
			}
                            
                            
			for (CacheData cacheData : listenCaches) {
				String groupKey = GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant());
				
                if (!changeKeys.contains(groupKey)) {
					// 返回来要改变的配置 没有 包含当前的配置
					// 上一次修改时间
					Long previousTimesStamp = timestampMap.get(groupKey);
					if (previousTimesStamp != null &&
! cacheData.getLastModifiedTs().compareAndSet(previousTimesStamp, System.currentTimeMillis())) {
                                                
						// 修改失败了, 两个时间不等于, 说明别的地方已经同步过了, 比如 接收到服务端的消息推送
						continue;
					}
                                            
					// 两个时间等于、修改成功了,修改为了当前时间,开始同步数据
					cacheData.setSyncWithServer(true);     
				}
                                
				cacheData.setInitializing(false);
			}         
		}
  
        // If it has changed keys,notify re-sync md5.
  		if (hasChangedKeys) {
        	notifyListenConfig();
		}
	}
}

代码有点长,这里将它分为两大部分来看

  1. 找出需要监听的配置

    最终目标就是将需要监听的配置放到 listenCachesMap 中

    满足什么样的规则才能放进去呢?

    Nacos 每隔 5 分钟进行一次全量数据同步,就是全部配置(CacheData) 都去远程获取一次最新数据。

    否则当 CacheData 首次添加监听器、接收到服务端的配置变更推送、最后一个监听器被移除的时候,就去获取一次最新数据。

    也就意味着,配置如果没变更的话,就 5 分钟去远程获取一次数据,否则立即从远程服务获取一次数据。

    listenCachesMap 代表什么呢,这意味着本次需要检查的 需要监听的 配置。代表这些配置可能发生变更。所以把他们先收集起来。

    再来看看,什么样的配置会放进去?

    1. 就是上面提到的(首次添加监听器、接收到服务端的配置变更推送、最后一个监听器被移除的时候,就去获取一次最新数据) 这三种方式,只要有一项触发了就会放入。

    2. 不使用本地配置的放入(这项可以忽略掉,主要是第一条)

  1. 获取最新的配置

    上一步获取到了 listenCachesMap , 这一步就开始对这个 map 进行处理。

    怎么处理呢?

    此时会发送一个 RPC 请求,参数是这些 CacheData ,目的是向服务器查询,发送过去的这些 CacheData 的内容是否发生了变更?服务器接收到请求如何判断是否变更呢?其实就是比较客户端的 Content 和 服务端存的 Content 是不是一致的。Content 也许内容很多,直接比较效率不高,所以比较的其实是 Content 的 md5 值。

Nacos Server 端的比较代码

// com.alibaba.nacos.config.server.service.ConfigCacheService#isUptodate(java.lang.String, java.lang.String, java.lang.String, java.lang.String)

// NacosServer 端的比较代码 , 参数 md5 是 NacosClient 传过来的
public static boolean isUptodate(String groupKey, String md5, String ip, String tag) {
    // 获取 Server 端的 md5 
	String serverMd5 = ConfigCacheService.getContentMd5(groupKey, ip, tag);
    // 与客户端的比较
	return StringUtils.equals(md5, serverMd5);
}

NacosServer 比较完后,会将 md5 不同的 CacheData 返回给客户端,也就是告诉 Nacos Client 哪些 配置与 NacosClient 传来的 是不一样的,也就是说,Nacos Client 不同的这些配置是旧的,然后将 不同的返回给 NacosClient。

再来会到 NacosClient ,NacosClient 拿到结果后,那就得需要处理这些不同的 CacheData 了,怎么处理呢?

for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse
                                        .getChangedConfigs()) {
                                    
	String changeKey = GroupKey.getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(), changeConfig.getTenant());
    changeKeys.add(changeKey);
                                    
    // 通知改变 刷新内容并且检查、触发 Listener 回调
    refreshContentAndCheck(changeKey);
}

最终调用了 refreshContentAndCheck

看看其实现源码:

public class ClientWorker implements Closeable {
	
    // 省略部分代码,精简化部分代码
    
    private void refreshContentAndCheck(String groupKey) {
        CacheData cache = cacheMap.get().get(groupKey);
      
		refreshContentAndCheck(cache, true);  
    }
    
    private void refreshContentAndCheck(CacheData cacheData, boolean notify) {
 		
        // 从 Server 端获取最新的配置
        ConfigResponse response = getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L,notify);
        
        // 更新本地的 CacheData
		cacheData.setContent(response.getContent());
         
		// 检查监听器的 MD5
        cacheData.checkListenerMd5();
    }
}

这个方法要做的事情主要就是从服务端获取最新的数据并刷新本地的旧数据,然后通知当前 CacheData 的监听器。

接下来,看看最后调用 cacheData.checkListenerMd5() 如何实现

public class CacheData {
    
	void checkListenerMd5() {
        for (ManagerListenerWrap wrap : listeners) {
            // 比较 监听器的 MD5 是否与最新的 MD5 一致
            if (!md5.equals(wrap.lastCallMd5)) {
                // MD5 不一致 -》 通知监听器,发生了改变。
                safeNotifyListener(dataId, group, content, type, md5, encryptedDataKey, wrap);
            }
        }
    }      
    
    // 通知当前 CacheData 的监听器
    // 此方法经过我很大的简化,去除了一些次重要代码,但不影响主流程,目的是更容易的看到本节主题(如何回调到监听器)流程
   
	private void safeNotifyListener(final String dataId, final String group, final String content, 										final String type,final String md5, 
                                    final String encryptedDataKey, 
                                    final ManagerListenerWrap listenerWrap) {
        
        final Listener listener = listenerWrap.listener;
        
        Runnable job = () -> {

			// 监听回调
			listener.receiveConfigInfo(content);
         
            // 更新监听器最新的 MD5 值
			listenerWrap.lastCallMd5 = md5;
  
        };

		// 执行任务
		listener.getExecutor().execute(job);
    }
}

listener.receiveConfigInfo(content); 这个回调方法就是示例中的回调

至此,整个流程完毕了但没有完全完毕。先将上述流程画个图总结下

看出来执行上图中的配置监听,是通过队列机制触发的,事实上,不止添加监听器会向队列中加数据,还有其他几种方式:

完整的通知变更来源有以下5点

  1. RPC 连接建立成功

  2. 服务端推送变更

  3. 内容发生变更

  4. 移除监听器

  5. 添加监听器

本篇篇幅太多,下一篇讲解 服务端推送变更实现原理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/176129.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

排序算法解析:快排,归并 (全)

一、快排原始快排 算法思想&#xff1a;ps&#xff1a;排序的效果其实就是使一个数列中的每个数都满足左边数比它小、右边数比它大&#xff08;假设升序&#xff09;。接下来我们来了解快排&#xff1a;多次递归遍历&#xff0c;每单次遍历&#xff0c;设定一个限定值&#xff…

02 |「数据结构、逻辑结构、物理结构」基本概念简析

前言 前言&#xff1a;简析数据结构、逻辑结构、物理结构。 文章目录前言一、数据结构1. 简介2. 数据3. 结构4. 分析5. 分类1&#xff09;线性结构&#xff08;线性表&#xff09;2&#xff09;树结构3&#xff09;图结构二、逻辑结构与物理结构1. 为什么要有逻辑结构和物理结构…

SpringBoot+Vue--前端搭建-笔记1

前端搭建 首先安装node.js(百度) 官网下载地址&#xff1a;http://nodejs.cn/download 以前写的关于npm 后端了解的npm_biubiubiu0706的博客-CSDN博客 安装Node.js淘宝镜像加速器(cnpm) npm install cnpm -g(可以不安装) #建议使用如下语句解决npm速度慢的问题 好比设置仓…

代码随想录算法训练营三期 day 24 - 回溯 (1) (补)

回溯算法理论基础 什么是回溯法 回溯法也可以叫做回溯搜索法&#xff0c;它是一种搜索的方式。回溯是递归的副产品&#xff0c;只要有递归就会有回溯。所以以下讲解中&#xff0c;回溯函数也就是递归函数&#xff0c;指的都是一个函数。 回溯法的效率 回溯的本质是穷举&…

【手把手教你学51单片机】中断的优先级

注&#xff1a;本文章转载自《手把手教你学习51单片机》&#xff01;因转载需要原文链接&#xff0c;故无法选择转载&#xff01; 如若侵权&#xff0c;请联系我进行删除&#xff01;上传至网络博客目的为了记录自己学习的过程的同时&#xff0c;同时能够帮助其他一同学习的小伙…

第四十三章 动态规划——最长单调序列模型

第四十三章 动态规划——最长单调序列模型一、最长单调序列模型1、模型母题2、思路分析&#xff08;两种方法&#xff1a;DP&#xff0c;贪心&#xff09;二、模型的应用1、AcWing 1017. 怪盗基德的滑翔翼&#xff08;1&#xff09;问题&#xff08;2&#xff09;分析&#xff…

C规范编辑笔记(十四)

往期文章&#xff1a; C规范编辑笔记(一) C规范编辑笔记(二) C规范编辑笔记(三) C规范编辑笔记(四) C规范编辑笔记(五) C规范编辑笔记(六) C规范编辑笔记(七) C规范编辑笔记(八) C规范编辑笔记(九) C规则编辑笔记(十) C规范编辑笔记(十一) C规范编辑笔记(十二) C规范编辑笔记(…

Linux进程学习【一】

✨个人主页&#xff1a; Yohifo &#x1f389;所属专栏&#xff1a; Linux学习之旅 &#x1f38a;每篇一句&#xff1a; 图片来源 &#x1f383;操作环境&#xff1a; CentOS 7.6 阿里云远程服务器 Perseverance is not a long race; it is many short races one after another…

Linux基本功系列之rename命令实战

文章目录一. rename 命令介绍二. 语法格式及常用选项三. 参考案例3.1 将当前目录下所有.cfg的文件&#xff0c;替换为.txt结尾3.2 将所有出现mufeng的部分都替换为mufeng13.3 将mufeng0开头都变成mufeng00开头3.4 rename支持正则表示式总结前言&#x1f680;&#x1f680;&…

2023-1-22 刷题情况

积水面积 先祝大家新年快乐&#xff0c;新的一年&#xff0c;万事如意。 题目描述 一组正整数&#xff0c;分别表示由正方体叠起的柱子的高度。若某高度值为 xxx&#xff0c;表示由 xxx 个正立方的方块叠起&#xff08;如下图&#xff0c;0≤x≤50000 \le x \le 50000≤x≤5…

卷积神经网络进阶--基础知识

卷积神经网络进阶 b站课程链接碳基生物都能学会的神经网络&#xff08;跳着看的&#xff09; 因为我用的是pytorch&#xff0c;而该课程是用tenserflow的&#xff0c;所以主要记了一下理论 为什么要讲不同的网络结构 不同的网络结构解决的问题不同不同的网络结构使用的技巧不同…

【人工智能原理自学】卷积神经网络:打破图像识别的瓶颈

&#x1f60a;你好&#xff0c;我是小航&#xff0c;一个正在变秃、变强的文艺倾年。 &#x1f514;本文讲解卷积神经网络&#xff1a;打破图像识别的瓶颈&#xff0c;一起卷起来叭&#xff01; 目录一、手写体识别二、“炼丹”一、手写体识别 在机器学习、神经网络领域&#…

【数据分析】(task4)数据可视化

note matplotlib的四个容器&#xff1a; Figure&#xff1a;顶层级&#xff0c;用来容纳子 Axes&#xff0c;一组具体绘图元素和画布&#xff08;canvas&#xff09;。 画板。Axes&#xff1a;matplotlib宇宙的核心&#xff0c;容纳了大量元素用来构造一幅幅子图&#xff0c;一…

【QT5.9】与MFC对比学习笔记-感悟篇【2023.01.22】

简介 在公司从事MFC的程序维护一年两个月&#xff0c;期间因为公司被QT告侵权对QT产生了抵触的心情。现在无奈要用到&#xff0c;需要抓紧学习了。 正文 1.数据模型 先说下刚用到的模型&#xff0c;模型也叫数据模型&#xff0c;也就是耳熟的MVC架构中的M&#xff08;Model…

我用笨办法啃下了一个开源项目的源码!

目录 1、从最简单的源码开始&#xff1a;别幻想一步登天 2、循序渐进&#xff1a;先搞定底层依赖的技术 3、一定要以Hello World作为入口来阅读 4、抓大放小&#xff0c;边写注释边画图 5、反复三遍&#xff0c;真正理解源码 6、借力打力&#xff0c;参考源码分析书籍及博客 7…

研一寒假C++复习笔记--引用的使用

​​​​​​​ 目录 1--引用的基本语法 2--引用的注意事项 3--在函数参数中使用引用 4--引用作函数的返回值 5--引用的本质 6--常量引用 1--引用的基本语法 引用相当于给变量起别名&#xff0c;其基本语法如下&#xff1a; 数据类型 &别名 原名 # include <…

Linux操作系统之进程信号

代码存放在&#xff1a;https://github.com/sjmshsh/System-Call-Learn/tree/master/signal 我们先来看一张图&#xff0c;了解一下通过阅读本博客&#xff0c;你可以收获什么。 背景知识 首先我说明一点 信号 ! 信号量 我们这篇文章讲解的是信号&#xff0c;不是信号量 信…

POJ3263. Tallest Cow题解(c++ 前缀和)

POJ3263. Tallest Cow 传送门&#xff1a;Tallest Cow 题目&#xff1a; 有N头牛站成一行。两头作能够相支看见&#xff0c;当且仅当它们中间的牛身高都比它们矮。现在&#xff0c;我们只知道其中最高的牛是第P头&#xff0c;它的身高是H&#xff0c;不知道剩余N-1头牛的身高。…

大数据之Kafka高级知识点

文章目录前言一、分片和副本机制&#xff08;一&#xff09;分片机制&#xff08;二&#xff09;副本二、Kafka如何保证数据不丢失&#xff08;一&#xff09;Producer生产者&#xff08;二&#xff09;Broker&#xff08;三&#xff09;Consumer消费者三、消息存储和查询机制总…

重新设计 TCP 协议

看一段关于 TCP 协议的历史讨论&#xff0c;源自&#xff1a;The design philosophy of the DARPA internet protocols 读这段文字时&#xff0c;你可能觉得这不是在谈 TCP&#xff0c;而是在创造一个新协议&#xff0c;但事实上这就是 TCP 在被创造过程中真实的纠结。 现在来…