Kafka源码分析之Producer数据发送流程(四)

news2025/1/8 6:40:50

概述

书接上回的producer发送流程,在准备工作完成后,kafka的producer借助SenderKafkaClient两大组件完成了数据的发送。其底层封装了java的NIO的组件channle以及selector,对于NIO组件不太熟悉的同学可以自行查询相关文档。

下面我整理了kafka发送数据的主要流程,下面是流程图:

下面根据上面的流程图一步一步剖析kafka的源码。

源码分析

接上第一节的producer主流程,这里代码唤醒sender完成消息发送。并且producer的所有和kafka服务端交互的都是在sender中完成的。

     //6.发送消息。
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }

 sender实现了Runnable接口,所以只需要看run方法:

//一次启动,一直运行(除非关闭)    
public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        while (running) {
            try {
                runOnce();
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
.....



   void runOnce() {

        long currentTimeMs = time.milliseconds();
        //发送请求
        long pollTimeout = sendProducerData(currentTimeMs);
       //poll NIO事件
        client.poll(pollTimeout, currentTimeMs);
    }

进入sendProducerData方法,下面标注了主要的流程:

 private long sendProducerData(long now) {
        //1.获取元数据
        Cluster cluster = metadata.fetch();
        //2.获取到发送消息的主机leader
        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        if (!result.unknownLeaderTopics.isEmpty()) {
            // The set of topics with unknown leader contains topics with leader election pending as well as
            // topics which may have expired. Add the topic again to metadata to ensure it is included
            // and request metadata update, since there are messages to send to the topic.
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic, now);

            log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
                result.unknownLeaderTopics);
            this.metadata.requestUpdate();
        }

        // remove any nodes we aren't ready to send to
        //3.遍历主机
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                // 4.移除发送消息的服务器
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
            }
        }

        // create produce requests
        // 5.合并发往同一主机的请求,第一次请求batches为空
        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
        addToInflightBatches(batches);
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<ProducerBatch> batchList : batches.values()) {
                for (ProducerBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }
        // 6.超时
        accumulator.resetNextBatchExpiryTime();
        List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
        expiredBatches.addAll(expiredInflightBatches);

        // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
        // for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
        // we need to reset the producer id here.
        if (!expiredBatches.isEmpty())
            log.trace("Expired {} batches in accumulator", expiredBatches.size());
        for (ProducerBatch expiredBatch : expiredBatches) {
            String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
                + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
            failBatch(expiredBatch, new TimeoutException(errorMessage), false);
            if (transactionManager != null && expiredBatch.inRetry()) {
                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
                transactionManager.markSequenceUnresolved(expiredBatch);
            }
        }
        sensors.updateProduceRequestMetrics(batches);

        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
        // time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
        // sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
        // that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
        pollTimeout = Math.max(pollTimeout, 0);
        if (!result.readyNodes.isEmpty()) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            // if some partitions are already ready to be sent, the select time would be 0;
            // otherwise if some partition already has some data accumulated but not ready yet,
            // the select time will be the time difference between now and its linger expiry time;
            // otherwise the select time will be the time difference between now and the metadata expiry time;
            pollTimeout = 0;
        }
       //7.发送数据
        sendProduceRequests(batches, now);
        return pollTimeout;
    }

第一次请求时会走到 if (!this.client.ready(node, now)),然后检查网络连接是否准备好,然后通过代理的NIO Selector初始化网络连接,并注册channel监听事件:

registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
    public boolean ready(Node node, long now) {
        if (node.isEmpty())
            throw new IllegalArgumentException("Cannot connect to empty node " + node);
        // 网络是否连接好
        if (isReady(node, now))
            return true;
        //是否可以建立网络连接
        if (connectionStates.canConnect(node.idString(), now))
            // if we are interested in sending to a node and we don't have a connection to it, initiate one
            // 初始化连接
            initiateConnect(node, now);

        return false;
    }


  private void initiateConnect(Node node, long now) {
        String nodeConnectionId = node.idString();
        try {
            connectionStates.connecting(nodeConnectionId, now, node.host());
            InetAddress address = connectionStates.currentAddress(nodeConnectionId);
            log.debug("Initiating connection to node {} using address {}", node, address);    
            // 封装了NIO的selector,相当于加了一层代理
            selector.connect(nodeConnectionId,
                    new InetSocketAddress(address, node.port()),
                    this.socketSendBuffer,
                    this.socketReceiveBuffer);
        } catch (IOException e) {
            log.warn("Error connecting to node {}", node, e);
            // Attempt failed, we'll try again after the backoff
            connectionStates.disconnected(nodeConnectionId, now);
            // Notify metadata updater of the connection failure
            metadataUpdater.handleServerDisconnect(now, nodeConnectionId, Optional.empty());
        }
    }

