Kafka生产者原理

news2024/9/24 7:20:08

消息发送流程介绍

  1. Producer创建时,会创建⼀个sender线程并设置为守护线程。
  2. ⽣产消息时,内部其实是异步的;⽣产的消息先经过拦截器->序列化器->分区器,然后将消息缓存在缓冲区(该缓冲区也是在Producer创建时创建)。
  3. 批次发送的条件为:缓冲区数据⼤⼩达到batch.size或者linger.ms达到上限,哪个先达到就算哪个。
  4. 批次发送后,发往指定分区,然后落盘到broker;如果⽣产者配置了retrires参数⼤于0并且失败原因允许重试,那么客户端内部会对该消息进⾏重试。
  5. 落盘到broker成功,返回⽣产元数据给⽣产者。
  6. 元数据返回有两种⽅式:⼀种是通过阻塞直接返回,另⼀种是通过回调返回。
    在这里插入图片描述

生产流程代码

kafka生产消息Demo

private KafkaProducer<String, String> producer;

    private static final String GENERAL_TOPIC = "test";

    @PostConstruct
    public void init() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 失败尝试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // 是否压缩发送,压缩发送的好处是 节省了网络传输事件,但增加了CPU利用率
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        // 0:只管发送,不管kafka节点是否成功接收。 1:kafka的leader节点收到  2:主节点和从节点都有收到
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        producer = new KafkaProducer<>(properties);
    }

    @PreDestroy
    public void destroy() {
        producer.close();
    }

    public void send(String msg) {
        send(GENERAL_TOPIC, msg);
    }

    public void send(String topic, String msg) {
     	/**
     	* ProducerRecord实际满参构造函数有5个
     	* ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
     	* @param topic 主题
        * @param partition 指定分区存储
        * @param timestamp 消息时间戳.
        * @param key 决定分区的key值
        * @param value 消息内容
       */
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, msg);
        // 消息发送成功与否情况回调
        producer.send(producerRecord, (metadata, exception) -> {
            if(exception == null) System.out.println("send success");
            else System.out.println("send error");
        });
    }
属性说明
bootstrap.serverskafka的url连接地址
key.serializerkey值序列化器
value.serializervalue值序列化器
acks确认broker是否成功接收到生产消息 0:发送后不管broker接收结果 1:broker主节点收到消息 all:主和跟随者节点都收到该消息
compression.type是否压缩发送,减少网络传输时间,但增加了CPU算力
retries重试次数

拦截器

上述demo中,如配置了拦截器,则先走拦截器过滤,如下源码。

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }


public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        ProducerRecord<K, V> interceptRecord = record;
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptRecord = interceptor.onSend(interceptRecord);
            } catch (Exception e) {
                // 拦截器抛出的异常是没用的哈
                if (record != null)
                    log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
                else
                    log.warn("Error executing interceptor onSend callback", e);
            }
        }
        return interceptRecord;
    }

拦截器基类:

/**
* 手动修改record,返回新的记录
*/
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

/**
* 记录发送到服务器后回调,或者发送服务器异常回调。 在 callback回调执行前执行该方法
*/
public void onAcknowledgement(RecordMetadata metadata, Exception exception);

/**
* 拦截器关闭时调用 
*/
public void close();
  • onSend(ProducerRecord):该⽅法封装进KafkaProducer.send⽅法中,即运⾏在⽤户主线程中。 Producer确保在消息被序列化以计算分区前调⽤该⽅法。⽤户可以在该⽅法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响⽬标分区的计算。
  • onAcknowledgement(RecordMetadata, Exception):该⽅法会在消息被应答之前或消息发送失败时调⽤,并且通常都是在Producer回调逻辑触发之前。 onAcknowledgement运⾏在Producer的IO线程中,因此不要在该⽅法中放⼊很重的逻辑,否则会拖慢Producer的消息发送效率。
  • close:关闭Interceptor,主要⽤于执⾏⼀些资源清理⼯作。
    如前所述, Interceptor可能被运⾏在多个线程中,因此在具体实现时⽤户需要⾃⾏确保线程安全。

