Kafka 生产调优

news2025/1/21 9:32:21

Kafka生产调优

文章目录

  • Kafka生产调优
    • 一、Kafka 硬件配置选择
      • 场景说明
      • 服务器台数选择
      • 磁盘选择
      • 内存选择
      • CPU选择
    • 二、Kafka Broker调优
      • Broker 核心参数配置
      • 服役新节点/退役旧节点
      • 增加副本因子
      • 调整分区副本存储
    • 三、Kafka 生产者调优
      • 生产者如何提高吞吐量
      • 数据可靠性
      • 数据去重
      • 数据乱序
    • 四、Kafka 消费者调优
      • 消费者重要参数🚩
      • 消费者再平衡
      • 指定offset进行消费
      • 指定时间进行消费
      • 消费者如何提高吞吐量
    • 五、Kafka总体调优
      • 如何提升吞吐量🚩
      • 数据精准一次
      • 合理设置分区数
      • 单条日志大于 1m的问题
      • 集群压力测试
      • Kafka Producer 压力测试
      • Kafka Consumer 压力测试

一、Kafka 硬件配置选择

场景说明

100 万日活,每人每天 100 条日志,每天总共的日志条数是 100 万 * 100 条 = 1 亿条。

1 亿 / 24 小时 / 60 分 / 60 秒 = 1150 条/每秒钟。

每条日志大小:0.5k ~ 2k(约1k)。

1150 条/每秒钟 * 1k ≈ 1m/s 。

高峰期每秒钟:1150 条 * 20 倍 = 23000 条。

所以高峰每秒数据量:20MB/s。

服务器台数选择

服务器台数= 2 * (生产者峰值生产速率 * 副本 / 100) + 1

​ = 2 * (20m/s * 2 / 100) + 1

​ = 3 台

磁盘选择

kafka 底层主要是顺序写,固态硬盘和机械硬盘的顺序写速度差不多。建议选择普通的机械硬盘。

每天总数据量:1 亿条 * 1k ≈ 100g

100g * 副本 2 * 保存时间 3 天 / 0.7 ≈ 1T

建议三台服务器硬盘总大小,大于等于 1T。

内存选择

Kafka 内存组成:堆内存 + 页缓存

1)Kafka 堆内存建议每个节点:10g ~ 15g(在 kafka-server-start.sh 中修改)

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
 export KAFKA_HEAP_OPTS="-Xmx10G -Xms10G"
fi

2)页缓存:页缓存是 Linux 系统服务器的内存。我们只需要保证 1 个 segment(1g)中25%的数据在内存中就好。

每个节点页缓存大小 =(分区数 * 1g * 25%)/ 节点数。例如 10 个分区,页缓存大小=(10 * 1g * 25%)/ 3 ≈ 1g

建议服务器内存大于等于 11G。

CPU选择

  • num.io.threads = 8 负责写磁盘的线程数,整个参数值要占总核数的 50%。
  • num.replica.fetchers = 1 副本拉取线程数,这个参数占总核数的 50%的 1/3。
  • num.network.threads = 3 数据传输线程数,这个参数占总核数的 50%的 2/3。

建议 32 个 cpu core。

二、Kafka Broker调优

Broker 核心参数配置

在这里插入图片描述

