Spring-Kafka生产者源码分析

news2025/2/27 17:27:28

文章目录

    • 概要
    • 初始化
    • 消息发送
    • 小结

概要

本文主要概括Spring Kafka生产者发送消息的主流程

代码准备:
SpringBoot项目中maven填加以下依赖

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.12.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

消息发送使用KafkaTemplate

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@GetMapping("/test/send/{msg}")
public String sendMsg(@PathVariable String msg) {
    kafkaTemplate.send("alai_test", msg);
    return "success";
}

初始化

启动类KafkaAutoConfiguration
有两个地方需要关注

@Bean
@ConditionalOnMissingBean({KafkaTemplate.class})
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener) {
    KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
    if (this.messageConverter != null) {
        kafkaTemplate.setMessageConverter(this.messageConverter);
    }

    kafkaTemplate.setProducerListener(kafkaProducerListener);
    kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
    return kafkaTemplate;
}

@Bean
@ConditionalOnMissingBean({ProducerFactory.class})
public ProducerFactory<?, ?> kafkaProducerFactory() {
    DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory(this.properties.buildProducerProperties());
    String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
    if (transactionIdPrefix != null) {
        factory.setTransactionIdPrefix(transactionIdPrefix);
    }

    return factory;
}

其中的ProducerFactory使用的是DefaultKafkaProducerFactory

在发送消息之前,Spring Kafka会先创建Producer,返回的是CloseSafeProducer实现类,在该类中有一个委托类Producer<K, V> delegate,真正的发送消息处理逻辑委托给KafkaProducerKafkaProducer实例构造如下,边幅原因,这里只展示需要说明的部分

KafkaProducer(Map<String, Object> configs,
                  Serializer<K> keySerializer,
                  Serializer<V> valueSerializer,
                  ProducerMetadata metadata,
                  KafkaClient kafkaClient,
                  ProducerInterceptors<K, V> interceptors,
                  Time time) {
        ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer,
                valueSerializer));
        try {
            Map<String, Object> userProvidedConfigs = config.originals();
            this.producerConfig = config;
            this.time = time;

            String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?
                    (String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;

            this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);

            LogContext logContext;
            if (transactionalId == null)
                logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
            else
                logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
            log = logContext.logger(KafkaProducer.class);
            log.trace("Starting the Kafka producer");

            Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                    .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                    .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                    .tags(metricTags);
            List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                    MetricsReporter.class,
                    Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
            reporters.add(new JmxReporter(JMX_PREFIX));
            this.metrics = new Metrics(metricConfig, reporters, time);
            this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
            long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
            if (keySerializer == null) {
                this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                                                                                         Serializer.class);
                this.keySerializer.configure(config.originals(), true);
            } else {
                config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
                this.keySerializer = keySerializer;
            }
            if (valueSerializer == null) {
                this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                                                                                           Serializer.class);
                this.valueSerializer.configure(config.originals(), false);
            } else {
                config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                this.valueSerializer = valueSerializer;
            }

            // load interceptors and make sure they get clientId
            userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
            ProducerConfig configWithClientId = new ProducerConfig(userProvidedConfigs, false);
            List<ProducerInterceptor<K, V>> interceptorList = (List) configWithClientId.getConfiguredInstances(
                    ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
             // 生产者拦截器       
            if (interceptors != null)
                this.interceptors = interceptors;
            else
                this.interceptors = new ProducerInterceptors<>(interceptorList);
            ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer,
                    valueSerializer, interceptorList, reporters);
             // 生产者往服务端发送消息的时候,规定一条消息最大多大?
			// 如果你超过了这个规定消息的大小,你的消息就不能发送过去。
			// 默认是1M,这个值偏小,在生产环境中,我们需要修改这个值。
			// 经验值是10M。但是大家也可以根据自己公司的情况来。       
            this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
            //指的是缓存大小
			//默认值是32M,这个值一般是够用,如果有特殊情况的时候,我们可以去修改这个值。
            this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
            // kafka是支持压缩数据的,可以设置压缩格式,默认是不压缩,支持gzip、snappy、lz4
			// 一次发送出去的消息就更多。生产者这儿会消耗更多的cpu.
            this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
			// 配置控制了KafkaProducer.send()并将KafkaProducer.partitionsFor()被阻塞多长时间,由于缓冲区已满或元数据不可用,这些方法可
			// 能会被阻塞止
            this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
            int deliveryTimeoutMs = configureDeliveryTimeout(config, log);

            this.apiVersions = new ApiVersions();
            this.transactionManager = configureTransactionState(config, logContext);
            // 创建核心组件:记录累加器
            this.accumulator = new RecordAccumulator(logContext,
                    config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                    this.compressionType,
                    lingerMs(config),
                    retryBackoffMs,
                    deliveryTimeoutMs,
                    metrics,
                    PRODUCER_METRIC_GROUP_NAME,
                    time,
                    apiVersions,
                    transactionManager,
                    new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
          
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
                    config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
                    config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));
            if (metadata != null) {
                this.metadata = metadata;
            } else {
           		 // 生产者每隔一段时间都要去更新一下集群的元数据,默认5分钟
                this.metadata = new ProducerMetadata(retryBackoffMs,
                        config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
                        config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
                        logContext,
                        clusterResourceListeners,
                        Time.SYSTEM);
                this.metadata.bootstrap(addresses);
            }
            this.errors = this.metrics.sensor("errors");
            // 真正执行消息发送的逻辑
            this.sender = newSender(logContext, kafkaClient, this.metadata);
            String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            // 开启新的线程
            this.ioThread.start();
            config.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
            log.debug("Kafka producer started");
        } catch (Throwable t) {
            // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
            close(Duration.ofMillis(0), true);
            // now propagate the exception
            throw new KafkaException("Failed to construct kafka producer", t);
        }
    }

