目录
前言
1.管道实现进程间通信
①管道的所属问题
②匿名管道通信
③命名管道通信
2.使用管道通信实现一个进程池
①进程池类图
②Channel类实现
③ProcessPoll类实现
④代码一览
前言
在学习Linux中的进程时,曾提到过进程的独立性。进程独立性的是进程与进程之间在出错时不会相互影响的重要保证。但是由此也引发的一个问题,如果进程之间想要相互传递信息怎么办,由于进程与进程之间独立,一个进程不能访问另一个进程中的资源与数据等信息。这个时候就需要在进程之间建立信道来实现进程间通信。本文介绍进程间通信中的一个方式——管道通信。
1.管道实现进程间通信
①管道的所属问题
在讲解管道通信之前,我们需要明确一下管道的所属问题。管道本质上是一个文件,进程通信的两方,一方向文件中写,另一方从文件中读就实现了一种进程间的通信。但是请注意,由于进程独立性的影响,如果将这个用于进程间通信的文件定义在进程通信两方的任何一方的进程中,都会导致另一方不能读取到数据,所以管道独立不归属于进程通信的两方,而是归属于操作系统。
②匿名管道通信
匿名管道顾名思义就是进程要通过一个没有名字的文件进行进程间通信。
我们需要使用pipe系统调用接口来创建一个匿名管道。而后使用对应的文件描述符进行读写操作。
int pipe(int fd[2]);
//其中fd数组是一个输出型参数,数组中的第一个元素表示写文件描述符,第二个元素表示读文件描述符
//该接口在正确执行时返回0,出错返回错误码
图2所示的代码,首先定义了一个fd数组作为pipe接口的参数传入,当函数执行结束后,fd数组中被填充了读写文件描述符,而后使用fork创建子进程,子进程可以继承父进程的所有数据包括fd数组和数组指向文件的关系,这样就父子进程就都可以对文件进行读写操作了,进而实现了进程通信的一种方式。
注意:
管道通信的特性:
Ⅰ.匿名管道只能用于有血缘关系的进程之间通信
Ⅱ.匿名管道的生命周期与进程一致(父子进程任意一端关闭都会导致管道关闭)
Ⅲ.单个管道是半双工的,即通信的进程间都可以读取或接收数据但是不能同时进行读和写。
Ⅳ.管道是面向字节流的
管道通信的特殊情况:
Ⅰ.管道没有数据,读端阻塞等待。
Ⅱ.管道中数据已满,读端不读写段阻塞等待。
Ⅲ.当写端不继续写入数据并且关闭了写端管道,读端将管道中的数据读完后关闭。
Ⅳ.当读端不继续读取数据并且关闭了读端管道,操作系统会立即终止写入进程。
③命名管道通信
如果你足够的细心你会发现匿名管道一个很大的问题,就是实现进程间通信的文件没有名字,这也就意味着,没有血缘关系的进程不能使用匿名管道进行通信。因为匿名管道没有名称,进程间又相互独立,没有血缘关系又不能进行数据的继承,所以没有血缘关系的进程不能使用匿名管道进行通信。命名管道就是为了应对匿名管道的缺陷而诞生的。
//命名管道有两种创建方式
//在命令行中
mkfifo filename
//在程序中
int mkfifo(const char *filename,mode_t mode)
//说明:
//filename表示命名管道的名称,可自行确定
//mode表示文件的权限,注意该参数与文件掩码相减后才是文件的权限
注意:
命名管道除了在打开方式上与匿名管道不同,其它行为表现与匿名管道一致。
2.使用管道通信实现一个进程池
①进程池类图
②Channel类实现
Channel类用来表示一对父子进程之间的关系。在管道通信中,对于管道的开闭,我们可以通过控制写端描述符来进行控制,对于一个管道当写端关闭时,读端读完管道中的内容后就会自动关闭;对于程序的执行结果,需要使用wait/waitpid来进行获取,但是在进程池中往往一次性创建多个进程,如果使用wait来获取进程返回值,那么很难确定是哪一个进程进行的返回,所以我们需要使用waitpid来进行对特定pid的进程获取返回值,这就要求在Channel中要存储子进程的pid;我们也可以为管道加一个名字,用来更明显地区分同一批进程(实际上可以直接使用pid进行区分)。
class Channel
{
private:
int _wfd;
int _childid;
std::string _name;
public:
Channel(int wfd,int childid,std::string& name):_wfd(wfd),_childid(childid),_name(name){}
int getfd()
{
return _wfd;
}
int getid()
{
return _childid;
}
std::string& name()
{
return _name;
}
void CloseChannl()
{
close(_wfd);
}
};
③ProcessPoll类实现
进程池的生成需要用户通过命令行参数来进行控制,当执行可以执行程序时,需要在可执行程序后加上要生成进程池中进程的数量;而后创建一个传入参数大小的进程池,而后等待创建的进程返回结果,最后将所有的Channl弹出Channels。
class ProcessPool
{
private:
int _ProcessNum;
std::vector<Channel> Channels;
public:
ProcessPool(int ProcessNum=1):_ProcessNum(ProcessNum){}
void CreatePool()
{
std::vector<int> fds;
for(int i=0;i<_ProcessNum;i++)
{
int pip[2];
pipe(pip);
int task_index=NextTask();
write(pip[1],&task_index,sizeof(int));
std::string name = "process-"+std::to_string(i);
fds.push_back(pip[1]);
int id=fork();
if(id<0)
{
std::cout<<PipFailed<<std::endl;
break;
}
if(id==0)
{
std::cout<<getpid()<<std::endl;
if(fds.size()!=0) //处理不必要的管道
{
for(auto&e:fds)
{
close(e);
std::cout<<e<<' ';
}
}
AssignTask(pip[0]);
exit(0);
}
close(pip[0]);
Channels.push_back(Channel(pip[1],id,name));
}
}
void KillAll()
{
while(Channels.size())
{
Channel& tmp=Channels.back();
tmp.CloseChannl();
std::cout<<tmp.name()<<" exit..."<<std::endl;
Channels.pop_back();
}
}
void Wait()
{
for(auto&e:Channels)
{
int status;
if(e.getid()==waitpid(e.getid(),&status,0))
{
std::cout<<e.name()<<" pid:"<<e.getid()<<" exit_code:"<<status<<std::endl;
}
}
}
④代码一览
#include "task.hpp"
//使用说明
void Usage(char *ExeFileName)
{
std::cout<<"Usage:"<<std::endl;
std::cout<<'\t'<<ExeFileName<<" : "<<"number (You should give a positive number)"<<std::endl;
}
//创建一个管道类
//该类用于描述一对父子进程之间的关系
//变量说明:
//_wfd 用来控制管道,当写端关闭后,管道内容为空,写端自动关闭,也可以由父进程向子进程发送数据
//_childid 子进程id值,等待子进程退出,子进程退出时,唯一标识
//_name 用于调试,无实际意义
class Channel
{
private:
int _wfd;
int _childid;
std::string _name;
public:
Channel(int wfd,int childid,std::string& name):_wfd(wfd),_childid(childid),_name(name){}
int getfd()
{
return _wfd;
}
int getid()
{
return _childid;
}
std::string& name()
{
return _name;
}
void CloseChannl()
{
close(_wfd);
}
};
class ProcessPool
{
private:
int _ProcessNum;
std::vector<Channel> Channels;
public:
ProcessPool(int ProcessNum=1):_ProcessNum(ProcessNum){}
void CreatePool()
{
std::vector<int> fds;
for(int i=0;i<_ProcessNum;i++)
{
int pip[2];
pipe(pip);
int task_index=NextTask();
write(pip[1],&task_index,sizeof(int));
std::string name = "process-"+std::to_string(i);
fds.push_back(pip[1]);
int id=fork();
if(id<0)
{
std::cout<<PipFailed<<std::endl;
break;
}
if(id==0)
{
std::cout<<getpid()<<std::endl;
if(fds.size()!=0) //处理不必要的管道
{
for(auto&e:fds)
{
close(e);
std::cout<<e<<' ';
}
}
AssignTask(pip[0]);
exit(0);
}
close(pip[0]);
Channels.push_back(Channel(pip[1],id,name));
}
}
void KillAll()
{
while(Channels.size())
{
Channel& tmp=Channels.back();
tmp.CloseChannl();
std::cout<<tmp.name()<<" exit..."<<std::endl;
Channels.pop_back();
}
}
void Wait()
{
for(auto&e:Channels)
{
int status;
if(e.getid()==waitpid(e.getid(),&status,0))
{
std::cout<<e.name()<<" pid:"<<e.getid()<<" exit_code:"<<status<<std::endl;
}
}
}
~ProcessPool(){}
ProcessPool& operator=(ProcessPool&) = delete;
};
int main(int argc,char * argv[])
{
int process_num=1;
if(argc!=2||atoi(argv[1])<=0)
{
Usage(argv[0]);
exit(1);
}
process_num=atoi(argv[1]);
ProcessPool *Pool=new ProcessPool(process_num);
Pool->CreatePool();
Pool->Wait();
Pool->KillAll();
return 0;
}