Netty的ReplayingDecoder分析

news2025/1/11 6:56:43

说明

  • io.netty.handler.codec.ReplayingDecoder是io.netty.handler.codec.ByteToMessageDecoder的一个子类,是一个抽象类,它将字节流解码成其它的消息。
  • 需要ReplayingDecoder的子类实现decode(ChannelHandlerContext ctx, ByteBuf in, List out)这个函数,进行具体的解码。
  • ReplayingDecoder和ByteToMessageDecoder最大的不同是它允许子类实现函数decode() 和decodeLast()的时候,就好象需要的字节已经全部接收到一样,而不需要显式判断需要的字节是否已经全部接收到。
    例如,如果直接继承ByteToMessageDecoder类,decode() 函数的实现会类似下面的形式:
public class ServerBytesFramerDecoder  extends ByteToMessageDecoder {

	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {	
		if (in.readableBytes() < 12) {
			return;
		}		
		out.add(in.readBytes(12));		
	}
}

而如果直接继承ReplayingDecoder类,decode() 函数的实现会类似下面的形式:

public class ServerBytesFramerDecoder  extends ReplayingDecoder<Void> {

	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {			
		out.add(in.readBytes(12));		
	}
}
  • ReplayingDecoder之所以能做到上面那样,是因为ReplayingDecoder传递了一个特别的ByteBuf实现类ReplayingDecoderByteBuf:
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {

    static final Signal REPLAY = Signal.valueOf(ReplayingDecoder.class, "REPLAY");

    private final ReplayingDecoderByteBuf replayable = new ReplayingDecoderByteBuf();
    private S state;
    private int checkpoint = -1;
final class ReplayingDecoderByteBuf extends ByteBuf {

    private static final Signal REPLAY = ReplayingDecoder.REPLAY;

    private ByteBuf buffer;
    private boolean terminated;
    private SwappedByteBuf swapped;
  • 当没有足够的字节的时候,ReplayingDecoderByteBuf会抛出一个Error类型,控制又回到ReplayingDecoder。这个Error类型是Signal:
public final class Signal extends Error implements Constant<Signal> {

    private static final long serialVersionUID = -221145131122459977L;
  • 当ReplayingDecoder捕获到Error后,会将buffer的readerIndex重置回初始的位置(也就是buffer的开始),当新的数据被接收到buffer以后,又会调用decode函数。

示例

一个简单的正常示例

本示例的验证场景:
在这个示例中,客户端发送了12个字节的数据,服务端的ServerBytesFramerDecoder解析成功,传递给了后续的ServerRegisterRequestHandler。

服务端代码片段

package com.thb.power.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * 服务端的主函数
 * @author thb
 *
 */
public class MainStation {
	
	static final int PORT = Integer.parseInt(System.getProperty("port", "22335"));

	public static void main(String[] args) throws Exception {
		// 配置服务器
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap b =  new ServerBootstrap();
			b.group(bossGroup, workerGroup)
			 .channel(NioServerSocketChannel.class)
			 .option(ChannelOption.SO_BACKLOG, 100)
			 .handler(new LoggingHandler(LogLevel.INFO))
			 .childHandler(new MainStationInitializer());
			
			// 启动服务端
			ChannelFuture f = b.bind(PORT).sync();
			
			// 等待直到server socket关闭
			f.channel().closeFuture().sync();
		} finally {
			// 关闭所有event loops以便终止所有的线程
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}

}
package com.thb.power.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class MainStationInitializer extends ChannelInitializer<SocketChannel> {

	 @Override
	 public void initChannel(SocketChannel ch) throws Exception {
		 ChannelPipeline p = ch.pipeline();		
		 
		 p.addLast(new LoggingHandler(LogLevel.INFO));
		 p.addLast(new ServerBytesFramerDecoder());	
		 p.addLast(new ServerRegisterRequestHandler());
	 }
}
package com.thb.power.server;

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;

public class ServerBytesFramerDecoder  extends ReplayingDecoder<Void> {

	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {				
		out.add(in.readBytes(12));		
	}
}
package com.thb.power.server;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ServerRegisterRequestHandler extends ChannelInboundHandlerAdapter {
	
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) {
		ByteBuf m = (ByteBuf)msg;
		System.out.println("ServerRegisterRequestHandler: readableBytes: " + m.readableBytes());
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		cause.printStackTrace();
		ctx.close();
	}
}

启动服务端

在这里插入图片描述

启动客户端

在这里插入图片描述

从客户端发送12个字节的业务数据

在这里插入图片描述

观察服务端的输出

在这里插入图片描述
从上面服务端的输出可以看出,ServerRegisterRequestHandler收到了12个字节的数据。而ServerRegisterRequestHandler在ServerBytesFramerDecoder的后面,所以这12个字节的数据是ServerBytesFramerDecoder解析出来传递过来的。当时在ChannelPipeline添加的ChannelHandler的顺序:

 @Override
 public void initChannel(SocketChannel ch) throws Exception {
	 ChannelPipeline p = ch.pipeline();		
	 
	 p.addLast(new LoggingHandler(LogLevel.INFO));
	 p.addLast(new ServerBytesFramerDecoder());	
	 p.addLast(new ServerRegisterRequestHandler());
 }

客户端发送的数据少于服务端ReplayingDecoder实现类要求接收的数据

本示例的验证场景:
在这个示例中,客户端发送了12个字节的数据,服务端的ServerBytesFramerDecoder要求接收100个字节的数据,所以没有接收到足够的数据,导致后续的ServerRegisterRequestHandler没有接收到数据。

服务端代码片段

package com.thb.power.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * 服务端的主函数
 * @author thb
 *
 */
public class MainStation {
	
	static final int PORT = Integer.parseInt(System.getProperty("port", "22335"));

	public static void main(String[] args) throws Exception {
		// 配置服务器
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap b =  new ServerBootstrap();
			b.group(bossGroup, workerGroup)
			 .channel(NioServerSocketChannel.class)
			 .option(ChannelOption.SO_BACKLOG, 100)
			 .handler(new LoggingHandler(LogLevel.INFO))
			 .childHandler(new MainStationInitializer());
			
			// 启动服务端
			ChannelFuture f = b.bind(PORT).sync();
			
			// 等待直到server socket关闭
			f.channel().closeFuture().sync();
		} finally {
			// 关闭所有event loops以便终止所有的线程
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}

}
package com.thb.power.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class MainStationInitializer extends ChannelInitializer<SocketChannel> {

	 @Override
	 public void initChannel(SocketChannel ch) throws Exception {
		 ChannelPipeline p = ch.pipeline();		
		 
		 p.addLast(new LoggingHandler(LogLevel.INFO));
		 p.addLast(new ServerBytesFramerDecoder());	
		 p.addLast(new ServerRegisterRequestHandler());
	 }
}
package com.thb.power.server;

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;

public class ServerBytesFramerDecoder  extends ReplayingDecoder<Void> {

	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
	    //out.add(in.readBytes(12));				
		out.add(in.readBytes(100));		
	}
}
package com.thb.power.server;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ServerRegisterRequestHandler extends ChannelInboundHandlerAdapter {
	
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) {
		ByteBuf m = (ByteBuf)msg;
		System.out.println("ServerRegisterRequestHandler: readableBytes: " + m.readableBytes());
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		cause.printStackTrace();
		ctx.close();
	}
}

