【架构师视角系列】Apollo配置中心之Client端(二)

news2024/10/1 7:43:49

原创文章,转载请标注。https://blog.csdn.net/leeboyce/article/details/135733075

文章目录

    • 声明
    • 配置中心系列文章
    • 一、客户端架构
      • 1、Config Service职责
        • (1)配置管理
        • (2)配置发布
        • (3)配置读取
      • 2、Apollo Client 职责
        • (1)配置拉取
        • (2)配置注入
        • (3)配置变更监听
      • 3、基本交互流程
        • (1)应用启动
        • (2)配置变更通知
        • (3)配置更新
        • (4)配置注入
    • 二、架构思考
        • (1)配置拉取的设计
        • (2)配置的注入方式
        • (3)配置变更的通知机制
        • (4)为什么配置拉取拆分为两个请求?
        • (5)长轮询的概念
        • (6)为什么需要做本地文件缓存?
    • 三、源码剖析
      • 1、初始化
        • (1)逻辑描述
        • (2)时序图
      • 2、查找注解
        • (1)逻辑描述
        • (2)时序图
        • (3)代码位置
      • 3、建立连接
        • (1)逻辑描述
        • (2)时序图
        • (3)具体函数
      • 4、拉取配置
        • (1)逻辑描述
        • (2)时序图
        • (3)代码实现
          • a)配置初始化加载(trySync)
          • b)周期配置拉取(schedulePeriodicRefresh)
          • c)长轮询监听与最新配置拉取(scheduleLongPollingRefresh)
      • 5、变更通知
        • (1)逻辑描述
        • (2)时序图
      • 6、配置注入
        • (1)逻辑描述
        • (2)代码位置
      • 四、最后

声明

原创文章,转载请标注。https://www.cnblogs.com/boycelee/p/17967590
《码头工人的一千零一夜》是一位专注于技术干货分享的博主,追随博主的文章,你将深入了解业界最新的技术趋势,以及在Java开发和安全领域的实用经验分享。无论你是开发人员还是对逆向工程感兴趣的爱好者,都能在《码头工人的一千零一夜》找到有价值的知识和见解。

配置中心系列文章

《【架构师视角系列】Apollo配置中心之架构设计(一)》https://blog.csdn.net/leeboyce/article/details/135588206
《【架构师视角系列】Apollo配置中心之Client端(二)》https://blog.csdn.net/leeboyce/article/details/135733075

一、客户端架构

在这里插入图片描述

架构介绍会从分层、职责、关系以及运行负责四个维度进行描述。

1、Config Service职责

(1)配置管理

Config Service 是Apollo配置中心的服务端组件,负责管理应用程序的配置信息。它存储和维护应用程序的各种配置项。

(2)配置发布

Config Service 负责将最新的配置发布给注册在它上面的Apollo Client。当配置发生变更时,Config Service 负责通知所有订阅了相应配置的客户端。

(3)配置读取

Apollo Client 向 Config Service 发送请求,获取应用程序的配置信息。

2、Apollo Client 职责

(1)配置拉取

Apollo Client 负责向 Config Service 发送配置拉取请求,获取三方应用程序的配置。

(2)配置注入

Apollo Client 将从 Config Service 获取到的配置注入到三方应用程序中。

(3)配置变更监听

Apollo Client 可以注册对配置变更的监听器。当 Config Service 发布新的配置时,Apollo Client 能够感知到配置的变更,并触发相应的操作。

3、基本交互流程

(1)应用启动

Apollo Client 在应用启动时向 Config Service 发送配置拉取请求,获取初始的配置。

(2)配置变更通知

Config Service 在配置发生变更时,通知所有注册的 Apollo Client。

(3)配置更新

Apollo Client 接收到配置变更通知后,向 Config Service 发送请求,获取最新的配置。

(4)配置注入

Apollo Client 将获取到的最新配置注入到应用程序中,以便使用最新的配置信息。

通过以上交互流程达到应用不需要重启,动态配置变更的目的。

二、架构思考

架构师视角系列,在分析一款组件的源码时,需要深入思考其设计背后的动机。以下是读者在阅读本篇文章时应思考的问题:

(1)配置拉取的设计
  • 思考点: 设计中采用的配置拉取方式是如何选择的?背后的动机是什么?可能的考虑包括系统性能、可维护性和安全性。
(2)配置的注入方式
  • 思考点: 配置是如何被注入到组件中的?这种注入方式有何优势?设计选择的原因可能涉及松耦合、动态变化和代码可维护性等方面。
(3)配置变更的通知机制
  • 思考点: 配置变更是如何通知其他组件的?为什么选择当前的通知机制?可能的考虑包括实时性、效率以及系统整体的架构要求。
(4)为什么配置拉取拆分为两个请求?
  • 思考点: 配置拉取为何拆分为两个独立的请求?这个设计决策的目的是什么?可能涉及到性能优化、可伸缩性以及减轻服务器负担的考虑。
(5)长轮询的概念
  • 思考点: 什么是长轮询?为何在配置方案中选择使用它?长轮询的优势在哪里?可能涉及到减少轮询频率、降低网络开销以及更及时的配置变更通知。
(6)为什么需要做本地文件缓存?
  • 思考点: 为什么在组件中引入了本地文件缓存的机制?这样的设计有哪些优点?可能牵涉到性能优化、离线支持以及用户体验的方面。

在深入研究源码时,理解这些设计决策背后的原因,有助于更全面地理解系统架构,并为自己的设计提供有价值的启示。

三、源码剖析

在这里插入图片描述

1、初始化

(1)逻辑描述

