搭建大型分布式服务(三十八)SpringBoot 整合多个kafka数据源-支持protobuf

news2025/1/11 1:48:20

系列文章目录


文章目录

  • 系列文章目录
  • 前言
      • 一、本文要点
      • 二、开发环境
      • 三、原项目
      • 四、修改项目
      • 五、测试一下
      • 五、小结


前言

本插件稳定运行上百个kafka项目,每天处理上亿级的数据的精简小插件,快速上手。

<dependency>
    <groupId>io.github.vipjoey</groupId>
    <artifactId>multi-kafka-consumer-starter</artifactId>
    <version>最新版本号</version>
</dependency>

例如下面这样简单的配置就完成SpringBoot和kafka的整合,我们只需要关心com.mmc.multi.kafka.starter.OneProcessorcom.mmc.multi.kafka.starter.TwoProcessor 这两个Service的代码开发。

## topic1的kafka配置
spring.kafka.one.enabled=true
spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.one.topic=mmc-topic-one
spring.kafka.one.group-id=group-consumer-one
spring.kafka.one.processor=com.mmc.multi.kafka.starter.OneProcessor // 业务处理类名称
spring.kafka.one.consumer.auto-offset-reset=latest
spring.kafka.one.consumer.max-poll-records=10
spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

## topic2的kafka配置
spring.kafka.two.enabled=true
spring.kafka.two.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.two.topic=mmc-topic-two
spring.kafka.two.group-id=group-consumer-two
spring.kafka.two.processor=com.mmc.multi.kafka.starter.TwoProcessor // 业务处理类名称
spring.kafka.two.consumer.auto-offset-reset=latest
spring.kafka.two.consumer.max-poll-records=10
spring.kafka.two.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.two.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

## pb 消息消费者
spring.kafka.pb.enabled=true
spring.kafka.pb.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.pb.topic=mmc-topic-pb
spring.kafka.pb.group-id=group-consumer-pb
spring.kafka.pb.processor=pbProcessor
spring.kafka.pb.consumer.auto-offset-reset=latest
spring.kafka.pb.consumer.max-poll-records=10
spring.kafka.pb.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.pb.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

国籍惯例,先上源码:Github源码

一、本文要点

本文将介绍通过封装一个starter,来实现多kafka数据源的配置,通过通过源码,可以学习以下特性。系列文章完整目录

  • SpringBoot 整合多个kafka数据源
  • SpringBoot 批量消费kafka消息
  • SpringBoot 优雅地启动或停止消费kafka
  • SpringBoot kafka本地单元测试(免集群)
  • SpringBoot 利用map注入多份配置
  • SpringBoot BeanPostProcessor 后置处理器使用方式
  • SpringBoot 将自定义类注册到IOC容器
  • SpringBoot 注入bean到自定义类成员变量
  • Springboot 取消限定符
  • Springboot 支持消费protobuf类型的kafka消息

二、开发环境

  • jdk 1.8
  • maven 3.6.2
  • springboot 2.4.3
  • kafka-client 2.6.6
  • idea 2020

三、原项目

1、接前文,我们开发了一个kafka插件,但在使用过程中发现有些不方便的地方,在公共接口MmcKafkaStringInputer 显示地继承了BatchMessageListener<String, String>,导致我们没办法去指定消费protobuf类型的message。


	public interface MmcKafkaStringInputer extends MmcInputer, BatchMessageListener<String, String> {
	
	}

	/**
     * 消费kafka消息.
     */
    @Override
    public void onMessage(List<ConsumerRecord<String, String>> records) {

        if (null == records || CollectionUtils.isEmpty(records)) {

            log.warn("{} records is null or records.value is empty.", name);
            return;
        }

        Assert.hasText(name, "You must pass the field `name` to the Constructor or invoke the setName() after the class was created.");
        Assert.notNull(properties, "You must pass the field `properties` to the Constructor or invoke the setProperties() after the class was created.");

        try {

            Stream<T> dataStream = records.stream()
                    .map(ConsumerRecord::value)
                    .flatMap(this::doParse)
                    .filter(Objects::nonNull)
                    .filter(this::isRightRecord);

            // 支持配置强制去重或实现了接口能力去重
            if (properties.isDuplicate() || isSubtypeOfInterface(MmcKafkaMsg.class)) {

                // 检查是否实现了去重接口
                if (!isSubtypeOfInterface(MmcKafkaMsg.class)) {
                    throw new RuntimeException("The interface "
                            + MmcKafkaMsg.class.getName() + " is not implemented if you set the config `spring.kafka.xxx.duplicate=true` .");
                }

                dataStream = dataStream.collect(Collectors.groupingBy(this::buildRoutekey))
                        .entrySet()
                        .stream()
                        .map(this::findLasted)
                        .filter(Objects::nonNull);
            }

            List<T> datas = dataStream.collect(Collectors.toList());
            if (CommonUtil.isNotEmpty(datas)) {

                this.dealMessage(datas);

            }


        } catch (Exception e) {

            log.error(name + "-dealMessage error ", e);
        }
    }


