c++ socket之io复用模型 epoll进阶

news2025/1/15 23:45:41

服务器开发系列


文章目录

  • 服务器开发系列
  • 前言
  • 一、socket epoll介绍
  • 二、代码实现
    • 1. epoll client实现
    • 2. epoll server实现
    • 3. epoll client server验证
  • 总结


前言

I/O复用模型:主要是指,一个线程可以同时监控多个系统IO、并且能够操作多个系统IO的一种技术模型。
select/poll/epoll核心是可以通过一个线程同时处理多个socket链接,并不会使得每次操作io的过程变快。因此,IO复用模型的设计并不是位了快,而是为了解决线程、进程数量过多(数量过多会导致系统频繁切换cpu资源,造成操作系统压力)对服务器开销造成的压力。
IO复用模型是通过一种机制,一个线程可以同时监控多个socket,其中某些socket就绪(读、写事件就绪)后能快速通知程序进程对应的IO操作。IO复用模型一般为异步阻塞IO,但select、poll、epoll本质上还是同步IO(都是需要读写事件就绪后再进行读写,而且整个读写的过程是阻塞的),真正意义上的异步IO是不需要负责进行读写操作的。


一、socket epoll介绍

本文主要讲解epoll方式的IO复用方式。通过对比epoll与select的差异,让小伙伴对IO复用有进一步深入的理解。
epoll相对select、poll优点
1.select的句柄数目受限,在linux/posix_types.h头文件有这样的声明:#define __FD_SETSIZE 1024 表示select最多同时监听1024个fd。而epoll没有,它的限制是最大的打开文件句柄数目。
2.epoll的最大好处是不会随着FD的数目增长而降低效率,在selec中采用轮询处理,其中的数据结构类似一个数组的数据结构,而epoll 是维护一个队列,直接看队列是不是空就可以了。epoll只会对"活跃"的socket进行操作—这是因为在内核实现中epoll是根据每个fd上面 的callback函数实现的。那么,只有"活跃"的socket才会主动的去调用 callback函数(把这个句柄加入队列),其他idle状态句柄则不会,在这点上,epoll实现了一个"伪"AIO。但是如果绝大部分的I/O都是 “活跃的”,每个I/O端口使用率很高的话,epoll效率不一定比select高(可能是要维护队列复杂)。
3.使用mmap加速内核与用户空间的消息传递。无论是select,poll还是epoll都需要内核把FD消息通知给用户空间,如何避免不必要的内存拷贝就很重要,在这点上,epoll是通过内核于用户空间mmap同一块内存实现的

epoll实现流程
总结下,epoll 相关的函数⾥内核运⾏环境分两部分:

⽤户进程内核态。进⾏调⽤ epoll_wait 等函数时会将进程陷⼊内核态来执⾏。这部分代码负责查看接收队列,以及负责把当前进程阻塞掉,让出 CPU。
硬软中断上下⽂。在这些组件中,将包从⽹卡接收过来进⾏处理,然后放到 socket 的接收队列。对于 epoll 来说,再找到 socket 关联的 epitem,并把它添加到 epoll 对象的就绪链表中。 这个时候再捎带检查⼀下 epoll 上是否有被阻塞的进程,如果有唤醒之。
在这里插入图片描述

二、代码实现

1. epoll client实现

#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <cstring>
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cassert>

#include <string>
#include <iostream>
#include <memory>
#include <functional>
#include <thread>


namespace mux {
namespace transport {

static const uint32_t kEpollWaitTime = 10; // 10 ms
static const uint32_t kMaxEvents = 100;

typedef struct Packet {
public:
    Packet()
        : msg { "" } {}
    Packet(const std::string& msg)
        : msg { msg } {}
    Packet(int fd, const std::string& msg)
        : fd(fd),
          msg(msg) {}

    int fd { -1 };
    std::string msg;
} Packet;

typedef std::shared_ptr<Packet> PacketPtr;

using callback_recv_t = std::function<void(const PacketPtr& data)>;

class EpollTcpBase {
public:
    EpollTcpBase()                                     = default;
    EpollTcpBase(const EpollTcpBase& other)            = delete;
    EpollTcpBase& operator=(const EpollTcpBase& other) = delete;
    EpollTcpBase(EpollTcpBase&& other)                 = delete;
    EpollTcpBase& operator=(EpollTcpBase&& other)      = delete;
    virtual ~EpollTcpBase()                            = default; 
public:
    virtual bool Start() = 0;
    virtual bool Stop()  = 0;
    virtual int32_t SendData(const PacketPtr& data) = 0;
    virtual void RegisterOnRecvCallback(callback_recv_t callback) = 0;
    virtual void UnRegisterOnRecvCallback() = 0;
};
using ETBase = EpollTcpBase;
typedef std::shared_ptr<ETBase> ETBasePtr;


class EpollTcpClient : public ETBase {
public:
    EpollTcpClient()                                       = default;
    EpollTcpClient(const EpollTcpClient& other)            = delete;
    EpollTcpClient& operator=(const EpollTcpClient& other) = delete;
    EpollTcpClient(EpollTcpClient&& other)                 = delete;
    EpollTcpClient& operator=(EpollTcpClient&& other)      = delete;
    ~EpollTcpClient() override;

