基于多线程的Reactor模式的 回声服务器 EchoServer

news2024/9/29 11:29:00

记录下  

一个线程专门用来接受accept获取客户端的fd 

获取fd之后 从剩余的执行线程中 找到一个连接客户端数量最少的线程

然后将客户端的fd加入到这个线程中并通过EPOLL监听这个fd

线程之间通过eventfd来通信  将客户端的fd传到 对应的线程中  

参考了MediaServer   引入EventPollerPoll 和 EventPoller的 概念  

最少两个两个线程 设置为1的话 会改成2

cpp代码:

#include "durian.h"

#include <sys/epoll.h>

namespace DURIAN
{

	EventPoller::EventPoller(int id)
	{
		m_id = id;
	}

	EventPoller::~EventPoller()
	{


		printf("~EventPoller signal m_id = %d m_run_flag = %d\n",m_id,m_run_flag);

		
		Wait();

		
	}
    bool EventPoller::Init()
    {
        m_poll_fd = epoll_create1(0);
        if(m_poll_fd == -1)
        {
            return false;
        }

		  m_event_fd = eventfd(0,0);
		  if(m_event_fd == -1)
		  {
			  printf("new fd failed\n");
			  close(m_poll_fd);
			  return false ;
		  }

		  
        return true;
    }

	 void EventPoller::RunLoop()
    {
		static const int MAX_EVENTS = 1024;
		struct epoll_event events[MAX_EVENTS];
		
    	while(m_run_flag)
    	{

            int ready_count = epoll_wait(m_poll_fd,events,MAX_EVENTS,2000);
            
            if(ready_count == -1)
            {
                if(errno != EINTR)
                {
                    //exit(1);
                }
                //ready_count = 0;
            }
            else if(ready_count == 0)
            {
                if(m_run_flag == false)
                {
                    //printf("time out and runflag = false exit thread\n");
                    //break;
                }
            }

            for(int i = 0;i<ready_count;i++)
            {
                const struct epoll_event &ev = events[i];
                int fd = events[i].data.fd;
                
                if(ev.events &(EPOLLIN | EPOLLERR |EPOLLHUP))
                {
                    auto handler = m_accept_handlers[fd];
                    handler(fd);
                }
                else if(ev.events & (EPOLLOUT | EPOLLERR | EPOLLHUP))
                {

		
                    auto it = m_buffer_pool.find(fd);
                    if(it!= m_buffer_pool.end())
                    {
                        auto &buf = it->second;
                        if(buf.WriteData(fd) == false)
                        {
                            Close(fd);
                        }
                    }
                }
            }
        }
    }


	int EventPoller::GetEventFD()
	{
		return m_event_fd;
	}

	int EventPoller::GetClients()
	{
		return m_accept_handlers.size();
	}


	void EventPoller::Stop()
	{
		m_run_flag = false;
	}
	void EventPoller::Start()
	{
		//printf("Enter EventPoller Start  m_id = %d pollfd = %d eventid = %d\n",m_id,m_poll_fd,m_event_fd);
		m_run_flag = true;
		m_thread_id = std::thread(&EventPoller::RunLoop,this);

	}

	void EventPoller::Wait()
	{
		if(m_thread_id.joinable())
		{
			m_thread_id.join();
		}
	}
	bool EventPoller::Add2Epoll(int fd)
	{
		 if(m_accept_handlers.count(fd) != 0)
        {
            return false;
        }
        int flags = 1;
        if(ioctl(fd,FIONBIO,&flags) == -1)
        {
            return false;
        }

        struct epoll_event ev;
        ev.events = EPOLLIN |EPOLLOUT |EPOLLET;
        ev.data.fd = fd;
        if(epoll_ctl(m_poll_fd,EPOLL_CTL_ADD,fd,&ev)==-1)
        {
            return false;
        }

        return true;
	}


