一、DefaultEventExecutorGroup的用途
DefaultEventExecutorGroup
是 Netty 框架中的一个类,用于管理和调度事件处理器(EventExecutor
)的组。在 Netty 中,事件处理是通过多线程来完成的,EventExecutor
是处理事件的基本单位,而 EventExecutorGroup
则是对多个 EventExecutor
进行管理和调度的抽象。
具体来说,DefaultEventExecutorGroup
具有以下主要用途:
-
多线程事件处理:
DefaultEventExecutorGroup
会创建多个EventExecutor
实例,每个实例都在独立的线程中执行事件处理。这有助于提高并发性能,允许多个事件可以并行地在不同线程中处理。 -
任务调度: 除了处理事件,
EventExecutor
还可以用于执行定时任务和异步任务。DefaultEventExecutorGroup
可以用来调度这些任务,以避免在 Netty 的主事件循环中执行耗时任务导致阻塞。 -
避免阻塞主事件循环: 如果一个事件处理器的执行时间较长,可能会影响 Netty 的主事件循环的性能。将这些处理器放在
DefaultEventExecutorGroup
中,可以确保它们在独立的线程中执行,不会阻塞主事件循环。 -
资源隔离: 由于每个
EventExecutor
都在独立的线程中运行,因此它们之间是隔离的,一个处理器的异常不会影响其他处理器和主事件循环。
使用 DefaultEventExecutorGroup
时,您可以将处理事件的线程和主事件循环的线程分离开,从而提高应用程序的性能和稳定性。但是需要注意的是,创建多个线程可能会带来一些额外的开销和管理成本,需要根据具体情况进行权衡和优化。
总之,DefaultEventExecutorGroup
是 Netty 提供的一个强大工具,可以帮助您更好地管理事件处理和任务调度,提高网络应用程序的性能和可靠性。
二、使用示例
private static DefaultEventExecutorGroup defaultEventExecutorGroup =
new DefaultEventExecutorGroup((Runtime.getRuntime().availableProcessors() * 2));
// NettyServerHandler-业务处理类
pipeline.addLast(defaultEventExecutorGroup, "handler", new NettyServerHandler());
- 接口ChannelPipeline
我们这里使用的是第二个方法
ChannelPipeline addLast(String var1, ChannelHandler var2);
ChannelPipeline addLast(EventExecutorGroup var1, String var2, ChannelHandler var3);
ChannelPipeline addLast(ChannelHandler... var1);
ChannelPipeline addLast(EventExecutorGroup var1, ChannelHandler... var2);
- addLast实现,见类io.netty.channel.DefaultChannelPipeline
可以看到,它是异步执行的,将work线程和I/O线程隔离开来。
从代码newCtx = this.newContext(group, this.filterName(name, handler), handler);继续往后看线程池EventExecutor。
三、DefaultEventExecutor.java
- 类的继承关系,它继承于SingleThreadEventExecutor,详情见下
- DefaultEventExecutorGroup和DefaultEventExecutor的关系
- DefaultEventExecutor的核心方法run()
protected void run() {
do {
Runnable task = this.takeTask();
if (task != null) {
task.run();
this.updateLastExecutionTime();
}
} while(!this.confirmShutdown());
}
- 关键类io.netty.util.concurrent.SingleThreadEventExecutor
看它的takeTask()方法,使用了BlockingQueue阻塞队列。
protected Runnable takeTask() {
assert this.inEventLoop();
if (!(this.taskQueue instanceof BlockingQueue)) {
throw new UnsupportedOperationException();
} else {
BlockingQueue taskQueue = (BlockingQueue)this.taskQueue;
Runnable task;
do {
ScheduledFutureTask<?> scheduledTask = this.peekScheduledTask();
if (scheduledTask == null) {
Runnable task = null;
try {
task = (Runnable)taskQueue.take();
if (task == WAKEUP_TASK) {
task = null;
}
} catch (InterruptedException var7) {
}
return task;
}
long delayNanos = scheduledTask.delayNanos();
task = null;
if (delayNanos > 0L) {
try {
task = (Runnable)taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
} catch (InterruptedException var8) {
return null;
}
}
if (task == null) {
this.fetchFromScheduledTaskQueue();
task = (Runnable)taskQueue.poll();
}
} while(task == null);
return task;
}
}
四、总结
在将io线程和工作线程隔离的时候,建议你直接使用netty自带的并发类,而无需自己去定义业务线程池。因为它完全可胜任异步的需求。