    EpollTcpClient(const std::string& server_ip, uint16_t server_port);

public:
    bool Start() override;
    bool Stop() override;
    int32_t SendData(const PacketPtr& data) override;
    void RegisterOnRecvCallback(callback_recv_t callback) override;
    void UnRegisterOnRecvCallback() override;

protected:
    int32_t CreateEpoll();
    int32_t CreateSocket();
    int32_t Connect(int32_t listenfd);
    int32_t UpdateEpollEvents(int efd, int op, int fd, int events);
    void OnSocketRead(int32_t fd);
    void OnSocketWrite(int32_t fd);
    void EpollLoop();


private:
    std::string server_ip_;
    uint16_t server_port_ { 0 };
    int32_t handle_ { -1 };    // client fd
    int32_t efd_ { -1 };       // epoll fd
    std::shared_ptr<std::thread> th_loop_ { nullptr };
    bool loop_flag_ { true };
    callback_recv_t recv_callback_ { nullptr };
};
using ETClient = EpollTcpClient;
typedef std::shared_ptr<ETClient> ETClientPtr;



EpollTcpClient::EpollTcpClient(const std::string& server_ip, uint16_t server_port)
    : server_ip_ { server_ip },
      server_port_ { server_port } {
}

EpollTcpClient::~EpollTcpClient() {
    Stop();
}

bool EpollTcpClient::Start() {
    if (CreateEpoll() < 0) {
        return false;
    }
    // create socket and bind
    int cli_fd  = CreateSocket();
    if (cli_fd < 0) {
        return false;
    }

    int lr = Connect(cli_fd);
    if (lr < 0) {
        return false;
    }
    std::cout << "EpollTcpClient Init success!" << std::endl;
    handle_ = cli_fd;

    int er = UpdateEpollEvents(efd_, EPOLL_CTL_ADD, handle_, EPOLLIN | EPOLLET);
    if (er < 0) {
        ::close(handle_);
        return false;
    }

    assert(!th_loop_);

    th_loop_ = std::make_shared<std::thread>(&EpollTcpClient::EpollLoop, this);
    if (!th_loop_) {
        return false;
    }
    th_loop_->detach();

    return true;
}


bool EpollTcpClient::Stop() {
    loop_flag_ = false;
    ::close(handle_);
    ::close(efd_);
    std::cout << "stop epoll!" << std::endl;
    UnRegisterOnRecvCallback();
    return true;
}

int32_t EpollTcpClient::CreateEpoll() {
    int epollfd = epoll_create(1);
    if (epollfd < 0) {
        std::cout << "epoll_create failed!" << std::endl;
        return -1;
    }
    efd_ = epollfd;
    return epollfd;
}

int32_t EpollTcpClient::CreateSocket() {
    int cli_fd = ::socket(AF_INET, SOCK_STREAM, 0);
    if (cli_fd < 0) {
        std::cout << "create socket failed!" << std::endl;
        return -1;
    }

    return cli_fd;
}

int32_t EpollTcpClient::Connect(int32_t cli_fd) {
    struct sockaddr_in addr;  // server info
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(server_port_);
    addr.sin_addr.s_addr  = inet_addr(server_ip_.c_str());

    int r = ::connect(cli_fd, (struct sockaddr*)&addr, sizeof(addr));
    if ( r < 0) {
        std::cout << "connect failed! r=" << r << " errno:" << errno << std::endl;
        return -1;
    }
    return 0;
}

int32_t EpollTcpClient::UpdateEpollEvents(int efd, int op, int fd, int events) {
    struct epoll_event ev;
    memset(&ev, 0, sizeof(ev));
    ev.events = events;
    ev.data.fd = fd;
    fprintf(stdout,"%s fd %d events read %d write %d\n", op == EPOLL_CTL_MOD ? "mod" : "add", fd, ev.events & EPOLLIN, ev.events & EPOLLOUT);
    int r = epoll_ctl(efd, op, fd, &ev);
    if (r < 0) {
        std::cout << "epoll_ctl failed!" << std::endl;
        return -1;
    }
    return 0;
}

void EpollTcpClient::RegisterOnRecvCallback(callback_recv_t callback) {
    assert(!recv_callback_);
    recv_callback_ = callback;
}

void EpollTcpClient::UnRegisterOnRecvCallback() {
    assert(recv_callback_);
    recv_callback_ = nullptr;
}

// handle read events on fd
void EpollTcpClient::OnSocketRead(int32_t fd) {
    char read_buf[4096];
    bzero(read_buf, sizeof(read_buf));
    int n = -1;
    while ( (n = ::read(fd, read_buf, sizeof(read_buf))) > 0) {
        // callback for recv
        std::string msg(read_buf, n);
        PacketPtr data = std::make_shared<Packet>(fd, msg);
        if (recv_callback_) {
            recv_callback_(data);
        }
    }
    if (n == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            // read finished
            return;
        }
        // something goes wrong for this fd, should close it
        ::close(fd);
        return;
    }
    if (n == 0) {
        // this may happen when client close socket. EPOLLRDHUP usually handle this, but just make sure; should close this fd
        ::close(fd);
        return;
    }
}

// handle write events on fd (usually happens when sending big files)
void EpollTcpClient::OnSocketWrite(int32_t fd) {
    std::cout << "fd: " << fd << " writeable!" << std::endl;
}

int32_t EpollTcpClient::SendData(const PacketPtr& data) {
    int r = ::write(handle_, data->msg.data(), data->msg.size());
    if (r == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return -1;
        }
        // error happend
        ::close(handle_);
        std::cout << "fd: " << handle_ << " write error, close it!" << std::endl;
        return -1;
    }
    return r;
}

void EpollTcpClient::EpollLoop() {
    struct epoll_event* alive_events =  static_cast<epoll_event*>(calloc(kMaxEvents, sizeof(epoll_event)));
    if (!alive_events) {
        std::cout << "calloc memory failed for epoll_events!" << std::endl;
        return;
    }
    while (loop_flag_) {
        int num = epoll_wait(efd_, alive_events, kMaxEvents, kEpollWaitTime);

        for (int i = 0; i < num; ++i) {
            int fd = alive_events[i].data.fd;
            int events = alive_events[i].events;

            if ( (events & EPOLLERR) || (events & EPOLLHUP) ) {
                std::cout << "epoll_wait error!" << std::endl;
                // An error has occured on this fd, or the socket is not ready for reading (why were we notified then?).
                ::close(fd);
            } else  if (events & EPOLLRDHUP) {
                // Stream socket peer closed connection, or shut down writing half of connection.
                // more inportant, We still to handle disconnection when read()/recv() return 0 or -1 just to be sure.
                std::cout << "fd:" << fd << " closed EPOLLRDHUP!" << std::endl;
                // close fd and epoll will remove it
                ::close(fd);
            } else if ( events & EPOLLIN ) {
                // other fd read event coming, meaning data coming
                OnSocketRead(fd);
            } else if ( events & EPOLLOUT ) {
                // write event for fd (not including listen-fd), meaning send buffer is available for big files
                OnSocketWrite(fd);
            } else {
                std::cout << "unknow epoll event!" << std::endl;
            }
        } // end for (int i = 0; ...

    } // end while (loop_flag_)
    free(alive_events);
}


} // end namespace transport
} // end namespace mux


using namespace mux;
using namespace mux::transport;

int main(int argc, char* argv[]) {
    std::string server_ip {"127.0.0.1"};
    uint16_t server_port { 6666 };
    if (argc >= 2) {
        server_ip = std::string(argv[1]);
    }
    if (argc >= 3) {
        server_port = std::atoi(argv[2]);
    }

    auto tcp_client = std::make_shared<EpollTcpClient>(server_ip, server_port);
    if (!tcp_client) {
        std::cout << "tcp_client create faield!" << std::endl;
        exit(-1);
    }

    auto recv_call = [&](const transport::PacketPtr& data) -> void {
        std::cout << "recv: " << data->msg << std::endl;
        return;
    };

    tcp_client->RegisterOnRecvCallback(recv_call);

    if (!tcp_client->Start()) {
        std::cout << "tcp_client start failed!" << std::endl;
        exit(1);
    }
    std::cout << "############tcp_client started!################" << std::endl;

    std::string msg;
    while (true) {
        std::cout << std::endl << "input:";
        std::getline(std::cin, msg);
        auto packet = std::make_shared<Packet>(msg);
        tcp_client->SendData(packet);
        //std::this_thread::sleep_for(std::chrono::seconds(1));
    }

    tcp_client->Stop();

    return 0;
}


/*
g++ epoll_client.cpp  -o epoll_client -std=c++11 -lpthread
*/

2. epoll server实现

#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <cstring>
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cassert>

#include <iostream>
#include <string>
#include <thread>
#include <memory>
#include <functional>


