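Server source code: implements the core functionality of the Kafka broker, including log storage, the controller, coordinators, metadata management and state-machine management, the delayed-operation mechanism, consumer group management, and the high-concurrency network architecture.
Java client source code: implements how the Producer and Consumer interact with brokers, plus the shared supporting components.
Connect source code: used to build bidirectional streaming synchronization services between heterogeneous data systems.
Streams source code: implements real-time stream processing.
Raft source code: implements the Raft consensus protocol.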
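Admin module: Kafka's administration module for operating on and managing topics and partitions, e.g. creating and deleting topics or expanding partitions.
Api module: handles data exchange, i.e. encoding and decoding the data exchanged between clients and the server.
Client module: contains the classes the Producer uses to read Kafka broker metadata, such as topics, partitions, and leaders.
Cluster module: contains entity classes such as Broker, Cluster, Partition, and Replica.
Common module: contains the various exception classes and error validation.
Consumer module: the consumer-side module, handling client consumer data and logic.
Controller module: handles election of the central controller, partition leader election, replica assignment and reassignment, and expansion of partitions and replicas.
Coordinator module: manages a subset of the consumer groups and their offsets.
Javaapi module: provides the Java API for the Producer and Consumer.
Log module: handles Kafka's file storage, reading and writing the message data of all topics.
Message module: wraps multiple records into a message set or a compressed message set.
Metrics module: monitors internal state.
Network module: handles client connections and network events.
Producer module: producer implementation details, including synchronous and asynchronous message sending.
Security module: handles Kafka's security authentication and management.
Serializer module: serializes and deserializes message contents.
Server module: covers Leader and Offset checkpoints, dynamic configuration, delayed topic creation and deletion, leader election, and admin and replica management.
Tools module: contains various tools, e.g. for exporting consumer offsets, LogSegments information, topic log positions, and the offsets stored in ZooKeeper.
Utils module: contains various utility classes such as Json, ZkUtils, thread-pool utilities, and the shared scheduler class KafkaScheduler.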
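The partition state machine records the state of every partition in the cluster and defines how partition state transitions are handled. The replica state machine likewise records the state of every replica in the cluster and defines how replica state transitions are handled.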
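// In KafkaController:
// One manager: the Channel manager, which handles all communication with the brokers;
// Related caches: partition information, topic information, broker ids, and so on;
// Four leader-election strategies, triggered respectively by a leader going offline, a broker going offline, a partition reassignment, and a preferred-leader election;
// Starting the replica state machine initializes the state of every replica: if the broker hosting the replica is alive, its state becomes OnlineReplica, otherwise ReplicaDeletionIneligible;
// Starting the partition state machine initializes the state of every partition: if the broker hosting its leader is alive, its state becomes OnlinePartition, otherwise OfflinePartition.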
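The partition-side initialization rule in the last comment can be sketched as a pure function. This is a hypothetical, simplified model. Each partition is mapped to the broker id of its current leader, and the set of live broker ids is known. `PartitionInit`, `initialStates` and this `TopicPartition` case class are illustrative names, not Kafka's API.

```scala
// Hypothetical, simplified model; not Kafka's PartitionStateMachine.
sealed trait PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState

// Illustrative key type (not org.apache.kafka.common.TopicPartition).
final case class TopicPartition(topic: String, partition: Int)

object PartitionInit {
  /** OnlinePartition if the partition's leader broker is alive, otherwise OfflinePartition. */
  def initialStates(
      leaders: Map[TopicPartition, Int], // partition -> broker id of its leader
      liveBrokerIds: Set[Int]
  ): Map[TopicPartition, PartitionState] =
    leaders.map { case (tp, leaderId) =>
      val state: PartitionState =
        if (liveBrokerIds.contains(leaderId)) OnlinePartition else OfflinePartition
      tp -> state
    }
}
```

For example, with leaders `t-0 -> 1` and `t-1 -> 2` and only broker 1 alive, `t-0` starts as OnlinePartition and `t-1` as OfflinePartition.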
/**
 * Invoked on successful controller election. First registers a broker change listener since that triggers all
 * state transitions for replicas. Initializes the state of replicas for all partitions by reading from zookeeper.
 * Then triggers the OnlineReplica state change for all replicas.
 */
def startup() {
  // Initialize the cached state of every replica
  initializeReplicaState()
  // Move every live replica to OnlineReplica; this step performs real transitions and sends requests to brokers
  handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
  info("Started replica state machine with initial state -> " + replicaState.toString())
}
/**
 * Invoked on startup of the replica's state machine to set the initial state for replicas of all existing partitions
 * in zookeeper
 */
// This only writes each replica's state into the state machine's cache, replicaState; no actual state transition is performed.
private def initializeReplicaState() {
  for ((topicPartition, assignedReplicas) <- controllerContext.partitionReplicaAssignment) {
    val topic = topicPartition.topic
    val partition = topicPartition.partition
    assignedReplicas.foreach { replicaId =>
      val partitionAndReplica = PartitionAndReplica(topic, partition, replicaId)
      // If the broker hosting the replica is alive, set its state to OnlineReplica
      if (controllerContext.isReplicaOnline(replicaId, topicPartition))
        replicaState.put(partitionAndReplica, OnlineReplica)
      else {
        // mark replicas on dead brokers as failed for topic deletion, if they belong to a topic to be deleted.
        // This is required during controller failover since during controller failover a broker can go down,
        // so the replicas on that broker should be moved to ReplicaDeletionIneligible to be on the safer side.
        // Otherwise set its state to ReplicaDeletionIneligible
        replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
      }
    }
  }
}
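The initialization above only fills a cache. It can be sketched as a function that builds that cache and sends nothing to any broker. This is a hypothetical simplification: a replica counts as online when its broker id is in `liveBrokerIds`, whereas Kafka delegates this check to `controllerContext.isReplicaOnline`. `ReplicaInit` and `initialStates` are illustrative names.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of initializeReplicaState.
sealed trait ReplicaState
case object OnlineReplica extends ReplicaState
case object ReplicaDeletionIneligible extends ReplicaState

final case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

object ReplicaInit {
  /**
   * Builds the initial replica-state cache. Assumption: a replica counts as online
   * exactly when its broker id is in liveBrokerIds.
   */
  def initialStates(
      assignment: Map[(String, Int), Seq[Int]], // (topic, partition) -> assigned replica broker ids
      liveBrokerIds: Set[Int]
  ): Map[PartitionAndReplica, ReplicaState] = {
    val cache = mutable.Map.empty[PartitionAndReplica, ReplicaState]
    for (((topic, partition), replicas) <- assignment; replicaId <- replicas) {
      val state: ReplicaState =
        if (liveBrokerIds.contains(replicaId)) OnlineReplica else ReplicaDeletionIneligible
      // Only the cache is written; no request is sent to any broker here.
      cache.put(PartitionAndReplica(topic, partition, replicaId), state)
    }
    cache.toMap
  }
}
```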
/**
 * This API is invoked by the broker change controller callbacks and the startup API of the state machine
 * @param replicas The list of replicas (brokers) that need to be transitioned to the target state
 * @param targetState The state that the replicas should be moved to
 * The controller's allLeaders cache should have been updated before this
 */
// Handles replica state changes for a set of replicas
def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState,
                       callbacks: Callbacks = (new CallbackBuilder).build) {
  if (replicas.nonEmpty) {
    info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
    try {
      brokerRequestBatch.newBatch()
      replicas.foreach(r => handleStateChange(r, targetState, callbacks))
      // Send the requests collected above to the brokers in one batch
      brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
    } catch {
      case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
    }
  }
}
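`handleStateChanges` runs the per-replica transitions, which only queue requests, and then sends the queued requests to the brokers. A minimal sketch of that batching pattern follows. `RequestBatch`, `newBatch`, `add`, `send` and `BatchedTransitions` are hypothetical stand-ins, and requests are modelled as plain strings. The sketch catches only non-fatal exceptions (`NonFatal`), whereas the Kafka code above catches `Throwable`.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical stand-in for the controller's request batch.
final class RequestBatch {
  private val pending = ArrayBuffer.empty[String]
  // One entry per send() call, kept so the behaviour can be inspected.
  val flushed = ArrayBuffer.empty[List[String]]

  def newBatch(): Unit = pending.clear()
  def add(request: String): Unit = pending += request
  def send(): Unit = {
    flushed += pending.toList
    pending.clear()
  }
}

object BatchedTransitions {
  /** Runs handleOne for every replica, then sends everything collected in a single batch. */
  def handleStateChanges[R](replicas: Set[R], batch: RequestBatch)(handleOne: R => Unit): Unit =
    if (replicas.nonEmpty) {
      try {
        batch.newBatch()
        replicas.foreach(handleOne)
        batch.send()
      } catch {
        // If any transition throws, send() is skipped, so nothing from this call is sent.
        case NonFatal(e) => println(s"Error while moving some replicas: ${e.getMessage}")
      }
    }
}
```

Batching means three transitions in one call produce one flush rather than three separate sends. In this sketch, a failure in any single transition drops the whole call's batch.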
/**
 * This API exercises the replica's state machine. It ensures that every state transition happens from a legal
 * previous state to the target state. Valid state transitions are:
 * NonExistentReplica --> NewReplica
 *   --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the
 *     partition to every live broker
 *
 * NewReplica -> OnlineReplica
 *   --add the new replica to the assigned replica list if needed
 *
 * OnlineReplica,OfflineReplica -> OnlineReplica
 *   --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the
 *     partition to every live broker
 *
 * NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible -> OfflineReplica
 *   --send StopReplicaRequest to the replica (w/o deletion)
 *   --remove this replica from the isr and send LeaderAndIsr request (with new isr) to the leader replica and
 *     UpdateMetadata request for the partition to every live broker.
 *
 * OfflineReplica -> ReplicaDeletionStarted
 *   --send StopReplicaRequest to the replica (with deletion)
 *
 * ReplicaDeletionStarted -> ReplicaDeletionSuccessful
 *   -- mark the state of the replica in the state machine
 *
 * ReplicaDeletionStarted -> ReplicaDeletionIneligible
 *   -- mark the state of the replica in the state machine
 *
 * ReplicaDeletionSuccessful -> NonExistentReplica
 *   -- remove the replica from the in memory partition replica assignment cache
 *
 * @param partitionAndReplica The replica for which the state transition is invoked
 * @param targetState The end state that the replica should be moved to
 */
def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState,
                      callbacks: Callbacks) {
  val topic = partitionAndReplica.topic
  val partition = partitionAndReplica.partition
  val replicaId = partitionAndReplica.replica
  val topicAndPartition = TopicAndPartition(topic, partition)
  // If the replica is not yet known, initialize its state to NonExistentReplica
  val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
  val stateChangeLog = stateChangeLogger.withControllerEpoch(controller.epoch)
  try {
    def logStateChange(): Unit =
      stateChangeLog.trace(s"Changed state of replica $replicaId for partition $topicAndPartition from " +
        s"$currState to $targetState")
    val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
    assertValidTransition(partitionAndReplica, targetState)
    targetState match {
      case NewReplica => // the only valid previous state is NonExistentReplica
        // start replica as a follower to the current leader for its partition
        // Read the partition's leaderAndIsr information from ZooKeeper
        val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
        leaderIsrAndControllerEpochOpt match {
          case Some(leaderIsrAndControllerEpoch) =>
            if (leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
              throw new StateChangeFailedException(s"Replica $replicaId for partition $topicAndPartition cannot " +
                s"be moved to NewReplica state as it is being requested to become leader")
            // Send a LeaderAndIsr request to this replica; this also sends an UpdateMetadata request to every live broker
            brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
              topic, partition, leaderIsrAndControllerEpoch,
              replicaAssignment, isNew = true)
          // A newly created partition has no LeaderAndIsr information yet at this point
          case None => // new leader request will be sent to this replica when one gets elected
        }
        // Move the replica to NewReplica and finish
        replicaState.put(partitionAndReplica, NewReplica)
      case ReplicaDeletionStarted => // the only valid previous state is OfflineReplica
        // Update the replica's state to ReplicaDeletionStarted
        replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
        // send stop replica command with deletePartition = true;
        // on receiving it, the broker physically deletes this replica's data
        brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true,
          callbacks.stopReplicaResponseCallback)
      case ReplicaDeletionIneligible => // the only valid previous state is ReplicaDeletionStarted
        replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
      case ReplicaDeletionSuccessful => // the only valid previous state is ReplicaDeletionStarted
        replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)
      case NonExistentReplica => // the only valid previous state is ReplicaDeletionSuccessful
        // NonExistentReplica means the replica has been fully deleted and no longer exists
        // remove this replica from the assigned replicas list (the controller's partitionReplicaAssignment) for its partition
        val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
        controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
        // Remove the replica from the state cache
        replicaState.remove(partitionAndReplica)
      case OnlineReplica => // valid previous states: NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible
        // The normal working state: the replica can act as either leader or follower
        replicaState(partitionAndReplica) match {
          case NewReplica => // previous state is NewReplica
            // add this replica to the assigned replicas list (AR) for its partition, if it is not already there
            val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
            if (!currentAssignedReplicas.contains(replicaId))
              controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
          case _ => // previous state is OnlineReplica, OfflineReplica or ReplicaDeletionIneligible
            // check if the leader for this partition ever existed
            // If the partition's LeaderIsrAndControllerEpoch exists, send the replica a LeaderAndIsr request
            controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
              case Some(leaderIsrAndControllerEpoch) =>
                brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
                  replicaAssignment)
                replicaState.put(partitionAndReplica, OnlineReplica)
              case None => // that means the partition was never in OnlinePartition state, this means the broker never
                // started a log for that partition and does not have a high watermark value for this partition
            }
        }
        // Finally, set the replica's state to OnlineReplica
        replicaState.put(partitionAndReplica, OnlineReplica)
      case OfflineReplica => // valid previous states: NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible
        // send stop replica command (deletePartition = false) so that the replica stops fetching from the leader
        brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
        // As an optimization, the controller removes dead replicas from the ISR
        val leaderAndIsrIsEmpty: Boolean =
          controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
            case Some(_) =>
              // Remove the replica from the partition's ISR (provided the ISR still contains other valid replicas)
              controller.removeReplicaFromIsr(topic, partition, replicaId) match {
                case Some(updatedLeaderIsrAndControllerEpoch) =>
                  // send the shrunk ISR state change request to all the remaining alive replicas of the partition.
                  val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
                  if (!controller.topicDeletionManager.isPartitionToBeDeleted(topicAndPartition)) {
                    // The ISR changed, so send LeaderAndIsr requests to the remaining replicas
                    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
                      topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
                  }
                  // Update this replica's state to OfflineReplica
                  replicaState.put(partitionAndReplica, OfflineReplica)
                  false
                case None =>
                  true
              }
            case None =>
              true
          }
        if (leaderAndIsrIsEmpty && !controller.topicDeletionManager.isPartitionToBeDeleted(topicAndPartition))
          throw new StateChangeFailedException(
            s"Failed to change state of replica $replicaId for partition $topicAndPartition since the leader " +
            s"and isr path in zookeeper is empty")
    }
  } catch {
    case t: Throwable =>
      stateChangeLog.error(s"Initiated state change of replica $replicaId for partition $topicAndPartition from " +
        s"$currState to $targetState failed", t)
  }
}
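The comments in `handleStateChange` list the legal previous states for each target state. Together they form a lookup table that is checked before every transition. Below is a minimal sketch of that check, assuming a hypothetical standalone cache class. `ReplicaStateCache` and `IllegalTransitionException` are illustrative names. The sketch models only the state bookkeeping, not the LeaderAndIsr or StopReplica requests sent to brokers.

```scala
import scala.collection.mutable

sealed trait ReplicaState
case object NewReplica extends ReplicaState
case object OnlineReplica extends ReplicaState
case object OfflineReplica extends ReplicaState
case object ReplicaDeletionStarted extends ReplicaState
case object ReplicaDeletionSuccessful extends ReplicaState
case object ReplicaDeletionIneligible extends ReplicaState
case object NonExistentReplica extends ReplicaState

// Illustrative exception; the Kafka code above throws StateChangeFailedException.
final class IllegalTransitionException(msg: String) extends RuntimeException(msg)

// Hypothetical standalone cache of replica states keyed by K (e.g. a topic/partition/replica id).
final class ReplicaStateCache[K] {
  private val states = mutable.Map.empty[K, ReplicaState]

  // Valid previous states for each target state, following the comments in handleStateChange.
  private def validPreviousStates(target: ReplicaState): Set[ReplicaState] = target match {
    case NewReplica => Set(NonExistentReplica)
    case OnlineReplica | OfflineReplica =>
      Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible)
    case ReplicaDeletionStarted    => Set(OfflineReplica)
    case ReplicaDeletionSuccessful => Set(ReplicaDeletionStarted)
    case ReplicaDeletionIneligible => Set(ReplicaDeletionStarted)
    case NonExistentReplica        => Set(ReplicaDeletionSuccessful)
  }

  def stateOf(replica: K): ReplicaState = states.getOrElse(replica, NonExistentReplica)

  def transition(replica: K, target: ReplicaState): Unit = {
    // Unknown replicas start as NonExistentReplica, as with getOrElseUpdate in handleStateChange.
    val current = states.getOrElseUpdate(replica, NonExistentReplica)
    if (!validPreviousStates(target).contains(current))
      throw new IllegalTransitionException(s"Replica $replica cannot move from $current to $target")
    // A fully deleted replica is dropped from the cache; any other target is recorded.
    if (target == NonExistentReplica) states.remove(replica)
    else states.put(replica, target)
  }
}
```

A rejected transition leaves the cached state unchanged. In the sketch, a replica reaching NonExistentReplica is removed from the cache entirely, mirroring `replicaState.remove` in the NonExistentReplica branch.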
The conditions that trigger each of the replica transitions above: