手写RPC框架,与Spring整合,基于Netty作为网络框架,protobuf作为序列化协议。可以和实际项目相结合完美运行

news2025/1/13 2:50:11

注:由于RPC框架过于庞大所以本篇文章只是作为阅读RPC源码的一个指导,设计精巧之处还需要各位读者结合源码进行实践

RPC源码地址:https://github.com/xhpcd/rpc

git clone: https://github.com/xhpcd/rpc.git

如果觉得有收获麻烦留下一颗start

先看效果

项目分为两部分一部分时rpc包下的用来实现rpc服务端和客户端逻辑,一部分是rpc-demo作为框架的测试,api是公共接口,公用的部分

服务端

@RpcService(interfaceClass = OrderService.class)
public class OrderServiceImpl implements OrderService {

    @Override
    public String getOrder(String userId, String orderId) {
        return "user: "+userId+"orderId:"+orderId;
    }
}

服务端就是见到那一个springboot项目引入了rpc-server

客户端

@RestController
@RequestMapping("/order")
public class OrderController {


    @RpcRemote
    private OrderService orderService;


    @GetMapping("/getOrder")
    public String getOrder(String userId,String orderId){
        return orderService.getOrder(userId,orderId);
    }

}

使用自定义注解然后整合springboot同时引入rpc-client,接着就可以远程调用这里的getOrder接口

效果

先从服务架构演变开始

单体架构是最初的软件设计模式,所有的功能模块都集中在一个单独的应用程序中。这种架构的优点包括:

 单体架构的设计和部署相对简单,易于理解和维护。

 所有的功能模块集中在一起,方便统一管理和测试。

 由于模块间调用的开销较小,单体架构通常具有较好的性能。

但是,单体架构也存在一些缺点:

当应用程序规模增大时,单体架构会变得难以扩展和维护。

 各功能模块之间存在较强的耦合,不利于技术栈的灵活选择。

 对单体应用的任何变更都需要重新部署整个应用程序。

就算是水平扩展依然解决不了的问题就是模块热点问题,一个模块比较热点会导致其它模块受到性能的影响,因为毕竟是部署到同一台服务器

紧接着出现了垂直架构

了解决单体架构的问题,垂直架构应运而生。垂直架构将应用程序划分为多个相对独立的服务,每个服务都专注于特定的功能领域。这种架构的优点包括:

 各个服务可以独立扩展,提高了应用程序的整体可扩展性。

 每个服务可以选择最合适的技术栈,提高了技术选型的灵活性。

 只需部署变更的服务,而不需要重新部署整个应用程序。

但是,垂直架构也存在一些缺点:

各个服务之间需要通过网络进行通信,引入了额外的复杂性。

 服务之间的依赖关系需要仔细管理,以确保应用程序的正常运行。

分布式环境下,监控和故障排查变得更加复杂。

为了进一步解决垂直架构的问题,分布式架构应运而生。分布式架构将应用程序拆分为更细粒度的服务,每个服务都可以独立部署和扩展。这种架构的优点包括:

每个服务可以独立扩展,提高了应用程序的整体可扩展性。

每个服务可以选择最合适的技术栈,提高了技术选型的灵活性。

只需部署变更的服务,而不需要重新部署整个应用程序。

服务之间的故障可以相互隔离,提高了应用程序的可靠性。

但是,分布式架构也存在一些缺点:

服务之间需要通过网络进行通信,引入了额外的复杂性。

服务之间的依赖关系需要仔细管理,以确保应用程序的正常运行。

分布式环境下,监控和故障排查变得更加复杂。

微服务架构

微服务架构是分布式架构的一种特殊形式,它将应用程序拆分为更小、更专注的服务。这种架构的优点包括:

各个服务之间高度解耦,易于独立开发、部署和扩展。

每个服务可以选择最合适的技术栈,提高了技术选型的灵活性。

各个服务可以独立扩展,提高了应用程序的整体可扩展性。

 服务之间的故障可以相互隔离,提高了应用程序的可靠性。