参数名称描述
replica.lag.time.max.msISR 中,如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值,默认30s。
auto.leader.rebalance.enable默认是 true。 自动 Leader Partition 平衡。
leader.imbalance.per.broker.percentage默认是 10%。每个 broker 允许的不平衡的 leader 的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡。
leader.imbalance.check.interval.seconds默认值 300 秒。检查 leader 负载是否平衡的间隔时间。
log.segment.bytesKafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分成块的大小,默认值 1G。
log.index.interval.bytes默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。
log.retention.hoursKafka 中数据保存的时间,默认 7 天。
log.retention.minutesKafka 中数据保存的时间,分钟级别,默认关闭。
log.retention.msKafka 中数据保存的时间,毫秒级别,默认关闭。
log.retention.check.interval.ms检查数据是否保存超时的间隔,默认是 5 分钟。
log.retention.bytes默认等于-1,表示无穷大(也表示关闭)。超过设置的所有日志总大小,删除最早的 segment。
log.cleanup.policy默认是 delete,表示所有数据启用删除策略;如果设置值为 compact,表示所有数据启用压缩策
num.io.threads默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50%。
num.replica.fetchers默认是 1。副本拉取线程数,这个参数占总核数的 50%的 1/3
num.network.threads默认是 3。数据传输线程数,这个参数占总核数的50%的 2/3 。
log.flush.interval.messages强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。一般不建议修改, 交给系统自己管理。
log.flush.interval.ms每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理。

服役新节点/退役旧节点

(1)创建一个要均衡的主题。

$ vim topics-to-move.json 
{
   "topics": [
 		{"topic": "first"}
 	],
 	"version": 1
}

(2)生成一个负载均衡的计划。

$ bin/kafka-reassign-partitions.sh --bootstrap-server node102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate

(3)创建副本存储计划(所有副本存储在 broker0、broker1、broker2、broker3 中),由步骤2生成的👆

$ vim increase-replication-factor.json

(4)执行副本存储计划。

$ bin/kafka-reassign-partitions.sh --bootstrap-server node102:9092 --reassignment-json-file increase-replication-factor.json --execute

(5)验证副本存储计划。

$ bin/kafka-reassign-partitions.sh --bootstrap-server node102:9092 --reassignment-json-file increase-replication-factor.json --verify

增加副本因子

1)创建测试 topic(3分区、1副本)

$ bin/kafka-topics.sh --bootstrap-server node102:9092 --create --partitions 3 --replication-factor 1 --
topic four

2)手动增加副本存储(3分区、3副本),创建副本存储计划,所有副本都指定存储在 broker0、broker1、broker2 中

vim increase-replication-factor.json

{"version":1,"partitions":[
	{"topic":"four","partition":0,"replicas":[0,1,2]},
	{"topic":"four","partition":1,"replicas":[0,1,2]},
	{"topic":"four","partition":2,"replicas":[0,1,2]}
]}

3)执行副本存储计划。

$ bin/kafka-reassign-partitions.sh --bootstrap-server node102:9092 --reassignment-json-file increase-replication-factor.json --execute

调整分区副本存储

(1)创建副本存储计划(所有副本都指定存储在 broker0、broker1、broker2 中)。

 vim increase-replication-factor.json

{
	"version":1,
	"partitions":[{"topic":"three","partition":0,"replicas":[0,1]},
	{"topic":"three","partition":1,"replicas":[0,1]},
	{"topic":"three","partition":2,"replicas":[1,0]},
	{"topic":"three","partition":3,"replicas":[1,0]}
	]
}

(2)执行副本存储计划。

$ bin/kafka-reassign-partitions.sh --bootstrap-server node102:9092 --reassignment-json-file increase-replication-factor.json --execute

(3)验证副本存储计划

$ bin/kafka-reassign-partitions.sh --bootstrap-server node102:9092 --reassignment-json-file increase-replication-factor.json --verify

三、Kafka 生产者调优

生产者如何提高吞吐量

参数名称描述
buffer.memoryRecordAccumulator 缓冲区总大小,默认 32m。
batch.size缓冲区一批数据最大值,默认 16k。适当增加该值,可 缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传 以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。 输延迟增加。
linger.ms如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。
compression.type生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。 支持压缩类型:none、gzip、snappy、lz4 和 zstd。

数据可靠性

参数名称描述
acks0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据,Leader 收到数据后应答。
-1(all):生产者发送过来的数据,Leader+和 isr 队列 里面的所有节点收齐数据后应答。默认值是-1

