【基础】Kafka -- 主题与分区

news2025/2/24 13:41:24

Kafka -- 主题与分区

  • 主题的管理
    • 创建主题
      • 简单创建与查看
      • 指定分区副本分配创建
      • 指定参数创建
    • 查看主题
      • 主题的简单查看
      • 带附加功能的查看
    • 修改主题
      • 修改分区
      • 修改配置
    • 删除主题
  • 主题配置管理
    • 配置查看与变更
      • 配置查看
      • 配置变更
    • 主题端参数
  • KafkaAdminClient 主题管理
    • 基本使用
      • 创建主题
      • 查看主题
      • 删除主题
      • 修改主题
    • 主题合法性校验
  • 分区的管理
    • 优先副本选举
    • 分区重新分配
    • 修改副本因子

主题的管理

主题管理包括创建主题、查看主题消息、修改主题以及删除主题等操作,Kafka 提供的 kafka-topics.sh 脚本来执行这些操作,脚本位于$KAFKA_HOME/bin/目录下,该脚本实际上是调用了 kafka.admin.TopicCommand 类来执行主题管理的操作。

创建主题

简单创建与查看

若 broker 端的配置参数auto.create.topics.enable设置为 true,那么当生产者向一个尚未创建的主题发送消息时,会自动创建一个分区数为num.partitions(默认值为 1)、副本因子为default.replication.factor(默认值为 1)的主题。同理,当一个消费者开始从未知主题中读取消息或任意客户端向未知主题发送元数据时也会按照上述参数创建相应的主题。

更加通用的方式是使用 kafka-topics.sh 来创建主题,下列代码创建了一个分区数为 4、副本因子为 2 的主题 topic-create:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --create --topic topic-create --partitions 4 --replication-factor 2
Created topic topic-create-diff.

命令中参数的含义如下:

  • --bootstrap-server:zookeeper 连接地址;

  • --create:表示创建主题的指令;

    • --topic:主题名称;

    • --partitions:主题分区数;

    • --replication-factor:分区副本数;

脚本执行成功护,Kafka 会在 log.dir 或 log.dirs 配置参数所指定的目录下创建相应的主题分区,该目录默认为 /tmp/kafka-logs/。在不同的节点分别执行下列命令来查看当前节点创建的主题分区:

# master节点
[root@master kafka_2.12-3.4.0]# ls -al /tmp/kafka-logs/ | grep topic-create
drwxr-xr-x   2 root root 167 417 17:00 topic-create-0
drwxr-xr-x   2 root root 167 417 17:00 topic-create-2
# node01节点
[root@node01 kafka_2.12-3.4.0]# ls -al /tmp/kafka-logs/ | grep topic-create
drwxr-xr-x   2 root root 167 418 08:40 topic-create-1
drwxr-xr-x   2 root root 167 417 16:44 topic-create-2
drwxr-xr-x   2 root root 167 417 16:44 topic-create-3
# node02节点
[root@node02 kafka_2.12-3.4.0]# ls -al /tmp/kafka-logs/ | grep topic-create
drwxr-xr-x   2 root root 167 418 08:40 topic-create-0
drwxr-xr-x   2 root root 167 417 17:01 topic-create-1
drwxr-xr-x   2 root root 167 417 17:01 topic-create-3

可以看到,topic-create 主题存在 0、1、2、3 四个分区,每个分区存在两个副本。主题、分区、副本和 log 日志的关系如下图所示,其中,主题与分区都是提供给用户的抽象,而副本层和 log 层才是实际物理上的存在。同一个分区中的多个副本必须分布在不同的 broker 当中才能提供有效的数据冗余。

在这里插入图片描述

除上述方法外,还可以使用 kafka-topics.sh 的--describe指令来查看分区副本的分配细节:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create
Topic: topic-create     TopicId: aW0hKB9-RJ6NaxZK-Ws-ZQ PartitionCount: 4       ReplicationFactor: 2    Configs:
        Topic: topic-create     Partition: 0    Leader: 2       Replicas: 2,0   Isr: 0,2
        Topic: topic-create     Partition: 1    Leader: 1       Replicas: 1,2   Isr: 2,1
        Topic: topic-create     Partition: 2    Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: topic-create     Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1

指定分区副本分配创建

上述的方法在进行主题的创建时,其分区副本是按照内部既定的逻辑来进行分配的。kafka-topics.sh 脚本还提供了--replica-assignment参数来手动指定分区副本的分配方案:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --create --topic topic-create-diff --replica-assignment 0:1,1:2,2:0,2:1
Created topic topic-create-diff.
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create-diff
Topic: topic-create-diff        TopicId: B-ngArJuQSyktp38UjKkzA PartitionCount: 4       ReplicationFactor: 2    Configs:
        Topic: topic-create-diff        Partition: 0    Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: topic-create-diff        Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: topic-create-diff        Partition: 2    Leader: 2       Replicas: 2,0   Isr: 2,0
        Topic: topic-create-diff        Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1

这种方式根据分区号的数值大小按照从小到大的顺序进行排列,分区与分区之间用英文逗号“,”隔开,分区内多个副本用英文冒号“:”隔开。

使用这种方法需要注意以下几点:

  • 同一个分区内的副本不能有重复,如0:0,1:1这种将会提示 AdminCommandFailedException 异常;

  • 各分区所指定的副本数应相同,如0:1,1,2,1:2这种将会提示 AdminOperationException 异常;

  • 不允许跳过分区进行分配,如0:1,,1:2,2:0这种将会提示 NumberFormatException 异常;

指定参数创建

