【Kafka从成神到升仙系列 五】面试官问我 Kafka 生产者的网络架构,我直接开始从源码背起.......

news2025/1/16 12:38:02
  • 👏作者简介:大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,Java领域新星创作者
  • 📕系列专栏:Java设计模式、数据结构和算法、Kafka从入门到成神、Kafka从成神到升仙
  • 📧如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步👀
  • 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
  • 🍂博主正在努力完成2022计划中:以梦为马,扬帆起航,2022追梦人

在这里插入图片描述

文章目录

  • Kafka 生产者的网络架构
    • 一、引言
    • 二、网络架构模型
    • 三、网络架构整体流程
    • 四、网络架构源码剖析
      • 1、KafkaProducer
        • 1.1 waitOnMetadata
        • 1.2 accumulator.append
        • 1.3 sender.wakeup
      • 2. Sender
        • 2.1 accumulator.ready
        • 2.2 metadata.requestUpdate
        • 2.3 remove any nodes
        • 2.4 accumulator.drain
        • 2.5 createProduceRequests
        • 2.6 client.send
        • 2.7 client.poll
      • 3. NetworkClient
        • 3.1 send
        • 3.2 poll
      • 4. Selector
        • 4.1send
        • 4.2 poll
          • 4.2.1 clear
          • 4.2.2 select
          • 4.2.3 pollSelectionKeys
          • 4.2.4 addToCompletedReceives
    • 五、总结


Kafka从成神到成仙系列

  • 【Kafka从成神到升仙系列 一】Kafka源码环境搭建
  • 【Kafka从成神到升仙系列 二】生产者如何将消息放入到内存缓冲区
  • 【Kafka从成神到升仙系列 三】你真的了解 Kafka 的元数据嘛
  • 【Kafka从成神到升仙系列 四】你真的了解 Kafka 的缓存池机制嘛

Kafka 生产者的网络架构

初学一个技术,怎么了解该技术的源码至关重要。

对我而言,最佳的阅读源码的方式,那就是:不求甚解,观其大略

你如果进到庐山里头,二话不说,蹲下头来,弯下腰,就对着某棵树某棵小草猛研究而不是说先把庐山的整体脉络研究清楚了,那么你的学习方法肯定效率巨低而且特别痛苦。

最重要的还是慢慢地打击你的积极性,说我的学习怎么那么不 happy 啊,怎么那么没劲那,因为你的学习方法错了,大体读明白,先拿来用,用着用着,很多道理你就明白了。

先从整体上把关源码,再去扣一些细节问题。

举个简单的例子:

如果你刚接触 HashMap,你刚有兴趣去看其源码,在看 HashMap 的时候,有一个知识:当链表长度达到 8 之后,就变为了红黑树,小于 6 就变成了链表,当然,还和当前的长度有关。

这个时候,如果你去深究红黑树、为什么是 8 不是别的,又去查 泊松分布,最终会慢慢的搞死自己。

所以,正确的做法,我们先把这一部分给略过去,知道这个概念即可,等后面我们把整个庐山看完之后,再回过头抠细节。

当然,本章我们讲述 Kafka 生产者的网络架构

一、引言

kafka生产端的组成主要由以下几方面构成:

  • 生产端的初始化
  • 元数据的更新
  • 缓存池(BufferPool)机制
  • 网络架构模型
  • 消息发送

其中,我们 生产端的初始化、元数据的更新缓存池(BufferPool)机制已经介绍完毕,今天我们来看看 网络架构模型

废话不多说,老司机开始发车

二、网络架构模型

从我们之前的讲解中,我们可以知道,生产端最重要的几个技术点:

  • KafkaProducer:主要将消息发送至 RecordAccumulator 并唤醒 Sender
  • Sender:调用 NetworkClientRecordAccumulator 的消息发送至 Broker
  • NetworkClient:KafkaJava NIO 的封装

而正是它们几个组成了 Kafka 生产者的网络架构,其网络模型如下:

在这里插入图片描述

不难看出,我们 Kafka 生产者最终的网络架构也是使用的 Java NIO,和我们的 Netty 殊途同归。

至于 kafka 为什么不用 Netty 做通信组件,这个之间在 【Kafka从成神到升仙系列 三】你真的了解 Kafka 的元数据嘛 已经讲过,此处不再叙述,有兴趣的同学可以跳转阅读。

三、网络架构整体流程

