Netty核心源码分析(四)心跳检测源码分析

news2025/1/10 16:40:42

文章目录

  • 系列文章目录
  • 一、心跳检测案例
  • 二、源码分析
    • 1、Netty心跳的三个Handler
    • 2、IdleStateHandler源码
      • (1)四个关键属性
      • (2)handlerAdded方法
      • (3)四个内部类
    • 3、读事件的run方法——ReaderIdleTimeoutTask
    • 4、写事件的run方法——WriterIdleTimeoutTask
    • 5、所有事件的run方法——AllIdleTimeoutTask
    • 6、总结

系列文章目录

Netty核心源码分析(一),Netty的Server端启动过程源码分析
Netty核心源码分析(二),Netty的Server端接收请求过程源码分析
Netty核心源码分析(三)业务请求执行关键——ChannelPipeline、ChannelHandler、ChannelHandlerContext源码分析
Netty核心源码分析(四)心跳检测源码分析
Netty核心源码分析(五)核心组件EventLoop源码分析

一、心跳检测案例

Netty入门案例——Netty实现心跳检测

Netty作为一个网络框架,提供了诸多功能,本文就主要分析其心跳机制heartbeat的源码。

二、源码分析

1、Netty心跳的三个Handler

handler作用
IdleStateHandler当连接的空闲时间(读或者写)太长时,将会触发一个IdleStateEvent事件,然后可以通过ChannelInboundHandler中重写userEventTriggered方法来处理该事件
ReadTimeoutHandler如果在指定时间没有发生读事件,就会抛出ReadTimeoutException这个异常,并自动关闭连接。可以在exceptionCaught方法中处理这个异常
WriteTimeoutHandler当一个写操作不能在一定的时间内完成时,会抛出WriteTimeoutException异常,并 关闭连接。可以在exceptionCaught方法中处理这个异常

其中,ReadTimeoutHandler和WriteTimeoutHandler会抛出异常并关闭连接,属于异常处理,重点还是分析IdleStateHandler。

2、IdleStateHandler源码

(1)四个关键属性

// 是否考虑出站时较慢的情况,默认是false
private final boolean observeOutput;
// 读事件空闲时间,0 则禁用事件,纳秒为单位
private final long readerIdleTimeNanos;
// 写事件空闲时间,0则禁用事件,纳秒为单位
private final long writerIdleTimeNanos;
// 读或写空闲时间,0则禁用事件,纳秒为单位
private final long allIdleTimeNanos;

(2)handlerAdded方法

当该handler添加到pipeline时会触发:

// io.netty.handler.timeout.IdleStateHandler#handlerAdded
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
        // channelActive() event has been fired already, which means this.channelActive() will
        // not be invoked. We have to initialize here instead.
        initialize(ctx);
    } else {
        // channelActive() event has not been fired yet.  this.channelActive() will be invoked
        // and initialization will occur there.
    }
}
// io.netty.handler.timeout.IdleStateHandler#initialize
private void initialize(ChannelHandlerContext ctx) {
    // Avoid the case where destroy() is called before scheduling timeouts.
    // See: https://github.com/netty/netty/issues/143
    switch (state) {
    case 1:
    case 2:
        return;
    }

    state = 1;
    // 初始化“监控出站数据属性”
    initOutputChanged(ctx);
	// System.nanoTime() 返回当前纳秒
    lastReadTime = lastWriteTime = ticksInNanos();
    if (readerIdleTimeNanos > 0) {
    	// 这里的schedule会调用eventLoop的schedule方法,将定时任务添加到队列里
        readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (writerIdleTimeNanos > 0) {
        writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (allIdleTimeNanos > 0) {
        allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                allIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
}

我们发现,只要给定的参数大于0,就创建一个定时任务,每个事件单独判断,同时将state状态设置为1,防止重复初始化。

// io.netty.handler.timeout.IdleStateHandler#schedule
ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
    return ctx.executor().schedule(task, delay, unit);
}

(3)四个内部类

IdleStateHandler中定义了四个内部类,其中AbstractIdleTask是其他三个的父类,AbstractIdleTask实现了Runnable接口并且重写了run方法,其他三个在AbstractIdleTask的基础上重写了run方法。
在这里插入图片描述

// io.netty.handler.timeout.IdleStateHandler.AbstractIdleTask
private abstract static class AbstractIdleTask implements Runnable {

    private final ChannelHandlerContext ctx;

    AbstractIdleTask(ChannelHandlerContext ctx) {
        this.ctx = ctx;
    }

    @Override
    public void run() {
        if (!ctx.channel().isOpen()) {// 通道关闭,不执行
            return;
        }

        run(ctx);
    }
	// 子类重写
    protected abstract void run(ChannelHandlerContext ctx);
}

3、读事件的run方法——ReaderIdleTimeoutTask

// io.netty.handler.timeout.IdleStateHandler.ReaderIdleTimeoutTask
private final class ReaderIdleTimeoutTask extends AbstractIdleTask {

    ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
        super(ctx);
    }

    @Override
    protected void run(ChannelHandlerContext ctx) {
    	// 获取设置的读超时时间
        long nextDelay = readerIdleTimeNanos;
        if (!reading) {
        	// 当前时间减去给定时间和最后一次读(执行channelReadComplete方法设置)
            nextDelay -= ticksInNanos() - lastReadTime;
        }
		// 如果小于0,就触发事件
        if (nextDelay <= 0) {
            // Reader is idle - set a new timeout and notify the callback.
            // 用于取消任务promise
            readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

			// 表示下一次读就不是第一次了
            boolean first = firstReaderIdleEvent;
            // first设置为false,在channelRead方法中会被改为true
            firstReaderIdleEvent = false;

            try {
            	// 创建一个IdleStateEvent 类型的读事件,传递给Handler的userEventTriggered方法
                IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                // Handler传递,调用下一个Handler的userEventTriggered方法
                channelIdle(ctx, event);
            } catch (Throwable t) {
                ctx.fireExceptionCaught(t);
            }
        } else {
        	// 如果大于0,继续放入队列,间隔时间是新的计算时间
            // Read occurred before the timeout - set a new timeout with shorter delay.
            readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
        }
    }
}

总的来说,每次读取操作都会记录一个时间,定时任务时间到了,会计算当前时间和最后一次读的时间的间隔,如果间隔超过了设置的时间,就触发UserEventTriggered方法。

4、写事件的run方法——WriterIdleTimeoutTask

// io.netty.handler.timeout.IdleStateHandler.WriterIdleTimeoutTask
private final class WriterIdleTimeoutTask extends AbstractIdleTask {

    WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
        super(ctx);
    }

    @Override
    protected void run(ChannelHandlerContext ctx) {

        long lastWriteTime = IdleStateHandler.this.lastWriteTime;
        long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
        if (nextDelay <= 0) {
            // Writer is idle - set a new timeout and notify the callback.
            writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);

            boolean first = firstWriterIdleEvent;
            firstWriterIdleEvent = false;

            try {
                if (hasOutputChanged(ctx, first)) {
                    return;
                }

                IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
                channelIdle(ctx, event);
            } catch (Throwable t) {
                ctx.fireExceptionCaught(t);
            }
        } else {
            // Write occurred before the timeout - set a new timeout with shorter delay.
            writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
        }
    }
}

写任务的run代码逻辑基本和读任务的逻辑一样,唯一不同的是有一个针对出站较慢数据的判断hasOutputChanged。

5、所有事件的run方法——AllIdleTimeoutTask

表示读写事件都需要监控。

// io.netty.handler.timeout.IdleStateHandler.AllIdleTimeoutTask
private final class AllIdleTimeoutTask extends AbstractIdleTask {

    AllIdleTimeoutTask(ChannelHandlerContext ctx) {
        super(ctx);
    }

    @Override
    protected void run(ChannelHandlerContext ctx) {

        long nextDelay = allIdleTimeNanos;
        if (!reading) {
        	// 当前时间减去最后一次写或者读的世界,若大于0,说明超时了
        	// 这里的时间计算是取读写事件的最大值来的
            nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
        }
        if (nextDelay <= 0) {
            // Both reader and writer are idle - set a new timeout and
            // notify the callback.
            allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);

            boolean first = firstAllIdleEvent;
            firstAllIdleEvent = false;

            try {
            	// 同写事件相同,出站较慢的判断
                if (hasOutputChanged(ctx, first)) {
                    return;
                }

                IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
                channelIdle(ctx, event);
            } catch (Throwable t) {
                ctx.fireExceptionCaught(t);
            }
        } else {
            // Either read or write occurred before the timeout - set a new
            // timeout with shorter delay.
            allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
        }
    }
}

