Contents
- @EnableFeignClients
- registerDefaultConfiguration
- registerFeignClients
- Classpath scanning
- Registering @FeignClient
- registerFeignClient
- Summary
- Instantiating FeignClient interfaces
- FeignContext
- createContext
- this.configurations
- Client
- FeignBuilder
- Load balancing in loadBalance
- Targeter
- Creating the interface proxy: Feign#newInstance
- The call path of an interface method
- Proxy execution
- InvocationHandlerFactory.MethodHandler
- SynchronousMethodHandler
- executeAndDecode
- Creating the request
- Load balancing
- Request URL
- Choosing an instance
- ReactiveLoadBalancer
- RoundRobinLoadBalancer
- ServiceInstanceListSupplier
- ServiceInstance
The source code discussed in this article is from spring-cloud-starter-openfeign 3.1.8
@EnableFeignClients
Open @EnableFeignClients and you can see that it imports the FeignClientsRegistrar class through @Import(FeignClientsRegistrar.class). This class implements the ImportBeanDefinitionRegistrar interface. For how @Import works, see this article:
https://blog.csdn.net/qq_40926260/article/details/142646615?spm=1001.2014.3001.5502
class FeignClientsRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, EnvironmentAware {
@Override
public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
// Register the default FeignClientSpecification
registerDefaultConfiguration(metadata, registry);
// Register every @FeignClient interface
registerFeignClients(metadata, registry);
}
}
registerDefaultConfiguration
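registerDefaultConfiguration registers a FeignClientSpecification BeanDefinition in the container, carrying the defaultConfiguration attribute of @EnableFeignClients
// metadata: annotation metadata of the class that carries @EnableFeignClients
// registry: the bean definition registry, i.e. the Spring container
private void registerDefaultConfiguration(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {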
Map<String, Object> defaultAttrs = metadata.getAnnotationAttributes(EnableFeignClients.class.getName(), true);
if (defaultAttrs != null && defaultAttrs.containsKey("defaultConfiguration")) {
String name;
if (metadata.hasEnclosingClass()) {
name = "default." + metadata.getEnclosingClassName();
} else {
name = "default." + metadata.getClassName();
}
registerClientConfiguration(registry, name, defaultAttrs.get("defaultConfiguration"));
}
}
private void registerClientConfiguration(BeanDefinitionRegistry registry, Object name, Object configuration) {
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(FeignClientSpecification.class);
builder.addConstructorArgValue(name);
builder.addConstructorArgValue(configuration);
registry.registerBeanDefinition(name + "." + FeignClientSpecification.class.getSimpleName(),
builder.getBeanDefinition());
}
The defaultConfiguration attribute matters because FeignClientSpecification has a two-argument constructor, and this value is passed to it as the configuration argument
registerFeignClients
registerFeignClients, as the name says, registers all FeignClients
public void registerFeignClients(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
LinkedHashSet<BeanDefinition> candidateComponents = new LinkedHashSet<>();
Map<String, Object> attrs = metadata.getAnnotationAttributes(EnableFeignClients.class.getName());
final Class<?>[] clients = attrs == null ? null : (Class<?>[]) attrs.get("clients");
if (clients == null || clients.length == 0) {
// Scan the classpath for @FeignClient
ClassPathScanningCandidateComponentProvider scanner = getScanner();
scanner.setResourceLoader(this.resourceLoader);
scanner.addIncludeFilter(new AnnotationTypeFilter(FeignClient.class));
// Collect all base packages to scan
Set<String> basePackages = getBasePackages(metadata);
for (String basePackage : basePackages) {
candidateComponents.addAll(scanner.findCandidateComponents(basePackage));
}
} else {
// clients is set: skip classpath scanning and use the listed classes
for (Class<?> clazz : clients) {
candidateComponents.add(new AnnotatedGenericBeanDefinition(clazz));
}
}
// candidateComponents now holds the bean definitions of all @FeignClient types
for (BeanDefinition candidateComponent : candidateComponents) {
if (candidateComponent instanceof AnnotatedBeanDefinition) {
// verify annotated class is an interface
AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
Assert.isTrue(annotationMetadata.isInterface(), "@FeignClient can only be specified on an interface");
// Only interfaces are registered
Map<String, Object> attributes = annotationMetadata
.getAnnotationAttributes(FeignClient.class.getCanonicalName());
String name = getClientName(attributes);
registerClientConfiguration(registry, name, attributes.get("configuration"));
registerFeignClient(registry, annotationMetadata, attributes);
}
}
}
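Classpath scanning
If the clients attribute of @EnableFeignClients is empty, the classpath is scanned for @FeignClient interfaces; if it is set, scanning is skipped and only the listed FeignClient types are used
Classpath scanning uses 3 attributes of @EnableFeignClients
- value: package names
- basePackages: package names
- basePackageClasses: the packages of the given classes
Only when none of these 3 is configured does the package of the class carrying @EnableFeignClients become the scan path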
protected Set<String> getBasePackages(AnnotationMetadata importingClassMetadata) {
Map<String, Object> attributes = importingClassMetadata
.getAnnotationAttributes(EnableFeignClients.class.getCanonicalName());
Set<String> basePackages = new HashSet<>();
for (String pkg : (String[]) attributes.get("value")) {
if (StringUtils.hasText(pkg)) {
basePackages.add(pkg);
}
}
for (String pkg : (String[]) attributes.get("basePackages")) {
if (StringUtils.hasText(pkg)) {
basePackages.add(pkg);
}
}
for (Class<?> clazz : (Class[]) attributes.get("basePackageClasses")) {
basePackages.add(ClassUtils.getPackageName(clazz));
}
// Fall back to the package of the class carrying @EnableFeignClients only if no package was configured
if (basePackages.isEmpty()) {
basePackages.add(ClassUtils.getPackageName(importingClassMetadata.getClassName()));
}
return basePackages;
}
Registering @FeignClient
The client name is resolved in the order contextId -> value -> name -> serviceId; if none of them has text, an exception is thrown
private String getClientName(Map<String, Object> client) {
if (client == null) {
return null;
}
String value = (String) client.get("contextId");
if (!StringUtils.hasText(value)) {
value = (String) client.get("value");
}
if (!StringUtils.hasText(value)) {
value = (String) client.get("name");
}
if (!StringUtils.hasText(value)) {
value = (String) client.get("serviceId");
}
if (StringUtils.hasText(value)) {
return value;
}
throw new IllegalStateException(
"Either 'name' or 'value' must be provided in @" + FeignClient.class.getSimpleName());
}
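This client name is the other constructor argument that FeignClientSpecification needs
Both @EnableFeignClients and @FeignClient end up registering beans of type FeignClientSpecification. Only the bean names differ, and both follow the pattern ${name}.FeignClientSpecification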
registerFeignClient
private void registerFeignClient(BeanDefinitionRegistry registry, AnnotationMetadata annotationMetadata,
Map<String, Object> attributes) {
String className = annotationMetadata.getClassName();
Class clazz = ClassUtils.resolveClassName(className, null);
ConfigurableBeanFactory beanFactory = registry instanceof ConfigurableBeanFactory
? (ConfigurableBeanFactory) registry : null;
// Resolve contextId; expressions are evaluated
String contextId = getContextId(beanFactory, attributes);
// serviceId -> name -> value; expressions are evaluated
String name = getName(attributes);
FeignClientFactoryBean factoryBean = new FeignClientFactoryBean();
factoryBean.setBeanFactory(beanFactory);
factoryBean.setName(name);
factoryBean.setContextId(contextId);
factoryBean.setType(clazz);
factoryBean.setRefreshableClient(isClientRefreshEnabled());
BeanDefinitionBuilder definition = BeanDefinitionBuilder.genericBeanDefinition(clazz, () -> {
factoryBean.setUrl(getUrl(beanFactory, attributes));
factoryBean.setPath(getPath(beanFactory, attributes));
factoryBean.setDecode404(Boolean.parseBoolean(String.valueOf(attributes.get("decode404"))));
Object fallback = attributes.get("fallback");
if (fallback != null) {
factoryBean.setFallback(fallback instanceof Class ? (Class<?>) fallback
: ClassUtils.resolveClassName(fallback.toString(), null));
}
Object fallbackFactory = attributes.get("fallbackFactory");
if (fallbackFactory != null) {
factoryBean.setFallbackFactory(fallbackFactory instanceof Class ? (Class<?>) fallbackFactory
: ClassUtils.resolveClassName(fallbackFactory.toString(), null));
}
return factoryBean.getObject();
});
definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
definition.setLazyInit(true);
validate(attributes);
AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();
beanDefinition.setAttribute(FactoryBean.OBJECT_TYPE_ATTRIBUTE, className);
beanDefinition.setAttribute("feignClientsRegistrarFactoryBean", factoryBean);
// has a default, won't be null
boolean primary = (Boolean) attributes.get("primary");
beanDefinition.setPrimary(primary);
String[] qualifiers = getQualifiers(attributes);
if (ObjectUtils.isEmpty(qualifiers)) {
qualifiers = new String[] { contextId + "FeignClient" };
}
BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className, qualifiers);
BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
registerOptionsBeanDefinition(registry, contextId);
}
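Summary
With the analysis above, every attribute of @EnableFeignClients is accounted for
- value, basePackages, basePackageClasses: specify the packages to scan
- defaultConfiguration: becomes a constructor argument of the default FeignClientSpecification bean
- clients: lists the @FeignClient interfaces explicitly; when non-empty, classpath scanning for @FeignClient is skipped
During startup the following bean definitions are registered (note: bean definitions, not instances)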
- For the @EnableFeignClients class: if @EnableFeignClients is placed on class a.b.Main, a bean definition named default.a.b.Main.FeignClientSpecification of type FeignClientSpecification is registered
- For every @FeignClient interface: a bean definition named ${clientName}.FeignClientSpecification of type FeignClientSpecification. clientName is taken from the @FeignClient attributes, in descending priority contextId -> value -> name -> serviceId
- For every contextId value: a bean of type OptionsFactoryBean
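The naming rule in the list above can be sketched in plain Java. This is a minimal illustration rather than the registrar's code: the class SpecBeanNames and its two methods are hypothetical, and the attribute map stands in for the annotation attributes that Spring supplies.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SpecBeanNames {

    // Same lookup order as getClientName: contextId, value, name, serviceId.
    static String clientName(Map<String, String> attrs) {
        for (String key : new String[] {"contextId", "value", "name", "serviceId"}) {
            String v = attrs.get(key);
            if (v != null && !v.trim().isEmpty()) {
                return v;
            }
        }
        throw new IllegalStateException("Either 'name' or 'value' must be provided in @FeignClient");
    }

    // Every specification bean is named <name>.FeignClientSpecification.
    static String specBeanName(String name) {
        return name + ".FeignClientSpecification";
    }

    public static void main(String[] args) {
        Map<String, String> attrs = new LinkedHashMap<>();
        attrs.put("name", "payment-service");
        attrs.put("contextId", "paymentClient1");
        // Name derived from a @FeignClient: contextId wins over name.
        System.out.println(specBeanName(clientName(attrs)));
        // Name derived from @EnableFeignClients placed on a.b.Main.
        System.out.println(specBeanName("default." + "a.b.Main"));
    }
}
```

Because contextId is checked first, two clients that point at the same service can still get distinct specification beans by giving each its own contextId.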
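During startup every @FeignClient interface is also registered in the container through a FactoryBean, FeignClientFactoryBean. The definition is autowired by type and marked lazyInit, so it is not initialized immediately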
Instantiating FeignClient interfaces
As noted above, every client is registered through a FactoryBean, so the place to look is its getObject method
@Override
public Object getObject() {
return getTarget();
}
What gets registered is an interface type, so instantiation has to go through a proxy, much as MyBatis does for its mapper interfaces
<T> T getTarget() {
FeignContext context = beanFactory != null ? beanFactory.getBean(FeignContext.class)
: applicationContext.getBean(FeignContext.class);
// Build the request client with the native Feign API
Feign.Builder builder = feign(context);
// No url specified: enable load balancing
if (!StringUtils.hasText(url)) {
// ...
if (!name.startsWith("http")) {
url = "http://" + name;
} else {
url = name;
}
url += cleanPath();
return (T) loadBalance(builder, context, new HardCodedTarget<>(type, name, url));
}
if (StringUtils.hasText(url) && !url.startsWith("http")) {
url = "http://" + url;
}
String url = this.url + cleanPath();
Client client = getOptional(context, Client.class);
if (client != null) {
if (client instanceof FeignBlockingLoadBalancerClient) {
// not load balancing because we have a url,
// but Spring Cloud LoadBalancer is on the classpath, so unwrap
client = ((FeignBlockingLoadBalancerClient) client).getDelegate();
}
if (client instanceof RetryableFeignBlockingLoadBalancerClient) {
// not load balancing because we have a url,
// but Spring Cloud LoadBalancer is on the classpath, so unwrap
client = ((RetryableFeignBlockingLoadBalancerClient) client).getDelegate();
}
builder.client(client);
}
applyBuildCustomizers(context, builder);
Targeter targeter = get(context, Targeter.class);
return (T) targeter.target(this, builder, context, new HardCodedTarget<>(type, name, url));
}
FeignContext
FeignContext is registered by OpenFeign's auto-configuration
public class FeignContext extends NamedContextFactory<FeignClientSpecification> {
// name: the contextId
// type: the requested bean type, e.g. Client.class
public <T> T getInstance(String name, Class<T> type) {
// Creates the AnnotationConfigApplicationContext on first access
AnnotationConfigApplicationContext context = getContext(name);
try {
return context.getBean(type);
} catch (NoSuchBeanDefinitionException e) {
// ignore
}
return null;
}
// Lazily created on first use
protected AnnotationConfigApplicationContext getContext(String name) {
if (!this.contexts.containsKey(name)) {
synchronized (this.contexts) {
if (!this.contexts.containsKey(name)) {
this.contexts.put(name, createContext(name));
}
}
}
return this.contexts.get(name);
}
}
createContext
A separate ApplicationContext is created for each contextId
protected AnnotationConfigApplicationContext createContext(String name) {
// 1. Create the AnnotationConfigApplicationContext
AnnotationConfigApplicationContext context;
if (this.parent != null) {
DefaultListableBeanFactory beanFactory = new DefaultListableBeanFactory();
if (parent instanceof ConfigurableApplicationContext) {
beanFactory.setBeanClassLoader(
((ConfigurableApplicationContext) parent).getBeanFactory().getBeanClassLoader());
} else {
beanFactory.setBeanClassLoader(parent.getClassLoader());
}
context = new AnnotationConfigApplicationContext(beanFactory);
context.setClassLoader(this.parent.getClassLoader());
} else {
context = new AnnotationConfigApplicationContext();
}
// 2. Register the configuration classes, which come from
// the defaultConfiguration attribute of @EnableFeignClients and
// the configuration attribute of @FeignClient
if (this.configurations.containsKey(name)) {
for (Class<?> configuration : this.configurations.get(name).getConfiguration()) {
context.register(configuration);
}
}
// Only the defaultConfiguration of @EnableFeignClients has a name starting with "default."
for (Map.Entry<String, C> entry : this.configurations.entrySet()) {
if (entry.getKey().startsWith("default.")) {
for (Class<?> configuration : entry.getValue().getConfiguration()) {
context.register(configuration);
}
}
}
context.register(PropertyPlaceholderAutoConfiguration.class, this.defaultConfigType);
context.getEnvironment().getPropertySources().addFirst(new MapPropertySource(this.propertySourceName,
Collections.<String, Object>singletonMap(this.propertyName, name)));
if (this.parent != null) {
// Uses Environment from parent as well as beans
context.setParent(this.parent);
}
context.setDisplayName(generateDisplayName(name));
// Refresh the context
context.refresh();
return context;
}
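Why create an ApplicationContext for each contextId?
The visible effect is that each contextId gets its own set of configuration beans, layered over the defaults. My own reading is that it also targets the case where one logical client is spread across several files, for example
@FeignClient(contextId = "payment", configuration = ConfigA.class)
interface ClientA {}
@FeignClient(contextId = "payment", configuration = ConfigB.class)
interface ClientB {}
The intent here is that ClientA and ClientB resolve their components from the same ApplicationContext, the one named payment. Note, however, that both interfaces register their specification under the same bean name, payment.FeignClientSpecification, so with Spring Boot's default settings the second registration is rejected unless bean definition overriding is enabled, and with overriding enabled the later definition replaces the earlier one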
this.configurations
A note on this.configurations used here: the two kinds of FeignClientSpecification that @EnableFeignClients registers during startup, as described earlier, come into play at this point
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(Feign.class)
@EnableConfigurationProperties({ FeignClientProperties.class, FeignHttpClientProperties.class, FeignEncoderProperties.class })
public class FeignAutoConfiguration {
// ...
@Autowired(required = false)
private List<FeignClientSpecification> configurations
= new ArrayList<>();
@Bean
public FeignContext feignContext() {
FeignContext context = new FeignContext();
context.setConfigurations(this.configurations);
return context;
}
}
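One question came up while reading this code: how is the initialization order guaranteed?
The feignContext method must be called after configurations has been injected, and that injection in turn requires the FeignClientSpecification beans to be in the container before FeignAutoConfiguration is autowired. These 3 steps have to happen in that order for things to work
As shown earlier, the FeignClientSpecification bean definitions are registered through ImportBeanDefinitionRegistrar. Registrars and @Configuration classes are both handled by ConfigurationClassPostProcessor, a bean factory post-processor that runs before ordinary beans are instantiated, while @Autowired fields are filled in later by a bean post-processor when a bean instance is created
My understanding of the sequence is as follows
- ConfigurationClassPostProcessor processes the class carrying @EnableFeignClients (usually the startup class) and invokes FeignClientsRegistrar, so the FeignClientSpecification bean definitions are registered in this phase
- In the same phase FeignAutoConfiguration is recognized as a configuration class and a bean definition for the feignContext method is registered, but the method is not called yet
- When the FeignAutoConfiguration instance is created, the autowiring post-processor finds that FeignClientSpecification instances must be injected, so those objects are instantiated at that point
- Only then does the getTarget method of FeignClientFactoryBean call BeanFactory.getBean, which invokes feignContext and creates the FeignContext object
So the execution order is guaranteed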
Client
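Looking again at the load-balancing code that creates the FeignClient: it fetches the bean of type Client.class from the ApplicationContext created by FeignContext, then uses it with the native Feign API to build the request proxy object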
protected <T> T loadBalance(Feign.Builder builder, FeignContext context, HardCodedTarget<T> target) {
Client client = getOptional(context, Client.class);
if (client != null) {
builder.client(client);
applyBuildCustomizers(context, builder);
Targeter targeter = get(context, Targeter.class);
return targeter.target(this, builder, context, target);
}
throw new IllegalStateException(
"No Feign Client for loadBalancing defined. Did you forget to include spring-cloud-starter-loadbalancer?");
}
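Client is an interface from native Feign
The context created by FeignContext does not register any bean of type Client, so how is it found?
The reason is that this context has a parent, which is the context the application started with
The lookup therefore falls through to the current Spring application context, which means a Client bean has to be registered there. Where does that bean come from?
It is only registered when Spring Cloud LoadBalancer is on the classpath, which is why spring-cloud-starter-openfeign 3.1.8 fails with the error above when the spring-cloud-starter-loadbalancer dependency is missing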
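The parent fallback described above can be illustrated without Spring. The following is a simplified sketch under stated assumptions: NamedContexts, Context, Client and Encoder are hypothetical stand-ins for FeignContext, the child ApplicationContext, feign.Client and feign.codec.Encoder, and a ConcurrentHashMap replaces the double-checked locking of the real getContext.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class NamedContexts {

    // Hypothetical bean types standing in for feign.Client and feign.codec.Encoder.
    static class Client {
        final String id;
        Client(String id) { this.id = id; }
    }

    static class Encoder {
        final String id;
        Encoder(String id) { this.id = id; }
    }

    // A toy context: beans keyed by type, with an optional parent to fall back to.
    static class Context {
        private final Map<Class<?>, Object> beans = new HashMap<>();
        private final Context parent;

        Context(Context parent) {
            this.parent = parent;
        }

        <T> void register(Class<T> type, T bean) {
            beans.put(type, bean);
        }

        <T> T getBean(Class<T> type) {
            Object bean = beans.get(type);
            if (bean != null) {
                return type.cast(bean);
            }
            // Not found locally: ask the parent, or give up with null.
            return parent != null ? parent.getBean(type) : null;
        }
    }

    private final Context parent;
    private final Map<String, Context> contexts = new ConcurrentHashMap<>();

    NamedContexts(Context parent) {
        this.parent = parent;
    }

    // One child context per name, created lazily on first access.
    Context getContext(String name) {
        return contexts.computeIfAbsent(name, n -> new Context(parent));
    }

    <T> T getInstance(String name, Class<T> type) {
        return getContext(name).getBean(type);
    }

    public static void main(String[] args) {
        Context app = new Context(null);
        app.register(Client.class, new Client("load-balanced"));

        NamedContexts factory = new NamedContexts(app);
        factory.getContext("payment").register(Encoder.class, new Encoder("payment-only"));

        // Client is not in the child context, so it is resolved from the parent.
        System.out.println(factory.getInstance("payment", Client.class).id);
        // Encoder was registered only for "payment".
        System.out.println(factory.getInstance("payment", Encoder.class).id);
        System.out.println(factory.getInstance("order", Encoder.class));
    }
}
```

In this sketch a missing Client in the parent simply yields null, which corresponds to the branch of loadBalance that throws the IllegalStateException.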
FeignBuilder
protected Feign.Builder feign(FeignContext context) {
FeignLoggerFactory loggerFactory = get(context, FeignLoggerFactory.class);
Logger logger = loggerFactory.create(type);
// @formatter:off
Feign.Builder builder = get(context, Feign.Builder.class)
// required values
.logger(logger)
.encoder(get(context, Encoder.class))
.decoder(get(context, Decoder.class))
.contract(get(context, Contract.class));
// @formatter:on
configureFeign(context, builder);
return builder;
}
Load balancing in loadBalance
Now for the load-balancing implementation
protected <T> T loadBalance(Feign.Builder builder, FeignContext context, HardCodedTarget<T> target) {
// Effectively calls context.getInstance(contextId, type);
// contextId is the contextId property set on this FeignClientFactoryBean
Client client = getOptional(context, Client.class);
if (client != null) {
builder.client(client);
applyBuildCustomizers(context, builder);
Targeter targeter = get(context, Targeter.class);
return targeter.target(this, builder, context, target);
}
throw new IllegalStateException(
"No Feign Client for loadBalancing defined. Did you forget to include spring-cloud-starter-loadbalancer?");
}
Targeter
org.springframework.cloud.openfeign.Targeter
The default implementation calls Feign.Builder#target
class DefaultTargeter implements Targeter {
@Override
public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign, FeignContext context,
Target.HardCodedTarget<T> target) {
return feign.target(target);
}
}
The implementation in feign.Feign.Builder
public <T> T target(Target<T> target) {
return build().newInstance(target);
}
public Feign build() {
super.enrich();
SynchronousMethodHandler.Factory synchronousMethodHandlerFactory =
new SynchronousMethodHandler.Factory(client, retryer, requestInterceptors,
responseInterceptor, logger, logLevel, dismiss404, closeAfterDecode,
propagationPolicy, forceDecoding);
ParseHandlersByName handlersByName =
new ParseHandlersByName(contract, options, encoder, decoder, queryMapEncoder,
errorDecoder, synchronousMethodHandlerFactory);
return new ReflectiveFeign(handlersByName, invocationHandlerFactory, queryMapEncoder);
}
The other implementation adds circuit breaker support
org.springframework.cloud.openfeign.FeignCircuitBreakerTargeter
It is not covered here
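Creating the interface proxy: Feign#newInstance
The interface proxy is created by Feign#newInstance. Feign is an abstract class with two implementations:
- AsyncFeign: including ReflectiveAsyncFeign
- ReflectiveFeign
AsyncFeign is the asynchronous variant and is still marked experimental, so in practice ReflectiveFeign is the implementation normally used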
@Experimental
public abstract class AsyncFeign<C> extends Feign
Now look at the feign.ReflectiveFeign#newInstance method:
@Override
public <T> T newInstance(Target<T> target) {
Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>();
List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>();
for (Method method : target.type().getMethods()) {
if (method.getDeclaringClass() == Object.class) {
continue;
} else if (Util.isDefault(method)) {
// Handle default methods
DefaultMethodHandler handler = new DefaultMethodHandler(method);
defaultMethodHandlers.add(handler);
methodToHandler.put(method, handler);
} else {
methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
}
}
InvocationHandler handler = factory.create(target, methodToHandler);
T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(),
new Class<?>[] {target.type()}, handler);
// Bind default methods to the proxy
for (DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) {
defaultMethodHandler.bindTo(proxy);
}
return proxy;
}
The call path of an interface method
Proxy execution
static class FeignInvocationHandler implements InvocationHandler {
private final Target target;
private final Map<Method, MethodHandler> dispatch;
FeignInvocationHandler(Target target, Map<Method, MethodHandler> dispatch) {
this.target = checkNotNull(target, "target");
this.dispatch = checkNotNull(dispatch, "dispatch for %s", target);
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if ("equals".equals(method.getName())) {
try {
Object otherHandler = args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
return equals(otherHandler);
} catch (IllegalArgumentException e) {
return false;
}
} else if ("hashCode".equals(method.getName())) {
return hashCode();
} else if ("toString".equals(method.getName())) {
return toString();
}
return dispatch.get(method).invoke(args);
}
}
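The combination of newInstance and FeignInvocationHandler boils down to a JDK dynamic proxy that looks up a handler per Method. The sketch below shows only that mechanism, assuming a hypothetical GreetingClient interface and a handler that returns a string instead of performing an HTTP call; ProxyDispatchSketch and its MethodHandler are illustrative stand-ins, not Feign classes.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.LinkedHashMap;
import java.util.Map;

class ProxyDispatchSketch {

    // Stand-in for InvocationHandlerFactory.MethodHandler.
    interface MethodHandler {
        Object invoke(Object[] args) throws Throwable;
    }

    // Hypothetical client interface, used only for this illustration.
    interface GreetingClient {
        String greet(String name);
    }

    @SuppressWarnings("unchecked")
    static <T> T newInstance(Class<T> type, Map<Method, MethodHandler> dispatch) {
        InvocationHandler handler = (proxy, method, args) -> {
            // equals, hashCode and toString never reach the dispatch map.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "equals":
                        return proxy == args[0];
                    case "hashCode":
                        return System.identityHashCode(proxy);
                    default:
                        return "proxy for " + type.getName();
                }
            }
            // Every interface method is routed to the handler registered for it.
            return dispatch.get(method).invoke(args);
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }

    static GreetingClient create() {
        try {
            Method greet = GreetingClient.class.getMethod("greet", String.class);
            Map<Method, MethodHandler> dispatch = new LinkedHashMap<>();
            // A real handler would build and execute an HTTP request here.
            dispatch.put(greet, args -> "hello, " + args[0]);
            return newInstance(GreetingClient.class, dispatch);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(create().greet("feign"));
    }
}
```

Keying the map by Method means the per-method work (parsing annotations, building templates) can be done once when the proxy is created rather than on every call.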
InvocationHandlerFactory.MethodHandler
interface MethodHandler {
Object invoke(Object[] argv) throws Throwable;
}
SynchronousMethodHandler
@Override
public Object invoke(Object[] argv) throws Throwable {
RequestTemplate template = buildTemplateFromArgs.create(argv);
Options options = findOptions(argv);
Retryer retryer = this.retryer.clone();
while (true) {
try {
return executeAndDecode(template, options);
} catch (RetryableException e) {
try {
retryer.continueOrPropagate(e);
} catch (RetryableException th) {
Throwable cause = th.getCause();
if (propagationPolicy == UNWRAP && cause != null) {
throw cause;
} else {
throw th;
}
}
if (logLevel != Logger.Level.NONE) {
logger.logRetry(metadata.configKey(), logLevel);
}
continue;
}
}
}
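The retry loop in invoke follows a simple pattern: clone the retryer per call, try, and let the retryer decide whether a RetryableException is rethrown or swallowed. A minimal sketch of that pattern follows; Retryer, RetryableException and Call here are simplified hypothetical types, the retryer only counts attempts, and the back-off sleep of a real retryer is left out.

```java
class RetryLoopSketch {

    // Simplified stand-in for feign.RetryableException.
    static class RetryableException extends RuntimeException {
        RetryableException(String message) {
            super(message);
        }
    }

    // Simplified stand-in for a retryer that allows a fixed number of attempts.
    static class Retryer {
        private final int maxAttempts;
        private int attempt = 1;

        Retryer(int maxAttempts) {
            this.maxAttempts = maxAttempts;
        }

        // Rethrows once the attempts are used up; otherwise returns so the loop continues.
        void continueOrPropagate(RetryableException e) {
            if (attempt++ >= maxAttempts) {
                throw e;
            }
        }

        // Each invocation works on its own copy so attempt counters are not shared.
        Retryer copy() {
            return new Retryer(maxAttempts);
        }
    }

    interface Call<T> {
        T execute();
    }

    static <T> T invoke(Call<T> call, Retryer prototype) {
        Retryer retryer = prototype.copy();
        while (true) {
            try {
                return call.execute();
            } catch (RetryableException e) {
                retryer.continueOrPropagate(e);
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = invoke(() -> {
            if (++calls[0] < 3) {
                throw new RetryableException("attempt " + calls[0] + " failed");
            }
            return "succeeded on attempt " + calls[0];
        }, new Retryer(3));
        System.out.println(result);
    }
}
```

Copying the retryer at the start of each call is what keeps concurrent invocations of the same method from sharing one attempt counter.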
executeAndDecode
Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
Request request = targetRequest(template);
if (logLevel != Logger.Level.NONE) {
logger.logRequest(metadata.configKey(), logLevel, request);
}
Response response;
long start = System.nanoTime();
try {
response = client.execute(request, options);
// ensure the request is set. TODO: remove in Feign 12
response = response.toBuilder()
.request(request)
.requestTemplate(template)
.build();
} catch (IOException e) {
if (logLevel != Logger.Level.NONE) {
logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
}
throw errorExecuting(request, e);
}
long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
if (decoder != null) {
return responseInterceptor
.aroundDecode(new InvocationContext(decoder, metadata.returnType(), response));
}
CompletableFuture<Object> resultFuture = new CompletableFuture<>();
asyncResponseHandler.handleResponse(resultFuture, metadata.configKey(), response,
metadata.returnType(), elapsedTime);
try {
if (!resultFuture.isDone())
throw new IllegalStateException("Response handling not done");
return resultFuture.join();
} catch (CompletionException e) {
Throwable cause = e.getCause();
if (cause != null)
throw cause;
throw e;
}
}
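Creating the request
The code below belongs to the native Feign API and is not the focus here, so it is covered only briefly
The Request object is built from the RequestTemplate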
Request request = targetRequest(template);
Request targetRequest(RequestTemplate template) {
for (RequestInterceptor interceptor : requestInterceptors) {
interceptor.apply(template);
}
return target.apply(template);
}
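The target instance is a feign.Target.HardCodedTarget. Its apply method sets the target URL on the template and then calls RequestTemplate#request, shown here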
public Request request() {
if (!this.resolved) {
throw new IllegalStateException("template has not been resolved.");
}
// loadBalance(builder, context, new HardCodedTarget<>(type, name, url));
// The url used by HardCodedTarget is the url of FeignClientFactoryBean
return Request.create(this.method, this.url(), this.headers(), this.body, this);
}
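Why is the first Feign call slow?
Comparing the code above, the first call and the second call do not look any different
The difference actually lies in the statement response = client.execute(request, options); and depends on the concrete implementation of client (the Client interface mentioned earlier). The implementations fall into these groups:
- feign.Client.Default: and its subclass feign.Client.Proxied
- FeignBlockingLoadBalancerClient
- RetryableFeignBlockingLoadBalancerClient
FeignBlockingLoadBalancerClient and RetryableFeignBlockingLoadBalancerClient both live in the org.springframework.cloud.openfeign.loadbalancer package and rely on spring-cloud-loadbalancer to do the load balancing
First recall how Feign makes a remote call, and how the registry, the load balancer and the FeignClient relate to each other: each microservice registers itself with a registry server such as Eureka or Nacos, and the FeignClient then makes a load-balanced call against the registered instances. That is roughly the whole process, so the slowness comes from the load-balancing step
Earlier versions used Ribbon for load balancing, see https://juejin.cn/post/7249624466150408250. This article only covers load balancing based on spring-cloud-loadbalancer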
Load balancing
The load balancing component used here is spring-cloud-loadbalancer, not Ribbon
https://docs.spring.io/spring-cloud-commons/reference/spring-cloud-commons/loadbalancer.html
Which load-balancing Client implementation is used is decided by the DefaultFeignLoadBalancerConfiguration configuration class
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(LoadBalancerClientsProperties.class)
class DefaultFeignLoadBalancerConfiguration {
@Bean
@ConditionalOnMissingBean
@Conditional(OnRetryNotEnabledCondition.class)
public Client feignClient(LoadBalancerClient loadBalancerClient,
LoadBalancerClientFactory loadBalancerClientFactory) {
return new FeignBlockingLoadBalancerClient(new Client.Default(null, null), loadBalancerClient,
loadBalancerClientFactory);
}
@Bean
@ConditionalOnMissingBean
@ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
@ConditionalOnBean(LoadBalancedRetryFactory.class)
@ConditionalOnProperty(value = "spring.cloud.loadbalancer.retry.enabled", havingValue = "true",
matchIfMissing = true)
public Client feignRetryClient(LoadBalancerClient loadBalancerClient,
LoadBalancedRetryFactory loadBalancedRetryFactory, LoadBalancerClientFactory loadBalancerClientFactory) {
return new RetryableFeignBlockingLoadBalancerClient(new Client.Default(null, null), loadBalancerClient,
loadBalancedRetryFactory, loadBalancerClientFactory);
}
}
// FeignBlockingLoadBalancerClient#execute
@Override
public Response execute(Request request, Request.Options options) throws IOException {
final URI originalUri = URI.create(request.url());
// i.e. the name attribute of @FeignClient
String serviceId = originalUri.getHost();
Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUri);
String hint = getHint(serviceId);
// Wrap the request as a load balancer request
DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(
new RequestDataContext(buildRequestData(request), hint));
// An extension point: lifecycle callbacks
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
.getSupportedLifecycleProcessors(
loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
RequestDataContext.class, ResponseData.class, ServiceInstance.class);
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
// Choose an instance to call
ServiceInstance instance = loadBalancerClient.choose(serviceId, lbRequest);
// Wrap the chosen instance in a response object
org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(
instance);
if (instance == null) {
String message = "Load balancer does not contain an instance for the service " + serviceId;
if (LOG.isWarnEnabled()) {
LOG.warn(message);
}
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
.onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
CompletionContext.Status.DISCARD, lbRequest, lbResponse)));
return Response.builder().request(request).status(HttpStatus.SERVICE_UNAVAILABLE.value())
.body(message, StandardCharsets.UTF_8).build();
}
// Rebuild the URL so that the request is forwarded to the chosen instance
String reconstructedUrl = loadBalancerClient.reconstructURI(instance, originalUri).toString();
// Build the new request
Request newRequest = buildRequest(request, reconstructedUrl);
LoadBalancerProperties loadBalancerProperties = loadBalancerClientFactory.getProperties(serviceId);
return executeWithLoadBalancerLifecycleProcessing(delegate, options, newRequest, lbRequest, lbResponse,
supportedLifecycleProcessors, loadBalancerProperties.isUseRawStatusCodeInResponseData());
}
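Request URL
A word on the request URL: for a load-balanced call the URL has the form http://${name}/<path and parameters>
For example, if the name attribute of @FeignClient is a service name, the URL is http://payment-service/api/payment/create, as shown below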
@FeignClient(contextId = "paymentClient1", name = "payment-service")
public interface FeignPaymentService {
@PostMapping("/api/payment/create")
Map<String, Object> createPayment(Map<String, Object> map);
}
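To make the URL rewriting concrete, the sketch below extracts the service id from the host part and then swaps in the host and port of a chosen instance using java.net.URI. It is an assumption-laden illustration: UriReconstructSketch and reconstruct are hypothetical names, the instance address is made up, and the real reconstructURI also deals with details such as the scheme of secure instances.

```java
import java.net.URI;
import java.net.URISyntaxException;

class UriReconstructSketch {

    // Keep scheme, path and query; replace only host and port with the chosen instance.
    static URI reconstruct(String instanceHost, int instancePort, URI original) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), instanceHost, instancePort,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        URI original = URI.create("http://payment-service/api/payment/create?id=1");
        // The service id is read from the host part of the URL.
        System.out.println(original.getHost());
        // Hypothetical instance address returned by the load balancer.
        System.out.println(reconstruct("10.0.0.5", 8081, original));
    }
}
```

One practical consequence of reading the service id with getHost: java.net.URI does not accept an underscore in a host name, so for a name such as payment_service getHost returns null and the Assert.state check in execute fails.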
Choosing an instance
@Override
public <T> ServiceInstance choose(String serviceId, Request<T> request) {
// The load balancer, i.e. the load balancing strategy
ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
if (loadBalancer == null) {
return null;
}
Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
if (loadBalancerResponse == null) {
return null;
}
return loadBalancerResponse.getServer();
}
ReactiveLoadBalancer
The org.springframework.cloud.client.loadbalancer.reactive.ReactiveLoadBalancer interface defines the load balancing algorithm
spring-cloud-loadbalancer ships two implementations
- RandomLoadBalancer: random selection
- RoundRobinLoadBalancer: round robin
RoundRobinLoadBalancer
@Override
// see original
// https://github.com/Netflix/ocelli/blob/master/ocelli-core/
// src/main/java/netflix/ocelli/loadbalancer/RoundRobinLoadBalancer.java
public Mono<Response<ServiceInstance>> choose(Request request) {
// ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider
// Fetches all registered service instances through the registry
ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
.getIfAvailable(NoopServiceInstanceListSupplier::new);
return supplier.get(request).next()
.map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
}
Round robin works as follows: increment the current position pos by 1, then take it modulo the number of service instances
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
if (instances.isEmpty()) {
return new EmptyResponse();
}
// Do not move position when there is only 1 instance,
// especially some suppliers have already filtered instances
if (instances.size() == 1) {
return new DefaultResponse(instances.get(0));
}
// Ignore the sign bit, this allows pos to loop sequentially from 0 to
// Integer.MAX_VALUE
int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;
ServiceInstance instance = instances.get(pos % instances.size());
return new DefaultResponse(instance);
}
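The masking with Integer.MAX_VALUE is the subtle part of this method: it clears the sign bit so that the index stays non-negative even after the counter overflows. A self-contained sketch of the same arithmetic, using a hypothetical RoundRobinSketch class over plain strings and an explicit seed for the counter:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinSketch {

    private final AtomicInteger position;

    RoundRobinSketch(int seed) {
        this.position = new AtomicInteger(seed);
    }

    <T> T choose(List<T> instances) {
        if (instances.isEmpty()) {
            return null;
        }
        // A single instance never moves the position.
        if (instances.size() == 1) {
            return instances.get(0);
        }
        // Clearing the sign bit keeps pos non-negative when the counter wraps around.
        int pos = position.incrementAndGet() & Integer.MAX_VALUE;
        return instances.get(pos % instances.size());
    }

    public static void main(String[] args) {
        List<String> instances = Arrays.asList("a", "b", "c");
        RoundRobinSketch lb = new RoundRobinSketch(0);
        for (int i = 0; i < 4; i++) {
            System.out.println(lb.choose(instances));
        }
        // Start at the largest int so that the next increment overflows.
        RoundRobinSketch wrapping = new RoundRobinSketch(Integer.MAX_VALUE);
        System.out.println(wrapping.choose(instances));
    }
}
```

Without the mask, the overflowed counter would be negative, the remainder would be negative as well, and instances.get would throw an IndexOutOfBoundsException.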
ServiceInstanceListSupplier
When a registry is in use, the load balancer auto-configuration provides two beans of type ServiceInstanceListSupplier
Taking the RoundRobinLoadBalancer above as an example, it is registered in LoadBalancerClientConfiguration
@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
public class LoadBalancerClientConfiguration {
private static final int REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER = 193827465;
@Bean
@ConditionalOnMissingBean
public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
LoadBalancerClientFactory loadBalancerClientFactory) {
String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
return new RoundRobinLoadBalancer(
loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
}
}
// loadBalancerClientFactory.getLazyProvider
public <T> ObjectProvider<T> getLazyProvider(String name, Class<T> type) {
return new ClientFactoryObjectProvider<>(this, name, type);
}
public class LoadBalancerClientFactory extends NamedContextFactory<LoadBalancerClientSpecification>
implements ReactiveLoadBalancer.Factory<ServiceInstance> {
}
Back to the choose method of RoundRobinLoadBalancer
public Mono<Response<ServiceInstance>> choose(Request request) {
// The type of serviceInstanceListSupplierProvider is
// ClientFactoryObjectProvider
ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
.getIfAvailable(NoopServiceInstanceListSupplier::new);
return supplier.get(request).next()
.map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
}
// ClientFactoryObjectProvider
class ClientFactoryObjectProvider<T> implements ObjectProvider<T> {
// LoadBalancerClientFactory
private final NamedContextFactory<?> clientFactory;
@Override
public T getIfAvailable(Supplier<T> defaultSupplier) throws BeansException {
return delegate().getIfAvailable(defaultSupplier);
}
private ObjectProvider<T> delegate() {
if (this.provider == null) {
this.provider = this.
clientFactory.getProvider(this.name, this.type);
}
return this.provider;
}
}
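LoadBalancerClientFactory extends NamedContextFactory<LoadBalancerClientSpecification>, just like the FeignContext seen earlier
The registration flow is the same as for FeignContext, except that the configuration classes are replaced by the two load balancer ones (shown in a figure in the original post)
CachingServiceInstanceListSupplier sets up its instance cache in the constructor: the first lookup misses the cache and fetches all registered instances from the delegate, and later calls read them straight from the internal cache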
public class CachingServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier {
private static final Log log = LogFactory.getLog(CachingServiceInstanceListSupplier.class);
/**
* Name of the service cache instance.
*/
public static final String SERVICE_INSTANCE_CACHE_NAME = CachingServiceInstanceListSupplier.class.getSimpleName()
+ "Cache";
// The list of service instances
private final Flux<List<ServiceInstance>> serviceInstances;
@SuppressWarnings("unchecked")
public CachingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, CacheManager cacheManager) {
super(delegate);
this.serviceInstances = CacheFlux.lookup(key -> {
// TODO: configurable cache name
Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
if (cache == null) {
if (log.isErrorEnabled()) {
log.error("Unable to find cache: " + SERVICE_INSTANCE_CACHE_NAME);
}
return Mono.empty();
}
List<ServiceInstance> list = cache.get(key, List.class);
if (list == null || list.isEmpty()) {
return Mono.empty();
}
return Flux.just(list).materialize().collectList();
}, delegate.getServiceId()).onCacheMissResume(delegate.get().take(1))
.andWriteWith((key, signals) -> Flux.fromIterable(signals).dematerialize().doOnNext(instances -> {
Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
if (cache == null) {
if (log.isErrorEnabled()) {
log.error("Unable to find cache for writing: " + SERVICE_INSTANCE_CACHE_NAME);
}
}
else {
cache.put(key, instances);
}
}).then());
}
@Override
public Flux<List<ServiceInstance>> get() {
return serviceInstances;
}
}
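The caching behaviour, and with it the reason the first call pays for the registry lookup, can be shown with a much simpler cache-aside sketch. Everything here is hypothetical: CachingSupplierSketch, the Function standing in for the delegate supplier, and the made-up instance addresses. It is blocking rather than reactive and has no expiry, unlike a real cache.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class CachingSupplierSketch {

    // Stands in for the delegate that queries the registry.
    private final Function<String, List<String>> delegate;
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    int delegateCalls = 0;

    CachingSupplierSketch(Function<String, List<String>> delegate) {
        this.delegate = delegate;
    }

    List<String> get(String serviceId) {
        // Cache miss: ask the delegate once and remember the answer.
        return cache.computeIfAbsent(serviceId, id -> {
            delegateCalls++;
            return delegate.apply(id);
        });
    }

    public static void main(String[] args) {
        CachingSupplierSketch supplier = new CachingSupplierSketch(
                id -> Arrays.asList(id + "@10.0.0.5:8081", id + "@10.0.0.6:8081"));
        supplier.get("payment-service"); // first call reaches the delegate
        supplier.get("payment-service"); // second call is served from the cache
        System.out.println("delegate calls: " + supplier.delegateCalls);
    }
}
```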
ServiceInstance
Holds the information of each service instance