文章目录
- 进程通信的意义
- 匿名管道
- 通信原理
- 管道的访问控制
- 进程控制
- 管道的特点
- 命名管道
进程通信的意义
之前聊进程时,讲过一个性质,即进程具有独立性,两个进程之间的交互频率是比较少的。就连父子进程也只是共享代码,修改父子进程中的任意进程的数据时,会触发写时拷贝机制,即分配一块新的空间存储数据,使父子进程做到数据分离,互不影响,父子进程之间唯一的信息交互可能就是父进程接收子进程的退出码了。所以由于进程的独立性,进程之间进行数据通信的成本较高。但是进程通信有很多的意义
1.实现数据之间的传输,进程可以发送数据给其他进程
2.实现资源的共享,多个进程共享彼此的数据
3.信息的通知,一个进程向另一个进程发送通知,告知某种特殊事件的发生
4.控制进程,一个进程可以完全控制另一个进程
进程间通信的应用很多,所以虽然它的成本高,但实现进程间通信还是很有必要的
匿名管道
通信原理
可以想象一下,两个进程要进行通信,传输数据,需要用什么方式。比如说,一个进程打开一个文件,向文件写入数据,此时的数据在内存中的缓冲区上,系统刷新缓冲区将数据写入磁盘,另一个进程也打开这个文件,系统将文件从磁盘加载到内存,使进程读取文件中的前一个进程写入的内容,这样两个进程就做到了通信。但是将外设作为通信载体会导致通信的速度非常慢,这无疑又提高了进程通信的成本,所以Linux系统中有一种特殊的文件:管道文件(操作系统将文件描述为struct_file,该结构体中有一个联合体,联合体有三个成员,分别用来表征文件类型是磁盘,管道,还是字符),管道文件不存储在磁盘上,而是存储在内存中,系统检测到文件类型为管道文件,就不用将文件数据刷新到磁盘上。用管道文件作为进程通信的载体在很大程度上提高了通信速度。
管道的最大特点是:单向传输数据,即无论何时,只能向一端输入,往另一端输出。使用管道进行进程间通信的基本步骤为
1.在父进程中以读和写的方式打开同一个文件
2.创建子进程
3.关闭父子进程中文件的读端或写端,使一个进程负责写入数据,一个进程负责读取数据
进程描述文件的结构体为struct files_struct,该结构体中有一个类型为struct file* 的fd_array数组,保存了进程打开的文件。父进程分别以读和写的方式打开一个文件(由于子进程的继承,子进程就不用再打开文件),对于父进程来说,有两个fd被存储到了fd_array数组中,其指向了进程打开的文件,创建子进程时,由于父子进程的数据以写时拷贝的方式共享,fd_array数组作为父进程的数据,子进程会继承同样的数组,而父进程打开的文件不属于进程,子进程不会继承打开的文件,所以子进程只以读和写的方式指向了父进程打开的文件,此时有四个fd(父进程的读和写,子进程的读和写)指向了打开的文件。形成管道的最后一步是关闭两个fd,使父子进程满足一个进程写入数据,一个进程读取数据的特点,形成管道。
我们要怎么打开一个管道文件?实际上系统提供了pipe接口,可以以读和写的方式打开一个管道文件
pipefd作为一个输出型参数,pipe函数打开一个管道文件后,将以读方式打开的文件fd写入数组的第一个元素,将以写的方式打开的文件fd写入数组的第二个元素。所以pipefd[0]为读端,pipefd[1]为写端。该函数返回一个int值,如果调用成功返回0,失败返回-1。
管道的访问控制
使用pipe函数写一段代码
#include <iostream>
#include <unistd.h>
#include <time.h>
#include <string.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/wait.h>
#include <sys/types.h>
using namespace std;
int main()
{
int pipefd[2] = {0}; // 接收pipe打开的管道文件
int ret = pipe(pipefd); // 创建管道
if (ret != 0)
{
cout << "pipe函数调用失败" << endl;
return 1;
}
else
{
// pipe succeed
// 父进程写,子进程读
int id = fork(); // 子进程的创建
if (id == 0)
{
// child,reader
close(pipefd[1]); // 写端关闭
char readBuffer[512] = {0};
while (1) // 死循环读取管道
{
memset(readBuffer, 0, sizeof(readBuffer)); // 每次读取清空缓冲区
ssize_t s = read(pipefd[0], readBuffer, sizeof(readBuffer) - 1);
readBuffer[s] = '\0'; // 向缓冲区最后添加结束符
if (s > 0)
{
cout << "子进程读取到信息:" << readBuffer;
}
// 如果管道中没有数据,子进程不再读,退出进程
else if (s == 0)
{
cout << "父进程退出,不再写入数据" << endl;
close(pipefd[0]);
break;
}
else
{
cout << "读取文件失败" << endl;
return 2;
}
}
exit(1);
}
else if (id > 0)
{
// parent,writer
// 关闭文件的读端
close(pipefd[0]);
char writeBuffer[128];
int cnt = 5;
while (cnt)
{
char tmp[] = "这是父进程,在与子进程进行通信";
unsigned int t = (unsigned int)time(NULL);
sprintf(writeBuffer, "%s time:%d\n", tmp, t);
write(pipefd[1], writeBuffer, sizeof(writeBuffer));
sleep(1); // 父进程每1秒写一次数据
cnt--;
}
// 父进程关闭管道,此时子进程read函数将返回0
close(pipefd[1]);
}
else
{
cout << "创建子进程失败" << endl;
return 2;
}
// 子进程退出后,父进程等待子进程
pid_t res = waitpid(id, NULL, 0);
if (res > 0)
{
cout << "父进程等待子进程成功" << endl;
}
}
return 0;
}
(pipe创建管道后,由当前进程创建子进程,子进程关闭写端,父进程关闭读端,所以父子进程中,子进程用来读取管道文件,父进程用来向管道文件写入。sleep函数使父进程写入的频率为1秒,子进程不断的读取管道中的数据,当管道中没有数据(父进程不再写入时),子进程退出,最后父进程等待子进程,回收其资源)
通过运行结果可以看到,父进程每隔一秒写入一次数据(时间戳相差1),子进程准确的读取了这些数据,父进程不再写入时,子进程退出,并且被父进程回收。修改上面的代码,使父进程的写入频率为3秒,观察子进程运行的结果
可以看到子进程还是成功的读取了这些数据,无论父进程写入管道的时间间隔为多少。那么子进程读取完管道的数据,在父进程下一次写入数据前,子进程是什么状态?
如果父子进程打开一个普通文件,并向其写入数据,父子进程的写入是没有先后顺序的,无法确定谁先向文件中写入,这样的写入是无序且混乱的。但是在管道的数据通信中,父子进程的读写顺序是有序的,通过上面出现的运行结果,可以推测:如果管道中没有数据,reader就会阻塞(进入阻塞状态,进入文件的等待队列),等待数据的写入,如果管道被写满了,writer也会阻塞,等待数据被读取。
修改代码,父进程不断的写管道写入数据,子进程调用sleep休眠100秒,不再读取数据,父进程写入数据时会向屏幕打印写入的次数,运行程序,父进程向管道写入513次后就阻塞了,因为子进程没有读取管道的数据,管道被父进程写满了,此时的父进程不再写入,而是等待管道的数据被读取。
所以,在管道文件中,进程的读写顺序是遵循一种访问机制的,我们把这种访问机制叫做同步和互斥机制。这种机制可以保证管道的每次读和写都是有效的,不会出现重复的读取或者多次写入覆盖之前数据的情况。
进程控制
利用管道可以实现一个进程控制多个进程的效果。
刚才的一份代码可以实现父进程向管道写入数据,子进程读取数据,如果父进程写入的不再是数据,而是需要子进程执行的方法呢?子进程就可以获取这个方法并执行该方法,也就是说在进程间通信的数据,可以是普通的文本,也可以是一个方法,还可以是其他数据,我们的目的是让子进程得到父进程传递的数据,这也是进程通信的本质:使不同进程看到同一份数据。如果传递的数据是一个方法,我们只需要修改代码,让子进程执行此方法就能做到进程控制。
而管道作为进程间通信的载体,父进程向子进程传递数据就成了向管道传递数据,只要子进程能从管道中读取数据,进程间通信就能实现。所以进程间通信的重点在于进程与管道间的数据传输。
如果需要通信的对象是多个子进程,也就需要父进程向多个管道中写数据。其大致过程为:首先要有需要让子进程执行的任务,也就是函数,函数以函数指针的方式存储在一个数组中。假设父进程需要与5个子进程进行通信,这就意味着需要使用pipe函数创建5个管道,采用for循环,循环5次,每次循环pipe函数都会打开一个管道,管道打开后创建子进程,关闭子进程的写端与父进程的读端,接着让子进程进入阻塞,等待管道被父进程写入数据,当数据被写入时,分析这些数据得到需要执行的任务并执行。父进程需要向子进程分发任务,由于现在在使用for循环创建管道,通信的对象(子进程)还没有完全创建好,所以此时不能直接向子进程分发任务,需要记录子进程的进程pid(用于父进程等待子进程,回收其资源)与管道文件的fd写端(用于父进程向管道的写入),在管道创建完成后(for循环之后),根据保存的fd随机向5个管道中的一个管道写入数据,也就是派发任务,当父进程不再需要向子进程派发任务,关闭管道文件的写端,使子进程不再阻塞等待数据被写入管道,而直接退出进程,最后由父进程回收子进程资源。
#include <iostream>
#include <cstdio>
#include <unistd.h>
#include <ctime>
#include <cstring>
#include <unistd.h>
#include <stdio.h>
#include <vector>
#include <map>
#include <cstdlib>
#include <sys/wait.h>
#include <sys/types.h>
#include <string>
using namespace std;
typedef void (*function)();
vector<function> functions; // 全局的方法集,存储需要执行的任务
map<uint32_t, string> information; // 存储关于方法的信息
void function1()
{
cout << "本次执行function1任务,执行本次任务的进程id[ " << getpid() << " ]" << endl;
}
void function2()
{
cout << "本次执行function2任务,执行本次任务的进程id[ " << getpid() << " ]" << endl;
}
void function3()
{
cout << "本次执行function3任务,执行本次任务的进程id[ " << getpid() << " ]" << endl;
}
void loadFunctions() // 方法集的加载
{
information.insert({functions.size(), "function1"});
functions.push_back(function1);
information.insert({functions.size(), "function2"});
functions.push_back(function2);
information.insert({functions.size(), "function2"});
functions.push_back(function3);
}
void work(uint32_t readfd)
{
cout << "进程[" << getpid() << "]" << " 开始工作" << endl;
while (1) // 不断的读取管道数据
{
int task_id;
memset(&task_id, -1, sizeof(task_id));
int ret = read(readfd, &task_id, sizeof(task_id)); // 读取任务在方法集中的task_id
if (ret == 0) // 管道中没有数据
{
cout << "由于父进程不再写入,子进程[ " << getpid() << "]不再读取" << endl;
sleep(1);
break;
}
if (ret != sizeof(task_id))
{
cout << "读取方法出错" << endl;
return;
}
if (task_id >= 0 && task_id < functions.size()) // 获取的方法在方法集中
{
sleep(0.1);
functions[task_id](); // 执行该方法
}
else
{
cout << "no function" << endl;
return;
}
}
cout << "进程[" << getpid() << "]" << " 结束工作" << endl;
}
// [pid, fd]
typedef pair<uint32_t, uint32_t> elem;
void sendTask(const vector<elem> &task_vector)
{
// 随机派发任务,做到负载均衡
srand((unsigned int)time(nullptr));
// 派发任务个数为5个
int cnt = 5;
while (cnt)
{
sleep(1);
// 随机挑选一个子进程
int process = rand() % task_vector.size();
// 随机挑选一个任务
int task_id = rand() % functions.size();
// 任务的派发
write(task_vector[process].second, &task_id, sizeof(task_id));
cout << "父进程派发任务" << information[task_id] << "给pid[ " << task_vector[process].first << " ]的子进程" << endl;
cnt--;
}
}
int main()
{
// 任务表,记录子进程的pid和对应的fd写端文件
vector<elem> task_vector;
loadFunctions();
int processNum = 5; // 子进程个数
for (int i = 0; i < processNum; i++)
{
// 管道的创建
int pipefd[2] = {0};
ssize_t ret = pipe(pipefd);
if (ret != 0) // 管道创建失败
{
cout << "pipe fail" << endl;
return 1;
}
// 子进程的创建
int id = fork();
if (id == 0)
{
// child,reader
close(pipefd[1]);
// 子进程接收任务,执行
work(pipefd[0]);
close(pipefd[0]);
exit(0); // 子进程退出
}
// parent,writer
close(pipefd[0]);
elem e = {id, pipefd[1]};
task_vector.push_back(e); // 每循环一次,就记录子进程的pid和fd写端在任务表上
}
sleep(1);
cout << "create all process success!" << endl;
sendTask(task_vector); // 派发任务
// 回收子进程
for (int i = task_vector.size() - 1; i >= 0; i--)
{
// 关闭管道的写端,使子进程不再阻塞等待管道的写入,直接退出
close(task_vector[i].second);
if (waitpid(task_vector[i].first, nullptr, 0) > 0)
{
cout << "等待子进程[ " << task_vector[i].first << " ]成功" << endl;
}
sleep(1);
}
return 0;
}
代码较长,建议从main函数开始看起,理解父进程派发任务的大致过程然后再理解其中的函数(loadFunctions,work,sendTask),整份代码中的cout只是为了更好的观察代码运行逻辑。
运行程序,父进程成功地派发了5个任务给子进程,并且任务与进程都是随机选择的,不会使一个进程执行多次任务,最后关闭管道的写端后,子进程成功地退出并且被父进程回收了。对于这份代码,我认为的重点是:关闭管道的所有写端,子进程调用read函数读取管道数据,read会返回0,这里需要进行判断,read返回0说明父进程不再写入,子进程就可以退出了。以及创建管道与子进程时,用task_vector任务表记录子进程的pid和fd写端,任务表在这里的使用是一个重点,使用任务表才能向子进程派发任务以及回收子进程的资源。
在调试上面的模拟代码时,我还遇到了一个难搞的问题,就是最后回收子进程时,为什么要从任务表的最后开始关闭父进程的写端并回收子进程资源?这小小的细节其实折磨了我很久
当父进程第一次pipe创建管道,fork创建子进程并关闭子进程的写端与自己的读端后,就创建了一个管道与子进程进行通信,暂时将该子进程称为1号子进程,通信的管道称为1号管道。当父进程第二次pipe创建管道,fork创建子进程并关闭子进程的写端与自己的读端后,就能通过2号管道与2号子进程进行通信,这是很容易理解的情况。但是在父进程第二次fork之前,因为需要与1号子进程进行通信,所以有一个1号管道的写端文件,第二次fork创建子进程,子进程就会继承父进程打开的1号管道的写端文件,所以对于1号管道,除了父进程可以向其写入数据,2号子进程也能向其写入数据,因此1号管道文件有两个写端。
只有当1号管道的写端文件为0时,表示没有进程向管道写入数据了,1号子进程的read才会返回0,返回0后1号子进程才会退出,最后由父进程回收其资源。如果在回收资源时,从任务表的第一个元素开始回收,即回收1号子进程,由于只有父进程关闭了1号管道的写端文件,还有2号子进程打开了1号管道的写端文件,1号管道的写端文件不为0,所以1号子进程的read不会返回0,1号子进程也就无法退出,回收子进程的waitpid就会阻塞等待 1号子进程的退出,但该子进程永远不会退出,所以waitpid就会一直阻塞,表现在程序上就是程序卡住。
而从任务表的最后开始回收子进程就不会有上述问题,假设现在只有2个管道,先关闭父进程对2号管道的写端,2号管道的写端文件数量为0,2号子进程的read返回0,2号子进程退出,由于2号子进程还打开了一个1号管道的写端文件,所以2号子进程退出时,系统会释放该文件,这样的话只有父进程打开了1号管道的写端文件,下次回收1号子进程时,关闭父进程对1号管道的写端后,1号管道的写端文件数量为0,1号子进程也能成功退出。将2个管道的情况推广,当父进程与n个子进程进行管道通信,从任务表的最后关闭父进程的写端并回收子进程资源的方式依然适用。
管道的特点
匿名管道只能用于具有血缘关系的进程间,常用于父子进程间通信
管道的数据传输方向是单向的,这是由内核决定的
管道自带访问控制机制(同步互斥机制)
管道的生命周期是跟随进程的,当最后一个打开管道的进程退出,系统释放进程的资源,管道的资源也随之被释放
管道是面向字节流的,数据的传输以字节为单位
刚才聊的管道其实叫做匿名管道,在shell命令行中也有匿名管道的存在,使用竖线“|”就能调用命令行中的匿名管道。在一个会话中输入sleep 20000 | sleep 10000就调用了匿名管道,在另一个会话中输入ps axj指令过滤出含有sleep的进程信息,观察信息可以发现sleep 20000 | sleep 10000这行命令调用了两个进程,并且它们的父进程是相同的,所以这是两个兄弟进程。与我们测试匿名管道的代码不同,命令行中的管道不是父子进程间通信,而是两个兄弟进程间通信,大概实现就是父进程fork两次创建了两个子进程,接着父进程关闭管道的读端与写端,两个子进程各自关闭读端和写端,使一个进程向管道写入数据,一个进程读取管道的数据。并且命令行中的管道还有一个特点,就是前一个进程的输出是后一个进程的输入,所以系统会将前一个进程的输出重定向到后一个进程的输入。
匿名管道作为管道的一种,其特征是只能用于具有血缘关系的进程间通信,要使任意进程进行通信,就不能使用匿名管道,而需要使用命名管道。
命名管道
使用mkfifo函数可以创建一个命名管道。命名管道的本质是磁盘上的一个FIFO文件,文件被存储在磁盘上,就意味着文件有一个绝对路径,其他进程可以通过这个路径找到这个文件,这个路径就像一个标识符,使不同进程可以通过这个标识符找到这个管道文件,以这个管道文件进行通信。虽然命名管道是磁盘上的文件,但该文件的数据不用向磁盘刷新,即磁盘上的数据只是一个符号,用来表示该文件的存在,与匿名管道一样,命名管道实际的读写是在内存中,以内存为载体进行通信的速度明显比以外设为载体进行通信快。
函数有两个参数,pathname是fifo文件所在路径,mode是该文件的权限,关于权限通常用一串八进制数表示。
// ipc.h头文件
#pragma once
#include <iostream>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <cstring>
#include <unistd.h>
#include <stdio.h>
#define IPC_PATH "/home/cw/daily/11_20/fifo"
// serve.cc文件
#include "ipc.h"
using namespace std;
int main()
{
umask(0);
if (mkfifo(IPC_PATH, 0600) != 0) // 创建命名管道失败
{
cerr << "fifofile fail" << endl;
return 1;
}
int fd = open(IPC_PATH, O_RDONLY);
if (fd == -1)
{
cerr << "open fail" << endl;
return 1;
}
char read_buffer[512];
while (1)
{
memset(read_buffer, 0, sizeof(read_buffer));
ssize_t s = read(fd, read_buffer, sizeof(read_buffer) - 1);
if (s == 0)
{
cout << "服务退出" << endl;
// 关闭文件的读端
close(fd);
// 删除磁盘上的管道文件
unlink(IPC_PATH);
break;
}
read_buffer[strlen(read_buffer) - 1] = '\0';
cout << read_buffer << endl;
}
return 0;
}
// client.cc文件
#include "ipc.h"
using namespace std;
int main()
{
int fd = open(IPC_PATH, O_WRONLY);
if (fd == -1) // 打开文件失败
{
cerr << "open fail" << endl;
return 2;
}
char write_buffer[128];
while (1)
{
cout << "请输入信息#";
fflush(stdout);
// 每次写入数据清空write_buffer
memset(write_buffer, 0, sizeof(write_buffer));
// 从键盘获取数据失败
if (fgets(write_buffer, sizeof(write_buffer), stdin) == nullptr)
{
cerr << "input fail" << endl;
return 3;
}
// 当输入exit表示客户的退出,服务器也停止读取数据
if (strcmp(write_buffer, "exit\n") == 0)
{
break;
}
// 向管道写入从键盘上获取的数据
write(fd, write_buffer, sizeof(write_buffer));
}
cout << "客户退出" << endl;
// 关闭文件的读端,释放管道资源
close(fd);
return 0;
}
模拟命名管道的使用:现在有一个客户端,一个服务器,客户向服务器发送请求,服务器需要接收请求。由服务器创建命名管道进行通信,服务器是命名管道的读端,以读的方式打开管道,客户则以写的方式打开管道,当然管道的路径是事先约定好的,双方都知道管道在哪。客户发送消息,当发送消息为"exit"时表示不再发送数据,关闭管道文件,此时的服务器read函数返回0,服务器根据这一条件关闭管道,最终释放管道资源(文件以引用计数的方式进行资源的释放,每次释放文件资源只会将文件的计数-1,不会进行真正的资源释放。当文件的计数为0时,就表示没有进程打开该文件,系统才会进行真正的资源释放,所以只有服务器和客户端的文件都关闭后,管道资源才会被释放,否则只能等到服务器和客户端都退出,管道资源才会释放)。但是命名管道本质是内存中的一个文件,调用unlink删除内存中的fifo文件才会将管道资源彻底的释放。