6、总结

  1. IdleStateHandler 可以实现心跳功能,当服务器和客户端没有任何读写交互时,并超过了给定的时间,则会触发用户 handler 的 userEventIriggered 方法。用户可以在这个方法中尝试向对方发送信息,如果发送失败,则关闭连接。
  2. IdleStateHandler 的实现基于 EventLoop 的定时任务,每次读写都会记录一个值,在定时任务运行的时候通过计算当前时间和设置时间和上次事件发生时间的结果,来判断是否空闲。
  3. 内部有 3 个定时任务,分别对应读事件,写事件,读写事件。通常用户监听读写事件就足够了。
  4. 同时,IdleStateHandler 内部也考虑了一些极端情况: 客户端接收缓慢,一次接收数据的速度超过了设置的空闲时间。Netty 通过构造方法中的 observeOutput 属性来决定是否对出站缓冲区的情况进行判断。
  5. 如果出站缓慢,Netty 不认为这是空闲,也就不触发空闲事件。但第一次无论如何也是要触发的。因为第一次无法判断是出站缓慢还是空闲。当然,出站缓慢的话,可能造成 OOM,OOM 比空闲的问题更大。
  6. 所以,当你的应用出现了内存溢出,OOM之类,并且写空闲极少发生(使用了 observeOutput 为 rue),那么就需要注意是不是数据出站速度过慢。
  7. 还有一个注意的地方就是 ReadTimeouthandler ,它继承自 IdeStateHandler,当触发读空闲事件的时候,就触发 ctx.fireExceptionCaught 方法,并传入一个 ReadTimeoutException,然后关闭 Socket。
  8. 而WriteTimeoutHandler 的实现不是基于 IdleStateHandler 的,他的原理是,当调用 write 方法的时候,会创建一个定时任务,任务内容是根据传入的 promise 的完成情况来判断是否超出了写的时间。当定时任务根据指定时间开始运行,发现 promise 的 isDone 方法返回 false,表明还没有写完,说明超时了,则抛出异常。当 write方法完成后,会打断定时任务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/461133.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

easyrecovery16最新数据恢复软件密钥使用方法教程

easyrecovery是一款专业的数据恢复软件,其最新版本为easyrecovery2023将于2022年底发布。总之,easyrecovery是一款功能齐全、性能稳定的专业数据恢复软件,无论删除文件、格式化分区或磁盘故障,它都可以提供最高的恢复成功率。值得个人用户选用。此版本在功能和性能上有较大提升…

支持中英双语和多种插件的开源对话语言模型,160亿参数

一、开源项目简介 MOSS是一个支持中英双语和多种插件的开源对话语言模型&#xff0c;moss-moon系列模型具有160亿参数&#xff0c;在FP16精度下可在单张A100/A800或两张3090显卡运行&#xff0c;在INT4/8精度下可在单张3090显卡运行。MOSS基座语言模型在约七千亿中英文以及代码…

HTB靶机-Lame-WP

Lame 简介&#xff1a; Lame is a beginner level machine, requiring only one exploit to obtain root access. It was the first machine published on Hack The Box and was often the first machine for new users prior to its retirement Tags&#xff1a; Injection, C…

Midjourney 注册 12 步流程教学

原文&#xff1a; https://bysocket.com/midjourney-register/ 先推荐一个 PromptHero 中文官网 https://promptheroes.cn/ &#xff1a;Prompt Heroes 官网是提供 AI 绘画相关提示词中文网站&#xff0c;包括 Midjourney&#xff08;MJ&#xff09;、 Stable Diffusion、DALL…

printf,echo,cat指令与输出重定向>,输入重定向<与追加重定向>>等

printf指令的功能&#xff08;输出/追加重定向&#xff09; 语法&#xff1a;printf “格式化数据” (>/>>重定向)功能&#xff1a;格式化输出(默认往显示器文件且不带换行符&#xff09; 实例演示 echo指令的功能&#xff08;输出/追加重定向&#xff09; 语法&am…

使用chatgpt分析 too many open files 问题-未验证

java.io.IOException: Too many open files 怎么能定位到时哪行代码出的问题 &#xff1f; 2023/4/25 19:46:33 当出现类似 "java.io.IOException: Too many open files" 的错误时&#xff0c;通常是因为程序打开了过多的文件句柄&#xff08;File Handles&#xff…

【操作系统】第四章 文件管理

文章目录 知识体系4.1 文件系统基础4.1.1 文件的基本概念4.1.2 文件控制块和索引节点4.1.3 文件的操作4.1.4 文件保护4.1.5 文件的逻辑结构4.1.6 文件的物理结构 4.2 目录4.2.1 目录的基本概念4.2.2 目录结构4.2.3 目录的操作*4.2.4 目录实现4.2.5 文件共享 4.3 文件系统4.3.1 …

快速部署和测试API:使用APIfox的实战经验分享

最近发现一款接口测试工具--apifox&#xff0c;我我们很难将它描述为一款接口管理工具 或 接口自测试工具。 官方给了一个简单的公式&#xff0c;更能说明apifox可以做什么。 20分钟学ApiFox接口测试工具&#xff0c;结合30个项目实战讲解&#xff01;_哔哩哔哩_bilibili20分…

十、v-model的基本使用

一、v-model的基本使用 表单提交是开发中非常常见的功能&#xff0c;也是和用户交互的重要手段&#xff1a; 比如用户在登录、注册时需要提交账号密码&#xff1b;比如用户在检索、创建、更新信息时&#xff0c;需要提交一些数据&#xff1b; 这些都要求我们可以在代码逻辑中…

