Gateway源码分析:路由Route、断言Predicate、Filter

news2024/9/21 16:50:20

文章目录

    • 源码总流程图
    • 说明
    • GateWayAutoConfiguration
    • DispatcherHandler
    • getHandler()
    • handleRequestWith()
      • RouteToRequestUrlFilter
      • ReactiveLoadBalancerClientFilter
      • NettyRoutingFilter
    • 补充知识
      • 适配器模式
    • 详细流程图

源码总流程图

在线总流程图
在这里插入图片描述



说明

Gateway的版本使用的是4.0.0

Gateway的实现是基于WebFlux 、Reactor 、Netty



Gateway微服务的yml配置如下

  • Gateway的访问端口为8888
  • id为order_route的路由 uri为lb://mall-order
  • 为mall-order这个微服务定义了一个path路径的predicate断言;定义了三个filter
server:
  port: 8888
spring:
  application:
    name: mall-gateway

  #配置nacos注册中心地址
  cloud:
    nacos:
      discovery:
        server-addr: nacos.mall.com:8848
        username: nacos
        password: nacos

    gateway:
      #设置路由:路由id、路由到微服务的uri、断言
      routes:
        - id: user_route   #路由ID,全局唯一
          uri: lb://mall-user  #lb 整合负载均衡器ribbon,loadbalancer
          predicates:
            - Path=/user/**   # 断言,路径相匹配的进行路由

        - id: order_route  #路由ID,全局唯一
          # 测试 http://localhost:8888/order/findOrderByUserId/1
          uri: lb://mall-order  #lb 整合负载均衡器loadbalancer
          predicates:
            - Path=/order/**   # 断言,路径相匹配的进行路由

          #配置过滤器工厂
          filters:
            - AddRequestHeader=X-Request-color, red  #添加请求头
            - AddRequestParameter=color, blue  # 添加请求参数
            - CheckAuth=hushang,#自定义过滤器工厂



Gateway的工作原理就是下面这一张图

在这里插入图片描述



在开始看Gateway的源码之前,我们回忆一下SpringMVC的实现原理:

  1. DispatchServlet#doDispatch作为Springmvc的入口
  2. HandlerMapper 路由匹配 —> 找到Handler
  3. 通过handler 找的适配的 HandlerAdapter
  4. HandlerAdapter#handle方法执行



而在Gateway的源码中:

  • DispatcherHandler#handle作为入口
  • HandlerMapping 路由匹配 --> 断言predicate匹配 RoutePredicateHandlerMapping#getHandlerInternal,找到路由Route对象
  • 返回FilteringWebHandler
  • HandlerAdapter 适配器 SimpleHandlerAdapter#handle 处理WebHandler
  • 进入到org.springframework.cloud.gateway.handler.FilteringWebHandler#handle —> filterChain处理



Flux和Mono的概念

Reactor学习文档

Flux:

在这里插入图片描述



Mono:

在这里插入图片描述



GateWayAutoConfiguration

在这里插入图片描述



在GatewayAutoConfiguration自动配置类中,它配置了很多bean对象,常见的就比如:

// 保存我们配置文件中关于网关路由相关的所有配置
// GatewayProperties保存了List<RouteDefinition>
// 而RouteDefinition就是每一个路由对象,保存了id、uri、断言集合List<PredicateDefinition>、Filter集合List<FilterDefinition>
@Bean
public GatewayProperties gatewayProperties() {
    return new GatewayProperties();
}



// Path路径匹配的断言工厂,断言相关的bean都是以RoutePredicateFactory结尾
@Bean
@ConditionalOnEnabledPredicate
public PathRoutePredicateFactory pathRoutePredicateFactory() {
    return new PathRoutePredicateFactory();
}


// 添加请求头的Filter,一般都是以GatewayFilterFactory
@Bean
@ConditionalOnEnabledFilter
public AddRequestHeaderGatewayFilterFactory addRequestHeaderGatewayFilterFactory() {
    return new AddRequestHeaderGatewayFilterFactory();
}


// 全局过滤器,把我们访问网关的url转换为路由中配置的uri
// http://localhost:8888/order/findOrderByUserId/1  --->  lb://mall-order/order/findOrderByUserId/1
@Bean
@ConditionalOnEnabledGlobalFilter
public RouteToRequestUrlFilter routeToRequestUrlFilter() {
    return new RouteToRequestUrlFilter();
}



GateWayAutoConfiguration配置的主要bean

类名说明
GatewayPropertiesgateway属性配置类
PropertiesRouteDefinitionLocator操作GatewayProperties对象,返回Flux<RouteDefinition>
RouteDefinitionRouteLocator将RouteDefinition转换为Route
RoutePredicateHandlerMappingGateway的HandlerMapping,匹配请求对应的Route,返回FilteringWebHandler
XXXRoutePredicateFactory路由断言工厂的bean
XXXGatewayFilterFactory局部Filter
GlobalFilter实现类全局Filter



DispatcherHandler

DispatcherHandler#handle作为我们查看Gateway源码的入口

  • 请求request和响应response实例会被封装为ServerWebExchange
  • 核心方法就是return语句
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
    if (this.handlerMappings == null) {
        return createNotFoundError();
    }
    if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
        return handlePreFlight(exchange);
    }
    return Flux.fromIterable(this.handlerMappings) // 遍历所有的HandlerMapper
        .concatMap(mapping -> mapping.getHandler(exchange)) // 调用每一个HandlerMapper,能否找到Handler
        .next() // 继续遍历下一个HandlerMapper
        .switchIfEmpty(createNotFoundError()) // 如果HandlerMapper遍历完后都没有Handler,那么要抛异常了
        .onErrorResume(ex -> handleDispatchError(exchange, ex))
        .flatMap(handler -> handleRequestWith(exchange, handler)); // 如果找到Handler,那就去通过HandlerAdapter去调用Handler
}



fromIterable()方法的作用就是就是遍历Gateway所有的HandlerMapper,我们这里肯定最终是使用的RoutePredicateHandlerMapping这个路由断言的

在这里插入图片描述

我们接下来继续往下,遍历各个HandlerMapper,并调用mapping.getHandler(exchange)方法,这里最终会调用至RoutePredicateHandlerMapping类的getHandlerInternal()方法中,经过断言匹配后,返回一个FilteringWebHandler对象。该方法接下来会详细介绍。



中间这几行其实主要就是如果我当前往Gateway的请求,通过路由断言没有匹配上,那么就会抛异常

.next()
.switchIfEmpty(createNotFoundError())
.onErrorResume(ex -> handleDispatchError(exchange, ex))



经过路由断言匹配,得到一个WebHandler对象之后,会执行handleRequestWith(exchange, handler)方法,在该方法中会找一个与WebHandler匹配的HandlerAdapter来适配WebHandler对象,最终去调用WebHandler的



getHandler()

通过DispatcherHandler#handle方法中的.concatMap(mapping -> mapping.getHandler(exchange)) 这一行代码

进入到了AbstractHandlerMapping#getHandler

  • 遍历我们yml配置文件中所有定义的路由

  • 根据我们路由定义的断言Predicate规则去调用对应的断言工厂

  • 将匹配成功的路由保存至exchange对象中

    exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
    exchange.getAttributes().put(GATEWAY_PREDICATE_MATCHED_PATH_ROUTE_ID_ATTR, routeId);

  • 断言匹配成功就返回一个WebHandler接口的实现类FilteringWebHandler对象

public Mono<Object> getHandler(ServerWebExchange exchange) {
    // 从这里我们就可以发现,通过getHandlerInternal(exchange)方法就能找Handler,之后的.map()方法中就直接return handler;
    return getHandlerInternal(exchange).map(handler -> {
        if (logger.isDebugEnabled()) {
            logger.debug(exchange.getLogPrefix() + "Mapped to " + handler);
        }
        ServerHttpRequest request = exchange.getRequest();
        // 正常情况下这个if都不会进入
        if (hasCorsConfigurationSource(handler) || CorsUtils.isPreFlightRequest(request)) {
            CorsConfiguration config = (this.corsConfigurationSource != null ?
                                        this.corsConfigurationSource.getCorsConfiguration(exchange) : null);
            CorsConfiguration handlerConfig = getCorsConfiguration(handler, exchange);
            config = (config != null ? config.combine(handlerConfig) : handlerConfig);
            if (config != null) {
                config.validateAllowCredentials();
            }
            if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) {
                return NO_OP_HANDLER;
            }
        }
        // 直接返回handler
        return handler;
    });
}





我们这里就直接进入到RoutePredicateHandlerMapping类中

在这里插入图片描述

RoutePredicateHandlerMapping#getHandlerInternal的详细代码如下

protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
    if (this.managementPortType == DIFFERENT && this.managementPort != null
        && exchange.getRequest().getURI().getPort() == this.managementPort) {
        return Mono.empty();
    }
    exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());

    return Mono.deferContextual(contextView -> {
        exchange.getAttributes().put(GATEWAY_REACTOR_CONTEXT_ATTR, contextView);
        // 核心方法是lookupRoute(exchange),这里会去进行路由的校验,根据我们配置文件中定义的路由断言规则进行校验
        return lookupRoute(exchange)
            .flatMap((Function<Route, Mono<?>>) r -> {
                // 下面几行代码就是操作exchange的属性
                // 上方的lookupRoute()方法中会添加GATEWAY_PREDICATE_ROUTE_ATTR,这里就进行移除
                exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
				// 把当前路由对象添加进exchange对象中,之后的流程还会用到我们的路由对象
                exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                // 路由匹配成功,就直接返回WebHandler对象
                return Mono.just(webHandler);
                
            }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
            // 当前请求没有任何一个路由匹配上的处理流程
            exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
            if (logger.isTraceEnabled()) {
                logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
            }
        })));
    });
}





再进入到RoutePredicateHandlerMapping#lookupRoute方法中

protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
    // 获取到我们yml配置文件中所有定义的路由,并进行遍历
    return this.routeLocator.getRoutes()
        .concatMap(route -> Mono.just(route).filterWhen(r -> {
            // 添加GATEWAY_PREDICATE_ROUTE_ATTR
            exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
            // 调用当前路由对象中的断言的apply()方法,apply()方法中又是一些异步的处理流程
            // 这里就会根据我们配置文件中为路由配置的各个断言,去调用各个断言对象
            return r.getPredicate().apply(exchange);
        })

        .doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
        .onErrorResume(e -> Mono.empty()))
        .next()
        // TODO: error handling
        .map(route -> {
            if (logger.isDebugEnabled()) {
                logger.debug("Route matched: " + route.getId());
            }
            validateRoute(route, exchange);
            return route;
        });
}

我当前gateway的配置文件中定义了两个路由

spring:
  cloud:
    gateway:
      #设置路由:路由id、路由到微服务的uri、断言
      routes:
        - id: user_route   #路由ID,全局唯一
          uri: lb://mall-user  #lb 整合负载均衡器ribbon,loadbalancer
          predicates:
            - Path=/user/**   # 断言,路径相匹配的进行路由

        - id: order_route  #路由ID,全局唯一
          # 测试 http://localhost:8888/order/findOrderByUserId/1
          uri: lb://mall-order  #lb 整合负载均衡器loadbalancer
          predicates:
            - Path=/order/**   # 断言,路径相匹配的进行路由

所以上面的方法会执行两次

在这里插入图片描述

在这里插入图片描述

因为return r.getPredicate().apply(exchange);又是一些异步调用,但我们从方法名就能看出来,这里根据我们yml配置文件中路由定义的断言去调用对应的断言对象。这就是各个断言工厂具体的实现了,接下来就以Path路径匹配的断言工厂来举例

class DefaultAsyncPredicate<T> implements AsyncPredicate<T> {

		private final Predicate<T> delegate;
    
		@Override
		public Publisher<Boolean> apply(T t) { 
            // 调用各个Predicate对象的test()方法
			return Mono.just(delegate.test(t));
		}
    //...
}





所以我们现在就直接去看path路径匹配的断言类PathRoutePredicateFactory

@Override
public Predicate<ServerWebExchange> apply(Config config) {
    final ArrayList<PathPattern> pathPatterns = new ArrayList<>();
    synchronized (this.pathPatternParser) {
        pathPatternParser.setMatchOptionalTrailingSeparator(config.isMatchTrailingSlash());
        config.getPatterns().forEach(pattern -> {
            PathPattern pathPattern = this.pathPatternParser.parse(pattern);
            pathPatterns.add(pathPattern);
        });
    }
    return new GatewayPredicate() {
        // 会进入到test()方法中
        @Override
        public boolean test(ServerWebExchange exchange) {
            // 当前请求的uri路径  /order/findOrderByUserId/1
            PathContainer path = parsePath(exchange.getRequest().getURI().getRawPath());

            PathPattern match = null;
            for (int i = 0; i < pathPatterns.size(); i++) {
                // yml配置文件中的配置项  /order/**
                PathPattern pathPattern = pathPatterns.get(i);
                // 如果path匹配成功,那么match对象就不为null
                if (pathPattern.matches(path)) {
                    match = pathPattern;
                    break;
                }
            }

            // 如果path匹配成功,那么match对象就不为null  。匹配成功的处理逻辑
            if (match != null) {
                traceMatch("Pattern", match.getPatternString(), path, true);
                PathMatchInfo pathMatchInfo = match.matchAndExtract(path);
                putUriTemplateVariables(exchange, pathMatchInfo.getUriVariables());
                //  match.getPatternString() 为   /order/**
                exchange.getAttributes().put(GATEWAY_PREDICATE_MATCHED_PATH_ATTR, match.getPatternString());
                String routeId = (String) exchange.getAttributes().get(GATEWAY_PREDICATE_ROUTE_ATTR);
                // 保存当前路由id
                if (routeId != null) {
                    exchange.getAttributes().put(GATEWAY_PREDICATE_MATCHED_PATH_ROUTE_ID_ATTR, routeId);
                }
                return true;
            }
            // path匹配不成,返回false
            else {
                traceMatch("Pattern", config.getPatterns(), path, false);
                return false;
            }
        }

        @Override
        public Object getConfig() {
            return config;
        }

        @Override
        public String toString() {
            return String.format("Paths: %s, match trailing slash: %b", config.getPatterns(),
                                 config.isMatchTrailingSlash());
        }
    };
}



handleRequestWith()

// DispatcherHandler#handleRequestWith
private Mono<Void> handleRequestWith(ServerWebExchange exchange, Object handler) {
    if (ObjectUtils.nullSafeEquals(exchange.getResponse().getStatusCode(), HttpStatus.FORBIDDEN)) {
        return Mono.empty();  // CORS rejection
    }
    if (this.handlerAdapters != null) {
        // 遍历所有的HandlerAdapter
        for (HandlerAdapter adapter : this.handlerAdapters) {
            // 找能处理WebHandler类型的HandlerAdapter , 最终找到SimpleHandlerAdapter
            if (adapter.supports(handler)) {
                return adapter.handle(exchange, handler)
                    .flatMap(result -> handleResult(exchange, result));
            }
        }
    }
    return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
}

如下图所示,找到SimpleHandlerAdapter这个

在这里插入图片描述



SimpleHandlerAdapter的详细代码如下所示

public class SimpleHandlerAdapter implements HandlerAdapter {

    @Override
    public boolean supports(Object handler) {
        return WebHandler.class.isAssignableFrom(handler.getClass());
    }

    @Override
    public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
        // 强转
        WebHandler webHandler = (WebHandler) handler;
        // getHandler()方法返回了一个FilteringWebHandler对象,这里就调用它的handle()方法
        Mono<Void> mono = webHandler.handle(exchange);
        // 返回一个空对象
        return mono.then(Mono.empty());
    }

}



FilteringWebHandler#handle方法

@Override
public Mono<Void> handle(ServerWebExchange exchange) {
    // getHandler()方法中存入了Route路由对象,这里取出来,该对象保存着我们yml配置文件中的配置
    Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
    // 取出我们yml配置文件中 该路由的局部Filter,我这里有三个,一个添加请求头、一个添加请求参数、一个自定义的
    List<GatewayFilter> gatewayFilters = route.getFilters();

    // 全局Filter
    List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
    // 将局部Filter和全局Filter存入一个集合中
    combined.addAll(gatewayFilters);
    // 排序
    AnnotationAwareOrderComparator.sort(combined);

    if (logger.isDebugEnabled()) {
        logger.debug("Sorted gatewayFilterFactories: " + combined);
    }
	
    // 调用各自的filter()方法
    return new DefaultGatewayFilterChain(combined).filter(exchange);
}



在这里插入图片描述

[GatewayFilterAdapter{RemoveCachedBodyFilter@5cfed0ba}, order = -2147483648]"
[GatewayFilterAdapter{AdaptCachedBodyGlobalFilter@22a6d75c}, order = -2147482648]"
[GatewayFilterAdapter{NettyWriteResponseFilter@691567ea}, order = -1]"
[GatewayFilterAdapter{ForwardPathFilter@28be7fec}, order = 0]"
// 三个局部Filter
[[AddRequestHeader X-Request-color = 'red'], order = 1]"
[[AddRequestParameter color = 'blue'], order = 2]"
[com.tuling.mall.gateway.filter.CheckAuthGatewayFilterFactory$$Lambda$980/0x0000014b3c649390@54f513cb, order = 3]"
// 路由转换 把http://localhost:8888/order/findOrderByUserId/1  --->  lb://mall-order/order/findOrderByUserId/1
[GatewayFilterAdapter{RouteToRequestUrlFilter@5c8d58ed}, order = 10000]"
// 根据lb://前缀过滤处理,使用serviceId选择一个服务实例,从而实现负载均衡
[GatewayFilterAdapter{ReactiveLoadBalancerClientFilter@437ed416}, order = 10150]"
[GatewayFilterAdapter{LoadBalancerServiceInstanceCookieFilter@11f23038}, order = 10151]"
[GatewayFilterAdapter{WebsocketRoutingFilter@26f0141}, order = 2147483646]"
// 发送netty 请求
[GatewayFilterAdapter{NettyRoutingFilter@de77146}, order = 2147483647]"
[GatewayFilterAdapter{ForwardRoutingFilter@6a567f7b}, order = 2147483647]"



接下来就挑几个重要的全局GlobalFilter来分析

RouteToRequestUrlFilter

路由转换 把http://localhost:8888/order/findOrderByUserId/1?color=blue —> lb://mall-order/order/findOrderByUserId/1?color=blue

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    // 从exchange中取路由Route对象
    Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
    if (route == null) {
        return chain.filter(exchange);
    }
    log.trace("RouteToRequestUrlFilter start");
    // 取当前请求uri : http://localhost:8888/order/findOrderByUserId/1?color=blue
    URI uri = exchange.getRequest().getURI();
    boolean encoded = containsEncodedParts(uri);
    // 路由对象中保存的uri,也就是我们在yml文件中配置的值:   lb://mall-order
    URI routeUri = route.getUri();

    if (hasAnotherScheme(routeUri)) {
        exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR, routeUri.getScheme());
        routeUri = URI.create(routeUri.getSchemeSpecificPart());
    }

    // 如果我们在yml文件中配置的uri,即不是lb开头并且host还为null,那么就抛异常
    if ("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {
        throw new IllegalStateException("Invalid host: " + routeUri.toString());
    }

    // 转换结果为:  lb://mall-order/order/findOrderByUserId/1?color=blue
    URI mergedUrl = UriComponentsBuilder.fromUri(uri)
        .scheme(routeUri.getScheme()).host(routeUri.getHost()).port(routeUri.getPort()).build(encoded).toUri();
    // 存入exchange中,之后的LoadBalancer全局Filter中会用到
    exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);
    return chain.filter(exchange);
}



ReactiveLoadBalancerClientFilter

解析lb://服务名,去服务注册中心获取服务实例instance,并通过负载均衡算法选择一个具体的instance

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
	// 此时的url是这个样子:   lb://mall-order/order/findOrderByUserId/1?color=blue
    URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
    String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
    if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
        return chain.filter(exchange);
    }
    addOriginalRequestUrl(exchange, url);

    if (log.isTraceEnabled()) {
        log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url);
    }

    // 再获取一遍:lb://mall-order/order/findOrderByUserId/1?color=blue
    URI requestUri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
    // 就是服务名:    mall-order
    String serviceId = requestUri.getHost();
    Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
        .getSupportedLifecycleProcessors(clientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
                                         RequestDataContext.class, ResponseData.class, ServiceInstance.class);
    // 请求信息 封装成一个对象
    DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(
        new RequestDataContext(new RequestData(exchange.getRequest()), getHint(serviceId)));
    // 调用choose()方法
    return choose(lbRequest, serviceId, supportedLifecycleProcessors).doOnNext(response -> {

        // 服务注册中心的响应response,获取server实例对象
        ServiceInstance retrievedInstance = response.getServer();

        // 值为http://localhost:8888/order/findOrderByUserId/1?color=blue
        URI uri = exchange.getRequest().getURI();

        String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";
        if (schemePrefix != null) {
            overrideScheme = url.getScheme();
        }

        // 服务实例
        DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance,overrideScheme);

        // 最终通过上面的服务实例,修改之后的请求url为:http://192.168.236.173:8020/order/findOrderByUserId/1?color=blue
        URI requestUrl = reconstructURI(serviceInstance, uri);

        if (log.isTraceEnabled()) {
            log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
        }
        // 存入exchange对象中,之后netty发送请求会用到该url
        exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
        exchange.getAttributes().put(GATEWAY_LOADBALANCER_RESPONSE_ATTR, response);
        supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, response));
    }).then(chain.filter(exchange))
        .doOnError(throwable -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
						.onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
								CompletionContext.Status.FAILED, throwable, lbRequest,
								exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR)))))
		.doOnSuccess(aVoid -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
						.onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
								CompletionContext.Status.SUCCESS, lbRequest,
								exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR),
								new ResponseData(exchange.getResponse(), new RequestData(exchange.getRequest()))))));
}



// 进入到choose()方法中
private Mono<Response<ServiceInstance>> choose(Request<RequestDataContext> lbRequest, String serviceId,
                                               Set<LoadBalancerLifecycle> supportedLifecycleProcessors) {
    ReactorLoadBalancer<ServiceInstance> loadBalancer = this.clientFactory.getInstance(serviceId,
                                                                                      ReactorServiceInstanceLoadBalancer.class);
    if (loadBalancer == null) {
        throw new NotFoundException("No loadbalancer available for " + serviceId);
    }
    supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
    // 调用ReactorLoadBalancer对象的choose()方法
    return loadBalancer.choose(lbRequest);
}



NettyRoutingFilter

发送netty请求

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    // 经过负载均衡之后的请求url,具体的下游服务请求地址
    // http://192.168.236.173:8020/order/findOrderByUserId/1?color=blue
    URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

    // scheme为http
    String scheme = requestUrl.getScheme();
    if (isAlreadyRouted(exchange) || (!"http".equalsIgnoreCase(scheme) && !"https".equalsIgnoreCase(scheme))) {
        return chain.filter(exchange);
    }
    setAlreadyRouted(exchange);

    ServerHttpRequest request = exchange.getRequest();
	// GET 请求
    final HttpMethod method = HttpMethod.valueOf(request.getMethod().name());
    // url为 http://192.168.236.173:8020/order/findOrderByUserId/1?color=blue
    final String url = requestUrl.toASCIIString();
	// 请求头
    HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);

    final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
    filtered.forEach(httpHeaders::set);

    boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);
    // 路由对象
    Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);

    Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange).headers(headers -> {
        headers.add(httpHeaders);
        // Will either be set below, or later by Netty
        headers.remove(HttpHeaders.HOST);
        if (preserveHost) {
            String host = request.getHeaders().getFirst(HttpHeaders.HOST);
            headers.add(HttpHeaders.HOST, host);
        }
        
        // 发送请求
    }).request(method).uri(url).send((req, nettyOutbound) -> {
        if (log.isTraceEnabled()) {
            nettyOutbound.withConnection(connection -> log.trace(...);
        }
        return nettyOutbound.send(request.getBody().map(this::getByteBuf));
    }).responseConnection((res, connection) -> {

        // Defer committing the response until all route filters have run
        // Put client response as ServerWebExchange attribute and write
        // response later NettyWriteResponseFilter
        exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
        exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);

        ServerHttpResponse response = exchange.getResponse();
        // put headers and status so filters can modify the response
        HttpHeaders headers = new HttpHeaders();

        res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));

        String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
        if (StringUtils.hasLength(contentTypeValue)) {
            exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue);
        }

        setResponseStatus(res, response);

        // make sure headers filters run after setting status so it is
        // available in response
        HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(getHeadersFilters(), headers, exchange,
                                                                       Type.RESPONSE);

        if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING)
            && filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) {
            // It is not valid to have both the transfer-encoding header and
            // the content-length header.
            // Remove the transfer-encoding header in the response if the
            // content-length header is present.
            response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
        }

        exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet());

        response.getHeaders().addAll(filteredResponseHeaders);

        return Mono.just(res);
    });

    Duration responseTimeout = getResponseTimeout(route);
    if (responseTimeout != null) {
        responseFlux = responseFlux
            .timeout(responseTimeout,
                     Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout)))
            .onErrorMap(TimeoutException.class,
                        th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th));
    }

    return responseFlux.then(chain.filter(exchange));
}



补充知识

适配器模式

FilteringWebHandler#handle方法中,先获取路由的局部Filter,在创建一个集合存放全局Filter,在把局部Filter和全局Filter放在一起。这里就有一个问题:

局部Filter的类型是GatewayFilter,而全局Filter的类型是GlobalFilter,它们是怎么通过下面这种方式存放在一个集合中的嘞?

// 局部Filter
List<GatewayFilter> gatewayFilters = route.getFilters();
// 全局Filter
List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
// 将局部Filter和全局Filter存入一个集合中
combined.addAll(gatewayFilters);



这里就用到了适配器模式,具体实现步骤是:

  • 创建一个GatewayFilterAdapter类,实现 GatewayFilter接口
  • GatewayFilterAdapter类中定义一个GlobalFilter属性,构造方法中传GlobalFilter类型的对象赋值给该属性
  • 实现GatewayFilter接口的filter()方法,在filter()方法中调用全局Filter的filter()方法
private static class GatewayFilterAdapter implements GatewayFilter {

    private final GlobalFilter delegate;

    GatewayFilterAdapter(GlobalFilter delegate) {
        this.delegate = delegate;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        return this.delegate.filter(exchange, chain);
    }
}



遍历List<GlobalFilter> globalFilters全局Filter,分别存入GatewayFilterAdapter对象中

List<GatewayFilterAdapter> ----> List<GatewayFilter> globalFilters集合

@Override
public Mono<Void> handle(ServerWebExchange exchange) {
    Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
    // 局部Filter
    List<GatewayFilter> gatewayFilters = route.getFilters();
    // 全局Filter,这样就把全局GlobalFilter变为了GatewayFilter类型了
    List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
    // 存入一个集合中
    combined.addAll(gatewayFilters);
    AnnotationAwareOrderComparator.sort(combined);


    // 调用各自的filter()方法
    return new DefaultGatewayFilterChain(combined).filter(exchange);
}



详细流程图

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1939029.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在线 PDF 制作者泄露用户上传的文档

两家在线 PDF 制作者泄露了数万份用户文档&#xff0c;包括护照、驾驶执照、证书以及用户上传的其他个人信息。 我们都经历过这样的情况&#xff1a;非常匆忙&#xff0c;努力快速制作 PDF 并提交表单。许多人向在线 PDF 制作者寻求帮助&#xff0c;许多人的祈祷得到了回应。 …

Python学习笔记—100页Opencv详细讲解教程

目录 1 创建和显示窗口... - 4 - 2 加载显示图片... - 6 - 3 保存图片... - 7 - 4 视频采集... - 8 - 5视频录制... - 11 - 6 控制鼠标... - 12 - 7 TrackBar 控件... - 14 - 8.RGB和BGR颜色空间... - 16 - 9.HSV和HSL和YUV.. - 17 - 10 颜色空间的转化... - 18 - …

分页查询与分页条件查询

--------------- 无PageHelper插件分页查询 1.创建PageBean实体类 Data NoArgsConstructor AllArgsConstructor public class PageBean<T> {private Long total;//总条数private List<T> items;//当前页数据集合 }类型安全性 泛型&#xff1a;提供了编译时的类型…

【学长工具库】1.如何快速部署开源框架 | 若依框架保姆级搭建教程

今天学长带来了一款十分适合自学的开源框架-若依框架&#xff0c; 本文会详细的教大家怎么部署这个系统。 文末有所有资料获取方式~ 框架技术栈 前端采用 Vue、Element UI。后端采用 Spring Boot、Spring Security、Redis & Jwt。权限认证使用 Jwt&#xff0c;支持多终端…

【IEEE出版,会议历史良好、论文录用检索快】第四届计算机科学与区块链国际学术会议 (CCSB 2024,9月6-8)

CCSB 2024会议由深圳大学主办&#xff0c;旨在探讨计算机科学的最新发展如何与区块链技术相结合&#xff0c;以及这一结合如何推动金融、供应链管理、数据安全和其他多个行业的革新&#xff0c; 本次会议将提供一个多学科交流的平台&#xff0c;汇集来自相关领域学者的研究和思…

vxe-弹窗初始化激活选中Vxe-Table表格中第一行input输入框

1.实现效果 2.Modal弹窗的渲染过程 一、Vue组件的生命周期 Vue组件从创建到销毁会经历一系列的生命周期钩子&#xff0c;这些钩子为开发者提供了在不同阶段插入自定义逻辑的机会。在Modal弹窗的上下文中&#xff0c;这些生命周期钩子同样适用。 beforeCreate&#xff1a;组件…

解决 Ubuntu 用户登录后的 shell 和功能问题

在使用 Ubuntu 系统管理用户时&#xff0c;可能会遇到一些常见的问题&#xff0c;比如新创建的用户无法使用常见命令&#xff08;如 ll&#xff09;以及输出信息没有颜色。这些问题通常与用户的默认 shell 有关。本文将总结如何解决这些问题&#xff0c;并确保新用户能够正常使…

【linux深入剖析】命名管道 | 匿名管道与命名管道的区别 | system V共享内存

&#x1f341;你好&#xff0c;我是 RO-BERRY &#x1f4d7; 致力于C、C、数据结构、TCP/IP、数据库等等一系列知识 &#x1f384;感谢你的陪伴与支持 &#xff0c;故事既有了开头&#xff0c;就要画上一个完美的句号&#xff0c;让我们一起加油 目录 1. 命名管道2. 创建命名管…

STM32之九:ADC模数转换器

目录 1. 简介 2. ADC 2.1 逐次逼近型寄存器SAR 2.2 ADC转换时间 3 ADC框图 3.1 8 bit ADC0809芯片内部框图 3.2 ADC框图 3.2.1 注入通道和规则通道 3.2.2 单次/连续转换模式 3.2.3 扫描模式 3.2.4 外部触发转换 3.2.5 数据对齐 3.2.6 模拟看门狗 4. 总结和ADC驱…

mac无法清空废纸篓怎么办 mac废纸篓清空了如何找回 cleanmymac误删文件怎么恢复

废纸篓相当于“一颗后悔药”&#xff0c;用于临时存储用户删除的文件。我们从从Mac上删除的文件&#xff0c;一般会进入废纸篓中。如果我们后悔了&#xff0c;可以从废纸篓中找回来。然而&#xff0c;有时我们会发现mac无法清空废纸篓&#xff0c;这是怎么回事?本文将探讨一些…

Unity-URP-SSAO记录

勾选After Opacity Unity-URP管线&#xff0c;本来又一个“bug”, 网上查不到很多关于ssao的资料 以为会不会又是一个极度少人用的东西 而且几乎都是要第三方替代 也完全没有SSAO大概的消耗是多少&#xff0c;完全是黑盒(因为用的人少&#xff0c;研究的人少&#xff0c;优…

JMeter学习笔记:线程组

继续&#xff1a;请求&#xff08;Sampler元件模拟的用户请求&#xff09;出错后继续运行&#xff1b; 启动下一进程&#xff1a;如果出错&#xff0c;则同一脚本中的余下请求将不再执行&#xff0c;直接重新开始执行&#xff1b; 停止线程&#xff1a;如果遇到请求&#xff…

51单片机嵌入式开发:14、STC89C52RC 之HX1838红外解码NEC+数码管+串口打印+LED显示

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 STC89C52RC 之HX1838红外解码NEC数码管串口打印LED显示 STC89C52RC 之HX1838红外解码NEC数码管串口打印LED显示1 概述2 硬件电路2.1 遥控器2.2 红外接收器电路2.3 STC89C52单…

深入理解Linux网络(三):TCP对象创建

深入理解Linux网络&#xff08;三&#xff09;&#xff1a;TCP对象创建 TCP对象创建inet_createsock_init_data TCP对象创建 常见的三句TCP编程&#xff1a; int main() {int sk socket(AF_INET, SOCK_STREAM, 0);connect(sk, ...)recv(sk, ...) }简单的两三⾏代码&#xff…

逆向案例二十八——某高考志愿网异步请求头参数加密,以及webpack

网址&#xff1a;aHR0cDovL3d3dy54aW5nYW9rYW90Yi5jb20vY29sbGVnZXMvc2VhcmNo 抓包分析&#xff0c;发现请求头有参数u-sign是加密的&#xff0c;载荷没有进行加密&#xff0c;直接跟栈分析。 进入第二个栈&#xff0c;打上断点&#xff0c;分析有没有加密位置。 可以看到参数…

【PyTorch】图像二分类项目-部署

【PyTorch】图像二分类项目 【PyTorch】图像二分类项目-部署 在独立于训练脚本的新脚本中部署用于推理的模型&#xff0c;需要构造一个模型类的对象&#xff0c;并将权重加载到模型中。操作流程为&#xff1a;定义模型--加载权重--在验证和测试数据集上部署模型。 import torch…

Windows11 安装Docker,安装至D盘(其他非C盘皆可)

Docker默认安装在C盘&#xff0c;这未来随着docker使用必定会导致C盘空间吃紧。 所以本文提前进行空间布局&#xff0c;将docker默认安装路径软链接到D盘。 软链接D盘 Docker默认安装路径为C:\Program Files\Docker。使用管理员权限打开命令终端 输入以下命令&#xff1a;m…

【LeetCode】day14:226 - 翻转二叉树, 101 - 对称二叉树, 104 - 二叉树的最大深度, 111 - 二叉树的最小深度

LeetCode 代码随想录跟练 Day14 226.翻转二叉树101.对称二叉树104.二叉树的最大深度111.二叉树的最小深度 226.翻转二叉树 题目描述&#xff1a; 给你一棵二叉树的根节点 root &#xff0c;翻转这棵二叉树&#xff0c;并返回其根节点。 使用递归处理&#xff08;迭代以及层序同…

jdk1.8 List集合Stream流式处理

jdk1.8 List集合Stream流式处理 一、介绍(为什么需要流Stream&#xff0c;能解决什么问题&#xff1f;)1.1 什么是 Stream&#xff1f;1.2 常见的创建Stream方法1.3 常见的中间操作1.4 常见的终端操作 二、创建流Stream2.1 Collection的.stream()方法2.2 数组创建流2.3 静态工厂…

单链表的创建与遍历--C

基本结构声明 struct node{int data; //数据域struct node *next;//指针域 }; #include<stdio.h> #include<stdlib.h>struct node{//链表结点 int data;//数据域 struct node *next;//指针域 }; typedef struct node Node; int main(void){Node *head,*p,*…