I/O多路转接——epoll服务器代码编写

news2024/11/25 2:13:18

目录

一、poll​

二、epoll

1.epoll

2.epoll的函数接口

①epoll_create

②epoll_ctl

③epoll_wait

3.操作原理

三、epoll服务器编写

1.日志打印

2.TCP服务器

3.Epoll

①雏形

②InitEpollServer 与 RunServer

③HandlerEvent

四、Epoll的工作模式

1.LT模式与ET模式

2.基于LT模式的epoll服务器

①整体框架

②处理BUG

③优化结构            

④异常处理

④序列化与反序列化

⑤优化

⑥Reactor模式与Proactor模式


一、poll

        该函数的作用也是如同select一样在IO中负责等,但是它不用每次将参数重设,并且没有上限。

        这需要参数的帮助。

        第一个参数的类型是一个指针,其中存放的类型是一个结构体。

        fd为文件描述符,events是负责告诉内核关注什么事件,revents是负责告诉用户关注的事件是否就绪。替代了select中输入输出参数为同一个的问题。我们还可以将参数稍作调整更为直观。

int poll(struct pollfd fds[],nfds_t nfds,int timeout);  

        我们要关注读事件写事件怎么传呢?

        这些都是宏,我们想关注多个事件可以将他们对应的宏按位与在一起,再传入。  

        第二个参数的类型是nfds_t,实则是long int。

        该参数传递的是前一个参数作为数组中的元素个数。

        第三个参数,与select中的timeout不同的是 ,这里的timeout的参数不是结构体,直接表示的是微秒。

        关于poll的服务器编写就不多阐述了,我们将上篇文章的select稍微修改下,就能使用。大家可以移步先去观看上篇文章。

代码:

#include <iostream>
#include <poll.h>
#include "Sock.hpp"

using namespace std;
#define NUM 1024
struct pollfd fdsArray[NUM]; // 辅助数组 里面存放历史文件描述符
#define DEFAUIT -1           // 默认

void Usage(string process)
{
    cout << "Please entry" << process << " port" << endl;
}

static void ShowArray()
{
    cout << "当前的文件描述符为: ";
    for (int i = 0; i < NUM; i++)
    {
        if (fdsArray[i].fd == DEFAUIT)
            continue;
        cout << fdsArray[i].fd << ' ';
    }
    cout << endl;
}

static void HandlerEvent(int listensock)
{
    for (int j = 0; j < NUM; j++)
    {
        if (fdsArray[j].fd == DEFAUIT)
            continue;
        if (j == 0 && fdsArray[j].fd == listensock)
        {

            if (fdsArray[j].revents & POLLIN)
            {
                cout << "新连接到来,需要处理" << endl;

                string clientip;
                uint16_t clientport = 0;
                int sock = Sock::Accept(listensock, &clientip, &clientport); // 这里不会阻塞
                if (sock < 0)
                    return;
                cout << "获取新连接成功 " << clientip << ":" << clientport << " Sock:" << sock << endl;
                
                int i = 0;
                for (; i < NUM; i++)
                {
                    if (fdsArray[i].fd == DEFAUIT)
                        break;
                }
                if (i == NUM)
                {
                    cerr << "服务器已经到达了上限" << endl;
                    close(sock);
                }
                else
                {
                    // 将文件描述符放入fdsArray中
                    fdsArray[i].fd = sock;
                    fdsArray[i].events = POLLIN;
                    fdsArray[i].revents = 0;
                    // debug
                    ShowArray();
                }
            }
        }
        else
        {
            // 处理其他的文件描述符的IO事件
            if (fdsArray[j].revents & POLLIN)
            {
                char buffer[1024];
                ssize_t s = recv(fdsArray[j].fd, buffer, sizeof(buffer), 0);
                // 这里的阻塞读取真的会阻塞住吗?并不会,因为走到这里select已经帮我们等了,并且此时事件就绪。
                if (s > 0)
                {
                    buffer[s] = 0;
                    cout << "client[" << fdsArray[j].fd << "]"
                         << " # " << buffer << endl;
                }
                else if (s == 0)
                {
                    cout << "client[" << fdsArray[j].fd << "] "
                         << "quit"
                         << " server will close " << fdsArray[j].fd << endl;
                    fdsArray[j].fd = DEFAUIT; // 恢复默认
                    fdsArray[j].events = 0;
                    fdsArray[j].revents = 0;
                    close(fdsArray[j].fd);    // 关闭sock
                    ShowArray();           // debug
                }
                else
                {
                    cerr << "recv error" << endl;
                    fdsArray[j].fd = DEFAUIT; // 恢复默认
                    fdsArray[j].events = 0;
                    fdsArray[j].revents = 0;
                    close(fdsArray[j].fd);    // 关闭sock
                    ShowArray();           // debug
                }
            }
        }
    }
}

int main(int argc, char **argv)
{
    if (argc != 2)
    {
        Usage(argv[0]);
        exit(1);
    }
    int listensocket = Sock::Socket();
    Sock::Bind(listensocket, atoi(argv[1]));
    Sock::Listen(listensocket);

    for (int i = 0; i < NUM; i++)
    {
        fdsArray[i].fd = DEFAUIT;
        fdsArray[i].events = 0;
        fdsArray[i].revents = 0;
    }
    fdsArray[0].fd = listensocket; // 默认fdsArray第一个元素存放
    fdsArray[0].events = POLLIN;
    int timeout = 100000;
    while (1)
    {
        int n = poll(fdsArray, NUM, timeout);
        switch (n)
        {
        case 0:
            // timeout
            cout << "timeout ... : " << (unsigned int)time(nullptr) << endl;
            break;
            // error
            cout << "select error : " << strerror(errno) << endl;
        case -1:
            break;
        default:
            // 等待成功
            HandlerEvent(listensocket);
            break;
        }
    }
}

现象:

二、epoll

1.epoll

        按照man手册中说法为:是为处理大批量句柄而做了改进的poll。

        epoll被公认为性能最好的多路I/O就绪通知方法。

 2.epoll的函数接口

①epoll_create

        该函数的目的是创建一个epoll句柄。

        自从linux2.6.8之后,size参数是被忽略的。

        返回值:成功时返回一个文件描述符,失败时返回-1。

