💖 Spring家族及微服务系列文章
✨【微服务】SpringCloud轮询拉取注册表及服务发现源码解析
✨【微服务】SpringCloud微服务续约源码解析
✨【微服务】SpringCloud微服务注册源码解析
✨【微服务】Nacos2.x服务发现?RPC调用?重试机制?
✨【微服务】Nacos通知客户端服务变更以及重试机制
✨【微服务】Nacos服务发现源码分析
✨【微服务】SpringBoot监听器机制以及在Nacos中的应用
✨【微服务】Nacos服务端完成微服务注册以及健康检查流程
✨【微服务】Nacos客户端微服务注册原理流程
✨【微服务】SpringCloud中使用Ribbon实现负载均衡的原理
✨【微服务】SpringBoot启动流程注册FeignClient
✨【微服务】SpringBoot启动流程初始化OpenFeign的入口
✨Spring Bean的生命周期
✨Spring事务原理
✨SpringBoot自动装配原理机制及过程
✨SpringBoot获取处理器流程
✨SpringBoot中处理器映射关系注册流程
✨Spring5.x中Bean初始化流程
✨Spring中Bean定义的注册流程
✨Spring的处理器映射器与适配器的架构设计
✨SpringMVC执行流程图解及源码
目录
编辑
💖 Spring家族及微服务系列文章
一、前言
二、Ribbon集成Eureka
1、入口
1.1、ZoneAwareLoadBalancer构造初始化
1.2、DynamicServerListLoadBalancer构造初始化
2、获取并更新服务列表到本地
2.1、更新服务的列表
1)DomainExtractingServerList组件
2.2、从EurekaClient获取服务列表
2.3、updateAllServerList(List ls)逻辑
2.4、设置到本地
2.5、更新到本地
3、动态更新服务列表
3.1、enableAndInitLearnNewServersFeature() 启动任务
3.2、延迟执行任务
3.3、执行更新
4、Ping机制
4.1、启动任务
4.2、任务
4.3、调用ping
4.4、pingServers逻辑
一、前言
在前面的文章中已经分析了Ribbon实现负载均衡的原理,我们这篇文章接着讲解ribbon如何集成eureka的,入口是什么?如何获取eureka服务列表,服务发现,缓存到本地?如何动态获取更新服务列表,周期性任务?如何选取一个可以用的服务Ping机制,周期性任务?CAS加锁、释放锁?
二、Ribbon集成Eureka
无论是Eureka还是Ribbon都是Netflix公司的两大开源项目,不过现在都不维护了。SpringCloud只是集成了它们,即底层实现是依赖于这两个项目去实现的。不过在高版本的SpringCloud当中移除了Ribbon,自己实现一套负载均衡解决方案,至于稳定性现在不好说。所以国人依然是以稳定性考虑,依然沿用JDK8,高版本的Spring使用了JDK17。再次提醒下一定要以稳定为主,不然背锅的可是你哦!!!
比较有意思的是,启动的时候没有打印关于Ribbon的日志,找了半天也没有找到还自我怀疑配置出错了。结果调用了一下,发现它这个时候才使用DefaultListableBeanFactory创建实例的,尤其上面圈的两个。
1、入口
在前面的文章《【微服务】SpringCloud中使用Ribbon实现负载均衡的原理》中讲解到,SpringBoot在自动装配的过程中,根据RibbonClientConfiguration中的配置默认装配ZoneAwareLoadBalancer的。ZoneAwareLoadBalancer是ribbon项目里面的,RibbonClientConfiguration是spring-cloud-netflix-ribbon里面的。
既然是接口与实现类我们不妨看看类的关系图
好像很复杂的样子,我们先对接口与实现类关系图有个印象,留个疑惑?其实父子关系,无非微妙在构造方法、方法重用、模板方法、super关键字、继承等。那么上面的自动装配使用了构造方法,我们看下有没有什么玄机。
注意:我们重点关注LoadBalancer链条。
1.1、ZoneAwareLoadBalancer构造初始化
public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
// 通过super关键字调用父类有参构造方法
super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
String name = Optional.ofNullable(getName()).orElse("default");
// 通过配置初始化字段
this.enabled = clientConfig.getGlobalProperty(ENABLED);
this.triggeringLoad = clientConfig.getGlobalProperty(TRIGGERING_LOAD_PER_SERVER_THRESHOLD.format(name));
this.triggeringBlackoutPercentage = clientConfig.getGlobalProperty(AVOID_ZONE_WITH_BLACKOUT_PERCENTAGE.format(name));
}
这里通过super关键字调用父类有参构造方法初始化几个Ribbon关键组件,思考一下如果不用super的话调用该构造方法父类会不会有什么变化。通过全局配置初始化一些字段。
1.2、DynamicServerListLoadBalancer构造初始化
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
// 通过super调用父类构造方法初始化
super(clientConfig, rule, ping);
// 初始化为DomainExtractingServerList
this.serverListImpl = serverList;
// 过滤器,初始化默认为ZonePreferenceServerListFilter
this.filter = filter;
// 服务列表更新器,初始化时默认为PollingServerListUpdater
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
}
// 重点关注该初始化
restOfInit(clientConfig);
}
serverList究竟是什么,我们可从入参追溯,发现跟自动装配有关。但是发现了两处都有 serverList,究竟谁才是最终赢家?我们不妨debugger下。
1)debugger负载均衡请求
这里我们debugger进入到NamedContextFactory(spring-cloud-commons项目里面的),红框里面的几个重点关注的单例Bean,有没有引起你的注意?有关Ribbon的不妨都去看看,说不定会发现新大陆哦!!!
2)ServerList装配
@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config,
Provider<EurekaClient> eurekaClientProvider) {
if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
return this.propertiesFactory.get(ServerList.class, config, serviceId);
}
DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
config, eurekaClientProvider);
DomainExtractingServerList serverList = new DomainExtractingServerList(
discoveryServerList, config, this.approximateZoneFromHostname);
return serverList;
}
DiscoveryEnabledNIWSServerList是ribbon项目里面的,DomainExtractingServerList是spring-cloud-netflix项目里面的,从上面的debugger图可见ribbonServerList即serverList为DomainExtractingServerList。它们的构造方法可以点击进去看看,下面会2.1的1)详细分析
3)restOfInit(IClientConfig clientConfig)集成Eureka入口
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
// 关闭此选项以避免在 BaseLoadBalancer.setServerList ()中执行重复的异步启动
this.setEnablePrimingConnections(false);
// 感知新的服务实例的添加/移除,即动态维护服务实例列表
enableAndInitLearnNewServersFeature();
// 更新Eureka client 中所有服务的实例列表,即初始化服务实例列表
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
主要逻辑:
- 感知新的服务实例的添加/移除,即动态维护服务实例列表。下面3分析
- 更新Eureka client 中所有服务的实例列表,即初始化服务实例列表。下面2分析
2、获取并更新服务列表到本地
2.1、更新服务的列表
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
// 获取服务
servers = serverListImpl.getUpdatedListOfServers();
// 从 Discovery 客户端获取{}的服务器列表: {}
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
// 过滤处理。这里有一部分逻辑,感兴趣的可以去看看
servers = filter.getFilteredListOfServers(servers);
// 从 Discovery 客户端获取的{}的筛选服务器列表: {}
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
// 更新全部服务列表
updateAllServerList(servers);
}
主要逻辑:
- 获取服务列表。serverListImpl在(1中2))初始化时默认为DomainExtractingServerList,而它是在spring-cloud-netfilx项目里面的,所以这里调用的是它的getUpdatedListOfServers()获取服务的,下面1)补充说明下。但是在ribbon项目里面你点击该方法发现直接跳转到下面获取服务列表了,这时很容易忽略了一些东西,下面1)进行补充。
- 过滤处理。这里有一部分逻辑,感兴趣的可以去看看
- 更新全部服务列表到ribbon本地
1)DomainExtractingServerList组件
public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
IClientConfig clientConfig, boolean approximateZoneFromHostname) {
// list为DiscoveryEnabledNIWSServerList的引用
this.list = list;
this.ribbon = RibbonProperties.from(clientConfig);
this.approximateZoneFromHostname = approximateZoneFromHostname;
}
@Override
public List<DiscoveryEnabledServer> getInitialListOfServers() {
// 委托DiscoveryEnabledNIWSServerList处理
List<DiscoveryEnabledServer> servers = setZones(
this.list.getInitialListOfServers());
return servers;
}
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
// 委托DiscoveryEnabledNIWSServerList处理
List<DiscoveryEnabledServer> servers = setZones(
this.list.getUpdatedListOfServers());
return servers;
}
可见在自动装配中初始化时list为DiscoveryEnabledNIWSServerList,而上面两个功能都是委托(ribbon项目里面的)DiscoveryEnabledNIWSServerList处理的。
2.2、从EurekaClient获取服务列表
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
return obtainServersViaDiscovery();
}
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<>();
if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList<>();
}
// 获取eurekaClient
EurekaClient eurekaClient = eurekaClientProvider.get();
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(",")) {
// if targetRegion is null, it will be interpreted as the same region of client
// 如果 targetArea 为 null,它将被解释为 client 的相同区域
// 获取服务实例
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
// 遍历服务实例列表
for (InstanceInfo ii : listOfInstanceInfo) {
// 获取服务实例为UP状态的
if (ii.getStatus().equals(InstanceStatus.UP)) {
if(shouldUseOverridePort){
if(logger.isDebugEnabled()){
logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
}
// 拷贝是必要的,因为 InstanceInfo 构建器只使用原始引用,而且我们不想破坏对象的全局
// eureka拷贝,因为它可能被我们系统中的其他客户机使用
InstanceInfo copy = new InstanceInfo(ii);
if(isSecure){
ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
}else{
ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
}
}
// 转换为DiscoveryEnabledServer
DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
// 添加到集合
serverList.add(des);
}
}
if (serverList.size()>0 && prioritizeVipAddressBasedServers){
// 如果当前的 vipAddress 有服务器,我们不使用后续的基于 vipAddress 的服务器
break;
}
}
}
return serverList;
}
主要逻辑:
- 获取eurekaClient,上一篇博客是否还有印象呢
- vipAddresses不为空,分隔然后遍历。1)获取服务实例列表。上一篇服务发现是否记得。2)遍历服务实例列表;3)过滤获取获取服务实例为UP即正常运行状态的;4)是否使用覆盖端口,如果是则拷贝。拷贝是必要的,因为 InstanceInfo 构建器只使用原始引用,而且我们不想破坏对象的全局eureka拷贝,因为它可能被我们系统中的其他客户机使用;5)转换为DiscoveryEnabledServer,添加到服务列表serverList;6)如果当前的vipAddress有服务器,我们不使用后续的基于vipAddress的服务
2.3、updateAllServerList(List<T> ls)逻辑
protected void updateAllServerList(List<T> ls) {
// other threads might be doing this - in which case, we pass
// 确保在多线程情况下只有一个线程进来
if (serverListUpdateInProgress.compareAndSet(false, true)) {
try {
for (T s : ls) {
// 设置,以便客户端可以立即开始使用这些服务器,而不必等到ping周期结束。
s.setAlive(true);
}
// 更新服务列表
setServersList(ls);
// 如果我们当前没有发出信号,并且没有已经安排好的快速信号,则强制立即发出信号。
super.forceQuickPing();
} finally {
// 还原以便下次CAS成功
serverListUpdateInProgress.set(false);
}
}
}
主要逻辑:
- CAS确保在多线程情况下只有一个线程进来
- 设置服务为活跃状态,以便客户端可以立即开始使用这些服务器,而不必等到ping周期结束。
- 更新服务列表。下面分析
- 如果我们当前没有发出信号,并且没有已经安排好的快速信号,则强制立即发出信号。
- 还原以便下次CAS成功
2.4、设置到本地
@Override
public void setServersList(List lsrv) {
// 通过super调用父类方法更新到本地
super.setServersList(lsrv);
List<T> serverList = (List<T>) lsrv;
Map<String, List<Server>> serversInZones = new HashMap<>();
for (Server server : serverList) {
// 确保创建了ServerStats,以避免在热路径上创建它们
getLoadBalancerStats().getSingleServerStat(server);
String zone = server.getZone();
if (zone != null) {
zone = zone.toLowerCase();
List<Server> servers = serversInZones.get(zone);
if (servers == null) {
servers = new ArrayList<>();
serversInZones.put(zone, servers);
}
servers.add(server);
}
}
// 为区域设置服务器列表
setServerListForZones(serversInZones);
}
主要逻辑:
- 通过super调用父类方法更新到本地
- 遍历服务列表,确保创建了ServerStats,以避免在热路径上创建它们
- 为区域设置服务器列表
2.5、更新到本地
public void setServersList(List lsrv) {
Lock writeLock = allServerLock.writeLock();
logger.debug("LoadBalancer [{}]: clearing server list (SET op)", name);
ArrayList<Server> newServers = new ArrayList<>();
// 加锁
writeLock.lock();
try {
ArrayList<Server> allServers = new ArrayList<>();
// 遍历入参
for (Object server : lsrv) {
if (server == null) {
continue;
}
if (server instanceof String) {
server = new Server((String) server);
}
if (server instanceof Server) {
logger.debug("LoadBalancer [{}]: addServer [{}]", name, ((Server) server).getId());
// 强制类型转换,添加到allServers列表
allServers.add((Server) server);
} else {
throw new IllegalArgumentException(
"Type String or Server expected, instead found:"
+ server.getClass());
}
}
boolean listChanged = false;
// 如果allServerList的值与allServers不等则说明发生了更新
if (!allServerList.equals(allServers)) {
listChanged = true;
if (changeListeners != null && changeListeners.size() > 0) {
List<Server> oldList = ImmutableList.copyOf(allServerList);
List<Server> newList = ImmutableList.copyOf(allServers);
for (ServerListChangeListener l: changeListeners) {
try {
// 服务列表变更
l.serverListChanged(oldList, newList);
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Error invoking server list change listener", name, e);
}
}
}
}
if (isEnablePrimingConnections()) {
for (Server server : allServers) {
if (!allServerList.contains(server)) {
server.setReadyToServe(false);
newServers.add((Server) server);
}
}
// 观察控制台没有走这里
if (primeConnections != null) {
primeConnections.primeConnectionsAsync(newServers, this);
}
}
// This will reset readyToServe flag to true on all servers
// regardless whether
// previous priming connections are success or not
// 这将在所有服务器上将readyToServe标志重置为true,无论以前的启动连接是否成功
// 覆盖现有的服务器列表
allServerList = allServers;
// 能否跳过ping,观察控制台执行下面forceQuickPing()逻辑,也就是走了listChanged = true;处理
if (canSkipPing()) {
for (Server s : allServerList) {
s.setAlive(true);
}
upServerList = allServerList;
} else if (listChanged) {
// 如果我们当前没有发出信号,并且没有已经安排好的快速信号,则强制立即发出信号。
forceQuickPing();
}
} finally {
// 释放锁
writeLock.unlock();
}
}
主要逻辑:
- 加锁成功,遍历入参即服务列表
- 如果是Server类型,则强制类型转换,添加到allServers列表;否则抛异常
- 如果allServerList的值与allServers不等则说明发生了更新,发布服务列表变更事件
- 覆盖现有的服务器列表。这将在所有服务器上将readyToServe标志重置为true,无论以前的启动连接是否成功
- 能否跳过ping,观察控制台执行下面forceQuickPing()逻辑,也就是走了listChanged = true;处理;如果我们当前没有发出信号,并且没有已经安排好的快速信号,则强制立即发出信号。
- 释放锁,
3、动态更新服务列表
3.1、enableAndInitLearnNewServersFeature() 启动任务
public void enableAndInitLearnNewServersFeature() {
LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
serverListUpdater.start(updateAction);
}
根据自动装配serverListUpdater为PollingServerListUpdater,所以这里委托PollingServerListUpdater的start()方法处理。
3.2、延迟执行任务
@Override
public synchronized void start(final UpdateAction updateAction) {
// CAS确保并发场景只有一个线程进入
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = () -> {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
// 实际上交给DynamicServerListLoadBalancer的匿名内部类updateAction处理
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
};
// 延迟1秒执行,周期性隔30秒
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
主要逻辑:
- CAS确保并发场景只有一个线程进入
- 实际上交给DynamicServerListLoadBalancer的匿名内部类updateAction处理,下面分析
- 延迟1秒执行,周期性隔30秒执行任务
3.3、执行更新
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
实际上交给DynamicServerListLoadBalancer的匿名内部类updateAction处理,所以回到了DynamicServerListLoadBalancer。而updateListOfServers()就是我们上面2中分析的流程了。
4、Ping机制
细心观察控制台打印日志,你会发现还有一个周期性任务,复制然后搜索追踪,下面分析细节:
4.1、启动任务
在1的1.2以及LoadBalancer链条可知会进入到BaseLoadBalancer的构造方法做一些初始化工作,内部调用setPingInterval(pingIntervalTime)
public void setPingInterval(int pingIntervalSeconds) {
// pingIntervalSeconds默认30秒
if (pingIntervalSeconds < 1) {
return;
}
// 默认30秒
this.pingIntervalSeconds = pingIntervalSeconds;
if (logger.isDebugEnabled()) {
logger.debug("LoadBalancer [{}]: pingIntervalSeconds set to {}",
name, this.pingIntervalSeconds);
}
// 启动任务
setupPingTask(); // since ping data changed
}
void setupPingTask() {
// 是否跳过ping
if (canSkipPing()) {
return;
}
if (lbTimer != null) {
// 终止此计时器,放弃任何当前计划的任务。不会干扰当前正在执行的任务(如果存在的话)。
// 一旦一个计时器被终止,它的执行线程就会优雅地终止,不会再有任何任务被安排在它上面。
// 注意,从这个计时器调用的a timer 任务的 run 方法中调用这个方法绝对可以保证正在进行的任务执行是这个计时器将要执行的最后一个任务执行。
// 此方法可以重复调用; 第二次和随后的调用没有效果。
lbTimer.cancel();
}
lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
true);
// 执行BaseLoadBalancer的内部类PingTask#run(),默认每30秒执行一次
lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
// 如果我们当前没有发出信号,并且没有已经安排好的快速信号,则强制立即发出信号。
forceQuickPing();
}
主要逻辑:
- pingIntervalSeconds默认30秒,setupPingTask()启动任务
- 是否跳过ping,如果是return。一般不会跳过
- lbTimer不为空,终止此计时器,放弃任何当前计划的任务。不会干扰当前正在执行的任务(如果存在的话)。一旦一个计时器被终止,它的执行线程就会优雅地终止,不会再有任何任务被安排在它上面。 注意,从这个计时器调用的a timer 任务的 run 方法中调用这个方法绝对可以保证正在进行的任务执行是这个计时器将要执行的最后一个任务执行。此方法可以重复调用; 第二次和随后的调用没有效果。
- 执行BaseLoadBalancer的内部类PingTask#run(),默认每30秒执行一次
- 如果我们当前没有发出信号,并且没有已经安排好的快速信号,则强制立即发出信号。
4.2、任务
class PingTask extends TimerTask {
public void run() {
try {
new Pinger(pingStrategy).runPinger();
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Error pinging", name, e);
}
}
}
public void runPinger() throws Exception {
// CAS确保并发下只有一个线程进来处理,注意并发度不是特别高场景
if (!pingInProgress.compareAndSet(false, true)) {
// CAS失败,说明有线程正在执行任务,就不需要再做了
return;
}
// 我们进去了,我们找到Ping了
Server[] allServers = null;
boolean[] results = null;
Lock allLock = null;
Lock upLock = null;
try {
/*
* ReadLock 应该是空闲的,除非 addServer 操作正在进行..。
*/
allLock = allServerLock.readLock();
allLock.lock();
allServers = allServerList.toArray(new Server[allServerList.size()]);
allLock.unlock();
int numCandidates = allServers.length;
// ping所有的服务,返回是否存活数组
results = pingerStrategy.pingServers(ping, allServers);
final List<Server> newUpList = new ArrayList<>();
final List<Server> changedServers = new ArrayList<>();
for (int i = 0; i < numCandidates; i++) {
// 是否活跃状态
boolean isAlive = results[i];
// 取出一个服务
Server svr = allServers[i];
boolean oldIsAlive = svr.isAlive();
svr.setAlive(isAlive);
// 观察控制台没看到
if (oldIsAlive != isAlive) {
changedServers.add(svr);
logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}",
name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
}
// 这里做剔除操作,即活跃的留下来
if (isAlive) {
newUpList.add(svr);
}
}
// 加锁以便并发场景下只有一个线程可以做修改
upLock = upServerLock.writeLock();
upLock.lock();
upServerList = newUpList;
// 释放锁
upLock.unlock();
// 发布服务状态变更事件
notifyServerStatusChangeListener(changedServers);
} finally {
// 还原以便下次CAS成功,有点类似于释放锁
pingInProgress.set(false);
}
}
}
PingTask 是BaseLoadBalancer的内部类,主要逻辑:
- CAS确保并发下只有一个线程进来处理,注意并发度不是特别高场景。如果CAS失败,说明有线程正在执行任务,就不需要再做了
- CAS成功,加锁。ReadLock 应该是空闲的,除非 addServer 操作正在进行..。赋值,释放锁。
- ping所有的服务,返回是否存活数组,下面分析
- 遍历服务个数,从是否存活数组results[i]取出一个状态,allServers[i]取出一个服务,如果服务是存活正常的则添加到newUpList;加锁以便并发场景下只有一个线程可以做修改upServerList,释放锁;发布服务状态变更事件
- 还原以便下次CAS成功,有点类似于释放锁
4.3、调用ping
private static class SerialPingStrategy implements IPingStrategy {
@Override
public boolean[] pingServers(IPing ping, Server[] servers) {
int numCandidates = servers.length;
boolean[] results = new boolean[numCandidates];
logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates);
for (int i = 0; i < numCandidates; i++) {
results[i] = false; /* 默认答案是服务挂了。*/
try {
// 注意: IFF 假设我们有一大组服务器(比如说15个) ,我们正在做一个真正的 Ping,
// 下面的逻辑将串行运行它们,因此需要15倍的时间来 Ping 每个服务器一个更好的方法是把它放在一个
// 执行池,但是,在写这篇文章的时候,我们没有真正使用一个真正的 Ping
// (它主要是在内存中的 eureka 调用) ,因此我们可以负担得起简化这个设计和运行这个串行
if (ping != null) {
results[i] = ping.isAlive(servers[i]);
}
} catch (Exception e) {
logger.error("Exception while pinging Server: '{}'", servers[i], e);
}
}
return results;
}
}
SerialPingStrategy是BaseLoadBalancer的内部类,主要逻辑:
- 从入参获取服务个数,初始化一个boolean类型数组results,长度为numCandidates
- 默认答案是服务挂了。
- ping不为空,开始ping。注意: IFF 假设我们有一大组服务器(比如说15个) ,我们正在做一个真正的 Ping,下面的逻辑将串行运行它们,因此需要15倍的时间来 Ping 每个服务器一个更好的方法是把它放在一个执行池,但是,在写这篇文章的时候,我们没有真正使用一个真正的 Ping(它主要是在内存中的 eureka 调用) ,因此我们可以负担得起简化这个设计和运行这个串行
在Ribbon中默认使用的是DummyPing,但是在SpringCloud中eureka集成ribbon时EurekaRibbonClientConfiguration配置了NIWSDiscoveryPing,所以覆盖掉了。
4.4、pingServers逻辑
public boolean isAlive(Server server) {
boolean isAlive = true;
if (server!=null && server instanceof DiscoveryEnabledServer){
// 强制类型转换
DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;
// 获取服务实例
InstanceInfo instanceInfo = dServer.getInstanceInfo();
if (instanceInfo != null){
InstanceStatus status = instanceInfo.getStatus();
if (status != null){
// 如果服务实例的状态为UP则isAlive为true,否则false
isAlive = status.equals(InstanceStatus.UP);
}
}
}
return isAlive;
}
主要逻辑:
- 强制类型转换为DiscoveryEnabledServer类型
- 获取服务实例,实例不为空,获取实例状态
- 实例状态不为空,如果服务实例的状态为UP则isAlive为true,否则false