文章目录
- 一、入门案例
- 二、Bootstrap、ServerBootstrap
- 三、Future 、ChannelFuture
- 四、Channel
- 五、Selector
- 六、ChannelHandler 及其实现类
- 七、Pipeline 和 ChannelPipeline
- 八、ChannelHandlerContext
- 九、ChannelOption
- 十、EventLoopGroup 和其实现类
- 十一、Unpooled类与ByteBuf
- 参考资料
一、入门案例
入门案例请移步
二、Bootstrap、ServerBootstrap
Bootstrap 的意思就是引导类,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类。
常见的方法有:
// 该方法用于服务器端,用来设置两个 EventLoop
public ServerBootstrap group(EventLoopGroup parentGroup,EventLoopGroup childGroup)
// 该方法用于客户端,用来设置一个 EventLoop
public B group(EventLoopGroup group)
// 该方法用来设置一个服务器端的通道实现
public B channel(Class<? extends C> channelClass)
// 用来给 ServerChannel 添加配置
public <T> B option(ChannelOption<I> option,T value)
// 用来给接收到的通道添加配置
public <T> ServerBootstrap childOption(ChannelOption<T> childOption,T value)
// 该方法用来设置业务处理类 (自定义的handler)
public ServerBootstrap childHandler(ChannelHandler childHandler)
// 该方法用于服务器端,用来设置占用的端口号
public ChannelFuture bind(int inetPort)
// 该方法用于客户端,用来连接服务器端
public ChannelFuture connect(String inetHost,int inetPort)
三、Future 、ChannelFuture
Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future 和 ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件。
常见的方法有:
// 返回当前正在进行 IO 操作的通道
Channel channel()
// 等待异步操作执行完毕
ChannelFuture sync()
// 添加监听器
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
四、Channel
Channel是一个接口,它继承了AttributeMap, ChannelOutboundInvoker, Comparable三个类。Comparable表示这个类可以用来做比较。AttributeMap用来存储Channel的各种属性。ChannelOutboundInvoker主要负责Channel和外部 SocketAddress 进行连接和对写。
-
Channel是Netty 网络通信的组件,能够用于执行网络 IO 操作。
-
通过 Channel 可获得当前网络连接的通道的状态。
-
通过 Channel 可获得 网络连接的配置参数(例如接收缓冲区大小)。
-
Channel 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 IO 调用都将立即回,并且不保证在调用结束时所请求的 I/O 操作已完成
-
调用立即返回一个 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,可以 IO 操作成功、失败或取消时回调通知调用方
-
支持关联 I/O 操作与对应的处理程序
-
不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应,常用的 Channel 类型:
-
NioSocketChannel,异步的客户端 TCP Socket 连接。
-
NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
-
NioDatagramChannel,异步的 UDP 连接。
-
NioSctpChannel,异步的客户端 Sctp 连接
-
NioSctoServerChannel,异步的 Sctp 服器端接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件IO。
一旦有客户端成功与服务端建立连接,将新建一个Channel与该客户端进行绑定
Channel从线程组NioEventloopGroup中获取一个NioEventloop,并注册到该NioEventloop,后续该Channel的生命周期内都与该NioEventloop绑定在一起
Channel同客户端进行网络连接、关闭和读写,生成对应的even事件,由Selector轮询到后,交给Worker线程组中的调度线程去执行
在不同的生命周期阶段,Channel会有不同的状态,并且能够在不同的状态之间进行流转和切换。
Channel的状态有四种:
ChannelUnregistered:已创建但还未被注册到监听器中
ChannelRegistered :已注册到监听器EventLoop中
ChannelActive :连接完成处于活跃状态,此时可以接收和发送数据
ChannelInactive :非活跃状态,代表连接未建立或者已断开
五、Selector
Netty 基于 Selector 对象实现 IO 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件。当向一个 Selector 中注册 Channel 后,Selector 内部的机制就可以自动不断地查询(Select) 这些注册的Channel 是否有已就绪的 IO 事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel。
六、ChannelHandler 及其实现类
ChannelHandler 是一个接口,处理 IO 事件或拦截 IO 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。
ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类。
ChannelHandler 及其实现类一览图:
ChannelInboundHandler用于处理入站IO事件;ChannelOutboundHandler用于处理出站IO操作。
ChannelInboundHandlerAdapter用于处理入站IO事件;ChannelOutboundHandlerAdapter用于处理出站IO操作;ChannelDuplexHandler用于处理入站和出站事件(少用)。
我们经常需要自定义一个 Handler 类去继承 ChannelInboundHandlerAdapter,然后通过重写相应方法实现业务逻辑,我们接下来看看一般都需要重写哪些方法:
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
// channel注册成功事件
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
// channel未注册成功事件
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
// channel就绪事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
// channel读取数据事件
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
// channel读取完成事件
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
// 发生异常事件
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.fireExceptionCaught(cause);
}
}
七、Pipeline 和 ChannelPipeline
ChannelPipeline 是一个重点:
(1)ChannelPipeline 是一个 Handler 的集合,它负责处理和拦截 inbound 或者 outbound 的事件和操作,相当于个贯穿 Netty 的链。(也可以这样理解: ChannelPipeline 是 保存 ChannelHandler 的 List,用于处理或拦截Channel 的入站事件和出站操作)
(2)ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel中各个的 ChannelHandler 如何相互交互
(3)在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应,它们的组成关系如下:
一个Channel包含了一个ChannelPipeline,而ChannelPipeline 中又维护了一个由ChannelHandlerContext组成的双向链表,并且每个ChannelHandlerContext 中又关联着一个 ChannelHandler。
入站事件和出站事件在一个双向链表中,入站事件会从链表head往后传递到最后一个入站的 handler,出站事件会从链表tail往前传递到最前一个出站的handler,两种类型的handler互不干扰。
入站操作表示server端向client端发送数据,出站操作表示client端向server端发送数据。
以下实例我们使用ChannelPipeline添加了两个Handler:
// server端添加两个Handler
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//向管道加入处理器
//得到管道
ChannelPipeline pipeline = ch.pipeline();
//加入一个netty 提供的httpServerCodec codec =>[coder - decoder]
//HttpServerCodec 说明
//1. HttpServerCodec 是netty 提供的处理http的 编-解码器
pipeline.addLast("MyHttpServerCodec",new HttpServerCodec());
//2. 增加一个自定义的handler
pipeline.addLast("MyTestHttpServerHandler", new TestHttpServerHandler());
System.out.println("ok~~~~");
}
});
// 读取事件
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
//channelRead0 读取客户端数据
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
我们可以看出,Pipeline其实是一个双向链表。
ChannelPipeline常用方法:
// 把一个业务处理类(handler) 添加到链中的第一个位置
ChannelPipeline addFirst(ChannelHandler...handlers)
// 把一个业务处理类(handler) 添加到链中的最后一个位置
ChannelPipeline addLast(ChannelHandler...handlers)
八、ChannelHandlerContext
保存了Channel相关的所有上下文信息,同时关联一个ChannelHandler对象。
ChannelHandlerContext中包含一个具体的事件处理器ChannelHandler,同时ChannelHandlerContext中也绑定了对应的pipeline和Channel的信息,方便对ChannelHandler进行调用:
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
System.out.println("对应的channel=" + ctx.channel() + " pipeline=" + ctx
.pipeline() + " 通过pipeline获取channel" + ctx.pipeline().channel());
System.out.println("当前ctx的handler=" + ctx.handler());
我们可以看出ChannelHandlerContext也是一个双向链表,包含着所有的handler,关联着pipeline、channel、handler等信息。
常用方法:
// 关闭通道
ChannelFuture close()
// 数据写入
ChannelFuture write(Object msg);
// 数据刷新
ChannelHandlerContext flush();
// 将数据写到ChannelPipeline 中当前ChannelHandler 的下一个 ChannelHandler 开始处理(出站)
ChannelFuture writeAndFlush(Obiect msg)
// 获取channel
Channel channel();
// 获取pipeline
ChannelPipeline pipeline();
九、ChannelOption
Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数进行相关配置。
ChannelOption 参数如下:
public static final ChannelOption<ByteBufAllocator> ALLOCATOR = valueOf("ALLOCATOR");
Netty适用对象池,重用缓冲区
public static final ChannelOption<RecvByteBufAllocator> RCVBUF_ALLOCATOR = valueOf("RCVBUF_ALLOCATOR");
Netty参数,用于Channel分配接受Buffer的分配器,默认值为AdaptiveRecvByteBufAllocator.DEFAULT,是一个自适应的接受缓冲区分配器,能根据接受到的数据自动调节大小。可选值为FixedRecvByteBufAllocator,固定大小的接受缓冲区分配器。
public static final ChannelOption<MessageSizeEstimator> MESSAGE_SIZE_ESTIMATOR = valueOf("MESSAGE_SIZE_ESTIMATOR");
Netty参数,消息大小估算器,默认为DefaultMessageSizeEstimator.DEFAULT。估算ByteBuf、ByteBufHolder和FileRegion的大小,其中ByteBuf和ByteBufHolder为实际大小,FileRegion估算值为0。该值估算的字节数在计算水位时使用,FileRegion为0可知FileRegion不影响高低水位。
public static final ChannelOption<Integer> CONNECT_TIMEOUT_MILLIS = valueOf("CONNECT_TIMEOUT_MILLIS");
CONNECT_TIMEOUT_MILLIS Netty参数,连接超时毫秒数,默认值30000毫秒即30秒。
public static final ChannelOption<Integer> MAX_MESSAGES_PER_READ = valueOf("MAX_MESSAGES_PER_READ");
Netty参数,一次Loop读取的最大消息数,对于ServerChannel或者NioByteChannel,默认值为16,其他Channel默认值为1。默认值这样设置,是因为:ServerChannel需要接受足够多的连接,保证大吞吐量,NioByteChannel可以减少不必要的系统调用select。
public static final ChannelOption<Integer> WRITE_SPIN_COUNT = valueOf("WRITE_SPIN_COUNT");
Netty参数,一个Loop写操作执行的最大次数,默认值为16。也就是说,对于大数据量的写操作至多进行16次,如果16次仍没有全部写完数据,此时会提交一个新的写任务给EventLoop,任务将在下次调度继续执行。这样,其他的写请求才能被响应不会因为单个大数据量写请求而耽误。
public static final ChannelOption<Integer> WRITE_BUFFER_HIGH_WATER_MARK = valueOf("WRITE_BUFFER_HIGH_WATER_MARK");
Netty参数,写高水位标记,默认值64KB。如果Netty的写缓冲区中的字节超过该值,Channel的isWritable()返回False。
public static final ChannelOption<Integer> WRITE_BUFFER_LOW_WATER_MARK=valueOf("WRITE_BUFFER_LOW_WATER_MARK");
Netty参数,写低水位标记,默认值32KB。当Netty的写缓冲区中的字节超过高水位之后若下降到低水位,则Channel的isWritable()返回True。写高低水位标记使用户可以控制写入数据速度,从而实现流量控制。推荐做法是:每次调用channl.write(msg)方法首先调用channel.isWritable()判断是否可写。
public static final ChannelOption<Boolean> AUTO_READ = valueOf("AUTO_READ");
Netty参数,一个连接的远端关闭时本地端是否关闭,默认值为False。值为False时,连接自动关闭;为True时,触发ChannelInboundHandler的userEventTriggered()方法,事件为ChannelInputShutdownEvent。
public static final ChannelOption<Boolean> SO_BROADCAST = valueOf("SO_BROADCAST");
Socket参数,设置广播模式。
public static final ChannelOption<Boolean> SO_KEEPALIVE = valueOf("SO_KEEPALIVE");
Socket参数,连接保活,默认值为False。启用该功能时,TCP会主动探测空闲连接的有效性。可以将此功能视为TCP的心跳机制,需要注意的是:默认的心跳间隔是7200s即2小时。Netty默认关闭该功能。
public static final ChannelOption<Integer> SO_SNDBUF = valueOf("SO_SNDBUF");
public static final ChannelOption<Integer> SO_RCVBUF = valueOf("SO_RCVBUF");
ChannelOption.SO_SNDBUF参数对应于套接字选项中的SO_SNDBUF,ChannelOption.SO_RCVBUF参数对应于套接字选项中的SO_RCVBUF这两个参数用于操作接收缓冲区和发送缓冲区
public static final ChannelOption<Boolean> SO_REUSEADDR = valueOf("SO_REUSEADDR");
ChanneOption.SO_REUSEADDR对应于套接字选项中的SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口,
比如,某个服务器进程占用了TCP的80端口进行监听,此时再次监听该端口就会返回错误,使用该参数就可以解决问题,该参数允许共用该端口,这个在服务器程序中比较常使用,
比如某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设置SO_REUSEADDR
public static final ChannelOption<Integer> SO_LINGER = valueOf("SO_LINGER");
Netty对底层Socket参数的简单封装,关闭Socket的延迟时间,默认值为-1,表示禁用该功能。-1以及所有<0的数表示socket.close()方法立即返回,但OS底层会将发送缓冲区全部发送到对端。0表示socket.close()方法立即返回,OS放弃发送缓冲区的数据直接向对端发送RST包,对端收到复位错误。非0整数值表示调用socket.close()方法的线程被阻塞直到延迟时间到或发送缓冲区中的数据发送完毕,若超时,则对端会收到复位错误。
public static final ChannelOption<Integer> SO_BACKLOG = valueOf("SO_BACKLOG");
Socket参数,服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝。默认值,Windows为200,其他为128。
public static final ChannelOption<Integer> SO_TIMEOUT = valueOf("SO_TIMEOUT");
这个参数设定的是HTTP连接成功后,等待读取数据或者写数据的最大超时时间,单位为毫秒
如果设置为0,则表示永远不会超时
public static final ChannelOption<Integer> IP_TOS = valueOf("IP_TOS");
IP参数,设置IP头部的Type-of-Service字段,用于描述IP包的优先级和QoS选项。
public static final ChannelOption<InetAddress> IP_MULTICAST_ADDR = valueOf("IP_MULTICAST_ADDR");
对应IP参数IP_MULTICAST_IF,设置对应地址的网卡为多播模式。
public static final ChannelOption<NetworkInterface> IP_MULTICAST_IF = valueOf("IP_MULTICAST_IF");
对应IP参数IP_MULTICAST_IF2,同上但支持IPV6。
public static final ChannelOption<Integer> IP_MULTICAST_TTL = valueOf("IP_MULTICAST_TTL");
IP参数,多播数据报的time-to-live即存活跳数。
public static final ChannelOption<Boolean> IP_MULTICAST_LOOP_DISABLED = valueOf("IP_MULTICAST_LOOP_DISABLED");
对应IP参数IP_MULTICAST_LOOP,设置本地回环接口的多播功能。由于IP_MULTICAST_LOOP返回True表示关闭,所以Netty加上后缀_DISABLED防止歧义。
public static final ChannelOption<Boolean> TCP_NODELAY = valueOf("TCP_NODELAY");
TCP参数,立即发送数据,默认值为Ture(Netty默认为True而操作系统默认为False)。该值设置Nagle算法的启用,改算法将小的碎片数据连接成更大的报文来最小化所发送的报文的数量,如果需要发送一些较小的报文,则需要禁用该算法。Netty默认禁用该算法,从而最小化报文传输延时
十、EventLoopGroup 和其实现类
EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源,一般会有多 EventLoop同时工作,每个 EventLoop 维护着一个 Selector 实例。
EventLoopGroup 提供 next 方法,可以从组里面按照一定规则获取其中一个 EventLoop 来处理任务。在 Netty服务器端编程中,我们一般都需要提供两个 EventLoopGroup ,例如:BossEventLoopGroup 和WorkerEventLoopGroup 。
通常一个服务端口即一个 ServerSocketChannel 对应一个 Selector 和一个 EventLoop 线程。BossEventLoop 负责接收客户端的连接并将 SocketChannel 交给 WorkerEventLoopGroup 来进行 IO 处理,如下图所示:
BossEventLoopGroup 通常是一个单线程的 EventLoop,EventLoop 维护着一个注册了ServerSocketChannel的Selector 实例,BossEventLoop 不断轮询Selector 将连接事件分离出来,通常是OP_ACCEPT事件,然后将接收到的 SocketChannel交给WorkerEventLoopGroup,WorkerEventLoopGroup 会由 next 选择其中一个EventLoop来将这个SocketChannel注册到其维护的Selector并对其后续的IO事件进行处理。
常用方法:
// 断开连接,关闭线程
Future<?> shutdownGracefully();
EventLoopGroup接口有很多实现类,常用的就是NioEventLoopGroup(处理NIO)、EpollEventLoopGroup(Epoll环境下使用)的,linux环境下如果支持Epoll,强烈建议使用EpollEventLoopGroup提高性能。
十一、Unpooled类与ByteBuf
网络数据的基本单位总是字节,java NIO提供ByteBuffer作为字节的容器,但是ByteBuffer使用起来过于复杂和繁琐。
ByteBuf是netty的Server与Client之间通信的数据传输载体(Netty的数据容器),它提供了一个byte数组(byte[])的抽象视图,既解决了JDK API的局限性,又为网络应用程序的开发者提供了更好的API。
ByteBuf优点:
容量可以按需增长
读写模式切换不需要调用flip()
读写使用了不同的索引
支持方法的链式调用
支持引用计数
支持池化
可以被用户自定义的缓冲区类型扩展
通过内置的复合缓冲区类型实现透明的零拷贝
ByteBuf工作机制:ByteBuf维护了两个不同的索引,一个用于读取,一个用于写入。readerIndex和writerIndex的初始值都是0,当从ByteBuf中读取数据时,它的readerIndex将会被递增(它不会超过writerIndex),当向ByteBuf写入数据时,它的writerIndex会递增。
名称以readXXX或者writeXXX开头的ByteBuf方法,会推进对应的索引,而以setXXX或getXXX开头的操作不会。
在读取之后,0~readerIndex的就被视为discard的,调用discardReadBytes方法,可以释放这部分空间,它的作用类似ByteBuffer的compact()方法。
readerIndex和writerIndex之间的数据是可读取的,等价于ByteBuffer的position和limit之间的数据。writerIndex和capacity之间的空间是可写的,等价于ByteBuffer的limit和capacity之间的可用空间。
//创建一个ByteBuf
//说明
//1. 创建 对象,该对象包含一个数组arr , 该数组是一个byte[10]
//2. 在netty 的buffer中,不需要使用flip 进行反转
// 底层维护了 readerindex 和 writerIndex
//3. 通过 readerindex 和 writerIndex 和 capacity, 将buffer分成三个区域
// 0---readerindex 已经读取的区域
// readerindex---writerIndex , 可读的区域
// writerIndex -- capacity, 可写的区域
ByteBuf buffer = Unpooled.buffer(10);
for(int i = 0; i < 10; i++) {
buffer.writeByte(i);// 写入,会导致writerIndex变化
}
System.out.println("capacity=" + buffer.capacity());//10
//输出
// for(int i = 0; i<buffer.capacity(); i++) {
// System.out.println(buffer.getByte(i)); // 不会导致readerindex变化
// }
for(int i = 0; i < buffer.capacity(); i++) {
System.out.println(buffer.readByte()); // 会导致readerindex变化
}
System.out.println("执行完毕");
//创建ByteBuf
ByteBuf byteBuf = Unpooled.copiedBuffer("hello,world!", Charset.forName("utf-8"));
//使用相关的方法
if(byteBuf.hasArray()) { // true
byte[] content = byteBuf.array();
//将 content 转成字符串
System.out.println(new String(content, Charset.forName("utf-8")));
System.out.println("byteBuf=" + byteBuf);
System.out.println(byteBuf.arrayOffset()); // 0
System.out.println(byteBuf.readerIndex()); // 0
System.out.println(byteBuf.writerIndex()); // 12
System.out.println(byteBuf.capacity()); // 36
//System.out.println(byteBuf.readByte()); //
System.out.println(byteBuf.getByte(0)); // 104
int len = byteBuf.readableBytes(); //可读的字节数 12
System.out.println("len=" + len);
//使用for取出各个字节
for(int i = 0; i < len; i++) {
System.out.println((char) byteBuf.getByte(i));
}
//按照某个范围读取
System.out.println(byteBuf.getCharSequence(0, 4, Charset.forName("utf-8")));
System.out.println(byteBuf.getCharSequence(4, 6, Charset.forName("utf-8")));
}
参考资料
https://blog.csdn.net/pengjianglilive/article/details/122457450
https://www.cnblogs.com/ylz8401/p/14327201.html
https://blog.csdn.net/qq_22701869/article/details/107091427