Nacos源码解读09——配置中心配置信息创建修改怎么处理的

news2024/11/17 6:25:25

存储配置

在这里插入图片描述
从整体上Nacos服务端的配置存储分为三层:
内存:Nacos每个节点都在内存里缓存了配置,但是只包含配置的md5(缓存配置文件太多了),所以内存级别的配置只能用于比较配置是否发生了变更,只用于客户端长轮询配置等场景。
文件系统:文件系统配置来源于数据库写入的配置。如果是集群启动或mysql单机启动,服务端会以本地文件系统的配置响应客户端查询。
数据库:所有写数据都会先写入数据库。只有当以derby数据源(-DembeddedStorage=true)单机(-Dnacos.standalone=true)启动时,客户端的查询配置请求会实时查询derby数据库。

服务端创建修改配置

可以看到在服务端创建和修改配置的时候会调用到ConfigController的publishConfig中进行配置的发布
在这里插入图片描述

@PostMapping
    @Secured(action = ActionTypes.WRITE, signType = SignType.CONFIG)
    public Boolean publishConfig(HttpServletRequest request, @RequestParam(value = "dataId") String dataId,
            @RequestParam(value = "group") String group,
            @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
            @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
            @RequestParam(value = "appName", required = false) String appName,
            @RequestParam(value = "src_user", required = false) String srcUser,
            @RequestParam(value = "config_tags", required = false) String configTags,
            @RequestParam(value = "desc", required = false) String desc,
            @RequestParam(value = "use", required = false) String use,
            @RequestParam(value = "effect", required = false) String effect,
            @RequestParam(value = "type", required = false) String type,
            @RequestParam(value = "schema", required = false) String schema) throws NacosException {
        ......
        // 目标灰度机器的IP地址。
        String betaIps = request.getHeader("betaIps");
        //构建配置信息
        ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
        configInfo.setType(type);
        String encryptedDataKey = pair.getFirst();
        configInfo.setEncryptedDataKey(encryptedDataKey);
        //如果没配置灰度地址
        if (StringUtils.isBlank(betaIps)) {
             //没有配置标签
            if (StringUtils.isBlank(tag)) {
                //添加配置信息
                persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
                //发布一个配置变更事件
                ConfigChangePublisher.notifyConfigChange(
                        new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
            } else {
                persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
                ConfigChangePublisher.notifyConfigChange(
                        new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
            }
        } else {
            // 灰度发布
            configInfo.setEncryptedDataKey(encryptedDataKey);
            persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);
            ConfigChangePublisher.notifyConfigChange(
                    new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
        }
        //持久化日志
        ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(),
                InetUtils.getSelfIP(), ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
        return true;            

}

PersistService有两个实现类一个是EmbeddedStoragePersistServiceImpl(嵌入式数据源derby)
一个ExternalStoragePersistServiceImpl(外部数据源) 而我使用的是mysql做的配置的持久化 所以用到的是ExternalStoragePersistServiceImpl 将数据写到mysql 的config_info表里

    @Override
    public void insertOrUpdate(String srcIp, String srcUser, ConfigInfo configInfo, Timestamp time,
            Map<String, Object> configAdvanceInfo, boolean notify) {
        try {
             //添加配置
            addConfigInfo(srcIp, srcUser, configInfo, time, configAdvanceInfo, notify);
        } catch (DataIntegrityViolationException ive) { 
         // 2. 唯一约束冲突,更新
            updateConfigInfo(configInfo, srcIp, srcUser, time, configAdvanceInfo, notify);
        }
    }

将数据持久化

addConfigInfo尝试插入在一个事务里保存了config_info、config_tags_relation、his_config_info。

    @Override
    public void addConfigInfo(final String srcIp, final String srcUser, final ConfigInfo configInfo,
            final Timestamp time, final Map<String, Object> configAdvanceInfo, final boolean notify) {
        //tjc 就是TransactionTemplate 这里其实就是做的mysql的数据写入
        boolean result = tjt.execute(status -> {
            try {
                // 1. 保存config_info
                long configId = addConfigInfoAtomic(-1, srcIp, srcUser, configInfo, time, configAdvanceInfo);
                 // 2. 保存config_tags_relation
                String configTags = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("config_tags");
                addConfigTagsRelation(configId, configTags, configInfo.getDataId(), configInfo.getGroup(),
                        configInfo.getTenant());
                / 3. 记录日志his_config_info
                insertConfigHistoryAtomic(0, configInfo, srcIp, srcUser, time, "I");
            } catch (CannotGetJdbcConnectionException e) {
                LogUtil.FATAL_LOG.error("[db-error] " + e.toString(), e);
                throw e;
            }
            return Boolean.TRUE;
        });
    }

当发生了唯一索引冲突的时候会去修改配置信息

    @Override
    public void updateConfigInfo(final ConfigInfo configInfo, final String srcIp, final String srcUser,
            final Timestamp time, final Map<String, Object> configAdvanceInfo, final boolean notify) {
        boolean result = tjt.execute(status -> {
            try {
                // 1. 查询config_info
                ConfigInfo oldConfigInfo = findConfigInfo(configInfo.getDataId(), configInfo.getGroup(),
                        configInfo.getTenant());
                String appNameTmp = oldConfigInfo.getAppName();
                /*
                 If the appName passed by the user is not empty, use the persistent user's appName,
                 otherwise use db; when emptying appName, you need to pass an empty string
                 */
                if (configInfo.getAppName() == null) {
                    configInfo.setAppName(appNameTmp);
                }
                // 2. 更新config_info
                updateConfigInfoAtomic(configInfo, srcIp, srcUser, time, configAdvanceInfo);
                String configTags = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("config_tags");
                if (configTags != null) {
                    // 3. 删除所有老tag config_tags_relation
                    removeTagByIdAtomic(oldConfigInfo.getId());
                    // 4. 新增tag config_tags_relation
                    addConfigTagsRelation(oldConfigInfo.getId(), configTags, configInfo.getDataId(),
                            configInfo.getGroup(), configInfo.getTenant());
                }
                // 5. 记录日志
                insertConfigHistoryAtomic(oldConfigInfo.getId(), oldConfigInfo, srcIp, srcUser, time, "U");
            } catch (CannotGetJdbcConnectionException e) {
                LogUtil.FATAL_LOG.error("[db-error] " + e.toString(), e);
                throw e;
            }
            return Boolean.TRUE;
        });
    }

数据变更事件发布

当数据持久化之后会去发送一个配置数据变更的ConfigDataChangeEvent事件
AsyncNotifyService会监听到ConfigDataChangeEvent事件来进行处理

    @Autowired
    public AsyncNotifyService(ServerMemberManager memberManager) {
        this.memberManager = memberManager;
        
        // 将ConfigDataChangeEvent注册到NotifyCentre。
        NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
        
        // 注册订阅服务器以订阅ConfigDataChangeEvent。
        NotifyCenter.registerSubscriber(new Subscriber() {
            
            @Override
            public void onEvent(Event event) {
                // 如果是配置变更事件
                if (event instanceof ConfigDataChangeEvent) {
                    ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
                    long dumpTs = evt.lastModifiedTs;
                    String dataId = evt.dataId;
                    String group = evt.group;
                    String tenant = evt.tenant;
                    String tag = evt.tag;
                    //获取集群虽有节点列表
                    Collection<Member> ipList = memberManager.allMembers();
                    
                    // 每个节点组成一个Task
                    Queue<NotifySingleTask> httpQueue = new LinkedList<NotifySingleTask>();
                    Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<NotifySingleRpcTask>();
                    //遍历集群节点信息 
                    for (Member member : ipList) {
                        //判断是否是长轮询
                        if (!MemberUtil.isSupportedLongCon(member)) {
                          // 添加一个长轮询的异步dump任务
                            httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
                                    evt.isBeta));
                        } else {
                        // 添加一个长连接的异步dump任务
                            rpcQueue.add(
                                    new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));
                        }
                    }
                      // 判断并执行长轮询的异步dump任务
                    if (!httpQueue.isEmpty()) {
                        ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue));
                    }
                     // 判断并执行长连接的异步dump任务
                    if (!rpcQueue.isEmpty()) {
                        ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));
                    }
                    
                }
            }
            
            @Override
            public Class<? extends Event> subscribeType() {
                return ConfigDataChangeEvent.class;
            }
        });
    }

