一、上下文
《Kafka-生产者源码分析》博客中我们了解了Kafka是如何生产数据的,《Kafka-broker粗粒度启动流程》博客中我们了解了KafkaApis中有各种api和对应处理逻辑,其中PRODUCE请求对应了处理produce请求的逻辑,下面我们跟着源码来看下处理细节
class KafkaApis(
//......
request.header.apiKey match {
case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)
......
}
}
二、handleProduceRequest
def handleProduceRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
//获取请求体
val produceRequest = request.body[ProduceRequest]
//为每个TopicPartition 的不同情况声明不同的responseMap
//未经授权的
val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
//不存在的
val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
//无效的
val invalidRequestResponses = mutable.Map[TopicPartition, PartitionResponse]()
//已授权的
val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
//缓存结果以避免冗余的授权调用
val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC,
produceRequest.data().topicData().asScala)(_.name())
//依次循环 topic > partition 来处理
//这说明这一个请求中需要处理多个 topic 的多个 partition 的数据
produceRequest.data.topicData.forEach(topic => topic.partitionData.forEach { partition =>
val topicPartition = new TopicPartition(topic.name, partition.index)
val memoryRecords = partition.records.asInstanceOf[MemoryRecords]
if (!authorizedTopics.contains(topicPartition.topic))
unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
else if (!metadataCache.contains(topicPartition))
nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
else
try {
//只有授权且metadataCache中有这个topicPartition 才能走到这
//校验数据,并把该数据放入authorizedRequestInfo
ProduceRequest.validateRecords(request.header.apiVersion, memoryRecords)
authorizedRequestInfo += (topicPartition -> memoryRecords)
} catch {
//......
}
}
//回调函数,这里先不展开,后续处理完数据需要返回给producer时再展开
@nowarn("cat=deprecation")
def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {...}
def processingStatsCallback(processingStats: FetchResponseStats): Unit = {
processingStats.forKeyValue { (tp, info) =>
updateRecordConversionStats(request, tp, info)
}
}
//如果没有有效数据,就立即返回空的情况
if (authorizedRequestInfo.isEmpty)
sendResponseCallback(Map.empty)
else {
val internalTopicsAllowed = request.header.clientId == AdminUtils.ADMIN_CLIENT_ID
val transactionSupportedOperation = if (request.header.apiVersion > 10) genericError else defaultError
//调用副本管理器将消息追加到副本
//副本管理器 是一个屏蔽了集群的层 里面既包括本地leader写也包括远程Follower写
replicaManager.handleProduceAppend(
timeout = produceRequest.timeout.toLong,
requiredAcks = produceRequest.acks,
internalTopicsAllowed = internalTopicsAllowed,
transactionalId = produceRequest.transactionalId,
entriesPerPartition = authorizedRequestInfo,
responseCallback = sendResponseCallback,
recordValidationStatsCallback = processingStatsCallback,
requestLocal = requestLocal,
transactionSupportedOperation = transactionSupportedOperation)
//如果请求被放入炼狱,它将有一个被保留的引用,因此不能被垃圾回收;因此,我们在这里清除它的数据,以便让GC回收内存,因为它已经附加到日志中
//当follower的数据追平leader的数据,且leader没有新数据增长时,follower的fetch请求会放入炼狱,来减少带宽的消耗
produceRequest.clearPartitionRecords()
}
}
三、ReplicaManager
最终需要将数据给到ReplicaManager来进行实际的追加
class ReplicaManager(...){
def handleProduceAppend(...){
val transactionalProducerInfo = mutable.HashSet[(Long, Short)]()
val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]()
entriesPerPartition.forKeyValue { (topicPartition, records) =>
// 生成请求(仅需要验证的请求)应该在“批处理”中每个分区只有一个批处理,但为了安全起见,请检查所有批处理。
val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional)
transactionalBatches.foreach(batch => transactionalProducerInfo.add(batch.producerId, batch.producerEpoch))
if (transactionalBatches.nonEmpty)
topicPartitionBatchInfo.put(topicPartition, records.firstBatch.baseSequence)
}
if (transactionalProducerInfo.size > 1) {
//从这里看出,事务记录应该只包含一个生产者id
//抛出异常并提示:事务记录包含多个生产者ID
throw new InvalidPidMappingException("Transactional records contained more than one producer ID")
}
//又封装了一层回调
def postVerificationCallback(...){}
//如果事务记录包含0个生产者id,不用处理
if (transactionalProducerInfo.size < 1) {
postVerificationCallback(
requestLocal,
(Map.empty[TopicPartition, Errors], Map.empty[TopicPartition, VerificationGuard])
)
return
}
//事务记录中只能有一个生产者id
maybeStartTransactionVerificationForPartitions(
topicPartitionBatchInfo,
transactionalId,
transactionalProducerInfo.head._1,
transactionalProducerInfo.head._2,
//当事务验证完成时,将要处理的回调封装在任意请求处理程序线程上。传入的本地请求仅在立即执行回调时使用。
KafkaRequestHandler.wrapAsyncCallback(
postVerificationCallback,
requestLocal
),
transactionSupportedOperation
)
}
}
1、postVerificationCallback
事务校验完成,会处理回调,执行真正的数据追加
def postVerificationCallback(...): Unit = {
val (preAppendErrors, verificationGuards) = results
//将事务协调器错误转换为已知的生产者响应错误
val errorResults = preAppendErrors.map {
//......
}
val entriesWithoutErrorsPerPartition = entriesPerPartition.filter { case (key, _) => !errorResults.contains(key) }
val preAppendPartitionResponses = buildProducePartitionStatus(errorResults).map { case (k, status) => k -> status.responseStatus }
def newResponseCallback(responses: Map[TopicPartition, PartitionResponse]): Unit = {
responseCallback(preAppendPartitionResponses ++ responses)
}
//执行数据追加操作
appendRecords(
timeout = timeout,
requiredAcks = requiredAcks,
internalTopicsAllowed = internalTopicsAllowed,
origin = AppendOrigin.CLIENT,
entriesPerPartition = entriesWithoutErrorsPerPartition,
responseCallback = newResponseCallback,
recordValidationStatsCallback = recordValidationStatsCallback,
requestLocal = newRequestLocal,
actionQueue = actionQueue,
verificationGuards = verificationGuards
)
}
2、appendRecords
将消息附加到分区的leader副本,并等待它们复制到其他副本;当超时或满足所需的ack时,将触发回调函数;如果回调函数本身已经在某个对象上同步,则传递此对象以避免死锁
注意,所有挂起的延迟检查操作都存储在队列中。ReplicaManager.appendRecords() 的所有调用者都应该对所有受影响的分区调用ActionQueue.tryCompleteActions,而不会持有任何冲突的锁(将多线程锁的问题转成线性队列操作来提升性能)
def appendRecords(t...): Unit = {
//验证 acks 必须是 0 1 -1 三种类型
if (!isValidRequiredAcks(requiredAcks)) {
sendInvalidRequiredAcksResponse(entriesPerPartition, responseCallback)
return
}
val sTime = time.milliseconds
//向本地log写入数据
val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
origin, entriesPerPartition, requiredAcks, requestLocal, verificationGuards.toMap)
//生产到本地日志中用了多长时间
debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
//下面的我们后续博客再接着分析,本地写完log,
//按理论知识:如果ack=0或者1,就可以直接返回了,如果是-1就需要等待备份数据写入成功
val produceStatus = buildProducePartitionStatus(localProduceResults)
addCompletePurgatoryAction(actionQueue, localProduceResults)
recordValidationStatsCallback(localProduceResults.map { case (k, v) =>
k -> v.info.recordValidationStats
})
maybeAddDelayedProduce(
requiredAcks,
delayedProduceLock,
timeout,
entriesPerPartition,
localProduceResults,
produceStatus,
responseCallback
)
}
3、appendToLocalLog
接下来我们看看kafka是如何将消息附加到本地副本日志的
private def appendToLocalLog(...){
//按照每个topic > 分区 来写
entriesPerPartition.map { case (topicPartition, records) =>
brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()
brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()
//如果不允许,则拒绝附加到内部主题
if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
(topicPartition, LogAppendResult(
LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}")),
hasCustomErrorMessage = false))
} else {
try {
val partition = getPartitionOrException(topicPartition)
//接下来交给Partition向leader追加数据
val info = partition.appendRecordsToLeader(records, origin, requiredAcks, requestLocal,
//......省略.....
} catch {
//......
}
}
}
}
四、Partition
它是表示topic partition的数据结构。leader负责维护AR、ISR、CUR、RAR
并发注意事项:
1、分区是线程安全的。分区上的操作可以从不同的请求处理程序线程并发调用
2、ISR更新使用读写锁同步。读锁用于检查是否需要更新,以避免在不执行更新的情况下获取副本的常见情况下获取写锁。在执行更新之前,在写锁下第二次检查ISR更新条件
3、在保持ISR写锁的同时,处理各种其他操作,如leader更改。这可能会在生成和副本获取请求中引入延迟,但这些操作通常不常见
4、使用ISR读锁同步HW更新
5、锁用于防止在ReplicaAlterDirThread执行时更新follower副本。可以使用ReplaceCurrentWithFutureReplica()用未来的副本替换follower副本。
1、appendRecordsToLeader
ReplicaManager调用了Partition的该方法来继续执行数据的追加操作
def appendRecordsToLeader(...): LogAppendInfo = {
val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
leaderLogIfLocal match {
case Some(leaderLog) =>
//最小的ISR
val minIsr = effectiveMinIsr(leaderLog)
val inSyncSize = partitionState.isr.size
//如果没有足够的insync副本来保证安全,请避免写信给leader
//如果目前的ids < 最小的isr 要求,且还要求了 acks = -1 就直接返回异常
if (inSyncSize < minIsr && requiredAcks == -1) {
//这个 topic 的 partition 当前的isr 集合 为 partitionState.isr 不足以满足 min.isr 要求
throw new NotEnoughReplicasException(s"The size of the current ISR ${partitionState.isr} " +
s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")
}
//将追加数据的任务委托给leaderLog既:UnifiedLog
val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
interBrokerProtocolVersion, requestLocal, verificationGuard)
//我们可能需要增加高水位,因为ISR可能会降至1
(info, maybeIncrementLeaderHW(leaderLog))
case None =>
//抛出异常:不能在该broker为xx分区的ledaer写入数据
}
}
info.copy(if (leaderHWIncremented) LeaderHwChange.INCREASED else LeaderHwChange.SAME)
}
五、UnifiedLog
本地和分层日志段的统一视图。
1、appendAsLeader
//将此消息集附加到本地日志的活动段 segment ,分配偏移量和分区前导纪元
def appendAsLeader(...): LogAppendInfo = {
//这里需要验证log的来源,从而决定是否有对offset验证的必要
//当下log是来自客户端,要写入的对象是RAFT_LEADER,筏头的意思,kafka中有一个HW的概念
//我理解的意思是:此时要写入的数据是在HW之上的,因此称之为木筏leader
//总共有四种来源:既REPLICATION(副本)、COORDINATOR(组协调员和事务)、CLIENT(客户端)、RAFT_LEADER(leader)
val validateAndAssignOffsets = origin != AppendOrigin.RAFT_LEADER
//将此消息集附加到本地日志的活动段,必要时滚动到新段。
//此方法通常负责为消息分配偏移量,但是如果传递了assignOffsets=false标志,我们将只检查现有的偏移量是否有效。
append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, Some(requestLocal), verificationGuard, ignoreRecordSize = false)
}
2、append
private def append(...): LogAppendInfo = {
//我们希望确保在将任何日志数据写入磁盘之前,将分区元数据文件写入日志目录。
//这将确保在发生故障时,可以使用正确的topic ID恢复任何日志数据。
//可能要写元数据了,这里用了flush,因为元数据比较重要,数据到pagecache后需要直接写入磁盘
maybeFlushMetadataFile()
//对要追加的数据进行解析和校验,校验的有以下几点
//1、每条消息都与其CRC匹配,循环冗余校验
//2、每个消息大小是否有效
//3、传入记录批的序列号与现有状态一致
//4、offset是否单调递增
//也需要计算下面纬度的数量:
//1、第一条消息的 offset
//2、最后一条消息的offset
//3、消息条数
//4、有效字节数
//5、偏移量是否单调递增
//6、是否使用了任何压缩编解码器(如果使用了多个,则给出最后一个)
val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, !validateAndAssignOffsets, leaderEpoch)
// 如果我们没有有效的消息,或者这是最后一个附加条目的重复,则返回
if (appendInfo.validBytes <= 0) appendInfo
else {
// 在将任何无效字节或部分消息附加到磁盘日志之前,请先对其进行修剪
var validRecords = trimInvalidBytes(records, appendInfo)
// 它们是有效的,请将其插入日志中
lock synchronized {
maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
//主要是检查 log 的日志文件 的 mmap 是否关闭
localLog.checkIfMemoryMappedBufferClosed()
if (validateAndAssignOffsets) {
//为消息集分配偏移量
val offset = PrimitiveRef.ofLong(localLog.logEndOffset)
appendInfo.setFirstOffset(offset.value)
val validateAndOffsetAssignResult = try {
val targetCompression = BrokerCompressionType.targetCompression(config.compression, appendInfo.sourceCompression())
val validator = new LogValidator(validRecords,
topicPartition,
time,
appendInfo.sourceCompression,
targetCompression,
config.compact,
config.recordVersion.value,
config.messageTimestampType,
config.messageTimestampBeforeMaxMs,
config.messageTimestampAfterMaxMs,
leaderEpoch,
origin,
interBrokerProtocolVersion
)
validator.validateMessagesAndAssignOffsets(offset,
validatorMetricsRecorder,
requestLocal.getOrElse(throw new IllegalArgumentException(
"requestLocal should be defined if assignOffsets is true")
).bufferSupplier
)
} catch {
//......
}
validRecords = validateAndOffsetAssignResult.validatedRecords
appendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs)
appendInfo.setShallowOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp)
appendInfo.setLastOffset(offset.value - 1)
appendInfo.setRecordValidationStats(validateAndOffsetAssignResult.recordValidationStats)
if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
appendInfo.setLogAppendTime(validateAndOffsetAssignResult.logAppendTimeMs)
// 如果消息大小有可能发生变化(由于重新压缩或消息格式转换),则对其进行电子验证
if (!ignoreRecordSize && validateAndOffsetAssignResult.messageSizeMaybeChanged) {
validRecords.batches.forEach { batch =>
if (batch.sizeInBytes > config.maxMessageSize) {
// 我们记录原始消息集大小,而不是修剪后的大小,以与预压缩字节RejectedRate记录保持一致
brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +
s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
}
}
}
} else {
//我们 使用 自己给的offsets
if (appendInfo.firstOrLastOffsetOfFirstBatch < localLog.logEndOffset) {
// 如果日志为空,我们仍然可以恢复,例如:从未批处理对齐的leader上的日志开始偏移量中获取,这可能是由于AdminClient#deleteRecords()造成的
val hasFirstOffset = appendInfo.firstOffset != UnifiedLog.UnknownOffset
val firstOffset = if (hasFirstOffset) appendInfo.firstOffset else records.batches.iterator().next().baseOffset()
val firstOrLast = if (hasFirstOffset) "First offset" else "Last offset of the first batch"
throw new UnexpectedAppendOffsetException(...)
}
}
// 用领导者标记在消息上的纪元更新纪元缓存
validRecords.batches.forEach { batch =>
if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
} else {
// 在部分升级场景中,我们可能会对消息格式进行临时回归。为了确保领导人选举的安全性,我们清除了纪元缓存,以便在下一次领导人选举后恢复到 HW 截断。
leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")
cache.clearAndFlush()
}
}
}
// 检查消息集大小可能超过config.segmentSize 这一批次消息的大小不能超过 整个段的大小
if (validRecords.sizeInBytes > config.segmentSize) {
throw new RecordBatchTooLargeException(......)
}
//如果该段已满,可能会滚动日志
val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
val logOffsetMetadata = new LogOffsetMetadata(
appendInfo.firstOrLastOffsetOfFirstBatch,
segment.baseOffset,
segment.size)
// 现在我们有了有效的记录、分配的偏移量和更新的时间戳,我们需要验证生产者的幂等/事务状态,并收集一些元数据
val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
logOffsetMetadata, validRecords, origin, verificationGuard)
maybeDuplicate match {
case Some(duplicate) =>
appendInfo.setFirstOffset(duplicate.firstOffset)
appendInfo.setLastOffset(duplicate.lastOffset)
appendInfo.setLogAppendTime(duplicate.timestamp)
appendInfo.setLogStartOffset(logStartOffset)
case None =>
//追加记录,并在追加后立即递增本地日志结束偏移量,因为对下面事务索引的写入可能会失败,
// 我们希望确保未来追加的偏移量仍然单调增长。恢复日志目录后,将清理由此产生的事务索引不一致。
// 请注意,如果事务索引的追加失败,ProducerStateManager的结束偏移量将不会更新,最后一个稳定偏移量也不会前进。
//这里就开始将记录插入 log 和索引文件了 并更新最后的 offset
localLog.append(appendInfo.lastOffset, appendInfo.maxTimestamp, appendInfo.shallowOffsetOfMaxTimestamp, validRecords)
updateHighWatermarkWithLogEndOffset()
//更新生产者状态
updatedProducers.values.foreach(producerAppendInfo => producerStateManager.update(producerAppendInfo))
// 用真实的最后一个稳定偏移量更新事务索引。
// 使用READ_COMMITTED的消费者可见的最后一个偏移量将受到此值和高水印的限制。
completedTxns.foreach { completedTxn =>
val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
segment.updateTxnIndex(completedTxn, lastStableOffset)
producerStateManager.completeTxn(completedTxn)
}
// 始终更新最后一个生产者id映射偏移量,以便快照反映当前偏移量,即使没有写入任何幂等数据
producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)
// 更新第一个不稳定偏移量(用于计算LSO)
maybeIncrementFirstUnstableOffset()
trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +
s"first offset: ${appendInfo.firstOffset}, " +
s"next offset: ${localLog.logEndOffset}, " +
s"and messages: $validRecords")
//localLog.unflushedMessages : 还没有 flush 的消息数
//config.flushInterval 为 flush.messages配置 默认 Long.MAX_VALUE
//说明:
//此设置允许指定一个间隔,在该间隔内,我们将强制对写入日志的数据进行fsync。
// 例如,如果将其设置为1,我们将在每条消息后进行fsync;如果是5,我们将在每5条消息后进行fsync。
// 一般来说,我们建议您不要设置此选项,而是使用副本来提高持久性,并允许操作系统的后台刷新功能,因为它更高效。
// 此设置可以按topic覆盖(请参阅<a href=\“#topicconfigs\”>按主题配置部分</a 很灵活,可以为topic设置不同的 flush策略
if (localLog.unflushedMessages >= config.flushInterval) flush(false)
}
appendInfo
}
}
}
}
六、LocalLog
用于在本地存储消息的仅追加日志。日志是一系列LogSegments,每个LogSegments都有一个基本偏移。根据可配置的策略创建新的日志段,该策略控制给定段的字节大小或时间间隔。
因此数据寻址的逻辑是:基地址+偏移地址(基地址找到具体的LogSegment,再根据偏移地址找到文件中的具体数据)
private[log] def append(...): Unit = {
//向本地活动段中追加数据
segments.activeSegment.append(lastOffset, largestTimestamp, shallowOffsetOfMaxTimestamp, records)
//更新LEO:既log末尾的offset
updateLogEndOffset(lastOffset + 1)
}
七、 LogSegment
日志的一部分。每个段有两个组成部分:日志和索引。日志是一个包含实际消息的FileRecords。该索引是一个从逻辑偏移映射到物理文件位置的OffsetIndex。每个段都有一个基本偏移量,该偏移量<=该段中任何消息的最小偏移量,>任何先前段中的任何偏移量。
基偏移量为[base_offset]的段将存储在两个文件中,一个[base_ooffset].index和一个[base_offset].log文件。
public void append(long largestOffset,
long largestTimestampMs,
long shallowOffsetOfMaxTimestamp,
MemoryRecords records) throws IOException {
if (records.sizeInBytes() > 0) {
//在位置{}的末尾偏移量{}处插入{}个字节,在偏移量{}处插入最大的时间戳{}
LOGGER.trace("Inserting {} bytes at end offset {} at position {} with largest timestamp {} at offset {}",
records.sizeInBytes(), largestOffset, log.sizeInBytes(), largestTimestampMs, shallowOffsetOfMaxTimestamp);
int physicalPosition = log.sizeInBytes();
if (physicalPosition == 0)
rollingBasedTimestamp = OptionalLong.of(largestTimestampMs);
ensureOffsetInRange(largestOffset);
//数据真正写的地方
long appendedBytes = log.append(records);
LOGGER.trace("Appended {} to {} at end offset {}", appendedBytes, log.file(), largestOffset);
// 更新内存中的最大时间戳和相应的偏移量。
if (largestTimestampMs > maxTimestampSoFar()) {
maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp);
}
// 在索引中附加一个条目(如果需要)
//这证明并不是每次写数据都会向索引中写入标记,因此索引指向了一段数据
//需要累计写入的数据 > 索引中条目的大致字节数(index.interval.bytes 默认4096字节)
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
offsetIndex().append(largestOffset, physicalPosition);
timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());
bytesSinceLastIndexEntry = 0;
}
bytesSinceLastIndexEntry += records.sizeInBytes();
}
}
1、log追加
FileRecords
由文件支持的 Records 实现。可以将可选的开始和结束位置应用于此实例,以允许对一系列日志记录进行切片。
public int append(MemoryRecords records) throws IOException {
int written = records.writeFullyTo(channel);
size.getAndAdd(written);
return written;
}
MemoryRecords
由ByteBuffer支持的 Records实现。这仅用于读取或就地修改记录批的现有缓冲区。
//将所有记录写入给定通道(包括部分记录)。
public int writeFullyTo(GatheringByteChannel channel) throws IOException {
buffer.mark();
int written = 0;
//并没有调 flush ,这里只是写入了 pagecache 中
//需要考内核的机制将其flush到磁盘 (linux系统中可以配置参数 当到达总内存的多少后或者脏页机制触发去写)
while (written < sizeInBytes())
written += channel.write(buffer);
buffer.reset();
return written;
}
2、offsetIndex追加
OffsetIndex
public void append(long offset, int position) {
lock.lock();
try {
if (isFull())
throw new IllegalArgumentException(...);
if (entries() == 0 || offset > lastOffset) {
log.trace("Adding index entry {} => {} to {}", offset, position, file().getAbsolutePath());
//利用mmap的特性将数据写入内核的pagecache
mmap().putInt(relativeOffset(offset));
mmap().putInt(position);
incrementEntries();
lastOffset = offset;
if (entries() * ENTRY_SIZE != mmap().position())
throw new IllegalStateException(...);
} else
throw new InvalidOffsetException(...);
} finally {
lock.unlock();
}
}
3、timeIndex追加
TimeIndex
public void maybeAppend(long timestamp, long offset) {
maybeAppend(timestamp, offset, false);
}
public void maybeAppend(long timestamp, long offset, boolean skipFullCheck) {
lock.lock();
try {
if (!skipFullCheck && isFull())
throw new IllegalArgumentException(...);
// 当偏移量等于最后一个条目的偏移量时,我们不会抛出异常。这意味着我们正试图插入与最后一个条目相同的时间索引条目。
// 如果要插入的时间戳索引条目与最后一个条目相同,我们只需忽略插入,因为这可能会在以下两种情况下发生:
// 1.日志段关闭了
// 2.当滚动活动日志段时,会调用LogSegment.onBecomeActiveSegment()。
if (entries() != 0 && offset < lastEntry.offset)
throw new InvalidOffsetException(...);
if (entries() != 0 && timestamp < lastEntry.timestamp)
throw new IllegalStateException(...);
// 只有当时间戳大于最后插入的时间戳时,我们才会附加到时间索引。
// 如果所有消息都是消息格式v0,则时间戳将始终为NoTimestamp。在这种情况下,时间索引将为空。
if (timestamp > lastEntry.timestamp) {
log.trace("Adding index entry {} => {} to {}.", timestamp, offset, file().getAbsolutePath());
//同样是调用mmap将数据写入pagecache
MappedByteBuffer mmap = mmap();
mmap.putLong(timestamp);
mmap.putInt(relativeOffset(offset));
incrementEntries();
this.lastEntry = new TimestampOffset(timestamp, offset);
if (entries() * ENTRY_SIZE != mmap.position())
throw new IllegalStateException(...);
}
} finally {
lock.unlock();
}
}
八、总结
1、producer调用send发送数据
2、kafka调用对应的api进行处理
3、获取请求体中的数据
4、校验是否有有效数据,如果没有立即返回
5、调用副本管理器(ReplicaManager)将数据进行追加
6、校验事务完成后处理回调,执行真正的数据追加
7、对acks进行校验(必须是0、1、-1)
8、循环处理这次请求中的每个topic、partition,调用Partition进行数据追加
9、获取最小的ISR以及可以正常写的副本数量,如果存活的副本节点数量<最小ISR数量,且请求中的acks=-1,里面抛出异常
10、将数据委托给UnifiedLog进行追加
11、根据数据的来源对offset设置校验等级
12、写入元数据(因为元数据重要,因此要立马flush到磁盘)
13、再次对数据进行解析和校验(CRC、消息大小是否有效、序列号是否一致、offset是否单调递增)
14、校验日志的mmap是否关闭
15、为数据分配offset
16、校验数据大小是否超过了段大小
17、委托给LocalLog向本地活动段追加数据
18、委托给LogSegment进行数据追加
19、调用FileRecords、MemoryRecords将数据写入pagecache
20、判断累计数据量是否>index.interval.bytes 默认4096字节,如果大于开始写入索引
21、调用OffsetIndex将offset索引利用mmap写入pagecache
22、调用TimeIndex将time offset索引利用mmap写入pagecache(time的索引执行了offset索引,offset索引指向了真正位置)
23、更新该broker中的HW和LEO