目录
进程池
1、什么是进程池?
2、实现进程池
(1)相关函数:
pipe函数:
write函数
read函数
waitpid函数
(2)代码实现面向过程进程池
Task.hpp
processPool.cc
3、注意事项
进程池
1、什么是进程池?
提前建立一堆进程,而不是从头开始建立进程
例如,我们执行命令的时候,命令本身也是进程,所以需要bash创建进程
要从头开始创建进程PCB、分配空间等
成本很高
如果每一次都要从头开始建立,很浪费时间成本
所以,为了提高效率,提前建立多个子进程等待
在父进程和子进程之间,建立单向管道通信
当父进程有任务需要子进程执行时,直接把任务写入管道中
对应管道的子进程读取数据,开始执行分配的任务
如果父进程没有分配任务,子进程就处于等待阻塞的状态
当父进程分配任务时,叫做唤醒子进程
所以,这种机制下建立的一系列进程,也需要管理
而管道的作用,就是让进程之间进行协同合作
所以,管理多个进程的容器就叫做进程池(Process Pool)。
负载均衡:每个进程执行任务的频度均衡。
2、实现进程池
形参命名规范:
const& :只要输出
&:输出输入型参数
*:输出型参数
(1)相关函数:
pipe函数:
#include <unistd.h>
int pipe(int pipefd[2]);
参数
pipefd
: 一个整数数组,长度为 2。pipefd[0]
用于读取数据,pipefd[1]
用于写入数据。返回值
成功:返回 0。
失败:返回 -1,并设置
errno
以指示错误类型。功能
创建一个管道,管道在内存中提供一个字节流,允许数据从一个进程传递到另一个进程。
管道是一种半双工的通信机制,即数据只能单向流动:要么从写端到读端,要么反之。
write函数
#include <unistd.h>
ssize_t write(int fd, const void *buf, size_t count);
参数
fd
: 文件描述符,指向要写入的数据流。通常是通过open
函数、管道、套接字等获得的。
buf
: 指向要写入的数据缓冲区的指针。
count
: 要写入的数据字节数。返回值
成功:返回实际写入的字节数(可能小于
count
)。如果写入的数据字节数等于count
,这表示所有请求的数据都已成功写入。失败:返回 -1,并设置
errno
以指示错误类型。
read函数
#include <unistd.h>
ssize_t read(int fd, void *buf, size_t count);
参数
fd
: 文件描述符,指向要读取的数据流。通常通过open
函数、管道、套接字等获得。
buf
: 指向接收数据的缓冲区的指针。
count
: 要读取的最大字节数。返回值
成功:返回实际读取的字节数。如果返回值为 0,表示已到达文件末尾(EOF)。
失败:返回 -1,并设置
errno
以指示错误类型。
waitpid函数
#include <sys/wait.h>
pid_t waitpid(pid_t pid, int *status, int options);
参数
pid
: 要等待的子进程的进程 ID。可以是:
特定子进程的进程 ID。
-1
,表示等待任何子进程(类似于wait
函数)。
0
,表示等待与调用进程在同一进程组的子进程。小于 -1 的值,表示等待具有相同进程组 ID 的子进程。
status
: 指向整数的指针,用于存储子进程的退出状态。如果不需要获取状态信息,可以传递NULL
。
options
: 控制函数行为的标志。常见值包括:
0
:默认行为。
WNOHANG
:如果没有子进程状态改变,返回 0,而不是阻塞。
WUNTRACED
:返回状态改变的子进程,即使它尚未终止。
WCONTINUED
:返回状态改变的子进程,即使它被继续执行。返回值
成功:返回子进程的进程 ID。
失败:返回 -1,并设置
errno
以指示错误类型。
(2)代码实现面向过程进程池
Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <stdlib.h>
typedef void (*task_t)(); // task_t类型函数指针数组
#define TaskNum 3
void task1()
{
std::cout << "i am task1, you can do something in here...." << std::endl;
}
void task2()
{
std::cout << "i am task2" << std::endl;
}
void task3()
{
std::cout << "i am task3" << std::endl;
}
task_t tasks[TaskNum];
void LoadTask()
{
srand(time(nullptr));
tasks[0] = task1;
tasks[1] = task2;
tasks[2] = task3;
}
void ExcuteTask(int tasknum)
{
if (tasknum < 0 | tasknum > 2)
{
return;
}
tasks[tasknum](); // 函数指针数组
}
int SelectTask()
{
return rand() % TaskNum;
}
processPool.cc
#include "task.hpp"
#include <iostream>
#include <string>
#include <vector>
#include <unistd.h>
#include <sys/wait.h>
// 信道对象
class Channel
{
public:
// 常见信道,需要进程id,信道名,文件fd
Channel(int subprocess, std::string &name, pid_t wfd)
: _name(name), _subprocessid(subprocess), _wfd(wfd)
{
}
~Channel() {}
int GetWfd() { return _wfd; }
std::string Getname() { return _name; }
int Getsubprocessid() { return _subprocessid; }
void CloseChannel() { close(_wfd); }
void wait()
{
int n = waitpid(_subprocessid, nullptr, 0);
if (n > 0)
{
std::cout << "wait: " << n << " sucess" << std::endl;
}
}
private:
int _wfd; // 写端
std::string _name; // 名称
int _subprocessid; // 进程pid
};
void forTest(std::vector<Channel> *channels)
{
// for test
for (auto &channel : *channels) // 解引用
{
std::cout << channel.Getname() << " " << channel.Getsubprocessid() << " " << channel.GetWfd() << std::endl;
}
}
void work(int rfd)
{
while (true)
{
int commd = 0;
int n = read(rfd, &commd, sizeof(commd));
if (n == sizeof(int))
{
ExcuteTask(commd);
}
else if (n == 0)
{
std::cout << "subprocess :" << getpid() << " quit sucess" << std::endl;
break;
}
}
}
void creatChannelAndSubprocess(int num, std::vector<Channel> *channels)
{
// 1.创建管道和进程
for (int i = 0; i < num; ++i)
{
// 创建管道
int pipefd[2];
int n = pipe(pipefd); // pipe函数,参数为fd数组,0读,1写
if (n < 0)
{
exit(1);
}
// 子进程
pid_t id = fork(); // 创建子进程,返回值为0
if (id == 0)
{
// child
close(pipefd[1]); // 关闭写
work(pipefd[0]);
close(pipefd[0]);
exit(0);
}
// father
close(pipefd[0]);
std::string channel_name = "channel-" + std::to_string(i);
channels->push_back(Channel(id, channel_name, pipefd[1]));
}
}
int CurChannel(int channelnum)
{
static int cur = 0;
int channel = cur;
cur++;
cur %= channelnum;
return cur;
}
void SentCommd(Channel &channel, int taskcommd)
{
// 向指定信道和进程发送信号,就是指定管道写东西
write(channel.GetWfd(), &taskcommd, sizeof(taskcommd));
}
void ctrlSubPro(std::vector<Channel> &channels, int num)
{
while (num--)
{
sleep(1);
// a.选择任务
int TaskCommd = SelectTask(); // 任务
// b.选择信道和进程
int task_index = CurChannel(channels.size()); // 从所有的信道中选择
// c.指定信道和进程发送指定任务码
SentCommd(channels[task_index], TaskCommd);
std::cout << std::endl;
std::cout << "TaskCommd: " << TaskCommd << " channels->" << channels[task_index].Getname() << " processid: "
<< channels[task_index].Getsubprocessid() << std::endl;
}
}
void CleanUpChannel(std::vector<Channel> &channels)
{
for (auto &channel : channels)
{
channel.CloseChannel();
}
for (auto &channel : channels)
{
channel.wait();
}
}
int main(int argc, char *argv[]) // 命令行参数
{
// 提示,命令行输入进程名 + 进程数
if (argc != 2)
{
std::cout << "Usage: " << argv[0] << " processnum" << std::endl; // usage--用法
return 1;
}
int num = std::stoi(argv[1]);
std::vector<Channel> channels;
// 加载任务
LoadTask();
// 创建进程+信道
creatChannelAndSubprocess(num, &channels);
// 2.控制进程执行任务
ctrlSubPro(channels, std::stoi(argv[1]));
// 3.释放资源
// 关闭写端,回收子进程
CleanUpChannel(channels);
return 0;
}
3、注意事项
子进程会继承父进程的文件描述符表,这就会出错
父进程指向1文件,子进程也指向1文件,此时1文件就有两个文件指向
为什么分开关,再回收可以?
实际的关闭过程,先是递归式的从上到下,再从下到上关闭
在创建进程的时候,会因为创建子进程继续父进程的机制,出现多个写端
可以在创建子进程的时候,把历史上所有打开的进程写端口关闭