Apache Kafka Spring 集成

news2024/11/9 13:37:50

Apache Kafka Spring 集成

今天来学习Spring如何集成 Apache kafka,在Spring Boot中如何集成kafka客户端 生产、消费消息。首先介绍下各个组件的版本信息:

  • Apache Kafka_2.13-3.3.1
  • Spring Boot 3.0.0
  • Apache-Maven-3.6.0
  • JDK-17.0.5

启动Kafka

# 进入kafka安装主目录
# 启动Zookeeper 
bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动kafka 
bin/kafka-server-start.sh config/server.properties

Maven依赖

<modelVersion>4.0.0</modelVersion>
<groupId>org.kafka.spring.example</groupId>
<artifactId>kafka-spring-example</artifactId>
<version>1.0-SNAPSHOT</version>

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>3.0.0</version>
  <relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
  <maven.compiler.source>17</maven.compiler.source>
  <maven.compiler.target>17</maven.compiler.target>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.0.1</version>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
</dependencies>

Spring Boot 之生产者

生产者代码

@SpringBootApplication
public class KafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic100")
                .partitions(2)
                .replicas(1)
                .build();
    }

    @KafkaListener(id = "myId", topics = "topic1")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("topic100", "test");
        };
    }
}

启动代码,系统自动创建topic 并发送消息至 Kafka Broker。

消费者代码

./bin/kafka-console-consumer.sh --topic topic100 --from-beginning --bootstrap-server localhost:9092

在这里插入图片描述

如上图,消费者程序已经成功地消费生产者发送的消息,至此已经实现了Spring Boot集成Kafka 生产者、消费者最简单的代码逻辑。

Kafka配置

细心的同学可能已经注意到了,上述生产者代码并没有指定kafka的配置,如连接地址 就能够正常的向 localhost:9092 的kafka 发送消息了。如果kafka安装在内网中的其他机器的话,就需要为生产者配置相关的属性,代码如下:

@Bean
public ProducerFactory<String, String> producerFactory() {
  return new DefaultKafkaProducerFactory<>(senderProps());
}

private Map<String, Object> senderProps() {
  Map<String, Object> props = new HashMap<>();
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  return props;
}

只需要在KafkaApplication增加如上两个方法代码,即可实现。如果再动态化一点,可以相关的配置放置到配置文件中,然后使用 @Value注解引用配置属性,再注入到构造kafka生产者/消费者实例中。

生产者监听

某些情况下,生产者需要监听数据是否发送成功,以便做特殊的业务处理。Spring 定义了核心接口 - ProducerListener用于实现数据发送的信息回调:

public interface ProducerListener<K, V> {
  //消息发送成功回调函数
  default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
  //消息发送失败回调函数
  default void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
            Exception exception);
}

默认情况下,发送模板类 - KafkaTemplate 配置LoggingProducerListener,它记录错误,当发送成功时不做任何操作。如果需要监听消息发送成功的状态,需要开发者实现接口并配置到KafkaTemplate类中。

@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
  KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory);
  template.setProducerListener(new ProducerListener(){
    @Override
    public void onSuccess(ProducerRecord producerRecord, RecordMetadata metadata) {
      System.out.println("============== onSuccess : " + metadata.offset());
    }

    @Override
    public void onError(ProducerRecord producerRecord, RecordMetadata metadata, Exception exception) {
      System.out.println("============== onError : " + metadata.offset() + " , " + exception.getMessage());
    }
  });
  return template;
}

在KafkaApplication类中添加以上方法,然后重新启动程序,可以在控制台看到发送成功的日志信息。

发送结果

在这里插入图片描述

如上图,消息发送模板类 - KafkaTemplate 中定义的所有发送消息方法的返回对象都是 CompletableFuture<SendResult<K, V>>,CompletableFuture 类是 Future接口的实现类,因此可以调用该类的方法实现

  • 同步等待返回结果

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
      return args -> {
        CompletableFuture<SendResult<String, String>> future = template.send("topic100", "test");
        // 同步等待 发送结果
        SendResult<String, String> result = future.get(10, TimeUnit.SECONDS);
        RecordMetadata metadata = result.getRecordMetadata();
        System.out.println("offset : " + metadata.offset());
      };
    }
    
  • 异步执行发送结果

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
      return args -> {
        CompletableFuture<SendResult<String, String>> future = template.send("topic100", "test");
        future.whenComplete((result, ex) -> {
          if (ex == null) {
            System.out.println("数据发送成功");
            return;
          }
          System.out.println("数据发送失败: " + ex.getMessage());
        });
      };
    }
    

RoutingKafkaTemplate

