本文参考:
https://blog.csdn.net/yhl_jxy/article/details/79335692
https://www.cnblogs.com/cuzzz/p/17290070.html
https://www.cnblogs.com/cuzzz/p/17473398.html
https://pdai.tech/md/java/io/java-io-nio-select-epoll.html
最近准备看 Kafka 源码,发现其底层涉及了许多网络 IO 相关的知识,之前对这方面仅限于理论学习,导致整个源码阅读过程就比较一头雾水。这篇文章就想通过一些实际的代码,来更深刻地理解一下网络中的相关知识。
一、IO模型&Java IO
Unix为程序员提供了以下5种基本的io模型:
-
blocking io: 阻塞io
-
nonblocking io: 非阻塞io
-
I/O multiplexing: io多路复用
-
signal driven I/O:信号驱动io
-
asynchronous I/O:异步io
但我们平时工作中说的最多是,阻塞
,非阻塞
,同步
,异步
1. 阻塞非阻塞,同步异步
-
阻塞非阻塞关注调用程序的线程状态。阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。
最典型的就是下面的 rt.jar 包下的 BlockingQueue 用法
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* @Author jiangxuzhao
* @Description
* @Date 2024/8/24
*/
@Slf4j
public class BlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
queue.put(1);
log.info("non-blocking offer starts...");
// 非阻塞的插入,如果容量允许返回 true,否则返回 false
boolean flag = queue.offer(2);
log.info("non-blocking offer ends... flag = {}", flag);
log.info("blocking put starts...");
// 阻塞的插入,如果容量有限,那么阻塞到有空间可用,或者被中断
queue.put(2);
log.info("blocking put ends...");
}
}
-
同步与异步关注的是 消息通知机制。同步是发起调用在没有得到结果之前,该调用不返回。异步是发起调用后,调用直接就返回。
消息队列的作用之一就是异步,发送方发完消息立马就返回了,不需要等待该消息被消费者处理。那么异步情况下被调用方如何通知调用方呢?
-
通知
在调用结束之后被调用方通过消息队列或者 RPC 等方式告知调用方 -
回调
调用方注册了一个回调事件给被调用方,被调用方结束了就出发这个回调事件
-
2. Unix的 IO 模型
IO 操作分为两步:
-
将数据从磁盘等外部媒介拷贝到内核空间(数据准备)
等待数据就绪,例如读文件的过程中需要等待磁盘扫描所需数据,等待数据从磁盘拷贝到操作系统内核缓冲区
-
将数据从内核空间拷贝到用户空间
将上一步操作系统内核缓冲区的数据拷贝到应用程序的缓冲区(用户空间)。
2.1. blocking IO/阻塞IO
用户发起系统调用,产生中断,操作系统从用户态切换到内核态,在内核态中完成数据从外部媒介拷贝到内核空间(数据准备),再次内核空间拷贝至用户空间的两个步骤以后,然后切换回用户态,应用进程继续运行处理。
这里说的阻塞,是指系统调用不会立即返回,而是需要阻塞直到数据准备完成,并拷贝至用户空间。
2.2. non-blocking IO 非阻塞IO
可看到,和阻塞 IO 的区别在于,数据准备这个过程,调用方应用程序不会阻塞直到数据准备完成,而是会立即返回。至于何时知道内核空间的数据准备完了,只能依赖调用方不断地系统调用轮询了。
第二个阶段,数据从内核空间复制到用户空间仍然是阻塞的,这个过程通常是比较快速的,因为这时候已经有DMA控制器完成了数据从磁盘搬运到内核空间的操作,只需要拷贝到用户态空间即可。
2.3 I/O multiplexing IO多路复用
可以看到IO多路复用的流程和blocking io阻塞io
类似,甚至还会多一次系统调用。那么IO多路复用存在的意义是什么昵?
假设我们现在是一个服务端程序,存在多个网络 IO 需要处理,如果这个时候对每个网络 IO 都开一个 Socket 线程进行处理,那么多个线程都会阻塞于网络 IO 的系统调用上,这是对线程资源的浪费。
IO 多路复用的优点:可以使一个线程同时监听多路IO,这个线程阻塞在 select 系统调用上,如果多路 IO 中存在任何一个调用方关心的事件(可读/可写事件),线程将被唤醒,并在内核空间中完成了数据的拷贝,后面在进行处理,有效节省了线程资源。
2.4. signal driven I/O信号驱动IO
可以看到,信号驱动的 IO 在数据准备阶段是非阻塞的,发起系统调用注册完信号就直接返回了,当操作系统完成了内核空间的数据准备后,就会发送信号来通知用户进程发生了某事件,用户进程只需要编写对应的信号处理函数。在信号处理函数中,阻塞在内核数据拷贝到用户空间数据的过程中,拷贝完成后就可以对数据进行处理了。
相比于 nonblocking IO 非阻塞 IO, 在数据准备阶段,调用方不需要一直去轮询内核,只需要等待内核的的信号处理信号就可以了。
2.5. asynchronous I/O 异步 IO
上面四种模型都会在数据从内核空间拷贝到用户空间
这一步发生阻塞,也就是说第二步都是需要同步等待操作系统完成拷贝的。
异步 IO 则是在两个阶段都不会发生阻塞,应用程序只要通知内核要读取的套接字对象以及数据的接收地址,然后直接返回。接下来整个过程都是由内核独立来完成,包括数据准备、数据从内核空间向用户空间的拷贝,拷贝完成后再通过信号来通知用户进程。
3.Java 中的 IO 模型
将阻塞
,非阻塞
,同步
,异步
进行组合
-
阻塞同步 IO
这就是Java中的 BIO
-
非阻塞同步 IO
这就是Java中的 NIO,Java中的 NIO 是通过 IO 多路复用实现的
-
非阻塞异步 IO
这就是 Java 中的 AIO,Java 中的 AIO 也是通过 IO 多路复用实现,呈现出异步的表象
二、Java BIO
先来复习下 Socket 编程的核心步骤:
下面来编程实验 Java 中的 BIO
服务端代码:
package bio;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @Author jiangxuzhao
* @Description
* @Date 2024/8/24
*/
@Slf4j
public class BIOServer {
// 线程池处理连接请求
private static ExecutorService threadPool = new ThreadPoolExecutor(2 * Runtime.getRuntime().availableProcessors(),
20, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
public static void main(String[] args) throws IOException {
// 1. 创建一个 Socket Server 监听 tcp 1000 端口,IP 地址为 localhost
ServerSocket serverSocket = new ServerSocket(1000);
// 2. 阻塞式的接收客户端请求
while (true) {
// 阻塞直到有客户端连接过来
Socket socket = serverSocket.accept();
log.info("{} 连接到服务器", socket.getRemoteSocketAddress());
// 3. 多线程处理客户端请求,每个线程独占阻塞处理一个客户端IO
threadPool.submit(() ->process(socket));
}
}
private static void process(Socket socket) {
// 执行完可以自动释放 out 输出流
try (OutputStream out = socket.getOutputStream()){
// 用户空间缓存
byte[] buffer = new byte[1024];
int len;
// 接收客户端的数据
while((len = socket.getInputStream().read(buffer)) > 0) {
log.info("接收到客户端 {} 的数据为 {}", socket.getRemoteSocketAddress(), new String(buffer, 0, len));
// 直接将内容回写给客户端
out.write(buffer, 0, len);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
客户端代码:
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
/**
* @Author jiangxuzhao
* @Description
* @Date 2024/8/24
*/
@Slf4j
public class Client {
public static void main(String[] args) throws IOException {
String host = "127.0.0.1";
Integer port = 1000;
Socket socket = new Socket(host, port);
try (OutputStream out = socket.getOutputStream()) {
byte[] buffer = new String("hello").getBytes();
out.write(buffer);
log.info("发送给服务器的请求为 {}", new String(buffer));
InputStream input = socket.getInputStream();
int len = input.read(buffer);
log.info("接收到服务器 {} 的响应为 {}", socket.getRemoteSocketAddress(), new String(buffer, 0, len));
}
}
}
客户端输出为(可以多运行几次):
18:51:03.186 [main] INFO Client - 发送给服务器的请求为 hello
18:51:03.193 [main] INFO Client - 接收到服务器 /127.0.0.1:1000 的响应为 hello
服务端输出为:
18:49:49.539 [main] INFO bio.BIOServer - /127.0.0.1:52396 连接到服务器
18:49:49.640 [pool-1-thread-1] INFO bio.BIOServer - 接收到客户端 /127.0.0.1:52396 的数据为 hello
18:50:00.678 [main] INFO bio.BIOServer - /127.0.0.1:52419 连接到服务器
18:50:00.679 [pool-1-thread-2] INFO bio.BIOServer - 接收到客户端 /127.0.0.1:52419 的数据为 hello
18:50:06.785 [main] INFO bio.BIOServer - /127.0.0.1:52435 连接到服务器
18:50:06.786 [pool-1-thread-3] INFO bio.BIOServer - 接收到客户端 /127.0.0.1:52435 的数据为 hello
18:51:03.181 [main] INFO bio.BIOServer - /127.0.0.1:52550 连接到服务器
18:51:03.183 [pool-1-thread-4] INFO bio.BIOServer - 接收到客户端 /127.0.0.1:52550 的数据为 hello
上面代码实现的功能就是:客户端给服务端发送的消息被服务端接收并且原封不动写回,服务端使用线程池来处理客户端的连接。我们先解析下其中阻塞调用的点:
-
服务端 Socket socket = serverSocket.accept(),主线程会一直阻塞在这行代码,直到有某一个客户端连接上来,这样子其实是服务端主线程释放了 CPU,避免主线程无休止的自选
-
服务端 process() 方法中 write()/read() 调用是阻塞的,首先通过 socket.getOutputStream()/socket.getInputStream() 拿到输入输出流,然后进行系统调用。主线程需要阻塞等待系统调用完成,等线程将数据从网卡或者硬盘读入内核空间,然后再由内核空间拷贝到用户空间,然后 Java 线程再进行逻辑操作。
这里由于存在阻塞调用,因此我们直接使用线程池,线程池中一个线程处理一个客户端请求,阻塞也只是阻塞线程池中的线程,不会阻塞主线程。
这种多线程处理 BIO 的优点:
- 简单直接,开发人员专注于编写 process 的业务代码。
- 不用担心系统限流、过载的问题等问题。线程池可以起到异步缓冲的作用,并且多个线程可以分担处理请求。
- 使用多线程利用多核 CPU 的能力,当线程阻塞的时候,可以将 CPU 时间片分给其余线程池中的线程。
缺点:
-
线程占用系统资源,线程池虽然进行了复用,但是大多数线程还是会被阻塞挂起,CPU 利用率并不高,被唤醒还会导致上下文切换频繁
-
多个线程使用同一个 JVM 进程的空间,单个线程占用内存较大的话会导致 JVM 空间不够用
那么如何解决上面的问题呢?有没有办法解放线程不让他们阻塞在 write/read 上,能读取就读取,不能读取就返回,然后继续去处理别的 socket 呢?
三、Java NIO
上面说的“解放线程不让他们阻塞在 write/read 上,能读取就读取,不能读取就返回,然后继续去处理别的 socket”,回顾上面 non-blocking IO 的示意图,正是这种非阻塞的方式,希望系统调用可以直接返回,而不是阻塞。
Java 中的 NIO 就是基于 IO 多路复用实现了非阻塞同步 IO 的效果:
客户端仍然复用前面的代码,但是记得连接服务器的 port 修改一下。
服务端代码:
其中涉及到 java.nio 包中 ByteBuffer 类的使用,参考文档:
- API 使用:https://blog.csdn.net/Shujie_L/article/details/135109223
- Mark、position、limit、capacity 在不同 API 的体现,有助于理解使用:https://blog.csdn.net/tousdi/article/details/138336809
package nio;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
/**
* @Author jiangxuzhao
* @Description
* @Date 2024/8/24
*/
@Slf4j
public class NIOServer {
public static void main(String[] args) throws IOException {
// 1. 创建 selector 用于监听多路 IO 的文件描述符
// selector 担任了重要角色,可以将任意 IO 注册到 selector 上,同时配置关心的事件,通过非阻塞轮询 selector 来得知哪路 IO 有消息了
// 底层是 epoll
// 后续会把 server 端注册上来,监听服务端接收到的客户端 IO 消息
// 每个 client 端的连接也会注册上来,接收客户端发过来的数据
Selector selector = Selector.open();
// 2. java.nio 中的 ByteBuffer 来处理字节缓冲区数据,用法参考我的参考文档
ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // 分配 1024 缓冲区
// 3. 把 server 端注册到 selector 中
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 监听 tcp 2001 号端口,IP 地址为本机 127.0.0.1
serverSocketChannel.socket().bind(new InetSocketAddress("127.0.0.1", 2001));
// 将 server socketChannel 配置为非阻塞
// 在非阻塞模式下,accept()方法会立刻返回,没有新进来的客户端连接直接返回 null
serverSocketChannel.configureBlocking(false);
// server 配置为关心 accept 事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 4. 这一步是阻塞的,服务端阻塞等待事件,也可以配置等待超时时间 timeout,最终返回 ready keys 数量
// 基于 IO 多路复用中的 select poll/epoll
if (selector.select() == 0) {
log.info("no client connecting...");
continue;
}
// 5. 走到这里,至少有一路 IO 存在 ready 的事件,那么 keys 不为空
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey key : selectionKeys) {
if (key.isAcceptable()) {
// 前面 selector 中只注册了 ServerSocketChannel 这一个 accept 的服务端,因此可以直接强转
// 拿到客户端连接的 SocketChannel
SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
log.info("客户端 {} 连接到服务器", socketChannel.getRemoteAddress());
// 将 client SocketChannel 配置为读写非阻塞
// 在非阻塞模式下,read() 函数会立刻返回,如果无法读则直接返回 -1
socketChannel.configureBlocking(false);
// 6. 当第一次 client 连接时,就将这个连接也注册到 selector 中,设置为可读
// 当前只是客户端建立了连接,但是并不代表可读,因此需要设置为可读,DMA 将网卡中的数据拷贝到内核空间中
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
// 7. 步骤6将客户端连接注册上来了,并设置为可读,因此该 channel 被选出来说明客户端有数据来了
SocketChannel socketChannel = (SocketChannel) key.channel();
// 8. 借助 ByteBuffer 读取发送数据
byteBuffer.clear();
// 非阻塞 read
if (socketChannel.read(byteBuffer) <= 0) {
continue;
}
// 将上面 read 的 position 清0
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.limit()];
// 将 Bytebuffer 数据存入 bytes 中
byteBuffer.get(bytes);
log.info("接收到客户端 {} 的数据为 {}", socketChannel.getRemoteAddress(), new String(bytes));
// 重新写回客户端
byteBuffer.clear();
// 将 bytes 数据放回 Bytebuffer
byteBuffer.put(bytes);
byteBuffer.flip();
// 非阻塞 write
socketChannel.write(byteBuffer);
}
}
// 9. 非常重要,清理掉每个 channel 的 key, 表示已经处理过了,避免下次 select 到重复的 keys
selectionKeys.clear();
}
}
}
整个示意图如下:
每个 Channel 都注册到 Selector 中,都有一个感兴趣的操作。
- ServerSocketChannel 只会在 Selector 上注册一个,感兴趣的操作直邮 ACCEPT
- SocketChannel 在 Selector 上会注册多个,因为一个 Server 通常会接收到多个 Client 的请求,SocketChannel 感兴趣的操作主要是 READ、WRITE,需要进行读写数据的操作。
分析:
- select() 是阻塞的,它还支持超时阻塞模式。一个线程监听多个 IO,连接在其上面的客户端 IO 只要有就绪的事件(或者被 wakeup、被 interrupt),select 就会返回。注意有个 selectNow() 方法是非阻塞的,当不存在就绪的 IO 时返回0。
- read()/write() 是非阻塞的,socketChannel.configureBlocking(false) 设置其为非阻塞,如果无法读,read() 函数会立刻返回-1,然后让当前主线程去遍历其他就绪事件的 IO,而不是阻塞在这里,这是非阻塞 IO 的体现。由于还是主线程主动发起调用获取数据,不是其他“通知或者回调”的方式,体现了同步。
四、Java AIO
package aio;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
/**
* @Author jiangxuzhao
* @Description
* @Date 2024/8/25
*/
@Slf4j
public class AIOServer {
public static void main(String[] args) throws IOException, InterruptedException {
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 2002));
log.info("服务端开始监听 2002 端口...");
serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@SneakyThrows
@Override
public void completed(AsynchronousSocketChannel channel, Object attachment) {
// 递归注册 accept
serverSocketChannel.accept(attachment, this);
log.info("线程 {} 有客户端 {} 连接上", Thread.currentThread(), channel.getRemoteAddress());
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
channel.read(byteBuffer, null, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer len, ByteBuffer attachment) {
// 递归注册 read
channel.read(byteBuffer, null, this);
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.limit()];
byteBuffer.get(bytes);
if (bytes.length <= 0) {
return;
}
log.info("客户端消息为 {}", new String(bytes));
byteBuffer.clear();
// 写回客户端
channel.write(ByteBuffer.wrap(bytes));
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
});
}
@Override
public void failed(Throwable exc, Object attachment) {
}
});
// 避免主线程直接结束
Thread.sleep(Integer.MAX_VALUE);
}
}
在 AIO 中,所有创建的通道都会直接在 OS 上注册监听,当出现 IO 请求时,会先由操作系统接收、准备、拷贝好数据,然后再通知监听对应通道的程序处理数据。
客户端的连接到来后,同样会先注册到选择器上,但是整个数据准备以及内核数据拷贝到用户数据都是在另一个异步线程中完成的,并没有让服务端主线程发生阻塞,呈现了异步的特征。