kafka-Producer Sender 源码分析

news2025/1/19 11:33:55

说明

  1. 本文基于 kafka 2.7 编写。
  2. @author blog.jellyfishmix.com / JellyfishMIX - github
  3. LICENSE GPL-2.0

Sender 类属性

public class Sender implements Runnable {

    private final Logger log;

    /**
     * Sender 具体用的是 KafkaClient 接口的实现类 NetworkClient, 为 Sender 提供了网络 IO 的能力
     */
    /* the state of each nodes connection */
    private final KafkaClient client;

    /**
     * RecordAccumulator, 可以获取待发送的 node 和此 node 对应待发送的消息
     */
    /* the record accumulator that batches records */
    private final RecordAccumulator accumulator;

    /**
     * MetaData 接口的实现类, 生产者元数据。存储着分区 leader 所在 node,node 的地址, topicPartition 等情况
     */
    /* the metadata for the client */
    private final ProducerMetadata metadata;

    /**
     * 是否保证消息在服务端的顺序性
     */
    /* the flag indicating whether the producer should guarantee the message order on the broker or not. */
    private final boolean guaranteeMessageOrder;

    /**
     * int 类型。请求的最大字节数,默认值是 1M
     */
    /* the maximum request size to attempt to send to the server */
    private final int maxRequestSize;

    /**
     * producer 的消息发送确认机制
     * ack 有 3 个枚举值,分别是 1, 0 和 -1, 默认值是 -1。ack 枚举值的含义:
     * 1) ack=1, producer 只要收到 leader 副本写入成功的响应就认为推送成功了。
     * 2)ack=0,producer 发送请求了就认为推送成功,不管实际是否推送成功。
     * 3)ack=-1,producer 只有收到 partition 内所有副本写入成功通知才认为推送消息成功了。
     */
    /* the number of acknowledgements to request from the server */
    private final short acks;

    /**
     * 生产者发送失败后的重试次数。默认是 0 次
     */
    /* the number of times to retry a failed request before giving up */
    private final int retries;

    /* the clock instance used for getting the time */
    private final Time time;

    /**
     * Sender 线程是否在运行中
     */
    /* true while the sender thread is still running */
    private volatile boolean running;

    /* true when the caller wants to ignore all unsent/inflight messages and force close.  */
    private volatile boolean forceClose;

    /* metrics */
    private final SenderMetrics sensors;

    /**
     * producer 发送请求后等待 broker 响应的最大时间
     * 过了最大响应时间如果配置了重试,生产者会再次发送这个请求。重试次数用完仍然请求超时, 则认为是请求失败
     * 默认值 30,000,即 30 秒。
     */
    /* the max time to wait for the server to respond to the request*/
    private final int requestTimeoutMs;

    /**
     * 请求失败重发的间隔等待时间
     * producer 发送请求失败后可能会引起重新发送失败的请求,间隔时间目的是防止重发过快造成服务端压力过大
     * 默认是 100
     */
    /* The max time to wait before retrying a request which has failed */
    private final long retryBackoffMs;

    /**
     * ApiVersions,内部保存了每个 node 支持的 api 版本
     */
    /* current request API versions supported by the known brokers */
    private final ApiVersions apiVersions;

    /* all the state related to transactions, in particular the producer id, producer epoch, and sequence numbers */
    private final TransactionManager transactionManager;

    /**
     * 发送中的请求。key: TopicPartition,value: List<ProducerBatch>
     */
    // A per-partition queue of batches ordered by creation time for tracking the in-flight batches
    private final Map<TopicPartition, List<ProducerBatch>> inFlightBatches;
    
    // ...
}

消息的发送

发送请求分为两步。

  1. 第一步是消息预发送,Sender 从 RecordAccumulator 拉取要发送的消息集合,封装成 ClientRequest,传递给 NetworkClient。
    1. NetworkClient 首先根据 ClientRequest 构造 InFlightRequest,InFlightRequest 表示已发送但还未收到响应的请求。然后根据收到的 ClientRequest 构造 NetworkSend 类对象,放入到 KafkaChannel 的缓存里,消息预发送结束。
  2. 第二步是真正的网络 IO,Sender 会调用 Selector#poll 方法, 把请求真正发送到 broker node。

