相关文章:
《IO 模型与多路复用》
《Java NIO》
《Netty:入门(1)》
写在开头:本文为学习后的总结,可能有不到位的地方,错误的地方,欢迎各位指正。
前言
在前文中,我们对Netty的内容做了简单的介绍,本文我们会结合Netty的流程图相对深入一些的介绍下其中的重要组件。
一、EventLoop
EventLoop
事件循环对象 EventLoop本质是一个单线程执行器(同时维护了一个 Selector),支持异步提交执行任务,线程启动时会调用 NioEventLoop 的 run 方法,执行 I/O 任务和非 I/O 任务:
- I/O 任务,即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由 processSelectedKeys 方法触发。
- 非 IO 任务,添加到 taskQueue 中的任务,如 register0、bind0 等任务,由 runAllTasks 方法触发。
它的继承关系如下:
- 继承自 j.u.c.ScheduledExecutorService,因此包含了线程池中所有的方法
- 继承自 netty 自己的 OrderedEventExecutor,提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop,同时还提供了 EventLoopGroup parent() 方法来看看自己属于哪个 EventLoopGroup
EventLoopGroup
事件循环组EventLoopGroup是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)。
继承自 netty 自己的 EventExecutorGroup,实现了 Iterable 接口提供遍历 EventLoop 的,并提供了next方法获取集合中下一个 EventLoop。
EventLoop初始化时可以指定线程数,可以不指定,如果不指定,这里会传0用来调用父类MultithreadEventLoopGroup中得构造方法。
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
...
}
MultithreadEventLoopGroup类提供1个默认线程数DEFAULT_EVENT_LOOP_THREADS,比较1与Netty系统参数"io.netty.eventLoopThreads"(如未设置则使用CPU核心数*2)中得最大值。当传入得初始化线程数为0时就使用这个默认线程数。
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
// 默认线程数
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
// 静态代码块中,给默认线程数DEFAULT_EVENT_LOOP_THREADS 赋值
// 比较1与Netty系统参数"io.netty.eventLoopThreads"(如未设置则使用CPU核心数*2)中得最大值
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
// 如果传入得线程数为0,则使用默认线程数
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
...
}
EventLoop处理普通与定时任务
public class TestEventLoop {
public static void main(String[] args) {
// 创建拥有两个EventLoop的NioEventLoopGroup,对应两个线程
EventLoopGroup group = new NioEventLoopGroup(2);
// 通过next方法可以获得下一个 EventLoop
System.out.println(group.next());
System.out.println(group.next());
// 通过EventLoop执行普通任务
group.next().execute(()->{
System.out.println(Thread.currentThread().getName() + " hello");
});
// 通过EventLoop执行定时任务(表示立即执行(0),每次间隔1秒(1))
group.next().scheduleAtFixedRate(()->{
System.out.println(Thread.currentThread().getName() + " hello2");
}, 0, 1, TimeUnit.SECONDS);
// 优雅地关闭
group.shutdownGracefully();
}
}
输出结果如下
io.netty.channel.nio.NioEventLoop@7bb11784
io.netty.channel.nio.NioEventLoop@33a10788
nioEventLoopGroup-2-1 hello
nioEventLoopGroup-2-2 hello2
nioEventLoopGroup-2-2 hello2
nioEventLoopGroup-2-2 hello2
优雅关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的。
下面演示下处理IO任务:
服务器代码
public class MyServer {
public static void main(String[] args) {
new ServerBootstrap()
// 添加EventLoop
.group(new NioEventLoopGroup())
// 选择服务端Channel实现
.channel(NioServerSocketChannel.class)
// 添加处理器
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override // 连接建立后调用
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 将服务端收到的数据转bytebuf后再转String
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
}
}
客户端代码
public class MyClient {
public static void main(String[] args) throws IOException, InterruptedException {
Channel channel = new Bootstrap()
// 添加EventLoop
.group(new NioEventLoopGroup())
// 选择客户端Channel实现
.channel(NioSocketChannel.class)
// 添加处理器
.handler(new ChannelInitializer<SocketChannel>() {
@Override // 连接建立后调用
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringEncoder());
}
})
// 连接到服务器
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel();
System.out.println(channel);
System.in.read();
}
}
EventLoopGroup分工
Bootstrap的group()方法可以传入两个EventLoopGroup参数,分别负责处理Accept事件(boss线程)与Read/Write(worker线程)(参考下我们前一篇文章中介绍得流程图)
public class MyServer {
public static void main(String[] args) {
new ServerBootstrap()
// 两个Group,分别为Boss(负责Accept事件),Worker(负责读写事件)
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
// ...
// 上文中处理IO任务得代码
}
}
使用上文中多个客户端分别发送 hello 结果
nioEventLoopGroup-3-1 hello1
nioEventLoopGroup-3-2 hello2
nioEventLoopGroup-3-1 hello3
nioEventLoopGroup-3-2 hello4
nioEventLoopGroup-3-2 hello4
可以看出,一个EventLoop可以负责多个Channel,且EventLoop一旦与Channel绑定,则一直负责处理该Channel中的事件。
分工细化
当有的任务需要较长的时间处理时,可以再添加一个非NioEventLoopGroup,避免同一个NioEventLoop中的其他Channel在较长的时间内都无法得到处理。
public class MyServer {
public static void main(String[] args) {
// 增加自定义的非NioEventLoopGroup
EventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 增加两个handler,第一个使用NioEventLoopGroup处理,第二个使用自定义EventLoopGroup处理
socketChannel.pipeline().addLast("nioHandler",new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
// 调用下一个handler
ctx.fireChannelRead(msg);
}
})
// 该handler绑定自定义的Group
.addLast(group, "myHandler", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
}
});
}
})
.bind(8080);
}
}
启动四个客户端发送数据
nioEventLoopGroup-4-1 hello1
defaultEventLoopGroup-2-1 hello1
nioEventLoopGroup-4-2 hello2
defaultEventLoopGroup-2-2 hello2
nioEventLoopGroup-4-1 hello3
defaultEventLoopGroup-2-3 hello3
nioEventLoopGroup-4-2 hello4
defaultEventLoopGroup-2-4 hello4
可以看出,客户端与服务器之间的事件,被nioEventLoopGroup和defaultEventLoopGroup分别处理。
EventLoopGroup切换源码
不同的EventLoopGroup切换的实现原理如下:由上面的图可以看出,当handler中绑定的Group不同时,需要切换Group来执行不同的任务。
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 获取下一个handler得EventLoop
EventExecutor executor = next.executor();
// 判断当前handler中得线程与executor(下一个handler得EventLoop)是否为同一个
if (executor.inEventLoop()) {
// 使用当前EventLoopGroup中的EventLoop来处理任务
next.invokeChannelRead(m);
} else {
// 将要执行得代码作为任务交给executor处理
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRead(m);
}
});
}
}
- 如果两个 handler 绑定的是同一个EventLoopGroup,那么就直接调用
- 否则,把要调用的代码封装为一个任务对象,由下一个 handler 的 EventLoopGroup 来调用