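Table of Contents
- Preface
- I. Locating the problem
- II. Source code walkthrough
- 1. Imported libraries
- Core problem code
- Digging further [this part is only an extension; the problem actually lies in the code above]
- How does Nacos implement it?
- How to fix it
- Summary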
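Preface
Background:
We use Spring Cloud Gateway (SCG) together with the full Spring Cloud component stack. Service-to-service calls on the internal network go through FeignClient, an HTTP-based client.
Current situation:
Ops has already added a 5-second wait before a service goes offline, so that traffic should no longer reach the instance being taken down. Even so, some requests are still routed to unavailable service nodes.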
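I. Locating the problem
org.springframework.cloud.loadbalancer.cache.LoadBalancerCacheProperties configures the cache that holds the service instance lists SCG obtains from Nacos. The cache is enabled by default and keeps entries for 35 seconds.
Because SCG forwards requests through Spring Cloud LoadBalancer, our hypothesis is that this cache has the following effect: while a service is going offline, the gateway may still pick instances that are already unavailable or deregistered, which leads to 503, 500, or 504 responses, request timeouts, and similar errors.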
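To make the hypothesis concrete, here is a minimal, self-contained Java sketch (not Spring code) of an expire-after-write cache sitting in front of a registry. The class name TtlInstanceCache, the addresses, and the explicit time parameter are all hypothetical; the point is only that once an instance deregisters, a list cached before that moment can keep handing it out until the TTL runs out.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical model of an instance-list cache in front of a registry.
 * Entries expire a fixed TTL after they are written; time is passed in
 * explicitly so the behaviour is easy to reason about.
 */
final class TtlInstanceCache {
    private static final class Entry {
        final List<String> instances;
        final long writtenAtMillis;

        Entry(List<String> instances, long writtenAtMillis) {
            this.instances = instances;
            this.writtenAtMillis = writtenAtMillis;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();
    private final long ttlMillis;
    private final Function<String, List<String>> registry; // stands in for Nacos

    TtlInstanceCache(long ttlMillis, Function<String, List<String>> registry) {
        this.ttlMillis = ttlMillis;
        this.registry = registry;
    }

    List<String> get(String serviceId, long nowMillis) {
        Entry entry = entries.get(serviceId);
        if (entry != null && nowMillis - entry.writtenAtMillis < ttlMillis) {
            // Cache hit: the list may already be stale.
            return entry.instances;
        }
        // Miss or expired: snapshot the registry's current view and cache it.
        List<String> fresh = List.copyOf(registry.apply(serviceId));
        entries.put(serviceId, new Entry(fresh, nowMillis));
        return fresh;
    }
}
```

With a 35-second TTL, a list cached at t = 0 s still contains an instance that deregistered at t = 5 s when it is read at t = 20 s; only the lookup at t = 36 s goes back to the registry and drops it.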
Setting breakpoints in the source
II. Source code walkthrough
1. Imported libraries
Core problem code
/*
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.loadbalancer.core;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.cache.CacheFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.cloud.client.ServiceInstance;
/**
* A {@link ServiceInstanceListSupplier} implementation that tries retrieving
* {@link ServiceInstance} objects from cache; if none found, retrieves instances using
* {@link DiscoveryClientServiceInstanceListSupplier}.
*
* @author Spencer Gibb
* @author Olga Maciaszek-Sharma
* @since 2.2.0
*/
public class CachingServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier {
private static final Log log = LogFactory.getLog(CachingServiceInstanceListSupplier.class);
/**
* Name of the cache
*/
public static final String SERVICE_INSTANCE_CACHE_NAME = CachingServiceInstanceListSupplier.class.getSimpleName()
+ "Cache";
/**
* Cached Flux of service-instance lists
*/
private final Flux<List<ServiceInstance>> serviceInstances;
@SuppressWarnings("unchecked")
public CachingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, CacheManager cacheManager) {
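super(delegate); // pass the delegate to the parent class
// Look up the cached instance list for this serviceId through reactor-extra's CacheFlux.
// API docs (the author notes CacheFlux may be removed in a future version): https://projectreactor.io/docs/extra/release/api/reactor/cache/CacheFlux.html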
this.serviceInstances = CacheFlux.lookup(key -> {
// TODO: configurable cache name (upstream TODO: the cache name is not configurable yet; a future version may support it)
Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
// A null cache means the CacheManager is misconfigured; normally unreachable, kept for completeness
if (cache == null) {
if (log.isErrorEnabled()) {
log.error("Unable to find cache: " + SERVICE_INSTANCE_CACHE_NAME);
}
return Mono.empty();
}
// The cache exists but holds no list (or an empty one) for this key: treat it as a cache miss
List<ServiceInstance> list = cache.get(key, List.class);
if (list == null || list.isEmpty()) {
return Mono.empty();
}
// Cache hit: return the cached list as materialized signals, the form CacheFlux expects
return Flux.just(list).materialize().collectList();
}, delegate.getServiceId())
// On a cache miss, take the first list emitted by the delegate (the discovery-client supplier)
.onCacheMissResume(delegate.get().take(1))
// Write the fetched list back into the cache
.andWriteWith((key, signals) -> Flux.fromIterable(signals).dematerialize().doOnNext(instances -> {
Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
if (cache == null) {
if (log.isErrorEnabled()) {
log.error("Unable to find cache for writing: " + SERVICE_INSTANCE_CACHE_NAME);
}
}
else {
cache.put(key, instances);
}
}).then());
}
@Override
public Flux<List<ServiceInstance>> get() {
return serviceInstances;
}
}
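The lookup / onCacheMissResume / andWriteWith chain above boils down to a cache-aside read. The plain-Java sketch below mirrors that flow without Reactor. CacheAsideInstanceSupplier and its call counter are hypothetical names, and expiry is deliberately left out because in Spring Cloud LoadBalancer it is handled by the CacheManager rather than by this class. One detail worth noticing: a missing or empty list is treated as a miss, so an empty result from the delegate is written back but never served as a hit.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/**
 * Hypothetical, non-thread-safe sketch of the cache-aside flow in
 * CachingServiceInstanceListSupplier (no Reactor, no TTL).
 */
final class CacheAsideInstanceSupplier {
    private final Map<String, List<String>> cache = new HashMap<>();
    private final String serviceId;
    private final Supplier<List<String>> delegate; // stands in for the discovery-client supplier
    private int delegateCalls;

    CacheAsideInstanceSupplier(String serviceId, Supplier<List<String>> delegate) {
        this.serviceId = serviceId;
        this.delegate = delegate;
    }

    List<String> get() {
        // lookup: a missing or empty list counts as a cache miss
        List<String> cached = cache.get(serviceId);
        if (cached != null && !cached.isEmpty()) {
            return cached;
        }
        // onCacheMissResume: ask the delegate for one list (like take(1))
        List<String> fresh = List.copyOf(delegate.get());
        delegateCalls++;
        // andWriteWith: write the result back, even if it is empty
        cache.put(serviceId, fresh);
        return fresh;
    }

    int delegateCalls() {
        return delegateCalls;
    }
}
```

If the first delegate call returns an empty list (for example after a discovery timeout), the next get() goes back to the delegate instead of serving the empty entry; once a non-empty list is cached, later calls are hits.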
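Digging further [this part is only an extension; the problem actually lies in the code above]
ServiceInstanceListSupplier: in SCG, the implementation used as the delegate of the caching supplier is DiscoveryClientServiceInstanceListSupplier.
To follow the code below, it helps to know WebFlux (Project Reactor) and chained operator calls first; alternatively, you can focus only on the delegate.getInstances() method.
Source code walkthrough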
/*
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.loadbalancer.core;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import org.springframework.boot.convert.DurationStyle;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;
import org.springframework.core.env.Environment;
import static org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory.PROPERTY_NAME;
/**
* A discovery-client-based {@link ServiceInstanceListSupplier} implementation.
*
* @author Spencer Gibb
* @author Olga Maciaszek-Sharma
* @author Tim Ysewyn
* @since 2.2.0
*/
public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstanceListSupplier {
/**
* Property that establishes the timeout for calls to service discovery.
*/
public static final String SERVICE_DISCOVERY_TIMEOUT = "spring.cloud.loadbalancer.service-discovery.timeout";
private static final Log LOG = LogFactory.getLog(DiscoveryClientServiceInstanceListSupplier.class);
private Duration timeout = Duration.ofSeconds(30);
private final String serviceId;
private final Flux<List<ServiceInstance>> serviceInstances;
/**
* Focus on serviceInstances as built in this class's constructors:
* it is a Flux that emits the service's instance list
*/
public DiscoveryClientServiceInstanceListSupplier(DiscoveryClient delegate, Environment environment) {
this.serviceId = environment.getProperty(PROPERTY_NAME);
resolveTimeout(environment);
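// delegate.getInstances(...) is what actually supplies the instances. Spring Cloud standardizes this interface and each registry implements it; the surrounding Reactor operators can be ignored here.
// Inside Spring Cloud Gateway the same pattern applies, although the Nacos section below shows that the constructor taking a ReactiveDiscoveryClient is what actually runs.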
this.serviceInstances = Flux.defer(() -> Flux.just(delegate.getInstances(serviceId)))
.subscribeOn(Schedulers.boundedElastic()).timeout(timeout, Flux.defer(() -> {
logTimeout();
return Flux.just(new ArrayList<>());
})).onErrorResume(error -> {
logException(error);
return Flux.just(new ArrayList<>());
});
}
public DiscoveryClientServiceInstanceListSupplier(ReactiveDiscoveryClient delegate, Environment environment) {
this.serviceId = environment.getProperty(PROPERTY_NAME);
resolveTimeout(environment);
this.serviceInstances = Flux
.defer(() -> delegate.getInstances(serviceId).collectList().flux().timeout(timeout, Flux.defer(() -> {
logTimeout();
return Flux.just(new ArrayList<>());
})).onErrorResume(error -> {
logException(error);
return Flux.just(new ArrayList<>());
}));
}
@Override
public String getServiceId() {
return serviceId;
}
@Override
public Flux<List<ServiceInstance>> get() {
return serviceInstances;
}
private void resolveTimeout(Environment environment) {
String providedTimeout = environment.getProperty(SERVICE_DISCOVERY_TIMEOUT);
if (providedTimeout != null) {
timeout = DurationStyle.detectAndParse(providedTimeout);
}
}
private void logTimeout() {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Timeout occurred while retrieving instances for service %s."
+ "The instances could not be retrieved during %s", serviceId, timeout));
}
}
private void logException(Throwable error) {
LOG.error(String.format("Exception occurred while retrieving instances for service %s", serviceId), error);
}
}
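Both constructors wrap the discovery call the same way: give up after the timeout (30 s by default, overridable through spring.cloud.loadbalancer.service-discovery.timeout) and fall back to an empty list on timeout or error. A rough plain-Java equivalent, using CompletableFuture instead of Reactor, might look like this; DiscoveryWithFallback is a hypothetical name and the Function parameter stands in for delegate.getInstances(serviceId).

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical sketch of timeout(...) + onErrorResume(...) using CompletableFuture (Java 9+). */
final class DiscoveryWithFallback {
    static List<String> fetch(Function<String, List<String>> discovery, String serviceId, long timeoutMillis) {
        return CompletableFuture
                // run the (possibly blocking) lookup on another thread, like subscribeOn(boundedElastic())
                .supplyAsync(() -> discovery.apply(serviceId))
                // timeout: complete with an empty list; the lookup itself is not cancelled
                .completeOnTimeout(List.of(), timeoutMillis, TimeUnit.MILLISECONDS)
                // any exception from the lookup: fall back to an empty list
                .exceptionally(error -> List.of())
                .join();
    }
}
```

Falling back to an empty list rather than propagating the error means a slow or failing registry shows up as "no instances" (which the gateway typically answers with a 503) instead of an exception in the discovery layer.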
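How does Nacos implement it?
In practice, the call goes through the reactive implementation that Nacos provides for fetching its instances:
NacosReactiveDiscoveryClient, whose core method is loadInstancesFromNacos() [ha, judging by that method name, you can tell it was written by a Chinese developer].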
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.nacos.discovery.reactive;
import java.util.function.Function;
import com.alibaba.cloud.nacos.discovery.NacosServiceDiscovery;
import com.alibaba.nacos.api.exception.NacosException;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;
/**
* @author <a href="mailto:echooy.mxq@gmail.com">echooymxq</a>
**/
public class NacosReactiveDiscoveryClient implements ReactiveDiscoveryClient {
private static final Logger log = LoggerFactory
.getLogger(NacosReactiveDiscoveryClient.class);
private NacosServiceDiscovery serviceDiscovery;
public NacosReactiveDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {
this.serviceDiscovery = nacosServiceDiscovery;
}
@Override
public String description() {
return "Spring Cloud Nacos Reactive Discovery Client";
}
@Override
public Flux<ServiceInstance> getInstances(String serviceId) {
return Mono.justOrEmpty(serviceId).flatMapMany(loadInstancesFromNacos())
.subscribeOn(Schedulers.boundedElastic());
}
private Function<String, Publisher<ServiceInstance>> loadInstancesFromNacos() {
return serviceId -> {
try {
return Flux.fromIterable(serviceDiscovery.getInstances(serviceId));
}
catch (NacosException e) {
log.error("get service instance[{}] from nacos error!", serviceId, e);
return Flux.empty();
}
};
}
@Override
public Flux<String> getServices() {
return Flux.defer(() -> {
try {
return Flux.fromIterable(serviceDiscovery.getServices());
}
catch (Exception e) {
log.error("get services from nacos server fail,", e);
return Flux.empty();
}
}).subscribeOn(Schedulers.boundedElastic());
}
}
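NacosReactiveDiscoveryClient adds two defensive touches around the blocking NacosServiceDiscovery call: Mono.justOrEmpty turns a null serviceId into an empty result, and a NacosException is logged and swallowed as an empty Flux. The sketch below reproduces just that behaviour with Optional and streams; BlockingRegistry, RegistryException, and SafeInstanceLoader are hypothetical stand-ins, not Nacos APIs.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical checked exception standing in for NacosException. */
class RegistryException extends Exception {
    RegistryException(String message) {
        super(message);
    }
}

/** Hypothetical blocking registry client standing in for NacosServiceDiscovery. */
interface BlockingRegistry {
    List<String> getInstances(String serviceId) throws RegistryException;
}

final class SafeInstanceLoader {
    static List<String> getInstances(BlockingRegistry registry, String serviceId) {
        return Optional.ofNullable(serviceId)           // like Mono.justOrEmpty(serviceId)
                .stream()                               // Java 9+: empty stream when serviceId is null
                .flatMap(id -> load(registry, id))      // like flatMapMany(loadInstancesFromNacos())
                .collect(Collectors.toList());
    }

    private static Stream<String> load(BlockingRegistry registry, String serviceId) {
        try {
            return registry.getInstances(serviceId).stream();
        } catch (RegistryException e) {
            // like log.error(...) followed by Flux.empty()
            System.err.println("get service instance[" + serviceId + "] from registry error: " + e.getMessage());
            return Stream.empty();
        }
    }
}
```

As in the Nacos client, a registry failure is indistinguishable from "no instances" for the caller; the only trace is the log line.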
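This completes the walkthrough of how SCG fetches the service list from Nacos. Internal service-to-service calls should work in essentially the same way.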
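How to fix it
- Disable the cache by adding the following to SCG's bootstrap.yml. With this setting, the caching logic above is no longer used.
  spring:
    cloud:
      loadbalancer:
        cache:
          enabled: false # whether to enable the cache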
- Side effects
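  - Disabling the cache does appear to fix the problem above, but every load-balancing call then goes straight to the discovery client, which may put extra pressure on Nacos. A related question: why 35 s? Doesn't a 35-second cache expire often enough to cause a "storm" of refreshes? (This needs further investigation.)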
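  - One possible (unverified) explanation for the 35-second value is a "refresh-ahead" strategy, in which entries are refreshed asynchronously before they expire. This suits hot data that is expected to be requested again soon: the cache is renewed before the next likely access, so a high-traffic system avoids a sudden burst of requests to the backing store (here the registry; in a typical web application, the database) at the moment entries expire. For example, with a 60-second expiry and a refresh-ahead factor of 0.5, an entry accessed after the 30-second mark, that is, within the last 30 seconds before it expires, is reloaded in the background. Note, however, that the Spring Cloud LoadBalancer documentation describes ttl as the time after write after which entries expire, so the default cache appears to be a plain expire-after-write cache rather than a refresh-ahead one. More generally, cache design also has to weigh cache size, hit rate, and miss rate, and different read/write patterns call for different strategies: write-heavy applications may combine Write-Through, Write-Back, or Write-Around, while read-heavy applications lean toward Read-Through or Refresh-Ahead.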
- Final solution
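  We do not recommend disabling this cache, since doing so may put excessive pressure on Nacos. Our final fix was instead to extend how long the old Pods stay alive during a rolling update.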
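To get a feel for the trade-off, the hypothetical sketch below counts how many discovery lookups a single gateway instance would make for one service, with and without an expire-after-write cache. It counts calls into the discovery client only; how many of those actually reach the Nacos server depends on the Nacos client, which this sketch does not model. LookupCounter is an illustrative name.

```java
/** Hypothetical model: discovery lookups for one service on one gateway instance. */
final class LookupCounter {
    /**
     * @param requestTimesMillis request timestamps in ascending order
     * @param ttlMillis expire-after-write TTL; 0 or less means "cache disabled"
     * @return number of requests that had to call the discovery client
     */
    static int discoveryCalls(long[] requestTimesMillis, long ttlMillis) {
        int calls = 0;
        boolean hasEntry = false;
        long writtenAt = 0;
        for (long now : requestTimesMillis) {
            boolean hit = ttlMillis > 0 && hasEntry && now - writtenAt < ttlMillis;
            if (!hit) {
                calls++;          // miss: ask the discovery client ...
                writtenAt = now;  // ... and (re)write the cache entry
                hasEntry = true;
            }
        }
        return calls;
    }
}
```

For 1,000 requests spread evenly over 100 seconds (one every 100 ms), the 35-second cache needs 3 lookups versus 1,000 without it; the price is the staleness window described earlier.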
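Summary
Work with the ops team to make each old Pod's "survival window" (how long it keeps serving requests after it deregisters) longer than the 35-second cache TTL. That way, any cached instance list that still contains the Pod expires before the Pod actually stops.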
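The rule in the summary can be written down as a simple check. In the worst case the gateway cached an instance list just before the old Pod deregistered, so that list can keep offering the Pod for up to one TTL afterwards; the Pod therefore has to keep serving for longer than the TTL, plus some margin for the deregistration to reach the gateway. ShutdownWindowCheck and the margin parameter are hypothetical, and the margin you actually need has to be measured in your own environment.

```java
import java.time.Duration;

/** Hypothetical helper: is the old Pod's survival window long enough for the LoadBalancer cache? */
final class ShutdownWindowCheck {
    /**
     * @param servingAfterDeregistration how long the Pod keeps serving after it deregisters
     * @param cacheTtl LoadBalancer cache TTL (35 s by default)
     * @param propagationMargin assumed extra time for the deregistration to reach the gateway
     */
    static boolean isSafe(Duration servingAfterDeregistration, Duration cacheTtl, Duration propagationMargin) {
        // A list cached just before deregistration can live for up to one full TTL.
        return servingAfterDeregistration.compareTo(cacheTtl.plus(propagationMargin)) > 0;
    }
}
```

Assuming the original 5-second wait counts from deregistration, the check fails (5 s is well under 35 s), which matches the symptom described in the preface; a 40-second window passes with a 2-second margin.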