聊聊分布式架构——RPC通信原理

news2025/1/19 20:30:39

目录

RPC通信的基本原理

RPC结构

手撸简陋版RPC

知识点梳理

1.Socket套接字通信机制

2.通信过程的序列化与反序列化

3.动态代理

4.反射

思维流程梳理

码起来

服务端时序图

服务端—Api与Provider模块

客户端时序图


RPC通信的基本原理

RPC(Remote Procedure Call)是一种远程过程调用协议,用于在分布式系统中进行远程通信,允许一个计算机程序调用另一个地址空间(通常是在不同的机器上)的函数或过程,就像调用本地函数一样。下面是RPC通信的基本原理:

  1. 客户端调用:远程客户端(调用方)希望调用远程服务器(提供方)上的一个或多个远程过程(函数)。客户端在本地创建一个请求,并指定要调用的远程过程的名称以及传递给该过程的参数。

  2. 参数序列化:在将请求发送到远程服务器之前,客户端需要将参数序列化为字节流或其他适合传输的格式。序列化是将数据转换为可以在网络上传输的形式的过程。

  3. 网络传输:客户端通过网络将请求发送到远程服务器。通常,这涉及到将请求数据打包成网络消息,然后通过网络协议(如HTTP、TCP/IP)将消息发送到服务器的地址。

  4. 服务器接收:远程服务器接收到客户端的请求消息,通常通过网络协议(如HTTP服务器、Socket服务器)监听特定的端口。

  5. 参数反序列化:服务器从请求消息中提取参数数据,并将其反序列化为本地数据结构,以便将其传递给远程过程。

  6. 远程过程调用:服务器调用相应的远程过程,将参数传递给该过程并执行相应的操作。远程过程可以位于服务器的本地代码中或远程服务器上的远程服务中。

  7. 结果序列化:远程过程执行完毕后,服务器将结果序列化为字节流或其他适合传输的格式。

  8. 结果传输:服务器通过网络将结果数据打包成响应消息,并将其发送回客户端。

  9. 客户端接收:客户端接收到服务器的响应消息。

  10. 结果反序列化:客户端从响应消息中提取结果数据,并将其反序列化为本地数据结构。

  11. 客户端处理:客户端可以根据远程过程的执行结果采取相应的行动,可能是继续执行本地代码或返回结果给调用方。

  12. 通信完成:一次RPC调用完成后,客户端和服务器之间的通信过程结束。

RPC结构

  1. 客户端模块代理所有远程方法的调用

  2. 将目标服务、目标方法、调用目标方法的参数等必要信息序列化

  3. 序列化之后的数据包进一步压缩,压缩后的数据包通过网络通信传输到目标服务节点

  4. 服务节点将接受到的数据包进行解压

  5. 解压后的数据包反序列化成目标服务、目标方法、目标方法的调用参数

  6. 通过服务端代理调用目标方法获取结果,结果同样需要序列化、压缩然后回传给客户端

手撸简陋版RPC

总觉得文字描述太干了,有点咽不下去,还是手撸下试试吧。毕竟艾瑞莉娅的奶奶总说:

纸上得来终觉浅,绝知此事要躬行。

知识点梳理
1.Socket套接字通信机制

