kafka源码阅读-ReplicaStateMachine(副本状态机)解析

news2024/9/20 10:31:52

概述

Kafka源码包含多个模块,每个模块负责不同的功能。以下是一些核心模块及其功能的概述:

  1. 服务端源码 :实现Kafka Broker的核心功能,包括日志存储、控制器、协调器、元数据管理及状态机管理、延迟机制、消费者组管理、高并发网络架构模型实现等。

  2. Java客户端源码 :实现了Producer和Consumer与Broker的交互机制,以及通用组件支撑代码。

  3. Connect源码 :用来构建异构数据双向流式同步服务。

  4. Stream源码 :用来实现实时流处理相关功能。

  5. Raft源码 :实现了Raft一致性协议。

  6. Admin模块 :Kafka的管理员模块,操作和管理其topic,partition相关,包含创建,删除topic,或者拓展分区等。

  7. Api模块 :负责数据交互,客户端与服务端交互数据的编码与解码。

  8. Client模块 :包含Producer读取Kafka Broker元数据信息的类,如topic和分区,以及leader。

  9. Cluster模块 :包含Broker、Cluster、Partition、Replica等实体类。

  10. Common模块 :包含各种异常类以及错误验证。

  11. Consumer模块 :消费者处理模块,负责客户端消费者数据和逻辑处理。

  12. Controller模块 :负责中央控制器的选举,分区的Leader选举,Replica的分配或重新分配,分区和副本的扩容等。

  13. Coordinator模块 :负责管理部分consumer group和他们的offset。

  14. Javaapi模块 :提供Java语言的Producer和Consumer的API接口。

  15. Log模块 :负责Kafka文件存储,读写所有Topic消息数据。

  16. Message模块 :封装多条数据组成数据集或压缩数据集。

  17. Metrics模块 :负责内部状态监控。

  18. Network模块 :处理客户端连接,网络事件模块。

  19. Producer模块 :生产者细节实现,包括同步和异步消息发送。

  20. Security模块 :负责Kafka的安全验证和管理。

  21. Serializer模块 :序列化和反序列化消息内容。

  22. Server模块 :涉及Leader和Offset的checkpoint,动态配置,延时创建和删除Topic,Leader选举,Admin和Replica管理等。

  23. Tools模块 :包含多种工具,如导出consumer offset值,LogSegments信息,Topic的log位置信息,Zookeeper上的offset值等。

  24. Utils模块 :包含各种工具类,如Json,ZkUtils,线程池工具类,KafkaScheduler公共调度器类等。

这些模块共同构成了Kafka的整体架构,使其能够提供高吞吐量、高可用性的消息队列服务。

kafka源码分支为1.0.2

分区状态机记录着当前集群所有 Partition 的状态信息以及如何对 Partition 状态转移进行相应的处理;副本状态机则是记录着当前集群所有 Replica 的状态信息以及如何对 Replica 状态转变进行相应的处理。

kafkaController初始化时,会启动replicaStateMachine和partitionStateMachine:

    //在 KafkaController 中
    //有两个状态机:分区状态机和副本状态机;
    //一个管理器:Channel 管理器,负责管理所有的 Broker 通信;
    //相关缓存:Partition 信息、Topic 信息、broker id 信息等;
    //四种 leader 选举机制:分别是用 leader offline、broker 掉线、partition reassign、最优 leader 选举时触发;
    //启动副本状态机,初始化所有 Replica 的状态信息,如果 Replica 所在节点是 alive 的,那么状态更新为 OnlineReplica, 否则更新为 ReplicaDeletionIneligible;
    replicaStateMachine.startup()
    //启动分区状态机,初始化所有 Partition 的状态信息,如果 leader 所在 broker 是 alive 的,那么状态更新为 OnlinePartition,否则更新为 OfflinePartition
    partitionStateMachine.startup()