通过实现Spring框架提供的BeanPostProcessor接口,并完成postProcessBeforeInitialization函数的实现,我们能够在Bean初始化之前执行自定义的操作。BeanPostProcessor是Spring框架提供的一个扩展点,允许我们在Bean初始化前后插入自定义逻辑。在postProcessBeforeInitialization函数中,我们有机会遍历Bean的成员变量和函数,实现在初始化之前对它们进行定制化处理的需求。

(2)时序图
#### (3)代码位置

ApolloProcessor#postProcessBeforeInitialization

为了讲解更加顺畅,会沿着Method上的注解@ApolloConfigChangeListener实现逻辑进行讲解。

public abstract class ApolloProcessor implements BeanPostProcessor, PriorityOrdered {

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName)
    throws BeansException {
        Class clazz = bean.getClass();
        // 遍历Bean中的成员变量
        for (Field field : findAllField(clazz)) {
            processField(bean, beanName, field);
        }
        // 遍历Bean中的所有函数(根据这条逻辑进行讲解)
        for (Method method : findAllMethod(clazz)) {
            processMethod(bean, beanName, method);
        }
        return bean;
    }
    ...
}

2、查找注解

(1)逻辑描述

创建配置变化的监听器,并创建namespace对应的config实例,将监听器注册到config实例中。当发生配置变更时,会调用监听器的onChange函数,并利用反射机制通知对应的函数(使用@ApolloConfigChange)。

(2)时序图
image-20220504231427524
(3)代码位置

ApolloAnnotationProcessor#processMethod

public class ApolloAnnotationProcessor extends ApolloProcessor implements BeanFactoryAware,
    EnvironmentAware {
  ...
  @Override
  protected void processMethod(final Object bean, String beanName, final Method method) {
    // 处理函数上的注解(@ApolloConfigChange)(关注这里)
    this.processApolloConfigChangeListener(bean, method);
    this.processApolloJsonValue(bean, beanName, method);
  }

  private void processApolloConfigChangeListener(final Object bean, final Method method) {
    ApolloConfigChangeListener annotation = AnnotationUtils
        .findAnnotation(method, ApolloConfigChangeListener.class);
    if (annotation == null) {
      return;
    }
    Class<?>[] parameterTypes = method.getParameterTypes();
    Preconditions.checkArgument(parameterTypes.length == 1,
        "Invalid number of parameters: %s for method: %s, should be 1", parameterTypes.length,
        method);
    Preconditions.checkArgument(ConfigChangeEvent.class.isAssignableFrom(parameterTypes[0]),
        "Invalid parameter type: %s for method: %s, should be ConfigChangeEvent", parameterTypes[0],
        method);

    ReflectionUtils.makeAccessible(method);
    String[] namespaces = annotation.value();
    String[] annotatedInterestedKeys = annotation.interestedKeys();
    String[] annotatedInterestedKeyPrefixes = annotation.interestedKeyPrefixes();
    // 创建配置变化监听器。当配置发生变化时,会调用onChange函数并使用反射触发标识@ApolloConfigChange的Method
    ConfigChangeListener configChangeListener = new ConfigChangeListener() {
      @Override
      public void onChange(ConfigChangeEvent changeEvent) {
        ReflectionUtils.invokeMethod(method, bean, changeEvent);
      }
    };

    Set<String> interestedKeys =
        annotatedInterestedKeys.length > 0 ? Sets.newHashSet(annotatedInterestedKeys) : null;
    Set<String> interestedKeyPrefixes =
        annotatedInterestedKeyPrefixes.length > 0 ? Sets.newHashSet(annotatedInterestedKeyPrefixes)
            : null;

    // 遍历namespace
    for (String namespace : namespaces) {
      final String resolvedNamespace = this.environment.resolveRequiredPlaceholders(namespace);
      // 创建(获取)Config实例(关注这里)
      Config config = ConfigService.getConfig(resolvedNamespace);

      // 注册监听器
      if (interestedKeys == null && interestedKeyPrefixes == null) {
        // 将创建的监听器注册到namespace对应的config实例中(关注这里)
        config.addChangeListener(configChangeListener);
      } else {
        config.addChangeListener(configChangeListener, interestedKeys, interestedKeyPrefixes);
      }
    }
  }
  ...
}

3、建立连接

(1)逻辑描述

为注解(@ApolloConfigChange)绑定的namespace创建Config实例,Config实例中会会为namespace创建本地配置仓库(createLocalConfigRepository处理本地配置存储)和远程配置仓库(createRemoteConfigRepository处理远程ConfigService配置拉取)。

(2)时序图
image-20220504231427524
(3)具体函数

ConfigService#getConfig

public class ConfigService {

  // 创建一个ConfigService单例
  private static final ConfigService s_instance = new ConfigService();

  // 获取 m_configManager 与 m_configRegistry单例
  private volatile ConfigManager m_configManager;
  private volatile ConfigRegistry m_configRegistry;
    
  // 获取nanespae对应的config实例(关注这里)
  public static Config getConfig(String namespace) {
    return s_instance.getManager().getConfig(namespace);
  }

(2)具体函数:DefaultConfigManager#getConfig

public class DefaultConfigManager implements ConfigManager {

  @Override
  public Config getConfig(String namespace) {
    Config config = m_configs.get(namespace);
    // 每个namespace创建一个Config对象
    if (config == null) {
      synchronized (this) {
        config = m_configs.get(namespace);
        if (config == null) {
          ConfigFactory factory = m_factoryManager.getFactory(namespace);
          //config对象中有,拉取远程和本地仓库(Repository)
          config = factory.create(namespace);
          m_configs.put(namespace, config);
        }
      }
    }
    return config;
  }
}

(3)具体函数:DefaultConfigFactory#create

创建顺序是:1)创建远端存储仓库,从Config Service中拉取配置数据;2)创建本地存储仓库,将远端拉取到的配置文件存储到本地文件中;3)实例化Config,以供后续获取配置信息使用。

