【SpringCloud负载均衡】【源码+图解】【四】负载均衡的实现

news2024/11/28 23:27:41

【SpringCloud负载均衡】【源码+图解】【三】LoadBalancer的工作原理

目录

  • 4. 负载均衡
    • 4.1 提供者DiscoveryClient
      • 4.1.1 CompositeDiscoveryClient
      • 4.1.2 EurekaDiscoveryClient
      • 4.1.3 SimpleDiscoveryClient
      • 4.1.4 自定义DiscoveryClient
    • 4.2 过滤器Supplier
      • 4.2.1 CachingServiceInstanceListSupplier
        • yml配置
        • Configuration配置
        • 创建
        • 使用
      • 4.2.2 ZonePreferenceServiceInstanceListSupplier
        • yml配置
        • Configuration配置
        • 创建
        • 使用
      • 4.2.3 HealthCheckServiceInstanceListSupplier
        • yml配置
        • Configuration配置
        • 创建
        • 使用
        • 实践
      • 4.2.4 RequestBasedStickySessionServiceInstanceListSupplier
        • yml配置
        • Configuration配置
        • 创建
        • 使用
        • 实践
      • 4.2.5 SameInstancePreferenceServiceInstanceListSupplier
        • yml配置
        • Configuration配置
        • 创建
        • 使用
      • 4.2.6 RetryAwareServiceInstanceListSupplier
        • yml配置
        • Configuration配置
          • LoadBalancedRetryPolicy
          • RetryTemplate
          • RetryTemplate.execute()
        • 创建

4. 负载均衡

loadBalancer采用了webFlux响应式编程,只有在真正choose的时候才会去拉取数据,所以它的顺序是一层一层往下拉取真正的数据,然后再一层一层向上筛选过滤数据实现负载均衡,接下来我们分析下这些Supplier

在这里插入图片描述

4.1 提供者DiscoveryClient

对于DiscoveryClientServiceInstanceListSupplier它不会单独使用,而是作为其他Supplier的delegate,也就是兜底用的,所以它的作用至关重要。先看下它的构造函数

public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstanceListSupplier {
    private Duration timeout = Duration.ofSeconds(30);

	private final String serviceId;

	private final Flux<List<ServiceInstance>> serviceInstances;

    // delegate默认是DiscoveryClient的实现类CompositeDiscoveryClient,见后文的分析
	public DiscoveryClientServiceInstanceListSupplier(DiscoveryClient delegate, Environment environment) {
		this.serviceId = environment.getProperty(PROPERTY_NAME);
        // timeout为配置中的spring.cloud.loadbalancer.service-discovery.timeout
		resolveTimeout(environment);
        // 这里采用了响应式编程,这里仅仅是定义,当调用get()函数获取的时候才会真正的拿数据
		this.serviceInstances = Flux.defer(
            		// 调用delegate,即CompositeDiscoveryClient获取实例信息
            	() -> Mono.fromCallable(() -> delegate.getInstances(serviceId)))
				.timeout(timeout, Flux.defer(() -> {
                    // 超时返回空值
					logTimeout();
					return Flux.just(new ArrayList<>());
				}), Schedulers.boundedElastic()).onErrorResume(error -> {
            		// 异常返回空值
					logException(error);
					return Flux.just(new ArrayList<>());
				});
	}
}

DiscoveryClientServiceInstanceListSupplier最重要的是DiscoveryClient,它才是真正获取ServiceInstance的类,先看下DiscoveryClient的类图

在这里插入图片描述

DiscoveryClient提供了两个方法,最重要的是getInstances,默认的实现是CompositeDiscoveryClient,它的成员参数discoveryClients默认下是EurekaDiscoveryClientSimpleDiscoveryClient,除了这两个也可以自定义。接下来我们着重分下三个client的配置和getInstances方法

4.1.1 CompositeDiscoveryClient

public class CompositeDiscoveryClientAutoConfiguration {
	@Bean
    // 声明默认情况使用CompositeDiscoveryClient
	@Primary
	public CompositeDiscoveryClient compositeDiscoveryClient(List<DiscoveryClient> discoveryClients) {
		return new CompositeDiscoveryClient(discoveryClients);
	}

}
public class CompositeDiscoveryClient implements DiscoveryClient {
    @Override
	public List<ServiceInstance> getInstances(String serviceId) {
		if (this.discoveryClients != null) {
            // 默认情况下EurekaDiscoveryClient,SimpleDiscoveryClient
            // 如果要使用SimpleDiscoveryClient,可以使用以下两种方式:
            //		第一eureka.client.enabled为false,这样就不会注入EurekaDiscoveryClient类
            //		第二eureka.client.fetch-registry为false,这样就不会获取registry信息,那么EurekaDiscoveryClient拿到的值为空
			for (DiscoveryClient discoveryClient : this.discoveryClients) {
				List<ServiceInstance> instances = discoveryClient.getInstances(serviceId);
				if (instances != null && !instances.isEmpty()) {
                    // 一旦获得ServiceInstance就立即返回,后面的client就废弃了
					return instances;
				}
			}
		}
		return Collections.emptyList();
	}
}

4.1.2 EurekaDiscoveryClient

// eureka.client.enabled为true才开启,默认下为true
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
public class EurekaDiscoveryClientConfiguration {
    @Bean
	@ConditionalOnMissingBean
	public EurekaDiscoveryClient discoveryClient(EurekaClient client, EurekaClientConfig clientConfig) {
		return new EurekaDiscoveryClient(client, clientConfig);
	}
}
public class EurekaDiscoveryClient implements DiscoveryClient {
    // eurekaClient即Eureka中的DiscoveryClient类
	private final EurekaClient eurekaClient;
	// eureka.client.*的配置
	private final EurekaClientConfig clientConfig;
    
    @Override
	public List<ServiceInstance> getInstances(String serviceId) {
        // 调用DiscoveryClient.getInstancesByVipAddress,这里拿到的instance即DiscoveryClient中refreshRegistry方法定时更新的实例列表
		List<InstanceInfo> infos = this.eurekaClient.getInstancesByVipAddress(serviceId, false);
		List<ServiceInstance> instances = new ArrayList<>();
		for (InstanceInfo info : infos) {
            // 封装成EurekaServiceInstance
			instances.add(new EurekaServiceInstance(info));
		}
		return instances;
	}
}
    

4.1.3 SimpleDiscoveryClient

public class SimpleDiscoveryClientAutoConfiguration implements ApplicationListener<WebServerInitializedEvent> {
    // spring.cloud.discovery.client.simple.*
    private SimpleDiscoveryProperties simple = new SimpleDiscoveryProperties();
    @Bean
	@Order
	public DiscoveryClient simpleDiscoveryClient(SimpleDiscoveryProperties properties) {
		return new SimpleDiscoveryClient(properties);
	}
}
public class SimpleDiscoveryClient implements DiscoveryClient {
    @Override
	public List<ServiceInstance> getInstances(String serviceId) {
		List<ServiceInstance> serviceInstances = new ArrayList<>();
        // 这里返回的instances为yml文件中的spring.cloud.discovery.client.simple.instances的配置
		List<DefaultServiceInstance> serviceInstanceForService = this.simpleDiscoveryProperties.getInstances()
				.get(serviceId);
		if (serviceInstanceForService != null) {
			serviceInstances.addAll(serviceInstanceForService);
		}
		return serviceInstances;
	}
}

看下spring.cloud.discovery.client.simple的配置例子

spring:
  application:
    name: user
  cloud:
    discovery:
      client:
        simple:
          instances:
            product:
              - instance-id: localhost:product:5001
                service-id: product
                host: localhost
                port: 5001
                secure: false
                uri:
                  host: localhost
                  port: 5001
                metadata:
                  my: my
              - instance-id: localhost:product:5002
                service-id: product
                host: localhost
                port: 5002
                secure: false
                uri:
                  host: localhost
                  port: 5002
                metadata:
                  my: my

