目录
前言
1. 进程池
1.1 基本结构:
1.2. 池化技术
1.3. 思路分析
1.4. 代码实现
总结
前言
上篇文章介绍了管道及其使用,本文在管道的基础上,通过匿名管道来实现一个进程池;
1. 进程池
父进程创建一组子进程,子进程根据父进程的发送的信号,来做出相应的操作;
1.1 基本结构:
master为父进程,父进程通过管道向子进程发送对应的信号,让子进程执行相关的操作;
1.2. 池化技术
为什么要有进程池?
要解答这个问题,需要先了解池化技术;池化技术是一种常见的优化方法,可用于提高计算和存储资源的利用率,从而提高系统性能。通过分类和管理资源或任务的池,可以实现资源的高效共享和复用;
举个最简单的例子:我们在写vector时,它有一个扩容操作,我们在实现时一般是2倍扩容,为什么要多扩容?——为了防止频繁的申请空间;池化技术也是类似的优化方法,通过一次性申请一定数量的资源,然后自己管理这些资源的分配和回收,从而减少频繁向操作系统申请资源的次数在操作系统中,申请空间、创建进程等操作都需要一定的时间开销。因此,频繁地进行这些操作会降低系统的效率;
进程池会提前创建一定数量的进程并保存在进程池中,当需要使用新的进程时,可以直接从进程池中获取已经存在的空闲进程来执行任务,而不需要每次都创建新的进程,从而减少了创建和销毁进程的开销;
注意:
在实现上,把任务分配给不同的信道一定要平均,不能是有的信道很忙,有的信道很闲,这样也无法提高效率;
在此之前,为了便于理解,这里再次回顾一下管道的特点,及几种不同的情况:
a. 管道的4种情况
- 正常情况,如果管道没有数据了,读端必须等待,直到有数据为止(写端写入数据了)
- 正常情况,如果管道被写满了,写端必须等待,直到有空间为止(读端读走数据)
- 写端关闭,读端一直读取, 读端会读到read返回值为0, 表示读到文件结尾
- 读端关闭,写端一直写入,OS会直接杀掉写端进程,通过想目标进程发送SIGPIPE(13)信号,终止目标进程
b. 匿名管道的5种特性
- 匿名管道,可以允许具有血缘关系的进程之间进行进程间通信,常用与父子,仅限于此
- 匿名管道,默认给读写端要提供同步机制 --- 了解现象就行
- 面向字节流的 --- 了解现象就行
- 管道的生命周期是随进程的
- 管道是单向通信的,半双工通信的一种特殊情况
1.3. 思路分析
要创建一个管道用于父进程于子进程的通信简单:
问题在于后续管道的创建:
结构图如下:
在创建第二个管道时,父进程新建子进程,子进程继承父进程的属性;然后关闭父进程的读端,子进程的写端,构建单向信道;
问题就出在这里,看上图结构:子进程会继承父进程属性,所以第二个子进程的3号文件描述符也会指向的1号管道的写端(正常情况下是不能指定的);
以此类推,第三个进程也会指向1号管道和2号管道;只有最后一个管道,是只有一个写端指向,其余的管道都有多个写端;
在实际上不会出现子进程向管道写入的情况,但是在关闭管道的时候容易出问题;不注意就会导致程序阻塞;
正常的关闭信道这样写:
for (const auto& e : c)
{
close(e.ctrlfd);
waitpid(e.workerid, nullptr, 0);
}
遍历这个数组,关闭父进程对每个管道的写端;看似很完美,而实际情况是:
除最后一个管道,其余管道依然会有写端指向,而只有当所有写端都关闭后,调用read函数时,才会返回0,表示已经读取到文件末尾;此时表示执行完毕,会进入下一步的回收;
如果存在写端,并且写端一直不写入数据,在这种情况下,read函数不会返回而是一直等待数据到来。读端(子进程)的read函数会一直阻塞,直到有数据写入为止;这也就会导致程序阻塞住;
如何解决?
方法一:
倒着遍历去关闭管道;
倒着去关闭,最后一个管道没有写端,管道正常关闭,回收子进程,子进程被回收,文件描述符也会被释放,其余管道的写端就会减少;依次关闭,就可以保证所有管道正常关闭;
方法二:
借助数据结构去解决,记录父进程中的写端fd,在子进程中依次关闭;这样创建出来的信道之间连接关系是理想的,关系不会那么乱;
父进程中创建一个临时vector,父进程记录每个管道写端fd,把数据存储道临时vector中一份;这样每个子进程拿到的就是当前父进程所有写端的fd,子进程全部关掉即可;
1.4. 代码实现
模拟执行任务,编写一个任务类:
// Task.hpp
#pragma once
#include <iostream>
#include <functional>
#include <vector>
#include <ctime>
typedef std::function<void()> task_t;
void Download()
{
std::cout << "我是一个下载任务"
<< " 处理者: " << getpid() << std::endl;
}
void PrintLog()
{
std::cout << "我是一个打印日志的任务"
<< " 处理者: " << getpid() << std::endl;
}
void PushVideoStream()
{
std::cout << "这是一个推送视频流的任务"
<< " 处理者: " << getpid() << std::endl;
}
class Tasklist
{
public:
Tasklist()
{
tasks.push_back(Download);
tasks.push_back(PrintLog);
tasks.push_back(PushVideoStream);
// 模拟随机任务
srand(time(nullptr));
}
// 检查信号是否合法
bool CheckSafe(int code)
{
if (code >= 0 && code < tasks.size())
return true;
else
return false;
}
// 执行任务
void RunTask(int code)
{
return tasks[code]();
}
// 随机选择任务
int SelectTask()
{
return rand() % tasks.size();
}
// 信号转换
std::string ToDesc(int code)
{
switch (code)
{
case g_download_code:
return "Download";
case g_printlog_code:
return "PrintLog";
case g_push_videostream_code:
return "PushVideoStream";
default:
return "Unknow";
}
}
public:
const static int g_download_code = 0;
const static int g_printlog_code = 1;
const static int g_push_videostream_code = 2;
std::vector<task_t> tasks;
};
Tasklist init;//定义对象
进程池:
#include <iostream>
#include <vector>
#include <string>
#include <unistd.h>
#include <assert.h>
#include <sys/wait.h>
#include <sys/types.h>
#include "Task.hpp"
const int nums = 5;
static int number = 1;
// 将管道描述成一个对象
class channel
{
public:
channel(int fd, pid_t id)
:ctrlfd(fd)
, workerid(id)
{
name = "channel " + std::to_string(number++);
}
public:
int ctrlfd;
pid_t workerid;//进程id
std::string name;
};
// 工作接口
void work()
{
while (true)
{
int code = 0;
// 该接口由子进程执行
// 从标准输入去读(其实就是从管道去读)
// 为了方便读数据,所以将对应管道读端,重定向到标准输入,否则还需记录子进程对应管道的读端
ssize_t n = read(0, &code, sizeof(code));
//assert(n == sizeof(code)); //父进程一旦退出,子进程也会退出,没有数据写入,读到的字节是0,assert强制停止;
// 判断读到的数据是否正常
if (n == sizeof(code))
{
if (!init.CheckSafe(code)) continue;
init.RunTask(code);
}
else if (n == 0)
{
break;
}
else
{
}
}
std::cout << "child quit " << std::endl;
}
void PrintFd(const std::vector<int>& fds)
{
std::cout << getpid() << " close fds: ";
for (auto fd : fds)
{
std::cout << fd << " ";
}
std::cout << std::endl;
}
// 创建管道
void CreateChannel(std::vector<channel>* c)
{
std::vector<int> tmp;
for (int i = 0; i < nums; i++)
{
// 创建信道
int pipefd[2];
int n = pipe(pipefd);//管道建立成功返回0,失败返回错误码
assert(n == 0);
(void)n;
// 创建进程
pid_t id = fork();
assert(id >= 0); // 条件为假返回报错信息
// 构建单向信道
if (id == 0) //child
{
if (!tmp.empty())
{
// 方法二
// 关闭其余写端的fd
for (auto& e : tmp)
{
close(e);
}
PrintFd(tmp);
}
// 这里并没有重复关闭,先创建的管道,再创建子进程,然后再关闭将写端fd加入到tmp
// 这里是关闭子进程对当前管道的写端
close(pipefd[1]);
// 将标准输入重定向到管道读端,管道从标准输入中读数据
dup2(pipefd[0], 0);
// TODO
work();
exit(0);
}
// father
close(pipefd[0]); //关闭父进程读
// 将管道存储起来方便后续管理
c->push_back(channel(pipefd[1], id));
tmp.push_back(pipefd[1]);
}
}
void SendCommand(const std::vector<channel>& c, bool flag, int nums = -1)
{
int pos = 0;
while (true)
{
//1.选择任务
int command = init.SelectTask();
//2.选择信道
const auto& channel = c[pos++];
pos %= c.size();
//debug
std::cout << "send command " << init.ToDesc(command) << "[" << command << "]"
<< " in "
<< channel.name << " worker is : " << channel.workerid << std::endl;
//3.发送任务
write(channel.ctrlfd, &command, sizeof(command));
// 判断任务执行完是否退出
if (!flag)
{
nums--;
if (nums <= 0)
break;
}
sleep(1);
}
std::cout << "SendCommand done..." << std::endl;
}
void ReleaseChannel(std::vector<channel>& c)
{
//倒着回收
// int n = c.size() - 1;
// for(; n >= 0; n--)
// {
// close(c[n].ctrlfd);
// waitpid(c[n].workerid, nullptr, 0);
// }
for (const auto& e : c)
{
close(e.ctrlfd);
waitpid(e.workerid, nullptr, 0);
}
}
const bool g_always_loop = true;
int main()
{
std::vector<channel> channels;
//创建进程创建信道
CreateChannel(&channels);
//开始执行任务
SendCommand(channels, !g_always_loop, 10);
//回收资源等待子进程退出
ReleaseChannel(channels);
return 0;
}
结构并不复杂,这里只是一个简单的示例;这个示例比较考验对多进程编程;
1.5. 思考
在进程池体系中,如果一个子进程退出了(一个管道的读端关闭)会怎样?
如果父进程知道子进程已经退出,即通过监控子进程状态并处理子进程退出的情况下,父进程在得知子进程退出后就不会再继续向已经退出的子进程的管道中写入数据也不会因为收到信号而终止
如果父进程没有正确地监控子进程的状态,不知道子进程已经退出,那么当父进程尝试向已经退出的子进程的管道中写入数据时,会收到SIGPIPE信号而终止。在这种情况下,剩余的子进程会成为孤儿进程,由操作系统接管;
因此在设计时也可以进行特殊处理,处理子进程退出,避免父进程被OS杀死的情况;如何去处理?
可以在选择信道那里多一步判断,判断信道对应的子进程是否已经退出,通过信道描述类,找到对应的进程id,通过waitpid的非阻塞等待判断是否退出;
总结
以上便是本文的全部内容,希望对你有所帮助或启发,感谢阅读!