在接收到ConfigDataChangeEvent之后,如果Nacos2.0以上的版本,会创建一个RpcTask用以执行配置变更的通知,由内部类AsyncRpcTask执行,AsyncRpcTask具体逻辑如下所示。

    class AsyncRpcTask implements Runnable {
        
        private Queue<NotifySingleRpcTask> queue;
        
        public AsyncRpcTask(Queue<NotifySingleRpcTask> queue) {
            this.queue = queue;
        }
        
        @Override
        public void run() {
            while (!queue.isEmpty()) {
                NotifySingleRpcTask task = queue.poll();
                // 创建配置变更请求
                ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest();
                syncRequest.setDataId(task.getDataId());
                syncRequest.setGroup(task.getGroup());
                syncRequest.setBeta(task.isBeta);
                syncRequest.setLastModified(task.getLastModified());
                syncRequest.setTag(task.tag);
                syncRequest.setTenant(task.getTenant());
                Member member = task.member;
                 // 如果是自身的数据变更,直接执行dump操作
                if (memberManager.getSelf().equals(member)) {
                    if (syncRequest.isBeta()) {
                        // 同步Beta配置
                        dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
                                syncRequest.getLastModified(), NetUtils.localIP(), true);
                    } else {
                       // 像连接自己节点的client端进行配置信息的推送
                        dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
                                syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());
                    }
                    continue;
                }
                // 通知集群其他节点进行dump
                if (memberManager.hasMember(member.getAddress())) {
                    // start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
                    boolean unHealthNeedDelay = memberManager.isUnHealth(member.getAddress());
                    if (unHealthNeedDelay) {
                        // target ip is unhealthy, then put it in the notification list
                        ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
                                task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
                                0, member.getAddress());
                        // get delay time and set fail count to the task
                        asyncTaskExecute(task);
                    } else {
    
                        if (!MemberUtil.isSupportedLongCon(member)) {
                            asyncTaskExecute(
                                    new NotifySingleTask(task.getDataId(), task.getGroup(), task.getTenant(), task.tag,
                                            task.getLastModified(), member.getAddress(), task.isBeta));
                        } else {
                            try {
                                configClusterRpcClientProxy
                                        .syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task));
                            } catch (Exception e) {
                                MetricsMonitor.getConfigNotifyException().increment();
                                asyncTaskExecute(task);
                            }
                        }
                      
                    }
                } else {
                    //No nothig if  member has offline.
                }
                
            }
        }
    }

