Kafka-创建topic源码

news2024/12/26 18:34:49

一、命令创建topic

kafka-topics --create --topic quickstart-events --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2

二、kafka-topics脚本

exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.TopicCommand "$@"

脚本中指定了处理它的主类:TopicCommand

三、TopicCommand

public abstract class TopicCommand {
    public static void main(String... args) {
        Exit.exit(mainNoExit(args));
    }

    private static int mainNoExit(String... args) {
        try {
            execute(args);
            return 0;
        } catch (Throwable e) {
            return 1;
        }
    }

    static void execute(String... args) throws Exception {
        //解析命令行参数
        TopicCommandOptions opts = new TopicCommandOptions(args);
        //创建TopicService
        TopicService topicService = new TopicService(opts.commandConfig(), opts.bootstrapServer());
        try {
            if (opts.hasCreateOption()) {
                //这是处理topic创建的,我们主要分析它
                topicService.createTopic(opts);
            } else if (opts.hasAlterOption()) {
                //更高topic逻辑
                topicService.alterTopic(opts);
            } else if (opts.hasListOption()) {
                //获取topic
                topicService.listTopics(opts);
            } else if (opts.hasDescribeOption()) {
                //topi相关描述信息
                topicService.describeTopic(opts);
            } else if (opts.hasDeleteOption()) {
                //删除topic
                topicService.deleteTopic(opts);
            }
        }catch(...){...
        }finally {
            topicService.close();
        }
    }

    public static class TopicService implements AutoCloseable {
        public void createTopic(TopicCommandOptions opts) throws Exception {
            CommandTopicPartition topic = new CommandTopicPartition(opts);
            if (Topic.hasCollisionChars(topic.name)) {
                //由于度量名称的限制,带有句点(“.”)或下划线(“_”)的主题可能会发生冲突。为了避免问题,最好使用其中之一,但不要两者都使用
                System.out.println(".........");
            }
            createTopic(topic);
        }


       public void createTopic(CommandTopicPartition topic) throws Exception {
            if (topic.replicationFactor.filter(rf -> rf > Short.MAX_VALUE || rf < 1).isPresent()) {
                //复制因子必须介于1和“+Short.MAX_VALUE+”之间
                throw new IllegalArgumentException("...");
            }
            if (topic.partitions.filter(p -> p < 1).isPresent()) {
                //分区必须大于0
                throw new IllegalArgumentException("...");
            }

            try {
                NewTopic newTopic;
                //取决于创建 topic 时 是否指定了   replica-assignment
                if (topic.hasReplicaAssignment()) {
                    newTopic = new NewTopic(topic.name, topic.replicaAssignment);
                } else {
                    newTopic = new NewTopic(topic.name, topic.partitions, topic.replicationFactor.map(Integer::shortValue));
                }
                //给topic设置参数
                Map<String, String> configsMap = topic.configsToAdd.stringPropertyNames().stream().collect(Collectors.toMap(name -> name, name -> topic.configsToAdd.getProperty(name)));

                newTopic.configs(configsMap);
                //批量创建topic
                CreateTopicsResult createResult = adminClient.createTopics(Collections.singleton(newTopic),
                    new CreateTopicsOptions().retryOnQuotaViolation(false));
                //等待所有topic都创建成功
                createResult.all().get();
                System.out.println("Created topic " + topic.name + ".");
            } catch (ExecutionException e) {
                //......
            }
        }

    }

}

TopicCommandOptions中有对创建topic所有参数的解读,我们下面来详细看下这些参数

四、创建Topic参数

bootstrap-server

        必选项:连接Kafka server用

command-config

        包含要传递给Admin Client的配置的属性文件。这仅与--bootstrap-server选项一起使用,用于描述和更改broker配置

list

        列出所有可用的topic

create

        创建一个新的topic

delete

        删除一个topic

alter

        更改分区数量和副本分配,通过--alter更新现有主题的配置

describe

        列出给定topic的详细信息