run 方法

org.apache.kafka.clients.producer.internals.Sender#run

org.apache.kafka.clients.producer.internals.Sender#runOnce

实现了 Runnable 接口的 run 方法。run 方法会一直循环调用 runOnce 方法。

runOnce 方法主要逻辑:

  1. 把消息传递给 KafkaChanel 缓存。
  2. 执行网络 IO。
    /**
     * The main run loop for the sender thread
     */
    @Override
    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        while (running) {
            try {
                runOnce();
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        // ...
    }

    /**
     * Run a single iteration of sending
     *
     */
    void runOnce() {
		// ...

        long currentTimeMs = time.milliseconds();
        // 把消息传递给 KafkaChanel 缓存
        long pollTimeout = sendProducerData(currentTimeMs);
        // 执行网络 IO
        client.poll(pollTimeout, currentTimeMs);
    }

sendProducerData 方法 – 消息预发送

org.apache.kafka.clients.producer.internals.Sender#sendProducerData

  1. 获取元数据。
  2. 检查已经准备好的节点。
  3. 如果不存在任何 leaderPartition, 就更新元数据。
  4. 检查客户端和各 node 间连接是否正常。
  5. 把按分区聚合的请求集合, 转换为按节点聚合的请求集合(因为网络 IO 是按节点发请求)。
    1. A67DB93A-3B3B-42EB-998D-A3E4B6D5790B.png
    2. 如图所示, 假设有两个 node(两台 broker 实例), 某个 topic 有 6 个 partition,每个 node 分配了 3 个 partition。如果按 partition 发送有 6 个请求,按 node 发送有 2 个请求。按 node 发送可以减小网络 IO 的开销。
  6. 收集过期的 batch, Sender#inflightBatches 发送中的请求集合里过期的 batch, RecordAccumulator#batches 集合里过期的 batch。处理过期 batch。
  7. 预发送消息。
    /**
     * 消息预发送, 把消息传递给 KafkaChanel 缓存
     */
	private long sendProducerData(long now) {
        // 获取元数据
        Cluster cluster = metadata.fetch();
        // get the list of partitions with data ready to send
        // 检查已经准备好的节点
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // 如果不存在任何 leaderPartition, 就更新元数据
        // if there are any partitions whose leaders are not known yet, force metadata update
        if (!result.unknownLeaderTopics.isEmpty()) {
            // The set of topics with unknown leader contains topics with leader election pending as well as
            // topics which may have expired. Add the topic again to metadata to ensure it is included
            // and request metadata update, since there are messages to send to the topic.
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic, now);

            log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
                result.unknownLeaderTopics);
            this.metadata.requestUpdate();
        }

        // remove any nodes we aren't ready to send to
        // 检查客户端和各 node 间连接是否可用
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
            }
        }

        // create produce requests
        // 把按分区聚合的请求集合, 转换为按节点聚合的请求集合(因为网络 IO 是按节点发请求)
        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
        addToInflightBatches(batches);
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<ProducerBatch> batchList : batches.values()) {
                for (ProducerBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

        accumulator.resetNextBatchExpiryTime();
        // 收集过期的 batch
        // Sender#inflightBatches 发送中的请求集合里过期的 batch
        List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
        // RecordAccumulator#batches 集合里过期的 batch
        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
        expiredBatches.addAll(expiredInflightBatches);

        // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
        // for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
        // we need to reset the producer id here.
        // 处理过期 batch
        if (!expiredBatches.isEmpty())
            log.trace("Expired {} batches in accumulator", expiredBatches.size());
        for (ProducerBatch expiredBatch : expiredBatches) {
            String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
                + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
            failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
            if (transactionManager != null && expiredBatch.inRetry()) {
                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
                transactionManager.markSequenceUnresolved(expiredBatch);
            }
        }
        sensors.updateProduceRequestMetrics(batches);

        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
        // time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
        // sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
        // that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
        pollTimeout = Math.max(pollTimeout, 0);
        if (!result.readyNodes.isEmpty()) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            // if some partitions are already ready to be sent, the select time would be 0;
            // otherwise if some partition already has some data accumulated but not ready yet,
            // the select time will be the time difference between now and its linger expiry time;
            // otherwise the select time will be the time difference between now and the metadata expiry time;
            pollTimeout = 0;
        }
        // 预发送消息
        sendProduceRequests(batches, now);
        return pollTimeout;
    }

