Apollo配置中心之Server端

news2024/11/15 15:45:01

一、通知机制

二、架构思考

1、配置变更如何通知客户端?

(1)如何建立长轮询?

2、客户端如何拉取数据?

(1)如何拉取数据?

3、如何发现变更数据?

(1)为什么使用Config Service定时扫描ReleaseMessage的方式?

(2)为什么不采用Client调用Config Service直接查询的方式?

三、源码剖析

1、配置监听

1.1、建立长轮询

1.1.1、逻辑描述
1.1.2、时序图

1.1.3、代码位置
1.1.3.1、NotificationControllerV2#pollNotification
@RestController
@RequestMapping("/notifications/v2")
public class NotificationControllerV2 implements ReleaseMessageListener {
    ...
    
    private final Multimap<String, DeferredResultWrapper> deferredResults = Multimaps.synchronizedSetMultimap(TreeMultimap.create(String.CASE_INSENSITIVE_ORDER, Ordering.natural()));
    
    @GetMapping
    public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification(
            @RequestParam(value = "appId") String appId,
            @RequestParam(value = "cluster") String cluster,
            @RequestParam(value = "notifications") String notificationsAsString,
            @RequestParam(value = "dataCenter", required = false) String dataCenter,
            @RequestParam(value = "ip", required = false) String clientIp) {
        List<ApolloConfigNotification> notifications = null;

        // 反序列化
        try {
            notifications =
                    gson.fromJson(notificationsAsString, notificationsTypeReference);
        } catch (Throwable ex) {
            Tracer.logError(ex);
        }
        // (非核心,不关注)
        Map<String, ApolloConfigNotification> filteredNotifications = filterNotifications(appId, notifications);

        // (核心流程,重点关注)
        // 使用Wrapper封装DeferredResult,利用Spring的DeferredResult + tomcat实现长轮询。
        DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper(bizConfig.longPollingTimeoutInMilli());
        Set<String> namespaces = Sets.newHashSetWithExpectedSize(filteredNotifications.size());
        Map<String, Long> clientSideNotifications = Maps.newHashMapWithExpectedSize(filteredNotifications.size());

        for (Map.Entry<String, ApolloConfigNotification> notificationEntry : filteredNotifications.entrySet()) {
            String normalizedNamespace = notificationEntry.getKey();
            ApolloConfigNotification notification = notificationEntry.getValue();
            namespaces.add(normalizedNamespace);
            clientSideNotifications.put(normalizedNamespace, notification.getNotificationId());
            if (!Objects.equals(notification.getNamespaceName(), normalizedNamespace)) {
                // namespace名的关系映射(非核心,不关注)
                deferredResultWrapper.recordNamespaceNameNormalizedResult(notification.getNamespaceName(), normalizedNamespace);
            }
        }

        // watchedKeysMap 格式: namespace : appId_cluster_namespace
        Multimap<String, String> watchedKeysMap = watchKeysUtil.assembleAllWatchKeys(appId, cluster, namespaces, dataCenter);

        Set<String> watchedKeys = Sets.newHashSet(watchedKeysMap.values());

        /**
         * 1、set deferredResult before the check, for avoid more waiting
         * If the check before setting deferredResult,it may receive a notification the next time
         * when method handleMessage is executed between check and set deferredResult.
         */
        deferredResultWrapper
                .onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys"));
        
        // 完成时执行,将对应deferredResultWrapper从deferredResults中移除,表示该本次长轮询结束
        deferredResultWrapper.onCompletion(() -> {
            //unregister all keys
            for (String key : watchedKeys) {
                deferredResults.remove(key, deferredResultWrapper);
            }
            logWatchedKeys(watchedKeys, "Apollo.LongPoll.CompletedKeys");
        });
        // (核心流程,重点关注)
        // watchedKey : 格式:appId_cluster_namespace
        // register all keys
        // 将namespace:deferredResult注册至deferredResults(Map容器)中。
        // 多个namespace对应同一个deferredResult。当namespace发生变化时,就会从deferredResults找到对应的deferredResult,通知客户端。
        // 思考:DeferredResult是什么?和长轮询有什么关系?可以和其他的异步工具有什么区别?
        for (String key : watchedKeys) {
            this.deferredResults.put(key, deferredResultWrapper);
        }

        /**
         * 2、check new release
         */
        // (核心流程,重点关注)
        // 同步检测是否有最新版本,通过WatchedKeys(格式:appId_cluster_namespace)拉取最新通知信息(如果有变更直接返回,并不会等后续通知)。
        List<ReleaseMessage> latestReleaseMessages = releaseMessageService.findLatestReleaseMessagesGroupByMessages(watchedKeys);

        /**
         * Manually close the entity manager.
         * Since for async request, Spring won't do so until the request is finished,
         * which is unacceptable since we are doing long polling - means the db connection would be hold
         * for a very long time
         */
        entityManagerUtil.closeEntityManager();

        // (核心流程,重点关注)
        // 对latestReleaseMessages进行封装,将其封装成ApolloConfigNotification类型
        // 此处ApolloConfigNotification中只返回配置发生变更的namespace及其对应的notificationId
        List<ApolloConfigNotification> newNotifications = getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap, latestReleaseMessages);

        // (核心流程,重点关注)
        // 注意:这里是同步返回,如果有查询发现有最新版本,直接返回,不需要等待通知。
        // DeferredResult则需要用户在代码中手动set值到DeferredResult,否则即便异步线程中的任务执行完毕,DeferredResult仍然不会向客户端返回任何结果。
        // 如果是有新配置,则通过handleMessage函数向deferredResultWrapper#setResult赋值.
        if (!CollectionUtils.isEmpty(newNotifications)) {
            // 非主动变更,其他情况通过此处进行相应。
            deferredResultWrapper.setResult(newNotifications);
        }