分区器

分区器,顾名思义对生产的消息进行分区运算,这里我们可以在配置中添加自己的分区器,也可以使用kafka默认分区器。
默认分区器工作机制:
在这里插入图片描述

/**
* 如果record指定分区不为空,则使用record自带的partition
*/
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        // record是否含partition
        Integer partition = record.partition();
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }

  
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
		// 获取所有的partitions
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            // 下一个值
            int nextValue = nextValue(topic);
            // 拿到所有可用分区
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                // 通过topic的下一个value,对可用的partition取模
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // 没有分区是可用的,对所有分区数取模
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // 通过key取hash,对partions数量取模
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
 }

 private int nextValue(String topic) {
 		// 线程安全的ConcurrentHashMap
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
       	// 获取一个随机值
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            // 存入map
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

自定义分区器

  1. 实现Partitioner接口即可
  2. 在KafkaProducer中进⾏设置:configs.put(“partitioner.class”,“xxx.xx.Xxx.class”)
    代码就略了
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    /**
     * partition关闭时调用
     */
    public void close();

源代码原理剖析

在这里插入图片描述

  • 主线程:负责消息创建,拦截器,序列化器,分区器等操作,并将消息追加到消息收集器
    主线程原理
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            // 拿到集群原始数据初始化完成需要等待的时间
            ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            // 剩余的阻塞时长
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            byte[] serializedKey;
            try {
            // 序列化key
                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer", cce);
            }
            byte[] serializedValue;
            try {
            // 序列化value
                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer", cce);
            }
            // 获取消息所属的分区(见分区器)
            int partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
			// 消息头设置只读状态,防止更改
            setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();
			// 估算消息byte字节大小
            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                    compressionType, serializedKey, serializedValue, headers);
           // 验证消息的大小,不能太大
            ensureValidRecordSize(serializedSize);
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            // 回调包装,如果有拦截器,先回调拦截器的 acknowledge方法,再回调用户定义的callback方法
            Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);

            // 如果开启事务需要将TopicPartition放transactionManager中
            if (transactionManager != null && transactionManager.isTransactional())
                   //添加到缓冲区
                      transactionManager.maybeAddPartitionToTransaction(tp);
     
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);
			// 如果缓冲区新建或已满,唤醒sender线程
            if (result.batchIsFull || result.newBatchCreated) {
                this.sender.wakeup();
            }
            return result.future;
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        } catch (ApiException e) {
            log.debug("Exception occurred during message send:", e);
            if (callback != null)
                callback.onCompletion(null, e);
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            return new FutureFailure(e);
        } catch (InterruptedException e) {
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw new InterruptException(e);
        } catch (BufferExhaustedException e) {
            this.errors.record();
            this.metrics.sensor("buffer-exhausted-records").record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (KafkaException e) {
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (Exception e) {
            // we notify interceptor about all exceptions, since onSend is called before anything else in this method
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        }
    }
  • 累加器原理
    RecoderAccumulator中;
  1. 消息收集器RecoderAccumulator为每个分区TopicPartition都维护了⼀个 Deque 类型的双端队列,使用ConcurrentHashMap存储。
  2. ProducerBatch 可以理解为是 ProducerRecord 的集合,批量发送有利于提升吞吐量,降低⽹络影响;
  3. 由于⽣产者客户端使⽤ java.io.ByteBuffer 在发送消息之前进⾏消息保存,并维护了⼀个BufferPool 实现 ByteBuffer 的复⽤;该缓存池只针对特定⼤⼩( batch.size 指定)的 ByteBuffer进⾏管理,对于消息过⼤的缓存,不能做到重复利⽤。
  4. 每次追加⼀条ProducerRecord消息,会寻找/新建对应的双端队列,从其尾部获取⼀个ProducerBatch,判断当前消息的⼤⼩是否可以写⼊该批次中。若可以写⼊则写⼊;若不可以写⼊,则新建⼀个ProducerBatch。
    在这里插入图片描述
public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        // 旨在记录正在添加消息的条目(添加完成后,会还原)
        appendsInProgress.incrementAndGet();
        ByteBuffer buffer = null;
        if (headers == null) headers = Record.EMPTY_HEADERS;
        try {
            // 拿到Deque,如不存在则创建一个Deque(tp是Topic和Partition共同决定,作为key)
            Deque<ProducerBatch> dq = getOrCreateDeque(tp);
            synchronized (dq) {
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
           // 尝试新增一条记录,如Deque末尾ProducerBatch集合存在且未满,追加记录,否则失败
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null)
                    return appendResult;
            }
			/**
			* 以下逻辑旨在给 Deque分配一个新的 ProducerBatch
			*/
			// 猜测可能是根据kafka的api版本分配的最大可用字节
            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
            // 分配到批次字节与消息本身所占用字节取最大值
            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            /** 
            * 分配内存,这行代码为什么没放到锁代码里面,想了想可能是这个原因
            * 1. 消息生产是相当快的,ProducerBatch很快打满或超时发送到broker,也就是说多个线程排队抢dq期间,其中大部分线程可能都得重新键ProducerBatch。
            * 2. 分配内存代码可能是相对耗时的,线程在等待获取锁的期间,先将内存提前分配避免进入到锁内,每个线程排队落库ProducerBatch再重新分配内存,提高效率
            */
            buffer = free.allocate(size, maxTimeToBlock);
            synchronized (dq) {
                // 防止分配内存期间,其他线程已创建ProducerBatch,重试添加(防止并发)
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null) {
                    return appendResult;
                }
				// 为消息分配内存
                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
                // ProducerBatch添加记录
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
				// 添加到双端队列末尾
                dq.addLast(batch);
                incomplete.add(batch);
                buffer = null;

                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
            }
        } finally {
            if (buffer != null)
            	// 释放缓存
                free.deallocate(buffer);
                // 该条消息添加完毕,还原
            appendsInProgress.decrementAndGet();
        }
    }
  • Sender线程:
  1. 该线程从消息收集器获取缓存的消息,将其处理为 <Node, List 的形式, Node表示集群的broker节点。
  2. 进⼀步将<Node, List转化为<Node, Request>形式,此时才可以向服务端发送数据。
  3. 在发送之前,Sender线程将消息以 Map<NodeId, Deque> 的形式保存到InFlightRequests 中进⾏缓存,可以通过其获取 leastLoadedNode ,即当前Node中负载压⼒最⼩的⼀个,以实现消息的尽快发出。