	void EventPoller::DeliverConn(int conn_fd)
	{
		//printf("DeliverConn fd = %d\n",conn_fd);
		uint64_t count = conn_fd;

		
		if(write(m_event_fd,&count,sizeof(count)) == -1)
		{
			printf("Deliverconn write failed\n");
		}
	}

	bool EventPoller::AddListener(int fd,ACCEPTER on_accept)
   {
		if(Add2Epoll(fd) == false)
		{
		   return false;
		}

		std::cout<<"EventPoller AddListener fd = "<<fd<<std::endl;

		m_accept_handlers[fd] = [this,on_accept]( int server_fd){
			for(;;)
			{
				int new_fd = accept(server_fd,nullptr,nullptr);
				std::cout<<"accept client fd = "<<new_fd<<std::endl;	
				if(new_fd == -1)
				{
				  if(errno!= EAGAIN)
				  {
				      Close(server_fd);
				  }
				  return 0;
				}
		
				int enable = 1;
				setsockopt(new_fd,IPPROTO_TCP,TCP_NODELAY,&enable,sizeof(enable));
				on_accept(new_fd);
				
			}
			return 0;
		};
		return true;
   }



	bool EventPoller::AddEventer(int fd, EVENTER on_event)
	{
		if(Add2Epoll(fd) == false)
		{
		   return false;
		}

		m_accept_handlers[fd] = [this,on_event](int cfd){
            for(;;)
            {
                uint64_t count;
                if(read(cfd,&count,sizeof(count)) == -1)
                {
                    if(errno != EAGAIN)
                    {
                        Close(cfd);
                    }
                    return 0;
                }
                on_event(count);
            }
				return 0;
        };

        return true;
		
	}


	bool EventPoller::AddReader(int fd, READER on_read)
	{	
		if(Add2Epoll(fd) == false)
		{
		   return false;
		}


     m_accept_handlers[fd] = [this,on_read](int cfd){
         for(;;)
         {
             char buf[4096] = {0};
             ssize_t ret = read(cfd,buf,sizeof(buf));
             if(ret == -1)
             {
                 if(errno != EAGAIN)
                 {
                     Close(cfd);
                 }
                 return -1;
             }

             if(ret == 0)
             {
                 Close(cfd);
					  printf("客户端关闭了连接 %d\n",cfd);
                 return 0 ;
             }

             on_read(cfd,buf,ret);

         }
     };
		return true;
	}


	void EventPoller::Close(int fd)
	{
		m_accept_handlers.erase(fd);
		m_buffer_pool.erase(fd);
		close(fd);
	}

