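[Netty Source Code] Server Startup Flow: Source Code Analysis, Part 1
1. Creating a server with native Java NIO
With native Java NIO, creating a server usually means creating two objects, a Channel and a Selector, then registering the Channel with the Selector and binding it to a port, and finally polling the Selector for events.
// Step 1: create the Channel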
ServerSocketChannel channel = ServerSocketChannel.open();
channel.configureBlocking(false);
// Step 2: create the Selector
Selector selector = Selector.open();
// Step 3: register the Channel with the Selector
SelectionKey selectionKey = channel.register(selector, 0, new Object());
// Step 4: bind the Channel to a port
channel.bind(new InetSocketAddress(8080));
// Step 5: declare interest in accept events
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
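The listing above stops before the polling step mentioned in the text. The following is a minimal sketch of the whole sequence including one select pass, using only JDK classes. The class name NioAcceptLoopSketch and the helpers openServer and acceptReady are illustrative names, not part of any library, and port 0 is used so that the operating system picks a free port.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

class NioAcceptLoopSketch {

    /** Steps 1 to 5 from the listing above; port 0 means "any free port". */
    static ServerSocketChannel openServer(Selector selector, int port) throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);
        SelectionKey key = server.register(selector, 0, new Object());
        server.bind(new InetSocketAddress(port));
        key.interestOps(SelectionKey.OP_ACCEPT);
        return server;
    }

    /** Runs a single select pass and returns how many connections were accepted. */
    static int acceptReady(Selector selector, long timeoutMillis) throws IOException {
        int accepted = 0;
        selector.select(timeoutMillis);
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove(); // selected keys must be removed by hand
            if (key.isValid() && key.isAcceptable()) {
                ServerSocketChannel server = (ServerSocketChannel) key.channel();
                SocketChannel client = server.accept();
                if (client != null) {
                    accepted++;
                    client.close(); // a real server would register the client for OP_READ instead
                }
            }
        }
        return accepted;
    }

    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = openServer(selector, 0)) {
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            // Connect one local client so the select pass has something to accept.
            try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port))) {
                System.out.println("accepted: " + acceptReady(selector, 1000));
            }
        }
    }
}
```

A production server would call acceptReady (or an equivalent loop body) repeatedly; a single pass is enough here to show where the polling fits.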
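2. Creating a server with Netty
In Netty, the whole server creation flow is driven from ServerBootstrap.bind (inherited from AbstractBootstrap), which ends up in doBind: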
private ChannelFuture doBind(final SocketAddress localAddress) {
// create and initialize the channel, then register it
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
// Registration may not be complete yet, because register is handed off to the NIO event loop.
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
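// Proceed with the bind itself (the channel is bound to the local address here; registration with the selector was already started in initAndRegister)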
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
// Wait until registration completes, then run the bind from the listener
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
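The control flow of doBind (bind only after registration has succeeded, either right away or from a listener) can be modeled with JDK types alone. The sketch below is not Netty code: a single-thread ExecutorService stands in for the event loop, CompletableFuture stands in for ChannelFuture and ChannelPromise, and the class name BindAfterRegisterSketch and the recorded step names are purely illustrative.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class BindAfterRegisterSketch {

    /** Hypothetical stand-in for doBind: schedule registration, then bind once it has succeeded. */
    static CompletableFuture<List<String>> doBind(ExecutorService eventLoop) {
        List<String> steps = new CopyOnWriteArrayList<>();
        // "register" runs asynchronously on the event loop thread.
        CompletableFuture<Void> regFuture =
                CompletableFuture.runAsync(() -> steps.add("register"), eventLoop);
        CompletableFuture<List<String>> bindPromise = new CompletableFuture<>();
        // The callback runs immediately if regFuture is already complete,
        // otherwise it runs when registration completes later.
        regFuture.whenComplete((ignored, cause) -> {
            if (cause != null) {
                // Registration failed, so fail the bind promise directly.
                bindPromise.completeExceptionally(cause);
            } else {
                // Like doBind0: the bind itself is submitted to the event loop.
                eventLoop.execute(() -> {
                    steps.add("bind");
                    bindPromise.complete(steps);
                });
            }
        });
        return bindPromise;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            System.out.println(doBind(eventLoop).get(5, TimeUnit.SECONDS)); // [register, bind]
        } finally {
            eventLoop.shutdown();
        }
    }
}
```

Because both tasks run on the same single thread, "bind" can never overtake "register", which is the same ordering guarantee the comment at the end of initAndRegister relies on.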
2.1 initAndRegister(): initialize the Channel and register it with the Selector
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// create the Channel through the factory
channel = channelFactory.newChannel();
// initialize the Channel
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// start registration
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
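The stack trace shows that this method is called from the main thread.
Stepping through channelFactory.newChannel() in the debugger leads to io.netty.channel.ReflectiveChannelFactory#newChannel, which creates the Channel by invoking its no-arg constructor through reflection: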
public T newChannel() {
try {
// create the channel reflectively
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
ChannelFactory is an interface with a single abstract method, newChannel(). The factory is assigned when ServerBootstrap.channel(...) is called:
io.netty.bootstrap.AbstractBootstrap#channel
io.netty.bootstrap.AbstractBootstrap#channelFactory(io.netty.bootstrap.ChannelFactory<? extends C>)
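The idea of a factory that looks up the no-arg constructor once and then calls it reflectively can be sketched without Netty. In the example below, the Factory interface, the of method and the class name ReflectiveFactorySketch are hypothetical names chosen for illustration; only the java.lang.reflect calls are real APIs.

```java
import java.lang.reflect.Constructor;

class ReflectiveFactorySketch {

    /** Hypothetical single-method factory, mirroring the shape of a channel factory. */
    interface Factory<T> {
        T newInstance();
    }

    /** Looks up the public no-arg constructor once, then invokes it on every newInstance() call. */
    static <T> Factory<T> of(Class<? extends T> type) {
        final Constructor<? extends T> constructor;
        try {
            constructor = type.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    type.getSimpleName() + " has no public no-arg constructor", e);
        }
        return () -> {
            try {
                return constructor.newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Unable to create instance of " + type, e);
            }
        };
    }

    public static void main(String[] args) {
        Factory<StringBuilder> factory = of(StringBuilder.class);
        StringBuilder a = factory.newInstance();
        StringBuilder b = factory.newInstance();
        System.out.println(a != b); // true: every call yields a fresh instance
    }
}
```

Resolving the constructor up front means a class without a suitable constructor is rejected when the factory is configured, not later when the first instance is requested.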
When the no-arg constructor of NioServerSocketChannel is invoked, it calls newSocket:
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
The argument DEFAULT_SELECTOR_PROVIDER is initialized from SelectorProvider.provider(); the SelectorProvider implementation differs from platform to platform.
In the end, provider.openServerSocketChannel() creates the underlying Channel:
io.netty.channel.socket.nio.NioServerSocketChannel#newSocket
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
* See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
*/
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
As you can see, Netty does the same thing here as native Java NIO.
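A small check with JDK classes only makes the equivalence concrete: ServerSocketChannel.open() and SelectorProvider.provider().openServerSocketChannel() go through the same system-wide provider. The class name ProviderOpenSketch and the helper sameProvider are illustrative names.

```java
import java.io.IOException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

class ProviderOpenSketch {

    /** Opens one channel each way and reports whether both came from the same provider. */
    static boolean sameProvider() throws IOException {
        SelectorProvider provider = SelectorProvider.provider();
        try (ServerSocketChannel viaOpen = ServerSocketChannel.open();
             ServerSocketChannel viaProvider = provider.openServerSocketChannel()) {
            return viaOpen.provider() == viaProvider.provider()
                    && viaOpen.getClass() == viaProvider.getClass();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(sameProvider()); // true
    }
}
```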
2.1.1 init(): initialize the Channel and add the ServerBootstrapAcceptor handler
@Override
void init(Channel channel) {
// option and attribute setup; skipped for now, covered later
setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));
// the Channel already exists at this point; get its pipeline and prepare to add handlers
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions =
childOptions.entrySet().toArray(newOptionArray(0));
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
// ChannelInitializer is a one-shot initialization handler:
// it adds a ServerBootstrapAcceptor handler and then removes itself from the pipeline.
// ServerBootstrapAcceptor: initializes each connection after it has been accepted from a client.
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
// add the handler configured through ServerBootstrap.handler() to the pipeline
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// submit a task to the channel's own EventLoop that adds the ServerBootstrapAcceptor to the pipeline
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
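ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter. When a connection is accepted, its channelRead receives the new child Channel, applies the child handler, options and attributes, and registers the child with the child EventLoopGroup.
Note that at this point the handler has only been added to the pipeline; none of its logic has run yet.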
2.1.2 register(channel): register the Channel with an EventLoop
ChannelFuture regFuture = config().group().register(channel);
io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)
promise.channel().unsafe().register(this, promise);
io.netty.channel.AbstractChannel.AbstractUnsafe#register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
// still on the main thread at this point, so the else branch is taken
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// hand the task over to the EventLoop
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
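The inEventLoop() check is the core of this method: run inline when already on the loop thread, otherwise submit a task. The sketch below models that decision with a single-thread JDK executor. It is an illustration under simplifying assumptions, not Netty's implementation; the class name InEventLoopSketch and the thread name "sketch-event-loop" are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class InEventLoopSketch {

    // The one thread that plays the role of the event loop; set when the executor creates it.
    private volatile Thread loopThread;

    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-event-loop");
        t.setDaemon(true); // lets the JVM exit without an explicit shutdown
        loopThread = t;
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    /** Runs register0 inline when called on the loop thread, otherwise hands it to the loop. */
    CompletableFuture<String> register() {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (inEventLoop()) {
            register0(promise);
        } else {
            executor.execute(() -> register0(promise));
        }
        return promise;
    }

    // Completes the promise with the name of the thread that did the work.
    private void register0(CompletableFuture<String> promise) {
        promise.complete(Thread.currentThread().getName());
    }

    public static void main(String[] args) throws Exception {
        InEventLoopSketch loop = new InEventLoopSketch();
        // Called from main, so the else branch is taken and the loop thread does the work.
        System.out.println(loop.register().get(5, TimeUnit.SECONDS)); // sketch-event-loop
    }
}
```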
2.1.3 register0(): where the Channel is actually registered
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// register the Channel with the Selector
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
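// Registering a server socket does not enter the if below, because it is not bound yet; a socket created by accepting a connection does, because it is already active once accepted.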
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
doRegister is implemented in the parent class AbstractNioChannel:
io.netty.channel.nio.AbstractNioChannel#doRegister
protected void doRegister() throws Exception {
boolean selected = false;
// Retry loop: register the Channel with interest ops 0 (no events yet) and attach the Netty Channel itself so it can be retrieved later
for (;;) {
try {
logger.info("initial register: " + 0);
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
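The loop exists to retry registration, but it is not truly endless: if register throws CancelledKeyException, the code forces one selectNow() to flush the cancelled key and tries again, and a second failure is rethrown. The Netty Channel is passed as the attachment so that it can be retrieved from the SelectionKey later. Each EventLoop maintains its own Selector.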
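Registering with interest ops 0 and an attachment can be tried out with JDK classes alone. In the sketch below, Wrapper is a hypothetical stand-in for the Netty Channel that doRegister passes as the attachment, and RegisterWithAttachmentSketch and registerIdle are illustrative names.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

class RegisterWithAttachmentSketch {

    /** Hypothetical wrapper object, standing in for the Netty channel stored as the attachment. */
    static final class Wrapper {
        final String name;

        Wrapper(String name) {
            this.name = name;
        }
    }

    /** Registers the channel without any interest ops and attaches the wrapper to the key. */
    static SelectionKey registerIdle(Selector selector, ServerSocketChannel ch, Wrapper wrapper)
            throws IOException {
        ch.configureBlocking(false); // register() requires non-blocking mode
        return ch.register(selector, 0, wrapper);
    }

    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            SelectionKey key = registerIdle(selector, ch, new Wrapper("server"));
            System.out.println(key.interestOps());                 // 0: no events selected yet
            System.out.println(((Wrapper) key.attachment()).name); // server
        }
    }
}
```

With interest ops 0 the key is known to the Selector but never reported as ready, so the channel stays silent until a later step turns on the events it cares about.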
Back in register0: once the Channel is registered with the Selector, pipeline.invokeHandlerAddedIfNeeded() runs and ends up in callHandlerAddedForAllHandlers:
io.netty.channel.DefaultChannelPipeline#callHandlerAddedForAllHandlers
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
// This Channel itself was registered.
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
// Null out so it can be GC'ed.
this.pendingHandlerCallbackHead = null;
}
// This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
// holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
// the EventLoop.
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
task.execute();
task = task.next;
}
}
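This invokes initChannel() of the ChannelInitializer that init() added to the pipeline. It follows that ChannelInitializer.initChannel is called only once, after the Channel has been instantiated and successfully registered with the Selector.
Since the Channel is now registered with the Selector, initChannel takes the Channel's EventLoop and submits a task that adds the ServerBootstrapAcceptor handler to the pipeline; that handler takes care of newly accepted connections.
Back in register0 again: after the handler callbacks have run, safeSetSuccess(promise) sets the result on the promise, which fires the promise's listeners and in turn runs doBind0():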
io.netty.bootstrap.AbstractBootstrap#doBind
io.netty.bootstrap.AbstractBootstrap#doBind0
doBind0 hands the bind task to the EventLoop:
io.netty.channel.AbstractChannel#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
io.netty.channel.DefaultChannelPipeline#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
Every pipeline has two default handlers, head and tail. bind is an outbound operation, so it starts at tail:
io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
EventExecutor executor = next.executor();
// the bind task was already handed to the EventLoop, so the if branch is taken
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
The invokeBind call chain is fairly deep; it eventually reaches io.netty.channel.socket.nio.NioServerSocketChannel#doBind.
There, javaChannel() returns the underlying JDK ServerSocketChannel. On Java 7 or later, the port is ultimately bound through ServerSocketChannelImpl.bind, which also receives the configured backlog.
After doBind has run, AbstractUnsafe submits a pipeline.fireChannelActive task to the EventLoop.
io.netty.channel.AbstractChannel.AbstractUnsafe#bind
At this point the pipeline contains three handlers: head, acceptor, and tail. fireChannelActive propagates through their channelActive methods, starting at head.
The one that matters here is head.channelActive:
io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
The call chain is again fairly deep, and it eventually comes back to AbstractNioChannel:
io.netty.channel.nio.AbstractNioChannel#doBeginRead
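This is the counterpart of step 5 in the native NIO listing: the key that was registered with interest ops 0 finally gets its read interest, which for a server channel is OP_ACCEPT. The following JDK-only sketch shows that bit operation as I understand it; the class name BeginReadSketch and both helper methods are illustrative, and the code is a model of the idea, not a copy of Netty's method.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

class BeginReadSketch {

    /** Adds readInterestOp to the key's interest set unless it is already present. */
    static void beginRead(SelectionKey key, int readInterestOp) {
        int ops = key.interestOps();
        if ((ops & readInterestOp) == 0) {
            key.interestOps(ops | readInterestOp);
        }
    }

    /** Registers a server channel with ops 0, turns on OP_ACCEPT, and returns the resulting ops. */
    static int registerThenBeginRead() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false);
            SelectionKey key = ch.register(selector, 0);
            beginRead(key, SelectionKey.OP_ACCEPT);
            beginRead(key, SelectionKey.OP_ACCEPT); // second call changes nothing
            return key.interestOps();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(registerThenBeginRead() == SelectionKey.OP_ACCEPT); // true
    }
}
```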