手写RPC框架第7版-框架容错性相关设计

news2025/1/12 16:15:25

源代码地址:https://github.com/lhj502819/IRpc/tree/v8

系列文章:

  • 注册中心模块实现
  • 路由模块实现
  • 序列化模块实现
  • 过滤器模块实现
  • 自定义SPI机制增加框架的扩展性的设计与实现
  • 基于线程和队列提升框架并发处理能力
  • 框架容错性相关设计
  • 通过SpringBoot-Starter接入SpringBoot

思考

RPC框架除了需要关注吞吐能力之外,对于失败场景的应对也是一个非常关键的点,本次我们主要从以下几个场景进行分析解决:

  • 服务端异常返回给到调用方展示
  • 客户端调用支持超时重试机制
  • 服务提供方支持接口限流

服务端异常信息

问题

目前我们的RPC框架中,当服务端发生异常时,异常信息并没有返回给客户端,这种情况带来的问题如下:

  • 在分布式场景下无疑会增加我们的排查错误的成本,每次Client端调用异常时都需要找到对应Service Provider查询对应的异常信息;
  • 如果Server端打印的异常不够丰富的话,无法判断是由那个Client发起的调用,更加增加了问题定位的困难;
  • 服务端日志堆积严重。

框架优化

我们的设计思路是将服务端的异常信息返回给Client,并且将堆栈记录打印出来。
我们之前会将请求的响应数据统一放到RpcInvocation中,这里我们将异常信息也放入该实体中。

public class RpcInvocation implements Serializable {

    private static final long serialVersionUID = 4925694661803675105L;
    
    .........省略部分代码...............

    /**
     * 用于匹配请求和响应的一个关键值,当请求从客户端发出的时候,会有一个uuid用于记录发出的请求
     *  待数据返回的时候通过uuid来匹配对应的请求线程,并且返回给调用线程
     */
    private String uuid;

    /**
     * 接口响应的数据塞入这个字段中(如果是异步调用或者是void类型,这里就为空)
     */
    private Object response;

    /**
     * 异常堆栈
     */
    private Throwable e;

    .........省略部分代码...............
}

同时在执行目标方法发生异常时,将异常进行放入:
在这里插入图片描述

在Client接收到响应之后判断异常信息字段是否为空,如果为空则打印异常日志:
在这里插入图片描述

问题:
由于异常堆栈的信息可能会非常多,TCP传输的数据体积过大,会导致一份数据包被拆解成多份进行传输。
因此在实际调用时Client端在进行数据读取的时候可能会报错:

java.lang.IndexOutOfBoundsException: readerIndex(11) + length(11) exceeds writerIndex(11): PooledSlicedByteBuf(ridx: 11, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 64, widx: 64, cap: 1024))
        at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1403)
        at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1390)
        at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:872)

我们通过采用netyy自带的协议封装规则来解决拆包粘包的问题,但是一旦遇到大体积的数据量还是会出现此类问题。
因此我们可以通过指定分隔符,并且通过参数定义每次传输的最大数据包体积,这样可以告知服务端每次读取的数据包的上限为配置的字节数长度,并且如果在这个分隔符内没有读取到完整的协议内容,则属于是异常数据包。调整之后的代码如下:

public class RpcEncoder extends MessageToByteEncoder<RpcProtocol> {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, RpcProtocol msg, ByteBuf out) throws Exception {
        out.writeShort(msg.getMagicNumber());
        out.writeInt(msg.getContentLength());
        out.writeBytes(msg.getContent());
        out.writeBytes(RpcConstants.DEFAULT_DECODE_CHAR.getBytes());
    }
    
}
public class RpcDecoder extends ByteToMessageDecoder {

    /**
     * 协议的开头部分的标注长度
     */
    public final int BASE_LENGTH = 2 + 4;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() >= BASE_LENGTH) {
            //这里对应了RpcProtocol的魔数
            if (!(in.readShort() == MAGIC_NUMBER)) {
                ctx.close();
                return;
            }
            int length = in.readInt();
            //说明剩余的数据包不是完整的,这里需要重置下readerIndex
            if (in.readableBytes() < length) {
                ctx.close();
                return;
            }

            //这里其实就是实际的RpcProtocol对象的content字段
            byte[] data = new byte[length];
            in.readBytes(data);
            RpcProtocol rpcProtocol = new RpcProtocol(data);
            out.add(rpcProtocol);
        }
    }
}

