【linux--->高级IO】

news2024/12/23 6:50:37

文章目录

    • @[TOC](文章目录)
  • 一、五种IO模型概念
    • 1.阻塞IO
    • 2.非阻塞IO
    • 3.信号驱动IO
    • 4.多路复用/多路转接IO
    • 5.异步IO
  • 二、非阻塞IO之fcntl应用
    • 1.fcntl系统调用接口介绍
    • 2.用fcntl实现非阻塞IO
  • 三、多路转接IO之select应用
    • 1.select接口介绍
    • 2.使用select实现多路转接IO
    • select的优缺点
  • 四、多路转接IO之poll应用
    • 1.poll接口介绍
    • 2.使用poll实现多路转接IO
    • 3.poll特点
  • 五、多路转接IO之epoll应用
    • 1.epoll接口介绍
    • 2.epoll原理
    • 3.用epoll实现多路转接IO
    • 4.reactor服务器概念
    • 5.epoll工作模式
  • 六、reactor服务器模式设计
    • 1.多线程模式设计理念
    • 2.master slaver模式设计理念

一、五种IO模型概念

IO的本质是等待+数据拷贝,而等待的本质是检测IO事件是否就绪.因为操作系统将系统缓冲区数据拷贝的用户缓冲区需要系统缓冲区数据就绪以及用户缓冲区空间就绪,从用户缓冲区拷贝数据到系统缓冲区需要用户缓冲区同样如此.达到IO条件叫做IO时间就绪.根据等待和数据拷贝的方式方法的不同衍生出了五种IO模型.

1.阻塞IO

在内核数据没有准备好之前,读/写系统调用会一直阻塞.

2.非阻塞IO

不管内核数据有没有准备好,读/写系统调用都会直接返回,而不是阻塞.

3.信号驱动IO

不用每次都调用系统调用检测IO事件是否就绪,IO事件就绪时,系统调用发送SIGIO信号通知进程,进程捕捉信号,设置递达动作处理处理数据.

4.多路复用/多路转接IO

同时等待多个IO事件就绪,有IO事件就绪就处理,使处理IO事件更加高效.

5.异步IO

由内核在后台数据拷贝完成时, 通知应用程序使用数据(而信号驱动是告诉应用程序何时可以开始拷贝数据).
在这里插入图片描述

二、非阻塞IO之fcntl应用

1.fcntl系统调用接口介绍

int fcntl(int fd, int cmd, ... /* arg */ );
功能:操作一个打开的文件描述符,操作有cmd参数决定
参数:fd为被操作的文件描述符,cmd为fd的操作指令宏值,...可变参数如何使用由cmd决定.

cmd的可选宏值:
	F_GETFL:获取文件的访问模式和文件状态标志,可变参数忽略
	F_SETFL:用可变参数指定的宏值设置文件状态标志,在Linux操作系统下,此命令只能修改
O_APPEND, O_ASYNC, O_DIRECT, O_NOATIME和O_NONBLOCK标志。

返回值:如果调用成功,返回值与取决于cmd命令,cmd是F_GETFL返回值就是文件描述标志,cmd除了F_DUPFD, F_GETFD,F_GETFL,F_GETLEASE,F_GETOWN,F_GETSIG, F_GETPIPE_SZ这些命令其他命令调用成功后返回值都是0.失败返回-1.

2.用fcntl实现非阻塞IO

#include <iostream>
#include <unistd.h>
#include <fcntl.h>
#include <cstdio>
#include <vector>
#include <functional>
using namespace std;

void SetNnblock(int fd)
{
    int flag = fcntl(fd, F_GETFL);
    if (flag != -1)
    {
        fcntl(fd, F_SETFL, flag | O_NONBLOCK);
    }
}

// 任务加载容器
using func = function<void(void)>;
vector<func> task;

// 其他任务
void Task1() { cout << "task1" << endl; }
void Task2() { cout << "task2" << endl; }
void Task3() { cout << "task3" << endl; }

void LoadTask()
{
    task.push_back(Task1);
    task.push_back(Task2);
    task.push_back(Task3);
}

int main()
{
    // 加载任务
    LoadTask();

    // 设置fd非阻塞属性
    SetNnblock(0);

    char buffer[2048];
    while (true)
    {
        // 执行其他任务
        for (int i = 0; i < task.size(); i++)
            task[i]();

        // 非阻塞读
        ssize_t n = read(0, buffer, sizeof(buffer) - 1);
        if (n > 0)
        {
            buffer[n - 1] = 0;
            cout << ">: " << buffer << endl;
        }
        else
        {
         	if (errno == EAGAIN || errno == EWOULDBLOCK)
            {
                // 底层数据没有准备好,希望你下次继续来检测
                HandlerALLTask();
                sleep(1);

                std::cout << "data not ready" << std::endl;
                continue;
            }
            else if (errno == EINTR)
            {
                // 这次IO被信号中断,也需要重新读取
                continue;
            }
        }
        sleep(1);
    }
    return 0;
}

三、多路转接IO之select应用

1.select接口介绍

int select(int nfds, fd_set *readfds, fd_set *writefds,
           fd_set *exceptfds, struct timeval *timeout);
功能:程序一次等待多个文件描述符,这个文件描述符可以是用来链接的或者是IO数据的,直到这个文件描述符的IO事件为就绪状态.

参数: nfds:需要被等待的文件描述符其中的最大值+1;

	timeout:时间参数,timeval结构体可以设置秒和微米两个选项.如果两个参数都为0,表示select不阻塞等待,如果设置为null表示没有文件描述符就绪就一直阻塞等待,结构体其中一个参数>0表示这个时间内阻塞等待直到与文件描述符就绪,函数返回,timeout被内核设置剩余时间反馈回来,或者超时直接返回,timeout被设置为0.
		struct timeval
					{
						long tv_sec; /* seconds秒 */
              			long tv_usec;  /* microseconds */
					}
					
	readfds,writefds,exceptfds,都是位图结构体,用来讲需要内核等待的描述符按照读写异常事件类型设置到对应的对象中,这三个是输入输出型参数,用户将需要内核检测的文件描述符设置到结构体中,传参给内核,内核将检测结果覆盖设置到结构体中反馈回来. 
		typedef struct
	  	{
			#ifdef __USE_XOPEN
	   		__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
			# define __FDS_BITS(set) ((set)->fds_bits)
			#else
	   		__fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
			# define __FDS_BITS(set) ((set)->__fds_bits)
			#endif
	  	} fd_set;
	设置fd_set对象中有专门的宏函数可以使用.
		void FD_CLR(int fd, fd_set *set);
		判断fd是否在set位图中
    	int  FD_ISSET(int fd, fd_set *set);
   		将fd设置到set位图中
	    void FD_SET(int fd, fd_set *set);
	    初始化set
	    void FD_ZERO(fd_set *set);

返回值:成功则返回三个就绪事件位图中设置的文件描述符总数,失败返回-1,返回后readfds,writefds,exceptfds,timeout参数需要重新设置.

2.使用select实现多路转接IO

log.hpp

#pragma once
#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <sys/types.h>
#include <unistd.h>
#include <sstream>
#include <time.h>
#include<cstdio>

namespace log_space
{
    enum
    {
        DEBUG,
        INFO,
        ERROR,
        FATAL,
    };
    std::string GetLevelStr(int level)
    {
        std::string str;
        switch (level)
        {
        case DEBUG:
            str = "DEBUG";
            break;
        case INFO:
            str = "INFO";
            break;
        case ERROR:
            str = "ERROR";
            break;
        case FATAL:
            str = "FATAL";
            break;
        default:
            str = "UNKNOW";
            break;
        }
        return str;
    }
    std::string GetTimeStr()
    {
        time_t _time = time(nullptr);
        struct tm *pt = localtime(&_time);
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "%d-%d-%d_%d:%d:%d", pt->tm_year + 1900, pt->tm_mon + 1,
         pt->tm_mday, pt->tm_hour, pt->tm_min, pt->tm_sec);
        std::string str = buffer;
        return str;
    }

    void LogMessage(int level, const char *Format, ...)
    {
        // 级别,时间,pid
        char left[2048];
        std::string level_str = GetLevelStr(level);
        std::string time_str = GetTimeStr();
        pid_t id = getpid();
        snprintf(left, sizeof(left), "[%s][%s][%d]", level_str.c_str(), time_str.c_str(), id);

        char right[2048];
        va_list p;
        va_start(p, Format);
        vsnprintf(right, sizeof(right), Format, p);
        va_end(p);

        printf("%s: %s\n",left,right);
    }
}