创建Sender时的方法如下:

// visible for testing
Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
    // 使用幂等性,需要将 enable.idempotence 配置项设置为true。并且它对单个分区的发送,一次性最多发送5条
    int maxInflightRequests = producerConfig.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
    // 控制客户端等待请求响应的最长时间。如果在超时过去之前未收到响应,客户端将
    // 在必要时重新发送请求,或者如果重试耗尽,请求失败
    int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
    ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time, logContext);
    ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);

    Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
        // 初始化了一个重要的管理网路的组件
// connections.max.idle.ms: 默认值是9分钟, 一个网络连接最多空闲多久,超过这个空闲时间,就关闭这个网络连接。
// max.in.flight.requests.per.connection:默认是5, producer向broker发送数据的时候,其实是有多个网络连接。每个网络连接可以忍受 producer端发送给broker 消息然后消息没有响应的个数
    KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
            new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                    this.metrics, time, "producer", channelBuilder, logContext),
            metadata,
            clientId,
            maxInflightRequests,
            producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
            producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
            producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
            producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
            requestTimeoutMs,
            producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
            producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
            time,
            true,
            apiVersions,
            throttleTimeSensor,
            logContext);

    short acks = Short.parseShort(producerConfig.getString(ProducerConfig.ACKS_CONFIG));
    return new Sender(logContext,
            client,
            metadata,
            this.accumulator,
            maxInflightRequests == 1,
            producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
            acks,
            producerConfig.getInt(ProducerConfig.RETRIES_CONFIG), // 重试次数
            metricsRegistry.senderMetrics,
            time,
            requestTimeoutMs,
            producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
            this.transactionManager,
            apiVersions);
}

在创建RecordAccumulator时,其内部会维护一个ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches ,该Map的key是TopicPartition,这个类重写了equals方法,相同的topic,相同的分区,在batches中属于相同的key,就会被放入到队列Deque中。

消息发送

Spring Kafka对消息的发送,最后也是直接委托给了org.apache.kafka.clients.producer.KafkaProducer#doSend方法,下面以这个方法作为入口进行分析

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

拦截器

onSend 方法是遍历拦截器 onSend 方法,拦截器的目的是将数据处理加工, kafka 本身并没有给出默认的拦截器的实现。如果需要使用拦截器功能,必须自己实现 ProducerInterceptor 接口

public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
    ProducerRecord<K, V> interceptRecord = record;
    for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
        try {
            interceptRecord = interceptor.onSend(interceptRecord);
        } catch (Exception e) {
          // 其中一个拦截器出现处理异常时不回抛出异常,只会打印日志
            // do not propagate interceptor exception, log and continue calling other interceptors
            // be careful not to throw exception from here
            if (record != null)
                log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
            else
                log.warn("Error executing interceptor onSend callback", e);
        }
    }
    return interceptRecord;
}

