手写RPC框架--8.压缩报文

news2024/11/24 8:45:28

RPC框架-Gitee代码(麻烦点个Starred, 支持一下吧)
RPC框架-GitHub代码(麻烦点个Starred, 支持一下吧)

压缩报文

  • 对报文进行压缩
    • a.报文压缩
    • b.负载均衡
    • c.使用模板方法优化负载均衡
    • d.一致性hash-负载均衡算法
      • d.1) 介绍
      • d.2) 实现
    • e.实现心跳检测
    • f.最短响应时间的负载均衡策略

对报文进行压缩

a.报文压缩

在core模块下创建compress

在该包下创建Compressor接口:压缩与解压缩的接口

/**
 * 压缩与解压缩的接口
 */
public interface Compressor {
    /**
     * 压缩
     * @param bytes
     * @return
     */
    byte[] compress(byte[] bytes);

    /**
     * 解压缩
     * @param bytes
     * @return
     */
    byte[] decompress(byte[] bytes);
}

在该包下创建CompressWrapper类:压缩与解压缩的包装类

/**
 * 压缩与解压缩的包装类
 */
@NoArgsConstructor
@AllArgsConstructor
@Data
public class CompressWrapper {
    private byte code;
    private String type;
    private Compressor compressor;
}

在该包下创建CompressorFactory类:压缩工厂类

/**
 * 压缩工厂类
 */
public class CompressorFactory {

    private final static Map<String, CompressWrapper> COMPRESSOR_CACHE = new ConcurrentHashMap<>(8);
    private final static Map<Byte, CompressWrapper> COMPRESSOR_CODE = new ConcurrentHashMap<>(8);

    static {
        CompressWrapper gzip = new CompressWrapper((byte) 1, "gzip", new GzipCompressor());

        COMPRESSOR_CACHE.put("gzip", gzip);

        COMPRESSOR_CODE.put((byte) 1, gzip);
    }

    /**
     * 使用工厂方法获取一个CompressWrapper
     * @param compressorType 压缩的类型
     * @return
     */
    public static CompressWrapper getCompressor(String compressorType) {
        return COMPRESSOR_CACHE.get(compressorType);
    }

    public static CompressWrapper getCompressor(byte compressorCode) {
        return COMPRESSOR_CODE.get(compressorCode);
    }
}

在common的exceptions中创建CompressException类:压缩异常处理

public class compressException extends RuntimeException{
    public compressException() {
        super();
    }

    public compressException(Throwable cause) {
        super(cause);
    }
}

在该包下创建impl包,并创建GzipCompressor类:gzip压缩的实现。实现Compressor接口

/**
 * gzip压缩的实现
 */
@Slf4j
public class GzipCompressor implements Compressor {
    @Override
    public byte[] compress(byte[] bytes) {

        try (
                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);
        ) {
            gzipOutputStream.write(bytes);
            gzipOutputStream.finish();
            byte[] result = byteArrayOutputStream.toByteArray();

            log.info("对字节数组进行压缩, 长度【{}】压缩至【{}】", bytes.length, result.length);
            return result;
        } catch (IOException e) {
            log.error("对字节数组进行压缩时发生异常", e);
            throw new CompressException(e);
        }
    }

    @Override
    public byte[] decompress(byte[] bytes) {

        try (
                ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
                GZIPInputStream gzipInputStream = new GZIPInputStream(bais);
        ) {
            byte[] result = gzipInputStream.readAllBytes();
            log.info("对字节数组进行解压, 长度【{}】解压至【{}】", bytes.length, result.length);
            return result;
        } catch (IOException e) {
            log.error("对字节数组进行解压时发生异常", e);
            throw new CompressException(e);
        }
    }
}

DcyRpcBootstrap类中添加COMPRESS_TYPE的默认常量,也可以通过方法进行修改

// 略...

public static String COMPRESS_TYPE = "gizp";

// 略...

/**
 * 配置压缩的方式
 * @param compressType
 * @return
 */
public DcyRpcBootstrap compress(String compressType) {
    COMPRESS_TYPE = compressType;
    return this;
}

修改RpcConsumerInvocationHandler类:修改填写压缩的代码

// 略...
DcyRpcRequest dcyRpcRequest = DcyRpcRequest.builder()
        .requestId(DcyRpcBootstrap.ID_GENERATOR.getId())
        .compressType(CompressorFactory.getCompressor(DcyRpcBootstrap.COMPRESS_TYPE).getCode())
        .serializeType(SerializerFactory.getSerializer(DcyRpcBootstrap.SERIALIZE_TYPE).getCode())
        .requestType(RequestType.REQUEST.getId())
        .requestPayload(requestPayload)
        .build();
// 略...

修改DcyRpcRequestEncoder类,在请求类,对请求添加有关压缩的代码

// 略...
// 1.根据配置的序列化方式进行序列化
Serializer serializer = SerializerFactory.getSerializer(dcyRpcRequest.getSerializeType()).getSerializer();
byte[] body = serializer.serializer(dcyRpcRequest.getRequestPayload());

// 2.根据配置的压缩方式进行压缩
Compressor compressor = CompressorFactory.getCompressor(dcyRpcRequest.getCompressType()).getCompressor();
body = compressor.compress(body);

// 略...

修改DcyRpcRequestDecoder类,在响应类,对请求添加有关解压缩的代码

// 略...
// 解压缩和反序列化
// 解压缩
Compressor compressor = CompressorFactory.getCompressor(compressType).getCompressor();
payload = compressor.decompress(payload);
// 略...

修改DcyRpcResponseEncoder类,在响应类,对响应添加有关压缩的代码

// 略...
// 写入请求体body(requestPayload)
// 对响应做序列化器
Serializer serializer = SerializerFactory.getSerializer(dcyRpcResponse.getSerializeType()).getSerializer();
byte[] body = serializer.serializer(dcyRpcResponse.getBody());

// 对响应做压缩
Compressor compressor = CompressorFactory.getCompressor(dcyRpcResponse.getCompressType()).getCompressor();
body = compressor.compress(body);
// 略...

修改DcyRpcResponseDecoder类,在请求类,对响应添加有关解压缩的代码

// 略...
// 解压缩和反序列化
// 解压缩
Compressor compressor = CompressorFactory.getCompressor(compressType).getCompressor();
payload = compressor.decompress(payload);
// 略...

b.负载均衡

当一个服务节点无法支撑现有的访问量时,会部署多个节点,组成一个集群然后通过负载均衡,将请求分发给这个集群下的每个服务节点,从而达到多个服务节点共同分担请求压力的目的。

负载均衡可以根据不同的标准进行分类。常见的负载均衡分类:

  • 1.根据工作层级
    • 数据链路层负载均衡:通过分析数据链路层的信息(如MAC地址)进行负载均衡。
    • 传输层负载均衡:通过分析传输层信息(如TCP/UDP端口号)进行负载均衡。例如:L4负载均衡
    • 应用层负载均衡:通过分析应用层信息(如HTTP请求)进行负载均衡。例如:L7负载均衡
  • 2.根据分配策略
    • 轮询:按顺序将请求分配给服务器,当分配到最后一个服务器后,进行重新分配
    • 加权轮询:类似于轮询,但服务器被分配一个权重,根据权重来分配请求
    • 随机:随机选择一个服务器来处理请求
    • 加权随机:类似于随机,但服务器被分配一个权重,根据权重来随机分配请求
    • 最少连接:将请求分配给当前连接数最少的服务器
    • 最短响应时间:将请求分配给响应时间最短的服务器
    • 哈希:根据某个哈希值(如请求的源IP地址)来分配请求,相同哈希值的请求会被分配到同一个服务器
    • 自适应:根据服务器的运行时指标(如CPU利用率、内存使用量等)动态调整分配策略
  • 3.根据部署位置
    • 客户端负载均衡:在客户端实现负载均衡,客户端直接选择合适的服务器进行请求
    • 服务器端负载均衡:在服务器端实现负载均衡,通常有一个负载均衡器设备或软件来分配请求给后端服务器
    • 全局负载均衡:在全局范围内实现负载均衡,主要用于夸数据中心或地理位置分布的场景,例如:使用DNS解析不同地域的请求到不同的服务器

在core模块下,修改com.dcyrpc.discovery.impl包的ZookeeperRegistry类:修改成,拉取合适的服务列表

  • 修改lookup()方法

  • 根据服务名称,返回一个服务列表

// 略...
/**
 * 服务发现
 *  - 拉取合适的服务列表
 * @param serviceName 服务名称
 * @return 服务列表
 */
@Override
public List<InetSocketAddress> lookup(String serviceName) {
    // 1.找到对应服务的节点
    String serviceNode = Constant.BASE_PROVIDERS_PATH + "/" + serviceName;

    // 2.从zk中获取它的子节点,
    List<String> children = ZookeeperUtils.getChildren(zooKeeper, serviceNode, null);
    // 获取所有的可用的服务列表
    List<InetSocketAddress> inetSocketAddressList = children.stream().map(ipString -> {
        String[] ipAndPort = ipString.split(":");
        String ip = ipAndPort[0];
        int port = Integer.valueOf(ipAndPort[1]);
        return new InetSocketAddress(ip, port);
    }).toList();

    if (inetSocketAddressList.size() == 0){
        throw new DiscoveryException("未发现任何可用的服务主机");
    }

    return inetSocketAddressList;
}

在core模块下的com.dcyrpc创建loadbalancer包,在包创建LoadBalancer类:负载均衡器的接口

/**
 * 负载均衡接口
 * - 根据服务列表找到一个可用的服务
 */
public interface LoadBalancer {
    /**
     * 根据服务名找到一个可用的服务
     * @param serviceName 服务名称
     * @return 服务地址
     */
    InetSocketAddress selectServiceAddress(String serviceName);
}

loadbalancer包下,创建Selector类:轮询的负载均衡器的算法接口

public interface Selector {

    /**
     * 根据服务列表执行一种算法,获取一个服务节点
     * @return 服务节点
     */
    InetSocketAddress getNext();

    // todo 服务动态上下线需要进行reBalance
    void reBalance();
}

在common的exceptions中创建LoadBalancerException类:负载均衡异常处理

/**
 * 负载均衡异常处理
 */
public class LoadBalancerException extends RuntimeException{
    public LoadBalancerException() {
        super();
    }

    public LoadBalancerException(String message) {
        super(message);
    }

    public LoadBalancerException(Throwable cause) {
        super(cause);
    }
}

DcyRpcBootstrap类中添加getRegistry()方法

// 略... 
public static LoadBalancer LOAD_BALANCER;

// 略... 
public DcyRpcBootstrap registry(RegistryConfig registryConfig) {
    // 维护一个zookeeper实例,但是,如果这样写就会将zookeeper和当前的工程耦合
    // 使用 registryConfig 获取一个注册中心
    this.registry = registryConfig.getRegistry();
    DcyRpcBootstrap.LOAD_BALANCER = new RoundRobinLoadBalancer();
    return this;
}

// 略... 
public Registry getRegistry() {
    return registry;
}

loadbalancer包下,创建RoundRobinLoadBalancer类:轮询的负载均衡器策略

  • 实现LoadBalancer接口
  • 创建RoundRobinSelector内部类,实现Selector接口
