什么是进程池?
进程池(Process Pool)是一种用于管理和复用多个进程的技术或设计模式。在进程池中,一定数量的进程会被预先创建并保持在内存中,以便在需要时立即使用,而不是每次需要进程时都重新创建新的进程,这样可以提高系统的性能和效率。
进程池通常用于需要频繁创建和销毁进程的场景,例如网络服务器等。通过预先创建一些进程并保持它们处于空闲状态,可以避免频繁创建和销毁进程所带来的开销,并且可以更好地控制同时进行的进程数量,以避免系统资源被耗尽。
一般来说,进程池包括以下几个基本组件:
1. 进程池管理器(Process Pool Manager):负责创建、管理和维护进程池中的进程,包括池中进程的初始化、分配和回收等操作。
2. 进程队列(Process Queue):用于存放空闲进程的队列,当有任务需要处理时,可以从队列中取出一个空闲进程进行任务处理。
3. 任务队列(Task Queue):用于存放需要处理的任务,当一个进程空闲时,可以从任务队列中取出一个任务进行处理。
4. 进程间通信机制:用于进程之间的通信,例如管道、共享内存、消息队列等。通过合理设计和使用进程池,可以提高系统的并发处理能力,降低系统资源消耗,同时也便于监控和管理进程。
以下是我们的简易进程池的框架。
因此在本次项目中,在面向对象思想的指导下,我们需要创建一个进程池,管理进程的相关操作。
思路
我们在写之前,首先需要明确项目的功能是什么?都需要实现哪些模块?
将大框架搭建好之后,逐步填充细节。
首先我们明确需要实现的功能是:一个父进程开辟进程池中的多个子进程,然后向子进程发送任务信息,由子进程执行任务。
实现的模块:进程池(包含子进程的创建,子进程的执行任务板块,子进程的销毁),任务模块,父进程控制块。
我们可以将进程封装为一个小类,再用进程池封装进程的类。利用匿名管道的特性实现父子进程间通信。
需要注意进程与任务间的负载均衡。
一个超级大Bug
我们知道,子进程是会继承父进程的文件信息列表的,因此当父进程以写端打开管道,其后创建的子进程将会继承当前父进程的所有wfd与rfd,但由于父进程rfd个数为0,但wfd会叠加,因此最后一个子进程将会继承前面父进程的所有wfd。也就是说,后面创建的进程,会保存前面创建的管道的写文件描述符。 因此倘若我们按从前往后的顺序关闭父进程写端同时进行wait等待,是没有结果的。
我们的解决方法是每创建一个子进程,都会关闭其从父进程那里继承来的所有写文件描述符。
当然也有别的办法,1.从后往前关闭管道,最后的管道只有父进程一个写端。
2. 将所有的写端全部结束之后再进行wait等待。
源码
task.hpp
任务模块,其内包含任务列表,与工作过程
#pragma once
#include<iostream>
#include<unistd.h>
using namespace std;
typedef void (*work_t)(int);//函数指针类型
typedef void(*task_t)();
void task1()
{
cout<<"i'm task11111, hello people! from sub process: "<<getpid()<<endl;
}
void task2()
{
cout<<"i'm task22222, hello people! from sub process: "<<getpid()<<endl;
}
void task3()
{
cout<<"i'm task33333, hello people! from sub process: "<<getpid()<<endl;
}
//任务的函数指针数组,存储任务列表
task_t taskarray[]={task1,task2,task3};
//寻找下一个派发的任务
int NextTask()
{
//随机抽取任务
return rand()%3;
}
//工作过程
void worker(int rfd)
{
while(true)
{
sleep(1);
int taskcode=0;
//接收任务码
int n=read(rfd,&taskcode,sizeof(taskcode));
//当可以读取到任务信息执行任务
if(n==sizeof(taskcode))
{
//执行任务
taskarray[taskcode]();
cout<<"task success excute... processid: "<<getpid()<<endl<<endl;
}
else//读取不到任务信息即退出
{
cout<<"no task can excute,exit... processid: "<<getpid()<<endl<<endl;
break;
}
}
}
processpool.cc
完成进程池的创建,销毁与回收等待,同时保证任务的正确执行与退出。
#include<iostream>
#include<string>
#include<vector>
#include<sys/types.h>
#include<sys/wait.h>
#include<unistd.h>
#include"task.hpp"
using namespace std;
//管理管道属性
class channel
{
public:
channel(size_t wfd,size_t pid,string name)
:_wfd(wfd)
,_process_id(pid)
,_channel_name(name)
{}
size_t wfd()
{
return _wfd;
}
size_t pid()
{
return _process_id;
}
string name()
{
return _channel_name;
}
void Close()
{
close(_wfd);
}
~channel(){}
private:
size_t _wfd;
size_t _process_id;
string _channel_name;
};
//管理进程池
class processpool
{
public:
processpool(int sub_process_num)
:_sub_process_num(sub_process_num)
{}
//创建进程池
void CreatProcessPool(work_t worker)
{
for(int i=0;i<_sub_process_num;i++)//创建管道与进程
{
int pipefd[2];
pipe(pipefd);
pid_t pid=fork();
vector<int> fds;//存放管道中除却父进程以外的写端,并在子进程中一一进行释放
if(pid==0)//子进程读
{
close(pipefd[1]);
for(int i=0;i<fds.size();i++)
{
close(fds[i]);
}
//子进程接收父进程发送的任务并完成任务
worker(pipefd[0]);
exit(0);
}
//父进程为写端
close(pipefd[0]);
string name="channel-";
name+=to_string(pid);
_channels.push_back(channel(pipefd[1],pid,name));
fds.push_back(pipefd[1]);//将父进程的wfd进行插入,当下一个子进程创建后会继承该文件信息
//因此在子进程中需要关闭继承到的写端,以免管道出现多个写端的状况
//多个写端造成后果,当父进程终止写入,管道仍旧有多个管道
//父进程发送任务给子进程
}
}
void PrintDebug()//打印进程池中进程相关信息
{
for(auto &e: _channels)
{
cout<<"_wfd: "<<e.wfd()<<"\t";
cout<<"_process_pid: "<<e.pid()<<"\t";
cout<<"_channel_name: "<<e.name()<<"\t";
cout<<endl;
}
}
//寻找下一个分配任务的子进程
int NextChannel()
{
static int cnt=0;
int ret=cnt%_channels.size();
cnt++;
return ret;
}
//发送任务信息码给子进程
void SendTaskMessage(int index,int taskcode)
{
cout<<"taskcode:"<<taskcode<<" channel id: "<<_channels[index].pid()<<endl;
int n=write(_channels[index].wfd(),&taskcode,sizeof(taskcode));
}
//杀死进程池中所有子进程
void KillAll()
{
for(int i=0;i<_channels.size();i++)
{
_channels[i].Close();
}
}
//对所有子进程进行回收
void Wait()
{
for(int i=0;i<_channels.size();i++)
{
pid_t pid=_channels[i].pid();
int ret=waitpid(pid,nullptr,0);
if(ret=pid)
cout<<"sub process already recyle success... processid: "<<pid<<endl;
else
cout<<"sub process already recyle fail fail fail!!! processid: "<<pid<<endl;
}
}
private:
int _sub_process_num;
vector<channel> _channels;
};
//控制进程池执行任务
void CtrlProcessPool(processpool Processpool,int cnt)
{
while(cnt-->0)
{
//挑选进程
int index=Processpool.NextChannel();
//挑选任务
int taskcode=NextTask();
//发送任务给进程
sleep(1);
cout<<"第"<<cnt<<"个任务"<<endl;
Processpool.SendTaskMessage(index,taskcode);
}
}
int main(int argc,char* argv[])
{
if(argc!=2)//规范启动法则
{
cout<<"Please Re-Enter! Please enter subprocess numbers!"<<endl;
return -1;
}
//启动成功
int subprocess_num=stoi(argv[1]);
//创建进程池
processpool Processpool(subprocess_num);
Processpool.CreatProcessPool(worker);
//Processpool.PrintDebug();
//控制子进程
//挑选进程与任务,并将任务发送给对应进程
CtrlProcessPool(Processpool,7);
//结束后回收子进程
//关闭写端,进而关闭子进程
Processpool.KillAll();
//父进程等待回收子进程
Processpool.Wait();
return 0;
}
代码中有详细注释。
运行结果
这里的任务与进程是整数倍的关系,因此显得比较规整。