用netty实现简易rpc

news2025/1/12 23:41:52

文章目录

  • rpc介绍:
  • rpc调用流程:
  • 代码:

rpc介绍:

RPC是远程过程调用(Remote Procedure Call)的缩写形式。SAP系统RPC调用的原理其实很简单,有一些类似于三层构架的C/S系统,第三方的客户程序通过接口调用SAP内部的标准或自定义函数,获得函数返回的数据进行处理后显示或打印。

rpc调用流程:

在这里插入图片描述

代码:

public interface HelloService {

    String hello(String msg);
}
public class HelloServiceImpl implements HelloService {
    @Override
    public String hello(String msg) {
        System.out.println("读取到客户端信息:" + msg);
        if (msg != null) {
            return "已收到客户端信息【" + msg + "】";
        } else {
            return "已收到客户端信息";
        }
    }
}
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 接收客户端发送的信息,并调用服务
        // 规定每次发送信息时 都以"HelloService#hello#开头“, 其中最后一个#后面的为参数
        String message = msg.toString();
        System.out.println("最初消息:" + message);
        if (message.startsWith("HelloService#hello#")) {

            String arg = message.substring(19);
            System.out.println("接收的参数:" + arg);
            String result = new HelloServiceImpl().hello(arg);
            ctx.writeAndFlush(result);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 出现异常时关闭通道
        ctx.close();
    }
}
public class NettyServer {

    /**
     * 启动服务
     *
     * @param host 主机地址
     * @param port 线程端口
     */
    private static void startServer0(String host, int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {  // workerGroup
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // String的编码解码器
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new NettyServerHandler()); // 自定义业务处理器
                        }
                    });
            // 绑定端口并启动
            ChannelFuture channelFuture = serverBootstrap.bind(host, port).sync();
            System.out.println("服务器启动:");
            // 监听关闭
            channelFuture.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void startServer(String host, int port) {
        startServer0(host, port);
    }
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {

    private ChannelHandlerContext channelHandlerContext;
    private String result; // 服务端返回的数据
    private String param; // 客户端调用方法时传入的参数

    /**
     * 与服务器建立连接时被调用
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive 被调用");
        this.channelHandlerContext = ctx;
    }

    /**
     * 收到服务器数据时被调用
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(" channelRead 被调用  ");
        result = msg.toString();
        // 唤醒等待的线程。
        notifyAll();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    /**
     * 当某个线程执行NettyClientHandler任务时,会调用get()方法,get()方法会阻塞当前线程,
     * 直到任务执行完成并返回结果或抛出异常。
     *
     * @return
     * @throws Exception
     */
    @Override
    public synchronized Object call() throws Exception {
        System.out.println("call--1  ");
        channelHandlerContext.writeAndFlush(param);
//        TimeUnit.MILLISECONDS.sleep(5 * 1000);
        wait(); // 等待channelRead()方法的调用
        System.out.println("call--2  ");
        return result;
    }

    /**
     * 设置参数
     *
     * @param param
     */
    public void setParam(String param) {
        this.param = param;
    }
}
public class NettyClient {

    // 设置为cpu核数个线程
    private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    private static NettyClientHandler nettyClientHandler;


    private static void initClient() {
        nettyClientHandler = new NettyClientHandler();
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true) // tcp无延迟
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(nettyClientHandler);
                    }
                });
        try {
            ChannelFuture sync = bootstrap.connect("127.0.0.1", 7000).sync();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

    }

    public Object getBean(final Class<?> serviceClass, final String providerName) {
        /**
         * newProxyInstance()方法的第三个参数为实现了java.lang.reflect.InvocationHandler接口的类,
         */
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{serviceClass}, (proxy, method, args) -> {
            if (nettyClientHandler == null) {
                System.out.println("nettyClientHandler 被初始化");
                initClient();
            }

            System.out.println("进入到匿名内容类");
            nettyClientHandler.setParam(providerName + args[0]);
            return executorService.submit(nettyClientHandler).get();
        });
    }
}
public class ServerBootStrapService {
    public static void main(String[] args) {
        NettyServer.startServer("127.0.0.1",7000);
    }
}
public class ConsumerBootStrap {

    public final static String ProviderName = "HelloService#hello#";