getExpiredInflightBatches 方法 – 收集过期的 batch

org.apache.kafka.clients.producer.internals.Sender#getExpiredInflightBatches

  1. 遍历 inFlightBatches,遍历当前 partition 的 batches 列表。
  2. 判断 batch 是否投递超时。默认消息投递过期时间是 2 min。
    1. 如果 batch 超时且没有 done 的状态,就把 batch 加入到 expiredBatches 集合。
    2. 如果 batch 没有超时,则更新下一个 batch 的超时时间。
	private List<ProducerBatch> getExpiredInflightBatches(long now) {
        List<ProducerBatch> expiredBatches = new ArrayList<>();

        // 遍历 inFlightBatches
        for (Iterator<Map.Entry<TopicPartition, List<ProducerBatch>>> batchIt = inFlightBatches.entrySet().iterator(); batchIt.hasNext();) {
            Map.Entry<TopicPartition, List<ProducerBatch>> entry = batchIt.next();
            List<ProducerBatch> partitionInFlightBatches = entry.getValue();
            if (partitionInFlightBatches != null) {
                // 遍历当前 partition 的 batches 列表
                Iterator<ProducerBatch> iter = partitionInFlightBatches.iterator();
                while (iter.hasNext()) {
                    ProducerBatch batch = iter.next();
                    // 判断 batch 是否投递超时。默认消息投递过期时间是 2 min
                    if (batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now)) {
                        iter.remove();
                        // expireBatches is called in Sender.sendProducerData, before client.poll.
                        // The !batch.isDone() invariant should always hold. An IllegalStateException
                        // exception will be thrown if the invariant is violated.
                        // 如果 batch 没有 done 的状态,就把 batch 加入到 expiredBatches 集合
                        if (!batch.isDone()) {
                            expiredBatches.add(batch);
                        } else {
                            throw new IllegalStateException(batch.topicPartition + " batch created at " +
                                batch.createdMs + " gets unexpected final state " + batch.finalState());
                        }
                    } else {
                        // 更新下一个 batch 的超时时间
                        accumulator.maybeUpdateNextBatchExpiryTime(batch);
                        break;
                    }
                }
                if (partitionInFlightBatches.isEmpty()) {
                    batchIt.remove();
                }
            }
        }
        return expiredBatches;
    }

failBatch 方法 – 触发回调并改变 future 状态

org.apache.kafka.clients.producer.internals.Sender#failBatch(org.apache.kafka.clients.producer.internals.ProducerBatch, long, long, java.lang.RuntimeException, boolean)

batch.done 调用了里面的回调方法,然后删除 batch 并释放 batch 占用的空间。

	private void failBatch(ProducerBatch batch,
                           long baseOffset,
                           long logAppendTime,
                           RuntimeException exception,
                           boolean adjustSequenceNumbers) {
        if (transactionManager != null) {
            transactionManager.handleFailedBatch(batch, exception, adjustSequenceNumbers);
        }

        this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);

        // batch.done 触发回调并改变 future 状态,然后删除 batch 并释放 batch 占用的空间
        if (batch.done(baseOffset, logAppendTime, exception)) {
            maybeRemoveAndDeallocateBatch(batch);
        }
    }

Sender#sendProduceRequest 方法 – 预发送消息

org.apache.kafka.clients.producer.internals.Sender#sendProduceRequest

