serverbootstrap用于建立netty服务端,核心逻辑--
- 设置线程池-- bossGroup和workGroup
- 设置accept连接handler
- 定义服务器的色弱v儿serversocketchannel实现
- 设置IO读写的业务逻辑相关childHanlder
- 绑定监听端口--
- 创建serversocketchannel对象
- 初始化serversocketchannel--添加Handler到pipeline
- 注册serversocketchannel到处理bossGroup上的nioEventLoop上,绑定端口
- 监听连接事件,将新的连接事件注册到workGroup的nioEventLoop上,用于处理IO事件;
这里涉及到了netty中的几个核心对象--NioEventLoopGroup,NioEventLoop,handler(handlerContext,handlerPipeline),本篇文章不做详细解释,这里只关于ServerBootstrap类的bind方法做分析,梳理一下netty服务端建立过程;
serverBootstrap继承了abstractBootstrap--
其bind(int port)方法调用了AbstractBootstrap的doBind()方法--
dobind()方法源码--
private ChannelFuture doBind(final SocketAddress localAddress) {
//创建serversocketchannel对象,并注册到bossGroup的EventLoop上
final ChannelFuture regFuture = initAndRegister();
//获取上一步创建的serversocketchannel对象
final Channel channel = regFuture.channel();
//判断serversocketchannel的创建注册结果
//如果创建或者注册失败,获取异常信息--
if (regFuture.cause() != null) {
return regFuture;
}
//创建完成----因为上一步已经判断了失败结果,所以这里一般情况下是成功的
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
//调用了jdk的bind方法
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {//这里其实做了冗余判断
// 未来的注册几乎总是已经完成了,但以防万一。
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// EventLoop上的注册失败,因此一旦我们尝试访问通道的EventLoop,直接使ChannelPromise失败,不会导致IllegalStateException。
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
上一步主要方法是两个--
initAndRegister()
doBind0(regFuture, channel, localAddress, promise);
先来看一下initAndRegister()方法--
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//使用工厂方法创建serversocketchannel对象--这里的工厂对象之后会详细解释,主要是启动引导类里的.channel()方法配置的;
channel = channelFactory.newChannel();
//channel初始化--,主要为serversocketchannel添加相关的Handler处理器,初始化serversocketchannel的pipeline;pipeline其他文章中详解
init(channel);
} catch (Throwable t) {
//为了理解业务逻辑,删掉了一场处理的情况相关代码
}
//将serversocketchannel注册到bossGroup的EventLoop上
ChannelFuture regFuture = config().group().register(channel);
return regFuture;
}
说明--
- 使用工厂方法创建了jdk的serversocketchannel对象,
- 对创建的serversocketchannel对象完成了netty的初始化--为其创建pipeline并添加相关的Handler
- 将新建的serversocketchannel注册到bossGroup的EventLoop上--可以理解为为其分配了执行线程
再来看一下doBInd0()方法--
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
//在触发channelRegistered()之前调用此方法。让用户处理程序有机会在其channelRegistered()实现中设置管道。
//这个方法直接调用了EventLoop的execute方法,里边调用了jdk的bind方法--后续在EventLoop详解中解释
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
介绍--dobind0()方法这里其实已经完成了serversocketchannel的创建初始化,dobind0()方法基本上可以启动线程开始监听端口了;
至此,netty服务启动完成,开始监听端口并处理相关逻辑,但是有一个重要的点就是serversocketchannel监听到新的连接会dispatch到workerGroup的EventLoop上,整个逻辑是体现在serversocketchannel的pipeline上的,并且由netty帮我们实现,具体的实现在哪里呢,
这里重点看一点initAndRegister()方法中的init()方法,这个方法就是为serversocketchannel添加相关的处理器,而serversocketchannel的主要处理逻辑就是监听端口,将新的连接请求分发到workerGroup的EventLoop上,先来看一下代码--
void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
//创建pipeline对象
ChannelPipeline p = channel.pipeline();
//绑定childWorkerGroup
final EventLoopGroup currentChildGroup = childGroup;
//设置childHandler
final ChannelHandler currentChildHandler = childHandler;
//添加创建ServerBootstrap时指定的Handler---这里只指定一个Handler,如果指定多个需要在handler实现ChannelInitializer,(但是ServerBootstrapacceptor)
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
//添加指定的handler
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
//添加ServerBootstrapAcceptor---这个ServerBootstrapacceptor 就是执行建立新连接并分发给workerGroup的EventLoop的逻辑
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
可以看到init()方法中添加了两个handler--一个是创建serverbootstrap时候配置的handler,另一个就是serverbootstrapacceptor--而这个acceptor就是完成新连接的channel分发的逻辑的--具体实现在channelRead方法中---(channelRead的调用机制在NioEventLoop和pipeline中详解)
channelRead()源码--
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//接受accept事件对象--socketchannel
final Channel child = (Channel) msg;
//为channel添加handler
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
//调用childGroup的register方法---register方法中实际调用了选择器Chooser的实现,完成新连接的分发和注册
childGroup.register(child);
} catch (Throwable t) {
forceClose(child, t);
}
}