目录
编辑
一,多路复用
1,IO的分类
2,IO的效率
二,Linux环境下实现通信的多路复用
1,select
select的特点:
参数:
操作函数:
返回值:
使用select实现网络通信:
2,poll
poll的特点:
poll的参数:
返回值:
pollfd结构体:
使用poll进行网络通信:
3,epoll
epoll介绍:
epoll中的关键函数:
一,多路复用
1,IO的分类
在系统当中,IO的方式有多种。如:
1,阻塞式IO
2,非阻塞轮询式IO
3,多路复用/多路转接式IO
4,信号驱动式IO
5,异步IO
这些IO的基本使用方法,大家可以去搜索了解一下。今天我们来重点的谈谈多路复用IO。
2,IO的效率
IO的本质其实就是等待+拷贝。拷贝一般都是要拷贝的,但是等待时间是可以减少的。减少等待时间的IO方式就是高级IO。减少等待时间也就意味着IO的效率的提高。
二,Linux环境下实现通信的多路复用
要实现网络通信,要让一个服务端的能够对接多个客户端的请求。如何做到呢?多进程?多线程?以上的方式都可以做到。但是,可不可以就开一个进程来实现一个服务端给多个客户端进行服务呢?答案当然是可以的。解决方案就叫作多路复用。
1,select
select的原型如下:
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
select的特点:
select函数就是为了实现多路复用而设计的,所以select可以一次性等待多个文件描述符。
参数:
nfds:这个参数表示接收到的文件描述符+1。
所以在接收发来请求的文件描述符时我们需要去找到这些文件描述符的最大值。(所以文件描述符也要被管理起来,所以需要使用一个辅助数组)。
readfds:设置为读状态的文件描述符便被设置在这个readfds位图内。
writefds:设置为写状态的文件描述符被设置进入这个位图内。
exceptfds:设置为异常状态的文件描述符被设置在这个位图内。
timeout:时间结构体对象,通过这个参数能够设置select等待的时间,一般这个参数被设置为nullptr表示阻塞式等待。
操作函数:
1,位图操作函数
void FD_CLR(int fd, fd_set *set);
// 将位图的某一个位置变成0.
int FD_ISSET(int fd, fd_set *set);
//查看某一个文件描述符是否被设置
void FD_SET(int fd, fd_set *set);
//设置某一个文件描述符到位图中
void FD_ZERO(fd_set *set);
//将位图清零
返回值:
返回值为-1:代表等待失败
返回值为0:代表超过等待时间
返回值为其他数字:代表等待的文件描述符的个数
使用select实现网络通信:
1,准备工作
首先,按照规则我们得先把套接字创建出来然后bind并设置为监听状态。代码如下:
#include"log.hpp"
#include"Socket.hpp"
#include<iostream>
class SelectServer
{
public:
SelectServer()
{ }
void Init()
{
Sock_.Sock();//创建套接字
Sock_.Bind();//绑定套接字
Sock_.Listen();//监听套接字
}
~SelectServer()
{
}
private:
Socket Sock_;
};
当然,这里我使用了封装后的Socket类,在后面会有这个Socket类的源码。
在搭建好这个SelectServer服务器的框架后便可以开始启动服务器开始服务了。
2,Start函数
void Start()
{
fd_set rfds; // 创建读状态位图
fd_array[0] = Sock_.sockfd_; //把0位置的值设为监听套接字
while (1)
{
FD_ZERO(&rfds); //清空位图
FD_SET(Sock_.sockfd_, &rfds); // 把监听套接字设置进入到rfds位图里面
int max_fd = fd_array[0];
for (int i = 0; i < max_num; i++) // 第二个循环,找到最大的fd,并且要将位图更新。
{
int fd = fd_array[i];
if (fd == -1)
{
continue;
}
FD_SET(fd, &rfds);
if (fd > max_fd)
{
max_fd = fd;
lg(Info, "max_fd change");
}
}
int n = select(max_fd + 1, &rfds, nullptr, nullptr, nullptr);
switch (n)//根据返回值来判断做什么处理
{
case -1:
lg(Info, "select err");
break;
case 0:
lg(Info, "timeout");
break;
default:
lg(Info, "get a link");
//开始处理事件
break;
}
}
}
在Start函数这里,我们要做的便是要将select函数用起来。所以在前期会有很多的准备工作:
1,创建位图,将位图清空,还要将监听套接字设置到位图内。
2,要找到最大的文件描述符,就要循环遍历的找。所以需要一个辅助数组来存储历史上的文件描述符。
3,调用select接口后要根据返回值来决定执行的操作。
#注意:位图的清空工作是要循环进行的,如果不循环进行就会发生错误,因为这里的位图是一个输入输出型参数,在select以后会改变。当然,我们也可以使用一个temp位图来接收rfds并代替rfds传入select中。这样也就不需要每次都要清空rfds了。
3,事件处理操作
void HandleEvents(fd_set &rfds) // 处理事件
{
for (int i = 0; i < max_num; i++)//循环遍历的把fd_array数组里面的描述符遍历一遍
{
int fd = fd_array[i];
if (fd == defaultfd)//如果是-1那就继续遍历
continue;
if(FD_ISSET(fd,&rfds))//如果已经被设置到位图当中了
{
if (fd == Sock_.sockfd_) // 如果是监听套接字
{
std::string clientip;
int port;
int sock = Sock_.Accept(&clientip, &port);
if (sock < 0)
{
lg(Fatal, "accept err");
continue;
}
lg(Info, "get a new link,ip:%s,port:%d", clientip.c_str(), port);
int pos = 1;
for (; pos < max_num; pos++)
{
if (fd_array[pos] != -1)
{
continue;
}
break;
}
if (pos == max_num)
{
lg(Warning, "fd_arry is full");
}
else
{
fd_array[pos] = sock; // 加入到数组内
}
}
else
{
// 开始读取
char buffer[1024];//这里的读取是有bug的,因为可能会粘包或者读取不完整,要定制协议才能比较好的解决。
int n = read(fd, buffer, sizeof(buffer));
if (n == -1) // 读取失败
{
fd_array[i] = defaultfd; // 重新置为-1
close(fd);
lg(Warning, "read err");
}
else if (n == 0) // 客户端断开连接
{
fd_array[i] = defaultfd; // 重新置为-1
close(fd);
lg(Warning, "client break");
}
else
{
buffer[n] = 0;
std::cout << "get a message: " << buffer << std::endl;
}
}
}
}
}
上述的事件处理操作只处理了读的情况,也可以自己写代码处理一下写的情况。
2,select的优缺点
优点:
select的优点便是可以等待多个文件描述符,这样便可以让accept不需要等待直接获取文件描述 符。减少等待时间,变成高级IO。
缺点:
1,select等待的文件描述符的个数是有上限的。因为select中的位图是一个结构体,在linux中这个位图能够放下的文件描述符最多为1024个。
2,使用select时拷贝的次数过多。
2,poll
poll的特点:
为了解决select的等待的文件描述符数量有上限的问题,后面便发展出来了poll。poll也是一个实现多路复用的函数。并且,poll不需要多次的重置。
#include <poll.h>
int poll(struct pollfd fds[], nfds_t nfds, int timeout);
poll的参数:
fds[]
:一个指向struct pollfd
数组的指针,其中包含要监视的文件描述符以及它们关联的事件。nfds
:fds[]
数组中元素的数量。这个参数的类型其实就是一个整型变量,所以用户想填多少就填多少。而且,因为第一个参数其实是一个指针所以可以扩容,所以实现了可以无限的接收文件描述符的特点。timeout
:在毫秒级别指定poll()
调用的超时时间。传递-1
表示永远等待,传递0
表示立即返回,传递正整数表示等待指定的毫秒数。
返回值:
poll()
函数的返回值表示发生事件的文件描述符的数量,或者出现错误时返回 -1
。
pollfd结构体:
struct pollfd {
int fd; // 要监视的文件描述符
short events; // 事件掩码(要监视的事件)
short revents; // 实际发生的事件(由内核填充)
};
参数:
fd
:要监视的文件描述符。events
:要监视的事件掩码,可以是POLLIN
(可读事件)、POLLOUT
(可写事件)、POLLERR
(错误事件)等。revents
:由内核填充的实际发生的事件,可能是POLLIN
、POLLOUT
、POLLERR
等。、
使用poll进行网络通信:
这一份代码其实是在select的基础上改动的,所以不做仔细地介绍。体会一下polll的特点就可以了。代码:
#pragma once
#include "log.hpp"
#include "Socket.hpp"
#include<poll.h> //引入头文件
#include <iostream>
#define max_num 1024
#define defaultfd -1
class SelectServer
{
public:
SelectServer()
{
for (int i = 0; i < max_num; i++) // 初始化数组
{
rfds[i].fd = -1;
}
}
void Init()
{
Sock_.Sock(); // 创建套接字
Sock_.Bind(); // 绑定套接字
Sock_.Listen(); // 监听套接字
}
void HandleEvents() // 处理事件
{
for (int i = 0; i < max_num; i++)
{
int fd = rfds[i].fd;
if (fd == defaultfd)
continue;
if (rfds[i].revents&POLLIN) // 如果事件已经是可读状态,内核告诉用户
{
if (fd == Sock_.sockfd_) // 如果是监听套接字
{
std::string clientip;
int port;
int sock = Sock_.Accept(&clientip, &port);
if (sock < 0)
{
lg(Fatal, "accept err");
continue;
}
lg(Info, "get a new link,ip:%s,port:%d", clientip.c_str(), port);
int pos = 1;
for (; pos < max_num; pos++)//第三个循环
{
if (rfds[pos].fd != -1)
{
continue;
}
break;
}
// std::cout << pos << std::endl;
if (pos == max_num)
{
lg(Warning, "fd_arry is full");
}
else
{
rfds[pos].fd = sock; // 加入到数组内
rfds[pos].events = POLLIN;//设置事件为可读
}
}
else
{
// 开始读取
char buffer[1024];
// std::cout << "read in" << std::endl;
int n = read(fd, buffer, sizeof(buffer));
if (n == -1) // 读取失败
{
rfds[i].fd = defaultfd; // 重新置为-1
close(fd);
lg(Warning, "read err");
}
else if (n == 0) // 客户端断开连接
{
rfds[i].fd = defaultfd; // 重新置为-1
close(fd);
lg(Warning, "client break");
}
else
{
buffer[n] = 0;
std::cout << "get a message: " << buffer << std::endl;
}
}
}
}
}
void Start()
{
rfds[0].fd = Sock_.sockfd_;
rfds[0].events = POLLIN;
while (1)
{
int n = poll(rfds, max_num, 1000); // 时间填-1代表阻塞
switch (n) // 根据返回值来判断做什么处理
{
case -1:
lg(Info, "select err");
break;
case 0:
lg(Info, "timeout");
break;
default:
// 开始处理事件
HandleEvents();
break;
}
}
}
~SelectServer()
{
}
private:
Socket Sock_;
struct pollfd rfds[max_num];//加入一个结构体类型数组
};
#注意:文件描述符是否就绪时内核告诉用户的,所以使用的是revent,不是event
rfds[i].revents&POLLIN
3,epoll
epoll介绍:
epoll是在select和poll的基础上发展起来的。epoll可以解决
epoll中的关键函数:
1,int epoll_create(int size);
作用:申请一个epoll的空间。
返回值:返回一个文件描述符,这个文件描述符便标识了这个epoll空间。
参数:szie表示用户需要申请空间的大小,但是在现在的epoll_create中这个参数已经无效。
2, int epoll_wait(int epfd, struct epoll_event *even,int maxevents, int timeout);
作用:等待epoll事件在epoll实例当中发生,返回事件和文件描述符。
返回值:当返回值为-1时表示等待失败,返回值为0时表示等待超时,返回值为其它时表示就绪的文件描述符的个数。
参数:
epfd:epoll的标识符。
event:epoll事件,是一个结构体数组指针,在添加到epoll实例之前要设置文件fd和事件的状态。
maxevents:能等待的事件的最大个数。
timeout:最长等待时间。
使用这个函数等待的值都会被设置进入到events数组里面,在epoll的模型内部这个数组便是一个就绪队列。
问题:这个就绪队列是有最大长度的,如何保证我们的就绪队列不会爆范围呢?
答: 这个就绪队列在读到最大值后就会返回,剩余的数据会在下一次读取时再读取。
3,epoll_ctl: int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
作用:对epoll进行操作,操作的方式有三种:
EPOLL_CTL_ADD
EPOLL_CTL_DEL
EPOLL_CTL_MOD
返回值:成功时返回数字0,失败时返回一个-1。
参数:
epfd:epoll的标识符。
op:操作,填上面三个选项。
fd:文件描述符。
event:事件,在添加之前要对这个事件进行设置(fd,events)。
这个函数是对epoll模型内的红黑树进行操作,会将事件放到红黑树的内部。是在对红黑树进行操作。
使用epoll进行网络通信代码:
#pragma once
#include "log.hpp"
#include "Socket.hpp"
#include <iostream>
#include <sys/epoll.h>
#define num 128
class EpollServer
{
public:
EpollServer()
{}
void Init()//初始化
{
Sock_.Sock();
Sock_.Bind();
Sock_.Listen();
epollfd_ = epoll_create(num); // 申请epoll空间。创建epoll模型
if (epollfd_ < 0)
{
lg(Fatal, "create epollfd_ err epollfd_:%d", epollfd_);
return;
}
lg(Info, "create epollfd_ sucess!");
}
void Handeler()//开始处理事件
{
for (int i = 0; i < num;i++)//遍历数组
{
uint32_t fd = events_[i].data.fd;
int eve = events_[i].events;
if(eve&EPOLLIN)//如果是事件是读事件
{
if(fd == Sock_.sockfd_)//处理监听
{
std::string clientip;
int clientport;
int sock = Sock_.Accept(&clientip,&clientport);
if(sock<0)
{
lg(Fatal, "sock create err");
continue;
}
lg(Info, "clientip:%s,clientport:%d", clientip.c_str(), clientport);
//添加事件到epoll模型当中
struct epoll_event eve;
eve.data.fd = sock;
eve.events = EPOLLIN;
int n = epoll_ctl(epollfd_, EPOLL_CTL_ADD, sock, &eve);
if(n<0)
{
lg(Warning, "epoll_ctl err");
}
}
else
{
char buffer[1024];
int n = read(fd, buffer, sizeof(buffer) - 1);
if(n == -1)
{
lg(Warning, "read err");
epoll_ctl(epollfd_, EPOLL_CTL_DEL, fd, events_);
close(fd);
}
else if(n == 0)
{
lg(Warning, "read err");
epoll_ctl(epollfd_, EPOLL_CTL_DEL, fd, events_);
close(fd);
}
else
{
buffer[n] = 0;
std::cout << "get a message:" << buffer << std::endl;
}
}
}
}
}
void Start()
{
// 设置和添加监听套接字
events_[0].data.fd = Sock_.sockfd_;
events_[0].events = EPOLLIN;
int n = epoll_ctl(epollfd_, EPOLL_CTL_ADD, Sock_.sockfd_, &events_[0]);
for (;;)
{
int n = epoll_wait(epollfd_, events_, 1024, -1); // 获取事件
switch (n)
{
case -1:
lg(Fatal, "epoll_wait err");
break;
case 0:
lg(Warning, "time out");
default:
// 处理事件
Handeler();
break;
}
}
}
~EpollServer()
{
Sock_.Close();
}
private:
Socket Sock_;//套接字
int epollfd_;//epoll的标识符
struct epoll_event events_[num];//epoll结构体数组,epoll模型的红黑树也是这个类型
};
在这里,如果觉得使用epoll的接口难受的话也可以和我一样对epoll进行封装,得到一个Epoller类:
#pragma once
#include<sys/epoll.h>
#include"log.hpp"
#include<iostream>
class Epoller
{
static const int size = 128;
public:
Epoller()
{
epollfd = epoll_create(size);
if(epollfd<0)
{
lg(Error, "epoll create err");
}
else
{
lg(Info, "epoll create ok");
}
}
int Epoller_Wait()
{
int n = epoll_wait(epollfd,events,size,-1);//等待epoll里面的事件就绪
if(n>=0)
{
lg(Info,"epoll wait sucess");
}
else
{
lg(Error, "epoll wait err");
}
return n;
}
int Epoll_Ctl(int oper,int sock,struct epoll_event* evs = nullptr)
{
if(oper == EPOLL_CTL_DEL)
{
int n = epoll_ctl(epollfd, oper, sock, nullptr);//删除掉某个文件描述符
if(n == 0)
{
lg(Info, "epoll delete success");
}
else
{
lg(Info, "epoll delete fail");
}
}
else
{
int n = epoll_ctl(epollfd, oper, sock, evs); // 增加或修改掉某个文件描述符
if (n == 0)
{
lg(Info, "epoll moodify success");
}
else
{
lg(Info, "epoll moodify success");
}
}
}
~Epoller()
{
if(epollfd>=0)//关掉文件描述符
{
close(epollfd);
}
}
public:
int epollfd;
struct epoll_event events[size];//数组
};
这样便可以在后面的使用当中更加方便了。