文章目录
- 源码总流程图
- 说明
- GateWayAutoConfiguration
- DispatcherHandler
- getHandler()
- handleRequestWith()
- RouteToRequestUrlFilter
- ReactiveLoadBalancerClientFilter
- NettyRoutingFilter
- 补充知识
- 适配器模式
- 详细流程图
源码总流程图
在线总流程图
说明
Gateway的版本使用的是4.0.0
Gateway的实现是基于WebFlux 、Reactor 、Netty
Gateway微服务的yml配置如下
- Gateway的访问端口为8888
- id为order_route的路由 uri为lb://mall-order
- 为mall-order这个微服务定义了一个path路径的predicate断言;定义了三个filter
server:
port: 8888
spring:
application:
name: mall-gateway
#配置nacos注册中心地址
cloud:
nacos:
discovery:
server-addr: nacos.mall.com:8848
username: nacos
password: nacos
gateway:
#设置路由:路由id、路由到微服务的uri、断言
routes:
- id: user_route #路由ID,全局唯一
uri: lb://mall-user #lb 整合负载均衡器ribbon,loadbalancer
predicates:
- Path=/user/** # 断言,路径相匹配的进行路由
- id: order_route #路由ID,全局唯一
# 测试 http://localhost:8888/order/findOrderByUserId/1
uri: lb://mall-order #lb 整合负载均衡器loadbalancer
predicates:
- Path=/order/** # 断言,路径相匹配的进行路由
#配置过滤器工厂
filters:
- AddRequestHeader=X-Request-color, red #添加请求头
- AddRequestParameter=color, blue # 添加请求参数
- CheckAuth=hushang,男 #自定义过滤器工厂
Gateway的工作原理就是下面这一张图
在开始看Gateway的源码之前,我们回忆一下SpringMVC的实现原理:
- DispatchServlet#doDispatch作为Springmvc的入口
- HandlerMapper 路由匹配 —> 找到Handler
- 通过handler 找的适配的 HandlerAdapter
- HandlerAdapter#handle方法执行
而在Gateway的源码中:
- DispatcherHandler#handle作为入口
- HandlerMapping 路由匹配 --> 断言predicate匹配
RoutePredicateHandlerMapping#getHandlerInternal
,找到路由Route对象 - 返回FilteringWebHandler
- HandlerAdapter 适配器
SimpleHandlerAdapter#handle
处理WebHandler - 进入到
org.springframework.cloud.gateway.handler.FilteringWebHandler#handle
—> filterChain处理
Flux和Mono的概念
Reactor学习文档
Flux:
Mono:
GateWayAutoConfiguration
在GatewayAutoConfiguration自动配置类中,它配置了很多bean对象,常见的就比如:
// 保存我们配置文件中关于网关路由相关的所有配置
// GatewayProperties保存了List<RouteDefinition>
// 而RouteDefinition就是每一个路由对象,保存了id、uri、断言集合List<PredicateDefinition>、Filter集合List<FilterDefinition>
@Bean
public GatewayProperties gatewayProperties() {
return new GatewayProperties();
}
// Path路径匹配的断言工厂,断言相关的bean都是以RoutePredicateFactory结尾
@Bean
@ConditionalOnEnabledPredicate
public PathRoutePredicateFactory pathRoutePredicateFactory() {
return new PathRoutePredicateFactory();
}
// 添加请求头的Filter,一般都是以GatewayFilterFactory
@Bean
@ConditionalOnEnabledFilter
public AddRequestHeaderGatewayFilterFactory addRequestHeaderGatewayFilterFactory() {
return new AddRequestHeaderGatewayFilterFactory();
}
// 全局过滤器,把我们访问网关的url转换为路由中配置的uri
// http://localhost:8888/order/findOrderByUserId/1 ---> lb://mall-order/order/findOrderByUserId/1
@Bean
@ConditionalOnEnabledGlobalFilter
public RouteToRequestUrlFilter routeToRequestUrlFilter() {
return new RouteToRequestUrlFilter();
}
GateWayAutoConfiguration配置的主要bean
类名 | 说明 |
---|---|
GatewayProperties | gateway属性配置类 |
PropertiesRouteDefinitionLocator | 操作GatewayProperties对象,返回Flux<RouteDefinition> |
RouteDefinitionRouteLocator | 将RouteDefinition转换为Route |
RoutePredicateHandlerMapping | Gateway的HandlerMapping,匹配请求对应的Route,返回FilteringWebHandler |
XXXRoutePredicateFactory | 路由断言工厂的bean |
XXXGatewayFilterFactory | 局部Filter |
GlobalFilter实现类 | 全局Filter |
DispatcherHandler
DispatcherHandler#handle
作为我们查看Gateway源码的入口
- 请求request和响应response实例会被封装为ServerWebExchange
- 核心方法就是return语句
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
if (this.handlerMappings == null) {
return createNotFoundError();
}
if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
return handlePreFlight(exchange);
}
return Flux.fromIterable(this.handlerMappings) // 遍历所有的HandlerMapper
.concatMap(mapping -> mapping.getHandler(exchange)) // 调用每一个HandlerMapper,能否找到Handler
.next() // 继续遍历下一个HandlerMapper
.switchIfEmpty(createNotFoundError()) // 如果HandlerMapper遍历完后都没有Handler,那么要抛异常了
.onErrorResume(ex -> handleDispatchError(exchange, ex))
.flatMap(handler -> handleRequestWith(exchange, handler)); // 如果找到Handler,那就去通过HandlerAdapter去调用Handler
}
fromIterable()
方法的作用就是就是遍历Gateway所有的HandlerMapper,我们这里肯定最终是使用的RoutePredicateHandlerMapping
这个路由断言的
我们接下来继续往下,遍历各个HandlerMapper,并调用mapping.getHandler(exchange)
方法,这里最终会调用至RoutePredicateHandlerMapping
类的getHandlerInternal()
方法中,经过断言匹配后,返回一个FilteringWebHandler
对象。该方法接下来会详细介绍。
中间这几行其实主要就是如果我当前往Gateway的请求,通过路由断言没有匹配上,那么就会抛异常
.next()
.switchIfEmpty(createNotFoundError())
.onErrorResume(ex -> handleDispatchError(exchange, ex))
经过路由断言匹配,得到一个WebHandler对象之后,会执行handleRequestWith(exchange, handler)
方法,在该方法中会找一个与WebHandler匹配的HandlerAdapter来适配WebHandler对象,最终去调用WebHandler的
getHandler()
通过DispatcherHandler#handle
方法中的.concatMap(mapping -> mapping.getHandler(exchange))
这一行代码
进入到了AbstractHandlerMapping#getHandler
:
-
遍历我们yml配置文件中所有定义的路由
-
根据我们路由定义的断言Predicate规则去调用对应的断言工厂
-
将匹配成功的路由保存至exchange对象中
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
exchange.getAttributes().put(GATEWAY_PREDICATE_MATCHED_PATH_ROUTE_ID_ATTR, routeId); -
断言匹配成功就返回一个WebHandler接口的实现类FilteringWebHandler对象
public Mono<Object> getHandler(ServerWebExchange exchange) {
// 从这里我们就可以发现,通过getHandlerInternal(exchange)方法就能找Handler,之后的.map()方法中就直接return handler;
return getHandlerInternal(exchange).map(handler -> {
if (logger.isDebugEnabled()) {
logger.debug(exchange.getLogPrefix() + "Mapped to " + handler);
}
ServerHttpRequest request = exchange.getRequest();
// 正常情况下这个if都不会进入
if (hasCorsConfigurationSource(handler) || CorsUtils.isPreFlightRequest(request)) {
CorsConfiguration config = (this.corsConfigurationSource != null ?
this.corsConfigurationSource.getCorsConfiguration(exchange) : null);
CorsConfiguration handlerConfig = getCorsConfiguration(handler, exchange);
config = (config != null ? config.combine(handlerConfig) : handlerConfig);
if (config != null) {
config.validateAllowCredentials();
}
if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) {
return NO_OP_HANDLER;
}
}
// 直接返回handler
return handler;
});
}
我们这里就直接进入到RoutePredicateHandlerMapping
类中
RoutePredicateHandlerMapping#getHandlerInternal
的详细代码如下
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
if (this.managementPortType == DIFFERENT && this.managementPort != null
&& exchange.getRequest().getURI().getPort() == this.managementPort) {
return Mono.empty();
}
exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
return Mono.deferContextual(contextView -> {
exchange.getAttributes().put(GATEWAY_REACTOR_CONTEXT_ATTR, contextView);
// 核心方法是lookupRoute(exchange),这里会去进行路由的校验,根据我们配置文件中定义的路由断言规则进行校验
return lookupRoute(exchange)
.flatMap((Function<Route, Mono<?>>) r -> {
// 下面几行代码就是操作exchange的属性
// 上方的lookupRoute()方法中会添加GATEWAY_PREDICATE_ROUTE_ATTR,这里就进行移除
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
// 把当前路由对象添加进exchange对象中,之后的流程还会用到我们的路由对象
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
// 路由匹配成功,就直接返回WebHandler对象
return Mono.just(webHandler);
}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
// 当前请求没有任何一个路由匹配上的处理流程
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})));
});
}
再进入到RoutePredicateHandlerMapping#lookupRoute
方法中
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
// 获取到我们yml配置文件中所有定义的路由,并进行遍历
return this.routeLocator.getRoutes()
.concatMap(route -> Mono.just(route).filterWhen(r -> {
// 添加GATEWAY_PREDICATE_ROUTE_ATTR
exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
// 调用当前路由对象中的断言的apply()方法,apply()方法中又是一些异步的处理流程
// 这里就会根据我们配置文件中为路由配置的各个断言,去调用各个断言对象
return r.getPredicate().apply(exchange);
})
.doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
.onErrorResume(e -> Mono.empty()))
.next()
// TODO: error handling
.map(route -> {
if (logger.isDebugEnabled()) {
logger.debug("Route matched: " + route.getId());
}
validateRoute(route, exchange);
return route;
});
}
我当前gateway的配置文件中定义了两个路由
spring:
cloud:
gateway:
#设置路由:路由id、路由到微服务的uri、断言
routes:
- id: user_route #路由ID,全局唯一
uri: lb://mall-user #lb 整合负载均衡器ribbon,loadbalancer
predicates:
- Path=/user/** # 断言,路径相匹配的进行路由
- id: order_route #路由ID,全局唯一
# 测试 http://localhost:8888/order/findOrderByUserId/1
uri: lb://mall-order #lb 整合负载均衡器loadbalancer
predicates:
- Path=/order/** # 断言,路径相匹配的进行路由
所以上面的方法会执行两次
因为return r.getPredicate().apply(exchange);
又是一些异步调用,但我们从方法名就能看出来,这里根据我们yml配置文件中路由定义的断言去调用对应的断言对象。这就是各个断言工厂具体的实现了,接下来就以Path路径匹配的断言工厂来举例
class DefaultAsyncPredicate<T> implements AsyncPredicate<T> {
private final Predicate<T> delegate;
@Override
public Publisher<Boolean> apply(T t) {
// 调用各个Predicate对象的test()方法
return Mono.just(delegate.test(t));
}
//...
}
所以我们现在就直接去看path路径匹配的断言类PathRoutePredicateFactory
,
@Override
public Predicate<ServerWebExchange> apply(Config config) {
final ArrayList<PathPattern> pathPatterns = new ArrayList<>();
synchronized (this.pathPatternParser) {
pathPatternParser.setMatchOptionalTrailingSeparator(config.isMatchTrailingSlash());
config.getPatterns().forEach(pattern -> {
PathPattern pathPattern = this.pathPatternParser.parse(pattern);
pathPatterns.add(pathPattern);
});
}
return new GatewayPredicate() {
// 会进入到test()方法中
@Override
public boolean test(ServerWebExchange exchange) {
// 当前请求的uri路径 /order/findOrderByUserId/1
PathContainer path = parsePath(exchange.getRequest().getURI().getRawPath());
PathPattern match = null;
for (int i = 0; i < pathPatterns.size(); i++) {
// yml配置文件中的配置项 /order/**
PathPattern pathPattern = pathPatterns.get(i);
// 如果path匹配成功,那么match对象就不为null
if (pathPattern.matches(path)) {
match = pathPattern;
break;
}
}
// 如果path匹配成功,那么match对象就不为null 。匹配成功的处理逻辑
if (match != null) {
traceMatch("Pattern", match.getPatternString(), path, true);
PathMatchInfo pathMatchInfo = match.matchAndExtract(path);
putUriTemplateVariables(exchange, pathMatchInfo.getUriVariables());
// match.getPatternString() 为 /order/**
exchange.getAttributes().put(GATEWAY_PREDICATE_MATCHED_PATH_ATTR, match.getPatternString());
String routeId = (String) exchange.getAttributes().get(GATEWAY_PREDICATE_ROUTE_ATTR);
// 保存当前路由id
if (routeId != null) {
exchange.getAttributes().put(GATEWAY_PREDICATE_MATCHED_PATH_ROUTE_ID_ATTR, routeId);
}
return true;
}
// path匹配不成,返回false
else {
traceMatch("Pattern", config.getPatterns(), path, false);
return false;
}
}
@Override
public Object getConfig() {
return config;
}
@Override
public String toString() {
return String.format("Paths: %s, match trailing slash: %b", config.getPatterns(),
config.isMatchTrailingSlash());
}
};
}
handleRequestWith()
// DispatcherHandler#handleRequestWith
private Mono<Void> handleRequestWith(ServerWebExchange exchange, Object handler) {
if (ObjectUtils.nullSafeEquals(exchange.getResponse().getStatusCode(), HttpStatus.FORBIDDEN)) {
return Mono.empty(); // CORS rejection
}
if (this.handlerAdapters != null) {
// 遍历所有的HandlerAdapter
for (HandlerAdapter adapter : this.handlerAdapters) {
// 找能处理WebHandler类型的HandlerAdapter , 最终找到SimpleHandlerAdapter
if (adapter.supports(handler)) {
return adapter.handle(exchange, handler)
.flatMap(result -> handleResult(exchange, result));
}
}
}
return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
}
如下图所示,找到SimpleHandlerAdapter
这个
SimpleHandlerAdapter
的详细代码如下所示
public class SimpleHandlerAdapter implements HandlerAdapter {
@Override
public boolean supports(Object handler) {
return WebHandler.class.isAssignableFrom(handler.getClass());
}
@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
// 强转
WebHandler webHandler = (WebHandler) handler;
// getHandler()方法返回了一个FilteringWebHandler对象,这里就调用它的handle()方法
Mono<Void> mono = webHandler.handle(exchange);
// 返回一个空对象
return mono.then(Mono.empty());
}
}
FilteringWebHandler#handle
方法
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
// getHandler()方法中存入了Route路由对象,这里取出来,该对象保存着我们yml配置文件中的配置
Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
// 取出我们yml配置文件中 该路由的局部Filter,我这里有三个,一个添加请求头、一个添加请求参数、一个自定义的
List<GatewayFilter> gatewayFilters = route.getFilters();
// 全局Filter
List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
// 将局部Filter和全局Filter存入一个集合中
combined.addAll(gatewayFilters);
// 排序
AnnotationAwareOrderComparator.sort(combined);
if (logger.isDebugEnabled()) {
logger.debug("Sorted gatewayFilterFactories: " + combined);
}
// 调用各自的filter()方法
return new DefaultGatewayFilterChain(combined).filter(exchange);
}
[GatewayFilterAdapter{RemoveCachedBodyFilter@5cfed0ba}, order = -2147483648]"
[GatewayFilterAdapter{AdaptCachedBodyGlobalFilter@22a6d75c}, order = -2147482648]"
[GatewayFilterAdapter{NettyWriteResponseFilter@691567ea}, order = -1]"
[GatewayFilterAdapter{ForwardPathFilter@28be7fec}, order = 0]"
// 三个局部Filter
[[AddRequestHeader X-Request-color = 'red'], order = 1]"
[[AddRequestParameter color = 'blue'], order = 2]"
[com.tuling.mall.gateway.filter.CheckAuthGatewayFilterFactory$$Lambda$980/0x0000014b3c649390@54f513cb, order = 3]"
// 路由转换 把http://localhost:8888/order/findOrderByUserId/1 ---> lb://mall-order/order/findOrderByUserId/1
[GatewayFilterAdapter{RouteToRequestUrlFilter@5c8d58ed}, order = 10000]"
// 根据lb://前缀过滤处理,使用serviceId选择一个服务实例,从而实现负载均衡
[GatewayFilterAdapter{ReactiveLoadBalancerClientFilter@437ed416}, order = 10150]"
[GatewayFilterAdapter{LoadBalancerServiceInstanceCookieFilter@11f23038}, order = 10151]"
[GatewayFilterAdapter{WebsocketRoutingFilter@26f0141}, order = 2147483646]"
// 发送netty 请求
[GatewayFilterAdapter{NettyRoutingFilter@de77146}, order = 2147483647]"
[GatewayFilterAdapter{ForwardRoutingFilter@6a567f7b}, order = 2147483647]"
接下来就挑几个重要的全局GlobalFilter来分析
RouteToRequestUrlFilter
路由转换 把http://localhost:8888/order/findOrderByUserId/1?color=blue
—> lb://mall-order/order/findOrderByUserId/1?color=blue
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 从exchange中取路由Route对象
Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
if (route == null) {
return chain.filter(exchange);
}
log.trace("RouteToRequestUrlFilter start");
// 取当前请求uri : http://localhost:8888/order/findOrderByUserId/1?color=blue
URI uri = exchange.getRequest().getURI();
boolean encoded = containsEncodedParts(uri);
// 路由对象中保存的uri,也就是我们在yml文件中配置的值: lb://mall-order
URI routeUri = route.getUri();
if (hasAnotherScheme(routeUri)) {
exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR, routeUri.getScheme());
routeUri = URI.create(routeUri.getSchemeSpecificPart());
}
// 如果我们在yml文件中配置的uri,即不是lb开头并且host还为null,那么就抛异常
if ("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {
throw new IllegalStateException("Invalid host: " + routeUri.toString());
}
// 转换结果为: lb://mall-order/order/findOrderByUserId/1?color=blue
URI mergedUrl = UriComponentsBuilder.fromUri(uri)
.scheme(routeUri.getScheme()).host(routeUri.getHost()).port(routeUri.getPort()).build(encoded).toUri();
// 存入exchange中,之后的LoadBalancer全局Filter中会用到
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);
return chain.filter(exchange);
}
ReactiveLoadBalancerClientFilter
解析lb://服务名,去服务注册中心获取服务实例instance,并通过负载均衡算法选择一个具体的instance
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 此时的url是这个样子: lb://mall-order/order/findOrderByUserId/1?color=blue
URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
return chain.filter(exchange);
}
addOriginalRequestUrl(exchange, url);
if (log.isTraceEnabled()) {
log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url);
}
// 再获取一遍:lb://mall-order/order/findOrderByUserId/1?color=blue
URI requestUri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
// 就是服务名: mall-order
String serviceId = requestUri.getHost();
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
.getSupportedLifecycleProcessors(clientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
RequestDataContext.class, ResponseData.class, ServiceInstance.class);
// 请求信息 封装成一个对象
DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(
new RequestDataContext(new RequestData(exchange.getRequest()), getHint(serviceId)));
// 调用choose()方法
return choose(lbRequest, serviceId, supportedLifecycleProcessors).doOnNext(response -> {
// 服务注册中心的响应response,获取server实例对象
ServiceInstance retrievedInstance = response.getServer();
// 值为http://localhost:8888/order/findOrderByUserId/1?color=blue
URI uri = exchange.getRequest().getURI();
String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";
if (schemePrefix != null) {
overrideScheme = url.getScheme();
}
// 服务实例
DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance,overrideScheme);
// 最终通过上面的服务实例,修改之后的请求url为:http://192.168.236.173:8020/order/findOrderByUserId/1?color=blue
URI requestUrl = reconstructURI(serviceInstance, uri);
if (log.isTraceEnabled()) {
log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
}
// 存入exchange对象中,之后netty发送请求会用到该url
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
exchange.getAttributes().put(GATEWAY_LOADBALANCER_RESPONSE_ATTR, response);
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, response));
}).then(chain.filter(exchange))
.doOnError(throwable -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
.onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
CompletionContext.Status.FAILED, throwable, lbRequest,
exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR)))))
.doOnSuccess(aVoid -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
.onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
CompletionContext.Status.SUCCESS, lbRequest,
exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR),
new ResponseData(exchange.getResponse(), new RequestData(exchange.getRequest()))))));
}
// 进入到choose()方法中
private Mono<Response<ServiceInstance>> choose(Request<RequestDataContext> lbRequest, String serviceId,
Set<LoadBalancerLifecycle> supportedLifecycleProcessors) {
ReactorLoadBalancer<ServiceInstance> loadBalancer = this.clientFactory.getInstance(serviceId,
ReactorServiceInstanceLoadBalancer.class);
if (loadBalancer == null) {
throw new NotFoundException("No loadbalancer available for " + serviceId);
}
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
// 调用ReactorLoadBalancer对象的choose()方法
return loadBalancer.choose(lbRequest);
}
NettyRoutingFilter
发送netty请求
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 经过负载均衡之后的请求url,具体的下游服务请求地址
// http://192.168.236.173:8020/order/findOrderByUserId/1?color=blue
URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
// scheme为http
String scheme = requestUrl.getScheme();
if (isAlreadyRouted(exchange) || (!"http".equalsIgnoreCase(scheme) && !"https".equalsIgnoreCase(scheme))) {
return chain.filter(exchange);
}
setAlreadyRouted(exchange);
ServerHttpRequest request = exchange.getRequest();
// GET 请求
final HttpMethod method = HttpMethod.valueOf(request.getMethod().name());
// url为 http://192.168.236.173:8020/order/findOrderByUserId/1?color=blue
final String url = requestUrl.toASCIIString();
// 请求头
HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);
final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
filtered.forEach(httpHeaders::set);
boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);
// 路由对象
Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange).headers(headers -> {
headers.add(httpHeaders);
// Will either be set below, or later by Netty
headers.remove(HttpHeaders.HOST);
if (preserveHost) {
String host = request.getHeaders().getFirst(HttpHeaders.HOST);
headers.add(HttpHeaders.HOST, host);
}
// 发送请求
}).request(method).uri(url).send((req, nettyOutbound) -> {
if (log.isTraceEnabled()) {
nettyOutbound.withConnection(connection -> log.trace(...);
}
return nettyOutbound.send(request.getBody().map(this::getByteBuf));
}).responseConnection((res, connection) -> {
// Defer committing the response until all route filters have run
// Put client response as ServerWebExchange attribute and write
// response later NettyWriteResponseFilter
exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);
ServerHttpResponse response = exchange.getResponse();
// put headers and status so filters can modify the response
HttpHeaders headers = new HttpHeaders();
res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));
String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
if (StringUtils.hasLength(contentTypeValue)) {
exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue);
}
setResponseStatus(res, response);
// make sure headers filters run after setting status so it is
// available in response
HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(getHeadersFilters(), headers, exchange,
Type.RESPONSE);
if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING)
&& filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) {
// It is not valid to have both the transfer-encoding header and
// the content-length header.
// Remove the transfer-encoding header in the response if the
// content-length header is present.
response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
}
exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet());
response.getHeaders().addAll(filteredResponseHeaders);
return Mono.just(res);
});
Duration responseTimeout = getResponseTimeout(route);
if (responseTimeout != null) {
responseFlux = responseFlux
.timeout(responseTimeout,
Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout)))
.onErrorMap(TimeoutException.class,
th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th));
}
return responseFlux.then(chain.filter(exchange));
}
补充知识
适配器模式
在FilteringWebHandler#handle
方法中,先获取路由的局部Filter,在创建一个集合存放全局Filter,在把局部Filter和全局Filter放在一起。这里就有一个问题:
局部Filter的类型是GatewayFilter
,而全局Filter的类型是GlobalFilter
,它们是怎么通过下面这种方式存放在一个集合中的嘞?
// 局部Filter
List<GatewayFilter> gatewayFilters = route.getFilters();
// 全局Filter
List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
// 将局部Filter和全局Filter存入一个集合中
combined.addAll(gatewayFilters);
这里就用到了适配器模式,具体实现步骤是:
- 创建一个GatewayFilterAdapter类,实现 GatewayFilter接口
- GatewayFilterAdapter类中定义一个GlobalFilter属性,构造方法中传GlobalFilter类型的对象赋值给该属性
- 实现GatewayFilter接口的filter()方法,在filter()方法中调用全局Filter的filter()方法
private static class GatewayFilterAdapter implements GatewayFilter {
private final GlobalFilter delegate;
GatewayFilterAdapter(GlobalFilter delegate) {
this.delegate = delegate;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return this.delegate.filter(exchange, chain);
}
}
遍历List<GlobalFilter> globalFilters
全局Filter,分别存入GatewayFilterAdapter对象中
List<GatewayFilterAdapter>
----> List<GatewayFilter> globalFilters
集合
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
// 局部Filter
List<GatewayFilter> gatewayFilters = route.getFilters();
// 全局Filter,这样就把全局GlobalFilter变为了GatewayFilter类型了
List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
// 存入一个集合中
combined.addAll(gatewayFilters);
AnnotationAwareOrderComparator.sort(combined);
// 调用各自的filter()方法
return new DefaultGatewayFilterChain(combined).filter(exchange);
}