Netty实战(六)

news2024/11/25 1:03:19

ChannelHandler和ChannelPipeline

  • 一、ChannelHandler
    • 1.1 Channel 的生命周期
    • 1.2 ChannelHandler 的生命周期
    • 1.3 ChannelInboundHandler 接口
    • 1.4 ChannelOutboundHandler 接口
    • 1.5 ChannelHandler 适配器
    • 1.6 资源管理
  • 二、ChannelPipeline 接口
    • 2.1 修改 ChannelPipeline
    • 2.2 触发事件
  • 三、ChannelHandlerContext 接口
    • 3.1 使用 ChannelHandlerContext
    • 3.2 ChannelHandler 和 ChannelHandlerContext 的高级用法
  • 四、异常处理
    • 4.1 处理入站异常
    • 4.2 处理出站异常

一、ChannelHandler

1.1 Channel 的生命周期

Channel 主要有四个生命周期向下表所示:

状 态描 述
ChannelUnregisteredChannel 已经被创建,但还未注册到 EventLoop
ChannelRegisteredChannel 已经被注册到了 EventLoop
ChannelActiveChannel 处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了
ChannelInactiveChannel 没有连接到远程节点

当channel的生命周期发生改变后,会生成对应的事件。这些事件将会被转发给 ChannelPipeline 中的 ChannelHandler,其可以随后对它们做出响应。如图示:

在这里插入图片描述

1.2 ChannelHandler 的生命周期

ChannelHandler 的生命周期发生在ChannelHandler被添加到 ChannelPipeline 中或者被从
ChannelPipeline 中移除时。这些方法中的每一个都接受一个 ChannelHandlerContext 参数。

类 型描 述
handlerAdded当把 ChannelHandler 添加到 ChannelPipeline 中时被调用
handlerRemoved当从 ChannelPipeline 中移除 ChannelHandler 时被调用
exceptionCaught当处理过程中在 ChannelPipeline 中有错误产生时被调用

1.3 ChannelInboundHandler 接口

ChannelInboundHandler用以处理入站数据以及各种状态变化。

下面是一些常用的ChannelInboundHandler方法,它们一般在数据被接收时或者与其对应的 Channel 状态发生改变时被调用。

类 型描 述
channelRegistered当 Channel 已经注册到它的 EventLoop 并且能够处理 I/O 时被调用
channelUnregistered当 Channel 从它的 EventLoop 注销并且无法处理任何 I/O 时被调用
channelActive当 Channel 处于活动状态时被调用;Channel 已经连接/绑定并且已经就绪
channelInactive当 Channel 离开活动状态并且不再连接它的远程节点时被调用
channelReadComplete当Channel上的一个读操作完成时被调用
channelRead当从 Channel 读取数据时被调用
ChannelWritabilityChanged当 Channel 的可写状态发生改变时被调用。用户可以确保写操作不会完成得太快(以避免发生 OutOfMemoryError)或者可以在 Channel 变为再次可写时恢复写入。可以通过调用 Channel 的 isWritable()方法来检测Channel 的可写性。与可写性相关的阈值可以通过 Channel.config().setWriteHighWaterMark()和Channel.config().setWriteLowWaterMark()方法来设置
userEventTriggered当 ChannelnboundHandler.fireUserEventTriggered

当某个 ChannelInboundHandler 的实现重写 channelRead()方法时,它将负责显式地释放与池化的ByteBuf 实例相关的内存。Netty 为此提供了一个实用方法 ReferenceCountUtil.release()。

如下面代码所示:

@Sharable
//扩展ChannelInboundHandlerAdapter
public class DiscardHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//丢弃读取到的消息
ReferenceCountUtil.release(msg);
}
}

Netty 将使用 WARN 级别的日志消息记录未释放的资源,使得可以非常简单地在代码中发现违规的实例。但是以这种方式管理资源可能很繁琐。一个更加简单的方式是使用 SimpleChannelInboundHandler。

像下面这个代码:

@Sharable
public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> {
@Override
public void channelRead0(ChannelHandlerContext ctx,
Object msg) {
// 不需要任何显式的资源释放
}
}

由于 SimpleChannelInboundHandler 会自动释放资源,所以你不应该存储指向任何消息的引用供将来使用,因为这些引用都将会失效。(PS:Netty实战(二)中我们提到过客户端为什么继承它的原因就是因为它会自动的释放资源)

1.4 ChannelOutboundHandler 接口

ChannelOutboundHandler用以处理出站数据并且允许拦截所有的操作。

当它处理出站操作和数据时它的方法将被 Channel、ChannelPipeline 以及 ChannelHandlerContext 调用。

ChannelOutboundHandler 的一个强大的功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法来处理请求。例如,如果到远程节点的写入被暂停了,那么你可以推迟冲刷操作并在稍后继续。

