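Overview
Picking up from the previous part on the producer send flow: once the preparation work is done, Kafka's producer relies on two main components, Sender and KafkaClient, to actually send the data. Underneath, they wrap the Java NIO Channel and Selector components; readers who are not familiar with NIO may want to look up the relevant documentation first.
I have sorted out the main flow Kafka follows when sending data; the flow chart is below:
The rest of this post walks through the Kafka source code step by step, following that flow chart.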
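Source Code Analysis
Continuing from the producer main flow in part one, the code below wakes up the sender so that it sends the messages. All of the producer's interaction with the Kafka brokers happens inside the sender.
// 6. Send the message.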
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
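To make the effect of the wakeup concrete, here is a minimal sketch using a plain java.nio Selector, which is what the sender's wakeup ultimately reaches through the client and Kafka's Selector wrapper. The class and method names (WakeupSketch, blockedMillis) are made up for this illustration and are not part of Kafka.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

// Hypothetical demo class, not Kafka code.
final class WakeupSketch {
    /**
     * Returns roughly how many milliseconds select(timeoutMs) blocked.
     * If wakeFirst is true, wakeup() is called before select(), which makes
     * the next select() return immediately instead of waiting for the timeout.
     */
    static long blockedMillis(boolean wakeFirst, long timeoutMs) {
        try (Selector selector = Selector.open()) {
            if (wakeFirst) {
                selector.wakeup();
            }
            long start = System.nanoTime();
            selector.select(timeoutMs);
            return (System.nanoTime() - start) / 1_000_000;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

This is why appending to a full or newly created batch does not have to wait for the I/O thread's current poll timeout to run out.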
Sender implements the Runnable interface, so we only need to look at its run method:
// Started once, then keeps running until it is closed
public void run() {
log.debug("Starting Kafka producer I/O thread.");
// main loop, runs until close is called
while (running) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
.....
void runOnce() {
long currentTimeMs = time.milliseconds();
// Prepare and send the produce requests
long pollTimeout = sendProducerData(currentTimeMs);
// Poll for NIO events
client.poll(pollTimeout, currentTimeMs);
}
Next we step into sendProducerData; the main steps are marked in the comments:
private long sendProducerData(long now) {
// 1. Fetch the cluster metadata
Cluster cluster = metadata.fetch();
// 2. Find the nodes that host the partition leaders with data ready to send
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// if there are any partitions whose leaders are not known yet, force metadata update
if (!result.unknownLeaderTopics.isEmpty()) {
// The set of topics with unknown leader contains topics with leader election pending as well as
// topics which may have expired. Add the topic again to metadata to ensure it is included
// and request metadata update, since there are messages to send to the topic.
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic, now);
log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
result.unknownLeaderTopics);
this.metadata.requestUpdate();
}
// remove any nodes we aren't ready to send to
// 3. Iterate over the ready nodes
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
// 4. Remove the nodes that are not ready to receive data
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}
// create produce requests
// 5. Group the batches by destination node; on the first pass batches is empty because no node is ready yet
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
addToInflightBatches(batches);
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
// 6. Handle batches that have timed out
accumulator.resetNextBatchExpiryTime();
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
expiredBatches.addAll(expiredInflightBatches);
// Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
// for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
// we need to reset the producer id here.
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+ ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
failBatch(expiredBatch, new TimeoutException(errorMessage), false);
if (transactionManager != null && expiredBatch.inRetry()) {
// This ensures that no new batches are drained until the current in flight batches are fully resolved.
transactionManager.markSequenceUnresolved(expiredBatch);
}
}
sensors.updateProduceRequestMetrics(batches);
// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
// time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
// sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
// that aren't ready to send since they would cause busy looping.
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
pollTimeout = Math.max(pollTimeout, 0);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
pollTimeout = 0;
}
// 7. Send the data
sendProduceRequests(batches, now);
return pollTimeout;
}
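The poll timeout arithmetic at the end of sendProducerData is easy to lose among the comments, so here it is as a standalone sketch. PollTimeoutSketch and its parameter names are hypothetical; the logic only mirrors the Math.min / Math.max lines shown above.

```java
// Hypothetical helper, not part of Kafka: mirrors the timeout arithmetic of sendProducerData.
final class PollTimeoutSketch {
    /**
     * @param nextReadyCheckDelayMs delay until some batch may become sendable (linger, backoff)
     * @param notReadyTimeoutMs     smallest poll delay among nodes that were not ready
     * @param nextExpiryTimeMs      absolute time at which the next batch expires
     * @param nowMs                 current time
     * @param hasReadyNodes         true if at least one node is ready and has data to send
     */
    static long pollTimeout(long nextReadyCheckDelayMs, long notReadyTimeoutMs,
                            long nextExpiryTimeMs, long nowMs, boolean hasReadyNodes) {
        if (hasReadyNodes) {
            return 0L; // loop again immediately and try to send more data
        }
        long timeout = Math.min(nextReadyCheckDelayMs, notReadyTimeoutMs);
        timeout = Math.min(timeout, nextExpiryTimeMs - nowMs);
        return Math.max(timeout, 0L); // never pass a negative timeout to poll
    }
}
```

For example, with no ready nodes, a 100 ms ready-check delay and a batch expiring 50 ms from now, the sender would poll for 50 ms.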
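On the first pass, execution reaches if (!this.client.ready(node, now)). ready checks whether the network connection is usable; if it is not, the connection is initiated through Kafka's wrapper around the NIO Selector, which registers the channel for the connect event: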
registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
public boolean ready(Node node, long now) {
if (node.isEmpty())
throw new IllegalArgumentException("Cannot connect to empty node " + node);
// Is the connection already established and ready?
if (isReady(node, now))
return true;
// Are we allowed to initiate a connection now?
if (connectionStates.canConnect(node.idString(), now))
// if we are interested in sending to a node and we don't have a connection to it, initiate one
// Initiate the connection
initiateConnect(node, now);
return false;
}
private void initiateConnect(Node node, long now) {
String nodeConnectionId = node.idString();
try {
connectionStates.connecting(nodeConnectionId, now, node.host());
InetAddress address = connectionStates.currentAddress(nodeConnectionId);
log.debug("Initiating connection to node {} using address {}", node, address);
// Kafka's Selector wraps the NIO selector, acting as a thin proxy layer
selector.connect(nodeConnectionId,
new InetSocketAddress(address, node.port()),
this.socketSendBuffer,
this.socketReceiveBuffer);
} catch (IOException e) {
log.warn("Error connecting to node {}", node, e);
// Attempt failed, we'll try again after the backoff
connectionStates.disconnected(nodeConnectionId, now);
// Notify metadata updater of the connection failure
metadataUpdater.handleServerDisconnect(now, nodeConnectionId, Optional.empty());
}
}
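For readers new to NIO, the following is a simplified sketch of what a non-blocking connect looks like with the plain java.nio API. It is not Kafka's Selector.connect: the class NonBlockingConnectSketch, its methods and the loopback test server are assumptions made for this illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class, not Kafka code.
final class NonBlockingConnectSketch {
    /** Starts a non-blocking connect and registers the channel with the selector. */
    static SelectionKey initiate(Selector selector, InetSocketAddress address) throws IOException {
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        // connect() usually returns false here: the handshake finishes later.
        boolean connected = channel.connect(address);
        int ops = connected ? SelectionKey.OP_READ : SelectionKey.OP_CONNECT;
        return channel.register(selector, ops);
    }

    /** Connects to a throwaway loopback server and reports whether the key ends up on OP_READ. */
    static boolean demo() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            SelectionKey key = initiate(selector, (InetSocketAddress) server.getLocalAddress());
            SocketChannel channel = (SocketChannel) key.channel();
            try {
                long deadline = System.currentTimeMillis() + 5000;
                while (!channel.isConnected() && System.currentTimeMillis() < deadline) {
                    selector.select(100);
                    if (key.isConnectable() && channel.finishConnect()) {
                        // Same bit manipulation as the transport layer's finishConnect
                        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
                    }
                    selector.selectedKeys().clear();
                }
                return channel.isConnected() && key.interestOps() == SelectionKey.OP_READ;
            } finally {
                channel.close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The point to take away is that the connect call returns before the connection exists, which is why ready has to return false on this first pass.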
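Once the connection attempt has been initiated, control returns to the main flow. ready returns false at this point, so every node is removed from the set of nodes to send to.
if (!this.client.ready(node, now)) {
// 4. Remove the nodes that are not ready to receive data
iter.remove();
No batches can be drained at this point, so control falls back to runOnce and then enters
client.poll(pollTimeout, currentTimeMs)
This brings us to NetworkClient, the component that handles network requests. Its core call is:
this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
Stepping into Selector, the core method there is: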
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
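This method listens for the various kinds of events and handles each of them accordingly.
Through the TransportLayer delegate, the NIO socketChannel is finally asked to complete the connection. A bitwise AND with the complement removes the connect event from the interest set, and a bitwise OR registers the read event.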
public boolean finishConnect() throws IOException {
// Finish establishing the connection
boolean connected = socketChannel.finishConnect();
if (connected)
// Remove OP_CONNECT and add OP_READ
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
return connected;
}
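The interest set is just an int bit mask, so the expression above can be checked in isolation. The sketch below uses only the SelectionKey constants; the class InterestOpsSketch and its method names are invented for this example.

```java
import java.nio.channels.SelectionKey;

// Hypothetical helper, not Kafka code: the same bit operations on plain ints.
final class InterestOpsSketch {
    /** Adds the given operations to the interest set (bitwise OR). */
    static int add(int current, int ops) {
        return current | ops;
    }

    /** Removes the given operations from the interest set (AND with the complement). */
    static int remove(int current, int ops) {
        return current & ~ops;
    }

    /** Mirrors finishConnect: ~ binds tightest, then &, then |. */
    static int afterConnect(int current) {
        return current & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ;
    }
}
```

Because & has higher precedence than | in Java, the expression first clears OP_CONNECT and only then sets OP_READ, so no parentheses are needed.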
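The while loop then comes around again and enters sendProducerData once more.
The call chain for sending the request data is:
sendProduceRequests(batches, now);
>> client.send(clientRequest, now);
>> doSend(request, false, now);
>> doSend(clientRequest, isInternalRequest, now, builder.build(version));
The data to be sent is wrapped in an InFlightRequest and added to inFlightRequests, and then the selector is asked to send it.
// Requests that have been sent but not yet answered; the cap per connection (max.in.flight.requests.per.connection) defaults to 5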
private final InFlightRequests inFlightRequests;
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
String destination = clientRequest.destination();
RequestHeader header = clientRequest.makeHeader(request.version());
if (log.isDebugEnabled()) {
log.debug("Sending {} request with header {} and timeout {} to node {}: {}",
clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request);
}
Send send = request.toSend(header);
InFlightRequest inFlightRequest = new InFlightRequest(
clientRequest,
header,
isInternalRequest,
request,
send,
now);
this.inFlightRequests.add(inFlightRequest);
selector.send(new NetworkSend(clientRequest.destination(), send));
}
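As a rough mental model of the in-flight bookkeeping, the sketch below keeps one queue per node and refuses new requests once the cap is reached. It is a simplification under stated assumptions: InFlightSketch and its String-typed requests are hypothetical, and Kafka's own InFlightRequests checks more conditions than the queue size alone.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of per-connection in-flight tracking; not Kafka's class.
final class InFlightSketch {
    private final int maxPerConnection;
    private final Map<String, Deque<String>> requests = new HashMap<>();

    InFlightSketch(int maxPerConnection) {
        this.maxPerConnection = maxPerConnection;
    }

    /** True while the node has fewer unanswered requests than the cap. */
    boolean canSendMore(String node) {
        Deque<String> queue = requests.get(node);
        return queue == null || queue.size() < maxPerConnection;
    }

    /** Records a request as sent but not yet answered. */
    void add(String node, String request) {
        if (!canSendMore(node)) {
            throw new IllegalStateException("Too many in-flight requests for node " + node);
        }
        requests.computeIfAbsent(node, k -> new ArrayDeque<>()).addFirst(request);
    }

    /** Responses arrive in order, so the oldest request is the one being completed. */
    String completeNext(String node) {
        Deque<String> queue = requests.get(node);
        if (queue == null || queue.isEmpty()) {
            throw new IllegalStateException("No in-flight request for node " + node);
        }
        return queue.pollLast();
    }
}
```

New requests go in at the head and completions come off the tail, so the oldest outstanding request is always matched with the next response.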
Finally, the write event (OP_WRITE) is registered on the channel's selection key:
public void send(NetworkSend send) {
String connectionId = send.destinationId();
KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
if (closingChannels.containsKey(connectionId)) {
// ensure notification via `disconnected`, leave channel in the state in which closing was triggered
this.failedSends.add(connectionId);
} else {
try {
// Core call
channel.setSend(send);
}
...
}
}
public void setSend(NetworkSend send) {
if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
this.send = send;
// Register interest in the write event
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
public void addInterestOps(int ops) {
key.interestOps(key.interestOps() | ops);
}
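Execution then reaches the client's poll method again, and the selector writes the data:
// Write the request data
attemptWrite(key, channel, nowNanos);
// Remove the write interest once the data has been written
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
The last step is to read the response from the broker:
// Read the broker's response; the read interest was registered earlier
if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasCompletedReceive(channel)
&& !explicitlyMutedChannels.contains(channel)) {
attemptRead(channel);
}
When reading, Kafka uses a 4-byte length prefix to mark the size of each message, which avoids the TCP sticky-packet and split-packet problems:
The code below makes sure a receive is only reported as complete once all of its bytes have been read.
// 4 bytes hold the size of the payload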
if (size.hasRemaining()) {
int bytesRead = channel.read(size);
if (bytesRead < 0)
throw new EOFException();
read += bytesRead;
if (!size.hasRemaining()) {
size.rewind();
// Decode the 4-byte size
int receiveSize = size.getInt();
if (receiveSize < 0)
throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
if (maxSize != UNLIMITED && receiveSize > maxSize)
throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
requestedBufferSize = receiveSize; //may be 0 for some payloads (SASL)
if (receiveSize == 0) {
buffer = EMPTY_BUFFER;
}
}
}
public boolean complete() {
// Has the whole message been read?
return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
}
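To see the length-prefix idea end to end, here is a self-contained sketch of a resumable receive in the same spirit. It is not Kafka's NetworkReceive: LengthPrefixedReceive, ChunkedChannel and decodeAll are hypothetical names, and the chunked channel only simulates a socket that hands over a few bytes per read.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a 4-byte length-prefixed receive; not Kafka's NetworkReceive.
final class LengthPrefixedReceive {
    private final ByteBuffer size = ByteBuffer.allocate(4);
    private ByteBuffer buffer; // allocated once the size is known

    /** Reads whatever is available; may need several calls before complete() is true. */
    int readFrom(ReadableByteChannel channel) throws IOException {
        int read = 0;
        if (size.hasRemaining()) {
            int n = channel.read(size);
            if (n < 0) throw new EOFException();
            read += n;
            if (!size.hasRemaining()) {
                size.rewind();
                int receiveSize = size.getInt();
                if (receiveSize < 0) throw new IOException("Invalid receive size " + receiveSize);
                buffer = ByteBuffer.allocate(receiveSize);
            }
        }
        if (buffer != null && buffer.hasRemaining()) {
            int n = channel.read(buffer);
            if (n < 0) throw new EOFException();
            read += n;
        }
        return read;
    }

    boolean complete() {
        return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
    }

    byte[] payload() {
        return buffer.array();
    }

    /** Decodes every frame in wire, receiving at most chunk bytes per read call. */
    static List<String> decodeAll(byte[] wire, int chunk) {
        ReadableByteChannel channel = new ChunkedChannel(wire, chunk);
        List<String> messages = new ArrayList<>();
        int consumed = 0;
        try {
            while (consumed < wire.length) {
                LengthPrefixedReceive receive = new LengthPrefixedReceive();
                while (!receive.complete()) consumed += receive.readFrom(channel);
                messages.add(new String(receive.payload(), StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return messages;
    }
}

// Simulates a socket that delivers at most chunk bytes per read.
final class ChunkedChannel implements ReadableByteChannel {
    private final ByteBuffer source;
    private final int chunk;

    ChunkedChannel(byte[] data, int chunk) {
        this.source = ByteBuffer.wrap(data);
        this.chunk = chunk;
    }

    @Override
    public int read(ByteBuffer dst) {
        if (!source.hasRemaining()) return -1;
        int n = Math.min(chunk, Math.min(dst.remaining(), source.remaining()));
        for (int i = 0; i < n; i++) dst.put(source.get());
        return n;
    }

    @Override
    public boolean isOpen() {
        return true;
    }

    @Override
    public void close() {
    }
}
```

Whether the bytes of two messages arrive glued together or one message arrives in pieces, the size field tells the reader exactly where each message ends.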