至少一次 = ACK 级别设置为-1 + 分区副本大于等于 2 + ISR 里应答的最小副本数量大于等于 2

数据去重

参数名称描述
enable.idempotence是否开启幂等性,默认 true,表示开启幂等性。

Kafka 的事务一共有如下 5 个 API

// 1 初始化事务
void initTransactions();

// 2 开启事务
void beginTransaction() throws ProducerFencedException;

// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId) throws 
ProducerFencedException;

// 4 提交事务
void commitTransaction() throws ProducerFencedException;

// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

数据乱序

参数名称描述
enable.idempotence是否开启幂等性,默认 true,表示开启幂等性。
max.in.flight.requests.per.connection允许最多没有返回 ack 的次数,默认为 5,开启幂等性 要保证该值是 1-5 的数字。

四、Kafka 消费者调优

消费者重要参数🚩

参数描述
bootstrap.servers向 Kafka 集群建立初始连接用到的 host/port 列表。
key.deserializer 和 value.deserializer指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。
group.id标记消费者所属的消费者组。
enable.auto.commit自动提交offset开关,默认值为 true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms消费者偏移量向Kafka提交的频率,默认5s。(如果设置自动提交offset时才生效)
auto.offset.reset当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。
latest:默认,自动重置偏移量为最新的偏移量。
none:如果消费组原来的(previous)偏移量
offsets.topic.num.partitions__consumer_offsets 的分区数,默认是 50 个分区。
heartbeat.interval.msKafka 消费者和 coordinator 之间的心跳时间,默认 3s。 该条目的值必须小于 session.timeout.ms ,也不应该高于session.timeout.ms 的 1/3。
☘️session.timeout.msKafka consumer 和 coordinator 之间连接超时时间,默认 45s。 超过该值,该消费者被移除,消费者组执行再平衡。
☘️max.poll.interval.ms消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。
🚩fetch.min.bytes消费者获取服务器端一批消息最小的字节数。 默认 1 个字节
🚩fetch.max.wait.ms默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。
🚩fetch.max.bytes默认 Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值 (50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响。
🚩max.poll.records一次 poll 拉取数据返回消息的最大条数,默认是 500 条。

消费者再平衡

参数名称描述
heartbeat.interval.msKafka 消费者和 coordinator 之间的心跳时间,默认 3s。 该条目的值必须小于 session.timeout.ms,也不应该高于 session.timeout.ms 的 1/3。
session.timeout.msKafka 消费者和 coordinator 之间连接超时时间,默认 45s。 超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。
partition.assignment.strategy消费者分区分配策略 , 默 认 策 略 是 Range + CooperativeSticky。Kafka 可以同时使用多个分区分配策略。 可以选择的策略包括:Range、RoundRobin、Sticky、CooperativeSticky

指定offset进行消费

public class CustomConsumerByHandSync {
   public static void main(String[] args) {
   // 1. 创建 kafka 消费者配置类
   Properties properties = new Properties();
   // 2. 添加配置参数
   // 添加连接
   properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
   // 配置序列化 必须
   properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
   properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                   "org.apache.kafka.common.serialization.StringDeserializer");
     
   // 配置消费者组 
   properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
     
   // 是否自动提交 offset
   properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
     
   //3. 创建 kafka 消费者
   KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
     
   // 2 订阅一个主题
   ArrayList<String> topics = new ArrayList<>();
   topics.add("first");
   kafkaConsumer.subscribe(topics);
   
   //🚩 获取消费者分区信息,并指定offset进行消费
   Set<TopicPartition> assignment= new HashSet<>();
 	  while (assignment.size() == 0) {
 		 kafkaConsumer.poll(Duration.ofSeconds(1));
 		 // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
 		 assignment = kafkaConsumer.assignment();
       }
 	    
      // 遍历所有分区,并指定 offset 从 1700 的位置开始消费
	  for (TopicPartition tp: assignment) {
	  	kafkaConsumer.seek(tp, 1700);
	  }
     
     // 消费数据
	 while (true){
 		// 读取消息
 	    ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
       		// 输出消息
 			for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
 				System.out.println(consumerRecord.value());
 			}
 	}
     
  }
}   

