文章目录
- Pre
- 主要特点和工作原理
- 类关系
- 源码解析
- 入口索引
- AbstractNioByteChannel.NioByteUnsafe#read
- allocHandle.allocate(allocator)
- 小结
Pre
Netty Review - 直接内存的应用及源码分析
Netty Review - 底层零拷贝源码解析
主要特点和工作原理
ByteBuf 内存池是 Netty 中用于管理 ByteBuf 对象的一种机制,旨在提高内存的使用效率和性能。
以下是 ByteBuf 内存池的主要特点和工作原理:
-
复用缓冲区对象:内存池会维护一组预分配的 ByteBuf 对象,这些对象可以被重复使用,避免了频繁地创建和销毁对象,从而减少了内存分配和释放的开销。
-
提高内存分配速度:由于预先分配了一定数量的 ByteBuf 对象,当需要分配新的缓冲区时,可以直接从内存池中获取可用的对象,避免了频繁地向操作系统请求内存,提高了分配速度。
-
减少内存碎片:内存池会根据需求预分配一定数量和大小的缓冲区对象,这些对象大小一致或相近,有利于减少内存碎片的产生。
-
提高性能:通过复用缓冲区对象和减少内存分配和释放的次数,可以降低系统的开销,提高了系统的性能。
-
线程安全:内存池通常是线程安全的,多个线程可以并发地从内存池中获取和释放缓冲区对象,而不需要额外的同步措施。
工作原理:
- 当需要分配缓冲区对象时,首先尝试从内存池中获取可用的对象。
- 如果内存池中没有可用的对象,则根据需求创建新的缓冲区对象。
- 当缓冲区对象不再使用时,将其归还给内存池,以便重复利用。
类关系
源码解析
入口索引
结合我们的Netty线程模型源码图 ,找到入口 。
AbstractNioByteChannel.NioByteUnsafe#read
这段代码是 Netty 中的 read()
方法实现,用于从通道中读取数据并触发相应的事件到 ChannelPipeline
中。
@Override
public final void read() {
final ChannelConfig config = config(); // 获取通道配置信息
if (shouldBreakReadReady(config)) { // 判断是否应该中断读就绪操作
clearReadPending(); // 清除读等待标志
return;
}
final ChannelPipeline pipeline = pipeline(); // 获取通道的管道
final ByteBufAllocator allocator = config.getAllocator(); // 获取分配器
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); // 获取接收字节缓冲区分配句柄
allocHandle.reset(config); // 重置分配句柄状态
ByteBuf byteBuf = null; // 字节缓冲区
boolean close = false; // 是否关闭标志
try {
do {
byteBuf = allocHandle.allocate(allocator); // 分配字节缓冲区
allocHandle.lastBytesRead(doReadBytes(byteBuf)); // 读取数据到缓冲区
if (allocHandle.lastBytesRead() <= 0) {
// 如果没有读取到数据
// 释放缓冲区
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0; // 是否关闭标志
if (close) {
// 如果收到 EOF,表示没有数据可读了
readPending = false; // 清除读等待标志
}
break;
}
allocHandle.incMessagesRead(1); // 增加读取消息数
readPending = false; // 清除读等待标志
pipeline.fireChannelRead(byteBuf); // 触发通道读事件到管道
byteBuf = null;
} while (allocHandle.continueReading()); // 继续读取数据,直到不再需要读取为止
allocHandle.readComplete(); // 读操作完成
pipeline.fireChannelReadComplete(); // 触发通道读完成事件到管道
if (close) {
closeOnRead(pipeline); // 如果需要关闭通道,执行关闭操作
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle); // 处理读取异常
} finally {
// 检查是否有未处理的读等待操作
// 这可能有两个原因:
// 1. 用户在 channelRead(...) 方法中调用了 Channel.read() 或 ChannelHandlerContext.read()
// 2. 用户在 channelReadComplete(...) 方法中调用了 Channel.read() 或 ChannelHandlerContext.read()
// 详见 https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp(); // 移除读操作
}
}
}
allocHandle.allocate(allocator)
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(guess());
}
在给定的 ByteBufAllocator 上分配一个新的 ByteBuf 实例。
return alloc.ioBuffer(guess())
: 使用给定的 ByteBufAllocator 对象调用ioBuffer()
方法来分配一个新的 ByteBuf 实例。guess()
方法用于估算分配的字节数。
该方法的作用是在给定的 ByteBufAllocator 上分配一个新的 ByteBuf 实例,并返回分配的实例。
alloc.ioBuffer(guess())
@Override
public ByteBuf ioBuffer(int initialCapacity) {
if (PlatformDependent.hasUnsafe()) { // 检查当前平台是否支持直接内存
return directBuffer(initialCapacity); // 如果支持直接内存,则调用 directBuffer() 方法创建直接内存的 ByteBuf 实例
}
return heapBuffer(initialCapacity); // 如果不支持直接内存,则调用 heapBuffer() 方法创建堆内存的 ByteBuf 实例
}
该方法的作用是根据当前平台是否支持直接内存来选择合适的内存类型(堆内存或直接内存),并根据传入的初始容量参数创建相应类型的 ByteBuf 实例
PlatformDependent.hasUnsafe() ---- true
@Override
public ByteBuf directBuffer(int initialCapacity) {
return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY); // 调用重载方法 directBuffer(int initialCapacity, int maxCapacity),传入默认的最大容量值 DEFAULT_MAX_CAPACITY
}
directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
if (initialCapacity == 0 && maxCapacity == 0) { // 如果初始容量和最大容量都为0
return emptyBuf; // 返回一个空的 ByteBuf 实例
}
validate(initialCapacity, maxCapacity); // 验证初始容量和最大容量的合法性
return newDirectBuffer(initialCapacity, maxCapacity); // 创建一个新的直接内存的 ByteBuf 实例
}
newDirectBuffer
方法,我们发现它是一个抽象方法,由AbstractByteBufAllocator
的子类负责具体实现
newDirectBuffer(initialCapacity, maxCapacity)
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
// 获取当前线程的线程缓存
PoolThreadCache cache = threadCache.get();
// 获取直接内存池
PoolArena<ByteBuffer> directArena = cache.directArena;
final ByteBuf buf;
if (directArena != null) { // 如果直接内存池可用
// 从直接内存池中分配内存
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else { // 如果直接内存池不可用
// 使用平台相关的方式创建直接内存的 ByteBuf 实例
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
// 返回一个包装了泄漏感知器的 ByteBuf 实例
return toLeakAwareBuffer(buf);
}
directArena.allocate(cache, initialCapacity, maxCapacity);
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
// 创建一个新的 PooledByteBuf 实例,其中 maxCapacity 为指定的最大容量
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
// 使用指定的线程缓存和请求容量来分配内存给 ByteBuf
allocate(cache, buf, reqCapacity);
// 返回分配的 ByteBuf
return buf;
}
这段代码实现了从线程缓存中分配内存给 ByteBuf,并返回分配的 ByteBuf 实例。
重点分析newByteBuf的实现,它同样是个抽象方法,由子类DirectArena和HeapArena来实现不同类型的缓冲区分配.
我们这里使用的是直接内存,因此重点分析DirectArena的实现
@Override
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
// 如果支持 Unsafe
if (HAS_UNSAFE) {
// 创建 PooledUnsafeDirectByteBuf 的实例
return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
} else {
// 创建 PooledDirectByteBuf 的实例
return PooledDirectByteBuf.newInstance(maxCapacity);
}
}
这段代码是 Netty 中用于创建新的 PooledByteBuf 对象的方法。根据是否支持 Unsafe,选择创建 PooledUnsafeDirectByteBuf 或者 PooledDirectByteBuf 的实例。这两个类都是用于管理直接内存的缓冲区,其中 PooledUnsafeDirectByteBuf 是使用 Unsafe 的方式来操作内存,而 PooledDirectByteBuf 则是不依赖 Unsafe 来操作内存。
PooledUnsafeDirectByteBuf.newInstance(maxCapacity)
static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
// 从对象池中获取 PooledUnsafeDirectByteBuf 实例
PooledUnsafeDirectByteBuf buf = RECYCLER.get();
// 重用实例并设置最大容量
buf.reuse(maxCapacity);
return buf;
}
通过RECYCLER
的get方法循环使用ByteBuf
对象,如果是非内存池实现,则直接创建一个新的ByteBuf对象.
小结
总的来说,ByteBuf 内存池通过复用缓冲区对象和减少内存分配和释放的开销,提高了内存的使用效率和性能,是 Netty 中重要的优化手段之一。