        return deferredResultWrapper.getResult();
    }

    /**
    * 此处ApolloConfigNotification中只返回配置发生变更的namespace及其对应的notificationId
    **/
    private List<ApolloConfigNotification> getApolloConfigNotifications(Set<String> namespaces,
                                                                        Map<String, Long> clientSideNotifications,
                                                                        Multimap<String, String> watchedKeysMap,
                                                                        List<ReleaseMessage> latestReleaseMessages) {
        List<ApolloConfigNotification> newNotifications = Lists.newArrayList();
        // 判断是否查询到namespace的最新版本消息
        if (!CollectionUtils.isEmpty(latestReleaseMessages)) {
            Map<String, Long> latestNotifications = Maps.newHashMap();
            for (ReleaseMessage releaseMessage : latestReleaseMessages) {
                latestNotifications.put(releaseMessage.getMessage(), releaseMessage.getId());
            }

            // 遍历namespace
            for (String namespace : namespaces) {
                long clientSideId = clientSideNotifications.get(namespace);
                long latestId = ConfigConsts.NOTIFICATION_ID_PLACEHOLDER;
                Collection<String> namespaceWatchedKeys = watchedKeysMap.get(namespace);
                for (String namespaceWatchedKey : namespaceWatchedKeys) {
                    // 获取最新版本的namespace对应的nofiticationId
                    long namespaceNotificationId =
                            latestNotifications.getOrDefault(namespaceWatchedKey, ConfigConsts.NOTIFICATION_ID_PLACEHOLDER);
                    if (namespaceNotificationId > latestId) {
                        latestId = namespaceNotificationId;
                    }
                }
                // 如果Config Service中的namespace对应的通知编号大于Client上传的namespace对应的通知编号,则说明有配置变更,就执行封装动作,将最新的namespace对应的通知编号(notificationId)返回
                if (latestId > clientSideId) {
                    ApolloConfigNotification notification = new ApolloConfigNotification(namespace, latestId);
                    namespaceWatchedKeys.stream().filter(latestNotifications::containsKey).forEach(namespaceWatchedKey ->
                            notification.addMessage(namespaceWatchedKey, latestNotifications.get(namespaceWatchedKey)));
                    newNotifications.add(notification);
                }
            }
        }
        return newNotifications;
    }

}
1.1.3.2、ReleaseMessageServiceWithCache#findLatestReleaseMessagesGroupByMessages
@Service
public class ReleaseMessageServiceWithCache implements ReleaseMessageListener, InitializingBean {

  private ConcurrentMap<String, ReleaseMessage> releaseMessageCache;

  ...
  public List<ReleaseMessage> findLatestReleaseMessagesGroupByMessages(Set<String> messages) {
    // messages格式:appId_cluster_namespace
    if (CollectionUtils.isEmpty(messages)) {
      return Collections.emptyList();
    }
    List<ReleaseMessage> releaseMessages = Lists.newArrayList();

    // 此处的message命名为namespaces更合适
    for (String message : messages) {
      // 获取缓存中namespace的版本信息
      ReleaseMessage releaseMessage = releaseMessageCache.get(message);
      if (releaseMessage != null) {
        releaseMessages.add(releaseMessage);
      }
    }

    return releaseMessages;
  }
  ...
    
}

1.2、刷新ReleaseMessage缓存

1.3.1、逻辑描述

更新 ReleaseMessages,管理 releaseMessageCache,其中键为 appId_cluster_namespace,值为通知编号 notificationId。

1.3.2、代码位置
1.3.2.1、ReleaseMessageServiceWithCache#afterPropertiesSet
@Service
public class ReleaseMessageServiceWithCache implements ReleaseMessageListener, InitializingBean {

  // 维护空间最新的,结构为appId_cluster_namespace : notificationId
  private ConcurrentMap<String, ReleaseMessage> releaseMessageCache;

  private AtomicBoolean doScan;
  private ExecutorService executorService;

  private void initialize() {
    releaseMessageCache = Maps.newConcurrentMap();
    doScan = new AtomicBoolean(true);
    // 此处初始化的是单线程的线程池,不带定时任务。
    executorService = Executors.newSingleThreadExecutor(ApolloThreadFactory
        .create("ReleaseMessageServiceWithCache", true));
  }

  public List<ReleaseMessage> findLatestReleaseMessagesGroupByMessages(Set<String> messages) {
    // messages格式:appId_cluster_namespace
    if (CollectionUtils.isEmpty(messages)) {
      return Collections.emptyList();
    }
    List<ReleaseMessage> releaseMessages = Lists.newArrayList();

    for (String message : messages) {
      ReleaseMessage releaseMessage = releaseMessageCache.get(message);
      if (releaseMessage != null) {
        releaseMessages.add(releaseMessage);
      }
    }

    return releaseMessages;
  }

  @Override
  public void handleMessage(ReleaseMessage message, String channel) {
    //Could stop once the ReleaseMessageScanner starts to work
    doScan.set(false);
    logger.info("message received - channel: {}, message: {}", channel, message);

    String content = message.getMessage();
    Tracer.logEvent("Apollo.ReleaseMessageService.UpdateCache", String.valueOf(message.getId()));
    if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {
      return;
    }

    // 计算本地ReleaseMessageId与触发handleMessage的ReleaseMessageId的gap
    long gap = message.getId() - maxIdScanned;
    // 如果gap等于1,直接合并
    if (gap == 1) {
      mergeReleaseMessage(message);
    } else if (gap > 1) {
      //gap found!
      // 如果gap大于1,加载gap间缺失的ReleaseMessage查询出来,并将新查询出的ReleaseMessages与历史数据进行比较,releaseMessageCache中维护最新的ReleaseMessageId(notificationId)
      loadReleaseMessages(maxIdScanned);
    }
  }

