【手撸RPC框架】netty入门

news2025/1/13 15:52:10

在这里插入图片描述

🐼作者简介:一名大三在校生🎋
空有想法,没有实践,难成大事

专栏前言:探索RPC框架的奥秘

在这里插入图片描述

简介在现代软件开发中,随着微服务架构的普及,远程过程调用(RPC)框架成为了连接服务之间通信的桥梁。我有决定开发了一款高性能的RPC框架,它不仅实现了服务之间的高效调用,还集成了关键的服务治理功能,如负载均衡、熔断机制和限流策略,以确保系统的稳定性和可靠性。

核心技术本项目采用Netty作为其强大的底层通信组件,确保了网络通信的高效与稳定。同时,通过与ZooKeeper的结合,实现了服务的注册与发现,为服务治理提供了坚实的基础。

下面我将提供一个全面的视角,来理解RPC框架的内部工作原理及其在实际开发中的应用。欢迎大家持续关注订阅专栏!!!

专属社群:

在这里插入图片描述

文章目录

    • 专栏前言:探索RPC框架的奥秘
    • 三、netty入门
      • 1、为什么要学习netty?
      • 2、netty的基本工作流程
      • 3、netty中的helloworld
    • 欢迎添加微信,加入我的核心小队,请备注来意

三、netty入门

1、为什么要学习netty?

一方面:现在物联网的应用无处不在,大量的项目都牵涉到应用传感器和服务器端的数据通信,Netty作为基础通信组件、能够轻松解决之前有较高门槛的通信系统开发,你不用再为如何解析各类简单、或复杂的通讯协议而薅头发了,有过这方面开发经验的程序员会有更深刻、或者说刻骨铭心的体会。

另一方面:现在互联网系统讲究的都是高并发分布式微服务,各类消息满天飞(是的,IM系统、消息推送系统就是其中的典型),Netty在这类架构里面的应用可谓是如鱼得水,如果你对当前的各种应用服务器不爽,那么完全可以基于Netty来实现自己的HTTP服务器、FTP服务器、UDP服务器、RPC服务器、WebSocket服务器、Redis的Proxy服务器、MySQL的Proxy服务器等等。

2、netty的基本工作流程

在netty中存在以下的核心组件:

  • ServerBootstrap:服务器端启动辅助对象;
  • Bootstrap:客户端启动辅助对象;
  • Channel:通道,代表一个连接,每个Client请对会对应到具体的一个Channel;
  • ChannelPipeline:责任链,每个Channel都有且仅有一个ChannelPipeline与之对应,里面是各种各样的Handler;
  • handler:用于处理出入站消息及相应的事件,实现我们自己要的业务逻辑;
  • EventLoopGroup:I/O线程池,负责处理Channel对应的I/O事件;
  • ChannelInitializer:Channel初始化器;
  • ChannelFuture:代表I/O操作的执行结果,通过事件机制,获取执行结果,通过添加监听器,执行我们想要的操作;
  • ByteBuf:字节序列,通过ByteBuf操作基础的字节数组和缓冲区。

我们结合其核心组件通过下图,可以清晰的看明白netty的基本工作原理:

在这里插入图片描述

在这其中,ChannelPipeline 是一个重要的组件,用于处理 I/O 事件和拦截 I/O 操作。它是一个处理器链,负责将 I/O 操作分发给各个 ChannelHandler 进行处理。通过组合不同的 ChannelHandler,用户可以定制处理网络事件的逻辑,其中大多数的ChannelHandler需要我们手动编写。

一个典型的 Netty ChannelPipeline 可以包含以下几种 ChannelHandler:

  1. 解码器(Decoder):将接收到的字节流(ByteBuf)解码为应用层所使用的数据结构(如 POJO 对象)。常见的解码器有:ByteToMessageDecoder、LengthFieldBasedFrameDecoder 等。
  2. 编码器(Encoder):将应用层的数据结构编码为字节流,以便在网络中传输。常见的编码器有:MessageToByteEncoder、LengthFieldPrepender 等。
  3. 业务逻辑处理器:处理应用层的业务逻辑,如数据库操作、业务计算等。业务逻辑处理器通常需要继承 ChannelInboundHandlerAdapter 或 ChannelOutboundHandlerAdapter,并实现相应的事件处理方法。

3、netty中的helloworld

首先创建Handler类,该类用于接收服务器端发送的数据,这是一个简化的类,只重写了消息读取方法channelRead0、捕捉异常方法exceptionCaught。