public class DefaultConfigFactory implements ConfigFactory {
  ...
  @Override
  public Config create(String namespace) {
    ConfigFileFormat format = determineFileFormat(namespace);
    if (ConfigFileFormat.isPropertiesCompatible(format)) {
      return this.createRepositoryConfig(namespace, createPropertiesCompatibleFileConfigRepository(namespace, format));
    }
    // (关注这里)。调用createLocalConfigRepository函数,创建LocalConfigRepository,建立本地存储仓库
    return this.createRepositoryConfig(namespace, createLocalConfigRepository(namespace));
  }

  LocalFileConfigRepository createLocalConfigRepository(String namespace) {
    if (m_configUtil.isInLocalMode()) {
      logger.warn(
          "==== Apollo is in local mode! Won't pull configs from remote server for namespace {} ! ====",
          namespace);
      return new LocalFileConfigRepository(namespace);
    }
    // (关注这里)。调用createRemoteConfigRepository函数,创建RemoteConfigRepository,建立远程存储仓库
    return new LocalFileConfigRepository(namespace, createRemoteConfigRepository(namespace));
  }
    
  RemoteConfigRepository createRemoteConfigRepository(String namespace) {
    return new RemoteConfigRepository(namespace);
  }
  ...
}

4、拉取配置

(1)逻辑描述

配置拉取主要分为三个关键步骤:1)初始化加载配置;2)定期拉取配置;3)通过长轮询进行刷新。在这其中,长轮询刷新阶段又分为两个请求:1)配置更新通知(通过长轮询实现);2)详细配置拉取。通过这个流程,系统能够实现配置的及时更新,确保应用程序始终使用最新的配置信息。

(2)时序图
image-20220504231427524
(3)代码实现

具体函数:RemoteConfigRepository#RemoteConfigRepository

public class RemoteConfigRepository extends AbstractConfigRepository {
  ...
  public RemoteConfigRepository(String namespace) {
    m_namespace = namespace;
    m_configCache = new AtomicReference<>();
    m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
    m_httpClient = ApolloInjector.getInstance(HttpClient.class);
    m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
    remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
    m_longPollServiceDto = new AtomicReference<>();
    m_remoteMessages = new AtomicReference<>();
    m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
    m_configNeedForceRefresh = new AtomicBoolean(true);
    m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
        m_configUtil.getOnErrorRetryInterval() * 8);
    // 初始化加载配置
    this.trySync();
    // 周期拉取配置(apollo定时兜底)
    this.schedulePeriodicRefresh();
    // 长轮询
    this.scheduleLongPollingRefresh();
  }
    
}
a)配置初始化加载(trySync)

具体函数:AbstractConfigRepository#trySync

时序图:
image-20220504231427524

逻辑描述:namespace初始化加载配置

public abstract class AbstractConfigRepository implements ConfigRepository {  
  ...
  // 获取配置内容
  protected boolean trySync() {
    try {
      // (关注这里)获取配置
      sync();
      return true;
    } catch (Throwable ex) {
      Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
      logger
          .warn("Sync config failed, will retry. Repository {}, reason: {}", this.getClass(), ExceptionUtil
              .getDetailMessage(ex));
    }
    return false;
  }
  ...
}

具体函数:RemoteConfigRepository#sync()

public class RemoteConfigRepository extends AbstractConfigRepository {
  ...
  @Override
  protected synchronized void sync() {
    Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");

    try {
      ApolloConfig previous = m_configCache.get();
      // 加载配置(关注这里)
      ApolloConfig current = loadApolloConfig();

      //reference equals means HTTP 304
      if (previous != current) {
        logger.debug("Remote Config refreshed!");
        m_configCache.set(current);
        // 通知Repository监听器,配置发生变化(关注这里)
        this.fireRepositoryChange(m_namespace, this.getConfig());
      }

      if (current != null) {
        Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
            current.getReleaseKey());
      }

      transaction.setStatus(Transaction.SUCCESS);
    } catch (Throwable ex) {
      transaction.setStatus(ex);
      throw ex;
    } finally {
      transaction.complete();
    }
  }

  // 加载配置
  private ApolloConfig loadApolloConfig() {
    // 限流,避免创建过多连接。同一个namespace会有多种触发loadApolloConfig函数的方式
    if (!m_loadConfigRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
      //wait at most 5 seconds
      try {
        TimeUnit.SECONDS.sleep(5);
      } catch (InterruptedException e) {
      }
    }
    String appId = m_configUtil.getAppId();
    String cluster = m_configUtil.getCluster();
    String dataCenter = m_configUtil.getDataCenter();
    String secret = m_configUtil.getAccessKeySecret();
    Tracer.logEvent("Apollo.Client.ConfigMeta", STRING_JOINER.join(appId, cluster, m_namespace));
    int maxRetries = m_configNeedForceRefresh.get() ? 2 : 1;
    long onErrorSleepTime = 0; // 0 means no sleep
    Throwable exception = null;

    // 从meta server中获取注册到eureka的config service
    List<ServiceDTO> configServices = getConfigServices();
    String url = null;
    retryLoopLabel:
    for (int i = 0; i < maxRetries; i++) {
      List<ServiceDTO> randomConfigServices = Lists.newLinkedList(configServices);
      Collections.shuffle(randomConfigServices);
      if (m_longPollServiceDto.get() != null) {
        randomConfigServices.add(0, m_longPollServiceDto.getAndSet(null));
      }

      for (ServiceDTO configService : randomConfigServices) {
        if (onErrorSleepTime > 0) {
          logger.warn(
              "Load config failed, will retry in {} {}. appId: {}, cluster: {}, namespaces: {}",
              onErrorSleepTime, m_configUtil.getOnErrorRetryIntervalTimeUnit(), appId, cluster, m_namespace);

          try {
            m_configUtil.getOnErrorRetryIntervalTimeUnit().sleep(onErrorSleepTime);
          } catch (InterruptedException e) {
            //ignore
          }
        }
        // 拼接请求config service获取配置的url
        url = assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,
                dataCenter, m_remoteMessages.get(), m_configCache.get());

        logger.debug("Loading config from {}", url);

        HttpRequest request = new HttpRequest(url);
        if (!StringUtils.isBlank(secret)) {
          Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
          request.setHeaders(headers);
        }

        Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "queryConfig");
        transaction.addData("Url", url);
        try {

          // 发送请求
          HttpResponse<ApolloConfig> response = m_httpClient.doGet(request, ApolloConfig.class);
          m_configNeedForceRefresh.set(false);
          m_loadConfigFailSchedulePolicy.success();

          transaction.addData("StatusCode", response.getStatusCode());
          transaction.setStatus(Transaction.SUCCESS);

          // 如果配置没有变更,config service会返回304状态码
          if (response.getStatusCode() == 304) {
            logger.debug("Config server responds with 304 HTTP status code.");
            // 缓存中拉取历史配置
            return m_configCache.get();
          }

          ApolloConfig result = response.getBody();

          logger.debug("Loaded config for {}: {}", m_namespace, result);
          // 如果配置变更,这会直接返回
          return result;
        } catch (ApolloConfigStatusCodeException ex) {
          ApolloConfigStatusCodeException statusCodeException = ex;
          //config not found
          if (ex.getStatusCode() == 404) {
            String message = String.format(
                "Could not find config for namespace - appId: %s, cluster: %s, namespace: %s, " +
                    "please check whether the configs are released in Apollo!",
                appId, cluster, m_namespace);
            statusCodeException = new ApolloConfigStatusCodeException(ex.getStatusCode(),
                message);
          }
          Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(statusCodeException));
          transaction.setStatus(statusCodeException);
          exception = statusCodeException;
          if(ex.getStatusCode() == 404) {
            break retryLoopLabel;
          }
        } catch (Throwable ex) {
          Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
          transaction.setStatus(ex);
          exception = ex;
        } finally {
          transaction.complete();
        }

        // if force refresh, do normal sleep, if normal config load, do exponential sleep
        onErrorSleepTime = m_configNeedForceRefresh.get() ? m_configUtil.getOnErrorRetryInterval() :
            m_loadConfigFailSchedulePolicy.fail();
      }
    }
  ...
}
b)周期配置拉取(schedulePeriodicRefresh)

具体函数:RemoteConfigRepository#schedulePeriodicRefresh

时序图:
image-20220504231427524

逻辑描述:周期拉取配置(apollo定时兜底)

public class RemoteConfigRepository extends AbstractConfigRepository {
  ...
  private final static ScheduledExecutorService m_executorService;
    
  static {
    m_executorService = Executors.newScheduledThreadPool(1,
        ApolloThreadFactory.create("RemoteConfigRepository", true));
  }

  // 定时拉取配置
  private void schedulePeriodicRefresh() {
    logger.debug("Schedule periodic refresh with interval: {} {}",
        m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
      // 固定时间间隔执行任务
      m_executorService.scheduleAtFixedRate(
        new Runnable() {
          @Override
          public void run() {
            Tracer.logEvent("Apollo.ConfigService", String.format("periodicRefresh: %s", m_namespace));
            logger.debug("refresh config for namespace: {}", m_namespace);
            trySync();
            Tracer.logEvent("Apollo.Client.Version", Apollo.VERSION);
          }
        }, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
        m_configUtil.getRefreshIntervalTimeUnit());
  }
  ...
}
c)长轮询监听与最新配置拉取(scheduleLongPollingRefresh)

具体函数:RemoteConfigRepository#scheduleLongPollingRefresh()

时序图:
image-20220504231427524

逻辑描述:建立长轮询,监听配置变更通知通知后加载最新配置

public class RemoteConfigRepository extends AbstractConfigRepository {

  private void scheduleLongPollingRefresh() {
    remoteConfigLongPollService.submit(m_namespace, this);
  }
}

(8)具体函数:RemoteConfigLongPollService#submit()

public class RemoteConfigLongPollService {

  private final ExecutorService m_longPollingService;

  public RemoteConfigLongPollService() {
    m_longPollFailSchedulePolicyInSecond = new ExponentialSchedulePolicy(1, 120); //in second
    m_longPollingStopped = new AtomicBoolean(false);
    m_longPollingService = Executors.newSingleThreadExecutor(
        ApolloThreadFactory.create("RemoteConfigLongPollService", true));
    m_longPollStarted = new AtomicBoolean(false);
    m_longPollNamespaces =
        Multimaps.synchronizedSetMultimap(HashMultimap.<String, RemoteConfigRepository>create());
    m_notifications = Maps.newConcurrentMap();
    m_remoteNotificationMessages = Maps.newConcurrentMap();
    m_responseType = new TypeToken<List<ApolloConfigNotification>>() {
    }.getType();
    m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
    m_httpClient = ApolloInjector.getInstance(HttpClient.class);
    m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
    m_longPollRateLimiter = RateLimiter.create(m_configUtil.getLongPollQPS());
  }

