服务器开发系列
文章目录
- 服务器开发系列
- 前言
- 一、socket epoll介绍
- 二、代码实现
- 1. epoll client实现
- 2. epoll server实现
- 3. epoll client server验证
- 总结
前言
I/O复用模型:主要是指,一个线程可以同时监控多个系统IO、并且能够操作多个系统IO的一种技术模型。
select/poll/epoll核心是可以通过一个线程同时处理多个socket链接,并不会使得每次操作io的过程变快。因此,IO复用模型的设计并不是位了快,而是为了解决线程、进程数量过多(数量过多会导致系统频繁切换cpu资源,造成操作系统压力)对服务器开销造成的压力。
IO复用模型是通过一种机制,一个线程可以同时监控多个socket,其中某些socket就绪(读、写事件就绪)后能快速通知程序进程对应的IO操作。IO复用模型一般为异步阻塞IO,但select、poll、epoll本质上还是同步IO(都是需要读写事件就绪后再进行读写,而且整个读写的过程是阻塞的),真正意义上的异步IO是不需要负责进行读写操作的。
一、socket epoll介绍
本文主要讲解epoll方式的IO复用方式。通过对比epoll与select的差异,让小伙伴对IO复用有进一步深入的理解。
epoll相对select、poll优点
1.select的句柄数目受限,在linux/posix_types.h头文件有这样的声明:#define __FD_SETSIZE 1024 表示select最多同时监听1024个fd。而epoll没有,它的限制是最大的打开文件句柄数目。
2.epoll的最大好处是不会随着FD的数目增长而降低效率,在selec中采用轮询处理,其中的数据结构类似一个数组的数据结构,而epoll 是维护一个队列,直接看队列是不是空就可以了。epoll只会对"活跃"的socket进行操作—这是因为在内核实现中epoll是根据每个fd上面 的callback函数实现的。那么,只有"活跃"的socket才会主动的去调用 callback函数(把这个句柄加入队列),其他idle状态句柄则不会,在这点上,epoll实现了一个"伪"AIO。但是如果绝大部分的I/O都是 “活跃的”,每个I/O端口使用率很高的话,epoll效率不一定比select高(可能是要维护队列复杂)。
3.使用mmap加速内核与用户空间的消息传递。无论是select,poll还是epoll都需要内核把FD消息通知给用户空间,如何避免不必要的内存拷贝就很重要,在这点上,epoll是通过内核于用户空间mmap同一块内存实现的
epoll实现流程
总结下,epoll 相关的函数⾥内核运⾏环境分两部分:
⽤户进程内核态。进⾏调⽤ epoll_wait 等函数时会将进程陷⼊内核态来执⾏。这部分代码负责查看接收队列,以及负责把当前进程阻塞掉,让出 CPU。
硬软中断上下⽂。在这些组件中,将包从⽹卡接收过来进⾏处理,然后放到 socket 的接收队列。对于 epoll 来说,再找到 socket 关联的 epitem,并把它添加到 epoll 对象的就绪链表中。 这个时候再捎带检查⼀下 epoll 上是否有被阻塞的进程,如果有唤醒之。
二、代码实现
1. epoll client实现
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <cstring>
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cassert>
#include <string>
#include <iostream>
#include <memory>
#include <functional>
#include <thread>
namespace mux {
namespace transport {
static const uint32_t kEpollWaitTime = 10; // 10 ms
static const uint32_t kMaxEvents = 100;
typedef struct Packet {
public:
Packet()
: msg { "" } {}
Packet(const std::string& msg)
: msg { msg } {}
Packet(int fd, const std::string& msg)
: fd(fd),
msg(msg) {}
int fd { -1 };
std::string msg;
} Packet;
typedef std::shared_ptr<Packet> PacketPtr;
using callback_recv_t = std::function<void(const PacketPtr& data)>;
class EpollTcpBase {
public:
EpollTcpBase() = default;
EpollTcpBase(const EpollTcpBase& other) = delete;
EpollTcpBase& operator=(const EpollTcpBase& other) = delete;
EpollTcpBase(EpollTcpBase&& other) = delete;
EpollTcpBase& operator=(EpollTcpBase&& other) = delete;
virtual ~EpollTcpBase() = default;
public:
virtual bool Start() = 0;
virtual bool Stop() = 0;
virtual int32_t SendData(const PacketPtr& data) = 0;
virtual void RegisterOnRecvCallback(callback_recv_t callback) = 0;
virtual void UnRegisterOnRecvCallback() = 0;
};
using ETBase = EpollTcpBase;
typedef std::shared_ptr<ETBase> ETBasePtr;
class EpollTcpClient : public ETBase {
public:
EpollTcpClient() = default;
EpollTcpClient(const EpollTcpClient& other) = delete;
EpollTcpClient& operator=(const EpollTcpClient& other) = delete;
EpollTcpClient(EpollTcpClient&& other) = delete;
EpollTcpClient& operator=(EpollTcpClient&& other) = delete;
~EpollTcpClient() override;
EpollTcpClient(const std::string& server_ip, uint16_t server_port);
public:
bool Start() override;
bool Stop() override;
int32_t SendData(const PacketPtr& data) override;
void RegisterOnRecvCallback(callback_recv_t callback) override;
void UnRegisterOnRecvCallback() override;
protected:
int32_t CreateEpoll();
int32_t CreateSocket();
int32_t Connect(int32_t listenfd);
int32_t UpdateEpollEvents(int efd, int op, int fd, int events);
void OnSocketRead(int32_t fd);
void OnSocketWrite(int32_t fd);
void EpollLoop();
private:
std::string server_ip_;
uint16_t server_port_ { 0 };
int32_t handle_ { -1 }; // client fd
int32_t efd_ { -1 }; // epoll fd
std::shared_ptr<std::thread> th_loop_ { nullptr };
bool loop_flag_ { true };
callback_recv_t recv_callback_ { nullptr };
};
using ETClient = EpollTcpClient;
typedef std::shared_ptr<ETClient> ETClientPtr;
EpollTcpClient::EpollTcpClient(const std::string& server_ip, uint16_t server_port)
: server_ip_ { server_ip },
server_port_ { server_port } {
}
EpollTcpClient::~EpollTcpClient() {
Stop();
}
bool EpollTcpClient::Start() {
if (CreateEpoll() < 0) {
return false;
}
// create socket and bind
int cli_fd = CreateSocket();
if (cli_fd < 0) {
return false;
}
int lr = Connect(cli_fd);
if (lr < 0) {
return false;
}
std::cout << "EpollTcpClient Init success!" << std::endl;
handle_ = cli_fd;
int er = UpdateEpollEvents(efd_, EPOLL_CTL_ADD, handle_, EPOLLIN | EPOLLET);
if (er < 0) {
::close(handle_);
return false;
}
assert(!th_loop_);
th_loop_ = std::make_shared<std::thread>(&EpollTcpClient::EpollLoop, this);
if (!th_loop_) {
return false;
}
th_loop_->detach();
return true;
}
bool EpollTcpClient::Stop() {
loop_flag_ = false;
::close(handle_);
::close(efd_);
std::cout << "stop epoll!" << std::endl;
UnRegisterOnRecvCallback();
return true;
}
int32_t EpollTcpClient::CreateEpoll() {
int epollfd = epoll_create(1);
if (epollfd < 0) {
std::cout << "epoll_create failed!" << std::endl;
return -1;
}
efd_ = epollfd;
return epollfd;
}
int32_t EpollTcpClient::CreateSocket() {
int cli_fd = ::socket(AF_INET, SOCK_STREAM, 0);
if (cli_fd < 0) {
std::cout << "create socket failed!" << std::endl;
return -1;
}
return cli_fd;
}
int32_t EpollTcpClient::Connect(int32_t cli_fd) {
struct sockaddr_in addr; // server info
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(server_port_);
addr.sin_addr.s_addr = inet_addr(server_ip_.c_str());
int r = ::connect(cli_fd, (struct sockaddr*)&addr, sizeof(addr));
if ( r < 0) {
std::cout << "connect failed! r=" << r << " errno:" << errno << std::endl;
return -1;
}
return 0;
}
int32_t EpollTcpClient::UpdateEpollEvents(int efd, int op, int fd, int events) {
struct epoll_event ev;
memset(&ev, 0, sizeof(ev));
ev.events = events;
ev.data.fd = fd;
fprintf(stdout,"%s fd %d events read %d write %d\n", op == EPOLL_CTL_MOD ? "mod" : "add", fd, ev.events & EPOLLIN, ev.events & EPOLLOUT);
int r = epoll_ctl(efd, op, fd, &ev);
if (r < 0) {
std::cout << "epoll_ctl failed!" << std::endl;
return -1;
}
return 0;
}
void EpollTcpClient::RegisterOnRecvCallback(callback_recv_t callback) {
assert(!recv_callback_);
recv_callback_ = callback;
}
void EpollTcpClient::UnRegisterOnRecvCallback() {
assert(recv_callback_);
recv_callback_ = nullptr;
}
// handle read events on fd
void EpollTcpClient::OnSocketRead(int32_t fd) {
char read_buf[4096];
bzero(read_buf, sizeof(read_buf));
int n = -1;
while ( (n = ::read(fd, read_buf, sizeof(read_buf))) > 0) {
// callback for recv
std::string msg(read_buf, n);
PacketPtr data = std::make_shared<Packet>(fd, msg);
if (recv_callback_) {
recv_callback_(data);
}
}
if (n == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// read finished
return;
}
// something goes wrong for this fd, should close it
::close(fd);
return;
}
if (n == 0) {
// this may happen when client close socket. EPOLLRDHUP usually handle this, but just make sure; should close this fd
::close(fd);
return;
}
}
// handle write events on fd (usually happens when sending big files)
void EpollTcpClient::OnSocketWrite(int32_t fd) {
std::cout << "fd: " << fd << " writeable!" << std::endl;
}
int32_t EpollTcpClient::SendData(const PacketPtr& data) {
int r = ::write(handle_, data->msg.data(), data->msg.size());
if (r == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return -1;
}
// error happend
::close(handle_);
std::cout << "fd: " << handle_ << " write error, close it!" << std::endl;
return -1;
}
return r;
}
void EpollTcpClient::EpollLoop() {
struct epoll_event* alive_events = static_cast<epoll_event*>(calloc(kMaxEvents, sizeof(epoll_event)));
if (!alive_events) {
std::cout << "calloc memory failed for epoll_events!" << std::endl;
return;
}
while (loop_flag_) {
int num = epoll_wait(efd_, alive_events, kMaxEvents, kEpollWaitTime);
for (int i = 0; i < num; ++i) {
int fd = alive_events[i].data.fd;
int events = alive_events[i].events;
if ( (events & EPOLLERR) || (events & EPOLLHUP) ) {
std::cout << "epoll_wait error!" << std::endl;
// An error has occured on this fd, or the socket is not ready for reading (why were we notified then?).
::close(fd);
} else if (events & EPOLLRDHUP) {
// Stream socket peer closed connection, or shut down writing half of connection.
// more inportant, We still to handle disconnection when read()/recv() return 0 or -1 just to be sure.
std::cout << "fd:" << fd << " closed EPOLLRDHUP!" << std::endl;
// close fd and epoll will remove it
::close(fd);
} else if ( events & EPOLLIN ) {
// other fd read event coming, meaning data coming
OnSocketRead(fd);
} else if ( events & EPOLLOUT ) {
// write event for fd (not including listen-fd), meaning send buffer is available for big files
OnSocketWrite(fd);
} else {
std::cout << "unknow epoll event!" << std::endl;
}
} // end for (int i = 0; ...
} // end while (loop_flag_)
free(alive_events);
}
} // end namespace transport
} // end namespace mux
using namespace mux;
using namespace mux::transport;
int main(int argc, char* argv[]) {
std::string server_ip {"127.0.0.1"};
uint16_t server_port { 6666 };
if (argc >= 2) {
server_ip = std::string(argv[1]);
}
if (argc >= 3) {
server_port = std::atoi(argv[2]);
}
auto tcp_client = std::make_shared<EpollTcpClient>(server_ip, server_port);
if (!tcp_client) {
std::cout << "tcp_client create faield!" << std::endl;
exit(-1);
}
auto recv_call = [&](const transport::PacketPtr& data) -> void {
std::cout << "recv: " << data->msg << std::endl;
return;
};
tcp_client->RegisterOnRecvCallback(recv_call);
if (!tcp_client->Start()) {
std::cout << "tcp_client start failed!" << std::endl;
exit(1);
}
std::cout << "############tcp_client started!################" << std::endl;
std::string msg;
while (true) {
std::cout << std::endl << "input:";
std::getline(std::cin, msg);
auto packet = std::make_shared<Packet>(msg);
tcp_client->SendData(packet);
//std::this_thread::sleep_for(std::chrono::seconds(1));
}
tcp_client->Stop();
return 0;
}
/*
g++ epoll_client.cpp -o epoll_client -std=c++11 -lpthread
*/
2. epoll server实现
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <cstring>
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cassert>
#include <iostream>
#include <string>
#include <thread>
#include <memory>
#include <functional>
namespace mux {
namespace transport {
static const uint32_t kEpollWaitTime = 10; // epoll wait timeout 10 ms
static const uint32_t kMaxEvents = 100; // epoll wait return max size
// packet of send/recv binary content
typedef struct Packet {
public:
Packet()
: msg { "" } {}
Packet(const std::string& msg)
: msg { msg } {}
Packet(int fd, const std::string& msg)
: fd(fd),
msg(msg) {}
int fd { -1 }; // meaning socket
std::string msg; // real binary content
} Packet;
typedef std::shared_ptr<Packet> PacketPtr;
// callback when packet received
using callback_recv_t = std::function<void(const PacketPtr& data)>;
// base class of EpollTcpServer, focus on Start(), Stop(), SendData(), RegisterOnRecvCallback()...
class EpollTcpBase {
public:
EpollTcpBase() = default;
EpollTcpBase(const EpollTcpBase& other) = delete;
EpollTcpBase& operator=(const EpollTcpBase& other) = delete;
EpollTcpBase(EpollTcpBase&& other) = delete;
EpollTcpBase& operator=(EpollTcpBase&& other) = delete;
virtual ~EpollTcpBase() = default;
public:
virtual bool Start() = 0;
virtual bool Stop() = 0;
virtual int32_t SendData(const PacketPtr& data) = 0;
virtual void RegisterOnRecvCallback(callback_recv_t callback) = 0;
virtual void UnRegisterOnRecvCallback() = 0;
};
using ETBase = EpollTcpBase;
typedef std::shared_ptr<ETBase> ETBasePtr;
// the implementation of Epoll Tcp Server
class EpollTcpServer : public ETBase {
public:
EpollTcpServer() = default;
EpollTcpServer(const EpollTcpServer& other) = delete;
EpollTcpServer& operator=(const EpollTcpServer& other) = delete;
EpollTcpServer(EpollTcpServer&& other) = delete;
EpollTcpServer& operator=(EpollTcpServer&& other) = delete;
~EpollTcpServer() override;
// the local ip and port of tcp server
EpollTcpServer(const std::string& local_ip, uint16_t local_port);
public:
// start tcp server
bool Start() override;
// stop tcp server
bool Stop() override;
// send packet
int32_t SendData(const PacketPtr& data) override;
// register a callback when packet received
void RegisterOnRecvCallback(callback_recv_t callback) override;
void UnRegisterOnRecvCallback() override;
protected:
// create epoll instance using epoll_create and return a fd of epoll
int32_t CreateEpoll();
// create a socket fd using api socket()
int32_t CreateSocket();
// set socket noblock
int32_t MakeSocketNonBlock(int32_t fd);
// listen()
int32_t Listen(int32_t listenfd);
// add/modify/remove a item(socket/fd) in epoll instance(rbtree), for this example, just add a socket to epoll rbtree
int32_t UpdateEpollEvents(int efd, int op, int fd, int events);
// handle tcp accept event
void OnSocketAccept();
// handle tcp socket readable event(read())
void OnSocketRead(int32_t fd);
// handle tcp socket writeable event(write())
void OnSocketWrite(int32_t fd);
// one loop per thread, call epoll_wait and return ready socket(accept,readable,writeable,error...)
void EpollLoop();
private:
std::string local_ip_; // tcp local ip
uint16_t local_port_ { 0 }; // tcp bind local port
int32_t handle_ { -1 }; // listenfd
int32_t efd_ { -1 }; // epoll fd
std::shared_ptr<std::thread> th_loop_ { nullptr }; // one loop per thread(call epoll_wait in loop)
bool loop_flag_ { true }; // if loop_flag_ is false, then exit the epoll loop
callback_recv_t recv_callback_ { nullptr }; // callback when received
};
using ETServer = EpollTcpServer;
typedef std::shared_ptr<ETServer> ETServerPtr;
EpollTcpServer::EpollTcpServer(const std::string& local_ip, uint16_t local_port)
: local_ip_ { local_ip },
local_port_ { local_port } {
}
EpollTcpServer::~EpollTcpServer() {
Stop();
}
bool EpollTcpServer::Start() {
// create epoll instance
if (CreateEpoll() < 0) {
return false;
}
// create socket and bind
int listenfd = CreateSocket();
if (listenfd < 0) {
return false;
}
// set listen socket noblock
int mr = MakeSocketNonBlock(listenfd);
if (mr < 0) {
return false;
}
// call listen()
int lr = Listen(listenfd);
if (lr < 0) {
return false;
}
std::cout << "EpollTcpServer Init success!" << std::endl;
handle_ = listenfd;
// add listen socket to epoll instance, and focus on event EPOLLIN and EPOLLOUT, actually EPOLLIN is enough
int er = UpdateEpollEvents(efd_, EPOLL_CTL_ADD, handle_, EPOLLIN | EPOLLET);
if (er < 0) {
// if something goes wrong, close listen socket and return false
::close(handle_);
return false;
}
assert(!th_loop_);
// the implementation of one loop per thread: create a thread to loop epoll
th_loop_ = std::make_shared<std::thread>(&EpollTcpServer::EpollLoop, this);
if (!th_loop_) {
return false;
}
// detach the thread(using loop_flag_ to control the start/stop of loop)
th_loop_->detach();
return true;
}
// stop epoll tcp server and release epoll
bool EpollTcpServer::Stop() {
// set loop_flag_ false to stop epoll loop
loop_flag_ = false;
::close(handle_);
::close(efd_);
std::cout << "stop epoll!" << std::endl;
UnRegisterOnRecvCallback();
return true;
}
int32_t EpollTcpServer::CreateEpoll() {
// the basic epoll api of create a epoll instance
int epollfd = epoll_create(1);
if (epollfd < 0) {
// if something goes wrong, return -1
std::cout << "epoll_create failed!" << std::endl;
return -1;
}
efd_ = epollfd;
return epollfd;
}
int32_t EpollTcpServer::CreateSocket() {
// create tcp socket
int listenfd = ::socket(AF_INET, SOCK_STREAM, 0);
if (listenfd < 0) {
std::cout << "create socket " << local_ip_ << ":" << local_port_ << " failed!" << std::endl;
return -1;
}
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(local_port_);
addr.sin_addr.s_addr = inet_addr(local_ip_.c_str());
// bind to local ip and local port
int r = ::bind(listenfd, (struct sockaddr*)&addr, sizeof(struct sockaddr));
if (r != 0) {
std::cout << "bind socket " << local_ip_ << ":" << local_port_ << " failed!" << std::endl;
::close(listenfd);
return -1;
}
std::cout << "create and bind socket " << local_ip_ << ":" << local_port_ << " success!" << std::endl;
return listenfd;
}
// set noblock fd
int32_t EpollTcpServer::MakeSocketNonBlock(int32_t fd) {
int flags = fcntl(fd, F_GETFL, 0);
if (flags < 0) {
std::cout << "fcntl failed!" << std::endl;
return -1;
}
int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
if (r < 0) {
std::cout << "fcntl failed!" << std::endl;
return -1;
}
return 0;
}
// call listen() api and set listen queue size using SOMAXCONN
int32_t EpollTcpServer::Listen(int32_t listenfd) {
int r = ::listen(listenfd, SOMAXCONN);
if ( r < 0) {
std::cout << "listen failed!" << std::endl;
return -1;
}
return 0;
}
// add/modify/remove a item(socket/fd) in epoll instance(rbtree), for this example, just add a socket to epoll rbtree
int32_t EpollTcpServer::UpdateEpollEvents(int efd, int op, int fd, int events) {
struct epoll_event ev;
memset(&ev, 0, sizeof(ev));
ev.events = events;
ev.data.fd = fd; // ev.data is a enum
fprintf(stdout,"%s fd %d events read %d write %d\n", op == EPOLL_CTL_MOD ? "mod" : "add", fd, ev.events & EPOLLIN, ev.events & EPOLLOUT);
int r = epoll_ctl(efd, op, fd, &ev);
if (r < 0) {
std::cout << "epoll_ctl failed!" << std::endl;
return -1;
}
return 0;
}
// handle accept event
void EpollTcpServer::OnSocketAccept() {
// epoll working on et mode, must read all coming data, so use a while loop here
while (true) {
struct sockaddr_in in_addr;
socklen_t in_len = sizeof(in_addr);
// accept a new connection and get a new socket
int cli_fd = accept(handle_, (struct sockaddr*)&in_addr, &in_len);
if (cli_fd == -1) {
if ( (errno == EAGAIN) || (errno == EWOULDBLOCK) ) {
// read all accept finished(epoll et mode only trigger one time,so must read all data in listen socket)
std::cout << "accept all coming connections!" << std::endl;
break;
} else {
std::cout << "accept error!" << std::endl;
continue;
}
}
sockaddr_in peer;
socklen_t p_len = sizeof(peer);
// get client ip and port
int r = getpeername(cli_fd, (struct sockaddr*)&peer, &p_len);
if (r < 0) {
std::cout << "getpeername error!" << std::endl;
continue;
}
std::cout << "accpet connection from " << inet_ntoa(in_addr.sin_addr) << std::endl;
int mr = MakeSocketNonBlock(cli_fd);
if (mr < 0) {
::close(cli_fd);
continue;
}
// add this new socket to epoll instance, and focus on EPOLLIN and EPOLLOUT and EPOLLRDHUP event
int er = UpdateEpollEvents(efd_, EPOLL_CTL_ADD, cli_fd, EPOLLIN | EPOLLRDHUP | EPOLLET);
if (er < 0 ) {
// if something goes wrong, close this new socket
::close(cli_fd);
continue;
}
}
}
// register a callback when packet received
void EpollTcpServer::RegisterOnRecvCallback(callback_recv_t callback) {
assert(!recv_callback_);
recv_callback_ = callback;
}
void EpollTcpServer::UnRegisterOnRecvCallback() {
assert(recv_callback_);
recv_callback_ = nullptr;
}
// handle read events on fd
void EpollTcpServer::OnSocketRead(int32_t fd) {
char read_buf[4096];
bzero(read_buf, sizeof(read_buf));
int n = -1;
// epoll working on et mode, must read all data
while ( (n = ::read(fd, read_buf, sizeof(read_buf))) > 0) {
// callback for recv
std::cout << "fd: " << fd << " recv: " << read_buf << std::endl;
std::string msg(read_buf, n);
// create a recv packet
PacketPtr data = std::make_shared<Packet>(fd, msg);
if (recv_callback_) {
// handle recv packet
recv_callback_(data);
}
}
if (n == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// read all data finished
return;
}
// something goes wrong for this fd, should close it
::close(fd);
return;
}
if (n == 0) {
// this may happen when client close socket. EPOLLRDHUP usually handle this, but just make sure; should close this fd
::close(fd);
return;
}
}
// handle write events on fd (usually happens when sending big files)
void EpollTcpServer::OnSocketWrite(int32_t fd) {
// TODO(smaugx) not care for now
std::cout << "fd: " << fd << " writeable!" << std::endl;
}
// send packet
int32_t EpollTcpServer::SendData(const PacketPtr& data) {
if (data->fd == -1) {
return -1;
}
// send packet on fd
int r = ::write(data->fd, data->msg.data(), data->msg.size());
if (r == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return -1;
}
// error happend
::close(data->fd);
std::cout << "fd: " << data->fd << " write error, close it!" << std::endl;
return -1;
}
std::cout << "fd: " << data->fd << " write size: " << r << " ok!" << std::endl;
return r;
}
// one loop per thread, call epoll_wait and handle all coming events
void EpollTcpServer::EpollLoop() {
// request some memory, if events ready, socket events will copy to this memory from kernel
struct epoll_event* alive_events = static_cast<epoll_event*>(calloc(kMaxEvents, sizeof(epoll_event)));
if (!alive_events) {
std::cout << "calloc memory failed for epoll_events!" << std::endl;
return;
}
// if loop_flag_ is false, will exit this loop
while (loop_flag_) {
// call epoll_wait and return ready socket
int num = epoll_wait(efd_, alive_events, kMaxEvents, kEpollWaitTime);
for (int i = 0; i < num; ++i) {
// get fd
int fd = alive_events[i].data.fd;
// get events(readable/writeable/error)
int events = alive_events[i].events;
if ( (events & EPOLLERR) || (events & EPOLLHUP) ) {
std::cout << "epoll_wait error!" << std::endl;
// An error has occured on this fd, or the socket is not ready for reading (why were we notified then?).
::close(fd);
} else if (events & EPOLLRDHUP) {
// Stream socket peer closed connection, or shut down writing half of connection.
// more inportant, We still to handle disconnection when read()/recv() return 0 or -1 just to be sure.
std::cout << "fd:" << fd << " closed EPOLLRDHUP!" << std::endl;
// close fd and epoll will remove it
::close(fd);
} else if ( events & EPOLLIN ) {
std::cout << "epollin" << std::endl;
if (fd == handle_) {
// listen fd coming connections
OnSocketAccept();
} else {
// other fd read event coming, meaning data coming
OnSocketRead(fd);
}
} else if ( events & EPOLLOUT ) {
std::cout << "epollout" << std::endl;
// write event for fd (not including listen-fd), meaning send buffer is available for big files
OnSocketWrite(fd);
} else {
std::cout << "unknow epoll event!" << std::endl;
}
} // end for (int i = 0; ...
} // end while (loop_flag_)
free(alive_events);
}
} // end namespace transport
} // end namespace mux
using namespace mux;
using namespace transport;
int main(int argc, char* argv[]) {
std::string local_ip {"127.0.0.1"};
uint16_t local_port { 6666 };
if (argc >= 2) {
local_ip = std::string(argv[1]);
}
if (argc >= 3) {
local_port = std::atoi(argv[2]);
}
// create a epoll tcp server
auto epoll_server = std::make_shared<EpollTcpServer>(local_ip, local_port);
if (!epoll_server) {
std::cout << "tcp_server create faield!" << std::endl;
exit(-1);
}
// recv callback in lambda mode, you can set your own callback here
auto recv_call = [&](const PacketPtr& data) -> void {
// just echo packet
epoll_server->SendData(data);
return;
};
// register recv callback to epoll tcp server
epoll_server->RegisterOnRecvCallback(recv_call);
// start the epoll tcp server
if (!epoll_server->Start()) {
std::cout << "tcp_server start failed!" << std::endl;
exit(1);
}
std::cout << "############tcp_server started!################" << std::endl;
// block here
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
epoll_server->Stop();
return 0;
}
/*
g++ epoll_server2.cpp -o epoll_server2 -std=c++11 -lpthread
*/
3. epoll client server验证
总结
通过本文的学习,你应该对epoll有了深入的 理解,希望的对你有所帮助。