sock.hpp


#pragma once
#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <cstring>
#include <errno.h>
#include "log.hpp"
#include "error.hpp"
#define BACKLOG_DEFAULT 32

namespace sock_space
{
    class Sock
    {
    public:
        Sock() : _fd(-1)
        {
        }
        // 获取套接字
        void Socket()
        {
            _fd = socket(AF_INET, SOCK_STREAM, 0);
            if (_fd < 0)
            {
                log_space::LogMessage(log_space::FATAL, "socket error,code:%d,%s", errno, strerror(errno));
                exit(SOCK_ERR);
            }
            log_space::LogMessage(log_space::INFO,"socket success");
            
        }
        // 绑定
        void Bind(const uint16_t &port)
        {
            struct sockaddr_in client;
            memset(&client, 0, sizeof(client));
            client.sin_addr.s_addr = htons(INADDR_ANY);
            client.sin_family = AF_INET;
            client.sin_port = htons(port);

            if (bind(_fd, (struct sockaddr *)&client, sizeof(client)) == -1)
            {
                log_space::LogMessage(log_space::FATAL, "bind error,code:%d,%s", errno, strerror(errno));
                exit(BIND_ERR);
            }
            log_space::LogMessage(log_space::INFO,"bind success");

        }
        // 监听
        void Listen()
        {
            int n = listen(_fd, BACKLOG_DEFAULT);
            if (n == -1)
            {
                log_space::LogMessage(log_space::FATAL, "listen error,code:%d,%s", errno, strerror(errno));
                exit(LISTEN_ERR);
            }
            log_space::LogMessage(log_space::INFO,"listen success");

        }
        // 获取链接
        int Accept(std::string &client_ip, uint16_t &client_port)
        {
            // 获取io套接字
            struct sockaddr_in client;
            socklen_t len = sizeof(client);
            int sockfd = accept(_fd, (struct sockaddr *)&client, &len);
            if (sockfd == -1)
            {
                log_space::LogMessage(log_space::ERROR, "accept fail,relink......");
            }
            log_space::LogMessage(log_space::INFO,"accept success");


            // 输出客户端ip和port
            client_ip = inet_ntoa(client.sin_addr);
            client_port = ntohs(client.sin_port);

            return sockfd;
        }
        // 链接
        int Connect(const string &server_ip, const uint16_t &server_port)
        {
            // 设置服务器地址与端口结构体
            struct sockaddr_in server;
            memset(&server, 0, sizeof(server));
            server.sin_family = AF_INET;
            server.sin_addr.s_addr = inet_addr(server_ip.c_str());
            server.sin_port = htons(server_port);

            // 向服务器申请链接
            int n = connect(_fd, (struct sockaddr *)&server, sizeof(server));
            if (n == -1)
            {
                log_space::LogMessage(log_space::ERROR, "connect fail,relink......");
            }
            log_space::LogMessage(log_space::INFO,"connect success");
            return n;
        }
        int get_sockfd() { return _fd; }
        ~Sock()
        {
            close(_fd);
        }

    private:
        int _fd;
    };
}

多路转接IO的本质是一次性等待多个fd就绪,所以要将每个需要检测是否就绪的文件描述符以及套接字都交给select处理

首先创建套接字,设置监听状态,listen套接字需要等待有客户端链接时才能获取连接,所以需要将listen套接字交给select处理

listen套接字就绪后获取IO套接字,IO套接字需要等待系统接收缓冲区就绪了才能拷贝数据,所以也需要交给select处理

然而,select是以位图的方式汇聚所有要检测是否就绪的fd传参的,而且参数是输入输出型参数,函数返回时,位图会被设置成只有就绪套接字的位图.下次再调用select需要重新将fd重新设置到fd_set位图对象中去.所以每次被设置的fd需要先保存在一个固定数组中,而且这个fd可能是读操作也可能是写操作,所以还要保存它的属性.所以这个数组成员必须是一个结构体.

select_server.hpp

#pragma once
#include "sock.hpp"
#include "log.hpp"
#include "error.hpp"
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <string>
#include <cerrno>
#include <cstring>
#include <queue>

// 默认端口号
static const uint16_t default_port = 8080;
// IO事件信息数组长度
static const int N = (sizeof(fd_set) * 8);
// IO事件信息
struct FdEvent
{
    int _fd;
    short _Event;
    std::string _ip;
    uint16_t _port;
};
enum
{
    READ = 1,
    WRITE,
};

class select_server
{
    typedef FdEvent type_t;

public:
    select_server(uint16_t port = default_port) : _port(port)
    {
        for (int i = 0; i < N; i++)
        {
            _fd_array[i]._fd = -1;
            _fd_array[i]._Event = 0;
            _fd_array[i]._port = 0;
        }
    }

    // 初始化
    void init()
    {
        // 创建套接字
        _sock.Socket();

        // 绑定ip和port
        _sock.Bind(_port);

        // 设置套接字监听状态
        _sock.Listen();
    }

    // 监听就绪处理
    void HandlerListenReady()
    {
        // 获取IO套接字
        std::string client_ip;
        uint16_t client_port = 0;
        int sockfd = _sock.Accept(client_ip, client_port);
        log_space::LogMessage(log_space::INFO,"链接%s::%d成功",client_ip.c_str(),client_port);

        // 检测fd数组空位
        int pos = 1;
        for (; pos < N; pos++)
        {
            if (_fd_array[pos]._fd == -1)
                break;
        }

        // fd_set空间用尽
        if (pos >= N)
        {
            close(sockfd);
        }

        // 有空间,将IO套接字设置进fd数组
        _fd_array[pos]._fd = sockfd;
        _fd_array[pos]._Event = READ;
        _fd_array[pos]._ip = client_ip;
        _fd_array[pos]._port = client_port;
    }

    // 读就绪处理
    void HandlerReadReady(int index)
    {
        char buffer[1024];
        int n = read(_fd_array[index]._fd, buffer, sizeof(buffer) - 1);
        if (n > 0)
        {
            buffer[n] = 0;
            std::cout <<_fd_array[index]._ip<<"::"<<_fd_array[index]._port<<"# "<<buffer << std::endl;
            std::string sdata="server > ";
            sdata+=buffer;
            write(_fd_array[index]._fd, sdata.c_str(), sdata.size());
        }
        else
        {
            if (n < 0)
            {
                log_space::LogMessage(log_space::ERROR, "read error,error code:%d,error info%s", errno, strerror(errno));
            }
            else
            {
                log_space::LogMessage(log_space::INFO, "client quit");
            }
            close(_fd_array[index]._fd);
            _fd_array[index]._fd = -1;
            _fd_array[index]._Event = 0;
            _fd_array[index]._ip.clear();
            _fd_array[index]._port = 0;
        }
    }

   /*  // 写就绪处理
    void HandlerWriteReady(int index)
    {
        std::string recv_data = _data.front();
        _data.pop();
        write(_fd_array[index]._fd, recv_data.c_str(), recv_data.size());
    } */