②epoll_ctl

        该函数的作用是为对epoll句柄中添加特点的文件描述符对应的事件。也就是用户告诉内核要关注哪些事件。

        epfd:传入epoll_create的返回值。

        op:选项,有EPOLL_CTL_ADD、 EPOLL_CTL_MOD、EPOLL_CTL_DEL。

        fd:文件描述符。

        event:对应的事件。

        其中的结构体为:

③epoll_wait

        作用是等待文件描述符对应的事件是否就绪。

        epfd:传入epoll_create的返回值。

        events:对应的事件。

        maxevents:当前关注的文件描述符的最大值。

        timeout:等待时间。

3.操作原理

        了解了上文这么多函数,其实仅仅是看并没有真正理解到epoll是怎么工作的,下面来讲讲操作原理。

        操作系统如何得知网络中的数据到来了?

        网卡中得到数据,会向CPU发送硬件中断,调用OS预设的中断函数,负责从外设进行数据拷贝,从外设拷贝内核缓冲区。

        epoll_create创建的epoll句柄是什么?

        epoll句柄可以理解为epoll模型。

        epoll_create会创建一个空的红黑树,一个就绪队列,创建对应的回调函数。

        这里的红黑树中的节点存放着要关注的文件描述符和事件等信息,属于用户告诉内核。这里的树等价于当初写poll、select维护的数组。

         就绪队列存放着已经就绪的事件。属于内核告诉用户。        回调函数是在当数据到来发生硬件中断,os调用中断函数中拷贝之后使用。

         该回调函数会依据就绪的数据,获取到对应的文件描述符和对应的事件,依据这两个内容构建一个fd_queue节点,插入到就绪队列中。

        这三个合起来为epoll模型。

        这个epoll模型是调用epoll_create会创建的,但是为什么返回值是一个文件描述符呢?

        我们知道文件描述符其实就是数组的下标,该数组存放着指向的管理文件的结构体的指针。具体的大家可以看我这篇文章。

        struct file中就存放着epoll的数据结构。

        由文件描述符找到文件的结构体,就可以找到红黑树,以及就绪队列等关于epoll的数据。        

        epoll_ctl则是来维护这个红黑树的,负责增加节点删除节点,也就是维护用户告诉内核信息的函数。

        epoll_wait则是来从内核中的就绪队列拿数据,有数据则证明有事件就绪了,以前poll需要便利数组去查看是否有文件描述符对应的事件就绪,现在只用通过检查就绪队列是否有数据就知道是否有文件描述符对应的事件就绪。时间复杂度为O(1)。

       

三、epoll服务器编写

1.日志打印

Log.hpp:
#include <cstdio>
#include <cstdarg>
#include <cassert>
#include <stdlib.h>
#include <time.h>

#define DEBUG 0
#define NOTICE 1
#define WARNING 2
#define FATAL 3

const char *log_level[] = {"DEBUG", "NOTICE", "WARNING", "FATAL"};

void logMessage(int level, const char *format, ...)
{
    assert(level >= DEBUG);
    assert(level <= FATAL);
    char logInfor[1024];
    char *name = getenv("USER");
    va_list ap;
    va_start(ap, format);

    vsnprintf(logInfor, sizeof(logInfor) - 1, format, ap);

    va_end(ap);
    
    FILE * out = (level == FATAL) ? stderr : stdout;

    fprintf(out,"%s | %u | %s | %s\n",\
        log_level[level],\
        (unsigned int)time(nullptr),\
        name == nullptr ? "Unkown" : name,\
        logInfor
    );
}

2.TCP服务器

Sock.hpp:
#pragma once

#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <cstdio>
#include <cstring>
#include <signal.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <pthread.h>
#include <cerrno>
#include <cassert>

class Sock
{
public:
    static const int gbacklog = 20;

    static int Socket()
    {
        int listenSock = socket(PF_INET, SOCK_STREAM, 0);
        if (listenSock < 0)
        {
            exit(1);
        }
        int opt = 1;
        setsockopt(listenSock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
        return listenSock;
    }
    static void Bind(int socket, uint16_t port)
    {
        struct sockaddr_in local; // 用户栈
        memset(&local, 0, sizeof local);
        local.sin_family = PF_INET;
        local.sin_port = htons(port);
        local.sin_addr.s_addr = INADDR_ANY;

        // 2.2 本地socket信息,写入sock_对应的内核区域
        if (bind(socket, (const struct sockaddr *)&local, sizeof local) < 0)
        {
            exit(2);
        }
    }
    static void Listen(int socket)
    {
        if (listen(socket, gbacklog) < 0)
        {
            exit(3);
        }
    }

    static int Accept(int socket, std::string *clientip, uint16_t *clientport)
    {
        struct sockaddr_in peer;
        socklen_t len = sizeof(peer);

        int serviceSock = accept(socket, (struct sockaddr *)&peer, &len);
        if (serviceSock < 0)
        {
            // 获取链接失败
            return -1;
        }
        if (clientport)
            *clientport = ntohs(peer.sin_port);
        if (clientip)
            *clientip = inet_ntoa(peer.sin_addr);
        return serviceSock;
    }
};

3.Epoll

①雏形

EpollServer.hpp:
#pragma once

#include <iostream>
#include <string>
#include <cstdlib>
#include "log.hpp"
using namespace std;

class EpollServer
{
public:
    EpollServer(uint16_t port)
        : port_(port), listensock_(-1), epfd_(-1)
    {
    }
    void InitEpollServer()
    {

    }
    void RunServer()
    {

    }

    ~EpollServer()
    {
        if(listensock_ != -1) close(listensock_);
        if(epfd_ != -1) close(epfd_);
    }
private:
    int listensock_;
    int epfd_;
    uint16_t port_;
};
epoll.cc:
#include "EpollServer.hpp"
#include "Sock.hpp"
#include "Log.hpp"
#include <memory>
void Usage(string process)
{
    cout << "Please entry" << process << " port" << endl;
}

int main(int argv, char **argc)
{
    if (argv != 2)
    {
        Usage(argc[0]);
        exit(1);
    }   
    unique_ptr<EpollServer> epoll(new EpollServer(atoi(argc[1])));
    epoll->InitEpollServer();
    epoll->RunServer();

    return 0;
}

②InitEpollServer 与 RunServer

代码:

    void InitEpollServer()
    {
        listensock_ = Sock::Socket();
        Sock::Bind(listensock_, port_);
        Sock::Listen(listensock_);

        epfd_ = epoll_create(gsize);
        if (epfd_ < 0)
        {
            logMessage(FATAL, "%d:%s", errno, strerror(errno));
            exit(1);
        }
        logMessage(DEBUG, "epoll_creatr success,epoll模型创建成功,epfd: %d", epfd_);
    }
    void RunServer()
    {
        // 1.先添加listensock_到epoll模型中
        struct epoll_event ev;
        ev.events = EPOLLIN;
        ev.data.fd = listensock_;
        int a = epoll_ctl(epfd_, EPOLL_CTL_ADD, listensock_, &ev);
        if (a != 0)
        {
            logMessage(FATAL, "%d:%s", errno, strerror(errno));
            exit(1);
        }
        struct epoll_event revs[num];
        int timeout = 10000;
        while (1)
        {
            int n = epoll_wait(epfd_, revs, num, timeout);
            switch (n)
            {
            case 0:
                // timeout
                cout << "timeout ... : " << (unsigned int)time(nullptr) << endl;
                break;
                // error
                cout << "epoll_wait error : " << strerror(errno) << endl;
            case -1:
                break;
            default:
                // 等待成功
                cout<<"event 到来"<<endl;
                break;
            }
        }
    }

现象:

③HandlerEvent

        下面开始处理就绪事件。首先,我们定义一个成员变量它的类型是function<int(int)>,并初始化,

         我们将自己想要对文件描述符处理的函数传入在类的实例化的过程中,并在HandlerEvent函数中调用该函数。

int myfuc(int sock)
{
    // 这里bug,TCP基于流式,如何保证一次将数据读取完毕,一会解决
    char buff[1024];
    int sz = recv(sock, buff, sizeof buff -1, 0);
    if (sz > 0)
    {
        buff[sz] = 0;
        logMessage(DEBUG, "client[%d]:%s", sock, buff);
        return sz;
    }
    return sz;
}

int main(int argv, char **argc)
{
    if (argv != 2)
    {
        Usage(argc[0]);
        exit(1);
    }
    unique_ptr<EpollServer> epoll(new EpollServer(atoi(argc[1]), myfuc));
    epoll->InitEpollServer();
    epoll->RunServer();
    return 0;
}

        

    void HandlerEvent(struct epoll_event *revs, int n)
    {
        for (int i = 0; i < n; i++)
        {
            int sock = revs[i].data.fd;
            uint32_t revent = revs[i].events;
            if (revent & EPOLLIN)
            {
                // IO
                if (sock == listensock_)
                {
                    // 监听套接字
                    string clientip;
                    uint16_t clientport = 0;
                    int iosock = Sock::Accept(listensock_, &clientip, &clientport);
                    if (iosock < 0)
                    {
                        logMessage(FATAL, "Sock error , errno : %d :%s", errno, strerror(errno));
                        continue;
                    }
                    // 托管给epoll
                    struct epoll_event ev;
                    ev.data.fd = iosock;
                    ev.events = EPOLLIN;
                    int a = epoll_ctl(epfd_, EPOLL_CTL_ADD, iosock, &ev);
                    assert(a == 0);
                    (void)a;
                }
                else
                {
                    // 其他套接字
                    int n = fuc_(sock);
                    if (n == 0 || n < 0)
                    {
                        int x = epoll_ctl(epfd_, EPOLL_CTL_DEL, sock, nullptr);
                        assert(x == 0);
                        (void)x;
                        close(sock);
                        logMessage(DEBUG,"clinet[%d] exit",sock);
                    }
                }
            }
            else
            {
                // epollout 后续处理
            }
        }
    }

 现象:

四、Epoll的工作模式

1.LT模式与ET模式

        LT(level trigger)水平触发,ET(edge trigger)边缘触发。

        LT模式通俗来讲,只要底层有数据,就一直通知上层。

        ET模式,只要底层有数据且是从无到有,从有到多,发生变化时才会通知上层。

        例如:你对妈妈说今天午饭LT模式,当妈妈将饭做好,等你吃饭时,并且一直在喊你,宝贝儿子快来吃饭,直到你去吃饭。这次你对妈妈说今天午饭ET模式,当妈妈将饭做好,饭从无到有,此时妈妈叫你吃饭,你没有去,此后饭的量恒定没有变化,妈妈就再也不会通知你。

        这两个模式哪一个更为高效呢?ET模式更为高效。以TCP通信来讲,并没有多少数据的到来,底层却一直在提醒上层,多数的提醒的都是在提醒同一个数据就绪。

        LT模式,只要有数据就会提醒,我们可以先去处理其他业务,等某次提醒时再处理底层的数据。但ET模式,数据到来之后没有变化,就再也不会提醒你,所以只能在提醒时就处理数据,要不后面再也没有提醒,这就倒逼程序员一次将数据读完。

        如何知道底层数据已经读完了?只有不断的去调用recv、read函数,如果报错了证明数据已经读完了。数据已经读完了,但最后一次,没数据了,read却阻塞住了。

        所以在ET模式下,所有的文件描述符都要设置成非阻塞。

        epoll默认就是LT模式。

2.基于LT模式的epoll服务器

①整体框架

        整体代码已经上传到gitee上,配合整体代码观看,更加直观便捷。

        下面我们基于上文的epoll服务器,但不同与上文,大家请往下看。        

        首先先建立一个类,该类将文件描述符和回调方法结合。

Tcpserver.hpp:
class Connection;
using fuc_t = function<int(Connection *)>;

class Connection
{

public:
    // 文件描述符
    int _sock;

    Tcpserver *_ptr;
    //  自己的接受和发送缓冲区
    string _inbuff;
    string _outbuff;

