目录
一、高级IO
1.1 概念
1.2 五种IO模型
1.3 小结
二、多路转接的老派
2.1 select 的作用
2.2 select 的接口
三、select 的编写
3.1 类的预先准备
3.2 类的整体框架
3.3 类的执行 Loop
四、Loop 中的回调函数
4.1 HandlerEvent
4.2 AcceptClient
4.3 ServiceIO
一、高级IO
1.1 概念
了解了网络通信相关的知识后,我们也许能直到,通信的本质就是IO,通信的核心是在两个或多个设备之间传输数据,这与计算机系统中的输入输出操作类似。
当通信时使用接收端口例如 recv 时,系统等待网络数据包从远程服务器通过网络传输到本地机器,数据包从网卡的硬件缓冲区复制到系统内存中的应用程序缓冲区;当文件读取时,系统等待磁盘将所请求的数据读取到磁盘缓冲区中,数据从磁盘缓冲区复制到系统内存中的用户空间。
所以换种说法,IO = 等待 + 拷贝
那么如何提高IO的效率呢?
当缩小了等待的时间后,IO的效率就会提高。
1.2 五种IO模型
然后,从一个钓鱼的实例引入今天的主题:
将上面的例子抽象成通信IO:
水池:OS内部缓冲区
水桶:用户缓冲区
鱼:数据
鱼竿:文件描述符
上面的五个人物分别对应了五种IO模型:
其中,前四个人都属于同步IO,即只要参与了IO的过程,那就是同步IO。田七将钓鱼的工作交给了另一个人,并没有参与IO,所以是异步IO。
阻塞 IO:在内核将数据准备好之前,系统调用会一直等待。所有的套接字,默认都是阻塞方式。阻塞 IO 是最常见的 IO 模型。
非阻塞 IO:如果内核还未将数据准备好,系统调用仍然会直接返回,并且返回 EWOULDBLOCK 错误码。非阻塞 IO 往往需要程序员循环的方式反复尝试读写文件描述符,这个过程称为轮询。这对 CPU 来说是较大的浪费,一般只有特定场景下才使用。
信号驱动 IO:内核将数据准备好的时候,使用 SIGIO 信号通知应用程序进行 IO 操作。
IO 多路转接: 虽然从流程图上看起来和阻塞 IO 类似。实际上最核心在于 IO 多路转接能够同时等待多个文件描述符的就绪状态。
异步 IO: 由内核在数据拷贝完成时,通知应用程序(而信号驱动是告诉应用程序何时可以开始拷贝数据)。
1.3 小结
任何 IO 过程中,都包含两个步骤。第一是等待,第二是拷贝。而且在实际的应用场景中,等待消耗的时间往往都远远高于拷贝的时间。让 IO 更高效,最核心的办法就是让等待的时间尽量少。
二、多路转接的老派
上面介绍的五人中,只有赵六真正实现了减少等待的时间,所以在IO中可以使用多路转接以达到高校IO,这里我们要介绍的就是多路转接中的老派—— select ,即使它有很多缺点,但是以为其出现的时间比较早,所以基本很多程序都会兼容它,一些比较古早的程序中也仍使用它。
2.1 select 的作用
select的主要作用是监视一组文件描述符,以查看其中哪些文件描述符处于可读、可写或有错误状态。当有时间就绪,就进行任务的派发。
2.2 select 的接口
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
其中,除了第一个参数,后三个均为输出型参数。
用户通过传入 fd_set 来告诉系统需要帮用户关注哪些文件描述符,系统则通过修改比特位告诉用户哪些文件描述符已经满足用户条件。在位图中,位数表示fd的值,0或1表示是否就绪。
用户通过传入 timeout 来表示等待的规则,struct timeval 是一个结构体,由一个表示秒的变量与一个表示微妙的变量组合,为 select 设置等待规则,如果用户设置等待 5s ,实际花费了 3s,则返回的是剩余时间 2s
- 如果
timeout
为NULL
,select
将会无限等待,直到至少一个文件描述符变得可用。 - 如果
timeout
为一个有效的timeval
结构,select
会等待指定的时间长度。如果超时,select
将返回0。 - 如果
timeout
为一个零时间的timeval
结构(即tv_sec
和tv_usec
都为0),select
会立即返回,不会等待。
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);
-
FD_CLR(int fd, fd_set *set)
:- 作用:从文件描述符集合
set
中移除文件描述符fd
。将fd
在集合中的对应位清零。 - 使用场景:当不再需要监视某个文件描述符时,使用该函数将其从集合中移除。
- 作用:从文件描述符集合
-
FD_ISSET(int fd, fd_set *set)
:- 作用:检查文件描述符
fd
是否在集合set
中。如果fd
在集合中,则返回非零值;否则返回零。 - 使用场景:在
select
返回后,用于检测哪个文件描述符有事件发生。
- 作用:检查文件描述符
-
FD_SET(int fd, fd_set *set)
:- 作用:将文件描述符
fd
添加到集合set
中。将fd
在集合中的对应位置为1。 - 使用场景:在调用
select
之前,将需要监视的文件描述符添加到集合中。
- 作用:将文件描述符
-
FD_ZERO(fd_set *set)
:- 作用:初始化文件描述符集合
set
,将集合中所有的位清零。 - 使用场景:在使用
fd_set
之前,首先需要使用该函数初始化集合,以确保集合中的所有位都是零。
- 作用:初始化文件描述符集合
三、select 的编写
这里以 select_echo_server 入手,来认识熟悉 select 。
因为本篇博客是关于网络的信息,所以以下所说的文件描述符与套接字都是一个意思,即 sockfd
3.1 类的预先准备
select 需要有端口号与套接字,在套接字这里,我们选择使用TCP套接字,同时将之前编写过的TCP服务端进行进一步封装,这里使用自己封装过的类,可以省去了在程序中直接对套接字的创建、初始化与监听等工作。
下面先来看一下我们封装的Socket类,
Socket.hpp:
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <pthread.h>
#include <sys/types.h>
#include <memory>
#include "InetAddr.hpp"
#include "Log.hpp"
namespace socket_ns
{
class Socket;
const static int gbacklog = 8;
using socket_sptr = std::shared_ptr<Socket>;
enum
{
SOCKET_ERROR = 1,
BIND_ERROR,
LISTEN_ERROR,
USAGE_ERROR
};
class Socket
{
public:
virtual void CreateSocketOrDie() = 0;
virtual void BindSocketOrDie(InetAddr &addr) = 0;
virtual void ListenSocketOrDie() = 0;
virtual int Accepter(InetAddr *addr) = 0;
virtual bool Connetcor(InetAddr &addr) = 0;
virtual void SetSocketAddrReuse() = 0;
virtual int SockFd() = 0;
virtual int Recv(std::string *out) = 0;
virtual int Send(const std::string &in) = 0;
virtual void Close() = 0;
public:
void BuildListenSocket(InetAddr &addr)
{
CreateSocketOrDie();
SetSocketAddrReuse();
BindSocketOrDie(addr);
ListenSocketOrDie();
}
bool BuildClientSocket(InetAddr &addr)
{
CreateSocketOrDie();
return Connetcor(addr);
}
};
class TcpSocket : public Socket
{
public:
TcpSocket(int fd = -1) : _sockfd(fd)
{
}
void CreateSocketOrDie() override
{
// 1. 创建流式套接字
_sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
if (_sockfd < 0)
{
LOG(FATAL, "socket error");
exit(SOCKET_ERROR);
}
LOG(DEBUG, "socket create success, sockfd is : %d\n", _sockfd);
}
void BindSocketOrDie(InetAddr &addr) override
{
// 2. bind
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(addr.Port());
local.sin_addr.s_addr = inet_addr(addr.Ip().c_str());
int n = ::bind(_sockfd, (struct sockaddr *)&local, sizeof(local));
if (n < 0)
{
LOG(FATAL, "bind error\n");
exit(BIND_ERROR);
}
LOG(DEBUG, "bind success, sockfd is : %d\n", _sockfd);
}
void ListenSocketOrDie() override
{
int n = ::listen(_sockfd, gbacklog);
if (n < 0)
{
LOG(FATAL, "listen error\n");
exit(LISTEN_ERROR);
}
LOG(DEBUG, "listen success, sockfd is : %d\n", _sockfd);
}
int Accepter(InetAddr *addr) override
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int sockfd = ::accept(_sockfd, (struct sockaddr *)&peer, &len);
if (sockfd < 0)
{
LOG(WARNING, "accept error\n");
return -1;
}
*addr = peer;
return sockfd;
}
virtual bool Connetcor(InetAddr &addr)
{
struct sockaddr_in server;
// 构建目标主机的socket信息
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(addr.Port());
server.sin_addr.s_addr = inet_addr(addr.Ip().c_str());
int n = connect(_sockfd, (struct sockaddr *)&server, sizeof(server));
if (n < 0)
{
std::cerr << "connect error" << std::endl;
return false;
}
return true;
}
void SetSocketAddrReuse() override
{
int opt = 1;
::setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
}
int Recv(std::string *out) override
{
char inbuffer[4096];
ssize_t n = ::recv(_sockfd, inbuffer, sizeof(inbuffer) - 1, 0);
if (n > 0)
{
inbuffer[n] = 0;
*out = inbuffer; // ??? +=
}
return n;
}
int Send(const std::string &in) override
{
int n = ::send(_sockfd, in.c_str(), in.size(), 0);
return n;
}
int SockFd() override
{
return _sockfd;
}
void Close() override
{
if (_sockfd > -1)
::close(_sockfd);
}
private:
int _sockfd;
};
}
这里还用到了之前封装的 InetAddr 类与日志宏,详情可以看下面的博客:
Linux网络——套接字与UdpServer-CSDN博客
Linux网络——TcpServer-CSDN博客
日志宏的编写与线程池的结合-CSDN博客
Log.hpp:
#pragma once
#include <iostream>
#include <fstream>
#include <cstdio>
#include <string>
#include <ctime>
#include <cstdarg>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#include "LockGuard.hpp"
bool gIsSave = false;
const std::string logname = "log.txt";
// 1. 日志是由等级的
enum Level
{
DEBUG = 0,
INFO,
WARNING,
ERROR,
FATAL
};
void SaveFile(const std::string &filename, const std::string &message)
{
std::ofstream out(filename, std::ios::app);
if (!out.is_open())
{
return;
}
out << message;
out.close();
}
std::string LevelToString(int level)
{
switch (level)
{
case DEBUG:
return "Debug";
case INFO:
return "Info";
case WARNING:
return "Warning";
case ERROR:
return "Error";
case FATAL:
return "Fatal";
default:
return "Unknown";
}
}
std::string GetTimeString()
{
time_t curr_time = time(nullptr);
struct tm *format_time = localtime(&curr_time);
if (format_time == nullptr)
return "None";
char time_buffer[1024];
snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d",
format_time->tm_year + 1900,
format_time->tm_mon + 1,
format_time->tm_mday,
format_time->tm_hour,
format_time->tm_min,
format_time->tm_sec);
return time_buffer;
}
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
void LogMessage(std::string filename, int line, bool issave, int level, const char *format, ...)
{
std::string levelstr = LevelToString(level);
std::string timestr = GetTimeString();
pid_t selfid = getpid();
char buffer[1024];
va_list arg;
va_start(arg, format);
vsnprintf(buffer, sizeof(buffer), format, arg);
va_end(arg);
std::string message = "[" + timestr + "]" + "[" + levelstr + "]" +
"[" + std::to_string(selfid) + "]" +
"[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer;
LockGuard lockguard(&lock);
if (!issave)
{
std::cout << message;
}
else
{
SaveFile(logname, message);
}
}
#define LOG(level, format, ...) \
do \
{ \
LogMessage(__FILE__, __LINE__, gIsSave, level, format, ##__VA_ARGS__); \
} while (0)
#define EnableFile() \
do \
{ \
gIsSave = true; \
} while (0)
#define EnableScreen() \
do \
{ \
gIsSave = false; \
} while (0)
InetAddr:
#pragma once
#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
class InetAddr
{
private:
void GetAddress(std::string *ip, uint16_t *port)
{
*port = ntohs(_addr.sin_port);
*ip = inet_ntoa(_addr.sin_addr);
}
public:
InetAddr(const struct sockaddr_in &addr) : _addr(addr)
{
GetAddress(&_ip, &_port);
}
InetAddr(const std::string &ip, uint16_t port) : _ip(ip), _port(port)
{
_addr.sin_family = AF_INET;
_addr.sin_port = htons(_port);
_addr.sin_addr.s_addr = inet_addr(_ip.c_str());
}
InetAddr()
{}
std::string Ip()
{
return _ip;
}
bool operator == (const InetAddr &addr)
{
if(_ip == addr._ip && _port == addr._port) // 方便测试
{
return true;
}
return false;
}
struct sockaddr_in Addr()
{
return _addr;
}
uint16_t Port()
{
return _port;
}
~InetAddr()
{
}
private:
struct sockaddr_in _addr;
std::string _ip;
uint16_t _port;
};
为了保护日志宏的线程安全,我们又使用到了之前封装的锁:
LockGuard.hpp:
#ifndef __LOCK_GUARD_HPP__
#define __LOCK_GUARD_HPP__
#include <iostream>
#include <pthread.h>
class LockGuard
{
public:
LockGuard(pthread_mutex_t *mutex):_mutex(mutex)
{
pthread_mutex_lock(_mutex); // 构造加锁
}
~LockGuard()
{
pthread_mutex_unlock(_mutex);
}
private:
pthread_mutex_t *_mutex;
};
#endif
3.2 类的整体框架
我们知道 select 可以监视一组套接字,所以类的内部就需要一个数组来辅助,同时包括上面说的端口号与Tcp套接字。
#pragma once
#include <iostream>
#include <string>
#include <memory>
#include "Socket.hpp"
using namespace socket_ns;
class SelectServer
{
const static int N = sizeof(fd_set) * 8;
const static int defaultfd = -1;
public:
SelectServer(uint16_t port)
: _port(port),
_listensock(std::make_unique<TcpSocket>())
{
InetAddr addr("0", _port);
_listensock->BuildListenSocket(addr);
for (int i = 0; i < N; i++)
{
_fd_array[i] = defaultfd;
}
_fd_array[0] = _listensock->SockFd();
}
~SelectServer()
{
}
private:
uint16_t _port;
std::unique_ptr<Socket> _listensock;
int _fd_array[N]; // 辅助数组
};
在初始化时,初始化端口号是必须的,紧接着根据端口号初始化InetAddr,再根据端口号创建套接字并执行监听:
随后,要对辅助数组中的元素进行初始化,因为文件标识符不小于0,所以使用 -1 进行初始化,以后的代码只要判断数组中的该位置是否 <0 ,即可判断是否为有效的 fd 。
3.3 类的执行 Loop
在使用main函数时,只需要调用该函数就可以完成相关的操作。
这里就根据 select 的返回值进行相应的操作,比如成功、出错或超时。
int n = select(max_fd + 1, &rfds, nullptr, nullptr, nullptr);
switch (n)
{
case 0:
LOG(INFO, "timeout, %d.%d\n", timeout.tv_sec, timeout.tv_usec);
break;
case -1:
LOG(ERROR, "select error...\n");
break;
default:
LOG(DEBUG, "Event Happen. n : %d\n", n);
HandlerEvent(rfds);
break;
}
HandlerEvent是我们后续要写的一个回调函数,select 的参数中,max_fd 与 rfds 就是我们提前要进行的工作,其中,每次 select 每次都会将已就绪的套接字添加到 rfds 中。
接下来,根据 select 的传参,我们要进行两个变量的定义:
fd_set rfds;
FD_ZERO(&rfds);
int max_fd = defaultfd;
for (int i = 0; i < N; i++)
{
if (_fd_array[i] == defaultfd)
continue;
FD_SET(_fd_array[i], &rfds); // 将所有合法的fd添加到rfds中
if (max_fd < _fd_array[i])
{
max_fd = _fd_array[i]; // 更新出最大的fd的值
}
}
首先是定义了一个 fd_set 的集合 rfds ,在select 中传入表示我们只在意读就绪的套接字。同时,当 select 不断更新已就绪的套接字,我们每次也要重新进行更新,要知道在 rfds 中保存的可能不是连续的数字,而是会自动分配当前最小的文件描述符,比如文件描述符 10 已经分配了但是用户未退,而 5 已经退了,此时再进来一个新的连接,会分配 5 而不是 11。这一步基本是使用 select 时必做的一个操作。
同时,上述的操作我们需要一直进行,每次一有新连接,价于对方给我发送数据!我们作为读事件同一处理,也就是说新连接到来等价于读事件就绪!所以我们要一直重复,把它放在 while 中:
void Loop()
{
while (true)
{
fd_set rfds;
FD_ZERO(&rfds);
int max_fd = defaultfd;
for (int i = 0; i < N; i++)
{
if (_fd_array[i] == defaultfd)
continue;
FD_SET(_fd_array[i], &rfds); // 将所有合法的fd添加到rfds中
if (max_fd < _fd_array[i])
{
max_fd = _fd_array[i]; // 更新出最大的fd的值
}
}
struct timeval timeout = {0, 0};
int n = select(max_fd + 1, &rfds, nullptr, nullptr, nullptr);
switch (n)
{
case 0:
LOG(INFO, "timeout, %d.%d\n", timeout.tv_sec, timeout.tv_usec);
break;
case -1:
LOG(ERROR, "select error...\n");
break;
default:
LOG(DEBUG, "Event Happen. n : %d\n", n);
HandlerEvent(rfds);
break;
}
}
}
四、Loop 中的回调函数
4.1 HandlerEvent
下面根据回调函数的逻辑画了一张简略的流程图:
首先,遍历整个类成员——存放 sockfd 的数组;其次,使用 FD_ISSET 函数来确保该sockfd已就绪;随后,判断该文件描述符是否是用户的套接字,即判断是否是TCP中的 sockfd ,若不是,才会去执行最后的回调函数。
根据 FD_ISSET 的参数,很显然我们设计的该回调函数应该有 fd_set 的集合,故需要传入 Loop() 中的 fd_set 。
void HandlerEvent(fd_set &rfds)
{
for (int i = 0; i < N; i++)
{
if (_fd_array[i] == defaultfd)
continue;
if (FD_ISSET(_fd_array[i], &rfds))
{
if (_fd_array[i] == _listensock->SockFd())
{
AcceptClient();
}
else
{
ServiceIO(i);
}
}
}
}
4.2 AcceptClient
如果该 sockfd 是 listen 监听到的套接字,那么服务端就需要对其进行 accept 的处理,表示服务端已经收到了来自客户端的第三次握手请求,此时的返回值就是以后要使用 select 处理的返回值。
关于 accept 的介绍可以参考一下博客:Linux网络——TcpServer-CSDN博客
也就是说,使用 accept 后,返回的套接字信息,才是以后真正要进行处理的。所以这时候,又需要一次遍历,来为其返回值找到一个合适的位置。最后,还要判断该位置是否为合法位置,若合法才能进入数组,否则,添加失败。
void AcceptClient()
{
InetAddr clientaddr;
int sockfd = _listensock->Accepter(&clientaddr);
if (sockfd < 0)
return;
LOG(DEBUG, "Get new Link, sockfd: %d, client info %s:%d\n", sockfd, clientaddr.Ip().c_str(), clientaddr.Port());
int pos = 1;
for (; pos < N; pos++)
{
if (_fd_array[pos] == defaultfd)
break;
}
if (pos == N)
{
::close(sockfd);
LOG(WARNING, "server is full!\n");
return;
}
else
{
_fd_array[pos] = sockfd;
LOG(DEBUG, "%d add to select array!\n", sockfd);
}
LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());
}
4.3 ServiceIO
当程序执行到这里的时候,基本可以判断该文件描述符是 select 要进行处理的 sockfd 了,这时定义的回调函数就可以根据要求任意定义了:
void ServiceIO(int pos)
{
char buffer[1024];
ssize_t n = ::recv(_fd_array[pos], buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
buffer[n] = 0;
std::cout << "client say# " << buffer << std::endl;
std::string echo_string = "[server echo]# ";
echo_string += buffer;
::send(_fd_array[pos], echo_string.c_str(), echo_string.size(), 0);
}
else if (n == 0)
{
LOG(DEBUG, "%d is closed\n", _fd_array[pos]);
::close(_fd_array[pos]);
_fd_array[pos] = defaultfd;
LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());
}
else
{
LOG(DEBUG, "%d recv error\n", _fd_array[pos]);
::close(_fd_array[pos]);
_fd_array[pos] = defaultfd;
LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());
}
}
其中,为了方便阅读,特意写了一个 ToString 的函数:
std::string RfdsToString()
{
std::string fdstr;
for (int i = 0; i < N; i++)
{
if (_fd_array[i] == defaultfd)
continue;
fdstr += std::to_string(_fd_array[i]);
fdstr += " ";
}
return fdstr;
}