public void run() {
        while (running) {
            try {
                // 开始消费
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
        /** 
        * 1. 以上消费鉴于线程的开关,在线程收到关闭命令后,停止消费。 
        * 2. 那未消费完的Deque怎么办,接着消费直至完成
        * 3. 那些还在等待请求结果request 咋办,等着一起完成
        */
        while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
        if (forceClose) {
            this.accumulator.abortIncompleteBatches();
        }
        try {
            this.client.close();
        } catch (Exception e) {
            log.error("Failed to close network client", e);
        }

        log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }

 void run(long now) {
        if (transactionManager != null) {
            try {
                if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
                    // 重置批次生产信息
                    transactionManager.resetProducerId();
                 // 如果事务还在进行中,等待
                if (!transactionManager.isTransactional()) {
                    maybeWaitForProducerId();
                } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
                    transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for " +
                            "some previously sent messages and can no longer retry them. It isn't safe to continue."));
                } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
                    // as long as there are outstanding transactional requests, we simply wait for them to return
                    client.poll(retryBackoffMs, now);
                    return;
                }

                // do not continue sending if the transaction manager is in a failed state or if there
                // is no producer id (for the idempotent case).
                if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
                    RuntimeException lastError = transactionManager.lastError();
                    if (lastError != null)
                        maybeAbortBatches(lastError);
                    client.poll(retryBackoffMs, now);
                    return;
                } else if (transactionManager.hasAbortableError()) {
                    accumulator.abortUndrainedBatches(transactionManager.lastError());
                }
            } catch (AuthenticationException e) {
                // This is already logged as error, but propagated here to perform any clean ups.
                log.trace("Authentication exception while processing transactional request: {}", e);
                transactionManager.authenticationFailed(e);
            }
        }
		// 这才是重点,发送生产数据
        long pollTimeout = sendProducerData(now);
        client.poll(pollTimeout, now);
    }

