kafka 对 java NIO 的封装

news2025/1/12 19:42:34

说明

  1. 本文基于 kafka 2.7 编写。
  2. @author JellyfishMIX - github / blog.jellyfishmix.com
  3. LICENSE GPL-2.0

java NIO 组件

几个 java NIO 的组件。

  1. Buffer: 缓冲区。这是一个接口,kafka 用它的 ByteBuffer 实现类,配合 SocketChannel 实现读写操作。读的时候,调用 channel#read(buffer) 把 SocketChannel 的数据读到 ByteBuffer 内。写的时候,调用 channel.write(buffer) 把 Buffer 中的数据写到 SocketChannel 内。
  2. SocketChannel: 网络连接通道, byte 数据的读写都发生在这个通道上,包括从通道中读出数据, 将数据写入通道。
  3. SelectionKey: 选择键。每个 SocketChannel 向 Selector 注册标识时,都会创建一个 SelectionKey。SelectionKey 里可以定义 Selector 监听 SocketChannel 的事件,包括连接、读、写事件(SelectionKey#OP_CONNECT, OP_READ, OP_WRITE)。
  4. Selector: 选择器,用来监听注册的 SelectionKey 关注的事件。

本文涉及 java NIO 相关内容, 推荐先阅读 ByteBuffer 相关内容。

kafka 对 java NIO 组件的封装

  1. Selector(Kafka 自己的 Selector 类): 对 NIO 中 Selector 的封装。
  2. TransportLayer: 对 NIO 中 SocketChannel 的封装。TransportLayer 是一个接口, 实现类有 PlaintextTransportLayer 和 SslTransportLayer,其中,PlaintextTransportLayer 是明文传输的实现,SslTransportLayer 是 SSL 加密传输的实现。本文只涉及 PlaintextTransportLayer。
  3. NetworkReceive: 对 NIO 中读 Buffer 的封装,用来缓存接收的数据。
  4. NetworkSend: 对 NIO 中写 Buffer 的封装,用来缓存发送的数据。
  5. KafkaChannel: 把 TransportLayer, NetworkReceive 和 NetworkSend 又做了一次封装,隐藏了底层组件的细节。
  6. Kafka 对 NIO 中的 SelectionKey 没有封装,直接使用。

kafka 封装的 NIO 组件关系

  1. Selector 监听到客户端的读写事件后,会获取绑定在 SelectionKey 上的 KafkaChannel。
  2. KafkaChannel 会调用 TransportLayer 进行读写操作, TransportLayer 会调用 SocketChannel 进行读写操作, 完成数据的发送。数据的接收流程类似。

6B28DE84-60F6-47AC-8EE1-28DAE5EAC2F2.png

TransportLayer

TransportLayer 是对 NIO 中 SocketChannel 的封装。它的实现类有 2 个:

  1. PlaintextTransportLayer, 明文传输的实现。
  2. SslTransportLayer 类, SSL 加密传输的实现。

本文只涉及 PlaintextTransportLayer。

PlaintextTransportLayer

PlaintextTransportLayer#finishConnect 方法 – 完成网络连接

org.apache.kafka.common.network.PlaintextTransportLayer#finishConnect

  1. 调用 SocketChannel#finishConnect 方法,返回连接是否已经建立。
  2. 如果连接已经建立,则取消对连接事件的监听,增加对读事件的监听。
    @Override
    public boolean finishConnect() throws IOException {
        // 调用 SocketChannel#finishConnect 方法,返回连接是否已经建立
        boolean connected = socketChannel.finishConnect();
        // 如果连接已经建立,则取消对连接事件的监听,增加对读事件的监听
        if (connected)
            key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
        return connected;
    }

PlaintextTransportLayer#read 方法 – 读取数据

org.apache.kafka.common.network.PlaintextTransportLayer#read(java.nio.ByteBuffer)

把 SocketChannel 中的数据读取到 ByteBuffer 中。

    @Override
    public int read(ByteBuffer dst) throws IOException {
        // 把 SocketChannel 中的数据读取到 ByteBuffer 中
        return socketChannel.read(dst);
    }

PlaintextTransportLayer#write – 写入数据

org.apache.kafka.common.network.PlaintextTransportLayer#write(java.nio.ByteBuffer)

把 ByteBuffer 中的数据写入到 SocketChannel 中。

    @Override
    public int write(ByteBuffer src) throws IOException {
        // 把 ByteBuffer 中的数据写入到 SocketChannel 中
        return socketChannel.write(src);
    }

NetworkReceive

  1. java NIO 一次读写不一定读写完数据,这样需要判断读写是否完成,没有读写完的数据需要继续执行读写操作。
  2. 这样的操作较为繁琐,对调用方不友好。于是 kafka 把 ByteBuffer 进行了封装,用于读的 Buffer 封装成 NetworkReceive, 用于写的 Buffer 封装成 NetworkSend。

NetworkReceive 的属性

    /**
     * channelId
     */
    private final String source;
    /**
     * size 是固定大小的 4 byte ByteBuffer, kafka 传输数据时, 约定把要传输数据的长度放在最开头 4 byte, size 只用来接收这 4 byte 的长度信息
     */
    private final ByteBuffer size;
    /**
     * 能接收的最大消息
     */
    private final int maxSize;
    /**
     * 内存池
     */
    private final MemoryPool memoryPool;
    /**
     * 记录真正数据内容长度信息大小
     */
    private int requestedBufferSize = -1;
    /**
     * buffer 用来承载真正的数据内容, 即 4 byte 长度数据后的内容
     */
    private ByteBuffer buffer;

NetworkReceive#readFrom 方法 – 把 channel 中的数据读到 ByteBuffer 中

org.apache.kafka.common.network.NetworkReceive#readFrom

  1. 注意 size 的作用, size 是固定大小的 4 byte ByteBuffer, kafka 传输数据时, 约定把要传输数据的长度放在最开头 4 byte, size 只用来接收这 4 byte 的长度信息。
  2. 判断 size 是否有剩余空间, 有剩余空间则从 channel 中读取数据至 size 中。
    1. 如果从 channel 中读取数据后, size 没有剩余空间了, 说明长度信息读取完了(因为长度信息总共只占 4 byte, 读取后刚好把 size 占满)。
    2. 前 4 个 byte 存放了数据的长度, 以 int 类型获取。
    3. 针对本次通过 channel 传输数据的长度做校验。
  3. 从 channel 中读取真正的数据内容, 即 4 byte 长度数据后的内容, buffer 用来承载真正的数据内容。
    1. 给 buffer 分配 size 中记录的长度信息大小的内存空间。
    2. 把 channel 中的数据读到 buffer 中。维护读取的字节大小数。
    /**
     * 把 channel 中的数据读到 ByteBuffer 中
     */
    public long readFrom(ScatteringByteChannel channel) throws IOException {
        // 维护读取的字节大小数
        int read = 0;
        // 注意 size 的作用, size 是固定大小的 4 byte ByteBuffer, kafka 传输数据时, 约定把要传输数据的长度放在最开头 4 byte, size 只用来接收这 4 byte 的长度信息
        // 判断 size 是否有剩余空间, 有剩余空间则从 channel 中读取数据至 size 中
        if (size.hasRemaining()) {
            int bytesRead = channel.read(size);
            if (bytesRead < 0)
                throw new EOFException();
            // 维护读取的字节大小数
            read += bytesRead;
            // 如果从 channel 中读取数据后, size 没有剩余空间了, 说明长度信息读取完了(因为长度信息总共只占 4 byte, 读取后刚好把 size 占满)
            if (!size.hasRemaining()) {
                // ByteBuffer#position 置 0, 从头开始读取
                size.rewind();
                // 前 4 个 byte 存放了数据的长度, 以 int 类型获取
                int receiveSize = size.getInt();
                // 针对本次通过 channel 传输数据的长度做校验
                if (receiveSize < 0)
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
                if (maxSize != UNLIMITED && receiveSize > maxSize)
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
                requestedBufferSize = receiveSize; //may be 0 for some payloads (SASL)
                if (receiveSize == 0) {
                    buffer = EMPTY_BUFFER;
                }
            }
        }
        // 下面要从 channel 中读取真正的数据内容, 即 4 byte 长度数据后的内容, buffer 用来承载真正的数据内容
        // 给 buffer 分配 size 中记录的长度信息大小的内存空间
        if (buffer == null && requestedBufferSize != -1) { //we know the size we want but havent been able to allocate it yet
            buffer = memoryPool.tryAllocate(requestedBufferSize);
            if (buffer == null)
                log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);
        }
        // 把 channel 中的数据读到 buffer 中
        if (buffer != null) {
            int bytesRead = channel.read(buffer);
            if (bytesRead < 0)
                throw new EOFException();
            // 维护读取的字节大小数
            read += bytesRead;
        }

        return read;
    }