预发送消息, 模型转换, 把 ProducerBatch 转换成 ClientRequest, 并把 ClientRequest 传递到 KafkaChannel 的缓存中。

  1. 初始化两个集合, produceRecordsByPartition 用于构建 ProducerRequest, recordsByPartition 用于构建 callback。
  2. 按分区填充 produceRecordsByPartition 和 recordsByPartition 两个集合。
  3. 构建 ProducerRequestBuilder, 构建 producerRequest 的 callback, 封装成 ClientRequest。
  4. 调用 NetworkClient#send 方法预发送消息, 把 ClientRequest 传递给 NetworkClient(传递到 KafkaChannel 的缓存中)。
    /**
     * Create a produce request from the given record batches
     *
     * 预发送消息
     * 模型转换, 把 ProducerBatch 转换成 ClientRequest, 并把 ClientRequest 传递到 KafkaChannel 的缓存中
     */
    private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
        if (batches.isEmpty())
            return;

        // 初始化两个集合, produceRecordsByPartition 用于构建 ProducerRequest, recordsByPartition 用于构建 callback
        Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
        final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

        // find the minimum magic version used when creating the record sets
        byte minUsedMagic = apiVersions.maxUsableProduceMagic();
        for (ProducerBatch batch : batches) {
            if (batch.magic() < minUsedMagic)
                minUsedMagic = batch.magic();
        }

        // 按分区填充 produceRecordsByPartition 和 recordsByPartition 两个集合
        for (ProducerBatch batch : batches) {
            TopicPartition tp = batch.topicPartition;
            MemoryRecords records = batch.records();

            // down convert if necessary to the minimum magic used. In general, there can be a delay between the time
            // that the producer starts building the batch and the time that we send the request, and we may have
            // chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
            // the new message format, but found that the broker didn't support it, so we need to down-convert on the
            // client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
            // not all support the same message format version. For example, if a partition migrates from a broker
            // which is supporting the new magic version to one which doesn't, then we will need to convert.
            if (!records.hasMatchingMagic(minUsedMagic))
                records = batch.records().downConvert(minUsedMagic, 0, time).records();
            produceRecordsByPartition.put(tp, records);
            recordsByPartition.put(tp, batch);
        }

        String transactionalId = null;
        if (transactionManager != null && transactionManager.isTransactional()) {
            transactionalId = transactionManager.transactionalId();
        }
        // 构建 ProducerRequestBuilder
        ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
                produceRecordsByPartition, transactionalId);
        // 构建 producerRequest 的 callback
        RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());

        String nodeId = Integer.toString(destination);
        // 构建 ClientRequest
        ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
                requestTimeoutMs, callback);
        // 预发送消息, 把 ClientRequest 传递给 NetworkClient(传递到 KafkaChannel 的缓存中)
        client.send(clientRequest, now);
        log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
    }

处理消息的响应

Sender#handleProduceResponse 方法 – 处理 ProduceRequest 的响应

org.apache.kafka.clients.producer.internals.Sender#handleProduceResponse

  1. 一个 response 是某一个 node 发给 client 的,一个 node 每次向 client 发送的 response 也是批量的,一个 response 有可能包含多个 partition 的响应信息。

  2. Sender 收到 response 后会根据结果按情况处理, 处理方法是 completeBatch()。

  3. Sender 需要触发 callback, callback 在构建 ClientRequest 时填充了。

    /**
     * Handle a produce response
     *
     * 处理 ClientResponse
     */
    private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
        RequestHeader requestHeader = response.requestHeader();
        int correlationId = requestHeader.correlationId();
        // 连接失败
        if (response.wasDisconnected()) {
            log.trace("Cancelled request with header {} due to node {} being disconnected",
                requestHeader, response.destination());
            for (ProducerBatch batch : batches.values())
                completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now);
            // 处理版本不匹配
        } else if (response.versionMismatch() != null) {
            log.warn("Cancelled request {} due to a version mismatch with node {}",
                    response, response.destination(), response.versionMismatch());
            for (ProducerBatch batch : batches.values())
                completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.UNSUPPORTED_VERSION), correlationId, now);
            // 处理正常 response
        } else {
            log.trace("Received produce response from node {} with correlation id {}", response.destination(), correlationId);
            // if we have a response, parse it
            // 存在 response
            if (response.hasResponse()) {
                ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
                for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
                    TopicPartition tp = entry.getKey();
                    ProduceResponse.PartitionResponse partResp = entry.getValue();
                    ProducerBatch batch = batches.get(tp);
                    // 调用 completeBatch 方法处理
                    completeBatch(batch, partResp, correlationId, now);
                }
                this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
            } else {
                // this is the acks = 0 case, just complete all requests
                // response ack=0 时的处理
                for (ProducerBatch batch : batches.values()) {
                    completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now);
                }
            }
        }
    }

Sender#completeBatch 方法 – 处理 response 的状态

