聊聊分布式架构04——RPC通信原理

news2024/11/24 12:01:31

目录

RPC通信的基本原理

RPC结构

手撸简陋版RPC

知识点梳理

1.Socket套接字通信机制

2.通信过程的序列化与反序列化

3.动态代理

4.反射

思维流程梳理

码起来

服务端时序图

服务端—Api与Provider模块

客户端时序图


RPC通信的基本原理

RPC(Remote Procedure Call)是一种远程过程调用协议,用于在分布式系统中进行远程通信,允许一个计算机程序调用另一个地址空间(通常是在不同的机器上)的函数或过程,就像调用本地函数一样。下面是RPC通信的基本原理:

  1. 客户端调用:远程客户端(调用方)希望调用远程服务器(提供方)上的一个或多个远程过程(函数)。客户端在本地创建一个请求,并指定要调用的远程过程的名称以及传递给该过程的参数。

  2. 参数序列化:在将请求发送到远程服务器之前,客户端需要将参数序列化为字节流或其他适合传输的格式。序列化是将数据转换为可以在网络上传输的形式的过程。

  3. 网络传输:客户端通过网络将请求发送到远程服务器。通常,这涉及到将请求数据打包成网络消息,然后通过网络协议(如HTTP、TCP/IP)将消息发送到服务器的地址。

  4. 服务器接收:远程服务器接收到客户端的请求消息,通常通过网络协议(如HTTP服务器、Socket服务器)监听特定的端口。

  5. 参数反序列化:服务器从请求消息中提取参数数据,并将其反序列化为本地数据结构,以便将其传递给远程过程。

  6. 远程过程调用:服务器调用相应的远程过程,将参数传递给该过程并执行相应的操作。远程过程可以位于服务器的本地代码中或远程服务器上的远程服务中。

  7. 结果序列化:远程过程执行完毕后,服务器将结果序列化为字节流或其他适合传输的格式。

  8. 结果传输:服务器通过网络将结果数据打包成响应消息,并将其发送回客户端。

  9. 客户端接收:客户端接收到服务器的响应消息。

  10. 结果反序列化:客户端从响应消息中提取结果数据,并将其反序列化为本地数据结构。

  11. 客户端处理:客户端可以根据远程过程的执行结果采取相应的行动,可能是继续执行本地代码或返回结果给调用方。

  12. 通信完成:一次RPC调用完成后,客户端和服务器之间的通信过程结束。

RPC结构

  1. 客户端模块代理所有远程方法的调用

  2. 将目标服务、目标方法、调用目标方法的参数等必要信息序列化

  3. 序列化之后的数据包进一步压缩,压缩后的数据包通过网络通信传输到目标服务节点

  4. 服务节点将接受到的数据包进行解压

  5. 解压后的数据包反序列化成目标服务、目标方法、目标方法的调用参数

  6. 通过服务端代理调用目标方法获取结果,结果同样需要序列化、压缩然后回传给客户端

手撸简陋版RPC

总觉得文字描述太干了,有点咽不下去,还是手撸下试试吧。毕竟艾瑞莉娅的奶奶总说:

纸上得来终觉浅,绝知此事要躬行。

知识点梳理
1.Socket套接字通信机制

在Java中,Socket是用于网络通信的基础类之一,它提供了一种机制,通过该机制,计算机程序可以在网络上建立连接、发送数据、接收数据和关闭连接。

  • 服务器套接字(ServerSocket)

    • 服务器套接字用于在服务器端监听并接受客户端的连接请求。

    • 使用以下步骤创建和使用服务器套接字:

      • 创建ServerSocket对象,并绑定到一个特定的端口号。

      • 使用ServerSocket对象的accept()方法来等待客户端的连接请求,并接受连接。

      • 一旦接受连接,可以创建新的Socket对象来处理客户端的通信。

      • 完成通信后,关闭SocketServerSocket连接。

    ServerSocket serverSocket = new ServerSocket(port_number);
    Socket clientSocket = serverSocket.accept(); // 等待连接
    InputStream inputStream = clientSocket.getInputStream();
    OutputStream outputStream = clientSocket.getOutputStream();
    // 使用输入流和输出流进行数据读写
    clientSocket.close();
    serverSocket.close();
    ​
  • 客户端套接字(Socket)

    • 客户端套接字用于连接到远程服务器,发送请求并接收响应。

    • 使用以下步骤创建和使用客户端套接字:

      • 创建Socket对象,指定远程服务器的主机名或IP地址以及端口号。

      • 使用Socket对象的输入流和输出流来进行数据的读取和写入。

      • 完成通信后,关闭Socket连接。

    Socket socket = new Socket("server_hostname", port_number);
    InputStream inputStream = socket.getInputStream();
    OutputStream outputStream = socket.getOutputStream();
    // 使用输入流和输出流进行数据读写
    socket.close();