下列命令使用--config参数来创建一个主题:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --create --topic topic-config --partitions 1 --replication-factor 1 --config cleanup.policy=compact --config max.message.bytes=10000
Created topic topic-config.
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-config
Topic: topic-config     TopicId: IWB1Chh_SEyi6GsHVyJWwg PartitionCount: 1       ReplicationFactor: 1    Configs: cleanup.policy=compact,max.message.bytes=10000
        Topic: topic-config     Partition: 0    Leader: 0       Replicas: 0     Isr: 0

示例中设置了 cleanup.policy 参数为 compact,以及 max.message.bytes 参数为 10000,这两个参数都是主题端的配置。

查看主题

主题的简单查看

kafka-topics.sh 脚本提供了--list--describe指令来方便的查看主题信息。

使用--list可以查看当前所有的可用主题:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --list
topic-config
topic-create
topic-create-diff

使用--describe可以查看单个或者多个主题的详细信息,若不适用--topic指定主题则显示所有主题的详细信息:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe
Topic: topic-create-diff        TopicId: B-ngArJuQSyktp38UjKkzA PartitionCount: 4       ReplicationFactor: 2    Configs:
        Topic: topic-create-diff        Partition: 0    Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: topic-create-diff        Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: topic-create-diff        Partition: 2    Leader: 2       Replicas: 2,0   Isr: 2,0
        Topic: topic-create-diff        Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1
Topic: topic-config     TopicId: IWB1Chh_SEyi6GsHVyJWwg PartitionCount: 1       ReplicationFactor: 1    Configs: cleanup.policy=compact,max.message.bytes=10000
        Topic: topic-config     Partition: 0    Leader: 0       Replicas: 0     Isr: 0
Topic: topic-create     TopicId: aW0hKB9-RJ6NaxZK-Ws-ZQ PartitionCount: 4       ReplicationFactor: 2    Configs:
        Topic: topic-create     Partition: 0    Leader: 2       Replicas: 2,0   Isr: 0,2
        Topic: topic-create     Partition: 1    Leader: 1       Replicas: 1,2   Isr: 2,1
        Topic: topic-create     Partition: 2    Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: topic-create     Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-config
Topic: topic-config     TopicId: IWB1Chh_SEyi6GsHVyJWwg PartitionCount: 1       ReplicationFactor: 1    Configs: cleanup.policy=compact,max.message.bytes=10000
        Topic: topic-config     Partition: 0    Leader: 0       Replicas: 0     Isr: 0
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create,topic-create-diff
Topic: topic-create-diff        TopicId: B-ngArJuQSyktp38UjKkzA PartitionCount: 4       ReplicationFactor: 2    Configs:
        Topic: topic-create-diff        Partition: 0    Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: topic-create-diff        Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: topic-create-diff        Partition: 2    Leader: 2       Replicas: 2,0   Isr: 2,0
        Topic: topic-create-diff        Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1
Topic: topic-create     TopicId: aW0hKB9-RJ6NaxZK-Ws-ZQ PartitionCount: 4       ReplicationFactor: 2    Configs:
        Topic: topic-create     Partition: 0    Leader: 2       Replicas: 2,0   Isr: 0,2
        Topic: topic-create     Partition: 1    Leader: 1       Replicas: 1,2   Isr: 2,1
        Topic: topic-create     Partition: 2    Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: topic-create     Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1

带附加功能的查看

在使用--describe指令时可以额外指定--topics-with-overridesunder-replicated-partitions以及--unavailable-partitions参数来实现一些附加功能。

topics-with-overrides

使用该参数可以查看所有包含覆盖配置的主题,它只会列出包含了与集群不同配置的主题:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topics-with-overrides
Topic: topic-config     TopicId: IWB1Chh_SEyi6GsHVyJWwg PartitionCount: 1       ReplicationFactor: 1    Configs: cleanup.policy=compact,max.message.bytes=10000

under-replicated-partitions

使用该参数可以找出所有包含失效副本的分区,手动停掉 id=1 的 kafka broker,进行查询:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create --under-replicated-partitions
        Topic: topic-create     Partition: 1    Leader: 2       Replicas: 1,2   Isr: 2
        Topic: topic-create     Partition: 2    Leader: 0       Replicas: 0,1   Isr: 0
        Topic: topic-create     Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2

重新启动 id=1 的 broker,再次查询,不显示任何结果:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create --under-replicated-partitions
[root@master kafka_2.12-3.4.0]#

unavailable-partitions

使用该参数可以查询主题中没有 leader 副本的分区,这些分区已处于离线状态,对于外界的生产者和消费者来说处于不可用的状态。手动停掉 id=1 和 id=2 的 kafka broker,进行查询:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create --unavailable-partitions
        Topic: topic-create     Partition: 1    Leader: none    Replicas: 1,2   Isr: 1
        Topic: topic-create     Partition: 3    Leader: none    Replicas: 2,1   Isr: 1

重新启动 id=1 和 id=2 的 broker,再次查询,不显示任何结果:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create --unavailable-partitions
[root@master kafka_2.12-3.4.0]#

修改主题

kafka-topics.sh 提供了--alert指令用于修改主题的分区数、修改配置等。

修改分区

下列指令可以修改主题的分区数:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --alter --topic topic-config --partitions 3
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-config
Topic: topic-config     TopicId: IWB1Chh_SEyi6GsHVyJWwg PartitionCount: 3       ReplicationFactor: 1    Configs: cleanup.policy=compact,max.message.bytes=10000
        Topic: topic-config     Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: topic-config     Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: topic-config     Partition: 2    Leader: 2       Replicas: 2     Isr: 2

Kafka 只支持增加分区数而不支持减少分区数,如下:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --alter --topic topic-config --partitions 1
Error while executing topic command : Topic currently has 3 partitions, which is higher than the requested 1.
[2023-04-18 16:06:33,274] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 3 partitions, which is higher than the requested 1.
 (kafka.admin.TopicCommand$)

修改配置

