背景
Netty程序有固定的模板格式,以ServerBootstrap为例:
public class NettyServer {
public void start(int port) {
ServerBootstrap serverBootstrap = new ServerBootstrap();
EventLoopGroup boosGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup(1);
try {
serverBootstrap.group(boosGroup, workGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childOption(ChannelOption.SO_BACKLOG, 128)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel channel) {
channel.pipeline().addLast(new StringEncoder());
channel.pipeline().addLast(new StringDecoder());
channel.pipeline().addLast(new MyChannelInboundHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception exception) {
System.out.println(exception);
} finally {
boosGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
private static class MyHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(msg);
}
}
public static void main(String[] args) {
new NettyServer().start(9998);
}
}
将负责处理连接的和负责业务处理的NioEventLoopGroup线程池对象、通道类型、处理器Handler、子处理器Handler、连接配置等保存至ServerBootstrap对象,然后调用ServerBootstrap对象的bind方法启动Netty程序。
可以将ServerBootstrap和Bootstrap理解为启动引导器,将信息告知引导器,然后由引导器负责完成后续的启动流程。介绍启动流程前,需要了解一下ServerBootstrap和Bootstrap。
1.ServerBootstrap与Bootstrap
ServerBootstrap和Bootstrap作为Netty的启动引导类,分别用于启动Netty服务端和客户端。
首先看一下类的定义,二者都继承自AbstractBootstrap, 继承关系如下图:
AbstractBootstrap的核心属性:
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
private volatile SocketAddress localAddress;
volatile EventLoopGroup group;
private volatile ChannelHandler handler;
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> attrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
//...
}
localAddress用于记录本地ip和端口;group用于记录Netty使用的线程池对象NioEventLoopGroup;handler用于记录处理消息的ChannelHandler(后续加入Pipline);options和attrs记录属性,后续将被添加到通道上。
Bootstrap的核心属性如下所示:
public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
private final BootstrapConfig config = new BootstrapConfig(this);
private volatile SocketAddress remoteAddress;
//...
}
Bootstrap用于启动客户端,因此需要指定远程服务器地址,保存在remoteAddress属性中;config属性保存了Bootstrap的详细信息,后续启动过程中,可直接通过该属性获取配置信息。
ServerBootstrap的核心属性如下所示:
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> childAttrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
private volatile EventLoopGroup childGroup;
private volatile ChannelHandler childHandler;
}
ServerBootstrap用于启动服务器,与Bootstrap相同,选择将所有信息保存在配置对象config中。
由于服务端接收客户端连接后,还需要创建子通道,并将通道信息注册到选择器上;因此需要保存子通道相关的信息:childGroup、childHandler、childOptions和childAttrs.
鉴于服务端逻辑较为复杂, 且对异步流程的设计更为巧秒,本文选择服务端启动为例进行介绍Netty的启动流程。
2.启动流程图
整体的启动流程如下图所示:
Netty的启动流程整体上看由两个线程完成:启动线程和NioEventLoop线程。NioEventLoop的启动由启动线程触发,启动线程执行创建和配置通道、启动NioEventLoop等主线任务,将注册和绑定等操作委托给NioEventLoop线程执行;线程间通过Future异步交互。以下将结合代码分章节对上图进行介绍。
建议阅读本文前先阅读Netty系列的其他文章。
3.启动流程
启动的入口函数为serverBootstrap.bind(port)
:
// 调用异步方法bind得到ChannelFuture,并调用sync阻塞等待异步逻辑执行完成
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
// 持续监听,直到关闭Netty
channelFuture.channel().closeFuture().sync();
跟踪bind方法:
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
// doBind前的校验
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
校验并运行doBind方法,所有核心逻辑均在doBind方法中。
3.1 doBind方法
private ChannelFuture doBind(final SocketAddress localAddress) {
// 1.initAndRegister
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
// 2.doBind0
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
// 2.doBind0
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
doBind的核心方法是initAndRegister和doBind0,前者完成通道的创建、初始化和注册,后者执行绑定操作。initAndRegister成功执行完成,doBind0才可以执行。
上述代码可以简化为:
private ChannelFuture doBind(final SocketAddress localAddress) {
// 1.initAndRegister
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
ChannelPromise promise = channel.newPromise();
// 阻塞等待regFuture异步逻辑执行完成
regFuture.sync();
// 执行doBind0
doBind0(regFuture, channel, localAddress, promise);
return promise;
}
之所以程序写的则这么复杂,是提高了程序效率:执行donBind的线程(这里是主线程)退出去可以执行其他任务,而不用阻塞在此。
ChannelFuture channelFuture = serverBootstrap.bind(port);
//... 执行其他任务
channelFuture.sync();
源码中根据regFuture异步注册是否完成有两个分支:
[1] 已完成,直接调用doBind0;
[2] 未完成,封装PendingRegistrationPromise对象,向regFuture添加监听器并返回PendingRegistrationPromise对象。后续阻塞在PendingRegistrationPromise上的线程依赖监听器唤醒(注册完成后唤醒,详细唤醒流程参考第二章的启动流程图);
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
// 2.doBind0
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
// 2.doBind0
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
说明:分支[1] 在使用Netty过程中基本不会发生,是对一种小概率事件的效率优化。
后续将介绍注册流程需要经历:创建任务、任务加入队列、启动线程、从任务队列获取任务并执行、设置promise结果等操作;其中启动线程需要等待CPU的调度,除非设置给ServerBootstrap的NioEventLoopGroup对象的线程已被启动过(实际开发一般为新创建),否则启动netty的线程进入上述判断逻辑的时间点将绝大概率早于注册流程完成时间点。
理解上述逻辑后,initAndRegister和doBind0属于时间上顺序执行的逻辑,以下分小节分别介绍。
3.2 initAndRegister
initAndRegister忽略异常分支的主线逻辑如下:
final ChannelFuture initAndRegister() {
// 步骤1:创建通道
Channel channel = channelFactory.newChannel();
// 步骤2:初始化通道
init(channel);
// 步骤3:注册通道
ChannelFuture regFuture = config().group().register(channel);
// 步骤4:返回异步对象
return regFuture;
}
3.2.1 创建通道
通过serverBootstrap.channel(NioServerSocketChannel.class)
将通道类型传递给serverBootstrap对象,这里通过反射调用NioServerSocketChannel的无参构造函数创建通道对象。
3.2.2 初始化通道
void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);}
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
ChannelPipeline p = channel.pipeline();
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
逻辑分为三个部分:
[1] 将预置在ServerBootStrap的options和attrs属性设置到channel上;
[2] 创建匿名ChannelInitializer对象,其initChannel方法将添加预置在ServerBootStrap的handler对象以及ServerBootstrapAcceptor对象到channel的pipeline上;
[3] 将ChannelInitializer对象添加到channel的pipeline中;
注意:上述提到的channel,都指代NioServerSocketChannel对象。
ServerBootstrapAcceptor是一个特殊的channelHandler,为客户端连接创建通道(NioSocketChannel)并向选择器注册,因此需要保存NioSocketChannel的线程池对象、子处理器Handler、子通道连接配置等信息;ServerBootstrapAcceptor作用将在Netty处理消息时详细介绍。
3.2.3 注册通道
使用选择器从NioEventLoopGroup中选择一个NioEventLoop,并调用NioEventLoop对象的register方法:
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
进入NioEventLoop的register方法:
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
获取channel的unsafe对象,并调用其register方法:
public ChannelFuture register(final ChannelPromise promise) {
promise.channel().unsafe().register(this, promise);
return promise;
}
unsafe对象是Netty对底层IO的封装,进入unsafe的register方法(删除异常分支逻辑):
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
}
}
这里的核心逻辑是register0,unsafe将register0委托给NioEventLoop线程异步执行。
注意:这里的eventLoop.execute(Runnable)会将Runnable保存在任务队列中,并将第一次启动NioEventLoop线程,启动后会从任务队列中取出任务再执行。可以参考: Netty系列-1 NioEventLoopGroup和NioEventLoop介绍
register0(剔除异常分支逻辑)的主线逻辑如下:
private void register0(ChannelPromise promise) {
boolean firstRegistration = neverRegistered;
// 步骤1:注册channel
doRegister();
// 步骤2:设置状态变量registered为已注册
neverRegistered = false;
registered = true;
// 步骤3: 注册的后处理操作
pipeline.invokeHandlerAddedIfNeeded();
// 步骤4:设置Promise执行结果为成功状态
safeSetSuccess(promise);
// 步骤5: 向pipeline管道触发channelRegister事件
pipeline.fireChannelRegistered();
//...
}
步骤1: 注册channel
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
//...异常逻辑
}
}
}
javaChannel()
是NIO的ServerSocketChannel通道对象, eventLoop().unwrappedSelector()
是NIO的选择器对象,抽象出来为: SelectionKey selectionKey = serverSocketChannel.register(selector,0 , this);
注意: 这里的attachment为this, 即NioServerSocketChannel对象。
步骤2:设置状态变量registered为已注册
registered表示通道是否已向select注册成功,使用该状态变量可以防止反复注册。
步骤3: 注册的后处理操作
invokeHandlerAddedIfNeeded将会调用callHandlerAddedForAllHandlers方法
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
callHandlerAddedForAllHandlers();
}
}
callHandlerAddedForAllHandlers在介绍Pipeline时介绍过:向pipeline添加处理器时,如果通道未注册,则添加处理器操作会被封装成延时任务;callHandlerAddedForAllHandlers执行时会调用这些任务:实现向pipeline添加handler,如果Handler是ChannlInitializer类型,则会调用ChannlInitializer的init方法。
步骤4:设置Promise执行结果为成功状态
safeSetSuccess(promise)方法通过promise.trySuccess()设置promise对象为执行成功:
public boolean trySuccess() {
return trySuccess(null);
}
public boolean trySuccess(V result) {
return setSuccess0(result);
}
private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
if (checkNotifyWaiters()) {
notifyListeners();
}
return true;
}
return false;
}
其中checkNotifyWaiters
通过notifyAll()
唤醒所有阻塞在该Promise上的线程,notifyListeners
依次调用监听器的operationComplete
方法。
此时章节3.1doBind中向regFuture添加的ChannelFutureListener监听器对象的operationComplete方法将被调用。
步骤5: 向pipeline管道触发channelRegister事件
此时,channelRegister事件将沿着pipeline的ctx链进行传递。
3.3 doBind0
继续跟进doBind0代码:
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
// 将绑定任务提交给channel绑定的NioEventLoop执行
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
注意这里的入参: channel是NioServerSocketChannel对象,待绑定的通道;localAddress是本地的地址;
regFuture是一个异步执行结果, 表示注册操作是否完成;promise也是一个异步执行结果,用于根据regFuture的状态设置promise。这里的promise是返回给Netty启动线程的:
// 这里sync阻塞等待的就是这个promise对象
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
当对promise设置结果时,sync会从阻塞中被唤醒。
继续跟进channel.bind方法:
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
// 将bind操作委托给pipeline
return pipeline.bind(localAddress, promise);
}
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
bind属于出站操作,pipeline将从tail开始逆向搜索ctx,直到第一个可以处理bind方法的ctx或者进入HeadContext:
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}
HeadContext将任务委托给unsafe对象(省去异常分支逻辑和校验逻辑):
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
//...
doBind(localAddress);
//...
// 设置promise的成功状态(同时唤醒阻塞的线程)
safeSetSuccess(promise);
}
核心逻辑为执行doBind(localAddress)
以及设置promise对象的状态为执行成功。
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
javaChannel()得到的Nio的ServerSocketChannel,因为Netty以NIO为基础进行的封装。