Netty核心技术十一--用Netty 自己 实现 dubbo RPC

news2025/1/12 10:36:20

1. RPC基本介绍

  1. RPC(Remote Procedure Call):远程 过程调用,是一个计算机 通信协议。该协议允许运 行于一台计算机的程序调 用另一台计算机的子程序, 而程序员无需额外地为这 个交互作用编程

  2. 两个或多个应用程序都分 布在不同的服务器上,它 们之间的调用都像是本地 方法调用一样(如图)

    image-20230715122856569

  3. 常见的 RPC 框架有: 比较知名的如阿里的Dubbo、google的gRPC、Go语言的rpcx、Apache的thrift, Spring 旗下的 Spring Cloud。

2. RPC调用流程图

术语说明:在RPC中,

  • Client叫服务消费者
  • Server叫服务提供者

image-20230715123024750

3. PRC调用流程说明

  1. **服务消费方(client)**以本地调用方式调用服务
  2. client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
  3. client stub 将消息进行编码并发送到服务端
  4. server stub 收到消息后进行解码
  5. server stub 根据解码结果调用本地的服务
  6. 地服务执行并将结果返回给 server stub
  7. server stub 将返回导入结果进行编码并发送至消费方
  8. client stub 接收到消息并进行解码
  9. 服务消费方(client)得到结果

小结:RPC 的目标就是将 2-8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用。

4. 自己实现 dubbo RPC(基于Netty)

  • 需求说明

    1. dubbo 底层使用了 Netty 作为网络通讯框架,要求用Netty 实现一个简单的RPC框架
    2. 模仿 dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用Netty4.1.20
  • 设计说明

    image-20230715123505151

    1. 创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。
    2. 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
    3. 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用Netty请求提供者返回数据

4.1 公共接口 publicinterface包

4.1.1 HelloService

package site.zhourui.nioAndNetty.netty.dubborpc.publicinterface;

//这个是接口,是服务提供方和 服务消费方都需要
public interface HelloService {
    String hello(String msg);
}

4.2 远程调用netty包

本质上就是客户端访问服务端

4.2.1 NettyClientHandler

  1. 我们实现了Callable方法
  2. setPara(String para)方法: 设置要发给服务器端的信息
  3. 我们将ctx在channelActive时抽取为全局对象context,方便我们在其他方法也能使用(这里就是call方法)
  4. call方法:
    • 开启子线程向服务端发送消息
    • 发送完成后该子线程进行wait,等待服务提供方处理并返回数据(被唤醒)
    • 唤醒后打印服务端返回数据全局变量result中的数据
  5. channelRead方法:
    • 收到服务器的返回数据后,将返回数据放在全局变量result中
    • 唤醒等待的线程
    • 因为channelRead和call方法是有同步关系的所有要加上synchronized加锁
  6. 小结: 代码执行流程
    1. channelActive()
    2. setPara()设置需要发送的数据
    3. call(wait之前代码)被代理对象调用, 发送数据给服务器-> wait
    4. 等待被唤醒(channelRead)->notify
    5. call(wait之后代码)
package site.zhourui.nioAndNetty.netty.dubborpc.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.Callable;

public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {

    private ChannelHandlerContext context;//上下文
    private String result; //返回的结果
    private String para; //客户端调用方法时,传入的参数

    //与服务器的连接创建后,就会被调用, 这个方法是第一个被调用(1)
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(" channelActive 被调用  ");
        context = ctx; //因为我们在其它方法会使用到 ctx
    }

    //收到服务器的数据后,调用方法 (4)
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(" channelRead 被调用  ");
        result = msg.toString();
        notify(); //唤醒等待的线程
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    //被代理对象调用, 发送数据给服务器,-> wait -> 等待被唤醒(channelRead) -> 返回结果 (3)-》5
    @Override
    public synchronized Object call() throws Exception {
        System.out.println(" call1 被调用  ");
        context.writeAndFlush(para);
        //进行wait
        wait(); //等待channelRead 方法获取到服务器的结果后,唤醒
        System.out.println(" call2 被调用  ");
        return  result; //服务方返回的结果
    }

    void setPara(String para) {
        System.out.println(" setPara  ");
        this.para = para;
    }
}

4.2.2 NettyClient

说明:

  1. 创建线程池executor

  2. initClient():

    • 初始化NettyClientHandler 设为全局对象client
    • 创建客户端并连接客户端
      • StringDecoder():字符串编码器
      • StringEncoder():字符串解码器
      • pipeline.addLast(client):将加入自定义handler-client
  3. 编写方法getBean使用代理模式,获取一个代理对象

    public Object getBean(final Class<?> serivceClass, final String providerName) 
    
    • serivceClass: 需要代理的Class对象
    • providerName: 协议以及需要发送的数据
    • 如果client为空就初始化initClient
    • client.setPara():使用自定义handler的全局对象client设置需要发送的数据
    • executor.submit(client): 将我们的自定义handler提交给异步线程池,因为NettyClientHandler 实现了Callable方法,会自动调用call方法
      • .get():异步任务执行完成后获取返回结果
    • 将返回结果return