namespace mux {
namespace transport {

static const uint32_t kEpollWaitTime = 10; // epoll wait timeout 10 ms
static const uint32_t kMaxEvents = 100;    // epoll wait return max size
// packet of send/recv binary content
typedef struct Packet {
public:
    Packet()
        : msg { "" } {}
    Packet(const std::string& msg)
        : msg { msg } {}
    Packet(int fd, const std::string& msg)
        : fd(fd),
          msg(msg) {}

    int fd { -1 };     // meaning socket
    std::string msg;   // real binary content
} Packet;

typedef std::shared_ptr<Packet> PacketPtr;
// callback when packet received
using callback_recv_t = std::function<void(const PacketPtr& data)>;

// base class of EpollTcpServer, focus on Start(), Stop(), SendData(), RegisterOnRecvCallback()...
class EpollTcpBase {
public:
    EpollTcpBase()                                     = default;
    EpollTcpBase(const EpollTcpBase& other)            = delete;
    EpollTcpBase& operator=(const EpollTcpBase& other) = delete;
    EpollTcpBase(EpollTcpBase&& other)                 = delete;
    EpollTcpBase& operator=(EpollTcpBase&& other)      = delete;
    virtual ~EpollTcpBase()                            = default; 
public:
    virtual bool Start() = 0;
    virtual bool Stop()  = 0;
    virtual int32_t SendData(const PacketPtr& data) = 0;
    virtual void RegisterOnRecvCallback(callback_recv_t callback) = 0;
    virtual void UnRegisterOnRecvCallback() = 0;
};

using ETBase = EpollTcpBase;
typedef std::shared_ptr<ETBase> ETBasePtr;
// the implementation of Epoll Tcp Server
class EpollTcpServer : public ETBase {
public:
    EpollTcpServer()                                       = default;
    EpollTcpServer(const EpollTcpServer& other)            = delete;
    EpollTcpServer& operator=(const EpollTcpServer& other) = delete;
    EpollTcpServer(EpollTcpServer&& other)                 = delete;
    EpollTcpServer& operator=(EpollTcpServer&& other)      = delete;
    ~EpollTcpServer() override;
    // the local ip and port of tcp server
    EpollTcpServer(const std::string& local_ip, uint16_t local_port);
public:
    // start tcp server
    bool Start() override;
    // stop tcp server
    bool Stop() override;
    // send packet
    int32_t SendData(const PacketPtr& data) override;
    // register a callback when packet received
    void RegisterOnRecvCallback(callback_recv_t callback) override;
    void UnRegisterOnRecvCallback() override;
protected:
    // create epoll instance using epoll_create and return a fd of epoll
    int32_t CreateEpoll();
    // create a socket fd using api socket()
    int32_t CreateSocket();
    // set socket noblock
    int32_t MakeSocketNonBlock(int32_t fd);
    // listen() 
    int32_t Listen(int32_t listenfd);
    // add/modify/remove a item(socket/fd) in epoll instance(rbtree), for this example, just add a socket to epoll rbtree
    int32_t UpdateEpollEvents(int efd, int op, int fd, int events);

    // handle tcp accept event
    void OnSocketAccept();
    // handle tcp socket readable event(read())
    void OnSocketRead(int32_t fd);
    // handle tcp socket writeable event(write())
    void OnSocketWrite(int32_t fd);
    // one loop per thread, call epoll_wait and return ready socket(accept,readable,writeable,error...)
    void EpollLoop();
private:
    std::string local_ip_; // tcp local ip
    uint16_t local_port_ { 0 }; // tcp bind local port
    int32_t handle_ { -1 }; // listenfd
    int32_t efd_ { -1 }; // epoll fd
    std::shared_ptr<std::thread> th_loop_ { nullptr }; // one loop per thread(call epoll_wait in loop)
    bool loop_flag_ { true }; // if loop_flag_ is false, then exit the epoll loop
    callback_recv_t recv_callback_ { nullptr }; // callback when received
};

using ETServer = EpollTcpServer;

typedef std::shared_ptr<ETServer> ETServerPtr;


EpollTcpServer::EpollTcpServer(const std::string& local_ip, uint16_t local_port)
    : local_ip_ { local_ip },
      local_port_ { local_port } {
}

EpollTcpServer::~EpollTcpServer() {
    Stop();
}

bool EpollTcpServer::Start() {
    // create epoll instance
    if (CreateEpoll() < 0) {
        return false;
    }
    // create socket and bind
    int listenfd = CreateSocket();
    if (listenfd < 0) {
        return false;
    }
    // set listen socket noblock
    int mr = MakeSocketNonBlock(listenfd);
    if (mr < 0) {
        return false;
    }

    // call listen()
    int lr = Listen(listenfd);
    if (lr < 0) {
        return false;
    }
    std::cout << "EpollTcpServer Init success!" << std::endl;
    handle_ = listenfd;
    // add listen socket to epoll instance, and focus on event EPOLLIN and EPOLLOUT, actually EPOLLIN is enough
    int er = UpdateEpollEvents(efd_, EPOLL_CTL_ADD, handle_, EPOLLIN | EPOLLET);
    if (er < 0) {
        // if something goes wrong, close listen socket and return false
        ::close(handle_);
        return false;
    }

    assert(!th_loop_);

    // the implementation of one loop per thread: create a thread to loop epoll
    th_loop_ = std::make_shared<std::thread>(&EpollTcpServer::EpollLoop, this);
    if (!th_loop_) {
        return false;
    }
    // detach the thread(using loop_flag_ to control the start/stop of loop)
    th_loop_->detach();
    return true;
}


// stop epoll tcp server and release epoll
bool EpollTcpServer::Stop() {
    // set loop_flag_ false to stop epoll loop
    loop_flag_ = false;
    ::close(handle_);
    ::close(efd_);
    std::cout << "stop epoll!" << std::endl;
    UnRegisterOnRecvCallback();
    return true;
}

int32_t EpollTcpServer::CreateEpoll() {
    // the basic epoll api of create a epoll instance
    int epollfd = epoll_create(1);
    if (epollfd < 0) {
        // if something goes wrong, return -1
        std::cout << "epoll_create failed!" << std::endl;
        return -1;
    }
    efd_ = epollfd;
    return epollfd;
}

int32_t EpollTcpServer::CreateSocket() {
    // create tcp socket
    int listenfd = ::socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0) {
        std::cout << "create socket " << local_ip_ << ":" << local_port_ << " failed!" << std::endl;
        return -1;
    }
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(local_port_);
    addr.sin_addr.s_addr  = inet_addr(local_ip_.c_str());

    // bind to local ip and local port
    int r = ::bind(listenfd, (struct sockaddr*)&addr, sizeof(struct sockaddr));
    if (r != 0) {
        std::cout << "bind socket " << local_ip_ << ":" << local_port_ << " failed!" << std::endl;
        ::close(listenfd);
        return -1;
    }
    std::cout << "create and bind socket " << local_ip_ << ":" << local_port_ << " success!" << std::endl;
    return listenfd;
}

// set noblock fd
int32_t EpollTcpServer::MakeSocketNonBlock(int32_t fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0) {
        std::cout << "fcntl failed!" << std::endl;
        return -1;
    }
    int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
    if (r < 0) {
        std::cout << "fcntl failed!" << std::endl;
        return -1;
    }
    return 0;
}

// call listen() api and set listen queue size using SOMAXCONN
int32_t EpollTcpServer::Listen(int32_t listenfd) {
    int r = ::listen(listenfd, SOMAXCONN);
    if ( r < 0) {
        std::cout << "listen failed!" << std::endl;
        return -1;
    }
    return 0;
}

// add/modify/remove a item(socket/fd) in epoll instance(rbtree), for this example, just add a socket to epoll rbtree
int32_t EpollTcpServer::UpdateEpollEvents(int efd, int op, int fd, int events) {
    struct epoll_event ev;
    memset(&ev, 0, sizeof(ev));
    ev.events = events;
    ev.data.fd = fd; // ev.data is a enum
    fprintf(stdout,"%s fd %d events read %d write %d\n", op == EPOLL_CTL_MOD ? "mod" : "add", fd, ev.events & EPOLLIN, ev.events & EPOLLOUT);
    int r = epoll_ctl(efd, op, fd, &ev);
    if (r < 0) {
        std::cout << "epoll_ctl failed!" << std::endl;
        return -1;
    }
    return 0;
}

// handle accept event
void EpollTcpServer::OnSocketAccept() {
    // epoll working on et mode, must read all coming data, so use a while loop here
    while (true) {
        struct sockaddr_in in_addr;
        socklen_t in_len = sizeof(in_addr);

        // accept a new connection and get a new socket
        int cli_fd = accept(handle_, (struct sockaddr*)&in_addr, &in_len);
        if (cli_fd == -1) {
            if ( (errno == EAGAIN) || (errno == EWOULDBLOCK) ) {
                // read all accept finished(epoll et mode only trigger one time,so must read all data in listen socket)
                std::cout << "accept all coming connections!" << std::endl;
                break;
            } else {
                std::cout << "accept error!" << std::endl;
                continue;
            }
        }

        sockaddr_in peer;
        socklen_t p_len = sizeof(peer);
        // get client ip and port
        int r = getpeername(cli_fd, (struct sockaddr*)&peer, &p_len);
        if (r < 0) {
            std::cout << "getpeername error!" << std::endl;
            continue;
        }
        std::cout << "accpet connection from " << inet_ntoa(in_addr.sin_addr) << std::endl;
        int mr = MakeSocketNonBlock(cli_fd);
        if (mr < 0) {
            ::close(cli_fd);
            continue;
        }
        //  add this new socket to epoll instance, and focus on EPOLLIN and EPOLLOUT and EPOLLRDHUP event
        int er = UpdateEpollEvents(efd_, EPOLL_CTL_ADD, cli_fd, EPOLLIN | EPOLLRDHUP | EPOLLET);
        if (er < 0 ) {
            // if something goes wrong, close this new socket
            ::close(cli_fd);
            continue;
        }
    }
}

// register a callback when packet received
void EpollTcpServer::RegisterOnRecvCallback(callback_recv_t callback) {
    assert(!recv_callback_);
    recv_callback_ = callback;
}

void EpollTcpServer::UnRegisterOnRecvCallback() {
    assert(recv_callback_);
    recv_callback_ = nullptr;
}

// handle read events on fd
void EpollTcpServer::OnSocketRead(int32_t fd) {
    char read_buf[4096];
    bzero(read_buf, sizeof(read_buf));
    int n = -1;
    // epoll working on et mode, must read all data
    while ( (n = ::read(fd, read_buf, sizeof(read_buf))) > 0) {
        // callback for recv
        std::cout << "fd: " << fd <<  " recv: " << read_buf << std::endl;
        std::string msg(read_buf, n);
        // create a recv packet
        PacketPtr data = std::make_shared<Packet>(fd, msg);
        if (recv_callback_) {
            // handle recv packet
            recv_callback_(data);
        }
    }
    if (n == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            // read all data finished
            return;
        }
        // something goes wrong for this fd, should close it
        ::close(fd);
        return;
    }
    if (n == 0) {
        // this may happen when client close socket. EPOLLRDHUP usually handle this, but just make sure; should close this fd
        ::close(fd);
        return;
    }
}

// handle write events on fd (usually happens when sending big files)
void EpollTcpServer::OnSocketWrite(int32_t fd) {
    // TODO(smaugx) not care for now
    std::cout << "fd: " << fd << " writeable!" << std::endl;
}

// send packet
int32_t EpollTcpServer::SendData(const PacketPtr& data) {
    if (data->fd == -1) {
        return -1;
    }
    // send packet on fd
    int r = ::write(data->fd, data->msg.data(), data->msg.size());
    if (r == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return -1;
        }
        // error happend
        ::close(data->fd);
        std::cout << "fd: " << data->fd << " write error, close it!" << std::endl;
        return -1;
    }
    std::cout << "fd: " << data->fd << " write size: " << r << " ok!" << std::endl;
    return r;
}

// one loop per thread, call epoll_wait and handle all coming events
void EpollTcpServer::EpollLoop() {
    // request some memory, if events ready, socket events will copy to this memory from kernel
    struct epoll_event* alive_events =  static_cast<epoll_event*>(calloc(kMaxEvents, sizeof(epoll_event)));
    if (!alive_events) {
        std::cout << "calloc memory failed for epoll_events!" << std::endl;
        return;
    }
    // if loop_flag_ is false, will exit this loop
    while (loop_flag_) {
        // call epoll_wait and return ready socket
        int num = epoll_wait(efd_, alive_events, kMaxEvents, kEpollWaitTime);

        for (int i = 0; i < num; ++i) {
            // get fd
            int fd = alive_events[i].data.fd;
            // get events(readable/writeable/error)
            int events = alive_events[i].events;

            if ( (events & EPOLLERR) || (events & EPOLLHUP) ) {
                std::cout << "epoll_wait error!" << std::endl;
                // An error has occured on this fd, or the socket is not ready for reading (why were we notified then?).
                ::close(fd);
            } else  if (events & EPOLLRDHUP) {
                // Stream socket peer closed connection, or shut down writing half of connection.
                // more inportant, We still to handle disconnection when read()/recv() return 0 or -1 just to be sure.
                std::cout << "fd:" << fd << " closed EPOLLRDHUP!" << std::endl;
                // close fd and epoll will remove it
                ::close(fd);
            } else if ( events & EPOLLIN ) {
                std::cout << "epollin" << std::endl;
                if (fd == handle_) {
                    // listen fd coming connections
                    OnSocketAccept();
                } else {
                    // other fd read event coming, meaning data coming
                    OnSocketRead(fd);
                }
            } else if ( events & EPOLLOUT ) {
                std::cout << "epollout" << std::endl;
                // write event for fd (not including listen-fd), meaning send buffer is available for big files
                OnSocketWrite(fd);
            } else {
                std::cout << "unknow epoll event!" << std::endl;
            }
        } // end for (int i = 0; ...
    } // end while (loop_flag_)
    free(alive_events);
}
} // end namespace transport
} // end namespace mux