/**
 * 轮询的负载均衡策略
 */
@Slf4j
public class RoundRobinLoadBalancer implements LoadBalancer {

    private Registry registry;

    // 一个服务会匹配一个selector
    private Map<String, Selector> cache = new ConcurrentHashMap<>(8);

    public RoundRobinLoadBalancer() {
        this.registry = DcyRpcBootstrap.getInstance().getRegistry();
    }

    @Override
    public InetSocketAddress selectServiceAddress(String serviceName) {

        // 优先从缓存中获取一个选择器
        Selector selector = cache.get(serviceName);

        // 如果为空,则需要为这个service创建一个selector
        if (selector == null) {
            // 这个负载均衡器,内部要维护服务列表,作为缓存
            List<InetSocketAddress> serviceList = this.registry.lookup(serviceName);

            // 提供一些算法
            selector = new RoundRobinSelector(serviceList);

            // 将selector放入缓存当中
            cache.put(serviceName, selector);
        }

        // 获取可用节点
        return selector.getNext();
    }

    private static class RoundRobinSelector implements Selector {

        private List<InetSocketAddress> serviceList;
        private AtomicInteger index;

        public RoundRobinSelector(List<InetSocketAddress> serviceList) {
            this.serviceList = serviceList;
            this.index = new AtomicInteger(0);
        }

        @Override
        public InetSocketAddress getNext() {
            if (serviceList == null || serviceList.size() == 0) {
                log.error("进行负载均衡选取节点时发生服务列表为空");
                throw new LoadBalancerException();
            }

            InetSocketAddress address = serviceList.get(index.get());

            // index如果到了最后一个位置,重置
            if (index.get() == serviceList.size() - 1) {
                index.set(0);
            } else {
                // 游标后移一位
            	index.incrementAndGet();
            }
            
            return address;
        }

        @Override
        public void reBalance() {
        }
    }
}

修改RpcConsumerInvocationHandler类:修改invoke()方法

  • 添加负载均衡器的方法
// 略...
// 1.发现服务,从注册中心,寻找一个可用的服务
//  - 传入服务的名字,返回ip+端口 (InetSocketAddress可以封装端口/ip/host name)

// 获取当前配置的负载均衡器,选取一个可用的节点
InetSocketAddress address = DcyRpcBootstrap.LOAD_BALANCER.selectServiceAddress(interfaceRef.getName());
if (log.isInfoEnabled()){
    log.info("服务调用方,发现了服务{}的可用主机{}", interfaceRef.getName(), address);
}

// 略...

在core下com.dcyrpc.discovery.impl包下,修改ZookeeperRegistry类:将固定的端口号改成通过启动类获取DcyRpcBootstrap.PORT

// 创建本机的临时节点,ip:port
// 服务提供方的端口(一般自己设定),还需要获取ip的方法
// /dcyrpc-metadata/providers/com.dcyrpc.DcyRpc/192.168.195.1:8088
// TODO:后续处理端口问题
String node = parentNode + "/" + NetUtils.getIp() + ":" + DcyRpcBootstrap.PORT;

DcyRpcBootstrap类中

  • 添加静态端口常量
  • 修改start()方法:重写新的端口
// 略...
public static int PORT = 8088;
// 略...


public void start() {
    // 略...
    // 4.绑定端口
    ChannelFuture channelFuture = serverBootstrap.bind(PORT).sync();
    // 略...
}

c.使用模板方法优化负载均衡

在core模块下的loadbalancer包下,创建AbstractLoadBalancer抽象类

public abstract class AbstractLoadBalancer implements LoadBalancer{

    // 一个服务会匹配一个selector
    private Map<String, Selector> cache = new ConcurrentHashMap<>(8);

    @Override
    public InetSocketAddress selectServiceAddress(String serviceName) {

        // 优先从缓存中获取一个选择器
        Selector selector = cache.get(serviceName);

        // 如果为空,则需要为这个service创建一个selector
        if (selector == null) {
            // 这个负载均衡器,内部要维护服务列表,作为缓存
            List<InetSocketAddress> serviceList = DcyRpcBootstrap.getInstance().getRegistry().lookup(serviceName);

            // 提供一些算法
            selector = getSelector(serviceList);

            // 将selector放入缓存当中
            cache.put(serviceName, selector);
        }

        // 获取可用节点
        return selector.getNext();
    }

    /**
     * 由子类进行拓展
     * @param serviceList 服务列表
     * @return 负载均衡算法选择器
     */
    protected abstract Selector getSelector(List<InetSocketAddress> serviceList);
}

在core模块下的loadbalancer包下,修改RoundRobinLoadBalancer

  • 继承AbstractLoadBalancer抽象类
/**
 * 轮询的负载均衡策略
 */
@Slf4j
public class RoundRobinLoadBalancer extends AbstractLoadBalancer {

    @Override
    protected Selector getSelector(List<InetSocketAddress> serviceList) {
        return new RoundRobinSelector(serviceList);
    }
    
	/**
     * 轮询的负载均衡具体实现算法
     */
    private static class RoundRobinSelector implements Selector {

        private List<InetSocketAddress> serviceList;
        private AtomicInteger index;

        public RoundRobinSelector(List<InetSocketAddress> serviceList) {
            this.serviceList = serviceList;
            this.index = new AtomicInteger(0);
        }

        @Override
        public InetSocketAddress getNext() {
            if (serviceList == null || serviceList.size() == 0) {
                log.error("进行负载均衡选取节点时发生服务列表为空");
                throw new LoadBalancerException();
            }

            InetSocketAddress address = serviceList.get(index.get());

            // index如果到了最后一个位置,重置
            if (index.get() == serviceList.size() - 1) {
                index.set(0);
            } else {
                // 游标后移一位
                index.incrementAndGet();
            }


            return address;
        }