NetworkSend

层次关系

NetworkSend extends ByteBufferSend, ByteBufferSend implements Send

image-20230604173917572

ByteBufferSend#writeTo 方法 – 把 ByteBuffer 中的数据写入 SocketChannel

org.apache.kafka.common.network.ByteBufferSend#writeTo

  1. 把 ByteBuffer 中的数据写入 SocketChannel, 返回写入的字节数。
  2. 维护还剩多少字节没有写进 SocketChannel。
    /**
     * 把 ByteBuffer 中的数据写入 SocketChannel
     */
    @Override
    public long writeTo(GatheringByteChannel channel) throws IOException {
        // 把 ByteBuffer 中的数据写入 SocketChannel, 返回写入的字节数
        long written = channel.write(buffers);
        if (written < 0)
            throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
        // 维护还剩多少字节没有写进 SocketChannel
        remaining -= written;
        pending = TransportLayers.hasPendingWrites(channel);
        return written;
    }

NetworkSend#sizeBuffer 方法 – 分配 4 个字节的 sizeBuffer

org.apache.kafka.common.network.NetworkSend#sizeBuffer

分配 4 个字节的 sizeBuffer, 用来存储要发送的数据长度

    /**
     * 分配 4 个字节的 sizeBuffer, 用来存储要发送的数据长度
     */
    private static ByteBuffer sizeBuffer(int size) {
        ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
        sizeBuffer.putInt(size);
        sizeBuffer.rewind();
        return sizeBuffer;
    }