  @Override
  public void afterPropertiesSet() throws Exception {
    // 读取配置
    populateDataBaseInterval();
    //block the startup process until load finished
    //this should happen before ReleaseMessageScanner due to autowire
    // 初始化,拉取ReleaseMessages
    loadReleaseMessages(0);
    // 异步拉取增量ReleaseMessages。(注意:这里executorService不是定时任务,而是单线程的线程池)
    // 目的:处理初始化时拉取ReleaseMessages产生的遗漏问题
    // 这里可以理解为fix(可以不关注)
    executorService.submit(() -> {
      while (doScan.get() && !Thread.currentThread().isInterrupted()) {
        Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageServiceWithCache",
            "scanNewReleaseMessages");
        try {
          // 加载ReleaseMessages
          loadReleaseMessages(maxIdScanned);
          transaction.setStatus(Transaction.SUCCESS);
        } catch (Throwable ex) {
          transaction.setStatus(ex);
          logger.error("Scan new release messages failed", ex);
        } finally {
          transaction.complete();
        }
        try {
          scanIntervalTimeUnit.sleep(scanInterval);
        } catch (InterruptedException e) {
          //ignore
        }
      }
    });
  }

  /**
   *
   * @param releaseMessage
   */
  private synchronized void mergeReleaseMessage(ReleaseMessage releaseMessage) {
    ReleaseMessage old = releaseMessageCache.get(releaseMessage.getMessage());
    // 判断当前ReleaseMessages的id是否大于历史ReleaseMessages的id,如果大于则更新缓存
    if (old == null || releaseMessage.getId() > old.getId()) {
      // message 内容为: appId_cluster_namespace
      releaseMessageCache.put(releaseMessage.getMessage(), releaseMessage);
      maxIdScanned = releaseMessage.getId();
    }
  }

  private void loadReleaseMessages(long startId) {
    boolean hasMore = true;
    while (hasMore && !Thread.currentThread().isInterrupted()) {
      //current batch is 500
      // 此处逻辑和AppNamespaceServiceWithCache的一样
      // 批量获取大于startId的500条ReleaseMessages数据(返回升序)
      // 思考:需要扫描才能知道最新的消息ID,这样的设计不太好
      List<ReleaseMessage> releaseMessages = releaseMessageRepository
          .findFirst500ByIdGreaterThanOrderByIdAsc(startId);
      if (CollectionUtils.isEmpty(releaseMessages)) {
        break;
      }
      // 将新查询出的ReleaseMessages与历史数据进行比较,releaseMessageCache中维护最新的ReleaseMessageId(notificationId)
      releaseMessages.forEach(this::mergeReleaseMessage);
      // 获取新的startId,作为当前的最新数据标记,便于后续在此startId基础上拉取后续新的ReleaseMessages
      int scanned = releaseMessages.size();
      startId = releaseMessages.get(scanned - 1).getId();
      // 当拉取数据(scanned)大于500时,说明后续还有数据,则继续执行进入while中,否则退出
      hasMore = scanned == 500;
      logger.info("Loaded {} release messages with startId {}", scanned, startId);
    }
  }

2、变更推送

Admin Service将发布后的配置,通过消息的方式发送给Config Service,然后Config Service通知对应的Client。此处可以通过消息中间件来实现消息的生产与发现,但考虑到一个中间件的引入的同时也会带来很多不确定性隐患,所以通过数据库的方式实现消息的生产与消费。

2.1、触发变更

2.1.1、逻辑描述

用户的操作发布后,通过AdminService向数据库中的ReleaseMessage表插入配置变更通知编号。

2.1.2、代码位置
2.1.3.1、DatabaseMessageSender#sendMessage
@Component
public class DatabaseMessageSender implements MessageSender {
  private BlockingQueue<Long> toClean = Queues.newLinkedBlockingQueue(CLEAN_QUEUE_MAX_SIZE);
  private final ExecutorService cleanExecutorService;

  private final ReleaseMessageRepository releaseMessageRepository;

  public DatabaseMessageSender(final ReleaseMessageRepository releaseMessageRepository) {
    cleanExecutorService = Executors.newSingleThreadExecutor(ApolloThreadFactory.create("DatabaseMessageSender", true));
    cleanStopped = new AtomicBoolean(false);
    this.releaseMessageRepository = releaseMessageRepository;
  }

  @Override
  @Transactional
  public void sendMessage(String message, String channel) {
    logger.info("Sending message {} to channel {}", message, channel);
    // 只发布APOLLO_RELEASE_TOPIC的数据
    if (!Objects.equals(channel, Topics.APOLLO_RELEASE_TOPIC)) {
      logger.warn("Channel {} not supported by DatabaseMessageSender!", channel);
      return;
    }

    Tracer.logEvent("Apollo.AdminService.ReleaseMessage", message);
    Transaction transaction = Tracer.newTransaction("Apollo.AdminService", "sendMessage");
    try {
      // 存储 ReleaseMessage
      ReleaseMessage newMessage = releaseMessageRepository.save(new ReleaseMessage(message));
      // 添加到阻塞阻塞队列中,成功失败都会立即返回(会在初始化时,开启一个线程处理toClean)
      toClean.offer(newMessage.getId());
      transaction.setStatus(Transaction.SUCCESS);
    } catch (Throwable ex) {
      logger.error("Sending message to database failed", ex);
      transaction.setStatus(ex);
      throw ex;
    } finally {
      transaction.complete();
    }
  }
  ...
}
2.1.3.2、ReleaseMessage
@Entity
@Table(name = "ReleaseMessage")
public class ReleaseMessage {
  // 自增ID
  @Id
  @GeneratedValue(strategy = GenerationType.IDENTITY)
  @Column(name = "Id")
  private long id;