using namespace mux;
using namespace transport;
int main(int argc, char* argv[]) {
    std::string local_ip {"127.0.0.1"};
    uint16_t local_port { 6666 };
    if (argc >= 2) {
        local_ip = std::string(argv[1]);
    }
    if (argc >= 3) {
        local_port = std::atoi(argv[2]);
    }
    // create a epoll tcp server
    auto epoll_server = std::make_shared<EpollTcpServer>(local_ip, local_port);
    if (!epoll_server) {
        std::cout << "tcp_server create faield!" << std::endl;
        exit(-1);
    }

    // recv callback in lambda mode, you can set your own callback here
    auto recv_call = [&](const PacketPtr& data) -> void {
        // just echo packet
        epoll_server->SendData(data);
        return;
    };

    // register recv callback to epoll tcp server
    epoll_server->RegisterOnRecvCallback(recv_call);

    // start the epoll tcp server
    if (!epoll_server->Start()) {
        std::cout << "tcp_server start failed!" << std::endl;
        exit(1);
    }
    std::cout << "############tcp_server started!################" << std::endl;

    // block here 
    while (true) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    epoll_server->Stop();
    return 0;
}



/*
g++ epoll_server2.cpp  -o epoll_server2 -std=c++11 -lpthread
*/

3. epoll client server验证

在这里插入图片描述


总结

通过本文的学习,你应该对epoll有了深入的 理解,希望的对你有所帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/162450.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

西瓜书第一章课后题答案(一)

1.1 针对西瓜分类分题进行讲解属性&#xff1a; 3个属性色泽&#xff1a;&#xff08;青绿&#xff0c;乌黑&#xff0c;浅白&#xff09;根蒂&#xff1a;&#xff08;蜷缩&#xff0c;硬挺&#xff0c;稍蜷&#xff09;敲声&#xff1a;&#xff08;浊响&#xff0c;清脆&…

Docker网络network详解

一、概述 Docker容器每次重启后容器ip是会发生变化的。 这也意味着如果容器间使用ip地址来进行通信的话&#xff0c;一旦有容器重启&#xff0c;重启的容器将不再能被访问到。 而Docker 网络就能够解决这个问题。 Docker 网络主要有以下两个作用&#xff1a; 容器间的互联和…

【ROS2入门】理解 ROS 2 节点

大家好&#xff0c;我是虎哥&#xff0c;从今天开始&#xff0c;我将花一段时间&#xff0c;开始将自己从ROS1切换到ROS2&#xff0c;在上一篇中&#xff0c;我们依托Turtlesim演示节点来逐步展开&#xff0c;介绍了rqt工具&#xff0c;这一章&#xff0c;我们将围绕ROS2中主要…

jvm快速入门

1.JVM介绍 1.什么是jvm Java Virtual Machine&#xff08;java二进制字节码运行环境&#xff09; 好处&#xff1a; 一次编译&#xff0c;好处运行自动内存管理&#xff0c;垃圾回收机制数组下标越界检查多态 比较JVM\JRE\JDK jvm屏蔽java代码与底层操作系统的差异 JREJVM基…

基于 java springboot+layui仓库管理系统设计和实现

基于 java springbootlayui仓库管理系统设计和实现 博主介绍&#xff1a;5年java开发经验&#xff0c;专注Java开发、定制、远程、文档编写指导等,csdn特邀作者、专注于Java技术领域 作者主页 超级帅帅吴 Java毕设项目精品实战案例《500套》 欢迎点赞 收藏 ⭐留言 文末获取源码…

基于基于jsp+mysql+Spring+mybatis的SSM汽车保险理赔管理系统设计和实现

基于基于jspmysqlSpringmybatis的SSM汽车保险理赔管理系统设计和实现 博主介绍&#xff1a;5年java开发经验&#xff0c;专注Java开发、定制、远程、文档编写指导等,csdn特邀作者、专注于Java技术领域 作者主页 超级帅帅吴 Java毕设项目精品实战案例《500套》 欢迎点赞 收藏 ⭐…

12图、网络、关联矩阵

第 12 讲 图、网络、关联矩阵 Graphs&#xff0c;networks&#xff0c;incidence matrices 本讲讨论线性代数在物理系统中的应用。 图和网络 Graphs & Networks “图”就是“结点”和“边”的一个集合。 边线上的箭头代表从结点流出的正方向。 关联矩阵&#xff08;I…

独立开发变现周刊(第87期):靠写简历如何每年赚24万美元?

分享独立开发、产品变现相关内容&#xff0c;每周五发布。目录1、mall4cloud: 开源的微服务B2B2C电商商城系统2、restorePhotos&#xff1a;开源的老照片修复系统3、JSON Crack——开源、免费的JSON可视化应用程序4、靠写简历如何每年赚24万美元&#xff1f;1、mall4cloud: 开源…

干货:用户分析的六大方法论(一)