指定时间进行消费

在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?

public class CustomConsumerByHandSync {
   public static void main(String[] args) {
   // 1. 创建 kafka 消费者配置类
   Properties properties = new Properties();
   // 2. 添加配置参数
   // 添加连接
   properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
   // 配置序列化 必须
   properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
   properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                   "org.apache.kafka.common.serialization.StringDeserializer");
     
   // 配置消费者组 
   properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
     
   // 是否自动提交 offset
   properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
     
   //3. 创建 kafka 消费者
   KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
     
   // 2 订阅一个主题
   ArrayList<String> topics = new ArrayList<>();
   topics.add("first");
   kafkaConsumer.subscribe(topics);
   
   //🚩 获取消费者分区信息,并指定offset进行消费
   Set<TopicPartition> assignment= new HashSet<>();
   while (assignment.size() == 0) {
 	  kafkaConsumer.poll(Duration.ofSeconds(1));
 	  // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
 	  assignment = kafkaConsumer.assignment();
   }
     
   HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();	    
   // 封装集合存储,每个分区对应一天前的数据
   for (TopicPartition topicPartition : assignment) {
       timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
   }
    // 获取从 1 天前开始消费的每个分区的 offset
 	Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);
    // 遍历每个分区,对每个分区设置消费时间。
 	for (TopicPartition topicPartition : assignment) {
 		OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
 		// 根据时间指定开始消费的位置
 		if (offsetAndTimestamp != null){
 			kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
 		}
 	}
     
     // 消费数据
	 while (true){
 		// 读取消息
 	    ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
       		// 输出消息
 			for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
 				System.out.println(consumerRecord.value());
 			}
 	}
     
  }
}   	

消费者如何提高吞吐量

增加分区数:

$ bin/kafka-topics.sh --bootstrap-server node102:9092 --alter --topic first --partitions 3
参数名称描述
fetch.max.bytes默认 Default: 52428800(50 m)。消费者获取服务器端一批 消息最大的字节数。如果服务器端一批次的数据大于该值 (50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config) or max.message.bytes (topic config)影响。
max.poll.records一次 poll 拉取数据返回消息的最大条数,默认是 500 条。

五、Kafka总体调优

如何提升吞吐量🚩

1)提升生产吞吐量

  • buffer.memory:发送消息的缓冲区大小,默认值是 32m,可以增加到 64m。
  • batch.size:默认是 16k。如果 batch 设置太小,会导致频繁网络请求,吞吐量下降;如果 batch 太大,会导致一条消息需要等待很久才能被发送出去,增加网络延时
  • linger.ms,这个值默认是 0,意思就是消息必须立即被发送。一般设置一个 5~100毫秒。如果 linger.ms 设置的太小,会导致频繁网络请求,吞吐量下降;如果 linger.ms 太长,会导致一条消息需要等待很久才能被发送出去,增加网络延时。
  • compression.type:默认是 none,不压缩,但是也可以使用 lz4 压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大 producer 端的 CPU 开销。

2)增加分区

3)消费者提高吞吐量

  • 调整 fetch.max.bytes 大小,默认是 50m。
  • 调整 max.poll.records 大小,默认是 500 条。

4)增加下游消费者处理能力

数据精准一次

1)生产者角度

  • acks 设置为-1 (acks=-1)
  • 幂等性(enable.idempotence = true) + 事务

2)broker 服务端角度

  • 分区副本大于等于 2 (–replication-factor 2)
  • ISR 里应答的最小副本数量大于等于 2 (min.insync.replicas = 2)

3)消费者

  • 事务 + 手动提交 offset (enable.auto.commit = false)
  • 消费者输出的目的地必须支持事务(MySQL、Kafka)