  // message 内容为: appId_cluster_namespace
  @Column(name = "Message", nullable = false)
  private String message;

  // 更新时间
  @Column(name = "DataChange_LastTime")
  private Date dataChangeLastModifiedTime;
}

2.2、感知变更

2.2.1、逻辑描述

在bean初始化结束后执行ReleaseMessageScanner#afterPropertiesSet函数中的操作,定时扫描数据库,获取最大的ReleaseMessageId。一旦有新的ReleaseMessage就会立即通过fireMessageScanned通知监听器。

2.2.2、时序图

2.2.3、代码位置
2.2.3.1、ReleaseMessageScanner#afterPropertiesSet
public class ReleaseMessageScanner implements InitializingBean {
  @Autowired
  private ReleaseMessageRepository releaseMessageRepository;
  private int databaseScanInterval;
  private final List<ReleaseMessageListener> listeners;
  private final ScheduledExecutorService executorService;
  private final Map<Long, Integer> missingReleaseMessages; // missing release message id => age counter
  private long maxIdScanned;

  public ReleaseMessageScanner() {
    listeners = Lists.newCopyOnWriteArrayList();
    executorService = Executors.newScheduledThreadPool(1, ApolloThreadFactory
        .create("ReleaseMessageScanner", true));
    missingReleaseMessages = Maps.newHashMap();
  }

  @Override
  public void afterPropertiesSet() throws Exception {
    databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli();
    // 获取最大的ReleaseMessageId
    maxIdScanned = loadLargestMessageId();
    executorService.scheduleWithFixedDelay(() -> {
      Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage");
      try {
        scanMissingMessages();
        // 扫描数据库中的ReleaseMessage
        scanMessages();
        transaction.setStatus(Transaction.SUCCESS);
      } catch (Throwable ex) {
        transaction.setStatus(ex);
        logger.error("Scan and send message failed", ex);
      } finally {
        transaction.complete();
      }
    }, databaseScanInterval, databaseScanInterval, TimeUnit.MILLISECONDS);

  }

  /**
   * Scan messages, continue scanning until there is no more messages
   */
  private void scanMessages() {
    boolean hasMoreMessages = true;
    while (hasMoreMessages && !Thread.currentThread().isInterrupted()) {
      // 循环获取最新ReleaseMessage
      hasMoreMessages = scanAndSendMessages();
    }
  }

  /**
   * scan messages and send
   *
   * @return whether there are more messages
   */
  private boolean scanAndSendMessages() {
    // 批量获取500条ReleaseMessage数据(升序)
    //current batch is 500
    List<ReleaseMessage> releaseMessages =
        releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
    if (CollectionUtils.isEmpty(releaseMessages)) {
      return false;
    }
    // 通知Listener,触发监听器
    fireMessageScanned(releaseMessages);
    int messageScanned = releaseMessages.size();
    long newMaxIdScanned = releaseMessages.get(messageScanned - 1).getId();
    // check id gaps, possible reasons are release message not committed yet or already rolled back
    if (newMaxIdScanned - maxIdScanned > messageScanned) {
      recordMissingReleaseMessageIds(releaseMessages, maxIdScanned);
    }
    maxIdScanned = newMaxIdScanned;
    // 一次拉取500条数据,如果超过500条则分配多次拉取
    return messageScanned == 500;
  }

  private void scanMissingMessages() {
    Set<Long> missingReleaseMessageIds = missingReleaseMessages.keySet();
    Iterable<ReleaseMessage> releaseMessages = releaseMessageRepository
        .findAllById(missingReleaseMessageIds);
    fireMessageScanned(releaseMessages);
    releaseMessages.forEach(releaseMessage -> {
      missingReleaseMessageIds.remove(releaseMessage.getId());
    });
    growAndCleanMissingMessages();
  }

  /**
   * Notify listeners with messages loaded
   * @param messages
   */
  private void fireMessageScanned(Iterable<ReleaseMessage> messages) {
    for (ReleaseMessage message : messages) {
      for (ReleaseMessageListener listener : listeners) {
        try {
          // 通知Listener,触发监听器
          listener.handleMessage(message, Topics.APOLLO_RELEASE_TOPIC);
        } catch (Throwable ex) {
          Tracer.logError(ex);
          logger.error("Failed to invoke message listener {}", listener.getClass(), ex);
        }
      }
    }
  }
}

2.3、推送变更

2.3.1、逻辑描述

向与ConfigSerivce建立监听长轮询的Client端推送变更的配置信息,为了避免“惊群效应”出现,会使用线程池分批进行消息推送。

2.3.3、代码位置
2.3.3.1、NotificationControllerV2#handleMessage
@RestController
@RequestMapping("/notifications/v2")
public class NotificationControllerV2 implements ReleaseMessageListener {

    @Override
    public void handleMessage(ReleaseMessage message, String channel) {
        logger.info("message received - channel: {}, message: {}", channel, message);

        String content = message.getMessage();
        Tracer.logEvent("Apollo.LongPoll.Messages", content);
        if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {
            return;
        }

        // 获取对应namespace
        String changedNamespace = retrieveNamespaceFromReleaseMessage.apply(content);

        if (Strings.isNullOrEmpty(changedNamespace)) {
            logger.error("message format invalid - {}", content);
            return;
        }

        if (!deferredResults.containsKey(content)) {
            return;
        }

        //create a new list to avoid ConcurrentModificationException
        List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get(content));

        ApolloConfigNotification configNotification = new ApolloConfigNotification(changedNamespace, message.getId());
        configNotification.addMessage(content, message.getId());

