Netty启动源码NioEventLoop剖析accept剖析read剖析write剖析

news2025/3/16 1:03:13

学习链接

NIO&Netty - 专栏

  • Netty核心技术十–Netty 核心源码剖析
  • Netty核心技术九–TCP 粘包和拆包及解决方案
  • Netty核心技术七–Google Protobuf
  • Netty核心技术六–Netty核心模块组件
  • Netty核心技术五–Netty高性能架构设计

聊聊Netty那些事儿 - 专栏

  • 一文搞懂Netty发送数据全流程 | 你想知道的细节全在这里

netty源码解析 - 系列

  • Netty源码分析 (一)----- NioEventLoopGroup
  • Netty源码分析 (二)----- ServerBootstrap
  • Netty源码分析 (三)----- 服务端启动源码分析
  • Netty源码分析 (四)----- ChannelPipeline
  • Netty源码分析 (五)----- 数据如何在 pipeline 中流动
  • Netty源码分析 (六)----- 客户端接入accept过程
  • Netty源码分析 (七)----- read过程 源码分析
  • Netty源码分析 (八)----- write过程 源码分析
  • Netty源码分析 (九)----- 拆包器的奥秘
  • Netty源码分析 (十)----- 拆包器之LineBasedFrameDecoder
  • Netty源码分析 (十一)----- 拆包器之LengthFieldBasedFrameDecoder
  • Netty源码分析 (十二)----- 心跳服务之 IdleStateHandler 源码分析

文章目录

  • 学习链接
  • 1. 源码分析
    • 1.1 启动剖析
      • AbstractBootstrap#doBind
        • AbstractBootstrap#initAndRegister
          • ServerBootstrap#init
          • AbstractUnsafe#register
          • - ChannelInitializer#initChannel
        • AbstractBootstrap#doBind0
          • AbstractUnsafe#bind
          • - NioServerSocketChannel#doBind
          • - HeadContext#channelActive
          • -- AbstractNioChannel#doBeginRead
    • 1.2 NioEventLoop 剖析
      • NioEventLoop的重要组成
      • selector何时创建
      • nio线程在何时启动
        • SingleThreadEventExecutor#execute
          • *NioEventLoop#run
          • NioEventLoop#select
          • NioEventLoop#processSelectedKeys
          • - NioEventLoop#processSelectedKey
    • 1.3 accept 剖析
      • AbstractNioMessageChannel.NioMessageUnsafe#read
      • ServerBootstrapAcceptor#channelRead
        • AbstractChannel.AbstractUnsafe#register
          • *AbstractUnsafe#register0
          • - HeadContext#channelActive
          • --AbstractNioChannel#doBeginRead
    • 1.4 read 剖析
      • AbstractNioByteChannel.NioByteUnsafe#read
        • NioSocketChannel#doReadBytes
        • MaxMessageHandle#continueReading
    • 1.5 write剖析
      • write:写队列
      • flush:刷新写队列
      • writeAndFlush: 写队列并刷新

1. 源码分析

1.1 启动剖析

我们就来看看 netty 中对下面的代码是怎样进行处理的

(先明确Java nio的基础步骤如下,而netty在启动过程中,也是需要做下面的事情的)

//1 netty 中使用 NioEventLoopGroup (简称 nio boss 线程)来封装线程和 selector
Selector selector = Selector.open(); 

//2 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config
NioServerSocketChannel attachment = new NioServerSocketChannel();

//3 创建 NioServerSocketChannel 时,创建了 java 原生的 ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); 
serverSocketChannel.configureBlocking(false);

//4 启动 nio boss 线程执行接下来的操作

//5 注册(仅关联 selector 和 NioServerSocketChannel),未关注事件
// (注意,这里将NioServerSocketChannel作为附件绑定到了selectionKey上,当此ServerSocketChannel有可连接事件时,就可以获取到此selectionKey,从而获取到对应的NioServerSocketChannel)
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);

// (ServerBootstrapAcceptor是ChannelInboundHandlerAdapter入站类型的处理器)
//6 head -> 初始化器 -> ServerBootstrapAcceptor -> tail,初始化器是一次性的,只为添加 acceptor

//7 绑定端口
serverSocketChannel.bind(new InetSocketAddress(8080));

//8 触发 channel active 事件,在 head 中关注 op_accept 事件
selectionKey.interestOps(SelectionKey.OP_ACCEPT);

在这里插入图片描述

源码入口,可以从下面的代码进入

(暂时先不看NioEventLoopGroup,而Selector是存在于NioEventLoop中的,所以selector.open暂时不看)

public class TestSourceServer {
    public static void main(String[] args) {
        new ServerBootstrap()
            .group(new NioEventLoopGroup())
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) {
                    ch.pipeline().addLast(new LoggingHandler());
                }
            })
            .bind(8880); // 以bind为入口
    }
}

AbstractBootstrap#doBind

入口 io.netty.bootstrap.ServerBootstrap#bind

关键代码 io.netty.bootstrap.AbstractBootstrap#doBind

(1、注意main线程和nio线程的切换;

​ 2、initAndRegister 对应 nio中 创建ServerSocketChannel 和 把ServerSocketChannel注册到selector上

​ 3、doBind0 对应 nio中 bind监听端口)