下面是ChannelOutboundHandler本身所定义的所有方法:

类 型描 述
bind(ChannelHandlerContext,SocketAddress,ChannelPromise)当请求将 Channel 绑定到本地地址时被调用
connect(ChannelHandlerContext,SocketAddress,SocketAddress,ChannelPromise)当请求将 Channel 连接到远程节点时被调用
disconnect(ChannelHandlerContext,ChannelPromise)当请求将 Channel 从远程节点断开时被调用
close(ChannelHandlerContext,ChannelPromise)当请求关闭 Channel 时被调用
deregister(ChannelHandlerContext,ChannelPromise)当请求将 Channel 从它的 EventLoop 注销时被调用
read(ChannelHandlerContext)当请求从 Channel 读取更多的数据时被调用
flush(ChannelHandlerContext)当请求通过 Channel 将入队数据冲刷到远程节点时被调用
write(ChannelHandlerContext,Object,ChannelPromise)当请求通过 Channel 将数据写到远程节点时被调用

ChannelPromiseChannelFuture ChannelOutboundHandler中的大部分方法都需要一个
ChannelPromise参数,以便在操作完成时得到通知。ChannelPromise是ChannelFuture的一个子类,其定义了一些可写的方法,如setSuccess()和setFailure(),从而使ChannelFuture不可变。

1.5 ChannelHandler 适配器

我们可以使用ChannelInboundHandlerAdapter 和
ChannelOutboundHandlerAdapter类作为自己的 ChannelHandler 的起始点

这两个适配器分别提供了 ChannelInboundHandler和 ChannelOutboundHandler 的基本实现。通过扩展抽象类 ChannelHandlerAdapter,它们获得了它们共同的超接口 ChannelHandler 的方法。

像下面这样:
在这里插入图片描述
ChannelHandlerAdapter 还提供了实用方法 isSharable()。如果其对应的实现被标注为 Sharable,那么这个方法将返回 true,表示它可以被添加到多个 ChannelPipeline中。

在 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 中所提供的方法体调用了其相关联的 ChannelHandlerContext 上的等效方法,从而将事件转发到了 ChannelPipeline 中的下一个ChannelHandler 中。

当我们要想在自己的 ChannelHandler 中使用这些适配器类时,只需要简单地扩展它们,并且重写那些你想要自定义的方法。

1.6 资源管理

每当通过调用 ChannelInboundHandler.channelRead()或者
ChannelOutboundHandler.write()方法来处理数据时,都需要确保没有任何的资源泄漏。

Netty 使用引用计数来处理池化的 ByteBuf。所以在完全使用完某个ByteBuf 后,调整其引用计数是很重要的。

Netty提供了class ResourceLeakDetector 帮助我们诊断潜在的(资源泄漏)问题。它将对你应用程序的缓冲区分配做大约 1%的采样来检测内存泄露。相关的开销是非常小的。

如果检测到了内存泄露,将会产生类似于下面的日志消息:

LEAK: ByteBuf.release() was not called before it's garbage-collected. Enable
advanced leak reporting to find out where the leak occurred. To enable
advanced leak reporting, specify the JVM option
'-Dio.netty.leakDetectionLevel=ADVANCED' or call
ResourceLeakDetector.setLevel().

Netty 目前定义了 4 种泄漏检测级别,像下面这样:

级 别描 述
DISABLED禁用泄漏检测。只有在详尽的测试之后才应设置为这个值
SIMPLE使用 1%的默认采样率检测并报告任何发现的泄露。这是默认级别,适合绝大部分的情况
ADVANCED使用默认的采样率,报告所发现的任何的泄露以及对应的消息被访问的位置
PARANOID类似于 ADVANCED,但是其将会对每次(对消息的)访问都进行采样。这对性能将会有很大的影响,应该只在调试阶段使用

内存泄露检测级别可以通过将下面的 Java 系统属性设置为表中的一个值来定义:

java -Dio.netty.leakDetectionLevel=ADVANCED

如果带着该 JVM 选项重新启动你的应用程序,你将看到自己的应用程序最近被泄漏的缓冲
区被访问的位置。下面是一个典型的由单元测试产生的泄漏报告:

Running io.netty.handler.codec.xml.XmlFrameDecoderTest
15:03:36.886 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK:
ByteBuf.release() was not called before it's garbage-collected.
Recent access records: 1
#1: io.netty.buffer.AdvancedLeakAwareByteBuf.toString(
AdvancedLeakAwareByteBuf.java:697)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml(
XmlFrameDecoderTest.java:157)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages(
XmlFrameDecoderTest.java:133)
...

那么实现 ChannelInboundHandler.channelRead()和 ChannelOutboundHandler.write()方法时,应该如何使用这个诊断工具来防止泄露呢?