private long sendProducerData(long now) {
        //步骤1:获取元数据
        Cluster cluster = metadata.fetch();
 
        // get the list of partitions with data ready to send
        //步骤2: 判断哪些Partition有消息可以发送,获取到这个partition的Leader Partition对应的Broker主机
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
 
        // if there are any partitions whose leaders are not known yet, force metadata update
        //步骤3:标识还没有拉取到元数据的topic
        if (!result.unknownLeaderTopics.isEmpty()) {
            // The set of topics with unknown leader contains topics with leader election pending as well as
            // topics which may have expired. Add the topic again to metadata to ensure it is included
            // and request metadata update, since there are messages to send to the topic.
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);
 
            log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics);
 
            this.metadata.requestUpdate();
        }
 
        // remove any nodes we aren't ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            //步骤4:检查与要发送的主机的网络是否已经建立好
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
            }
        }
 
        // create produce requests
        /**步骤5:要发送的partition有很多个,很有可能有一些partition的Leader Partition有可能是在同一台服务器上的
        * 当我们的分区的个数大于集群节点个数时,一定会有多个Leader分区在同一台服务器上,
        * 它会按照Broker进行分组,同一个Broker的Partition为同一组
        */
        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
                this.maxRequestSize, now);
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<ProducerBatch> batchList : batches.values()) {
                for (ProducerBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }
 
        //步骤6:放弃超时的batch
        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeoutMs, now);
        // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
        // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
        // we need to reset the producer id here.
        if (!expiredBatches.isEmpty())
            log.trace("Expired {} batches in accumulator", expiredBatches.size());
        for (ProducerBatch expiredBatch : expiredBatches) {
            failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException(), false);
            if (transactionManager != null && expiredBatch.inRetry()) {
                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
                transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
            }
        }
 
        sensors.updateProduceRequestMetrics(batches);
 
        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
        // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
        // with sendable data that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (!result.readyNodes.isEmpty()) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            // if some partitions are already ready to be sent, the select time would be 0;
            // otherwise if some partition already has some data accumulated but not ready yet,
            // the select time will be the time difference between now and its linger expiry time;
            // otherwise the select time will be the time difference between now and the metadata expiry time;
            pollTimeout = 0;
        }
        //步骤7:发送请求
        sendProduceRequests(batches, now);
 
        return pollTimeout;
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/498529.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

关于clash退出后,华硕电脑连不上网了

关于clash退出后&#xff0c;华硕电脑连不上网了 问题记录 问题记录 昨天因为overleaf老断网&#xff0c;然后我就挂了一下clash&#xff08;第一次用&#xff0c;不怎么懂&#xff09;&#xff0c;后来直接关闭退出了。 今天早上突然浏览器的网页&#xff08;微软自带、华硕…

挑战14天学完Python----初识Python语法

往期文章 Java继承与组合 你知道为什么会划分数据类型吗?—JAVA数据类型与变量 10 &#xff1e; 20 && 10 / 0 0等于串联小灯泡?—JAVA运算符 你真的知道怎样用java敲出Hello World吗&#xff1f;—初识JAVA 目录 往期文章前言1.温度转换实例2. 程序格式框架2.1 高…

