利用netty手写rpc框架

news2025/1/8 5:09:11

前言:利用netty异步事件驱动的网络通信模型,来实现rpc通信

一、大致目录结构:

二、两个端:服务端(发布),客户端(订阅消费),上代码:

1.服务端(发布):

RPCServer:

代码:

public class RpcServer {


    private Map<String, Object> registryMap = new HashMap<>();
    private List<String> classCache = new ArrayList<>();

    // 1.实现发布;实现方式:
    // 1.1查找指定目录下的所有接口,放入一个集合中
    // 1.2遍历所有接口,放入一个map中存放接口路径及接口名称
    // 1.3发送方法相关信息


    public void publish(String providerPackage) throws Exception {

        getProviderClass(providerPackage);
        doRegister();

        NioEventLoopGroup parentGroup = new NioEventLoopGroup();
        NioEventLoopGroup childGroup = new NioEventLoopGroup();

        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(parentGroup, childGroup)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new ObjectEncoder());
                            pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                            pipeline.addLast(new RpcServerHandler(registryMap));
                        }
                    });

            ChannelFuture future = serverBootstrap.bind(9999).sync();
            System.out.println("服务端监听9999端口,启动成功。。。");
            future.channel().closeFuture().sync();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {

            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }

    private void doRegister() throws Exception {

        if (classCache.size() > 0) {
            for (String className :classCache){

                Class<?> clazz = Class.forName(className);
                String interfaceName = clazz.getInterfaces()[0].getName();
                registryMap.put(interfaceName, clazz.newInstance());
            }
        }


    }


    /**
     * 获取当前目录下的所有接口,汇总到一个集合中
     * @param providerPackage
     */
    private void getProviderClass(String providerPackage) {

        URL resource =         this.getClass().getClassLoader().getResource(providerPackage.replaceAll("\\.", "/"));

        File file = null;
        if (resource != null) {
            file = new File(resource.getFile());
        }

        if (file != null) {
            for (File f : Objects.requireNonNull(file.listFiles())) {
                if (f.isDirectory()) {
                    getProviderClass(providerPackage + "." + f.getName());
                } else if (f.getName().endsWith(".class")) {
                    String fileName = f.getName().replace(".class", "").trim();
                    classCache.add(providerPackage + "." + fileName);
                }
            }
        }
    }


}

服务端处理器:用于处理消费端发送过来的接口数据

代码:

public class RpcServerHandler extends ChannelInboundHandlerAdapter {

    private Map<String, Object> registryMap;

    public RpcServerHandler(Map<String, Object> registryMap) {
        this.registryMap = registryMap;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        System.out.println("rpc-server收到客户端消息:" + msg);

        if (msg instanceof InvokeMessage) {
            InvokeMessage method = (InvokeMessage) msg;
            Object result = "rpc-server端没有该方法";

            // 判断是否存在该方法
            if (registryMap.containsKey(method.getClassName())) {
                Object provider = registryMap.get(method.getClassName());
                result = provider.getClass()
                        .getMethod(method.getMethodName(), method.getParamTypes())
                        .invoke(provider, method.getParamValues());
            }
            // 把方法返回结果,发给订阅者
            ctx.channel().writeAndFlush(result);
            ctx.close();
        }

    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

接口的实现类:这里的接口是客户端的接口Api

代码:

public class SsenServiceApiImpl implements SsenServiceApi {

    @Override
    public String hellRpc(String name) {
        return name + "实现类方法";
    }
}

2.客户端:(订阅消费)

这里采用JDK原生的基于接口的动态代理f发

public class RpcProxy {


    // JDK基于接口的动态代理,用于创建代理对象
    public static <T> T create(Class<?> clazz) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(),
                new Class[]{clazz},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        if (Object.class.equals(method.getDeclaringClass())) {
                            return method.invoke(proxy, method, args);
                        }

                        // 通过netty将接口信息发送给提供者,获取指定方法
                        return RpcInvoke(clazz, method, args);
                    }
                });
    }

    private static Object RpcInvoke(Class<?> clazz, Method method, Object[] args) {
        NioEventLoopGroup eventGroup = new NioEventLoopGroup();

        RpcClientHandler rpcClientHandler = new RpcClientHandler();

        try {

            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new ObjectEncoder());
                            pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
                                    ClassResolvers.cacheDisabled(null)));
                            pipeline.addLast(rpcClientHandler);
                        }
                    });

            // 绑定指定服务地址
            ChannelFuture future = bootstrap.connect("localhost", 9999).sync();

            // 指定接口信息发送给提供者
            InvokeMessage invokeMessage = new InvokeMessage();
            invokeMessage.setClassName(clazz.getName());
            invokeMessage.setMethodName(method.getName());
            invokeMessage.setParamTypes(method.getParameterTypes());
            invokeMessage.setParamValues(args);
            future.channel().writeAndFlush(invokeMessage).sync();

            future.channel().closeFuture().sync();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            eventGroup.shutdownGracefully();
        }
        return rpcClientHandler.getResult();
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1471904.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

