手撕RPC——实现简单的RPC调用

news2025/1/10 18:23:38

手撕RPC——实现简单的RPC调用

  • 一、场景设计
  • 二、设计思路
    • 2.1 客户端的设计
    • 2.2 服务端的设计
    • 2.3 通信设计
  • 三、代码实现
    • 3.1 定义用户信息
    • 3.2 用户服务接口
    • 3.3 用户服务接口实现
    • 3.4 定义消息格式
    • 3.5 实现动态代理类
    • 3.6 封装信息传输类
    • 3.7 定义服务端Server接口
    • 3.8 实现RpcServer接口
    • 3.9 实现WorkThread类
    • 3.10 实现本地服务存放器
    • 3.11 客户端主程序
    • 3.12 服务端主程序

一、场景设计

现在A,B位于不同的服务器上,但现在A想调用B的某个方法,如何实现呢?

服务端B
有一个用户表

  1. UserService 里有一个功能:getUserByUserId(Integer id)
  2. UserServiceImpl 实现了UserService接口和方法

客户端A
调用getUserByUserId方法, 内部传一个Id给服务端,服务端查询到User对象返回给客户端

如何实现以上调用过程呢?

二、设计思路

主要考虑客户端、服务端、以及双方如何通信才能实现此功能

2.1 客户端的设计

  1. 调用getUserByUserId方法时,内部将调用信息处理后发送给服务端B,告诉B我要获取User
  2. 外部调用方法,内部进行其它的处理——这种场景我们可以使用动态代理的方式,改写原本方法的处理逻辑

2.2 服务端的设计

  1. 监听到A的请求后,接收A的调用信息,并根据信息得到A想调用的服务与方法
  2. 根据信息找到对应的服务,进行调用后将结果发送回给A

2.3 通信设计

  1. 使用Java的socket网络编程进行通信
  2. 为了方便A ,B之间 对接收的消息进行处理,我们需要将请求信息和返回信息封装成统一的消息格式

三、代码实现

在此部分我们将理论转化为代码,分别实现客户端和服务端。

项目目录结构
在这里插入图片描述

3.1 定义用户信息

@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
    // 客户端和服务端共有的
    private Integer id;
    private String userName;
    private Boolean sex;
}

3.2 用户服务接口

public interface UserService {
    // 客户端通过这个接口调用服务端的实现类
    User getUserByUserId(Integer id);
    //新增一个功能
    Integer insertUserId(User user);
}

3.3 用户服务接口实现

public class UserServiceImpl implements UserService {
    @Override
    public User getUserByUserId(Integer id) {
        System.out.println("客户端查询了"+id+"的用户");
        // 模拟从数据库中取用户的行为
        Random random = new Random();
        User user = User.builder().userName(UUID.randomUUID().toString())
                .id(id)
                .sex(random.nextBoolean()).build();
        return user;
    }

    @Override
    public Integer insertUserId(User user) {
        System.out.println("插入数据成功"+user.getUserName());
        return user.getId();
    }
}

3.4 定义消息格式

//定义请求信息格式RpcRequest
@Data
@Builder
public class RpcRequest implements Serializable {
    //服务类名,客户端只知道接口
    private String interfaceName;
    //调用的方法名
    private String methodName;
    //参数列表
    private Object[] params;
    //参数类型
    private Class<?>[] paramsType;
}//定义返回信息格式RpcResponse(类似http格式)
@Data
@Builder
public class RpcResponse implements Serializable {
    //状态码
    private int code;
    //状态信息
    private String message;
    //具体数据
    private Object data;
    //构造成功信息
    public static RpcResponse sussess(Object data){
        return RpcResponse.builder().code(200).data(data).build();
    }
    //构造失败信息
    public static RpcResponse fail(){
        return RpcResponse.builder().code(500).message("服务器发生错误").build();
    }
}

3.5 实现动态代理类

@AllArgsConstructor
public class ClientProxy implements InvocationHandler {
    //传入参数service接口的class对象,反射封装成一个request
    private String host;
    private int port;

