目录
2、进程池
1)理解进程池
2)进程池的实现
整体框架:
a. 加载任务
b. 先描述,再组织
I. 先描述
II. 再组织
c. 创建信道和子进程
d. 通过channel控制子进程
e. 回收管道和子进程
问题1:
解答1:
问题2:
解答2:
f. 将进程池本身和任务文件本身进行解耦
3)完整代码
processpool.cc:
Task.hpp:
Makefile:
命令行中的 | ,就是匿名管道
它们的父进程都是bash
2、进程池
1)理解进程池
a. 可以将任务写入管道来给到子进程,从而可以提前创建子进程想让哪个子进程完成任务,我就让写入到哪个子进程相对的管道中
b. 管道里面没有数据,worker进程就在阻塞等待,等待务的到来!!
c. master向哪一个管道进行写入,就是唤醒哪一个子进程来处理任务
d. 均衡的向后端子进程划分任务,称之为负载均衡父进程要进行后端任务的负载均衡
父进程直接向管道里写入固定长度的四字节(int)数组下标(任务码)
函数指针数组中元素分别指向不同的任务,以便master控制worker完成指定工作
2)进程池的实现
整体框架:
// ./processpool 3
int main(int argc, char *argv[])
{
if (argc != 2)
{
std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;
}
int num = std::stoi(argv[1]);
LoadTask(); // 加载任务
std::vector<Channel> channels; // 将管道组织起来
// 1.创建信道和子进程
CreateChannelAndSub(num, &channels);
// 2.通过channel控制子进程
CtrlProcess(channels, 10);
// 3.回收管道和子进程 a.关闭所有的写端 b.回收子进程
ClearUpChannel(channels);
return 0;
}
a. 加载任务
我这里用的是打印的方式来模拟任务的分配,通过打印从而了解子进程执行任务的情况,通过种下随机数种子,产生随机数,进而随机的向子进程分配任务,work即为子进程需要做的工作
Task.hpp:
#pragma once #include <iostream> #include <ctime> #include <cstdlib> #include <sys/types.h> #include <unistd.h> #define TaskNum 3 typedef void (*task_t)(); // task_t 函数指针 void Print() { std::cout << "I am print task" << std::endl; } void DownLoad() { std::cout << "I am a download task" << std::endl; } void Flush() { std::cout << "I am a flush task" << std::endl; } task_t tasks[TaskNum]; void LoadTask() { srand(time(nullptr) ^ getpid() ^ 177); // 种一个随机种子 tasks[0] = Print; tasks[1] = DownLoad; tasks[2] = Flush; } void ExcuteTask(int number) { if(number < 0 || number > 2) return; tasks[number](); } int SelectTask() { return rand() % TaskNum; } void work(int rfd) { while (true) { int command = 0; int n = read(rfd, &command, sizeof(command)); if (n == sizeof(int)) { std::cout << "pid is: " << getpid() << " handler task" << std::endl; ExcuteTask(command); } else if (n == 0) { std::cout << "sub process: " << getpid() << " quit" << std::endl; break; } } }
// 命令行规范 --> ./processpool 3
int main(int argc, char *argv[])
{
if (argc != 2)
{
std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;
}
int num = std::stoi(argv[1]);
LoadTask(); // 加载任务
return 0;
}
b. 先描述,再组织
I. 先描述
需要控制的信道(即发送端wfd)数量多且繁琐,需要管理起来从而方便控制给子进程发送任务
class Channel { private: int _wfd; int _subprocessid; std::string _name; };
在信道中,我们需要知道发送的文件描述符wfd,还有知道子进程的pid,以及信道的命名(用来区分信道)
II. 再组织
std::vector<Channel> channels;
我们通过用一个vector数组将所有的Channel存储起来,从而实现对它们的增删查改,以方便管理
c. 创建信道和子进程
void CreateChannelAndSub(int num, std::vector<Channel> *channels)
{
for (int i = 0; i < num; ++i)
{
// 1.创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
if (n < 0) exit(1); // 创建管道失败
// 2.创建子进程
pid_t id = fork();
if (id == 0)
{
// Child
close(pipefd[1]);
work();
close(pipefd[0]);
exit(0);
}
// 3.构建一个名字
std::string channel_name = "Channel-" + std::to_string(i);
// Father
close(pipefd[0]);
// 拿到了 a.子进程的pid b.父进程需要的管道的w端
channels->push_back(Channel(pipefd[1], id, channel_name));
}
}
用命令行参数的方式传入得到的argv[1]即为输入命令需要的子进程和管道个数
通过for循环,创建 num个 pipe管道以及子进程,当创建完子进程时,需要关闭掉不需要的文件描述符(即wfd -- pipefd[1])(当然,父进程也需要关闭不需要的fd -- rfd),在执行完work(子进程的工作)之后,关闭掉rfd(即工作完成了,关闭其管道),然后exit(0)退出进程,等待父进程回收
d. 通过channel控制子进程
// 轮询方案 -- 负载均衡
int NextChannel(int channelnum)
{
static int next = 0;
int channel = next;
next++;
next %= channelnum;
return channel;
}
// 发送相应的任务码到对应管道内
void SendTaskCommand(Channel &channel, int taskCommand)
{
write(channel.GetWfd(), &taskCommand, sizeof(taskCommand));
}
void CtrlProcessOnce(std::vector<Channel> &channels)
{
sleep(1);
// a. 选择一个任务
int taskcommand = SelectTask();
// b. 选择一个信道和进程
int channel_index = NextChannel(channels.size());
// c. 发送任务
SendTaskCommand(channels[channel_index], taskcommand);
std::cout << "=================================" << std::endl;
std::cout << "taskcommand: " << taskcommand << " channel: "
<< channels[channel_index].GetName() << " sub process: "
<< channels[channel_index].GetProcessId() << std::endl;
}
void CtrlProcess(std::vector<Channel> &channels, int times = -1)
{
if (times > 0)
{
while (times--)
{
CtrlProcessOnce(channels);
}
}
else
{
while (true)
{
CtrlProcessOnce(channels);
}
}
}
向其发送任务之前,我们需要先选择一个任务,通过随机种子随机数的方式,随机选择我们的一个任务,拿到其任务码(即指针数组下标),然后选择相应的信道和进程(信道和进程一体的),从而向管道发送任务码给子进程
e. 回收管道和子进程
void ClearUpChannel(std::vector<Channel> &channels)
{
for (auto &channel : channels)
{
channel.CloseChannel();
}
for (auto &channel : channels)
{
channel.Wait();
}
}
我们先将所有的信道关闭,然后在逐个将子进程等待回收
问题1:
那为什么不能边关闭信道边回收呢??
即
解答1:
在我们创建子进程的过程中,由于父子进程之间的继承,从而导致子进程会拥有父进程的文件描述符内容(即指向同一地方),如果我们边关闭边回收的话,如上图所示,当我们关闭父进程的第一个管道的wfd时,这时候第一个管道的读端的引用计数并未清0,因为子进程2它继承了父进程指向第一个管道的wfd(读端),从而使得读端阻塞,进程不退出,然后wait子进程的时候就会阻塞
在work结束后,才会到下一步close和exit退出子进程;
work结束需要的条件是 n == 0,即读端返回值为0,即
因此上述那种边关闭信道,边wait子进程的方法会阻塞
问题2:
为什么这种方法又能成功回收呢??
解答2:
因为当我们将所有信道关闭时,关闭到最后一个子进程对应的管道的wfd的时候,该管道的读端的引用计数就会为0,从而读端读到0,该子进程退出,随子进程退出就会使得该子进程指向的前面管道的读端回收,就不会造成前面那种情况
f. 将进程池本身和任务文件本身进行解耦
用回调函数可以很好的改善代码的耦合性
通过文件描述符重定向 dup2,将标准输入(文件描述符 0)重定向到 rfd 所代表的文件,然后再回调task()函数
// 重定向
这样做可以彻底的让我们的子进程执行对应的work时,再也不需要知道有什么管道的读端
(不用管从哪里接收信息,直接认为从标准输入拿到信息即可)
--- 将管道的逻辑和执行任务的逻辑进一步进行解耦
// task_t task : 回调函数
有了它,我们进程池本身的代码和我们任务本身两个文件就彻底解耦了
--- 即既不关心从哪个文件描述符,直接默认从0里面去读,也不关心将来谁调它,因为子进程会自动回调它
3)完整代码
processpool.cc:
#include <iostream>
#include <string>
#include <unistd.h>
#include <vector>
#include <sys/types.h>
#include <sys/wait.h>
#include "Task.hpp"
class Channel
{
public:
Channel(int wfd, pid_t id, const std::string name)
: _wfd(wfd), _subprocessid(id), _name(name)
{
}
int GetWfd()
{
return _wfd;
}
pid_t GetProcessId()
{
return _subprocessid;
}
std::string GetName()
{
return _name;
}
void CloseChannel()
{
close(_wfd);
}
void Wait()
{
pid_t rid = waitpid(_subprocessid, nullptr, 0);
if (rid > 0)
{
std::cout << "wait " << rid << " success" << std::endl;
}
}
~Channel()
{
}
private:
int _wfd;
int _subprocessid;
std::string _name;
};
// 形参和命名规范
// const & : 输入型参数
// & : 输入输出型参数
// * : 输出型参数
// task_t task : 回调函数
// 有了它,我们进程池本身的代码和我们任务本身两个文件就彻底解耦了
// --- 即既不关心从哪个文件描述符,直接默认从0里面去读,也不关心将来谁调它,因为子进程会自动回调它
void CreateChannelAndSub(int num, std::vector<Channel> *channels, task_t task)
{
for (int i = 0; i < num; ++i)
{
// 1.创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
if (n < 0)
exit(1); // 创建管道失败
// 2.创建子进程
pid_t id = fork();
if (id == 0)
{
// Child
close(pipefd[1]);
dup2(pipefd[0], 0);
task();
close(pipefd[0]);
exit(0);
}
// 3.构建一个名字
std::string channel_name = "Channel-" + std::to_string(i);
// Father
close(pipefd[0]);
// 拿到了 a.子进程的pid b.父进程需要的管道的w端
channels->push_back(Channel(pipefd[1], id, channel_name));
}
}
int NextChannel(int channelnum) // 轮询方案 -- 负载均衡
{
static int next = 0;
int channel = next;
next++;
next %= channelnum;
return channel;
}
void SendTaskCommand(Channel &channel, int taskCommand)
{
write(channel.GetWfd(), &taskCommand, sizeof(taskCommand));
}
void CtrlProcessOnce(std::vector<Channel> &channels)
{
sleep(1);
// a. 选择一个任务
int taskcommand = SelectTask();
// b. 选择一个信道和进程
int channel_index = NextChannel(channels.size());
// c. 发送任务
SendTaskCommand(channels[channel_index], taskcommand);
std::cout << "=================================" << std::endl;
std::cout << "taskcommand: " << taskcommand << " channel: " << channels[channel_index].GetName()
<< " sub process: " << channels[channel_index].GetProcessId() << std::endl;
}
void CtrlProcess(std::vector<Channel> &channels, int times = -1)
{
if (times > 0)
{
while (times--)
{
CtrlProcessOnce(channels);
}
}
else
{
while (true)
{
CtrlProcessOnce(channels);
}
}
}
void ClearUpChannel(std::vector<Channel> &channels)
{
// for (auto &channel : channels)
// {
// channel.CloseChannel();
// channel.Wait();
// }
// int num = channels.size() -1;
// while(num >= 0)
// {
// channels[num].CloseChannel();
// channels[num--].Wait();
// }
for (auto &channel : channels)
{
channel.CloseChannel();
}
for (auto &channel : channels)
{
channel.Wait();
}
}
// ./processpool 3
int main(int argc, char *argv[])
{
if (argc != 2)
{
std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;
}
int num = std::stoi(argv[1]);
LoadTask(); // 加载任务
std::vector<Channel> channels; // 将管道组织起来
// 1.创建信道和子进程
CreateChannelAndSub(num, &channels, work);
// 2.通过channel控制子进程
CtrlProcess(channels, 10);
// 3.回收管道和子进程 a.关闭所有的写端 b.回收子进程
ClearUpChannel(channels);
// // for test
// for(auto &channel : channels)
// {
// std::cout << "====================" << std::endl;
// std::cout << channel.GetName() << std::endl;
// std::cout << channel.GetWfd() << std::endl;
// std::cout << channel.GetProcessId() << std::endl;
// }
// sleep(100);
return 0;
}
Task.hpp:
#pragma once
#include <iostream>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>
#define TaskNum 3
typedef void (*task_t)(); // task_t 函数指针
void Print()
{
std::cout << "I am print task" << std::endl;
}
void DownLoad()
{
std::cout << "I am a download task" << std::endl;
}
void Flush()
{
std::cout << "I am a flush task" << std::endl;
}
task_t tasks[TaskNum];
void LoadTask()
{
srand(time(nullptr) ^ getpid() ^ 177); // 种一个随机种子
tasks[0] = Print;
tasks[1] = DownLoad;
tasks[2] = Flush;
}
void ExcuteTask(int number)
{
if(number < 0 || number > 2) return;
tasks[number]();
}
int SelectTask()
{
return rand() % TaskNum;
}
// void work(int rfd)
// {
// while (true)
// {
// int command = 0;
// int n = read(rfd, &command, sizeof(command));
// if (n == sizeof(int))
// {
// std::cout << "pid is: " << getpid() << " handler task" << std::endl;
// ExcuteTask(command);
// }
// else if (n == 0)
// {
// std::cout << "sub process: " << getpid() << " quit" << std::endl;
// break;
// }
// }
// }
// 这样做可以彻底的让我们的子进程执行对应的work时,再也不需要知道有什么管道的读端
// (不用管从哪里接收信息,直接认为从标准输入拿到信息即可)
// 将管道的逻辑和执行任务的逻辑进一步进行解耦
void work()
{
while (true)
{
int command = 0;
int n = read(0, &command, sizeof(command));
if (n == sizeof(int))
{
std::cout << "pid is: " << getpid() << " handler task" << std::endl;
ExcuteTask(command);
}
else if (n == 0)
{
std::cout << "sub process: " << getpid() << " quit" << std::endl;
break;
}
}
}
Makefile:
processpool:processpool.cc
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -f processpool