private ChannelFuture doBind(final SocketAddress localAddress) {
    
	// 1. 执行初始化和注册 regFuture 会由 initAndRegister 设置其是否完成,从而回调 3.2 处代码
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    // 2. 因为 initAndRegister 是异步执行,需要分两种情况来看,调试时也需要通过 suspend 断点类型加以区分
    // 2.1 如果已经完成(如果前面做的比较快,就进入这个if块)
    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        // 3.1 立刻调用 doBind0
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } 
    // 2.2 还没有完成
    else {
        final PendingRegistrationPromise promise 
            = new PendingRegistrationPromise(channel);
        // 3.2 回调 doBind0
        regFuture.addListener(new ChannelFutureListener() {
            
            // (这个operationComplete是由nio线程来调用的)
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // 处理异常...
                    promise.setFailure(cause);
                } else {
                    promise.registered();
					// 3. 由注册线程去执行 doBind0
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
AbstractBootstrap#initAndRegister

关键代码 io.netty.bootstrap.AbstractBootstrap#initAndRegister

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        //(1、这里会去调用NioServerSocketChannel的无参构造方法去得到channel
        // 2、在NioServerSocketChannel的无参构造方法中会去创建javanio的ServerSocketChannel,
        //    并且将该ServerSocketChannel维护在NioServerSocketChannel中,
        //    并配置为非阻塞模式,感兴趣的事件是OP_ACCEPT,但是还没注册到selector上,
        //    只是维护了这些基本信息到NioServerSocketChannel。
        //    并且在NioServerSocketChannel的构造方法中会去创建NioServerSocketChannelConfig
        //    维护到NioServerSocketChannel中。
        //    并且NioServerSocketChannel的构造方法中会去创建DefaultChannelPipeline
        //    维护到NioServerSocketChannel中)
        channel = channelFactory.newChannel();
        // 1.1 初始化 - 做的事就是添加一个初始化器 ChannelInitializer
        init(channel);
    } catch (Throwable t) {
        // 处理异常...
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // 1.2 注册 - 做的事就是将原生 channel 注册到 selector 上
    // (这里会从eventLoopGroup中挑选出1个eventLoop来注册ServerSocketChannel)
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        // 处理异常...
    }
    return regFuture;
}
ServerBootstrap#init

关键代码 io.netty.bootstrap.ServerBootstrap#init

(这里会给ServerSocketChannel的pipeline中添加1个ChannelInitializer初始化器,该初始化器只会执行1次,后续将会移除掉。)

// 这里 channel 实际上是 NioServerSocketChannel
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    }
	
    // 为 NioServerSocketChannel 的pipeline 添加初始化器!!!
    // (1、该初始化器的initChannel方法只会执行1次,后续该初始化器将会移除掉,
    //      移除动作是在ChannelInitializer#initChannel中操作的。
    //  2、注意该初始化器的initChannel方法在此处尚未被调用。
    //  3、initChannel方法的调用时机是在AbstractChannel的register0方法中,
    //     在做完将channel注册到selector上之后的
    //     pipeline.invokeHandlerAddedIfNeeded()这句代码调用的)
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            
            final ChannelPipeline pipeline = ch.pipeline();
            
            // (这里意味着可以通过配置给config1个handler,
            //  从而给serverSocketChannel的pipeline添加1个handler)
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            // 1、初始化器的职责是将 ServerBootstrapAcceptor 加入至 NioServerSocketChannel
            // 2、ServerBootstrapAcceptor的作用是在selector触发可连接事件时,建立连接
            // 3、保证添加这个动作是在nio线程中完成的
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, 
                                                                 currentChildHandler, 
                                                                 currentChildOptions, 
                                                                 currentChildAttrs)
                                    );
                }
            });
        }
    });
}
AbstractUnsafe#register

关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#register

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // 一些检查,略...

    AbstractChannel.this.eventLoop = eventLoop;

    // (判断当前线程是不是eventLoop的线程,因为顺着刚刚的逻辑,当前还在主线程中,所以走else)
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            // 首次执行 execute 方法时,会启动 nio 线程,之后注册等操作在 nio 线程上执行
            // 因为只有一个 NioServerSocketChannel 因此,也只会有一个 boss nio 线程
            // 这行代码完成的事实是 main -> nio boss 线程的切换
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    // (该方法在nio线程上执行,并且注意promise传进去了,用于通知其它线程)
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            // 日志记录...
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

#####- *AbstractUnsafe#register0

io.netty.channel.AbstractChannel.AbstractUnsafe#register0

private void register0(ChannelPromise promise) {
    try {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        
        // 1.2.1 【将原生的 nio channel 绑定到 selector 上】,
        //       注意此时没有注册 selector 关注事件,附件为 NioServerSocketChannel
        doRegister();
        
        neverRegistered = false;
        registered = true;

        // 1.2.2 执行 NioServerSocketChannel 初始化器的 initChannel
        //(1、调用到ServerBootstrap的init方法中为NioServerSocketChannel的
        //    pipeline添加的初始化器的initChannel方法。
        //  2、该initChannel方向pipeline中添加了ServerBootstrapAcceptor这个入站处理器)
        pipeline.invokeHandlerAddedIfNeeded();

        // (给promise对象1个成功的结果,这样前面的监听就能收到这个结果触发operationComplete方法,
        //   就会去通知前面在AbstractBootstrap#doBind方法中注册的监听去做doBind0绑定监听端口)
        // 回调 3.2 io.netty.bootstrap.AbstractBootstrap#doBind0
        safeSetSuccess(promise);
        
        pipeline.fireChannelRegistered();
        
        // 对应 server socket channel 还未绑定,isActive 为 false
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // 将java的channel注册到了eventLoop的selector上
                // (此时,尚未注册感兴趣的事件。同时,注意当前this作为附件绑定到了selectionKey)
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    throw e;
                }
            }
        }
    }
- ChannelInitializer#initChannel

关键代码 io.netty.channel.ChannelInitializer#initChannel

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.add(ctx)) { // Guard against re-entrance.
        try {
            // 1.2.2.1 执行初始化!!!(调用前面添加的初始化器的initChannel方法)
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            exceptionCaught(ctx, cause);
        } finally {
            // 1.2.2.2 移除初始化器!!!(调用完成后,移除初始化器)
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
        }
        return true;
    }
    return false;
}
AbstractBootstrap#doBind0

关键代码 io.netty.bootstrap.AbstractBootstrap#doBind0

// 3.1 或 3.2 执行 doBind0
private static void doBind0(final ChannelFuture regFuture, 
                            final Channel channel,
                            final SocketAddress localAddress, 
                            final ChannelPromise promise) {

    // 1、确保执行是在nio eventLoop线程中执行
    // 2、绑定会从pipe的tail开始找,最终会到headContext中调用到AbstractUnsafe的bind方法
    channel.eventLoop().execute(new Runnable() {
        
        @Override
        public void run() {
            
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise)
                       .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
        
    });
}
AbstractUnsafe#bind

关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#bind

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();

    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
        // 记录日志...
    }

    boolean wasActive = isActive();
    try {
        // 3.3 【执行端口绑定】
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }
    
    // 从这里可以看出是绑定端口后,再去触发active事件的

    // 当前serverSocketChannel的pipeline已经添加了head-acceptor-tail处理器链, 
    // 并且已经绑定好端口了,所以这里触发pipeline上所有handler的active事件,
    // 接下来,去看HeadContext#channelActive方法
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                // 3.4 【触发 active 事件】
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}
- NioServerSocketChannel#doBind

3.3 关键代码 io.netty.channel.socket.nio.NioServerSocketChannel#doBind

protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        // 调用java原生channel的绑定端口的方法
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}
- HeadContext#channelActive

3.4 关键代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive

public void channelActive(ChannelHandlerContext ctx) {
    
    // 触发所有handler的active事件
    ctx.fireChannelActive();
    
    // 从这里可以看出是所有handler的active触发之后,再将channel注册感兴趣的事件的
    
	// 触发 read ,目的是为了触发channel的事件注册,注册OP_ACCEPT事件,
    //            见AbstractNioChannel#doBeginRead
    // (注意: NioServerSocketChannel 上的 read 不是读取数据,只是为了触发 channel 的事件注册)
    readIfIsAutoRead();
}
– AbstractNioChannel#doBeginRead

关键代码 io.netty.channel.nio.AbstractNioChannel#doBeginRead

