Linux--epoll(ET)实现Reactor模式

news2024/10/23 14:30:14

Linux–多路转接之epoll

Reactor反应堆模式

Reactor反应堆模式是一种事件驱动的设计模式,通常用于处理高并发的I/O操作,尤其是在服务器或网络编程中。

基本概念

Reactor模式又称之为响应器模式,基于事件多路复用机制,使得单个线程能够同时管理大量并发连接,而不需要为每个连接创建一个独立的线程。它通过一个事件分发器(Reactor)来监听和管理不同的I/O事件,当事件发生时,分发器会将该事件分发给对应的事件处理器来处理。

核心组件

  • 事件分发器(Reactor):负责监听各种事件源(如socket、文件描述符)并将事件分发给相应的处理器。事件分发器通常使用I/O多路复用机制(如select、poll、epoll)来同时监听多个I/O事件。
  • 事件处理器(Event Handler):定义了如何处理特定事件。当事件分发器检测到某个事件时,就会触发相应的事件处理器中的回调函数。
  • 同步事件分离器(Demultiplexer):本质上是系统调用,用于监听事件源上的事件,并将事件通知给事件分发器。例如,在Linux中,可以使用select、poll或epoll等系统调用来实现同步事件分离器。

工作流程

  • 注册事件:事件分发器注册需要监听的I/O事件(如连接、读写),并关联相应的事件处理器。
  • 进入循环:事件分发器进入循环,使用I/O多路复用机制来监听注册的I/O事件。
  • 分发事件:一旦某个I/O事件发生,事件分发器会将该事件分发给对应的事件处理器。
  • 处理事件:事件处理器执行预定义的操作来处理该事件。处理完成后,可能会重新注册事件或关闭连接。
    在这里插入图片描述

epoll服务器(ET)

服务器监听一个指定的端口,当有新的连接请求到来时,服务器接受连接并将其注册到Reactor中,以便处理后续的数据读写事件。

Socket.hpp

包含了一个抽象基类 Socket 和一个继承自 Socket 的具体实现类 TcpSocket。提供一个面向对象的网络套接字编程接口,允许用户通过继承和实现基类中的纯虚函数来创建不同类型的套接字(例如 TCP 套接字)。

#include <iostream>
#include <string>
#include <functional>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <pthread.h>
#include <sys/types.h>
#include <memory>
#include "InetAddr.hpp"
#include "Log.hpp"
#include "Comm.hpp"
namespace socket_ns
{
    class Socket;
    const static int gbacklog=8;//默认最大连接数
    using socket_sptr=std::shared_ptr<Socket>;//套接字指针

    enum
    {
        SOCKET_ERROR = 1,
        BIND_ERROR,
        LISTEN_ERROR,
        USAGE_ERROR
    };

    //在基类创建一系列虚函数,只要派生类能用到就在这里创建
    class Socket
    {
    public:
        virtual void CreateSocketOrDie() =0; //创建套接字
        virtual void BindSocketOrDie(InetAddr& addr) =0;  //绑定套接字
        virtual void ListenSocketOrDie()=0; //监听套接字
        virtual int Accepter(InetAddr* addr,int* code) =0; //接受客户端
        virtual bool Connector(InetAddr &addr) = 0; //连接客户端
        virtual int SockFd() = 0; //获取Sockfd
        virtual int Recv(std::string *out) = 0; //接收对方信息
        virtual int Send(const std::string &in) = 0; //发送给对方信息
        virtual void Close()=0; //关闭对应文件
    public:
        //创建监听套接字,将一系列操作细分化,直接引用对应函数直接创建
        void BuildListenSocket(InetAddr& addr)
        {
            CreateSocketOrDie();
            BindSocketOrDie(addr);
            ListenSocketOrDie();
        }
        bool BuildClientSocket(InetAddr &addr)
        {
            CreateSocketOrDie();
            return Connector(addr);
        }
    };

