学习Kafka生产者的缓冲池设计

news2025/1/8 5:38:08

大家一定都了解Java的线程池,线程池有什么好处呢?如果没有线程池,我们每次创建线程都要新建一个线程,这样对CPU的消耗比较大。那么利用线程池我们可以对已经创建好的线程复用,线程就不用频繁创建和销毁了。

同样,我们的内存池也是这个原理,producerBatch需要空间存储消息的时候,就去缓存池申请一块内存,而不用频繁地创建和销毁内存,也就避免了频繁地GC。

BufferPool简介

下面的结构图简单说明了BufferPool的组成结构和处理缓存的流程:
在这里插入图片描述
整个BufferPool的大小默认为32M,内部内存区域分为两块:固定大小内存块集合free、非池化缓存nonPooledAvailableMemory。固定大小内存块默认大小为16k。当ProducerBatch向BufferPool申请一个大小为size的内存块时,BufferPool会根据size的大小判断由哪个内存区域分配内存块。同时,free和nonPooledAvailableMemory这两块区域的内存可以交换。

接下来,我们通过代码来学习Kafka底层提供的高效的内存池设计。

类BufferPool

重要字段如下:

public class BufferPool {

    static final String WAIT_TIME_SENSOR_NAME = "bufferpool-wait-time";

    private final long totalMemory;//默认32M
    private final int poolableSize;//池化大小16k
    private final ReentrantLock lock;//分配和回收时用的锁。
    private final Deque<ByteBuffer> free;//池化的内存
    private final Deque<Condition> waiters;//阻塞线程对应的Condition集合
    private long nonPooledAvailableMemory;//非池化可使用的内存
}
  • totalMemory:整个BufferPool内存大小,默认是32M。
  • poolableSize:池化缓存区一块内存块的大小,默认是16k。
  • lock:类型是ReentrantLock。因为会有多线程并发和回收ByteBuffer,所以使用锁控制并发,保证了线程的安全。
  • free:类型是Deque。缓存了指定大小的ByteBuffer对象。
  • waiters:类型是Deque队列。因为会有申请不到足够内存的线程,线程为了等待其他线程释放内存而阻塞等待,对应的Condition对象会进入该队列。
  • nonPooledAvailableMemory:非池化可使用的内存。

接下来,我再来介绍下重要的方法。

allocate()方法是向BufferPool申请ByteBuffer。

public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    //1.验证申请的内存是否大于总内存
    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate " + size
                                           + " bytes, but there is a hard limit of "
                                           + this.totalMemory
                                           + " on memory allocations.");

    ByteBuffer buffer = null;
    //2.加锁,保证线程安全。
    this.lock.lock();

    if (this.closed) {
        this.lock.unlock();
        throw new KafkaException("Producer closed while allocating memory");
    }

    try {
        // check if we have a free buffer of the right size pooled
        //3.申请内存的大小是否是池化的内存大小,16k
        if (size == poolableSize && !this.free.isEmpty())
            //如果是就从池里Bytebuffer
            return this.free.pollFirst();
        // 池化内存空间的大小
        int freeListSize = freeSize() * this.poolableSize;
        //4.如果非池化可以空间加池化内存空间大于等于要申请的空间
        if (this.nonPooledAvailableMemory + freeListSize >= size) {
     
            // 如果申请的空间大小小于池化的大小,就从free队列里拿出一个池化的大小的Bytebuffer加到nonPooledAvailableMemory中
            // 5.如果一个池化的大小的Bytebuffer不满足size,就持续释放池化内存Bytebuffer直到满足为止。
            freeUp(size);
            this.nonPooledAvailableMemory -= size;
            //如果非池化可以空间加池化内存空间大于要申请的空间
        } else {
            // we are out of memory and will have to block
            int accumulated = 0;
            //创建对应的Condition
            Condition moreMemory = this.lock.newCondition();
            try {
                //线程最长阻塞时间
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                //放入waiters集合中
                this.waiters.addLast(moreMemory);
               
                // 没有足够的空间就一直循环
                while (accumulated < size) {
                    long startWaitNs = time.nanoseconds();
                    long timeNs;
                    boolean waitingTimeElapsed;
                    try {
                        //空间不够就阻塞,并设置超时时间。
                        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                    } finally {
                        long endWaitNs = time.nanoseconds();
                        timeNs = Math.max(0L, endWaitNs - startWaitNs);
                        recordWaitTime(timeNs);
                    }

                    if (this.closed)
                        throw new KafkaException("Producer closed while allocating memory");

                    if (waitingTimeElapsed) {
                        this.metrics.sensor("buffer-exhausted-records").record();
                        throw new BufferExhaustedException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                    }

                    remainingTimeToBlockNs -= timeNs;

                    // check if we can satisfy this request from the free list,
                    // otherwise allocate memory
                    //ByteBuffer池化集合里是否有元素
                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                        // just grab a buffer from the free list
                        buffer = this.free.pollFirst();
                        accumulated = size;
                    } else {
                        //尝试给nonPooledAvailableMemory扩容
                        freeUp(size - accumulated);
                        int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
                        this.nonPooledAvailableMemory -= got;
                        //累计分配了多少空间
                        accumulated += got;
                    }
                }
                accumulated = 0;
            } finally {
                this.nonPooledAvailableMemory += accumulated;//把已经分配的内存还回nonPooledAvailableMemory
                this.waiters.remove(moreMemory);//删除对应的condition
            }
        }
    } finally {
        // signal any additional waiters if there is more memory left
        // over for them
        try {
            if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
                this.waiters.peekFirst().signal();
        } finally {
            // Another finally... otherwise find bugs complains
            lock.unlock();
        }
    }

    if (buffer == null)
        //非池化ByteBuffer分配内存
        return safeAllocateByteBuffer(size);
    else
        return buffer;
}

