手写RPC框架--12.异常重试

news2025/1/11 5:41:34

异常重试

  • 异常重试
    • a.异常重试
    • b.RPC的自我保护
      • 1) 介绍
      • 2) 实现令牌桶限流器
    • c.熔断器
    • d.实现服务端的限流
    • e.实现客户端的熔断
    • f.流量隔离
      • 1) 介绍
      • 2) 实现

异常重试

a.异常重试

1.为什么需要异常重试?
当发起一次 yrpc 调用,去调用远程的一个服务,比如用户的登录操作,我们会先对用户的用户名以及密码进行验证,验证成功之后会获取用户的基本信息。当我们通过远程的用户服务来获取用户基本信息的时候,恰好网络出现了问题,比如网络突然抖了一下,导致我们的请求失败了,而这个请求我们希望它能够尽可能地执行成功,那这时我们要怎么做呢?
我们需要重新发起一次 yrpc 调用,那我们在代码中该如何处理呢?是在代码逻辑里 catch 一下,失败了就再发起一次调用吗?这样做显然不够优雅吧。这时我们就可以考虑使用 yrpc 框架的重试机制。

2.yrpc 框架的重试机制
当调用端发起的请求失败时,yrpc 框架自身可以进行重试,再重新发送请求,用户可以自
行设置是否开启重试以及重试的次数。

在common模块下的annotation包中,创建TryTimes自定义注解

  • @Target(ElementType.METHOD) 在方法上使用
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TryTimes {
    int tryTimes() default 3;
    int intervalTime() default 2000;
}

在demo模块下的api模块,pom文件中引入common的模块

<dependency>
    <groupId>com.dcy</groupId>
    <artifactId>dcyrpc-common</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>

在api模块下的DcyRpc接口,添加自定义的重试次数注解

  • 括号内的值:不写就按默认值设置
@TryTimes(tryTimes = 3, intervalTime = 3000)
String sayHi(String msg);

修改在core模块下的proxy.handler包下的RpcConsumerInvocationHandler类,invoke()方法:什么情况下要重试? 1.异常 2.响应有问题 code=500

  • 用 while循环 和 try异常捕获,包住整个业务逻辑
  • 通过method获取注解的值
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    int tryTimes = 0;
    int intervalTime = 0;

    // 从接口中获取 判断是否需要重试
    TryTimes annotation = method.getAnnotation(TryTimes.class);
    if (annotation != null) {
        tryTimes = annotation.tryTimes();
        intervalTime = annotation.intervalTime();
    }

    while (true) {
        try {
            // 略... 原来的业务代码
        } catch (Exception e) {
            // 次数减一,并且等待
            tryTimes--;
            try {
                Thread.sleep(intervalTime);
            } catch (InterruptedException ex) {
                log.error("在进行重试时发生异常", ex);
            }
            if (tryTimes < 0) {
                log.error("对方法【{}】进行远程调用时,重试【{}】次,依然不可用", method.getName(), tryTimes, e);
                break;
            }
            log.error("在进行第【{}】次重试时发生异常", 3-tryTimes, e);
        }
    }
    throw new RuntimeException("执行远程【{"+method.getName()+"}】方法调用失败");
}

b.RPC的自我保护

1) 介绍

当程序流量非常大,大到机器也崩溃。所以,当流量高,并发大,应该如果保护程序

服务提供方	有大量的流量突然来袭,顶不住					 限流
服务调用方	发送了大量的请求但是都不行,没必要继续发送		熔断 降级

在这里插入图片描述

限流逻辑又该如何实现呢?

比如最简单的计数器,还有可以做到平滑限流的滑动窗口、漏斗算法以及令牌桶算法等等。其中令牌桶算法最为常用

我们可以假设这样一个场景:我发布了一个服务,提供给多个应用的调用方去调用,这时有一个应用的调用方发送过来的请求流量要比其它的应用大很多,这时我们就应该对这个应用下的调用端发送过来的请求流量进行限流。所以说我们在做限流的时候要考虑应用级别的维度,甚至是 IP 级别的维度,这样做不仅可以让我们对一个应用下的调用端发送过来的请求流量做限流,还可以对一个 IP 发送过来的请求流量做限流。

使用方该如何配置应用维度以及 IP 维度的限流呢?在代码中配置是不是不大方便?yrpc 框架真正强大的地方在于它的治理功能,而治理功能大多都需要依赖一个注册中心或者配置中心,我们可以通过 yrpc 治理的管理端进行配置,再通过注册中心或者配置中心将限流阈值的配置下发到服务提供方的每个节点上,实现动态配置

甚至可以将限流逻辑放在调用端,调用端在发出请求时先触发限流逻辑,调用限流服务,如果请求量已经到达了限流阈值,请求都不需要发出去,直接返回给动态代理一个限流异常即可。

