【Kafka】Kafka Producer的缓冲池机制原理

news2024/10/20 16:57:35

如何初始化的bufferPool的

在初始化的时候 初始化BufferPool对象

// 设置缓冲区
this.accumulator = new RecordAccumulator(xxxxx,其他参数,
        new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));

this.free = bufferPool;

发送消息时

  RecordAccumulator.RecordAppendResult result = accumulator.append(xxx);
  buffer = free.allocate(size, maxTimeToBlock); // 内存分配

总体架构

在KafkaProudcer初始化的时候,会创建一个32MB的缓冲池,buffer.memory参数可以自定义,同事缓冲池被分成多个块,一个块就是batch.size 默认就是16KB。
我们来分析下,在一个Kafka集群中 如果有3个Broker。那么当一个topic创建的时候,就是三个分区。
分区A: 分区B: 分区C: 三个分区分别存储消息 发送消息。所以在申请的时候,也是按照分区级别进行申请Batch内存块。
在这里插入图片描述
但是如果频繁的申请、发送完毕消息,被GC回收,其实是比较消耗资源的方式,所以更好的方式就是通过池化技术,
在这里插入图片描述
总体流程
1.申请之后发送完毕消息后,自动归还给BufferPool,避免内存块被频繁回收的问题。
在这里插入图片描述

基本属性

	// 总内存大小 32MB
    private final long totalMemory;
    // 每个内存块大小 batchSize 默认16K
    private final int poolableSize;
    // 申请、归还内存的方法的同步锁
    private final ReentrantLock lock;
    // 空闲内存块
    private final Deque<ByteBuffer> free;
    // 需要等待空闲内存块的事件
    private final Deque<Condition> waiters;
    /** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize.  */
    // 缓冲池还未分配的空闲内存,新申请的内存块就是从这里获取内存值
    private long nonPooledAvailableMemory;

内存分配

申请内存

org.apache.kafka.clients.producer.internals.BufferPool#allocate

1.判断申请内存大小超过总内存大小 抛出异常
2.申请加锁,如果缓冲区已经关闭,直接释放锁,抛出异常
3.内存够的情况下,如果申请内存等于16K,并且缓冲区内存不为空
4.如果申请内存超过一个batch.size的大小,当前空闲内存总空间 以及回收的内存空间是否足够申请的内存大小
5.内存不够的情况下,申请一个condition 添加到waiter,不断收集空闲的内存,直到大于申请的内存,退出。在申请过程中,await进行阻塞等待。

    public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
        if (size > this.totalMemory)
            throw new IllegalArgumentException("");

        ByteBuffer buffer = null;
        this.lock.lock();

        if (this.closed) {
            this.lock.unlock();
            throw new KafkaException("Producer closed while allocating memory");
        }

        try {
            // size大小等于batchSIze 并且free不为空 直接获取空闲内存块
            // 这里为什么必须是batchSize 因为如果大于batchSize的话,就无法满足,
            // 因为batchSize是固定值,不能超过batchSize
            if (size == poolableSize && !this.free.isEmpty())
                return this.free.pollFirst();

            // 已经回收的内存总大小 = 当前回收内存的个数 * batchSize
            int freeListSize = freeSize() * this.poolableSize;
            // 总空闲内存 大于等于 申请的内存
            if (this.nonPooledAvailableMemory + freeListSize >= size) {
                // we have enough unallocated or pooled memory to immediately
                // satisfy the request, but need to allocate the buffer
                freeUp(size);
                // 空闲内存 减去申请的内存大小
                this.nonPooledAvailableMemory -= size;
            // 内存足够的情况
            } else {
                // 内存不够的情况
                // we are out of memory and will have to block
                int accumulated = 0;
                // 创建本次等待的condition
                Condition moreMemory = this.lock.newCondition();
                try {
                    long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                    // 添加到类型Deque的waiter中 -- 之后会唤醒
                    this.waiters.addLast(moreMemory);

                    //只有当超过申请内存大小 退出
                    while (accumulated < size) {
                        long startWaitNs = time.nanoseconds();
                        long timeNs;
                        boolean waitingTimeElapsed;
                        try {
                            // 阻塞等待
                            waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                        } finally {
                            long endWaitNs = time.nanoseconds();
                            timeNs = Math.max(0L, endWaitNs - startWaitNs);
                            recordWaitTime(timeNs);
                        }

                        if (this.closed)
                            throw new KafkaException("Producer closed while allocating memory");

                        if (waitingTimeElapsed) {
                            this.metrics.sensor("buffer-exhausted-records").record();
                            throw new BufferExhaustedException("xx")
                        }

                        remainingTimeToBlockNs -= timeNs;

                        if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                            buffer = this.free.pollFirst();
                            accumulated = size;
                        } else {
                            freeUp(size - accumulated);
                            int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
                            this.nonPooledAvailableMemory -= got;
                            accumulated += got;
                        }
                    }
                    accumulated = 0;
                } finally {
                    this.nonPooledAvailableMemory += accumulated;
                    this.waiters.remove(moreMemory);
                }
            }
        } finally {
            try {
                if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
                    this.waiters.peekFirst().signal();
            } finally {
                lock.unlock();
            }
        }

        if (buffer == null)
            return safeAllocateByteBuffer(size);
        else
            return buffer;
    }

