【Kafka源码走读】Admin接口的客户端与服务端的连接流程

news2024/10/5 13:09:14

注:本文对应的kafka的源码的版本是trunk分支。写这篇文章的主要目的是当作自己阅读源码之后的笔记,写的有点凌乱,还望大佬们海涵,多谢!

最近在写一个Web版的kafka客户端工具,然后查看Kafka官网,发现想要与Server端建立连接,只需要执行

	Admin.create(Properties props)

方法即可,但其内部是如何工作的,不得而知。鉴于此,该死的好奇心又萌动了起来,那我们今天就来看看,当执行Admin.create(Properties props)方法之后,client是如何与Server端建立连接的。

首先,我们看下Admin.create(Properties props)方法的实现:

    static Admin create(Properties props) {
        return KafkaAdminClient.createInternal(new AdminClientConfig(props, true), null);
    }

Admin是一个接口,create()是其静态方法,该方法内部又调用的是KafkaAdminClient.createInternal()方法,createInternal()源码如下:

    static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcessorFactory timeoutProcessorFactory) {
        return createInternal(config, timeoutProcessorFactory, null);
    }

上述代码又调用了KafkaAdminClient类的另一个createInternal()方法

    static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcessorFactory timeoutProcessorFactory,
                                           HostResolver hostResolver) {
        Metrics metrics = null;
        NetworkClient networkClient = null;
        Time time = Time.SYSTEM;
        String clientId = generateClientId(config);
        ChannelBuilder channelBuilder = null;
        Selector selector = null;
        ApiVersions apiVersions = new ApiVersions();
        LogContext logContext = createLogContext(clientId);

        try {
            // Since we only request node information, it's safe to pass true for allowAutoTopicCreation (and it
            // simplifies communication with older brokers)
            AdminMetadataManager metadataManager = new AdminMetadataManager(logContext,
                config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
                config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
                    config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
                    config.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG));
            metadataManager.update(Cluster.bootstrap(addresses), time.milliseconds());
            List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
                MetricsReporter.class,
                Collections.singletonMap(AdminClientConfig.CLIENT_ID_CONFIG, clientId));
            Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(AdminClientConfig.METRICS_NUM_SAMPLES_CONFIG))
                .timeWindow(config.getLong(AdminClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                .recordLevel(Sensor.RecordingLevel.forName(config.getString(AdminClientConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                .tags(metricTags);
            JmxReporter jmxReporter = new JmxReporter();
            jmxReporter.configure(config.originals());
            reporters.add(jmxReporter);
            MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
                    config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
            metrics = new Metrics(metricConfig, reporters, time, metricsContext);
            String metricGrpPrefix = "admin-client";
            channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext);
            selector = new Selector(config.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                    metrics, time, metricGrpPrefix, channelBuilder, logContext);
            networkClient = new NetworkClient(
                metadataManager.updater(),
                null,
                selector,
                clientId,
                1,
                config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG),
                config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
                config.getInt(AdminClientConfig.SEND_BUFFER_CONFIG),
                config.getInt(AdminClientConfig.RECEIVE_BUFFER_CONFIG),
                (int) TimeUnit.HOURS.toMillis(1),
                config.getLong(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
                config.getLong(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
                time,
                true,
                apiVersions,
                null,
                logContext,
                (hostResolver == null) ? new DefaultHostResolver() : hostResolver);
            return new KafkaAdminClient(config, clientId, time, metadataManager, metrics, networkClient,
                timeoutProcessorFactory, logContext);
        } catch (Throwable exc) {
            closeQuietly(metrics, "Metrics");
            closeQuietly(networkClient, "NetworkClient");
            closeQuietly(selector, "Selector");
            closeQuietly(channelBuilder, "ChannelBuilder");
            throw new KafkaException("Failed to create new KafkaAdminClient", exc);
        }
    }

前面的都是构造参数,关注以下这行代码:

	return new KafkaAdminClient(config, clientId, time, metadataManager, metrics, networkClient,
                timeoutProcessorFactory, logContext);

KafkaAdminClient的构造方法如下:

    private KafkaAdminClient(AdminClientConfig config,
                             String clientId,
                             Time time,
                             AdminMetadataManager metadataManager,
                             Metrics metrics,
                             KafkaClient client,
                             TimeoutProcessorFactory timeoutProcessorFactory,
                             LogContext logContext) {
        this.clientId = clientId;
        this.log = logContext.logger(KafkaAdminClient.class);
        this.logContext = logContext;
        this.requestTimeoutMs = config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG);
        this.defaultApiTimeoutMs = configureDefaultApiTimeoutMs(config);
        this.time = time;
        this.metadataManager = metadataManager;
        this.metrics = metrics;
        this.client = client;
        this.runnable = new AdminClientRunnable();
        String threadName = NETWORK_THREAD_PREFIX + " | " + clientId;
        this.thread = new KafkaThread(threadName, runnable, true);
        this.timeoutProcessorFactory = (timeoutProcessorFactory == null) ?
            new TimeoutProcessorFactory() : timeoutProcessorFactory;
        this.maxRetries = config.getInt(AdminClientConfig.RETRIES_CONFIG);
        this.retryBackoffMs = config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG);
        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
        log.debug("Kafka admin client initialized");
        thread.start();
    }

上面的代码,大部分都是传递参数,但里面有个细节,不能忽略。最后一行代码是thread.start(),这里启动了一个线程,根据thread对象往前找,看看该对象是如何初始化的:

    this.thread = new KafkaThread(threadName, runnable, true);

由此可知,threadKafkaThread构造的对象,KafkaThread继承于Thread类。同时,上述代码中KafkaThread的构造方法中的第二个参数是runnable,该参数的定义如下:

    this.runnable = new AdminClientRunnable();

既然runnable是类AdminClientRunnable构造的对象,那么,当thread.start()代码执行之后,类AdminClientRunnablerun()方法就开始执行了,我们看下run()方法的源码:

    @Override
    public void run() {
        log.debug("Thread starting");
        try {
            processRequests();
        } finally {
            closing = true;
            AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics);

            int numTimedOut = 0;
            TimeoutProcessor timeoutProcessor = new TimeoutProcessor(Long.MAX_VALUE);
            synchronized (this) {
                numTimedOut += timeoutProcessor.handleTimeouts(newCalls, "The AdminClient thread has exited.");
            }
            numTimedOut += timeoutProcessor.handleTimeouts(pendingCalls, "The AdminClient thread has exited.");
            numTimedOut += timeoutCallsToSend(timeoutProcessor);
            numTimedOut += timeoutProcessor.handleTimeouts(correlationIdToCalls.values(),
                    "The AdminClient thread has exited.");
            if (numTimedOut > 0) {
                log.info("Timed out {} remaining operation(s) during close.", numTimedOut);
            }
            closeQuietly(client, "KafkaClient");
            closeQuietly(metrics, "Metrics");
            log.debug("Exiting AdminClientRunnable thread.");
        }
    }

