目录
一、前言
二、进程间通信的目的
三、进程通信的方法
四、管道 通信
1、进程如何通信
2、管道概念
3、匿名管道
1)理解
2)匿名管道的创建
3)匿名管道用途——控制进程
4)匿名管道对多个进程的控制
5)总结
4、命名管道
1)命名管道的创建
2)命名管道的使用
一、前言
在我们学习进程的时候,我们知道正是因为程序地址空间的存在,所以进程之间具有独立性,他们互不影响,但是在我们的实际应用中,进程之间总会有需要通信的时候,那么这个时候的程序地址空间就是进程间通信的一个阻碍了,那么此时该怎么办呢?
二、进程间通信的目的
- 数据传输:一个进程需要将它的数据发给另一个进程。
- 资源共享:多个进程之间共享同样的资源。
- 通知事件:一个进程需要向另一个进程或一组进程发送消息,通知它(它们)发生了某种事件。
- 进程控制:有些进程希望完全控制另一个进程的执行,此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态的转变。比如我们在程序调试代码的时候就是一个进程完全控制了另一个进程。
三、进程通信的方法
Linux为用户提供了三种进程间通信的方法
- pipe管道通信,比如我们在进程一篇中用到的命令 ‘|’ 就是使用管道,该命令是和其他命令组合起来使用的。如搭配 grep 文本內容过滤工具使用。" ps -ajx | grep lrk "该命令就是将ps进程执行的数据通过管道传输给了 grep,才能筛选出指定的內容。管道一般是用于本地进程之间的数据传输。其又分为 匿名管道 和 命名管道
- System V进程通信,其是一套进程通信的标准,可以为操作系统提供进程通信的接口。
- POSIX进程通信,也是一套进程通信的标准,可以为操作系统提供进程通信的接口。
四、管道 通信
1、进程如何通信
我们知道进程的程序地址空间决定了进程之间的独立性,这就给进程之间的通信造成了极大的困难,但是我们思考一下通信的本质是什么,即就是一个进程向另一个进程传递数据,而我们的进程始终是在操作系统内运行着的,那么进程是不是可以通过操作系统中的资源进行通信呢?
就像一个进程向同一份文件中写入数据,另一个进程去该文件中读取。这样的情况下,我们就通过访问同一个资源达成了数据传输的目的。就像我们看到的谍战剧中的特务之间的“通信”一样,两个特务不能直接通信,他们就会共同约定一个地方,一个特务将需要传递的信息放在该地方,另一个特务来取并获取信息内容。
那也就是说,进程之间的通信的前提其实是不同的进程需要先能够看到、能够获取到同一份资源(文件、内存等)。该资源的种类其实就决定了进程通信的方式。
2、管道概念
管道是Unix中最古老的进程间通信的方式了。管道顾名思义,就是类比于生活中的管道才得名的。只不过生活中的管道输送的是谁、天然气等现实生活中的资源,而系统中的管道则是传输数据的,是一个进程链接到另一个进程的数据流。
事实上,管道就是一个被打开的文件,但是这个文件很特殊,向这个文件内写入的数据实际上并不会放入磁盘中,管道是在内存中实现的,并由操作系统的内核管理。当一个进程向管道写入数据时,这些数据被存储在内核中的缓冲区;然后,另一个进程可以从同一管道读取这些数据。一旦数据被读取,它们就从缓冲区中移除,符合现实中的管道特征:只传输资源,不存储资源。且需要注意的是管道是单项传输的。
在我们介绍文件那一篇文章中讲到过,操作系统会为每一个创建成功的进程维护一个task_struct结构体,该结构体中包含了指向文件描述符表的指针,文件描述符表中有一个数组,存储着指向被打开文件的指针,被打开的文件都会有一个结构体维护(struct file
)其包含了文件的所有属性以及操作该文件的方法,例如读、写、释放等。此外,struct file
也包含了一个指向 struct address_space
的指针,定义了文件的页缓存和其他内存映射信息。
该缓冲区信息中,包含着描述文件的inode结构体,在该结构体中还描述着一个联合体
其中pipe表示的是管道文件;dbev表示块设备(磁盘)文件;cdev字符设备文件(键盘等),通过文件的inode,系统就可以辨别出来打开的文件是什么类型。
综上所述,两个进程在使用管道通信的时候,其中一个进程如果以只写的方式打开管道,那么另一个进程就只能通过只读方式打开(单向性),也可以反过来,只不过管道的两端必须是不同的打开方式。
管道分为两种:匿名管道和命名管道 ,其实就是根据两种管道所打开的方式不同而做的分类。
3、匿名管道
1)理解
匿名管道看名字来说,这个管道没有特定的名称,即在创建它的时候,不会指定打开文件的文件名、文件路径等,不会创建实际额文件在文件系统中,存粹存在于内存中,由操作系统内核进行管理。用于进程间通信。
虽然管道本身不是文件系统中的文件,但是它利用了内核提供的缓冲机制来存储临时数据,确保即使读写两端的操作不完全同步也能正确传递信息。
而由于匿名管道是非明确目标的文件,对于两个毫不相关的进程是无法一起找到这个管道文件的,也就是说只有具有“血缘”关系的进程才能使用匿名管道进行通信。下面就图解一下父子进程如何创建匿名管道进行通信。
1、首先父进程分别以只读和只写打开该管道文件:
2、接着父进程创建子进程,子进程会继承父进程的文件打开方式:
3、接下来父进程关闭读端,子进程关闭它的写端,父进程只负责往管道文件里面写入,子进程读取就行了:
这样就创建了一个匿名管道。
1、为什么父进程需要以两种方式打开管道文件呢?不能以想要的方式打开管道文件然后子进程再以它想要的方式打开文件不就行了?
这种太过于麻烦了,子进程在创建的时候会自动继承父进程的文件打开方式,这时候我们只需要各自关闭一个文件即可。
2、父进程必须关闭读端,子进程必须关闭写端吗?
不一定,看自己的需要了,如果需要父进程读取数据,子进程写入数据,那就反过来就行。
3、进程是如何知道管道文件被打开了几次的?
有一个计数器
2)匿名管道的创建
创建匿名管道有一个系统调用:
pipe系统调用的作用是创建一个管道文件,如果创建成功则返回0,否则返回-1,并设置erron。其参数是一个 输出型参数 ,可以看到这个输出型参数是一个大小为2的数组,在pipe系统调用成功之后,该数组内会存储两个元素:
- pipe[0]:存储的是以只读方式打开管道时获取的fd
- pipe[1]:存储的是以只写方式打开管道时获取的fd
之后就可以根据需求选择关闭父子进程的端口。
运行结果如下:
其实在我们运行程序的时候,我们看到的现象并不是子进程一下就将父进程写入的所有內容全部读出来,也不是我们所想的那样,死循环读取管道文件中的內容。而是跟随着父进程的节奏,父进程隔一秒写入一个,子进程也是隔一秒读取一个。这是为什么呢?
事实上,pipe文件具有访问控制机制,必须先写入才能读取。父子进程在对管道文件进行读写操作是阻塞式I\O,即管道文件中必须先有数据,读取端才能去读取,否则调用read时就会发生阻塞,知道管道中有数据;同样,如果管道中被写满了数据,此时再调用write也会发生阻塞,直到管道中有足够的空间来写入。其次我们在介绍管道的基本概念的时候也说到过,一旦管道中的数据被读取,这些数据就会立马被清除,这也符合我们对管道的认知。
3)匿名管道用途——控制进程
下面实现一个简单的利用管道控制进程的例子,即简单实现一个由父进程利用管道分派任务信息,子进程接收信息去执行该任务的程序。
#include<iostream>
#include<unistd.h>
#include<ctime>
#include<cstring>
#include<vector>
#include<unordered_map>
#include<sys/wait.h>
#include<sys/types.h>
#include<string>
#include<cassert>
using std::cout;
using std::endl;
using std::cerr;
using std::vector;
using std::string;
using std::unordered_map;
typedef void(*function)();//定义函数指针
vector<function> functions;//创建函数指针数组用来存放函数指针
unordered_map<uint32_t,string> info;
void f1(){
cout<<"This is a dairy task!,process pid::["<<getpid()<<"]"<<"time: ["<<time(nullptr)<<"]\n"<<endl;
}
void f2(){
cout<<"This is a data task!,process pid::["<<getpid()<<"]"<<"time: ["<<time(nullptr)<<"]\n"<<endl;
}
void f3(){
cout<<"This is a internet task!,process pid::["<<getpid()<<"]"<<"time: ["<<time(nullptr)<<"]\n"<<endl;
}
void loadFunc(){
info.insert({functions.size(),"Deal dairy"});
functions.push_back(f1);
info.insert({functions.size(),"Deal data"});
functions.push_back(f2);
info.insert({functions.size(),"Deal Internet"});
functions.push_back(f3);
}
int main(){
loadFunc();
int pipefd[2];
int ret=pipe(pipefd);
if(ret!=0){
cerr<<"pipe error!"<<endl;
return 1;
}
pid_t id = fork();
if(id<0){
cerr<<"fork error!"<<endl;
return 2;
}
else if(id==0){
close(pipefd[1]);
while(true){
uint32_t operatorType=0;
ssize_t ret=read(pipefd[0],&operatorType,sizeof(uint32_t));
if(ret==0){
cout<<"Parent process has finished task distribution!\n"<<endl;
break;
}
assert(ret=sizeof(uint32_t));
(void)ret;
if(operatorType <functions.size()){
functions[operatorType]();
}
else{
cout<<"BUG,operatorType::"<<operatorType<<endl;
}
}
close(pipefd[1]);
exit(0);
}
else{
srand((long long)time(nullptr));
close(pipefd[0]);
int num=functions.size();
int cnt=1;
while(cnt<=10){
uint32_t commandCode=rand()%num;
cout<<"Parent process has distributed::"<<info[commandCode]<<"Time ::"<<cnt<<endl;
cnt++;
write(pipefd[1],&commandCode,sizeof(uint32_t));
sleep(1);
}
close(pipefd[1]);
pid_t result=waitpid(id,nullptr,0);
if(result){
cout<<"Wating success!"<<endl;
}
}
return 0;
}
下面对代码做出解释:
这部分首先定义了一个任务列表:functions 和用来存储任务信息的哈希表 info,functions里存储的是函数指针,下标对应的就是任务号。info用来存储任务信息,键值对的first存储任务号,second存储任务信息。接着是三个表示任务的函数,后面调用该函数表示执行该任务。
最后将任务加载到任务列表中。
对于父进程来说,关闭读取端,然后向向子进程随机派发任务列表中的任务,即取随机值发射。对于子进程来说是要从管道中读取的。
运行结果:
4)匿名管道对多个进程的控制
上面我们看到了只由一个父进程通过管道文件派发任务控制一个子进程的例子,那我们当然也可以通过对多个子进程派发任务来控制多个子进程。此时一个匿名管道就不够用了,我们需要多个匿名管道,既然需要创建多个匿名管道,那么我们就需要让父进程知道不同的子进程对应的不同的管道的写端。即我们需要让父进程知道所要派发任务的子进程的匿名管道的写端。
如下:
#include <iostream>
#include <unistd.h>
#include <ctime>
#include <cstring>
#include <string>
#include <vector>
#include <unordered_map>
#include <sys/wait.h>
#include <sys/types.h>
#include <cassert>
using std::cout;
using std::endl;
using std::cerr;
using std::vector;
using std::string;
using std::unordered_map;
using std::pair;
typedef void (*functor)(); // typedef 函数指针为 functor
vector<functor> functors; // 创建函数指针数组, 用来存储函数指针
unordered_map<uint32_t, string> info; // 用来存储 functors 对应元素存储的任务的信息
typedef pair<pid_t, int> elem; // elem用来存储 子进程pid 以及对应管道的写入端fd
// first 存储子进程pid, second 存储对应管道写端fd
// 只用函数举例, 不实现具体功能
void f1() {
cout << "这是一个处理日志的任务, 执行的进程 ID [" << getpid() << "]"
<< "执行时间是[" << time(nullptr) << "]\n" << endl;
//
}
void f2() {
cout << "这是一个备份数据任务, 执行的进程 ID [" << getpid() << "]"
<< "执行时间是[" << time(nullptr) << "]\n" << endl;
}
void f3() {
cout << "这是一个处理网络连接的任务, 执行的进程 ID [" << getpid() << "]"
<< "执行时间是[" << time(nullptr) << "]\n" << endl;
}
void loadFunctor() {
info.insert({functors.size(), "处理日志"});
functors.push_back(f1);
info.insert({functors.size(), "备份数据"});
functors.push_back(f2);
info.insert({functors.size(), "处理网络连接"});
functors.push_back(f3);
}
void childProcWork(int readFd) {
sleep(1);
cout << "进程 [" << getpid() << "] 开始工作" << endl;
while (true) {
uint32_t operatorType = 0;
ssize_t ret = read(readFd, &operatorType, sizeof(uint32_t));
if(ret == 0) {
cout << "父进程任务派完了, 我要走了……" << endl;
break;
}
assert(ret == sizeof(uint32_t));
(void)ret;
if (operatorType < functors.size()) {
functors[operatorType]();
}
else {
cout << "BUG ? operatorType:: " << operatorType << endl;
}
}
cout << "进程 [" << getpid() << "] 结束工作" << endl;
}
void blanceAssignWork(const vector<elem> &processFds) {
srand((long long)time(nullptr)); // 设置随机数种子
// 随机对子进程 随机分配任务 num 次
int cnt = 0;
int num = 15;
while (cnt < num) {
sleep(1);
// 随机选择子进程
uint32_t pickProc = rand() % processFds.size();
// 随机选择任务
uint32_t pickWork = rand() % functors.size();
write(processFds[pickProc].second, &pickWork, sizeof(uint32_t));
cout << "父进程给进程: " << processFds[pickProc].first << " 派发任务->" << info[pickWork] <<
", 对应管道写端fd: " << pickProc << ", 第 " << cnt << " 次派发" << endl;
cnt--;
}
}
int main() {
// 0. 加载任务列表
loadFunctor();
// 循环创建5个子进程以及对应的管道
vector<elem> assignMap; // 子进程pid与对应管道的fd记录
int processNum = 5;
for(int i = 0; i < processNum; i++) {
int pipeFd[2] = {0};
if(pipe(pipeFd) != 0) {
cerr << "第 " << i << " 次, pipe 错误" << endl;
}
pid_t id = fork();
if(id == 0) {
// 子进程执行代码
close(pipeFd[1]);
childProcWork(pipeFd[0]); // 子进程功能具体函数
close(pipeFd[0]);
exit(0);
}
// 因为在if(id == 0) 的最后, 执行了 exit(0); 所以子进程不会跳出 if(id == 0) 的内部
// 所以下面都为父进程执行的代码
// 父进程执行代码
close(pipeFd[0]);
assignMap.push_back(elem(id, pipeFd[1]));
// elem(id, pipeFd[1]) 创建pair<uint32_t, uint32_t> 匿名对象, 存储 此次创建子进程pid 和 打开管道的写端fd
// 并存入 vector 中
}
cout << "创建子进程完毕" << endl;
cout << "父进程, 开始随机给子进程 随机派发任务\n" << endl;
sleep(1);
blanceAssignWork(assignMap); // 父进程派发任务函数
// 回收所有子进程
for(int i = 0; i < processNum; i++)
close(assignMap[i].second);
for(int i = 0; i < processNum; i++) {
if(waitpid(assignMap[i].first, nullptr, 0)) {
cout << "等待子进程_pid: " << assignMap[i].first << ", 等待成功. Number: " << i << endl;
}
}
return 0;
}
运行结果如下:
对比上一节所看到的父进程的匿名管道控制单进程,控制多进程本质上来说并没有什么变化,只不过需要记录子进程的pid和对应的匿名管道的端口。
查看当前进程:
我们看到打开的所有子进程有着相同的ppid,因为是一个父进程创建出来的,我们称这些进程为兄弟进程,即就是具有“血缘”关系的进程,也就是说,只要兄弟进程知道其他管道写入端的fd,就可以实现兄弟进程间的相互通信。
Tips:我们在使用 “ | ”时候,其实就是兄弟进程之间的通信。
5)总结
- 匿名管道 的生命周期 取决于什么时候彻底关闭管道文件(即pipe文件的打开计数为0)
- 匿名管道 是面向字节流的
- 匿名管道 自带同步机制(pipe满, 则writer阻塞; pipe空, 则reader阻塞), 即自带访问控制机制
- 匿名管道 只能单向通信, 是根据管道的特点专门设计成这样的. 是半双工通信的特殊情况
- 匿名管道 只能用于 具有血缘关系的进程之间的通信: 父子、兄弟
4、命名管道
1)命名管道的创建
1、命令行创建
示例:
2、系统调用
其作用是创建一个先进先出的文件的特殊文件(一个命名管道),有两个参数,第一个参数是创建文件的路径及文件名,第二个参数是创建文件的权限。
2)命名管道的使用
命名管道的使用并不难理解,只需要一个进程以只写方式打开文件后向管道中写入数据,一个进程以只读方式打开文件后从管道中读取数据就可以了。
下面举个例子,进程一从命令行中接收用户的消息,并写入命名管道中,进程二从管道中读取数据并输出到命令行中。
//common.h
#pragma once
#include<iostream>
#include<cstdio>
#include<cstring>
#include<cerrno>
#include<sys/wait.h>
#include<sys/types.h>
#include<fcntl.h>
#include<unistd.h>
#include<sys/stat.h>
#define IPC_PATH "./.fifo"//定义宏,指定一个命名管道(FIFO)的路径,该命名管道将位于当前工作目录下,并且文件名为.fifo
//serverFifo.cpp
#include "common.h"
using std::cout;
using std::endl;
using std::cerr;
int main() {
umask(0);
if(mkfifo(IPC_PATH, 0666) != 0) {
cerr << "mkfifo error" << endl;
return 1;
}
int pipeFd = open(IPC_PATH, O_RDONLY);
if(pipeFd < 0) {
cerr << "open error" << endl;
return 2;
}
cout << "fifo file has open!" << endl;
char buffer[1024];
while (true) {
ssize_t ret = read(pipeFd, buffer, sizeof(buffer)-1);
if (ret == 0) {
cout << "\nclient has exit!";
break;
}
else if (ret > 0) {
cout << "client > server # " << buffer << endl;
}
else {
cout << "read error: " << strerror(errno) << endl;
break;
}
}
close(pipeFd);
cout << "\nserver exit……" << endl;
unlink(IPC_PATH);//当不在需要命名管道时,需要清理它
return 0;
}
//clientFifo.cpp
#include"common.h"
using std::cout;
using std::endl;
using std::cerr;
int main()
{
int pipefd=open(IPC_PATH,O_WRONLY);
if(pipefd<0)
{
cerr<<"open fifo error"<<endl;
return 1;
}
char serbf[1024];
while(true)
{
printf("please input message:");
fflush(stdout);
memset(serbf,0,sizeof(serbf));
if(fgets(serbf,sizeof(serbf),stdin)!=nullptr)
{
serbf[strlen(serbf)-1]='\0';
write(pipefd,serbf,strlen(serbf));
}
else{
break;
}
}
close(pipefd);
cout<<"client exit!"<<endl;
return 0;
}
//makefile
.PHONY:all
all:clientFifo serverFifo
clientFifo:clientFifo.cpp
g++ -std=c++11 $^ -o $@
serverFifo:serverFifo.cpp
g++ -std=c++11 $^ -o $@
.PHONY:clean
clean:
rm -f clientFifo serverFifo .fifo
运行结果:
这样就完成了一个简单的命名管道的使用。