2.通信过程的序列化与反序列化

在Java中通过 JDK 提供了 Java 对象的序列化方式实现对象序列化传输,主要通过输出流java.io.ObjectOutputStream和输入流java.io.ObjectInputStream来实现;

java.io.ObjectOutputStream:表示对象输出流 , 它的 writeObject(Object obj)方法可以对参数指定的 obj 对象进行序列化,把得到的字节序列写到一个目标输出流中;

java.io.ObjectInputStream:表示对象输入流 ,它的 readObject()方法源输入流中读取字节序列,再把它们反序列化成为一个对象,并将其返回;

3.动态代理

动态代理是在运行时动态生成代理类的方式。Java提供了java.lang.reflect.Proxy类和java.lang.reflect.InvocationHandler接口来实现动态代理。JDK中提供了基于接口动态代理的方法:

        // 创建代理对象
         MyInterface proxyObject = (MyInterface) Proxy.newProxyInstance(
                MyInterface.class.getClassLoader(),
                new Class[]{MyInterface.class},
                new MyInvocationHandler(realObject));

使用动态代理的目的:

  1. 透明远程调用: 动态代理可以隐藏底层的远程调用细节,使得远程过程调用看起来像本地方法调用一样简单。客户端无需关心网络通信、数据序列化和反序列化等细节,因为这些都由代理对象处理。

  2. 减少重复性代码: 使用动态代理可以减少编写和维护远程调用代码的工作量。代理类负责处理通用的远程调用逻辑,开发者只需关注具体的业务逻辑。

  3. 集中管理远程调用逻辑: 动态代理将远程调用逻辑集中在一个地方,这样可以更容易地管理和维护,例如添加统一的错误处理、日志记录或性能监控等功能。

4.反射

在RPC(Remote Procedure Call,远程过程调用)中使用反射的主要目的是实现透明的远程调用,即使在客户端和服务器之间存在远程分离,也可以像调用本地方法一样调用远程服务。反射在RPC中的具体目的和用途如下:

  1. 动态代理生成代理对象: 反射机制允许在运行时生成代理对象,这些代理对象可以代替实际的远程服务对象执行方法调用。这样,客户端代码不需要提前知道要调用的具体远程方法和对象,而可以动态生成代理并执行方法。

  2. 动态识别方法和参数: 反射允许在运行时识别远程方法和方法参数的名称、类型和数量。这对于将方法调用信息打包成请求并传递给远程服务器非常有用,服务器可以根据这些信息正确地解析请求并调用相应的方法。

  3. 动态序列化和反序列化: 反射可以用于动态地序列化请求和响应数据。在RPC中,请求和响应数据通常需要以某种格式进行序列化和反序列化,以便在网络上进行传输。反射可以帮助动态识别数据类型,将数据转换为适当的格式,并在服务器端进行反序列化。

思维流程梳理

码起来

假设需求是客户端想要访问服务端(ip:prot/helloService/sayHello)上的helloService调用sayHello()。先定义服务端的实现。

服务端时序图

服务端—Api与Provider模块

服务端作为服务提供者,自然需要具备常规的业务模块:Api与Provider模块

Api模块定义简单的HelloService接口,包含两个方法sayHello(String content)和saveUSer(User user)

public interface IHelloService {
    String sayHello(String content);
    String saveUSer(User user);
}

定义一个简单对象User类

public class User {
    private String name;
    private int age;
​
    // getter和setter
    public String getName() {
        return name;
    }
​
    public void setName(String name) {
        this.name = name;
    }
​
    public int getAge() {
        return age;
    }
​
    public void setAge(int age) {
        this.age = age;
    }
​
    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}

Provider模块定义HelloService接口的实现类HelloServiceImpl(rpc-server-provider模块的pom中需要添加rpc-server-api模块的依赖)

public class HelloServiceImpl implements IHelloService {
    @Override
    public String sayHello(String content) {
        System.out.println("request in sayHello:" + content);
        return "Say hello:" + content;
    }
​
    @Override
    public String saveUSer(User user) {
        System.out.println("request in saveUser:" + user);
        return "Save user success";
    }
}

定义RpcServerProxy通过代理的方式使用Socket对外暴露服务接口