向连接自己的Client端发送配置变更通知

这里首先创建了一个ConfigChangeClusterSyncRequest,并将配置信息写入。然后获取集群信息,通知相应的Server处理的数据同步请求。同步配置变更信息的核心逻辑由DumpService来执行。我们主要查看同步正式配置的操作,DumpService的dump方法如下所示。

    public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
            boolean isBeta) {
        //dataId+分组    
        String groupKey = GroupKey2.getKey(dataId, group, tenant);
        //dataId+分组+是否灰度发布+标签
        String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);
        //添加dump任务
        dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
        DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
    }

在该方法中,这里会根据配置变更信息,提交一个异步的DumpTask任务,后续会由DumpProcessor类的process方法进行处理

DumpProcessor执行dump任务

public class DumpProcessor implements NacosTaskProcessor {
    
    final DumpService dumpService;
    
    public DumpProcessor(DumpService dumpService) {
        this.dumpService = dumpService;
    }
    
    @Override
    public boolean process(NacosTask task) {
        final PersistService persistService = dumpService.getPersistService();
        DumpTask dumpTask = (DumpTask) task;
        String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey());
        String dataId = pair[0];
        String group = pair[1];
        String tenant = pair[2];
        long lastModified = dumpTask.getLastModified();
        String handleIp = dumpTask.getHandleIp();
        boolean isBeta = dumpTask.isBeta();
        String tag = dumpTask.getTag();
        //构建 ConfigDumpEvent
        ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId)
                .group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp);
        //如果是灰度发布
        if (isBeta) {
            // if publish beta, then dump config, update beta cache
            ConfigInfo4Beta cf = persistService.findConfigInfo4Beta(dataId, group, tenant);
            
            build.remove(Objects.isNull(cf));
            build.betaIps(Objects.isNull(cf) ? null : cf.getBetaIps());
            build.content(Objects.isNull(cf) ? null : cf.getContent());
            build.encryptedDataKey(Objects.isNull(cf) ? null : cf.getEncryptedDataKey());
            
            return DumpConfigHandler.configDump(build.build());
        }
        //判断是否有标签 
        if (StringUtils.isBlank(tag)) {
           // 1. 查询当前配置
            ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant);
            
            build.remove(Objects.isNull(cf));
             // 2. 设置ConfigDumpEvent的content为最新的content
            build.content(Objects.isNull(cf) ? null : cf.getContent());
            build.type(Objects.isNull(cf) ? null : cf.getType());
            build.encryptedDataKey(Objects.isNull(cf) ? null : cf.getEncryptedDataKey());
        } else {
            ConfigInfo4Tag cf = persistService.findConfigInfo4Tag(dataId, group, tenant, tag);
            
            build.remove(Objects.isNull(cf));
            build.content(Objects.isNull(cf) ? null : cf.getContent());
            
           
        }
         // 3. 执行ConfigDumpEvent处理
        return DumpConfigHandler.configDump(build.build());
    }
}
    public static boolean configDump(ConfigDumpEvent event) {
        final String dataId = event.getDataId();
        final String group = event.getGroup();
        final String namespaceId = event.getNamespaceId();
        final String content = event.getContent();
        final String type = event.getType();
        final long lastModified = event.getLastModifiedTs();
        final String encryptedDataKey = event.getEncryptedDataKey();
        ......
                //真正执行dump请求的地方
                result = ConfigCacheService
                        .dump(dataId, group, namespaceId, content, lastModified, type, encryptedDataKey);
                
        ......
        
    }
    /**
     * Save config file and update md5 value in cache.
     *
     * @param dataId         dataId string value.
     * @param group          group string value.
     * @param tenant         tenant string value.
     * @param content        content string value.
     * @param lastModifiedTs lastModifiedTs.
     * @param type           file type.
     * @return dumpChange success or not.
     */
    public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
            String type, String encryptedDataKey) {
        String groupKey = GroupKey2.getKey(dataId, group, tenant);
        CacheItem ci = makeSure(groupKey, encryptedDataKey, false);
        ci.setType(type);
        final int lockResult = tryWriteLock(groupKey);
        assert (lockResult != 0);
        
        if (lockResult < 0) {
            DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);
            return false;
        }
        
        try {
            //获取md5
            final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
            if (lastModifiedTs < ConfigCacheService.getLastModifiedTs(groupKey)) {
                DUMP_LOG.warn("[dump-ignore] the content is old. groupKey={}, md5={}, lastModifiedOld={}, "
                                + "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),
                        lastModifiedTs);
                return true;
            }if (md5.equals(ConfigCacheService.getContentMd5(groupKey)) && DiskUtil.targetFile(dataId, group, tenant).exists()) {
                DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
                                + "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),
                        lastModifiedTs);
             //若直接读取,则从持久层获取数据      
            } else if (!PropertyUtil.isDirectRead()) {
                //持久化数据
                DiskUtil.saveToDisk(dataId, group, tenant, content);
            }
            //更新md5值
            updateMd5(groupKey, md5, lastModifiedTs, encryptedDataKey);
            return true;
        } catch (IOException ioe) {
            DUMP_LOG.error("[dump-exception] save disk error. " + groupKey + ", " + ioe);
            if (ioe.getMessage() != null) {
                String errMsg = ioe.getMessage();
                if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN) || errMsg
                        .contains(DISK_QUATA_EN)) {
                    // Protect from disk full.
                    FATAL_LOG.error("磁盘满自杀退出", ioe);
                    System.exit(0);
                }
            }
            return false;
        } finally {
            releaseWriteLock(groupKey);
        }
    }
    public static void updateMd5(String groupKey, String md5, long lastModifiedTs, String encryptedDataKey) {
        CacheItem cache = makeSure(groupKey, encryptedDataKey, false);
       
        if (cache.md5 == null || !cache.md5.equals(md5)) {
            cache.md5 = md5;
            cache.lastModifiedTs = lastModifiedTs;
            //发送本地数据变更事件
            NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
        }
    }