protected void doBeginRead() throws Exception {
    
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    // readInterestOp 取值是 16,在NioServerSocketChannel创建时初始化好,代表关注 accept 事件
    if ((interestOps & readInterestOp) == 0) {
        // 注册感兴趣的事件!!!
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

1.2 NioEventLoop 剖析

在这里插入图片描述

NioEventLoop的重要组成

1、在NioEventLoop类中有成员变量

private Selector selector; 
private Selector unwrappedSelector;

2、在NioEventLoop的父类SingleThreadEventExecutor中有成员变量:

private volatile Thread thread;

// 使用的跟上面同1个thread
private final Executor executor; 

// 由于eventLoop是单线程,其它的任务先放在taskQueue任务队列中,然后由单线程依次执行
private final Queue<Runnable> taskQueue;

3、在NioEventLoop的父类的父类AbstractScheduledEventExecutor中有成员变量

// 用来处理定时任务
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;

NioEventLoop 线程不仅要处理 IO 事件,还要处理 Task(包括普通任务和定时任务)

selector何时创建

在NioEventLoop的唯一构造方法中,创建了selector

NioEventLoop(NioEventLoopGroup parent, 
             Executor executor, 
             SelectorProvider selectorProvider,
             SelectStrategy strategy, 
             RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory taskQueueFactory, 
             EventLoopTaskQueueFactory tailTaskQueueFactory) {
    super(parent, executor, false, 
          newTaskQueue(taskQueueFactory), 
          newTaskQueue(tailTaskQueueFactory),
          rejectedExecutionHandler);
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    
    // 在这里创建selector
    final SelectorTuple selectorTuple = openSelector();
    this.selector = selectorTuple.selector;
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}

那为什么会有2个selector呢?

因为netty要把selector里面原来的selectionKey的set实现改为用数组实现,因为数组遍历的性能比set好!

nio线程在何时启动

(当首次调用eventLoop的execute方法时,会启动线程,并且state状态位控制只会启动1次。)

public class TestEventLoop {

    public static void main(String[] args) {

        NioEventLoopGroup group = new NioEventLoopGroup();
        group.next()
             // 入口
            .execute(() -> {
                System.out.println("Hello");
            });
    }

}
SingleThreadEventExecutor#execute

提交任务代码 io.netty.util.concurrent.SingleThreadEventExecutor#execute

public void execute(Runnable task) {

    if (task == null) {
        throw new NullPointerException("task");
    }

    // 判断当前线程是否是eventLoop的thread,很显然,现在eventLoop的thread是null
    boolean inEventLoop = inEventLoop();

    // 添加任务,其中队列使用了 jctools 提供的 mpsc 无锁队列
    addTask(task);

    if (!inEventLoop) {

        // inEventLoop 如果为 false 表示由其它线程来调用 execute,
        // 即首次调用时,需要向 eventLoop 提交首个任务,启动死循环,会执行到下面的 doStartThread
        startThread();

        if (isShutdown()) {
            // 如果已经 shutdown,做拒绝逻辑,代码略...
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        // 如果线程由于 IO select 阻塞了,添加任务的线程需要负责唤醒 NioEventLoop 线程
        wakeup(inEventLoop);
    }
}

private void startThread() {
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                // 启动线程
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}

private void doStartThread() {

    assert thread == null;

    // 这个executor是在 MultithreadEventExecutorGroup的构造方法中初始化的,
    //               (直接new的ThreadPerTaskExecutor)
    executor.execute(new Runnable() {
        @Override
        public void run() {

            // 将线程池的当前线程保存在成员变量中,以便后续使用
            // 将thread线程设置为执行线程
            // (所以eventLoopgroup中的executor属性的线程和thread属性是同一个线程)
            thread = Thread.currentThread();

            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                // 调用外部类 SingleThreadEventExecutor 的 run 方法,进入死循环,run 方法见下
                // 【启动 EventLoop 主循环 】
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally { 
                // 清理工作,代码略... 
            }
        }
    });
}

@Override
protected void wakeup(boolean inEventLoop) {

    // !eventLoop的理解: 只有其它非nio线程提交任务,才会有机会去唤醒selector停止阻塞
    //                  (因为如果是eventLoop自己提交任务给自己,在提交的时候,
    //                    当前eventLoop正在运行,没有阻塞,所以不需要唤醒selector)
    // wakenUp的理解: 当多个其它非nio线程提交任务,那么只会将selector唤醒1次
    if (!inEventLoop && wakenUp.compareAndSet(false, true)) {

        // 唤醒 select 阻塞线程
        // (这个wakeup调用后,如果selector正在select,那么直接唤醒,
        //   如果selector还没select,那么该selector去select时,就不会阻塞。
        //   类似于LockSupport的park和unpark。)
        selector.wakeup();
    }
}
*NioEventLoop#run

io.netty.channel.nio.NioEventLoop#run 主要任务是执行死循环,不断看有没有新任务,有没有定时任务,有没有 IO 事件,如果有,则执行。

// 死循环执行
for (;;) {
    try {
        try {
            
            //calculateStrategy 的逻辑如下:
            /* [代码] hasTasks ? selectNow() : SelectStrategy.SELECT; */
            // 当有任务时, 会执行一次selectNow(),去获取看看是否有io事件,
            //           并且会清除上一次的wakeup结果, 无论有没有IO事件,都会跳过switch
            // (因为有任务的话,即便没有io事件,也得干活,所以没有必要阻塞了)
            // 当没有任务,会匹配 SelectStrategy.SELECT,看是否应当阻塞
            // (因为没有任务的话,那就等有io事件了,再干活,所以就设置超时阻塞,
            //   同时还要看在阻塞期间有其它非nio线程提交任务,并唤醒selector。
            //   那么默认阻塞多久呢?那就需要看NioEventLoop#select(boolean oldWakenUp)方法
            //    默认是阻塞1s + 0.5ms,不过还得看有没有定时任务。)
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE: // -2
                    continue;

                case SelectStrategy.BUSY_WAIT: // -3

                case SelectStrategy.SELECT: // -1
                    
                    // 因为IO线程和提交任务线程都有可能执行 wakeup,而 wakeup 属于比较昂贵的操作,
                    // 因此使用了一个原子布尔对象 wakenUp,它取值为 true 时,表示该由当前线程唤醒
                    // 进行 select 阻塞,并设置唤醒状态为 false
                    boolean oldWakenUp = wakenUp.getAndSet(false);

                    // 这里select方法中会调用select(timeoutMillis)阻塞,那么什么时候唤醒呢?
                    // 当有io事件时自动唤醒
                    // 或者超时自动唤醒
                    // 或者有任务提交时,手动唤醒以便及时处理io事件以外的普通任务
                    
                    // 如果在这个位置,非 EventLoop 线程抢先将 wakenUp 置为 true,并 wakeup
                    // 下面的 select 方法不会阻塞
                    // 等 runAllTasks 处理完成后,到再循环进来这个阶段新增的任务会不会及时执行呢?
                    // 因为 oldWakenUp 为 true,因此下面的 select 方法就会阻塞,直到超时
                    // 才能执行,让 select 方法无谓阻塞
                    select(oldWakenUp);

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
            }
        } catch (IOException e) {
            rebuildSelector0();
            handleLoopException(e);
            continue;
        }

        // 有任务 或者 正在等待io事件但io事件还没来就被唤醒了 或者 io事件来了
        
        cancelledKeys = 0;
        needsToSelectAgain = false;
        
        // (如果eventLoop在执行非io任务的事件过长,势必会影响到io事件的处理)
        // ioRatio 默认是 50
        final int ioRatio = this.ioRatio;
        
        // 如果ioRatio设置为 100,那么会让普通任务都运行完。
        // 如果ioRatio不设置为100,那么会根据io事件处理的运行时间,算出普通任务可以运行的时间,
        //                       算出的这个时间仅仅是用来判断要不要继续运行下1个普通任务,
        //                       因此,如果1个普通任务本身耗时就特别长,
        //                            这里是没有中断这个任务的说法的,而且还得任务响应中断才行。
        if (ioRatio == 100) {
            try {
                processSelectedKeys();
            } finally {
                // ioRatio 为 100 时,总是运行完所有非 IO 任务
                runAllTasks();
            }
        } else {                
            final long ioStartTime = System.nanoTime();
            try {
                processSelectedKeys();
            } finally {
                // 记录 io 事件处理耗时
                final long ioTime = System.nanoTime() - ioStartTime;
                // 运行非 IO 任务,一旦超时会退出 runAllTasks
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }
        }
    } catch (Throwable t) {
        handleLoopException(t);
    }
    try {
        if (isShuttingDown()) {
            closeAll();
            if (confirmShutdown()) {
                return;
            }
        }
    } catch (Throwable t) {
        handleLoopException(t);
    }
}
NioEventLoop#select

io.netty.channel.nio.NioEventLoop#select

private void select(boolean oldWakenUp) throws IOException {
    
    Selector selector = this.selector;
    
    try {
        
        int selectCnt = 0;
        
        long currentTimeNanos = System.nanoTime();
        
        // 计算等待时间
        // * 没有 scheduledTask定时任务,超时时间为 1s
        // * 有 scheduledTas定时任务k,超时时间为 `下一个定时任务执行时间 - 当前时间`
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

        for (;;) {
            
            long timeoutMillis = (selectDeadLineNanos 
                                  - currentTimeNanos 
                                  + 500000L) / 1000000L;
            
            // 如果超时,退出循环
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            // 如果期间又有task退出循环,如果没这个判断,那么任务就会等到下次select超时时才能被执行
            // wakenUp.compareAndSet(false, true) 是让非 NioEventLoop 不必再执行 wakeup
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            // select 有限时阻塞
            // 注意 nio 有 bug,当 bug 出现时,select 方法即使没有时间发生,也不会阻塞住,
            // 导致不断空轮询,cpu 占用 100%
            // (所以就用了 selectCnt ++ 来统计次数,因为如果bug发生的话,循环会很快,
            //   这样selectCnt就会猛增,就检测到了)
            int selectedKeys = selector.select(timeoutMillis);
            // 计数加 1
            selectCnt ++;

            // 醒来后,如果有 IO 事件、或是由非 EventLoop 线程唤醒,或者有任务,退出循环
            if (selectedKeys != 0 
                || oldWakenUp 
                || wakenUp.get() 
                || hasTasks()
                || hasScheduledTasks()) {
                break;
            }
            
            if (Thread.interrupted()) {
               	// 线程被打断,退出循环
                // 记录日志
                selectCnt = 1;
                break;
            }

            long time = System.nanoTime();
            
            if (time-TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // 如果超时,计数重置为 1,下次循环就会 break
                selectCnt = 1;
            } 
            
            // 计数超过阈值,由 io.netty.selectorAutoRebuildThreshold 指定,默认 512
            // 这是为了解决 nio 空轮询 bug
            // (重新创建1个selector,来替换原来的selector,来解决nio空轮询bug)
            else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 
                     && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                
                // 重建 selector
                selector = selectRebuildSelector(selectCnt);
                selectCnt = 1;
                break;
            }

            currentTimeNanos = time;
        }

        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            // 记录日志
        }
    } catch (CancelledKeyException e) {
        // 记录日志
    }
}
NioEventLoop#processSelectedKeys