    // 统一处理就绪事件
    void HandlerListenFd(fd_set &rfds, fd_set &wrfds)
    {
        // 检测就绪的IO事件
        for (int i = 0; i < N; i++)
        {
            int fd = _fd_array[i]._fd;
            short event = _fd_array[i]._Event;

            // 监听套接字就绪
            if (fd == _sock.get_sockfd() && FD_ISSET(fd, &rfds))
            {
                log_space::LogMessage(log_space::INFO,"fd->%d,监听就绪");
                HandlerListenReady();
            }
            // 读套接字就绪
            else if (fd != -1 && FD_ISSET(fd, &rfds))
            {
                log_space::LogMessage(log_space::INFO,"fd->%d,读就绪");
                HandlerReadReady(i);
            }
            /* else if (fd != -1 && FD_ISSET(fd, &wrfds))
            {
                log_space::LogMessage(log_space::INFO,"fd->%d,写就绪");
                HandlerWriteReady(i);
            } */
            else
            {}
        }
    }

    // 运行
    void start()
    {
        // listenfd加入数组
        _fd_array[0]._fd = _sock.get_sockfd();
        _fd_array[0]._Event = READ;

        while (true)
        {
            // 将需要检测的fd交给内核
            fd_set rfds, wrfds;
            timeval t;
            t.tv_sec = 5;
            int maxfd = 0;
            FD_ZERO(&rfds);
            FD_ZERO(&wrfds);
            for (int i = 0; i < N; i++)
            {
                if (_fd_array[i]._fd != -1 && (_fd_array[i]._Event & READ))
                {
                    FD_SET(_fd_array[i]._fd, &rfds);
                }

                if (_fd_array[i]._fd != -1 && (_fd_array[i]._Event & WRITE))
                {
                    FD_SET(_fd_array[i]._fd, &wrfds);
                }

                if (maxfd < _fd_array[i]._fd)
                    maxfd = _fd_array[i]._fd;
            }

            //打印fd
            std::cout<<"fd: ";
            for (auto a : _fd_array)
            {
                if (a._fd != -1)
                    std::cout << a._fd << " ";
            }
            std::cout << std::endl;

            // 每次阻塞等待5秒钟
            int s = select(maxfd+1, &rfds, &wrfds, nullptr, nullptr);
            // 就绪
            if (s > 0)
            {
                HandlerListenFd(rfds, wrfds);
            }
            else if (s == 0)
            {
                log_space::LogMessage(log_space::INFO, "timeout error code:%d,error info:%s",errno,strerror(errno));
            }
            else
            {
                log_space::LogMessage(log_space::ERROR, "select error,error code: %d error info: %s", errno, strerror(errno));
            }
        }
    }

private:
    uint16_t _port;

    sock_space::Sock _sock;

    type_t _fd_array[N];

    std::queue<std::string> _data;
};

select的优缺点

每次调用select, 都需要手动设置fd集合, 从接口使用角度来说也非常不便.
每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
每次检测数组中那个fd就绪需要遍历这个数组.
select支持的文件描述符数量太小.

四、多路转接IO之poll应用

1.poll接口介绍


#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);


功能:功能:程序一次等待多个文件描述符,这个文件描述符可以是用来链接的或者是IO数据的,直到这个文件描述符的IO事件为就绪状态.

参数:fds是一个poll函数监听的结构列表. 每一个元素中, 包含了三部分内容: 文件描述符, 监听的事件集合, 返回的事件集合.
	pollfd结构
		struct pollfd
		{
		 	int fd; /* file descriptor */
		 	short events; /* requested events */
		 	short revents; /* returned events */
		}
	nfds表示fds数组的长度.
	timeout表示poll函数的超时时间, 单位是毫秒(ms).

返回值:等于0, 表示poll函数等待超时;大于0, 表示poll由于监听的文件描述符就绪而返回,小于0,表示出错

2.使用poll实现多路转接IO

除了server.hpp改变,其他的与select相同
poll_server.hpp

#pragma once
#include "sock.hpp"
#include "log.hpp"
#include "error.hpp"
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <string>
#include <cerrno>
#include <cstring>
#include <queue>
#include <poll.h>

// 默认端口号
static const uint16_t default_port = 8080;
// IO事件信息数组长度
static const int N = 4096;

class poll_server
{
    typedef struct pollfd type_t;

public:
    poll_server(uint16_t port = default_port) : _port(port)
    {
    }

    // 初始化
    void init()
    {
        // 创建套接字
        _sock.Socket();

        // 绑定ip和port
        _sock.Bind(_port);

        // 设置套接字监听状态
        _sock.Listen();

        _fd_array = new type_t[N];
        for (int i = 0; i < N; i++)
        {
            _fd_array[i].fd = -1;
            _fd_array[i].events = 0;
            _fd_array[i].revents = 0;
        }
    }

    // 监听就绪处理
    void HandlerListenReady()
    {
        // 获取IO套接字
        std::string client_ip;
        uint16_t client_port = 0;
        int sockfd = _sock.Accept(client_ip, client_port);
        log_space::LogMessage(log_space::INFO, "链接%s::%d成功", client_ip.c_str(), client_port);

        // 检测fd数组空位
        int pos = 1;
        for (; pos < N; pos++)
        {
            if (_fd_array[pos].fd == -1)
                break;
        }

        // fd_set空间用尽
        if (pos >= N)
        {
            close(sockfd);
        }

        // 有空间,将IO套接字设置进fd数组
        _fd_array[pos].fd = sockfd;
        _fd_array[pos].events = POLLIN;
    }

    // 读就绪处理
    void HandlerReadReady(int index)
    {
        char buffer[1024];
        int n = read(_fd_array[index].fd, buffer, sizeof(buffer) - 1);
        if (n > 0)
        {
            buffer[n] = 0;
            std::cout << "client# " << buffer << std::endl;
            std::string sdata = "server > ";
            sdata += buffer;
            write(_fd_array[index].fd, sdata.c_str(), sdata.size());
        }
        else
        {
            if (n < 0)
            {
                log_space::LogMessage(log_space::ERROR, "read error,error code:%d,error info%s", errno, strerror(errno));
            }
            else
            {
                log_space::LogMessage(log_space::INFO, "client quit");
            }
            close(_fd_array[index].fd);
            _fd_array[index].fd = -1;
            _fd_array[index].events = 0;
        }
    }

    /*  // 写就绪处理
     void HandlerWriteReady(int index)
     {
         std::string recv_data = _data.front();
         _data.pop();
         write(_fd_array[index]._fd, recv_data.c_str(), recv_data.size());
     } */

    // 统一处理就绪事件
    void HandlerListenFd()
    {
        // 检测就绪的IO事件
        for (int i = 0; i < N; i++)
        {
            int fd = _fd_array[i].fd;
            short event = _fd_array[i].events;

            // 监听套接字就绪
            if (fd == _sock.get_sockfd() && _fd_array[i].revents & POLLIN)
            {
                log_space::LogMessage(log_space::INFO, "fd->%d,监听就绪");
                HandlerListenReady();
            }
            // 读套接字就绪
            else if (fd != _sock.get_sockfd() && _fd_array[i].revents & POLLIN)
            {
                log_space::LogMessage(log_space::INFO, "fd->%d,读就绪");
                HandlerReadReady(i);
            }
            /* else if (fd != -1 && FD_ISSET(fd, &wrfds))
            {
                log_space::LogMessage(log_space::INFO,"fd->%d,写就绪");
                HandlerWriteReady(i);
            } */
            else
            {
            }
        }
    }

    // 运行
    void start()
    {
        // listenfd加入数组
        _fd_array[0].fd = _sock.get_sockfd();
        _fd_array[0].events = POLLIN;

        while (true)
        {

            // 每次阻塞等待5秒钟
            int s = poll(_fd_array, N, -1);
            std::cout << "fd> ";
            for (int i = 0; i < N; i++)
            {
                if (_fd_array[i].fd != -1)
                    std::cout << _fd_array[i].fd << " ";
            }
            std::cout << std::endl;

            // 判断是否就绪
            if (s > 0)
            {
                HandlerListenFd();
            }
            else if (s == 0)
            {
                log_space::LogMessage(log_space::INFO, "timeout error code:%d,error info:%s", errno, strerror(errno));
            }
            else
            {
                log_space::LogMessage(log_space::ERROR, "select error,error code: %d error info: %s", errno, strerror(errno));
            }
        }
    }

private:
    uint16_t _port;

