目录
前言
1.原理分析
1.1 ReactiveLoadBalancerClientFilter源码分析
1.2 LoadBalancerClientFactory源码分析
2.代码实现
2.1 扩展原生RoundRobinLoadBalancer轮询策略
2.1.1 自定义实现RoundRobinLoadBalancer
2.1.2 配置自定义的RoundRobinLoadBalancer
2.2 扩展原生RandomLoadBalancer随机策略
2.2.1 自定义实现RandomLoadBalancer
2.2.2 配置自定义的RandomLoadBalancer
2.3 动态绑定client和loadBalancer并注册到LoadBalancerClientFactory
前言
工作和兴趣的使然,由于需要对各种开源的项目做一些自定义的插件以及扩展,所以会经常研究一些开源组件的源码。正好前段阵子公司内部计划进行产品依赖版本升级,springcloud升级到2021.0.6,spring boot升级到2.7.11了,自然spring cloud gateway就随之升级到了3.1.6,带来的问题就是gateway内部组件的大调整和更新,比如scg在Hoxton.M2 RELEASED版本之前,内部的负载均衡组件一直用的都是ribbon,现在新版本的cloud包括boot内部的负载均衡组件统一用了spring自己的loadbalancer,那么我们开发的针对于老版本的一些插件和扩展就不能用了,需要对新的组件进行适配了,本人正好负责这次的版本升级工作,所以想空闲时间就把这些升级的东西和过程写下来。都是本人手搓的一手代码,创作不易,望诸君高台贵手,点赞支持。
1.原理分析
直接进入主题,本文主要核心有两点:
- 针对scg的loadbalancer组件做自定义扩展,官方内置的负载策略目前只有两种:RandomLoad和RoundLoad,那么我们在实际业务中这两种可能满足不了业务需求,例如:使用轮询策略RoundRobinLoadBalancer会每次轮询访问目标服务实例,但是有可能业务上会有一些自己业务的判断逻辑,比如我的下游目标服务起了5个实例,每次轮询都会在这五个实例里面公平的循环轮询,现在公司说service端需要升级,但是又不想强制要client端升级才能使用,这样客户才会比较好容易接受,所以现在把其中3个实例升级到最新版本,剩下的两个实例还是老版本,那么在网关这里如果还是使用的老的轮询策略,就会出现一个问题:假如当前请求是来自老的客户端,那么如果轮询到了新版本实例上去了或者新客户端请求轮询到老的版本实例上去了,就有可能会出现一系列问题,这个时候就需要在轮询的时候加一些自己业务的判断上去,好让实例choose能满足业务需求。
- 实现一个动态配置服务负载均衡策略的逻辑:就比如在scg里面,服务a使用轮询,服务b使用随机,运行了一段时间后,想把服务a的负载均衡策略改成随机或者是其他自定义的策略,无需改代码以及启停服务,直接动态修改配置热更新生效
1.1 ReactiveLoadBalancerClientFilter源码分析
查看scg的源码我们不难发现,scg内部负载均衡的核心逻辑都是由一个GlobalFilter来实现的,这个全局filter叫:ReactiveLoadBalancerClientFilter,查看这个filter的代码我们可以看到它有个核心方法choose(),这个方法的作用就是根据当前所需要路由的serviceId来选择对应的loadBalancer,然后通过loadBalancer来选择出最终要路由到的serviceInstance,那么通过这个方法我们能抓到两个核心要素:
1.拿到路由目标service对应的loadBalancer;
2.通过loadBalancer来choose出最终要路由的实例serviceInstance。
经过上面分析,我们就能明白了,实现自定义loadBalancer的核心逻辑就是:
1.自定义自己的ReactorLoadBalancer;
2.在自定义的ReactorLoadBalancer里面实现自己的choose逻辑;
3.动态将serviceId和自己的loadBalancer绑定并注册到LoadBalancerClientFactory(由代码this.clientFactory.getInstance()可知)中去。
1.2 LoadBalancerClientFactory源码分析
spring cloud loadBalancer是怎么实现每个client绑定自己的配置的呢,官方提供了两个注解:@LoadBalancerClient和@LoadBalancerClients,首先我们来看@LoadBalancerClient的源码,可以看到@LoadBalancerClient提供了三个参数:name、value、configuration[],看注解就知道name、value就是配置客户端的名称(客户端名称我们就可以理解为一个应用的applicationName,因为在实际负载中都是用服务名作为clientId、serviceId),configuration就是客户端对应的自定义的配置,包括负载策略的配置、健康检查策略的配置等等,这里我们只需要关注负载策略的自定义就好,其他的用默认就行,如果有需求也可以都自定义。
接下来我们看@LoadBalancerClients源码
可以发现有个@Import注解,我们打开Import里面的LoadBalancerClientConfigurationRegister类,发现它实现了ImportBeanDefinitionRegister接口并重写了registerBeanDefinitions方法,我们重点看下这个方法,发现它里面干了3件事:
1.获取所有的@LoadBalancerClients注解的元数据,拿到代码里LoadBalancerClients注解的配置,然后根据配置进行loadBalancerClientConfiguration的绑定;
2.如果第一步里面@LoadBalancerClients里面如果配置的是defaultConfiguration,那么就用默认的配置进行绑定;
3.获取所有的@LoadBalancerClient注解的元数据,然后同样的拿到代码LoadBalancerClient注解的配置,然后根据配置进行loadBalancerClientConfiguration的绑定
那我们看下具体是怎么绑定的,看源码:
本质就是构建一个LoadBalancerClientSpecification bean,这个spec就是每个客户端自定义负载策略的核心bean,我们看下它的源码,不难发现,它有两个属性:name、configuration[],是不是发现似曾相识,就是@LoadBalancerClient注解的两个属性,所以最终就是为了装配这个bean,构建完ben之后,那俩注解的作用就完成了,接下来我们在继续看下spring是如何实现前面我们所讲的:在路由时,是如何通过serviceId获取到这个客户端配置的。
接下来我们继续,所有的配置动作已经解析完成了,那么就是将配置交给spring容器了,spring是如何加载这个LoadBalancerClientSpecification的呢,我们跟随配置:spring.cloud.loadbalancer.ebabled会发现有个自动装配类:LoadBalancerAutoConfiguration,查看这个配置类源码我们可以发现,有参构造的参数就是LoadBalancerClientSpecification,然后再进行初始化bean:LoadBalancerClientFactory这个loadBalancer的核心factory。
查看LoadBalancerClientFactory的源码我们可以看到它继承了:NamedContextFactory,那么这个作用是什么呢:子容器之间的数据隔离。NamedContextFactory的作用是创建一个子容器(子上下文context),然后每个子容器通过LoadBalancerClientSpecification来定义客户端容器name以及数据配置。我们回到开头所讲的ReactiveLoadBalancerClientFilter这个filter,在请求进来的时候通过LoadBalancerClientFactory拿serviceId去获取这个客户端对应的loadBalancer,跟进源码,我们发现它最终调用的是NamedContextFactory的getInstance()方法,然后调用getContext(),主要逻辑就是通过serviceId去获取子容器,如果没有那么就创建一个新的子容器(子上下文),查看源码我们不难发现,新的子容器name就是用的serviceId,然后再拿到对应的LoadBalancerClientSpecification来注册到子容器中去;所以到这里,再回到上面,我们看LoadBalancerAutoConfiguration中的LoadBalanceClientrFactory的初始化就干了一件事:将所有的客户端的配置LoadBalancerClientSpecification注册到NamedContextFactory中去;然后随着请求过来时,拿到已经注册好的LoadBalancerClientSpecification对当前请求的客户端进行子上下文的初始化。
createContext()源码截图:
到此,经过上述简单的源码分析,那么原理和实现方案我们就已经大致明白了,接下来就直接上代码来验证。
2.代码实现
想要实现自定义负载策略,首先需要实现官方接口:ReactorServiceInstanceLoadBalancer ,查看代码我们会发现这个接口是spring loadBalancer官方提供的接口,所有的策略都需要实现它,所以我们自定义的策略也不例外,具体代码如下:
2.1 扩展原生RoundRobinLoadBalancer轮询策略
2.1.1 自定义实现RoundRobinLoadBalancer
package com.primeton.gateway.core.lb;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Description 轮询
* @Author wx
* @Date 2023/5/26
*/
public class GatewayRoundLoadBalancer implements ReactorServiceInstanceLoadBalancer {
private static final Log log = LogFactory.getLog(GatewayRoundLoadBalancer.class);
final AtomicInteger position;
private final String serviceId;
private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
public GatewayRoundLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
String serviceId) {
this(serviceInstanceListSupplierProvider, serviceId, new Random().nextInt(1000));
}
public GatewayRoundLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
String serviceId,
int seedPosition) {
this.serviceId = serviceId;
this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
this.position = new AtomicInteger(seedPosition);
}
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
.getIfAvailable(NoopServiceInstanceListSupplier::new);
return supplier.get(request).next()
.map(serviceInstances -> processInstanceResponse(supplier, serviceInstances, request));
}
private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
List<ServiceInstance> serviceInstances,
Request request) {
Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances, request);
if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
}
return serviceInstanceResponse;
}
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances,
Request request) {
if (instances.isEmpty()) {
if (log.isWarnEnabled()) {
log.warn("No servers available for service: " + serviceId);
}
return new EmptyResponse();
}
// Do not move position when there is only 1 instance, especially some suppliers
// have already filtered instances
if (instances.size() == 1) {
return new DefaultResponse(instances.get(0));
}
List<ServiceInstance> useInstances = customChoose(instances, request);
int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;
ServiceInstance instance = useInstances.get(pos % useInstances.size());
return new DefaultResponse(instance);
}
/**
* 自定义instances choose出满足业务请求的实例,然后按照轮询策略来从
* 剩下的满足业务需求的实例列表选出最终的实例
*/
private List<ServiceInstance> customChoose(List<ServiceInstance> instances, Request request) {
//todo 比如根据request中的参数来筛选、 或者筛选出实例元数据中含有某些符合参数的实例 等等
List<ServiceInstance> use = new ArrayList<>();
for (ServiceInstance instance : instances) {
Map<String, String> metadata = instance.getMetadata();
if (metadata.containsKey("xxx")) use.add(instance);
}
return use;
}
}
2.1.2 配置自定义的RoundRobinLoadBalancer
package com.primeton.gateway.core.lb;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
/**
* @Description TODO
* @Author wx
* @Date 2024/6/13
*/
public class GatewayRoundLoadBalancerConfiguration {
@Bean
public GatewayRoundLoadBalancer gatewayRoundLoadBalancer(Environment environment,
LoadBalancerClientFactory loadBalancerClientFactory) {
String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
return new GatewayRoundLoadBalancer(
loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
}
}
2.2 扩展原生RandomLoadBalancer随机策略
2.2.1 自定义实现RandomLoadBalancer
package com.primeton.gateway.core.lb;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
* @Description 随机
* @Author wx
* @Date 2023/5/26
*/
public class GatewayRandomLoadBalancer implements ReactorServiceInstanceLoadBalancer {
private static final Log log = LogFactory.getLog(GatewayRandomLoadBalancer.class);
private final String serviceId;
private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
public GatewayRandomLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
String serviceId) {
this.serviceId = serviceId;
this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
}
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
.getIfAvailable(NoopServiceInstanceListSupplier::new);
return supplier.get(request).next()
.map(serviceInstances -> processInstanceResponse(supplier, serviceInstances, request));
}
private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
List<ServiceInstance> serviceInstances,
Request request) {
Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances, request);
if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
}
return serviceInstanceResponse;
}
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances,
Request request) {
if (instances.isEmpty()) {
if (log.isWarnEnabled()) {
log.warn("No servers available for service: " + serviceId);
}
return new EmptyResponse();
}
List<ServiceInstance> useInstances = customChoose(instances, request);
int index = ThreadLocalRandom.current().nextInt(useInstances.size());
ServiceInstance instance = useInstances.get(index);
return new DefaultResponse(instance);
}
//todo 节合实际业务来筛选
private List<ServiceInstance> customChoose(List<ServiceInstance> instances, Request request) {
return instances;
}
}
2.2.2 配置自定义的RandomLoadBalancer
package com.primeton.gateway.core.lb;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
/**
* @Description TODO
* @Author wx
* @Date 2024/6/13
*/
public class GatewayRandomLoadBalancerConfiguration {
@Bean
public GatewayRandomLoadBalancer gatewayRandomLoadBalancer(Environment environment,
LoadBalancerClientFactory loadBalancerClientFactory) {
String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
return new GatewayRandomLoadBalancer(
loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
}
}
2.3 动态绑定client和loadBalancer并注册到LoadBalancerClientFactory
看完前面的原理分析,那我们就明白要实现client动态绑定loadBalancer并注册到LoadBalancerClientFactory去,要做的就是两件事:1.根据业务配置对每个客户端进行LoadBalancerClientSpecification组装;2.组装好LoadBalancerClientSpecification注册到LoadBalancerClientFactory也就是NamedContextFactory上下文里面去。具体实现如下:
package com.primeton.gateway.core.lb;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClientSpecification;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.EnumerablePropertySource;
import org.springframework.core.env.PropertySource;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
* @Description TODO
* @Author wx
* @Date 2023/5/26
*/
@Configuration
public class GatewayLoadBalancerConfiguration {
private static final String SUFFIX = ".loadbalancer.LoadBalancer-configuration-class-name";
@Autowired
private ConfigurableEnvironment env;
@Autowired
private LoadBalancerClientFactory loadBalancerClientFactory;
@PostConstruct
public void postConstruct() {
//第一步:解析业务配置,从而解析出来每个客户端对应的负载策略配置
HashMap<String, String> configs = new HashMap<>();
for (PropertySource<?> propertySource : env.getPropertySources()) {
if (propertySource instanceof EnumerablePropertySource) {
for (String name : ((EnumerablePropertySource) propertySource).getPropertyNames()) {
if (name != null && name.endsWith(SUFFIX)) {
configs.put(name, env.getProperty(name));
}
}
}
}
//第二步:组装Specification并绑定到spring上下文中去
List<LoadBalancerClientSpecification> configurations = new ArrayList<>();
for (String clientId : configs.keySet()) {
String id = clientId.substring(0, clientId.length() - SUFFIX.length());
try {
Class<?>[] classes = {Class.forName(configs.get(clientId))};
LoadBalancerClientSpecification specification = new LoadBalancerClientSpecification();
specification.setName(id);
specification.setConfiguration(classes);
configurations.add(specification);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
loadBalancerClientFactory.setConfigurations(configurations);
}
}
上面代码我做一下简单的思路描述:我们可以把上面代码分为两部分:
1.第一部分我是自定义了一个配置规则:serviceId..loadbalancer.LoadBalancer-configuration-class-name=xxxxx (全路径);比如:DEMO01..loadbalancer.LoadBalancer-configuration-class-name=com.primeton.gateway.core.lb.GatewayRoundLoadBalancerConfiguration(这个配置类看2.1.2章节),这就表示客户端应用DEMO01的负载均衡策略就是我在2.1.2自定义的轮询策略,所有需要路由到DEMO01的请求都要走我们2.1.1里面的逻辑进行choose()筛选出最终要路由的实例,这个配置可以放在配置文件,也可以放在其他地方;
2.第二部分就是针对所有的客户端负载策略配置进行组装spring loadBalancer需要的LoadBalancerClientSpecification,然后最终模拟源码里面的绑定动作将我们自己组装好的数据设置到NamedContextFactory上下文中去。
到此我们自定义客户端负载均衡策略方案就实现了,但是还差最后一步:怎么动态更新呢,比如现在DEMO01我们配置的是我们写的GatewayRoundLoadBalancerConfiguration,我们想要将它换成GatewayRandomLoadBalancerConfiguration 随机策略,由于时间问题,我这里就给大家出个方案,具体实现我就不写了,有时间再给大家写:
方案1:结合配置中心:nacos、Apollo等做配置热更新,监听nacos、Apollo配置,当配置有变化时,nacos、Apollo服务端都会发送通知,你只需要在代码里创建一个listener,然后针对我们上面定义的key,然后把上面代码的步骤再走一遍就行了。
方案2:将配置更新做成接口化,gateway写个controller专门用来管理配置,然后配置有变化通过调用gateway对应接口来通知gateway进行更新,最后再走一遍上面代码即可
方案3:结合redis,配置数据存在redis里面,利用redis的键空间监听通知监听这个配置,当配置有改动的时候redis会发出通知,我们在gateway里面监听好,然后进行上面的代码即可
当然还有很多其他方案,核心实现已经分享给大家了,剩下的就看诸位结合自身业务采取什么样的配置方案了。