RPC框架-Gitee代码(麻烦点个Starred, 支持一下吧)
RPC框架-GitHub代码(麻烦点个Starred, 支持一下吧)
压缩报文
- 对报文进行压缩
- a.报文压缩
- b.负载均衡
- c.使用模板方法优化负载均衡
- d.一致性hash-负载均衡算法
- d.1) 介绍
- d.2) 实现
- e.实现心跳检测
- f.最短响应时间的负载均衡策略
对报文进行压缩
a.报文压缩
在core模块下创建compress
包
在该包下创建Compressor
接口:压缩与解压缩的接口
/**
* 压缩与解压缩的接口
*/
public interface Compressor {
/**
* 压缩
* @param bytes
* @return
*/
byte[] compress(byte[] bytes);
/**
* 解压缩
* @param bytes
* @return
*/
byte[] decompress(byte[] bytes);
}
在该包下创建CompressWrapper
类:压缩与解压缩的包装类
/**
* 压缩与解压缩的包装类
*/
@NoArgsConstructor
@AllArgsConstructor
@Data
public class CompressWrapper {
private byte code;
private String type;
private Compressor compressor;
}
在该包下创建CompressorFactory
类:压缩工厂类
/**
* 压缩工厂类
*/
public class CompressorFactory {
private final static Map<String, CompressWrapper> COMPRESSOR_CACHE = new ConcurrentHashMap<>(8);
private final static Map<Byte, CompressWrapper> COMPRESSOR_CODE = new ConcurrentHashMap<>(8);
static {
CompressWrapper gzip = new CompressWrapper((byte) 1, "gzip", new GzipCompressor());
COMPRESSOR_CACHE.put("gzip", gzip);
COMPRESSOR_CODE.put((byte) 1, gzip);
}
/**
* 使用工厂方法获取一个CompressWrapper
* @param compressorType 压缩的类型
* @return
*/
public static CompressWrapper getCompressor(String compressorType) {
return COMPRESSOR_CACHE.get(compressorType);
}
public static CompressWrapper getCompressor(byte compressorCode) {
return COMPRESSOR_CODE.get(compressorCode);
}
}
在common的exceptions
中创建CompressException
类:压缩异常处理
public class compressException extends RuntimeException{
public compressException() {
super();
}
public compressException(Throwable cause) {
super(cause);
}
}
在该包下创建impl
包,并创建GzipCompressor
类:gzip压缩的实现。实现Compressor接口
/**
* gzip压缩的实现
*/
@Slf4j
public class GzipCompressor implements Compressor {
@Override
public byte[] compress(byte[] bytes) {
try (
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);
) {
gzipOutputStream.write(bytes);
gzipOutputStream.finish();
byte[] result = byteArrayOutputStream.toByteArray();
log.info("对字节数组进行压缩, 长度【{}】压缩至【{}】", bytes.length, result.length);
return result;
} catch (IOException e) {
log.error("对字节数组进行压缩时发生异常", e);
throw new CompressException(e);
}
}
@Override
public byte[] decompress(byte[] bytes) {
try (
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
GZIPInputStream gzipInputStream = new GZIPInputStream(bais);
) {
byte[] result = gzipInputStream.readAllBytes();
log.info("对字节数组进行解压, 长度【{}】解压至【{}】", bytes.length, result.length);
return result;
} catch (IOException e) {
log.error("对字节数组进行解压时发生异常", e);
throw new CompressException(e);
}
}
}
在DcyRpcBootstrap
类中添加COMPRESS_TYPE
的默认常量,也可以通过方法进行修改
// 略...
public static String COMPRESS_TYPE = "gizp";
// 略...
/**
* 配置压缩的方式
* @param compressType
* @return
*/
public DcyRpcBootstrap compress(String compressType) {
COMPRESS_TYPE = compressType;
return this;
}
修改RpcConsumerInvocationHandler
类:修改填写压缩的代码
// 略...
DcyRpcRequest dcyRpcRequest = DcyRpcRequest.builder()
.requestId(DcyRpcBootstrap.ID_GENERATOR.getId())
.compressType(CompressorFactory.getCompressor(DcyRpcBootstrap.COMPRESS_TYPE).getCode())
.serializeType(SerializerFactory.getSerializer(DcyRpcBootstrap.SERIALIZE_TYPE).getCode())
.requestType(RequestType.REQUEST.getId())
.requestPayload(requestPayload)
.build();
// 略...
修改DcyRpcRequestEncoder
类,在请求类,对请求添加有关压缩的代码
// 略...
// 1.根据配置的序列化方式进行序列化
Serializer serializer = SerializerFactory.getSerializer(dcyRpcRequest.getSerializeType()).getSerializer();
byte[] body = serializer.serializer(dcyRpcRequest.getRequestPayload());
// 2.根据配置的压缩方式进行压缩
Compressor compressor = CompressorFactory.getCompressor(dcyRpcRequest.getCompressType()).getCompressor();
body = compressor.compress(body);
// 略...
修改DcyRpcRequestDecoder
类,在响应类,对请求添加有关解压缩的代码
// 略...
// 解压缩和反序列化
// 解压缩
Compressor compressor = CompressorFactory.getCompressor(compressType).getCompressor();
payload = compressor.decompress(payload);
// 略...
修改DcyRpcResponseEncoder
类,在响应类,对响应添加有关压缩的代码
// 略...
// 写入请求体body(requestPayload)
// 对响应做序列化器
Serializer serializer = SerializerFactory.getSerializer(dcyRpcResponse.getSerializeType()).getSerializer();
byte[] body = serializer.serializer(dcyRpcResponse.getBody());
// 对响应做压缩
Compressor compressor = CompressorFactory.getCompressor(dcyRpcResponse.getCompressType()).getCompressor();
body = compressor.compress(body);
// 略...
修改DcyRpcResponseDecoder
类,在请求类,对响应添加有关解压缩的代码
// 略...
// 解压缩和反序列化
// 解压缩
Compressor compressor = CompressorFactory.getCompressor(compressType).getCompressor();
payload = compressor.decompress(payload);
// 略...
b.负载均衡
当一个服务节点无法支撑现有的访问量时,会部署多个节点,组成一个集群然后通过负载均衡,将请求分发给这个集群下的每个服务节点,从而达到多个服务节点共同分担请求压力的目的。
负载均衡可以根据不同的标准进行分类。常见的负载均衡分类:
- 1.根据工作层级
- 数据链路层负载均衡:通过分析数据链路层的信息(如MAC地址)进行负载均衡。
- 传输层负载均衡:通过分析传输层信息(如TCP/UDP端口号)进行负载均衡。例如:L4负载均衡
- 应用层负载均衡:通过分析应用层信息(如HTTP请求)进行负载均衡。例如:L7负载均衡
- 2.根据分配策略
- 轮询:按顺序将请求分配给服务器,当分配到最后一个服务器后,进行重新分配
- 加权轮询:类似于轮询,但服务器被分配一个权重,根据权重来分配请求
- 随机:随机选择一个服务器来处理请求
- 加权随机:类似于随机,但服务器被分配一个权重,根据权重来随机分配请求
- 最少连接:将请求分配给当前连接数最少的服务器
- 最短响应时间:将请求分配给响应时间最短的服务器
- 哈希:根据某个哈希值(如请求的源IP地址)来分配请求,相同哈希值的请求会被分配到同一个服务器
- 自适应:根据服务器的运行时指标(如CPU利用率、内存使用量等)动态调整分配策略
- 3.根据部署位置
- 客户端负载均衡:在客户端实现负载均衡,客户端直接选择合适的服务器进行请求
- 服务器端负载均衡:在服务器端实现负载均衡,通常有一个负载均衡器设备或软件来分配请求给后端服务器
- 全局负载均衡:在全局范围内实现负载均衡,主要用于夸数据中心或地理位置分布的场景,例如:使用DNS解析不同地域的请求到不同的服务器
在core模块下,修改com.dcyrpc.discovery.impl
包的ZookeeperRegistry
类:修改成,拉取合适的服务列表
-
修改
lookup()
方法 -
根据服务名称,返回一个服务列表
// 略...
/**
* 服务发现
* - 拉取合适的服务列表
* @param serviceName 服务名称
* @return 服务列表
*/
@Override
public List<InetSocketAddress> lookup(String serviceName) {
// 1.找到对应服务的节点
String serviceNode = Constant.BASE_PROVIDERS_PATH + "/" + serviceName;
// 2.从zk中获取它的子节点,
List<String> children = ZookeeperUtils.getChildren(zooKeeper, serviceNode, null);
// 获取所有的可用的服务列表
List<InetSocketAddress> inetSocketAddressList = children.stream().map(ipString -> {
String[] ipAndPort = ipString.split(":");
String ip = ipAndPort[0];
int port = Integer.valueOf(ipAndPort[1]);
return new InetSocketAddress(ip, port);
}).toList();
if (inetSocketAddressList.size() == 0){
throw new DiscoveryException("未发现任何可用的服务主机");
}
return inetSocketAddressList;
}
在core模块下的com.dcyrpc
创建loadbalancer
包,在包创建LoadBalancer
类:负载均衡器的接口
/**
* 负载均衡接口
* - 根据服务列表找到一个可用的服务
*/
public interface LoadBalancer {
/**
* 根据服务名找到一个可用的服务
* @param serviceName 服务名称
* @return 服务地址
*/
InetSocketAddress selectServiceAddress(String serviceName);
}
在loadbalancer
包下,创建Selector
类:轮询的负载均衡器的算法接口
public interface Selector {
/**
* 根据服务列表执行一种算法,获取一个服务节点
* @return 服务节点
*/
InetSocketAddress getNext();
// todo 服务动态上下线需要进行reBalance
void reBalance();
}
在common的exceptions
中创建LoadBalancerException
类:负载均衡异常处理
/**
* 负载均衡异常处理
*/
public class LoadBalancerException extends RuntimeException{
public LoadBalancerException() {
super();
}
public LoadBalancerException(String message) {
super(message);
}
public LoadBalancerException(Throwable cause) {
super(cause);
}
}
在DcyRpcBootstrap
类中添加getRegistry()
方法
// 略...
public static LoadBalancer LOAD_BALANCER;
// 略...
public DcyRpcBootstrap registry(RegistryConfig registryConfig) {
// 维护一个zookeeper实例,但是,如果这样写就会将zookeeper和当前的工程耦合
// 使用 registryConfig 获取一个注册中心
this.registry = registryConfig.getRegistry();
DcyRpcBootstrap.LOAD_BALANCER = new RoundRobinLoadBalancer();
return this;
}
// 略...
public Registry getRegistry() {
return registry;
}
在loadbalancer
包下,创建RoundRobinLoadBalancer
类:轮询的负载均衡器策略
- 实现LoadBalancer接口
- 创建
RoundRobinSelector
内部类,实现Selector接口
/**
* 轮询的负载均衡策略
*/
@Slf4j
public class RoundRobinLoadBalancer implements LoadBalancer {
private Registry registry;
// 一个服务会匹配一个selector
private Map<String, Selector> cache = new ConcurrentHashMap<>(8);
public RoundRobinLoadBalancer() {
this.registry = DcyRpcBootstrap.getInstance().getRegistry();
}
@Override
public InetSocketAddress selectServiceAddress(String serviceName) {
// 优先从缓存中获取一个选择器
Selector selector = cache.get(serviceName);
// 如果为空,则需要为这个service创建一个selector
if (selector == null) {
// 这个负载均衡器,内部要维护服务列表,作为缓存
List<InetSocketAddress> serviceList = this.registry.lookup(serviceName);
// 提供一些算法
selector = new RoundRobinSelector(serviceList);
// 将selector放入缓存当中
cache.put(serviceName, selector);
}
// 获取可用节点
return selector.getNext();
}
private static class RoundRobinSelector implements Selector {
private List<InetSocketAddress> serviceList;
private AtomicInteger index;
public RoundRobinSelector(List<InetSocketAddress> serviceList) {
this.serviceList = serviceList;
this.index = new AtomicInteger(0);
}
@Override
public InetSocketAddress getNext() {
if (serviceList == null || serviceList.size() == 0) {
log.error("进行负载均衡选取节点时发生服务列表为空");
throw new LoadBalancerException();
}
InetSocketAddress address = serviceList.get(index.get());
// index如果到了最后一个位置,重置
if (index.get() == serviceList.size() - 1) {
index.set(0);
} else {
// 游标后移一位
index.incrementAndGet();
}
return address;
}
@Override
public void reBalance() {
}
}
}
修改RpcConsumerInvocationHandler
类:修改invoke()
方法
- 添加负载均衡器的方法
// 略...
// 1.发现服务,从注册中心,寻找一个可用的服务
// - 传入服务的名字,返回ip+端口 (InetSocketAddress可以封装端口/ip/host name)
// 获取当前配置的负载均衡器,选取一个可用的节点
InetSocketAddress address = DcyRpcBootstrap.LOAD_BALANCER.selectServiceAddress(interfaceRef.getName());
if (log.isInfoEnabled()){
log.info("服务调用方,发现了服务{}的可用主机{}", interfaceRef.getName(), address);
}
// 略...
在core下com.dcyrpc.discovery.impl
包下,修改ZookeeperRegistry
类:将固定的端口号改成通过启动类获取DcyRpcBootstrap.PORT
// 创建本机的临时节点,ip:port
// 服务提供方的端口(一般自己设定),还需要获取ip的方法
// /dcyrpc-metadata/providers/com.dcyrpc.DcyRpc/192.168.195.1:8088
// TODO:后续处理端口问题
String node = parentNode + "/" + NetUtils.getIp() + ":" + DcyRpcBootstrap.PORT;
在DcyRpcBootstrap
类中
- 添加静态端口常量
- 修改
start()
方法:重写新的端口
// 略...
public static int PORT = 8088;
// 略...
public void start() {
// 略...
// 4.绑定端口
ChannelFuture channelFuture = serverBootstrap.bind(PORT).sync();
// 略...
}
c.使用模板方法优化负载均衡
在core模块下的loadbalancer
包下,创建AbstractLoadBalancer
抽象类
public abstract class AbstractLoadBalancer implements LoadBalancer{
// 一个服务会匹配一个selector
private Map<String, Selector> cache = new ConcurrentHashMap<>(8);
@Override
public InetSocketAddress selectServiceAddress(String serviceName) {
// 优先从缓存中获取一个选择器
Selector selector = cache.get(serviceName);
// 如果为空,则需要为这个service创建一个selector
if (selector == null) {
// 这个负载均衡器,内部要维护服务列表,作为缓存
List<InetSocketAddress> serviceList = DcyRpcBootstrap.getInstance().getRegistry().lookup(serviceName);
// 提供一些算法
selector = getSelector(serviceList);
// 将selector放入缓存当中
cache.put(serviceName, selector);
}
// 获取可用节点
return selector.getNext();
}
/**
* 由子类进行拓展
* @param serviceList 服务列表
* @return 负载均衡算法选择器
*/
protected abstract Selector getSelector(List<InetSocketAddress> serviceList);
}
在core模块下的loadbalancer
包下,修改RoundRobinLoadBalancer
类
- 继承
AbstractLoadBalancer
抽象类
/**
* 轮询的负载均衡策略
*/
@Slf4j
public class RoundRobinLoadBalancer extends AbstractLoadBalancer {
@Override
protected Selector getSelector(List<InetSocketAddress> serviceList) {
return new RoundRobinSelector(serviceList);
}
/**
* 轮询的负载均衡具体实现算法
*/
private static class RoundRobinSelector implements Selector {
private List<InetSocketAddress> serviceList;
private AtomicInteger index;
public RoundRobinSelector(List<InetSocketAddress> serviceList) {
this.serviceList = serviceList;
this.index = new AtomicInteger(0);
}
@Override
public InetSocketAddress getNext() {
if (serviceList == null || serviceList.size() == 0) {
log.error("进行负载均衡选取节点时发生服务列表为空");
throw new LoadBalancerException();
}
InetSocketAddress address = serviceList.get(index.get());
// index如果到了最后一个位置,重置
if (index.get() == serviceList.size() - 1) {
index.set(0);
} else {
// 游标后移一位
index.incrementAndGet();
}
return address;
}
@Override
public void reBalance() {
}
}
}
d.一致性hash-负载均衡算法
d.1) 介绍
按照传统的hash算法思路,我们需要构建一张hash表,将服务器挂载在hash表中:根据请求的要素,与服务器的数量取余
- 有缓存
- 长连接
但是,这样的方式会存在很多问题,如动态扩容的问题。比如,随着业务量增长,将原有的六个服务扩容至八个,此时,我们不仅要修改路由表,还要修改hash的路由策略。
一致性hash借鉴了hash算法的部分能力做了如下的设计:
- 1.将hash值均匀的分布在一个区间,我们一般将区间设置为整形的取值范围(-2的31次方 ~ 2的31次方 -1)当然这个范围也可以是(0 ~ 2的32次方 -1),只要是一个合理的容易计算的足够大的范围即可。
- 2.将这个区间构建成一个环,构建成环不一定必须要链表,其实很多的有序的数据结构都可以,比如数组,比如红黑树,只要加上一点点逻辑,就是数完最后一个回到第一个节点就可以了。
- 3.将服务器按照自身的特点,计算hash值,并将其挂载在hash表中。
当请求进来以后,根据请求的部分特征,如url、请求id,请求来源等信息进行hash运算,看请求落在哪个范围,然后顺时针找到第一个服务器即可,这样最大的好处就是当有新的服务加入集群只需要将服务挂载在hash环即可,但是后自然会有流量进入该服务器,而不需要修改任何的逻辑,因为我们的hash环足够大,所以可以容纳的机器也很多。
**问题:**但是此时会出现一个问题,如果节点过少,hash分布不均匀会产生严重的流量倾斜:
为了解决这个问题,我们就需要引入虚拟节点的概念,我们可以将一个真实节点化身为n个(比如128)虚拟节点,每个虚拟节点都指向同一个服务,分别对虚拟节点进行hash,可以让一个服务的虚拟节点大致均匀的分布在hash环上,
d.2) 实现
在core模块下的loadbalancer.impl
包中,创建ConsistentHashBalancer
类,继承AbstractLoadBalancer
/**
* 一致性哈希的负载均衡策略
*/
@Slf4j
public class ConsistentHashBalancer extends AbstractLoadBalancer {
@Override
protected Selector getSelector(List<InetSocketAddress> serviceList) {
return new ConsistentHashSelector(serviceList, 128);
}
/**
* 一致性哈希的具体实现算法
*/
private static class ConsistentHashSelector implements Selector {
// hash环用来存储服务器节点
private SortedMap<Integer, InetSocketAddress> circle = new TreeMap<>();
// 虚拟节点的个数
private int virtualNodes;
public ConsistentHashSelector(List<InetSocketAddress> serviceList, int virtualNodes) {
// 将节点转换为虚拟节点,进行过载
this.virtualNodes = virtualNodes;
for (InetSocketAddress inetSocketAddress : serviceList) {
// 把每一个节点加入到哈希环中
addNodeToCircle(inetSocketAddress);
}
}
@Override
public InetSocketAddress getNext() {
// 获取请求的要素
DcyRpcRequest dcyRpcRequest = DcyRpcBootstrap.REQUEST_THREAD_LOCAL.get();
// 对请求的要素做处理,
// 根据请求的一些特征来选择服务器 id
String requestId = Long.toString(dcyRpcRequest.getRequestId());
// 对请求的id做hash,字符串默认的hash不太好
int hash = hash(requestId);
// 判断该hash值是否能直接落在一个服务器上(是否和服务器的hash一样)
if (!circle.containsKey(hash)) {
// 寻找最近的节点
SortedMap<Integer, InetSocketAddress> tailMap = circle.tailMap(hash);
hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
}
return circle.get(hash);
}
@Override
public void reBalance() {
}
/**
* 将每一个节点挂载到哈希环
* @param inetSocketAddress 节点的地址
*/
private void addNodeToCircle(InetSocketAddress inetSocketAddress) {
// 为每一个节点生成匹配的虚拟节点进行挂载
for (int i = 0; i < virtualNodes; i++) {
int hash = hash(inetSocketAddress.toString() + "-" + i);
// 挂载到hash环上
circle.put(hash, inetSocketAddress);
log.info("hash为【{}】的节点已经挂载到了哈希环上", hash);
}
}
private void removeNodeFromCircle(InetSocketAddress inetSocketAddress) {
// 为每一个节点生成匹配的虚拟节点进行挂载
for (int i = 0; i < virtualNodes; i++) {
int hash = hash(inetSocketAddress.toString() + "-" + i);
// 挂载到hash环上
circle.remove(hash);
}
}
/**
* 具体的哈希算法
* @param s
* @return
*/
private int hash(String s) {
MessageDigest md;
try {
md = MessageDigest.getInstance("md5");
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
byte[] digest = md.digest(s.getBytes());
int res = 0;
// md5得到的结果是一个字节数组,但是我们要int 4个字节
for (int i = 0; i < digest.length; i++) {
res = res << 8;
if (digest[i] < 0){
res = res | (digest[i] & 255);
} else {
res = res | digest[i];
}
System.out.println(Integer.toBinaryString(digest[i]));
}
return res;
}
}
}
在DcyRpcBootstrap
类中
- 添加 ThreadLocal 本地线程
- LOAD_BALANCER 改为 一致性哈希
// 略...
public static final ThreadLocal<DcyRpcRequest> REQUEST_THREAD_LOCAL = new ThreadLocal<>();
// 略...
public DcyRpcBootstrap registry(RegistryConfig registryConfig) {
// 维护一个zookeeper实例,但是,如果这样写就会将zookeeper和当前的工程耦合
// 使用 registryConfig 获取一个注册中心
this.registry = registryConfig.getRegistry();
DcyRpcBootstrap.LOAD_BALANCER = new ConsistentHashBalancer();
return this;
}
在RpcConsumerInvocationHandler
类中:
- 将请求存入本地线程,需要在合适的时候调用remove方法
- 调整
invoke()
方法的逻辑顺序
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
/**
* ---------------------------封装报文---------------------------
*/
// 1.封装报文
RequestPayload requestPayload = RequestPayload.builder()
.interfaceName(interfaceRef.getName())
.methodName(method.getName())
.parametersType(method.getParameterTypes())
.parametersValue(args)
.returnType(method.getReturnType())
.build();
// TODO 需要对各个请求的操作做处理
DcyRpcRequest dcyRpcRequest = DcyRpcRequest.builder()
.requestId(DcyRpcBootstrap.ID_GENERATOR.getId())
.compressType(CompressorFactory.getCompressor(DcyRpcBootstrap.COMPRESS_TYPE).getCode())
.serializeType(SerializerFactory.getSerializer(DcyRpcBootstrap.SERIALIZE_TYPE).getCode())
.requestType(RequestType.REQUEST.getId())
.requestPayload(requestPayload)
.build();
// 请求存入本地线程
DcyRpcBootstrap.REQUEST_THREAD_LOCAL.set(dcyRpcRequest);
// 2.发现服务,从注册中心拉取服务列表,并通过客户端负载均衡寻找一个可用的服务
// - 传入服务的名字,返回ip+端口 (InetSocketAddress可以封装端口/ip/host name)
// 获取当前配置的负载均衡器,选取一个可用的节点
InetSocketAddress address = DcyRpcBootstrap.LOAD_BALANCER.selectServiceAddress(interfaceRef.getName());
if (log.isInfoEnabled()){
log.info("服务调用方,发现了服务{}的可用主机{}", interfaceRef.getName(), address);
}
// 2.尝试获取一个可用的通道
Channel channel = getAvailableChannel(address);
if (log.isInfoEnabled()){
log.info("获取了和【{}】建立的连接通道,准备发送数据", address);
}
/**
* ---------------------------异步策略---------------------------
*/
// 4.写出报文
CompletableFuture<Object> completableFuture = new CompletableFuture<>();
// 将completableFuture暴露出去
DcyRpcBootstrap.PENDING_REQUEST.put(1L, completableFuture);
// 直接使用writeAndFlush 写出一个请求,这个请求的实例就会进入pipeline执行出栈的一系列操作
channel.writeAndFlush(dcyRpcRequest).addListener((ChannelFutureListener) promise -> {
// 需要处理异常
if (!promise.isSuccess()) {
completableFuture.completeExceptionally(promise.cause());
}
});
// 清理threadLocal
DcyRpcBootstrap.REQUEST_THREAD_LOCAL.remove();
// 如果没有地方处理这个completableFuture,这里会阻塞等待 complete 方法的执行
// 在Netty的pipeline中最终的handler的处理结果 调用complete
// 5.获得响应的结果
return completableFuture.get(10, TimeUnit.SECONDS);
}
e.实现心跳检测
定期向所有的channel发送一个简单的请求即可,如果能得到回应说明连接是正常的。
其中我们要在心跳探测的过程中完成以下几项工作:
1、如果可以正常访问,记录响应时间,以备后用。
2、如果不能正常访问,则进行重试,重试三次依旧不能访问,则从健康服务列表中剔除,以后的访问不会使用该连接。
注意:重试的等待时间我们选取一个合适范围内的随机时间,这样可以避免局域网络问题导致的大面积同时重试,产生重试风暴
在core模块下的transport.message
包的DcyRpcRequest
请求类上:添加时间戳变量
// 时间戳
private long timeStamp;
在core模块下的transport.message
包的DcyRpcResponse
响应类上:添加时间戳变量
// 时间戳
private long timeStamp;
修改RpcConsumerInvocationHandler
类:
- 请求体增加时间戳
- 修改请求类型的id
// 略...
DcyRpcRequest dcyRpcRequest = DcyRpcRequest.builder()
.requestId(DcyRpcBootstrap.ID_GENERATOR.getId())
.compressType(CompressorFactory.getCompressor(DcyRpcBootstrap.COMPRESS_TYPE).getCode())
.serializeType(SerializerFactory.getSerializer(DcyRpcBootstrap.SERIALIZE_TYPE).getCode())
.requestType(RequestType.REQUEST.getId())
.timeStamp(new Date().getTime())
.requestPayload(requestPayload)
.build();
// 略...
修改MySimpleChannelInboundHandler
类:
- 修改请求类型的id
// 从全局的挂起的请求中,寻找与之匹配的待处理 completeFuture
CompletableFuture<Object> completableFuture = DcyRpcBootstrap.PENDING_REQUEST.get(dcyRpcResponse.getRequestId());
修改DcyRpcRequestEncoder
请求类编码器:
- 添加时间戳
- 添加 RequestPayload是否为空 的判断逻辑
// 略...
// 8个字节的请求id
byteBuf.writeLong(dcyRpcRequest.getRequestId());
// 时间戳
byteBuf.writeLong(dcyRpcRequest.getTimeStamp());
// 写入请求体body(requestPayload)
// 1.根据配置的序列化方式进行序列化
byte[] body = null;
if (dcyRpcRequest.getRequestPayload() != null) {
Serializer serializer = SerializerFactory.getSerializer(dcyRpcRequest.getSerializeType()).getSerializer();
body = serializer.serializer(dcyRpcRequest.getRequestPayload());
// 2.根据配置的压缩方式进行压缩
Compressor compressor = CompressorFactory.getCompressor(dcyRpcRequest.getCompressType()).getCompressor();
body = compressor.compress(body);
}
修改DcyRpcRequestDecoder
响应类解码器:
- 添加时间戳
- 添加 payload是否为空 的判断逻辑
// 略...
// 9.时间戳
long timeStamp = byteBuf.readLong();
// 略...
dcyRpcRequest.setTimeStamp(timeStamp);
// 略...
// 解压缩
if (payload != null && payload.length != 0) {
Compressor compressor = CompressorFactory.getCompressor(compressType).getCompressor();
payload = compressor.decompress(payload);
// 反序列化
Serializer serializer = SerializerFactory.getSerializer(serializeType).getSerializer();
RequestPayload requestPayload = serializer.deserialize(payload, RequestPayload.class);
dcyRpcRequest.setRequestPayload(requestPayload);
}
修改DcyRpcResponseEncoder
响应类编码器:
- 添加时间戳
- 添加 body是否为空 的判断逻辑
// 略...
// 时间戳
byteBuf.writeLong(dcyRpcResponse.getTimeStamp());
// 写入请求体body(requestPayload)
// 对响应做序列化器
byte[] body = null;
if (dcyRpcResponse.getBody() != null) {
Serializer serializer = SerializerFactory.getSerializer(dcyRpcResponse.getSerializeType()).getSerializer();
body = serializer.serializer(dcyRpcResponse.getBody());
// 对响应做压缩
Compressor compressor = CompressorFactory.getCompressor(dcyRpcResponse.getCompressType()).getCompressor();
body = compressor.compress(body);
}
// 略...
修改DcyRpcResponseDecoder
响应类解码器:
- 添加时间戳
- 添加 payload是否为空 的判断逻辑
// 略...
// 9.解析时间戳
long timeStamp = byteBuf.readLong();
// 略...
dcyRpcResponse.setTimeStamp(timeStamp);
// 略...
if (payload != null && payload.length > 0) {
// 解压缩和反序列化
// 解压缩
Compressor compressor = CompressorFactory.getCompressor(compressType).getCompressor();
payload = compressor.decompress(payload);
Serializer serializer = SerializerFactory.getSerializer(dcyRpcResponse.getSerializeType()).getSerializer();
Object body = serializer.deserialize(payload, Object.class);
dcyRpcResponse.setBody(body);
}
在core模块下的transport.message
包的MessageFormatConstant
常量上:
- 修改
HEADER_LENGTH
头部信息的长度
// 头部信息的长度
public final static short HEADER_LENGTH = (byte) (MAGIC.length + 1 + 2 + 4 + 1 + 1 + 1 + 8 + 8);
修改MethodCallHandler
类的channelRead0()
方法
- 添加 请求类型是否为心跳请求 的判断逻辑
// 2.根据负载内容进行方法调用
Object result = null;
if (dcyRpcRequest.getRequestType() != RequestType.HEART_BEAT.getId()) {
result = callTargetMethod(requestPayload);
log.info("请求【{}】已经在Provider端完成方法调用", dcyRpcRequest.getRequestId());
}
在DcyRpcBootstrap
类中
- 添加静态的响应时间的TreeMap(有序)
- 开启心跳检测
// 略...
public static final TreeMap<InetSocketAddress, Channel> ANSWER_TIME_CHANNEL_CACHE = new TreeMap<>();
// 略...
public DcyRpcBootstrap reference(ReferenceConfig<?> reference) {
// 启动心跳检测
log.info("开始心跳检测");
HeartbeatDetector.detectHeartbeat(reference.getInterface().getName());
// 略...
}
在core模块下创建core
包,
在该包下创建HeartbeatDetector
类:心跳检测器
- 通过线程池开启守护线程,每隔2秒发送心跳请求
/**
* 心跳检测器
*/
@Slf4j
public class HeartbeatDetector {
public static void detectHeartbeat(String serviceName) {
// 从注册中心拉取服务列表并建立连接
Registry registry = DcyRpcBootstrap.getInstance().getRegistry();
List<InetSocketAddress> addresses = registry.lookup(serviceName);
// 将连接进行缓存
for (InetSocketAddress address : addresses) {
try {
if (!DcyRpcBootstrap.CHANNEL_CACHE.containsKey(address)) {
Channel channel = NettyBootstrapInitializer.getBootstrap().connect(address).sync().channel();
DcyRpcBootstrap.CHANNEL_CACHE.put(address, channel);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 定期发送消息
Thread thread = new Thread(() -> {
new Timer().scheduleAtFixedRate(new MyTimerTask(), 0, 2000);
}, "dcyRpc-HeartbeatDetector-thread");
thread.setDaemon(true);
thread.start();
}
private static class MyTimerTask extends TimerTask {
@Override
public void run() {
// 每次启动将响应时长的map清空
DcyRpcBootstrap.ANSWER_TIME_CHANNEL_CACHE.clear();
// 遍历所有的channel
Map<InetSocketAddress, Channel> channelCache = DcyRpcBootstrap.CHANNEL_CACHE;
for (Map.Entry<InetSocketAddress, Channel> entry : channelCache.entrySet()) {
Channel channel = entry.getValue();
long start = System.currentTimeMillis();
// 构建心跳请求
DcyRpcRequest dcyRpcRequest = DcyRpcRequest.builder()
.requestId(DcyRpcBootstrap.ID_GENERATOR.getId())
.compressType(CompressorFactory.getCompressor(DcyRpcBootstrap.COMPRESS_TYPE).getCode())
.serializeType(SerializerFactory.getSerializer(DcyRpcBootstrap.SERIALIZE_TYPE).getCode())
.requestType(RequestType.HEART_BEAT.getId())
.timeStamp(start)
.build();
CompletableFuture<Object> completableFuture = new CompletableFuture<>();
DcyRpcBootstrap.PENDING_REQUEST.put(dcyRpcRequest.getRequestId(), completableFuture);
channel.writeAndFlush(dcyRpcRequest).addListener((ChannelFutureListener) promise -> {
if (!promise.isSuccess()) {
completableFuture.completeExceptionally(promise.cause());
}
});
long endTime = 0L;
try {
completableFuture.get();
endTime = System.currentTimeMillis();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
Long time = endTime - start;
// 使用TreeMap进行缓存
DcyRpcBootstrap.ANSWER_TIME_CHANNEL_CACHE.put(time, channel);
log.info("和【{}】服务器的响应时间是【{}】", entry.getKey() ,time);
}
}
}
}
f.最短响应时间的负载均衡策略
在core模块下com.dcyrpc.loadbalancer.impl
包下,创建MinResponseTimeLoadBalancer
类:最短响应时间的负载均衡策略
- 继承
AbstractLoadBalancer
/**
* 最短响应时间的负载均衡策略
*/
@Slf4j
public class MinResponseTimeLoadBalancer extends AbstractLoadBalancer {
@Override
protected Selector getSelector(List<InetSocketAddress> serviceList) {
return new MinResponseTimeSelector(serviceList);
}
/**
* 最短响应时间的负载均衡具体实现算法
*/
private static class MinResponseTimeSelector implements Selector {
public MinResponseTimeSelector(List<InetSocketAddress> serviceList) {
}
@Override
public InetSocketAddress getNext() {
Map.Entry<Long, Channel> entry = DcyRpcBootstrap.ANSWER_TIME_CHANNEL_CACHE.firstEntry();
if (entry != null) {
return (InetSocketAddress) entry.getValue().remoteAddress();
}
// 直接从缓存中获取一个可用
Channel channel = (Channel) DcyRpcBootstrap.CHANNEL_CACHE.values().toArray()[0];
return (InetSocketAddress) channel.remoteAddress();
}
@Override
public void reBalance() {
}
}
}