1. 初识select
系统提供select函数来实现多路复用输入/输出模型.
-
select系统调用是用来让我们的程序监视多个文件描述符的状态变化的;
-
程序会停在select这里等待,直到被监视的文件描述符有一个或多个发生了状态改变
-
select只负责等待,可以等待多个fd,select本身无数据拷贝的能力,拷贝需要read、write来完成
2. select接口
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct tomeval *timeout);
参数:nfds:select要监视多个fd中值最大的fd+1
其余的都是输入输出型参数
timeout:nullptr:阻塞时等待,直到有一个fd有数据
struct timeval timeout={0,0},非阻塞式等待
{5,0},5s内阻塞式,超过5s非阻塞返回一次,5s内有任意fd就绪,则会直接返回timeout返回的为剩余的时间
返回值:>0:返回几就是有几个fd就绪了
=0:超时返回
<0:select调用失败
select关心的事件值有三类:读、写、异常 -- 对于任意的fd都是如此
fd_set:位图,表示文件描述符的集合
readfds:
输入:表示用户告诉内核,你要帮我关心集合readfds中的所有的fd的读事件----哪些fd上的读事件内核你要关心。比特位的位置目标是fd的数值;比特位的内容,表示是否关心。
输出:return>0时,内核告诉用户,你所关心的用户fd中,有哪些fd已经就绪了---比特位的位置表示fd的数值;比特位的内容表示哪些fd上面对应的事件已经就绪了。
通过上面的方式,让用户和内核沟通,知晓对方所关心的fd。
其余两个参数也是同样的道理。
位图操作:
void FD_CLR(int fd, fd_set *set); // 用来清除描述词组 set 中相关 fd 的位int FD_ISSET(int fd, fd_set *set); // 用来测试描述词组 set 中相关 fd 的位是否为真void FD_SET(int fd, fd_set *set); // 用来设置描述词组 set 中相关 fd的位void FD_ZERO(fd_set *set); // 用来清除描述词组set的全部位
3. 细节
- listensocksock首先交给select,listensock的链接就绪事件==读事件就绪(因为listensock是通过三次握手成功后得到的,也就是表明客户端要想服务端发消息)
- accept在没有链接时,会等待(阻塞式的),所以需要将listensock交给select,否则就跟阻塞式IO相同了
- 编写代码时需要自己维护一个存储所有可用fd的数组(_fdArray),可以同容器、数组、动态数组等实现
- fd是有上限的:sizeof(fd_set)*8 = 1024
- 每次select前,都要找到最大的fd(遍历fd数组),并将合法的fd设置大rfds中
- accept后,产生新的sock,再将其加入(加入到fd数组中未被设置过的位置,若数组满就关闭当前的sock)到fd数组中,让select来处理。
4. demo代码实现
下面的代码只考虑了写事件。
makefile
select_server: main.cc
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -f select_server
err.hpp
#pragma once
#include <iostream>
enum
{
USAGE_ERR = 1,
SOCKET_ERR,
BIND_ERR,
LISTEN_ERR
};
log.hpp
#pragma once
#include <iostream>
#include <cstring>
#include <string>
#include <stdarg.h>
#include <ctime>
#include <unistd.h>
using namespace std;
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3 // 出错可运行
#define FATAL 4 // 致命错误
const char* to_levelStr(int level)
{
switch ((level))
{
case DEBUG: return "DEBUG";
case NORMAL: return "NORMAL";
case WARNING: return "WARNING";
case ERROR: return "ERROR";
case FATAL: return "FATAL";
default: return nullptr;
}
}
void logMessage(int level, const char* format, ...) // ... 可变参数列表
{
#define NUM 1024
char logPreFix[NUM];
snprintf(logPreFix, sizeof(logPreFix), "[%s][%ld][pid: %d]", to_levelStr(level), (long int)time(nullptr), getpid());
char logContent[NUM];
va_list arg;
va_start(arg, format);
vsnprintf(logContent, sizeof(logContent), format, arg);
cout << logPreFix << logContent << endl;
}
sock.hpp
#pragma once
#include <iostream>
#include <cstring>
#include <string>
#include <ctype.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include "log.hpp"
#include "err.hpp"
class Sock
{
const static int gbacklog = 32;
public:
static int Socket()
{
// 1.创建socket文件套接字对象
int sock = socket(AF_INET, SOCK_STREAM, 0); // 第二个参数与UDP不同
if (sock < 0)
{
// 创建套接字失败
logMessage(FATAL, "created socket error!");
exit(SOCKET_ERR);
}
logMessage(NORMAL, "created socket success: %d!", sock);
int opt = 1;
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
return sock;
}
static void Bind(int sock, int port)
{
// 2.bind绑定自己的网络信息
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0)
{
logMessage(FATAL, "bind socket error!");
exit(BIND_ERR);
}
logMessage(NORMAL, "bind socket success!");
}
static void Listen(int sock)
{
// 3.设置socket 为监听状态
if (listen(sock, gbacklog) < 0) // 第二个参数backlog后面会讲 5的倍数
{
logMessage(FATAL, "listen socket error!");
exit(LISTEN_ERR);
}
logMessage(NORMAL, "listen socket success!");
}
static int Accept(int listensock, std::string *clientip, uint16_t *clientport)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int sock = accept(listensock, (struct sockaddr *)&peer, &len); // sock 和client进行通信
if (sock < 0)
logMessage(ERROR, "accept error, next!");
else
{
logMessage(NORMAL, "accept a new link success, get new sock: %d!", sock); // ?
*clientip = inet_ntoa(peer.sin_addr);
*clientport = ntohs(peer.sin_port);
}
return sock;
}
};
selectServer.hpp
#pragma once
#include <string>
#include <iostream>
#include <functional>
#include "sock.hpp"
#include "log.hpp"
#include "err.hpp"
namespace select_ns
{
static const int defaultport = 8080;
static const int fd_num = sizeof(fd_set) * 8;
static const int defaultfd = -1;
using func_t = std::function<std::string (const std::string&)>;
class SelectServer
{
public:
SelectServer(func_t func, int port = defaultport)
: _port(port), _listenSock(-1), _fdArray(nullptr), _func(func)
{
}
void Print()
{
std::cout << "fd list: ";
for (int i = 0; i < fd_num; i++)
{
if (_fdArray[i] != defaultfd)
std::cout << _fdArray[i] << " ";
}
std::cout << std::endl;
}
void Accepter(int listenSock)
{
// 走到这里accept不会阻塞 listensock套接字已经就绪了
string clientIp;
uint16_t clientPort = 0;
int sock = Sock::Accept(listenSock, &clientIp, &clientPort);
if (sock < 0)
return;
logMessage(NORMAL, "accept success [%s:%d]", clientIp.c_str(), clientPort);
// sock 我们能直接recv/read吗?--不能 整个代码 只有select有资格检测事件是否就绪
// 将新的sock交给select
// 将新的sock托管给select的本质,将sock添加到_fdArray数组中
int i;
for (i = 0; i < fd_num; i++)
{
if (_fdArray[i] != defaultfd)
continue;
else
break;
}
if (i == fd_num)
{
logMessage(WARNING, "server is full, please wait!");
close(sock);
}
else
{
_fdArray[i] = sock;
}
Print();
}
void Recver(int sock, int pos)
{
// 1.读取
// 这样读取有问题!不能保证是否读取到一个完整的报文
char buffer[1024];
ssize_t s = recv(sock, buffer, sizeof(buffer) - 1, 0); // 这里在进行recv时,不会被阻塞,因为走到这里时文件描述符已经就绪了
if (s > 0)
{
buffer[s] = 0;
logMessage(NORMAL, "client# %s", buffer);
}
else if (s == 0)
{
close(sock);
_fdArray[pos] = defaultfd;
logMessage(NORMAL, "client quit");
return;
}
else
{
close(sock);
_fdArray[pos] = defaultfd;
logMessage(ERROR, "client quit: %s", strerror(errno));
return;
}
// 2.处理request
std::string response = _func(buffer);
// 3.返回response
// write
write(sock, response.c_str(), response.size());
}
// handler event 中 不仅仅是有一个fd就绪,可能有多个
// 我们的select只处理了read
void HandlerReadEvent(fd_set &rfds)
{
for (int i = 0; i < fd_num; i++)
{
// 过滤掉非法的fd
if (_fdArray[i] == defaultfd)
continue;
// 下面的为正常的fd
// 正常的fd不一定就绪
// 目前一定是listen套接字
if (FD_ISSET(_fdArray[i], &rfds) && _fdArray[i] == _listenSock)
Accepter(_listenSock);
else
Recver(_fdArray[i], i);
}
}
void initServer()
{
_listenSock = Sock::Socket();
Sock::Bind(_listenSock, _port);
Sock::Listen(_listenSock);
// logMessage(NORMAL, "creat socket..");
_fdArray = new int[fd_num];
for (int i = 0; i < fd_num; i++)
_fdArray[i] = defaultfd;
_fdArray[0] = _listenSock; // 不变了
}
void start()
{
for (;;)
{
fd_set rfds;
FD_ZERO(&rfds);
int maxfd = _fdArray[0];
for (int i = 0; i < fd_num; i++)
{
if (defaultfd == _fdArray[fd_num])
continue;
FD_SET(_fdArray[i], &rfds); // 将合法fd全部添加到读文件描述符集中
if (maxfd < _fdArray[i])
maxfd = _fdArray[i];
}
// struct timeval timeout = {1, 0};
// int n = select(_listenSock+1, &rfds, nullptr, nullptr, &timeout);
// 一般而言 要使用select 需要程序员维护一个保存所有合法fd 的数组!
int n = select(maxfd + 1, &rfds, nullptr, nullptr, nullptr);
switch (n)
{
case 0:
logMessage(NORMAL, "timeout...");
break;
case -1:
logMessage(WARNING, "select error, code: %d, err string: %s", errno, strerror(errno));
break;
default:
// 说明有时间就绪了,目前只有一个监听事件就绪
logMessage(NORMAL, "get a new link...");
HandlerReadEvent(rfds);
break;
}
sleep(1);
// 下面为阻塞式写法
// std::string clientIp;
// uint16_t clientPort = 0;
// int sock = Sock::Accept(_listenSock, &clientIp, &clientPort);
// if(sock < 0) continue;
// // 开始进行服务器处理逻辑
}
}
~SelectServer()
{
if (_listenSock < 0)
close(_listenSock);
if (_fdArray)
delete[] _fdArray;
}
private:
int _port;
int _listenSock;
int *_fdArray;
func_t _func;
};
}
main.cc
#include "selectServer.hpp"
#include "err.hpp"
#include <memory>
using namespace std;
using namespace select_ns;
static void usage(std::string proc)
{
std::cerr << "Usage:\n\t" << proc << " prot" << std::endl;
}
std::string transaction(const string& request)
{
return request;
}
int main(int argc, char* argv[])
{
if(argc != 2)
{
usage(argv[0]);
exit(USAGE_ERR);
}
unique_ptr<SelectServer> svr(new SelectServer(transaction, atoi(argv[1])));
svr->initServer();
svr->start();
return 0;
}
实验结果如下 :
使用telnet作为客户端向服务器发消息时,收到了服务端的响应。
5. select优缺点
- select能同时等待的文件fd是有上限的,除非修改内核,否则无法解决
- select必须借助第三方数组来维护合法的fd
- select的大部分参数是输入输出型的,调用select前要重新设置所有的fd,调用之后还要检查更新所有的fd,遍历是有成本的。
- select为什么第一个参数是最大fd+1? 确定遍历范围--内核层面
- select采用位图,用户和内核之间通信,要来回进行数据拷贝,会有拷贝成本