RPC(1)------Java BIO + JDK原生序列化 + JDK动态代理实现

news2024/11/16 4:34:23

本文跟着MY-RPC-FRamework的代码,根据自己的理解做的笔记,先理解,再学习。

RPC原理

客户端和服务端都可以访问到通用的接口,但是只有服务端有这个接口的实现类,客户端调用这个接口的方式,是通过网络传输,告诉服务端我要调用这个接口,服务端收到之后找到这个接口的实现类,并且执行,将执行的结果返回给客户端,作为客户端调用接口方法的返回值。

简单实现

1. 通用接口

这个接口就是客户端需要调用的接口,也是服务端需要实现的接口,这里只定义了一个接口 hello

public interface HelloService {
    String hello(HelloObject object);
}
@Data
@AllArgsConstructor
public class HelloObject implements Serializable {
    private Integer id;
    private String message;
}

注意这个HelloObject 对象需要实现Serializable接口,因为它需要在调用过程中从客户端传递给服务端。实现序列化的方式很多,这里使用了JDK自带的序列化方式,后期可以改进。

有了这个公共接口,就可以先在服务端将其实现

public class HelloServiceImpl implements HelloService {
    private static final Logger logger = LoggerFactory.getLogger(HelloServiceImpl.class);
    @Override
    public String hello(HelloObject object) {
        logger.info("接收到:{}", object.getMessage());
        return "这是掉用的返回值,id=" + object.getId();
    }
}

传输协议

现在,思考客户端需要传输哪些信息给服务端,服务端才能知道调用哪个接口的哪个方法呢?

  1. 接口的名字
  2. 方法的名字
  3. 方法的参数
  4. 每个参数的类型

服务端知道以上四个条件,就可以找到这个方法并且调用了。我们把这四个条件写到一个对象里,到时候传输时传输这个对象就行了

@Data
@Builder
public class RpcRequest implements Serializable {
    /**
     * 待调用接口名称
     */
    private String interfaceName;
    /**
     * 待调用方法名称
     */
    private String methodName;
    /**
     * 调用方法的参数
     */
    private Object[] parameters;
    /**
     * 调用方法的参数类型
     */
    private Class<?>[] paramTypes;
}

服务器调用完这个方法后,需要给客户端返回哪些信息呢?如果调用成功的话,显然需要返回值,如果调用失败了,就需要失败的信息,这里封装成一个RpcResponse对象:

@Data
public class RpcResponse<T> implements Serializable {
    /**
     * 响应状态码
     */
    private Integer statusCode;
    /**
     * 响应状态补充信息
     */
    private String message;
    /**
     * 响应数据
     */
    private T data;
  
    public static <T> RpcResponse<T> success(T data) {
        RpcResponse<T> response = new RpcResponse<>();
        response.setStatusCode(ResponseCode.SUCCESS.getCode());
        response.setData(data);
        return response;
    }
    public static <T> RpcResponse<T> fail(ResponseCode code) {
        RpcResponse<T> response = new RpcResponse<>();
        response.setStatusCode(code.getCode());
        response.setMessage(code.getMessage());
        return response;
    }
}

客户端实现–动态代理

需要注意,前面我们提过,rpc客户端是没有接口的实现的,只有接口HelloService ,没有办法生成实例对象。这时,我们可以通过动态代理的方式生成实例,并且调用方法(代理方法invoke)时生成需要的RpcRequest对象并且发送给服务端。

传递host和port来指明服务端的位置。并且使用getProxy()方法来生成代理对象

public class RpcClientProxy implements InvocationHandler {

    private static final Logger logger = LoggerFactory.getLogger(RpcClientProxy.class);
    private String host;
    private int port;

    public RpcClientProxy(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @SuppressWarnings("unchecked")
    public <T> T getProxy(Class<T> clazz) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        logger.info("调用方法: {}#{}", method.getDeclaringClass().getName(), method.getName());
        RpcRequest rpcRequest = RpcRequest.builder()
                .interfaceName(method.getDeclaringClass().getName())
                .methodName(method.getName())
                .parameters(args)
                .paramTypes(method.getParameterTypes())
                .build();
        RpcClient rpcClient = new RpcClient();
        return rpcClient.sendRequest(rpcRequest, host, port);
    }
}

