Java基础之《netty(25)—handler链调用机制》

news2025/1/12 4:08:45

一、netty的handler的调用机制

1、使用自定义的编码器和解码器来说明netty的handler调用机制。
客户端发送long -> 服务器
服务端发送long -> 客户端

2、案例

二、客户端发送给服务端

1、服务端
NettyServer.java

package netty.inboundhandlerAndOutboundhandler;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {
	public static void main(String[] args) {
		
		EventLoopGroup bossGroup = new NioEventLoopGroup(1);
		EventLoopGroup workerGroup = new NioEventLoopGroup(8);
		
		try {
			ServerBootstrap bootstrap = new ServerBootstrap();
			bootstrap.group(bossGroup, workerGroup)
				.channel(NioServerSocketChannel.class)
				.childHandler(new NettyServerInitializer()); //自定义一个初始化类
			
			ChannelFuture cf = bootstrap.bind(7000).sync();
			cf.channel().closeFuture().sync();
			
		} catch (Exception e) {
			e.printStackTrace();
			
		} finally {
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}
}

NettyServerInitializer.java

package netty.inboundhandlerAndOutboundhandler;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {

	@Override
	protected void initChannel(SocketChannel ch) throws Exception {
		
		ChannelPipeline pipeline = ch.pipeline();
		
		//入站的handler进行解码 MyByteToLongDecoder
		pipeline.addLast(new MyByteToLongDecoder());
		pipeline.addLast(new NettyChannelHandler());
	}

}

MyByteToLongDecoder.java

package netty.inboundhandlerAndOutboundhandler;

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

public class MyByteToLongDecoder extends ByteToMessageDecoder {

	/**
	 * decode方法:会根据接收的数据,被调用多次,直到确定没有新的元素被添加到List
	 * ,或者是ByteBuf没有更多的可读字节为止
	 * 如果List out不为空,就会将List的内容传递给下一个channelInboundHandler处理
	 * 
	 * ctx 上下文对象
	 * in 入站的ByteBuf
	 * out List集合,将解码后的数据传给下一个handler(解析出一个Long就像下一个handler传递处理了)
	 */
	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
		System.out.println("MyByteToLongDecoder decode 被调用");
		//因为long占8个字节,需要判断有8个字节,才能读取一个long
		if (in.readableBytes() >= 8) {
			//两端约定好协议 比如是一次读8 就把数据对齐到8的倍数
			//不够8的倍数就填充补齐 然后传过去的信息带有总共大小 和填充数据大小就行了
			out.add(in.readLong());
		}
	}

}

NettyChannelHandler.java

package netty.inboundhandlerAndOutboundhandler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class NettyChannelHandler extends SimpleChannelInboundHandler<Long> {

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
		System.out.println("从客户端" + ctx.channel().remoteAddress() + "读取到long " + msg);
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		ctx.channel().close();
	}
}

2、客户端
NettyClient.java

package netty.inboundhandlerAndOutboundhandler;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {
	public static void main(String[] args) {
		EventLoopGroup group = new NioEventLoopGroup();
		
		try {
			Bootstrap bootstrap = new Bootstrap();
			bootstrap.group(group) //设置线程组
				.channel(NioSocketChannel.class)
				.handler(new NettyClientInitializer()); //自定义一个初始化对象
			
			ChannelFuture cf = bootstrap.connect("127.0.0.1", 7000).sync();
			cf.channel().closeFuture().sync();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			group.shutdownGracefully();
		}
	}
}

NettyClientInitializer.java

package netty.inboundhandlerAndOutboundhandler;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {

	/**
	 * 出站的handler从后往前调用,因为你pipeline是用addLast加在最后,入站是从前往后,出站就是从后往前
	 */
	@Override
	protected void initChannel(SocketChannel ch) throws Exception {
		
		ChannelPipeline pipeline = ch.pipeline();
		
		//加入一个出站的handler,对数据进行一个编码
		pipeline.addLast(new MyLongToByteEncoder());
		//加入一个自定义的handler,处理业务逻辑
		pipeline.addLast(new NettyClientHandler());
	}

}