RPC 作为一种常见的服务间通信机制,在从单体架构到微服务架构的演化过程中发挥了重要作用。RPC 在从单体架构到微服务架构的演化过程中,一直在解决服务间通信、服务发现和服务调用透明性等关键问题,为不同架构模式下的服务治理提供了重要支持。

正文:

RPC是远程调用的缩写,它使得客户端像调用本地方法一样去调用远端的接口

其本质就是通过把参数调用的接口等信息通过网络传递给远端服务,然后接受响应。

既然是远程调用就需要保证调用可靠

在分布式环境下,服务提供者的地址和端口信息可能会发生变化,服务消费者需要能够动态发现可用的服务实例。Zookeeper 提供了一个服务注册中心,服务提供者可以将自己的信息注册到 Zookeeper 上,服务消费者可以通过查询 Zookeeper 来发现可用的服务。这解决了服务发现的问题。

当有多个服务提供者实例时,RPC 系统需要能够将请求合理地分配到不同的实例上,以实现负载均衡。Zookeeper 可以根据服务实例的负载情况,为服务消费者提供一个负载均衡的服务实例列表。

RPC 系统通常需要一些全局性的配置信息,如超时时间、重试次数等。Zookeeper 可以作为配置中心,存储这些配置信息,服务提供者和消费者可以动态地从 Zookeeper 获取配置,避免了配置信息的硬编码。

通过zookeeper我们可以实现服务的协调管理

接下来要解决的就是网络通信的问题,本例子采用的是基于TCP协议的通信而不是基于Http的,基于TCP的通信我们就需要自己实现通信过程。所以选用的是Java高性能网络框架Netty(之前文章对其高性能做了分析)。

我们解决了网络和服务协调的问题后,我们还要指定一些规则来解决tcp流式传输的问题粘包粘包的问题,以及通信信息的格式。粘包问题有许多种解决方法这里就采用比较通用简单的方式

消息长度头:在每个数据包的头部添加一个长度字段,用于表示数据包的长度,接收方可以根据长度字段来判断每个数据包的结束位置。

接下来就要解决java数据在网络传输的过程了,其实可以采用java自带的序列化方法,但是相比较来说还是有点冗余了,所以采用google的protobuf序列化。

通用部分

为了通用我们把RPC过程需要的请求响应进行封装

public class RpcRequest {
    private String requestId;
    private String className;
    private String methodName;
    private Class<?>[] parameterTypes;
    private Object[] parameters;
}

public class RpcResponse {
    private String requestId;
    private Object result;
    private Throwable cause;

    public boolean hasError(){
        return cause!=null;
    }
}

我们首先先来完成rpc服务端

服务端实现类

@RpcService(interfaceClass = OrderService.class)
public class OrderServiceImpl implements OrderService {

    @Override
    public String getOrder(String userId, String orderId) {
        return "user: "+userId+"orderId:"+orderId;
    }
}

首先先要完成的就是服务的注册所,谓服务的注册就是来向zk提供自己用于远程服务的信息

RpcService是我们的注解,这个注解的特殊之处在于其上整合了@Component注解可以作为bean被spring管理,然后同时也指明了intefaceClass作为zk的服务注册信息

作为服务提供类的实现我们想把其注入的容器中,我们同时还需要能在调用时获得到此类

首先我们封装了一个spring工具类利用Aware机制注入spring容器

@Component("springBeanFactory")
public class SpringBeanFactory implements ApplicationContextAware {
    private static ApplicationContext context;
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.context = applicationContext;
    }

    public static <T> T getBean(Class<T> cls){
        return context.getBean(cls);
    }

    public static Object getBean(String name){
        return context.getBean(name);
    }

    public static Map<String,Object> getBeanByAnnotation(Class<? extends Annotation> annotaionClass){
        return context.getBeansWithAnnotation(annotaionClass);
    }
}
public class ZkRegistry implements RpcRegistry {