    sock_space::Sock _sock;

    type_t *_fd_array;

    std::queue<std::string> _data;
};

在这里插入图片描述

3.poll特点

不同与select使用三个位图来表示三个fdset的方式,poll使用一个pollfd的指针实现.
pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式. 接口使用比select更方便.
poll并没有最大数量限制 (但是数量过大后性能也是会下降).

poll的缺点
poll中监听的文件描述符数目增多时和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符.
每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中.
同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效率也会线性下降.

五、多路转接IO之epoll应用

1.epoll接口介绍

int epoll_create(int size);
功能:创建一个epoll对象,epoll是一个管理模型,也就是结构体.
参数:size任意大于零的数字
返回值:成功则返回一个非负的文件描述符,失败返回-1并设置错误码.

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
功能:对被文件描述符epfd引用的epoll对象进行op增删改操作.增删改内容包括文件描述符fd和event事件.
参数:epfd,epoll_create的返回值
	op:控制操作类型,有EPOLL_CTL_ADD,EPOLL_CTL_DEL,EPOLL_CTL_MOD,分别对应增删改
	fd:被epoll对象操作的文件描述符
	event:被epoll对象操作的文件操作属性,是一个结构体.
		typedef union epoll_data {
               void        *ptr;
               int          fd;  //一般只用这个
               uint32_t     u32;
               uint64_t     u64;
           } epoll_data_t;

           struct epoll_event {
               uint32_t     events;      /* Epoll events */
               epoll_data_t data;        /* User data variable */
           };
返回值:成功返回0,失败返回-1,错误码被设置.

int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
功能:等待被epfd引用的epoll对象中的所有事件,最多返回maxevents个可用事件.
参数:epfd是epoll_create的返回值.
	events用于接收内核就绪事件的events结构体数组指针.
	maxevents:最多返回事件的总数
	timeout:epoll_wait最大阻塞等待的时间,单位毫秒
返回值:0代表等待超时,>0代表就绪事件的fd的个数,失败返回-1

2.epoll原理

操作系统内部维护了一个红黑树阻塞结构和一个就绪队列结构体模型,结构体模型以及结构操作方法共同组成了epoll模型,当调用epoll_create时os创建一个epoll对象,并返回一个与epoll对象建立映射关系的文件描述符epfd;

有需要管理的fd时,就调用epoll_ctl,将需要管理的fd以及事件性质作为节点添加到epoll对象管理的红黑树结构体中等待

当网卡收到网络发送过来的数据后向连接了CPU的针脚发送电流信号,产生硬件中断信号,CPU收到信号后查询中断向量表,中断向量表是一个函数指针数组,数组成员为硬件驱动程序,调用对应的硬件驱动将硬件信息拷贝到内核缓冲区中,并设置回调函数,将报文通过IP协议拷贝至传输层文件缓冲区,最后系统调用回调函数将就绪事件节点插入就绪队列.

通过调用epoll_wait将就绪文件缓冲区的数据拷贝至应用层用户缓冲区.操作系统只需要判断就绪队列是否为空,就可以判断有没有就绪的IO事件.

3.用epoll实现多路转接IO

log.hpp

#pragma once
#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <sys/types.h>
#include <unistd.h>
#include <sstream>
#include <time.h>
#include<cstdio>

namespace log_space
{
    enum
    {
        DEBUG,
        INFO,
        ERROR,
        FATAL,
    };
    std::string GetLevelStr(int level)
    {
        std::string str;
        switch (level)
        {
        case DEBUG:
            str = "DEBUG";
            break;
        case INFO:
            str = "INFO";
            break;
        case ERROR:
            str = "ERROR";
            break;
        case FATAL:
            str = "FATAL";
            break;
        default:
            str = "UNKNOW";
            break;
        }
        return str;
    }
    std::string GetTimeStr()
    {
        time_t _time = time(nullptr);
        struct tm *pt = localtime(&_time);
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "%d-%d-%d_%d:%d:%d", pt->tm_year + 1900, pt->tm_mon + 1,
         pt->tm_mday, pt->tm_hour, pt->tm_min, pt->tm_sec);
        std::string str = buffer;
        return str;
    }

    void LogMessage(int level, const char *Format, ...)
    {
        // 级别,时间,pid
        char left[2048];
        std::string level_str = GetLevelStr(level);
        std::string time_str = GetTimeStr();
        pid_t id = getpid();
        snprintf(left, sizeof(left), "[%s][%s][%d]", level_str.c_str(), time_str.c_str(), id);

        char right[2048];
        va_list p;
        va_start(p, Format);
        vsnprintf(right, sizeof(right), Format, p);
        va_end(p);

        printf("%s: %s\n",left,right);
    }
}

util.hpp

#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <cstring>
#include <ctype.h>

namespace util_space
{
 


    void split(const std::string &str, const char *sep, std::vector<std::string> &strv)
    {
        int start = 0;
        while (start < str.size())
        {
            // 找分隔符位置
            auto pos = str.find(sep, start);
            if (pos == std::string::npos)
                break;

            // 分割字符串到strv
            std::string result = str.substr(start, pos - start);
            strv.push_back(result);

            // 更新查找sep的位置
            start = pos + strlen(sep);
        }

        // 最后一段字符可能不包含分隔符
        if (start < str.size())
        {
            strv.push_back(str.substr(start, str.size() - start));
        }
    }
}

protocol.hpp

#pragma once
#include "log.hpp"
#include "util.hpp"
#include <iostream>
#include <string>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <jsoncpp/json/json.h>

namespace protocol_space
{
// 分隔符
#define SEPARATOR "\r\n"
#define SEP_LEN strlen(SEPARATOR)

    // 接收数据包
    int GetPackage(std::string &str, std::string &package)
    {
        // 查找第一个分隔符
        auto pos = str.find(SEPARATOR);
        if (pos == std::string::npos)
            return 0;

        // 获取报头
        std::string header = str.substr(0, pos);

        // 获取有效载荷长度
        int len = atoi(header.c_str());

        // 确定完整数据包长度
        int overall_len = len + header.size() + SEP_LEN * 2;

        // 判断str的长度是否达到完整数据包长度
        if (str.size() < overall_len)
            return 0;

        // 截取完整数据包到str中去
        package = str.substr(0, overall_len);

        // 删除str中已经被复制截取的部分
        str.erase(0, overall_len);
        return len;
    }

    // 去除报头
    void RemoveHeader(std::string &str, int len)
    {
        std::cout << "去除报头之前: "<<std::endl;
        std::cout<< str << std::endl;
        str = str.substr(str.size() - SEP_LEN - len, len);
        std::cout << "去除报头之后: "<<std::endl;
        std::cout<< str << std::endl;
    }

    // 添加报头
    void AddHeader(std::string &str)
    {
        std::cout << "添加报头之前: "<<std::endl;
        std::cout<< str << std::endl;
        std::string strlen = std::to_string(str.size());
        str = strlen + SEPARATOR + str + SEPARATOR;
        std::cout << "添加报头之后: "<<std::endl;
        std::cout << str << std::endl;
    }
    // 需求
    class request
    {
    public:
        request() : _x(-1), _y(-1), _op(' ')
        {
        }
        request(int x, int y, char op) : _x(x), _y(y), _op(op)
        {
        }

        // 反序列化
        void deserializetion(const std::string &str)
        {

#ifdef MYSELF
            // 分割
            std::vector<std::string> strv;
            util_space::split(str, " ", strv);
            for (auto e : strv)
            {
                std::cout << e << " ";
            }
            std::cout << std::endl;

            // 转换
            _x = atoi(strv[0].c_str());
            _op = strv[1][0];
            _y = atoi(strv[2].c_str());
#else
            // 从将序列化信息分割填充在root结构体中
            Json::Value root;
            Json::Reader read;
            read.parse(str, root);

            // 从root中提取结构化信息
            _x = root["x"].asInt();
            _op = root["op"].asInt();
            _y = root["y"].asInt();
#endif

        }