上面我们了解了 Kafka 生产端的几个网络组件及其对应的关系

我们深入的看一下,这几个组件之间到底是如何进行数据的处理及业务的处理的

网络架构整体流程如下所示:

在这里插入图片描述

这里涉及的主要几个方法:

  • KafkaProducer
    • waitOnMetadata:等待更新元数据
    • accumulator.append:消息发送到缓冲区
    • sender.wakeup:唤醒 Sender 线程
  • Sender
    • accumulator.ready:得到符合发送规定的节点
    • metadata.requestUpdate:是否更新元数据
    • remove any nodes:删除尚未建立连接的节点
    • accumulator.drain:得到每个节点需要发送的消息批次
    • createProduceRequests:组装成客户端请求
    • client.send:调用 NetworkClient 设置事件类型
    • client.poll:调用 NetworkClient 发送消息
  • NetworkClient
    • send:调用 Selector 设置事件类型
    • poll:调用 Selector 发送消息
  • Kafka-Selector
    • send:设置事件类型
    • poll:发送消息

可能大多数的小伙伴这个时候已经有点晕了,没关系,我们本篇文章就是解决你晕的问题的

我们会从 Producer 的源码一直会讲到 Selector 的源码并最终通过打日志的方式验证我们的猜想

戴好安全带,我们发车了

四、网络架构源码剖析

1、KafkaProducer

对于 KafkaProducer 来说,其最重要的功能就是将 record 发送至我们的 RecordAccumulator 中去

1.1 waitOnMetadata

这个方法相信看过上篇博客:【Kafka从成神到升仙系列 三】你真的了解 Kafka 的元数据嘛,已经有印象

对,没错,这个就是我们 kafka 在发送消息时,会优先请求 Broker 获取元数据信息,然后再去发送消息

具体细节的话,这里也不叙述了

总之:第一次发送消息时,这里会判断当前是否拿到了元数据。如果没有拿到元数据信息,这里会堵塞循环并唤醒 Sender 线程,让其帮忙更新元数据。

1.2 accumulator.append

这个其实我们这篇博客中也讲过:【Kafka从成神到升仙系列 二】生产者如何将消息放入到内存缓冲区

img

具体的细节如上,更多的细节可以参考上面那篇博客

1.3 sender.wakeup

当我们 **首次获取元数据 **或者 当前的 batch 满了 或者 一个新的 batch 创建了,我们都可以去唤醒我们的 Sender,让这个线程执行我们的业务。

  • 首次获取元数据:让 Sender 去更新元数据信息
  • 当前的 batch 满了 或者 一个新的 batch 创建:让 Senderbatch 发送至 Broker

那这个 sender.wakeup 到底执行了什么呢,我们一起来看看其执行流程与执行代码

// 类 = KafkaProducer
sender.wakeup();

// 类 = Sender
public void wakeup() {
    this.client.wakeup();
}

// 类 = NetworkClient
public void wakeup() {
    this.selector.wakeup();
}

// 类 = Selector
public void wakeup() {
    this.nioSelector.wakeup();
}

// 类 = WindowsSelectorImpl
public Selector wakeup() {
    // Java NIO 包里面的操作
}

这里可以看到,整体的调用流程和我们上面的 网络架构 是一样的,也侧面验证了我们上面的 网络架构 是正确的。

不难看出,sender.wakeup() 实际上是唤醒了 Java NIO 里面的 Selector,让其能够接受所有的 keys,从而完成通信的链接与发送。

2. Sender

Sender 线程的东西稍微有点多,但核心只有两个:

  • 更新元数据消息
  • 将消息发送至 Broker

Sender 线程启动时,会启动如下代码:

public void run() {
    while (running) {
        run(time.milliseconds());
    }
}

void run(long now) {
    // 业务代码
}

从代码中不难看出,当我们启动 Sender 线程之后,Sender 线程会不断的轮询调用 run(long now) 该方法,执行其业务。

run(long now) 方法到底做了些什么呢,我们一起来看一下