    @Autowired
    private ServerZKit serverZKit;

    @Autowired
    private RpcServerConfiguration rpcServerConfiguration;

    @Autowired
    private RpcServer rpcServer;
    @Override
    public void serviceRegistry() {
        Map<String, Object> beanByAnnotation = SpringBeanFactory.getBeanByAnnotation(RpcService.class);
        if(beanByAnnotation!=null&&!beanByAnnotation.isEmpty()) {
            //根节点的创建
            serverZKit.createRootNode();
            //ip获取
            String serverIp = IpUtils.getRealIp();
            for (Map.Entry<String, Object> entry : beanByAnnotation.entrySet()) {
                RpcService annotation = entry.getValue().getClass().getAnnotation(RpcService.class);
                Class<?> interfaceClass = annotation.interfaceClass();
                //服务名称
                String name = interfaceClass.getName();
                serverZKit.createPersistentNode(name);
                String providerNode = serverIp+":"+rpcServerConfiguration.getRpcPort();
                serverZKit.createNode(name+"/"+providerNode);
                log.info("服务{}-{}完成了注册",name,providerNode);
            }
            rpcServer.start();
        }
    }
}

这里我们完成了向zk存储信息接下来我们就来看一下整合spring部分

首先通过工具类获通过自定义注解得到spring容器中的用于提供服务的实现类,紧接着获得其接口信息,并把接口信息和端口号等信息(来自配置文件)封装作为节点名称保存在zk中

紧接着我们来看服务端通信关键部分

   protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) throws Exception {

        RpcResponse rpcResponse = new RpcResponse();
        rpcResponse.setRequestId(rpcRequest.getRequestId());
        try {
            String className = rpcRequest.getClassName();
            String methodName = rpcRequest.getMethodName();
            Object[] parameters = rpcRequest.getParameters();
            Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
            //通过spring容器获取实现类
            Object bean = SpringBeanFactory.getBean(Class.forName(className));
            Method method = bean.getClass().getMethod(methodName, parameterTypes);
            Object result = method.invoke(bean,parameters);
            rpcResponse.setResult(result);
        } catch (Exception e){
            rpcResponse.setCause(e);
            log.error("RpcRequestHandler service has error");
        }finally {
            channelHandlerContext.channel().writeAndFlush(rpcResponse);
        }
    }

通过请求传递的接口,参数,参数类型等信息获得到我们注入到spring容器中的服务提供bean,然后通过反射调用对应方法进而得到结果进行封装,至于和netty的结合源码中有详细代码本文只做辅助阅读。

接下来我们来看客户端代理

作为代理我们需要在其字段上加上注解

@Documented
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcRemote {
    String value() default "";

    Class<?> interfaceClass() default void.class;
    
}

作为我们的注入依据,为了和spring整合我们利用了spring bean的生命周期

@Component
public class RpcAnnotationProcessor implements BeanPostProcessor {
    @Autowired
    private ProxyFactory proxyFactory;
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        Class<?> aClass = bean.getClass();
        Field[] declaredFields = aClass.getDeclaredFields();
        for (Field declaredField : declaredFields) {
            RpcRemote annotation = declaredField.getAnnotation(RpcRemote.class);
            if(annotation != null){
                declaredField.setAccessible(true);
                Class<?> type = declaredField.getType();
                Object o = proxyFactory.newProxyInstance(type);
                try {

                    declaredField.set(bean,o);
                } catch (IllegalAccessException e) {
                    log.error("filed {} inject field",declaredField);
                     throw new RuntimeException(e);
                }
            }

        }
        return bean;
    }
}