    // 回调函数
    fuc_t _readfuc;
    fuc_t _writefuc;
    fuc_t _exceptfuc;

public:
    Connection(int sock, Tcpserver *ptr) : _sock(sock), _ptr(ptr)
    {
    }
    ~Connection()
    {
    }
    void SetReadfuc(fuc_t fuc)
    {
        _readfuc = fuc;
    }
    void SetWritefuc(fuc_t fuc)
    {
        _writefuc = fuc;
    }
    void SetExceptfuc(fuc_t fuc)
    {
        _exceptfuc = fuc;
    }
};

        

Tcpserver.hpp:
class Tcpserver
{
public:
    Tcpserver(int port)
    {

        // 网络
        _listensock = Sock::Socket();
        Sock::Bind(_listensock, port);
        Sock::Listen(_listensock);

        // epoll
        _epfd = Epoller::CreateEpoll();

        // add事件
        Epoller::Addevent(_epfd, _listensock, EPOLLIN | EPOLLET);

        // 将listensock匹配的connection方法添加到unordered_map中
        auto iter = new Connection(_listensock, this);
        iter->SetReadfuc(std::bind(&Tcpserver::Accepter, this, std::placeholders::_1));
        _conn.insert({_listensock, iter});

        // 初始化就绪队列
        _revs = new struct epoll_event[_revs_num];
    }
    int Accepter(Connection *conn)
    {
        string clientip;
        uint16_t clientport;
        int sockfd = Sock::Accept(conn->_sock, &clientip, &clientport);
        if (sockfd < 0)
        {
            logMessage(FATAL, "accept error");
            return -1;
        }
        logMessage(DEBUG, "Get a new connect : %d", sockfd);
        AddConn(sockfd, EPOLLIN | EPOLLET);
        return 0;
    }
    bool SockinConn(int sock)
    {
        auto iter = _conn.find(sock);
        if (iter == _conn.end())
        {
            return false;
        }
        else
        {
            return true;
        }
    }

    void AddConn(int sock, uint32_t event)
    {
        // 将文件描述符加入epoll模型中
        Epoller::Addevent(_epfd, sock, event);
        // 将文件描述符匹配的connection,也加入map中
        _conn.insert({sock, new Connection(sock, this)});
        logMessage(DEBUG, "将文件描述符匹配的connection加入map成功");

    }

    void Dispatcher()
    {
        // 获取就绪事件
        int n = Epoller::GetReadyFd(_epfd, _revs, _revs_num);
        // logMessage(DEBUG, "GetReadyFd,epoll_wait");
        // 事件派发
        for (int i = 0; i < n; i++)
        {
            int sock = _revs[i].data.fd;
            uint32_t revent = _revs[i].events;
            if (EPOLLIN & revent)
            {
                // 先判空
                if (SockinConn(sock) && _conn[sock]->_readfuc)
                {
                    // 该文件描述符对应的读方法
                    _conn[sock]->_readfuc(_conn[sock]);
                }
            }
            if (EPOLLOUT & revent)
            {
                // 先判空
                if (SockinConn(sock) && _conn[sock]->_writefuc)
                {
                    // 该文件描述符对应的写方法
                    _conn[sock]->_writefuc(_conn[sock]);
                }
            }
        }
    }

    void Run()
    {
        while (1)
        {
            Dispatcher();
        }
    }
    ~Tcpserver()
    {
        if (_listensock != -1)
            close(_listensock);
        if (_epfd != -1)
            close(_epfd);
        delete[] _revs;
    }

private:
    // 1.网络sock
    int _listensock;
    // 2.epoll
    int _epfd;
    // 3.将epoll与上层代码结合
    unordered_map<int, Connection *> _conn;
    // 4.就绪事件列表
    struct epoll_event *_revs;
    // 5.就绪事件列表大小
    static const int _revs_num = 64;
};
Epoller.hpp:
#pragma once

#include <iostream>
#include <string>
#include <cstdlib>
#include <cstring>
#include <sys/epoll.h>
#include "Log.hpp"
using namespace std;

class Epoller
{
public:
    static int CreateEpoll()
    {
        int size = 128;
        int epfd = epoll_create(size);
        if (epfd < 0)
        {
            logMessage(FATAL, "%d:%s", errno, strerror(errno));
            exit(1);
        }
        return epfd;
    }
    static bool Addevent(int epfd, int sock, uint32_t event)
    {
        struct epoll_event ev;
        ev.data.fd = sock;
        ev.events = event;
        int n = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev);
        if (n != 0)
        {
            logMessage(FATAL, "%d:%s", errno, strerror(errno));
            return false;
        }
        return true;
    }
    static int GetReadyFd(int epfd, struct epoll_event evs[], int num)
    {
        // 阻塞式
        int n = epoll_wait(epfd, evs, num, -1);
        if (n == -1)
        {
            logMessage(FATAL, "%d:%s", errno, strerror(errno));
        }
        return n;
    }
};

       

Epoll.cc:
#include "Tcpserver.hpp"
#include "Sock.hpp"
#include <memory>

using namespace std;


void Usage(string process)
{
    cout << "Please entry" << process << " port" << endl;
}



int main(int argv, char **argc)
{
    if (argv != 2)
    {
        Usage(argc[0]);
        exit(1);
    }
    unique_ptr<Tcpserver> ep(new Tcpserver(atoi(argc[1])));
    ep->Run();
    return 0;
}

现象: 

        当然还没有写完。

        结合整体代码看:①通过Tcpserver的构造函数,先创建网络套接字,建立epoll模型②监听listen套接字,并为listen设置对应的connect类设置读方法③将listen套接字添加到epoll模型中④通过epoll_wait等待事件就绪⑤当listen套接字就绪时,调用回调函数其中accept来获取新连接的到来。

        同时,因为我们今天写的是ET模式,所以要将文件描述符设置为非阻塞式,即使用fcntl。

        目前代码只写到了这里,想纵观全貌更加的详细请看我的gitee。

        下面继续增加内容,新的连接到来,要为新的连接增加对应的方法。

        并且要进行序列化与反序列化,因为我们的服务器是基于TCP的流式读取,每次读取我们确保不了读上来的数据是完整的数据,所以要做处理。

        当前的处理为我们读上的数据如同:112233X1213Xadasd。‘X’作为分隔符,我们需要读上来的数据,将数据进行分割。

        同样建立一个回调方法,在read之后对数据进行分割通过分割,在此之前对它进行初始化。