在初始化Netty时分别假如对应的编码器:
Client:
在这里插入图片描述

Server:
在这里插入图片描述

客户端调用支持超时重试机制

什么情况下适合进行超时重试?

  • 当Service的两个Provider所在A、B两个机器的服务器性能不佳时,处理请求比较缓慢,B服务器的性能比A好,当调用A服务器发生超时的时候,可以尝试重新调用,将请求转到B机器上;
  • 由于网络问题导致的请求超时,可以进行重试。

什么情况不适合超时重试?

面对一些幂等性的接口调用,重试机制应该谨慎使用,比如:转账、下单以及一些金融相关的接口,当调用发生超时的时候,不好确认请求是否到达Server,如果重试的话可能会造成数据的错误,这种情况下的重试机制还是要谨慎的。

超时重试实现

Client调用时增加超时重试次数
在这里插入图片描述

在动态代理逻辑中进行超时重试,具体代码如下:

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        RpcInvocation rpcInvocation = new RpcInvocation();
        rpcInvocation.setArgs(args);
        rpcInvocation.setTargetMethod(method.getName());
        rpcInvocation.setTargetServiceName(rpcReferenceWrapper.getAimClass().getName());
        //这里面注入了一个uuid,对每一次的请求都单独区分
        rpcInvocation.setUuid(UUID.randomUUID().toString());
        rpcInvocation.setAttachments(rpcReferenceWrapper.getAttatchments());
        rpcInvocation.setRetry(rpcReferenceWrapper.getRetry());
        SEND_QUEUE.add(rpcInvocation);

        if (rpcReferenceWrapper.isAsync()) {
            return null;
        }
        RESP_MAP.put(rpcInvocation.getUuid(), OBJECT);

        long beginTime = System.currentTimeMillis();
        long nowTimeMillis = System.currentTimeMillis();

        //总重试次数
        int retryTimes = 0;
        //客户端请求超时的判断依据
        while (nowTimeMillis - beginTime < timeout || rpcInvocation.getRetry() > 0) {
            Object object = RESP_MAP.get(rpcInvocation.getUuid());
            if (object instanceof RpcInvocation) {
                RpcInvocation rpcInvocationResp = (RpcInvocation) object;
                if (rpcInvocation.getRetry() == 0 && rpcInvocationResp.getE() == null) {
                    return rpcInvocationResp.getResponse();
                } else if (rpcInvocation.getE() != null) {
                    //重试
                    if (rpcInvocation.getRetry() == 0) {
                        return rpcInvocationResp.getResponse();
                    }

                    //只有因为超时才会进行重试,否则重试不生效
                    if (nowTimeMillis - beginTime < timeout) {
                        retryTimes++;
                        //重新请求
                        rpcInvocation.clearRespAndError();
                        //每次重试的时候都将需重试次数减1
                        rpcInvocation.setRetry(rpcInvocationResp.getRetry() - 1);
                        RESP_MAP.put(rpcInvocation.getUuid(), OBJECT);
                        SEND_QUEUE.add(rpcInvocation);
                    }
                }
            } else {
                nowTimeMillis = System.currentTimeMillis();
            }
        }
        RESP_MAP.remove(rpcInvocation.getUuid());
        throw new TimeoutException("Wait for response from server on client " + rpcReferenceWrapper.getTimeOUt() + "ms, retry times is " + retryTimes + "Server's name is " + rpcInvocation.getTargetServiceName() + "#" + rpcInvocation.getTargetMethod());
    }

重试的方式

  • 间隔重试:适用于对于实时性没有要求的场景
  • 立即重试:调用失败后立即进行重试,并且会路由到其他的机器上,在RPC的分布式场景下用的比较多,对服务的容错性有一定提升

我们也可以使用目前市面上比较流行的框架提供的重试机制,比如Goog Guava retry和Spring retry。

服务端接口限流

在微服务的场景下,通常Server端的请求会比Client端的大很多,所以为了防止由于请求激增把Server干掉,因此需要设置合理的流量阈值,适当的Server进行保护

保护点

  • 整个Server端连接数的限制
  • 单个Service的请求限流
  • 方法级别限流

整个Server端连接数限制

由于我们是基于Netty进行NIO通信,所有向Sever端发起的调用都需要建立一个连接,当Server端连接数达到某个上限的时候,则直接拒绝连接。
在Netty中,由于我们的Server端都会将accept单独由mainReactor负责,而workerReactor则负责IO请求,那我们的连接数记录和限制则可以在mainReactor中实现,我们可以通过自定义一个ChannelHandler,限制最大连接数,当连接数超过阈值后则立即关闭channel,通知Client。

