前言
熟悉 Spring Cloud Gateway 的人都知道 Gateway 提供了鉴权、路由等功能,本篇我们重点分析的是 Gateway 的动态路由功能。
Gateway Actuator API 方法源码解析
Spring Cloud Gateway 的 Actuator 端点允许监视 Spring Cloud Gateway 应用程序并与之交互,Gateway 依赖中默认包含了路由端点操作,我们引入 spring-boot-starter-actuator。
Gateway 端点开启方式如下:
management.endpoint.gateway.enabled=true
management.endpoints.web.exposure.include=gateway
我们在 IDEA 中按住 Ctrl,用鼠标点击 enable 就可以看 Gateway 端点的源码,如下:
GatewayControllerEndpoint 源码解析
GatewayControllerEndpoint 主要提供了三个方法,作用如下:
- routesdef():获取路由定义信息。
- routes():获取网关已经加载的所有路由信息。
- route(@PathVariable String id):根据路由 id 获取路由信息。
@RestControllerEndpoint(
id = "gateway"
)
public class GatewayControllerEndpoint extends AbstractGatewayControllerEndpoint {
public GatewayControllerEndpoint(List<GlobalFilter> globalFilters, List<GatewayFilterFactory> gatewayFilters, List<RoutePredicateFactory> routePredicates, RouteDefinitionWriter routeDefinitionWriter, RouteLocator routeLocator, RouteDefinitionLocator routeDefinitionLocator) {
super(routeDefinitionLocator, globalFilters, gatewayFilters, routePredicates, routeDefinitionWriter, routeLocator);
}
//获取路由定义信息
@GetMapping({"/routedefinitions"})
public Flux<RouteDefinition> routesdef() {
return this.routeDefinitionLocator.getRouteDefinitions();
}
//获取网关已经加载的所有路由信息
@GetMapping({"/routes"})
public Flux<Map<String, Object>> routes() {
return this.routeLocator.getRoutes().map(this::serialize);
}
Map<String, Object> serialize(Route route) {
HashMap<String, Object> r = new HashMap();
r.put("route_id", route.getId());
r.put("uri", route.getUri().toString());
r.put("order", route.getOrder());
r.put("predicate", route.getPredicate().toString());
if (!CollectionUtils.isEmpty(route.getMetadata())) {
r.put("metadata", route.getMetadata());
}
ArrayList<String> filters = new ArrayList();
for(int i = 0; i < route.getFilters().size(); ++i) {
GatewayFilter gatewayFilter = (GatewayFilter)route.getFilters().get(i);
filters.add(gatewayFilter.toString());
}
r.put("filters", filters);
return r;
}
//根据路由id 获取路由信息
@GetMapping({"/routes/{id}"})
public Mono<ResponseEntity<Map<String, Object>>> route(@PathVariable String id) {
return this.routeLocator.getRoutes().filter((route) -> {
return route.getId().equals(id);
}).singleOrEmpty().map(this::serialize).map(ResponseEntity::ok).switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
}
}
获取路由信息接口演示如下:
AbstractGatewayControllerEndpoint 源码解析
AbstractGatewayControllerEndpoint 主要提供了一下功能:
- refresh():刷新路由。
- refresh():获取全局过滤器。
- refresh():获取路由过滤器。
- refresh():获取断言工厂。
- refresh():保存路由信息。
- refresh():删除路由信息。
- refresh():根据路由id 获取组合过滤器。
public class AbstractGatewayControllerEndpoint implements ApplicationEventPublisherAware {
private static final Log log = LogFactory.getLog(GatewayControllerEndpoint.class);
protected RouteDefinitionLocator routeDefinitionLocator;
protected List<GlobalFilter> globalFilters;
protected List<GatewayFilterFactory> GatewayFilters;
protected List<RoutePredicateFactory> routePredicates;
protected RouteDefinitionWriter routeDefinitionWriter;
protected RouteLocator routeLocator;
protected ApplicationEventPublisher publisher;
public AbstractGatewayControllerEndpoint(RouteDefinitionLocator routeDefinitionLocator, List<GlobalFilter> globalFilters, List<GatewayFilterFactory> gatewayFilters, List<RoutePredicateFactory> routePredicates, RouteDefinitionWriter routeDefinitionWriter, RouteLocator routeLocator) {
this.routeDefinitionLocator = routeDefinitionLocator;
this.globalFilters = globalFilters;
this.GatewayFilters = gatewayFilters;
this.routePredicates = routePredicates;
this.routeDefinitionWriter = routeDefinitionWriter;
this.routeLocator = routeLocator;
}
public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
this.publisher = publisher;
}
//刷新路由
@PostMapping({"/refresh"})
public Mono<Void> refresh() {
this.publisher.publishEvent(new RefreshRoutesEvent(this));
return Mono.empty();
}
//获取全局过滤器
@GetMapping({"/globalfilters"})
public Mono<HashMap<String, Object>> globalfilters() {
return this.getNamesToOrders(this.globalFilters);
}
//获取路由过滤器
@GetMapping({"/routefilters"})
public Mono<HashMap<String, Object>> routefilers() {
return this.getNamesToOrders(this.GatewayFilters);
}
//获取断言工厂
@GetMapping({"/routepredicates"})
public Mono<HashMap<String, Object>> routepredicates() {
return this.getNamesToOrders(this.routePredicates);
}
private <T> Mono<HashMap<String, Object>> getNamesToOrders(List<T> list) {
return Flux.fromIterable(list).reduce(new HashMap(), this::putItem);
}
private HashMap<String, Object> putItem(HashMap<String, Object> map, Object o) {
Integer order = null;
if (o instanceof Ordered) {
order = ((Ordered)o).getOrder();
}
map.put(o.toString(), order);
return map;
}
//保存路由信息
@PostMapping({"/routes/{id}"})
public Mono<ResponseEntity<Object>> save(@PathVariable String id, @RequestBody RouteDefinition route) {
return Mono.just(route).filter(this::validateRouteDefinition).flatMap((routeDefinition) -> {
return this.routeDefinitionWriter.save(Mono.just(routeDefinition).map((r) -> {
r.setId(id);
log.debug("Saving route: " + route);
return r;
})).then(Mono.defer(() -> {
return Mono.just(ResponseEntity.created(URI.create("/routes/" + id)).build());
}));
}).switchIfEmpty(Mono.defer(() -> {
return Mono.just(ResponseEntity.badRequest().build());
}));
}
private boolean validateRouteDefinition(RouteDefinition routeDefinition) {
boolean hasValidFilterDefinitions = routeDefinition.getFilters().stream().allMatch((filterDefinition) -> {
return this.GatewayFilters.stream().anyMatch((gatewayFilterFactory) -> {
return filterDefinition.getName().equals(gatewayFilterFactory.name());
});
});
boolean hasValidPredicateDefinitions = routeDefinition.getPredicates().stream().allMatch((predicateDefinition) -> {
return this.routePredicates.stream().anyMatch((routePredicate) -> {
return predicateDefinition.getName().equals(routePredicate.name());
});
});
log.debug("FilterDefinitions valid: " + hasValidFilterDefinitions);
log.debug("PredicateDefinitions valid: " + hasValidPredicateDefinitions);
return hasValidFilterDefinitions && hasValidPredicateDefinitions;
}
//删除路由信息
@DeleteMapping({"/routes/{id}"})
public Mono<ResponseEntity<Object>> delete(@PathVariable String id) {
return this.routeDefinitionWriter.delete(Mono.just(id)).then(Mono.defer(() -> {
return Mono.just(ResponseEntity.ok().build());
})).onErrorResume((t) -> {
return t instanceof NotFoundException;
}, (t) -> {
return Mono.just(ResponseEntity.notFound().build());
});
}
//根据路由id 获取组合过滤器
@GetMapping({"/routes/{id}/combinedfilters"})
public Mono<HashMap<String, Object>> combinedfilters(@PathVariable String id) {
return this.routeLocator.getRoutes().filter((route) -> {
return route.getId().equals(id);
}).reduce(new HashMap(), this::putItem);
}
}
获取全局过滤器演示如下:
其他接口不在一一演示。
通过 GatewayControllerEndpoint 和 AbstractGatewayControllerEndpoint 类的源码,我们可以知道 Gateway 提供获取路由信息、过滤器、断言等接口,这里面我们重点关注了的关于路由部分的,也是源码中涉及较多的,这两个类的源码提供了路由的增删改查及刷新的接口。
AbstractGatewayControllerEndpoint#save 方法源码分析
上面我们分析到 Gateway 源码提供了完整的增删改查及刷新的接口,我们重点来分析一下 Gateway 的新增路由的接口,也就是 AbstractGatewayControllerEndpoint#save 方法,该方法主要逻辑如下:
- 校验新增路由的入参,入参不合法直接返回失败。
- 调用 RouteDefinitionWriter 保存路由(实际调用的是 InMemoryRouteDefinitionRepository#save 方法)。
//org.springframework.cloud.gateway.actuate.AbstractGatewayControllerEndpoint#save
@PostMapping({"/routes/{id}"})
public Mono<ResponseEntity<Object>> save(@PathVariable String id, @RequestBody RouteDefinition route) {
//this::validateRouteDefinition 路由参数校验
return Mono.just(route).filter(this::validateRouteDefinition).flatMap((routeDefinition) -> {
//this.routeDefinitionWriter.save 调用routeDefinitionWriter 进行保存
return this.routeDefinitionWriter.save(Mono.just(routeDefinition).map((r) -> {
r.setId(id);
log.debug("Saving route: " + route);
return r;
})).then(Mono.defer(() -> {
return Mono.just(ResponseEntity.created(URI.create("/routes/" + id)).build());
}));
}).switchIfEmpty(Mono.defer(() -> {
return Mono.just(ResponseEntity.badRequest().build());
}));
}
private boolean validateRouteDefinition(RouteDefinition routeDefinition) {
//是否有重复的过滤器
boolean hasValidFilterDefinitions = routeDefinition.getFilters().stream().allMatch((filterDefinition) -> {
return this.GatewayFilters.stream().anyMatch((gatewayFilterFactory) -> {
return filterDefinition.getName().equals(gatewayFilterFactory.name());
});
});
//是否有重复的断言
boolean hasValidPredicateDefinitions = routeDefinition.getPredicates().stream().allMatch((predicateDefinition) -> {
return this.routePredicates.stream().anyMatch((routePredicate) -> {
return predicateDefinition.getName().equals(routePredicate.name());
});
});
log.debug("FilterDefinitions valid: " + hasValidFilterDefinitions);
log.debug("PredicateDefinitions valid: " + hasValidPredicateDefinitions);
return hasValidFilterDefinitions && hasValidPredicateDefinitions;
}
RouteDefinitionWriter 接口源码分析
我们上面分析到保存路由调用了的 RouteDefinitionWriter 接口的方法,RouteDefinitionWriter 中定义了保存路由和删除路由的方法,上面提到的 InMemoryRouteDefinitionRepository 间接实现了 RouteDefinitionWriter 接口。
public interface RouteDefinitionWriter {
//保存路由
Mono<Void> save(Mono<RouteDefinition> route);
//删除路由
Mono<Void> delete(Mono<String> routeId);
}
InMemoryRouteDefinitionRepository#save 方法源码分析
InMemoryRouteDefinitionRepository#save 方法的逻辑很简单,就只是把路由信息存入到 Map 对象 routes 中,基于内存的操作,因此 Gateway 重启之后就会丢失。
//org.springframework.cloud.gateway.route.InMemoryRouteDefinitionRepository#save
public Mono<Void> save(Mono<RouteDefinition> route) {
//private final Map<String, RouteDefinition> routes = Collections.synchronizedMap(new LinkedHashMap());
return route.flatMap((r) -> {
if (StringUtils.isEmpty(r.getId())) {
return Mono.error(new IllegalArgumentException("id may not be empty"));
} else {
//存入 routes 中 本质是一个 map
this.routes.put(r.getId(), r);
return Mono.empty();
}
});
}
AbstractGatewayControllerEndpoint#refresh 方法源码分析
AbstractGatewayControllerEndpoint#refresh 路由刷新功能,逻辑很简单,使用了事件监听的机制,发布了一个路由刷新事件。
//org.springframework.cloud.gateway.actuate.AbstractGatewayControllerEndpoint#refresh
@PostMapping({"/refresh"})
public Mono<Void> refresh() {
//发布路由刷新事件
this.publisher.publishEvent(new RefreshRoutesEvent(this));
return Mono.empty();
}
CachingRouteLocator#onApplicationEvent 方法源码分析
CachingRouteLocator#onApplicationEvent 方法订阅 RefreshRoutesEvent 事件,主要做了以下两件事:
- 调用 CachingRouteLocator#fetch 方法。
- 发布路由刷新完成事件。
//org.springframework.cloud.gateway.route.CachingRouteLocator#onApplicationEvent
public void onApplicationEvent(RefreshRoutesEvent event) {
try {
//调用 CachingRouteLocator#fetch 方法
this.fetch().collect(Collectors.toList()).subscribe((list) -> {
Flux.fromIterable(list).materialize().collect(Collectors.toList()).subscribe((signals) -> {
//发布路由刷新完成事件
this.applicationEventPublisher.publishEvent(new RefreshRoutesResultEvent(this));
this.cache.put("routes", signals);
}, (throwable) -> {
this.handleRefreshError(throwable);
});
});
} catch (Throwable var3) {
this.handleRefreshError(var3);
}
}
CachingRouteLocator#fetch 方法源码分析
CachingRouteLocator#fetch 方法调用了
//org.springframework.cloud.gateway.route.CachingRouteLocator#fetch
private Flux<Route> fetch() {
return this.delegate.getRoutes().sort(AnnotationAwareOrderComparator.INSTANCE);
}
RouteDefinitionRouteLocator#getRoutes 方法源码分析
RouteDefinitionRouteLocator#getRoutes 方法主要逻辑如下:
- 获取路由定义信息,调用 convertToRoute 方法将路由定义信息转换为路由信息 。
- 将路由信息存入到 routes 中。
//org.springframework.cloud.gateway.route.RouteDefinitionRouteLocator#getRoutes
public Flux<Route> getRoutes() {
//调用 convertToRoute 方法将路由定义信息转化为路由
Flux<Route> routes = this.routeDefinitionLocator.getRouteDefinitions().map(this::convertToRoute);
if (!this.gatewayProperties.isFailOnRouteDefinitionError()) {
routes = routes.onErrorContinue((error, obj) -> {
if (this.logger.isWarnEnabled()) {
this.logger.warn("RouteDefinition id " + ((RouteDefinition)obj).getId() + " will be ignored. Definition has invalid configs, " + error.getMessage());
}
});
}
//路由存储 routes 中
return routes.map((route) -> {
if (this.logger.isDebugEnabled()) {
this.logger.debug("RouteDefinition matched: " + route.getId());
}
return route;
});
}
如有不正确的地方请各位指出纠正。