Linux从0到1——进程池
- 1. 进程池的概念
- 2. 进程池实现思路
- 3. 进程池的代码实现
- 3.1 创建管道,创建子进程
- 3.2 封装任务
- 3.3 Work接口
- 3.4 发送任务
- 3.5 回收资源,关闭管道(重点)
- 3.6 改造CreatChannels接口
- 4. 完整代码
1. 进程池的概念
1. 池化技术:
- 在古代,有一种建筑叫粮仓,用来储存粮食。人们一般会先把大量的粮食囤积在粮仓中,等到需要用粮食时,再从粮仓中一点一点的拿。相当于是先把粮食提前准备好,你需要的时候直接去拿就行了,不然的话每次用粮食前还要先去田里收割粮食。
2. 内存池:
- 计算机中有很多的设计理念,都用到了池化技术。如内存池,当一个进程需要申请空间时,OS会一次性开辟好大量的空间,进程直接去使用这些空间即可。而不是每一次进程申请空间时,操作系统都要做开辟空间的动作,这样太低效了。
3. 进程池:
- 如果我们每启动一个任务,就创建一个进程来完成这个任务,这样未免有些低效。我们可以将多个进程一次性开辟好,等任务来的时候,直接丢给开辟好的进程即可,省去了多次创建进程的时间开销。
2. 进程池实现思路
-
一次性开辟多个管道和多个子进程,让每一个管道和子进程一一对应。
-
父进程每次向管道中送入4字节
int
类型的数据,表示需要执行任务的任务码code
,一旦管道中被放入了数据,意味着这个管道对应的子进程被激活,开始执行任务码对应的任务。
3. 进程池的代码实现
3.1 创建管道,创建子进程
1. 描述管道:
- 为了管理我们创建的管道,需要为其创建对应的结构体来描述它,然后用一定的数据结构组织起来。
const int num = 5; // 最大管道数
int number = 0; // 管道编号
class channel
{
public:
channel(int fd, pid_t id)
: ctrlfd(fd), workerid(id)
{
name = "channel-" + std::to_string(++number);
}
public:
int ctrlfd; // 管道写端
pid_t workerid; // 对应的子进程pid
std::string name; // 管道名
};
- 结构体中,需要记录管道的写端,以便将来父进程关闭。
- 还需要记录对应的子进程
pid
,以便子进程退出后,wait
处理子进程僵尸状态。
2. 创建管道,创建进程:
- 介绍一下C++中规范的传参形式:
- 输入型参数:
const &
- 输出型参数:
*
- 输入输出型参数:
&
- 输入型参数:
// 传参形式:
// 1. 输入参数:const &
// 2. 输出参数:*
// 3. 输入输出参数:&
void CreatChannels(std::vector<channel> *c)
{
// bug version
for (int i = 0; i < num; i++)
{
// 1. 定义并创建管道
int pipefd[2];
int n = pipe(pipefd);
assert(n == 0);
(void)n;
// 2. 创建进程
pid_t id = fork();
assert(id != -1);
// 3. 构建单向通信信道
if (id == 0)
{
// child
close(pipefd[1]); // 关闭写端
// TODO
dup2(pipefd[0], 0); // 重定向
Work();
exit(0); // 会自动关闭自己打开的所有fd
}
// father
close(pipefd[0]); // 关闭读端
c->push_back(channel(pipefd[1], id));
}
}
int main()
{
std::vector<channel> channels;
// 创建信道,创建进程
CreatChannels(&channels);
// ...
return 0;
}
Work
函数为将来子进程执行任务时要完成的模块。重定向操作执行后,每一个子进程中,fd
为0的位置就指向读端。- 每建立一条管道,就将它放入数组中。这样,对管道的管理就变为了对数组的管理。
上面这种创建管道和进程的方式有一个很深层次的
bug
,我们到后面说。
3.2 封装任务
Task.hpp头文件:
#pragma once
#include <iostream>
#include <functional>
#include <vector>
#include <ctime>
#include <unistd.h>
typedef std::function<void()> task_t;
// 三个任务
void Download()
{
std::cout << "我是一个下载任务" << " 处理者: " << getpid() << std::endl;
}
void PrintLog()
{
std::cout << "我是一个打印日志的任务" << " 处理者: " << getpid() << std::endl;
}
void PushVideoStream()
{
std::cout << "我是一个推送视频流的任务" << " 处理者: " << getpid() << std::endl;
}
class Init
{
public:
// 任务码
const static int g_download_code = 0;
const static int g_printlog_code = 1;
const static int g_push_videostream_code = 2;
// 任务集合
std::vector<task_t> tasks;
public:
Init()
{
tasks.push_back(Download);
tasks.push_back(PrintLog);
tasks.push_back(PushVideoStream);
srand(time(nullptr) ^ getpid()); // ^getpid(),让数据更有随机性
}
// 检查任务码是否安全
bool CheckSafe(int code)
{
if (code >= 0 && code < tasks.size()) return true;
else return false;
}
// 执行任务
void RunTask(int code)
{
tasks[code]();
}
// 选择任务
int SelectTask()
{
// 返回随机任务码
return rand() % tasks.size();
}
// 返回任务码对应的任务
std::string ToDesc(const int &code)
{
switch(code)
{
case g_download_code:
return "Download";
case g_printlog_code:
return "PrintLog";
case g_push_videostream_code:
return "PushVideoStream";
default:
return "Unknow";
}
return "";
}
};
Init init; // 定义对象
- 定义了三条任务,即这三条任务对应的任务码。
- 定义了一个
Init
类来封装这些任务,以及提供对应的接口。 - 这里我们采取随机选择任务的设计,实际应用中会有所不同。三个任务也只是象征性的模拟一下,并不是具体的下载,打印日志,推送视频流任务。
3.3 Work接口
不断从管道中读取任务码,执行对应的任务。直到写端关闭。
void Work()
{
while (true)
{
int code = 0; // 任务码
ssize_t n = read(0, &code, sizeof(code));
if (n == sizeof(code))
{
if (!init.CheckSafe(code))
continue;
init.RunTask(code);
}
else if (n == 0)
{
// n == 0 写端退出
break;
}
else
{
// do nothing
}
}
std::cout << "child quit" << std::endl;
}
3.4 发送任务
- 设计了标签
g_always_loop
来控制是否一直发送任务。true
为一直发送,false
为不是一直发送。传false
要配合参数num
,标明发送次数。 - 轮巡式的向管道中发送信息。
const bool g_always_loop = true; // 是否一直执行
void SendCommand(const std::vector<channel> &channels, bool flag, int num = -1)
{
int pos = 0;
while (true)
{
// 1. 选择任务
int command = init.SelectTask();
// 2. 选择信道(进程)
const auto &c = channels[pos++];
pos %= channels.size();
// debug
std::cout << "send command " << init.ToDesc(command) << "[" << command << "]"
<< " in" << c.name << "worker is: " << c.workerid << std::endl;
// 3. 发送任务
write(c.ctrlfd, &command, sizeof(command));
// 4. 判断是否要退出
if (!flag)
{
num--;
if (num <= 0) break;
}
sleep(1);
}
std::cout << "SendCommand done..." << std::endl;
}
int main()
{
std::vector<channel> channels;
// 创建信道,创建进程
CreatChannels(&channels);
// 开始发送任务
SendCommand(channels, !g_always_loop, 10);
// ...
return 0;
}
3.5 回收资源,关闭管道(重点)
1. 版本1:
- 这种版本为,先将写端全部关闭,再回收子进程,自动关闭读端,循环两次。该版本不会出
bug
。 - 原理是,写端关闭后,
Work
模块中read
会读到0,子进程就会执行到exit
退出。
void ReleaseChannels(const std::vector<channel> &channels)
{
// version1
for (const auto &c : channels)
{
close(c.ctrlfd);
}
for (const auto &c : channels)
{
pid_t rid = waitpid(c.workerid, nullptr, 0);
if (rid == c.workerid)
{
std::cout << "wait child: " << c.workerid << " success" << std::endl;
}
}
}
int main()
{
std::vector<channel> channels;
// 创建信道,创建进程
CreatChannels(&channels);
// 开始发送任务
SendCommand(channels, !g_always_loop, 10);
// 回收资源,想让子进程退出,并且释放管道资源,只要关闭写端即可(写端关闭后,子进程自动退出)
ReleaseChannels(channels);
return 0;
}
2. 会出现bug的版本:
- 关闭一个写端,就关闭对应的读端,循环一次。
- 这种版本会出现
bug
,现象就是会在发送任务完成时卡死。
void ReleaseChannels(const std::vector<channel> &channels)
{
// bug version
for (const auto &c : channels)
{
close(c.ctrlfd);
pid_t rid = waitpid(c.workerid, nullptr, 0);
if (rid == c.workerid)
{
std::cout << "wait child: " << c.workerid << " success" << std::endl;
}
}
}
3. 分析bug出现的原因:
- 这个bug要追溯到创建管道的模块。首先,如果以我们之前的方式创建管道和进程,那么在创建第二个管道时,子进程拷贝父进程
struct files_struct
结构体,将上一个管道的写端也拷贝下来了。此时,第一个管道就会有两个写端。 - 以此类推,在创建第三个管道时,子进程又将父进程的
struct files_struct
结构体继承下来了。第一个管道就会有三个写端,第二个管道会有两个写端。 - 此时如果从第一个管道开始关闭管道,回收子进程,就会出现,第一个管道只关闭了一个写端,
read
无法读取到0,子进程在read
处阻塞的情况。 - 版本1之所以不会出现
bug
,是因为,最后一个管道只有一个写端,将父进程对应的写端一次性全部关闭后,最后一个进程就没有写端了,最后一个子进程就会退出。最后一个子进程退出了,倒数第二个管道也就没有写端了,倒数第二个子进程也要跟着退出。从后向前,管道和子进程资源依次释放。
2. 版本2:
- 根据上面分析的
bug
出现原因,我们只需要从后向前关闭管道即可。
void ReleaseChannels(const std::vector<channel> &channels)
{
// version2
int num = channels.size() - 1;
for (; num >= 0; num--)
{
close(channels[num].ctrlfd);
pid_t rid = waitpid(channels[num].workerid, nullptr, 0);
if (rid == channels[num].workerid)
{
std::cout << "wait child: " << channels[num].workerid << " success" << std::endl;
}
}
}
3.6 改造CreatChannels接口
通过3.5的讲解,我们是可以从后向前关闭管道,来解决卡死的问题。但是,这毕竟不是问题的本质,我们期望的管道是单向通信的,也就是只有一个写端和一个读端,出现一个管道多个写端的情况,不是我们想看到的。
所以,我们可以在创建管道时,提前记录一下子进程都从父进程继承下来了哪些写端,然后让子进程将这些写端关闭。
void CreatChannels(std::vector<channel> *c)
{
std::vector<int> old;
for (int i = 0; i < num; i++)
{
// 1. 定义并创建管道
int pipefd[2];
int n = pipe(pipefd);
assert(n == 0);
(void)n;
// 2. 创建进程
pid_t id = fork();
assert(id != -1);
// 3. 构建单向通信信道
if (id == 0)
{
// child
// 关闭从父进程继承下来的写端
if (!old.empty())
{
for (auto fd : old)
{
close(fd);
}
// debug
PrintFd(old);
}
close(pipefd[1]); // 关闭写端
// TODO
dup2(pipefd[0], 0); // 重定向
Work();
exit(0); // 会自动关闭自己打开的所有fd
}
// father
close(pipefd[0]); // 关闭读端
c->push_back(channel(pipefd[1], id));
old.push_back(pipefd[1]);
}
}
- 这样一来,问题就解决了,回收资源时使用3.5中的
bug
版本也没有问题。可以说,那个bug
版本才是逻辑最恰当的版本。
4. 完整代码
1. Task.hpp头文件:
#pragma once
#include <iostream>
#include <functional>
#include <vector>
#include <ctime>
#include <unistd.h>
typedef std::function<void()> task_t;
void Download()
{
std::cout << "我是一个下载任务" << " 处理者: " << getpid() << std::endl;
}
void PrintLog()
{
std::cout << "我是一个打印日志的任务" << " 处理者: " << getpid() << std::endl;
}
void PushVideoStream()
{
std::cout << "我是一个推送视频流的任务" << " 处理者: " << getpid() << std::endl;
}
class Init
{
public:
// 任务码
const static int g_download_code = 0;
const static int g_printlog_code = 1;
const static int g_push_videostream_code = 2;
// 任务集合
std::vector<task_t> tasks;
public:
Init()
{
tasks.push_back(Download);
tasks.push_back(PrintLog);
tasks.push_back(PushVideoStream);
srand(time(nullptr) ^ getpid()); // ^getpid(),让数据更有随机性
}
// 检查任务码是否安全
bool CheckSafe(int code)
{
if (code >= 0 && code < tasks.size()) return true;
else return false;
}
// 执行任务
void RunTask(int code)
{
tasks[code]();
}
int SelectTask()
{
// 返回随机任务码
return rand() % tasks.size();
}
// 返回任务码对应的具体任务
std::string ToDesc(const int &code)
{
switch(code)
{
case g_download_code:
return "Download";
case g_printlog_code:
return "PrintLog";
case g_push_videostream_code:
return "PushVideoStream";
default:
return "Unknow";
}
return "";
}
};
Init init; // 定义对象
2. ProcessPool.cc文件:
#include <iostream>
#include <string>
#include <vector>
#include <cassert>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/types.h>
#include "Task.hpp"
const int num = 5; // 最大管道数
int number = 0; // 管道编号
class channel
{
public:
channel(int fd, pid_t id)
: ctrlfd(fd), workerid(id)
{
name = "channel-" + std::to_string(++number);
}
public:
int ctrlfd; // 管道写端
pid_t workerid; // 对应的子进程pid
std::string name; // 管道名
};
void Work()
{
while (true)
{
int code = 0; // 任务码
ssize_t n = read(0, &code, sizeof(code));
if (n == sizeof(code))
{
if (!init.CheckSafe(code))
continue;
init.RunTask(code);
}
else if (n == 0)
{
// n == 0 写端退出
break;
}
else
{
// do nothing
}
}
std::cout << "child quit" << std::endl;
}
// 打印都关闭了哪些从父进程继承下来的写端
void PrintFd(const std::vector<int> &fds)
{
std::cout << getpid() << " close: ";
for (auto fd : fds)
{
std::cout << fd << " ";
}
std::cout << std::endl;
}
// 传参形式:
// 1. 输入参数:const &
// 2. 输出参数:*
// 3. 输入输出参数:&
void CreatChannels(std::vector<channel> *c)
{
std::vector<int> old;
for (int i = 0; i < num; i++)
{
// 1. 定义并创建管道
int pipefd[2];
int n = pipe(pipefd);
assert(n == 0);
(void)n;
// 2. 创建进程
pid_t id = fork();
assert(id != -1);
// 3. 构建单向通信信道
if (id == 0)
{
// child
// 关闭从父进程继承下来的写端
if (!old.empty())
{
for (auto fd : old)
{
close(fd);
}
// debug
PrintFd(old);
}
close(pipefd[1]); // 关闭写端
// TODO
dup2(pipefd[0], 0); // 重定向
Work();
exit(0); // 会自动关闭自己打开的所有fd
}
// father
close(pipefd[0]); // 关闭读端
c->push_back(channel(pipefd[1], id));
old.push_back(pipefd[1]);
}
// bug version
// for (int i = 0; i < num; i++)
// {
// // 1. 定义并创建管道
// int pipefd[2];
// int n = pipe(pipefd);
// assert(n == 0);
// (void)n;
// // 2. 创建进程
// pid_t id = fork();
// assert(id != -1);
// // 3. 构建单向通信信道
// if (id == 0)
// {
// // child
// close(pipefd[1]); // 关闭写端
// // TODO
// dup2(pipefd[0], 0); // 重定向
// Work();
// exit(0); // 会自动关闭自己打开的所有fd
// }
// // father
// close(pipefd[0]); // 关闭读端
// c->push_back(channel(pipefd[1], id));
// }
}
void PrintfDebug(const std::vector<channel> &c)
{
for (const auto &channel : c)
{
std::cout << channel.name << ", " << channel.ctrlfd << ", " << channel.workerid << std::endl;
}
}
const bool g_always_loop = true; // 是否一直执行
void SendCommand(const std::vector<channel> &channels, bool flag, int num = -1)
{
int pos = 0;
while (true)
{
// 1. 选择任务
int command = init.SelectTask();
// 2. 选择信道(进程)
const auto &c = channels[pos++];
pos %= channels.size();
// debug
std::cout << "send command " << init.ToDesc(command) << "[" << command << "]"
<< " in" << c.name << "worker is: " << c.workerid << std::endl;
// 3. 发送任务
write(c.ctrlfd, &command, sizeof(command));
// 4. 判断是否要退出
if (!flag)
{
num--;
if (num <= 0) break;
}
sleep(1);
}
std::cout << "SendCommand done..." << std::endl;
}
void ReleaseChannels(const std::vector<channel> &channels)
{
// version2
// int num = channels.size() - 1;
// for (; num >= 0; num--)
// {
// close(channels[num].ctrlfd);
// pid_t rid = waitpid(channels[num].workerid, nullptr, 0);
// if (rid == channels[num].workerid)
// {
// std::cout << "wait child: " << channels[num].workerid << " success" << std::endl;
// }
// }
// version1
// for (const auto &c : channels)
// {
// close(c.ctrlfd);
// }
// for (const auto &c : channels)
// {
// pid_t rid = waitpid(c.workerid, nullptr, 0);
// if (rid == c.workerid)
// {
// std::cout << "wait child: " << c.workerid << " success" << std::endl;
// }
// }
// bug version
for (const auto &c : channels)
{
close(c.ctrlfd);
pid_t rid = waitpid(c.workerid, nullptr, 0);
if (rid == c.workerid)
{
std::cout << "wait child: " << c.workerid << " success" << std::endl;
}
}
}
int main()
{
std::vector<channel> channels;
// 创建信道,创建进程
CreatChannels(&channels);
// 开始发送任务
SendCommand(channels, !g_always_loop, 10);
// PrintfDebug(channels);
// sleep(10);
// 回收资源,想让子进程退出,并且释放管道资源,只要关闭写端即可(写端关闭后,子进程自动退出)
ReleaseChannels(channels);
return 0;
}