Reactor 反应堆模式

news2024/9/28 9:17:43

Reactor 反应堆模式

1、概念

Reactor(反应堆)模式是一种事件驱动的设计模式,通常用于处理高并发的I/O操作,尤其是在服务器或网络编程中。它基于事件多路复用机制,使得单个线程能够同时管理大量并发连接,而不需要为每个连接创建一个独立的线程。

1.1、核心思想

Reactor 模式的核心思想是通过一个事件分发器(reactor)来监听和管理不同的 I/O 事件,当事件发生时,分发器会将该事件分发给对应的事件处理器来处理。


1.2、关键组件

  • 事件分发器 (Reactor)
    负责监听各种事件源(如 socket、文件描述符)并将事件分发给相应的处理器。它通过使用 I/O 多路复用机制(如 selectpollepoll)来同时监听多个 I/O 事件。

  • 事件处理器 (Event Handler)
    针对不同类型的事件(如连接、读、写),每个事件都会有一个对应的处理器,处理器内部会定义如何响应该事件。

  • 资源(Handle)
    代表系统中的 I/O 资源,例如网络 socket。事件分发器监听这些资源上的事件,当有 I/O 事件发生时,调用相应的处理器。

  • 回调函数 (Callback)
    在 Reactor 模式中,处理事件的方式通常是通过回调机制。事件处理器定义了如何处理特定事件,当事件分发器检测到某个事件时,就会触发相应的回调函数。


1.3、工作流程

  • 注册事件
    事件分发器注册需要监听的 I/O 事件(如连接、读写),并关联相应的事件处理器。

  • 事件循环
    事件分发器进入循环,使用 I/O 多路复用机制来监听注册的 I/O 事件。

  • 事件发生
    一旦某个 I/O 事件发生,事件分发器会将该事件分发给对应的事件处理器,事件处理器执行预定义的操作。

  • 处理完毕
    事件处理器完成事件处理后,可能会重新注册事件或关闭连接。


2、Reactor 反应堆模式代码

这里我们用到了 ET 模式

  • Reactor.hpp文件:
#pragma once
#include <string>
#include <unordered_map>
#include "Connection.hpp"
#include "Epoller.hpp"

// TcpServer就是Reactor(反应堆)
// class TcpServer // 对Connection和Epoller管理就行
class Reactor
{
  const static int gnum = 64;

public:
  Reactor() : _is_running(false) {}

  void AddConnection(int sockfd, uint32_t events, func_t recver, func_t sender, func_t excepter)
  {
      // 1.构建Connection对象
      Connection *conn = new Connection(sockfd);
      conn->SetEvent(events);
      conn->Register(recver, sender, excepter);
      conn->SetSelf(this);

      // 2.向内核表示对fd的关心
      _epoller.AddEvent(conn->SockFd(), conn->Events());

      // std::cout << "sockfd : " << sockfd << " , events : " << (events & EPOLLIN) << std::endl;

      // 3.向_connections添加Connection对象
      _connections.insert(std::make_pair(conn->SockFd(), conn));
  }

  bool ConnectionIsExist(int sockfd)
  {
      auto iter = _connections.find(sockfd);

      return iter != _connections.end();
  }

  void EnableReadWrite(int sockfd, bool wr, bool rd)
  {
      uint32_t events = (wr ? EPOLLOUT : 0) | (rd ? EPOLLIN : 0) | EPOLLET;
      if (ConnectionIsExist(sockfd))
      {
          // 修改对事件的关心
          _connections[sockfd]->SetEvent(events);
          // 设置到内核
          _epoller.ModEvent(sockfd, events);
      }
  }

  void RemoveConnection(int sockfd)
  {
      if (!ConnectionIsExist(sockfd))
          return;
      // 解除对文件描述符的关心
      _epoller.DelEvent(sockfd);
      // 关闭文件描述符
      ::close(sockfd);
      // 去除该连接
      delete _connections[sockfd];
      _connections.erase(sockfd);
  }

  // 一次派发
  void LoopOnce(int timeout)
  {
      int n = _epoller.Wait(recv, gnum, timeout); // n个事件就绪
      for (int i = 0; i < n; i++)
      {
          int sockfd = recv[i].data.fd;
          uint32_t revents = recv[i].events;

          // std::cout << "sockfd : " << sockfd << " , revents : " << revents << std::endl;

          // 挂起或者出错了转为读写事件就绪
          if (revents & EPOLLHUP)
              revents |= (EPOLLIN | EPOLLOUT);
          if (revents & EPOLLERR)
              revents |= (EPOLLIN | EPOLLOUT);

          // 读事件就绪
          if (revents & EPOLLIN)
          {
              // 文件描述符得在_connections存在(比如客户端可能退出了,这个文件描述符就没有了)
              if (ConnectionIsExist(sockfd) && (_connections[sockfd]->_recver != nullptr))
                  _connections[sockfd]->_recver(_connections[sockfd]); // 处理读事件就绪,这里_recver已经在AddConnection注册了!
          }
          // 写事件就绪
          if (revents & EPOLLOUT)
          {
              if (ConnectionIsExist(sockfd) && (_connections[sockfd]->_sender != nullptr))
                  _connections[sockfd]->_sender(_connections[sockfd]); // 处理写事件就绪,这里_sender已经在AddConnection注册了!
          }
      }
  }

