Kafka由浅入深(6) Sender线程执行源码解析

news2024/11/28 13:33:19

一、KafkaProducer消息流程图

1.1 KafkaProducer 消息架构图

 

1.2 KafkaProducer 消息架构分为两部分

第一部分是KafkaProducer主线程

主要逻辑提供消息拦截器、序列化器、和分区器的默认实现和对外自定义扩展功能,已经将消息追加并缓存到累加器RecordAccumulator中,为Sender线程提供批量数据队列和发送分区的准备工作。

备注:

1、对于主线程的原理分析,可以看之前的文章

Kafka由浅入深(2) 生产者主线程工作原理(上)_架构源启的博客-CSDN博客_kafka 生产者线程

Kafka由浅入深(4)生产者主线程工作原理(下)_架构源启的博客-CSDN博客

2、详细解说RecordAccumulator,可以看这边文章

Kafka由浅入深(5)RecordAccumulator的工作原理_架构源启的博客-CSDN博客

第二部分是Sender线程

当ProducerBatch已经满了,或者是新创建的ProducerBatch,则唤醒Sender线程执行发送任务,Sender从RecordAccumulator缓存中取出发送的数据,通过NetwordClient将数据发送到Kafka服务器中。Sender线程详细的原理和实现,也就是本篇文章详细分析

1.3 Sender线程执行流程图

 

二、Sender线程源码分析

Sender 是一个 Kafka 生产者消息执行的网络线程,无限循环运行在后台将 ProduceRequests 发送到 Kafka 集群中。

2.1 Sender线程的定义和创建

 

2.1.1 Sender线程定义

Sender继承了Runnable接口,而Sender线程是在KafkaProducer 的构造器中进行实例化和线程启动。当KafkaProducer被创建时,也会在构造器中实例化以 “kafka-producer-network-thread |[clientId]”命名的线程,而这个线程作为守护线程,伴随着KafkaProducer整个生命周期。

从下面的源码可以看出,Sender线程的run()方法中,核心发送入口方法为runOnce()。每一次执行runOnce()方法,Sender 将从RecordAccumulator记录累加器中获取1~max.request.size 个 ProducerBatch的消息数据,并最终将 ProduceRequests 发送到对应活动的 Kafka broker服务器中。

public class Sender implements Runnable {
		/**
     * The main run loop for the sender thread
     */
    @Override
    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        while (running) {
            try {
                runOnce();
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

       ....省略....
    }
}

 

2.1.2 KafkaProducer中实例化Sender线程

public class KafkaProducer<K, V> implements Producer<K, V> {
		...省略

		// KafkaProducer构造器中,对Sender线程进行创建和线程启动
		KafkaProducer(ProducerConfig config,
                  Serializer<K> keySerializer,
                  Serializer<V> valueSerializer,
                  ProducerMetadata metadata,
                  KafkaClient kafkaClient,
                  ProducerInterceptors<K, V> interceptors,
                  Time time) {
				...省略
				//  Sender线程的创建和启动
				this.sender = newSender(logContext, kafkaClient, this.metadata);
				String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
				this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
				// Sender线程启动
				this.ioThread.start();
	      ...省略
	}

// Sender线程的创建
Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
        // 最大InFlightConnection请求链接数
        int maxInflightRequests = producerConfig.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
        // 请求超时时间
        int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
        // 创建ChannelBuilder,提供服务Channel
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time, logContext);
        ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
        // kafka数据传感器
        Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
        // 创建KafkaClient客户端,默认为NetworkClient
        KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
                new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                        this.metrics, time, "producer", channelBuilder, logContext),
                metadata,
                clientId,
                maxInflightRequests,
                producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
                producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                requestTimeoutMs,
                producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
                producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
                time,
                true,
                apiVersions,
                throttleTimeSensor,
                logContext);

        short acks = Short.parseShort(producerConfig.getString(ProducerConfig.ACKS_CONFIG));
        // 创建Sender线程
        return new Sender(logContext,
                client,
                metadata,
                this.accumulator,
                maxInflightRequests == 1,
                producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                acks,
                producerConfig.getInt(ProducerConfig.RETRIES_CONFIG),
                metricsRegistry.senderMetrics,
                time,
                requestTimeoutMs,
                producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                this.transactionManager,
                apiVersions);
    }

}

 