处理 keys io.netty.channel.nio.NioEventLoop#processSelectedKeys

private void processSelectedKeys() {
    if (selectedKeys != null) {
        // 通过反射将 Selector 实现类中的就绪事件集合替换为 SelectedSelectionKeySet 
        // SelectedSelectionKeySet 底层为数组实现,可以提高遍历性能(原本为 HashSet)
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

private void processSelectedKeysOptimized() {
    
    // 遍历所有的selectionKey
    for (int i = 0; i < selectedKeys.size; ++i) {
        
        final SelectionKey k = selectedKeys.keys[i];
        
        // 获取完就置为null
        selectedKeys.keys[i] = null;

        // 附件就是 NioServerSocketChannel
        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            // 处理selectionKey
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}
- NioEventLoop#processSelectedKey

io.netty.channel.nio.NioEventLoop#processSelectedKey

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    
    
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    
    // 当 key 取消或关闭时会导致这个 key 无效
    if (!k.isValid()) {
        // 无效时处理...
        return;
    }

    try {
        
        int readyOps = k.readyOps();
        
        // 连接事件
        // (这个是客户端需要监听处理的事件,服务端就不用管)
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
        
        // OP_ACCEPT 和 OP_CONNECT的理解:
        // ServerBootstrap用来处理服务器端,而Bootstrap用于客户端。当服务器绑定端口后,会注册OP_ACCEPT事件,等待客户端连接。一旦有连接进来,就会触发这个事件,然后创建子Channel来处理通信。而客户端在连接服务器时,会发起非阻塞的连接操作,这时候会注册OP_CONNECT,当连接建立完成后,触发该事件,之后就可以进行读写操作了
        // OP_CONNECT只是在连接过程中注册,一旦连接成功,就会触发,之后可能需要修改感兴趣的事件为OP_READ等。
        // 需要注意,当连接失败时,OP_CONNECT也会触发,这时候需要处理异常情况。比如,在Netty中,连接失败会触发相应的异常处理机制,比如channel的exceptionCaught方法
        // 总结来说,OP_ACCEPT是服务器端用于接收新连接,而OP_CONNECT是客户端用于处理连接建立完成的事件

        // 可读:当客户端或服务端的缓冲区有接收到数据,这时候,就会通知程序有数据可以读了,然后这里就会触发read事件,然后handler就使用channel去读取数据,假设这里在handler里面只读了1半数据,然后就不读了,就是说还有数据没有读,但是这个时候,就去处理下1个selectionKey,那么当调用下1次selector.select方法时,仍然会由于有数据要读取,而被唤醒,仍然是对应该channel的selectionKey的可读事件。
        // 可写:当客户端或服务端需要将数据发送出去,这时候,需要订阅可写事件,当发送缓冲区可写时,就会触发这个事件,然后使用channel将数据写出到缓冲区,假设1次没写完,那就需要继续订阅可写事件,直到全部数据写完,然后数据全部写完之后,取消订阅可写事件,然后又有数据需要发送,就再次订阅可写事件,写完之后,就取消订阅可写事件。
        
        // 可写事件
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }

        // 可读 或 可接入事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || 
            readyOps == 0) {
            // (这个方法同时处理 可接入 和 可读 事件,因为如果是NioServerSocketChannel它感兴趣的是OP_ACCEPT事件,而如果是NioSocketChannel它感兴趣的是OP_READ事件,对应的unsafe是不一样的。)
            // 如果是可接入 AbstractNioMessageChannel.NioMessageUnsafe#read
            // 如果是可读   AbstractNioByteChannel.NioByteUnsafe#read
            unsafe.read();
        }
        
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

1.3 accept 剖析

在这里插入图片描述

其中,前面3步,在NioEventLoop#processSelectedKey中已经做过分析了。

nio 中如下代码,在 netty 中的流程

//1 阻塞直到事件发生
selector.select();

Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {    
    //2 拿到一个事件
    SelectionKey key = iter.next();
    
    //3 如果是 accept 事件
    if (key.isAcceptable()) {
        
        //4 执行 accept
        SocketChannel channel = serverSocketChannel.accept();
        channel.configureBlocking(false);
        
        //5 关注 read 事件
        channel.register(selector, SelectionKey.OP_READ);
    }
    // ...
}

入口代码

// 服务端
public class TestSourceServer {
    public static void main(String[] args) {
        new ServerBootstrap()
            .group(new NioEventLoopGroup())
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) {
                    ch.pipeline().addLast(new LoggingHandler());
                }
            })
            .bind(8888);
    }
}
// 客户端
public class TestSourceClient {