  // 只负责事件派发
  void Despatcher()
  {
      _is_running = true;
      int timeout = -1; // 阻塞等
      while (true)
      {
          LoopOnce(timeout);
          // 处理其他事情
          Debug();
      }
      _is_running = false;
  }

  void Debug()
  {
      for (auto &connection : _connections)
      {
          std::cout << "------------------------------------" << std::endl;
          std::cout << "fd : " << connection.second->SockFd() << " , ";
          uint32_t events = connection.second->Events();
          if ((events & EPOLLIN) && (events & EPOLLET))
              std::cout << "EPOLLIN | EPOLLET";
          if ((events & EPOLLIN) && (events & EPOLLET))
              std::cout << "EPOLLIN | EPOLLET";
          std::cout << std::endl;
      }
      std::cout << "------------------------------------" << std::endl;
  }
  ~Reactor() {}

private:
  std::unordered_map<int, Connection *> _connections; // 保存fd 和 对应的连接
  Epoller _epoller;

  struct epoll_event recv[gnum];
  bool _is_running;
};
  • Socket.hpp文件:
#pragma once

#include <string.h>
#include <memory>

#include "Log.hpp"
#include "Comm.hpp"

namespace socket_ns
{
  const static int gbacklog = 8;

  class Socket;
  using socket_sptr = std::shared_ptr<Socket>; // 定义智能指针,以便于后面多态

  // 使用
  // std::unique_ptr<Socket> listensocket = std::make_unique<TcpSocket>();
  // listensocket->BuildListenSocket();
  // socket_sptr retsock = listensocket->Accepter();
  // retsock->Recv();
  // retsock->Send();

  // std::unique_ptr<Socket> clientsocket = std::make_unique<TcpSocket>();
  // clientsocket->BuildClientSocket();
  // clientsocket->Send();
  // clientsocket->Recv();

  class Socket
  {
  public:
      virtual void CreateSocketOrDie() = 0;
      virtual void BindSocketOrDie(InetAddr &addr) = 0;
      virtual void ListenSocketOrDie() = 0;
      virtual int Accepter(InetAddr *addr, int *errcode) = 0;
      virtual bool Connector(InetAddr &addr) = 0;
      virtual void SetSocketAddrReuse() = 0;
      virtual int SockFd() = 0;

      virtual ssize_t Recv(std::string *out) = 0;
      virtual ssize_t Send(std::string &in) = 0;
      // virtual void Other() = 0;

  public:
      void BuildListenSocket(InetAddr &addr)
      {
          CreateSocketOrDie();
          SetSocketAddrReuse();
          BindSocketOrDie(addr);
          ListenSocketOrDie();
      }

      bool BuildClientSocket(InetAddr &addr)
      {
          CreateSocketOrDie();
          return Connector(addr);
      }
  };

  class TcpSocket : public Socket
  {
  public:
      TcpSocket(int sockfd = -1) : _socktfd(sockfd)
      {
      }

      virtual void SetSocketAddrReuse() override
      {
          int opt = 1;
          ::setsockopt(_socktfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
      }

      virtual void CreateSocketOrDie() override
      {
          // 创建
          _socktfd = socket(AF_INET, SOCK_STREAM, 0); // 这个就是文件描述符
          if (_socktfd < 0)
          {
              LOG(FATAL, "create sockfd error, error code : %d, error string : %s", errno, strerror(errno));
              exit(CREATE_ERROR);
          }
          LOG(INFO, "create sockfd success");
      }
      virtual void BindSocketOrDie(InetAddr &addr) override
      {
          struct sockaddr_in local;
          bzero(&local, sizeof(local));
          local.sin_family = AF_INET;
          local.sin_port = htons(addr.Port());
          local.sin_addr.s_addr = INADDR_ANY;
          // 绑定
          int n = ::bind(_socktfd, CONV(&local), sizeof(local));
          if (n < 0)
          {
              LOG(FATAL, "bind sockfd error, error code : %d, error string : %s", errno, strerror(errno));
              exit(BIND_ERROR);
          }
          LOG(INFO, "bind sockfd success");
      }
      virtual void ListenSocketOrDie() override
      {
          // 监听
          int ret = ::listen(_socktfd, gbacklog);
          if (ret < 0)
          {
              LOG(FATAL, "listen error, error code : %d , error string : %s", errno, strerror(errno));
              exit(LISTEN_ERROR);
          }
          LOG(INFO, "listen success!");
      }
      virtual int Accepter(InetAddr *addr, int *errcode) override
      {
          struct sockaddr_in peer;
          socklen_t len = sizeof(peer);

          // 获取新连接
          int newsockfd = accept(_socktfd, CONV(&peer), &len); // 建立连接成功,创建新文件描述符进行通信

          *errcode = errno;
          LOG(DEBUG, "errno : ", errno);

          if (newsockfd < 0)
          {
              LOG(WARNING, "accept error, error code : %d , error string : %s", errno, strerror(errno));
              return -1;
          }
          LOG(INFO, "accept success! new sockfd : %d", newsockfd);

          SetNonBlock(_socktfd); // 这里不是newsockfd,这里是对listensock进行非阻塞

          *addr = peer;
          // socket_sptr sock = std::make_shared<TcpSocket>(newsockfd); // 创建新的文件描述符,传出去以便于后面的Recv和Send
          return newsockfd;
      }

      virtual bool Connector(InetAddr &addr) override
      {
          struct sockaddr_in local;
          bzero(&local, sizeof(local));
          local.sin_family = AF_INET;
          local.sin_port = htons(addr.Port());
          local.sin_addr.s_addr = inet_addr(addr.Ip().c_str());

          // 发起连接
          int n = ::connect(_socktfd, CONV(&local), sizeof(local));
          if (n < 0)
          {
              LOG(WARNING, "create connect error, error code : %d, error string : %s", errno, strerror(errno));
              return false;
          }
          LOG(INFO, "create connect success");
          return true;
      }

      virtual int SockFd() override
      {
          return _socktfd;
      }

      virtual ssize_t Recv(std::string *out) override
      {
          char buff[1024];
          ssize_t n = recv(_socktfd, buff, sizeof(buff) - 1, 0);
          if (n > 0)
          {
              buff[n] = 0;
              *out += buff; // 方便当数据到来不是刚好1条数据的时候,进行合并后来的数据
          }
          return n;
      }
      virtual ssize_t Send(std::string &in) override
      {
          ssize_t n = send(_socktfd, in.c_str(), in.size(), 0);
          return n;
      }

  private:
      int _socktfd; // 用同一个_socket
  };
}
  • Connection.hpp文件:
#pragma once
#include <string>
#include <functional>
#include <sys/epoll.h>
#include "InetAddr.hpp"

class Reactor;
class Connection;
using func_t = std::function<void(Connection *)>;

class Connection
{
public:
  Connection(int sockfd) : _sockfd(sockfd), _R(nullptr) {}