org.apache.kafka.clients.producer.internals.Sender#completeBatch(org.apache.kafka.clients.producer.internals.ProducerBatch, org.apache.kafka.common.requests.ProduceResponse.PartitionResponse, long, long)

  1. 过长的单条消息,会把单条消息分成多个 batch 发送。
  2. 如果存在错误,能否再次发送, 可以的话则入队 batch。不能再次发送则进行不同错误情况的处理。
    1. 重复发送, 不用做任何处理。
    2. 授权失败等其他异常,统一调用 failBatch 处理。
  3. 没有错误正常执行回调方法, 并释放 accumulator 的空间。
    private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                               long now) {
        Errors error = response.error;
        // 过长的单条消息,会把单条消息分成多个 batch 发送
        if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 && !batch.isDone() &&
                (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
            // If the batch is too large, we split the batch and send the split batches again. We do not decrement
            // the retry attempts in this case.
            log.warn(
                "Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}",
                correlationId,
                batch.topicPartition,
                this.retries - batch.attempts(),
                error);
            if (transactionManager != null)
                transactionManager.removeInFlightBatch(batch);
            this.accumulator.splitAndReenqueue(batch);
            maybeRemoveAndDeallocateBatch(batch);
            this.sensors.recordBatchSplit();
            // 如果存在错误
        } else if (error != Errors.NONE) {
            // 能否再次发送, 可以的话则入队 batch
            if (canRetry(batch, response, now)) {
                log.warn(
                    "Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
                    correlationId,
                    batch.topicPartition,
                    this.retries - batch.attempts() - 1,
                    error);
                reenqueueBatch(batch, now);
                // 重复发送, 不用做任何处理
            } else if (error == Errors.DUPLICATE_SEQUENCE_NUMBER) {
                // If we have received a duplicate sequence error, it means that the sequence number has advanced beyond
                // the sequence of the current batch, and we haven't retained batch metadata on the broker to return
                // the correct offset and timestamp.
                //
                // The only thing we can do is to return success to the user and not return a valid offset and timestamp.
                completeBatch(batch, response);
            } else {
                final RuntimeException exception;
                // topic 授权失败
                if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
                    exception = new TopicAuthorizationException(Collections.singleton(batch.topicPartition.topic()));
                // cluster 授权失败
                else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
                    exception = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends");
                else
                    exception = error.exception(response.errorMessage);
                // tell the user the result of their request. We only adjust sequence numbers if the batch didn't exhaust
                // its retries -- if it did, we don't know whether the sequence number was accepted or not, and
                // thus it is not safe to reassign the sequence.
                // 授权失败等其他异常,统一调用 failBatch 处理
                failBatch(batch, response, exception, batch.attempts() < this.retries);
            }
            // metadata 无效或错误, 更新 metadata
            if (error.exception() instanceof InvalidMetadataException) {
                if (error.exception() instanceof UnknownTopicOrPartitionException) {
                    log.warn("Received unknown topic or partition error in produce request on partition {}. The " +
                            "topic-partition may not exist or the user may not have Describe access to it",
                        batch.topicPartition);
                } else {
                    log.warn("Received invalid metadata error in produce request on partition {} due to {}. Going " +
                            "to request metadata update now", batch.topicPartition, error.exception(response.errorMessage).toString());
                }
                metadata.requestUpdate();
            }
        } else {
            // 正常执行回调方法
            completeBatch(batch, response);
        }

        // Unmute the completed partition.
        if (guaranteeMessageOrder)
            this.accumulator.unmutePartition(batch.topicPartition);
    }

	private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
        if (transactionManager != null) {
            transactionManager.handleCompletedBatch(batch, response);
        }
        // 执行回调方法,并释放 accumulator 的空间
        if (batch.done(response.baseOffset, response.logAppendTime, null)) {
            maybeRemoveAndDeallocateBatch(batch);
        }
    }

Sender#canRetry 方法 – response 存在错误判断是否能再次发送

org.apache.kafka.clients.producer.internals.Sender#canRetry