    class TcpSocket : public Socket
    {
    public:
        TcpSocket(int sockfd=-1)
        :_sockfd(sockfd)
        {}
        void CreateSocketOrDie() override  //override明确的重写基类函数
        {
            _sockfd=socket(AF_INET,SOCK_STREAM,0);
            if(_sockfd<0)
            {
                LOG(FATAL, "socket error");
                exit(SOCKET_ERROR);
            }
            SetNonBlock(_sockfd);
            LOG(DEBUG, "socket create success, sockfd is : %d\n", _sockfd);
        }
        void BindSocketOrDie(InetAddr& addr) override
        {
            struct sockaddr_in local;
            memset(&local, 0, sizeof(local));
            local.sin_family = AF_INET;
            local.sin_port = htons(addr.Port());
            local.sin_addr.s_addr = inet_addr(addr.Ip().c_str());

            int n=bind(_sockfd,(struct sockaddr*)&local,sizeof(local));
            if (n < 0)
            {
                LOG(FATAL, "bind error");
                exit(BIND_ERROR);
            }
            LOG(DEBUG, "bind success, sockfd is : %d\n", _sockfd);
        }
        void ListenSocketOrDie() override
        {
            int n=listen(_sockfd,gbacklog);
            if (n < 0)
            {
                LOG(FATAL, "listen error");
                exit(LISTEN_ERROR);
            }
            LOG(DEBUG, "listen success, sockfd is : %d\n", _sockfd);
        }
        int Accepter(InetAddr* addr,int* code) override
        {
            struct sockaddr_in peer;
            socklen_t len=sizeof(peer);
            int sockfd = accept(_sockfd,(struct sockaddr*)&peer,&len);
            *code=errno;
            if (sockfd < 0)
            {
                LOG(WARNING, "accept error\n");
                return -1;
            }
            *addr=peer;
            SetNonBlock(sockfd);
            //socket_sptr sock=std::make_shared<TcpSocket>(sockfd);
            return sockfd;
        }
        virtual bool Connector(InetAddr& addr)
        {
            struct sockaddr_in server;
            memset(&server,0,sizeof(server));
            server.sin_family=AF_INET;
            server.sin_addr.s_addr=inet_addr(addr.Ip().c_str());
            server.sin_port=htons(addr.Port());

            int n=connect(_sockfd,(struct sockaddr*)&server,sizeof(server));
            if (n < 0)
            {
                std::cerr << "connect error" << std::endl;
                return false;
            }
            return true;
        }
        int Recv(std::string *out) override
        {
            char inbuffer[1024];
            ssize_t n = recv(_sockfd,inbuffer,sizeof(inbuffer)-1,0);
            if (n > 0)
            {
                inbuffer[n] = 0;
                *out += inbuffer; // 接收次数可能不只一次,一般是多次的,
                
            }
            return n;
        }
        int Send(const std::string &in) override
        {
            int n = send(_sockfd,in.c_str(),in.size(),0);
            return n;
        }
        int SockFd() override
        {
            return _sockfd;
        }
        void Close() override
        {
            if (_sockfd > -1)
                ::close(_sockfd);
        }
        ~TcpSocket()
        {}
    private:
        int _sockfd;
    };
}


代码和之前不一样的地方是实现了非阻塞套接字的设置
在这里插入图片描述

Calculate.hpp

用于执行基本的算术运算

#pragma once
#include <iostream>
#include "ProToCol.hpp"

using namespace protocol_ns;

class Calculate
{
public:
    Calculate()
    {
    }

    //根据输入的请求通过实际计算转换为结果
    Response Excute(const Request &req)
    {
        Response resp(0, 0);

        switch (req._oper)
        {
        case '+':
            resp._result = req._x + req._y;
            break;
        case '-':
            resp._result = req._x - req._y;
            break;
        case '*':
            resp._result = req._x * req._y;
            break;
        case '/':
        {
            if (req._y == 0)
            {
                resp._code = 1;
            }
            else
            {
                resp._result = req._x / req._y;
            }
        }
        break;
        case '%':
        {
            if (req._y == 0)
            {
                resp._code = 2;
            }
            else
            {
                resp._result = req._x % req._y;
            }
        }
        break;
        default:
            resp._code = 3;
            break;
        }
        return resp;
    }

     ~Calculate()
    {
    }

private:
};

protocol.hpp

用于处理网络通信中数据序列化和反序列化、编码和解码以及请求和响应对象生成的类和函数.

#pragma once 
#include <iostream>
#include <string>
#include<unistd.h>
#include<memory>
#include<jsoncpp/json/json.h>

namespace protocol_ns
{
    // 协议的样子:
    // 报文 = 报头+有效载荷
    // "有效载荷的长度"\r\n"有效载荷"\r\n
    const std::string SEP= "\r\n";

    // 解决TCP的粘报问题,TCP 读取不全的问题
    std::string Encode(const std::string &json_str)
    {
        int json_str_len = json_str.size(); //有效载荷的长度
        std::string proto_str = std::to_string(json_str_len); //转为string
        proto_str += SEP; //+ 分隔符
        proto_str += json_str;// + 数据字符串
        proto_str += SEP;// + 分隔符
        return proto_str; //返回一个报文
    }

    //将报文分析出数据字符串出来
    std::string Decode(std::string &inbuffer)
    {
        auto pos = inbuffer.find(SEP); //找到分隔符的位置
        if (pos == std::string::npos)
            return std::string();
        
        std::string len_str = inbuffer.substr(0, pos);//前头的有效数据长度的字符串
        if (len_str.empty())
            return std::string();
        int packlen = std::stoi(len_str);//记录数据字符串的实际长度(传递时的差错主要出在这里)

        int total = packlen + len_str.size() + 2 * SEP.size(); //报文总长度
        if (inbuffer.size() < total)
            return std::string();
        
        std::string package = inbuffer.substr(pos + SEP.size(), packlen); //取出数据字符串
        inbuffer.erase(0, total); //删除掉原先的报文
        return package;
    }
    //请求将我们的数据序列化和反序列化(客户端)    
    class Request
    {
    public:
        Request()
        {
        }
        Request(int x, int y, char oper) : _x(x), _y(y), _oper(oper)
        {
        }   
        //序列化:将结构体数据转换为字符串
        bool Serialize(std::string* out)
        {
            Json::Value root; //Json::Value: Json格式的值
            root["x"] = _x;
            root["y"] = _y;
            root["oper"] = _oper;

            Json::FastWriter writer;
            *out=writer.write(root); //将Json值转换为字符串
            return true;
        }
        
        //反序列化:将字符串转换为结构体数据
        bool DeSerialize(const std::string& in)
        {
            Json::Value root;
            Json::Reader reader;//解析字符串
            bool res=reader.parse(in,root);//将字符串转为Json值,存放于root中
            if (!res)
                return false;

            //再将Json值转为结构体数据
            _x = root["x"].asInt();
            _y = root["y"].asInt();
            _oper = root["oper"].asInt();
            return true;
            
        }
    public:
        int _x;
        int _y;
        char _oper; //操作符 _x 加减乘除 _y
    };