2.1 accumulator.ready

  • 遍历所有的 TopicPartition,获取每一个 TopicPartitionLeader 节点
  • 弹出每一个 TopicPartition 的第一个 batch,校验该 batch 有没有符合发送的规定
  • 如果该 batch 符合了发送的规定后,将节点放至 readyNodes 中,标识该节点已经可以发送数据了
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    // 准备好的节点
    Set<Node> readyNodes = new HashSet<>();
    // 遍历所有的 TopicPartition
    for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
        TopicPartition part = entry.getKey();
        Deque<RecordBatch> deque = entry.getValue();
		  // 获取当前Partition的leader节点
        Node leader = cluster.leaderFor(part);
        if (leader == null) {
            unknownLeadersExist = true;
        } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
            synchronized (deque) {
                // 弹出每一个 TopicPartition 的第一个batch
                RecordBatch batch = deque.peekFirst();

                if (batch != null) {
                    // bactch 满足 batch.size() 或者 时间达到 linger.ms、
                    boolean full = deque.size() > 1 || batch.records.isFull();
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    if (sendable && !backingOff) {
                        // 将当前的节点添加至准备好的队列中
                        readyNodes.add(leader);
                    } else { 
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }
    // 最终返回该节点(这里最重要的还是 Set<String> 也就是准备好的节点集合)
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
}

public ReadyCheckResult(Set<Node> readyNodes, long nextReadyCheckDelayMs, boolean unknownLeadersExist) {
    this.readyNodes = readyNodes;
    this.nextReadyCheckDelayMs = nextReadyCheckDelayMs;
    this.unknownLeadersExist = unknownLeadersExist;
}

2.2 metadata.requestUpdate

  • 如果发现有 TopicPartition 没有 leader,那么这里就调用 requestUpdate() 方法更新 metadata
// 如果这个地方是 True,说明我们上面有的 TopicPartition 的 leader 节点为 null
if (result.unknownLeadersExist){
    // 更新元数据
    this.metadata.requestUpdate();
}

// 设置标记位为true,后续进行更新
public synchronized int requestUpdate() {
    this.needUpdate = true;
    return this.version;
}

2.3 remove any nodes

  • 遍历所有准备好的节点,利用 NetworkClient 来判断改节点是不是已经准备完毕
  • 如果该节点未准备完毕,则从 readyNodes 中剔除
  • 节点未准备完毕,会初始化链接该节点,便于下一次的消息发送

PS:这里可能会有同学对上面已经准备好了,下面为什么还有准备好的逻辑筛选有疑问

  • 第一步筛选的是 TopicPartition 对应的 batch 已经满足了发送的必要
  • 第二步筛选的是 TopicPartition 对应的 Broker 是否建立了链接,如果不是则初始化链接
// 遍历所有准备好的节点
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
    Node node = iter.next();
    // 利用 NetworkClient 来判断改节点是不是已经准备完毕
    // 如果还未准备好,从准备好的队列中剔除掉
    if (!this.client.ready(node, now)) {
        iter.remove();
        notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
    }
}

// 判断节是否准备好发送
// 如果没有准备好发送,则会与该节点初始化链接,便于下一次的消息发送
public boolean ready(Node node, long now) {
    // 已经准备好
    if (isReady(node, now)){
        return true;
    }
    // 与该节点的初始化
    if (connectionStates.canConnect(node.idString(), now)){
        initiateConnect(node, now);
    }
    return false;
}

2.4 accumulator.drain

  • 遍历所有准备好的 readyNodes,得到该 Broker 上所有的 PartitionInfo 信息,判断该 Partition 是否被处理中,如果没有在处理中则获取其对应的 Deque<RecordBatch>
  • 弹出队列中的 First,判断其是否在 backoff (没有重试过,或者重试了但是间隔已经达到了retryBackoffMs)加上该 batch 的大小 < maxRequestSize,该 batch 符合规定
  • 将该 batch放进 readyRecordBatchList中,最终放进 Map<node.id(), readyRecordBatchList> ,这样我们一个 Broker 可以发送的 batch 就已经整理完毕。
  • 最终我们得到 Map<Integer, List<RecordBatch>>key 代表当前已经连接好的 Brokervalue 代表当前需要发送的 batch
// 生成节点对应的batch消息
Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,result.readyNodes,this.maxRequestSize, now);

