【Linux网络编程】IO多种转接之Reactor

news2024/11/17 17:38:46

Reactor

在这里插入图片描述

点赞👍👍收藏🌟🌟关注💖💖
你的支持是对我最大的鼓励,我们一起努力吧!😃😃

基于上一篇epoll的学习,现在我们也知道epoll的工作模式有两种,一种默认LT工作模式,另一种是ET模式。关于epoll的LT工作模式我们已经写过了。接下来我们写一份基于ET模式下的Reator,处理所有的IO。

Reactor = 如何正确的处理IO+协议定制+业务逻辑处理

下面我们写一个简洁版的Reactor,它是一个半同步半异步IO,具体它什么原理,怎么做的,有什么特征。我们在代码层面上解开它的面纱。代码写完总结就理解了。其实Reactor是在Liunx网络中,最常用,最频繁的一种网络IO设计模式!

我们是这样打算的,对错误码,日志函数,套接字,epoll做封装然后在写服务器的时候用到的时候调用即可。错误码,日志函数,套接字以前我们封装过今天直接用就行了。

错误码封装

#pragma once

enum
{
    USAGG_ERR = 1,
    SOCKET_ERR,
    BIND_ERR,
    LISTEN_ERR,
    EPOLL_CREATE_ERR
};

日志函数封装

#pragma once

#include<iostream>
#include<string>
#include<stdio.h>
#include <cstdarg>
#include<ctime>
#include<sys/types.h>
#include<unistd.h>
#include<fstream>

#define DEBUG  0
#define NORMAL  1
#define WARNING 2
#define ERROR   3
#define FATAL   4

#define LOG_NORMAL "log.txt"
#define LOG_ERR "log.error"

const char* level_to_string(int level)
{
    switch(level)
    {
        case DEBUG: return "DEBUG";
        case NORMAL: return "NORMAL";
        case WARNING: return "WARNING";
        case ERROR: return "ERROR";
        case FATAL: return "FATAL";
    }
}

//时间戳变成时间
char* timeChange()
{
    time_t now=time(nullptr);
    struct tm* local_time;
    local_time=localtime(&now);

    static char time_str[1024];

    snprintf(time_str,sizeof time_str,"%d-%d-%d %d-%d-%d",local_time->tm_year + 1900,\
                    local_time->tm_mon + 1, local_time->tm_mday,local_time->tm_hour, \
                    local_time->tm_min, local_time->tm_sec);

    return time_str;
}



void logMessage(int level,const char* format,...)
{
    //[日志等级] [时间戳/时间] [pid] [message]
    //[WARNING] [2024-3-21 10-46-03] [123] [创建sock失败]
#define NUM 1024
    //获取时间
    char* nowtime=timeChange();
    char logprefix[NUM];
    snprintf(logprefix,sizeof logprefix,"[%s][%s][pid: %d]",level_to_string(level),nowtime,getpid());

    //
    char logconten[NUM];
    va_list arg;
    va_start(arg,format);
    vsnprintf(logconten,sizeof logconten,format,arg);

    
    std::cout<<logprefix<<logconten<<std::endl;  
};

今天这里我们把套接字封装改一下,以前把它相关接口都写成静态的了,不需要对象直接调用了。今天呢,都改成非静态的,未来这个类提供的方法都要以对象调用访问。

#pragma once

#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "Err.hpp"
#include "Log.hpp"

using namespace std;

class Sock
{
    const static int backlog = 32;
    const static int defaultsock = -1;

public:
    Sock(int sock = defaultsock) : _listensock(sock)
    {}

    ~Sock()
    {
        if (_listensock != defaultsock)
            close(_listensock);
    }

public:
    int sock()
    {
        // 1. 创建socket文件套接字对象
        _listensock= socket(AF_INET, SOCK_STREAM, 0);
        if (_listensock < 0)
        {
            logMessage(FATAL, "create socket error");
            exit(SOCKET_ERR);
        }
        logMessage(NORMAL, "create socket success: %d", _listensock);


        int opt = 1;
        setsockopt(_listensock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
    }

	//方便外面获取_listensock
    int Fd()
    {
        return _listensock;
    }

    void Bind(int port)
    {
        // 2. bind绑定自己的网络信息
        struct sockaddr_in local;
        memset(&local, 0, sizeof(local));
        local.sin_family = AF_INET;
        local.sin_port = htons(port);
        local.sin_addr.s_addr = INADDR_ANY;
        if (bind(_listensock, (struct sockaddr *)&local, sizeof(local)) < 0)
        {
            logMessage(FATAL, "bind socket error");
            exit(BIND_ERR);
        }
        logMessage(NORMAL, "bind socket success");
    }

    void Listen()
    {
        // 3. 设置socket 为监听状态
        if (listen(_listensock, backlog) < 0)
        {
            logMessage(FATAL, "listen socket error");
            exit(LISTEN_ERR);
        }
        logMessage(NORMAL, "listen socket success");
    }

    int Accept(string *clientip, uint16_t *clientport,int* err)
    {
        struct sockaddr_in peer;
        socklen_t len = sizeof(peer);
        int sock = accept(_listensock, (struct sockaddr *)&peer, &len);
        *err=errno;
        if (sock < 0){}
            //logMessage(ERROR, "accept error, next");
        else
        {
            //logMessage(NORMAL, "accept a new link success, get new sock: %d", sock); // ?
            *clientip = inet_ntoa(peer.sin_addr);
            *clientport = ntohs(peer.sin_port);
        }
        return sock;
    }

private:
    int _listensock;
};

对于一个epoll来说,首先要先给epoll创建出来,然后epoll的接口都要用到epoll创建成功的返回值。所以直接在这个epoll类中定义个成员变量直接就把创建epoll成功的返回值拿到。然后服务器不需要直接拿着这个返回值,而直接调用这个类对象使用里面的提供的接口就行了。epoll这里先写一个大体框架,后面需要什么了再加

#pragma once

#include <iostream>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/types.h>
#include "Err.hpp"
#include "Log.hpp"

using namespace std;

const int defaultepfd = -1;
const int size = 128;

class Epoller
{

public:
    Epoller(int epfd = defaultepfd) : _epfd(epfd)
    {
    }

    ~Epoller()
    {
        if (_epfd != defaultepfd)
            close(_epfd);
    }

public:
   
private:
    int _epfd;//创建epoll返回值
};

目前调用逻辑,后面在加内容

#include "TcpServer.hpp"
#include "Err.hpp"
#include <memory>

static void usage(std::string proc)
{
    std::cerr << "Usage:\n\t" << proc << " port" << "\n\n";
}

int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        usage(argv[0]);
        exit(USAGG_ERR);
    }

    uint16_t port=atoi(argv[1]);
    std::unique_ptr<TcpServer> uls(new TcpServer(port));
    uls->initServer();
    uls->Dispatch();

    return 0;
}

服务器大体框架

#pragma once

#include <iostream>
#include "Sock.hpp"
#include "Err.hpp"
#include "Log.hpp"
#include "Epoller.hpp"

const int defaultport = 8080;

class TcpServer
{

public:
    TcpServer(int port = defaultport) : _port(port)
    {
    }

    ~TcpServer()
    {
    }

    void initServer()
    {  
    }

    void Dispatch()//事件派发
    {
    }
public:


private:
    uint16_t _port;
    Sock _sock;
    Epoller _epoll;
};

接下来就是编写服务器了

初始化服务器

void initServer()
{  
    // 1.创建套接字
    _sock.sock();
    _sock.Bind(_port);
    _sock.Listen();

    // 2.创建epoll模型
}

创建套接字之后,我们要在创建一个epoll模型,因此我们在Epoller写一个创建epoll模型的接口,然后服务器直接调用就行了

