文章目录
- 一、操作系统底层IO原理
- 1. 简介
- 2. 操作系统进行IO的流程
- 二、BIO底层原理
- 1. 什么是Socket
- 2. JDK原生编程的BIO
- 三、Java原生编程的NIO
- 1. 简介
- 2. NIO和BIO的主要区别
- 3. Reactor模式
- 4. NIO的三大核心组件
- 5. NIO核心源码分析
一、操作系统底层IO原理
1. 简介
IO,即Input/Output,指的是输入和输出。在计算机科学中,IO描述的是数据在内部存储器和外部存储器或其他周边设备之间的流动过程,既包括数据从外部复制到内存(输入),也包括数据从内存复制到外部(输出)。IO是计算机与外界交互的过程,涉及到的对象可以是人或其他设备,如文件、管道、网络、命令行、信号等,更广义地讲,I/O指代任何操作系统理解为“文件”的事务。此外,IO也是操作系统中的一个核心概念,在各种系统中都有重要地位,例如在本机、传统的单体应用、分布式系统中。IO操作可以有多种方式,如DIO(Direct I/O)、AIO(Asynchronous I/O,异步I/O)、Memory-Mapped I/O(内存映射I/O)等,不同的I/O方式有不同的实现方式和性能,适用于不同的应用场景
2. 操作系统进行IO的流程
首先我们需要了机计算机网络的协议栈,这里有两种分别是OSI参考模型和TCP/IP五层模型,在实际中通常使用到的只有TCP/IP五层模型,因为OSI参考模型实现过于复杂。
现在我们大致看一下数据是如何从一个计算机传递到另一个计算机的,假如张三向李四发送了一条你好
的消息,大致会经过一下过程:
- 首先应用程序会进行编码处理将字符消息转化为二进制流,然后交给传输层(此时产生的数据包类型为报文)
- TCP 根据应用的指示,负责建立连接、发送数据以及断开连接。TCP 提供将应用层发来的数据顺利发送至对端的可靠传输。为了实现这一功能,需要将应用层数据封装为报文段 (segment)并附加一个 TCP 首部然后交给下面的 IP 层。
- IP 将 TCP 传过来的 TCP 首部和 TCP 数据合起来当做自己的数据,并在 TCP 首部的前端 加上自己的 IP 首部生成 IP 数据报(datagram)然后交给下面的数据链路层。
- 从 IP 传过来的 IP 包对于数据链路层来说就是数据。给这些数据附加上链路层首部封装为链路层帧(frame),生成的链路层帧(frame)将通过物理层传输给接收端。
- 然后到了李四的计算机,就会逆向的进行上面的过程,将消息最后传输给应用程序,这样李四就收到了张三的消息。
上面就是整个计算机网络基于TCP通信的大致过程,那么现在的问题是操作系统内部是如何进行IO的
?
我们知道IO无非就是两个核心点,读数据和写数据,我们的应用程序是工作在操作系统的用户态时,当应用程序要执行IO时,用户态需要通过系统调用从用户态切换到核心态。如果应用程序现在在执行读操作,那么操作系统首先会将接收到的网络IO数据存储在内核缓冲,然后将内核缓存准备好的数据拷贝到用户缓存区,然后应用程序就可以处理接收到的数据了。如果应用程序正在执行写操作,那么操作系统需要将应用程序准备好的数据从用户缓存拷贝到内核缓存,接着发送出去,下图就展示了大致的细节。
可以发现上面的过程经过了多次的操作系统用户态到内核态的切换,这是很耗时的,可以使用0拷贝等相关技术进行优化,这里就不详细分析了。
下面我们更加深入:
- 读数据
①首先在网络的网卡上或本地存储设备中准备数据,然后调用read()函数。
②调用read()函数后,由内核将网络/本地数据读取到内核缓冲区中。
③读取完成后向CPU发送一个中断信号,通知CPU对数据进行后续处理。
④CPU将内核中的数据写入到对应的程序缓冲区或网络Socket接收缓冲区中。
⑤数据全部写入到缓冲区后,应用程序开始对数据开始实际的处理。
程序中试图利用IO机制读写数据时,仅仅只是调用了内核提供的接口函数而已,本质上真正的IO操作还是由内核自己去完成的。Linux 系统为了提高 IO 效率,会在用户空间和内核空间都加入缓冲区(缓冲区可以减少频繁的系统 IO 调 用。系统调用需要保存之前的进程数据和状态等信息,而结束调用之后回来还需要恢复之前的信息,为 了减少这种损耗时间、也损耗性能的系统调用,于是出现了缓冲区)
- 写数据
①应用程序准备要写入的数据,可能是从用户输入、其他应用程序输出或者本地文件等获取的数据。
②当应用程序调用write()函数时,数据被写入到应用程序的内核缓冲区。
③CPU处理写操作,内核在写入数据到内核缓冲区后,向CPU发送一个中断信号,通知CPU有数据需要写入到指定的目的地(例如硬盘或网络)。
④发送完成通知,当数据全部写入到目标设备或网络中时,系统可能会向应用程序发送一个写入完成的通知。
二、BIO底层原理
1. 什么是Socket
Socket 是应用层与 TCP/IP 协议族通信的中间软件抽象层,它是一组接口,一般由操作系统提供。在设计模式中,Socket 其实就是一个门面模式,它把复杂的 TCP/IP 协议处理和通信缓存管理等等都隐藏在 Socket 接口后面,对用户来说,使用一组简单的接口就能进行网络应用编程,让 Socket 去组织数据,以符合指定的协议。主机 A 的应用程序要能和主机 B 的 应用程序通信,必须通过 Socket 建立连接。客户端连接上一个服务端,就会在客户端中产生一个 socket 接口实例,服务端每接受 一个客户端连接,就会产生一个 socket 接口实例和客户端的 socket 进行通信,有多个客户端连接自然就有多个 socket 接口实例。
2. JDK原生编程的BIO
BIO也就是阻塞式IO。在 BIO 中类 ServerSocket 负责绑定IP地址,启动监听端口,等待客户连接;客户端 Socket 类的实例发起连接操作,ServerSocket 接受连接后产生一个新的服务端 socket 实例负责和客户端 socket 实例通过输入和输出流进行通信。
BIO阻塞的含义体现在两个方面:
- 若一个服务器启动就绪,那么主线程就一直在等待着客户端的连接,这个等待过程中主线程就一直在阻塞。
- 在连接建立之后,在读取到 socket 信息之前,客户端线程也是一直在等待,一直处于阻塞的状态下的。
我们看一个java实现的BIO通信模式的案例的代码,首先是服务端:
public static void main(String[] args) throws IOException {
//服务端启动必备
ServerSocket serverSocket = new ServerSocket();
//表示服务端在哪个端口上监听
serverSocket.bind(new InetSocketAddress(10001));
System.out.println("Start Server ....");
try{
while(true){
new Thread(new ServerTask(serverSocket.accept())).start();
}
}finally {
serverSocket.close();
}
}
//每个和客户端的通信都会打包成一个任务,交个一个线程来执行
private static class ServerTask implements Runnable{
private Socket socket = null;
public ServerTask(Socket socket){
this.socket = socket;
}
@Override
public void run() {
//实例化与客户端通信的输入输出流
try(ObjectInputStream inputStream =
new ObjectInputStream(socket.getInputStream());
ObjectOutputStream outputStream =
new ObjectOutputStream(socket.getOutputStream())){
//接收客户端的输出,也就是服务器的输入
String userName = inputStream.readUTF();
System.out.println("Accept client message:"+userName);
//服务器的输出,也就是客户端的输入
outputStream.writeUTF("Hello,"+userName);
outputStream.flush();
}catch(Exception e){
e.printStackTrace();
}finally {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
首先定义了一个ServerSocket方法并调用accept方法去监听10001
端口,当然上面代码是创建了一个新的线程来专门监听10001
端口,我们看看accept方法底层到底在做什么?
public Socket accept() throws IOException {
if (isClosed())
throw new SocketException("Socket is closed");
if (!isBound())
throw new SocketException("Socket is not bound yet");
Socket s = new Socket((SocketImpl) null);
implAccept(s);
return s;
}
首先它会调用isClosed()
方法判断当前的ServerSocket
是否已经关闭了,ServerSocket
声明了一个closed变量,来维护ServerSocekt
的状态。
private boolean closed = false;
public boolean isClosed() {
synchronized(closeLock) {
return closed;
}
}
下面代码用于判断当前的SocketServer
是否已经与端口绑定了,ServerSocekt
底层同样是有一个bound
成员变量来维护当前ServerSocket
的绑定状态。
if (!isBound())
throw new SocketException("Socket is not bound yet");
上面代码中我们调用了bind方法来将ServerSocket与指定的端口进行绑定,下面我们看看绑定的时候底层在做什么?
public void bind(SocketAddress endpoint, int backlog) throws IOException {
if (isClosed())
throw new SocketException("Socket is closed");
if (!oldImpl && isBound())
throw new SocketException("Already bound");
if (endpoint == null)
endpoint = new InetSocketAddress(0);
if (!(endpoint instanceof InetSocketAddress))
throw new IllegalArgumentException("Unsupported address type");
InetSocketAddress epoint = (InetSocketAddress) endpoint;
if (epoint.isUnresolved())
throw new SocketException("Unresolved address");
if (backlog < 1)
backlog = 50;
try {
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkListen(epoint.getPort());
getImpl().bind(epoint.getAddress(), epoint.getPort());
getImpl().listen(backlog);
bound = true;
} catch(SecurityException e) {
bound = false;
throw e;
} catch(IOException e) {
bound = false;
throw e;
}
}
其实核心的就下面两句代码,一个是绑定动作,一个是监听动作,监听动作底层调用了socketListen
这一个native方法。
getImpl().bind(epoint.getAddress(), epoint.getPort());
getImpl().listen(backlog);
回到accept方法,接着它创建了一个Socket对象
Socket s = new Socket((SocketImpl) null);
然后调用了implAccept(s)
方法,参数是上面我们创建的Socekt,我们进入该方法。
protected final void implAccept(Socket s) throws IOException {
SocketImpl si = null;
try {
if (s.impl == null)
//用 setImpl() 方法,该方法用于设置 Socket 对象的底层实现。
s.setImpl();
else {
//调用 reset() 方法,该方法用于重置 Socket 对象的底层实现。
s.impl.reset();
}
si = s.impl;
s.impl = null;
si.address = new InetAddress();
//指定文件描述符
si.fd = new FileDescriptor();
//这个accept底层也是调用的socketListen这个native方法
getImpl().accept(si);
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkAccept(si.getInetAddress().getHostAddress(),
si.getPort());
}
} catch (IOException e) {
if (si != null)
si.reset();
s.impl = si;
throw e;
} catch (SecurityException e) {
if (si != null)
si.reset();
s.impl = si;
throw e;
}
s.impl = si;
s.postAccept();
}
socketListen 方法通常是在底层操作系统或网络库中实现的,用于启动套接字的监听过程。这个方法在大多数情况下是阻塞的,因为它需要等待客户端的连接请求到达。当调用 socketListen 方法后,套接字会进入监听状态,等待客户端连接请求。在这个过程中,如果没有客户端连接请求到达,socketListen 方法会一直阻塞,直到有新的连接请求到达或者发生超时。
上面方法我们会阻塞在 getImpl().accept(si);
然后一旦客户端有连接来,就会立即返回accept方法,并将新创建的Socket返回,重新回顾整个过程,服务端程序一直阻塞等待,如果客户端有连接来了就会创建一个新的Socket用于与该连接通信,上面有个疑问的地方就是bind和accept方法好像都有一个socketListen
那么意味bind
方法执行后是否就可以处理客户端连接了?我的个人理解是前者主要是用来建立TCP连接的,参考[这篇博客]。
上面就是传统的BIO的通信模型,采用 BIO 通信模型的服务端,通常由一个独立的 Acceptor 线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理(上面的案例我没有使用多线程处理,而是服务端就一个线程),处理完成后,通过输出流返回应答给客户端,线程销毁。即典型的一请求一应答模型,同时数据的读取写入也必须阻塞在一个线程内等待其完成。
该模型最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程 个数和客户端并发访问数呈 1:1 的正比关系,Java 中的线程也是比较宝贵的系统资源,线程数量快速膨胀后,系统的性能将急剧下降,随着访问量的继续增大,系统最终就死掉了。
为了改进这种一连接一线程的模型,我们可以使用线程池来管理这些线程,实现 1 个或 多个线程处理 N 个客户端的模型(但是底层还是使用的同步阻塞 I/O),通常被称为“伪异 步 I/O 模型”。我们知道,如果使用 CachedThreadPool 线程池(不限制线程数量,如果不清楚请参考 文首提供的文章),其实除了能自动帮我们管理线程(复用),看起来也就像是 1:1 的客户 端:线程数模型,而使用 FixedThreadPool 我们就有效的控制了线程的最大数量,保证了系统有限的资源的控制,实现了 N:M 的伪异步 I/O 模型。
但是,正因为限制了线程数量,如果发生读取数据较慢时(比如数据量大、网络传输慢等),大量并发的情况下,其他接入的消息,只能一直等待,这就是最大的弊端。
三、Java原生编程的NIO
1. 简介
NIO 库是在 JDK 1.4 中引入的。NIO 弥补了原来的 BIO 的不足,它在标准 Java 代码中提供了高速的、面向块的 I/O。NIO 被称为 no-blocking io 或者 new io 都说得通。
2. NIO和BIO的主要区别
- 面向流和面向缓冲
Java NIO 和 BIO 之间第一个最大的区别是,BIO 是面向流的,NIO 是面向缓冲区的。 Java BIO 面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地 方。此外,它不能前后移动流中的数据。如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区。 Java NIO 的缓冲导向方法略有不同。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。但是,还需要检查是否该缓冲区中包含所有需要处理的数据。而且,需确保当更多的数据读入缓冲区时,不要覆盖缓冲区里尚未处理的数据。
- 阻塞和非阻塞IO
Java IO 的各种流是阻塞的。这意味着,当一个线程调用 read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。Java NIO 的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。 线程通常将非阻塞 IO 的空闲时间用于在其它通道上执行 IO 操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。
3. Reactor模式
Reator模式可以看[这篇博客]。
4. NIO的三大核心组件
- Selector
Selector 的英文含义是“选择器”,也可以称为为“轮询代理器”、“事件订阅器”、“channel 容器管理机”都行。
Java NIO 的选择器允许一个单独的线程来监视多个输入通道,你可以注册多个通道使用一个选择器(Selectors),然后使用一个单独的线程来操作这个选择器,进而“选择”通道:这些通道里已经有可以处理的输入,或者选择已准备写入的通道。这种选择机制,使得一个单独的线程很容易来管理多个通道。应用程序将向 Selector 对象注册需要它关注的 Channel,以及具体的某一个 Channel 会对哪些 IO 事件感兴趣。Selector 中也会维护一个“已经注册的 Channel”的容器。
- Channels
通道,被建立的一个应用程序和操作系统交互事件、传递内容的渠道(注意是连接到操作系统)。那么既然是和操作系统进行内容的传递,那么说明应用程序可以通过通道读取数据,也可以通过通道向操作系统写数据,而且可以同时进行读写。
• 所有被 Selector(选择器)注册的通道,只能是继承了SelectableChannel 类的子类。
• ServerSocketChannel:应用服务器程序的监听通道。只有通过这个通道,应用程序才能向操作系统注册支持“多路复用 IO”的端口监听。同时支持 UDP 协议和 TCP 协议。
• ScoketChannel:TCP Socket套接字的监听通道,一个Socket套接字对应了一个客户端IP端口 到服务器 IP端口的通信连接。 通道中的数据总是要先读到一个 Buffer,或者总是要从一个 Buffer 中写入。
- Buffer
我们前面说过 JDK NIO 是面向缓冲的。Buffer 就是这个缓冲,用于和 NIO 通道进行交互。 数据是从通道读入缓冲区,从缓冲区写入到通道中的。以写为例,应用程序都是将数据写入缓冲,再通过通道把缓冲的数据发送出去,读也是一样,数据总是先从通道读到缓冲,应用 程序再读缓冲的数据。缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存(其实就是数组)。 这块内存被包装成 NIO Buffer 对象,并提供了一组方法,用来方便的访问该块内存。
5. NIO核心源码分析
首先我们给出服务端的实现:
public class NioServer {
private static NioServerHandle nioServerHandle;
public static void main(String[] args){
nioServerHandle = new NioServerHandle(DEFAULT_PORT);
new Thread(nioServerHandle,"Server").start();
}
}
public class NioServerHandle implements Runnable{
private volatile boolean started;
private ServerSocketChannel serverSocketChannel;
private Selector selector;
/**
* 构造方法
* @param port 指定要监听的端口号
*/
public NioServerHandle(int port) {
try {
/*创建选择器的实例*/
selector = Selector.open();
/*创建ServerSocketChannel的实例*/
serverSocketChannel = ServerSocketChannel.open();
/*设置通道为非阻塞模式*/
serverSocketChannel.configureBlocking(false);
/*绑定端口*/
serverSocketChannel.socket().bind(new InetSocketAddress(port));
/*注册事件,表示关心客户端连接*/
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
started = true;
System.out.println("服务器已启动,端口号:"+port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while(started){
try {
/*获取当前有哪些事件*/
selector.select(1000);
/*获取事件的集合*/
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
/*我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。
如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活
的键出现,这会导致我们尝试再次处理它。*/
iterator.remove();
handleInput(key);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/*处理事件的发生*/
private void handleInput(SelectionKey key) throws IOException {
if(key.isValid()){
/*处理新接入的客户端的请求*/
if(key.isAcceptable()){
/*获取关心当前事件的Channel*/
ServerSocketChannel ssc
= (ServerSocketChannel) key.channel();
/*接受连接*/
SocketChannel sc = ssc.accept();
System.out.println("==========建立连接=========");
sc.configureBlocking(false);
/*关注读事件*/
sc.register(selector,SelectionKey.OP_READ);
}
/*处理对端的发送的数据*/
if(key.isReadable()){
SocketChannel sc = (SocketChannel) key.channel();
/*创建ByteBuffer,开辟一个缓冲区*/
ByteBuffer buffer = ByteBuffer.allocate(1024);
/*从通道里读取数据,然后写入buffer*/
int readBytes = sc.read(buffer);
if(readBytes>0){
/*将缓冲区当前的limit设置为position,position=0,
用于后续对缓冲区的读取操作*/
buffer.flip();
/*根据缓冲区可读字节数创建字节数组*/
byte[] bytes = new byte[buffer.remaining()];
/*将缓冲区可读字节数组复制到新建的数组中*/
buffer.get(bytes);
String message = new String(bytes,"UTF-8");
System.out.println("服务器收到消息:"+message);
/*处理数据*/
String result = Const.response(message);
/*发送应答消息*/
doWrite(sc,result);
}else if(readBytes<0){
/*取消特定的注册关系*/
key.cancel();
/*关闭通道*/
sc.close();
}
}
}
}
/*发送应答消息*/
private void doWrite(SocketChannel sc,String response) throws IOException {
byte[] bytes = response.getBytes();
ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.put(bytes);
buffer.flip();
sc.write(buffer);
}
public void stop(){
started = false;
}
}
首先NioServerHandle
构成方法接受一个参数port
,也就是socket要绑定的本地端口,首先它创建了一个选择器实例,Selector就是IO多路复用中的多路复用器,Selector选择器类管理着一个被注册的通道集合的信息和它们的就绪状态。通道是和选择器一起被注册的,并且使用选择器来更新通道的就绪状态。当这么做的时候,可以选择将被激发的线程挂起,直到有就绪的的通道。
selector = Selector.open();
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
这个provider()本质是SelectorProviderl类,是一个抽象类,它定义了创建Selector和Channel实例的方法。不同的操作系统可能有不同的I/O机制和系统调用,因此SelectorProvider的实现类会根据当前平台的特性提供相应的Selector和Channel实例。创建Selector调用了抽象类Selector中的静态方法,open方法。这个方法的返回值是操作系统对应的选择器,这个与你虚拟机所在的系统相关),这里我们就不深纠了。创建好选择器之后就执行下面代码创建了一个ServerSocketChannel
对象。
serverSocketChannel = ServerSocketChannel.open();
我们看看这个open()静态方法底层做了一些什么事:
public static ServerSocketChannel open() throws IOException {
return SelectorProvider.provider().openServerSocketChannel();
}
它同样调用了SelectorProviderl
的provider方法返回了一个适合本系统的Channel实现。然后下面就开始绑定端口了:
serverSocketChannel.socket().bind(new InetSocketAddress(port));
public abstract class ServerSocketChannel
extends AbstractSelectableChannel
implements NetworkChannel
{
public abstract ServerSocket socket();
}
根据上面代码我们知道serverSocketChannel内部是封装了ServerSocket的实现了的,所以通道的本质上就是在Socket的基础上封装了更多的操作。下面就是NIO特别的地方了,它将Channel注册到了Selector中:
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
register方法有两个参数,第一个是通道要注册的选择器,第二个参数就是选择器所关心的通道操作。这个是SelectionKey中定义的四个事件之一,也就是连接事件
。它实际上是一个表示选择器在检查通道就绪状态时需要关心的操作的比特掩码。如果 Selector 对通道的多操作类型感兴趣,可以用“位或”操作符来实现,SelectionKey.OP_READ|SelectionKey.OP_WRITE;。
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;
注意:一个 Channel 仅仅可以被注册到一个 Selector 一次, 如果将 Channel 注册 到 Selector 多次, 那么其实就是相当于更新 SelectionKey 的 interest set。我们进入SelectionKey类:
public abstract int interestOps();
public abstract int readyOps();
interestOps
可以判断 Selector 是否对 Channel 的某种事件感兴趣, readyOps()来获取相关通道已经就绪的操作。然后还有下面两个方法:
public abstract SelectableChannel channel();
abstract Selector selector();
public abstract void cancel();
通过上面方法我们可以获取这个 SelectionKey 所关联的 Selector 和 Channel。 如果我们要取消关联关系,SelectionKey 对象的 cancel()方法来取消特定的注册关系。
上面我们服务端的ServerSocketChannel
就创建完了,通过上面我们知道上面的核心关键就是创建了Selector并将ServerSoceketChannel关联的SelectionKey注册到了Seletctor中了。下面回到NioServer
类,下面就是创建一个新的线程来开启服务端。
new Thread(nioServerHandle,"Server").start();
NioServerHandle
本身就是实现了Runable接口的,所以在上面创建的线程执行run方法的时候,会间接调用到NioServerHandle
的run方法,我们进入该方法。
@Override
public void run() {
while(started){
try {
/*获取当前有哪些事件*/
selector.select(1000);
/*获取事件的集合*/
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
/*我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。
如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活
的键出现,这会导致我们尝试再次处理它。*/
iterator.remove();
handleInput(key);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
首先执行了selector.select(1000)
方法,该方法是一个阻塞方法,它会等待一段时间(以毫秒为单位),直到有一个或多个通道准备好进行 I/O 操作、超时时间到达或者当前线程被中断。底层实现会查询注册在 Selector 上的所有通道,检查它们是否处于就绪状态。就绪状态表示通道可以执行某种 I/O 操作,比如读取或写入数据。
当有通道处于就绪状态时,select() 方法会返回对应的通道数量,并且可以通过调用 selector.selectedKeys() 方法获取到这些就绪的 SelectionKey 集合。而在超时时间到达之前,如果没有通道处于就绪状态或者当前线程被中断,select() 方法也会提前返回,返回值为 0。
public int select(long var1) throws IOException {
if (var1 < 0L) {
throw new IllegalArgumentException("Negative timeout");
} else {
return this.lockAndDoSelect(var1 == 0L ? -1L : var1);
}
}
select最底层也是调用的本地方法,而且它是线程安全的。我们这里只需要知道它会返回就绪的通道的数量。然后调用下面方法来获取所有的就绪的SelectionKey的集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
public Set<SelectionKey> selectedKeys() {
if (!this.isOpen() && !Util.atBugLevel("1.4")) {
throw new ClosedSelectorException();
} else {
return this.publicSelectedKeys;
}
}
然后就迭代就绪的SelectionKey
,然后将该事件从集合中删除(表示这个事件已经被处理了),然后就调用了handleInput
来开始具体的处理。
private void handleInput(SelectionKey key) throws IOException {
if(key.isValid()){
/*处理新接入的客户端的请求*/
if(key.isAcceptable()){
/*获取关心当前事件的Channel*/
ServerSocketChannel ssc
= (ServerSocketChannel) key.channel();
/*接受连接*/
SocketChannel sc = ssc.accept();
System.out.println("==========建立连接=========");
sc.configureBlocking(false);
/*关注读事件*/
sc.register(selector,SelectionKey.OP_READ);
}
/*处理对端的发送的数据*/
if(key.isReadable()){
SocketChannel sc = (SocketChannel) key.channel();
/*创建ByteBuffer,开辟一个缓冲区*/
ByteBuffer buffer = ByteBuffer.allocate(1024);
/*从通道里读取数据,然后写入buffer*/
int readBytes = sc.read(buffer);
if(readBytes>0){
/*将缓冲区当前的limit设置为position,position=0,
用于后续对缓冲区的读取操作*/
buffer.flip();
/*根据缓冲区可读字节数创建字节数组*/
byte[] bytes = new byte[buffer.remaining()];
/*将缓冲区可读字节数组复制到新建的数组中*/
buffer.get(bytes);
String message = new String(bytes,"UTF-8");
System.out.println("服务器收到消息:"+message);
/*处理数据*/
String result = Const.response(message);
/*发送应答消息*/
doWrite(sc,result);
}else if(readBytes<0){
/*取消特定的注册关系*/
key.cancel();
/*关闭通道*/
sc.close();
}
}
}
}
/*发送应答消息*/
private void doWrite(SocketChannel sc,String response) throws IOException {
byte[] bytes = response.getBytes();
ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.put(bytes);
buffer.flip();
sc.write(buffer);
}
然后它会获得这个SelectionKey
所绑定的通道。首先我们可以发现有两个if判断这里就是该SelectionKey绑定的事件是读事件、写事件还是连接事件(注意连接事件是客户端的),如果是连接事件就获得ServerSocketChannel对象:
ServerSocketChannel ssc= (ServerSocketChannel) key.channel();
然后就可以执行下面代码来处理连接了,可以发现也是调用的accept方法,因为前面说过通道的底层是封装了Socket了的。
SocketChannel sc = ssc.accept();
sc.register(selector,SelectionKey.OP_READ);
可以发现ssc.accept()
,也就是说一旦有连接接入就会创建一个新的SocketChannel
对象,然后这个通道也要注册到selector中,绑定事件为读事件,这样就可以接受客户端发来的数据了。
如果SelectionKey绑定的事件是读事件,说明现在已经接受到了用户的数据了我们可以进行处理了,首先我们仍然是从SelectionKey获取对应的通道。
SocketChannel sc = (SocketChannel) key.channel();
然后从channel中读取数据,注意这里就是和BIO很大的不同的地方,它不是以流的形式读完所有数据,而是读到了一个buffer缓冲中。
ByteBuffer buffer = ByteBuffer.allocate(1024);
注意此时只是将数据读到了一个缓冲中,应用程序还没有处理数据,现在有了这个缓冲我们就可以很方便的处理接受到的数据了,注意此时buffer要调用flip方法来切换模式。flip 方法将 Buffer 从写模式切换到读模式。调用 flip()方法会将 position 设回 0,并将 limit设置成之前的position。
buffer.flip();
最后服务端向客户端发出回应
doWrite(sc,result);
以上就是服务端的大致工作中原理,下面我们看看客户端又是怎么工作的。
public class NioClient {
private static NioClientHandle nioClientHandle;
public static void start(){
nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP,DEFAULT_PORT);
//nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP,8888);
new Thread(nioClientHandle,"client").start();
}
//向服务器发送消息
public static boolean sendMsg(String msg) throws Exception{
nioClientHandle.sendMsg(msg);
return true;
}
public static void main(String[] args) throws Exception {
start();
Scanner scanner = new Scanner(System.in);
while(NioClient.sendMsg(scanner.next()));
}
}
public class NioClientHandle implements Runnable{
private String host;
private int port;
private volatile boolean started;
private Selector selector;
private SocketChannel socketChannel;
public NioClientHandle(String ip, int port) {
this.host = ip;
this.port = port;
try {
/*创建选择器的实例*/
selector = Selector.open();
/*创建ServerSocketChannel的实例*/
socketChannel = SocketChannel.open();
/*设置通道为非阻塞模式*/
socketChannel.configureBlocking(false);
started = true;
} catch (IOException e) {
e.printStackTrace();
}
}
public void stop(){
started = false;
}
@Override
public void run() {
try{
doConnect();
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
//循环遍历selector
while(started){
try{
//无论是否有读写事件发生,selector每隔1s被唤醒一次
selector.select(1000);
//获取当前有哪些事件可以使用
Set<SelectionKey> keys = selector.selectedKeys();
//转换为迭代器
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key = null;
while(it.hasNext()){
key = it.next();
/*我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。
如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活
的键出现,这会导致我们尝试再次处理它。*/
it.remove();
try{
handleInput(key);
}catch(Exception e){
if(key != null){
key.cancel();
if(key.channel() != null){
key.channel().close();
}
}
}
}
}catch(Exception e){
e.printStackTrace();
System.exit(1);
}
}
//selector关闭后会自动释放里面管理的资源
if(selector != null)
try{
selector.close();
}catch (Exception e) {
e.printStackTrace();
}
}
//具体的事件处理方法
private void handleInput(SelectionKey key) throws IOException{
if(key.isValid()){
//获得关心当前事件的channel
SocketChannel sc = (SocketChannel) key.channel();
//连接事件
if(key.isConnectable()){
if(sc.finishConnect()){
socketChannel.register(selector,
SelectionKey.OP_READ);}
else System.exit(1);
}
//有数据可读事件
if(key.isReadable()){
//创建ByteBuffer,并开辟一个1M的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取请求码流,返回读取到的字节数
int readBytes = sc.read(buffer);
//读取到字节,对字节进行编解码
if(readBytes>0){
//将缓冲区当前的limit设置为position,position=0,
// 用于后续对缓冲区的读取操作
buffer.flip();
//根据缓冲区可读字节数创建字节数组
byte[] bytes = new byte[buffer.remaining()];
//将缓冲区可读字节数组复制到新建的数组中
buffer.get(bytes);
String result = new String(bytes,"UTF-8");
System.out.println("客户端收到消息:" + result);
}
//链路已经关闭,释放资源
else if(readBytes<0){
key.cancel();
sc.close();
}
}
}
}
private void doWrite(SocketChannel channel,String request)
throws IOException {
//将消息编码为字节数组
byte[] bytes = request.getBytes();
//根据数组容量创建ByteBuffer
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
//将字节数组复制到缓冲区
writeBuffer.put(bytes);
//flip操作
writeBuffer.flip();
//发送缓冲区的字节数组
/*关心事件和读写网络并不冲突*/
channel.write(writeBuffer);
}
private void doConnect() throws IOException{
/*非阻塞的连接*/
if(socketChannel.connect(new InetSocketAddress(host,port))){
socketChannel.register(selector,SelectionKey.OP_READ);
}else{
socketChannel.register(selector,SelectionKey.OP_CONNECT);
}
}
//写数据对外暴露的API
public void sendMsg(String msg) throws Exception{
doWrite(socketChannel, msg);
}
}
其实有了上面的基础,客户端就很简单了,只是多了一个连接事件,我们看看这部分:
if(socketChannel.connect(new InetSocketAddress(host,port))){
socketChannel.register(selector,SelectionKey.OP_READ);
}else{
socketChannel.register(selector,SelectionKey.OP_CONNECT);
}
这个if判断是很关键的,我们知道我们调用connect方法后,底层需要进行TCP的三次握手,如果网络状况不好的话,这个connect方法执行完毕后可能连接并没有执行完毕socketChannel.connect(new InetSocketAddress(host,port))
如果为false就说明连接没有建立完,所以需要创建一个通道来处理连接事件,否则我们就可以注册读事件通道来等待服务端回应。