【Netty】Netty中的超时处理与心跳机制(十九)

news2025/2/26 19:17:37

文章目录

  • 前言
  • 一、超时监测
  • 二、IdleStateHandler类
  • 三、ReadTimeoutHandler类
  • 四、WriteTimeoutHandler类
  • 五、实现心跳机制
    • 5.1. 定义心跳处理器
    • 5.2. 定义 ChannelInitializer
    • 5.3. 编写服务器
    • 5.4. 测试
  • 结语

前言

回顾Netty系列文章:

  • Netty 概述(一)
  • Netty 架构设计(二)
  • Netty Channel 概述(三)
  • Netty ChannelHandler(四)
  • ChannelPipeline源码分析(五)
  • 字节缓冲区 ByteBuf (六)(上)
  • 字节缓冲区 ByteBuf(七)(下)
  • Netty 如何实现零拷贝(八)
  • Netty 程序引导类(九)
  • Reactor 模型(十)
  • 工作原理详解(十一)
  • Netty 解码器(十二)
  • Netty 编码器(十三)
  • Netty 编解码器(十四)
  • 自定义解码器、编码器、编解码器(十五)
  • Future 源码分析(十六)
  • Promise 源码分析(十七)
  • 一行简单的writeAndFlush都做了哪些事(十八)

一、超时监测

Netty 的超时类型 IdleState 主要分为以下3类:

  • ALL_IDLE : 一段时间内没有数据接收或者发送。
  • READER_IDLE : 一段时间内没有数据接收。
  • WRITER_IDLE : 一段时间内没有数据发送。

针对上面的 3 类超时异常,Netty 提供了 3 类ChannelHandler来进行监测。

  • IdleStateHandler : 当 Channel 一段时间未执行读取、写入或者两者都未执行时,触发 -IdleStateEvent 事件。
  • ReadTimeoutHandler :在一定时间内未读取任何数据时,引发 ReadTimeoutEvent 事件。
  • WriteTimeoutHandler :当写操作在一定时间内无法完成时,引发 WriteTimeoutEvent 事件。

二、IdleStateHandler类

IdleStateHandler 包括了读\写超时状态处理,观察以下 IdleStateHandler 类的构造函数源码。

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}

public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
    this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}

public IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
    this.writeListener = new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) throws Exception {
            IdleStateHandler.this.lastWriteTime = IdleStateHandler.this.ticksInNanos();
            IdleStateHandler.this.firstWriterIdleEvent = IdleStateHandler.this.firstAllIdleEvent = true;
        }
    };
    this.firstReaderIdleEvent = true;
    this.firstWriterIdleEvent = true;
    this.firstAllIdleEvent = true;
    ObjectUtil.checkNotNull(unit, "unit");
    this.observeOutput = observeOutput;
    if (readerIdleTime <= 0L) {
        this.readerIdleTimeNanos = 0L;
    } else {
        this.readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
    }

    if (writerIdleTime <= 0L) {
        this.writerIdleTimeNanos = 0L;
    } else {
        this.writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
    }

    if (allIdleTime <= 0L) {
        this.allIdleTimeNanos = 0L;
    } else {
        this.allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
    }

}

在上述源码中,构造函数可以接收以下参数:

  • readerIdleTimeSecond:指定读超时时间,指定 0 表明为禁用。

  • writerIdleTimeSecond:指定写超时时间,指定 0 表明为禁用。

  • allIdleTimeSecond:在指定读写超时时间,指定 0 表明为禁用。

IdleStateHandler 使用示例:

public class MyChannelInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel channel) throws Exception {
        channel.pipeline().addLast("idleStateHandler",new IdleStateHandler(60,30,0));
        channel.pipeline().addLast("myHandler",new MyHandler());
    }
}

public class MyHandler extends ChannelDuplexHandler {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent){
            IdleStateEvent e = (IdleStateEvent) evt;
            if(e.state() == IdleState.READER_IDLE){
                ctx.close();
            }else if(e.state() == IdleState.WRITER_IDLE){
                ctx.writeAndFlush(new PingMessage());
            }
        }
    }
}

