SprinBoot整合KafKa的使用(详解)

news2024/12/27 2:54:58

前言

1. 高吞吐量(High Throughput)

Kafka 设计的一个核心特性是高吞吐量。它能够每秒处理百万级别的消息,适合需要高频次、低延迟消息传递的场景。即使在大规模分布式环境下,它也能保持很高的吞吐量和性能,支持低延迟的数据传输。

2. 可扩展性(Scalability)

Kafka 具有强大的可扩展性。它支持水平扩展,可以轻松增加更多的节点来处理更多的数据流量。Kafka 通过分区机制(Partitioning)和分布式架构,确保即使数据量剧增,也能够平滑地扩展。
  • 分区:消息被划分成多个分区(partition),每个分区可以独立存储和读取数据,从而支持并行处理。
  • 副本:Kafka 会将每个分区的数据复制到多个节点上,以确保高可用性和容错性。

3. 高可用性和容错性(Fault Tolerance)

Kafka 提供了内建的高可用性和容错机制。它通过将消息复制到多个代理(broker)上来确保数据的可靠性,即使部分节点出现故障,数据仍然可以从其他副本中恢复。这种机制使得 Kafka 能够承受硬件故障而不会丢失数据。

4. 持久化和日志存储(Durability and Log Storage)

Kafka 的消息是持久化存储的,意味着消息会被写入磁盘并按日志顺序存储。这使得 Kafka 不仅可以作为一个消息队列,还能用作长期的日志存储系统,能够回溯历史数据。消息默认保留在 Kafka 中一段时间,消费者可以根据需要按需读取。

5. 低延迟(Low Latency)

Kafka 的设计能够提供低延迟的数据传输,特别适合于实时流处理的应用场景。Kafka 能够以毫秒级的延迟来传输大量数据,这对许多实时数据处理应用至关重要,比如实时分析、监控系统等。

6. 强大的消费模型(Consumer Model)

Kafka 提供了灵活的消费模型,允许消费者以不同的方式读取消息:
  • 发布/订阅模式:多个消费者可以同时订阅相同的主题(Topic),消息将被广播给所有消费者。
  • 点对点模式:消费者组(Consumer Group)机制允许多个消费者协调工作,处理不同的消息分区。
  • 消息偏移量:Kafka 维护每个消费者的消息偏移量(offset),消费者可以从任意位置开始读取消息,支持灵活的消息消费和重播。

7. 分布式和横向扩展(Distributed and Horizontal Scaling)

Kafka 本身是一个分布式系统,设计上支持横向扩展。随着系统需求的增加,可以简单地增加更多的 Kafka 节点,分区和副本会在节点之间自动重新平衡,确保负载均匀分布和高可用性。

8. 支持流处理(Stream Processing)

Kafka 不仅是一个消息队列,也可以作为流处理平台。通过 Kafka Streams API,可以对流数据进行实时处理(例如聚合、过滤、连接等)。此外,Kafka 可以与其他流处理框架(如 Apache Flink、Apache Spark)集成,构建复杂的数据处理管道。

9. 灵活的集成能力(Integration Capabilities)

Kafka 被广泛集成到各种技术栈中,包括传统数据库、数据仓库、大数据系统、微服务架构等。Kafka 的强大支持让它成为企业架构中的核心组件。
  • Kafka Connect:Kafka Connect 是 Kafka 官方提供的一个框架,用于将 Kafka 与外部系统(如数据库、文件系统、Hadoop、Elasticsearch 等)进行连接。它简化了系统之间的数据同步和集成。

10. 广泛的社区支持和生态系统

Kafka 拥有活跃的开源社区和庞大的生态系统。它得到了许多企业的广泛使用,拥有大量的插件和扩展工具,可以满足各种需求。Kafka 的生态系统包括但不限于:
  • Kafka Streams:用于实时数据流处理。
  • Kafka Connect:用于与外部系统集成。
  • KSQL:用于在 Kafka 上执行 SQL 查询的流处理工具。

11. 使用场景

Kafka 被广泛应用于以下场景:
  • 日志收集与传输:Kafka 可以作为一个日志聚合平台,将来自不同服务或系统的日志收集起来并传输到中央日志分析平台。
  • 实时数据处理:Kafka 用于支持实时数据处理和流计算,例如监控系统、推荐系统、用户行为分析等。
  • 事件驱动架构:Kafka 非常适合于事件驱动的微服务架构,帮助微服务之间实现解耦和异步通信。
  • 数据管道:Kafka 作为一个高效的数据管道,广泛用于将不同系统中的数据实时传输到数据仓库、数据湖或分析平台。

总结:

Kafka 是一个高性能、可扩展、容错且持久化的消息队列系统,非常适合处理大规模的实时数据流。它在大数据流处理、日志聚合、微服务架构、事件驱动架构等多个领域都有广泛应用。如果你的系统需要处理高吞吐量、低延迟、可扩展的消息传递和实时数据流处理,那么 Kafka 是一个非常合适的选择。

使用教程

1.导入依赖

        <!-- Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2.导入配置


# Spring
spring:
  kafka:
    producer:
      # Kafka服务器
      bootstrap-servers: 你自己的kafka服务器地址
      # 开启事务,必须在开启了事务的方法中发送,否则报错
      transaction-id-prefix: kafkaTx-
      # 发生错误后,消息重发的次数,开启事务必须设置大于0。
      retries: 3
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      # 开启事务时,必须设置为all
      acks: all
      # 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 生产者内存缓冲区的大小。
      buffer-memory: 1024000
      # 键的序列化方式
      key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # 值的序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类)
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

    consumer:
      # Kafka服务器
      bootstrap-servers: 你自己的kafka服务器地址
      group-id: firstGroup
      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      #auto-commit-interval: 2s
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)
      # none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: latest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      #key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # 值的反序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类)
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # 配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要
      properties:
        spring:
          json:
            trusted:
              packages: "*"
      # 这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。
      # 这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,
      # 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,
      # 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。
      # 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数
      # 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况
      max-poll-records: 3
    properties:
      # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
      max:
        poll:
          interval:
            ms: 600000
      # 当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s
      session:
        timeout:
          ms: 10000
    listener:
      # 在侦听器容器中运行的线程数,一般设置为 机器数*分区数
      concurrency: 4
      # 自动提交关闭,需要设置手动消息确认
      ack-mode: manual_immediate
      # 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误
      missing-topics-fatal: false
      # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
      poll-timeout: 600000

3.config文件(一共4个,已经封装好直接使用即可)

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

/**
 * kafka配置,也可以写在yml,这个文件会覆盖yml
 */
@SpringBootConfiguration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Value("${spring.kafka.properties.session.timeout.ms}")
    private String sessionTimeout;
    @Value("${spring.kafka.properties.max.poll.interval.ms}")
    private String maxPollIntervalTime;
    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.listener.concurrency}")
    private Integer concurrency;
    @Value("${spring.kafka.listener.missing-topics-fatal}")
    private boolean missingTopicsFatal;
    @Value("${spring.kafka.listener.poll-timeout}")
    private long pollTimeout;

    @Bean
    public Map<String, Object> consumerConfigs() {

        Map<String, Object> propsMap = new HashMap<>(16);
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        //自动提交的时间间隔,自动提交开启时生效
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");
        //该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
        //earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录
        //latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)
        //none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        //两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime);
        //这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。
        //这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,
        //如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,
        //然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。
        //要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数
        //注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        //当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        //序列化(建议使用Json,这种序列化方式可以无需额外配置传输实体类)
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return propsMap;
    }

    @Bean
    public ConsumerFactory<Object, Object> consumerFactory() {
        //配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要
        try (JsonDeserializer<Object> deserializer = new JsonDeserializer<>()) {
            deserializer.trustedPackages("*");
            return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new JsonDeserializer<>(), deserializer);
        }
    }

    /**
     * KafkaListenerContainerFactory 用来做消费者的配置
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //在侦听器容器中运行的线程数,一般设置为 机器数*分区数
        factory.setConcurrency(concurrency);
        //消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误
        factory.setMissingTopicsFatal(missingTopicsFatal);
        // 自动提交关闭,需要设置手动消息确认
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        // 设置为批量监听,需要用List接收
        // factory.setBatchListener(true);
        return factory;
    }
}
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.transaction.KafkaTransactionManager;

import java.util.HashMap;
import java.util.Map;

@SpringBootConfiguration
public class KafkaProviderConfig {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.producer.transaction-id-prefix}")
    private String transactionIdPrefix;
    @Value("${spring.kafka.producer.acks}")
    private String acks;
    @Value("${spring.kafka.producer.retries}")
    private String retries;
    @Value("${spring.kafka.producer.batch-size}")
    private String batchSize;
    @Value("${spring.kafka.producer.buffer-memory}")
    private String bufferMemory;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>(16);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        //acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
        //acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
        //acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
        //开启事务必须设为all
        props.put(ProducerConfig.ACKS_CONFIG, acks);
        //发生错误后,消息重发的次数,开启事务必须大于0
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        //当多个消息发送到相同分区时,生产者会将消息打包到一起,以减少请求交互. 而不是一条条发送
        //批次的大小可以通过batch.size 参数设置.默认是16KB
        //较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。
        //比如说,kafka里的消息5秒钟Batch才凑满了16KB,才能发送出去。那这些消息的延迟就是5秒钟
        //实测batchSize这个参数没有用
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        //有的时刻消息比较少,过了很久,比如5min也没有凑够16KB,这样延时就很大,所以需要一个参数. 再设置一个时间,到了这个时间,
        //即使数据没达到16KB,也将这个批次发送出去
        props.put(ProducerConfig.LINGER_MS_CONFIG, "5000");
        //生产者内存缓冲区的大小
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        //反序列化,和生产者的序列化方式对应
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<Object, Object> producerFactory() {
        DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
        //开启事务,会导致 LINGER_MS_CONFIG 配置失效
        factory.setTransactionIdPrefix(transactionIdPrefix);
        return factory;
    }

    @Bean
    public KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    public KafkaTemplate<Object, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Component;

/**
 * kafka消息发送回调
 */
