前言
从客户端与服务端两个角度概述一下Eureka服务发现的原理,如下:
客户端
依赖自动装配机制,客户端启动时就会从Eureka服务端全量获取服务实例的注册信息并缓存到本地。之后每隔30秒向Eureka服务端发起增量获取的请求,如果增量获取的数据为空或者使用增量获取的数据更新本地缓存后发现一致性hashcode与服务端返回的不一致,就会发起全量获取的请求。
服务端
服务端先从只读缓存中查询,如果命中则直接返回,否则从读写缓存中查询,然后更新到只读缓存中并返回。
读写缓存的失效时间是3分钟,一旦读写缓存失效,对于全量获取的请求会从 registry 缓存中加载;对于增量获取的请求会从 recentlyChangedQueue 队列(存放最近发生变化的服务实例)中加载,同时会有定时任务每隔30秒清理一次该队列,移除其中超过保留时间的过期元素(并非直接清空整个队列)。
源码分析
服务发现(客户端逻辑)
一、EurekaClientAutoConfiguration
在 spring-cloud-netflix-eureka-client jar包下的 META-INF/spring.factories 文件中指定了 EurekaClientAutoConfiguration 为其中的自动装配类。
// Declares the EurekaClient bean. It is only created when no EurekaClient bean is found using
// SearchStrategy.CURRENT, and "shutdown" is registered as the bean's destroy method.
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,
search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,
EurekaClientConfig config) {
// The concrete client is a CloudEurekaClient built from the injected manager and config
// plus this configuration's optionalArgs and context fields.
return new CloudEurekaClient(manager, config, this.optionalArgs,
this.context);
}
EurekaClientAutoConfiguration 自动装配类中定义了 CloudEurekaClient 类型的bean。
二、CloudEurekaClient
在 CloudEurekaClient 的构造器中调用了父类 DiscoveryClient 的构造器
// Constructs the client: delegates to the superclass constructor, keeps references to the
// ApplicationInfoManager and the event publisher, and prepares reflective access to the
// "eurekaTransport" field declared on DiscoveryClient.
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
ApplicationEventPublisher publisher) {
// Invoke the constructor of the parent class, DiscoveryClient
super(applicationInfoManager, config, args);
this.applicationInfoManager = applicationInfoManager;
this.publisher = publisher;
// Look up DiscoveryClient's "eurekaTransport" field by name and make it accessible
this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
"eurekaTransport");
ReflectionUtils.makeAccessible(this.eurekaTransportField);
}
三、DiscoveryClient
节选 DiscoveryClient 构造器方法的部分代码:
// Check whether the client should fetch registry information from the Eureka Server,
// i.e. the eureka.client.fetchRegistry property (default true)
if (clientConfig.shouldFetchRegistry()) {
try {
// Fetch the registry from the Eureka Server at startup (false = do not force a full fetch)
boolean primaryFetchRegistryResult = fetchRegistry(false);
if (!primaryFetchRegistryResult) {
logger.info("Initial registry fetch from primary servers failed");
}
boolean backupFetchRegistryResult = true;
// The backup is only consulted when the primary fetch failed (short-circuit &&)
if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
backupFetchRegistryResult = false;
logger.info("Initial registry fetch from backup servers failed");
}
// Fail startup only when both fetches failed AND the config enforces a fetch at init
if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
}
} catch (Throwable th) {
// Any failure above (including the IllegalStateException just thrown) is logged and
// re-thrown wrapped in a new IllegalStateException
logger.error("Fetch registry error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
节选 initScheduledTasks 方法的部分代码:
// Check whether the client should fetch registry information from the Eureka Server,
// i.e. the eureka.client.fetchRegistry property (default true)
if (clientConfig.shouldFetchRegistry()) {
// Interval at which the client fetches registry information from the Eureka Server,
// i.e. the eureka.client.registryFetchIntervalSeconds property (default 30)
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
// Exponential back-off bound for the cache refresh executor,
// i.e. the eureka.client.cacheRefreshExecutorExponentialBackOffBound property (default 10)
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// Wrap CacheRefreshThread in a TimedSupervisorTask named "cacheRefresh"
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
);
// One-shot schedule: the first run happens after registryFetchIntervalSeconds.
// Repetition does not come from this call; TimedSupervisorTask#run re-schedules itself.
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
TimedSupervisorTask#run
// Runs the supervised task once on the executor, waits for it with a timeout, adjusts the
// delay according to the outcome, and finally re-schedules this supervisor on the scheduler.
@Override
public void run() {
Future<?> future = null;
try {
// Submit the wrapped task (CacheRefreshThread#run for the "cacheRefresh" supervisor)
future = executor.submit(task);
threadPoolLevelGauge.set((long) executor.getActiveCount());
// Block until the task finishes or timeoutMillis elapses
future.get(timeoutMillis, TimeUnit.MILLISECONDS);
// Success: reset the delay to timeoutMillis
delay.set(timeoutMillis);
threadPoolLevelGauge.set((long) executor.getActiveCount());
successCounter.increment();
} catch (TimeoutException e) {
logger.warn("task supervisor timed out", e);
timeoutCounter.increment();
// Timeout: double the delay, capped at maxDelay (CAS so a concurrent change wins)
long currentDelay = delay.get();
long newDelay = Math.min(maxDelay, currentDelay * 2);
delay.compareAndSet(currentDelay, newDelay);
} catch (RejectedExecutionException e) {
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, reject the task", e);
} else {
logger.warn("task supervisor rejected the task", e);
}
rejectedCounter.increment();
} catch (Throwable e) {
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, can't accept the task");
} else {
logger.warn("task supervisor threw an exception", e);
}
throwableCounter.increment();
} finally {
// Cancel the future (interrupting it if still running) regardless of the outcome
if (future != null) {
future.cancel(true);
}
if (!scheduler.isShutdown()) {
// Re-schedule this supervisor after the current delay. The delay is not a fixed
// interval: it is doubled (up to maxDelay) after a timeout and reset after a success.
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}
CacheRefreshThread#run
// Runnable used as the cache refresh task; each run simply delegates to refreshRegistry().
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
refreshRegistry 节选关键代码
// Fetch the registry; remoteRegionsModified is passed as the "force full fetch" flag
boolean success = fetchRegistry(remoteRegionsModified);
if (success) {
// On success record the size of the local registry cache and the fetch timestamp
registrySize = localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
// Fetches registry information, choosing between a full fetch and a delta fetch.
// Returns false if anything in the fetch throws; otherwise fires the post-refresh hooks and
// returns true. forceFullRegistryFetch forces the full-fetch path.
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// Get the Applications instance from the localRegionApps cache
Applications applications = getApplications();
// A full fetch is performed when any of the following holds:
// 1. delta fetching is disabled, i.e. the eureka.client.disableDelta property (default false)
// 2. a single VIP address is configured, i.e. eureka.client.registryRefreshSingleVipAddress
// 3. a full fetch is forced through the forceFullRegistryFetch argument
// 4. the Applications instance obtained from the localRegionApps cache is null
// 5. that Applications instance has no registered applications
// 6. that Applications instance's version equals -1
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1))
{
// NOTE(review): if applications were actually null, the last two log statements below
// (and the setAppsHashCode call after the if/else) dereference it and would throw; the
// Throwable would be caught further down and the method would return false.
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
// Fetch the full registry
getAndStoreFullRegistry();
} else {
// Fetch only the delta
getAndUpdateDelta(applications);
}
// Recompute the hash code on the Applications reference obtained at the top of the method
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
// Any failure is logged at info level and reported to the caller as false
logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}",
appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e));
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Post-refresh hook (implementation not shown here). Per the article it notifies every
// EurekaEventListener with a CacheRefreshedEvent and publishes a HeartbeatEvent.
onCacheRefreshed();
// Per the article: publishes a StatusChangeEvent if the instance status has changed
updateInstanceRemoteStatus();
return true;
}
全量获取注册信息 - getAndStoreFullRegistry
// Requests the full registry from the server and, if a response entity was obtained and no
// other update raced with this one, stores it in the localRegionApps cache.
private void getAndStoreFullRegistry() throws Throwable {
// Snapshot the generation so a concurrent update can be detected by the CAS below
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
// If eureka.client.registryRefreshSingleVipAddress is set (the client is only interested in
// the registry for that VIP address) query by VIP, otherwise query all applications.
// NOTE(review): this is a plain null check, whereas fetchRegistry uses Strings.isNullOrEmpty
// on the same property, so an empty string takes the getVip branch here.
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
// Only an OK response contributes an entity; otherwise apps stays null
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
// Increment fetchRegistryGeneration by one via CAS; this only succeeds if the generation is
// still the value read at the top of the method
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
// Cache the fetched Applications instance (after filterAndShuffle) in localRegionApps
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
增量获取注册信息 - getAndUpdateDelta
// Requests the registry delta from the server and applies it to the local cache. Falls back
// to a full fetch when no delta entity is returned, and reconciles when the hash codes differ.
private void getAndUpdateDelta(Applications applications) throws Throwable {
// Snapshot the generation so a concurrent update can be detected by the CAS below
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
// Send the delta fetch request to the server
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
// If no delta was obtained, fall back to a full fetch
if (delta == null) {
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
+ "Hence got the full registry.");
getAndStoreFullRegistry();
// Increment fetchRegistryGeneration by one via CAS (currentUpdateGeneration is only the
// expected value); this succeeds only if no other update happened in the meantime
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
String reconcileHashCode = "";
if (fetchRegistryUpdateLock.tryLock()) {
try {
// Apply the delta to the Applications instance held in the localRegionApps cache
updateDelta(delta);
// Compute the reconcile hash code of the local data
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
} else {
// Lock not acquired: the delta is not applied and reconcileHashCode stays ""
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
// Reconcile when the hash code returned by the server differs from the locally computed
// one, or when differences between client and server should be logged, i.e. the
// eureka.client.logDeltaDiff property (default false)
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
// Per the article this issues a full registry fetch to the server (implementation not shown here)
reconcileAndLogDifference(delta, reconcileHashCode);
}
} else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
}
}
updateDelta
// Applies a delta received from the server to the locally cached Applications (or to the
// per-remote-region Applications), instance by instance, according to each instance's action type.
private void updateDelta(Applications delta) {
int deltaCount = 0;
// Iterate over the Application list received from the server
for (Application app : delta.getRegisteredApplications()) {
// Iterate over the instances of each Application
for (InstanceInfo instance : app.getInstances()) {
// Start from the Applications instance held in the local cache localRegionApps
Applications applications = getApplications();
String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
// Instances from a non-local region are applied to that region's Applications instead,
// which is created and registered in remoteRegionVsApps on first use
if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
if (null == remoteApps) {
remoteApps = new Applications();
remoteRegionVsApps.put(instanceRegion, remoteApps);
}
applications = remoteApps;
}
++deltaCount;
// Action type ADDED: make sure the application exists, then add the instance
if (ActionType.ADDED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {
applications.addApplication(app);
}
logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
// Action type MODIFIED: handled the same way as ADDED (addInstance on the application)
} else if (ActionType.MODIFIED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {
applications.addApplication(app);
}
logger.debug("Modified instance {} to the existing apps ", instance.getId());
applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
// Action type DELETED: remove the instance, and the application too once it has no instances left
} else if (ActionType.DELETED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp != null) {
logger.debug("Deleted instance {} to the existing apps ", instance.getId());
existingApp.removeInstance(instance);
if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {
applications.removeApplication(existingApp);
}
}
}
}
}
logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);
// Adopt the delta's version and reshuffle instances, for the local cache and every remote region
getApplications().setVersion(delta.getVersion());
getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
for (Applications applications : remoteRegionVsApps.values()) {
applications.setVersion(delta.getVersion());
applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
}
}
服务发现-全量获取(服务端逻辑)
一、ApplicationsResource
// GET endpoint that returns the full registry payload for the requested version, media type
// and (optionally) remote regions, served through the response cache.
@GET
public Response getContainers(@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo,
@Nullable @QueryParam("regions") String regionsStr) {
// 1. Determine whether the request carries a non-empty "regions" parameter
boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
String[] regions = null;
if (!isRemoteRegionRequested) {
// Increment the GET_ALL monitor
EurekaMonitors.GET_ALL.increment();
} else {
// Lower-case "regions", split it on commas into a string array and sort it
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions);
// Increment the GET_ALL_WITH_REMOTE_REGIONS monitor
EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
}
// 2. Check whether access to the registry is allowed; if not, return 403 immediately.
// Per the article, access is denied when either of the following holds
// (shouldAllowAccess is not shown here - TODO confirm):
// a) isRemoteRegionRequested is false and current time <= startup time + wait time (the time
// the server waits when it cannot obtain instances from its peers, default 5 minutes,
// configurable through eureka.server.waitTimeInMsWhenSyncEmpty)
// b) presumably isRemoteRegionRequested is true (the original note said "false" for both
// cases) and current time <= startup time + wait time, or any entry in
// regionNameVSRemoteRegistry is not ready (RemoteRegionRegistry#readyForServingData)
if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
return Response.status(Status.FORBIDDEN).build();
}
// 3. Record the request's version as the current request version (per the article this is
// kept in a ThreadLocal and defaults to V2)
CurrentRequestVersion.set(Version.toEnum(version));
KeyType keyType = Key.KeyType.JSON;
String returnMediaType = MediaType.APPLICATION_JSON;
// 4. If the Accept header does not ask for JSON, use XML
if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
keyType = Key.KeyType.XML;
returnMediaType = MediaType.APPLICATION_XML;
}
// 5. Build the cache key
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
Response response;
// 6. Look up the payload for the cache key
if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
// The Accept-Encoding header contains gzip: use ResponseCacheImpl#getGZIP
response = Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
// Otherwise use ResponseCacheImpl#get
response = Response.ok(responseCache.get(cacheKey))
.build();
}
// 7. Clear the current request version.
// NOTE(review): this is not in a finally block, so it is skipped if the lookup above throws.
CurrentRequestVersion.remove();
return response;
}
二、ResponseCacheImpl
get
// Returns the cached payload for the key, using the configured read-only cache setting.
public String get(final Key key) {
// shouldUseReadOnlyResponseCache: whether to use the read-only cache (default true);
// configurable through the eureka.server.useReadOnlyResponseCache property
return get(key, shouldUseReadOnlyResponseCache);
}
// Returns the payload string for the key, or null when no value could be obtained or the
// payload equals EMPTY_PAYLOAD.
@VisibleForTesting
String get(final Key key, boolean useReadOnlyCache) {
Value payload = getValue(key, useReadOnlyCache);
if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
return null;
} else {
return payload.getPayload();
}
}
getValue
// Looks the key up in the read-only cache first (when enabled) and falls back to the
// read-write cache. Any Throwable is logged and results in whatever payload was set so far
// (null if the failure happened before assignment).
@VisibleForTesting
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
// Read-only cache enabled
if (useReadOnlyCache) {
// Look in the read-only cache
final Value currentPayload = readOnlyCacheMap.get(key);
// Read-only cache hit: return that value
if (currentPayload != null) {
payload = currentPayload;
// Read-only cache miss
} else {
// Get the value from the read-write cache
payload = readWriteCacheMap.get(key);
// Copy the value obtained from the read-write cache into the read-only cache
readOnlyCacheMap.put(key, payload);
}
// Read-only cache disabled
} else {
// Get the value from the read-write cache
payload = readWriteCacheMap.get(key);
}
} catch (Throwable t) {
logger.error("Cannot get value for key : {}", key, t);
}
// Return whatever was obtained
return payload;
}
而读写缓存实际上是一个 LoadingCache,在 ResponseCacheImpl 构造器中有如下代码:
// eureka.server.initialCapacityOfResponseCache: initial capacity of the read-write cache (default 1000)
// eureka.server.responseCacheAutoExpirationInSeconds: expiry time of the read-write cache (default 180 seconds)
this.readWriteCacheMap =
CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
.removalListener(new RemovalListener<Key, Value>() {
@Override
public void onRemoval(RemovalNotification<Key, Value> notification) {
// When a region-specific key is removed, also drop its mapping from regionSpecificKeys
Key removedKey = notification.getKey();
if (removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
})
.build(new CacheLoader<Key, Value>() {
@Override
public Value load(Key key) throws Exception {
// Record region-specific keys under their region-less clone in regionSpecificKeys
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
// Generate the Value for the key
Value value = generatePayload(key);
return value;
}
});
generatePayload
// Builds the Value for a cache key. The registry call and timer are chosen from the key's
// entity type and name; the timer that was started (if any) is always stopped in finally.
private Value generatePayload(Key key) {
Stopwatch tracer = null;
try {
String payload;
switch (key.getEntityType()) {
case Application:
boolean isRemoteRegionRequested = key.hasRegions();
// Full registry
if (ALL_APPS.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeAllAppsWithRemoteRegionTimer.start();
payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeAllAppsTimer.start();
// The method to focus on here is AbstractInstanceRegistry#getApplications
payload = getPayLoad(key, registry.getApplications());
}
// Delta
} else if (ALL_APPS_DELTA.equals(key.getName())) {
// Both delta branches bump their version counters before generating the payload
if (isRemoteRegionRequested) {
tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
versionDeltaWithRegions.incrementAndGet();
versionDeltaWithRegionsLegacy.incrementAndGet();
payload = getPayLoad(key,
registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeDeltaAppsTimer.start();
versionDelta.incrementAndGet();
versionDeltaLegacy.incrementAndGet();
payload = getPayLoad(key, registry.getApplicationDeltas());
}
// Any other name is treated as a single application name
} else {
tracer = serializeOneApptimer.start();
payload = getPayLoad(key, registry.getApplication(key.getName()));
}
break;
case VIP:
case SVIP:
tracer = serializeViptimer.start();
payload = getPayLoad(key, getApplicationsForVip(key, registry));
break;
default:
// Unknown entity type: log it and use an empty payload
logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
payload = "";
break;
}
return new Value(payload);
} finally {
if (tracer != null) {
tracer.stop();
}
}
}
三、AbstractInstanceRegistry
getApplications
// Returns the applications from the local region only, or from all known remote regions as
// well, depending on the transparent-fallback setting.
public Applications getApplications() {
// Whether transparent fallback to other regions is disabled (default false);
// configurable through the eureka.server.disableTransparentFallbackToOtherRegion property
boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();
if (disableTransparentFallback) {
return getApplicationsFromLocalRegionOnly();
} else {
return getApplicationsFromAllRemoteRegions();
}
}
getApplicationsFromAllRemoteRegions
// Delegates to getApplicationsFromMultipleRegions with the allKnownRemoteRegions array.
public Applications getApplicationsFromAllRemoteRegions() {
return getApplicationsFromMultipleRegions(allKnownRemoteRegions);
}
getApplicationsFromMultipleRegions
// Builds a fresh Applications instance from the local registry map and, when remote regions
// are given, merges in the applications of each of those regions' remote registries.
public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;
logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}",
includeRemoteRegion, remoteRegions);
// Reaching this method counts as a cache miss on the corresponding monitor
if (includeRemoteRegion) {
GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
} else {
GET_ALL_CACHE_MISS.increment();
}
Applications apps = new Applications();
// Set the version to 1
apps.setVersion(1L);
// Iterate over the registry map; one Application is built per outer entry
for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
Application app = null;
if (entry.getValue() != null) {
// Iterate over the inner map of leases, one lease per instance
// (presumably keyed by instance id rather than app name - TODO confirm)
for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
if (app == null) {
// Create the Application lazily from the first lease's app name
app = new Application(lease.getHolder().getAppName());
}
// Decorate the instance with information from its lease, then add it to the Application
app.addInstance(decorateInstanceInfo(lease));
}
}
if (app != null) {
// Add the Application to the Applications instance
apps.addApplication(app);
}
}
if (includeRemoteRegion) {
for (String remoteRegion : remoteRegions) {
RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
if (null != remoteRegistry) {
Applications remoteApps = remoteRegistry.getApplications();
for (Application application : remoteApps.getRegisteredApplications()) {
// Only applications that pass shouldFetchFromRemoteRegistry are merged in
if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
logger.info("Application {} fetched from the remote region {}",
application.getName(), remoteRegion);
// Reuse the Application already collected under this name, or create a new one
Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
if (appInstanceTillNow == null) {
appInstanceTillNow = new Application(application.getName());
apps.addApplication(appInstanceTillNow);
}
for (InstanceInfo instanceInfo : application.getInstances()) {
appInstanceTillNow.addInstance(instanceInfo);
}
} else {
logger.debug("Application {} not fetched from the remote region {} as there exists a "
+ "whitelist and this app is not in the whitelist.",
application.getName(), remoteRegion);
}
}
} else {
logger.warn("No remote registry available for the remote region {}", remoteRegion);
}
}
}
// Set the reconcile hash code
apps.setAppsHashCode(apps.getReconcileHashCode());
return apps;
}
服务发现-增量获取(服务端逻辑)
一、ApplicationsResource
// GET "delta" endpoint that returns the registry delta payload for the requested version,
// media type and (optionally) remote regions, served through the response cache.
@Path("delta")
@GET
public Response getContainerDifferential(
@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {
// 1. Determine whether the request carries a non-empty "regions" parameter
boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
// 2. Check whether access to the registry is allowed; if not, return 403 immediately.
// Access is denied when any of the following holds:
// a) the eureka.server.disableDelta property (default false) is true, i.e. delta is disabled
// Per the article, shouldAllowAccess denies access in these cases (not shown here - TODO confirm):
// b) isRemoteRegionRequested is false and current time <= startup time + wait time (the time
// the server waits when it cannot obtain instances from its peers, default 5 minutes,
// configurable through eureka.server.waitTimeInMsWhenSyncEmpty)
// c) presumably isRemoteRegionRequested is true (the original note said "false" for both
// cases) and current time <= startup time + wait time, or any entry in
// regionNameVSRemoteRegistry is not ready (RemoteRegionRegistry#readyForServingData)
if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) {
return Response.status(Status.FORBIDDEN).build();
}
String[] regions = null;
if (!isRemoteRegionRequested) {
// Increment the GET_ALL_DELTA monitor
EurekaMonitors.GET_ALL_DELTA.increment();
} else {
// Lower-case "regions", split it on commas into a string array and sort it
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions);
// Increment the GET_ALL_DELTA_WITH_REMOTE_REGIONS monitor
EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment();
}
// 3. Record the request's version as the current request version (per the article this is
// kept in a ThreadLocal and defaults to V2)
CurrentRequestVersion.set(Version.toEnum(version));
KeyType keyType = Key.KeyType.JSON;
// 4. If the Accept header does not ask for JSON, use XML
String returnMediaType = MediaType.APPLICATION_JSON;
if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
keyType = Key.KeyType.XML;
returnMediaType = MediaType.APPLICATION_XML;
}
// 5. Build the cache key
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS_DELTA,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
final Response response;
// 6. Look up the payload for the cache key
if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
// The Accept-Encoding header contains gzip: use ResponseCacheImpl#getGZIP
response = Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
// Otherwise use ResponseCacheImpl#get
response = Response.ok(responseCache.get(cacheKey)).build();
}
// 7. Clear the current request version.
// NOTE(review): this is not in a finally block, so it is skipped if the lookup above throws.
CurrentRequestVersion.remove();
return response;
}
二、ResponseCacheImpl
get
// Returns the cached payload for the key, using the configured read-only cache setting.
public String get(final Key key) {
// shouldUseReadOnlyResponseCache: whether to use the read-only cache (default true);
// configurable through the eureka.server.useReadOnlyResponseCache property
return get(key, shouldUseReadOnlyResponseCache);
}
// Returns the payload string for the key, or null when no value could be obtained or the
// payload equals EMPTY_PAYLOAD.
@VisibleForTesting
String get(final Key key, boolean useReadOnlyCache) {
Value payload = getValue(key, useReadOnlyCache);
if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
return null;
} else {
return payload.getPayload();
}
}
getValue
// Looks the key up in the read-only cache first (when enabled) and falls back to the
// read-write cache. Any Throwable is logged and results in whatever payload was set so far
// (null if the failure happened before assignment).
@VisibleForTesting
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
// Read-only cache enabled
if (useReadOnlyCache) {
// Look in the read-only cache
final Value currentPayload = readOnlyCacheMap.get(key);
// Read-only cache hit: return that value
if (currentPayload != null) {
payload = currentPayload;
// Read-only cache miss
} else {
// Get the value from the read-write cache
payload = readWriteCacheMap.get(key);
// Copy the value obtained from the read-write cache into the read-only cache
readOnlyCacheMap.put(key, payload);
}
// Read-only cache disabled
} else {
// Get the value from the read-write cache
payload = readWriteCacheMap.get(key);
}
} catch (Throwable t) {
logger.error("Cannot get value for key : {}", key, t);
}
// Return whatever was obtained
return payload;
}
而读写缓存实际上是一个 LoadingCache,在 ResponseCacheImpl 构造器中有如下代码:
// eureka.server.initialCapacityOfResponseCache: initial capacity of the read-write cache (default 1000)
// eureka.server.responseCacheAutoExpirationInSeconds: expiry time of the read-write cache (default 180 seconds)
this.readWriteCacheMap =
CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
.removalListener(new RemovalListener<Key, Value>() {
@Override
public void onRemoval(RemovalNotification<Key, Value> notification) {
// When a region-specific key is removed, also drop its mapping from regionSpecificKeys
Key removedKey = notification.getKey();
if (removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
})
.build(new CacheLoader<Key, Value>() {
@Override
public Value load(Key key) throws Exception {
// Record region-specific keys under their region-less clone in regionSpecificKeys
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
// Generate the Value for the key
Value value = generatePayload(key);
return value;
}
});
generatePayload
// Builds the Value for a cache key. The registry call and timer are chosen from the key's
// entity type and name; the timer that was started (if any) is always stopped in finally.
private Value generatePayload(Key key) {
Stopwatch tracer = null;
try {
String payload;
switch (key.getEntityType()) {
case Application:
boolean isRemoteRegionRequested = key.hasRegions();
// Full registry
if (ALL_APPS.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeAllAppsWithRemoteRegionTimer.start();
payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeAllAppsTimer.start();
payload = getPayLoad(key, registry.getApplications());
}
// Delta
} else if (ALL_APPS_DELTA.equals(key.getName())) {
// Both delta branches bump their version counters before generating the payload
if (isRemoteRegionRequested) {
tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
versionDeltaWithRegions.incrementAndGet();
versionDeltaWithRegionsLegacy.incrementAndGet();
payload = getPayLoad(key,
registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeDeltaAppsTimer.start();
versionDelta.incrementAndGet();
versionDeltaLegacy.incrementAndGet();
// The method to focus on here is AbstractInstanceRegistry#getApplicationDeltas
payload = getPayLoad(key, registry.getApplicationDeltas());
}
// Any other name is treated as a single application name
} else {
tracer = serializeOneApptimer.start();
payload = getPayLoad(key, registry.getApplication(key.getName()));
}
break;
case VIP:
case SVIP:
tracer = serializeViptimer.start();
payload = getPayLoad(key, getApplicationsForVip(key, registry));
break;
default:
// Unknown entity type: log it and use an empty payload
logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
payload = "";
break;
}
return new Value(payload);
} finally {
if (tracer != null) {
tracer.stop();
}
}
}
三、AbstractInstanceRegistry
// Builds the delta Applications from the recentlyChangedQueue (plus remote-region deltas when
// transparent fallback is enabled) while holding the write lock. The hash code set on the
// result is computed over the complete application set, not over the delta itself.
@Deprecated
public Applications getApplicationDeltas() {
GET_ALL_CACHE_MISS_DELTA.increment();
Applications apps = new Applications();
// Set the version from the response cache's delta version counter
apps.setVersion(responseCache.getVersionDelta().get());
Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
write.lock();
try {
// Get an iterator over the recentlyChangedQueue
Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
logger.debug("The number of elements in the delta queue is : {}",
this.recentlyChangedQueue.size());
// Iterate over the recentlyChangedQueue
while (iter.hasNext()) {
Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
InstanceInfo instanceInfo = lease.getHolder();
logger.debug(
"The instance id {} is found with status {} and actiontype {}",
instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name());
// Look up the Application for this app name in applicationInstancesMap
Application app = applicationInstancesMap.get(instanceInfo
.getAppName());
// Not found yet
if (app == null) {
// Create the Application
app = new Application(instanceInfo.getAppName());
// Remember the app name -> Application mapping in applicationInstancesMap
applicationInstancesMap.put(instanceInfo.getAppName(), app);
// Add the Application to the Applications instance
apps.addApplication(app);
}
// Decorate the instance with information from its lease, then add a copy of it to the Application
app.addInstance(new InstanceInfo(decorateInstanceInfo(lease)));
}
// Whether transparent fallback to other regions is disabled (default false);
// configurable through the eureka.server.disableTransparentFallbackToOtherRegion property
boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();
if (!disableTransparentFallback) {
Applications allAppsInLocalRegion = getApplications(false);
// regionNameVSRemoteRegistry is empty by default (per the article)
for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values()) {
Applications applications = remoteRegistry.getApplicationDeltas();
// Add a remote delta application only when the local region has no application of that name
for (Application application : applications.getRegisteredApplications()) {
Application appInLocalRegistry =
allAppsInLocalRegion.getRegisteredApplications(application.getName());
if (appInLocalRegistry == null) {
apps.addApplication(application);
}
}
}
}
// Fetched only to compute the reconcile hash code (per the article it looks like
// "UP_2" or "UP_1_DOWN_1")
Applications allApps = getApplications(!disableTransparentFallback);
// Set the reconcile hash code
apps.setAppsHashCode(allApps.getReconcileHashCode());
return apps;
} finally {
write.unlock();
}
}
在 AbstractInstanceRegistry 的如下五个方法中,会将最近发生变化的服务实例添加到 recentlyChangedQueue队列中:
此外,在 AbstractInstanceRegistry 构造器中定义了一个定时任务,用于清理 recentlyChangedQueue 队列,如下:
// Stores the configuration and codecs, creates the recent-canceled / recent-registered queues
// and the renew rate meter, and schedules the periodic delta retention task.
protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
this.serverConfig = serverConfig;
this.clientConfig = clientConfig;
this.serverCodecs = serverCodecs;
this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000);
this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000);
this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);
// Run the delta retention task periodically; both the initial delay and the period are
// eureka.server.deltaRetentionTimerIntervalInMs (30 seconds by default per the article).
// The task removes expired entries from recentlyChangedQueue; it does not empty the queue.
this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
serverConfig.getDeltaRetentionTimerIntervalInMs(),
serverConfig.getDeltaRetentionTimerIntervalInMs());
}
getDeltaRetentionTask
// Creates the TimerTask that evicts expired entries from recentlyChangedQueue.
private TimerTask getDeltaRetentionTask() {
return new TimerTask() {
@Override
public void run() {
// Walk the queue from its head and remove every entry whose last update time is older than
// now minus getRetentionTimeInMSInDeltaQueue(). Stop at the first entry that is still
// within the retention window, so newer entries remain (the queue is not simply emptied).
Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
while (it.hasNext()) {
if (it.next().getLastUpdateTime() <
System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
it.remove();
} else {
break;
}
}
}
};
}