如何初始化的bufferPool的
在初始化的时候 初始化BufferPool对象
// 设置缓冲区
this.accumulator = new RecordAccumulator(xxxxx,其他参数,
new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
this.free = bufferPool;
发送消息时
RecordAccumulator.RecordAppendResult result = accumulator.append(xxx);
buffer = free.allocate(size, maxTimeToBlock); // 内存分配
总体架构
在KafkaProudcer初始化的时候,会创建一个32MB的缓冲池,buffer.memory参数可以自定义,同事缓冲池被分成多个块,一个块就是batch.size 默认就是16KB。
我们来分析下,在一个Kafka集群中 如果有3个Broker。那么当一个topic创建的时候,就是三个分区。
分区A: 分区B: 分区C: 三个分区分别存储消息 发送消息。所以在申请的时候,也是按照分区级别进行申请Batch内存块。
但是如果频繁的申请、发送完毕消息,被GC回收,其实是比较消耗资源的方式,所以更好的方式就是通过池化技术,
总体流程
1.申请之后发送完毕消息后,自动归还给BufferPool,避免内存块被频繁回收的问题。
基本属性
// 总内存大小 32MB
private final long totalMemory;
// 每个内存块大小 batchSize 默认16K
private final int poolableSize;
// 申请、归还内存的方法的同步锁
private final ReentrantLock lock;
// 空闲内存块
private final Deque<ByteBuffer> free;
// 需要等待空闲内存块的事件
private final Deque<Condition> waiters;
/** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize. */
// 缓冲池还未分配的空闲内存,新申请的内存块就是从这里获取内存值
private long nonPooledAvailableMemory;
内存分配
申请内存
org.apache.kafka.clients.producer.internals.BufferPool#allocate
1.判断申请内存大小超过总内存大小 抛出异常
2.申请加锁,如果缓冲区已经关闭,直接释放锁,抛出异常
3.内存够的情况下,如果申请内存等于16K,并且缓冲区内存不为空
4.如果申请内存超过一个batch.size的大小,当前空闲内存总空间 以及回收的内存空间是否足够申请的内存大小
5.内存不够的情况下,申请一个condition 添加到waiter,不断收集空闲的内存,直到大于申请的内存,退出。在申请过程中,await进行阻塞等待。
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
if (size > this.totalMemory)
throw new IllegalArgumentException("");
ByteBuffer buffer = null;
this.lock.lock();
if (this.closed) {
this.lock.unlock();
throw new KafkaException("Producer closed while allocating memory");
}
try {
// size大小等于batchSIze 并且free不为空 直接获取空闲内存块
// 这里为什么必须是batchSize 因为如果大于batchSize的话,就无法满足,
// 因为batchSize是固定值,不能超过batchSize
if (size == poolableSize && !this.free.isEmpty())
return this.free.pollFirst();
// 已经回收的内存总大小 = 当前回收内存的个数 * batchSize
int freeListSize = freeSize() * this.poolableSize;
// 总空闲内存 大于等于 申请的内存
if (this.nonPooledAvailableMemory + freeListSize >= size) {
// we have enough unallocated or pooled memory to immediately
// satisfy the request, but need to allocate the buffer
freeUp(size);
// 空闲内存 减去申请的内存大小
this.nonPooledAvailableMemory -= size;
// 内存足够的情况
} else {
// 内存不够的情况
// we are out of memory and will have to block
int accumulated = 0;
// 创建本次等待的condition
Condition moreMemory = this.lock.newCondition();
try {
long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
// 添加到类型Deque的waiter中 -- 之后会唤醒
this.waiters.addLast(moreMemory);
//只有当超过申请内存大小 退出
while (accumulated < size) {
long startWaitNs = time.nanoseconds();
long timeNs;
boolean waitingTimeElapsed;
try {
// 阻塞等待
waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
} finally {
long endWaitNs = time.nanoseconds();
timeNs = Math.max(0L, endWaitNs - startWaitNs);
recordWaitTime(timeNs);
}
if (this.closed)
throw new KafkaException("Producer closed while allocating memory");
if (waitingTimeElapsed) {
this.metrics.sensor("buffer-exhausted-records").record();
throw new BufferExhaustedException("xx")
}
remainingTimeToBlockNs -= timeNs;
if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
buffer = this.free.pollFirst();
accumulated = size;
} else {
freeUp(size - accumulated);
int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
this.nonPooledAvailableMemory -= got;
accumulated += got;
}
}
accumulated = 0;
} finally {
this.nonPooledAvailableMemory += accumulated;
this.waiters.remove(moreMemory);
}
}
} finally {
try {
if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
this.waiters.peekFirst().signal();
} finally {
lock.unlock();
}
}
if (buffer == null)
return safeAllocateByteBuffer(size);
else
return buffer;
}
内存回收
内存释放的时候,加锁处理。然后判断规范内存等于batch.size 直接回收给free。
public void deallocate(ByteBuffer buffer, int size) {
lock.lock();
try {
// 如果归还的内存块大小等于batchSize
if (size == this.poolableSize && size == buffer.capacity()) {
// 清空添加到缓冲池中,归还给缓冲池
buffer.clear();
this.free.add(buffer);
} else {
// 直接加在内存未分配的地址,等待JVM GC回收
this.nonPooledAvailableMemory += size;
}
Condition moreMem = this.waiters.peekFirst();
if (moreMem != null)
// 唤醒第一个待分配的
moreMem.signal();
} finally {
lock.unlock();
}
}
品一品其中的设计
1.恰到好处的避免频繁的不断的JVM GC,使用内存池的方式,到达资源的复用。
2.结合业务设计batch.size 不能无脑设置消息体大小。如果太大则会导致不断创建新的ByteBuffer 并且不会归还到缓冲池中。
3.配合多线程的等待/唤醒机制来实现同步。
参考文档
https://www.cnblogs.com/rwxwsblog/p/14754810.html
https://greedypirate.github.io/2020/05/02/kafka%E7%BC%93%E5%86%B2%E6%B1%A0-BufferPool-%E5%8E%9F%E7%90%86%E5%89%96%E6%9E%90/#%E5%89%8D%E8%A8%80
https://blog.csdn.net/huaxiawangyong/article/details/132389908