```cpp
void Create()
{
    _epfd = epoll_create(size);
    if (_epfd < 0)
    {
        logMessage(FATAL, "epoll_create error, code: %d, errstring: %s", errno, strerror(errno));
        exit(EPOLL_CREATE_ERR);
    }
    logMessage(NORMAL, "epoll create success, epfd: %d", _epfd);
}
void initServer()
{  
    // 1.创建套接字
    _sock.sock();
    _sock.Bind(_port);
    _sock.Listen();

    // 2.创建epoll模型
    _epoll.Create();
    
	// 3.将目前唯一的一个sock,添加到epoll中
}

创建好套接字,epoll模型接下来我们先把_listensock套接字添加到epoll里,看看这样写有没有什么问题

不过先在Epoller类中补充一个用户告诉内核你要帮我关心对应fd的什么事件。

// user -> kernel
bool AddEvents(int sock, uint32_t event)
{
    struct epoll_event ev;
    ev.events = event;
    ev.data.fd = sock;
    int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &ev);
    if (n < 0)
    {
        logMessage(ERROR, "sock join epoll fail");
        return false;
    }
    return true;
}

写好后我们直接调用就好了

void initServer()
{  
    // 1.创建套接字
    _sock.sock();
    _sock.Bind(_port);
    _sock.Listen();

    // 2.创建epoll模型
    _epoll.Create();

    // 3.将目前唯一的一个sock,添加到epoll中
    _epoll.AddEvents(_sock.Fd(),EPOLLIN | EPOLLET);
}

但是现在有小一个问题,前面说过的,如果未来你的套接字工作模式是ET模式,那么该套接字必须处于非阻塞,_listensock套接字创建处理默认就是阻塞的,因此我们需要将文件描述符设为非阻塞

我们在高级IO的非阻塞IO哪里就已经写过一个将一个fd变成非阻塞了,现在拿过来用就行了,不过我们也还是把它封装起来。然后调用。

#include <iostream>
#include <unistd.h>
#include <fcntl.h>

using namespace std;

class Util
{
public:
    static bool SetNonBlock(int fd)
    {
        int fl = fcntl(fd, F_GETFL);
        if (fl < 0)
            return false;
        fcntl(fd, F_SETFL, fl | O_NONBLOCK);
        return true;
    }
};

到目前为止好像貌似没问题了,但是真的吗?

void initServer()
{  
    // 1.创建套接字
    _sock.sock();
    _sock.Bind(_port);
    _sock.Listen();

    // 2.创建epoll模型
    _epoll.Create();

    // 3. 将目前唯一的一个sock,添加到epoller中, 之前需要先将对应的fd设置成为非阻塞
    Util::SetNonBlock(_sock.Fd());
    _epoll.AddEvents(_sock.Fd(),EPOLLIN | EPOLLET);
}

我们回过头看看之前写过的epoll服务器的代码,这里处理普通sock就绪事件的问题,你怎么保证你把本轮收到的数据都读完?即使把本轮数据都读完了,就一定能够读到一个完整的请求吗?即使未来在这里循环读取,可以反正我们非阻塞也都写过。那就能保证读到一个完整请求吗?
不一定!

那没读到完整的请求我们是不是只能把本轮读到的数据暂时保存到buffer里,可是暂时保存到buffer里,你保存了,那别人怎么办? 你这个sock和其他sock说你们别着急先别覆盖我的数据,buffer里面保存的是我的数据,你们先不要写。你想多了!

你读完之后整个这个代码区间就全部释放掉了,因为这是栈上面的空间。所以即便你没读完,或者你把这轮读完了。那么下轮在读,buffer早就释放了。你可能本轮读到后半部分但前半部分已经没有了,可能不到下次,下一个循环进来就给你清空了。

在这里插入图片描述
所以怎么样进行正确读写,光有一个套接字和定义一个栈上的缓存区远远不够!

所以我们需要给每一个文件描述符,都要把输入,输出用户层缓存区都要给它带上!

现在我们清楚知道历史代码的问题,我们要正确写整个服务,现在已经不够了。我们需要将每一个套接字进行封装,每一个套接字都要包含自己对应的输入,输出缓存区空间,只有每一个套接字都有自己对应的缓存空间,读取的时候把数据读取赞存在自己对应的缓存区中,没读完下次在读,这时候在添加到我自己的缓存区里,不就行了吗,这个时候谁和谁都不揉在一起。所以我们在封装一个类! 也就是说我认为未来每一个套接字都看类对象。

class Connection
{
public:
    Connection(int sock = defaultport) : sock_(sock)
    {
    }

    ~Connection()
    {
        if (sock_ == defaultsock)
            close(sock_);
    }

public:
    int sock_;//这个类对应的套接字是谁
    string inbuffer_;//输入缓存区,这里我们暂时没有考虑处理图片,视频等二进制信息,不然stirng就不太合适了                                                                                                                     
    string outbuffer_;//输出缓存区 ,你并不能保证你的写事件就绪
    
}

针对于上面未来每一个Connection对象,因为每一个套接字未来面对的都有自己的读方法、写方法、异常方法,所以在Connection类中,对一个套接字来讲要提供三个回调方法,分别对应读方法、写方法,异常方法。

const int defaultport = 8080;
const int defaultsock = -1;

using func_t = function<void(Connection *)>;

class Connection
{
public:
    Connection(int sock = defaultport) : sock_(sock)
    {
    }

    ~Connection()
    {
        if (sock_ == defaultsock)
            close(sock_);
    }

public:
    int sock_;  //这个类对应的套接字是谁
    string inbuffer_;  //输入缓存区,这里我们暂时没有考虑处理图片,视频等二进制信息,不然stirng就不太合适了                                                                                                                     
    string outbuffer_;  //输出缓存区 ,你并不能保证你的写事件就绪

    func_t recver_;  //从sock_读
    func_t sender_;  //向sock_写
    func_t excepter_;  //处理sock_,IO的时候上面的异常事件
};

所以在初始服务器写的,创建套接字,创建epoll模型,然后将_listensock套接字添加到epoll中,那未来是不是更多的套接字会被accept上来,我们把每一个套接字都看成Connection对象,所以未来整个服务器是不是存在非常多的Conncetion对象。同时_listensock 也是一个sock啊,也要看作成为一个 Connection对象,虽然_listensock不用所谓的输入输出缓存区,包括回调方法最多也是用一个 recver _,但是在我看来_listensock也是一个Connection。 所以刚才服初始服务器哪里写_listensock就有不合适了。并且未来每一个sock都是Connection对象,所以服务器要不要把这些多的Connection对象管理起来呢? 当然要了! 那服务器怎么管理?

先描述,在组织! ,我们管理起来了吗,管理起来了,Connection就是。怎么组织呢?既然每一个Connection对象都有一个对应的套接字,那我们就是哈希表管理。

class TcpServer
{

public:
	//...

private:
    uint16_t _port;

    Sock _sock;
    Epoller _epoll;
    unordered_map<int,Connection*> _connections;//所有链接集合
};

而且这里的代码仅仅是简单的将套接字添加到epoll中,未来我们不仅想将套接字添加到epoll中,并且还想将每一个Connectoin对象添加unordered_map里。在这里插入图片描述

为什么unordered_map我们使用的是套接字做key,因为将来在epoll中一旦有fd就绪了,我们知道是什么事件就绪,也知道是那个fd就绪。知道之后我们就可以在unordered_map根据文件描述符快速的找到对应套接字的Connection对象,然后输入,输出缓存区也就有了。

在这里插入图片描述

所以在写一个添加链接的函数,作为一个服务器收到一个套接字,它就需要添加一个链接。

void AddConnection(int sock, uint32_t events)
{
    // 1. 首先要为该sock创建Connection,并初始化,并添加到connections_


    // 2. 其次将sock与它要关心的事件"写透式"注册到epoll中,让epoll帮我们关心
    bool r = _epoll.AddEvents(sock, events);
    assert(r);
    (void)r;
    
}

剩下的等会再说,所以当你有了新的套接字的时候,不应该是把套接字直接添加到epoll里,而应该是进行AddConnection。_listensock也是如此

void initServer()
{
    // 1.创建套接字
    _sock.sock();
    _sock.Bind(_port);
    _sock.Listen();

    // 2.创建epoll模型
    _epoll.Create();

    // 3. 将目前唯一的一个sock,添加到epoller中, 之前需要先将对应的fd设置成为非阻塞
    // Util::SetNonBlock(_sock.Fd());
    // _epoll.AddEvents(_sock.Fd(), EPOLLIN | EPOLLET);
    AddConnection(_sock.Fd(),EPOLLIN|EPOLLET);
}

void AddConnection(int sock, uint32_t events)
{
   		// 1.设置非阻塞,ET模式fd要非阻塞
        if (events & EPOLLET)
            Util::SetNonBlock(sock);

        // 2. 该sock创建Connection,并初始化,并添加到connections_
        Connection *conn = new Connection(sock);

        // 3.

        // 4. 其次将sock与它要关心的事件"写透式"注册到epoll中,让epoll帮我们关心
        bool r = _epoll.AddEvents(sock, events);
        assert(r);
        (void)r;

        // 5. 将kv添加到connections_
        _connections.insert(make_pair(sock, conn));

        logMessage(DEBUG, "add new sock : %d in epoll and unordered_map", sock);
}

这样写还没完,未来一旦有fd就绪了,然后就可以在unordered_map中根据fd找到对应的Connection,找到之后我们是不是要执行对应的读,写,异常方法,所以任何一个Connection内方法能够被调用,因此Connection类在提供一个给每一个Connection对象注册读,写,异常方法,

这样写的意思是,未来这个AddConnection接口不仅可以被用来注册_listensock套接字,也可以用来被注册一般文件描述符,一般文件描述符也可能关心读,也可以关心写,也可能关心异常。或者只关心读,只关心写等等情况,所以当我们每个套接字注册一个Connection对象,你怎么知道这个套接字未来要执行什么方法呢 ,所以我们需要Connection提供一个注册方法。
在这里插入图片描述

class Connection
{
public:
    Connection(int sock = defaultport) : sock_(sock)
    {
    }

    ~Connection()
    {
        if (sock_ == defaultsock)
            close(sock_);
    }

    void Rigster(func_t r, func_t s, func_t e)
    {
        recver_ = r;
        sender_ = s;
        excepter_ = e;
    }

public:
    int sock_;         // 这个类对应的套接字是谁
    string inbuffer_;  // 输入缓存区,这里我们暂时没有考虑处理图片,视频等二进制信息,不然stirng就不太合适了
    string outbuffer_; // 输出缓存区 ,你并不能保证你的写事件就绪

    func_t recver_;   // 从sock_读
    func_t sender_;   // 向sock_写
    func_t excepter_; // 处理sock_,IO的时候上面的异常事件
};

所以当使用AddConnection的时候,注册文件描述符到epoll的时候,既要有要关心的fd,还要有关心fd上的什么事件,这两个字段是你要告诉epoll,当fd就绪时你想怎么处理这个fd,所以还要再给三个参数,告诉这个fd就绪时处理什么方法。

在这里插入图片描述

void AddConnection(int sock, uint32_t events,func_t recver, func_t sender, func_t excepter)
{
    // 1.设置非阻塞,ET模式fd要非阻塞
    if (events & EPOLLET)
        Util::SetNonBlock(sock);

    // 2. 该sock创建Connection,并初始化,并添加到connections_
    Connection *conn = new Connection(sock);

    // 3. 给对应的sock设置对应的回调方法
    conn->Rigster(recver, sender, excepter);

    // 4. 其次将sock与它要关心的事件"写透式"注册到epoll中,让epoll帮我们关心
    bool r = _epoll.AddEvents(sock, events);
    assert(r);
    (void)r;

    // 5. 将kv添加到connections_
    _connections.insert(make_pair(sock, conn));

    logMessage(DEBUG, "add new sock : %d in epoll and unordered_map", sock);
}

所以当我们重新审视初始化服务器的代码,创建套接字,创建epoll模型,把_listensock套接字添加到epoll里,对于_listensock套接字我们也要设置对应的方法,它就绪时想怎么读,怎么写,怎么处理异常。不过对于_listensock套接字它只关心读,所以这里我们先给它一个未来要写的从_listensock获取链接的方法,不关心写和处理异常,直接设置为nullptr就行了。

不过这里还有一个细节,我们因为读、写、异常是我们使用的是包装器function,如何让它的对象调用一个类内的非静态成员函数会比较麻烦,因为类内的非静态成员函数内隐藏了一个this指针。这样的话function的参数还要添加一个,然后调用的话还需要再传一个这个类对象过去。但是我们在C++学过bing绑定,可以固定参数,这样以后调用这个成员函数直接就帮我自动传这个参数了,对这里不懂得可以看这里【C++】C++11中

void Accepter(Connection* conn)
{
}

void initServer()
{
    // 1.创建套接字
    _sock.sock();
    _sock.Bind(_port);
    _sock.Listen();

    // 2.创建epoll模型
    _epoll.Create();

    // 3. 将目前唯一的一个sock,添加到epoller中, 之前需要先将对应的fd设置成为非阻塞
    // Util::SetNonBlock(_sock.Fd());
    // _epoll.AddEvents(_sock.Fd(), EPOLLIN | EPOLLET);
    AddConnection(_sock.Fd(), EPOLLIN | EPOLLET,
                  bind(&TcpServer::Accepter, this, placeholders::_1), nullptr, nullptr);
}

未来一旦设置到epoll里fd上有事件就绪了,然后就根据就绪的fd在unordered_map找到对应的Conncetion,然后调用Conncetion曾经注册的对应的方法来进行事件派发,所以接下来我们要写事件派发的接口,让epoll_wait帮我们获取就绪事件,这里有很多写法,不过我们还是不写在一块,多写一个Loop函数然后进行一次事件派发,Dispatch就是循环派发

// 事件派发器
void Dispatch()
{
    int timeout = -1000;
    while (true)
    {
        Loop(timeout);
    }
}


void Loop(int &timeout)
{
   _epoll.Wait()// 获取已经就绪的事件
}        

所以在Epoller再来一个接口,我们直接调用就行了,这里我们需要一个拷贝数组,数组大小,和等待的方式

// kernel -> user
int Wait(struct epoll_event *ets, int num, int timeout)
{
    int n = epoll_wait(_epfd, ets, num, timeout);
    switch (n)
    {
    case 0:
        logMessage(NORMAL, "timeout ...");
        break;
    case -1:
        logMessage(WARNING, "epoll_wait failed, code: %d, errstring: %s", errno, strerror(errno));
        break;
    default:
        logMessage(NORMAL, "have event ready");
        break;
    }
    return n;
}

epoll_wait这里注定了我们一次可能会捞取许多就绪事件,所以我们要在服务器中在定义一个保存所有就绪事件的数组,和数组大小然后调用_epoll.Wait()的时候传过去

const int defaultnum = 128;

class TcpServer
{

public:
    TcpServer(int port = defaultport,int num = defaultnum) : _port(port),_revents(nullptr),_num(num)
    {
    }

	

private:
    int _port;

    Sock _sock;
    Epoller _epoll;
    unordered_map<int, Connection *> _con;
    struct epoll_event *_revents;
    int _num;
};

初始服务器的时候,我们就把数组申请好

void initServer()
{
    // 1.创建套接字
    _sock.sock();
    _sock.Bind(_port);
    _sock.Listen();

    // 2.创建epoll模型
    _epoll.Create();

    // 3. 将目前唯一的一个sock,添加到epoller中, 之前需要先将对应的fd设置成为非阻塞
    // Util::SetNonBlock(_sock.Fd());
    // _epoll.AddEvents(_sock.Fd(), EPOLLIN | EPOLLET);
    AddConnection(_sock.Fd(), EPOLLIN | EPOLLET,
                  bind(&TcpServer::Accepter, this, placeholders::_1), nullptr, nullptr);

    // 4.拷贝数组
    _revents = new struct epoll_event[_num];
}

然后接下来就可以进行事件派发了

void Loop(int &timeout)
{
    int n = _epoll.Wait(_revents, _num, timeout); // 获取已经就绪的事件

    for (int i = 0; i < n; ++i) // 下面一定是就绪的事件
    {
        uint32_t event = _revents[i].events; // 就绪的事件
        int sock = _revents[i].data.fd;      // 就绪事件的fd

        // 事件派发
        if ((event & EPOLLIN) && sock == _sock.Fd()) // listensock读就绪
            _connections[sock]->recver_(_connections[sock]);
        if ((event & EPOLLIN) && sock != _sock.Fd()) // 普通sock读就绪
            _connections[sock]->recver_(_connections[sock]);
}

我们注意到不管是_listensock套接字还是普通sock套接字读就绪,它们的处理方法都是一样的,因此我们把它们写在一块。只要是读事件事件,我们都根据文件描述去执行对应的读方法,未来设怎么读都由自己曾经设置对应recver_来定

void Loop(int &timeout)
{
    int n = _epoll.Wait(_revents, _num, timeout); // 获取已经就绪的事件

    for (int i = 0; i < n; ++i) // 下面一定是就绪的事件
    {
        uint32_t event = _revents[i].events; // 就绪的事件
        int sock = _revents[i].data.fd;      // 就绪事件的fd

        // 事件派发

        // if ((event & EPOLLIN) && sock == _sock.Fd()) // listensock读就绪
        //     _connections[sock]->recver_(_connections[sock]);
        // if ((event & EPOLLIN) && sock != _sock.Fd()) // 普通fd读就绪
        //     _connections[sock]->recver_(_connections[sock]);

        if ((event & EPOLLIN))//读事件就绪
            _connections[sock]->recver_(_connections[sock]);
        if ((event & EPOLLOUT))//写事件就绪
            _connections[sock]->sender_(_connections[sock]);
}

这样写还有一些问题,首先我们还要判断一下这个套接字是否在unordered_map真的存在,其次在判断对应套接字的方法是否曾经被设置,不然也可能会存在调用的问题

bool ExitHashSock(int sock)
{
    auto pos = _con.find(sock);
    return pos != _con.end();
}

void Loop(int &timeout)
{
    int n = _epoll.Wait(_revents, _num, timeout); // 获取已经就绪的事件

    for (int i = 0; i < n; ++i) // 下面一定是就绪的事件
    {
        uint32_t event = _revents[i].events; // 就绪的事件
        int sock = _revents[i].data.fd;      // 就绪事件的fd

        // 事件派发


       // _listensock和普通sock都有自己对应的回调方法,因此对_listensock和普通sock处是一样的,不用区分
       if ((event & EPOLLIN) && IsConnectionExists(sock) && (_connections[sock]->recver_))
           _connections[sock]->recver_(_connections[sock]);
       if ((event & EPOLLOUT) && IsConnectionExists(sock) && (_connections[sock]->sender_))
           _connections[sock]->sender_(_connections[sock]);
}

这里还有一丢丢小问题,对我们来说我们从来没说过异常的事件,但并不排除在通信的时候存在其他epoll对应的事件

  • EPOLLERR : 表示对应的文件描述符发生错误;
  • EPOLLHUP : 表示对应的文件描述符被挂断;

我们可以这样写,如果当前就绪的事件出现异常了,我们设置进该就绪事件中的读和写去处理。这样的好处是将所有的异常问题,全部转化,成为读写问题 ,本来读写本来就要做异常处理,只要把读写异常处理了,这个异常也就被自动处理了!

void Loop(int &timeout)
{
    int n = _epoll.Wait(_revents, _num, timeout); // 获取已经就绪的事件

    for (int i = 0; i < n; ++i) // 下面一定是就绪的事件
    {
        uint32_t event = _revents[i].events; // 就绪的事件
        int sock = _revents[i].data.fd;      // 就绪事件的fd

		//将所有的异常问题,全部转化,成为读写问题
        if (event & EPOLLERR)
            event | EPOLLIN | EPOLLOUT;
        if (event & EPOLLHUP)
            event | EPOLLIN | EPOLLOUT;

       // _listensock和普通sock都有自己对应的回调方法,因此对_listensock和普通sock处是一样的,不用区分
       if ((event & EPOLLIN) && IsConnectionExists(sock) && (_connections[sock]->recver_))
           _connections[sock]->recver_(_connections[sock]);
       if ((event & EPOLLOUT) && IsConnectionExists(sock) && (_connections[sock]->sender_))
           _connections[sock]->sender_(_connections[sock]);
}

因为我们目前就一个_listensock套接字,所以到目前为止有事件就绪就是_listensock套接字就绪,也就是这里只会执行_listensock套接字设置的读事件也就是我们曾经给_listensock套接字绑定的Accepter函数来获取新链接。

void Accepter(Connection *conn)
{
       string clientip;
       uint16_t clientport;
       int err = 0;//获取错误码,用来判断是非阻塞了/读取被打断了/还是真错误了
       int sock = _sock.Accept(&clientip, &clientport, &err);
       if (sock > 0)
       {
           // 新的sock套接字添加到AddConnetions
           AddConnetions(sock, EPOLLIN | EPOLLET,);
       }
}

对于未来获取到的普通文件描述符,它还要有对应的事件就绪时我们需要怎么处理这个套接字,所以我们需要设置一个对普通文件描述符就绪时它的处理读,写,异常的方法

void recver(Connection *conn)
{
}

void sender(Connection *conn)
{
}

void excepter(Connection *conn)
{
}

void Accepter(Connection *conn)
{
       string clientip;
       uint16_t clientport;
       int err = 0;//获取错误码,用来判断是非阻塞了/读取被打断了/还是真错误了
       int sock = _sock.Accept(&clientip, &clientport, &err);
       if (sock > 0)
       {
           // 新的sock套接字添加到AddConnetions
           AddConnetions(sock, EPOLLIN | EPOLLET,
                              bind(&TcpServer::recver, this, placeholders::_1),
                              bind(&TcpServer::sender, this, placeholders::_1),
                              bind(&TcpServer::excepter, this, placeholders::_1));
       }
}

现在有一个问题时,当时_listesock套接字注册的时候也是ET模式的,并且也设置成非阻塞的了,所以通知的时候只会通知_listesock套接字读就绪一次,Accepter调用也就一次。

在这里插入图片描述

也就是说你能保证底层到来的链接就一个吗?不能!你并不能保证此时正在处理_listesock套接字获取新连接时有几个到来了,有可能同时到来了非常多的连接,所以倒逼程序员必须把所有到达的连接全部都读上来,所有上面Accepet的写法是不对的,必须要循环读取全部都读上来!而且我们早把_listesock设置为非阻塞了,底层没有连接了也不会被阻塞,虽然返回值是-1,但是错误码被设置!可能是底层没错误了错误码被设为EAGAIN 或EWOULDBLOCK,或者读取时被信号中断了错误码被设为EINTR,或者真错误了!

void Accepter(Connection *conn)
{
    while (true)
    {
        string clientip;
        uint16_t clientport;
        int err = 0; // 获取错误码,用来判断是非阻塞了/读取被打断了/还是真错误了
        int sock = _sock.Accept(&clientip, &clientport, &err);
        if (sock > 0)
        {
            // 新的sock套接字添加到AddConnetions
            AddConnection(sock, EPOLLIN | EPOLLET,
                          bind(&TcpServer::recver, this, placeholders::_1),
                          bind(&TcpServer::sender, this, placeholders::_1),
                          bind(&TcpServer::excepter, this, placeholders::_1));

            logMessage(DEBUG, "get a new link, info: [%s:%d]", clientip.c_str(), clientport);
        }
        else
        {
            if (err == EAGAIN || err == EWOULDBLOCK)
                break;
            else if (err == EINTR)
                continue;
            else
                break;
        }
    }
}

上面处理好了,接下来就是处理普通套接字就绪事件了,假设现在读就绪了,我们先处理读事件。怎么读?注定了也要循环读取!

其次你即使把本来数据全部读完就能保证是一个完整报文吗?不能!不是就没有办法向上层交,也就意味着没有办法对它进行处理,此时我们就先把读到的数据放到对应fd的Connection对象的输入缓存区里

void recver(Connection *conn)
{
	   while (true)
	   {
	       char buffer[1024];
	       ssize_t s = read(conn->sock_, buffer, sizeof(buffer) - 1);
	       if (s > 0)//读取成功
	       {
	           buffer[s] = 0;
	           conn->inbuffer_ += buffer;
	       }
	       
	    }
}

如果s==0,就说明对方把连接关掉了,那我们是不是应该把对应的文件描述符从epoll中删除,然后关闭文件描述符,然后从unordered_map中移除等等工作。如果写出问题呢?我们的代码是不是会充满大量的异常调用,所以只要我们设置了对异常的处理,然后有问题的执行一次异常方法就行了。如果s == -1,我们内部再判断是非阻塞返回、是被信号中断、还是着出错误了。

void recver(Connection *conn)
{
	while (true)
	{
	   char buffer[1024];
	   ssize_t s = read(conn->sock_, buffer, sizeof(buffer) - 1);
	   if (s > 0)//读取成功
	   {
	       buffer[s] = 0;
	       conn->inbuffer_ += buffer;
	   }           
	   else if (s == 0)
	      {
	          if (conn->excepter_)
	          {
	              conn->excepter_(conn);
	              return;
	          }
	      }
	      else
	      {
	          if (errno == EAGAIN || errno == EWOULDBLOCK)//只是没数据了
	              break;
	          else if (errno == EIDRM)//读取时被信号中断,继续读
	              continue;
	          else//真错误了
	          {
	              if (conn->excepter_)
	              {
	                  conn->excepter_(conn);
	                  return;
	              }
	          }
	      }
	      	       
	}
}

接下来我们处理一下读取到的报文,如果是完整报文就向上交付,不是就继续读取直到是一个完整报文。

那问题来了你怎么知道是一个完整的报文?
还记得以前在写网络版本计数器是怎么处理的吗?没错定制一个协议!!!
我们以前学过,今天拿过来用就行了 协议封装

#pragma once

#include <iostream>
#include <string>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <jsoncpp/json/json.h>

#define SEP " "
#define SEP_LEN strlen(SEP)
#define LINE_SEP "\r\n"
#define LINE_SEP_LEN strlen(LINE_SEP)

using namespace std;

// "x op y"  -> "content_len"\r\n"x op y"\r\n
string Enlenth(const string &text)//加报头
{
    string send_string = to_string(text.size());
    send_string += LINE_SEP;
    send_string += text;
    send_string += LINE_SEP;

    return send_string;
}

//"content_len"\r\n"x op y"\r\n  -> "x op y"
bool Delenth(const string &packge, string *text)//删除报头
{
    auto pos = packge.find(LINE_SEP);
    if (pos == string::npos)
        return false;
    string text_len_string = packge.substr(0, pos);
    int text_len = stoi(text_len_string);
    *text = packge.substr(pos + LINE_SEP_LEN, text_len);
    return true;
}

class Request//请求
{
public:
    Request() : _x(0), _y(0), _op(0)
    {
    }

    Request(int x, int y, char op) : _x(x), _y(y), _op(op)
    {
    }

    bool serialize(string *out)//序列化
    {
#ifdef MYSELF
        // 结构化 -> "x op y"
        *out = "";
        string x_string = to_string(_x);
        string y_string = to_string(_y);

        *out += x_string;
        *out += SEP;
        *out += _op;
        *out += SEP;
        *out += y_string;
#else
        Json::Value root;
        root["first"] = _x;
        root["second"] = _y;
        root["oper"] = _op;

        Json::FastWriter write;
        *out = write.write(root);
#endif
        return true;
    }

    bool deserialize(const string &in)//反序列
    {
#ifdef MYSELF
        // "x op y" -> 结构化
        auto left = in.find(SEP);
        auto right = in.rfind(SEP);
        if (left == string::npos || right == string::npos)
            return false;
        if (left == right)
            return false;
        if (right - (left + SEP_LEN) != 1)
            return false;

        string x_string = in.substr(0, left); // [0, 2) [start, end) , start, end - start
        string y_string = in.substr(right + SEP_LEN);

        if (x_string.empty())
            return false;
        if (y_string.empty())
            return false;
        _x = stoi(x_string);
        _y = stoi(y_string);
        _op = in[left + SEP_LEN];
#else
        Json::Value root;
        Json::Reader reader;
        reader.parse(in, root);

        _x = root["first"].asInt();
        _y = root["second"].asInt();
        _op = root["oper"].asInt();

#endif

        return true;
    }

public:
    int _x;
    int _y;
    char _op;
};

class Response//响应
{
public:
    Response() : _exitcode(0), _result(0)
    {
    }

    Response(int exitcode, int result) : _exitcode(exitcode), _result(result)
    {
    }

    bool serialize(string *out)//序列化
    {
#ifdef MYSELF
        // 结构化 -> "_exitcode  _result"
        *out = "";
        *out = to_string(_exitcode);
        *out += SEP;
        *out += to_string(_result);
#else
        Json::Value root;
        root["first"] = _exitcode;
        root["second"] = _result;

        Json::FastWriter write;
        *out = write.write(root);
#endif
        return true;
    }

    bool deserialize(const string &in)//反序列化
    {
#ifdef MYSELF
        //"_exitcode  _result" ->结构化
        auto pos = in.find(SEP);
        if (pos == string::npos)
            return false;

        string ec_string = in.substr(0, pos);
        string res_string = in.substr(pos + SEP_LEN);

        if (ec_string.empty())
            return false;
        if (res_string.empty())
            return false;

        _exitcode = stoi(ec_string);
        _result = stoi(res_string);
#else
        Json::Value root;
        Json::Reader reader;
        reader.parse(in, root);
        _exitcode = root["first"].asInt();
        _result = root["second"].asInt();
#endif
        return true;
    }

public:
    int _exitcode; // 0:计算成功,!0表示计算失败,具体是多少,定好标准
    int _result;   // 计算结果
};

//这里是读取一个完整报文!读到后放到text
bool PartOnepackge(string &inbuffer, string *text)
{
    //"content_len"/r/n"x op y"/r/n

    auto pos = inbuffer.find(LINE_SEP);
    if (pos == string::npos) // 没读到一个完整报文
        return false;
    //"content_len"/r/n"x op y"/r/n"content_len" >= 一个完整报文长度
    string text_len_string = inbuffer.substr(0, pos);
    int text_len = stoi(text_len_string);
    int total_len = text_len_string.size() + 2 * LINE_SEP_LEN + text_len;

    cout << "处理前#inbuffer: \n"
         << inbuffer << std::endl;

    if (inbuffer.size() < total_len) // 也没有读到一个完整报文
    {
        cout << "你输入的消息,没有严格遵守我们的协议,正在等待后续的内容, continue" << endl;
        return false;
    }

    // 至少有一个完整的报文
    *text = inbuffer.substr(0, total_len); // 读到一个完整报文
    inbuffer.erase(0, total_len);          // inbuffer内部减去这次读到的一个完整的报文

    cout << "处理后#inbuffer:\n " << inbuffer << endl;
    return true;

}

现在协议定义好了,我们也可以开始对读到的数据进行处理了!这里我们只想让服务器就处理IO事件,因此设置一个回调函数专门处理里面是否读取到一个完整报文,读到一个完整报文就序列化反序列业务处理等等。

using func_t = function<void(Connection *)>;

class TcpServer
{

public:
    TcpServer(func_t f,int port = defaultport, int num = defaultnum) : _service(f),_port(port), _revents(nullptr), _num(num)
    {
    }
	//。。。

private:
    uint16_t _port;

    Sock _sock;
    Epoller _epoll;
    unordered_map<int, Connection *> _connections; // 所有链接集合
    struct epoll_event *_revents;
    int _num;

    func_t _service;//业务处理
};

然后读取一次就交给上层一次,让上层进行处理,而recver专心读取数据就行了。

void recver(Connection *conn)
{
    while (true)
    {
        char buffer[1024];
        ssize_t s = read(conn->sock_, buffer, sizeof(buffer) - 1);
        if (s > 0)
        {
            buffer[s] = 0;
            conn->inbuffer_ += buffer;
            _service(conn);//处理业务
        }
        else if (s == 0)
        {
            if (conn->excepter_)
            {
                conn->excepter_(conn);
                return;
            }
        }
        else
        {
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                break;
            else if (errno == EIDRM)
                continue;
            else
            {
                if (conn->excepter_)
                {
                    conn->excepter_(conn);
                    return;
                }
            }
        }
    }
}

如果读取到一个完整报文后就对请求就行处理,没有就返回继续读取,直到读取到完整的报文

void calculate(Connection *conn)
{
    string onePackage;
    while (PartOnepackge(conn->inbuffer_, &onePackage))
    {
        string req_str;
        // 1.2 我们保证,我们req_text里面一定是一个完整的请求:"content_len"\r\n"x op y"\r\n
        if (!Delenth(onePackage, &req_str))
            return;

        cout << "去掉报头的正文:\n"
             << req_str << endl;

        // 2. 对请求Request,反序列化
        // 2.1 得到一个结构化的请求对象
        Request req;
        if (!req.deserialize(req_str))
            return;


        // 3. 计算机处理,req.x, req.op, req.y --- 业务逻辑
        // 3.1 得到一个结构化的响应
        Response resp;
        cal(req, resp); // req的处理结果,全部放入到了resp, 回调是不是不回来了?不是!

}

业务处理我们也写过直接用

void cal(const Request &req, Response &resp)
{
     req已经有结构化完成的数据啦,你可以直接使用
    resp._exitcode = OK;
    resp._result = OK;

    switch (req._op)
    {
    case '+':
        resp._result = req._x + req._y;
        break;
    case '-':
        resp._result = req._x - req._y;
        break;
    case '*':
        resp._result = req._x * req._y;
        break;
    case '/':
    {
        if (req._y == 0)
            resp._exitcode = DIV_ERR;
        else
            resp._result = req._x / req._y;
    }
    break;
    case '%':
    {
        if (req._y == 0)
            resp._exitcode = MOD_ERR;
        else
            resp._result = req._x % req._y;
    }
    break;
    default:
        resp._exitcode = OPER_ERR;
        break;
    }
}

计算处理好了我们对响应序列化,得到一个响应的报文,接下来就是给对方发回去

void calculate(Connection *conn)
{
    string onePackage;
    while (PartOnepackge(conn->inbuffer_, &onePackage))
    {
        string req_str;
        // 1.2 我们保证,我们req_text里面一定是一个完整的请求:"content_len"\r\n"x op y"\r\n
        if (!Delenth(onePackage, &req_str))
            return;

        cout << "去掉报头的正文:\n"
             << req_str << endl;

        // 2. 对请求Request,反序列化
        // 2.1 得到一个结构化的请求对象
        Request req;
        if (!req.deserialize(req_str))
            return;


        // 3. 计算机处理,req.x, req.op, req.y --- 业务逻辑
        // 3.1 得到一个结构化的响应
        Response resp;
        cal(req, resp); // req的处理结果,全部放入到了resp, 回调是不是不回来了?不是!


        // 4.对响应Response,进行序列化
        // 4.1 得到了一个"字符串"
        string resp_str;
        if (!resp.serialize(&resp_str))
            return;

        // 5 构建成为一个完整的报文
       string send_string= Enlenth(resp_str);
       cout << "--------------result: " << send_string << endl;
		
	   //发回去
    }

}

现在问题是,在epoll中能不能直接发送呢??
不能!用户无法保证发送条件是就绪的!谁最清楚??epoll!!
也就是说不光读事件要添加到epoll中,写事件是否就绪也要添加到epoll里。
什么叫做写发送事件就绪呢?
发送缓存区有空间!

服务器刚开始启动,或者很多情况下,发送事件是一直就绪的!可以直接发送!
只不过,如果我们没有发完怎么办?那就需要下一次发送,这里也就要求每一个sock都要有自己的发送缓存区!

所以根据上面的分析,大部分情况发送缓存区都是有空间的,所以其实可以直接发,有人说直接发那我就循环发,是的可以循环发,如果没发完非阻塞就直接返回错了,下次发就行了,可是下次从哪里发呢,所以就需要sock有自己的发送缓存区。

因为大部分情况下发送缓存区都是就绪的,所以直接发,没发完没关系下次在发,可是下次是什么时候,这次没发完不就是用户从缓存区数据量太大把发送缓冲区打满了,那下次发是不是就要等底层发送缓存区事件就绪我再发。所以注定你要将sock的EPOLLOUT事件也要有时候注册进epoll ,让epoll通知我,如果底层发事件就绪了我再发。现在问题是是什么时候注册?在新sock注册到epoll里的时候就开始关心写了可不可以?

在这里插入图片描述

未来事件派发一旦写事件就绪了自动调用sender方法

在这里插入图片描述

看起来很完美,但是一般读事件对于epoll我们要常设,写事件的关心对于epoll我们要按需设置!

也就是说当你需要发的时候,确实发送没发完,你在想办法在epoll中设置一下,让epoll关心对于fd的写事件! 刚开始设置sock如果你真的需要关心写事件你才需要设置写事件,不要把写事件常设到epoll中,因为你把它常设了,epoll就会一直在返回,因为每一个文件描述符的发送缓存区大概率都是就绪的,就一直的返回,你的处理逻辑就一直在消耗cpu资源,这样没必要,所以写事件我们按序设置。

所以我们的结论就是,因为是ET模式可以直接发送,发完就ok,没发完就让epoll关心一下对应的写事件的关心,所以我们需要常设EPOLLIN的关心,对EPOLLOUT按需设置!

然后这里呢,可能一次发不完,为了下次继续发送,所以我们把response序列化的之后的结果放到该fd的输出缓存区,以供下次发送。然后调用对应fd的读方法把数据发出去。

void calculate(Connection *conn)
{
    string onePackage;
    while (PartOnepackge(conn->inbuffer_, &onePackage))
    {
        string req_str;
        // 1.2 我们保证,我们req_text里面一定是一个完整的请求:"content_len"\r\n"x op y"\r\n
        if (!Delenth(onePackage, &req_str))
            return;

        cout << "去掉报头的正文:\n"
             << req_str << endl;

        // 2. 对请求Request,反序列化
        // 2.1 得到一个结构化的请求对象
        Request req;
        if (!req.deserialize(req_str))
            return;


        // 3. 计算机处理,req.x, req.op, req.y --- 业务逻辑
        // 3.1 得到一个结构化的响应
        Response resp;
        cal(req, resp); // req的处理结果,全部放入到了resp, 回调是不是不回来了?不是!


        // 4.对响应Response,进行序列化
        // 4.1 得到了一个"字符串"
        string resp_str;
        if (!resp.serialize(&resp_str))
            return;

        // 5 构建成为一个完整的报文
        conn->outbuffer_ += Enlenth(resp_str);

        std::cout << "--------------result: " << conn->outbuffer_ << std::endl;
        
		// 直接发
	    if (conn->sender_)
	        conn->sender_(conn);
    }

}

调用sender并不能保证数据一次就发完了,因此如果没有发送完毕,需要对对应的sock开启对写事件的关系, 如果发完了,我们要关闭对写事件的关心!

void sender(Connection *conn)
{
    while (true)
    {
        ssize_t s = send(conn->sock_, conn->outbuffer_.c_str(), conn->outbuffer_.size(), 0);
        if (s > 0)
        {
            conn->outbuffer_.erase(0, s);
            if (conn->outbuffer_.empty())//发完就退出
                break;
        }
        else
        {
            if (errno == EAGAIN || errno == EWOULDBLOCK)//没发完非阻塞退出
                break;
            else if (errno == EINTR)
                continue;
            else
            {
                if (conn->excepter_)
                {
                    conn->excepter_(conn);
                    return;
                }
            }
        }
    }
    
    // 如果没有发送完毕,需要对对应的sock开启对写事件的关系, 如果发完了,我们要关闭对写事件的关心!
}

这里我们在设计一个接口,可以打开对于一个文件描述符,是否对它写事件关心,是否对它读事件关心的情况

bool EnableReadWrite(int sock, bool reader, bool writer)
{
    uint32_t event = (reader ? EPOLLIN : 0) | (writer ? EPOLLOUT : 0) | EPOLLET;
    //_epoll.Control();
}

Epoller类在设计一个函数,在epoll对对应的fd关心事件进行修改

bool Control(int sock, uint32_t event, int action)
{
    int n = 0;
    if (action == EPOLL_CTL_MOD)
    {
        struct epoll_event ev;
        ev.events = event;
        ev.data.fd = sock;
        n = epoll_ctl(_epfd, action, sock, &ev);
    }
    else if (action == EPOLL_CTL_DEL)
    {
        n = epoll_ctl(_epfd, action, sock, nullptr);//删除就不关心任何事件了
    }
    else
        n = -1;
    return n == 0;
}
bool EnableReadWrite(int sock, bool reader, bool writer)
{
    uint32_t event = (reader ? EPOLLIN : 0) | (writer ? EPOLLOUT : 0) | EPOLLET;
    _epoll.Control(sock, event, EPOLL_CTL_MOD);
}
void sender(Connection *conn)
{
    while (true)
    {
        ssize_t s = send(conn->sock_, conn->outbuffer_.c_str(), conn->outbuffer_.size(), 0);
        if (s > 0)
        {
            conn->outbuffer_.erase(0, s);
            if (conn->outbuffer_.empty())
                break;
        }
        else
        {
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                break;
            else if (errno == EINTR)
                continue;
            else
            {
                if (conn->excepter_)
                {
                    conn->excepter_(conn);
                    return;
                }
            }
        }
    }
    // 如果没有发送完毕,需要对对应的sock开启对写事件的关系, 如果发完了,我们要关闭对写事件的关心!
    if (!conn->outbuffer_.empty()) // 设置关心该fd读
        EnableReadWrite(conn->sock_, true, true);
    else
        EnableReadWrite(conn->sock_, true, false);
}

只要我开启了对写事件的关心,epoll就会帮我关心对应fd的写事件,如果就绪了,就会调用对应的fd的sender,以上就是发送的逻辑。

在这里插入图片描述

如果对方连接出异常了怎么办呢?不管是读出现异常,还是写出现异常!并且在前面我们是所有异常情况全部转化成读写问题,然后所有读写异常我们都是调用excepter进行处理,换句话说所有异常处理都在这里。

我们处理异常逻辑很简单,我们也关闭对应文件描述符。

void excepter(Connection *conn)
{
    logMessage(DEBUG, "Excepter begin");
    //1,先在epoll中删除
    _epoll.Control(conn->sock_, 0, EPOLL_CTL_DEL);
    //2. 然后在unordered_map中删除
     _connections.erase(conn->sock_);
     //3.关闭fd
    close(conn->sock_);

    logMessage(DEBUG, "关闭%d 文件描述符的所有的资源", conn->sock_);

    delete conn;
}

到目前为止所有代码细节我们都写完了,对IO的处理+协议定制+业务逻辑都有,下面验证一下,我们看到能够正常通信,并且还有对异常的处理

在这里插入图片描述
Epoller封装完整代码

#pragma once

#include <iostream>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/types.h>
#include "Err.hpp"
#include "Log.hpp"

using namespace std;

const int defaultepfd = -1;
const int size = 128;

class Epoller
{

public:
    Epoller(int epfd = defaultepfd) : _epfd(epfd)
    {
    }

    ~Epoller()
    {
        if (_epfd != defaultepfd)
            close(_epfd);
    }

public:
    void Create()
    {
        _epfd = epoll_create(size);
        if (_epfd < 0)
        {
            logMessage(FATAL, "epoll_create error, code: %d, errstring: %s", errno, strerror(errno));
            exit(EPOLL_CREATE_ERR);
        }
        logMessage(NORMAL, "epoll create success, epfd: %d", _epfd);
    }

    // user -> kernel
    bool AddEvents(int sock, uint32_t event)
    {
        struct epoll_event ev;
        ev.events = event;
        ev.data.fd = sock;
        int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &ev);
        if (n < 0)
        {
            logMessage(ERROR, "sock join epoll fail");
            return false;
        }
        // logMessage(NORMAL, "sock join epoll success");
        return true;
    }

    // kernel -> user
    int Wait(struct epoll_event *ets, int num, int timeout)
    {
        int n = epoll_wait(_epfd, ets, num, timeout);
        switch (n)
        {
        case 0:
            logMessage(NORMAL, "timeout ...");
            break;
        case -1:
            logMessage(WARNING, "epoll_wait failed, code: %d, errstring: %s", errno, strerror(errno));
            break;
        default:
            logMessage(NORMAL, "have event ready");
            break;
        }
        return n;
    }

    bool Control(int sock, uint32_t event, int action)
    {
        int n = 0;
        if (action == EPOLL_CTL_MOD)
        {
            struct epoll_event ev;
            ev.events = event;
            ev.data.fd = sock;
            n = epoll_ctl(_epfd, action, sock, &ev);
        }
        else if (action == EPOLL_CTL_DEL)
        {
            n = epoll_ctl(_epfd, action, sock, nullptr);
        }
        else
            n = -1;
        return n == 0;
    }

private:
    int _epfd;
};

服务器完整代码

#pragma once

#include <iostream>
#include <unordered_map>
#include <functional>
#include <cassert>
#include "Sock.hpp"
#include "Err.hpp"
#include "Log.hpp"
#include "Epoller.hpp"
#include "Util.hpp"

const int defaultport = 8080;
const int defaultsock = -1;
const int defaultnum = 128;

class Connection;

using func_t = function<void(Connection *)>;

class Connection
{
public:
    Connection(int sock = defaultport) : sock_(sock)
    {
    }

    ~Connection()
    {
        if (sock_ == defaultsock)
            close(sock_);
    }

    void Rigster(func_t r, func_t s, func_t e)
    {
        recver_ = r;
        sender_ = s;
        excepter_ = e;
    }

public:
    int sock_;         // 这个类对应的套接字是谁
    string inbuffer_;  // 输入缓存区,这里我们暂时没有考虑处理图片,视频等二进制信息,不然stirng就不太合适了
    string outbuffer_; // 输出缓存区 ,你并不能保证你的写事件就绪

    func_t recver_;   // 从sock_读
    func_t sender_;   // 向sock_写
    func_t excepter_; // 处理sock_,IO的时候上面的异常事件
};

class TcpServer
{

public:
    TcpServer(func_t f, int port = defaultport, int num = defaultnum) : _service(f), _port(port), _revents(nullptr), _num(num)
    {
    }

    ~TcpServer()
    {
        if (_revents)
            delete[] _revents;
    }

    void initServer()
    {
        // 1.创建套接字
        _sock.sock();
        _sock.Bind(_port);
        _sock.Listen();

        // 2.创建epoll模型
        _epoll.Create();

        // 3. 将目前唯一的一个sock,添加到epoller中, 之前需要先将对应的fd设置成为非阻塞
        // Util::SetNonBlock(_sock.Fd());
        // _epoll.AddEvents(_sock.Fd(), EPOLLIN | EPOLLET);
        AddConnection(_sock.Fd(), EPOLLIN | EPOLLET,
                      bind(&TcpServer::Accepter, this, placeholders::_1), nullptr, nullptr);

        // 4.拷贝数组
        _revents = new struct epoll_event[_num];
    }

    // 事件派发
    void Dispatch()
    {
        int timeout = -1000;
        while (true)
        {
            Loop(timeout);
        }
    }

private:
    bool IsConnectionExists(int sock)
    {
        auto pos = _connections.find(sock);
        return pos != _connections.end();
    }

    void Loop(int &timeout)
    {
        int n = _epoll.Wait(_revents, _num, timeout); // 获取已经就绪的事件

        for (int i = 0; i < n; ++i) // 下面一定是就绪的事件
        {
            uint32_t event = _revents[i].events; // 就绪的事件
            int sock = _revents[i].data.fd;      // 就绪事件的fd

            // 将所有的异常问题,全部转化,成为读写问题
            if (event & EPOLLERR)
                event | EPOLLIN | EPOLLOUT;
            if (event & EPOLLHUP)
                event | EPOLLIN | EPOLLOUT;

            // _listensock和普通sock都有自己对应的回调方法,因此对_listensock和普通sock处是一样的,不用区分
            if ((event & EPOLLIN) && IsConnectionExists(sock) && (_connections[sock]->recver_))
                _connections[sock]->recver_(_connections[sock]);
            if ((event & EPOLLOUT) && IsConnectionExists(sock) && (_connections[sock]->sender_))
                _connections[sock]->sender_(_connections[sock]);
        }
    }

    void recver(Connection *conn)
    {
        while (true)
        {
            char buffer[1024];
            ssize_t s = read(conn->sock_, buffer, sizeof(buffer) - 1);
            if (s > 0)
            {
                buffer[s] = 0;
                conn->inbuffer_ += buffer;
                _service(conn);
            }
            else if (s == 0)
            {
                if (conn->excepter_)
                {
                    conn->excepter_(conn);
                    return;
                }
            }
            else
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                    break;
                else if (errno == EIDRM)
                    continue;
                else
                {
                    if (conn->excepter_)
                    {
                        conn->excepter_(conn);
                        return;
                    }
                }
            }
        }
    }

    bool EnableReadWrite(int sock, bool reader, bool writer)
    {
        uint32_t event = (reader ? EPOLLIN : 0) | (writer ? EPOLLOUT : 0) | EPOLLET;
        _epoll.Control(sock, event, EPOLL_CTL_MOD);
    }

    void sender(Connection *conn)
    {
        while (true)
        {
            ssize_t s = send(conn->sock_, conn->outbuffer_.c_str(), conn->outbuffer_.size(), 0);
            if (s > 0)
            {
                conn->outbuffer_.erase(0, s);
                if (conn->outbuffer_.empty())
                    break;
            }
            else
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                    break;
                else if (errno == EINTR)
                    continue;
                else
                {
                    if (conn->excepter_)
                    {
                        conn->excepter_(conn);
                        return;
                    }
                }
            }
        }
        // 如果没有发送完毕,需要对对应的sock开启对写事件的关系, 如果发完了,我们要关闭对写事件的关心!
        if (!conn->outbuffer_.empty()) // 设置关心该fd读
            EnableReadWrite(conn->sock_, true, true);
        else
            EnableReadWrite(conn->sock_, true, false);
    }

    void excepter(Connection *conn)
    {
        logMessage(DEBUG, "Excepter begin");
        _epoll.Control(conn->sock_, 0, EPOLL_CTL_DEL);
        _connections.erase(conn->sock_);
        close(conn->sock_);

        logMessage(DEBUG, "关闭%d 文件描述符的所有的资源", conn->sock_);

        delete conn;
    }
    void Accepter(Connection *conn)
    {
        while (true)
        {
            string clientip;
            uint16_t clientport;
            int err = 0; // 获取错误码,用来判断是非阻塞了/读取被打断了/还是真错误了
            int sock = _sock.Accept(&clientip, &clientport, &err);
            if (sock > 0)
            {
                // 新的sock套接字添加到AddConnetions
                AddConnection(sock, EPOLLIN | EPOLLET,
                              bind(&TcpServer::recver, this, placeholders::_1),
                              bind(&TcpServer::sender, this, placeholders::_1),
                              bind(&TcpServer::excepter, this, placeholders::_1));

                logMessage(DEBUG, "get a new link, info: [%s:%d]", clientip.c_str(), clientport);
            }
            else
            {
                if (err == EAGAIN || err == EWOULDBLOCK)
                    break;
                else if (err == EINTR)
                    continue;
                else
                    break;
            }
        }
    }

    void AddConnection(int sock, uint32_t events, func_t recver, func_t sender, func_t excepter)
    {
        // 1.设置非阻塞,ET模式fd要非阻塞
        if (events & EPOLLET)
            Util::SetNonBlock(sock);

        // 2. 该sock创建Connection,并初始化,并添加到connections_
        Connection *conn = new Connection(sock);

        // 3. 给对应的sock设置对应的回调方法
        conn->Rigster(recver, sender, excepter);

        // 4. 其次将sock与它要关心的事件"写透式"注册到epoll中,让epoll帮我们关心
        bool r = _epoll.AddEvents(sock, events);
        assert(r);
        (void)r;

        // 5. 将kv添加到connections_
        _connections.insert(make_pair(sock, conn));

        logMessage(DEBUG, "add new sock : %d in epoll and unordered_map", sock);
    }

public:
private:
    uint16_t _port;

    Sock _sock;
    Epoller _epoll;
    unordered_map<int, Connection *> _connections; // 所有链接集合
    struct epoll_event *_revents;
    int _num;

    func_t _service;
};

调用逻辑完整代码

#include "TcpServer.hpp"
#include "Err.hpp"
#include <memory>
#include "Protocol.hpp"

enum
{
    OK,
    DIV_ERR,
    MOD_ERR,
    OPER_ERR
};

void cal(const Request &req, Response &resp)
{
     req已经有结构化完成的数据啦,你可以直接使用
    resp._exitcode = OK;
    resp._result = OK;

    switch (req._op)
    {
    case '+':
        resp._result = req._x + req._y;
        break;
    case '-':
        resp._result = req._x - req._y;
        break;
    case '*':
        resp._result = req._x * req._y;
        break;
    case '/':
    {
        if (req._y == 0)
            resp._exitcode = DIV_ERR;
        else
            resp._result = req._x / req._y;
    }
    break;
    case '%':
    {
        if (req._y == 0)
            resp._exitcode = MOD_ERR;
        else
            resp._result = req._x % req._y;
    }
    break;
    default:
        resp._exitcode = OPER_ERR;
        break;
    }
}

void calculate(Connection *conn)
{
    string onePackage;
    while (PartOnepackge(conn->inbuffer_, &onePackage))
    {
        string req_str;
        // 1.2 我们保证,我们req_text里面一定是一个完整的请求:"content_len"\r\n"x op y"\r\n
        if (!Delenth(onePackage, &req_str))
            return;

        cout << "去掉报头的正文:\n"
             << req_str << endl;

        // 2. 对请求Request,反序列化
        // 2.1 得到一个结构化的请求对象
        Request req;
        if (!req.deserialize(req_str))
            return;


        // 3. 计算机处理,req.x, req.op, req.y --- 业务逻辑
        // 3.1 得到一个结构化的响应
        Response resp;
        cal(req, resp); // req的处理结果,全部放入到了resp, 回调是不是不回来了?不是!


        // 4.对响应Response,进行序列化
        // 4.1 得到了一个"字符串"
        string resp_str;
        if (!resp.serialize(&resp_str))
            return;

        // 5 构建成为一个完整的报文
        conn->outbuffer_ += Enlenth(resp_str);

        std::cout << "--------------result: " << conn->outbuffer_ << std::endl;
    }
    // 直接发
    if (conn->sender_)
        conn->sender_(conn);
}

static void usage(std::string proc)
{
    std::cerr << "Usage:\n\t" << proc << " port" << "\n\n";
}

int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        usage(argv[0]);
        exit(USAGG_ERR);
    }

    uint16_t port=atoi(argv[1]);
    std::unique_ptr<TcpServer> uls(new TcpServer(calculate,port));
    uls->initServer();
    uls->Dispatch();

    return 0;
}

总结一下: 这个TcpServer服务器就是传说中的Reactor,对应上服务器上面有一个一个的Connection,未来有哪一个fd就绪了,它就把某一个Connection激活告诉我们事件,Reactor就会进行事件派发。然后去执行对应Connection的读,写,异常方法。这种模式就是Reactor反应堆模式

在这里插入图片描述
Reactor,很明显当我们进行事件就绪了它会回调曾经注册的绑定的方法,今天我们的服务器既保证了就绪事件通知,还负责了IO,其实还负责了业务处理。如果今天我们把一个报文构建成一个任务,扔到后端任务队列,让后端线程池帮我们处理后面的任务。此时对应的Reactor只负责读事件就绪+负责IO,不负责业务处理。

只负责读事件就绪+负责IO 这就是就是半同步
只负责业务处理 这就半异步

还有一种服务器少用的Proactor前摄器模式,可以自己了解了解。

如果想把这个服务器改成多多进程多线程,其实可以直接创建多进程,每个进程里都搞一个TcpServer,或者说创建多线程,每一个线程里都搞一个Tcpserver,把_listensock套接字添加到其中一个线程的Reactor里,一旦有连接就绪的时候,不是要执行Accepter吗,执行Accepter的时候就不仅仅是 AddConnection了,而是尝试把这个连接添加到哪一个线程的epoll中, 然后就在这个线程里把这个文件描述符处理完。

最后说一点,我们还可以在Connectoin中设置一个lasttime记录最近访问时间,每一次读或写的时候我们都更新一个对应时间戳,所以只要读或写就绪了就可以更新一下对应Connection最近时间,换句话说此时我们就可以在派发事件后当所有事件处理完了,就可以在unordered_map遍历所有的连接,计算每一个连接已经有多长时间没有动了,因为每一个连接都有自己的最近访问时间,每一次访问都会更新,不更新就是最开始的,所以我们可以获取当前时间在减去Connectoin里保存的历史最近访问时间,计算出时间差,然后就可以所以连接进行连接管理。时间超过5分钟都没有访问过的,服务器就直接把你关掉。

在这里插入图片描述

如上就是Reactor全部内容。如上也是Linux的全部内容。总结了40多篇Linux从系统编程到网络编程的文章,内容很丰富!!想说什么也不知道该怎么说,就这样把!
大家下篇文章再见 🙌 🙌 🙌 !

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1687936.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

前端菜鸡,对于35+程序员失业这个事有点麻了

“经常看到30岁程序员失业的新闻&#xff0c;说实话&#xff0c;有点麻。目前程序员供求关系并未失衡&#xff0c;哪怕是最基础的前端或者后台、甚至事务型的岗位也是足够的。 事实上&#xff0c;现在一个开出的岗位要找到一位尽职尽责能顺利完成工作的程序员并不是一件那么容…

石油化工巡检机器人:应对挑战的创新力量

在石油化工领域&#xff0c;安全始终是高悬的达摩克利斯之剑。人工巡检面临诸多痛点&#xff0c;如高危环境对人身安全的巨大威胁&#xff0c;复杂工况下难以做到全面细致监测&#xff0c;对有害气体检测存在滞后性&#xff0c;还有恶劣天气对巡检工作的严重干扰。而这些痛点&a…

[LLM-Agent]万字长文深度解析规划框架:HuggingGPT

HuggingGPT是一个结合了ChatGPT和Hugging Face平台上的各种专家模型&#xff0c;以解决复杂的AI任务&#xff0c;可以认为他是一种结合任务规划和工具调用两种Agent工作流的框架。它的工作流程主要分为以下几个步骤&#xff1a; 任务规划&#xff1a;使用ChatGPT分析用户的请求…

【linux】新增磁盘的使用

查看硬盘和分区信息 【Linux】lsblk 命令使用-CSDN博客 lsblk 根据您提供的lsblk命令输出&#xff0c;我们可以看到系统中的块设备及其分区信息。以下是对每个设备的解释&#xff1a; vda&#xff1a;这是一个大小为40G的磁盘设备。 vda1&#xff1a;这是vda磁盘的第一个分区…

unidbg入门笔记

一、unidbg 介绍 unidbg 是凯神 在 2019 年初开源的一个轻量级模拟器&#xff0c;一个基于Java的跨平台解密引擎&#xff0c;专门用于动态分析和逆向工程应用程序。它可以模拟不同CPU架构、操作系统和指令集&#xff0c;从而使用户能够在一个统一的环境中分析各种不同类型的二…

利用Python去除PDF水印

摘要 本文介绍了如何使用 Python 中的 PyMuPDF 和 OpenCV 库来从 PDF 文件中移除水印&#xff0c;并将每个页面保存为图像文件的方法。我们将深入探讨代码背后的工作原理&#xff0c;并提供一个简单的使用示例。 导言 简介&#xff1a;水印在许多 PDF 文件中都很常见&#x…

用 vue3 + phaser 实现经典小游戏:飞机大战

本文字数&#xff1a;7539字 预计阅读时间&#xff1a;30分钟 01 前言 说起小游戏&#xff0c;最经典的莫过于飞机大战了&#xff0c;相信很多同学都玩过。今天我们也来试试开发个有趣的小游戏吧&#xff01;我们将从零开始&#xff0c;看看怎样一步步实现一个H5版的飞机大战&a…

MyBatis从入门到“入土“

&#x1f495;喜欢的朋友可以关注一下&#xff0c;下次更新不迷路&#xff01;&#x1f495;(●◡●) 目录 一、Mybatis为何物&#xff1f;&#x1f44c; 二、快速入门&#x1f923; 1、新建项目&#x1f60a; 2、数据库建表&#x1f60a; 3、导入依赖的jar包&#x1f60a;…

【高级数据结构】B树

B树 一、概念性问题1、前置知识&#xff1a;常见搜索结构2、常规使用数据结构缺陷问题3、B树概念4、存放数量分析 二、代码实现逻辑1、结点定义和基本框架2、Find查找函数&#xff08;1&#xff09;思想&#xff08;2&#xff09;代码实现 3、InsertKey插入关键字函数--InsertK…

采用伪代码及C代码演示如何解决脱机最小值问题

采用伪代码及C代码演示如何解决脱机最小值问题 问题背景算法设计伪代码实现C代码实现证明数组正确性使用不相交集合数据结构最坏情况运行时间的紧确界 问题背景 脱机最小值问题涉及到一个动态集合 &#xff08; T &#xff09; &#xff08;T&#xff09; &#xff08;T&…

国内大模型价格战全面爆发:新旧势力逐鹿江湖【附主流模型价格对比】

近年来&#xff0c;随着人工智能技术的不断发展&#xff0c;大模型逐渐成为行业的焦点。然而&#xff0c;伴随而来的却是一场价格战。DeepSeek率先推出超低价服务&#xff0c;随后字节跳动、阿里巴巴、百度、科大讯飞、腾讯等巨头纷纷跟进&#xff0c;使得这一领域的竞争愈演愈…

echarts-树图、关系图、桑基图、日历图

树图 树图主要用来表达关系结构。 树图的端点也收symbol的调节 树图的特有属性&#xff1a; 树图的方向&#xff1a; layout、orient子节点收起展开&#xff1a;initialTreeDepth、expandAndCollapse叶子节点设置&#xff1a; leaves操作设置&#xff1a;roam线条&#xff1a…

Mysql触发器优化大数据表

背景 数据库的订单数量过多&#xff0c;需要分出热表用于快速查询&#xff0c;热表仅保存10天的订单数据。 解决思路 每次数据库订单表触发增删改时&#xff0c;同步操作到trigger_order_mul_info表&#xff0c;然后trigger_order_mul_info会定期删除超过10天的数据。 增删…

【编译原理复习笔记】正则表达式与自动机

正则表达式 正则表达式是一种用来描述正则语言的更紧凑的表达方法 e.g. r a ( a ∣ b ) ∗ ( ϵ ∣ ( . ∣ ) ( a ∣ b ) ) ra(a|b)^*(\epsilon|(.|\\_ )(a|b)) ra(a∣b)∗(ϵ∣(.∣)​(a∣b)) 正则表达式可以由较小的正则表达式按照特定的规则递归地构建。每个正则表达式定义…

【笔记】软件架构师要点记录(1)

【笔记】软件架构师要点记录 20240517 20240517 连续性&#xff1a;恢复能力&#xff1b;可用性&#xff1a;保持稳定态的时长 增量开发模式&#xff1a;在增量开发中&#xff0c;每个增量都有明确的范围和功能&#xff0c;并按照特定的功能顺序完成。增量之间的范围划分在开发…

防火墙技术基础篇:基于IP地址的转发策略

防火墙技术基础篇&#xff1a;基于IP地址的转发策略的应用场景及实现 什么是基于IP地址的转发策略&#xff1f; 基于IP地址的转发策略是一种网络管理方法&#xff0c;它允许根据目标IP地址来选择数据包的转发路径。这种策略比传统的基于目的地地址的路由更灵活&#xff0c;因…

图片转excel技术在医疗领域的应用探讨

在医疗行业中&#xff0c;图片转Excel技术的应用已经逐渐普及&#xff0c;为医护人员提供了极大的便利。这种技术利用OCR&#xff08;光学字符识别&#xff09;和机器学习的先进算法&#xff0c;将图片中的信息自动转化为Excel表格&#xff0c;大大提高了数据处理和分析的效率。…

智能锁千千万,谁是你的NO.1,亲身实测凯迪仕传奇大师K70旗舰新品

智能锁千千万&#xff0c;谁是你的NO.1。欢迎来到智哪儿评测室&#xff0c;这次我们为大家带来了凯迪仕传奇大师K70系列的一款重磅新品。 在科技的浪潮中&#xff0c;家居安全领域正经历着前所未有的变革。智能锁越来越成为家的安全守护神&#xff0c;以及智能生活的得力助手。…

Monodle centerNet3D 瑞芯微RKNN、地平线Horizon芯片部署、TensorRT部署

一直想做一点3D目标检测&#xff0c;先来一篇单目3D目标检测Monodle&#xff08;基于centernet的&#xff09;&#xff0c;训练代码参考官方【代码】&#xff0c;这里只讲讲如何部署。 模型和完整仿真测试代码&#xff0c;放在github上参考链接【模型和完整代码】。 1 模型训练…

Creating Server TCP listening socket *:6379: listen: Unknown error

错误&#xff1a; 解决方法&#xff1a; 在redis安装路径中打开cmd命令行窗口&#xff0c;输入 E:\Redis-x64-3.2.100>redis-server ./redis.windows.conf结果&#xff1a;