第13章 服务端创建
13.1 原生NIO类库复杂性
开发高质量的NIO的程序并不简单,成本太高
13.2 服务端创建源码
通过ServerBootStrap启动辅助类启动Netty
13.2.1 创建时序图
以下是关键步骤:
-
创建ServerBootStrap实例。这是启动辅助类,提供一系列方法用于设置启动相关的参数,因为参数太多,所以构造函数没有参数
-
设置并绑定Reactor线程池,Netty的Reactor线程池是EventLoopGroup,就是EventGroup的数组。EventLoop就是用来处理所有注册到Selector上的Channel,由EventLoop的run方法进行轮询操作。除了处理I/O事件,也处理用户自定义的Task,和定时任务。
-
设置并绑定服务端Channel,Netty对NIO中的ServerSocketChannel进行了封装,对应NioServerSocketChannel。在ServerBootStrap中可以指定ServerSocketChannel的类型。
-
链路建立的时候创建并初始化ChannelPipeline,本质是一个负责处理网络事件的职责链,负责管理和执行ChannelHandler。网络事件以事件流的形式在ChannelPipeline中流转,由ChannelPipeline根据ChannelHandler的执行策略调度ChannelHandler,有一些典型的网络事件:
- 链路注册
- 链路激活
- 链路断开
- 接收到请求消息
- 请求消息接收并处理完毕
- 发送应答消息
- 链路发生异常
- 发生用户自定义事件
-
初始化ChannelPipeline后,添加并设置ChannelHandler。通过ChannelHandler可以完成用户自己的定制与扩展,同时Netty也提供了大量的系统ChannelHandler,一些实用的:
- 系统编解码框架-ByteToMessageCodec
- 通用基于长度的半包解码器-LengthFieldBasedFrameDecoder
- 码流日志打印Handler-LoggingHandler
- SSL安全认证Handler—SslHandler
- 链路空闲检测Handler-IdleStateHandler
- 流量整形Handler-ChannelTrafficShapingHandler
- Base64编解码-Base64Decoder和Base64Encoder
创建和添加ChannelHandler源码:
.childHandler(new ChannelInitializer<SocketChannel>() { // (4) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( //new LoggingHandler(LogLevel.INFO), new EchoServerHandler()); } });
-
绑定并启动端口。将ServerSocketChannel注册到Selector上监听客户端连接
-
Selector轮询。由Reactor线程NioEventLoop负责调度和执行Selector轮询操作,选择准备就绪的Channel集合
-
当轮询到准备就绪的Channel之后,就由Reactor线程NioEventLoop执行ChannelPipeline的相应方法,最终执行ChannelHandler
-
执行ChannelHandler,ChannelPipeline根据具体网络事件的类型,调度并执行ChannelHandler
13.2.2 服务端创建源码
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public void run() throws Exception {
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
//new LoggingHandler(LogLevel.INFO),
new EchoServerHandler());
}
});
// Start the server.
ChannelFuture f = b.bind(port).sync(); // (5)
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new EchoServer(port).run();
}
}
- NioEventLoopGroup是用来处理I/O操作的线程池,Netty对 EventLoopGroup 接口针对不同的传输协议提供了不同的实现。在本例子中,需要实例化两个NioEventLoopGroup,通常第一个称为“boss”,用来accept客户端连接,另一个称为“worker”,处理客户端数据的读写操作。
- ServerBootStrap是启动服务的辅助类,有关socket的参数可以通过ServerBootStrap进行设置
- 这里指定为NioServerSocketChannel类初始化channel
- 通常会为新SocketChannel通过添加一些handler,来设置ChannelPipeline。ChannelInitializer 是一个特殊的handler,其中initChannel方法可以为SocketChannel 的pipeline添加指定handler。
- 通过绑定端口8080,就可以对外提供服务。
EchoServerHandler实现:
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = Logger.getLogger(
EchoServerHandler.class.getName());
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
ctx.close();
}
}
具体分析:
一切从ServerBootstrap开始,逐层深入。ServerBootStrap需要两个NioEventLoopGroup实例,按照职责划分成boss和work:boss负责请求的accept,work负责请求的read、write
NioEventLoopGroup
NioEventLoopGroup主要管理eventLoop的生命周期
eventLoop可以被视为一个处理线程,数量默认是处理器个数的两倍
NioEventLoopGroup的构造方法:
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, null);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
super(nThreads, threadFactory, selectorProvider);
}
可以看出参数最终都传入了父类,MultithreadEventLoopGroup是NioEventLoopGroup的父类,它的构造方法:
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
其中 DEFAULT_EVENT_LOOP_THREADS 为处理器数量的两倍。
MultithreadEventExecutorGroup是核心,也就是MultithreadEventLoopGroup的父类,管理eventLoop 的生命周期,变量:
- children:EventExecutor数组,保存eventLoop。
- chooser:从children中选取一个eventLoop的策略。
构造方法:
//略
- 根据数组的大小,采用不同策略初始化chooser,如果大小为2的幂次方,则采用PowerOfTwoEventExecutorChooser,否则使用GenericEventExecutorChooser。
- newChild方法重载,初始化EventExecutor时,实际执行的是NioEventLoopGroup中的newChild方法,所以children元素的实际类型为NioEventLoop。
NioEventLoop
每个eventLoop会维护一个selector和taskQueue,负责处理客户端请求和内部任务,如ServerSocketChannel注册和ServerSocket绑定等。
NioEventLoop构造方法:
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(parent, threadFactory, false);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
provider = selectorProvider;
selector = openSelector();
}
SingleThreadEventLoop是NioEventLoop的父类,构造方法:
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
super(parent, threadFactory, addTaskWakesUp);
}
传给父类SingleThreadEventExecutor,一个只有一个线程的线程池,其中的几个变量:
- state:线程池当前的状态
- taskQueue:存放任务的队列
- thread:线程池维护的唯一线程
- scheduledTaskQueue:定义在其父类AbstractScheduledEventExecutor中,用以保存延迟执行的任务。
SingleThreadEventExecutor构造方法略,内容如下:
- 初始化一个线程,并在线程内执行NioEventLoop类的run方法,但不会立刻执行
- 使用LinkedBlockingQueue类初始化taskQueue
到这里,处理线程已经初始化完成。
ServerBootStrap
通过ServerBootStrap.bind(port)启动,过程:
/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind() {
validate();
SocketAddress localAddress = this.localAddress;
if (localAddress == null) {
throw new IllegalStateException("localAddress not set");
}
return doBind(localAddress);
}
doBind实现:代码略
- 方法initAndRegister返回一个ChannelFuture实例regFuture,通过regFuture可以判断initAndRegister执行结果
- 如果regFuture.isDone()为true,说明initAndRegister已经执行完,则直接执行doBind0进行socket绑定。
- 否则regFuture添加一个ChannelFutureListener监听,当initAndRegister执行完成时,调用operationComplete方法并执行doBind0进行socket绑定。
当initAndRegister操作结束后进行bind操作
initAndRegister实现如下:
final ChannelFuture initAndRegister() {
final Channel channel = channelFactory().newChannel();
try {
init(channel); //这里调用了init(channel)
} catch (Throwable t) {
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
- 负责创建服务端的NioServerSocketChannel实例
- 为NioServerSocketChannel的pipieline添加Handler
- 注册NioServerSocketChannel到selector上
大部分与NIO类似
NioServerSocketChannel
对Nio的ServerSocketChannel和SelectionKey进行了封装
构造方法:
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
- 方法newSocket利用provider.openServerSocketChannel()生成Nio中的ServerSocketChannel对象
- 设置SelectionKey.OP_ACCEPT事件
父类AbstractNioMessageChannel的构造方法:
略
它的父类AbstractNioChannel的构造方法:
略
- 但设置了当前ServerSocketChannel为非阻塞通道
继续,顶层父类AbstractChannel构造方法:
protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe();
pipeline = new DefaultChannelPipeline(this);
}
- 初始化unsafe,这里的Unsafe并非是jdk中底层Unsafe类,用来负责底层的connect、register、read和write等操作。
- 初始化pipeline,每个Channel都有自己的pipeline,当有请求事件发生时,pipeline负责调用相应的hander进行处理。
ServerBootStrap的init(Channel channel)方法
添加Handler到Channel的Pipeline中
//略
过程:
- 设置channel的options和attrs。
- 在pipeline中添加一个ChannelInitializer对象。
init执行完,需要把当前channel注册到EventLoopGroup,最终目的就是实现NIO中把ServerSocket注册到Selector上,实现client请求的监听。
Netty’实现:
public ChannelFuture register(Channel channel, ChannelPromise promise) {
return next().register(channel, promise);
}
public EventLoop next() {
return (EventLoop) super.next();
}
public EventExecutor next() {
return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}
因为EventLoopGroup中维护了多个eventLoop,next方法会调用chooser策略找到下一个eventLoop,并执行eventLoop的register方法进行注册,register方法如下。
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
...
channel.unsafe().register(this, promise);
return promise;
}
这里会使用到channel.unsafe():
NioServerSocketChannel初始化时,会创建一个NioMessageUnsafe实例,用于实现底层的register、read、write等操作。
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
private void register0(ChannelPromise promise) {
try {
if (!ensureOpen(promise)) {
return;
}
Runnable postRegisterTask = doRegister();
registered = true;
promise.setSuccess();
pipeline.fireChannelRegistered();
if (postRegisterTask != null) {
postRegisterTask.run();
}
if (isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
if (!promise.tryFailure(t)) {
}
closeFuture.setClosed();
}
}
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp) {
wakeup(inEventLoop);
}
}
- register0方法提交到eventLoop线程池中执行,这个时候会启动eventLoop中的线程。
- 方法doRegister()才是最终Nio中的注册方法,方法javaChannel()获取ServerSocketChannel。
protected Runnable doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return null;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
ServerSocketChannel注册完之后,通知pipeline执行fireChannelRegistered方法,pipeline中维护了handler链表,通过遍历链表,执行InBound类型handler的channelRegistered方法,最终执行init中添加的ChannelInitializer handler。
channelRegistered方法
- initChannel方法最终把ServerBootstrapAcceptor添加到ServerSocketChannel的pipeline,负责accept客户端请求。
- 在pipeline中删除对应的handler。
- 触发fireChannelRegistered方法,可以自定义handler的channelRegistered方法。
到这里,ServerSocketChannel完成了初始化并注册到了Selector上,启动线程执行selector.select方法接收客户端请求。
Netty实现将socket绑到指定端口,Netty把注册操作放到了eventLoop中,最终由unsafe实现端口的bind操作。bind完成后,且ServerSocketChannel也已经注册完成,则触发pipeline的fireChannelActive方法,所以在这里可以自定义fireChannelActive方法,默认执行tail的fireChannelActive(当channel变为活动状态时,Netty会调用这个方法通过ChannelPipeline中的所有ChannelHandler来处理这个channel)。具体为这个方法会调用channel.read()方法,read方法会触发pipeline的行为,最终会在pipeline中找到handler执行read方法,默认是head。
Server启动完毕!