spring cloud gateway源码分析,一个请求进来的默认处理流程

news2024/11/18 17:20:08

1.前言

spring cloud gateway的基本组成和作用就不细赘述,此篇适合对此有一定了解的人阅读。
spring cloud gateway版本: Hoxton.SR1

spring cloud gateway的配置使用yml配置:

server:
  port: 9527y

#根据微服务名称进行动态路由的配置
spring:
  application:
    name: cloud-gateway
  cloud:
    gateway:
      discovery:
        locator:
          enabled: true                     #开启从注册中心动态创建路由的功能,利用微服务名称进行路由
      routes:                
        - id: config-client
          uri: lb://config-client
          predicates:
            - Path=/config/**
          filters:
            - RewritePath=/config/?(?<segment>.*),/config/v1/$\{segment}

2. 流程图

在这里插入图片描述
先看一张官网文档给的图,此图大概描述了请求的处理原理,各个组件大致的位置。

3.源码剖析

在这里插入图片描述
http底层处理是基于netty,netty是一个高性能异步事件驱动的通讯框架,对于netty的处理流程可以查阅其源码。netty读取完数据经过pipeline管道处理后,最终调用到reactor.netty.http.server.HttpServerHandle#onStateChange方法。然后经过层层方法调用到核心类org.springframework.web.reactive.DispatcherHandler#handle

	public Mono<Void> handle(ServerWebExchange exchange) {
		if (this.handlerMappings == null) {
		    //没有合适的handler返回失败
			return createNotFoundError();
		}
		return Flux.fromIterable(this.handlerMappings)
		        //mapping.getHandler是关键方法,根据handlerMapping找到对应的handler
				.concatMap(mapping -> mapping.getHandler(exchange))
				.next()
				.switchIfEmpty(createNotFoundError())
				//invokeHandler是关键方法,调用处理逻辑
				.flatMap(handler -> invokeHandler(exchange, handler))
				//处理结果,写出
				.flatMap(result -> handleResult(exchange, result));
	}

	private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
		if (this.handlerAdapters != null) {
			for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
				if (handlerAdapter.supports(handler)) {
					//查找合适的handlerAdapter处理,默认会调用到SimpleHandlerAdapter#handle
					return handlerAdapter.handle(exchange, handler);
				}
			}
		}
		return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
	}

	private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
		return getResultHandler(result).handleResult(exchange, result)
				.checkpoint("Handler " + result.getHandler() + " [DispatcherHandler]")
				.onErrorResume(ex ->
						result.applyExceptionHandler(ex).flatMap(exResult -> {
							String text = "Exception handler " + exResult.getHandler() +
									", error=\"" + ex.getMessage() + "\" [DispatcherHandler]";
							return getResultHandler(exResult).handleResult(exchange, exResult).checkpoint(text);
						}));
	}

	private HandlerResultHandler getResultHandler(HandlerResult handlerResult) {
		if (this.resultHandlers != null) {
			for (HandlerResultHandler resultHandler : this.resultHandlers) {
				if (resultHandler.supports(handlerResult)) {
					return resultHandler;
				}
			}
		}
		throw new IllegalStateException("No HandlerResultHandler for " + handlerResult.getReturnValue());
	}

handlerMappings的注入类看下图,最后通过RoutePredicateHandlerMapping找到合适的处理类。handlerMappings中的其他几种Mapping方式,是别的策略或者配置时会用到,可以思考是怎么用的。

在这里插入图片描述

先来看看mapping.getHandler的处理逻辑,默认会调用到org.springframework.web.reactive.handler.AbstractHandlerMapping#getHandler

	@Override
	public Mono<Object> getHandler(ServerWebExchange exchange) {
		//getHandlerInternal,根据exchange真正去查找合适的处理handler,根据上面解释,
		//getHandlerInternal调用到RoutePredicateHandlerMapping类中去
		return getHandlerInternal(exchange).map(handler -> {
			if (logger.isDebugEnabled()) {
				logger.debug(exchange.getLogPrefix() + "Mapped to " + handler);
			}
			//跨域处理
			if (hasCorsConfigurationSource(handler)) {
				ServerHttpRequest request = exchange.getRequest();
				CorsConfiguration config = (this.corsConfigurationSource != null ? this.corsConfigurationSource.getCorsConfiguration(exchange) : null);
				CorsConfiguration handlerConfig = getCorsConfiguration(handler, exchange);
				config = (config != null ? config.combine(handlerConfig) : handlerConfig);
				if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) {
					return REQUEST_HANDLED_HANDLER;
				}
			}
			return handler;
		});
	}

现在调用到了org.springframework.cloud.gateway.handler.RoutePredicateHandlerMapping#getHandlerInternal方法中,这里一个关键点就来了。Predicates断言,是路由配置的关键,根据predicates的结果,满足的话就会转发请求到对应的Router配置的uri上。

	@Override
	protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
		// don't handle requests on management port if set and different than server port
		if (this.managementPortType == DIFFERENT && this.managementPort != null
				&& exchange.getRequest().getURI().getPort() == this.managementPort) {
			return Mono.empty();
		}
		exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
		
		//lookupRoute(exchange)去查找满足断言条件的路由Router
		return lookupRoute(exchange)
				// 满足的router会被组装到exchange中,然后返回webHandler
				.flatMap((Function<Route, Mono<?>>) r -> {
					exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
					if (logger.isDebugEnabled()) {
						logger.debug(
								"Mapping [" + getExchangeDesc(exchange) + "] to " + r);
					}

					exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
					return Mono.just(webHandler);
				}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
					exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
					if (logger.isTraceEnabled()) {
						logger.trace("No RouteDefinition found for ["
								+ getExchangeDesc(exchange) + "]");
					}
				})));
	}

	protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
		return this.routeLocator.getRoutes()
				// 遍历所有的Router,此行r.getPredicate().apply(exchange)是验证是否满足断言要求,
				// 满足的Router会被返回
				.concatMap(route -> Mono.just(route).filterWhen(r -> {
					// add the current route we are testing
					exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
					return r.getPredicate().apply(exchange);
				})
						// instead of immediately stopping main flux due to error, log and
						// swallow it
						.doOnError(e -> logger.error(
								"Error applying predicate for route: " + route.getId(),
								e))
						.onErrorResume(e -> Mono.empty()))
				// .defaultIfEmpty() put a static Route not found
				// or .switchIfEmpty()
				// .switchIfEmpty(Mono.<Route>empty().log("noroute"))
				.next()
				// TODO: error handling
				.map(route -> {
					if (logger.isDebugEnabled()) {
						logger.debug("Route matched: " + route.getId());
					}
					validateRoute(route, exchange);
					return route;
				});

		/*
		 * TODO: trace logging if (logger.isTraceEnabled()) {
		 * logger.trace("RouteDefinition did not match: " + routeDefinition.getId()); }
		 */
	}