ATT汇编快速学习

说明 文档来源 https://flint.cs.yale.edu/cs421/papers/x86-asm/asm.html 使用AT&T语法; 原文档是intel语法翻译过来的; 内容 基于32位, x86的硬件环境; 指令仅仅介绍常用, 即还有很大一部分的指令并没有支持; 编译器(汇编器) GAS(GNU assembler: 即gnu组织提供; 使…

《斯坦福数据挖掘教程·第三版》读书笔记(英文版)Chapter 5 Link Analysis

来源&#xff1a;《斯坦福数据挖掘教程第三版》对应的公开英文书和PPT Chapter 5 Link Analysis Terms: words or other strings of characters other than white space. An inverted index is a data structure that makes it easy, given a term, to find (pointers to) a…

【数码】收音机,德生PL380使用教程与注意事项

文章目录 1、主界面功能介绍&#xff08;注意闹钟和自动关机&#xff09;2、电池和电池模式的匹配3、收音机天线与信号&#xff0c;耳机与噪音F、参考资料 1、主界面功能介绍&#xff08;注意闹钟和自动关机&#xff09; 红色的按钮&#xff1a;power 按一下开机&#xff0c;按…

DJYOS开源往事一:djyos爱好者大南山相聚写实

前言&#xff1a;DJYOS开源社区成立于2009年&#xff0c;后因为专注技术方向和垂直产业化等原因&#xff0c;2015年后关闭开源社区。斗转星移&#xff0c;DJYOS开源社区虽然关闭&#xff0c;但是DJYOS开源和精神依然在&#xff0c;转眼DJYOS发布十四年了。记录一下DJYOS开源往事…

Qt Plugin插件开发

一、Qt 插件机制 .1 Qt 插件简介 插件是一种遵循一定规范的应用程序接口编写出来的程序&#xff0c;定位于开发实现应用软件平台不具备的功能的程序。插件与宿主程序之间通过接口联系&#xff0c;就像硬件插卡一样&#xff0c;可以被随时删除&#xff0c;插入和修改&#xff…

2.2.4 Linux安装模式下,磁盘分区的选择(重要)

目录树结构 &#xff08;directory tree&#xff09; 整个Linux系统最重要的地方就是在于目录树架构。 所谓的目录树架构&#xff08;directory tree&#xff09;就是以根目录为主&#xff0c;然后向下呈现分支状的目录结构的一种文件架构。 所以&#xff0c;整个目录树架构最重…

Midjourney-Discord入门+高手指引手册

上一篇我们说了如何注册和订阅&#xff0c;今天我们来讲一讲相关的细节&#xff0c;首先我们来一个概览图&#xff0c;先有个大致的印象。 概览图 生成的四张图片&#xff0c;类似于Demo&#xff0c;从左至右&#xff0c;从上至下&#xff0c;1,2,3,4放大按钮U1&#xff0c;U2…

【旋转编码器如何工作以及如何将其与Arduino一起使用】

在本教程中,我们将学习旋转编码器的工作原理以及如何将其与Arduino一起使用。您可以观看以下视频或阅读下面的书面教程。 1. 概述 旋转编码器是一种位置传感器,用于确定旋转轴的角度位置。它根据旋转运动产生模拟或数字电信号。 有许多不同类型的旋转编码器按输出信号或传感…

三个令人惊艳超有用的 ChatGPT 项目,开源了!

自 3 月初 Open AI 开放 ChatGPT API 以来&#xff0c;GitHub 上诞生的开源项目数量之多&#xff0c;着实多得让我眼花缭乱、应接不暇。 今天&#xff0c;我将着重挑选几个对日常工作、生活、学习帮助较大的 ChatGPT 开源项目&#xff0c;跟大家分享下&#xff0c;希望对你有所…