ReplicaStateMachine类相关方法:

  /**
   * Invoked on successful controller election. First registers a broker change listener since that triggers all
   * state transitions for replicas. Initializes the state of replicas for all partitions by reading from zookeeper.
   * Then triggers the OnlineReplica state change for all replicas.
   */
  def startup() {
    // //初始化所有副本的状态信息
    initializeReplicaState()
    //将online的replica状态转变为OnlineReplica
    handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)

    info("Started replica state machine with initial state -> " + replicaState.toString())
  }

  /**
   * Invoked on startup of the replica's state machine to set the initial state for replicas of all existing partitions
   * in zookeeper
   */
    //初始化所有副本的状态信息
    // 这里只是将 Replica 的状态信息更新副本状态机的缓存 replicaState 中,并没有真正进行状态转移的操作。
  private def initializeReplicaState() {
    for((topicPartition, assignedReplicas) <- controllerContext.partitionReplicaAssignment) {
      val topic = topicPartition.topic
      val partition = topicPartition.partition
      assignedReplicas.foreach { replicaId =>
        val partitionAndReplica = PartitionAndReplica(topic, partition, replicaId)
        //如果 Replica 所在机器是 alive 的,那么将其状态设置为 OnlineReplica
        //replicaId即brokerId
        if (controllerContext.isReplicaOnline(replicaId, topicPartition))
          replicaState.put(partitionAndReplica, OnlineReplica)
        else {
          // mark replicas on dead brokers as failed for topic deletion, if they belong to a topic to be deleted.
          // This is required during controller failover since during controller failover a broker can go down,
          // so the replicas on that broker should be moved to ReplicaDeletionIneligible to be on the safer side.
          //否则设置为 ReplicaDeletionIneligible 状态
          replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
        }
      }
    }
  }

  /**
   * This API is invoked by the broker change controller callbacks and the startup API of the state machine
   * @param replicas     The list of replicas (brokers) that need to be transitioned to the target state
   * @param targetState  The state that the replicas should be moved to
   * The controller's allLeaders cache should have been updated before this
   */
    //用于处理 Replica 状态的变化
  def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState,
                         callbacks: Callbacks = (new CallbackBuilder).build) {
    if (replicas.nonEmpty) {
      info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
      try {
        brokerRequestBatch.newBatch()
        //状态转变
        replicas.foreach(r => handleStateChange(r, targetState, callbacks))
        //向 broker 发送相应请求
        brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
      } catch {
        case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
      }
    }
  }

  /**
   * This API exercises the replica's state machine. It ensures that every state transition happens from a legal
   * previous state to the target state. Valid state transitions are:
   * NonExistentReplica --> NewReplica
   * --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the
   *   partition to every live broker
   *
   * NewReplica -> OnlineReplica
   * --add the new replica to the assigned replica list if needed
   *
   * OnlineReplica,OfflineReplica -> OnlineReplica
   * --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the
   *   partition to every live broker
   *
   * NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible -> OfflineReplica
   * --send StopReplicaRequest to the replica (w/o deletion)
   * --remove this replica from the isr and send LeaderAndIsr request (with new isr) to the leader replica and
   *   UpdateMetadata request for the partition to every live broker.
   *
   * OfflineReplica -> ReplicaDeletionStarted
   * --send StopReplicaRequest to the replica (with deletion)
   *
   * ReplicaDeletionStarted -> ReplicaDeletionSuccessful
   * -- mark the state of the replica in the state machine
   *
   * ReplicaDeletionStarted -> ReplicaDeletionIneligible
   * -- mark the state of the replica in the state machine
   *
   * ReplicaDeletionSuccessful -> NonExistentReplica
   * -- remove the replica from the in memory partition replica assignment cache


   * @param partitionAndReplica The replica for which the state transition is invoked
   * @param targetState The end state that the replica should be moved to
   */
  def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState,
                        callbacks: Callbacks) {
    val topic = partitionAndReplica.topic
    val partition = partitionAndReplica.partition
    val replicaId = partitionAndReplica.replica
    val topicAndPartition = TopicAndPartition(topic, partition)
    // Replica 不存在的话,状态初始化为 NonExistentReplica
    val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
    val stateChangeLog = stateChangeLogger.withControllerEpoch(controller.epoch)
    try {

      def logStateChange(): Unit =
        stateChangeLog.trace(s"Changed state of replica $replicaId for partition $topicAndPartition from " +
          s"$currState to $targetState")

      val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
      //校验状态转变是否符合要求
      assertValidTransition(partitionAndReplica, targetState)
      targetState match {
        case NewReplica => 其前置状态只能为 NonExistentReplica
          // start replica as a follower to the current leader for its partition
          //从 zk 获取 Partition 的 leaderAndIsr 信息
          val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
          leaderIsrAndControllerEpochOpt match {
            case Some(leaderIsrAndControllerEpoch) =>
              //若是leader的replica状态不能变为NewReplica
              if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
                throw new StateChangeFailedException(s"Replica $replicaId for partition $topicAndPartition cannot " +
                  s"be moved to NewReplica state as it is being requested to become leader")
              //向该 replicaId 发送 LeaderAndIsr 请求,这个方法同时也会向所有的 broker 发送 updateMeta 请求
              brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
                                                                  topic, partition, leaderIsrAndControllerEpoch,
                                                                  replicaAssignment, isNew = true)
              //对于新建的 Partition,处于这个状态时,该 Partition 是没有相应的 LeaderAndIsr 信息的
            case None => // new leader request will be sent to this replica when one gets elected
          }
          //将该 Replica 的状态转移成 NewReplica,然后结束流程。
          replicaState.put(partitionAndReplica, NewReplica)
          logStateChange()
        case ReplicaDeletionStarted => //其前置状态只能为 OfflineReplica
          //更新向该 Replica 的状态为 ReplicaDeletionStarted;
          replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
          // send stop replica command
          //发送 StopReplica 请求给该副本,并设置 deletePartition=true
          //broker收到这请求后,会从物理存储上删除这个 Replica 的数据内容
          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true,
            callbacks.stopReplicaResponseCallback)
          logStateChange()
        case ReplicaDeletionIneligible => //其前置状态只能为 ReplicaDeletionStarted
          replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
          logStateChange()
        case ReplicaDeletionSuccessful => //其前置状态只能为 ReplicaDeletionStarted
          replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)
          logStateChange()
        case NonExistentReplica => //其前置状态只能为 ReplicaDeletionSuccessful。
          // NonExistentReplica 是副本完全删除、不存在这个副本的状态
          // remove this replica from the assigned replicas list for its partition
          //在 controller 的 partitionReplicaAssignment 删除这个 Partition 对应的 replica 信息;
          val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
          controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
          //将这个 Topic 从缓存中删除。
          replicaState.remove(partitionAndReplica)
          logStateChange()
        case OnlineReplica =>//其前置状态只能为 NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible
          //副本正常工作时的状态,此时的 Replica 既可以作为 leader 也可以作为 follower
          replicaState(partitionAndReplica) match {
            case NewReplica => //其前置状态如果为 NewReplica
              // add this replica to the assigned replicas list for its partition
              //从 Controller 的 partitionReplicaAssignment 中获取这个 Partition 的 AR;
              val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
              //如果 Replica 不在 AR 中的话,那么就将其添加到 Partition 的 AR 中;
              if(!currentAssignedReplicas.contains(replicaId))
                controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
              logStateChange()
            case _ => //其前置状态如果为:OnlineReplica, OfflineReplica, ReplicaDeletionIneligible
              // check if the leader for this partition ever existed
              //如果该 Partition 的 LeaderIsrAndControllerEpoch 信息存在,那么就更新副本的状态,并发送相应的请求
              //否则不做任何处理;
              controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
                case Some(leaderIsrAndControllerEpoch) =>
                  brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
                    replicaAssignment)
                  replicaState.put(partitionAndReplica, OnlineReplica)
                  logStateChange()
                case None => // that means the partition was never in OnlinePartition state, this means the broker never
                             // started a log for that partition and does not have a high watermark value for this partition
              }
          }
          //最后将 Replica 的状态设置为 OnlineReplica 状态。
          replicaState.put(partitionAndReplica, OnlineReplica)
        case OfflineReplica => //其前置状态只能为 NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible
          // send stop replica command to the replica so that it stops fetching from the leader
          //发送 StopReplica 请求给该副本,先停止副本同步 (deletePartition = false)
          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
          // As an optimization, the controller removes dead replicas from the ISR
          val leaderAndIsrIsEmpty: Boolean =
            controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
              case Some(_) =>
                //将该 replica 从 Partition 的 isr 移除这个 replica(前提 isr 中还有其他有效副本)
                controller.removeReplicaFromIsr(topic, partition, replicaId) match {
                  case Some(updatedLeaderIsrAndControllerEpoch) =>
                    // send the shrunk ISR state change request to all the remaining alive replicas of the partition.
                    val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
                    if (!controller.topicDeletionManager.isPartitionToBeDeleted(topicAndPartition)) {
                      // 发送 LeaderAndIsr 请求给剩余的其他副本,因为 ISR 变动了
                      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
                        topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
                    }
                    //更新这个 Replica 的状态为 OfflineReplica
                    replicaState.put(partitionAndReplica, OfflineReplica)
                    logStateChange()
                    false
                  case None =>
                    true
                }
              case None =>
                true
            }
          if (leaderAndIsrIsEmpty && !controller.topicDeletionManager.isPartitionToBeDeleted(topicAndPartition))
            throw new StateChangeFailedException(
              s"Failed to change state of replica $replicaId for partition $topicAndPartition since the leader " +
                s"and isr path in zookeeper is empty")

      }
    }
    catch {
      case t: Throwable =>
        stateChangeLog.error(s"Initiated state change of replica $replicaId for partition $topicAndPartition from " +
          s"$currState to $targetState failed", t)
    }
  }