2.1.3 Sender实例化参数说明

参数和实现功能说明

InFlightRequest: 飞行途中的请求,Sender线程发送到Kafka之前会保存数据到InFlightRequest中,主要作用是缓存已经发送但未接收到响应的请求,保存的数据模型为Map<NodeId,Deque<Request>>。

KafkaProducer 在创建Sender时会设置如下属性:

1、max.in.flight.requests.per.connection(maxInflightRequests): 设置每一个InFlightRequest链接最多缓存多少个未收到响应的请求,默认值为5。如果缓存的数据到达了最大值,则消息发送处于阻塞状态。如果需要保证消息顺序发送,可以将配置设置为1,但会影响到消息发送的执行效率,降低消息发送的吞吐量

2、request.timeout.ms(requestTimeoutMs):

KafkaProducer 等待请求响应的最长时间,默认值为 30000 ms。 请求超时之后可以选择进行重试。 注意这个参数需要比 broker 端参数 replica.lag.time.max.ms 的值要大,这样可以减少因客户端重试而引起的消息重复的概率。

3、max.request.size

这个参数用来限制生产者客户端能够发送的消息的最大值,默认值为 1048576 B,即 1 MB。 一般情况下,这个默认值就可以满足大多数的应用场景了。 不建议盲目地增大这个参数的配置值,尤其是在对 Kafka 整体脉络没有足够把控的时候。 因为这个参数还涉及一些其他参数的联动,比如 broker 端的 message.max.bytes 参数,如果配置错误可能会引起一些不必要的一场。 比如讲 broker 端端 message.max.bytes 参数配置为 10, 而 max.request.size 参数配置为 20,那么当我们发送一条消息大小为 15 的消息时,生产者客户端就会报出异常: org.apache.kafka.common.errors.RecordTooLargeException: The reqeust included a message larger than the max message size the server will accept.

4、retries 和 retry.backoff.ms retries 参数用来配置生产者重试的次数,默认值为 0,即发生异常的时候不进行任何的重试动作。 消息在从生产者发出道成功写入服务器之前可能发生一些临时性的异常,比如网络抖动、Leader 副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置 retries 大于 0 的值,以此通过内部重试来恢复而不是一味的将异常抛给生产者的应用程序。 如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。 不过并不是所有的异常都是可以通过重试来解决的,比如消息太大,超过 max.request.size 参数配置的值时,这种方式就不行了。 重试还和另一个参数 retry.backoff.ms 有关,这个参数的默认值为 100,它用来设定两次重试之间的时间间隔,避免无效的频繁重试。 在配置 retries 和 retry.backoff.ms 之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间大于这个异常恢复时间,以此来避免生产者过早地放弃重试。 Kafka 可以保证同一个分区中的消息时有序的。 如果生产者按照一定的顺序发送消息,那么这些消息也会顺序的写入分区,进而消费者也可以按照同样的顺序消费它们。

5、acks : 消息应答机制,默认值即为 1

ack =1 —— 生产者发送消息后,只要分区的leader副本写入消息成功,则生产者客户端就会收到服务器的成功响应,视为消息发送成功。

ack 为1 的时候,保证客消息发送的吞吐量,但是在消息可靠性上不能完全保证。会出现如下的情况:

        a、如果消息无法写入leader副本,则生产者会收到错误响应,无法写入的情况通常情况为:leader副本崩溃、leader副本的重新选举期间。所以为了避免消息发送丢失的情况,生产者可以进行消息发送的失败重试。

        b、在极端情况下可能会出现,消息写入成功写入leader副本,并且生产者客户端收到了服务器的成功响应。而此时leader副本崩溃,但是其他follower副本未及时更新到leader副本的数据,在新选举出的leader副本中则服务查询到这一条数据的记录,也就出现了消息丢失的情况。

acks = 0—— 生成者发送消息后,不用等待任何服务端的成功与否的响应。如果消息写入到Kafak服务端出现异常,从而可能导致消息丢失的情况。设置为0 可以达到消息发送的最大吞吐量,但是也可能出现消息丢失的情况。

acks = -1 或 acks = all ——生产者发送后,需要等待 leader副本将消息同步到所有follower副本,则返回消息成功,从而保证消息的可靠性。

2.2 Sender的线程的唤起执行wakeup

首先,我们需要知道Sender线程的执行是在哪里阻塞的。Kafka客户端的底层网络通信是通过NIO实现,Kafka 的Selecotor类中,我们可以看到java.nio.channels.Selector nioSelector,对Kafak服务器的通信核心是通过nioSelector 来实现。

当调用nioSelector.select()方法时,调用者线程会进入阻塞状态,直到有就绪的Channel才会返回。而java.nio.channels.Selector 也提供了从阻塞状态唤起的方式,则是可以通过其他的线程调用nioSelector.wakeup进行阻塞线程的唤醒,则select()方法也会立即返回。

public class Selector implements Selectable, AutoCloseable {		
		
		private final java.nio.channels.Selector nioSelector;   

   /**
     * Interrupt the nioSelector if it is blocked waiting to do I/O.
     */
    @Override
    public void wakeup() {
        this.nioSelector.wakeup();
    }

}

wake()被调用的地方:

KafkaProducer 的doSend(),initTransactions(), sendOffsetsToTransaction(), commitTransaction(), abortTransaction(), waitOnMetadata()和 flush()方法

Sender类的 initiateClose()方法

三、Sender线程核心流程源码分析

3.1 Sender线程run()方法逻辑

Sender线程的状态通过两个变量控制,分别是running和forceClose

running: Sender线程是否正在运行中

forceClose: 是否强制关闭正在发送和待发送的消息,

强制关闭状态,忽略未发送和正在发送中的消息

分三种情况

1、状态一 running状态:

sender线程处于running状态,则一直循环执行runOnce()

2、状态二 非强制关闭状态:

     a、停止接受新的请求,如果事务管理器、累加器或等待确认的过程中可能仍有请求,请等到这些完成。

     b、如果任何提交或中止未通过事务管理器的队列,则中止事务

3、强制关闭状态

     a、将所有未完成的事务请求和batchs置为失败,并唤醒正在等待的线程
     b、我们需要使所有不完整的事务请求和批处理失败,并唤醒等待未来的线程。

 

 

Sender.run()源码分析

// sender线程运行状态
private volatile boolean running;

// 强制关闭状态,忽略未发送和正在发送中的消息
private volatile boolean forceClose;

@Override
public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    // 状态一 running状态: sender线程处于running状态,则一直循环执行runOnce()
    while (running) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

    // 状态二 非强制关闭状态
    //停止接受新的请求,如果事务管理器、累加器或等待确认的过程中可能仍有请求,请等到这些完成。
    while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    
    //如果任何提交或中止未通过事务管理器的队列,则中止事务
    while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
        if (!transactionManager.isCompleting()) {
            log.info("Aborting incomplete transaction due to shutdown");
            transactionManager.beginAbort();
        }
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    // 状态三 强制关闭状态
    if (forceClose) {       
        // 将所有未完成的事务请求和batchs置为失败,并唤醒正在等待的线程
        //我们需要使所有不完整的事务请求和批处理失败,并唤醒等待未来的线程。
        if (transactionManager != null) {
            log.debug("Aborting incomplete transactional requests due to forced shutdown");
            transactionManager.close();
        }
        log.debug("Aborting incomplete batches due to forced shutdown");
        this.accumulator.abortIncompleteBatches();
    }
    try {
        // KafkaClient Kafka客户端关闭
        this.client.close();
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }

    log.debug("Shutdown of Kafka producer I/O thread has completed.");
}

 

3.2 Sender线程runOnce()方法逻辑

从run()方法的逻辑可以看出,runOnce()方法是消息发送的入口

a、当事务存在的时候,进行事务处理逻辑,详细的事务处理,将在后续技术文章中进行讲解

b、 调用sendProducerData()方法,则是Kafka消息发送的核心处理逻辑