  public boolean submit(String namespace, RemoteConfigRepository remoteConfigRepository) {
    // 如果长轮询已经启动,就不会再往线程池里添加runnable(通过m_longPollStarted判断是否启动),但是会往m_longPollNamespaces中添加需要被通知变更的namespace对应的remoteConfigRepository
    boolean added = m_longPollNamespaces.put(namespace, remoteConfigRepository);
    m_notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID);
    if (!m_longPollStarted.get()) {
      startLongPolling();
    }
    return added;
  }

  // 多个namespace,也只有一个长轮询
  private void startLongPolling() {
    if (!m_longPollStarted.compareAndSet(false, true)) {
      //already started
      return;
    }
    try {
      final String appId = m_configUtil.getAppId();
      final String cluster = m_configUtil.getCluster();
      final String dataCenter = m_configUtil.getDataCenter();
      final String secret = m_configUtil.getAccessKeySecret();
      final long longPollingInitialDelayInMills = m_configUtil.getLongPollingInitialDelayInMills();
      // 单线程连接池
      m_longPollingService.submit(new Runnable() {
        @Override
        public void run() {
          if (longPollingInitialDelayInMills > 0) {
            try {
              logger.debug("Long polling will start in {} ms.", longPollingInitialDelayInMills);
              TimeUnit.MILLISECONDS.sleep(longPollingInitialDelayInMills);
            } catch (InterruptedException e) {
              //ignore
            }
          }
          doLongPollingRefresh(appId, cluster, dataCenter, secret);
        }
      });
    } catch (Throwable ex) {
      m_longPollStarted.set(false);
      ApolloConfigException exception =
          new ApolloConfigException("Schedule long polling refresh failed", ex);
      Tracer.logError(exception);
      logger.warn(ExceptionUtil.getDetailMessage(exception));
    }
  }

  private void doLongPollingRefresh(String appId, String cluster, String dataCenter, String secret) {
    final Random random = new Random();
    ServiceDTO lastServiceDto = null;
    // 只要不中断就循环
    while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
      // limiter令牌桶限流为2qps, 5秒之内存在没有获取到1个令牌的情况,则休眠5秒
      if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
        //wait at most 5 seconds
        try {
          TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
        }
      }
      Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "pollNotification");
      String url = null;
      try {
        if (lastServiceDto == null) {
          List<ServiceDTO> configServices = getConfigServices();
          lastServiceDto = configServices.get(random.nextInt(configServices.size()));
        }

        url =
            assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter,
                m_notifications);

        logger.debug("Long polling from {}", url);

        HttpRequest request = new HttpRequest(url);
        request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);
        if (!StringUtils.isBlank(secret)) {
          Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
          request.setHeaders(headers);
        }

        transaction.addData("Url", url);

        final HttpResponse<List<ApolloConfigNotification>> response =
            m_httpClient.doGet(request, m_responseType);

        logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
        if (response.getStatusCode() == 200 && response.getBody() != null) {
          updateNotifications(response.getBody());
          updateRemoteNotifications(response.getBody());
          transaction.addData("Result", response.getBody().toString());
          // 此处通知,执行notify之后加载数据
          notify(lastServiceDto, response.getBody());
        }

        //try to load balance
        if (response.getStatusCode() == 304 && random.nextBoolean()) {
          lastServiceDto = null;
        }

        m_longPollFailSchedulePolicyInSecond.success();
        transaction.addData("StatusCode", response.getStatusCode());
        transaction.setStatus(Transaction.SUCCESS);
      } catch (Throwable ex) {
        lastServiceDto = null;
        Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
        transaction.setStatus(ex);
        long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();
        logger.warn(
            "Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespaces: {}, long polling url: {}, reason: {}",
            sleepTimeInSecond, appId, cluster, assembleNamespaces(), url, ExceptionUtil.getDetailMessage(ex));
        try {
          TimeUnit.SECONDS.sleep(sleepTimeInSecond);
        } catch (InterruptedException ie) {
          //ignore
        }
      } finally {
        transaction.complete();
      }
    }
  }

  private void notify(ServiceDTO lastServiceDto, List<ApolloConfigNotification> notifications) {
    if (notifications == null || notifications.isEmpty()) {
      return;
    }
    for (ApolloConfigNotification notification : notifications) {
      String namespaceName = notification.getNamespaceName();
      //create a new list to avoid ConcurrentModificationException
      List<RemoteConfigRepository> toBeNotified =
          Lists.newArrayList(m_longPollNamespaces.get(namespaceName));
      ApolloNotificationMessages originalMessages = m_remoteNotificationMessages.get(namespaceName);
      ApolloNotificationMessages remoteMessages = originalMessages == null ? null : originalMessages.clone();
      //since .properties are filtered out by default, so we need to check if there is any listener for it
      toBeNotified.addAll(m_longPollNamespaces
          .get(String.format("%s.%s", namespaceName, ConfigFileFormat.Properties.getValue())));
      for (RemoteConfigRepository remoteConfigRepository : toBeNotified) {
        try {
          remoteConfigRepository.onLongPollNotified(lastServiceDto, remoteMessages);
        } catch (Throwable ex) {
          Tracer.logError(ex);
        }
      }
    }
  }

  public void onLongPollNotified(ServiceDTO longPollNotifiedServiceDto, ApolloNotificationMessages remoteMessages) {
    m_longPollServiceDto.set(longPollNotifiedServiceDto);
    m_remoteMessages.set(remoteMessages);
    m_executorService.submit(new Runnable() {
      @Override
      public void run() {
        m_configNeedForceRefresh.set(true);
        trySync();
      }
    });
  }
  
}

(9)AbstractConfigRepository#trySync()

public abstract class AbstractConfigRepository implements ConfigRepository {
    
