IdleStateHandler 心跳机制源码详解

news2025/1/17 15:45:27

优质博文:IT-BLOG-CN

一、心跳机制

Netty支持心跳机制,可以检测远程服务端是否存活或者活跃。心跳是在TCP长连接中,客户端和服务端定时向对方发送数据包通知对方自己还在线,保证连接的有效性的一种机制。在服务器和客户端之间一定时间内没有数据交互时, 即处于idle状态时, 客户端或服务器会发送一个特殊的数据包给对方, 当接收方收到这个数据报文后, 也立即发送一个特殊的数据报文, 回应发送方, 即一个PING-PONG交互。当某一端收到心跳消息后, 就知道了对方仍然在线, 这就确保TCP连接的有效性。

Netty提供了IdleStateHandler可以对三种类型心跳进行检测,是用来监测连接的空闲情况。然后我们就可以根据心跳情况,来实现具体的处理逻辑,比如说断开连接、重新连接等等。同时,还提供了ReadTimeoutHandlerWriteTimeoutHandler检测连接的有效性。

名称作用
IdleStateHandler当连接的空闲时间(读或者写)太长时,将会触发一个IdleStateEvent事件。然后,你可以通过你的ChannelInboundHandler中重写userEventTrigged方法来处理该事件。
ReadTimeoutHandler如果在指定的事件没有发生读事件,就会抛出这个异常,并自动关闭这个连接。你可以在exceptionCaught方法中处理这个异常。
WriteTimeoutHandler当一个写操作不能在一定的时间内完成时,抛出此异常,并关闭连接。你同样可以在exceptionCaught方法中处理这个异常。

二、IdleStateHandler 简介

IdleStateHandler也是一个ChannelHandler,也需要被载入到ChannelPipeline中,加入我们在服务器端的ChannelInitializer中。我们在channel链中加入了IdleSateHandler,第一个参数是5,单位是秒,那么这样做的意思就是:在服务器端会每隔5秒来检查一下channelRead方法被调用的情况,如果在5秒内该链上的channelRead方法都没有被触发,就会调用userEventTriggered方法:

//创建服务类
ServerBootstrap serverBootstrap = new ServerBootstrap();

//boss和worker
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();

try {
    //设置线程池
    serverBootstrap.group(boss,worker);
    //设置socket工厂,Channel 是对 Java 底层 Socket 连接的抽象
    serverBootstrap.channel(NioServerSocketChannel.class);
    //设置管道工厂
    serverBootstrap.childHandler(new ChannelInitializer<Channel>() {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            //设置后台转换器(二进制转换字符串)
            ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
            ch.pipeline().addLast(new StringDecoder());
            ch.pipeline().addLast(new StringEncoder());
            ch.pipeline().addLast(new ServerSocketHandler());
        }
    });

并且还是个ChannelInboundHandler,是用来处理入站事件的。看下它的构造器:

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
名称作用
readerIdleTimeSeconds读超时。即当在指定的时间间隔内没有从Channel读取到数据时,会触发一个READER_IDLEIdleStateEvent事件
writerIdleTimeSeconds写超时。即当在指定的时间间隔内没有数据写入到Channel时,会触发一个WRITER_IDLEIdleStateEvent事件
allIdleTimeSeconds读/写超时。即当在指定的时间间隔内没有读或写操作时,会触发一个ALL_IDLEIdleStateEvent事件

三、IdleStateHandler 源码

【1】handlerAddedhandlerRemovedIdleStateHandler是在创建IdleStateHandler实例并添加到ChannelPipeline时添加定时任务来进行定时检测的,具体在initialize(ctx)方法实现;同时在从ChannelPipeline移除或Channel关闭时,移除这个定时检测,具体在destroy()实现。IdleStateHandlerchannelActive()方法在socket通道建立时被触发。

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
        this.initialize(ctx);
    }

}

public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    this.destroy();
}

public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    this.destroy();
    super.channelInactive(ctx);
}

【2】initialize:根据配置的readerIdleTimeWriteIdleTIme等超时事件参数往任务队列taskQueue中添加定时任务task。我先先查看ReaderIdleTimeoutTask的作用。

