一、实现步骤
- 再请求 Header 中打上标签,例如再 Header 中添加 "gray-tag: true" ,其表示要进行灰度测试(访问灰度服务),而其他则访问正式服务。
- 在负载均衡器 Spring Cloud LoadBalancer 中,拿到 Header 仲的 "gray-tag" 进行判断,如果此标签不为空,并等于"true" 的话,表示要访问灰度发布的服务,否则只访问正式的服务。
- 在网关 Spring Cloud Gateway 中,将 Header 标签 "gray-tag:true" 继续往下一个调用服务中传递。
- 在后续的调用服务中,需要实现两个关键功能:
● 在负载均衡器 Spring Cloud LoadBalancer 中,判断回复发布标签,将请求分发到对应服务
● 将灰度发布标签继续传递给下一个调用的服务.如此反复传递
二、服务模块
2.1 注册为灰色服务实例
spring:
application:
name: user-service
cloud:
nacos:
discovery:
server-addr: localhost:8848
username: nacos
password: nacos
metadata:
{"gray-tag":true} #标识当前为灰度节点
server:
port: 0
2.2 设置负载均衡器
在服务启动类设置父子均衡器和Openfeign 服务
@SpringBootApplication
@LoadBalancerClients(defaultConfiguration = GllobalLoadbanlancerConfig.class)
@EnableFeignClients
public class UserServiceGrayApplication {
public static void main(String[] args) {
SpringApplication.run(UserServiceGrayApplication.class, args);
}
}
2.3 传递灰度标签
package com.example.userservicegray.config;
import com.example.globalconfigdemo.global.GlobalVarible;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import jakarta.servlet.http.HttpServletRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import java.util.Enumeration;
@Component
public class FeignRequestInterceptor implements RequestInterceptor {
@Override
public void apply(RequestTemplate requestTemplate) {
// 从 RequestContextHolder 中获取 HttpServletRequest
ServletRequestAttributes attributes = (ServletRequestAttributes)
RequestContextHolder.getRequestAttributes();
HttpServletRequest request = attributes.getRequest();
if(request.getHeader(GlobalVarible.GRAY_TAG)!=null&&request.getHeader(GlobalVarible.GRAY_TAG)=="true"){
requestTemplate.header(GlobalVarible.GRAY_TAG,"true");
}
}
// @Override
// public void apply(RequestTemplate requestTemplate) {
// // 从 RequestContextHolder 中获取 HttpServletRequest
// ServletRequestAttributes attributes = (ServletRequestAttributes)
// RequestContextHolder.getRequestAttributes();
// HttpServletRequest request = attributes.getRequest();
// Enumeration<String> headerNames = request.getHeaderNames();
// while (headerNames.hasMoreElements()){
// String key = headerNames.nextElement();
// String value = request.getHeader(key);
// requestTemplate.header(key,value);
// }
// }
}
三、网关模块
网关传递灰度发布标识
package com.example.gatewayservice;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
@Component
public class GatewayFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request= exchange.getRequest();
ServerHttpResponse response= exchange.getResponse();
if(request.getQueryParams().getFirst(GlobalVarible.GRAY_TAG)!=null&&request.getQueryParams().getFirst(GlobalVarible.GRAY_TAG)=="true") {
response.getHeaders().set(GlobalVarible.GRAY_TAG, "true");
}
System.out.println("======================================");
return chain.filter(exchange);
}
@Override
public int getOrder() {
return 0;
}
}
四、自定义负载均衡模块
4.1 自定义负载均衡器
package com.example.globalconfigdemo.global;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.*;
import org.springframework.cloud.loadbalancer.core.*;
import org.springframework.http.HttpHeaders;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
public class GlobalLoadbancer implements ReactorServiceInstanceLoadBalancer {
private static final Log log = LogFactory.getLog(RoundRobinLoadBalancer.class);
private AtomicInteger position;
private String serviceId;
ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
public GlobalLoadbancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) {
this(serviceInstanceListSupplierProvider, serviceId, (new Random()).nextInt(1000));
}
public GlobalLoadbancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId, int seedPosition) {
this.serviceId = serviceId;
this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
this.position = new AtomicInteger(seedPosition);
}
public Mono<Response<ServiceInstance>> choose(Request request) {
ServiceInstanceListSupplier supplier = (ServiceInstanceListSupplier)this.serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
return supplier.get(request).next().map((serviceInstances) -> {
//此时调用时,将request作为参数传给调用方法,在得到服务实例时通过判断请求头中的标识来返回实例
return this.processInstanceResponse(supplier, serviceInstances,request);
});
}
private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier, List<ServiceInstance> serviceInstances,Request request) {
//将request 传给调用方法
Response<ServiceInstance> serviceInstanceResponse = this.getInstanceResponse(serviceInstances,request);
if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
((SelectedInstanceCallback)supplier).selectedServiceInstance((ServiceInstance)serviceInstanceResponse.getServer());
}
return serviceInstanceResponse;
}
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances,Request request) {
if (instances.isEmpty()) {
if (log.isWarnEnabled()) {
log.warn("No servers available for service: " + this.serviceId);
}
return new EmptyResponse();
} else if (instances.size() == 1) {
return new DefaultResponse((ServiceInstance)instances.get(0));
} else {
//得到 Request 对象,[通过方法传递参数得到此对象]
//从 Request 对象的 Header 中得到灰度标签
RequestDataContext requestDataContext= (RequestDataContext) request.getContext();
HttpHeaders headers=requestDataContext.getClientRequest().getHeaders();
if(headers.get(GlobalVarible.GRAY_TAG)!=null&&headers.get(GlobalVarible.GRAY_TAG).get(0).equals("true")){
List<ServiceInstance> grayInstance=instances.stream().filter(s->s.getMetadata().get(GlobalVarible.GRAY_TAG)!=null&&s.getMetadata().get(GlobalVarible.GRAY_TAG).equals("true")).toList();
//判断灰度列表不为空
if(grayInstance.size()>0){
instances=grayInstance;
}
}else {
instances=instances.stream().filter(s->s.getMetadata().get(GlobalVarible.GRAY_TAG)==null || !s.getMetadata().get(GlobalVarible.GRAY_TAG).equals("true")).toList();
}
int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;
ServiceInstance instance = instances.get(pos % instances.size());
return new DefaultResponse(instance);
}
}
}
4.2 封装自定义负载均衡器
package com.example.globalconfigdemo.global.config;
import com.example.globalconfigdemo.global.GlobalLoadbancer;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
public class GllobalLoadbanlancerConfig {
@Bean
public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) {
String name = environment.getProperty("loadbalancer.client.name");
return new GlobalLoadbancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
}
}