  // 拉配置信息,不是notificationID,而是配置内容
  protected boolean trySync() {
    try {
      sync();
      return true;
    } catch (Throwable ex) {
      Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
      logger
          .warn("Sync config failed, will retry. Repository {}, reason: {}", this.getClass(), ExceptionUtil
              .getDetailMessage(ex));
    }
    return false;
  }

  // 子类RemoteConfigRepository中实现
  protected synchronized void sync() {
    Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");

    try {
      ApolloConfig previous = m_configCache.get();
      ApolloConfig current = loadApolloConfig();

      //reference equals means HTTP 304
      if (previous != current) {
        logger.debug("Remote Config refreshed!");
        m_configCache.set(current);
        this.fireRepositoryChange(m_namespace, this.getConfig());
      }

      if (current != null) {
        Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
            current.getReleaseKey());
      }

      transaction.setStatus(Transaction.SUCCESS);
    } catch (Throwable ex) {
      transaction.setStatus(ex);
      throw ex;
    } finally {
      transaction.complete();
    }
  }

  protected void fireRepositoryChange(String namespace, Properties newProperties) {
    for (RepositoryChangeListener listener : m_listeners) {
      try {
        listener.onRepositoryChange(namespace, newProperties);
      } catch (Throwable ex) {
        Tracer.logError(ex);
        logger.error("Failed to invoke repository change listener {}", listener.getClass(), ex);
      }
    }
  }
}

5、变更通知

(1)逻辑描述

在namespace数据发生变更时,系统将通知所有监听该namespace的监听器。系统会比较新老配置,将差异配置存储在ConfigChange中,并随后通知各个监听器。

(2)时序图
image-20220504231427524 #### (2)代码实现

DefaultConfig#onRepositoryChange()

public class DefaultConfig extends AbstractConfig implements RepositoryChangeListener {
  ...
  /**
   * Repository通知变更,
   * @param namespace the namespace of this repository change
   * @param newProperties the properties after change
   */
  @Override
  public synchronized void onRepositoryChange(String namespace, Properties newProperties) {
    if (newProperties.equals(m_configProperties.get())) {
      return;
    }

    ConfigSourceType sourceType = m_configRepository.getSourceType();
    Properties newConfigProperties = propertiesFactory.getPropertiesInstance();
    newConfigProperties.putAll(newProperties);

    // 计算配置变更情况,对新老配置进行比较,将差异配置存储在ConfigChange中Map的格式为{namspace:{key:value}}
    Map<String, ConfigChange> actualChanges = updateAndCalcConfigChanges(newConfigProperties,
        sourceType);

    //check double checked result
    if (actualChanges.isEmpty()) {
      return;
    }

    // 将具体变更通知各监听器
    this.fireConfigChange(m_namespace, actualChanges);

    Tracer.logEvent("Apollo.Client.ConfigChanges", m_namespace);
  }

  // 构建Method的变更事件ConfigChange参数
  private Map<String, ConfigChange> updateAndCalcConfigChanges(Properties newConfigProperties,
      ConfigSourceType sourceType) {
    List<ConfigChange> configChanges = calcPropertyChanges(m_namespace, m_configProperties.get(), newConfigProperties);

    ImmutableMap.Builder<String, ConfigChange> actualChanges =
        new ImmutableMap.Builder<>();

    /** === Double check since DefaultConfig has multiple config sources ==== **/

    //1. use getProperty to update configChanges's old value
    for (ConfigChange change : configChanges) {
      change.setOldValue(this.getProperty(change.getPropertyName(), change.getOldValue()));
    }

    //2. update m_configProperties
    updateConfig(newConfigProperties, sourceType);
    clearConfigCache();

    //3. use getProperty to update configChange's new value and calc the final changes
    for (ConfigChange change : configChanges) {
      change.setNewValue(this.getProperty(change.getPropertyName(), change.getNewValue()));
      switch (change.getChangeType()) {
        case ADDED:
          if (Objects.equals(change.getOldValue(), change.getNewValue())) {
            break;
          }
          if (change.getOldValue() != null) {
            change.setChangeType(PropertyChangeType.MODIFIED);
          }
          actualChanges.put(change.getPropertyName(), change);
          break;
        case MODIFIED:
          if (!Objects.equals(change.getOldValue(), change.getNewValue())) {
            actualChanges.put(change.getPropertyName(), change);
          }
          break;
        case DELETED:
          if (Objects.equals(change.getOldValue(), change.getNewValue())) {
            break;
          }
          if (change.getNewValue() != null) {
            change.setChangeType(PropertyChangeType.MODIFIED);
          }
          actualChanges.put(change.getPropertyName(), change);
          break;
        default:
          //do nothing
          break;
      }
    }
    return actualChanges.build();
  }

  /**
   * 父类中实现 
   * 配置变更通知,通知监听器
   * @param changes map's key is config property's key
   */
  protected void fireConfigChange(String namespace, Map<String, ConfigChange> changes) {
    final Set<String> changedKeys = changes.keySet();
    final List<ConfigChangeListener> listeners = this.findMatchedConfigChangeListeners(changedKeys);

    // notify those listeners
    for (ConfigChangeListener listener : listeners) {
      Set<String> interestedChangedKeys = resolveInterestedChangedKeys(listener, changedKeys);
      InterestedConfigChangeEvent interestedConfigChangeEvent = new InterestedConfigChangeEvent(
          namespace, changes, interestedChangedKeys);
      this.notifyAsync(listener, interestedConfigChangeEvent);
    }
  }

