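Background
We continue our analysis of the two auto-configuration classes for EurekaClient:
Auto-configuration class | Function | Responsibility |
EurekaClientAutoConfiguration | Configures EurekaClient | Ensures that the Eureka client can correctly: - register with the Eureka server - periodically send heartbeats to renew its lease - notify the Eureka server when it goes offline - fetch the list of service instances; focuses on the client's basic configuration and core functionality |
EurekaDiscoveryClientConfiguration | Configures EurekaDiscoveryClient | Creates the listener for RefreshScopeRefreshedEvent, which is used to trigger re-registration; deals more with automatic service registration, health checks, and event handling |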
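CloudEurekaClient Analysis
Principle
At its core, the client performs four actions:
Fetch the service list
Register the service instance
Renew the lease
Cancel the lease
Source Code
Let us look at how the first auto-configuration class, EurekaClientAutoConfiguration, constructs and wraps CloudEurekaClient, that is, the following code block: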
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,
search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,
EurekaClientConfig config) {
return new CloudEurekaClient(manager, config, this.optionalArgs,
this.context);
}
Code analysis:
Creates a CloudEurekaClient object and hands it to the container to manage as a bean
CloudEurekaClient
public class CloudEurekaClient extends DiscoveryClient {
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
ApplicationEventPublisher publisher) {
super(applicationInfoManager, config, args);
this.applicationInfoManager = applicationInfoManager;
this.publisher = publisher;
this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
"eurekaTransport");
ReflectionUtils.makeAccessible(this.eurekaTransportField);
}
}
Code analysis:
CloudEurekaClient simply delegates to the constructor of its parent class, DiscoveryClient
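Besides calling super(...), the constructor above looks up the parent's private eurekaTransport field reflectively. The following is a minimal sketch of that idea using only java.lang.reflect in place of Spring's ReflectionUtils. ParentClient, ChildClient, and the field's string value are hypothetical stand-ins, not the real Eureka classes.

```java
import java.lang.reflect.Field;

// Hypothetical stand-in for DiscoveryClient: the transport lives in a private field.
class ParentClient {
    private final String eurekaTransport;

    ParentClient() {
        this.eurekaTransport = "transport-instance";
    }
}

// Hypothetical stand-in for CloudEurekaClient: resolves the parent's field once.
class ChildClient extends ParentClient {
    private final Field eurekaTransportField = lookupTransportField();

    // JDK-only counterpart of ReflectionUtils.findField + makeAccessible.
    private static Field lookupTransportField() {
        try {
            Field field = ParentClient.class.getDeclaredField("eurekaTransport");
            field.setAccessible(true);
            return field;
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    // Reads the parent's private field through the cached Field handle.
    Object readTransport() {
        try {
            return eurekaTransportField.get(this);
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
    }
}

class ReflectionDemo {
    public static void main(String[] args) {
        System.out.println(new ChildClient().readTransport());
    }
}
```

Caching the Field in the constructor means the reflective lookup cost is paid once, and later reads only need Field.get.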
DiscoveryClient
After passing through several nested overloaded constructors, we arrive at the final constructor:
private final ScheduledExecutorService scheduler;
// additional executors for supervised subtasks
private final ThreadPoolExecutor heartbeatExecutor;
private final ThreadPoolExecutor cacheRefreshExecutor;
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
// .... some initialization work
logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
try {
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
AzToRegionMapper azToRegionMapper;
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
if (null != remoteRegionsToFetch.get()) {
azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
// .......
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
initScheduledTasks();
// ... other initialization work
}
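Code analysis:
Three thread pools are initialized here: scheduler, heartbeatExecutor, and cacheRefreshExecutor
scheduler: a scheduled thread pool with coreSize=2; threads are named DiscoveryClient-%d
heartbeatExecutor: an asynchronous thread pool with coreSize=1; threads are named DiscoveryClient-HeartbeatExecutor-%d
cacheRefreshExecutor: an asynchronous thread pool with coreSize=1; threads are named DiscoveryClient-CacheRefreshExecutor-%d
How do these three thread pools work together? No rush, let us read on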
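The "direct handoff" comment on the two executors deserves a closer look. With a SynchronousQueue nothing is ever queued: a task either gets a thread immediately or is rejected once the maximum pool size is reached. The sketch below reproduces that configuration with JDK classes only. The daemonFactory helper is a hypothetical replacement for Guava's ThreadFactoryBuilder, and the "Demo-" thread names are made up.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HandoffPoolDemo {
    // Daemon thread factory with numbered names (hypothetical helper).
    static ThreadFactory daemonFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
    }

    // Same shape as heartbeatExecutor: core size 1, bounded max, direct handoff.
    static ThreadPoolExecutor directHandoffPool(String prefix, int maxSize) {
        return new ThreadPoolExecutor(1, maxSize, 0, TimeUnit.SECONDS,
                new SynchronousQueue<>(), daemonFactory(prefix));
    }

    // Submits blocking tasks and counts how many the pool refuses.
    static int countRejections(ThreadPoolExecutor pool, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++; // no idle thread and no room to grow
            }
        }
        release.countDown();
        pool.shutdown();
        return rejected;
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = directHandoffPool("Demo-HeartbeatExecutor-", 2);
        System.out.println("rejected = " + countRejections(pool, 3));
    }
}
```

With a maximum of two threads and three blocking tasks, the third submission is rejected instead of waiting in a queue, which is the behavior the direct handoff comment refers to.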
The code of initScheduledTasks() is as follows:
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
);
// 【1】
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
);
// 【2】
scheduler.schedule(
heartbeatTask,
renewalIntervalInSecs, TimeUnit.SECONDS);
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
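Code analysis:
【1】Checks whether the registry should be fetched (the configuration property eureka.client.fetchRegistry defaults to true)
The injected asynchronous thread pool cacheRefreshExecutor runs CacheRefreshThread, that is, the cache refresh task refreshRegistry(), at the interval registryFetchIntervalSeconds
cacheRefreshTask is the cache refresh task
The scheduler schedules the task; note that scheduler.schedule(...) is a one-shot call, so the periodic behavior comes from the TimedSupervisorTask scheduling itself again after each run
【2】Checks whether the instance should register with Eureka (the configuration property eureka.client.registerWithEureka defaults to true)
The injected asynchronous thread pool heartbeatExecutor runs HeartbeatThread, that is, the lease renewal task renew(), at the interval renewalIntervalInSecs
heartbeatTask is the heartbeat lease renewal task
The scheduler schedules the task in the same way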
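The way scheduler and the two executors cooperate can be sketched as a small supervisor task. The article does not show the source of TimedSupervisorTask, so the class below is a simplified, hypothetical approximation of the pattern as I understand it (submit the work to the executor, wait up to a timeout, double the delay on timeout up to a bound, then schedule the next run). It is not the library's implementation, and SupervisedTask, backOff, and currentDelayMillis are invented names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified supervisor; NOT Eureka's TimedSupervisorTask.
class SupervisedTask implements Runnable {
    private final ScheduledExecutorService scheduler;
    private final ExecutorService executor;
    private final Runnable task;
    private final long timeoutMillis;
    private final long maxDelayMillis;
    private final AtomicLong delayMillis;

    SupervisedTask(ScheduledExecutorService scheduler, ExecutorService executor,
                   long timeoutMillis, int expBackOffBound, Runnable task) {
        this.scheduler = scheduler;
        this.executor = executor;
        this.task = task;
        this.timeoutMillis = timeoutMillis;
        this.maxDelayMillis = timeoutMillis * expBackOffBound;
        this.delayMillis = new AtomicLong(timeoutMillis);
    }

    // Exponential back-off rule: double the delay, capped at max.
    static long backOff(long current, long max) {
        return Math.min(max, current * 2);
    }

    long currentDelayMillis() {
        return delayMillis.get();
    }

    @Override
    public void run() {
        Future<?> future = null;
        try {
            future = executor.submit(task);                   // work runs on the worker pool
            future.get(timeoutMillis, TimeUnit.MILLISECONDS); // supervise it with a timeout
            delayMillis.set(timeoutMillis);                   // success: back to the base interval
        } catch (TimeoutException e) {
            delayMillis.set(backOff(delayMillis.get(), maxDelayMillis));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException | RejectedExecutionException e) {
            // task failed or pool saturated: keep the current delay in this sketch
        } finally {
            if (future != null) {
                future.cancel(true);
            }
            try {
                // one-shot scheduling again: this is what makes the task periodic
                scheduler.schedule(this, delayMillis.get(), TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException e) {
                // scheduler already shut down: stop rescheduling
            }
        }
    }
}

class SupervisedTaskDemo {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        ExecutorService worker = Executors.newSingleThreadExecutor();
        SupervisedTask heartbeat = new SupervisedTask(scheduler, worker, 100, 10,
                () -> System.out.println("heartbeat"));
        scheduler.schedule(heartbeat, 100, TimeUnit.MILLISECONDS);
        Thread.sleep(350);
        scheduler.shutdownNow();
        worker.shutdownNow();
    }
}
```

Splitting the roles this way keeps the scheduler threads free: they only hand work off and wait with a timeout, while a slow HTTP call occupies a worker thread instead.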
CacheRefreshThread - Cache Refresh
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
@VisibleForTesting
void refreshRegistry() {
try {
//.....
//【1】Refresh the local list of registered services
boolean success = fetchRegistry(remoteRegionsModified);
//.....
} catch (Throwable e) {
logger.error("Cannot fetch registry from server", e);
}
}
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
try {
// 【2】Get the service list held locally in localRegionApps
Applications applications = getApplications();
// 【3】Fetch the remote data and update the service list
getAndUpdateDelta(applications);
} catch (Throwable e) {
// ..... (error logging elided)
return false;
}
// registry was fetched successfully, so return true
return true;
}
private void getAndUpdateDelta(Applications applications) throws Throwable {
// .....
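//【4】Check the delta of the service registry (the call that fetches it from the server is omitted in this excerpt)
Applications delta = null;
if (delta == null) {
// 【4.1】If no delta is available, pull the full registry from the Eureka server
getAndStoreFullRegistry();
} else {
if (fetchRegistryUpdateLock.tryLock()) {
try {
//【5】Get the latest registry from the Eureka server and apply the delta update
getAndStoreFullRegistry();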
updateDelta(delta);
} finally {
fetchRegistryUpdateLock.unlock();
}
}
}
}
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications apps = null;
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
}
private void updateDelta(Applications delta) {
int deltaCount = 0;
//【6】Iterate over every app in the delta
for (Application app : delta.getRegisteredApplications()) {
//【7】Iterate over every instance of the app
for (InstanceInfo instance : app.getInstances()) {
//【8】Get the locally cached registry information
Applications applications = getApplications();
String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
if (null == remoteApps) {
remoteApps = new Applications();
remoteRegionVsApps.put(instanceRegion, remoteApps);
}
applications = remoteApps;
}
++deltaCount;
//【9】If the instance was ADDED
if (ActionType.ADDED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {
//【10】The app is not cached yet: add it
applications.addApplication(app);
}
applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
}
//【11】If the instance was MODIFIED
else if (ActionType.MODIFIED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {
//【12】The app is not cached yet: add it
applications.addApplication(app);
}
//【13】Register the new instance information under the app
applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
}
//【14】If the instance was DELETED
else if (ActionType.DELETED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp != null) {
//【15】Remove this instance from the app
existingApp.removeInstance(instance);
//【16】If the app has no instances left, remove the app itself
if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {
applications.removeApplication(existingApp);
}
}
}
}
}
}
Code analysis: see the flowchart below
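The core of updateDelta (steps 【6】 to 【16】) is a small state machine over ADDED, MODIFIED, and DELETED entries. The following sketch applies the same rules to a plain in-memory map. Instance and the nested-map registry are hypothetical simplifications of InstanceInfo and Applications, and region handling is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DeltaDemo {
    enum ActionType { ADDED, MODIFIED, DELETED }

    // Hypothetical, trimmed-down instance record: app name, instance id, action.
    static final class Instance {
        final String appName;
        final String id;
        final ActionType action;

        Instance(String appName, String id, ActionType action) {
            this.appName = appName;
            this.id = id;
            this.action = action;
        }
    }

    // Local cache layout: app name -> (instance id -> instance).
    static void applyDelta(Map<String, Map<String, Instance>> registry, List<Instance> delta) {
        for (Instance instance : delta) {
            switch (instance.action) {
                case ADDED:
                case MODIFIED:
                    // Create the app entry if missing, then add or overwrite the instance.
                    registry.computeIfAbsent(instance.appName, k -> new LinkedHashMap<>())
                            .put(instance.id, instance);
                    break;
                case DELETED:
                    Map<String, Instance> instances = registry.get(instance.appName);
                    if (instances != null) {
                        instances.remove(instance.id);
                        if (instances.isEmpty()) {
                            registry.remove(instance.appName); // last instance gone: drop the app
                        }
                    }
                    break;
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, Instance>> registry = new HashMap<>();
        List<Instance> delta = new ArrayList<>();
        delta.add(new Instance("ORDERS", "i-1", ActionType.ADDED));
        delta.add(new Instance("ORDERS", "i-2", ActionType.ADDED));
        delta.add(new Instance("ORDERS", "i-1", ActionType.DELETED));
        applyDelta(registry, delta);
        System.out.println(registry.get("ORDERS").keySet());
    }
}
```

ADDED and MODIFIED share one branch here because, as in the excerpt, both end with addInstance on the cached app.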
HeartbeatThread - Heartbeat Lease Renewal
private final Counter REREGISTER_COUNTER = Monitors.newCounter(PREFIX
+ "Reregister");
private class HeartbeatThread implements Runnable {
public void run() {
//【1】Renew the lease
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
//【2】The client sends a heartbeat and receives the response
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
//【3】Status 404 means the instance does not exist on the Eureka server
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
REREGISTER_COUNTER.increment();
long timestamp = instanceInfo.setIsDirtyWithTime();
//【4】The client issues a new register request to the Eureka server
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
//【5】If re-registration succeeds, the renewal counts as successful
return success;
}
//【6】Status 200 means the lease was renewed on the Eureka server
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
Code analysis: see the flowchart below
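The renew() logic boils down to "heartbeat, and re-register on 404". Here is a minimal sketch of that control flow against an in-memory stand-in for the server. FakeServer, Client, and the use of 204 as the successful registration status are assumptions made for this example, not taken from the excerpt.

```java
import java.util.HashSet;
import java.util.Set;

class RenewDemo {
    // Hypothetical in-memory stand-in for the Eureka server's lease table.
    static final class FakeServer {
        private final Set<String> leases = new HashSet<>();

        int sendHeartBeat(String instanceId) {
            return leases.contains(instanceId) ? 200 : 404;
        }

        int register(String instanceId) {
            leases.add(instanceId);
            return 204; // assumed success status for registration in this sketch
        }

        void evict(String instanceId) {
            leases.remove(instanceId);
        }
    }

    static final class Client {
        private final FakeServer server;
        private final String instanceId;
        int reRegisterCount; // plays the role of REREGISTER_COUNTER

        Client(FakeServer server, String instanceId) {
            this.server = server;
            this.instanceId = instanceId;
        }

        boolean register() {
            return server.register(instanceId) == 204;
        }

        // Mirrors the excerpt: 404 triggers a re-registration, 200 is a successful renewal.
        boolean renew() {
            int status = server.sendHeartBeat(instanceId);
            if (status == 404) {
                reRegisterCount++;
                return register();
            }
            return status == 200;
        }
    }

    public static void main(String[] args) {
        FakeServer server = new FakeServer();
        Client client = new Client(server, "orders-1");
        System.out.println(client.renew() + " after " + client.reRegisterCount + " re-registration(s)");
    }
}
```

This is why a client whose lease was evicted, for example after a long pause, recovers on its next heartbeat without any extra recovery code.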
Lease Cancellation
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,
search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,
EurekaClientConfig config) {
return new CloudEurekaClient(manager, config, this.optionalArgs,
this.context);
}
@PreDestroy
@Override
public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
cancelScheduledTasks();
// If APPINFO was registered
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka()
&& clientConfig.shouldUnregisterOnShutdown()) {
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
unregister();
}
if (eurekaTransport != null) {
eurekaTransport.shutdown();
}
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
Monitors.unregisterObject(this);
logger.info("Completed shut down of DiscoveryClient");
}
}
private void cancelScheduledTasks() {
if (instanceInfoReplicator != null) {
instanceInfoReplicator.stop();
}
if (heartbeatExecutor != null) {
heartbeatExecutor.shutdownNow();
}
if (cacheRefreshExecutor != null) {
cacheRefreshExecutor.shutdownNow();
}
if (scheduler != null) {
scheduler.shutdownNow();
}
if (cacheRefreshTask != null) {
cacheRefreshTask.cancel();
}
if (heartbeatTask != null) {
heartbeatTask.cancel();
}
}
Code analysis: see the flowchart below
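One detail of shutdown() worth isolating is the isShutdown.compareAndSet(false, true) guard, which makes the method safe to call more than once (for instance by both @PreDestroy and destroyMethod). The sketch below shows just that guard. ShutdownDemo and the unregisterCalls counter are hypothetical, and the counter stands in for the real unregister() request.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    final AtomicInteger unregisterCalls = new AtomicInteger(); // stand-in for unregister()

    synchronized void shutdown() {
        // Only the first caller flips the flag; later calls are no-ops.
        if (isShutdown.compareAndSet(false, true)) {
            scheduler.shutdownNow();           // cancel scheduled work first
            unregisterCalls.incrementAndGet(); // then tell the "server" we are leaving
        }
    }

    boolean schedulerStopped() {
        return scheduler.isShutdown();
    }

    public static void main(String[] args) {
        ShutdownDemo client = new ShutdownDemo();
        client.shutdown();
        client.shutdown(); // second call does nothing
        System.out.println("unregister calls: " + client.unregisterCalls.get());
    }
}
```

Stopping the scheduled tasks before unregistering mirrors the order in the excerpt, so that no heartbeat can re-register the instance after the lease has been cancelled.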
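Summary
Returning to the principle stated at the beginning, the EurekaClient essentially performs four actions:
Fetch the service list: implemented in CacheRefreshThread, namely the eurekaTransport.queryClient.getApplications call reached from step 【4.1】 of CacheRefreshThread
Register the service instance: implemented in HeartbeatThread, namely the eurekaTransport.registrationClient.register call behind step 【4】 of HeartbeatThread
Renew the lease: implemented in HeartbeatThread, namely the eurekaTransport.registrationClient.sendHeartBeat call in step 【2】 of HeartbeatThread
Cancel the lease: declared through the @Bean(destroyMethod = "shutdown") annotation on the CloudEurekaClient bean definition; shutdown() in turn calls unregister()
We would still like to know how CacheRefreshThread and HeartbeatThread communicate behind the scenes, and how the Eureka server handles these requests in detail. We will leave that for the next chapter.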