在Java中,Socket是用于网络通信的基础类之一,它提供了一种机制,通过该机制,计算机程序可以在网络上建立连接、发送数据、接收数据和关闭连接。

  • 服务器套接字(ServerSocket)

    • 服务器套接字用于在服务器端监听并接受客户端的连接请求。

    • 使用以下步骤创建和使用服务器套接字:

      • 创建ServerSocket对象,并绑定到一个特定的端口号。

      • 使用ServerSocket对象的accept()方法来等待客户端的连接请求,并接受连接。

      • 一旦接受连接,可以创建新的Socket对象来处理客户端的通信。

      • 完成通信后,关闭SocketServerSocket连接。

    ServerSocket serverSocket = new ServerSocket(port_number);
    Socket clientSocket = serverSocket.accept(); // 等待连接
    InputStream inputStream = clientSocket.getInputStream();
    OutputStream outputStream = clientSocket.getOutputStream();
    // 使用输入流和输出流进行数据读写
    clientSocket.close();
    serverSocket.close();
    ​
  • 客户端套接字(Socket)

    • 客户端套接字用于连接到远程服务器,发送请求并接收响应。

    • 使用以下步骤创建和使用客户端套接字:

      • 创建Socket对象,指定远程服务器的主机名或IP地址以及端口号。

      • 使用Socket对象的输入流和输出流来进行数据的读取和写入。

      • 完成通信后,关闭Socket连接。

    Socket socket = new Socket("server_hostname", port_number);
    InputStream inputStream = socket.getInputStream();
    OutputStream outputStream = socket.getOutputStream();
    // 使用输入流和输出流进行数据读写
    socket.close();
2.通信过程的序列化与反序列化

在Java中通过 JDK 提供了 Java 对象的序列化方式实现对象序列化传输,主要通过输出流java.io.ObjectOutputStream和输入流java.io.ObjectInputStream来实现;

java.io.ObjectOutputStream:表示对象输出流 , 它的 writeObject(Object obj)方法可以对参数指定的 obj 对象进行序列化,把得到的字节序列写到一个目标输出流中;

java.io.ObjectInputStream:表示对象输入流 ,它的 readObject()方法源输入流中读取字节序列,再把它们反序列化成为一个对象,并将其返回;

3.动态代理

动态代理是在运行时动态生成代理类的方式。Java提供了java.lang.reflect.Proxy类和java.lang.reflect.InvocationHandler接口来实现动态代理。JDK中提供了基于接口动态代理的方法:

        // 创建代理对象
         MyInterface proxyObject = (MyInterface) Proxy.newProxyInstance(
                MyInterface.class.getClassLoader(),
                new Class[]{MyInterface.class},
                new MyInvocationHandler(realObject));

使用动态代理的目的:

  1. 透明远程调用: 动态代理可以隐藏底层的远程调用细节,使得远程过程调用看起来像本地方法调用一样简单。客户端无需关心网络通信、数据序列化和反序列化等细节,因为这些都由代理对象处理。

  2. 减少重复性代码: 使用动态代理可以减少编写和维护远程调用代码的工作量。代理类负责处理通用的远程调用逻辑,开发者只需关注具体的业务逻辑。

  3. 集中管理远程调用逻辑: 动态代理将远程调用逻辑集中在一个地方,这样可以更容易地管理和维护,例如添加统一的错误处理、日志记录或性能监控等功能。

4.反射

在RPC(Remote Procedure Call,远程过程调用)中使用反射的主要目的是实现透明的远程调用,即使在客户端和服务器之间存在远程分离,也可以像调用本地方法一样调用远程服务。反射在RPC中的具体目的和用途如下:

  1. 动态代理生成代理对象: 反射机制允许在运行时生成代理对象,这些代理对象可以代替实际的远程服务对象执行方法调用。这样,客户端代码不需要提前知道要调用的具体远程方法和对象,而可以动态生成代理并执行方法。

  2. 动态识别方法和参数: 反射允许在运行时识别远程方法和方法参数的名称、类型和数量。这对于将方法调用信息打包成请求并传递给远程服务器非常有用,服务器可以根据这些信息正确地解析请求并调用相应的方法。

  3. 动态序列化和反序列化: 反射可以用于动态地序列化请求和响应数据。在RPC中,请求和响应数据通常需要以某种格式进行序列化和反序列化,以便在网络上进行传输。反射可以帮助动态识别数据类型,将数据转换为适当的格式,并在服务器端进行反序列化。

思维流程梳理

码起来

假设需求是客户端想要访问服务端(ip:prot/helloService/sayHello)上的helloService调用sayHello()。先定义服务端的实现。

服务端时序图