        @Override
        public void reBalance() {

        }
    }
}

d.一致性hash-负载均衡算法

d.1) 介绍

按照传统的hash算法思路,我们需要构建一张hash表,将服务器挂载在hash表中:根据请求的要素,与服务器的数量取余

  • 有缓存
  • 长连接

在这里插入图片描述

但是,这样的方式会存在很多问题,如动态扩容的问题。比如,随着业务量增长,将原有的六个服务扩容至八个,此时,我们不仅要修改路由表,还要修改hash的路由策略。

一致性hash借鉴了hash算法的部分能力做了如下的设计:

  • 1.将hash值均匀的分布在一个区间,我们一般将区间设置为整形的取值范围(-2的31次方 ~ 2的31次方 -1)当然这个范围也可以是(0 ~ 2的32次方 -1),只要是一个合理的容易计算的足够大的范围即可。
  • 2.将这个区间构建成一个环,构建成环不一定必须要链表,其实很多的有序的数据结构都可以,比如数组,比如红黑树,只要加上一点点逻辑,就是数完最后一个回到第一个节点就可以了。
  • 3.将服务器按照自身的特点,计算hash值,并将其挂载在hash表中。

在这里插入图片描述

当请求进来以后,根据请求的部分特征,如url、请求id,请求来源等信息进行hash运算,看请求落在哪个范围,然后顺时针找到第一个服务器即可,这样最大的好处就是当有新的服务加入集群只需要将服务挂载在hash环即可,但是后自然会有流量进入该服务器,而不需要修改任何的逻辑,因为我们的hash环足够大,所以可以容纳的机器也很多。

**问题:**但是此时会出现一个问题,如果节点过少,hash分布不均匀会产生严重的流量倾斜:

在这里插入图片描述

为了解决这个问题,我们就需要引入虚拟节点的概念,我们可以将一个真实节点化身为n个(比如128)虚拟节点,每个虚拟节点都指向同一个服务,分别对虚拟节点进行hash,可以让一个服务的虚拟节点大致均匀的分布在hash环上,

在这里插入图片描述

d.2) 实现

在core模块下的loadbalancer.impl包中,创建ConsistentHashBalancer类,继承AbstractLoadBalancer

/**
 * 一致性哈希的负载均衡策略
 */
@Slf4j
public class ConsistentHashBalancer extends AbstractLoadBalancer {

    @Override
    protected Selector getSelector(List<InetSocketAddress> serviceList) {
        return new ConsistentHashSelector(serviceList, 128);
    }


    /**
     * 一致性哈希的具体实现算法
     */
    private static class ConsistentHashSelector implements Selector {

        // hash环用来存储服务器节点
        private SortedMap<Integer, InetSocketAddress> circle = new TreeMap<>();
        // 虚拟节点的个数
        private int virtualNodes;

        public ConsistentHashSelector(List<InetSocketAddress> serviceList, int virtualNodes) {
            // 将节点转换为虚拟节点,进行过载
            this.virtualNodes = virtualNodes;
            for (InetSocketAddress inetSocketAddress : serviceList) {
                // 把每一个节点加入到哈希环中
                addNodeToCircle(inetSocketAddress);
            }
        }


        @Override
        public InetSocketAddress getNext() {
            // 获取请求的要素
            DcyRpcRequest dcyRpcRequest = DcyRpcBootstrap.REQUEST_THREAD_LOCAL.get();
            // 对请求的要素做处理,

            // 根据请求的一些特征来选择服务器 id
            String requestId = Long.toString(dcyRpcRequest.getRequestId());

            // 对请求的id做hash,字符串默认的hash不太好
            int hash = hash(requestId);

            // 判断该hash值是否能直接落在一个服务器上(是否和服务器的hash一样)
            if (!circle.containsKey(hash)) {
                // 寻找最近的节点
                SortedMap<Integer, InetSocketAddress> tailMap = circle.tailMap(hash);
                hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
            }

            return circle.get(hash);
        }

        @Override
        public void reBalance() {

        }

        /**
         * 将每一个节点挂载到哈希环
         * @param inetSocketAddress 节点的地址
         */
        private void addNodeToCircle(InetSocketAddress inetSocketAddress) {
            // 为每一个节点生成匹配的虚拟节点进行挂载
            for (int i = 0; i < virtualNodes; i++) {
                int hash = hash(inetSocketAddress.toString() + "-" + i);
                // 挂载到hash环上
                circle.put(hash, inetSocketAddress);
                log.info("hash为【{}】的节点已经挂载到了哈希环上", hash);
            }
        }

        private void removeNodeFromCircle(InetSocketAddress inetSocketAddress) {
            // 为每一个节点生成匹配的虚拟节点进行挂载
            for (int i = 0; i < virtualNodes; i++) {
                int hash = hash(inetSocketAddress.toString() + "-" + i);
                // 挂载到hash环上
                circle.remove(hash);
            }
        }

        /**
         * 具体的哈希算法
         * @param s
         * @return
         */
        private int hash(String s) {
            MessageDigest md;
            try {
                md = MessageDigest.getInstance("md5");
            } catch (NoSuchAlgorithmException e) {
                throw new RuntimeException(e);
            }

            byte[] digest = md.digest(s.getBytes());
            int res = 0;
            // md5得到的结果是一个字节数组,但是我们要int 4个字节
            for (int i = 0; i < digest.length; i++) {
                res = res << 8;
                if (digest[i] < 0){
                    res = res | (digest[i] & 255);
                } else {
                    res = res | digest[i];
                }
                System.out.println(Integer.toBinaryString(digest[i]));
            }

            return res;
        }
    }
}