	bool EventPoller::FlushData(int fd, const char * buf, size_t len)
	{
		WriteBuffer *wb = nullptr;
		auto it = m_buffer_pool.find(fd);
		if(it == m_buffer_pool.end())
		{
			while(len >0)
			{
				ssize_t ret = write(fd,buf,len);
				if(ret == -1)
				{
					if(errno != EAGAIN)
					{
						Close(fd);
						return false;
					}
					wb = &m_buffer_pool[fd];
					break;
				}
				buf+= ret;
				len-=ret;
			}

			if(len == 0)
			{
				//Success
				return true;
			}
		}
		else
		{
			wb = &it->second;
		}
		wb->Add2Buffer(buf,len);
		return true;
	}


static size_t g_pool_size = 0;
void EventPollerPool::SetPoolSize(size_t size)
{
	g_pool_size = size;
}
EventPollerPool & EventPollerPool::Instance()
{
	static std::shared_ptr<EventPollerPool> s_instance(new EventPollerPool()); 
	static EventPollerPool &s_instance_ref = *s_instance; 
	return s_instance_ref; 
}

EventPollerPool::EventPollerPool()
{
	auto size = g_pool_size;
	auto cpus = std::thread::hardware_concurrency();
	size = size > 0 ? size : cpus;

	std::cout<<"Thread size:"<<size<<std::endl;

	if(size <2)size = 2;

	
	for (int i = 0; i < size; ++i) {

		std::shared_ptr<EventPoller> poller = std::make_shared<EventPoller>(i);
		m_pollers.emplace_back(poller);
	}
}

std::shared_ptr<EventPoller> EventPollerPool::GetPoller()
{
	if(m_pollers.size()>1)
	{
		int min_clients = 10000;
		int target_index = 0;
		for(int i = 1;i<m_pollers.size();i++)
		{
			if(m_pollers[i]-> GetClients() < min_clients)
			{
				min_clients = m_pollers[i]->GetClients();
				target_index = i;
			}
			
		}
		
		//printf("target index = %d min_clients = %d\n",target_index,min_clients);
		return m_pollers[target_index];
	}
	return m_pollers[0];
	
}
std::shared_ptr<EventPoller> EventPollerPool::GetFirstPoller()
{
	return m_pollers[0];
}


void EventPollerPool::StartPollers()
{

	for(int i = 1;i<m_pollers.size();i++)
	{

			m_pollers[i]->Init();
			int event_fd = m_pollers[i]->GetEventFD();

			m_pollers[i]->AddEventer(event_fd,[&,i](uint64_t cfd){

					READER reader = [&,i](int fd,const char*data,size_t len){
						printf("Len[%s] content[%d] m_pollers[i] = %p i = %d\n",data,len,m_pollers[i],i);
						m_pollers[i]->FlushData(fd,data,len);
						
						return 0;
						};
					m_pollers[i]->AddReader(cfd,reader);
					 return 0;
				});

			m_pollers[i]->Start();	

	}
}


void EventPollerPool::Stop()
{
	for(int i = 0;i<m_pollers.size();i++)
	{
		m_pollers[i]->Stop();
	}

}

}

头文件



#include <string.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <errno.h>
#include <netinet/tcp.h>

#include <sys/eventfd.h>
#include <signal.h>


#include <iostream>
#include <memory>
#include <list>
#include <vector>
#include <functional>
#include <thread>
#include <mutex>

#include <unordered_map>

namespace DURIAN
{



	class WriteBuffer
	{
	private:
		 std::list<std::string> buf_items;
		 size_t offset = 0;
	
	public:
		 bool IsEmpty() const
		 {
			  return buf_items.empty();
		 }
		 void Add2Buffer(const char* data,size_t len)
		 {
			  if(buf_items.empty() || buf_items.back().size()+len >4096)
			  {
					buf_items.emplace_back(data,len);
			  }
			  else
			  {
					buf_items.back().append(data,len);
			  }
		 }
		 bool WriteData(int fd)
		 {
			  while (IsEmpty() == false)
			  {
					auto const &item = buf_items.front();
					const char *p = item.data() + offset;
					size_t len = item.size() -offset;
	
					while(len >0)
					{
						 ssize_t ret = write(fd,p,len);
						 if(ret == -1)
						 {
							  if(errno == EAGAIN)
							  {
									return true;
							  }
							  return false;
						 }
						 offset += ret;
						 p+=ret;
						 len-= ret;
					}
	
					buf_items.pop_front();
			  }
	
			  return true;
			  
		 }
	};


	using ACCEPTER = std::function<int(int)>;
	using WRITER = std::function<int(int)>;
	using EVENTER = std::function<int(int)>;
	using READER = std::function<int(int,const char *data,size_t)>;

	//static thread_local std::unordered_map<int fd,READER>g_th_handlers;


	class EventPoller{
		private:
			int m_poll_fd = -1;
			int m_id;
			bool m_run_flag = false;
		   std::unordered_map<int,ACCEPTER> m_accept_handlers;

		   std::unordered_map<int,WriteBuffer> m_buffer_pool;

			std::mutex m_connction_lock;
			int m_event_fd;
			std::thread m_thread_id ;
			std::vector<int>m_connections;
			
			void RunLoop();