2、由于实现了BatchMessageListener<String, String>接口,抽象父类必须实现onMessage(List<ConsumerRecord<String, String>> records)方法,这样会导致子类局限性很大,没办法去实现其它kafka的xxxListener接口,例如手工提交offset,单条消息消费等。

因此、所以我们要升级和优化。

四、修改项目

1、新增KafkaAbastrctProcessor抽象父类,直接实现MmcInputer接口,要求所有子类都需要继承本类,子类通过调用{@link #receiveMessage(List)} 模板方法来实现通用功能;

@Slf4j
@Setter
abstract class KafkaAbstractProcessor<T> implements MmcInputer {
   
   // 类的内容基本和MmcKafkaKafkaAbastrctProcessor保持一致
   // 主要修改了doParse方法,目的是让子类可以自定义解析protobuf
   /**
     * 将kafka消息解析为实体,支持json对象或者json数组.
     *
     * @param msg kafka消息
     * @return 实体类
     */
    protected Stream<T> doParse(ConsumerRecord<String, Object> msg) {

        // 消息对象
        Object record = msg.value();

        // 如果是pb格式
        if (record instanceof byte[]) {

            return doParseProtobuf((byte[]) record);

        } else if (record instanceof String) {

            // 普通kafka消息
            String json = record.toString();

            if (json.startsWith("[")) {

                // 数组
                List<T> datas = doParseJsonArray(json);
                if (CommonUtil.isEmpty(datas)) {

                    log.warn("{} doParse error, json={} is error.", name, json);
                    return Stream.empty();
                }

                // 反序列对象后,做一些初始化操作
                datas = datas.stream().peek(this::doAfterParse).collect(Collectors.toList());

                return datas.stream();

            } else {

                // 对象
                T data = doParseJsonObject(json);
                if (null == data) {

                    log.warn("{} doParse error, json={} is error.", name, json);
                    return Stream.empty();
                }

                // 反序列对象后,做一些初始化操作
                doAfterParse(data);

                return Stream.of(data);
            }

        } else if (record instanceof MmcKafkaMsg) {

            // 如果本身就是PandoKafkaMsg对象,直接返回
            //noinspection unchecked
            return Stream.of((T) record);

        } else {


            throw new UnsupportedForMessageFormatException("not support message type");
        }
    }

    /**
     * 将json消息解析为实体.
     *
     * @param json kafka消息
     * @return 实体类
     */
    protected T doParseJsonObject(String json) {
        if (properties.isSnakeCase()) {
            return JsonUtil.parseSnackJson(json, getEntityClass());
        } else {
            return JsonUtil.parseJsonObject(json, getEntityClass());
        }
    }

    /**
     * 将json消息解析为数组.
     *
     * @param json kafka消息
     * @return 数组
     */
    protected List<T> doParseJsonArray(String json) {
        if (properties.isSnakeCase()) {
            try {
                return JsonUtil.parseSnackJsonArray(json, getEntityClass());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } else {
            return JsonUtil.parseJsonArray(json, getEntityClass());
        }
    }

    /**
     * 序列化为pb格式,假设你消费的是pb消息,需要自行实现这个类.
     *
     * @param record pb字节数组
     * @return pb实体类流
     */
    protected Stream<T> doParseProtobuf(byte[] record) {

        throw new NotImplementedException();
    }
}

2、修改MmcKafkaBeanPostProcessor类,暂存KafkaAbastrctProcessor的子类。

public class MmcKafkaBeanPostProcessor implements BeanPostProcessor {

    @Getter
    private final Map<String, KafkaAbstractProcessor<?>> suitableClass = new ConcurrentHashMap<>();

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

        if (bean instanceof KafkaAbstractProcessor) {

            KafkaAbstractProcessor<?> target = (KafkaAbstractProcessor<?>) bean;
            suitableClass.putIfAbsent(beanName, target);
            suitableClass.putIfAbsent(bean.getClass().getName(), target);
        }

        return bean;
    }
}

3、修改MmcKafkaProcessorFactory,更换构造的目标类为KafkaAbstractProcessor

public class MmcKafkaProcessorFactory {

    @Resource
    private DefaultListableBeanFactory defaultListableBeanFactory;

    public KafkaAbstractProcessor<? > buildInputer(
            String name, MmcMultiKafkaProperties.MmcKafkaProperties properties,
            Map<String, KafkaAbstractProcessor<? >> suitableClass) throws Exception {

        // 如果没有配置process,则直接从注册的Bean里查找
        if (!StringUtils.hasText(properties.getProcessor())) {

            return findProcessorByName(name, properties.getProcessor(), suitableClass);
        }

        // 如果配置了process,则从指定配置中生成实例
        // 判断给定的配置是类,还是bean名称
        if (!isClassName(properties.getProcessor())) {

            throw new IllegalArgumentException("It's not a class, wrong value of ${spring.kafka." + name + ".processor}.");
        }

        // 如果ioc容器已经存在该处理实例,则直接使用,避免既配置了process,又使用了@Service等注解
        KafkaAbstractProcessor<? > inc = findProcessorByClass(name, properties.getProcessor(), suitableClass);
        if (null != inc) {
            return inc;
        }

        // 指定的processor处理类必须继承KafkaAbstractProcessor
        Class<?> clazz = Class.forName(properties.getProcessor());
        boolean isSubclass = KafkaAbstractProcessor.class.isAssignableFrom(clazz);
        if (!isSubclass) {
            throw new IllegalStateException(clazz.getName() + " is not subClass of KafkaAbstractProcessor.");
        }

        // 创建实例
        Constructor<?> constructor = clazz.getConstructor();
        KafkaAbstractProcessor<? > ins = (KafkaAbstractProcessor<? >) constructor.newInstance();

        // 注入依赖的变量
        defaultListableBeanFactory.autowireBean(ins);

        return ins;
    }

    private KafkaAbstractProcessor<? > findProcessorByName(String name, String processor, Map<String,
            KafkaAbstractProcessor<? >> suitableClass) {

        return suitableClass.entrySet()
                .stream()
                .filter(e -> e.getKey().startsWith(name) || e.getKey().equalsIgnoreCase(processor))
                .map(Map.Entry::getValue)
                .findFirst()
                .orElseThrow(() -> new RuntimeException("Can't found any suitable processor class for the consumer which name is " + name
                        + ", please use the config ${spring.kafka." + name + ".processor} or set name of Bean like @Service(\"" + name + "Processor\") "));
    }


    private KafkaAbstractProcessor<? > findProcessorByClass(String name, String processor, Map<String,
            KafkaAbstractProcessor<? >> suitableClass) {

        return suitableClass.entrySet()
                .stream()
                .filter(e -> e.getKey().startsWith(name) || e.getKey().equalsIgnoreCase(processor))
                .map(Map.Entry::getValue)
                .findFirst()
                .orElse(null);
    }

    private boolean isClassName(String processor) {

        // 使用正则表达式验证类名格式
        String regex = "^[a-zA-Z_$][a-zA-Z\\d_$]*([.][a-zA-Z_$][a-zA-Z\\d_$]*)*$";
        return Pattern.matches(regex, processor);
    }

}

4、修改MmcMultiConsumerAutoConfiguration,更换构造的目标类的父类为KafkaAbstractProcessor

 @Bean
    public MmcKafkaInputerContainer mmcKafkaInputerContainer(MmcKafkaProcessorFactory factory,
                                                             MmcKafkaBeanPostProcessor beanPostProcessor) throws Exception {

        Map<String, MmcInputer> inputers = new HashMap<>();

        Map<String, MmcMultiKafkaProperties.MmcKafkaProperties> kafkas = mmcMultiKafkaProperties.getKafka();

        // 逐个遍历,并生成consumer
        for (Map.Entry<String, MmcMultiKafkaProperties.MmcKafkaProperties> entry : kafkas.entrySet()) {

            // 唯一消费者名称
            String name = entry.getKey();

            // 消费者配置
            MmcMultiKafkaProperties.MmcKafkaProperties properties = entry.getValue();

            // 是否开启
            if (properties.isEnabled()) {

                // 生成消费者
                KafkaAbstractProcessor inputer = factory.buildInputer(name, properties, beanPostProcessor.getSuitableClass());

                // 输入源容器
                ConcurrentMessageListenerContainer<Object, Object> container = concurrentMessageListenerContainer(properties);

                // 设置容器
                inputer.setContainer(container);
                inputer.setName(name);
                inputer.setProperties(properties);

                // 设置消费者
                container.setupMessageListener(inputer);

                // 关闭时候停止消费
                Runtime.getRuntime().addShutdownHook(new Thread(inputer::stop));

                // 直接启动
                container.start();

                // 加入集合
                inputers.put(name, inputer);
            }

        }

        return new MmcKafkaInputerContainer(inputers);
    }

5、修改MmcKafkaKafkaAbastrctProcessor,用于实现kafka的BatchMessageListener 接口,当然你也可以实现其它Listener接口,或者在这基础上扩展。

public abstract class MmcKafkaKafkaAbastrctProcessor<T> extends KafkaAbstractProcessor<T> implements BatchMessageListener<String, Object> {

    @Override
    public void onMessage(List<ConsumerRecord<String, Object>> records) {

        if (null == records || CollectionUtils.isEmpty(records)) {

            log.warn("{} records is null or records.value is empty.", name);
            return;
        }

        receiveMessage(records);
    }
}

五、测试一下

1、引入kafka测试需要的jar。参考文章:kafka单元测试

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.18.0</version>
            <scope>test</scope>
        </dependency>
        
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java-util</artifactId>
            <version>3.18.0</version>
            <scope>test</scope>
        </dependency>

2、定义一个pb类型消息和业务处理类。

(1) 定义pb,然后通过命令生成对应的实体类;

syntax = "proto2";

package  com.mmc.multi.kafka;

option java_package = "com.mmc.multi.kafka.starter.proto";
option java_outer_classname = "DemoPb";

message PbMsg {

    optional string routekey = 1;

    optional string cosImgUrl = 2;

    optional string base64str = 3;


}

(2)创建PbProcessor消息处理类,用于消费protobuf类型的消息;

@Slf4j
@Service("pbProcessor")
public class PbProcessor extends MmcKafkaKafkaAbastrctProcessor<DemoMsg> {

    @Override
    protected Stream<DemoMsg> doParseProtobuf(byte[] record) {
        try {
            DemoPb.PbMsg msg = DemoPb.PbMsg.parseFrom(record);
            DemoMsg demo = new DemoMsg();
            BeanUtils.copyProperties(msg, demo);
            return Stream.of(demo);
        } catch (InvalidProtocolBufferException e) {
            log.error("parssPbError", e);
            return Stream.empty();
        }

    }

    @Override
    protected void dealMessage(List<DemoMsg> datas) {
        System.out.println("PBdatas: " + datas);
    }
}

3、配置kafka地址和指定业务处理类。

## pb 消息消费者
spring.kafka.pb.enabled=true
spring.kafka.pb.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.pb.topic=mmc-topic-pb
spring.kafka.pb.group-id=group-consumer-pb
spring.kafka.pb.processor=pbProcessor
spring.kafka.pb.consumer.auto-offset-reset=latest
spring.kafka.pb.consumer.max-poll-records=10
spring.kafka.pb.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.pb.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

4、编写测试类。

@Slf4j
@ActiveProfiles("dev")
@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = {MmcMultiConsumerAutoConfiguration.class, DemoService.class, PbProcessor.class})
@TestPropertySource(value = "classpath:application-pb.properties")
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"},
        topics = {"${spring.kafka.pb.topic}"})