DcyRpcBootstrap类中

  • 添加 ThreadLocal 本地线程
  • LOAD_BALANCER 改为 一致性哈希
// 略...
public static final ThreadLocal<DcyRpcRequest> REQUEST_THREAD_LOCAL = new ThreadLocal<>();
// 略...
public DcyRpcBootstrap registry(RegistryConfig registryConfig) {
    // 维护一个zookeeper实例,但是,如果这样写就会将zookeeper和当前的工程耦合
    // 使用 registryConfig 获取一个注册中心
    this.registry = registryConfig.getRegistry();
    DcyRpcBootstrap.LOAD_BALANCER = new ConsistentHashBalancer();
    return this;
}

RpcConsumerInvocationHandler类中:

  • 将请求存入本地线程,需要在合适的时候调用remove方法
  • 调整 invoke()方法的逻辑顺序
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

    /**
     * ---------------------------封装报文---------------------------
     */
    // 1.封装报文
    RequestPayload requestPayload = RequestPayload.builder()
            .interfaceName(interfaceRef.getName())
            .methodName(method.getName())
            .parametersType(method.getParameterTypes())
            .parametersValue(args)
            .returnType(method.getReturnType())
            .build();

    // TODO 需要对各个请求的操作做处理

    DcyRpcRequest dcyRpcRequest = DcyRpcRequest.builder()
            .requestId(DcyRpcBootstrap.ID_GENERATOR.getId())
            .compressType(CompressorFactory.getCompressor(DcyRpcBootstrap.COMPRESS_TYPE).getCode())
            .serializeType(SerializerFactory.getSerializer(DcyRpcBootstrap.SERIALIZE_TYPE).getCode())
            .requestType(RequestType.REQUEST.getId())
            .requestPayload(requestPayload)
            .build();

    // 请求存入本地线程
    DcyRpcBootstrap.REQUEST_THREAD_LOCAL.set(dcyRpcRequest);

    // 2.发现服务,从注册中心拉取服务列表,并通过客户端负载均衡寻找一个可用的服务
    //  - 传入服务的名字,返回ip+端口 (InetSocketAddress可以封装端口/ip/host name)

    // 获取当前配置的负载均衡器,选取一个可用的节点
    InetSocketAddress address = DcyRpcBootstrap.LOAD_BALANCER.selectServiceAddress(interfaceRef.getName());
    if (log.isInfoEnabled()){
        log.info("服务调用方,发现了服务{}的可用主机{}", interfaceRef.getName(), address);
    }

    // 2.尝试获取一个可用的通道
    Channel channel = getAvailableChannel(address);
    if (log.isInfoEnabled()){
        log.info("获取了和【{}】建立的连接通道,准备发送数据", address);
    }

    /**
     * ---------------------------异步策略---------------------------
     */

    // 4.写出报文
    CompletableFuture<Object> completableFuture = new CompletableFuture<>();
    // 将completableFuture暴露出去
    DcyRpcBootstrap.PENDING_REQUEST.put(1L, completableFuture);

    // 直接使用writeAndFlush 写出一个请求,这个请求的实例就会进入pipeline执行出栈的一系列操作
    channel.writeAndFlush(dcyRpcRequest).addListener((ChannelFutureListener) promise -> {
        // 需要处理异常
        if (!promise.isSuccess()) {
            completableFuture.completeExceptionally(promise.cause());
        }
    });

    // 清理threadLocal
    DcyRpcBootstrap.REQUEST_THREAD_LOCAL.remove();

    // 如果没有地方处理这个completableFuture,这里会阻塞等待 complete 方法的执行
    // 在Netty的pipeline中最终的handler的处理结果 调用complete
    // 5.获得响应的结果
    return completableFuture.get(10, TimeUnit.SECONDS);
}

e.实现心跳检测

定期向所有的channel发送一个简单的请求即可,如果能得到回应说明连接是正常的。

其中我们要在心跳探测的过程中完成以下几项工作:

1、如果可以正常访问,记录响应时间,以备后用。

2、如果不能正常访问,则进行重试,重试三次依旧不能访问,则从健康服务列表中剔除,以后的访问不会使用该连接。

注意:重试的等待时间我们选取一个合适范围内的随机时间,这样可以避免局域网络问题导致的大面积同时重试,产生重试风暴

在core模块下的transport.message包的DcyRpcRequest请求类上:添加时间戳变量

// 时间戳
private long timeStamp;

在core模块下的transport.message包的DcyRpcResponse响应类上:添加时间戳变量

// 时间戳
private long timeStamp;

修改RpcConsumerInvocationHandler类:

  • 请求体增加时间戳
  • 修改请求类型的id
// 略...
DcyRpcRequest dcyRpcRequest = DcyRpcRequest.builder()
        .requestId(DcyRpcBootstrap.ID_GENERATOR.getId())
        .compressType(CompressorFactory.getCompressor(DcyRpcBootstrap.COMPRESS_TYPE).getCode())
        .serializeType(SerializerFactory.getSerializer(DcyRpcBootstrap.SERIALIZE_TYPE).getCode())
        .requestType(RequestType.REQUEST.getId())
        .timeStamp(new Date().getTime())
        .requestPayload(requestPayload)
        .build();
// 略...

修改MySimpleChannelInboundHandler类:

  • 修改请求类型的id
// 从全局的挂起的请求中,寻找与之匹配的待处理 completeFuture
CompletableFuture<Object> completableFuture = DcyRpcBootstrap.PENDING_REQUEST.get(dcyRpcResponse.getRequestId());