MyLongToByteEncoder.java

package netty.inboundhandlerAndOutboundhandler;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {

	/**
	 * 编码的方法
	 */
	@Override
	protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
		System.out.println("MyLongToByteEncoder encode 被调用");
		System.out.println("msg=" + msg);
		out.writeLong(msg);
	}

}

NettyClientHandler.java

package netty.inboundhandlerAndOutboundhandler;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

public class NettyClientHandler extends SimpleChannelInboundHandler<Long> {

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
		
	}

	//重写channelActive 发送数据
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("NettyClientHandler 发送数据");
		ctx.writeAndFlush(123456L); //发送的是一个long
		
		//如果这里传字符串
		//ctx.writeAndFlush(Unpooled.copiedBuffer("abcdabcdabcdabcd", CharsetUtil.UTF_8));
		//分析
		//1. "abcdabcdabcdabcd"是16个字节
		//2. 该处理器的前一个handler 是 MyLongToByteEncoder
		//3. MyLongToByteEncoder 父类是 MessageToByteEncoder
		//4. 父类有一个write方法,会判断msg的类型是不是自己要处理的,如果不是就写出去
		//5. 因此我们编写Encoder时要注意,传入的数据类型和处理的数据类型一致
		/*
		   try {
            if (acceptOutboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                buf = allocateBuffer(ctx, cast, preferDirect);
                try {
                    encode(ctx, cast, buf);
                } finally {
                    ReferenceCountUtil.release(cast);
                }

                if (buf.isReadable()) {
                    ctx.write(buf, promise);
                } else {
                    buf.release();
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;
            } else {
                ctx.write(msg, promise);
            }
		 */
	}
}

三、执行结果

1、服务端

MyByteToLongDecoder decode 被调用
从客户端/127.0.0.1:62579读取到long 123456

2、客户端

NettyClientHandler 发送数据
MyLongToByteEncoder encode 被调用
msg=123456

四、服务端发送给客户端

客户端增加一个Decoder,服务端增加一个Encoder。

1、修改服务端代码
NettyChannelHandler.java
添加给客户端回送信息

package netty.inboundhandlerAndOutboundhandler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class NettyChannelHandler extends SimpleChannelInboundHandler<Long> {

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
		System.out.println("从客户端" + ctx.channel().remoteAddress() + "读取到long " + msg);
		
		//给客户端回送一个Long
		ctx.writeAndFlush(98765L); //writeAndFlush是ChannelOutboundInvoker的方法
		
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		ctx.channel().close();
	}
}

NettyServerInitializer.java
增加返回编码器

package netty.inboundhandlerAndOutboundhandler;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {

	@Override
	protected void initChannel(SocketChannel ch) throws Exception {
		
		ChannelPipeline pipeline = ch.pipeline();
		
		//入站的handler进行解码 MyByteToLongDecoder
		pipeline.addLast(new MyByteToLongDecoder());
		
		//增加返回编码器
		pipeline.addLast(new MyLongToByteEncoder());
		
		//编码解码位置颠倒无所谓,但必须在handler上面
		pipeline.addLast(new NettyChannelHandler());
	}

}

2、修改客户端代码
NettyClientHandler.java
添加接收服务端信息channelRead0接口

package netty.inboundhandlerAndOutboundhandler;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

