一、select介绍
1.1 初始select
系统提供的select函数来实现多路复用输入/输出模型。
- select系统调用是用来让我们的程序监视多个文件描述符的状态变化的
- 程序会停止在select这里等待,直到被监视的文件描述符有一个或者多个发生了状态改变
IO = 等 + 拷贝,select函数只负责进行对fd进行等待,有时间就绪,就进行事件的派发,这里可以同时对多个fd进行等待。
1.2 认识一下select函数
函数原型:
函数参数:
- nfds:需要监视的最大的文件描述符值 + 1
- rdset/wrset/exset:分别对应于需要检测可读文件描述符的集合,可写文件描述符的集合以及异常文件描述符的集合。其结构是使用位图形式,位图中比特位的位置表示的是文件描述符的值,比特位的值:0表示的是不关心的文件描述符,1表示的是关心的文件描述符。
- timeout:其结构为timeval,用来设置select()的等待时间
函数功能:
在多路复用中,select函数用于监控多个文件描述符,以便在这些描述符上的某些事件(比如可读,可写或者异常条件)发生时进行处理:
- 等待事件:可以等待直到一个或者多个文件描述符变得可读、可写或者发生异常
- 非阻塞监控:它允许程序在多个文件描述符之间进行选择,而不是阻塞在单一操作中
- 提高效率:可以有效地处理大量并发连接,避免了使用线程或者进程池来处理每一个连接的开销
函数返回值:
- 执行成功,则返回文件描述符状态已经改变的个数
- 如果返回0,则代表在文件描述符状态改变前已经超过timeout时间了,没有返回
- 当有错误发生时,则返回-1,错误原因存在于errno,此时参数readfds,writefds,exceptfds和timeout的值变得不可预测
错误值可能为:
- EBADF:文件描述符为无效的或者该文件已经关闭
- EINTR:此调用被信号所中断
- EINVAL:参数n为负值
- ENOMEM:核心内存不足
参数timeout的取值:
- NULL:表示select()没有timeout,select函数将一直被阻塞,直到某一个文件描述符发生了事件,即阻塞IO
- 0:仅仅检测描述符集合的状态,然后立即返回,并不等待外部事件的发生,即非阻塞IO
- 特定的时间值:如果在指定的时间段里没有事件的发生,select函数将超时返回
关于timeval结构
timeval结构用于描述一段时间长度,如果在这个时间内,需要监视的描述符没有事件发送则函数返回,返回值为0。
关于 fd_set 结构
这个结构就是一个整数数组,更严格地说是一个“位图”,使用位图中对应的位来表示要监视的文件描述符,同时提供了一组操作fd_set的接口,来比较方便的操作位图。
void FD_CLR(int fd, fd_set *set); // 用来清楚描述符组set中相关的fd的位
void FD_ISSET(int fd, fd_set *set); // 用来测试描述符组set中相关的fd的位是否为真
void FD_SET(int fd, fd_set *set); // 用来设置描述符组set中相关的fd的位
void FD_ZERO(fd_set *set); // 用来清除描述符组set中的全部位
二、select函数的执行和socket的就绪
2.1 理解select函数的执行过程
理解select模型的关键在于理解fd_set,为了举例方便,我们去fd_set的长度为1字节,fd_set中的每一个bit可以对应一个文件描述符fd,则一个字节的fd_set最大可以对应8个fd。
- 执行fd_set set;FD_ZERO(&set);则set用位表示是 0000 0000
- 如果fd = 5,执行FD_SET(fd, &set);set变为了0001 0000(第五位置为1)
- 如果再加入fd = 2,fd = 1,则set变为 0001 0011
- 执行select(6,&set, 0, 0, 0)阻塞等待
- 如果fd = 1,fd = 2上都发生了可读事件,则select返回,此时set变成了0000 0011。注意:没有事件发生的 fd = 5 则被清空了
2.2 Socket就绪条件
2.2.1 读就绪
socket内核中,接收缓冲区中的字节数大于等于
2.2.2 写就绪
2.2.3 异常就绪
三、select的特点
3.1 select的优点
可以监控的文件描述符个数取决于sizeof(fd_set)的值,这个由服务器决定是多少,我的这个服务器上sizeof(fd_set)的值的大小为512,每一个bit表示一个文件描述符,则这个服务器上支持的最大的文件描述符是 512 * 8 = 4096
将fd加入select监控集的同时,还要再次使用一个数据结构array保存放到select监控集中的fd:一是用于在select返回后,array作为源数据和fd_set进行FD_ISSET判断;二是select返回后会把以前加入的但是并没有事件发生的fd清空,则每次开始select前,都要重新从array中取得fd逐一加入(FD_ZERO最先),扫描array的同时取得fd最大值maxfd,用于select的第一个参数。
fd_set的大小可以调整,但是涉及重新编译内核
3.2 select的缺点
- 每次调用select,都需要手动设置fd集合,从接口使用角度来说是非常不便的
- 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd中很多时会很大
- 同时每次调用select,都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
- select支持的文件描述符数量太少
四、看一看代码
#pragma once
#include <iostream>
#include <string>
#include <memory>
#include "Socket.hpp"
using namespace socket_ns;
// select服务器要正确的编写,需要借助一个第三方数组来完成,保存合法的,所有的fd到数组中,方便后期批量化统一添加
class SelectServer
{
const static int N = sizeof(fd_set) * 8;
const static int defaultfd = -1;
public:
SelectServer(uint16_t port)
: _port(port),
_listensock(std::make_unique<TcpSocket>())
{
InetAddr addr("0", _port);
_listensock->BuildListenSocket(addr);
for (int i = 0; i < N; i++)
{
_fd_array[i] = defaultfd;
}
_fd_array[0] = _listensock->SockFd();
}
void AcceptClient()
{
// 我们今天只关心了读,而读有:listensock 和 nornam sockfd
InetAddr clientaddr;
int sockfd = _listensock->Accepter(&clientaddr); // 这里调用accept会不会阻塞呢??不会。因为事件已经就绪了
if (sockfd < 0)
return;
LOG(DEBUG, "Get new Link, sockfd: %d, client info %s:%d\n", sockfd, clientaddr.Ip().c_str(), clientaddr.Port());
// read/recv(sockfd); send(sockfd)?? 不能. 必须将新的fd,托管给select。如何托管呢???
// 只要将新的fd添加到辅助数组中即可。
int pos = 1;
for (; pos < N; pos++)
{
if (_fd_array[pos] == defaultfd)
break;
}
if (pos == N)
{
::close(sockfd); // sockfd->Close();
LOG(WARNING, "server is full!\n");
return;
}
else
{
_fd_array[pos] = sockfd;
LOG(DEBUG, "%d add to select array!\n", sockfd);
}
LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());
}
void ServiceIO(int pos)
{
char buffer[1024];
ssize_t n = ::recv(_fd_array[pos], buffer, sizeof(buffer) - 1, 0); // 这里读取会不会被阻塞?不会
if (n > 0)
{
buffer[n] = 0;
std::cout << "client say# " << buffer << std::endl;
std::string echo_string = "[server echo]# ";
echo_string += buffer;
::send(_fd_array[pos], echo_string.c_str(), echo_string.size(), 0);
}
else if (n == 0)
{
LOG(DEBUG, "%d is closed\n", _fd_array[pos]);
::close(_fd_array[pos]);
_fd_array[pos] = defaultfd;
LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());
}
else
{
LOG(DEBUG, "%d recv error\n", _fd_array[pos]);
::close(_fd_array[pos]);
_fd_array[pos] = defaultfd;
LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());
}
}
void HandlerEvent(fd_set &rfds)
{
for (int i = 0; i < N; i++)
{
if (_fd_array[i] == defaultfd)
continue;
if (FD_ISSET(_fd_array[i], &rfds))
{
if (_fd_array[i] == _listensock->SockFd())
{
AcceptClient();
}
else
{
// 这里不就是普通的sockfd读事件就绪了吗?
ServiceIO(i);
}
}
}
}
void Loop()
{
while (true)
{
// listensocket 等待新连接到来,等价于对方给我发送数据!我们作为读事件同一处理
// 新连接到来 等价于 读事件就绪!
// 首先要将listensock添加到select中!
fd_set rfds;
FD_ZERO(&rfds);
int max_fd = defaultfd;
for (int i = 0; i < N; i++)
{
if (_fd_array[i] == defaultfd)
continue;
FD_SET(_fd_array[i], &rfds); // 将所有合法的fd添加到rfds中
if (max_fd < _fd_array[i])
{
max_fd = _fd_array[i]; // 更新出最大的fd的值
}
}
struct timeval timeout = {0, 0};
// select 同时等待的fd,是有上限的。因为fd_set是具体的数据类型,有自己的大小!
// rfds是一个输入输出型参数,每次调用,都要对rfds进行重新设定!
int n = select(max_fd + 1, &rfds, nullptr, nullptr, /*&timeout*/ nullptr);
switch (n)
{
case 0:
LOG(INFO, "timeout, %d.%d\n", timeout.tv_sec, timeout.tv_usec);
break;
case -1:
LOG(ERROR, "select error...\n");
break;
default:
LOG(DEBUG, "Event Happen. n : %d\n", n); // 底层有一个事件就绪,select为什么会一直通知我?因为:我们没有处理!
HandlerEvent(rfds);
break;
}
}
}
std::string RfdsToString()
{
std::string fdstr;
for (int i = 0; i < N; i++)
{
if (_fd_array[i] == defaultfd)
continue;
fdstr += std::to_string(_fd_array[i]);
fdstr += " ";
}
return fdstr;
}
~SelectServer()
{
}
private:
uint16_t _port;
std::unique_ptr<Socket> _listensock;
int _fd_array[N]; // 辅助数组
};