Kafka(四)消费者消费消息

news2025/1/18 17:07:40

文章目录

  • 如何确保不重复消费消息?
  • 消费者业务逻辑重试
  • 消费者提交
  • 自定义反序列化类
  • 消费者参数配置及其说明
    • 重要的参数session.time.ms和heartbeat.interval.ms和group.instance.id
    • 增加消费者的吞吐量
    • 消费者消费的超时时间和poll()方法的关系
  • 消费者消费逻辑
  • 启动消费者
  • 关闭消费者
  • 配置listener
  • 结语
  • 示例源码仓库

在 上一篇文章里,对于生产者,发送时失败之后会由定时任务进行重新发送, 并且我们是根据消息的key进行分区的, 所以不管我们重新发送了多少次,对于同一个key,始终会被送到同一个分区

那么到消费者这里,最重要的问题是如何确保不会重复消费之前因为各种原因被重新发送到某个分区的消息。

如何确保不重复消费消息?

基本思路如下

  1. 我们在数据库中创建了一个已成功消费的消息表,里面只有一列,消息的key。当消费者消费逻辑成功之后,我们会把其key保存到这张表里 。
  2. 当消费者拉取新的一批消息时,我们会去数据库的消息表里查是否已经存在该消息的key,存在的话,就跳过实际的消费业务。
  3. 一批消息里也可能存在相同的key,所以我们处理完一次消费业务,就把该key放到一个set里,消费下一条消息时,则先去set里看一下,存在的话即跳过,不存在则正常执行消费业务。即使前面的消息消费业务失败了,后面相同key的消息也直接跳过,不会再次消费

消费者业务逻辑重试

对于消费者业务逻辑的重试,我们使用failsafe框架进行重试,该框架的使用可参考官方文档,这里不做过多赘述。

消费者提交

这里的方式采用的是Kafka权威指南中消费者一章中提出的方式。 异步+同步。平时使用异步提交,在关闭消费者时,使用同步提交,确保消费者退出之前将当前的offset提交上去。

自定义反序列化类

在生产者端,我们发送自定义的对象时,利用自定义序列化类将其序列化为JSON。在消费者端,我们同样需要自定义反序列类将JSON转为我们之前的对象

public class UserDTODeserializer implements Deserializer<UserDTO> {
    
    @Override
    @SneakyThrows
    public UserDTO deserialize(final String s, final byte[] bytes) {
        ObjectMapper objectMapper = new ObjectMapper();
        return objectMapper.readValue(bytes, UserDTO.class);
    }
}

消费者参数配置及其说明

    /**
     * 以下配置建议搭配 官方文档 + kafka权威指南相关章节 + 实际业务场景需求 自己调整
     * https://kafka.apache.org/26/documentation/#group.instance.id
     *
     * 为什么需要group.instance.id?
     * 假设auto.offset.reset=latest
     * 1. 如果没有group.instance.id,那么kafka会认为此消费者是dynamic member,在重启期间如果有消息发送到topic,那么重启之后,消费者会【丢失这部分消息】
     * 假如auto.offset.reset=earliest
     * 1. 如果没有group.instance.id,那么kafka会认为此消费者是dynamic member,在重启期间如果有消息发送到topic,那么重启之后,消费者会重复消费【全部消息】
     *
     * 光有group.instance.id还不够,还需要修改heartbeat.interval.ms和session.timeout.ms的值为合理的值
     * 如果程序部署,重启期间,重启时间超过了session.timeout.ms的值,那么kafka会认为此消费者已经挂了会触发rebalance,在一些大型消息场景,rebalance的过程可能会很慢, 更详细的解释请参考
     * https://kafka.apache.org/26/documentation/#static_membership
     * @param groupInstanceId
     * @return
     */
    public static Properties loadConsumerConfig(int groupInstanceId, String valueDeserializer) {
        Properties result = new Properties();
        result.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.102:9093");
        result.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        result.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        result.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // 代表此消费者是消费者组的static member
        result.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "test-" + ++groupInstanceId);
        // 修改heartbeat.interval.ms和session.timeout.ms的值,和group.instance.id配合使用,避免重启或重启时间过长的时候,触发rebalance
        result.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000 * 60);
        result.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000 * 60 * 5);
        // 关闭自动提交
        result.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE);
        // 默认1MB,增加吞吐量,其设置对应的是每个分区,也就是说一个分区返回10MB的数据
        result.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576 * 10);
        result.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        // 返回全部数据的大小
        result.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 1048576 * 100);
        // 默认5分钟
        result.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000 * 60 * 5);
        return result;
    }