4.1.4 自定义DiscoveryClient

// 1、注入Spring容器
@Component
// 2、实现DiscoveryClient
public class MyDiscoveryClient implements DiscoveryClient{

    @Override
    public String description() {
        return null;
    }

    @Override
    public List<ServiceInstance> getInstances(String serviceId) {
        return null;
    }

    @Override
    public List<String> getServices() {
        return null;
    }

}

4.2 过滤器Supplier

先看下ServiceInstanceListSupplier类图

在这里插入图片描述

顶层接口为Supplier,唯一的方法为get(),它的作用就是提供服务,loadBalance中提供List<ServiceInstance>,接下来我们逐一分析每一个Supplier的使用

4.2.1 CachingServiceInstanceListSupplier

yml配置

spring:
  cloud:
    loadbalancer:
      configurations: default # 默认值,即默认使用CachingServiceInstanceListSupplier
      cache:
        ttl: 35 # 缓存过期时间
        capacity: 256 # 缓存最大容量
        caffeine:
          spec: initialCapacity=500,expireAfterWrite=5s # CaffeineBasedLoadBalancerCacheManager的相关配置

Configuration配置

public static class BlockingSupportConfiguration {
		@Bean
		@ConditionalOnBean(DiscoveryClient.class)
		@ConditionalOnMissingBean
    	// DefaultConfigurationCondition即spring.cloud.loadbalancer.configurations = default,默认值
		@Conditional(DefaultConfigurationCondition.class) 
		public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
				ConfigurableApplicationContext context) {
			return ServiceInstanceListSupplier.builder() // 返回ServiceInstanceListSupplierBuilder
                .withBlockingDiscoveryClient() // A DiscoveryClientServiceInstanceListSupplier
                .withCaching() // B CachingServiceInstanceListSupplier
                .build(context); // C
		}
}

创建

public final class ServiceInstanceListSupplierBuilder {
    private Creator baseCreator;

	private DelegateCreator cachingCreator;
    
    public ServiceInstanceListSupplierBuilder withBlockingDiscoveryClient() {
		this.baseCreator = context -> {
            // A 
            // 注意DiscoveryClient是接口,不是eureka那个DiscoveryClient类
			DiscoveryClient discoveryClient = context.getBean(DiscoveryClient.class);
            // 创建DiscoveryClientServiceInstanceListSupplier,真正提供实例的Supplier
			return new DiscoveryClientServiceInstanceListSupplier(discoveryClient, context.getEnvironment());
		};
		return this;
	}
    
    public ServiceInstanceListSupplierBuilder withCaching() {
		this.cachingCreator = (context, delegate) -> {
            // B, 注意这里用到的缓存管理器为LoadBalancerCacheManager,后面会详细分析
			ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context
					.getBeanProvider(LoadBalancerCacheManager.class);
			if (cacheManagerProvider.getIfAvailable() != null) {
                // 创建CachingServiceInstanceListSupplier
				return new CachingServiceInstanceListSupplier(delegate, cacheManagerProvider.getIfAvailable());
			}
			return delegate;
		};
		return this;
	}
    
    // C
    public ServiceInstanceListSupplier build(ConfigurableApplicationContext context) {
        // apply方法即调用A处的代码获的DiscoveryClientServiceInstanceListSupplier
		ServiceInstanceListSupplier supplier = baseCreator.apply(context);
        ......
		if (this.cachingCreator != null) {
            // apply方法即调用B处的代码,将DiscoveryClientServiceInstanceListSupplier作为delegate
            // 创建CachingServiceInstanceListSupplier
			supplier = this.cachingCreator.apply(context, supplier);
		}
		return supplier;
	}
}

使用

public class CachingServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier {
    private final Flux<List<ServiceInstance>> serviceInstances;

	@SuppressWarnings("unchecked")
	public CachingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, CacheManager cacheManager) {
		super(delegate);
		this.serviceInstances = CacheFlux.lookup(key -> {
            // 获取缓存容器,缓存的管理使用DefaultLoadBalancerCacheManager,见后文分析
			Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
            // 根据serviceId获取List<ServiceInstance>
			List<ServiceInstance> list = cache.get(key, List.class);
			return Flux.just(list).materialize().collectList(); // 封装成Flux<List<ServiceInstance>>
		}, delegate.getServiceId()).
            // 如果缓存为空则调用deletdate(即DiscoveryClientServiceInstanceListSupplier)获取
            onCacheMissResume(delegate.get().take(1))
				.andWriteWith((key, signals) -> Flux.fromIterable(signals).dematerialize().doOnNext(instances -> {
                    // 从DiscoveryClientServiceInstanceListSupplier获取后更新缓存
					Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
					....
						cache.put(key, instances);
				}).then());
	}
    
    // 实现get()
    @Override
	public Flux<List<ServiceInstance>> get() {
		return serviceInstances;
	}
}

CachingServiceInstanceListSupplier用到了CacheManager作为缓存管理器,先看下其类图

在这里插入图片描述

默认情况下使用的是DefaultLoadBalancerCacheManager,如果要使用CaffeineBasedLoadBalancerCacheManager需要引入下面的依赖

<dependency>
    <groupId>com.github.ben-manes.caffeine</groupId>
    <artifactId>caffeine</artifactId>
    <version>2.9.2</version>
</dependency>

分析下默认的DefaultLoadBalancerCacheManager,下面是它的结构图
在这里插入图片描述

再看下其构造函数,了解它的工作原理

public class DefaultLoadBalancerCacheManager implements LoadBalancerCacheManager {
    private final ConcurrentMap<String, Cache> cacheMap = new ConcurrentHashMap<>(16);

    public DefaultLoadBalancerCacheManager(LoadBalancerCacheProperties loadBalancerCacheProperties,
            String... cacheNames) {
        // 2、创建默认的cache
        cacheMap.putAll(createCaches(cacheNames, loadBalancerCacheProperties).stream()
                .collect(Collectors.toMap(DefaultLoadBalancerCache::getName, cache -> cache)));
    }

    public DefaultLoadBalancerCacheManager(LoadBalancerCacheProperties loadBalancerCacheProperties) {
        // 1、 默认下只创建一个SERVICE_INSTANCE_CACHE_NAME的cache
        this(loadBalancerCacheProperties, SERVICE_INSTANCE_CACHE_NAME);
    }

    private Set<DefaultLoadBalancerCache> createCaches(String[] cacheNames,
            LoadBalancerCacheProperties loadBalancerCacheProperties) {
        return Arrays.stream(cacheNames).distinct()
                .map(name -> 
                     // 3、创建SERVICE_INSTANCE_CACHE_NAME的cache
                     new DefaultLoadBalancerCache(name,
                        // 定义serviceId -> List<ServiceInstance>的map
                        new ConcurrentHashMapWithTimedEviction<>(
                            // spring.cloud.loadbalancer.cache.capacity,默认256
                            loadBalancerCacheProperties.getCapacity(),
                            // 用来定期更新缓存,即删除过期缓存
                            new DelayedTaskEvictionScheduler<>(aScheduledDaemonThreadExecutor())
                                                                ),
                        // 缓存过期时间spring.cloud.loadbalancer.cache.ttl,默认35秒
                        loadBalancerCacheProperties.getTtl().toMillis(), 
                        false))
                .collect(Collectors.toSet());
    }
}

4.2.2 ZonePreferenceServiceInstanceListSupplier

yml配置

spring:
  cloud:
    loadbalancer:
      configurations: zone-preference # 开启**ZonePreferenceServiceInstanceListSupplier**
      zone: myZone # 偏好这个zone的serviceInstance