        //do async notification if too many clients
        // 使用线程池分批进行消息推送,避免“惊群效应”出现
        if (results.size() > bizConfig.releaseMessageNotificationBatch()) {
            largeNotificationBatchExecutorService.submit(() -> {
                logger.debug("Async notify {} clients for key {} with batch {}", results.size(), content,
                        bizConfig.releaseMessageNotificationBatch());
                for (int i = 0; i < results.size(); i++) {
                    if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0) {
                        try {
                            TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli());
                        } catch (InterruptedException e) {
                            //ignore
                        }
                    }
                    logger.debug("Async notify {}", results.get(i));
                    results.get(i).setResult(configNotification);
                }
            });
            return;
        }

        logger.debug("Notify {} clients for key {}", results.size(), content);

        // 将变更消息设置进DeferredResult中
        for (DeferredResultWrapper result : results) {
            result.setResult(configNotification);
        }
        logger.debug("Notification completed");
    }
}

3、配置拉取

3.1、构建缓存

3.1.1、逻辑描述

为了降低数据库的查询压力,会将热点数据缓存至GuavaCache中。ConfigServiceWithCache会构建两个缓存,分别是configCache和configIdCache,其中configCache可以通过空间名称查询具体配置,configIdCache可以通过通知编号(notificationId)查询具体配置。

3.1.2、代码位置

3.1.2.1、ConfigServiceWithCache#initialize

public class ConfigServiceWithCache extends AbstractConfigService {

  ...

  private LoadingCache<String, ConfigCacheEntry> configCache;

  private LoadingCache<Long, Optional<Release>> configIdCache;

  /**
   * 初始化配置加载
   */
  @PostConstruct
  void initialize() {
    // key为namespace,value为通知编号(notificationId)及其对应的配置信息(Release)
    // 访问后配置缓存有效时间为60分钟
    configCache = CacheBuilder.newBuilder()
        .expireAfterAccess(DEFAULT_EXPIRED_AFTER_ACCESS_IN_MINUTES, TimeUnit.MINUTES)
        .build(new CacheLoader<String, ConfigCacheEntry>() {
          @Override
          public ConfigCacheEntry load(String key) throws Exception {
            List<String> namespaceInfo = STRING_SPLITTER.splitToList(key);
            if (namespaceInfo.size() != 3) {
              Tracer.logError(
                  new IllegalArgumentException(String.format("Invalid cache load key %s", key)));
              return nullConfigCacheEntry;
            }

            Transaction transaction = Tracer.newTransaction(TRACER_EVENT_CACHE_LOAD, key);
            try {
              // 加载ReleaseMessage对象,具体就是通知编号notificationId,ReleaseMessage中包含参数有id、message(内容:appId_cluster_namespace)、dataChangeLastModifiedTime
              ReleaseMessage latestReleaseMessage = releaseMessageService.findLatestReleaseMessageForMessages(Lists
                  .newArrayList(key));
              // 获取Release 最新配置信息
              Release latestRelease = releaseService.findLatestActiveRelease(namespaceInfo.get(0), namespaceInfo.get(1),
                  namespaceInfo.get(2));

              transaction.setStatus(Transaction.SUCCESS);

              // 获取通知编号
              long notificationId = latestReleaseMessage == null ? ConfigConsts.NOTIFICATION_ID_PLACEHOLDER : latestReleaseMessage
                  .getId();

              if (notificationId == ConfigConsts.NOTIFICATION_ID_PLACEHOLDER && latestRelease == null) {
                return nullConfigCacheEntry;
              }

              // 缓存key为通知编号(notificationId),value为对应的具体配置信息(Release)
              return new ConfigCacheEntry(notificationId, latestRelease);
            } catch (Throwable ex) {
              transaction.setStatus(ex);
              throw ex;
            } finally {
              transaction.complete();
            }
          }
        });
    // key为通知编号(notificationId),value为具体配置信息
    configIdCache = CacheBuilder.newBuilder()
        .expireAfterAccess(DEFAULT_EXPIRED_AFTER_ACCESS_IN_MINUTES, TimeUnit.MINUTES)
        .build(new CacheLoader<Long, Optional<Release>>() {
          @Override
          public Optional<Release> load(Long key) throws Exception {
            Transaction transaction = Tracer.newTransaction(TRACER_EVENT_CACHE_LOAD_ID, String.valueOf(key));
            try {
              // key为notificationId,value为具体配置
              Release release = releaseService.findActiveOne(key);

              transaction.setStatus(Transaction.SUCCESS);

              return Optional.ofNullable(release);
            } catch (Throwable ex) {
              transaction.setStatus(ex);
              throw ex;
            } finally {
              transaction.complete();
            }
          }
        });
  }
}

3.2、查询配置

3.2.1、逻辑描述

通过前序长轮询流程通知,获取的namespace对应的最新通知编号(notificationId),来查询最新配置。

3.2.2、时序图

3.2.3、代码位置
3.2.3.1、ConfigController#queryConfig
public class ConfigController {
    
  private final ConfigService configService;
  private final AppNamespaceServiceWithCache appNamespaceService;
  
  ...