class KafkaPbMessageTest {


    @Resource
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Value("${spring.kafka.pb.topic}")
    private String topicPb;


    @Test
    void testDealMessage() throws Exception {

        Thread.sleep(2 * 1000);

        // 模拟生产数据
        produceMessage();

        Thread.sleep(10 * 1000);
    }

    void produceMessage() {


        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        Producer<String, byte[]> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new ByteArraySerializer()).createProducer();


        for (int i = 0; i < 10; i++) {

            DemoPb.PbMsg msg = DemoPb.PbMsg.newBuilder()
                    .setCosImgUrl("http://google.com")
                    .setRoutekey("routekey-" + i).build();


            producer.send(new ProducerRecord<>(topicPb, "my-aggregate-id", msg.toByteArray()));
            producer.flush();
        }


    }
}

5、运行一下,测试通过。
在这里插入图片描述

五、小结

将本项目代码构建成starter,就可以大大提升我们开发效率,我们只需要关心业务代码的开发,github项目源码:轻触这里。如果对你有用可以打个星星哦。下一篇,升级本starter,在kafka单分区下实现十万级消费处理速度。

《搭建大型分布式服务(三十六)SpringBoot 零代码方式整合多个kafka数据源》
《搭建大型分布式服务(三十七)SpringBoot 整合多个kafka数据源-取消限定符》
《搭建大型分布式服务(三十八)SpringBoot 整合多个kafka数据源-支持protobuf》
《搭建大型分布式服务(三十九)SpringBoot 整合多个kafka数据源-支持Aware模式》