(1)定义客户端的处理器

客户端的Handler一般继承的是SimpleChannelInboundHandler,该类有丰富的方法,心跳、超时检测、连接状态等等,代码如下:

@ChannelHandler.Sharable
public class HandlerClientHello extends SimpleChannelInboundHandler<ByteBuf>
{
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception
    {
        /**
        * @Description  处理接收到的消息
        **/
        System.out.println("接收到的消息:"+byteBuf.toString(CharsetUtil.UTF_8));
    }
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throwsException
    {
        /**
        * @Description  处理I/O事件的异常
        **/
        cause.printStackTrace();
        ctx.close();
    }
}

代码说明:

  • @ChannelHandler.Sharable:这个注解是为了线程安全,如果你不在乎是否线程安全,不加也可以;
  • SimpleChannelInboundHandler:这里的泛型可以是ByteBuf,也可以是String,还可以是对象,根据具体的实际情况来;
  • channelRead0:读取消息的方法,注意名称中有个0;
  • ChannelHandlerContext:通道上下文,代指Channel;
  • ByteBuf:字节序列,通过ByteBuf操作基础的字节数组和缓冲区,因为JDK原生操作字节麻烦、效率低,所以Netty对字节的操作进行了封装,实现了指数级的性能提升,同时使用更加便利;
  • CharsetUtil:这个是JDK原生的方法,用于指定字节数组转换为字符串时的编码格式。

(2)创建客户端
客户端启动类根据服务器端的IP和端口,建立连接,连接建立后,实现消息的双向传输,代码较简洁,如下:

public class AppClientHello
{
    private final String host;
    private fina lint port;

    public AppClientHello(String host, int port)
    {
        this.host = host;
        this.port = port;
    }

    public void run() throws Exception
    {
		//定义干活的线程池,I/O线程池
        EventLoopGroup group = new NioEventLoopGroup();
        try{
            Bootstrap bs = new Bootstrap();//客户端辅助启动类
            bs.group(group)
                .channel(NioSocketChannel.class)//实例化一个Channel
                .remoteAddress(newInetSocketAddress(host,port))
                .handler(newChannelInitializer<SocketChannel>()//进行通道初始化配置
                         {
                             @Override
                             protected void initChannel(SocketChannel socketChannel) throws Exception
                             {
                                 socketChannel.pipeline().addLast(newHandlerClientHello());//添加我们自定义的Handler
                             }
                         });

            //连接到远程节点;等待连接完成
            ChannelFuture future=bs.connect().sync();

            //发送消息到服务器端,编码格式是utf-8
            future.channel().writeAndFlush(Unpooled.copiedBuffer("Hello World", CharsetUtil.UTF_8));

            //阻塞操作,closeFuture()开启了一个channel的监听器(这期间channel在进行各项工作),直到链路断开
            future.channel().closeFuture().sync();

        } finally{
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws Exception
    {
        new AppClientHello("127.0.0.1",18080).run();
    }
}

由于代码中已经添加了详尽的注释,这里只对极个别的进行说明:

  • ChannelInitializer:通道Channel的初始化工作,如加入多个handler,都在这里进行;
  • bs.connect().sync():这里的sync()表示采用的同步方法,这样连接建立成功后,才继续往下执行;
  • pipeline():连接建立后,都会自动创建一个管道pipeline,这个管道也被称为责任链,保证顺序执行,同时又可以灵活的配置各类Handler,这是一个很精妙的设计,既减少了线程切换带来的资源开销、避免好多麻烦事,同时性能又得到了极大增强。

ChannelFuture代表一个异步的I/O操作的结果或状态。在Netty中,几乎所有的I/O操作都是异步执行的,这就意味着当您调用一个方法来执行某个操作时,该方法会立即返回一个ChannelFuture对象,而不会阻塞当前线程等待操作完成。
ChannelFuture提供了以下几个主要的功能:

  • 异步操作结果: ChannelFuture提供了方法来检查操作是否已完成,是否成功或失败,以及获取操作的结果。您可以通过调用isDone()来检查操作是否已完成,isSuccess()来检查操作是否成功,cause()来获取操作失败的原因,get()来获取操作的结果(会阻塞当前线程),或者通过addListener()添加监听器,在操作完成时执行回调方法。

  • 操作的连续性:
    ChannelFuture提供了一系列方法来支持操作的连续性。例如,您可以通过await()方法阻塞当前线程,直到操作完成,或者通过awaitUninterruptibly()方法以无中断方式等待操作完成。此外,您还可以通过sync()方法在操作完成前阻塞当前线程,并在操作失败时抛出异常。

  • 操作的顺序控制:
    ChannelFuture可以与其他ChannelFuture进行组合,以控制操作的顺序。通过调用addListener()并在回调方法中处理下一个操作,您可以实现操作的串行执行或者依赖关系。

总之,ChannelFuture是Netty中表示异步I/O操作结果的重要概念。它提供了一组方法来管理和处理操作的状态、结果和连续性,以便您可以编写具有高性能和灵活性的异步网络应用程序。

(3)创建服务器处理器
和客户端一样,只重写了消息读取方法channelRead(注意这里不是channelRead0)、捕捉异常方法exceptionCaught。
另外服务器端Handler继承的是ChannelInboundHandlerAdapter,而不是SimpleChannelInboundHandler,至于这两者的区别,这里不赘述,大家自行百度吧。
代码如下:

@ChannelHandler.Sharable
public class HandlerServerHello extends ChannelInboundHandlerAdapter
{
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)  throws Exception
    {
        //处理收到的数据,并反馈消息到到客户端
        ByteBuf in = (ByteBuf) msg;
        System.out.println("收到客户端发过来的消息: "+ in.toString(CharsetUtil.UTF_8));
 
        //写入并发送信息到远端(客户端)
        ctx.writeAndFlush(Unpooled.copiedBuffer("你好,我是服务端,我已经收到你发送的消息", CharsetUtil.UTF_8));
    }
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
    {
        //出现异常的时候执行的动作(打印并关闭通道)
        cause.printStackTrace();
        ctx.close();
    }
}

以上代码很简洁,大家注意和客户端Handler类进行比较。
(4)创建服务器

public class AppServerHello
{
    private int port;
 
