NioEventLoop 的异步任务队列成员:
NioEventLoop 中对newTaskQueue 接口的实现,返回的是JCTools工具包Mpsc队列(多生产者单一消费者无锁队列,(无界和有界都有实现)
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
// newMpscQueue 无界对列,newMpscQueue(maxPendingTasks) 有界队列
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
任务的提交
Task任务的提交有3种典型使用场景,具体如下:
- 用户提交的普通任务:
- 用户提交的定时任务
- 非Reactor线程调用Channel的各种方法,例如在推送系统的业务线程里面,根据用户的标识,找到对应的Channel引用,然后调用Write类方法向该用户推送消息,就会进入到这种场景,最终的Write 会提交到任务队列中后被异步消费.
用户提交的普通任务
通过ChannelHandlerContext 获取channel,通过channel 获取eventLoop,然后调用execute方法即可放入到任务队列中,代码如下:
Channel channel =ctx.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
try{
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
在AbstractChannel.AbstractUnsafe.register中,有一个eventLoop.execute()方法调用启动Eventlopo线程的入口,
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
//此处处理的是用户提交的普通任务
//如果线程没有启动,启动线程
eventLoop.execute(new Runnable() {
@Override
public void run() {
// 通道注册到选择器,向流水线发送通道激活事件
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}
如果线程没有启动,register0会作为一个Runable实例封装起来通过eventLoop.execute()方法提交到任务队列中.
用户提交的定时任务
Netty 提供了一些添加定时任务的接口,NioEventLoop的父类AbstractScheduledEventExecutor的schedule方法,通过ChannelHandlerContext获取channel,通过channel 获取eventLoop,然后调用schedule方法即放入到任务队列,代码如下:
public void heartBeat(ChannelHandlerContext ctx, ProtoMsg heartbeatMsg){
ctx.executor().schedule(()->{
if (ctx.channel().isActive()) {
ctx.writeAndFlush(heartbeatMsg);
heartBeat(ctx, heartbeatMsg);
}
},HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
}
}
schedule 第一个参数和普通任务一样,传入一个线程即可,第二个参数是延时时间,第三个参数是延时单位,此处使用的是秒.
非Reactor线程调用Channel的各种方法
非反应器线程的消息发送操作,当用户线程(业务线程)发起write操作时,Netty 会进行判断,如果发现不是NioEventLoop 线程(反应器线程),则将发送消息封装封成WriteTask,放入NioEventLoop 的任务队列,由NioEventLoop 线程后续去执行.
用户线程发起write操作时的入口为io.netty.channel.AbstractChannelHandlerContext#write(final Object msg, final ChannelPromise promise),其源代码如下:
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
write(msg, true, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
ObjectUtil.checkNotNull(msg, "msg");
try {
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
if (!safeExecute(executor, task, promise, m, !flush)) {
task.cancel();
}
}
}
private static boolean safeExecute(EventExecutor executor, Runnable runnable,
ChannelPromise promise, Object msg, boolean lazy) {
try {
if (lazy && executor instanceof AbstractEventExecutor) {
((AbstractEventExecutor) executor).lazyExecute(runnable);
} else {
//executor执行的是netty自己实现的SingleThreadEventExecutor#execute方法
executor.execute(runnable);
}
return true;
} catch (Throwable cause) {
try {
if (msg != null) {
ReferenceCountUtil.release(msg);
}
} finally {
promise.setFailure(cause);
}
return false;
}
}
任务的处理
任务处理的时序图如下:
这里safeExecute执行的task就是前面write写入时包装的AbstractWriteTask,
io.netty.channel.AbstractChannelHandlerContext.WriteTask#run
WriteTask的run经过一些系统处理操作,最终会调用io.netty.channel.ChannelOutboundBuffer#addMessage方法,将发送消息加入发送队列(链表)
io.netty.channel.nio.NioEventLoop#run
@Override
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
// select 策略选择
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
// 1.1 非阻塞的select策略,即重试IO循环
case SelectStrategy.CONTINUE:
continue;
//1.2非阻塞的新事件IO循环
case SelectStrategy.BUSY_WAIT:
//1.3 阻塞的select策略
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
...
}
// 1.4 不需要select,目前已经有可以执行的任务了
default:
}
} catch (IOException e) {
...
}
selectCnt++;
//2.执行网络IO事件和任务调度
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
try {
if (strategy > 0) {
//2.1 处理网络IO事件,分发入口
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
//2.2 处理系统Task和自定义Task
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
//根据ioRatio 计算非IO最多执行的时间
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
...
}
} else {
//处理队列中的task任务
ranTasks = runAllTasks(0);
}
...
} catch (CancelledKeyException e) {
...
}
...
}
}
io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks()
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
// 聚合到期的定时任务
fetchedAll = fetchFromScheduledTaskQueue();
if (runAllTasksFrom(taskQueue)) { //执行任务
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
if (ranAtLeastOne) {
lastExecutionTime = getCurrentTimeNanos();
}
afterRunningAllTasks();
return ranAtLeastOne;
}
private boolean fetchFromScheduledTaskQueue() {
//定时任务队列为空的时候返回
if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
return true;
}
long nanoTime = getCurrentTimeNanos();
for (;;) {
//从定时任务队列中抓取第一个定时任务,寻找截止时间为nanoTime任务
Runnable scheduledTask = pollScheduledTask(nanoTime);
//只有该定时任务队列不为空,才会塞到任务队列里面
if (scheduledTask == null) {
return true;
}
//如果添加到普通任务队列过程中失败
if (!taskQueue.offer(scheduledTask)) {
//则重新添加到定时任务队列中
scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
}
}
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
Runnable task = pollTaskFrom(taskQueue);
if (task == null) {
return false;
}
for (;;) {
//重点代码:安全执行消息队列中的任务
safeExecute(task);
task = pollTaskFrom(taskQueue);
if (task == null) {
return true;
}
}
}
其中定时任务队列scheduledTaskQueue 定义在AbstractScheduledEventExecutor中,
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
io.netty.util.concurrent.AbstractEventExecutor#safeExecute
protected static void safeExecute(Runnable task) {
try {
//直接调用run方法执行
runTask(task);
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
protected static void runTask(@Execute Runnable task) {
task.run();
}