  @GetMapping(value = "/{appId}/{clusterName}/{namespace:.+}")
  public ApolloConfig queryConfig(@PathVariable String appId, @PathVariable String clusterName,
                                  @PathVariable String namespace,
                                  @RequestParam(value = "dataCenter", required = false) String dataCenter,
                                  @RequestParam(value = "releaseKey", defaultValue = "-1") String clientSideReleaseKey,
                                  @RequestParam(value = "ip", required = false) String clientIp,
                                  @RequestParam(value = "messages", required = false) String messagesAsString,
                                  HttpServletRequest request, HttpServletResponse response) throws IOException {
    String originalNamespace = namespace;
    //strip out .properties suffix
    namespace = namespaceUtil.filterNamespaceName(namespace);
    //fix the character case issue, such as FX.apollo <-> fx.apollo
    namespace = namespaceUtil.normalizeNamespace(appId, namespace);

    if (Strings.isNullOrEmpty(clientIp)) {
      clientIp = tryToGetClientIp(request);
    }

    // 反序列化
    ApolloNotificationMessages clientMessages = transformMessages(messagesAsString);

    List<Release> releases = Lists.newLinkedList();

    String appClusterNameLoaded = clusterName;
    if (!ConfigConsts.NO_APPID_PLACEHOLDER.equalsIgnoreCase(appId)) {
      //(核心逻辑,重点关注)加载配置信息
      Release currentAppRelease = configService.loadConfig(appId, clientIp, appId, clusterName, namespace,
          dataCenter, clientMessages);

      if (currentAppRelease != null) {
        releases.add(currentAppRelease);
        //we have cluster search process, so the cluster name might be overridden
        appClusterNameLoaded = currentAppRelease.getClusterName();
      }
    }

    //if namespace does not belong to this appId, should check if there is a public configuration
    // 如果namespace不属于当前appId,而是属于公共的配置文件。具体应用场景,就是应用A向共享自己的配置给其他应用,就可以将其自身的配置文件设置成public类型的
    if (!namespaceBelongsToAppId(appId, namespace)) {
      Release publicRelease = this.findPublicConfig(appId, clientIp, clusterName, namespace,
          dataCenter, clientMessages);
      if (Objects.nonNull(publicRelease)) {
        releases.add(publicRelease);
      }
    }

    if (releases.isEmpty()) {
      response.sendError(HttpServletResponse.SC_NOT_FOUND,
          String.format(
              "Could not load configurations with appId: %s, clusterName: %s, namespace: %s",
              appId, clusterName, originalNamespace));
      Tracer.logEvent("Apollo.Config.NotFound",
          assembleKey(appId, clusterName, originalNamespace, dataCenter));
      return null;
    }

    auditReleases(appId, clusterName, dataCenter, clientIp, releases);

    // 格式是:私有的ReleaseKey1+私有的ReleaseKey2+public的ReleaseKey1+public的ReleaseKey1
    String mergedReleaseKey = releases.stream().map(Release::getReleaseKey)
            .collect(Collectors.joining(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR));

    // Client端上的ReleaseKey与Server端key相同,则没有配置没有变更
    if (mergedReleaseKey.equals(clientSideReleaseKey)) {
      // Client side configuration is the same with server side, return 304
      response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
      Tracer.logEvent("Apollo.Config.NotModified",
          assembleKey(appId, appClusterNameLoaded, originalNamespace, dataCenter));
      return null;
    }

    ApolloConfig apolloConfig = new ApolloConfig(appId, appClusterNameLoaded, originalNamespace,
        mergedReleaseKey);
    // 合并配置信息
    apolloConfig.setConfigurations(mergeReleaseConfigurations(releases));

    Tracer.logEvent("Apollo.Config.Found", assembleKey(appId, appClusterNameLoaded,
        originalNamespace, dataCenter));
    return apolloConfig;
  }
}
3.2.3.2、AbstractConfigService#loadConfig
public abstract class AbstractConfigService implements ConfigService {
  @Autowired
  private GrayReleaseRulesHolder grayReleaseRulesHolder;

  /**
   * 加载配置
   * @return
   */
  @Override
  public Release loadConfig(String clientAppId, String clientIp, String configAppId, String configClusterName,
      String configNamespace, String dataCenter, ApolloNotificationMessages clientMessages) {
    // load from specified cluster first
    // 从指定cluster拉取配置
    if (!Objects.equals(ConfigConsts.CLUSTER_NAME_DEFAULT, configClusterName)) {
      // 查找配置
      Release clusterRelease = findRelease(clientAppId, clientIp, configAppId, configClusterName, configNamespace,
          clientMessages);

      if (Objects.nonNull(clusterRelease)) {
        return clusterRelease;
      }
    }

    // try to load via data center
    // 从指定的dataCenter的cluster加载配置
    if (!Strings.isNullOrEmpty(dataCenter) && !Objects.equals(dataCenter, configClusterName)) {
      Release dataCenterRelease = findRelease(clientAppId, clientIp, configAppId, dataCenter, configNamespace,
          clientMessages);
      if (Objects.nonNull(dataCenterRelease)) {
        return dataCenterRelease;
      }
    }

    // fallback to default release
    // 不指定,走默认
    return findRelease(clientAppId, clientIp, configAppId, ConfigConsts.CLUSTER_NAME_DEFAULT, configNamespace,
        clientMessages);
  }

  /**
   * 查找配置信息
   */
  private Release findRelease(String clientAppId, String clientIp, String configAppId, String configClusterName,
      String configNamespace, ApolloNotificationMessages clientMessages) {
    // 获取namespace的灰度发布的配置编号
    Long grayReleaseId = grayReleaseRulesHolder.findReleaseIdFromGrayReleaseRule(clientAppId, clientIp, configAppId,
        configClusterName, configNamespace);

    Release release = null;

    // 通过灰度的配置编号获取具体配置信息
    if (grayReleaseId != null) {
      release = findActiveOne(grayReleaseId, clientMessages);
    }

    // 如果没有灰度发布的信息,则直接获取namespace最新的配置信息
    if (release == null) {
      //(核心逻辑,重点关注)通过appId + cluster + namespace,拉取最新配置信息
      release = findLatestActiveRelease(configAppId, configClusterName, configNamespace, clientMessages);
    }

    return release;
  }
}
3.2.3.3、ConfigServiceWithCache#findLatestActiveRelease