apply调用进入org.springframework.cloud.gateway.handler.AsyncPredicate.DefaultAsyncPredicate#apply方法,查看delegate.test(t)调用的实现类,可以发现所有断言的调用,此处根据我们配置的断言规则调用对应的断言,返回Boolean。
在这里插入图片描述
通过断言拿到对应的handler后回到DispatcherHandler#handle方法接下来调用invokeHandler(exchange, handler)

//这个handler 是 FilteringWehHandler
private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
		if (this.handlerAdapters != null) {
			for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
				if (handlerAdapter.supports(handler)) {
					//使用SimpleHandlerAdapter来处理
					return handlerAdapter.handle(exchange, handler);
				}
			}
		}
		return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
	}

然后调用org.springframework.web.reactive.result.SimpleHandlerAdapter#handle

	@Override
	public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
		WebHandler webHandler = (WebHandler) handler;
		//handler是FilteringWehHandler,所以调用到FilteringWehHandler.handle
		Mono<Void> mono = webHandler.handle(exchange);
		return mono.then(Mono.empty());
	}

org.springframework.cloud.gateway.handler.FilteringWebHandler#handle

	@Override
	public Mono<Void> handle(ServerWebExchange exchange) {
		Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
		//取出之前匹配的Router,取出filters,如果配置了的话
		List<GatewayFilter> gatewayFilters = route.getFilters();
		//GatewayFilter和globalFilter合并,并按order排序
		List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
		combined.addAll(gatewayFilters);
		// TODO: needed or cached?
		AnnotationAwareOrderComparator.sort(combined);

		if (logger.isDebugEnabled()) {
			logger.debug("Sorted gatewayFilterFactories: " + combined);
		}
		//进入过滤链调用filters
		return new DefaultGatewayFilterChain(combined).filter(exchange);
	}


		@Override
		public Mono<Void> filter(ServerWebExchange exchange) {
			return Mono.defer(() -> {
				if (this.index < filters.size()) {
					GatewayFilter filter = filters.get(this.index);
					//index 每次+1,设置到chain中,传递到下一次filter,下一次filter时取就是next的filter
					DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this,
							this.index + 1);
					//filter链执行
					return filter.filter(exchange, chain);
				}
				else {
					return Mono.empty(); // complete
				}
			});
		}