  void SetEvent(uint32_t events)
  {
      _events = events;
  }

  void Register(func_t recver, func_t sender, func_t excepter)
  {
      _recver = recver;
      _sender = sender;
      _excepter = excepter;
  }

  void SetSelf(Reactor *R)
  {
      _R = R;
  }

  int SockFd()
  {
      return _sockfd;
  }

  void AppendInbuff(const std::string &buff)
  {
      _inbuffer += buff;
  }

  void AppendOutbuff(const std::string &buff)
  {
      _outbuffer += buff;
  }

  std::string &Inbuffer() // 返回引用,后面Decode得字符串切割
  {
      return _inbuffer;
  }

  std::string &Outbuffer() // 返回引用,后面Decode得字符串切割
  {
      return _outbuffer;
  }

  void OutBufferRemove(int n)
  {
      _outbuffer.erase(0, n);
  }

  uint32_t Events()
  {
      return _events;
  }

  ~Connection() {}

private:
  int _sockfd;

  // 输入输出缓冲区
  std::string _inbuffer;
  std::string _outbuffer;

  // 已经准备好的事件
  uint32_t _events;

  InetAddr _clientaddr;

public:
  // 处理事件
  func_t _recver;
  func_t _sender;
  func_t _excepter;

  Reactor *_R;
};
  • Epoller.hpp文件:
#pragma once
#include <sys/epoll.h>
#include "Log.hpp"

class Epoller
{
  bool EventCore(int sockfd, uint32_t event, int type)
  {
      struct epoll_event ep_event;
      ep_event.data.fd = sockfd;
      ep_event.events = event;
      int n = ::epoll_ctl(_epfd, type, sockfd, &ep_event);
      if (n < 0)
      {
          LOG(ERROR, "epoll_ctl error");
          return false;
      }
      LOG(DEBUG, "epoll_ctl add %d fd success", sockfd);
      return true;
  }

public:
  Epoller()
  {
      _epfd = ::epoll_create(128);
      if (_epfd < 0)
      {
          LOG(FATAL, "create epfd error");
          exit(EPOLL_CREATE_ERROR);
      }
      LOG(DEBUG, "create epfd success, epfd : %d", _epfd);
  }

  bool AddEvent(int sockfd, uint32_t event)
  {
      return EventCore(sockfd, event, EPOLL_CTL_ADD);
  }

  bool ModEvent(int sockfd, uint32_t event)
  {
      return EventCore(sockfd, event, EPOLL_CTL_MOD);
  }

  bool DelEvent(int sockfd)
  {
      return ::epoll_ctl(_epfd, EPOLL_CTL_DEL, sockfd, nullptr);
  }

  int Wait(struct epoll_event *recv, int num, int timeout)
  {
      int n = ::epoll_wait(_epfd, recv, num, timeout);
      return n;
  }
  ~Epoller()
  {
      if (_epfd > 0)
          ::close(_epfd);
  }

private:
  int _epfd;
};
  • IOService.hpp文件:
#pragma once
#include "Connection.hpp"
#include "Comm.hpp"

// 处理IO,recver、sender、excepter
class IOService
{
public:
  IOService(func_t func) : _func(func) {}
  void HandlerRecv(Connection *conn)
  {
      // 处理读事件
      errno = 0;
      while (true)
      {
          char buff[1024];
          ssize_t n = ::recv(conn->SockFd(), buff, sizeof(buff) - 1, 0);
          SetNonBlock(conn->SockFd()); // 这里也得非阻塞,不然会阻塞
          if (n > 0)
          {
              buff[n] = 0;
              conn->AppendInbuff(buff);
          }
          else
          {
              if (errno == EWOULDBLOCK || errno == EAGAIN)
              {
                  break;
              }
              else if (errno == EINTR)
              {
                  continue;
              }
              else
              {
                  conn->_excepter(conn); // 统一处理异常
                  return;                // 一定要提前返回
              }
          }
      }
      LOG(DEBUG, "debug");
      _func(conn);
  }