Configuration配置

	public static class BlockingSupportConfiguration {
        @Bean
		@ConditionalOnBean(DiscoveryClient.class)
		@ConditionalOnMissingBean
        // ZonePreferenceConfigurationCondition即spring.cloud.loadbalancer.configurations = zone-preference
		@Conditional(ZonePreferenceConfigurationCondition.class)
		public ServiceInstanceListSupplier zonePreferenceDiscoveryClientServiceInstanceListSupplier(
				ConfigurableApplicationContext context) {
			return ServiceInstanceListSupplier.builder()
                .withBlockingDiscoveryClient() // DiscoveryClientServiceInstanceListSupplier
                .withZonePreference() // A
                .withCaching(). // CachingServiceInstanceListSupplier
                build(context); // B
		}
    }

创建

public final class ServiceInstanceListSupplierBuilder {
    public ServiceInstanceListSupplierBuilder withZonePreference() {
		DelegateCreator creator = (context, delegate) -> {
            // A, 创建ZonePreferenceServiceInstanceListSupplier
			LoadBalancerZoneConfig zoneConfig = context.getBean(LoadBalancerZoneConfig.class);
			return new ZonePreferenceServiceInstanceListSupplier(delegate, zoneConfig);
		};
        // 添加到creators
		this.creators.add(creator);
		return this;
	}
    
    // B
    // 从配置文件中可以看出zone-preference配置创建出来的Supplier为:
    // CachingServiceInstanceListSupplier <- ZonePreferenceServiceInstanceListSupplier <- DiscoveryClientServiceInstanceListSupplier
    // 也就是默认的Supplier添加zone过滤器
    public ServiceInstanceListSupplier build(ConfigurableApplicationContext context) {
        // 底部DiscoveryClientServiceInstanceListSupplier
		ServiceInstanceListSupplier supplier = baseCreator.apply(context);

        // 中间ZonePreferenceServiceInstanceListSupplier
		for (DelegateCreator creator : creators) {
			supplier = creator.apply(context, supplier);
		}

        // 上层CachingServiceInstanceListSupplier
		if (this.cachingCreator != null) {
			supplier = this.cachingCreator.apply(context, supplier);
		}
		return supplier;
	}
}

使用

public class ZonePreferenceServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier {
	@Override
	public Flux<List<ServiceInstance>> get() {
        // 将DiscoveryClientServiceInstanceListSupplier获取到的List<ServiceInstance>过滤
		return getDelegate().get().map(this::filteredByZone);
	}

	private List<ServiceInstance> filteredByZone(List<ServiceInstance> serviceInstances) {
		if (zone == null) {
            // zoneConfig为LoadBalancerZoneConfig,即spring.cloud.loadbalancer.zone的值
			zone = zoneConfig.getZone();
		}
        // 判断是否配置spring.cloud.loadbalancer.zone的值,即保留这个zone的serviceInstance
		if (zone != null) {
			List<ServiceInstance> filteredInstances = new ArrayList<>();
			for (ServiceInstance serviceInstance : serviceInstances) {
				String instanceZone = getZone(serviceInstance);
				if (zone.equalsIgnoreCase(instanceZone)) {
					filteredInstances.add(serviceInstance);
				}
			}
			if (filteredInstances.size() > 0) {
                // 如果有则返回符合条件的
				return filteredInstances;
			}
		}
        // 如果filteredInstances为空则返回原值,确保不为空
		return serviceInstances;
	}

	private String getZone(ServiceInstance serviceInstance) {
        // 如果是从EurekaDiscoveryClient来的,则是其他微服务实例配置文件中的eureka.instance.metadata
        // 如果是从SimpleDiscoveryClient来的,则是自身配置文件中的spring.cloud.loadbalancer.client.simple.instances.{serviceId}[index].metadata
		Map<String, String> metadata = serviceInstance.getMetadata();
		if (metadata != null) {
			return metadata.get(ZONE);
		}
		return null;
	}

}

4.2.3 HealthCheckServiceInstanceListSupplier

yml配置

spring:
  cloud:
    loadbalancer:
      configurations: health-check # 这里一定要是health-check才能开启HealthCheckServiceInstanceListSupplier
      clients:
        product: 
          health-check:
            initial-delay: 0 
            interval: 25
            refetch-instances: false
            refetch-instances-interval: 25
            path:
              default: /actuator/health 
            port: 0
            repeat-health-check: true

Configuration配置

	public static class BlockingSupportConfiguration {
        @Bean
		@ConditionalOnBean({ DiscoveryClient.class, RestTemplate.class })
		@ConditionalOnMissingBean
        // HealthCheckConfigurationCondition即spring.cloud.loadbalancer.configurations = health-check
		@Conditional(HealthCheckConfigurationCondition.class)
		public ServiceInstanceListSupplier healthCheckDiscoveryClientServiceInstanceListSupplier(
				ConfigurableApplicationContext context) {
			return ServiceInstanceListSupplier.builder()
                .withBlockingDiscoveryClient() // DiscoveryClientServiceInstanceListSupplier
                .withBlockingHealthChecks() // A、HealthCheckServiceInstanceListSupplier
				.build(context); // B
		}
    }

创建

public final class ServiceInstanceListSupplierBuilder {
    public ServiceInstanceListSupplierBuilder withBlockingHealthChecks() {
		DelegateCreator creator = (context, delegate) -> {
            // A
			RestTemplate restTemplate = context.getBean(RestTemplate.class);
			LoadBalancerClientFactory loadBalancerClientFactory = context.getBean(LoadBalancerClientFactory.class);
			return blockingHealthCheckServiceInstanceListSupplier(restTemplate, delegate, loadBalancerClientFactory);
		};
        // 将HealthCheckServiceInstanceListSupplier添加到creators
		this.creators.add(creator);
		return this;
	}
    
    // B
    // 从配置文件中可以看出zone-preference配置创建出来的Supplier为:
    // HealthCheckServiceInstanceListSupplier <- DiscoveryClientServiceInstanceListSupplier
    public ServiceInstanceListSupplier build(ConfigurableApplicationContext context) {
        // 底层DiscoveryClientServiceInstanceListSupplier
		ServiceInstanceListSupplier supplier = baseCreator.apply(context);

        // 上层HealthCheckServiceInstanceListSupplier
		for (DelegateCreator creator : creators) {
			supplier = creator.apply(context, supplier);
		}

        // null
		if (this.cachingCreator != null) {
			supplier = this.cachingCreator.apply(context, supplier);
		}
		return supplier;
	}
}

使用

根据配置看下HealthCheckServiceInstanceListSupplier的源码,整个过程有点长,可以按着序号看