修改DcyRpcRequestEncoder请求类编码器:

  • 添加时间戳
  • 添加 RequestPayload是否为空 的判断逻辑
// 略...
// 8个字节的请求id
byteBuf.writeLong(dcyRpcRequest.getRequestId());
// 时间戳
byteBuf.writeLong(dcyRpcRequest.getTimeStamp());

// 写入请求体body(requestPayload)
// 1.根据配置的序列化方式进行序列化
byte[] body = null;
if (dcyRpcRequest.getRequestPayload() != null) {
    Serializer serializer = SerializerFactory.getSerializer(dcyRpcRequest.getSerializeType()).getSerializer();
    body = serializer.serializer(dcyRpcRequest.getRequestPayload());

	// 2.根据配置的压缩方式进行压缩
    Compressor compressor = CompressorFactory.getCompressor(dcyRpcRequest.getCompressType()).getCompressor();
    body = compressor.compress(body);
}

修改DcyRpcRequestDecoder响应类解码器:

  • 添加时间戳
  • 添加 payload是否为空 的判断逻辑
// 略...
// 9.时间戳
long timeStamp = byteBuf.readLong();
// 略...
dcyRpcRequest.setTimeStamp(timeStamp);
// 略...
// 解压缩
if (payload != null && payload.length != 0) {
    Compressor compressor = CompressorFactory.getCompressor(compressType).getCompressor();
    payload = compressor.decompress(payload);

    // 反序列化
    Serializer serializer = SerializerFactory.getSerializer(serializeType).getSerializer();
    RequestPayload requestPayload = serializer.deserialize(payload, RequestPayload.class);
    dcyRpcRequest.setRequestPayload(requestPayload);
}

修改DcyRpcResponseEncoder响应类编码器:

  • 添加时间戳
  • 添加 body是否为空 的判断逻辑
// 略...
// 时间戳
byteBuf.writeLong(dcyRpcResponse.getTimeStamp());

// 写入请求体body(requestPayload)
// 对响应做序列化器
byte[] body = null;
if (dcyRpcResponse.getBody() != null) {
    Serializer serializer = SerializerFactory.getSerializer(dcyRpcResponse.getSerializeType()).getSerializer();
    body = serializer.serializer(dcyRpcResponse.getBody());

    // 对响应做压缩
    Compressor compressor = CompressorFactory.getCompressor(dcyRpcResponse.getCompressType()).getCompressor();
    body = compressor.compress(body);
}
// 略...

修改DcyRpcResponseDecoder响应类解码器:

  • 添加时间戳
  • 添加 payload是否为空 的判断逻辑
// 略...
// 9.解析时间戳
long timeStamp = byteBuf.readLong();
// 略...
dcyRpcResponse.setTimeStamp(timeStamp);
// 略...
if (payload != null && payload.length > 0) {
// 解压缩和反序列化
// 解压缩
Compressor compressor = CompressorFactory.getCompressor(compressType).getCompressor();
payload = compressor.decompress(payload);

Serializer serializer = SerializerFactory.getSerializer(dcyRpcResponse.getSerializeType()).getSerializer();
Object body = serializer.deserialize(payload, Object.class);

dcyRpcResponse.setBody(body);
}

在core模块下的transport.message包的MessageFormatConstant常量上:

  • 修改HEADER_LENGTH头部信息的长度
// 头部信息的长度
public final static short HEADER_LENGTH = (byte) (MAGIC.length + 1 + 2 + 4 + 1 + 1 + 1 + 8 + 8);

修改MethodCallHandler类的channelRead0()方法

  • 添加 请求类型是否为心跳请求 的判断逻辑
// 2.根据负载内容进行方法调用
Object result = null;
if (dcyRpcRequest.getRequestType() != RequestType.HEART_BEAT.getId()) {
    result = callTargetMethod(requestPayload);
    log.info("请求【{}】已经在Provider端完成方法调用", dcyRpcRequest.getRequestId());
}

DcyRpcBootstrap类中

  • 添加静态的响应时间的TreeMap(有序)
  • 开启心跳检测
// 略...
public static final TreeMap<InetSocketAddress, Channel> ANSWER_TIME_CHANNEL_CACHE = new TreeMap<>();
// 略...
public DcyRpcBootstrap reference(ReferenceConfig<?> reference) {
    // 启动心跳检测
    log.info("开始心跳检测");
    HeartbeatDetector.detectHeartbeat(reference.getInterface().getName());
	// 略...
}

在core模块下创建core包,

在该包下创建HeartbeatDetector类:心跳检测器

  • 通过线程池开启守护线程,每隔2秒发送心跳请求
/**
 * 心跳检测器
 */