​在日常的用户分析中&#xff0c;常用的有六大分析方法论&#xff1a; 1、行为事件分析2、点击分析模型3、用户行为路径分析4、用户健康度分析5、漏斗模型6、用户画像分析 1.行为事件分析 “行为事件分析”是用户分析的第一步&#xff0c;也是用户分析的核心和基础。一般来说…

代码随想录训练营第五十九天

1.下一个更大元素II 题503 循环数组有两种方法&#xff0c;一是用同一个数组拼接成两个数组&#xff0c;实现假循环&#xff1b;二是遍历两遍&#xff0c;用求余的方法。求余的方法更简便。 class Solution { public:vector<int> nextGreaterElements(vector<int>…

虚拟化技术学习笔记9

KVM存储虚拟化配置&#xff1a; 学习目标&#xff1a; 能够通过virt-manager添加硬盘&#xff1b;能够通过virsh添加硬盘&#xff1b;能够通过xml文件添加硬盘&#xff1b;能够了解本地存储的作用&#xff1b;能够设置本地存储&#xff1b;能够了解网络存储的作用&#xff1b…

黑马程序员SSM框架教程_Spring+SpringMVC+MyBatisPlus笔记(自学用,持续更新)

Spring的实现有两种方式&#xff0c;一是配置&#xff0c;二是注解 目录Spring_day01IOC、DIBean的基本配置、实例化、生命周期Bean的基本配置bean的实例化训练中的不足1&#xff1a;bean的生命周期DI相关内容setter注入构造器注入小结自动注入集合注入Spring_day02Spring_day0…

深入理解计算机系统_程序的加载过程和运行过程

这篇博客记录编译得到可执行目标文件后&#xff0c;加载和运行的过程。 编译得到可执行目标文件后&#xff0c;就可以将“可执行目标文件”加载“运行地址”所指的内存位置&#xff0c;然后运行了。下面记录Linux虚拟内存运行的运行过程。 2.1 程序的加载过程 当在windows下双…

【Java AWT 图形界面编程】LayoutManager 布局管理器 ③ ( BorderLayout 布局 )

文章目录一、BorderLayout 布局二、BorderLayout 构造函数 API三、BorderLayout 代码示例1、BorderLayout 基本用法代码示例2、BorderLayout 区域占用代码示例3、BorderLayout 同一区域显示多个组件代码示例一、BorderLayout 布局 BorderLayout 布局 将 Container 容器 分割成 …

深度学习PyTorch 之 DNN-二分类

本节开始说一下DNN分类的pytorch实现&#xff0c;先说一下二分类 流程还是跟前面一样 #mermaid-svg-7Bxg4CYlbKjYOMMf {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-7Bxg4CYlbKjYOMMf .error-icon{fill:#552222;}…

从校园到职场,听听他们的成长之路

背景介绍 这次分享主题是「从校园到职场 -- 我的成长之路」&#xff0c;视频内容可以查看 B 站链接&#xff1a;从校园到实习再到秋招。 上次的面试分享之后&#xff0c;阿卡拉提到关于刚毕业的学生也会有很多找工作的困扰&#xff0c;而且这个阶段能获取到的信息相对比较有限&…

Java API文档的使用详解

文章目录1. 概念2. 使用Java编程基础教程系列学会使用 API 文档是一个开发者基本的素养&#xff0c;而许多初学者并不会在意 API 文档的使用&#xff0c;甚至从来没有接触过&#xff0c;所以写下这篇文章探讨 API 文档的使用&#xff0c;希望能够帮助到你&#xff0c;先赞后看&…

正点原子嵌入式linux第二期

目录 第5讲 IMX6U芯片介绍 第6讲 6.1汇编LED驱动实验-原理分析 6.2 汇编LED驱动实验-汇编基本语法 ​编辑6.3 驱动编写 6.4 编写驱动 6.5烧写bin文件到SD卡并运行 第七讲 IMX启动方式&#xff08;没怎么听懂&#xff09; 7.1启动设备的选择 7.2 IVT表和BootData详解 7.3D…

从面试官的角度带你从源码分析关于vue(v2.7.10)的面试题

我们在面试的时候经常会被问到vue框架的原理类问题&#xff0c;我今天整理了一些常见问题和答案&#xff0c;希望有不正确之处还请指正。 1.new Vue时发生了什么 首先实例化一个对象&#xff0c;该对象执行init方法初始化生命周期等等&#xff0c;随后执行$mount方法开始生成v…

时间序列模型SCINet(代码解析)

前言 SCINet模型&#xff0c;精度仅次于NLinear的时间序列模型&#xff0c;在ETTh2数据集上单变量预测结果甚至比NLinear模型还要好。在这里还是建议大家去读一读论文&#xff0c;论文写的很规范&#xff0c;很值得学习&#xff0c;论文地址SCINet模型Github项目地址&#xff…