RpcConfigChangeNotifier

    @Override
    public void onEvent(LocalDataChangeEvent event) {
        String groupKey = event.groupKey;
        boolean isBeta = event.isBeta;
        List<String> betaIps = event.betaIps;
        String[] strings = GroupKey.parseKey(groupKey);
        String dataId = strings[0];
        String group = strings[1];
        String tenant = strings.length > 2 ? strings[2] : "";
        String tag = event.tag;
        //配置数据变更
        configDataChanged(groupKey, dataId, group, tenant, isBeta, betaIps, tag);
        
    }
    /**
     * adaptor to config module ,when server side config change ,invoke this method.
     *
     * @param groupKey groupKey
     */
    public void configDataChanged(String groupKey, String dataId, String group, String tenant, boolean isBeta,
            List<String> betaIps, String tag) {
        //获取注册的Client列表
        Set<String> listeners = configChangeListenContext.getListeners(groupKey);
        if (CollectionUtils.isEmpty(listeners)) {
            return;
        }
        int notifyClientCount = 0;
        //遍历client列表
        for (final String client : listeners) {
            //拿到grpc连接
            Connection connection = connectionManager.getConnection(client);
            if (connection == null) {
                continue;
            }
            
            ConnectionMeta metaInfo = connection.getMetaInfo();
            //beta ips check.
            String clientIp = metaInfo.getClientIp();
            String clientTag = metaInfo.getTag();
            if (isBeta && betaIps != null && !betaIps.contains(clientIp)) {
                continue;
            }
            //tag check
            if (StringUtils.isNotBlank(tag) && !tag.equals(clientTag)) {
                continue;
            }
            //构建请求参数
            ConfigChangeNotifyRequest notifyRequest = ConfigChangeNotifyRequest.build(dataId, group, tenant);
            //构建推送任务
            RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequest, 50, client, clientIp, metaInfo.getAppName());
            //推送任务 向客户端发送变更通知
            push(rpcPushRetryTask);
            notifyClientCount++;
        }
        Loggers.REMOTE_PUSH.info("push [{}] clients ,groupKey=[{}]", notifyClientCount, groupKey);
    }