using callbcak_t = function<int(Connection *, string &)>;

int HandlerPro(Connection *conn, string &message)
{
    // 我们能保证走到这里一定是完整的报文,已经解码
    // 接下来是反序列化
    cout << "获取request : " << message <<"剩余的信息是"<<conn->_inbuff<<endl;
}

int main(int argv, char **argc)
{
    if (argv != 2)
    {
        Usage(argc[0]);
        exit(1);
    }
    unique_ptr<Tcpserver> ep(new Tcpserver(HandlerPro, atoi(argc[1])));
    ep->Run();
    return 0;
}

    int TcpRecver(Connection *conn)
    {
        // 对普通套接字读取
        while (true)
        {
            char buff[1024];
            ssize_t sz = recv(conn->_sock, buff, sizeof(buff) - 1, 0);
            if (sz > 0)
            {
                buff[sz] = '\0';
                conn->_inbuff += buff;
            }
            else if (sz == 0)
            {
                logMessage(DEBUG, "client quit");
            }
            else if (sz < 0)
            {
                if (errno == EINTR)
                {
                    // 因为信号导致IO关闭,但数据还没有读完
                    continue;
                }
                else if (errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    // 读完了
                    break;
                }
                else
                {
                    // 读取出错
                }
            }
        }
        // 本轮读取完毕
        // 将读取上来的 如:xxxxx/3xxxxx/3xxx/3
        // 分为 xxxxx 、xxxxx、xxx
        vector<string> result;
        PackageSplit(conn->_inbuff, &result);
        for (auto &message : result)
        {
            _cb(conn, message);
        }

        return 0;
    }

现象:

②处理BUG

        我们要解决Accept函数的BUG,因为我们当前是ET模式,如果当前有大量的连接来,系统只会通知上层一次,而只进行一次调用显然是不对的,会导致读不上其他到来的链接。所以此时我们要进行循环读取,那循环读取时什么时候停止呢?要根据Accept函数中的accept函数调用失败时设置的errno来判别。我们来看下究竟errno会被设置成什么。

        EAGAIN和EWOULDBLOCK,意思为,当前的文件描述符被置为非阻塞的且当前没有可接受的连接。意味着已经将当前到来的连接读完了。

        EINTR表示当前的系统调用被一个捕捉到的信号中断在一个有效的连接到来之前。

        这三个宏值得我们关注,剩下的宏都是表明accept出错了,所以我们可以这样修改函数。

    int Accepter(Connection *conn)
    {
        while (1)
        {
            string clientip;
            uint16_t clientport;
            int sockfd = Sock::Accept(conn->_sock, &clientip, &clientport);
            if (sockfd < 0)
            {
                if (errno == EINTR) // 被信号中断
                    continue;
                else if (errno == EAGAIN || errno == EWOULDBLOCK) // 读取结束
                    break;
                else
                {
                    // 出错
                    logMessage(FATAL, "accept error");
                    return -1;
                }
            }
            logMessage(DEBUG, "Get a new connect : %d", sockfd);
            AddConn(sockfd, EPOLLIN | EPOLLET);
        }
        return 0;
    }

        

③优化结构                 

        我们会发现被标注的代码,与下文的AddConn函数功能具有相似性,为了避免耦合性更高,我们将这段代码移到AddConn函数中。

    void AddConn(int sock, uint32_t event, fuc_t readfuc, fuc_t writefuc, fuc_t exceptfuc)
    {
        if (event & EPOLLET)
            Util::SetNonBlock(sock);

        // 将文件描述符加入epoll模型中
        Epoller::Addevent(_epfd, sock, event);
        // 将文件描述符匹配的connection,也加入map中
        Connection *conn = new Connection(sock, this);

        conn->SetReadfuc(readfuc);
        conn->SetWritefuc(writefuc);
        conn->SetExceptfuc(exceptfuc);

        // conn->SetReadfuc(std::bind(&Tcpserver::TcpRecver, this, std::placeholders::_1));
        // conn->SetWritefuc(std::bind(&Tcpserver::TcpSender, this, std::placeholders::_1));
        // conn->SetExceptfuc(std::bind(&Tcpserver::TcpExcepter, this, std::placeholders::_1));

        _conn.insert({sock, conn});
        logMessage(DEBUG, "将文件描述符匹配的connection加入map成功");
    }

         改了AddConn函数,还需对其他涉及到此函数的地方大动干戈。

    Tcpserver(callbcak_t cb, int port) : _cb(cb)
    {

        // 网络
        _listensock = Sock::Socket();
        Util::SetNonBlock(_listensock);
        Sock::Bind(_listensock, port);
        Sock::Listen(_listensock);

        // epoll
        _epfd = Epoller::CreateEpoll();

        //  添加listen事件
        AddConn(_listensock, EPOLLIN | EPOLLET,
                std::bind(&Tcpserver::Accepter, this, std::placeholders::_1), nullptr, nullptr);

        // // add事件
        // Epoller::Addevent(_epfd, _listensock, EPOLLIN | EPOLLET);

        // // 将listensock匹配的connection方法添加到unordered_map中
        // auto iter = new Connection(_listensock, this);
        // iter->SetReadfuc(std::bind(&Tcpserver::Accepter, this, std::placeholders::_1));
        // _conn.insert({_listensock, iter});

        // 初始化就绪队列
        _revs = new struct epoll_event[_revs_num];
    }
    int Accepter(Connection *conn)
    {
        while (1)
        {
            string clientip;
            uint16_t clientport;
            int sockfd = Sock::Accept(conn->_sock, &clientip, &clientport);
            if (sockfd < 0)
            {
                if (errno == EINTR) // 被信号中断
                    continue;
                else if (errno == EAGAIN || errno == EWOULDBLOCK) // 读取结束
                    break;
                else
                {
                    // 出错
                    logMessage(FATAL, "accept error");
                    return -1;
                }
            }
            logMessage(DEBUG, "Get a new connect : %d", sockfd);
            AddConn(sockfd, EPOLLIN | EPOLLET,
                    std::bind(&Tcpserver::TcpRecver, this, std::placeholders::_1),
                    std::bind(&Tcpserver::TcpSender, this, std::placeholders::_1),
                    std::bind(&Tcpserver::TcpExcepter, this, std::placeholders::_1));
        }
        return 0;
    }