1、消费入站消息的简单方式:由于消费入站数据是一项常规任务,所以 Netty 提供了一个特殊的被
称为 SimpleChannelInboundHandler 的 ChannelInboundHandler 实现。这个实现会在消息被 channelRead0()方法消费之后自动释放消息。

看下面代码:

@Sharable
public class DiscardInboundHandler extends ChannelInboundHandlerAdapter {
@Override
//通过调用 ReferenceCountUtil.release() ChannelInboundandlerAdapter方法释放资源
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
}
}

2、在出站方向这边,如果你处理了 write()操作并丢弃了一个消息,那么你也应该负责释放它。

看下面的代码:

@Sharable
public class DiscardOutboundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx,Object msg, ChannelPromise promise) {
//通过使用 ReferenceCountUtil.realse(...)方法释放资源
ReferenceCountUtil.release(msg);
//通知资源已经释放了(不仅要释放资源,还要通知 ChannelPromise。否则可能会出现 ChannelFutureListener 收不到某个消息已经被处理了的通知的情况。)
promise.setSuccess();
}
}

如果一个消息被消费或者丢弃了,且没有传递给 ChannelPipeline 中的下个ChannelOutboundHandler,那么用户就有责任调用 ReferenceCountUtil.release()。

如果消息到达了实际的传输层,那么当它被写入时或者 Channel 关闭时,都将被自动释放。

二、ChannelPipeline 接口

ChannelPipeline是一个拦截流经Channel的入站和出站事件的ChannelHandler 实例链。每一个新创建的
Channel 都将会被分配一个新的 ChannelPipeline。这项关联是永久性的;

Channel 既不能附加另外一个 ChannelPipeline,也不能分离其当前的。在 Netty 组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。

根据事件的起源,事件将会被 ChannelInboundHandler 或者 ChannelOutboundHandler处理。随后,通过调用 ChannelHandlerContext 实现,它将被转发给同一超类型的下一个ChannelHandler。

ChannelHandlerContext使得ChannelHandler能够和它的ChannelPipeline以及其他的ChannelHandler 交互。 ChannelHandler 可以通知其所属的 ChannelPipeline 中的下一 个ChannelHandler,甚至可以动态修改它所属的ChannelPipeline(这里指修改 ChannelPipeline 中的 ChannelHandler 的编排)。ChannelHandlerContext 具有丰富的用于处理事件和执行 I/O 操作的 API。

在 ChannelPipeline 传播事件时,它会测试 ChannelPipeline 中的下一个 ChannelHandler 的类型是否和事件的运动方向相匹配。如果不匹配,ChannelPipeline 将跳过该ChannelHandler 并前进到下一个,直到它找到和该事件所期望的方向相匹配的为止。(ChannelHandler 也可以同时实现 ChannelInboundHandler 接口和 ChannelOutboundHandler 接口。)

2.1 修改 ChannelPipeline

ChannelHandler 可以通过添加、删除或者替换其他的 ChannelHandler 来实时地修改ChannelPipeline
的布局。(它也可以将它自己从 ChannelPipeline 中移除。)

下面是一些ChannelHandler修改 ChannelPipeline 的方法:

名 称描 述
AddFirstaddBefore,addAfteraddLast将一个ChannelHandler 添加到ChannelPipeline 中
remove将一个ChannelHandler 从ChannelPipeline 中移除
replace将 ChannelPipeline 中的一个 ChannelHandler 替换为另一个 ChannelHandler

展示以下如何修改 ChannelPipeline:

ChannelPipeline pipeline = ..;
//创建一个FirstHandler 实例
FirstHandler firstHandler = new FirstHandler();
//将该实例作为handler1添加到ChannelPipeline 
pipeline.addLast("handler1", firstHandler);
//将一个 SecondHandler的实例作为"handler2"添加到 ChannelPipeline的第一个槽中。
//这意味着它将被放置在已有的"handler1"之前
pipeline.addFirst("handler2", new SecondHandler());
//同理,添加到了最后一个槽中
pipeline.addLast("handler3", new ThirdHandler());
...
//通过名称移除"handler3"
pipeline.remove("handler3");
//通过引用移除FirstHandler(它是唯一的,所以不需要它的名称)
pipeline.remove(firstHandler);
//将 SecondHandler("handler2")替换为 FourthHandler:"handler4"
pipeline.replace("handler2", "handler4", new ForthHandler());

ChannelHandler 的执行和阻塞
通常 ChannelPipeline 中的每一个 ChannelHandler 都是通过它的 EventLoop(I/O 线程)来处理传递给它的事件的。所以至关重要的是不要阻塞这个线程,因为这会对整体的 I/O 处理产生负面的影响。但有时可能需要与那些使用阻塞 API 的遗留代码进行交互。对于这种情况,ChannelPipeline 有一些接受一个 EventExecutorGroup 的 add()方法。如果一个事件被传递给一个自定义的 EventExecutorGroup,它将被包含在这个 EventExecutorGroup 中的某个 EventExecutor 所处理,从而被从该Channel 本身的 EventLoop 中移除。对于这种用例,Netty 提供了一个叫 DefaultEventExecutorGroup 的默认实现