        void print()
        {
            std::cout << "x: " << _x << std::endl;
            std::cout << "y: " << _y << std::endl;
            std::cout << "op: " << _op << std::endl;
        }
        // 序列化
        void serializetion(std::string &str)
        {
#ifdef MYSELF
            str = std::to_string(_x) + ' ' + _op + ' ' + std::to_string(_y);
#else
            // 将信息填充在root结构中
            Json::Value root;
            root["x"] = _x;
            root["y"] = _y;
            root["op"] = _op;

            // 用root结构体将信息序列化到str中
            Json::StyledWriter writer;
            // Json::FastWriter writer;
            str = writer.write(root);
#endif
        }

        ~request()
        {
        }

    public:
        int _x;
        int _y;
        char _op;
    };

    // 回应
    class response
    {
    public:
        response() : _result(-1), _code(-1)
        {
        }
        response(int result, int code) : _result(result), _code(code)
        {
        }
        // 反序列化
        void deserializetion(const std::string &str)
        {
#ifdef MYSELF
            // 分割
            std::vector<std::string> strv;
            util_space::split(str, " ", strv);

            // 转换
            _result = atoi(strv[0].c_str());
            _code = atoi(strv[1].c_str());
#else
            // 从将序列化信息分割填充在root结构体中
            Json::Value root;
            Json::Reader reader;
            reader.parse(str, root);

            // 从root中提取结构化信息
            _result = root["result"].asInt();
            _code = root["code"].asInt();
#endif

        }

        // 序列化
        void serializetion(std::string &str)
        {
#ifdef MYSELF
            str = std::to_string(_result) + ' ' + std::to_string(_code);
#else
            // 将信息填充在root结构中
            Json::Value root;
            root["result"] = _result;
            root["code"] = _code;

            // 用root结构体将信息序列化到str中
            Json::StyledWriter writer;
            // Json::FastWriter writer;
            str = writer.write(root);
#endif
        }
        void print()
        {
            std::cout << "result: " << _result << std::endl;
            std::cout << "code: " << _code << std::endl;
        }
        ~response()
        {
        }

    public:
        int _result;
        int _code;
    };
}

sock.hpp


#pragma once
#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <cstring>
#include <errno.h>
#include "log.hpp"
#include "error.hpp"
#define BACKLOG_DEFAULT 32

namespace sock_space
{
    class Sock
    {
    public:
        Sock() : _fd(-1)
        {
        }
        // 获取套接字
        void Socket()
        {
            _fd = socket(AF_INET, SOCK_STREAM, 0);
            if (_fd < 0)
            {
                log_space::LogMessage(log_space::FATAL, "socket error,code:%d,%s", errno, strerror(errno));
                exit(SOCK_ERR);
            }
        }
        // 绑定
        void Bind(const uint16_t &port)
        {
            struct sockaddr_in client;
            memset(&client, 0, sizeof(client));
            client.sin_addr.s_addr = htons(INADDR_ANY);
            client.sin_family = AF_INET;
            client.sin_port = htons(port);

            if (bind(_fd, (struct sockaddr *)&client, sizeof(client)) == -1)
            {
                log_space::LogMessage(log_space::FATAL, "bind error,code:%d,%s", errno, strerror(errno));
                exit(BIND_ERR);
            }
            log_space::LogMessage(log_space::INFO,"bind success");

        }
        // 监听
        void Listen()
        {
            int n = listen(_fd, BACKLOG_DEFAULT);
            if (n == -1)
            {
                log_space::LogMessage(log_space::FATAL, "listen error,code:%d,%s", errno, strerror(errno));
                exit(LISTEN_ERR);
            }
            log_space::LogMessage(log_space::INFO,"listen success");

        }
        // 获取链接
        int Accept(std::string &client_ip, uint16_t &client_port,int& err)
        {
            // 获取io套接字
            struct sockaddr_in client;
            socklen_t len = sizeof(client);
            int sockfd = accept(_fd, (struct sockaddr *)&client, &len);
            if (sockfd == -1&& (errno!=EAGAIN||errno!=EWOULDBLOCK)&&errno!=EINTR)
            {
                log_space::LogMessage(log_space::ERROR, "accept fail,error code:%d,error info:%s",errno,strerror(errno));
            }
            err=errno;

            // 输出客户端ip和port
            client_ip = inet_ntoa(client.sin_addr);
            client_port = ntohs(client.sin_port);
            

            return sockfd;
        }
        // 链接
        int Connect(const string &server_ip, const uint16_t &server_port)
        {
            // 设置服务器地址与端口结构体
            struct sockaddr_in server;
            memset(&server, 0, sizeof(server));
            server.sin_family = AF_INET;
            server.sin_addr.s_addr = inet_addr(server_ip.c_str());
            server.sin_port = htons(server_port);

            // 向服务器申请链接
            int n = connect(_fd, (struct sockaddr *)&server, sizeof(server));
            if (n == -1)
            {
                log_space::LogMessage(log_space::ERROR, "connect fail,relink......");
            }
            log_space::LogMessage(log_space::INFO,"connect success");
            return n;
        }
        int ListenFd() { return _fd; }
        ~Sock()
        {
            close(_fd);
        }

    private:
        int _fd;
    };
}

epoll_client.cc

#include "sock.hpp"
#include "protocol.hpp"
#include <cstdlib>
enum Stat
{
    LEFT,
    MIDDLE,
    RIGHT,
};

protocol_space::request FormatStr(std::string str)
{
    Stat s = LEFT;
    std::string x, y, op;
    for (int i = 0; i < str.size();)
    {
        switch (s)
        {
        case LEFT:
        {
            if (isdigit(str[i]))
            {
                x.push_back(str[i++]);
            }
            else
            {
                s = MIDDLE;
            }
        }
        break;
        case MIDDLE:
        {
            op = str[i++];
            s = RIGHT;
        }
        break;
        default:
        {
            y.push_back(str[i++]);
        }
        break;
        }
    }
    protocol_space::request req;
    req._x = atoi(x.c_str());
    req._y = atoi(y.c_str());
    req._op = op[0];
    return req;
}
int main(int argc, char *argv[])
{
    // 创建套接字
    sock_space::Sock _sock;
    _sock.Socket();

    // 链接服务器
    int N = 5;
    std::string serverip = argv[1];
    uint16_t serverport = atoi(argv[2]);

    while (_sock.Connect(serverip, serverport) == -1)
    {
        --N;
        if (N <= 0)
        {
            exit(0);
        }
    }

    while (true)
    {
        // 获取用户输入
        cout << "输入操作 ";
        std::string str;
        getline(std::cin, str);

        // 格式化
        protocol_space::request req = FormatStr(str);

        // 序列化
        std::string sendstr;
        req.serializetion(sendstr);

        // 加报头
        protocol_space::AddHeader(sendstr);

        // 发送数据
        ssize_t sn = send(_sock.ListenFd(), sendstr.c_str(), sendstr.size(), 0);
        if (sn < 0)
        {
            log_space::LogMessage(log_space::ERROR, "send error,error code:%d,error info:%s", errno, strerror(errno));
        }

        // 接收数据
        std::string recvstr;
        std::string buffer(1024, '0');
        ssize_t rn = recv(_sock.ListenFd(), (char *)buffer.c_str(), buffer.size(), 0);
        if (rn > 0)
        {
            buffer.resize(rn);
        }
        else if (rn == 0)
        {
            log_space::LogMessage(log_space::INFO, "server quit");
            return 0;
        }
        else
        {
            log_space::LogMessage(log_space::ERROR, "recv error,error code:%d,error info:%s", errno, strerror(errno));
            exit(-1);
        }

        int n = protocol_space::GetPackage(buffer, recvstr);
        if (n == 0)
            continue;
        else
        {
            // 去报头
            protocol_space::RemoveHeader(recvstr, n);

            // 反序列化
            protocol_space::response resp;
            resp.deserializetion(recvstr);

            // 输出信息
            std::cout << "server recv# "
                      << "result=" << resp._result << " "
                      << "code=" << resp._code << std::endl;
        }
    }
    return 0;
}

