前言
在之前的学习中我们知道通信的概念和匿名管道的使用,那么接下来我们就要用匿名管道来实现一个功能,首先我们有很多的函数需要被执行,然后创建一些子进程通过匿名管道方式给子进程传递一些信息,然后子进程就根据这些信息来确定要执行的函数并执行,比如说下面的图片:
那么接下来我们就要一步一步的实现这个功能。
第一步:创建子进程并连接管道
首先我们要定义一个宏用来表示要创建多少个子进程来执行不同的任务,然后在main函数里面首先创建一个for循环用来不停的创建子进程,在循环的开始先创建一个数组用来记录管道的写端和读端,然后使用pipe函数创建一个匿名管道,因为管道可能会创建失败,所以创建一个变量用来记录pipe函数的返回值,如果变量的值等于0的话就断言一下,然后就可以使用fork函数来创建子进程,并根据fork函数的返回值来将父子进程执行的代码进行分开,那么这里的代码如下:
#include<iostream>
#include<unistd.h>
#include<cassert>
#define PROCSS_NUM 3
using namespace std;
int main()
{
for(int i=0;i<PROCSS_NUM;i++)
{
int fds[2]={0};
int n=pipe(fds);
assert(n==0);
(void)n;
pid_t id=fork();
if(id==0)
{
//这里是子进程执行的代码
}
//这里是父进程执行的代码
}
return 0;
}
然后在子进程里面干的第一件事就是关闭管道文件的写端,并且创建一个while循环,在循环里面要不停的读取管道里面的内容并且根据读取的内容执行对应的函数,那么这里的代码如下:
if(id==0)
{
//这里是子进程执行的代码
close(fds[1]);
while(true)
{
//读取管道里面的内容
//根据管道的内容执行对应的函数
}
}
因为这里存在多个子进程每个子进程都要有对应的写端 pid 和名字,所以为了方便描述这些子进程我们可以创建一个结构体来对其进行描述:
class subEp // 描述子进程
{
public:
subEp(pid_t subId, int writeFd)
: subId_(subId), writeFd_(writeFd)
{
char nameBuffer[1024];
snprintf(nameBuffer, sizeof nameBuffer, "process-%d[pid(%d)-fd(%d)]", num++, subId_, writeFd_);
//给子进程创建名字
name_ = nameBuffer;
}
public:
static int num;//用于区别每个子进程
std::string name_;//记录子进程的名字
pid_t subId_;//子进程的id
int writeFd_;//子进程的写端
};
那么创建完子进程之后父进程就得关闭管道的读端并且创建一个描述子进程的结构体对象,因为子进程存在多个所以描述子进程的对象也会存在多个,所以为了方便管理我们就得创建一个vector对象来管理结构体对象,那么目前的代码就如下:
#include<iostream>
#include<unistd.h>
#include<cassert>
#include<string>
#include<vector>
#define PROCSS_NUM 10
using namespace std;
class subEp // 描述子进程
{
public:
subEp(pid_t subId, int writeFd)
: subId_(subId), writeFd_(writeFd)
{
char nameBuffer[1024];
snprintf(nameBuffer, sizeof nameBuffer, "process-%d[pid(%d)-fd(%d)]", num++, subId_, writeFd_);
//给子进程创建名字
name_ = nameBuffer;
}
public:
static int num;//用于区别每个子进程
std::string name_;//记录子进程的名字
pid_t subId_;//子进程的id
int writeFd_;//子进程的写端
};
int subEp::num = 0;
int main()
{
vector<subEP> subs;
for(int i=0;i<PROCSS_NUM;i++)
{
int fds[2]={0};
int n=pipe(fds);
assert(n==0);
(void)n;
pid_t id=fork();
if(id==0)
{
//这里是子进程执行的代码
close(fds[1]);
while(true)
{
//读取管道里面的内容
//根据管道的内容执行对应的函数
}
}
//这里是父进程执行的代码
close[fds[0]];
subs.push_back(subEP(id,fds[1]));
}
return 0;
}
我们上面写的是子进程和管道的创建过程,那么我们可以把这个模块写成一个函数,因为这个函数要对subs对象进行修改,并且子进程还有执行对应的函数,所以这个函数需要两个参数,一个是指向vector对象的指针,一个是指向函数集合的引用,那么这里的代码如下:
void createSubProcess(std::vector<subEp> *subs, std::vector<func_t> &funcMap)
{
for(int i=0;i<PROCSS_NUM;i++)
{
int fds[2]={0};
int n=pipe(fds);
assert(n==0);
(void)n;
pid_t id=fork();
if(id==0)
{
//这里是子进程执行的代码
close(fds[1]);
while(true)
{
//读取管道里面的内容
//根据管道的内容执行对应的函数
}
}
//这里是父进程执行的代码
close[fds[0]];
subs.push_back(subEP(id,fds[1]));
}
}
第二步:建立被执行函数的集合
这里我们就可以创建很多函数,并且每个函数里面我们都可以添加一个打印一些话用来表示当前函数已经被执行了,那么这里的代码如下:
void downLoadTask()
{
std::cout << getpid() << ": 下载任务\n"
<< std::endl;
sleep(1);
}
void ioTask()
{
std::cout << getpid() << ": IO任务\n"
<< std::endl;
sleep(1);
}
void flushTask()
{
std::cout << getpid() << ": 刷新任务\n"
<< std::endl;
sleep(1);
}
然后我们可以对函数指针进行重名,并在main函数里面创建一个函数指针的vector对象,然后创建一个函数用于将这些函数加载进vector对象,那么这里的代码如下:
typedef void (*func_t)();
void loadTaskFunc(std::vector<func_t> *out)
{
assert(out);
out->push_back(downLoadTask);
out->push_back(ioTask);
out->push_back(flushTask);
}
int main()
{
vector<func_t> funcMap;
loadTaskFunc(&funcMap);
vector<subEP> subs;
//创建子进程
createSubProcess(subs,funcmap);
return 0;
}
到这里我们的子进程创建完了,管道也和进程之间连接成功了,那么接下来我们就要让父进程往管道里面发送信息。
第三步:父进程发送信息
函数创建完成之后父进程就可以给子进程发送信息,那么这里我们可以创建一个专门给子进程发送信息的函数,但是在发送信息之前得先选择一个子进程进行发送吗,因为我们要多次发送信号,所以在发送信号的时候我们得保证子进程选着的随机性和公平性,保证每个进程接收到的信号数目是差不多,所以这里我们这里采用随机数的方式来选择子进程,首先设计一个时间戳:
srand((unsigned long)time(nullptr) ^ getpid() ^ 0x171237 ^ rand() % 1234);
然后创建一个子进程的控制函数,这个函数需要三个参数:子进程的合集,函数的合集,信号的个数,那么函数的声明就如下:
void loadBlanceContrl(const std::vector<subEp> &subs, const std::vector<func_t> &funcMap, int count)
{
}
首先得到子进程和预处理函数的个数,然后这里我们做一个约定了,如果count的值一开始就被设计为0话就表明这里要不停的发送信号并处理函数,那么这里的代码如下:
void loadBlanceContrl(const std::vector<subEp> &subs, const std::vector<func_t> &funcMap, int count)
{
int processnum=subs.num();
int tasknum=funcmap.num();
bool forver =(count==0? true:false);
while(true)
{
}
}
在while循环里面我们就要获得一个随机数,然后通过模上processnum值形式来限制其范围,得到随机数之后就可以通过subs对象找到对应的子进程并获得该进程对应管道的写端,然后就可以往该管道发送数据,管道随机的同时还得保证信号的随机,所以我们还得再创建一个随机数用来保证信号的随机,然后选着进程发送信号,那么这里我们可以创建一个函数来完成发送信号的过程,该函数需要一个描述子进程的结构体和一个表示函数集的下标,那么这里的代码如下:
void sendTask(const subEp &process, int taskNum)
{
}
void loadBlanceContrl(const std::vector<subEp> &subs, const std::vector<func_t> &funcMap, int count)
{
int processnum=subs.num();
int tasknum=funcmap.num();
bool forever =(count==0? true:false);
while(true)
{
int subIdx=rand()%processnum;
int taskIdx=rand()%tasknum;
sendTask(subs[subIdx], taskIdx);
}
}
那么在sendTask函数里面我们就可以打印一句话用来表示我们已经调用过该函数,然后就调用write函数往管道里面写入函数的下标,因为我们每次往管道里面就写入一个整型,write函数的返回值为写入数据的大小,所以我们这里可以拿一个参数接收write函数的返回值,并用assert函数进行一下判断,那么这里的代码如下:
void sendTask(const subEp &process, int taskNum)
{
cout << "send task num: " << taskNum << " send to -> " << process.name_ << endl;
int n=write(process.writeFd_,tasknum);
assert(n==sizeof(int));
(void)n;
}
将信号发送完之后我们就得对count的值进行一下判断,如果forever的值为fasle的话就对count的值进行减一,如果减完后count的值为0的话就用break结束循环,循环结束之后就表明信号全部发送完了,那么这时我们就可以通过for循环关闭父进程对应的写端,那么这里的代码如下:
void loadBlanceContrl(const std::vector<subEp> &subs, const std::vector<func_t> &funcMap, int count)
{
int processnum=subs.num();
int tasknum=funcmap.num();
bool forever =(count==0? true:false);
while(true)
{
int subIdx=rand()%processnum;
int taskIdx=rand()%tasknum;
sendTask(subs[subIdx], taskIdx);
}
if(!forever)
{
--count;
if(count==0)
{
break;
}
}
for(int i=0;i<process;i++)
{
close(subs[i].writeFd_);
}
}
那么这就是信号发送的有关功能的实现。
第四步:信号的接收和实行函数
信号发送之后我们就可以创建一个函数来接收函数,在之前创建子进程的时候我们还没有将函数实现完:
void createSubProcess(std::vector<subEp> *subs, std::vector<func_t> &funcMap)
{
for(int i=0;i<PROCSS_NUM;i++)
{
int fds[2]={0};
int n=pipe(fds);
assert(n==0);
(void)n;
pid_t id=fork();
if(id==0)
{
//这里是子进程执行的代码
close(fds[1]);
while(true)
{
//读取管道里面的内容
//根据管道的内容执行对应的函数
}
}
//这里是父进程执行的代码
close[fds[0]];
subs.push_back(subEP(id,fds[1]));
}
}
那么这里我们就可以再创建一个函数用来接收管道里面的信号,这个函数需要一个参数用来表示子进程的读端,返回值表示要执行的函数下标,那么函数的声明如下:
int recvTask(int readFd)
{
}
函数的开始创建一个code变量用来记录信号的值,然后使用read函数读取信号的值,read的第一个参数为对应管道的读端,第二个参数传递code变量的地址,第三个参数表示读取数据的大小那么这里就是4,因为读取的结果可能会出问题,所以我们创建一个变量用来记录该函数的返回值,如果等于4就直接返回code,如果小于等于0就返回-1其他情况就返回0,那么这里的代码如下:
int recvTask(int readFd)
{
int code=0;
ssize_t s=read(fds[0],&code,sizeof(int));
if(s==4) {return code;}
else if(s<=0) {return -1;}
else {return 0}
}
然后在createSubProcess函数里面我们就先创建一个变量接收recvTask函数的返回值,然后进行判断如果返回值为-1的话就直接退出,其他情况就正常执行:
while(true)
{
//读取管道里面的内容
int commondCode=recvTask(fds[0]);
//根据管道的内容执行对应的函数
if(commondCode>=0&&commondCode<funcMap.size())
{
funcMap[commandCode]();
}
else if(commandCode == -1)
{
break;
}
}
第五步:子进程等待函数
我们先完善一下main函数的内容:
int main()
{
vector<func_t> funcMap;
loadTaskFunc(&funcMap);
vector<subEP> subs;
//创建子进程
createSubProcess(subs,funcmap);
return 0;
}
首先加载函数然后创建子进程,子进程创建之后读端就已经开始等待写端信号传递过来了,那么这时我们就得控制一下子进程往里面发送信号,所以这个时候就得调用loadBlanceContrl函数发送信号:
int main()
{
vector<func_t> funcMap;
loadTaskFunc(&funcMap);
vector<subEP> subs;
//创建子进程
createSubProcess(subs,funcmap);
int count=10;
loadBlanceContrl(subs,funcMap,count);
return 0;
}
等loadBlanceContrl函数执行完成之后,读端也就已经关闭了,读端关闭了也就不会再有信号的产生,所以到这里我们还只需干一件事就是使用waitpid函数回收子进程的信息,那么我们可以再创建一个函数进行回收:
void waitProcess(std::vector<subEp> processes)
{
int processnum = processes.size();
for(int i = 0; i < processnum; i++)
{
waitpid(processes[i].subId_, nullptr, 0);
std::cout << "wait sub process success ...: " << processes[i].subId_ << std::endl;
}
}
int main()
{
vector<func_t> funcMap;
loadTaskFunc(&funcMap);
vector<subEP> subs;
//创建子进程
createSubProcess(subs,funcmap);
int count=10;
loadBlanceContrl(subs,funcMap,count);
waitProcess(subs);
return 0;
}
程序的测试
PROCSS_NUM的值为3,并且count的值为10,所以当前程序运行起来之后会创建出3个子进程然后一共执行10个任务,那么运行的现象就是这样:
可以看到这里创建了三个子进程来处理不同的函数任务,等函数的执行个数达到10个的时候就停止了执行函数,最后对创建的三个子进程进行回收:
程序的bug
程序的运行符合我们的预期但是这里存在一个隐藏的bug,我们创建管道的时候是以读和写的方式来打开一个管道,后来我们会关闭父进程的读端和子进程的写段,然后子进程就可以只从管道读取数据,父进程从管道里面写入数据,当我们创建子进程的时候子进程会继承父进程的文件描述符表,可是这里会出现问题,当我们创建第一个子进程的时候父进程会指向一号管道的写段,子进程会指向1号管道的读端,当我们创建第二个子进程的时候,该子进程会继承父进程的文件描述符表,所以第二个子进程也会指向1号管道的写端,同样的道理当我们创建第三个子进程的时候,他还会继承父进程的文件描述符表,所以也会指向1号管道和2号管道的写端,又因为我们在子程序的开始过程中只会关闭进程对应管道的写段,不会管其他管道的,这就会导致当我们想要单独关闭某个主进程对管道的写端,从而结束通信时这个管道是无法通过返回0来直接关闭子进程的,所以这里依然会出现数据的损失,和子进程无法关闭的情况,那么为了解决这个问题我们就可以创建一个vector对象用来记录当前已经创建的管道,每次创建子进程时,子进程里面就一一关闭该子进程对这些管道的写段,然后在父进程里面就将新创建的管道的写段尾差到vector里面,那么这一功能的实现就放到createSubProcess函数里面,那么这里的代码如下:
void createSubProcess(std::vector<subEp> *subs, std::vector<func_t> &funcMap)
{
std::vector<int> deleteFd;
for(int i=0;i<PROCSS_NUM;i++)
{
int fds[2]={0};
int n=pipe(fds);
assert(n==0);
(void)n;
pid_t id=fork();
if(id==0)
{
cout<<"创建了子进程"<< endl;
for(int i = 0; i < deleteFd.size(); i++) close(deleteFd[i]);
//这里是子进程执行的代码
close(fds[1]);
while(true)
{
//读取管道里面的内容
int commondCode=recvTask(fds[0]);
//根据管道的内容执行对应的函数
if(commondCode>=0&&commondCode<funcMap.size())
{
funcMap[commondCode]();
}
else if(commondCode == -1)
{
break;
}
}
exit(0);
}
//这里是父进程执行的代码
close(fds[0]);
subEp sub(id,fds[1]);
subs->push_back(sub);
deleteFd.push_back(fds[1]);
}
}
那么这就是本篇文章的全部内容完整的代码如下:
#include <iostream>
#include <string>
#include <vector>
#include <cstdlib>
#include <cassert>
#include <ctime>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#define PROCSS_NUM 3
using namespace std;
typedef void (*func_t)();
class subEp // 描述子进程
{
public:
subEp(pid_t subId, int writeFd)
: subId_(subId), writeFd_(writeFd)
{
char nameBuffer[1024];
snprintf(nameBuffer, sizeof nameBuffer, "process-%d[pid(%d)-fd(%d)]", num++, subId_, writeFd_);
//给子进程创建名字
name_ = nameBuffer;
}
public:
static int num;//用于区别每个子进程
std::string name_;//记录子进程的名字
pid_t subId_;//子进程的id
int writeFd_;//子进程的写端
};
int subEp ::num =0;
void downLoadTask()
{
std::cout << getpid() << ": 下载任务\n"
<< std::endl;
sleep(1);
}
void ioTask()
{
std::cout << getpid() << ": IO任务\n"
<< std::endl;
sleep(1);
}
void flushTask()
{
std::cout << getpid() << ": 刷新任务\n"
<< std::endl;
sleep(1);
}
int recvTask(int readFd)
{
int code=0;
ssize_t s=read(readFd,&code,sizeof(int));
if(s==4) {return code;}
else if(s<=0) {return -1;}
else {return 0;}
}
void createSubProcess(std::vector<subEp> *subs, std::vector<func_t> &funcMap)
{
std::vector<int> deleteFd;
for(int i=0;i<PROCSS_NUM;i++)
{
int fds[2]={0};
int n=pipe(fds);
assert(n==0);
(void)n;
pid_t id=fork();
if(id==0)
{
cout<<"创建了子进程"<< endl;
for(int i = 0; i < deleteFd.size(); i++) close(deleteFd[i]);
//这里是子进程执行的代码
close(fds[1]);
while(true)
{
//读取管道里面的内容
int commondCode=recvTask(fds[0]);
//根据管道的内容执行对应的函数
if(commondCode>=0&&commondCode<funcMap.size())
{
funcMap[commondCode]();
}
else if(commondCode == -1)
{
break;
}
}
exit(0);
}
//这里是父进程执行的代码
close(fds[0]);
subEp sub(id,fds[1]);
subs->push_back(sub);
deleteFd.push_back(fds[1]);
}
}
void loadTaskFunc(std::vector<func_t> *out)
{
assert(out);
out->push_back(downLoadTask);
out->push_back(ioTask);
out->push_back(flushTask);
}
void sendTask(const subEp &process, int taskNum)
{
cout << "send task num: " << taskNum << " send to -> " << process.name_ << endl;
int n=write(process.writeFd_,&taskNum,sizeof(taskNum));
assert(n==sizeof(int));
(void)n;
}
void loadBlanceContrl(const std::vector<subEp> &subs, const std::vector<func_t> &funcMap, int count)
{
int processnum=subs.size();
int tasknum=funcMap.size();
bool forever =(count==0? true:false);
while(true)
{
int subIdx=rand()%processnum;
int taskIdx=rand()%tasknum;
sendTask(subs[subIdx], taskIdx);
sleep(1);
if(!forever)
{
--count;
if(count==0)
{
break;
}
}
}
for(int i=0;i<processnum;i++)
{
close(subs[i].writeFd_);
}
}
void waitProcess(std::vector<subEp> processes)
{
int processnum = processes.size();
for(int i = 0; i < processnum; i++)
{
waitpid(processes[i].subId_, nullptr, 0);
std::cout << "wait sub process success ...: " << processes[i].subId_ << std::endl;
}
}
int main()
{
srand((unsigned long)time(nullptr) ^ getpid() ^ 0x171237 ^ rand() % 1234);
vector<func_t> funcMap;
loadTaskFunc(&funcMap);
vector<subEp> subs;
//创建子进程
createSubProcess(&subs,funcMap);
int count=10;
loadBlanceContrl(subs,funcMap,count);
waitProcess(subs);
return 0;
}