		public:
			EventPoller(int i);
			~EventPoller();
			int GetEventFD();
			int GetClients();
			std::vector<int> & GetConnections();
			bool Init();
			void Start();
			void Stop();
			void Wait();	
			void DeliverConn(int conn_fd);
			bool AddListener(int fd,ACCEPTER on_listen);
			bool AddEventer(int fd,EVENTER on_event);
			bool AddReader(int fd,READER on_read);
			void Close(int fd);
			bool Add2Epoll(int fd);
			bool FlushData(int fd,const char *buf,size_t len);
			


	};

	class EventPollerPool
	{
		public:
		  static EventPollerPool &Instance();
		  static void SetPoolSize(size_t size = 0);
		  std::shared_ptr<EventPoller>GetPoller(); 
		  std::shared_ptr<EventPoller>GetFirstPoller(); 	
		  void StartPollers();
		  void Stop(); 
		private:
			int m_size;
			std::vector<std::shared_ptr<EventPoller>> m_pollers;
			EventPollerPool();		  
	};


	
}

main文件

#include "durian.h"

static bool g_run_flag = true;
void sig_handler(int signo)
{
	signal(SIGINT, SIG_IGN);
	signal(SIGTERM, SIG_IGN);
	signal(SIGKILL, SIG_IGN);

	g_run_flag = false;
	printf("Get exit flag\n");

	if (SIGINT == signo || SIGTSTP == signo || SIGTERM == signo|| SIGKILL == signo)
	{
        g_run_flag = false;

		printf("\033[0;31mprogram exit by kill cmd !\033[0;39m\n");
		

	}

}




bool StartServer()
{
	 int listen_fd = socket(AF_INET,SOCK_STREAM,0);
	 if(listen_fd == -1)
	 {
		  printf("Create socket failed\n");
		  return false;
	 }
	 else
	 {
		  printf("Server listen fd is:%d\n",listen_fd);
	 }

	 int reuseaddr = 1;
	 if(setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,&reuseaddr ,sizeof(reuseaddr)) == -1)
	 {
		  return false;
	 }

	 struct sockaddr_in listen_addr = {0};
	 listen_addr.sin_family  = AF_INET;
	 listen_addr.sin_addr.s_addr = INADDR_ANY;
	 listen_addr.sin_port = htons(8888);

	 if(bind(listen_fd,(struct sockaddr*)&listen_addr,sizeof(listen_addr)) == -1)
	 {
	 	printf("bind failed\n");
		  return false;
	 }

	 if(listen(listen_fd,100) == -1)
	 {
	 	 printf("listen failed\n");
		  return false;
	 }


	 DURIAN::EventPollerPool::SetPoolSize(1);

	 
	 DURIAN::EventPollerPool& pool = DURIAN::EventPollerPool::Instance(); 


	pool.StartPollers();


	 auto poller = pool.GetFirstPoller(); 
	 
	 if(poller->Init())
	 {
		if(poller->AddListener(listen_fd,[&](int conn_fd){
			printf("将新的fd加到epoll监听 fd =%d\n",conn_fd);
			//Deliver client fd to other pollers
			
				pool.GetPoller()->DeliverConn(conn_fd);
				return 0;

		}) == false)
		{
			return false;
		}
	 	poller->Start();

	 }



	while(g_run_flag)
	{
		sleep(2);
	}

	pool.Stop();

}




void StopServer()
{
	DURIAN::EventPollerPool& pool = DURIAN::EventPollerPool::Instance(); 
	pool.Stop();
}

int main(int argc,char *argv[])
{
	printf(" cpp version :%d\n",__cplusplus);


	int thread_size = 1;

	bool run_flag = true;

	signal(SIGPIPE,SIG_IGN);

	signal(SIGTERM, sig_handler);
	signal(SIGKILL, sig_handler);
	signal(SIGINT,sig_handler); 

	
	StartServer();
	
	return 0;
}

性能测试

ulimit -HSn 102400