④异常处理

        就绪的文件描述符挂断了,就绪的文件描述符出错了怎么办。我们统一起来转为文件描述符

读事件或写事件就绪,必定在读时或写时出错,我们转而去那时处理异常。

 

         接下来,我们呢要对异常处理函数编写。

代码:

    int TcpExcepter(Connection *conn)
    {
        // 处理普通套接字异常
        // 0.检测
        if (!SockinConn(conn->_sock))
            return -1;

        // 1.移除事件
        Epoller::DelEvent(_epfd, conn->_sock);
        logMessage(DEBUG, "remove epoll event");

        // 2.关闭文件描述符
        close(conn->_sock);
        logMessage(DEBUG, "close fd :%d", conn->_sock);

        // 3.删除map中的sock对应的conn
        // delete conn;
        delete _conn[conn->_sock];
        logMessage(DEBUG, "delete conn object success");

        // 4.去掉sock和conn的映射关系 上一步只是delete掉了对象,但是映射关系还在
        _conn.erase(conn->_sock);
        logMessage(DEBUG, "erase conn from map success");
    }

现象:

④序列化与反序列化

        何为序列化?何为反序列化?

        我们在网络传输时以字符串形式发送,传输时以二进制形式。那我想要给对方发送结构体怎么办,则需要将结构体转换为字符串形式,这个过程称为序列化;对方收到字符串,通过反序列化就可以得到结构体。

        我们今天的结构体如图所示:

struct Request 
{
    int _x;
    int _y;
    char _op;
};

struct Response
{
    int _result;
    int _exitcode;
};

         客户端发送一串字符串我们接受到之后,通过反序列化转换为结构体。

bool Parser(string &in, Request *out)
{
    // 反序列化
    // 1 + 1, 2 * 4, 5 * 9, 6 *1
    std::size_t spaceOne = in.find(SPACE);
    if (std::string::npos == spaceOne)
        return false;
    std::size_t spaceTwo = in.rfind(SPACE);
    if (std::string::npos == spaceTwo)
        return false;

    std::string dataOne = in.substr(0, spaceOne);
    std::string dataTwo = in.substr(spaceTwo + SPACE_LEN);
    std::string oper = in.substr(spaceOne + SPACE_LEN, spaceTwo - (spaceOne + SPACE_LEN));
    if (oper.size() != 1)
        return false;

    // 转成内部成员
    out->_x = atoi(dataOne.c_str());
    out->_y = atoi(dataTwo.c_str());
    out->_op = oper[0];

    return true;
}

void Serialize(Response &in, string *out)
{
    // 序列化
    // "exitCode_ result_"
    std::string ec = std::to_string(in._exitcode);
    std::string res = std::to_string(in._result);

    *out = ec;
    *out += SPACE;
    *out += res;
    *out += CRLF;
}

        接下来,去文件描述符对应的读方法,调用该函数。

Response Calculate(Request &req)
{

    Response resp = {0,0};
    switch (req._op)
    {
    case '+':
        resp._result = req._x + req._y;
        break;
    case '-':
        resp._result = req._x - req._y;
        break;
    case '*':
        resp._result = req._x * req._y;
        break;
    case '/':
    {
        if (req._y == 0)
        {
            resp._exitcode = 1; // 1 除零错误
            resp._result = INT32_MAX;
        }
        else
            resp._result = req._x / req._y;
        break;
    }
    case '%':
    {
        if (req._y == 0)
        {
            resp._exitcode = 2; // 2 模零错误
            resp._result = INT32_MAX;
        }
        else
            resp._result = req._x % req._y;
        break;
    }
    default:
        resp._exitcode = 3; // 非法输入
        break;
    }
    return resp;
}

int HandlerPro(Connection *conn, string &message)
{
    // 我们能保证走到这里一定是完整的报文,已经解码
    cout << "获取request : " << message << endl;
    // 1 * 1
    // 接下来是反序列化
    Request req;
    if (Parser(message, &req) == false)
    {
        return -1;
    }

    // 业务处理
    Response resp = Calculate(req);

    // 序列化
    string out;
    Serialize(resp, &out);

    // 发送给client
    conn->_outbuff += out;
    // 发送 
}

        能不能直接调用send方法无脑的直接向客户端发送呢?

        首先要知道写的缓冲区是否已满,如何检测缓冲区已满?

        在LT模式中,当我们想写时只需要将对应的文件描述符所对应的EPOLLOUT添加事件中,在我们今天的编写代码中,在whlie循环中的事件检测中,当检测到是EPOLLOUT事件时,我们就可以在回调函数中调用send函数。

        在ET模式中,我们也可以使用上面的方法,但是ET模式追求高效,所以一般会直接发送数据,如果数据发送完了,那就可以结束了;如果没有发送完,缓冲区已满,就会选择去拜托EPOLL去完成后续任务。


int HandlerPro(Connection *conn, string &message)
{

    // 我们能保证走到这里一定是完整的报文,已经解码
    cout << "---------------" << endl;
    // 1 * 1
    // 接下来是反序列化
    cout << "获取request : " << message << endl;
    Request req;
    if (Parser(message, &req) == false)
    {
        return -1;
    }

    // 业务处理
    Response resp = Calculate(req);

    // 序列化
    string out;
    Serialize(resp, &out);

    // 发送给client
    conn->_outbuff += out;
    conn->_writefuc(conn);
    if (conn->_outbuff.empty())
    {
        if (conn->_outbuff.empty() == 0)
            conn->_ptr->ModSockEvent(conn->_sock, true, false);
        else
            conn->_ptr->ModSockEvent(conn->_sock, true, true);
    }
    // // 发送
    // // conn->_ptr->ModSockEvent(conn->_sock, true, true);

    cout << "---------------" << endl;
}
    void ModSockEvent(int sock, bool read, bool write)
    {
        uint32_t event = 0;
        event |= read ? EPOLLIN : 0;
        event |= write ? EPOLLOUT : 0;
        Epoller::ModEvent(_epfd, sock, event);
    }
    static bool ModEvent(int epfd, int sock, uint32_t event)
    {
        struct epoll_event ev;
        ev.data.fd = sock;
        ev.events = event;
        int n = epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &ev);
        return n == 0;
    }

        因为,我们截取字符串的函数没有找到我们规定的分隔符的话,就会将最后的一部字符串归到下一次读取时。原来的代码如下所示,所以就会导致每次输入一段字符串,只会输出最后一段的结果,我们只需要稍作修改。