合理设置分区数

(1)创建一个只有 1 个分区的 topic。

(2)测试这个 topic 的 producer 吞吐量和 consumer 吞吐量。

(3)假设他们的值分别是 Tp 和 Tc,单位可以是 MB/s。

(4)然后假设总的目标吞吐量是 Tt,那么分区数 = Tt / min(Tp,Tc)。

例如:producer 吞吐量 = 20m/s;consumer 吞吐量 = 50m/s,期望吞吐量 100m/s;

分区数 = 100 / 20 = 5 分区

分区数一般设置为:3-10 个

分区数不是越多越好,也不是越少越好,需要搭建完集群,进行压测,再灵活调整分区个数。

单条日志大于 1m的问题

参数名称描述
message.max.bytes默认 1m,broker 端接收每个批次消息最大值
max.request.size默认 1m,生产者发往 broker 每个请求消息最大值。针对 topic级别设置消息体的大小
replica.fetch.max.bytes默认 1m,副本同步数据,每个批次消息最大值
fetch.max.bytes默认 Default: 52428800(50 m)。消费者获取服务器端一批 消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对 最大值。一批次的大小受 message.max.bytes (broker config) or max.message.bytes (topic config)影响

集群压力测试

用 Kafka 官方自带的脚本,对 Kafka 进行压测

  • 生产者压测:kafka-producer-perf-test.sh
  • 消费者压测:kafka-consumer-perf-test.sh

Kafka Producer 压力测试

(1)创建一个 test topic,设置为 3 个分区 3 个副本

$ bin/kafka-topics.sh --bootstrap-server node102:9092 --create --replication-factor 3 --partitions 3 --topic test

(2)在/opt/module/kafka/bin 目录下面有这两个文件。我们来测试一下

$ bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=node102:9092,node103:9092,node104:9092 batch.size=16384 linger.ms=0

测试参数说明:

  • record-size 是一条信息有多大,单位是字节,本次测试设置为 1k。
  • num-records 是总共发送多少条信息,本次测试设置为 100 万条。
  • throughput 是每秒多少条信息,设成-1,表示不限流,尽可能快的生产数据,可测出生产者最大吞吐量。本次设置为每秒钟 1 万条。
  • producer-props 后面可以配置生产者相关参数,batch.size 配置为 16k

调优参数:

更改不同的值进行压测

参数名称描述
buffer.memoryRecordAccumulator 缓冲区总大小,默认 32m。
batch.size缓冲区一批数据最大值,默认 16k。适当增加该值,可 缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传 以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。 输延迟增加。
linger.ms如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。
compression.type生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。 支持压缩类型:none、gzip、snappy、lz4 和 zstd。

Kafka Consumer 压力测试

(1)修改/opt/module/kafka/config/consumer.properties 文件中的一次拉取条数为 500

max.poll.records=500

(2)消费 100 万条日志进行压测

$ bin/kafka-consumer-perf-test.sh --bootstrap-server node102:9092,node103:9092,hadoop104:9092 --topic test --messages 1000000 --consumer.config config/consumer.properties

参数说明:

  • –bootstrap-server 指定 Kafka 集群地址
  • –topic 指定 topic 的名称
  • –messages 总共要消费的消息个数。本次实验 100 万条

调优参数优化:(在consumer.properties中进行修改)

  • 修改一次拉取的数量:max.poll.records=2000
  • 调整文件中的拉取一批数据大小 100m:fetch.max.bytes=104857600

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1441377.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何写一个其他人可以使用的GitHub Action

前言 在GitHub中&#xff0c;你肯定会使用GitHub Actions自动部署一个项目到GitHub Page上&#xff0c;在这个过程中总要使用workflows工作流&#xff0c;并在其中使用action&#xff0c;在这个使用的过程中&#xff0c;总会好奇怎么去写一个action呢&#xff0c;所以&#xff…