void runOnce() {
    
    if (transactionManager != null) {
        try {
						// 事务的发送处理逻辑
						//尝试解析未解析的序列。如果所有正在执行的请求都已完成,并且某些分区仍无法解析,则如果可能,请中断epoch,或者转换为致命错误
            transactionManager.maybeResolveSequences();

            // 如果事务管理器处于失败状态,则不要继续发送
            if (transactionManager.hasFatalError()) {
                RuntimeException lastError = transactionManager.lastError();
                if (lastError != null)
                    maybeAbortBatches(lastError);
                client.poll(retryBackoffMs, time.milliseconds());
                return;
            }

            //检查我们是否需要新的producerId。如果是这样,我们将在下面发送InitProducerId请求
            transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
            //如果发送或轮询事务请求,或者FindCoordinator请求已排队,则返回true
            if (maybeSendAndPollTransactionalRequest()) {
                return;
            }
        } catch (AuthenticationException e) {
            // This is already logged as error, but propagated here to perform any clean ups.
            log.trace("Authentication exception while processing transactional request", e);
            transactionManager.authenticationFailed(e);
        }
    }

    long currentTimeMs = time.milliseconds();

    // 发送ProducerData核心逻辑
    long pollTimeout = sendProducerData(currentTimeMs);
    client.poll(pollTimeout, currentTimeMs);
}

 

3.3 Sender线程sendProducerData()的核心逻辑梳理

虚线框内的逻辑,正式sendProducerData()的实现逻辑

 

private long sendProducerData(long now) {
	// 获取当前运行并且未阻塞的Kafka集群元数据信息
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    // 获取准备发送数据的分区(node节点)集合
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if there are any partitions whose leaders are not known yet, force metadata update
    // 如果存在分区的leader未知,则强制更新元数据
    if (!result.unknownLeaderTopics.isEmpty()) {
        //未知leader的主题集包含leader选举待定的topic以及可能已过期的topic。
        // 再次将topic添加到ProducerMetadata元数据中,以确保包含该topic并请求元数据更新,因为这些消息要需要发送到对应主题。
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic, now);

        log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
            result.unknownLeaderTopics);
        this.metadata.requestUpdate();
    }

    // 移除还没有准备好的node节点
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            //仅更新延迟统计的readyTimeMs,以便每次批处理就绪时都向前移动(那么readyTimeM和drainTimeMs之间的差异将表示数据等待节点的时间)。
            this.accumulator.updateNodeLatencyStats(node.id(), now, false);
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
        } else {
            
            //更新readyTimeMs和drainTimeMs,这将“重置”节点延迟。
            this.accumulator.updateNodeLatencyStats(node.id(), now, true);
        }
    }

  
    // 创建produce请求,加入到Map<TopicPartition, List<ProducerBatch>> inFlightBatches中
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    addToInflightBatches(batches);

    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        // 将所有排水分区静音
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    // 重置消息累加器中下一批次的过期时间
    accumulator.resetNextBatchExpiryTime();
    // 获取已达到超时待发送的in-flight batches 数据
    List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
    // 获取累积器中放置时间过长且需要过期的批次列表。
    List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
    expiredBatches.addAll(expiredInflightBatches);

   
    //如果之前已将过期批次发送给代理,请重置生产者id。同时更新过期批次的指标。请参阅@TransactionState.resetIdempotentProducerId的文档,以了解为什么需要在此处重置生产者id。
    if (!expiredBatches.isEmpty())
        log.trace("Expired {} batches in accumulator", expiredBatches.size());
    for (ProducerBatch expiredBatch : expiredBatches) {
        String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
            + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
        failBatch(expiredBatch, new TimeoutException(errorMessage), false);
        if (transactionManager != null && expiredBatch.inRetry()) {
            // This ensures that no new batches are drained until the current in flight batches are fully resolved.
            //这可确保在当前in-flight batches 完全解决之前,不会排出新批次。
            transactionManager.markSequenceUnresolved(expiredBatch);
        }
    }
    sensors.updateProduceRequestMetrics(batches);

    
    //如果我们有任何节点准备发送+具有可发送数据,则轮询0超时,这样可以立即循环并尝试发送更多数据。
    // 否则,超时将是下一批到期时间与检查数据可用性的延迟时间之间的较小值。
    // 请注意,由于延迟、后退等原因,节点可能有尚未发送的数据。这特别不包括具有尚未准备好发送的可发送数据的节点,因为它们会导致繁忙的循环。
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
    pollTimeout = Math.max(pollTimeout, 0);
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        
        //如果某些分区已经准备好发送,则选择时间将为0;
        // 否则,如果某个分区已经积累了一些数据,但还没有准备好,则选择时间将是现在与其延迟到期时间之间的时间差;
        // 否则,选择时间将是现在与元数据到期时间之间的时间差;

        pollTimeout = 0;
    }
    // 消息请求发送
    sendProduceRequests(batches, now);
    return pollTimeout;
}