ab -n 100000 -c 20000 http://192.168.131.131:8888/index.html
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1073279.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[java基础学习]之DOS命令

#java基础学习 1.常用的DOS命令&#xff1a; dir:列出当前目录下的文件以及文件夹 md: 创建目录 rd:删除目录cd:进入指定目录 cd.. :退回到上级目录 cd\ : 退回到根目录 del:删除文件 exit:退出dos命令行 1.dir:列出当前目录下的文件以及文件夹 2.md: 创建目录 …

解决Adobe Premiere Pro CC 2018打开无反应,并出现.crash的文件问题

一 问题描述 Adobe Premiere Pro CC 2018软件安装完成后&#xff0c;打开该软件没反应&#xff0c;且打开时桌面会出现Crash文件&#xff01; 二 解决方法 如果Adobe Premiere Pro CC 2018在打开时无反应&#xff0c;并出现.crash文件的问题&#xff0c;可以尝试以下解决方法…

KekeBlog项目实战(更新中)

一、前言 1. 项目简介 本项目是前后端分离项目&#xff0c;而我们所做的只有完整的后端开发工作&#xff0c;前端已经写好&#xff0c;故不做任何开发&#xff0c;仅开发后端。项目包含完整的后端中前台和后台的代码编写 前端项目下载链接&#xff1a; https://pan.baidu.c…

Git仓库迁移记录

背景&#xff1a;gitlab私服上面&#xff0c;使用 import project的方式&#xff0c;从旧项目迁移到新地址仓库&#xff0c;但是代码一直没拉过去。所以使用命令的方式&#xff0c;进行代码迁移。 第一步&#xff1a;使用git clone --mirror git地址&#xff0c;进行代码克隆 …

建立数据科学基础设施的绝佳指南 数据工程师都该人手一册

《Effective数据科学基础设施》由Netflix工程师Ville Tuulos撰写&#xff0c;以Metaflow为对象&#xff0c;介绍了数据科学所需要的基础设施&#xff0c;囊括数据准备、特征工程、模型训练、模型部署、服务和持续监控等环节。Metaflow专注于构建生产流程&#xff0c;更适合具有…

《理解深度学习》2023最新版本+习题答案册pdf

刚入门深度学习或者觉得学起来很困难的同学看过来了&#xff0c;今天分享的这本深度学习教科书绝对适合你。 就是这本已在外网获13.1万次下载的宝藏教科书《理解深度学习》。本书由巴斯大学计算机科学教授Simon J.D. Prince撰写&#xff0c;全书共541页&#xff0c;目前共有21…

【嵌入式C内存管理】

记录嵌入式C内存划分&#xff0c;后续会更新动态内存管理 1. 内存划分 栈区 stack有时也称为堆栈&#xff0c;重点在栈字&#xff0c;存放函数内部临时变量堆区 heap也就是动态申请&#xff08;malloc&#xff09;、释放(free)的内存区域数据区 data初始化的全局变量和静态变量…

opengauss数据备份(docker中备份)

首先如果想直接在宿主机上进行使用gs_dump备份需要glibc的版本到2.34及以上&#xff0c;查看版本命令为 ldd --version 如图所示&#xff0c;本宿主机并不满足要求&#xff0c;所以转向在docker容器中进行备份&#xff0c; 然后进入opengauss容器中&#xff0c;命令为 docker…

Vue3 + Ts实现NPM插件 - 定制loading

目录 你的 Loading&#x1f916; 安装&#x1f6f9; 简介苍白请 您移步文档&#xff1a;✈️ 使用方法&#x1f6e0;️ 配置 loading 类型&#x1f3b2; 定制 loading 色彩 &#x1f4a1; 注意事项 前期回顾 你的 Loading 开箱即可用的 loading&#xff0c; 说明&#xff1a;vu…

当数字孪生与智慧园区结合,能够实现什么样的应用?