epoll_server.cc

#include "epoll_server.hpp"
#include "protocol.hpp"
#include <memory>
#include <cstdio>
#include<stdlib.h>

// 信息处理
protocol_space::response Calculate(const protocol_space::request &req)
{
    // 计算
    protocol_space::response resp;
    switch (req._op)
    {
    case '+':
        resp._result = req._x + req._y;
        resp._code = 0;
        break;
    case '-':
        resp._result = req._x - req._y;
        resp._code = 0;
        break;
    case '*':
        resp._result = req._x * req._y;
        resp._code = 0;
        break;
    case '/':
    {
        if (req._y == 0)
        {
            resp._result = 0;
            resp._code = -1;
        }
        else
        {
            resp._result = req._x / req._y;
            resp._code=0;
        }
    }
    break;
    case '%':
    {
        if (req._y == 0)
        {
            resp._result = 0;
            resp._code = -1;
        }
        else
        {
            resp._result = req._x % req._y;
            resp._code=0;
        }
    }
    break;
    default:
    {
        resp._result=0;
        resp._code=-1;
    }
    break;
    }
    return resp;

}
int main(int argc, char *argv[])
{
    // 获取port
    uint16_t port = atoi(argv[1]);

    // 运行server
    std::unique_ptr<EpollServer> es(new EpollServer(Calculate, port));
    es->Init();
    es->Dispatcher();
}

epoll_server.hpp

#pragma once
#include "sock.hpp"
#include "log.hpp"
#include "error.hpp"
#include "epoll.hpp"
#include "protocol.hpp"
#include <sys/time.h>
#include <unistd.h>
#include <fcntl.h>
#include <string>
#include <cerrno>
#include <cstring>
#include <unordered_map>
#include <functional>

// 默认端口号
static const uint16_t default_port = 8080;
// IO事件信息数组长度
static const int gnum = 4096;
// IO事件信息类型
typedef epoll_event type_t;

// 回调类型
class EpollServer;
class Connect;
using callback_t = std::function<void(Connect *)>;

using func_t = function<protocol_space::response(const protocol_space::request &req)>;

// 链接信息类
class Connect
{
public:
    Connect(int fd, uint32_t events) : _fd(fd), _events(events)
    {
    }
    void Register(callback_t recv, callback_t send, callback_t exception)
    {
        _recv = recv;
        _send = send;
        _exception = exception;
    }
    ~Connect()
    {
    }

public:
    // IO事件信息
    int _fd;
    uint32_t _events;

    // 用户缓冲区
    std::string _inbuffer;
    std::string _outbuffer;

    // 客户端信息
    std::string _clientip;
    uint16_t _clientport;

    // IO处理函数
    callback_t _recv;
    callback_t _send;
    callback_t _exception;
};

class EpollServer
{
public:
    EpollServer(func_t func, uint16_t port = default_port) : _func(func), _port(port)
    {
    }

    // 初始化
    void Init()
    {
        // 创建套接字
        _sock.Socket();

        // 绑定ip和port
        _sock.Bind(_port);

        // 设置套接字监听状态
        _sock.Listen();

        // 创建epoll对象
        _epoll.EpollCreate();

        // listenfd添加到内核
        AddLinkInformation(_sock.ListenFd(), EPOLLIN | EPOLLET);
    }

    // 事件派发器
    void Dispatcher()
    {
        while (true)
        {
            int timeout = 1000;
            LoopOnce(timeout);
        }
    }

    // 循环一次
    void LoopOnce(int timeout)
    {
        int readynumber = _epoll.EpollWait(_readyevents, gnum, timeout);

        // 派发事件
        for (int i = 0; i < readynumber; i++)
        {
            int fd = _readyevents[i].data.fd;
            uint32_t event = _readyevents[i].events;

            log_space::LogMessage(log_space::INFO, "当前正在处理%d上的%s", fd, (event & EPOLLIN) ? "EPOLLIN" : "OTHER");

            // 文件错误或被挂起
            if ((event & EPOLLERR) || (event & EPOLLHUP))
            {
                log_space::LogMessage(log_space::ERROR, "fd->%d exception,error code:%d,error info:%s", errno, strerror(errno));
            }

            // 读事件就绪
            if ((event & EPOLLIN) && ConnIsExist(fd))
            {
                _connects[fd]->_recv(_connects[fd]);
            }

            // 写事件就绪
            if ((event & EPOLLOUT) && ConnIsExist(fd))
            {
                _connects[fd]->_send(_connects[fd]);
            }
        }
    }

    // 监听就绪处理
    void AcceptLink(Connect *conn)
    {
        // 获取全部链接
        do
        {
            std::string clientip;
            uint16_t clientport;
            int err = 0;

            // 获取链接
            int sockfd = _sock.Accept(clientip, clientport, err);
            if (sockfd > 0)
            {
                log_space::LogMessage(log_space::INFO, "链接%s::%d成功", clientip.c_str(), clientport);
                // 添加I事件fd到内核
                AddLinkInformation(sockfd, EPOLLIN | EPOLLET, clientip, clientport);
            }
            else
            {
                // 套接字被标记为非阻塞,并且不存在可接受的连接
                if ((err & EAGAIN) || (err & EWOULDBLOCK))
                    break;

                // 系统调用被在有效连接到达之前捕获的信号中断;
                else if (err & EINTR)
                    continue;

                // 出错
                else
                {
                    log_space::LogMessage(log_space::ERROR, "accept error,error code:%d,error info", err, strerror(err));
                    continue;
                }
            }

        } while (conn->_events & EPOLLET);
    }

    // 协议处理
    void ProtocolHandler(Connect *conn)
    {

        // 分割并处理报文
        bool quit = false;
        while (!quit)
        {
            // 提取完整报文
            std::string package;
            int len = protocol_space::GetPackage(conn->_inbuffer, package);
            if (len > 0)
            {
                // 去除报头
                protocol_space::RemoveHeader(package, len);

                // 反序列化
                protocol_space::request req;
                req.deserializetion(package);

                // 业务处理
                protocol_space::response resp = _func(req);

                // 序列化
                std::string sendstr;
                resp.serializetion(sendstr);

                // 添加报头
                protocol_space::AddHeader(sendstr);

                // 待发送报文,添加到connect缓冲区
                conn->_outbuffer += sendstr;
            }
            else
            {
                quit = true;
            }
        }
    }

    // 读数据
    bool RecvHelper(Connect *conn)
    {
        bool ret = true;
        do
        {
            // 读取所有报文
            char buffer[1024];
            int n = recv(conn->_fd, buffer, sizeof(buffer) - 1, 0);

            // 读到了数据
            if (n > 0)
            {
                buffer[n] = 0;
                conn->_inbuffer += buffer;
            }

            // 数据读到了结尾
            else if (n == 0)
            {
                log_space::LogMessage(log_space::ERROR, "client quit");
                conn->_exception(conn);
                ret = false;
                break;
            }

            // 异常情况
            else
            {
                // 接收缓冲区没有就绪
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                    break;

                // 在系统调用结束之前被信号打断
                else if (n == EINTR)
                    continue;

                // 出错
                else
                {
                    log_space::LogMessage(log_space::ERROR, "recv error,error code:%d,error info:%s", errno, strerror(errno));
                    conn->_exception(conn);
                    ret = false;
                    break;
                }
            }
        } while (conn->_events & EPOLLET);
        return ret;
    }