这里先明确三个变量:

  • free:由固定大小ByteBuffer组成的集合。
  • nonPooledAvailableMemory:非池化可利用的内存。
  • size:申请的ByteBuffer大小。

第一步,验证申请的空间大小size是否大于总内存,BufferPool的总内存默认是32M。如果比总内存还大,就抛出异常。

第二步,因为会涉及到Deque的操作,而Deque不是线程安全的,这里要加锁,防止多线程操作引起的问题。

第三步,如果free不为空,而且申请的空间size和free的元素的大小相同,就从free中拿出一个ByteBuffer并返回,ByteBuffer申请成功。

第四步,如果不满足上述条件,free加上nonPooledAvailableMemory比要申请的大,就调用freeUp(size)方法凑齐足够的空间给size。

freeUp(size)方法源码参考下面:

private void freeUp(int size) {
    while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
        this.nonPooledAvailableMemory += this.free.pollLast().capacity();
}

只要固定大小ByteBuffer集合不为空且非池化可利用空间小于申请的size,就不断从free里往nonPooledAvailableMemory添加ByteBuffer,直到满足size的大小。

第五步,如果nonPooledAvailableMemory加上free的空间小于size的大小,就意味着现在的剩余空间满足不了size的大小。要随着其他线程对内存的释放一点点累加到满足size大小。那该怎么办呢?

首先定义int型变量accumulated作为标记已经获得了多大的空间。定义这个线程的Condition,并放入Condition的集合waiters中。然后进入到while循环中,当累加的空间和size一样大了才跳出循环。进入while循环后先通过await()阻塞线程,等待其他线程释放内存。当其他线程释放内存时,会唤醒这里的阻塞。

假如有线程释放内存且唤醒这里的阻塞了,那么先看size是否满足释放free里的ByteBuffer的条件,如果满足就从free里取出一个ByteBuffer,否则再调用freeUp()给nonPooledAvailableMemory扩容。如果累计的空间还是不满足size的大小,那就再次await()等待下次有线程释放空间。

我们再来分析下释放空间的代码,deallocate()方法:

public void deallocate(ByteBuffer buffer, int size) {
    lock.lock();
    try {
        //如果是池化ByteBuffer大小的ByteBuffer
        if (size == this.poolableSize && size == buffer.capacity()) {
            buffer.clear();
            this.free.add(buffer);
        } else {
            //否则释放到nonPooledAvailableMemory
            this.nonPooledAvailableMemory += size;
        }
        //拿出一个condition,并signal,唤醒阻塞。
        Condition moreMem = this.waiters.peekFirst();
        if (moreMem != null)
            moreMem.signal();
    } finally {
        lock.unlock();
    }
}