    //将结果序列化和反序列化(服务端)
    class Response
    {
    public:
        Response()
        {
        }
        Response(int result, int code) : _result(result), _code(code)
        {
        }
        bool Serialize(std::string *out)
        {
            // 转换成为字符串
            Json::Value root;
            root["result"] = _result;
            root["code"] = _code;

            Json::FastWriter writer;
            // Json::StyledWriter writer;
            *out = writer.write(root);
            return true;
        }
        bool Deserialize(const std::string &in)
        {
            Json::Value root;
            Json::Reader reader;
            bool res = reader.parse(in, root);
            if (!res)
                return false;

            _result = root["result"].asInt();
            _code = root["code"].asInt();
            return true;
        }
    public:
        int _result; // 结果
        int _code;   // 0:success 1: 除0 2: 非法操作 3. 4. 5
    };

    //创建需求
    class Factory
    {
    public:
      
        Factory()
        {
            srand(time(nullptr) ^ getpid());
            opers = "+-*/%^&|";                                                                                                                 
        }

        std::shared_ptr<Request> BuildRequest()
        {
            int x = rand() % 10 + 1;
            usleep(x * 10);
            int y = rand() % 5; // [0,1,2,3,4]
            usleep(y * x * 5);
            char oper = opers[rand() % opers.size()];
            std::shared_ptr<Request> req= std::make_shared<Request>(x,y,oper);
            return req;
        }
        std::shared_ptr<Response> BuildResponse()
        {
            return std::make_shared<Response>();
        }
        ~Factory()
        {
        }
        
    private:
        std::string opers;
    };
}
  • SEP:定义了报文分隔符为 “\r\n”。
  • Encode:接受一个 JSON 字符串作为有效载荷,将其长度、分隔符和有效载荷拼接成一个完整的报文字符串返回。
  • Decode:从输入缓冲区 inbuffer 中解析出一个报文,返回有效载荷字符串,并从 inbuffer 中删除已解析的报文。

Request 类:

  • 表示一个计算请求,包含两个整数 _x 和 _y 作为操作数,以及一个字符 _oper 作为运算符。
  • Serialize:将 Request 对象序列化为 JSON 格式的字符串。
  • DeSerialize:将 JSON 格式的字符串反序列化为 Request 对象。

Response 类:

  • 表示一个计算响应,包含一个整数 _result 作为运算结果,以及一个整数 _code 作为状态码。
  • Serialize:将 Response 对象序列化为 JSON 格式的字符串。
  • Deserialize:将 JSON 格式的字符串反序列化为 Response 对象。

Factory 类:用于生成 Request 和 Response 对象的工厂类。
在构造函数中初始化了一个包含所有可能运算符的字符串 opers,并使用当前时间和进程 ID 作为随机数种子。

  • BuildRequest:生成一个随机的 Request 对象,其中包括随机的操作数和运算符。
  • BuildResponse:生成一个默认的 Response 对象。目前,这个实现只是简单地返回了一个新创建的 Response 对象,没有设置任何特定的值。

PackageParse.hpp

负责解析从连接(Connection 对象)中接收到的报文,处理这些报文,并将响应发送回客户端

#pragma once

#include <iostream>
#include "Connection.hpp"
#include "ProToCol.hpp"
#include "CalCulate.hpp"

using namespace protocol_ns;
//对报文进行解析
class PackageParse
{
public:
    static void Parse(Connection *conn)
    {
        // std::cout << "inbuffer: " << conn->Inbuffer() << std::endl;
        // 2. 分析数据,确认完整报文
        std::string package;
        Request req;
        Calculate cal;
        while (true)
        {
            // std::cout << conn->Inbuffer() << std::endl;
            // conn->AppendOutBuffer(conn->Inbuffer());
            // break;
            package = Decode(conn->Inbuffer());//取出缓冲区的报文
            if (package.empty())
                break;
            std::cout << "------------------------begin---------------" << std::endl;
            std::cout << "resq string:\n"<< package << std::endl;
                      
            // 3.反序列化
            req.DeSerialize(package);
            // 4. 业务处理
            Response resp = cal.Excute(req);

            // 5. 对应答做序列化
            std::string send_str;
            resp.Serialize(&send_str);

            std::cout << "resp Serialize:" << std::endl;
            std::cout << send_str << std::endl;
            // 6. 添加长度报头
            send_str = Encode(send_str);
            std::cout << "resp Encode:" << std::endl;
            std::cout << send_str << std::endl;
            //将报文放到发送缓冲区中
            conn->AppendOutBuffer(send_str);
        }
        //将缓冲区内容取出,发送到客户端       
        if(!conn->OutbufferEmpty()&& conn->_sender!=nullptr)
        {
            conn->_sender(conn);
            conn->_R->EnableReadWrite(conn->Sockfd(), true, true);
        }
    }
};

Comm.hpp

#pragma once

#include <iostream>
#include <unistd.h>
#include <fcntl.h>

//错误原因
enum
{
    SOCKET_ERROR = 1,
    BIND_ERROR,
    LISTEN_ERROR,
    USAGE_ERROR,
    EPOLL_CREATE_ERROR,
};

