文章目录
- 一、poll函数接口
- 二、socket就绪条件
- 三、poll的优点
- 四、poll的缺点
- 五、poll使用案例--只读取数据的server服务器
- 1.err.hpp
- 2.log.hpp
- 3.sock.hpp
- 4.pollServer.hpp
- 5.main.cc
一、poll函数接口
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
// pollfd结构
struct pollfd {
int fd; /* file descriptor */
short events; /* requested events */
short revents; /* returned events */
};
参数说明
fds是一个poll函数监听的结构列表. 每一个元素中, 包含了三部分内容: 文件描述符, 监听的事件集合, 返回的事件集合.
nfds表示fds数组的长度.
timeout表示poll函数的超时时间, 单位是毫秒(ms)
时间单位:ms
1.>0:在timeout以内,阻塞,否则非阻塞返回一次
2.=0:非阻塞等待
3.<0:阻塞等待
events和revents的取值:
用户 -> 内核 :要帮我关心一下fd。输入看 :fd + events
内核告诉用户:你要关心fd上面的events中有哪些事件已经就绪了。输出时看:fd + revents
这样达到了输入和输出分离,poll不需要对参数进行重新设定,以及解决了select等待fd有上限的问题
返回结果
返回值小于0, 表示出错;
返回值等于0, 表示poll函数等待超时;
返回值大于0, 表示poll由于监听的文件描述符就绪而返回.
二、socket就绪条件
读就绪
socket内核中, 接收缓冲区中的字节数, 大于等于低水位标记SO_RCVLOWAT. 此时可以无阻塞的读该文件描述符, 并且返回值大于0;
socket TCP通信中, 对端关闭连接, 此时对该socket读, 则返回0;
监听的socket上有新的连接请求;
socket上有未处理的错误;
写就绪
socket内核中, 发送缓冲区中的可用字节数(发送缓冲区的空闲位置大小), 大于等于低水位标记
SO_SNDLOWAT, 此时可以无阻塞的写, 并且返回值大于0;
socket的写操作被关闭(close或者shutdown). 对一个写操作被关闭的socket进行写操作, 会触发SIGPIPE信号;
socket使用非阻塞connect连接成功或失败之后;
socket上有未读取的错误;
异常就绪
socket上收到带外数据. 关于带外数据, 和TCP紧急模式相关(TCP协议头中, 有一个紧急指针的字段),
三、poll的优点
不同与select使用三个位图来表示三个fdset的方式,poll使用一个pollfd的指针实现.
pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式. 接口使用比select更方便.
poll并没有最大数量限制 (但是数量过大后性能也是会下降)
四、poll的缺点
poll中监听的文件描述符数目增多时,和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符.
每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中.
同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效率也会线性下降.
五、poll使用案例–只读取数据的server服务器
1.err.hpp
#pragma once
enum
{
USAGE_ERR = 1,
SOCKET_ERR,
BIND_ERR,
LISTEN_ERR
};
2.log.hpp
#pragma once
#include <iostream>
#include <unistd.h>
#include <sys/types.h>
#include <stdarg.h>
#define NORMAL 0
#define DEBUG 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
#define LOG_NORMAL "./log.txt"
#define LOG_ERR "./err.txt"
#define NUM 1024
const char *to_levelstr(int level)
{
switch (level)
{
case DEBUG:
return "DEBUG";
case NORMAL:
return "NORMAL";
case WARNING:
return "WARNING";
case ERROR:
return "ERROR";
case FATAL:
return "FATAL";
default:
return nullptr;
}
}
void LogMessage(int level, const char *format, ...)
{
// [日志等级] [时间戳/时间] [pid] [messge]
char logprofix[NUM];
snprintf(logprofix, sizeof logprofix, "[%s][%ld][pid:%d]", to_levelstr(level), (long int)time(nullptr), getpid());
char logcontent[NUM];
va_list arg;
va_start(arg, format);
vsnprintf(logcontent, sizeof logcontent, format, arg);
std::cout << logprofix << logcontent << std::endl;
FILE *log = fopen(LOG_NORMAL, "a");
FILE *error = fopen(LOG_ERR, "a");
if (log && error)
{
FILE *cur = nullptr;
if (level == DEBUG || level == NORMAL || level == WARNING)
cur = log;
if (level == ERROR || level == FATAL)
cur = error;
if (cur)
fprintf(cur, "%s%s\n", logprofix, logcontent);
fclose(log);
fclose(error);
}
}
3.sock.hpp
#pragma once
#include <iostream>
#include <cstring>
#include <string>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "log.hpp"
#include "err.hpp"
class Sock
{
static const int backlog = 32;
public:
// 1. 创建socket文件套接字对象
static int Socket()
{
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0)
{
LogMessage(FATAL, "create socket error");
exit(SOCKET_ERR);
}
LogMessage(NORMAL, "create socket success:%d", sock);
int opt = 1;
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof opt);
return sock;
}
// 2.bind自己的网络信息
static void Bind(int sock, const uint16_t &port)
{
struct sockaddr_in local;
memset(&local, 0, sizeof local);
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
int n = bind(sock, (struct sockaddr *)&local, sizeof local);
if (n < 0)
{
LogMessage(FATAL, "socket bind error");
exit(BIND_ERR);
}
LogMessage(NORMAL, "socket bind success");
}
// 3. 设置socket 为监听状态
static void Listen(int sock)
{
int n = listen(sock, backlog);
if (n < 0)
{
LogMessage(FATAL, "socket listen error");
exit(LISTEN_ERR);
}
LogMessage(NORMAL, "socket listen success");
}
static int Accept(int listensock, std::string *clientip, uint16_t *clientport)
{
struct sockaddr_in peer;
memset(&peer, 0, sizeof peer);
socklen_t len = sizeof(peer);
int sock = accept(listensock, (struct sockaddr *)&peer, &len);
if (sock < 0)
{
LogMessage(ERROR, "socket accept error,next");
}
else
{
LogMessage(NORMAL, "accept a new link success, get new sock: %d", sock);
*clientip = inet_ntoa(peer.sin_addr);
*clientport = ntohs(peer.sin_port);
}
return sock;
}
};
4.pollServer.hpp
#pragma once
#include <iostream>
#include <cstring>
#include <cstdlib>
#include <cerrno>
#include <string>
#include <functional>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <poll.h>
#include "sock.hpp"
#include "log.hpp"
#include "err.h"
namespace poll_ns
{
const static int defaultport = 8080;
const static int num = 2048;
const static int defaultfd = -1;
typedef std::function<std::string(const std::string &)> func_t;
class pollServer
{
public:
pollServer(const func_t &func, const uint16_t &port = defaultport)
: _port(port), _func(func), _listensock(-1), _rfds(nullptr)
{
}
void initServer()
{
_listensock = Sock::Socket();
Sock::Bind(_listensock, _port);
Sock::Listen(_listensock);
_rfds = new struct pollfd[num];
for (int i = 0; i < num; i++)
ResetItem(i);
_rfds[0].fd = _listensock;
_rfds[0].events = POLLIN;
_rfds[0].revents = 0;
}
void ResetItem(int pos)
{
_rfds[pos].fd = defaultfd;
_rfds[pos].events = 0;
_rfds[pos].revents = 0;
}
void Print()
{
std::cout << "fd list: ";
for (int i = 0; i < num; i++)
{
if (_rfds[i].fd != defaultfd)
{
std::cout << _rfds[i].fd << " ";
}
}
}
void Accepter(int listensock)
{
std::string clientip;
uint16_t clientport;
int sock = Sock::Accept(listensock, &clientip, &clientport);
if (sock < 0)
return;
LogMessage(NORMAL, "accept success[%s:%d]", clientip.c_str(), clientport);
int i = 0;
for (; i < num; i++)
{
if (_rfds[i].fd == defaultfd)
continue;
else
break;
}
if (i == num)
{
LogMessage(WARNING, "server is full,please wait");
close(sock);
}
else
{
_rfds[i].fd = sock;
_rfds[i].events = POLLIN;
_rfds[i].revents = 0;
}
Print();
}
void Recver(int pos)
{
// 1. 读取request
char buffer[1024];
ssize_t s = recv(_rfds[pos].fd, buffer, sizeof(buffer) - 1, 0); // 这里在进行读取的时候,会不会被阻塞?1, 0
if (s > 0)
{
buffer[s] = 0;
LogMessage(NORMAL, "client# %s", buffer);
}
else if (s == 0)
{
close(_rfds[pos].fd);
ResetItem(pos);
LogMessage(NORMAL, "client quit");
return;
}
else
{
close(_rfds[pos].fd);
ResetItem(pos);
LogMessage(ERROR, "client error:%s", strerror(errno));
return;
}
// 2. 处理request
std::string response = _func(buffer);
// 3. 返回response
write(_rfds[pos].fd, response.c_str(), response.size());
}
void HandlerEvent()
{
for (int i = 0; i < num; i++)
{
// 过滤掉非法的fd
if (_rfds[i].fd == defaultfd)
continue;
if (!(_rfds[i].events & POLLIN))
continue;
if (_rfds[i].fd == _listensock && (_rfds[i].events & POLLIN))
Accepter(_listensock);
else if (_rfds[i].events & POLLIN)
Recver(i);
else
{
}
}
}
void start()
{
for (;;)
{
int timeout = -1;
int n = poll(_rfds, num, timeout);
switch (n)
{
case 0:
LogMessage(NORMAL, "timeout...");
break;
case -1:
LogMessage(WARNING, "select error,code: %d, err string: %s", errno, strerror(errno));
break;
default:
// 说明有事件就绪了,目前只有一个监听事件就绪了
LogMessage(NORMAL, "have event ready!");
HandlerEvent();
break;
}
}
}
~pollServer()
{
if (_listensock > 0)
close(_listensock);
if (_rfds)
delete[] _rfds;
}
private:
int _port;
int _listensock;
struct pollfd *_rfds;
func_t _func;
};
}
5.main.cc
#include "pollServer.hpp"
#include "err.hpp"
#include <memory>
using namespace std;
using namespace poll_ns;
static void Usage(const string proc)
{
std::cerr << "Usage:\n\t" << proc << " port\n\n";
}
string transaction(const string &request)
{
return "pollServer# " + request;
}
int main(int argc, char *argv[])
{
// if (argc != 2)
// {
// Usage(argv[0]);
// exit(USAGE_ERR);
// }
std::unique_ptr<pollServer> svr(new pollServer(transaction));
svr->initServer();
svr->start();
return 0;
}