public class NettyClientHandler extends SimpleChannelInboundHandler<Long> {

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
		System.out.println("服务器的ip=" + ctx.channel().remoteAddress());
		System.out.println("收到服务器消息=" + msg);
	}

	//重写channelActive 发送数据
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("NettyClientHandler 发送数据");
		ctx.writeAndFlush(123456L); //发送的是一个long
		
		//如果这里传字符串
		//ctx.writeAndFlush(Unpooled.copiedBuffer("abcdabcdabcdabcd", CharsetUtil.UTF_8));
		//分析
		//1. "abcdabcdabcdabcd"是16个字节
		//2. 该处理器的前一个handler 是 MyLongToByteEncoder
		//3. MyLongToByteEncoder 父类是 MessageToByteEncoder
		//4. 父类有一个write方法,会判断msg的类型是不是自己要处理的,如果不是就写出去
		//5. 因此我们编写Encoder时要注意,传入的数据类型和处理的数据类型一致
		/*
		   try {
            if (acceptOutboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                buf = allocateBuffer(ctx, cast, preferDirect);
                try {
                    encode(ctx, cast, buf);
                } finally {
                    ReferenceCountUtil.release(cast);
                }

                if (buf.isReadable()) {
                    ctx.write(buf, promise);
                } else {
                    buf.release();
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;
            } else {
                ctx.write(msg, promise);
            }
		 */
	}
}

NettyClientInitializer.java
增加一个入站的解码器

package netty.inboundhandlerAndOutboundhandler;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {

	/**
	 * 出站的handler从后往前调用,因为你pipeline是用addLast加在最后,入站是从前往后,出站就是从后往前
	 */
	@Override
	protected void initChannel(SocketChannel ch) throws Exception {
		
		ChannelPipeline pipeline = ch.pipeline();
		
		//加入一个出站的handler,对数据进行一个编码
		pipeline.addLast(new MyLongToByteEncoder());
		
		//增加一个入站的解码器
		pipeline.addLast(new MyByteToLongDecoder());
		
		//加入一个自定义的handler,处理业务逻辑
		pipeline.addLast(new NettyClientHandler());
	}

}

五、执行结果

1、服务端

MyByteToLongDecoder decode 被调用
从客户端/127.0.0.1:64909读取到long 123456
MyLongToByteEncoder encode 被调用
msg=98765

2、客户端

NettyClientHandler 发送数据
MyLongToByteEncoder encode 被调用
msg=123456
MyByteToLongDecoder decode 被调用
服务器的ip=/127.0.0.1:7000
收到服务器消息=98765

六、ChannelPipeline类中的图

 * <pre>
 *                                                 I/O Request
 *                                            via {@link Channel} or
 *                                        {@link ChannelHandlerContext}
 *                                                      |
 *  +---------------------------------------------------+---------------+
 *  |                           ChannelPipeline         |               |
 *  |                                                  \|/              |
 *  |    +---------------------+            +-----------+----------+    |
 *  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  .               |
 *  |               .                                   .               |
 *  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
 *  |        [ method call]                       [method call]         |
 *  |               .                                   .               |
 *  |               .                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  +---------------+-----------------------------------+---------------+
 *                  |                                  \|/
 *  +---------------+-----------------------------------+---------------+
 *  |               |                                   |               |
 *  |       [ Socket.read() ]                    [ Socket.write() ]     |
 *  |                                                                   |
 *  |  Netty Internal I/O Threads (Transport Implementation)            |
 *  +-------------------------------------------------------------------+
 * </pre>

七、结论

1、不论解码器handler还是编码器handler,即接收的消息类型必须与待处理的消息类型一致,否则该handler不会被执行。

2、在解码器进行数据解码时,需要判断缓存区(ByteBuf)的数据是否足够,否则接收到的结果和期望结果可能不一致。
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/169996.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C++】从0到1入门C++编程学习笔记 - 基础入门篇:程序流程结构

文章目录一、选择结构1.1 if 语句1.2 三目运算符1.3 switch语句二、循环结构2.1 while 循环语句2.2 do...while 循环语句2.3 for 循环语句2.4 嵌套循环三、跳转语句3.1 break 语句3.2 continue 语句3.3 goto 语句C/C支持最基本的三种程序运行结构&#xff1a;顺序结构、选择结构…

MySQL进阶——优化