//设置为非阻塞的
void SetNonBlock(int fd)
{
    int fl = ::fcntl(fd, F_GETFL);//获取之前的信息
    if(fl < 0) 
    {
        return;
    }
    fcntl(fd, F_SETFL, fl | O_NONBLOCK);//转换为非阻塞的
}

Connection.hpp

一个网络连接,用于在客户端和服务器之间传输数据;
Connection 类与 Reactor 类一起工作,实现了事件驱动的网络编程模型。
在这个模型中,Reactor 负责监听和处理各种网络事件(如连接、读取、写入等),而 Connection 对象则作为这些事件的处理者。
在这里插入图片描述

#pragma once

#include <iostream>
#include <string>
#include <functional>
#include"Reactor.hpp"
#include "InetAddr.hpp"
#include <unistd.h>

class Connection;
class Reactor;
using func_t = std::function<void(Connection *)>; //定义出有关连接函数的指针

//连接客户端与服务端
//而客户端发出的请求称为事件,服务端称为反应堆,会将事件统收,将处理好的事件派发出去
//连接起到事件与反应堆之间的桥梁
class Connection
{
public:
    Connection(int sock) : _sock(sock), _R(nullptr)
    {
    }
    int Sockfd()
    {
        return _sock;
    }
    //设置有关事件(初始化)
    void SetEvents(int events)
    {
        _events = events;
    }
    uint32_t Events()
    {
        return _events;
    }
    //初始化对应函数指针,调用时触发函数
    void Register(func_t recver, func_t sender, func_t excepter)
    {
        _recver = recver;
        _sender = sender;
        _excepter = excepter;
    }
    //初始化反应堆
    void SetSelf(Reactor *R)
    {
        _R = R;
    }
    //将数据放入输入缓冲区
    void AppendInBuffer(const std::string &buff)
    {
        _inbuffer += buff;
    }
    //将数据拿出
    std::string& Inbuffer() 
    {
        return _inbuffer;
    }
    //将数据放到输出缓冲区
    void AppendOutBuffer(const std::string &buff)
    {
        _outbuffer += buff;
    }
    //将数据拿出
    std::string &Outbuffer()
    {
        return _outbuffer;
    }
    //判断输出缓冲区是不是为空
    bool OutbufferEmpty()
    {
        return _outbuffer.empty();
    }
    //将输出缓冲区数据拿出多少
    void OutbufferRemove(int n)
    {
        _outbuffer.erase(0, n);
    }
    void Close()
    {
        if(_sock>=0)
            ::close(_sock);
    }
    ~Connection()
    {
    }

    func_t _recver; //接收者(调用触发接收函数)
    func_t _sender; //发送者(调用触发发送函数)
    func_t _excepter; //其他,处理错误
    Reactor *_R; //反应堆指针(服务器)
private:
    int _sock; //sockfd
    std::string _inbuffer;//输入缓冲区 
    std::string _outbuffer;//输出缓冲区 

    InetAddr _addr;//网络地址

    uint32_t _events; // Connection对象中,_sock关心的事件集合
};


  • 构造函数:接收一个套接字描述符(sock),并将其存储在私有成员 _sock 中。同时,将 _R(指向 Reactor 的指针)初始化为 nullptr。
  • Sockfd 方法:返回与这个连接关联的套接字描述符。
  • SetEventsEvents 方法:允许设置和查询这个连接关心的事件集合(如可读、可写等)。这些事件用于通知 Reactor 何时应该对这个连接进行操作。
  • Register 方法:允许为这个连接注册三个回调函数:_recver(接收数据时调用)、_sender(发送数据时调用)和 _excepter(处理错误时调用)。这些回调函数是 std::function<void(Connection *)> 类型的,意味着它们可以接受一个指向 Connection 对象的指针作为参数。
  • SetSelf 方法:允许设置这个连接所属的 Reactor 对象(通过 _R 指针)。
  • AppendInBufferInbuffer 方法:用于管理输入缓冲区。AppendInBuffer 方法将接收到的数据添加到输入缓冲区中,而 Inbuffer 方法则返回输入缓冲区的引用。

HandlerConnection.hpp

对具体处理函数的实现:

#pragma once

#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <functional>
#include "Log.hpp"
#include "Connection.hpp"