KafkaChannel

org.apache.kafka.common.network.KafkaChannel

KafkaChannel#setSend 方法-- 正式发送请求前设置 NetworkSend

org.apache.kafka.common.network.KafkaChannel#setSend

正式发送请求前设置 NetworkSend(用于发送的 byteBuffer), 并让 SelectionKey 关注写事件。

    /**
     * 正式发送请求前设置 NetworkSend(用于发送的 byteBuffer), 并让 SelectionKey 关注写事件
     */
    public void setSend(Send send) {
        if (this.send != null)
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
        this.send = send;
        // SelectionKey 关注写事件
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
    }

KafkaChannel#write – 发送数据

org.apache.kafka.common.network.KafkaChannel#write

把 NetworkSend 中的数据写入 SocketChannel。

    /**
     * 把 NetworkSend 中的数据发送出去
     */
    public long write() throws IOException {
        if (send == null)
            return 0;

        midWrite = true;
        // 把 NetworkSend 中的数据写入 SocketChannel
        return send.writeTo(transportLayer);
    }

KafkaChannel#read 方法 – 读取数据

org.apache.kafka.common.network.KafkaChannel#read

  1. 把 SocketChannel 中的数据读取到 NetworkReceive 中。
  2. 判断是否读完的条件是 NetworkReceive 里的 size 和 buffer 是否用完, 因为 NetworkReceive 的 size 和 buffer 两个 byteBuffer 的大小,正好是 SocketChannel 中接收到数据的大小。
    /**
     * 把 SocketChannel 中的数据读取到 NetworkReceive 中
     */
    public long read() throws IOException {
        if (receive == null) {
            receive = new NetworkReceive(maxReceiveSize, id, memoryPool);
        }

        // 把 SocketChannel 中的数据读取到 NetworkReceive 中, 返回读取信息的字节数
        long bytesReceived = receive(this.receive);

        // 判断是否读完的条件是 NetworkReceive 里的 size 和 buffer 是否用完, 因为 NetworkReceive 的 size 和 buffer 两个 byteBuffer 的大小,正好是 SocketChannel 中接收到数据的大小
        if (this.receive.requiredMemoryAmountKnown() && !this.receive.memoryAllocated() && isInMutableState()) {
            //pool must be out of memory, mute ourselves.
            mute();
        }
        return bytesReceived;
    }

Selector

Selector#connect – 建立连接

