IO模型
如何进行网络通信
Socket通信是进程通讯的一种方式,通过调用这个网络库的一些API函数可以实现分布在不同主机的相关进程之间的数据交换
网络编程的基本流程是什么?
服务端先创建socket套接字,然后用这个套接字去绑定并监听某个端口,而客户端直接创建套接字,然后绑定端口,向服务器发起连接,这时候服务器接收到了这个请求,用accept接口把请求提取上来,并且创建一个新的套接字,这时候,服务器与这个客户端之间的通信就依赖于这一个套接字。
而在Java中,也是对这些系统函数的封装,底层还是调用这些系统函数。
同步阻塞IO
同步:我想要拿到数据需要手动调用API把数据从内核中拿到用户空间
阻塞:如果应用程序调用了系统函数来获取数据,但是迟迟没有数据到来的时候,主程序会一直等待数据的到来,不会往后执行代码。
阻塞的原因是什么?
阻塞与否和调用的函数无关,而是和socket套接字有关,要看在socket的位图中是否设置了非阻塞。
小结
同步阻塞IO全流程总结
服务端:
客户端:
同步阻塞IO存在什么问题
同步阻塞IO会导致效率太低,如果其中一个连接卡住了,基本全完蛋了,都得等着这一个连接超时或者断开,那么如何解决呢?
可以使用线程池来解决这个问题,每一次从底层拿到连接都开辟一个新的线程
在Java代码中:
但调整后的代码依旧有很大的问题:
- 客户端的并发数与后端的线程数成1:1的比例,线程的创建、销毁都是非常消耗系统资源的。服务器性能严重下降,甚至发生堆栈溢出等错误
- socket是阻塞型的,当连接创建后,如果客户端没有数据来会一直阻塞,导致大量线程都在阻塞状态,浪费资源
- 搞一个线程池的话,线程数量虽然可控,一旦一个连接占用一个线程,连接未断开之前会一直占用,就算没有任务也会阻塞住线程,不能给其他客户端使用,这样线程池资源很快会被耗尽,服务支持的连接数也有限
小结
同步非阻塞IO
非阻塞的特点:
- 应用程序调用read等系统函数获取数据没有数据的时候,可以直接返回,不会等着数据来才返回
注意:因为发起read调用且没有数据时直接就返回了,socket数据到来后应用程序不知道,所以同步非阻塞的socket需要在应用程序层面做一个read调用的轮询操作。
- 优势:应用程序不会阻塞在某一次的系统调用,调用返回后可以执行后面的代码,一定程度上提高了线程利用率
- 弊端:
- 应用程序由于不知道什么时候有数据所以需要一个轮询调用,会有很多无意义的系统调用,带来了一定的调用开销,由于其需要轮询的特点也不会释放线程去执行别的任务,同样没有解决客户端和线程数1:1的问题
- JAVA代码:
- 不在基于 java.net.ServerSocket和 java.net.Socket (阻塞),而是基于java.nio 包下的API
IO多路复用模型
阻塞IO和非阻塞IO的共同问题
应用程序不知道socket什么时候有数据了,所以当应用程序调用了read等系统函数在没有数据到来时,阻塞IO选择等待,非阻塞IO选择直接返回然后下次轮询
能不能有一种方式知道socket中数据是否准备好了,准备好了再调用read等函数去读取数据,否则不发起读取数据的操作!!!
IO多路复用模型下提供了几种不同的系统调用可以用来检测socket的可读写状态
select
poll
epoll
kqueue
select调用的主要流程分析
此时socket应该设置阻塞还是非阻塞? 都可以,但一般设置成非阻塞的
fcntl(6, F_SETFL, O_RDWR|O_NONBLOCK)
select的优势
- 一次系统调用可检测多个socket状态,内核层面复用一次系统调用,应用层面复用一个线程,减少了对线程资源的占用
- 平台移植性好
select的不足
- 文件描述符fd有上限,默认1024个。
- 需要维护一个存放大量fd的数据结构,调用select时把这些数据从用户空间拷贝给内核空间,开销大。
- select调用返回后需要自己线性遍历fd_set来获取到就绪的fd,随着fd的增加会使得效率变低。
poll 调用 和 select 调用基本差不多,是对select的改进,理论上可监听的fd没有select那样的数量限制,并且在遍历上时间复杂度更低,更高效。
小结
epoll主要流程分析
epoll 涉及到三个系统调用:epoll_create,epoll_ctl,epoll_wait
-
epoll模型当中的红黑树本质就是告诉内核,需要监视哪些文件描述符上的哪些事件,调用epll_ctl函数实际就是在对这颗红黑树进行对应的增删改操作。
-
epoll模型当中的就绪队列本质就是告诉内核,哪些文件描述符上的哪些事件已经就绪了,调用epoll_wait函数实际就是在从就绪队列当中获取已经就绪的事件。
这个就绪队列是一个双线循环链表
在epoll中,对于每一个事件都会有一个对应的epitem结构体,红黑树和就绪队列当中的节点分别是基于epitem结构中的rbn成员和rdllink成员的,epitem结构当中的成员ffd记录的是指定的文件描述符值,event成员记录的就是该文件描述符对应的事件。
struct epitem{
struct rb_node rbn; //红黑树节点
struct list_head rdllink; //双向链表节点
struct epoll_filefd ffd; //事件句柄信息
struct eventpoll *ep; //指向其所属的eventpoll对象
struct epoll_event event; //期待发生的事件类型
}
-
对于epitem结构当中rbn成员来说,ffd与event的含义是,需要监视ffd上的event事件是否就绪。
-
对于epitem结构当中的rdlink成员来说,ffd与event的含义是,ffd上的event事件已经就绪了。
-
红黑树是一种二叉搜索树,因此必须有键值key,而这里的文件描述符就天然的可以作为红黑树的key值。
-
调用epoll_ctl向红黑树当中新增节点时,如果设置了EPOLLONESHOT选项,当监听完这次事件后,如果还需要继续监听该文件描述符则需要重新将其添加到epoll模型中,本质就是当设置了EPOLLONESHOT选项的事件就绪时,操作系统会自动将其从红黑树当中删除。
-
而如果调用epoll_ctl向红黑树当中新增节点时没有设置EPOLLONESHOT,那么该节点插入红黑树后就会一直存在,除非用户调用epoll_ctl将该节点从红黑树当中删除。
所有添加到红黑树当中的事件,都会与设备(网卡)驱动程序建立回调方法,这个回调方法在内核中叫ep_poll_callback。
-
对于select和poll来说,操作系统在监视多个文件描述符上的事件是否就绪时,需要让操作系统主动对这多个文件描述符进行轮询检测,这一定会增加操作系统的负担。
-
而对于epoll来说,操作系统不需要主动进行事件的检测,当红黑树中监视的事件就绪时,会自动调用对应的回调方法,将就绪的事件添加到就绪队列当中。
-
当用户调用epoll_wait函数获取就绪事件时,只需要关注底层就绪队列是否为空,如果不为空则将就绪队列当中的就绪事件拷贝给用户即可。
-
采用回调机制最大的好处,就是不再需要操作系统主动对就绪事件进行检测了,当事件就绪时会自动调用对应的回调函数进行处理。
-
注意:
-
只有添加到红黑树当中的事件才会与底层建立回调方法,因此只有当红黑树当中对应的事件就绪时,才会执行对应的回调方法将其添加到就绪队列当中。
-
当不断有监视的事件就绪时,会不断调用回调方法向就绪队列当中插入节点,而上层也会不断调用epoll_wait函数从就绪队列当中获取节点,这是典型的生产者消费者模型。
-
由于就绪队列可能会被多个执行流同时访问,因此必须要使用互斥锁对其进行保护,eventpoll结构当中的lock和mtx就是用于保护临界资源的,因此epoll本身是线程安全的。
-
eventpoll结构当中的wq(wait queue)就是等待队列,当多个执行流想要同时访问同一个epoll模型时,就需要在该等待队列下进行等待。
这里写一下在Linux C中使用epoll的方法。
首先可以创建一个类,来构建epoll
#include "socket.hpp"
#include <sys/epoll.h>
#define BACK_LOG 5
#define SIZE 256
class EpollServer{
private:
int _listen_sock; //监听套接字
int _port; //端口号
int _epfd; //epoll模型
public:
EpollServer(int port)
: _port(port)
{}
void InitEpollServer()
{
//创建监听套接字
_listen_sock = Socket::SocketCreate();
//监听套接字绑定对应的端口
Socket::SocketBind(_listen_sock, _port);
Socket::SocketListen(_listen_sock, BACK_LOG);
//创建epoll模型
_epfd = epoll_create(SIZE);
if (_epfd < 0){
std::cerr << "epoll_create error" << std::endl;
exit(5);
}
}
~EpollServer()
{
if (_listen_sock >= 0){
close(_listen_sock);
}
if (_epfd >= 0){
close(_epfd);
}
}
};
-
首先,在epoll服务器开始死循环调用epoll_wait函数之前,需要先调用epoll_ctl将监听套接字添加到epoll模型当中,表示服务器刚开始运行时只需要监视监听套接字的读事件。
-
此后,epoll服务器就不断调用epoll_wait函数监视读事件是否就绪。如果epoll_wait函数的返回值大于0,则说明已经有文件描述符的读事件就绪,并且此时的返回值代表的就是有事件就绪的文件描述符个数,接下来就应该对就绪事件进行处理。
-
如果epoll_wait函数的返回值等于0,则说明timeout时间耗尽,此时直接准备进行下一次epoll_wait调用即可。如果epoll_wait函数的返回值为-1,此时也让服务器准备进行下一次epoll_wait调用,但实际应该进一步判断错误码,根据错误码来判断是否应该继续调用epoll_wait函数。
#include "socket.hpp"
#include <sys/epoll.h>
#define BACK_LOG 5
#define SIZE 256
#define MAX_NUM 64
class EpollServer{
private:
int _listen_sock; //监听套接字
int _port; //端口号
int _epfd; //epoll模型
public:
void Run()
{
AddEvent(_listen_sock, EPOLLIN); //将监听套接字添加到epoll模型中,并关心其读事件
for (;;){
struct epoll_event revs[MAX_NUM];
int num = epoll_wait(_epfd, revs, MAX_NUM, -1);
if (num < 0){
std::cerr << "epoll_wait error" << std::endl;
continue;
}
else if (num == 0){
std::cout << "timeout..." << std::endl;
continue;
}
else{
//正常的事件处理
//std::cout<<"有事件发生..."<<std::endl;
HandlerEvent(revs, num);
}
}
}
private:
void AddEvent(int sock, uint32_t event)
{
struct epoll_event ev;
ev.events = event;
ev.data.fd = sock;
epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &ev);
}
};
-
默认情况下,只要底层有就绪事件没有处理,epoll也会一直通知用户,也就是调用epoll_wait会一直成功返回,并将就绪的事件拷贝到我们传入的数组当中。
-
需要注意的是,所谓的事件处理并不是调用epoll_wait将底层就绪队列中的就绪事件拷贝到用户层,比如当这里的读事件就绪后,我们应该调用accept获取底层建立好的连接,或调用recv读取客户端发来的数据,这才算是将读事件处理了。
-
如果我们仅仅是调用epoll_wait将底层就绪队列当中的事件拷贝到应用层,那么这些就绪事件实际并没有被处理掉,底层注册的回调函数会被再次调用,将就绪的事件重新添加到就绪队列当中,本质原因就是我们实际并没有对底层就绪的数据进行读取。
事件处理
如果底层就绪队列当中有就绪事件,那么调用epoll_wait函数时就会将底层就绪队列中的事件拷贝到用户提供的revs数组当中,接下来epoll服务器就应该对就绪事件进行处理了,事件处理过程如下:
- 根据调用epoll_wait时得到的返回值,来判断操作系统向revs数组中拷贝了多少个struct epoll_event结构,进而对这些文件描述符上的事件进行处理。
- 对于每一个拷贝上来的struct epoll_event结构,如果该结构当中的events当中包含读事件,则说明该文件描述符对应的读事件就绪,但接下来还需要进一步判断该文件描述符是监听套接字还是与客户端建立的套接字。
- 如果是监听套接字的读事件就绪,则调用accept函数将底层建立好的连接获取上来,并调用epoll_ctl函数将获取到的套接字添加到epoll模型当中,表示下一次调用epoll_wait函数时需要监视该套接字的读事件。
- 如果是与客户端建立的连接对应的读事件就绪,则调用recv函数读取客户端发来的数据,并将读取到的数据在服务器端进行打印。
- 如果在调用recv函数时发现客户端将连接关闭或recv函数调用失败,则epoll服务器也直接关闭对应的连接,并调用epoll_ctl函数将该连接对应的文件描述符从epoll模型中删除,表示下一次调用epoll_wait函数时无需再监视该套接字的读事件。
#include "socket.hpp"
#include <sys/epoll.h>
#define BACK_LOG 5
#define SIZE 256
#define MAX_NUM 64
class EpollServer{
private:
int _listen_sock; //监听套接字
int _port; //端口号
int _epfd; //epoll模型
public:
void HandlerEvent(struct epoll_event revs[], int num)
{
for (int i = 0; i < num; i++){
int fd = revs[i].data.fd; //就绪的文件描述符
if (fd == _listen_sock&&revs[i].events&EPOLLIN){ //连接事件就绪
struct sockaddr_in peer;
memset(&peer, 0, sizeof(peer));
socklen_t len = sizeof(peer);
int sock = accept(_listen_sock, (struct sockaddr*)&peer, &len);
if (sock < 0){ //获取连接失败
std::cerr << "accept error" << std::endl;
continue;
}
std::string peer_ip = inet_ntoa(peer.sin_addr);
int peer_port = ntohs(peer.sin_port);
std::cout << "get a new link[" << peer_ip << ":" << peer_port << "]" << std::endl;
AddEvent(sock, EPOLLIN); //将获取到的套接字添加到epoll模型中,并关心其读事件
}
else if (revs[i].events&EPOLLIN){ //读事件就绪
char buffer[64];
ssize_t size = recv(fd, buffer, sizeof(buffer)-1, 0);
if (size > 0){ //读取成功
buffer[size] = '\0';
std::cout << "echo# " << buffer << std::endl;
}
else if (size == 0){ //对端连接关闭
std::cout << "client quit" << std::endl;
close(fd);
DelEvent(fd); //将文件描述符从epoll模型中删除
}
else{
std::cerr << "recv error" << std::endl;
close(fd);
DelEvent(fd); //将文件描述符从epoll模型中删除
}
}
}
}
private:
void AddEvent(int sock, uint32_t event)
{
struct epoll_event ev;
ev.events = event;
ev.data.fd = sock;
epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &ev);
}
void DelEvent(int sock)
{
epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);
}
};
epoll的优势
- 可监听的文件描述符fd数量理论上是无限制的,支持的fd上限是最大可以打开的文件数,可查看 /proc/sys/fs/file-max,一般来说和系统内存有一定关系。
- 应用不需要维护一个fd集合,不需要在用户空间和内核空间来回拷贝全量fd。
- 不需要像select和poll一样在底层线性遍历所有的事件看是否就绪,而是通过函数回调的方式来告知上层事件是否就绪。
epoll的不足
- 可移植性不强。
小结
异步IO
- 同步IO是当socket缓冲区有数据后需要应用主动调用read等系统函数完成数据从内核缓冲区到用户空间的拷贝,并且拷贝过程中应用线程是阻塞的。
- 同步阻塞,同步非阻塞,IO多路复用,均属于同步模型
- 异步IO是指整个过程均由内核完成(内核等待数据到来,读取数据到内核缓冲区,然后将数据拷贝到用户空间),完成后通知用户进程,在整个I/O操作期间都不会阻塞用户进程。
异步IO的主要工作流程
异步IO在Windows下很成熟,Linux下不太完善
JAVA NIO
基于java.net.ServerSocket 和 java.net.Socket 的API 封装的是同步阻塞IO模型,不支持同步非阻塞和IO多路复用模型,而java.nio包下的相关API 是支持同步非阻塞及IO多路复用模型的
JAVA NIO概述
NIO,称之为New IO 或是 none-block IO (非阻塞IO),这两种说法都可以,其实称之为非阻塞IO更恰当一些,NIO的三大核心组件:Channel(通道)、Buffer(缓冲区)、Selector(选择器/多路复用器)
什么是Channel
Channel实现体系
大致可以分为网络读写的channel和文件读写的channel
ServerSocketChannel用于服务端socket,SocketChannel用户客户端的socket
小结
什么是Buffer?
在Buffer中有四个很重要的变量:
- capacity:缓冲区可以容纳的元素的最大数量
- limit:停止读写的索引
- position:当前要读或者写的索引,读写操作是从position开始,到limit停止
- mark:用于标记position的位置,默认为-1
ByteBuffer有哪些索引操作
flip:
1 向ByteBuffer 中写数据:核心方法是put
- put(byte[] src)
- put(byte[] src, int offset, int length)
- put(ByteBuffer src)
2 从ByteBuffer中读数据:核心方法是get
get(byte[] dst)
get(byte[] dst, int offset, int length)
Selector
创建selector:Selector selector = Selector.open();【epoll_create】
将channel注册给selector:socketChannel.register(selector, SelectionKey.OP_ACCEPT);[epoll_ctl]
selector执行select:selector.select();[epoll_wait]
基于JAVA NIO编写服务端程序
1,创建ServerSocketChannel,绑定地址端口,设置阻塞标识
2,打开一个Selector,将ServerSocketChannel注册到selector上,监听OP_ACCEPT事件
3,在一个loop中执行selector的select
4,根据返回的SelectionKey信息判断事件类型,根据事件类型完成不同操作
5,如果是ACCEPT事件则accept一个客户端连接SocketChannel,设置阻塞标识并将其注册到selector上,监听OP_READ事件
6,如果是读写事件则进行读写操作
package org.example;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
public class Main {
public static void main(String[] args) {
try {
//1.创建ServerSocketChannel通道,绑定端口,设置为非阻塞
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);
//2.打开一个Selector 将 serverSocketChannel注册一个accept事件
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//3.编写loop 执行selector.select看是否有事件就绪
while(true){
int select = selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while(iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
iterator.remove();
processSelectedKey(selectionKey,selector);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
private static void processSelectedKey(SelectionKey selectionKey, Selector selector) throws IOException {
//根据不同的事件来做出不同的操作
//先判断key是否有效
if(selectionKey.isValid()){
//如果是Accept连接事件
if(selectionKey.isAcceptable()){
ServerSocketChannel serverSocketChannel=(ServerSocketChannel)selectionKey.channel();
//接收一个客户端连接
SocketChannel channel = serverSocketChannel.accept();
//设置为非阻塞
channel.configureBlocking(false);
//注册到selector监听 OP_READ 事件 并获取到SelectionKey
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
return;
}
}
if(selectionKey.isReadable()){
//有数据可以读取,用SocketChannel
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int read = socketChannel.read(buffer);
//如果读取到的长度大于0
if(read>0){
//从buffer中拿到数据
//先切换状态
buffer.flip();
//remaining获取大小
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
java.lang.String msg = new java.lang.String(bytes, Charset.defaultCharset());
System.out.println("服务端收到来自客户端的数据:"+msg);
//业务操作
ByteBuffer sendBuffer = ByteBuffer.allocate(256);
sendBuffer.put("hello nio client , I an nio server\n".getBytes(StandardCharsets.UTF_8));
sendBuffer.flip();
socketChannel.write(sendBuffer);
}
return;
}
if(selectionKey.isWritable()){
//可写入的
}
}
}