class HandlerConnection
{
public:
    HandlerConnection(func_t func):_func(func)
    {}
    //处理接收
    void HanlderRecv(Connection *conn)
    {
        LOG(DEBUG, "HanlderRecv fd : %d\n", conn->Sockfd());
        while (true)
        {
            errno = 0; //表示当前没有错误
            char buffer[1024];//接收存储区域
            ssize_t n = ::recv(conn->Sockfd(), buffer, sizeof(buffer) - 1, 0);//接收函数
            if(n > 0)
            {
                buffer[n] = 0;
                conn->AppendInBuffer(buffer);//将数据放入缓冲区中
            }
            else
            {
                if(errno == EWOULDBLOCK || errno == EAGAIN)//将报文接收完了(在非阻塞操作中)
                {
                    break;
                }
                else if(errno == EINTR)//事件操作时被中断了
                {
                    continue;
                }
                else//出现错误
                {
                    conn->_excepter(conn);//其他处理
                    return; // 一定要提前返回
                }
            }
        }
        _func(conn);//调用函数 ,处理解析
    }
    //处理发送
    void HanlderSend(Connection *conn)
    {
        errno = 0;
        while(true)
        {
            ssize_t n = ::send(conn->Sockfd(), conn->Outbuffer().c_str(), conn->Outbuffer().size(), 0);//发送到客户端
            if(n > 0)
            {
                // n 实际发送了多少
                conn->OutbufferRemove(n);//发完的在缓冲区去掉
                if(conn->OutbufferEmpty()) break;
            }
            else if(n == 0)//没有发送数据了
            {
                break;
            }
            else
            {
                if(errno == EWOULDBLOCK || errno == EAGAIN)//缓冲区读取完毕
                {
                    break; 
                }
                else if(errno == EINTR)//事件中断
                {
                    continue;
                }
                else//出现错误
                {
                    conn->_excepter(conn);
                    return;
                }
            }
        }

        //发送缓冲区不为空时
        if(!conn->OutbufferEmpty())
        {
            conn->_R->EnableReadWrite(conn->Sockfd(), true, true); //可读可写
        }
        else//发送缓冲区为空时,不可写出
        {
            conn->_R->EnableReadWrite(conn->Sockfd(), true, false);
        }
    }
    //处理其他
    void HanlderExcpet(Connection *conn)
    {
        errno = 0;
        LOG(DEBUG, "client quit : %d\n",conn->Sockfd());
        conn->_R->RemoveConnection(conn->Sockfd());//断开连接
    }
private:
    func_t _func;//函数指针
};

Epoller.hpp

封装了 Linux 中 epoll 接口的使用,用于高效地管理大量并发网络连接或文件描述符的事件通知。

#pragma once

#include <iostream>
#include <sys/epoll.h>
#include <unistd.h>
#include "Log.hpp"
#include "Comm.hpp"

static const int gsize=128;

class Epoller
{
private:
    bool EventMethodCore(int fd,u_int32_t events,int type)
    {
        struct epoll_event ev;
        ev.events = events;
        ev.data.fd = fd;
        int n = ::epoll_ctl(_epfd, type, fd, &ev);
        if(n < 0)
        {
            LOG(ERROR, "epoll_ctl error!\n");
            return false;
        }
        LOG(DEBUG, "epoll_ctl add %d success!\n", fd); // TODO
        return true;
    }
public:
    //初始化,创建epoll
    Epoller()
    {
        _epfd = ::epoll_create(gsize);
        if (_epfd < 0)
        {
            LOG(FATAL, "epoll create error!\n");
            exit(EPOLL_CREATE_ERROR);
        }
        LOG(FATAL, "epoll create success, epfd: %d\n", _epfd);
    }
    //将事件添加到epoll中
    bool AddEvent(int fd, uint32_t events)
    {
        return EventMethodCore(fd,events,EPOLL_CTL_ADD);
    }
    //将事件进行修改
    bool ModEvent(int fd, uint32_t events)
    {
        return EventMethodCore(fd, events, EPOLL_CTL_MOD);
    }
    //对事件进行删除
    bool DelEvent(int fd)
    {
        return ::epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
    }
    //epoll等待事件的发生
    int Wait(struct epoll_event revs[], int num, int timeout)
    {
        int n = ::epoll_wait(_epfd, revs, num, timeout);
        return n;
    }
    ~Epoller()
    {
        if(_epfd >= 0)//析构需要释放掉epoll的fd
            ::close(_epfd);
    }
private:
    int _epfd;//epoll的fd
};

EventMethodCore:这是一个辅助方法,用于向 epoll 实例中添加、修改或删除事件。

Listener.hpp

Listener 的类: 用于在指定端口上监听并接受新的连接请求的

#pragma once

#include <iostream>
#include <memory>
#include "Connection.hpp"
#include "Epoller.hpp"
#include "Socket.hpp"
#include "InetAddr.hpp"
#include "HandlerConnection.hpp"

using namespace socket_ns;

class Listener
{
public:
    Listener(int port, HandlerConnection &hc)
        : _port(port),
          _listensock(std::make_unique<TcpSocket>()),
          _hc(hc)
    {
        InetAddr addr("0", _port);
        _listensock->BuildListenSocket(addr);//创建监听fd
    }
    //接收新的连接
    void Accepter(Connection *conn) 
    {
        while (true)
        {
            InetAddr clientaddr;
            int code = 0;
            int sockfd = _listensock->Accepter(&clientaddr, &code);//接收新连接
            cout<<sockfd<<endl;
            if (sockfd >= 0)
            {
                // TODO
                //对反应堆增加新连接
                conn->_R->AddConnection(
                    sockfd,
                    EPOLLIN | EPOLLET,
                    std::bind(&HandlerConnection::HanlderRecv, &_hc, std::placeholders::_1),
                    std::bind(&HandlerConnection::HanlderSend, &_hc, std::placeholders::_1),
                    std::bind(&HandlerConnection::HanlderExcpet, &_hc, std::placeholders::_1));
                    
            }
            else//出现错误
            {
                //cout<<123<<endl;
                if (code == EWOULDBLOCK || code == EAGAIN)//表示接收了所有连接
                {
                    LOG(DEBUG, "accepter all link!\n");
                    break;
                }
                else if (code == EINTR)//事件中断
                {
                    LOG(DEBUG, "accepter interupt by signal!\n");
                    continue;
                }
                else//出现错误
                {
                    LOG(WARNING, "accept error!\n");
                    break;
                }
            }
        }
    }
    int Sockfd()
    {
        return _listensock->SockFd();
    }
    ~Listener()
    {
        _listensock->Close();
    }
private:
    uint16_t _port; //端口号
    std::unique_ptr<Socket> _listensock;//监听fd
    HandlerConnection &_hc; //连接处理事件
};

