记录下
一个线程专门用来accept，获取客户端的fd
获取fd之后 从剩余的执行线程中 找到一个连接客户端数量最少的线程
然后将客户端的fd加入到这个线程中并通过EPOLL监听这个fd
线程之间通过eventfd来通信 将客户端的fd传到 对应的线程中
参考了MediaServer，引入了EventPollerPool和EventPoller的概念
最少两个线程，设置为1的话会改成2
cpp代码:
#include "durian.h"
#include <sys/epoll.h>
namespace DURIAN
{
/// Creates a poller tagged with the caller-supplied identifier.
/// No descriptors are opened here; that is done by Init().
EventPoller::EventPoller(int id)
    : m_id(id)
{
}
/// Logs the poller state, asks the event loop to stop and joins the worker thread.
EventPoller::~EventPoller()
{
    printf("~EventPoller signal m_id = %d m_run_flag = %d\n", m_id, m_run_flag);
    // Request shutdown before joining. RunLoop() only leaves its loop once
    // m_run_flag is false, so joining a still-running poller without this
    // would block the destructor forever.
    Stop();
    Wait();
    // NOTE(review): the epoll and event descriptors are not closed here;
    // ownership of those descriptors should be confirmed before adding close().
}
/// Creates the epoll instance and the eventfd used by this poller.
/// @return true when both descriptors were created, false otherwise.
bool EventPoller::Init()
{
    m_poll_fd = epoll_create1(0);
    if (m_poll_fd == -1)
    {
        return false;
    }
    m_event_fd = eventfd(0, 0);
    if (m_event_fd == -1)
    {
        printf("new fd failed\n");
        close(m_poll_fd);
        // Do not keep the number of a descriptor that is already closed:
        // later epoll calls would otherwise operate on a stale (possibly
        // reused) descriptor.
        m_poll_fd = -1;
        return false;
    }
    return true;
}
void EventPoller::RunLoop()
{
static const int MAX_EVENTS = 1024;
struct epoll_event events[MAX_EVENTS];
while(m_run_flag)
{
int ready_count = epoll_wait(m_poll_fd,events,MAX_EVENTS,2000);
if(ready_count == -1)
{
if(errno != EINTR)
{
//exit(1);
}
//ready_count = 0;
}
else if(ready_count == 0)
{
if(m_run_flag == false)
{
//printf("time out and runflag = false exit thread\n");
//break;
}
}
for(int i = 0;i<ready_count;i++)
{
const struct epoll_event &ev = events[i];
int fd = events[i].data.fd;
if(ev.events &(EPOLLIN | EPOLLERR |EPOLLHUP))
{
auto handler = m_accept_handlers[fd];
handler(fd);
}
else if(ev.events & (EPOLLOUT | EPOLLERR | EPOLLHUP))
{
auto it = m_buffer_pool.find(fd);
if(it!= m_buffer_pool.end())
{
auto &buf = it->second;
if(buf.WriteData(fd) == false)
{
Close(fd);
}
}
}
}
}
}
/// Accessor for the eventfd descriptor stored by Init().
int EventPoller::GetEventFD()
{
    const int event_descriptor = m_event_fd;
    return event_descriptor;
}
/// Number of descriptors that currently have a handler registered on this
/// poller (every entry of the handler map is counted).
int EventPoller::GetClients()
{
    const auto handler_count = m_accept_handlers.size();
    return static_cast<int>(handler_count);
}
void EventPoller::Stop()
{
m_run_flag = false;
}
/// Sets the run flag and launches the worker thread that executes RunLoop().
/// Calling Start() while a previously started thread has not been joined is a
/// no-op.
void EventPoller::Start()
{
    // Assigning a new std::thread to a still-joinable one calls
    // std::terminate, so a repeated Start() must not reach the assignment.
    if (m_thread_id.joinable())
    {
        return;
    }
    m_run_flag = true;
    m_thread_id = std::thread(&EventPoller::RunLoop, this);
}
/// Joins the worker thread if there is one to join; otherwise does nothing.
void EventPoller::Wait()
{
    if (!m_thread_id.joinable())
    {
        return;
    }
    m_thread_id.join();
}
/// Switches fd to non-blocking mode and registers it with the epoll instance
/// for edge-triggered read and write notifications.
/// @return false when fd already has a handler, or when ioctl/epoll_ctl fails.
bool EventPoller::Add2Epoll(int fd)
{
    // A descriptor that already owns a handler is rejected.
    if (m_accept_handlers.find(fd) != m_accept_handlers.end())
    {
        return false;
    }

    int nonblocking = 1;
    if (ioctl(fd, FIONBIO, &nonblocking) == -1)
    {
        return false;
    }

    struct epoll_event interest;
    interest.events = EPOLLIN | EPOLLOUT | EPOLLET;
    interest.data.fd = fd;
    const int rc = epoll_ctl(m_poll_fd, EPOLL_CTL_ADD, fd, &interest);
    return rc != -1;
}
/// Hands a connection descriptor to this poller by writing its numeric value
/// to the poller's eventfd as an 8-byte integer.
///
/// NOTE(review): an eventfd is a counter, so two values written before the
/// poller reads appear to be delivered as their sum rather than as two
/// descriptors. Confirm against eventfd(2) and consider a locked queue.
void EventPoller::DeliverConn(int conn_fd)
{
    const uint64_t payload = conn_fd;
    const ssize_t written = write(m_event_fd, &payload, sizeof(payload));
    if (written == -1)
    {
        printf("Deliverconn write failed\n");
    }
}
/// Registers a listening socket with this poller.
///
/// The installed handler accepts connections until accept() reports that no
/// more are pending, enables TCP_NODELAY on each new socket and passes the
/// socket to on_accept.
/// @param fd         listening socket descriptor.
/// @param on_accept  callback invoked with every accepted client descriptor.
/// @return false when the descriptor could not be added to epoll.
bool EventPoller::AddListener(int fd, ACCEPTER on_accept)
{
    if (Add2Epoll(fd) == false)
    {
        return false;
    }
    std::cout << "EventPoller AddListener fd = " << fd << std::endl;
    m_accept_handlers[fd] = [this, on_accept](int server_fd) {
        for (;;)
        {
            const int new_fd = accept(server_fd, nullptr, nullptr);
            // Save errno immediately: the stream output below may overwrite it
            // before the error is examined.
            const int accept_errno = errno;
            std::cout << "accept client fd = " << new_fd << std::endl;
            if (new_fd == -1)
            {
                // A signal or a client that aborted its connection does not
                // mean the listening socket is broken; keep accepting.
                if (accept_errno == EINTR || accept_errno == ECONNABORTED)
                {
                    continue;
                }
                // Anything other than "no more pending connections" is treated
                // as fatal for the listening socket.
                if (accept_errno != EAGAIN && accept_errno != EWOULDBLOCK)
                {
                    Close(server_fd);
                }
                return 0;
            }
            int enable = 1;
            setsockopt(new_fd, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable));
            on_accept(new_fd);
        }
        return 0;
    };
    return true;
}
/// Registers an event descriptor with this poller.
///
/// The installed handler repeatedly reads an 8-byte value from the descriptor
/// and forwards it to on_event until read() fails. A failure other than EAGAIN
/// closes the descriptor.
/// @return false when the descriptor could not be added to epoll.
bool EventPoller::AddEventer(int fd, EVENTER on_event)
{
    const bool added = Add2Epoll(fd);
    if (!added)
    {
        return false;
    }

    auto drain_events = [this, on_event](int cfd) {
        while (true)
        {
            uint64_t count;
            const ssize_t got = read(cfd, &count, sizeof(count));
            if (got != -1)
            {
                on_event(count);
                continue;
            }
            // read() failed: EAGAIN just means nothing is pending any more.
            if (errno != EAGAIN)
            {
                Close(cfd);
            }
            return 0;
        }
        return 0;
    };
    m_accept_handlers[fd] = drain_events;
    return true;
}
bool EventPoller::AddReader(int fd, READER on_read)
{
if(Add2Epoll(fd) == false)
{
return false;
}
m_accept_handlers[fd] = [this,on_read](int cfd){
for(;;)
{
char buf[4096] = {0};
ssize_t ret = read(cfd,buf,sizeof(buf));
if(ret == -1)
{
if(errno != EAGAIN)
{
Close(cfd);
}
return -1;
}
if(ret == 0)
{
Close(cfd);
printf("客户端关闭了连接 %d\n",cfd);
return 0 ;
}
on_read(cfd,buf,ret);
}
};
return true;
}
/// Forgets everything this poller tracks for fd (handler and pending output)
/// and then closes the descriptor.
void EventPoller::Close(int fd)
{
    // Drop the registered handler first, then any queued output.
    m_accept_handlers.erase(fd);
    m_buffer_pool.erase(fd);

    ::close(fd);
}
/// Sends len bytes from buf to fd, queueing whatever cannot be written now.
///
/// If earlier output for fd is still queued, the new data is appended behind
/// it to preserve ordering. Otherwise the data is written directly, and the
/// unwritten remainder is queued when the socket would block.
/// @return false when the write failed and the descriptor was closed,
///         true otherwise.
bool EventPoller::FlushData(int fd, const char *buf, size_t len)
{
    auto it = m_buffer_pool.find(fd);
    if (it != m_buffer_pool.end() && it->second.IsEmpty() == false)
    {
        it->second.Add2Buffer(buf, len);
        return true;
    }
    // No queued output (the buffer is absent or already drained): write
    // directly. Appending to a drained buffer would strand the data, because
    // the queue is only flushed on a writable event and the edge-triggered
    // registration raises no new one while the socket stays writable.
    while (len > 0)
    {
        const ssize_t ret = write(fd, buf, len);
        if (ret == -1)
        {
            if (errno == EINTR)
            {
                continue;
            }
            if (errno != EAGAIN && errno != EWOULDBLOCK)
            {
                Close(fd);
                return false;
            }
            // Socket buffer full: keep the rest for the next writable event.
            m_buffer_pool[fd].Add2Buffer(buf, len);
            return true;
        }
        buf += ret;
        len -= ret;
    }
    return true;
}
namespace
{
// Requested number of pollers; 0 means "not set". Read by the
// EventPollerPool constructor.
size_t g_pool_size = 0;
}