  /**
   * 异步通知
   * @param listener
   * @param changeEvent
   */
  private void notifyAsync(final ConfigChangeListener listener, final ConfigChangeEvent changeEvent) {
    m_executorService.submit(new Runnable() {
      @Override
      public void run() {
        String listenerName = listener.getClass().getName();
        Transaction transaction = Tracer.newTransaction("Apollo.ConfigChangeListener", listenerName);
        try {
          listener.onChange(changeEvent);
          transaction.setStatus(Transaction.SUCCESS);
        } catch (Throwable ex) {
          transaction.setStatus(ex);
          Tracer.logError(ex);
          logger.error("Failed to invoke config change listener {}", listenerName, ex);
        } finally {
          transaction.complete();
        }
      }
    });
  }
  ...
}

6、配置注入

(1)逻辑描述

在配置发生变更后,系统会通知在Bean初始化时创建的与namespace对应的监听器。接着,系统通过反射的方式触发相应的函数(使用@ApolloConfigChange注解)。

(2)代码位置

ConfigChangeListener#onChange()

public class ApolloAnnotationProcessor extends ApolloProcessor implements BeanFactoryAware,
    EnvironmentAware {
  ...
  private void processApolloConfigChangeListener(final Object bean, final Method method) {
    ApolloConfigChangeListener annotation = AnnotationUtils
        .findAnnotation(method, ApolloConfigChangeListener.class);
    if (annotation == null) {
      return;
    }
    Class<?>[] parameterTypes = method.getParameterTypes();
    Preconditions.checkArgument(parameterTypes.length == 1,
        "Invalid number of parameters: %s for method: %s, should be 1", parameterTypes.length,
        method);
    Preconditions.checkArgument(ConfigChangeEvent.class.isAssignableFrom(parameterTypes[0]),
        "Invalid parameter type: %s for method: %s, should be ConfigChangeEvent", parameterTypes[0],
        method);

    ReflectionUtils.makeAccessible(method);
    // value 是 namespace
    String[] namespaces = annotation.value();
    String[] annotatedInterestedKeys = annotation.interestedKeys();
    String[] annotatedInterestedKeyPrefixes = annotation.interestedKeyPrefixes();
    // 创建配置变化监听器
    ConfigChangeListener configChangeListener = new ConfigChangeListener() {
      @Override
      public void onChange(ConfigChangeEvent changeEvent) {
        ReflectionUtils.invokeMethod(method, bean, changeEvent);
      }
    };
  }
  ...
}

四、最后

《码头工人的一千零一夜》是一位专注于技术干货分享的博主,追随博主的文章,你将深入了解业界最新的技术趋势,以及在Java开发和安全领域的实用经验分享。无论你是开发人员还是对逆向工程感兴趣的爱好者,都能在《码头工人的一千零一夜》找到有价值的知识和见解。

懂得不多,做得太少。欢迎批评、指正。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1402653.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Linux】Linux开发工具 - vim的基本操作

IDE例子 Linux编辑器-vim使用 vi/vim的区别简单点来说&#xff0c;它们都是多模式编辑器&#xff0c;不同的是vim是vi的升级版本&#xff0c;它不仅兼容vi的所有指令&#xff0c;而且还有一些新的特性在里面。例如语法加亮&#xff0c;可视化操作不仅可以在终端运行&#xff…

暴力破解常见的服务器

目录 使用 pydictor 生成自己的字典工具liunx下载使用常用的参数说明插件型字典 (可自己根据 API 文档开发) 使用 hydra 工具在线破解系统用户密码使用 hydra 破解 windows 7 远程桌面密码使用 hydra 工具破解 ssh 服务 root 用户密码 使用 Medusa 工具在线破解medusa参数说明M…

公网环境调试本地配置的Java支付宝沙箱环境模拟支付场景

文章目录 前言1. 下载当面付demo2. 修改配置文件3. 打包成web服务4. 局域网测试5. 内网穿透6. 测试公网访问7. 配置二级子域名8. 测试使用固定二级子域名访问 前言 在沙箱环境调试支付SDK的时候&#xff0c;往往沙箱环境部署在本地&#xff0c;局限性大&#xff0c;在沙箱环境…

JVM知识总结

1.概述 JVM指的是Java虚拟机&#xff0c;本质上是一个运行在计算机上的程序&#xff0c;他的职责是运行Java字节码文件&#xff0c;作用是为了支持跨平台特性。 功能&#xff1a; 装载字节码&#xff0c;解释/编译为机器码 管理数据存储和垃圾回收 优化热点代码提升效率 …

AI技术图像编辑 Luminar Neo

Luminar Neo是一款先进的AI照片编辑软件&#xff0c;旨在简化和增强照片编辑过程。它适用于macOS和Windows&#xff0c;提供独立的应用程序以及用于集成到现有工作流程的插件。Luminar Neo的主要特点包括AI天空替换、Accent AI、氛围AI以及20多种独特的照片效果。无论是风景摄影…

Qt事件处理,提升组件类

1.相关说明 1.提升组件QLabel的类&#xff0c;以实现双击功能 2.监控键盘事件&#xff0c;实现上下左右移动 3.鼠标点击获取坐标 2.相关界面 3.相关代码和操作 自定义类TMyLabel&#xff0c;父类为QLabel tmylabel.h #ifndef TMYLABEL_H #define TMYLABEL_H #include <QL…

图像处理中,采用极线约束准则来约束特征点匹配搜索空间,理论上在极线上进行搜索。这里的极线是什么线,怎么定义的?基本矩阵F和本质矩阵E有什么区别?

问题描述&#xff1a;图像处理中&#xff0c;采用极线约束准则来约束特征点匹配搜索空间&#xff0c;理论上在极线上进行搜索。这里的极线是什么线&#xff0c;怎么定义的&#xff1f;基本矩阵F和本质矩阵E有什么区别&#xff1f; 问题1解答&#xff1a; 极线是通过极线几何学…