    public static void main(String[] args) throws InterruptedException {
        NettyClient nettyClient = new NettyClient();

        /**
         * helloService为代理对象
         */
        HelloService helloService = (HelloService) nettyClient.getBean(HelloService.class, ProviderName);
        for (int i = 0; ; ) {
            TimeUnit.MILLISECONDS.sleep(2000);

            /**
             * 当helloService调用hello()方法时,会进入到 实现了InvocationHandler类中的invoke()方法,也就是这个匿名内部类:(proxy, method, args) -> {
             *             if (nettyClientHandler == null) {
             *                 initClient();
             *             }
             *             nettyClientHandler.setParam(providerName + args[0]);
             *             return executorService.submit(nettyClientHandler).get();
             */
            helloService.hello("哈喽,哈喽: " + i++);
        }
    }
}

gitee地址:https://gitee.com/okgoodfine/rpc-netty

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1080527.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

群晖内安装的windows虚拟机如何扩展磁盘(虚拟机如何扩展磁盘,解决扩展磁盘不生效的问题)

文章目录 问题解决问题问题 在群晖的虚拟机中创建了一个Win7x64的虚拟机,由于配置低的原因,但是容量只设置了30G,但是现在要满了,所以现在要迁移和扩容 迁移很简单,直接选择迁移就行,扩容的话也就是选择扩容到指定的容量 (注意:这里容量扩大是不可逆的,扩大了不能再变…

银河麒麟安装arm架构mysql8

1. 准备工作 2. 查看麒麟系统版本 使用命令 Linux version 4.19.90-25.21.v2101.ky10.aarch64 (KYLINSOFTlocalhost.localdomain) (gcc version 7.3.0 (GCC)) #1 SMP Wed Sep 28 16:37:42 CST 2022可以看出这是麒麟 v10 &#xff0c;aarch64 &#xff08;ARM 架构的&#xff…

A股风格因子看板 (2023.10 第02期)

该因子看板跟踪A股风格因子&#xff0c;该因子主要解释沪深两市的市场收益、刻画市场风格趋势的系列风格因子&#xff0c;用以分析市场风格切换、组合风格暴露等。 今日为该因子跟踪第02期&#xff0c;指数组合数据截止日2023-09-30&#xff0c;要点如下 1) 近1年A股风格因子检…

聊一聊 Spring 6 面向切面AOP

Java全能学习面试指南&#xff1a;https://javaxiaobear.cn 今天我们一起看看Spring AOP的相关操作&#xff01; 1、场景模拟 搭建子模块&#xff1a;spring6-aop 1.1、声明接口 声明计算器接口Calculator&#xff0c;包含加减乘除的抽象方法 public interface Calculator…

tailwindcss安装完插件代码不提示

安装完插件鼠标滑过tailwindcss类名claa不提示 vscode版本太低.需要安装最新的vscode插件扩展设置中的Tailwind CSS :Emmet Completions默认是未勾选的,需要手动勾选

科技云报道:不堪忍受英伟达霸权,微软、OpenAI纷纷自研AI芯片

科技云报道原创。 英伟达是当之无愧的“AI算力王者”&#xff0c;A100、H100系列芯片占据金字塔顶尖位置&#xff0c;是ChatGPT这样的大型语言模型背后的动力来源。 但面对英伟达的独霸天下&#xff0c;科技巨头们都纷纷下场自研AI芯片。 10月6日&#xff0c;媒体援引知情人…

二、WebGPU阶段间变量(inter-stage variables)

二、WebGPU阶段间变量&#xff08;inter-stage variables&#xff09; 在上一篇文章中&#xff0c;我们介绍了一些关于WebGPU的基础知识。在本文中&#xff0c;我们将介绍阶段变量&#xff08;inter-stage variables&#xff09;的基础知识。 阶段变量在顶点着色器和片段着色…

科技资讯|苹果下一代Vision Pro头显将更小更轻,预装处方镜片

据彭博社的 Mark Gurman 在《Power On》新闻简报中透露&#xff0c;苹果和 Meta 的混合现实头显还未发售&#xff0c;但两家的下一代机型的开发工作已经在顺利进行。 据报道&#xff0c;苹果下代产品的一个重点是通过更小、更轻的设计&#xff0c;使其设备佩戴起来更加舒适。据…

VScode远程root权限调试

尝试诸多办法无法解决的情况下&#xff0c;允许远程登陆用户直接以root身份登录 编辑sshd_config文件 sudo vim /etc/ssh/sshd_config 激活配置 注释掉PermitRootLogin without-password&#xff0c;即#PermitRootLogin without-password 增加一行&#xff1a;PermitRootLo…

浅谈高速公路服务区分布式光伏并网发电

前言 今年的国家经济工作会议提出&#xff1a;将“做好碳达峰、碳中和工作”作为 2021年的主要任务之一&#xff0c;而我国高速公路里程 15.5万公里&#xff0c;对能源的需求与日俱增&#xff0c;碳排放量增速明显。 为了实现采用减少碳排放量&#xff0c;采用清洁能源替代的…

mac版postman升级后数据恢复办法

postman升级了一下&#xff0c;所有的collections都丢失了。 首先在finder里找到这个路径 /Users/{用户名}/Library/Application Support/Postman找到升级之前的的最新的backup.json&#xff0c;然后在postman里import这个文件。 所有升级前的collections都恢复了&#xff0…

云上攻防-云服务篇对象存储Bucket桶任意上传域名接管AccessKey泄漏

文章目录 章节点前言对象存储各大云名词&#xff1a; 对象存储-以阿里云为例&#xff1a;正常配置权限配置错误公共读或公共读写&#xff1a;可完整访问但不显示完整结构权限Bucket授权策略&#xff1a;设置ListObject显示完整结构权限Bucket读写权限&#xff1a;公共读写直接P…

数据结构相关知识点(一)

一、线性表 1.什么是线性表&#xff1f; 线性表&#xff0c;全名为线性存储结构。使用线性表存储数据的方式可以理解理解为&#xff1a;把所有数据按照顺序&#xff08;线性&#xff09;的存储结构方式&#xff0c;存储在物理空间。 2.线性存储结构 顺序存储结构&#xff1…

建筑行业+办公领域低代码解决方案:创新、效率与商机的共赢

随着科技的不断进步和创新&#xff0c;建筑行业面临着技术创新和转型升级的压力。新的技术和工艺不断涌现&#xff0c;建筑企业需要紧跟技术趋势&#xff0c;引入新的技术和工艺&#xff0c;提高生产效率和质量&#xff0c;增强自身的核心竞争力。 而且建筑行业是一个高度竞争…

MongoDB副本集集群原理

文章目录 1、副本集-Replica Sets1.1、是什么1.2、副本集的三个角色1.3、副本集架构目标1.4、副本集的创建1.4.1 第一步&#xff1a;创建主节点1.4.2 第二步&#xff1a;创建副本节点1.4.3 第三步&#xff1a;创建仲裁节点1.4.4 第四步&#xff1a;初始化配置副本集和主节点1.4…

基于SSM的影视企业全渠道会员管理系统的设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;Vue 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是 目录…

计算机毕业设计选什么题目好?springboot 仓库在线管理系统

✍✍计算机编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java实战 |…

400电话是什么电话,和普通电话有什么不同

400电话是一种特殊的电话号码&#xff0c;通常用于企业或机构提供客户服务和咨询。与普通电话相比&#xff0c;400电话有以下几个不同之处。 首先&#xff0c;400电话是一种虚拟号码&#xff0c;不依赖于地理位置。普通电话号码通常与特定的地区或城市相关联&#xff0c;而400…

HP ENVY x360 Convert 15-bp002tx,15-bp106TX原厂Windows10系统

惠普笔记本ENVY X360原装出厂OEM预装Win10镜像文件 下载链接&#xff1a;https://pan.baidu.com/s/1GXoEIHaJtP752zYDJknrJg?pwdmq35 适用型号&#xff1a;15-bp001tx,15-bp002tx,15-bp003tx,15-bp004tx,15-bp005tx,15-bp006tx 15-bp101TX,15-bp102tx,15-bp103tx,15-bp104t…

跨境商城源码的终极秘密:寻找最佳解决方案的5个步骤!

在当今全球化的商业环境下&#xff0c;跨境电商已经成为许多企业拓展市场的重要战略之一。而搭建一个高效稳定的跨境商城平台是成功的关键所在。但是&#xff0c;要找到最佳解决方案并不容易。本文将详细介绍寻找跨境商城源码的最佳解决方案的五个步骤&#xff0c;帮助您在选择…