ChannelPipeline 的用于访问 ChannelHandler 的操作:

名 称描 述
get通过类型或者名称返回 ChannelHandler
context返回和 ChannelHandler 绑定的 ChannelHandlerContext
names返回 ChannelPipeline 中所有 ChannelHandler 的名称

2.2 触发事件

首先是入站:

ChannelPipeline 用于通知 ChannelInboundHandler 在 ChannelPipeline 中所发生的事件的入站操作:

名 称描 述
fireChannelRegistered调用 ChannelPipeline 中下一个 ChannelInboundHandler 的 channelRegistered(ChannelHandlerContext)方法
fireChannelUnregistered调用 ChannelPipeline 中下一个 ChannelInboundHandler 的channelUnregistered(ChannelHandlerContext)方法
fireChannelActive调用 ChannelPipeline 中下一个 ChannelInboundHandler 的channelActive(ChannelHandlerContext)方法
fireChannelInactive调用 ChannelPipeline 中下一个 ChannelInboundHandler 的channelInactive(ChannelHandlerContext)方法
fireExceptionCaught调用 ChannelPipeline 中下一个 ChannelInboundHandler 的exceptionCaught(ChannelHandlerContext, Throwable)方法
fireUserEventTriggered调用 ChannelPipeline 中下一个 ChannelInboundHandler 的userEventTriggered(ChannelHandlerContext, Object)方法
fireChannelRead调用 ChannelPipeline 中下一个 ChannelInboundHandler 的channelRead(ChannelHandlerContext, Object msg)方法
fireChannelReadComplete调用 ChannelPipeline 中下一个 ChannelInboundHandler 的channelReadComplete(ChannelHandlerContext)方法
fireChannelWritabilityChanged调用 ChannelPipeline 中下一个 ChannelInboundHandler 的channelWritabilityChanged(ChannelHandlerContext)方法

接着我们看出站:

在出站这边,处理事件将会导致底层的套接字上发生一系列的动作。

ChannelPipeline 的出站操作:

名 称描 述
bind将 Channel 绑定到一个本地地址,这将调用 ChannelPipeline 中的下一个ChannelOutboundHandler 的 bind(ChannelHandlerContext, SocketAddress, ChannelPromise)方法
connect将 Channel 连接到一个远程地址,这将调用 ChannelPipeline 中的下一个ChannelOutboundHandler 的 connect(ChannelHandlerContext, SocketAddress, ChannelPromise)方法
disconnect将Channel 断开连接。这将调用ChannelPipeline 中的下一个ChannelOutboundHandler 的 disconnect(ChannelHandlerContext, Channel Promise)方法
close将 Channel 关闭。这将调用 ChannelPipeline 中的下一个 ChannelOutboundHandler 的 close(ChannelHandlerContext, ChannelPromise)方法
deregister将 Channel 从它先前所分配的 EventExecutor(即 EventLoop)中注销。这将调用 ChannelPipeline 中的下一个 ChannelOutboundHandler 的 deregister(ChannelHandlerContext, ChannelPromise)方法
flush冲刷Channel所有挂起的写入。这将调用ChannelPipeline中的下一个ChannelOutboundHandler 的 flush(ChannelHandlerContext)方法
write将消息写入 Channel。这将调用 ChannelPipeline 中的下一个 ChannelOutboundHandler的write(ChannelHandlerContext, Object msg, ChannelPromise)方法。注意:这并不会将消息写入底层的 Socket,而只会将它放入队列中。要将它写入 Socket,需要调用 flush()或者 writeAndFlush()方法
writeAndFlush这是一个先调用 write()方法再接着调用 flush()方法的便利方法
read请求从 Channel 中读取更多的数据。这将调用 ChannelPipeline 中的下一个ChannelOutboundHandler 的 read(ChannelHandlerContext)方法

总结一下:

  • ChannelPipeline 保存了与 Channel 相关联的 ChannelHandler;
  • ChannelPipeline 可以根据需要,通过添加或者删除 ChannelHandler 来动态地修改;
  • ChannelPipeline 有着丰富的 API 用以被调用,以响应入站和出站事件。

三、ChannelHandlerContext 接口

ChannelHandlerContext介绍:

  • ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之间的关联。
  • 每当有 ChannelHandler 添加到 ChannelPipeline 中时,都会创建 ChannelHandlerContext。
  • ChannelHandlerContext 的主要功能是管理它所关联的 ChannelHandler 和在同一个 ChannelPipeline 中的其他 ChannelHandler 之间的交互。