topic

        要创建、更改、描述或删除的主题。它还接受正则表达式,但--create选项除外。将主题名称放在双引号中,并使用“\\”前缀转义正则表达式符号;例如 \"test\\.topic\"

topic-id

        仅与用于描述主题的--bootstrap-server选项一起使用

config

        正在创建的主题的主题配置覆盖。

delete-config

        要删除现有主题的主题配置覆盖

partitions

        正在创建或更改的主题的分区数量(警告:如果为具有键的主题增加分区,则分区逻辑或消息顺序将受到影响)。如果未提供用于,则为集群默认值

replication-factor

        正在创建的主题中每个分区的复制因子。如果未提供,则为群集默认值

replica-assignment

        正在创建或更改的topic的手动分区到broker分配列表

under-replicated-partitions

        如果在描述主题时设置,则仅在复制分区下显示

unavailable-partitions

        如果在描述主题时设置,则仅显示其leader不可用的分区

under-min-isr-partitions

        如果在描述主题时设置,则仅显示isr计数 < 配置的最小值的分区。

at-min-isr-partitions

        如果在描述主题时设置,则仅显示isr计数 = 配置的最小值的分区        

topics-with-overrides

        如果在描述主题时设置,则仅显示已覆盖配置的topic

if-exists

        如果在更改、删除或描述主题时设置,则仅当主题存在时才会执行该操作

if-not-exists

        如果在创建主题时设置,则仅当主题不存在时才会执行该操作。

exclude-internal

        运行list或describe命令时排除内部topic。默认情况下,内部topic将被列出

partition-size-limit-per-response

        一个DescribeTopicPartitions响应中包含的最大分区大小

五、AdminClient

从第二步的源码中看到最终将topic的创建交给了AdminClient来完成,下面我们继续往下分析

1、创建

在TopicService的构造方法中创建的AdminClient

它是Kafka的管理客户端,支持管理和检查topic、broker、配置和ACL。

AdminClient的创建用到了bootstrap.servers,它里面有连接KafkaServer的host:port列表。

bootstrap.servers配置仅用于发现群集中的broker,然后AdminClient将根据需要连接到这些broker。因此,只包括两个或三个经纪人地址就足以应对broker不可用的风险。

TopicService topicService = new TopicService(opts.commandConfig(), opts.bootstrapServer());
    public static class TopicService implements AutoCloseable {

        private final Admin adminClient;

        public TopicService(Properties commandConfig, Optional<String> bootstrapServer) {
            this.adminClient = createAdminClient(commandConfig, bootstrapServer);
        }

        private static Admin createAdminClient(Properties commandConfig, Optional<String> bootstrapServer) {
            if (bootstrapServer.isPresent()) {
                commandConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer.get());
            }
            return Admin.create(commandConfig);
        }

    }

2、交由子类KafkaAdminClient处理

public class KafkaAdminClient extends AdminClient {

    private final AdminClientRunnable runnable;

    //创建一批topic
    public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,
                                           final CreateTopicsOptions options) {
        final Map<String, KafkaFutureImpl<TopicMetadataAndConfig>> topicFutures = new HashMap<>(newTopics.size());
        final CreatableTopicCollection topics = new CreatableTopicCollection();
        for (NewTopic newTopic : newTopics) {
            //判断名字是否符合规范
            if (topicNameIsUnrepresentable(newTopic.name())) {
                KafkaFutureImpl<TopicMetadataAndConfig> future = new KafkaFutureImpl<>();
                future.completeExceptionally(new InvalidTopicException("The given topic name '" +
                    newTopic.name() + "' cannot be represented in a request."));
                topicFutures.put(newTopic.name(), future);
            } else if (!topicFutures.containsKey(newTopic.name())) {
                //topicFutures 装的是还没有创建的 topicname
                topicFutures.put(newTopic.name(), new KafkaFutureImpl<>());
                topics.add(newTopic.convertToCreatableTopic());
            }
        }
        if (!topics.isEmpty()) {
            final long now = time.milliseconds();
            final long deadline = calcDeadlineMs(now, options.timeoutMs());
            //里面封装了 ApiKeys.CREATE_TOPICS 请求
            final Call call = getCreateTopicsCall(options, topicFutures, topics,
                Collections.emptyMap(), now, deadline);
            //实现了Runnable接口
            runnable.call(call, now);
        }
        return new CreateTopicsResult(new HashMap<>(topicFutures));
    }
}

