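Table of Contents
- I. Background
- II. Overall Flow
- III. Source Code Walkthrough
- 1. LB interceptor configuration
- 2. LB interceptor implementation
- 3. LB pre-execution processing
- 4. Load balancing
- 5. LB executes the HTTP request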
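I. Background
Starting with the 2020.0 release train, Spring Cloud removed most of its Netflix dependencies, Ribbon among them. Spring Cloud LoadBalancer is the official replacement for Ribbon and is now the only load-balancer implementation that Spring Cloud ships. It is a client-side load balancer that runs inside the calling process; the rest of this article abbreviates LoadBalancer as LB. To use it, add the starter dependency: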
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>
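II. Overall Flow
A LoadBalancerInterceptor is added to the RestTemplate (one annotated with @LoadBalanced). The interceptor applies the load-balancing strategy to each outgoing request, which is how client-side load balancing is achieved.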
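III. Source Code Walkthrough
1. LB interceptor configuration
Auto-configuration registered in spring.factories conditionally loads LoadBalancerAutoConfiguration.LoadBalancerInterceptorConfig, which sets up the interceptor.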
@Configuration(proxyBeanMethods = false)
@Conditional(RetryMissingOrDisabledCondition.class)
static class LoadBalancerInterceptorConfig {
@Bean
public LoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
// Create the default LB interceptor
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
// Register the LB interceptor on the RestTemplate
restTemplate.setInterceptors(list);
};
}
}
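2. LB interceptor implementation
The LB interceptor implements the intercept method: it gathers the parameters the load-balancing logic needs and calls the execute method of LoadBalancerClient.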
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
// The URI host is used as the service name
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
// Wrap HttpRequest, body and execution into a LoadBalancerRequest; see LoadBalancerRequestFactory below
return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}
public class LoadBalancerRequestFactory {
private final LoadBalancerClient loadBalancer;
private final List<LoadBalancerRequestTransformer> transformers;
public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer,
List<LoadBalancerRequestTransformer> transformers) {
this.loadBalancer = loadBalancer;
this.transformers = transformers;
}
public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer) {
this.loadBalancer = loadBalancer;
transformers = new ArrayList<>();
}
public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) {
// Pass the LoadBalancerClient, the request transformers, and the HttpRequest, body and execution to BlockingLoadBalancerRequest
return new BlockingLoadBalancerRequest(loadBalancer, transformers,
new BlockingLoadBalancerRequest.ClientHttpRequestData(request, body, execution));
}
}
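The interceptor takes the service name from the host part of the request URI and asserts that it is not null. The following is a minimal sketch of that step using only java.net.URI; the class name ServiceNameFromUri and the sample URLs are made up for illustration. It also shows one common way to trip the assertion: a service name containing an underscore is not a valid hostname, so getHost() returns null.

```java
import java.net.URI;

final class ServiceNameFromUri {

    // Returns the host part of the URI, which the interceptor treats as the service name.
    static String serviceName(String url) {
        return URI.create(url).getHost();
    }

    public static void main(String[] args) {
        // A hyphenated name is a valid hostname, so it is returned as-is.
        System.out.println(serviceName("http://order-service/orders/1"));
        // An underscore makes the authority an invalid hostname; getHost() yields null.
        System.out.println(serviceName("http://order_service/orders/1"));
    }
}
```

In the real interceptor the second case would fail the Assert.state check with "Request URI does not contain a valid hostname".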
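3. LB pre-execution processing
Before the request is executed, the LB resolves the hint (canary) configuration, notifies the lifecycle processors used for monitoring, and selects a target instance according to the configured load-balancing strategy.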
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
// Resolve the hint (canary) configuration for this service; the default hint header name is X-SC-LB-Hint
String hint = getHint(serviceId);
// Put the hint into the LB request context
LoadBalancerRequestAdapter<T, TimedRequestContext> lbRequest = new LoadBalancerRequestAdapter<>(request,
buildRequestContext(request, hint));
// Get the LB lifecycle processors that support this service
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId);
// Invoke onStart
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
// Choose a service instance based on the service name and the request
ServiceInstance serviceInstance = choose(serviceId, lbRequest);
if (serviceInstance == null) {
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, new EmptyResponse())));
throw new IllegalStateException("No instances available for " + serviceId);
}
// Perform the actual HTTP request
return execute(serviceId, serviceInstance, lbRequest);
}
// Select an instance using the load-balancing strategy
@Override
public <T> ServiceInstance choose(String serviceId, Request<T> request) {
ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
if (loadBalancer == null) {
return null;
}
Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
if (loadBalancerResponse == null) {
return null;
}
return loadBalancerResponse.getServer();
}
// Resolve the hint (canary) configuration
private String getHint(String serviceId) {
LoadBalancerProperties properties = loadBalancerClientFactory.getProperties(serviceId);
String defaultHint = properties.getHint().getOrDefault("default", "default");
String hintPropertyValue = properties.getHint().get(serviceId);
return hintPropertyValue != null ? hintPropertyValue : defaultHint;
}
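The lookup order in getHint is easy to misread, so here is a small sketch of the same logic over a plain Map. The class name HintResolution, the service ids and the hint values are hypothetical; only the three-step fallback is taken from the source above.

```java
import java.util.Map;

final class HintResolution {

    // A per-service entry wins; otherwise the "default" entry; otherwise the literal "default".
    static String resolveHint(Map<String, String> hints, String serviceId) {
        String defaultHint = hints.getOrDefault("default", "default");
        String serviceHint = hints.get(serviceId);
        return serviceHint != null ? serviceHint : defaultHint;
    }

    public static void main(String[] args) {
        Map<String, String> hints = Map.of("default", "stable", "order-service", "canary");
        System.out.println(resolveHint(hints, "order-service")); // per-service entry
        System.out.println(resolveHint(hints, "user-service"));  // configured default entry
        System.out.println(resolveHint(Map.of(), "user-service")); // nothing configured
    }
}
```

Note that with nothing configured the hint is the string "default" rather than null or empty.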
4. Load balancing
There are currently two main load-balancing implementations, RandomLoadBalancer and RoundRobinLoadBalancer. The analysis here uses RoundRobinLoadBalancer as the example.
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
.getIfAvailable(NoopServiceInstanceListSupplier::new);
return supplier.get(request).next()
.map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
}
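// The round-robin pick itself happens in getInstanceResponse (not shown); this method also notifies the supplier of the selected instance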
private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
List<ServiceInstance> serviceInstances) {
Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
}
return serviceInstanceResponse;
}
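The source above delegates the actual pick to getInstanceResponse, which is not listed. As an illustration only, a round-robin selection can be sketched with an atomic counter as below. The class RoundRobinSketch, the use of plain strings for instances and the starting position of -1 are assumptions of this sketch, not the library's code.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch {

    // Shared counter; starting at -1 makes the first pick index 0 in this sketch.
    private final AtomicInteger position = new AtomicInteger(-1);

    // Picks the next instance in order, wrapping around at the end of the list.
    String next(List<String> instances) {
        if (instances.isEmpty()) {
            return null; // nothing to choose from
        }
        // The mask clears the sign bit so the index stays non-negative after overflow.
        int pos = position.incrementAndGet() & Integer.MAX_VALUE;
        return instances.get(pos % instances.size());
    }

    public static void main(String[] args) {
        RoundRobinSketch balancer = new RoundRobinSketch();
        List<String> instances = List.of("10.0.0.1:8080", "10.0.0.2:8080", "10.0.0.3:8080");
        for (int i = 0; i < 4; i++) {
            System.out.println(balancer.next(instances));
        }
    }
}
```

Because the counter is an AtomicInteger, concurrent callers each get a distinct position without locking.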
In addition, instance selection depends on ServiceInstanceListSupplier#get. Hint-based (canary) routing is implemented by HintBasedServiceInstanceListSupplier, which overrides get to filter the instance list; its source is analysed next.
@Override
public Flux<List<ServiceInstance>> get(Request request) {
return delegate.get(request).map(instances -> filteredByHint(instances, getHint(request.getContext())));
}
private List<ServiceInstance> filteredByHint(List<ServiceInstance> instances, String hint) {
if (!StringUtils.hasText(hint)) {
return instances;
}
List<ServiceInstance> filteredInstances = new ArrayList<>();
for (ServiceInstance serviceInstance : instances) {
// Keep only instances whose "hint" metadata matches the requested hint
if (serviceInstance.getMetadata().getOrDefault("hint", "").equals(hint)) {
filteredInstances.add(serviceInstance);
}
}
if (filteredInstances.size() > 0) {
return filteredInstances;
}
// If instances cannot be found based on hint,
// we return all instances retrieved for given service id.
return instances;
}
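The fallback at the end of filteredByHint is worth seeing with concrete data: a hint that matches nothing does not produce an empty list, it returns every instance. The sketch below models each instance as a hypothetical id mapped to the value of its "hint" metadata entry; the class HintFilterSketch and the sample addresses are made up.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class HintFilterSketch {

    // hintByInstance maps an instance id to the value of its "hint" metadata entry.
    static List<String> filterByHint(Map<String, String> hintByInstance, String hint) {
        List<String> all = new ArrayList<>(hintByInstance.keySet());
        if (hint == null || hint.isBlank()) {
            return all; // no hint requested: no filtering
        }
        List<String> matched = new ArrayList<>();
        for (Map.Entry<String, String> entry : hintByInstance.entrySet()) {
            if (hint.equals(entry.getValue())) {
                matched.add(entry.getKey());
            }
        }
        // If no instance carries the hint, fall back to the full list.
        return matched.isEmpty() ? all : matched;
    }

    public static void main(String[] args) {
        Map<String, String> instances = new LinkedHashMap<>();
        instances.put("10.0.0.1:8080", "canary");
        instances.put("10.0.0.2:8080", "stable");
        System.out.println(filterByHint(instances, "canary")); // only the canary instance
        System.out.println(filterByHint(instances, "beta"));   // no match: all instances
    }
}
```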
5. LB executes the HTTP request
The final step is to send the HTTP request.
@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request)
throws IOException {
DefaultResponse defaultResponse = new DefaultResponse(serviceInstance);
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId);
Request lbRequest = request instanceof Request ? (Request) request : new DefaultRequest<>();
supportedLifecycleProcessors
.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, new DefaultResponse(serviceInstance)));
try {
// Actually execute the HTTP request
T response = request.apply(serviceInstance);
LoadBalancerProperties properties = loadBalancerClientFactory.getProperties(serviceId);
Object clientResponse = getClientResponse(response, properties.isUseRawStatusCodeInResponseData());
supportedLifecycleProcessors
.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.SUCCESS,
lbRequest, defaultResponse, clientResponse)));
return response;
}
catch (IOException iOException) {
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
new CompletionContext<>(CompletionContext.Status.FAILED, iOException, lbRequest, defaultResponse)));
throw iOException;
}
catch (Exception exception) {
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
new CompletionContext<>(CompletionContext.Status.FAILED, exception, lbRequest, defaultResponse)));
ReflectionUtils.rethrowRuntimeException(exception);
}
return null;
}
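Taken together, the two execute methods fire lifecycle callbacks in a fixed order. The sketch below only records that order for the three outcomes visible in the source (success, failure, no instance available). LifecycleOrderSketch and its string event names are hypothetical, and unlike the real client it swallows the exception after recording the failure.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

final class LifecycleOrderSketch {

    // Records which lifecycle callbacks would fire, in order, for one request.
    static List<String> run(boolean instanceFound, Callable<String> httpCall) {
        List<String> events = new ArrayList<>();
        events.add("onStart");
        if (!instanceFound) {
            // No instance chosen: the request is discarded before any HTTP call.
            events.add("onComplete:DISCARD");
            return events;
        }
        events.add("onStartRequest");
        try {
            httpCall.call();
            events.add("onComplete:SUCCESS");
        } catch (Exception e) {
            // The real client rethrows here; this sketch only records the status.
            events.add("onComplete:FAILED");
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(true, () -> "200 OK"));
        System.out.println(run(true, () -> { throw new IOException("connection refused"); }));
        System.out.println(run(false, () -> "unused"));
    }
}
```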
@Override
public ClientHttpResponse apply(ServiceInstance instance) throws Exception {
// Replace the host in the HttpRequest URI with the selected instance's address; see ServiceRequestWrapper
HttpRequest serviceRequest = new ServiceRequestWrapper(clientHttpRequestData.request, instance, loadBalancer);
if (this.transformers != null) {
for (LoadBalancerRequestTransformer transformer : this.transformers) {
// Request transformers can further modify the HttpRequest
serviceRequest = transformer.transformRequest(serviceRequest, instance);
}
}
// Hand control back to the HTTP interceptor chain, which sends the real HTTP request
return clientHttpRequestData.execution.execute(serviceRequest, clientHttpRequestData.body);
}
public class ServiceRequestWrapper extends HttpRequestWrapper {
private final ServiceInstance instance;
private final LoadBalancerClient loadBalancer;
public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance, LoadBalancerClient loadBalancer) {
super(request);
this.instance = instance;
this.loadBalancer = loadBalancer;
}
@Override
public URI getURI() {
// Replace the host in the original URI with the instance's address; this is the real purpose of the LB
URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI());
return uri;
}
}
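The body of reconstructURI is not shown above. As a rough sketch of what "replace the host with the instance's address" means, the following rebuilds a URI with java.net.URI, swapping in a host and port and keeping everything else. ReconstructUriSketch and the sample address are hypothetical, and the real method may handle more cases (for example choosing the scheme for secure instances).

```java
import java.net.URI;
import java.net.URISyntaxException;

final class ReconstructUriSketch {

    // Keeps scheme, user info, path, query and fragment; swaps in the instance's host and port.
    static URI reconstruct(String host, int port, URI original) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rebuild URI for " + original, e);
        }
    }

    public static void main(String[] args) {
        URI original = URI.create("http://order-service/orders/1?verbose=true");
        // The logical service name is replaced by a concrete instance address.
        System.out.println(reconstruct("10.0.0.5", 8081, original));
    }
}
```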