@ChannelHandler.Sharable
public class MaxConnectionLimitHandler extends ChannelInboundHandlerAdapter {

    private static final Logger LOGGER = LoggerFactory.getLogger(MaxConnectionLimitHandler.class);

    private final int maxConnectionNum;

    private final AtomicInteger numConnection = new AtomicInteger(0);

    private final Set<Channel> childChannel = Collections.newSetFromMap(new ConcurrentHashMap<>());

    private final LongAdder numDroppedConnections = new LongAdder();

    private final AtomicBoolean loggingScheduled = new AtomicBoolean(false);


    public MaxConnectionLimitHandler(int maxConnectionNum) {
        this.maxConnectionNum = maxConnectionNum;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel channel = (Channel) msg;
        int conn = numConnection.incrementAndGet();
        if (conn > 0 && conn <= maxConnectionNum) {
            this.childChannel.add(channel);
            channel.closeFuture().addListener(future -> {
                childChannel.remove(channel);
                numConnection.decrementAndGet();
            });
            super.channelRead(ctx, msg);
        } else {
            numConnection.decrementAndGet();
            //立即关闭tcp连接
            channel.config().setOption(ChannelOption.SO_LINGER, 0);
            //立即关闭channel
            channel.unsafe().closeForcibly();
            numDroppedConnections.increment();
            if (loggingScheduled.compareAndSet(false,true)){
                //延时打印日志
                ctx.executor().schedule(this::writeNumDroppedConnectionLog,1, TimeUnit.SECONDS);
            }
        }
    }

    /**
     * 记录连接失败的日志
     */
    private void writeNumDroppedConnectionLog() {
        loggingScheduled.set(false);
        final long dropped = numDroppedConnections.sumThenReset();
        if(dropped>0){
            LOGGER.error("Dropped {} connection(s) to protect server,maxConnection is {}",dropped,maxConnectionNum);
        }
    }


}

单个Service的请求限流

实现方法:当接收到请求后,每次接收到请求后判断该Service的请求数是否达到阈值,达到阈值则直接返回错误,如果为达到阈值则将该Service计数+1,当目标方法执行完之后,将计数-1。
在Jdk中提供了Semaphore信号量并发工具类,具体的用法大家可以自行查询下,很简单,只需要在初始化时指定大小,每次调用#acquire、#tryAcquire等方法则会尝试将计数-1,如果计数为0则会返回,在执行完成后可通过执行#release来将计数归还,但每个API的具体行为有些不同,#acquire会阻塞等待至有信号量被释放,而#tryAcquire则会立即返回,这里我们使用后者,因为如果大量请求打入,会导致大量的线程阻塞,影响整个程序的正常运行。
实现:
之前我们在框架中增加了过滤器,这里我们也通过过滤器来实现,但我们需要对过滤器进行细化,分为前/后置过滤器,前置过滤器在方法执行前执行,起到将计数-1的目的,而后置过滤器则在方法执行后执行,负责将计数归还。

@SPI("before")
public class ServerServiceBeforeLimitFilterImpl implements IServerFilter{

    private static final Logger LOGGER = LoggerFactory.getLogger(ServerServiceBeforeLimitFilterImpl.class);

    @Override
    public void doFilter(RpcInvocation rpcInvocation) {
        String serviceName = rpcInvocation.getTargetServiceName();
        ServerServiceSemaphoreWrapper serverServiceSemaphoreWrapper = CommonServerCache.SERVER_SERVICE_SEMAPHORE_MAP.get(serviceName);
        Semaphore semaphore = serverServiceSemaphoreWrapper.getSemaphore();
        boolean tryResult = semaphore.tryAcquire();
        if (!tryResult){
            String message = String.format("[ServerServiceBeforeLimitFilterImpl#doFilter] %s's max request is %s,reject now", serviceName, serverServiceSemaphoreWrapper.getMaxNums());
            LOGGER.error(message);
            MaxServiceLimitRequestException requestException = new MaxServiceLimitRequestException(message,rpcInvocation);
            rpcInvocation.setE(requestException);
            throw requestException;
        }
    }

}
@SPI("after")
public class ServerServiceAfterLimitFilterImpl implements IServerFilter {
    @Override
    public void doFilter(RpcInvocation rpcInvocation) {
        String serviceName = rpcInvocation.getTargetServiceName();
        CommonServerCache.SERVER_SERVICE_SEMAPHORE_MAP.get(serviceName)
                .getSemaphore()
                .release();
    }
}