先判断size是否和池化ByteBuffer的大小一样,如果满足就把要释放的ByteBuffer放回free里,否则非池化可利用缓存会回收这个ByteBuffer。因为有ByteBuffer回收了,我们就要看阻塞线程的Condition集合waiters是否为空,如果不为空就取第一个Condition并唤醒阻塞。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/614701.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

回顾 | Semantic Kernel:面向 AI 编程(三) - 云原生

点击蓝字 关注我们 编辑&#xff1a;Alan Wang 排版&#xff1a;Rani Sun 微软 Reactor 为帮助广开发者&#xff0c;技术爱好者&#xff0c;更好的学习 .NET Core, C#, Python&#xff0c;数据科学&#xff0c;机器学习&#xff0c;AI&#xff0c;区块链, IoT 等技术&#xff0…

chatgpt赋能python:Python如何创建角色

Python如何创建角色 在游戏开发中&#xff0c;创建角色是非常重要的一个环节&#xff0c;也是游戏设计的重要一环。Python作为一种广泛使用的编程语言&#xff0c;可以用于快速且高效地创建角色。 1. 创建角色的基本思路 创建角色的主要思路是定义角色的属性&#xff0c;包括…

ReactJS入门(一)—— 初步认识React

React刚开始红的时候&#xff0c;由于对其不甚了解&#xff0c;觉得JSX的写法略非主流&#xff0c;故一直没打算将其应用在项目上&#xff0c;随着身边大神们的科普&#xff0c;才后知后觉是个好东西。 好在哪里呢&#xff1f;个人拙见&#xff0c;有俩点&#xff1a; 1. 虚拟…

14.数据结构之多路查找树与堆

前言 之前介绍的都是二叉查找树&#xff0c;二叉树一个节点最多有两个子节点&#xff0c;那么多于两个节点是什么情况呢&#xff0c;这就是我们本节要介绍的多路查找树。 多路查找树&#xff0c;也是我们数据库mysql底层索引维护方式。下面&#xff0c;我们来详细介绍。 1. …

小红书母婴博主类型怎么选,类型区分

母婴类型的分享不管在哪个平台都是涨粉最快的&#xff0c;也可能是因为当前的大环境因素导致的。不过如果你想成为一名母婴博主或者想要借助它的影响&#xff0c;得先了解一些东西。那么小红书母婴博主类型怎么选&#xff0c;类型怎么区分。 小红书母婴博主是指聚集在小红书平台…

【数据结构】带你玩转排序:堆排序、希尔排序、插入排序、选择排序、冒泡排序、快排(多版本)、归并排序

Yan-英杰的主页 悟已往之不谏 知来者之可追 C程序员&#xff0c;2024届电子信息研究生 目录 常见算法的实现 插入排序 希尔排序 堆排序 选择排序 冒泡排序 快速排序 Hoare版本 随机选Keyi 三数取中 挖坑法 前后指针版本 归并排序 常见算法的实现 插入排序 动画演示&…

信创提速,人才为先!麒麟信安与领路信创签订《人才合作协议》

5月23日&#xff0c;麒麟信安杨涛董事长一行考察了设立在长沙领路信创科技有限公司&#xff08;简称&#xff1a;领路信创&#xff09;的“国家新一代自主安全计算系统产业集群人才基地”&#xff08;简称人才基地&#xff09;&#xff0c;并与领路信创刘耿董事长签署《人才合作…

苹果WWDC2023:首款MR头显震撼发布,开发者泪洒现场,一文读懂全新产品及创新功能

&#x1f337; 博主 libin9iOak带您 Go to New World.✨ &#x1f984; 个人主页——libin9iOak的博客&#x1f390; &#x1f433; 《面试题大全》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &#x1f30a; 《IDEA开发…

macOS Sonoma 发布,全面提升生产力和创意工作流(ISO、IPSW、PKG 下载)

macOS Sonoma 14.0 Beta 1 (23A5257q) ISO、IPSW、PKG 下载 本站下载的 macOS 软件包&#xff0c;既可以拖拽到 Applications&#xff08;应用程序&#xff09;下直接安装&#xff0c;也可以制作启动 U 盘安装&#xff0c;或者在虚拟机中启动安装。另外也支持在 Windows 和 Li…

chatgpt赋能python:Python如何删除行:从入门到精通

