手写RPC框架06-基于线程和队列提升框架并发处理能力

news2025/1/16 21:07:11

源代码地址:https://github.com/lhj502819/IRpc/tree/v7

系列文章:

  • 注册中心模块实现
  • 路由模块实现
  • 序列化模块实现
  • 过滤器模块实现
  • 自定义SPI机制增加框架的扩展性的设计与实现
  • 基于线程和队列提升框架并发处理能力

Server端

现有的问题

目前我们的RPC框架Server端在接收到请求之后会直接在Netty的IO线程中执行业务逻辑,如果业务逻辑比较简单还好,但是如果业务逻辑比较复杂,需要处理的时间又比较长,那就会对Netty的IO线程占用时间较长,导致阻塞住其他的请求,因此我们可能就需要由业务线程单独去处理业务逻辑。

解决方案

面对高并发场景下,我们可以通过将请求放到阻塞队列中,通过业务线程去消费处理,达到提升我们框架的并发处理能力的需求。
由于RPC框架天生支持水平扩容,因此在单台机器处理能力不足的情况下,我们可以进行扩容来提升我们整个Server的能力。

落地

请求分发处理器
其中包含一个阻塞队列,在ServerHandler接收到请求后便将请求封装后放到该队列中,后续会有专门的线程池去处理这些请求。

public class ServerChannelDispatcher {

    private BlockingQueue<ServerChannelReadData> RPC_DATA_QUEUE;

    private ExecutorService executorService;

    public void init(int queueSize, int bizThreadNums) {
        RPC_DATA_QUEUE = new ArrayBlockingQueue<>(queueSize);
        executorService = new ThreadPoolExecutor(bizThreadNums, bizThreadNums,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(512));
    }

    public void add(ServerChannelReadData serverChannelReadData) {
        RPC_DATA_QUEUE.add(serverChannelReadData);
    }

    public ServerChannelDispatcher() {
    }

    class ServerJobCoreHandle implements Runnable {

        @Override
        public void run() {
            while (true) {
                try {
                    ServerChannelReadData serverChannelReadData = RPC_DATA_QUEUE.take();
                    executorService.submit(() -> {
                        try {
                            RpcProtocol rpcProtocol = serverChannelReadData.getRpcProtocol();
                            RpcInvocation rpcInvocation = SERVER_SERIALIZE_FACTORY.deserialize(rpcProtocol.getContent(), RpcInvocation.class);

                            //doFilter
                            SERVER_FILTER_CHAIN.doFilter(rpcInvocation);

                            //这里的PROVIDER_CLASS_MAP就是一开始预先在启动的时候存储的Bean集合
                            Object aimObject = PROVIDER_CLASS_MAP.get(rpcInvocation.getTargetServiceName());
                            Method[] methods = aimObject.getClass().getDeclaredMethods();
                            Object result = null;
                            for (Method method : methods) {
                                if (method.getName().equals(rpcInvocation.getTargetMethod())) {
                                    if (method.getReturnType().equals(Void.TYPE)) {
                                        method.invoke(aimObject, rpcInvocation.getArgs());
                                    } else {
                                        result = method.invoke(aimObject, rpcInvocation.getArgs());
                                    }
                                    break;
                                }
                            }
                            rpcInvocation.setResponse(result);
                            RpcProtocol respRpcProtocol = new RpcProtocol(SERVER_SERIALIZE_FACTORY.serialize(rpcInvocation));
                            serverChannelReadData.getChannelHandler().writeAndFlush(respRpcProtocol);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }

                    });
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public void startDataConsume(){
        Thread thread = new Thread(new ServerJobCoreHandle());
        thread.start();
    }
}

Client端

问题

Client端对于一些不关注任务执行结果的调用,没有处理手段,其实将这个问题处理掉也是变相的提升了我们框的处理能力。

解决方案

在发起调用时增加是否异步调用,针对异步调用的请求直接返回,不再阻塞等待结果。

落地

代码比较简单就不过多阐述了
在这里插入图片描述

在这里插入图片描述

总结

本次版本主要是针对于Server和Client端的并发处理能力进行了优化,Server端主要是通过对请求通过队列和业务线程异步化,使得Netty的NIO线程和业务逻辑解耦,以免阻塞时间过长的逻辑影响整个RPC框架的请求接入;Client端主要是增加了对异步请求的支持,变相的提升了整个框架的处理能力。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/151124.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

酷早报:1月9日全球Web3加密行业重大资讯大汇总

2023年1月9日 星期一 【数据指标】 加密货币总市值&#xff1a;$0.84万亿 BTC市值占比&#xff1a;39.14% 恐慌贪婪指数&#xff1a;25 极度恐慌【今日快讯】 1、【政讯】 1.1、美债关键收益率曲线倒挂幅度创纪录以来新高 1.2.1、美联储博斯蒂克&#xff1a;倾向于将利率升至5%…

2020年MathorCup高校数学建模挑战赛—大数据竞赛A题移动通信基站流量预测求解全过程文档及程序

2020年MathorCup高校数学建模挑战赛—大数据竞赛 A题 移动通信基站流量预测 原题再现&#xff1a; 随着移动通信技术的发展&#xff0c;4G、5G 给人们带来了极大便利。移动互联网的飞速发展&#xff0c;使得移动流量呈现爆炸式增长&#xff0c;从而基站的流量负荷问题变得越来…

代码随想录第55天|● 392.判断子序列 ● 115.不同的子序列

392.判断子序列 dp[i][j] 表示以下标i-1为结尾的字符串s&#xff0c;和以下标j-1为结尾的字符串t&#xff0c;相同子序列的长度为dp[i][j]。 if (s[i - 1] t[j - 1])&#xff0c;那么dp[i][j] dp[i - 1][j - 1] 1;&#xff0c;因为找到了一个相同的字符&#xff0c;相同子…

当没有成熟案例可参考时,企业该如何实现数字化转型?

对于企业来说&#xff0c;数字化转型过程中&#xff0c;参考成熟的案例是可以提高成功率的。但是在现实中&#xff0c;很多企业由于行业、领先地位、技术保密性等原因&#xff0c;导致没有或者找不到可参考的数字化转型案例为自身提供经验。那么这种情况下该如何做呢&#xff0…

Java中日期和时间的类

文章目录JDK8之前日期和时间的APISystem类中的Date类中的java.util.Date类中的二个构造器二个方法java.sql.Date类中的实例化将java.sql.Date类对象转化为java.util.Date类的对象将java.util.Date类对象转化为java.sql.Date类的对象每日一考JDK8之前日期和时间的API System类中…

你对Bug了解多少?如何“正确的”向开发人员提出Bug?

目录 一、Bug的级别 二、Bug的生命周期 三、如何向开发人员提出Bug&#xff08;如何创建Bug&#xff09;? 四、跟开发产生争执怎么办&#xff1f;&#xff08;面试高频&#xff09; 一、Bug的级别 为什么Bug也要存在级别&#xff1f;不同的Bug等级&#xff0c;惩罚机制不一…

环形缓冲区

文章目录一. 什么是环形缓冲区&#xff1f;二、实现环形缓冲区&#xff1a;三、环形缓冲区示例代码&#xff1a;总结一. 什么是环形缓冲区&#xff1f; 环形缓冲区 是一段 先进先出 的循环缓冲区&#xff0c;有一定的大小&#xff0c;我们可以把它抽象理解为一块环形的内存。 …

快速掌握web服务器相关知识

目录 1.web服务器 2.HTTP的状态码 3.web实验 4.算法介绍 1.web服务器 web服务器指网站服务器&#xff0c;是指驻留与因特网上某种类型计算机的程序&#xff0c;可以向浏览器等WEB客户端提供文档&#xff0c;也可以放置网站文件&#xff0c;让全世界浏览&#xff1b;可以放置…

关于batchnormlization理解

论文一般是这两张典型图片引用wz博客辅助理解上图展示了一个batch size为2&#xff08;两张图片&#xff09;的Batch Normalization的计算过程&#xff0c;假设feature1、feature2分别是由image1、image2经过一系列卷积池化后得到的特征矩阵&#xff0c;feature的channel为2&am…

TCP三次握手和四次挥手

三次握手 先ping域名为www.baidu.com&#xff0c;便于DNS解析。ping走的协议就包括DNS、ARP和ICMP。 接着使用Wireshark去抓包&#xff0c;抓包这里导航栏直接过滤ip就可以了&#xff0c;输入ip.host 183.232.231.174 接着直接在浏览器输入百度域名www.baidu.com访问请求&am…

TikTok新规:严禁录播盗播,保护原创内容

让我们一起来看看今日都有哪些新鲜事吧&#xff01;01 2023年&#xff0c;TikTok将在社交买家渗透率和用户使用时间上面成为美国第一 eMarketer最新预测显示&#xff0c;TikTok的社交买家渗透率和用户使用时间正在迅速攀升&#xff0c;预计将在2023年分别超过Facobook和Youtub…

全景剖析阿里云容器网络数据链路(一):Flannel

作者&#xff1a;余凯 本系列文章由余凯执笔创作&#xff0c;联合作者&#xff1a;阿里云云原生应用平台 谢石 对本文亦有贡献 前言 近几年&#xff0c;企业基础设施云原生化的趋势越来越强烈&#xff0c;从最开始的 IaaS 化到现在的微服务化&#xff0c;客户的颗粒度精细化…

基于单机最高能效270亿参数GPT模型的文本生成与理解

作者&#xff1a;李鹏&#xff0c;王玮&#xff0c;陈嘉乐&#xff0c;黄松芳&#xff0c;黄俊 单位&#xff1a;阿里云智能机器学习平台PAI & 达摩院自然语言基础技术 概述 GPT模型能较好的处理文本生成领域的各种任务&#xff0c;比如文本补全&#xff0c;自由问答&am…

scala 流计算之 aggregate()

函数参数详解 def aggregate[B](z: >B)(seqop: (B, A) > B, combop: (B, B) > B): BB: 函数返回结果的数据类型&#xff1b;z&#xff1a;聚类前的参数的初始化值&#xff1b;seqop&#xff1a;是用于序列运算的运算符&#xff0c;用于计算所述集合中每个元素的总和&a…

JAVA环境安装及配置

目录 一、前言 二、JAVA下载及安装配置 1、下载SDK开发包 2、安装SDK 3、环境变量配置 一、前言 大学毕业前系统学习过JAVA&#xff0c;记得当时还是1.6版本&#xff0c;并且特意研读了我人生中第一本最厚的图书《JAVA学习笔记》。掌握了那时比较流行的框架SSH&#xff0c;…

场景编程集锦 - 趣谈验证码

1. 场景描述 或许是近年来电话推销机器人太泛滥了&#xff0c;常常搞得正常的电话销售“灰头土脸”。有人为了验证对方究竟是人还是机器&#xff0c;竟想出来各种各样的奇葩手段。最近一小伙接到了一个汽车推销电话&#xff0c;但他听声音无法判断对方是不是人工客服人员。尽管…

大数据导论笔记

视频课林子雨老师 大数据导论 网页笔记预习大数据导论 大数据导论复习笔记 一、大数据概述 1.数据的概念、类型和组织形式 数据概念 数据类型 &#xff08;1&#xff09;数据基本类型 数据类型包括文本&#xff0c;图片&#xff0c;音频&#xff0c;视频等 数据组织形式 2…

【UE4 第一人称射击游戏】30-简单的任务提示功能

上一篇&#xff1a;UE4 第一人称射击游戏】29-流畅的枪械移动本篇效果&#xff1a;到达指定位置后&#xff0c;右上角会出现新的任务提示信息步骤&#xff1a;打开“ThirdPersonCharacter”&#xff0c;添加一个string类型变量默认值设为“Progress Through The Level”打开“F…

.Net Core实现健康检查

ASP.NET Core 提供运行状况检查中间件和库&#xff0c;以用于报告应用基础结构组件的运行状况。运行状况检查由应用程序作为 HTTP 终结点公开。可以为各种实时监视方案配置运行状况检查终结点&#xff1a; 运行状况探测可以由容器业务流程协调程和负载均衡器用于检查应用的状态…

MySQL 8.0 多实例安装

规划&#xff1a;主要就是data目录和port 端口以及socket 文件路径的差异管理&#xff1a; 配置文件准备 mkdir -p /data/330{6..8}/data chown -R mysql.mysql /data/* cat > /data/3306/my.cnf <<EOF [mysqld] usermysql basedir/usr/local/mysql datadir/data/3306…