文章目录
系列文章目录
Netty核心源码分析(一),Netty的Server端启动过程源码分析
Netty核心源码分析(二),Netty的Server端接收请求过程源码分析
Netty核心源码分析(三)业务请求执行关键——ChannelPipeline、ChannelHandler、ChannelHandlerContext源码分析
Netty核心源码分析(四)心跳检测源码分析
Netty核心源码分析(五)核心组件EventLoop源码分析
一、心跳检测案例
Netty入门案例——Netty实现心跳检测
Netty作为一个网络框架,提供了诸多功能,本文就主要分析其心跳机制heartbeat的源码。
二、源码分析
1、Netty心跳的三个Handler
handler | 作用 |
---|---|
IdleStateHandler | 当连接的空闲时间(读或者写)太长时,将会触发一个IdleStateEvent事件,然后可以通过ChannelInboundHandler中重写userEventTriggered方法来处理该事件 |
ReadTimeoutHandler | 如果在指定时间没有发生读事件,就会抛出ReadTimeoutException这个异常,并自动关闭连接。可以在exceptionCaught方法中处理这个异常 |
WriteTimeoutHandler | 当一个写操作不能在一定的时间内完成时,会抛出WriteTimeoutException异常,并 关闭连接。可以在exceptionCaught方法中处理这个异常 |
其中,ReadTimeoutHandler和WriteTimeoutHandler会抛出异常并关闭连接,属于异常处理,重点还是分析IdleStateHandler。
2、IdleStateHandler源码
(1)四个关键属性
// 是否考虑出站时较慢的情况,默认是false
private final boolean observeOutput;
// 读事件空闲时间,0 则禁用事件,纳秒为单位
private final long readerIdleTimeNanos;
// 写事件空闲时间,0则禁用事件,纳秒为单位
private final long writerIdleTimeNanos;
// 读或写空闲时间,0则禁用事件,纳秒为单位
private final long allIdleTimeNanos;
(2)handlerAdded方法
当该handler添加到pipeline时会触发:
// io.netty.handler.timeout.IdleStateHandler#handlerAdded
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
// channelActive() event has been fired already, which means this.channelActive() will
// not be invoked. We have to initialize here instead.
initialize(ctx);
} else {
// channelActive() event has not been fired yet. this.channelActive() will be invoked
// and initialization will occur there.
}
}
// io.netty.handler.timeout.IdleStateHandler#initialize
private void initialize(ChannelHandlerContext ctx) {
// Avoid the case where destroy() is called before scheduling timeouts.
// See: https://github.com/netty/netty/issues/143
switch (state) {
case 1:
case 2:
return;
}
state = 1;
// 初始化“监控出站数据属性”
initOutputChanged(ctx);
// System.nanoTime() 返回当前纳秒
lastReadTime = lastWriteTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {
// 这里的schedule会调用eventLoop的schedule方法,将定时任务添加到队列里
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
我们发现,只要给定的参数大于0,就创建一个定时任务,每个事件单独判断,同时将state状态设置为1,防止重复初始化。
// io.netty.handler.timeout.IdleStateHandler#schedule
ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
return ctx.executor().schedule(task, delay, unit);
}
(3)四个内部类
IdleStateHandler中定义了四个内部类,其中AbstractIdleTask是其他三个的父类,AbstractIdleTask实现了Runnable接口并且重写了run方法,其他三个在AbstractIdleTask的基础上重写了run方法。
// io.netty.handler.timeout.IdleStateHandler.AbstractIdleTask
private abstract static class AbstractIdleTask implements Runnable {
private final ChannelHandlerContext ctx;
AbstractIdleTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
if (!ctx.channel().isOpen()) {// 通道关闭,不执行
return;
}
run(ctx);
}
// 子类重写
protected abstract void run(ChannelHandlerContext ctx);
}
3、读事件的run方法——ReaderIdleTimeoutTask
// io.netty.handler.timeout.IdleStateHandler.ReaderIdleTimeoutTask
private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}
@Override
protected void run(ChannelHandlerContext ctx) {
// 获取设置的读超时时间
long nextDelay = readerIdleTimeNanos;
if (!reading) {
// 当前时间减去给定时间和最后一次读(执行channelReadComplete方法设置)
nextDelay -= ticksInNanos() - lastReadTime;
}
// 如果小于0,就触发事件
if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
// 用于取消任务promise
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
// 表示下一次读就不是第一次了
boolean first = firstReaderIdleEvent;
// first设置为false,在channelRead方法中会被改为true
firstReaderIdleEvent = false;
try {
// 创建一个IdleStateEvent 类型的读事件,传递给Handler的userEventTriggered方法
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
// Handler传递,调用下一个Handler的userEventTriggered方法
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// 如果大于0,继续放入队列,间隔时间是新的计算时间
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
总的来说,每次读取操作都会记录一个时间,定时任务时间到了,会计算当前时间和最后一次读的时间的间隔,如果间隔超过了设置的时间,就触发UserEventTriggered方法。
4、写事件的run方法——WriterIdleTimeoutTask
// io.netty.handler.timeout.IdleStateHandler.WriterIdleTimeoutTask
private final class WriterIdleTimeoutTask extends AbstractIdleTask {
WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}
@Override
protected void run(ChannelHandlerContext ctx) {
long lastWriteTime = IdleStateHandler.this.lastWriteTime;
long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
if (nextDelay <= 0) {
// Writer is idle - set a new timeout and notify the callback.
writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstWriterIdleEvent;
firstWriterIdleEvent = false;
try {
if (hasOutputChanged(ctx, first)) {
return;
}
IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Write occurred before the timeout - set a new timeout with shorter delay.
writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
写任务的run代码逻辑基本和读任务的逻辑一样,唯一不同的是有一个针对出站较慢数据的判断hasOutputChanged。
5、所有事件的run方法——AllIdleTimeoutTask
表示读写事件都需要监控。
// io.netty.handler.timeout.IdleStateHandler.AllIdleTimeoutTask
private final class AllIdleTimeoutTask extends AbstractIdleTask {
AllIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}
@Override
protected void run(ChannelHandlerContext ctx) {
long nextDelay = allIdleTimeNanos;
if (!reading) {
// 当前时间减去最后一次写或者读的世界,若大于0,说明超时了
// 这里的时间计算是取读写事件的最大值来的
nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
}
if (nextDelay <= 0) {
// Both reader and writer are idle - set a new timeout and
// notify the callback.
allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstAllIdleEvent;
firstAllIdleEvent = false;
try {
// 同写事件相同,出站较慢的判断
if (hasOutputChanged(ctx, first)) {
return;
}
IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Either read or write occurred before the timeout - set a new
// timeout with shorter delay.
allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
6、总结
- IdleStateHandler 可以实现心跳功能,当服务器和客户端没有任何读写交互时,并超过了给定的时间,则会触发用户 handler 的 userEventIriggered 方法。用户可以在这个方法中尝试向对方发送信息,如果发送失败,则关闭连接。
- IdleStateHandler 的实现基于 EventLoop 的定时任务,每次读写都会记录一个值,在定时任务运行的时候通过计算当前时间和设置时间和上次事件发生时间的结果,来判断是否空闲。
- 内部有 3 个定时任务,分别对应读事件,写事件,读写事件。通常用户监听读写事件就足够了。
- 同时,IdleStateHandler 内部也考虑了一些极端情况: 客户端接收缓慢,一次接收数据的速度超过了设置的空闲时间。Netty 通过构造方法中的 observeOutput 属性来决定是否对出站缓冲区的情况进行判断。
- 如果出站缓慢,Netty 不认为这是空闲,也就不触发空闲事件。但第一次无论如何也是要触发的。因为第一次无法判断是出站缓慢还是空闲。当然,出站缓慢的话,可能造成 OOM,OOM 比空闲的问题更大。
- 所以,当你的应用出现了内存溢出,OOM之类,并且写空闲极少发生(使用了 observeOutput 为 rue),那么就需要注意是不是数据出站速度过慢。
- 还有一个注意的地方就是 ReadTimeouthandler ,它继承自 IdeStateHandler,当触发读空闲事件的时候,就触发 ctx.fireExceptionCaught 方法,并传入一个 ReadTimeoutException,然后关闭 Socket。
- 而WriteTimeoutHandler 的实现不是基于 IdleStateHandler 的,他的原理是,当调用 write 方法的时候,会创建一个定时任务,任务内容是根据传入的 promise 的完成情况来判断是否超出了写的时间。当定时任务根据指定时间开始运行,发现 promise 的 isDone 方法返回 false,表明还没有写完,说明超时了,则抛出异常。当 write方法完成后,会打断定时任务。