3.3.1 获取当前运行并且未阻塞的Kafka集群元数据信息

Cluster cluster = metadata.fetch();

 

3.3.2 从RecordAccumulator 消息累加器中,获取到准备发送数据的分区集合

RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

 

3.3.3 如果准备发送的分区集合中,有分区的leader出现未知情况。

未知leader的主题集:包含leader选举待定的topic以及可能已过期的topic。 再次将topic添加到ProducerMetadata元数据中,以确保包含该topic并请求元数据更新,因为这些消息要需要发送到对应主题。

通过调用ProducerMetadata 元数据的requestUpdate()方法打上needFullUpdate标识,标识Kafka集群需要进行强制更新。

// 如果存在分区的leader未知,则强制更新元数据
if (!result.unknownLeaderTopics.isEmpty()) {
    
    // 再次将topic添加到ProducerMetadata元数据中,以确保包含该topic并请求元数据更新,因为这些消息要需要发送到对应主题。
    for (String topic : result.unknownLeaderTopics)
        this.metadata.add(topic, now);

    log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
        result.unknownLeaderTopics);
    
    this.metadata.requestUpdate();
}

 

3.3.4 移除未准备好的node节点

  • 检查当前 KafkaProducer 是否与目标 Node 建立了网络连接,如果没有建立,则尝试初始化网络连接,如果初始化失败,则直接返回 false,表示此时不适合向该 Node 发送请求。
  • 其次就是检查当前已发送但未响应的请求是否已经达到上限,要是有很多这种请求存在,可能是 broker 宕机了或是 broker 处理能力不足,此时也不适合继续发送请求。
  • 除了进行网络方面的检查之外,还会检查 kafka 元数据是否需要更新,如果需要更新的话,也不能发送请求。毕竟使用过期的或是错误的元数据来发送数据,请求也不会发送成功

不适合发送请求的 Node 节点会从 readyNodes 集合中删除。

// RecordAccumulator.ReadyCheckResult 获取到所有待发送的节点
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
    Node node = iter.next();
		// 通过KafkaClient进行检查,node节点是否已经准备好
    if (!this.client.ready(node, now)) {
        
        //仅更新延迟统计的readyTimeMs,以便每次批处理就绪时都向前移动(那么readyTimeM和drainTimeMs之间的差异将表示数据等待节点的时间)。
        this.accumulator.updateNodeLatencyStats(node.id(), now, false);
        iter.remove();
        notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
    } else {
        // Update both readyTimeMs and drainTimeMs, this would "reset" the node
        // latency.
        //更新readyTimeMs和drainTimeMs,这将“重置”节点延迟。
        this.accumulator.updateNodeLatencyStats(node.id(), now, true);
    }
}

 

3.3.5 创建produce请求,将ProducerBatch加入到InFlightBatches中

a、调用记录累加器RecordAccumlator.drain(),获取当前可以集群下的所有数据,并将它们整理成一个批次列表

b、将消息将入到Map<TopicPartition, List<ProducerBatch>> inFlightBatches 的队列中,记录正在发送的这一批数据

c、如果需要进行顺序消息发送,则将通过数据将ProducerBatch的主题信息记录到RecordAccumlator 的Set<TopicPartition> muted


//获当前可以集群下的所有数据,并将它们整理成一个批次列表
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
// 加入到InFlightBatch队列中
addToInflightBatches(batches);

if (guaranteeMessageOrder) {
    // 保证消息顺序发送
    // 将所有批次的主题加入到Set<TopicPartition> muted中
    for (List<ProducerBatch> batchList : batches.values()) {
        for (ProducerBatch batch : batchList)
            this.accumulator.mutePartition(batch.topicPartition);
    }
}

3.3.6 获取已经超时待发送的消息数据

a、重置消息累加器中下一批次的过期时间

b、获取已达到超时待发送的in-flight batches 批次数据

c、获取累积器中放置时间过长且需要过期的批次数据。

d、将超时待发送的批次数据进行汇总

// 重置消息累加器中下一批次的过期时间
accumulator.resetNextBatchExpiryTime();
// 获取已达到超时待发送的in-flight batches 数据
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
// 获取累积器中放置时间过长且需要过期的批次列表。
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
expiredBatches.addAll(expiredInflightBatches);

 