filter链很重要,是spring cloud gateway的扩展点,可以做扩展逻辑,比如权限校验,登录认证,日志等。默认情况下的filter链如下,需要关注一下LoadBalancerClientFilter和NettyRoutingFilter
在这里插入图片描述

org.springframework.cloud.gateway.filter.LoadBalancerClientFilter#filter

	@Override
	@SuppressWarnings("Duplicates")
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
		URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
		//使用协议 http还是lb
		String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
		if (url == null
				|| (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
			return chain.filter(exchange);
		}
		// 保存原始请求url
		addOriginalRequestUrl(exchange, url);

		if (log.isTraceEnabled()) {
			log.trace("LoadBalancerClientFilter url before: " + url);
		}
		//根据注册中心的信息,使用负载均衡算法,找一个可用的服务
		final ServiceInstance instance = choose(exchange);

		if (instance == null) {
			throw NotFoundException.create(properties.isUse404(),
					"Unable to find instance for " + url.getHost());
		}

		URI uri = exchange.getRequest().getURI();

		// if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
		// if the loadbalancer doesn't provide one.
		String overrideScheme = instance.isSecure() ? "https" : "http";
		if (schemePrefix != null) {
			overrideScheme = url.getScheme();
		}
		//替换成真实服务器的地址,后续调用使用
		URI requestUrl = loadBalancer.reconstructURI(
				new DelegatingServiceInstance(instance, overrideScheme), uri);

		if (log.isTraceEnabled()) {
			log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
		}

		exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
		return chain.filter(exchange);
	}

org.springframework.cloud.gateway.filter.NettyRoutingFilter#filter 处理http和https请求的发送

	@Override
	@SuppressWarnings("Duplicates")
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
		URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

		String scheme = requestUrl.getScheme();
		if (isAlreadyRouted(exchange)
				|| (!"http".equals(scheme) && !"https".equals(scheme))) {
			return chain.filter(exchange);
		}
		setAlreadyRouted(exchange);

		ServerHttpRequest request = exchange.getRequest();

		final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
		final String url = requestUrl.toASCIIString();

		HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);

		final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
		filtered.forEach(httpHeaders::set);

		boolean preserveHost = exchange
				.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);
		Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
		//发送请求
		Flux<HttpClientResponse> responseFlux = httpClientWithTimeoutFrom(route)
				.headers(headers -> {
					headers.add(httpHeaders);
					// Will either be set below, or later by Netty
					headers.remove(HttpHeaders.HOST);
					if (preserveHost) {
						String host = request.getHeaders().getFirst(HttpHeaders.HOST);
						headers.add(HttpHeaders.HOST, host);
					}
				}).request(method).uri(url).send((req, nettyOutbound) -> {
					if (log.isTraceEnabled()) {
						nettyOutbound
								.withConnection(connection -> log.trace("outbound route: "
										+ connection.channel().id().asShortText()
										+ ", inbound: " + exchange.getLogPrefix()));
					}
					return nettyOutbound.send(request.getBody()
							.map(dataBuffer -> ((NettyDataBuffer) dataBuffer)
									.getNativeBuffer()));
				}).responseConnection((res, connection) -> {

					// Defer committing the response until all route filters have run
					// Put client response as ServerWebExchange attribute and write
					// response later NettyWriteResponseFilter
					exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
					exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);

					ServerHttpResponse response = exchange.getResponse();
					// put headers and status so filters can modify the response
					HttpHeaders headers = new HttpHeaders();

					res.responseHeaders().forEach(
							entry -> headers.add(entry.getKey(), entry.getValue()));

					String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
					if (StringUtils.hasLength(contentTypeValue)) {
						exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR,
								contentTypeValue);
					}

					setResponseStatus(res, response);

					// make sure headers filters run after setting status so it is
					// available in response
					HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(
							getHeadersFilters(), headers, exchange, Type.RESPONSE);

					if (!filteredResponseHeaders
							.containsKey(HttpHeaders.TRANSFER_ENCODING)
							&& filteredResponseHeaders
									.containsKey(HttpHeaders.CONTENT_LENGTH)) {
						// It is not valid to have both the transfer-encoding header and
						// the content-length header.
						// Remove the transfer-encoding header in the response if the
						// content-length header is present.
						response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
					}

					exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES,
							filteredResponseHeaders.keySet());

					response.getHeaders().putAll(filteredResponseHeaders);

					return Mono.just(res);
				});

		Duration responseTimeout = getResponseTimeout(route);
		if (responseTimeout != null) {
			responseFlux = responseFlux
					.timeout(responseTimeout, Mono.error(new TimeoutException(
							"Response took longer than timeout: " + responseTimeout)))
					.onErrorMap(TimeoutException.class,
							th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT,
									th.getMessage(), th));
		}

		return responseFlux.then(chain.filter(exchange));
	}

