Netty源码解读
Netty线程模型
1、定义了两组线程池BossGroup和WorkerGroup,BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写
2、BossGroup和WorkerGroup类型都是NioEventLoopGroup,Group中维护了多个事件循环线程NioEventLoop,每个NioEventLoop维护了一个Selector和TaskQueue
3、每个Boss NioEventLoop线程内部循环执行的步骤有 3 步
3.1、处理accept事件 , 与client 建立连接 , 生成 NioSocketChannel
3.2、将NioSocketChannel注册到某个worker NIOEventLoop上的selector
3.3、runAllTasks处理任务队列TaskQueue的任务
4、 每个worker NioEventLoop线程循环执行的步骤
4.1、轮询注册到自己selector上的所有NioSocketChannel 的read, write事件
4.2、处理 I/O 事件, 即read , write 事件, 在对应NioSocketChannel 处理业务
4.3、runAllTasks处理任务队列TaskQueue的任务 ,一些耗时的业务处理一般可以放入TaskQueue中慢慢处理,这样不影响数据在pipeline中的流动处理
4.4、处理NioSocketChannel业务时,会使用 pipeline (管道),管道中维护了很多 handler处理器用来处理 channel 中的数据
Netty服务启动示例
// 创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍
// bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成
EventLoopGroup bossGroup = new NioEventLoopGroup(2);
EventLoopGroup workerGroup = new NioEventLoopGroup(4);
// 创建服务器端的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {//创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//对workerGroup的SocketChannel设置handler处理器
ch.pipeline().addLast(new NettyServerHandler());
}
});
// 启动服务器(并绑定端口)
ChannelFuture cf = bootstrap.bind(9099).sync();
Netty源码分析
从bootstrap.bind作为入口分析启动流程,进入后可以看到会调用AbstractBootstrap#doBind,最终会调用initAndRegister()方法,主要逻辑都在前三步中实现,本次也主要分析这三个步骤
# AbstractBootstrap类
// 1、创建一个服务端Channel,即NioServerSocketChannel
channel = channelFactory.newChannel();
// 2、初始化NioServerSocketChannel,在pipeline中添加一些处理器hander
init(channel);
// 3、进行注册
ChannelFuture regFuture = config().group().register(channel);
// 把NioServerSocketChannel绑定到指定端口
channel.bind(localAddress, promise);
channelFactory.newChannel();
bootstrap.channel(NioServerSocketChannel.class) 会将serverChannel绑定到ReflectiveChannelFactory上
# AbstractBootstrap类
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
channelFactory.newChannel()会调用ReflectiveChannelFactory的newChannel方法,进而调用constructor.newInstance(),而该constructor正好是NioServerSocketChannel类;所以new的对象就是NioServerSocketChannel
服务端NioServerSocketChannel进行初始化
1、设置感兴趣事件为连接事件OP_ACCEPT
2、设置channel为非阻塞
3、初始化服务端pipeline
# NioServerSocketChannel类
public NioServerSocketChannel(ServerSocketChannel channel) {
// 将感兴趣的事件设置为连接事件OP_ACCEPT
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
// 父类初始化方法 ch 即为NioServerSocketChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
// 设置为非阻塞
ch.configureBlocking(false);
}
// 父类的父类中初始化pipeline,此时只有HeadContext和TailContext
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
init(channel)
调用ServerBootstrap.init方法,向服务端NioServerSocketChannel的pipeline中添加hander处理器ChannelInitializer;此时服务端pipeline链表中的hander如下
# ServerBootstrap 类
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
//向 pipeline中添加hander处理器ChannelInitializer
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
config().group().register(channel)
bootstrap.group(bossGroup, workerGroup)构造时,将group设置为bossGroup,childGroup设置为workerGroup; config().group().register(channel)会调用bossGroup的register方法,从bossGroup的MultithreadEventLoopGroup线程组中取一个线程SingleThreadEventLoop进行调用register方法
register注册逻辑
服务端的NioServerSocketChannel和客户端的NioSocketChannel都会调用此方法进行注册
1、服务启动时,NioServerSocketChannel注册到selector上,对客户端OP_ACCEPT操作感兴趣
2、当有客户端连接时,通过NioServerSocketChannel的accept()得到每个客户端的NioSocketChannel,将其注册到selector上,对客户端OP_READ操作感兴趣
# SingleThreadEventLoop extends SingleThreadEventExecutor 类
public ChannelFuture register(final ChannelPromise promise) {
promise.channel().unsafe().register(this, promise);
return promise;
}
调用AbstractChannel的register方法,创建一个注册的task交给EventLoop线程处理
# AbstractChannel 类
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
.......
AbstractChannel.this.eventLoop = eventLoop;
.......
// 1、处理连接事件时,用的是bossGroup里的NioEventLoop
// 2、处理读写事件时,用的是workGroup里的NioEventLoop
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
}
private void register0(ChannelPromise promise) {
doRegister();
// 1、NioServerSocketChannel 处理逻辑
// 调用NioServerSocketChannel服务端pipeline中hander的handlerAdded方法
// 此时会调用到ChannelInitializer的handlerAdded,然后调用其initChannel,该方法中
// 会向服务端pipeline中加入ServerBootstrapAcceptor
// 调用服务端pipeline中hander的channelRegistered方法
// 调用服务端pipeline中hander的ChannelActive方法
// 2、NioSocketChannel 处理逻辑 调用我们自定义hander中的方法
// 调用客户端pipeline中hander的handlerAdded方法
// 调用客户端pipeline中hander的channelRegistered方法
// 调用客户端pipeline中hander的ChannelActive方法,我们自定义hander的ChannelActive在此调用
pipeline.invokeHandlerAddedIfNeeded();
pipeline.fireChannelRegistered();
pipeline.fireChannelActive();
}
// doRegister()逻辑由子类AbstractNioChannel实现
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 将channel注册到Selector上
// 1、NioServerSocketChannel注册到Selector上
// 2、NioSocketChannel注册到Selector上
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
}
}
}
eventLoop.execute就是调用SingleThreadEventExecutor#execute
# SingleThreadEventExecutor 类
@Override
public void execute(Runnable task) {
// 将注册register0逻辑加入队列taskQueue
addTask(task);
// 开启线程循环监听事件,会调用SingleThreadEventExecutor.run方法
// 最终调用子类NioEventLoop的run()方法
startThread();
}
死循环执行 selector.select方法,直到监听到事件或者超时,才会执行processSelectedKeys逻辑
1、服务端启动后,NioServerSocketChannel若监听到客户端OP_ACCEPT操作,则会执行processSelectedKeys逻辑,若超时,则继续下一次循环
2、客户端连接成功后,NioSocketChannel若监听到客户端OP_READ操作,则会执行processSelectedKeys逻辑,若超时,则继续下一次循环
# NioEventLoop 类
@Override
protected void run() {
for (;;) {
try {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
....
case SelectStrategy.SELECT:
// 该方法监听到事件(OP_ACCEPT|OP_READ)时才会返回
select(wakenUp.getAndSet(false));
default:
}
} catch (IOException e) {
.....
}
// 监听到事件执行
try {
// 1、获取SelectionKey处理事件
processSelectedKeys();
} finally {
// 2、执行taskQueue中其他的注册方法register0
runAllTasks();
}
}
}
}
private void select(boolean oldWakenUp) throws IOException {
// 一直循环遍历
int selectCnt = 0;
for (;;) {
// 根据注册的定时任务,获取本次select的阻塞时间
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
// 没有监听到事件或没有超时,则一直阻塞(会让出cpu资源)
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// 正常场景
// 当有连接|读写操作或者selector被唤醒了,则直接返回
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// 正常场景
// 说明没有监听到事件,而是超时了,则重置selectCnt
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// 异常场景 select 空轮询bug修复
// 若空轮询次数超过SELECTOR_AUTO_REBUILD_THRESHOLD配置
// 则关闭老的select,建立新的select
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
}
private void processSelectedKeysOptimized() {
// 遍历所有的selectedKeys进行处理
for (int i = 0; i < selectedKeys.size; ++i) {
processSelectedKey(k, (AbstractNioChannel) a);
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
int readyOps = k.readyOps();
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 连接|读写操作会调用该方法
// 1、连接操作调用NioMessageUnsafe的read方法
// 2、读写操作调用NioByteUnsafe的read方法
unsafe.read();
}
}
OP_ACCEPT连接操作处理
1、为每个客户端创建NioSocketChannel,并进行初始化
1.1、设置感兴趣事件为OP_READ
1.2、设置channel为非阻塞
1.3、初始化客户端pipeline
2、调用服务端NioServerSocketChannel的pipeline,将客户端的NioSocketChannel作为参数传过去,最终会调用到ServerBootstrapAcceptor,将NioSocketChannel注册到workGroup上
# NioMessageUnsafe 类
public void read() {
final ChannelPipeline pipeline = pipeline();
// 创建每个客户端的NioSocketChannel
doReadMessages(readBuf);
int size = readBuf.size();
// readBuf为NioSocketChannel,遍历客户端所有的NioSocketChannel
// 执行服务端NioServerSocketChannel的pipeline,循环执行fireChannelRead,
// 最终会调用服务端hander的ChannelRead方法,此处会调用到ServerBootstrapAcceptor的ChannelRead方法
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
// 调用服务端pipeline的读完成方法
pipeline.fireChannelReadComplete();
}
protected abstract int doReadMessages(List<Object> buf) throws Exception;
// 调用子类NioServerSocketChannel#doReadMessages
protected int doReadMessages(List<Object> buf) throws Exception {
// 获取客户端的连接得到SocketChannel,每个客户端在服务端都有一个对应的SocketChannel
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
// NioSocketChannel处理方式同NioServerSocketChannel
// 1、设置感兴趣事件为连接事件OP_READ
// 2、设置channel为非阻塞
// 3、初始化客户端NioSocketChannel的pipeline
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
}
return 0;
}
将我们自定义的hander添加到NioSocketChannel的pipeline上,然后将NioSocketChannel注册到workGroup上,此时客户端pipeline链表中的hander如下
# ServerBootstrapAcceptor 类
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 传过来的NioSocketChannel
final Channel child = (Channel) msg;
// 将我们手动添加的Hander添加到pipeline
child.pipeline().addLast(childHandler);
try {
// 将NioSocketChannel注册workGroup的一个线程的selector上,
// 方式同NioServerSocketChannel,执行register注册逻辑
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
.....
}
});
} catch (Throwable t) {
}
}
OP_READ操作处理
进行数据读写,并执行pipeline中自定义的hander
# NioByteUnsafe类
// 接受到客户端OP_READ事件时调用
public void read() {
// 获取客户端NioSocketChannel的pipeline
final ChannelPipeline pipeline = pipeline();
do {
// 数据读写
// 调用pipeline.fireChannelRead时会依次调用pipeline中hander的ChannelRead方法
// 我们自定义的hander的ChannelRead方法就会在此处调用
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
pipeline.fireChannelRead(byteBuf);
} while (allocHandle.continueReading());
allocHandle.readComplete();
// 调用pipeline的读完成方法
pipeline.fireChannelReadComplete();
}