  void HandlerSend(Connection *conn)
  {
      // errno
      errno = 0;

      while (true)
      {
          ssize_t n = ::send(conn->SockFd(), conn->Outbuffer().c_str(), conn->Outbuffer().size(), 0);
          if (n > 0)
          {
              // 发送的数据的字节数小于Outbuffer的大小
              // n即实际发了多少
              conn->OutBufferRemove(n);
              if (conn->Outbuffer().empty())
                  break; // 发完了
          }
          else if (n == 0)
          {
              break;
          }
          else
          {
              // 写到文件结尾了
              if (errno == EWOULDBLOCK || errno == EAGAIN)
              {
                  break;
              }
              else if (errno == EINTR)
              {
                  continue;
              }
              else
              {
                  conn->_excepter(conn); // 统一处理异常
                  return;                // 一定要提前返回
              }
          }
      }

      // 一定遇到了缓冲区被写满
      if (!conn->Outbuffer().empty())
      {
          // 开启对写事件关心
          conn->_R->EnableReadWrite(conn->SockFd(), true, true);
      }
      else
      {
          // 重置对写事件的不关心
          conn->_R->EnableReadWrite(conn->SockFd(), false, true);
      }
  }

  void HandlerExcept(Connection *conn)
  {
      conn->_R->RemoveConnection(conn->SockFd());
  }

private:
  func_t _func;
};
  • Listener.hpp文件:
#pragma once
#include <iostream>
#include <memory>
#include "Socket.hpp"
#include "IOService.hpp"

using namespace socket_ns;

class Listener
{
public:
  Listener(uint16_t port, IOService &io) : _port(port), _listensock(std::make_unique<TcpSocket>()), _io(io)
  {
      InetAddr clientaddr("0", port);
      _listensock->BuildListenSocket(clientaddr);
      // LOG(DEBUG,"Listen sock : %d",_listensock->SockFd());
  }

  void Accepter(Connection *conn) // conn一定是listensock
  {
      LOG(DEBUG, "get new link , conn fd : %d", conn->SockFd());

      // 新连接到来
      while (true)
      {
          InetAddr clientaddr;
          int code = 0;

          int listensockfd = _listensock->Accepter(&clientaddr, &code); // 第二次卡住
          // listensockfd = _listensock->Accepter(&clientaddr, &code);

          if (listensockfd >= 0)
          {
              // 添加新连接
              conn->_R->AddConnection(listensockfd, EPOLLIN | EPOLLET,
                                      std::bind(&IOService::HandlerRecv, &_io, std::placeholders::_1),
                                      std::bind(&IOService::HandlerSend, &_io, std::placeholders::_1),
                                      std::bind(&IOService::HandlerExcept, &_io, std::placeholders::_1));
              // 这里就只是添加对应的处理函数(不懂跳过去可以看AddConnection函数),用不用看到时候到的是什么信号(EPOLLIN等)
              // 使用对应的函数会传conn,比如使用_recver(conn)。
          }
          else
          {
              if (code == EWOULDBLOCK || code == EAGAIN)
              {
                  // 读完了所有就绪文件描述符
                  LOG(DEBUG, "ready fd read complete!");
                  break;
              }
              else if (code == EINTR)
              {
                  LOG(DEBUG, "accpet interupt by signal ");
                  continue;
              }
              else
              {
                  LOG(WARNING, "accpet error");
                  break;
              }
          }
      }
  }

  int SockFd()
  {
      return _listensock->SockFd();
  }

  ~Listener()
  {
      ::close(_listensock->SockFd());
  }

private:
  uint16_t _port;
  std::unique_ptr<Socket> _listensock;
  IOService &_io;
};
  • Calculate.hpp文件:
#pragma once

#include <iostream>
#include <string>
#include <memory>
#include "Protocol.hpp"

using namespace protocol_ns;

// 应用层
class Calculate
{
public:
  Calculate()
  {
  }

  std::unique_ptr<Response> Execute(const Request &req)
  {
      std::unique_ptr<Response> resptr = std::make_unique<Response>();

      switch (req._oper)
      {
      case '+':
          resptr->_result = req._x + req._y;
          resptr->_equation = std::to_string(req._x) + " " + req._oper + " " + std::to_string(req._y) + " = " + std::to_string(resptr->_result);
          break;

      case '-':
          resptr->_result = req._x - req._y;
          resptr->_equation = std::to_string(req._x) + " " + req._oper + " " + std::to_string(req._y) + " = " + std::to_string(resptr->_result);
          break;

      case '*':
          resptr->_result = req._x * req._y;
          resptr->_equation = std::to_string(req._x) + " " + req._oper + " " + std::to_string(req._y) + " = " + std::to_string(resptr->_result);
          break;
      case '/':
      {
          if (req._y == 0)
          {
              resptr->_flag = 1;
              resptr->_equation = "除0错误";
          }
          else
          {
              resptr->_result = req._x / req._y;
              resptr->_equation = std::to_string(req._x) + " " + req._oper + " " + std::to_string(req._y) + " = " + std::to_string(resptr->_result);
          }
          break;
      }

      case '%':
      {
          if (req._y == 0)
          {
              resptr->_flag = 2;
              resptr->_equation = "模0错误";
          }
          else
          {
              resptr->_result = req._x % req._y;
              resptr->_equation = std::to_string(req._x) + " " + req._oper + " " + std::to_string(req._y) + " = " + std::to_string(resptr->_result);
          }
          break;
      }
      default:
          resptr->_flag = 3;
          break;
      }
      return resptr;
  }
  ~Calculate() {}

private:
};
  • Comm.hpp文件:
#pragma once
#include <unistd.h>
#include <fcntl.h>

#include "InetAddr.hpp"


enum errorcode
{
  CREATE_ERROR = 1,
  BIND_ERROR,
  LISTEN_ERROR,
  SEND_ERROR,
  RECV_ERROR,
  CONNECT_ERROR,
  FORK_ERROR,
  USAGE_ERROR,
  EPOLL_CREATE_ERROR
};

#define CONV(ADDR) ((struct sockaddr *)ADDR)

std::string CombineIpAndPort(InetAddr addr)
{
  return "[" + addr.Ip() + ":" + std::to_string(addr.Port()) + "] ";
}


void SetNonBlock(int fd)
{
  int f1 = ::fcntl(fd, F_GETFL); // 获取标记位
  if (f1 < 0)
      return;
  ::fcntl(fd, F_SETFL, f1 | O_NONBLOCK);
}
  • LockGuard.hpp文件:
# pragma once

#include <pthread.h>


class LockGuard
{
public:
   LockGuard(pthread_mutex_t *mutex) : _mutex(mutex)
   {
       pthread_mutex_lock(_mutex); // 构造加锁
   }
   ~LockGuard()
   {
       pthread_mutex_unlock(_mutex); // 析构解锁
   }

private:
   pthread_mutex_t *_mutex;
};