    // 读就绪处理
    void Recv(Connect *conn)
    {
        // 读数据
        if (RecvHelper(conn) == false)
            return;

        // 协议处理
        ProtocolHandler(conn);

        // 立即发送
        if (!conn->_outbuffer.empty())
            conn->_send(conn);
    }

    // 写就绪处理
    void Send(Connect *conn)
    {
        do
        {

            // 发送数据
            ssize_t n = send(conn->_fd, conn->_outbuffer.c_str(), conn->_outbuffer.size(), 0);

            // 发送成功
            if (n > 0)
            {
                // 删除已经发送的数据
                conn->_outbuffer.erase(0, n);

                // 发送缓冲区已清空
                if (conn->_outbuffer.empty())
                    break;
            }
            else
            {
                // 对端接收缓冲区未就绪
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    break;
                }

                // 被信号中断
                else if (errno == EINTR)
                    continue;

                // 出错
                else
                {
                    log_space::LogMessage(log_space::ERROR, "send error,error code:%d,error info:%s", errno, strerror(errno));
                    conn->_exception(conn);
                    return;
                }
            }
        } while (conn->_events & EPOLLET);

        // 没发完,加入等待队列
        if (!conn->_outbuffer.empty())
        {
            EnableWrite(conn, true);
        }

        // 从等待队列去除
        else if(conn->_events & EPOLLOUT)
        {
            EnableWrite(conn, false);
        }
    }

    // 异常处理
    void Exception(Connect *conn)
    {
        // 从内核移除IO事件
        _epoll.EpollDel(conn->_fd, conn->_events);

        // 关闭fd
        close(conn->_fd);

        // 从链接管理中移除链接信息
        _connects.erase(conn->_fd);

        // 析构链接信息节点对象空间
        delete conn;
        conn == nullptr;

        log_space::LogMessage(log_space::INFO, "exception handler success");
    }

    // 增加链接信息
    void AddLinkInformation(int fd, uint32_t event, const std::string &ip = "127.0.0.1", const uint16_t &port = default_port)
    {
        // 设置fd为非阻塞状态
        // if(event & EPOLLET)

        int fl = fcntl(fd, F_GETFL);
        if (fl < 0)
        {
            log_space::LogMessage(log_space::ERROR, "fcntl error,error code:%d,error info:%s", errno, strerror(errno));
            return;
        }
        int n = fcntl(fd, F_SETFL, fl | O_NONBLOCK);
        if (n < 0)
        {
            log_space::LogMessage(log_space::ERROR, "fcntl error,error code:%d,error info:%s", errno, strerror(errno));
            return;
        }

        // 构造链接信息类对象
        Connect *conn = new Connect(fd, event);
        conn->_clientip = ip;
        conn->_clientport = port;
        if (fd == _sock.ListenFd())
        {
            conn->_recv = std::bind(&EpollServer::AcceptLink, this, std::placeholders::_1);
            conn->_send = nullptr;
            conn->_exception = nullptr;
        }
        else
        {
            conn->_recv = std::bind(&EpollServer::Recv, this, std::placeholders::_1);
            conn->_send = std::bind(&EpollServer::Send, this, std::placeholders::_1);
            conn->_exception = std::bind(&EpollServer::Exception, this, std::placeholders::_1);
        }

        // 记录链接信息对象
        _connects.insert(std::make_pair(fd, conn));

        // 向内核添加信息
        _epoll.EpollAdd(fd, event);
    }

    // 判断fd是否存在
    bool ConnIsExist(int fd)
    {
        return _connects.find(fd) != _connects.end();
    }

    // 使能write
    void EnableWrite(Connect *conn, bool write)
    {
        conn->_events = EPOLLIN | (write ? EPOLLOUT : 0) | EPOLLET;
        _epoll.EpollMod(conn->_fd, conn->_events);

        if (conn->_events & EPOLLOUT)
        {
            log_space::LogMessage(log_space::INFO, "write enable ");
        }
        else
        {
            log_space::LogMessage(log_space::INFO, "close write enable");
        }
    }

    ~EpollServer()
    {
        close(_sock.ListenFd());
        _epoll.Close();
    }

private:
    // 端口号
    uint16_t _port;

    // 网络套接字类
    sock_space::Sock _sock;

    // 就绪链接信息缓冲区
    type_t _readyevents[gnum];

    // IO轮巡类
    Epoll _epoll;

    // 链接类
    std::unordered_map<int, Connect *> _connects;

    // 业务处理回调
    func_t _func;
};

4.reactor服务器概念

reactor服务器:基于多路转接的包含dispatcher(事件派发器),Connection(链接管理器)半同步半异步的服务器.

链接管理器:
用Connect对象管理链接.Connect对象在链接或者IO事件到来时创建,Connect对象包含链接或者IO事件的文件描述符,事件类型,客户端IP和端口号,事件所对应的回调函数,将Connect对象中的fd作为key,Connect对象作为value映射到哈希表中管理.在有链接或者IO事件到来或者关闭的时候插入删除或者修改节点.

超时管理:
给每个Connect设置time_t lastime(最后访问时间),设置初始值为当前时间time(nullptr),并设置超时时间time_t timeout;在每次链接访问即将结束的时候更新访问时间为当前时间.事件派发一次结束后检测一次,如果lasttime+timeout>time(nullptr)则证明还没有超时,否则为超时,直接将时间按照异常处理即可.

事件派发器:
用来轮巡检测多个IO或者是链接事件,包括读/写/链接获取事件,并负责链接的超时管理.reactor用epoll结构管理多个IO事件或者链接,在初始化的时候创建epoll对象,在有链接或者IO事件到来或者关闭的时候插入删除或者修改节点.将就绪的节点用epoll_events结构体数组管理,统一派发,在析构的时候释放.

5.epoll工作模式

epoll工作模式有两种:level triggered(水平触发)模式,简称LT模式和edge triggered(边缘触发)模式,简称ET模式;

LT模式:,传输层接收缓冲区或者链接就绪队列中有数据未被取走,epoll会一直发送事件就绪通知,读取数据时可以不用关心数据一次拷贝不完,因为下一次epoll还会通知,这种模式下的IO效率是比较低的,epoll每次通知相当于一次调用返回,每次返回意味着一次系统调用,时间成本比较高.但是向应用层响应的效率比较高.适用于高响应的应用场景

ET模式:事件就绪时,epoll不会像LT模式一样一直通知,而是只通知一次,这就要求程序编写时,读数据要设置循环读取,这样会引发一个问题,如果底层没有数据就绪时,读系统调用会阻塞,服务器将会停止运行,影响其他模块正常运行.所以要给文件描述符设置非阻塞状态,虽然recv和send这样的系统调用有非阻塞和阻塞选项但是accept没有这样的选项,所以还是用fcntl文件描述符控制系统调用统一设置文件描述符.这样在底层没有数据的时候就不会阻塞在读系统调用上了,但是这种情况不能将返回值为-1的情况一概而论,当错误码为EAGAIN和EWOULDBLOCK时,意思是fd被设置非阻塞,但是内核接收缓冲区没有就绪数据,说明数据读完了,要跳出循环继续其他任务.

IO高效性
一次性将内核接收缓冲区数据接收,其本质是快速应用层在快速清空内核接收缓冲区,给对端提供更大的滑动窗口或者拥塞窗口,提高数据吞吐量.虽然LT模式下也可以设置循环非阻塞读取数据,但是这在编程的角度上来说不是强制的,可能因为编写问题,数据没有一次读完,仍然不影响程序向后运行.但是ET是强制性的,不一次读完,程序会出现bug.所以ET模式IO效率>=LT模式IO效率.

六、reactor服务器模式设计

1.多线程模式设计理念