Accepter:

将这个新的连接添加到 Epoller 中,注册相应的读、写异常事件处理函数。这些处理函数是通过 std::bind 绑定到 HandlerConnection 的成员函数上的。

如果接收过程中出现错误,根据错误码 code 的不同,采取不同的处理方式:

  • EWOULDBLOCKEAGAIN:表示所有可用的连接都已被接受,此时跳出循环。
  • EINTR:表示操作被信号中断,继续尝试接受连接。
  • 其他错误码:记录警告信息,并跳出循环。

Reactor.hpp(重点)

一个使用epoll作为底层事件通知机制的网络服务器框架的核心部分。这个类管理着网络连接,并对这些连接上的事件进行监听和处理。
在这里插入图片描述

#pragma once

#include <iostream>
#include <unordered_map>
#include "Epoller.hpp"
#include "Connection.hpp"



//反应堆:本质是服务端,对connetion做了管理工作
class Reactor
{
    const static  int gnum=64;
public:
    Reactor() : _isrunning(false)
    {
    }
    //添加连接到反应堆上
    void AddConnection(int fd, uint32_t events, func_t recver, func_t sender, func_t excepter)
    {
        // 1. 构建Connection
        Connection *conn = new Connection(fd);
        conn->SetEvents(events);//初始化事件
        conn->Register(recver, sender, excepter);//注册方法
        conn->SetSelf(this);

        // 2. 对epoll添加新事件
        _epller.AddEvent(conn->Sockfd(), conn->Events());

        // 3. 向_connections添加connection对象
        _connections.insert(std::make_pair(conn->Sockfd(), conn));
    }
    //判断是否有该连接
    bool ConnectionIsExists(int sockfd)
    {
        auto iter = _connections.find(sockfd);
        return iter != _connections.end();
    }
    //读写驱动的更改
    void EnableReadWrite(int sockfd, bool readable, bool writeable)
    {
        uint32_t events = (readable?EPOLLIN:0) | (writeable ? EPOLLOUT : 0) | EPOLLET;
        if(ConnectionIsExists(sockfd))
        {
            // 1. 修改我们写的connection关心的事件
            _connections[sockfd]->SetEvents(events);

            // 2. 写透到内核中
            _epller.ModEvent(sockfd, events);
        }
    }
    //移除连接
    void RemoveConnection(int sockfd)
    {
        if(!ConnectionIsExists(sockfd)) return;
        //去掉epoll中对事件的关心
        _epller.DelEvent(sockfd);
        //服务器关闭sockfd
        _connections[sockfd]->Close();
        //释放connections
        delete _connections[sockfd];
        _connections.erase(sockfd);


    }
    //单次循环处理事务
    void LoopOnce(int timeout)
    { 
        int n = _epller.Wait(revs, gnum, timeout);//等待事件的发生
        for (int i = 0; i < n; i++)
        {   //处理发生的事件
            int sockfd = revs[i].data.fd;
            uint32_t revents = revs[i].events;

            if (revents & EPOLLHUP) //文件符被挂断时
                revents |= (EPOLLIN | EPOLLOUT);
            if (revents & EPOLLERR) //文件符出现错误时
                revents |= (EPOLLIN | EPOLLOUT);
            if (revents & EPOLLIN) //可读时
            {
                //判断连接是否存在并且
                if (ConnectionIsExists(sockfd) && (_connections[sockfd]->_recver != nullptr))
                {
                    _connections[sockfd]->_recver(_connections[sockfd]);//调用处理接收函数
                }
            }
            if (revents & EPOLLOUT) //可写时
            {
                if (ConnectionIsExists(sockfd) && (_connections[sockfd]->_sender != nullptr))
                {
                    _connections[sockfd]->_sender(_connections[sockfd]);
                }
            }
        }
    }
    // 事件派发核心函数
    void Dispatcher()
    {
        _isrunning = true;
        //int timeout = -1;//表示阻塞等待事件的发生
        int timeout = 3000;//3s为周期等待事件的发生
        while (_isrunning)
        {
            LoopOnce(timeout);
            // 处理其他事情
            Debug();
        }
        _isrunning = false;
    }
    void Debug()
    {
        std::cout << "------------------------------------" << std::endl;
        for(auto &connection : _connections)
        {
            std::cout << "fd : " << connection.second->Sockfd() << ", ";
            uint32_t events = connection.second->Events();
            if((events & EPOLLIN) && (events & EPOLLET))
                std::cout << "EPOLLIN | EPOLLET, ";
            if((events & EPOLLOUT) && (events & EPOLLET))
                std::cout << "EPOLLOUT | EPOLLET";
            std::cout << std::endl;
        }
        std::cout << "------------------------------------" << std::endl;
    }
    ~Reactor() {}
private:
    std::unordered_map<int, Connection *> _connections; // int : sockfd
    struct epoll_event revs[gnum]; //事件信息的数组
    Epoller _epller;//一个epoll
    bool _isrunning;//是否运行
};

成员变量

  • _connections: 一个unordered_map,用于存储与每个文件描述符(sockfd)相关联的Connection对象。
  • revs: 一个epoll_event数组,用于从epoll实例中接收事件。
  • _epller: 一个Epoller对象,负责与epoll接口进行交互。
  • _isrunning: 一个布尔值,表示Reactor是否正在运行。