在进行初始化时也需要将前/后置过滤器分别加载保存:
在这里插入图片描述

方法级别限流

方法级别限流其实也类似,我们就不实现了,小伙伴们感兴趣的话可以自行实现。

总结

本次我们主要针对RPC框架的容错性进行了优化,主要包含以下内容:

  • 服务端异常日志的返回
  • 客户端调用超时重试
  • 服务端连接数限制
  • 服务端Service请求数限流

其实在容错性方面还有很多待优化的空间,比如方法级别的限流、超时或者异常之后指定参数回调、服务降级、注册中心异常后的自动重连、Server支持动态调整限流参数等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/153740.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据结构:关于空间复杂度的例题计算

1、计算冒泡排序的空间复杂度 答案&#xff1a;该程序空间复杂度为O(1)。 解析&#xff1a;该程序在栈空间所申请的临时变量空间只有三个&#xff0c;也就是看成常数个&#xff0c;所以是O(1)。如下图所示 2、动态开辟N个数的数组空间复杂度 答案&#xff1a;该程序空间复杂度…

【UE4 第一人称射击游戏】31-更好的UI界面

素材资料地址&#xff1a;链接&#xff1a;https://pan.baidu.com/s/1epyD62jpOZg-o4NjWEjiyg密码&#xff1a;jlhr上一篇&#xff1a;【UE4 第一人称射击游戏】30-简单的任务提示功能本篇效果&#xff1a;步骤&#xff1a;在UE中新建一个“HUD_Export”文件夹&#xff0c;将所…

《码出高效:java开发手册》七 - 并发与多线程

前言 现代CPU运算速度以百亿计&#xff0c;家用计算机和操作系统也是数十进程&#xff0c;数百线程&#xff0c;程序相应也需要采用多线程和并发的技术 并发和并行&#xff1a;并发是指某个时间段&#xff0c;多任务处理&#xff1b;并行是指同时处理多任务的能力&#xff1b;…

接口测试项目实战与经典面试题解析,挑战 BAT 大厂必会!

近年来&#xff0c;接口测试技术体系已在各大互联网企业落地普及&#xff0c;各种新接口框架不断涌现&#xff0c;业界也形成了不少成熟方案和成功案例。当前 BAT 大厂在招聘测试人员时&#xff0c;接口测试技能和项目经验是必考重点&#xff0c;直接影响到职级评定和薪资水平&…

2023/1/10 Vue学习笔记6 - 路由基本使用

1 路由的简介-router 1、路由就是一组key-value的对应关系。 2、多个路由&#xff0c;需要经过路由器的管理。 SPA (single page web application&#xff09;应用 - 单页面web应用 {"key":"/class","value":"班级组件" }1.vue-rout…

kali中wpscan工具使用

一.wpscan工具简介 wpscan是一款专门针对wordpress的扫描工具&#xff0c;采用ruby语言编写&#xff0c;能够扫描worpress网站中包括主题漏洞、插件漏洞以及wordpress网站本身存在的漏洞。wpscan还可以扫描wordpress网站启用的插件和其他功能。 在Kali Linux系统中&#xff0…

关于distinct——去除重复记录

distinct译为&#xff1a;不同的&#xff0c;有区别的&#xff1b;在SQL语句中表示去除重复记录的意思 举例&#xff1a;在员工表emp中查询所有的工作岗位。 分析&#xff1a;在员工表中的工作岗位字段下有重复的工作岗位&#xff0c;我们在查询的时候就希望将重复的工作岗位显…

数据库取证——MySQL基础知识

目录 一、数据库基础知识 &#xff08;一&#xff09;数据库&#xff08;DB&#xff09; &#xff08;二&#xff09;数据库管理系统&#xff08;DBMS&#xff09; &#xff08;三&#xff09;数据库系统&#xff08;DBS&#xff09; &#xff08;四&#xff09; 数据库的…

【Unity云消散】简单理论基础:实现边缘光

写在前面 既然想要实现云的消散效果&#xff0c;那么边缘光如何计算也是一个重点。 在Unity Shader入门精要的14章&#xff0c;介绍轮廓线渲染就介绍了——轮廓边检测&#xff0c;而边缘光也是需要先检测出轮廓边再进行的。 Unity3D Shader系列之边缘光RimLight 这篇博客给…