public class HealthCheckServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier
		implements InitializingBean, DisposableBean {
    // 1、构造函数初始化参数
    public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delegate,
			ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory,
			BiFunction<ServiceInstance, String, Mono<Boolean>> aliveFunction) {
		super(delegate);
        // 即spring.cloud.loadbalancer.clients.{serviceId}.healthCheck
		this.healthCheck = loadBalancerClientFactory.getProperties(getServiceId()).getHealthCheck();
        // 默认发送健康检查到/actuator/health
		defaultHealthCheckPath = healthCheck.getPath().getOrDefault("default", "/actuator/health");
        // 判断是否alive的函数,这里为了方便分析代码把
        // ServiceInstanceListSupplierBuilder.blockingHealthCheckServiceInstanceListSupplier()方法中定义的aliveFunction移到了这里
		this.aliveFunction = (serviceInstance, healthCheckPath) -> Mono.defer(() -> {
            // 构建uri
					URI uri = UriComponentsBuilder.fromUriString(getUri(serviceInstance, healthCheckPath)).build()
							.toUri();
					try {
                        // 8、发送http请求并获取healthCheck结果
						return Mono
								.just(HttpStatus.OK.equals(restTemplate.getForEntity(uri, Void.class).getStatusCode()));
					}
					catch (Exception ignored) {
                        // 异常则为false
						return Mono.just(false);
					}
				});
		Repeat<Object> aliveInstancesReplayRepeat = Repeat
            // 如果spring.cloud.loadbalancer.clients.{serviceId}.healthCheck.refetchInstances开启则开启定时healthCheck
				.onlyIf(repeatContext -> this.healthCheck.getRefetchInstances())
            // 每隔spring.cloud.loadbalancer.clients.{serviceId}.healthCheck.refetchInstancesInterval进行一次healthCheck,默认25秒
				.fixedBackoff(healthCheck.getRefetchInstancesInterval());
		Flux<List<ServiceInstance>> aliveInstancesFlux = Flux
            // 4、真正的数据源。首次或者间隔refetchInstancesInterval后开始拉数据serviceInstances
            .defer(delegate)
            // 当aliveInstancesReplayRepeat发出值时开始拉数据,即每隔refetchInstancesInterval拉一次数据
            .repeatWhen(aliveInstancesReplayRepeat)
			.switchMap(serviceInstances -> healthCheckFlux(serviceInstances) // 5、数据源返回数据开始healthCheck
						.map(alive -> Collections.unmodifiableList(new ArrayList<>(alive)))); // 只返回alive的serviceInstances
		aliveInstancesReplay = aliveInstancesFlux
            // 3、开启订阅后,spring.cloud.loadbalancer.clients.{serviceId}.healthCheck.initial-delay后开始aliveInstancesFlux,默认0
            .delaySubscription(healthCheck.getInitialDelay()) 
            .replay(1) // 保留最近的一份历史记录
			.refCount(1); // 保证healthCheck持续
	}
    
    
    // 注意HealthCheckServiceInstanceListSupplier实现了InitializingBean,
    // 也就是说当它完成实例化后会调用afterPropertiesSet进行一定的初始化工作
    @Override
	public void afterPropertiesSet() {
		Disposable healthCheckDisposable = this.healthCheckDisposable;
		if (healthCheckDisposable != null) {
			healthCheckDisposable.dispose();
		}
        // 2、参数都设置好后开始订阅,正式开始healthCheck
		this.healthCheckDisposable = aliveInstancesReplay.subscribe();
	}
    
    protected Flux<List<ServiceInstance>> healthCheckFlux(List<ServiceInstance> instances) {
		Repeat<Object> healthCheckFluxRepeat = Repeat
            // spring.cloud.loadbalancer.clients.{serviceId}.healthCheck.repeatHealthCheck,是否重复healthCheck,默认true
            .onlyIf(repeatContext -> healthCheck.getRepeatHealthCheck())
            // spring.cloud.loadbalancer.clients.{serviceId}.healthCheck.interval,每interval进行一次healthCheck,默认25秒
            .fixedBackoff(healthCheck.getInterval());
		return Flux.defer(() -> {
            // 6、首次或者间隔spring.cloud.loadbalancer.clients.{serviceId}.healthCheck.interval开始判断isAlive
			List<Mono<ServiceInstance>> checks = new ArrayList<>(instances.size());
			for (ServiceInstance instance : instances) {
				Mono<ServiceInstance> alive = isAlive(instance).onErrorResume(error -> {
                    // 异常返回空值
					return Mono.empty();
				}).timeout(healthCheck.getInterval(), Mono.defer(() -> {
                    // 超时返回空值
					return Mono.empty();
				})).handle((isHealthy, sink) -> {
					if (isHealthy) {
						sink.next(instance);
					}
				});

				checks.add(alive);
			}
			List<ServiceInstance> result = new ArrayList<>();
			return Flux.merge(checks).map(alive -> {
				result.add(alive);
				return result;
			}).defaultIfEmpty(result);
		}).repeatWhen(healthCheckFluxRepeat);
	}
    
    // 7、开始判断isAlive
    protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
        // serviceInstance.getServiceId()的值分情况
        // 		EurekaDiscoveryClient: Product微服务的eureka.instance.appname
        // 		SimpleDiscoveryClient: spring.cloud.discovery.client.simple.instances.{serviceId}[index].service-id
        // containsService = spring.cloud.loadbalancer.clients.{serviceId}.healthCheck.path是否有serviceInstance.getServiceId()这个key
		boolean containsService = healthCheck.getPath().containsKey(serviceInstance.getServiceId());
        // spring.cloud.loadbalancer.clients.{serviceId}.healthCheck.path.{serviceInstance.getServiceId()}
		String healthCheckPropertyValue = healthCheck.getPath().get(serviceInstance.getServiceId());
        // 如果有key但是没定义值,则默认为true,就是alive
		if (containsService && !StringUtils.hasText(healthCheckPropertyValue)) {
			return Mono.just(true);
		}
        // 如果不存在key,或者key-value都有值则进行healthCheck
		String healthCheckPath = healthCheckPropertyValue != null ? healthCheckPropertyValue : defaultHealthCheckPath;
		return aliveFunction.apply(updatedServiceInstance(serviceInstance), healthCheckPath);
	}
    
    private ServiceInstance updatedServiceInstance(ServiceInstance serviceInstance) {
        // spring.cloud.loadbalancer.clients.{serviceId}.healthCheck.port
		Integer healthCheckPort = healthCheck.getPort();
        // 如果定义了healthCheckPort并且serviceInstance是来自配置文件,则把端口更新到serviceInstance
		if (serviceInstance instanceof DefaultServiceInstance && healthCheckPort != null) {
			return new DefaultServiceInstance(serviceInstance.getInstanceId(), serviceInstance.getServiceId(),
					serviceInstance.getHost(), healthCheckPort, serviceInstance.isSecure(),
					serviceInstance.getMetadata());
		}
		return serviceInstance;
	}
}

实践

为了能更深入理解HealthCheckServiceInstanceListSupplier的工作原理,我们举个栗子。

Product微服务

继续用前面的HelloWorld,再添加一个Controller做为healthCheck的API

@RestController
@RequestMapping("/healthCheck")
public class HealthCheckController {
    
    private static long lastTime = 0;

    @GetMapping
    public boolean healthCheck(HttpServletRequest request) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        long currentTime = System.currentTimeMillis();
        long interval = lastTime == 0 ? 0 : (currentTime - lastTime) / 1000;
        lastTime = currentTime;
        System.out.println("HealthCheck for " + request.getServerPort() + " at " + sdf.format(new Date(currentTime)) + ", interval: " + interval);
        return true;
    }
}

User微服务

yml配置

spring:
  cloud:
    loadbalancer:
      configurations: health-check # 这里一定要是health-check才能开启HealthCheckServiceInstanceListSupplier
      clients:
        product: 
          health-check:
            initial-delay: 0s
            interval: 25s  # health-check间隔时间
            refetch-instances: true # 开启定期更新数据源
            refetch-instances-interval: 60s # 向数据源更新serviceInstances间隔时间
            path:
              default: /healthCheck # 即Product返回healthCheck结果的Api
            repeat-health-check: true # 开启重复health-check,也就是在refetch-instances-interval内每隔interval进行一次health-check

pom.xml配置

因为loadBalance底层用到了webFlux响应式编程,所以要引入相应的依赖,不然请求发不出去,所以要添加下列依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <version>2.7.5</version>
</dependency>

重新启动Product和User微服务,浏览器输入http://localhost:2222/user,在Product的日志输出中可以看到每隔25秒进行一次health-check,每隔60秒重新拉一次数据源并重新开始health-check。

4.2.4 RequestBasedStickySessionServiceInstanceListSupplier

yml配置

spring:
  cloud:
    loadbalancer:
      configurations: request-based-sticky-session # 这里一定要是health-check才能开启HealthCheckServiceInstanceListSupplier
      clients:
        product: 
          sticky-session:
            instance-id-cookie-name: sc-lb-instance-id
            add-service-instance-cookie: false

