文章目录
- 什么是管道
- 匿名管道的直接原理
- pipe( )系统调用接口
- 匿名管道代码示例
- 匿名管道的特征
- 总结
什么是管道
管道(Pipe) 是一种基本的进程间通信(IPC
)机制,允许一个进程与另一个进程之间进行数据传输;
管道工作方式类似于生活中的水管因此命名为管道,数据从一端流入另一段流出,数据流为单向;
Linux
中可以使用who | wc -l
查看当前登入系统的用户数;
who
命令用与显示当前登入系统的用户信息,其中一条会话代表一个用户;
wc -l
命令统计当前行数;
两条命令通过管道符|
进行连接,即将显示的信息通过管道符传输给wc
命令再进行统计行数;
|
符号即为一种管道;
管道存在两种基本类型:
-
匿名管道(Anonymous Pipes)
不存在命名的管道,用于有亲缘关系的进程之间的通信(如父子进程或兄弟进程等);
匿名管道通常用于单个系统内部的进程通信;
-
命名管道(Named Pipes)
也称为
FIFO
(First In First Out),拥有命名并存在于文件系统中;命名管道允许没有亲缘关系的进程之间进行通信;
匿名管道的直接原理
每个进程需要维护其task_struct
结构体,对应的内核数据结构中存在一个struct file_struct*
指针指向一个file_struct
结构体,这个结构体中存在一个struct file* fd_array[]
指针数组,数组的下标为文件描述符;
对应的struct file
结构体存放打开的文件的基本信息;
这些信息包括但不限于:
-
Inode
文件的
Inode
编号; -
file_operators
提供给上层的读写接口方法集;
-
缓冲区
对于普通文件而言这个缓冲区通常为 页缓冲区 ;
缓冲区与文件系统配合实现数据的写入与读取;
匿名管道是一种区别于普通文件的内存级文件;
其不存在于磁盘中且不基于文件系统,操作系统不会为匿名管道文件分配Inode与对应的数据块;
对应的读写方法file_operators
是针对于缓冲区的读写;
当进程创建子进程时子进程为父进程的一个拷贝;
操作系统会为子进程单独维护一个task_struct
结构体以及其对应的内容包括file_struct
结构体与文件描述符表;
文件系统与进程管理之间为并列关系,文件不会因为创建子进程而单独为其拷贝一份新文件;
子进程文件描述符对应的结构体指针所指向的文件与父进程相同;
进程间通信的本质是 “让不同的进程看到同一份资源” ;
在创建子进程时即可实现两个不同的进程看到同一份"资源",即管道文件的缓冲区;
在打开文件时通常会记录打开文件的方式 (读/写),在创建子进程时打开方式也会连同一起拷贝,这意味着单独以读或是写的方式打开文件不能使得两个不同进程进行通信;
在进行匿名管道通信时进程将占用两个文件描述符分别以读和写的方式打开管道文件,在创建子进程后根据需求关闭另一个文件描述符从而实现单向通信;
系统并不会在使用管道时为用户关闭某个文件描述符,该操作由用户自行决定;
为确保管道的正常使用与进程间通信,用户需要手动关闭不需要的文件描述符;
单个管道只能进行单向通信,若是使单个管道进行双向通信可能会因为读写位置不同或数据覆盖,数据碎片等问题造成通信错误;
需要利用管道进行双向通信时可采用两个管道的方式,其中每个管道负责一个方向的通信;
pipe( )系统调用接口
使用 open()
等系统调用接口创建的文件是一种磁盘级文件,在文件系统中存在自身的文件名,Inode与数据块且将被文件系统管理;
管道是一种内存级文件,不存在对应的文件名与Inode
,数据块分配,内存级文件不被文件系统所管理,不可使用open()
等接口函数创建;
匿名管道文件的创建需要通过系统调用接口pipe()
进行创建;
PIPE(2) Linux Programmer's Manual PIPE(2)
NAME
pipe, pipe2 - create pipe
SYNOPSIS
#include <unistd.h>
int pipe(int pipefd[2]);
#define _GNU_SOURCE /* See feature_test_macros(7) */
#include <fcntl.h> /* Obtain O_* constant definitions */
#include <unistd.h>
RETURN VALUE
On success, zero is returned. On error, -1 is returned, and errno is set appropriately.
-
头文件
pipe()
系统调用接口存在于<unistd.h>
头文件中; -
参数
int pipefd[2]
参数代表使用该系统调用接口时需要传递一个数组,数组中只需要包含两个int
类型的元素;该参数是一种输出型参数;
-
传入型参数
这类参数用于向函数提供需要的数据或信息,函数通过这些参数读取传入的值,但不会修改它们;
-
输出型参数
输出参数则用于从函数内部向外部返回额外的数据,函数通过修改这些参数的值来传递数据给调用者;
pipefd[0]
与pipefd[1]
分别代表创建的管道文件对应的文件描述符,其中0
号下标对应的int
类型数据代表读,1
号下标代表写; -
-
返回值
函数调用成功时返回
0
;函数调用失败时返回
-1
并设置errno
;
匿名管道的使用方式一般为:
-
父进程调用
pipe()
系统调用接口创建管道文件 -
父进程调用
fork()
系统调用接口创建子进程(或间接创建具有亲缘关系的进程) -
判断数据流方向( 父流向子/子流向父 )
-
根据需求(数据流向)关闭父子进程的另一个文件描述符
-
进行通信
通信一般采用
write()
系统调用接口与read()
系统调用接口; -
关闭剩余文件描述符
在通信过后可以根据条件关闭通信过的文件描述符,也可不关闭(最终管道文件将会自动被操作系统回收);
匿名管道代码示例
-
头文件与宏定义
#include <sys/types.h> #include <sys/wait.h> #include <unistd.h> #include <cstdlib> #include <cstring> #include <iostream> #include <string> #define N 2 #define NUM 1024 using namespace std;
-
N
为pipe()
系统调用接口参数的数组大小 -
NUM
为用户层缓冲区的大小
-
-
main
函数int main() { int pipefd[N] = {0}; int n = pipe(pipefd); if (n < 0) { cerr << "pipe error" << endl; exit(-1); } pid_t id = fork(); if (id < 0) { cerr << "fork error" << endl; exit(-1); } /* child -> parent pipefd[0] -- 'r' pipefd[1] -- 'w' */ if (id == 0) { // child - `w` close(pipefd[0]); // IPC code PIC::Write(pipefd[1]); exit(0); } // parent - `r` close(pipefd[1]); // IPC code PIC::Read(pipefd[0]); pid_t rid = waitpid(id, nullptr, 0); if (rid < 0) return 3; return 0; }
该示例为子进程向父进程进行单向通信;
设置
int
类型数组pipefd[N]
并调用pipe()
系统调用接口创建匿名管道文件;利用
fork()
系统调用接口创建子进程并根据需求调用close()
系统调用接口关闭父子进程中不需要的文件描述符;父进程调用
waitpid()
系统调用接口进行进程等待;Read()
与Write()
接口用来完成具体的通信过程;子进程调用
Write()
,父进程调用Read()
完成通信; -
Read()
与Write()
实现namespace PIC { void Write(int wfd) { char buff[NUM]; pid_t self = getpid(); string s = "I am a child"; int number = 0; while (true) { buff[0] = 0; // 字符串清空 (为读者展示该数组将被视为一个字符串) snprintf(buff, sizeof(buff), "%s-%d-%d", s.c_str(), self, number++); // cout << buff << endl; // 向父进程进行通信 write(wfd, buff, strlen(buff)); // 进行写入时数据不作为一个字符串 sleep(1); } } void Read(int rfd) { char buff[NUM]; while (true) { buff[0] = 0; // 字符串清空 (为读者展示该数组将被视为一个字符串) ssize_t n = read(rfd, buff, sizeof(buff)); if (n > 0) { buff[n] = 0; // 需要打印时将需要称为一个字符串 需要添加'\0' cout << "parent process get a massage [" << getpid() << "]# " << buff << endl; } //忽略read()调用失败 } } } // namespace PIC
创建命名空间
PIC
避免出现命名冲突;-
Write()
创建用户层缓冲区
buff[NUM]
用于存储子进程需要向父进程写入的内容;调用
snprintf()
C标准接口用于将字符串格式化后写入用户层缓冲区buff[]
中 (snprintf()
具体调用查看手册);调用
write()
系统调用接口将用户层缓冲区内容buff[]
写入至内核缓冲区(匿名管道文件的缓冲区,写入过程中文件数据不当做字符串看待);调用
sleep()
使每向内核缓冲区写入一条数据后休眠1s
; -
Read()
创建用户层缓冲区
buff[NUM]
用于存储接收的由子进程写给父进程的内容;调用
read()
系统调用接口从内核缓冲区(匿名管道文件的缓冲区)读取内容并写入至用户层缓冲区buff[]
中并用n
接收返回值;子进程在向父进程写入时未将数据以字符串形式看待,此处需要打印需要将数据以字符串看待需要在
buff[n]
处添加字符串结束符\0
;利用
std::cout
打印接收的内容;
-
缓冲区为内核的空间,用户层必须通过系统调用接口才能间接对内核缓冲区进行操作;
匿名管道的特征
-
匿名管道只能为具有血缘关系的进程间进行通信
不具有血缘关系的进程无法利用匿名管道进行通信;
-
管道只能单向通信
单个管道进行双向通信将会因为读写位置不同或数据覆盖,数据碎片等问题造成通信错误;
-
父子进程通信时会进行协同
在上段代码中子进程向父进程写入数据时利用
sleep()
每隔1s
进行一次写入;父进程在读取子进程数据时不进行休眠;
运行上段代码并在另一个会话窗口利用脚本观察父子进程情况;
$ while :; do ps axj | head -1 && ps axj | grep mytest | grep -v grep ; echo"#####################" ; sleep 1 ; done
脚本结果如下:
##################### PPID PID PGID SID TTY TPGID STAT UID TIME COMMAND 29927 30127 30127 29927 pts/2 30127 S+ 1002 0:00 ./mytest 30127 30128 30127 29927 pts/2 30127 S+ 1002 0:00 ./mytest ##################### PPID PID PGID SID TTY TPGID STAT UID TIME COMMAND 29927 30127 30127 29927 pts/2 30127 S+ 1002 0:00 ./mytest 30127 30128 30127 29927 pts/2 30127 S+ 1002 0:00 ./mytest
观察结果父子进程都进行了休眠;
当子进程向父进程传输数据时父进程会直接接收;
若是子进程未向父进程传输数据时为了避免读取"脏数据"(即错误或无效数据),父进程将会等待子进程进行下一次的通信;
进程间通信的本质是 “使不同进程看到同一个资源” ,这意味着这份资源将被多个执行流共享;
因此可能会出现 访问冲突 或 临界资源竞争 等问题;
父子进程间协同的方式一般采用 同步与互斥 ,主要保护管道文件资源的数据安全;
-
匿名管道通信时的四种情况
-
管道为空时读端将进行阻塞
参考上文 父子进程通信时会进行协同 ;
-
管道为满时写端将进行阻塞
在原代码基础上取消子进程的
sleep()
并在父进程开头处sleep(5)
,结尾处sleep(100)
进行等待(只进行一次读取,sleep(100)
为与一次读取进行分割避免第二次read()
);在两个会话中分别运行程序与监控脚本;
结果如下:
-
监控脚本
$ while :; do ps axj | head -1 && ps axj | grep mytest | grep -v grep ; echo "#####################" ; sleep 1 ;done PPID PID PGID SID TTY TPGID STAT UID TIME COMMAND ##################### PPID PID PGID SID TTY TPGID STAT UID TIME COMMAND 29927 32142 32142 29927 pts/2 32142 S+ 1002 0:00 ./mytest 32142 32143 32142 29927 pts/2 32142 S+ 1002 0:00 ./mytest ##################### PPID PID PGID SID TTY TPGID STAT UID TIME COMMAND 29927 32142 32142 29927 pts/2 32142 S+ 1002 0:00 ./mytest 32142 32143 32142 29927 pts/2 32142 S+ 1002 0:00 ./mytest #...
-
程序结果
$ ./mytest parent process get a massage [32142] # I am a child-32143-0I am a child-32143-1I am a child-32143-2I am a child-32143-3I am a child-32143-4I am a child-32143-5I am a child-32143-6I am a child-32143-7I am a child-32143-8I am a child-32143-9I am a child-32143-10I am a child-32143-11I am a child-32143-12I am a child-32143-13I am a child-32143-14I am a child-32143-15I am a child-32143-16I am a child-32143-17I am a child-32143-18I am a child-32143-19I am a child-32143-20I am a child-32143-21I am a child-32143-22I am a child-32143-23I am a child-32143-24I am a child-32143-25I am a child-32143-26I am a child-32143-27I am a child-32143-28I am a child-32143-29I am a child-32143-30I am a child-32143-31I am a child-32143-32I am a child-32143-33I am a child-32143-34I am a child-32143-35I am a child-32143-36I am a child-32143-37I am a child-32143-38I am a child-32143-39I am a child-32143-40I am a child-32143-41I am a child-32143-42I am a child-32143-43I am a child-32143-44I am a child-32143-45I am a child-32143-46I am a child-32143-47I am a child-32143-48I am
结果来看在运行程序时父进程因
sleep(5)
并未接收到子进程传输的数据;当休眠结束时将管道内的所有数据进行一次性读取;
从读取的数据来看最终写入的数据停留在了
I am a child-32143-48I am
;这意味着管道此时已经被写满了;
当管道满了时写端将进行阻塞,等待读端读取数据后才能进行下一次写入;
-
-
读端正常,写端关闭
利用
man
查看read()
系统调用接口返回值;RETURN VALUE On success, the number of bytes read is returned (zero indicates end of file), and the file position is advanced by this number. ...
读端正常写端关闭时当管道中的数据被读端读完后读端将会读取到
0
表示已经读到文件(管道Pipe
)结尾且不会被阻塞;故为防止该种情况需要在父进程中进行特殊处理(上述原文件并未对该情况进行处理);
-
写端正常,读端关闭
当一个进程尝试向管道的写端写入数据,而管道的读端已经被所有相关进程关闭时,该进程会收到
SIGPIPE
信号;默认情况下
SIGPIPE
信号会终止该进程;这是因为如果没有任何进程能够从管道读取数据,继续写入数据就没有意义;
操作系统通过发送
SIGPIPE
信号来通知这一点;修改上文原代码,使写端持续每隔一秒对管道文件缓冲区进行写入(原代码中保持不变);
读端读取
3
次后退出,即读端被关闭:void Read(int rfd) { int cnt = 0; char buff[NUM]; buff[0] = 0; while (true) { buff[0] = 0; // 字符串清空 (为读者展示该数组将被视为一个字符串) ssize_t n = read(rfd, buff, sizeof(buff)); if (n > 0) { buff[n] = 0; // 需要打印时将需要称为一个字符串 需要添加'\0' cout << "parent process get a massage [" << getpid() << "]# " << buff << endl; } if (cnt++ > 5) break; } }
子进程若被信号杀死父进程可以看到其对应状态;
修改原代码
main
函数中父进程的操作,使其在子进程结束后sleep(3)
,并调用宏WTERMSIG
观察子进程被哪个信号杀死;// parent - `r` close(pipefd[1]); // IPC code PIC::Read(pipefd[0]); close(pipefd[0]); sleep(3);//观察僵尸状态 int status = 0; pid_t rid = waitpid(id, &status, 0); cout << "Parent process [" << getpid() << "]@ waited for child process sucessed " << endl << "Child process terminated by signal " << WTERMSIG(status) << endl; if (rid < 0) return 3;
在两个会话中调用监控脚本与运行程序;
结果如下:
-
运行结果:
$ ./mytest parent process get a massage [2381]# I am a child-2382-0 parent process get a massage [2381]# I am a child-2382-1 parent process get a massage [2381]# I am a child-2382-2 parent process [2381]@ waited for child process sucess the status is 0 , Child process terminated by signal 13
-
脚本结果
####################################### PPID PID PGID SID TTY TPGID STAT UID TIME COMMAND 31962 4575 4575 31962 pts/0 4575 S+ 1002 0:00 ./mytest 4575 4576 4575 31962 pts/0 4575 S+ 1002 0:00 ./mytest ####################################### PPID PID PGID SID TTY TPGID STAT UID TIME COMMAND 31962 4575 4575 31962 pts/0 4575 S+ 1002 0:00 ./mytest 4575 4576 4575 31962 pts/0 4575 S+ 1002 0:00 ./mytest ####################################### PPID PID PGID SID TTY TPGID STAT UID TIME COMMAND 31962 4575 4575 31962 pts/0 4575 S+ 1002 0:00 ./mytest 4575 4576 4575 31962 pts/0 4575 S+ 1002 0:00 ./mytest ####################################### PPID PID PGID SID TTY TPGID STAT UID TIME COMMAND 31962 4575 4575 31962 pts/0 4575 S+ 1002 0:00 ./mytest 4575 4576 4575 31962 pts/0 4575 Z+ 1002 0:00 [mytest] <defunct> ####################################### PPID PID PGID SID TTY TPGID STAT UID TIME COMMAND 31962 4575 4575 31962 pts/0 4575 S+ 1002 0:00 ./mytest 4575 4576 4575 31962 pts/0 4575 Z+ 1002 0:00 [mytest] <defunct>
读端(父进程)读取两次后关闭读端并
sleep(3)
;读端被关闭时写端(子进程)立马为僵尸状态
Z+
;而后读端阻塞
sleep(3)
结束后两个进程退出;打印结果中子进程(写端)被
13
号信号杀死,kill -l
命令查看信号集:
$ kill -l 1) SIGHUP 2) SIGINT 3) SIGQUIT 4) SIGILL 5) SIGTRAP 6) SIGABRT 7) SIGBUS 8) SIGFPE 9) SIGKILL 10) SIGUSR1 11) SIGSEGV 12) SIGUSR2 13) SIGPIPE 14) SIGALRM 15) SIGTERM 16) SIGSTKFLT 17) SIGCHLD 18) SIGCONT 19) SIGSTOP 20) SIGTSTP ...
13
号信号对应信号SIGPIPE
; -
-
-
匿名管道具有固定大小
利用
ulimit
命令带-a
选项查看操作系统的限制;$ ulimit -a core file size (blocks, -c) 0 data seg size (kbytes, -d) unlimited scheduling priority (-e) 0 file size (blocks, -f) unlimited pending signals (-i) 7269 max locked memory (kbytes, -l) 64 max memory size (kbytes, -m) unlimited open files (-n) 65535 pipe size (512 bytes, -p) 8 POSIX message queues (bytes, -q) 819200 real-time priority (-r) 0 stack size (kbytes, -s) 8192 cpu time (seconds, -t) unlimited max user processes (-u) 4096 virtual memory (kbytes, -v) unlimited file locks (-x) unlimited
其中
pipe size
大小为512bytes * 8
为4kb
;修改原代码中
Write()
为:void Write(int wfd) { int number = 0; while (true) { char c = 'c'; write(wfd, &c, 1); number++; cout << number << endl; } } //读端对应进行阻塞(该测试中用不到读端)
即每次向管道文件内核缓冲区中写入一个字节;
重新编译运行代码结果为:
... 65534 65535 65536 ^C #进行阻塞时 Ctrl+C 停止继续执行
最终结果为
65536
字节,约为64kb
与ulimit
中结果不符;- 原因是匿名管道的缓冲区大小是固定的,但这个大小由操作系统决定,不同的操作系统和配置可能会有不同的管道缓冲区大小;
-
管道是面向字节流的
参考上文 “管道的四种情况 > 管道为满时写端将进行阻塞” ,写端向读端进行数据传输时只按照字节进行写入;
最终的格式将取决于读端采用什么方式对写端的内容进行接收;
写端可以连续写入任意数量的字节,而读端可以根据需要读取任意数量的字节;
故写端不需要考虑读端采用何种方式进行接收,读端不需要考虑写端采用何种方式进行写入;
-
匿名管道是基于文件的
匿名管道不属于文件系统(不存在文件名,Inode与对应的数据块),但匿名管道是基于文件的,其必须通过系统调用接口与文件描述符的配合才能进行使用;
同时匿名管道的生命周期是由进程决定的(引用计数);
总结
-
管道是一种基于文件描述符的进程间通信(IPC)机制,允许单向数据传输;
-
有两种基本类型的管道
匿名管道用于有亲缘关系的进程间通信;
命名管道(FIFO) 允许没有亲缘关系的进程间通信;
-
管道存在固定大小的缓冲区,由操作系统决定,面向字节流,支持连续字节的读写操作;
-
匿名管道的生命周期与创建它的进程相关,不属于文件系统,但通过文件描述符进行操作;
-
进程间通信通过管道需要正确管理文件描述符,如关闭不需要的端以避免阻塞和
SIGPIPE
信号;