Illustrator如何使用填充与线条之实例演示?

文章目录 0.引言1.绘制星空小插画2.制作清新小碎花壁纸3.双重描边文字 0.引言 因科研等多场景需要进行绘图处理&#xff0c;笔者对Illustrator进行了学习&#xff0c;本文通过《Illustrator CC2018基础与实战》及其配套素材结合网上相关资料进行学习笔记总结&#xff0c;本文对…

labelme标注数据集,并利用paddleseg完成标注数据的准备工作

1、安装labelme 1、创建labelme虚拟环境 先检查python的版本 python -V使用命令 conda create -n labelme python=3.9,创建虚拟环境 2、激活虚拟环境 conda activate labelme3、安装labelme pip install labelme2、labelme的使用 1、打开cmd输入命令 labelme2、进入label…

关于安装Node/Yarn/Electron过程中遇到的问题

目录 1、安装Node2、安装electron很慢3、PowerShell中无法使用yarn命令4、Yarn命令目录bin与其全局安装位置不在同一个文件夹 1、安装Node 【参考文章】Node.js下载安装及环境配置教程 2、安装electron很慢 npm config set electron_mirror https://npm.taobao.org/mirrors/…

Java日志详解

文章目录 1.日志的概述1.1 日志文件1.1.1 调试日志1.1.2 系统日志 1.2 JAVA日志框架1.2.1 为什么要用日志框架1.2.2 日志框架和日志门面 2.JUL2.1 JUL简介2.2 JUL组件介绍2.3 JUL的基本使用2.3.1 日志输出的级别2.3.2 日志的输出方式2.3.3 自定义日志的级别2.3.4 将日志输出到具…

20个最流行的3D打印机切片软件

3D 打印切片机&#xff08;Slicer&#xff09;通过生成 G 代码为你的 3D 打印机准备 3D 模型&#xff0c;G 代码是一种广泛使用的数控 (NC) 编程语言。 3D打印切片软件的选择范围很广。 因此&#xff0c;为了帮助你找到最合适的工具&#xff0c;本文列出了20个顶级 3D 打印切片…

2022年NOC大赛编程马拉松赛道决赛图形化高年级A卷-正式卷

2022年NOC大赛编程马拉松赛道决赛图形化高年级A卷-正式卷 2022NOC-图形化决赛高年级A卷正式卷 编程题&#xff1a;蓝色星球的绿色踪迹任务描述&#xff1a;有一颗蔚蓝色的星球&#xff0c;她就是我们人类赖以生存的家园——地球。地球是人类的母亲&#xff0c;她为我们提供了各…

第三十七章 Unity GUI系统(上)

Unity 提供了三个 UI 系统帮助我们创建游戏界面 (UI)&#xff1a; 第一&#xff0c;UI 工具包 第二&#xff0c;Unity UI 软件包 (UGUI) 第三&#xff0c;IMGUI UI 工具包是 Unity 中最新的 UI 系统。它旨在优化跨平台的性能&#xff0c;并基于标准 Web 技术。您可以使用 U…

并发编程04:LockSupport与线程中断

文章目录 4.1 线程中断机制4.1.1 从阿里蚂蚁金服面试题讲起4.1.2 什么是中断机制4.1.3 中断的相关API方法之三大方法说明4.1.4 大厂面试题中断机制考点4.1.5 总结 4.2 LockSupport是什么4.3 线程等待唤醒机制4.3.1 三种让线程等待和唤醒的方法4.3.2 Object类中的wait和notify方…

uboot命令体系 源码解读并从中添加命令

1、uboot命令体系基础 1.1、使用uboot命令 (1)uboot启动后进入命令行底下&#xff0c;在此输入命令并回车结束&#xff0c;uboot会接收这个命令并解析&#xff0c;然后执行。 1.2、uboot命令体系实现代码在哪里 (1)uboot命令体系的实现代码在uboot/common/下&#xff0c;其…