@Slf4j
public class HeartbeatDetector {
    public static void detectHeartbeat(String serviceName) {
        // 从注册中心拉取服务列表并建立连接
        Registry registry = DcyRpcBootstrap.getInstance().getRegistry();
        List<InetSocketAddress> addresses = registry.lookup(serviceName);

        // 将连接进行缓存
        for (InetSocketAddress address : addresses) {
            try {
                if (!DcyRpcBootstrap.CHANNEL_CACHE.containsKey(address)) {
                    Channel channel = NettyBootstrapInitializer.getBootstrap().connect(address).sync().channel();
                    DcyRpcBootstrap.CHANNEL_CACHE.put(address, channel);
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        // 定期发送消息
        Thread thread = new Thread(() -> {
            new Timer().scheduleAtFixedRate(new MyTimerTask(), 0, 2000);
        }, "dcyRpc-HeartbeatDetector-thread");

        thread.setDaemon(true);
        thread.start();
    }

    private static class MyTimerTask extends TimerTask {

        @Override
        public void run() {

            // 每次启动将响应时长的map清空
            DcyRpcBootstrap.ANSWER_TIME_CHANNEL_CACHE.clear();

            // 遍历所有的channel
            Map<InetSocketAddress, Channel> channelCache = DcyRpcBootstrap.CHANNEL_CACHE;
            for (Map.Entry<InetSocketAddress, Channel> entry : channelCache.entrySet()) {
                Channel channel = entry.getValue();

                long start = System.currentTimeMillis();
                // 构建心跳请求
                DcyRpcRequest dcyRpcRequest = DcyRpcRequest.builder()
                        .requestId(DcyRpcBootstrap.ID_GENERATOR.getId())
                        .compressType(CompressorFactory.getCompressor(DcyRpcBootstrap.COMPRESS_TYPE).getCode())
                        .serializeType(SerializerFactory.getSerializer(DcyRpcBootstrap.SERIALIZE_TYPE).getCode())
                        .requestType(RequestType.HEART_BEAT.getId())
                        .timeStamp(start)
                        .build();

                CompletableFuture<Object> completableFuture = new CompletableFuture<>();
                DcyRpcBootstrap.PENDING_REQUEST.put(dcyRpcRequest.getRequestId(), completableFuture);
                channel.writeAndFlush(dcyRpcRequest).addListener((ChannelFutureListener) promise -> {
                    if (!promise.isSuccess()) {
                        completableFuture.completeExceptionally(promise.cause());
                    }
                });

                long endTime = 0L;
                try {
                    completableFuture.get();
                    endTime = System.currentTimeMillis();
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }

                Long time = endTime - start;

                // 使用TreeMap进行缓存
                DcyRpcBootstrap.ANSWER_TIME_CHANNEL_CACHE.put(time, channel);

                log.info("和【{}】服务器的响应时间是【{}】", entry.getKey() ,time);
            }
        }
    }
}

f.最短响应时间的负载均衡策略

在core模块下com.dcyrpc.loadbalancer.impl包下,创建MinResponseTimeLoadBalancer类:最短响应时间的负载均衡策略

  • 继承AbstractLoadBalancer
/**
 * 最短响应时间的负载均衡策略
 */
@Slf4j
public class MinResponseTimeLoadBalancer extends AbstractLoadBalancer {
    @Override
    protected Selector getSelector(List<InetSocketAddress> serviceList) {
        return new MinResponseTimeSelector(serviceList);
    }

    /**
     * 最短响应时间的负载均衡具体实现算法
     */
    private static class MinResponseTimeSelector implements Selector {

        public MinResponseTimeSelector(List<InetSocketAddress> serviceList) {
        }

        @Override
        public InetSocketAddress getNext() {
            Map.Entry<Long, Channel> entry = DcyRpcBootstrap.ANSWER_TIME_CHANNEL_CACHE.firstEntry();
            if (entry != null) {
                return (InetSocketAddress) entry.getValue().remoteAddress();
            }

            // 直接从缓存中获取一个可用
            Channel channel = (Channel) DcyRpcBootstrap.CHANNEL_CACHE.values().toArray()[0];

            return (InetSocketAddress) channel.remoteAddress();
        }

        @Override
        public void reBalance() {
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/988707.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue 将public文件下的图片引入.vue文件内

data() {return {publicPath:process.env.BASE_URL,} }<div :style"{backgroundImage: url(${publicPath}images/tradingRegular_images/rectBg.png)}">11 </div>

从一个向量类中理解Python 中的特殊方法(__init__、__getitem__、__len__、__repr__、__str__)

文章目录 前言一、init、getitem、len、repr、str解释二、向量案例1.实现属于我们自己的向量2.导入向量模块 前言 特殊方法是一种具有特殊命名约定的方法&#xff0c;用来定义类的行为与功能。当满足特定条件时&#xff0c;这些方法会被自动调用&#xff0c;从而实现一些内置的…

Splunk Enterprise for Mac:卓越的数据分析与管理工具

在当今的数字化时代&#xff0c;数据已经成为企业成功的核心驱动力。然而&#xff0c;如何有效地管理和分析这些数据&#xff0c;却常常让企业感到困惑。Splunk Enterprise for Mac 是一款领先的数据分析和管理工具&#xff0c;可以帮助你解决这一难题。 Splunk Enterprise fo…

Nginx(动静分离、分配缓冲区、资源缓存、防盗链、资源压缩、IP黑白名单、大文件传输配置、跨域配置、高可用、性能优化)

Nginx&#xff0c;负载均衡&#xff0c;Http反向代理服务器&#xff0c;支持大部分协议&#xff0c;如TCP、UDP、SMTP、HTTPS 环境搭建 Nginx反向代理-负载均衡 首先通过SpringBootFreemarker快速搭建一个WEB项目&#xff1a;springboot-web-nginx&#xff0c;然后在该项目中&…

适用于Linux的Windows子系统(在VScode中开发Linux项目)

目录 前言 一、VScode扩展安装 二、挂载项目 1.连接 2.挂载&#xff08;挂载之后项目终端就是Linux了&#xff09; 3.愉快的搬砖开始了 4.前端如何通过内网 IP 本地访问到 Ubuntu 上&#xff1f; 总结 前言 系列分为三章&#xff08;从安装到项目使用&#xff09;&…

云端AI:释放企业创新力,打造智慧企业

文章目录 1. 云端AI的基本概念1.1 云计算1.2 人工智能1.3 云端AI 2. 云端AI的重要性2.1 成本效益2.2 弹性扩展2.3 无缝整合2.4 实时更新 3. 云端AI的应用领域3.1 智能客服3.2 预测分析3.3 自动化生产 4. 云端AI的未来趋势4.1 边缘计算与云端AI的融合4.2 可解释性AI4.3 隐私和安…

ORACLE的分区(一)

目录 一、分区概念 二、表分区的优点 三、分区策略 一、分区概念 随着时间的发展&#xff0c;一个表的数据会越来越多&#xff0c;当数据量增大的时候我们一般采取建立索引优化索引的方式提高查询速度&#xff0c;但是数据量再次增大即使是索引也无法提高速度&#xff0c;这时…

从金蝶云星空到聚水潭通过接口配置打通数据

从金蝶云星空到聚水潭通过接口配置打通数据 源系统:金蝶云星空 金蝶K/3Cloud&#xff08;金蝶云星空&#xff09;是移动互联网时代的新型ERP&#xff0c;是基于WEB2.0与云技术的新时代企业管理服务平台。金蝶K/3Cloud围绕着“生态、人人、体验”&#xff0c;旨在帮助企业打造面…

代码随想录Day_59打卡

①、下一个更大元素Ⅱ 给定一个循环数组 nums &#xff08; nums[nums.length - 1] 的下一个元素是 nums[0] &#xff09;&#xff0c;返回 nums 中每个元素的 下一个更大元素 。 数字 x 的 下一个更大的元素 是按数组遍历顺序&#xff0c;这个数字之后的第一个比它更大的数&am…

REST风格【SpringBoot】

1.REST简介 行为动作 通常模块名使用复数&#xff0c;也就是加s 2.RESTful入门 Controller public class UserController {RequestMapping(value "/users", method RequestMethod.POST)public String save() {System.out.println("user save");return &…

pcie 总结

用户空间pci 常用命令 lspci 查看所有pci 设备 lspci -t 树形查看所有设备 lspci -s 00:1f.6 -vvv 查看某个设备所有信息 lspci -s 00:1f.6 -vvv -xxx 增加16进制看看 sudo cat /proc/iomen | grep PCI 查看所有地址映射 如何确定pcie io空间 内存空间大小 (1)读取出基地址…

视频监控/安防监控/AI视频分析/边缘计算/TSINGSEE青犀AI算法智慧仓储解决方案

随着全球经济与科学技术的双重推动&#xff0c;我国的仓储管理已经进入了高速发展时期&#xff0c;物流仓储也由简单的储藏仓库向智能化仓储转变。TSINGSEE青犀AI智慧仓储解决方案是利用先进的信息技术和物联网技术来提高仓储管理效率、降低成本的一种仓储管理模式。 方案功能 …

如何使用PyTorch训练LLM

推荐&#xff1a;使用 NSDT场景编辑器 快速搭建3D应用场景 像LangChain这样的库促进了上述端到端AI应用程序的实现。我们的教程介绍 LangChain for Data Engineering & Data Applications 概述了您可以使用 Langchain 做什么&#xff0c;包括 LangChain 解决的问题&#xf…

Facebook营销攻略:教你集中管理Facebook Business专页及广告

Facebook 在社交媒体间是无人不识的「霸主」&#xff0c;占据着主导地位&#xff0c;2020年 Facebook 创造了 859亿美元的收入&#xff0c;当中有大约600亿美元来自 Facebook 的应用程序&#xff0c;而 Facebook App 已被下载超过50亿次。作为全球最大型的社交媒体公司之一&…

2023年7月京东饮料行业数据分析(京东运营数据分析)

饮料消费已成为当下快消品行业里的主力军&#xff0c;随着社会群体喜好的改变、消费群体的不断扩大&#xff0c;可选择的饮料种类越来越多&#xff0c;我国饮料市场的体量也较为庞大。根据鲸参谋电商数据分析平台的数据显示&#xff0c;今年7月份&#xff0c;京东平台饮料的销量…

如何获得一个Oracle 23c免费开发者版

获取23c开发者版 简单介绍可参考这里。 获取数据库可以参考这篇文章Introducing Oracle Database 23c Free – Developer Release或这里。 Docker Image 这是最快的方法。在OCI上创建一个计算实例&#xff0c;然后就可以拉取image使用了。 docker的安装和配置不赘述了。 …

《DevOps实践指南》- 读书笔记(一)

DevOps实践指南 Part 1 DevOps 介绍精益运动敏捷宣言 1. 敏捷、持续交付和三步法1.1 制造业价值流1.2 技术价值流1.2.1 聚焦于部署前置时间1.2.2 关注返工指标——%C/A 1.3 三步工作法&#xff1a;DevOps 的基础原则 2. 第一步&#xff1a;流动原则2.1 使工作可见2.2 限制制品数…

Jenkins教程—构建多分支流水线项目

本教程向你展示如何使用Jenkins协调一个用 Node Package Manager (npm) 管理的简单 Node.js 和 React 项目&#xff0c; 并同时 为开发和产品环境交付不同的结果。 在开始本教程之前&#xff0c;建议你前往 教程概览 页面&#xff0c;并至少完成一个 介绍教程&#xff0c; 从而…

分布式秒杀方案--java

前提&#xff1a;先把商品详情和秒杀商品缓存redis中&#xff0c;减少对数据库的访问&#xff08;可使用定时任务&#xff09; 秒杀商品无非就是那几步&#xff08;前面还可能会有一些判断&#xff0c;如用户是否登录&#xff0c;一人一单&#xff0c;秒杀时间验证等&#xff0…

springboot web 增加不存在的url返回200状态码 vue 打包设置

spring boot项目增加 html web页面访问 1. 首先 application.properties 文件中增加配置&#xff0c;指定静态资源目录&#xff08;包括html的存放&#xff09; spring.resources.static-locationsclasspath:/webapp/,classpath:/webapp/static/ 2. 项目目录 3. 如果有实现 …