1、选择最合适的字段属性 Mysql是一种关系型数据库&#xff0c;可以很好地支持大数据量的存储&#xff0c;但是一般来说&#xff0c;数据库中的表越小&#xff0c;在它上面执行的查询也就越快。因此&#xff0c;在创建表的时候&#xff0c;为了获得更好的性能&#xff0c;我们…

腾讯云HiFlow场景连接器 联动对象存储企业网盘,打通数据分发“最后一公里”

对云厂商和企业用户来说&#xff0c;随着数据规模的快速增长&#xff0c;企业除了对存储功能和性能的要求不断增加&#xff0c;也越来越注重数据分发的效率。在传统数据分发的过程中&#xff0c;数据管理员往往需要先在存储桶下载对应的客户方案/交付资料&#xff0c;再使用微信…

LINUX软中断-softirq

前言 关于linux的软中断的文章&#xff0c;在网上可以找到很多&#xff0c;但总觉着讲的都不够深入&#xff0c;打算自己写一下 软中断的感性认识 中断一旦被触发&#xff0c;本地cpu正在运行的不管是什么程序都要让路&#xff0c;让中断程序执行并且执行过程中不能被打断。…

分布式事务问题

4.2 分布式事务问题 4.2.1 什么是分布式事务 一次课程发布操作需要向数据库、redis、elasticsearch、MinIO写四份数据&#xff0c;这里存在分布式事务问题。 什么是分布式事务&#xff1f; 首先理解什么是本地事务&#xff1f; 平常我们在程序中通过spring去控制事务是利用…

Linux---进程优先级

目录 基本概念 查看系统进程 PRI and NI 用top命令更改已存在进程的nice&#xff1a; 其他概念 基本概念 cpu资源分配的先后顺序&#xff0c;就是指进程的优先权&#xff08;priority&#xff09;。 优先权高的进程有优先执行权利。配置进程优先权对多任务环境的linux很…

JNPF 3.4.5 快速开发框架源码目录截图 Gitee代码托管和研发协作平台

Gitee Gitee 除了提供最基础的 Git 代码托管之外&#xff0c;还提供代码在线查看、历史版本查看、Fork、Pull Request、打包下载任意版本、Issue、Wiki 、保护分支、代码质量检测、PaaS项目演示等方便管理、开发、协作、共享的功能。 作为一个应用项目&#xff0c;一般会有一…

flink环境参数引起的错误

环境参数&#xff1a;flink使用的版本是1.13.5、CentOS Linux 8一&#xff0c;默认环境引起本地与集群的jar包冲突遇到的情况是在idea执行的时候是没有问题的&#xff0c;然后打成jar包用集群执行的时候就会遇到问题。报错的时候会不太一&#xff0c;总之顺着错误去找的话会找到…

【力学性能预测】基于人工神经网络的钢板力学性能预测(附完整代码和数据集,系列3)

写在前面: 首先感谢兄弟们的订阅,让我有创作的动力,在创作过程我会尽最大能力,保证作品的质量,如果有问题,可以私信我,让我们携手共进,共创辉煌。 Hello,大家好,我是augustqi。今天手把手带大家做一个机器学习实战项目:基于人工神经网络的钢板力学性能预测,或者称…

文本生成视频、AI临床知识理解、大模型有趣案例、智源社区《预训练周刊》第70期...

No.70智源社区预训练组预训练研究观点资源活动周刊订阅《预训练周刊》已经开启“订阅功能”&#xff0c;扫描下面二维码&#xff0c;进入《预训练周刊》主页&#xff0c;选择“关注TA”&#xff0c;即可收到推送消息。关于周刊本期周刊&#xff0c;我们选择了12篇来自国内外知名…

《机器人SLAM导航核心技术与实战》第1季:第4章_机器人传感器

