文章目录
- 前言
- 项目背景
- 实现方案
- 具体实现
- 功能演示
- 思路延伸
- 1. spring cloud gateway
- 2. 研究路由原理
- 2.1 寻找合适的 Handler
- 2.2 执行 Handler
- 2.3 处理调用结果
- 参考内容
前言
本系列用来记录一些在实际项目中的小东西,并记录在过程中想到一些小东西,因为是随笔记录,所以内容不会过于详细。
项目背景
需要做一个数据监控中心来监控其他业务系统的数据,如请求次数,三方接口调用次数以及失败次数等。数据来源于各个业务系统,并且数据不落在监控系统内部,而是由各个业务系统提供接口,前端访问 monitor 系统,由 monitor 系统调用其他业务系统的数据接口返回数据。
话不多说 包括各个业务系统的数据统计功能从开始到发布只有三天时间
所以说白了, 这里所谓的数据监控中心就是一个请求转发的应用。
实现方案
具体实现
路由服务的核心实现如下,业务有所简化,比如去除了用户信息验证登功能,注释比较清楚,这里不再赘述。
/**
* @Author : kingfish
* @Email : kingfishx@163.com
* @Date : 2023/11/4 10:45
* @Desc : 常量
*/
public interface CommonContants {
/**
* 路由关键字
*/
String ROUTER = "router";
}
/**
* @Author : kingfish
* @Email : kingfishx@163.com
* @Date : 2023/11/4 16:53
* @Desc : 持有各个业务项目的信息,如有需要也可以定制各个业务系统的特殊逻辑
*/
@Data
public class SystemHolder {
/**
* 系统域名
*/
private String domain;
public SystemHolder() {
}
public SystemHolder(String domain) {
this.domain = domain;
}
}
/**
* @Author : kingfish
* @Email : kingfishx@163.com
* @Date : 2023/11/4 10:40
* @Desc : 路由调用工具类
*/
@Slf4j
@Component
public class InvokeRouterUtil {
/**
* 路由的系统参数是否保留
*/
private static final String RESERVED_PREFIX = "reservedPrefix";
/**
* 系统域名映射 key :系统唯一标识, value 系统
*/
private Map<String, SystemHolder> systemMap = Maps.newHashMap();
@Value("${server.servlet.context-path}")
private String contextPath;
@PostConstruct
public void init() {
systemMap.put("aaa", new SystemHolder("http://localhost:6666"));
systemMap.put("bbb", new SystemHolder("http://localhost:9999"));
}
/**
* 路由调用
*
* @param system
* @param request
* @param response
*/
@SneakyThrows
public void invoke(String system,
HttpServletRequest request, HttpServletResponse response) {
final HttpMethod httpMethod = HttpMethod.resolve(request.getMethod());
final URI realUrl = resolveRealUrl(system, request);
if (httpMethod == null || realUrl == null) {
throw CommonBizException.logException("当前请求不合法, 无法转发");
}
log.info("[路由转发][originalUrl = {}, realUrl = {}]", request.getRequestURI(), realUrl);
ClientHttpRequest delegateRequest =
new SimpleClientHttpRequestFactory().
createRequest(realUrl, httpMethod);
resolveRequestHeader(request, delegateRequest);
resolveRequestBody(request, delegateRequest);
try (ClientHttpResponse clientHttpResponse = delegateRequest.execute();
ServletOutputStream responseOutputStream = response.getOutputStream()) {
response.setStatus(clientHttpResponse.getStatusCode().value());
clientHttpResponse.getHeaders().forEach((key, values) -> {
// 处理响应头重复情况,在请求返回时该响应头会出现重复,postman 调用成功,但是实际前端调用会出错
if (!"Transfer-Encoding".equalsIgnoreCase(key)) {
values.forEach(value -> response.setHeader(key, value));
}
});
IoUtil.copy(clientHttpResponse.getBody(), responseOutputStream);
}
}
/**
* 解析请求体
*
* @param request
* @param delegateRequest
* @throws IOException
*/
private void resolveRequestBody(HttpServletRequest request, ClientHttpRequest delegateRequest) throws IOException {
StreamUtils.copy(request.getInputStream(), delegateRequest.getBody());
}
/**
* 解析请求头
*
* @param request
* @param delegateRequest
* @return
*/
private void resolveRequestHeader(HttpServletRequest request, ClientHttpRequest delegateRequest) {
Enumeration<String> headerNames = request.getHeaderNames();
// 设置请求头
while (headerNames.hasMoreElements()) {
String headerName = headerNames.nextElement();
Enumeration<String> v = request.getHeaders(headerName);
List<String> arr = new ArrayList<>();
while (v.hasMoreElements()) {
arr.add(v.nextElement());
}
delegateRequest.getHeaders().addAll(headerName, arr);
}
}
/**
* 解析获取真实请求路径
*
* @param system
* @param request
* @return
*/
private URI resolveRealUrl(String system, HttpServletRequest request)
throws URISyntaxException, UnsupportedEncodingException {
StringBuilder requestUrl =
new StringBuilder(StringUtils.substringAfter(request.getRequestURI(),
contextPath + CommonContants.ROUTER + "/" + system + "/"));
SystemHolder systemHolder = systemMap.get(system);
String domain = systemHolder.getDomain();
if (StringUtils.isNoneBlank(requestUrl.toString(), domain)) {
final Enumeration<String> parameterNames = request.getParameterNames();
StringBuilder uriVariables = new StringBuilder("?");
while (parameterNames.hasMoreElements()) {
// 转义部分特殊字符,如果请求参数里带有 +、空格等不转义请求路由过去会出问题
final String parameterName = URLEncoder.encode(parameterNames.nextElement(), "UTF-8");
final String parameter = URLEncoder.encode(request.getParameter(parameterName), "UTF-8");
if (parameterName.equalsIgnoreCase(RESERVED_PREFIX)
&& Boolean.TRUE.toString().equalsIgnoreCase(parameter)) {
requestUrl.insert(0, system + "/");
continue;
}
uriVariables.append(parameterName).append("=").append(parameter).append("&");
}
domain = domain.endsWith("/") ? domain : domain + "/";
return new URI(domain + requestUrl
+ (uriVariables.length() == 1 ? "" : uriVariables.substring(0, uriVariables.length() - 1)));
}
return null;
}
}
/**
* @Author : kingfish
* @Email : kingfishx@163.com
* @Date : 2023/11/4 18:06
* @Desc : 路由调用
*/
@RestController
@RequestMapping(CommonContants.ROUTER)
public class RemoteRouterController {
@Autowired
private InvokeRouterUtil invokeRouterUtil;
@RequestMapping("/{system}/**")
public void genericInvoke(@PathVariable("system") String system,
HttpServletRequest request, HttpServletResponse response) {
invokeRouterUtil.invoke(system, request, response);
}
}
功能演示
-
假设我们已经通过如上代码建立了一个路由服务 ,配置如下:
server.port=8090 server.servlet.context-path=/common-demo
-
假设我们新建了两个服务 aaa (端口号6666) 和 bbb (端口号9999), 都分别暴露如下接口
@RestController @RequestMapping("test") public class TestController { @Value("${server.servlet.context-path}") private String contextPath; @RequestMapping("demo") public String demo(String msg) { return contextPath + " 收到消息 :" + msg; } }
针对上面两个服务暴露的接口请求路径应该如下:
http://localhost:6666/aaa/test/demo?msg=123 http://localhost:9999/bbb/test/demo?msg=123
-
如果通过路由服务请求,请求地址应为:
http://localhost:8090/common-demo/router/aaa/test/demo?msg=123&reservedPrefix=true http://localhost:8090/common-demo/router/bbb/test/demo?msg=123&reservedPrefix=true
解释下请求路径中的各个参数的意义:
http://localhost:8090/common-demo
: 是 路由服务的请求上下文router
:标注这个请求需要路由转发aaa、bbb
: 服务标识,标识转发到哪个服务。test/demo?msg=123
: 正常请求路径以及参数reservedPrefix
:转发请求时是否需要携带前缀,即服务标识,默认为false
- 如果为true,则转发后的请求为 {{服务域名}}/{{服务标识}}/{{正常请求路径}},以上面为例转发后的请求路径即为:
java http://localhost:6666/aaa/test/demo?msg=123 http://localhost:9999/aaa/test/demo?msg=123
- 如果为fasle ,则转发后的请求为 {{服务域名}}/{{正常请求路径}},以上面为例转发后的请求路径即为
java http://localhost:6666/test/demo?msg=123 http://localhost:9999/test/demo?msg=123
思路延伸
这个数据监控中心虽然叫做数据监控中心,但是实际实现的作用也就是个请求转发的功能,这里就想到了网关的作用也是请求转发,所以这里顺便学习了下 Spring Cloud Gateway 的实现。由于 spring cloud gateway 之前个人仅停留的了解阶段,所以下面的部分内容有所参考,推荐阅读参考原文。
1. spring cloud gateway
先搭建一个spring cloud gateway 项目,项目本身并没有任何内容,所以这里贴出 pom 和 yml 配置即可
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.16</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<artifactId>spring-cloud-gateway</artifactId>
<name>spring-cloud-gateway</name>
<properties>
<java.version>8</java.version>
<spring-cloud.version>2021.0.6</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
<version>3.1.6</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
server:
port: 9091
spring:
application:
name: gateway
cloud:
gateway:
enabled: true
routes:
- id: demo
uri: http://localhost:8090/common-demo
predicates:
- Path=/router/**
filters:
- StripPrefix=1
logging:
level:
root: debug
web: debug
sql: debug
-
新建一个服务 common-demo, 端口 8090,提供路由目的接口,如下:
@RestController @RequestMapping("test") public class TestController { @RequestMapping("demo") public String demo(String msg) { return "收到消息 :" + msg; } }
-
请求如下:
2. 研究路由原理
由于 spring-cloud-starter-gateway 使用的是 Spring Webflux,所以请求的入口是 DispatcherHandler, 基本逻辑与 Spring Web 类似,都是根据请求生成对应的 Handler 并找到对应的 HandlerAdapter 来处理请求结果,不同的是 Webflux 通过IO 多路复用增加了服务的并发访问量。
// org.springframework.web.reactive.DispatcherHandler#handle
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
if (this.handlerMappings == null) {
return createNotFoundError();
}
if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
return handlePreFlight(exchange);
}
// 从 handlerMappings 中寻找合适的 handler ,通过 invokeHandler 调用 handler 并通过 handleResult 返回结果.
return Flux.fromIterable(this.handlerMappings)
// 1. 寻找合适的 Handler :顺序执行 handlerMappings.getHandler 方法,遇到第一个返回不为空的继续执行
.concatMap(mapping -> mapping.getHandler(exchange))
.next()
// 如果全部为空则切换到NotFounc 逻辑
.switchIfEmpty(createNotFoundError())
// 2. 找到合适的 HandlerAdapter 来调用执行 Handler
.flatMap(handler -> invokeHandler(exchange, handler))
// 3. 处理 Handler 调用结果
.flatMap(result -> handleResult(exchange, result));
}
2.1 寻找合适的 Handler
上面代码中的 handlerMappings 默认有四个实现,如下,默认会按照次顺序寻找能当前请求的 HandlerMapping :
org.springframework.web.reactive.function.server.support.RouterFunctionMapping
org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerMapping
org.springframework.cloud.gateway.handler.RoutePredicateHandlerMapping
org.springframework.web.reactive.handler.SimpleUrlHandlerMapping
对于网关转发的请求,是交由 RoutePredicateHandlerMapping#getHandlerInternal 来处理,而 RoutePredicateHandlerMapping#getHandlerInternal 的逻辑就是根据当前请求路径匹配对应的路由 (Route)并返回。
@Override
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
// don't handle requests on management port if set and different than server port
if (this.managementPortType == DIFFERENT && this.managementPort != null
&& exchange.getRequest().getLocalAddress() != null
&& exchange.getRequest().getLocalAddress().getPort() == this.managementPort) {
return Mono.empty();
}
exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
// 根据当前请求寻找匹配路由,返回 webHandler,
// 这里的 webHandler 是 RoutePredicateHandlerMapping 的全局变量,实现类是 FilteringWebHandler
return lookupRoute(exchange)
.flatMap((Function<Route, Mono<?>>) r -> {
// 移除 GATEWAY_PREDICATE_ROUTE_ATTR 属性
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isDebugEnabled()) {
logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
}
// 添加属性 GATEWAY_ROUTE_ATTR, value 是当前 route
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
return Mono.just(webHandler);
}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})));
}
// 查找当前请求对应的路由 并返回。
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
// 从配置的路由中寻找
return this.routeLocator.getRoutes()
// individually filter routes so that filterWhen error delaying is not a
// problem
.concatMap(route -> Mono.just(route).filterWhen(r -> {
// 过滤出满足当前请求的路由,将其id添加到 exchange 属性中,在下面 r.getPredicate().apply(exchange) 中会使用到。
exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
return r.getPredicate().apply(exchange);
})
.doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
.onErrorResume(e -> Mono.empty()))
.next()
// TODO: error handling
.map(route -> {
validateRoute(route, exchange);
return route;
});
}
2.2 执行 Handler
// 调用处理器,逻辑基本和 Spring Web 相同:找到合适的 HandlerAdapter 执行并返回
private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
if (ObjectUtils.nullSafeEquals(exchange.getResponse().getStatusCode(), HttpStatus.FORBIDDEN)) {
return Mono.empty(); // CORS rejection
}
if (this.handlerAdapters != null) {
for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
if (handlerAdapter.supports(handler)) {
return handlerAdapter.handle(exchange, handler);
}
}
}
return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
}
这里 HandlerAdapter 默认是下面四个实现类,我们的请求会交由 SimpleHandlerAdapter 来完成。
org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter
org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerAdapter
org.springframework.web.reactive.function.server.support.HandlerFunctionAdapter
org.springframework.web.reactive.result.SimpleHandlerAdapter
SimpleHandlerAdapter 实现如下:SimpleHandlerAdapter 是直接调用 Handler#handle 方法处理的请求
public class SimpleHandlerAdapter implements HandlerAdapter {
@Override
public boolean supports(Object handler) {
return WebHandler.class.isAssignableFrom(handler.getClass());
}
@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
WebHandler webHandler = (WebHandler) handler;
Mono<Void> mono = webHandler.handle(exchange);
// 这里当 webHandler.handle(exchange) 执行完毕后,替换一个空的 Mono 返回
return mono.then(Mono.empty());
}
}
而这里的 handler 就是 RoutePredicateHandlerMapping#getHandlerInternal 返回的 FilteringWebHandler,所以这里会调用 FilteringWebHandler#handle 方法。
FilteringWebHandler#handle 的方法实现很简单,即根据当前请求对应的路由的过滤器生成过滤器链,并执行。如下:
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
// 获取当前请求匹配的路由,这个属性在 RoutePredicateHandlerMapping#getHandlerInternal 中放入
Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
// 获取当前路由配置的过滤器
List<GatewayFilter> gatewayFilters = route.getFilters();
List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
combined.addAll(gatewayFilters);
// TODO: needed or cached?
// 对过滤器进行排序
AnnotationAwareOrderComparator.sort(combined);
if (logger.isDebugEnabled()) {
logger.debug("Sorted gatewayFilterFactories: " + combined);
}
// 生成过滤器链并执行
return new DefaultGatewayFilterChain(combined).filter(exchange);
}
按照 gatewayFilters
排序后的顺序,下面简单看下每个过滤器的作用(关于过滤器的解析并非本文主题,所以参考 Spring Cloud Gateway 源码解析 - 07 - 过滤器解析之 GlobalFilter, 更推荐阅读原文,下面大部分是直接抄的):
-
RemoveCachedBodyFilter:清理网关请求的上线文参数。需要注意的是 该过滤器虽然排序在最前面,但实际上是在最后执行。
@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // 最高的优先级能保证自己最先执行,在内部直接执行其他过滤器,在其他过滤器执行结束后便可以清理资源。 return chain.filter(exchange).doFinally(s -> { Object attribute = exchange.getAttributes().remove(CACHED_REQUEST_BODY_ATTR); if (attribute != null && attribute instanceof PooledDataBuffer) { PooledDataBuffer dataBuffer = (PooledDataBuffer) attribute; if (dataBuffer.isAllocated()) { if (log.isTraceEnabled()) { log.trace("releasing cached body in exchange attribute"); } dataBuffer.release(); } } }); } // 设置最高的优先级 @Override public int getOrder() { return HIGHEST_PRECEDENCE; }
-
AdaptCachedBodyGlobalFilter : 与 RemoveCachedBodyFilter 相反,AdaptCachedBodyGlobalFilter的作用是将请求参数缓存到上下文中。
public class AdaptCachedBodyGlobalFilter implements GlobalFilter, Ordered, ApplicationListener<EnableBodyCachingEvent> { private ConcurrentMap<String, Boolean> routesToCache = new ConcurrentHashMap<>(); // 当配置了重试时, 会通过 RetryGatewayFilterFactory 加载 RetryGatewayFilter过滤器 // 而在执行重试逻辑时发布 EnableBodyCachingEvent,此处会监听到该事件 @Override public void onApplicationEvent(EnableBodyCachingEvent event) { this.routesToCache.putIfAbsent(event.getRouteId(), true); } @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // the cached ServerHttpRequest is used when the ServerWebExchange can not be // mutated, for example, during a predicate where the body is read, but still // needs to be cached. // 从上下文获取 CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR 缓存 (请求参数的缓存) ServerHttpRequest cachedRequest = exchange.getAttributeOrDefault(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR, null); // 如果 cachedRequest 不为空,则说明已经进行了缓存,则通过缓存的request构建一个上下文请求 if (cachedRequest != null) { exchange.getAttributes().remove(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR); return chain.filter(exchange.mutate().request(cachedRequest).build()); } // 从上下文获取 CACHED_REQUEST_BODY_ATTR 缓存 (请求体的缓存) DataBuffer body = exchange.getAttributeOrDefault(CACHED_REQUEST_BODY_ATTR, null); // 获取当前请求的路由信息 Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR); // 如果满足 if 条件,则说明请求已经被缓存过, if (body != null || !this.routesToCache.containsKey(route.getId())) { return chain.filter(exchange); } // ServerWebExchangeUtils#cacheRequestBody 会调用到 ServerWebExchangeUtils#decorate 方法来进行缓存CACHED_REQUEST_BODY_ATTR return ServerWebExchangeUtils.cacheRequestBody(exchange, (serverHttpRequest) -> { // don't mutate and build if same request object // 如果是同一个请求,则直接执行Filter逻辑 if (serverHttpRequest == exchange.getRequest()) { return chain.filter(exchange); } // 否则,通过新的请求构建一个请求上下文 return chain.filter(exchange.mutate().request(serverHttpRequest).build()); }); } @Override public int getOrder() { return Ordered.HIGHEST_PRECEDENCE + 1000; } }
-
NettyWriteResponseFilter : 该过滤器和 RemoveCachedBodyFilter 相同,优先加载,最后执行 (在 RemoveCachedBodyFilter 之前执行),其作用是将请求返回的数据写入到 Response 中
@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_CONN_ATTR is not added // until the NettyRoutingFilter is run // @formatter:off return chain.filter(exchange) .then(Mono.defer(() -> { // 从上下文中获取CLIENT_RESPONSE_CONN_ATTR,Connection是对NettyChannel的封装 // CLIENT_RESPONSE_CONN_ATTR是在{@link NettyRoutingFilter#filter}中放入的 Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR); if (connection == null) { return Mono.empty(); } ServerHttpResponse response = exchange.getResponse(); // TODO: needed? final Flux<DataBuffer> body = connection .inbound() .receive() .retain() .map(byteBuf -> wrap(byteBuf, response)); MediaType contentType = null; try { contentType = response.getHeaders().getContentType(); } catch (Exception e) { } return (isStreamingMediaType(contentType) ? response.writeAndFlushWith(body.map(Flux::just)) : response.writeWith(body)); })).doOnCancel(() -> cleanup(exchange)) .doOnError(throwable -> cleanup(exchange)); // @formatter:on }
-
ForwardPathFilter : 用来处理 Forward URI,对应 ForwardRoutingFilter 来转发请求。
@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR); URI routeUri = route.getUri(); String scheme = routeUri.getScheme(); //如果请求已经被处理过或者uri的scheme不是forward,则不处理 //可以通过自定义过滤器来设置GATEWAY_ALREADY_ROUTED_ATTR为true从而使Filter不起作用 if (isAlreadyRouted(exchange) || !"forward".equals(scheme)) { return chain.filter(exchange); } //替换请求path重新构建path exchange = exchange.mutate().request(exchange.getRequest().mutate().path(routeUri.getPath()).build()).build(); return chain.filter(exchange); }
-
RouteToRequestUrlFilter : 根据 Router 生成真正的请求路径
@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR); // route 为空则不执行 (GATEWAY_ROUTE_ATTR 在RoutePredicateHandlerMapping中放入的) if (route == null) { return chain.filter(exchange); } log.trace("RouteToRequestUrlFilter start"); URI uri = exchange.getRequest().getURI(); boolean encoded = containsEncodedParts(uri); URI routeUri = route.getUri(); // 判断是否为其他类型的协议 如:lb,则会将lb去掉 if (hasAnotherScheme(routeUri)) { // this is a special url, save scheme to special attribute // replace routeUri with schemeSpecificPart // 将当前请求的schema放入上下文 exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR, routeUri.getScheme()); // 创建新的 routeUri routeUri = URI.create(routeUri.getSchemeSpecificPart()); } // 如果RouteUri以lb开头,必须请求中带有host if ("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) { // Load balanced URIs should always have a host. If the host is null it is // most // likely because the host name was invalid (for example included an // underscore) throw new IllegalStateException("Invalid host: " + routeUri.toString()); } //生成RequestURL,并放入上下文中 //此处生成的URL的Path最终会以请求的Path为主,会覆盖真正的RouteUri, // 例如RouteUri为http://localhost:8088/api/hello,请求的URI为http://localhost:8080/api, // 那此处生成的URL为http://localhost:8080/api URI mergedUrl = UriComponentsBuilder.fromUri(uri) // .uri(routeUri) .scheme(routeUri.getScheme()).host(routeUri.getHost()).port(routeUri.getPort()).build(encoded).toUri(); exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl); return chain.filter(exchange); }
-
NoLoadBalancerClientFilter : 当没有配置注册中心时,也就不需要负载均衡,所以加载该过滤器,相应的还存在负载均衡的过滤器 ReactiveLoadBalancerClientFilter
@Override @SuppressWarnings("Duplicates") public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR); String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR); // 判断如果请求需要负载均衡的情况下直接抛出 404 if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) { return chain.filter(exchange); } throw NotFoundException.create(use404, "Unable to find instance for " + url.getHost()); }
-
WebsocketRoutingFilter : 处理 websocket 类型的请求。当请求上下文中的 GATEWAY_REQUEST_URL_ATTR 的 URL 中的协议 (schema) 为 ws 或者 wss 该 Filter 生效,使用 Spring 的 WebSocket 对请求进行转发。同时可以进行负载均衡,通过在 Route 的 URI 配置前边加上 lb: 生效。
@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { changeSchemeIfIsWebSocketUpgrade(exchange); URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR); String scheme = requestUrl.getScheme(); // 如果是已经路由 || 非 ws、 wss 协议则直接跳过 if (isAlreadyRouted(exchange) || (!"ws".equals(scheme) && !"wss".equals(scheme))) { return chain.filter(exchange); } // 设置为已被处理,后边的NettyRoutingFilter或者WebClientHttpRoutingFilter则不会执行 setAlreadyRouted(exchange); HttpHeaders headers = exchange.getRequest().getHeaders(); HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange); List<String> protocols = getProtocols(headers); // 通过{@link HandshakeWebSocketService}去转发的请 return this.webSocketService.handleRequest(exchange, new ProxyWebSocketHandler(requestUrl, this.webSocketClient, filtered, protocols)); }
-
NettyRoutingFilter : 用来处理 http、https 的请求。使用基于 Netty HttpClient 请求后端的服务,上边讲到的 NettyWriteResponseFilter 用来处理 NettyRoutingFilter 请求后端获得的响应,将响应写回给客户端。同时 SCG 还定义了 WebClientHttpRoutingFilter,于 NettyRoutingFilter 类似,区别在于没有使用 Netty 去做请求转发的代理。NettyRoutingFilter 中会将请求的响应放入上下文中,供 NettyWriteResponseFilter 使用。
@Override @SuppressWarnings("Duplicates") public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR); String scheme = requestUrl.getScheme(); // 如果已经处理过 或 非 http、https 请求则跳过 if (isAlreadyRouted(exchange) || (!"http".equalsIgnoreCase(scheme) && !"https".equalsIgnoreCase(scheme))) { return chain.filter(exchange); } // 设置已经处理过 setAlreadyRouted(exchange); // 获取请求各种属性 ServerHttpRequest request = exchange.getRequest(); final HttpMethod method = HttpMethod.valueOf(request.getMethodValue()); final String url = requestUrl.toASCIIString(); // 执行请求头Filter,如ForwardedHeadersFilter、RemoveHopByHopHeadersFilter、XForwardedHeadersFilter HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange); //基于filter过后的请求头创建Http请求头 final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders(); filtered.forEach(httpHeaders::set); // 是否在请求头中增加 Host 信息 boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false); Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR); // 构建 HttpCLient Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange).headers(headers -> { headers.add(httpHeaders); // Will either be set below, or later by Netty // 移除并根据需要决定是否添加新的 HOST headers.remove(HttpHeaders.HOST); if (preserveHost) { String host = request.getHeaders().getFirst(HttpHeaders.HOST); headers.add(HttpHeaders.HOST, host); } }).request(method).uri(url).send((req, nettyOutbound) -> { if (log.isTraceEnabled()) { nettyOutbound.withConnection(connection -> log.trace("outbound route: " + connection.channel().id().asShortText() + ", inbound: " + exchange.getLogPrefix())); } return nettyOutbound.send(request.getBody().map(this::getByteBuf)); }).responseConnection((res, connection) -> { // Defer committing the response until all route filters have run // Put client response as ServerWebExchange attribute and write // response later NettyWriteResponseFilter // 将调用真实服务返回的Response放入上下文,但NettyWriteResponseFilter中也没有用 exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res); // 将Netty Channle放入上下文供NettyWriteResponseFilter使用 exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection); ServerHttpResponse response = exchange.getResponse(); // put headers and status so filters can modify the response HttpHeaders headers = new HttpHeaders(); // 设置响应头 res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue())); String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE); if (StringUtils.hasLength(contentTypeValue)) { exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue); } // 设置响应状态 setResponseStatus(res, response); // make sure headers filters run after setting status so it is // available in response HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(getHeadersFilters(), headers, exchange, Type.RESPONSE); if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING) && filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) { // It is not valid to have both the transfer-encoding header and // the content-length header. // Remove the transfer-encoding header in the response if the // content-length header is present. response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING); } exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet()); response.getHeaders().addAll(filteredResponseHeaders); return Mono.just(res); }); // 设置请求超时时间 Duration responseTimeout = getResponseTimeout(route); if (responseTimeout != null) { responseFlux = responseFlux .timeout(responseTimeout, Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout))) .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th)); } return responseFlux.then(chain.filter(exchange)); }
-
ForwardRoutingFilter : 用来处理 forward 协议的请求,将 ForwardPathFilter 构建的新的 Request 发送给 DispatcherHandler 处理。
@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR); String scheme = requestUrl.getScheme(); //判定是否已经处理过或者请求协议为forward,如果不是则忽略 if (isAlreadyRouted(exchange) || !"forward".equals(scheme)) { return chain.filter(exchange); } // 交由 DispatcherHandler 来处理 return this.getDispatcherHandler().handle(exchange); }
其他补充:
- 关于 ServerWebExchangeUtils#setAlreadyRouted : 该方法是记录当前请求已经被处理,在NettyRoutingFilter、WebClientHttpRoutingFilter、WebsocketRoutingFilter、JsonToGrpcGatewayFilterFactory 中都有调用,其实可以看出来这四个过滤器处理的是不同协议的请求(WebClientHttpRoutingFilter 和 NettyRoutingFilter 都是处理 Http 和 Https 协议的请求,不同的是 NettyRoutingFilter 处理是通过 Netty 方式发起的请求,而 WebClientHttpRoutingFilter 则是处理普通的请求)
- 其他GlobalFilter:因为本篇例子的局限性,还存在一些其他的没有被加载,如 与 NoLoadBalancerClientFilter 对应的 ReactiveLoadBalancerClientFilter ,当需要进行负载均衡的时候加载该 FIlter;GatewayMetricsFilter, 需要添加 spring-boot-starter-actuator 依赖,可通过 spring.cloud.gateway.metrics.enabled=true/false 进行配置,默认为开启状态。可以通过 /actuator/metrics/gateway.requests 来访问查看。
2.3 处理调用结果
需要注意的是,如果是需要 gateway 转发的请求,则并不会执行到这一步,因为在对应协议的处理过程中就将请求的返回值写入 Response 中了。
// 处理调用结果 : 获取合适的结果处理器,处理并执行
private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
// 获取结果处理器处理结果
return getResultHandler(result).handleResult(exchange, result)
// 设置检查点,如果处理异常会打印
.checkpoint("Handler " + result.getHandler() + " [DispatcherHandler]")
.onErrorResume(ex ->
result.applyExceptionHandler(ex).flatMap(exResult -> {
String text = "Exception handler " + exResult.getHandler() +
", error=\"" + ex.getMessage() + "\" [DispatcherHandler]";
return getResultHandler(exResult).handleResult(exchange, exResult).checkpoint(text);
}));
}
// 获取可以处理当前结果的结果处理器
private HandlerResultHandler getResultHandler(HandlerResult handlerResult) {
if (this.resultHandlers != null) {
for (HandlerResultHandler resultHandler : this.resultHandlers) {
if (resultHandler.supports(handlerResult)) {
return resultHandler;
}
}
}
throw new IllegalStateException("No HandlerResultHandler for " + handlerResult.getReturnValue());
}
默认情况下 this.resultHandlers
会加载下面四个处理器。
org.springframework.web.reactive.result.method.annotation.ResponseEntityResultHandler :处理HttpEntity和ResponseEntity返回值。
org.springframework.web.reactive.function.server.support.ServerResponseResultHandler : 处理 ServerResponses 返回值
org.springframework.web.reactive.result.method.annotation.ResponseBodyResultHandler :当它检测到@ResponseBody的存在时,它应该在查找特定返回类型的结果处理程序之后排序。但是请注意,此处理程序可以识别并显式忽略ResponseEntity返回类型。
org.springframework.web.reactive.result.view.ViewResolutionResultHandler : 视图解析器。
参考内容
https://my.oschina.net/u/4970510/blog/5282057
https://blog.csdn.net/yaomingyang/article/details/112686324