1. 什么是进程池?
简单来说,进程池就是预先创建固定数量的工作进程,通过设计任务队列或调度算法来分配任务给空闲的进程 —— 实现“负载均衡”。
2. 进程池框架设计
枚举错误返回值:
enum { UsageError = 1, ArgError, PipeError };
i. 设定子进程数量与格式验证
设置进程池的默认调用方式: ./processpool sub_process_nums
#include <iostream>
using namespace std;
void Usage(char* argv)
{
cout << "Usage:" << endl;
cout << "\t" << "argv sub_process_nums" << endl;
}
int main(int argc, char* argv[])
{
if (argc < 2) // 命令行指令的本质是字符串, 字符串个数 < 2 时返回
{
Usage(argv[0]);
return UsageError;
}
// ...
return 0;
}
ii. 控制与管理子进程 之 " 创建通信管道与进程 "
在这部分内容开始之前,不妨先明晰一个问题: **“先创建管道再创建子进程,还是先创建子进程再创建管道?” **
要回答这个问题,我们需要了解 通信管道
的本质 及 fork 函数
:
- 通信管道 (通信信道)
当你调用 pipe()
时,操作系统会在 文件描述符表 中为管道分配两个条目,一个用于写入 —— “写端”,一个用于读取 —— “读端”;
管道的 读端
和 写端
本质上都是 文件描述符
。
- fork 函数
fork()
的主要功能是,从当前进程(父进程)中创建一个新的进程(子进程);
子进程继承了父进程的资源限制、环境变量、打开的文件描述符表、工作目录等,但它们是独立的实体。
根据以上信息,不难得出:先创建管道、再创建子进程,子进程会继承父进程打开的文件描述符表,接着只需要关闭父进程的读端、子进程的写端,即可实现父子进程间的通信(反之亦然)。
#include <unistd.h>
int main(int argc, char* argv[])
{
int sub_process_nums = stoi(argv[1]); // c标准库中的函数,将字符串转整型
if (sub_process_nums <= 0)
return ArgError;
// 1. 创建通信管道与进程
for (int i = 0; i < sub_process_nums; i++)
{
int pipefd[2];
int n = pipe(pipefd);
// 创建管道成功返回 0,失败返回 -1
if (n < 0)
return PipeError;
pid_t id = fork();
if (id == 0)
{
// child 负责读
close(pipefd[1]); // 关闭写端
// todo
exit(0); // 执行完退出
}
// father 负责写
close(pipefd[1]); // 关闭读端
}
// ...
}
int pipefd[2];
int n = pipe(pipefd);
——> 管道创建成功后,pipefd[0] 为 读端文件描述符,pipefd[1] 为 写端文件描述符。
3. 封装通信管道与进程池
i. class Channel
为了保存循环创建的通信管道和子进程信息,我们封装一个 通信管道
类型。
#include <string>
class Channel
{
public:
Channel(int wfd, pid_t process_id, const string& name)
:_wfd(wfd), _sub_process_id(process_id), _name(name)
{}
// 观察父进程创建子进程时的现象
void Debug()
{
cout << "_wfd: " << _wfd;
cout << ", _sub_process_id: " << _sub_process_id;
cout << ", _name: " << _name << endl;
}
// 增加获取管道各种信息的接口
int Wfd() { return _wfd; }
pid_t Pid() { return _sub_processs_id; }
string Name() { return _name; }
~Channel() {}
private:
int _wfd; // 写端文件描述符
pid_t _sub_process_id;
string _name;
};
我们并未在 Channel
中封装读端文件描述符,因为我们将在每次循环中对 stdin
做重定向 —— dup2(pipefd[0], 0)
,之后子进程在运行时,只需要向 标准输入stdin —— 0
中读取任务指令即可。
通信管道本质上是文件,管道的读端和写端本质上是文件描述符;
dup2() 的工作原理,是将第一个参数指定的文件描述符,复制到第二个参数指定的位置。
ii. class ProcessPool
封装进程池,是为了更好地控制与管理子进程。
#include <vector>
class ProcessPool
{
public:
ProcessPool(int sub_process_num)
:_sub_process_num(sub_process_num)
{}
~ProcessPool() {}
int CreatChannels(work_t work) // 回调函数
{
// 1. 创建通信信道和进程
for (int i = 0; i < _sub_process_num; i++)
{
// 先创建管道
int pipefd[2];
int n = pipe(pipefd);
if (n < 0)
{
return PipeError;
}
// 再创建子进程,确保父进程和子进程读写同一根管道
pid_t id = fork();
if (id == 0)
{
// child -> r
close(pipefd[1]);
// TODO
dup2(pipefd[0], 0); // 将 pipefd[0] 重定向
work(pipefd[0]); // 方便后续在子进程中观察每个管道读端的文件描述符
// sleep(100);
exit(0);
}
// father
close(pipefd[0]);
string cname = "channel--" + to_string(i);
_channels.push_back(Channel(pipefd[1], id, cname));
}
return 0;
}
private:
vector<Channel> _channels;
int _sub_process_num;
};
int CreatChannels(work_t work) { } 中有一两个细节:
一为前文提到过的,重定向;
第二,即这个函数的参数 —— 这种编程模式也叫做 回调函数 —— 将函数作为参数传递给另一个函数,以便特定条件发生时供后者调用。
我们将子进程待执行的函数,作为参数传入 CreatChannels() 中供子进程调用,后续我们只需对传入参数(传入不同的函数)进行修改,就可以让子进程执行不同的任务而不用对 CreadChannels() 函数体进行修改。
4. 负载均衡式任务调度
#include <stdlib.h>
#include <time.h>
void CtrlProcess(ProcessPool* ptr, int cnt)
{
while (cnt)
{
// a. 选择一个通道和进程
int channel = ptr->NextChannel();
// b. 选择一个任务
int task = NextTask();
// c. 发送任务
ptr->SendTask(channel, task);
sleep(1); // 每隔 1s 发送一次任务
--cnt;
}
}
int main()
{
// ...
// 1. 创建通信管道与进程
ProcessPool *processpool_ptr = new ProcessPool(sub_process_num); // sub_process_num 为要创建子进程的个数
processpool_ptr->CreatChannels(worker); // worker() 待补充
srand(time(nullptr));
// 2. 任务调度
CtrlProcess(processpool_ptr, 10); // 假定 10 个任务
cout << "task run done" << endl;
// 3. 回收进程
delete processpool_ptr;
return 0;
}
- 选择一个通道和进程
class ProcessPool
{
public:
int NextChannel()
{
static int next = 0;
int c = next;
next++;
next %= _channels.size(); // 防止越界
return c;
}
};
- 选择一个任务
typedef void(*task_t)(int, pid_t); // 函数指针类型
// 模拟任务
void PrintLog(int fd, pid_t id)
{
cout << "channel rfd: " << fd << ", sub process: " << id << ", task: Print log task" << endl << endl;
}
void ConnectMysql(int fd, pid_t id)
{
cout << "channel rfd: " << fd << ", sub process: " << id << ", task: Connect mysql task" << endl << endl;
}
void ReloadConf(int fd, pid_t id)
{
cout << "channel rfd: " << fd << ", sub process: " << id << ", task: Reload conf task" << endl << endl;
}
task_t tasks[3] = {PrintLog, ConnectMysql, ReloadConf};
int NextTask()
{
return rand() % 3;
}
- 发送任务
class ProcessPool
{
public:
void SendTask(int index, int command)
{
cout << "Send task to " << _channels[index].Name() << ", pid: " << _channels[index].Pid() << endl;
write(_channels[index].Wfd(), &command, sizeof(command));
}
};
5. 子进程任务执行:通过 worker()
读取父进程指令
typedef void(*work_t)(int);// 函数指针类型
void worker(int fd)
{
while (1)
{
int code = 0;
ssize_t n = read(0, &code, sizeof(code));
if (n == sizeof(code)) // read 成功,返回值为读取到内容的大小/字节个数
{
if (code >= 3) continue;
tasks[code](fd, getpid());
}
else if (n == 0) // 父进程关闭写端后,继续读,read 返回 0
{
cout << "sub process id: " << getpid() << " is to quit ..." << endl;
break;
}
sleep(1);
}
}
6. 回收子进程
设计 KillAll()
,完成子进程和管道的回收 —— 遍历进程池中的 _channels
,关闭管道的写端,读端将管道中的数据读完后,会读到返回值 0,表示读结束。
#include <sys/wait.h>
class ProcessPool
{
public:
void KillAll()
{
for (auto& channel : _channels)
{
pid_t pid = channel.Pid(); // 子进程(管道读端进程)的 pid
close(channel.Wfd());
pid_t rid = waitpid(pid, nullptr, 0);
if (rid == pid) // wait 成功
{
cout << "wait sub process: " << pid << "success..." << endl;
}
cout << "close channel: " << channel.Name() << ", sub process is to quit.." << endl;
}
}
};
int main() { // ... // 3. 回收进程 processpool_ptr->KillAll(); delete processpool_ptr; return 0; }
程序运行情况如下:
从图中可以观察到两点信息:1. 每个读端文件描述符都是 3; 2. task run done 后,子进程并没有退出。
原因是,
1. 当父进程关闭管道的读端后, 原先分配给读端的文件描述符(3 号文件描述符)就会被释放;再次调用 pipe()
创建新管道时,OS 会重新分配这个最小未使用的文件描述符(3 号文件描述符)给新创建的管道。
2. 当子进程通过 fork()
创建时,它会继承父进程所有打开的文件描述符。回收进程调用 KillAll()
时,尽管关闭了父进程的写端,子进程仍持有对原管道写端的引用,使得读端无法按预期读到返回值 0 ,进而无法关闭子进程。
要解决 子进程持有对原管道写端的引用
的问题,我们需要定义一个 vector<int>
—— 用于保存父进程对所有管道的写端,接着让子进程在执行分配任务之前关闭所有写端 —— 修改 CreatProcess 函数。
int CreatChannels(work_t work)
{
vector<int> fds;
for (int i = 0; i < _sub_process_num; i++)
{
int pipefd[2];
int n = pipe(pipefd);
if (n < 0)
{
return PipeError;
}
fds.push_back(pipefd[1]); // 保存管道的写端
pid_t id = fork();
if (id == 0)
{
// child -> r
// close(pipefd[1]); // 不再需要单独关闭对应管道的写端
if (!fds.empty())
{
for (auto& fd : fds)
{
close(fd);
}
}
// TODO
dup2(pipefd[0], 0);
work(pipefd[0]);
// sleep(100);
exit(0);
}
// father
close(pipefd[0]);
string cname = "channel--" + to_string(i);
_channels.push_back(Channel(pipefd[1], id, cname));
}
return 0;
}
子进程正常退出,程序正常结束…