2) 实现令牌桶限流器

限流器作用于服务端,用于保护服务端的服务节点

在core模块下的com.dcyrpc下创建protection包,创建RateLimiter接口

public interface RateLimiter {
    /**
     * 是否允许新的请求进入
     * @return
     */
    boolean allowRequest();
}

在core模块下的com.dcyrpc下的protection包,创建TokenBuketRateLimiter类:基于令牌桶算法的限流器

  • 变量:令牌的数量,桶的容量,给令牌桶加令牌的速率,上一次加令牌的时间

  • 提供 释放请求 方法: synchronized 有多个线程同时请求

    • 1.给令牌桶添加令牌
    • 2.自己获取令牌
/**
 * 基于令牌桶算法的限流器
 */
public class TokenBuketRateLimiter implements RateLimiter {

    // 代表令牌的数量,>0 有令牌能放行, 放行就-1   ==0无令牌阻拦
    private int tokens;

    // 令牌数(桶的容量)
    private final int capacity;

    // 令牌桶的令牌(每秒放几个): 按照一定速率给令牌桶加令牌, 如每秒500个(不能超过总数)
    //  - 可以用定时任务加令牌
    //  - 每一个请求要发送的时候就加一个令牌
    private final int rate;

    // 上一次加令牌的时间
    private Long lastAddTokenTime;

    public TokenBuketRateLimiter(int capacity, int rate) {
        this.capacity = capacity;
        this.rate = rate;
        this.lastAddTokenTime = System.currentTimeMillis();
        this.tokens = capacity;
    }

    /**
     * 判断请求是否可以放行
     * @return
     */
    public synchronized boolean allowRequest() {
        // 1.给令牌桶添加令牌
        long currentTime = System.currentTimeMillis();
        long timeInterval = currentTime - lastAddTokenTime; // 时间间隔
        
        // 如果时间间隔超过1秒,放令牌
        if (timeInterval >= 1000) {
            int needAddTokens = (int) (timeInterval * rate / 1000); // 需要添加的tokens
            // 加令牌
            tokens = Math.min(capacity, tokens + needAddTokens);
            // 标记最后加入令牌的时间
            this.lastAddTokenTime = System.currentTimeMillis();
        }

        // 2.自己获取令牌
        if (tokens > 0) {
            tokens--;
            System.out.println("请求被放行---------------------");
            return true;
        } else {
            System.out.println("请求被拦截---------------------");
            return false;
        }
    }
}

c.熔断器

熔断器作用于客户端

理论上:标准断路器应该有3种状态:

  • open:当开时,流量无法访问
  • close:当关闭时,流量可以正常访问
  • half-open:发送一个请求,如果这个请求得到了回应就改成close,没有回应就改成open

在core模块下的com.dcyrpc下的protection包,创建CircuitBreaker类:熔断器

  • 变量: 是否开启(Boolean), 总的请求数, 异常的请求数, 最大异常数, 最大异常比例
  • 核心方法:通过数据指标,判断是否开启熔断器
  • 其他方法:重置熔断器,记录正常请求数,记录异常请求数
/**
 * 熔断器
 */
public class CircuitBreaker {

    private volatile boolean isOpen = false;

    // 总的请求数
    private AtomicInteger requestCount = new AtomicInteger(0);

    // 异常的请求数
    private AtomicInteger errorRequestCount = new AtomicInteger(0);

    // 最大异常数
    private int maxErrorRequestCount;

    // 最大异常比例
    private float maxErrorRate;

    public CircuitBreaker(int maxErrorRequestCount, float maxErrorRate) {
        this.maxErrorRequestCount = maxErrorRequestCount;
        this.maxErrorRate = maxErrorRate;
    }

    /**
     * 通过数据指标,判断是否开启熔断器
     * @return
     */
    public boolean isBreak() {
        // 断路器已经打开
        if (isOpen) {
            return true;
        }

        // 判断数据指标,是否满足当前的阈值
        if (errorRequestCount.get() > maxErrorRequestCount) {
            this.isOpen = true;
            return true;
        }

        // 判断异常率
        if (errorRequestCount.get() > 0 && requestCount.get() > 0 && (float) errorRequestCount.get() / requestCount.get() > maxErrorRate * 10) {
            this.isOpen = true;
            return true;
        }

        return false;
    }

    /**
     * 重置熔断器
     */
    public void reset() {
        this.isOpen = false;
        this.requestCount.set(0);
        this.errorRequestCount.set(0);
    }

    /**
     * 记录正常请求数
     */
    public void recordRequest() {
        this.requestCount.getAndIncrement();
    }

    /**
     * 记录异常请求数
     */
    public void recordErrorRequest() {
        this.errorRequestCount.getAndIncrement();
    }
}