@Component
public class KafkaSendResultHandler implements ProducerListener<Object, Object> {
    @Override
    public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
        System.out.println("消息发送成功:" + producerRecord.toString());
    }

    @Override
    public void onError(ProducerRecord producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {
        System.out.println("消息发送失败:" + producerRecord.toString() + exception.getMessage());
    }
}
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.lang.NonNull;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

/**
 * 异常处理
 */
@Component
public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {

    @Override
    @NonNull
    public Object handleError(@NonNull Message<?> message, @NonNull ListenerExecutionFailedException exception) {
        return new Object();
    }

    @Override
    @NonNull
    public Object handleError(@NonNull Message<?> message, @NonNull ListenerExecutionFailedException exception,
                              Consumer<?, ?> consumer) {
        System.out.println("消息详情:" + message);
        System.out.println("异常信息::" + exception);
        System.out.println("消费者详情::" + consumer.groupMetadata());
        System.out.println("监听主题::" + consumer.listTopics());
        // TODO 消费记录
        return KafkaListenerErrorHandler.super.handleError(message, exception, consumer);
    }
}

4.代码实现

生产者:

    @Autowired
    private KafkaTemplate<Object,Object> kafkaTemplate;

    public void test() {
        // 生成一个随机的 UUID 字符串
        String message = UUID.randomUUID().toString();

        // 使用 KafkaTemplate 将消息发送到 Kafka 中的 "test" 主题
        // KafkaTemplate.send() 方法的第一个参数是目标主题名,第二个参数是要发送的消息内容
        kafkaTemplate.send("test", message);

        // 发送成功后,Kafka 会异步处理消息,返回一个 Future 对象, 
        // 如果需要进一步处理发送成功与否的回调,可以通过该对象的回调接口进行处理
    }


消费者:

import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;


@Component
@Log4j2
public class Consumer {

    @Autowired
    private StringRedisTemplate redisTemplate;


    private static final String REDIS_SET_KEY = "test";

    // Kafka 监听器配置
    @KafkaListener(topics = "test",
            containerFactory = "kafkaListenerContainerFactory", errorHandler = "myKafkaListenerErrorHandler")
    public void consumer(ConsumerRecord<Object, Object> consumerRecord, Acknowledgment acknowledgment) {
        String key = (String) consumerRecord.key();
        Object value = consumerRecord.value();

        // 记录消费的基本信息,便于追踪
        log.info("开始消费消息,Topic: {}, Partition: {}, Offset: {}, 消费内容: {}", 
             consumerRecord.topic(), 
             consumerRecord.partition(), 
             consumerRecord.offset(), 
             value);

        try {
            // Redis 去重,确保消息没有重复消费
            Long result = redisTemplate.opsForSet().add(REDIS_SET_KEY, key);

            // 如果 Redis 返回 1,表示该 key 尚未消费,才进行后续处理
            if (result != null && result == 1) {
                // 处理业务逻辑
                
                // 手动提交偏移量
                acknowledgment.acknowledge();
                log.info("消费成功,消费的信息: {}", value);
            } else {
                log.info("消息已消费过,跳过处理。key: {}", key);
                acknowledgment.acknowledge();
            }
        } catch (Exception e) {
            log.error("消费失败,错误信息: {}", e.getMessage(), e);

            // 如果发生异常,进行重试处理
            acknowledgment.nack(1000); // 可配置重试时间
        }
    }