LVS+KeepAlived高可用负载均衡集群

1. 高可用群集的相关知识 1. 1 高可用&#xff08;HA&#xff09;群集与普通群集的比较 普通群集 普通的群集的部署是通过一台度器控制调配多台节点服务器进行业务请求的处理&#xff0c;但是仅仅是一台调度器&#xff0c;就会存在极大的单点故障风险&#xff0c;当该调度器…

【MCS-51】51单片机结构原理

至今为止&#xff0c;MCS-51系列单片机有许多种型号的产品&#xff1a;其中又分为普通型51&#xff08;8031、8051、89S51&#xff09;和增强型52&#xff08;8032、8052、89S52等&#xff09;。它们最大的区别在于存储器配置各有差异。下面我举例子的都是8051这一系列的单片机…

如何用ChatGPT协助做内容分发?(文本变成直播/音频/视频脚本)

该场景对应的关键词库&#xff08;14个&#xff09;&#xff1a; 直播博主、直播达人、音频主持人、产品特点、品牌故事、品牌活动、品牌logo、视频主角、画外音解说员、编剧身份、品牌内容分发方向、时长、脚本类型、产品 提问模板&#xff1a; 1、你是一名优秀的美妆达人&am…

零拷贝技术详解

文章目录 零拷贝技术前世数据的四次拷贝与四次上下文切换4 次 copy4 次上下文切换弊端 零拷贝诞生准备--DMA技术DMA 参与下的数据四次拷贝 零拷贝诞生零拷贝技术什么是零拷贝技术&#xff1f;零拷贝的实现方式技术总结1、DMA 技术2、使用 page cache 的 zero copy&#xff1a;3…

Redis命令及不同类型数据的应用场景

文章目录 本章要点基本命令心跳命令ping读写键值命令DB切换select查看key数量 dbsize删除当前库数据flushdb删除所有库数据flushall退出客户端命令 Key操作命令String型Value操作命令典型应用场景 Hash型Value操作命令应用场景 List型Value操作命令应用场景 Set型Value操作命令…

麻了,部门新来的00后给我卷崩溃了...

今天上班开早会就是新人见面仪式&#xff0c;听说来了个很厉害的大佬&#xff0c;年纪还不大&#xff0c;是上家公司离职过来的&#xff0c;薪资已经达到中高等水平&#xff0c;很多人都好奇不已&#xff0c;能拿到这个薪资应该人不简单&#xff0c;果然&#xff0c;自我介绍的…

前端工程化知识总结

1.webpack 和 gulp 区别 gulp&#xff08;流&#xff09;主要指前端开发的工作流程&#xff0c;通过配置一系列的task&#xff0c;定义task处理的事务(例如文件压缩合并、雪碧图、启动server、版本控制等)&#xff0c;再定义执行顺序&#xff0c;让gulp执行 task&#xff0c;从…

【WCH】CH32F203基于内部RTC+I2C SSD1306 OLED时钟和温度显示

【WCH】CH32F203基于内部RTCI2C SSD1306 OLED时钟和温度显示 &#x1f4cc;相关篇《【WCH】CH32F203基于内部RTC时钟I2C SSD1306 OLED显示》&#x1f4fa;显示效果&#xff1a; ✨主要是在其基础 上增加温度显示&#xff0c;温度数据来源于DS18B20&#xff0c;更换了OLED驱动显…

19.Java文件操作---I/O流

Java文件操作—I/O流 流(stream)的概念源于UNIX中管道(pipe)的概念。在UNIX中&#xff0c;管道是一条不间断的字节流&#xff0c;用来实现程序或进程间的通信&#xff0c;或读写外围设备、外部文件等。一个流&#xff0c;必有源端和目的端&#xff0c;它们可以是计算机内存的某…

c++ 虚基类(好理解)

当有类Base&#xff0c;类Base1继承了Base&#xff0c;Base2也继承了Base&#xff0c;Derived 类多继承了Base1和Base2 也就是呈现如图一种继承关系 如果Base类有一个public的 int a &#xff0c;在Derived里面要访问这个a&#xff0c;就必须指定是哪个类的a&#xff08;Base1…

2023年的深度学习入门指南(8) - CUDA编程基础

2023年的深度学习入门指南(8) - CUDA编程基础 上一篇我们走马观花地看了下SIMD和GPGPU的编程。不过线条太粗了&#xff0c;在开发大模型时遇到问题了肯定还会晕。 所以我们还是需要深入到CUDA中去探险一下。 获取CUDA设备信息 在使用CUDA设备之前&#xff0c;首先我们得获取…