在上述代码中,只需关注processRequests()方法,源码如下:

    private void processRequests() {
        long now = time.milliseconds();
        while (true) {
            // Copy newCalls into pendingCalls.
            drainNewCalls();
    
            // Check if the AdminClient thread should shut down.
            long curHardShutdownTimeMs = hardShutdownTimeMs.get();
            if ((curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) && threadShouldExit(now, curHardShutdownTimeMs))
                break;
    
            // Handle timeouts.
            TimeoutProcessor timeoutProcessor = timeoutProcessorFactory.create(now);
            timeoutPendingCalls(timeoutProcessor);
            timeoutCallsToSend(timeoutProcessor);
            timeoutCallsInFlight(timeoutProcessor);
    
            long pollTimeout = Math.min(1200000, timeoutProcessor.nextTimeoutMs());
            if (curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) {
                pollTimeout = Math.min(pollTimeout, curHardShutdownTimeMs - now);
            }
    
            // Choose nodes for our pending calls.
            pollTimeout = Math.min(pollTimeout, maybeDrainPendingCalls(now));
            long metadataFetchDelayMs = metadataManager.metadataFetchDelayMs(now);
            if (metadataFetchDelayMs == 0) {
                metadataManager.transitionToUpdatePending(now);
                Call metadataCall = makeMetadataCall(now);
                // Create a new metadata fetch call and add it to the end of pendingCalls.
                // Assign a node for just the new call (we handled the other pending nodes above).
    
                if (!maybeDrainPendingCall(metadataCall, now))
                    pendingCalls.add(metadataCall);
            }
            pollTimeout = Math.min(pollTimeout, sendEligibleCalls(now));
    
            if (metadataFetchDelayMs > 0) {
                pollTimeout = Math.min(pollTimeout, metadataFetchDelayMs);
            }
    
            // Ensure that we use a small poll timeout if there are pending calls which need to be sent
            if (!pendingCalls.isEmpty())
                pollTimeout = Math.min(pollTimeout, retryBackoffMs);
    
            // Wait for network responses.
            log.trace("Entering KafkaClient#poll(timeout={})", pollTimeout);
            List<ClientResponse> responses = client.poll(Math.max(0L, pollTimeout), now);
            log.trace("KafkaClient#poll retrieved {} response(s)", responses.size());
    
            // unassign calls to disconnected nodes
            unassignUnsentCalls(client::connectionFailed);
    
            // Update the current time and handle the latest responses.
            now = time.milliseconds();
            handleResponses(now, responses);
        }
    }