/// Records the desired pool size. The value is read when the pool is constructed.
void EventPollerPool::SetPoolSize(size_t size)
{
    g_pool_size = size;
}
/// Returns the process-wide pool, constructing it on the first call.
EventPollerPool &EventPollerPool::Instance()
{
    static EventPollerPool pool;
    return pool;
}
/// Creates the pollers. The count is the value given to SetPoolSize(), or the
/// hardware concurrency when that value is 0, and is raised to at least 2.
/// Pollers are only constructed here; they are neither initialised nor started.
EventPollerPool::EventPollerPool()
{
    size_t size = g_pool_size;
    if (size == 0)
    {
        size = std::thread::hardware_concurrency();
    }
    std::cout << "Thread size:" << size << std::endl;
    if (size < 2)
    {
        size = 2;
    }
    // m_size was previously left uninitialized.
    m_size = static_cast<int>(size);
    m_pollers.reserve(size);
    for (size_t i = 0; i < size; ++i)
    {
        m_pollers.emplace_back(std::make_shared<EventPoller>(static_cast<int>(i)));
    }
}
/// Picks the worker poller (index 1 and above) with the fewest registered
/// handlers; ties go to the lowest index. Index 0 is returned only when the
/// pool holds a single poller.
std::shared_ptr<EventPoller> EventPollerPool::GetPoller()
{
    if (m_pollers.size() < 2)
    {
        return m_pollers[0];
    }
    // Seed the search with the first worker instead of a fixed upper bound.
    // With a constant such as 10000, a pool whose workers all exceed it would
    // fall through to index 0, which StartPollers() does not set up as a worker.
    size_t target_index = 1;
    int min_clients = m_pollers[1]->GetClients();
    for (size_t i = 2; i < m_pollers.size(); ++i)
    {
        const int clients = m_pollers[i]->GetClients();
        if (clients < min_clients)
        {
            min_clients = clients;
            target_index = i;
        }
    }
    return m_pollers[target_index];
}
/// Returns the poller stored at index 0 of the pool.
std::shared_ptr<EventPoller> EventPollerPool::GetFirstPoller()
{
    return m_pollers.front();
}
void EventPollerPool::StartPollers()
{
for(int i = 1;i<m_pollers.size();i++)
{
m_pollers[i]->Init();
int event_fd = m_pollers[i]->GetEventFD();
m_pollers[i]->AddEventer(event_fd,[&,i](uint64_t cfd){
READER reader = [&,i](int fd,const char*data,size_t len){
printf("Len[%s] content[%d] m_pollers[i] = %p i = %d\n",data,len,m_pollers[i],i);
m_pollers[i]->FlushData(fd,data,len);
return 0;
};
m_pollers[i]->AddReader(cfd,reader);
return 0;
});
m_pollers[i]->Start();
}
}
void EventPollerPool::Stop()
{
for(int i = 0;i<m_pollers.size();i++)
{
m_pollers[i]->Stop();
}
}
}
头文件
#include <string.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <sys/eventfd.h>
#include <signal.h>
#include <iostream>
#include <memory>
#include <list>
#include <vector>
#include <functional>
#include <thread>
#include <mutex>
#include <unordered_map>
namespace DURIAN
{
/// FIFO queue of outgoing bytes for one descriptor.
///
/// Data is stored as a list of string chunks. `offset` is the number of bytes
/// of the front chunk that have already been written.
class WriteBuffer
{
private:
    std::list<std::string> buf_items;
    size_t offset = 0;

public:
    /// True when no chunk is queued.
    bool IsEmpty() const
    {
        return buf_items.empty();
    }