启动服务端

在这里插入图片描述

启动客户端,并发送12个字节的数据

在这里插入图片描述

观察服务端的输出

在这里插入图片描述
从上面服务端的输出可以发现,LoggingHandler是收到了12个字节的数据,但ServerRegisterRequestHandler没有接收到数据。这是因为ServerBytesFramerDecoder没有接收到足够的数据(期望接收100个字节),也就没有传递给后续的ServerRegisterRequestHandler。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/859246.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Selenium 自动化 | 案例实战篇

Chrome DevTools 简介 Chrome DevTools 是一组直接内置在基于 Chromium 的浏览器&#xff08;如 Chrome、Opera 和 Microsoft Edge&#xff09;中的工具&#xff0c;用于帮助开发人员调试和研究网站。 借助 Chrome DevTools&#xff0c;开发人员可以更深入地访问网站&#xf…

恒盛策略:快跌慢涨是主力洗盘?

当股市一直处于震荡状态&#xff0c;不断重复时。许多股民纷纷开端猜想股市未来走势&#xff0c;同时也有不少人议论着什么是“主力洗盘”和“快跌慢涨”。这儿&#xff0c;咱们来从多个视点来剖析这个问题。 首要&#xff0c;咱们需要了解“主力洗盘”和“快跌慢涨”两个概念。…