void PackageSplit(string &buff, vector<string> *result)
{
    // asdasXdasdaXda
    // asdas dasda da
    while (true)
    {
        size_t pos = buff.find(SEP);
        if (pos == string::npos)
        {
            break;
        }
        result->push_back(buff.substr(0, pos));
        buff.erase(0, pos + SEP_SZ);
    }
}

修改后:

void PackageSplit(string &buff, vector<string> *result)
{
    // asdasXdasdaXda
    // asdas dasda da
    while (true)
    {
        size_t pos = buff.find(SEP);
        if (pos == string::npos)
        {
            if (buff.size() < 5)
            {
                buff.clear();
                break;
            }
            result->push_back(buff.substr(0, buff.size()));
            buff.clear();
            break;
        }
        result->push_back(buff.substr(0, pos));
        buff.erase(0, pos + SEP_SZ);
    }
}

现象:

⑤优化

        我们的代码耦合度太高了,数据处理的函数放在了源文件中,我们另起一个头文件,将处理函数放到该头文件中,这样业务处理是业务处理,网络服务是网络服务。

Service.hpp:
#pragma once 

#include "Protocol.hpp"
#include <functional>

using service_t = function<Response (Request &req)>;


Response Calculate(Request &req)
{

    Response resp = {0, 0};
    switch (req._op)
    {
    case '+':
        resp._result = req._x + req._y;
        break;
    case '-':
        resp._result = req._x - req._y;
        break;
    case '*':
        resp._result = req._x * req._y;
        break;
    case '/':
    {
        if (req._y == 0)
        {
            resp._exitcode = 1; // 1 除零错误
            resp._result = INT32_MAX;
        }
        else
            resp._result = req._x / req._y;
        break;
    }
    case '%':
    {
        if (req._y == 0)
        {
            resp._exitcode = 2; // 2 模零错误
            resp._result = INT32_MAX;
        }
        else
            resp._result = req._x % req._y;
        break;
    }
    default:
        resp._exitcode = 3; // 非法输入
        break;
    }
    return resp;
}

    

epoll.cc:
int HandlerProHelp(Connection *conn, string &message,service_t service)
{
// 我们能保证走到这里一定是完整的报文,已经解码
    cout << "---------------" << endl;
    // 1 * 1
    // 接下来是反序列化
    cout << "获取request : " << message << endl;
    Request req;
    if (Parser(message, &req) == false)
    {
        return -1;
    }

    // 业务处理
    Response resp = service(req);

    // 序列化
    string out;
    Serialize(resp, &out);

    // 发送给client
    conn->_outbuff += out;
    conn->_writefuc(conn);
    if (conn->_outbuff.empty())
    {
        if (conn->_outbuff.empty() == 0)
            conn->_ptr->ModSockEvent(conn->_sock, true, false);
        else
            conn->_ptr->ModSockEvent(conn->_sock, true, true);
    }
    // // 发送
    // // conn->_ptr->ModSockEvent(conn->_sock, true, true);

    cout << "---------------" << endl;

    return 0;
}


int HandlerPro(Connection *conn, string &message)
{
    return HandlerProHelp(conn,message,Calculate);
}

        这样在想处理其他业务的时候,只需要将业务处理函数放入Service.hpp中,然后将源文件中调用就行。

⑥Reactor模式与Proactor模式

        我们今天使用的是Reactor模式。

Reactor模式:

        Linux系统中最常用的反应器模式。

        半同步半异步。

        即负责事件的派发,有否则IO,或者说业务处理。

Proactor模式:

        只负责事件的派发,就绪的事件推送给后台的进程、线程池,不关心处理的细节。

        到这里,epoll服务器的编写,应该就告一段落了,尽管还有数不清的BUG,和没有说清楚的知识点,但是总体还是挺完美的,那么感谢观看,我们下次再见。

        

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/482180.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

第二十一章 光源

光源是每个场景必不可少的部分&#xff0c;光源除了能够照亮场景之外&#xff0c;还可以产生阴影效果。 Unity中分为四种光源类型&#xff1a; 1. 方向光&#xff1a;Directional Light 用于模拟太阳光&#xff0c;方向光任何地方都能照射到。 2. 点光源&#xff1a;Point L…

JavaWeb-Servlet【内含思维导图】

目录 Servlet思维导图​编辑 1.什么是Servlet 2.Servelt概述 3.Servlet-Quickstart Your Project 3.1创建一个Web项目&#xff0c;导入Servlet依赖 3.1.1 选择Servlet导入依赖 3.1.2 导入Servlet依赖 3.2 在Web项目&#xff0c;定义类&#xff0c;实现Servlet接口…

Java8新特性-流式操作

在Java8中提供了新特性—流式操作&#xff0c;通过流式操作可以帮助我们对数据更快速的进行一些过滤、排序、去重、最大、最小等等操作并且内置了并行流将流划分成多个线程进行并行执行&#xff0c;提供更高效、快速的执行能力。接下来我们一起看看Java8为我们新增了哪些便捷呢…

Python基础合集 练习19(类与对象3(多态))

多态 class Horse: def init(self, name) -> None: self.name name def fature(self):return 父亲-----马的名字: {0}.format(self.name)def mover(self):print(马儿跑起来很潇洒)class Monkey: def init(self, name) -> None: self.name name def fature(self):ret…

《用于准确连续非侵入性血压监测的心跳内生物标志物》阅读笔记

目录 0 基础知识 1 论文摘要 2 论文十问 3 实验结果 4 论文亮点与不足之处 5 与其他研究的比较 6 实际应用与影响 7 个人思考与启示 参考文献 0 基础知识 非侵入性是指在进行医学检查或治疗时&#xff0c;不需要切开皮肤或穿刺体内组织&#xff0c;而是通过外部手段进…

