【Kafka源码走读】消息生产者与服务端的连接过程

news2025/1/18 8:59:15

说明:以下描述的源码都是基于最新版,老版本可能会有所不同。 

一. 查找源码入口

        kafka-console-producer.sh是消息生产者的脚本,我们从这里入手,可以看到源码的入口:

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"

 从上面的代码可以得知,源码是kafka.tools.ConsoleProducer,这是一个scala的文件。

二. 利用源码启动生产者进行调试

        阅读源码最好的方式就是在debug下,边看边断点跟踪,所以我们先把环境配置好,以便程序可以run起来。

        由于kafka server端配置了认证模式,那么在client侧,也需要加上认证的配置,否则会导致连接server失败。如何开启认证模式,可参考我之前写的这篇文章。我们可以参考kafka-console-producer.sh脚本运行时传入的参数,对应填入idea的Run/Debug Configurations界面中。

脚本:

/kafka/bin/kafka-console-producer.sh --bootstrap-server=127.0.0.1:9092 --topic=notif.test --producer.config=/kafka/config/topic.properties

topic.properties的内容如下:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

这个配置也可以放在producer.properties里面,下面会看到。 

idea界面: 

红色框起来的部分,就是和无认证模式下的区别,没有这两个参数,连接server就会失败。client.jaas.conf里面的参数,请参考上面提到的开启认证模式的文章。producer.properties是kafka自带配置文件,我们仅需要增加如下配置即可:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

 好了,一切就绪,就可以执行run了,控制台如果没有错误,那就说明启动成功了,如下:

        说到这里,不得不感慨一下,平时基本上没有run过命令行输入内容的代码。然后,我停留在这个界面半个小时,一直以为没有连接成功,各种排查是哪里配置的不对之类的。突然间想起去看下server端的日志,结果发现连上了。然后试着在上面红色日志下方去输入内容(见下图),好家伙,consumer侧收到了,大写的尴尬!

 kafka同学,你说你要是在我输入的上方再写点提示日志该多好啊。。。

三. 查看生产者连接服务端的过程

        既然代码跑起来了,那就开始我们的阅读之旅。首先,在ConsoleProducer.scala中找到入口函数main()方法,这是任何编程语言的启动之源:

  def main(args: Array[String]): Unit = {

    try {
      val config = new ProducerConfig(args)
      val input = System.in
      val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps(config))
      try loopReader(producer, newReader(config.readerClass, getReaderProps(config)), input, config.sync)
      finally producer.close()
      Exit.exit(0)
    } catch {
      case e: joptsimple.OptionException =>
        System.err.println(e.getMessage)
        Exit.exit(1)
      case e: Exception =>
        e.printStackTrace()
        Exit.exit(1)
    }
  }

可以看出,它调用了val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps(config))

        KafkaProducer是java代码,查看其最终调用的构造函数:

    KafkaProducer(ProducerConfig config,
                  Serializer<K> keySerializer,
                  Serializer<V> valueSerializer,
                  ProducerMetadata metadata,
                  KafkaClient kafkaClient,
                  ProducerInterceptors<K, V> interceptors,
                  Time time) {
        try {
            this.producerConfig = config;
            this.time = time;

            // 此处省略多行代码
            this.errors = this.metrics.sensor("errors");
            this.sender = newSender(logContext, kafkaClient, this.metadata);
            String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();
            config.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
            log.debug("Kafka producer started");
        } catch (Throwable t) {
            // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
            close(Duration.ofMillis(0), true);
            // now propagate the exception
            throw new KafkaException("Failed to construct kafka producer", t);
        }
    }

关注this.sender = newSender(logContext, kafkaClient, this.metadata);这行代码,进入newSender()函数:

     Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
        // 此处省略部分代码
        KafkaClient client = kafkaClient != null ? kafkaClient : ClientUtils.createNetworkClient(producerConfig,
                this.metrics,
                "producer",
                logContext,
                apiVersions,
                time,
                maxInflightRequests,
                metadata,
                throttleTimeSensor,
                clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null));

        short acks = Short.parseShort(producerConfig.getString(ProducerConfig.ACKS_CONFIG));
        return new Sender(参数省略);
    }

