由浅入深Netty简易实现RPC框架

news2025/1/16 2:48:40

目录

    • 1 准备工作
    • 2 服务器 handler
    • 3 客户端代码第一版
    • 4 客户端 handler 第一版
    • 5 客户端代码 第二版
    • 6 客户端 handler 第二版


1 准备工作

在这里插入图片描述

这些代码可以认为是现成的,无需从头编写练习

为了简化起见,在原来聊天项目的基础上新增 Rpc 请求和响应消息

@Data
public abstract class Message implements Serializable {

    // 省略旧的代码

    public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
    public static final int  RPC_MESSAGE_TYPE_RESPONSE = 102;

    static {
        // ...
        messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
        messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
    }

}

请求消息

@Getter
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {

    /**
     * 调用的接口全限定名,服务端根据它找到实现
     */
    private String interfaceName;
    /**
     * 调用接口中的方法名
     */
    private String methodName;
    /**
     * 方法返回类型
     */
    private Class<?> returnType;
    /**
     * 方法参数类型数组
     */
    private Class[] parameterTypes;
    /**
     * 方法参数值数组
     */
    private Object[] parameterValue;

    public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {
        super.setSequenceId(sequenceId);
        this.interfaceName = interfaceName;
        this.methodName = methodName;
        this.returnType = returnType;
        this.parameterTypes = parameterTypes;
        this.parameterValue = parameterValue;
    }

    @Override
    public int getMessageType() {
        return RPC_MESSAGE_TYPE_REQUEST;
    }
}

响应消息

@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {
    /**
     * 返回值
     */
    private Object returnValue;
    /**
     * 异常值
     */
    private Exception exceptionValue;

    @Override
    public int getMessageType() {
        return RPC_MESSAGE_TYPE_RESPONSE;
    }
}

服务器架子

@Slf4j
public class RpcServer {
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        
        // rpc 请求消息处理器,待实现
        RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    ch.pipeline().addLast(RPC_HANDLER);
                }
            });
            Channel channel = serverBootstrap.bind(8080).sync().channel();
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server error", e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

客户端架子

public class RpcClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        
        // rpc 响应消息处理器,待实现
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(group);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    ch.pipeline().addLast(RPC_HANDLER);
                }
            });
            Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
            channel.closeFuture().sync();
        } catch (Exception e) {
            log.error("client error", e);
        } finally {
            group.shutdownGracefully();
        }
    }
}

服务器端的 service 获取

public class ServicesFactory {

    static Properties properties;
    static Map<Class<?>, Object> map = new ConcurrentHashMap<>();

    static {
        try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
            properties = new Properties();
            properties.load(in);
            Set<String> names = properties.stringPropertyNames();
            for (String name : names) {
                if (name.endsWith("Service")) {
                    Class<?> interfaceClass = Class.forName(name);
                    Class<?> instanceClass = Class.forName(properties.getProperty(name));
                    map.put(interfaceClass, instanceClass.newInstance());
                }
            }
        } catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    public static <T> T getService(Class<T> interfaceClass) {
        return (T) map.get(interfaceClass);
    }
}

相关配置 application.properties

serializer.algorithm=Json
cn.itcast.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl

2 服务器 handler

@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {
        RpcResponseMessage response = new RpcResponseMessage();
        response.setSequenceId(message.getSequenceId());
        try {
            // 获取真正的实现对象
            HelloService service = (HelloService)
                    ServicesFactory.getService(Class.forName(message.getInterfaceName()));
            
            // 获取要调用的方法
            Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
            
            // 调用方法
            Object invoke = method.invoke(service, message.getParameterValue());
            // 调用成功
            response.setReturnValue(invoke);
        } catch (Exception e) {
            e.printStackTrace();
            // 调用异常
            response.setExceptionValue(e);
        }
        // 返回结果
        ctx.writeAndFlush(response);
    }
}

3 客户端代码第一版

只发消息

