带你吃透Reactor并发模型

news2025/1/10 3:18:35

目录

    • 1.概述
    • 2.项目介绍
      • 2.1 有那些并发模型
      • 2.2 能锻炼那些技能
      • 2.3目录结构
    • 3.编码实践
      • 3.1 前期准备
        • 3.1.1 Echo协议
        • 3.1.2公共代码抽象
        • 3.1.3基准性能压测工具
      • 3.2 并发示例
        • 3.2.1 EpollReactorSingleProcess
        • 3.2.2 EpollReactorProcessPool
        • 3.2.3 EpollReactorThreadPool
        • 3.2.4 EpollReactorThreadHSHA
        • 3.2.5 EpollReactorThreadPoolMS
    • 4.服务压测

1.概述

最近有很多小伙伴私信问春哥是否有适合的C++项目,可用于学习Linux下的后端研发,并涵盖核心技术点。春哥的答复有,马上安排上,于是我连续几天爆肝,为大家编写了一个名为EchoServer的项目,供大家学习使用。

2.项目介绍

「回显服务是一个非常经典的网络服务,它的基准性能常用于评估一个RPC框架的性能」。在EchoServer项目中,我们将使用多种Reactor并发模型来实现回显服务,并使用基准性能压测工具对不同并发模型进行压测,以得到不同并发模型的基准性能指标,以便对比不同并发模型的优劣。这将有助于我们更好地理解和应用高性能并发模型。

2.1 有那些并发模型

我们一共实现了5种Reactor的并发模型:

  • 单进程版的Reactor
  • 进程池版的Reactor
  • 线程池版的Reactor
  • 线程池版的Reactor-HSHA
  • 线程池版的Reactor-MS

2.2 能锻炼那些技能

学习完本项目后,你将能够得到以下技术能力的良好锻炼,并加深对它们的理解:

  • 网络编程应用的实现和调试
  • 进程池的实现和使用
  • 线程池的实现和使用
  • 应用层协议设计与实现(编解码)的能力
  • 多种高效的Reactor并发模型的理解和应用
  • 基准性能压测工具的实现和使用,以评估系统的性能和稳定性

2.3目录结构

EchoServer项目的目录结构如下所示。

EchoServer
├── BenchMark
├── cmdline.cpp
├── cmdline.h
├── codec.hpp
├── common.hpp
├── conn.hpp
├── epollctl.hpp
├── EpollReactorProcessPool
├── EpollReactorSingleProcess
├── EpollReactorThreadPool
├── EpollReactorThreadPoolHSHA
├── EpollReactorThreadPoolMS
├── mp_account.png
└── README.md
  • BenchMark是基准性能压测工具的代码目录
  • EpollReactorProcessPool是Reactor进程池实现的代码目录
  • EpollReactorSingleProcess是Reactor单进程实现的代码目录
  • EpollReactorThreadPool是Reactor线程池实现的代码目录
  • EpollReactorThreadPoolHSHA是Reactor线程池HSHA实现的代码目录
  • EpollReactorThreadPoolMS是Reactor线程池MS实现的代码目录

3.编码实践

3.1 前期准备

在正式开始编写并发示例之前,我们需要做一些准备工作,以便后续更快地完成编码任务。这些准备工作包括:

  • 设计并实现应用层协议的编解码
  • 设计并实现基准性能测试工具
  • 抽象公共代码

3.1.1 Echo协议

EchoServer是一个回显服务,为了对外提供服务,我们需要定义应用层协议。EchoServer使用的协议非常简单,由两部分组成。第一部分是长度为4字节的协议头部,用于标识协议体的长度。第二部分是变长的协议体。协议格式如下图所示。
在这里插入图片描述
该协议的编解码代码codec.hpp如下所示。

#pragma once

#include <arpa/inet.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>

#include <list>
#include <string>

namespace EchoServer {
enum DecodeStatus {
  HEAD = 1,    // 解析协议头(协议体长度)
  BODY = 2,    // 解析协议体
  FINISH = 3,  // 完成解析
};

class Packet {
 public:
  Packet() = default;
  ~Packet() {
    if (data_) delete[] data_;
    len_ = 0;
  }
  void Alloc(size_t size) {
    if (data_) delete[] data_;
    len_ = size;
    data_ = new uint8_t[len_];
  }
  uint8_t *Data() { return data_; }
  ssize_t Len() { return len_; }

 public:
  uint8_t *data_{nullptr};  // 二进制缓冲区
  ssize_t len_{0};          // 缓冲区的长度
};

class Codec {
 public:
  ~Codec() {
    if (msg_) delete msg_;
  }
  void EnCode(const std::string &msg, Packet &pkt) {
    pkt.Alloc(msg.length() + 4);
    *(uint32_t *)pkt.Data() = htonl(msg.length());  // 协议体长度转换成网络字节序
    memmove(pkt.Data() + 4, msg.data(), msg.length());
  }
  void DeCode(uint8_t *data, size_t len) {
    uint32_t decodeLen = 0;  // 本次解析的字节长度
    reserved_.append((const char *)data, len);
    uint32_t curLen = reserved_.size();  // 还有多少字节需要解析
    data = (uint8_t *)reserved_.data();
    if (nullptr == msg_) msg_ = new std::string("");
    while (curLen > 0) {  // 只要还有未解析的网络字节流,就持续解析
      bool decodeBreak = false;
      if (HEAD == decode_status_) {  // 解析协议头
        decodeHead(&data, curLen, decodeLen, decodeBreak);
        if (decodeBreak) break;
      }
      if (BODY == decode_status_) {  // 解析完协议头,解析协议体
        decodeBody(&data, curLen, decodeLen, decodeBreak);
        if (decodeBreak) break;
      }
    }
    if (decodeLen > 0) {  // 删除本次解析完的数据
      reserved_.erase(0, decodeLen);
    }
    if (reserved_.size() <= 0) {  // 及时释放空间
      reserved_.reserve(0);
    }
  }
  bool GetMessage(std::string &msg) {
    if (nullptr == msg_) return false;
    if (decode_status_ != FINISH) return false;
    msg = *msg_;
    delete msg_;
    msg_ = nullptr;
    return true;
  }

 private:
  bool decodeHead(uint8_t **data, uint32_t &curLen, uint32_t &decodeLen, bool &decodeBreak) {
    if (curLen < 4) {  // head固定4个字节
      decodeBreak = true;
      return true;
    }
    body_len_ = ntohl(*(uint32_t *)(*data));
    curLen -= 4;
    (*data) += 4;
    decodeLen += 4;
    decode_status_ = BODY;
    return true;
  }
  bool decodeBody(uint8_t **data, uint32_t &curLen, uint32_t &decodeLen, bool &decodeBreak) {
    if (curLen < body_len_) {
      decodeBreak = true;
      return true;
    }
    msg_->append((const char *)*data, body_len_);
    curLen -= body_len_;
    (*data) += body_len_;
    decodeLen += body_len_;
    decode_status_ = FINISH;
    body_len_ = 0;
    return true;
  }

 private:
  DecodeStatus decode_status_{HEAD};  // 当前解析状态
  std::string reserved_;              // 未解析的网络字节流
  uint32_t body_len_{0};              // 当前消息的协议体长度
  std::string *msg_{nullptr};         // 解析的消息
};
}  // namespace EchoServer

我们使用了不到120行的代码,实现了EchoServer应用层协议的编解码。其中,Codec类用于消息的编解码,「采用了状态机的方法来解析请求数据,并提供了流式解析的DeCode函数」;Packet类实现了二进制数据包的封装。

3.1.2公共代码抽象

我们还对消息的接收和发送、socket选项设置、创建用于监听的socket、客户端连接的接受等操作进行了函数封装。对应的代码在common.hpp文件中,内容如下。

#pragma once

#include <assert.h>
#include <fcntl.h>
#include <sys/sysinfo.h>

#include <functional>

#include "codec.hpp"

namespace EchoServer {
// 获取系统有多少个可用的cpu
int GetNProcs() { return get_nprocs(); }

// 用于阻塞IO模式下发送应答消息
bool SendMsg(int fd, const std::string message) {
  EchoServer::Packet pkt;
  EchoServer::Codec codec;
  codec.EnCode(message, pkt);
  ssize_t sendLen = 0;
  while (sendLen != pkt.Len()) {
    ssize_t ret = write(fd, pkt.Data() + sendLen, pkt.Len() - sendLen);
    if (ret < 0) {
      if (errno == EINTR) continue;  // 中断的情况可以重试
      perror("write failed");
      return false;
    }
    sendLen += ret;
  }
  return true;
}

// 用于阻塞IO模式下接收请求消息
bool RecvMsg(int fd, std::string &message) {
  uint8_t data[4 * 1024];
  EchoServer::Codec codec;
  while (not codec.GetMessage(message)) {    // 只要还没获取到一个完整的消息,则一直循环
    ssize_t ret = read(fd, data, 4 * 1024);  // 一次最多读取4K
    if (ret <= 0) {
      if (errno == EINTR) continue;  // 中断的情况可以重试
      perror("read failed");
      return false;
    }
    codec.DeCode(data, ret);
  }
  return true;
}

void SetNotBlock(int fd) {
  int oldOpt = fcntl(fd, F_GETFL);
  assert(oldOpt != -1);
  assert(fcntl(fd, F_SETFL, oldOpt | O_NONBLOCK) != -1);
}

void SetTimeOut(int fd, int64_t sec, int64_t usec) {
  struct timeval tv;
  tv.tv_sec = sec;    //秒
  tv.tv_usec = usec;  //微秒,1秒等于10的6次方微秒
  assert(setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) != -1);
  assert(setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) != -1);
}

int CreateListenSocket(const char *ip, int port, bool isReusePort) {
  sockaddr_in addr;
  addr.sin_family = AF_INET;
  addr.sin_port = htons(port);
  addr.sin_addr.s_addr = inet_addr(ip);
  int sockFd = socket(AF_INET, SOCK_STREAM, 0);
  if (sockFd < 0) {
    perror("socket failed");
    return -1;
  }
  int reuse = 1;
  int opt = SO_REUSEADDR;
  if (isReusePort) opt = SO_REUSEPORT;
  if (setsockopt(sockFd, SOL_SOCKET, opt, &reuse, sizeof(reuse)) != 0) {
    perror("setsockopt failed");
    return -1;
  }
  if (bind(sockFd, (sockaddr *)&addr, sizeof(addr)) != 0) {
    perror("bind failed");
    return -1;
  }
  if (listen(sockFd, 1024) != 0) {
    perror("listen failed");
    return -1;
  }
  return sockFd;
}

// 调用本函数之前需要把sockFd设置成非阻塞的
void LoopAccept(int sockFd, int maxConn, std::function<void(int clientFd)> clientAcceptCallBack) {
  while (maxConn--) {
    int clientFd = accept(sockFd, NULL, 0);
    if (clientFd > 0) {
      clientAcceptCallBack(clientFd);
      continue;
    }
    if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
      perror("accept failed");
    }
    break;
  }
}
}  // namespace EchoServer

在后续的并发实例编码中,我们将使用codec.hpp和common.hpp中封装的类或函数。大家可以先熟悉这些函数实现的功能,以提高后续代码阅读的效率并减少困惑。

由于后续所有的并发模型示例程序都涉及到epoll,因此我们对一些公共代码进行了提炼和封装。「这样可以减少后续示例程序的代码量,同时也能让我们更加聚焦于核心的逻辑」。具体来说,我们封装了两个头文件:conn.hpp和epollctl.hpp。

其中,conn.hpp封装了客户端连接管理相关代码,epollctl.hpp封装了epoll事件管理相关的代码,它们的内容如下。
conn.hpp

#pragma once

#include "common.hpp"

namespace EchoServer {
class Conn {
 public:
  Conn(int fd, int epoll_fd, bool is_multi_io) : fd_(fd), epoll_fd_(epoll_fd), is_multi_io_(is_multi_io) {}
  bool Read() {
    do {
      uint8_t data[100];
      ssize_t ret = read(fd_, data, 100);  // 一次最多读取100字节
      if (ret == 0) {
        perror("peer close connection");
        return false;
      }
      if (ret < 0) {
        if (EINTR == errno) continue;
        if (EAGAIN == errno or EWOULDBLOCK == errno) return true;
        perror("read failed");
        return false;
      }
      codec_.DeCode(data, ret);
    } while (is_multi_io_);
    return true;
  }
  bool Write(bool autoEnCode = true) {
    if (autoEnCode && 0 == send_len_) {  // 需要自动编码时,且是第一次调用Write时,才执行EnCode操作
      codec_.EnCode(message_, pkt_);
    }
    do {
      if (send_len_ == pkt_.Len()) return true;
      ssize_t ret = write(fd_, pkt_.Data() + send_len_, pkt_.Len() - send_len_);
      if (ret < 0) {
        if (EINTR == errno) continue;
        if (EAGAIN == errno && EWOULDBLOCK == errno) return true;
        perror("write failed");
        return false;
      }
      send_len_ += ret;
    } while (is_multi_io_);
    return true;
  }
  bool OneMessage() { return codec_.GetMessage(message_); }
  void EnCode() { codec_.EnCode(message_, pkt_); }
  bool FinishWrite() { return send_len_ == pkt_.Len(); }
  int Fd() { return fd_; }
  int EpollFd() { return epoll_fd_; }

 private:
  int fd_{0};            // 关联的客户端连接fd
  int epoll_fd_{0};      // 关联的epoll实例的fd
  bool is_multi_io_;     // 是否做多次io,直到返回EAGAIN或者EWOULDBLOCK
  ssize_t send_len_{0};  // 要发送的应答数据的长度
  std::string message_;  // 对于EchoServer来说,即是获取的请求消息,也是要发送的应答消息
  Packet pkt_;           // 发送应答消息的二进制数据包
  Codec codec_;          // EchoServer协议的编解码
};
}  // namespace EchoServer

在conn.hpp头文件中,我们定义了一个名为Conn的类,该类封装了客户端连接数据的接收和发送、EchoServer协议的编解码等操作,并且可以关联epoll实例的fd。

通过Conn类,我们可以方便地管理客户端连接,从而实现高效地处理多个客户端请求。
epollctl.hpp

#pragma once

#include "conn.hpp"

namespace EchoServer {

inline void AddReadEvent(Conn *conn, bool isET = false, bool isOneShot = false) {
  epoll_event event;
  event.data.ptr = (void *)conn;
  event.events = EPOLLIN;
  if (isET) event.events |= EPOLLET;
  if (isOneShot) event.events |= EPOLLONESHOT;
  assert(epoll_ctl(conn->EpollFd(), EPOLL_CTL_ADD, conn->Fd(), &event) != -1);
}

inline void AddReadEvent(int epollFd, int fd, void *userData) {
  epoll_event event;
  event.data.ptr = userData;
  event.events = EPOLLIN;
  assert(epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &event) != -1);
}

inline void ReStartReadEvent(Conn *conn) {
  epoll_event event;
  event.data.ptr = (void *)conn;
  event.events = EPOLLIN | EPOLLONESHOT;
  assert(epoll_ctl(conn->EpollFd(), EPOLL_CTL_MOD, conn->Fd(), &event) != -1);
}

inline void ModToWriteEvent(Conn *conn, bool isET = false) {
  epoll_event event;
  event.data.ptr = (void *)conn;
  event.events = EPOLLOUT;
  if (isET) event.events |= EPOLLET;
  assert(epoll_ctl(conn->EpollFd(), EPOLL_CTL_MOD, conn->Fd(), &event) != -1);
}

inline void ModToWriteEvent(int epollFd, int fd, void *userData) {
  epoll_event event;
  event.data.ptr = userData;
  event.events = EPOLLOUT;
  assert(epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &event) != -1);
}

inline void ClearEvent(Conn *conn, bool isClose = true) {
  assert(epoll_ctl(conn->EpollFd(), EPOLL_CTL_DEL, conn->Fd(), NULL) != -1);
  if (isClose) close(conn->Fd());  // close操作需要EPOLL_CTL_DEL之后调用,否则调用epoll_ctl()删除fd会失败
}

inline void ClearEvent(int epollFd, int fd) {
  assert(epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, NULL) != -1);
  close(fd);  // close操作需要EPOLL_CTL_DEL之后调用,否则调用epoll_ctl()删除fd会失败
}
}  // namespace EchoServer

在epollctl.hpp头文件中,我们封装了可读事件的监听、可写事件的监听以及监听事件的清理等操作。通过这些封装好的函数,我们可以方便地管理epoll事件,从而实现高效地处理多个事件。

3.1.3基准性能压测工具

正所谓工欲善其事必先利其器,我们必须准备好压测工具,才能更好地评估不同并发模型的优劣。由于我们使用的自定义的应用层协议,因此,我们需要自己来实现压测工具。

benchmark工具的代码在benchmark.cpp文件中,内容如下。

#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

#include <iostream>
#include <mutex>
#include <string>
#include <thread>

#include "../cmdline.h"
#include "../common.hpp"

using namespace std;

typedef struct Stat {
  int sum{0};
  int success{0};
  int failure{0};
  int spendms{0};
} Stat;

std::mutex Mutex;
Stat FinalStat;

bool getConnection(sockaddr_in &addr, int &sockFd) {
  sockFd = socket(AF_INET, SOCK_STREAM, 0);
  if (sockFd < 0) {
    perror("socket failed");
    return false;
  }
  int ret = connect(sockFd, (sockaddr *)&addr, sizeof(addr));
  if (ret < 0) {
    perror("connect failed");
    close(sockFd);
    return false;
  }
  struct linger lin;
  lin.l_onoff = 1;
  lin.l_linger = 0;
  // 设置调用close关闭tcp连接时,直接发送RST包,tcp连接直接复位,进入到closed状态。
  if (0 == setsockopt(sockFd, SOL_SOCKET, SO_LINGER, &lin, sizeof(lin))) {
    return true;
  }
  perror("setsockopt failed");
  close(sockFd);
  return false;
}

int64_t getSpendMs(timeval begin, timeval end) {
  end.tv_sec -= begin.tv_sec;
  end.tv_usec -= begin.tv_usec;
  if (end.tv_usec <= 0) {
    end.tv_sec -= 1;
    end.tv_usec += 1000000;
  }
  return end.tv_sec * 1000 + end.tv_usec / 1000;  //计算运行的时间,单位ms
}

void client(int theadId, Stat *curStat, int port, int size, int concurrency) {
  int sum = 0;
  int success = 0;
  int failure = 0;
  int spendms = 0;
  sockaddr_in addr;
  addr.sin_family = AF_INET;
  addr.sin_port = htons(port);
  addr.sin_addr.s_addr = inet_addr(std::string("127.0.0." + std::to_string(theadId + 1)).c_str());
  std::string message(size - 4, 'a');  // 去掉4个字节的协议头
  concurrency /= 10;                   // 每个线程的并发数
  int *sockFd = new int[concurrency];
  timeval end;
  timeval begin;
  gettimeofday(&begin, NULL);
  for (int i = 0; i < concurrency; i++) {
    if (not getConnection(addr, sockFd[i])) {
      sockFd[i] = 0;
      failure++;
    }
  }
  auto failureDeal = [&sockFd, &failure](int i) {
    close(sockFd[i]);
    sockFd[i] = 0;
    failure++;
  };
  std::cout << "threadId[" << theadId << "] finish connection" << std::endl;
  for (int i = 0; i < concurrency; i++) {
    if (sockFd[i]) {
      if (not EchoServer::SendMsg(sockFd[i], message)) {
        failureDeal(i);
      }
    }
  }
  std::cout << "threadId[" << theadId << "] finish send message" << std::endl;
  for (int i = 0; i < concurrency; i++) {
    if (sockFd[i]) {
      std::string respMessage;
      if (not EchoServer::RecvMsg(sockFd[i], respMessage)) {
        failureDeal(i);
        continue;
      }
      if (respMessage != message) {
        failureDeal(i);
        continue;
      }
      close(sockFd[i]);
      success++;
    }
  }
  delete[] sockFd;
  std::cout << "threadId[" << theadId << "] finish recv message" << std::endl;
  sum = success + failure;
  gettimeofday(&end, NULL);
  spendms = getSpendMs(begin, end);
  std::lock_guard<std::mutex> guard(Mutex);
  curStat->sum += sum;
  curStat->success += success;
  curStat->failure += failure;
  curStat->spendms += spendms;
}

void UpdateFinalStat(Stat stat) {
  FinalStat.sum += stat.sum;
  FinalStat.success += stat.success;
  FinalStat.failure += stat.failure;
  FinalStat.spendms += stat.spendms;
}

void usage() {
  cout << "./BenchMark -port 1688 -size 4 -concurrency 10000 -runtime 60" << endl;
  cout << "options:" << endl;
  cout << "    -h,--help                    print usage" << endl;
  cout << "    -port,--port                 listen port" << endl;
  cout << "    -size,--size                 echo message size, unit is kbyte" << endl;
  cout << "    -concurrency,--concurrency   concurrency" << endl;
  cout << "    -runtime,--runtime           run time, unit is second" << endl;
  cout << endl;
}

int main(int argc, char *argv[]) {
  int64_t port;
  int64_t size;
  int64_t concurrency;
  int64_t runtime;
  CmdLine::Int64OptRequired(&port, "port");
  CmdLine::Int64OptRequired(&size, "size");
  CmdLine::Int64OptRequired(&concurrency, "concurrency");
  CmdLine::Int64OptRequired(&runtime, "runtime");
  CmdLine::SetUsage(usage);
  CmdLine::Parse(argc, argv);

  timeval end;
  timeval runBeginTime;
  gettimeofday(&runBeginTime, NULL);
  int runRoundCount = 0;
  while (true) {
    Stat curStat;
    std::thread threads[10];
    for (int threadId = 0; threadId < 10; threadId++) {
      threads[threadId] = std::thread(client, threadId, &curStat, port, size, concurrency);
    }
    for (int threadId = 0; threadId < 10; threadId++) {
      threads[threadId].join();
    }
    runRoundCount++;
    curStat.spendms /= 10;  // 取平均耗时
    UpdateFinalStat(curStat);
    gettimeofday(&end, NULL);
    std::cout << "round " << runRoundCount << " spend " << curStat.spendms << " ms. " << std::endl;
    if (getSpendMs(runBeginTime, end) >= runtime * 1000) {
      break;
    }
    sleep(2);  // 间隔2秒,再发起下一轮压测,这样压测结果更稳定
  }
  std::cout << "total spend " << FinalStat.spendms << " ms. avg spend " << FinalStat.spendms / runRoundCount
            << " ms. sum[" << FinalStat.sum << "],success[" << FinalStat.success << "],failure[" << FinalStat.failure
            << "]" << std::endl;
  return 0;
}

benchmark工具支持指定端口、请求包大小(单位为K)、压测的并发量和压测运行的总时长(单位秒),默认连接的是本地127.0.0.X的ip地址。每一批次的压测会创建10个线程,并发发起connection、SendMsg和RecvMsg的操作。

每个线程执行完之后,会更新统计信息。由于统计数据存在并发访问,所以需要使用互斥锁来保护临界区。每一批次的压测都会在主线程中调用join函数,等待所有的压测线程执行完毕。压测运行结束之后,会打印压测总耗时、单批次平均耗时、请求总数、请求成功总数和请求失败总数。

需要特别注意的一点是,在创建完连接之后,需要设置LINGER选项。这样,在调用close函数关闭tcp连接时,会直接发送RST包,让tcp连接直接复位,进入到CLOSED状态,从而不影响下一轮次的压测。否则,很多本地端口会因为tcp连接处于TIME_WAIT状态中而不可用。

总之,在进行压测时,需要注意tcp连接的状态和资源的释放,以确保程序的正确性和可靠性。

3.2 并发示例

现在我们正式进入并发示例的编程。

3.2.1 EpollReactorSingleProcess

Reactor模型使用事件进行驱动,有统一事件管理器,支持事件监听管理和事件触发时的分发。当有新的连接到来、可读、可写的事件发生时,会分发对应的事件到不同的处理器中。

在Reactor并发模型中,所有的读写都是非阻塞的。只要读写操作返回EAGAIN或者EWOULDBLOCK,就结束当前的读写操作,然后继续监听新事件的到来。

因此,在同一个时间点,多个客户端的请求都在被处理,只不过是不同的客户端请求的处理进展不一样。比如,有的客户端已经读取完请求数据在做业务逻辑处理,有的客户端请求数据还只读取了一半。这样一来,服务就能更充分地利用CPU,服务整体吞吐量更大。

Reactor并发模型虽然提升了服务吞吐量,但是需要付出更多的“成本”。这些成本包括需要额外对客户端连接进行管理,需要使用更多的内存来保存请求和应答的数据,需要管理客户端连接状态迁移,并且处理请求不像阻塞IO那样是串行连续的,而是在不同事件处理过程中断断续续的推进,因此代码维护成本更高。

现在让我们来看一下具体实现,对应的代码在epollreactorsingleprocess.cpp文件中,内容如下。

#include <arpa/inet.h>
#include <assert.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <iostream>
#include <string>

#include "../cmdline.h"
#include "../epollctl.hpp"

using namespace std;

void usage() {
  cout << "./EpollReactorSingleProcess -ip 0.0.0.0 -port 1688" << endl;
  cout << "options:" << endl;
  cout << "    -h,--help      print usage" << endl;
  cout << "    -ip,--ip       listen ip" << endl;
  cout << "    -port,--port   listen port" << endl;
  cout << endl;
}

int main(int argc, char *argv[]) {
  string ip;
  int64_t port;
  CmdLine::StrOptRequired(&ip, "ip");
  CmdLine::Int64OptRequired(&port, "port");
  CmdLine::SetUsage(usage);
  CmdLine::Parse(argc, argv);
  int sockFd = EchoServer::CreateListenSocket(ip.c_str(), port, false);
  if (sockFd < 0) {
    return -1;
  }
  epoll_event events[2048];
  int epollFd = epoll_create(1024);
  if (epollFd < 0) {
    perror("epoll_create failed");
    return -1;
  }
  EchoServer::Conn conn(sockFd, epollFd, true);
  EchoServer::SetNotBlock(sockFd);
  EchoServer::AddReadEvent(&conn);
  while (true) {
    int num = epoll_wait(epollFd, events, 2048, -1);
    if (num < 0) {
      perror("epoll_wait failed");
      continue;
    }
    for (int i = 0; i < num; i++) {
      EchoServer::Conn *conn = (EchoServer::Conn *)events[i].data.ptr;
      if (conn->Fd() == sockFd) {
        EchoServer::LoopAccept(sockFd, 2048, [epollFd](int clientFd) {
          EchoServer::Conn *conn = new EchoServer::Conn(clientFd, epollFd, true);
          EchoServer::SetNotBlock(clientFd);
          EchoServer::AddReadEvent(conn);  // 监听可读事件
        });
        continue;
      }
      auto releaseConn = [&conn]() {
        EchoServer::ClearEvent(conn);
        delete conn;
      };
      if (events[i].events & EPOLLIN) {  // 可读
        if (not conn->Read()) {          // 执行读失败
          releaseConn();
          continue;
        }
        if (conn->OneMessage()) {             // 判断是否要触发写事件
          EchoServer::ModToWriteEvent(conn);  // 修改成只监控可写事件
        }
      }
      if (events[i].events & EPOLLOUT) {  // 可写
        if (not conn->Write()) {          // 执行写失败
          releaseConn();
          continue;
        }
        if (conn->FinishWrite()) {  // 完成了请求的应答写,则可以释放连接
          releaseConn();
        }
      }
    }
  }
  return 0;
}

在main函数中,首先开启监听,然后陷入死循环,在循环中调用epoll_wait函数。当接收到不同的事件时,执行不同的处理逻辑。我们使用Conn类的对象来管理客户端连接,Conn对象的状态随着事件的触发而迁移。一个完整请求的处理过程是在不同读写函数之间跳跃。

3.2.2 EpollReactorProcessPool

在Reactor模式下,所有的IO操作都是非阻塞的,CPU已经被充分的“压榨”。在多核情况下,我们可以启动多个进程来提升并发处理能力。对应的代码在epollreactorprocesspoll.cpp文件中,内容如下。

#include <arpa/inet.h>
#include <assert.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <iostream>

#include "../cmdline.h"
#include "../epollctl.hpp"

using namespace std;

void handler(const char *ip, int port) {
  int sockFd = EchoServer::CreateListenSocket(ip, port, true);
  if (sockFd < 0) {
    return;
  }
  epoll_event events[2048];
  int epollFd = epoll_create(1024);
  if (epollFd < 0) {
    perror("epoll_create failed");
    return;
  }
  EchoServer::Conn conn(sockFd, epollFd, true);
  EchoServer::SetNotBlock(sockFd);
  EchoServer::AddReadEvent(&conn);
  while (true) {
    int num = epoll_wait(epollFd, events, 2048, -1);
    if (num < 0) {
      perror("epoll_wait failed");
      continue;
    }
    for (int i = 0; i < num; i++) {
      EchoServer::Conn *conn = (EchoServer::Conn *)events[i].data.ptr;
      if (conn->Fd() == sockFd) {
        EchoServer::LoopAccept(sockFd, 2048, [epollFd](int clientFd) {
          EchoServer::Conn *conn = new EchoServer::Conn(clientFd, epollFd, true);
          EchoServer::SetNotBlock(clientFd);
          EchoServer::AddReadEvent(conn);  // 监听可读事件
        });
        continue;
      }
      auto releaseConn = [&conn]() {
        EchoServer::ClearEvent(conn);
        delete conn;
      };
      if (events[i].events & EPOLLIN) {  // 可读
        if (not conn->Read()) {          // 执行非阻塞读
          releaseConn();
          continue;
        }
        if (conn->OneMessage()) {             // 判断是否要触发写事件
          EchoServer::ModToWriteEvent(conn);  // 修改成只监控可写事件
        }
      }
      if (events[i].events & EPOLLOUT) {  // 可写
        if (not conn->Write()) {          // 执行非阻塞写
          releaseConn();
          continue;
        }
        if (conn->FinishWrite()) {  // 完成了请求的应答写,则可以释放连接
          releaseConn();
        }
      }
    }
  }
}

void usage() {
  cout << "./EpollReactorProcessPool -ip 0.0.0.0 -port 1688" << endl;
  cout << "options:" << endl;
  cout << "    -h,--help      print usage" << endl;
  cout << "    -ip,--ip       listen ip" << endl;
  cout << "    -port,--port   listen port" << endl;
  cout << endl;
}

int main(int argc, char *argv[]) {
  string ip;
  int64_t port;
  CmdLine::StrOptRequired(&ip, "ip");
  CmdLine::Int64OptRequired(&port, "port");
  CmdLine::SetUsage(usage);
  CmdLine::Parse(argc, argv);
  for (int i = 0; i < EchoServer::GetNProcs(); i++) {
    pid_t pid = fork();
    if (pid < 0) {
      perror("call fork failed.");
      continue;
    }
    if (0 == pid) {  // 子进程
      handler(ip.c_str(), port);
      exit(0);
    }
    // 执行到这里就是父进程
  }
  while (true) sleep(1);  // 父进程陷入死循环
  return 0;
}

在main函数中,我们根据系统当前可用的CPU核数,预先创建数量与之相等的子进程。然后,父进程陷入死循环。每个子进程都创建自己的socket套接字,并设置SO_REUSEPORT选项,以便在相同的网络地址开启监听。最后,子进程陷入epoll的死循环,等待客户端请求的到来,并为其提供服务。

3.2.3 EpollReactorThreadPool

对应的代码在epollreactorthreadpoll.cpp文件中,内容如下。

#include <arpa/inet.h>
#include <assert.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <iostream>
#include <thread>

#include "../cmdline.h"
#include "../epollctl.hpp"

using namespace std;

void handler(const char *ip, int port) {
  int sockFd = EchoServer::CreateListenSocket(ip, port, true);
  if (sockFd < 0) {
    return;
  }
  epoll_event events[2048];
  int epollFd = epoll_create(1024);
  if (epollFd < 0) {
    perror("epoll_create failed");
    return;
  }
  EchoServer::Conn conn(sockFd, epollFd, true);
  EchoServer::SetNotBlock(sockFd);
  EchoServer::AddReadEvent(&conn);
  while (true) {
    int num = epoll_wait(epollFd, events, 2048, -1);
    if (num < 0) {
      perror("epoll_wait failed");
      continue;
    }
    for (int i = 0; i < num; i++) {
      EchoServer::Conn *conn = (EchoServer::Conn *)events[i].data.ptr;
      if (conn->Fd() == sockFd) {
        EchoServer::LoopAccept(sockFd, 2048, [epollFd](int clientFd) {
          EchoServer::Conn *conn = new EchoServer::Conn(clientFd, epollFd, true);
          EchoServer::SetNotBlock(clientFd);
          EchoServer::AddReadEvent(conn);  // 监听可读事件
        });
        continue;
      }
      auto releaseConn = [&conn]() {
        EchoServer::ClearEvent(conn);
        delete conn;
      };
      if (events[i].events & EPOLLIN) {  // 可读
        if (not conn->Read()) {          // 执行非阻塞读
          releaseConn();
          continue;
        }
        if (conn->OneMessage()) {             // 判断是否要触发写事件
          EchoServer::ModToWriteEvent(conn);  // 修改成只监控可写事件
        }
      }
      if (events[i].events & EPOLLOUT) {  // 可写
        if (not conn->Write()) {          // 执行非阻塞写
          releaseConn();
          continue;
        }
        if (conn->FinishWrite()) {  // 完成了请求的应答写,则可以释放连接
          releaseConn();
        }
      }
    }
  }
}

void usage() {
  cout << "./EpollReactorThreadPool -ip 0.0.0.0 -port 1688" << endl;
  cout << "options:" << endl;
  cout << "    -h,--help      print usage" << endl;
  cout << "    -ip,--ip       listen ip" << endl;
  cout << "    -port,--port   listen port" << endl;
  cout << endl;
}

int main(int argc, char *argv[]) {
  string ip;
  int64_t port;
  CmdLine::StrOptRequired(&ip, "ip");
  CmdLine::Int64OptRequired(&port, "port");
  CmdLine::SetUsage(usage);
  CmdLine::Parse(argc, argv);
  for (int i = 0; i < EchoServer::GetNProcs(); i++) {
    std::thread(handler, ip.c_str(), port).detach();  // 这里需要调用detach,让创建的线程独立运行
  }
  while (true) sleep(1);  // 主线程陷入死循环
  return 0;
}

在main函数中,我们根据系统当前可用的CPU核数,预先创建数量与之相等的wroker线程。然后,主线程陷入死循环。每个worker线程都创建自己的socket套接字,并设置SO_REUSEPORT选项,以便在相同的网络地址开启监听。最后,worker线程陷入epoll的死循环,等待客户端请求的到来,并为其提供服务。

3.2.4 EpollReactorThreadHSHA

前面所有的并发模型,不管是多线程、多进程、线程池还是进程池,都是同步的。网络IO操作和业务逻辑操作都在同一个线程中进行。

但是,还有一种半同步半异步的并发模型,它将网络IO操作和业务逻辑操作隔离开来,并在它们之间插入一个共享队列用于通讯。

这样,整个并发模型就被分成了三层,分别为网络IO层、共享队列层和业务逻辑层。

在HSHA(Half Sync/Half Async)模型中,半同步指的是业务逻辑层的操作。

而半异步指的是,从业务逻辑层的视角来看,IO读写不是它自己完成的,而是通过共享队列层最后交给网络IO层来完成。因此,该并发模型被称为半同步半异步模型。需要注意的是,这里的异步并不是指异步IO,网络层的IO操作仍然是同步的。

Reactor-HSHA的并发模型该如何实现呢?我们可以看一下对应的代码epollreactorthreadpoolhsha.cpp,具体内容如下。

#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

#include "../cmdline.h"
#include "../epollctl.hpp"

using namespace std;

std::mutex Mutex;
std::condition_variable Cond;
std::queue<EchoServer::Conn *> Queue;

void pushInQueue(EchoServer::Conn *conn) {
  {
    std::unique_lock<std::mutex> locker(Mutex);
    Queue.push(conn);
  }
  Cond.notify_one();
}

EchoServer::Conn *getQueueData() {
  std::unique_lock<std::mutex> locker(Mutex);
  Cond.wait(locker, []() -> bool { return Queue.size() > 0; });
  EchoServer::Conn *conn = Queue.front();
  Queue.pop();
  return conn;
}

void workerHandler() {
  while (true) {
    EchoServer::Conn *conn = getQueueData();  // 从共享的输入队列中获取请求,有锁
    conn->EnCode();
    EchoServer::ModToWriteEvent(conn);  // 修改成监听写事件,应答数据通过epoll的io线程来发送
  }
}

void ioHandler(const char *ip, int port) {
  int sockFd = EchoServer::CreateListenSocket(ip, port, true);
  if (sockFd < 0) {
    return;
  }
  epoll_event events[2048];
  int epollFd = epoll_create(1024);
  if (epollFd < 0) {
    perror("epoll_create failed");
    return;
  }
  EchoServer::Conn conn(sockFd, epollFd, true);
  EchoServer::SetNotBlock(sockFd);
  EchoServer::AddReadEvent(&conn);
  int msec = -1;
  while (true) {
    int num = epoll_wait(epollFd, events, 2048, msec);
    if (num < 0) {
      perror("epoll_wait failed");
      continue;
    }
    for (int i = 0; i < num; i++) {
      EchoServer::Conn *conn = (EchoServer::Conn *)events[i].data.ptr;
      if (conn->Fd() == sockFd) {
        EchoServer::LoopAccept(sockFd, 2048, [epollFd](int clientFd) {
          EchoServer::Conn *conn = new EchoServer::Conn(clientFd, epollFd, true);
          EchoServer::SetNotBlock(clientFd);
          EchoServer::AddReadEvent(conn, false, true);  // 监听可读事件,开启oneshot
        });
        continue;
      }
      auto releaseConn = [&conn]() {
        EchoServer::ClearEvent(conn);
        delete conn;
      };
      if (events[i].events & EPOLLIN) {  // 可读
        if (not conn->Read()) {          // 执行非阻塞read
          releaseConn();
          continue;
        }
        if (conn->OneMessage()) {
          pushInQueue(conn);  // 入共享输入队列,有锁
        } else {
          EchoServer::ReStartReadEvent(conn);  // 还没收到完整的请求,则重新启动可读事件的监听,携带oneshot选项
        }
      }
      if (events[i].events & EPOLLOUT) {  // 可写
        if (not conn->Write(false)) {     // 执行非阻塞write
          releaseConn();
          continue;
        }
        if (conn->FinishWrite()) {  // 完成了请求的应答写,则可以释放连接close
          releaseConn();
        }
      }
    }
  }
}

void usage() {
  cout << "./EpollReactorThreadPoolHSHA -ip 0.0.0.0 -port 1688" << endl;
  cout << "options:" << endl;
  cout << "    -h,--help      print usage" << endl;
  cout << "    -ip,--ip       listen ip" << endl;
  cout << "    -port,--port   listen port" << endl;
  cout << endl;
}

int main(int argc, char *argv[]) {
  string ip;
  int64_t port;
  CmdLine::StrOptRequired(&ip, "ip");
  CmdLine::Int64OptRequired(&port, "port");
  CmdLine::SetUsage(usage);
  CmdLine::Parse(argc, argv);
  for (int i = 0; i < EchoServer::GetNProcs(); i++) {  // 创建worker线程
    std::thread(workerHandler).detach();               // 这里需要调用detach,让创建的线程独立运行
  }
  for (int i = 0; i < EchoServer::GetNProcs(); i++) {   // 创建io线程
    std::thread(ioHandler, ip.c_str(), port).detach();  // 这里需要调用detach,让创建的线程独立运行
  }
  while (true) sleep(1);  // 主线程陷入死循环
  return 0;
}

在main函数中,我们根据系统当前可用的CPU核数,预先创建数量与之相等的worker线程和io线程。然后,主线程陷入死循环。每个io线程都创建自己的socket套接字,并设置SO_REUSEPORT选项,以便在相同的网络地址开启监听。最后,io线程陷入epoll的死循环。

io线程负责监听客户端连接的到来、客户端可读和可写事件。当io线程接收完数据并解析出一个完整的请求时,它会将消息插入到共享的队列中,并通过条件变量来唤醒一个工作线程来处理请求。

worker线程启动后会等待条件变量的通知。为了避免共享队列中数据读取的异常,等待条件变量时需要再判断共享队列中的数据量必须大于0才可以返回。worker线程获取到请求数据后,会对应答数据进行编码。然后,它注册监听客户端连接的可写事件,由io线程去完成应答数据的发送。

3.2.5 EpollReactorThreadPoolMS

Reactor并发模型还有一种变种,它将客户端连接的接受放在单独的MainReactor中,MainReactor再将客户端连接移交给SubReactor进行读写操作的处理。

使用单独的线程来接受客户端连接可以更快地为新的客户端提供服务,因为同时处理的客户端连接数更多,从而提高了服务并发度,从而更好地利用了CPU。

Reactor-MS并发模型具体如何实现呢?让我们来看一下对应的代码epollreactorthreadpoolms.cpp,它的内容如下。

#include <arpa/inet.h>
#include <assert.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

#include "../cmdline.h"
#include "../epollctl.hpp"

using namespace std;

int *EpollFd;
int EpollInitCnt = 0;
std::mutex Mutex;
std::condition_variable Cond;

void waitSubReactor() {
  std::unique_lock<std::mutex> locker(Mutex);
  Cond.wait(locker, []() -> bool { return EpollInitCnt >= EchoServer::GetNProcs(); });
  return;
}

void subReactorNotifyReady() {
  {
    std::unique_lock<std::mutex> locker(Mutex);
    EpollInitCnt++;
  }
  Cond.notify_all();
}

void addToSubHandler(int &index, int clientFd) {
  index++;
  index %= EchoServer::GetNProcs();
  EchoServer::Conn *conn = new EchoServer::Conn(clientFd, EpollFd[index], true);  // 轮询的方式添加到子Reactor线程中
  EchoServer::AddReadEvent(conn);                                                 // 监听可读事件
}

void mainHandler(const char *ip, int port) {
  waitSubReactor();  // 等待所有的从Reactor线程都启动完毕
  int sockFd = EchoServer::CreateListenSocket(ip, port, true);
  if (sockFd < 0) {
    return;
  }
  epoll_event events[2048];
  int epollFd = epoll_create(1024);
  if (epollFd < 0) {
    perror("epoll_create failed");
    return;
  }
  int index = 0;
  EchoServer::Conn conn(sockFd, epollFd, true);
  EchoServer::SetNotBlock(sockFd);
  EchoServer::AddReadEvent(&conn);
  while (true) {
    int num = epoll_wait(epollFd, events, 2048, -1);
    if (num < 0) {
      perror("epoll_wait failed");
      continue;
    }
    // 执行到这里就是有客户端连接到来
    EchoServer::LoopAccept(sockFd, 100000, [&index, epollFd](int clientFd) {
      EchoServer::SetNotBlock(clientFd);
      addToSubHandler(index, clientFd);  // 把连接迁移到subHandler线程中管理
    });
  }
}

void subHandler(int threadId) {
  epoll_event events[2048];
  int epollFd = epoll_create(1024);
  if (epollFd < 0) {
    perror("epoll_create failed");
    return;
  }
  EpollFd[threadId] = epollFd;
  subReactorNotifyReady();
  while (true) {
    int num = epoll_wait(epollFd, events, 2048, -1);
    if (num < 0) {
      perror("epoll_wait failed");
      continue;
    }
    for (int i = 0; i < num; i++) {
      EchoServer::Conn *conn = (EchoServer::Conn *)events[i].data.ptr;
      auto releaseConn = [&conn]() {
        EchoServer::ClearEvent(conn);
        delete conn;
      };
      if (events[i].events & EPOLLIN) {  // 可读
        if (not conn->Read()) {          // 执行非阻塞读
          releaseConn();
          continue;
        }
        if (conn->OneMessage()) {             // 判断是否要触发写事件
          EchoServer::ModToWriteEvent(conn);  // 修改成只监控可写事件
        }
      }
      if (events[i].events & EPOLLOUT) {  // 可写
        if (not conn->Write()) {          // 执行非阻塞写
          releaseConn();
          continue;
        }
        if (conn->FinishWrite()) {  // 完成了请求的应答写,则可以释放连接
          releaseConn();
        }
      }
    }
  }
}

void usage() {
  cout << "./EpollReactorThreadPoolMS -ip 0.0.0.0 -port 1688" << endl;
  cout << "options:" << endl;
  cout << "    -h,--help      print usage" << endl;
  cout << "    -ip,--ip       listen ip" << endl;
  cout << "    -port,--port   listen port" << endl;
  cout << endl;
}

int main(int argc, char *argv[]) {
  string ip;
  int64_t port;
  CmdLine::StrOptRequired(&ip, "ip");
  CmdLine::Int64OptRequired(&port, "port");
  CmdLine::SetUsage(usage);
  CmdLine::Parse(argc, argv);
  EpollFd = new int[EchoServer::GetNProcs()];
  for (int i = 0; i < EchoServer::GetNProcs(); i++) {
    std::thread(subHandler, i).detach();  // 这里需要调用detach,让创建的线程独立运行
  }
  for (int i = 0; i < EchoServer::GetNProcs(); i++) {
    std::thread(mainHandler, ip.c_str(), port).detach();  // 这里需要调用detach,让创建的线程独立运行
  }
  while (true) sleep(1);  // 主线程陷入死循环
  return 0;
}

4.服务压测

我们使用压测工具对不同的并发模型进行了测试,压测工具和并发示例服务运行在同一台16核32G的CentOS云主机上,每个CPU的频率为2.59GHz。由于无法统一压测使用的机器和环境,因此在不同的压测机器或环境中获取的压测结果会存在一定的差异。即使是相同的压测机器和环境,多次压测的结果也存在一定的偏差。

在开始压测之前,需要调整两个系统配置。一个是允许打开的文件描述符的最大数,我们需要将这个配置调整为33万。另一个是本地端口可分配范围,我们需要将这个范围调整为1024~65500。

我们使用BenchMark压测工具对EpollReactorSingleProcess、EpollReactorProcessPool、EpollReactorThreadPool、EpollReactorThreadPoolHSHA、EpollReactorThreadPoolMS进行了压测,请求包大小为4KB,每次压测持续30秒,并记录不同并发量请求的平均耗时。压测结果如下表所示。
在这里插入图片描述

从压测结果可以看出,单进程下性能是最差的、进程池优于线程池,在并发不高的情况下半同步半异步模型相对传统的线程池和进程池模型并未有明显的性能优势。

而主从模式下的线程池模型在高并发下展现出了更优的性能。因为有main线程专门接受连接,所以在高并发时连接的接受不会成为瓶颈,连接接受的快,请求也能更快的得到处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/953627.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MongoDb-01——Mac上安装MongoDb以及相关的简单命令

MongoDb-01——Mac上安装MongoDb以及相关的简单命令 1. 下载、安装1.1 官网下载1.2 关于安装MongoDB1.2.1 官方安装文档1.2.2 Mac安装详细步骤&#xff08;使用brew&#xff09; 2. 启动MongoDB2.1 官方说明2.2 作为macOS服务运行的相关命令2.3 访问 3. 链接并使用mongodb3.1 链…

人工智能与软件开发的未来

人工智能正在从各个方面改变软件开发。尽管许多公司竞相推出人工智能功能&#xff0c;但人工智能的潜力已超出了功能层面&#xff0c;成为大多数SaaS解决方案的基础。当机器学习和人工智能模型应用在SaaS技术后&#xff0c;便能提高各种业务流程的效率。人工智能应被视为新的开…

解决报错“No module named ‘pandas.core.indexes‘”

解决办法&#xff1a; 首先使用看一下你的pandas是不是版本太新了&#xff0c;如果使用2.0.0以上的版本&#xff0c;则会出现这个报错。 可以安装1.x.x的版本。 pip install pandas1.5.3

在Bigemap中怎么添加高清地图呢?

会使用到的工具 bigemap gis office&#xff0c;下载链接&#xff1a;BIGEMAP GIS Office-全能版 打开软件&#xff0c;要提示需要授权和添加地图&#xff0c;然后去点击选择地图这个按钮&#xff0c;列表中有个添加按钮点进去选择添加地图的方式。 第一种方式&#xff1a;通…

多轮面试中的策略和技巧:如何稳步晋级

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

如何用数字化系统延长用户运营周期?如何建立数字化用户体系?

如果说运营是进行用户引流、留存及转化的各个细分环节搭建&#xff0c;精细化运营便是针对各个细分环节&#xff0c;结合用户画像、人群定位、场景拆解及数据分析等细节&#xff0c;对用户展开有针对性的运营策略。要知道&#xff0c;运营需要以用户为中心&#xff0c;没有用户…

优思学院|企业推行精益生产要具备哪些前提条件?

企业界早已充斥着各种方法和策略&#xff0c;试图模仿精益生产和六西格玛管理等成功之路&#xff0c;目标在于通过质量工具的运用来改善业务。然而&#xff0c;许多公司在推行这些方法的过程中都犯了一个大错&#xff1a;他们忽视了背后的企业文化和制度&#xff0c;以及精益生…

电脑批量记账,提高效率和管理质量

在快节奏的商业环境中&#xff0c;记账是一项繁琐但必要的任务。为了提高效率和准确性&#xff0c;越来越多的人和企业寻求电脑批量记账的解决方案。 第一步&#xff1a;首先我们要进入晨曦记账本主页面&#xff0c;并点击“收支类别”在弹出来的文件框里输入好类别&#xff0…

Linux查日志的六种实用方法

工具&#xff08;比Xshell好用&#xff0c;国产且免费&#xff09; 先给大家安利一个软件&#xff1a;FinalShell官网 你打印出了日志&#xff0c;可以直接在这个上面搜索高亮 查日志 # 持续打印最新的日志&#xff0c;300行 tail -300f xxx.log# 查某个值 grep "内容&q…

[SWPUCTF 2022]——Web方向 详细Writeup

SWPUCTF 2022 ez_ez_php 打开环境得到源码 <?php error_reporting(0); if (isset($_GET[file])) {if ( substr($_GET["file"], 0, 3) "php" ) {echo "Nice!!!";include($_GET["file"]);} else {echo "Hacker!!";} }e…

【HSPICE仿真】实战练习(1)基础仿真分析

仿真实战 1. 反相器直流仿真1.1 输入文件2.2 执行仿真3.3 仿真输出控制.lis 文件内容波形文件 3.4 修改输出配置 2. 反相器瞬态分析使用不同宽长比进行仿真 3. 几种不同输入源的比较Pulse SourcePattern SourcePWL Source 1. 反相器直流仿真 1.1 输入文件 仿真所用电路图&…

MATLAB中mod函数转化为C语言

背景 有项目算法使用matlab中mod函数进行运算&#xff0c;这里需要将转化为C语言&#xff0c;从而模拟算法运行&#xff0c;将算法移植到qt。 MATLAB中mod简单介绍 语法 b mod(a,m) 说明 b mod(a,m) 返回 a 除以 m 后的余数&#xff0c;其中 a 是被除数&#xff0c;m 是…

使用ccs中 exclude from build功能,源代码不能去除/恢复到工程里

1、使用ccs免不了将源文件从工程里去除&#xff0c;或者重新添加到工程里&#xff0c;一般使用功能exclude from build&#xff0c;如下示&#xff1a;在.c上有键就可以看到 2、有时候用这个功能时&#xff0c;经常会出现ccs没有反应了&#xff0c;不能正常将源代码去除/恢复到…

.NET Meetup in Shanghai

点击蓝字 关注我们 作为一个开源的开发平台&#xff0c;.NET 在开源领域的探索从未止步。在如今风云变幻的大背景下&#xff0c;.NET 开源都会遇到哪些阻力&#xff1f;是什么让我们继续拥抱开源&#xff1f;我们将如何克服当下开源之路所面临的困难&#xff1f;开源 .NET 又将…

CTFhub-文件上传-.htaccess

首先上传 .htaccess 的文件 .htaccess SetHandler application/x-httpd-php 这段内容的作用是使所有的文件都会被解析为php文件 然后上传1.jpg 的文件 内容为一句话木马 1.jpg <?php echo "PHP Loaded"; eval($_POST[a]); ?> 用蚁剑连接 http://ch…

成都瀚网科技:抖音商家如何报名超值购?

为保护抖音消费者权益、规范商家经营行为&#xff0c;抖音对商城超值购物、招商引资等方面做出了详细规定。我们来看看详细信息&#xff1a; 第一条【宗旨和依据】为了倡导开放、透明、共享、责任的新商业文明&#xff0c;保护抖音商城用户和商户的合法权益&#xff0c;合理利用…

十九、状态模式

一、什么是状态模式 状态&#xff08;State&#xff09;模式的定义&#xff1a;对有状态的对象&#xff0c;把复杂的“判断逻辑”提取到不同的状态对象中&#xff0c;允许状态对象在其内部状态发生改变时改变其行为。 状态模式包含以下主要角色&#xff1a; 环境类&#xff08…

关于浏览器中使用迅雷组件下载文件的问题

目录 前言 场景 问题 解决 前言 在项目开发中肯定会涉及到下载导出功能&#xff0c;对于开发人员来说一般习惯使用谷歌、火狐等其他浏览器进行功能测试&#xff0c;例如谷歌浏览器支持加入扩展程序&#xff0c;扩展程序的位置在&#xff1a;点击右上角三个点>找到设置点开…

Windows和Linux环境中安装Zookeeper具体操作

1.Windows环境中安装Zookeeper 1.1 下载Zookeeper安装包 ZooKeeper官网下载地址 建议下载稳定版本的 下载后进行解压后得到如下文件&#xff1a; 1.2 修改本地配置文件 进入解压后的目录&#xff0c;将zoo_example.cfg复制一份并重命名为zoo.cfg,如图所示&#xff1a; 打…

飞天使-python的面向对象

文章目录 面向对象面向对象思想类的定义和使用继承封装多态访问控制 参考视频 面向对象 面向对象思想 面向过程和面对对象的区别是什么&#xff1f; 答: 复用性高&#xff0c;面向对象类的定义和使用 类型里面的定义的时候 self 不能省去&#xff0c;应该写出 class person:…