Linux进程间通信 管道系列: 利用管道实现进程池[匿名和命名两个版本]
- 一.匿名管道实现进程池
- 1.池化技术
- 2.搭架子
- 3.代码编写
- 1.创建子进程
- 1.利用命令行参数传入创建几个子进程
- 2.创建管道和子进程(封装Channel类)
- 1.先描述
- 2.在组织
- 3.开始创建
- 2.封装MainProcess类
- 3.控制子进程
- 1.封装Task类
- 1.先描述
- 2.在组织
- 2.创建任务
- 3.让子进程执行任务
- 1.分析
- 2.代码
- 4.让父进程分配任务
- 4.回收子进程
- 4.分析问题
- 二.匿名管道进程池完整代码
- 1.channel.hpp
- 2.task.hpp
- 3.MainProcess.hpp
- 4.user.h和user.cpp
- 5.pipepool.cpp
- 三.命名管道实现进程池
- 1.总体框架
- 2.开始实现
- 1.Common.hpp
- 3.SendTaskProcess.cpp
- 1.大框架
- 2.具体实现
- 1.资源的保存和释放
- 2.分配任务
- 3.main函数
- 4.DealTaskProcess.cpp
- 1.大框架
- 2.创建任务
- 3.创建和回收子进程
- 4.子进程执行任务
- 5.main函数
- 5.动图演示
- 四.命名管道进程池完整代码
- 1.Common.hpp
- 2.SendTaskProcess.cpp
- 3.DealTaskProcess.cpp
- 4.task.hpp
- 5.user.h和user.cpp
- 6.makefile
一.匿名管道实现进程池
1.池化技术
大家可能听说过内存池,其实不仅仅有内存池,还有线程池,对象池等等池
他们都是池化技术的一种应用
下面,我们就来玩一下进程池吧
2.搭架子
初步的架子搭起来了,下面我们直接开始写代码
在代码编写的时候我们会说一下注意的点以及要怎么做
3.代码编写
经过刚才的分析,我们知道了我们需要完成3个点即可:
1.创建子进程
2.利用管道控制子进程
3.回收子进程
1.创建子进程
创建子进程,创建多少个呢?
4个?10个?5个?8个
太挫了吧那样,我们想让执行程序的人想创建几个创建几个
那怎么办?
不要忘了命令行参数哦
1.利用命令行参数传入创建几个子进程
2.创建管道和子进程(封装Channel类)
让子进程数量可以动态控制之后,下面我们就要开始创建管道和子进程了
1.先描述
所谓先描述,就是定义自定义类型,尤其是指class struct
我们想一下,我一个父进程,需要
1.通过向管道写入数据来给子进程分配任务,此时需要pipefd[1]{写权限}
2.子进程退出时我需要子进程的pid来waitpid回收子进程,此时需要子进程的pid
3.子进程可能会有很多哦,要不要给他们起个名字方面我们观察和使用呢,此时需要一个string _name
这个结构体是我们父进程用来控制管理子进程的结构体
我们起名为channel(信道)
因为string会调用自己的析构,而内置类型无需析构
因此无需写析构函数
2.在组织
描述完之后,下面我们考虑一下如何组织
由于我们有随机访问某个子进程的需求
因此需要用一个支持随机访问的容器,而且我们不进行中间位置的插入,删除
那么最好的选择就是vector啦
因此我们可以把创建子进程时定义出的Channel对象push_back/emplace_back到vector当中即可
3.开始创建
创建成功
2.封装MainProcess类
可见面向对象思想的强大之处
3.控制子进程
1.封装Task类
1.先描述
1.首先任务肯定要有名字,因此需要string _name
2.任务,就是一个函数,因此需要一个函数类型的成员
(此时就要用到C++11当中的包装器function啦)
3.为了方便管理,我们给每个任务分配一个编号 int
这里我们搞得简单点,所有的任务函数都是返回值类型void,不需要参数
因此包装器类型是
function<void()>
2.在组织
跟Channel一样,依然是用vector
然后我们在MainProcess类里面添加CreateTask函数
2.创建任务
我们要知道的是,我们现在是在实现进程池
而进程执行的任务归不归我们管,答案是不归
归谁管呢?当然是用我这个进程池的人啊
因此那些任务函数我们要不要封装,不要,因为执行哪些函数我哪知道
就像是设计库的大佬提供的vector
他哪知道你想存什么数据,你存什么数据你自己提供
因此我们现在切换一下身份,站在进程池的使用者的角度来看,
我们现在要用你这个进程池,是因为我们有函数要执行,所以那些函数是天然存在的
因此我们随意搞上几个函数
把这些函数放到一个源文件当中,user.cpp
改一下makefile
我们还需要用户给我们提供一个头文件,这个头文件当中有这些方法的声明,否则我们想要使用的话就需要在我们的源文件当中再去声明,那样不好看
用户提供了5个任务,下面我们来使用这些任务初始化我们的Task吧
3.让子进程执行任务
1.分析
下面我们站在子进程的角度,
- 子进程不关心父进程怎么给我分配任务: 我就一干活的,让我干啥我干啥
- 子进程作为管道的读端,需要从管道当中获取一个下标,然后执行这个下标位置的任务
- 根据管道的特性,写端不退,读端就要阻塞等待写端进行写入,因此无需担心父进程分配任务较晚
- 根据管道的特性,写端退出之后,读端read的返回值就是0,表示写端已经退了,提示你这个读端也应该退了哦
2.代码
4.让父进程分配任务
下面我们站在父进程的角度,来给子进程分配任务
因为我们想让每个子进程执行的任务个数相对更加均匀
因此我们需要设计一个next函数,获取下一个需要执行当前任务的进程
同时使用随机数随意挑选当前要执行的任务
这里要用到Channel当中的_wfd,因此需要封装出get接口,或者用友元
但是友元毕竟会破坏封装,增加耦合度,因此我们用get
下面该分配任务了,我们规定父进程一共分配2*子进程数目个任务
4.回收子进程
子进程作为读端,需要等到该管道所有的写端关闭wfd时,读到0之后才可以退出
因此我们需要先关闭所有读端的wfd,如何关闭呢?
我们调用后make一下,看一下退没退成功
既然子进程成功退出了,那么我们就让父进程对每个子进程都wait一下吧
调用,make,然后运行一波
至此,我们的进程池就写完了,但是有1个问题,需要我们仔细分析一下
4.分析问题
先说指导思想,后面我们的分析都是严格按照这个顺序去走的
动图分析该问题:
旧的问题刚被解决,新的问题又出现了:
动图分析该过程
因此如果我们刚才不是先关闭所有的fd,然后才waitpid回收子进程,而是关一个回收一个的话,就出bug了
我们复现一下
规避这一问题的方法有两种:
(1)先关闭所有的,再回收(我们一开始写的版本)
(2)因为最后被创建的管道只有父进程一个wfd管着,因此可以从后往前关一个回收一个(利用依赖关系)
而真正能够解决这一问题的方法只有1种:
就是每个子进程刚被创建时就关闭掉从父进程继承来的所有wfd(也就是父进程创建该子进程之前的所有的wfd)
如何做到呢?
不要忘了Channelv当中保存这Channel,而Channel当中就是父进程的wfd
而且父进程是一一创建子进程的,因此可以这么玩
完美解决
关于使用匿名管道实现进程池,我们就搞定了
下面给出源码,然后我们开始搞命名管道
二.匿名管道进程池完整代码
1.channel.hpp
#pragma once
class Channel
{
public:
Channel(pid_t sub_process_id,int wfd,const string& sub_process_name)
:_sub_process_id(sub_process_id),_wfd(wfd),_sub_process_name(sub_process_name){}
void Debug() const
{
cout<<"_sub_process_id: "<<_sub_process_id<<" _wfd: "<<_wfd<<" _sub_process_name"<<_sub_process_name<<endl;
}
int Getwfd() const{return _wfd;}
pid_t Getsub_process_id() const{return _sub_process_id;}
string Getsub_process_name() const{return _sub_process_name;}
void Close() const{close(_wfd);}
private:
pid_t _sub_process_id;
int _wfd;
string _sub_process_name;
};
2.task.hpp
#pragma once
class Task
{
public:
Task(const string& task_name,int task_id,const function<void()>& function)
:_task_name(task_name),_task_id(task_id),_function(function){}
//提供调用该函数的接口
void exec()
{
_function();
}
private:
string _task_name;
int _task_id;
function<void()> _function;
};
3.MainProcess.hpp
#pragma once
class MainProcess
{
public:
//提醒应该如何执行我这个可执行程序
void Usage(const char* s) const
{
printf("Usage: %s -sub_process_num\n",s);
}
//创建进程
void CreateProcess(int cnt)
{
int pipefd[2]={0,0};
for(int i=0;i<cnt;i++)
{
int n=pipe(pipefd);
if(n<0)
{
cout<<"create pipe fail"<<endl;
exit(1);
}
pid_t id=fork();
if(id==0)
{
//child关闭写
close(pipefd[1]);
//child关闭所有父进程fork该子进程之前所打开的所有wfd
for(auto& e:_channelv)
{
close(e.Getwfd());
}
//TODO(子进程开始执行父进程给的任务)
work(pipefd[0]);
exit(0);
}
//father关闭读
close(pipefd[0]);
_channelv.push_back(Channel(id,pipefd[1],"Channel -"+to_string(i)));
}
}
//调试看一下进程的创建
void PrintDebug() const
{
for(auto& e:_channelv)
{
e.Debug();
}
}
//创建任务
void CreateTask(int cnt)
{
_taskv.push_back(Task("printLog",0,printLog));
_taskv.push_back(Task("ConnectDatabase",1,ConnectDatabase));
_taskv.push_back(Task("UserLogin",2,UserLogin));
_taskv.push_back(Task("GenerateReports",3, GenerateReports));
_taskv.push_back(Task("TestSoftwarePerformance",4,TestSoftwarePerformance));
}
//调试任务的执行
void execDebug() const
{
for(auto e:_taskv)
{
e.exec();
}
}
//分配任务
void AssignTasks()
{
for(int i=0;i<2*_channelv.size();i++)
{
int processi=next();
int taski=rand()%_taskv.size();
cout << "send taski: " << taski << " to " << _channelv[processi].Getsub_process_name() <<
" sub prorcess id: " << _channelv[processi].Getsub_process_id() << endl;
write(_channelv[processi].Getwfd(),&taski,sizeof(taski));
sleep(1);
}
}
//关闭wfd同时wait回收所有的子进程
void CloseAndWaitAllWfd()
{
for(auto& e:_channelv)
{
e.Close();
waitpid(e.Getsub_process_id(),nullptr,0);//以阻塞方式等待回收该子进程
cout<<"wait success, child pid : "<<e.Getsub_process_id()<<endl;
}
}
private:
//获取下一个需要执行任务的进程
int next() const
{
static int n=0;//静态变量
int ret=n++;
n%=_channelv.size();
return ret;
}
void work(int rfd)
{
while(true)
{
uint32_t command_code=0;//该进程此次循环当中要执行的任务在任务数组中的下标
ssize_t n=read(rfd,&command_code,sizeof(command_code));//n是实际读取到的字节数
if(n==sizeof(command_code))//实际读取到的字节数==command_code的字节数,说明读取成功
{
if(command_code>_taskv.size()) continue;//该下标越界,说明该任务不存在,continue执行下一个任务
_taskv[command_code].exec();//执行该任务
}
if(n==0)//写端退了,那么我也要退了哦
{
printf("writer exit, me[FD of reader: %d] too...\n",rfd);
break;
}
printf("this task[%d] finished!\n",command_code);
sleep(1);//休息1s
}
}
vector<Channel> _channelv;
vector<Task> _taskv;
};
4.user.h和user.cpp
#pragma once
const int task_num=5;
void printLog();
void ConnectDatabase();
void UserLogin();
void GenerateReports();
void TestSoftwarePerformance();
#include <iostream>
#include <vector>
#include <string>
#include <chrono>
#include <iomanip>
#include <sstream>
#include "user.h"
using namespace std;
// 打印日志的函数
void printLog()
{
// 获取当前时间
auto now = std::chrono::system_clock::now();
auto now_c = std::chrono::system_clock::to_time_t(now);
// 格式化时间戳
ostringstream oss;
oss << put_time(std::localtime(&now_c), "%Y-%m-%d %H:%M:%S");
string timestamp = oss.str();
const vector<string>& message={"This is a log message.","Another log message with some information."};
// 打印日志信息
for(auto& e:message)
{
cout << "[" << timestamp << "] " << e << endl;
}
}
//下面的我就随便遍了,MySQL还没学....
//连接数据库
void ConnectDatabase()
{
cout<<"Connect to the database succeed"<<endl;
}
//用户登录
void UserLogin()
{
cout<<"User login succeed"<<endl;
}
//报告生成
void GenerateReports()
{
cout<<"Generate reports succeed"<<endl;
}
//测试软件性能
void TestSoftwarePerformance()
{
cout<<"Test software performance succeed"<<endl;
}
5.pipepool.cpp
#include <iostream>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <string>
#include <vector>
#include <functional>
using namespace std;
#include "user.h"
#include "channel.hpp"
#include "task.hpp"
#include "MainProcess.hpp"
int main(int argc,char* argv[])
{
srand((size_t)(nullptr));//typedef unsigned_int size_t 设置随机数种子
MainProcess main_process;
if(argc!=2)//传入的方式不符合标准,提醒他一下
{
main_process.Usage(argv[0]);
return 1;
}
int cnt=stoi(argv[1]);
main_process.CreateTask(task_num);
//main_process.execDebug();
main_process.CreateProcess(cnt);
//main_process.PrintDebug();
main_process.AssignTasks();
main_process.CloseAndWaitAllWfd();
return 0;
}
三.命名管道实现进程池
1.总体框架
2.开始实现
1.Common.hpp
3.SendTaskProcess.cpp
1.大框架
2.具体实现
Usage我们就不看了,跟匿名管道的Usage一模一样
1.资源的保存和释放
2.分配任务
_wfdv[workid]就是选择对应该执行该任务的管道wfd进行写入
taskid就是随机数,负责表示随机分配任务
3.main函数
调用函数即可
4.DealTaskProcess.cpp
1.大框架
2.创建任务
3.创建和回收子进程
4.子进程执行任务
5.main函数
5.动图演示
成功
四.命名管道进程池完整代码
1.Common.hpp
#pragma once
#include <iostream>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <cerrno>
#include <cstring>
#include <string>
#include <vector>
#include <functional>
using namespace std;
//允许Deal进程创建n个子进程,每个进程和Send进程使用同一个命名管道
//每个命名管道的名字都是path+[0~n-1]
const char* path="./pipe";
#define MODE 0666
class Fifo
{
public:
Fifo(const char* path):_path(path)
{
int ret=mkfifo(_path.c_str(),MODE);
if(ret==-1)
{
cerr<<"create namedpipe fail, path = "<<_path<<endl;
}
else if(ret==0)
{
cout<<"create namedpipe succeed, path = "<<_path<<endl;
}
}
~Fifo()
{
int n=unlink(_path.c_str());
}
string getPath()
{
return _path;
}
private:
string _path;
};
2.SendTaskProcess.cpp
#include "Common.hpp"
class SendTaskProcess//管道的写端
{
public:
//提醒应该如何执行我这个可执行程序
void Usage(const char* s) const
{
printf("Usage: %s -sub_process_num\n",s);
}
//创建命名管道
void CreateNamedPipe(int cnt)
{
for(int i=0;i<cnt;i++)
{
string thispath=string(path)+to_string(i);
Fifo* p=new Fifo(thispath.c_str());
_pipev.push_back(p);
}
}
//打开命名管道,将wfd保存到_wfdv当中
void OpenPipe()
{
for(Fifo* e:_pipev)
{
string s=e->getPath();
int wfd=open(s.c_str(),O_WRONLY);
_wfdv.push_back(wfd);
}
}
//分配任务:cnt个
void AssignTask(int cnt)
{
for(int i=0;i<cnt;i++)
{
int workid=next();
int taskid=rand();
ssize_t n=write(_wfdv[workid],&taskid,sizeof(taskid));
cout<<"assign task succeed, i="<<i<<endl;
sleep(1);
}
}
//关闭wfd
void Close()
{
for(auto& e:_wfdv)
{
close(e);
cout<<"close succeed, e="<<e<<endl;
}
}
//释放_pipev
void release()
{
for(auto& e:_pipev)
{
delete e;
}
}
private:
//获取下一个需要执行任务的进程
int next() const
{
static int n=0;//静态变量
int ret=n++;
n%=_pipev.size();
return ret;
}
vector<int> _wfdv;
vector<Fifo*> _pipev;
};
int main(int argc,char* argv[])//让用户传入他想创建多少个进程
{
srand((size_t)(nullptr));
SendTaskProcess send_process;
if(argc!=2)//传入的方式不符合标准,提醒他一下
{
send_process.Usage(argv[0]);
return 1;
}
int num=stoi(argv[1]);
send_process.CreateNamedPipe(num);
send_process.OpenPipe();
send_process.AssignTask(2*num);
send_process.Close();
send_process.release();
return 0;
}
3.DealTaskProcess.cpp
#include "Common.hpp"
#include "user.h"
#include "task.hpp"
class DealTaskProcess
{
public:
//提醒应该如何执行我这个可执行程序
void Usage(const char* s) const
{
printf("Usage: %s -sub_process_num\n",s);
}
//创建进程
void CreateProcess(int cnt)
{
for(int i=0;i<cnt;i++)
{
pid_t id=fork();
if(id==0)
{
cout<<"create child success, i = "<<i<<endl;
//child
doWork(i);
exit(0);
}
_pidv.push_back(id);
}
cout<<endl;
}
//回收子进程
void WaitChild()
{
for(auto& e:_pidv)
{
waitpid(e,nullptr,0);
cout<<"wait child success, pid= "<<e<<endl;
}
}
void CreateTask()
{
Task::CreateTask(_taskv);
}
private:
void doWork(int i)//该进程编号是i,以r的方式打开第i个命名管道
{
while(true)
{
string thispath=string(path)+to_string(i);
int rfd=open(thispath.c_str(),O_RDONLY);
int workid=0;
ssize_t n=read(rfd,&workid,sizeof(workid));
if(n>0)
{
workid%=_taskv.size();
//执行任务
_taskv[workid].exec();
}
else if(n==0)
{
cout<<"writer exit,me too, i am: "<<i<<endl;
break;
}
else
{
cout<<"read fail"<<endl;
break;
}
}
}
vector<Task> _taskv;
vector<int> _pidv;
};
int main(int argc,char* argv[])//让用户传入他想创建多少个进程
{
DealTaskProcess deal_process;
if(argc!=2)//传入的方式不符合标准,提醒他一下
{
deal_process.Usage(argv[0]);
return 1;
}
int num=stoi(argv[1]);
deal_process.CreateTask();
deal_process.CreateProcess(num);
deal_process.WaitChild();
return 0;
}
4.task.hpp
#pragma once
class Task
{
public:
Task(const string& task_name,int task_id,const function<void()>& function)
:_task_name(task_name),_task_id(task_id),_function(function){}
//提供调用该函数的接口
void exec()
{
_function();
}
//创建任务
static void CreateTask(vector<Task>& _taskv)
{
_taskv.push_back(Task("printLog",0,printLog));
_taskv.push_back(Task("ConnectDatabase",1,ConnectDatabase));
_taskv.push_back(Task("UserLogin",2,UserLogin));
_taskv.push_back(Task("GenerateReports",3, GenerateReports));
_taskv.push_back(Task("TestSoftwarePerformance",4,TestSoftwarePerformance));
}
private:
string _task_name;
int _task_id;
function<void()> _function;
};
5.user.h和user.cpp
#pragma once
const int task_num=5;
void printLog();
void ConnectDatabase();
void UserLogin();
void GenerateReports();
void TestSoftwarePerformance();
#include <iostream>
#include <vector>
#include <string>
#include <chrono>
#include <iomanip>
#include <sstream>
#include "user.h"
using namespace std;
// 打印日志的函数
void printLog()
{
// 获取当前时间
auto now = std::chrono::system_clock::now();
auto now_c = std::chrono::system_clock::to_time_t(now);
// 格式化时间戳
ostringstream oss;
oss << put_time(std::localtime(&now_c), "%Y-%m-%d %H:%M:%S");
string timestamp = oss.str();
const vector<string>& message={"This is a log message.","Another log message with some information."};
// 打印日志信息
for(auto& e:message)
{
cout << "[" << timestamp << "] " << e << endl;
}
}
//下面的我就随便遍了,MySQL还没学....
//连接数据库
void ConnectDatabase()
{
cout<<"Connect to the database succeed"<<endl;
}
//用户登录
void UserLogin()
{
cout<<"User login succeed"<<endl;
}
//报告生成
void GenerateReports()
{
cout<<"Generate reports succeed"<<endl;
}
//测试软件性能
void TestSoftwarePerformance()
{
cout<<"Test software performance succeed"<<endl;
}
6.makefile
.PHONY:all
all:send deal
send:SendTaskProcess.cpp
g++ -o $@ $^ -std=c++11
deal:DealTaskProcess.cpp user.cpp
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -f send deal
以上就是Linux进程间通信 管道系列: 利用管道实现进程池(匿名和命名两个版本)的全部内容,希望能对大家有所帮助!!!