Python如何删除行&#xff1a;从入门到精通 在Python编程中&#xff0c;删除行是必不可少的操作之一。无论是清除不必要的数据&#xff0c;还是在数据集中删除重复行&#xff0c;或者在文本文件中删除某些行&#xff0c;删除行都是一项极其重要的任务。 什么是Python语言&…

shell文件读取

文件读取 一、whilefor shell中读取文件有两种方式 while 和 for while #!/bin/bash cat filename | while read line doetho $line donefor #!/bin/bash for line in cat filename(待读取的文件) doecho $line done第三中写法&#xff0c;上述两种方式的升级写法 #!/bin/bas…

SQL-约束

SQL-约束 1.1 概念 约束是作用于表中列上的规则&#xff0c;用于限制加入表的数据约束的存在保证了数据库中数据的正确性、有效性和完整性 1.2 分类 类型描述关键字非空约束保证列中所有的数据不能有null值NOT NULL唯一约束保证列中所有数据各不相同UNIQUE主键约束主键是一行…

HTTP首部(上)

HTTP 协议的请求和响应报文中必定包含 HTTP 首部&#xff0c;只是我们平时在使用 Web 的过程中感受不到它。本章我们一起来学习 HTTP 首部的结构&#xff0c;以及首部中各字段的用法。 1.HTTP报文首部 先来看看http报文的首部结构图&#xff1a; HTTP 协议的请求和响应报文中…

不知道如何搭建帮助中心?这里有解决办法!

在今天的数字化时代&#xff0c;帮助中心已经成为许多公司所必需的一个重要部分。它是一个客户与公司沟通和交互的重要渠道&#xff0c;可以帮助客户解决问题和获得支持。本文将介绍如何搭建一个有效的帮助中心&#xff0c;以提高客户满意度和公司的效率。 一、明确帮助中心的…

哪些因素对会影响到企业制定自己的融合CDN战略

我们知道一个企业/组织有多种方法可以实现和利用多CDN战略&#xff0c;由于带宽承诺、成本、超期费用等因素&#xff0c;因此对不同的指标进行评估至关重要。 以下是可能影响您的融合CDN战略的一些因素&#xff1a; 地理因素 在选择CDN时需要考虑的一个重要因素是用户所在的…

静态内存管理

内存管理的基本概念 在一般的实时嵌入式系统中&#xff0c;由于实时性的要求&#xff0c;很少使用虚拟内存机制。所有的内存都需要用户参与分配&#xff0c;直接操作物理内存&#xff0c;所分配的内存不能超过系统的物理内存&#xff0c;所有的系统堆栈的管理&#xff0c;都由…

快速上手kettle(三)壶中可以放些啥?

快速上手kettle&#xff08;三&#xff09;壶中可以放些啥&#xff1f; 序言一 、kettle这壶能装些啥二、Access输入2.1 准备Acess数据库和表2.2 新建一个转换并设置2.3 启动转换预览数据 三、CSV文件输入3.1 准备csv文件&#xff0c;并将csv输入控件拖入工作区3.2 csv输入控件…

JDBC 拾枝杂谈—入门篇(通俗易懂)

目录 一、前言 二、JDBC介绍 1.基本概述 : 2.基本原理 : 三、JDBC模拟 1.模拟接口 &#xff1a; 2.模拟实现类 : 3.模拟测试类 : 4.模拟扩展 : 四、JDBC入门 1.编写JDBC程序的核心四部曲 : &#xff08;全文背诵&#xff09; 2.准备工作 : ①导入jar包 ②创建测试表 …

Sys Tick【转】

STM32学习及应用笔记一&#xff1a;SysTick定时器学习及应用 - STM32/STM8技术论坛 - 电子技术论坛 - 广受欢迎的专业电子论坛! (elecfans.com) 1、SysTick究竟是什么&#xff1f; 关于SysTick在STM32的资料中并没有详细的介绍&#xff0c;这可能由于SysTick是ARM内核的东西。在…

在用对讲机中竟有近5成属于违规使用?

目前对讲机在很多领域和场景中都有着广泛的应用&#xff0c;包括建筑工地、宾馆饭店、住宅小区、大型商场超市、安保活动、物业管理等。 不过据非官方数据统计&#xff0c;在用对讲机中竟有近5成属于违规使用&#xff0c;这严重干扰了城市上空的无线电波秩序。 根据近年来无线…