Kafka RecordAccumulator 三 高并发写入数据

news2024/9/21 0:43:30

Kafka RecordAccumulator 三 高并发写入数据

在这里插入图片描述

在这里插入图片描述

首先我们客户端会通过多线程的方式来发送消息(一般业务需求可能会通过业务系统或者大数据流计算系统如Spark Streaming或者Flink将业务数据发送出去,进而让下游系统消费使用),那这里业务日志可能数据量巨大,会开多线程进行发送数据,这里就会涉及两个问题:

  • 线程安全

  • 高并发,高吞吐

最简单的方式就是将Append这个方法全局加锁,变成 synchronized append(xxxxx), 但是这样子就会导致锁对整个函数方法的内容加锁,锁了太多内容了,那怎么办呢,就执行分段加锁。

分段加锁:

其大致的样子如下:

append(
xxxxx

synchronized(dp) {
    xxxx
}

xxxx

synchronized(dp) {
    xxxx
}


)

分析第一次数据发送的执行流程

  • 因为数据是第一次发送进来,那么实际上它对应的分区的对列还没创建出来

    Deque<RecordBatch> dq = getOrCreateDeque(tp);
    

    首先这段代码是线程安全的,因为内部用的是之前分析的CopyOnWriteMap这种读写分离高性能读的线程安全的数据结构, 然后会第一次创建出这个这个分区的一个空对列

  • 第一次尝试发送数据

    synchronized (dq) {
                    //线程一进来了
                    //线程二进来了
                    if (closed)
                        throw new IllegalStateException("Cannot send after the producer is closed.");
                    /**
                     * 步骤二:
                     *      尝试往队列里面的批次里添加数据
                     *
                     *      一开始添加数据肯定是失败的,我们目前只是以后了队列
                     *      数据是需要存储在批次对象里面(这个批次对象是需要分配内存的)
                     *      我们目前还没有分配内存,所以如果按场景驱动的方式,
                     *      代码第一次运行到这儿其实是不成功的。
                     */
                    RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                    //线程一 进来的时候,
                    //第一次进来的时候appendResult的值就为null
                    if (appendResult != null)
                        return appendResult;
                }//释放锁
    

    加锁确保这段代码线程安全, 重点分析下tryAppend方法

    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
            //首先要获取到队列里面一个批次
            RecordBatch last = deque.peekLast();
            //第一次进来是没有批次的,所以last肯定为null
    
            //线程二进来的时候,这个last不为空
            if (last != null) {
                //线程二就插入数据就ok了
                FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
                if (future == null)
                    last.records.close();
                else
                    //返回值就不为null了
                    return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
            }
       
            return null;
        }
    

因为是第一次进来,那么当前分区还只有一个空对列。因此从这个空对列的队尾获取批次一定是空的,所以last是空的,就直接返回了。

  • 因为第一次上面代码返回的是空,所以下面代码继续,这里就需要执行申请内存空间

    int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
                log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
                
                ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
    
  • 第一次申请内存后,执行创建批次,写数据的内容。这里面加锁

    synchronized (dq) {
                   
                    if (closed)
                        throw new IllegalStateException("Cannot send after the producer is closed.");
                    /**
                     * 步骤五:
                     *      尝试把数据写入到批次里面。
                     *      代码第一次执行到这儿的时候 依然还是失败的(appendResult==null)
                     *      目前虽然已经分配了内存
                     *      但是还没有创建批次,那我们向往批次里面写数据
                     *      还是不能写的。
                     *
                     */
                    RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                    //失败的意思就是appendResult 还是会等于null
                    if (appendResult != null) {
                        //释放内存
    
                        //线程二到这儿,其实他自己已经把数据写到批次了。所以
                        //他的内存就没有什么用了,就把内存个释放了(还给内存池了。)
                        free.deallocate(buffer);
                        return appendResult;
                    }
                    /**
                     * 步骤六:
                     *  根据内存大小封装批次
                     *
                     *
                     *  线程一到这儿 会根据内存封装出来一个批次。
                     */
                    MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                    RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
                    //尝试往这个批次里面写数据,到这个时候 我们的代码会执行成功。
    
                    //线程一,就往批次里面写数据,这个时候就写成功了。
                    FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
                    /**
                     * 步骤七:
                     *  把这个批次放入到这个队列的队尾
                     *
                     *
                     *  线程一把批次添加到队尾
                     */
                    dq.addLast(batch);
                    incomplete.add(batch);
                    return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
                }//释放锁
    

一开始还是执行RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);

执行这里还是空,因为批次对象还没创建出来,所以返回的是null,所以它后面开始执行创建批次对象了:

MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());