随着数字化进程的加深&#xff0c;数字孪生技术也越来越为大家所重视。那么&#xff0c;数字孪生技术在智慧园区中能够发挥什么样的作用&#xff1f;本文将根据山海鲸可视化智慧园区三维可视化系统&#xff0c;为大家进行说明。 一、基本概念 为了方便大家了解&#xff0c;这…

基于Springboot实现学生毕业离校系统项目【项目源码+论文说明】分享

基于Springboot实现学生毕业离校系统演示 摘要 随着科学技术的飞速发展&#xff0c;社会的方方面面、各行各业都在努力与现代的先进技术接轨&#xff0c;通过科技手段来提高自身的优势&#xff0c;新生宿舍管理系统当然也不能排除在外。新生宿舍管理系统是以实际运用为开发背景…

使用echarts绘制3DChart图表

使用3DChart需要安装echarts和echarts-gl。否则图标不显示。 版本要对应 “echarts”: “^5.2.2”, “echarts-gl”: “^2.0.9”, main.js // main.js 引入echarts方式如下 import echarts-gl //如果使用3DEchart图标需要下载个引入对应版本的 import * as echarts from echa…

旅游网站HTML

代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>旅游网</title> </head> <body><!--采用table编辑--> <!--最晚曾table,用于整个页面那布局--><table width&q…

【星海出品】ansible入门(四)playbook kolla开源例子

简介 Kolla-ansible项目提供一个完整的Ansible Playbook&#xff0c;来部署Docker的镜像&#xff0c;再完成openstack组件的自动化部署。并提供all-in-one和multihost的环境。 安装后会将kolla-ansible内置为一个shell启动文件。 kolla-ansible: /usr/local/bin/kolla-ansible…

获取手机号归属地详情,精准高效的API接口服务

在现代社会中&#xff0c;通讯工具如手机成为了人们生活中不可缺少的部分。但是&#xff0c;有时我们会收到陌生电话&#xff0c;需要了解电话号码的归属地以判断其可信性。这个时候&#xff0c;获取手机号归属地的API接口服务就会发挥重要作用。 一、API接口服务简介 API接口…

解决video层级过高在app的问题

直接上代码,写一个组件 <template><iframe :onload"inner"></iframe> </template> <script>export default {props: {src: {}},data() {return {inner: }},created() {this.inner this.contentWindow.document.body.innerHTML <v…

一文搞懂二叉树后序遍历的三种方法

系列文章&#xff1a; 相关题目&#xff1a; 145. 二叉树的后序遍历 先序遍历结果为&#xff1a;4 5 2 6 7 3 1 总体上分为两种框架&#xff0c;递归框架和非递归框架&#xff0c;递归框架又分为两种思路&#xff1a;分解思路和遍历思路。 递归 1、分解思路 【分解为子问题】…

直线模组的应用场景

直线模组是一种由直线导轨、滑块、驱动部件等组成的直线运动系统&#xff0c;具有高精度、高速度、高效率等特点。直线模组被广泛应用于各种机械设备中&#xff0c;以下是其主要的应用场景&#xff1a; 1、数控机床&#xff1a;直线模组是数控机床中的重要组成部分&#xff0c;…

过滤器的实现及其原理责任链设计模式

Filter过滤器 过滤器的应用 DeptServlet,EmpServlet,OrderServlet三个业务类的业务方法执行之前都需要编写判断用户是否登录和解决的中文乱码的代码,代码没有得到重复利用 Filter是过滤器可以用来编写请求的过滤规则和多个Servlet都会执行的公共代码,Filter中的业务代码既可…

docker安装Jenkins完整教程

1.docker拉取 Jenkins镜像并启动容器 新版本的Jenkins依赖于JDK11 我们选择docker中jdk11版本的镜像 # 拉取镜像 docker pull jenkins/jenkins:2.346.3-2-lts-jdk11 2.宿主机上创建文件夹 # 创建Jenkins目录文件夹 mkdir -p /data/jenkins_home # 设置权限 chmod 777 -R /dat…