经过了管道的介绍之后,我们可以实现了进程间通信,现在我就来简单介
绍一下管道的应用场景——进程池。
1. 引入
在我们的编码过程中,不乏会听到,内存池,进程池,空间配置器等等名词,这些是用来干嘛的呢?
我们在自己写一个顺序表等容器的时候,我们的容器的容量的扩容不是需要一个我们就开一个,而是以整数倍,开辟内存。这样做的好处是,我们在使用的顺序表的时候可以一定程度上减少扩容的消耗(数据迁移,函数调用)。提高我们代码的效率。这样我们的资源就像一个池子一样,用之即取。这也是池化技术的思想。而我们的进程池也是一样,有的时候我们的主进程再进行一些任务的时候,需要创建一个新进程来执行这些任务,但是如果是在执行任务的时候在创建进程的话,会降低代码的整体效率,所以对于这样的场景,我们就可以提前创建好所需要的进程来供主进程使用。这也是利用了池化的思想。
所以池化的思想也就是指预先分配一定数量的资源,然后在需要时动态地从资源池中获取资源,使用完毕后再将资源归还到资源池中,以提高资源利用率和系统性能。
而我们进程池要实现首先就需要实现进程间通信的功能。如果没有这一点功能,我们的进程如何分配给子进程任务。
2. 进程池的创建
a. 信道以及子进程的创建
所以我们现在来编写一首简单的进程池:
上面也说了,要实现进程池首先就得有进程间通信的功能,所以我们这里选择使用匿名管道,并且假设我们需要五个子进程来执行任务:
首先我们需要能够创建出信道和子进程:
#include <iostream>
#include <cstring>
#include <unistd.h>
using namespace std;
int main()
{
// 创建与子进程通信的管道(信道)
int pipefd[2];
int piperet = pipe(pipefd);
if(piperet < 0)
{
cout << "erron: " << errno << " error: " << strerror(errno) << endl;
return 1;
}
// 创建子进程,建立信道
pid_t id = fork();
if(id < 0)
{
cout << "erron: " << errno << " error: " << strerror(errno) << endl;
return 1;
}
else if(id == 0)
{
// 子进程
close(pipefd[1]); // 关闭写端
exit(0);
}
// 父进程
close(pipefd[0]); // 关闭读端
return 0;
}
在此基础上我们添加个循环不就可以,创建多个子进程了吗:
但是现在就有一个问题,我们所使用的pipefd数组在循环中,出了作用域它就不存在了,那么我们还怎么给子进程分配任务呢?其实我们使用一个数组存储相关数据就可以了,那存储什么呢?只存一个文件描述符可以吗?我们现在是一个主进程,当我们子进程们分配任务时,我们需不需要知道它是哪个进程,fd是让操作系统来认识的,我们是程序员,我们可以对子进程也就是信道命名啊,这样的特定的进程做特定的事不久也可以实现了嘛。既然它是子进程的话,那是不是应该再纪录它的pid啊。所以我们的存储对象应该是一个结构体,它其中有父进程写端对应的fd,信道名,子进程的pid:
插入一个数组中,供父进程方便分配。
b. 执行任务
现在我们可以,让子进程执行任务了,我们的假设是通过某种映射关系,能够让子进程知道进行哪些任务,而这种映射关系的key是固定四字节大小的,执行任务也是一直需要循环的:
c. 任务的构建
现在我们有了任务的执行,我们可以创建几个形式上的任务:
我们说了会以映射的方式来让子进程明确子进程执行的是哪个任务,在这里我们要让一个函数和数字有了某种映射关系,在这里我们使用这种方式:
这样我们就建立起了任务与整数的关系。
还需要一个接口,并且我们设计分配任务目前是随机的,所以还需要一个随机数种子:
d. 任务的选择与信道的选择
这里我们任务的选择使用随机数选择,信道的选择使用循环的方式来选择。实际场景中任务的选择和信道的选择肯定是根据每个进程的任务完成状态来实行任务的分配的,这里我们就简单一点:
然后稍稍优化一下:
我们就可以看到这样的效果:
e. 子进程的回收
我们知道当对于管道通信而言,写端关闭,那么读端的read函数会一直返回0。利用这一点,我们可以实现对子进程的回收:
我们这时候对代码改造一下,让它执行到一定次数后结束分配任务:
然后我们,执行代码测试一下:
我们发现它卡住了,我们并没有使用休眠函数,其实这是有原因的,我们再来看看这么一段回收子进程的代码:
再来执行一次:
回收成功了,我来解释一下其中的原因:
我们的主进程创建好第一个子进程的时候应该是这样:
这时候,我们再创建第二个进程:
可以看到第二个子进程的文件描述符表中有一个指向了第一个信道。而我们回收进程的时候,我们第一个回收的是第一个进程,我们关闭了主进程的4号文件,但是仍有文件指针指向信道一号以写的方式,所以这个时候我们,这时候要回收第一个子进程,第一个子进程就没有退出,所以会一直阻塞在回收第一个子进程的代码上,所以会卡了。而处理这个的办法也不难:
第一中方式就是,先关闭所有写端,那所有进程都会退出,这个时候我们再回收进程:
还有一种方式就是,倒着回收进程,它不能回收的原因不就是后续创建进程时,后面的进程中的文件描述符表中会指向前面的信道嘛:
还有一种方式就是在创建进程的时候,我们可以记录下主进程的写端fd,然后再后续创建子进程的时候给他关闭就可以了,这样我们就可以直接回收子进程了:
以上就是一个简略版的进程池,我为了阐述方便没有过度的进行封装,有兴趣的小伙伴可以自行封装一手。全部代码如下:
#include <iostream>
#include <functional>
#include <ctime>
#include <cassert>
#include <string>
#include <vector>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
using namespace std;
class channal
{
public:
channal(int fd, pid_t id, int num)
: _fd(fd), _id(id), _name("channal-" + to_string(num))
{
}
int _fd;
pid_t _id;
string _name;
};
void download()
{
cout << "我是一个下载任务,我的pid是:" << getpid() << endl;
}
void printlog()
{
cout << "我是一个打印日志任务,我的pid是:" << getpid() << endl;
}
void compressedfile()
{
cout << "我是一个压缩文件任务,我的pid是:" << getpid() << endl;
}
using task_t = function<void()>;
class task
{
public:
task()
{
_messions.push_back(download);
_messions.push_back(printlog);
_messions.push_back(compressedfile);
srand((unsigned)time(nullptr));
}
task_t gettask(int num)
{
assert(num >= 0 && num < _messions.size());
return _messions[num];
}
int gettasksize()
{
return _messions.size();
}
string gettaskname(int num)
{
assert(num >= 0 && num < _messions.size());
switch (num)
{
case _download:
return "_download";
break;
case _printlog:
return "_printlog";
break;
case _compressedfile:
return "_compressedfile";
break;
default:
return "";
break;
}
}
private:
static const int _download = 0;
static const int _printlog = 1;
static const int _compressedfile = 2;
vector<task_t> _messions;
};
task task_list;
int main()
{
vector<channal> channals;
vector<int> tmp;
for (int i = 0; i < 5; i++)
{
// 创建与子进程通信的管道(信道)
int pipefd[2];
int piperet = pipe(pipefd);
if (piperet < 0)
{
cout << "erron: " << errno << " error: " << strerror(errno) << endl;
return 1;
}
// 创建子进程,建立信道
pid_t id = fork();
if (id < 0)
{
cout << "erron: " << errno << " error: " << strerror(errno) << endl;
return 1;
}
else if (id == 0)
{
// 子进程
if (!tmp.empty())
{
for (int i = 0; i < tmp.size(); i++)
{
close(tmp[i]);
}
}
close(pipefd[1]); // 关闭写端
while (true)
{
int mession = 0;
ssize_t n = read(pipefd[0], &mession, sizeof(mession));
if (n < 0)
{
cout << "erron: " << errno << " error: " << strerror(errno) << endl;
exit(1);
}
else if (n != 4)
break;
else
{
// 执行任务
task_list.gettask(mession)();
}
}
exit(0);
}
// 父进程
close(pipefd[0]); // 关闭读端
channals.push_back({pipefd[1], id, i});
tmp.push_back(pipefd[1]);
}
// 父进程就可以开始给子进程分配任务了
int pos = 0;
int x = 0;
while (true)
{
pos %= channals.size();
int mession = rand() % task_list.gettasksize();
ssize_t n = write(channals[pos]._fd, &mession, sizeof(mession));
cout << "分配给信道:" << channals[pos]._name << "任务:" << task_list.gettaskname(mession) << "它的pid是:" << channals[pos]._id << endl;
if (n < 0)
{
cout << "erron: " << errno << " error: " << strerror(errno) << endl;
return 1;
}
sleep(1);
pos++;
x++;
if (x == 10)
{
break;
}
}
// // 回收子进程
// for (int i = 0; i < channals.size(); i++)
// {
// close(channals[i]._fd);
// }
// for (int i = 0; i < channals.size(); i++)
// {
// pid_t retpid = waitpid(channals[i]._id, nullptr, 0);
// if (retpid == channals[i]._id)
// {
// cout << "回收子进程成功:" << retpid << endl;
// }
// }
// for (int i = channals.size() - 1; i >= 0; i--)
// {
// close(channals[i]._fd);
// pid_t retpid = waitpid(channals[i]._id, nullptr, 0);
// if (retpid == channals[i]._id)
// {
// cout << "回收子进程成功:" << retpid << endl;
// }
// }
for (int i = 0; i < channals.size(); i++)
{
close(channals[i]._fd);
pid_t retpid = waitpid(channals[i]._id, nullptr, 0);
if (retpid == channals[i]._id)
{
cout << "回收子进程成功:" << retpid << endl;
}
}
return 0;
}