ChannelHandlerContext 有很多的方法,其中一些方法也存在于 Channel 和 ChannelPipeline 本身上,但是有一点重要的不同。

1、如果调用 Channel 或者 ChannelPipeline 上的这些方法,它们将沿着整个 ChannelPipeline 进行传播。

2、而调用位于 ChannelHandlerContext上的相同方法,则将从当前所关联的 ChannelHandler 开始,并且只会传播给位于该ChannelPipeline 中的下一个能够处理该事件的 ChannelHandler。

ChannelHandlerContext 的 API汇总:

名 称描 述
alloc返回和这个实例相关联的Channel 所配置的 ByteBufAllocator
bind绑定到给定的 SocketAddress,并返回 ChannelFuture
channel返回绑定到这个实例的 Channel
close关闭 Channel,并返回 ChannelFuture
connect连接给定的 SocketAddress,并返回 ChannelFuture
deregister从之前分配的 EventExecutor 注销,并返回 ChannelFuture
disconnect从远程节点断开,并返回 ChannelFuture
executor返回调度事件的 EventExecutor
fireChannelActive触发对下一个 ChannelInboundHandler 上的channelActive()方法(已连接)的调用
fireChannelInactive触发对下一个 ChannelInboundHandler 上的channelInactive()方法(已关闭)的调用
fireChannelRead触发对下一个 ChannelInboundHandler 上的channelRead()方法(已接收的消息)的调用
fireChannelReadComplete触发对下一个ChannelInboundHandler上的channelReadComplete()方法的调用
fireChannelRegistered触发对下一个 ChannelInboundHandler 上的fireChannelRegistered()方法的调用
fireChannelUnregistered触发对下一个 ChannelInboundHandler 上的fireChannelUnregistered()方法的调用
fireChannelWritabilityChanged触发对下一个 ChannelInboundHandler 上的fireChannelWritabilityChanged()方法的调用
fireExceptionCaught触发对下一个 ChannelInboundHandler 上的fireExceptionCaught(Throwable)方法的调用
fireUserEventTriggered触发对下一个 ChannelInboundHandler 上的fireUserEventTriggered(Object evt)方法的调用
handler返回绑定到这个实例的 ChannelHandlerisRemoved 如果所关联的 ChannelHandler 已经被从 ChannelPipeline中移除则返回 true
name返回这个实例的唯一名称
pipeline返回这个实例所关联的 ChannelPipeline
read将数据从Channel读取到第一个入站缓冲区;如果读取成功则触发一个channelRead事件,并(在最后一个消息被读取完成后通知ChannelInboundHandler 的 channelReadComplete(ChannelHandlerContext)方法
write通过这个实例写入消息并经过 ChannelPipeline
writeAndFlush通过这个实例写入并冲刷消息并经过 ChannelPipeline

当使用 ChannelHandlerContext 的 API 的时候,请牢记以下两点:

  • ChannelHandlerContext 和 ChannelHandler 之间的关联(绑定)是永远不会改变的,所以缓存对它的引用是安全的
  • 相对于其他类的同名方法,ChannelHandler Context的方法将产生更短的事件流,应该尽可能地利用这个特性来获得最大的性能。

3.1 使用 ChannelHandlerContext

ChannelHandlerContext、Channel 和 ChannelPipeline的关系如图:
在这里插入图片描述

从上面我们可以知道ChannelPipeline 包含着ChannelHandlerContext、Channel。ChannelHandler之间是用ChannelHandlerContext连接。

那么从 ChannelHandlerContext 如何访问 Channel?

看下面代码:

//先获取获取到与 ChannelHandlerContext相关联的 Channel 的引用
ChannelHandlerContext ctx = ..;
Channel channel = ctx.channel();
//通过 Channel 写入"Netty in Action" 到缓冲区
channel.write(Unpooled.copiedBuffer("Netty in Action",CharsetUtil.UTF_8));

那么从 ChannelHandlerContext 如何访问 ChannelPipeline?

看下面代码:

//先获取获取到与 ChannelHandlerContext相关联的 ChannelPipeline 的引用
ChannelHandlerContext ctx = ..;
ChannelPipeline pipeline = ctx.pipeline();
//通过 ChannelPipeline 写入"Netty in Action" 到缓冲区
pipeline.write(Unpooled.copiedBuffer("Netty in Action",CharsetUtil.UTF_8));

可以看到上面两段代码的事件流是一样的,需要注意的是虽然被调用的 Channel 或 ChannelPipeline 上的 write()方法一直传播事件通过整个 ChannelPipeline,但是在 ChannelHandler 的级别上,事件从一个ChannelHandler到下一个 ChannelHandler 的移动是由 ChannelHandlerContext 上的调用完成的。

通过 Channel 或者 ChannelPipeline 进行的事件传播示例:

在这里插入图片描述
Channel 或者 ChannelPipeline事件的传播都是通过ChannelHandlerContext来进行,要想调用从某个特定的 ChannelHandler 开始的处理过程,必须获取到在(ChannelPipeline)该 ChannelHandler 之前的 ChannelHandler 所关联的 ChannelHandlerContext。这个 ChannelHandlerContext 将调用和它所关联的 ChannelHandler 之后的ChannelHandler。

例如我们调用 ChannelHandlerContext 的 write()方法。

代码很简单:

//获取ChannelHandlerContext 
ChannelHandlerContext ctx = ..;
//write()方法将把缓冲区数据发送到下一个 ChannelHandler
ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));