d.实现服务端的限流

实现IP应用级别总的限流器

修改core模块下的config包下的Configuration类:配置限流器

// 略...
// 为每一个IP配置的限流器
private final Map<SocketAddress, RateLimiter> EACH_IP_RATE_LIMITER = new ConcurrentHashMap<>(16);

修改core模块下的enumeration包下的ResponseCode类:修改响应码

SUCCESS((byte) 20, "成功"),
SUCCESS_HEART_BEAT((byte) 21, "心跳检测成功返回"),
RATE_LIMIT((byte) 31, "服务被限流"),
RESOURCE_NOT_FOUND((byte) 44, "请求的资源不存在"),
FAIL((byte) 50, "调用方法发生异常");
CLOSING((byte) 51, "程序正在关闭中");
// 略...

修改core模块下的channelHandler.handler包下的MethodCallHandler类的channelRead0()方法:修改业务逻辑, 把限流器放入

  • 先封装部分响应

  • 通过channel获取IP地址,针对IP地址的限流,应该每一个IP匹配一个限流器

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, DcyRpcRequest dcyRpcRequest) throws Exception {
    // 1.先封装部分响应
    DcyRpcResponse dcyRpcResponse = DcyRpcResponse.builder()
            .requestId(dcyRpcRequest.getRequestId())
            .compressType(dcyRpcRequest.getCompressType())
            .serializeType(dcyRpcRequest.getSerializeType())
            .build();

    // 2.限流操作
    // 2.1.获取连接的地址
    Channel channel = channelHandlerContext.channel();
    SocketAddress socketAddress = channel.remoteAddress();
    Map<SocketAddress, RateLimiter> eachIpRateLimiter = DcyRpcBootstrap.getInstance().getConfiguration().getEachIpRateLimiter();
    // 2.2.查看缓存中是否有限流器
    RateLimiter rateLimiter = eachIpRateLimiter.get(socketAddress);
    if (rateLimiter == null) {
        rateLimiter = new TokenBuketRateLimiter(500, 300);
        eachIpRateLimiter.put(socketAddress, rateLimiter);
    }

    boolean allowRequest = rateLimiter.allowRequest();
    // 3.处理限流
    if (!allowRequest) {
        // 需要封装响应并且返回
        dcyRpcResponse.setCode(ResponseCode.RATE_LIMIT.getCode());
    } else if (dcyRpcRequest.getRequestType() == RequestType.HEART_BEAT.getId()) {
        // 处理心跳
        dcyRpcResponse.setCode(ResponseCode.SUCCESS_HEART_BEAT.getCode());
        // 封装响应并返回
    } else {
        // 正常调用
        /**--------------------------具体调用过程--------------------------------------------*/
        // 1.获取负载内容payload
        RequestPayload requestPayload = dcyRpcRequest.getRequestPayload();

        // 2.根据负载内容进行方法调用
        try {
            Object result = callTargetMethod(requestPayload);
            log.info("请求【{}】已经在Provider端完成方法调用", dcyRpcRequest.getRequestId());
            // 3.封装响应
            dcyRpcResponse.setCode(ResponseCode.SUCCESS.getCode());
            dcyRpcResponse.setBody(result);
        } catch (Exception e) {
            dcyRpcResponse.setCode(ResponseCode.FAIL.getCode());
            log.error("请求id为【{}】时发生异常", dcyRpcRequest.getRequestId(), e);
        }

        // 4.写出响应
        channelHandlerContext.channel().writeAndFlush(dcyRpcResponse);
    }
}

e.实现客户端的熔断

修改core模块下的config包下的Configuration类:配置熔断器

// 略...
// 为每一个IP配置的断路器
private final Map<SocketAddress, CircuitBreaker> eachIpCircuitBreaker = new ConcurrentHashMap<>(16);