org.apache.kafka.common.network.Selector#connect

  1. 验证。
  2. 创建并配置 SocketChannel。
    1. 包括配置非阻塞模式, 设置长连接, 设置 SO_SNDBUF 和 SO_RCVBUF 的大小。SO_SNDBUF、SO_RCVBUF 表示发送和接收数据缓存的大小。
    2. 建立一个连接,由于是非阻塞建立连接,方法会直接返回,不一定连接建立完毕。后面会通过 Selector#finishConnect 方法, 连接并确认是否连接成功。
    3. 将上面创建的 SocketChannel 注册到 nioSelector 上,关注 OP_CONNECT 事件。
    @Override
    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
        // 验证
        ensureNotRegistered(id);
        // 创建 SocketChannel
        SocketChannel socketChannel = SocketChannel.open();
        SelectionKey key = null;
        try {
            // 配置 SocketChannel
            configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
            // 建立一个连接,由于是非阻塞建立连接,方法会直接返回,不一定连接建立完毕
            // 后面会通过 Selector#finishConnect 方法, 连接并确认是否连接成功
            boolean connected = doConnect(socketChannel, address);
            // 将上面创建的 SocketChannel 注册到 nioSelector 上,关注 OP_CONNECT 事件
            key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);

            // 如果已经连接成功了,则取消对OP_CONNECT的监听
            if (connected) {
                // OP_CONNECT won't trigger for immediately connected channels
                log.debug("Immediately connected to node {}", id);
                immediatelyConnectedKeys.add(key);
                key.interestOps(0);
            }
        } catch (IOException | RuntimeException e) {
            if (key != null)
                immediatelyConnectedKeys.remove(key);
            channels.remove(id);
            socketChannel.close();
            throw e;
        }
    }

Selector#send – 将 Send 设置到 KafkaChannel 中

org.apache.kafka.common.network.Selector#send

  1. 获取 channelId 作为 connectionId, 获取连接。
  2. 把 send 放入 KafkaChannel 里,并让 SelectionKey 关注写事件。
    /**
     * 将 Send 设置到 KafkaChannel 的 send 字段中,并让 SelectionKey 关注写事件
     */
    public void send(Send send) {
        // 获取 channelId 作为 connectionId
        String connectionId = send.destination();
        // 获取连接
        KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
        // 如果连接是关闭的,就把 connectionId 放到 closingChannels 集合里
        if (closingChannels.containsKey(connectionId)) {
            // ensure notification via `disconnected`, leave channel in the state in which closing was triggered
            this.failedSends.add(connectionId);
        } else {
            try {
                // 把 send 放入 KafkaChannel 里,并让 SelectionKey 关注写事件
                channel.setSend(send);
            } catch (Exception e) {
                // update the state for consistency, the channel will be discarded after `close`
                // 异常处理
                channel.state(ChannelState.FAILED_SEND);
                // ensure notification via `disconnected` when `failedSends` are processed in the next poll
                this.failedSends.add(connectionId);
                close(channel, CloseMode.DISCARD_NO_NOTIFY);
                if (!(e instanceof CancelledKeyException)) {
                    log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}",
                            connectionId, e);
                    throw e;
                }
            }
        }
    }

Selector#write 方法 – 调用 KafkaChannel 执行写操作

org.apache.kafka.common.network.Selector#write

  1. 获取 KafkaChannel 对应的 nodeId。
  2. 把 NetworkSend 中的数据发送出去。
  3. 如果发送完成,则返回 send,并取消 SelectionKey 对写事件的关注。
    /**
     * 调用 KafkaChannel 执行写操作
     */
    // package-private for testing
    void write(KafkaChannel channel) throws IOException {
        // 获取 KafkaChannel 对应的 nodeId
        String nodeId = channel.id();
        // 把 NetworkSend 中的数据发送出去
        long bytesSent = channel.write();
        // 如果发送完成,则返回 send,并取消 SelectionKey 对写事件的关注
        Send send = channel.maybeCompleteSend();
        // We may complete the send with bytesSent < 1 if `TransportLayer.hasPendingWrites` was true and `channel.write()`
        // caused the pending writes to be written to the socket channel buffer
        if (bytesSent > 0 || send != null) {
            long currentTimeMs = time.milliseconds();
            if (bytesSent > 0)
                this.sensors.recordBytesSent(nodeId, bytesSent, currentTimeMs);
            if (send != null) {
                this.completedSends.add(send);
                this.sensors.recordCompletedSend(nodeId, send.size(), currentTimeMs);
            }
        }
    }

Selector#attemptWrite 方法 – 尝试调用 KafkaChannel 执行写操作

org.apache.kafka.common.network.Selector#attemptWrite