上面 Replica 各种转移的触发的条件:
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1951599.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

kettle从入门到精通 第八十一课 ETL之kettle kettle中的json对象字段写入postgresql中的json字段正确姿势

1、上一节可讲解了如何将json数据写入pg数据库表中的json字段&#xff0c;虽然实现了效果&#xff0c;但若客户继续使用表输出步骤则仍然无法解决问题。 正确的的解决方式是设置数据库连接参数stringtypeunspecified 2、stringtypeunspecified 参数的作用&#xff1a; 当设置…

重生之我当程序猿外包

第一章 个人介绍与收入历程 我出生于1999年&#xff0c;在大四下学期进入了一家互联网公司实习。当时的实习工资是3500元&#xff0c;公司还提供住宿。作为一名实习生&#xff0c;这个工资足够支付生活开销&#xff0c;每个月还能给父母转1000元&#xff0c;自己留2500元用来吃…

RustDesk远程控屏软件使用教学

RustDesk自建服务器使用教学RustDesk远程控屏软件使用教学 下载软件后 右键管理员运行 点击右上角设置按钮 管理员运行 保证启动服务 点击左侧导航栏网络按钮 复制域名或者ip地址到 ID服务器 输入框 然后点击应用即可

C++ | Leetcode C++题解之第295题数据流的中位数