从Cache或DB中查询namespace的最新配置信息

public class ConfigServiceWithCache extends AbstractConfigService {

  ...
    
  private LoadingCache<String, ConfigCacheEntry> configCache;

  private LoadingCache<Long, Optional<Release>> configIdCache;

  @Override
  protected Release findActiveOne(long id, ApolloNotificationMessages clientMessages) {
    Tracer.logEvent(TRACER_EVENT_CACHE_GET_ID, String.valueOf(id));
    return configIdCache.getUnchecked(id).orElse(null);
  }

  /**
   * (核心逻辑,重点关注)拉取数据,本地缓存没有就拉取DB
   * @return
   */
  @Override
  protected Release findLatestActiveRelease(String appId, String clusterName, String namespaceName,
                                            ApolloNotificationMessages clientMessages) {
    String key = ReleaseMessageKeyGenerator.generate(appId, clusterName, namespaceName);

    Tracer.logEvent(TRACER_EVENT_CACHE_GET, key);

    // 获取namespace对应的缓存配置信息,此处key为(appId+clusterName+namespaceName),获取到的信息包括通知编号(notificationId和具体配置信息Release)
    ConfigCacheEntry cacheEntry = configCache.getUnchecked(key);

    //cache is out-dated
    // 缓存过期
    if (clientMessages != null && clientMessages.has(key) &&
        clientMessages.get(key) > cacheEntry.getNotificationId()) {
      //invalidate the cache and try to load from db again
      // 清除缓存(guava cache)
      invalidate(key);
      // 重新从DB中拉取缓存,获取该namespace下的最新通知编号(notificationId)及其对应的配置信息
      cacheEntry = configCache.getUnchecked(key);
    }

    // 如果缓存的版本信息大于当前客户端所上传的版本,则直接返回最新配置信息
    return cacheEntry.getRelease();
  }

  private void invalidate(String key) {
    configCache.invalidate(key);
    Tracer.logEvent(TRACER_EVENT_CACHE_INVALIDATE, key);
  }

  @Override
  public void handleMessage(ReleaseMessage message, String channel) {
    logger.info("message received - channel: {}, message: {}", channel, message);
    if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(message.getMessage())) {
      return;
    }

    try {
      // 清除缓存
      invalidate(message.getMessage());

      //warm up the cache
      // 重新从DB中拉取缓存,获取该namespace下的最新通知编号(notificationId)及其对应的配置信息
      configCache.getUnchecked(message.getMessage());
    } catch (Throwable ex) {
      //ignore
    }
  }

  private static class ConfigCacheEntry {
    private final long notificationId;
    private final Release release;

    public ConfigCacheEntry(long notificationId, Release release) {
      this.notificationId = notificationId;
      this.release = release;
    }

    public long getNotificationId() {
      return notificationId;
    }

    public Release getRelease() {
      return release;
    }
  }
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1430962.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

全国医院及文体设施相关情况数据,shp+excel格式,多字段可查询,所见即所得

基本信息. 数据名称: 全国医院及文体设施相关情况数据 数据格式: shpexcel 数据几何类型: 面 数据坐标系: WGS84 数据来源&#xff1a;网络公开数据 数据字段&#xff1a; 序号字段名称字段说明1province省级名称2city城市名称3city_dm城市代码4tsgcss2020公共图书馆图…

vulhub中 Apache Airflow Celery 消息中间件命令执行漏洞复现(CVE-2020-11981)

Apache Airflow是一款开源的&#xff0c;分布式任务调度框架。在其1.10.10版本及以前&#xff0c;如果攻击者控制了Celery的消息中间件&#xff08;如Redis/RabbitMQ&#xff09;&#xff0c;将可以通过控制消息&#xff0c;在Worker进程中执行任意命令。 1.利用这个漏洞需要控…

[基础IO]文件描述符{重定向/perror/磁盘结构/inode/软硬链接}

文章目录 1. 再识重定向2.浅谈perror()3.初始文件系统4.软硬链接 1. 再识重定向 图解./sf > file.txt 2>&1 1中内容拷贝给2 使得2指向file 再学一个 把file的内容传给cat cat拿到后再给file2 2.浅谈perror() open()接口调用失败返回-1,并且错误码errno被适当的设置,…

Hive 主要内容一览

Hive架构 用户接口&#xff1a;Client CLI&#xff08;command-line interface&#xff09;、JDBC/ODBC(jdbc访问hive) 元数据&#xff1a;Metastore 元数据包括&#xff1a;表名、表所属的数据库&#xff08;默认是default&#xff09;、表的拥有者、列/分区字段、表的类型&am…

CTF(5)

一、[SWPUCTF 2021 新生赛]ez_caesar 1、题目 import base64 def caesar(plaintext):str_list list(plaintext)i 0while i < len(plaintext):if not str_list[i].isalpha():str_list[i] str_list[i]else:a "A" if str_list[i].isupper() else "a"…

vulhub中Adminer远程文件读取漏洞复现(CVE-2021-43008)

Adminer是一个PHP编写的开源数据库管理工具&#xff0c;支持MySQL、MariaDB、PostgreSQL、SQLite、MS SQL、Oracle、Elasticsearch、MongoDB等数据库。 在其版本1.12.0到4.6.2之间存在一处因为MySQL LOAD DATA LOCAL导致的文件读取漏洞。 参考链接&#xff1a; https://gith…

端到端实现高精地图重建(TopoNet解读和横评)

论文出处 [2304.05277] Graph-based Topology Reasoning for Driving Scenes (arxiv.org)https://arxiv.org/abs/2304.05277 TopoNet TopoNet的目标是从车辆上安装的多视角摄像头获取图像&#xff0c;感知实体并推理出驾驶场景的拓扑关系&#xff0c;实现端到端预测&#xf…