那么一旦批次对象创建出来了后,在执行写数据,那么就OK了,第一次数据就可以写进去了。

FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds())); 再把批次对象放到队尾,方便后面这个分区数据再次写进来的时候直接从队尾取出这个批次对象,塞数据用。

分析多线程发送数据时,如何确保高并发高性能

假设我们有三个线程,并且在假设正好每个线程发送的数据 正好都是同一个分区

  • 获取对列

    Deque<RecordBatch> dq = getOrCreateDeque(tp);
    

假设线程1创建出了对列,并写入到batches里面,另外两个线程就可以直接获取到该对垒对象了。

  • 尝试首次写数据

    synchronized (dq) {
                    //线程一进来了
                    //线程二进来了
                    if (closed)
                        throw new IllegalStateException("Cannot send after the producer is closed.");
                    /**
                     * 步骤二:
                     *      尝试往队列里面的批次里添加数据
                     *
                     *      一开始添加数据肯定是失败的,我们目前只是以后了队列
                     *      数据是需要存储在批次对象里面(这个批次对象是需要分配内存的)
                     *      我们目前还没有分配内存,所以如果按场景驱动的方式,
                     *      代码第一次运行到这儿其实是不成功的。
                     */
                    RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                    //线程一 进来的时候,
                    //第一次进来的时候appendResult的值就为null
                    if (appendResult != null)
                        return appendResult;
                }//释放锁
    

    假设线程1 执行这段代码,因为第一次所以返回空,释放锁,正好被线程2抢到,然后线程2也是第一次进来,所以返回还是空,释放锁,然后线程3抢到锁,也是第一次进来,所以返回还是空。

  • 每个申请都会执行申请内存

    int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
                log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
                /**
                 * 步骤四:
                 *  根据批次的大小去分配内存
                 *
                 *
                 *  线程一,线程二,线程三,执行到这儿都会申请内存
                 *  假设每个线程 都申请了 16k的内存。
                 *
                 *  线程1 16k
                 *  线程2 16k
                 *  线程3 16k
                 *
                 */
                ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
    

获取size这个不加锁,后面申请内存部分会加锁。假设线程1线抢到锁,然后申请了16K的内存释放锁,然后线程2抢到锁,申请16K内存释放锁,再然后线程3抢到锁,申请了16K内存,释放锁。这样每个线程多申请了各自的内存.

  • 执行创建批次对象,写数据

    synchronized (dq) {
                    //假设线程一 进来了。
                    //线程二进来了
                    // Need to check if producer is closed again after grabbing the dequeue lock.
                    if (closed)
                        throw new IllegalStateException("Cannot send after the producer is closed.");
                    /**
                     * 步骤五:
                     *      尝试把数据写入到批次里面。
                     *      代码第一次执行到这儿的时候 依然还是失败的(appendResult==null)
                     *      目前虽然已经分配了内存
                     *      但是还没有创建批次,那我们向往批次里面写数据
                     *      还是不能写的。
                     *
                     *   线程二进来执行这段代码的时候,是成功的。
                     */
                    RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                    //失败的意思就是appendResult 还是会等于null
                    if (appendResult != null) {
                        //释放内存
    
                        //线程二到这儿,其实他自己已经把数据写到批次了。所以
                        //他的内存就没有什么用了,就把内存个释放了(还给内存池了。)
                        free.deallocate(buffer);
                        return appendResult;
                    }
                    /**
                     * 步骤六:
                     *  根据内存大小封装批次
                     *
                     *
                     *  线程一到这儿 会根据内存封装出来一个批次。
                     */
                    MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                    RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
                    //尝试往这个批次里面写数据,到这个时候 我们的代码会执行成功。
    
                    //线程一,就往批次里面写数据,这个时候就写成功了。
                    FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
                    /**
                     * 步骤七:
                     *  把这个批次放入到这个队列的队尾
                     *
                     *
                     *  线程一把批次添加到队尾
                     */
                    dq.addLast(batch);
                    incomplete.add(batch);
                    return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
                }//释放锁
    

假设线程1先获取到锁,所以线程1会在整个锁的生命周期内做完所有的事情,并且把该分区的批次对象放到对列的尾巴,这时候线程1执行完之后,释放锁,假设此时线程2获取到锁,开始执行代码,首先第一步线程2尝试append数据:

private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
        //首先要获取到队列里面一个批次
        RecordBatch last = deque.peekLast();
        //第一次进来是没有批次的,所以last肯定为null

        //线程二进来的时候,这个last不为空
        if (last != null) {
            //线程二就插入数据就ok了
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
            if (future == null)
                last.records.close();
            else
                //返回值就不为null了
                return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
        }
        //返回结果就是一个null值
        return null;
    }