题目&#xff1a; 题解&#xff1a; class MedianFinder {multiset<int> nums;multiset<int>::iterator left, right;public:MedianFinder() : left(nums.end()), right(nums.end()) {}void addNum(int num) {const size_t n nums.size();nums.insert(num);if (!…

MyBatis SQL语句 #{} 和 ${}的区别

T04BF &#x1f44b;专栏: 算法|JAVA|MySQL|C语言 &#x1faf5; 今天你敲代码了吗 文章目录 关于 #{} 和 ${}1. Integer类型的参数2.String 类型的参数3. #{} 和 ${} 的区别3.1 性能问题:3.2 安全问题 关于 #{} 和 ${} MyBatis参数赋值有两种方式:#{} 和 ${} 1. Integer类型…

Spring Security学习笔记(二)Spring Security认证和鉴权

前言&#xff1a;本系列博客基于Spring Boot 2.6.x依赖的Spring Security5.6.x版本 上一篇博客介绍了Spring Security的整体架构&#xff0c;本篇博客要讲的是Spring Security的认证和鉴权两个重要的机制。 UsernamePasswordAuthenticationFilter和BasicAuthenticationFilter是…

C++:平衡搜索二叉树(AVL)

hello&#xff0c;各位小伙伴&#xff0c;本篇文章跟大家一起学习《C&#xff1a;平衡搜索二叉树&#xff08;AVL&#xff09;》&#xff0c;感谢大家对我上一篇的支持&#xff0c;如有什么问题&#xff0c;还请多多指教 &#xff01; 文章目录 :maple_leaf:AVL树:maple_leaf:…

Redis 7.x 系列【27】集群原理之通信机制

有道无术&#xff0c;术尚可求&#xff0c;有术无道&#xff0c;止于术。 本系列Redis 版本 7.2.5 源码地址&#xff1a;https://gitee.com/pearl-organization/study-redis-demo 文章目录 1. 概述2 节点和节点2.1 集群拓扑2.2 集群总线协议2.3 流言协议2.4 心跳机制2.5 节点握…

matlab仿真 数字信号载波传输(下)

