一、为什么选择Netty:
API使用简单,开发门槛低,屏蔽了NIO通信的底层细节。
功能强大,预制了很多种编解码功能,支持主流协议。
定制能力强,可以通过ChannelHandler对通信框架进行灵活地拓展。
性能高、成熟、稳定(Netty修复了已经发现的所有Java NIO BUG)。
社区活跃、版本迭代周期短。
经历了大规模的商业应用考研,质量得到验证。
二、Netty基础知识:
Netty的主要应用场景:
RPC框架的基础网络通信框架:主要用于分布式节点之间的通信和数据交换。例如Dubbo、RocketMQ、Hadoop的基础通信和序列化框架Avro。
私有协议的基础通信框架:例如Thrift协议、Dubbo协议等。
公有协议的基础通信框架:例如HTTP协议、SMPP协议等。
Netty服务端创建需要必备的知识:
熟悉JDK NIO主要类库的使用,例如ByteBuffer、Selector、ServerSocketChannel、SelectionKey等。
熟悉JDK的多线程编程。
了解Reactor模式。
Netty服务端的作用:
服务端需要监听客户端链接、处理客户端链接的读写。
Netty中典型的网络事件:
链路注册、链路激活、链路断开、接受到请求消息、请求消息接受并处理完毕、发送应当消息、链路发送异常、发送用户自定义事件
Netty提供的ChannelHeader:
系统编解码框架:ByteToMessageCodec
通用基于长度的半包编解码:LengthFieldBassedFrameDecoder
码流日志打印Handler:LoggingHandler
SSL安全认证Handler:SslHandler
链路空闲检查Handler:IDLEStateHandler
流量整形Handler:ChannelTrafficShapingHandler
Base64编解码:Base64Decoder和Base64Encoder
Netty提供的主要TCP参数:
SO_TIMEOUT(timeout):控制读取操作将阻塞多少毫秒。如果返回值为0,计时器就被禁止了,该线程将无限期阻塞。
SO_SNDBUF(snd buf):套接字使用的发送缓冲区大小。
SO_RCVBUF(rcv buf):套接字使用的接收缓冲区大小。
SO_REUSEADDR(resuse addr):用于决定如果网络上仍然有数据向旧的ServerSocket传输数据,是否允许新的ServerSocket绑定到与旧的ServerSocket同样的端口上。SO_REUSEADDR选项的默认值与操作系统有关,在某些操作系统中,允许重用端口,而在某些操作系统中不允许重用端口。
CONNECT_TIMEOUT_MILLIS:客户端连接超时时间,由于NIO原生的客户端并不提供设置连接超时的接口,因此,Ntty采用的是自定义连接超时定时器负责检测和超时控制。
TCP_NODELAY:激活或禁止TCP_NODELAY套接字选项,它决定是否使用Nagle算法。如果是时延敏感型的应用,建议关闭Nagle算法。
三、Netty代码相关:
(一) ByteBuf和相关辅助类:
1. JDK ByteBuffer的缺点:
ByteBuffer固定长度,一旦分配完成,不能动态调整。
ByteBuffer只有一个标识位置的指针position,读写时需要手工调用flip()【作用:读写切换】和rewind()【将position=0、mark=-1、limit的值不变】。
ByteBuffer的API功能有限,一些高级和使用的特性它不支持。
2. ByteBuf的基本功能:
ByteBuf依然是一个Byte数组的缓冲区,他的基本功能与ByteBuffer一直,主要如下:
7种Java基础类型、byte数组、ByteBuffer(ByteBuf)等的读写。
缓冲区自身的copy和slice等。
设置网络字节序。 ->网络字节序采用高端字节序的表示方法
构造缓冲区实例。
操作位置指针等方法。
3. ByteBuf功能介绍:
顺序读操作(read):
它类似于ByteBuffer的get操作,方法名称以readXXX开头。
顺序写操作(write):
它类似于ByteBuffer的put操作,方法名称以writeXXX开头。
readerIndex和writerIndex:
Netty提供两个指针变量用于支持顺序读取和写入操作:readerIndex用于标识读取索引、writerIndex用于标识写入索引。
Discardable bytes:
缓冲区的分配和释放都是一个耗时的操作,我需要尽量重用他们。DiscardableBytes会清除已经读取的缓冲区,将可读取的字节数组复制到原理已经读取的缓冲区上。但是频繁的内存复制会牺牲性能。
Readabke bytes和Writable bytes:
可读空间段是数据实际存储的区域,以read或skip开头的任何操作都将会从readerIndex开始读取或者跳过指定的数据,操作完成之后readerIndex增加了读取或跳过字节数长度。
可写空间段是尚未被使用可以填充的空闲空间,任何以write开头的操作都会从writerIndex开始向空闲空间写入字节,操作完成之后writerIndex增加写入的字节数量长度。
Clear操作:
将readerIndex和writerIndex还原成初始分配值。
Mark 和Rest:
对缓冲区进行读操作时,需要对之前操作进行回滚。读操作并不会改变缓冲区的内容,回滚操作主要就是重新设置索引信息。
ByteBuff,调用mark操作,会将当前位置的指针备份到mark变量中,调用rest操作之后,在复原。
Netty也有类型的mark和rest接口,因为ByteBuf有读写索引,因此,它总共有4个相关方法:
markReaderIndex:将当前的readerIndex备份到MarkReaderIndex中;
restReaderIndex:将当前readerIndex设置成MarkReaderIndex;
markWriterIndex:将当前的writerIndex备份到MarkWriterIndex中;
restWriterIndex:将当前writerIndex设置成MarkWriterIndex;
查找操作:
它提供了多种查找方法用于满足不同的应用场景:indexOf、bytesBefore等等
Derived Buffers:
类似于数据库的视图,ByteBuf提供多个接口用于创建某个ByteBuf视图或者复制ByteBuf。具体方法有duplicate、cope等。
转换成标准的ByteBuffer
随机读写(set和get)
随机读写是指可以指定读写的索引位置。
4. ByteBuf内存分配角度分类:
堆内存字节缓冲区(HeapByteBuf):
特点:内存分配和回收速度快,可以被JVM自动回收;缺点:如果进行IO读写,需要额外一次内存复制,IO读写速度慢一些。
直接内存缓冲区(DirectByteBuf):
特点:堆外分配内存,减少一次内存复制,IO读写速度快一些;缺点:内存分配和回收慢一些。
5. ByteBuf内存回收的角度分类:
基于对象池的ByteBuf与普通的ByteBuf:
基于对象池ByteBuf的特点:可以重用ByteBuf对象,循环利用,提升内存的使用效率,降低高负载导致的频繁GC。
6. UnpooledHeapByteBuf(堆内存):
特点:它基于堆内存进行分配的字节缓冲区,并且没有基于对象池技术实现,所以每次IO的读写都会创建一个新对象。
缺点:频繁进行大块内存的分配和回收对性能会造成一定影响,但是相对于堆外内存的申请和释放,它的成本,还是会低一些。
7. PooledByteBuf(内存池):
见《Netty的内存池》
8. PooledDirectByteBuf(基于内存池)与UnPooledDirectByteBuf(不基于内存池)
9. ByteBuf相关辅助类的功能:
ByteBufHolder:
它是ByteBuf的容器。ByteBufHolder对象,包含了一个ByteBuf,另外还提供了一些其他的实用方法,使用者继承ByteBufHolder接口后可以按需求封装自己的实现。
ByteBufAllocator:
它是字节缓冲区分配器,按照Netty的缓冲区实现不同,有两种分配器:PoolByteBufAllocator(基于内存池的)、UnPoolByteBufAllocator(普通的)、
Composite ByteBuf:
它允许将多个ByteBuf的实例组装到一起,形成一个统一的视图。
ByteBufUtil:
工具类。
(二) Channel与Unsafe:
1. 概念:
io.netty.channel.Channel是Netty网络的抽象类,主要包括网络的读、写、客户端发起链接、主动关闭、链路关闭、获取通讯双方的网络地址等,它还包含了Netty框架的一些功能,包括获取该Channel的EvenLoop、获取缓冲分配器和Pipeline等。
2. 设计理念:
它的设计原理比较简单,但是功能却比较繁杂,主要的设计理念如下:
在Channel接口层,采用Facade模式进行统一封装,将网络I/O操作、网络I/O相关联的其他操作封装起来,统一对外提供。
Channel接口的定义尽量大而全,为SocketChannel和ServerSocketChannel提供统一的视图,由不同子类实现不同的功能,公共功能在抽象父类中实现,最大程度地实现功能和接口的重用。
具体实现采用聚合,而非包含的方式,将相关的功能类聚合在Channel中,由Channel统一负责分配和调度,功能实现更加灵活。
3. 主要功能介绍:
网络IO操作:
Channel read():
ChannelFuture write(Object msg):write操作只是将消息存入到信息发送的环形数组中,并没有真正发送,只用调用flush操作,才被写入Channel中,发送给对方。
ChannelFuture write(Object msg,ChannelPromise promise):与上述类似,就是多携带了ChannelPromise参数负责设置写入操作的结果。
Channel flush():将之前写入到发送环形数组中的消息全部写入到目标Channel中,发送给通讯对方。
ChannelFuture writeAndFlush(Object msg,ChannelPromise promise):等价于write+flush操作
ChannelFuture writeAndFlush(Object msg):
ChannelFuture close(ChannelPromise promise):主动关闭当前链接
ChannelFuture disconnect(ChannelPromise promise):请求断开与远程通信对端的链接并使用ChannelPromise来获取操作结果的通知。
ChannelFuture connect(SocketAddress remoteAddress):客户端使用指定的服务端地址发起链接请求。
ChannelFuture connect(SocketAddress remoteAddress,ChannelPromise promise):
ChannelFuture connect(SocketAddress remoteAddress,SocketAddress localAddress,ChannelPromise promise):多了绑定本地地址
ChannelFuture bind(SocketAddress localAddress):绑定指定的本地地址
ChannelFuture bind(SocketAddress localAddress,ChannelPromise promise):
ChannelConfig config():获取当前Channel的配置信息,例如CONNECT_TIMEOUT_MILLIS
boolean isOpen():判断当前Channel是否已经打开。
boolean isRegistered():判断当前Channel是否已经注册到EventLoop上。
boolean isActive():判断当前Channel是否已经处于激活状态。
.... ...
其他常用API功能:
eventLoop():获取到Channel注册的EventLoop()
metedate():获取当前Channel的元数据描述
parent():对于服务端Channel而言,它的父Channel为空,对于客户端Channel,它的父Channel就是创建它的ServerSocketChannel。
id():获取Channel的标识id
... ...
4. Channel主要的基础关系类图:
5. Channel生命周期(4种状态):
channelUnregistered:Channel已创建,还未注册到一个EventLoop上。
channelRegistered:Channel已创建,已经注册到一个EventLoop上。
channelActive:Channel处于活跃状态(已经连接到远端),可以接受和收发数据。
channelInactive: Channel未连接到远端。
Channel正常的生命周期如下图。随着状态发生变化,产生相应的事件。这些事件被转发到ChannelPipeline中的ChannelHandler来触发相应的操作。
6. Unsafe概念:
Unsafe接口实际是Channel接口的辅助接口,它不应该被用户代码直接调用。实际的IO读写操作都是由Unsafe接口负责完成的。
7. Unsafe继承关系:
(三) ChannelPipeline与ChannelHandler:
1. 两者相关概念:
Netty的ChannelPipeline与ChannelHandler机制类似于Servlet和Filter过滤器,这类拦截器实际上是职责链模式的一种变形,主要是为了方便事件的拦截和用户业务逻辑的指定。
Netty将Channel的数据管道抽象为ChannelPipeline,消息在ChannelPipeline中流动和传递。
ChannelPipeline持有IO事件拦截器ChannelHandler的链表,由ChannelHandler对IO事件进行拦截和处理,可以方便通过,新增和删除ChannelHandler来实现不同的业务逻辑定制,不需要对现有ChannelHandler进行修改,进而能够实现对修改封闭和对拓展的支持。
2. ChannelPipeline的结构:
ChannelPipeline是ChannelHandler的容器,它负责ChannelHandler的管理,事件拦截与调用。它与Map等容器的实现非常类似。
ChannelPipeline底层使用了一个双向链表来存储ChannelHandler,但并不是直接存储的ChannelHandler,而是ChannelHandlerContext,在ChannelHandlerContext可以直接获取到与之对应的ChannelHandler、ChannelPipeline、Channel。
3. ChannelPipeline的主要特性:
ChannelPipeline支持运行态动态的添加或删除ChannelHandler:
例如:当业务高峰期需要对系统做拥塞保护时,可以根据系统时间进行判断,如果处于业务高峰期,则动态地将系统拥塞保护ChannelHandler添加到当前的ChannelPipeline中,当高峰期过去之后,就可以动态删除拥塞保护ChannelHandler。
ChannelPipeline是线程安全,ChannelHandler却不是线程安全:
ChannelPipeline线程安全表示:N个业务线程可以并发地操作ChannelPipeline,而不存在多线程并发问题。ChannelHandler需要自己保证ChannelHandler的线程安全。
4. ChannelPipeline的事件处理:
4.1 消息的读取和发送处理全流程描述如下:
底层的 SocketChannel read() 读取ByteBuf,触发ChannelRead事件,由IO线程 NioEventLoop 调用 ChannelPipeline 的fireChannelRead(Object msg) ,将消息(ByteBuf)传输到ChannelPipeline中。
消息依次被 HeadHandler、ChannelHandler1 … TailHandler(从头到尾的Handler)拦截和处理,在这个过程中,任何ChannelHandler都可以中断当前的流程,结束消息的传递。
用户在调用 ChannelHandlerContext 的 write() 发送消息,消息从TailHandler、ChannelHandlerN … HeadHandler(从尾到头的Handler),最终被添加到消息发送缓冲区中,等待刷新和发送,在此过程中,可以中断消息的传递,例如当编码失败时,就需要中断流程,构造异常的Future返回。
4.2 Inbound(入站)事件(对应图17-1的左半部分):
它通常由IO线程触发,例如:TCP链路建立事件、链路关闭事件、读事件、异常通知事件等。
Pipeline中以fireXXX命名的方法都是从IO线程流向用户业务Handler的inbound事件。
触发Inbound事件方法如下:
ChannelHandlerContext.fireChannelRegistered():Channel注册事件。
ChannelHandlerContext.fireChannelActive():TCP链路建立成功,Channel激活事件。
ChannelHandlerContext.fireChannelRead(Object):读事件。
ChannelHandlerContext.fireChannelReadComplete():读操作完成通知事件。
ChannelHandlerContext.fireExceptionCaught(Throwable):异常通知事件。
ChannelHandlerContext.fireUserEventTriggered(Object):用户自定义事件。
ChannelHandlerContext.fireChannelWritabilityChanged():Channel的可写状态变化通知事件。
ChannelHandlerContext.fireChannellnactive():TCP链路关闭,链路不可用通知事件。
4.3 Outbound(出站)事件(对应图17-1的右半部分):
它通常是由用户主动发起的网络I/O操作,例如:用户发起的连接操作、绑定操作、消息发送等操作。
触发outbound事件的方法如下:
ChannelHandlerContext.bind(SocketAddress,ChannelPromise):绑定本地地址事件。
ChannelHandlerContext.connect(SocketAddress,SocketAddress,ChannelPromise):连接服务端事件。
ChannelHandlerContext.write(Object,ChannelPromise):发送事件。
ChannelHandlerContext.flush():刷新事件。
ChannelHandlerContext.writeAndFlush(Object msg):发送并刷新事件。
ChannelHandlerContext.writeAndFlush(Object msg, ChannelPromise promise):发送并刷新事件。
ChannelHandlerContext.read():读事件。
ChannelHandlerContext.disconnect(ChannelPromise):断开连接事件。
ChannelHandlerContext.close(ChannelPromise):关闭当前Channel事件。
5. ChannelPipeline类的继承关系图:
6. 如何构建ChannelPipeline:
用户不用自己创建ChannelPipeline,使用ServerBootstrap或Bootstrap启动时,Netty会为每个Channel链接创建一个独立的pipeline,对应使用者而言,只需要将自定义拦截器加入到pipeline即可。
7. 如何自定义拦截器(ChannelHandler):
通常只需继承ChannelHandlerAdapter类覆盖自己关心的方法即可。
8. ChannelHandler概念:
ChannelHandler类似于Servlet的Filter过滤器,负责对IO事件或者IO操作进行拦截和处理,它可以选择性地拦截和处理自己感兴趣的事件,也可以透传和终止事件的传递。
ChannelHandler支持注解,目前支持两种注解:
Sharable:多个ChannelPipeline共用一个ChannelHandler
Skip:被Skip注解的方法不会调用,直接被忽略。
9. Netty框架自己实现的ChannelHandler:
ChannelHandlerAdapter:
绝大多数的ChannelHandler会选择性的拦截和处理某一个或者某些事件,其他事件忽略,由下一个ChannelHandler进行拦截和处理,但是用户实现ChannelHandler接口就必须实现所有的方法,会导致代码冗余和臃肿。所以,Netty提供了ChannelHandlerAdapter基类,它对所有接口实现都是事件透传。
主要方法:*
isSharable():是否可共享,此Handler是否可以添加到不同的ChannelPipeline中
handlerAdded(ChannelHandlerContext ctx):添加Handler
handlerRemoved(ChannelHandlerContext ctx):移除Handler
exceptionCaught(ChannelHandlerContext ctx, Throwable cause):发生异常时,调用此方法。该方法用ctx.fireExceptionCaught(cause)会将异常发送给ChannelPipeline中的下一个ChannelHandler (Tips:以下的方法都类似)
channelRegistered(ChannelHandlerContext ctx):Channel注册时,调用此方法。
channelActive(ChannelHandlerContext ctx):TCP链路建立成功时,调用此方法。
channelInactive(ChannelHandlerContext ctx):TCP链路关闭时,调用此方法。
channelRead(ChannelHandlerContext ctx, Object msg):服务端返回应答信息时,该方法被调用。即读事件开始。
channelReadComplete(ChannelHandlerContext ctx):读取完本次socket中的数据时,该方法被调用。Tips:TCP拆包粘包的问题,业务上数据完整性与Socket中完整性不同,可能一次完整的业务数据,触发多次channelReadComplete()。
userEventTriggered(ChannelHandlerContext ctx, Object evt):触发用户自定义事件时,调用此方法。
channelWritabilityChanged(ChannelHandlerContext ctx):Channel的可写状态变化时,调用此方法。
--- --- ---- ----
bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise):绑定本地地址时,调用此方法。
connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise):连接服务端时,调用此方法。
disconnect(ChannelHandlerContext ctx, ChannelPromise promise):断开连接时,调用此方法。
close(ChannelHandlerContext ctx, ChannelPromise promise):关闭当前Channel时,调用此方法。
read(ChannelHandlerContext ctx):触发读事件时,调用此方法。
write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise):触发写事件时,调用此方法。
flush(ChannelHandlerContext ctx):触发刷新时,调用此方法。
ByteToMessageDecoder:
为了方便将ByteBuf解码成业务POJO对象,Netty提供了ByteToMessageDecoder抽象工具解码类。用户的解码器继承该类,实现void decode()方法即可完成ByteBuf到POJO。
但是它没有考虑TCP粘包和组包的等场景,读半包需要用户解码器自己负责处理。
MessageToMessageDecoder:
它是Netty的二次解码器,将一个对象二次解码为其他对象。
LengthFieldBasedFrameDecoder:
如果消息是通过长度进行区分的,LengthFieldBasedFrameDecoder都可以自动处理粘包和半包问题。
原理:在消息头中包含一个长度字段。并使用四个参数组合进行解码:lengthFieldOffset、lengthFieldLength、lengthAdjustment、initialBytesToStrip
MessageToByteEncoder:
将POJO对象编码成ByteBuf,用户的编码器继承该类,实现void encode()方法。
Tips:在Netty中,编码器是一个Outbound出站处理器。
MessageToMessageEncoder:
将一个POJO对象编码成另一个对象。
LengthFieldPrepender:
如果协议中的第一个字段为长度字段,Netty提供了LengthFieldPrepender编码器,它可以计算当前待发送消息的二进制字节长度,将该长度添加到ByteBuf缓冲区头中。
10. ChannelHandler的类继承关系图:
11. ChannelHandler中相关联的事件:
Channel新连接建立相关事件:handlerAdded -> channelRegistered -> channelActive
EventLoop监听Channel read事件:channelRead -> channelReadComplete
Channel关闭:channelInactive -> channelUnregistered -> handlerRemoved
Channel读取信息异常:exceptionCaught