leetcode 475. 供暖器(java)

供暖器 供暖器题目描述双指针代码演示 双指针专题 供暖器 难度 - 中等 leetcode 475 题目描述 冬季已经来临。 你的任务是设计一个有固定加热半径的供暖器向所有房屋供暖。 在加热器的加热半径范围内的每个房屋都可以获得供暖。 现在&#xff0c;给出位于一条水平线上的房屋 ho…

window下部署Yapi接口管理系统部署总结

window下部署Yapi接口管理系统部署总结 YApi 是高效、易用、功能强大的 api 管理平台&#xff0c;旨在为开发、产品、测试人员提供更优雅的接口管理服务。可以帮助开发者轻松创建、发布、维护 API&#xff0c;YApi 还为用户提供了优秀的交互体验&#xff0c;开发人员只需利用平…

使用几何和线性代数从单个图像进行 3D 重建

使用几何和线性代数从单个图像进行 3D 重建 萨蒂亚 一、说明 3D重构是一个挑战性题目&#xff0c;而且这个新颖的题目正处于启发和膨胀阶段&#xff1b;因此&#xff0c;各种各样的尝试层出不穷&#xff0c;本篇说明尝试的一种&#xff0c;至于其它更多的尝试&#xff0c;我们在…

uniapp+vue3项目中使用vant-weapp

创建项目 通过vue-cli命令行创建项目 Vue3/Vite版要求 node 版本^14.18.0 || >16.0.0 uni-app官网 (dcloud.net.cn) npx degit dcloudio/uni-preset-vue#vite my-vue3-project打开项目 点击顶部菜单栏终端/新建终端 执行安装依赖指令 yarn install 或 npm install 安装vant…

AI语音工牌在通讯行业营业大厅场景应用

在运营商营业大厅中&#xff0c;每天都有大量的客户来访咨询、办理业务。同时也会经常产生大量的客诉纠纷和服务差评。但因为缺乏有效的管理工具&#xff0c;加上线下沟通场景的数据采集难度高&#xff0c;数字化程度低&#xff0c;管理一直处于盲区。如何有效的管控营业厅人员…

从三个主要需求市场分析,VR全景创业的潜力发展

VR全景&#xff0c;5G时代朝阳产业&#xff0c;其实拍摄制作很简单&#xff0c;就是利用一套专业的相机设备去给商家拍摄&#xff0c;结合后期专业的3DVR全景展示拍摄制作平台&#xff0c;打造3D立体环绕的效果&#xff0c;将线下商家真实环境1&#xff1a;1还原到线上&#xf…

从C语言到C++_31(unordered_set和unordered_map介绍+哈希桶封装)

目录 1. unordered_set和unordered_map 1.1 unordered_map 1.2 unordered_set 1.3 unordered系列写OJ题 961. 在长度 2N 的数组中找出重复 N 次的元素 - 力扣&#xff08;LeetCode&#xff09; 349. 两个数组的交集 - 力扣&#xff08;LeetCode&#xff09; 217. 存在重…

NIO 非阻塞式IO

NIO Java NIO 基本介绍 Java NIO 全称 Java non-blocking IO&#xff0c;是指 JDK 提供的新 API。从 JDK1.4 开始&#xff0c;Java 提供了一系列改进的输入/输出的新特性&#xff0c;被统称为 NIO&#xff08;即 NewIO&#xff09;&#xff0c;是同步非阻塞的。NIO 相关类都被…

AIGC 浪潮下,鹅厂新一代前端人的真实工作感受