&#xff08;内容源自详解MATLAB&#xff0f;SIMULINK 通信系统建模与仿真 刘学勇编著第七 章内容&#xff0c;有兴趣的读者请阅读原书&#xff09; clear all M8; msg[1 4 3 0 7 5 2 6]; ts0.01; T1; %t0:ts:T; t0:ts:T-ts; %x0:ts:length(msg); x0:ts:length(msg)-ts; f…

爬虫 APP 逆向 ---> 粉笔考研

环境&#xff1a; 粉笔考研 v6.3.15&#xff1a;https://www.wandoujia.com/apps/1220941/history_v6031500雷电9 模拟器&#xff1a;https://www.ldmnq.com/安装 magisk&#xff1a;https://blog.csdn.net/Ruaki/article/details/135580772安装 Dia 插件 (作用&#xff1a;禁…

前端开发知识-vue

大括号里边放键值对&#xff0c;即是一个对象。 一、vue可以简化前端javascript的操作。 主要特点是可以实现视图、数据的双向绑定。 使用vue主要分为三个步骤&#xff1a; 1.javascript中引入vue.js 可以src中可以是vue的网址&#xff0c;也可以是本地下载。 2.在javasc…

地形材质制作(能使地面湿润)

如图&#xff0c;创建一个材质并写以下逻辑 Landscape Layer Blend节点能使在地形模式绘制中有三个选择&#xff0c;根据以上逻辑&#xff0c;Red是原材质,Green是绿色材质也就是草&#xff0c;Blue为水&#xff08;这个我认为比较重要&#xff09; Blue的颜色最好为这个 这个节…

董宇辉离职,我一点都不意外!只不过感觉来的太快

下面这张图&#xff0c;是我在半年多前写的一段随笔&#xff0c;没想到来的这么快&#xff01; 碰巧的是今天中午&#xff0c;在开发者群里有两位老铁自曝&#xff0c;本以为能公司干到老&#xff0c;但公司却不给机会&#xff0c;已经不在是公司员工了。 最近&#xff0c;晓衡…

一些关于颜色的网站

欢迎来到 破晓的历程的 博客 ⛺️不负时光&#xff0c;不负己✈️ 1、中国传统色 2、网页颜色选择器 3、渐变色网站 4、多风味色卡生成 5、波浪生成 6、半透明磨砂框 色卡组合

苍穹外卖01

0. 配置maven (仅一次的操作 1.项目导入idea 2. 保证nginx服务器运行 &#xff08;nginx.exe要在非中文的目录下&#xff09; 开启服务&#xff1a; start nginx 查看任务进程是否存在&#xff1a; tasklist /fi "imagename eq nginx.exe" 关闭ngi…

SAPUI5基础知识20 - 对话框和碎片(Dialogs and Fragments)

1. 背景 在 SAPUI5 中&#xff0c;Fragments 是一种轻量级的 UI 组件&#xff0c;类似于视图&#xff08;Views&#xff09;&#xff0c;但它们没有自己的控制器&#xff08;Controller&#xff09;。Fragments 通常用于定义可以在多个视图中重用的 UI 片段&#xff0c;从而提…

【数据结构--排序】

目录 一、排序概述1.1、排序的相关定义1.2、排序用到的结构与函数 二、常见排序算法2.1、冒泡算法&#xff08;交换顺序&#xff09;&#xff08;1&#xff09;算法&#xff08;2&#xff09;性能分析 2.2、简单选择排序&#xff08;1&#xff09;算法&#xff08;2&#xff09…

express连接mysql

一、 安装express npm install express --save二、express配置 //引入 const express require("express"); //创建实例 const app express(); //启动服务 app.listen(8081, () > {console.log("http://localhost:8081"); });三、安装mysql npm i m…

《昇思25天学习打卡营第6天|ResNet50图像分类》

写在前面 从本次开始&#xff0c;接触一些上层应用。 本次通过经典的模型&#xff0c;开始本次任务。这里开始学习resnet50网络模型&#xff0c;应该也会有resnet18&#xff0c;估计18的模型速度会更快一些。 resnet 通过对论文的结论进行展示&#xff0c;说明了模型的功能&…

第2章 编译SDK

安装编译依赖 sudo apt-get update sudo apt-get install clang-format astyle libncurses5-dev build-essential python-configparser sconssudo apt-get install repo git ssh make gcc libssl-dev liblz4-tool \ expect g patchelf chrpath gawk texinfo chrpath diffstat …