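1. Preface
The basic components and purpose of Spring Cloud Gateway are not covered here; this post is intended for readers who already have some familiarity with it.
Spring Cloud Gateway version: as shipped in the Spring Cloud Hoxton.SR1 release train
Spring Cloud Gateway is configured in YAML: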
server:
  port: 9527
# Dynamic routing based on microservice names
spring:
  application:
    name: cloud-gateway
  cloud:
    gateway:
      discovery:
        locator:
          enabled: true # create routes dynamically from the service registry, routing by microservice name
      routes:
        - id: config-client
          uri: lb://config-client
          predicates:
            - Path=/config/**
          filters:
            - RewritePath=/config/?(?<segment>.*),/config/v1/$\{segment}
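The effect of the RewritePath rule above can be tried out with plain java.util.regex. This is a minimal sketch under the assumption that the filter boils down to a regular-expression replace on the request path; the class name RewritePathSketch and the sample paths are made up for illustration.

```java
import java.util.regex.Pattern;

// Hypothetical helper class, not part of Spring Cloud Gateway.
class RewritePathSketch {

    // Same regular expression as in the RewritePath filter of the YAML above.
    static final Pattern REGEXP = Pattern.compile("/config/?(?<segment>.*)");

    // In YAML the replacement is written as $\{segment}; the backslash only keeps
    // the placeholder from being resolved when the configuration is loaded, so the
    // effective replacement is ${segment}, a named-group reference.
    static final String REPLACEMENT = "/config/v1/${segment}";

    static String rewrite(String path) {
        return REGEXP.matcher(path).replaceAll(REPLACEMENT);
    }

    public static void main(String[] args) {
        System.out.println(rewrite("/config/users/1")); // /config/v1/users/1
        System.out.println(rewrite("/config"));         // /config/v1/
    }
}
```

With this rule a request to /config/users/1 would be forwarded to the config-client service as /config/v1/users/1.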
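2. Flow diagram
Start with the diagram from the official documentation, which outlines how a request is processed and roughly where each component sits.
3. Source code analysis
HTTP handling is built on Netty, a high-performance, asynchronous, event-driven networking framework; see the Netty source for its processing flow. After Netty reads the data and passes it through its pipeline, the call eventually reaches reactor.netty.http.server.HttpServerHandle#onStateChange. From there, a series of nested calls leads to the core method org.springframework.web.reactive.DispatcherHandler#handle: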
public Mono<Void> handle(ServerWebExchange exchange) {
if (this.handlerMappings == null) {
// no HandlerMapping is registered, so respond with a not-found error
return createNotFoundError();
}
return Flux.fromIterable(this.handlerMappings)
// mapping.getHandler is the key method: each HandlerMapping tries to resolve a handler for the exchange
.concatMap(mapping -> mapping.getHandler(exchange))
.next()
.switchIfEmpty(createNotFoundError())
// invokeHandler is the key method: it runs the handler logic
.flatMap(handler -> invokeHandler(exchange, handler))
// handle the result and write the response
.flatMap(result -> handleResult(exchange, result));
}
private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
if (this.handlerAdapters != null) {
for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
if (handlerAdapter.supports(handler)) {
// find a HandlerAdapter that supports the handler; for the gateway this ends up in SimpleHandlerAdapter#handle
return handlerAdapter.handle(exchange, handler);
}
}
}
return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
}
private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
return getResultHandler(result).handleResult(exchange, result)
.checkpoint("Handler " + result.getHandler() + " [DispatcherHandler]")
.onErrorResume(ex ->
result.applyExceptionHandler(ex).flatMap(exResult -> {
String text = "Exception handler " + exResult.getHandler() +
", error=\"" + ex.getMessage() + "\" [DispatcherHandler]";
return getResultHandler(exResult).handleResult(exchange, exResult).checkpoint(text);
}));
}
private HandlerResultHandler getResultHandler(HandlerResult handlerResult) {
if (this.resultHandlers != null) {
for (HandlerResultHandler resultHandler : this.resultHandlers) {
if (resultHandler.supports(handlerResult)) {
return resultHandler;
}
}
}
throw new IllegalStateException("No HandlerResultHandler for " + handlerResult.getReturnValue());
}
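The "first mapping that yields a handler wins, otherwise 404" logic of handle can be pictured without Reactor. The following is a minimal sketch, not the Spring code: DispatchSketch, prefixMapping and the handler names are hypothetical, and a lazy Stream stands in for the Flux pipeline.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical model of the lookup in DispatcherHandler#handle; not Spring code.
class DispatchSketch {

    // A "handler mapping" is modelled as a function from request path to an optional handler name.
    static Function<String, Optional<String>> prefixMapping(String prefix, String handlerName) {
        return path -> path.startsWith(prefix) ? Optional.of(handlerName) : Optional.empty();
    }

    static final List<Function<String, Optional<String>>> MAPPINGS = Arrays.asList(
            prefixMapping("/actuator", "actuatorHandler"),
            prefixMapping("/config", "filteringWebHandler"));

    static String dispatch(List<Function<String, Optional<String>>> mappings, String path) {
        return mappings.stream()
                .map(mapping -> mapping.apply(path))          // plays the role of concatMap(mapping.getHandler)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .findFirst()                                  // plays the role of next()
                .map(handler -> "200 handled by " + handler)  // plays the role of invokeHandler + handleResult
                .orElse("404 Not Found");                     // plays the role of switchIfEmpty(createNotFoundError())
    }

    public static void main(String[] args) {
        System.out.println(dispatch(MAPPINGS, "/config/users/1")); // 200 handled by filteringWebHandler
        System.out.println(dispatch(MAPPINGS, "/unknown"));        // 404 Not Found
    }
}
```

Because the stream is lazy, mappings after the first hit are never consulted, which mirrors the ordering guarantee of concatMap followed by next.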
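The figure below shows the HandlerMapping beans injected into handlerMappings; RoutePredicateHandlerMapping is the one that ultimately finds the matching handler. The other mappings in handlerMappings serve other strategies or configurations; how they are used is worth thinking through.
First, look at how mapping.getHandler works. By default the call lands in org.springframework.web.reactive.handler.AbstractHandlerMapping#getHandler: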
@Override
public Mono<Object> getHandler(ServerWebExchange exchange) {
// getHandlerInternal does the real lookup of a handler for the exchange; as explained above,
// the call is dispatched to RoutePredicateHandlerMapping
return getHandlerInternal(exchange).map(handler -> {
if (logger.isDebugEnabled()) {
logger.debug(exchange.getLogPrefix() + "Mapped to " + handler);
}
// CORS handling
if (hasCorsConfigurationSource(handler)) {
ServerHttpRequest request = exchange.getRequest();
CorsConfiguration config = (this.corsConfigurationSource != null ? this.corsConfigurationSource.getCorsConfiguration(exchange) : null);
CorsConfiguration handlerConfig = getCorsConfiguration(handler, exchange);
config = (config != null ? config.combine(handlerConfig) : handlerConfig);
if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) {
return REQUEST_HANDLED_HANDLER;
}
}
return handler;
});
}
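The call has now reached org.springframework.cloud.gateway.handler.RoutePredicateHandlerMapping#getHandlerInternal, and this is where a key concept comes in. Predicates are the core of route configuration: when a route's predicates match, the request is forwarded to the uri configured on that Route.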
@Override
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
// don't handle requests on management port if set and different than server port
if (this.managementPortType == DIFFERENT && this.managementPort != null
&& exchange.getRequest().getURI().getPort() == this.managementPort) {
return Mono.empty();
}
exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
// lookupRoute(exchange) finds the Route whose predicate matches
return lookupRoute(exchange)
// the matched Route is stored in the exchange attributes, then webHandler is returned
.flatMap((Function<Route, Mono<?>>) r -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isDebugEnabled()) {
logger.debug(
"Mapping [" + getExchangeDesc(exchange) + "] to " + r);
}
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
return Mono.just(webHandler);
}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for ["
+ getExchangeDesc(exchange) + "]");
}
})));
}
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
return this.routeLocator.getRoutes()
// iterate over all Routes; r.getPredicate().apply(exchange) checks whether the predicate matches,
// and the first matching Route is returned
.concatMap(route -> Mono.just(route).filterWhen(r -> {
// add the current route we are testing
exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
return r.getPredicate().apply(exchange);
})
// instead of immediately stopping main flux due to error, log and
// swallow it
.doOnError(e -> logger.error(
"Error applying predicate for route: " + route.getId(),
e))
.onErrorResume(e -> Mono.empty()))
// .defaultIfEmpty() put a static Route not found
// or .switchIfEmpty()
// .switchIfEmpty(Mono.<Route>empty().log("noroute"))
.next()
// TODO: error handling
.map(route -> {
if (logger.isDebugEnabled()) {
logger.debug("Route matched: " + route.getId());
}
validateRoute(route, exchange);
return route;
});
/*
* TODO: trace logging if (logger.isTraceEnabled()) {
* logger.trace("RouteDefinition did not match: " + routeDefinition.getId()); }
*/
}
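The apply call enters org.springframework.cloud.gateway.handler.AsyncPredicate.DefaultAsyncPredicate#apply. Looking at the implementations behind delegate.test(t) reveals all the available predicates; the ones matching the configured predicate rules are invoked here and return a Boolean.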
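The route lookup can be sketched in plain Java: routes are tested in order, the first one whose predicate holds is returned, and a predicate that throws is logged and skipped rather than aborting the lookup. This is only an illustration; the Route class here is a simplified stand-in, the "broken" route is invented, and startsWith("/config/") merely approximates the Path=/config/** pattern.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical model of RoutePredicateHandlerMapping#lookupRoute; not gateway code.
class RouteLookupSketch {

    // Simplified stand-in for Route: an id, a target uri and a single predicate on the path.
    static final class Route {
        final String id;
        final String uri;
        final Predicate<String> predicate;

        Route(String id, String uri, Predicate<String> predicate) {
            this.id = id;
            this.uri = uri;
            this.predicate = predicate;
        }
    }

    static final List<Route> ROUTES = Arrays.asList(
            // Invented route whose predicate always fails with an exception.
            new Route("broken", "lb://broken", path -> {
                throw new IllegalStateException("predicate failed");
            }),
            // Approximation of the Path=/config/** predicate from the configuration.
            new Route("config-client", "lb://config-client", path -> path.startsWith("/config/")));

    static Optional<Route> lookupRoute(List<Route> routes, String path) {
        for (Route route : routes) {
            try {
                if (route.predicate.test(path)) {
                    return Optional.of(route); // first match wins
                }
            } catch (RuntimeException e) {
                // Log and swallow, so one faulty predicate does not stop the lookup.
                System.err.println("Error applying predicate for route: " + route.id);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(lookupRoute(ROUTES, "/config/users/1").map(r -> r.uri)); // Optional[lb://config-client]
        System.out.println(lookupRoute(ROUTES, "/other").isPresent());              // false
    }
}
```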
Once the predicates have produced a handler, control returns to DispatcherHandler#handle, which next calls invokeHandler(exchange, handler):
// this handler is the FilteringWebHandler
private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
if (this.handlerAdapters != null) {
for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
if (handlerAdapter.supports(handler)) {
// handled by SimpleHandlerAdapter
return handlerAdapter.handle(exchange, handler);
}
}
}
return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
}
Next, org.springframework.web.reactive.result.SimpleHandlerAdapter#handle is called:
@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
WebHandler webHandler = (WebHandler) handler;
// handler is the FilteringWebHandler, so this calls FilteringWebHandler#handle
Mono<Void> mono = webHandler.handle(exchange);
return mono.then(Mono.empty());
}
org.springframework.cloud.gateway.handler.FilteringWebHandler#handle
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
// fetch the Route matched earlier and its filters, if any are configured
List<GatewayFilter> gatewayFilters = route.getFilters();
// merge the route's GatewayFilters with the global filters and sort them by order
List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
combined.addAll(gatewayFilters);
// TODO: needed or cached?
AnnotationAwareOrderComparator.sort(combined);
if (logger.isDebugEnabled()) {
logger.debug("Sorted gatewayFilterFactories: " + combined);
}
// enter the filter chain and invoke the filters
return new DefaultGatewayFilterChain(combined).filter(exchange);
}
// FilteringWebHandler.DefaultGatewayFilterChain#filter
@Override
public Mono<Void> filter(ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < filters.size()) {
GatewayFilter filter = filters.get(this.index);
// index + 1 is stored in a new chain that is handed to the current filter, so the next filter() call picks up the next filter
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this,
this.index + 1);
// run the current filter in the chain
return filter.filter(exchange, chain);
}
else {
return Mono.empty(); // complete
}
});
}
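The index-based chain can be modelled in a few lines of synchronous Java. This is a sketch of the pattern only: the Filter and Chain types, the filter names and the order values are all hypothetical, and plain method calls replace the Mono-based signatures.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical, synchronous model of the gateway filter chain; not gateway code.
class FilterChainSketch {

    interface Filter {
        int order();
        void filter(List<String> trace, Chain chain);
    }

    // Each Chain instance is immutable: it knows the filter list and its own position.
    static final class Chain {
        private final List<Filter> filters;
        private final int index;

        Chain(List<Filter> filters, int index) {
            this.filters = filters;
            this.index = index;
        }

        void filter(List<String> trace) {
            if (index < filters.size()) {
                // Hand the current filter a new chain that points at the next filter.
                filters.get(index).filter(trace, new Chain(filters, index + 1));
            }
            // Otherwise the end of the chain is reached and there is nothing left to do.
        }
    }

    // A filter that records when it runs before and after the rest of the chain.
    static Filter named(String name, int priority) {
        return new Filter() {
            public int order() {
                return priority;
            }

            public void filter(List<String> trace, Chain chain) {
                trace.add(name + " pre");
                chain.filter(trace);
                trace.add(name + " post");
            }
        };
    }

    static List<String> run(List<Filter> globalFilters, List<Filter> routeFilters) {
        // Merge global and route filters, then sort by order, as FilteringWebHandler#handle does.
        List<Filter> combined = new ArrayList<>(globalFilters);
        combined.addAll(routeFilters);
        combined.sort(Comparator.comparingInt(Filter::order));
        List<String> trace = new ArrayList<>();
        new Chain(combined, 0).filter(trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run(
                Arrays.asList(named("routing", 100), named("auth", -1)),
                Arrays.asList(named("rewritePath", 1))));
        // [auth pre, rewritePath pre, routing pre, routing post, rewritePath post, auth post]
    }
}
```

Creating a new chain object per step, instead of mutating a shared index, keeps each chain safe to reuse, which matters in the real code because the filters run lazily inside Mono.defer.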
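The filter chain matters: it is Spring Cloud Gateway's extension point, where custom logic such as authorization checks, login authentication, and logging can be added. The default filter chain is shown in the figure below; LoadBalancerClientFilter and NettyRoutingFilter deserve particular attention.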
org.springframework.cloud.gateway.filter.LoadBalancerClientFilter#filter
@Override
@SuppressWarnings("Duplicates")
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
// scheme prefix of the route uri, set when the lb:<scheme> form is used
String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
if (url == null
|| (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
return chain.filter(exchange);
}
// keep the original request URL
addOriginalRequestUrl(exchange, url);
if (log.isTraceEnabled()) {
log.trace("LoadBalancerClientFilter url before: " + url);
}
// use the registry data and the load-balancing algorithm to pick an available service instance
final ServiceInstance instance = choose(exchange);
if (instance == null) {
throw NotFoundException.create(properties.isUse404(),
"Unable to find instance for " + url.getHost());
}
URI uri = exchange.getRequest().getURI();
// if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
// if the loadbalancer doesn't provide one.
String overrideScheme = instance.isSecure() ? "https" : "http";
if (schemePrefix != null) {
overrideScheme = url.getScheme();
}
// replace it with the real server address, which the following filters use
URI requestUrl = loadBalancer.reconstructURI(
new DelegatingServiceInstance(instance, overrideScheme), uri);
if (log.isTraceEnabled()) {
log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
}
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
return chain.filter(exchange);
}
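What the filter does to the URL can be illustrated with java.net.URI alone. The sketch below makes several assumptions that are not in the gateway code: instances are a fixed list of "host:port" strings instead of registry entries, selection is plain round-robin, and the target scheme is always http. LoadBalancerSketch and its methods are hypothetical names.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of resolving an lb:// URL to a concrete instance; not gateway code.
class LoadBalancerSketch {

    private final List<String> instances; // "host:port" entries, standing in for the service registry
    private final AtomicInteger counter = new AtomicInteger();

    LoadBalancerSketch(List<String> instances) {
        this.instances = instances;
    }

    // Round-robin selection; the real algorithm depends on the configured load balancer.
    String choose() {
        if (instances.isEmpty()) {
            throw new IllegalStateException("Unable to find instance");
        }
        return instances.get(Math.floorMod(counter.getAndIncrement(), instances.size()));
    }

    URI resolve(URI requestUrl) {
        if (!"lb".equals(requestUrl.getScheme())) {
            return requestUrl; // not a load-balanced URL, leave it untouched
        }
        String[] hostAndPort = choose().split(":");
        try {
            // Keep path and query, swap in the chosen host and port.
            return new URI("http", null, hostAndPort[0], Integer.parseInt(hostAndPort[1]),
                    requestUrl.getPath(), requestUrl.getQuery(), null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        LoadBalancerSketch lb = new LoadBalancerSketch(Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"));
        URI requestUrl = URI.create("lb://config-client/config/v1/users");
        System.out.println(lb.resolve(requestUrl)); // http://10.0.0.1:8080/config/v1/users
        System.out.println(lb.resolve(requestUrl)); // http://10.0.0.2:8080/config/v1/users
    }
}
```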
org.springframework.cloud.gateway.filter.NettyRoutingFilter#filter sends the HTTP and HTTPS requests:
@Override
@SuppressWarnings("Duplicates")
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
String scheme = requestUrl.getScheme();
if (isAlreadyRouted(exchange)
|| (!"http".equals(scheme) && !"https".equals(scheme))) {
return chain.filter(exchange);
}
setAlreadyRouted(exchange);
ServerHttpRequest request = exchange.getRequest();
final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
final String url = requestUrl.toASCIIString();
HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);
final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
filtered.forEach(httpHeaders::set);
boolean preserveHost = exchange
.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);
Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
// send the request
Flux<HttpClientResponse> responseFlux = httpClientWithTimeoutFrom(route)
.headers(headers -> {
headers.add(httpHeaders);
// Will either be set below, or later by Netty
headers.remove(HttpHeaders.HOST);
if (preserveHost) {
String host = request.getHeaders().getFirst(HttpHeaders.HOST);
headers.add(HttpHeaders.HOST, host);
}
}).request(method).uri(url).send((req, nettyOutbound) -> {
if (log.isTraceEnabled()) {
nettyOutbound
.withConnection(connection -> log.trace("outbound route: "
+ connection.channel().id().asShortText()
+ ", inbound: " + exchange.getLogPrefix()));
}
return nettyOutbound.send(request.getBody()
.map(dataBuffer -> ((NettyDataBuffer) dataBuffer)
.getNativeBuffer()));
}).responseConnection((res, connection) -> {
// Defer committing the response until all route filters have run
// Put client response as ServerWebExchange attribute and write
// response later NettyWriteResponseFilter
exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);
ServerHttpResponse response = exchange.getResponse();
// put headers and status so filters can modify the response
HttpHeaders headers = new HttpHeaders();
res.responseHeaders().forEach(
entry -> headers.add(entry.getKey(), entry.getValue()));
String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
if (StringUtils.hasLength(contentTypeValue)) {
exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR,
contentTypeValue);
}
setResponseStatus(res, response);
// make sure headers filters run after setting status so it is
// available in response
HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(
getHeadersFilters(), headers, exchange, Type.RESPONSE);
if (!filteredResponseHeaders
.containsKey(HttpHeaders.TRANSFER_ENCODING)
&& filteredResponseHeaders
.containsKey(HttpHeaders.CONTENT_LENGTH)) {
// It is not valid to have both the transfer-encoding header and
// the content-length header.
// Remove the transfer-encoding header in the response if the
// content-length header is present.
response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
}
exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES,
filteredResponseHeaders.keySet());
response.getHeaders().putAll(filteredResponseHeaders);
return Mono.just(res);
});
Duration responseTimeout = getResponseTimeout(route);
if (responseTimeout != null) {
responseFlux = responseFlux
.timeout(responseTimeout, Mono.error(new TimeoutException(
"Response took longer than timeout: " + responseTimeout)))
.onErrorMap(TimeoutException.class,
th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT,
th.getMessage(), th));
}
return responseFlux.then(chain.filter(exchange));
}
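The timeout handling at the end of the method turns a response that takes too long into a 504 Gateway Timeout. A blocking analogue of that idea is sketched below; it swaps the reactive timeout and onErrorMap operators for CompletableFuture.get with a timeout, and the class name, the use of plain status codes, and the 500 fallback are assumptions made only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical, blocking analogue of the response-timeout mapping; not gateway code.
class ResponseTimeoutSketch {

    // Waits for the upstream status code; a timeout is mapped to 504 Gateway Timeout.
    static int awaitStatus(CompletableFuture<Integer> upstream, long timeoutMillis) {
        try {
            return upstream.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return 504; // response took longer than the configured timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return 500; // arbitrary choice for this sketch
        } catch (ExecutionException e) {
            return 500; // arbitrary choice for this sketch
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitStatus(CompletableFuture.completedFuture(200), 50)); // 200
        System.out.println(awaitStatus(new CompletableFuture<>(), 50));              // 504
    }
}
```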
That is the full path an incoming request takes through the gateway.