重要的参数session.time.ms和heartbeat.interval.ms和group.instance.id

三者的使用方式见上面代码中的注释。

增加消费者的吞吐量

和上一篇文章一样,由于我们的邮件消息每个大概是20KB,使用默认的消费者参数,吞吐量是上不来的。 所以做了一些优化,除了消费者消费逻辑要尽可能简单之外,为了增加消费者的吞吐量,可以根据实际场景修改倒数第4、3、2个参数。

消费者消费的超时时间和poll()方法的关系

由max.poll.interval.ms参数控制,默认5分钟。如果消费者业务逻辑处理特别耗时,在5分钟之内没有再次调用poll()拉取消息,则Kafka认为消费者已死,根据具体配置会立刻触发rebalance还是等一段时间再触发rebalance。

这里特别强调一下,网上有一部分文章说是要确保消费逻辑在poll(timeUnit)时间内处理完,否则就会触发rebalance。这都是很早之前的Kafka版本了,是因为原来消费者的poll()线程和心跳线程使用的是同一个线程。现在的版本早就把这两个分开了。所以你只需要注意,自己的消费逻辑别超过max.poll.interval.ms即可,如果觉得不够用,也可自己调整。

poll()方法中的时间代表的是多长时间去拉取一次消息。假设你设置的是1分钟,你的消费逻辑处理的很快,可能用了10s。那么在你消费完了之后,消费者会在1分钟之后拉取新消息。

在消费者中使用手动提交。

消费者消费逻辑

这里要注意

  1. 如果消费逻辑可能抛出异常,则使用try-catch处理,防止因为抛出异常,导致我们错误的关闭了消费者
  2. 消费者消费逻辑失败时会重试,重试N次之后,我们会将其保存在数据库中,以便和生产者一样,定时处理失败的消息
  3. 消费逻辑没问题的话,则把该消息的key进行入库处理
@Log
public class MessageConsumerRunner implements Runnable {
    
    private final AtomicBoolean closed = new AtomicBoolean(false);
    
    private MessageAckConsumesSuccessService messageAckConsumesSuccessService = new MessageAckConsumesSuccessService();
    
    private MessageFailedService messageFailedService = new MessageFailedService();
    
    private final KafkaConsumer<String, UserDTO> consumer;
    
    private final int consumerPollIntervalSecond;
    
    public MessageConsumerRunner(KafkaConsumer<String, UserDTO> consumer, int consumerPollIntervalSecond) {
        this.consumer = consumer;
        this.consumerPollIntervalSecond = consumerPollIntervalSecond;
    }
    