    public static void main(String[] args) {

        new Bootstrap()
            .group(new NioEventLoopGroup())
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) {
                    ch.pipeline().addLast(new LoggingHandler());
                }
            })
            .connect(new InetSocketAddress("localhost", 8888));

    }

}

AbstractNioMessageChannel.NioMessageUnsafe#read

先来看可接入事件处理(accept)

io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read

public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();    
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {

                // doReadMessages中执行了accept并创建【NioSocketChannel】作为消息放入readBuf
                // readBuf 是一个 ArrayList 用来缓存消息
                // (看下面的doReadMessages方法)
                int localRead = doReadMessages(readBuf);

                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }
                // localRead 为 1,就一条消息,即接收一个客户端连接
                allocHandle.incMessagesRead(localRead);
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t; // 忽略暂时的异常
        }

        int size = readBuf.size();
        
        for (int i = 0; i < size; i ++) {
            
            readPending = false;
            
            // 触发 read 事件,让NioServerSocketChannel的pipelin上的 handler 处理,
            // 这时 肯定交给 ServerBootstrapAcceptor#channelRead
            pipeline.fireChannelRead(readBuf.get(i));
        }
        
        readBuf.clear();
        allocHandle.readComplete();
        // 触发读取完毕事件
        pipeline.fireChannelReadComplete();

        if (exception != null) {
            closed = closeOnReadError(exception);
            // 触发异常事件
            pipeline.fireExceptionCaught(exception);
        }

        if (closed) {
            inputShutdown = true;
            if (isOpen()) {
                close(voidPromise());
            }
        }
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    
    // 获取到SocketChannel
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            // 创建NioSocketChannel
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}

ServerBootstrapAcceptor#channelRead

关键代码 io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    
    // 这时的 msg 是 NioSocketChannel
    final Channel child = (Channel) msg;

    // NioSocketChannel 添加  childHandler 即初始化器
    // (这里添加的是初始化器)
    child.pipeline().addLast(childHandler);

    // 设置选项
    setChannelOptions(child, childOptions, logger);

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        
        // 注册 NioSocketChannel 到 nio worker 线程,接下来的处理也移交至 nio worker 线程
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
AbstractChannel.AbstractUnsafe#register

又回到了熟悉的 io.netty.channel.AbstractChannel.AbstractUnsafe#register 方法

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // 一些检查,略...

    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        
        register0(promise);
        
    } else {
        try {
            
            // 这行代码完成的事实是 nio boss -> nio worker 线程的切换!!!
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    // 调用注册的方法
                    register0(promise);
                }
            });
            
        } catch (Throwable t) {
            // 日志记录...
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
*AbstractUnsafe#register0

io.netty.channel.AbstractChannel.AbstractUnsafe#register0

private void register0(ChannelPromise promise) {
    try {
        
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        
        boolean firstRegistration = neverRegistered;
        
        // 这里面在 AbstractNioChannel 的 doRegister()方法中会将channel注册到selector上
        doRegister();
        
        neverRegistered = false;
        registered = true;
		
        //【关键代码,注意初始化器执行前后。这里将会为NiosocketChannel添加自定义的handler。】
        // 执行初始化器,执行前 pipeline 中只有 head -> 初始化器 -> tail
        pipeline.invokeHandlerAddedIfNeeded();
        // 执行后就是 head -> logging handler -> my handler -> tail

        // 上面将客户端的channel已经配置好了,所以通知promise已经成功设置了
        safeSetSuccess(promise);
        
        // 触发handler的 channelRegistered事件
        pipeline.fireChannelRegistered();
        
        if (isActive()) {
            if (firstRegistration) {
                // 触发 pipeline 上 active 事件
                // (这里就会在HeadContext#channelActive中让channel关注可读事件)
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}
- HeadContext#channelActive

回到了熟悉的代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive

public void channelActive(ChannelHandlerContext ctx) {

    ctx.fireChannelActive();

    // 触发read (NioSocketChannel 这里 read,只是为了触发 channel 的事件注册,还未涉及数据读取)
    // (注册可读事件)
    readIfIsAutoRead();
}

private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        // 进入该调用,经过pipeline逐链调用,会来到HeadContext的read方法
        channel.read();
    }
}

@Override
public void read(ChannelHandlerContext ctx) {
    // 接着这里调用到了AbstractNioChannel#doBeginRead
    unsafe.beginRead();
}
–AbstractNioChannel#doBeginRead

io.netty.channel.nio.AbstractNioChannel#doBeginRead

protected void doBeginRead() throws Exception {
    
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;
    
	// 这时候 interestOps 是 0
    final int interestOps = selectionKey.interestOps();
    
    if ((interestOps & readInterestOp) == 0) {
        
        // 关注 read 事件!!!
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

1.4 read 剖析

接着NioEventLoop#processSelectedKey的那节,当对方发送消息来时。

AbstractNioByteChannel.NioByteUnsafe#read

再来看可读事件 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read,注意发送的数据未必能够一次读完,因此会触发多次 nio read 事件,一次事件内会触发多次 pipeline read,一次事件会触发一次 pipeline read complete

public final void read() {

    final ChannelConfig config = config();

    if (shouldBreakReadReady(config)) {
        clearReadPending();
        return;
    }

    final ChannelPipeline pipeline = pipeline();

    // io.netty.allocator.type 决定 allocator 的实现
    final ByteBufAllocator allocator = config.getAllocator();

    // 用来分配 byteBuf,确定单次读取大小
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();

    allocHandle.reset(config);

    ByteBuf byteBuf = null;

    boolean close = false;
    try {
        do {

            byteBuf = allocHandle.allocate(allocator);

            // 上面还是空的byteBuf

            // 读取(这里就会将缓冲区中的数据读取到byteBuf中,
            //      调用NioSocketChannel#doReadBytes)
            allocHandle.lastBytesRead(doReadBytes(byteBuf));

            if (allocHandle.lastBytesRead() <= 0) {
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    readPending = false;
                }
                break;
            }

            allocHandle.incMessagesRead(1);
            readPending = false;
            
            // 触发read事件,让pipeline上的handler处理,这时是处理 NioSocketChannel上的 handler
            pipeline.fireChannelRead(byteBuf);
            
            byteBuf = null;
        } 
        // 是否要继续循环
        while (allocHandle.continueReading());

        allocHandle.readComplete();
        
        // 触发 read complete 事件
        pipeline.fireChannelReadComplete();

        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}
NioSocketChannel#doReadBytes
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    
    // 读取数据
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
MaxMessageHandle#continueReading

io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#continueReading(io.netty.util.UncheckedBooleanSupplier)

public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
    return 
           // 一般为 true
           config.isAutoRead() &&
           // respectMaybeMoreData 默认为 true
           // maybeMoreDataSupplier 的逻辑是如果预期读取字节与实际读取字节相等,返回 true
           (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
           // 小于最大次数,maxMessagePerRead 默认 16
           totalMessages < maxMessagePerRead &&
           // 实际读到了数据
           totalBytesRead > 0;
}

1.5 write剖析

public class TestSourceServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new LoggingHandler());

                    }
                })
                .bind(8888);
    }
}
public class TestSourceClient {