在上述示例中,IdleStateHandler 设置了读超时时间为 60 秒,写超时时间为 30 秒。MyHandler 是针对超时事件 IdleStateEvent 的处理。

  • 如果 30 秒内没有出站流量(写超时)时发送 ping 消息的示例。
  • 如果 60 秒内没有入站流量(读超时)时,连接关闭。

三、ReadTimeoutHandler类

ReadTimeoutHandler 类包括了读超时状态处理。ReadTimeoutHandler 类的源码如下:

public class ReadTimeoutHandler extends IdleStateHandler {
    private boolean closed;

    public ReadTimeoutHandler(int timeoutSeconds) {
        this((long)timeoutSeconds, TimeUnit.SECONDS);
    }

    public ReadTimeoutHandler(long timeout, TimeUnit unit) {
        super(timeout, 0L, 0L, unit);//禁用了写超时、读写超时
    }

    protected final void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        assert evt.state() == IdleState.READER_IDLE;//只处理读超时

        this.readTimedOut(ctx);
    }

    protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
        if (!this.closed) {
            ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);//引发异常
            ctx.close();
            this.closed = true;
        }

    }
}

从上述源码可以看出,ReadTimeoutHandler 继承自 IdleStateHandler,并在构造函数中禁用了写超时、读写超时,而且在处理超时时,只会针对 READER_IDLE状态进行处理,并引发 ReadTimeoutException 异常。
ReadTimeoutHandler 的使用示例如下:

public class MyChannelInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel channel) throws Exception {
        channel.pipeline().addLast("readTimeoutHandler",new ReadTimeoutHandler(30));
        channel.pipeline().addLast("myHandler",new MyHandler());
    }
}

//处理器处理ReadTimeoutException 
public class MyHandler extends ChannelDuplexHandler {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if(cause instanceof ReadTimeoutException){
            //...
        }else {
            super.exceptionCaught(ctx,cause);
        }
    }
}

在上述示例中,ReadTimeoutHandler 设置了读超时时间是 30 秒。

四、WriteTimeoutHandler类

WriteTimeoutHandler 类包括了写超时状态处理。WriteTimeoutHandler 类的源码如下:

public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
    private static final long MIN_TIMEOUT_NANOS;
    private final long timeoutNanos;
    private WriteTimeoutHandler.WriteTimeoutTask lastTask;
    private boolean closed;

    public WriteTimeoutHandler(int timeoutSeconds) {
        this((long)timeoutSeconds, TimeUnit.SECONDS);
    }

    public WriteTimeoutHandler(long timeout, TimeUnit unit) {
        ObjectUtil.checkNotNull(unit, "unit");
        if (timeout <= 0L) {
            this.timeoutNanos = 0L;
        } else {
            this.timeoutNanos = Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS);
        }

    }

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (this.timeoutNanos > 0L) {
            promise = promise.unvoid();
            this.scheduleTimeout(ctx, promise);
        }

        ctx.write(msg, promise);
    }

    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        WriteTimeoutHandler.WriteTimeoutTask task = this.lastTask;

        WriteTimeoutHandler.WriteTimeoutTask prev;
        for(this.lastTask = null; task != null; task = prev) {
            task.scheduledFuture.cancel(false);
            prev = task.prev;
            task.prev = null;
            task.next = null;
        }

    }

    private void scheduleTimeout(ChannelHandlerContext ctx, ChannelPromise promise) {
        WriteTimeoutHandler.WriteTimeoutTask task = new WriteTimeoutHandler.WriteTimeoutTask(ctx, promise);
        task.scheduledFuture = ctx.executor().schedule(task, this.timeoutNanos, TimeUnit.NANOSECONDS);
        if (!task.scheduledFuture.isDone()) {
            this.addWriteTimeoutTask(task);
            promise.addListener(task);
        }

    }

    private void addWriteTimeoutTask(WriteTimeoutHandler.WriteTimeoutTask task) {
        if (this.lastTask != null) {
            this.lastTask.next = task;
            task.prev = this.lastTask;
        }

        this.lastTask = task;
    }

    private void removeWriteTimeoutTask(WriteTimeoutHandler.WriteTimeoutTask task) {
        if (task == this.lastTask) {
            assert task.next == null;

            this.lastTask = this.lastTask.prev;
            if (this.lastTask != null) {
                this.lastTask.next = null;
            }
        } else {
            if (task.prev == null && task.next == null) {
                return;
            }

            if (task.prev == null) {
                task.next.prev = null;
            } else {
                task.prev.next = task.next;
                task.next.prev = task.prev;
            }
        }

        task.prev = null;
        task.next = null;
    }

    protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
        if (!this.closed) {
            ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);
            ctx.close();
            this.closed = true;
        }

    }

  //...
}