Go专家编程读书小记

文章目录协程进程和线程进程&#xff1a;进程间通信&#xff1a;线程&#xff1a;区别&#xff1a;协程GMP模型调度策略内存管理内存分配span&#xff1a;cache&#xff1a;central&#xff1a;heap&#xff1a;垃圾回收常见的垃圾回收算法&#xff1a;三色标记&#xff1a;垃圾…

【NI Multisim 14.0原理图设计基础——查找元器件】

目录 序言 一、查找元器件 &#x1f34d;1.浏览元器件 &#x1f34d;2.搜索元器件 &#x1f34d; 3.显示找到的元器件及其所属元器件库 &#x1f34d; 4.加载找到元器件的所属元器件库 序言 NI Multisim最突出的特点之一就是用户界面友好。它可以使电路设计者方便、快捷地…

大资金现金管理的利器:稳定币网格做市策略

数量技术宅团队在CSDN学院推出了量化投资系列课程 欢迎有兴趣系统学习量化投资的同学&#xff0c;点击下方链接报名&#xff1a; 量化投资速成营&#xff08;入门课程&#xff09; Python股票量化投资 Python期货量化投资 Python数字货币量化投资 C语言CTP期货交易系统开…

从一条记录说起—— InnoDB 记录结构

准备工作 到现在为止&#xff0c;MySQL对于我们来说还是一个黑盒&#xff0c;我们只负责使用客户端发送请求并等待服务器返回结果&#xff0c;表中的数据到底存到了哪里&#xff1f;以什么格式存放的&#xff1f;MySQL是以什么方式来访问的这些数据&#xff1f;这些问题我们统…

springboot构造树形结构数据并查询的方法

因为项目需要,页面上需要树形结构的数据进行展示(类似下图这样),因此需要后端返回相应格式的数据。 不说废话,直接开干!!! 我这里用的是springboot+mybatis-plus+mysql,示例的接口是查询一级权限以及二级权限、三级权限整个权限树… 下面是导入的maven依赖 <depe…

SSD核心设计

摘要本文介绍了此类设计选择的分类&#xff0c;并使用跟踪驱动的模拟器和从实际系统中提取的工作负载跟踪分析各种配置的可能性能。我们发现SSD性能和生命周期对工作负载非常敏感&#xff0c;并且通常较高的复杂系统问题出现在存储堆栈中甚至在分布式系统中&#xff0c;与设备固…

9. 回文数

文章目录题目描述方法一 转换为字符串方法二 转存入数组方法三 数学方法倒转数字方法四 对折参考文献题目描述 给你一个整数 x &#xff0c;如果 x 是一个回文整数&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 回文数是指正序&#xff08;从左向右&…

树脂吸附处理冶炼含钴丨废水四氯化三钴、草酸钴废水回收钴

生产过程中会产生含钴废水&#xff0c;现有技术中&#xff0c;处理含钴废水的方法主要有化学沉淀法、吸附法、膜分离法等。 对于化学沉淀法&#xff0c;原理是通过向废水中添加一定的沉淀剂&#xff0c;使其与废水中溶解性的钴离子反应生成沉淀&#xff0c;通过重力沉降去除钴…

ArcGIS 前端动态地图与要素服务符号化的区别小结

在ArcGIS中&#xff0c; 除了在桌面端做配图之外&#xff0c;一些符号化工作也可以在代码端去进行设置&#xff0c;这里简单的做了一些测试及小结。 一、服务的区别 在ArcGIS中最基础的两种数据服务就是动态地图服务&#xff08;MapServer&#xff09;和要素服务&#xff08;…

Spring Boot 教程

Spring Boot 教程Spring Boot 基础开发Spring Boot 简介Spring Boot 第一个项目Spring Boot 项目启动机制Spring Boot 数据访问Spring Boot 集成 MyBatisSpring Boot 运行管理Spring Boot 日志管理Spring Boot 异常处理Spring Boot 定时任务Spring Boot 使用拦截器Spring Boot …

C#汽车美容管理服务系统源码 功能强大代码完整,开源分享!

一套完整的汽车美容管理服务系统源码&#xff0c;专门服务于汽车美容4s店&#xff0c;终端功能强大而又简便实用&#xff0c;界面友好而美观&#xff0c;让用户更好的体验度,基于jquery技术实现页面无刷新,可广泛适用于大型以及小型汽车美容机修等公司&#xff0c;包含 洗车、机…