大家一定都了解Java的线程池,线程池有什么好处呢?如果没有线程池,我们每次创建线程都要新建一个线程,这样对CPU的消耗比较大。那么利用线程池我们可以对已经创建好的线程复用,线程就不用频繁创建和销毁了。
同样,我们的内存池也是这个原理,producerBatch需要空间存储消息的时候,就去缓存池申请一块内存,而不用频繁地创建和销毁内存,也就避免了频繁地GC。
BufferPool简介
下面的结构图简单说明了BufferPool的组成结构和处理缓存的流程:
整个BufferPool的大小默认为32M,内部内存区域分为两块:固定大小内存块集合free、非池化缓存nonPooledAvailableMemory。固定大小内存块默认大小为16k。当ProducerBatch向BufferPool申请一个大小为size的内存块时,BufferPool会根据size的大小判断由哪个内存区域分配内存块。同时,free和nonPooledAvailableMemory这两块区域的内存可以交换。
接下来,我们通过代码来学习Kafka底层提供的高效的内存池设计。
类BufferPool
重要字段如下:
public class BufferPool {
static final String WAIT_TIME_SENSOR_NAME = "bufferpool-wait-time";
private final long totalMemory;//默认32M
private final int poolableSize;//池化大小16k
private final ReentrantLock lock;//分配和回收时用的锁。
private final Deque<ByteBuffer> free;//池化的内存
private final Deque<Condition> waiters;//阻塞线程对应的Condition集合
private long nonPooledAvailableMemory;//非池化可使用的内存
}
- totalMemory:整个BufferPool内存大小,默认是32M。
- poolableSize:池化缓存区一块内存块的大小,默认是16k。
- lock:类型是ReentrantLock。因为会有多线程并发和回收ByteBuffer,所以使用锁控制并发,保证了线程的安全。
- free:类型是Deque。缓存了指定大小的ByteBuffer对象。
- waiters:类型是Deque队列。因为会有申请不到足够内存的线程,线程为了等待其他线程释放内存而阻塞等待,对应的Condition对象会进入该队列。
- nonPooledAvailableMemory:非池化可使用的内存。
接下来,我再来介绍下重要的方法。
allocate()方法是向BufferPool申请ByteBuffer。
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
//1.验证申请的内存是否大于总内存
if (size > this.totalMemory)
throw new IllegalArgumentException("Attempt to allocate " + size
+ " bytes, but there is a hard limit of "
+ this.totalMemory
+ " on memory allocations.");
ByteBuffer buffer = null;
//2.加锁,保证线程安全。
this.lock.lock();
if (this.closed) {
this.lock.unlock();
throw new KafkaException("Producer closed while allocating memory");
}
try {
// check if we have a free buffer of the right size pooled
//3.申请内存的大小是否是池化的内存大小,16k
if (size == poolableSize && !this.free.isEmpty())
//如果是就从池里Bytebuffer
return this.free.pollFirst();
// 池化内存空间的大小
int freeListSize = freeSize() * this.poolableSize;
//4.如果非池化可以空间加池化内存空间大于等于要申请的空间
if (this.nonPooledAvailableMemory + freeListSize >= size) {
// 如果申请的空间大小小于池化的大小,就从free队列里拿出一个池化的大小的Bytebuffer加到nonPooledAvailableMemory中
// 5.如果一个池化的大小的Bytebuffer不满足size,就持续释放池化内存Bytebuffer直到满足为止。
freeUp(size);
this.nonPooledAvailableMemory -= size;
//如果非池化可以空间加池化内存空间大于要申请的空间
} else {
// we are out of memory and will have to block
int accumulated = 0;
//创建对应的Condition
Condition moreMemory = this.lock.newCondition();
try {
//线程最长阻塞时间
long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
//放入waiters集合中
this.waiters.addLast(moreMemory);
// 没有足够的空间就一直循环
while (accumulated < size) {
long startWaitNs = time.nanoseconds();
long timeNs;
boolean waitingTimeElapsed;
try {
//空间不够就阻塞,并设置超时时间。
waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
} finally {
long endWaitNs = time.nanoseconds();
timeNs = Math.max(0L, endWaitNs - startWaitNs);
recordWaitTime(timeNs);
}
if (this.closed)
throw new KafkaException("Producer closed while allocating memory");
if (waitingTimeElapsed) {
this.metrics.sensor("buffer-exhausted-records").record();
throw new BufferExhaustedException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
}
remainingTimeToBlockNs -= timeNs;
// check if we can satisfy this request from the free list,
// otherwise allocate memory
//ByteBuffer池化集合里是否有元素
if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
// just grab a buffer from the free list
buffer = this.free.pollFirst();
accumulated = size;
} else {
//尝试给nonPooledAvailableMemory扩容
freeUp(size - accumulated);
int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
this.nonPooledAvailableMemory -= got;
//累计分配了多少空间
accumulated += got;
}
}
accumulated = 0;
} finally {
this.nonPooledAvailableMemory += accumulated;//把已经分配的内存还回nonPooledAvailableMemory
this.waiters.remove(moreMemory);//删除对应的condition
}
}
} finally {
// signal any additional waiters if there is more memory left
// over for them
try {
if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
this.waiters.peekFirst().signal();
} finally {
// Another finally... otherwise find bugs complains
lock.unlock();
}
}
if (buffer == null)
//非池化ByteBuffer分配内存
return safeAllocateByteBuffer(size);
else
return buffer;
}
这里先明确三个变量:
- free:由固定大小ByteBuffer组成的集合。
- nonPooledAvailableMemory:非池化可利用的内存。
- size:申请的ByteBuffer大小。
第一步,验证申请的空间大小size是否大于总内存,BufferPool的总内存默认是32M。如果比总内存还大,就抛出异常。
第二步,因为会涉及到Deque的操作,而Deque不是线程安全的,这里要加锁,防止多线程操作引起的问题。
第三步,如果free不为空,而且申请的空间size和free的元素的大小相同,就从free中拿出一个ByteBuffer并返回,ByteBuffer申请成功。
第四步,如果不满足上述条件,free加上nonPooledAvailableMemory比要申请的大,就调用freeUp(size)方法凑齐足够的空间给size。
freeUp(size)方法源码参考下面:
private void freeUp(int size) {
while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
this.nonPooledAvailableMemory += this.free.pollLast().capacity();
}
只要固定大小ByteBuffer集合不为空且非池化可利用空间小于申请的size,就不断从free里往nonPooledAvailableMemory添加ByteBuffer,直到满足size的大小。
第五步,如果nonPooledAvailableMemory加上free的空间小于size的大小,就意味着现在的剩余空间满足不了size的大小。要随着其他线程对内存的释放一点点累加到满足size大小。那该怎么办呢?
首先定义int型变量accumulated作为标记已经获得了多大的空间。定义这个线程的Condition,并放入Condition的集合waiters中。然后进入到while循环中,当累加的空间和size一样大了才跳出循环。进入while循环后先通过await()阻塞线程,等待其他线程释放内存。当其他线程释放内存时,会唤醒这里的阻塞。
假如有线程释放内存且唤醒这里的阻塞了,那么先看size是否满足释放free里的ByteBuffer的条件,如果满足就从free里取出一个ByteBuffer,否则再调用freeUp()给nonPooledAvailableMemory扩容。如果累计的空间还是不满足size的大小,那就再次await()等待下次有线程释放空间。
我们再来分析下释放空间的代码,deallocate()方法:
public void deallocate(ByteBuffer buffer, int size) {
lock.lock();
try {
//如果是池化ByteBuffer大小的ByteBuffer
if (size == this.poolableSize && size == buffer.capacity()) {
buffer.clear();
this.free.add(buffer);
} else {
//否则释放到nonPooledAvailableMemory
this.nonPooledAvailableMemory += size;
}
//拿出一个condition,并signal,唤醒阻塞。
Condition moreMem = this.waiters.peekFirst();
if (moreMem != null)
moreMem.signal();
} finally {
lock.unlock();
}
}
先判断size是否和池化ByteBuffer的大小一样,如果满足就把要释放的ByteBuffer放回free里,否则非池化可利用缓存会回收这个ByteBuffer。因为有ByteBuffer回收了,我们就要看阻塞线程的Condition集合waiters是否为空,如果不为空就取第一个Condition并唤醒阻塞。