Configuration配置

	public static class BlockingSupportConfiguration {
        @Bean
		@ConditionalOnBean(DiscoveryClient.class)
		@ConditionalOnMissingBean
        // RequestBasedStickySessionConfigurationCondition即spring.cloud.loadbalancer.configurations = request-based-sticky-session
		@Conditional(RequestBasedStickySessionConfigurationCondition.class)
		public ServiceInstanceListSupplier requestBasedStickySessionDiscoveryClientServiceInstanceListSupplier(
				ConfigurableApplicationContext context) {
			return ServiceInstanceListSupplier.builder()
                .withBlockingDiscoveryClient() // DiscoveryClientServiceInstanceListSupplier
                .withRequestBasedStickySession() // RequestBasedStickySessionServiceInstanceListSupplier
				.build(context);
		}
    }

创建

public final class ServiceInstanceListSupplierBuilder {
    public ServiceInstanceListSupplierBuilder withRequestBasedStickySession() {
		DelegateCreator creator = (context, delegate) -> {
			LoadBalancerClientFactory loadBalancerClientFactory = context.getBean(LoadBalancerClientFactory.class);
			return new RequestBasedStickySessionServiceInstanceListSupplier(delegate, loadBalancerClientFactory);
		};
        // 将新创建好的RequestBasedStickySessionServiceInstanceListSupplier添加到creators
		this.creators.add(creator);
		return this;
	}
}

使用

public class RequestBasedStickySessionServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier {
    public RequestBasedStickySessionServiceInstanceListSupplier(ServiceInstanceListSupplier delegate,
			ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory) {
		super(delegate);
        // spring.cloud.loadbalancer.clients.{serviceId}.*
        // 如果spring.cloud.loadbalancer.clients中没有serviceId对应的配置就用默认的即spring.cloud.loadbalancer.*
		this.properties = loadBalancerClientFactory.getProperties(getServiceId());
	}
    @Override
	public Flux<List<ServiceInstance>> get() {
		return delegate.get();
	}

	@SuppressWarnings("rawtypes")
	@Override
	public Flux<List<ServiceInstance>> get(Request request) {
        // spring.cloud.loadbalancer.clients.{serviceId}.stickySession.instanceIdCookieName
		String instanceIdCookieName = properties.getStickySession().getInstanceIdCookieName();
        // request为LoadBalancerRequestAdapter,context默认下为RequestDataContext,详情请见前文
		Object context = request.getContext();
		if ((context instanceof RequestDataContext)) {
			MultiValueMap<String, String> cookies = ((RequestDataContext) context).getClientRequest().getCookies();
			String cookie = cookies.getFirst(instanceIdCookieName);
			if (cookie != null) {
                // 如果request含有key为instanceIdCookieName值的cookie,则挑选instanceId为其值的serviceInstance
				return delegate.get(request).map(serviceInstances -> selectInstance(serviceInstances, cookie));
			}
			return delegate.get(request);
		}
		return delegate.get(request);
	}

    // 通过cookie过滤serviceInstances
	private List<ServiceInstance> selectInstance(List<ServiceInstance> serviceInstances, String cookie) {
		for (ServiceInstance serviceInstance : serviceInstances) {
			if (cookie.equals(serviceInstance.getInstanceId())) {
				return Collections.singletonList(serviceInstance);
			}
		}
		return serviceInstances;
	}
}

实践

yml配置

spring:
  cloud:
    loadbalancer:
      configurations: request-based-sticky-session # 这里一定要是health-check才能开启HealthCheckServiceInstanceListSupplier
      clients:
        myStickClient: 
          sticky-session:
            instance-id-cookie-name: sc-lb-instance-id
            add-service-instance-cookie: false

User微服务

对HelloWorld中的UseController做下修改

@RestController
@RequestMapping("/user")
public class UserController {
    @Autowired
    RestTemplate restTemplate;

    @GetMapping
    public void getProduct() {
        HttpHeaders headers = new HttpHeaders();
        List<String> cookies = new ArrayList<>();
        // 添加cookie
        cookies.add("sc-lb-instance-id=localhost:product:5002");
        headers.put(HttpHeaders.COOKIE, cookies);
        HttpEntity<String> htpEntity = new HttpEntity<String>(headers);
        ResponseEntity<String> result = restTemplate.exchange("http://product/product", HttpMethod.GET, htpEntity, String.class);
//        String result = restTemplate.getForObject("http://product/product", String.class);
        System.out.println(result.getBody());
    }
}

重新启动User微服务,浏览器输入http://localhost:2222/user,按下回车就可以在User微服务的日志中看到5002,因为我们选择了sc-lb-instance-id=localhost:product:5002,多试几次依旧是5002。

4.2.5 SameInstancePreferenceServiceInstanceListSupplier

yml配置

spring:
  cloud:
    loadbalancer:
      configurations: same-instance-preference # 这里一定要是same-instance-preference才能开启HealthCheckServiceInstanceListSupplier

Configuration配置

	public static class BlockingSupportConfiguration {
        @Bean
		@ConditionalOnBean(DiscoveryClient.class)
		@ConditionalOnMissingBean
        // SameInstancePreferenceConfigurationCondition 即spring.cloud.loadbalancer.configurations = same-instance-preference
		@Conditional(SameInstancePreferenceConfigurationCondition.class)
		public ServiceInstanceListSupplier sameInstancePreferenceServiceInstanceListSupplier(
				ConfigurableApplicationContext context) {
			return ServiceInstanceListSupplier.builder()
                .withBlockingDiscoveryClient() // DiscoveryClientServiceInstanceListSupplier
                .withSameInstancePreference() // SameInstancePreferenceServiceInstanceListSupplier
					.build(context);
		}
    }

创建

public final class ServiceInstanceListSupplierBuilder {
    public ServiceInstanceListSupplierBuilder withSameInstancePreference() {
		DelegateCreator creator = (context,
				delegate) -> new SameInstancePreferenceServiceInstanceListSupplier(delegate);
        // 将SameInstancePreferenceServiceInstanceListSupplier添加到creators
		this.creators.add(creator);
		return this;
	}
}

使用

public class SameInstancePreferenceServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier
		implements SelectedInstanceCallback {
    private ServiceInstance previouslyReturnedInstance;
    @Override
	public Flux<List<ServiceInstance>> get() {
		return delegate.get() // 通过DiscoveryClientServiceInstanceListSupplier获得数据源
            .map(this::filteredBySameInstancePreference); // 过滤
	}

    // 1、首先过滤第一遍返回List<ServiceInstance>给RoundRobinLoadBalancer挑选
	private List<ServiceInstance> filteredBySameInstancePreference(List<ServiceInstance> serviceInstances) {
        // 如果previouslyReturnedInstance存在则直接返回previouslyReturnedInstance
		if (previouslyReturnedInstance != null && serviceInstances.contains(previouslyReturnedInstance)) {
			return Collections.singletonList(previouslyReturnedInstance);
		}
		previouslyReturnedInstance = null;
        // 如果不存在返回原数据,通过RoundRobinLoadBalancer选择后再更新previouslyReturnedInstance的值,见后文
		return serviceInstances;
	}

    // 2、RoundRobinLoadBalancer挑选第一遍后会再调用selectedServiceInstance更新previouslyReturnedInstance,详情请见RoundRobinLoadBalancer.processInstanceResponse方法
	@Override
	public void selectedServiceInstance(ServiceInstance serviceInstance) {
        // 如果previouslyReturnedInstance为null或者和前值不同,则更新previouslyReturnedInstance值
		if (previouslyReturnedInstance == null || !previouslyReturnedInstance.equals(serviceInstance)) {
			previouslyReturnedInstance = serviceInstance;
		}
	}
}

4.2.6 RetryAwareServiceInstanceListSupplier

要使RetryAwareServiceInstanceListSupplier必须满足下面3个条件

yml配置