无人机图像识别技术研究及应用,无人机AI算法技术理论,无人机飞行控制识别算法详解

在现代科技领域中&#xff0c;无人机技术是一个备受瞩目的领域。随着人们对无人机应用的需求在不断增加&#xff0c;无人机技术也在不断发展和改进。在众多的无人机技术中&#xff0c;无人机图像识别技术是其中之一。 无人机图像识别技术是利用计算机视觉技术对无人机拍摄的图像…

Project 2010下载安装教程,保姆级教程,附安装包和工具

前言 Project是一款项目管理软件&#xff0c;不仅可以快速、准确地创建项目计划&#xff0c;而且可以帮助项目经理实现项目进度、成本的控制、分析和预测&#xff0c;使项目工期大大缩短&#xff0c;资源得到有效利用&#xff0c;提高经济效益。软件设计目的在于协助专案经理发…

6、5 门关于 AI 和 ChatGPT 的免费课程,带您从 0-100

5 门关于 AI 和 ChatGPT 的免费课程,带您从 0-100 想在 2024 年免费了解有关 AI 和 ChatGPT 的更多信息吗? 图片由 DALLE 3 提供 活着是多么美好的时光啊。还有什么比现在更适合了解生成式人工智能(尤其是 ChatGPT)等人工智能元素的呢!许多人对这个行业感兴趣,但有些…

一文读懂:MybatisPlus从入门到进阶

快速入门 简介 在项目开发中&#xff0c;Mybatis已经为我们简化了代码编写。 但是我们仍需要编写很多单表CURD语句&#xff0c;MybatisPlus可以进一步简化Mybatis。 MybatisPlus官方文档&#xff1a;https://www.baomidou.com/&#xff0c;感谢苞米豆和黑马程序员。 Mybat…

表单标记(html)

前言 发现input的type属性还是有挺多的&#xff0c;这里把一些常用的总结一下。 HTML 输入类型 (w3school.com.cn)https://www.w3school.com.cn/html/html_form_input_types.asp text-文本 文本输入,如果文字太长&#xff0c;超出的部分就不会显示。 定义供文本输入的单行…

C语言操作符超详细总结

文章目录 1. 操作符的分类2. 二进制和进制转换2.1 2进制转10进制2.1.1 10进制转2进制数字 2.2 2进制转8进制和16进制2.2.1 2进制转8进制2.2.2 2进制转16进制 3. 原码、反码、补码4.移位操作符4.1 左移操作符4.2 右移操作符 5. 位操作符&#xff1a;&、|、^、~6. 逗号表达式…

vue3 之 通用组件统一注册全局

components/index.js // 把components中的所组件都进行全局化注册 // 通过插件的方式 import ImageView from ./ImageView/index.vue import Sku from ./XtxSku/index.vue export const componentPlugin {install (app) {// app.component(组件名字&#xff0c;组件配置对象)…

图解 V8 执行 JS 的过程

本文来分享 V8 引擎执行 JavaScript 的过程 1. JS 代码执行过程 在说V8的执行JavaScript代码的机制之前&#xff0c;我们先来看看编译型和解释型语言的区别。 编译型语言和解释型语言 我们知道&#xff0c;机器是不能直接理解代码的。所以&#xff0c;在执行程序之前&#xf…

Java_栈_队列

文章目录 一、栈&#xff08;Stack&#xff09;1.概念2.栈的使用3.栈的模拟实现1、定义接口2、定义栈3、成员4、构造方法5、判断空间是否满 full6、入栈 push7、出栈 pop8、获取栈顶元素 peek9、获取栈中有效元素个数 size10、检测栈是否为空 empty完整代码 4.练习1、有效括号2…

GEE Colab——如何利用Matplotlib在colab中进行图形制作