    public AppServerHello(int port)
    {
        this.port = port;
    }
 
    public void run() throws Exception
    {
        //Netty的Reactor线程池,初始化了一个NioEventLoop数组,用来处理I/O操作,如接受新的连接和读/写数据
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try{
            ServerBootstrap b = newServerBootstrap();//用于启动NIO服务
            b.group(boss,worker)
                .channel(NioServerSocketChannel.class) //通过工厂方法设计模式实例化一个channel
                .localAddress(newInetSocketAddress(port))//设置监听端口
                .childHandler(newChannelInitializer<SocketChannel>() {
                    //ChannelInitializer是一个特殊的处理类,他的目的是帮助使用者配置一个新的Channel,用于把许多自定义的处理类增加到pipline上来
                    @Override
                    //ChannelInitializer 是一个特殊的处理类,他的目的是帮助使用者配置一个新的 Channel。
                    public void initChannel(SocketChannel ch) throws Exception {
                        //配置childHandler来通知一个关于消息处理的InfoServerHandler实例
                        ch.pipeline().addLast(new HandlerServerHello());
                    }
                });

            //绑定服务器,该实例将提供有关IO操作的结果或状态的信息
            ChannelFuture channelFuture= b.bind().sync();
            System.out.println("在"+ channelFuture.channel().localAddress()+"上开启监听");
 
            //阻塞操作,closeFuture()开启了一个channel的监听器(这期间channel在进行各项工作),直到链路断开
            // closeFuture().sync()会阻塞当前线程,直到通道关闭操作完成。这可以用于确保在关闭通道之前,程序不会提前退出。
            channelFuture.channel().closeFuture().sync();
        } finally{
            group.shutdownGracefully().sync();//关闭EventLoopGroup并释放所有资源,包括所有创建的线程
        }
    }
 
    public static void main(String[] args)  throws Exception
    {
        new AppServerHello(8080).run();
    }
}

代码说明:

  • EventLoopGroup:实际项目中,这里创建两个EventLoopGroup的实例,一个负责接收客户端的连接,另一个负责处理消息I/O。
  • NioServerSocketChannel:通过工厂通过工厂方法设计模式实例化一个channel,这个在大家还没有能够熟练使用Netty进行项目开发的情况下,不用去深究。
    通常我们会将ChannelPipeline的定义放在一个独立的外部类中,如下:
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        // 添加解码器
        pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
        pipeline.addLast(new MyMessageDecoder());

        // 添加编码器
        pipeline.addLast(new LengthFieldPrepender(2));
        pipeline.addLast(new MyMessageEncoder());