修改core模块下的proxy.handler包下的RpcConsumerInvocationHandler类的invoke()方法:修改业务逻辑

  • 1.封装报文,创建请求
  • 2.将请求存入本地线程
  • 3.发现服务
  • 4.获取当前地址的断路器
    • 如果断路器是打开的,则不发送请求,抛出异常
  • 5.尝试获取一个可用的通道
  • 6.写出报文
  • 7.清理threadLocal
  • 8.获得响应的结果
  • 9.发生任何异常,需要记录请求失败的次数
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    int tryTimes = 0;
    int intervalTime = 0;

    // 从接口中获取 判断是否需要重试
    TryTimes annotation = method.getAnnotation(TryTimes.class);
    if (annotation != null) {
        tryTimes = annotation.tryTimes();
        intervalTime = annotation.intervalTime();
    }


    while (true) {
        // 什么情况下要重试? 1.异常 2.响应有问题 code=500

        /**
         * ---------------------------封装报文---------------------------
         */
        // 1.封装报文
        RequestPayload requestPayload = RequestPayload.builder()
                .interfaceName(interfaceRef.getName())
                .methodName(method.getName())
                .parametersType(method.getParameterTypes())
                .parametersValue(args)
                .returnType(method.getReturnType())
                .build();

        DcyRpcRequest dcyRpcRequest = DcyRpcRequest.builder()
                .requestId(DcyRpcBootstrap.getInstance().getConfiguration().getIdGenerator().getId())
                .compressType(CompressorFactory.getCompressor(DcyRpcBootstrap.getInstance().getConfiguration().getCompressType()).getCode())
                .serializeType(SerializerFactory.getSerializer(DcyRpcBootstrap.getInstance().getConfiguration().getSerializeType()).getCode())
                .requestType(RequestType.REQUEST.getId())
                .timeStamp(new Date().getTime())
                .requestPayload(requestPayload)
                .build();

        // 2.请求存入本地线程
        DcyRpcBootstrap.REQUEST_THREAD_LOCAL.set(dcyRpcRequest);

        // 3.发现服务,从注册中心拉取服务列表,并通过客户端负载均衡寻找一个可用的服务
        //  - 传入服务的名字,返回ip+端口 (InetSocketAddress可以封装端口/ip/host name)

        // 获取当前配置的负载均衡器,选取一个可用的节点
        InetSocketAddress address = DcyRpcBootstrap.getInstance().getConfiguration().getLoadBalancer().selectServiceAddress(interfaceRef.getName());
        if (log.isInfoEnabled()) {
            log.info("服务调用方,发现了服务{}的可用主机{}", interfaceRef.getName(), address);
        }

        // 4.获取当前地址的断路器
        Map<SocketAddress, CircuitBreaker> eachIpCircuitBreaker = DcyRpcBootstrap.getInstance().getConfiguration().getEachIpCircuitBreaker();
        CircuitBreaker circuitBreaker = eachIpCircuitBreaker.get(address);
        if (circuitBreaker == null) {
            circuitBreaker = new CircuitBreaker(10, 0.5F);
            eachIpCircuitBreaker.put(address, circuitBreaker);
        }
        try {
            // 判断熔断器是否开启: 如果已打开
            if (dcyRpcRequest.getRequestType() != RequestType.HEART_BEAT.getId() && circuitBreaker.isBreak()) {
                // 定期打开
                Timer timer = new Timer();
                timer.schedule(new TimerTask() {
                    @Override
                    public void run() {
                        DcyRpcBootstrap.getInstance().getConfiguration().getEachIpCircuitBreaker().get(address).reset();
                    }
                }, 5000);

                throw new RuntimeException("当前断路器已经开启,无法发送请求");
            }

            // 5.尝试获取一个可用的通道
            Channel channel = getAvailableChannel(address);
            if (log.isInfoEnabled()) {
                log.info("获取了和【{}】建立的连接通道,准备发送数据", address);
            }


            /**
             * ---------------------------同步策略---------------------------
             */
//                ChannelFuture channelFuture = channel.writeAndFlush(new Object()).await();
//                // get()阻塞获取结果
//                // getNow()获取当前的结果,如果未处理完成,返回null
//                if (channelFuture.isDone()) {
//                    Object object = channelFuture.getNow();
//                } else if (!channelFuture.isSuccess()) {
//                    // 发生问题,需要捕获异常。
//                    // 子线程可以捕获异步任务的异常
//                    Throwable cause = channelFuture.cause();
//                    throw new RuntimeException(cause);
//                }

            /**
             * ---------------------------异步策略---------------------------
             */

            // 6.写出报文
            CompletableFuture<Object> completableFuture = new CompletableFuture<>();
            // 将completableFuture暴露出去
            DcyRpcBootstrap.PENDING_REQUEST.put(dcyRpcRequest.getRequestId(), completableFuture);

            // 直接使用writeAndFlush 写出一个请求,这个请求的实例就会进入pipeline执行出栈的一系列操作
            channel.writeAndFlush(dcyRpcRequest).addListener((ChannelFutureListener) promise -> {
                // 需要处理异常
                if (!promise.isSuccess()) {
                    completableFuture.completeExceptionally(promise.cause());
                }
            });

            // 7.清理threadLocal
            DcyRpcBootstrap.REQUEST_THREAD_LOCAL.remove();

            // 如果没有地方处理这个completableFuture,这里会阻塞等待 complete 方法的执行
            // 在Netty的pipeline中最终的handler的处理结果 调用complete
            // 8.获得响应的结果
            Object result = completableFuture.get(10, TimeUnit.SECONDS);
            // 记录成功的请求
            circuitBreaker.recordRequest();
            return result;
        } catch (Exception e) {
            // 次数减一,并且等待
            tryTimes--;
            //记录错误的次数
            circuitBreaker.recordErrorRequest();
            try {
                Thread.sleep(intervalTime);
            } catch (InterruptedException ex) {
                log.error("在进行重试时发生异常", ex);
            }
            if (tryTimes < 0) {
                log.error("对方法【{}】进行远程调用时,重试【{}】次,依然不可用", method.getName(), tryTimes, e);
                break;
            }
            log.error("在进行第【{}】次重试时发生异常", 3 - tryTimes, e);
        }
    }
    throw new RuntimeException("执行远程【{" + method.getName() + "}】方法调用失败");
}