尝试调用 KafkaChannel 执行写操作,需满足如下条件:

  1. send 不为空。
  2. KafkaChannel 连接正常。
  3. SelectionKey 是可写状态。
  4. 客户端验证没有开启。
    /**
     * 尝试调用 KafkaChannel 执行写操作
     */
    private void attemptWrite(SelectionKey key, KafkaChannel channel, long nowNanos) throws IOException {
        /*
         * 1. send 不为空
         * 2. KafkaChannel 连接正常
         * 3. SelectionKey 是可写状态
         * 4. 客户端验证没有开启
         */
        if (channel.hasSend()
                && channel.ready()
                && key.isWritable()
                && !channel.maybeBeginClientReauthentication(() -> nowNanos)) {
            write(channel);
        }
    }

Selector#attemptRead 方法 – 尝试调用 kafkaChannel 执行读操作

org.apache.kafka.common.network.Selector#attemptRead

  1. 调用 kafkaChannel 执行读操作, 返回读取的字节数。
  2. 如果当前 NetworkReceive 读取满了(说明本次请求完整接收了),则将其置空,下次读操作时会创建新的 NetworkReceive 对象。
  3. 读完的 NetworkReceive 加入 completedReceives 队列中。
    /**
     * 尝试调用 kafkaChannel 执行读操作
     */
    private void attemptRead(KafkaChannel channel) throws IOException {
        String nodeId = channel.id();

        // 调用 kafkaChannel 执行读操作, 返回读取的字节数
        long bytesReceived = channel.read();
        if (bytesReceived != 0) {
            long currentTimeMs = time.milliseconds();
            sensors.recordBytesReceived(nodeId, bytesReceived, currentTimeMs);
            madeReadProgressLastPoll = true;

            // 如果当前 NetworkReceive 读取满了(说明本次请求完整接收了),则将其置空,下次读操作时会创建新的 NetworkReceive 对象
            NetworkReceive receive = channel.maybeCompleteReceive();
            if (receive != null) {
                // 读完的 NetworkReceive 加入 completedReceives 队列中
                addToCompletedReceives(channel, receive, currentTimeMs);
            }
        }
        if (channel.isMuted()) {
            outOfMemory = true; //channel has muted itself due to memory pressure.
        } else {
            madeReadProgressLastPoll = true;
        }
    }

Selector#poll 方法 – 获取监听的网络 IO 事件并处理

org.apache.kafka.common.network.Selector#poll

  1. 将上一次 poll 方法的结果全部清除掉。
  2. nioSelector 线程 selectNow 非阻塞或 select 阻塞地获取 IO 事件。
  3. 监听到 IO 事件, 或立即连接的集合不为空,或有数据在缓存中,则进行处理。
    1. 获取有 IO 事件的 SelectionKey 集合。
    2. 调用处理有 IO 事件的 SelectionKey。
    3. 处理立即连接的 SelectionKey。
    @Override
    public void poll(long timeout) throws IOException {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout should be >= 0");

        boolean madeReadProgressLastCall = madeReadProgressLastPoll;
        // 将上一次 poll 方法的结果全部清除掉
        clear();

        boolean dataInBuffers = !keysWithBufferedRead.isEmpty();

        if (!immediatelyConnectedKeys.isEmpty() || (madeReadProgressLastCall && dataInBuffers))
            timeout = 0;

        if (!memoryPool.isOutOfMemory() && outOfMemory) {
            //we have recovered from memory pressure. unmute any channel not explicitly muted for other reasons
            log.trace("Broker no longer low on memory - unmuting incoming sockets");
            for (KafkaChannel channel : channels.values()) {
                if (channel.isInMutableState() && !explicitlyMutedChannels.contains(channel)) {
                    channel.maybeUnmute();
                }
            }
            outOfMemory = false;
        }

        /* check ready keys */
        long startSelect = time.nanoseconds();
        // nioSelector 线程 selectNow 非阻塞或 select 阻塞地获取 IO 事件
        int numReadyKeys = select(timeout);
        long endSelect = time.nanoseconds();
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

        // 监听到 IO 事件, 或立即连接的集合不为空,或有数据在缓存中
        if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
            // 获取有 IO 事件的 SelectionKey 集合
            Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();

            // Poll from channels that have buffered data (but nothing more from the underlying socket)
            if (dataInBuffers) {
                keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
                Set<SelectionKey> toPoll = keysWithBufferedRead;
                keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
                pollSelectionKeys(toPoll, false, endSelect);
            }

            // Poll from channels where the underlying socket has more data
            // 处理有 IO 事件的 SelectionKey
            pollSelectionKeys(readyKeys, false, endSelect);
            // Clear all selected keys so that they are included in the ready count for the next select
            readyKeys.clear();

            // 处理立即连接的 SelectionKey
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
            immediatelyConnectedKeys.clear();
        } else {
            madeReadProgressLastPoll = true; //no work is also "progress"
        }

        long endIo = time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());

        // Close channels that were delayed and are now ready to be closed
        completeDelayedChannelClose(endIo);

        // we use the time at the end of select to ensure that we don't close any connections that
        // have just been processed in pollSelectionKeys
        maybeCloseOldestConnection(endSelect);
    }