    /**
     * 1. 使用https://failsafe.dev/进行重试
     * 2. 每次消费消息前,判断消息ID是否存在于数据库中和当前Set集合中,避免重复消费,
     *    我们的消息时根据消息的key进行hash分区的,所以同一个消息即使生产多次,一定会到同一个partition中,partition动态增加引起的特殊情况不在考虑范围之内
     * 4. 在一次消费消息中重试两次,如果两次都失败,那么将失败原因、消息的JSON字符串插入到message_failed表中,以便后续再次生产或排查问题
     * 3. 平时异步提交,关闭消费者时使用同步提交
     */
    @Override
    public void run() {
        AtomicReference<String> errorMessage = new AtomicReference<>(StringUtils.EMPTY);
        RetryPolicy<Boolean> retryPolicy = RetryPolicy.<Boolean>builder()
            .handle(Exception.class)
            // 如果业务逻辑返回false或者抛出异常,则重试
            .handleResultIf(Boolean.FALSE::equals)
            // 不包含首次
            .withMaxRetries(2)
            .withDelay(Duration.ofMillis(200))
            .onRetry(e -> log.warning("consume message failed, start the {}th retry"+ e.getAttemptCount()))
            .onRetriesExceeded(e -> {
                Optional.ofNullable(e.getException()).ifPresent(u -> errorMessage.set(u.getMessage()));
                log.severe("max retries exceeded" + e.getException());
            })
            .build();
        Fallback<Boolean> fallback = Fallback.<Boolean>builder(e -> {
            // do nothing, suppress exceptions
        }).build();
        try {
            consumer.subscribe(Collections.singletonList("email"));
            while (!closed.get()) {
                // get message from kafka
                ConsumerRecords<String, UserDTO> records = consumer.poll(Duration.ofSeconds(consumerPollIntervalSecond));
                if (records.isEmpty()) {
                    return;
                }
                Set<UserDTO> successConsumed = new HashSet<>();
                Set<UserDTO> failedConsumed = new HashSet<>();
                Map<String, String> failedConsumedReason = new HashMap<>();
                // check message if exist in database
                Set<String> checkingMessageIds = new HashSet<>(records.count());
                records.iterator().forEachRemaining(item -> checkingMessageIds.add(item.value().getMessageId()));
                Set<String> hasBeenConsumedMessageIds = messageAckConsumesSuccessService.checkMessageIfExistInDatabase(checkingMessageIds);
                records.forEach(item -> {
                    if (hasBeenConsumedMessageIds.contains(item.value().getMessageId())) {
                        // if exist, continue
                        return;
                    }
                    // 每一批消息中也可能存在同样的消息,所以需要再次判断
                    hasBeenConsumedMessageIds.add(item.value().getMessageId());
                    try {
                        Failsafe.with(fallback, retryPolicy)
                            .onSuccess(e -> successConsumed.add(item.value()))
                            .onFailure(e -> {
                                failedConsumed.add(item.value());
                                failedConsumedReason.put(item.value().getMessageId(), StringUtils.isNotBlank(errorMessage.get()) ? errorMessage.get() : "no reason, may be check server log");
                                errorMessage.set(StringUtils.EMPTY);
                            })
                            .get(() -> {
                                // 这里是业务逻辑,可以返回true或false,为什么要这样?是因为上面RetryPolicy这里定义的boolean,根据自己实际业务设置相应的类型
                                return true;
                            });
                        // 这里要catch住所有业务异常,防止由业务异常导致消费者线程退出
                    }catch (Exception e) {
                        log.severe("failed to consume email message" + e);
                        failedConsumed.add(item.value());
                        failedConsumedReason.put(item.value().getMessageId(), StringUtils.isNotBlank(e.getMessage()) ? e.getMessage() : e.getCause().toString());
                    }
                });
                postConsumed(successConsumed, failedConsumed, failedConsumedReason);
                // 平时使用异步提交
                consumer.commitAsync();
            }
        }catch (WakeupException e) {
            if (!closed.get()) {
                throw e;
            }
        } finally {
            // 消费者退出时使用同步提交
            try {
                consumer.commitSync();
            } catch (Exception e) {
                log.info("commit sync occur exception: " + e);
            } finally{
                try {
                    consumer.close();
                }catch (Exception e) {
                    log.info("consumer close occur exception: " + e);
                }
                log.info( "shutdown kafka consumer complete");
            }
        }
    }
    