此时因为线程1已经在对列尾部添加了批次对象,所以线程2直接从队尾取已经不是空了,所以执行last.append真正开始加数据进去了

public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
        if (!this.records.hasRoomFor(key, value)) {
            return null;
        } else {
            //TODO 往批次里面去写数据
            long checksum = this.records.append(offsetCounter++, timestamp, key, value);
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
            this.lastAppendTime = now;
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                                                   key == null ? -1 : key.length,
                                                                   value == null ? -1 : value.length);
            if (callback != null)
                thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;
        }
    }

加完之后返回RecordAppendResult对象,代表已经成功加到批次里。

然后就是很重要的地方加成功到批次里了,就可以释放申请的内存了

 if (appendResult != null) {
                    //释放内存

                    //线程二到这儿,其实他自己已经把数据写到批次了。所以
                    //他的内存就没有什么用了,就把内存个释放了(还给内存池了。)
                    free.deallocate(buffer);
                    return appendResult;
                }

经过这一系列的做法就可以高效支撑多个并发的数据写入

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/60440.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Linux】进程状态|僵尸进程 |孤儿进程

索引运行状态&#xff1a;阻塞状态挂起状态看看Linux是怎么做的运行状态R睡眠状态S停止状态T两个特殊的进程&#xff1a;僵尸进程孤儿进程在之前我们听过很多很多进程的状态&#xff0c;像是运行、新建、就绪、挂起、阻塞、等待、停止、挂机、死亡等等。推荐阅读&#xff1a;通…

http协议之digest(摘要)认证,详细讲解并附Java SpringBoot源码

目录 1.digest认证是什么&#xff1f; 2.digest认证过程 3.digest认证参数详解 4.基于SpringBoot实现digest认证 5.digest认证演示 6.digest认证完整项目 7.参考博客 1.digest认证是什么&#xff1f; HTTP通讯采用人类可阅读的文本格式进行数据通讯&#xff0c;其内容非…

Android入门第40天-Android中的Service(SimpleStartService)

简介 博文总阅读量已经突破了300万&#xff0c;给自己加油打CALL。 从今天开始&#xff0c;之前39天的Android如果每一篇只有30分钟就能读完和掌握那么从今天开始越往后会越复杂。因为我们的Android教程开始进入“中级”难度了。特别是Service&#xff0c;这个Service我要分成…

java面向对象的三大特性之封装和继承(配视频讲解)

&#x1f345;程序员小王的博客&#xff1a;程序员小王的博客 &#x1f345;程序员小王的资源博客&#xff1a;http://wanghj.online/ &#x1f345; 欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; &#x1f345; 如有编辑错误联系作者&#xff0c;如果有比较好的文章欢迎…

JMeter入门教程(13) --事务

文章目录1.任务背景2.任务目标3.任务实操3.1.1 事务控制器3.2.2循环控制器1.任务背景 JMeter中的事务是通过事务控制器实现的。&#xff0c;为了衡量服务器对某一个或一系列操作处理的响应时间&#xff0c;需要定义事务。下面我们详细介绍在JMeter中如何使用事务 2.任务目标 …

基于JSP的网络教学平台的设计与实现

目 录 摘 要 I Abstract II 一、 引言 1 &#xff08;一&#xff09;项目开发的背景 1 &#xff08;二&#xff09;项目开发的意义 1 二、可行性分析及总体设计原则 3 &#xff08;一&#xff09;可行性分析 3 1&#xff0e;技术可行性 3 2&#xff0e;经济可行性 3 3&#xff…

MATLAB算法实战应用案例精讲-【图像处理】目标检测(附实战案例及代码实现)

前言 目标检测,也叫目标提取,是一种基于目标几何和统计特征的图像分割。它将目标的分割和识别合二为一,其准确性和实时性是整个系统的一项重要能力。尤其是在复杂场景中,需要对多个目标进行实时处理时,目标自动提取和识别就显得特别重要。 随着计算机技术的发展和计算机视…

Servlet —— Tomcat, 初学 Servlet 程序

JavaEE传送门JavaEE HTTP —— HTTP 响应详解, 构造 HTTP 请求 HTTPS —— HTTPS的加密方式 目录TomcatServlethello world创建项目引入 Servlet 依赖创建目录结构编写代码doGet打包程序部署程序验证程序Tomcat Tomcat 是一个 HTTP 服务器 HTTP 客户端, 就是大家平时用到的浏…

大数据hadoop_HDFS概述(1)