public class RpcServerProxy {
    private ExecutorService executorService = Executors.newCachedThreadPool();
​
    public void publisher(Object service, int port) {
        ServerSocket serverSocket = null;
        try {
          serverSocket = new ServerSocket(port);
          while (true) {
              Socket socket = serverSocket.accept(); // 使用accept()等待客户端连接,接收请求
              executorService.execute(new ProcessorHandler(socket, service)); // 每一个socket交给一个processorHandler处理
          }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // 关闭资源,jdk1.7后提供了try-with可以自动关闭
            if (serverSocket != null) {
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

需要定义一个线程ProcessorHandler对每次的socket请求进行处理

public class ProcessorHandler implements Runnable{
​
    private Socket socket;
    private Object service;
​
    public ProcessorHandler(Socket socket, Object service) {
        this.socket = socket;
        this.service = service;
    }
​
​
    @Override
    public void run() {
        // 使用ObjectOutputStream和ObjectInputStream配合socket的输入流输出流进行序列化和反序列化
        ObjectOutputStream objectOutputStream = null;
        ObjectInputStream objectInputStream = null;
        try {
            objectInputStream = new ObjectInputStream(socket.getInputStream());
​
            // 客户端传过来的信息:请求哪个类,哪个方法,方法的参数  ——>  封装为RpcRequest类
            RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject(); // 拿到客户端通信传递的请求类
            Object result = invoke(rpcRequest); // 反射调用本地服务
​
            objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            objectOutputStream.writeObject(result);
            objectOutputStream.flush(); // 手动将缓冲区中的数据强制刷新到输出流中,以确保数据被立即写入底层的输出流。
​
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } finally {
            // 关闭关联资源,jdk1.7后提供了try-with可以自动关闭
            if (objectInputStream != null) {
                try {
                    objectInputStream.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            if (objectOutputStream != null) {
                try {
                    objectOutputStream.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
​
    private Object invoke(RpcRequest rpcRequest) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        // 反射调用
        Object[] args = rpcRequest.getParameters(); // 获取客户端传递的RpcRequest中的参数
        Class<?>[] types = new Class[args.length]; // 获得每个参数的类型
        for (int i = 0; i < args.length; i++) {
            types[i] = args.getClass();
        }
        Class clazz = Class.forName(rpcRequest.getClassName()); // 根据请求的类名反射加载类
        Method method = clazz.getMethod(rpcRequest.getMethodName(), types); // 获取类中的方法
        Object result = method.invoke(service, args); // 进行反射调用方法
        return result;
    }
}

定义一个通用的RpcRequest类参与通信过程中的请求处理,这里是需要序列化的

public class RpcRequest implements Serializable {
    private String className;
    private String methodName;
    private Object[] parameters;
    
    // getter and setter
    public String getClassName() {
        return className;
    }
​
    public void setClassName(String className) {
        this.className = className;
    }
​
    public String getMethodName() {
        return methodName;
    }
​
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }
​
    public Object[] getParameters() {
        return parameters;
    }
​
    public void setParameters(Object[] parameters) {
        this.parameters = parameters;
    }
}

Provider的app中发布下服务,运行没有报错,服务端OK

public class App 
{
    public static void main( String[] args )
    {
        IHelloService helloService = new HelloServiceImpl();
        RpcServerProxy rpcServerProxy = new RpcServerProxy();
        rpcServerProxy.publisher(helloService, 8080); // 把服务发布出去
    }
}

客户端时序图

定义动态代理RpcClientProxy,JDK的接口代理方式就是一句话:

public class RpcClientProxy {
    public <T> T clientProxy(final Class<T> interfaceCls, final String host, final int port) {
       return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class<?>[]{interfaceCls}, new RemoteInvocationHandler(host, port));
    }
}

定义被代理接口RemoteInvocationHandler

public class RemoteInvocationHandler implements InvocationHandler {
    private String host;
    private int port;
​
    public RemoteInvocationHandler(String host, int port) {
        this.host = host;
        this.port = port;
    }
​
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // 客户端请求会进入这里进行包装
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setClassName(method.getDeclaringClass().getName());
        rpcRequest.setMethodName(method.getName());
        rpcRequest.setParameters(args);
        // 远程通信交给RpcNetTransport
        RpcNetTransport rpcNetTransport = new RpcNetTransport(host, port);
        Object result = rpcNetTransport.send(rpcRequest);
        return result;
    }
}

定义通信传输类RpcNetTransport

public class RpcNetTransport {
    private String host;
    private int port;
​
    public RpcNetTransport(String host, int port) {
        this.host = host;
        this.port = port;
    }
​
    public Object send(RpcRequest rpcRequest) {
        Socket socket = null;
        Object result = null;
        ObjectInputStream objectInputStream = null;
        ObjectOutputStream objectOutputStream = null;
        try {
            socket = new Socket(host, port); // 建立连接
​
            objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); // 客户端通信写入
            objectOutputStream.writeObject(rpcRequest);
            objectOutputStream.flush();
​
            objectInputStream = new ObjectInputStream(socket.getInputStream()); // 客户端通信输出
            result = objectInputStream.readObject();
            return result;
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } finally {
            // 关闭关联资源
            if (objectInputStream != null) {
                try {
                    objectInputStream.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            if (objectOutputStream != null) {
                try {
                    objectOutputStream.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        return result;
    }
}

客户端请求远程服务

public class App 
{
    public static void main( String[] args )
    {
        RpcClientProxy rpcClientProxy = new RpcClientProxy();
        IHelloService helloService = rpcClientProxy.clientProxy(IHelloService.class, "localhost", 8080);
        String result = helloService.sayHello("Elaine");
        System.out.println(result);
    }
}

请求成功,客户端获得返回结果

服务端打印了请求日志

到这里,一个简陋且粗糙的RPC通信版本就好了。

别灰心,要记住艾瑞莉娅的奶奶总说

路漫漫其修选兮,吾将上下而求索。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1069206.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

算法通过村第十二关-字符串|黄金笔记|冲刺难题

文章目录 前言最长公共前缀纵向比较横向比较 字符串压缩问题表示数值的字符串总结 前言 提示&#xff1a;我有时候在想&#xff0c;我是真的不太需要其他人&#xff0c;还是因为跟他们在一起时没法自己&#xff0c;所以才保持距离。我们的交谈就像是平行而毫无交集的自言自语。…

高压功率放大器是什么东西

高压功率放大器是一种电子设备&#xff0c;用于将低功率信号放大到高功率水平。它通常由多个功率放大器组成&#xff0c;可以处理来自各种信号源的输入信号&#xff0c;并输出具有更大功率的信号。 高压功率放大器广泛应用于许多领域&#xff0c;包括无线通信、广播、雷达、医学…

Pytorch-学习记录-1-Tensor

1. 张量 (Tensor): 数学中指的是多维数组&#xff1b; torch.Tensor data: 被封装的 Tensor dtype: 张量的数据类型 shape: 张量的形状 device: 张量所在的设备&#xff0c;GPU/CPU requires_grad: 指示是否需要计算梯度 grad: data 的梯度 grad_fn: 创建 Tensor 的 Functio…

要多搞笑才能上B站热门?

自从B站开放竖屏模式之后&#xff0c;越来越多站外的优秀创作者加入B站。这其中搞笑UP主的数量大大增加&#xff0c;有很多竖屏UP主、短视频UP主在B站发光发热。 飞瓜数据&#xff08;B站版&#xff09;热门视频显示&#xff0c;近期热度最高的是来自UP主七颗猩猩QKXX发布的竖…

three.js点击模型实现模型边缘高亮选中效果

three.jsreact实现点击模型实现高亮选中效果 1、创建一个场景 let scene, camera, renderer, controls; let stats null; // 检测动画运行时的帧数 let clock new THREE.Clock(); // getDelta()方法获得两帧的时间间隔 let FPS 30; let renderT 1 / FPS; let timeS 0;con…

机器学习基础-数据分析:房价预测

mac设置中文字体 #要设置下面两行才能显示中文 Arial Unicode MS 为字体 plt.rcParams[font.sans-serif] [Arial Unicode MS] #设置图片大小 plt.figure(figsize(20, 11), dpi200)pie官方文档 总体代码 python import pandas as pd import numpy as np import matplotlib.…

Qt/C++原创推流工具/支持多种流媒体服务/ZLMediaKit/srs/mediamtx等

一、前言 1.1 功能特点 支持各种本地视频文件和网络视频文件。支持各种网络视频流&#xff0c;网络摄像头&#xff0c;协议包括rtsp、rtmp、http。支持将本地摄像头设备推流&#xff0c;可指定分辨率和帧率等。支持将本地桌面推流&#xff0c;可指定屏幕区域和帧率等。自动启…

【大数据 | 综合实践】大数据技术基础综合项目 - 基于GitHub API的数据采集与分析平台

&#x1f935;‍♂️ 个人主页: AI_magician &#x1f4e1;主页地址&#xff1a; 作者简介&#xff1a;CSDN内容合伙人&#xff0c;全栈领域优质创作者。 &#x1f468;‍&#x1f4bb;景愿&#xff1a;旨在于能和更多的热爱计算机的伙伴一起成长&#xff01;&#xff01;&…

在供应链管理中,如何做好库存分析?库存分析有哪些监控指标?

在供应链管理中&#xff0c;库存分析是其重要的一环。库存分析的方法繁杂且广泛&#xff0c;选择正确的方法才能更好的进行库存分析&#xff0c;下面就为大家盘点一些常用的库存分析方法和监控指标&#xff0c;全程干货&#xff0c;建议收藏&#xff01; 01 如何进行库存分析&…

【MySQL】基本查询(三)聚合函数+group by

文章目录 一. 聚合函数二. group by子句结束语 建立如下表 //创建表结构 mysql> create table exam_result(-> id int unsigned primary key auto_increment,-> name varchar(20) not null comment 同学姓名,-> chinese float default 0.0 comment 语文成绩,->…

08_selenium实战——学习平台公开数据批量获取

0、:前言 该实战任务是对某视频平台中’标题’、 ‘点赞数量’、 ‘投币数量’、‘收藏数量’、‘播放次数’、以及前五条评论进行爬取。要求1:可以控制爬取视频的主题(爬取主题搜索之后的内容)要求2:可以控制爬取视频的数量要求3:对于评论数不足5条的用0填充评论内容爬虫…

vue启动项目,npm run dev出现error:0308010C:digital envelope routines::unsupported

运行vue项目&#xff0c;npm run dev的时候出现不支持错误error:0308010C:digital envelope routines::unsupported。 在网上找了很多&#xff0c;大部分都是因为版本问题&#xff0c;修改环境之类的&#xff0c;原因是对的但是大多还是没能解决。经过摸索终于解决了。 方法如…

第九课 排序

文章目录 第九课 排序排序算法lc912.排序数组--中等题目描述代码展示 lc1122.数组的相对排序--简单题目描述代码展示 lc56.合并区间--中等题目描述代码展示 lc215.数组中的第k个最大元素--中等题目描述代码展示 acwing104.货仓选址--简单题目描述代码展示 lc493.翻转树--困难题…

保护 Web 服务器安全性

面向公众的系统&#xff08;如 Web 服务器&#xff09;经常成为攻击者的目标&#xff0c;如果这些业务关键资源没有得到适当的保护&#xff0c;可能会导致安全攻击&#xff0c;从而导致巨大的财务后果&#xff0c;并在客户中失去良好的声誉。 什么是网络服务器审核 当有人想要…

无线振弦采集仪在岩土工程中如何远程监测和远程维护

无线振弦采集仪在岩土工程中如何远程监测和远程维护 随着岩土工程施工的不断发展和科技水平的不断提高&#xff0c;远程监测和远程维护设备也得到了广泛关注和应用。无线振弦采集仪是一种广泛应用于岩土工程中的测量仪器&#xff0c;在现代化施工中扮演着重要的角色。本文将就…

ChromeDriver驱动最新版下载

下载地址ChromeDriver - WebDriver for Chrome - Downloads selenium.common.exceptions.SessionNotCreatedException: Message: session not created: This version of ChromeDriver only supports Chrome version 113 Current browser version is 117.0.5938.150 with binar…

2、模块传参和依赖

一、模块传参 使用函数 module_param(name,type,perm); 将指定的全局变量设置成模块参数 /* name:全局变量名 type&#xff1a;使用符号 实际类型 传参方式bool bool insmod xxx.ko 变量名0 或 1invbool bool insmod xx…

运营商sdwan优缺点及sdwan服务商优势

SD-WAN(软件定义广域网)作为一种重要的网络解决方案&#xff0c;已经受到了广泛的关注和采用。然而&#xff0c; 无论是由传统运营商提供的SD-WAN还是专门的SD-WAN服务提供商&#xff0c;都存在各自的优缺点。 运营商提供的SD-WAN的缺点&#xff1a; 1. 有限的灵活性&#xf…

数据库查询详解

数据库查询操作 前置&#xff1a;首先我们创建一个练习的数据库 /* SQLyog Professional v12.09 (64 bit) MySQL - 5.6.40-log : Database - studentsys ********************************************************************* *//*!40101 SET NAMES utf8 */;/*!40101 SET …

【uniapp】自定义导航栏时,设置安全距离,适配不同机型

1、在pages.json中&#xff0c;给对应的页面设置自定义导航栏样式 {"path": "pages/index/index","style": {"navigationStyle": "custom","navigationBarTextStyle": "white","navigationBarTitl…