java IO简介
I/O 全称Input/Output,即输入/输出,通常指数据在内部存储器和外部存储器或其他周边设备之间的输入/输出。
涉及 I/O 的操作,不仅仅局限于硬件设备的读写,还要网络数据的传输。无论是从磁盘中读写文件,还是在网络中传输数据,可以说 I/O 主要为处理人机交互、机与机交互中获取和交换信息提供的一套解决方案。
java中用“流(stream)”来抽象表示这么一个写入写出的功能,封装成一个“类”,都放在java.io包下,大致可以分成四组:
- 基于字节操作的 I/O 接口:InputStream 和 OutputStream
- 基于字符操作的 I/O 接口:Writer 和 Reader
- 基于磁盘操作的 I/O 接口:File
- 基于网络操作的 I/O 接口:Socket
前两组主要从传输数据的数据格式不同,进行分组;后两组主要从传输数据的方式不同,进行分组。
基于字节操作的接口
无论是输入还是输出,操作数据的方式可以组合使用,各个处理流的类并不是只操作固定的节点流,比如如下输出方式:
//将文件输出流包装到序列化输出流中,再将序列化输出流包装到缓冲中
OutputStream out = new BufferedOutputStream(new ObjectOutputStream(new FileOutputStream(new File("fileName")));
基于字符操作
不管是磁盘还是网络传输,最小的存储单元都是字节,而不是字符,所以 I/O 操作的都是字节而不是字符。基于字符的输入和输出操作接口分别是:Reader 和 Writer。
不管是 Reader 还是 Writer 类,它们都只定义了读取或写入数据字符的方式,也就是说要么是读要么是写,但是并没有规定数据要写到哪去,写到哪去就是我们后面要讨论的基于磁盘或网络的工作机制。
字节与字符转换
我们的程序中通常操作的数据都是以字符形式,而磁盘/网络传输的最小存储单位是字节,这是就需要字节/字符之间进行想换转换。InputStreamReader 和 OutputStreamWriter 就是转化桥梁。
输入流程的相互转换过程如下:
InputStreamReader 类是字节到字符的转化桥梁, 其中StreamDecoder指的是一个解码操作类,Charset指的是字符集。
InputStream 到 Reader 的过程需要指定编码字符集,否则将采用操作系统默认字符集,很可能会出现乱码问题,StreamDecoder 则是完成字节到字符的解码的实现类。
InputStream 到 Reader 转化过程源码如下:
public InputStreamReader(InputStream in) {
super(in);
sd = StreamDecoder.forInputStreamReader(in, this,
Charset.defaultCharset()); // ## check lock object
}
输出流转化过程如下图所示:
通过 OutputStreamWriter 类完成字符到字节的编码过程,由 StreamEncoder 完成编码过程。其部分源码如下:
public OutputStreamWriter(OutputStream out) {
super(out);
se = StreamEncoder.forOutputStreamWriter(out, this,
Charset.defaultCharset());
}
基于磁盘操作
数据通过字节流/字符流操作后,其中一个主要的处理方式就是将数据持久化到物理磁盘,数据在物理磁盘的唯一最小描述是文件,即上层应用程序只能通过文件来操作磁盘上的数据,文件也是操作系统和磁盘驱动器交互的一个最小单元。
在 Java I/O 体系中,File 类是唯一代表磁盘文件本身的对象。File 类定义了一些与平台无关的方法来操作文件,包括检查一个文件是否存在、创建、删除文件、重命名文件、判断文件的读写权限是否存在、设置和查询文件的最近修改时间等等操作。
值得注意的是 Java 中通常的 File 并不代表一个真实存在的文件对象,当你通过指定一个路径描述符时,它就会返回一个代表这个路径相关联的一个虚拟对象,这个可能是一个真实存在的文件或者是一个包含多个文件的目录。
读取一个文件内容,程序如下:
public static void main(String[] args) throws IOException {
StringBuilder sb = new StringBuilder();
char[] chars = new char[1024];
FileReader fileReader = new FileReader("d://fileTest.txt");
while (fileReader.read() >0){
sb.append(chars);
}
System.out.println(sb.toString());
}
当我们传入一个指定的文件名来创建file对象,通过 FileReader 来读取文件内容时,会自动创建一个FileInputStream对象来读取文件内容,也就是我们上文中所说的字节流来读取文件。
紧接着,会创建一个FileDescriptor的对象,其实这个对象就是真正代表一个存在的文件对象的描述。可以通过FileInputStream对象调用getFD() 方法获取真正与底层操作系统关联的文件描述。
由于我们需要读取的是字符格式,所以需要 StreamDecoder 类将byte解码为char格式,至于如何从磁盘驱动器上读取一段数据,由操作系统帮我们完成。
基于网络操作
数据写到何处的另一种处理方式是将数据写入互联网中以供其他电脑访问。
Socket 是描述计算机之间完成相互通信的一种抽象定义,典型的基于 Socket 通信的应用程序场景,如下图:
!
主机 A 的应用程序要想和主机 B 的应用程序通信,必须通过 Socket 建立连接,而建立 Socket 连接必须需要底层 TCP/IP 协议来建立 TCP 连接。
为了准确无误地把数据送达目标处,TCP 协议采用了三次握手策略,如下图:
其中,SYN 全称为 Synchronize Sequence Numbers,表示同步序列编号,是 TCP/IP 建立连接时使用的握手信号。
ACK 全称为 Acknowledge character,即确认字符,表示发来的数据已确认接收无误。
在客户机和服务器之间建立正常的 TCP 网络连接时,客户机首先发出一个 SYN 消息,服务器使用 SYN + ACK 应答表示接收到了这个消息,最后客户机再以 ACK 消息响应。
这样在客户机和服务器之间才能建立起可靠的 TCP 连接,数据才可以在客户机和服务器之间传递。
简单流程如下:
- 发送端 –(发送带有 SYN 标志的数据包 )–> 接受端(第一次握手);
- 接受端 –(发送带有 SYN + ACK 标志的数据包)–> 发送端(第二次握手);
- 发送端 –(发送带有 ACK 标志的数据包) –> 接受端(第三次握手);
完成三次握手之后,客户端应用程序与服务器应用程序就可以开始传送数据了。
当客户端要与服务端通信时,客户端首先要创建一个 Socket 实例,默认操作系统将为这个 Socket 实例分配一个没有被使用的本地端口号,并创建一个包含本地、远程地址和端口号的套接字数据结构,这个数据结构将一直保存在系统中直到这个连接关闭。
public class SocketClient {
public static void main(String[] args) throws IOException {
Socket socket = new Socket("127.0.0.1",9090);
//向服务端发送数据
PrintStream ps = new PrintStream(new BufferedOutputStream(socket.getOutputStream()));
//读取服务端返回的数据
BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
ps.println("hello world!");
ps.flush();
String info = br.readLine();
System.out.println(info);
ps.close();
br.close();
}
}
与之对应的服务端,也将创建一个 ServerSocket 实例,ServerSocket 创建比较简单,只要指定的端口号没有被占用,一般实例创建都会成功,同时操作系统也会为 ServerSocket 实例创建一个底层数据结构,这个数据结构中包含指定监听的端口号和包含监听地址的通配符,通常情况下都是*即监听所有地址。
之后当调用 accept() 方法时,将进入阻塞状态,等待客户端的请求。
public class SocketServer {
public static void main(String[] args) throws IOException {
//初始化服务端端口9090
ServerSocket serverSocket = new ServerSocket(9090);
System.out.println("服务端已启动,端口为9090");
//开启循环监听
for (;;){
//等待客户端的链接
Socket accept = serverSocket.accept();
// 将字节流转化为字符流,读取客户端发来的数据
BufferedReader br = new BufferedReader((new InputStreamReader(accept.getInputStream()))) ;
//一行一行的读取客户端的数据
String s = br.readLine();
System.out.println("服务端收到客户端的信息:"+s);
}
}
}
执行结果:
当连接建立成功,服务端和客户端都会拥有一个socket实例,每个Socket实例都有一个INputStream和 OutputStream,正如我们前面所说的,网络 I/O 都是以字节流传输的,Socket 正是通过这两个对象来交换数据。
当 Socket 对象创建时,操作系统将会为 InputStream 和 OutputStream 分别分配一定大小的缓冲区,数据的写入和读取都是通过这个缓存区完成的。
写入端将数据写到 OutputStream 对应的 SendQ 队列中,当队列填满时,数据将被发送到另一端 InputStream 的 RecvQ 队列中,如果这时 RecvQ 已经满了,那么 OutputStream 的 write 方法将会阻塞直到 RecvQ 队列有足够的空间容纳 SendQ 发送的数据。
值得特别注意的是,缓存区的大小以及写入端的速度和读取端的速度非常影响这个连接的数据传输效率,由于可能会发生阻塞,所以网络 I/O 与磁盘 I/O 在数据的写入和读取还要有一个协调的过程,如果两边同时传送数据时可能会产生死锁的问题。
IO 模型
在计算机中,常见的IO模型有以下五种:
- 同步阻塞IO(BIO)
- 同步非阻塞IO(NIO)
- IO 多路复用
- 信号驱动IO模型
- 异步非阻塞IO(AIO)
阻塞IO 和非阻塞IO
阻塞指的是用户进程(或者线程)一直在等待,而不能干别的事情;阻塞IO 指的是需要内核IO操作彻底完成后,才返回到用户空间执行用户程序的操作指令。传统的IO模型都是阻塞的,并且在java中,默认创建的Socket 都属于阻塞IO 模型。
非阻塞是指用户进程(或者线程)拿到内核返回的状态值就返回自己的空间,可以去干别的事情。非阻塞IO 是指用户空间的程序不需要等待内核IO操作彻底完成,可以立即返回用户空间去执行后续的指令;即发起IO请求的用户进程/线程处于非阻塞的状态,与此同时,内核会立即返回给用户一个IO的状态值。
同步和异步
同步和异步可以看成是发起IO请求的两种方式。
同步IO是指用户空间(进程或线程)是主动发起IO请求的一方,系统内核是被动接受方;
异步IO 是系统内核主动发起IO请求的一方,用户空间是被动接受方。
同步阻塞IO(Blocking IO ,BIO)
同步阻塞IO ,指的是用户空间(或者线程)主动发起,需要等待内核 IO 操作彻底 完成后,才返回到用户空间的 IO 操作, IO 操作过程中,发起 IO 请求的用户进程(或者线程) 处于阻塞状态。
BIO 是一个连接一个线程,客户端有连接请求时服务端就需要启动一个线程进行处理,线程开销很大。传统的BIO可以用下图表示:
!
在阻塞IO模型中,java应用程序从发起IO系统调用开始,一直到系统调用返回,在这段时间内,发起IO请求的java进程或线程是阻塞的,直到返回成功后,应用程序才能开始处理用户空间的缓冲区数据。BIO的具体流程如下图所示:
其示例代码:
package com.th.io;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BIOServer {
public static void main(String[] args) throws IOException {
//创建线程池
ExecutorService executorService = Executors.newCachedThreadPool();
//创建serverSocket
ServerSocket serverSocket = new ServerSocket(8888);
System.out.println("服务端已启动,端口为8888....");
for (;;){
System.out.println("线程的信息 id="+Thread.currentThread().getId()+ " 名称="+Thread.currentThread().getName());
System.out.println("等待连接...");
//监听
final Socket accept = serverSocket.accept();
System.out.println("有一个客户端链接。。。。");
executorService.execute(new Runnable() {
@Override
public void run() {
handler(accept);
}
});
}
}
private static void handler(Socket socket) {
try{
byte[] bytes = new byte[1024];
//获取输入流
InputStream inputStream = socket.getInputStream();
for (;;){
System.out.println("线程的信息 id="+Thread.currentThread().getId()+ " 名称="+Thread.currentThread().getName());
System.out.println("read...");
int read = inputStream.read(bytes);
if (read != -1){
System.out.println(new String(bytes,0,read));
}else {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
启动服务控制台输出:
cmd命令行中输入telnet 127.0.0.1 8888 回车;
按住ctrl + ] 回车;输入数据send 100ok,控制台输出:
BIO的特点是每个请求都需要独立的线程完成数据read,业务处理,数据wrtie的完整操作。
BIO 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择。
但是BIO 还存在以下两个问题:
(1)当并发数较大时,需要创建大量线程来处理连接,系统资源占用较大;
(2)连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在read操作上,造成线程资源浪费。
同步非阻塞IO (Non-Blocking IO,NIO)
同步非阻塞NIO 指的是用户进程主动发起,不需要等待内核 IO 操作彻底完成之后, 就能立即返回到用户空间的IO操作。 IO 操作过程中发起 IO 请求的用户进程(或者线程) 处于非阻塞状态。
在 Java 1.4 中引入,对应的在java.nio包下。
在Linux 系 统下, socket 连接默认是阻塞模式,可以通过设置将 socket 变成为非阻塞的模 式( Non Blocking )。
在 NIO 模型中,应用程序一旦开始 IO 系统调用,会出现以下两种情况:
( 1 )在内核缓冲区中没有数据的情况下,系统调用会立即返回,返回一个调用失败的 信息。
( 2 )在内核缓冲区中有数据的情况下,在数据的复制过程中系统调用是阻塞的,直到 完成数据从内核缓冲复制到用户缓冲。复制完成后,系统调用返回成功,用户进程(或者线 程)可以开始处理用户空间的缓存数据。
同步非阻塞IO的流程如下:
NIO 方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,弹幕系统,服务器间通讯等。JDK1.4 开始支持。
同步非阻塞IO 的特点:应用程序的线程需要不断地进行 IO 系统调用,轮询数据是否已 经准备好,如果没有准备好,就继续轮询,直到完成 IO 系统调用为止。
同步非阻塞 IO 的优点:每次发起的 IO 系统调用,在内核等待数据过程中可以立即返回。 用户线程不会阻塞,实时性较好。
同步非阻塞 IO 的缺点:不断地轮询内核,这将占用大量的 CPU 时间,效率低下。
IO多路复用(IO Multiplexing)
IO多路复用是同步非阻塞IO 的升级,主要解决同步非阻塞NIO中轮询等待的问题。IO多路复用( IO Multiplexing 是高性能 Reactor 线程模型的基础 IO 模型。
在IO多路复用模型中,引入了新的系统调用, Linux 系统中新的系统调用为 select/epoll 系统调用。通过该系统调用,一个用户进程(或者线程)可以监视多个文件描述符,一旦某个描述符就绪(一般 是内核缓冲区可读 可写),内核能够将文件描述符的就绪状态返回给用户进程(或者线程), 用户空间可以根据文件描述符的就绪状态,进行相应的 IO 系统调用。
目前支持IO 多路复用的系统 调用,有 select 、 epoll 等等。 select 系统调用,几乎在所有的 操作系统上都有支持,具有良好的跨平台特性,但是socket连接数最大是1024个。 epoll 是在 Linux 2.6 内核中提出的,是 select 系统调用的 Linux 增强版本。
多路复用模型的流程。发起一个多路复用IO的 sys_read 读操作的系统调用,流程如下:
(1)选择器注册。在这种模式中,首先,将需要 sys_read 操作的目标文件描述符( socket连接),提前注册到 Linux 的 select/epoll 选择器中,在 Java 中所对应的选择器类是 Selector 类。 然后,才可以开启整个 IO 多路复用模型的轮询流程。
(2)就绪状态的轮询。通过选择器的查询方法,查询所有的提前注册过的目标文件描述符( socket 连接)的IO就绪状态。通过查询的系统调用,内核会返回一个就绪的 socket 列 表。当任何一个注册过的 socket 中的数据准备好或者就绪了,就是内核缓冲区有数据了,内核就将该 socket 加入到就绪的列表中,并且返回就绪事件。
(3)用户线程获得了就绪状态的列表后,根据其中的 socket 连接,发起 sys_read 系统调用,用户线程阻塞。内核开始复制数据,将数据从内核缓冲区复制到用户缓冲区。
(4)复制完成后,内核返回结果,用户线程才会解除阻塞的状态,用户线程读取到了数据,继续执行。
IO多路复用模型的 sys_read 系统调用流程,如下图所示:
IO多路复用模型的特点: IO 多路复用模型的 IO 涉及两种系统调用,一种是 IO 操作的系统调用,另一种是 select/epoll 就绪查询系统调用。 IO 多路复用模型建立在操作系统的基础设 施之上,即操作系统的内核必须能够提供多路分离的系统调用 select/epoll 。
IO多路复用模型的优点:一个选择器查询线程,可以同时处理成千上万的网络连接, 所以,用户程序不必创建大量的线程,也不必维护这些线程,从而大大减小了系统的开销。 这是一个线程维护一个连接的阻塞 IO 模式相 比,使用多路 IO 复用模型的最大优势。
IO多路复用模型的缺点:本质上, select/epoll 系统调用是阻塞式的,属于同步阻塞 IO 。 都需要在读写事件就绪后,由系统调用本身负责进行读写,也就是说这个事件的查询过程是 阻塞的。
Java NIO
NIO原理图如下:
Java NIO类库包含以下三个核心组件:
- Channel(通道)
- Buffer(缓冲区)
- Selector(选择器)
通道Channel
NIO 的核心是通道和缓存区,它们之间的工作模式如下所示:
通道有点类似 IO 中的流,但不同的是,同一个通道既允许读也允许写,而任意一个流要么是读流要么是写流。
通道和流一样都是需要基于物理文件的,而每个流或者通道都通过文件指针操作文件,这里说的通道是双向的也是有前提的,那就是通道基于随机访问文件RandomAccessFile的可读可写文件指针。
NIO中的 Channel 的主要实现有:
- FileChannel :用于文件IO操作
- DatagramChannel :用于UDP的IO操作
- SocketChannel :用于TCP 的传输操作
- ServerSocketChannel :用于TCP连接监听操作
通道不能单独存在,它永远需要绑定一个缓存区,所有的数据只会存在于缓存区中,无论你是写或是读,必然是缓存区通过通道到达磁盘文件,或是磁盘文件通过通道到达缓存区。即缓存区是数据的起点,也是终点。
缓存区Buffer
NIO的 Buffer (缓冲区)本质上是一个内存块,既可以写入数据,也可以从中读取数据。 Java NIO 中代表缓冲区的 Buffer 类是一个抽象类,位于 java.nio 包中;所有缓冲区都是Buffer抽象类的子类。
Channel 提供从文件、网络读取数据的渠道,但是读写的数据都必须经过 Buffer 。所以Buffer 主要用于和 NIO 通道进行交互,数据是从通道读入到缓冲区的,然后从缓冲区中写入到通道中的。
Buffer 就像一个数组,可以保存多个相同类型的数据。根据数据类型的不同(boolean)除外,有以下 Buffer 常用子类:ByteBuffer、CharBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。上述 Buffer 类他们都是通过相似的方法进行管理数据的,只是各自管理的数据类型不同而已。都是通过如下的方法获取一个 Buffer 对象:
public static XxxBuffer allocate(int capacity) {}
缓冲区的基本属性如下:
容量(capacity):表示 Buffer 最大数据容量,缓冲区容量不能为负,并且一旦创建不能更改。
限制(limit):表示可以写入或者读取的最大上限。与缓冲区的读写模式有关,在不同的模式下, limit 的值的含义是不同的,具体分为以下两种情况:
(1 )在写入模式下 limit 属性值的含义为可以写入的数据最大上限。在刚进入到写入 模式时, limit 的值会被设置成缓冲区的 capacit y 容量值,表示可以一直将缓冲区的容量写满。
( 2 )在读取模式下 limit 的值含义为最多能从缓冲区中读取到多少数据。
位置(position):下一个要读取或写入的数据的索引。缓冲区的位置不能为负,并且不能大于其限制。 position 属性的值与缓冲区的读写模式有关。在不同的模式下, position 属性值的含义是不同的,在缓冲区进行读写的模式改变时, position 值会进行相应的调整。
在写入模式下, position 的值变化规则如下:
( 1 )在 刚进入到写入模式时, position 值为 0 ,表示当前的写入位置为从头开始。
( 2 )每当一个数据写到缓冲区之后 position 会向后移动到下一个可写的位置。
( 3 )初始的 position 值为 0 ,最大可写值为 limit 1 。当 position 值达到 limit 时,缓冲区就 已经无空间可写了。
在读模式下, position 的值变化规则如下:
( 1 )当缓冲区刚开始进入到读取模式时 position 会被重置为 0 。
( 2 )当从缓冲区读取时,也是从 position 位置开始读。读取数据后, position 向前移动到 下一个可读 的位置。
( 3 )在读模式下 limit 表示可以读上限。 position 的最大值,为最大可读上限 limit ,当 position 达到 limit 时,表明缓冲区已经无数据可读。
标记(mark)和重置(reset):标记是一个索引,通过Buffer中的mark()方法指定Buffer中的一个特定的position,之后可以通过调用reset()方法恢复到这个position。
简而言之:0 <= mark <= position <= limit <= capacity。
Selector(选择器)
Selector 被称为选择器 (多路复用器),其使命是完成IO多路复用,其主要功能通道的注册、监听、事 件查询。一个通道代表一条连接通路,通过选择器可以同时监控多个通道的。它是Java NIO 核心组件的其中之一,用于检查一个或多个 Channel(通道)的状态是否处于连接就绪、接受就绪、可读就绪、可写就绪。
如此可以实现单线程管理多个 channels,也就是可以管理多个网络连接。
使用 Selector 的好处在于: 相比传统方式使用多个线程来管理 IO,Selector 使用了更少的线程就可以处理通道了,并且实现网络高效传输!
创建一个选择器一般是通过 Selector 的工厂方法,Selector.open :
Selector selector = Selector.open();
而一个通道想要注册到某个选择器中,必须调整模式为非阻塞模式,通道和选择器之间的关联,通过register (注册)的方式完成。调用通道的代码如下:
//创建一个 TCP 套接字通道
SocketChannel channel = SocketChannel.open();
//调整通道为非阻塞模式
channel.configureBlocking(false);
//向选择器注册一个通道
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
注册一个通道到选择器中的最简单版本,支持注册选择器的通道都有一个 register 方法,该方法就是用于注册当前实例通道到指定选择器的。register()方法有两个参数,第一个参数就是指定通道注册到的选择器实例;第二个参数,指定选择器要监控的 IO 事件类型。
可供选择器监控的通道IO 事件类型,包括以下四种:
- 连接:SelectionKey.OP_CONNECT
- 接收:SelectionKey.OP_ACCEPT
- 可读:SelectionKey.OP_READ
- 可写:SelectionKey.OP_WRITE
以枚举类型提供了以下几种取值:
- int OP_READ = 1 << 0: 读取就绪事件,第0位
- int OP_WRITE = 1 << 2: 写入就绪事件,第2为
- int OP_CONNECT = 1 << 3:传输通道建立成功的IO事件,第3位
- int OP_ACCEPT = 1 << 4:新连接就绪事件,第4位
register 方法会返回一个 SelectionKey 实例,该实例代表的就是选择器与通道的一个关联关系。你可以调用它的 selector 方法返回当前相关联的选择器实例,也可以调用它的 channel 方法返回当前关联关系中的通道实例。
Selector并不直接去管理 Channel ,而是直接管理 SelectionKey ,通过 SelectionKey 与 Channel 发生关系 。而 Java NIO 源码中规定了,一个 Channel 最多能向 Selector 注册一次,注册之后就形成了唯一的 SelectionKey 然后被Selector管理起来。
Selector 有一个核心成员 keys 专门用于管理注册上来的 SelectionKey; Channel 注册到 Selector 后所创建的那一个唯一的 SelectionKey ,添加在这个 keys 成员中 ,这是一个 HashSet 类型的集合 。 除了成员 keys 之外, Selector还有一个核心成员selectedKeys ,用于存放已经发生了IO事件的 SelectionKey 。
一旦某个选择器中注册了多个通道,我们不可能一个一个的记录它们注册时返回的 SelectionKey 实例来监听通道事件,选择器应当有方法返回所有注册成功的通道相关的 SelectionKey 实例。
Set<SelectionKey> selectionKeySets = selector.selectedKeys();
selectedKeys方法会返回选择器中注册成功的所有通道的 SelectionKey 实例集合。
通道和选择器的监控关系注册成功后,Selector 就可以查询就绪事件。具体的查询操作是通过调用选择器 Selector 的 select( )系列方法来完成。通过 select 系列方法,选择器会通过 JNI 去进行底层操作系统的系统调用(比如 select /epoll ),可以不断地查询通道中所发生操作的就绪状态 (或者 IO 事件), 并且把这些发生了 底层 IO 事件 ,转换成 Java NIO 中的 IO 事件,记录在的通道关联的 SelectionKey 的 readyOps 上。
Selector在Linux的实现类是EPollSelectorImpl,委托给EPollArrayWrapper实现,其中三个native方法是对epoll的封装,而EPollSelectorImpl. implRegister方法,通过调用epoll_ctl向epoll实例中注册事件,还将注册的文件描述符(fd)与SelectionKey的对应关系添加到fdToKey中,这个map维护了文件描述 符与SelectionKey的映射。
fdToKey有时会变得非常大,因为注册到Selector上的Channel非常多(百万连接);过期或失效的Channel没有及时关闭。fdToKey总是串行读取的,而读取是在select方法中进行的,该方法是非线程安全的。
与OIO 相比, NIO 使用选择器的最大优势:系统开销小,系统不必为每一个网络连接(文件描述符)创建进程线程,从而大大减小了系统的开销。
总之, 通过 Java NIO 可以达到一个线程负责多个连接通道的 IO 处理, 这是非常高效的 。 这种高效 ,恰恰就来自于Java的选择器组件 Selector 以及其底层的操作系统 IO 多路复用技术 的支持。
NIO 实现同步非阻塞原理
一个线程Thread使用一个选择器Selector监听多个通道Channel 上的IO事件,从而让一个线程就可以处理多个IO事件。 通过配置监听的通道Channel为非阻塞,那么当Channel上的IO事件还未到达时,线程会在select方法被挂起,让出CPU资源;直到监听到Channel有IO事件发生时,才会进行相应的响应和处理。
Selector能够检测到多个注册的通道上是否有IO事件发生(多个Channel以事件的方式可以注册到同一个Selector),如果有事件发生,以便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。
Selector只有在通道上有真正的IO事件发生时,才会进行相应的处理,这就不必为每个连接都创建一个线程,避免线程资源的浪费和多线程之间的上下文切换导致的开销。
示例服务端代码如下:
package com.th.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NIOServer {
//通道管理器(Selector)
private static Selector selector;
public static void main(String[] args) throws IOException {
//创建通道管理器(Selector)
selector = Selector.open();
//创建通道SeverSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//将通道设置为非阻塞
serverSocketChannel.configureBlocking(false);
//将severSocketChannel 对应的ServerSocket绑定到指定端口(port)
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(9090));
/**
* 将通道(Channel)注册到通道管理器(Selector),并为该通道注册selectionKey.OP_ACCEPT事件
* 注册该事件后,当事件到达的时候,selector.select()会返回,
* 如果事件没有到达selector.select()会一直阻塞。
*/
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
//循环处理
for (;;){
//当注册时间达到时,方法返回,否则改方法会一直阻塞
selector.select();
//获取监听时间
Set<SelectionKey> selectionKeySets = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeySets.iterator();
//迭代处理
while (iterator.hasNext()){
//获取事件
SelectionKey key = iterator.next();
//移除事件,避免重复处理
iterator.remove();
//检查是否是一个就绪的可以被接受的客户端请求连接
if (key.isAcceptable()){
handleAccept(key);
}else {
handleRead(key);
}
}
}
}
/**
* 处理客户端连接成功事件
* @param key
*/
private static void handleRead(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
//从通道读取数据到缓冲区
ByteBuffer buffer = ByteBuffer.allocate(128);
channel.read(buffer);
//输出客户端发送过来的消息
byte[] data = buffer.array();
String msg = new String(data).trim();
System.out.println("server received msg from client:"+msg);
}
/**
* 监听到读事件,读取客户端发送过来的消息
* @param key
*/
private static void handleAccept(SelectionKey key) throws IOException {
//获取客户端连接通道
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = server.accept();
socketChannel.configureBlocking(false);
//信息通过通道发生给客户端
String msg = "Hello Client";
socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
//给通道设置读事件,客户端监听到读事件后,进行读取操作
socketChannel.register(selector,SelectionKey.OP_READ);
}
}
客户端代码如下:
package com.th.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NioClient {
//通道管理器(Selector)
private static Selector selector;
public static void main(String[] args) throws IOException {
//创建通道管理器(selector)
selector = Selector.open();
//创建通道SocketChannel
SocketChannel channel = SocketChannel.open();
//将通道设置为非阻塞
channel.configureBlocking(false);
//客户端连接服务器,其实方法执行并没有实现连接,需要在handleConnect方法中调用channel。finishConnect才能完成
channel.connect(new InetSocketAddress("127.0.0.1",9090));
//将通道(channel)注册到通道管理器(Selector),并为该通道注册selectionKey。OP_CONNECT
//注册该事件后,当事件到达的时候,selector.select()会返回,如果事件没有到达selector.select()会一直阻塞
channel.register(selector, SelectionKey.OP_CONNECT);
//循环处理
for (;;){
//选择一组可以进行I/O操作的事件,放在selector中,客户端的方法不会阻塞,seletor的wakeup方法被调用,方法返回。
//而对于客户端来说,通道一直是被选中的,
selector.select();
//获取监听事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
//迭代处理
while(iterator.hasNext()){
//获取事件
SelectionKey key = iterator.next();
//移除事件,避免重复处理
iterator.remove();
//检查是否是一个就绪的已经连接服务端成功事件
if (key.isConnectable()){
handleConnect(key);
}else{
handleRead(key);
}
}
}
}
/**
* 处理客户端连接成功事件
* @param key
*/
private static void handleRead(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
//从通道读取数据到缓冲区
ByteBuffer buffer = ByteBuffer.allocate(128);
channel.read(buffer);
//输出服务端响应发送过来的消息
byte[] data = buffer.array();
String msg = new String(data).trim();
System.out.println("服务端发来的消息:"+msg);
}
/**
* 监听到读事件,读取客户端发送过来的消息
* @param key
*/
private static void handleConnect(SelectionKey key) throws IOException {
//获取与服务端建立连接的通道
SocketChannel channel = (SocketChannel) key.channel();
if (channel.isConnectionPending()){
// channel.finishConnect()才能完成连接
channel.finishConnect();
}
channel.configureBlocking(false);
//数据写入通道
String msg = "Hello Sever";
channel.write(ByteBuffer.wrap(msg.getBytes()));
//通道注册到选择器,并且这个通道只对读事件感兴趣
channel.register(selector,SelectionKey.OP_READ);
}
}
客户端执行结果如下:
服务端执行结果:
信号驱动IO模型
在信号驱动IO 模型中,用户线程通过向核心注册 IO 事件的回调函数,来避免IO 时间查询的阻塞。
具体的做法是,用户进程预先在内核中设置一个回调函数,当某个事件发生时,内核使用信号( SIGIO )通知进程运行回调函数。 然后用户线程会继续执行,在信号回调函数中调用 IO 读写操作来进行实际的 IO 请求操作。
信号驱动 IO 的基本流程是:用户进程通过系统调用,向内核注册 SIGIO 信号的 owner 进程和以及进程内的回调函数。内核IO事件发生后,通知用户程序,用户进程通过 sys_read 系统调用,将数据复制到用户空间,然后执行业务逻辑。
信号驱动IO模型,每当套接字发生 IO 事件时,系统内核都会向用户进程发送 SIGIO 事件, 所以,一般用于 UDP 传输,在 TCP 套接字的开发过程中很少使用,原因是 SIGIO 信号产生得 过于频繁,并且内核发送的 SIGIO 信号,并没有告诉用户进程发生了什么 IO 事件。
但是在 UDP 套接字 上, 通过 SIGIO 信号进行下面两个事件的类型判断即可 :
(1 )数据报到达套接字
(2 )套接字上 发生 错误
信号驱动IO 优势:用户进程在等待数据时,不会被阻塞,能够 提高 用户进程的效率。具体来说:在信号驱动式 I/O 模型中,应用程序使用套接口进行信号驱动 I/O ,并安装一个信号处理函数,进程继续运行并不阻塞。
信号驱动IO缺点:
( 1 )在大量 IO 事件发生时,可能会由于处理不过来,而导致信号队列溢出。
(2)对于处理 UDP 套接字来讲,对于信号驱动 I/O 是有用的。可是,对于 TCP 而言,由于致使 SIGIO 信号通知的条件为数众多,进行 IO 信号进一步区分的成本太高,信号驱动的 I/O 方式近乎无用。
(3 )信号驱动 IO 可以看成是一种异步 IO ,可以简单理解为系统进行用户函数的回调。
只是 ,信号驱动IO的异步特性,又做的不彻底。 是因为信号驱动 IO 仅仅在 IO 事件的通知阶段是异步的, 而在第二阶段也就是在将数据从内核缓冲区复制到用户缓冲区这个过程, 用户进程是阻塞的、同步的。
异步IO(Asynchronous IO)
从 Java 1.7开始,Java 提供了 AIO(异步I/O)。Java AIO 也被称为 NIO2.0,提供了异步I/O的方式,用法和标准的I/O有非常大的差异。
异步IO ,指的是用户空间与内核空间的调用方式大反转 。用户空间的线程变成被动接受者,而内核空间成了主动调用者。
在异步 IO 模型中,当用户线程收到通知时,数据已经被内核读取完毕,并放在了用户缓冲区内,内核在 IO 完成后通知用户线程直接使用即可。
异步 IO 类似于 Java 中典型的回调模式,用户进程(或者线程)向内核空间注册了各种 IO 事件的回调函数,由内核去主动调用。
Java AIO 采用订阅-通知模式:即应用程序向操作系统注册IO监听,然后继续做自己的事情。当操作系统发生IO事件,并且准备好数据后,在主动通知应用程序,触发相应的函数。
AIO 的基本流程是:用户线程通过系统调用,向内核注册某个 IO 操作。内核在整个 IO 操作(包括数据准备、数据复制)完成后, 通知用户程序,用户执行后续的业务操作。
在异步IO 模型中,在整个内核的数据处理过程中,包括内核将数据从网络物理设备(网卡)读取到内核缓冲区、将内核缓冲区的数据复制到用户缓冲区,用户程序都不需要阻塞。
异步IO 模型的流程,如图:
异步IO 模型的特点:在内核等待数据和复制数据的两个阶段,用户线程都不是阻塞的。 用户线程需要接收内核的 IO 操作完成的事件,或者用户线程需要注册一个 IO 操作完成的回 调函数。正因为如此,异步 IO 有的时候也被称为信号驱动 IO 。
异步 IO 异步模型的缺点:应用程序仅需要进行事件的注册与接收,其余的工作都留给 了操作系统,也就是说,需要底层内核提供支持 。 理论上来说,异步 IO 是真正的异步输入输出,它的吞吐量高于 IO 多路复用模型的吞吐 量。
就目前而言, Windows 系统下通过 IOCP 实现了真正的异步 IO 。而在 Linux 系统下,异步 IO 模型在 2.6 版本才引入, J DK 的对其的支持目前并不完善,因此异步 IO 在性能上没有明显的优势。
服务端示例代码:
package aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;
public class AioServer {
private static AsynchronousServerSocketChannel serverSocketChannel;
public void listen() throws Exception{
//打开一个服务端通道
serverSocketChannel = AsynchronousServerSocketChannel.open();
//监听9988
serverSocketChannel.bind(new InetSocketAddress(9988));
//监听
serverSocketChannel.accept(this, new CompletionHandler<AsynchronousSocketChannel, AioServer>() {
@Override
public void completed(AsynchronousSocketChannel result, AioServer attachment) {
try {
if (result.isOpen()){
System.out.println("接收到新的客户端的连接,地址:"+result.getRemoteAddress());
final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//读取客户端发生的数据
result.read(byteBuffer, result, new CompletionHandler<Integer, AsynchronousSocketChannel>() {
@Override
public void completed(Integer result, AsynchronousSocketChannel attachment) {
try {
//读取请求,处理客户端发送的数据
byteBuffer.flip();
String content = Charset.defaultCharset().decode(byteBuffer).toString();
System.out.println("服务端接收到客户端发来的数据:"+content);
//向客户端发送数据
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.put("server send ".getBytes());
writeBuffer.flip();
attachment.write(writeBuffer).get();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
try{
exc.printStackTrace();;
attachment.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
} catch (IOException e) {
e.printStackTrace();
}finally {
//当有新的客户端接入的时候,直接调用accept的方法,递归执行下去,这样可以保证多个客户端都可以阻塞
attachment.serverSocketChannel.accept(attachment, this);
}
}
@Override
public void failed(Throwable exc, AioServer attachment) {
exc.printStackTrace();
}
});
}
public static void main(String[] args) throws Exception {
new AioServer().listen();
Thread.sleep(Integer.MAX_VALUE);
}
}
客户端示例代码如下:
package aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;
public class AioClient {
public static void main(String[] args) throws IOException, InterruptedException {
//打开一个客户端通道
AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
//与服务端建立连接
channel.connect(new InetSocketAddress("127.0.0.1",9988));
//睡眠一秒,等待与服务端的连接
Thread.sleep(1000);
try {
//向服务端发送数据
channel.write(ByteBuffer.wrap("Hello ,I am client".getBytes())).get();
} catch (ExecutionException e) {
e.printStackTrace();
}
try{
//从服务端读取返回的数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
//将通道中的数据写入缓冲buffer
channel.read(buffer).get();
buffer.flip();
String result = Charset.defaultCharset().newDecoder().decode(buffer).toString();
//服务端返回的数据
System.out.println("客户端收到服务端返回的内容:"+result);
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
NIO 和BIO 的区别
(1)NIO 是以块的方式处理数据,BIO是以字节流或者字符流的形式处理数据的。
(2)NIO 是通过缓存区和通道的方式处理数据,BIO 是通过InputStream 和OutputStream 流的方式处理数据;
(3)NIO 是双向的,BIO的方向只能是单向的。
(4)NIO 采用的多路复用的同步阻塞IO 模型,BIO采用的是普通的同步阻塞IO 模型;
(5)NIO的效率比BIO公安,NIO 适用于网咯IO,BIO 适用于文件IO;