在common模块下的exception包下,创建ResponseException:响应异常类

public class ResponseException extends RuntimeException{
    private byte code;
    private String msg;
    
    public ResponseException(byte code, String msg) {
        super(msg);
        this.msg = msg;
        this.code = code;
    }
}

修改core模块下的proxy.handler包下的MySimpleChannelInboundHandler类的channelRead0()方法

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, DcyRpcResponse dcyRpcResponse) throws Exception {

    CompletableFuture<Object> completableFuture = DcyRpcBootstrap.PENDING_REQUEST.get(dcyRpcResponse.getRequestId());

    SocketAddress socketAddress = channelHandlerContext.channel().remoteAddress();
    Map<SocketAddress, CircuitBreaker> eachIpCircuitBreaker = DcyRpcBootstrap.getInstance().getConfiguration().getEachIpCircuitBreaker();
    CircuitBreaker circuitBreaker = eachIpCircuitBreaker.get(socketAddress);

    byte code = dcyRpcResponse.getCode();
    if (code == ResponseCode.FAIL.getCode()) {
        circuitBreaker.recordErrorRequest();
        completableFuture.complete(null);
        log.error("当前请求id【{}】,返回错误的结果,响应码【{}】", dcyRpcResponse.getRequestId(), code);
        throw new ResponseException(code, ResponseCode.FAIL.getDesc());
    } else if (code == ResponseCode.RATE_LIMIT.getCode()) {
        circuitBreaker.recordErrorRequest();
        completableFuture.complete(null);
        log.error("当前请求id【{}】,被限流,响应码【{}】", dcyRpcResponse.getRequestId(), code);
        throw new ResponseException(code, ResponseCode.RATE_LIMIT.getDesc());
    } else if (code == ResponseCode.RESOURCE_NOT_FOUND.getCode()) {
        circuitBreaker.recordErrorRequest();
        completableFuture.complete(null);
        log.error("当前请求id【{}】,未找到目标资源,响应码【{}】", dcyRpcResponse.getRequestId(), code);
        throw new ResponseException(code, ResponseCode.RESOURCE_NOT_FOUND.getDesc());
    } else if (code == ResponseCode.SUCCESS.getCode()) {
        // 异步
        // 服务提供方,给予的结果
        Object returnValue = dcyRpcResponse.getBody();
        returnValue = returnValue == null ? new Object() : returnValue;
        // 从全局的挂起的请求中,寻找与之匹配的待处理 completeFuture
        completableFuture.complete(returnValue);
        log.info("已寻找【{}】completableFuture,处理响应结果", dcyRpcResponse.getRequestId());
    } else if (code == ResponseCode.SUCCESS_HEART_BEAT.getCode()) {
        completableFuture.complete(null);
        log.info("当前请求id【{}】,处理心跳检测,响应码【{}】", dcyRpcResponse.getRequestId(), code);
    }
}

f.流量隔离

1) 介绍

后期业务发展了,调用你接口的调用方就会越来越多,流量也会渐渐多起来。可能某一天,一个“爆炸式惊喜”就来了。其中一个调用方的流量突然激增,让你整个集群瞬间处于高负载运行,进而影响到其它调用方,导致整体的业务可用性降低。最好的办法就是隔离流量,将多个dcyrpc服务进行分组,一个调用方只能访问一个分组的服务,就是一个调用方流量爆炸也只会影响一个分组的服务,整体还是可用的。

怎么实现分组?

分组的逻辑的就是让调用方可以发现一个分组的服务,那实现的逻辑就一定是服务发现的时候只能拉取同一个分组的服务,我们需要在服务发现中做一些改造。

原本服务调用方是通过接口名去注册中心找到所有的服务节点来完成服务发现的,那换到这里的话,这样做其实并不合适,因为这样调用方会拿到所有的服务节点。因此为了实现分组隔离逻辑,我们需要重新改造下服务发现的逻辑,调用方去获取服务节点的时候除了要带着接口名,还需要另外加一个分组参数,相应的服务提供方在注册的时候也要带上分组参数。

