Netty源码解析——事件循环
- Netty事件循环
- 源码解析
- select()
- processSelectedKeys()
- NioMessageUnsafe#read()
- NioByteUnsafe#read()
- runAllTasks()
Netty事件循环
当Netty服务端启动起来以后,就可以接受客户端发送的请求,接收到客户端发来的请求后就会有事件就绪,有事件就绪就会在Netty的事件循环中被监听到并处理,我们下面看看Netty事件循环的逻辑。
Netty中的NIO事件循环的代码位于NioEventLoop的run()方法内部,这个方法总体是一个for(;;)死循环,然后for循环里头依次执行的三个重要方法分别是:
- select():调用Selector#select()方法监听注册到Selector上的Channel,等待Channel关注的事件就绪。
- processSelectedKeys():处理就绪事件,会调用ChannelPipeline处理,ChannelPipeline又会通过责任链模式调用里面的ChannelHandler处理。
- runAllTasks():处理NioEventLoop中的taskQueue的异步任务。
这就是Netty事件循环的大体逻辑,下面我们进入代码解析。
源码解析
select()
private int select(long deadlineNanos) throws IOException {
if (deadlineNanos == NONE) {
return selector.select();
}
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
在NioEventLoop的事件循环中,首先是select()方法的调用,select()方法里面就是调用NioEventLoop对应的Selector的select()方法,阻塞当前线程,监听注册到Selector的Channel,等待事件就绪。
当有事件就绪时,当前线程就会解阻塞,然后调用processSelectedKeys()方法处理就绪事件。
processSelectedKeys()
NioEventLoop的 processSelectedKeys()方法会进入processSelectedKey(SelectionKey k, AbstractNioChannel ch)方法。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
try {
int readyOps = k.readyOps();
...
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (...) {...}
}
无论是accept事件,还是read事件时,都是调用unsafe.read()方法。
只是这里的Unsafe的类型有可能不一样,如果是NioServerSocketChannel的话,那么Unsafe的类型就是NioMessageUnsafe,是在创建NioServerSocketChannel时就创建好的,我们看一下NioMessageUnsafe的read()方法。
NioMessageUnsafe#read()
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
...
doReadMessages(readBuf);
...
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
...
pipeline.fireChannelRead(readBuf.get(i));
}
...
}
}
重要方法就两个doReadMessages(readBuf)和pipeline.fireChannelRead(readBuf.get(i)),其他代码全部省略不看。
我们先看doReadMessages(readBuf)
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
...
buf.add(new NioSocketChannel(this, ch));
...
}
SocketUtils.accept(javaChannel())里面是调用ServerSocketChannel的accept()方法,返回一个SocketChannel。
然后new NioSocketChannel(this, ch)把返回的SocketChannel包装成NioSocketChannel,放入buf中,这个buf就是外面read()方法的readBuf。
NioSocketChannel的构造方法调用父类AbstractNioByteChannel的构造方法:
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
可以看到指定关注的事件类型是read事件,再看看AbstractNioByteChannel的父类AbstractNioChannel的构造方法
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (...) {...}
}
继续调用父类的构造方法,然后保存了NioSocketChannel和关注的事件类型OP_READ,设置NioSocketChannel为非阻塞。
再看AbstractNioChannel的父类AbstractChannel的构造方法
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
创建一个Unsafe,然后初始化了ChannelPipeline。newUnsafe()方法会进入NioSocketChannel的newUnsafe()方法,创建的时NioSocketChannelUnsafe类型的Unsafe,因此NioSocketChannel的Unsafe类型就是NioSocketChannelUnsafe。
protected AbstractNioUnsafe newUnsafe() {
return new NioSocketChannelUnsafe();
}
NioSocketChannel的构造方法总结起来就是做了5件事:
- 创建并保存NioSocketChannelUnsafe
- 创建并保存ChannelPipeline
- 保存SocketChannel
- 保存关注的事件类型OP_READ
- 设置SocketChannel为非阻塞
然后回到unsafe.read()方法,接下来执行的代码pipeline.fireChannelRead(readBuf.get(i))就是调用触发就绪事件的Channel对应的ChannelPipeline的fireChannelRead(…)方法,触发ChannelPipeline中每个处理入站事件的ChannelHandler的入站事件处理。
ChannelPipeline的fireChannelRead(…)方法会从头到尾以责任链的处理方式调用每个ChannelInboundHandler类型的channelRead(…)方法。NioServerSocketChannel的ChannelPipeline中的ChannelHandler是ServerBootstrapAcceptor。我们看看ServerBootstrapAcceptor的channelRead(…)方法。
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
try {
childGroup.register(child).addListener(new ChannelFutureListener() {...});
} catch (...) {...}
}
这里的参数msg的类型是NioSocketChannel,就是上面放入buf中的NioSocketChannel,调用pipeline.fireChannelRead(readBuf.get(i))方法时readBuf.get(i)就是中buf中取出NioSocketChannel作为参数传进来。
然后child.pipeline().addLast(childHandler)这一行这里的childHandler是我们定义的ChannelInitializer,这里放入NioSocketChannel的ChannelPipeline中,当NioSocketChannel里面的SocketChannel被注册到Selector之后,会触发ChannelInitializer的调用,初始化ChannelPipeline。
然后childGroup.register(child)就是把这个NioSocketChannel注册到workerGroup中的其中一个NioEventLoop上,也就是把NioSocketChannel中的SocketChannel注册到workerGroup中的其中一个NioEventLoop的Selector上。
这里NioSocketChannel注册的细节跟NioServerSocketChannel的注册是一样的,上一篇文章已经分析过,这里就不重复了。
NioSocketChannel注册好之后,就可以接收客户端发来的数据,于是又有read事件触发,此时调用的unsafe.read(),就是NioSocketChannelUnsafe的父类NioByteUnsafe的read()方法了。
NioByteUnsafe#read()
@Override
public final void read() {
...
byteBuf = allocHandle.allocate(allocator);
doReadBytes(byteBuf);
...
pipeline.fireChannelRead(byteBuf);
...
}
就是通过allocator分配一个ByteBuf,然后把Channel中的数据读取到ByteBuf中,然后调用pipeline.fireChannelRead(byteBuf)触发ChannelPipeline的处理,然后ChannelPipeline中的ChannelHandler就会处理byteBuf中的数据,这里的ChannelPipeline中的ChannelHandler就是我们定义的ChannelInitializer组装到ChannelPipeline中的ChannelHandler了。
runAllTasks()
runAlllTasks方法其实就是从NioEventLoop的taskQueue中不停的取出task并执行。
protected boolean runAllTasks(long timeoutNanos) {
...
Runnable task = pollTask();
...
for (;;) {
safeExecute(task);
...
task = pollTask();
if (task == null) {
...
break;
}
}
...
}
可以看到,就是在一个for循环中,pollTask()方法取出task,然后在下一轮循环中调用safeExecute(task)去执行,safeExecute(task)里面就是调用task.run()方法直接执行,没什么好看的。如果pollTask()方法取出的task为null,那么就break结束循环。