目录
0.往期文章
1.五种IO模型介绍
概念
调用函数(非阻塞IO)
2.详解多路转接 之select
select函数介绍
设置文件描述符
写一个基于select的TCP服务器
辅助库
基于TCP的Socket封装
服务器代码
测试服务器
小结
3.详解多路转接 之poll
poll函数介绍
pollfd 结构
写一个基于poll的TCP服务器
小结
0.往期文章
Linux--应用层协议HTTP协议(http服务器构建)-CSDN博客
Linux--传输层协议UDP-CSDN博客
Linux--传输层协议TCP-CSDN博客
1.五种IO模型介绍
概念
1. 阻塞IO模型
- 特点:在阻塞IO模型中,应用程序发起一个IO请求后会一直阻塞等待操作完成,直到数据准备好或者超时才返回结果。在等待IO完成期间,应用程序会处于阻塞状态,无法执行其他任务。
- 典型应用:阻塞socket、Java BIO等。
- 优点:实现难度低,开发应用较容易。
- 缺点:不适用并发量大的应用,因为每个请求IO都会阻塞进程,需要为每个请求分配一个处理进程(线程),系统开销大。
2. 非阻塞IO模型
- 特点:应用程序发起一个IO请求后会立即返回,无需等待操作完成。应用程序需要不断轮询或者使用事件通知来检查操作是否完成。
- 典型应用:socket设置为NONBLOCK模式。
- 优点:在等待数据的过程中可以立即返回,用户线程不会被阻塞,实时性较好。
- 缺点:进程轮询调用会消耗CPU资源,且实现难度和复杂度相对较高。
3. IO多路复用/多路转接模型
- 特点:使用操作系统提供的select、poll或epoll等多路复用机制,允许应用程序同时监视多个IO事件。应用程序可以将多个IO请求注册到一个多路复用器上,然后通过轮询或者阻塞等待多路复用器通知事件的发生。
- 典型应用:JAVA7 AIO、高性能服务器应用等。
- 优点:不阻塞,数据一步到位,提高了系统的并发性能。
- 缺点:需要操作系统的底层支持,且对单个连接的处理速度可能不如其他模型。
4. 信号驱动的IO模型
- 特点:使用信号机制来实现异步IO,应用程序通过向内核注册信号处理函数来处理IO事件。当IO操作完成时,内核会发送一个信号通知应用程序,然后由应用程序在信号处理函数中处理该事件。
- 优点:相比阻塞IO和非阻塞IO更为灵活,适用于需要处理多个IO事件的场景。
- 缺点:在Linux中信号队列是有限制的,如果超过限制可能导致无法读取数据。此外,信号处理函数的执行可能会受到系统调用的限制。
5. 异步IO模型
- 特点:通过操作系统提供的异步IO接口来实现,应用程序发起一个IO请求后会立即返回,并且在操作完成后会通过回调或事件通知的方式通知应用程序。应用程序无需等待操作完成,可以继续执行其他任务。
- 典型应用:需要高并发、高性能的场景,如网络服务器、大规模并行计算等。
- 优点:真正实现了非阻塞IO,提高了系统的并发性能和吞吐量。
- 缺点:实现难度和复杂度较高,需要操作系统和应用程序的紧密配合。
前面四种,都是同步IO,因为它们都参与了IO的过程。
调用函数(非阻塞IO)
非阻塞IO
fcntl函数:一个文件描述符, 默认都是阻塞 IO,通过fcntl可以改变已打开的文件性质。
其中,
fd
参数代表欲设置的文件描述符,cmd
参数代表打算操作的指令,根据cmd
的值,fcntl
可以接受第三个参数。
- 复制一个现有的描述符(cmd=F_DUPFD) .
- 获得/设置文件描述符标记(cmd=F_GETFD 或 F_SETFD).
- 获得/设置文件状态标记(cmd=F_GETFL 或 F_SETFL).
- 获得/设置异步 I/O 所有权(cmd=F_GETOWN 或 F_SETOWN).
- 获得/设置记录锁(cmd=F_GETLK,F_SETLK 或 F_SETLKW)
我们此处只是用第三种功能, 获取/设置文件状态标记, 就可以将一个文件描述符设置为
非阻塞。下面是一个示例:
Comm.hpp
#include <iostream> #include <unistd.h> #include <fcntl.h> void SetNonBlock(int fd) { int fl = ::fcntl(fd, F_GETFL); if(fl < 0) { std::cout << "fcntl error" << std::endl; return; } ::fcntl(fd, F_SETFL, fl | O_NONBLOCK); }
fcntl
函数和F_GETFL
命令来获取与fd
关联的文件状态标志。这些标志包括文件是否以只读、只写或读写模式打开,以及是否设置了非阻塞模式等。再次调用fcntl
函数,但这次使用F_SETFL
命令来设置文件描述符的标志。它将之前获取的标志fl
与O_NONBLOCK
标志进行按位或操作,然后将结果作为新的标志集传递给fcntl
。
O_NONBLOCK
标志指定对文件描述符非阻塞,当设置了这个标志后,如果某个 I/O 操作不能立即完成,调用该操作的函数将不会使调用线程进入睡眠状态,而是立即返回一个错误,通常是EAGAIN
或EWOULDBLOCK
。Main.cc
#include <iostream> #include <cstdio> #include <unistd.h> #include "Comm.hpp" #include <sys/select.h> int main() { char buffer[1024]; SetNonBlock(0); while(true) { ssize_t n = ::read(0, buffer, sizeof(buffer)-1); if(n > 0) { buffer[n] = 0; printf("echo# %s", buffer); } else if(n == 0) // ctrl + d { printf("read done\n"); break; } else { // 如果是非阻塞,底层数据没有就绪,IO接口,会以出错形式返回 // 所以,如何区分 底层不就绪 vs 真的出错了? 根据errno错误码 if(errno == EWOULDBLOCK) { sleep(1); std::cout << "底层数据没有就绪,开始轮询检测" << std::endl; std::cout << "do other thing" << std::endl; continue; } else if(errno == EINTR)//被信号中断 { continue; } else { perror("read");//读写错误 break; } } } return 0; }
有输入的时候,就向显示器输出,没有的时候,进程可以做其他的事情。
2.详解多路转接 之select
多路转接的作用:为了等待多个fd,等该fd上面的新事件就绪(OS底层有数据了->读事件就绪;OS底层有看见了->写事件就绪了),通知程序员,事件已经就绪,可以就绪IO拷贝了!(IO = 等 + 拷贝,多路转接的作用就是在等上)
select函数介绍
定位:只负责进行等,不进行拷贝。
作用:select 系统调用是用来让我们的程序监视多个文件描述符的状态变化的;程序会停在 select 这里等待, 直到被监视的文件描述符有一个或多个发生了状态改变。参数:
- nfds:这是一个整数值,指定了被检查的文件描述符的数量。它应该设置为文件描述符集合中的最大值加1。不过,在实际应用中,这个参数常常被设置为文件描述符集合中最大的文件描述符加1,但这并不是严格要求的,因为内核会忽略大于最大文件描述符的值。
- readfds:这是一个指向
fd_set
的指针,用于指定哪些文件描述符应该被检查可读性。如果设置为NULL
,则不检查可读性。- writefds:这是一个指向
fd_set
的指针,用于指定哪些文件描述符应该被检查可写性。如果设置为NULL
,则不检查可写性。- exceptfds:这是一个指向
fd_set
的指针,用于指定哪些文件描述符应该被检查异常条件(如带外数据到达)。如果设置为NULL
,则不检查异常条件。- timeout:这是一个指向
timeval
结构的指针,指定了等待的最大时间。如果设置为NULL
,则调用将无限期阻塞,直到至少有一个文件描述符就绪。如果timeout
中的秒数和微秒数都设置为0,则select
将立即返回,而不会等待文件描述符就绪。eg:timeval timeout={5,0},表示5秒内阻塞等待,5秒过后超时;timeval timeout={0,0},非阻塞轮询struct timeval的结构体类型:
struct timeval { long tv_sec; /* seconds */ long tv_usec; /* microseconds */ };
返回值:
- 成功时,
select
返回就绪(可读、可写或异常)的文件描述符数量。- 如果在调用时没有任何文件描述符就绪,并且
timeout
指定的时间已经过去,则返回0。- 如果发生错误,则返回-1,并设置
errno
以指示错误类型。使用
fd_set:大小128字节,1024个bit位(32位机器)
fd_set
是一个位向量,表示文件描述符集,其中每一位对应一个文件描述符。使用以下宏来操作fd_set
:
FD_ZERO(fd_set *set)
:将set
中的所有位清零。FD_SET(int fd, fd_set *set)
:将set
中对应于fd
的位设置为1。FD_CLR(int fd, fd_set *set)
:将set
中对应于fd
的位清零。FD_ISSET(int fd, fd_set *set)
:如果set
中对应于fd
的位被设置,则返回非零值(真)。设置文件描述符
select可以同时监视多个文件描述符(套接字)。
此时需要先将文件描述符集中到一起。集中时也要按照监视项(接收,传输,异常)进行区分,即按照上述3种监视项分成三类。
使用fd_set数组变量执行此项操作,该数组是存有0和1的位数组。
数组是从下标0开始,最左端的位表示文件描述符0。如果该位值为1,则表示该文件描述符是监视对象。
图上显然监视对象为fd1和fd3。“是否应当通过文件描述符的数字直接将值注册到fd_set变量?”
当然不是!操作fd_set的值由如下宏来完成:
写一个基于select的TCP服务器
辅助库
用于封装和处理 IP 地址及其端口号:InetAddr.hpp
#include <iostream> #include <string> #include <sys/types.h> #include <sys/socket.h> #include <arpa/inet.h> #include <netinet/in.h> class InetAddr { private: void ToHost(const struct sockaddr_in &addr) { _port = ntohs(addr.sin_port); // _ip = inet_ntoa(addr.sin_addr); char ip_buf[32]; // inet_p to n // p: process // n: net // inet_pton(int af, const char *src, void *dst); // inet_pton(AF_INET, ip.c_str(), &addr.sin_addr.s_addr); ::inet_ntop(AF_INET, &addr.sin_addr, ip_buf, sizeof(ip_buf)); _ip = ip_buf; } public: InetAddr(const struct sockaddr_in &addr):_addr(addr) { ToHost(addr); } InetAddr() {} bool operator == (const InetAddr &addr) { return (this->_ip == addr._ip && this->_port == addr._port); } std::string Ip() { return _ip; } uint16_t Port() { return _port; } struct sockaddr_in Addr() { return _addr; } std::string AddrStr() { return _ip + ":" + std::to_string(_port); } ~InetAddr() { } private: std::string _ip; uint16_t _port; struct sockaddr_in _addr; };
日志库:Log.hpp
#include <iostream> #include <sys/types.h> #include <unistd.h> #include <ctime> #include <cstdarg> #include <fstream> #include <cstring> #include <pthread.h> #include "LockGuard.hpp" namespace log_ns { enum { DEBUG = 1, INFO, WARNING, ERROR, FATAL }; std::string LevelToString(int level) { switch (level) { case DEBUG: return "DEBUG"; case INFO: return "INFO"; case WARNING: return "WARNING"; case ERROR: return "ERROR"; case FATAL: return "FATAL"; default: return "UNKNOWN"; } } std::string GetCurrTime() { time_t now = time(nullptr); struct tm *curr_time = localtime(&now); char buffer[128]; snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d", curr_time->tm_year + 1900, curr_time->tm_mon + 1, curr_time->tm_mday, curr_time->tm_hour, curr_time->tm_min, curr_time->tm_sec); return buffer; } class logmessage { public: std::string _level; pid_t _id; std::string _filename; int _filenumber; std::string _curr_time; std::string _message_info; }; #define SCREEN_TYPE 1 #define FILE_TYPE 2 const std::string glogfile = "./log.txt"; pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER; // log.logMessage("", 12, INFO, "this is a %d message ,%f, %s hellwrodl", x, , , ); class Log { public: Log(const std::string &logfile = glogfile) : _logfile(logfile), _type(SCREEN_TYPE) { } void Enable(int type) { _type = type; } void FlushLogToScreen(const logmessage &lg) { printf("[%s][%d][%s][%d][%s] %s", lg._level.c_str(), lg._id, lg._filename.c_str(), lg._filenumber, lg._curr_time.c_str(), lg._message_info.c_str()); } void FlushLogToFile(const logmessage &lg) { std::ofstream out(_logfile, std::ios::app); if (!out.is_open()) return; char logtxt[2048]; snprintf(logtxt, sizeof(logtxt), "[%s][%d][%s][%d][%s] %s", lg._level.c_str(), lg._id, lg._filename.c_str(), lg._filenumber, lg._curr_time.c_str(), lg._message_info.c_str()); out.write(logtxt, strlen(logtxt)); out.close(); } void FlushLog(const logmessage &lg) { // 加过滤逻辑 --- TODO LockGuard lockguard(&glock); switch (_type) { case SCREEN_TYPE: FlushLogToScreen(lg); break; case FILE_TYPE: FlushLogToFile(lg); break; } } void logMessage(std::string filename, int filenumber, int level, const char *format, ...) { logmessage lg; lg._level = LevelToString(level); lg._id = getpid(); lg._filename = filename; lg._filenumber = filenumber; lg._curr_time = GetCurrTime(); va_list ap; va_start(ap, format); char log_info[1024]; vsnprintf(log_info, sizeof(log_info), format, ap); va_end(ap); lg._message_info = log_info; // 打印出来日志 FlushLog(lg); } ~Log() { } private: int _type; std::string _logfile; }; Log lg; #define LOG(Level, Format, ...) \ do \ { \ lg.logMessage(__FILE__, __LINE__, Level, Format, ##__VA_ARGS__); \ } while (0) #define EnableScreen() \ do \ { \ lg.Enable(SCREEN_TYPE); \ } while (0) #define EnableFILE() \ do \ { \ lg.Enable(FILE_TYPE); \ } while (0) };
给日志库上锁,保证线程安全:LockGuard.hpp
#include <pthread.h> class LockGuard { public: LockGuard(pthread_mutex_t *mutex):_mutex(mutex) { pthread_mutex_lock(_mutex); } ~LockGuard() { pthread_mutex_unlock(_mutex); } private: pthread_mutex_t *_mutex; };
基于TCP的Socket封装
使得Socket的使用更加面向对象。
#include <iostream> #include <cstring> #include <functional> #include <unistd.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <sys/wait.h> #include <pthread.h> #include <memory> #include "Log.hpp" #include "InetAddr.hpp" //以下是对socket的封装,方便面向对象式的使用socket namespace socket_ns { using namespace log_ns; class Socket; using SockSPtr = std::shared_ptr<Socket>;//Socket是虚基类,实际上是拿TcpSocket //定义的对象 enum//创建失败的常量 { SOCKET_ERROR = 1, BIND_ERROR, LISTEN_ERR }; const static int gblcklog = 8;//监听队列默认大小。 // 模版方法模式 class Socket { public: virtual void CreateSocketOrDie() = 0; virtual void CreateBindOrDie(uint16_t port) = 0; virtual void CreateListenOrDie(int backlog = gblcklog) = 0; virtual SockSPtr Accepter(InetAddr *cliaddr) = 0; virtual bool Conntecor(const std::string &peerip, uint16_t peerport) = 0; virtual int Sockfd() = 0; virtual void Close() = 0; virtual ssize_t Recv(std::string *out) = 0;//进行读取 virtual ssize_t Send(const std::string &in) = 0;//进行发送 public: void BuildListenSocket(uint16_t port)//创建监听套接字 { CreateSocketOrDie(); CreateBindOrDie(port); CreateListenOrDie(); } //创建客户端套接字 bool BuildClientSocket(const std::string &peerip, uint16_t peerport) { CreateSocketOrDie(); return Conntecor(peerip, peerport); } // void BuildUdpSocket() // {} }; class TcpSocket : public Socket { public: TcpSocket() { } //监听套接字初始化/构造函数式的初始化 TcpSocket(int sockfd) : _sockfd(sockfd) { } ~TcpSocket() { } void CreateSocketOrDie() override { // 1. 创建socket _sockfd = ::socket(AF_INET, SOCK_STREAM, 0); if (_sockfd < 0) { LOG(FATAL, "socket create error\n"); exit(SOCKET_ERROR); } LOG(INFO, "socket create success, sockfd: %d\n", _sockfd); // 3 } void CreateBindOrDie(uint16_t port) override//bind { struct sockaddr_in local; memset(&local, 0, sizeof(local)); local.sin_family = AF_INET; local.sin_port = htons(port); local.sin_addr.s_addr = INADDR_ANY; // 2. bind sockfd 和 Socket addr if (::bind(_sockfd, (struct sockaddr *)&local, sizeof(local)) < 0) { LOG(FATAL, "bind error\n"); exit(BIND_ERROR); } LOG(INFO, "bind success, sockfd: %d\n", _sockfd); // 3 } //监听 void CreateListenOrDie(int backlog) override { // 3. 因为tcp是面向连接的,tcp需要未来不断地能够做到获取连接 if (::listen(_sockfd, gblcklog) < 0) { LOG(FATAL, "listen error\n"); exit(LISTEN_ERR); } LOG(INFO, "listen success\n"); } //方便获取客户端地址,accept获取一个新的文件描述符 //而该文件描述符本质就是ip+端口号 //之前我们使用文件描述符都是面向过程,都是作为函数参数进行传递的 //我们需要面向对象的使用套接字,我们将得到的IO文件描述符设置进套接字里面 //返回该套接字 //using SockSPtr = std::shared_ptr<Socket>;//Socket是虚基类,实际上是拿TcpSocket //定义的对象 SockSPtr Accepter(InetAddr *cliaddr) override { struct sockaddr_in client; socklen_t len = sizeof(client); // 4. 获取新连接:得到一个新的文件描述符,得到新的客户端 int sockfd = ::accept(_sockfd, (struct sockaddr *)&client, &len); if (sockfd < 0) { LOG(WARNING, "accept error\n"); return nullptr; } *cliaddr = InetAddr(client); LOG(INFO, "get a new link, client info : %s, sockfd is : %d\n", cliaddr->AddrStr().c_str(), sockfd); return std::make_shared<TcpSocket>(sockfd); // C++14 } //连接目标服务器(是否成功) //客户端ip和端口号 bool Conntecor(const std::string &peerip, uint16_t peerport) override { struct sockaddr_in server; memset(&server, 0, sizeof(server)); server.sin_family = AF_INET; server.sin_port = htons(peerport); //将IPv4地址的字符串形式转换为网络字节顺序的二进制形式, //并将其存储在server.sin_addr中 ::inet_pton(AF_INET, peerip.c_str(), &server.sin_addr); int n = ::connect(_sockfd, (struct sockaddr *)&server, sizeof(server)); if (n < 0) { return false; } return true; } int Sockfd()//文件描述符 { return _sockfd; } void Close() { if (_sockfd > 0) { ::close(_sockfd); } } ssize_t Recv(std::string *out) override//读到的消息 { char inbuffer[4096]; //从sockfd中读 ssize_t n = ::recv(_sockfd, inbuffer, sizeof(inbuffer) - 1, 0); if (n > 0) { inbuffer[n] = 0; //这里不能是=,不可以覆盖式的读取,因为每一次可能并不是读取到一条完整的报文 // "len"\r\n // "len"\r\n"{json}"\r\n //向上面的情况如果覆盖的读取将读取不到完整的报文了 //所以要用+= *out += inbuffer; } return n; } ssize_t Send(const std::string &in) override { return ::send(_sockfd, in.c_str(), in.size(), 0); } private: int _sockfd; // 可以是listensock,普通socketfd }; // class UdpSocket : public Socket // {}; } // namespace socket_n
代码逻辑:
- 命名空间和类定义:
- 定义了一个命名空间
socket_ns
,用于封装Socket相关的类和函数。- 定义了一个基类
Socket
,它是一个抽象类,提供了Socket操作的基本接口,如创建、绑定、监听、接收连接、发送和接收数据等。- 定义了一个派生类
TcpSocket
,它继承自Socket
类,并实现了所有虚函数,提供了TCP Socket的具体实现。- Socket基类:
- 定义了多个纯虚函数,包括创建Socket、绑定、监听、接受连接、连接服务器、获取文件描述符、关闭Socket、接收和发送数据等。
- 提供了一个构建监听Socket的成员函数
BuildListenSocket
,它依次调用创建Socket、绑定和监听函数来初始化监听Socket。- 提供了一个构建客户端Socket的成员函数
BuildClientSocket
,它调用创建Socket和连接服务器函数来初始化客户端Socket。- TcpSocket类:
- 实现了
Socket
类中的所有纯虚函数,提供了TCP Socket的具体实现。- 在构造函数中,可以初始化一个已存在的文件描述符,或者通过调用
CreateSocketOrDie
函数创建一个新的Socket文件描述符。CreateSocketOrDie
函数用于创建一个新的Socket文件描述符。CreateBindOrDie
函数用于将Socket绑定到一个指定的端口上。CreateListenOrDie
函数用于将Socket设置为监听模式,以便接受连接。Accepter
函数用于接受一个新的连接,并返回一个表示该连接的TcpSocket
对象。Conntecor
函数用于连接到一个指定的服务器。Sockfd
函数用于获取Socket的文件描述符。Close
函数用于关闭Socket。Recv
函数用于从Socket接收数据。Send
函数用于向Socket发送数据。- 日志和错误处理:
- 使用了自定义的日志系统(
log_ns
命名空间中的LOG
宏)来记录日志和错误信息。- 在发生错误时,使用
exit
函数终止程序,并传递一个错误码。- 内存管理:
- 使用了智能指针(
std::shared_ptr
)来管理TcpSocket
对象的内存,以避免内存泄漏。服务器代码
该服务器仅用于对select应用的测试, 没有上层逻辑,不完整。
#pragma once #include <iostream> #include <sys/select.h> #include "Socket.hpp" #include "Log.hpp" #include "InetAddr.hpp" using namespace socket_ns; class SelectServer { //位图有多少个bit位,就定义多大 const static int gnum = sizeof(fd_set) * 8; const static int gdefaultfd = -1; public: SelectServer(uint16_t port) : _port(port), _listensock(std::make_unique<TcpSocket>()) { _listensock->BuildListenSocket(_port); } void InitServer() { for (int i = 0; i < gnum; i++) { fd_array[i] = gdefaultfd;//初始化辅助数组 } fd_array[0] = _listensock->Sockfd(); // 默认直接添加listensock到数组中 } // 处理新连接的 void Accepter() { // 我们叫做连接事件就绪,等价于读事件就绪 InetAddr addr; int sockfd = _listensock->Accepter(&addr); // 会不会被阻塞!一定不会! if (sockfd > 0) { LOG(DEBUG, "get a new link, client info %s:%d\n", addr.Ip().c_str(), addr.Port()); // 已经获得了一个新的sockfd // 接下来我们可以读取吗?绝对不能读!读取的时候,条件不一定满足 // 谁最清楚底层fd的数据是否就绪了呢??通过select! // select 为什么等待的fd会越来越多? //listensockt在获取新链接的同时,要把新链接添加到select当中 // 想办法把新的fd添加给select,由select统一进行监管。怎么做到?? // 只要将新的fd,添加到fd_array中即可! bool flag = false; for (int pos = 1; pos < gnum; pos++) { if (fd_array[pos] == gdefaultfd) { flag = true; fd_array[pos] = sockfd;//添加fd LOG(INFO, "add %d to fd_array success!\n", sockfd); break; } } if (!flag)//表示没有缺省值,已经添加满了 { LOG(WARNING, "Server Is Full!\n"); ::close(sockfd);//select无法监管,关闭fd } } } // 处理普通的fd就绪的 void HandlerIO(int i) { // 下面的读写对吗? // 普通的文件描述符,正常的读写 char buffer[1024]; ssize_t n = ::recv(fd_array[i], buffer, sizeof(buffer) - 1, 0); // 这里读取会阻塞吗?不会,已经就绪 if (n > 0) { buffer[n] = 0; std::cout << "client say# " << buffer << std::endl; std::string content = "<html><body><h1>hello Linux</h1></body></html>"; std::string echo_str = "HTTP/1.0 200 OK\r\n"; echo_str += "Content-Type: text/html\r\n"; echo_str += "Content-Length: " + std::to_string(content.size()) + "\r\n\r\n"; echo_str += content; // echo_str += buffer; //一个fd被新的accept创建的时候,读写缓冲区基本都是空的,所以在这可以直接向 //fd中写 ::send(fd_array[i], echo_str.c_str(), echo_str.size(), 0); // 临时方案 } else if (n == 0)//连接关闭了 { LOG(INFO, "client quit...\n"); // 关闭fd ::close(fd_array[i]); // select 不要在关心这个fd了 fd_array[i] = gdefaultfd; } else//读出错了 { LOG(ERROR, "recv error\n"); // 关闭fd ::close(fd_array[i]); // select 不要在关心这个fd了 fd_array[i] = gdefaultfd; } } // 一定会存在大量的fd就绪,可能是普通sockfd,也可能是listensockfd void HandlerEvent(fd_set &rfds) { // 事件派发 for (int i = 0; i < gnum; i++) { if (fd_array[i] == gdefaultfd) continue; // fd一定是合法的fd // 合法的fd不一定就绪, 判断fd是否就绪? if (FD_ISSET(fd_array[i], &rfds))//看看文件描述符在不在rfds中 { // 读事件就绪 // 1. listensockfd 2. normal sockfd就绪? if (_listensock->Sockfd() == fd_array[i]) { Accepter(); } else { HandlerIO(i); } } } } void Loop() { while (true) { // 1. 文件描述符进行初始化 fd_set rfds;//读文件fd集 FD_ZERO(&rfds);//将set中的所有位清零 int max_fd = gdefaultfd; // 2. 合法的fd 添加到rfds集合中 for (int i = 0; i < gnum; i++) { if (fd_array[i] == gdefaultfd) continue; FD_SET(fd_array[i], &rfds); // 2.1 更新出最大的文件fd的值 if (max_fd < fd_array[i]) { max_fd = fd_array[i]; } } struct timeval timeout = {30, 0};//超时时间 // _listensock->Accepter();// 不能,listensock && accept 我们把他也看做IO类的函数。 // 只关心新链接到来,等价于读事件就绪! // 只关心读事件,监控监听套接字(socket)的读事件->是否有新链接 int n = ::select(max_fd + 1, &rfds, nullptr, nullptr, nullptr /*&timeout*/); // 临时 switch (n) { case 0://服务器select超时 //timeout.tv_sec:这个成员变量表示超时时间中的秒数部分 //timeout.tv_usec:这个成员变量表示超时时间中的微秒数部分。 LOG(DEBUG, "time out, %d.%d\n", timeout.tv_sec, timeout.tv_usec); break; case -1: LOG(ERROR, "select error\n"); break; default: //LOG(DEBUG, "time out, %d.%d\n", timeout.tv_sec, timeout.tv_usec); LOG(INFO, "haved event ready, n : %d\n", n); // 如果事件就绪,但是不处理,select会一直通知我,直到我处理了! HandlerEvent(rfds);//处理事件 PrintDebug(); // sleep(1); break; } } } void PrintDebug()//打印出所有合法的fd { std::cout << "fd list: "; for (int i = 0; i < gnum; i++) { if (fd_array[i] == gdefaultfd) continue; std::cout << fd_array[i] << " "; } std::cout << "\n"; } ~SelectServer() {} private: uint16_t _port; std::unique_ptr<Socket> _listensock; // select要正常工作,需要借助一个辅助数组,来保存所有合法fd //方便对rfds进行重置 int fd_array[gnum]; };
该代码实现了一个基于
select
方法的TCP服务器,其主要逻辑可以分为以下几个部分:
- 初始化服务器:
- 在构造函数中,通过
_listensock
成员(一个std::unique_ptr<TcpSocket>
)创建一个监听套接字,并绑定到指定的端口上。InitServer
方法用于初始化一个固定大小的fd_array
数组,用于存储所有当前被select
监控的文件描述符(包括监听套接字和已接受的客户端连接)。监听套接字的文件描述符被直接放入数组的第一个位置。- 接受新连接:
Accepter
方法用于处理监听套接字上的新连接。当有新连接到来时,它会接受这个连接,并将新连接的文件描述符添加到fd_array
数组中(如果有空位的话)。如果没有空位,则关闭新连接的文件描述符。- 处理IO事件:
HandlerIO
方法用于处理普通文件描述符(即客户端连接)的就绪事件。它读取客户端发送的数据,并回复一个简单的HTTP响应。如果读取到0字节(表示连接关闭),或者读取出错,则关闭文件描述符,并从fd_array
中移除它。- 事件循环:
Loop
方法是服务器的主循环,它不断使用select
函数来等待文件描述符的就绪事件。每次循环,它都会重新构建rfds
集合,只包含当前fd_array
中有效的文件描述符。然后,它调用select
等待这些文件描述符的就绪事件。- 当
select
返回时,HandlerEvent
方法被调用以处理就绪的事件。如果是监听套接字就绪,则调用Accepter
接受新连接;如果是普通文件描述符就绪,则调用HandlerIO
处理IO事件。- 调试和日志:
PrintDebug
方法用于打印当前所有被select
监控的文件描述符,以便于调试。- 使用
LOG
宏进行日志记录,帮助追踪服务器的运行状态。- 资源管理:
- 使用
std::unique_ptr<Socket>
自动管理监听套接字的生命周期。- 在
Accepter
和HandlerIO
中,如果无法将新连接添加到fd_array
或遇到读取错误,会关闭相应的文件描述符,并从fd_array
中移除它。测试服务器
#include "SelectServer.hpp" #include <memory> int main(int argc, char *argv[]) { if (argc != 2) { std::cerr << "Usage: " << argv[0] << " local-port" << std::endl; exit(0); } uint16_t port = std::stoi(argv[1]); EnableScreen(); std::unique_ptr<SelectServer> svr = std::make_unique<SelectServer>(port); svr->InitServer(); svr->Loop(); return 0; }
- 使用
std::make_unique<SelectServer>(port)
创建一个SelectServer
类型的std::unique_ptr
智能指针svr
,并将命令行参数指定的端口号传递给SelectServer
的构造函数。std::make_unique
是一个C++14引入的函数模板,用于创建并返回一个拥有给定类型对象的std::unique_ptr
。- 调用
svr->InitServer()
初始化服务器。这个函数的具体实现应该包括设置监听端口、创建socket等准备工作。- 调用
svr->Loop()
进入服务器的事件循环。在这个循环中,服务器将等待并处理客户端的连接请求、接收数据、发送响应等。使用浏览器访问,服务器收到请求,并处理返回。通过
select
方法,它能够在单个线程中高效地管理多个客户端连接。然而,需要注意的是,由于fd_array
的大小是固定的,这限制了服务器能够同时处理的客户端连接数量。在实际应用中,可能需要采用更高级的多路复用技术(如poll
、epoll
)或引入线程池来处理更多的并发连接。小结
特点
- 可监控的文件描述符个数取决于 sizeof(fd_set)的值. 我这边服务器上sizeof(fd_set)= 512, 每 bit 表示一个文件描述符, 则我服务器上支持的最大文件描述符是 512*8=4096.
- 将 fd 加入 select 监控集的同时, 还要再使用一个数据结构 array 保存放到 select监控集中的 fd:一是用于再 select 返回后, array 作为源数据和 fd_set 进行 FD_ISSET 判断;二是 select 返回后会把以前加入的但并无事件发生的 fd 清空, 则每次开始select 前都要重新从 array 取得 fd 逐一加入(FD_ZERO 最先), 扫描 array 的同时取得 fd 最大值 maxfd, 用于 select 的第一个参数。
缺点
- 每次调用 select, 都需要手动设置 fd 集合, 从接口使用角度来说也非常不便.
- 每次调用 select, 都需要把 fd 集合从用户态拷贝到内核态, 这个开销在 fd 很多时会很大
- 同时每次调用 select 都需要在内核遍历传递进来的所有 fd, 这个开销在 fd 很多时也很大
- select 支持的文件描述符数量太小
3.详解多路转接 之poll
poll函数介绍
定位:只负责进行等,不进行拷贝。
作用:与select一样,等待多个fd,事件一旦就绪,就进行IO
fds
是一个指向pollfd
结构数组的指针,每个pollfd
结构都指定了一个要监视的文件描述符和感兴趣的事件。nfds
是数组fds
中元素的数量,即要监视的文件描述符的数量。timeout
指定了函数等待 I/O 事件发生的超时时间(以毫秒为单位)。如果timeout
为-1
,则poll
将无限期地等待,直到至少有一个文件描述符就绪;如果timeout
为0
,为非阻塞IO,poll
将立即返回,不会等待任何文件描述符就绪。返回值
- 正整数(>0):
- 表示在调用期间,至少有一个文件描述符的状态发生了指定的变化(如可读、可写或出现错误)。具体地说,这个正整数表示状态发生变化的文件描述符的数量。此时,调用者需要通过检查
pollfd
结构体数组的revents
字段来确定哪些文件描述符的状态发生了变化。- 0:
- 表示在指定的超时时间内,没有任何文件描述符的状态发生变化。这通常意味着所有被监控的文件描述符都处于非就绪状态,或者指定的超时时间已经到达。
- -1:
- 表示
poll
函数调用过程中发生了错误。此时,可以通过检查全局变量errno
来获取具体的错误原因。常见的错误包括无效的文件描述符、系统资源不足等。pollfd 结构
不同于 select 使用三个位图来表示三个 fdset 的方式, poll 使用一个 pollfd 的指针实现
pollfd
结构体用于指定要监视的文件描述符和事件:struct pollfd { int fd; /* 文件描述符 */ short events; /* 感兴趣的事件 */ short revents; /* 返回的事件 */ };
fd
是要监视的文件描述符。events
是请求监视的事件集合,可以通过位或操作组合多个事件,如POLLIN
(有数据可读)、POLLOUT
(写操作不再阻塞)等。revents
是由poll
函数返回时设置的事件集合,表示在fd
上实际发生了哪些事件。events 和 revents 的取值:每个事件都是宏
使用 poll
使用
poll
时,你首先需要准备一个pollfd
结构体数组,每个元素都指定了要监视的文件描述符和感兴趣的事件。然后,调用poll
函数并传入这个数组。poll
函数会阻塞等待(除非timeout
指定为 0),直到至少有一个文件描述符就绪,或者超时发生。最后,你可以通过检查每个pollfd
结构体的revents
字段来确定哪些文件描述符就绪,并据此执行相应的操作。1.用户告诉内核,你要我关心哪些fd(设置参数fd)上的哪些事件(设置参数events);
2.内核告诉用户,你要我关心哪些fd(设置参数fd)上的哪些事件(设置参数revents);
因为接口设置的好,就无需对fd的事件进行重新设定了
优点和缺点
优点:
- 相比
select
,poll
没有文件描述符数量的硬限制(尽管实际上仍然受到系统资源的限制)。poll
的接口更加清晰和灵活,可以指定对每个文件描述符感兴趣的具体事件。缺点:
- 当监视的文件描述符数量非常多时,
poll
的效率可能会下降,因为它仍然需要遍历整个pollfd
数组来检查哪些文件描述符就绪。poll
的可移植性可能不如select
,因为并非所有系统都提供了poll
函数。
写一个基于poll的TCP服务器
该服务器实现思路与select一样,只是用了poll函数:
#include <iostream> #include <poll.h> #include "Socket.hpp" #include "Log.hpp" #include "InetAddr.hpp" using namespace socket_ns; class PollServer { const static int gnum = sizeof(fd_set) * 8; const static int gdefaultfd = -1; public: PollServer(uint16_t port) : _port(port), _listensock(std::make_unique<TcpSocket>()) { _listensock->BuildListenSocket(_port); } void InitServer() { for (int i = 0; i < gnum; i++) { fd_events[i].fd = gdefaultfd; fd_events[i].events = 0; fd_events[i].revents = 0; } fd_events[0].fd = _listensock->Sockfd(); // 默认直接添加listensock到数组中 fd_events[0].events = POLLIN;//关心读事件 } // 处理新连接的 void Accepter() { // 我们叫做连接事件就绪,等价于读事件就绪 InetAddr addr; int sockfd = _listensock->Accepter(&addr); // 会不会被阻塞!一定不会! if (sockfd > 0) { LOG(DEBUG, "get a new link, client info %s:%d\n", addr.Ip().c_str(), addr.Port()); // 已经获得了一个新的sockfd // 接下来我们可以读取吗?绝对不能读!读取的时候,条件不一定满足 // 谁最清楚底层fd的数据是否就绪了呢??通过select! // 想办法把新的fd添加给select,由select统一进行监管。怎么做到?? // select 为什么等待的fd会越来越多?? // 只要将新的fd,添加到fd_array中即可! bool flag = false; for (int pos = 1; pos < gnum; pos++) { if (fd_events[pos].fd == gdefaultfd) { flag = true; fd_events[pos].fd = sockfd; fd_events[pos].events = POLLIN; LOG(INFO, "add %d to fd_array success!\n", sockfd); break; } } if (!flag) { LOG(WARNING, "Server Is Full!\n"); ::close(sockfd); // 扩容 // 添加 } } } // 处理普通的fd就绪的 void HandlerIO(int i) { // 下面的读写对吗? // 普通的文件描述符,正常的读写 char buffer[1024]; ssize_t n = ::recv(fd_events[i].fd, buffer, sizeof(buffer) - 1, 0); // 这里读取会阻塞吗?不会 if (n > 0) { buffer[n] = 0; std::cout << "client say# " << buffer << std::endl; std::string content = "<html><body><h1>hello bite</h1></body></html>"; std::string echo_str = "HTTP/1.0 200 OK\r\n"; echo_str += "Content-Type: text/html\r\n"; echo_str += "Content-Length: " + std::to_string(content.size()) + "\r\n\r\n"; echo_str += content; // echo_str += buffer; ::send(fd_events[i].fd, echo_str.c_str(), echo_str.size(), 0); // 临时方案 } else if (n == 0) { LOG(INFO, "client quit...\n"); // 关闭fd ::close(fd_events[i].fd); // select 不要在关心这个fd了 fd_events[i].fd = gdefaultfd; fd_events[i].events = 0; fd_events[i].revents = 0; } else { LOG(ERROR, "recv error\n"); // 关闭fd ::close(fd_events[i].fd); // select 不要在关心这个fd了 fd_events[i].fd = gdefaultfd; fd_events[i].events = 0; fd_events[i].revents = 0; } } // 一定会存在大量的fd就绪,可能是普通sockfd,也可能是listensockfd void HandlerEvent() { // 事件派发 for (int i = 0; i < gnum; i++) { if (fd_events[i].fd == gdefaultfd) continue; // fd一定是合法的fd // 合法的fd不一定就绪, 判断fd是否就绪? if (fd_events[i].revents & POLLIN) { // 读事件就绪 // 1. listensockfd 2. normal sockfd就绪? if (_listensock->Sockfd() == fd_events[i].fd) { Accepter(); } else { HandlerIO(i); } } } } void Loop() { int timeout = 1000; while (true) { // _listensock->Accepter();// 不能,listensock && accept 我们把他也看做IO类的函数。只关心新链接到来,等价于读事件就绪! int n = ::poll(fd_events, gnum, timeout); // 临时 switch (n) { case 0: LOG(DEBUG, "time out\n"); break; case -1: LOG(ERROR, "poll error\n"); break; default: // LOG(DEBUG, "time out, %d.%d\n", timeout.tv_sec, timeout.tv_usec); LOG(INFO, "haved event ready, n : %d\n", n); // 如果事件就绪,但是不处理,select会一直通知我,直到我处理了! HandlerEvent(); PrintDebug(); // sleep(1); break; } } } void PrintDebug() { std::cout << "fd list: "; for (int i = 0; i < gnum; i++) { if (fd_events[i].fd == gdefaultfd) continue; std::cout << fd_events[i].fd << " "; } std::cout << "\n"; } ~PollServer() {} private: uint16_t _port; std::unique_ptr<Socket> _listensock; // 1. poll要正常工作,需要借助一个辅助数组,来保存所有合法fd struct pollfd fd_events[gnum]; };
- 构造函数 (
PollServer(uint16_t port)
):
- 初始化服务器端口和监听套接字(
_listensock
),并绑定和监听该端口。- 初始化 (
InitServer
):
- 准备
poll
所需的fd_events
数组,将监听套接字(_listensock
)的文件描述符添加到数组中,并设置其事件为POLLIN
(表示对读事件感兴趣)。- 接受新连接 (
Accepter
):
- 当监听套接字的读事件就绪时(即有新连接到来),接受该连接,并尝试将新连接的文件描述符添加到
fd_events
数组中(如果数组未满)。- 如果数组已满,则关闭新连接并打印警告信息。
- 处理IO事件 (
HandlerIO
):
- 对除监听套接字外的其他文件描述符(即已连接的客户端套接字)的读事件进行处理。
- 读取客户端发送的数据,并回显一个简单的HTTP响应。
- 如果读取到EOF(
n == 0
),则关闭该连接,并从fd_events
数组中移除其文件描述符。- 如果读取发生错误,则同样关闭连接并移除其文件描述符。
- 处理事件 (
HandlerEvent
):
- 遍历
fd_events
数组,检查哪些文件描述符的就绪事件(revents
)与期望的事件(events
)相匹配。- 对于监听套接字的读就绪事件,调用
Accepter
方法接受新连接。- 对于其他套接字的读就绪事件,调用
HandlerIO
方法处理数据。- 主循环 (
Loop
):
- 使用
poll
函数等待文件描述符集合中的任何文件描述符就绪。- 根据
poll
的返回值(就绪的文件描述符数量),调用HandlerEvent
方法处理就绪的事件。- 如果
poll
超时,则打印超时信息。- 如果
poll
调用失败,则打印错误信息。- 打印调试信息 (
PrintDebug
):
- 打印当前
fd_events
数组中所有非默认(非-1
)文件描述符的值,用于调试目的。小结
虽然poll能 挂的fd没有上限,但是poll的底层,也需要遍历所有的fd,因此不够高效,为了解决这个问题,就有了epoll。
· 请看下篇文章Linux——IO模型_多路转接(epoll)-CSDN博客