我们通过遍历每一个bean 然后去遍历其所有的字段查找被RpcRemote标志的。然后通过cglib动态代理技术(之前文章有详细讲述)生成代理类封装了通信过程。然后利用反射进行了属性的注入

  public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {

        log.info("method:{} 执行代理调用",method.getName());
        RpcRequest request = RpcRequest.builder().parameters(objects).parameterTypes(method.getParameterTypes())
                .className(method.getDeclaringClass().getName())
                .methodName(method.getName())
                .requestId(UUID.randomUUID().toString()).build();

        RpcRequestManager rpcRequestManager = SpringBeanFactory.getBean(RpcRequestManager.class);
        RpcResponse response = rpcRequestManager.sendRequest(request);
        if(response.hasError()){
            throw response.getCause();
        }
        return response.getResult() ;

    }

把请求封装然后通过Netty进行传输。在进行传输之前我们要面对一个问题就是传输给谁。

我们此时可以利用zk获得到远程服务提供者的ip和端口。这时我们就需要进行服务的发现,同时还要监听节点,因为服务端可能出现一些服务器上下线的变化,然后我们把信息缓存起来

   public void rpcServerDiscovery() {

        List<String> service = clientZKit.getService();
        for (String s : service) {
            List<ServiceProvider> serviceInfos = clientZKit.getServiceInfos(s);
            serviceProviderCache.put(s,serviceInfos);
            clientZKit.subscribeZKNode(s);
            log.info("client subscribe {},services{}",s,serviceInfos);
        }
    }
    public void subscribeZKNode(String name){
        String node = configuration.getZkRoot() + "/" + name;
        zkClient.subscribeChildChanges(node, new IZkChildListener() {
            @Override
            public void handleChildChange(String s, List<String> list) throws Exception {
                if(!CollectionUtils.isEmpty(list)){
                    List<ServiceProvider> serviceProviders = convertToProviderService(s, list);
                    serviceProviderCache.update(s,serviceProviders);
                }
            }
        });
    }

这时拿到了服务提供者,我们在面对众多提供者的时候又有问题了那就是如何选择有许多种方法比如说轮询,随机,hash等我在代码中都有实现可以对看一下。

接下来看一下Netty如何建立连接进行通讯

 Channel channel;
        if(!RpcHolder.channelExist(serviceProvider.getServerIp(),serviceProvider.getRcpPort())){
            NioEventLoopGroup group = new NioEventLoopGroup(NettyRuntime.availableProcessors()*2);
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast("FrameDecoder",new FrameDecoder());
                            pipeline.addLast("RpcResponseDecoder",new RpcResponseDecoder());
                            pipeline.addLast("FrameEncoder",new FrameEncoder());
                            pipeline.addLast("RpcRequestEncoder",new RpcRequestEncoder());
                            pipeline.addLast("RpcResponseHandler",new RpcResponseHandler());
                        }
                    });
            try {
                ChannelFuture future = bootstrap.connect(new InetSocketAddress(serviceProvider.getServerIp(), serviceProvider.getRcpPort())).sync();
                if (future.isSuccess()) {
                    channel = future.channel();
                    RpcHolder.setChannelMapping(new ChannelMapping(serviceProvider.getServerIp(), serviceProvider.getRcpPort(), channel));
                }
            }catch (Exception e){
                group.shutdownGracefully();
            }
        }


        try {

            ChannelMapping channelMapping = RpcHolder.getChannelMapping(serviceProvider.getServerIp(), serviceProvider.getRcpPort());
            channel = channelMapping.getChannel();
            channel.writeAndFlush(request);
                RequestPromise requestPromise = new RequestPromise(channel.eventLoop());
                RpcHolder.set(request.getRequestId(),requestPromise);
                RpcResponse rpcResponse = (RpcResponse) requestPromise.get();
                return rpcResponse;

        }catch (Exception e){
            e.printStackTrace();
        }
        return new RpcResponse();
    }

这里就涉及到一个线程间通信问题了,Netty有自己的线程,所以为了共享结果我们使用了promise,但是由于Netty可能有许多人调用所以我们根据请求Id和对应的promise进行了hash存储,这样就可以在众多并发中找到对应的promise。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1959382.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用 Easysearch 打造企业内部知识问答系统