3.3.7 处理已超时的消息批次,通知该批消息发送失败

//如果之前已将过期批次发送给代理,请重置生产者id。同时更新过期批次的指标。请参阅@TransactionState.resetIdempotentProducerId的文档,以了解为什么需要在此处重置生产者id。
if (!expiredBatches.isEmpty())
    log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
    String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
        + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
    failBatch(expiredBatch, new TimeoutException(errorMessage), false);
    if (transactionManager != null && expiredBatch.inRetry()) {
        // This ensures that no new batches are drained until the current in flight batches are fully resolved.
        //这可确保在当前in-flight batches 完全解决之前,不会排出新批次。
        transactionManager.markSequenceUnresolved(expiredBatch);
    }
}
// 收集统计指标
sensors.updateProduceRequestMetrics(batches);

 

3.3.8 构建请求,发送数据

//如果我们有任何节点准备发送+具有可发送数据,则轮询0超时,这样可以立即循环并尝试发送更多数据。
// 否则,超时将是下一批到期时间与检查数据可用性的延迟时间之间的较小值。
// 请注意,由于延迟、后退等原因,节点可能有尚未发送的数据。这特别不包括具有尚未准备好发送的可发送数据的节点,因为它们会导致繁忙的循环。
// 设置下一次的发送延时
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
pollTimeout = Math.max(pollTimeout, 0);
if (!result.readyNodes.isEmpty()) {
    log.trace("Nodes with data ready to send: {}", result.readyNodes);
    
    //如果某些分区已经准备好发送,则选择时间将为0;
    // 否则,如果某个分区已经积累了一些数据,但还没有准备好,则选择时间将是现在与其延迟到期时间之间的时间差;
    // 否则,选择时间将是现在与元数据到期时间之间的时间差;

    pollTimeout = 0;
}

sendProduceRequests(batches, now);

 

3.4 Sender线程sendProduceRequest()的核心逻辑梳理

/**
* 从给定的记录批次创建生产请求
*/
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
  if (batches.isEmpty())
      return;

  final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

  // 找到创建记录集时使用的最小魔法版本
  byte minUsedMagic = apiVersions.maxUsableProduceMagic();
  for (ProducerBatch batch : batches) {
      if (batch.magic() < minUsedMagic)
          minUsedMagic = batch.magic();
  }
  // ProduceRequestData 
	ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
  
	for (ProducerBatch batch : batches) {
      TopicPartition tp = batch.topicPartition;
      MemoryRecords records = batch.records();

      //如有必要,向下转换到使用的最小魔法的消息格式。
      // 通常情况下生产者开始构建批次以及我们发送请求的时间选择了基于过时元数据的消息格式。
      // 在最坏的情况下,如果选择使用新的消息格式,但发现代理不支持它,所系需要在客户端发送之前进行消息格式的向下转换。
      // 这旨在处理围绕集群升级的边缘情况,其中代理可能并非所有消息都支持相同的消息格式版本。
      // 例如,如果分区从代理迁移它支持新的魔法版本,但不支持,就需要转换。
      if (!records.hasMatchingMagic(minUsedMagic))
          records = batch.records().downConvert(minUsedMagic, 0, time).records();
      ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
      if (tpData == null) {
          tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic());
          tpd.add(tpData);
      }
      tpData.partitionData().add(new ProduceRequestData.PartitionProduceData()
              .setIndex(tp.partition())
              .setRecords(records));
      recordsByPartition.put(tp, batch);
  }

  String transactionalId = null;
  if (transactionManager != null && transactionManager.isTransactional()) {
      transactionalId = transactionManager.transactionalId();
  }

  ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic,
          new ProduceRequestData()
                  .setAcks(acks)
                  .setTimeoutMs(timeout)
                  .setTransactionalId(transactionalId)
                  .setTopicData(tpd));
  RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());

  // 创建clientRequest请求
  String nodeId = Integer.toString(destination);
  ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
          requestTimeoutMs, callback);
	// 通过KafkaClient的实现类NetworkClient 调用发送
  client.send(clientRequest, now);
  log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}

说明:源码分析基于Kafak 3.3版本 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/3663.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