从spring-kafka 2.5 开始,开发者可以使用RoutingKafkaTemplate在运行时根据目标主题名称选择生产者。但是需要注意:路由模板不支持事务、执行、刷新或metrics操作,因为此类操作时的主题未知

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
            ProducerFactory<Object, Object> pf) {
        // 复制生产者默认的属性
        Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
        // 覆盖属性
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        // 新注册一个生产者
        DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
        context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);

        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        map.put(Pattern.compile("two"), bytesPF);
        map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
        return new RoutingKafkaTemplate(map);
    }
    @Bean
    public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
        return args -> {
            routingTemplate.send("one", "thing1");
            routingTemplate.send("two", "thing2".getBytes());
        };
    }
}

DefaultKafkaProducerFactory

框架内部使用ProducerFactory工厂类创建KafkaTemplate模板发送类。默认情况下,当不使用Transactions时,DefaultKafkaProducerFactory会创建一个供所有客户端使用的单例生产者,正如KafkaProduction javadocs中所建议的那样。但是,如果在模板上调用flush(),这可能会导致使用同一生成器的其他线程延迟。从2.3版开始,DefaultKafkaProducerFactory有一个新的属性producerPerThread。当设置为true时,工厂将为每个线程创建(并缓存)一个单独的生产者,以避免此问题。

注意:当producerPerThread为true时,当不再需要生产者时,用户代码必须在工厂上调用closeThreadBoundProducer()。这将实际关闭生成器并将其从ThreadLocal中删除。调用reset()或destroy()不会清除这些生产者。

ReplyingKafkaTemplate

2.1.3版引入了KafkaTemplate的子类,以提供请求/应答语义。该类名为ReplyingKafkaTemplate,有两个额外的方法;下面显示了方法签名:

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
    Duration replyTimeout);

如果使用了第一个方法,或者replyTimeout参数为空,则使用模板的defaultReplyTimeout属性(默认为5秒)。相关代码如下

@SpringBootApplication
public class KRequestingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KRequestingApplication.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            if (!template.waitForAssignment(Duration.ofSeconds(10))) {
                throw new IllegalStateException("Reply container did not initialize");
            }
            ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
            RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
            SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
            System.out.println("Sent ok: " + sendResult.getRecordMetadata());
            ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
            System.out.println("Return value: " + consumerRecord.value());
        };
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> pf,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {

        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> repliesContainer =
                containerFactory.createContainer("kReplies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean
    public NewTopic kReplies() {
        return TopicBuilder.name("kReplies")
            .partitions(10)
            .replicas(2)
            .build();
    }

}

Spring Boot 之消费者

开发者可以通过配置MessageListenerContainer并提供消息监听器或使用@KafkaListener注释来接收消息。

初始化Factory

@Configuration
@EnableKafka
public class ConsumerConfiguration {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}

消费监听类

@Component
public class KafkaConsumerExample {

    // concurrency : 覆盖工厂生产类的默认线程数量,表示使用多少个线程进行消费
    @KafkaListener(id = "myListener", topics = "topic100",
            autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
    public void listen(String data) {
        System.out.println(" ----------------------------- consumer data : " + data);
    }
}

在这里插入图片描述

从控制台可以看出,kafka消费者已经成功消费消息了。

指定分区消费

@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    //
}

此外, 还可以使用逗号分隔 指定多个分区,如下:

@KafkaListener(id = "pp", autoStartup = "false",
        topicPartitions = @TopicPartition(topic = "topic1",
                partitions = "0-5, 7, 10-15"))
public void process(String in) {
    ...
}

消费元数据

开发者可以从消息头中获得有关记录的元数据。您可以使用以下标头名称来检索消费者元数据:

  • KafkaHeaders.OFFSET
  • KafkaHeaders.RECEIVED_KEY
  • KafkaHeaders.RECEIVED_TOPIC
  • KafkaHeaders.RECEIVED_PARTITION
  • KafkaHeaders.RECEIVED_TIMESTAMP
  • KafkaHeaders.TIMESTAMP_TYPE
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
        ) {
  
}

批量消费

从1.1版开始,开发可以配置@KafkaListener方法来接收从消费者投票中接收的整批消费者记录。要配置侦听器容器工厂以创建批处理侦听器,可以设置batchListener属性。以下示例显示了如何执行此操作:

@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);  // <<<<<<<<<<<<<<<<<<<<<<<<<
    return factory;
}
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    //
}

此外 还可以实现多种接口,以满足特殊场景的需要,如手动管理消费者位移等等。

@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List<Message<?>> list) {
    //...
}

@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List<Message<?>> list, Acknowledgment ack) {
    //...
}

@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
   // ...
}

指定参数

开发者可以在@KafkaListener注解上 单独的设置消费参数,以覆盖通用的设置:

@KafkaListener(topics = "myTopic", groupId = "group", properties = {
    "max.poll.interval.ms:60000",
    ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})

类级别 @KafkaListener

在类级别使用@KafkaListener时,必须在方法级别指定@KafkaHandler。传递消息时,转换后的消息负载类型用于确定要调用的方法。以下示例显示了如何执行此操作:

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
    @KafkaHandler
    public void listen(String foo) {
        ...
    }

    @KafkaHandler
    public void listen(Integer bar) {
        ...
    }

    @KafkaHandler(isDefault = true)
    public void listenDefault(Object object) {
        ...
    }
}

消息过滤

在某些情况下,例如重平衡,可能会重新传递已经处理的消息,因此需要一种手段对此类重复的消息进行过滤。

@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
    //
}

如上述代码,开发者只需要在监听器注解上指定过滤器名称即可。同时开发者还需要实现核心接口 - RecordFilterStrategy

public interface RecordFilterStrategy<K, V> {

	/**
	 * 返回true 则表示该消息被过滤掉
	 * @param consumerRecord the record.
	 * @return true to discard.
	 */
	boolean filter(ConsumerRecord<K, V> consumerRecord);
	//...
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/135318.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ArcGIS基础实验操作100例--实验38删除冗余节点

本实验专栏参考自汤国安教授《地理信息系统基础实验操作100例》一书 实验平台&#xff1a;ArcGIS 10.6 实验数据&#xff1a;请访问实验1&#xff08;传送门&#xff09; 高级编辑篇--实验38 删除冗余节点 目录 一、实验背景 二、实验数据 三、实验步骤 &#xff08;1&…

免费分享在线设计工具,比ps还方便的贺卡设计工具!

元旦贺卡在线制作工具&#xff0c;不用自己设计&#xff0c;只需借助在线工具平台的模板就能轻松搞定的贺卡设计方法&#xff01;跟着小编下面的设计步骤&#xff0c;使用在线工具乔拓云轻松设计活动贺卡&#xff0c;设计过程简单且一键生成分享链接&#xff0c;不用自己设计工…

推荐四款常见的电子教室软件,大家觉得哪款好用

现在越来越多的教学教室都在使用多媒体教学软件&#xff0c;不仅包括学校&#xff0c;还有一些培训机构和教堂。 那么&#xff0c;多媒体教学软件有哪些&#xff1f; 哪款多媒体教学软件好用&#xff1f; 在今天的文章中&#xff0c;小编挑选了4款流行的多媒体教学软件推荐给大…

【全网最细PAT题解】【PAT乙】1009 说反话(cin、getline、cin.getline三种输入格式)

题目链接 1009 说反话 题目描述 给定一句英语&#xff0c;要求你编写程序&#xff0c;将句中所有单词的顺序颠倒输出。输入格式&#xff1a; 测试输入包含一个测试用例&#xff0c;在一行内给出总长度不超过 80 的字符串。字符串由若干单词和若干空格组成&#xff0c;其中单词…

【Spring 入门教程2】

&#x1f308;博客主页&#xff1a;屠一乐的博客 &#x1f4c5; 发文时间&#xff1a;2023.1.2 &#x1f388; 一定存在只有你才能做成的事 &#x1f339; 博主水平有限&#xff0c;如有错误&#xff0c;欢迎指正 欢迎各位&#x1f44d;收藏&#x1f48e;评论✉ Spring_day02 S…

C++ - opencv应用实例之矩形框检测

C++ - opencv应用实例之矩形框检测 现阶段下,目标检测在实际应用场景中的表现颇为重要,工业质检、移动机器人视觉伺服、作业、交通监控、安防领域等均需要通过目标检测来实现对目标的定位、测量或者统计、辅助控制等目前目标检测主要分为两个方向的发展,其一是基于传统图像处…

let、const、var关键字

1、let ES6中新增的用于声明变量的关键字。 &#xff08;1&#xff09;let声明的变量只在所处于的块级有效 if (true) { let a 10;} console.log(a) // a is not defined 注意&#xff1a;使用let关键字声明的变量才具有块级作用域&#xff0c;使用var声明的变量不具备块级…

计算机网络(二)Linux网络编程

layout: post title: 计算机网络&#xff08;二&#xff09;Linux网络编程 description: 计算机网络&#xff08;二&#xff09;Linux网络编程 tag: 计算机网络 文章目录POSIX概念POSIX网络相关APIsocket()bind()网络字节序与主机字节序&#xff08;大小端设备&#xff09;list…

TensorFlow之过拟合与欠拟合-2

1 基本概念 过度拟合&#xff08;overfit&#xff09; 正则化&#xff08;regularization&#xff09; L1正则化&#xff08;L1 regularization&#xff09; L2正则化&#xff08;L2 regularization&#xff09; 删除正则化&#xff08;dropout regularization&#xff09…

植物大战僵尸:实现灵魂收割者

植物大战僵尸这款游戏可以说是很多90后的回忆了&#xff0c;基本上只要是90后或多或少的都接触过&#xff0c;而玩游戏与制作辅助是两个概念&#xff0c;今天我将给大家分享一些游戏辅助方面的制作技巧&#xff0c;之所以使用植物大战僵尸这款游戏是因为游戏简单容易分析&#…

【JavaSE成神之路】聊聊封装这件事

哈喽&#xff0c;我是兔哥呀&#xff0c;今天就让我们继续这个JavaSE成神之路&#xff01; 这一节啊&#xff0c;咱们要学习的内容是Java的封装。 1. 什么是封装 Java的封装是指&#xff0c;在一个类中把一些重要的信息隐藏起来&#xff0c;使得外部不能直接访问。 Java的封…

Java --- JUC之volatile

目录 一、volatile两大特点 二、volatile的内存语义 三、volatile内存屏障 四、volatile四大屏障 五、volatile的特性 六、volatile使用场景 一、volatile两大特点 1、可见性 2、有序性 二、volatile的内存语义 1、当写一个volatile变量时&#xff0c;JMM会把该线程对…

C语言入门系列 - 共用体union,枚举enum,宏定义#define,条件编译,const与指针

C语言入门系列 - 共用体union,枚举enum&#xff0c;宏定义#define,条件编译,const与指针 第一节 C 语言基础以及基本数据类型 第二节 C 语言运算符 第三节 C 语言控制语句 第四节 C 语言自定义函数 第五节 C 语言修饰变量的关键字 第六节 C 语言构造数据类型–数组 第七节 C 语…

创建型模式

创建型模式 创建型模式对类的实例化过程进行了抽象&#xff0c;能够将软件中对象的创建和使用分离&#xff0c;使整个系统的设计更加符合单一职责原则。 什么是对象的创建与对象的使用分离&#xff1f; 一个女生想吃苹果&#xff0c;怎么办&#xff1f; 对象的创建和对象的使用…

TC275——04Blinky-LED

项目工程框架 项目驱动文件这个见过&#xff0c;三个main文件真的是开了眼&#xff0c;一个main代表一个核吗&#xff1f; 按照以往对实现LED闪烁的流程&#xff0c;一般是先配置时钟、再配置IO、延时&#xff0c;实现反转效果。 Blinky_LED.c 在驱动文件里&#xff1a; /*…

数据分析 -Hive学习 Day5

HIVE 核心技能之窗口函数 大家好呀&#xff0c;这节课我们学习 Hive 核心技能中最难的部分——窗口函数。窗口函数我们之前在学 MySQL 的时候有学过一些&#xff0c;但是只学了三个排序的窗口函数。这节课我们会学习更多的窗口函数&#xff0c;包括累计计算、分区排序、切片排…

nvm中node包管理器在windows中的使用

你要使用的 1/nuvm 只能在Linux 和 OS X 2/推荐使用nvm-windows&#xff0c;git上&#xff0c;我认为还可以就使用 操作 1/下载的git地址nvm-windows 2/好像要清除掉所有node&#xff0c;是个注意点 3/安装 4/路径上不允许存在空格&#xff0c;否则后面会各种报错 5…

读书笔记《硬件十万个为什么——开发流程篇》

大家好,这里是大话硬件。 今天想给大家分享上周末在家写的读书笔记,内容来源于重读《硬件十万个为什么——开发流程篇》这本书的一些启发和总结。 1. 为什么我要重读这本书籍? 这本书收到快递的时间是2022.8.26,拆开快递的那个晚上大约花了2个小时从头到尾快速浏览了一次…

(创建失败)小米万兆路由器里的Docker安装Redis7.0

小米2022年12月份发布了万兆路由器&#xff0c;里面可以使用Docker。 今天尝试在小米的万兆路由器里安装Redis7.0。 创建失败&#xff0c;有时间时确认是否可以解决。 Server cant set maximum open files to 10032 because of OS error: Operation not permitted. Current m…

【STM32 Cortex-M4核中断实验】

STM32 Cortex-M4核中断实验实验要求MX相关设置LED设置按键设置光电开关&#xff0c;火焰传感器&#xff0c;人体红外设置UART4设置代码编写结果测试实验要求 使用M4核开发 PF7/PF8/PF9/PF5/PF12/PE15检测中断 要求1&#xff1a;当中断到来时&#xff0c;LED灯状态取反 要求2&a…