通过手写简易版RPC理解RPC原理

news2024/11/17 17:53:52

RPC是什么


所谓的RPC其实是为了不同主机的两个进程间通信而产生的,通常不同的主机之间的进程通信,程序编写需要考虑到网络通信的功能,这样程序的编写将会变得复杂。RPC就来解决这一问题的,一台主机上的进程对另外一台主机的进程发起请求时,内核会将请求转交给RPC client,RPC client经过报文的封装转交给目标主机的RPC server,RPC server就将报文进行解析,还原成正常的请求,转交给目标主机上的目标进程。在我们看来在就像是在同一台主机上的两个进程通信一样,完全没有意识到是在不同的主机上。因此RPC其实也可以看做是一种协议或者是编程框架,目的是为了简化分布式程序的编写。

RPC基本流程

大致过程如下:

Rpc Client通过传入的IP、端口号、调用类以及方法的参数,通过动态代理找到具体的调用类的方法,将请求的类、方法序列化,传输到服务端;

当Rpc Service收到请求以后,将传入类和方法反序列化,通过反射找到对应的类的方法进行调用,最后将返回结果进行序列化,返回给客户端;

Rpc Client收到返回值以后,进行反序列化,最后将结果展示;

DEMO

假如我现在有一个HelloService服务,有一个say的方法,如下:

package com.cjian.rpc.provider;

/**
 * @Author: cjian
 * @Date: 2023/6/21 10:24
 * @Des:
 */
public interface HelloService {
    String say(String name);
}

实现类如下:

package com.cjian.rpc.provider;

/**
 * @Author: cjian
 * @Date: 2023/6/21 10:25
 * @Des:
 */
public class HelloServiceImpl implements HelloService {
    @Override
    public String say(String name) {
        return "您好, " + name;
    }
}

现在另一个服务需要调用say方法,如何使用rpc思想实现呢?

模拟注册中心:

package com.cjian.rpc.registercenter;

import java.io.IOException;

/**
 * @Author: cjian
 * @Date: 2023/6/21 10:25
 * @Des:
 */
public interface Server {
    void stop();

    void start() throws IOException;

    void register(Class serviceInterface, Class impl);

    boolean isRunning();

    int getPort();
}
package com.cjian.rpc.registercenter;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @Author: cjian
 * @Date: 2023/6/21 10:26
 * @Des:
 */
public class ServiceCenter implements Server {
    private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    private static final HashMap<String, Class> serviceRegistryCenter = new HashMap<String, Class>();

    private static boolean isRunning = false;

    private static int port;

    public ServiceCenter(int port) {
        this.port = port;
    }

    public void stop() {
        isRunning = false;
        executor.shutdown();
    }

    public void start() throws IOException {
        ServerSocket server = new ServerSocket();
        server.bind(new InetSocketAddress(port));
        System.out.println("start server");
        try {
            while (true) {
                // 1.监听客户端的TCP连接,接到TCP连接后将其封装成task,由线程池执行
                executor.execute(new ServiceTask(server.accept()));
            }
        } finally {
            server.close();
        }
    }

    public void register(Class serviceInterface, Class impl) {
        serviceRegistryCenter.put(serviceInterface.getName(), impl);
    }

    public boolean isRunning() {
        return isRunning;
    }

    public int getPort() {
        return port;
    }

    private static class ServiceTask implements Runnable {
        Socket client = null;

        public ServiceTask(Socket client) {
            this.client = client;
        }

