kafka发送消息-分区策略(消息发送到哪个分区中?是什么策略)

news2025/2/28 19:56:34

生产者发送消息的分区策略(消息发送到哪个分区中?是什么策略)

  • 1、默认策略,程序自动计算并指定分区
    • 1.1、指定key,不指定分区
    • 1.2、不指定key,不指定分区
  • 2、轮询分配策略RoundRobinPartitioner
    • 2.1、创建配置类
    • 2.2、application.yml文件
    • 2.3、生产者
    • 2.4、测试类
    • 2.5、执行结果
  • 3、自定义分区分配策略
    • 3.1、创建自定义分配策略类
    • 3.2、修改kafka配置类
    • 3.3、application.yml文件
    • 3.4、生产者
    • 3.5、测试类
    • 3.6、测试结果
    • 3.7、总结

在这里插入图片描述

1、默认策略,程序自动计算并指定分区

1.1、指定key,不指定分区

生产者:在编写代码发送消息时我们先不指定分区,即分区设为null,看看程序最终会把消息发送到哪个分区。

package com.power.producer;

import com.power.model.User;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,Object> kafkaTemplate2;
    
    public void send9(){
        User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();
        //分区是null,让kafka自己去决定把消息发送到哪个分区
        kafkaTemplate2.send("heTopic",null,System.currentTimeMillis(),"k9",user);
    }
}

测试类:

package com.power;

import com.power.model.User;
import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.util.Date;

@SpringBootTest
public class SpringBoot01KafkaBaseApplication {

    @Resource
    private EventProducer eventProducer;
    
    @Test
    void send9(){
        eventProducer.send9();
    }
}

程序最终是通过以下代码进行目标分区计算的:

Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

通过调试发现,程序是通过以下代码进行目标分区计算的:
程序自动读取生产者发送消息时的key(本次发送时值为“key9”),将key生成一个32位的HASH值,将该HASH值与默认分区数(这个topic中有9个分区)取余数(余数结果一定在0-8之间),进而计算得出消息默认发送到的分区值

在这里插入图片描述

在这里插入图片描述

1.2、不指定key,不指定分区

生产者:
在这里插入图片描述

测试类:

在这里插入图片描述
此时时通过随机数与默认分区取余数计算默认分区的

使用随机数 % numPartitions

2、轮询分配策略RoundRobinPartitioner

通过查看kafka源码发现,分区接口有一个轮询分配策略相关实现类。
在这里插入图片描述

在application.yml配置文件中生产者配置项,我发现并生产者并没有相关轮询分配策略的配置,那么该如何试下轮询指定分区的配置呢?
在这里插入图片描述

需要编写代码试下轮询指定分区策略:

在这里插入图片描述

2.1、创建配置类

package com.power.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
        return props;
    }

    public ProducerFactory<String, ?> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, ?> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }


    //第二次创建
    @Bean
    public NewTopic newTopic9() {
        return new NewTopic("heTopic", 9, (short) 1);
    }
}

2.2、application.yml文件

spring:
  application:
    #应用名称
    name: spring-boot-01-kafka-base

  #kafka连接地址(ip+port)
  kafka:
    bootstrap-servers: <你的kafka服务器IP>:9092
    #配置生产者(24个配置)
    producer:
      #key默认是StringSerializer序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      #value默认是ToStringSerializer序列化
      value-serializer: org.springframework.kafka.support.serializer.ToStringSerializer


    #配置消费者(24个配置)
    consumer:
      auto-offset-reset: earliest

    template:
      default-topic: default-topic

在这里插入图片描述

2.3、生产者

package com.power.producer;

import com.power.model.User;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,Object> kafkaTemplate2;

    public void send10(){
        User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();
        //分区是null,让kafka自己去决定把消息发送到哪个分区
        kafkaTemplate2.send("heTopic",user);
    }
}

在这里插入图片描述

2.4、测试类

在这里插入图片描述

package com.power;

import com.power.model.User;
import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.util.Date;

@SpringBootTest
public class SpringBoot01KafkaBaseApplication {