从上述源码可以看出,WriteTimeoutHandler 在处理超时时,引发了 WriteTimeoutException 异常。
WriteTimeoutHandler 的使用示例如下:

public class MyChannelInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel channel) throws Exception {
        channel.pipeline().addLast("writeTimeoutHandler",new WriteTimeoutHandler(30));
        channel.pipeline().addLast("myHandler",new MyHandler());
    }
}

//处理器处理ReadTimeoutException 
public class MyHandler extends ChannelDuplexHandler {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if(cause instanceof WriteTimeoutException ){
            //...
        }else {
            super.exceptionCaught(ctx,cause);
        }
    }
}

在上述示例中,WriteTimeoutHandler 设置了写超时时间是 30 秒。

五、实现心跳机制

针对超时的解决方案——心跳机制。
在程序开发中,心跳机制是非常常见的。其原理是,当连接闲置时可以发送一个心跳来维持连接。一般而言,心跳就是一段小的通信。

5.1. 定义心跳处理器

public class HeartbeatServerHandler extends ChannelInboundHandlerAdapter {
	
	// (1)心跳内容
	private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled
			.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
					CharsetUtil.UTF_8));  

	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
			throws Exception {

		// (2)判断超时类型
		if (evt instanceof IdleStateEvent) {
			IdleStateEvent event = (IdleStateEvent) evt;
			String type = "";
			if (event.state() == IdleState.READER_IDLE) {
				type = "read idle";
			} else if (event.state() == IdleState.WRITER_IDLE) {
				type = "write idle";
			} else if (event.state() == IdleState.ALL_IDLE) {
				type = "all idle";
			}

			// (3)发送心跳
			ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(
					ChannelFutureListener.CLOSE_ON_FAILURE);
 
			System.out.println( ctx.channel().remoteAddress()+"超时类型:" + type);
		} else {
			super.userEventTriggered(ctx, evt);
		}
	}
}

对上述代码说明:

  • 定义了心跳时,要发送的内容。

  • 判断是不是 IdleStateEvent 事件,是则处理。

  • 将心跳内容发送给客户端。

5.2. 定义 ChannelInitializer

HeartbeatHandlerInitializer用于封装各类ChannelHandler,代码如下:

public class HeartbeatHandlerInitializer extends ChannelInitializer<Channel> {

	private static final int READ_IDEL_TIME_OUT = 4; // 读超时
	private static final int WRITE_IDEL_TIME_OUT = 5;// 写超时
	private static final int ALL_IDEL_TIME_OUT = 7; // 所有超时

	@Override
	protected void initChannel(Channel ch) throws Exception {
		ChannelPipeline pipeline = ch.pipeline();
		pipeline.addLast(new IdleStateHandler(READ_IDEL_TIME_OUT,
				WRITE_IDEL_TIME_OUT, ALL_IDEL_TIME_OUT, TimeUnit.SECONDS)); // (1)
		pipeline.addLast(new HeartbeatServerHandler()); // (2)
	}
}

对上述代码说明如下:

  • 添加了一个IdleStateHandler到 ChannelPipeline,并分别设置了读、写超时的时间。为了方便演示,将超时时间设置的比较短。
  • 添加了HeartbeatServerHandler,用来处理超时时,发送心跳。

5.3. 编写服务器

服务器代码比较简单,启动后侦听 8083 端口。

public final class HeartbeatServer {

    static final int PORT = 8083;

    public static void main(String[] args) throws Exception {

        // 配置服务器
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new HeartbeatHandlerInitializer());