        public void run() {
            ObjectInputStream input = null;
            ObjectOutputStream output = null;
            try {
                // 2.将客户端发送的码流反序列化成对象,反射调用服务实现者,获取执行结果
                input = new ObjectInputStream(client.getInputStream());
                String serviceName = input.readUTF();
                String methodName = input.readUTF();
                Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                Object[] arguments = (Object[]) input.readObject();
                Class serviceClass = serviceRegistryCenter.get(serviceName);
                if (serviceClass == null) {
                    throw new ClassNotFoundException(serviceName + " not found");
                }
                Method method = serviceClass.getMethod(methodName, parameterTypes);
                Object result = method.invoke(serviceClass.newInstance(), arguments);

                // 3.将执行结果反序列化,通过socket发送给客户端
                output = new ObjectOutputStream(client.getOutputStream());
                output.writeObject(result);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (output != null) {
                    try {
                        output.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (input != null) {
                    try {
                        input.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (client != null) {
                    try {
                        client.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

客户端:

package com.cjian.rpc.client;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * @Author: cjian
 * @Date: 2023/6/21 10:32
 * @Des:
 */
public class RPCClient<T> {
    public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress addr) {
        // 1.将本地的接口调用转换成JDK的动态代理,在动态代理中实现接口的远程调用
        return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface},
                new InvocationHandler() {
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        Socket socket = null;
                        ObjectOutputStream output = null;
                        ObjectInputStream input = null;
                        try {
                            // 2.创建Socket客户端,根据指定地址连接远程服务提供者
                            socket = new Socket();
                            socket.connect(addr);

                            // 3.将远程服务调用所需的接口类、方法名、参数列表等编码后发送给服务提供者
                            output = new ObjectOutputStream(socket.getOutputStream());
                            output.writeUTF(serviceInterface.getName());
                            output.writeUTF(method.getName());
                            output.writeObject(method.getParameterTypes());
                            output.writeObject(args);

                            // 4.同步阻塞等待服务器返回应答,获取应答后返回
                            input = new ObjectInputStream(socket.getInputStream());
                            return input.readObject();
                        } finally {
                            if (socket != null) socket.close();
                            if (output != null) output.close();
                            if (input != null) input.close();
                        }
                    }
                });
    }
}

测试:

package com.cjian.rpc;

import com.cjian.rpc.client.RPCClient;
import com.cjian.rpc.provider.HelloService;
import com.cjian.rpc.provider.HelloServiceImpl;
import com.cjian.rpc.registercenter.Server;
import com.cjian.rpc.registercenter.ServiceCenter;

import java.io.IOException;
import java.net.InetSocketAddress;

/**
 * @Author: cjian
 * @Date: 2023/6/21 10:33
 * @Des:
 */
public class Test {
    public static void main(String[] args) {
        // 模拟注册中心
        new Thread(() -> {
            try {
                Server serviceServer = new ServiceCenter(8088);
                // 将服务注册到注册中心
                serviceServer.register(HelloService.class, HelloServiceImpl.class);
                // 开启注册中心
                serviceServer.start();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
        // 客户端远程调用
        HelloService service = RPCClient.getRemoteProxyObj(HelloService.class, new InetSocketAddress("localhost", 8088));
        System.out.println(service.say("test"));
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1423351.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【2024程序员必看】鸿蒙应用开发行业分析

鸿蒙操作系统沉浸四年&#xff0c;这次终于迎来了破局的机会&#xff0c;自从2023年华为秋季发布会上宣布鸿蒙 Next操作系统不在兼容Android后&#xff0c;就有不少大厂开始陆续与华为达成了鸿蒙原生应用的开发合作&#xff0c;据1月18日华为官方宣布110多天的产业合力“突进”…

log4j2 无垃圾稳态日志 Garbage-free Steady State Logging

无垃圾稳态日志 垃圾收集暂停是导致延迟峰值的常见原因&#xff0c;对于许多系统来说&#xff0c;需要花费大量精力来控制这些暂停。 许多日志库&#xff0c;包括以前版本的Log4j&#xff0c;在稳态日志记录期间分配临时对象&#xff0c;如日志事件对象、字符串、char数组、字…

RocksDB是如何实现存算分离的

核心参考文献&#xff1a; Dong, S., P, S. S., Pan, S., Ananthabhotla, A., Ekambaram, D., Sharma, A., Dayal, S., Parikh, N. V., Jin, Y., Kim, A., Patil, S., Zhuang, J., Dunster, S., Mahajan, A., Chelluri, A., Datye, C., Santana, L. V., Garg, N., & Gawde,…

基于YOLOv7算法的高精度实时安全帽和背心目标检测系统(PyTorch+Pyside6+YOLOv7)

摘要&#xff1a;基于YOLOv7算法的高精度实时安全帽和背心目标检测系统可用于日常生活中检测与定位安全帽和安全背心&#xff0c;此系统可完成对输入图片、视频、文件夹以及摄像头方式的目标检测与识别&#xff0c;同时本系统还支持检测结果可视化与导出。本系统采用YOLOv7目标…

B样条基函数

​定义&#xff1a;令U{u0,u1,…,um}是一个单调不减的实数序列&#xff0c;即ui≤ui1&#xff0c;i0&#xff0c;1&#xff0c;…&#xff0c;m-1。其中&#xff0c;ui称为节点&#xff0c;U称为节点矢量&#xff0c;用Ni,p(u)表示第i个p次&#xff08;p1阶&#xff09;B样条基…

短视频界的变革者:上海 AI lab 发布 Vlogger,几句话生成分钟级视频

现如今&#xff0c;vlog 已经成为我们日常生活的重要组成部分。无论是看视频学习休闲、记录珍贵瞬间还是分享生活见闻&#xff0c;视频已经成为人们表达创意和观点的独特媒介。 然而&#xff0c;与几秒钟的短视频不同&#xff0c;要创作出引人入胜、生动有趣的长视频&#xff…

十分钟学会用springboot制作微信小程序富文本编辑器

1.1 富文本模型设计 在构建富文本编辑器系统时&#xff0c;首先需要设计一个合适的富文本模型。 CREATE TABLE IF NOT EXISTS rich_texts (id INT PRIMARY KEY AUTO_INCREMENT,title VARCHAR(255),content TEXT,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP );这个表包括…

一键转换MOV至MP3:轻松删除原视频,释放存储空间!

你是否曾经有一个MOV格式的视频文件&#xff0c;想要提取其中的音频却苦于没有合适的工具&#xff1f;现在&#xff0c;有了我们的全新视频剪辑工具&#xff0c;这个烦恼全部消失&#xff01;我们为你提供一键式解决方案&#xff0c;将MOV视频文件快速转换为MP3音频格式。 首先…

PyNest 一个可以搭建微服务的 Python 包

PyNest 在构建 Python API 和微服务方面崭露头角&#xff0c;解决了 FastAPI 中存在的关键问题&#xff0c;因此成为卓越的框架。凭借其模块化的架构和先进的特性&#xff0c;PyNest 在 2024 年及以后有望成为 Python 开发者的首选选择。 随着 Python 生态系统的不断成熟&…

YAYI-UIE: 一个用于通用信息提取的聊天增强的指令微调框架

1、写作动机&#xff1a; 最近的研究提出了基于大型语言模型的方法&#xff0c;以统一地建模不同的信息提取任务。然而&#xff0c;这些现有方法在处理英语以外的中文语言的信息提取能力方面存在不足。 2、主要贡献&#xff1a; 提出了YAYI-UIE&#xff0c;一个端到端的聊天…

Prometheus+grafana配置监控系统

使用docker compose安装 方便拓展, 配置信息都放在在 /docker/prometheus 目录下 1.目录结构如下 . ├── conf │ └── prometheus.yml ├── grafana_data ├── prometheus_data └── prometheus_grafana.yaml2.创建目录文件 mkdir /docker/prometheus &&am…

Java面试题之 IO(四)

Java面试题之 IO&#xff08;四&#xff09; 文章目录 Java面试题之 IO&#xff08;四&#xff09;随机访问流 文章来自Java Guide 用于学习如有侵权&#xff0c;立即删除 随机访问流 这里要介绍的随机访问流指的是支持随意跳转到文件的任意位置进行读写的 RandomAccessFile 。…

C# Onnx yolov8 仪表指针检测

目录 效果 模型信息 项目 代码 训练数据 下载 C# Onnx yolov8 仪表指针检测 效果 模型信息 Model Properties ------------------------- date&#xff1a;2024-01-31T11:19:38.828556 author&#xff1a;Ultralytics task&#xff1a;detect license&#xff1a;AGPL-…

C++ 哈希 开放定址法

哈希算法 哈希&#xff0c;是一种算法思想吗&#xff0c;它的核心是映射&#xff0c;哈希方法中使用的转换函数称为哈希(散列)函数&#xff0c;构造出来的结构称为哈希表(Hash Table)(或者称散列表) 在STL 中&#xff0c;提供了两个使用哈希底层实现的容器 unordered_set 和 …

西瓜书读书笔记整理(十二) —— 第十二章 计算学习理论(下)

第十二章 计算学习理论&#xff08;下&#xff09; 12.4 VC 维&#xff08;Vapnik-Chervonenkis dimension&#xff09;12.4.1 什么是 VC 维12.4.2 增长函数&#xff08;growth function&#xff09;、对分&#xff08;dichotomy&#xff09;和打散&#xff08;shattering&…

Python算法题集_合并区间

本文为Python算法题集之一的代码示例 题目56&#xff1a;合并区间 说明&#xff1a;以数组 intervals 表示若干个区间的集合&#xff0c;其中单个区间为 intervals[i] [starti, endi] 。请你合并所有重叠的区间&#xff0c;并返回 一个不重叠的区间数组&#xff0c;该数组需…

CANoe实际项目中文件夹的规划

本人&#xff0c;之前设计了一个CANoe工程&#xff0c;由于工程设计之初没有设计好文档的归纳分类&#xff0c;导致文件查找起来非常费劲。 为了避免以后出现文件混乱&#xff0c;不可查找的问题&#xff0c;故特此归纳说明。 建立工程时&#xff1a; 第1步就应该设计好文档…

品牌定位传播之道:公关、广告与定位原则的结合

​在当今商业环境中&#xff0c;品牌传播的重要性日益凸显。一个成功的品牌传播策略不仅能提升品牌知名度和美誉度&#xff0c;还能在消费者心智中建立稳固的地位。本文将深入探讨公关、广告和定位原则在品牌传播中的作用&#xff0c;以及迅腾文化如何助力品牌传播价值。 一、…

miniReact<一>

一、工程化配置 1.1 目录结构 1.1.1 Multi-repo VS Mono-repo Multi-repo 每个库有自己独立的仓库&#xff0c;逻辑清晰&#xff0c;协同管理复杂 Mono-repo 很方便管理不同独立的库的生命周期&#xff0c;会有更高的操作复杂度 项目有很多包&#xff0c;同时管理多个不同的…

iOS开发Xcode中的ld64和-ld_classic是什么意思

在iOS应用程序开发中&#xff0c;Xcode是一款广泛使用的集成开发环境&#xff08;IDE&#xff09;&#xff0c;而链接器是构建应用程序的关键组成部分之一。在Xcode中&#xff0c;我们常常会遇到两个重要的概念&#xff1a;ld64和-ld_classic。它们分别代表了默认链接器和经典链…