这里实际上也是一个异步执行的过程,推送任务RpcPushTask会被提交到ClientConfigNotifierServiceExecutor来计划执行,第一次会立即推送配置,即调用RpcPushTask的run方法,如果失败则延迟重试次数x2的秒数再次执行,直到超过重试次数,主动注销当前连接

    private void push(RpcPushTask retryTask) {
        ConfigChangeNotifyRequest notifyRequest = retryTask.notifyRequest;
        // 判断是否重试次数达到限制
        if (retryTask.isOverTimes()) {
            Loggers.REMOTE_PUSH.warn(
                    "push callback retry fail over times .dataId={},group={},tenant={},clientId={},will unregister client.",
                    notifyRequest.getDataId(), notifyRequest.getGroup(), notifyRequest.getTenant(),
                    retryTask.connectionId);
                    // 主动注销连接
            connectionManager.unregister(retryTask.connectionId);
        } else if (connectionManager.getConnection(retryTask.connectionId) != null) {
            // first time :delay 0s; sencond time:delay 2s  ;third time :delay 4s
             // 尝试执行配置推送
            ConfigExecutor.getClientConfigNotifierServiceExecutor()
                    .schedule(retryTask, retryTask.tryTimes * 2, TimeUnit.SECONDS);
        } else {
            // client is already offline,ingnore task.
        }
        
    }