主线程和线程池为大框架,每个线程都管理一个epoll_server服务器对象,在线程池中构建一个任务队列,用于管理connect对象,主线程充当生产者,初始化listen套接字,并负责监听listen套接字,将就绪的套接字添加到任务队列中;线程池各线程充当消费者,竞争任务队列中就绪的listen套接字,拿到listen套接字,用accept获取连接,各线程自己管理自己的sockfd.互不干扰.

2.master slaver模式设计理念

以主进程和一个进程池为大框架,主进程通过管道连接各个子进程,用管道文件读端控制子进程的运行,主进程负责将初始化listen套接字和监视listen套接字,子进程负责处理就绪的套接字获取链接,当主进程listen套接字就绪,就像任意管道发送任意数据,激活任意子进程,让子进程获取链接,处理链接.


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/860301.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

开源力量再现,国产操作系统商业化的全新探索

文章目录 1. 开源运动的兴起2. 开源力量的推动3. 国产操作系统的崭露头角3.1 国产操作系统有哪些 4.国产操作系统的商业化探索5.开源力量对国产操作系统商业化的推动 操作系统作为连接硬件、中间件、数据库、应用软件的纽带&#xff0c;被认为是软件技术体系中最核心的基础软件…

【人工智能前沿弄潮】—— SAM系列:玩转SAM(Segment Anything)

玩转SAM(Segment Anything) 官网链接&#xff1a; Segment Anything | Meta AI (segment-anything.com) github链接&#xff1a; facebookresearch/segment-anything: The repository provides code for running inference with the SegmentAnything Model (SAM), links fo…

vue实现5*5宫格当鼠标滑过选中的正方形背景颜色统一变色

vue实现5*5宫格当鼠标滑过选中的正方形背景颜色统一变色 1、实现的效果 2、完整代码展示 <template><div id"app" mouseleave"handleMouseLeave({row: 0, col: 0 })"><div v-for"rowItem in squareNumber" :key"rowItem…

Redis的简介,安装(Linux、Windows),配置文件的修改---详细介绍

Redis基础 Redis是一个基于内存的key-value结构数据库。 基于内存存储&#xff0c;读写性能高适合存储热点数据&#xff08;热点商品、资讯、新闻)企业应用广泛 1、Redis入门 1.1、Redis简介 The open source, in-memory data store used by millions of developers as a …

C++——关于命名空间

写c项目时&#xff0c;大家常用到的一句话就是&#xff1a; using namespace std; 怎么具体解析这句话呢&#xff1f; 命名冲突&#xff1a; 在c语言中&#xff0c;我们有变量的命名规范&#xff0c;如果一个变量名或者函数名和某个库里面自带的库函数或者某个关键字重名&…

电脑麦克风没声音?

这3招就可以解决&#xff01; 在我们使用电脑录制视频时&#xff0c;有时会遇到一个令人头疼的问题&#xff1a;麦克风没有声音。那么&#xff0c;为什么会出现这种情况呢&#xff1f;更重要的是&#xff0c;我们应该如何解决这个问题呢&#xff1f;本文将介绍3种方法&#xf…

MySQL 中的 Hash 索引

Hash 本身是一个函数&#xff0c;又被称为散列函数&#xff0c;它可以帮助我们大幅提升检索数据的效率。打个比方&#xff0c;Hash 就好像一个智能前台&#xff0c;你只要告诉它想要查找的人的姓名&#xff0c;它就会告诉你那个人坐在哪个位置&#xff0c;只需要一次交互就可以…

【Cocos Creator 项目实战 】消灭星星加强版(附带完整源码工程)

本文乃Siliphen原创&#xff0c;转载请注明出处 目录 概述 游戏整体流程 游戏框架设计 单一职责的类 主要流程控制类 核心玩法模块 UI&#xff1a; 游戏世界&#xff1a; 本文项目的代码组织结构 作者项目实践总结 场景只有一个入口脚本 尽量少在节点上挂载脚本 构…

四级以内的单词

单词 第一单元 excuse&#xff0c;me&#xff0c;yes&#xff0c;is&#xff0c;this&#xff0c;your&#xff0c;handbag&#xff0c;pardon&#xff0c;it&#xff0c;thank&#xff0c;you&#xff0c;very&#xff0c;much&#xff0c;pen&#xff0c;pencil&#xff0c…

模拟量电流电压采集软件使用教程

一.启动模拟量采集系统。打开软件需要登录用户 二.主界面是采集监控界面&#xff0c;每组采集柜设置采集不同产品和参数&#xff0c;选中产品判断设备连接状态和设置输出产品电压。 三.扫描每组柜的产品电流电压数据&#xff0c;判断每组柜哪些通道放了采集产品&#xff0c;扫描…

【云原生】K8S集群

目录 一、调度约束1.1 POT的创建过程1.1调度过程 二、指定节点调度2.1 通过标签选择节点 三、亲和性3.1requiredDuringSchedulingIgnoredDuringExecution&#xff1a;硬策略3.1 preferredDuringSchedulingIgnoredDuringExecution&#xff1a;软策略3.3Pod亲和性与反亲和性3.4使…

分支和循环语句(2)(C语言)

目录 do...while()循环 do语句的语法 do语句的特点 do while循环中的break和continue 练习 goto语句 do...while()循环 do语句的语法 do 循环语句; while(表达式); do语句的特点 循环至少执行一次&#xff0c;使用的场景有限&#xff0c;所以不是经常使用。 #inc…

C#导入数据使用Task异步处理耗时任务

C#多线程中&#xff0c;我们可以使用async和await来异步处理耗时任务。 现在我们打开一个Excel表格&#xff0c;将Excel表格的每一行数据进行处理&#xff0c;并存储到数据库中 新建Windows应用程序DataImportDemo&#xff0c;.net framework 4.6.1 将默认的Form1重命名为Fo…

js防止F12扒数据

添加 js 代码防止F12扒数据 ((function() {var callbacks [],timeLimit 50,open false;setInterval(loop, 1);return {addListener: function(fn) {callbacks.push(fn);},cancleListenr: function(fn) {callbacks callbacks.filter(function(v) {return v ! fn;});}}funct…

面试八股文Mysql:(1)事务实现的原理

1. 什么是事务 事务就是一组数据库操作&#xff0c;这些操作是一个atomic&#xff08;原子性的操作&#xff09; &#xff0c;不可分割&#xff0c;要么都执行&#xff0c;要么回滚&#xff08;rollback&#xff09;都不执行。这样就避免了某个操作成功某个操作失败&#xff0…

藏语翻译器:藏语翻译小助手

这是一款翻译功能齐全的翻译软件&#xff0c;主打藏语翻译功能&#xff0c;同时具备文字翻译、图片翻译、音频翻译、视频翻译、文档翻译等热门功能&#xff0c;支持将翻译结果导出为可编辑的文本文档&#xff0c;方便后续编辑整理。支持朗读原文和译文&#xff0c;帮助我们学习…

LeetCode209. 长度最小的子数组

题目&#xff1a;LeetCode209. 长度最小的子数组 描述&#xff1a; 给定一个含有 n 个正整数的数组和一个正整数 target 。 找出该数组中满足其和 ≥ target 的长度最小的 连续子数组 [numsl, numsl1, …, numsr-1, numsr] &#xff0c;并返回其长度。如果不存在符合条件的子…

Grafana Prometheus 通过JMX监控kafka

第三方kafka exporter方案 目前网上关于使用Prometheus 监控kafka的大部分资料都是使用一个第三方的 kafka exporter&#xff0c;他的原理大概就是启动一个kafka客户端&#xff0c;获取kafka服务器的信息&#xff0c;然后提供一些metric接口供Prometheus使用&#xff0c;随意它…

【React学习】—函数式组件(四)

【React学习】—函数式组件&#xff08;四&#xff09; <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0" /><ti…

17.电话号码的字母组合(回溯)

目录 一、题目 二、代码 一、题目 17. 电话号码的字母组合 - 力扣&#xff08;LeetCode&#xff09; 二、代码 class Solution {const char*data[10]{"","","abc","def","ghi","jkl","mno","pq…