加我加群一起交流学习!更多干货下载、项目源码和大厂内推等着你

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1793066.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Clo3D导出服装动画,使用Unity3D展示

1.前言 Clo3D是一款应用于时装行业的3D服装设计软件,其强大的布料模拟算法可在3D空间中实现设计、制版、试衣和走秀,大幅提升数字作品逼真度和制作效率。为了让服装动画效果展示在Unity3D上模拟效果&#xff0c;需要Clo3D模拟出逼着的衣服动画。总体流程为Clo3D - Mixamo -Blen…

废酸再生工艺的稳定性强优势

废酸再生工艺&#xff0c;作为现代工业中一项重要的环保技术&#xff0c;其核心目的是将工业生产过程中产生的废酸进行回收、处理和再利用&#xff0c;以实现资源的节约和环境的保护。这一工艺不仅有助于减少废酸对环境的污染&#xff0c;还能为企业带来显著的经济效益。 一、废…

idea实用快捷键(持续更新...)

文章目录 1、快速输入try/catch/finally2、选中多个光标3、实现接口4、方法参数提示5、查看某个类的子类6、弹出显示查找内容的搜索框 1、快速输入try/catch/finally CtrlAltT 2、选中多个光标 ShiftAlt单机多选 End可以全部到行尾&#xff0c;Home则可以全部回到行首 3、实现接…