非核心应用不要跟核心应用分在同一个组,核心应用之间应该做好隔离,一个重要的原则就是保障核心应用不受影响。比如提供给电商下单过程中用的商品信息接口,我们肯定是需要独立出一个单独分组,避免受其它调用方污染的。

在这里插入图片描述

在这里插入图片描述

2) 实现

修改common模块下的DcyRpcApi枚举

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface DcyRpcApi {
    // 分组名称
    String group() default "default";
}

修改provider模块下的impl.DcyRpcImpl启动类:在注解上添加分组

@DcyRpcApi(group = "HK")
public class DcyRpcImpl implements DcyRpc {
    // 略...
}

修改core模块下的DcyRpcBootstrap启动类的scan()方法:

  • 通过注解获取分组信息
  • 将分组信息存入ServiceConfig中
public DcyRpcBootstrap scan(String packageName) {
    // 略...
    for (Class<?> clazz : classes) {
        // 略...
        
        // 获取分组信息
        DcyRpcApi dcyRpcApi = clazz.getAnnotation(DcyRpcApi.class);
        String group = dcyRpcApi.group();
        
        for (Class<?> anInterface : interfaces) {
            // 略...
            serviceConfig.setGroup(group);
            // 略...
        }
    }
    return this;
}

修改core模块下的ServiceConfig类的:

  • 添加group属性,group的setter/getter方法
private String group = "default";
public void setGroup(String group) {
    this.group = group;
}

public String getGroup() {
    return group;
}

修改core模块下的discovery.impl包下的ZookeeperRegistry类:register方法

  • 建立分组节点
@Override
public void register(ServiceConfig<?> service) {
    // 服务名称的节点
    String parentNode = Constant.BASE_PROVIDERS_PATH + "/" + service.getInterface().getName();

    // 判断节点是否存在,不存在则创建节点(持久)
    if (!ZookeeperUtils.existNode(zooKeeper, parentNode, null)) {
        ZookeeperNode zookeeperNode = new ZookeeperNode(parentNode, null);
        ZookeeperUtils.createNode(zooKeeper, zookeeperNode, null, CreateMode.PERSISTENT);
    }

    // 建立分组节点
    parentNode = parentNode + "/" + service.getGroup();
    if (!ZookeeperUtils.existNode(zooKeeper, parentNode, null)) {
        ZookeeperNode zookeeperNode = new ZookeeperNode(parentNode, null);
        ZookeeperUtils.createNode(zooKeeper, zookeeperNode, null, CreateMode.PERSISTENT);
    }

    // 略...
}

修改consumer模块下的启动类:

// 略...
DcyRpcBootstrap.getInstance()
        .application("first-dcyrpc-consumer")
        .registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
        .serialize("jdk")
        .compress("gzip")
        .group("HK")
        .reference(reference);
// 略...

修改core模块下的config包的Configuration类:

// 分组信息
private String group = "default";

修改core模块下的DcyRpcBootstrap启动类:

  • 在reference方法中设置分组信息

  • 添加group方法

public DcyRpcBootstrap reference(ReferenceConfig<?> reference) {
    // 启动心跳检测
    log.info("开始心跳检测");
    HeartbeatDetector.detectHeartbeat(reference.getInterface().getName());

    // 配置reference,将来调用get方法时,方便生成代理对象
    // 1.reference需要一个注册中心
    reference.setRegistry(configuration.getRegistryConfig().getRegistry());
    reference.setGroup(this.getConfiguration().getGroup());
    return this;
}

public DcyRpcBootstrap group(String group) {
    this.configuration.setGroup(group);
    return this;
}

修改core模块下的ReferenceConfig类的:

  • 添加group属性,group的setter/getter方法
  • get()方法,把group传值给RpcConsumerInvocationHandler
public T get() {
    // 使用动态代理完成工作
    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    Class<T>[] classes = new Class[]{interfaceRef};
    InvocationHandler handler = new RpcConsumerInvocationHandler(interfaceRef, registry, group);

    // 使用动态代理生成代理对象
    Object helloProxy = Proxy.newProxyInstance(classLoader, classes, handler);

    return (T) helloProxy;
}

private String group;
public void setGroup(String group) {
    this.group = group;
}

public String getGroup() {
    return group;
}

修改core模块下的proxy.handler包下的RpcConsumerInvocationHandler

  • 设置group变量
  • invoke()方法中,发现服务传入一个group
private String group;

