版本 awsVersion = ‘1.11.277’
LeaseManager 接口管理实例的租约信息,提供以下功能:
- 注册实例
- 取消注册实例
- 实例续约
- 剔除过期实例
public interface LeaseManager<T> {
/*
* 注册实例并续约
*/
void register(T r, int leaseDuration, boolean isReplication);
/**
* 取消注册实例
*/
boolean cancel(String appName, String id, boolean isReplication);
/**
* 续约
*/
boolean renew(String appName, String id, boolean isReplication);
/**
* 剔除过期实例
*/
void evict();
}
InstanceRegistry 接口即注册表服务,继承 LeaseManager 接口,提供以下功能:
- 启动和关闭注册表服务
- 更新注册表中实例的状态
- 从注册表中获取应用信息和实例信息
- 初始化和获取注册表缓存
- 租约过期机制和自我保护机制(和 LeaseManager 的 evict() 方法相关)
public interface InstanceRegistry extends LeaseManager<InstanceInfo>, LookupService<String> {
// ========================
// 启动和关闭注册表服务
// ========================
/**
* 在PeerAwareInstanceRegistry接口的init()和syncUp()方法调用后被调用
* 1.更新expectedNumberOfClientsSendingRenews
* 更新numberOfRenewsPerMinThreshold
* 2.如果从其他Eureka节点拉取注册表成功并且实例数量大于0
* 设置peerInstancesTransferEmptyOnStartup为false
* 和PeerAwareInstanceRegistry接口的shouldAllowAccess()方法相关
* 3.设置startupTime为当前时间
* 4.设置自身实例状态为InstanceStatus.UP
* 5.调用postInit()方法
* 创建EvictionTask并通过Timer调度定时剔除过期实例
* 配置evictionIntervalTimerInMs指定剔除过期实例的时间间隔,默认60s
*/
void openForTraffic(ApplicationInfoManager applicationInfoManager, int count);
void shutdown();
// ========================
// 更新注册表中实例的状态
// ========================
@Deprecated
void storeOverriddenStatusIfRequired(String id, InstanceStatus overriddenStatus);
/**
* 更新注册表中实例的overriddenStatus
*/
void storeOverriddenStatusIfRequired(String appName, String id, InstanceStatus overriddenStatus);
/**
* 更新注册表中实例的overriddenStatus和status
*/
boolean statusUpdate(String appName,
String id,
InstanceStatus newStatus,
String lastDirtyTimestamp,
boolean isReplication);
/**
* 删除注册表中实例的overriddenStatus并设置status
*/
boolean deleteStatusOverride(String appName,
String id,
InstanceStatus newStatus,
String lastDirtyTimestamp,
boolean isReplication);
/**
* 获取注册表中overriddenStatus集合的快照
*/
Map<String, InstanceStatus> overriddenInstanceStatusesSnapshot();
// ========================
// 注册表 CRUD
// ========================
/**
* 获取本地注册表
*/
Applications getApplicationsFromLocalRegionOnly();
/**
* 根据应用名称从本地注册表或其他region的注册表中获取应用信息
*/
Application getApplication(String appName, boolean includeRemoteRegion);
/**
* 根据应用名称和实例id从本地注册表或其他region的注册表中获取实例信息
*/
InstanceInfo getInstanceByAppAndId(String appName, String id);
/**
* 根据应用名称和实例id从本地注册表或其他region的注册表中获取实例信息
*/
InstanceInfo getInstanceByAppAndId(String appName, String id, boolean includeRemoteRegions);
/**
* 清空注册表
*/
void clearRegistry();
// ========================
// 注册表缓存
// ========================
/**
* 初始化注册表缓存ResponseCacheImpl
*/
void initializedResponseCache();
/**
* 获取注册表缓存ResponseCacheImpl
*/
ResponseCache getResponseCache();
// ========================
// 租约过期机制&自我保护机制
// ========================
/**
* 获取上一分钟收到的续约(renew)请求数
*/
long getNumOfRenewsInLastMin();
/**
* 获取每一分钟续约(renew)请求数的阈值
* 如果上一分钟收到的续约请求数小于阈值,开启自我保护机制
* 计算方式:实例数量 * (60 / 续约间隔时间)* 续约百分比阈值0.85
* this.expectedNumberOfClientsSendingRenews *
* (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds()) *
* serverConfig.getRenewalPercentThreshold())
*/
int getNumOfRenewsPerMinThreshold();
/**
* 是否启用租约过期机制
*/
boolean isLeaseExpirationEnabled();
/**
* 是否启用自我保护机制
*/
boolean isSelfPreservationModeEnabled();
}
Map<String, RemoteRegionRegistry> regionNameVSRemoteRegistry 是 AbstractInstanceRegistry 抽象类的成员变量,key 是 remoteRegionUrlsWithName 配置中的 regionName,value 则是 initRemoteRegionRegistry() 方法中创建的RemoteRegionRegistry 对象。
// 配置remoteRegionUrlsWithName
regionName1;regionUrl1,regionName2;regionUrl2...
RemoteRegionRegistry 类表示其他区域的注册表信息,配置 remoteRegion.registryFetchIntervalInSeconds
指定从其他区域拉取注册表信息的间隔时间,默认 30s
。
拉取成功后,将 readyForServingData 设置为 true,表示该区域的注册表已经可以提供服务。
Runnable remoteRegionFetchTask = new Runnable() {
@Override
public void run() {
try {
if (fetchRegistry()) {
readyForServingData = true;
} else {
logger.warn("Failed to fetch remote registry. " +
"This means this eureka server " +
"is not ready for serving traffic.");
}
}
}
};
scheduler.schedule(
new TimedSupervisorTask(
"RemoteRegionFetch_" + regionName,
scheduler,
remoteRegionFetchExecutor,
// 配置remoteRegion.registryFetchIntervalInSeconds
serverConfig.getRemoteRegionRegistryFetchInterval(),
TimeUnit.SECONDS,
5, // exponential backoff bound
remoteRegionFetchTask
),
serverConfig.getRemoteRegionRegistryFetchInterval(),
TimeUnit.SECONDS);
remoteRegion.global.appWhiteList
和 remoteRegion.{regionName}.appWhiteList
配置全局和 regionName 指定区域的拉取白名单
,appName 不在白名单中的应用信息是无法拉取的。
PeerAwareInstanceRegistry 接口继承 InstanceRegistry 接口,提供以下功能:
public interface PeerAwareInstanceRegistry extends InstanceRegistry {
/**
* 初始化PeerAwareInstanceRegistryImpl,包括:
* 1.实例化注册表缓存ResponseCacheImpl
* 2.创建定时任务,定时更新numberOfRenewsPerMinThreshold
* 配置renewalThresholdUpdateIntervalMs
* 指定更新numberOfRenewsPerMinThreshold的时间间隔,默认15min
* 3.初始化其他区域注册表regionNameVSRemoteRegistry
*/
void init(PeerEurekaNodes peerEurekaNodes) throws Exception;
/**
* 是否可以对外提供注册表服务
* 1.如果在调用openForTraffic方法时
* 从其他Eureka节点拉取注册表失败则返回false
* 2.如果remoteRegionRequired为true
* 还需要等待其他区域注册表全部拉取成功后才返回true
*/
boolean shouldAllowAccess(boolean remoteRegionRequired);
/**
* 从其他Eureka节点拉取注册表信息
*/
int syncUp();
/**
* 注册实例信息
*/
void register(InstanceInfo info, boolean isReplication);
void statusUpdate(final String asgName,
final ASGResource.ASGStatus newStatus,
final boolean isReplication);
}
注:在 LeaseManager 接口中已经声明了 register(T r, int leaseDuration, boolean isReplication) 方法的前提下,为什么在 PeerAwareInstanceRegistry 接口中再次声明 register(InstanceInfo info, boolean isReplication) 方法呢?原来 register(InstanceInfo info, boolean isReplication) 方法是在 syncUp() 方法中被调用,是将从其他 Eureka 节点拉取过来的注册表中的实例信息注册到本地注册表中
。
// com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl
public int syncUp() {
// 统计从其他Eureka节点同步过来的实例信息数量
int count = 0;
// ...
// 从其他Eureka节点拉取注册表信息
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
// ...
// 判断该实例的availabilityZone是否和当前Eureka节点属于同一个region
// 如果是,则将该实例注册到本地注册表
if (isRegisterable(instance)) {
register(instance,
instance.getLeaseInfo().getDurationInSecs(),
true);
count++;
}
}
}
// ...
return count;
}
PeerAwareInstanceRegistryImpl 类构造方法和 init 方法代码如下:
@Singleton
public class PeerAwareInstanceRegistryImpl
extends AbstractInstanceRegistry
implements PeerAwareInstanceRegistry {
// startupTime、peerInstancesTransferEmptyOnStartup
// 在openForTraffic方法被调用时赋值
private long startupTime = 0;
private boolean peerInstancesTransferEmptyOnStartup = true;
// peerEurekaNodes在init方法被调用时赋值
protected volatile PeerEurekaNodes peerEurekaNodes;
// eurekaClient在构造方法被调用时赋值
protected final EurekaClient eurekaClient;
// instanceStatusOverrideRule在构造方法被调用时赋值
private final InstanceStatusOverrideRule instanceStatusOverrideRule;
// 定时调用updateRenewalThreshold方法
private Timer timer = new Timer(
"ReplicaAwareInstanceRegistry - RenewalThresholdUpdater", true);
@Inject
public PeerAwareInstanceRegistryImpl(
EurekaServerConfig serverConfig,
EurekaClientConfig clientConfig,
ServerCodecs serverCodecs,
EurekaClient eurekaClient) {
super(serverConfig, clientConfig, serverCodecs);
this.eurekaClient = eurekaClient;
this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1);
// We first check if the instance is STARTING or DOWN,
// then we check explicit overrides,
// then we check the status of a potentially existing lease.
this.instanceStatusOverrideRule =
new FirstMatchWinsCompositeRule(
new DownOrStartingRule(),
new OverrideExistsRule(overriddenInstanceStatusMap),
new LeaseExistsRule());
}
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
// 1.统计每分钟和其他Eureka节点的同步频率
this.numberOfReplicationsLastMin.start();
// 2.赋值peerEurekaNodes属性,保存Eureka集群节点信息
this.peerEurekaNodes = peerEurekaNodes;
// 3.创建本地注册表缓存ResponseCacheImpl
initializedResponseCache();
// 4.创建TimerTask,通过Timer调度updateRenewalThreshold方法
// 定时更新numberOfRenewsPerMinThreshold
scheduleRenewalThresholdUpdateTask();
// 5.创建其他区域的注册表RemoteRegionRegistry
initRemoteRegionRegistry();
// ...
}
}
-
实现了
PeerAwareInstanceRegistry
接口,通过eurekaClient 属性
获得了从其他 Eureka 节点拉取注册表
(PeerAwareInstanceRegistry#syncUp()
方法)的能力, -
通过
peerEurekaNodes 属性
获得了将本地注册表的更新同步
给其他 Eureka 节点(PeerAwareInstanceRegistryImpl#replicateToPeers()
方法)的能力
注:为什么不将 Eureka 节点之间同步更新数据的操作和拉取注册表的操作一起声明在 PeerAwareInstanceRegistry 接口中,而是另外通过 PeerEurekaNode 类去实现呢?
既然 numberOfRenewsPerMinThreshold 是通过实例数量实时计算为什么不将 numberOfRenewsPerMinThreshold 属性声明在 PeerAwareInstanceRegistryImpl 类中,而是声明在父类 AbstractInstanceRegistry 中?
instanceStatusOverrideRule 根据规则计算实例的