文章目录1. HDFS概述1.1 HDFS背景1.2 定义2. HDFS优缺点2.1 优点2.2 缺点3. HDFS架构4. HDFS文件块大小1. HDFS概述 1.1 HDFS背景 面对今天的互联网公司&#xff0c;每天都会有上亿次的用户访问量&#xff0c;用户每进行一次操作&#xff0c;都会产生数据&#xff0c;面对传统…

Android入门第41天-Android中的Service(bindService)

介绍 在前一天我们介绍了Android中有两种启动Service的方法。并擅述了startService和bindService的区别。同时我们着重讲了startService。 因此今天我们就来讲bindService。bindService大家可以认为它是和Android的一个共生体。即这个service所属的activity如果消亡那么bindS…

Docker涉及的Linux命名空间、CGroups

概述 Linux的NameSpace介绍 很多编程语言都包含了命名空间的概念&#xff0c;我们可以认为命名空间是一种封装&#xff0c;封装本身实现了代码的隔离。在操作系统中命名空间提供的是系统资源的隔离&#xff0c;其中系统资源包括了&#xff1a;进程、网络、文件系统…实际上li…

《模拟电子技术》半导体原理部分笔记

《模拟电子技术》笔记绪论第一章 常用半导体器件第二章 基本放大电路绪论 有的人把三极管的出现作为电子技术工业革命的开始标志学习架构&#xff1a;半导体器件&#xff08;二极管、三极管、场效应晶体管&#xff09;、基于上述管的放大电路、集成运算放大器、放大电路的频率…

第11章 初识IdentityServer4

1 构建IdentityServer4 服务 1.1 通过配置类配置类(Config)实例化IdentityServer4中间件 using IdentityServer4.Models; namespace BuilderServer { /// <summary> /// 【配置--类】 /// <remarks> /// 摘要&#xff1a; /// 通过该中类的方法成员&#xff…

如何给firefox和google chrome鼠标手势

背景 已经习惯了有鼠标手势&#xff0c;因为一天到晚都在浏览器上查询资料&#xff0c;所以必须把这个鼠标手势设置好。 firefox 搜索Foxy Gestures然后安装 google chrome crxMouse Chrome 点击google浏览器上的扩展程序图标&#xff0c;然后点击管理扩展程序&#xff1a…

游泳耳机哪个牌子好、分享几款游泳听音乐最好的耳机推荐

如今,水上运动爱好者越来越多了,无论是游泳,还是冲浪早已成为了我们很多人经常参加的运动项目。不过他们都抱怨过类似的问题——可以在水上运动中使用的无线耳机实在是太少了。防水性能达不到可游泳级别,不带内存需要配备手机使用,这些都是造成耳机无法在水上使用的原因。今天小…

【matplotlib】1-使用函数绘制图表

文章目录使用函数绘制图表1.绘制matplotlib图表组成元素的主要函数2.准备数据3.函数用法3.1函数plot()--展现变量的趋势变化3.2函数scatter()--寻找变量之间的关系3.3函数xlim()--设置x轴的数值显示范围3.4函数xlabel()--设置x轴的标签文本3.5 函数grid()--绘制刻度线的网格线3…

Kong(三)Konga UI安装和使用

一 konga 的github地址 konga安装参考 ① Kong 可视化UI 选择 官方kong-dashboard 1&#xff09;收费&#xff1a;当前kong的社区版是没有dashboard的,但是付费的企业版是有带的2&#xff09;kong-dashboard最新版本v3.6.0,只支持到kongv0.14.x,对于更高的kong版本,功能支…

SpringBoot整合RabbitMQ,实现单机抢票系统

MQ全称为Message Queue, 消息队列&#xff08;MQ&#xff09;是一种应用程序对应用程序的通信方法。MQ是消费-生产者模型的一个典型的代表&#xff0c;一端往消息队列中不断写入消息&#xff0c;而另一端则可以读取队列中的消息。 消息中间件最主要的作用是解耦&#xff0c;中间…

C与C++中的常用符号与标点用法详解及实例

C与C中的常用符号与标点符号有&#xff1a;“”、“-”、“*”、 “/”、“%”、“&”、“\”、“|”、“~”、“^”、“&”、“|”、“&#xff01;”、“>”、“<”、""、“#”、“&#xff1f;”、“&#xff0c;”、“.”、“&#xff1a;”、单引…

d3rlpy离线强化学习算法库安装及使用

GitHub - takuseno/d3rlpy: An offline deep reinforcement learning library d3rlpy&#xff0c;离线强化学习算法库 我装在windows下用anaconda&#xff0c;按照官网教程 conda install -c conda-forge d3rlpy 第一次安装报错CondaSSLError: OpenSSL appears to be unavaila…