NeurIPS 2022 | 涨点神器!利用图像辅助三维点云分析的训练新范式

原文链接&#xff1a;https://www.techbeat.net/article-info?id4212 作者&#xff1a;颜旭 点云作为一种基本的三维表征形式&#xff0c;活跃在自动驾驶、机器人感知等多种任务上。尽管三维点云分析在近年来取得了良好的发展&#xff0c;但由于点云其本身往往是无序、无纹理以…

OpenCV笔记--人脸识别算法Eigenfaces和Fisherfaces

目录 1--前言 2--处理ORL数据集 3--Eigenfaces复现过程 4--Fisherfaces复现过程 5--分析 1--前言 ①SYSU模式识别课程作业 ②配置&#xff1a;基于Windows11、OpenCV4.5.5、VSCode、CMake&#xff08;参考OpenCV配置方式&#xff09; ③原理及源码介绍&#xff1a;Face…

云栖探馆!云小宝首秀遇上老司机小龙,猜猜谁赢了?

为啥人人都喜欢秋天呢&#xff1f;因为我们打喷嚏都是“爱秋”啊&#xff5e; 为啥大家会来云栖大会呢&#xff1f;因为云栖大会让我们在一“栖”啊&#xff5e; 2022云栖大会龙蜥峰会&#xff0c;小龙来啦~&#xff01; ​ 去年&#xff0c;小龙作为萌新来到云栖大会&#…

归并排序.

归并排序介绍 归并排序(MERGE-SORT)是利用归并的思想实现的排序方法,该算法采用金典的分治(divide-and-conquer)策略(分治法将问题分(divide)成一些小的问题然后递归求解,而治(conquer)的阶段则将分的阶段得到的各个答案"修补"在一起,即分而治之) 归并排序的思想示…

为什么程序员会秃头?盘点程序员糟心的几大因素

程序员与脱发似乎存在某种必然的逻辑连接&#xff0c;程序员秃头已经成为大家心中的思维定势。 一提到程序员&#xff0c;难免会想起来java&#xff0c; c&#xff0c; python以及无休止的debug环节&#xff0c;不难想象经常会有程序员跳楼自杀的情况。因为实在是生存不易&…

L5W1作业1 手把手实现循环神经网络

欢迎来到课程5的第一个作业&#xff01;在此作业中&#xff0c;你将使用numpy实现你的第一个循环神经网络。 循环神经网络&#xff08;RNN&#xff09;在解决自然语言处理和其他序列任务上非常有效&#xff0c;因为它们具有“记忆”&#xff0c;可以一次读取一个输入 x⟨t⟩x^…

C语言百日刷题第六天

C语言百日刷题第六天51.鸡兔同笼问题52.输出所有形如aabb的完全平方数53.3n1问题54.输出100~999的所有水仙花数55.韩信点兵56.倒三角形57.求子序列的和58.分数化小数59.开灯问题60.蛇形填数51.鸡兔同笼问题 分析&#xff1a;小学生数学问题。设鸡为a个&#xff0c;兔为b个&…

大数据技术基础实验十:Hive实验——新建Hive表

大数据技术基础实验十&#xff1a;Hive实验——新建Hive表 文章目录大数据技术基础实验十&#xff1a;Hive实验——新建Hive表一、前言二、实验目的三、实验要求四、实验原理五、实验步骤1、启动Hive2、创建表3、显示表4、显示表列5、更改表6、删除表或者列六、最后我想说一、前…

vue学习笔记——简单入门总结(三)

文章目录1.Vue的理解&#xff1a;1.1.mvvm模型&#xff1a;1.2.vue2的数据代理&#xff1a;1.3.vue2的生命周期&#xff1a;1.4.vue中的render函数&#xff1a;1.5. mixin混入&#xff1a;2.Vue组件间通信&#xff1a;2.0.props&#xff1a;2.1.全局事件总线&#xff1a;2.2.消…

WeMos Mini ESP32-S2FN4R2介绍

WeMos Mini ESP32-S2FN4R2介绍LOLIN S2 Mini V1.0.0 ESP32-S2 4MB FLASH 2MB PSRAM WIFI开发板 &#x1f33c;功能介绍 基于 ESP32-S2FN4R2TYPE-C USB27个数字输入/输出引脚&#xff0c;所有引脚都支持中断/pwm/I2C/单线ADC、DAC、I2C、SPI、UART、USB OTG &#x1f4cd; 详细…