Selector#pollSelectionKeys 方法 – 处理监听到的 IO 事件

org.apache.kafka.common.network.Selector#pollSelectionKeys

具体处理监听到的 IO 事件,包括连接事件, 读事件和写事件,处理立即完成的连接。

  1. 遍历有 IO 事件的 SelectionKey。
  2. 判断连接是否建立好了, 如果连接已经建立,则取消对连接事件的监听,增加对读事件的监听。
    1. 连接尚未建立, 跳过当前 SelectionKey。
  3. 维护 KafkaChannel 的状态。
  4. 处理读事件()和写事件。
void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                           boolean isImmediatelyConnected,
                           long currentTimeNanos) {
        // 遍历有 IO 事件的 SelectionKey
        for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
            KafkaChannel channel = channel(key);
            long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;
            boolean sendFailed = false;
            String nodeId = channel.id();

            // register all per-connection metrics at once
            sensors.maybeRegisterConnectionMetrics(nodeId);
            if (idleExpiryManager != null)
                idleExpiryManager.update(nodeId, currentTimeNanos);

            try {
                /* complete any connections that have finished their handshake (either normally or immediately) */
                // 判断连接是否建立好了, 如果连接已经建立,则取消对连接事件的监听,增加对读事件的监听
                if (isImmediatelyConnected || key.isConnectable()) {
                    if (channel.finishConnect()) {
                        this.connected.add(nodeId);
                        this.sensors.connectionCreated.record();

                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
                                socketChannel.socket().getReceiveBufferSize(),
                                socketChannel.socket().getSendBufferSize(),
                                socketChannel.socket().getSoTimeout(),
                                nodeId);
                    } else {
                        // 连接尚未建立, 跳过当前 SelectionKey
                        continue;
                    }
                }

                /* if channel is not ready finish prepare */
                if (channel.isConnected() && !channel.ready()) {
                    channel.prepare();
                    if (channel.ready()) {
                        long readyTimeMs = time.milliseconds();
                        boolean isReauthentication = channel.successfulAuthentications() > 1;
                        if (isReauthentication) {
                            sensors.successfulReauthentication.record(1.0, readyTimeMs);
                            if (channel.reauthenticationLatencyMs() == null)
                                log.warn(
                                    "Should never happen: re-authentication latency for a re-authenticated channel was null; continuing...");
                            else
                                sensors.reauthenticationLatency
                                    .record(channel.reauthenticationLatencyMs().doubleValue(), readyTimeMs);
                        } else {
                            sensors.successfulAuthentication.record(1.0, readyTimeMs);
                            if (!channel.connectedClientSupportsReauthentication())
                                sensors.successfulAuthenticationNoReauth.record(1.0, readyTimeMs);
                        }
                        log.debug("Successfully {}authenticated with {}", isReauthentication ?
                            "re-" : "", channel.socketDescription());
                    }
                }

                // 维护 KafkaChannel 的状态
                if (channel.ready() && channel.state() == ChannelState.NOT_CONNECTED)
                    channel.state(ChannelState.READY);
                Optional<NetworkReceive> responseReceivedDuringReauthentication = channel.pollResponseReceivedDuringReauthentication();
                responseReceivedDuringReauthentication.ifPresent(receive -> {
                    long currentTimeMs = time.milliseconds();
                    addToCompletedReceives(channel, receive, currentTimeMs);
                });

                //if channel is ready and has bytes to read from socket or buffer, and has no
                //previous completed receive then read from it
                if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasCompletedReceive(channel)
                        && !explicitlyMutedChannels.contains(channel)) {
                    // 处理读事件
                    attemptRead(channel);
                }

                if (channel.hasBytesBuffered()) {
                    keysWithBufferedRead.add(key);
                }

                /* if channel is ready write to any sockets that have space in their buffer and for which we have data */

                long nowNanos = channelStartTimeNanos != 0 ? channelStartTimeNanos : currentTimeNanos;
                try {
                    // 处理写事件
                    attemptWrite(key, channel, nowNanos);
                } catch (Exception e) {
                    sendFailed = true;
                    throw e;
                }

                /* cancel any defunct sockets */
                if (!key.isValid())
                    close(channel, CloseMode.GRACEFUL);

            } catch (Exception e) {
                // ...
            } finally {
                maybeRecordTimePerConnection(channel, channelStartTimeNanos);
            }
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/627214.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