    @Resource
    private EventProducer eventProducer;

    @Test
    void send10(){
        for (int i = 0; i <5 ; i++) {
            eventProducer.send10();
        }

    }

}

2.5、执行结果

执行完测试类,发现5次请求分别发送到了kafka的heTopic主题的5个不同分区中:
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3、自定义分区分配策略

在这里插入图片描述

3.1、创建自定义分配策略类

package com.power.config;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomerPartitioner implements Partitioner {

    private AtomicInteger nextPartition = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if(key==null){
            //使用轮询方式选择分区
            int next = nextPartition.getAndIncrement();
            if(next>=numPartitions){
                nextPartition.compareAndSet(next,0);
            }
            if(next>0){
                next--;
            }
            System.out.println("分区值:"+next);
            return next;
        }else {
            //如果key不为inull,则使用默认的分区策略
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

3.2、修改kafka配置类

指定使用自定义的分区分配类
在这里插入图片描述

3.3、application.yml文件

spring:
  application:
    #应用名称
    name: spring-boot-01-kafka-base

  #kafka连接地址(ip+port)
  kafka:
    bootstrap-servers: <你的kafka服务器IP>:9092
    #配置生产者(24个配置)
    producer:
      #key默认是StringSerializer序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      #value默认是ToStringSerializer序列化
      value-serializer: org.springframework.kafka.support.serializer.ToStringSerializer


    #配置消费者(24个配置)
    consumer:
      auto-offset-reset: earliest

    template:
      default-topic: default-topic

在这里插入图片描述

3.4、生产者

package com.power.producer;

import com.power.model.User;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,Object> kafkaTemplate2;

    public void send10(){
        User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();
        //分区是null,让kafka自己去决定把消息发送到哪个分区
        kafkaTemplate2.send("heTopic",user);
    }
}

在这里插入图片描述

3.5、测试类

在这里插入图片描述

package com.power;

import com.power.model.User;
import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.util.Date;

@SpringBootTest
public class SpringBoot01KafkaBaseApplication {

    @Resource
    private EventProducer eventProducer;

    @Test
    void send10(){
        for (int i = 0; i <5 ; i++) {
            eventProducer.send10();
        }

    }

}

3.6、测试结果

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3.7、总结

使用自定义分区策略类尝试发送消息,发现发送的5次消息,并没有连续发送到5个挨着的分区中,查看kafka源码的org.apache.kafka.clients.producer.KafkaProducer类的doSend方法发现,每一次发送前,调用了两次计算分区的方法,导致第一个得到的分区并不会正在的发送消息。

doSend方法;

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        throwIfProducerClosed();
        // first make sure the metadata for the topic is available
        long nowMs = time.milliseconds();
        ClusterAndWaitTime clusterAndWaitTime;
        try {
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
        } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
        }
        nowMs += clusterAndWaitTime.waitedOnMetadataMs;
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer", cce);
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer", cce);
        }
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);

        setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();

        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                compressionType, serializedKey, serializedValue, headers);
        ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
        if (log.isTraceEnabled()) {
            log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        }
        // producer callback will make sure to call both 'callback' and interceptor callback
        Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

        if (transactionManager != null && transactionManager.isTransactional()) {
            transactionManager.failIfNotReadyForSend();
        }
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

        if (result.abortForNewBatch) {
            int prevPartition = partition;
            partitioner.onNewBatch(record.topic(), cluster, prevPartition);
            partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            if (log.isTraceEnabled()) {
                log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
            }
            // producer callback will make sure to call both 'callback' and interceptor callback
            interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

            result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
        }

        if (transactionManager != null && transactionManager.isTransactional())
            transactionManager.maybeAddPartitionToTransaction(tp);

        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (KafkaException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        // we notify interceptor about all exceptions, since onSend is called before anything else in this method
        this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2071781.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

3.4-CoroutineScope/CoroutineContext:coroutineScope() 和 supervisorScope()

文章目录 coroutineScope()supervisorScope()总结 coroutineScope() coroutineScope() 和我们创建协程时的 CoroutineScope 名字是相同的&#xff0c;实际上它们也确实有所关联&#xff0c;为了方便理解我们先说下 coroutineScope() 是怎样的效果。 我们在使用 coroutineScop…

PPT分享:某集团公司供应链-销售与运营计划SOP

今天笔者给大家带来的是“供应链中的S&OP”&#xff0c;在分享PPT之前&#xff0c;咱们先了解下什么是“S&OP”。PPT下载链接见文末~ 供应链中的S&OP&#xff0c;全称为Sales and Operations Planning&#xff0c;即销售与运营计划&#xff0c;是一种跨职能的决策…

【Unity】移动端草海解决方案

草海是开放大世界渲染的必不可少的因素&#xff0c;Unity 原生的 Terrain 草海效率较低&#xff0c;而且无法与 RVT 结合起来&#xff0c;无法在移动端上实现。因此我们自己搓出来一套草海系统&#xff0c;使用 C# 多线程辅助运算&#xff0c;并能支持割草、烧草等进阶玩法。草…

springboot嵌入式数据库实践-H2内嵌数据库(文件、内存)

本文章记录笔者的嵌入式数据库简单实现&#xff0c; 记录简要的配置过程。自用文章&#xff0c;仅作参考。 目录 本文章记录笔者的嵌入式数据库简单实现&#xff0c; 记录简要的配置过程。自用文章&#xff0c;仅作参考。 嵌入式数据库 -------------------------------具…

数学基础(八)

一、假设检验 什么是假设&#xff1a; 对总体参数&#xff08;均值、比例等&#xff09;的具体数值所作的陈述。 什么是假设检验&#xff1a; 先对总体的参数提出某种假设&#xff0c;然后利用样本的信息判断假设是否成立的过程 假设检验的应用&#xff1a; 推广新的教育…

【C++ Primer Plus习题】4.10

问题: 解答: #include <iostream> #include <array> using namespace std;int main() {array<float, 3> grade;float average0;for (int i 0; i < 3; i){cout << "请输入第" << i 1 << "次跑40米的成绩:";cin &…

『功能项目』新输入系统【06】

我们打开上一篇04禁止射线穿透行为项目&#xff0c; 本章要做的事情是在Unity编辑器中添加 新输入系统 实现主角在场景中鼠标右键可以使主角 转向。 本次项目需要让Unity引擎重新启动所以先保存当前项目 再次打开项目后&#xff0c; 修改为Both 点击Apply前注意要先保存项目&a…

Apollo9.0 PNC源码学习之Planning模块—— Lattice规划(五):横向运动轨迹规划

参考文章: (1)自动驾驶规划理论与实践 (2)Apollo6.0代码Lattice算法详解——Part5: 生成横纵向轨迹 0 前言 横向运动轨迹规划主要对车辆方向盘转向的控制策略;求解方法分为基于s的5次多项式和基于二次规划的方法 2 基于s的5次多项式的横向运动轨迹的生成 纵向运动轨迹…

Linux环境使用docker搭建Navidrome本地个人音乐库并实现远程访问

文章目录 前言1. 安装Docker2. Docker镜像源添加方法3. 创建并启动Navidrome容器 前言 本文和大家分享一款目前在G站有11KStar的开源跨平台音乐服务器Navidrome&#xff0c;如何在Linux环境本地使用Docker部署&#xff0c;并结合cpolar内网穿透工具配置公网地址&#xff0c;实…

Docker安装Logstash,并结合logback实现ELK日志收集

拉取镜像 docker pull docker.elastic.co/logstash/logstash:8.14.3创建文件夹 mkdir /mnt/data/logstash创建默认文件 先不做目录挂载&#xff0c;run出一个容器 docker run -d --rm -it docker.elastic.co/logstash/logstash:8.14.3将config和pipeline从容器cp到宿主机 …

BEV学习---LSS-1:论文原理及代码串讲

目前在自动驾驶领域&#xff0c;比较火的一类研究方向是基于采集到的环视图像信息&#xff0c;去构建BEV视角下的特征完成自动驾驶感知的相关任务。所以如何准确的完成从相机视角向BEV视角下的转变就变得由为重要。目前感觉比较主流的方法可以大体分为两种&#xff1a; 1、显式…

Linux驱动.之字符设备驱动框架,及手动,自动注册创建/dev下设备节点详解(一)

一、Linux 字符设备驱动框架 Linux一切皆文件&#xff0c;通过VFS虚拟文件系统&#xff0c;把所有外设的细节都隐藏起来&#xff0c;最后都以文件的形态呈现于应用层&#xff0c; 操作设备&#xff0c;就像对文件的打开、关闭、读写、控制。这种方式完美的统一了对用户的接口&a…

WIN/MAC 图像处理软件Adobe Photoshop PS2024软件下载安装

目录 一、软件概述 1.1 基本信息 1.2 主要功能 二、系统要求 2.1 Windows 系统要求 2.2 macOS 系统要求 三、下载 四、使用教程 4.1 基本界面介绍 4.2 常用工具使用 4.3 进阶操作 一、软件概述 1.1 基本信息 Adobe Photoshop&#xff08;简称PS&#xff09;是一款…

快手怎么免费的去掉视频水印?分享这三个工具给你

​ 我们经常会遇到想要保存的视频带有水印&#xff0c;这不仅影响美观&#xff0c;也不利于分享。为了解决这个问题&#xff0c;我将分享三个免费去除视频水印的工具&#xff0c;帮助你轻松去除水印&#xff0c;享受无干扰的视频体验。 工具一&#xff1a;奈斯水印助手(小程序…

挑战Infiniband, 爆改Ethernet(3)

Infiniband的特点 Infiniband在HPC集群和AI集群中获得广泛应用是和Infiniband技术的特点密切相关的&#xff0c;今天我们看看Infiniband的技术特点: 1&#xff09;网络分层模型 这个分层模型并不是Infiniband技术独有的&#xff0c;各种现代网络技术都普遍采用分层模型来设计…

在.NET开发中使用 Excel 的最佳方式之一:MiniExcel

前言 在桌面开发应用中&#xff0c;处理 Excel 文件是一个非常常见的需求。无论是生成报表、导入数据&#xff0c;还是与客户或其他系统进行数据交换&#xff0c;Excel 文件都扮演着重要角色。在 .NET 生态系统中&#xff0c;有许多处理 Excel 文件的工具和库&#xff0c;其中…

Flutter ListView 实现不同样式 item

我们在实际开发中会创建显示不同类型内容的列表。以下是使用 Flutter 创建此类结构的方法&#xff1a; 1. 创建包含不同类型项目的数据源。 2. 将数据源转换为小部件列表。 创建包含不同类型项目的数据源 项目类型 要表示列表中不同类型的项目&#xff0c;请为每种类型的项目…

链表(静态/动态创建,遍历,计数,查找,在节点的前/后方插入新节点,头插法,尾插法)

目录 一、前言 二、链表的静态创建和遍历 三、链表统计节点&#xff0c;查找节点是否存在 四、从指定节点的后方插入新节点 五、从指定节点的前方插入新节点 六、动态创建链表&尾插法 七、头插法 八、删除节点 一、前言 链表本质是一个结构体&#xff0c;结构体里…

19448 算法设计与分析(第五版)习题2-7 集合划分问题

### 思路 1. **递归公式**&#xff1a;根据提示信息&#xff0c;递归公式为&#xff1a; - \( f(n, x) f(n-1, x-1) f(n-1, x) \times x \) - 其中&#xff0c;\( f(n, x) \) 表示将 \( n \) 个元素分成 \( x \) 个非空子集的方案数。 2. **边界条件**&#xff1a; …

【STM32】串口(异步通信部分)

经典的串口接口硬件说实话在现在的电脑上已经很难见到了&#xff0c;而是被USB这种通用的串行接口替代了&#xff0c;哪怕外部设备要以串口连接到电脑&#xff0c;都需要进行各种硬件转换。但不得不说&#xff0c;在工业领域&#xff0c;串口还是一个非常常用的数据传输方式。 …