    // 可选:定时清理 Redis 集合中的已消费记录
    @Scheduled(fixedDelay = 3600000) // 每小时清理一次
    public void cleanUpRedis() {
        // 可以根据消息处理的需要调整清理策略
        redisTemplate.delete(REDIS_SET_KEY);
        log.info("已清理 Redis 中的消费记录");
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2253234.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

c++预编译头文件

文章目录 c预编译头文件1.使用g编译预编译头文件2.使用visual studio进行预编译头文件2.1visual studio如何设置输出预处理文件&#xff08;.i文件&#xff09;2.2visual studio 如何设置预编译&#xff08;初始创建空项目的情况下&#xff09;2.3 visual studio打开输出编译时…

Zookeeper的通知机制是什么?

大家好&#xff0c;我是锋哥。今天分享关于【Zookeeper的通知机制是什么?】面试题。希望对大家有帮助&#xff1b; Zookeeper的通知机制是什么? 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 Zookeeper的通知机制主要通过Watcher实现&#xff0c;它是Zookeeper客…

基于Pyside6开发一个通用的在线升级工具

UI main.ui <?xml version"1.0" encoding"UTF-8"?> <ui version"4.0"><class>MainWindow</class><widget class"QMainWindow" name"MainWindow"><property name"geometry"&…

开源 - Ideal库 - Excel帮助类,ExcelHelper实现(四)

书接上回&#xff0c;前面章节已经实现Excel帮助类的第一步TableHeper的对象集合与DataTable相互转换功能&#xff0c;今天实现进入其第二步的核心功能ExcelHelper实现。 01、接口设计 下面我们根据第一章中讲解的核心设计思路&#xff0c;先进行接口设计&#xff0c;确定Exce…

嵌入式系统应用-LVGL的应用-平衡球游戏 part1

平衡球游戏 part1 1 平衡球游戏的界面设计2 界面设计2.1 背景设计2.2 球的设计2.3 移动球的坐标2.4 用鼠标移动这个球2.5 增加边框规则2.6 效果图2.7 游戏失败重启游戏 3 为小球增加增加动画效果3.1 增加移动效果代码3.2 具体效果图片 平衡球游戏 part2 第二部分文章在这里 1 …

《Python基础》之Pandas库

目录 一、简介 二、Pandas的核心数据结构 1、Series 2、DataFrame 三、数据读取与写入 1、数据读取 2、数据写入 四、数据清洗与处理 1、处理缺失值 2、处理重复值 3、数据转换 五、数据分析与可视化 1、统计描述 2、分组聚合 3、数据可视化 六、高级技巧 1、时…

网络安全-夜神模拟器如何通过虚拟机的Burp Suite代理应用程序接口

第一步、查看虚拟机的IP地址 我们可以通过ifconfig命令来查看虚拟机的IP地址,如下图所示。 第二步、在Burp Suite上设置代理 打开虚拟机上的Burp Suite,进入到代理模块中,进入到代理设置中心 打开系统代理设置中心之后,将我们虚拟机的地址添加到上面,作为新的代理。 第…

PyTorch 2.5.1: Bugs修复版发布

一&#xff0c;前言 在深度学习框架的不断迭代中&#xff0c;PyTorch 社区始终致力于提供更稳定、更高效的工具。最近&#xff0c;PyTorch 2.5.1 版本正式发布&#xff0c;这个版本主要针对 2.5.0 中发现的问题进行了修复&#xff0c;以提升用户体验。 二&#xff0c;PyTorch 2…

SpringAi整合大模型(进阶版)

进阶版是在基础的对话版之上进行新增功能。 如果还没弄出基础版的&#xff0c;请参考 https://blog.csdn.net/weixin_54925172/article/details/144143523?sharetypeblogdetail&sharerId144143523&sharereferPC&sharesourceweixin_54925172&spm1011.2480.30…

Python实现网站资源批量下载【可转成exe程序运行】

Python实现网站资源批量下载【可转成exe程序运行】 背景介绍解决方案转为exe可执行程序简单点说详细了解下 声明 背景介绍 发现 宣讲家网 的PPT很好&#xff0c;作为学习资料使用很有价值&#xff0c;所以想下载网站的PPT课件到本地&#xff0c;但是由于网站限制&#xff0c;一…

CSS函数

目录 一、背景 二、函数的概念 1. var()函数 2、calc()函数 三、总结 一、背景 今天我们就来说一说&#xff0c;常用的两个css自定义属性&#xff0c;也称为css函数。本文中就成为css函数。先来看一下官方对其的定义。 自定义属性&#xff08;有时候也被称作CSS 变量或者级…

UG NX二次开发(C#)-选择对象居中(不是全部居中)

文章目录 1、前言2、什么是对象居中3、功能实现代码3.1 对象居中3.1 恢复原视图1、前言 在UG NX二次开发过程中,我们经常会用到居中以查看完整的模型,但是对于如果想展示某些对象,而不是全部模型时,那么我们就想将选择的对象(如体对象)居中查看,当查看结束后还能恢复到…

动态规划-----路径问题

动态规划-----路径问题 下降最小路径和1&#xff1a;状态表示2&#xff1a;状态转移方程3 初始化4 填表顺序5 返回值6 代码实现 总结&#xff1a; 下降最小路径和 1&#xff1a;状态表示 假设&#xff1a;用dp[i][j]表示&#xff1a;到达[i,j]的最小路径 2&#xff1a;状态转…

[C++设计模式] 为什么需要设计模式?

文章目录 什么是设计模式&#xff1f;为什么需要设计模式&#xff1f;GOF 设计模式再次理解面向对象软件设计固有的复杂性软件设计复杂性的根本原因如何解决复杂性&#xff1f;分解抽象 结构化 VS 面向对象(封装)结构化设计代码示例&#xff1a;面向对象设计代码示例&#xff1…

级联树结构TreeSelect和上级反查

接口返回结构 前端展示格式 前端组件 <template><div ><el-scrollbar height"70vh"><el-tree :data"deptOptions" :props"{ label: label, children: children }" :expand-on-click-node"false":filter-node-me…

Figma入门-自动布局

Figma入门-自动布局 前言 在之前的工作中&#xff0c;大家的原型图都是使用 Axure 制作的&#xff0c;印象中 Figma 一直是个专业设计软件。 最近&#xff0c;很多产品朋友告诉我&#xff0c;很多原型图都开始用Figma制作了&#xff0c;并且很多组件都是内置的&#xff0c;对…

【Unity基础】使用InputSystem实现物体跳跃

要在Unity中使用 InputSystem 实现小球按空格键跳起的效果&#xff0c;可以按照以下步骤进行&#xff1a; 1. 安装 InputSystem 包 首先&#xff0c;确保你已经安装了 Input System 包。你可以通过以下步骤安装&#xff1a; 打开 Unity 编辑器&#xff0c;点击菜单 Window -…

【ArkTS】使用AVRecorder录制音频 --内附录音机开发详细代码

系列文章目录 【ArkTS】关于ForEach的第三个参数键值 【ArkTS】“一篇带你读懂ForEach和LazyForEach” 【小白拓展】 【ArkTS】“一篇带你掌握TaskPool与Worker两种多线程并发方案” 【ArkTS】 一篇带你掌握“语音转文字技术” --内附详细代码 【ArkTS】技能提高–“用户授权”…

一种多功能调试工具设计方案开源

一种多功能调试工具设计方案开源 设计初衷设计方案具体实现HUB芯片采用沁恒微CH339W。TF卡功能网口功能SPI功能IIC功能JTAG功能下行USB接口 安路FPGA烧录器功能Xilinx FPGA烧录器功能Jlink OB功能串口功能RS232串口RS485和RS422串口自适应接口 CAN功能烧录器功能 目前进度后续计…

浏览器的事件循环机制

浏览器和Node的事件循环机制 引言浏览器的事件循环机制 引言 由于JS是单线程的脚本语言&#xff0c;所以在同一时间只能做一件事情&#xff0c;当遇到多个任务时&#xff0c;我们不可能一直等待任务完成&#xff0c;这会造成巨大的资源浪费。为了协调时间&#xff0c;用户交互…