Netty 组件学习
- Netty 各个组件通俗理解
- EventLoop
- EventLoopGroup
- 关闭
- Channel
- Future & Promise
- Handler & Pipeline
- ByteBuf
- 创建
- 直接内存和堆内存
- 池化和非池化
- 组成
- 方法
- 扩容机制
- 读取
- retain和release方法
Netty 各个组件通俗理解
Channel即数据通道
Msg是数据,传输时是ByteBuf,通过pipeline加工后可以解码为其他类型对象
Handler是数据的处理工序,Handler合在一起便是pipeline
Handler分为Inbound和Outbound两种
EventLoop是处理数据的工人
绑定:EventLoop负责了某个Channel的工作后,应负责到底
工人既可以执行IO操作,也可以进行业务处理
工人按照pipeline的顺序,依次按照Handler的规划处理数据,可以为每道工序指定不同工人
EventLoop
- 继承自ScheduledExecutorService,包含线程池的所有方法
- 继承自OrderedEventExecutor,inEventLoop(thread)方法可以查看该线程是否属于eventLoop,parent 方法可以查看自己属于哪个EventLoopGroup
维护了一个Selector的单线程执行器
一旦建立连接,Channel会和一个EventLoop绑定,后续所有请求都会由一个EventLoop发送
EventLoopGroup
继承自EventExecutorGroup,实现了Iterable 接口提供遍历EventLoop的能力,next 方法获取下一个EventLoop
设置boss和worker两个EventLoopGroup,boss负责ServerSocketChannel上的accept事件,worker负责socketChannel上的读写
关闭
使用shutdownGracefully
方法优雅地关闭EventLoopGroup,该方法会使EventLoopGroup先切换到关闭状态,拒绝新任务的加入,然后等待任务队列中的任务全部执行完毕后,再停止线程的运行。
Channel
- close方法 关闭Channel
- closeFuture 处理Channel的关闭 带有 Future,Promise 的类型都是和异步方法配套使用,用来处理结果
- sync 同步等待Channel的关闭
- addListener 异步等待Channel的关闭
- pipeline 方法添加处理器
- write 写入数据
- writeAndFlush 将数据写入并刷出
Future & Promise
- JDK Future
- cancel 取消任务
- isCanceled 任务是否已被取消
- isDone 任务是否完成(只知道是否完成,不知道是否执行成功)
- get 获取任务结果,阻塞等待
- Netty Future 继承自JDK Future
- getNow 获取任务结果,非阻塞,未产生结果时返回Null
- await 等待任务结束
- sync 等待任务结束,若任务失败抛出异常
- isSuccess 判断任务是否成功
- cause 获取失败信息,非阻塞,若任务没失败则返回null
- addListener 添加回调,异步接收结果
- Promise 扩展了 Netty Future
- setSuccess 设置成功结果
- setFailure 设置失败结果
Handler & Pipeline
Pipeline 的 addLast 方法可以添加处理器
ChannelHandler 分为入站、出站两种
-
入站处理器一般都是 ChannelInboundHandlerAdapter 的子类,用于读取接收的数据
-
出站处理器一般都是 ChannelOutboundHandlerAdapter 的子类,用于对发送的数据进行加工
-
ctx.fireChannelRead(msg)
方法(即super.channelRead(ctx, name)
) 调用下一个入站处理器 -
ctx.channel().write(msg)
方法 从尾部开始触发出站处理器 -
ctx.write(msg, promise)
方法 触发上一个出站处理器
ByteBuf
(对字节数据的封装)
创建
初始容量为10
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
直接内存和堆内存
创建池化基于堆的 ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);
创建池化基于直接内存的 ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
- 直接内存创建和销毁代价高,但读写性能好,适合配合池化使用
- 直接内存对GC压力小,不受JVM垃圾回收的管理,应注意手动释放
池化和非池化
Netty 4.1 以后,非Android平台默认开启池化,也可以通过以下系统变量进行设置
-Dio.netty.allocator.type={unpooled|pooled}
优点:有了池化可以重用ByteBuf实例,可以提升效率,同时在高并发场景中更节约内存,减少内存溢出的可能
组成
方法
writeXXX
默认Big Endian,即 0x250,写入后 00 00 02 50
writeIntLE(int value)方法 Little Endian写入 int 值,即 0x250,写入后 50 02 00 00
扩容机制
- 如果写入后数据大小未超过512,则扩容后capacity是下一个16的整数倍。
- 如果写入后数据大小超过512,则扩容后capacity是下一个2n。
- 若扩容超过max capacity,会报错。
读取
ByteBuf读过的部分会被废弃,再读只能读到未读取的部分
若要重复读取,应在read前先做个标记mark
buffer.markReaderIndex();
调用read方法后,使用resetReaderIndex方法可以将读指针重置到标志位置
buffer.resetReaderIndex();
retain和release方法
Netty采用引用计数法来控制回收内存
- 每个ByteBuf初始计数为1
- 调用release方法计数减1,调用retain方法计数加1
- 计数为0时,底层内存被回收
最后使用者负责release
入站处理规则:
- 将ByteBuf转换为其他类型的Java对象后,ByteBuf没用了的时候必须release
- 如果不需要调用 ctx.fireChannelRead(msg) 向后传递时,必须release
- 如果因为异常导致ByteBuf没有传递到下一个Handler,必须release
- 若消息一直向后传递,会由TailContext负责释放未处理信息
出站处理规则:
- 由HeadContext flush后release
异常处理规则:
- 不清楚ByteBuf被引用多少次时,可以循环调用 release 直到返回 true