package site.zhourui.nioAndNetty.netty.dubborpc.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NettyClient {
    //创建线程池
    private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    private static NettyClientHandler client;
    private int count = 0;

    //编写方法使用代理模式,获取一个代理对象

    public Object getBean(final Class<?> serivceClass, final String providerName) {

        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{serivceClass}, (proxy, method, args) -> {

                    System.out.println("(proxy, method, args) 进入...." + (++count) + " 次");
                    //{}  部分的代码,客户端每调用一次 hello, 就会进入到该代码
                    if (client == null) {
                        initClient();
                    }

                    //设置要发给服务器端的信息
                    //providerName 协议头 args[0] 就是客户端调用api hello(???), 参数
                    client.setPara(providerName + args[0]);

                    //
                    return executor.submit(client).get();

                });
    }

    //初始化客户端
    private static void initClient() {
        client = new NettyClientHandler();
        //创建EventLoopGroup
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(
                        new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new StringDecoder());
                                pipeline.addLast(new StringEncoder());
                                pipeline.addLast(client);
                            }
                        }
                );

        try {
            bootstrap.connect("127.0.0.1", 7000).sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4.2.3 NettyServerHandler

  • 当通道发生读事件时
    • 获取客户端发送的消息,并调用服务
    • 按照协议规则取出数据(HelloService#hello#)
      • HelloService# 为协议头
      • hello为数据
    • 回复客户端调用结果
package site.zhourui.nioAndNetty.netty.dubborpc.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import site.zhourui.nioAndNetty.netty.dubborpc.customer.ClientBootstrap;
import site.zhourui.nioAndNetty.netty.dubborpc.provider.HelloServiceImpl;

public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //获取客户端发送的消息,并调用服务
        System.out.println("msg=" + msg);
        //客户端在调用服务器的api 时,我们需要定义一个协议
        //比如我们要求 每次发消息是都必须以某个字符串开头 "HelloService#hello#你好"
        if(msg.toString().startsWith(ClientBootstrap.providerName)) {

            String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
            ctx.writeAndFlush(result);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

4.2.4 NettyServer

  • 启动客户端
    • StringDecoder
    • StringEncoder
    • NettyServerHandler
package site.zhourui.nioAndNetty.netty.dubborpc.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class NettyServer {
    public static void startServer(String hostName, int port) {
        startServer0(hostName,port);
    }

    //编写一个方法,完成对NettyServer的初始化和启动

    private static void startServer0(String hostname, int port) {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                                      @Override
                                      protected void initChannel(SocketChannel ch) throws Exception {
                                          ChannelPipeline pipeline = ch.pipeline();
                                          pipeline.addLast(new StringDecoder());
                                          pipeline.addLast(new StringEncoder());
                                          pipeline.addLast(new NettyServerHandler()); //业务处理器

                                      }
                                  }

                    );

            ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync();
            System.out.println("服务提供方开始提供服务~~");
            channelFuture.channel().closeFuture().sync();

        }catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

4.3 customer 包

4.3.1 ClientBootstrap

  • 设置providerName:我们发送的数据(协议+数据)
  • 创建一个消费者
  • 创建代理对象
  • 通过代理对象调用服务提供者的方法(服务)
package site.zhourui.nioAndNetty.netty.dubborpc.customer;

import site.zhourui.nioAndNetty.netty.dubborpc.netty.NettyClient;
import site.zhourui.nioAndNetty.netty.dubborpc.publicinterface.HelloService;

public class ClientBootstrap {
    //这里定义协议头
    public static final String providerName = "HelloService#hello#";

    public static void main(String[] args) throws  Exception{

        //创建一个消费者
        NettyClient customer = new NettyClient();

        //创建代理对象
        HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);

        for (;; ) {
            Thread.sleep(2 * 1000);
            //通过代理对象调用服务提供者的方法(服务)
            String res = service.hello("你好 dubbo~");
            System.out.println("调用的结果 res= " + res);
        }
    }
}

4.4 provider 包

4.4.1 HelloServiceImpl

服务端提供方的实现,远程真正被调用的方法

package site.zhourui.nioAndNetty.netty.dubborpc.provider;

import site.zhourui.nioAndNetty.netty.dubborpc.publicinterface.HelloService;

public class HelloServiceImpl implements HelloService {
    private static int count = 0;
    //当有消费方调用该方法时, 就返回一个结果
    @Override
    public String hello(String msg) {
        System.out.println("收到客户端消息=" + msg);
        //根据mes 返回不同的结果
        if(msg != null) {
            return "你好客户端, 我已经收到你的消息 [" + msg + "] 第" + (++count) + " 次";
        } else {
            return "你好客户端, 我已经收到你的消息 ";
        }
    }
}

4.4.1 ServerBootstrap

ServerBootstrap 会启动一个服务提供者,就是 NettyServer

package site.zhourui.nioAndNetty.netty.dubborpc.provider;

import site.zhourui.nioAndNetty.netty.dubborpc.netty.NettyServer;

//ServerBootstrap 会启动一个服务提供者,就是 NettyServer
public class ServerBootstrap {
    public static void main(String[] args) {

        //代码代填..
        NettyServer.startServer("127.0.0.1", 7000);
    }
}

4.5 测试

  1. 启动ServerBootstrap

    image-20230715153549514

  2. 启动ClientBootstrap

    image-20230715153625019

    image-20230715153633552

4.5.1 debug看一下ClientBootstrap启动

首先还是先启动服务端ServerBootstrap

  1. debug启动ClientBootstrap

    image-20230715153928061

  2. NettyClient(),此时只是初始化了全局属性

  3. getBean:创建代理对象

    • 先看看入参是什么数据

      image-20230715154250353

    • 如果client没有被初始化就初始化

      image-20230715154338794

    • 设置要发给服务器端的信息

      image-20230715154437686

    • executor.submit:提交异步任务就会来到NettyClientHandler的call方法

      image-20230715154604732

    • call方法执行到wait()方法后,channelRead不久后就会收到服务端的调用结果然后唤醒call方法的子线程继续执行

      image-20230715154847317

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/756427.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AJAX与axios框架

文章目录 前言案例跨域访问总结❗ 前言 提示&#xff1a;这里可以添加本文要记录的大概内容&#xff1a; 通过 ajax 进行前后端交互 案例 此项目用到了javaweb知识 首先创建JavaWeb项目编写代码&#xff1a; package ajax;import java.io.IOException; import java.util.A…

个人引导页源码带三个按钮可添加

个人引导页源码带三个按钮可添加 简洁优雅引导页

SpingBoot配置文件

普通参数配置 之前用阿里云oss&#xff0c;如果在每一个都程序都写这个 1.代码一旦修改要一个一个改 2.代码复用性低 所以可以配置到springBoot的配置文件来进行对应的读取&#xff0c;这样复用性就高了 赋值Value&#xff08;和最下面注解功能相同&#xff09; 在applicat…

淘宝app商品详情原数据API接口【详情页优惠券数据】Python语言示例请求范例,多种语言均支持

首先以Python语言请求示例为介绍 请求示例 # coding:utf-8 """ Compatible for python2.x and python3.x requirement: pip install requests """ from __future__ import print_function import requests # 请求示例 url 默认请求参数已经做U…

密码学学习笔记(十一):哈希函数 - Merkle–Damgård结构

Merkle–Damgrd是一种算法&#xff0c;由Ralph Merkle和Ivan Damgrd提出。它通过迭代调用压缩函数来计算消息的哈希值。 应用 拿SHA-2举例&#xff0c;首先我们需要对需要进行哈希运算的输入做填充&#xff0c;然后将填充后的输入划分为等长的分组&#xff0c;每个分组的长度…

中级课程——SSRF

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言挖掘 前言 挖掘

【网站开发】jq (jquery)实现瀑布流布局

要实现网站瀑布流效果&#xff0c;可以使用HTML、CSS和jquery来完成。下面是一种常见的实现方式&#xff1a; 注意要引入jQuery库。 代码如下&#xff1a; <!DOCTYPE html> <html><head><meta charset"utf-8"><title></title>…

git下载源码及环境搭建之数据库(二)

学习目标&#xff1a; 数据库 新项目使用 数据库文件 的配置 及相关属性的设置 步骤&#xff1a; 数据库 下图所示为开发时所用数据库 第一步&#xff1a;新建一个数据库 注意&#xff1a; 字符集与排序规则我们应该选择utf-8 相关 选中新创建的表&#xff0c;点击备份—还…

【雕爷学编程】Arduino动手做(06)---KY-038声音传感器模块4

37款传感器与执行器的提法&#xff0c;在网络上广泛流传&#xff0c;其实Arduino能够兼容的传感器模块肯定是不止这37种的。鉴于本人手头积累了一些传感器和执行器模块&#xff0c;依照实践出真知&#xff08;一定要动手做&#xff09;的理念&#xff0c;以学习和交流为目的&am…

Flutter:网络图像缓存插件——cached_network_image

前言 为什么要使用这个插件&#xff0c;有什么用呢&#xff1f;毕竟官方提供了Image.network来进行网络图片加载 Image.network和CachedNetworkImage都可以用于在Flutter中加载网络图片&#xff0c;但它们之间有一些区别。 Image.network是Flutter核心库提供的一个构造函数&…

趣味:关于AI是否具备自我意识的探究方案之一

随着gpt等其他NLP大语言模型的爆火&#xff0c;AI技术再次成为我们谈论的焦点&#xff0c;AI是一种拟人的、能够通过学习和自我优化执行各种任务的技术&#xff0c;关于ai是否具备自我意识的答案很明显是否定的&#xff0c;以下将使用一个例子来论证的观点 1.实验准备 我们将…

收拾屋子找出10年前的三维教程

明天找个带光驱的电脑&#xff0c;打开看看。那会还是刻盘&#xff0c; 那会我还是小鲜肉&#xff0c;只是喜欢PS2游戏。所以才接触到三维软件&#xff0c;可惜没干这行。现在变成中年大叔了&#xff0c;拿出来玩会。 想当初现在e维网&#xff0c;下载最新的教程。后面电驴里…

高并发下保证接口幂等性的常用策略

接口幂等性问题&#xff0c;对于开发人员来说是一个常见的公共问题。这里分享一些我在项目中用到过的一些方法&#xff0c;给有需要的同学们一个参考。 你是否遇到过以下的场景&#xff1a; 在填写form页面表单时&#xff0c;如果前端没做loading或者防抖操作&#xff0c;保存…

松松商城上线“谷歌英文外链“资源,松松软文推出英文站点资源

我是卢松松&#xff0c;点点上面的头像&#xff0c;欢迎关注我哦&#xff01; 近期&#xff0c;为了丰富资源&#xff0c;松松商城和松松软文迎来了一系列新的更新。松松商城推出了“谷歌外贸站英文"外链资源&#xff0c;而松松软文则上线了英文站点资源&#xff0c;为用…

MURF20100CT-ASEMI快恢复对管MURF20100CT

编辑&#xff1a;ll MURF20100CT-ASEMI快恢复对管MURF20100CT 型号&#xff1a;MURF20100CT 品牌&#xff1a;ASEMI 封装&#xff1a;TO-220F 恢复时间&#xff1a;50ns 正向电流&#xff1a;20A 反向耐压&#xff1a;1000V 芯片个数&#xff1a;2 引脚数量&#xff1…

场景图生成——RelTR训练自己的数据集

RelTR训练自己的数据集 省流量省时间版本框的标注关系的标注总的 前言Open Images V6的标注格式RelTR中使用的Open Images V6的数据标注格式具体步骤框的标注生成格式关系三元组的生成格式 结束语参考链接 省流量省时间版本 框的标注 共需要创建4个json标注文件 train.json, …

功能升级,数据同步更便捷!场景化数据同步助您提效60%!

在企业数仓建设初期&#xff0c;为了保障数字化转型的落地效果&#xff0c;需要提供充足的数据资源&#xff0c;除了基础的数据抽取、转换和加载等过程&#xff0c;数据的同步也是重要环节之一。数据同步常用于数仓ODS、ADS层的建设&#xff0c;通过不同数据源的同步&#xff0…

回归预测 | MATLAB实现基于BiGRU-AdaBoost双向门控循环单元结合AdaBoost多输入单输出回归预测

回归预测 | MATLAB实现基于BiGRU-AdaBoost双向门控循环单元结合AdaBoost多输入单输出回归预测 目录 回归预测 | MATLAB实现基于BiGRU-AdaBoost双向门控循环单元结合AdaBoost多输入单输出回归预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 1.MATLAB实现基于B…

Michael.W基于Foundry精读Openzeppelin第8期——Context.sol

Michael.W基于Foundry精读Openzeppelin第8期——Context.sol 0. 版本0.1 Context.sol 1. 目标合约2. 代码精读2.1 _msgSender()2.2 _msgSender() 0. 版本 [openzeppelin]&#xff1a;v4.8.3&#xff0c;[forge-std]&#xff1a;v1.5.6 0.1 Context.sol Github: https://gith…

MIT 6.S081 Lab 11 -- NetWork - 下

MIT 6.S081 Lab 11 -- NetWork -- 下 引言代码解析网络子系统初始化相关数据结构lab 分析e1000_transmit函数实现e1000_recv函数实现socket write全流程分析socket read全流程分析socket关闭ARP数据报的发送与接收 引言 本文为 MIT 6.S081 2020 操作系统 实验十一解析。 MIT …