🐼作者简介:一名大三在校生🎋
空有想法,没有实践,难成大事
专栏前言:探索RPC框架的奥秘
简介:在现代软件开发中,随着微服务架构的普及,远程过程调用(RPC)框架成为了连接服务之间通信的桥梁。我有决定开发了一款高性能的RPC框架,它不仅实现了服务之间的高效调用,还集成了关键的服务治理功能,如负载均衡、熔断机制和限流策略,以确保系统的稳定性和可靠性。
核心技术:本项目采用Netty作为其强大的底层通信组件,确保了网络通信的高效与稳定。同时,通过与ZooKeeper的结合,实现了服务的注册与发现,为服务治理提供了坚实的基础。
下面我将提供一个全面的视角,来理解RPC框架的内部工作原理及其在实际开发中的应用。欢迎大家持续关注订阅专栏!!!
专属社群:
文章目录
- 专栏前言:探索RPC框架的奥秘
- 三、netty入门
- 1、为什么要学习netty?
- 2、netty的基本工作流程
- 3、netty中的helloworld
- 欢迎添加微信,加入我的核心小队,请备注来意
三、netty入门
1、为什么要学习netty?
一方面:现在物联网的应用无处不在,大量的项目都牵涉到应用传感器和服务器端的数据通信,Netty作为基础通信组件、能够轻松解决之前有较高门槛的通信系统开发,你不用再为如何解析各类简单、或复杂的通讯协议而薅头发了,有过这方面开发经验的程序员会有更深刻、或者说刻骨铭心的体会。
另一方面:现在互联网系统讲究的都是高并发、分布式、微服务,各类消息满天飞(是的,IM系统、消息推送系统就是其中的典型),Netty在这类架构里面的应用可谓是如鱼得水,如果你对当前的各种应用服务器不爽,那么完全可以基于Netty来实现自己的HTTP服务器、FTP服务器、UDP服务器、RPC服务器、WebSocket服务器、Redis的Proxy服务器、MySQL的Proxy服务器等等。
2、netty的基本工作流程
在netty中存在以下的核心组件:
- ServerBootstrap:服务器端启动辅助对象;
- Bootstrap:客户端启动辅助对象;
- Channel:通道,代表一个连接,每个Client请对会对应到具体的一个Channel;
- ChannelPipeline:责任链,每个Channel都有且仅有一个ChannelPipeline与之对应,里面是各种各样的Handler;
- handler:用于处理出入站消息及相应的事件,实现我们自己要的业务逻辑;
- EventLoopGroup:I/O线程池,负责处理Channel对应的I/O事件;
- ChannelInitializer:Channel初始化器;
- ChannelFuture:代表I/O操作的执行结果,通过事件机制,获取执行结果,通过添加监听器,执行我们想要的操作;
- ByteBuf:字节序列,通过ByteBuf操作基础的字节数组和缓冲区。
我们结合其核心组件通过下图,可以清晰的看明白netty的基本工作原理:
在这其中,ChannelPipeline 是一个重要的组件,用于处理 I/O 事件和拦截 I/O 操作。它是一个处理器链,负责将 I/O 操作分发给各个 ChannelHandler 进行处理。通过组合不同的 ChannelHandler,用户可以定制处理网络事件的逻辑,其中大多数的ChannelHandler需要我们手动编写。
一个典型的 Netty ChannelPipeline 可以包含以下几种 ChannelHandler:
- 解码器(Decoder):将接收到的字节流(ByteBuf)解码为应用层所使用的数据结构(如 POJO 对象)。常见的解码器有:ByteToMessageDecoder、LengthFieldBasedFrameDecoder 等。
- 编码器(Encoder):将应用层的数据结构编码为字节流,以便在网络中传输。常见的编码器有:MessageToByteEncoder、LengthFieldPrepender 等。
- 业务逻辑处理器:处理应用层的业务逻辑,如数据库操作、业务计算等。业务逻辑处理器通常需要继承 ChannelInboundHandlerAdapter 或 ChannelOutboundHandlerAdapter,并实现相应的事件处理方法。
3、netty中的helloworld
首先创建Handler类,该类用于接收服务器端发送的数据,这是一个简化的类,只重写了消息读取方法channelRead0、捕捉异常方法exceptionCaught。
(1)定义客户端的处理器
客户端的Handler一般继承的是SimpleChannelInboundHandler,该类有丰富的方法,心跳、超时检测、连接状态等等,代码如下:
@ChannelHandler.Sharable
public class HandlerClientHello extends SimpleChannelInboundHandler<ByteBuf>
{
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception
{
/**
* @Description 处理接收到的消息
**/
System.out.println("接收到的消息:"+byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throwsException
{
/**
* @Description 处理I/O事件的异常
**/
cause.printStackTrace();
ctx.close();
}
}
代码说明:
- @ChannelHandler.Sharable:这个注解是为了线程安全,如果你不在乎是否线程安全,不加也可以;
- SimpleChannelInboundHandler:这里的泛型可以是ByteBuf,也可以是String,还可以是对象,根据具体的实际情况来;
- channelRead0:读取消息的方法,注意名称中有个0;
- ChannelHandlerContext:通道上下文,代指Channel;
- ByteBuf:字节序列,通过ByteBuf操作基础的字节数组和缓冲区,因为JDK原生操作字节麻烦、效率低,所以Netty对字节的操作进行了封装,实现了指数级的性能提升,同时使用更加便利;
- CharsetUtil:这个是JDK原生的方法,用于指定字节数组转换为字符串时的编码格式。
(2)创建客户端
客户端启动类根据服务器端的IP和端口,建立连接,连接建立后,实现消息的双向传输,代码较简洁,如下:
public class AppClientHello
{
private final String host;
private fina lint port;
public AppClientHello(String host, int port)
{
this.host = host;
this.port = port;
}
public void run() throws Exception
{
//定义干活的线程池,I/O线程池
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap bs = new Bootstrap();//客户端辅助启动类
bs.group(group)
.channel(NioSocketChannel.class)//实例化一个Channel
.remoteAddress(newInetSocketAddress(host,port))
.handler(newChannelInitializer<SocketChannel>()//进行通道初始化配置
{
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception
{
socketChannel.pipeline().addLast(newHandlerClientHello());//添加我们自定义的Handler
}
});
//连接到远程节点;等待连接完成
ChannelFuture future=bs.connect().sync();
//发送消息到服务器端,编码格式是utf-8
future.channel().writeAndFlush(Unpooled.copiedBuffer("Hello World", CharsetUtil.UTF_8));
//阻塞操作,closeFuture()开启了一个channel的监听器(这期间channel在进行各项工作),直到链路断开
future.channel().closeFuture().sync();
} finally{
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception
{
new AppClientHello("127.0.0.1",18080).run();
}
}
由于代码中已经添加了详尽的注释,这里只对极个别的进行说明:
- ChannelInitializer:通道Channel的初始化工作,如加入多个handler,都在这里进行;
- bs.connect().sync():这里的sync()表示采用的同步方法,这样连接建立成功后,才继续往下执行;
- pipeline():连接建立后,都会自动创建一个管道pipeline,这个管道也被称为责任链,保证顺序执行,同时又可以灵活的配置各类Handler,这是一个很精妙的设计,既减少了线程切换带来的资源开销、避免好多麻烦事,同时性能又得到了极大增强。
ChannelFuture代表一个异步的I/O操作的结果或状态。在Netty中,几乎所有的I/O操作都是异步执行的,这就意味着当您调用一个方法来执行某个操作时,该方法会立即返回一个ChannelFuture对象,而不会阻塞当前线程等待操作完成。
ChannelFuture提供了以下几个主要的功能:
-
异步操作结果: ChannelFuture提供了方法来检查操作是否已完成,是否成功或失败,以及获取操作的结果。您可以通过调用isDone()来检查操作是否已完成,isSuccess()来检查操作是否成功,cause()来获取操作失败的原因,get()来获取操作的结果(会阻塞当前线程),或者通过addListener()添加监听器,在操作完成时执行回调方法。
-
操作的连续性:
ChannelFuture提供了一系列方法来支持操作的连续性。例如,您可以通过await()方法阻塞当前线程,直到操作完成,或者通过awaitUninterruptibly()方法以无中断方式等待操作完成。此外,您还可以通过sync()方法在操作完成前阻塞当前线程,并在操作失败时抛出异常。 -
操作的顺序控制:
ChannelFuture可以与其他ChannelFuture进行组合,以控制操作的顺序。通过调用addListener()并在回调方法中处理下一个操作,您可以实现操作的串行执行或者依赖关系。
总之,ChannelFuture是Netty中表示异步I/O操作结果的重要概念。它提供了一组方法来管理和处理操作的状态、结果和连续性,以便您可以编写具有高性能和灵活性的异步网络应用程序。
(3)创建服务器处理器
和客户端一样,只重写了消息读取方法channelRead(注意这里不是channelRead0)、捕捉异常方法exceptionCaught。
另外服务器端Handler继承的是ChannelInboundHandlerAdapter,而不是SimpleChannelInboundHandler,至于这两者的区别,这里不赘述,大家自行百度吧。
代码如下:
@ChannelHandler.Sharable
public class HandlerServerHello extends ChannelInboundHandlerAdapter
{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
//处理收到的数据,并反馈消息到到客户端
ByteBuf in = (ByteBuf) msg;
System.out.println("收到客户端发过来的消息: "+ in.toString(CharsetUtil.UTF_8));
//写入并发送信息到远端(客户端)
ctx.writeAndFlush(Unpooled.copiedBuffer("你好,我是服务端,我已经收到你发送的消息", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
//出现异常的时候执行的动作(打印并关闭通道)
cause.printStackTrace();
ctx.close();
}
}
以上代码很简洁,大家注意和客户端Handler类进行比较。
(4)创建服务器
public class AppServerHello
{
private int port;
public AppServerHello(int port)
{
this.port = port;
}
public void run() throws Exception
{
//Netty的Reactor线程池,初始化了一个NioEventLoop数组,用来处理I/O操作,如接受新的连接和读/写数据
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try{
ServerBootstrap b = newServerBootstrap();//用于启动NIO服务
b.group(boss,worker)
.channel(NioServerSocketChannel.class) //通过工厂方法设计模式实例化一个channel
.localAddress(newInetSocketAddress(port))//设置监听端口
.childHandler(newChannelInitializer<SocketChannel>() {
//ChannelInitializer是一个特殊的处理类,他的目的是帮助使用者配置一个新的Channel,用于把许多自定义的处理类增加到pipline上来
@Override
//ChannelInitializer 是一个特殊的处理类,他的目的是帮助使用者配置一个新的 Channel。
public void initChannel(SocketChannel ch) throws Exception {
//配置childHandler来通知一个关于消息处理的InfoServerHandler实例
ch.pipeline().addLast(new HandlerServerHello());
}
});
//绑定服务器,该实例将提供有关IO操作的结果或状态的信息
ChannelFuture channelFuture= b.bind().sync();
System.out.println("在"+ channelFuture.channel().localAddress()+"上开启监听");
//阻塞操作,closeFuture()开启了一个channel的监听器(这期间channel在进行各项工作),直到链路断开
// closeFuture().sync()会阻塞当前线程,直到通道关闭操作完成。这可以用于确保在关闭通道之前,程序不会提前退出。
channelFuture.channel().closeFuture().sync();
} finally{
group.shutdownGracefully().sync();//关闭EventLoopGroup并释放所有资源,包括所有创建的线程
}
}
public static void main(String[] args) throws Exception
{
new AppServerHello(8080).run();
}
}
代码说明:
- EventLoopGroup:实际项目中,这里创建两个EventLoopGroup的实例,一个负责接收客户端的连接,另一个负责处理消息I/O。
- NioServerSocketChannel:通过工厂通过工厂方法设计模式实例化一个channel,这个在大家还没有能够熟练使用Netty进行项目开发的情况下,不用去深究。
通常我们会将ChannelPipeline的定义放在一个独立的外部类中,如下:
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 添加解码器
pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
pipeline.addLast(new MyMessageDecoder());
// 添加编码器
pipeline.addLast(new LengthFieldPrepender(2));
pipeline.addLast(new MyMessageEncoder());
// 添加业务逻辑处理器
pipeline.addLast(new MyBusinessHandler());
}
}
在这个示例中,我们首先创建了一个自定义的 ChannelInitializer,并重写了 initChannel 方法。在该方法中,我们通过 ch.pipeline() 获取 ChannelPipeline 的实例,然后依次添加解码器、编码器和业务逻辑处理器。这样,当有新的连接建立时,Netty 会自动调用 initChannel 方法,为新连接创建一个包含指定处理器的 ChannelPipeline。
通过灵活地组合不同的 ChannelHandler,用户可以轻松地实现各种网络协议和应用逻辑。
欢迎添加微信,加入我的核心小队,请备注来意
👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