        // 添加业务逻辑处理器
        pipeline.addLast(new MyBusinessHandler());
    }
}

在这个示例中,我们首先创建了一个自定义的 ChannelInitializer,并重写了 initChannel 方法。在该方法中,我们通过 ch.pipeline() 获取 ChannelPipeline 的实例,然后依次添加解码器、编码器和业务逻辑处理器。这样,当有新的连接建立时,Netty 会自动调用 initChannel 方法,为新连接创建一个包含指定处理器的 ChannelPipeline。

通过灵活地组合不同的 ChannelHandler,用户可以轻松地实现各种网络协议和应用逻辑。


在这里插入图片描述

欢迎添加微信,加入我的核心小队,请备注来意

👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1644952.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

全网最全!场外个股期权和场内个股期权的区别的详细解析

场外个股期权和场内个股期权的区别 场外个股期权是指在沪深交易所之外交易的个股期权&#xff0c;其本质是一种金融衍生品&#xff0c;允许投资者在股票交易场所外以特定价格买进或卖出证券。场内个股期权是以单只股票作为标的资产的期权合约&#xff0c;其内在价值是基于标的…

编程入门(六)【Linux系统基础操作一】

读者大大们好呀&#xff01;&#xff01;!☀️☀️☀️ &#x1f525; 欢迎来到我的博客 &#x1f440;期待大大的关注哦❗️❗️❗️ &#x1f680;欢迎收看我的主页文章➡️寻至善的主页 文章目录 &#x1f525;前言&#x1f680;Linux操作系统介绍与环境准备Linux操作系统介…

Java中的枚举类型介绍

一、背景及定义 情景&#xff1a; 枚举是在JDK1.5以后引入的。 主要用途是&#xff1a; 将一组常量组织起来&#xff0c;在这之前表示一组常量通常使用定义常量的方式&#xff1a; 这种定义方式实际上并不好。 例如&#xff1a;如果碰巧有另一个变量也是1&#xff0c;那么…

举个栗子!Alteryx 技巧(11):运用目录工具

你了解目录工具吗&#xff0c;它有什么作用&#xff1f;目录工具可以返回指定目录中所有文件的列表&#xff0c;即返回文件名和有关文件的其他信息&#xff0c;例如文件大小、创建日期、上次修改日期等。那么&#xff0c;怎样运用目录工具返回相关信息呢&#xff1f; 本期《举…

面对 800G 以太网设计的挑战

以太网是一种广泛使用的网络技术&#xff0c;用于连接局域网和广域网中的设备。它以 10 Mbps 的适度速度开始&#xff0c;多年来经历了大规模的创新。如今&#xff0c;以太网技术正朝着 800 Gbps 的速度发展&#xff0c;为数据中心和云计算基础设施等高需求环境提供超快的数据传…

Python-VBA函数之旅-pow函数

目录 一、pow函数的常见应用场景 二、pow函数使用注意事项 三、如何用好pow函数&#xff1f; 1、pow函数&#xff1a; 1-1、Python&#xff1a; 1-2、VBA&#xff1a; 2、推荐阅读&#xff1a; 个人主页&#xff1a;神奇夜光杯-CSDN博客 一、pow函数的常见应用场景 Py…

PyQt5中重要的概念:信号与槽

PyQt中信号与槽概念定义如下&#xff08;网络上引用的&#xff09;&#xff1a; 信号&#xff08;signal&#xff09;和槽&#xff08;slot&#xff09;是Qt的核心机制&#xff0c;也是在PyQt编程中对象之间进行通信的机制。在创建事件循环之后&#xff0c;通过建立信号和槽的…

asp.net朱勇项目个人博客(3)

引文:按照书上的项目&#xff0c;我们最后实现管理端的三个增删改查的功能即可,相对与三个增删改查&#xff0c;文章&#xff0c;分类和留言&#xff0c;这里我们所需要用的的关联的一个表就是文章表&#xff0c;因为文章表每一个文章的增加显示和修改都需要对应的一个分类&…

OpenHarmony实战开发-如何实现动画帧

请求动画帧 请求动画帧时通过requestAnimationFrame函数逐帧回调&#xff0c;在调用该函数时传入一个回调函数。 runframe在调用requestAnimationFrame时传入带有timestamp参数的回调函数step&#xff0c;将step中的timestamp赋予起始的startTime。当timestamp与startTime的差…

