【SpringCloud负载均衡】【源码+图解】【三】LoadBalancer的工作原理
目录
- 4. 负载均衡
- 4.1 提供者DiscoveryClient
- 4.1.1 CompositeDiscoveryClient
- 4.1.2 EurekaDiscoveryClient
- 4.1.3 SimpleDiscoveryClient
- 4.1.4 自定义DiscoveryClient
- 4.2 过滤器Supplier
- 4.2.1 CachingServiceInstanceListSupplier
- yml配置
- Configuration配置
- 创建
- 使用
- 4.2.2 ZonePreferenceServiceInstanceListSupplier
- yml配置
- Configuration配置
- 创建
- 使用
- 4.2.3 HealthCheckServiceInstanceListSupplier
- yml配置
- Configuration配置
- 创建
- 使用
- 实践
- 4.2.4 RequestBasedStickySessionServiceInstanceListSupplier
- yml配置
- Configuration配置
- 创建
- 使用
- 实践
- 4.2.5 SameInstancePreferenceServiceInstanceListSupplier
- yml配置
- Configuration配置
- 创建
- 使用
- 4.2.6 RetryAwareServiceInstanceListSupplier
- yml配置
- Configuration配置
- LoadBalancedRetryPolicy
- RetryTemplate
- RetryTemplate.execute()
- 创建
4. 负载均衡
loadBalancer采用了webFlux响应式编程,只有在真正choose的时候才会去拉取数据,所以它的顺序是一层一层往下拉取真正的数据,然后再一层一层向上筛选过滤数据实现负载均衡,接下来我们分析下这些Supplier
4.1 提供者DiscoveryClient
对于DiscoveryClientServiceInstanceListSupplier它不会单独使用,而是作为其他Supplier的delegate,也就是兜底用的,所以它的作用至关重要。先看下它的构造函数
public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstanceListSupplier {
private Duration timeout = Duration.ofSeconds(30);
private final String serviceId;
private final Flux<List<ServiceInstance>> serviceInstances;
// delegate默认是DiscoveryClient的实现类CompositeDiscoveryClient,见后文的分析
public DiscoveryClientServiceInstanceListSupplier(DiscoveryClient delegate, Environment environment) {
this.serviceId = environment.getProperty(PROPERTY_NAME);
// timeout为配置中的spring.cloud.loadbalancer.service-discovery.timeout
resolveTimeout(environment);
// 这里采用了响应式编程,这里仅仅是定义,当调用get()函数获取的时候才会真正的拿数据
this.serviceInstances = Flux.defer(
// 调用delegate,即CompositeDiscoveryClient获取实例信息
() -> Mono.fromCallable(() -> delegate.getInstances(serviceId)))
.timeout(timeout, Flux.defer(() -> {
// 超时返回空值
logTimeout();
return Flux.just(new ArrayList<>());
}), Schedulers.boundedElastic()).onErrorResume(error -> {
// 异常返回空值
logException(error);
return Flux.just(new ArrayList<>());
});
}
}
DiscoveryClientServiceInstanceListSupplier最重要的是DiscoveryClient,它才是真正获取ServiceInstance的类,先看下DiscoveryClient的类图
DiscoveryClient提供了两个方法,最重要的是getInstances,默认的实现是CompositeDiscoveryClient,它的成员参数discoveryClients默认下是EurekaDiscoveryClient和SimpleDiscoveryClient,除了这两个也可以自定义。接下来我们着重分下三个client的配置和getInstances方法
4.1.1 CompositeDiscoveryClient
public class CompositeDiscoveryClientAutoConfiguration {
@Bean
// 声明默认情况使用CompositeDiscoveryClient
@Primary
public CompositeDiscoveryClient compositeDiscoveryClient(List<DiscoveryClient> discoveryClients) {
return new CompositeDiscoveryClient(discoveryClients);
}
}
public class CompositeDiscoveryClient implements DiscoveryClient {
@Override
public List<ServiceInstance> getInstances(String serviceId) {
if (this.discoveryClients != null) {
// 默认情况下EurekaDiscoveryClient,SimpleDiscoveryClient
// 如果要使用SimpleDiscoveryClient,可以使用以下两种方式:
// 第一eureka.client.enabled为false,这样就不会注入EurekaDiscoveryClient类
// 第二eureka.client.fetch-registry为false,这样就不会获取registry信息,那么EurekaDiscoveryClient拿到的值为空
for (DiscoveryClient discoveryClient : this.discoveryClients) {
List<ServiceInstance> instances = discoveryClient.getInstances(serviceId);
if (instances != null && !instances.isEmpty()) {
// 一旦获得ServiceInstance就立即返回,后面的client就废弃了
return instances;
}
}
}
return Collections.emptyList();
}
}
4.1.2 EurekaDiscoveryClient
// eureka.client.enabled为true才开启,默认下为true
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
public class EurekaDiscoveryClientConfiguration {
@Bean
@ConditionalOnMissingBean
public EurekaDiscoveryClient discoveryClient(EurekaClient client, EurekaClientConfig clientConfig) {
return new EurekaDiscoveryClient(client, clientConfig);
}
}
public class EurekaDiscoveryClient implements DiscoveryClient {
// eurekaClient即Eureka中的DiscoveryClient类
private final EurekaClient eurekaClient;
// eureka.client.*的配置
private final EurekaClientConfig clientConfig;
@Override
public List<ServiceInstance> getInstances(String serviceId) {
// 调用DiscoveryClient.getInstancesByVipAddress,这里拿到的instance即DiscoveryClient中refreshRegistry方法定时更新的实例列表
List<InstanceInfo> infos = this.eurekaClient.getInstancesByVipAddress(serviceId, false);
List<ServiceInstance> instances = new ArrayList<>();
for (InstanceInfo info : infos) {
// 封装成EurekaServiceInstance
instances.add(new EurekaServiceInstance(info));
}
return instances;
}
}
4.1.3 SimpleDiscoveryClient
public class SimpleDiscoveryClientAutoConfiguration implements ApplicationListener<WebServerInitializedEvent> {
// spring.cloud.discovery.client.simple.*
private SimpleDiscoveryProperties simple = new SimpleDiscoveryProperties();
@Bean
@Order
public DiscoveryClient simpleDiscoveryClient(SimpleDiscoveryProperties properties) {
return new SimpleDiscoveryClient(properties);
}
}
public class SimpleDiscoveryClient implements DiscoveryClient {
@Override
public List<ServiceInstance> getInstances(String serviceId) {
List<ServiceInstance> serviceInstances = new ArrayList<>();
// 这里返回的instances为yml文件中的spring.cloud.discovery.client.simple.instances的配置
List<DefaultServiceInstance> serviceInstanceForService = this.simpleDiscoveryProperties.getInstances()
.get(serviceId);
if (serviceInstanceForService != null) {
serviceInstances.addAll(serviceInstanceForService);
}
return serviceInstances;
}
}
看下spring.cloud.discovery.client.simple
的配置例子
spring:
application:
name: user
cloud:
discovery:
client:
simple:
instances:
product:
- instance-id: localhost:product:5001
service-id: product
host: localhost
port: 5001
secure: false
uri:
host: localhost
port: 5001
metadata:
my: my
- instance-id: localhost:product:5002
service-id: product
host: localhost
port: 5002
secure: false
uri:
host: localhost
port: 5002
metadata:
my: my
4.1.4 自定义DiscoveryClient
// 1、注入Spring容器
@Component
// 2、实现DiscoveryClient
public class MyDiscoveryClient implements DiscoveryClient{
@Override
public String description() {
return null;
}
@Override
public List<ServiceInstance> getInstances(String serviceId) {
return null;
}
@Override
public List<String> getServices() {
return null;
}
}
4.2 过滤器Supplier
先看下ServiceInstanceListSupplier类图
顶层接口为Supplier,唯一的方法为get(),它的作用就是提供服务,loadBalance中提供List<ServiceInstance>
,接下来我们逐一分析每一个Supplier的使用
4.2.1 CachingServiceInstanceListSupplier
yml配置
spring:
cloud:
loadbalancer:
configurations: default # 默认值,即默认使用CachingServiceInstanceListSupplier
cache:
ttl: 35 # 缓存过期时间
capacity: 256 # 缓存最大容量
caffeine:
spec: initialCapacity=500,expireAfterWrite=5s # CaffeineBasedLoadBalancerCacheManager的相关配置
Configuration配置
public static class BlockingSupportConfiguration {
@Bean
@ConditionalOnBean(DiscoveryClient.class)
@ConditionalOnMissingBean
// DefaultConfigurationCondition即spring.cloud.loadbalancer.configurations = default,默认值
@Conditional(DefaultConfigurationCondition.class)
public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
ConfigurableApplicationContext context) {
return ServiceInstanceListSupplier.builder() // 返回ServiceInstanceListSupplierBuilder
.withBlockingDiscoveryClient() // A DiscoveryClientServiceInstanceListSupplier
.withCaching() // B CachingServiceInstanceListSupplier
.build(context); // C
}
}
创建
public final class ServiceInstanceListSupplierBuilder {
private Creator baseCreator;
private DelegateCreator cachingCreator;
public ServiceInstanceListSupplierBuilder withBlockingDiscoveryClient() {
this.baseCreator = context -> {
// A
// 注意DiscoveryClient是接口,不是eureka那个DiscoveryClient类
DiscoveryClient discoveryClient = context.getBean(DiscoveryClient.class);
// 创建DiscoveryClientServiceInstanceListSupplier,真正提供实例的Supplier
return new DiscoveryClientServiceInstanceListSupplier(discoveryClient, context.getEnvironment());
};
return this;
}
public ServiceInstanceListSupplierBuilder withCaching() {
this.cachingCreator = (context, delegate) -> {
// B, 注意这里用到的缓存管理器为LoadBalancerCacheManager,后面会详细分析
ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context
.getBeanProvider(LoadBalancerCacheManager.class);
if (cacheManagerProvider.getIfAvailable() != null) {
// 创建CachingServiceInstanceListSupplier
return new CachingServiceInstanceListSupplier(delegate, cacheManagerProvider.getIfAvailable());
}
return delegate;
};
return this;
}
// C
public ServiceInstanceListSupplier build(ConfigurableApplicationContext context) {
// apply方法即调用A处的代码获的DiscoveryClientServiceInstanceListSupplier
ServiceInstanceListSupplier supplier = baseCreator.apply(context);
......
if (this.cachingCreator != null) {
// apply方法即调用B处的代码,将DiscoveryClientServiceInstanceListSupplier作为delegate
// 创建CachingServiceInstanceListSupplier
supplier = this.cachingCreator.apply(context, supplier);
}
return supplier;
}
}
使用
public class CachingServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier {
private final Flux<List<ServiceInstance>> serviceInstances;
@SuppressWarnings("unchecked")
public CachingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, CacheManager cacheManager) {
super(delegate);
this.serviceInstances = CacheFlux.lookup(key -> {
// 获取缓存容器,缓存的管理使用DefaultLoadBalancerCacheManager,见后文分析
Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
// 根据serviceId获取List<ServiceInstance>
List<ServiceInstance> list = cache.get(key, List.class);
return Flux.just(list).materialize().collectList(); // 封装成Flux<List<ServiceInstance>>
}, delegate.getServiceId()).
// 如果缓存为空则调用deletdate(即DiscoveryClientServiceInstanceListSupplier)获取
onCacheMissResume(delegate.get().take(1))
.andWriteWith((key, signals) -> Flux.fromIterable(signals).dematerialize().doOnNext(instances -> {
// 从DiscoveryClientServiceInstanceListSupplier获取后更新缓存
Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
....
cache.put(key, instances);
}).then());
}
// 实现get()
@Override
public Flux<List<ServiceInstance>> get() {
return serviceInstances;
}
}
CachingServiceInstanceListSupplier用到了CacheManager作为缓存管理器,先看下其类图
默认情况下使用的是DefaultLoadBalancerCacheManager,如果要使用CaffeineBasedLoadBalancerCacheManager需要引入下面的依赖
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.2</version>
</dependency>
分析下默认的DefaultLoadBalancerCacheManager,下面是它的结构图
再看下其构造函数,了解它的工作原理
public class DefaultLoadBalancerCacheManager implements LoadBalancerCacheManager {
private final ConcurrentMap<String, Cache> cacheMap = new ConcurrentHashMap<>(16);
public DefaultLoadBalancerCacheManager(LoadBalancerCacheProperties loadBalancerCacheProperties,
String... cacheNames) {
// 2、创建默认的cache
cacheMap.putAll(createCaches(cacheNames, loadBalancerCacheProperties).stream()
.collect(Collectors.toMap(DefaultLoadBalancerCache::getName, cache -> cache)));
}
public DefaultLoadBalancerCacheManager(LoadBalancerCacheProperties loadBalancerCacheProperties) {
// 1、 默认下只创建一个SERVICE_INSTANCE_CACHE_NAME的cache
this(loadBalancerCacheProperties, SERVICE_INSTANCE_CACHE_NAME);
}
private Set<DefaultLoadBalancerCache> createCaches(String[] cacheNames,
LoadBalancerCacheProperties loadBalancerCacheProperties) {
return Arrays.stream(cacheNames).distinct()
.map(name ->
// 3、创建SERVICE_INSTANCE_CACHE_NAME的cache
new DefaultLoadBalancerCache(name,
// 定义serviceId -> List<ServiceInstance>的map
new ConcurrentHashMapWithTimedEviction<>(
// spring.cloud.loadbalancer.cache.capacity,默认256
loadBalancerCacheProperties.getCapacity(),
// 用来定期更新缓存,即删除过期缓存
new DelayedTaskEvictionScheduler<>(aScheduledDaemonThreadExecutor())
),
// 缓存过期时间spring.cloud.loadbalancer.cache.ttl,默认35秒
loadBalancerCacheProperties.getTtl().toMillis(),
false))
.collect(Collectors.toSet());
}
}
4.2.2 ZonePreferenceServiceInstanceListSupplier
yml配置
spring:
cloud:
loadbalancer:
configurations: zone-preference # 开启**ZonePreferenceServiceInstanceListSupplier**
zone: myZone # 偏好这个zone的serviceInstance
Configuration配置
public static class BlockingSupportConfiguration {
@Bean
@ConditionalOnBean(DiscoveryClient.class)
@ConditionalOnMissingBean
// ZonePreferenceConfigurationCondition即spring.cloud.loadbalancer.configurations = zone-preference
@Conditional(ZonePreferenceConfigurationCondition.class)
public ServiceInstanceListSupplier zonePreferenceDiscoveryClientServiceInstanceListSupplier(
ConfigurableApplicationContext context) {
return ServiceInstanceListSupplier.builder()
.withBlockingDiscoveryClient() // DiscoveryClientServiceInstanceListSupplier
.withZonePreference() // A
.withCaching(). // CachingServiceInstanceListSupplier
build(context); // B
}
}
创建
public final class ServiceInstanceListSupplierBuilder {
public ServiceInstanceListSupplierBuilder withZonePreference() {
DelegateCreator creator = (context, delegate) -> {
// A, 创建ZonePreferenceServiceInstanceListSupplier
LoadBalancerZoneConfig zoneConfig = context.getBean(LoadBalancerZoneConfig.class);
return new ZonePreferenceServiceInstanceListSupplier(delegate, zoneConfig);
};
// 添加到creators
this.creators.add(creator);
return this;
}
// B
// 从配置文件中可以看出zone-preference配置创建出来的Supplier为:
// CachingServiceInstanceListSupplier <- ZonePreferenceServiceInstanceListSupplier <- DiscoveryClientServiceInstanceListSupplier
// 也就是默认的Supplier添加zone过滤器
public ServiceInstanceListSupplier build(ConfigurableApplicationContext context) {
// 底部DiscoveryClientServiceInstanceListSupplier
ServiceInstanceListSupplier supplier = baseCreator.apply(context);
// 中间ZonePreferenceServiceInstanceListSupplier
for (DelegateCreator creator : creators) {
supplier = creator.apply(context, supplier);
}
// 上层CachingServiceInstanceListSupplier
if (this.cachingCreator != null) {
supplier = this.cachingCreator.apply(context, supplier);
}
return supplier;
}
}
使用
public class ZonePreferenceServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier {
@Override
public Flux<List<ServiceInstance>> get() {
// 将DiscoveryClientServiceInstanceListSupplier获取到的List<ServiceInstance>过滤
return getDelegate().get().map(this::filteredByZone);
}
private List<ServiceInstance> filteredByZone(List<ServiceInstance> serviceInstances) {
if (zone == null) {
// zoneConfig为LoadBalancerZoneConfig,即spring.cloud.loadbalancer.zone的值
zone = zoneConfig.getZone();
}
// 判断是否配置spring.cloud.loadbalancer.zone的值,即保留这个zone的serviceInstance
if (zone != null) {
List<ServiceInstance> filteredInstances = new ArrayList<>();
for (ServiceInstance serviceInstance : serviceInstances) {
String instanceZone = getZone(serviceInstance);
if (zone.equalsIgnoreCase(instanceZone)) {
filteredInstances.add(serviceInstance);
}
}
if (filteredInstances.size() > 0) {
// 如果有则返回符合条件的
return filteredInstances;
}
}
// 如果filteredInstances为空则返回原值,确保不为空
return serviceInstances;
}
private String getZone(ServiceInstance serviceInstance) {
// 如果是从EurekaDiscoveryClient来的,则是其他微服务实例配置文件中的eureka.instance.metadata
// 如果是从SimpleDiscoveryClient来的,则是自身配置文件中的spring.cloud.loadbalancer.client.simple.instances.{serviceId}[index].metadata
Map<String, String> metadata = serviceInstance.getMetadata();
if (metadata != null) {
return metadata.get(ZONE);
}
return null;
}
}
4.2.3 HealthCheckServiceInstanceListSupplier
yml配置
spring:
cloud:
loadbalancer:
configurations: health-check # 这里一定要是health-check才能开启HealthCheckServiceInstanceListSupplier
clients:
product:
health-check:
initial-delay: 0
interval: 25
refetch-instances: false
refetch-instances-interval: 25
path:
default: /actuator/health
port: 0
repeat-health-check: true
Configuration配置
public static class BlockingSupportConfiguration {
@Bean
@ConditionalOnBean({ DiscoveryClient.class, RestTemplate.class })
@ConditionalOnMissingBean
// HealthCheckConfigurationCondition即spring.cloud.loadbalancer.configurations = health-check
@Conditional(HealthCheckConfigurationCondition.class)
public ServiceInstanceListSupplier healthCheckDiscoveryClientServiceInstanceListSupplier(
ConfigurableApplicationContext context) {
return ServiceInstanceListSupplier.builder()
.withBlockingDiscoveryClient() // DiscoveryClientServiceInstanceListSupplier
.withBlockingHealthChecks() // A、HealthCheckServiceInstanceListSupplier
.build(context); // B
}
}
创建
public final class ServiceInstanceListSupplierBuilder {
public ServiceInstanceListSupplierBuilder withBlockingHealthChecks() {
DelegateCreator creator = (context, delegate) -> {
// A
RestTemplate restTemplate = context.getBean(RestTemplate.class);
LoadBalancerClientFactory loadBalancerClientFactory = context.getBean(LoadBalancerClientFactory.class);
return blockingHealthCheckServiceInstanceListSupplier(restTemplate, delegate, loadBalancerClientFactory);
};
// 将HealthCheckServiceInstanceListSupplier添加到creators
this.creators.add(creator);
return this;
}
// B
// 从配置文件中可以看出zone-preference配置创建出来的Supplier为:
// HealthCheckServiceInstanceListSupplier <- DiscoveryClientServiceInstanceListSupplier
public ServiceInstanceListSupplier build(ConfigurableApplicationContext context) {
// 底层DiscoveryClientServiceInstanceListSupplier
ServiceInstanceListSupplier supplier = baseCreator.apply(context);
// 上层HealthCheckServiceInstanceListSupplier
for (DelegateCreator creator : creators) {
supplier = creator.apply(context, supplier);
}
// null
if (this.cachingCreator != null) {
supplier = this.cachingCreator.apply(context, supplier);
}
return supplier;
}
}
使用
根据配置看下HealthCheckServiceInstanceListSupplier的源码,整个过程有点长,可以按着序号看
public class HealthCheckServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier
implements InitializingBean, DisposableBean {
// 1、构造函数初始化参数
public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delegate,
ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory,
BiFunction<ServiceInstance, String, Mono<Boolean>> aliveFunction) {
super(delegate);
// 即spring.cloud.loadbalancer.clients.{serviceId}.healthCheck
this.healthCheck = loadBalancerClientFactory.getProperties(getServiceId()).getHealthCheck();
// 默认发送健康检查到/actuator/health
defaultHealthCheckPath = healthCheck.getPath().getOrDefault("default", "/actuator/health");
// 判断是否alive的函数,这里为了方便分析代码把
// ServiceInstanceListSupplierBuilder.blockingHealthCheckServiceInstanceListSupplier()方法中定义的aliveFunction移到了这里
this.aliveFunction = (serviceInstance, healthCheckPath) -> Mono.defer(() -> {
// 构建uri
URI uri = UriComponentsBuilder.fromUriString(getUri(serviceInstance, healthCheckPath)).build()
.toUri();
try {
// 8、发送http请求并获取healthCheck结果
return Mono
.just(HttpStatus.OK.equals(restTemplate.getForEntity(uri, Void.class).getStatusCode()));
}
catch (Exception ignored) {
// 异常则为false
return Mono.just(false);
}
});
Repeat<Object> aliveInstancesReplayRepeat = Repeat
// 如果spring.cloud.loadbalancer.clients.{serviceId}.healthCheck.refetchInstances开启则开启定时healthCheck
.onlyIf(repeatContext -> this.healthCheck.getRefetchInstances())
// 每隔spring.cloud.loadbalancer.clients.{serviceId}.healthCheck.refetchInstancesInterval进行一次healthCheck,默认25秒
.fixedBackoff(healthCheck.getRefetchInstancesInterval());
Flux<List<ServiceInstance>> aliveInstancesFlux = Flux
// 4、真正的数据源。首次或者间隔refetchInstancesInterval后开始拉数据serviceInstances
.defer(delegate)
// 当aliveInstancesReplayRepeat发出值时开始拉数据,即每隔refetchInstancesInterval拉一次数据
.repeatWhen(aliveInstancesReplayRepeat)
.switchMap(serviceInstances -> healthCheckFlux(serviceInstances) // 5、数据源返回数据开始healthCheck
.map(alive -> Collections.unmodifiableList(new ArrayList<>(alive)))); // 只返回alive的serviceInstances
aliveInstancesReplay = aliveInstancesFlux
// 3、开启订阅后,spring.cloud.loadbalancer.clients.{serviceId}.healthCheck.initial-delay后开始aliveInstancesFlux,默认0
.delaySubscription(healthCheck.getInitialDelay())
.replay(1) // 保留最近的一份历史记录
.refCount(1); // 保证healthCheck持续
}
// 注意HealthCheckServiceInstanceListSupplier实现了InitializingBean,
// 也就是说当它完成实例化后会调用afterPropertiesSet进行一定的初始化工作
@Override
public void afterPropertiesSet() {
Disposable healthCheckDisposable = this.healthCheckDisposable;
if (healthCheckDisposable != null) {
healthCheckDisposable.dispose();
}
// 2、参数都设置好后开始订阅,正式开始healthCheck
this.healthCheckDisposable = aliveInstancesReplay.subscribe();
}
protected Flux<List<ServiceInstance>> healthCheckFlux(List<ServiceInstance> instances) {
Repeat<Object> healthCheckFluxRepeat = Repeat
// spring.cloud.loadbalancer.clients.{serviceId}.healthCheck.repeatHealthCheck,是否重复healthCheck,默认true
.onlyIf(repeatContext -> healthCheck.getRepeatHealthCheck())
// spring.cloud.loadbalancer.clients.{serviceId}.healthCheck.interval,每interval进行一次healthCheck,默认25秒
.fixedBackoff(healthCheck.getInterval());
return Flux.defer(() -> {
// 6、首次或者间隔spring.cloud.loadbalancer.clients.{serviceId}.healthCheck.interval开始判断isAlive
List<Mono<ServiceInstance>> checks = new ArrayList<>(instances.size());
for (ServiceInstance instance : instances) {
Mono<ServiceInstance> alive = isAlive(instance).onErrorResume(error -> {
// 异常返回空值
return Mono.empty();
}).timeout(healthCheck.getInterval(), Mono.defer(() -> {
// 超时返回空值
return Mono.empty();
})).handle((isHealthy, sink) -> {
if (isHealthy) {
sink.next(instance);
}
});
checks.add(alive);
}
List<ServiceInstance> result = new ArrayList<>();
return Flux.merge(checks).map(alive -> {
result.add(alive);
return result;
}).defaultIfEmpty(result);
}).repeatWhen(healthCheckFluxRepeat);
}
// 7、开始判断isAlive
protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
// serviceInstance.getServiceId()的值分情况
// EurekaDiscoveryClient: Product微服务的eureka.instance.appname
// SimpleDiscoveryClient: spring.cloud.discovery.client.simple.instances.{serviceId}[index].service-id
// containsService = spring.cloud.loadbalancer.clients.{serviceId}.healthCheck.path是否有serviceInstance.getServiceId()这个key
boolean containsService = healthCheck.getPath().containsKey(serviceInstance.getServiceId());
// spring.cloud.loadbalancer.clients.{serviceId}.healthCheck.path.{serviceInstance.getServiceId()}
String healthCheckPropertyValue = healthCheck.getPath().get(serviceInstance.getServiceId());
// 如果有key但是没定义值,则默认为true,就是alive
if (containsService && !StringUtils.hasText(healthCheckPropertyValue)) {
return Mono.just(true);
}
// 如果不存在key,或者key-value都有值则进行healthCheck
String healthCheckPath = healthCheckPropertyValue != null ? healthCheckPropertyValue : defaultHealthCheckPath;
return aliveFunction.apply(updatedServiceInstance(serviceInstance), healthCheckPath);
}
private ServiceInstance updatedServiceInstance(ServiceInstance serviceInstance) {
// spring.cloud.loadbalancer.clients.{serviceId}.healthCheck.port
Integer healthCheckPort = healthCheck.getPort();
// 如果定义了healthCheckPort并且serviceInstance是来自配置文件,则把端口更新到serviceInstance
if (serviceInstance instanceof DefaultServiceInstance && healthCheckPort != null) {
return new DefaultServiceInstance(serviceInstance.getInstanceId(), serviceInstance.getServiceId(),
serviceInstance.getHost(), healthCheckPort, serviceInstance.isSecure(),
serviceInstance.getMetadata());
}
return serviceInstance;
}
}
实践
为了能更深入理解HealthCheckServiceInstanceListSupplier的工作原理,我们举个栗子。
Product微服务
继续用前面的HelloWorld,再添加一个Controller做为healthCheck的API
@RestController
@RequestMapping("/healthCheck")
public class HealthCheckController {
private static long lastTime = 0;
@GetMapping
public boolean healthCheck(HttpServletRequest request) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long currentTime = System.currentTimeMillis();
long interval = lastTime == 0 ? 0 : (currentTime - lastTime) / 1000;
lastTime = currentTime;
System.out.println("HealthCheck for " + request.getServerPort() + " at " + sdf.format(new Date(currentTime)) + ", interval: " + interval);
return true;
}
}
User微服务
yml配置
spring:
cloud:
loadbalancer:
configurations: health-check # 这里一定要是health-check才能开启HealthCheckServiceInstanceListSupplier
clients:
product:
health-check:
initial-delay: 0s
interval: 25s # health-check间隔时间
refetch-instances: true # 开启定期更新数据源
refetch-instances-interval: 60s # 向数据源更新serviceInstances间隔时间
path:
default: /healthCheck # 即Product返回healthCheck结果的Api
repeat-health-check: true # 开启重复health-check,也就是在refetch-instances-interval内每隔interval进行一次health-check
pom.xml配置
因为loadBalance底层用到了webFlux响应式编程,所以要引入相应的依赖,不然请求发不出去,所以要添加下列依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>2.7.5</version>
</dependency>
重新启动Product和User微服务,浏览器输入http://localhost:2222/user
,在Product的日志输出中可以看到每隔25秒进行一次health-check,每隔60秒重新拉一次数据源并重新开始health-check。
4.2.4 RequestBasedStickySessionServiceInstanceListSupplier
yml配置
spring:
cloud:
loadbalancer:
configurations: request-based-sticky-session # 这里一定要是health-check才能开启HealthCheckServiceInstanceListSupplier
clients:
product:
sticky-session:
instance-id-cookie-name: sc-lb-instance-id
add-service-instance-cookie: false
Configuration配置
public static class BlockingSupportConfiguration {
@Bean
@ConditionalOnBean(DiscoveryClient.class)
@ConditionalOnMissingBean
// RequestBasedStickySessionConfigurationCondition即spring.cloud.loadbalancer.configurations = request-based-sticky-session
@Conditional(RequestBasedStickySessionConfigurationCondition.class)
public ServiceInstanceListSupplier requestBasedStickySessionDiscoveryClientServiceInstanceListSupplier(
ConfigurableApplicationContext context) {
return ServiceInstanceListSupplier.builder()
.withBlockingDiscoveryClient() // DiscoveryClientServiceInstanceListSupplier
.withRequestBasedStickySession() // RequestBasedStickySessionServiceInstanceListSupplier
.build(context);
}
}
创建
public final class ServiceInstanceListSupplierBuilder {
public ServiceInstanceListSupplierBuilder withRequestBasedStickySession() {
DelegateCreator creator = (context, delegate) -> {
LoadBalancerClientFactory loadBalancerClientFactory = context.getBean(LoadBalancerClientFactory.class);
return new RequestBasedStickySessionServiceInstanceListSupplier(delegate, loadBalancerClientFactory);
};
// 将新创建好的RequestBasedStickySessionServiceInstanceListSupplier添加到creators
this.creators.add(creator);
return this;
}
}
使用
public class RequestBasedStickySessionServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier {
public RequestBasedStickySessionServiceInstanceListSupplier(ServiceInstanceListSupplier delegate,
ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory) {
super(delegate);
// spring.cloud.loadbalancer.clients.{serviceId}.*
// 如果spring.cloud.loadbalancer.clients中没有serviceId对应的配置就用默认的即spring.cloud.loadbalancer.*
this.properties = loadBalancerClientFactory.getProperties(getServiceId());
}
@Override
public Flux<List<ServiceInstance>> get() {
return delegate.get();
}
@SuppressWarnings("rawtypes")
@Override
public Flux<List<ServiceInstance>> get(Request request) {
// spring.cloud.loadbalancer.clients.{serviceId}.stickySession.instanceIdCookieName
String instanceIdCookieName = properties.getStickySession().getInstanceIdCookieName();
// request为LoadBalancerRequestAdapter,context默认下为RequestDataContext,详情请见前文
Object context = request.getContext();
if ((context instanceof RequestDataContext)) {
MultiValueMap<String, String> cookies = ((RequestDataContext) context).getClientRequest().getCookies();
String cookie = cookies.getFirst(instanceIdCookieName);
if (cookie != null) {
// 如果request含有key为instanceIdCookieName值的cookie,则挑选instanceId为其值的serviceInstance
return delegate.get(request).map(serviceInstances -> selectInstance(serviceInstances, cookie));
}
return delegate.get(request);
}
return delegate.get(request);
}
// 通过cookie过滤serviceInstances
private List<ServiceInstance> selectInstance(List<ServiceInstance> serviceInstances, String cookie) {
for (ServiceInstance serviceInstance : serviceInstances) {
if (cookie.equals(serviceInstance.getInstanceId())) {
return Collections.singletonList(serviceInstance);
}
}
return serviceInstances;
}
}
实践
yml配置
spring:
cloud:
loadbalancer:
configurations: request-based-sticky-session # 这里一定要是health-check才能开启HealthCheckServiceInstanceListSupplier
clients:
myStickClient:
sticky-session:
instance-id-cookie-name: sc-lb-instance-id
add-service-instance-cookie: false
User微服务
对HelloWorld中的UseController做下修改
@RestController
@RequestMapping("/user")
public class UserController {
@Autowired
RestTemplate restTemplate;
@GetMapping
public void getProduct() {
HttpHeaders headers = new HttpHeaders();
List<String> cookies = new ArrayList<>();
// 添加cookie
cookies.add("sc-lb-instance-id=localhost:product:5002");
headers.put(HttpHeaders.COOKIE, cookies);
HttpEntity<String> htpEntity = new HttpEntity<String>(headers);
ResponseEntity<String> result = restTemplate.exchange("http://product/product", HttpMethod.GET, htpEntity, String.class);
// String result = restTemplate.getForObject("http://product/product", String.class);
System.out.println(result.getBody());
}
}
重新启动User微服务,浏览器输入http://localhost:2222/user
,按下回车就可以在User微服务的日志中看到5002,因为我们选择了sc-lb-instance-id=localhost:product:5002
,多试几次依旧是5002。
4.2.5 SameInstancePreferenceServiceInstanceListSupplier
yml配置
spring:
cloud:
loadbalancer:
configurations: same-instance-preference # 这里一定要是same-instance-preference才能开启HealthCheckServiceInstanceListSupplier
Configuration配置
public static class BlockingSupportConfiguration {
@Bean
@ConditionalOnBean(DiscoveryClient.class)
@ConditionalOnMissingBean
// SameInstancePreferenceConfigurationCondition 即spring.cloud.loadbalancer.configurations = same-instance-preference
@Conditional(SameInstancePreferenceConfigurationCondition.class)
public ServiceInstanceListSupplier sameInstancePreferenceServiceInstanceListSupplier(
ConfigurableApplicationContext context) {
return ServiceInstanceListSupplier.builder()
.withBlockingDiscoveryClient() // DiscoveryClientServiceInstanceListSupplier
.withSameInstancePreference() // SameInstancePreferenceServiceInstanceListSupplier
.build(context);
}
}
创建
public final class ServiceInstanceListSupplierBuilder {
public ServiceInstanceListSupplierBuilder withSameInstancePreference() {
DelegateCreator creator = (context,
delegate) -> new SameInstancePreferenceServiceInstanceListSupplier(delegate);
// 将SameInstancePreferenceServiceInstanceListSupplier添加到creators
this.creators.add(creator);
return this;
}
}
使用
public class SameInstancePreferenceServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier
implements SelectedInstanceCallback {
private ServiceInstance previouslyReturnedInstance;
@Override
public Flux<List<ServiceInstance>> get() {
return delegate.get() // 通过DiscoveryClientServiceInstanceListSupplier获得数据源
.map(this::filteredBySameInstancePreference); // 过滤
}
// 1、首先过滤第一遍返回List<ServiceInstance>给RoundRobinLoadBalancer挑选
private List<ServiceInstance> filteredBySameInstancePreference(List<ServiceInstance> serviceInstances) {
// 如果previouslyReturnedInstance存在则直接返回previouslyReturnedInstance
if (previouslyReturnedInstance != null && serviceInstances.contains(previouslyReturnedInstance)) {
return Collections.singletonList(previouslyReturnedInstance);
}
previouslyReturnedInstance = null;
// 如果不存在返回原数据,通过RoundRobinLoadBalancer选择后再更新previouslyReturnedInstance的值,见后文
return serviceInstances;
}
// 2、RoundRobinLoadBalancer挑选第一遍后会再调用selectedServiceInstance更新previouslyReturnedInstance,详情请见RoundRobinLoadBalancer.processInstanceResponse方法
@Override
public void selectedServiceInstance(ServiceInstance serviceInstance) {
// 如果previouslyReturnedInstance为null或者和前值不同,则更新previouslyReturnedInstance值
if (previouslyReturnedInstance == null || !previouslyReturnedInstance.equals(serviceInstance)) {
previouslyReturnedInstance = serviceInstance;
}
}
}
4.2.6 RetryAwareServiceInstanceListSupplier
要使RetryAwareServiceInstanceListSupplier必须满足下面3个条件
yml配置
spring:
cloud:
loadbalancer:
retry:
enabled: true # 1、默认true, 默认开启RetryAwareServiceInstanceListSupplier
avoid-previous-instance: true # 2、默认true,必须true才能开启
retry-on-all-operations: false
max-retries-on-same-service-instance: 0
max-retries-on-next-service-instance: 1
retryable-status-codes:
- 1
backoff:
enabled: false
min-backoff: 5
max-backoff: 10 #Long.MAX_VALUE
jitter: 1
pom.xml配置
<!-- 3、加入retry依赖 -->
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
<version>1.3.4</version>
</dependency>
重新启动后发起请求并在RoundRobinLoadBalancer打断点即可看到其被使用
接下来分析为什么会需要这三个条件
Configuration配置
@Configuration(proxyBeanMethods = false)
@ConditionalOnBlockingDiscoveryEnabled
// 1、RetryTemplate必须存在,也即要引入spring-retry依赖
@ConditionalOnClass(RetryTemplate.class)
// 2、spring.cloud.loadbalancer.retry.enabled = true
// 3、spring.cloud.loadbalancer.retry.avoid-previous-instance = true
@Conditional(BlockingOnAvoidPreviousInstanceAndRetryEnabledCondition.class)
@AutoConfigureAfter(BlockingSupportConfiguration.class)
// 请注意,这里必须存在ServiceInstanceListSupplier的实例,也就是前面我们分析的Supplier必须要存在,默认为Cache的Supplier
@ConditionalOnBean(ServiceInstanceListSupplier.class)
public static class BlockingRetryConfiguration {
@Bean
@ConditionalOnBean(DiscoveryClient.class)
@Primary
public ServiceInstanceListSupplier retryAwareDiscoveryClientServiceInstanceListSupplier(
ServiceInstanceListSupplier delegate) {
// delegate默认为CachingServiceInstanceListSupplier,参考前面的debug图
return new RetryAwareServiceInstanceListSupplier(delegate);
}
}
因为spring.cloud.loadbalancer.retry.enabled = true
,所以不会使用默认的拦截器LoadBalancerInterceptor
,而是RetryLoadBalancerInterceptor,先看下它的Configuration
@Configuration(proxyBeanMethods = false)
// RetryTemplate必须存在
@ConditionalOnClass(RetryTemplate.class)
// 默认为LoadBalancerClientFactory
@ConditionalOnBean(ReactiveLoadBalancer.Factory.class)
// 默认true
@ConditionalOnProperty(value = "spring.cloud.loadbalancer.retry.enabled", matchIfMissing = true)
public static class RetryInterceptorAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public RetryLoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory, LoadBalancedRetryFactory loadBalancedRetryFactory,
ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory) {
return new RetryLoadBalancerInterceptor(loadBalancerClient, requestFactory, loadBalancedRetryFactory,
loadBalancerFactory);
}
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
// 将RetryLoadBalancerInterceptor添加到restTemplate中进行拦截
List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
RetryLoadBalancerInterceptor和之前默认的LoadBalancerInterceptor拦截过程不一样,因此着重分析下,下面的源码为了简便分析经过了处理
public class RetryLoadBalancerInterceptor implements ClientHttpRequestInterceptor {
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
// 从URL中获取hostname作为serviceId
final String serviceName = originalUri.getHost();
// 1、创建BlockingLoadBalancedRetryPolicy
final LoadBalancedRetryPolicy retryPolicy = lbRetryFactory.createRetryPolicy(serviceName, loadBalancer);
// 2、创建RetryTemplate
RetryTemplate template = createRetryTemplate(serviceName, request, retryPolicy);
// 3、template.execute
return template.execute(context -> {
// 3.1、此处即retryCallback.doWithRetry(context)的内容
......
}, new LoadBalancedRecoveryCallback<ClientHttpResponse, ClientHttpResponse>() {
// 3.2、此处即recoveryCallback.recover(context)的内容
......
});
}
}
在通过流程图看下RetryLoadBalancerInterceptor的intercept过程
接下来我们按着步骤分析RetryLoadBalancerInterceptor.intercept的过程
LoadBalancedRetryPolicy
先看下**lbRetryFactory.createRetryPolicy(serviceName, loadBalancer)**的过程
public class BlockingLoadBalancedRetryFactory implements LoadBalancedRetryFactory {
@Override
public LoadBalancedRetryPolicy createRetryPolicy(String serviceId, ServiceInstanceChooser serviceInstanceChooser) {
// 根据serviceId的配置创建BlockingLoadBalancedRetryPolicy
return new BlockingLoadBalancedRetryPolicy(loadBalancerFactory.getProperties(serviceId));
}
}
再看下BlockingLoadBalancedRetryPolicy的类图,分析下它的功能
从它的方法来看主要是提供retry的判断,看下其详细代码
public class BlockingLoadBalancedRetryPolicy implements LoadBalancedRetryPolicy {
// 满足以下任一条件可以retry:
// 1、request的method为GET
// 2、spring.cloud.loadbalancer.clients.{serviceId}.retry.retryOnAllOperations = true
public boolean canRetry(LoadBalancedRetryContext context) {
HttpMethod method = context.getRequest().getMethod();
return HttpMethod.GET.equals(method) || properties.getRetry().isRetryOnAllOperations();
}
// spring.cloud.loadbalancer.clients.{serviceId}.retry.maxRetriesOnSameServiceInstance,默认0
@Override
public boolean canRetrySameServer(LoadBalancedRetryContext context) {
return sameServerCount < properties.getRetry().getMaxRetriesOnSameServiceInstance() && canRetry(context);
}
// spring.cloud.loadbalancer.clients.{serviceId}.retry.maxRetriesOnNextServiceInstance,默认1
@Override
public boolean canRetryNextServer(LoadBalancedRetryContext context) {
return nextServerCount <= properties.getRetry().getMaxRetriesOnNextServiceInstance() && canRetry(context);
}
@Override
public void registerThrowable(LoadBalancedRetryContext context, Throwable throwable) {
if (!canRetrySameServer(context) && canRetry(context)) {
// SameServer重试次数耗尽,使用NextServer的重试次数
sameServerCount = 0;
nextServerCount++;
if (!canRetryNextServer(context)) {
// NextServer的重试次数也耗尽,设置不能重试
context.setExhaustedOnly();
}
else {
context.setServiceInstance(null);
}
}
else {
// 首先使用SameServer的重试次数
sameServerCount++;
}
}
// spring.cloud.loadbalancer.clients.{serviceId}.retry.retryableStatusCodes,默认控制。
// 这里的statusCode即400、404这种值
@Override
public boolean retryableStatusCode(int statusCode) {
return properties.getRetry().getRetryableStatusCodes().contains(statusCode);
}
}
RetryTemplate
首先看下它的类图
再看下其创建的过程
private RetryTemplate createRetryTemplate(String serviceName, HttpRequest request,
LoadBalancedRetryPolicy retryPolicy) {
RetryTemplate template = new RetryTemplate();
// 默认NoBackOffPolicy
BackOffPolicy backOffPolicy = lbRetryFactory.createBackOffPolicy(serviceName);
template.setBackOffPolicy(backOffPolicy == null ? new NoBackOffPolicy() : backOffPolicy);
template.setThrowLastExceptionOnExhausted(true);
// 默认RetryListener[0]
RetryListener[] retryListeners = lbRetryFactory.createRetryListeners(serviceName);
if (retryListeners != null && retryListeners.length != 0) {
template.setListeners(retryListeners);
}
// 默认InterceptorRetryPolicy
template.setRetryPolicy(
// spring.cloud.loadbalancer.clients.{serviceId}.retry.enabled
!loadBalancerFactory.getProperties(serviceName).getRetry().isEnabled() || retryPolicy == null
? new NeverRetryPolicy()
: new InterceptorRetryPolicy(request, retryPolicy, loadBalancer, serviceName));
return template;
}
RetryTemplate.execute()
整体捋一遍源码中是怎么运行途中的过程的,这里为了方便只留下主体部分的代码
protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
RecoveryCallback<T> recoveryCallback, RetryState state) throws E, ExhaustedRetryException {
RetryPolicy retryPolicy = this.retryPolicy; // InterceptorRetryPolicy
BackOffPolicy backOffPolicy = this.backOffPolicy; // NoBackOffPolicy
// 1、创建LoadBalancedRetryContext
RetryContext context = open(retryPolicy, state);
try {
// 2、判断是否重试,如果是首次请求或者BlockingLoadBalancedRetryPolicy.canRetryNextServer
while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
try {
lastException = null;
// 3、获取serviceInstance逻辑
return retryCallback.doWithRetry(context);
}
catch (Throwable e) {
// 4、出现异常,更新BlockingLoadBalancedRetryPolicy.registerThrowable方法
registerThrowable(retryPolicy, state, context, e);
// 5、如果还能重试,执行backOffPolicy,本过程用的是NoBackOffPolicy,不做任何事
if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
backOffPolicy.backOff(backOffContext);
}
}
}
// 6、重试次数耗尽也没能获得成功,执行recoveryCallback返回结果
return handleRetryExhausted(recoveryCallback, context, state);
}
catch (Throwable e) {
throw RetryTemplate.<E>wrapIfNecessary(e);
}
finally {
close(retryPolicy, context, state, lastException == null || exhausted);
doCloseInterceptors(retryCallback, context, lastException);
RetrySynchronizationManager.clear();
}
}
这里要着重分析的是第3步的retryCallback.doWithRetry(context),按照惯例删除掉不影响主体的代码
public class RetryLoadBalancerInterceptor implements ClientHttpRequestInterceptor {
@Override
public ClientHttpResponse intercept(...) throws IOException {
......
return template.execute(context -> {
ServiceInstance serviceInstance = null;
// 1、先从LoadBalancedRetryContext中获取serviceInstance
if (context instanceof LoadBalancedRetryContext) {
LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
serviceInstance = lbContext.getServiceInstance();
}
String hint = getHint(serviceName);
if (serviceInstance == null) {
// 2、首次获取serviceInstance,走类似LoadBalancerInterceptor的过程获取serviceInstance
ServiceInstance previousServiceInstance = null;
if (context instanceof LoadBalancedRetryContext) {
LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
previousServiceInstance = lbContext.getPreviousServiceInstance();
}
// 3、将httprequest包装成DefaultRequest
DefaultRequest<RetryableRequestContext> lbRequest = new DefaultRequest<>(
new RetryableRequestContext(previousServiceInstance, new RequestData(request), hint));
// 4、通过BlockingLoadBalancerClient.choose(...)获取serviceInstance,过程不再重复,
// 只需要知道中间需要用到Supplier,而本过程中用到了RetryAwareServiceInstanceListSupplier
serviceInstance = loadBalancer.choose(serviceName, lbRequest);
// 5、将serviceInstance保存到LoadBalancedRetryContext中,更新previousServiceInstance
if (context instanceof LoadBalancedRetryContext) {
LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
lbContext.setServiceInstance(serviceInstance);
}
Response<ServiceInstance> lbResponse = new DefaultResponse(serviceInstance);
}
// 6、包装成LoadBalancerRequestAdapter
LoadBalancerRequestAdapter<ClientHttpResponse, RetryableRequestContext> lbRequest = new LoadBalancerRequestAdapter<>(
requestFactory.createRequest(request, body, execution),
new RetryableRequestContext(null, new RequestData(request), hint));
ServiceInstance finalServiceInstance = serviceInstance;
// 7、获取结果
ClientHttpResponse response = RetryLoadBalancerInterceptor.this.loadBalancer.execute(serviceName,
finalServiceInstance, lbRequest);
int statusCode = response.getRawStatusCode();
// 8、如果是因为http请求返回异常结果,且statusCode在定义的retryable-status-codes中,则抛出异常进行重试
// 这也就是为什么会有第1步,第5步保存serviceInstance后再次retryCallback.doWithRetry(context)时不用走第2/3/4/5步
if (retryPolicy != null && retryPolicy.retryableStatusCode(statusCode)) {
byte[] bodyCopy = StreamUtils.copyToByteArray(response.getBody());
response.close();
throw new ClientHttpResponseStatusCodeException(serviceName, response, bodyCopy);
}
return response;
}, new LoadBalancedRecoveryCallback<ClientHttpResponse, ClientHttpResponse>() {
@Override
protected ClientHttpResponse createResponse(ClientHttpResponse response, URI uri) {
return response;
}
});
}
}
创建
上一步介绍完RetryLoadBalancerInterceptor后就到了与之配对的RetryAwareServiceInstanceListSupplier
public class RetryAwareServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier {
@Override
public Flux<List<ServiceInstance>> get(Request request) {
// 如果context不是RetryableRequestContext则直接走后续的CachingServiceInstanceListSupplier
// retryCallback.doWithRetry(context)中第3步已包装成RetryableRequestContext
if (!(request.getContext() instanceof RetryableRequestContext)) {
return delegate.get(request);
}
RetryableRequestContext context = (RetryableRequestContext) request.getContext();
ServiceInstance previousServiceInstance = context.getPreviousServiceInstance();
// previousServiceInstance为空则重新拉取List<ServiceInstance>
if (previousServiceInstance == null) {
return delegate.get(request);
}
// 剔除previousServiceInstance,即重试的话不要使用上一次失败的ServiceInstance
return delegate.get(request).map(instances -> filteredByPreviousInstance(instances, previousServiceInstance));
}
private List<ServiceInstance> filteredByPreviousInstance(List<ServiceInstance> instances,
ServiceInstance previousServiceInstance) {·
List<ServiceInstance> filteredInstances = new ArrayList<>(instances);
// 把previousServiceInstance从instances中移除
if (previousServiceInstance != null) {
filteredInstances.remove(previousServiceInstance);
}
if (filteredInstances.size() > 0) {
return filteredInstances;
}
return instances;
}
@Override
public Flux<List<ServiceInstance>> get() {
return delegate.get();
}
}
关于服务调用的LoadBalance就介绍到此,从HelloWorld中每次请求其他服务的时候都需要用restTemplate来调用,这在开发中很不灵活方便,SpringCloud提供了声明式调用–OpenFeign。接下来开始分析OpenFeign。