public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes,int maxSize,long now) {
    Map<Integer, List<RecordBatch>> batches = new HashMap<>();
    // 遍历所有准备好的node节点
    for (Node node : nodes) {
        int size = 0;
        // 通过node节点获取其所有的Partition
        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
        // 存储该节点需要发送的Batch
        List<RecordBatch> ready = new ArrayList<>();
        int start = drainIndex = drainIndex % parts.size();
        do {
            // 取Partition
            PartitionInfo part = parts.get(drainIndex);
            TopicPartition tp = new TopicPartition(part.topic(), part.partition());
            // 当分区没有正在进行的批处理时
            if (!muted.contains(tp)) {
                // 获取该分区的所有的RecordBatch
                Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
                if (deque != null) {
                    synchronized (deque) {
                        // 查看队列第一个
                        RecordBatch first = deque.peekFirst();
                        if (first != null) {
                            // 判断其重试与时间
                            boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
                            if (!backoff) {
                                // 判断是否超越最大发送限制
                                if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
                                    break;
                                } else {
                                    // 取出队列第一个
                                    RecordBatch batch = deque.pollFirst();
                                    batch.records.close();
                                    // 当前发送的大小累积
                                    size += batch.records.sizeInBytes();
                                    // 放入准备好的列表中
                                    ready.add(batch);
                                    batch.drainedMs = now;
                                }
                            }
                        }
                    }
                }
            }
            this.drainIndex = (this.drainIndex + 1) % parts.size();
        } while (start != drainIndex);
        // 将节点与准备好的batch列表对应
        batches.put(node.id(), ready);
    }
    // 最终返回:所有准备好的节点与对应的batch列表
    return batches;
}

2.5 createProduceRequests

  • 遍历刚刚我们得到的 Map<node.id(), readyRecordBatchList,组装成客户端请求
List<ClientRequest> requests = createProduceRequests(batches, now);

// 组装客户端请求
private List<ClientRequest> createProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
    List<ClientRequest> requests = new ArrayList<ClientRequest>(collated.size());
    for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
        requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));
    return requests;
}

2.6 client.send

  • 遍历每一个客户端请求并进行发送

PS:这里的发送是通过 KafkaClient 提供的接口,具体由 NetworkClient 实现,我们后面会讲

for (ClientRequest request : requests){
    client.send(request, now);
}

2.7 client.poll

  • 发送消息

PS:这里也同样是通过 KafkaClient 提供的接口,具体由 NetworkClient 实现,我们后面会讲

this.client.poll(pollTimeout, now);

3. NetworkClient

我们的 SenderProducer 发送的消息进行 校验、筛选、组装,让我们的 NetworkClient 进一步的将消息发送

3.1 send

  • 拿到当前客户端请求的 node,校验其是否有权限
  • 如果有权限的话,我们设置下时间并添加到到 inFlightRequests,调用 selector 进行发送(这里提前剧透一下,send 方法虽然叫发送,实际上并没有发送,只是注册了写事件,后面会讲到)

inFlightRequests 的作用:

  • 缓存已经发出去但还没有收到响应的请求,保存对象的具体形式为 Map<NodeId,Deque<Request>>
  • 配置参数 max.in.flight.requests.per.connection,默认值为5,即每个连接最多只能缓存5个未收到响应的请求,超过这个数值之后便不能再往这个连接发送更多的请求了
public void send(ClientRequest request, long now) {
    // 拿到当前客户端请求的node
    String nodeId = request.request().destination();
    // 是否可以发送请求(我们前面已经校验过,一般情况下都能够发送)
    if (!canSendRequest(nodeId))
        throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
    doSend(request, now);
}

private void doSend(ClientRequest request, long now) {
    // 设置时间
    request.setSendTimeMs(now);
    // 将当前请求添加到 inFlightRequests
    this.inFlightRequests.add(request);
    selector.send(request.request());
}

3.2 poll

  • 判断当前需要更新元数据,如果需要则更新元数据
  • 调用 selectorpoll 方法进行 Socket IO 的操作(这里也在后面会讲到)
  • 处理完成之后的操作
    • 处理已经完成的 send
    • 处理从 server 端接收到 Receive
    • 处理连接失败那些连接
    • 处理新建立的那些连接
    • 处理超时的连接
  • 如果回调的话,处理回调的信息