在这里插入图片描述

内存回收

内存释放的时候,加锁处理。然后判断规范内存等于batch.size 直接回收给free。

    public void deallocate(ByteBuffer buffer, int size) {
        lock.lock();
        try {
            // 如果归还的内存块大小等于batchSize
            if (size == this.poolableSize && size == buffer.capacity()) {
                // 清空添加到缓冲池中,归还给缓冲池
                buffer.clear();
                this.free.add(buffer);
            } else {
                // 直接加在内存未分配的地址,等待JVM GC回收
                this.nonPooledAvailableMemory += size;
            }
            Condition moreMem = this.waiters.peekFirst();
            if (moreMem != null)
                // 唤醒第一个待分配的
                moreMem.signal();
        } finally {
            lock.unlock();
        }
    }

在这里插入图片描述

品一品其中的设计

1.恰到好处的避免频繁的不断的JVM GC,使用内存池的方式,到达资源的复用。
2.结合业务设计batch.size 不能无脑设置消息体大小。如果太大则会导致不断创建新的ByteBuffer 并且不会归还到缓冲池中。
3.配合多线程的等待/唤醒机制来实现同步。

参考文档

https://www.cnblogs.com/rwxwsblog/p/14754810.html
https://greedypirate.github.io/2020/05/02/kafka%E7%BC%93%E5%86%B2%E6%B1%A0-BufferPool-%E5%8E%9F%E7%90%86%E5%89%96%E6%9E%90/#%E5%89%8D%E8%A8%80

https://blog.csdn.net/huaxiawangyong/article/details/132389908

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2219430.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

免费送源码:Java+SpringBoot+MySQL SpringBoot珠宝店客户关系管理系统 计算机毕业设计原创定制

摘 要 随着计算机技术的发展&#xff0c;特别是计算机网络技术与数据库技术的发展&#xff0c;使用人们的生活与工作方式发生了很大的改观。本课题研究的珠宝店客户关系管理系统&#xff0c;主要功能模块包括系统用户&#xff0c;部门类别&#xff0c;职务类别&#xff0c;外出…

【ssh】Mac 使用 ssh 连接阿里云报错:Connection reset by 8.155.1.xxx port 22

Mac 使用 ssh 连接阿里云报错&#xff1a;Connection reset by 8.155.1.xxx port 22 问题描述解决办法 问题描述 Connection reset by 8.155.1.xxx port 22解决办法 关掉代理 VPN

SpringCloudAlibaba升级手册

目录 1. 版本对照 版本现状 SpringCloud与AlibabaCloud对应版本 Springboot与Elasticsearch版本对应 2. openfeign问题 问题 解决方案 3. Feign请求问题 问题 解决方法 4. Sentinel循环依赖 问题 解决方案 5. bootstrap配置文件不生效 问题 解决方案 6. Nacos的…

visio导出pdf公式变形问题杂谈

其实不会变形。 我自己的情况是直接用edge PDF阅读器打开pdf看到的是公式有变形&#xff08;常见是字体、形状变了&#xff09;&#xff0c;但换一个pdf阅读器如adobe的就是正常的了 不过大家一般是用edge pdf阅读器直接打开查看&#xff0c;所以通过visio打印的方式导出pdf可…

DNS 与 ICMP

DNS(Domain Name System)快速了解 DNS 是一整套从域名映射到 IP 的系统 DNS 背景 TCP/IP 中使用 IP 地址和端口号来确定网络上的一台主机的一个程序. 但是 IP 地址不方便记忆 于是人们发明了一种叫主机名的东西, 是一个字符串, 并且使用 hosts 文件来描述主机 名和 IP 地…

【Hive】8-Hive性能优化及Hive3新特性

Hive性能优化及Hive3新特性 Hive表设计优化 Hive查询基本原理 Hive的设计思想是通过元数据解析描述将HDFS上的文件映射成表 基本的查询原理是当用户通过HQL语句对Hive中的表进行复杂数据处理和计算时&#xff0c;默认将其转换为分布式计算 MapReduce程序对HDFS中的数据进行…

基于排名的股票预测的关系时态图卷积网络(RT-GCN)

“ 为了充分利用股票之间的关系&#xff0c;获得最高收益&#xff0c;提出了一种关系时态图卷积网络(RT-GCN)。” 标题&#xff1a;Relational Temporal Graph Convolutional Networks for Ranking-Based Stock Prediction 链接&#xff1a;https://ieeexplore.ieee.org/do…

Android15之解决gdb:Remote register badly formatted问题(二百三十六)

简介&#xff1a; CSDN博客专家、《Android系统多媒体进阶实战》一书作者 新书发布&#xff1a;《Android系统多媒体进阶实战》&#x1f680; 优质专栏&#xff1a; Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a; 多媒体系统工程师系列【…