private void initialize(ChannelHandlerContext ctx) {
    switch(this.state) {
    case 1:
    case 2:
        return;
    default:
        this.state = 1;
        this.initOutputChanged(ctx);
        this.lastReadTime = this.lastWriteTime = this.ticksInNanos();
        if (this.readerIdleTimeNanos > 0L) {
            // 这里的 schedule 方法会调用 eventLoop 的 schedule 方法,将定时任务添加进队列中
            this.readerIdleTimeout = this.schedule(ctx, new IdleStateHandler.ReaderIdleTimeoutTask(ctx), this.readerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }

        if (this.writerIdleTimeNanos > 0L) {
            this.writerIdleTimeout = this.schedule(ctx, new IdleStateHandler.WriterIdleTimeoutTask(ctx), this.writerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }

        if (this.allIdleTimeNanos > 0L) {
            this.allIdleTimeout = this.schedule(ctx, new IdleStateHandler.AllIdleTimeoutTask(ctx), this.allIdleTimeNanos, TimeUnit.NANOSECONDS);
        }

    }
}

定时任务添加到对应线程EventLoopExecutor对应的任务队列taskQueue中,在对应线程的run()方法中循环执行。只要给定的参数大于0,就创建一个定时任务,每个事件都创建。同时,将state状态设置为1,防止重复初始化。调用initOutputChanged方法,初始化监控出站数据属性

private void initOutputChanged(ChannelHandlerContext ctx) {
    if (this.observeOutput) {
        Channel channel = ctx.channel();
        Unsafe unsafe = channel.unsafe();
        ChannelOutboundBuffer buf = unsafe.outboundBuffer();
        // 记录了出站缓冲区相关的数据,buf 对象的 hash 码,和 buf 的剩余缓冲字节数
        if (buf != null) {
            this.lastMessageHashCode = System.identityHashCode(buf.current());
            this.lastPendingWriteBytes = buf.totalPendingWriteBytes();
            this.lastFlushProgress = buf.currentProgress();
        }
    }
}

这边会触发一个ReaderIdleTimeoutTasknextDelay的初始化值为超时秒数readerIdleTimeNanos。如果检测的时候没有正在读,且计算多久没读了,nextDelay -= 当前时间 - 上次读取时间,假如这个结果是6s,说明最后一次调用channelRead已经是6s之前的事情了,你设置的是5s,那么nextDelay则为-1,说明超时了。则创建IdleStateEvent事件,IdleState枚举值为READER_IDLE,然后调用channelIdle方法分发给下一个ChannelInboundHandler,通常由用户自定义一个ChannelInboundHandler来捕获并处理。

private final class ReaderIdleTimeoutTask extends IdleStateHandler.AbstractIdleTask {
    ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
        super(ctx);
    }

    protected void run(ChannelHandlerContext ctx) {
        long nextDelay = IdleStateHandler.this.readerIdleTimeNanos;
        if (!IdleStateHandler.this.reading) {
            nextDelay -= IdleStateHandler.this.ticksInNanos() - IdleStateHandler.this.lastReadTime;
        }

        if (nextDelay <= 0L) {
            IdleStateHandler.this.readerIdleTimeout = IdleStateHandler.this.schedule(ctx, this, IdleStateHandler.this.readerIdleTimeNanos, TimeUnit.NANOSECONDS);
            boolean first = IdleStateHandler.this.firstReaderIdleEvent;
            IdleStateHandler.this.firstReaderIdleEvent = false;

            try {
                IdleStateEvent event = IdleStateHandler.this.newIdleStateEvent(IdleState.READER_IDLE, first);
                IdleStateHandler.this.channelIdle(ctx, event);
            } catch (Throwable var6) {
                ctx.fireExceptionCaught(var6);
            }
        } else {
            IdleStateHandler.this.readerIdleTimeout = IdleStateHandler.this.schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
        }

    }
}

firstxxxxIdleEvent作用:假设当你的客户端应用每次接收数据是30秒,而你的写空闲时间是25秒,那么,当你数据还没有写出的时候,写空闲时间触发了。实际上是不合乎逻辑的。因为你的应用根本不空闲。

Netty的解决方案是:记录最后一次输出消息的相关信息,并使用一个值firstXXXXIdleEvent表示是否再次活动过,每次读写活动都会将对应的first值更新为true,如果是false,说明这段时间没有发生过读写事件。同时如果第一次记录出站的相关数据和第二次得到的出站相关数据不同,则说明数据在缓慢的出站,就不用触发空闲事件。

总的来说,这个字段就是用来对付 “客户端接收数据奇慢无比,慢到比空闲时间还多” 的极端情况。所以,Netty默认是关闭这个字段的。

总的来说,每次读取操作都会记录一个时间,定时任务时间到了,会计算当前时间和最后一次读的时间的间隔,如果间隔超过了设置的时间,就触发·UserEventTriggered`方法:

protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
    ctx.fireUserEventTriggered(evt);
}

【3】写任务的run方法逻辑基本和读任务的逻辑一样,唯一不同的就是有一个针对 出站较慢数据的判断。

if (hasOutputChanged(ctx, first)) {
   return;
}

如果这个方法返回true,就不执行触发事件操作了,即使时间到了。看看该方法实现:

private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
    if (observeOutput) {
        // 如果最后一次写的时间和上一次记录的时间不一样,说明写操作进行过了,则更新此值
        if (lastChangeCheckTimeStamp != lastWriteTime) {
            lastChangeCheckTimeStamp = lastWriteTime;
            // 但如果,在这个方法的调用间隙修改的,就仍然不触发事件
            if (!first) { // #firstWriterIdleEvent or #firstAllIdleEvent
                return true;
            }
        }
        Channel channel = ctx.channel();
        Unsafe unsafe = channel.unsafe();
        ChannelOutboundBuffer buf = unsafe.outboundBuffer();
        // 如果出站区有数据
        if (buf != null) {
            // 拿到出站缓冲区的 对象 hashcode
            int messageHashCode = System.identityHashCode(buf.current());
            // 拿到这个 缓冲区的 所有字节
            long pendingWriteBytes = buf.totalPendingWriteBytes();
            // 如果和之前的不相等,或者字节数不同,说明,输出有变化,将 "最后一个缓冲区引用" 和 “剩余字节数” 刷新
            if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
                lastMessageHashCode = messageHashCode;
                lastPendingWriteBytes = pendingWriteBytes;
                // 如果写操作没有进行过,则任务写的慢,不触发空闲事件
                if (!first) {
                    return true;
                }
            }
        }
    }
    return false;
}

如果用户没有设置需要观察出站情况。就返回false,继续执行事件。如果设置了观察出站的情况,且最后一次写的时间和上一次记录的时间不一样,说明写操作刚刚做过了,则更新此值,但仍然需要判断这个first的值,如果这个值还是false,说明在这个写事件是在两个方法调用间隙完成的,或者是第一次访问这个方法,就仍然不触发事件。如果不满足上面的条件,就取出缓冲区对象,如果缓冲区没对象了,说明没有发生写的很慢的事件,就触发空闲事件。反之,记录当前缓冲区对象的hashcode和剩余字节数,再和之前的比较,如果任意一个不相等,说明数据在变化,或者说数据在慢慢的写出去。那么就更新这两个值,留在下一次判断。继续判断first,如果是fasle,说明这是第二次调用,就不用触发空闲事件了。

【4】所有事件的run方法:这个类叫做AllIdleTimeoutTask,表示这个监控着所有的事件。当读写事件发生时,都会记录。代码逻辑和写事件的的基本一致,除了这里:

long nextDelay = allIdleTimeNanos;
if (!reading) {
   // 当前时间减去 最后一次写或读 的时间 ,若大于0,说明超时了
   nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
}

这里的时间计算是取读写事件中的最大值来的。然后像写事件一样,判断是否发生了写的慢的情况。最后调用ctx.fireUserEventTriggered(evt)方法。

通常这个使用的是最多的。构造方法一般是:

pipeline.addLast(new IdleStateHandler(0, 0, 30, TimeUnit.SECONDS));

读写都是0表示禁用,30表示30秒内没有任务读写事件发生,就触发事件。注意,当不是0的时候,这三个任务会重叠。

四、总结

【1】IdleStateHandler心跳检测主要是通过向线程任务队列中添加定时任务,判断channelRead()方法或write()方法是否调用空闲超时,如果超时则触发超时事件执行自定义userEventTrigger()方法;
【2】Netty通过IdleStateHandler实现最常见的心跳机制不是一种双向心跳的PING-PONG模式,而是客户端发送心跳数据包,服务端接收心跳但不回复,因为如果服务端同时有上千个连接,心跳的回复需要消耗大量网络资源;如果服务端一段时间内没有收到客户端的心跳数据包则认为客户端已经下线,将通道关闭避免资源的浪费;在这种心跳模式下服务端可以感知客户端的存活情况,无论是宕机的正常下线还是网络问题的非正常下线,服务端都能感知到,而客户端不能感知到服务端的非正常下线;
【3】要想实现客户端感知服务端的存活情况,需要进行双向的心跳;Netty中的channelInactive()方法是通过Socket连接关闭时挥手数据包触发的,因此可以通过channelInactive()方法感知正常的下线情况,但是因为网络异常等非正常下线则无法感知;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1280796.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深度学习记录--梯度下降法

什么是梯度下降法&#xff1f; 梯度下降法是用来求解成本函数cost函数中使得J(w,b)函数值最小的参数(w,b) 梯度下降法的实现 通过对参数w,b的不断更新迭代&#xff0c;使J(w,b)的值趋于局部最小值或者全局最小值 如何进行更新&#xff1f; 以w为例&#xff1a;迭代公式 ww-…

Go连接mysql数据库

package main import ("database/sql""fmt"_ "github.com/go-sql-driver/mysql" ) //go连接数据库示例 func main() {// 数据库信息dsn : "root:roottcp(192.168.169.11:3306)/sql_test"//连接数据库 数据库类型mysql,以及数据库信息d…

【数据库】基于封锁的数据库调度器,以及等待锁处理的优先级策略

封锁调度器的体系结构 ​专栏内容&#xff1a; 手写数据库toadb 本专栏主要介绍如何从零开发&#xff0c;开发的步骤&#xff0c;以及开发过程中的涉及的原理&#xff0c;遇到的问题等&#xff0c;让大家能跟上并且可以一起开发&#xff0c;让每个需要的人成为参与者。 本专栏会…

LeedCode刷题---子数组问题

顾得泉&#xff1a;个人主页 个人专栏&#xff1a;《Linux操作系统》 《C/C》 《LeedCode刷题》 键盘敲烂&#xff0c;年薪百万&#xff01; 一、最大子数组和 题目链接&#xff1a;最大子数组和 题目描述 给你一个整数数组 nums &#xff0c;请你找出一个具有最大和的连…

开发猿的平平淡淡周末---2023/12/3

2023/12/3 天气晴 温度适宜 AM 早安八点多的世界&#xff0c;起来舒展了下腰&#xff0c;阳光依旧明媚&#xff0c;给平淡的生活带来了一丝暖意 日常操作&#xff0c;喂鸡&#xff0c;时政&#xff0c;洗漱&#xff0c;恰饭&#xff0c;肝会儿游戏 看会儿手机 ___看累…

“此应用专为旧版android打造,因此可能无法运行”,问题解决方案

当用户在Android P系统上打开某些应用程序时&#xff0c;可能会弹出一个对话框&#xff0c;提示内容为&#xff1a;“此应用专为旧版Android打造&#xff0c;可能无法正常运行。请尝试检查更新或与开发者联系”。 随着Android平台的发展&#xff0c;每个新版本通常都会引入新的…

[英语学习][6][Word Power Made Easy]的精读与翻译优化

[序言] 针对第18页的阅读, 进行第二次翻译优化以及纠错, 这次译者的翻译出现的严重问题: 没有考虑时态的变化导致整个翻译跟上下文脱节, 然后又有偷懒的嫌疑, 翻译得很随意. [英文学习的目标] 提升自身的英语水平, 对日后编程技能的提升有很大帮助. 希望大家这次能学到东西,…

基于SSM的职业高中智慧作业试题系统设计

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;JSP 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是 一、…

基于Java SSM框架实现师生交流答疑作业系统项目【项目源码+论文说明】计算机毕业设计

基于java的SSM框架实现师生交流答疑作业系统演示 摘要 在新发展的时代&#xff0c;众多的软件被开发出来&#xff0c;给用户带来了很大的选择余地&#xff0c;而且人们越来越追求更个性的需求。在这种时代背景下&#xff0c;人们对师生交流平台越来越重视&#xff0c;更好的实…

【5G PHY】5G NR 如何计算资源块的数量?

博主未授权任何人或组织机构转载博主任何原创文章&#xff0c;感谢各位对原创的支持&#xff01; 博主链接 本人就职于国际知名终端厂商&#xff0c;负责modem芯片研发。 在5G早期负责终端数据业务层、核心网相关的开发工作&#xff0c;目前牵头6G算力网络技术标准研究。 博客…

分享一个国内可用的免费AI-GPT网站

背景 ChatGPT作为一种基于人工智能技术的自然语言处理工具&#xff0c;近期的热度直接沸腾&#x1f30b;。 我们也忍不住做了一个基于ChatGPT的网站&#xff0c;可以免登陆&#xff01;&#xff01;国内可直接对话AI&#xff0c;也有各种提供工作效率的工具供大家使用。 可以这…

Python的模块与库,及if __name__ == ‘__main__语句【侯小啾python领航班系列(二十四)】

Python的模块与库,及if name == __main__语句【侯小啾python领航班系列(二十四)】 大家好,我是博主侯小啾, 🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ…

c# OpenCV 读取、显示和写入图像(二)

读取、显示和写入图像是图像处理和计算机视觉的基础。即使在裁剪、调整大小、旋转或应用不同的滤镜来处理图像时&#xff0c;您也需要先读取图像。因此&#xff0c;掌握这些基本操作非常重要。 imread()读取图像imshow()在窗口中显示图像imwrite()将图像保存到文件目录里 我们…

Python:私人定制密码保险库 - Vault

简介&#xff1a;Vault是一种用于安全访问机密的工具。秘密是您想要严格控制访问权限的任何内容&#xff0c;例如API密钥、密码、证书等等。Vault为任何机密提供了统一的界面&#xff0c;同时提供了严格的访问控制并记录了详细的审核日志。 历史攻略&#xff1a; Python&…

Android 相机库CameraView源码解析 (三) : 滤镜相关类说明

1. 前言 这段时间&#xff0c;在使用 natario1/CameraView 来实现带滤镜的预览、拍照、录像功能。 由于CameraView封装的比较到位&#xff0c;在项目前期&#xff0c;的确为我们节省了不少时间。 但随着项目持续深入&#xff0c;对于CameraView的使用进入深水区&#xff0c;逐…

【springboot】启动失败 Failed to start bean ‘webServerStartStop‘

lsof -i&#xff1a;xxx 发现端口被占用 kill掉该进程

大模型的RPA应用 | 代理流程自动化(APA),开启智能自动化新纪元

随着技术创新的持续推进&#xff0c;自动化技术已经变得至关重要&#xff0c;成为驱动企业和社会向前发展的核心动力。在自动化的里程碑中&#xff0c;机器人流程自动化&#xff08;RPA&#xff09;已经有效地将简单、重复且规则性的任务自动化。可是随着对处理更为复杂、多变且…

【学习笔记】机器学习——GAN

提出于2014年。 GAN由两个神经网络组成&#xff1a;一个试图生成看起来与训练数据相似数据的生成器&#xff0c;以及一个试图从虚假数据中分辨出真实数据的判别器。生成器和判别器在训练期间相互竞争。 对抗训练&#xff08;训练竞争性网络&#xff09;是一种重要的机器学习思想…

循环神经网络RNN及其变体LSTM、GRU

1. 背景 RNN(Recurrent Neural Networks) CNN利用输入中的空间几何结构信息&#xff1b;RNN利用输入数据的序列化特性。 2. SimpleRNN单元 传统多层感知机网络假设所有的输入数据之间相互独立&#xff0c;但这对于序列化数据是不成立的。RNN单元用隐藏状态或记忆引入这种依赖…

2024美赛数学建模资料---100%获奖资料

很好的教程了 一共二十四章 每一章都是一个模型 并且有matlab编程编码 第一章 线性规划 第二章 整数规划 第三章 非线性规划 第四章 动态规划 第五章 图与网络 第六章 排队论 第七章 对策论 第八章 层次分析法 第九章 插值与拟合 第十章 数据的统计描述和分析 第十一章…