发送的逻辑使用了一个RpcClient对象来实现,这个对象的作用,就是将一个对象发过去,并且接受返回的对象。

public class RpcClient {

    private static final Logger logger = LoggerFactory.getLogger(RpcClient.class);

    public Object sendRequest(RpcRequest rpcRequest, String host, int port) {
        try (Socket socket = new Socket(host, port)) {
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
            objectOutputStream.writeObject(rpcRequest);
            objectOutputStream.flush();
            RpcResponse rpcResponse = (RpcResponse) objectInputStream.readObject();
            if(rpcResponse == null) {
                logger.error("服务调用失败,service:{}", rpcRequest.getInterfaceName());
                throw new RpcException(RpcError.SERVICE_INVOCATION_FAILURE, " service:" + rpcRequest.getInterfaceName());
            }
            if(rpcResponse.getStatusCode() == null || rpcResponse.getStatusCode() != ResponseCode.SUCCESS.getCode()) {
                logger.error("调用服务失败, service: {}, response:{}", rpcRequest.getInterfaceName(), rpcResponse);
                throw new RpcException(RpcError.SERVICE_INVOCATION_FAILURE, " service:" + rpcRequest.getInterfaceName());
            }
            return rpcResponse.getData();
        } catch (IOException | ClassNotFoundException e) {
            logger.error("调用时有错误发生:", e);
            throw new RpcException("服务调用失败: ", e);
        }
    }

}

服务端的实现——反射调用

服务端使用一个ServerSocket监听某个端口,循环接收连接请求,如果发来了请求就创建一个线程,在新线程中处理调用。这里创建线程采用线程池:

public class RpcServer {

    private final ExecutorService threadPool;
    private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);

    public RpcServer() {
        int corePoolSize = 5;
        int maximumPoolSize = 50;
        long keepAliveTime = 60;
        BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<>(100);
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, workingQueue, threadFactory);
    }
  
}

RpcServer暂时只能注册一个接口,即对外提供一个接口的调用服务,添加register方法,在注册完一个服务后立刻开始监听

public void register(Object service, int port) {
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            logger.info("服务器正在启动...");
            Socket socket;
            while((socket = serverSocket.accept()) != null) {
                logger.info("客户端连接!Ip为:" + socket.getInetAddress());
                threadPool.execute(new WorkerThread(socket, service));
            }
        } catch (IOException e) {
            logger.error("连接时有错误发生:", e);
        }
    }

WorkerThread实现了Runnable接口,用于接收RpcRequest对象,解析并且调用,生成RpcResponse对象并传输回去

 @Override
    public void run() {
        try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
             ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
            RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
            Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
            Object returnObject = method.invoke(service, rpcRequest.getParameters());
            objectOutputStream.writeObject(RpcResponse.success(returnObject));
            objectOutputStream.flush();
        } catch (IOException | ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            logger.error("调用或发送时有错误发生:", e);
        }
    }

测试

服务端侧,我们已经在上面实现了一个HelloService的实现类HelloServiceImpl的实现类了,我们只需要创建一个RpcServer并且把这个实现类注册进去就行了

