路由断言(Route Predicate)工厂
Spring Cloud Gateway包括许多内置的路由断言(Route Predicate)工厂,所有这些Predicate都与HTTP请求的不同属性匹配。多个Route Predicate工厂可以进行组合。
官方文档:https://docs.spring.io/spring-cloud-gateway/docs/2.2.9.RELEASE/reference/html/#the-cookie-route-predicate-factory
spring:
cloud:
gateway:
routes:
- id: weight_high
uri: https://weighthigh.org
predicates:
- Weight=group1, 8
- id: weight_low
uri: https://weightlow.org
predicates:
- Weight=group1, 2
该路由会转发 80% 的流量to https://weighthigh.org
,转发 20% 的流量 https://weightlow.org
。
路由判断
RoutePredicateHandlerMapping#lookupRoute
,寻找路由,会将路由中的断言进行判断,判断通过才会返回路由。
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
return this.routeLocator
.getRoutes()
//individually filter routes so that filterWhen error delaying is not a problem
.concatMap(route -> Mono
.just(route)
.filterWhen(r -> {
// add the current route we are testing
exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
return r.getPredicate().apply(exchange);
})
//instead of immediately stopping main flux due to error, log and swallow it
.doOnError(e -> logger.error("Error applying predicate for route: "+route.getId(), e))
.onErrorResume(e -> Mono.empty())
)
// .defaultIfEmpty() put a static Route not found
// or .switchIfEmpty()
// .switchIfEmpty(Mono.<Route>empty().log("noroute"))
.next()
//TODO: error handling
.map(route -> {
if (logger.isDebugEnabled()) {
logger.debug("Route matched: " + route.getId());
}
validateRoute(route, exchange);
return route;
});
/* TODO: trace logging
if (logger.isTraceEnabled()) {
logger.trace("RouteDefinition did not match: " + routeDefinition.getId());
}*/
}
权重路由
WeightCalculatorWebFilter#filter
,获取groupWeights
中的config.ranges
,判断当前获取的随机数是否在该区间,将路由信息存放在weights
中。
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
Map<String, String> weights = getWeights(exchange);
groupWeights.forEach((group, config) -> {
double r = this.random.nextDouble();
List<Double> ranges = config.ranges;
if (log.isTraceEnabled()) {
log.trace("Weight for group: "+group +", ranges: "+ranges +", r: "+r);
}
for (int i = 0; i < ranges.size() - 1; i++) {
if (r >= ranges.get(i) && r < ranges.get(i+1)) {
String routeId = config.rangeIndexes.get(i);
weights.put(group, routeId);
break;
}
}
});
if (log.isTraceEnabled()) {
log.trace("Weights attr: "+weights);
}
return chain.filter(exchange);
}
static Map<String, String> getWeights(ServerWebExchange exchange) {
Map<String, String> weights = exchange.getAttribute(WEIGHT_ATTR);
if (weights == null) {
weights = new ConcurrentHashMap<>();
exchange.getAttributes().put(WEIGHT_ATTR, weights);
}
return weights;
}
WeightCalculatorWebFilter#onApplicationEvent
监听到PredicateArgsEvent
会进行初始化
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof PredicateArgsEvent) {
handle((PredicateArgsEvent) event);
} else if (event instanceof WeightDefinedEvent) {
addWeightConfig(((WeightDefinedEvent)event).getWeightConfig());
}
}
public void handle(PredicateArgsEvent event) {
Map<String, Object> args = event.getArgs();
if (args.isEmpty() || !hasRelevantKey(args)) {
return;
}
WeightConfig config = new WeightConfig(event.getRouteId());
ConfigurationUtils.bind(config, args,
WeightConfig.CONFIG_PREFIX, WeightConfig.CONFIG_PREFIX, validator);
addWeightConfig(config);
}
WeightCalculatorWebFilter#addWeightConfig
,更新config.ranges
的数据。
void addWeightConfig(WeightConfig weightConfig) {
String group = weightConfig.getGroup();
GroupWeightConfig c = groupWeights.get(group);
if (c == null) {
c = new GroupWeightConfig(group);
groupWeights.put(group, c);
}
GroupWeightConfig config = c;
config.weights.put(weightConfig.getRouteId(), weightConfig.getWeight());
//recalculate
// normalize weights
int weightsSum = config.weights.values().stream().mapToInt(Integer::intValue).sum();
final AtomicInteger index = new AtomicInteger(0);
config.weights.forEach((routeId, weight) -> {
Double nomalizedWeight = weight / (double) weightsSum;
config.normalizedWeights.put(routeId, nomalizedWeight);
// recalculate rangeIndexes
config.rangeIndexes.put(index.getAndIncrement(), routeId);
});
//TODO: calculate ranges
config.ranges.clear();
config.ranges.add(0.0);
List<Double> values = new ArrayList<>(config.normalizedWeights.values());
for (int i = 0; i < values.size(); i++) {
Double currentWeight = values.get(i);
Double previousRange = config.ranges.get(i);
Double range = previousRange + currentWeight;
config.ranges.add(range);
}
if (log.isTraceEnabled()) {
log.trace("Recalculated group weight config "+ config);
}
}
WeightRoutePredicateFactory#apply
,根据exchange的属性weights,获取到选择的路由chosenRoute。
@Override
public Predicate<ServerWebExchange> apply(WeightConfig config) {
return exchange -> {
Map<String, String> weights = exchange.getAttributeOrDefault(WEIGHT_ATTR,
Collections.emptyMap());
String routeId = exchange.getAttribute(GATEWAY_PREDICATE_ROUTE_ATTR);
// all calculations and comparison against random num happened in
// WeightCalculatorWebFilter
String group = config.getGroup();
if (weights.containsKey(group)) {
String chosenRoute = weights.get(group);
if (log.isTraceEnabled()) {
log.trace("in group weight: "+ group + ", current route: " + routeId +", chosen route: " + chosenRoute);
}
return routeId.equals(chosenRoute);
}
return false;
};
}
路由过滤器
路由过滤器可用于修改进入的HTTP请求和返回的HTTP响应。Spring Cloud Gateway内置了多种路由过滤器,由GatewayFilter的工厂类禅城。更多资料可以看官方文档:https://docs.spring.io/spring-cloud-gateway/docs/2.2.9.RELEASE/reference/html/#gatewayfilter-factories
责任链处理
FilteringWebHandler#handle
,根据路由获取到GatewayFilter
的集合,再添加全局的路由器,构成责任链,执行filter
方法。
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
List<GatewayFilter> gatewayFilters = route.getFilters();
List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
combined.addAll(gatewayFilters);
//TODO: needed or cached?
AnnotationAwareOrderComparator.sort(combined);
if (logger.isDebugEnabled()) {
logger.debug("Sorted gatewayFilterFactories: "+ combined);
}
return new DefaultGatewayFilterChain(combined).filter(exchange);
}
StripPrefixGatewayFilterFactory
StripPrefixGatewayFilterFactory#apply
,获取到uri的路径,用/
切分,过滤掉第n个。
public class StripPrefixGatewayFilterFactory extends AbstractGatewayFilterFactory<StripPrefixGatewayFilterFactory.Config> {
public static final String PARTS_KEY = "parts";
public StripPrefixGatewayFilterFactory() {
super(Config.class);
}
@Override
public List<String> shortcutFieldOrder() {
return Arrays.asList(PARTS_KEY);
}
@Override
public GatewayFilter apply(Config config) {
return (exchange, chain) -> {
ServerHttpRequest request = exchange.getRequest();
addOriginalRequestUrl(exchange, request.getURI());
String path = request.getURI().getRawPath();
String newPath = "/" + Arrays.stream(StringUtils.tokenizeToStringArray(path, "/"))
.skip(config.parts).collect(Collectors.joining("/"));
newPath += (newPath.length() > 1 && path.endsWith("/") ? "/" : "");
ServerHttpRequest newRequest = request.mutate()
.path(newPath)
.build();
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, newRequest.getURI());
return chain.filter(exchange.mutate().request(newRequest).build());
};
}
public static class Config {
private int parts;
public int getParts() {
return parts;
}
public void setParts(int parts) {
this.parts = parts;
}
}
}