@Slf4j
public class RpcClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(group);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    ch.pipeline().addLast(RPC_HANDLER);
                }
            });
            Channel channel = bootstrap.connect("localhost", 8080).sync().channel();

            ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage(
                    1,
                    "cn.itcast.server.service.HelloService",
                    "sayHello",
                    String.class,
                    new Class[]{String.class},
                    new Object[]{"张三"}
            )).addListener(promise -> {
                if (!promise.isSuccess()) {
                    Throwable cause = promise.cause();
                    log.error("error", cause);
                }
            });

            channel.closeFuture().sync();
        } catch (Exception e) {
            log.error("client error", e);
        } finally {
            group.shutdownGracefully();
        }
    }
}

4 客户端 handler 第一版

@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
        log.debug("{}", msg);
    }
}

5 客户端代码 第二版

包括 channel 管理,代理,接收结果

@Slf4j
public class RpcClientManager {


    public static void main(String[] args) {
        HelloService service = getProxyService(HelloService.class);
        System.out.println(service.sayHello("zhangsan"));
//        System.out.println(service.sayHello("lisi"));
//        System.out.println(service.sayHello("wangwu"));
    }

    // 创建代理类
    public static <T> T getProxyService(Class<T> serviceClass) {
        ClassLoader loader = serviceClass.getClassLoader();
        Class<?>[] interfaces = new Class[]{serviceClass};
        //                                                            sayHello  "张三"
        Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {
            // 1. 将方法调用转换为 消息对象
            int sequenceId = SequenceIdGenerator.nextId();
            RpcRequestMessage msg = new RpcRequestMessage(
                    sequenceId,
                    serviceClass.getName(),
                    method.getName(),
                    method.getReturnType(),
                    method.getParameterTypes(),
                    args
            );
            // 2. 将消息对象发送出去
            getChannel().writeAndFlush(msg);

            // 3. 准备一个空 Promise 对象,来接收结果             指定 promise 对象异步接收结果线程
            DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
            RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);

//            promise.addListener(future -> {
//                // 线程
//            });

            // 4. 等待 promise 结果
            promise.await();
            if(promise.isSuccess()) {
                // 调用正常
                return promise.getNow();
            } else {
                // 调用失败
                throw new RuntimeException(promise.cause());
            }
        });
        return (T) o;
    }

    private static Channel channel = null;
    private static final Object LOCK = new Object();

    // 获取唯一的 channel 对象
    public static Channel getChannel() {
        if (channel != null) {
            return channel;
        }
        synchronized (LOCK) { //  t2
            if (channel != null) { // t1
                return channel;
            }
            initChannel();
            return channel;
        }
    }

    // 初始化 channel 方法
    private static void initChannel() {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.group(group);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ProcotolFrameDecoder());
                ch.pipeline().addLast(LOGGING_HANDLER);
                ch.pipeline().addLast(MESSAGE_CODEC);
                ch.pipeline().addLast(RPC_HANDLER);
            }
        });
        try {
            channel = bootstrap.connect("localhost", 8080).sync().channel();
            channel.closeFuture().addListener(future -> {
                group.shutdownGracefully();
            });
        } catch (Exception e) {
            log.error("client error", e);
        }
    }
}

6 客户端 handler 第二版