视频讲解 【第1季】4.第4章_机器人传感器-视频讲解 【第1季】4.1.第4章_机器人传感器_惯性测量单元-视频讲解 【第1季】4.2.第4章_机器人传感器_激光雷达-视频讲解 【第1季】4.3.第4章_机器人传感器_相机-视频讲解 【第1季】4.4.第4章_机器人传感器_带编码器的减速电机-视频…

Python机器学习:数据探索与可视化(一)

什么是数据探索&#xff1f; 在前面我们说到&#xff0c;所谓机器学习&#xff0c;就是用已知的数据通过算法去预测未来未知的数据。但是这个过程进行的前提就是要保证已知数据的完成性。所以数据探索&#xff0c;就是检查数据是否完整&#xff0c;是否有缺失值。 什么是可视化…

【安全研究】基于OPA和Spring Security的外部访问控制

译者导读 CNCF的毕业项目Open Policy Agent&#xff08;OPA&#xff09;, 为策略决策需求提供了一个统一的框架与服务。它将策略决策从软件业务逻辑中解耦剥离&#xff0c;将策略定义、决策过程抽象为通用模型&#xff0c;实现为一个通用策略引擎&#xff0c;可适用于广泛的业…

阿里云对话 Tapdata:「开发者优先」正在影响商业化软件的开源选择

在刚刚过去的2022年&#xff0c;Tapdata 带着开源项目 PDK&#xff08;Plugin Development Kit&#xff09;及 Tapdata Community 和大家见面&#xff0c;兑现了我们对自己以及开发者们的开源承诺&#xff0c;同时与阿里云等生态伙伴联合&#xff0c;加速构建更加开放的数据生态…

Linux基础 - DNS服务进阶

‍‍&#x1f3e1;博客主页&#xff1a; Passerby_Wang的博客_CSDN博客-系统运维,云计算,Linux基础领域博主&#x1f310;所属专栏&#xff1a;『Linux基础』&#x1f30c;上期文章&#xff1a; Linux基础 - DNS服务基础&#x1f4f0;如觉得博主文章写的不错或对你有所帮助的话…

贪心策略(三)多机调度问题、活动选择(库函数sort的整理)

把sort库函数的使用总结一下&#xff1a; 1、头文件#include<algorithm> 时间复杂度nlog(n) 2、使用格式 sort&#xff08;arr.begin(), arr.end()&#xff09;&#xff1b; 3、默认使用升序排序&#xff0c;第三个参数默认使用less<T>() 4、如果需要进行降序排序…

springcloud + nacos多环境联调、本地联调(即灰度版本)

背景&#xff1a;当我们使用nacos为注册中心注册微服务时&#xff0c;想本地环境和测试环境公用一个nacos&#xff0c;即注册中心等基础服务共用。当我们在服务A开发时&#xff0c;本地服务和测试环境服务都是注册到同一个nacos&#xff0c;由于nacos自带负载均衡策略&#xff…

小程序开发经验分享(9)小程序快速上线汇总

微信小程序申请 开发中的Appid 需要从“微信公众平台”中获取 如果是直接从git上拉取的话 直接项目导入就可以了(名称可以是中文) 小程序基础配置 如果需要修改显示的名称和appid可以去生成的配置文件project.config.json里面修改

前端特效之毛玻璃-倾斜-日历

前端特效之毛玻璃-倾斜-日历描述项目效果index.htmlindex.css描述 项目描述开发语言HTML、JavaScript、CSS库dyCalendarJS、vanilla-tilt 该项目中需要使用到的库有&#xff1a; dyCalendarJS vanilla-tilt.js 是 JavaScript 中的一个平滑的 3D 倾斜库。vanilla-tilt dyCalen…

MS SQL Server 日志审核工具

手动审核数据库活动是一项艰巨的任务。有效实现这一目标的最佳方法是使用全面的解决方案来简化和自动化数据库和活动监控。该解决方案还应使数据库管理员能够监控、跟踪、即时识别任何操作问题的根本原因&#xff0c;并实时检测对机密数据的未经授权的访问。 审核 Microsoft S…