从这里我们看到,这里会用一个线程向broker发送ApiKeys.CREATE_TOPICS 请求。下面我们来看broker端怎么处理topics的创建请求的。按照我们之前的经验,要去看KafkaApis中对应ApiKeys.CREATE_TOPICS的处理逻辑

class KafkaApis(...){

      request.header.apiKey match {
        //....
        case ApiKeys.CREATE_TOPICS => maybeForwardToController(request, handleCreateTopicsRequest)
        case ApiKeys.DELETE_TOPICS => maybeForwardToController(request, handleDeleteTopicsRequest)
        case ApiKeys.CREATE_ACLS => maybeForwardToController(request, handleCreateAcls)
        case ApiKeys.DELETE_ACLS => maybeForwardToController(request, handleDeleteAcls)
        case ApiKeys.CREATE_PARTITIONS => maybeForwardToController(request, handleCreatePartitionsRequest)
        case ApiKeys.ELECT_LEADERS => maybeForwardToController(request, handleElectLeaders)
        //.....
      }
}

六、CREATE_TOPICS的处理逻辑

从KafkaApi中我们看到很多请求都调用了maybeForwardToController()方法来处理,但是传入的参数不同,从名称上我们可以猜测这些请求可能交由Controller来处理,回想下《Kafka-Controller角色需要做什么?》中当一个broker当选为Controller时第一件事就是注册监听器,去监听broker改变、topic改变、topic删除、isr改变等,并分别准备好了响应的处理逻辑。因此这里只要让topic发生改变就可以自动触发让Controller处理了。下面看下handleCreateTopicsRequest()中都做了什么?

1、获取ZooKeeper

val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))

2、判断集群当下是否有Controller

如果集群当下没有Controller,直接向客户端返回Errors.NOT_CONTROLLER错误。我们按照集群当下有Controller继续分析。

    if (!zkSupport.controller.isActive) {
      //如果没有contorller,直接向客户端发送响应信息(集群当下没有controller),且这个时候时创建不了topic的,
      createTopicsRequest.data.topics.forEach { topic =>
        results.add(new CreatableTopicResult().setName(topic.name)
          .setErrorCode(Errors.NOT_CONTROLLER.code))
      }
      sendResponseCallback(results)
    } else {
      //正常逻辑
    }

3、检查topic名称

集群元数据topic是一个具有不同实现的内部topic。不应允许用户创建同名的topic。

          if (topicNames.contains(Topic.CLUSTER_METADATA_TOPIC_NAME)) {
            //拒绝创建内部主题  __cluster_metadata
            info(s"Rejecting creation of internal topic ${Topic.CLUSTER_METADATA_TOPIC_NAME}")
          }
          topicNames.diff(Set(Topic.CLUSTER_METADATA_TOPIC_NAME))

4、调用ZkAdminManager创建topic

      zkSupport.adminManager.createTopics(
        createTopicsRequest.data.timeoutMs,
        createTopicsRequest.data.validateOnly,
        toCreate,
        authorizedForDescribeConfigs,
        controllerMutationQuota,
        handleCreateTopicsResults)
    }

1、循环校验每个topic是否符合规则

1、topic是否已经存在

2、topic是否为null

3、numPartitions或replicationFactor和replicasAssignments都已设置。两者不能同时使用

2、确定分区分配列表