成员函数

  • AddConnection: 添加一个新的连接到Reactor中,包括构建Connection对象、设置事件、注册处理函数,并将连接添加到_connections映射中。同时,通过_epller对象将连接的文件描述符添加到epoll的监听列表中。
  • ConnectionIsExists: 检查给定的文件描述符是否存在于_connections映射中。
  • EnableReadWrite: 修改指定连接关心的事件(读或写),并更新epoll中的事件监听。
  • RemoveConnection: 从Reactor中移除一个连接,包括从epoll中删除事件监听、关闭文件描述符、删除Connection对象,并从_connections映射中移除。
  • LoopOnce: 等待并处理一次epoll事件循环中的事件。这包括读取事件、检查文件描述符的状态(如挂断或错误),并调用相应的处理函数(接收或发送)。
  • Dispatcher: Reactor的主循环函数,不断调用LoopOnce来处理事件,直到_isrunning变为false。
  • Debug: 打印当前Reactor中所有连接的状态和它们关心的事件。

Main.cc

这是所写头文件的逻辑思路
在这里插入图片描述

#include <iostream>
#include <memory>
#include "Reactor.hpp"
#include "Connection.hpp"
#include "Listener.hpp"
#include "PackageParse.hpp"
#include "HandlerConnection.hpp"
#include "Log.hpp"


int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        std::cout << "Usage: " << argv[0] << " port" << std::endl;
        return 0;
    }
    uint16_t port = std::stoi(argv[1]);
    EnableScreen();

    std::unique_ptr<Reactor> react = std::make_unique<Reactor>(); // 主服务
    HandlerConnection hc(PackageParse::Parse); //处理连接相关函数的对象

    Listener listener(port,hc);//监听注册
    //反应堆添加监听连接
    react->AddConnection(
        listener.Sockfd(),
        EPOLLIN| EPOLLET,
        std::bind(&Listener::Accepter,&listener,std::placeholders::_1),
        nullptr,
        nullptr
    );

    react->Dispatcher();//事件派发
}
  • 创建一个Reactor类的智能指针实例,这是主服务组件。
  • 创建一个HandlerConnection对象hc,它使用PackageParse::Parse函数来处理数据包的解析。
  • 创建一个Listener对象listener,负责监听指定端口上的连接请求,并将新的连接请求通过hc(连接处理器)处理。
  • 通过react->AddConnection()方法,将监听套接字(listener.Sockfd())注册到Reactor中,设置监听事件为读事件(EPOLLIN)和边缘触发模式(EPOLLET),并绑定Listener::Accepter方法作为事件处理函数。这里使用了std::bind来绑定Listener对象的Accepter成员函数。
  • 调用react->Dispatcher()开始事件分发循环,这是Reactor模式的核心,它不断监听事件并调用相应的事件处理函数。

结果

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2221646.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

网络与信息安全工程师最新报考介绍(工信部教育与考试中心)

文章目录 前言 网络与信息安全工程师职业介绍主要的工作内容职业技能要求网络与信息安全工程师职业前景怎么样网络与信息安全工程师工作方向网络与信息安全工程师适学人群 如何入门学习网络安全 【----帮助网安学习&#xff0c;以下所有学习资料文末免费领取&#xff01;----】…

solidworks(sw)右侧资源栏变成英文,无法点击

sw右侧资源栏变成英文&#xff0c;无法点击&#xff0c;如图 使用xxclean 的扩展功能 SW右侧栏是英文 toolbox配置无效 这个按钮 修复完成之后重新打开软件查看是否变成中文。

[linux]快速入门