网络连接 建立完成回到主流程,此时返回ready方法翻译false,移除所有发送消息服务器。

   if (!this.client.ready(node, now)) {
                // 4.移除发送消息的服务器
                iter.remove();

 这时将获取不到批次,退回到runOnce方法中。然后进入

client.poll(pollTimeout, currentTimeMs)方法。
这里就到了网络请求处理组件NetworkClient了。

核心方法:this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));

在进入Selector里面去,这里核心方法是:

pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);

这里监听了各类事件,然后对不同事件进行了处理。

通过TransportLayer代理对象,最终调用NIO的socketChannel完成网络连接,并且通过取反或的方式移除了连接事件,注册读取数据事件。

   public boolean finishConnect() throws IOException {
        //完成网络连接
        boolean connected = socketChannel.finishConnect();
        if (connected)
            // 移除OP_CONNECT,添加OP_READ事件
            key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
        return connected;
    }

然后while再次循环,进入sendProducerData方法。

 下面是发送请求数据方法流程:

sendProduceRequests(batches, now); >> 
client.send(clientRequest, now); >>
doSend(request, false, now); >>
doSend(clientRequest, isInternalRequest, now, builder.build(version)); >>

讲发送的数据封装到inFlightRequest中,然后调用selector发送请求。

 //最大容忍多少个请求没有响应,默认是5个
 private final InFlightRequests inFlightRequests;
 
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
        String destination = clientRequest.destination();
        RequestHeader header = clientRequest.makeHeader(request.version());
        if (log.isDebugEnabled()) {
            log.debug("Sending {} request with header {} and timeout {} to node {}: {}",
                clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request);
        }
        Send send = request.toSend(header);
        InFlightRequest inFlightRequest = new InFlightRequest(
                clientRequest,
                header,
                isInternalRequest,
                request,
                send,
                now);
        this.inFlightRequests.add(inFlightRequest);
        selector.send(new NetworkSend(clientRequest.destination(), send));
    }

这里最后给selector绑定了读取数据的事件:

    public void send(NetworkSend send) {
        String connectionId = send.destinationId();
        KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
        if (closingChannels.containsKey(connectionId)) {
            // ensure notification via `disconnected`, leave channel in the state in which closing was triggered
            this.failedSends.add(connectionId);
        } else {
            try {
               //核心方法
                channel.setSend(send);
            } 
            ...
      
        }
    }


public void setSend(NetworkSend send) {
        if (this.send != null)
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
        this.send = send;
       //绑定读取数据的事件
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
    }

 public void addInterestOps(int ops) {
        key.interestOps(key.interestOps() | ops);

    }

然后又走到 client的poll方法,然后selector写数据:

// 写请求数据
attemptWrite(key, channel, nowNanos);
//移除写事件
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);

最后就是读取服务器的响应数据:

//读取服务端事件,之前已经注册了读事件
if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasCompletedReceive(channel)
        && !explicitlyMutedChannels.contains(channel)) {
    attemptRead(channel);
}