public RpcConsumerInvocationHandler(Class<?> interfaceRef, Registry registry, String group) {
    this.interfaceRef = interfaceRef;
    this.registry = registry;
    this.group = group;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    // 略....
    // 3.发现服务,从注册中心拉取服务列表,并通过客户端负载均衡寻找一个可用的服务
    //  - 传入服务的名字,返回ip+端口 (InetSocketAddress可以封装端口/ip/host name)
    // 获取当前配置的负载均衡器,选取一个可用的节点
    InetSocketAddress address = DcyRpcBootstrap.getInstance()
            .getConfiguration().getLoadBalancer().selectServiceAddress(interfaceRef.getName(), group);
    // 略...
}

修改core模块下的loadbalancer包下的LoadBalancer接口

InetSocketAddress selectServiceAddress(String serviceName, String group);

修改core模块下的loadbalancer包下的AbstractLoadBalancer:在selectServiceAddress方法里的lookup传入group

@Override
public InetSocketAddress selectServiceAddress(String serviceName, String group) {
		// 略...
    if (selector == null) {
        // 这个负载均衡器,内部要维护服务列表,作为缓存
        List<InetSocketAddress> serviceList = DcyRpcBootstrap.getInstance().getConfiguration().getRegistryConfig().getRegistry().lookup(serviceName, group);
        // 略...
    }
    // 略...
}

修改core模块下的discovery.impl包下的ZookeeperRegistry类:lookup方法

  • 拼接分组信息
String serviceNode = Constant.BASE_PROVIDERS_PATH + "/" + serviceName + "/" + group;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1003953.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【数据结构】包装类简单认识泛型

文章目录 1 包装类1.1 基本数据类型和对应的包装类1.2 装箱和拆箱 2 什么是泛型3 引出泛型3.1 语法 4 泛型类的使用4.1 语法4.2 示例4.3 类型推导(Type Inference) 5 泛型的上界5.1 语法5.2 示例5.3 复杂示例 6 泛型方法6.1 定义语法6.2 示例6.3 使用示例-可以类型推导6.4 使用…

Spring之文件上传下载,jrebel,多文件上传

文件上传,文件下载jrebel&多文件上传 1.文件上传,文件下载 文件上传 1.spring-xml配置多功能视图解析器 2.前端标记表单为多功能表单enctype”mutipart/form-data“ 3.后端可以直接利用mutipartFile类&#xff0c;接受前端传递到后台的文件 4.将文件转成流&#xff0c;然后…

DedeCMS_v5.7其他漏洞复现

一、URL重定向 http://127.0.0.1/DedeCMS-V5.7-UTF8-SP2/uploads/plus/download.php?open1&linkaHR0cDovL3d3dy5iYWlkdS5jb20 其中aHR0cDovL3d3dy5iYWlkdS5jb20是http://www.baidu.com的base64编码 访问后发现直接转到百度 二、后台shops_delivery_存储型XSS 管理员在…

30m退耕还湿空间数据集(2000-2010年,青藏高原地区)

摘要 a. 数据内容&#xff08;数据文件/表名称&#xff0c;包含的观测指标内容&#xff09; 30m退耕还湿空间数据集(2000-2010年&#xff0c;青藏高原地区)&#xff0c;反映了2000年至2010年期间青藏高原地区耕地退耕还湿的空间分布情况。 b. 建设目的 监测青藏高原地区的退耕还…

10 大演讲主题、14 位技术大咖!龙蜥大讲堂 9 月直播预告硬核来袭

「龙蜥大讲堂」9 月精彩预告来了&#xff0c;点击下方海报抢先了解。本月又是满满的技术干货分享&#xff0c;多位大咖带你共享技术盛宴&#xff01;提前进群&#xff0c;参与互动还有龙蜥精美周边等你来拿。 9 月精彩分享直达 &#x1f447; 加入龙蜥社区钉钉交流群&#xff0…

Springboot+swagger2