以上,就是请求进来的处理过程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1276243.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

zookeeper心跳检测 (实操课程)

本系列是zookeeper相关的实操课程&#xff0c;课程测试环环相扣&#xff0c;请按照顺序阅读来学习和测试zookeeper。 阅读本文之前&#xff0c;请先阅读----​​​​​​zookeeper 单机伪集群搭建简单记录&#xff08;实操课程系列&#xff09;zookeeper 客户端常用命令简单记录…

人工智能-优化算法之学习率调度器

学习率调度器 到目前为止&#xff0c;我们主要关注如何更新权重向量的优化算法&#xff0c;而不是它们的更新速率。 然而&#xff0c;调整学习率通常与实际算法同样重要&#xff0c;有如下几方面需要考虑&#xff1a; 首先&#xff0c;学习率的大小很重要。如果它太大&#xf…

知识管理平台Confluence:win10安装confluence

文章目录 介绍主要功能 安装教程安装java运行平台JRE安装数据库Postgresql在Postgresql创建confluence使用的数据库创建数据库用户创建数据库 安装confluence注册confluence启动confluence 参考链接 介绍 Confluence 是由澳大利亚软件公司 Atlassian 开发的企业协作平台。它提…

flutter开发实战-ValueListenableBuilder实现局部刷新功能

flutter开发实战-ValueListenableBuilder实现局部刷新功能 在创建的新工程中&#xff0c;点击按钮更新counter后&#xff0c;通过setState可以出发本类的build方法进行更新。当我们只需要更新一小部分控件的时候&#xff0c;通过setState就不太合适了&#xff0c;这就需要进行…

canvas基础:渲染文本

canvas实例应用100 专栏提供canvas的基础知识&#xff0c;高级动画&#xff0c;相关应用扩展等信息。 canvas作为html的一部分&#xff0c;是图像图标地图可视化的一个重要的基础&#xff0c;学好了canvas&#xff0c;在其他的一些应用上将会起到非常重要的帮助。 文章目录 示例…

java设计模式学习之【桥接模式】

文章目录 引言桥接模式简介定义与用途&#xff1a;实现方式 使用场景优势与劣势桥接模式在Spring中的应用绘图示例代码地址 引言 想象你正在开发一个图形界面应用程序&#xff0c;需要支持多种不同的窗口操作系统。如果每个系统都需要写一套代码&#xff0c;那将是多么繁琐&am…

scrapy爬虫中间件和下载中间件的使用

