前言:
在开发 Spring Cloud 微服务项目时候,Feign 调用是非常常见的,Feign 调用的底层还是 HTTP 的远程调用,会有超时问题,如果没有搞清楚超时问题,生产环境的调用肯那个会有种种问题出现,本篇我们来分享一下 Feign 调用的超时配置。
连接超时和读取超时配置
- ConnectTimeout(连接超时):Feign 是基于 HTTP 的远程调用,众所周知,HTTP 调用会进行 TCP 的三次握手,连接超时时间,就是多少秒没连接上,就会抛出超时异常,Feign 连接超时默认 10 秒。
- ReadTimeout(读取超时):HTTP 成功连接后,客户端发会送请求报文,服务端收到后解析并返回响应报文,在写出响应报文时,如果超过了设置的时间还没写完,就会抛出读取超时异常,在某些接口请求数据量大的时候,就很容易出现读取超时,Feign 读取超时默认 60 秒。
超时演示
我们在被 Feign 调用的接口中让线程 sleep 61 秒,调用者服务就抛出超时异常了。
Feign 默认超时时间源码
通过源码 debugger 我们可以知道 Feign 的默认读取超时时间是 60 秒,默认连接超时时间是 10 秒。
在这里插入代码片
自定义 Feign 超时时间
Feign 支持在 ribbon 或者 feign 配置项下配置超时时间,feign 下配置优先级最高,但是新版 Spring Cloud 已经移除了 ribbon,因此建议配置在 feign中,如下配置:
#默认连接超时时间
feign.client.config.default.connect-timeout=5000
#默认读取超时时间
feign.client.config.default.read-timeout=3000
debugger 如下图,Feign 超时配置已经生效,default 表示作用于所有客户端,也可替换 default 为具体的客户端名称,表示作用于单个客户端,可以给每个客户端配置不同的超时时间。
给指定的服务端配置超时时间如下:
#默认连接超时时间
feign.client.config.order-service.connect-timeout=4000
#默认读取超时时间
feign.client.config.order-service.read-timeout=4000
debugger 验证如下:
Feign 源码分析
Feign 参数的初始化
Feign 通过接口生成代理对象,扫描到 Feign 接口构建代理对象,在 Feign#builder 创建构建者时,Feign 客户端相关的参数都是在这个时候初始化的,超时时间也是在这个时候初始化的,Feign#builder 会创建一个 Options 对象,源码如下:
//feign.Feign.Builder#Builder
public Builder() {
//日志级别
this.logLevel = Level.NONE;
this.contract = new Default();
//客户端
this.client = new feign.Client.Default((SSLSocketFactory)null, (HostnameVerifier)null);
//重试器
this.retryer = new feign.Retryer.Default();
//日志
this.logger = new NoOpLogger();
//编码器
this.encoder = new feign.codec.Encoder.Default();
//解码器
this.decoder = new feign.codec.Decoder.Default();
this.queryMapEncoder = new FieldQueryMapEncoder();
//错误解码器
this.errorDecoder = new feign.codec.ErrorDecoder.Default();
//超时配置
this.options = new Options();
//处理器工厂
this.invocationHandlerFactory = new feign.InvocationHandlerFactory.Default();
this.closeAfterDecode = true;
//传播策略
this.propagationPolicy = ExceptionPropagationPolicy.NONE;
//是否强制解码
this.forceDecoding = false;
this.capabilities = new ArrayList();
}
前面我们说 Feign 的默认连接超时时间是 10 秒,默认读取超时时间是 60 秒,我们来从源码证明,Options 构造方法源码如下:
//feign.Request.Options#Options()
public Options() {
this(10L, TimeUnit.SECONDS, 60L, TimeUnit.SECONDS, true);
}
//feign.Request.Options#Options(long, java.util.concurrent.TimeUnit, long, java.util.concurrent.TimeUnit, boolean)
public Options(long connectTimeout, TimeUnit connectTimeoutUnit, long readTimeout, TimeUnit readTimeoutUnit, boolean followRedirects) {
this.connectTimeout = connectTimeout;
this.connectTimeoutUnit = connectTimeoutUnit;
this.readTimeout = readTimeout;
this.readTimeoutUnit = readTimeoutUnit;
this.followRedirects = followRedirects;
}
从 Options 构造方法源码中可以证明 Feign 的默认超时时间。
SentinelFeign.Builder#build 方法源码分析
SentinelFeign.Builder#build 方法主要逻辑如下:
- 获取应用程序上下文。
- 通过应用程序上下文获取到 target 对象的 BeanDefinition 。
- 通过 BeanDefinition 获取到 FeignClientFactoryBean 。
- 获取 FallBack 类和工厂。
- 创建代理对象的处理器 SentinelInvocationHandler(项目中引入了 Sentinel)。
//com.alibaba.cloud.sentinel.feign.SentinelFeign.Builder#build
public Feign build() {
super.invocationHandlerFactory(new InvocationHandlerFactory() {
public InvocationHandler create(Target target, Map<Method, MethodHandler> dispatch) {
//target 就是我们定义的 feign 接口 例如 OrderFeign
//dispatch 就是我们的接口方法 例如 com.user.service.feign.OrderFeign.queryOrder()
//通用应用程序上下文
GenericApplicationContext gctx = (GenericApplicationContext)Builder.this.applicationContext;
//target 对象的 beanDefinition 对象
BeanDefinition def = gctx.getBeanDefinition(target.type().getName());
//FeignClient bean 工厂
FeignClientFactoryBean feignClientFactoryBean = (FeignClientFactoryBean)def.getAttribute("feignClientsRegistrarFactoryBean");
//Feign 接口中配置的的 fallback
Class fallback = feignClientFactoryBean.getFallback();
//fallback 工厂
Class fallbackFactory = feignClientFactoryBean.getFallbackFactory();
//目标服务id order-service
String beanName = feignClientFactoryBean.getContextId();
if (!StringUtils.hasText(beanName)) {
beanName = feignClientFactoryBean.getName();
}
//创建 代理对象处理器 SentinelInvocationHandler
if (Void.TYPE != fallback) {
//获取 fallback 实例
Object fallbackInstance = this.getFromContext(beanName, "fallback", fallback, target.type());
return new SentinelInvocationHandler(target, dispatch, new org.springframework.cloud.openfeign.FallbackFactory.Default(fallbackInstance));
} else if (Void.TYPE != fallbackFactory) {
FallbackFactory fallbackFactoryInstance = (FallbackFactory)this.getFromContext(beanName, "fallbackFactory", fallbackFactory, FallbackFactory.class);
return new SentinelInvocationHandler(target, dispatch, fallbackFactoryInstance);
} else {
return new SentinelInvocationHandler(target, dispatch);
}
}
//fallback 的处理
private Object getFromContext(String name, String type, Class fallbackType, Class targetType) {
Object fallbackInstance = Builder.this.feignContext.getInstance(name, fallbackType);
if (fallbackInstance == null) {
throw new IllegalStateException(String.format("No %s instance of type %s found for feign client %s", type, fallbackType, name));
} else if (!targetType.isAssignableFrom(fallbackType)) {
throw new IllegalStateException(String.format("Incompatible %s instance. Fallback/fallbackFactory of type %s is not assignable to %s for feign client %s", type, fallbackType, targetType, name));
} else {
return fallbackInstance;
}
}
});
super.contract(new SentinelContractHolder(this.contract));
return super.build();
}
源码 Debugger 如下:
fallback 就是我们接口中配置的 @FeignClient(value = “order-service”,fallback = OrderFeignFallback.class) ,源码 Debugger 如下:
SentinelInvocationHandler#invoke 方法源码分析
我们在分析 Feign 初始化的过程最后创建一个 SentinelInvocationHandler,Feign 的实现是依赖代理的,因此真正执行 Feign 请求的是 SentinelInvocationHandler#invoke 方法,SentinelInvocationHandler#invoke 主要逻辑如下:
- 首先对方法名称进行判断,看是否是 equals、hashCode、toString 这些类型的方法。
- 对代理类对象型进行判断,看代理对象是否是 HardCodedTarget 类型的,是否是 HardCodedTarget 类型的代理对象的区别是否执行 Sentinel 的资源监控,最终都会调用 SynchronousMethodHandler#invoke 方法。
//com.alibaba.cloud.sentinel.feign.SentinelInvocationHandler#invoke
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
//proxy 代理对象 就是我们 Feign 接口的代理对象
//method 就是我们的 Feign 接口的方法
//args Feign 接口参数
//是否是 equals 方法
if ("equals".equals(method.getName())) {
try {
Object otherHandler = args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
return this.equals(otherHandler);
} catch (IllegalArgumentException var21) {
return false;
}
//是否是 hashCode 方法
} else if ("hashCode".equals(method.getName())) {
return this.hashCode();
// 是否是 toString 方法
} else if ("toString".equals(method.getName())) {
return this.toString();
} else {
//都不是 一般我们定义的 feign 接口方法都会走到这里
//MethodHandler 方法处理器
MethodHandler methodHandler = (MethodHandler)this.dispatch.get(method);
Object result;
//target 类型判断
if (!(this.target instanceof HardCodedTarget)) {
//调用方法
result = methodHandler.invoke(args);
} else {
//硬编码target
HardCodedTarget hardCodedTarget = (HardCodedTarget)this.target;
//方法元数据
MethodMetadata methodMetadata = (MethodMetadata)SentinelContractHolder.METADATA_MAP.get(hardCodedTarget.type().getName() + Feign.configKey(hardCodedTarget.type(), method));
//方法元数据为空判断
if (methodMetadata == null) {
result = methodHandler.invoke(args);
} else {
//资源名称 GET:http://order-service/query-order sentinel 资源监控使用
String resourceName = methodMetadata.template().method().toUpperCase() + ":" + hardCodedTarget.url() + methodMetadata.template().path();
Entry entry = null;
Object var12;
try {
Throwable ex;
try {
//进入 sentinel 的逻辑
ContextUtil.enter(resourceName);
entry = SphU.entry(resourceName, EntryType.OUT, 1, args);
//调用方法
result = methodHandler.invoke(args);
return result;
} catch (Throwable var22) {
ex = var22;
if (!BlockException.isBlockException(var22)) {
Tracer.trace(var22);
}
}
if (this.fallbackFactory == null) {
throw var22;
}
try {
//fallback 逻辑
Object fallbackResult = ((Method)this.fallbackMethodMap.get(method)).invoke(this.fallbackFactory.create(ex), args);
var12 = fallbackResult;
} catch (IllegalAccessException var19) {
throw new AssertionError(var19);
} catch (InvocationTargetException var20) {
throw new AssertionError(var20.getCause());
}
} finally {
if (entry != null) {
entry.exit(1, args);
}
ContextUtil.exit();
}
return var12;
}
}
return result;
}
}
SynchronousMethodHandler#invoke 方法源码分析
SynchronousMethodHandler#invoke 方法的逻辑十分简单,构建请求的 RequestTemplate 对象,包括封装参数、路径 、请求方式等信息,超时时间对象、重试对象的获取,然后执行 SynchronousMethodHandler#executeAndDecode 方法,或者执行重试。
//feign.SynchronousMethodHandler#invoke
public Object invoke(Object[] argv) throws Throwable {
//构建方法请求的 RequestTemplate 封装参数 路径 请求方式等信息
RequestTemplate template = this.buildTemplateFromArgs.create(argv);
//feign 超时时间
Options options = this.findOptions(argv);
//重试配置
Retryer retryer = this.retryer.clone();
while(true) {
try {
//执行请求
return this.executeAndDecode(template, options);
} catch (RetryableException var9) {
RetryableException e = var9;
try {
//重试
retryer.continueOrPropagate(e);
} catch (RetryableException var8) {
Throwable cause = var8.getCause();
if (this.propagationPolicy == ExceptionPropagationPolicy.UNWRAP && cause != null) {
throw cause;
}
throw var8;
}
if (this.logLevel != Level.NONE) {
this.logger.logRetry(this.metadata.configKey(), this.logLevel);
}
}
}
}
SynchronousMethodHandler#executeAndDecode 方法源码分析
SynchronousMethodHandler#executeAndDecode 方法的主要作用是从 RequestTemplate 获取 Request 对象,调用 FeignBlockingLoadBalancerClient#execute 方法发起请求调用。
//feign.SynchronousMethodHandler#executeAndDecode
Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
//从 RequestTemplate 中获取 request 信息
Request request = this.targetRequest(template);
if (this.logLevel != Level.NONE) {
this.logger.logRequest(this.metadata.configKey(), this.logLevel, request);
}
long start = System.nanoTime();
//response
Response response;
try {
//执行请求
response = this.client.execute(request, options);
//获取响应
response = response.toBuilder().request(request).requestTemplate(template).build();
} catch (IOException var13) {
if (this.logLevel != Level.NONE) {
this.logger.logIOException(this.metadata.configKey(), this.logLevel, var13, this.elapsedTime(start));
}
throw FeignException.errorExecuting(request, var13);
}
//请求耗时
long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
//解码器
if (this.decoder != null) {
//解码
return this.decoder.decode(response, this.metadata.returnType());
} else {
//解码器为空的逻辑
CompletableFuture<Object> resultFuture = new CompletableFuture();
this.asyncResponseHandler.handleResponse(resultFuture, this.metadata.configKey(), response, this.metadata.returnType(), elapsedTime);
try {
if (!resultFuture.isDone()) {
throw new IllegalStateException("Response handling not done");
} else {
return resultFuture.join();
}
} catch (CompletionException var12) {
Throwable cause = var12.getCause();
if (cause != null) {
throw cause;
} else {
throw var12;
}
}
}
}
FeignBlockingLoadBalancerClient#execute 方法源码分析
FeignBlockingLoadBalancerClient#execute 方法是核心方法,其中包含负载均衡的逻辑,具体逻辑如下:
- 获取原始请求地址和服务实例id。
- 灰度处理。
- 使用负载均衡获取服务实例(在 LoadBalancer 篇章中有分析过)。
- 服务实例为空判断,服务实例为空返回服务不可用,否则调用 LoadBalancerUtils#executeWithLoadBalancerLifecycleProcessing 方法发起 HTTP 调用。
//org.springframework.cloud.openfeign.loadbalancer.FeignBlockingLoadBalancerClient#execute
public Response execute(Request request, Options options) throws IOException {
//原始请求地址 例如:http://order-service/query-order
URI originalUri = URI.create(request.url());
//服务实例id order-service
String serviceId = originalUri.getHost();
Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUri);
//灰度处理
String hint = this.getHint(serviceId);
//lbRequest
DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest(new RequestDataContext(LoadBalancerUtils.buildRequestData(request), hint));
//生命周期处理器
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator.getSupportedLifecycleProcessors(this.loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class), RequestDataContext.class, ResponseData.class, ServiceInstance.class);
supportedLifecycleProcessors.forEach((lifecycle) -> {
lifecycle.onStart(lbRequest);
});
//使用负载均衡获取服务实例
ServiceInstance instance = this.loadBalancerClient.choose(serviceId, lbRequest);
org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(instance);
String message;
//服务实例为空判断
if (instance == null) {
//服务实例为空
message = "Load balancer does not contain an instance for the service " + serviceId;
if (LOG.isWarnEnabled()) {
LOG.warn(message);
}
supportedLifecycleProcessors.forEach((lifecycle) -> {
lifecycle.onComplete(new CompletionContext(Status.DISCARD, lbRequest, lbResponse));
});
return Response.builder().request(request).status(HttpStatus.SERVICE_UNAVAILABLE.value()).body(message, StandardCharsets.UTF_8).build();
} else {
//服务实例不为空
message = this.loadBalancerClient.reconstructURI(instance, originalUri).toString();
//创建新的 request 目标服务的请求地址 例如: http://192.168.123.132:8086/query-order
Request newRequest = this.buildRequest(request, message);
return LoadBalancerUtils.executeWithLoadBalancerLifecycleProcessing(this.delegate, options, newRequest, lbRequest, lbResponse, supportedLifecycleProcessors);
}
}
LoadBalancerUtils#executeWithLoadBalancerLifecycleProcessing 方法源码分析
LoadBalancerUtils#executeWithLoadBalancerLifecycleProcessing 方法就是使用 feignClient 对象发起 HTTP 请求调用了,再往下就是HTTP 调用的逻辑了。
//org.springframework.cloud.openfeign.loadbalancer.LoadBalancerUtils#executeWithLoadBalancerLifecycleProcessing(feign.Client, feign.Request.Options, feign.Request, org.springframework.cloud.client.loadbalancer.Request, org.springframework.cloud.client.loadbalancer.Response<org.springframework.cloud.client.ServiceInstance>, java.util.Set<org.springframework.cloud.client.loadbalancer.LoadBalancerLifecycle>, boolean)
static Response executeWithLoadBalancerLifecycleProcessing(Client feignClient, Options options, Request feignRequest, org.springframework.cloud.client.loadbalancer.Request lbRequest, org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse, Set<LoadBalancerLifecycle> supportedLifecycleProcessors, boolean loadBalanced) throws IOException {
supportedLifecycleProcessors.forEach((lifecycle) -> {
lifecycle.onStartRequest(lbRequest, lbResponse);
});
try {
//使用 feignClient 发出请求 往下就是 http 底层调用逻辑了
Response response = feignClient.execute(feignRequest, options);
if (loadBalanced) {
supportedLifecycleProcessors.forEach((lifecycle) -> {
lifecycle.onComplete(new CompletionContext(Status.SUCCESS, lbRequest, lbResponse, buildResponseData(response)));
});
}
return response;
} catch (Exception var8) {
if (loadBalanced) {
supportedLifecycleProcessors.forEach((lifecycle) -> {
lifecycle.onComplete(new CompletionContext(Status.FAILED, var8, lbRequest, lbResponse));
});
}
throw var8;
}
}
如有不正确的地方请各位指出纠正。