MySQL的增删改查2

文章目录 1. 数据库约束1.1 约束类型1.2 NOT NULL约束1.3 UNIQUE唯一约束1.4 DEFAULT默认值约束1.5 PRIMARY KEY主键约束1.6 FOREIGN KEY外键约束1.7 CHECK约束 2. 新增3. 查询3.1 聚合查询3.1.1 聚合函数3.1.2 GROUP BY子句3.1.3 HAVING 3.2 联合查询3.2.1 内连接3.2.2 外连接…

慧天卓特:2024年“一带一路”之哈萨克斯坦旱情监测案例分析(FYDI)

引言 联合国防治荒漠化公约组织指出&#xff1a;中亚约有1200万人生活在干旱风险高的地区&#xff0c;面积约为4000万公顷。位于亚洲中部的哈萨克斯坦共和国&#xff08;简称哈萨克斯坦&#xff09;和中国有着长期友好的睦邻和经贸关系&#xff0c;中国是哈萨克斯坦的主要农产…

C++:day3

思维导图 练习题 #include <iostream> using namespace std;class Per { private:string name;int age;int *height;double weight;public:Per(){cout << "Per::无参构造函数" << endl;}Per(string name, int age, int height, double weight) :…

2004NOIP普及组真题 2. 花生采摘

线上OJ&#xff1a; 【04NOIP普及组】花生采摘 核心思想&#xff1a; 1、本题为贪心即可。 2、因为本题严格限制了顺序&#xff0c;所以先把每个节点的花生数量按降序排序。然后逐一判断下一个花生是否需要去采摘即可 3、每一次采摘完&#xff0c;记录耗时 t 以及采集的花…

GPT-4o(OpenAI最新推出的大模型)

简介&#xff1a;最近&#xff0c;GPT-4o横空出世。对GPT-4o这一人工智能技术进行评价&#xff0c;包括版本间的对比分析、GPT-4o的技术能力以及个人感受等。 方向一&#xff1a;对比分析 GPT-4o&#xff08;OpenAI最新推出的大模型&#xff09;与GPT-4之间的主要区别体现在响应…

SSM框架整合,内嵌Tomcat。基于注解的方式集成

介绍&#xff1a; SSM相信大家都不陌生&#xff0c;在spring boot出现之前&#xff0c;SSM一直是Java在web开发中的老大哥。现在虽说有了spring boot能自动整合第三方框架了&#xff0c;但是现在市面上任然有很多老项目是基于SSM技术的。因此&#xff0c;能熟练掌握SSM进行开发…

Python接入淘宝API接口采集商品详情页到手价优惠券信息数据:智能化营销的加速器