1.swagger配置 /*** Swagger 配置文件*/ Configuration public class SwaggerConfig {Beanpublic Docket createRestApi() {return new Docket(DocumentationType.SWAGGER_2).apiInfo(apiInfo()).select().apis(RequestHandlerSelectors.basePackage("com.swagger.two&qu…

神经网络 02(激活函数)

一、激活函数 在神经元中引入了激活函数&#xff0c;它的本质是向神经网络中引入非线性因素的&#xff0c;通过激活函数&#xff0c;神经网络就可以拟合各种曲线。 如果不用激活函数&#xff0c;每一层输出都是上层输入的线性函数&#xff0c;无论神经网络有多少层&#xff0c…

华为云云服务器云耀L实例评测 | 智能不卡顿:如何实现流畅的业务运行

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

数据分享|R语言武汉流动人口趋势预测:灰色模型GM(1,1)、ARIMA时间序列、logistic逻辑回归模型...

全文链接&#xff1a;http://tecdat.cn/?p32496 人口流动与迁移&#xff0c;作为人类产生以来就存在的一种社会现象&#xff0c;伴随着人类文明的不断进步从未间断&#xff08;点击文末“阅读原文”获取完整代码数据&#xff09;。 相关视频 人力资源是社会文明进步、人民富裕…

视频直播点播平台EasyDSS如何单独保存录像计划文件?具体如何操作呢?

视频推拉流EasyDSS视频直播点播平台&#xff0c;集视频直播、点播、转码、管理、录像、检索、时移回看等功能于一体&#xff0c;可提供音视频采集、视频推拉流、播放H.265编码视频、存储、分发等视频能力服务。 有用户反馈&#xff1a;在视频直播点播平台EasyDSS中设置了片段形…

this执行问题

1.代码 var a 10;let obj {a: 20,n: function () {console.log(this.a);},};let fn obj.n;fn(); //此时的this指向windowobj.n(); //this指向obj这个对象 2.打印的结果 3.代码分析 let fn obj.n;将函数体复制给fn fn()是普通函数this指向window obj.fn里面的函数,可以理…

Autojs 小游戏实践-潮玩宇宙开扭蛋

概述 最近在玩潮流宇宙&#xff0c;里面有扭蛋兔的一个玩法&#xff0c;开始有很多蛋&#xff0c;需要我们一个个点开&#xff0c;然后根据装备品质替换分解&#xff0c;潮流提供了自动开扭蛋功能&#xff0c;但是开到品质比自己装备好的时候回暂停&#xff0c;由于个人懒得看…

【小黑送书—第一期】>>《Kali Linux高级渗透测试》

对于企业网络安全建设工作的质量保障&#xff0c;业界普遍遵循PDCA&#xff08;计划&#xff08;Plan&#xff09;、实施&#xff08;Do&#xff09;、检查&#xff08;Check&#xff09;、处理&#xff08;Act&#xff09;&#xff09;的方法论。近年来&#xff0c;网络安全攻…

基于elasticsearch-8.8.2 kibana-8.8.2 搭建一个文搜图系统demo

数据来源是由 图片url,图片descript,图片keywords 外加一个id 基于此首先创建 索引, keywords是一组由单词或词组 组成的一组数据,所以以数组形式压入数据: descript 是由两条语句组合成的数据(针对图片的两种不同描述) # 这里创建的keywords 数组元素类型为text,即可以模糊匹…

Python爬虫-IP隐藏技术与代理爬取

前言 在进行爬虫程序开发和运行时&#xff0c;常常会遇到目标网站的反爬虫机制&#xff0c;最常见的就是IP封禁&#xff0c;这时需要使用IP隐藏技术和代理爬取。 一、IP隐藏技术 IP隐藏技术&#xff0c;即伪装IP地址&#xff0c;使得爬虫请求的IP地址不被目标网站识别为爬虫。…

网络层IP协议

目录 前言 1.如何理解IP协议 2.IP协议格式 3.网段划分 4.特殊的IP地址 5.IP地址的数量限制 6.私有IP地址和公网IP地址 7.路由 总结 前言 在前面的文章中介绍了关于传输层常用的两个协议&#xff0c;UDP协议和TCP协议&#xff0c;当数据经过传输层之后&#xff0c;进入网…

关于ESP32S3无法识别到端口问题

前言 &#xff08;1&#xff09;因为实习问题&#xff0c;需要使用ESP32BOX进行二次开发。一般来说&#xff0c;接触一款MCU&#xff0c;3天上手是基本操作。但是对于乐鑫的芯片&#xff0c;环境搭建是真的折磨人&#xff08;苦笑&#xff09;&#xff0c;而且官方文档几乎没有…

软件测试———linux

文章目录 基础1. 发展史2 特征3 内核版本号的特征4.发布版5,安装 第二章Linux的常见命令Linux命令vi的使用文件的操作文件的压缩和解压缩文件阅读命令权限的操作用户设置配置系统查看名命令 基础 1. 发展史 unix—>BSD(TCP的使用)---->GNU---->Minix—>linux 2 …

使用Process Explorer查看线程的函数调用堆栈去排查程序高CPU占用问题

目录 1、问题描述 2、使用Process Explorer排查软件高CPU占用的一般思路 3、使用Process Explorer工具进行分析 3.1、找到CPU占用高的线程 3.2、查看CPU占用高的线程的函数调用堆栈&#xff0c;找到出问题的代码 3.3、libwebsockets库导出接口lws_service的说明 3.4、解…

200个常用的Python编程相关英语词汇以及它们的中文释义

大家好&#xff0c;我是涛哥。 好多小伙伴反馈说在学习python的过程中&#xff0c;遇到的英文比较多&#xff0c;为自己的学习和开发产生了很大的阻力&#xff0c;所以为大家梳理了一份 Python编程相关常用的英语词汇以及它们的中文释义&#xff0c;当你刚开始学习Python编程的…