            // 启动
            ChannelFuture f = b.bind(PORT).sync();

            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

5.4. 测试

首先启动 HeartbeatServer,客户端用操作系统自带的 Telnet 程序即可:

telnet 127.0.0.1 8083

可以看到客户端与服务器的交互效果如下图。
在这里插入图片描述

结语

文章如果对你有帮助,看完记得点赞、关注、收藏。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/582277.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

太坑了,盘点BeanUtils.copyProperties的11个小坑

我们日常开发中&#xff0c;经常涉及到DO、DTO、VO对象属性拷贝赋值&#xff0c;很容易想到org.springframework.beans.BeanUtils的copyProperties 。它会自动通过反射机制获取源对象和目标对象的属性&#xff0c;并将对应的属性值进行复制。可以减少手动编写属性复制代码的工作…

“游蛇”大规模邮件攻击针对中国用户

近半年黑客团伙频频对我国实施攻击活动。研究人员发现&#xff0c;“游蛇”黑产团伙自2022年下半年开始至今&#xff0c;针对中国用户发起了大规模电子邮件攻击活动。 黑客使用电子邮件在内的多种传播方式。 该团伙利用钓鱼邮件、伪造的电子票据下载站、虚假应用程序下载站、…

eclipse固件库生成的操作流程

一.方法介绍 有时候我们需要将某个功能模块封装成一个库&#xff0c;只留出接口供别人使用&#xff0c;那么就需要打包处理&#xff0c;eclipse是如何操作的呢&#xff1f;本文仅仅讨论我所知道的两种方式&#xff0c;倘若还有更简便的方法也非常欢迎网友补充。 1.在已有的工…

【PyQt5】使用QtDesigner创建Splitter

目录 Splitter效果演示 目前在Qt Designer无法检索到QSplitter。 实现方式&#xff1a; 1.同时选中两个需要实现splitter样式的控件&#xff0c;以QTreeWidget和QTableWidget为例&#xff1b; 2.右击–>布局–>使用分裂器&#xff08;根据需求选择水平或垂直布局&#x…

Groovy系列二 Groovy GDK

目录 Groovy中自带集合方法 sort方法对集合排序 findAll 查询所有符合条件的元素 collect 返回 一个新的list inject 强大的累计功能 each、eachWithIndex find、findIndexOf 查询符合条件的数据 any判断符合条件的数据 every查询所有条件都满足的数据 reverse将集合…

linux下安装EclipseCDT:离线安装与在线安装

文章目录 前言&#xff1a;1. 离线下载1.1 下载EclipseCDT1.2 下载jdk1.3 安装jdk1.4 安装eclipse 2. 在线安装&#xff1a;2.1 安装jdk2.2 安装EclipseCDT2.2.1 简单安装2.2.2 ubuntu官方推荐安装方式2.2.3 apt安装(报错logo) 总结&#xff1a; 前言&#xff1a; Eclipse使用…

ChatGPT对软件测试的影响

本文首发于个人网站「BY林子」&#xff0c;转载请参考版权声明。 ChatGPT是一个经过预训练的AI语言模型&#xff0c;可以通过聊天的方式回答问题&#xff0c;或者与人闲聊。它能处理的是文本类的信息&#xff0c;输出也只能是文字。它从我们输入的信息中获取上下文&#xff0c;…

Spring 日志文件

日志 日志是程序的重要组成部分,日志可以:a.记录错误日志和警告日志(发现和定位问题)b.记录用户登录日志,方便分析用户是正常登录还是恶意破解用户c.记录系统的操作日志,方便数据恢复和定位操作人d.记录程序的执行时间,方便为以后优化程序提供数据支持 日志使用 SpringBoot …

东风/小米投资!去年EHB出货20万台,这家公司获科技进步一等奖

5月26日上午&#xff0c;2022年度上海市科学技术奖励大会在上海展览中心中央大厅召开&#xff0c;隆重表彰为国家、为上海科技事业和现代化建设作出突出贡献的科技工作者。同驭汽车与同济大学等单位联合申报的“汽车线控制动系统关键技术及产业化”项目获得科技进步奖项目一等奖…

【CCNP | 网络模拟器GNS系列】安装、配置和使用 GNS3

目录 1. 下载 GNS31.1 GitHub下载&#xff08;推荐&#xff09;1.2 官方下载&#xff08;示例&#xff09; 2. 安装GNS3&#xff08;1&#xff09;进入GNS3设置界面&#xff08;2&#xff09;许可协议&#xff08;3&#xff09;选择启动目录文件夹&#xff08;4&#xff09;选择…

ArcGIS中制作一张985、211院校分布图

一、数据来源及介绍 1.985、211院校名录 985、211院校名录主要来源于网络。 2.行政边界数据 行政边界数据来源于环境资源科学与数据中心&#xff08;中国科学院资源环境科学与数据中心 (resdc.cn)&#xff09;&#xff0c;该网站包含我们国家任何一个省市的行政边界&#xf…

2024考研408-计算机组成原理第二章-数据的表示

文章目录 一、数制与编码1.1、进位计数制1.1.1、计数方法&#xff08;最古老计数方法、十进制计数、r进制计数&#xff09;1.1.2、进制转换①任意进制转为十进制②二进制转八进制、十六进制③八进制、十六进制转二进制④十进制转任意进制&#xff08;包含整数、小数&#xff0c…

Python绘图神器Plotly安装、使用及导出图像教程

1. Plotly安装 Plotly 是一个快速完善并崛起的交互式的、开源的绘图库库&#xff0c;Python 库则是它的一个重要分支。现已支持超过40种独特的图表类型&#xff0c;涵盖了广泛的统计、金融、地理、科学和三维用例。 Python 中可以使用 pip 或者 conda 安装 Plotly&#xff1a…

使用校园账号登录WOS(Web of Science)并检索文献

使用校园账号登录WOS&#xff08;Web of Science&#xff09;并检索文献 写在最前面登录WOS检索文献文献检索文献检索结果分析文章类型&#xff08;Document Types&#xff09;发表年份&#xff08;Publication years&#xff09;期刊&#xff08;Publication/Source Titles&am…

chatgpt赋能python:Python中n个数相加–实现简单、计算准确

Python中n个数相加 – 实现简单、计算准确 Python是一门功能强大的编程语言&#xff0c;能够在各个领域得到广泛应用。在数据处理和科学领域&#xff0c;Python是最受欢迎的编程语言之一。在Python中&#xff0c;n个数相加是一种常见的操作&#xff0c;它可以在数据处理中做到…

计算机网络六 应用层

应用层 网络应用模型 客户/服务器模型(C/S) 客户/服务器模型是一种常见的网络应用模型。客户端是指与用户直接交互的计算机应用程序&#xff0c;服务器则是提供服务的计算机系统或应用程序。在客户/服务器模型中&#xff0c;客户端发送请求&#xff0c;服务器端回应请求。客户…

Redis7实战加面试题-高阶篇(案例落地实战bitmap/hyperloglog/GEO)

案例落地实战bitmap/hyperloglog/GEO 面试题&#xff1a; 抖音电商直播&#xff0c;主播介绍的商品有评论&#xff0c;1个商品对应了1系列的评论&#xff0c;排序展现取前10条记录 用户在手机App上的签到打卡信息:1天对应1系列用户的签到记录&#xff0c;新浪微博、钉钉打卡签…

ADC和DAC常用的56个技术术语

采集时间 采集时间是从释放保持状态(由采样-保持输入电路执行)到采样电容电压稳定至新输入值的1 LSB范围之内所需要的时间。采集时间(Tacq)的公式如下&#xff1a; ​混叠 根据采样定理&#xff0c;超过奈奎斯特频率的输入信号频率为“混叠”频率。也就是说&#xff0c;这些频…

一图看懂 importlib_metadata 模块:用于提供第三方访问Python包的元数据的库,资料整理+笔记(大全)

本文由 大侠(AhcaoZhu)原创&#xff0c;转载请声明。 链接: https://blog.csdn.net/Ahcao2008 一图看懂 importlib_metadata 模块&#xff1a;用于提供第三方访问Python包的元数据的库&#xff0c;资料整理笔记&#xff08;大全&#xff09; &#x1f9ca;摘要&#x1f9ca;模块…

rpm 方式部署 MongoDB

文章目录 rpm 方式部署 MongoDB1. 下载 rpm 包2. 上传到服务器3. 执行安装4. 启动5. 登陆6. 开启远程登陆7. 测试远程登陆8. 开启 auth 认证9. 远程登陆验证 rpm 方式部署 MongoDB 参考地址&#xff1a;https://blog.csdn.net/baidu_23491131/article/details/127664931 1. 下载…