一、关于中间件 之前文章说过&#xff0c;scrapy有两种中间件&#xff1a;爬虫中间件和下载中间件&#xff0c;他们的作用时间和位置都不一样&#xff0c;具体区别如下&#xff1a; 爬虫中间件&#xff08;Spider Middleware&#xff09; 作用&#xff1a; 爬虫中间件主要负…

SQL Server 2016(基本概念和命令)

1、文件类型。 【1】主数据文件&#xff1a;数据库的启动信息。扩展名为".mdf"。 【2】次要&#xff08;辅助&#xff09;数据文件&#xff1a;主数据之外的数据都是次要数据文件。扩展名为".ndf"。 【3】事务日志文件&#xff1a;包含恢复数据库的所有事务…

深入理解前端路由:构建现代 Web 应用的基石(下)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

2024年天津天狮学院专升本专业课报名缴费流程

天津天狮学院高职升本缴费流程 一、登录缴费系统 二、填写个人信息&#xff0c;进行缴费 1.在姓名处填写“姓名”&#xff0c;学号处填写“身份证号”&#xff0c;如下图所示&#xff1a; 此处填写身份证号 2.单击查询按钮&#xff0c;显示报考专业及缴费列表&#xff0c;…

JPA数据源Oracle异常记录

代码执行异常 ObjectOptimisticLockingFailureException org.springframework.orm.ObjectOptimisticLockingFailureException: Batch update returned unexpected row count from update [0]; actual row count: 0; expected: 1; nested exception is org.hibernate.StaleSta…

从0开始学习JavaScript--JavaScript ES6 模块系统

JavaScript ES6&#xff08;ECMAScript 2015&#xff09;引入了官方支持的模块系统&#xff0c;使得前端开发更加现代化和模块化。本文将深入探讨 ES6 模块系统的各个方面&#xff0c;通过丰富的示例代码详细展示其核心概念和实际应用。 ES6 模块的基本概念 1 模块的导出 ES…

java原子类型

AtomicBoolean AtomicInteger AtomicLong AtomicReference<V> StringBuilder - 不是原子类型。StringBuilder 是 java.lang 包下的类 用法&#xff1a;无需回调改变数值

基于springboot + vue框架的网上商城系统

qq&#xff08;2829419543&#xff09;获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;springboot 前端&#xff1a;采用vue技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xf…

Linux:vim的简单使用

个人主页 &#xff1a; 个人主页 个人专栏 &#xff1a; 《数据结构》 《C语言》《C》《Linux》 文章目录 前言一、vim的基本概念二、vim的基本操作三、vim正常模式命令集四、vim底行模式命令集五、.xxx.swp的解决总结 前言 本文是对Linux中vim使用的总结 一、vim的基本概念 …

C语言:求十个数中的平均数

分析&#xff1a; 程序中定义了一个average函数&#xff0c;用于计算分数的平均值。该函数接受一个包含10个分数的数组作为参数&#xff0c;并返回平均值。在主函数main中&#xff0c;首先提示输入10个分数&#xff0c;然后使用循环读取输入的分数&#xff0c;并将它们存储在名…

iris+vue上传到本地存储【go/iris】

iris部分 //main.go package mainimport ("fmt""io""net/http""os" )//上传视频文件部分 func uploadHandler_video(w http.ResponseWriter, r *http.Request) {// 解析上传的文件err : r.ParseMultipartForm(10 << 20) // 设置…

Nacos 架构原理

基本架构及概念​ 服务 (Service)​ 服务是指一个或一组软件功能&#xff08;例如特定信息的检索或一组操作的执行&#xff09;&#xff0c;其目的是不同的客户端可以为不同的目的重用&#xff08;例如通过跨进程的网络调用&#xff09;。Nacos 支持主流的服务生态&#xff0c…

基于springboot + vue在线考试系统

qq&#xff08;2829419543&#xff09;获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;springboot 前端&#xff1a;采用vue技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xf…

更改Jupyter Notebook 默认存储路径

import osprint(os.path.abspath(.)) 然后打开cmd,输入&#xff1a; jupyter notebook --generate-config 按照路径在本地文件夹中找到那个文件。 然后找到"c.NotebookApp.notebook_dir"这条语句&#xff1a;&#xff08;直接通过"crtlf"输入关键字找阿 …