前言
(本文是作者学习制作rpc框架时,一些自用的笔记,并不会完整详细的介绍某个模块,会写大概的流程及一些相关概念,供日后复习使用~)
IO模型
先理解基本的IO流程:
- 应用A把消息发送到 TCP发送缓冲区
- TCP发送缓冲区再把消息发送出去,经过网络传递后,消息会发送到B服务器的TCP接收缓冲区
- B再从TCP接收缓冲区去读取属于自己的数据
1.阻塞IO模型
A发送消息到TCP发送缓冲区的过程,同样会出现以下情况:
- TCP发送缓冲区满了,也就是说A发送的消息TCP发送缓冲区接收不了
- A无法成功的将消息发送到TCP发送缓冲区
这时候TCP发送缓冲区产生了一个问题?
- 告诉A,现在缓冲区满了,你该干啥干啥去
- 告诉A,现在缓冲区满了,你等一会,有空间的时候,你再拷贝数据到缓冲区(阻塞)
以应用A为例,阻塞IO就是,当A发起 发送数据的请求,发送缓冲区一直处于满的状态,则A一直处于等待状态,直到内核将数据发送出去有空间,A发送出为止。
术语描述:在应用调用recvfrom读取数据时,其系统调用直到数据包到达且被复制到应用缓冲区中或者发送错误时才返回,在此期间一直会等待,进程从调用到返回这段时间内都是被阻塞的,称为阻塞IO。
recvfrom
用来接收远程主机经指定的socket 传来的数据, 并把数据存到由参数buf 指向的内存空间。
流程:
- 应用进程向内核发起recvfrom读取数据
- 准备数据报(应用进程阻塞)
- 将数据从内核复制到应用空间
- 复制完成后,返回成功提示
2.非阻塞IO模型
很明显,非阻塞IO就是,内核数据没有准备好之前,会直接通知B未准备好,让B不要等待了
B 问 准备好了吗?内核 没准备好 过了一会
术语:非阻塞IO是在应用调用recvfrom读取数据时,如果该缓冲区没有数据的话,就会直接返回一个EWOULDBLOCK错误,不会让应用一直等待中。在没有数据的时候会即刻返回错误标识,那也意味着如果应用要读取数据就需要不断的调用recvfrom请求,直到读取到它数据要的数据为止。B又问:准备好了吗?
流程:
- 应用进程向内核发起recvfrom读取数据
- 没有数据报准备好,即刻返回EWOULDBLOCK错误码
- 应用进程向内核发起recvfrom读取数据
- 已有数据包准备好就进行一下 步骤,否则还是返回错误码
- 将数据从内核拷贝到用户空间
- 完成后,返回成功提示
但会存在一个问题,如果只有一个线程的话,会忙等待,即一直询问数据有无准备好,没有准备好返回错误,造成cpu空转,浪费资源
而且如果是多线程的话,对于每个连接请求都要建立一个线程来处理,也会造成资源的浪费
3.IO多路复用模型
就如刚刚所说的:
假设B从TCP缓冲区中读取数据,如果在并发的环境下,可能会N个人向应用B发送消息,这种情况下我们的应用就必须创建N个线程去读取数据,每个线程都会自己调用recvfrom 去读取数据。那么此时情况可能如下图:
这时我们可以以下这样处理:
可以有一个或者多个线程监控多个网络请求(fd文件描述符,linux系统把每个网络请求以一个fd来标识),这样就可以只需要一个或几个线程就可以完成数据状态询问的操作,当有数据准备就绪之后再分配对应的线程去读取数据,这么做就可以节省出大量的线程资源出来,这个就是IO复用模型的思路
但这个模型依然存在一个问题:我们该如何得知每个网络请求的状态?
这个模型一般来说是轮询每个网络连接的状态,找到需要发出请求(读or写)的网络连接,然后分配线程来处理。但是大多数情况下很多线程都是空闲状态,不会频繁发起读写请求,所以采用轮询的方式,也会造成资源的浪费(为了一个连接请求,需要遍历全部的连接请求)
术语描述:进程通过将一个或多个fd传递给select,阻塞在select操作上,select帮我们侦测多个fd是否准备就绪,当有fd准备就绪时,select返回数据可读状态,应用程序再调用recvfrom读取数据
复用IO的基本思路就是通过select或poll、epoll 来监控多fd ,来达到不必为每个fd创建一个对应的监控线程,从而减少线程资源创建的目的。
4.信号驱动IO模型
为了解决以上问题,提出了该方案。
不要我总是去问你是否数据准备就绪,能不能我发出请求后等你数据准备好了就通知我,这就衍生了信号驱动IO模型
术语描述:首先开启套接口信号驱动IO功能,并通过系统调用sigaction执行一个信号处理函数,此时请求即刻返回,当数据准备就绪时,就生成对应进程的SIGIO信号,通过信号回调通知应用线程调用recvfrom来读取数据。(侧重点是告诉你该进行下一步的IO操作了)
IO复用模型里面的select虽然可以监控多个fd了,但select其实现的本质上还是通过不断的轮询fd来监控数据状态, 因为大部分轮询请求其实都是无效的,所以信号驱动IO意在通过这种建立信号关联的方式,实现了发出请求后只需要等待数据就绪的通知即可,这样就可以避免大量无效的数据状态轮询操作。
以上模型虽然数据准备就绪后,会发送信号通知内核进行读取,但是读取的过程依然是阻塞的,直到读取完毕
5.异步IO
不管IO复用还是信号驱动,我们读取数据都要进行两个步骤:
1.发送select请求,将自己加入到select进行监控,询问状态是否准备好
2、发送recvfrom请求读取数据(阻塞)
为了解决第二步读取数据时会阻塞的问题,这里提出了异步IO。
应用只需要向内核发送一个read 请求,告诉内核它要读取数据后立即返回;内核收到请求后会建立一个信号联系,当数据准备就绪,内核会主动把数据从内核复制到用户空间,等所有操作都完成之后,内核会发起一个通知告诉应用读取完毕
术语描述: 应用告知内核启动某个操作,并让内核在整个操作完成之后,通知应用,异步IO与信号驱动模型的主要区别在于,信号驱动IO只是由内核通知我们什么时候可以开始下一个IO操作,而异步IO模型是由内核通知我们该操作什么时候完成。
异步IO的优化思路是解决了应用程序需要先发送询问请求、后发送接收数据请求两个阶段的模式,在异步IO的模式下,只需要向内核发送一次请求就可以完成状态询问和数拷贝的所有操作。
BIO、NIO
BIO
BIO 全称Block-IO, 是一种同步且阻塞的通信模式
在IO模型里面如果请求方从发起请求到数据最后完成的这一段过程中都需要自己参与,那么这种我们称为同步请求;反之,如果应用发送完指令后就不再参与过程了,只需要等待最终完成结果的通知,那么这就属于异步。
如上图,是典型的BIO模型,每当有一个连接到来,经过协调器的处理,就开启一个对应的线程进行接管。如果连接有1000条,那就需要1000个线程。线程资源是非常昂贵的,除了占用大量的内存,还会占用非常多的CPU调度时间,所以BIO在连接非常多的情况下,效率会变得非常低。
下面的代码是使用ServerSocket
实现的一个简单socket服务器,监听在8888端口:
public class BIO {
static boolean stop = false;
public static void main(String[] args) throws Exception {
int connectionNum = 0;
int port = 8888;
ExecutorService service = Executors.newCachedThreadPool();
ServerSocket serverSocket = new ServerSocket(port);
while (!stop) {
if (10 == connectionNum) {
stop = true;
}
Socket socket = serverSocket.accept();
service.execute(() -> {
try {
Scanner scanner = new Scanner(socket.getInputStream());
PrintStream printStream = new PrintStream(socket.getOutputStream());
while (!stop) {
String s = scanner.next().trim();
printStream.println("PONG:" + s);
}
} catch (Exception ex) {
ex.printStackTrace();
}
});
connectionNum++;
}
service.shutdown();
serverSocket.close();
}
}
可以看到,BIO的读写操作是阻塞的,线程的整个生命周期和连接的生命周期是一样的,而且不能够被复用。
就单个阻塞IO
来说,它的效率并不比NIO
慢。但是当服务的连接增多,考虑到整个服务器的资源调度和资源利用率等因素,NIO
就有了显著的效果,NIO非常适合高并发场景。
NIO
Java NIO,全称 Non-Block IO ,是Java SE 1.4版以后,针对网络传输效能优化的新功能。是一种非阻塞同步的通信模式
NIO主要有三种实现:select,poll,epoll。这三种都是IO多路复用的机制。
I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的。
Java的NIO,在Linux上底层是使用epoll实现的
fd
每条连接、每个文件,都对应着一个描述符,比如端口号。内核在定位到这些连接的时候,就是通过fd进行寻址的event
事件
Select
int select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
select 函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds。
调用后select函数会阻塞,直到有描述符就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。
当select函数返回后,需要通过遍历fdset,来找到就绪的描述符
select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。
select的一 个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但 是这样也会造成效率的降低。
Poll
int poll (struct pollfd *fds, unsigned int nfds, int timeout);
不同与select使用三个位图来表示三个fdset的方式,poll使用一个 pollfd的指针实现。
struct pollfd {
int fd; /* file descriptor */
short events; /* requested events to watch */
short revents; /* returned events witnessed */
};
pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式。同时,pollfd并没有最大数量限制(但是数量过大后性能也是会下降)。
和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符。
从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket
。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降
Epoll
epoll对select中存在的问题都逐一解决,简单来说epoll的优势包括:
- 对fd数量没有限制(当然这个在poll也被解决了)
- 抛弃了bitmap数组实现了新的结构来存储多种事件类型
- 无需重复拷贝fd 随用随加 随弃随删
- 采用事件驱动避免轮询查看可读写事件
epoll出现之后大大提高了并发量,能够轻松应对C10K问题
//用户数据载体
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
//fd装载入内核的载体
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
//三板斧api
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
-
int epoll_create(int size)
创建一个epoll的句柄,后续的操作都是基于此fd的。
size用来告诉内核这个监听的数目一共有多大,这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值,
参数size并不是限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议
。 当创建好epoll句柄后,它就会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。 -
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
函数是对指定描述符fd执行op操作。
-
- epfd:是epoll_create()的返回值。
-
- op:表示op操作,用三个宏来表示:添加EPOLL_CTL_ADD,删除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分别添加、删除和修改对fd的监听事件。
-
- fd:是需要监听的fd(文件描述符)
-
- epoll_event:是告诉内核需要监听什么事,struct epoll_event结构如下:
struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ }; //events可以是以下几个宏的集合: EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭); EPOLLOUT:表示对应的文件描述符可以写; EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来); EPOLLERR:表示对应的文件描述符发生错误; EPOLLHUP:表示对应的文件描述符被挂断; EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。 EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
-
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)
等待epfd上的io事件,最多返回maxevents个事件。 参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。
-
epoll_event是用户态需监控fd的代言人,后续用户程序对fd的操作都是基于此结构的;
通俗描述:
-
epoll_create场景:
大学开学第一周,你作为班长需要帮全班同学领取相关物品,你在学生处告诉工作人员,我是xx学院xx专业xx班的班长,这时工作人员确定你的身份并且给了你凭证,后面办的事情都需要用到(
也就是调用epoll_create向内核申请了epfd结构,内核返回了epfd句柄给你使用
); -
epoll_ctl场景:
你拿着凭证在办事大厅开始办事,分拣办公室工作人员说班长你把所有需要办理事情的同学的学生册和需要办理的事情都记录下来吧,于是班长开始在每个学生手册单独写对应需要办的事情:
李明需要开实验室权限、孙大熊需要办游泳卡......就这样班长一股脑写完并交给了工作人员(
也就是告诉内核哪些fd需要做哪些操作
); -
epoll_wait场景:
你拿着凭证在领取办公室门前等着,这时候广播喊xx班长你们班孙大熊的游泳卡办好了速来领取、李明实验室权限卡办好了速来取....还有同学的事情没办好,所以班长只能继续(
也就是调用epoll_wait等待内核反馈的可读写事件发生并处理
);
工作模式
epoll对文件描述符的操作有两种模式:LT(level trigger)和ET(edge trigger)。LT模式是默认模式,LT模式与ET模式的区别如下:
LT模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件
。下次调用epoll_wait时,会再次响应应用程序并通知此事件。
ET模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件
。如果不处理,下次调用epoll_wait时,不会再次响应应用程序并通知此事件。
1. LT模式
LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的。
2. ET模式
ET(edge-triggered)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知。
ET模式在很大程度上减少了epoll事件被重复触发的次数,因此效率要比LT模式高。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。
NIO代码示例
package com.mszlu.rpc.io.nio;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class NIO {
static boolean stop = false;
public static void main(String[] args) throws Exception {
int connectionNum = 0;
int port = 8888;
ExecutorService service = Executors.newCachedThreadPool();
//创建了一个服务端ssc,并开启一个新的事件选择器,监听它的OP_ACCEPT事件。
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.socket().bind(new InetSocketAddress("localhost", port));
Selector selector = Selector.open();
//共有4种事件类型。
// 分别是新连接事件(OP_ACCEPT)、连接就绪事件(OP_CONNECT)、读就绪事件(OP_READ)、写就绪事件(OP_WRITE)。
// 任何网络和文件操作,都可以抽象成这四个事件
//SelectionKey.OP_ACCEPT = 16 ssc.validOps()=16
ssc.register(selector, ssc.validOps());
while (!stop) {
if (10 == connectionNum) {
stop = true;
}
//在while循环里,使用select函数,阻塞在主线程里。
// 所谓阻塞,就是操作系统不再分配CPU事件片到当前线程中,所以select函数是几乎不占用任何系统资源的
int num = selector.select();
if (num == 0) {
continue;
}
//一旦有新的事件到达,比如有新的连接到来,主线程就能够被调度到,程序就能够向下执行。
// 这时候,就能够根据订阅的事件通知,持续获取订阅的事件。
//由于注册到selector的连接和事件可能会有多个,所以这些事件也会有多个
//使用安全的迭代器循环进行处理,在处理完毕之后,将它删除。
//如果事件不删除的话,或者漏掉了某个事件的处理,会怎么样呢?
// 后果还是比较严重的,由于事件总是存在,我们的程序会陷入无休无止的循环之中。
Iterator<SelectionKey> events = selector.selectedKeys().iterator();
while (events.hasNext()) {
SelectionKey event = events.next();
if (event.isAcceptable()) {
//NIO操作的对象是抽象的概念Channel,通过缓冲区进行数据交换
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
//订阅OP_READ事件,数据流读取
sc.register(selector, SelectionKey.OP_READ);
connectionNum++;
} else if (event.isReadable()) {
try {
SocketChannel sc = (SocketChannel) event.channel();
//创建了一个1024字节的缓冲区,用于数据的读取
//如果连接中的数据,大于1024字节怎么办?
//水平触发 (level-triggered) 称作LT模式。只要缓冲区有数据,事件就会一直发生
//边缘触发 (edge-triggered) 称作ET模式。缓冲区有数据,仅会触发一次。事件想要再次触发,必须先将fd中的数据读完才行
//java的NIO使用了LT模式,LT模式频繁环唤醒线程,效率相比较ET模式低,所以Netty使用JNI的方式,实现了ET模式,效率上更高一些
ByteBuffer buf = ByteBuffer.allocate(1024);
//这依旧是阻塞的
int size = sc.read(buf);
if(-1==size){
sc.close();
}
String result = new String(buf.array()).trim();
ByteBuffer wrap = ByteBuffer.wrap(("PONG:" + result).getBytes());
sc.write(wrap);
} catch (Exception ex) {
ex.printStackTrace();
}
} else if (event.isWritable()) {
SocketChannel sc = (SocketChannel) event.channel();
}
events.remove();
}
}
service.shutdown();
ssc.close();
}
}