在电子商务领域&#xff0c;智能化营销正在成为提高效率和竞争力的关键。淘宝API提供了一套完整的解决方案&#xff0c;帮助商家实现智能化营销&#xff0c;从而提升销售业绩和顾客满意度。 库存管理&#xff1a; 淘宝API使商家能够实时监控库存水平&#xff0c;自动补货&#…

【康耐视国产案例】智能AI相机:深度解析DataMan 380大视野高速AI读码硬实力

随着读码器技术的不断更新迭代&#xff0c;大视野高速应用成为当前工业读码领域的关键发展方向。客户对大视野高速读码器的需求源于其能显著减少生产成本并提升工作效率。然而&#xff0c;大视野应用场景往往伴随着对多个条码的读取需求&#xff0c;这无疑增加了算法的处理负担…

VCAST创建单元测试工程

1. 设置工作路径 选择工作目录,后面创建的 UT工程 将会生成到这个目录。 2. 新建工程 然后填写 工程名称,选择 编译器,以及设置 基础路径。注意 Base Directory 必须要为代码工程的根目录,否则后面配置环境会失败。 这样工程就创建好了。 把基础路径设置为相对路径。 …

LabVIEW储油罐监控系统

LabVIEW储油罐监控系统 介绍了基于LabVIEW的储油罐监控系统的设计与实施。系统通过集成传感器技术和虚拟仪器技术&#xff0c;实现对储油罐内液位和温度的实时监控&#xff0c;提高了油罐监管的数字化和智能化水平&#xff0c;有效增强了油库安全管理的能力。 项目背景 随着…

算法与数据结构高手养成:朴素的贪心法(下)二分答案

✨✨ 欢迎大家来访Srlua的博文&#xff08;づ&#xffe3;3&#xffe3;&#xff09;づ╭❤&#xff5e;✨✨ &#x1f31f;&#x1f31f; 欢迎各位亲爱的读者&#xff0c;感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢&#xff0c;在这里我会分享我的知识和经验。&am…

Plotly的魔力:如何用Python创建令人惊叹的图表?

大家好&#xff0c;在数据分析和可视化领域&#xff0c;图表是不可或缺的工具。它们可以帮助我们更直观地理解数据趋势和模式。今天&#xff0c;我们要介绍的是一个强大的Python库——Plotly&#xff0c;它可以让你轻松创建交互式、漂亮的图表。无论你是数据科学家、分析师&…

计算机网络到底是指什么?

计算机网络是信息技术领域中最为核心和复杂的一部分&#xff0c;它涵盖了众多的技术原理和应用。下面&#xff0c;我们将从技术层面深入探讨计算机网络的相关内容。 一、计算机网络的分层模型 计算机网络的分层模型是网络通信的基石&#xff0c;它将网络通信过程划分为不同的层…

【ARM Cache 系列文章 2.1 -- Cache PoP 及 PoDP 介绍】

请阅读【ARM Cache 及 MMU/MPU 系列文章专栏导读】 及【嵌入式开发学习必备专栏】 文章目录 PoP 及 PoDPCache PoDPCache PoP应用和影响PoP 及 PoDP Cache PoDP 点对深度持久性(Point of Deep Persistence, PoDP)是内存系统中的一个点,在该点达到的任何写操作即使在系统供电…

【深度解析GPT-4o】:全面解析新一代AI技术的突破与优势

目录 ​编辑 1.版本对比&#xff1a;从GPT-3到GPT-4&#xff0c;再到GPT-4o的飞跃 1.1 模型规模的扩展 1.2 训练数据的更新 1.3 算法优化与效率提升 1.4 案例分析 2.技术能力&#xff1a;GPT-4o的核心优势 2.1 卓越的自然语言理解 2.1.1 上下文理解能力 2.1.2 语义分…

教师个人出书需要具备什么条件?

邮箱&#xff1a;2621542409qq.com&#xff08;qkfb88688-来稿备注独著&#xff09; 教师个人出书通常需要具备以下一些条件&#xff1a; 1. 内容价值&#xff1a;书稿内容要有一定的学术价值、教学经验分享价值或对教育领域有独特的见解和贡献。 2. 原创性&#xff1a;书稿必…

软件测试——Java自动化测试Selenium

目录 1.运行环境 2.环境配置 3.第一个浏览器程序 4.浏览器操作 5.元素定位 6.元素操作常用API 7.特殊元素定位与操作 8.元素三大等待 9.iframe操作 10.window操作 11.select选择框 12.js语句执行 13.鼠标操作 14.截图操作 1.运行环境 编译工具&#xff1a;IDEA …