2024全国大数据与计算智能挑战赛火热报名中!

一年一度的 全国大数据与计算智能挑战赛震撼来袭&#xff01; 报名速通&#xff1a; https://www.datafountain.cn/special/BDSSF2024 大数据与决策&#xff08;国家级&#xff09;实验室连续三年组织发起全国大数据与计算智能挑战赛&#xff0c;旨在深入挖掘大数据应用实践中亟…

STM32传感器模块编程实践(九) VL53L0X激光红外测距传感器简介及驱动源码

文章目录 一.概要二.VL53L0X测距原理三.VL53L0X主要特性四.VL53L0X硬件参考设计五.模块接线说明六.模块通讯协议介绍七.光学盖玻片介绍八.STM32单片机与VL53L0模块实现距离测量实验1.硬件准备2.软件工程3.软件主要代码4.实验效果 九.小结 一.概要 VL53L0X是一款由ST&#xff0…

通过PHP与API的结合,开启电商数据集成的新篇章

在数字化转型的浪潮中&#xff0c;电子商务数据的集成对于企业来说变得越来越重要。无论是在线零售商还是品牌商&#xff0c;都需要实时访问商品数据以优化库存管理、制定定价策略、提升客户体验。PHP&#xff0c;作为服务端脚本语言的佼佼者&#xff0c;为开发者提供了强大的工…

调查显示软件供应链攻击增加

OpenText 发布了《2024 年全球勒索软件调查》&#xff0c;强调了网络攻击的重要趋势&#xff0c;特别是在软件供应链中&#xff0c;以及生成式人工智能在网络钓鱼诈骗中的使用日益增多。 尽管各国政府努力加强网络安全措施&#xff0c;但调查显示&#xff0c;仍有相当一部分企…

【verilog】3_8-4_16数选器

文章目录 前言一、实验原理二、实验过程三、实验结果参考文献 前言 进行 实验 一、实验原理 二、实验过程 三、实验结果 代码 timescale 1ns/1ns module decoder_38_tb ; reg a; reg b; reg c; wire [7:0]data;decoder_38 u0 (.a(a),.b(b),.c(c),.data(data) ); init…

pandas-使用技巧

pandas-使用技巧 简单技巧 仅个人笔记使用&#xff0c;感谢点赞关注 简单技巧 pd.to_dict()&#xff1a;Dataframe格式数据转字典数据pd.dropna()&#xff1a;去nan值& | ~&#xff1a;pd逻辑运算符pd.isnan()&#xff1a;判断是否为nan值pd.concat&#xff1a;多个pd拼接…

关于武汉芯景科技有限公司的限流开关芯片XJ6241开发指南(兼容LTC4411)

一、芯片引脚介绍 1.芯片引脚 二、系统结构图 三、功能描述 1.CTL引脚控制VIN和VOUT的通断 2.CTL引脚控制STAT引脚的状态 3.输出电压高于输入电压加上–VRTO的值&#xff0c;芯片处于关断状态

揭秘Map与Set的键值奥秘与集合魅力,解锁高效数据魔法

文章目录 前言➰一、关联式容器1.1 关联式容器的概述1.2 关联式容器的工作原理1.3 关联式容器的核心特性 ➰二、键值对2.1 键值对的基本概念2.2 键值对在C中的实现 ➰三、树形结构的关联式容器3.1 树形结构的特点3.2 使用场景 ➰四、set的使用与定义4.1 set的基本特性4.2 set的…

OpenCV高级图形用户界面(11)检查是否有键盘事件发生而不阻塞当前线程函数pollKey()的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 轮询已按下的键。 函数 pollKey 无等待地轮询键盘事件。它返回已按下的键的代码或如果没有键自上次调用以来被按下则返回 -1。若要等待按键被按…

软件压力测试如何进行?深圳软件测试机构分享

软件压力测试是每个重要软件测试工作的一部分&#xff0c;是一种基本的软件质量保证行为。压力测试不是在常规条件下运行手动或自动测试&#xff0c;而是在计算机数量较少或系统资源匮乏的条件下运行测试。通常要进行软件压力测试的资源包括内部内存、CPU 可用性、磁盘空间和网…

算法day-1

数组&#xfeff; 数组是存放在连续内存空间上的相同类型数据的集合。数组的下标或者索引是从0开始的. 数组的优点 快速访问&#xff1a;通过索引可以在常数时间内&#xff08;O(1)&#xff09;访问数组中的任意元素。顺序存储&#xff1a;数组中的元素在内存中是连续的&…

Codeforces 979 Div2 A-D (D. QED‘s Favorite Permutation详解)

比较开心能做出D A 原题 A. A Gift From Orangutan 思路 找到最大值最小值差值乘n - 1 即可 代码 #include <bits/stdc.h> #define int long long#define F(i, a, b) for (int i (a); i < (b); i) #define dF(i, a, b) for (int i (a); i > (b); i--)using…