    public static void main(String[] args) {

        new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new LoggingHandler());
                        ch.pipeline().addLast(new ChannelDuplexHandler() {
                            @Override
                            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                ctx.channel().write("halo");
                            }
                        });
                    }
                })
                .connect(new InetSocketAddress("localhost", 8888));

    }

}

HeadContext

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

write:写队列

我们来看看channel中unsafe的write方法,先来看看其中的一个属性

AbstractUnsafe

protected abstract class AbstractUnsafe implements Unsafe {
    private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);

我们来看看 ChannelOutboundBuffer 这个类

public final class ChannelOutboundBuffer {
    private final Channel channel;
    private ChannelOutboundBuffer.Entry flushedEntry;
    private ChannelOutboundBuffer.Entry unflushedEntry;
    private ChannelOutboundBuffer.Entry tailEntry;

ChannelOutboundBuffer内部维护了一个Entry链表,并使用Entry封装msg。其中的属性我们下面会详细讲

我们回到正题,接着看 unsafe.write(msg, promise);

AbstractUnsafe

@Override
public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;

    int size;
    try {
        msg = filterOutboundMessage(msg);
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }

    outboundBuffer.addMessage(msg, size, promise);
}

1.调用 filterOutboundMessage() 方法,将待写入的对象过滤,把非ByteBuf对象和FileRegion过滤,把所有的非直接内存转换成直接内存DirectBuffer

@Override
protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        if (buf.isDirect()) {
            return msg;
        }

        return newDirectBuffer(buf);
    }

    if (msg instanceof FileRegion) {
        return msg;
    }

    throw new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}

2.接下来,估算出需要写入的ByteBuf的size
3.最后,调用 ChannelOutboundBuffer 的addMessage(msg, size, promise) 方法,所以,接下来,我们需要重点看一下这个方法干了什么事情

ChannelOutboundBuffer

public void addMessage(Object msg, int size, ChannelPromise promise) {
    // 创建一个待写出的消息节点
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
        tailEntry = entry;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    }
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }

    incrementPendingOutboundBytes(size, false);
}

想要理解上面这段代码,必须得掌握写缓存中的几个消息指针,如下图

在这里插入图片描述
hannelOutboundBuffer 里面的数据结构是一个单链表结构,每个节点是一个 Entry,Entry 里面包含了待写出ByteBuf 以及消息回调 promise,下面分别是三个指针的作用

1.flushedEntry 指针表示第一个被写到操作系统Socket缓冲区中的节点
2.unFlushedEntry 指针表示第一个未被写入到操作系统Socket缓冲区中的节点
3.tailEntry指针表示ChannelOutboundBuffer缓冲区的最后一个节点

初次调用 addMessage 之后,各个指针的情况为

在这里插入图片描述
fushedEntry指向空,unFushedEntry和 tailEntry 都指向新加入的节点

第二次调用 addMessage之后,各个指针的情况为
在这里插入图片描述
第n次调用 addMessage之后,各个指针的情况为
在这里插入图片描述
可以看到,调用n次addMessage,flushedEntry指针一直指向NULL,表示现在还未有节点需要写出到Socket缓冲区,而unFushedEntry之后有n个节点,表示当前还有n个节点尚未写出到Socket缓冲区中去

flush:刷新写队列

不管调用channel.flush(),还是ctx.flush(),最终都会落地到pipeline中的head节点

HeadContext

@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
    unsafe.flush();
}

之后进入到AbstractUnsafe

AbstractUnsafe

public final void flush() {
   assertEventLoop();

   ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
   if (outboundBuffer == null) {
       return;
   }

   outboundBuffer.addFlush();
   flush0();
}

flush方法中,先调用 outboundBuffer.addFlush();

ChannelOutboundBuffer

public void addFlush() {
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            flushedEntry = entry;
        }
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);
        unflushedEntry = null;
    }
}

可以结合前面的图来看,首先拿到 unflushedEntry 指针,然后将 flushedEntry 指向unflushedEntry所指向的节点,调用完毕之后,三个指针的情况如下所示
在这里插入图片描述

相当于所有的节点都即将开始推送出去

接下来,调用 flush0();

AbstractUnsafe

protected void flush0() {
    doWrite(outboundBuffer);
}

发现这里的核心代码就一个 doWrite,继续跟

AbstractNioByteChannel

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = -1;

    boolean setOpWrite = false;
    for (;;) {
        // 拿到第一个需要flush的节点的数据
        Object msg = in.current();

        if (msg instanceof ByteBuf) {
            // 强转为ByteBuf,若发现没有数据可读,直接删除该节点
            ByteBuf buf = (ByteBuf) msg;

            boolean done = false;
            long flushedAmount = 0;
            // 拿到自旋锁迭代次数
            if (writeSpinCount == -1) {
                writeSpinCount = config().getWriteSpinCount();
            }
            // 自旋,将当前节点写出
            for (int i = writeSpinCount - 1; i >= 0; i --) {
                int localFlushedAmount = doWriteBytes(buf);
                if (localFlushedAmount == 0) {
                    setOpWrite = true;
                    break;
                }

                flushedAmount += localFlushedAmount;
                if (!buf.isReadable()) {
                    done = true;
                    break;
                }
            }

            in.progress(flushedAmount);

            // 写完之后,将当前节点删除
            if (done) {
                in.remove();
            } else {
                break;
            }
        } 
    }
}

这里略微有点复杂,我们分析一下