区块链产业快速发展 和数集团开启区块链应用新时代

UTONMOS区块链游戏要来了。 就在5月底&#xff0c;UTONMOS品牌所属公司上海和数集团在泰国发布了【神念无界】系列的多款国际版链游&#xff0c;包括【神念无界-源起山海】、【北荒传奇】、【神宠岛】、【神农园】等区块链游戏。 以【神念无界-源起山海】为例&#xff0c;其是…

Web、容器化 Native、小程序跨平台!三种跨平台方案对比

前端码农工作几年&#xff0c;从一家公司跳到另一家公司&#xff0c;永远逃不掉的是跨平台需求。除了本身应用在多平台上架的需求之外&#xff0c;资源有限恐怕是最大的原因&#xff0c;跨平台方案确实可以减少重复开发工作&#xff0c;降低成本和节省时间&#xff1b;而且掌握…

从开源到云原生,时序数据库 TDengine 六年回顾精彩纷呈

2023 年 6 月 6 日&#xff0c;涛思数据旗下时序数据库&#xff08;Time Series Database&#xff09; TDengine 迎来六周年庆典&#xff0c;并于北京保利国际广场T2举办了主题为“TDengine 6th Anniversary&#xff1a;Back to The Future”的庆典活动&#xff0c;设置了「TDe…

《Contrastive Learning for Unpaired Image-to-Image Translation》

Contrastive Learning for Unpaired Image-to-Image Translation 1. 摘要2. 介绍3. 相关工作3.1 图像转换、循环一致性3.2 关系保持3.3 深度网络嵌入中的感知相似性3.4 对比表示学习 4. 方法 原文及代码链接 https://github.com/taesungp/contrastive-unpaired-translation 1.…

API之Apifox和Postman工具该如何抉择?

目录 前言 一.功能列表对比 &#xff08;一&#xff09;接口设计与文档管理功能 &#xff08;二&#xff09;接口调试功能对比 &#xff08;三&#xff09;接口mock功能 &#xff08;四&#xff09;接口测试功能 二.团队协作功能 三.Apifox 没有的功能 四.产品价格 前…

代码随想录算法训练营第五十二天|300.最长递增子序列|674. 最长连续递增序列|718. 最长重复子数组

LeetCode300.最长递增子序列 动态规划五部曲&#xff1a; 1&#xff0c;dp[i]的定义&#xff1a;本题中&#xff0c;正确定义dp数组的含义十分重要。dp[i]表示i之前包括i的以nums[i]结尾的最长递增子序列的长度。为什么一定表示 “以nums[i]结尾的最长递增子序” &#xff0c…

FP独立站卖家怎么解决收款问题?挑选支付公司有何关注点?

2023年是充满希望又充满挑战的一年。这一年&#xff0c;新冠肺炎疫情恢复&#xff0c;经济慢慢复苏&#xff0c;对做跨境电商的卖家来说是个不错的机遇&#xff1b;但由于chatgpt人工智能的出现&#xff0c;F牌网站被检测出来的几率大大提高……让F牌独立站卖家最头疼的是&…

使用iTerm2打造ssh神器

在日常工作中&#xff0c;经常要通过ssh连接远程服务器&#xff0c;每次连接都输入密码&#xff0c;会比较麻烦。 在Window系统上&#xff0c;我习惯使用xshell管理连接&#xff0c;非常方便。 在MacOS系统上&#xff0c;没有xshell&#xff0c;而一些类似xshell的工具中&#…

vue+elementui+nodejs美容院理发店产品网上商城管理系统0ffvo

在当前的信息化管理浪潮下&#xff0c;我国的各行业不断转向信息化&#xff0c;现代化的高效管理模式。研发新一代美发管理系统&#xff0c;使一直沿袭传统的美发管理模式而产生的管理效率较低&#xff0c;经营管理水平相对滞后等现象得以改善&#xff1b;而这种新型的美容美发…

红外人体感应灯单片机开发方案