大家可能都有这样的经历&#xff0c;刚入职一家企业时&#xff0c;同事往往会给你分享一些文档资料&#xff0c;有可能是产品信息、规章制度等等。这些文档有的过于冗长&#xff0c;很难第一时间找到想要的内容。有的已经有了新版本&#xff0c;但员工使用的还是老版本。 基于…

centos7-8/redhat7-8一键安装配置vsftp服务

1.脚本介绍 1.1.介绍&#xff1a; linux下一键安装及配置vsftpd服务 &#xff0c;通过执行install.sh脚本&#xff0c;脚本会根据参数区域的值执行安装和配置vsftp服务&#xff0c;安装后会创建一个默认ftp用户wangxf密码wangxf2023 1、支持自定义安装(更改脚本内参数值) 2、…

javaweb_01:http

一、什么是http HTTP 是一个简单的请求-响应协议&#xff0c;它通常运行在TCP之上&#xff0c;它指定了客户端可能发送给服务器什么样的消息&#xff0c;以及得到什么样的响应。请求和响应都是以ASCll码的形式给出&#xff1b;而消息内容则具有一个类似MIME的格式。这个简单模…

面试Redis篇

本篇主要总结一下面试官可能会在Redis上询问的主要问题。 Redis的使用场景 问&#xff1a;你的项目中哪些场景中用到了Redis&#xff1f; 答&#xff1a;根据你的项目回答&#xff0c;一般会在一下几个部分缓存、分布式锁...... 缓存 缓存穿透 查询一个不存在的数据&#xff…

黑马点评--给店铺类型查询添加缓存

controller/ShopTypeController.java /*** 店铺分类查询&#xff0c;用于展示首页头部店铺分类* return*/GetMapping("list")public Result queryTypeList() {return typeService.queryList();} service/IShopTypeService.java Result queryList(); service/impl/S…

4234324

作者主页&#xff1a; 作者主页 本篇博客专栏&#xff1a;C 创作时间 &#xff1a;2024年6月20日 最后&#xff1a; 十分感谢你可以耐着性子把它读完和我可以坚持写到这里&#xff0c;送几句话&#xff0c;对你&#xff0c;也对我&#xff1a; 1.一个冷知识&#xff1a; …

alg-in-go-1:动态连通性问题

前言&#xff1a; 有本算法书叫&#xff1a;Algorithms 4th Edition.pdf&#xff0c;它是用java实现的&#xff0c;但是算法的内核是一样&#xff0c;不在乎于语言&#xff0c;考虑到java当今的…, 咱们尝试用golang学习算法. 问题&#xff1a; 思考&#x1f914;&#xff…

弹幕背后:B站UP主创作服务解析

引言 在B站&#xff0c;每一条飘过的弹幕都是一个故事的碎片&#xff0c;它们汇聚成一幅幅生动的社交画卷。这里&#xff0c;不仅仅是一个视频分享平台&#xff0c;弹幕背后更是一个充满活力的创作者生态系统。B站以其独特的弹幕文化&#xff0c;为创作者和观众之间搭建起了一座…

【电控笔记-xuan】各种估测器扰动估计性能比较

各种扰动观测器观测结果 蓝色: 扰动值 隆博戈估测器扰动补偿 论文53disturb扰动补偿 2order eso 观测

《系统架构设计师教程(第2版)》第13章-层次式架构设计理论与实践-01-层次式体系结构概述

文章目录 1. 常用层次是架构2. 层次式架构设计的注意点2.1 污水池反模式2.2 应用变得庞大 本章教材又赘述了一遍架构的定义和层次架构风格的概述&#xff0c;我之前的笔记都写了 架构的定义回看《第7章-系统架构设计基础知识-01-软件架构&#xff08;Software Architecture&…

AD的问题

连续放置同规则元件&#xff1a;先选择再按Tab编辑放置&#xff1b; 拖动元件&#xff1a;&#xff08;shift 空格 &#xff1a;旋转元件&#xff1b;原理图中按x水平&#xff0c;按y垂直翻转&#xff09;按ctrl键可以丝滑流畅放置 测试距离&#xff1a;RM 距离单位转…