1.第一步,调用current()先拿到第一个需要flush的节点的数据

ChannelOutBoundBuffer

public Object current() {
    Entry entry = flushedEntry;
    if (entry == null) {
        return null;
    }

    return entry.msg;
}

2.第二步,拿到自旋锁的迭代次数

if (writeSpinCount == -1) {
    writeSpinCount = config().getWriteSpinCount();
}

3.自旋的方式将ByteBuf写出到jdk nio的Channel

for (int i = writeSpinCount - 1; i >= 0; i --) {
    int localFlushedAmount = doWriteBytes(buf);
    if (localFlushedAmount == 0) {
        setOpWrite = true;
        break;
    }

    flushedAmount += localFlushedAmount;
    if (!buf.isReadable()) {
        done = true;
        break;
    }
}

doWriteBytes 方法跟进去

protected int doWriteBytes(ByteBuf buf) throws Exception {
    final int expectedWrittenBytes = buf.readableBytes();
    return buf.readBytes(javaChannel(), expectedWrittenBytes);
}

我们发现,出现了 javaChannel(),表明已经进入到了jdk nio Channel的领域,我们来看看 buf.readBytes(javaChannel(), expectedWrittenBytes);

public int readBytes(GatheringByteChannel out, int length) throws IOException {
    this.checkReadableBytes(length);
    int readBytes = this.getBytes(this.readerIndex, out, length);
    this.readerIndex += readBytes;
    return readBytes;
}

我们来看关键代码 this.getBytes(this.readerIndex, out, length)

private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException {
    this.checkIndex(index, length);
    if (length == 0) {
        return 0;
    } else {
        ByteBuffer tmpBuf;
        if (internal) {
            tmpBuf = this.internalNioBuffer();
        } else {
            tmpBuf = ((ByteBuffer)this.memory).duplicate();
        }

        index = this.idx(index);
        tmpBuf.clear().position(index).limit(index + length);
        //将tmpBuf中的数据写到out中
        return out.write(tmpBuf);
    }
}

我们来看看out.write(tmpBuf)

public int write(ByteBuffer src) throws IOException {
    ensureOpen();
    if (!writable)
        throw new NonWritableChannelException();
    synchronized (positionLock) {
        int n = 0;
        int ti = -1;
        try {
            begin();
            ti = threads.add();
            if (!isOpen())
                return 0;
            do {
                n = IOUtil.write(fd, src, -1, nd);
            } while ((n == IOStatus.INTERRUPTED) && isOpen());
            return IOStatus.normalize(n);
        } finally {
            threads.remove(ti);
            end(n > 0);
            assert IOStatus.check(n);
        }
    }
}

和read实现一样,SocketChannelImpl的write方法通过IOUtil的write实现:关键代码 n = IOUtil.write(fd, src, -1, nd);

static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
    //如果是DirectBuffer,直接写,将堆外缓存中的数据拷贝到内核缓存中进行发送
    if (var1 instanceof DirectBuffer) {
        return writeFromNativeBuffer(var0, var1, var2, var4);
    } else {
        //非DirectBuffer
        //获取已经读取到的位置
        int var5 = var1.position();
        //获取可以读到的位置
        int var6 = var1.limit();

        assert var5 <= var6;
        //申请一个原buffer可读大小的DirectByteBuffer
        int var7 = var5 <= var6 ? var6 - var5 : 0;
        ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7);

        int var10;
        try {

            var8.put(var1);
            var8.flip();
            var1.position(var5);
            //通过DirectBuffer写,将堆外缓存的数据拷贝到内核缓存中进行发送
            int var9 = writeFromNativeBuffer(var0, var8, var2, var4);
            if (var9 > 0) {
                var1.position(var5 + var9);
            }

            var10 = var9;
        } finally {
            //回收分配的DirectByteBuffer
            Util.offerFirstTemporaryDirectBuffer(var8);
        }

        return var10;
    }
}

代码逻辑我们就不再讲了,代码注释已经很清楚了,这里我们关注一点,我们可以看看我们前面的一个方法 filterOutboundMessage(),将待写入的对象过滤,把非ByteBuf对象和FileRegion过滤,把所有的非直接内存转换成直接内存DirectBuffer

说明到了这一步所有的 var1 意境是直接内存DirectBuffer,就不需要走到else,就不需要write两次了

4.删除该节点

节点的数据已经写入完毕,接下来就需要删除该节点

ChannelOutBoundBuffer

public boolean remove() {
    Entry e = flushedEntry;
    Object msg = e.msg;

    ChannelPromise promise = e.promise;
    int size = e.pendingSize;

    removeEntry(e);

    if (!e.cancelled) {
        ReferenceCountUtil.safeRelease(msg);
        safeSuccess(promise);
    }

    // recycle the entry
    e.recycle();

    return true;
}

首先拿到当前被flush掉的节点(flushedEntry所指),然后拿到该节点的回调对象 ChannelPromise, 调用 removeEntry()方法移除该节点

private void removeEntry(Entry e) {
    if (-- flushed == 0) {
        flushedEntry = null;
        if (e == tailEntry) {
            tailEntry = null;
            unflushedEntry = null;
        }
    } else {
        flushedEntry = e.next;
    }
}

这里的remove是逻辑移除,只是将flushedEntry指针移到下个节点,调用完毕之后,节点图示如下
在这里插入图片描述

writeAndFlush: 写队列并刷新

理解了write和flush这两个过程,writeAndFlush 也就不难了

public final ChannelFuture writeAndFlush(Object msg) {
    return tail.writeAndFlush(msg);
}

public ChannelFuture writeAndFlush(Object msg) {
    return writeAndFlush(msg, newPromise());
}

public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    write(msg, true, promise);

    return promise;
}

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } 
}

可以看到,最终,通过一个boolean变量,表示是调用 invokeWriteAndFlush,还是 invokeWrite,invokeWrite便是我们上文中的write过程

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    invokeWrite0(msg, promise);
    invokeFlush0();
}

可以看到,最终调用的底层方法和单独调用 write 和 flush 是一样的

private void invokeWrite(Object msg, ChannelPromise promise) {
        invokeWrite0(msg, promise);
}

private void invokeFlush(Object msg, ChannelPromise promise) {
        invokeFlush0(msg, promise);
}

由此看来,invokeWriteAndFlush基本等价于write方法之后再来一次flush

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2315725.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【HarmonyOS Next之旅】基于ArkTS开发(三) -> 兼容JS的类Web开发(七) -> JS动画(二)

目录 1 -> 动画动效 1.1 -> 创建动画对象 1.2 -> 添加动画事件和调用接口 2 -> 动画帧 2.1 -> 请求动画帧 2.2 -> 取消动画帧 1 -> 动画动效 通过设置插值器来实现动画效果。 说明 从API Version 6 开始支持。 1.1 -> 创建动画对象 通过cre…

LINUX下的tcp协议