public List<ClientResponse> poll(long timeout, long now) {
    // 判断当前需要更新元数据,如果需要则更新元数据
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    
    // 调用 selector 的 poll 方法进行 Socket IO 的操作
    this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    

    // 处理完成之后的操作
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    // 处理已经完成的 send(不需要 response 的 request,如 send)
   	 handleCompletedSends(responses, updatedNow);
    // 处理从 server 端接收到 Receive(如 Metadata 请求)
    handleCompletedReceives(responses, updatedNow);
    // 处理连接失败那些连接,重新请求 meta
    handleDisconnections(responses, updatedNow);
    // 处理新建立的那些连接(还不能发送请求,比如:还未认证)
    handleConnections();
    // 处理超时的连接
    handleTimedOutRequests(responses, updatedNow);

    // 处理回调的信息
    for (ClientResponse response : responses) {
        if (response.request().hasCallback()) {
            try {
                response.request().callback().onComplete(response);
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
    }
	
    // 返回响应结果
    return responses;
}

4. Selector

终于来到了我们的最后一步,Kafka 自己封装的 Selector,这个哥们就是真正发送消息的地方

激动的心,颤抖的手,跟着我一起看看 Selector 到底是怎么发送消息的

4.1send

  • 根据当前节点的编号拿到当前客户端的 channel
  • 向当前的 KafkaChannel 注册写事件

写事件触发的时间:当 Scoket缓冲区 有空闲时,触发该事件

从这里可以看出来,我们的 send 方法其实也没有真正的发送消息,只是向 KafkaChannel 注册了 写事件,保障后面 poll 轮旋事件发送的正确性。

public void send(Send send) {
    // 根据当前节点的编号拿到当前客户端的channel
    KafkaChannel channel = channelOrFail(send.destination());
    try {
        // 向当前的 KafkaChannel 注册写事件
        channel.setSend(send);
    } catch (CancelledKeyException e) {
        this.failedSends.add(send.destination());
        close(channel);
    }
}

public void setSend(Send send) {
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

4.2 poll

  • 清除相关记录
  • 获取就绪事件
  • 处理 io 操作
  • 将处理得到的 stagedReceives 添加到 completedReceives 中(NetworkClient处理响应)
  • 关闭老的连接

由于这个方法比较重要,所以我们一个一个的讲,跟着我们的思路来

public void poll(long timeout) throws IOException {
    
    // 清除相关缓存记录
    clear();
    
   // 获取就绪事件
    long startSelect = time.nanoseconds();
    int readyKeys = select(timeout);
    long endSelect = time.nanoseconds();
    currentTimeNanos = endSelect;
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
	
    // 处理 io 操作
    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        pollSelectionKeys(this.nioSelector.selectedKeys(), false);
        pollSelectionKeys(immediatelyConnectedKeys, true);
    }
	
    // 将处理得到的 stagedReceives 添加到 completedReceives 中
    addToCompletedReceives();

    long endIo = time.nanoseconds();
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
    
    // 关闭老的连接
    maybeCloseOldestConnection();
}
4.2.1 clear

clear() 方法是在每次 poll() 执行的第一步,它作用的就是清理上一次 poll 过程产生的部分缓存。

这里的缓存是不是感觉有点熟悉,他就是我们之前在 NetworkClient 的 **处理完成之后的操作 **对应的缓存,忘了的小伙伴可以回去看一下

private void clear() {
    this.completedSends.clear();
    this.completedReceives.clear();
    this.connected.clear();
    this.disconnected.clear();
    this.disconnected.addAll(this.failedSends);
    this.failedSends.clear();
}
4.2.2 select

select(ms) 方法主要通过调用 nioSelectorselect 方法,返回我们就绪事件的数量

这里的 nioSelector 是属于 java.nio.channels.Selector 的,也就是我们 Java NIO 包里面的

  • nioSelector.selectNow非阻塞的,当前操作没有通道准备好立即返回,返回是0
  • nioSelector.select阻塞的,当前没有通道准备好会阻塞住,最长时间为 long ms
private int select(long ms) throws IOException {
    if (ms == 0L) {
        return this.nioSelector.selectNow();
    } else {
        return this.nioSelector.select(ms);
    }
}
4.2.3 pollSelectionKeys
pollSelectionKeys(this.nioSelector.selectedKeys(), false);
pollSelectionKeys(immediatelyConnectedKeys, true);

这部分是 socket IO 的主要部分,发送 Send 及接收 Receive 都是在这里完成的,在 poll() 方法中,这个方法会调用两次:

  • 第一次调用的目的是:处理已经就绪的事件,进行相应的 IO 操作;

  • 第二次调用的目的是:处理新建立的那些连接,添加缓存及传输层(Kafka 又封装了一次,这里后续文章会讲述)的握手与认证。

我们来剖析下 pollSelectionKeys 整理的步骤:

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
    // 拿到当前所有准备好的keys
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while (iterator.hasNext()) {
        // 获取key并删除它,防止重复使用
        SelectionKey key = iterator.next();
        iterator.remove();
        
        // 根据 key 拿到对应的附件 KafkaChannel
        KafkaChannel channel = channel(key);
        sensors.maybeRegisterConnectionMetrics(channel.id());
        lruConnections.put(channel.id(), currentTimeNanos);

        try {
			   // 处理所有已经完成握手(Tcp)的连接(正常或立即)
            if (isImmediatelyConnected || key.isConnectable()) {
                if (channel.finishConnect()) {
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                } else
                    continue;
            }

            // 如果通道未准备好,请完成准备
            if (channel.isConnected() && !channel.ready())
                channel.prepare();

            // 如果通道已准备好从任何具有可读数据的连接中读取
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                while ((networkReceive = channel.read()) != null)
                    addToStagedReceives(channel, networkReceive);
            }

			  // 如果通道准备好了,就向缓冲区中有空间且我们有数据的任何套接字写入
            if (channel.ready() && key.isWritable()) {
                Send send = channel.write();
                if (send != null) {
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }

            // 取消所有失效的套接字
            if (!key.isValid()) {
                close(channel);
                this.disconnected.add(channel.id());
            }

        } 
    }
}
  • 拿到所有准备好的 keys,获取 keys 并删除它(防止重复使用),根据 key 拿到对应的附件 KafkaChannel
  • 处理以下几种情况:
    • 所有已经完成握手的连接
    • 通道未准备好的 key
    • 通道准备好的数据
    • 可写入的 key
    • 取消所有失效的套接字

其中我们不难看出,最重要的当属 处理可写入的 key,我们有必要来详细说说 Send send = channel.write(); 的实现

public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}

