Ribbon Source Code Analysis
Auto-configuration
Dependency
<!-- Add the Ribbon dependency -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
</dependency>
spring-cloud-starter-netflix-ribbon
Since this is a starter, we can look at the spring.factories file inside the dependency jar.
RibbonAutoConfiguration
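@AutoConfigureBefore
Indicates that the current configuration class is loaded before the specified configuration class.
Here it means RibbonAutoConfiguration must be loaded before LoadBalancerAutoConfiguration.
Now look at the beans it registers for us.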
@Bean
public SpringClientFactory springClientFactory() {
SpringClientFactory factory = new SpringClientFactory();
factory.setConfigurations(this.configurations);
return factory;
}
new SpringClientFactory()
public SpringClientFactory() {
super(RibbonClientConfiguration.class, NAMESPACE, "ribbon.client.name");
}
Let's look at the parent class, NamedContextFactory:
public NamedContextFactory(Class<?> defaultConfigType, String propertySourceName,
String propertyName) {
this.defaultConfigType = defaultConfigType;
this.propertySourceName = propertySourceName;
this.propertyName = propertyName;
}
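RibbonClientConfiguration is passed to the parent class as the default configuration type.
Next, let's see how RibbonClientConfiguration is parsed into bean definitions.
RibbonApplicationContextInitializer implements ApplicationListener<ApplicationReadyEvent>, so
once Spring Boot has started, its onApplicationEvent method is called. This bean is only registered when ribbon.eager-load.enabled is set; otherwise each client context is created lazily on first use.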
protected void initialize() {
if (clientNames != null) {
for (String clientName : clientNames) {
this.springClientFactory.getContext(clientName);
}
}
}
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
initialize();
}
Let's look at the getContext method.
org.springframework.cloud.context.named.NamedContextFactory#getContext
getContext calls the createContext method.
org.springframework.cloud.context.named.NamedContextFactory#createContext
The defaultConfigType here is the RibbonClientConfiguration we passed in.
Let's look at the register method.
public void register(Class<?>... annotatedClasses) {
Assert.notEmpty(annotatedClasses, "At least one annotated class must be specified");
this.reader.register(annotatedClasses);
}
The org.springframework.context.annotation.AnnotatedBeanDefinitionReader#register method
public void register(Class<?>... annotatedClasses) {
for (Class<?> annotatedClass : annotatedClasses) {
registerBean(annotatedClass);
}
}
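This is exactly how Spring processes a configuration class: the class is parsed into bean definitions.
RibbonClientConfiguration gets this much attention because it registers many of Ribbon's core beans.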
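To make the lazy, per-client context idea concrete, here is a minimal plain-Java sketch. It is not Spring Cloud's NamedContextFactory: the class NamedContextSketch and the generic "context" type are hypothetical stand-ins, and the sketch assumes only that each client name gets its own container, built on first access and reused afterwards.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for NamedContextFactory: one child "context" per client name.
class NamedContextSketch<C> {
    private final Map<String, C> contexts = new ConcurrentHashMap<>();
    private final Function<String, C> createContext;

    NamedContextSketch(Function<String, C> createContext) {
        this.createContext = createContext;
    }

    // Mirrors the getContext -> createContext flow: build on first use, then reuse.
    C getContext(String name) {
        return contexts.computeIfAbsent(name, createContext);
    }

    int size() {
        return contexts.size();
    }

    public static void main(String[] args) {
        NamedContextSketch<StringBuilder> factory =
                new NamedContextSketch<>(name -> new StringBuilder("context for " + name));
        System.out.println(factory.getContext("order-service"));
        // The second lookup returns the same object instead of creating a new one.
        System.out.println(factory.getContext("order-service") == factory.getContext("order-service"));
    }
}
```

In the real factory the created object is an application context with the default configuration class registered in it; the sketch keeps only the caching behaviour.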
LoadBalancerAutoConfiguration
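LoadBalancerAutoConfiguration only takes effect when the container contains a LoadBalancerClient bean.
That bean is registered by RibbonAutoConfiguration.
Collecting the RestTemplate instances marked with @LoadBalanced
Spring does this step for us.
Let's look at the @LoadBalanced annotation.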
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}
@LoadBalanced is a composed annotation that is itself annotated with @Qualifier,
which is why the matching RestTemplate instances can be collected into a list.
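The meta-annotation mechanism can be illustrated without Spring. The sketch below is an assumption-laden simplification: @Marker stands in for @Qualifier, @Balanced stands in for @LoadBalanced, and markedFields is a hypothetical helper that uses plain reflection. Spring's real qualifier matching works on bean definitions and injection points and is considerably more involved.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

class QualifierSketch {
    // Hypothetical stand-in for Spring's @Qualifier.
    @Target({ElementType.FIELD, ElementType.ANNOTATION_TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @interface Marker {}

    // Hypothetical stand-in for @LoadBalanced: it carries the marker as a meta-annotation.
    @Target(ElementType.FIELD)
    @Retention(RetentionPolicy.RUNTIME)
    @Marker
    @interface Balanced {}

    static class Clients {
        @Balanced String ordersClient = "orders";
        String plainClient = "plain";
        @Balanced String stockClient = "stock";
    }

    // Collect the names of fields carrying an annotation that is itself annotated with @Marker.
    static List<String> markedFields(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Field field : type.getDeclaredFields()) {
            for (Annotation annotation : field.getAnnotations()) {
                if (annotation.annotationType().isAnnotationPresent(Marker.class)) {
                    names.add(field.getName());
                }
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(markedFields(Clients.class));
    }
}
```

Only the two fields annotated with @Balanced are collected, which is the same filtering effect @LoadBalanced has on the injected RestTemplate list.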
RestTemplate
RestTemplate extends InterceptingHttpAccessor, so it can get and set interceptors.
loadBalancedRestTemplateInitializer
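loadBalancedRestTemplateInitializer() performs the initialization: the SmartInitializingSingleton it returns (written as a lambda here) iterates over the collected RestTemplate instances and over the customizers, whose sole job is to customize RestTemplate, and applies every customizer to every RestTemplate.
This is where every RestTemplate annotated with @LoadBalanced receives Ribbon's custom interceptor, LoadBalancerInterceptor.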
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
return () -> restTemplateCustomizers.ifAvailable(customizers -> {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
customizer.customize(restTemplate);
}
}
});
}
restTemplateCustomizer adds a ClientHttpRequestInterceptor to every RestTemplate passed in; the implementation used is LoadBalancerInterceptor.
Creating Ribbon's custom interceptor, LoadBalancerInterceptor
@Bean
public LoadBalancerInterceptor ribbonInterceptor(
LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
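The method that actually adds the interceptor.
It first copies the current interceptor list (a List), adds loadBalancerInterceptor to the copy, and finally sets the new list back on the restTemplate.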
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final LoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
LoadBalancerInterceptor
When a RestTemplate is used to call an API, the request enters the intercept method.
org.springframework.cloud.client.loadbalancer.LoadBalancerInterceptor#intercept
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null,
"Request URI does not contain a valid hostname: " + originalUri);
// the loadBalancer here is a RibbonLoadBalancerClient
return this.loadBalancer.execute(serviceName,
this.requestFactory.createRequest(request, body, execution));
}
When a request is sent, Ribbon intercepts it with LoadBalancerInterceptor, which calls LoadBalancerClient.execute().
The actual execute() is carried out by the implementation class RibbonLoadBalancerClient.
org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient#execute(j
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request)
throws IOException {
return execute(serviceId, request, null);
}
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
throws IOException {
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
Server server = getServer(loadBalancer, hint);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer = new RibbonServer(serviceId, server,
isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
return execute(serviceId, ribbonServer, request);
}
ILoadBalancer
The ILoadBalancer is registered by the RibbonClientConfiguration configuration class.
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
Unless another implementation is configured through the properties, the concrete type returned is ZoneAwareLoadBalancer.
ZoneAwareLoadBalancer
public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
}
Look at the parent class DynamicServerListLoadBalancer.
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping);
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
}
restOfInit(clientConfig);
}
BaseLoadBalancer, the parent class of DynamicServerListLoadBalancer
public BaseLoadBalancer(IClientConfig config, IRule rule, IPing ping) {
initWithConfig(config, rule, ping, createLoadBalancerStatsFromConfig(config));
}
Look at the initWithConfig method; among other things it calls setPingInterval.
com.netflix.loadbalancer.BaseLoadBalancer#setPingInterval
public void setPingInterval(int pingIntervalSeconds) {
if (pingIntervalSeconds < 1) {
return;
}
this.pingIntervalSeconds = pingIntervalSeconds;
if (logger.isDebugEnabled()) {
logger.debug("LoadBalancer [{}]: pingIntervalSeconds set to {}",
name, this.pingIntervalSeconds);
}
setupPingTask(); // since ping data changed
}
void setupPingTask() {
if (canSkipPing()) {
return;
}
if (lbTimer != null) {
lbTimer.cancel();
}
lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
true);
// start a scheduled task that pings the service instances every pingIntervalSeconds seconds to check their status
lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
forceQuickPing();
}
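The main purpose of this step is to start a scheduled task that pings the service instances every pingIntervalSeconds seconds (the field defaults to 10) and checks their status.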
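The ping loop can be sketched with the JDK scheduler alone. This is a simplified model, not Ribbon's PingTask: PingSketch, its probe predicate and the server strings are hypothetical, and a ScheduledExecutorService is swapped in for the ShutdownEnabledTimer used in the source. Like Timer.schedule(task, 0, period), it runs the first round immediately and then repeats with a fixed delay.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

class PingSketch {
    private final Map<String, Boolean> alive = new ConcurrentHashMap<>();
    private final Predicate<String> probe; // hypothetical health check, e.g. a TCP connect

    PingSketch(List<String> servers, Predicate<String> probe) {
        this.probe = probe;
        for (String server : servers) {
            alive.put(server, false); // unknown servers start as "not alive"
        }
    }

    // One ping round: probe every known server and record the result.
    void pingOnce() {
        for (String server : alive.keySet()) {
            alive.put(server, probe.test(server));
        }
    }

    boolean isAlive(String server) {
        return alive.getOrDefault(server, false);
    }

    // Run a round immediately, then again intervalSeconds after each round finishes.
    ScheduledExecutorService start(long intervalSeconds) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "ping-sketch");
            thread.setDaemon(true); // do not keep the JVM alive just for pinging
            return thread;
        });
        timer.scheduleWithFixedDelay(this::pingOnce, 0, intervalSeconds, TimeUnit.SECONDS);
        return timer;
    }
}
```

A caller would create the sketch with its server list, call start(10), and shut the returned executor down when the load balancer is discarded.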
restOfInit
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
enableAndInitLearnNewServersFeature();
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
public void enableAndInitLearnNewServersFeature() {
LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
serverListUpdater.start(updateAction);
}
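The enableAndInitLearnNewServersFeature method
enableAndInitLearnNewServersFeature(): periodic refresh
PollingServerListUpdater wraps the UpdateAction in a Runnable and schedules it: after an initial delay the Runnable runs at a fixed interval, and each run executes the UpdateAction to refresh the server list.
By default the Runnable runs for the first time after 1 second,
and then every 30 seconds after that, refreshing the registry's server list into Ribbon's own LoadBalancer.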
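The refresh cycle described above can be sketched as follows. The class ServerListRefreshSketch and its registry supplier are hypothetical; only the 1 second initial delay and the 30 second interval come from the text, and the sketch assumes the whole local list is simply replaced on each run.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class ServerListRefreshSketch {
    private final Supplier<List<String>> registry; // hypothetical registry lookup
    private volatile List<String> servers = Collections.emptyList();

    ServerListRefreshSketch(Supplier<List<String>> registry) {
        this.registry = registry;
    }

    // Plays the role of UpdateAction.doUpdate(): replace the local list with a fresh snapshot.
    void doUpdate() {
        servers = Collections.unmodifiableList(new ArrayList<>(registry.get()));
    }

    List<String> servers() {
        return servers;
    }

    // First run after 1 second, then 30 seconds after each run completes.
    ScheduledFuture<?> start(ScheduledExecutorService scheduler) {
        return scheduler.scheduleWithFixedDelay(this::doUpdate, 1_000, 30_000, TimeUnit.MILLISECONDS);
    }
}
```

Because the list is only a snapshot, a change in the registry becomes visible to the load balancer at the next refresh, not immediately.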
Look at the doUpdate method, which calls updateListOfServers.
The updateListOfServers method
Here we use Nacos as the example registry.
com.alibaba.cloud.nacos.ribbon.NacosServerList#getUpdatedListOfServers
public List<NacosServer> getUpdatedListOfServers() {
return this.getServers();
}
private List<NacosServer> getServers() {
try {
String group = this.discoveryProperties.getGroup();
List<Instance> instances = this.discoveryProperties.namingServiceInstance().selectInstances(this.serviceId, group, true);
return this.instancesToServerList(instances);
} catch (Exception var3) {
throw new IllegalStateException("Can not get service instances from nacos, serviceId=" + this.serviceId, var3);
}
}
Look at the selectInstances method.
com.alibaba.nacos.client.naming.NacosNamingService#selectInstances(java.lang.String, java.lang.String, boolean)
public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException {
return this.selectInstances(serviceName, groupName, healthy, true);
}
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
if (subscribe) {
serviceInfo = this.hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
} else {
serviceInfo = this.hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
}
return this.selectInstances(serviceInfo, healthy);
}
Now the key part: the getServiceInfo method.
public ServiceInfo getServiceInfo(String serviceName, String clusters) {
LogUtils.NAMING_LOGGER.debug("failover-mode: " + this.failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
if (this.failoverReactor.isFailoverSwitch()) {
return this.failoverReactor.getService(key);
} else {
// read the instance info from the local cache
ServiceInfo serviceObj = this.getServiceInfo0(serviceName, clusters);
// if it is not cached, call the Nacos server
if (null == serviceObj) {
serviceObj = new ServiceInfo(serviceName, clusters);
this.serviceInfoMap.put(serviceObj.getKey(), serviceObj);
this.updatingMap.put(serviceName, new Object());
// call the Nacos server
this.updateServiceNow(serviceName, clusters);
this.updatingMap.remove(serviceName);
} else if (this.updatingMap.containsKey(serviceName)) {
synchronized(serviceObj) {
try {
serviceObj.wait(5000L);
} catch (InterruptedException var8) {
LogUtils.NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, var8);
}
}
}
// asynchronously refresh the in-memory service list
this.scheduleUpdateIfAbsent(serviceName, clusters);
return (ServiceInfo)this.serviceInfoMap.get(serviceObj.getKey());
}
}
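The cache-first lookup in getServiceInfo reduces to the pattern below. This is a hedged simplification: ServiceInfoCacheSketch, its remoteLookup function and the call counter are hypothetical, and the failover switch, the wait on a concurrent update and the scheduled refresh are all left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class ServiceInfoCacheSketch {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    private final Function<String, List<String>> remoteLookup; // hypothetical call to the registry server
    private final AtomicInteger remoteCalls = new AtomicInteger();

    ServiceInfoCacheSketch(Function<String, List<String>> remoteLookup) {
        this.remoteLookup = remoteLookup;
    }

    // Return the cached instances; only on a cache miss ask the server and store the answer.
    // Assumes remoteLookup never returns null.
    List<String> getServiceInfo(String serviceName) {
        List<String> cached = cache.get(serviceName);
        if (cached == null) {
            remoteCalls.incrementAndGet();
            cached = remoteLookup.apply(serviceName);
            cache.put(serviceName, cached);
        }
        return cached;
    }

    int remoteCallCount() {
        return remoteCalls.get();
    }

    public static void main(String[] args) {
        ServiceInfoCacheSketch cache =
                new ServiceInfoCacheSketch(name -> Arrays.asList(name + "-1:8080"));
        System.out.println(cache.getServiceInfo("orders"));
        System.out.println(cache.getServiceInfo("orders"));
        System.out.println("remote calls: " + cache.remoteCallCount());
    }
}
```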
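That essentially covers the scheduled task.
The flow then continues from where the request was first intercepted.
// Select one instance with the load-balancing algorithm for the subsequent call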
org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient#getServer(com.netflix.loadbalancer.ILoadBalancer, java.lang.Object)
protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
if (loadBalancer == null) {
return null;
}
// Use 'default' on a null hint, or just pass it on?
return loadBalancer.chooseServer(hint != null ? hint : "default");
}
com.netflix.loadbalancer.BaseLoadBalancer#chooseServer
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
// rule is the IRule
if (rule == null) {
return null;
} else {
try {
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
The service instance is chosen through the IRule.
Let's look at the IRule implementation NacosRule.
com.alibaba.cloud.nacos.ribbon.NacosRule#choose
public Server choose(Object key) {
try {
String clusterName = this.nacosDiscoveryProperties.getClusterName();
DynamicServerListLoadBalancer loadBalancer = (DynamicServerListLoadBalancer)this.getLoadBalancer();
String name = loadBalancer.getName();
NamingService namingService = this.nacosDiscoveryProperties.namingServiceInstance();
// the same lookup the scheduled refresh performs
List<Instance> instances = namingService.selectInstances(name, true);
if (CollectionUtils.isEmpty(instances)) {
LOGGER.warn("no instance in service {}", name);
return null;
} else {
List<Instance> instancesToChoose = instances;
if (StringUtils.isNotBlank(clusterName)) {
List<Instance> sameClusterInstances = (List)instances.stream().filter((instancex) -> {
return Objects.equals(clusterName, instancex.getClusterName());
}).collect(Collectors.toList());
if (!CollectionUtils.isEmpty(sameClusterInstances)) {
instancesToChoose = sameClusterInstances;
} else {
LOGGER.warn("A cross-cluster call occurs,name = {}, clusterName = {}, instance = {}", new Object[]{name, clusterName, instances});
}
}
// pick one instance according to the load-balancing strategy (random by weight)
Instance instance = ExtendBalancer.getHostByRandomWeight2(instancesToChoose);
return new NacosServer(instance);
}
} catch (Exception var9) {
LOGGER.warn("NacosRule error", var9);
return null;
}
}
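The two decisions in choose, preferring instances from the caller's own cluster and then picking randomly by weight, can be sketched in plain Java. This is not the ExtendBalancer implementation: WeightedChoiceSketch, its Instance class and both helper methods are hypothetical, and the sketch assumes weights are non-negative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

class WeightedChoiceSketch {
    static final class Instance {
        final String address;
        final String cluster;
        final double weight;

        Instance(String address, String cluster, double weight) {
            this.address = address;
            this.cluster = cluster;
            this.weight = weight;
        }
    }

    // Keep only same-cluster instances; fall back to all of them (a cross-cluster call) if none match.
    static List<Instance> preferCluster(List<Instance> all, String cluster) {
        List<Instance> same = new ArrayList<>();
        for (Instance instance : all) {
            if (cluster.equals(instance.cluster)) {
                same.add(instance);
            }
        }
        return same.isEmpty() ? all : same;
    }

    // Pick an instance with probability proportional to its weight; null if nothing is selectable.
    static Instance chooseByWeight(List<Instance> candidates, Random random) {
        double total = 0;
        for (Instance instance : candidates) {
            total += instance.weight;
        }
        if (total <= 0) {
            return null;
        }
        double point = random.nextDouble() * total; // uniform in [0, total)
        Instance lastPositive = null;
        for (Instance instance : candidates) {
            if (instance.weight <= 0) {
                continue; // zero-weight instances are never chosen
            }
            lastPositive = instance;
            point -= instance.weight;
            if (point < 0) {
                return instance;
            }
        }
        return lastPositive; // only reached through floating-point rounding
    }

    public static void main(String[] args) {
        List<Instance> all = Arrays.asList(
                new Instance("10.0.0.1:8080", "bj", 1.0),
                new Instance("10.0.0.2:8080", "sh", 1.0),
                new Instance("10.0.0.3:8080", "sh", 3.0));
        Instance chosen = chooseByWeight(preferCluster(all, "sh"), new Random());
        System.out.println(chosen.address);
    }
}
```

With the weights in main, the third instance is expected to be picked about three times as often as the second, and the first is never considered while the "sh" cluster has instances.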
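This method first looks for the service instances in the local cache and returns them if they are found;
otherwise it makes a remote call to the Nacos server to fetch the list of service instances.
Finally, the load-balancing algorithm picks one instance node for the subsequent call.
Back in the interceptor, the service name in the URL is replaced with that instance's IP and port, and the remote HTTP call is sent.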
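The host replacement in that last step can be illustrated with java.net.URI alone. This is a sketch of the idea, not Ribbon's own URI reconstruction code: UriRewriteSketch and reconstruct are hypothetical names, and the service name, IP and port below are made up.

```java
import java.net.URI;
import java.net.URISyntaxException;

class UriRewriteSketch {
    // Rebuild the URI with the chosen instance's host and port, keeping scheme, path and query.
    static URI reconstruct(URI original, String host, int port) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rewrite " + original, e);
        }
    }

    public static void main(String[] args) {
        URI original = URI.create("http://order-service/orders/42?verbose=true");
        // The host part of the original URI is the service name used to pick an instance.
        System.out.println("service name: " + original.getHost());
        System.out.println(reconstruct(original, "10.0.0.5", 8080));
    }
}
```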