注意这行代码:

KafkaClient client = kafkaClient != null ? kafkaClient : ClientUtils.createNetworkClient(参数省略);

前面都没有对kafkaClient进行赋值,所以这行代码可简化为:

KafkaClient client = ClientUtils.createNetworkClient(参数省略)

接下来查看ClientUtils.createNetworkClient()函数,最终会调用下面这个方法:

    public static NetworkClient createNetworkClient(入参省略) {
        ChannelBuilder channelBuilder = null;
        Selector selector = null;

        try {
            channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext);
            selector = new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                    metrics,
                    time,
                    metricsGroupPrefix,
                    channelBuilder,
                    logContext);
            return new NetworkClient(metadataUpdater,
                    metadata,
                    selector,
                    clientId,
                    maxInFlightRequestsPerConnection,
                    后续参数省略);
        } catch (Throwable t) {
            closeQuietly(selector, "Selector");
            closeQuietly(channelBuilder, "ChannelBuilder");
            throw new KafkaException("Failed to create new NetworkClient", t);
        }
    }

我们在第二步调试的时候,不是加了认证的配置参数吗,处理认证配置的方法就在上面的方法里面,具体是如下代码:

channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext);
    public static ChannelBuilder createChannelBuilder(AbstractConfig config, Time time, LogContext logContext) {
        SecurityProtocol securityProtocol = SecurityProtocol.forName(config.getString(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
        String clientSaslMechanism = config.getString(SaslConfigs.SASL_MECHANISM);
        return ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT, config, null,
                clientSaslMechanism, time, true, logContext);

 ChannelBuilders.createChannelBuilder()方法只是外层的判断:

    public static ChannelBuilder clientChannelBuilder(入参省略) {

        if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {
            if (contextType == null)
                throw new IllegalArgumentException("`contextType` must be non-null if `securityProtocol` is `" + securityProtocol + "`");
            if (clientSaslMechanism == null)
                throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`");
        }
        return create(securityProtocol, ConnectionMode.CLIENT, contextType, config, listenerName, false, clientSaslMechanism,
                saslHandshakeRequestEnable, null, null, time, logContext, null);
    }

详细的处理逻辑是在ChannelBuilders.create()方法里面:

private static ChannelBuilder create(入参省略) {
        Map<String, Object> configs = channelBuilderConfigs(config, listenerName);

        ChannelBuilder channelBuilder;
        switch (securityProtocol) {
            case SSL:
                requireNonNullMode(connectionMode, securityProtocol);
                channelBuilder = new SslChannelBuilder(connectionMode, listenerName, isInterBrokerListener, logContext);
                break;
            case SASL_SSL:
            case SASL_PLAINTEXT:
                // 业务代码太长,省略
                break;
            case PLAINTEXT:
                channelBuilder = new PlaintextChannelBuilder(listenerName);
                break;
            default:
                throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol);
        }

        channelBuilder.configure(configs);
        return channelBuilder;
    }

 好了,现在又回到ClientUtils.createNetworkClient()方法:

    public static NetworkClient createNetworkClient(入参省略) {
        ChannelBuilder channelBuilder = null;
        Selector selector = null;

        try {
            channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext);
            selector = new Selector(参数省略);
            return new NetworkClient(metadataUpdater,
                    metadata,
                    selector,
                    clientId,
                    maxInFlightRequestsPerConnection,
                    后续参数省略);
        } catch (Throwable t) {
            closeQuietly(selector, "Selector");
            closeQuietly(channelBuilder, "ChannelBuilder");
            throw new KafkaException("Failed to create new NetworkClient", t);
        }
    }

创建channelBuilder之后,紧接着是创建一个Selector对象,然后再创建一个NetworkClient对象,并返回。创建SelectorNetworkClient对象的构造函数都只是初始化各类参数,没有值得需要注意的地方,所以这里就跳过了。

        上述代码执行完毕,则会回到KafkaProducer.newSender()方法:

     Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
        // 此处省略部分代码
        KafkaClient client = kafkaClient != null ? kafkaClient : ClientUtils.createNetworkClient(参数省略);

        short acks = Short.parseShort(producerConfig.getString(ProducerConfig.ACKS_CONFIG));
        return new Sender(参数省略);
    }

从前面的代码可知,ClientUtils.createNetworkClient()方法返回一个NetworkClient对象,kafkaClientNetworkClient的父类,所以kafkaClient client即NetworkClient client。kafkaClient client赋值完成之后,接着是创建一个Sender对象,并返回。 因为Sender对象也只是一些初始化操作,所以这里也跳过。

        KafkaProducer.newSender()方法返回一个Sender对象,然后回到KafkaProducer的构造方法:

    KafkaProducer(入参省略) {
        try {
            // 此处省略多行代码
            this.sender = newSender(logContext, kafkaClient, this.metadata);
            String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();
            config.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
            log.debug("Kafka producer started");
        } catch (Throwable t) {
            // 此处省略多行代码
        }
    }

赋值sender之后,接下来是创建KafkaThread对象,构造方法如下:

    public KafkaThread(final String name, Runnable runnable, boolean daemon) {
        super(runnable, name);
        configureThread(name, daemon);
    }

 由此可以看出KafkaThread只是对线程做了一些附加的工作,KafkaThread对象创建完成,下一步就是执行start()方法。在KafkaThread的构造函数中传入的Runable参数是Sender对象,所以,我们需要去看下Sender的run()方法:

/**
     * The main run loop for the sender thread
     */
    @Override
    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        if (transactionManager != null)
            transactionManager.setPoisonStateOnInvalidTransition(true);

        // main loop, runs until close is called
        while (running) {
            try {
                runOnce();
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // okay we stopped accepting requests but there may still be
        // requests in the transaction manager, accumulator or waiting for acknowledgment,
        // wait until these are completed.
        while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
            try {
                runOnce();
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        // Abort the transaction if any commit or abort didn't go through the transaction manager's queue
        while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
            if (!transactionManager.isCompleting()) {
                log.info("Aborting incomplete transaction due to shutdown");
                try {
                    // It is possible for the transaction manager to throw errors when aborting. Catch these
                    // so as not to interfere with the rest of the shutdown logic.
                    transactionManager.beginAbort();
                } catch (Exception e) {
                    log.error("Error in kafka producer I/O thread while aborting transaction when during closing: ", e);
                    // Force close in case the transactionManager is in error states.
                    forceClose = true;
                }
            }
            try {
                runOnce();
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        if (forceClose) {
            // We need to fail all the incomplete transactional requests and batches and wake up the threads waiting on
            // the futures.
            if (transactionManager != null) {
                log.debug("Aborting incomplete transactional requests due to forced shutdown");
                transactionManager.close();
            }
            log.debug("Aborting incomplete batches due to forced shutdown");
            this.accumulator.abortIncompleteBatches();
        }
        try {
            this.client.close();
        } catch (Exception e) {
            log.error("Failed to close network client", e);
        }

        log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }

由于这部分代码是重点,所以就没有对代码做简化。上面的代码可以看出,多次调用了runOnce()方法,所以我们来看下这个方法是在做什么:

    /**
     * Run a single iteration of sending
     *
     */
    void runOnce() {
        if (transactionManager != null) {
            try {
                transactionManager.maybeResolveSequences();

                RuntimeException lastError = transactionManager.lastError();

                // do not continue sending if the transaction manager is in a failed state
                if (transactionManager.hasFatalError()) {
                    if (lastError != null)
                        maybeAbortBatches(lastError);
                    client.poll(retryBackoffMs, time.milliseconds());
                    return;
                }

                if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
                    return;
                }

                // Check whether we need a new producerId. If so, we will enqueue an InitProducerId
                // request which will be sent below
                transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();

                if (maybeSendAndPollTransactionalRequest()) {
                    return;
                }
            } catch (AuthenticationException e) {
                // This is already logged as error, but propagated here to perform any clean ups.
                log.trace("Authentication exception while processing transactional request", e);
                transactionManager.authenticationFailed(e);
            }
        }

        long currentTimeMs = time.milliseconds();
        long pollTimeout = sendProducerData(currentTimeMs);
        client.poll(pollTimeout, currentTimeMs);
    }

 上述代码中最重要的方法应该就是client.poll()了吧,查看poll()方法的注释信息,定义在KafkaClient中:

    /**
     * Do actual reads and writes from sockets.
     *
     * @param timeout The maximum amount of time to wait for responses in ms, must be non-negative. The implementation
     *                is free to use a lower value if appropriate (common reasons for this are a lower request or
     *                metadata update timeout)
     * @param now The current time in ms
     * @throws IllegalStateException If a request is sent to an unready node
     */
    List<ClientResponse> poll(long timeout, long now);

上面注释表示该方法用于对报文进行读写工作。

        好了,现在回到KafkaProducer的构造方法,当执行this.ioThread.start()代码之后,KafkaProducer对象的初始化基本上就算完成了。但是,你们发现没有,上面的代码执行流程,都没有发现连接kafka server的代码呢?

        起初我怀疑是不是阅读源码时,把哪里的代码给遗漏了,于是又回头走了一遍,还是没发现连接server的过程。没办法了,开启debug模式吧。为了避免一步步debug,根据我的经验,在开启debug之前,我们可以回头看下,上述的各个java类中,哪一个类里面包含了连接server的方法,然后把断点加上去。

        因为上述代码就只有几个类,寻找的过程还是很简单的。很快,我就锁定到Selector这个类里面,代码如下:

    /**
     * Begin connecting to the given address and add the connection to this nioSelector associated with the given id
     * number.
     * <p>
     * Note that this call only initiates the connection, which will be completed on a future {@link #poll(long)}
     * call. Check {@link #connected()} to see which (if any) connections have completed after a given poll call.
     * @param id The id for the new connection
     * @param address The address to connect to
     * @param sendBufferSize The send buffer for the new connection
     * @param receiveBufferSize The receive buffer for the new connection
     * @throws IllegalStateException if there is already a connection for that id
     * @throws IOException if DNS resolution fails on the hostname or if the broker is down
     */
    @Override
    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
        ensureNotRegistered(id);
        SocketChannel socketChannel = SocketChannel.open();
        SelectionKey key = null;
        try {
            configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
            boolean connected = doConnect(socketChannel, address);
            key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);

            if (connected) {
                // OP_CONNECT won't trigger for immediately connected channels
                log.debug("Immediately connected to node {}", id);
                immediatelyConnectedKeys.add(key);
                key.interestOps(0);
            }
        } catch (IOException | RuntimeException e) {
            if (key != null)
                immediatelyConnectedKeys.remove(key);
            channels.remove(id);
            socketChannel.close();
            throw e;
        }
    }

看方法上面的注释,也很符合我的猜测,来吧,上断点。 然后查看断点处的线程栈:

没想到吧, 连接server的流程,是执行KafkaThread.start()方法才触发的。前面提到Sender.run()方法是重点,贴出的代码未作简化处理,原因正源于此。执行顺序:

run()->runOnce()->maybeSendAndPollTransactionalRequest()->.......

看下Sender.maybeSendAndPollTransactionalRequest()的源码:

    /**
     * Returns true if a transactional request is sent or polled, or if a FindCoordinator request is enqueued
     */
    private boolean maybeSendAndPollTransactionalRequest() {
        // 省略部分代码
        try {
            // 省略部分代码
            if (targetNode != null) {
                if (!awaitNodeReady(targetNode, coordinatorType)) {
                    log.trace("Target node {} not ready within request timeout, will retry when node is ready.", targetNode);
                    maybeFindCoordinatorAndRetry(nextRequestHandler);
                    return true;
                }
            } else if (coordinatorType != null) {
                // 省略部分代码
            } else {
                // 省略部分代码
            }
                // 省略部分代码
        }
    }

进入Sender.awaitNodeReady()方法:

    private boolean awaitNodeReady(Node node, FindCoordinatorRequest.CoordinatorType coordinatorType) throws IOException {
        if (NetworkClientUtils.awaitReady(client, node, time, requestTimeoutMs)) {
            if (coordinatorType == FindCoordinatorRequest.CoordinatorType.TRANSACTION) {
                // Indicate to the transaction manager that the coordinator is ready, allowing it to check ApiVersions
                // This allows us to bump transactional epochs even if the coordinator is temporarily unavailable at
                // the time when the abortable error is handled
                transactionManager.handleCoordinatorReady();
            }
            return true;
        }
        return false;
    }

接着进入NetworkClientUtils.awaitReady() :

    public static boolean awaitReady(KafkaClient client, Node node, Time time, long timeoutMs) throws IOException {
        if (timeoutMs < 0) {
            throw new IllegalArgumentException("Timeout needs to be greater than 0");
        }
        long startTime = time.milliseconds();

        if (isReady(client, node, startTime) ||  client.ready(node, startTime))
            return true;

        // 省略部分代码
    }

接着进入NetworkClientUtils.isReady():

    public static boolean isReady(KafkaClient client, Node node, long currentTime) {
        client.poll(0, currentTime);
        return client.isReady(node, currentTime);
    }

接着进入NetworkClient.poll():

    @Override
    public List<ClientResponse> poll(long timeout, long now) {
        ensureActive();

        // 省略部分代码

        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        long telemetryTimeout = telemetrySender != null ? telemetrySender.maybeUpdate(now) : Integer.MAX_VALUE;
        // 省略部分代码

        return responses;
    }

 继续进入NetworkClient.DefaultMetadataUpdater.maybeUpdate()方法:

    class DefaultMetadataUpdater implements MetadataUpdater {

        // 省略部分代码

        DefaultMetadataUpdater(Metadata metadata) {
            this.metadata = metadata;
            this.inProgress = null;
        }

        // 省略部分代码
        
        public long maybeUpdate(long now) {
            // 省略部分代码
            return maybeUpdate(now, leastLoadedNode.node());
        }
    }

继续进入NetworkClient.DefaultMetadataUpdater.maybeUpdate()方法:

        private long maybeUpdate(long now, Node node) {
            // 省略部分代码

            if (connectionStates.canConnect(nodeConnectionId, now)) {
                // We don't have a connection to this node right now, make one
                log.debug("Initialize connection to node {} for sending metadata request", node);
                initiateConnect(node, now);
                return reconnectBackoffMs;
            }

            return Long.MAX_VALUE;
        }

继续进入NetworkClient.initiateConnect()方法:

    private void initiateConnect(Node node, long now) {
        String nodeConnectionId = node.idString();
        try {
            connectionStates.connecting(nodeConnectionId, now, node.host());
            InetAddress address = connectionStates.currentAddress(nodeConnectionId);
            log.debug("Initiating connection to node {} using address {}", node, address);
            
            // 这里就是连接server的终极入口了
            selector.connect(nodeConnectionId,
                    new InetSocketAddress(address, node.port()),
                    this.socketSendBuffer,
                    this.socketReceiveBuffer);
        } catch (IOException e) {
            // 省略部分代码
        }
    }

好了,终于看到希望了,进入Selector.connect()方法,正是我之前打断点的代码,这里就不再占用篇幅了。

        通过打断点跟踪的方式,终于找到了生产者连接server的过程。连接成功之后,就可以发送消息了。我们再回过头来看下ConsoleProducer.main()方法:

  def main(args: Array[String]): Unit = {

    try {
      val config = new ProducerConfig(args)

      // 接受控制台输入
      val input = System.in

      // 连接server
      val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps(config))
      
      // 发送消息
      try loopReader(producer, newReader(config.readerClass, getReaderProps(config)), input, config.sync)
      finally producer.close()
      Exit.exit(0)
    } catch {
      // 省略部分代码
    }
  }

总结一下,main()方法就做了三件事:

  • 接受控制台输入
  • 连接server
  • 发送消息 

发送消息后续有机会再研究吧,本章内容到此完结,撒花!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2067518.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vue处理表格长字段显示问题

背景 有些单元个中会有比较长的内容&#xff0c;如果使用默认格式&#xff0c;会导致单元格的高度比较怪异&#xff0c;需要将超长的内容进行省略。 当前效果&#xff1a; 优化效果&#xff1a; 优化代码&#xff1a; 在内容多的单元格增加下面代码 <el-table-columnprop…

SAP成本核算-事前控制(标准成本核算)

一、BOM清单 1、BOM清单抬头 BOM用途:决定成本核算控制的依据 物料清单状态:决定成本核算控制的依据 基本数量:用于计算标准的用量 有效期:决定生产工单开单的日期范围,以及成本核算的日期范围 物料清单状态默认值后台配置:事务代码OS21 2、BOM清单行项目 有效期:决…

Java框架Shiro、漏洞工具利用、复现以及流量特征分析

Shiro流量分析 前言 博客主页&#xff1a; 靶场&#xff1a;Vulfocus 漏洞威胁分析平台 Shiro&#xff08;Apache Shiro&#xff09;是一个强大且灵活的开源安全框架&#xff0c;专为Java应用程序提供安全性解决方案。它由Apache基金会开发和维护&#xff0c;广泛应用于企业级…

Anolis os系统进入单用户模式重置密码

Anolis os系统进入单用户模式重置密码 1、重启计算机。 2、在启动时&#xff0c;当GRUB菜单出现时&#xff0c;按下任意键停止自动倒计时。 3、选择要启动的内核版本&#xff0c;然后按e键编辑启动参数。 4、找到以linux或linux16开头的行&#xff0c;通常这行包含了启动内核…

keepalived与lvs

1 lvs Linux服务器集群系统(一) -- LVS项目介绍 LVS&#xff08;Linux Virtual Server&#xff09;即Linux虚拟服务器,是一个基于Linux操作系统的虚拟服务器技术&#xff0c;用于实现负载均衡和高可用性。章文嵩&#xff0c;是中国国内最早出现的自由软件项目之一。 2 lvs发展…

Circuitjs 快捷键完全列表

对于常见组件, 反复通过菜单去选择也是比较繁琐的, 系统考虑到这一点, 也为那些常用组件添加了快捷键. 通过 菜单--选项--快捷键... 可以查看所有快捷键, 分配新的快捷键或调整现有的快捷键. 点开菜单时, 位于菜单右侧的那些字母即是对应的快捷键, 如下图所示: 注: 旧版本有, …

Debug-022-el-upload照片上传-整体实现回顾

前情概要&#xff1a; 最近业务里通过el-upload实现一个上传图片的功能&#xff0c;有一些小小的心得分享给各位。一方面是review一下&#xff0c;毕竟实现了很多细小的功能&#xff0c;还有这么多属性、方法&#xff08;钩子&#xff09;和碰到的问题&#xff0c;感觉小有成就…

Swing中如何实现快捷键绑定和修改

在许许多多市面上常见的桌面软件中, 可以使用快捷键操作&#xff0c; 比如像微信一样,使用AltA 可以打开截图窗口&#xff0c;如果不习惯于AltA按键时&#xff0c;还可以打开设置去修改。 如果在swing中也想实现一个快捷键绑定和修改的操作&#xff0c;那么应该如何操作&#…

《计算机操作系统》(第4版)第8章 磁盘存储器的管理 复习笔记

第8章 磁盘存储器的管理 一 、外存的组织方式 1. 连续组织方式(连续分配方式) (1)概述 ①定义 连续组织方式要求为每一个文件分配一组相邻接的盘块。磁盘空间的联系组织方式如图8-1所示。 ②记录方法 在目录项的“文件物理地址”字段中记录该文件第一个记录所在的盘块号和文件…

【Docker深入浅出】Docker引擎架构介绍

文章目录 一. docker引擎介绍1. Docker daemon&#xff1a;实现Docker API&#xff0c;通过API管理容器2. containerd&#xff1a;负责容器的生命周期3. runc&#xff1a;用于创建和启动容器 二. 启动容器的过程1. 启动过程2. docker daemon的维护不会影响到运行中的容器3. shi…

依靠 VPN 生存——探索 VPN 后利用技术

执行摘要 在这篇博文中,Akamai 研究人员强调了被忽视的 VPN 后利用威胁;也就是说,我们讨论了威胁行为者在入侵 VPN 服务器后可以用来进一步升级入侵的技术。 我们的发现包括影响 Ivanti Connect Secure 和 FortiGate VPN 的几个漏洞。 除了漏洞之外,我们还详细介绍了一组…

C语言试题(含答案解析)

单选 1.下面C程序的运行结果为&#xff08;&#xff09; int main(void) {printf("%d", B < A);return 0; }A.编译错误 B.1 C.0 D.运行错误 A’的ascii码值为65&#xff0c;‘B’的ascii码值为66&#xff0c;‘B’<‘A’是不成立的&#xff0c;返回0&#xf…

spring学习(1)

目录 一、是什么 二、IOC思想 2.1 IOC创建对象的方式 三、Spring的配置 3.1 别名 3.2 Bean的配置 3.3 import 四、DI依赖注入 4.1 构造器注入 4.2 Set方式注入【重点】 4.3 拓展方式注入 五、Bean的自动装配 5.1 byName自动装配 5.2 byType自动装配 5.3 注解实现…

Vue小知识大杂烩

一、Vue组件的三大部分&#xff1a;template、Script、Style template --> 组件的模板结构 写html的地方 注意&#xff1a;<template> 是 vue 提供的容器标签&#xff0c;只起到包裹性质的作用&#xff0c;它不会被渲染为真正的 DOM 元素。 script -> 组件的…

超声波清洗机什么牌子值得入手? 清洁力好的超声波清洗机推荐

对于眼镜佩戴人士而言&#xff0c;超声波清洗机无疑是清洁神器&#xff01;它凭借高频振动技术&#xff0c;能深入眼镜的每一细微处及手洗难以触及的缝隙&#xff0c;有效清除顽固污渍&#xff0c;不仅大幅提高清洁效率&#xff0c;而且清洁质量远胜传统方法。随着超声波清洗机…

Linux下快速搭建七日杀官方私人服务器教程

今天给大家分享一下七日杀的个人开服教程&#xff0c;本教程基于Linux系统开发&#xff0c;推荐有一定基础的小伙伴尝试&#xff01;如果你没有Linux的基础但实在想开的小伙伴可以根据以下教程一步步进行操作&#xff0c;后续这边也会上架对应视频操作 架设前准备&#xff1a; …

Redis篇三:在Ubuntu下安装Redis

文章目录 1. 安装Redis2. 更改Redis的IP3. 使用redis自带的客户端来连接服务器4. Redis的客户端介绍 1. 安装Redis sudo apt install redis2. 更改Redis的IP 刚安装的Redis的ip是一个本地环回的ip&#xff0c;也就是只能由当前主机上的客户端进行访问&#xff0c;跨主机就访问…

IO进程线程 0823作业

作业 创建子父进程&#xff0c;子进程将1.txt内容拷贝到2.txt中&#xff0c;父进程将3.txt内容拷贝到4.txt中。 #include <myhead.h> int main(int argc, const char *argv[]) {pid_t ID;ID fork();if(ID > 0){int fd1;fd1 open("./3.txt",O_RDONLY);if(…

js 键盘监听 组合键

今天分享如何快速实现js快捷键监听 所需环境&#xff1a; 浏览器js 实现目标 mac/win兼容&#xff0c;一套代码&#xff0c;多个平台支持快捷键监听/单按键监听事件是否冒泡可设置使用方式简单快速挂载与卸载4行代码实现组合键监听 代码原理 把键盘监听事件挂载在documen…

c#-DataGridView控件实现分页

有时候我们需要进行分页显示&#xff0c;第一方面是在大数据量下可以降低卡顿&#xff0c;另一方面也是方便查找。 首先划重点&#xff0c;如果卡顿&#xff0c;不要用单元格填充的方式去刷新&#xff0c;用绑定数据源的方式比较高效&#xff01; 下面重点讲如何使用数据源绑定…