response 存在错误判断是否能再次发送, 需要满足以下条件:

  1. 没有到投递的超时时间。
  2. batch 重试次数没有超过设定的次数。
  3. batch 状态未结束。
  4. 如果被事务管理器管理, 则调用事务管理器判断是否能重试。
    private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response, long now) {
        // 没有到投递的超时时间
        return !batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now) &&
            // batch 重试次数没有超过设定的次数
            batch.attempts() < this.retries &&
            // batch 状态未结束
            !batch.isDone() &&
            // 如果被事务管理器管理, 则调用事务管理器判断是否能重试
            (transactionManager == null ?
                    response.error.exception() instanceof RetriableException :
                    transactionManager.canRetry(response, batch));
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/532447.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【算法】经典背包问题

作者&#xff1a;指针不指南吗 专栏&#xff1a;算法篇 &#x1f43e;或许会很慢&#xff0c;但是不可以停下来&#x1f43e; 文章目录 引入Dp1.01背包2.完全背包3.多重背包4.分组背包 acwing 背包问题——学习笔记 01背包、完全背包、多重背包、分组背包 引入Dp Dp问题&#…

[SUCTF 2018]GetShell

有个文件上传&#xff0c;给了部分源码 if($contentsfile_get_contents($_FILES["file"]["tmp_name"])){$datasubstr($contents,5);foreach ($black_char as $b) {if (stripos($data, $b) ! false){die("illegal char");}} } 可以知道有…

黑白照片如何变彩色?黑白照变彩色的秘诀分享。​

黑白照片如何变彩色&#xff1f;将黑白照片变成彩色可以给照片增添生动的视觉效果和真实感&#xff0c;使得人物、场景更加具体形象&#xff0c;让人们更容易与之产生共鸣和情感联系&#xff0c;此外&#xff0c;通过给黑白老照片添加颜色&#xff0c;还可以打破时间和空间的限…

社区分享|JumpServer引领我走向开源天地

编者注&#xff1a;以下内容基于山东青岛的JumpServer社区用户JonnyJ的社区分享整理而成。 “接触到JumpServer之后&#xff0c;我从一个开源受益者逐渐成长为开源的贡献者。其实我们每个人都可以成为开源贡献者&#xff0c;不局限于软件产品&#xff0c;哪怕只是你的一段共享…

K8S集群+kubeadm+flannel+docker+harbor实例

目录 第一章.环境准备 1.1.部署架构图 1.2.节点要求 1.3.部署软件 1.4.修改主机名 1.5.所有节点修改hosts文件 1.6.关闭防火墙规则&#xff0c;关闭selinux&#xff0c;关闭swap交换 1.7.调整内核参数 第二章.部署K8S集群 2.1.所有节点安装docker 2.2.所有节点安装ku…

KVM管理-快照

KVM管理-快照 创建快照 为虚拟机vm1创建一个快照 [rootmyserver ~]# virsh snapshot-create-as vm1 vm1.snap Domain snapshot vm1.snap created快照只能使用qcow2创建&#xff0c;raw格式一般无法创建快照 查看磁盘镜像信息 [rootmyserver ~]# qemu-img info /var/lib/lib…

方案设计——食物测温仪方案

食物测温仪&#xff0c;在食物烹饪时&#xff0c;温度和时间至关重要&#xff0c;所以食物测温仪孕育而生&#xff0c;当用户使用时只需将食物测温仪的探头插入食物中&#xff0c;即刻能得到当前食物温度数据&#xff0c;不必用经验判断。做为一款食物测温仪&#xff0c;运用场…

Spring Boot :统一功能处理

在用户登陆验证的业务中&#xff0c;如果只是使用Spring AOP的话&#xff0c;session无法获取的&#xff0c;还有各种参数&#xff08;request等&#xff09;很难获取&#xff0c;这时候Spring拦截器就发挥了重大的作用了。 1.Spring 拦截器 创建拦截器分俩步&#xff1a;1.创…

项目集效益管理

项目集效益管理是定义、创建、最大化和交付项目集所提供的效益的绩效领域。 本章内容包括&#xff1a; 1 效益识别 2 效益分析和规划 3 效益交付 4 效益移交 5 效益维持 项目集效益管理包括一系列对项目集的成功极为重要的要素。项目集效益管理包括阐明项目集的 计划效益和预期…

AMBER分子动力学模拟之结果分析(最低能量结果)-- HIV蛋白酶-抑制剂复合物(3)