// 是否发送成功
private boolean send(Send send) throws IOException {
    // 写入消息
    send.writeTo(transportLayer);
    // 写完之后取消写事件,防止无限触发写事件
    if (send.completed())
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);

    return send.completed();
}

// 通过客户端的channel向服务端发送信息
public long writeTo(GatheringByteChannel channel) throws IOException {
    long written = channel.write(buffers);
    remaining -= written;
    if (channel instanceof TransportLayer)
        pending = ((TransportLayer) channel).hasPendingWrites();

    return written;
}

最终还是调用了我们 Java NIO 中的 channel.write(buffers) 方法完成发送消息。

4.2.4 addToCompletedReceives

client 的时序性而是通过 InFlightRequestsRecordAccumulatormutePartition 来保证的。因此对于 Client 端而言,这里接收到的所有 Receive 都会被放入到 completedReceives 的集合中等待后续处理。

这里面的数据主要我们上面 ``pollSelectionKeys中添加的,然后在这放入到completedReceives,随后被我们NetworkClient` 中被处理

// 处理响应放入到 completedReceives 中
private void addToCompletedReceives() {
    if (!this.stagedReceives.isEmpty()) {
        Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
            KafkaChannel channel = entry.getKey();
            if (!channel.isMute()) {
                Deque<NetworkReceive> deque = entry.getValue();
                NetworkReceive networkReceive = deque.poll();
                this.completedReceives.add(networkReceive);
                this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
                if (deque.isEmpty())
                    iter.remove();
            }
        }
    }
}

五、总结

终于写完了,其实最开始学 kafka 的时候是今年 2 月份,那时候还不懂什么是 IO,看源码的通信基本看不懂

后来,花了几个月的时间学了 操作系统 --> 计算机网络 --> Linux 通信 --> Java NIO --> Netty,现在看 Kafka 的通信就变得通透了。

另外,基本现在所有源码的通信都有 Netty 架构的影子

所以,如果你也想学源码的话,最好是先看看 Netty 的相关知识,学完之后,你会发现,通信架构不过如此。

如果你能看到这,想必已经跟完了整个 Producer 的网络架构部分,你有没有感觉到一个事情:网络架构就是整个生产者运行的全部流程

对的,本来我以为讲网络架构就是网络架构,越写越发觉,这不就是整个生产者发送的全部流程嘛

所以,我们生产者全部的文章就结束了,总体如下:

  • 生产者如何将消息放入到内存缓冲区
  • 你真的了解 Kafka 的元数据嘛
  • 你真的了解 Kafka 的缓存池机制嘛

喜欢的可以点个关注吆,后续会继续更新 kafka 源码系列文章,下一部分应该就是 服务端

我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,Java领域新星创作者,喜欢后端架构和中间件源码。

我们下期再见。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/81887.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring 6 源码编译和高效阅读源码技巧分享

一. 前言 Spring Boot 3 RELEASE版本于 2022年11月24日 正式发布&#xff0c;相信已经有不少同学开始准备新版本的学习了&#xff0c;不过目前还不建议在实际项目中做升级&#xff0c;毕竟还有很多框架和中间件没出适配版本。此次Spring Boot里程碑的升级也要求了最低JDK 17 和…

风靡互联网关键词 Web3.0 | 区块链 | 元宇宙……

&#x1f497;wei_shuo的个人主页 &#x1f4ab;wei_shuo的学习社区 &#x1f310;Hello World &#xff01; Web web是互联网的总称&#xff0c;全称为World Wide Web&#xff0c;缩写WWW &#xff0c;即全球广域网&#xff0c;也称为万维网&#xff0c;它是一种基于超文本和H…

nacos--基础--1.4--理论--原理

nacos–基础–1.4–理论–原理 1、基本架构 2、Nacos 原理 2.1、信息的同步主要的几种方式 push (服务端主动push)pull (客户端的轮询)&#xff0c; 超时时间比较短long pull (超时时间比较长) 2.2、配置中心原理 nacos 配置中心就是采用&#xff1a;客户端 long pull 的方式…

新体制SAR——BiDi SAR

1. 概述 TerraSAR的相控阵天线具备方位向和距离向二维电扫能力&#xff0c;在这一能力的基础上&#xff0c;TerraSAR可以工作在条带模式、ScanSAR模式、滑聚模式和TOPS模式&#xff08;TOPS是实验模式&#xff0c;不是主模式&#xff09;。载荷的PRF可以设计在3-6.5kHz&#xf…

【笔试强训】Day 7

&#x1f308;欢迎来到笔试强训专栏 (꒪ꇴ꒪(꒪ꇴ꒪ )&#x1f423;,我是Scort目前状态&#xff1a;大三非科班啃C中&#x1f30d;博客主页&#xff1a;张小姐的猫~江湖背景快上车&#x1f698;&#xff0c;握好方向盘跟我有一起打天下嘞&#xff01;送给自己的一句鸡汤&#x…

安科瑞AcrelEMS-SW智慧水务能效管理平台解决方案

系统概述 安科瑞电气具备从终端感知、边缘计算到能效管理平台的产品生态体系&#xff0c;AcrelEMS-SW智慧水务能效管理平台通过在污水厂源、网、荷、储、充的各个关键节点安装保护、监测、分析、治理装置&#xff0c;用于监测污水厂能耗总量和能耗强度&#xff0c;重点监测主要…

2022年史上最全Java面试题:数据结构+算法+JVM+线程+finalize+GC

基本概念 操作系统中 heap 和 stack 的区别 什么是基于注解的切面实现 什么是 对象/关系 映射集成模块 什么是 Java 的反射机制 什么是 ACID BS与CS的联系与区别 Cookie 和 Session的区别 fail-fast 与 fail-safe 机制有什么区别 get 和 post请求的区别 Interface 与 …

服务端高并发分布式架构演进之路

1. 概述 本文以淘宝作为例子&#xff0c;介绍从一百个到千万级并发情况下服务端的架构的演进过程。同时列举出每个演进阶段会遇到的相关技术&#xff0c;让大家对架构的演进有一个整体的认知。文章最后汇总了一些架构设计的原则。 特别说明&#xff1a;本文以淘宝为例仅仅是为…

如何入门学python,这是很值得借鉴的学习方法

前言 众所周知&#xff0c;python的应用领域十分广泛&#xff0c;无论是对于专业的程序员还是从事其他工作的人&#xff0c;python这门编程语言都非常值得学习。 但对于零基础的人来说&#xff0c;该如何入门python编程呢&#xff1f; 虽然现在网上有关python编程的教程很多…

PyFlink系列之一:PyFlink安装和PyFlink使用的详细技术

PyFlink系列之一&#xff1a;PyFlink安装和PyFlink使用的详细技术一、下载PyFlink二、创建TableEnvironment三、TableEnvironment API1.Table/SQL 操作2.执行/解释作业3.创建/删除用户自定义函数4.依赖管理5.配置四、Catalog APIs五、Statebackend&#xff0c;Checkpoint 以及重…

小程序图片加载失败binderror方法处理

场景&#xff1a;我们在小程序项目中的一个图片列表&#xff0c;当某些图片加载失败后&#xff0c;直接显示空白&#xff0c;这样用户体验不好&#xff0c;为了解决当图片加载失败&#xff0c;我们给一个默认图片代替&#xff0c;参考官方给的图片加载失败的处理方法&#xff1…

C51单片机开发程序报错 main.c (11) : error C267 : ‘Func‘ : requires ANSI-style prototype

问题 C51单片机开发程序报错 main.c (11) : error C267 : Func : requires ANSI-style prototype详细问题 问题一 问题二 问题三 可能原因一 函数定义声明处&#xff08;.h文件中&#xff09;与主函数中函数&#xff08;函数名/参数类型/返回值类型&#xff09;不一致 解决…

【Vue2+Element ui通用后台】项目搭建和vue-router使用

文章目录介绍创建项目并引入Element-ui按需引入全局引入vue-router安装嵌套路由介绍 通过这个系列文章&#xff0c;我们将学到&#xff1a; 1.项目搭建使用element实现首页布局 2.顶部导航菜单及与左侧导航联动的面包屑实现 3.封装—个ECharts组件 4.封装一个Form表单组件和Ta…

木字楠后台管理系统开发(4):SpringSecurity引入并编写登陆接口

&#x1f3b6; 文章简介&#xff1a;木字楠后台管理系统开发(4)&#xff1a;SpringSecurity引入并编写登陆接口 &#x1f4a1; 创作目的&#xff1a;为了带大家完整的体验木字楠后台管理系统模版的开发流程 ☀️ 今日天气&#xff1a;冬天来啦&#xff01; &#x1f4dd; 每日一…

在ubuntu上部署gitlab详细步骤

一、Ubuntu安装gitlab步骤&#xff1a; 安装依赖 通过快捷键ctrlaltT打开命令行窗口&#xff0c;然后运行下面两行命令 sudo apt update sudo apt-get upgrade sudo apt-get install curl openssh-server ca-certificates postfix 如果这一步遇到下面提示界面&#xff0c…

BUUCTF Web2

[HCTF 2018]admin flask session的伪造 改密码的页面源码有提示&#xff0c;得到秘钥ckj123 自己的session .eJw9kEGLwjAUhP_KkrOHJm09CB5cbKULeaHwanm5iKu1adK4UBVpxP--XRc8zGmGj5l5sN1paC6GLa7DrZmxXXdkiwf7-GYLptCl2uoOcHWXmDu1kYnGLIFNdQdsBYmtkbb3YI89YDXKUHKNTkCg8S9PliJ…

Kotlin 开发Android app(二十二):Retrofit和简单的mvp框架

到这一节&#xff0c;基本上把大部分kotlin和android的开发都已经介绍完成了&#xff0c;通过了前面和这一章的框架结构&#xff0c;基本上能解决开发中的很多问题&#xff0c;并且能够知道android的主要的技术&#xff0c;并进行独立开发了。对于传统的开发的话&#xff0c;还…

一些可以显著提高大型 Java 项目启动速度的尝试

我们线上的业务 jar 包基本上普遍比较庞大&#xff0c;动不动一个 jar 包上百 M&#xff0c;启动时间在分钟级&#xff0c;拖慢了我们在故障时快速扩容的响应。于是做了一些分析&#xff0c;看看 Java 程序启动慢到底慢在哪里&#xff0c;如何去优化&#xff0c;目前的效果是大…

SpringSecurity安全框架

目录 一、Spring Security介绍 1、框架介绍 2、认证与授权实现思路 二、整合Spring Security 1、在common下创建spring_security模块 2、在spring_security引入相关依赖 3.代码结构说明&#xff1a; 4、创建spring security核心配置类 5、创建认证授权相关的工具类 &a…

Roson的Qt之旅 #139 Qt读写Excel

1.使用QAxObject读写Excel QAxObject类提供了一个包裹COM对象的QObject。 QAxObject可以被实例化为一个空的对象&#xff0c;用它应该包裹的COM对象的名字&#xff0c;或者用一个指向代表现有COM对象的IUnknown的指针。如果COM对象实现了IDispatch接口&#xff0c;该对象的属性…