飞书+ChatGPT+cpolar搭建企业智能AI助手并实现无公网ip远程访问

文章目录 推荐 前言环境列表1.飞书设置2.克隆feishu-chatgpt项目3.配置config.yaml文件4.运行feishu-chatgpt项目5.安装cpolar内网穿透6.固定公网地址7.机器人权限配置8.创建版本9.创建测试企业10. 机器人测试 推荐 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂…

无线音频设备市场调研:预计2029年将达到404亿美元

无线音频&#xff0c;是指将音频型号以无线电波作为载体&#xff0c;从一个设备传输到另外一个设备&#xff0c;实现音频的无线传输。 最常见的是蓝牙传输&#xff0c;传输频率2.4G&#xff0c;蓝牙音箱、蓝牙耳机都属于蓝牙音频传输&#xff0c;蓝牙音频传输是双向传输&#x…

主板电路学习; 华硕ASUS K43SD笔记本安装win7X64(ventoy)

记录 老爷机 白色 华硕 K43SD 笔记本 安装 win7X64 1. MBR样式常规安装win7X64Sp1 (华硕 K43SD 安装 win7X64 ) 老爷机 白色 华硕 K43SD 笔记本 安装 win7X64 &#xff08;常规安装&#xff09; 设置&#xff1a; 禁用UEFI 启用AHCI ventoy制作MBR&#xff08;非UEFI&#…

性能优化-OpenCL kernel 开发

「发表于知乎专栏《移动端算法优化》」 本文主要介绍OpenCL的 Kernel&#xff0c;包括代码的实例以及使用注意的详解。 &#x1f3ac;个人简介&#xff1a;一个全栈工程师的升级之路&#xff01; &#x1f4cb;个人专栏&#xff1a;高性能&#xff08;HPC&#xff09;开发基础教…

光的干涉与衍射

引用内容来自曹天元的《上帝掷骰子吗&#xff1f;&#xff1a;量子物理史话》&#xff0c;其余内容来自课程。 牛顿光的色散实验 色散实验是牛顿所做的有名的实验之一。实验的情景在一些科普读物里被渲染得令人印象深刻&#xff1a;炎热难忍的夏天&#xff0c;牛顿却戴着厚重的…

谁懂啊,金蝶BI财务分析居然这么简单

接触过财务数据分析的&#xff0c;都知道财务分析中指标计算复杂&#xff0c;维度、指标组合多变&#xff0c;而每当出现了一处变化&#xff0c;就得全部推动重来&#xff0c;那工作量和复杂程度让人头皮发麻。就算是金蝶系统&#xff0c;也是偏流程&#xff0c;对财务数据分析…

OpenGPTs:一款外挂般的GPTs管理器,由ChatPaper团队开源!

OpenGPTs-非常好用的开源GPTs管理器. 一句话介绍 非常好用的GPTs管理器&#xff0c;ChatPaper团队开源一款功能强大的浏览器插件&#xff0c;适合所有拥有Plus权限的朋友。 为什么要做OpenGPTs&#xff1f; &#x1f914;&#x1f4a1; 众所周知&#xff0c;OpenAI官网的GPT…

浅谈 ST 表

更好的阅读体验 浅谈 ST 表 这种东西还是很简单的&#xff0c;但是涉及左移右移&#xff0c;模板容易打挂&#xff0c;写篇笔记。 ST 表是什么 虽然这个是通过二维数组来实现的&#xff0c;但是我不是很喜欢称之为“表”。我觉得完全可以看作是在一维序列上的区间&#xff…

sqli-labs通关笔记(less-11 ~ less16)

上一篇文章说了sqli-labs的less-1到less-10的注入方法&#xff0c;这一篇从less-11开始。 由于从11关开始都是post请求&#xff0c;不会像前十关一样存在符号转成unicode的麻烦&#xff0c;所以不再使用apifox&#xff0c;直接从页面上进行测试。 Less-11 老规矩&#xff0c;…

【深度学习】CodeFormer训练过程,如何训练人脸修复模型CodeFormer

文章目录 BasicSR介绍环境数据阶段 I - VQGAN阶段 II - CodeFormer (w0)阶段 III - CodeFormer (w1) 代码地址&#xff1a;https://github.com/sczhou/CodeFormer/releases/tag/v0.1.0 论文的一些简略介绍&#xff1a; https://qq742971636.blog.csdn.net/article/details/134…

React Hooks 源码解析:useEffect

React Hooks 源码解析&#xff08;4&#xff09;&#xff1a;useEffect React 源码版本: v16.11.0源码注释笔记&#xff1a;airingursb/react 1. useEffect 简介 1.1 为什么要有 useEffect 我们在前文中说到 React Hooks 使得 Functional Component 拥有 Class Component 的…

206.反转链表(附带源码)

一、思路 二、代码 一、思路 将指针调转一个方向就行&#xff0c;很简单 做法&#xff1a; 定义2个指针&#xff1a;prev、 cur、 next 当next为空时&#xff0c;循环结束 思路清晰&#xff0c;操作清楚&#xff0c;开始敲代码。 二、代码 struct ListNode* reverseList(s…

Tide Quencher 8WS-Mal,TQ8WS-Mal,能够针对特定的荧光物质进行淬灭

您好&#xff0c;欢迎来到新研之家 文章关键词&#xff1a;Tide Quencher 8WS maleimide&#xff0c;TQ8WS maleimide &#xff0c;Tide Quencher 8WS Mal&#xff0c;TQ8WS Mal&#xff0c;荧光淬灭剂Tide Quencher 8WS 马来酰亚胺 &#xff0c;TQ8WS 马来酰亚胺 一、基本信…