    /// Appends len bytes. They are merged into the last chunk unless that
    /// would make the chunk larger than 4096 bytes, in which case a new chunk
    /// is started.
    void Add2Buffer(const char *data, size_t len)
    {
        if (buf_items.empty() || buf_items.back().size() + len > 4096)
        {
            buf_items.emplace_back(data, len);
        }
        else
        {
            buf_items.back().append(data, len);
        }
    }

    /// Writes as much queued data to fd as the descriptor accepts.
    /// @return true when everything was written or the descriptor would block
    ///         (the remainder stays queued); false on any other write error.
    bool WriteData(int fd)
    {
        while (IsEmpty() == false)
        {
            const std::string &item = buf_items.front();
            while (offset < item.size())
            {
                const ssize_t ret = write(fd, item.data() + offset, item.size() - offset);
                if (ret == -1)
                {
                    if (errno == EINTR)
                    {
                        continue;
                    }
                    return errno == EAGAIN || errno == EWOULDBLOCK;
                }
                offset += static_cast<size_t>(ret);
            }
            buf_items.pop_front();
            // The progress marker belongs to the chunk just removed. Without
            // this reset the next chunk would start at a stale position,
            // skipping its data or reading past its end.
            offset = 0;
        }
        return true;
    }
};
// Callback taking a descriptor. It is also the value type of EventPoller's
// handler map, so every registered handler is stored with this signature.
using ACCEPTER = std::function<int(int)>;
// Declared but not referenced anywhere in the visible source.
using WRITER = std::function<int(int)>;
// Callback that AddEventer's handler invokes with the 8-byte value read from
// the descriptor (implicitly narrowed to int).
using EVENTER = std::function<int(int)>;
// Callback that AddReader's handler invokes with (descriptor, data, byte count).
using READER = std::function<int(int,const char *data,size_t)>;
//static thread_local std::unordered_map<int fd,READER>g_th_handlers;
/// One epoll instance together with the thread that services it.
///
/// NOTE(review): m_run_flag and the handler map are accessed from more than
/// one thread without synchronisation (Stop()/GetClients() versus RunLoop());
/// confirm whether an atomic flag or a lock is needed.
class EventPoller{
private:
    int m_poll_fd = -1;                 // epoll descriptor, created by Init()
    int m_id = -1;                      // identifier given to the constructor
    bool m_run_flag = false;            // loop condition of RunLoop()
    std::unordered_map<int,ACCEPTER> m_accept_handlers;  // descriptor -> event handler
    std::unordered_map<int,WriteBuffer> m_buffer_pool;   // descriptor -> pending output
    std::mutex m_connction_lock;        // not used by the visible implementation
    int m_event_fd = -1;                // eventfd descriptor, created by Init(); -1 until then
    std::thread m_thread_id ;           // thread running RunLoop()
    std::vector<int>m_connections;      // not used by the visible implementation
    void RunLoop();
public:
    EventPoller(int i);
    ~EventPoller();
    int GetEventFD();
    int GetClients();
    std::vector<int> & GetConnections();
    bool Init();
    void Start();
    void Stop();
    void Wait();
    void DeliverConn(int conn_fd);
    bool AddListener(int fd,ACCEPTER on_listen);
    bool AddEventer(int fd,EVENTER on_event);
    bool AddReader(int fd,READER on_read);
    void Close(int fd);
    bool Add2Epoll(int fd);
    bool FlushData(int fd,const char *buf,size_t len);
};
/// Process-wide collection of EventPoller objects, obtained through Instance().
class EventPollerPool
{
public:
    /// Returns the single pool instance.
    static EventPollerPool &Instance();
    /// Sets the number of pollers to create; the value is read when the pool
    /// is constructed.
    static void SetPoolSize(size_t size = 0);
    /// Returns the poller selected to receive the next client connection.
    std::shared_ptr<EventPoller>GetPoller();
    /// Returns the poller at index 0.
    std::shared_ptr<EventPoller>GetFirstPoller();
    /// Initialises and starts the pollers at index 1 and above.
    void StartPollers();
    /// Asks every poller to stop.
    void Stop();
private:
    int m_size = 0;   // given a defined value; it was previously uninitialized
    std::vector<std::shared_ptr<EventPoller>> m_pollers;
    EventPollerPool();
};
}
main文件
#include "durian.h"
// Cleared by sig_handler() to end the wait loop in StartServer().
// volatile sig_atomic_t is the type the C++ standard allows a signal handler
// to write.
static volatile sig_atomic_t g_run_flag = 1;