public class TestServer {
    public static void main(String[] args) {
        HelloService helloService = new HelloServiceImpl();
        RpcServer rpcServer = new RpcServer();
        rpcServer.register(helloService, 9000);
    }
}`

客户端方面,我们需要通过动态代理,生成代理对象,并且调用,动态代理会自动帮我们向服务端发送请求的

public class TestClient {
    public static void main(String[] args) {
        RpcClientProxy proxy = new RpcClientProxy("127.0.0.1", 9000);
        HelloService helloService = proxy.getProxy(HelloService.class);
        HelloObject object = new HelloObject(12, "This is a message");
        String res = helloService.hello(object);
        System.out.println(res);
    }
}

在这里插入图片描述

改进,v1.0版本下载之后就是改进的版本

改进点:

  1. 核心就是支持注册多个接口,对应ServiceRegistry接口,使用Map保存已经注册的服务。
  2. RequestHandler:真实调用服务的类,并将结果返回
  3. RequestHandlerThread:和前面的WorkerThread类似。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/360282.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

记一次对某假冒征信站点的实战渗透

1. 背景介绍 这次渗透是去年之前的了&#xff0c;原因是当时收到了这个钓鱼短信&#xff0c;这次渗透带有侥幸、偶然性。 后台无脑弱口令 后台无脑文件上传getshell 运气好&#xff0c;直接无脑提权 因为时间问题&#xff0c;本文的记录可能不太完整。 开局之后就是这样的假…

springbatch设置throttle-limit参数不生效

背景描述 当springbatch任务处理缓慢时&#xff0c;就需要使用多线程并行处理任务。 参数throttle-limit用于控制当前任务能够使用的线程数的最大值。 调整throttle-limit为10时&#xff0c;处理线程只有8&#xff0c;再次增大throttle-limit值为20&#xff0c;处理线程依旧为…

make、Makefile项目自动化构建工具

环境&#xff1a;centos7.6&#xff0c;腾讯云服务器Linux文章都放在了专栏&#xff1a;【Linux】欢迎支持订阅&#x1f339;前言自动化构建工具是干什么的呢&#xff1f;主要是为了让我们对指令进行一些设置&#xff0c;就比如说&#xff0c;假如一个项目里有很多个源文件&…

Web3的“陨落”,西下与东升

文/尹宁出品/陀螺研究院极富传奇色彩的Web3似乎正在陨落。去年的Web3&#xff0c;在人声沸腾中讲着下一代互联网摄人心魄的故事&#xff0c;传统资本大刀阔斧般迈入&#xff0c;大厂青年以自由为名掘金&#xff0c;Web3一度成为资本造神新概念。根据Messari 统计,2022 年Web3 V…

如何开发微信小程序呢

也许很多人对小程序&#xff0c;H5程序&#xff0c;Vue&#xff0c;网页程序&#xff0c;PC端程序认识比较模糊&#xff0c;因为这些跨度非常的大&#xff0c;很少人会一次性全部接触&#xff0c;甚至只是听说过&#xff0c;并不了解其中的关系&#xff0c;下面我来厘清他们的关…

传闻腾讯引进Quest 2?我觉得可行性很低

根据36kr最新消息称&#xff0c;腾讯XR团队解散后&#xff0c;确定不碰XR硬件领域&#xff0c;但并未完全放弃XR规划&#xff0c;将转变思路和玩法&#xff0c;业内消息称腾讯计划引进Meta旗下Quest 2 VR一体机。消息称&#xff0c;该计划在2022年11月份XR部门负责人沈黎走后便…

C++ 智能指针的原理:auto_ptr、unique_ptr、shared_ptr、weak_ptr

目录一、理解智能指针1.普通指针的使用二、智能指针1.auto_ptr2.unique_ptr3.shared_ptr&#xff08;1&#xff09;了解shared_ptr&#xff08;2&#xff09;shared_ptr的缺陷4.weak_ptr本文代码在win10的vs2019中通过编译。 一、理解智能指针 1.普通指针的使用 如果程序需要…

2.22.1、死锁的概念

1、死锁的概念 1.1、什么是死锁 我等待你&#xff0c;你等待他&#xff0c;他等待她&#xff0c;她等待我…这世界每个人都爱别人… 我们从资源占有的角度来分析&#xff0c;这段关系为什么看起来那么纠结… 在并发环境下&#xff0c;各进程因竞争资源而造成的一种互相等待对方…

ChatGPT入门案例|张量流商务智能客服

本篇介绍了序列-序列机制和张量流的基本概念,基于中文语料库说明基于循环神经网络的语言翻译的实战应用。 01、序列-序列机制 序列-序列机制概述 序列-序列(Sequence To Sequence,Seq2Seq)是一个编码器-解码器 (Encoder-Decoder Mechanism)结构的神经网络,输入是序列(…

【C#个人错题笔记】

观前提醒 记录一些我不会或者少见的内容&#xff0c;不一定适合所有人 字符串拼接 int a3,b8; Console.WriteLine(ab);//11 Console.WriteLine("ab");//ab Console.WriteLine(a""b);//38 Console.WriteLine("ab"ab);//ab38 Console.WriteLine…

17个优秀WordPress LMS在线教育平台主题

为您的WordPress在线教育平台主题网站选择在线课程主题是您在建立在线教育业务时做出的最重要的决定之一。正确的主题不仅决定了您网站的外观和感觉&#xff0c;还决定了用户体验。这在构建在线课程平台时变得更加重要&#xff0c;因为您的访问者将是您的学生&#xff0c;他们会…

nodejs基于vue疫情期间网课管理系统

随着科学技术的飞速发展&#xff0c;各行各业都在努力与现代先进技术接轨&#xff0c;通过科技手段提高自身的优势&#xff1b;对于疫情网课管理系统当然也不能排除在外&#xff0c;随着网络技术的不断成熟&#xff0c;带动了疫情网课管理系统&#xff0c;它彻底改变了过去传统…

内网渗透(四十四)之横向移动篇-DCOM远程执行命令横向移动

系列文章第一章节之基础知识篇 内网渗透(一)之基础知识-内网渗透介绍和概述 内网渗透(二)之基础知识-工作组介绍 内网渗透(三)之基础知识-域环境的介绍和优点 内网渗透(四)之基础知识-搭建域环境 内网渗透(五)之基础知识-Active Directory活动目录介绍和使用 内网渗透(六)之基…

波次分拣系统

一、系统架构&#xff1a; v1.2基站软件管理系统仓库标签v1.4仓库标签二、系统简介&#xff1a; 标签系统主要由标签服务器&#xff0c;基站&#xff0c;电子标签前三部分组成&#xff0c;操作界面借助于京东仓库已有的作业电脑来实现&#xff0c;标签服务器与WMS进行数据对接。…

罗技LogitechFlow技术--惊艳的多电脑切换体验

作者&#xff1a;Eason_LYC 悲观者预言失败&#xff0c;十言九中。 乐观者创造奇迹&#xff0c;一次即可。 一个人的价值&#xff0c;在于他所拥有的。所以可以不学无术&#xff0c;但不能一无所有&#xff01; 技术领域&#xff1a;WEB安全、网络攻防 关注WEB安全、网络攻防。…

mysql 索引原理和使用

一、索引是什么&#xff1f; 1.1. 索引是什么 当一张表有 500 万条数据&#xff0c;在没有索引的 name 字段上执行一个查询&#xff1a; select * from user_innodb where name ‘jim’; 如果 name 字段上面有索引呢&#xff1f; ALTER TABLE user_innodb DROP INDEX idx_n…

快学会这个技能-.NET API拦截技法

大家好&#xff0c;我是沙漠尽头的狼。 本文先抛出以下问题&#xff0c;请在文中寻找答案&#xff0c;可在评论区回答&#xff1a; 什么是API拦截&#xff1f;一个方法被很多地方调用&#xff0c;怎么在不修改这个方法源码情况下&#xff0c;记录这个方法调用的前后时间&…

Java零基础入门到精通(持续更新中)

打开CMD命令窗口 WINR输入cmd 常用cmd命令代码 切换磁盘 E: 回车即可切换到e盘查看当前路径下的所有内容 dir进入目录 cd test回退到上一级目录 cd..进入多级目录 cd test\index\aaa回退到磁盘目录 cd \清屏 cls关闭命令行窗口 exit小例子&#xff1a;使用命令行窗口…

细粒度视觉分析综述TPAMI2021

细粒度图像分析&#xff08;FGIA&#xff0c;Fine-grained image analysis&#xff09;是计算机视觉中一个长期存在的基本问题&#xff0c;并支撑着一系列不同的现实应用。FGIA的任务目标是分析从属类别&#xff08;subordinate categories&#xff09;的视觉对象&#xff0c;例…

【Azure 架构师学习笔记】-Azure Data Factory (1)-调度入门

本文属于【Azure 架构师学习笔记】系列。 本文属于【Azure Data Factory】系列。 前言 在开发好一个ADF pipeline&#xff08;功能&#xff09;之后&#xff0c;需要将其按需要运行起来&#xff0c;这个称之为调度。下图是一个简单的ADF 运作图&#xff0c; 按照需要的顺序&am…