1.什么是进程池
每一个可执行程序,在被执行前都要转化为进程,操作系统都要为其创建PCB,地址空间,页表,构建映射关系,进程池就是创建进程时,创建很多个进程,如果要执行程序,就直接使用创建好的进程。就像一次性创建5个进程,肯定是比一次创建一个创建5次的效率更好的,将任务均匀的分配给进程执行。
2.进程池原理
采用面相对象的方法对进程管理根据需求抽离出进程的主要特征,将其组织起来就是进程池。
首先fork创建多个子进程。
对进程进行控制,通过父进程向管道中写入任务码,子进程读取任务码后,执行对应的任务。
对进程进行回收。
3.进程池的实现
创建进程池有一个隐藏非常深的bug,一个管道拥有多个写端,导致进程无法正常退出。
这里的退出方式,关闭管道的写端,读端就会读到文件末尾,进而退出进程。
主要的原因就是,子进程是父进程身上扒下来的,文件描述符表和父进程是一样的。
创建第一个进程的时候,将父进程读端关闭,子进程写端关闭。
我们在创建第二进程的时候,会把父进程文件描述表中的4,也给拷贝过来,这样另一个子进程也会指向管道的写端。
这样我们不想写入任务码了,关闭父进程的写端,但是还后其他的文件描述符在指向,这样就不会读到文件的末尾,就无法将进程退出。
我么可以采用两种方式解决这种问题。
1.从后向前关闭父进程的写端,因为最后创建的进程的管道,只有父进程指向写端,最后一个进程退出了,指向倒数第二个管道的写端的fd也就关闭了,依次就可正常退出了
2.讲每个进程的wfd存到vector中,fork之后将vctotr的wfd全部关闭,最后关闭完将该进程的写端fd存入vector中。
processpool.cpp
#include <iostream>
#include <vector>
#include <string>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "task.hpp"
using namespace std;
enum//设置错误类型
{
useageError = 1,//使用错误
pipeError,
forkError
};
class channal
{
public:
channal(int wfd, string name, pid_t pid)
: _wfd(wfd), _name(name), _pid(pid)
{
}
int wfd()//返回该进程写端管道的fd
{
return _wfd;
}
string name()//返回该进程的名字
{
return _name;
}
pid_t pid()//返回该进程的pid
{
return _pid;
}
void Close()//关闭进程
{
close(_wfd);
}
private:
int _wfd;
string _name;
pid_t _pid;
};
class processpool
{
public:
processpool(int sum)
: _channalssum(sum)
{
}
void create(work_t work/*回调函数*/)//创建进程
{
vector<int> wfds;
int i = 0;
while (i < _channalssum)
{
int pipfd[2] = {0};
int ret = pipe(pipfd);
// children read
if (ret == -1)
{
exit(pipeError);
}
pid_t id = fork();
if (id == -1)
{
exit(forkError);
}
// 子进程工作
if (id == 0)
{
if(!wfds.empty())
{
for(auto a:wfds)
{
close(a);
}
}
// read
close(pipfd[1]);
dup2(pipfd[0], 0);
work();
exit(0);
}
//父进程
close(pipfd[0]);
string cname = "childprocess" + to_string(i);
channals.push_back(channal(pipfd[1],cname,id));
wfds.push_back(pipfd[1]);
i++;
}
}
int nextchannal()//选择管道
{
static int cnt = 0;
cnt++;
cnt %= channals.size();
return cnt;
}
void sendCode(int index ,uint32_t code)//发送任务
{
cout << "send code: " << code << " to " << channals[index].name() << " sub prorcess id: " << channals[index].pid() << endl;
write(channals[index].wfd(),&code,sizeof(code));
sleep(1);
}
//关闭所有写端
void killall()
{
for(auto &ch : channals)
{
ch.Close();
pid_t rid = waitpid(ch.pid(),nullptr,0);
if(rid == ch.pid())
{
cout<<"wait sucess pid:"<< ch.pid()<<endl;
}
}
}
private:
vector<channal> channals;
int _channalssum;
};
void ctrlProcess(processpool &prp)
{
int cnt = 5;
while(cnt)
{
//选择渠道
int index = prp.nextchannal();
//选择任务
uint32_t code = sendWork();
//发送任务
prp.sendCode(index,code);
sleep(1);
cnt--;
}
}
int main(int argc, char *argv[])
{
if (argc != 2)//参数不是 2个
{
cout << "uesageError" << endl;
return useageError;
}
int sum = stoi(argv[1]);
// 创建进程
if (sum <= 0)
{
cout << "process sum must > 0" << endl;
return forkError;
}
processpool prp(sum);
prp.create(worker);
//控制进程
ctrlProcess(prp);
//回收资源
prp.killall();
return 0;
}
task.hpp
#pragma once
using namespace std;
typedef void (*work_t)();//函数指针类型
typedef void (*task_t)();
void printlog()
{
cout << "working printlog" << endl;
}
void linkMYSQL()
{
cout << "working linkMYSQL" << endl;
}
void download()
{
cout << "working downloade" << endl;
}
task_t _task[3] = {printlog, linkMYSQL, download};
int sendWork()
{
int ret = rand() % 3;
return ret;
}
void worker()
{
//int cnt = 5;
while (1)
{
uint32_t code = 0;
size_t n = read(0, &code, sizeof(code));//从管道中读任务
if (n == sizeof(code))
_task[code]();
else if(n == 0)//说明写端关闭了,读到文件的末尾了
{
cout<<"wfd closed quite now"<<endl;
break;
}
cout << "i am working" << endl;
sleep(1);
}
}