/// Signal handler: ignores further SIGINT/SIGTERM, clears g_run_flag and
/// reports the shutdown on standard output.
///
/// Only async-signal-safe calls are used (signal, write); printf is not safe
/// inside a handler. SIGKILL is not referenced because it can be neither
/// caught nor ignored.
void sig_handler(int signo)
{
    const int saved_errno = errno;  // write() may change errno under the interrupted code
    signal(SIGINT, SIG_IGN);
    signal(SIGTERM, SIG_IGN);
    g_run_flag = 0;

    static const char exit_msg[] = "Get exit flag\n";
    ssize_t ignored = write(STDOUT_FILENO, exit_msg, sizeof(exit_msg) - 1);
    if (SIGINT == signo || SIGTSTP == signo || SIGTERM == signo)
    {
        static const char kill_msg[] = "\033[0;31mprogram exit by kill cmd !\033[0;39m\n";
        ignored = write(STDOUT_FILENO, kill_msg, sizeof(kill_msg) - 1);
    }
    (void)ignored;
    errno = saved_errno;
}
/// Creates the listening socket on port 8888, starts the poller pool and
/// blocks until g_run_flag is cleared.
///
/// Poller 0 accepts connections; each accepted descriptor is handed to the
/// poller chosen by EventPollerPool::GetPoller().
/// @return false when any setup step failed, true after a normal shutdown.
bool StartServer()
{
    const int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (listen_fd == -1)
    {
        printf("Create socket failed\n");
        return false;
    }
    printf("Server listen fd is:%d\n", listen_fd);

    int reuseaddr = 1;
    if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr)) == -1)
    {
        close(listen_fd);
        return false;
    }
    struct sockaddr_in listen_addr = {};
    listen_addr.sin_family = AF_INET;
    listen_addr.sin_addr.s_addr = INADDR_ANY;
    listen_addr.sin_port = htons(8888);
    if (bind(listen_fd, (struct sockaddr *)&listen_addr, sizeof(listen_addr)) == -1)
    {
        printf("bind failed\n");
        close(listen_fd);
        return false;
    }
    if (listen(listen_fd, 100) == -1)
    {
        printf("listen failed\n");
        close(listen_fd);
        return false;
    }

    DURIAN::EventPollerPool::SetPoolSize(1);
    DURIAN::EventPollerPool &pool = DURIAN::EventPollerPool::Instance();
    pool.StartPollers();
    auto poller = pool.GetFirstPoller();
    if (poller->Init() == false)
    {
        printf("accept poller init failed\n");
        // Tell the workers that are already running to stop, so they can be
        // joined at exit.
        pool.Stop();
        close(listen_fd);
        return false;
    }
    // The callback is stored in the poller and may run after this function
    // has returned, so it fetches the pool itself instead of capturing a local.
    const bool listening = poller->AddListener(listen_fd, [](int conn_fd) {
        printf("将新的fd加到epoll监听 fd =%d\n", conn_fd);
        // Deliver client fd to other pollers
        DURIAN::EventPollerPool::Instance().GetPoller()->DeliverConn(conn_fd);
        return 0;
    });
    if (listening == false)
    {
        pool.Stop();
        close(listen_fd);
        return false;
    }
    poller->Start();

    while (g_run_flag)
    {
        sleep(2);
    }
    pool.Stop();
    // The function is declared bool; falling off the end was undefined behaviour.
    return true;
}
/// Asks every poller of the shared pool to stop.
void StopServer()
{
    DURIAN::EventPollerPool::Instance().Stop();
}
/// Entry point: prints the language standard version, installs the signal
/// handlers and runs the server until it is asked to stop.
int main(int argc, char *argv[])
{
    (void)argc;
    (void)argv;
    // __cplusplus has type long, so it must not be printed with %d.
    printf(" cpp version :%ld\n", static_cast<long>(__cplusplus));
    // A write to a closed peer must fail with an error instead of killing the process.
    signal(SIGPIPE, SIG_IGN);
    signal(SIGTERM, sig_handler);
    signal(SIGINT, sig_handler);
    // SIGKILL is not registered: it cannot be caught, so that call always failed.
    StartServer();
    return 0;
}
性能测试
ulimit -HSn 102400
ab -n 100000 -c 20000 http://192.168.131.131:8888/index.html