    //jdk动态代理,每一次代理对象调用方法,都会经过此方法增强(反射获取request对象,socket发送到服务端)
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        //构建request
        RpcRequest request=RpcRequest.builder()
                .interfaceName(method.getDeclaringClass().getName())
                .methodName(method.getName())
                .params(args).paramsType(method.getParameterTypes()).build();
        //IOClient.sendRequest 和服务端进行数据传输
        RpcResponse response= IOClient.sendRequest(host,port,request);
        return response.getData();
    }
     public <T>T getProxy(Class<T> clazz){
        Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
        return (T)o;
    }
}

3.6 封装信息传输类

public class IOClient {
    //这里负责底层与服务端的通信,发送request,返回response
    public static RpcResponse sendRequest(String host, int port, RpcRequest request){
        try {
            Socket socket=new Socket(host, port);
            ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
            ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());

            oos.writeObject(request);
            oos.flush();

            RpcResponse response=(RpcResponse) ois.readObject();
            return response;
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
            return null;
        }
    }
}

3.7 定义服务端Server接口

public interface RpcServer {
    //开启监听
    void start(int port);
    void stop();
}

3.8 实现RpcServer接口

@AllArgsConstructor
public class SimpleRPCRPCServer implements RpcServer {
    private ServiceProvider serviceProvide;
    @Override
    public void start(int port) {
        try {
            ServerSocket serverSocket=new ServerSocket(port);
            System.out.println("服务器启动了");
            while (true) {
                //如果没有连接,会堵塞在这里
                Socket socket = serverSocket.accept();
                //有连接,创建一个新的线程执行处理
                new Thread(new WorkThread(socket,serviceProvide)).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void stop() {
    }
}

3.9 实现WorkThread类

WorkThread类负责启动线程和客户端进行数据传输,WorkThread类中的getResponse方法负责解析收到的request信息,寻找服务进行调用并返回结果。

@AllArgsConstructor
public class WorkThread implements Runnable{
    private Socket socket;
    private ServiceProvider serviceProvide;
    @Override
    public void run() {
        try {
            ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
            ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());
            //读取客户端传过来的request
            RpcRequest rpcRequest = (RpcRequest) ois.readObject();
            //反射调用服务方法获取返回值
            RpcResponse rpcResponse=getResponse(rpcRequest);
            //向客户端写入response
            oos.writeObject(rpcResponse);
            oos.flush();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
    private RpcResponse getResponse(RpcRequest rpcRequest){
        //得到服务名
        String interfaceName=rpcRequest.getInterfaceName();
        //得到服务端相应服务实现类
        Object service = serviceProvide.getService(interfaceName);
        //反射调用方法
        Method method=null;
        try {
            method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
            Object invoke=method.invoke(service,rpcRequest.getParams());
            return RpcResponse.sussess(invoke);
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            e.printStackTrace();
            System.out.println("方法执行错误");
            return RpcResponse.fail();
        }
    }
}

3.10 实现本地服务存放器

因为一个服务器会有多个服务,所以需要设置一个本地服务存放器serviceProvider存放服务,在接收到服务端的request信息之后,我们在本地服务存放器找到需要的服务,通过反射调用方法,得到结果并返回

//本地服务存放器
public class ServiceProvider {
    //集合中存放服务的实例
    private Map<String,Object> interfaceProvider;

    public ServiceProvider(){
        this.interfaceProvider=new HashMap<>();
    }
    //本地注册服务
    public void provideServiceInterface(Object service){
        String serviceName=service.getClass().getName();
        Class<?>[] interfaceName=service.getClass().getInterfaces();

        for (Class<?> clazz:interfaceName){
            interfaceProvider.put(clazz.getName(),service);
        }

    }
    //获取服务实例
    public Object getService(String interfaceName){
        return interfaceProvider.get(interfaceName);
    }
}

3.11 客户端主程序

public class TestClient {
    public static void main(String[] args) {
        ClientProxy clientProxy=new ClientProxy("127.0.0.1",9999);
        UserService proxy=clientProxy.getProxy(UserService.class);

        User user = proxy.getUserByUserId(1);
        System.out.println("从服务端得到的user="+user.toString());

        User u=User.builder().id(100).userName("wxx").sex(true).build();
        Integer id = proxy.insertUserId(u);
        System.out.println("向服务端插入user的id"+id);
    }
}

3.12 服务端主程序

public class TestServer {
    public static void main(String[] args) {
        UserService userService=new UserServiceImpl();

        ServiceProvider serviceProvider=new ServiceProvider();
        serviceProvider.provideServiceInterface(userService);

        RpcServer rpcServer=new SimpleRPCRPCServer(serviceProvider);
        rpcServer.start(9999);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1854859.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【ATU Book - i.MX8系列 - OS】NXP i.MX Linux Desktop (Ubuntu) BSP 开发环境架设

一、概述 谈论嵌入式系统的开发环境&#xff0c;不得不提起近年来相当实用的 Yocto 建构工具。此工具拥有极为灵活的平台扩展性&#xff0c;广泛的软体套件与社群支持、多平台支援整合性&#xff0c;能够满足开发者特定需求和多种热门的嵌入式系统架设&#xff0c;已成为当今顶…

css如何动态累计数字?

导读&#xff1a;css如何动态累计数字&#xff1f;用于章节目录的序列数生成&#xff0c;用css的计数器实现起来比 js方式更简单&#xff01; 伪元素 ::after ::before伪元素设置content 可以在元素的首部和尾部添加内容&#xff0c;我们要在元素的首部添加序列号&#xff0c…

Cesium如何高性能的实现上万条道路的流光穿梭效果

大家好&#xff0c;我是日拱一卒的攻城师不浪&#xff0c;专注可视化、数字孪生、前端、nodejs、AI学习、GIS等学习沉淀&#xff0c;这是2024年输出的第20/100篇文章&#xff1b; 前言 在智慧城市的项目中&#xff0c;经常会碰到这样一个需求&#xff1a;领导要求将全市的道路…

支持WebDav的网盘infiniCloud(静读天下,Zotero 等挂载)

前言 WebDav是一种基于HTTP的协议&#xff0c;允许用户在Web上直接编辑和管理文件&#xff0c;如复制、移动、删除等。 尽管有一些网盘支持WebDav&#xff0c;但其中大部分都有较多的使用限制。这些限制可能包括&#xff1a;上传文件的大小限制、存储空间的限制、下载速度的限…

借助AI快速提高英语听力:如何获得适合自己的听力材料?

英语听力是英语学习中的一个重要组成部分&#xff0c;它对于提高语言理解和交流能力至关重要。可理解性学习&#xff08;comprehensible input&#xff09;是语言习得理论中的一个概念&#xff0c;由语言学家Stephen Krashen提出&#xff0c;指的是学习者在理解语言输入的同时&…

残差网络中的基础结构——残差模块

残差网络的思想 随着网络深度的增加&#xff0c;网络能获取的信息量随之增加&#xff0c;而且提取的特征更加丰富。然而在残差结构提出之前&#xff0c;实验证明&#xff0c;随着网络层数的增加&#xff0c;模型的准确率起初会不断提高&#xff0c;直至达到最大饱和值。然后&a…

194.回溯算法:组合总和||(力扣)

代码解决 class Solution { public:vector<int> res; // 当前组合的临时存储vector<vector<int>> result; // 存储所有符合条件的组合// 回溯函数void backtracing(vector<int>& candidates, int target, int flag, int index, vector<bool>…

不需要new关键字创建实例?jQuery是如何做到的

这篇文章是jQuery源码专栏的开篇文章了&#xff0c;有人会问为什么都2024年了&#xff0c; 还要研究一个已经过时的框架呢&#xff0c;其实&#xff0c;jQuery对比vue和react这种响应式框架&#xff0c;其在使用上算是过时的&#xff0c;毕竟直接操作DOM远不如操作虚拟DOM来的方…

力扣SQL50 游戏玩法分析 IV 子查询

Problem: 550. 游戏玩法分析 IV &#x1f468;‍&#x1f3eb; 参考题解 这个SQL查询的目的是计算每个玩家在登录后的第二天参与活动的比例。查询使用了子查询和左连接来实现这一目的。下面是查询的详细解释&#xff0c;包括每个部分的作用和注释&#xff1a; -- 计算每个玩…

LLm与微调入门

前言 两种 Finetune 范式 增量预训练微调 使用场景&#xff1a;让基座模型学习到一些新知识&#xff0c;如某个垂类领域的常识 训练数据&#xff1a;文章、书籍、代码等 指令跟随微调 使用场景&#xff1a;让模型学会对话模板&#xff0c;根据人类指令进行对话 训练数据…

C++第二学期期末考试选择题题库(qlu题库,自用)

又到了期末周&#xff0c;突击一下c吧— 第一次实验 1、已知学生记录的定义为&#xff1a; struct student { int no; char name[20]; char sex; struct 注意年月日都是结构体&#xff0c;不是student里面的 { int year; int month; …

数据分析BI仪表盘搭建

BI仪表盘搭建六个原则&#xff1a; 1.仪表盘搭建符合业务的阅读&#xff0c;思考和操作逻辑。 2.明确仪表盘主题&#xff0c;你的用户对什么感兴趣。 普通业务人员&#xff1a;销售&#xff1a;注册&#xff0c;激活&#xff0c;成交投放&#xff1a;消耗&#xff0c;转化率…

构建下一代数据解决方案:SingleStore、MinIO 和现代 Datalake 堆栈

SingleStore 是专为数据密集型工作负载而设计的云原生数据库。它是一个分布式关系 SQL 数据库管理系统&#xff0c;支持 ANSI SQL&#xff0c;并因其在数据引入、事务处理和查询处理方面的速度而受到认可。SingleStore 可以存储关系、JSON、图形和时间序列数据&#xff0c;以满…

Java面试八股之简述JVM内存结构

简述JVM内存结构 Java虚拟机&#xff08;JVM&#xff09;内存结构主要分为线程私有区域和线程共享区域两大部分&#xff0c;具体组成部分如下&#xff1a; 线程私有区域 程序计数器&#xff08;Program Counter Register&#xff09;&#xff1a; 记录当前线程执行的字节码行…

24-6-23-读书笔记(七)-《文稿拾零》豪尔赫·路易斯·博尔赫斯(第三辑)

文章目录 《文稿拾零》阅读笔记记录总结 《文稿拾零》 《文稿拾零》超厚的一本书&#xff08;570&#xff09;&#xff0c;看得时间比较长&#xff0c;这本书是作者零散时间写的一些关于文学性质的笔记&#xff0c;读起来还是比较无趣的&#xff0c;非常零散&#xff0c;虽然有…

CP AUTOSAR标准之FlashTest(AUTOSAR_CP_SWS_FlashTest)(更新中……)

1 简介和功能概述 该规范指定了AUTOSAR基础软件模块Flash测试驱动程序的功能、API和配置。   此闪存测试模块提供测试恒定内存的算法。恒定内存可以是数据/程序闪存、程序SRAM、锁定缓存,可以嵌入微控制器中,也可以通过内存映射连接到微控制器。为简化起见,SW模块称为闪存…

秋招突击——第六弹——Java的SSN框架快速入门——MyBatisPlus

文章目录 引言正文入门案例整和MybatisPlus的相关内容 概述标准数据层开发分页查询DQL编程控制条件查询——NULL值处理 查询投影查询条件设定等于操作范围查询模糊查询分组查询 字段映射和表名映射 DML编程控制——增删改查相关操作添加操作id生成策略控制 删除操作多数据删除逻…

面试:关于word2vec的相关知识点Hierarchical Softmax和NegativeSampling

1、为什么需要Hierarchical Softmax和Negative Sampling 从输入层到隐含层需要一个维度为NK的权重矩阵&#xff0c;从隐含层到输出层又需要一个维度为KN的权重矩阵&#xff0c;学习权重可以用反向传播算法实现&#xff0c;每次迭代时将权重沿梯度更优的方向进行一小步更新。但…

Qt画实时曲线图

Qt引入QcustomPlot 首先下载QcustomPlot源代码&#xff0c;https://github.com/qcustomplot/qcustomplot 下载zip文件 运行所下载的项目生成库文件libqcustomplotd2.a文件和qcustomplotd2.dll文件。 在项目中添加printsupport。 并将qcustomplot.h文件和qcustomplot.cpp文…

RMDA通信1:通信过程和优势,以太网socket为何用户空间拷贝到内核空间

视频分享&#xff1a; 1.1 RDMA基本原理和优势&#xff0c;以太网socket通信为什么要用户空间拷贝到内核空间_哔哩哔哩_bilibili 一、以太网socket通信 1.1 以太网socket通信过程 1、发送端发起一次通信操作&#xff0c;数据由用户空间拷贝到内核空间。拷贝由CPU完成&#x…