RpcPushTask

        @Override
        public void run() {
            tryTimes++;
            if (!tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH, connectionId, clientIp)) {
                push(this);
            } else {
                //推送任务
                rpcPushService.pushWithCallback(connectionId, notifyRequest, new AbstractPushCallBack(3000L) {
                    @Override
                    public void onSuccess() {
                        tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH_SUCCESS, connectionId, clientIp);
                    }
                    
                    @Override
                    public void onFail(Throwable e) {
                        tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH_FAIL, connectionId, clientIp);
                        Loggers.REMOTE_PUSH.warn("Push fail", e);
                        push(RpcPushTask.this);
                    }
                    
                }, ConfigExecutor.getClientConfigNotifierServiceExecutor());
                
            }
            
        }

客户端处理变更通知

ClientWorker

 private void initRpcClientHandler(final RpcClient rpcClientInner) {
             rpcClientInner.registerServerRequestHandler((request) -> {
                if (request instanceof ConfigChangeNotifyRequest) {
                    ConfigChangeNotifyRequest configChangeNotifyRequest = (ConfigChangeNotifyRequest) request;
                    LOGGER.info("[{}] [server-push] config changed. dataId={}, group={},tenant={}",
                            rpcClientInner.getName(), configChangeNotifyRequest.getDataId(),
                            configChangeNotifyRequest.getGroup(), configChangeNotifyRequest.getTenant());
                    String groupKey = GroupKey
                            .getKeyTenant(configChangeNotifyRequest.getDataId(), configChangeNotifyRequest.getGroup(),
                                    configChangeNotifyRequest.getTenant());
                    
                    CacheData cacheData = cacheMap.get().get(groupKey);
                    if (cacheData != null) {
                        synchronized (cacheData) {
                            cacheData.getLastModifiedTs().set(System.currentTimeMillis());
                            cacheData.setSyncWithServer(false);
                            //向阻塞队列中添加元素 触发长连接的执行
                            notifyListenConfig();
                        }
                        
                    }
                    //返回客户端响应
                    return new ConfigChangeNotifyResponse();
                }
                return null;
            });

}

后续逻辑处理看客户端的逻辑

向集群节点推送通知

AsyncRpcTask

 configClusterRpcClientProxy
                                        .syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task));
    public void syncConfigChange(Member member, ConfigChangeClusterSyncRequest request, RequestCallBack callBack)
            throws NacosException {
    
        clusterRpcClientProxy.asyncRequest(member, request, callBack);
        
    }
    public void asyncRequest(Member member, Request request, RequestCallBack callBack) throws NacosException {
        RpcClient client = RpcClientFactory.getClient(memberClientKey(member));
        if (client != null) {
            client.asyncRequest(request, callBack);
        } else {
            throw new NacosException(CLIENT_INVALID_PARAM, "No rpc client related to member: " + member);
        }
    }

ConfigChangeClusterSyncRequestHandler 处理ConfigChangeClusterSyncRequest

    @TpsControl(pointName = "ClusterConfigChangeNotify")
    @Override
    public ConfigChangeClusterSyncResponse handle(ConfigChangeClusterSyncRequest configChangeSyncRequest,
            RequestMeta meta) throws NacosException {
        //是否是灰度
        if (configChangeSyncRequest.isBeta()) {
            dumpService.dump(configChangeSyncRequest.getDataId(), configChangeSyncRequest.getGroup(),
                    configChangeSyncRequest.getTenant(), configChangeSyncRequest.getLastModified(), meta.getClientIp(),
                    true);
        } else {
           //向连接自己节点的 client端进行数据的同步
            dumpService.dump(configChangeSyncRequest.getDataId(), configChangeSyncRequest.getGroup(),
                    configChangeSyncRequest.getTenant(), configChangeSyncRequest.getLastModified(), meta.getClientIp());
        }
        return new ConfigChangeClusterSyncResponse();
    }

服务端配置变更总结

