源码解析-Spring Eureka
文章目录
- 源码解析-Spring Eureka
- 前言
- 一、从Spring.factory和注解开始
- 二、重要的一步EurekaServerInitializerConfiguration
- 三、初始化了什么?
- 自动保护
- 四, 重新回到EurekaServerAutoConfiguration
前言
无
一、从Spring.factory和注解开始
我们可以看到,Eureka通过spring boot的自动配置机制引入了一个类
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
通过这个配置我们找到对应的配置类,可以看到,这个配置类使用了Marker作为条件注入
@Configuration(
proxyBeanMethods = false
)
@Import({EurekaServerInitializerConfiguration.class})
@ConditionalOnBean({EurekaServerMarkerConfiguration.Marker.class})
@EnableConfigurationProperties({EurekaDashboardProperties.class, InstanceRegistryProperties.class})
@PropertySource({"classpath:/eureka/server.properties"})
public class EurekaServerAutoConfiguration implements WebMvcConfigurer
这个时候我们返回查看我们配置一个eureka所需要的基本注解可以看到,我们正在这个这个@EnableEurekaServer 注解里面初始化了这个类
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package org.springframework.cloud.netflix.eureka.server;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.context.annotation.Import;
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({EurekaServerMarkerConfiguration.class})
public @interface EnableEurekaServer {
}
通过spring.factory的自动配置以及@EnableEurekaServer 就可以实现eureka服务端的手动注入(通过加入注解)
二、重要的一步EurekaServerInitializerConfiguration
在上面的EurekaServerAutoConfiguration里面我们可以看到它import了一个初始化类
注意在这个初始化类实现了SmartLifeCycle接口,实现了其Start方法
@Configuration(
proxyBeanMethods = false
)
public class EurekaServerInitializerConfiguration implements ServletContextAware, SmartLifecycle, Ordered
实现的start方法,会在bean在启动的时候调用,该方法会new一个线程并发布订阅
public void start() {
(new Thread(() -> {
try {
this.eurekaServerBootstrap.contextInitialized(this.servletContext);
log.info("Started Eureka Server");
this.publish(new EurekaRegistryAvailableEvent(this.getEurekaServerConfig()));
this.running = true;
this.publish(new EurekaServerStartedEvent(this.getEurekaServerConfig()));
} catch (Exception var2) {
log.error("Could not initialize Eureka servlet context", var2);
}
})).start();
}
可以看到,通过这个start方法,eureka初始化了它自己的context上下文并发布了一些事件。
三、初始化了什么?
进入到contextInitialized方法,我们可以看到
public void contextInitialized(ServletContext context) {
try {
this.initEurekaEnvironment();
this.initEurekaServerContext();
context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
} catch (Throwable var3) {
log.error("Cannot bootstrap eureka server :", var3);
throw new RuntimeException("Cannot bootstrap eureka server :", var3);
}
}
eureka首先初始化了配置信息,然后进行上下文的初始化
protected void initEurekaServerContext() throws Exception {
JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
if (this.isAws(this.applicationInfoManager.getInfo())) {
this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig, this.registry, this.applicationInfoManager);
this.awsBinder.start();
}
EurekaServerContextHolder.initialize(this.serverContext);
log.info("Initialized server context");
int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
EurekaMonitors.registerAllStats();
}
进入到initEurekaServerContext方法,我们可以看到几个重要的方法
在openForTraffic方法里面
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
this.expectedNumberOfClientsSendingRenews = count;
this.updateRenewsPerMinThreshold();
logger.info("Got {} instances from neighboring DS node", count);
logger.info("Renew threshold is: {}", this.numberOfRenewsPerMinThreshold);
this.startupTime = System.currentTimeMillis();
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
boolean isAws = Name.Amazon == selfName;
if (isAws && this.serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
this.primeAwsReplicas(applicationInfoManager);
}
logger.info("Changing status to UP");
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
super.postInit();
}
我们重点关注这里的super.postInit()
protected void postInit() {
this.renewsLastMin.start();
if (this.evictionTaskRef.get() != null) {
((EvictionTask)this.evictionTaskRef.get()).cancel();
}
this.evictionTaskRef.set(new EvictionTask());
this.evictionTimer.schedule((TimerTask)this.evictionTaskRef.get(), this.serverConfig.getEvictionIntervalTimerInMs(), this.serverConfig.getEvictionIntervalTimerInMs());
}
可以看到this.evictionTaskRef.set(new EvictionTask());,这里注册了一个剔除任务
int registrySize = (int)this.getLocalRegistrySize();
int registrySizeThreshold = (int)((double)registrySize * this.serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
这里的剔除与eureka配置里面的自我保护配置有关
自动保护
在eureka中,如果打开了自我保护配置并设置了剔除阈值,eureka集群就会在计算正常超过阈值的时候执行上面的代码把的节点给剔除
- 如果现在有10个节点,7个节点是正常,3个节点是由有问题的,阈值设置了80%,这个时候7个节点中的一个节点出现了问题,但是没有超过阈值(变成了60%),这个时候就会访问到失败的节点
- 如果现在有100个节点,3个节点有问题,阈值也是80%,现在的值是(97%)超过了阈值,如果这个时候有节点出现问题则会立即剔除,
- 但是不能把自我保护关闭,如果3个节点是因为波动导致的暂时访问不到则会立即被剔除
eureka:
server:
enable-self-preservation: true
eureka:
server:
renewal-percent-threshold: 0.85
我们再进到syncUp方法里面
public int syncUp() {
int count = 0;
for(int i = 0; i < this.serverConfig.getRegistrySyncRetries() && count == 0; ++i) {
if (i > 0) {
try {
Thread.sleep(this.serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException var10) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
Applications apps = this.eurekaClient.getApplications();
Iterator var4 = apps.getRegisteredApplications().iterator();
可以看到当一个eureka服务启动的时候,会作为一个eureka客户端去peer节点拉取配置(这也是eureka为什么不是强一致性的)
四, 重新回到EurekaServerAutoConfiguration
首先就是eureka的控制器类,eureka的dashboard上面的数据通过这个控制器(springWeb)来获取
@Bean
@ConditionalOnProperty(
prefix = "eureka.dashboard",
name = {"enabled"},
matchIfMissing = true
)
public EurekaController eurekaController() {
return new EurekaController(this.applicationInfoManager);
}
接着再eurekaServerContext里面实例化了属于eureka的上下文
@Bean
@ConditionalOnMissingBean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes, this.applicationInfoManager);
}
进入context的初始化方法,可以看在context初始化里面有重要的一环就是设置三级缓存initializedResponseCache
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
this.numberOfReplicationsLastMin.start();
this.peerEurekaNodes = peerEurekaNodes;
this.initializedResponseCache();
this.scheduleRenewalThresholdUpdateTask();
this.initRemoteRegionRegistry();
try {
Monitors.registerObject(this);
} catch (Throwable var3) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", var3);
}
}
里面有一个定时任务,就是定期刷新缓存
if (this.shouldUseReadOnlyResponseCache) {
this.timer.schedule(this.getCacheUpdateTask(), new Date(System.currentTimeMillis() / responseCacheUpdateIntervalMs * responseCacheUpdateIntervalMs + responseCacheUpdateIntervalMs), responseCacheUpdateIntervalMs);
}
继续查看自动配置类,可以看到,eureka通过jersey框架包装了注册服务
@Bean
public FilterRegistrationBean<?> jerseyFilterRegistration(Application eurekaJerseyApp) {
FilterRegistrationBean<Filter> bean = new FilterRegistrationBean();
bean.setFilter(new ServletContainer(eurekaJerseyApp));
bean.setOrder(Integer.MAX_VALUE);
bean.setUrlPatterns(Collections.singletonList("/eureka/*"));
return bean;
}
再看jerseyApplication,这里面会对eureka的包路径进行扫描, 并将其中的候选类进行注入,其中非常重要的就是resource目录下的applicationsresoure,该方法会返回一个Application的Bean(SPring的config加Bean)
@Bean
public Application jerseyApplication(Environment environment, ResourceLoader resourceLoader) {
ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false, environment);
provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
Set<Class<?>> classes = new HashSet();
String[] var5 = EUREKA_PACKAGES;
int var6 = var5.length;
for(int var7 = 0; var7 < var6; ++var7) {
String basePackage = var5[var7];
Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
Iterator var10 = beans.iterator();
while(var10.hasNext()) {
BeanDefinition bd = (BeanDefinition)var10.next();
Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(), resourceLoader.getClassLoader());
classes.add(cls);
}
}
Map<String, Object> propsAndFeatures = new HashMap();
propsAndFeatures.put("com.sun.jersey.config.property.WebPageContentRegex", "/eureka/(fonts|images|css|js)/.*");
DefaultResourceConfig rc = new DefaultResourceConfig(classes);
rc.setPropertiesAndFeatures(propsAndFeatures);
return rc;
}
在applicationsResource里面,通过jersey编写一系列关于**注册中心的“注册”,“取消”,“续约”**等的HTTP方法
我们先来看获取ALLKey的方法(拉取所有配置)
@GET
public Response getContainers(@PathParam("version") String version, @HeaderParam("Accept") String acceptHeader, @HeaderParam("Accept-Encoding") String acceptEncoding, @HeaderParam("X-Eureka-Accept") String eurekaAccept, @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {
boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
String[] regions = null;
if (!isRemoteRegionRequested) {
EurekaMonitors.GET_ALL.increment();
} else {
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions);
EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
}
if (!this.registry.shouldAllowAccess(isRemoteRegionRequested)) {
return Response.status(Status.FORBIDDEN).build();
} else {
CurrentRequestVersion.set(Version.toEnum(version));
Key.KeyType keyType = KeyType.JSON;
String returnMediaType = "application/json";
if (acceptHeader == null || !acceptHeader.contains("json")) {
keyType = KeyType.XML;
returnMediaType = "application/xml";
}
Key cacheKey = new Key(EntityType.Application, "ALL_APPS", keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions);
Response response;
if (acceptEncoding != null && acceptEncoding.contains("gzip")) {
response = Response.ok(this.responseCache.getGZIP(cacheKey)).header("Content-Encoding", "gzip").header("Content-Type", returnMediaType).build();
} else {
response = Response.ok(this.responseCache.get(cacheKey)).build();
}
CurrentRequestVersion.remove();
return response;
}
}
其中ResponseCache注入了了eureka自己实现的三级缓存的getValue方法
@VisibleForTesting
Value getValue(Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
if (useReadOnlyCache) {
Value currentPayload = (Value)this.readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
payload = (Value)this.readWriteCacheMap.get(key);
this.readOnlyCacheMap.put(key, payload);
}
} else {
payload = (Value)this.readWriteCacheMap.get(key);
}
} catch (Throwable var5) {
logger.error("Cannot get value for key : {}", key, var5);
}
return payload;
}
我们再来看在eureka中一个服务是如何被维护的
在applicationResoure中,有添加服务的方法
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info, @HeaderParam("x-netflix-discovery-replication") String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
if (this.isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (this.isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (this.isBlank(info.getIPAddr())) {
return Response.status(400).entity("Missing ip address").build();
} else if (this.isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!this.appName.equals(info.getAppName())) {
return Response.status(400).entity("Mismatched appName, expecting " + this.appName + " but was " + info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
} else {
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier)dataCenterInfo).getId();
if (this.isBlank(dataCenterInfoId)) {
boolean experimental = "true".equalsIgnoreCase(this.serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
}
if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo)dataCenterInfo;
String effectiveId = amazonInfo.get(MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}
this.registry.register(info, "true".equals(isReplication));
return Response.status(204).build();
}
}
进去到register方法里面,我们可以看到,eureka首先向自己注册了当前服务,然后同步到了peer节点上面
public void register(InstanceInfo info, boolean isReplication) {
int leaseDuration = 90;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);
this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Register, info.getAppName(), info.getId(), info, (InstanceInfo.InstanceStatus)null, isReplication);
}
可以看到,register里面,我们的InstanceInfo 被一个Map<String, Lease> gMap维护着
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
this.read.lock();
try {
Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName());
EurekaMonitors.REGISTER.increment(isReplication);
if (gMap == null) {
ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap();
gMap = (Map)this.registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
我们看下Lease的结构,通过下面的这种结构,我们只需要修改Lease的long信息就可以对当前节点的生命状态进行修改而不需要修改节点本身
public class Lease<T> {
public static final int DEFAULT_DURATION_IN_SECS = 90;
private T holder; // 具体的实例信息
// 一些用于维护节点状态的时间信息
private long evictionTimestamp;
private long registrationTimestamp;
private long serviceUpTimestamp;
private volatile long lastUpdateTimestamp;
private long duration;