@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {

    //                       序号      用来接收结果的 promise 对象
    public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();

    @Override

    protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
        log.debug("{}", msg);
        // 拿到空的 promise
        Promise<Object> promise = PROMISES.remove(msg.getSequenceId());
        if (promise != null) {
            Object returnValue = msg.getReturnValue();
            Exception exceptionValue = msg.getExceptionValue();
            if(exceptionValue != null) {
                promise.setFailure(exceptionValue);
            } else {
                promise.setSuccess(returnValue);
            }
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/551121.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

由浅入深Netty代码调优

目录 1. 优化1.1 扩展序列化算法 2 参数调优2.1 CONNECT_TIMEOUT_MILLIS2.2 SO_BACKLOG2.3 ulimit -n2.4 TCP_NODELAY2.5 SO_SNDBUF & SO_RCVBUF2.6 ALLOCATOR2.7 RCVBUF_ALLOCATOR 1. 优化 1.1 扩展序列化算法 序列化&#xff0c;反序列化主要用在消息正文的转换上 序列…

Windows11部署WSL2以及迁移操作系统位置

1 缘起 笔记本电脑Windows 10内存紧张&#xff1a;16 G&#xff0c; 但是&#xff0c;开发需要一些组件&#xff0c;如Redis&#xff08;Redisearch、ReJson&#xff09;、MySQL等&#xff0c; 在Linux容器化中部署更方便&#xff0c;易用&#xff0c; 在Windows中通过虚拟机安…

安卓与串口通信-modbus篇

前言 在之前的两篇文章中&#xff0c;我们讲解了串口的基础知识和在安卓中使用串口通信的方法&#xff0c;如果还没看过之前文章的同学们&#xff0c;建议先看一遍&#xff0c;不然可能会不理解这篇文章讲的某些内容。 事实上&#xff0c;在实际应用中&#xff0c;我们很少会…

Tip in/Out变速箱齿轮敲击过程详细分析

Tip in/Out变速箱齿轮敲击过程详细分析(模型由AMEsim例子改造而成&#xff0c;数据均虚构&#xff0c;仅学习用&#xff09; 1、发动机稳态工况2、Tip in/Out工况3、总结 1、发动机稳态工况 发动机输出力矩&#xff1a; 一轴齿轮驱动力矩&#xff08;离合器减振器输出力矩&am…

为什么要做问卷调查?企业获得用户心声的捷径

问卷调查作为一种重要的数据收集方法&#xff0c;在市场营销、社会学研究、用户研究等领域得到广泛应用。通过问卷调查&#xff0c;我们可以了解受访者的态度、行为、需求等信息&#xff0c;进而为企业和组织的决策提供支持。那么&#xff0c;为什么要做问卷调查呢&#xff1f;…

大语言模型架构设计

【大模型慢学】GPT起源以及GPT系列采用Decoder-only架构的原因探讨 - 知乎本文回顾GPT系列模型的起源论文并补充相关内容&#xff0c;中间主要篇幅分析讨论为何GPT系列从始至终选择采用Decoder-only架构。 本文首发于微信公众号&#xff0c;欢迎关注&#xff1a;AI推公式最近Ch…

一些云原生开源安全工具介绍

本博客地址&#xff1a;https://security.blog.csdn.net/article/details/130789465 一、Kubernetes安全监测工具kube-bench kube-bench是一个用Golang开发的、由Aqua Security发布的自动化Kubernetes基准测试工具&#xff0c;它运行CIS Kubernetes基准中的测试项目。这些测试…

在 uniapp 中通过 Intent 的方式启动其他APP并且传参

文章目录 前言一、其他软件调用文档中的安卓原生代码二、在uniAPP中实现上述方式三、总结四、感谢 前言 由于业务需求需要&#xff0c;我方研发的安卓APP需要调用其他安卓APP&#xff0c;并且将保存返回的文件存储路径进行读取后操作。对方软件公司提供了对接文档和一个测试调…

docker安装华为gaussdb数据库

docker安装gaussdb docker镜像&#xff1a; http://docker.hub.com/ 这里我们使用docker hub镜像下载&#xff0c;该镜像下载较慢&#xff0c;可能有时访问不同&#xff0c;可以使用阿里云镜像下载&#xff0c;阿里云镜像配置参考《docker国内阿里云镜像加速》 拉取镜像 下载…

程序翻译的过程,linux环境下处理,生成 .i、.s、.o 文件(预处理、编译、汇编、链接)

1. 程序翻译的过程有四个步骤&#xff0c;预处理->编译->汇编->链接。 那么每个步骤是干什么&#xff1f; 预处理阶段&#xff1a;处理-> 头文件、宏替换、条件编译等等&#xff0c;我用 linux 环境查看一下&#xff0c;如下&#xff1a; 首先写一个简单的 .c 文…

【iptables 防火墙设置】

目录 一、iptables概述1、netfilter/iptables 关系 二、四表五链2.1、四表:2.2、五链&#xff1a; 三、规则链之间的匹配顺序四、规则链内的匹配顺序五、iptables的安装配置5.1、安装iptables5.2、配置iptables1、常用的管理选项2、常用的参数3、常用的控制类型4、iptables语法…

ThinkPHP6 模型层的模型属性,表映射关系,以及如何在控制层中使用模型层和模型层中的简单CRUD

ThinkPHP6 模型层的模型属性&#xff0c;表映射关系&#xff0c;以及模型层的CRUD及如何在控制层中使用模型层 1. model 模型层的默认映射规则 模型&#xff0c;即mvc模式中的model层&#xff0c;model层用来对接数据库&#xff0c;操作数据库的增删改查。 在tp6中&#xff…

springboot整合sharding-jdbc实现分库分表详解

目录 一、为什么需要分库分表 1.1 分库分表的优势 二、分库分表基本概念 2.1 垂直分表 2.2 水平分表 2.3 垂直分库 2.4 水平分库 三、分库分表带来的问题 3.1 分布式事务问题 3.2 跨节点关联查询问题 3.3 跨节点分页、排序问题 3.4 主键避重问题 四、分库分表常用…

Java --- 云尚办公之菜单管理模块

一、菜单管理 数据库表&#xff1a; CREATE TABLE sys_menu (id BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 编号,parent_id BIGINT(20) NOT NULL DEFAULT 0 COMMENT 所属上级,name VARCHAR(20) NOT NULL DEFAULT COMMENT 名称,type TINYINT(3) NOT NULL DEFAULT 0 COMMEN…

LSTM预测汇率涨跌分析

前言 本文主要是采用lstm对汇率涨跌进行预测&#xff0c;是一个二分类的预测问题。 步骤解析 数据构造 原始数据是单变量数据 import pandas as pdfile_path r"./huilv.csv" data pd.read_csv(file_path, usecols[1],encodinggbk) data[level] -1 美元 l…

打造高效接口自动化框架,YAML测试用例封装技巧大揭秘!

目录 前言&#xff1a; 一、框架介绍 本框架包含两个部分&#xff1a; 本框架的构建目标是&#xff1a; 二、框架目录结构 三、规范YAML测试用例封装步骤 四、框架使用 五、总结 前言&#xff1a; 本文介绍了一个基于Python和PyTest的接口自动化框架封装项目实战&#…

最佳实践,高效编写Web自动化测试强制交互方法封装技巧

目录 前言&#xff1a; 一、Web自动化测试的基本原理 二、封装强制交互方法 1、输入框强制交互 2、其他强制交互 三、封装基础类方法 四、总结 前言&#xff1a; Web自动化测试是现代软件开发中必不可少的部分。Web自动化测试可以帮助测试人员快速地验证页面功能并发现潜…

Fiddler 抓包工具下载安装基本使用(详)

在做软件测试或者Bug定位的时候会用到一些抓包工具&#xff0c;当然抓包工具还要一些其他用途可以做一些API的抓取&#xff0c;那么本篇内容就来讲 Fiddler 抓包工具的下载安装以及如何来实际的应用。讲了这些可能有的读者还不知道这个"Fiddler"怎么读呢&#xff1f;…

详解flutter刷新流程,让你的应用更流畅

本文已授权公众号【缦图技术团队】发布 详解flutter刷新流程&#xff0c;让你的应用更流畅 一、概述 Flutter 是谷歌推出的高性能、跨端UI框架&#xff0c;可以通过一套代码&#xff0c;支持 iOS、Android、Windows/MAC/Linux 等多个平台&#xff0c;且能达到原生性能。Flutte…

pthread_getspecific和pthread_setspecific详解

写在前面 在Linux系统中使用C/C进行多线程编程时&#xff0c;我们遇到最多的就是对同一变量的多线程读写问题&#xff0c;大多情况下遇到这类问题都是通过锁机制来处理&#xff0c;但这对程序的性能带来了很大的影响&#xff0c;当然对于那些系统原生支持原子操作的数据类型来…