1.当从服务端进行配置的新增和修改会先将数据给持久化到内嵌的数据源或者外部的比如mysql中
2.然后发送配置变更的事件会将配置通过GRPC协议同步给连接自己的cleint端 和连接集群中其他节点的客户端

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1299074.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于SSM实现的公文管理系统

一、技术架构 前端&#xff1a;jsp | jquery | bootstrap 后端&#xff1a;spring | springmvc | mybatis 环境&#xff1a;jdk1.8 | mysql | maven 二、代码及数据库 三、功能介绍 01. 登录页 02. 首页 03. 系统管理-角色管理 04. 系统管理-功能管理 05. 系统管理-用…

[数据启示录 02] 堆栈

堆栈&#xff08;stack&#xff09;是一种基于后进先出&#xff08;LIFO&#xff0c;Last In First Out&#xff09;原则的数据结构。它模拟了现实生活中的堆栈&#xff0c;类似于一摞盘子或一堆书。 堆栈有两个基本操作&#xff1a;入栈&#xff08;push&#xff09;和出栈&a…

前端面试——CSS面经(持续更新)

1. CSS选择器及其优先级 !important > 行内样式 > id选择器 > 类/伪类/属性选择器 > 标签/伪元素选择器 > 子/后台选择器 > *通配符 2. 重排和重绘是什么&#xff1f;浏览器的渲染机制是什么&#xff1f; 重排(回流)&#xff1a;当增加或删除dom节点&…

SLAM算法与工程实践——SLAM基本库的安装与使用(4):Sophus库

SLAM算法与工程实践系列文章 下面是SLAM算法与工程实践系列文章的总链接&#xff0c;本人发表这个系列的文章链接均收录于此 SLAM算法与工程实践系列文章链接 下面是专栏地址&#xff1a; SLAM算法与工程实践系列专栏 文章目录 SLAM算法与工程实践系列文章SLAM算法与工程实践…

数据结构之交换排序

目录 交换排序 冒泡排序 冒泡排序的时间复杂度 快速排序 快速排序单趟排序的时间复杂度 快速排序的时间复杂度 快速排序的优化 优化1&#xff1a;三数取中法 优化2&#xff1a;小区间优化法 交换排序 在日常生活中交换排序的使用场景是很多的&#xff0c;比如在学校做…

第76讲:MySQL数据库中常用的命令行工具的基本使用

文章目录 1.mysql客户端命令工具2.mysqladmin管理数据库的客户端工具3.mysqlbinlog查看数据库中的二进制日志4.mysqlshow统计数据库中的信息5.mysqldump数据库备份工具6.mysqllimport还原备份的数据7.source命令还原SQL类型的备份文件 MySQL数据库提供了很多的命令行工具&#…

Linux系统---简易伙伴系统

顾得泉&#xff1a;个人主页 个人专栏&#xff1a;《Linux操作系统》 《C/C》 《LeedCode刷题》 键盘敲烂&#xff0c;年薪百万&#xff01; 一、题目要求 1.采用C语言实现 2.伙伴系统采用free_area[11]数组来组织。要求伙伴内存最小为一个页面&#xff0c;页面大小为4KB…

2023年电工(初级)证模拟考试题库及电工(初级)理论考试试题

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2023年电工&#xff08;初级&#xff09;证模拟考试题库及电工&#xff08;初级&#xff09;理论考试试题是由安全生产模拟考试一点通提供&#xff0c;电工&#xff08;初级&#xff09;证模拟考试题库是根据电工&…

esxi全称“VMware ESXi

esxi全称“VMware ESXi”&#xff0c;是可直接安装在物理服务器上的强大的裸机管理系统&#xff0c;是一款虚拟软件&#xff1b;ESXi本身可以看做一个操作系统&#xff0c;采用Linux内核&#xff0c;安装方式为裸金属方式&#xff0c;可直接安装在物理服务器上&#xff0c;不需…

Kubernetes架构及核心部件

文章目录 1、Kubernetes集群概述1.1、概述1.2、通过声明式API即可 2、Kubernetes 集群架构2.1、Master 组件2.1.1、API Server2.1.2、集群状态存储2.1.3、控制器管理器2.1.4、调度器 2.2、Worker Node 组件2.2.1、kubelet2.2.2、容器运行时环境2.2.3、kube-proxy 2.3、图解架构…

