目录
- NIO编程
- NIO介绍
- NIO和BIO的比较
- 缓冲区(Buffer)
- 基本介绍
- 常用API
- 缓冲区对象创建
- 添加数据
- 读取数据
- 通道(Channel)
- 基本介绍
- Channel常用类
- ServerSocketChannel
- SocketChannel
- Selector (选择器)
- 基本介绍
- 常用API介绍
- 示例代码
- NIO 三大核心原理
- Netty核心概念
- Netty 介绍
- 原生 NIO 存在的问题
- Netty概述
- 线程模型
- 基本介绍
- 传统阻塞 I/O 服务模
- Reactor 模型
- 单Reactor单线程
- 单 Reactor多线程
- 主从 Reactor 多线程
- Netty线程模型
NIO编程
NIO介绍
Java NIO 全称java non-blocking IO ,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了一系列改进的输入/输出的新特性,被统称为 NIO(即 New IO),是同步非阻塞的。
NIO 有三大核心部分:Channel(通道)
,Buffer(缓冲区)
,Selector(选择器)
NIO是 面向缓冲区编程的。数据读取到一个缓冲区中,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络。
Java NIO 的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入, 这个线程同时可以去做别的事情。
通俗理解:NIO 是可以做到用一个线程来处理多个操作的。假设有 10000 个请求过来,根据实际情况,可以分配50 或者 100 个线程来处理。不像之前的阻塞 IO 那样,非得分配 10000 个。
NIO和BIO的比较
-
BIO 以流的方式处理数据,而 NIO 以缓冲区的方式处理数据,缓冲区 I/O 的效率比流 I/O 高很多
-
BIO 是阻塞的,NIO则是非阻塞的
-
BIO 基于字节流和字符流进行操作,而 NIO 基于 Channel(通道)和 Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道的事件(比如:连接请求, 数据到达等),因此使用单个线程就可以监听多个客户端通道
缓冲区(Buffer)
基本介绍
缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个数组,该对象提供了一组方法,可以更轻松地使用内存块,,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。Channel 提供从网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer。
常用API
Buffer 类及其子类
在 NIO 中,Buffer是一个顶层父类,它是一个抽象类, 类的层级关系图,常用的缓冲区分别对应byte,short, int, long,float,double,char 7种,这些也是抽象类,下面还有很多具体的子类。
缓冲区对象创建
方法名
方法名 | 说明 |
---|---|
static ByteBuffer allocate(长度) | 创建byte类型的指定长度的缓冲区 |
static ByteBuffer wrap(byte[] array) | 创建一个有内容的byte类型缓冲区 |
示例代码:
package com.cys.nio;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
public class CreateBufferDemo {
public static void main(String[] args) {
// 1.创建一个指定长度的缓冲区, 以ByteBuffer为例
ByteBuffer byteBuffer = ByteBuffer.allocate(5);
// 初始化的数据默认都是0
for (int i = 0; i < 5; i++) {
System.out.println(byteBuffer.get());
}
// 再次调用会报错--后续再读缓冲区时着重讲解
// System.out.println(byteBuffer.get());
//2.创建一个有内容的缓冲区
ByteBuffer wrap = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
for (int i = 0; i < 5; i++) {
System.out.println(wrap.get());
}
}
}
添加数据
方法名 | 说明 |
---|---|
int position()/position(int newPosition) | 获得当前要操作的索引/修改当前要操作的索引位置 |
int limit()/limit(int newLimit) | 最多能操作到哪个索引/修改最多能操作的索引位置 |
int capacity() | 返回缓冲区的总长度 |
int remaining()/boolean hasRemaining() | 还有多少能操作索引个数/是否还有能操作 |
put(byte b)/put(byte[] src) | 添加一个字节/添加字节数组 |
图解:
示例代码:
package com.cys.nio;
import java.nio.ByteBuffer;
public class PutBufferDemo {
public static void main(String[] args) {
// 1.创建一个指定长度的缓冲区, 以ByteBuffer为例
ByteBuffer byteBuffer = ByteBuffer.allocate(10);
System.out.println(byteBuffer.position());//0 获取当前索引所在位置
System.out.println(byteBuffer.limit());//10 最多能操作到哪个索引
System.out.println(byteBuffer.capacity());//10 返回缓冲区总长度
System.out.println(byteBuffer.remaining());//10 还有多少个能操作
// 修改当前索引位置
byteBuffer.position(1);
// 修改最多能操作到哪个索引位置
byteBuffer.limit(9);
// 此时数据发生变化
System.out.println(byteBuffer.position());//1 获取当前索引所在位置
System.out.println(byteBuffer.limit());//9 最多能操作到哪个索引
System.out.println(byteBuffer.capacity());//10 返回缓冲区总长度
System.out.println(byteBuffer.remaining());//8 还有多少个能操作
// 添加一个字节,注意上面的position已经改为1了,所以从索引1开始添加的
byteBuffer.put((byte) 97);
// 修改最多能操作到哪个索引位置回到10
byteBuffer.limit(10);
System.out.println(byteBuffer.position());//2 获取当前索引所在位置
System.out.println(byteBuffer.limit());//10 最多能操作到哪个索引
System.out.println(byteBuffer.capacity());//10 返回缓冲区总长度
System.out.println(byteBuffer.remaining());//8 还有多少个能操作
// 添加一个字节数组,注意上面的position已经改为2了,所以从索引2开始添加的
byteBuffer.put("abc".getBytes()); // 添加个3个长度
System.out.println(byteBuffer.position());//5 获取当前索引所在位置
System.out.println(byteBuffer.limit());//10 最多能操作到哪个索引
System.out.println(byteBuffer.capacity());//10 返回缓冲区总长度
System.out.println(byteBuffer.remaining());//5 还有多少个能操作
// 注意当添加超过缓冲区的长度时会报错
byteBuffer.put("01234".getBytes());
System.out.println(byteBuffer.position());//10 获取当前索引所在位置
System.out.println(byteBuffer.limit());//10 最多能操作到哪个索引
System.out.println(byteBuffer.capacity());//10 返回缓冲区总长度
System.out.println(byteBuffer.remaining());//0 还有多少个能操作
System.out.println(byteBuffer.hasRemaining());// false 是否还能有操作的数组
// 如果缓存区存满后, 可以调整position位置可以重复写,这样会覆盖之前存入索引的对 应的值
byteBuffer.position(0);
byteBuffer.put("012345".getBytes());
}
}
代码结合图例,会很好理解。
读取数据
方法 | 说明 |
---|---|
flip() | 写切换读模式, limit设置到position位置, position设置0 |
get() | 读一个字节 |
get(byte[] dst) | 读多个字节 |
get(int index) | 读指定索引的字节 |
rewind() | 将position设置为0,可以重复读 |
clear() | 切换写模式 position设置为0 , limit 设置为 capacity,但原来的数据还在 |
array() | 将缓冲区转换成字节数组返回 |
图解flip()
方法:
图解clear()
方法:
示例代码:
package com.cys.nio;
import java.nio.ByteBuffer;
public class GetBufferDemo {
public static void main(String[] args) {
// 1.创建一个指定长度的缓冲区
ByteBuffer allocate = ByteBuffer.allocate(10);
allocate.put("0123".getBytes());
System.out.println("position:" + allocate.position());//4
System.out.println("limit:" + allocate.limit());//10
System.out.println("capacity:" + allocate.capacity());//10
System.out.println("remaining:" + allocate.remaining());//6
//切换读模式
System.out.println("读取数据--------------");
allocate.flip();
System.out.println("position:" + allocate.position());//0
System.out.println("limit:" + allocate.limit());//4
System.out.println("remaining:" + allocate.remaining());//4
for (int i = 0; i < allocate.limit(); i++) {
System.out.println(allocate.get());
}
//读取完毕后.继续读取会报错,超过limit值
// System.out.println(allocate.get());
//读取指定索引字节,不会受到limit影响
System.out.println("读取指定索引字节--------------");
System.out.println(allocate.get(1));
System.out.println("读取多个字节--------------");
// position设为0,可重复读取
allocate.rewind();
byte[] bytes = new byte[4];
allocate.get(bytes);
System.out.println(new String(bytes));
// 将缓冲区转化字节数组返回
System.out.println("将缓冲区转化字节数组返回--------------");
byte[] array = allocate.array();
System.out.println(new String(array));
// 切换写模式,覆盖之前索引所在位置的值
System.out.println("写模式--------------");
allocate.clear();
allocate.put("abc".getBytes());
System.out.println(new String(allocate.array()));
}
}
注意事项:
- 获取缓冲区里面数据之前,需要调用flip方法
- 再次写数据之前,需要调用clear方法,但是数据还未消失,等再次写入数据,被覆盖了才会消失
通道(Channel)
基本介绍
通常来说NIO中的所有IO都是从 Channel(通道) 开始的。NIO 的通道类似于流,但有些区别如下:
- 通道可以读也可以写,流一般来说是单向的(只能读或者写,所以之前我们用流进行IO操作的时候需要分别创建一个输入流和一个输出流)
- 通道可以异步读写
- 通道总是基于缓冲区Buffer来读写
Channel常用类
- Channel接口
常用的Channel实现类有 :FileChanne
, DatagramChannel
,ServerSocketChannel
和SocketChannel
。FileChannel 用于文件的数据读写, DatagramChannel 用于 UDP 的数据读写, ServerSocketChannel 和SocketChannel 用于 TCP 的数据读写。
ServerSocketChannel类似ServerSocket , SocketChannel类似Socket,可以完成客户端与服务端数据的通信工作。
ServerSocketChannel
服务端实现步骤:
-
打开一个服务端通道
-
绑定对应的端口号
-
通道默认是阻塞的,需要设置为非阻塞
-
检查是否有客户端连接 有客户端连接会返回对应的通道
-
获取客户端传递过来的数据,并把数据放在byteBuffer这个缓冲区中
-
给客户端回写数据
-
释放资源
代码实现:
package com.cys.nio.channel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
public class NIOServer {
public static void main(String[] args) throws IOException, InterruptedException {
//1. 打开一个服务端通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//2. 绑定对应的端口号
serverSocketChannel.bind(new InetSocketAddress(9999));
//3. 通道默认是阻塞的,需要设置为非阻塞
serverSocketChannel.configureBlocking(false);
System.out.println("服务端启动成功..........");
while (true) {
//4. 检查是否有客户端连接 有客户端连接会返回对应的通道 , 否则返回null
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel == null) {
System.out.println("没有客户端连接...我去做别的事情");
Thread.sleep(2000);
continue;
}
//5. 获取客户端传递过来的数据,并把数据放在byteBuffer这个缓冲区中
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//返回值: 正数: 表示本次读到的有效字节个数; 0 : 表示本次没有读到有效字节; -1 : 表示读到了末尾
int read = socketChannel.read(byteBuffer);
System.out.println("客户端消息:" + new String(byteBuffer.array(), 0, read, StandardCharsets.UTF_8));
//6. 给客户端回写数据
socketChannel.write(ByteBuffer.wrap("收到".getBytes(StandardCharsets.UTF_8)));
//7. 释放资源
socketChannel.close();
}
}
}
SocketChannel
实现步骤
-
打开通道
-
设置连接IP和端口号
-
写出数据
-
读取服务器写回的数据
-
释放资源
代码实现:
package com.cys.nio.channel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
public class NIOClient {
public static void main(String[] args) throws IOException {
//1.打开通道
SocketChannel socketChannel = SocketChannel.open();
//2.设置连接IP和端口号
socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
//3.写出数据
socketChannel.write(ByteBuffer.wrap("你好".getBytes(StandardCharsets.UTF_8)));
//4.读取服务器写回的数据
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int read = socketChannel.read(byteBuffer);
System.out.println("服务端消息:" + new String(byteBuffer.array(), 0, read, StandardCharsets.UTF_8));
//5.释放资源
socketChannel.close();
}
}
Selector (选择器)
基本介绍
可以用一个线程,处理多个的客户端连接,就会使用到NIO的Selector(选择器). Selector 能够检测多个注册的服务端通道上是否有事件发生,如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。
下面在这种没有选择器的情况下,对应每个连接对应一个处理线程,但是连接并不能马上就会发送信息,所以还会产生资源浪费:
下面是有selector模型的:
只有在通道真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程, 避免了多线程之间的上下文切换导致的开销。
常用API介绍
- Selector
是一个抽象类
常用方法:
- Selector.open() : 得到一个选择器对象
- selector.select() : 一直阻塞,监控所有注册的通道,当有对应的事件操作时, 会将SelectionKey放入集合内部并返回事件数量
- selector.select(1000): 阻塞1000 毫秒,监控所有注册的通道,当有对应的事件操作时, 会将SelectionKey放入集合内部并返回
- selector.selectedKeys() : 返回存有SelectionKey的集合
- SelectionKey
标识SelectableChannel在选择器中的注册标记。
在每次向选择器注册通道时,就会创建一个 选择键(SelectionKey)。通过调用某个键的cancel()方法、关闭其通道,或者通过关闭其选择器取消该键之前,通道一直保持有效。取消某个键不会立即从其选择器中移除它,而是将该键添加到选择器的已取消键集,以便在下一次进行select()方法操作时移除它。可通过调用某个键的isValid()方法来测试其有效性。
常用方法:
- SelectionKey.isAcceptable(): 是否是连接继续事件
- SelectionKey.isConnectable(): 是否是连接就绪事件
- SelectionKey.isReadable(): 是否是读就绪事件
- SelectionKey.isWritable(): 是否是写就绪事件
SelectionKey中定义的4种事件:
- SelectionKey.OP_ACCEPT —— 接收连接就绪事件,表示服务器监听到了客户连接,服务器可以接收这个连接了
- SelectionKey.OP_CONNECT —— 连接就绪事件,表示客户端与服务器的连接已经建立成功
- SelectionKey.OP_READ —— 读就绪事件,表示通道中已经有了可读的数据,可以执行读操作了(通道目前有数据,可以进行读操作了)
- SelectionKey.OP_WRITE —— 写就绪事件,表示已经可以向通道写数据了(通道目前可以用于写操作)
示例代码
服务端实现步骤:
- 打开一个服务端通道
- 绑定对应的端口号
- 通道默认是阻塞的,需要设置为非阻塞
- 创建选择器
- 将服务端通道注册到选择器上,并指定注册监听的事件为OP_ACCEPT
- 检查选择器是否有事件
- 获取事件集合
- 判断事件是否是客户端连接事件SelectionKey.isAcceptable()
- 得到客户端通道,并将通道注册到选择器上, 并指定监听事件为OP_READ
- 判断是否是客户端读就绪事件SelectionKey.isReadable()
- 得到客户端通道,读取数据到缓冲区
- 给客户端回写数据从集合中删除对应的事件, 因为防止二次处理
代码实现:
package com.cys.nio.selector;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
public class NIOSelectorServer {
public static void main(String[] args) throws IOException {
//1. 打开一个服务端通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//2. 绑定对应的端口号
serverSocketChannel.bind(new InetSocketAddress(9999));
//3. 通道默认是阻塞的,需要设置为非阻塞
serverSocketChannel.configureBlocking(false);
//4. 创建选择器
Selector selector = Selector.open();
//5. 将服务端通道注册到选择器上,并指定注册监听的事件为OP_ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务端启动成功...");
while (true) {
//6. 检查选择器是否有事件
int select = selector.select(1000);
if (select == 0) {
continue;
}
//7. 获取事件集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
if (iterator.hasNext()) {
SelectionKey key = iterator.next();
//8. 判断事件是否是客户端连接事件SelectionKey.isAcceptable()
if (key.isAcceptable()) {
//9. 得到客户端通道,并将通道注册到选择器上, 并指定监听事件为OP_READ
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("客户端已连接......" + socketChannel);
//必须设置通道为非阻塞, 因为selector需要轮询监听每个通道的事件
socketChannel.configureBlocking(false);
//并指定监听事件为OP_READ
socketChannel.register(selector, SelectionKey.OP_READ);
}
//10. 判断是否是客户端读就绪事件SelectionKey.isReadable()
if (key.isReadable()) {
//11.得到客户端通道,读取数据到缓冲区
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int read = socketChannel.read(byteBuffer);
if (read > 0) {
System.out.println("客户端消息:" + new String(byteBuffer.array(), 0, read, StandardCharsets.UTF_8));
//12.给客户端回写数据
socketChannel.write(ByteBuffer.wrap("收到".getBytes(StandardCharsets.UTF_8)));
socketChannel.close();
}
}
//13.从集合中删除对应的事件, 因为防止二次处理. iterator.remove();
iterator.remove();
}
}
}
}
客户端不需要修改。
NIO 三大核心原理
一张图描述 NIO 的 Selector 、 Channel 和 Buffer 的关系
-
每个 channel 都会对应一个 Buffer
-
Selector 对应一个线程, 一个线程对应多个 channel(连接)
-
每个 channel 都注册到 Selector选择器上
-
Selector不断轮询查看Channel上的事件, 事件是通道Channel非常重要的概念
-
Selector 会根据不同的事件,完成不同的处理操作
-
Buffer 就是一个内存块 , 底层是有一个数组
-
数据的读取写入是通过 Buffer, BIO 中要么是输入流,或者是输出流, 不能双向,但是NIO 的 Buffer 是可以读也可以写 , channel 是双向的
Netty核心概念
Netty 介绍
原生 NIO 存在的问题
-
NIO 的类库和 API 繁杂,使用麻烦:需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
-
需要具备其他的额外技能:要熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的 NIO 程序。
-
开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。
-
JDK NIO 的 Bug:臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。直到JDK 1.7版本该问题仍旧存在,没有被根本解决
在NIO中通过Selector的轮询当前是否有IO事件,根据JDK NIO api描述,Selector的select方
法会一直阻塞,直到IO事件达到或超时,但是在Linux平台上这里有时会出现问题,在某些场
景下select方法会直接返回,即使没有超时并且也没有IO事件到达,这就是著名的epoll
bug,这是一个比较严重的bug,它会导致线程陷入死循环,会让CPU飙到100%,极大地影
响系统的可靠性,到目前为止,JDK都没有完全解决这个问题。
Netty概述
Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序。 Netty 是一个基于 NIO 的网络编程框架,使用Netty 可以帮助你快速、简单的开发出一 个网络应用,相当于简化和流程化了 NIO 的开发过程。 作为当前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、 通信行业等获得了广泛的应用,知名的 Elasticsearch 、Dubbo 框架内部都采用了 Netty。
从图中就能看出 Netty 的强大之处:零拷贝、可拓展事件模型;支持 TCP、UDP、HTTP、WebSocket 等协议;提供安全传输、压缩、大文件传输、编解码支持等等。
具备如下优点:
- 设计优雅,提供阻塞和非阻塞的 Socket;提供灵活可拓展的事件模型;提供高度可定制的线程模型。
- 具备更高的性能和更大的吞吐量,使用零拷贝技术最小化不必要的内存复制,减少资源的消耗。
- 提供安全传输特性。
- 支持多种主流协议;预置多种编解码功能,支持用户开发私有协议。
线程模型
基本介绍
不同的线程模式,对程序的性能有很大影响,在学习Netty线程模式之前,首先讲解下 各个线程模式, 最后看看 Netty 线程模型有什么优越性.目前存在的线程模型有:
-
传统阻塞 I/O 服务模型
-
Reactor 模型
根据 Reactor 的数量和处理资源池线程的数量不同,有 3 种典型的实现
- 单 Reactor 单线程
- 单 Reactor 多线程
- 主从 Reactor 多线程
传统阻塞 I/O 服务模
采用阻塞 IO 模式获取输入的数据, 每个连接都需要独立的线程完成数据的输入 , 业务处理和数据返回工作
存在问题:
- 当并发数很大,就会创建大量的线程,占用很大系统资源
- 连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在 read 操作,造成线程资源浪费
Reactor 模型
Reactor 模式,通过一个或多个输入同时传递给服务处理器的模式 , 服务器端程序处理传入的多个请求,并将它们同步分派到相应的处理线程, 因此 Reactor 模式也叫 Dispatcher模式。
Reactor 模式使用IO 复用监听事件, 收到事件后,分发给某个线程(进程), 这点就是网络服务器高并发处理关键。
单Reactor单线程
特点:
- Selector是可以实现应用程序通过一个阻塞对象监听多路连接请求
- Reactor 对象通过 Selector监控客户端请求事件,收到事件后通过 Dispatch 进行分发
- 如果是建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求,然后创建一个 Handler 对象处理连接完成后的后续业务处理
- Handler 会完成 Read→业务处理→Send 的完整业务流程
优点:
模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成
缺点:
- 性能问题: 只有一个线程,无法完全发挥多核 CPU 的性能。Handler 在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈
- 可靠性问题: 线程意外终止或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障
单 Reactor多线程
特点:
- Reactor 对象通过 selector 监控客户端请求事件, 收到事件后,通过 dispatch 进行分发
- 如果建立连接请求, 则右 Acceptor 通过accept 处理连接请求
- 如果不是连接请求,则由 Reactor 分发调用连接对应的 handler 来处理
- handler 只负责响应事件,不做具体的业务处理, 通过 read 读取数据后,会分发给后面的worker 线程池的某个线程处理业务
- worker 线程池会分配独立线程完成真正的业务,并将结果返回给 handlerhandler 收到响应后,通过 send 将结果返回给 client
优点:
可以充分的利用多核 cpu 的处理能力
缺点:
多线程数据共享和访问比较复杂, Reactor 处理所有的事件的监听和响应,在单线程运行, 在高并发场景容易出现性能瓶颈
主从 Reactor 多线程
特点:
- Reactor 主线程 MainReactor 对象通过 select 监听客户端连接事件,收到事件后,通过Acceptor 处理客户端连接事件
- 当 Acceptor 处理完客户端连接事件之后(与客户端建立好 Socket 连接),MainReactor 将连接分配给 SubReactor。(即:MainReactor 只负责监听客户端连接请求,和客户端建立连接之后将连接交由 SubReactor 监听后面的 IO 事件。)
- SubReactor 将连接加入到自己的连接队列进行监听,并创建 Handler 对各种事件进行处理
- 当连接上有新事件发生的时候,SubReactor 就会调用对应的 Handler 处理
- Handler 通过 read 从连接上读取请求数据,将请求数据分发给 Worker 线程池进行业务处理
- Worker 线程池会分配独立线程来完成真正的业务处理,并将处理结果返回给 Handler
- Handler 通过 send 向客户端发送响应数据
- 一个 MainReactor 可以对应多个 SubReactor,即一个 MainReactor 线程可以对应多个SubReactor 线程
优点:
- MainReactor 线程与 SubReactor 线程的数据交互简单职责明确,MainReactor 线程只需要接收新连接,SubReactor 线程完成后续的业务处理
- MainReactor 线程与 SubReactor 线程的数据交互简单, MainReactor 线程只需要把新连接传给 SubReactor 线程,SubReactor 线程无需返回数据
- 多个 SubReactor 线程能够应对更高的并发请求
缺点:
这种模式的缺点是编程复杂度较高。但是由于其优点明显,在许多项目中被广泛使用,包括Nginx、Memcached、Netty 等。这种模式也被叫做服务器的 1+M+N 线程模式,即使用该模式开发的服务器包含一个(或多个,1 只是表示相对较少)连接建立线程+M 个 IO 线程+N 个业务处理线程。这是业界成熟的服务器程序设计模式。
Netty线程模型
Netty 的设计主要基于主从 Reactor 多线程模式,并做了一定的改进。
下面一步步看下Netty 线程模型的特点
- 简单版Netty模型
特点:
- BossGroup 线程维护 Selector,ServerSocketChannel 注册到这个 Selector 上,只关注连接建立请求事件(主 Reactor)
- 当接收到来自客户端的连接建立请求事件的时候,通过 ServerSocketChannel.accept 方法获得对应的 SocketChannel,并封装成 NioSocketChannel 注册到 WorkerGroup 线程中的Selector,每个 Selector 运行在一个线程中(从 Reactor)
- 当 WorkerGroup 线程中的 Selector 监听到自己感兴趣的 IO 事件后,就调用 Handler 进行处理
- 进阶版Netty模型
特点:
- BossGroup 和 WorkerGroup线程组下都可以有多个线程,但是一般BossGroup就一个
- BossGroup 和 WorkerGroup 含有多个不断循环的执行事件处理的线程,每个线程都包含一个 Selector,用于监听注册在其上的 Channel
- 每个 BossGroup 中的线程循环执行以下三个步骤:
- 轮训注册在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)
- 处理 accept 事件,与客户端建立连接,生成一个 NioSocketChannel,并将其注册到WorkerGroup 中某个线程上的 Selector 上
- 再去依次循环处理任务队列中的下一个事件
- 每个 WorkerGroup 中的线程循环执行以下三个步骤
- 轮训注册在其上的 NioSocketChannel 的 read/write 事件(OP_READ/OP_WRITE 事件)
- 在对应的 NioSocketChannel 上处理 read/write 事件
- 再去依次循环处理任务队列中的下一个事件
- 详细版Netty模型
特点:
- Netty 抽象出两组线程池:BossGroup 和 WorkerGroup,也可以叫做BossNioEventLoopGroup 和 WorkerNioEventLoopGroup。每个线程池中都有NioEventLoop 线程。BossGroup 中的线程专门负责和客户端建立连接,WorkerGroup 中的线程专门负责处理连接上的读写。BossGroup 和 WorkerGroup 的类型都是NioEventLoopGroup
- NioEventLoopGroup 相当于一个事件循环组,这个组中含有多个事件循环,每个事件循环就是一个 NioEventLoop
- NioEventLoop 表示一个不断循环的执行事件处理的线程,每个 NioEventLoop 都包含一个Selector,用于监听注册在其上的 Socket 网络连接(Channel)
- NioEventLoopGroup 可以含有多个线程,即可以含有多个 NioEventLoop
- 每个 BossNioEventLoop 中循环执行以下三个步骤
- select:轮训注册在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)
- processSelectedKeys:处理 accept 事件,与客户端建立连接,生成一个NioSocketChannel,并将其注册到某个 WorkerNioEventLoop 上的 Selector 上
- runAllTasks:再去依次循环处理任务队列中的其他任务
- 每个 WorkerNioEventLoop 中循环执行以下三个步骤
- select:轮训注册在其上的 NioSocketChannel 的 read/write 事件(OP_READ/OP_WRITE 事件)
- processSelectedKeys:在对应的 NioSocketChannel 上处理 read/write 事件
- runAllTasks:再去以此循环处理任务队列中的其他任务
- 在以上两个processSelectedKeys步骤中,会使用 Pipeline(管道),Pipeline 中引用了Channel,即通过 Pipeline 可以获取到对应的 Channel,Pipeline 中维护了很多的处理器(拦截处理器、过滤处理器、自定义处理器等)。