读取数据时kafka采用了4字节标识数据长度来避免粘包黏包的问题:

 通过下面代码保证必须读取到完整的数据才会返回。 

     //4字节表示数据大小
        if (size.hasRemaining()) {
            int bytesRead = channel.read(size);
            if (bytesRead < 0)
                throw new EOFException();
            read += bytesRead;
            if (!size.hasRemaining()) {
                size.rewind();
                //4字节
                int receiveSize = size.getInt();
                if (receiveSize < 0)
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
                if (maxSize != UNLIMITED && receiveSize > maxSize)
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
                requestedBufferSize = receiveSize; //may be 0 for some payloads (SASL)
                if (receiveSize == 0) {
                    buffer = EMPTY_BUFFER;
                }
            }
        }

 public boolean complete() {
        //是否读完
        return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/431983.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

从0搭建Vue3组件库(二):Monorepo项目搭建

本篇文章是从0搭建Vue3组件库系列文章第二篇,本篇文章将带领大家使用pnpm搭建一个简单的Monorepo项目,并完成包的关联与测试 什么是 Monorepo 其实很简单,就是一个代码库里包含很多的项目,而这些项目虽然是相关联的,但是在逻辑上是独立的,可以由不同人或者团队来维护 为什么…

Scala之集合(1)

目录 ​​​​​​​集合介绍&#xff1a; 不可变集合继承图&#xff1a;​编辑 可变集合继承图 数组&#xff1a; 不可变数组&#xff1a; 样例代码&#xff1a; 遍历集合的方法&#xff1a; 1.for循环 2.迭代器 3.转换成List列表&#xff1a; 4.使用foreach()函数&a…

WebServer项目(二)->linux网络编程基础知识

WebServer项目->linux网络编程基础知识其中&#xff0c;遇到的错误总结1). read&#xff1a;Connection reset by peer2).什么叫连接被重置&#xff1f;1. socket 介绍2. 字节序从主机字节序到网络字节序的转换函数&#xff1a;htons、htonl&#xff1b; 从网络字节序到主机…

科创人·中建三局一公司尹奎:数字化变革能创造全新行业,其意义超越形式、范式创新

尹奎 中建三局一公司技术中心主任 教授级高级工程师&#xff0c;BIM领域资深专家&#xff0c;完成10余个基于BIM的相关研究课题&#xff0c;获省部级以上科技进步奖 13 项&#xff1b;公开出版专著3部&#xff0c;参与编写“十二五”国家重点图书出版规划项目《BIM应用施工》&a…

CCS5.5环境设置

CCS5.5环境设置 文件编码格式设置利用断点导入*.dat文件先用Python生成*.dat文件DSP代码&#xff08;sys/bios&#xff09; 步骤利用strip6x工具去除**.out文件中的调试信息硬件跟踪功能应用名词解释使用方法 ccs显示图片参数设置 文件编码格式设置 可分别对工作空间、工程、单…

Mysql下载安装

1.Mysql官网下载 MySQLhttps://www.mysql.com/ 有商业版和社区版&#xff0c;商业版使用收费&#xff0c;有试用期&#xff0c;社区版免费&#xff0c;选择社区版即可&#xff1a; 点击MySQL社区服务器&#xff1a; 选择要安装的版本&#xff1a; 进行下载即可&#xff1a; 2…

Docker设置http proxy代理

需求&#xff1a; 由于公司服务器无法正常访问公网&#xff0c;想要下载一些外部依赖包需要配置公司的内部代理。 Docker构建镜像或拉取镜像时需要通过代理访问外网&#xff0c;可以按照以下步骤设置HTTP代理 目录 创建目录 创建并编辑配置文件 重新加载Docker服务配置 重启…

Devops流程探究

1、DevOps面向对象 软件开发是由开发团队和运维团队共同协同配合才能完成一个软件的开发。 2、开发团队和运维团队 开发团队主要负责软件的开发和迭代更新&#xff0c;运维团队则是负责测试和部署上线。 3、解决问题 但是&#xff0c;这样会存在一个问题&#xff0c;只有当…

第五章 数据链路层与局域网

数据链路层服务 组帧&#xff1b;将要传输的数据封装成帧链路接入&#xff1b;可分为点对点链路&#xff08;独占&#xff09;和广播链路&#xff08;共享&#xff09;可靠交付&#xff1b;即在相邻节点间经数据链路实现数据报的可靠传输差错控制&#xff1b;检错纠错 差错控…

前端基础复习

1.什么叫HTML5&#xff1f;和原本的所说的HTML有什么区别&#xff1f; 本质上html和html5是一样的的。区别有&#xff1a; 1. 在文档类型声明上 HTML4.0 <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loos…

密码加密之bcrypt

在这里是用的bcrypt加密算法&#xff0c;这种现在比较流行 而且无法进行解密 引入依赖 <!-- https://mvnrepository.com/artifact/cn.hutool/hutool-all --> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</ar…

JetBrains AppCode 2023.1 (macOS x64、aarch64) - 适用于 iOS/macOS 开发的智能 IDE

Xcode 14.3 compatibility, Swift refactorings and intentions, the IDE’s UI, and Kotlin Multiplatform Mobile. 请访问原文链接&#xff1a;https://sysin.org/blog/jb-appcode-2023/&#xff0c;查看最新版。原创作品&#xff0c;转载请保留出处。 作者主页&#xff1a…

技术分享 | OMS 初识

作者&#xff1a;高鹏 DBA&#xff0c;负责项目日常问题排查&#xff0c;广告位长期出租 。 本文来源&#xff1a;原创投稿 *爱可生开源社区出品&#xff0c;原创内容未经授权不得随意使用&#xff0c;转载请联系小编并注明来源。 本文主要贡献者&#xff1a;进行OMS源码分析的…

GPT-4,大增长时代的序幕

虽然我们早在 2017 年就预测了超大模型的到来&#xff0c;因此才搞了分布式深度学习框架 OneFlow&#xff08;github.com/Oneflow-Inc/oneflow/&#xff09;&#xff0c;且 2020 年的 GPT-3 也掀起了大模型热潮&#xff08;OneFlow—— 让每一位算法工程师都有能力训练 GPT&…

BGP相关实验

实验要求及其拓扑图 划分好IP的拓扑图 实验分析 1,AS1存在两个环回&#xff0c;一个地址为192.168.1.0/24该地址不能在任何协议中宣告&#xff0c;AS3中存在两个环回&#xff0c;一个地址为192.168.2.0/24该地址不能在任何协议中宣告&#xff0c;最终要求这两个环回可以互相…

30天学会《Streamlit》(4)

30学会《Streamlit》是一项编码挑战&#xff0c;旨在帮助您开始构建Streamlit应用程序。特别是&#xff0c;您将能够&#xff1a; 为构建Streamlit应用程序设置编码环境 构建您的第一个Streamlit应用程序 了解用于Streamlit应用程序的所有很棒的输入/输出小部件 第4天 - st…

【NLP】自然语言处理_NLP入门——分词和词性标注

【NLP】自然语言处理_NLP入门——分词和词性标注 文章目录 【NLP】自然语言处理_NLP入门——分词和词性标注1. 介绍2. 概念和工具2.1 分词2.2 词性标注2.3 NLTK2.4 Jieba2.5 LAC 3. 代码实现举例3.1 分词3.1.1 使用nltk进行分词3.1.2 使用jieba进行分词3.1.3 使用LAC进行分词 3…

定义全局变量property与getprop

authordaisy.skye的博客_CSDN博客-Qt,嵌入式,Linux领域博主 adb调试 adb shell getprop .adb logcat 报错 init: sys_prop: permission denied uid:1006 name:ro.camera.gc02m1 在linux驱动中查找 find ./ -name *.c | xargs grep -n "property_set" find ./ -n…

《2023金融科技·校园招聘白皮书》新鲜出炉|牛客独家

数智创新时代&#xff0c;科技人才为先。 眼下&#xff0c;在建设“数字中国”的时代背景下&#xff0c;金融行业全面数智化转型已箭在弦上。政策端&#xff0c;金融行业为中共中央、国务院印发《数字中国建设整体布局规划》的7大重点行业之一。 资本端&#xff0c;仅2022年三…

【C++ 四】函数、指针

函数、指针 文章目录 函数、指针前言1 函数1.1 概述1.2 函数定义1.3 函数调用1.4 值传递1.5 函数常见样式1.6 函数声明1.7 函数分文件编写 2 指针2.1 指针基本概念2.2 指针变量定义和使用2.3 指针所占内存空间2.4 空指针和野指针2.5 const 修饰指针2.6 指针和数组2.7 指针和函数…