  • Log.hpp文件:
#pragma once

#include <string>
#include <iostream>
#include <fstream>
#include <unistd.h>
#include <stdarg.h>
#include <sys/types.h>
#include "LockGuard.hpp"

using namespace std;

bool isSave = false; // 默认向显示器打印
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
#define FILEPATH "./log.txt"

enum level
{
   DEBUG = 0,
   INFO,
   WARNING,
   ERROR,
   FATAL
};

void SaveToFile(const string &message)
{
   ofstream out(FILEPATH, ios_base::app);
   if (!out.is_open())
       return;
   out << message;
   out.close();
}

std::string LevelToString(int level)
{
   switch (level)
   {
   case DEBUG:
       return "Debug";
   case INFO:
       return "Info";
   case WARNING:
       return "Warning";
   case ERROR:
       return "Error";
   case FATAL:
       return "Fatal";
   default:
       return "Unknow";
   }
}

std::string GetTimeString()
{
   time_t curr_time = time(nullptr);
   struct tm *format_time = localtime(&curr_time);
   if (format_time == nullptr)
       return "None";
   char buff[1024];
   snprintf(buff, sizeof(buff), "%d-%d-%d %d:%d:%d",
            format_time->tm_year + 1900,
            format_time->tm_mon + 1,
            format_time->tm_mday,
            format_time->tm_hour,
            format_time->tm_min,
            format_time->tm_sec);
   return buff;
}

void LogMessage(const std::string filename, int line, bool issave, int level, const char *format, ...)
{
   std::string levelstr = LevelToString(level);
   std::string timestr = GetTimeString();
   pid_t pid = getpid();

   char buff[1024];
   va_list arg;
   // int vsnprintf(char *str, size_t size, const char *format, va_list ap); // 使用可变参数
   va_start(arg, format);
   vsnprintf(buff, sizeof(buff), format, arg);
   va_end(arg);

   LockGuard lock(&mutex);
   std::string message = "[" + timestr + "]" + "[" + levelstr + "]" + "[pid:" + std::to_string(pid) + "]" + "[" + filename + "]" + "[" + std::to_string(line) + "] " + buff + '\n';
   if (issave == false)
       std::cout << message;
   else
       SaveToFile(message);
}

// 固定文件名和行数
#define LOG(level, format, ...)                                               \
   do                                                                        \
   {                                                                         \
       LogMessage(__FILE__, __LINE__, isSave, level, format, ##__VA_ARGS__); \
   } while (0)

#define EnableScreen()  \
   do                  \
   {                   \
       isSave = false; \
   } while (0)

#define EnableFile()   \
   do                 \
   {                  \
       isSave = true; \
   } while (0)

void Test(int num, ...)
{
   va_list arg;
   va_start(arg, num);
   while (num--)
   {
       int data = va_arg(arg, int);
       std::cout << data << " ";
   }
   std::cout << std::endl;
   va_end(arg);
}
  • Main.cc文件:
#include <iostream>
#include <string>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>

#include "Comm.hpp"
#include "Reactor.hpp"

#include "Connection.hpp"
#include "Listener.hpp"
#include "PackageParse.hpp"

int main(int argc, char *argv[])
{
   if (argc != 2)
   {
       std::cout << "Usage : ./reactor port" << std::endl;
       exit(USAGE_ERROR);
   }
   uint16_t serverport = std::stoi(argv[1]);

   std::unique_ptr<Reactor> svr = std::make_unique<Reactor>(); // 主服务

   IOService io(PackageParse::Parse); // 这里回调函数可以对报文进行解析

   Listener listener(serverport, io); // 负责连接模块

   // 注册进入
   // EPOLLET是添加到uint32_t events中的,不是options
   svr->AddConnection(listener.SockFd(), EPOLLIN | EPOLLET, std::bind(&Listener::Accepter, &listener, std::placeholders::_1), nullptr, nullptr);

   svr->Despatcher();

   return 0;
}
  • PackageParse.hpp文件:
#pragma once
#include "Connection.hpp"
#include "Protocol.hpp"
#include "Calculate.hpp"

using namespace protocol_ns;
class PackageParse
{
public:
   static void Parse(Connection *conn)
   {
       // 对数据进行解析
       // LOG(DEBUG, "inbuff : %s" , conn->Inbuffer().c_str());
       std::string package;
       Request req;
       Calculate cal;
       while (true)
       {
           package = Decode(conn->Inbuffer()); // 可能为空
           if (package.empty())
               break;
           cout << "after Decode recvmessage : " << conn->Inbuffer() << std::endl;

           // 完整的一条有效数据
           std::cout << "server Decode:" << package << std::endl;

           // 3.反序列化
           req.DeSerialize(package); // 把_x,_y,_oper赋值

           // 4.业务处理
           std::unique_ptr<Response> resptr = cal.Execute(req);

           // 5.序列化
           std::string sendmessage;
           resptr->Serialize(&sendmessage);
           std::cout << "server Serialize:" << sendmessage << std::endl;

           // 6.加上报头数据封装
           sendmessage = Encode(sendmessage);
           std::cout << "server Encode:" << sendmessage << std::endl;

           // // 7.发送数据
           // int n = sockfd->Send(sendmessage);

           // 把解析的数据放到Outbuffer -- Outbuffer并不是内核的输出缓冲区!
           conn->AppendOutbuff(sendmessage);
       }

       // 到这里,说明解析完成
       if (!conn->Outbuffer().empty())
       {
           conn->_sender(conn);
       }
   }
};
  • Protocol.hpp文件:
#pragma once

#include <string>
#include <jsoncpp/json/json.h>
#include <iostream>
#include <ctime>
#include <sys/types.h>
#include <string>
#include <unistd.h>

// #define SELF 1; // SELF=1就用自定义的序列化和反序列化,否则用默认的

// 表示层
namespace protocol_ns
{
   const std::string SEP = "\r\n";
   const std::string CAL_SEP = " ";

   // 对发送数据进行封装
   // "len\r\n{有效载荷}\r\n" -- 其中len是有效载荷的长度
   std::string Encode(const std::string &inbuff)
   {
       int inbuff_len = inbuff.size();
       std::string newstr = std::to_string(inbuff_len);
       newstr += SEP;
       newstr += inbuff;
       newstr += SEP;
       return newstr;
   }

   // 解析字符串
   std::string Decode(std::string &outbuff)
   {
       int pos = outbuff.find(SEP);
       if (pos == std::string::npos)
       {
           // 没找到分隔符
           return std::string(); // 返回空串,等待接收到完整数据
       }
       // 找到分隔符
       std::string len_str = outbuff.substr(0, pos);
       if (len_str.empty())
           return std::string(); // 返回空串,等待接收到完整数据
       int data_len = std::stoi(len_str);
       // 判断长度是否符合要求
       int total_len = pos + SEP.size() * 2 + data_len; // 包装好的一条数据的长度
       if (outbuff.size() < total_len)
       {
           return std::string(); // 小于包装好的一条数据的长度,返回空串,等待接收到完整数据
       }
       // 大于等于包装好的一条数据的长度
       std::string message = outbuff.substr(pos + SEP.size(), data_len); // 有效数据
       outbuff.erase(0, total_len);                                      // 数据长度减少包装好的一条数据的长度,从前面开始移除
       return message;
   }

   class Request
   {
   public:
       Request() {}
       Request(int x, int y, char oper)
           : _x(x),
             _y(y),
             _oper(oper)
       {
       }

       // 序列化 -- 转化为字符串发送
       // {"x":_x,"y":_y,"oper":_oper}
       // 这样发送可以吗?不行,不一定一次到达的数据刚好是1条,可能是半条,也可能是2条,因此我们需要对发送的数据进行封装:
       // "len\r\n{有效载荷}\r\n" -- 其中len是有效载荷的长度
       void Serialize(std::string *out) // 要带出来
       {
#ifdef SELF
           // "len\r\nx op y\r\n" -- 自定义序列化和反序列化
           std::string data_x = std::to_string(_x);
           std::string data_y = std::to_string(_y);
           *out = data_x + CAL_SEP + _oper + CAL_SEP + data_y;
#else
           Json::Value root;
           root["x"] = _x;
           root["y"] = _y;
           root["oper"] = _oper;

           Json::FastWriter writer;
           std::string str = writer.write(root);

           *out = str;
#endif
       }

       // 反序列化 -- 解析
       bool DeSerialize(const std::string &in)
       {
#ifdef SELF
           auto left_blank_pos = in.find(CAL_SEP);
           if (left_blank_pos == std::string::npos)
               return false;
           std::string x_str = in.substr(0, left_blank_pos);
           if (x_str.empty())
               return false;
           auto right_blank_pos = in.rfind(CAL_SEP);

           if (right_blank_pos == std::string::npos)
               return false;
           std::string y_str = in.substr(right_blank_pos + 1);
           if (y_str.empty())
               return false;
           if (left_blank_pos + 1 + CAL_SEP.size() != right_blank_pos)
               return false;

           _x = std::stoi(x_str);
           _y = std::stoi(y_str);
           _oper = in[right_blank_pos - 1];
           return true;

#else
           Json::Value root;
           Json::Reader reader;
           if (!reader.parse(in, root))
               return false;
           _x = root["x"].asInt();
           _y = root["y"].asInt();
           _oper = root["oper"].asInt();
           return true;
#endif
       }
       ~Request() {}

   public:
       int _x;
       int _y;
       char _oper; // +-*/% 如果不是这些操作法那就是非法的
   };

   class Response
   {
   public:
       Response() {}
       // 序列化 -- 转化为字符串发送
       void Serialize(std::string *out) // 要带出来
       {
#ifdef SELF
           // "len\r\nresult flag equation\r\n"
           std::string data_res = std::to_string(_result);
           std::string data_flag = std::to_string(_flag);
           *out = data_res + CAL_SEP + data_flag + CAL_SEP + _equation;
#else
           Json::Value root;
           root["result"] = _result;
           root["flag"] = _flag;
           root["equation"] = _equation;

           Json::FastWriter writer;
           std::string str = writer.write(root);

           *out = str;
#endif
       }

       // 反序列化 -- 解析
       bool DeSerialize(const std::string &in)
       {
#ifdef SELF
           // "result flag equation"

           auto left_blank_pos = in.find(CAL_SEP);
           if (left_blank_pos == std::string::npos)
               return false;
           std::string res_str = in.substr(0, left_blank_pos);
           if (res_str.empty())
               return false;

           auto second_blank_pos = in.find(CAL_SEP, left_blank_pos + 1);
           if (second_blank_pos == std::string::npos)
               return false;
           std::string equation = in.substr(second_blank_pos + 1);
           if (equation.empty())
               return false;

           if (left_blank_pos + 1 + CAL_SEP.size() != second_blank_pos)
               return false;
           _result = std::stoi(res_str);
           _flag = in[second_blank_pos - 1] - '0';
           _equation = equation;
           return true;
#else
           Json::Value root;
           Json::Reader reader;
           if (!reader.parse(in, root))
               return false;
           _result = root["result"].asInt();
           _flag = root["flag"].asInt();
           _equation = root["equation"].asString();
           return true;
#endif
       }
       ~Response() {}

   public:
       int _result = 0;
       int _flag = 0;                              // 0表示操作符正确,1表示除0错误,2表示取模0错误,3表示操作符错误
       std::string _equation = "操作符不符合要求"; // 等式
   };

   const std::string opers = "+-*/%&^";

   class CalFactory
   {
   public:
       CalFactory()
       {
           srand(time(nullptr) ^ getpid() ^ 2);
       }
       void Product(Request &req)
       {
           req._x = rand() & 5 + 1;
           usleep(req._x * 20);
           req._y = rand() % 10 + 5;
           // req._y = 0; // 测试
           usleep(req._x * req._y + 20);
           req._oper = opers[(rand() % opers.size())];
       }
       ~CalFactory() {}

   private:
   };
}

  • Makefile文件:
reactor:Main.cc
	g++ -o $@ $^ -std=c++14 -ljsoncpp
clean:
	rm -f reactor

整体代码


OKOK,Reactor 反应堆模式就到这里,如果你对Linux和C++也感兴趣的话,可以看看我的主页哦。下面是我的github主页,里面记录了我的学习代码和leetcode的一些题的题解,有兴趣的可以看看。

Xpccccc的github主页

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2173034.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于Hive和Hadoop的招聘分析系统

本项目是一个基于大数据技术的招聘分析系统&#xff0c;旨在为用户提供全面的招聘信息和深入的职位市场分析。系统采用 Hadoop 平台进行大规模数据存储和处理&#xff0c;利用 MapReduce 进行数据分析和处理&#xff0c;通过 Sqoop 实现数据的导入导出&#xff0c;以 Spark 为核…

Text-to-SQL方法研究

1、面临的挑战 自然语言问题往往包含复杂的语言结构,如嵌套语句、倒装句和省略等,很难准确映射到SQL查询上。此外,自然语言本身就存在歧义,一个问题可能有多种解读。消除歧义需要深入的语言理解能力以及融入上下文和领域知识。 要生成正确的SQL查询,文本到SQL系统需要全面理解…

webpack 4 的 30 个步骤构建 react 开发环境

将 react 和 webpack4 进行结合&#xff0c;集 webpack 的优势于一身&#xff0c;从 0 开始构建一个强大的 react 开发环境。 其实很多人都有 一看就会&#xff0c;一做就废 的特点(当然也包括我在内)&#xff0c;这个时候&#xff0c;你需要制定一个略微详细的计划&#xff0…

Redis的基础认识与在ubuntu上的安装教程

来自Redis的自我介绍 我是Redis&#xff0c;一个中间件&#xff0c;职责是把数据存储在内存上&#xff0c;因此可以作为数据库、缓存、消息队列等场景使用。由于可以把数据存储在内存上&#xff0c;因此江湖人称快枪手 1.redis的功能特性 &#xff08;1&#xff09;数据在内存…

9.3 Linux_I/O_文件I/O相关函数

打开与关闭 1、打开文件 int open(const char *pathname, int flags); int open(const char *pathname, int flags, mode_t mode);返回值&#xff1a;成功返回文件描述符&#xff0c;失败返回EOF pathname&#xff1a;文件路径 flags&#xff1a;标志&#xff0c;其中O_RDO…

深入浅出CSS盒子模型

“批判他人总是想的太简单 剖析自己总是想的太困难” 文章目录 前言文章有误敬请斧正 不胜感恩&#xff01;什么是盒子模型&#xff1f;盒子模型的组成部分详解1. 内容区&#xff08;Content&#xff09;2. 内边距&#xff08;Padding&#xff09;3. 边框&#xff08;Border&am…

『功能项目』下载Mongodb【81】

下载网址&#xff1a;Download MongoDB Community Server | MongoDB 点击安装即可 选择Custom 此时安装已经完成 桌面会创建图标 检查是否配置好MongoDB 输入cmd命令行 Windows键 R 打开命令行 输入cmd 复制安装路径 复制data路径 如果输出一大串代码即配置mongdb成功

Mysql高级篇(中)——锁机制

锁机制 一、概述二、分类1、读锁2、写锁★、FOR SHARE / FOR UPDATE&#xff08;1&#xff09;NOWAIT&#xff08;2&#xff09;SKIP LOCKED&#xff08;3&#xff09;NOWAIT 和 SKIP LOCKED 的比较 ★、 脏写3、表级锁之 S锁 / X锁&#xff08;1&#xff09;总结&#xff08;2…

免费视频无损压缩工具+预览视频生成工具

视频无损压缩工具 功能与作用 &#xff1a;视频无损压缩工具是一种能够减少视频文件大小&#xff0c;但同时保持视频质量的工具。它通过先进的编码技术和算法&#xff0c;有效降低视频文件的存储空间&#xff0c;同时保证视频的清晰度和观感。这对于需要分享或存储大量视频内容…

ZLMediaKit快速上手【保姆级简单快速版】

一、前言 1、ZLMediaKit使用场景 最近在写一个摄像头检测的项目&#xff0c;其中需要做拉流测试&#xff0c;但是摄像头数量不够用&#xff0c;如果直接重复拉流可能会出现问题&#xff0c;使用ZLMediaKit&#xff08;一个基于C11的高性能运营级流媒体服务框架&#xff09;可…

对抗攻击方法详解:梯度攻击、转移攻击与模型集成攻击

对抗攻击方法详解&#xff1a;梯度攻击、转移攻击与模型集成攻击 近年来&#xff0c;随着深度学习模型在各个领域取得惊人突破&#xff0c;对抗攻击&#xff08;Adversarial Attack&#xff09; 逐渐成为研究热点。对抗攻击旨在通过在输入数据上施加精心设计的微小扰动&#x…

Doris安装部署指南

Doris安装部署指南 一、环境准备二、下载并解压安装包三、配置FE和BEFE配置BE配置四、验证集群状态五、集群扩容与缩容六、总结Apache Doris(原百度Palo)是一款基于MPP架构的高性能、实时的分析型数据库。它支持标准SQL,高度兼容MySQL协议,能够运行在绝大多数主流的商用服务…

第50篇 汇编语言实现中断<六>

Q&#xff1a;怎样设计汇编语言程序使用定时器中断实现实时时钟&#xff1f; A&#xff1a;此前我们曾使用轮询定时器I/O的方式实现实时时钟&#xff0c;而在本实验中将采用定时器中断的方式。新增的interval_timer.s间隔定时器的中断服务程序中增加了TIME变量&#xff0c;还更…

<<迷雾>> 第 1 章 了解计算机, 要从电开始 示例电路

简单灯泡电路 info::操作说明 灯的亮起有一定的延时, 需要过一会才逐渐亮起来 另: 可通过 “菜单–选项–显示电流” 控制是否显示电流 primary::在线交互操作链接 https://cc.xiaogd.net/?startCircuitLinkhttps://book.xiaogd.net/cyjsjdmw-examples/assets/circuit/cyjsjdm…

Unity XR 环境检测

需求&#xff1a; 检测环境是XR还是手机 代码&#xff1a; using UnityEngine.XR;public class EnvmentUtility {/// <summary>/// 是否是XR环境/// </summary>/// <returns>如果是XR&#xff0c;返回true&#xff0c;否则false</returns>public sta…

Gin框架简易搭建(3)--Grom与数据库

写在前面 项目地址 个人认为GORM 指南这个网站是相比较之下最为清晰的框架介绍 但是它在环境搭建阶段对于初学者而言不是很友好&#xff0c;尤其是使用mysql指令稍有不同&#xff0c;以及更新的方法和依赖问题都是很让人头疼的&#xff0c;而且这些报错并非逻辑上的&#xf…

linux 下的静态库与动态库

目录 一、介绍 1、静态库 2、动态库 二、操作 1、静态库 2、动态库 3、使用库文件 &#xff08;1&#xff09;方法一 &#xff08;2&#xff09;方法二 &#xff08;3&#xff09;方法三 一、介绍 1、静态库 静态链接库实现链接操作的方式很简单&#xff0c;即程序文…

vue启动报错

vue执行npm run dev报错如下 Error: error:0308010C:digital envelope routines::unsupportedat new Hash (node:internal/crypto/hash:69:19)at Object.createHash (node:crypto:133:10)at module.exports (F:\ray\taisheng-erp-frontend-master\node_modules\webpack\lib\ut…

深信服2025届全球校招研发笔试-C卷(AK)

前面14个填空题 T1 已知 子数组 定义为原数组中的一个连续子序列。现给定一个正整数数组 arr&#xff0c;请计算该数组内所有可能的奇数长度子数组的数值之和。 输入描述 输入一个正整数数组arr 输出描述 所有可能的奇数长度子数组的和 示例 1 输入 1,4,2,5,3 输出 58 说明 …

[论文精读]Polarized Graph Neural Networks

论文网址&#xff1a;Polarized Graph Neural Networks | Proceedings of the ACM Web Conference 2022 英文是纯手打的&#xff01;论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误&#xff0c;若有发现欢迎评论指正&#xff01;文章偏向于…