那么它内部是咋操作的呢?
在这里插入图片描述
如图消息将从下一个 ChannelHandler 开始流经 ChannelPipeline,绕过了所有前面的 ChannelHandler。

3.2 ChannelHandler 和 ChannelHandlerContext 的高级用法

前面我们说了基本的通过调用 ChannelHandlerContext上的pipeline()方法来获得被封闭ChannelPipeline 的引用,在运行时得以操作ChannelPipeline 的ChannelHandler。

除了这些我们还可以通过将 ChannelHandler 添加到 ChannelPipeline 中来实现动态的协议切换或者将引用缓存到ChannelHandlerContext以供稍后使用。

1、将引用缓存到 ChannelHandlerContext

public class WriteHandler extends ChannelHandlerAdapter {
 private ChannelHandlerContext ctx;
 @Override
 public void handlerAdded(ChannelHandlerContext ctx) {
 //存储到 ChannelHandlerContext的引用以供稍后使用
 this.ctx = ctx;
 }
 public void send(String msg) {
 //使用之前存储的到 ChannelHandlerContext的引用来发送消息
 ctx.writeAndFlush(msg);
 }
} 

因为一个 ChannelHandler 可以从属于多个 ChannelPipeline,所以它也可以绑定到多个ChannelHandlerContext 实例。

在多个 ChannelPipeline 中共享同一个 ChannelHandler,对应的 ChannelHandler 必须要使用@Sharable 注解标注;否则,试图将它添加到多个 ChannelPipeline 时将会触发异常(多个Channel使用则必须保证线程安全)。

看个可共享ChannelHandler的例子:

@Sharable
public class SharableHandler extends ChannelInboundHandlerAdapter {
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) {
 System.out.println("Channel read message: " + msg);
 //记录方法,发送给下一个ChannelHandler的channelRead()
 ctx.fireChannelRead(msg);
 }
} 

注意:拥有状态时@Sharable注解将不能保证线程安全!只应该在确定了你的 ChannelHandler 是线程安全的时才使用@Sharable 注解。

像下面就是一个错误的示例:

@Sharable
public class UnsharableHandler extends ChannelInboundHandlerAdapter {
 private int count;
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) {
 //这里并不能保证状态的写入是安全的
 count++;
 System.out.println("channelRead(...) called the "+ count + " time");
 ctx.fireChannelRead(msg);
 }
}

我们共享ChannelHandler的一个常见的原因是用于收集跨越多个 Channel 的统计信息。

四、异常处理

4.1 处理入站异常

如果在处理入站事件的过程中有异常被抛出,那么它将从它在 ChannelInboundHandler里被触发的那一点开始流经 ChannelPipeline。要想处理这种类型的入站异常,你需要在你的 ChannelInboundHandler 实现中重写下面的方法。

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception

一个简单的处理例子:

public class InboundExceptionHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

因为异常将会继续按照入站方向流动(就像所有的入站事件一样),所以实现了前面所示逻辑的 ChannelInboundHandler 通常位于 ChannelPipeline 的最后。这确保了所有的入站异常都总是会被处理,无论它们可能会发生在 ChannelPipeline 中的什么位置。

总结一下:

  • ChannelHandler.exceptionCaught()的默认实现是简单地将当前异常转发给ChannelPipeline 中的下一个 ChannelHandler;
  • 如果异常到达了 ChannelPipeline 的尾端,它将会被记录为未被处理;
  • 要想定义自定义的处理逻辑,你需要重写 exceptionCaught()方法。然后你需要决定是否需要将该异常传播出去

4.2 处理出站异常

用于处理出站操作中的正常完成以及异常的选项,都基于以下的通知机制。

  • 每个出站操作都将返回一个 ChannelFuture。注册到 ChannelFuture 的 ChannelFutureListener 将在操作完成时被通知该操作是成功了还是出错了。
  • 几乎所有的 ChannelOutboundHandler 上的方法都会传入一个 ChannelPromise的实例。