AMBER分子动力学模拟之结果分析(最低能量结果)-- HIV蛋白酶-抑制剂复合物(3) 在analysis目录下 解析.out文件 下载process_mdout.perl 脚本 perl process_mdout.perl ../md/md0.out ../md/md1.out ../md/md2.out # 可以不使用md0.out # 或者 $AMBERHOME/bin/process_md…

ShardingSphere 5.3 系列ShardingSphere-Proxy保姆级教程 | Spring Cloud 50

一、前言 通过以下系列章节&#xff1a; Spring Boot集成ShardingSphere实现数据分片&#xff08;一&#xff09; | Spring Cloud 40 Spring Boot集成ShardingSphere实现数据分片&#xff08;二&#xff09; | Spring Cloud 41 Spring Boot集成ShardingSphere实现数据分片&…

Linux:centos:组账户管理 》》添加组,用户加入组(设置组密码),删除组,查询账户信息,查询登录用户信息

/etc/group # 组信息文件 /etc/gshadow # 组密码文件&#xff08;不常用&#xff09; groupadd &#xff08;属性&#xff09; 组名 # 新建组 groupdel &#xff08;属性&#xff09; 组名 # 删除组 gpasswd # 可以…

Cartographer源码阅读---番外篇: Submap封装与维护

Cartographer中Submap(子图)没有被直接的调用进行维护, 而是针对2D和3D场景分别派生出子类Submap2D和Submap3D, 进行调用. 以2D为例, 为了方便维护, 又把Submap2D封装成了ActiveSubmaps2D进行维护, 其维护方式类似与滑窗, 也是只维护最近的一些数据. 1. Submap类 /*** brief …

Python学习之生成带logo背景图的二维码(静态和动态图)

前言 二维码简称 QR Code&#xff08;Quick Response Code&#xff09;&#xff0c;学名为快速响应矩阵码&#xff0c;是二维条码的一种&#xff0c;由日本的 Denso Wave 公司于 1994 年发明。现随着智能手机的普及&#xff0c;已广泛应用于平常生活中&#xff0c;例如商品信息…

探索三维世界【4】:Three.js dat.gui gsap 的使用

探索三维世界【4】&#xff1a;Three.js & dat.gui & gsap 的使用 1、dat.gui是什么&#xff1f;2、gsap的介绍与使用2.1、前提准备工作&#xff08;绘制一个BoxGeometry&#xff09;2.2、安装引入gsap动画库2.3、使用gsap动画2.4、配合事件使用 3、使用dat.gui3.1、添…

生物信息学知识点

生物信息学知识点 1. 序列比对&#xff1a;1.1 基本概念&#xff1a;1.2 全局比对和局部比对&#xff1a;1.3 空位罚分的改进&#xff1a;1.4 同源性和相似性&#xff1a;1.5 相似性矩阵&#xff1a;1.5.1 PAM&#xff1a;1.5.2 BLOSUM&#xff1a; 2. BLAST算法&#xff1a;2.…

React | React的过渡动画

✨ 个人主页&#xff1a;CoderHing &#x1f5a5;️ React.js专栏&#xff1a;React的过渡动画 &#x1f64b;‍♂️ 个人简介&#xff1a;一个不甘平庸的平凡人&#x1f36c; &#x1f4ab; 系列专栏&#xff1a;吊打面试官系列 16天学会Vue 11天学会React Node专栏 &#…

Grafana之Clock Panel使用(06)

Clock Panel可以用来显示当前(各国)时间或用于倒计时,并支持每秒更新一次。 Clock plugin for Grafana | Grafana Labs Clock Panel也是Grafana Labs提供,但并非Native,需自行安装,安装命令如下: # grafana-cli plugins install grafana-clock-panel # systemctl …

前端面试题 — — vue篇

前端面试笔记之vue篇 前言1.数据双向绑定原理⭐⭐⭐2. VUE生命周期⭐⭐⭐3.组件之间如何传值⭐⭐⭐4.路由之间如何传参⭐5.谈一谈VUEX⭐⭐6.如何解决VUEX页面刷新数据丢失问题&#xff1f;⭐⭐7.computed和watch的区别&#xff1f;⭐⭐⭐8.如何封装axios&#xff1f;⭐9.Route和…