ProducerInterceptor的3个方法:

  • onSend: Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算
  • onAcknowledgement: 该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率
  • close: 关闭interceptor,主要用于执行一些资源清理工作

消息发送主流程

/**
 * Implementation of asynchronously send a record to a topic.
 */
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        throwIfProducerClosed();
        // first make sure the metadata for the topic is available
        long nowMs = time.milliseconds();
        ClusterAndWaitTime clusterAndWaitTime;
        try {
        // 首先确保该topic的元数据可用
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
        } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
        }
        nowMs += clusterAndWaitTime.waitedOnMetadataMs;
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        // 序列化 record 的 key 和 value
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer", cce);
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer", cce);
        }
        // 获取该 record 要发送到的 partition
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);

        setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();

        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                compressionType, serializedKey, serializedValue, headers);
        ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
        if (log.isTraceEnabled()) {
            log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        }
        // producer callback will make sure to call both 'callback' and interceptor callback
        Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
		// 向 accumulator 中追加 record 数据,数据会先进行缓存
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

        if (result.abortForNewBatch) {
            int prevPartition = partition;
            partitioner.onNewBatch(record.topic(), cluster, prevPartition);
            partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            if (log.isTraceEnabled()) {
                log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
            }
            // producer callback will make sure to call both 'callback' and interceptor callback
            interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

            result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
        }
        if (transactionManager != null) {
            transactionManager.maybeAddPartition(tp);
        }

		// 如果追加完数据后,对应的 RecordBatch 已经达到了 batch.size 的大小(或者batch 的剩余空间不足以添加下一条 Record),则唤醒 sender 线程发送数据。
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
    } catch (ApiException e) {
        ...
    }...
}
  • Producer 通过 waitOnMetadata() 方法来获取对应 topic 的 metadata 信息,需要先该topic 是可用的

  • Producer 端对 recordkeyvalue 值进行序列化操作,在 Consumer 端再进行相应的反序列化

  • 获取partition值,具体分为下面三种情况:
    1 指明 partition 的情况下,直接将指明的值直接作为 partiton 值
    2 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值
    3 既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到partition 值,也就是常说的 round-robin 算法
    4 Producer 默认使用的 partitioner 是org.apache.kafka.clients.producer.internals.DefaultPartitioner

  • accumulator 写数据,先将 record 写入到 buffer 中,当达到一个 batch.size 的大小时,再唤起 sender线程去发送 RecordBatch,这里仔细分析一下Producer是如何向buffer写入数据的
    1.获取该 topic-partition 对应的 queue,没有的话会创建一个空的 queue
    2.向 queue 中追加数据,先获取 queue 中最新加入的那个 RecordBatch,如果不存在或者存在但剩余空余不足以添加本条 record 则返回 null,成功写入的话直接返回结果,写入成功
    3.创建一个新的 RecordBatch,初始化内存大小根据 max(batch.size,Records.LOG_OVERHEAD + Record.recordSize(key, value)) 来确定(防止单条record 过大的情况)
    4.向新建的 RecordBatch 写入 record,并将 RecordBatch 添加到 queue 中,返回结果,写入成功

  • 发送 RecordBatch,当 record 写入成功后,如果发现 RecordBatch 已满足发送的条件(通常是 queue 中有多个 batch,那么最先添加的那些 batch 肯定是可以发送了),那么就会唤醒sender 线程,发送 RecordBatch 。sender 线程对 RecordBatch 的处理是在 run() 方法中进行的,该方法具体实现如下:
    1.获取那些已经可以发送的 RecordBatch 对应的 nodes
    2.如果与node 没有连接(如果可以连接,同时初始化该连接),就证明该 node 暂时不能发送数据,暂时移除该 node
    3.返回该 node 对应的所有可以发送的 RecordBatch 组成的 batches(key 是
    node.id),并将 RecordBatch 从对应的 queue 中移除
    4.将由于元数据不可用而导致发送超时的 RecordBatch 移除
    5.发送 RecordBatch

在这里插入图片描述

小结

在这里插入图片描述

由上图可以看出:KafkaProducer有两个基本线程:

主线程:

  1. 负责消息创建,拦截器,序列化器,分区器等操作,并将消息追加到消息收集器RecordAccumulator中;
  2. RecordAccumulator为每个分区都维护了一个Deque` 类型的双端队列。
  3. ProducerBatch可以理解为是ProducerRecord` 的集合,批量发送有利于提升吞吐量,降低网络影响;
  4. 由于生产者客户端使用 java.io.ByteBuffer 在发送消息之前进行消息保存,并维护了一个 BufferPool 实现 ByteBuffer 的复用;该缓存池只针对特定大小( batch.size指定)的 ByteBuffer进行管理,对于消息过大的缓存,不能做到重复利用。
  5. 每次追加一条ProducerRecord消息,会寻找/新建对应的双端队列,从其尾部获取一个ProducerBatch,判断当前消息的大小是否可以写入该批次中。若可以写入则写入;若不可以写入,则新建一个ProducerBatch,判断该消息大小是否超过客户端参数配置 batch.size 的值,不超过,则以 batch.size建立新的ProducerBatch,这样方便进行缓存重复利用;若超过,则以计算的消息大小建立对应的 ProducerBatch ,缺点就是该内存不能被复用了。

Sender线程:

  1. 该线程从消息收集器获取缓存的消息,将其处理为 <Node, List<ProducerBatch> 的形式, Node 表示集群的broker节点。
  2. 进一步将<Node, List<ProducerBatch>转化为<Node, Request>形式,此时才可以向服务端发送数据。
  3. 在发送之前,Sender线程将消息以 Map<NodeId, Deque<Request>> 的形式保存到InFlightRequests 中进行缓存,可以通过其获取 leastLoadedNode ,即当前Node中负载压力最小的一个,以实现消息的尽快发出。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/966136.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大胆尝试这些新的CSS属性,释放CSS的力量吧(一)

本文章系《Unleashing the Power of CSS》&#xff08;释放CSS的力量&#xff0c;暂且这么翻译吧&#xff09;一书的学习笔记&#xff0c;希望通本书的学习&#xff0c;系统的梳理下CSS相关的高级新特性。本篇文章是其第一部分&#xff0c;由于全书英文版&#xff0c;理解和阅读…

mac制作ssl证书|生成自签名证书,nodejs+express在mac上搭建https+wss(websocket)服务器

注意 mac 自带 openssl 所以没必要像 windows 一样先安装 openssl&#xff0c;直接生成即可 生成 ssl/自签名 证书 生成 key # 生成rsa私钥&#xff0c;des3算法&#xff0c;server_ssl.key是秘钥文件名 1024位强度 openssl genrsa -des3 -out server_ssl.key 1024让输入两…

MySQL开机自启动设置(Windows)

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

黑马 软件测试从0到1 常用分类 模型 流程 用例

课程内容&#xff1a; 1、软件测试基础 2、测试设计 3、缺陷管理 4、Web常用标签 5、项目实战 以终为始&#xff0c;由交付实战目标为终&#xff0c;推出所学知识&#xff1b;从认识软件及软件测试&#xff0c;到如何设计测试、缺陷标准及缺陷管理&#xff0c;最终以项目实战贯…

pytorch异常——loss异常,不断增大,并且loss出现inf

文章目录 异常报错异常截图异常代码原因解释修正代码执行结果 异常报错 epoch1:loss3667.782471 epoch2:loss65358620.000000 epoch3:loss14979486720.000000 epoch4:loss1739650891776.000000 epoch5:loss12361745880317952.000000 epoch6:loss2740315398365287284736.000000…

PMD代码检查:为了提升性能,正确使用记录日志的语句(GuardLogStatement)

https://docs.pmd-code.org/pmd-doc-6.55.0/pmd_rules_java_bestpractices.html#guardlogstatement 对应记录日志的语句&#xff0c;要首先检查对应的日志级别有没有实际打开&#xff1b;如果没有实际打开&#xff0c;那么就要跳过字符串的生成环节&#xff0c;以提升性能。 另…

C#,数值计算——Midsql的计算方法与源程序