作为 ChannelFuture 的子类,ChannelPromise 也可以被分配用于异步通知的监听器。但是,ChannelPromise 还具有提供立即通知的可写方法:

ChannelPromise setSuccess();
ChannelPromise setFailure(Throwable cause);

添加 ChannelFutureListener 只需要调用 ChannelFuture 实例上的 addListener(ChannelFutureListener)方法,并且有两种不同的方式可以做到这一点:

1、调用出站操作(如 write()方法)所返回的 ChannelFuture 上的 addListener()方法。

ChannelFuture future = channel.write(someMessage);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) {
if (!f.isSuccess()) {
f.cause().printStackTrace();
f.channel().close();
}
}
})

2、将 ChannelFutureListener 添加到即将作为参数传递给 ChannelOutboundHandler 的方法的ChannelPromise。

public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg,ChannelPromise promise) {
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) {
if (!f.isSuccess()) {
f.cause().printStackTrace();
f.channel().close();
}
}
});
}
}

ChannelPromise 的可写方法
通过调用 ChannelPromise 上的 setSuccess()和 setFailure()方法,可以使一个操作的状态在ChannelHandler 的方法返回给其调用者时便即刻被感知到

如果你的 ChannelOutboundHandler 本身抛出了异常会发生什么呢?

这种情况下Netty 本身会通知任何已经注册到对应 ChannelPromise 的监听器

PS:在调用出站操作时添加 ChannelFutureListener 更合适。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/555252.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【机器学习】采样方法

文章目录 采样方法11.1 简介11.2 常见采样方法11.2.1 均匀分布采样11.2.2 逆变换采样11.2.3 拒绝采样11.2.4 重要采样11.2.5 Metropolis方法11.2.6 Metropolis-Hasting 算法11.2.7 吉布斯采样 采样方法 11.1 简介 什么是采样 从一个分布中生成一批服从该分布的样本&#xff0c…

Linux知识点 -- 进程概念(一)

Linux知识点 – 进程概念&#xff08;一&#xff09; 文章目录 Linux知识点 -- 进程概念&#xff08;一&#xff09;一、冯诺伊曼体系结构二、操作系统&#xff08;OS&#xff09;1.概念2.设计OS的目的3.如何理解管理4.系统调用和库函数的概念 三、进程概念1.理解进程2.描述进程…

Windows源码安装INDEMIND双目惯性模组

最近电赛准备在Win10的平台上做一个增强现实眼镜&#xff0c;所以IMU相机也是必不可少的传感器&#xff0c;记录Windows源码安装INDEMIND双目惯性模组。 文章目录 实验环境一、配置准备1、SDK下载及准备安装2、安装CMake并添加环境变量3、设置".sln"文件的默认打开方…

区块链服务网络发展联盟最新成员单位名单公布

原标题&#xff1a;《BSN发展联盟第六批入盟成员单位及全体联盟成员名单公示》 为了更好地推动BSN的发展&#xff0c;国家信息中心、中国移动通信集团有限公司、中国银联股份有限公司、北京红枣科技有限公司共同发起成立了区块链服务网络发展联盟&#xff08;Blockchain-based…

六、AOP(2)

一、AOP操作&#xff08;AspectJ注解&#xff09;重点 1.创建类&#xff0c;在类里面定义方法com.zhilei.spring5.aopanno public class User {public void add(){System.out.println("add...");} }2.创建增强类&#xff0c;编写增强逻辑 在增强类里面&#xff0c…

基于LC3模拟器的简单游戏设计:简易四子棋

一、实验目的 分析和理解指定的需解决问题。利用LC-3的汇编代码设计实现相关程序。通过LC-3仿真器调试和运行相关程序并得到正确的结果。 二、实验内容 四子棋是一款普遍流行的简易型桌面游戏&#xff0c;据说&#xff0c;虎克船长曾因专注于此游戏而长期隐身在住所&#xf…

如何绘制算法流程图?常见渠道一览

算法流程图是一种用于描述算法执行流程的图形化工具&#xff0c;它通常将算法的执行过程分解成若干个步骤&#xff0c;并通过线条连接这些步骤&#xff0c;形成一个完整的流程图。在计算机科学和信息技术的发展过程中&#xff0c;算法流程图已经成为了一种非常有用的工具&#…

ThreadLocal为什么容易内存泄露?

文章目录 一、Java的四种引用二、ThreadLocal为什么容易内存泄露&#xff1f;三、源码 一、Java的四种引用 1、强引用&#xff1a;强引用在程序内存不足&#xff08;OOM&#xff09;的时候也不会被回收 2、软引用&#xff1a;软引用在程序内存不足时&#xff0c;会被回收 3、弱…

前端axios fetch 解决接口请求响应数据返回快慢不均导致的数据错误问题