2017年苏州大学837复试机试C/C++

2017年苏州大学复试机试 要求 要求用C/C编程&#xff1b;对程序中必要的地方进行注释。上机规则 请在电脑桌面上新建一个文件夹文件夹名为考试姓名&#xff08;中文&#xff09;&#xff1b;考试完毕后&#xff0c;将所编写的文件放在上述文件中。 第一题&#xff08;20分&…

Node.js-1

Node.js 简介 定义&#xff1a;Node.js 是一个跨平台 JavaScript 运行环境&#xff0c;使开发者可以搭建服务器端的 JavaScript 应用程序 为什么 Node.js 能执行 JS 代码&#xff1a; Chrome 浏览器能执行 JS 代码&#xff0c;依靠的是内核中的 V8引擎&#xff08;即&#x…

react 使用react-seamless-scroll实现无缝滚动

文章目录 1. 实现无缝滚动效果2. react-seamless-scroll 无缝滚动案例介绍3. react 项目集成3.1 项目引入 cssSeamlessScroll 滚动组件3.2 完整代码3.2.1 newBet.tsx 代码3.2.2 index.module.scss 1. 实现无缝滚动效果 实现单步向下滚动点击更多展开&#xff0c;收起&#xff0…

[Angular 基础] - Angular 渲染过程 组件的创建

[Angular 基础] - Angular 渲染过程 & 组件的创建 之前的笔记为了推进度写的太笼统了&#xff08;只有功能没有其他&#xff09;&#xff0c;当时学的时候知道是什么东西&#xff0c;但是学完后重新复习发现有些内容就记不清了&#xff0c;所以重新用自己的语言总结一下 …

[晓理紫]每日论文分享(有中文摘要,源码或项目地址)--强化学习、模仿学习、机器人

专属领域论文订阅 关注{晓理紫}&#xff0c;每日更新论文&#xff0c;如感兴趣&#xff0c;请转发给有需要的同学&#xff0c;谢谢支持 如果你感觉对你有所帮助&#xff0c;请关注我&#xff0c;每日准时为你推送最新论文。 为了答谢各位网友的支持&#xff0c;从今日起免费为3…

基于YOLOv8算法的照片角度分类项目实践

目录 一、任务概述二、YOLOv8算法简介2.1 算法改进2.2 算法特点2.3 网络结构2.4 性能比较 三、工程实践3.1 安装算法框架库ultralytics3.2 库存照片预处理3.2.1 提取所有图片3.2.2 去除冗余的相同照片3.2.3 去除无车辆照片3.2.4 随机提取指定数量的图片 3.3 照片朝向分类3.3.1 …

项目02《游戏-06-开发》Unity3D

基于 项目02《游戏-05-开发》Unity3D &#xff0c; 接下来做 背包系统的 存储框架 &#xff0c; 首先了解静态数据 与 动态数据&#xff0c;静态代表不变的数据&#xff0c;比如下图武器Icon&#xff0c; 其中&#xff0c;武器的名称&#xff0c;描述&#xff…

宠物空气净化器哪个牌子好?除猫毛好的猫用空气净化器牌子推荐

大家都知道&#xff0c;宠物掉毛的情况有多么严重。特别是在换毛的季节&#xff0c;简直就是毛发遍地飞。这给家里有小孩和老人的人带来了很多困扰&#xff0c;他们可能会流鼻涕、过敏等等。而且&#xff0c;宠物有时候也会随地大小便&#xff0c;那个味道真的很难闻。家里的人…

【揭秘】JMeter JDBC脚本实战,让你的性能测试更高效!

Jmeter使用jdbc的场景&#xff1a; 1、接口功能测试时&#xff0c;需要查询验证码 2、通过数据库查询已经注册的手机号码 3、性能测试时&#xff0c;直接对某个SQL做性能测试&#xff0c;快速的发现性能问题 添加一个jdbc的配置元件 配置jdbc连接信息 配置说明&#xff1a; 1…

如何看待敏捷

局部清晰&#xff0c;循序渐进&#xff0c;整体清晰增量型 考试要么预测&#xff08;传统&#xff0c;瀑布&#xff09;&#xff0c;要么敏捷&#xff0c;要么就用混合方法 项目生命周期两种&#xff1a;预测型、敏捷型 开发生命周期四种&#xff1a;预测型、迭代型、增量型、…

JVM工作原理与实战(三十四):解决GC问题的方法

专栏导航 JVM工作原理与实战 RabbitMQ入门指南 从零开始了解大数据 目录 专栏导航 前言 一、常见的垃圾回收&#xff08;GC&#xff09;模式 二、解决GC问题的方法 1.优化基础JVM参数 2.更换垃圾回收器 3.优化垃圾回收器的参数 总结 前言 JVM作为Java程序的运行环境&a…

龙龙送外卖pta[代码+讲解]

题目 题解 代码 题目 龙龙是“饱了呀”外卖软件的注册骑手&#xff0c;负责送帕特小区的外卖。帕特小区的构造非常特别&#xff0c;都是双向道路且没有构成环 —— 你可以简单地认为小区的路构成了一棵树&#xff0c;根结点是外卖站&#xff0c;树上的结点就是要送餐的地址…

钓鱼攻击:深度解析与防范策略

一、引言 在当今的网络世界中&#xff0c;钓鱼攻击已经成为一种日益猖獗的威胁。这种攻击方式利用电子邮件、社交媒体或其他在线平台&#xff0c;伪装成可信赖的来源&#xff0c;诱导受害者点击恶意链接或下载恶意附件&#xff0c;进而窃取个人信息或实施其他恶意行为。本文将…