大家好,我是老三,上一节我们讨论了Linux的五种IO模型,接下来,我们从Java语言层面,来看看对IO的实现。
在Java中,一共有三种IO模型,分别是阻塞IO(BIO)
、非阻塞IO(NIO)
和异步IO(AIO)
。
Java BIO
Java BIO
就是Java的传统IO模型,对应了操作系统IO模型里的阻塞IO。
Java BIO
相关的实现都位于java.io
包下,其通信原理是客户端、服务端之间通过Socket
套接字建立管道连接,然后从管道中获取对应的输入/输出流,最后利用输入/输出流对象实现发送/接收信息。
我们来看个Demo:
- BioServer:
/**
* @Author 三分恶
* @Date 2023/4/30
* @Description BIO服务端
*/
public class BioServer {
public static void main(String[] args) throws IOException {
//定义一个ServerSocket服务端对象,并为其绑定端口号
ServerSocket server = new ServerSocket(8888);
System.out.println("===========BIO服务端启动================");
//对BIO来讲,每个Socket都需要一个Thread
while (true) {
//监听客户端Socket连接
Socket socket = server.accept();
new BioServerThread(socket).start();
}
}
/**
* BIO Server线程
*/
static class BioServerThread extends Thread{
//socket连接
private Socket socket;
public BioServerThread(Socket socket){
this.socket=socket;
}
@Override
public void run() {
try {
//从socket中获取输入流
InputStream inputStream=socket.getInputStream();
//转换为
BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(inputStream));
String msg;
//从Buffer中读取信息,如果读取到信息则输出
while((msg=bufferedReader.readLine())!=null){
System.out.println("收到客户端消息:"+msg);
}
//从socket中获取输出流
OutputStream outputStream=socket.getOutputStream();
PrintStream printStream=new PrintStream(outputStream);
//通过输出流对象向客户端传递信息
printStream.println("你好,吊毛!");
//清空输出流
printStream.flush();
//关闭socket
socket.shutdownOutput();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
- BioClient
/**
* @Author 三分恶
* @Date 2023/4/30
* @Description BIO客户端
*/
public class BioClient {
public static void main(String[] args) throws IOException {
List<String> names= Arrays.asList("帅哥","靓仔","坤坤");
//通过循环创建多个多个client
for (String name:names){
//创建socket并根据IP地址与端口连接服务端
Socket socket=new Socket("127.0.0.1",8888);
System.out.println("===========BIO客户端启动================");
//从socket中获取字节输出流
OutputStream outputStream=socket.getOutputStream();
//通过输出流向服务端传递信息
String hello="你好,"+name+"!";
outputStream.write(hello.getBytes());
//清空流,关闭socket输出
outputStream.flush();
socket.shutdownOutput();
//从socket中获取字节输入流
InputStream inputStream=socket.getInputStream();
BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(inputStream));
//读取服务端消息
String msg;
while((msg=bufferedReader.readLine())!=null){
System.out.println("收到服务端消息:"+msg);
}
inputStream.close();
outputStream.close();
socket.close();
}
}
}
- 先启动
BioServer
,再启动BioClient
,运行结果
===========BIO服务端启动================
收到客户端消息:你好,帅哥!
收到客户端消息:你好,靓仔!
收到客户端消息:你好,坤坤!
===========BIO客户端启动================
收到服务端消息:你好,吊毛!
===========BIO客户端启动================
收到服务端消息:你好,吊毛!
===========BIO客户端启动================
收到服务端消息:你好,吊毛!
在上述Java-BIO
的通信过程中,如果客户端一直没有发送消息过来,服务端则会一直等待下去,从而服务端陷入阻塞状态。同理,由于客户端也一直在等待服务端的消息,如果服务端一直未响应消息回来,客户端也会陷入阻塞状态。
在BioServer
定义了一个类BioServerThread
,继承了Thread
类,run
方法里主要是通过socket和流来读取客户端的消息,以及发送消息给客户端,每处理一个客户端的Socket连接,就得新建一个线程。
同时,IO读写操作也是阻塞的,如果客户端一直没有发送消息过来,线程就会进入阻塞状态,一直等待下去。
在BioClient
里,循环创建Socket
,向服务端收发消息,客户端的读写也是阻塞的。
在这个Demo里就体现了BIO的两个特点:
- 一个客户端连接对应一个处理线程
- 读写操作都是阻塞的
毫无疑问,不管是创建太多线程,还是阻塞读写,都会浪费服务器的资源。
Java NIO
那么我们就进入Java的下一种IO模型——Java NIO
,它对应操作系统IO模型中的多路复用IO,底层采用了epoll
实现。
Java-NIO
则是JDK1.4
中新引入的API
,它在BIO
功能的基础上实现了非阻塞式的特性,其所有实现都位于java.nio
包下。NIO
是一种基于通道、面向缓冲区的IO
操作,相较BIO
而言,它能够更为高效的对数据进行读写操作,同时与原先的BIO
使用方式也大有不同。
我们还是先来看个Demo:
- NioServer
/**
* @Author 三分恶
* @Date 2023/4/30
* @Description NIO服务端
*/
public class NioServer {
public static void main(String[] args) throws IOException {
//创建一个选择器selector
Selector selector= Selector.open();
//创建serverSocketChannel
ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
//绑定端口
serverSocketChannel.socket().bind(new InetSocketAddress(8888));
//必须得设置成非阻塞模式
serverSocketChannel.configureBlocking(false);
//将channel注册到selector并设置监听事件为ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("===========NIO服务端启动============");
while(true){
//超时等待
if(selector.select(1000)==0){
System.out.println("===========NIO服务端超时等待============");
continue;
}
// 有客户端请求被轮询监听到,获取返回的SelectionKey集合
Iterator<SelectionKey> iterator=selector.selectedKeys().iterator();
//迭代器遍历SelectionKey集合
while (iterator.hasNext()){
SelectionKey key=iterator.next();
// 判断是否为ACCEPT事件
if (key.isAcceptable()){
// 处理接收请求事件
SocketChannel socketChannel=((ServerSocketChannel) key.channel()).accept();
//非阻塞模式
socketChannel.configureBlocking(false);
// 注册到Selector并设置监听事件为READ
socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
System.out.println("成功连接客户端");
}
//判断是否为READ事件
if (key.isReadable()){
SocketChannel socketChannel = (SocketChannel) key.channel();
try {
// 获取以前设置的附件对象,如果没有则新建一个
ByteBuffer buffer = (ByteBuffer) key.attachment();
if (buffer == null) {
buffer = ByteBuffer.allocate(1024);
key.attach(buffer);
}
// 清空缓冲区
buffer.clear();
// 将通道中的数据读到缓冲区
int len = socketChannel.read(buffer);
if (len > 0) {
buffer.flip();
String message = new String(buffer.array(), 0, len);
System.out.println("收到客户端消息:" + message);
} else if (len < 0) {
// 接收到-1,表示连接已关闭
key.cancel();
socketChannel.close();
continue;
}
// 注册写事件,下次向客户端发送消息
socketChannel.register(selector, SelectionKey.OP_WRITE, buffer);
} catch (IOException e) {
// 取消SelectionKey并关闭对应的SocketChannel
key.cancel();
socketChannel.close();
}
}
//判断是否为WRITE事件
if (key.isWritable()){
SocketChannel socketChannel = (SocketChannel) key.channel();
//获取buffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
String hello = "你好,坤坤!";
//清空buffer
buffer.clear();
//buffer中写入消息
buffer.put(hello.getBytes());
buffer.flip();
//向channel中写入消息
socketChannel.write(buffer);
buffer.clear();
System.out.println("向客户端发送消息:" + hello);
// 设置下次读写操作,向 Selector 进行注册
socketChannel.register(selector, SelectionKey.OP_READ, buffer);
}
// 移除本次处理的SelectionKey,防止重复处理
iterator.remove();
}
}
}
}
- NioClient
public class NioClient {
public static void main(String[] args) throws IOException {
// 创建SocketChannel并指定ip地址和端口号
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8888));
System.out.println("==============NIO客户端启动================");
// 非阻塞模式
socketChannel.configureBlocking(false);
String hello="你好,靓仔!";
ByteBuffer buffer = ByteBuffer.wrap(hello.getBytes());
// 向通道中写入数据
socketChannel.write(buffer);
System.out.println("发送消息:" + hello);
buffer.clear();
// 将channel注册到Selector并监听READ事件
socketChannel.register(Selector.open(), SelectionKey.OP_READ, buffer);
while (true) {
// 读取服务端数据
if (socketChannel.read(buffer) > 0) {
buffer.flip();
String msg = new String(buffer.array(), 0, buffer.limit());
System.out.println("收到服务端消息:" + msg);
break;
}
}
// 关闭输入流
socketChannel.shutdownInput();
// 关闭SocketChannel连接
socketChannel.close();
}
}
- 先运行NioServer,再运行NioClient,运行结果:
===========NIO服务端启动============
===========NIO服务端超时等待============
===========NIO服务端超时等待============
成功连接客户端
收到客户端消息:你好,靓仔!
向客户端发送消息:你好,坤坤!
==============NIO客户端启动================
发送消息:你好,靓仔!
收到服务端消息:你好,坤坤!
我们在这个案例里实现了一个比较简单的Java NIO 客户端服务端通信,里面有两个小的点需要注意,注册到选择器上的通道都必须要为非阻塞模型,同时通过缓冲区传输数据时,必须要调用flip()
方法切换为读取模式。
Java-NIO中有三个核心概念:Buffer
(缓冲区)、Channel
(通道)、Selector
(选择器)。
-
每个客户端连连接本质上对应着一个
Channel
通道,每个通道都有自己的Buffer
缓冲区来进行读写,这些Channel
被Selector
选择器管理调度 -
Selector
负责轮询所有已注册的Channel
,监听到有事件发生,才提交给服务端线程处理,服务端线程不需要做任何阻塞等待,直接在Buffer
里处理Channel
事件的数据即可,处理完马上结束,或返回线程池供其他客户端事件继续使用。 -
通过
Selector
,服务端的一个Thread
就可以处理多个客户端的请求 -
Buffer(缓冲区)就是饭店用来存放食材的储藏室,当服务员点餐时,需要从储藏室中取出食材进行制作。
-
Channel(通道)是用于传输数据的车道,就像饭店里的上菜窗口,可以快速把点好的菜品送到客人的桌上。
-
Selector(选择器)就是大堂经理,负责协调服务员、厨师和客人的配合和沟通,以保证整个就餐过程的效率和顺畅。
Java AIO
Java-AIO
也被成为NIO2
,它是在NIO
的基础上,引入了新的异步通道的概念,并提供了异步文件通道和异步套接字的实现。
它们的主要区别就在于这个异步通道,见名知意:使用异步通道去进行IO
操作时,所有操作都为异步非阻塞的,当调用read()/write()/accept()/connect()
方法时,本质上都会交由操作系统去完成,比如要接收一个客户端的数据时,操作系统会先将通道中可读的数据先传入read()
回调方法指定的缓冲区中,然后再主动通知Java程序去处理。
我们还是先来看个Demo:
- AioServer
/**
* @Author 三分恶
* @Date 2023/5/1
* @Description AIO服务端
*/
public class AioServer {
public static void main(String[] args) throws Exception {
// 创建异步通道组,处理IO事件
AsynchronousChannelGroup group = AsynchronousChannelGroup.withFixedThreadPool(10, Executors.defaultThreadFactory());
//创建异步服务器Socket通道,并绑定端口
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group).bind(new InetSocketAddress(8888));
System.out.println("=============AIO服务端启动=========");
// 异步等待接收客户端连接
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
// 创建ByteBuffer
final ByteBuffer buffer = ByteBuffer.allocate(1024);
@Override
public void completed(AsynchronousSocketChannel channel, Object attachment) {
System.out.println("客户端连接成功");
try {
buffer.clear();
// 异步读取客户端发送的消息
channel.read(buffer, null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer len, Object attachment) {
buffer.flip();
String message = new String(buffer.array(), 0, len);
System.out.println("收到客户端消息:" + message);
// 异步发送消息给客户端
channel.write(ByteBuffer.wrap(("你好,阿坤!").getBytes()), null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
// 关闭输出流
try {
channel.shutdownOutput();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
} catch (Exception e) {
e.printStackTrace();
}
// 继续异步等待接收客户端连接
server.accept(null, this);
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
// 继续异步等待接收客户端连接
server.accept(null, this);
}
});
// 等待所有连接都处理完毕
group.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}
}
- AioClient
/**
* @Author 三分恶
* @Date 2023/5/1
* @Description AIO客户端
*/
public class AioClient {
public static void main(String[] args) throws Exception {
// 创建异步Socket通道
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
// 异步连接服务器
client.connect(new InetSocketAddress("127.0.0.1", 8888), null, new CompletionHandler<Void, Object>() {
// 创建ByteBuffer
final ByteBuffer buffer = ByteBuffer.wrap(("你好,靓仔!").getBytes());
@Override
public void completed(Void result, Object attachment) {
// 异步发送消息给服务器
client.write(buffer, null, new CompletionHandler<Integer, Object>() {
// 创建ByteBuffer
final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
@Override
public void completed(Integer result, Object attachment) {
readBuffer.clear();
// 异步读取服务器发送的消息
client.read(readBuffer, null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
readBuffer.flip();
String msg = new String(readBuffer.array(), 0, result);
System.out.println("收到服务端消息:" + msg);
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
// 等待连接处理完毕
Thread.sleep(1000);
// 关闭输入流和Socket通道
client.shutdownInput();
client.close();
}
}
- 看下运行结果
=============AIO服务端启动=========
客户端连接成功
收到客户端消息:你好,靓仔!
收到服务端消息:你好,阿坤!
可以看到,所有的操作都是异步进行,通过completed
接收异步回调,通过failed
接收错误回调。
而且我们发现,相较于之前的NIO
而言,AIO
其中少了Selector
选择器这个核心组件,选择器在NIO
中充当了协调者的角色。
但在Java-AIO
中,类似的角色直接由操作系统担当,而且不是采用轮询的方式监听IO
事件,而是采用一种类似于“订阅-通知”的模式。
在AIO
中,所有创建的通道都会直接在OS
上注册监听,当出现IO
请求时,会先由操作系统接收、准备、拷贝好数据,然后再通知监听对应通道的程序处理数据。
Java-AIO
这种异步非阻塞式IO
也是由操作系统进行支持的,在Windows
系统中提供了一种异步IO
技术:IOCP(I/O Completion Port
,所以Windows
下的Java-AIO
则是依赖于这种机制实现。不过在Linux
系统中由于没有这种异步IO
技术,所以Java-AIO
在Linux
环境中使用的还是epoll
这种多路复用技术进行模拟实现的。
因为Linux的异步IO技术实际上不太成熟,所以Java-AIO
的实际应用并不是太多,比如大名鼎鼎的网络通信框架Netty
就没有采用Java-AIO,而是使用Java-NIO,在代码层面,自行实现异步。
小结
那么这期我们就快速过了一下Java的三种IO机制,它们的特点,我们直接看下图:
我们也发现,虽然Java-NIO
、Java-AIO
,在性能上比Java-BIO
要强很多,但是可以看到,写法上一个比一个难搞,不过好在基本也没人直接用Java-NIO
、Java-AIO
,如果要进行网络通信,一般都会采用Netty
,它对原生的Java-NIO
进行了封装优化,接下来,我们会继续走近Netty
,敬请期待。
参考:
[1].《Netty权威指南》
[2].https://juejin.cn/post/7130952602350534693#heading-14
[3].https://www.jianshu.com/p/670033e5b916