1 文本格式 using System; namespace Legalsoft.Truffer { public class Midsql : Midpnt { private double aorig { get; set; } 0.0; public new double func(double x) { return 2.0 * x * funk.funk(aorig x * x); } p…

MySQL中表的设计

在MySQL中表的设计&#xff0c;需要一定的经验才能理解&#xff0c;由于笔者目前在读中&#xff0c;理解不是很深刻&#xff0c;仅根据自己的想法外界的一些参考资料做出下述文字描述&#xff0c;一些错误&#xff0c;请大佬及时指正~~ 在本篇文章中&#xff0c;介绍一点简单粗…

Windows7安装SSH客户端的解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

归并排序的详解!

本文旨在讲解归并排序的实现&#xff08;递归及非递归&#xff09;搬好小板凳&#xff0c;干货来了&#xff01; 前序&#xff1a; 在介绍归并排序之前&#xff0c;需要给大家介绍的是什么是归并&#xff0c;归并操作&#xff0c;也叫归并算法&#xff0c;指的是将两个顺序序列…

11 - 深入了解NIO的优化实现原理

Tomcat 中经常被提到的一个调优就是修改线程的 I/O 模型。Tomcat 8.5 版本之前&#xff0c;默认情况下使用的是 BIO 线程模型&#xff0c;如果在高负载、高并发的场景下&#xff0c;可以通过设置 NIO 线程模型&#xff0c;来提高系统的网络通信性能。 我们可以通过一个性能对比…

机器学习——决策树与随机森林

机器学习——决策树与随机森林 文章目录 前言一、决策树1.1. 原理1.2. 代码实现1.3. 网格搜索1.4. 可视化决策树 二、随机森林算法2.1. 原理2.2. 代码实现 三、补充&#xff08;过拟合与欠拟合&#xff09;总结 前言 决策树和随机森林都是常见的机器学习算法&#xff0c;用于分…

OS 内存分区和分页 多级页表与快表

每个进程的PCB都有一个LDT 内存紧缩不实用&#xff0c;所需时间太长 类似于段表&#xff0c;存在页表 但是不连续需要的空间太多了&#xff0c;太麻烦了 多级页表&#xff1a;类比于书的章目录和节目录 构建页目录 每个页目录号指向4M的地址 快表是寄存器&#xff0c;很昂…

Kotlin管道Channel在receiveAsFlow时debounce与flow差异

Kotlin管道Channel在receiveAsFlow时debounce与flow差异 import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.* import kotlinx.coroutines.launch import kotlinx.coroutine…

Nginx高级配置

目录 一、Nginx 第三方模块 1.1ehco 模块 二、变量 2.1 内置 2.2 自定义变量 三、nginx压缩功能 ​编辑四、https功能 一、Nginx 第三方模块 1.1ehco 模块 基于nginx 模块 ngx_http_stub_status_module 实现&#xff0c;在编译安装nginx的时候需要添加编译参数 --with-…

第7篇:ESP32连接按钮点亮LED无源喇叭播放声音

第1篇:Arduino与ESP32开发板的安装方法 第2篇:ESP32 helloword第一个程序示范点亮板载LED 第3篇:vscode搭建esp32 arduino开发环境 第4篇:vscodeplatformio搭建esp32 arduino开发环境 第5篇:doit_esp32_devkit_v1使用pmw呼吸灯实验 第6篇:ESP32连接无源喇叭播放音乐《涛声…

程序员自由创业周记#3:No1.作品

作息 如果不是热爱&#xff0c;很难解释为什么能早上6点自然醒后坐在电脑前除了吃饭一直敲代码到23点这个现象&#xff0c;而且还乐此不疲。 之前上班的时候生活就很规律&#xff0c;没想到失业后的生活比之前还要规律&#xff1b;记得还在上班的时候&#xff0c;每天7点半懒洋…

670. 最大交换

链接&#xff1a; ​​​​​​670. 最大交换 题解&#xff1a; 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 class Solution { public:int maximumSwap(int num) {if (num < 0) {return 0;}std::string str to_string(num);stack<int&g…

植物大战僵尸各种僵尸攻略(一)

前言 此文章为“植物大战僵尸”专栏中的004刊&#xff08;2023年9月第2刊&#xff09;&#xff0c;欢迎订阅。版权所有。 注意&#xff1a; 1.本博客适用于pvz无名版&#xff1b; 2.pvz指植物大战僵尸&#xff08;Plants VS Zonbies)&#xff1b; 3.本文以耗费低做标准&am…

JY901B智能9轴加速度计陀螺仪角度传感器

今日学习使用JY901B智能9轴加速度计陀螺仪角度传感器 本文会先使用上位机获取数据作演示&#xff0c;后介绍它的数据表发送原理。 文章提供详细的原理讲解&#xff0c;测试工程下载&#xff0c;代码讲解&#xff0c;本人有多注释的习惯&#xff0c;希望对大家有帮助。 我的J…