prometheus+grafana监控nginx的简单实现

1.编译安装NGINX 加入编译安装nginx-module-vts模块,目的是为了获取更多的监控数据(虚拟主机&#xff0c;upstream等) nginx下载 http://nginx.org/download/nginx-1.20.2.tar.gz nginx-module-vts下载 https://github.com/vozlt/nginx-module-vts/archive/refs/tags/v0.2…

手机打开 第三方 “微信、快手、QQ、电话、信息” 等

前期回顾 Vue3 TS Element-Plus —— 项目系统中封装表格搜索表单 十分钟写五个UI不在是问题_vue3 封装table 配置表格-CSDN博客https://blog.csdn.net/m0_57904695/article/details/135538630?spm1001.2014.3001.5501 目录 &#x1f916; 下载App如下图所示&#xff1a;…

【Unity】如何从现有项目中抽取好用的资源

【背景】 在做Unity项目的过程中引入各种各样的Package&#xff0c;有的Package很大&#xff0c;但是觉得非常有用的可能只是几个Prefab或者Material等。如果直接拷贝想要的Prefab和Material&#xff0c;又需要自己确认所有有依赖关系的资源。 如果能将所有日常经受项目中自己…

蓝桥杯算法 一.

分析&#xff1a; 本题记录&#xff1a;m个数&#xff0c;异或运算和为0&#xff0c;则相加为偶数&#xff0c;后手获胜。 分析&#xff1a; 369*99<36500&#xff0c;369*100>36500。 注意&#xff1a;前缀和和后缀和问题

ffmpeg的pcm、yuv小知识点

ffmpeg的pcm、yuv小知识点 pcm、yuv保存调用&#xff0c;写个通用工具方法&#xff0c;平时快速保存&#xff0c;和调用方便查看自己bug ffmpeg的AVFrame存储 yuv 调用方法 保存方法 void save_yuv420p_file(unsigned char *y_buf , unsigned char *u_buf,unsigned char *…

解决gogs勾选“使用选定的文件和模板初始化仓库”报错500,gogs邮件发送失败,gogs邮件配置不生效,gogs自定义模板等问题

解决gogs勾选“使用选定的文件和模板初始化仓库”报错500,gogs邮件发送失败,gogs邮件配置不生效,gogs自定义模板等问题 前几天出了教程本地部署gogs&#xff0c;在后期运行时发现两个问题&#xff1a; 第一&#xff1a;邮件明明配置了&#xff0c;后台显示未配置&#xff0c;…

如何实现多账户管理?海外代理IP推荐

伴随着互联网的发展&#xff0c;目前越来越多的用户开始拥有不止一个社交媒体或者电商平台等类型的账号&#xff0c;但实际上不论是社交平台还是电商平台对于用户的多账号使用行为都十分的抵制。如果用户不采取任何措施直接长时间进行多账户操作的话&#xff0c;可能会遇到以下…

SpringBoot原理篇

文章目录 SpingBoot原理1. 配置优先级2. Bean管理2.1 获取Bean2.2 Bean作用域2.3 第三方Bean 3. SpringBoot原理3.1 起步依赖3.2 自动配置3.2.1 概述3.2.2 常见方案3.2.2.1 概述3.2.2.2 方案一3.2.2.3 方案二 3.2.3 原理分析3.2.3.1 源码跟踪3.2.3.2 Conditional 3.2.4 案例3.2…

Python爬虫技术详解:从基础到高级应用,实战与应对反爬虫策略【第93篇—Python爬虫】

前言 随着互联网的快速发展&#xff0c;网络上的信息爆炸式增长&#xff0c;而爬虫技术成为了获取和处理大量数据的重要手段之一。在Python中&#xff0c;requests模块是一个强大而灵活的工具&#xff0c;用于发送HTTP请求&#xff0c;获取网页内容。本文将介绍requests模块的…

sql-labs第46关 脚本盲注

我们直接可以从界面中得知传参的参数为SORT 查看源码 我们令参数即sort为1尝试输入&#xff1a;即 ?sort1 令参数即sort为2 看到按照字母顺序进行了排序&#xff0c;所以使用了order by语句。 用 rand()盲注 这里首先假设给true以及false两个来观察回显 ?sortrand(true) …

苹果iPad通过Code APP应用实现SSH连接服务器远程进行开发

文章目录 1. 在iPad下载Code APP2.安装cpolar内网穿透2.1 cpolar 安装2.2 创建TCP隧道 3. iPad远程vscode4. 配置固定TCP端口地址4.1 保留固定TCP地址4.2 配置固定的TCP端口地址4.3 使用固定TCP地址远程vscode 本文主要介绍开源iPad应用IDE Code App 如何下载安装&#xff0c;并…

Linux-实用操作(黑马学习笔记)

各类小技巧&#xff08;快捷键&#xff09; ctrl c 强制停止 ● Linux某些程序的运行&#xff0c;如果想要强制停止它&#xff0c;可以使用快捷键ctrl c ● 命令输入错误&#xff0c;也可以通过快捷键ctrl c&#xff0c;退出当前输入&#xff0c;重新输入 ctrl d 退出或登…

用Python实现创建十二星座数据分析图表

下面小编提供的代码中&#xff0c;您已经将pie.render()注释掉&#xff0c;并使用了pie.render_to_file(十二星座.svg)来将饼状图渲染到一个名为十二星座.svg的文件中。这是一个正确的做法&#xff0c;如果您想在文件中保存图表而不是在浏览器中显示它。 成功创建图表&#xf…

本地部署ChatGPT

发布一下我之前做的一个本地大模型部署,不需要API key,但要有自己的账号 利用Docker 的Pandora做本地ChatGPT模型部署 先下载安装Docker,设置好运行如下 会要求升级核心,cmd运行如下命令就OK 安装Pandora 再管理员cmd中输入如下命令拉取Pandora镜像 docker pull pengzhi…

索引学习以及索引原理

有时候&#xff0c;建索引并不一定会加快查询效率。但是&#xff0c;有时候&#xff0c;表的数据量是大数据量的话&#xff0c;还是要看下是否能使用索引优化查询效率。 1、建索引的几大原则&#xff1a; 1.1、最左前缀匹配原则非常重要的原则&#xff0c;mysql会一直向右匹配…

3分钟快速实现串口PLC远程下载程序操作说明

3分钟快速实现串口PLC远程下载程序操作说明 搜索蓝蜂物联网官网&#xff0c;即可免费领取样机使用&#xff01;&#xff01;先到先得&#xff01;&#xff01;&#xff01; 一. 适用产品型号 其余型号网关此功能正在开发中&#xff0c;敬请期待。 二. 远程下载功能使用流程 …

HTTP/HTTPS协议

什么是HTTP协议 HTTP被称为超文本传输协议(里面不仅仅可以是字符串,还可以是图片,特殊字符等),这是一种应用非常广泛的应用层协议. HTTP协议诞生于1991年,现在是最主流使用的一种应用层协议.它从诞生到现在为止迭代了多个版本. 但目前最主流使用的还是HTTP1.1和HTTP2.0. HTTP协…

图像处理ASIC设计方法 笔记3 跨时钟域处理思想

P43第二章 数据路径并行策略 全并行、全串行、半并行 P47 第三章 时序问题和解决方法 锁存器的时序分析,工具做不到,需要设计者手工分析 存储器中SRAM可以直接做到ASIC里面(CMOS工艺)而DRAM不行 P55 作者是不是把sensor翻译成 成像器 成像器向ASIC输入被处理数据 片外…

【ECharts】调用接口获取后端数据的四种方法

使用eacharts做大屏&#xff0c;需要使用后端数据&#xff0c;下面的方法是自己试过有效的&#xff0c;有什么不对的&#xff0c;望各位大佬指点。 目录 方法一&#xff1a;在mounted中使用定时器调用eacharts方法&#xff08;定时器可以获取到data中的数据&#xff09; 方法…

MySQL的SQL语句

1.MySQL连接 连接命令一般是这样写的 mysql -h$ip -P$port -u$user -p比如:mysql -h127.0.0.1 -P3306 -uroot -p -h 指定连接的主机地址&#xff1b;-P 指定连接端口号&#xff1b;-u 指定用户名 -p指定用户名密码 2.SQL分类 DDL(Data Definition Language) 数据定义语言&…