分布式与一致性协议之ZAB协议(二)

ZAB协议 ZAB协议是如何实现操作地顺序性的&#xff1f; 如果用一句话解释ZAB协议到底是什么&#xff0c;我觉得它是能保证操作顺序性的、基于主备模式的原子广播协议。 接下来&#xff0c;还是以指令X、Y为例具体演示一下&#xff0c;帮助你更好地理解为什么ZAB协议能实现操作…

8086 汇编学习 Part 8

移位指令 当 C N T > 1 CNT > 1 CNT>1 时&#xff0c;CNT 必须是 CL 寄存器 逻辑左移 SHL OPR , CNT 将寄存器或内存单元中的数据向左移 CNT 位&#xff0c;最后移除的一位写入 CF&#xff0c;最低位用 0 补充 循环左移 ROL OPR , CNT 将寄存器中的值的最高位存…

山海鲸医疗科技:引领智慧医疗新潮流

随着科技的飞速发展&#xff0c;智慧医疗已经成为医疗行业创新的重要方向。在这个背景下&#xff0c;山海鲸智慧医疗解决方案应运而生&#xff0c;以其先进的技术和全面的服务&#xff0c;为医疗行业带来了前所未有的变革。 山海鲸智慧医疗解决方案是一套集成医疗信息化、大数…

三丰云:免费虚拟主机和云服务器的全面评测

引言&#xff1a; 在当前数字化时代&#xff0c;云计算已经成为企业和个人实现在线业务和数据存储的重要方式。然而&#xff0c;选择适合自己需求的云服务提供商并非易事。今天&#xff0c;我们将对三丰云的免费虚拟主机和云服务器进行全面评测&#xff0c;帮助您了解其优势和…

FME学习之旅---day26

我们付出一些成本&#xff0c;时间的或者其他&#xff0c;最终总能收获一些什么。 【由于上周&#xff0c;上班状态不是很好&#xff0c;事情多又杂&#xff0c;没有学习的劲头&#xff0c;就短暂的休息了一下下。双休爬山&#xff0c;给自己上了强度&#xff0c;今天才缓过来…

云商城系统,无后门,一站式系统Java源码

云商城系统&#xff0c;无后门&#xff0c;一站式系统Java源码&#xff0c;心权益商品数量不限数量 系统对接 手动发货 自动发货 兑 换 码 订单监控 商品监控 对象存储 邮箱提醒 加价模板 密价功能 三方支付 会员体系 财务明细 交易分析 售后服务 技术支持 建议配置&#xf…

Flask与HTTP

一、请求响应循环 “请求-响应循环”&#xff1a;客户端发出请求&#xff0c;服务器处理请求并返回响应。 Flask Web程序的工作流程&#xff1a; 当用户访问一个URL&#xff0c;浏览器便生成对应的HTTP请求&#xff0c;经由互联网发送到对应的Web服务器。Web服务器接收请求&a…

本地部署开源白板工具Excalidraw并结合内网穿透远程绘制流程图

文章目录 1. 安装Docker2. 使用Docker拉取Excalidraw镜像3. 创建并启动Excalidraw容器4. 本地连接测试5. 公网远程访问本地Excalidraw5.1 内网穿透工具安装5.2 创建远程连接公网地址5.3 使用固定公网地址远程访问 本文主要介绍如何在Ubuntu系统使用Docker部署开源白板工具Excal…

request库爬取100页tb商品信息

注册账号获取api测试key 1.打开淘宝网站登录并且搜索商品信息。和上一篇文章不一样&#xff0c;这一次我们找到search&#xff1f;这个文件右击复制。 2.在上一篇文章用到的在线curl命令转代码网站将复制过来的东西转换成python代码后&#xff0c;运行得到response。 3.建立cs…

Leetcode—1652. 拆炸弹【简单】

2024每日刷题&#xff08;127&#xff09; Leetcode—1652. 拆炸弹 实现代码 class Solution { public:vector<int> decrypt(vector<int>& code, int k) {int codeSize code.size();vector<int> ans(codeSize, 0);if(k 0) {return ans;}if(k > 0)…

vue2.7与vue2.6、vue3的区别

官网链接&#xff1a;https://v2.cn.vuejs.org/v2/guide/migration-vue-2-7.html -组合式与选项式 选项式&#xff1a;export default { 各种选项关键字名&#xff0c;都定好了&#xff0c;无需引入&#xff0c;配对放置即可}