如果用户指定了列表,那么就直接用用户的,否则使用Kafka自己的分配策略(下篇博客分析)

        val assignments = if (topic.assignments.isEmpty) {
          CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(
            brokers.asJavaCollection, resolvedNumPartitions, resolvedReplicationFactor))
        } else {
          val assignments = new mutable.HashMap[Int, Seq[Int]]
          //注意:我们不会检查replicaAssignment是否包含未知的代理——与添加分区的情况不同,这遵循TopicCommand中的现有逻辑
          topic.assignments.forEach { assignment =>
            assignments(assignment.partitionIndex) = assignment.brokerIds.asScala.map(a => a: Int)
          }
          assignments
        }

3、topics目录下创建指定的topic

//ConfigType.TOPIC : topics 目录
//topic :要创建的topic名称
zkClient.setOrCreateEntityConfigs(ConfigType.TOPIC, topic, config)

4、topic目录下创建分区目录和对应信息

    writeTopicPartitionAssignment(topic, partitionReplicaAssignment.map { case (k, v) => k -> ReplicaAssignment(v) },isUpdate = false, usesTopicId)

5、创建对应的元数据

CreatePartitionsMetadata(topic.name, assignments.keySet)

七、Controller端处理逻辑

我们找到TopicChange对应的处理逻辑

  override def process(event: ControllerEvent): Unit = {
    try {
      event match {
        case TopicChange =>
          processTopicChange()
        //......
      }
    }
  }


  private def processTopicChange(): Unit = {
    if (!isActive) return
      //从 brokers/topics/目录下获取所有的topic
    val topics = zkClient.getAllTopicsInCluster(true)
    //从controllerContext 获取当下缓存中所有的 topic
    //两者相减获取 新增加的 topic
    val newTopics = topics -- controllerContext.allTopics
    // 获取删除的topic (既topics目录没有,但是缓存中有)
    val deletedTopics = controllerContext.allTopics.diff(topics)
    //设置新的topic到缓存
    controllerContext.setAllTopics(topics)

    //检测zk中 每个topic 目录的变化
    registerPartitionModificationsHandlers(newTopics.toSeq)
    //现在要添加分区和副本了,也就是从topic下获取 topic_id、adding_replicas、removing_replicas、partitions 信息
    val addedPartitionReplicaAssignment = zkClient.getReplicaAssignmentAndTopicIdForTopics(newTopics)
    deletedTopics.foreach(controllerContext.removeTopic)
    processTopicIds(addedPartitionReplicaAssignment)

    addedPartitionReplicaAssignment.foreach { case TopicIdReplicaAssignment(_, _, newAssignments) =>
      newAssignments.foreach { case (topicAndPartition, newReplicaAssignment) =>
        //controllerContext 的缓存中 更新分区、副本、leder信息
        controllerContext.updatePartitionFullReplicaAssignment(topicAndPartition, newReplicaAssignment)
      }
    }
    info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +
      s"[$addedPartitionReplicaAssignment]")
    if (addedPartitionReplicaAssignment.nonEmpty) {
      val partitionAssignments = addedPartitionReplicaAssignment
        .map { case TopicIdReplicaAssignment(_, _, partitionsReplicas) => partitionsReplicas.keySet }
        .reduce((s1, s2) => s1.union(s2))
      //更高topic下的分区、副本为可用状态 OnlineReplica
      //此时 往topic 生产数据就ok了 
      onNewPartitionCreation(partitionAssignments)
    }
  }

 从源码中我们可以看到,Controller这端会不断的将新的topic以及其下的topic_id、adding_replicas、removing_replicas、partitions 信息加载到缓存,并使用它们的状态机将它们更新至可用状态。并剔除掉删除的topic。始终保持,当向topic生产数据时,它这里都时最新的状态。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2253279.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MeterSphere接口测试提取数组及引用

实际工作中常见的使用场景&#xff1a; 1、提取数组中某个特定值&#xff1b; $.data.groups[n].name提取特定值 2、提取数组中全部值&#xff1b; $.data.groups[*].name&#xff0c;并勾选匹配多条以提取全部值 3、提取数组中的某几个特定值&#xff1b; 如提取数组中第1个和…

【数据结构】队列的概念、结构和实现详解

本文来介绍一下数据结构中的队列&#xff0c;以及如何用C语言去模拟实现。 1.队列的概念及结构 队列&#xff1a;只允许在一端进行插入数据操作&#xff0c;在另一端进行删除数据操作的特殊线性表。 特点&#xff1a;数据先进先出FIFO&#xff08;first in first out&#xf…

【Linux】设计文件系统(C实现)

要求&#xff1a; (1)可以实现下列几条命令 dir 列文件目录 create 创建文件 delete 删除文件 read 读文件 write 写文件 (2)列目录时要列出文件名、存取权限&#xff08;八进制&#xff09;、文件长度、时间&#xff08;创建时间&#xff0c;修改时间以及…

基于Java Springboot武汉市公交路线查询APP且微信小程序

一、作品包含 源码数据库设计文档万字PPT全套环境和工具资源部署教程 二、项目技术 前端技术&#xff1a;Html、Css、Js、Vue、Element-ui 数据库&#xff1a;MySQL 后端技术&#xff1a;Java、Spring Boot、MyBatis 三、运行环境 开发工具&#xff1a;IDEA/eclipse 微信…

单片机-- 松瀚sonix学习过程

硬件&#xff1a;松瀚sn8f5701sg、SN-LINK 3 Adapter模拟器、sn-link转接板 软件&#xff1a; keil-c51&#xff08;v9.60&#xff09;&#xff1a;建立工程&#xff0c;编辑&#xff0c;烧录程序 SN-Link_Driver for Keil C51_V3.00.005&#xff1a;安装sonix设备包和snlin…

http(请求方法,状态码,Cookie与)

目录 1.http中常见的Header(KV结构) 2.http请求方法 2.1 请求方法 2.2 telnet 2.3 网页根目录 2.3.1 概念 2.3.2 构建一个首页 2.4 GET与POST方法 2.4.1 提交参数 2.4.2 GET与POST提交参数对比 2.4.3 GET和POST对比 3.状态码 3.1 状态码分类 3.2 3XXX状态码 3.2 …

实现PDF文档加密,访问需要密码

01. 背景 今天下午老板神秘兮兮的来问我&#xff0c;能不能做个文档加密功能&#xff0c;就是那种用户下载打开需要密码才能打开的那种效果。boss都发话了&#xff0c;那必须可以。 需求&#xff1a;将 pdf 文档经过加密处理&#xff0c;客户下载pdf文档&#xff0c;打开文档需…

机器学习周志华学习笔记-第13章<半监督学习>

机器学习周志华学习笔记-第13章&#xff1c;半监督学习&#xff1e; 卷王&#xff0c;请看目录 13半监督学习13.1 生成式方法13.2 半监督SVM13.3 基于分歧的方法13.4 半监督聚类 13半监督学习 前面我们一直围绕的都是监督学习与无监督学习&#xff0c;监督学习指的是训练样本包…

106.【C语言】数据结构之二叉树的三种递归遍历方式

目录 1.知识回顾 2.分析二叉树的三种遍历方式 1.总览 2.前序遍历 3.中序遍历 4.后序遍历 5.层序遍历 3.代码实现 1.准备工作 2.前序遍历函数PreOrder 测试结果 3.中序遍历函数InOrder 测试结果 4.后序遍历函数PostOrder 测试结果 4.底层分析 1.知识回顾 在99.…

1.Git安装与常用命令

前言 Git中会用到的一些基本的Linux命令 ls/ll 查看文件目录 (ll可以看隐藏文件)cat 查看文件内容touch 创建文件vi vi编辑器 1.下载与安装 安装成功后鼠标右键会出现Git Bash和Git GUI Git GUI&#xff1a;GUI图形化界面 Git Bash&#xff1a;Git提供的命令行工具 当安装…

HarmonyOS开发中,如何高效定位并分析内存泄露相关问题

HarmonyOS开发中&#xff0c;如何高效定位并分析内存泄露相关问题 (1)Allocation的应用调试方式Memory泳道Native Allocation泳道 (2)Snapshot(3)ASan的应用使用约束配置参数使能ASan方式一方式二 启用ASanASan检测异常码 (4)HWASan的应用功能介绍约束条件使能HWASan方式一方式…