spring:
  cloud:
    loadbalancer:
      retry:
        enabled: true # 1、默认true, 默认开启RetryAwareServiceInstanceListSupplier
        avoid-previous-instance: true # 2、默认true,必须true才能开启
        retry-on-all-operations: false
        max-retries-on-same-service-instance: 0
        max-retries-on-next-service-instance: 1
        retryable-status-codes:
        - 1
        backoff:
          enabled: false
          min-backoff: 5
          max-backoff: 10 #Long.MAX_VALUE
          jitter: 1

pom.xml配置

<!-- 3、加入retry依赖 -->
<dependency>
    <groupId>org.springframework.retry</groupId>
    <artifactId>spring-retry</artifactId>
    <version>1.3.4</version>
</dependency>

重新启动后发起请求并在RoundRobinLoadBalancer打断点即可看到其被使用

接下来分析为什么会需要这三个条件

Configuration配置

	@Configuration(proxyBeanMethods = false)
	@ConditionalOnBlockingDiscoveryEnabled
	// 1、RetryTemplate必须存在,也即要引入spring-retry依赖
	@ConditionalOnClass(RetryTemplate.class)
    // 2、spring.cloud.loadbalancer.retry.enabled = true
	// 3、spring.cloud.loadbalancer.retry.avoid-previous-instance = true
	@Conditional(BlockingOnAvoidPreviousInstanceAndRetryEnabledCondition.class)
	@AutoConfigureAfter(BlockingSupportConfiguration.class)
	// 请注意,这里必须存在ServiceInstanceListSupplier的实例,也就是前面我们分析的Supplier必须要存在,默认为Cache的Supplier
	@ConditionalOnBean(ServiceInstanceListSupplier.class)
	public static class BlockingRetryConfiguration {

		@Bean
		@ConditionalOnBean(DiscoveryClient.class)
		@Primary
		public ServiceInstanceListSupplier retryAwareDiscoveryClientServiceInstanceListSupplier(
				ServiceInstanceListSupplier delegate) {
            // delegate默认为CachingServiceInstanceListSupplier,参考前面的debug图
			return new RetryAwareServiceInstanceListSupplier(delegate);
		}

	}

因为spring.cloud.loadbalancer.retry.enabled = true,所以不会使用默认的拦截器LoadBalancerInterceptor,而是RetryLoadBalancerInterceptor,先看下它的Configuration

	@Configuration(proxyBeanMethods = false)
	// RetryTemplate必须存在
	@ConditionalOnClass(RetryTemplate.class)
	// 默认为LoadBalancerClientFactory
	@ConditionalOnBean(ReactiveLoadBalancer.Factory.class)
	// 默认true
	@ConditionalOnProperty(value = "spring.cloud.loadbalancer.retry.enabled", matchIfMissing = true)
	public static class RetryInterceptorAutoConfiguration {

		@Bean
		@ConditionalOnMissingBean
		public RetryLoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient,
				LoadBalancerRequestFactory requestFactory, LoadBalancedRetryFactory loadBalancedRetryFactory,
				ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory) {
			return new RetryLoadBalancerInterceptor(loadBalancerClient, requestFactory, loadBalancedRetryFactory,
					loadBalancerFactory);
		}

		@Bean
		@ConditionalOnMissingBean
		public RestTemplateCustomizer restTemplateCustomizer(
				final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
			return restTemplate -> {
                // 将RetryLoadBalancerInterceptor添加到restTemplate中进行拦截
				List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
				list.add(loadBalancerInterceptor);
				restTemplate.setInterceptors(list);
			};
		}

	}

RetryLoadBalancerInterceptor和之前默认的LoadBalancerInterceptor拦截过程不一样,因此着重分析下,下面的源码为了简便分析经过了处理

public class RetryLoadBalancerInterceptor implements ClientHttpRequestInterceptor {
    @Override
	public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
			final ClientHttpRequestExecution execution) throws IOException {
		final URI originalUri = request.getURI();
        // 从URL中获取hostname作为serviceId
		final String serviceName = originalUri.getHost();
        // 1、创建BlockingLoadBalancedRetryPolicy
		final LoadBalancedRetryPolicy retryPolicy = lbRetryFactory.createRetryPolicy(serviceName, loadBalancer);
        // 2、创建RetryTemplate
		RetryTemplate template = createRetryTemplate(serviceName, request, retryPolicy);
        // 3、template.execute
		return template.execute(context -> {
            // 3.1、此处即retryCallback.doWithRetry(context)的内容
            ......
			}, new LoadBalancedRecoveryCallback<ClientHttpResponse, ClientHttpResponse>() {
			// 3.2、此处即recoveryCallback.recover(context)的内容
            ......
		});
	}
}

在通过流程图看下RetryLoadBalancerInterceptor的intercept过程

在这里插入图片描述

接下来我们按着步骤分析RetryLoadBalancerInterceptor.intercept的过程

LoadBalancedRetryPolicy

先看下**lbRetryFactory.createRetryPolicy(serviceName, loadBalancer)**的过程

public class BlockingLoadBalancedRetryFactory implements LoadBalancedRetryFactory {
	@Override
	public LoadBalancedRetryPolicy createRetryPolicy(String serviceId, ServiceInstanceChooser serviceInstanceChooser) {
        // 根据serviceId的配置创建BlockingLoadBalancedRetryPolicy
		return new BlockingLoadBalancedRetryPolicy(loadBalancerFactory.getProperties(serviceId));
	}
}

再看下BlockingLoadBalancedRetryPolicy的类图,分析下它的功能

在这里插入图片描述

从它的方法来看主要是提供retry的判断,看下其详细代码

public class BlockingLoadBalancedRetryPolicy implements LoadBalancedRetryPolicy {
    // 满足以下任一条件可以retry:
    // 	1、request的method为GET
    // 	2、spring.cloud.loadbalancer.clients.{serviceId}.retry.retryOnAllOperations = true
	public boolean canRetry(LoadBalancedRetryContext context) {
		HttpMethod method = context.getRequest().getMethod();
		return HttpMethod.GET.equals(method) || properties.getRetry().isRetryOnAllOperations();
	}

    // spring.cloud.loadbalancer.clients.{serviceId}.retry.maxRetriesOnSameServiceInstance,默认0
	@Override
	public boolean canRetrySameServer(LoadBalancedRetryContext context) {
		return sameServerCount < properties.getRetry().getMaxRetriesOnSameServiceInstance() && canRetry(context);
	}

    // spring.cloud.loadbalancer.clients.{serviceId}.retry.maxRetriesOnNextServiceInstance,默认1
	@Override
	public boolean canRetryNextServer(LoadBalancedRetryContext context) {
		return nextServerCount <= properties.getRetry().getMaxRetriesOnNextServiceInstance() && canRetry(context);
	}

	@Override
	public void registerThrowable(LoadBalancedRetryContext context, Throwable throwable) {
		if (!canRetrySameServer(context) && canRetry(context)) {
			// SameServer重试次数耗尽,使用NextServer的重试次数
			sameServerCount = 0;
			nextServerCount++;
			if (!canRetryNextServer(context)) {
                // NextServer的重试次数也耗尽,设置不能重试
				context.setExhaustedOnly();
			}
			else {
				context.setServiceInstance(null);
			}
		}
		else {
            // 首先使用SameServer的重试次数
			sameServerCount++;
		}
	}

    // spring.cloud.loadbalancer.clients.{serviceId}.retry.retryableStatusCodes,默认控制。
    // 这里的statusCode即400、404这种值
	@Override
	public boolean retryableStatusCode(int statusCode) {
		return properties.getRetry().getRetryableStatusCodes().contains(statusCode);
	}

}
RetryTemplate

首先看下它的类图

在这里插入图片描述

再看下其创建的过程