TCP 1. 面向数据流&#xff08;流式套接字&#xff09; 2. 建立连接 3. 安全可靠的传输协议 应用场景&#xff1a; HTTP, MQTT, FTP 三次握手&#xff1a;TCP建立连接时&#xff0c;需要进行三次握手&#xff0c;确保收发数据前&#xff0c;双方都已准备就绪。 四次挥…

Handy Multi Agent—task1:CAMEL环境配置及你的第一个Agent

目录 1.1 获取CAMEL 1.1.1 通过 PyPI 安装 1.1.2 通过源码安装 1.1.2.1 使用 Poetry 工具从源码安装 1.1.2.2 使用Conda和Pip从源码安装 1.2.2 使用API调用模型 1.2.2.1 使用语言模型 1.2.2.2 使用多模态模型 1.2.2.3 视频理解 1.2.2.4 作业1 1.2.2.5 作业2 1.1 获取…

CSS元素层叠顺序规则

CSS元素层叠顺序规则 看图说话总结: background/borderz-index(<0)blockfloatinline/inline-blockz-index(0,auto)z-index (>0)

微服务全局ID方案汇总

自增id 对于大多数系统来说&#xff0c;使用mysql的自增id当作主键再最合适不过了。在数据库层面就可以获取一个顺序的、唯一的、空间占用少的id。 自增id需要是 int、bigint这些整数类型&#xff0c;uint 支持 40 亿的数据量&#xff0c;bigint unsign&#xff08;0 &#x…

【论文笔记】Contrastive Learning for Compact Single Image Dehazing(AECR-Net)

文章目录 问题创新网络主要贡献Autoencoder-like Dehazing NetworkAdaptive Mixup for Feature PreservingDynamic Feature Enhancement1. 可变形卷积的使用2. 扩展感受野3. 减少网格伪影4. 融合空间结构信息 Contrastive Regularization1. 核心思想2. 正样本对和负样本对的构建…

vue项目如何实现条件查询?

目录 1.前端 2.后端 3.mybatis的sql语句 结语 1.前端 说白了就是&#xff0c;无论该参数是否是空字符串&#xff0c;都会传递到后端。&#xff08;反正不是null就行&#xff09;。 2.后端 在controller层中&#xff0c;使用RequestParam注解接收名为registerName的参数&…

在Linux中安装Nginx

上传nginx安装包 Nginx的安装包&#xff0c;从官方下载下来的是c语言的源码包&#xff0c;我们需要自己编译安装。具体操作步骤如下&#xff1a; 安装nginx 安装nginx运行时需要的依赖 yum install -y pcre pcre-devel zlib zlib-devel openssl openssl-devel 解压源码包到当…

【每日学点HarmonyOS Next知识】状态栏字体、生命周期、自定义对话框屏幕中间、透明度、tab居中

1、HarmonyOS 单页面如何控制状态栏字体颜色&#xff1f; 状态栏字体颜色可通过设置statusBarContentColor修改&#xff0c;参考文档如下&#xff1a; https://developer.huawei.com/consumer/cn/doc/harmonyos-references-V5/js-apis-window-V5 参考代码&#xff1a; import…

外贸企业可以申请网络专线吗?

在对外业务不断扩大的情况下&#xff0c;外贸企业对网络的需求愈发迫切。稳定、快速的网络连接不仅是企业开展国际业务的基础&#xff0c;更是提升竞争力的关键。外贸企业是否可以申请网络专线&#xff1f;如何选择适合的外贸网络专线服务&#xff1f;本文将为您详细解答。 网络…

阿里巴巴发布 R1-Omni:首个基于 RLVR 的全模态大语言模型,用于情感识别

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

QuickAPI 和 DBAPI 谁更香?SQL生成API工具的硬核对比(一)

最近低代码开发火得不行&#xff0c;尤其是能把数据库秒变API的工具&#xff0c;简直是开发者的救星。今天咱就聊聊两款国内玩家&#xff1a;QuickAPI&#xff08;麦聪软件搞出来的低代码神器&#xff09;和 DBAPI&#xff08;开源社区的硬核作品&#xff09;。这两货都能靠SQL…

Git使用(一)--如何在 Windows 上安装 Git:详细步骤指南

如果你想在 Windows 机器上安装 Git&#xff0c;可以按照以下详细指南进行操作。 第一步&#xff1a;下载 Git 可通过官网下载 适用于 Windows 的 Git 最新版本。 如果下载速度较慢&#xff0c;可以通过下面提供的百度网盘 链接下载安装包&#xff0c; https://git-scm.com/d…

C#-使用VisualStudio编译C#工程

一.创建csproj文件 二.创建源cs文件 三.生成解决方案 四.运行解决方案 五.VisualStudio功能列表 <1.代码格式化: CtrlKD完成代码整体格式化 <2.窗口布局 窗口->重置窗口布局 <3.引用查找&关联 <4.包管理 <5.日志输出级别 工具->选项->项目解决方案…

Qt常见面试题合集

零、基本概念 什么是信号槽? 信号槽类似于软件设计模式中的观察者模式&#xff0c;&#xff08;观察者模式是一种对象行为模式。它定义对象间的一种一对多的依赖关系&#xff0c;当一个对象的状态发生改变时&#xff0c;所有依赖于它的对象都得到通知并被自动更新。&#xf…

vscode编译器的一些使用问题

目录 解决pip不可用问题 检查VSCode的终端配置 解决pip不可用问题 eg&#xff1a; C:\Users\student>pip pip 不是内部或外部命令&#xff0c;也不是可运行的程序或批处理文件。 先找到系统环境变量 高级->环境变量 系统属性->Path 变量名随意&#xff0c;自己后续知道…

Docker 》》Docker Compose 》》network 网络 compose

docker 默认的网络 三种模式 # 列出所有当前主机上或Swarm集群上的网络 docker network ls#查看网络详情 docker network inspect network名称# 清除未使用的docker网络 docker network prune -f# 创建网络 ocker network create -d bridge 网络名称 docker network create –s…

【SpringMVC】深入解析使用 Postman 和浏览器模拟将单个与多个参数传递到后端的原理和后端接收参数的过程

SpringMVC—请求(Request) 访问不同的路径&#xff0c;就是发送不同的请求&#xff1b;在发送请求时&#xff0c;可能会带一些参数&#xff0c;所以学习Spring的请求&#xff0c;主要是学习如何传递参数到后端以及后端如何接收&#xff1b; 我们主要是使用 浏览器 和 Postman …

VSTO(C#)Excel开发10:启动和卸载顺序 事件处理 监视变化

初级代码游戏的专栏介绍与文章目录-CSDN博客 我的github&#xff1a;codetoys&#xff0c;所有代码都将会位于ctfc库中。已经放入库中我会指出在库中的位置。 这些代码大部分以Linux为目标但部分代码是纯C的&#xff0c;可以在任何平台上使用。 源码指引&#xff1a;github源…

vue 仿deepseek前端开发一个对话界面

后端&#xff1a;调用deepseek的api&#xff0c;所以返回数据格式和deepseek相同 {"model": "DeepSeek-R1-Distill-Qwen-1.5B", "choices": [{"index": 0, "delta": {"role": "assistant", "cont…