Spring Cloud+Nacos+KMS 动态配置最佳实践

作者&#xff1a;柳遵飞 前言 Spring Cloud 框架在微服务领域被广大开发者所使用&#xff0c;Value 是每位开发者都会接触到的注解&#xff0c;在 SpringBean 中可以通过 Value 注解引用 application.properties 属性&#xff0c;实现配置代码分离&#xff0c;提升应用代码部…

HTML 快速上手

目录 一. HTML概念 二. HTML标签 1. 标题标签 2. 段落标签 3. 换行标签 4. 图片标签 5. 超链接标签 6. 表格标签 7. 表单标签 7.1 form 标签 7.2 input 标签 (1) 文本框 (2) 单选框 (3) 密码框 (4) 复选框 (5) 普通按钮 (6) 提交按钮 8. select标签 9. 无语义…

微软表示不会使用你的 Word、Excel 数据进行 AI 训练

​微软否认使用 Microsoft 365 应用程序&#xff08;包括 Word、Excel 和 PowerPoint&#xff09;收集数据来训练公司人工智能 (AI) 模型的说法。 此前&#xff0c;Tumblr 的一篇博文声称&#xff0c;雷德蒙德使用“互联体验”功能抓取客户的 Word 和 Excel 数据&#xff0c;用…

「Mac畅玩鸿蒙与硬件36」UI互动应用篇13 - 数字滚动抽奖器

本篇将带你实现一个简单的数字滚动抽奖器。用户点击按钮后&#xff0c;屏幕上的数字会以滚动动画的形式随机变动&#xff0c;最终显示一个抽奖数字。这个项目展示了如何结合定时器、状态管理和动画实现一个有趣的互动应用。 关键词 UI互动应用数字滚动动画效果状态管理用户交…

Selenium3+Python如何操作键盘

selenium操作键盘&#xff0c;需要导入Keys类&#xff1a;“from selenium.webdriver.common.keys import Keys” 调用键盘操作的快捷键的方法 &#xff1a; 单键值&#xff1a;直接传入对应的键值“element.send_keys”(快捷键的键值) 组合键&#xff1a;键值之间由逗号分隔…

从技术视角看AI在Facebook全球化中的作用

在全球化日益加深的今天&#xff0c;人工智能&#xff08;AI&#xff09;作为一种变革性技术&#xff0c;正在深刻影响全球互联网巨头的发展方向。Facebook作为全球最大的社交媒体平台之一&#xff0c;正通过AI技术突破语言、文化和技术的障碍&#xff0c;推动全球化战略的实现…

jmeter 压测常用静默参数解释应用

简介&#xff1a; JMeter静默压测&#xff08;即无界面压测&#xff09;是一种常用的性能测试方法&#xff0c;用于模拟多个用户同时访问系统并测量系统的响应时间和吞吐量等关键性能指标。在JMeter静默压测中&#xff0c;常用的压测参数及其解释如下&#xff1a; 一、基本…

【机器学习】分类任务: 二分类与多分类

二分类与多分类&#xff1a;概念与区别 二分类和多分类是分类任务的两种类型&#xff0c;区分的核心在于目标变量&#xff08;label&#xff09;的类别数&#xff1a; 二分类&#xff1a;目标变量 y 只有两个类别&#xff0c;通常记为 y∈{0,1} 或 y∈{−1,1}。 示例&#xff…

【自用】管材流转项目前端重部署流程 vue2 webpackage4 vuecli4

一、配置 1.下载项目&#xff0c;使用 IDEA 打开&#xff0c;并配置 Nodejs 它提示我&#xff0c;需要 Node.js&#xff0c;因为 nodejs 14 的 installer 已经官网已经找不到了&#xff0c;使用 fnm 又太麻烦&#xff0c; 所以直接采用在 IDEA 中下载的方式就好了。 2.清除缓…