引入网关的必要性
在微服务架构下,我们一般会把相对独立的业务或功能划分为不同的服务,不同服务之间相互隔离,独立部署。这些微服务由服务注册中心集中管理。对于来自外部的用户请求来说,在处理请求的核心业务逻辑之前,一般还需要对请求做一些预处理,如权限校验、监控统计、流量限制等。如果外部请求直接到达微服务,那么所有的服务都需要各自负责处理这些功能,会造成这部分功能逻辑在各个服务重复出现,增加了未来对这些基础功能的维护升级的成本。
所以在微服务环境下,我们还需要一个网关组件来作为请求入口。一些基础的请求预处理的逻辑可以统一实现在网关这一层,这样业务服务只需要专注于处理业务逻辑即可。另外,引入网关作为统一请求入口之后,还可以利用网关来实现一些其他的功能,比如服务保护、灰度发布等。
1. Spring Cloud Gateway 简介
Spring Cloud Gateway 是 Spring Cloud 微服务生态下的网关组件。Spring Cloud Gateway 是基于 Spring 5 和 Spring Boot 2 搭建的,本质上是一个 Spring Boot 应用。在详细介绍其基本原理之前,先看一下通常而言,可以由微服务网关提供的功能。
在 Spring Cloud Gateway 发布之前,Spring Cloud 使用的是由 Netflix 开源的 Zuul 1 作为网关组件。Zuul 1 是基于传统的 Servlet API 开发的,使用了阻塞的网络模型,每个请求需要分配专门的线程处理,所以资源开销比较大,在高并发的情况下需要创建大量的线程来处理请求,线程数目会成为系统的瓶颈。作为取代 Spring Cloud Zuul 的组件,Spring Cloud Gateway 网络层使用了基于非阻塞的 Netty,解决了线程数瓶颈从而提升了系统性能。
微服务网关的功能
- 请求路由:根据请求本身的属性把请求转发到不同的微服务是微服务网关的基本功能之一。业务运维人员需要针对不同的服务配置路由,使网关能够根据请求的 header、路径、参数、协议等属性将其转发到对应的服务。
- 服务发现:网关是微服务环境的请求入口。支持服务发现能使网关在转发请求到目标服务时充分利用服务注册中心动态管理服务实例的优势,在配置路由转发的目标地址时也会更加方便。
- 修改请求响应:网关在收到外部请求,将其转发到目标服务之前,可以根据需求对请求进行修改,比如果更改请求 header、参数等。类似地,也可以在获取到业务服务响应之后,返回给用户前对响应进行修改。
- 权限校验:某些业务场景在处理用户请求时需要先对用户进行权限校验,这部分逻辑也可以由网关来负责。请求在到达网关时,由网关根据请求要访问的业务接口先对用户鉴权,只有校验通过的请求才会转发到对应的服务,而校验不通过的请求会被网关直接拒绝。这样做能够把拒绝无效请求这一步提前到网关这一层,减少无效的流量进入到业务服务。
- 限流熔断:网关可以通过添加限流、熔断等机制来对业务服务起保护作用,提升系统整体的可用性。根据业务服务的吞吐量,网关可以限制转发到该服务的请求数量,超出限制的请求直接拒绝或降级,这样可以避免因为过多的请求导致业务服务负载过高的情况。当业务服务异常时,还可以通过熔断的方式到达快速失败的效果。
- 请求重试:对于一些幂等的请求,当网关转发目标服务失败时,可以在网关层做自动重试。对于一些多实例部署服务,重试时还可以考虑把请求转发到不同的实例,以提高请求成功的概率。
- 响应缓存:当用户请求获取的是一些静态的或更新不频繁的数据时,一段时间内多次请求获取到的数据很可能是一样的。对于这种情况可以将响应缓存起来。这样用户请求可以直接在网关层得到响应数据,无需再去访问业务服务,减轻业务服务的负担。
- 响应聚合:某些情况下用户请求要获取的响应内容可能会来自于多个业务服务。网关作为业务服务的调用方,可以把多个服务的响应整合起来,再一并返回给用户。
- 监控统计:因为网关是请求入口,所以在网关这一层可以方便地对外部的访问请求做监控和统计,同时还可以对业务服务的响应做监控,方便发现异常情况。
- 灰度流量:网关可以用来做服务流量的灰度切换。比如某个业务服务上线了新版本,那可以在网关这一层按照灰度策略,把一部分请求流量切换到新版本服务上,以达到验证新版本业务服务的功能和性能的效果。
- 异常响应处理:对于业务服务返回的异常响应,可以在网关层在返回给用户之前做转换处理。这样可以把一些业务侧返回的异常细节隐藏,转换成用户友好的错误提示返回。
2. Spring Cloud Gateway 基本原理
Spring Cloud Gateway 使用了 Spring WebFlux 非阻塞网络框架,网络层默认使用了高性能非阻塞的 Netty Server,解决了 Spring Cloud Zuul 因为阻塞的线程模型带来的性能下降的问题。
Gateway 本身是一个 Spring Boot 应用,它处理请求是逻辑是根据配置的路由对请求进行预处理和转发。Gateway 有几个比较核心的概念:
- Route:一个 Route 由路由 ID,转发 URI,多个 Predicates 以及多个 Filters 构成。Gateway 上可以配置多个 Routes。处理请求时会按优先级排序,找到第一个满足所有 Predicates 的 Route;
- Predicate:表示路由的匹配条件,可以用来匹配请求的各种属性,如请求路径、方法、header 等。一个 Route 可以包含多个子 Predicates,多个子 Predicates 最终会合并成一个;
- Filter:过滤器包括了处理请求和响应的逻辑,可以分为 pre 和 post 两个阶段。多个 Filter 在 pre 阶段会按优先级高到低顺序执行,post 阶段则是反向执行。Gateway 包括两类 Filter。
- 全局 Filter:每种全局 Filter 全局只会有一个实例,会对所有的 Route 都生效。
- 路由 Filter:路由 Filter 是针对 Route 进行配置的,不同的 Route 可以使用不同的参数,因此会创建不同的实例。
Gateway 在启动时会创建 Netty Server,由它接收来自 Client 的请求。收到请求后根据路由的匹配条件找到第一个满足条件的路由,然后请求在被该路由配置的过滤器处理后由 Netty Client 转到目标服务。服务返回响应后会再次被过滤器处理,最后返回给 Client。
Gateway 路由配置
Spring Cloud Gateway 本身提供了很多 Predicate 和 Filter 的实现,一些基本的功能可以通过这些现成的 Predicate 和 Filter 配置实现。这些 Gateway 本身提供的 Predicate 和 Filter 在官方文档上有详细的介绍,这里给一个大致的例子:
spring:
cloud:
gateway:
routes:
- id: test_route
uri: lb://service-A
predicates:
- Path=/hello
filters:
- SetRequestHeader=X-Request-Red, Blue
复制代码
路由是 Gateway 的核心构件,不同的路由根据匹配条件可以处理不同类型的请求,并转发到对应的目标服务。一个路由由以下几个属性组成:
- Id: 路由 ID;
- Uri: 转发请求的目标地址;
- Order: 顺序(优先级);
- Predicate: 匹配条件。多个 Predicates 会合并成一个聚合的条件;
- Filters: 路由过滤器。这些过滤器最终会和全局过滤器一起排序处理匹配成功的请求;
- Metadata: 额外的元数据。
Gateway 请求路由原理
Gateway 使用了 Spring WebFlux 框架,该框架处理请求的入口在类 DispatcherHandler 。它会根据提供的 HandlerMapping 来获取处理请求的 Handler 方法。Gateway 应用对 HandlerMapping 的实现是 RoutePredicateHandlerMapping 。
- 进来的请求由 DispatcherHandler 处理。
- DispatcherHandler 根据 RoutePredicateHandlerMapping 获取 Handler 方法。
- RoutePredicateHandlerMapping 依赖 RouteLocator 获取所有路由配置并根据匹配条件打到请求匹配的路由。
- RoutePredicateHandlerMapping 把请求交给 FilteringWebHandler 处理。
- FilteringWebHandler 从请求匹配的路由获取对应的路由 Filter,并和全局 Filter 合并构造 GatewayFilterChain ,请求最终由 GatewayFilterChain 里的 Filter 按顺序处理。
Spring Cloud Gateway 上下文(ServerWebExchange)
SpringCloud Gateway 的上下文是 ServerWebExchange,请求的信息都存储在 ServerWebExchange 中,在网关上的后续操作都是基于上下文操作的,在 http 请求到达网关之后,网关入口是ReactorHttpHandlerAdapter#apply 方法,去获取请求的 request 和 response,构建当次请求的上下文供后续 filter 使用:
public class ReactorHttpHandlerAdapter implements BiFunction<HttpServerRequest, HttpServerResponse, Mono<Void>> {
@Override
public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());
try {
// 获取请求的Request,构建ReactorServerHttpRequest
ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory);
// 构建ServerHttpResponse
ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory);
if (request.getMethod() == HttpMethod.HEAD) {
response = new HttpHeadResponseDecorator(response);
}
// 交给HttpWebHandlerAdapter构建上下文ServerWebExchange
return this.httpHandler.handle(request, response)
.doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage()))
.doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed"));
}
catch (URISyntaxException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to get request URI: " + ex.getMessage());
}
reactorResponse.status(HttpResponseStatus.BAD_REQUEST);
return Mono.empty();
}
}
}
复制代码
构建完 request 和 response 后,交给 HttpWebHandlerAdapter 构建上下文 ServerWebExchange:
public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHandler {
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
if (this.forwardedHeaderTransformer != null) {
request = this.forwardedHeaderTransformer.apply(request);
}
// 构建请求的上下文
ServerWebExchange exchange = createExchange(request, response);
LogFormatUtils.traceDebug(logger, traceOn ->
exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +
(traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));
return getDelegate().handle(exchange)
.doOnSuccess(aVoid -> logResponse(exchange))
.onErrorResume(ex -> handleUnresolvedError(exchange, ex))
.then(Mono.defer(response::setComplete));
}
}
复制代码
Spring Cloud Gateway 读取路由(RouteDefinition)
我们在配置文件中配置的一个路由规则,对应到 Java 类就是 GatewayProperties,Spring Boot 会将配置文件映射为 Java 类,例如上文的配置:
spring:
cloud:
gateway:
routes:
- id: test_route
uri: lb://service-A
predicates:
- Path=/hello
filters:
- SetRequestHeader=X-Request-Red, Blue
复制代码
路由信息映射到 GatewayProperties 后如何获取其中的 RouteDefinition?
答案是通过 RouteDefinitionLocator。
public class PropertiesRouteDefinitionLocator implements RouteDefinitionLocator {
private final GatewayProperties properties;
// 构造函数设置properties
public PropertiesRouteDefinitionLocator(GatewayProperties properties) {
this.properties = properties;
}
// 从properties中读取RouteDefinition
@Override
public Flux<RouteDefinition> getRouteDefinitions() {
return Flux.fromIterable(this.properties.getRoutes());
}
}
复制代码
当然我们获取路由信息的地方不止 properties 一种,还可以从 内存,缓存,甚至注册中心等。CompositeRouteDefinitionLocator 利用委派器模式允许我们组合读取路由信息。
public class CompositeRouteDefinitionLocator implements RouteDefinitionLocator {
private final Flux<RouteDefinitionLocator> delegates;
// 将 RouteDefinitionLocator 组合
public CompositeRouteDefinitionLocator(Flux<RouteDefinitionLocator> delegates) {
this.delegates = delegates;
}
// 委托给 RouteDefinitionRepository 执行读取
@Override
public Flux<RouteDefinition> getRouteDefinitions() {
return this.delegates.flatMap(RouteDefinitionLocator::getRouteDefinitions);
}
}
复制代码
Spring Cloud Gateway 的 GlobalFilter
GlobalFilter 是所有被 Gateway 拦截的 http 请求都要做的处理;GatewayFilter 是根据路由配置匹配predicate 的 http 请求才会做的处理。
全局拦截器,是所有被拦截到的 http 请求都要去做的处理;例如拿到一个 http 请求后,我们的目的是转发到下游服务,请求结果并返回,那么所有被拦截到的 http 请求都需要做下列几件事:
- 按照 predicate 把符合规则的 url 转换为真正要去请求的 url;
- 调用真正的下游服务(基于 netty 实现的 http 调用,具体代码在 NettyRoutingFilter 类中);
- 得到 response,返回给调用方。
public interface GlobalFilter {
Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
}
复制代码
接口中只有一个 filter 方法,实现类实现该接口后在 filter 中去做具体拦截逻辑,这些 Filter 都实现了 GlobalFilter 接口:
AdaptCachedBodyGlobalFilter:优先级最高的 Filter,请求到 gateway 后,将上下文ServerWebExchange 中已有的缓存删除 请求信息,将此次的请求信息缓存到上下文中。
ForwardPathFilter:如果该请求还未被路由或 URI对象的属性不是 forward,则将该请求对应配置的 Route 信息中 uri 的 path 设置到上下文 ServerWebExchange 中。
RouteToRequestUrlFilter:将此次请求的 uri 和配置的 Route 规则做 merged 处理,拿到真正代理的下游服务的地址,将得到的 url 放到上下文中,key 为
GATEWAY_REQUEST_URL_ATTR
。LoadBalancerClientFilter:网关提供了负载均衡的 Filter,具体负载规则可以自己实现。
NoLoadBalancerClientFilter:没有负载均衡的拦截器。
NettyRoutingFilter:网关的 http 是基于 netty 实现的,若此次请求 scheme 是 http 或 https 则使用基于 netty 的 httpClient 执行调用,将返回结果写入上下文中。
NettyWriteResponseFilter:基于 Web Flux,若上下文中存在
CLIENT_RESPONSE_CONN_ATTR
,将响应数据返回。WebClientHttpRoutingFilter:作用同 NettyRoutingFilter,方式同 LoadBalancerClientFilter。
WebsocketRoutingFilter:路由 WebSocket 请求,校验逻辑在WebsocketRoutingFilter#changeSchemeIfIsWebSocketUpgrade 中。
WebClientWriteResponseFilter:作用同 NettyWriteResponseFilter。
ForwardRoutingFilter:设置此次请求已被路由。
GatewayMetricsFilter:统计网关的性能指标。
Spring Cloud Gateway 的 GatewayFilter
GatewayFilter 是面向开发人员的,因需适配,当我们需要给符合 predicate 的 url 做一些处理时通过配置就可添加,例如,我们想给 path 匹配上 /test/**
的 url 添加 header,通过下列配置就可添加,这类配置是根据业务需求进行的特殊配置。
public interface GatewayFilter extends ShortcutConfigurable {
/**
* Name key.
*/
String NAME_KEY = "name";
/**
* Value key.
*/
String VALUE_KEY = "value";
Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
}
复制代码
接口定义中多了 NAME_KEY
和 VALUE_KEY
,原因是 GatewayFilter 是面向开发人员的,例如我们需要配置给 path符合 /test/** 的请求添加 header 时,header 是 key-value 形式,这时候就用到了:
public class AddRequestHeaderGatewayFilterFactory extends AbstractNameValueGatewayFilterFactory {
@Override
public GatewayFilter apply(NameValueConfig config) {
return (exchange, chain) -> {
// 将要添加的key-value添加到上下文的header中
ServerHttpRequest request = exchange.getRequest().mutate()
.header(config.getName(), config.getValue()).build();
return chain.filter(exchange.mutate().request(request).build());
};
}
}
复制代码
GlobalFilter 和 GatewayFilter 整合应用
每个 Filter 中都有一个 Order 属性,在执行时是在 FilteringWebHandler#handle方法 中对 GlobalFilter 和 GatewayFilter 进行的整合和排序,具体执行在 FilteringWebHandler#filter方法:
/**
* 整合Filter
*/
public Mono<Void> handle(ServerWebExchange exchange) {
// 根据Route信息取出配置的GatewayFilter集合
Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
List<GatewayFilter> gatewayFilters = route.getFilters();
// 取出globalFilters
List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
// 将GatewayFilter添加到combined
combined.addAll(gatewayFilters);
// combined根据Order排优先级
AnnotationAwareOrderComparator.sort(combined);
if (logger.isDebugEnabled()) {
logger.debug("Sorted gatewayFilterFactories: " + combined);
}
return new DefaultGatewayFilterChain(combined).filter(exchange);
}
/**
* 执行Filter
*/
public Mono<Void> filter(ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < filters.size()) {
GatewayFilter filter = filters.get(this.index);
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this,
this.index + 1);
return filter.filter(exchange, chain);
}
else {
return Mono.empty(); // complete
}
});
}
复制代码
GlobalFilter 和 GatewayFilter 自定义 Filter
- 自定义 GlobalFilter:GlobalFilter 具体的实现方式是实现接口,每个 filter 都实现了 GlobalFilter 接口:
public class GlobalTestFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
if("符合业务逻辑,处理完业务逻辑,继续执行下一个filter"){
return chain.filter(exchange);
}
//不符合业务逻辑,直接返回
return "按照不符合业务逻辑处理";
}
}
复制代码
- 自定义 GatewayFilter:GatewayFilter 具体的实现方式是工厂,每个工厂都继承了 AbstractGatewayFilterFactory:
public class TestGatewayFilterFactory extends AbstractGatewayFilterFactory<TestGatewayFilterFactory.Config> {
public TestGatewayFilterFactory() {
super(TestGatewayFilterFactory.Config.class);
}
@Override
public GatewayFilter apply(Config config) {
return (exchange, chain) -> {
if("符合条件,处理业务逻辑,继续执行下一个Filter"){
return chain.filter(exchange);
}
// 不符合条件,直接返回
return "false";
};
}
public static class Config {
private String businessAttributes;
public String getBusinessAttributes() {
return businessAttributes;
}
public void setBusinessAttributes(String businessAttributes) {
this.businessAttributes = businessAttributes;
}
}
}
复制代码
3. Spring Cloud Gateway 工作过程
网关启动阶段
- Yaml 文件和 GatewayProperties 文件映射,映射处理源码在 JavaBeanBinder.BeanProperty#getValue –> CollectionBinder#merge —> Binder#bindBean;
- 加载 Locator Bean,为后续读取 RouteDefinition 做准备【GatewayAutoConfiguration】;
public class GatewayAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public PropertiesRouteDefinitionLocator propertiesRouteDefinitionLocator(
GatewayProperties properties) {
return new PropertiesRouteDefinitionLocator(properties);
}
@Bean
@ConditionalOnMissingBean(RouteDefinitionRepository.class)
public InMemoryRouteDefinitionRepository inMemoryRouteDefinitionRepository() {
return new InMemoryRouteDefinitionRepository();
}
@Bean
@Primary
public RouteDefinitionLocator routeDefinitionLocator(
List<RouteDefinitionLocator> routeDefinitionLocators) {
return new CompositeRouteDefinitionLocator(
Flux.fromIterable(routeDefinitionLocators));
}
@Bean
@Primary
public RouteLocator cachedCompositeRouteLocator(List<RouteLocator> routeLocators) {
return new CachingRouteLocator(
new CompositeRouteLocator(Flux.fromIterable(routeLocators)));
}
}
复制代码
- 初始化GlobalFilters【FilteringWebHandler】;
public class GatewayAutoConfiguration {
@Bean
public FilteringWebHandler filteringWebHandler(List<GlobalFilter> globalFilters) {
return new FilteringWebHandler(globalFilters);
}
}
复制代码
public class FilteringWebHandler implements WebHandler {
private final List<GatewayFilter> globalFilters;
// 构造函数中设置globalFiltersglobalFilters
public FilteringWebHandler(List<GlobalFilter> globalFilters) {
this.globalFilters = loadFilters(globalFilters);
}
// 设置globalFilters
private static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) {
return filters.stream().map(filter -> {
GatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter(filter);
if (filter instanceof Ordered) {
int order = ((Ordered) filter).getOrder();
return new OrderedGatewayFilter(gatewayFilter, order);
}
return gatewayFilter;
}).collect(Collectors.toList());
}
}
复制代码
- 初始化 predicates,gatewayFilters,getRoutes【GatewayAutoConfiguration –> RouteDefinitionRouteLocator】;
public class RouteDefinitionRouteLocator
implements RouteLocator, BeanFactoryAware, ApplicationEventPublisherAware {
// 构造函数中初始化
public RouteDefinitionRouteLocator(RouteDefinitionLocator routeDefinitionLocator,
List<RoutePredicateFactory> predicates,
List<GatewayFilterFactory> gatewayFilterFactories,
GatewayProperties gatewayProperties, ConversionService conversionService) {
this.routeDefinitionLocator = routeDefinitionLocator;
this.conversionService = conversionService;
initFactories(predicates);
gatewayFilterFactories.forEach(
factory -> this.gatewayFilterFactories.put(factory.name(), factory));
this.gatewayProperties = gatewayProperties;
}
// 设置predicate工厂
private void initFactories(List<RoutePredicateFactory> predicates) {
predicates.forEach(factory -> {
String key = factory.name();
if (this.predicates.containsKey(key)) {
this.logger.warn("A RoutePredicateFactory named " + key
+ " already exists, class: " + this.predicates.get(key)
+ ". It will be overwritten.");
}
this.predicates.put(key, factory);
if (logger.isInfoEnabled()) {
logger.info("Loaded RoutePredicateFactory [" + key + "]");
}
});
}
public Flux<Route> getRoutes() {
// 从RouteDefinitions转换为Route,转换过程在convertToRoute方法中实现
return this.routeDefinitionLocator.getRouteDefinitions().map(this::convertToRoute)
.map(route -> {
if (logger.isDebugEnabled()) {
logger.debug("RouteDefinition matched: " + route.getId());
}
return route;
});
}
// RouteDefinition到Route的转换
private Route convertToRoute(RouteDefinition routeDefinition) {
// 从routeDefinition获取predicate
AsyncPredicate<ServerWebExchange> predicate = combinePredicates(routeDefinition);
// 从routeDefinition获取gatewayFilters
List<GatewayFilter> gatewayFilters = getFilters(routeDefinition);
// 构造Route
return Route.async(routeDefinition).asyncPredicate(predicate)
.replaceFilters(gatewayFilters).build();
}
// 获取GatewayFilters
private List<GatewayFilter> getFilters(RouteDefinition routeDefinition) {
List<GatewayFilter> filters = new ArrayList<>();
// 如果默认filter不为空,则去加载
if (!this.gatewayProperties.getDefaultFilters().isEmpty()) {
filters.addAll(loadGatewayFilters(DEFAULT_FILTERS,
this.gatewayProperties.getDefaultFilters()));
}
// 如果Filter不为空,则
if (!routeDefinition.getFilters().isEmpty()) {
filters.addAll(loadGatewayFilters(routeDefinition.getId(),
routeDefinition.getFilters()));
}
AnnotationAwareOrderComparator.sort(filters);
return filters;
}
@SuppressWarnings("unchecked")
private List<GatewayFilter> loadGatewayFilters(String id,
List<FilterDefinition> filterDefinitions) {
List<GatewayFilter> filters = filterDefinitions.stream().map(definition -> {
// 从gatewayFilterFactories中根据key获取factory
GatewayFilterFactory factory = this.gatewayFilterFactories
.get(definition.getName());
if (factory == null) {
throw new IllegalArgumentException(
"Unable to find GatewayFilterFactory with name "
+ definition.getName());
}
// 获取definition设置的Filter值
Map<String, String> args = definition.getArgs();
if (logger.isDebugEnabled()) {
logger.debug("RouteDefinition " + id + " applying filter " + args + " to "
+ definition.getName());
}
Map<String, Object> properties = factory.shortcutType().normalize(args,
factory, this.parser, this.beanFactory);
// 每一个工厂中都有一个静态内部类Config,目的是存储我们设置的Filter值
Object configuration = factory.newConfig();
// 将后几个参数的信息绑定到configuration
ConfigurationUtils.bind(configuration, properties,
factory.shortcutFieldPrefix(), definition.getName(), validator,
conversionService);
// 获得GatewayFilter
GatewayFilter gatewayFilter = factory.apply(configuration);
if (this.publisher != null) {
this.publisher.publishEvent(new FilterArgsEvent(this, id, properties));
}
return gatewayFilter;
}).collect(Collectors.toList());
ArrayList<GatewayFilter> ordered = new ArrayList<>(filters.size());
for (int i = 0; i < filters.size(); i++) {
GatewayFilter gatewayFilter = filters.get(i);
if (gatewayFilter instanceof Ordered) {
ordered.add(gatewayFilter);
}
else {
ordered.add(new OrderedGatewayFilter(gatewayFilter, i + 1));
}
}
return ordered;
}
}
复制代码
请求处理阶段
- ReactorHttpHandlerAdapter#apply 方法是请求到网关执行的入口;
public class ReactorHttpHandlerAdapter implements BiFunction<HttpServerRequest, HttpServerResponse, Mono<Void>> {
public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());
try {
// 获取请求的request和response
ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory);
ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory);
if (request.getMethod() == HttpMethod.HEAD) {
response = new HttpHeadResponseDecorator(response);
}
// 给到HttpWebHandlerAdapter执行构建
return this.httpHandler.handle(request, response)
.doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage()))
.doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed"));
}
catch (URISyntaxException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to get request URI: " + ex.getMessage());
}
reactorResponse.status(HttpResponseStatus.BAD_REQUEST);
return Mono.empty();
}
}
}
复制代码
- HttpWebHandlerAdapter#handle 构建网关上下文 ServerWebExchange;
public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHandler {
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
if (this.forwardedHeaderTransformer != null) {
request = this.forwardedHeaderTransformer.apply(request);
}
// 根据请求的request、response构建网关上下文
ServerWebExchange exchange = createExchange(request, response);
LogFormatUtils.traceDebug(logger, traceOn ->
exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +
(traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));
return getDelegate().handle(exchange)
.doOnSuccess(aVoid -> logResponse(exchange))
.onErrorResume(ex -> handleUnresolvedError(exchange, ex))
.then(Mono.defer(response::setComplete));
}
}
复制代码
- DispatcherHandler 用于 Http 请求处理器/控制器的中央分发处理器,把请求分发给已经注册的处理程序处理,DispatcherHandler 遍历 Mapping 获取对应的 handler,网关一共有 6 个 handlerMapping【此处会找到 RoutePredicateHandlerMapping,通过 RoutePredicateHandlerMapping 获取 FilteringWebHandler,通过 FilteringWebHandler 获取】;
public class DispatcherHandler implements WebHandler, ApplicationContextAware {
public Mono<Void> handle(ServerWebExchange exchange) {
if (this.handlerMappings == null) {
return createNotFoundError();
}
// 遍历mapping获取handler
return Flux.fromIterable(this.handlerMappings)
.concatMap(mapping -> mapping.getHandler(exchange))
.next()
.switchIfEmpty(createNotFoundError())
.flatMap(handler -> invokeHandler(exchange, handler))
.flatMap(result -> handleResult(exchange, result));
}
}
复制代码
public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {
private final FilteringWebHandler webHandler;
private final RouteLocator routeLocator;
private final Integer managementPort;
private final ManagementPortType managementPortType;
// 网关启动时进行了初始化
public RoutePredicateHandlerMapping(FilteringWebHandler webHandler,
RouteLocator routeLocator, GlobalCorsProperties globalCorsProperties,
Environment environment) {
this.webHandler = webHandler;
this.routeLocator = routeLocator;
this.managementPort = getPortProperty(environment, "management.server.");
this.managementPortType = getManagementPortType(environment);
setOrder(1);
setCorsConfigurations(globalCorsProperties.getCorsConfigurations());
}
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
// don't handle requests on management port if set and different than server port
if (this.managementPortType == DIFFERENT && this.managementPort != null
&& exchange.getRequest().getURI().getPort() == this.managementPort) {
return Mono.empty();
}
exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
return lookupRoute(exchange)
// .log("route-predicate-handler-mapping", Level.FINER) //name this
.flatMap((Function<Route, Mono<?>>) r -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isDebugEnabled()) {
logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
}
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
// 返回FilteringWebHandler
return Mono.just(webHandler);
}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for ["
+ getExchangeDesc(exchange) + "]");
}
})));
}
}
复制代码
- RoutePredicateHandlerMapping#lookupRoute 匹配路由,根据 routeLocator 获取我们在配置我文件中配置的 Route,和当前请求的路由做匹配;
public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
// routeLocator获取我们在配置我文件中配置的Route
return this.routeLocator.getRoutes()
.concatMap(route -> Mono.just(route).filterWhen(r -> {
exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
// 当前请求的路由做匹配
return r.getPredicate().apply(exchange);
})
.doOnError(e -> logger.error(
"Error applying predicate for route: " + route.getId(),
e))
.onErrorResume(e -> Mono.empty()))
.next()
.map(route -> {
if (logger.isDebugEnabled()) {
logger.debug("Route matched: " + route.getId());
}
validateRoute(route, exchange);
return route;
});
}
}
复制代码
- FilteringWebHandler 创建过滤器链,执行过滤器;
public class FilteringWebHandler implements WebHandler {
// 创建过滤器链
public Mono<Void> handle(ServerWebExchange exchange) {
Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
List<GatewayFilter> gatewayFilters = route.getFilters();
List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
combined.addAll(gatewayFilters);
AnnotationAwareOrderComparator.sort(combined);
if (logger.isDebugEnabled()) {
logger.debug("Sorted gatewayFilterFactories: " + combined);
}
return new DefaultGatewayFilterChain(combined).filter(exchange);
}
private static class DefaultGatewayFilterChain implements GatewayFilterChain {
// 调用过滤器
public Mono<Void> filter(ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < filters.size()) {
GatewayFilter filter = filters.get(this.index);
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this,
this.index + 1);
// 执行调用
return filter.filter(exchange, chain);
}
else {
return Mono.empty(); // complete
}
});
}
}
}