Table of Contents
- 1. ByteBufAllocator: the memory allocator
- 2. UnpooledByteBufAllocator
- 2.1 Heap memory allocation
- 2.2 Direct memory allocation
- 3. PooledByteBufAllocator
- 3.1 Heap and direct memory allocation
- 3.2 How a directArena allocates direct memory
- 3.3 Memory size classes
- 4. Caching
- 4.1 Cache data structures
- 4.2 The cache-hit allocation path
- 5. PoolThreadCache
- 5.1 The Arena data structures
- 5.2 Page-level allocation
- 5.3 Subpage-level allocation
1. ByteBufAllocator: the memory allocator
ByteBuf implementations fall along three axes:
- pooled vs. unpooled
- heap vs. direct
- unsafe vs. non-unsafe
The allocator decides which variant it hands out. AbstractByteBufAllocator.buffer(), for example, dispatches to a direct or a heap buffer depending on the default:
@Override
public ByteBuf buffer() {
    if (directByDefault) {
        return directBuffer();
    }
    return heapBuffer();
}
Heap buffers are requested through the heapBuffer() overloads:
@Override
public ByteBuf heapBuffer() {
    return heapBuffer(DEFAULT_INITIAL_CAPACITY, DEFAULT_MAX_CAPACITY);
}

@Override
public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    validate(initialCapacity, maxCapacity);
    return newHeapBuffer(initialCapacity, maxCapacity);
}
- PooledByteBufAllocator: hands out buffers carved from pre-allocated, pooled memory
- UnpooledByteBufAllocator: allocates fresh memory (JVM heap or direct OS memory) on every call
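Before diving into the two implementations, a short usage sketch (assuming Netty 4.1 on the classpath; the concrete class names printed vary by version and platform) shows how the caller chooses pooled vs. unpooled and heap vs. direct:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;

public class AllocatorDemo {
    public static void main(String[] args) {
        // Pooled allocator: buffers come out of pre-allocated arenas.
        PooledByteBufAllocator pooled = PooledByteBufAllocator.DEFAULT;
        ByteBuf pooledHeap = pooled.heapBuffer(256);
        ByteBuf pooledDirect = pooled.directBuffer(256);

        // Unpooled allocator: every buffer is allocated fresh.
        UnpooledByteBufAllocator unpooled = new UnpooledByteBufAllocator(false);
        ByteBuf unpooledHeap = unpooled.heapBuffer(256);

        System.out.println(pooledHeap.getClass().getSimpleName());
        System.out.println(pooledDirect.getClass().getSimpleName());
        System.out.println(unpooledHeap.getClass().getSimpleName());

        // Pooled buffers must be released so their memory returns to the pool.
        pooledHeap.release();
        pooledDirect.release();
        unpooledHeap.release();
    }
}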
2. UnpooledByteBufAllocator
- newHeapBuffer
- newDirectBuffer
2.1 Heap memory allocation
UnpooledByteBufAllocator.newHeapBuffer()
The allocator first checks whether sun.misc.Unsafe is available: with Unsafe it creates an InstrumentedUnpooledUnsafeHeapByteBuf, otherwise an InstrumentedUnpooledHeapByteBuf. These are instrumented subclasses of UnpooledUnsafeHeapByteBuf and UnpooledHeapByteBuf respectively, and UnpooledUnsafeHeapByteBuf itself extends UnpooledHeapByteBuf.
The difference between UnpooledUnsafeHeapByteBuf and UnpooledHeapByteBuf lies in _getByte() and similar accessors, more precisely in the utility class they delegate to: UnsafeByteBufUtil versus HeapByteBufUtil.
UnsafeByteBufUtil reads data through an object reference plus an offset, while HeapByteBufUtil reads through array indexing; the former is somewhat more efficient.
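The contrast can be illustrated with a small standalone sketch (not the actual UnsafeByteBufUtil/HeapByteBufUtil source; it assumes a JVM where sun.misc.Unsafe is reachable via reflection):
import sun.misc.Unsafe;
import java.lang.reflect.Field;

public class ByteAccessDemo {
    private static final Unsafe UNSAFE;
    static {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            UNSAFE = (Unsafe) f.get(null);
        } catch (Exception e) {
            throw new ExceptionInInitializerError(e);
        }
    }
    private static final long BASE = UNSAFE.arrayBaseOffset(byte[].class);

    // HeapByteBufUtil style: plain, bounds-checked array indexing
    static byte getByteViaArray(byte[] memory, int index) {
        return memory[index];
    }

    // UnsafeByteBufUtil style: object reference plus offset, read through Unsafe
    static byte getByteViaUnsafe(byte[] memory, int index) {
        return UNSAFE.getByte(memory, BASE + index);
    }

    public static void main(String[] args) {
        byte[] data = {1, 2, 3};
        System.out.println(getByteViaArray(data, 1));   // 2
        System.out.println(getByteViaUnsafe(data, 1));  // 2
    }
}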
2.2 Direct memory allocation
UnpooledByteBufAllocator.newDirectBuffer()
Depending on the platform, one of three types is created:
- InstrumentedUnpooledDirectByteBuf: extends UnpooledDirectByteBuf
- InstrumentedUnpooledUnsafeDirectByteBuf: extends UnpooledUnsafeDirectByteBuf
- InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf: extends UnpooledUnsafeNoCleanerDirectByteBuf, which in turn extends UnpooledUnsafeDirectByteBuf
In UnpooledUnsafeDirectByteBuf.setByteBuffer(), the parent UnpooledDirectByteBuf logic runs first to store the underlying ByteBuffer, and then Unsafe is used to capture its raw memory address.
The other difference again lies in _getByte() and similar accessors: the Unsafe variant reads through a memory address plus an offset, while the non-unsafe variant reads through the underlying ByteBuffer by index.
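A quick way to see which variant you got (a hedged sketch assuming Netty 4.1; the exact class name depends on whether Unsafe is usable on the current JVM):
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;

public class DirectBufferDemo {
    public static void main(String[] args) {
        UnpooledByteBufAllocator alloc = new UnpooledByteBufAllocator(true);
        ByteBuf direct = alloc.directBuffer(16);

        // Concrete class varies by platform and Netty version.
        System.out.println(direct.getClass().getSimpleName());

        // Unsafe-backed direct buffers expose a raw memory address;
        // the plain NIO-backed variant does not.
        System.out.println(direct.hasMemoryAddress());

        direct.release();
    }
}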
3. PooledByteBufAllocator
- newDirectBuffer
- newHeapBuffer
3.1 Heap and direct memory allocation
private final PoolThreadLocalCache threadCache;

@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    // Fetch the thread-local PoolThreadCache
    PoolThreadCache cache = threadCache.get();
    // Take the heap arena bound to this thread's cache
    PoolArena<byte[]> heapArena = cache.heapArena;
    final ByteBuf buf;
    if (heapArena != null) {
        // The actual allocation happens inside the arena
        buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        buf = PlatformDependent.hasUnsafe() ?
                new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
                new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
    }
    return toLeakAwareBuffer(buf);
}
- Fetch the thread-local PoolThreadCache from the PoolThreadLocalCache
- Take the heap PoolArena bound to that cache
- Allocate from that arena
nDirectArena (and nHeapArena) defaults to 2x the number of CPU cores, matching the number of NioEventLoop threads created earlier.
In the PooledByteBufAllocator each thread is therefore bound to its own arena; both counts default to 2x the CPU core count.
The thread-local PoolThreadCache also carries three cache sizes:
- tinyCacheSize
- smallCacheSize
- normalCacheSize
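These defaults can be inspected directly; a small sketch assuming Netty 4.1, whose PooledByteBufAllocator exposes static default* accessors:
import io.netty.buffer.PooledByteBufAllocator;

public class PooledDefaultsDemo {
    public static void main(String[] args) {
        // Number of arenas; derived from the CPU core count (and capped by
        // available memory), typically 2 * cores.
        System.out.println("heap arenas   = " + PooledByteBufAllocator.defaultNumHeapArena());
        System.out.println("direct arenas = " + PooledByteBufAllocator.defaultNumDirectArena());

        // Per-thread cache sizes carried by PoolThreadCache.
        System.out.println("tinyCacheSize   = " + PooledByteBufAllocator.defaultTinyCacheSize());
        System.out.println("smallCacheSize  = " + PooledByteBufAllocator.defaultSmallCacheSize());
        System.out.println("normalCacheSize = " + PooledByteBufAllocator.defaultNormalCacheSize());
    }
}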
3.2 How a directArena allocates direct memory
directArena.allocate(cache, initialCapacity, maxCapacity)
- Obtain a PooledByteBuf from the object pool for reuse
- Try to allocate from the thread-local cache
- Otherwise allocate from the arena's pooled memory
- With Unsafe: PooledUnsafeDirectByteBuf
- Without Unsafe: PooledDirectByteBuf
- The ByteBuf instance itself comes from a lightweight object pool (see the sketch after this list)
- The recycled ByteBuf is re-initialized before reuse
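The lightweight object pool mentioned above is Netty's Recycler. A minimal usage sketch (MyPooledObject is an illustrative class, not part of Netty) shows the get/recycle cycle that a PooledByteBuf goes through:
import io.netty.util.Recycler;

public class RecyclerDemo {
    // Illustrative pooled object; PooledByteBuf follows the same pattern.
    static final class MyPooledObject {
        private final Recycler.Handle<MyPooledObject> handle;
        int value;
        MyPooledObject(Recycler.Handle<MyPooledObject> handle) { this.handle = handle; }
        void recycle() {
            value = 0;              // reset state before returning to the pool
            handle.recycle(this);
        }
    }

    private static final Recycler<MyPooledObject> RECYCLER = new Recycler<MyPooledObject>() {
        @Override
        protected MyPooledObject newObject(Handle<MyPooledObject> handle) {
            return new MyPooledObject(handle);
        }
    };

    public static void main(String[] args) {
        MyPooledObject first = RECYCLER.get();  // created on first use
        first.value = 42;
        first.recycle();                        // returned to the thread-local pool
        MyPooledObject second = RECYCLER.get(); // typically the same instance, reused
        System.out.println(first == second);
    }
}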
allocate()
The allocation logic:
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    final int normCapacity = normalizeCapacity(reqCapacity);
    if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
        int tableIdx;
        PoolSubpage<T>[] table;
        boolean tiny = isTiny(normCapacity);
        if (tiny) { // < 512
            if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            tableIdx = tinyIdx(normCapacity);
            table = tinySubpagePools;
        } else {
            if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            tableIdx = smallIdx(normCapacity);
            table = smallSubpagePools;
        }

        final PoolSubpage<T> head = table[tableIdx];

        /**
         * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
         * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
         */
        synchronized (head) {
            final PoolSubpage<T> s = head.next;
            if (s != head) {
                assert s.doNotDestroy && s.elemSize == normCapacity;
                long handle = s.allocate();
                assert handle >= 0;
                s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
                incTinySmallAllocation(tiny);
                return;
            }
        }
        synchronized (this) {
            allocateNormal(buf, reqCapacity, normCapacity);
        }
        incTinySmallAllocation(tiny);
        return;
    }
    if (normCapacity <= chunkSize) {
        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
            // was able to allocate out of the cache so move on
            return;
        }
        synchronized (this) {
            allocateNormal(buf, reqCapacity, normCapacity);
            ++allocationsNormal;
        }
    } else {
        // Huge allocations are never served via the cache so just call allocateHuge
        allocateHuge(buf, reqCapacity);
    }
}
3.3 Memory size classes
- 0 to 512B: tiny, allocated in SubPage units
- 512B to 8K: small, allocated in SubPage units
- 8K to 16M: normal, allocated in Page units
- above 16M: huge, allocated as a whole Chunk
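As a worked example of how a requested size lands in one of these classes, the rounding rule applied by PoolArena.normalizeCapacity() can be sketched as follows (simplified, assuming the default 8K page size; not the Netty source):
public class NormalizeDemo {
    // Simplified rounding rule: tiny requests (< 512B) round up to a multiple
    // of 16, everything else rounds up to the next power of two.
    static int normalize(int reqCapacity) {
        if (reqCapacity >= 512) {
            int n = 1;
            while (n < reqCapacity) {
                n <<= 1;
            }
            return n;                         // e.g. 5000 -> 8192 (normal)
        }
        return (reqCapacity + 15) & ~15;      // e.g. 20 -> 32 (tiny)
    }

    public static void main(String[] args) {
        System.out.println(normalize(20));    // 32   -> tiny
        System.out.println(normalize(1000));  // 1024 -> small
        System.out.println(normalize(5000));  // 8192 -> normal
    }
}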
4. Caching
4.1 Cache data structures
MemoryRegionCache
- queue: a queue of entries, each holding a chunk plus a handle pointing into it
- sizeClass: tiny, small, or normal
- size: 16B, 512B, 1K, 2K, 4K, 8K, 16K, 32K
- the tiny array has 32 slots: 16B to 496B
- the small array has 4 slots: 512B, 1K, 2K, 4K
- the normal array has 3 slots: 8K, 16K, 32K
The array lengths and the element size of each slot are derived from the page size and the configured cache parameters; the arithmetic is sketched below.
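A hedged reconstruction of that arithmetic, using the default pageSize of 8K and maxCachedBufferCapacity of 32K (illustrative, not the Netty source):
public class CacheSizesDemo {
    public static void main(String[] args) {
        int pageSize = 8192;                    // default page size
        int maxCachedBufferCapacity = 32 * 1024;

        // tiny: one slot per 16-byte step below 512B -> 32 slots (16B .. 496B)
        int tinySlots = 512 >>> 4;

        // small: one slot per power of two from 512B up to pageSize/2 -> 4 slots
        int smallSlots = log2(pageSize) - 9;    // log2(8192) = 13, 13 - 9 = 4

        // normal: one slot per power of two from pageSize up to 32K -> 3 slots
        int normalSlots = log2(maxCachedBufferCapacity / pageSize) + 1;

        System.out.println(tinySlots + " " + smallSlots + " " + normalSlots); // 32 4 3
    }

    static int log2(int v) {
        return 31 - Integer.numberOfLeadingZeros(v);
    }
}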
4.2 The cache-hit allocation path
The allocation logic shown in 3.2 contains the cache-hit path; the relevant calls are excerpted below.
// Excerpt from PoolArena.allocate() (full method in 3.2): the thread-local
// cache is always consulted before touching the arena itself.
if (tiny) { // < 512
    // cache-hit path for tiny sizes
    if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
        // was able to allocate out of the cache so move on
        return;
    }
    ...
} else {
    // cache-hit path for small sizes
    if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
        // was able to allocate out of the cache so move on
        return;
    }
    ...
}
...
if (normCapacity <= chunkSize) {
    // cache-hit path for normal sizes
    if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
        // was able to allocate out of the cache so move on
        return;
    }
    ...
}
A cache hit proceeds in three steps:
- Find the MemoryRegionCache matching the requested size
- Poll an entry from its queue and use it to initialize the ByteBuf
- Recycle the polled entry back to the object pool
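To make the queue-based structure concrete, here is a minimal, simplified sketch of the MemoryRegionCache idea (illustrative only, not Netty's implementation): one fixed-capacity queue per cached size.
import java.util.ArrayDeque;
import java.util.Queue;

final class RegionCacheSketch {
    // An entry remembers which chunk the memory came from and the handle
    // (position) inside that chunk.
    static final class Entry {
        final Object chunk;   // stands in for PoolChunk
        final long handle;
        Entry(Object chunk, long handle) { this.chunk = chunk; this.handle = handle; }
    }

    private final Queue<Entry> queue = new ArrayDeque<>();
    private final int maxEntries;

    RegionCacheSketch(int maxEntries) { this.maxEntries = maxEntries; }

    /** Called when a buffer of this size is freed: keep it for later reuse. */
    boolean add(Object chunk, long handle) {
        if (queue.size() >= maxEntries) {
            return false;             // cache full, caller frees back to the arena
        }
        return queue.offer(new Entry(chunk, handle));
    }

    /** Called on allocation: a non-null result is a cache hit. */
    Entry allocate() {
        return queue.poll();          // in Netty the ByteBuf is then initialized from
                                      // entry.chunk and entry.handle, and the entry
                                      // object itself is recycled to the object pool
    }
}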
5. PoolThreadCache
A PoolThreadCache ties together two things:
- cache: the MemoryRegionCache arrays described above
- arena: the PoolArena with its PoolChunkList, PoolChunk and PoolSubpage structures
5.1 The Arena data structures
Arena: a doubly linked list of ChunkLists -> each ChunkList holds a doubly linked list of Chunks
Chunk -> a subpage[] array
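A hedged structural sketch of that layout (type and field names are illustrative, not Netty's; in Netty they correspond to PoolChunkList, PoolChunk and PoolSubpage):
final class ArenaLayoutSketch {
    static final class Chunk {
        Chunk prev, next;            // chunks form a doubly linked list within a chunk list
        final Object[] subpages;     // one slot per page that has been split into subpages
        Chunk(int maxPages) { this.subpages = new Object[maxPages]; }
    }

    static final class ChunkList {
        ChunkList prev, next;        // chunk lists are themselves doubly linked,
                                     // ordered by how full their chunks are
        Chunk head;                  // first chunk currently managed by this list
    }

    ChunkList chunkLists;            // the arena's entry point into the structure
}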
5.2 Page-level allocation
allocateNormal()
- Try to allocate on an existing chunk
- Otherwise create a new chunk and allocate from it
- Initialize the PooledByteBuf: c.allocate()
Creating a new chunk: the PoolChunk constructor
PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize, int offset) {
    unpooled = false;
    this.arena = arena;
    this.memory = memory;
    this.pageSize = pageSize;
    this.pageShifts = pageShifts;
    this.maxOrder = maxOrder;
    this.chunkSize = chunkSize;
    this.offset = offset;
    unusable = (byte) (maxOrder + 1);
    log2ChunkSize = log2(chunkSize);
    subpageOverflowMask = ~(pageSize - 1);
    freeBytes = chunkSize;

    assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder;
    maxSubpageAllocs = 1 << maxOrder;

    memoryMap = new byte[maxSubpageAllocs << 1];
    depthMap = new byte[memoryMap.length];
    int memoryMapIndex = 1;
    for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a time
        int depth = 1 << d;
        for (int p = 0; p < depth; ++ p) {
            // in each level traverse left to right and set value to the depth of subtree
            memoryMap[memoryMapIndex] = (byte) d;
            depthMap[memoryMapIndex] = (byte) d;
            memoryMapIndex ++;
        }
    }

    subpages = newSubpageArray(maxSubpageAllocs);
    cachedNioBuffers = new ArrayDeque<ByteBuffer>(8);
}
handle: identifies which subpage of which page in the chunk was handed out, i.e. which contiguous region of the chunk's memory the allocation refers to.
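Concretely, the two halves of the handle can be split apart like this (a sketch of the decoding used by PoolChunk in Netty 4.1; simplified, since Netty additionally sets a marker bit for subpage handles):
public class HandleDemo {
    // Low 32 bits: index of the page's node in the chunk's memoryMap tree.
    static int memoryMapIdx(long handle) {
        return (int) handle;
    }

    // High 32 bits: index of the subpage element inside that page's bitmap
    // (0 for a plain page-level allocation).
    static int bitmapIdx(long handle) {
        return (int) (handle >>> 32);
    }

    public static void main(String[] args) {
        long handle = (3L << 32) | 2048L;  // hypothetical: page node 2048, subpage slot 3
        System.out.println(memoryMapIdx(handle)); // 2048
        System.out.println(bitmapIdx(handle));    // 3
    }
}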
Initializing the PooledByteBuf
5.3 Subpage-level allocation
allocateSubpage()
- Locate the PoolSubpage object
- Initialize the subpage: split the page into equal-sized elements
- Initialize the PooledByteBuf: the same code path as page-level allocation
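The "split the page" step can be illustrated with a minimal bitmap sketch (simplified; not the PoolSubpage source): each element of the page gets one bit, allocation finds the first clear bit, and freeing clears it again.
final class SubpageBitmapSketch {
    private final long[] bitmap;
    private final int maxNumElems;

    SubpageBitmapSketch(int pageSize, int elemSize) {
        maxNumElems = pageSize / elemSize;
        bitmap = new long[(maxNumElems + 63) >>> 6]; // one bit per element
    }

    /** Returns the index of a free element, or -1 if the page is full. */
    int allocate() {
        for (int i = 0; i < bitmap.length; i++) {
            if (~bitmap[i] != 0) {                       // this word has a clear bit
                int bit = Long.numberOfTrailingZeros(~bitmap[i]);
                int idx = (i << 6) + bit;
                if (idx >= maxNumElems) {
                    return -1;                           // only padding bits left: full
                }
                bitmap[i] |= 1L << bit;                  // mark the element as in use
                return idx;
            }
        }
        return -1;
    }

    /** Marks the element at idx as free again. */
    void free(int idx) {
        bitmap[idx >>> 6] &= ~(1L << (idx & 63));
    }

    public static void main(String[] args) {
        SubpageBitmapSketch page = new SubpageBitmapSketch(8192, 32); // 256 elements of 32B
        int a = page.allocate();             // 0
        int b = page.allocate();             // 1
        page.free(a);
        System.out.println(page.allocate()); // 0 again: the freed slot is reused
    }
}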