点击链接了解详情 原创作者&#xff1a;张波 腾小云导读 AIGC 这一时代潮流已然不可阻挡&#xff0c;我们要做的不是慌乱&#xff0c;而是把握住这个时代的机会。本文就和大家一起来探索在 AIGC 下&#xff0c;前端工程师即将面临的挑战和机遇。聊聊从以前到现在&#xff0c;A…

诸神之战:数字时代的低代码服务商与代理商究竟谁更强?

随着数字化转型浪潮的推进&#xff0c;企业对数字化应用开发的需求迅速增长。低代码作为一种新的软件开发范式&#xff0c;以其可视化和快速构建应用的能力&#xff0c;被广泛应用于成千上万家企业中。当低代码行业的逐渐发展成熟&#xff0c;越来越多的人看到了低代码的商业价…

使用乐观锁解决超卖问题

目录 什么是超卖&#xff1f; 乐观锁和悲观锁的定义 悲观锁&#xff1a; 乐观锁&#xff1a; 乐观锁的实现方式 1.版本号 2.CAS法 什么是超卖&#xff1f; 举个例子&#xff1a;订单系统中&#xff0c;用户在执行下单操作时&#xff0c;可能同一时间有无数个用户同时下单&…

平替版Airtag

Airtag是什么&#xff1f; AirTag是苹果公司设计的一款定位神奇&#xff0c;它通过一款纽扣电池进行供电&#xff0c;即可实现长达1-2年的关键物品的定位、查找的功能。 按照苹果公司自己的话说—— 您“丢三落四这门绝技&#xff0c;要‍失‍传‍了”。 AirTag 可帮你轻松追…

USB(二):Type-C

一、引脚定义 Type-C口有 4对TX/RX差分线&#xff0c;2对USB D/D-&#xff0c;1对SBU&#xff0c;2个CC&#xff0c;4个VBUS和4个地线Type-C母座视图&#xff1a; Type-C公头视图&#xff1a; 二、关键名词 DFP(Downstream Facing Port)&#xff1a; 下行端口&#xff0c…

【云原生】Pod的进阶

目录 一、资源限制二、重启策略三、健康检查 &#xff0c;又称为探针&#xff08;Probe&#xff09;3.1示例1&#xff1a;exec方式3.2示例2&#xff1a;httpGet方式3.3示例3&#xff1a;tcpSocket方式3.4示例4&#xff1a;就绪检测3.5示例5&#xff1a;就绪检测2 四、启动、退出…

设置VsCode 将打开的多个文件分行(栏)排列,实现全部显示

目录 1. 前言 2. 设置VsCode 多文件分行(栏)排列显示 1. 前言 主流编程IDE几乎都有排列切换选择所要查看的文件功能&#xff0c;如下为Visual Studio 2022的该功能界面&#xff1a; 图 1 图 2 当在Visual Studio 2022打开很多文件时&#xff0c;可以按照图1、图2所示找到自…

价格监测与数据分析的关系

所谓的价格监测&#xff0c;其实可以理解为是低价数据的监测&#xff0c;当监测价格时&#xff0c;其他页面上的商品数据也会被同时采集监测&#xff0c;如标题、库存、销量、评价等内容&#xff0c;所以品牌在做电商价格监测时&#xff0c;其实也可以对数据进行分析。 力维网络…

【React学习】—jsx语法规则(三)

【React学习】—jsx语法规则&#xff08;三&#xff09; 一、jsx语法规则&#xff1a; 1、定义虚拟DOM&#xff0c;不要写引号&#xff0c; 2、标签中混入JS表达式要用{} 3、样式的类名指定不要用class&#xff0c;要用className 4、内联样式&#xff0c;要用style{{key:value}…

linux环形缓冲区kfifo实践2:配合等待队列使用

基础 struct __wait_queue_head {spinlock_t lock;struct list_head task_list; }; typedef struct __wait_queue_head wait_queue_head_t; 初始化等待队列&#xff1a;init_waitqueue_head 深挖init_waitqueue_head宏的定义可知&#xff0c;传递给它的参数q是一个wait_queu…