近来&#xff0c;红外人体感应灯受到了居家人们关注和喜爱。为此&#xff0c;宇凡微推出了一款低成本红外人体感应灯单片机方案。红外人体感应灯可应用于走廊、床边、楼梯、衣柜等地方&#xff0c;提供柔和照明作用。人来即亮&#xff0c;人走即灭&#xff0c;不受强光影响睡眠…

[游戏开发][Unity]Assetbundle下载篇(7)获取运行时(边玩边下)下载列表

啥是运行时下载清单&#xff1f;现在大多数手游都会有一个边玩边下功能&#xff0c;会提示用户是否开启下载&#xff0c;要XXX流量&#xff0c;如果你下载完了&#xff0c;可能还会有奖励。 疑问&#xff1a;为何要有边玩边下功能&#xff1f; 解答&#xff1a;为了发包的包体…

常见的五种HDMI接口类型,你知道多少?

高清多媒体接口&#xff08;High Definition Multimedia Interface&#xff09;简称HDMI&#xff0c;是一种全数字化视频和声音发送接口&#xff0c;可以 同时发送未压缩的视频及音频信号 &#xff0c;且发送时采用同一条线材&#xff0c;大大简化了系统线路的安装难度&#xf…

java SSM 网上拍卖myeclipse开发mysql数据库springMVC模式java编程计算机网页设计

一、源码特点 java SSM 网上拍卖系统是一套完善的web设计系统&#xff08;系统采用SSM框架进行设计开发&#xff0c;springspringMVCmybatis&#xff09;&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和 数据库&#xff0c;系统主要采用B…

[STJson]一个.Net开源json解析库

创作背景 项目地址: https://debugst.github.io/STJson 在开发过程中难免会遇到需要处理json的时候&#xff0c;但是.Net中自带的库似乎有点一言难尽啊。最后虽然找到了Newstonsoft.Json感觉还不错&#xff0c;但是还是觉得有些不如意的地方&#xff0c;它的功能虽然强大但是…

C语言:编写代码实现,模拟用户登录情景,并且只能登录三次

题目&#xff1a; 编写代码实现&#xff0c;模拟用户登录情景&#xff0c;并且只能登录三次。 只允许输入三次密码&#xff0c; 如果密码正确则提示登录成功&#xff0c; 如果三次均输入错误&#xff0c;则退出程序。 思路&#xff1a; 总体思路&#xff1a; &#xff08;一&a…

MIAOYUN“一云多芯”解决方案获评2023西部信创优秀解决方案

6月7日&#xff0c;由工业和信息化部电子第五研究所主办的“2023西部信息技术应用创新产业生态大会” 在重庆成功举办。会上&#xff0c;2023年西部地区信息技术应用创新优秀解决方案汇编正式发布&#xff0c;成都元来云志科技有限公司&#xff08;简称“MIAOYUN”&#xff09;…

现代化 Android 开发:基础架构

作者&#xff1a;古哥E下 Android 开发经过 10 多年的发展&#xff0c;技术在不断更迭&#xff0c;软件复杂度也在不断提升。到目前为止&#xff0c;虽然核心需求越来越少&#xff0c;但是对开发速度的要求越来越高。高可用、流畅的 UI、完善的监控体系等都是现在的必备要求了。…

从零开始 Spring Boot 36:注入集合

从零开始 Spring Boot 36&#xff1a;注入集合 图源&#xff1a;简书 (jianshu.com) 在前面一篇文章从零开始 Spring Boot 27&#xff1a;IoC中&#xff0c;讨论过依赖注入集合&#xff08;Java 容器&#xff09;的内容&#xff0c;这里更深入地讨论注入集合的相关内容。 我们…

ThinkPHP5学生学术管理系统

有需要请私信或看评论链接哦 可远程调试 ThinkPHP5学生学术管理系统 一 介绍 此学生学术管理系统基于ThinkPHP5框架开发&#xff0c;数据库mysql&#xff0c;前端Amazeui。系统角色分为学生用户和管理员。学生可以对个人信息&#xff0c;发表论文&#xff0c;专利授权&#x…

chatgpt赋能python:Python快速建站的SEO(搜索引擎优化)指南

Python快速建站的SEO&#xff08;搜索引擎优化&#xff09;指南 在当今数字时代&#xff0c;任何企业都需要一个强大和有效的网站。随着多个开源和商业网站平台的出现&#xff0c;建立一个网站变得更加容易。其中一个让人充满激情的开源工具是Python&#xff0c;它是一种流行的…