初学Mybatis之动态 SQL

动态 SQL 是指根据不同的条件生成不同的 SQL 语句 动态 SQL 详情请看链接 搭建环境&#xff1a; mysql 建立博客表 CREATE TABLE blog(id VARCHAR(50) NOT NULL COMMENT 博客id,title VARCHAR(100) NOT NULL COMMENT 博客标题,author VARCHAR(30) NOT NULL COMMENT 博客作者…

华为od机试真题:悄悄话(Python)

2024华为OD机试&#xff08;C卷D卷&#xff09;最新题库【超值优惠】Java/Python/C合集 题目描述 给定一个二叉树&#xff0c;每个节点上站一个人&#xff0c;节点数字表示父节点到该节点传递悄悄话需要花费的时间。 初始时&#xff0c;根节点所在位置的人有一个悄悄话想要传…

windows家庭版安装Hyper-V

uniapp开发鸿蒙需要开启Hyper-V&#xff0c;但家庭版默认没有&#xff0c;去网上搜索整理了一下。 1.检查是否安装过Hyper-V 直接搜索 Hyper-V&#xff0c;如果出现就代表有&#xff0c;如果没出现&#xff0c;就搜索 启用或关闭windows功能 。 如果有Hyper-V这一项&…

eclipse手动安装Ivy插件

1、下载四个文件 &#xff08;1&#xff09;从这个网址选择一个自己需要的版本的“ivy-”开头的文件夹进去&#xff08;是“ivy”开头&#xff0c;不是“ivyde”&#xff09; https://archive.apache.org/dist/ant/ivyde/updatesite/ 我这里选的是“ivy-2.5.0.final_201910201…

TortoiseSVN迁移到本地git

TortoiseSVN迁移到本地git 文章目录 TortoiseSVN迁移到本地git0 背景1 环境准备2 SVN库迁移到VisualSVN2.1 导出dump2.2 将dump文件灌入VisualSVN2.3 获取SVN仓最新URL 3 迁移到Git库中4 迁移分支到Git库 0 背景 之前在前东家工作都是采用git进行项目管理&#xff0c;高效便捷…

大模型应用中常听说的投毒实验是什么?

大模型应用中常听说的投毒实验是什么&#xff1f; 大模型投毒实验是指在训练或使用大规模人工智能模型&#xff08;如GPT-4等&#xff09;时&#xff0c;通过有意加入恶意数据或修改训练过程&#xff0c;使模型产生不正确或有害输出的行为。随着人工智能技术的快速发展&#x…

【深度学习】声码器(Vocoder),Vocos 论文

Vocos: Closing the gap between time-domain and Fourier-based neural vocoders for high-quality audio synthesis https://arxiv.org/abs/2306.00814 https://github.com/gemelo-ai/vocos?tabreadme-ov-file 文章目录 Vocos&#xff1a;弥合时域和基于傅里叶的神经声码器…

必看!50个ChatGPT顶尖学术论文指令,助你高效学术研究

随着人工智能技术的进步&#xff0c;AI已成为学术创作的重要工具。本文将为您展示如何利用AI来润色您的论文。我们精心整理了50个顶级ChatGPT学术论文指令&#xff0c;强烈建议您加以利用&#xff01; 这些指令不仅实用&#xff0c;还能大幅提升您的写作效率。无论是翻译难懂的…

高效的知识付费SaaS平台构建:探索Spring Cloud结合Spring Boot的最佳实践

知识付费平台&#xff1a;引领在线教育的未来 在数字化教育的浪潮中&#xff0c;知识付费平台以其便捷、高效的学习方式&#xff0c;迅速成为教育领域的新宠。该平台围绕用户需求构建&#xff0c;提供职业技能、生活兴趣、人文社科等多领域的专业知识&#xff0c;并通过视频播…