大数据Doris(三十五):Unique模型(唯一主键)介绍

文章目录 Unique模型(唯一主键)介绍 一、创建doris表 二、插入数据

LANDSAT_7/02/T1/TOA的Landsat7_C2_TOA类数据集

Landsat7_C2_TOA数据集是将数据每个波段的辐射亮度值转换为大气层顶表观反射率TOA&#xff0c;是飞行在大气层之外的航天传感器量测的反射率&#xff0c;包括了云层、气溶胶和气体的贡献&#xff0c;可通过辐射亮度定标参数、太阳辐照度、太阳高度角和成像时间等几个参数计算得…

P9 LinuxC 进程概述 终端启动的程序父进程是终端

前言 &#x1f3ac; 个人主页&#xff1a;ChenPi &#x1f43b;推荐专栏1: 《C_ChenPi的博客-CSDN博客》✨✨✨ &#x1f525; 推荐专栏2: 《Linux C应用编程&#xff08;概念类&#xff09;_ChenPi的博客-CSDN博客》✨✨✨ &#x1f6f8;推荐专栏3: ​​​​​​《链表_ChenP…

Qt实现二维码生成和识别

一、简介 QZxing开源库: 生成和识别条码和二维码 下载地址&#xff1a;https://gitcode.com/mirrors/ftylitak/qzxing/tree/master 二、编译与使用 1.下载并解压&#xff0c;解压之后如图所示 2.编译 打开src目录下的QZXing.pro&#xff0c;选择合适的编译器进行编译 最后生…

Ansible中执行流控制

1.ansible中的迭代循环 创建目录和文件 vim createfile.yaml - name: create file playbook hosts: all tasks: - name: create file file: path: "/mnt/{{item[name]}}" state: …

华为ensp实验——基于全局地址池的DHCP组网实验

目录 前言实验目的实验内容实验结果 前言 该实验基于华为ensp&#xff0c;版本号是1.3.00.100 V100R003C00SPC100&#xff0c;只供学习和参考&#xff0c;不作任何商业用途。 具体的DHCP命令可以看系列文章链接&#xff0c;计算机网络实验&#xff08;华为eNSP模拟器&#xff…

机场信息集成系统系列介绍(2):机场航班报文处理系统

本文介绍机场航班报文处理系统。#机场##sita##AFTN##航空# 一、定义 机场航班报文处理系统是一种基于计算机技术的自动化处理系统&#xff0c;用于接收、解析、处理和传递与航班相关的报文信息。这些报文可能包括航班计划、航班状态更新、旅客信息等&#xff0c;通常来源于航…

UniGui使用CSSUniTreeMenu滚动条

有些人反应UniTreeMenu当菜单项目比较多的时候会超出但是没有出滚动条&#xff0c;只需要添加如下CSS 老规矩&#xff0c;unitreemeu的layout的componentcls里添加bbtreemenu&#xff0c;然后在css里添加 .bbtreemenu .x-box-item{ overflow-y: auto; } 然后当内容超出后就会…

AI 赋能 | 智能制造的 AI 算法开发和工程实现

谈到智能制造、智慧工厂&#xff0c;愿景是美好的&#xff0c;借助计算机视觉技术和 AI 算法&#xff0c;为自动化生产线赋予环境感知的能力&#xff0c;从而改善工艺流程&#xff0c;提高生产效率。但是&#xff0c;随着柔性化生产的需求增长&#xff0c;产线的布局调整和功能…

文章解读与仿真程序复现思路——电力自动化设备EI\CSCD\北大核心《考虑源网荷效益的峰谷电价与峰谷时段双层优化模型》

这个标题涉及到电力定价和能源效益的优化模型。让我来分解一下&#xff1a; 峰谷电价&#xff1a;这是一种电力定价策略&#xff0c;即在一天内不同时间段设定不同的电价。通常&#xff0c;高峰时段&#xff08;需求高&#xff09;的电价相对较高&#xff0c;而低谷时段&#x…