服务端—Api与Provider模块

服务端作为服务提供者,自然需要具备常规的业务模块:Api与Provider模块

Api模块定义简单的HelloService接口,包含两个方法sayHello(String content)和saveUSer(User user)

public interface IHelloService {
    String sayHello(String content);
    String saveUSer(User user);
}

定义一个简单对象User类

public class User {
    private String name;
    private int age;
​
    // getter和setter
    public String getName() {
        return name;
    }
​
    public void setName(String name) {
        this.name = name;
    }
​
    public int getAge() {
        return age;
    }
​
    public void setAge(int age) {
        this.age = age;
    }
​
    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}

Provider模块定义HelloService接口的实现类HelloServiceImpl(rpc-server-provider模块的pom中需要添加rpc-server-api模块的依赖)

public class HelloServiceImpl implements IHelloService {
    @Override
    public String sayHello(String content) {
        System.out.println("request in sayHello:" + content);
        return "Say hello:" + content;
    }
​
    @Override
    public String saveUSer(User user) {
        System.out.println("request in saveUser:" + user);
        return "Save user success";
    }
}

定义RpcServerProxy通过代理的方式使用Socket对外暴露服务接口

public class RpcServerProxy {
    private ExecutorService executorService = Executors.newCachedThreadPool();
​
    public void publisher(Object service, int port) {
        ServerSocket serverSocket = null;
        try {
          serverSocket = new ServerSocket(port);
          while (true) {
              Socket socket = serverSocket.accept(); // 使用accept()等待客户端连接,接收请求
              executorService.execute(new ProcessorHandler(socket, service)); // 每一个socket交给一个processorHandler处理
          }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // 关闭资源,jdk1.7后提供了try-with可以自动关闭
            if (serverSocket != null) {
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

需要定义一个线程ProcessorHandler对每次的socket请求进行处理

public class ProcessorHandler implements Runnable{
​
    private Socket socket;
    private Object service;
​
    public ProcessorHandler(Socket socket, Object service) {
        this.socket = socket;
        this.service = service;
    }
​
​
    @Override
    public void run() {
        // 使用ObjectOutputStream和ObjectInputStream配合socket的输入流输出流进行序列化和反序列化
        ObjectOutputStream objectOutputStream = null;
        ObjectInputStream objectInputStream = null;
        try {
            objectInputStream = new ObjectInputStream(socket.getInputStream());
​
            // 客户端传过来的信息:请求哪个类,哪个方法,方法的参数  ——>  封装为RpcRequest类
            RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject(); // 拿到客户端通信传递的请求类
            Object result = invoke(rpcRequest); // 反射调用本地服务
​
            objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            objectOutputStream.writeObject(result);
            objectOutputStream.flush(); // 手动将缓冲区中的数据强制刷新到输出流中,以确保数据被立即写入底层的输出流。
​
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } finally {
            // 关闭关联资源,jdk1.7后提供了try-with可以自动关闭
            if (objectInputStream != null) {
                try {
                    objectInputStream.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            if (objectOutputStream != null) {
                try {
                    objectOutputStream.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
​
    private Object invoke(RpcRequest rpcRequest) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        // 反射调用
        Object[] args = rpcRequest.getParameters(); // 获取客户端传递的RpcRequest中的参数
        Class<?>[] types = new Class[args.length]; // 获得每个参数的类型
        for (int i = 0; i < args.length; i++) {
            types[i] = args.getClass();
        }
        Class clazz = Class.forName(rpcRequest.getClassName()); // 根据请求的类名反射加载类
        Method method = clazz.getMethod(rpcRequest.getMethodName(), types); // 获取类中的方法
        Object result = method.invoke(service, args); // 进行反射调用方法
        return result;
    }
}

定义一个通用的RpcRequest类参与通信过程中的请求处理,这里是需要序列化的

public class RpcRequest implements Serializable {
    private String className;
    private String methodName;
    private Object[] parameters;
    
    // getter and setter
    public String getClassName() {
        return className;
    }
​
    public void setClassName(String className) {
        this.className = className;
    }
​
    public String getMethodName() {
        return methodName;
    }
​
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }
​
    public Object[] getParameters() {
        return parameters;
    }
​
    public void setParameters(Object[] parameters) {
        this.parameters = parameters;
    }
}

Provider的app中发布下服务,运行没有报错,服务端OK

public class App 
{
    public static void main( String[] args )
    {
        IHelloService helloService = new HelloServiceImpl();
        RpcServerProxy rpcServerProxy = new RpcServerProxy();
        rpcServerProxy.publisher(helloService, 8080); // 把服务发布出去
    }
}

客户端时序图

定义动态代理RpcClientProxy,JDK的接口代理方式就是一句话:

public class RpcClientProxy {
    public <T> T clientProxy(final Class<T> interfaceCls, final String host, final int port) {
       return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class<?>[]{interfaceCls}, new RemoteInvocationHandler(host, port));
    }
}

定义被代理接口RemoteInvocationHandler

public class RemoteInvocationHandler implements InvocationHandler {
    private String host;
    private int port;
​
    public RemoteInvocationHandler(String host, int port) {
        this.host = host;
        this.port = port;
    }
​
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // 客户端请求会进入这里进行包装
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setClassName(method.getDeclaringClass().getName());
        rpcRequest.setMethodName(method.getName());
        rpcRequest.setParameters(args);
        // 远程通信交给RpcNetTransport
        RpcNetTransport rpcNetTransport = new RpcNetTransport(host, port);
        Object result = rpcNetTransport.send(rpcRequest);
        return result;
    }
}

定义通信传输类RpcNetTransport

public class RpcNetTransport {
    private String host;
    private int port;
​
    public RpcNetTransport(String host, int port) {
        this.host = host;
        this.port = port;
    }
​
    public Object send(RpcRequest rpcRequest) {
        Socket socket = null;
        Object result = null;
        ObjectInputStream objectInputStream = null;
        ObjectOutputStream objectOutputStream = null;
        try {
            socket = new Socket(host, port); // 建立连接
​
            objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); // 客户端通信写入
            objectOutputStream.writeObject(rpcRequest);
            objectOutputStream.flush();
​
            objectInputStream = new ObjectInputStream(socket.getInputStream()); // 客户端通信输出
            result = objectInputStream.readObject();
            return result;
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } finally {
            // 关闭关联资源
            if (objectInputStream != null) {
                try {
                    objectInputStream.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            if (objectOutputStream != null) {
                try {
                    objectOutputStream.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        return result;
    }
}

客户端请求远程服务

public class App 
{
    public static void main( String[] args )
    {
        RpcClientProxy rpcClientProxy = new RpcClientProxy();
        IHelloService helloService = rpcClientProxy.clientProxy(IHelloService.class, "localhost", 8080);
        String result = helloService.sayHello("Elaine");
        System.out.println(result);
    }
}

请求成功,客户端获得返回结果

服务端打印了请求日志

到这里,一个简陋且粗糙的RPC通信版本就好了。

别灰心,要记住艾瑞莉娅的奶奶总说

路漫漫其修选兮,吾将上下而求索。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1064367.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【算法练习Day13】二叉树的层序遍历翻转二叉树对称二叉树

​&#x1f4dd;个人主页&#xff1a;Sherry的成长之路 &#x1f3e0;学习社区&#xff1a;Sherry的成长之路&#xff08;个人社区&#xff09; &#x1f4d6;专栏链接&#xff1a;练题 &#x1f3af;长路漫漫浩浩&#xff0c;万事皆有期待 文章目录 二叉树的层序遍历翻转二叉树…

安装Ubuntu提示:系统找不到指定的文件。

今天我删除Ubuntu后重新下载&#xff0c;发现报错&#xff0c;错误信息如下&#xff1a; 这是因为系统没有卸载干净而导致的。 解决办法&#xff1a; 第一步&#xff1a; ##查询当前已安装的系统 wsl.exe --list --all 执行结果&#xff1a; 第二步&#xff1a; ##注销当前…

【GSEP202303 C++】1级 长方形面积

[GSEP202303 一级] 长方形面积 题目描述 小明刚刚学习了如何计算长方形面积。他发现&#xff0c;如果一个长方形的长和宽都是整数&#xff0c;它的面积一定也是整数。现在&#xff0c;小明想知道如果给定长方形的面积&#xff0c;有多少种可能的长方形&#xff0c;满足长和宽…

BF算法详解(JAVA语言实现)

目录 BF算法的介绍 图解 JAVA语言实现 BF算法的时间复杂度 BF算法的介绍 BF算法&#xff0c;即暴力(Brute Force)算法&#xff0c;是普通的模式匹配算法&#xff0c;BF算法的思想就是将目标串S的第一个字符与模式串T的第一个字符进行匹配&#xff0c;若相等&#xff0c;则继…

C++设计模式-桥接(Bridge)

目录 C设计模式-桥接&#xff08;Bridge&#xff09; 一、意图 二、适用性 三、结构 四、参与者 五、代码 C设计模式-桥接&#xff08;Bridge&#xff09; 一、意图 将抽象部分与它的实现部分分离&#xff0c;使它们都可以独立地变化。 二、适用性 你不希望在抽象和它…

[笔记] Microsoft Windows网络编程《三》网际协议

文章目录 前言3.1 IPv43.1.1 寻址3.1.1.1 单播3.1.1.2 多播(组播)3.1.1.3 广播 3.1.2 IPv4 管理协议&#xff08;ARP&#xff0c;ICMP&#xff0c;IGMP&#xff09;ARPICMPIGMP 3.1.3 Winsock 中的IPv4 寻址 3.2 IPv63.2.1 寻址3.2.1.1 单播链接——本地地址站点——本地地址&a…

ipa文件怎么把应用上架到苹果ios系统下载的App Store商城

注册为苹果开发者&#xff1a;首先&#xff0c;您需要注册为苹果开发者。前往苹果开发者网站&#xff08;https://developer.apple.com/&#xff09;&#xff0c;点击"Enroll"按钮&#xff0c;并按照相关步骤注册和付费&#xff08;开发者账号需要年度费用&#xff0…

【Java 进阶篇】使用 JDBCTemplate 执行 DQL 语句详解

在前面的文章中&#xff0c;我们已经学习了如何使用 Spring 的 JDBCTemplate 执行 DML&#xff08;Data Manipulation Language&#xff09;操作&#xff0c;包括插入、更新和删除操作。现在&#xff0c;让我们来深入了解如何使用 JDBCTemplate 执行 DQL&#xff08;Data Query…

SpringCloud Alibaba - Seata 四种分布式事务解决方案(TCC、Saga)+ 实践部署(下)

目录 一、Seata 分布式解决方案 1.1、TCC 模式 1.1.1、TCC 模式理论 对比 TCC 和 AT 模式的一致性和隔离性 TC 的工作模型 1.2.2、TCC 模式优缺点 1.2.3、TCC 模式注意事项&#xff1a;空回滚 1.2.4、TCC 模式注意事项&#xff1a;业务悬挂 1.2.5、实现 TCC 模式 案例…

MySQL数据库基础回顾与复习一

MySQL数据库 一、原理定义概念 定义 数据库(Database)是按照数据结构来组织、存储和管理数据的建立在计算机存储设备上的仓库 数据库是长期储存在计算机内、有组织的、可共享的数据集合 分类&#xff1a; &#xff08;1&#xff09;非结构化数据&#xff1a; 数据相对来讲没…

Spring Cloud Gateway网关中各个过滤器的作用与介绍

文章目录 1. Route To Request URL Filter&#xff08;路由过滤器&#xff09;2. Gateway Filter&#xff08;全局过滤器&#xff09;3. Pre Filter&#xff08;前置过滤器&#xff09;4. Post Filter&#xff08;后置过滤器&#xff09;5. Error Filter&#xff08;错误过滤器…

【刷题笔记10.6】LeetCode:汉明距离

LeetCode&#xff1a;汉明距离 一、题目描述 两个整数之间的汉明距离是指这两个数字对应二进制位不同的位置的数目。 给你两个整数x 和 y&#xff0c;计算并返回他们之间的汉明距离。 二、分析及代码实现 对于汉明距离问题我们其实可以将其转换为&#xff1a;计算x 和 y按…

U盘作为启动盘安装苹果OS X操作系统

如何制作 macOS USB启动盘&#xff1f;如何创建可引导的 macOS 安装器&#xff1f;接下来就为大家带来可引导的苹果电脑 macOS 系统U盘启动盘制作教程。U盘是我们在工作和生活中的好帮手&#xff0c;能储存和传递数据文件&#xff0c;重要的是&#xff0c;U盘还可以制作成苹果电…

leetcode - 365周赛

一&#xff0c;2873.有序三元组中的最大值 I ​ 该题的数据范围小&#xff0c;直接遍历&#xff1a; class Solution {public long maximumTripletValue(int[] nums) {int n nums.length;long ans 0;for(int i0; i<n-2; i){for(int ji1; j<n-1; j){for(int kj1; k<…

矩阵键盘的扫描原理与基础应用

基础知识 原理图 首先需要先将 J5 跳帽放到1和2之间。 表示选择的是矩阵键盘。 简化原理图 扫描原理&#xff1a; 以左上角按键为例。 先向 R1 输出低电平&#xff0c;向 R2&#xff0c;R3&#xff0c;R4 输出高电平。 再然后向 C1&#xff0c;C2&#xff0c;C3&#xff…

在Linux中软链接和硬链接的区别是什么?

2023年10月6日&#xff0c;周五晚上 目录 软链接(SymbolicLink):硬链接(HardLink):区别: 软链接(SymbolicLink): 软链接本身只是一个指向其他文件或目录的指针,不占用任何磁盘空间。软链接的修改或删除不会影响原文件。软链接可以指向不同文件系统中的文件。 硬链接(HardLink…

Cookie和Session详解以及结合生成登录效果

目录 引言 1.Cookie中的数据从哪来数据长啥样&#xff1f; 2.Cookie有什么作用&#xff1f; 3.cookie与session的工作关联&#xff1f; 4.Cookie到哪去&#xff1f; 5.Cookie如何存&#xff1f; 6.Session 7.Cookie与Session的关联与区别 8.通过代码理解 8.1 相关代码 8.2…

c++学习之 继承的方式

在C中&#xff0c;继承方式&#xff08;或继承访问权限&#xff09;有三种&#xff1a;public、protected 和 private&#xff0c;它们决定了派生类&#xff08;子类&#xff09;对基类&#xff08;父类&#xff09;成员的访问权限&#xff0c;它们之间的区别如下&#xff1a; …

局部放电发生因素与局部放电试验的重要性

局部放电发生的几个因素&#xff1a;   ①电场过于集中于某点&#xff1b;   ②固体介质有气泡&#xff0c;有害杂质未除净&#xff1b;   ③油中含水、含气、有悬浮微粒&#xff1b;   ④不同的介质组合中&#xff0c;在界面处有严重的电场畸变。   局部放电试验的重…

【小工具-生成合并文件】使用python实现2个excel文件根据主键合并生成csv文件

1 小工具说明 1.1 功能说明 一般来说&#xff0c;我们会先有一个老的文件&#xff0c;这个文件内容是定制好相关列的表格&#xff0c;作为每天的报告。 当下一天来的时候&#xff0c;需要根据新的报表文件和昨天的报表文件做一个合并&#xff0c;合并的时候就会出现有些事新增…