额,上面的代码,此时并未发现连接Server的过程,同时,我发现上述代码通过poll()方法在获取Server端的消息:

    List<ClientResponse> responses = client.poll(Math.max(0L, pollTimeout), now);

按照我当时看这段代码的思路,由于这部分代码没有连接的过程,所以,我也就不进入poll()方法了,从方法名上看,它里面也应该没有连接的过程,所以转而回头看下client对象是如何定义的,在KafkaAdminClient.createInternal(AdminClientConfig config, TimeoutProcessorFactory timeoutProcessorFactory, HostResolver hostResolver)方法中,定义如下:

    networkClient = new NetworkClient(
                    metadataManager.updater(),
                    null,
                    selector,
                    clientId,
                    1,
                    config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG),
                    config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
                    config.getInt(AdminClientConfig.SEND_BUFFER_CONFIG),
                    config.getInt(AdminClientConfig.RECEIVE_BUFFER_CONFIG),
                    (int) TimeUnit.HOURS.toMillis(1),
                    config.getLong(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
                    config.getLong(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
                    time,
                    true,
                    apiVersions,
                    null,
                    logContext,
                    (hostResolver == null) ? new DefaultHostResolver() : hostResolver);

再看下NetworkClient的构造函数:

    public NetworkClient(MetadataUpdater metadataUpdater,
                         Metadata metadata,
                         Selectable selector,
                         String clientId,
                         int maxInFlightRequestsPerConnection,
                         long reconnectBackoffMs,
                         long reconnectBackoffMax,
                         int socketSendBuffer,
                         int socketReceiveBuffer,
                         int defaultRequestTimeoutMs,
                         long connectionSetupTimeoutMs,
                         long connectionSetupTimeoutMaxMs,
                         Time time,
                         boolean discoverBrokerVersions,
                         ApiVersions apiVersions,
                         Sensor throttleTimeSensor,
                         LogContext logContext,
                         HostResolver hostResolver) {
        /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
         * possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
         * super constructor is invoked.
         */
        if (metadataUpdater == null) {
            if (metadata == null)
                throw new IllegalArgumentException("`metadata` must not be null");
            this.metadataUpdater = new DefaultMetadataUpdater(metadata);
        } else {
            this.metadataUpdater = metadataUpdater;
        }
        this.selector = selector;
        this.clientId = clientId;
        this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
        this.connectionStates = new ClusterConnectionStates(
                reconnectBackoffMs, reconnectBackoffMax,
                connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, logContext, hostResolver);
        this.socketSendBuffer = socketSendBuffer;
        this.socketReceiveBuffer = socketReceiveBuffer;
        this.correlation = 0;
        this.randOffset = new Random();
        this.defaultRequestTimeoutMs = defaultRequestTimeoutMs;
        this.reconnectBackoffMs = reconnectBackoffMs;
        this.time = time;
        this.discoverBrokerVersions = discoverBrokerVersions;
        this.apiVersions = apiVersions;
        this.throttleTimeSensor = throttleTimeSensor;
        this.log = logContext.logger(NetworkClient.class);
        this.state = new AtomicReference<>(State.ACTIVE);
    }

事与愿违,有点尴尬,从NetworkClient的构造方法来看,也不涉及连接Server端的代码,那连接是在什么时候发生的呢?我想到快速了解NetworkClient类中都有哪些方法,以寻找是否有建立连接的方法。可喜的是,我找到了initiateConnect(Node node, long now)方法,见下图:

这个方法像是连接Server的,然后顺着这个方法,去查看是谁在调用它的,如下图所示:

调用栈显示,有两个方法调用了initiateConnect()方法,他们分别是ready()maybeUpdate()方法,然后分别对ready()maybeUpdate()方法又进行反向跟踪,看他们又分别被谁调用,中间的反向调用过程在这里就省略了,感兴趣的可以自己去研究下。

我们先从maybeUpdate()方法着手吧,通过该方法,最后可追踪到maybeUpdate()方法最终被poll()所调用。嗯?是不是前面我们也跟踪到poll()方法了。难道就是在调用poll方法之后,才实现连接Server的过程?下面是poll()方法的实现:

    /**
     * Do actual reads and writes to sockets.
     *
     * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately,
     *                must be non-negative. The actual timeout will be the minimum of timeout, request timeout and
     *                metadata timeout
     * @param now The current time in milliseconds
     * @return The list of responses received
     */
    @Override
    public List<ClientResponse> poll(long timeout, long now) {
        ensureActive();

        if (!abortedSends.isEmpty()) {
            // If there are aborted sends because of unsupported version exceptions or disconnects,
            // handle them immediately without waiting for Selector#poll.
            List<ClientResponse> responses = new ArrayList<>();
            handleAbortedSends(responses);
            completeResponses(responses);
            return responses;
        }

        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        // process completed actions
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleInitiateApiVersionRequests(updatedNow);
        handleTimedOutConnections(responses, updatedNow);
        handleTimedOutRequests(responses, updatedNow);
        completeResponses(responses);

        return responses;
    }

由上述代码可知,maybeUpdate()方法是被metadataUpdater对象所调用,接下来我们就需要了解metadataUpdater对象属于哪个类。
回到NetworkClient的构造方法可看到这段代码:

    if (metadataUpdater == null) {
        if (metadata == null)
            throw new IllegalArgumentException("`metadata` must not be null");
        this.metadataUpdater = new DefaultMetadataUpdater(metadata);
    } else {
        this.metadataUpdater = metadataUpdater;
    }

注意这里,如果metadataUpdater的值为null,则metadataUpdater = new DefaultMetadataUpdater(metadata),也就是说metadataUpdater对象属于DefaultMetadataUpdater类;

如果metadataUpdater的值不为null,则其值保持不变,也就是说,这个值是由调用者传入的。
现在我们需要跟踪调用者传入该值时是否为null,则需要回到KafkaAdminClient.createInternal()方法,下面对代码进行了精简,仅关注重点:

    AdminMetadataManager metadataManager = new AdminMetadataManager(logContext,
        config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
        config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
        
        .......部分代码省略......
        
    networkClient = new NetworkClient(
        metadataManager.updater(),
        null,
        selector,
        clientId,
        1,
        config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG),
        config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
        config.getInt(AdminClientConfig.SEND_BUFFER_CONFIG),
        config.getInt(AdminClientConfig.RECEIVE_BUFFER_CONFIG),
        (int) TimeUnit.HOURS.toMillis(1),
        config.getLong(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
        config.getLong(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
        time,
        true,
        apiVersions,
        null,
        logContext,
        (hostResolver == null) ? new DefaultHostResolver() : hostResolver);

由上述代码可知,在传入NetworkClient的构造方法时,metadataManager.updater()=AdminMetadataManager.updater(),而AdminMetadataManager的源码如下:

    public AdminMetadataManager(LogContext logContext, long refreshBackoffMs, long metadataExpireMs) {
        this.log = logContext.logger(AdminMetadataManager.class);
        this.refreshBackoffMs = refreshBackoffMs;
        this.metadataExpireMs = metadataExpireMs;
        this.updater = new AdminMetadataUpdater();
    }

    public AdminMetadataUpdater updater() {
        return updater;
    }

由此可知,传入NetworkClient的构造方法时的metadataUpdater对象并不为null,且该对象属于AdminMetadataUpdater类。

好了,到这里我们已经把metadataUpdater的值搞清楚了,其值并不为null。但如果通过IDE的代码默认跟踪方式,会将metadataUpdater的值定位为DefaultMetadataUpdater类,如果是这样,那会有什么影响呢?

前面我们提到,NetworkClient.poll()方法会调用maybeUpdate()方法,即如下这行代码:

    long metadataTimeout = metadataUpdater.maybeUpdate(now);

metadataUpdater对象如果为DefaultMetadataUpdater类,则调用上述maybeUpdate(now)方法时,会执行连接Server的过程,源码如下:

    @Override
    public long maybeUpdate(long now) {
        // should we update our metadata?
        long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
        long waitForMetadataFetch = hasFetchInProgress() ? defaultRequestTimeoutMs : 0;

        long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
        if (metadataTimeout > 0) {
            return metadataTimeout;
        }

        // Beware that the behavior of this method and the computation of timeouts for poll() are
        // highly dependent on the behavior of leastLoadedNode.
        Node node = leastLoadedNode(now);
        if (node == null) {
            log.debug("Give up sending metadata request since no node is available");
            return reconnectBackoffMs;
        }

        return maybeUpdate(now, node);
    }

    # maybeUpdate(now)再调用maybeUpdate(now, node)方法,代码如下:
    
    private long maybeUpdate(long now, Node node) {
        String nodeConnectionId = node.idString();
    
        if (canSendRequest(nodeConnectionId, now)) {
        Metadata.MetadataRequestAndVersion requestAndVersion = metadata.newMetadataRequestAndVersion(now);
        MetadataRequest.Builder metadataRequest = requestAndVersion.requestBuilder;
        log.debug("Sending metadata request {} to node {}", metadataRequest, node);
        sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
        inProgress = new InProgressData(requestAndVersion.requestVersion, requestAndVersion.isPartialUpdate);
        return defaultRequestTimeoutMs;
        }
    
        // If there's any connection establishment underway, wait until it completes. This prevents
        // the client from unnecessarily connecting to additional nodes while a previous connection
        // attempt has not been completed.
        if (isAnyNodeConnecting()) {
        // Strictly the timeout we should return here is "connect timeout", but as we don't
        // have such application level configuration, using reconnect backoff instead.
        return reconnectBackoffMs;
        }
    
        if (connectionStates.canConnect(nodeConnectionId, now)) {
        // We don't have a connection to this node right now, make one
        log.debug("Initialize connection to node {} for sending metadata request", node);
        
        # 这里就是连接Server端的入口了
        initiateConnect(node, now);
        return reconnectBackoffMs;
        }
    
        // connected, but can't send more OR connecting
        // In either case, we just need to wait for a network event to let us know the selected
        // connection might be usable again.
        return Long.MAX_VALUE;
    }

注意上述代码的中文注释部分,initiateConnect(node, now)方法就是连接Server端的入口,该方法的实现如下:

    /**
     * Initiate a connection to the given node
     * @param node the node to connect to
     * @param now current time in epoch milliseconds
     */
    private void initiateConnect(Node node, long now) {
        String nodeConnectionId = node.idString();
        try {
            connectionStates.connecting(nodeConnectionId, now, node.host());
            InetAddress address = connectionStates.currentAddress(nodeConnectionId);
            log.debug("Initiating connection to node {} using address {}", node, address);
            selector.connect(nodeConnectionId,
                    new InetSocketAddress(address, node.port()),
                    this.socketSendBuffer,
                    this.socketReceiveBuffer);
        } catch (IOException e) {
            log.warn("Error connecting to node {}", node, e);
            // Attempt failed, we'll try again after the backoff
            connectionStates.disconnected(nodeConnectionId, now);
            // Notify metadata updater of the connection failure
            metadataUpdater.handleServerDisconnect(now, nodeConnectionId, Optional.empty());
        }
    }

所以,metadataUpdater对象如果为DefaultMetadataUpdater类,就会在调用poll()方法时,初始化连接Server的过程。但前面已知,metadataUpdater对象属于AdminMetadataUpdater类,他又是在哪里与Server进行连接的呢?

我们再回到之前已知悉的内容,有两个方法调用了initiateConnect()方法,他们分别是ready()maybeUpdate()方法。通过上面的跟踪,目前可以排除maybeUpdate()方法了。接下来,通过ready()方法,我们再反向跟踪一下,哪些地方都调用了ready()方法。

通过层层筛选,发现KafkaAdminClient.sendEligibleCalls()方法调用了ready()方法,如下图所示:

通过sendEligibleCalls()方法又反向查找是谁在调用该方法,如下图所示:

由图可知,是KafkaAdminClient.processRequests()方法调用了sendEligibleCalls()方法,而processRequests()方法正是我们前面跟踪代码时,发现无法继续跟踪的地方。精简之后的代码如下:

    private void processRequests(){
        long now=time.milliseconds();
        while(true){
            // Copy newCalls into pendingCalls.
            drainNewCalls();
    
            ......部分代码省略......
    
            pollTimeout=Math.min(pollTimeout,sendEligibleCalls(now));
    
            ......部分代码省略......
    
            // Wait for network responses.
            log.trace("Entering KafkaClient#poll(timeout={})",pollTimeout);
            List<ClientResponse> responses=client.poll(Math.max(0L,pollTimeout),now);
            log.trace("KafkaClient#poll retrieved {} response(s)",responses.size());
            
            ......部分代码省略......
            
        }
    }

由上述代码可知,与Server端的连接是在poll()方法执行之前,隐藏在pollTimeout=Math.min(pollTimeout,sendEligibleCalls(now));代码中。如果想要验证自己的理解是否正确,则可以通过调试源码,增加断点来验证,这里就略过了。

现在回过头来,就会发现,为什么我之前读到这个processRequests()方法时,没有发现这个方法呢?因为没有注意到一些细节,所以忽略了这个方法,误以为连接发生在其他地方。

当然,这可能也和我的惯性思维有关,总是通过类名和方法名来猜测这个方法的大概意图,然后当找不到流程的时候,就通过反向查找调用栈的方式去梳理执行流程,也算是殊途同归吧。

最后,用一张时序图来总结下上面的内容:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/779532.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python基础教程:sklearn机器学习入门

1. sklearn基础介绍 sklearn&#xff08;全名为scikit-learn&#xff09;是一个建立在NumPy、SciPy和matplotlib等科学计算库的基础上&#xff0c;用于机器学习的Python开源库。它提供了丰富的工具和函数&#xff0c;用于处理各种机器学习任务&#xff0c;包括分类、回归、聚类…

线性表的顺序存储和链式存储—Python数据结构(二)

线性表 定义&#xff1a; 线性表的定义是描述其逻辑结构&#xff0c;而通常会在线性表上进行的查找、插入、删除等操作。 线性表作为一种基本的数据结构类型&#xff0c;在计算机存储器中映象(表示)一般有两种形式&#xff0c;一种是顺序映象&#xff0c;一种是链式映象。 线…

接口漏洞-WebService-wsdl+SOAP-Swagger+HTTP-WebPack

什么是接口&#xff1f; 接口就是位于复杂系统之上并且能简化你的任务&#xff0c;它就像一个中间人让你不需要了解详细的所有细节。像谷歌搜索系统&#xff0c;它提供了搜索接口&#xff0c;简化了你的搜索任务。再像用户登录页面&#xff0c;我们只需要调用我们的登录接口&am…

Jupyter 安装和使用

安装Jupyter 使用pip工具进行安装&#xff0c;在命令提示窗口输入命令如下&#xff1a; pip install jupyter notebook 使用Jupyter 在命令提示窗口输入如下命令&#xff0c;启动浏览器页面&#xff1a; jupyter notebook 修改jupyter的工作路径/存储路径 由于默认工作路…

去括号问题(C++处理)

继http://t.csdn.cn/kIcUT后的文章 题目描述 当老师不容易&#xff0c;尤其是当小学的老师更难:现在的小朋友做作业喜欢滥用括号。 虽然不影响计算结果&#xff0c;但不够美观&#xff0c;容易出错&#xff0c;而且可读性差。但又不能一棒子打死&#xff0c;也许他们就是将来的…

【Linux从入门到精通】进程的控制(进程退出+进程等待)

本篇文章主要讲述的是进程的退出和进程等待。希望本篇文章的内容会对你有所帮助。 文章目录 一、fork创建子进程 1、1 在创建子进程中操作系统的作用 1、2 写时拷贝 二、进程终止 2、1 常见的进程退出 2、2 进程的退出码 2、2、1 运行结果正确实例 2、2、2 运行结果不正确实例…

防御第二天-防火墙演示实验

1.上课思维导图 2.防火墙演示实验 防火墙FW1&#xff1a;原用户名&#xff1a;admin 原密码&#xff1a;Admin123 配地址&#xff1a;<USG6000V1>sy [USG6000V1]int g0/0/0 [USG6000V1-GigabitEthernet0/0/0]ip add 192.168.18.2 24 打开所有权限[USG6000V1-Gig…

C++(一):基本数据类型

基本数据类型 基本内置类型定义变量type field value;type field(value);type field{value};type field {value}; 数学常量及函数静态类型转换 static_cast格式化字符串std::stringstreamstd::string引入三方库 fmt/core.h 字符运算auto 关键字枚举类型数据类型定义别名判断是…

牛客网刷题之链表(一)

链表 NB1 删除链表峰值NB2 牛群排列去重NB3 调整牛群顺序NB4 牛群的重新分组NB5 牛群的重新排列NB6 合并两群能量值&#xff08;合并有序单链表&#xff09;NB7 牛群的能量值&#xff08;单链表相加&#xff09; 以下题全部出自牛客网。 题目题目考察的知识点链表&#xff1a; …

Element Plus 日期选择器

计算开始日期到结束日期的总天数 结构 <el-form-item label"计划开始时间" required prop"StartTime"><el-date-pickertype"date"v-model"ruleForm.StartTime":disabled-date"StartTime"placeholder"计划开始…

pytorch工具——使用pytorch构建一个分类器

目录 分类器任务和数据介绍训练分类器的步骤在GPU上训练模型 分类器任务和数据介绍 训练分类器的步骤 #1 import torch import torchvision import torchvision.transforms as transformstransformtransforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5,0.5,0.…

SpringCloud学习路线(8)—— Docker

一、Docker的开始 &#xff08;一&#xff09;项目部署问题&#xff1a; 依赖关系复杂&#xff0c;容易出现兼容性问题开发、测试、生产环境有差异 &#xff08;二&#xff09;Docker如何解决问题&#xff1f; 1、依赖兼容问题 &#xff08;1&#xff09;将应用的Libs&…

垃圾回收之三色标记法(Tri-color Marking)

关于垃圾回收算法&#xff0c;基本就是那么几种&#xff1a;标记-清除、标记-复制、标记-整理。在此基础上可以增加分代&#xff08;新生代/老年代&#xff09;&#xff0c;每代采取不同的回收算法&#xff0c;以提高整体的分配和回收效率。 无论使用哪种算法&#xff0c;标记…

(已解决)RuntimeError: Java gateway process exited before sending its port number

今天用Pycharm远程使用pysaprk解释器时&#xff0c;跑代码出现了这个错误&#xff1a; RuntimeError: Java gateway process exited before sending its port number 找了好多博客都没解决问题&#xff0c;有说重装spark的&#xff0c;有说本地配Java_home的&#xff0c;后面我…

[C语言刷题]杨氏矩阵、返回型参数

本文包含知识点 杨氏矩阵极其解法函数return多个值的四种方法 题目&#xff1a; 杨氏矩阵 有一个数字矩阵&#xff0c;矩阵的每行从左到右是递增的&#xff0c;矩阵从上到下是递增的&#xff0c;请编写程序在这样的矩阵中查找某个数字是否存在。 要求&#xff1a;时间复杂度小于…

js 在浏览器窗口关闭后还可以不中断网络请求

有个需求&#xff0c;我们需要在用户发送数据过程中&#xff0c;如果用户关闭了网页(包括整个浏览器关闭)&#xff0c;不要中断数据传递 目前XMLHttpRequest对象是不支持的 http服务器 为了测试效果我们用nodejs写了个http服务器代码 文件名为httpServer.js如下&#xff0c;…

获取大疆无人机的飞控记录数据并绘制曲线

机型M350RTK&#xff0c;其飞行记录文件为加密的&#xff0c;我的完善代码如下 gitgithub.com:huashu996/DJFlightRecordParsing2TXT.git 一、下载安装官方的DJIFlightRecord git clone gitgithub.com:dji-sdk/FlightRecordParsingLib.git飞行记录文件在打开【我的电脑】&am…

Windows nvm 安装后webstrom vue项目编译报错,无法识别node

1 nvm安装流程 卸载原先nodejs用管理员权限打开exe安装nvmnvm文件夹和nodejs文件夹 都授权Authenticated Users 完全控制nvm list availablenvm install 16.20.1nvm use 16.20.1输入node和npm检查版本命令&#xff0c;正常显示确认系统变量和用户变量都有nvm 和nodejs 2 bug情…

数学建模-聚类算法 系统(层次)聚类

绝对值距离:网状道路 一般用组间和组内距离 聚类的距离计算如何选取&#xff1a;看结果是否解释的通&#xff0c;选择一种结果解释的通的方法。

【数据挖掘】将NLP技术引入到股市分析

一、说明 在交易中实施的机器学习模型通常根据历史股票价格和其他定量数据进行训练&#xff0c;以预测未来的股票价格。但是&#xff0c;自然语言处理&#xff08;NLP&#xff09;使我们能够分析财务文档&#xff0c;例如10-k表格&#xff0c;以预测股票走势。 二、对自然语言处…