引言 搜索功能&#xff0c;我想很多业务都会涉及&#xff0c;这个功能的特点是&#xff1a; 用户可以在输入框中输入一个关键字&#xff0c;然后在一个列表中显示该关键字对应的数据&#xff1b;输入框是可以随时修改/删除全部或部分关键字的&#xff1b;如果是实时搜索&…

Go学习圣经:0基础精通GO开发与高并发架构(1)

GO 学习圣经&#xff1a;底层原理和实操 说在前面&#xff1a; 现在拿到offer超级难&#xff0c;甚至连面试电话&#xff0c;一个都搞不到。 尼恩的技术社群中&#xff08;50&#xff09;&#xff0c;很多小伙伴凭借 “左手云原生右手大数据”的绝活&#xff0c;拿到了offer…

高并发线程内存事件处理器 disruptor 三 高性能原理

一 disruptor为什么快的核心原理 属性填充&#xff1a;通过添加额外的无用信息&#xff0c;避免伪共享问题 什么是共享内存 在系统内存中&#xff0c;我们的数据存在于cpu缓存中&#xff0c;cpu缓存的基础缓存单位为 cache line&#xff0c;通常cache line的大小为64字节&…

什么是测试策略?怎么制定测试策略?测试管理篇

之前说了太多的测试技术和测试用例设计方法&#xff0c;猛地发现有点“偏科“了。今天我们放松一些&#xff0c;泡一杯茶&#xff0c;一起来聊一聊测试策略吧。 当然&#xff0c;文章脉络肯定是咱们老三样&#xff1a;什么是测试策略&#xff0c;为什么要制定测试策略&#xff…

5个最好的WooCommerce商城自动化动作来增加销售量

您是否正在寻找简单智能的方法来自动执行任务并增加 WooCommerce 商店的销售额&#xff1f; 通过在线商店中的自动化任务&#xff0c;您可以在发展业务和增加销售额的同时节省时间和金钱。 在本文中&#xff0c;我们将向您展示如何使用 WooCommerce商城自动化来增加销售额。 …

Puppeteer入门实践

环境 1、安装nodejs 官网&#xff1a;https://nodejs.org/zh-cn 下载安装好nodejs只后 验证&#xff1a;node -v 出现版本号表示安装成功&#xff0c;否则需要配置环境变量 2、创建node项目并初始化 随便新建一个文件夹 进入文件夹搜索cmd回车 执行npm init -y 安装依赖 …

RabbitMq--- 惰性队列

前言 消息堆积是Mq消费时常见的问题&#xff0c;这里我们展开说一下消息堆积的原因&#xff0c;以及RabbitMq 中是如何解决这个问题的。 1. 消息堆积问题 当生产者发送消息时的速度超过了消费者处理消息的速度&#xff0c;就会导致队列中的消息堆积&#xff0c;直到队列存储…

【Linux】shell脚本—正则表达式

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 一、正则表达式概述二、基本的正则表达式三、操作演示 一、正则表达式概述 正则表达式是对字符串操作的一种逻辑公式&#xff0c;就是用事先定义好的一些特定字符、…

WIN11+CLion+CMake+MINGW+OPENCV编译需注意问题

安装编译教程可参考以下链接&#xff1a; CLion OpenCV cmake&#xff0c;源码编译及使用_clion编译opencv_拜阳的博客-CSDN博客 使用CLion开发openCV——环境搭建全记录_-Willing-的博客-CSDN博客 此文主要是记录自己在编译过程中遇到的问题和解决方法 1、版本问题 C…

Windows 10 主机上的 VMware Workstation 和设备/凭据防护不兼容“错误

Windows 10 主机上的 VMware Workstation 和设备/凭据防护不兼容“错误 VMware Workstation 和 Device/Credential Guard 不兼容。VMware 工作站可以在禁用设备/凭据防护后运行。 排查错误的过程&#xff1a; 要解决错误&#xff0c;请按照以下步骤操作&#xff1a; 如果您…

LDR6020 【USB_C显示器方案】台式显示器方案介绍

首先介绍一下什么是Type-c接口&#xff1f; 现在显示器上常见的有这几种接口&#xff1a;HDMI、DP、USB-A、USB-C&#xff08;USB Type-c接口&#xff09;、3.5mm和电源接口&#xff0c;像之前流行的VGA接口和DVI接口&#xff0c;基本上在新显示器上&#xff0c;尤其是中高端显…

什么是频谱型温振变送器(附常见振动故障特征)

在机械振动方面&#xff0c;振动分析是一项十分重要的技术。这项技术是预测维修程序中的基础&#xff0c;是机器状态的指示器&#xff0c;为了避免机械设备异常振动所带来的损失&#xff0c;对工业机械设备做振动分析是非常有必要的&#xff01; 频谱型温振变送器是一款选用了M…