异常重试
- 异常重试
- a.异常重试
- b.RPC的自我保护
- 1) 介绍
- 2) 实现令牌桶限流器
- c.熔断器
- d.实现服务端的限流
- e.实现客户端的熔断
- f.流量隔离
- 1) 介绍
- 2) 实现
异常重试
a.异常重试
1.为什么需要异常重试?
当发起一次 yrpc 调用,去调用远程的一个服务,比如用户的登录操作,我们会先对用户的用户名以及密码进行验证,验证成功之后会获取用户的基本信息。当我们通过远程的用户服务来获取用户基本信息的时候,恰好网络出现了问题,比如网络突然抖了一下,导致我们的请求失败了,而这个请求我们希望它能够尽可能地执行成功,那这时我们要怎么做呢?
我们需要重新发起一次 yrpc 调用,那我们在代码中该如何处理呢?是在代码逻辑里 catch 一下,失败了就再发起一次调用吗?这样做显然不够优雅吧。这时我们就可以考虑使用 yrpc 框架的重试机制。
2.yrpc 框架的重试机制
当调用端发起的请求失败时,yrpc 框架自身可以进行重试,再重新发送请求,用户可以自
行设置是否开启重试以及重试的次数。
在common模块下的annotation
包中,创建TryTimes
自定义注解
@Target(ElementType.METHOD)
在方法上使用
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TryTimes {
int tryTimes() default 3;
int intervalTime() default 2000;
}
在demo模块下的api模块,pom文件中引入common的模块
<dependency>
<groupId>com.dcy</groupId>
<artifactId>dcyrpc-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
在api模块下的DcyRpc
接口,添加自定义的重试次数注解
- 括号内的值:不写就按默认值设置
@TryTimes(tryTimes = 3, intervalTime = 3000)
String sayHi(String msg);
修改在core模块下的proxy.handler
包下的RpcConsumerInvocationHandler
类,invoke()
方法:什么情况下要重试? 1.异常 2.响应有问题 code=500
- 用 while循环 和 try异常捕获,包住整个业务逻辑
- 通过method获取注解的值
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
int tryTimes = 0;
int intervalTime = 0;
// 从接口中获取 判断是否需要重试
TryTimes annotation = method.getAnnotation(TryTimes.class);
if (annotation != null) {
tryTimes = annotation.tryTimes();
intervalTime = annotation.intervalTime();
}
while (true) {
try {
// 略... 原来的业务代码
} catch (Exception e) {
// 次数减一,并且等待
tryTimes--;
try {
Thread.sleep(intervalTime);
} catch (InterruptedException ex) {
log.error("在进行重试时发生异常", ex);
}
if (tryTimes < 0) {
log.error("对方法【{}】进行远程调用时,重试【{}】次,依然不可用", method.getName(), tryTimes, e);
break;
}
log.error("在进行第【{}】次重试时发生异常", 3-tryTimes, e);
}
}
throw new RuntimeException("执行远程【{"+method.getName()+"}】方法调用失败");
}
b.RPC的自我保护
1) 介绍
当程序流量非常大,大到机器也崩溃。所以,当流量高,并发大,应该如果保护程序
服务提供方 有大量的流量突然来袭,顶不住 限流
服务调用方 发送了大量的请求但是都不行,没必要继续发送 熔断 降级
限流逻辑又该如何实现呢?
比如最简单的计数器,还有可以做到平滑限流的滑动窗口、漏斗算法以及令牌桶算法等等。其中令牌桶算法最为常用。
我们可以假设这样一个场景:我发布了一个服务,提供给多个应用的调用方去调用,这时有一个应用的调用方发送过来的请求流量要比其它的应用大很多,这时我们就应该对这个应用下的调用端发送过来的请求流量进行限流。所以说我们在做限流的时候要考虑应用级别的维度,甚至是 IP 级别的维度,这样做不仅可以让我们对一个应用下的调用端发送过来的请求流量做限流,还可以对一个 IP 发送过来的请求流量做限流。
使用方该如何配置应用维度以及 IP 维度的限流呢?在代码中配置是不是不大方便?yrpc 框架真正强大的地方在于它的治理功能,而治理功能大多都需要依赖一个注册中心或者配置中心,我们可以通过 yrpc 治理的管理端进行配置,再通过注册中心或者配置中心将限流阈值的配置下发到服务提供方的每个节点上,实现动态配置。
甚至可以将限流逻辑放在调用端,调用端在发出请求时先触发限流逻辑,调用限流服务,如果请求量已经到达了限流阈值,请求都不需要发出去,直接返回给动态代理一个限流异常即可。
2) 实现令牌桶限流器
限流器作用于服务端,用于保护服务端的服务节点
在core模块下的com.dcyrpc
下创建protection
包,创建RateLimiter
接口
public interface RateLimiter {
/**
* 是否允许新的请求进入
* @return
*/
boolean allowRequest();
}
在core模块下的com.dcyrpc
下的protection
包,创建TokenBuketRateLimiter
类:基于令牌桶算法的限流器
-
变量:令牌的数量,桶的容量,给令牌桶加令牌的速率,上一次加令牌的时间
-
提供 释放请求 方法:
synchronized
有多个线程同时请求- 1.给令牌桶添加令牌
- 2.自己获取令牌
/**
* 基于令牌桶算法的限流器
*/
public class TokenBuketRateLimiter implements RateLimiter {
// 代表令牌的数量,>0 有令牌能放行, 放行就-1 ==0无令牌阻拦
private int tokens;
// 令牌数(桶的容量)
private final int capacity;
// 令牌桶的令牌(每秒放几个): 按照一定速率给令牌桶加令牌, 如每秒500个(不能超过总数)
// - 可以用定时任务加令牌
// - 每一个请求要发送的时候就加一个令牌
private final int rate;
// 上一次加令牌的时间
private Long lastAddTokenTime;
public TokenBuketRateLimiter(int capacity, int rate) {
this.capacity = capacity;
this.rate = rate;
this.lastAddTokenTime = System.currentTimeMillis();
this.tokens = capacity;
}
/**
* 判断请求是否可以放行
* @return
*/
public synchronized boolean allowRequest() {
// 1.给令牌桶添加令牌
long currentTime = System.currentTimeMillis();
long timeInterval = currentTime - lastAddTokenTime; // 时间间隔
// 如果时间间隔超过1秒,放令牌
if (timeInterval >= 1000) {
int needAddTokens = (int) (timeInterval * rate / 1000); // 需要添加的tokens
// 加令牌
tokens = Math.min(capacity, tokens + needAddTokens);
// 标记最后加入令牌的时间
this.lastAddTokenTime = System.currentTimeMillis();
}
// 2.自己获取令牌
if (tokens > 0) {
tokens--;
System.out.println("请求被放行---------------------");
return true;
} else {
System.out.println("请求被拦截---------------------");
return false;
}
}
}
c.熔断器
熔断器作用于客户端
理论上:标准断路器应该有3种状态:
- open:当开时,流量无法访问
- close:当关闭时,流量可以正常访问
- half-open:发送一个请求,如果这个请求得到了回应就改成close,没有回应就改成open
在core模块下的com.dcyrpc
下的protection
包,创建CircuitBreaker
类:熔断器
- 变量: 是否开启(Boolean), 总的请求数, 异常的请求数, 最大异常数, 最大异常比例
- 核心方法:通过数据指标,判断是否开启熔断器
- 其他方法:重置熔断器,记录正常请求数,记录异常请求数
/**
* 熔断器
*/
public class CircuitBreaker {
private volatile boolean isOpen = false;
// 总的请求数
private AtomicInteger requestCount = new AtomicInteger(0);
// 异常的请求数
private AtomicInteger errorRequestCount = new AtomicInteger(0);
// 最大异常数
private int maxErrorRequestCount;
// 最大异常比例
private float maxErrorRate;
public CircuitBreaker(int maxErrorRequestCount, float maxErrorRate) {
this.maxErrorRequestCount = maxErrorRequestCount;
this.maxErrorRate = maxErrorRate;
}
/**
* 通过数据指标,判断是否开启熔断器
* @return
*/
public boolean isBreak() {
// 断路器已经打开
if (isOpen) {
return true;
}
// 判断数据指标,是否满足当前的阈值
if (errorRequestCount.get() > maxErrorRequestCount) {
this.isOpen = true;
return true;
}
// 判断异常率
if (errorRequestCount.get() > 0 && requestCount.get() > 0 && (float) errorRequestCount.get() / requestCount.get() > maxErrorRate * 10) {
this.isOpen = true;
return true;
}
return false;
}
/**
* 重置熔断器
*/
public void reset() {
this.isOpen = false;
this.requestCount.set(0);
this.errorRequestCount.set(0);
}
/**
* 记录正常请求数
*/
public void recordRequest() {
this.requestCount.getAndIncrement();
}
/**
* 记录异常请求数
*/
public void recordErrorRequest() {
this.errorRequestCount.getAndIncrement();
}
}
d.实现服务端的限流
实现IP应用级别的总的限流器
修改core模块下的config
包下的Configuration
类:配置限流器
// 略...
// 为每一个IP配置的限流器
private final Map<SocketAddress, RateLimiter> EACH_IP_RATE_LIMITER = new ConcurrentHashMap<>(16);
修改core模块下的enumeration
包下的ResponseCode
类:修改响应码
SUCCESS((byte) 20, "成功"),
SUCCESS_HEART_BEAT((byte) 21, "心跳检测成功返回"),
RATE_LIMIT((byte) 31, "服务被限流"),
RESOURCE_NOT_FOUND((byte) 44, "请求的资源不存在"),
FAIL((byte) 50, "调用方法发生异常");
CLOSING((byte) 51, "程序正在关闭中");
// 略...
修改core模块下的channelHandler.handler
包下的MethodCallHandler
类的channelRead0()
方法:修改业务逻辑, 把限流器放入
-
先封装部分响应
-
通过channel获取IP地址,针对IP地址的限流,应该每一个IP匹配一个限流器
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, DcyRpcRequest dcyRpcRequest) throws Exception {
// 1.先封装部分响应
DcyRpcResponse dcyRpcResponse = DcyRpcResponse.builder()
.requestId(dcyRpcRequest.getRequestId())
.compressType(dcyRpcRequest.getCompressType())
.serializeType(dcyRpcRequest.getSerializeType())
.build();
// 2.限流操作
// 2.1.获取连接的地址
Channel channel = channelHandlerContext.channel();
SocketAddress socketAddress = channel.remoteAddress();
Map<SocketAddress, RateLimiter> eachIpRateLimiter = DcyRpcBootstrap.getInstance().getConfiguration().getEachIpRateLimiter();
// 2.2.查看缓存中是否有限流器
RateLimiter rateLimiter = eachIpRateLimiter.get(socketAddress);
if (rateLimiter == null) {
rateLimiter = new TokenBuketRateLimiter(500, 300);
eachIpRateLimiter.put(socketAddress, rateLimiter);
}
boolean allowRequest = rateLimiter.allowRequest();
// 3.处理限流
if (!allowRequest) {
// 需要封装响应并且返回
dcyRpcResponse.setCode(ResponseCode.RATE_LIMIT.getCode());
} else if (dcyRpcRequest.getRequestType() == RequestType.HEART_BEAT.getId()) {
// 处理心跳
dcyRpcResponse.setCode(ResponseCode.SUCCESS_HEART_BEAT.getCode());
// 封装响应并返回
} else {
// 正常调用
/**--------------------------具体调用过程--------------------------------------------*/
// 1.获取负载内容payload
RequestPayload requestPayload = dcyRpcRequest.getRequestPayload();
// 2.根据负载内容进行方法调用
try {
Object result = callTargetMethod(requestPayload);
log.info("请求【{}】已经在Provider端完成方法调用", dcyRpcRequest.getRequestId());
// 3.封装响应
dcyRpcResponse.setCode(ResponseCode.SUCCESS.getCode());
dcyRpcResponse.setBody(result);
} catch (Exception e) {
dcyRpcResponse.setCode(ResponseCode.FAIL.getCode());
log.error("请求id为【{}】时发生异常", dcyRpcRequest.getRequestId(), e);
}
// 4.写出响应
channelHandlerContext.channel().writeAndFlush(dcyRpcResponse);
}
}
e.实现客户端的熔断
修改core模块下的config
包下的Configuration
类:配置熔断器
// 略...
// 为每一个IP配置的断路器
private final Map<SocketAddress, CircuitBreaker> eachIpCircuitBreaker = new ConcurrentHashMap<>(16);
修改core模块下的proxy.handler
包下的RpcConsumerInvocationHandler
类的invoke()
方法:修改业务逻辑
- 1.封装报文,创建请求
- 2.将请求存入本地线程
- 3.发现服务
- 4.获取当前地址的断路器
- 如果断路器是打开的,则不发送请求,抛出异常
- 5.尝试获取一个可用的通道
- 6.写出报文
- 7.清理threadLocal
- 8.获得响应的结果
- 9.发生任何异常,需要记录请求失败的次数
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
int tryTimes = 0;
int intervalTime = 0;
// 从接口中获取 判断是否需要重试
TryTimes annotation = method.getAnnotation(TryTimes.class);
if (annotation != null) {
tryTimes = annotation.tryTimes();
intervalTime = annotation.intervalTime();
}
while (true) {
// 什么情况下要重试? 1.异常 2.响应有问题 code=500
/**
* ---------------------------封装报文---------------------------
*/
// 1.封装报文
RequestPayload requestPayload = RequestPayload.builder()
.interfaceName(interfaceRef.getName())
.methodName(method.getName())
.parametersType(method.getParameterTypes())
.parametersValue(args)
.returnType(method.getReturnType())
.build();
DcyRpcRequest dcyRpcRequest = DcyRpcRequest.builder()
.requestId(DcyRpcBootstrap.getInstance().getConfiguration().getIdGenerator().getId())
.compressType(CompressorFactory.getCompressor(DcyRpcBootstrap.getInstance().getConfiguration().getCompressType()).getCode())
.serializeType(SerializerFactory.getSerializer(DcyRpcBootstrap.getInstance().getConfiguration().getSerializeType()).getCode())
.requestType(RequestType.REQUEST.getId())
.timeStamp(new Date().getTime())
.requestPayload(requestPayload)
.build();
// 2.请求存入本地线程
DcyRpcBootstrap.REQUEST_THREAD_LOCAL.set(dcyRpcRequest);
// 3.发现服务,从注册中心拉取服务列表,并通过客户端负载均衡寻找一个可用的服务
// - 传入服务的名字,返回ip+端口 (InetSocketAddress可以封装端口/ip/host name)
// 获取当前配置的负载均衡器,选取一个可用的节点
InetSocketAddress address = DcyRpcBootstrap.getInstance().getConfiguration().getLoadBalancer().selectServiceAddress(interfaceRef.getName());
if (log.isInfoEnabled()) {
log.info("服务调用方,发现了服务{}的可用主机{}", interfaceRef.getName(), address);
}
// 4.获取当前地址的断路器
Map<SocketAddress, CircuitBreaker> eachIpCircuitBreaker = DcyRpcBootstrap.getInstance().getConfiguration().getEachIpCircuitBreaker();
CircuitBreaker circuitBreaker = eachIpCircuitBreaker.get(address);
if (circuitBreaker == null) {
circuitBreaker = new CircuitBreaker(10, 0.5F);
eachIpCircuitBreaker.put(address, circuitBreaker);
}
try {
// 判断熔断器是否开启: 如果已打开
if (dcyRpcRequest.getRequestType() != RequestType.HEART_BEAT.getId() && circuitBreaker.isBreak()) {
// 定期打开
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
DcyRpcBootstrap.getInstance().getConfiguration().getEachIpCircuitBreaker().get(address).reset();
}
}, 5000);
throw new RuntimeException("当前断路器已经开启,无法发送请求");
}
// 5.尝试获取一个可用的通道
Channel channel = getAvailableChannel(address);
if (log.isInfoEnabled()) {
log.info("获取了和【{}】建立的连接通道,准备发送数据", address);
}
/**
* ---------------------------同步策略---------------------------
*/
// ChannelFuture channelFuture = channel.writeAndFlush(new Object()).await();
// // get()阻塞获取结果
// // getNow()获取当前的结果,如果未处理完成,返回null
// if (channelFuture.isDone()) {
// Object object = channelFuture.getNow();
// } else if (!channelFuture.isSuccess()) {
// // 发生问题,需要捕获异常。
// // 子线程可以捕获异步任务的异常
// Throwable cause = channelFuture.cause();
// throw new RuntimeException(cause);
// }
/**
* ---------------------------异步策略---------------------------
*/
// 6.写出报文
CompletableFuture<Object> completableFuture = new CompletableFuture<>();
// 将completableFuture暴露出去
DcyRpcBootstrap.PENDING_REQUEST.put(dcyRpcRequest.getRequestId(), completableFuture);
// 直接使用writeAndFlush 写出一个请求,这个请求的实例就会进入pipeline执行出栈的一系列操作
channel.writeAndFlush(dcyRpcRequest).addListener((ChannelFutureListener) promise -> {
// 需要处理异常
if (!promise.isSuccess()) {
completableFuture.completeExceptionally(promise.cause());
}
});
// 7.清理threadLocal
DcyRpcBootstrap.REQUEST_THREAD_LOCAL.remove();
// 如果没有地方处理这个completableFuture,这里会阻塞等待 complete 方法的执行
// 在Netty的pipeline中最终的handler的处理结果 调用complete
// 8.获得响应的结果
Object result = completableFuture.get(10, TimeUnit.SECONDS);
// 记录成功的请求
circuitBreaker.recordRequest();
return result;
} catch (Exception e) {
// 次数减一,并且等待
tryTimes--;
//记录错误的次数
circuitBreaker.recordErrorRequest();
try {
Thread.sleep(intervalTime);
} catch (InterruptedException ex) {
log.error("在进行重试时发生异常", ex);
}
if (tryTimes < 0) {
log.error("对方法【{}】进行远程调用时,重试【{}】次,依然不可用", method.getName(), tryTimes, e);
break;
}
log.error("在进行第【{}】次重试时发生异常", 3 - tryTimes, e);
}
}
throw new RuntimeException("执行远程【{" + method.getName() + "}】方法调用失败");
}
在common模块下的exception
包下,创建ResponseException
:响应异常类
public class ResponseException extends RuntimeException{
private byte code;
private String msg;
public ResponseException(byte code, String msg) {
super(msg);
this.msg = msg;
this.code = code;
}
}
修改core模块下的proxy.handler
包下的MySimpleChannelInboundHandler
类的channelRead0()
方法
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, DcyRpcResponse dcyRpcResponse) throws Exception {
CompletableFuture<Object> completableFuture = DcyRpcBootstrap.PENDING_REQUEST.get(dcyRpcResponse.getRequestId());
SocketAddress socketAddress = channelHandlerContext.channel().remoteAddress();
Map<SocketAddress, CircuitBreaker> eachIpCircuitBreaker = DcyRpcBootstrap.getInstance().getConfiguration().getEachIpCircuitBreaker();
CircuitBreaker circuitBreaker = eachIpCircuitBreaker.get(socketAddress);
byte code = dcyRpcResponse.getCode();
if (code == ResponseCode.FAIL.getCode()) {
circuitBreaker.recordErrorRequest();
completableFuture.complete(null);
log.error("当前请求id【{}】,返回错误的结果,响应码【{}】", dcyRpcResponse.getRequestId(), code);
throw new ResponseException(code, ResponseCode.FAIL.getDesc());
} else if (code == ResponseCode.RATE_LIMIT.getCode()) {
circuitBreaker.recordErrorRequest();
completableFuture.complete(null);
log.error("当前请求id【{}】,被限流,响应码【{}】", dcyRpcResponse.getRequestId(), code);
throw new ResponseException(code, ResponseCode.RATE_LIMIT.getDesc());
} else if (code == ResponseCode.RESOURCE_NOT_FOUND.getCode()) {
circuitBreaker.recordErrorRequest();
completableFuture.complete(null);
log.error("当前请求id【{}】,未找到目标资源,响应码【{}】", dcyRpcResponse.getRequestId(), code);
throw new ResponseException(code, ResponseCode.RESOURCE_NOT_FOUND.getDesc());
} else if (code == ResponseCode.SUCCESS.getCode()) {
// 异步
// 服务提供方,给予的结果
Object returnValue = dcyRpcResponse.getBody();
returnValue = returnValue == null ? new Object() : returnValue;
// 从全局的挂起的请求中,寻找与之匹配的待处理 completeFuture
completableFuture.complete(returnValue);
log.info("已寻找【{}】completableFuture,处理响应结果", dcyRpcResponse.getRequestId());
} else if (code == ResponseCode.SUCCESS_HEART_BEAT.getCode()) {
completableFuture.complete(null);
log.info("当前请求id【{}】,处理心跳检测,响应码【{}】", dcyRpcResponse.getRequestId(), code);
}
}
f.流量隔离
1) 介绍
后期业务发展了,调用你接口的调用方就会越来越多,流量也会渐渐多起来。可能某一天,一个“爆炸式惊喜”就来了。其中一个调用方的流量突然激增,让你整个集群瞬间处于高负载运行,进而影响到其它调用方,导致整体的业务可用性降低。最好的办法就是隔离流量,将多个dcyrpc服务进行分组,一个调用方只能访问一个分组的服务,就是一个调用方流量爆炸也只会影响一个分组的服务,整体还是可用的。
怎么实现分组?
分组的逻辑的就是让调用方可以发现一个分组的服务,那实现的逻辑就一定是服务发现的时候只能拉取同一个分组的服务,我们需要在服务发现中做一些改造。
原本服务调用方是通过接口名去注册中心找到所有的服务节点来完成服务发现的,那换到这里的话,这样做其实并不合适,因为这样调用方会拿到所有的服务节点。因此为了实现分组隔离逻辑,我们需要重新改造下服务发现的逻辑,调用方去获取服务节点的时候除了要带着接口名,还需要另外加一个分组参数,相应的服务提供方在注册的时候也要带上分组参数。
非核心应用不要跟核心应用分在同一个组,核心应用之间应该做好隔离,一个重要的原则就是保障核心应用不受影响。比如提供给电商下单过程中用的商品信息接口,我们肯定是需要独立出一个单独分组,避免受其它调用方污染的。
2) 实现
修改common模块下的DcyRpcApi
枚举
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface DcyRpcApi {
// 分组名称
String group() default "default";
}
修改provider模块下的impl.DcyRpcImpl
启动类:在注解上添加分组
@DcyRpcApi(group = "HK")
public class DcyRpcImpl implements DcyRpc {
// 略...
}
修改core模块下的DcyRpcBootstrap
启动类的scan()
方法:
- 通过注解获取分组信息
- 将分组信息存入ServiceConfig中
public DcyRpcBootstrap scan(String packageName) {
// 略...
for (Class<?> clazz : classes) {
// 略...
// 获取分组信息
DcyRpcApi dcyRpcApi = clazz.getAnnotation(DcyRpcApi.class);
String group = dcyRpcApi.group();
for (Class<?> anInterface : interfaces) {
// 略...
serviceConfig.setGroup(group);
// 略...
}
}
return this;
}
修改core模块下的ServiceConfig
类的:
- 添加group属性,group的setter/getter方法
private String group = "default";
public void setGroup(String group) {
this.group = group;
}
public String getGroup() {
return group;
}
修改core模块下的discovery.impl
包下的ZookeeperRegistry
类:register方法
- 建立分组节点
@Override
public void register(ServiceConfig<?> service) {
// 服务名称的节点
String parentNode = Constant.BASE_PROVIDERS_PATH + "/" + service.getInterface().getName();
// 判断节点是否存在,不存在则创建节点(持久)
if (!ZookeeperUtils.existNode(zooKeeper, parentNode, null)) {
ZookeeperNode zookeeperNode = new ZookeeperNode(parentNode, null);
ZookeeperUtils.createNode(zooKeeper, zookeeperNode, null, CreateMode.PERSISTENT);
}
// 建立分组节点
parentNode = parentNode + "/" + service.getGroup();
if (!ZookeeperUtils.existNode(zooKeeper, parentNode, null)) {
ZookeeperNode zookeeperNode = new ZookeeperNode(parentNode, null);
ZookeeperUtils.createNode(zooKeeper, zookeeperNode, null, CreateMode.PERSISTENT);
}
// 略...
}
修改consumer模块下的启动类:
// 略...
DcyRpcBootstrap.getInstance()
.application("first-dcyrpc-consumer")
.registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
.serialize("jdk")
.compress("gzip")
.group("HK")
.reference(reference);
// 略...
修改core模块下的config
包的Configuration
类:
// 分组信息
private String group = "default";
修改core模块下的DcyRpcBootstrap
启动类:
-
在reference方法中设置分组信息
-
添加group方法
public DcyRpcBootstrap reference(ReferenceConfig<?> reference) {
// 启动心跳检测
log.info("开始心跳检测");
HeartbeatDetector.detectHeartbeat(reference.getInterface().getName());
// 配置reference,将来调用get方法时,方便生成代理对象
// 1.reference需要一个注册中心
reference.setRegistry(configuration.getRegistryConfig().getRegistry());
reference.setGroup(this.getConfiguration().getGroup());
return this;
}
public DcyRpcBootstrap group(String group) {
this.configuration.setGroup(group);
return this;
}
修改core模块下的ReferenceConfig
类的:
- 添加group属性,group的setter/getter方法
- 在
get()
方法,把group传值给RpcConsumerInvocationHandler
public T get() {
// 使用动态代理完成工作
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Class<T>[] classes = new Class[]{interfaceRef};
InvocationHandler handler = new RpcConsumerInvocationHandler(interfaceRef, registry, group);
// 使用动态代理生成代理对象
Object helloProxy = Proxy.newProxyInstance(classLoader, classes, handler);
return (T) helloProxy;
}
private String group;
public void setGroup(String group) {
this.group = group;
}
public String getGroup() {
return group;
}
修改core模块下的proxy.handler
包下的RpcConsumerInvocationHandler
类
- 设置group变量
- 在
invoke()
方法中,发现服务传入一个group
private String group;
public RpcConsumerInvocationHandler(Class<?> interfaceRef, Registry registry, String group) {
this.interfaceRef = interfaceRef;
this.registry = registry;
this.group = group;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 略....
// 3.发现服务,从注册中心拉取服务列表,并通过客户端负载均衡寻找一个可用的服务
// - 传入服务的名字,返回ip+端口 (InetSocketAddress可以封装端口/ip/host name)
// 获取当前配置的负载均衡器,选取一个可用的节点
InetSocketAddress address = DcyRpcBootstrap.getInstance()
.getConfiguration().getLoadBalancer().selectServiceAddress(interfaceRef.getName(), group);
// 略...
}
修改core模块下的loadbalancer
包下的LoadBalancer
接口
InetSocketAddress selectServiceAddress(String serviceName, String group);
修改core模块下的loadbalancer
包下的AbstractLoadBalancer
:在selectServiceAddress
方法里的lookup传入group
@Override
public InetSocketAddress selectServiceAddress(String serviceName, String group) {
// 略...
if (selector == null) {
// 这个负载均衡器,内部要维护服务列表,作为缓存
List<InetSocketAddress> serviceList = DcyRpcBootstrap.getInstance().getConfiguration().getRegistryConfig().getRegistry().lookup(serviceName, group);
// 略...
}
// 略...
}
修改core模块下的discovery.impl
包下的ZookeeperRegistry
类:lookup方法
- 拼接分组信息
String serviceNode = Constant.BASE_PROVIDERS_PATH + "/" + serviceName + "/" + group;