    /**
     * 处理成功、成功后的回调、失败
     * @param successConsumed
     * @param failedConsumed
     * @param failedConsumedReason
     */
    private void postConsumed(Set<UserDTO> successConsumed, Set<UserDTO> failedConsumed, Map<String, String> failedConsumedReason) {
        // 后置处理开启异步线程处理,不阻塞消费者线程
        
        // 克隆传进来的集合,而不使用原集合的引用,因为原集合每次消费都会重置
        Set<UserDTO> cloneSuccessConsumed = new HashSet<>(successConsumed);
        Set<UserDTO> cloneFailedConsumed = new HashSet<>(failedConsumed);
        Map<String, String> cloneFailedConsumedReason = new HashMap<>(failedConsumedReason);
        new Thread( () -> {
            if (!cloneSuccessConsumed.isEmpty()) {
                messageAckConsumesSuccessService.insertMessageIds(cloneSuccessConsumed.stream().map(UserDTO::getMessageId).collect(Collectors.toSet()));
                cloneFailedConsumed.forEach(item -> {
                    if (Objects.nonNull(item.getCallbackMetaData())) {
                        // do callback
                        CallbackProducer callbackProducer = new CallbackProducer();
                        callbackProducer.sendCallbackMessage(item.getCallbackMetaData(), MessageFailedPhrase.PRODUCER);
                    }
                });
            }
            if (!cloneFailedConsumed.isEmpty()) {
                ObjectMapper objectMapper = new ObjectMapper();
                cloneFailedConsumed.forEach(item -> {
                    MessageFailedEntity entity = new MessageFailedEntity();
                    entity.setMessageId(item.getMessageId());
                    entity.setMessageType(MessageType.EMAIL);
                    entity.setMessageFailedPhrase(MessageFailedPhrase.CONSUMER);
                    entity.setFailedReason(cloneFailedConsumedReason.get(item.getMessageId()));
                    try {
                        entity.setMessageContentJsonFormat(objectMapper.writeValueAsString(item));
                    } catch (JsonProcessingException e) {
                        log.info("failed to convert UserDTO message to json string");
                    }
                    messageFailedService.saveOrUpdateMessageFailed(entity);
                });
            }
        }).start();
    }
    
    public void shutdown() {
        log.info( Thread.currentThread().getName() + " shutdown kafka consumer");
        closed.set(true);
        consumer.wakeup();
    }
}

启动消费者

通过实现ServletContextListener接口对于方法使其在Tomcat启动之后,启动消费者

public class StartUpConsumerListener implements ServletContextListener {
    
    
    /**
     * 假设开启10个消费者.
     *
     * 消费者的数量要和partition的数量一致,实际情况下,可以调用AdminClient的方法获取到topic的partition数量,然后根据partition数量来创建消费者.
     * @param sce
     */
    @Override
    public void contextInitialized(final ServletContextEvent sce) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 30L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100), new AbortPolicy());
        for (int i = 0; i < 10; i++) {
            KafkaConsumer<String, UserDTO> consumer = new KafkaConsumer<>(KafkaConfiguration.loadConsumerConfig(i, UserDTO.class.getName()));
            MessageConsumerRunner messageConsumerRunner = new MessageConsumerRunner(consumer, 10);
            // 使用另外一个线程来关闭消费者
            Thread shutdownHooks = new Thread(messageConsumerRunner::shutdown);
            KafkaListener.KAFKA_CONSUMERS.add(shutdownHooks);
            // 启动消费者线程
            threadPoolExecutor.execute(messageConsumerRunner);
        }
    }
}

关闭消费者

public class KafkaListener implements ServletContextListener {
    
    public static final Vector<Thread> KAFKA_CONSUMERS = new Vector<>();

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        // do noting
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        KAFKA_CONSUMERS.forEach(Thread::run);
    }
}

配置listener

<?xml version="1.0" encoding="UTF-8" ?>
<web-app xmlns="https://jakarta.ee/xml/ns/jakartaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaee
                      https://jakarta.ee/xml/ns/jakartaee/web-app_6_0.xsd"
         version="6.0">
  <display-name>Kafka消息的消费者-消息系统</display-name>

<!--  listener的contextInitialized顺序按照声明顺序执行, contextDestroyed方法按照声明顺序反向执行-->
  <listener>
    <listener-class>com.message.server.listener.KafkaListener</listener-class>
  </listener>

  <listener>
    <listener-class>com.message.server.listener.StartUpConsumerListener</listener-class>
  </listener>
</web-app>

结语

  1. 在处理消费者相关逻辑时,我们重点关心如何确保消息不重复消费以及如何增加消费者的吞吐量
  2. 消费逻辑尽可能保证处理速度快,尽量减少耗时的逻辑

示例源码仓库

  1. Github地址
  2. 项目下message-server module代表生产者
  3. 运行时IDEA配置如下在这里插入图片描述

我们生产者和消费者的正常情况都以处理完了,下一篇文章我们将重点处理生产者失败和消费者失败之后重新生产消息和消费消息的逻辑,以及简单说一下Kafka中的rebalance。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1224847.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