private RetryTemplate createRetryTemplate(String serviceName, HttpRequest request,
			LoadBalancedRetryPolicy retryPolicy) {
		RetryTemplate template = new RetryTemplate();
    // 默认NoBackOffPolicy
		BackOffPolicy backOffPolicy = lbRetryFactory.createBackOffPolicy(serviceName);
		template.setBackOffPolicy(backOffPolicy == null ? new NoBackOffPolicy() : backOffPolicy);
		template.setThrowLastExceptionOnExhausted(true);
    // 默认RetryListener[0]
		RetryListener[] retryListeners = lbRetryFactory.createRetryListeners(serviceName);
		if (retryListeners != null && retryListeners.length != 0) {
			template.setListeners(retryListeners);
		}
    // 默认InterceptorRetryPolicy
		template.setRetryPolicy(
            // spring.cloud.loadbalancer.clients.{serviceId}.retry.enabled
				!loadBalancerFactory.getProperties(serviceName).getRetry().isEnabled() || retryPolicy == null
						? new NeverRetryPolicy()
						: new InterceptorRetryPolicy(request, retryPolicy, loadBalancer, serviceName));
		return template;
	}
RetryTemplate.execute()

整体捋一遍源码中是怎么运行途中的过程的,这里为了方便只留下主体部分的代码

protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
			RecoveryCallback<T> recoveryCallback, RetryState state) throws E, ExhaustedRetryException {

		RetryPolicy retryPolicy = this.retryPolicy; // InterceptorRetryPolicy
		BackOffPolicy backOffPolicy = this.backOffPolicy; // NoBackOffPolicy
    	// 1、创建LoadBalancedRetryContext
		RetryContext context = open(retryPolicy, state);
		try {
            // 2、判断是否重试,如果是首次请求或者BlockingLoadBalancedRetryPolicy.canRetryNextServer
			while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
				try {
					lastException = null;
                    // 3、获取serviceInstance逻辑
					return retryCallback.doWithRetry(context);
				}
				catch (Throwable e) {
                    // 4、出现异常,更新BlockingLoadBalancedRetryPolicy.registerThrowable方法
						registerThrowable(retryPolicy, state, context, e);
                    // 5、如果还能重试,执行backOffPolicy,本过程用的是NoBackOffPolicy,不做任何事
					if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
							backOffPolicy.backOff(backOffContext);
					}

				}
			}
            // 6、重试次数耗尽也没能获得成功,执行recoveryCallback返回结果
			return handleRetryExhausted(recoveryCallback, context, state);
		}
		catch (Throwable e) {
			throw RetryTemplate.<E>wrapIfNecessary(e);
		}
		finally {
			close(retryPolicy, context, state, lastException == null || exhausted);
			doCloseInterceptors(retryCallback, context, lastException);
			RetrySynchronizationManager.clear();
		}

	}

这里要着重分析的是第3步的retryCallback.doWithRetry(context),按照惯例删除掉不影响主体的代码

public class RetryLoadBalancerInterceptor implements ClientHttpRequestInterceptor {
    @Override
	public ClientHttpResponse intercept(...) throws IOException {
		......
		return template.execute(context -> {
			ServiceInstance serviceInstance = null;
            // 1、先从LoadBalancedRetryContext中获取serviceInstance
			if (context instanceof LoadBalancedRetryContext) {
				LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
				serviceInstance = lbContext.getServiceInstance();
			}
			String hint = getHint(serviceName);
			if (serviceInstance == null) {
                // 2、首次获取serviceInstance,走类似LoadBalancerInterceptor的过程获取serviceInstance
				ServiceInstance previousServiceInstance = null;
				if (context instanceof LoadBalancedRetryContext) {
					LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
					previousServiceInstance = lbContext.getPreviousServiceInstance();
				}
                // 3、将httprequest包装成DefaultRequest
				DefaultRequest<RetryableRequestContext> lbRequest = new DefaultRequest<>(
						new RetryableRequestContext(previousServiceInstance, new RequestData(request), hint));
                // 4、通过BlockingLoadBalancerClient.choose(...)获取serviceInstance,过程不再重复,
                // 只需要知道中间需要用到Supplier,而本过程中用到了RetryAwareServiceInstanceListSupplier
				serviceInstance = loadBalancer.choose(serviceName, lbRequest);
                // 5、将serviceInstance保存到LoadBalancedRetryContext中,更新previousServiceInstance
				if (context instanceof LoadBalancedRetryContext) {
					LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
					lbContext.setServiceInstance(serviceInstance);
				}
				Response<ServiceInstance> lbResponse = new DefaultResponse(serviceInstance);
			}
            // 6、包装成LoadBalancerRequestAdapter
			LoadBalancerRequestAdapter<ClientHttpResponse, RetryableRequestContext> lbRequest = new LoadBalancerRequestAdapter<>(
					requestFactory.createRequest(request, body, execution),
					new RetryableRequestContext(null, new RequestData(request), hint));
			ServiceInstance finalServiceInstance = serviceInstance;
            // 7、获取结果
			ClientHttpResponse response = RetryLoadBalancerInterceptor.this.loadBalancer.execute(serviceName,
					finalServiceInstance, lbRequest);
			int statusCode = response.getRawStatusCode();
            // 8、如果是因为http请求返回异常结果,且statusCode在定义的retryable-status-codes中,则抛出异常进行重试
            // 这也就是为什么会有第1步,第5步保存serviceInstance后再次retryCallback.doWithRetry(context)时不用走第2/3/4/5步
			if (retryPolicy != null && retryPolicy.retryableStatusCode(statusCode)) {
				byte[] bodyCopy = StreamUtils.copyToByteArray(response.getBody());
				response.close();
				throw new ClientHttpResponseStatusCodeException(serviceName, response, bodyCopy);
			}
			return response;
		}, new LoadBalancedRecoveryCallback<ClientHttpResponse, ClientHttpResponse>() {
			@Override
			protected ClientHttpResponse createResponse(ClientHttpResponse response, URI uri) {
				return response;
			}
		});
	}
}

创建

上一步介绍完RetryLoadBalancerInterceptor后就到了与之配对的RetryAwareServiceInstanceListSupplier

public class RetryAwareServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier {
    @Override
	public Flux<List<ServiceInstance>> get(Request request) {
        // 如果context不是RetryableRequestContext则直接走后续的CachingServiceInstanceListSupplier
        // retryCallback.doWithRetry(context)中第3步已包装成RetryableRequestContext
		if (!(request.getContext() instanceof RetryableRequestContext)) {
			return delegate.get(request);
		}
		RetryableRequestContext context = (RetryableRequestContext) request.getContext();
		ServiceInstance previousServiceInstance = context.getPreviousServiceInstance();
        // previousServiceInstance为空则重新拉取List<ServiceInstance>
		if (previousServiceInstance == null) {
			return delegate.get(request);
		}
        // 剔除previousServiceInstance,即重试的话不要使用上一次失败的ServiceInstance
		return delegate.get(request).map(instances -> filteredByPreviousInstance(instances, previousServiceInstance));
	}

	private List<ServiceInstance> filteredByPreviousInstance(List<ServiceInstance> instances,
			ServiceInstance previousServiceInstance) {·
		List<ServiceInstance> filteredInstances = new ArrayList<>(instances);
        // 把previousServiceInstance从instances中移除
		if (previousServiceInstance != null) {
			filteredInstances.remove(previousServiceInstance);
		}
		if (filteredInstances.size() > 0) {
			return filteredInstances;
		}
		return instances;
	}

	@Override
	public Flux<List<ServiceInstance>> get() {
		return delegate.get();
	}
}

关于服务调用的LoadBalance就介绍到此,从HelloWorld中每次请求其他服务的时候都需要用restTemplate来调用,这在开发中很不灵活方便,SpringCloud提供了声明式调用–OpenFeign。接下来开始分析OpenFeign

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/90661.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux文本三剑客之grep命令

Linux文本三剑客之grep命令 1. grep 命令 介绍 grep 命令的基本语法格式和参数列表&#xff01; 文本搜索工具&#xff0c;根据用户指定的”模式”对目标文本逐行进行匹配检查&#xff0c;打印匹配到的行。 模式&#xff1a;由正则表达式字符及文本字符所编写的过滤条件&am…