棋盘(马蹄集)

棋盘 难度&#xff1a;白银 0时间限制&#xff1a;1秒 巴占用内存&#xff1a;64M 求一个N*N棋盘中的方块总数。 格式 输入格式&#xff1a;输入整型N 输出格式&#xff1a;输出整型 CSDN盛溪的猫 #include<bits/stdc.h> using namespace std; int main(){ long n,sum1;…

Mybatis要点总结

一、了解orm框架 1.什么是ORM框架&#xff1a;对象关系映射&#xff08;Object Relational Mapping&#xff0c;简称ORM&#xff09;&#xff0c;该模式是为了解决面向对象与关系数据库互补匹配的现象的技术&#xff1b;orm框架是连接数据库的桥梁&#xff0c;主要提供了人持久…

大数据技术之Zookeeper总结Ⅰ

zookeeper总结目录1. Zookeeper 入门1.1 zookeeper概述1.2 Zookeeper特点1.3 ZooKeeper 数据模型的结构2. Zookeeper 本地安装2.1 本地模式安装2.2 配置参数解读3. Zookeeper 集群操作3.1 集群操作3.2 Zookeeper 集群启动停止脚本3.3 客户端命令行语法1. Zookeeper 入门 1.1 z…

数据结构七:七大排序

目录 1&#xff1a;排序的概率 2.插入排序 2.1&#xff1a;直接插入排序-----稳定 2.1.1&#xff1a;基本思想 2.2&#xff1a;希尔排序 2.2.1&#xff1a;概念&#xff1a; 3.选择排序 3.1&#xff1a;选择排序 3.1.1&#xff1a;概念 3.2:堆排序 4.交换排序 4.1&…

微信小程序自动化测试之路

1. 前言 在每次发布新版本之前、都需要回归核心功能、已确保上线后小程序也能按照预期运行. 目前这部分回归工作是由测试同事手工去验证测试用例、按照每周一版本的迭代节奏、回归就花了测试挺多时间的. 最近前端工作比较轻松、故在思考能否把这部分重复的工作交给程序自动来进…

【EhCache: 一款Java的进程内缓存框架】EhCache 是什么、代码实战 Demo

文章目录1 EhCache 是什么2 代码实战 DemoTestEH.javaehcache.xml1 EhCache 是什么 Ehcache 是一种开源的、基于标准的缓存&#xff0c;可提高性能、卸载数据库并简化可扩展性。它是最广泛使用的基于 Java 的缓存&#xff0c;因为它健壮、经过验证、功能齐全&#xff0c;并且与…

python 基于PHP+MySQL的装修网站的设计与实现

至今为止,越来越多企业公司都已经实现了线上推广,提高了企业的运营工作效率,为装修公司设计一款强大的智能装修网,集企业信息展示和信息管理于一体,结合企业与外部的在线交流功能,主要用于大力宣传企业服务、企业产品等信息,让更多的人了解,提高企业的知名度 1&#xff1a;系统…

Spring Data Neo4j(1.对象映射)

文章目录一、Spring Data Neo4j二、注释NodeIdVersion(乐观锁)PropertyRelationship一、Spring Data Neo4j Spring Data Neo4j或简称SDN是下一代Spring Data模块&#xff0c;由Neo4j&#xff0c;Inc.创建和维护。与VMware的Spring Data Team密切合作。 它支持所有官方支持的Ne…

<Linux> shell运行原理及Linux权限的理解

文章目录一、shell 命令及其运行原理shell外壳shell运行原理二、Linux 权限的概念1.用户分类2.切换用户3.用户提权三、Linux 权限管理1.文件访问者的分类&#xff08;人&#xff09;2.文件类型和访问权限&#xff08;事物属性&#xff09;四、文件权限值的表示方法1.字符表示法…

Pycharm 配置远程SSH服务器环境(切换不同虚拟环境)

1.首先在Xshell上通过conda创建新的虚拟环境 2.此时在 /home/y210101004/.conda/envs下多了刚刚创建的环境的文件夹 3.路径说明&#xff01; &#xff08;注意&#xff01;&#xff09;该环境的编译器python3.6就在.../jiayan_test/bin里面 &#xff08;注意&#xff01;&…