哈希

欢迎来到Cefler的博客&#x1f601; &#x1f54c;博客主页&#xff1a;那个传说中的man的主页 &#x1f3e0;个人专栏&#xff1a;题目解析 &#x1f30e;推荐文章&#xff1a;题目大解析&#xff08;3&#xff09; 目录 &#x1f449;&#x1f3fb;unordered系列关联式容器un…

探索NLP中的核心架构:编码器与解码器的区别

❤️觉得内容不错的话&#xff0c;欢迎点赞收藏加关注&#x1f60a;&#x1f60a;&#x1f60a;&#xff0c;后续会继续输入更多优质内容❤️ &#x1f449;有问题欢迎大家加关注私戳或者评论&#xff08;包括但不限于NLP算法相关&#xff0c;linux学习相关&#xff0c;读研读博…

【刷题专栏—突破思维】LeetCode 138. 随机链表的复制

前言 随机链表的复制涉及到复制一个链表&#xff0c;该链表不仅包含普通的next指针&#xff0c;还包含random指针&#xff0c;该指针指向链表中的任意节点或空节点。 文章目录 原地修改链表 题目链接&#xff1a; LeetCode 138. 随机链表的复制 原地修改链表 题目介绍&#xf…

Arduino驱动LM35线性温度传感器(温湿度传感器)

目录 1、传感器特性 2、控制器和传感器连线图 3、驱动程序 LM35半导体的温度传感器,可以用来对环境温度进行定性的检测。LM35半导体温度传感器是美国国家半导体公司生产的线性温度传感器。其测温范围是-40℃到150℃,灵敏度为10mV/℃,输出电压与温度成正比。

亚马逊云Amazon OpenSearch Serverless“利刃在手,‘向量’八方“

全Serverless架构新价值 随着Amazon OpenSearch Serverless正式上线“商用”&#xff0c;亚马逊云科技的全栈“Serverless”应用架构也“初见雏形”&#xff0c;这也意味着&#xff0c;未来企业可以在亚马逊云科技之上简单和轻松的搭建完整的无服务器应用架构。 数据也显示&am…

【LeetCode刷题日志】225.用队列实现栈

&#x1f388;个人主页&#xff1a;库库的里昂 &#x1f390;C/C领域新星创作者 &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏✨收录专栏&#xff1a;LeetCode 刷题日志&#x1f91d;希望作者的文章能对你有所帮助&#xff0c;有不足的地方请在评论区留言指正&#xff0c;…

Git配置代理:fatal: unable to access*** github Failure when receiving data from

~吐槽一下 github自从被微软收购以后&#xff0c;大多数情况没点科技上网都进不去了&#xff0c;还是怀念以前随时访问的时光。 我一直都是开着系统代理的&#xff0c;但是今天拉一个项目发现拉不下来了&#xff0c;报错&#xff1a; fatal: unable to access https://githu…

人工智能基础_机器学习044_逻辑回归代码实现与手动计算概率---人工智能工作笔记0084

上面我们已经把逻辑回归的公式,以及,公式对应的图形都画画出来了,然后我们再来看看 如何用代码实现 可以看到上面是代码,咱们自己去写一下 import numpy as np from sklearn.linear_model import LogistieRegression from sklearn import datasets # 训练数据和测试数据拆分…

浅析AcrelEMS-CIA机场智慧能源管平台解决方案-安科瑞 蒋静

1 概述 机场智慧能源管平台解决方案对机场范围内变电站内的高低压配电设备 、 发电机、变压器 、UPS、EPS 、广场照明 、 室内照明 、通风及排水等机电设备进行实时分布式监控和集中管理 , 实现无人值守 , 确保高速公路安全畅通 , 提高 自动化管理水平 , 降低机电设备的运行维…

联想笔记本电脑触摸板失灵了怎么办

这里写自定义目录标题 thinkbook笔记本电脑触摸板失灵 thinkbook笔记本电脑触摸板失灵 由于重装系统&#xff0c;导致笔记本的触控板失灵&#xff0c; 网上说的办法有 1、按键盘上的ctrlf6键&#xff0c;打开触控板功能&#xff1a;无效 2、设置——>设备——>触控板&am…

