poll
- 1.poll初始
- 2.poll函数接口
- 3.poll服务器
- 4.poll的优点缺点
点赞👍👍收藏🌟🌟关注💖💖
你的支持是对我最大的鼓励,我们一起努力吧!😃😃
1.poll初始
poll也是一种linux中多路转接的方案。它所对应的多路转接方案主要是解决select两个问题。
- select的文件描述符有上限的问题
- select每次都要重新设置关心的fd
下面通过poll接口来认识它是怎么解决select的问题的。
2.poll函数接口
struct pollfd * fds:这里可以把它想象一个动态数组、数组或者new/malloc出来的结构体数组
nfds_t nfds:代表这个数组的长度
int timeout:纯输入型,时间单位ms
- 大于0:在timeout以内 阻塞,超过timeout非阻塞返回一次
- 等于0 :非阻塞
- 小于<0:阻塞
这个和select一模一样的意思。用起来更简单了。
返回值:同select一模一样
- 大于0:表示有几个fd就绪了
- 等于0:表示超时了
- 小于0:表示poll等待失败了
poll的作用和select一模一样:只负责等待!
这个struct pollfd 结构体 在传给poll表示 用户->内核:
int fd:你要关心一下这个fd哦
short events:关心的是这个fd的什么事件。我们把对应的事件设置进events里
输入看:fd+events
当poll返回时这个struct pollfd 结构体 表示内核->用户:
你要关心的fd上面的events中有那些事件已经就绪啦
short revents:就绪事件由revents返回
输出看:fd+revents
很显然这种设计解决了这样的问题:
- 输入输出分离!
现在,用户->内核,内核->用户,events和revents的分离!以前select就用一张位图表示不同含义,因为输入输出分离了所以决定了poll不需要对参数进行重新设定
events和revents类型是整数,对应的事件如下:
其中对我们来说常用的是POLLIN、POLLOUT、POLLERR ,这些都是大写的宏每一个占一个比特位,不同比特位表示不同事件。
所以用户->内核,只要将events设置成要关心的宏值,那么操作系统就帮我们进行关心了。当操作系统返回时只要把revents设置成对应的宏值,不就把那些事件就绪不就告诉我们了吗。
因为它的类型是short而没有用操作系统自己封装的各种各样的结构体,所以对于事件的设计,我们自己用户检测事件有没有设置或者就绪一定要由我们自己来做,按位与,按位或这样的操作。
- select等待fd有上限的问题
struct pollfd *fds不是一个数组吗,nfds_t nfds不就是该数组大小也就是上限吗,你怎么说poll解决了select等待fd上限的问题?
select是一个具体的数据类型fd_set,既然是一个具体的类型那就直接决定了数据类型大小只能由你的编译环境自己定,今天不一样了,因为这个数组由我们自己说的算!
3.poll服务器
前面不是写了select服务器吗,现在我们把它改成poll服务器
错误码封装
#pragma once
enum
{
USAGG_ERR = 1,
SOCKET_ERR,
BIND_ERR,
LISTEN_ERR
};
日志封装
#pragma once
#include<iostream>
#include<string>
#include<stdio.h>
#include <cstdarg>
#include<ctime>
#include<sys/types.h>
#include<unistd.h>
#include<fstream>
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
#define LOG_NORMAL "log.txt"
#define LOG_ERR "log.error"
const char* level_to_string(int level)
{
switch(level)
{
case DEBUG: return "DEBUG";
case NORMAL: return "NORMAL";
case WARNING: return "WARNING";
case ERROR: return "ERROR";
case FATAL: return "FATAL";
}
}
//时间戳变成时间
char* timeChange()
{
time_t now=time(nullptr);
struct tm* local_time;
local_time=localtime(&now);
static char time_str[1024];
snprintf(time_str,sizeof time_str,"%d-%d-%d %d-%d-%d",local_time->tm_year + 1900,\
local_time->tm_mon + 1, local_time->tm_mday,local_time->tm_hour, \
local_time->tm_min, local_time->tm_sec);
return time_str;
}
void logMessage(int level,const char* format,...)
{
//[日志等级] [时间戳/时间] [pid] [message]
//[WARNING] [2024-3-21 10-46-03] [123] [创建sock失败]
#define NUM 1024
//获取时间
char* nowtime=timeChange();
char logprefix[NUM];
snprintf(logprefix,sizeof logprefix,"[%s][%s][pid: %d]",level_to_string(level),nowtime,getpid());
//
char logconten[NUM];
va_list arg;
va_start(arg,format);
vsnprintf(logconten,sizeof logconten,format,arg);
std::cout<<logprefix<<logconten<<std::endl;
};
套接字封装
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "log.hpp"
#include "err.hpp"
using namespace std;
class Sock
{
const static int backlog = 32;
public:
static int sock()
{
// 1. 创建socket文件套接字对象
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0)
{
logMessage(FATAL, "create socket error");
exit(SOCKET_ERR);
}
logMessage(NORMAL, "create socket success: %d", sock);
int opt = 1;
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR|SO_REUSEPORT, &opt, sizeof(opt));
return sock;
}
static void Bind(int sock,int port)
{
// 2. bind绑定自己的网络信息
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0)
{
logMessage(FATAL, "bind socket error");
exit(BIND_ERR);
}
logMessage(NORMAL, "bind socket success");
}
static void Listen(int sock)
{
// 3. 设置socket 为监听状态
if (listen(sock, backlog) < 0)
{
logMessage(FATAL, "listen socket error");
exit(LISTEN_ERR);
}
logMessage(NORMAL, "listen socket success");
}
static int Accept(int listensock, std::string *clientip, uint16_t *clientport)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int sock = accept(listensock, (struct sockaddr *)&peer, &len);
if (sock < 0)
logMessage(ERROR, "accept error, next");
else
{
logMessage(NORMAL, "accept a new link success, get new sock: %d", sock); // ?
*clientip = inet_ntoa(peer.sin_addr);
*clientport = ntohs(peer.sin_port);
}
return sock;
}
};
调用逻辑
#include "pollServer.hpp"
#include "err.hpp"
#include <memory>
static void usage(std::string proc)
{
std::cerr << "Usage:\n\t" << proc << " port" << "\n\n";
}
string service(string request)
{
return request;
}
int main(int argc,char* argv[])
{
if(argc != 2)
{
usage(argv[0]);
exit(USAGG_ERR);
}
unique_ptr<pollServer> usl(new pollServer(service,atoi(argv[1])));
usl->initServer();
usl->start();
return 0;
今天poll服务器,也是需要一个数组。只不过以前select数组纯纯的保存文件描述符,poll这里必须是保存struct pollfd结构体的数组。
一般我们如果把fd设置为-1或者小于0的值,操作系统就不会关注这样的文件描述符了。它只会关心大于等于0的fd。
因此我们要重新定义一个指针,构造析构都跟着改一下
class pollServer
{
static const int defaultport = 8080;
static const int defaultfd = -1;
static const int defaultnum=2048;
using func_t=function<string(string)>;
public:
pollServer(func_t f,int port = defaultport) : _cbs(f),_port(port), _listensock(-1), _rfds(nullptr)
{
}
~pollServer()
{
if (_listensock != defaultfd)
close(_listensock);
if (_rfds)
delete[] _rfds;
}
private:
int _listensock;
int _port;
struct pollfd* _rfds;
func_t _cbs;
};
接下来初始化服务器这里创建结构体数组大小自己随意定
void initServer()
{
// 1.创建套接字
_listensock = Sock::sock();
Sock::Bind(_listensock, _port);
Sock::Listen(_listensock);
_rfds=new struct pollfd[defaultnum];//大小这里自己随便定
for (int i = 0; i < defaultnum; ++i)//数组初始化
{
_rfds[i].fd = defaultfd;
_rfds[i].events=0;
_rfds[i].revents=0;
}
_rfds[0].fd = _listensock; // 这个位置后面就不变了
_rfds[0].events=POLLIN; //告诉内核帮我关心_listensock读事件
}
打印这里也改一下
void print()
{
for (int i = 0; i < defaultnum; ++i)
{
if (_rfds[i].fd != defaultfd)
cout << _rfds[i].fd << " ";
}
cout << endl;
}
现在当我们启动服务之后,就不需要每次调用select之前都需要重新设置fd了添加到读文件描述符集里面了,然后才能添加到select里面。现在直接把数组给poll。所以能明显感觉到poll比select简单
void start()
{
int timenout=1000;
for (;;)
{
int n=poll(_rfds,defaultnum,timenout);
switch (n)
{
case 0:
logMessage(NORMAL, "timeout...");
break;
case -1:
logMessage(WARNING, "poll error, code: %d, err string: %s", errno, strerror(errno));
break;
default:
// 说明有事件就绪了,目前只有一个监听事件就绪了
logMessage(NORMAL, "have event ready!");
HandlerEvent();//这里不用传了,因为就绪事件就在_rfds里
break;
}
}
}
今天这里我们只处理读事件就绪的情况
// 1.handler event _rfds 中,不仅仅是有一个fd是就绪的,可能存在多个
// 2.我们的poll目前只处理了read事件
void HandlerEvent()
{
// 你怎么知道那些fd就绪了呢? 我不知道,我只能遍历
for (int i = 0; i < defaultnum; ++i)
{
// 不合法fd
if (_rfds[i].fd == defaultfd)
continue;
// 合法fd,但必须曾经向内核设置过帮我关心对应fd读事件才能往下走
if (!(_rfds[i].events & POLLIN))
continue;
if (_rfds[i].fd == _listensock && _rfds[i].revents & POLLIN)
Accepter(_listensock);
else if (_rfds[i].revents & POLLIN)
Recver(i);
}
处理_listensock读就绪事件
void Accepter(int listensock)
{
logMessage(DEBUG, "Accepter in");
// 走到这里,accept 函数,会不会被阻塞?
// 走到这里就是, poll 告送我,_listensock就绪了,然后才能执行下面代码
string clientip;
uint16_t clientport;
int sock = Sock::Accept(listensock, &clientip, &clientport); accept = 等 + 获取
if (sock < 0)
return;
logMessage(NORMAL, "accept success [%s:%d]", clientip.c_str(), clientport);
// 得到一个sock套接字后,然后我们可以直接进行read/recv吗? 不能,整个代码只有poll有资格检测事件是否就绪
// 将新的sock 托管给poll!
// 将新的sock,托管给poll的本质,其实就是将sock,添加到_rfds数组里!
int i = 0;
for (; i < defaultnum; ++i)
{
if (_rfds[i].fd != defaultfd)
continue;
else
break;
}
if (i == defaultnum)
{
logMessage(WARNING, "server if full, please wait");
close(sock);
}
else
{
_rfds[i].fd = sock;
_rfds[i].events = POLLIN;
_rfds[i].revents = 0;
}
print();
logMessage(DEBUG, "Accepter out");
}
处理普通sock读就绪事件
void ResetItem(int i)
{
_rfds[i].fd = defaultfd;
_rfds[i].events = 0;
_rfds[i].revents = 0;
}
void Recver(int pos)
{
logMessage(DEBUG, "in Recver");
// 1. 读取request
// 这样读取是有问题的!
char buffer[1024];
ssize_t s = recv(_rfds[pos].fd, buffer, sizeof(buffer) - 1, 0); // 这里在进行读取的时候,会不会被阻塞?
if (s > 0) // 读取成功
{
buffer[s] = 0;
logMessage(NORMAL, "client# %s", buffer);
}
else if (s == 0) // 对方关闭了文件描述符
{
close(_rfds[pos].fd);
ResetItem(pos);
logMessage(NORMAL, "client quit");
return;
}
else // 读取失败
{
close(_rfds[pos].fd);
ResetItem(pos);
logMessage(ERROR, "client quit: %s", strerror(errno));
return;
}
// 2. 处理request
std::string response = _cbs(buffer);
// 3. 返回response
// write bug
write(_rfds[pos].fd, response.c_str(), response.size());
logMessage(DEBUG, "out Recver");
}
自此poll服务器就已经写完了,很显然poll服务器主体代码和select服务器一模一样,只不过poll在进行事件监听的时候明显要比select简洁,而且数组没有上限。
poll服务器完整代码
#pragma once
#include <iostream>
#include <functional>
#include <poll.h>
#include "sock.hpp"
using namespace std;
class pollServer
{
static const int defaultport = 8080;
static const int defaultfd = -1;
static const int defaultnum = 2048;
using func_t = function<string(string)>;
public:
pollServer(func_t f, int port = defaultport) : _cbs(f), _port(port), _listensock(-1), _rfds(nullptr)
{
}
void initServer()
{
// 1.创建套接字
_listensock = Sock::sock();
Sock::Bind(_listensock, _port);
Sock::Listen(_listensock);
_rfds = new struct pollfd[defaultnum]; // 大小这里自己随便定
for (int i = 0; i < defaultnum; ++i) ResetItem(i);
_rfds[0].fd = _listensock; // 这个位置后面就不变了
_rfds[0].events = POLLIN; // 告诉内核帮我关心_listensock读事件
}
void print()
{
for (int i = 0; i < defaultnum; ++i)
{
if (_rfds[i].fd != defaultfd)
cout << _rfds[i].fd << " ";
}
cout << endl;
}
void Accepter(int listensock)
{
logMessage(DEBUG, "Accepter in");
// 走到这里,accept 函数,会不会被阻塞?
// 走到这里就是, poll 告送我,_listensock就绪了,然后才能执行下面代码
string clientip;
uint16_t clientport;
int sock = Sock::Accept(listensock, &clientip, &clientport); accept = 等 + 获取
if (sock < 0)
return;
logMessage(NORMAL, "accept success [%s:%d]", clientip.c_str(), clientport);
// 得到一个sock套接字后,然后我们可以直接进行read/recv吗? 不能,整个代码只有poll有资格检测事件是否就绪
// 将新的sock 托管给poll!
// 将新的sock,托管给select的本质,其实就是将sock,添加到fdarray数组里!
int i = 0;
for (; i < defaultnum; ++i)
{
if (_rfds[i].fd != defaultfd)
continue;
else
break;
}
if (i == defaultnum)
{
logMessage(WARNING, "server if full, please wait");
close(sock);
}
else
{
_rfds[i].fd = sock;
_rfds[i].events = POLLIN;
_rfds[i].revents = 0;
}
print();
logMessage(DEBUG, "Accepter out");
}
void ResetItem(int i)
{
_rfds[i].fd = defaultfd;
_rfds[i].events = 0;
_rfds[i].revents = 0;
}
void Recver(int pos)
{
logMessage(DEBUG, "in Recver");
// 1. 读取request
// 这样读取是有问题的!
char buffer[1024];
ssize_t s = recv(_rfds[pos].fd, buffer, sizeof(buffer) - 1, 0); // 这里在进行读取的时候,会不会被阻塞?
if (s > 0) // 读取成功
{
buffer[s] = 0;
logMessage(NORMAL, "client# %s", buffer);
}
else if (s == 0) // 对方关闭了文件描述符
{
close(_rfds[pos].fd);
ResetItem(pos);
logMessage(NORMAL, "client quit");
return;
}
else // 读取失败
{
close(_rfds[pos].fd);
ResetItem(pos);
logMessage(ERROR, "client quit: %s", strerror(errno));
return;
}
// 2. 处理request
std::string response = _cbs(buffer);
// 3. 返回response
// write bug
write(_rfds[pos].fd, response.c_str(), response.size());
logMessage(DEBUG, "out Recver");
}
// 1.handler event _rfds 中,不仅仅是有一个fd是就绪的,可能存在多个
// 2.我们的poll目前只处理了read事件
void HandlerEvent()
{
// 你怎么知道那些fd就绪了呢? 我不知道,我只能遍历
for (int i = 0; i < defaultnum; ++i)
{
// 不合法fd
if (_rfds[i].fd == defaultfd)
continue;
// 合法fd,但必须曾经向内核设置过帮我关心对应fd读事件才能往下走
if (!(_rfds[i].events & POLLIN))
continue;
if (_rfds[i].fd == _listensock && _rfds[i].revents & POLLIN)
Accepter(_listensock);
else if (_rfds[i].revents & POLLIN)
Recver(i);
}
}
void start()
{
int timenout = -1;
for (;;)
{
int n = poll(_rfds, defaultnum, timenout);
switch (n)
{
case 0:
logMessage(NORMAL, "timeout...");
break;
case -1:
logMessage(WARNING, "poll error, code: %d, err string: %s", errno, strerror(errno));
break;
default:
// 说明有事件就绪了,目前只有一个监听事件就绪了
logMessage(NORMAL, "have event ready!");
HandlerEvent(); // 这里不用传了,因为就绪事件就在_rfds里
break;
}
}
}
~pollServer()
{
if (_listensock != defaultfd)
close(_listensock);
if (_rfds)
delete[] _rfds;
}
private:
int _listensock;
int _port;
struct pollfd *_rfds;
func_t _cbs;
};
4.poll的优点缺点
poll的优点就不用过多介绍,输入输出分离,而且没有select上限的问题
poll的主要缺点依旧是遍历问题,因为我们交给poll多个文件描述符,poll在底层去遍历去查找。随着等待的文件描述符变多,poll要线性遍历的方式检测所有文件描述符,这势必会带来效率的降低 。
正是因为poll有这样的问题,所有才有了下一个多路转接之epoll