【VQGAN论文精读】Taming Transformers for High-Resolution Image Synthesis

【VQGAN论文精读】Taming Transformers for High-Resolution Image Synthesis 0、前言Abstract1. Introduction2. Related Work3. Approach3.1. Learning an Effective Codebook of Image Constituents for Use in Transformers学习一个有效的图像成分的Codebook为了在Transfor…

高性能:负载均衡

目录 什么是负载均衡 负载均衡分类 服务端负载均衡 服务端负载均衡——软硬件分类 服务端负载均衡——OSI模型分类 客户端负载均衡 负载均衡常见算法 七层负载均衡做法 DNS解析 反向代理 什么是负载均衡 将用户请求分摊&#xff08;分流&#xff09; 到不同的服务器上…

小记Java调用C++开发的动态链接库(DLL)

一、背景 五一快乐吖&#xff01;死肥宅正趁着五一这段时间&#xff0c;努力提升自己&#xff01; 最近使用Java拦截Windows系统中一些默认事件时&#xff0c;发现了一些瓶颈。 我用Java操作浏览器、用Java最小化其他应用窗口&#xff0c;但是我发现这个操作&#xff0c;他都…

【Unity-UGUI控件全面解析】| InputField 输入框组件详解

🎬【Unity-UGUI控件全面解析】| InputField 输入框组件详解一、组件介绍二、组件属性面板2.1 Content Type(内容类型)三、代码操作组件四、组件常用方法示例4.1 代码限制输入字符4.2 校验文本输入格式4.3 校验输入文本长度💯总结🎬 博客主页:https://xiaoy.blog.csdn.…

话说【永恒之塔sf】里面最有前途的职业:商人

如果有人问我永恒之塔里面什么职业最有前途&#xff01;那我告诉你就是商人&#xff01; 做一个NB商人比拥有一身牛b装备要更有成就感。 在老区由于进入的比较晚&#xff0c;所以最后随了大流被淹死在千万基纳中。为了证明商人在永恒之塔是钱途无量的&#xff0c;我转到了新区—…

快解析动态域名解析,实现外网访问内网数据库

今天跟大家分享一下如何借助快解析动态域名解析&#xff0c;在两种特定网络环境下&#xff0c;实现外网访问内网mysql数据库。 第1种网络环境&#xff1a;路由器分配的是动态公网IP&#xff0c;且有路由器登录管理权限。如何实现外网访问内网mysql数据库&#xff1f; 针对这种…

IDEA2022版教程上()

0、前景摘要 0.1 概览 0.2 套课程适用人群 初学Java语言&#xff0c;熟悉了记事本、EditPlus、NotePad或Sublime Text3等简易开发工具的Java初学者熟练使用其他Java集成开发环境&#xff08;IDE&#xff09;&#xff0c;需要转向IDEA工具的Java工程师们关注IDEA各方面特性的J…

Hadoop大数据分析技术(伪分布式搭建)

一.安装JDK和配置SSH免密登录 &#xff08;1&#xff09;准备软件 &#xff08;2&#xff09;解压压缩包 tar -zxvf jdk-8u221-linux-x64.tar.gz &#xff08;3&#xff09;在此处我们配置系统环境变量&#xff0c;使用命令&#xff1a; vim /etc/profile &#xff08;4&#x…

Python入门教程(高级版)

Python用了好几年了&#xff0c;但似乎一直没 “系统入门” 过&#xff08;o(╯□╰)o&#xff09;。今年&#xff08;2023年&#xff09;趁着五一假期&#xff0c;我做了一次相对完整的 “入门” ——本文是这次学习历程的详细记录。 目录 1 Python基础1.1 Python1.1.1 认识Py…

Oracle VM VirtualBox安装centos7步骤 for win10

目录 1.安装VirtualBox 2.安装vagrant 3.安装centos7 4.查看网络与百度和物理机连通情况 5.设置IP 1.安装VirtualBox 下载的链接:Downloads – Oracle VM VirtualBox 2.安装vagrant 根据自己的操作系统选择对应的版本。 Install | Vagrant | HashiCorp Developer 我的P…

asp.net+sqlserver旅游网站zjy99A2

1&#xff0e;系统登录&#xff1a;系统登录是用户访问系统的路口&#xff0c;设计了系统登录界面&#xff0c;包括用户名、密码和验证码&#xff0c;然后对登录进来的用户判断身份信息&#xff0c;判断是管理员用户还是普通用户。 2&#xff0e;系统用户管理&#xff1a;不管是…

redis使用总结

目录 redis安装与登录redis 持久化RDB(Redis DataBase)AOF(Append Only File)RDB-AOF混合持久纯缓存模式 redis 的 keyredis 的数据类型和常见应用场景StringListHashMapSet集合ZSet有序集合bitmap位图HyperLogLog基数统计GEO 地理空间Stream 流bitfiled redis 事务事务的正常执…

《基于EPNCC的脉搏信号特征识别与分类研究》阅读笔记

目录 一、论文摘要 二、论文十问 三、论文亮点与不足之处 四、与其他研究的比较 五、实际应用与影响 六、个人思考与启示 参考文献 一、论文摘要 为了快速获取脉搏信号的完整表征信息并验证脉搏信号在相关疾病临床诊断中的敏感性和有效性。在本文中&#xff0c;提出了一…

ChatGPT根据销售数据、客户反馈、财务报告,自动生成报告,并根据不同利益方的需要和偏好进行调整?

该场景对应的关键词库&#xff08;24个&#xff09;&#xff1a; 汇报对象身份&#xff08;下属、跨部门平级、领导&#xff09;、销售数据&#xff08;销售额、销售量、销售渠道&#xff09;、财务报告&#xff08;营业收入、净利润、成本费用&#xff09;、市场分析&#xf…

Vulkan实战之验证层

文章目录 验证层是什么&#xff1f;使用验证层消息回调调试实例的创建和销毁测试配置最终代码 验证层是什么&#xff1f; Vulkan API是围绕最小化驱动程序开销的想法设计的&#xff0c;该目标的表现之一是默认情况下API中的错误检查非常有限。即使是像将枚举设置为不正确的值或…