buuctf-web-p6 [NPUCTF2020]web 狗

java: HelloWorld.class import java.io.PrintStream;public class HelloWorld {public static void main(String[] paramArrayOfString){System.out.println("众所周知&#xff0c;你是一名WEB选手&#xff0c;掌握javaweb也是一项必备技能&#xff0c;那么逆向个java应…

央企太卷.....来自央企的7个面试题,一个一个生产难题

说在前面 在40岁老架构师尼恩的&#xff08;50&#xff09;读者社群中&#xff0c;最近小伙伴&#xff0c;面试央企、美团、京东、阿里、 百度、头条等大厂。 下面是一个小伙伴成功拿到通过了一个央企设计研究院一面面试&#xff0c;现在把面试真题和参考答案收入咱们的宝典。…

Git企业开发级讲解(五)

&#x1f4d8;北尘_&#xff1a;个人主页 &#x1f30e;个人专栏:《Linux操作系统》《经典算法试题 》《C》 《数据结构与算法》 ☀️走在路上&#xff0c;不忘来时的初心 文章目录 一、bug 分⽀二、删除临时分支三、小结 一、bug 分⽀ 假如我们现在正在 dev2 分⽀上进⾏开发…

LeetCode994.腐烂的橘子

看完题我觉得这不是和上一道岛屿的题一样简单嘛&#xff0c;然后写了将近2个小时才写出来&#xff0c;我的思路就是&#xff0c;用check()先对grid检查一下&#xff0c;是否有以下情况&#xff1a; &#xff08;如果有1的周围都是空&#xff0c;则这个位置用不腐烂&#xff0c;…

清华学霸告诉你:如何自学人工智能?

清华大学作为中国顶尖的学府之一&#xff0c;培养了许多优秀的人才&#xff0c;其中不乏在人工智能领域有所成就的学霸。通过一位清华学霸的经验分享&#xff0c;揭示如何自学人工智能&#xff0c;帮助你在这场科技浪潮中勇往直前。 一、夯实基础知识 数学基础&#xff1a;学习…

【Qt开发流程】之HelloWorld程序

【Qt开发流程】之HelloWorld程序 目的编写程序新建项目文件说明及界面设计 程序运行及发布程序运行程序发布手动构建使用windeployqt进行构建 设置应用程序图标修改快捷键类型列表命令行编译程序命令行编译.ui文件自定义类项目模式及项目文件介绍项目模式项目文件 目的 从Hell…

vue --version无法显示,只弹出vs窗口

参考连接&#xff1a; nodejs环境配置&#xff08;解压包&#xff09;安装教程_nodejs解压版安装及环境配置_tubond的博客-CSDN博客 原因&#xff1a;环境没搞好&#xff0c;没有设置全局文件夹&#xff0c;node默认放在C盘了&#xff0c;C盘有权限。因为npm -i vue/cli创建…

2023最新最全【OpenMV】 入门教程

1. 什么是OpenMV OpenMV 是一个开源&#xff0c;低成本&#xff0c;功能强大的 机器视觉模块。 OpenMV上的机器视觉算法包括 寻找色块、人脸检测、眼球跟踪、边缘检测、标志跟踪 等。 以STM32F427CPU为核心&#xff0c;集成了OV7725摄像头芯片&#xff0c;在小巧的硬件模块上&a…

电磁场与电磁波part4--时变电磁场

1、采用洛伦兹条件使得矢量位 与标量位 分离在两个独立的方程中&#xff0c;且矢量位 仅与电流密度 有关&#xff0c;而标量位 仅与电荷密度 有关。 2、电磁能量守恒定理&#xff08;坡印廷定理&#xff09; 即减少的电磁能量电磁场所做的功流出的电磁能量 3、设u(r,t)是…

单元测试实战(四)MyBatis-Plus 的测试

为鼓励单元测试&#xff0c;特分门别类示例各种组件的测试代码并进行解说&#xff0c;供开发人员参考。 本文中的测试均基于JUnit5。 单元测试实战&#xff08;一&#xff09;Controller 的测试 单元测试实战&#xff08;二&#xff09;Service 的测试 单元测试实战&am…