在colab中绘制图表 笔记本的一个常见用途是使用图表进行数据可视化。Colaboratory 提供多种图表工具作为 Python 导入,让这一工作变得简单。 Matplotlib Matplotlib 是最常用的图表工具包,详情请查看其文档,并通过示例获得灵感。 线性图 线性图是一种常见的图表类型,用…

LabVIEW网络测控系统

LabVIEW网络测控系统 介绍了基于LabVIEW的网络测控系统的开发与应用&#xff0c;通过网络技术实现了远程的数据采集、监控和控制。系统采用LabVIEW软件与网络通信技术相结合&#xff0c;提高了系统的灵活性和扩展性&#xff0c;适用于各种工业和科研领域的远程测控需求。 随着…

哈希表(Hash Table)-----运用实例【通过哈希表来管理雇员信息】(java详解) (✧∇✧)

目录 一.哈希表简介&#xff1a; 实例介绍&#xff1a; 类的创建与说明&#xff1a; 各功能图示&#xff1a; 1.class HashTab{ }; 2. class EmpLinkedList{ }&#xff1b; 3. class Emp{ }&#xff1b; 4.测试&#xff1a; 运行结果&#xff1a; 最后&#xff0c;完整…

springboot微信小程序uniapp学习计划与日程管理系统

基于springboot学习计划与日程管理系统&#xff0c;确定学习计划小程序的目标&#xff0c;明确用户需求&#xff0c;学习计划小程序的主要功能是帮助用户制定学习计划&#xff0c;并跟踪学习进度。页面设计主要包括主页、计划学习页、个人中心页等&#xff0c;然后用户可以利用…

Java汽车销售管理

技术架构&#xff1a; springboot mybatis Mysql5.7 vue2 npm node 有需要该项目的小伙伴可以私信我你的Q。 功能描述&#xff1a; 针对汽车销售提供客户信息、车辆信息、订单信息、销售人员管理、财务报表等功能&#xff0c;提供经理和销售两种角色进行管理 效果图&…

CTF--Web安全--SQL注入之‘绕过方法’

一、什么是绕过注入 众所周知&#xff0c;SQL注入是利用源码中的漏洞进行注入的&#xff0c;但是有攻击手段&#xff0c;就会有防御手段。很多题目和网站会在源码中设置反SQL注入的机制。SQL注入中常用的命令&#xff0c;符号&#xff0c;甚至空格&#xff0c;会在反SQL机制中…

预测模型:MATLAB线性回归

1. 线性回归模型的基本原理 线性回归是统计学中用来预测连续变量之间关系的一种方法。它假设变量之间存在线性关系&#xff0c;可以通过一个或多个自变量&#xff08;预测变量&#xff09;来预测因变量&#xff08;响应变量&#xff09;的值。基本的线性回归模型可以表示为&…

Ps:窗口排列

Ps菜单&#xff1a;窗口/排列 Window/Arrange Photoshop 的“窗口/排列” Arrange子菜单中提供了多种方式来组织和查看打开的文档窗口&#xff0c;这在处理多个文档或比较图像时非常有用。 ◆ ◆ ◆ 常用操作方法与技巧 1、同文档双窗口处理法 将同一个图像显示在两个窗口中&…

The Back-And-Forth Method (BFM) for Wasserstein Gradient Flows windows安装

本文记录了BFM算法代码在windows上的安装过程。 算法原网站&#xff1a;https://wasserstein-gradient-flows.netlify.app/ github&#xff1a;https://github.com/wonjunee/wgfBFMcodes 文章目录 FFTWwgfBFMcodesMATLABpython注 FFTW 官网/下载路径&#xff1a;https://ww…

备战蓝桥杯---动态规划(基础2)

本专题主要是介绍几个比较经典的题目&#xff1a; 假设我们令f[i]为前i个的最长不下降子序列&#xff0c;我们会发现难以转移方程很难写&#xff08;因为我们不知道最后一个数&#xff09;。 于是&#xff0c;我们令f[i]为以i结尾的最长不下降子序列&#xff0c;这样子我们就可…