学习目标 通过学习能够掌握以下的linux操作 操作系统 按照应用领域的不同, 操作系统可以分为几类 桌面操作系统服务器操作系统移动设备操作系统嵌入式操作系统 不同领域的主流操作系统 桌面操作系统 Windows(用户数量最多)MacOS(操作体验好&#xff0c;办公人士首选)Linux…

Spring AI : Java写人工智能(LLM)的应用框架

Spring AI&#xff1a;为Java开发者提供高效集成大模型能力的框架 当前Java调用大模型时&#xff0c;面临缺乏优质AI应用框架的挑战。Spring作为资深的Java应用框架提供者&#xff0c;通过推出Spring AI来解决这一问题。它借鉴了langchain的核心理念&#xff0c;并结合了Java面…

解密 Redis:如何通过 IO 多路复用征服高并发挑战!

文章目录 一、什么是 IO 多路复用&#xff1f;二、为什么 Redis 要使用 IO 多路复用&#xff1f;三、Redis 如何实现 IO 多路复用&#xff1f;四、IO 多路复用的核心机制&#xff1a;epoll五、IO 多路复用在 Redis 中的工作流程六、IO 多路复用的优点七、IO 多路复用使用中的注…

安装buildkit,并使用buildkit构建containerd镜像

背景 因为K8s抛弃Docker了,所以就只装了个containerd,这样就需要一个单独的镜像构建工具了,就用了buildkit,这也是Docker公司扶持的,他们公司的人出来搞的开源工具,官网在 https://github.com/moby/buildkit 简介 服务端为buildkitd,负责和runc或containerd后端连接干活,目前…

w~自动驾驶合集6

我自己的原文哦~ https://blog.51cto.com/whaosoft/12286744 #自动驾驶的技术发展路线 端到端自动驾驶 Recent Advancements in End-to-End Autonomous Driving using Deep Learning: A SurveyEnd-to-end Autonomous Driving: Challenges and Frontiers 在线高精地图 HDMa…

windows文件拷贝给wsl2的Ubuntu

参考&#xff1a; windows文件如何直接拖拽到wsl中_win 移到文件到wsl-CSDN博客 cp -r /mnt/盘名/目标文件 要复制到wsl中的位置e.g.cp -r /mnt/d/byt5 /home Linux文件复制、移动、删除等操作命令_linux移动命令-CSDN博客 Linux 文件、文件夹的复制、移动、删除 - Be-myse…

构建后端为etcd的CoreDNS的容器集群(二)、下载最新的etcd容器镜像

在尝试获取etcd的容器的最新版本镜像时&#xff0c;使用latest作为tag取到的并非最新版本&#xff0c;本文尝试用实际最新版本的版本号进行pull&#xff0c;从而取到想的最新版etcd容器镜像。 一、用latest作为tag尝试下载最新etcd的镜像 1、下载镜像 [rootlocalhost opt]# …

多品牌摄像机视频平台EasyCVR海康大华宇视视频平台如何接入多样化设备

在实际的工程项目里&#xff0c;我们常常会面临这样的情况&#xff1a;项目管理者可能会决定使用多个品牌的视频监控摄像头&#xff0c;或者有需求将现有的、多种类型的监控系统进行整合。现在&#xff0c;让我们来探讨一下如何实现不同品牌摄像头的连接和使用。 1、GB/T281协议…

2024版最新148款CTF工具整理大全(附下载安装包)含基础环境、Web 安全、加密解密、密码爆破、文件、隐写、逆向、PWN

经常会有大学生粉丝朋友私信小强&#xff0c;想通过打CTF比赛镀金&#xff0c;作为进入一线互联网大厂的门票。 但是在CTF做题很多的时候都会用到工具&#xff0c;所以在全网苦寻CTF比赛工具安装包&#xff01; 关于我 有不少阅读过我文章的伙伴都知道&#xff0c;我曾就职于…

SSM框架实战小项目:打造高效用户管理系统 day3

前言 在前两篇博客中&#xff0c;后台已经搭建完毕&#xff0c;现在需要设计一下前端页面 webapp下的项目结构图 创建ftl文件夹&#xff0c;导入css和js 因为我们在后台的视图解析器中&#xff0c;设置了页面解析器&#xff0c;跳转路径为/ftl/*.ftl&#xff0c;所以需要ftl文件…

JAVA开源项目 网上订餐系统 计算机毕业设计

本文项目编号 T 018 &#xff0c;文末自助获取源码 \color{red}{T018&#xff0c;文末自助获取源码} T018&#xff0c;文末自助获取源码 目录 一、系统介绍二、演示录屏三、启动教程四、功能截图五、文案资料5.1 选题背景5.2 国内外研究现状5.3 可行性分析 六、核心代码6.1 新…

Pycharm通过ssh远程docker容器搭建开发环境

本地pycharm通过ssh方式远程连接服务器&#xff08;Ubuntu&#xff09;上的docker容器&#xff0c;搭建开发环境。实现功能&#xff1a;将环境依赖打包成docker镜像&#xff0c;并在远程服务器中启动容器&#xff08;启动时做好端口映射和文件夹挂载&#xff09;&#xff0c;通…

负载箱的作用?

负载箱&#xff0c;顾名思义&#xff0c;就是用来承载电力设备的箱子。在电力系统中&#xff0c;负载箱的作用非常重要&#xff0c;它不仅可以模拟实际的电力负载&#xff0c;还可以对电力设备进行测试和调试&#xff0c;确保其正常运行。下面详细介绍负载箱的作用。 1. 模拟实…

15分钟学Go 第7天:控制结构 - 条件语句

第7天&#xff1a;控制结构 - 条件语句 在Go语言中&#xff0c;控制结构是程序逻辑的重要组成部分。通过条件语句&#xff0c;我们可以根据不同的条件采取不同的行动。今天我们将详细探讨Go语言中的两种主要条件结构&#xff1a;if语句和switch语句。理解这些控制结构对于编写…

复写零--双指针

一&#xff1a;题目描述 题目链接&#xff1a;. - 力扣&#xff08;LeetCode&#xff09; 二&#xff1a;算法原理分析 三&#xff1a;代码编写 void duplicateZeros3(vector<int>& arr) {int dest -1, cur 0, n arr.size();//1.找到要复写的最后一个数字while …

qiankun 应用之间数据传递

qiankun 应用之间数据传递 全局共享 initGlobalState qiankun initGlobalState API 单击前往 qiankun 内部提供了 initGlobalState 方法用于注册 MicroAppStateActions 实例用于通信&#xff0c;该实例有三个方法&#xff0c;分别是onGlobalStateChange、setGlobalState、of…

微软宣布了新的“智能代理”功能

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

Elasticsearch:Redact(编辑) processor

Redact 处理器使用 Grok 规则引擎来隐藏输入文档中与给定 Grok 模式匹配的文本。该处理器可用于隐藏个人身份信息 (Personal Identifying Information - PII)&#xff0c;方法是将其配置为检测已知模式&#xff0c;例如电子邮件或 IP 地址。与 Grok 模式匹配的文本将被替换为可…