在当前最新的 kafka 3.4.0 版本中已经不允许使用 kafka-topics.sh 脚本来变更主题的配置了,新版本推荐使用 kafka-configs.sh 脚本实现相关的功能。

删除主题

如果一个主题确定不会再使用,那么最好将其删除释放一些资源。kafka-topics.sh 脚本提供了--delete指令用于主题的删除:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --create --topic topic-delete --partitions 1 --replication-factor 1
Created topic topic-delete.
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --delete --topic topic-delete

主题的删除与 broker 端配置参数delete.topic.enable有关,该参数配置为 true 时才能够删除主题,默认值为 true。

在删除主题时,若删除的是 Kafka 内部的主题或删除的主题不存在,则会报错。

主题配置管理

kafka-configs.sh 脚本用于对配置进行管理和操作,其可以实现相关配置在运行状态下的动态变更。该脚本包含变更配置指令--alter以及查看配置指令--describe两种类型,增删改操作都可以看作是对配置的变更。该脚本不仅支持操作主题的配置,其同样支持操作 broker、用户、客户端的配置。

配置查看与变更

配置查看

使用 kafka-configs.sh 脚本查看主题配置的命令如下:

[root@master kafka_2.12-3.4.0]# bin/kafka-configs.sh --bootstrap-server master:9092 --describe --entity-type topics --entity-name topic-config
Dynamic configs for topic topic-config are:
  cleanup.policy=compact sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:cleanup.policy=compact, DEFAULT_CONFIG:log.cleanup.policy=delete}
  max.message.bytes=10000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:max.message.bytes=10000, DEFAULT_CONFIG:message.max.bytes=1048588}

其中:

  • --entity-type指定要查看配置的实体类型;

  • --entity-config指定要查看配置的实体名称;

上述两个参数的对应关系如下:

entity-typeentity-name
主题类型,取值为 topics指定主题名称
broker 类型,取值为 brokers指定 brokerId
客户端类型,取值为 clients指定 clientId
用户类型,取值为 users指定用户名

配置变更

使用 kafka-configs.sh 脚本实现配置的变更时,需要将--alter指令与add-config以及delete-config参数一起使用,前者用于实现配置的增、改操作,后者用于实现配置的删除。

add-config

下列命令对主题原有配置进行了覆盖,若需要修改多个参数,则将多个参数使用英文逗号","隔开即可:

[root@master kafka_2.12-3.4.0]# bin/kafka-configs.sh --bootstrap-server master:9092 --alter --entity-type topics --entity-name topic-config --add-config max.message.bytes=20000
Completed updating config for topic topic-config.
[root@master kafka_2.12-3.4.0]# bin/kafka-configs.sh --bootstrap-server master:9092 --describe --entity-type topics --entity-name topic-config
Dynamic configs for topic topic-config are:
  cleanup.policy=compact sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:cleanup.policy=compact, DEFAULT_CONFIG:log.cleanup.policy=delete}
  max.message.bytes=20000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:max.message.bytes=20000, DEFAULT_CONFIG:message.max.bytes=1048588}

delete-config

下列命令对主题新增的配置进行了删除,还原为默认配置:

[root@master kafka_2.12-3.4.0]# bin/kafka-configs.sh --bootstrap-server master:9092 --alter --entity-type topics --entity-name topic-config --delete-config max.message.bytes,cleanup.policy
Completed updating config for topic topic-config.
[root@master kafka_2.12-3.4.0]# bin/kafka-configs.sh --bootstrap-server master:9092 --describe --entity-type topics --entity-name topic-config
Dynamic configs for topic topic-config are:

主题端参数

主题相关的所有配置参数在 broker 层面都有对应的参数,若没有指定或修改主题的任何配置参数,那么就使用 broker 端对应的参数作为默认值。

与主题相关的参数有很多,在进行配置时自行搜索查看即可。所有配置的查看与修改方法都与上述介绍的方法相同。

KafkaAdminClient 主题管理

一般情况下,我们使用 kafka-topics.sh 脚本来管理主题。但是在某些场景下,我们需要将主题管理类的功能集成到公司内部的系统当中进行统一管理,那么就需要使用程序调用 API 的方式去实现。KafkaAdminClient 类便提供了这些基本功能。

基本使用

创建主题

下列代码展示了如何使用 KafkaAdminClient 类创建一个分区数为 4、副本因子为 1 的主题 topic-admin:

public class TestDemo {
    public static void main(String[] args) {
        // 定义服务地址以及主题名称
        String brokerList = "192.168.86.133:9092";
        String topic = "topic-admin";
        // 配置参数
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        // 获取 AdminClient 对象
        AdminClient adminClient = AdminClient.create(properties);
        // 定义主题创建的信息
        NewTopic newTopic = new NewTopic(topic, 4, (short) 1);
        // 执行主题创建并获取执行结果
        CreateTopicsResult result = adminClient.createTopics(Collections.singleton(newTopic));
        try {
            result.all().get();
        } catch (Exception exception) {
            exception.printStackTrace();
        }
        // 关闭 AdminClient 对象实例
        adminClient.close();
    }
}

查询新创建的主题:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-admin
Topic: topic-admin      TopicId: 8mKJR8KBQKuUOXXJCFS1SQ PartitionCount: 4       ReplicationFactor: 1    Configs:
        Topic: topic-admin      Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: topic-admin      Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: topic-admin      Partition: 2    Leader: 2       Replicas: 2     Isr: 2
        Topic: topic-admin      Partition: 3    Leader: 1       Replicas: 1     Isr: 1

其中,AdminClient 对象的获取调用的是create()方法,其定义如下:

public static AdminClient create(Properties props) {
    return KafkaAdminClient.createInternal(new AdminClientConfig(props), (TimeoutProcessorFactory)null);
}

在进行主题信息的定义时,需要创建一个 NewTopic 对象,其包含下述属性:

public class NewTopic {
    // 主题名称
    private final String name;
    // 分区数
    private final int numPartitions;
    // 副本因子
    private final short replicationFactor;
    // 分配方案
    private final Map<Integer, List<Integer>> replicasAssignments;
    // 配置
    private Map<String, String> configs = null;
    ...
}

指定分区副本分配

若想指定分区副本的分配方案来创建一个主题,可以将代码中“定义主题创建的信息”处的代码替换为下述代码:

// 指定分区副本分配方案
Map<Integer, List<Integer>> replicasAssignments = new HashMap<>();
replicasAssignments.put(0, List.of(1));
replicasAssignments.put(1, List.of(0));
replicasAssignments.put(2, List.of(1));
replicasAssignments.put(3, List.of(0));
// 定义主题创建的信息
NewTopic newTopic = new NewTopic(topic, replicasAssignments); // topic:"topic-admin-test1"

查询新创建的主题:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-admin-test1
Topic: topic-admin-test1        TopicId: pptWSwHbT8m4W_dJBEoWkg PartitionCount: 4       ReplicationFactor: 1    Configs:
        Topic: topic-admin-test1        Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: topic-admin-test1        Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: topic-admin-test1        Partition: 2    Leader: 1       Replicas: 1     Isr: 1
        Topic: topic-admin-test1        Partition: 3    Leader: 0       Replicas: 0     Isr: 0

指定主题配置参数

若想在创建主题时指定需要覆盖的配置,则需要在 NewTopic 对象中传入 configs 配置集,如下所示:

// 指定主题配置
Map<String, String> configs = new HashMap<>();
configs.put("cleanup.policy", "compact");
configs.put("max.message.bytes", "25000");
NewTopic newTopic = new NewTopic(topic, 4, (short) 1); // topic:"topic-admin-test2"
newTopic.configs(configs);

查看新创建的主题:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-admin-test2
Topic: topic-admin-test2        TopicId: IFegwoC6Tp-UwlnfwkrlmA PartitionCount: 4       ReplicationFactor: 1    Configs: cleanup.policy=compact,max.message.bytes=25000
        Topic: topic-admin-test2        Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: topic-admin-test2        Partition: 1    Leader: 2       Replicas: 2     Isr: 2
        Topic: topic-admin-test2        Partition: 2    Leader: 1       Replicas: 1     Isr: 1
        Topic: topic-admin-test2        Partition: 3    Leader: 0       Replicas: 0     Isr: 0

在使用 AdminClient 之后要调用close()方法来释放资源。

查看主题

查看主题可以调用listTopics()或者describeTopics()方法实现。

下列代码使用listTopics()方法查看主题:

public class TestDemo {
    public static void main(String[] args) {
        // 定义服务地址以及主题名称
        String brokerList = "192.168.86.133:9092";
        // 配置参数
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        // 获取 AdminClient 对象
        AdminClient adminClient = AdminClient.create(properties);
        // 查询主题并获取查询结果
        ListTopicsResult listTopicsResult = adminClient.listTopics();
        try {
            listTopicsResult.names().get().forEach(System.out::println);

        } catch (Exception exception) {
            exception.printStackTrace();
        }
        // 关闭 AdminClient 对象实例
        adminClient.close();
    }
}

查询结果如下:

topic-admin-test1
topic-admin-test2
topic-create
topic-config
topic-create-diff
topic-admin

下列代码使用describeTopics()方法查看主题:

public class TestDemo {
    public static void main(String[] args) {
        // 定义服务地址以及主题名称
        String brokerList = "192.168.86.133:9092";
        // 配置参数
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        // 获取 AdminClient 对象
        AdminClient adminClient = AdminClient.create(properties);
        // 查询主题并获取查询结果
        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList("topic-admin-test1", "topic-admin-test2"));
        try {
            describeTopicsResult.all().get().forEach((key, value) -> {
                System.out.println(key + "||" + value);
            });
        } catch (Exception exception) {
            exception.printStackTrace();
        }
        // 关闭 AdminClient 对象实例
        adminClient.close();
    }
}

查询结果如下:

topic-admin-test1||(name=topic-admin-test1, internal=false, partitions=(partition=0, leader=node01:9092 (id: 1 rack: null), replicas=node01:9092 (id: 1 rack: null), isr=node01:9092 (id: 1 rack: null)),(partition=1, leader=master:9092 (id: 0 rack: null), replicas=master:9092 (id: 0 rack: null), isr=master:9092 (id: 0 rack: null)),(partition=2, leader=node01:9092 (id: 1 rack: null), replicas=node01:9092 (id: 1 rack: null), isr=node01:9092 (id: 1 rack: null)),(partition=3, leader=master:9092 (id: 0 rack: null), replicas=master:9092 (id: 0 rack: null), isr=master:9092 (id: 0 rack: null)))
topic-admin-test2||(name=topic-admin-test2, internal=false, partitions=(partition=0, leader=master:9092 (id: 0 rack: null), replicas=master:9092 (id: 0 rack: null), isr=master:9092 (id: 0 rack: null)),(partition=1, leader=node02:9092 (id: 2 rack: null), replicas=node02:9092 (id: 2 rack: null), isr=node02:9092 (id: 2 rack: null)),(partition=2, leader=node01:9092 (id: 1 rack: null), replicas=node01:9092 (id: 1 rack: null), isr=node01:9092 (id: 1 rack: null)),(partition=3, leader=master:9092 (id: 0 rack: null), replicas=master:9092 (id: 0 rack: null), isr=master:9092 (id: 0 rack: null)))

删除主题

调用deleteTopics()方法即可实现主题的删除:

public class TestDemo {
    public static void main(String[] args) {
        // 定义服务地址以及主题名称
        String brokerList = "192.168.86.133:9092";
        // 配置参数
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        // 获取 AdminClient 对象
        AdminClient adminClient = AdminClient.create(properties);
        // 删除主题
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("topic-admin-test1", "topic-admin-test2"));
        try {
            deleteTopicsResult.all().get();
        } catch (Exception exception) {
            exception.printStackTrace();
        }
        // 关闭 AdminClient 对象实例
        adminClient.close();
    }
}

删除后执行主题查询,结果如下:

topic-create
topic-config
topic-create-diff
topic-admin

修改主题

使用describeConfigs()方法可以查看主题的具体配置信息,具体使用方法如下:

public class TestDemo {
    public static void main(String[] args) {
        // 定义服务地址以及主题名称
        String brokerList = "192.168.86.133:9092";
        // 配置参数
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        // 获取 AdminClient 对象
        AdminClient adminClient = AdminClient.create(properties);
        // 配置要查询的实体类型和实体名称
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-admin");
        // 执行查询并获取结果
        DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(List.of(configResource));
        try {
            Config config = describeConfigsResult.all().get().get(configResource);
            System.out.println("==========" + configResource.name() + "==========");
            config.entries().forEach(System.out::println);
        } catch (Exception exception) {
            exception.printStackTrace();
        }
        // 关闭 AdminClient 对象实例
        adminClient.close();
    }
}

返回结果如下,该方法会列出主题当中所有的配置信息:

==========topic-admin==========
ConfigEntry(name=compression.type, value=producer, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=leader.replication.throttled.replicas, value=, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=min.insync.replicas, value=1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=message.downconversion.enable, value=true, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=segment.jitter.ms, value=0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=cleanup.policy, value=delete, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=flush.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=follower.replication.throttled.replicas, value=, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=segment.bytes, value=1073741824, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=retention.ms, value=604800000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=flush.messages, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=message.format.version, value=3.0-IV1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=max.compaction.lag.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=file.delete.delay.ms, value=60000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=max.message.bytes, value=1048588, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=min.compaction.lag.ms, value=0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=message.timestamp.type, value=CreateTime, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=preallocate, value=false, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=index.interval.bytes, value=4096, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=min.cleanable.dirty.ratio, value=0.5, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=unclean.leader.election.enable, value=false, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=retention.bytes, value=-1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=delete.retention.ms, value=86400000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=segment.ms, value=604800000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=message.timestamp.difference.max.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=segment.index.bytes, value=10485760, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])

使用alterConfigs()方法对主题的配置进行修改的方法如下:

public class TestDemo {
    public static void main(String[] args) {
        // 定义服务地址以及主题名称
        String brokerList = "192.168.86.133:9092";
        // 配置参数
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        // 获取 AdminClient 对象
        AdminClient adminClient = AdminClient.create(properties);
        // 配置要查询的实体类型和实体名称
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-admin");
        // 配置要更改的配置参数
        ConfigEntry entry = new ConfigEntry("cleanup.policy", "compact");
        Config configs = new Config(List.of(entry));
        Map<ConfigResource, Config> configMap = new HashMap<>();
        configMap.put(configResource, configs);
        // 执行配置更改并获取结果
        AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(configMap);
        try {
            alterConfigsResult.all().get();
        } catch (Exception exception) {
            exception.printStackTrace();
        }
        // 关闭 AdminClient 对象实例
        adminClient.close();
    }
}

修改完成后再次执行查询,可以看到配置已经被成功修改:

==========topic-admin==========
...
ConfigEntry(name=cleanup.policy, value=compact, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
...

KafkaAdminClient 还提供了createPartitions()方法用于增加某个主题的分区,其基本使用方法如下:

public class TestDemo {
    public static void main(String[] args) {
        // 定义服务地址以及主题名称
        String brokerList = "192.168.86.133:9092";
        // 配置参数
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        // 获取 AdminClient 对象
        AdminClient adminClient = AdminClient.create(properties);
        // 配置要查询的实体类型和实体名称
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-admin");
        // 配置要更改的分区
        NewPartitions newPartitions = NewPartitions.increaseTo(5);
        Map<String, NewPartitions> newPartitionsMap = new HashMap<>();
        newPartitionsMap.put("topic-admin", newPartitions);
        // 执行增加分区数的操作并获取结果
        CreatePartitionsResult partitionsResult = adminClient.createPartitions(newPartitionsMap);
        try {
            partitionsResult.all().get();
        } catch (Exception exception) {
            exception.printStackTrace();
        }      
        // 关闭 AdminClient 对象实例
        adminClient.close();
    }
}

直接在服务器上调用 kafka-topic.sh 脚本查看主题分区的增加结果:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-admin
Topic: topic-admin      TopicId: 8mKJR8KBQKuUOXXJCFS1SQ PartitionCount: 5       ReplicationFactor: 1    Configs: cleanup.policy=compact
        Topic: topic-admin      Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: topic-admin      Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: topic-admin      Partition: 2    Leader: 2       Replicas: 2     Isr: 2
        Topic: topic-admin      Partition: 3    Leader: 1       Replicas: 1     Isr: 1
        Topic: topic-admin      Partition: 4    Leader: 2       Replicas: 2     Isr: 2

主题合法性校验

一般在生产环境下,Kafka 的auto.create.topics.enable参数将被设置为 false,集不允许自动创建主题。主题的创建一般由运维人员通过 kafka-topics.sh 脚本创建,若普通用户想要通过 KafkaAdminClient 提供的方法创建主题,建议在代码中对主题的命名、分区数、分区副本数进行校验,对不符合标准的申请进行过滤。

分区的管理

优先副本选举

分区采用多副本的机制以提升可靠性,但是只有 leader 副本对外提供读写服务,follower 副本只负责在内部进行消息的同步。若一个分区的 leader 副本不可用,则意味着整个分区不可用,此时就需要从 follower 副本中挑选一个作为新的 leader 继续对外提供服务。

在进行主题的创建时,主题的分区以及副本会尽可能均匀的分不到 Kafka 集群的各个 broker 节点上,leader 副本的分配也比较均匀。如下所示,创建一个分区数、分区副本数均为 3 的主题 topic-partition 并观察其分区副本的分布:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --create --topic topic-partition --partitions 3 --replication-factor 3
Created topic topic-partition.
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-partition
Topic: topic-partition  TopicId: P3dZG_1kRCuXa6dcTHeu4Q PartitionCount: 3       ReplicationFactor: 3    Configs:
        Topic: topic-partition  Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
        Topic: topic-partition  Partition: 1    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0
        Topic: topic-partition  Partition: 2    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2

可以看到,leader 副本均匀的分布在 id 为 0、2、1 的 broker 节点当中。此时,若某个分区 leader 副本所在的 broker 节点宕机,该分区的一个 follower 节点就会成为新的 leader 节点。当之前的 leader 节点恢复并重新加入集群后,其只能作为一个新的 follower 节点,不在对外提供服务。

我们重启 brokerId=2 的节点,观察分区副本的分布如下:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-partition
Topic: topic-partition  TopicId: P3dZG_1kRCuXa6dcTHeu4Q PartitionCount: 3       ReplicationFactor: 3    Configs:
        Topic: topic-partition  Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,1,2
        Topic: topic-partition  Partition: 1    Leader: 1       Replicas: 2,1,0 Isr: 1,0,2
        Topic: topic-partition  Partition: 2    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2

可以看到分区 1 的 leader 节点从 2 变为了 1,而分区 2 的 leader 节点也为 1,这就导致了原本均衡的负载状态被打破,节点 1 的负载最高。

为处理上述负载失衡的情况,Kafka 引入了优先副本 preferred replica 的概念。优先副本即 AR 集合中的第一个副本,即查询主题信息时其中的 Replicas 集合。如上述查询结果所示,分区 1 的 AR 集合为【2,1,0】,其中第一个副本为 2,即分区 1 的优先副本为 2。

基于优先副本的概念,Kafka 提供了分区自动平衡的功能,该功能对应的 broker 端参数为auto.leader.rebalance.enable(默认值为 true)。自动平衡功能开启时,Kafka 会启动一个定时任务,该任务会轮询所有的 broker 节点,计算每个 broker 节点的分区不平衡率(=非优先副本的 leader 数量/分区总数)。若该值超过了leader.imbalance.per.broker.percentage(默认值为 10%)参数所配置的比率,就会自动执行优先副本选举的动作对分区进行平衡。自动平衡分区动作的执行周期由参数leader.imbalance.check.interval.seconds(默认值 300s)控制。

经过一段时间后再次查询分区副本分布:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-partition
Topic: topic-partition  TopicId: P3dZG_1kRCuXa6dcTHeu4Q PartitionCount: 3       ReplicationFactor: 3    Configs:
        Topic: topic-partition  Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,1,2
        Topic: topic-partition  Partition: 1    Leader: 2       Replicas: 2,1,0 Isr: 1,0,2
        Topic: topic-partition  Partition: 2    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2

可以看到分区 1 的 leader 节点自动平衡为 2。

分区重新分配

在日常的生产工作中,可能会碰到以下的场景:

  • 某节点宕机后,该节点上的分区副本将处于失效的状态,Kafka 并不会将失效的分区副本自动的迁移到集群中可用的 broker 节点上;

  • 当需要对集群中的某个节点进行有计划的下线时,需要通过某种方式将该节点上的副本迁移到其他可用节点上;

  • 当集群中增加了新的节点,只有新创建的主题分区才有可能被分配到这个节点上,已经存在的主题需要某种重分配的方式使分区分配更加合理;

为解决上述问题,Kafka 提供了 kafka-reassign-partitions.sh 脚本来执行分区的重分配。创建一个分区数为 4 分区副本因子为 2 的主题 topic-assign 用于测试:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --create --topic topic-assign --partitions 4 --replication-factor 2
Created topic topic-assign.
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-assign
Topic: topic-assign     TopicId: b4TASor1RHeVKBVTAlpbPQ PartitionCount: 4       ReplicationFactor: 2    Configs:
        Topic: topic-assign     Partition: 0    Leader: 1       Replicas: 1,0   Isr: 1,0
        Topic: topic-assign     Partition: 1    Leader: 0       Replicas: 0,2   Isr: 0,2
        Topic: topic-assign     Partition: 2    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: topic-assign     Partition: 3    Leader: 1       Replicas: 1,2   Isr: 1,2

此时,如果我们想将 brokerId=1 的节点下线,首先需要将该节点上的分区副本迁移出去,整个迁移的操作如下所示:

首先创建一个 json 文件,文件内容包含要进行分区重分配的主题清单:

{   
    "topics":[
        {
            "topic":"topic-assign"    
        }
    ],
    "version":1
}

然后根据该文件内容,指定索要分配的 broker 的节点列表来生成一份候选的重分配方案:

[root@master kafka_2.12-3.4.0]# bin/kafka-reassign-partitions.sh --bootstrap-server master:9092 --generate --topics-to-move-json-file /root/reassignment.json --broker-list 0,2
Current partition replica assignment
{"version":1,"partitions":[{"topic":"topic-assign","partition":0,"replicas":[1,0],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":1,"replicas":[0,2],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":3,"replicas":[1,2],"log_dirs":["any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"topic-assign","partition":0,"replicas":[2,0],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":1,"replicas":[0,2],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":2,"replicas":[2,0],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":3,"replicas":[0,2],"log_dirs":["any","any"]}]}

返回结果中包含两部分内容:Current partition replica assignment 后的内容展示的是当前分区副本的分配情况;Proposed partition reassignment configuration 后的内容展示的是重分配的候选方案。

接下来将提供的候选方案保存在一个 json 文件当中,执行下列命令进行分区重分配,并查看结果:

[root@master kafka_2.12-3.4.0]# bin/kafka-reassign-partitions.sh --bootstrap-server master:9092 --execute --reassignment-json-file /root/assignment.json
Current partition replica assignment

{"version":1,"partitions":[{"topic":"topic-assign","partition":0,"replicas":[1,0],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":1,"replicas":[0,2],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":3,"replicas":[1,2],"log_dirs":["any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for topic-assign-0,topic-assign-1,topic-assign-2,topic-assign-3
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-assign                                                            Topic: topic-assign     TopicId: b4TASor1RHeVKBVTAlpbPQ PartitionCount: 4       ReplicationFactor: 2    Configs:
        Topic: topic-assign     Partition: 0    Leader: 2       Replicas: 2,0   Isr: 0,2
        Topic: topic-assign     Partition: 1    Leader: 0       Replicas: 0,2   Isr: 0,2
        Topic: topic-assign     Partition: 2    Leader: 2       Replicas: 2,0   Isr: 2,0
        Topic: topic-assign     Partition: 3    Leader: 0       Replicas: 0,2   Isr: 2,0

可以看到分区的分配已经按照 json 文件中定义的方式进行了重分配。重分配的本质是首先增加新的副本,然后进行数据的同步,最后将就的副本删除。数据的复制会占用额外的资源。

修改副本因子

kafka-reassign-partition.sh 脚本还能够实现分区重分配的功能,将上一小节中的 json 文件进行修改,进行副本因子的增加,如下:

{
	"version": 1,
	"partitions": [
		{
			"topic": "topic-assign",
			"partition": 0,
			"replicas": [
				2,
                1,
				0
			],
			"log_dirs": [
				"any",
                "any",
				"any"
			]
		},
		{
			"topic": "topic-assign",
			"partition": 1,
			"replicas": [
				0,
                1,
				2
			],
			"log_dirs": [
				"any",
                "any",
				"any"
			]
		},
		{
			"topic": "topic-assign",
			"partition": 2,
			"replicas": [
				2,
                1,
				0
			],
			"log_dirs": [
				"any",
                "any",
				"any"
			]
		},
		{
			"topic": "topic-assign",
			"partition": 3,
			"replicas": [
				0,
                1,
				2
			],
			"log_dirs": [
				"any",
                "any",
				"any"
			]
		}
	]
}

执行脚本命令,并再次查看分区副本:

[root@master kafka_2.12-3.4.0]# bin/kafka-reassign-partitions.sh --bootstrap-server master:9092 --execute --reassignment-json-file /root/assignment.json
Current partition replica assignment

{"version":1,"partitions":[{"topic":"topic-assign","partition":0,"replicas":[2,0],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":1,"replicas":[0,2],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":2,"replicas":[2,0],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":3,"replicas":[0,2],"log_dirs":["any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for topic-assign-0,topic-assign-1,topic-assign-2,topic-assign-3
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-assign
Topic: topic-assign     TopicId: b4TASor1RHeVKBVTAlpbPQ PartitionCount: 4       ReplicationFactor: 3    Configs:
        Topic: topic-assign     Partition: 0    Leader: 2       Replicas: 2,1,0 Isr: 0,2,1
        Topic: topic-assign     Partition: 1    Leader: 0       Replicas: 0,1,2 Isr: 0,2,1
        Topic: topic-assign     Partition: 2    Leader: 2       Replicas: 2,1,0 Isr: 0,2,1
        Topic: topic-assign     Partition: 3    Leader: 0       Replicas: 0,1,2 Isr: 0,2,1

可以看到所有分区的副本中都增加了在 brokerId=1 中的副本。该方法同样适用与减少分区副本数。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/444007.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【创作赢红包】SQL Server之索引设计

SQL Server之索引设计 一、前言二、索引设计背景知识2.1、索引设计策略包括的任务 三、常规索引设计3.1、数据库注意事项3.2、查询注意事项3.3、列注意事项3.4、索引的特征3.5、索引排序顺序设计指南 总结 一、前言 索引设计不佳和缺少索引是提高数据库和应用程序性能的主要障…

【Spring Data Jpa】原生Jpa的使用

【Spring Data Jpa】原生Jpa的使用 1. Dependency2. Config1.1 persistence.xml1.2 Entity1.3 application.properties 3. Test4. 原生JdbcTemplate 5. Awakening 1. Dependency <dependency><groupId>org.hibernate</groupId><artifactId>hibernate-e…

本地连接github

本地连接github 想要通过github把本地代码同步一下&#xff0c;但是每次换一个电脑都要重新搜索如何配置连接github&#xff0c;趁着这次机会把电脑配置的时候记录一下&#xff0c;到时候找起来方便一点 一、git环境配置 1、首先安装git 找个安装包直接安装就行 2、配置用…

跨越行业壁垒:金融校对软件在跨国金融业务中的应用

随着全球金融市场的融合和跨国金融业务的快速发展&#xff0c;金融专业人士需要处理不同语言、文化和法规背景下的金融文档。金融校对软件在这一领域发挥着至关重要的作用&#xff0c;为跨国金融业务提供有力支持。本文将探讨金融校对软件在跨国金融业务中的应用。 一、跨语言支…

【故障诊断】基于最小熵反卷积、最大相关峰度反卷积和最大二阶环平稳盲反卷积等盲反卷积方法在机械故障诊断中的应用研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

【工具】使用VS Code调试Docker Container中的代码

目录 使用VS Code调试Docker Container中的Autoware.ai代码第一种方法 -- 在VS Code中进行DebugStep1Step2Step3Step4c_cpp_properties.jsonlaunch.jsonsettings.jsontask.json Step5Step6Step7参考链接 第二种方法 -- cmake重新编译cmake使用方法&#xff08;简介&#xff09;…

SPARQL endpoint with Ontop CLI部署,python使用SPARQLWrapper

Ontop CLI部署&#xff0c;避免踩坑 0.前言1.提示2.详细部署流程3.python操作4.碎碎念 0.前言 教程&#xff1a;Setting up an Ontop SPARQL endpoint with Ontop CLI照着教程来&#xff0c;不知道为啥&#xff0c;总是报错&#xff0c;后来发现&#xff0c;手机搜到的跟电脑不…

Mybatis模糊查询——三种定义参数方法和聚合查询、主键回填

目录 相关导读 一、使用#定义参数 1. 持久层接口添加根据名字内容模糊查询方法 2. UserMapper.xml映射文件添加标签 3. 添加测试方法 4. 运行结果 二、使用$定义参数 1. UserMapper.xml映射文件更改标签内容 2. 修改测试方法 3. 运行结果 三、使用标签定义参数 1. …

JavaWeb开发 —— MyBatis入门

目录 一、快速入门程序 二、配置SQL提示 三、JDBC 四、数据库连接池 五、lombok工具包 MyBatis是一款优秀的 持久层Dao层 框架&#xff0c;用于简化JDBC的开发。 MyBatis本是 Apache的一个开源项目iBatis, 2010年这个项目由apache迁移到了 google code&#xff0c;并且改…

企业为什么都需要产品手册?

随着科技的不断发展和市场竞争的日益激烈&#xff0c;企业在推广和销售产品时需要给客户提供更多的信息和保障&#xff0c;而产品手册就成为了必不可少的工具之一。本文将从以下几个方面详细介绍企业为什么都需要产品手册。 产品手册的定义和作用 产品手册是一本介绍企业产品…

【C++初阶】:指针空值nullptr

指针空值nullptr 一.空指针二.空指针nullptr 一.空指针 在良好的C/C编程习惯中&#xff0c;声明一个变量时最好给该变量一个合适的初始值&#xff0c;否则可能会出现不可预料的错误&#xff0c;比如未初始化的指针。如果一个指针没有合法的指向&#xff0c;我们基本都是按照如下…

无人机巡检智能一体化解决方案

随着无人机技术的不断发展&#xff0c;无人机应用领域已经越来越生活化&#xff0c;其产品不仅在军事、商业等领域得到了广泛应用&#xff0c;也在普通人的生活中得到了广泛应用。无人机的自动巡检是无人机应用的一个重要方向&#xff0c;具有广阔的发展前景&#xff0c;本文将…

太阳的G2

我已经忘记是怎么喜欢上保罗的 入职腾讯的第一天&#xff0c;同事看到我的英文名cris&#xff0c;就笃信我应该是保罗的球迷。 是的&#xff0c;我是保罗的球迷「当然&#xff0c;不只是保罗的球迷」。 14-15赛季&#xff0c;保罗在的快船跟马刺鏖战7场&#xff0c;硬是在第7场…

NumberPicker分析(二)

NumberPicker分析(二) NumberPicker继承自LinearLayout。一般而言&#xff0c;无论是继承自View&#xff0c;还是继承自ViewGroup&#xff0c;必然会经过如下的几个阶段&#xff1a; onMeasureonLayoutonDraw onMeasure 在onMeasure方法测量当前控件大小&#xff0c;为正式…

Faster RCNN系列2——RPN的真值与预测值概述

Faster RCNN系列&#xff1a; Faster RCNN系列1——Anchor生成过程 Faster RCNN系列2——RPN的真值与预测值概述 Faster RCNN系列3——RPN的真值详解与损失值计算 Faster RCNN系列4——生成Proposal与RoI Faster RCNN系列5——RoI Pooling与全连接层 对于目标检测任务&#xf…

Replicator简介

Replicator 文章目录 ReplicatorReplicator简介合成数据训练背后的理论Replicator核心组件已知的问题 Replicator简介 Omniverse Replicator 是一个高度可扩展的框架&#xff0c;构建在可扩展的 Omniverse 平台上&#xff0c;可生成物理上准确的 3D 合成数据&#xff0c;以加速…

传输线的物理基础(十):特性阻抗的频率变化

到目前为止&#xff0c;我们一直假设传输线的特性阻抗随频率保持不变。正如我们所见&#xff0c;从传输线前端看&#xff0c;输入阻抗与频率密切相关。毕竟&#xff0c;在低频时&#xff0c;远端开路的传输线的输入阻抗看起来像一个电容器&#xff0c;阻抗开始很高&#xff0c;…

JavaScript中的执行上下文和执行栈

执行上下文概念以及理解 执行上下文是评估和执行JavaScript代码环境的抽象概念&#xff0c;但我们在JavaScript中所做的声明变量&#xff0c;声明函数&#xff0c;执行函数。他们都是在执行上下文中运行&#xff0c;也有了所谓的作用域。 执行上下文的类型 执行上下文分为三…

创建vite+vue+electron项目

写在前面的废话 首先&#xff0c;这是一篇缝合文&#xff0c;我的目的就是想用vite、vue结合electron打包一个windows应用&#xff1b;其次&#xff0c;项目只是这三个工具的简单应用&#xff0c;目前还没有往里面添加其他内容。再次&#xff0c;项目过程中参考了google的多篇文…

执行数学的运算

数学是计算机编程的重要能力。遗憾的是&#xff0c;对shell脚本来说&#xff0c;这个处理过程比较麻烦。在shell脚本中两种途径来进行数学运算。 expr命令 最开始&#xff0c;Bourne shell提供了一个特别的命令用来处理数学表达式。expr命令允许在命令行上处理数学数学表达式。…