MySQL 中截取字符串的方法

LEFT(str, len) 从左边开始截取&#xff0c;如果字符串为 null 则返回null。 str&#xff1a;被截取字符串&#xff1b;len&#xff1a;截取长度 SELECT LEFT(ABCDEFT, 2) FROM sub_str;RIGHT(str, len) 从右边开始截取&#xff0c;如果字符串为 null 则返回null。 str&…

0125 搜索与回溯算法 Day14

剑指 Offer 12. 矩阵中的路径 给定一个 m x n 二维字符网格 board 和一个字符串单词 word 。如果 word 存在于网格中&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 单词必须按照字母顺序&#xff0c;通过相邻的单元格内的字母构成&#xff0c;其中“相邻…

GLAD:带有反射壁的空心波导

概述 离散傅里叶变换的混叠效应为带有反射壁的空心波导的建模提供了一个便捷的方法。反射壁可以将光返回到光路中而混叠效应将使溢出光场从反方向折回到采样光场中。如果光场分布是一个偶函数&#xff0c;那么折回的作用就如同反射效果。我们可以将任意形状的光场分布转化成…

推荐一款免费的AI绘图软件,可生成二次元画作和3D模型

随着AI绘画的火热&#xff0c;市面上关于AI绘画的话题居高不小&#xff0c;各种教程、软件、小程序也是满天飞&#xff0c;在这些眼花缭乱的推荐中&#xff0c;究竟哪一款ai绘图软件才是真正适合自己的&#xff0c;不但免费&#xff0c;生成出来的二次元画作还很精美&#xff1…

早教资源网站

开发工具(eclipse/idea/vscode等)&#xff1a; 数据库(sqlite/mysql/sqlserver等)&#xff1a; 功能模块(请用文字描述&#xff0c;至少200字)&#xff1a; 网站前台&#xff1a;关于我们、联系我们、公告信息、二手物品、资源信息 管理员功能&#xff1a; 1、管理关于我们、联…

Django 第三天学习笔记

1.模板层-变量和标签 能够传递到Django模板中的数据类型&#xff1a; 1.str 字符串 2.Int 整形 3.List 数组 4.Tuple 元组 5.Dict 字典 6.Func 方法 7.Obj 类的实例化对象。 在模板中使用的变量的语法&#xff1a; {{变量名}}{{变量名.index}} #索引{{变量名.key}} #获取字典对…

数据结构顺序栈

栈 这是大话数据结构种对于栈的描述 可以看到 栈是一种特殊的线性表 它只能在尾部进行元素的插入和删除 但是在栈种 这叫做 入栈 和 出栈 而且它遵循 先进入的元素后出 后进入的元素先出 即就是我们常听说的 先进后出 和后进先出 这里就有一个简单的例子 先进后出 后进先出…

【Node.js】实现微信小程序在线支付功能

实战项目名称&#xff1a;微信小程序实现在线支付功能 - 文章结尾附上微信小程序码&#xff0c;扫码登录后即可体验&#xff01;&#xff01; 文章目录一、实战步骤1. 前期准备2. 添加wechatpay-node-v3和fs插件3. 预设微信下单的数据4. 将上一步骤的下单信息返回给前端5. 小程…

在抖音全程看世界杯,超高清直播背后的硬实力

导语&#xff1a;IT技术赛场开赛&#xff01;作者 | 宋慧 出品 | CSDN&#xff08;ID&#xff1a;CSDNnews&#xff09;当前&#xff0c;2022 卡塔尔世界杯比赛正在如火如荼进行中&#xff0c;处在更加激烈关键的半决赛阶段。作为足球运动的全球顶级赛事&#xff0c;世界杯…

SysML图例-核聚变

DDD领域驱动设计批评文集>> 《软件方法》强化自测题集>> 《软件方法》各章合集>> [新闻]核聚变里程碑式突破>> SysML图中词汇&#xff1a; Tokamak&#xff1a; 一种利用磁约束来实现受控核聚变的环形容器&#xff0c;通过约束电磁波驱动&#xff…

如何形成前端知识体系

来啦各位大佬&#xff5e;但很不好意思&#xff0c;我就是标题党&#xff0c;这篇博文并没有很明确的给出「如何形成前端知识体系」答案&#xff0c;我自学前端&#xff0c;在面试字节的时候&#xff0c;字节的大佬说我的知识点没有成体系&#xff0c;很零散的飘在各个地方&…

面试官:你如何实现大文件上传

提到大文件上传&#xff0c;在脑海里最先想到的应该就是将图片保存在自己的服务器&#xff08;如七牛云服务器&#xff09;&#xff0c;保存在数据库&#xff0c;不仅可以当做地址使用&#xff0c;还可以当做资源使用&#xff1b;或者将图片转换成base64&#xff0c;转换成buff…

怎么复制网页上不能复制的文字(付费文档免费复制),一招搞定

好多小伙伴上网查资料的时候&#xff0c;想要复制网页内容&#xff0c;但是提示付费复制或者不允许复制&#xff0c;遇到这种情况怎么办呢&#xff1f;下面就是小编分享的一招搞定无法复制网页内容文字的方法。 怎么复制网页上不能复制的文字 借助360安全浏览器/360极速浏览器…

Minecraft 1.19.2 Forge模组开发 06.建筑生成

1.12.2自定义建筑生成 1.16.5自定义建筑生成 1.18.2自定义建筑生成 我们本次尝试在主世界生成一个自定义的建筑。 效果展示效果展示效果展示 由于版本更新缘故&#xff0c;1.19的建筑生成将不涉及任何Java包的代码编写&#xff0c;只需要在数据包中对建筑生成进行自定义。 …

基于粒子群优化算法的BP神经网络预测模型(Matlab代码实现)

目录 1 概述 2 粒子群优化算法 3 BP神经网络 4 PSO优化 BP网络算法 5 运行结果 6 参考文献 7 Matlab代码实现 1 概述 在工程应用中经常会遇到一些复杂的非线性系统,这些系统的状态方程复杂,难以准确的用数学方法建模,而BP神经网络实质上实现了一个从输入到输出的映射功…

【k8s】Kubernetes 基础组件详解

一、k8s简介 Kubernetes 是容器集群管理系统工具&#xff0c;是一个开源平台&#xff0c;可实现容器集群的自动化部署、自动扩缩容、维护等功能。Kubernetesk8s是Kubernetes的缩写&#xff0c;Google 于 2014 年开源了 Kubernetes 项目&#xff0c;Kubernetes的名字来自希腊语&…

无延时直播/超低延时直播实际测试延时效果(项目实测组图)

阿酷TONY / 2022-11-30 / 长沙 / 超多组图 无延时直播/超低延时直播&#xff0c;主要只测试延时情况&#xff0c;没有涉及直播产品的功能、使用操作界面&#xff0c;有兴趣的朋友可以加联系我实际测试哦~~~ 1.无延时直播应用场景 无延时直播/超低延时常见应用场景&#…

近90天互动量破百万,「围炉煮茶」究竟做对了什么?

今年秋冬&#xff0c;“围炉煮茶”爆红网络。小红书相关笔记数量突破8万&#xff0c;累计话题浏览量1200万次&#xff0c;近90天互动量破百万&#xff01; 茶&#xff0c;从老一辈的茶杯茶盘里&#xff0c;通过创新再造&#xff0c;成为年轻人的社交“新”头好。高流量曝光、高…

Go语言性能剖析利器--pprof实战

作者&#xff1a;耿宗杰 前言 关于pprof的文章在网上已是汗牛充栋&#xff0c;却是千篇一律的命令介绍&#xff0c;鲜有真正实操的&#xff0c;本文将参考Go社区资料&#xff0c;结合自己的经验&#xff0c;实战Go程序的性能分析与优化过程。 优化思路 首先说一下性能优化的…