前言
作者:小蜗牛向前冲
名言:我可以接受失败,但我不能接受放弃
如果觉的博主的文章还不错的话,还请点赞,收藏,关注👀支持博主。如果发现有问题的地方欢迎❀大家在评论区指正
本期学习目标:理解什么是管道,学会使用匿名管道和命名管道进行通信
在学习管道之前,我们要明白在Llinux下,什么是通信
一、通信
我们都知道进程具有独立性,也就是说在进程A的执行的信息是不会被B知晓的,但是在以一下场景需要进程间信息:
- 数据传输:一个进程需要将它的数据发送给另一个进程
- 资源共享:多个进程之间共享同样的资源。
- 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止
- 时要通知父进程)。
- 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
那为什么要有通信呢?
从进程通信的目的来说,我们大体可以理解为通信的目标其实就是让一个进程的消息被另外一个进程所知晓。
比如多进程协同的:
cat file | grep "hello world"
//cat将file文件中的内容打印到屏幕上
// | 管道
// gerp匹配并输出满足指定模式条件的行
这个命令要完成需要二个进程共同协作,也就是说有一些信息双方都要知道,我们就通过管道(|) ,将信息共享了。
那我们又该如何理解通信的问题:
既然进程具有独立性,也就是说双方是不可能直接通信的,就必须有一个第三方的介入,这个第三方就要由操作系统来完成。
- OS需要直接或者间接给通信双方的进程提供 "大家都可以看到一块内存空间"。
- 要通信的进程,必须必须看到一份公共资源。
那么通信操作系统是怎么完成的呢?
所以在操作系统这一般有三种通信手段:
- System v进程间通信(聚焦在本地通信)
- POSIX进程间通信(让通信过程可以跨主机)
- 管道
下面我们将重点为大家讲述,管道是如何进行通信的。
二、管道
1、什么是管道
- 管道是Unix中最古老的进程间通信的形式。
- 我们把从一个进程连接到另一个进程的一个数据流称为一个“管道
下面我们通过一张图来理解一下管道
who
是 Linux 中的一个命令,用于显示当前登录系统的用户信息wc -l
是 Linux 中的一个命令,用于统计文件中的行数。wc
是 word count(词频统计)的缩写,-l
选项表示只统计行数。who | wc -l
是 Linux 中的一个命令,通过管道符|
结合who
和wc
两个命令,实现统计当前登录系统的用户数量。
对于管道我们又分为二类:
- 匿名管道pipe
- 命名管道
2、匿名管道pipe
匿名管道他的概念顾明思意:创建一无名的管道
原型:
int pipe(int fd[2]);
参数
fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端
返回值:成功返回0,失败返回错误代码
为了方便记忆,我们可以把0想象成嘴巴,所以是读端, 把1想象成笔,所以是写端。
注意:
- 匿名管道只能用于有亲缘关系的进程间通信,也就是说,只能用于父子进程或者兄弟进程之间。
- 匿名管道是由内核中的一块缓冲区实现的,该缓冲区分别由两个文件描述符指向,一个用于读取数据,一个用于写入数据。
- 因为是匿名的,所以无法通过文件系统访问它们
3、对匿名管道的理解
匿名管道是如何实现通信的呢?
首先父进程创建管道
其次父进创建子进程
最后父进程关闭fd[0](读端)子进程关闭fd[1](写端)
这样就完成管道的单向通信。
为验证管道探究管道单向通信读写的特点,完成以下代码:
由父进程创建管道,fork出子进程,在让子进程关闭读端,让子进程写入信息道管道中,父进程关闭写端,从管道中读取信息。
#include <iostream>
#include <cstdio>
#include <cstring>
#include <string>
#include <cassert>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
using namespace std;
int main()
{
//创建管道,打开读写端
int fds[2];
int n = pipe(fds);
assert(n == 0 );
//创建子进程
pid_t id = fork();
assert(id>=0);
if(id == 0)
{
//子进程写入
close(fds[0]);
const char *s = "hellow parent";
int cnt = 0;
while(true)
{
cnt++;
char buffer[1024];//子进程才可以看到的数组
//向buffer数组这写入数组
snprintf(buffer,sizeof buffer,"child->parent say: %s[%d][%d]",s,cnt,getpid());
write(fds[1],buffer,strlen(buffer));
sleep(1);//每隔1秒写入
//当父进程不读,子进程的管道空间是有限的会满
// cout<< cnt <<endl;
}
//子进程关闭写端
close(fds[1]);
cout<<"子进程关闭写端"<<endl;
exit(0);
}
//父进程读取
close(fds[1]);
while(true)
{
char buffer[1024];
ssize_t s = read(fds[0],buffer,sizeof(buffer)-1);
if(s > 0)
{
buffer[s] = 0;
cout<<"Get Message#"<<buffer<<" | my pid: "<<getpid()<<endl;
}
else if(s == 0)
{
//读到文件结尾
cout<<"read: "<<s<<endl;
break;
}
}
close(fds[0]);
cout<<"父进程关闭了读端"<<endl;
n = waitpid(id,nullptr,0);
assert(n == id);
return 0;
}
下面的探究都是具居于上面代码的简单修改
探究1、子进程写的快,父进程读的慢
我们发现管道会被写满,写端写满的时候,在写会阻塞,等对方进行读取
结论:管道的空间是有限的会被写满
探究2、子进程写的慢,父进程读的快
这里我们是让子进程每隔5秒才写入,而父进程在读取前打印A,读取后打印B
这里我们发现在子进程没有写入时,父进程会阻塞等待子进程写入。
探究3、子进程直接将写端关闭
这时候我们发现父进程会读到0,从而结束读取。
探究4、子进程写,但是父进程不读
cout <<"pid->"<< n << " : "<< (status & 0x7F) << endl;
status
是一个整型变量,它用于存储进程的退出状态信息。在 Linux 中,进程终止时,会向其父进程发送一个信号,该信号包含了进程退出的原因以及退出状态码。通过位运算status & 0x7F
,可以得到进程退出的原因,即低 7 位二进制数对应的十进制数,该数字通常被称为“终止信号”
这里打印的是进程的pid和 退出状态信息
那进程退出13号退出信息又代表什么意思呢?
我们kill-l一下
也就是说当读端关闭了,操作系统会 给进程发13) SIGPIPE的信息,从而终止写端。
根据以上场景的探究,下面我们归纳一下匿名管道的特点:
- 只能用于具有共同祖先的进程(具有亲缘关系的进程)之间进行通信;通常,一个管道由一个进程创建,然后该进程调用fork,此后父、子进程之间就可应用该管道。
- 管道提供流式服务
- 一般而言,进程退出,管道释放,所以管道的生命周期随进程
- 一般而言,内核会对管道操作进行同步与互斥
- 管道是半双工的,数据只能向一个方向流动;需要双方通信时,需要建立起两个管
三、基于匿名管道的进程池设计
为了更好的理解管道的应用,下面将带领大家设计一个基于管道的进程池设计。
实现现象:通过一个父进程,随意的控制一个子进程完成我们的相关操作。
1、main函数主体
在函数主体中我们,要完成程序整体框架的搭建。首先,建立子进程并建立和子进程的通信的信道,其次,父进程,控制子进程,向子进程发命令码,最后,回收进程。
int main()
{
MakeSeed();//生产随机数的种子
//1.建立子进程并建立和子进程的通信的信道
//1.1加载
std::vector<func_t> funcMap;//用于存放子进程执行函数的地址
loadTaskFunc(&funcMap);
//1.2 创建子进程,并且维护好父子进程的通道
std::vector<subEp> subs;
createSubProcess(&subs,funcMap);
//父进程,控制子进程,向子进程发命令码
int taskCnt = 3;//3表示子进程执行3次
loadBlanceContrl(subs,funcMap,taskCnt);
//回收进程
waitProcces(subs);
return 0;
}
2、各项函数功能的实现
2.1、建立子进程并建立和子进程的通信的信道
void createSubProcess(std::vector<subEp> *subs, std::vector<func_t> &funcMap)
在创建子进程的函数中,我们传了二个参数 ,*subs和&funcMap。
subs指针指向的是父进程管理子进程各项信息的类:
class subEp//父进程的一些控制信息
{
public:
subEp(pid_t subId,int writeFd )
: _subId(subId),_writeFd(writeFd)
{
char nameBuffer[1024];
snprintf(nameBuffer,sizeof(nameBuffer),"process-%d[pid(%d)-fd(&d)]", _num++, _subId, _writeFd);
_name = nameBuffer;
}
public:
static int _num;
std::string _name;
pid_t _subId;
int _writeFd;
};
int subEp::_num = 0;
funcMap是子进程要执行的任务:
std::vector<func_t> funcMap;//用于存放子进程执行函数的地址
loadTaskFunc(&funcMap);
其中的funcMap是一个数组,数组中存放的是指向父进程要子进程执行的任务函数指针。
//函数指针
typedef void (*func_t)();
void downLodeTask()
{
std::cout<<getpid()<<"下载任务\n"
<<std::endl;
sleep(1);
}
void ioTask()
{
std::cout<<getpid()<<"io任务\n"
<<std::endl;
sleep(1);
}
void flushTask()
{
std::cout<<getpid()<<"刷新任务\n"
<<std::endl;
sleep(1);
}
void loadTaskFunc(std::vector<func_t> *out)
{
assert(out);
out->push_back(downLodeTask);
out->push_back(ioTask);
out->push_back(flushTask);
}
子进程接受任务函数
int recvTask(int readFd)
{
int code = 0;
ssize_t s = read(readFd,&code,sizeof(code));
if(s == 4) return code;
else if(s <= 0) return -1;
else return 0;
}
下面是创建子进程代码的完整实现
void createSubProcess(std::vector<subEp> *subs, std::vector<func_t> &funcMap)
{
std::vector<int> deleteFd;
for(int i = 0; i < PROCESS_NIM;i++)
{
int fds[2];
int n = pipe(fds);
assert(n == 0);
//因为assert是Dug版本显示,reserve版本不显示,其中的n是pipe的返回值
//在reserve版本下,编译器可能认为n一个函数的返回值没有被使用,可能会有warn
(void)n;
pid_t id = fork();
if(id == 0)
{
for(int i = 0; i < deleteFd.size(); i++) close(deleteFd[i]);
//子进程处理任务
close(fds[1]);
while(true)
{
//获取命令码
int commandCode = recvTask(fds[0]);
//完成任务
if(commandCode >= 0 && commandCode <funcMap.size())
{
funcMap[commandCode]();
}
else if(commandCode ==-1)
{
break;
}
}
exit(0);
}
close(fds[0]);
subEp sub(id,fds[1]);
subs->push_back(sub);
deleteFd.push_back(fds[1]);
}
}
2.2、父进程控制子进程,向子进程发命令码
int taskCnt = 3;//3表示子进程执行3次
loadBlanceContrl(subs,funcMap,taskCnt);
其中父进程控制子进程函数的第三个参数是我们要子进程执行几个任务。
父进程发送给子进程任务的控制函数
void sendTask(const subEp &process,int taskNum)
{
std::cout<<"send task num: "<< taskNum << " send to -> "<< process._name <<std::endl;
int n = write(process._writeFd,&taskNum,sizeof(taskNum));
assert(n == sizeof(int));
(void)n;
}
父进程控制子进程的代码:
void loadBlanceContrl(const std::vector<subEp> &subs,const std::vector<func_t> &funcMap,int count)
{
int processnum = subs.size();
int tasknum = funcMap.size();
bool forever = (count == 0 ? true : false);
while(true)
{
//选择一个子进程 --》std::vector<subEp> ->index
int subIdx = rand() % processnum;
//选择一个任务
int taskIdx = rand() % tasknum;
//将任务发送给选项的进程
sendTask(subs[subIdx],taskIdx);
sleep(1);
//控制子进程的执行任务的次数
if(!forever)
{
count--;
if(count == 0) break;
}
}
for(int i = 0; i<processnum;i++) close(subs[i]._writeFd);
}
2.3、回收进程
waitProcces(subs);
void waitProcces(std::vector<subEp> processes)
{
int processnum = processes.size();
for(int i = 0; i < processnum; i++)
{
waitpid(processes[i]._subId,nullptr,0);
std::cout<<"wait sub process success ..." << processes[i]._subId << std::endl;
}
}
2.4、代码的完整实现
#include <iostream>
#include <string>
#include <vector>
#include <cstdlib>
#include <cassert>
#include <ctime>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#define PROCESS_NIM 5
#define MakeSeed() srand((unsigned long)time(nullptr)^getpid()^0x324124^rand()%1234)
//下面代码是子进程要完成的任务//
//函数指针
typedef void (*func_t)();
void downLodeTask()
{
std::cout<<getpid()<<"下载任务\n"
<<std::endl;
sleep(1);
}
void ioTask()
{
std::cout<<getpid()<<"io任务\n"
<<std::endl;
sleep(1);
}
void flushTask()
{
std::cout<<getpid()<<"刷新任务\n"
<<std::endl;
sleep(1);
}
void loadTaskFunc(std::vector<func_t> *out)
{
assert(out);
out->push_back(downLodeTask);
out->push_back(ioTask);
out->push_back(flushTask);
}
//下面代码是多进程的程序///
class subEp//父进程的一些控制信息
{
public:
subEp(pid_t subId,int writeFd )
: _subId(subId),_writeFd(writeFd)
{
char nameBuffer[1024];
snprintf(nameBuffer,sizeof(nameBuffer),"process-%d[pid(%d)-fd(&d)]", _num++, _subId, _writeFd);
_name = nameBuffer;
}
public:
static int _num;
std::string _name;
pid_t _subId;
int _writeFd;
};
int subEp::_num = 0;
int recvTask(int readFd)
{
int code = 0;
ssize_t s = read(readFd,&code,sizeof(code));
if(s == 4) return code;
else if(s <= 0) return -1;
else return 0;
}
void createSubProcess(std::vector<subEp> *subs, std::vector<func_t> &funcMap)
{
std::vector<int> deleteFd;
for(int i = 0; i < PROCESS_NIM;i++)
{
int fds[2];
int n = pipe(fds);
assert(n == 0);
//因为assert是Dug版本显示,reserve版本不显示,其中的n是pipe的返回值
//在reserve版本下,编译器可能认为n一个函数的返回值没有被使用,可能会有warn
(void)n;
pid_t id = fork();
if(id == 0)
{
for(int i = 0; i < deleteFd.size(); i++) close(deleteFd[i]);
//子进程处理任务
close(fds[1]);
while(true)
{
//获取命令码
int commandCode = recvTask(fds[0]);
//完成任务
if(commandCode >= 0 && commandCode <funcMap.size())
{
funcMap[commandCode]();
}
else if(commandCode ==-1)
{
break;
}
}
exit(0);
}
close(fds[0]);
subEp sub(id,fds[1]);
subs->push_back(sub);
deleteFd.push_back(fds[1]);
}
}
void sendTask(const subEp &process,int taskNum)
{
std::cout<<"send task num: "<< taskNum << " send to -> "<< process._name <<std::endl;
int n = write(process._writeFd,&taskNum,sizeof(taskNum));
assert(n == sizeof(int));
(void)n;
}
void loadBlanceContrl(const std::vector<subEp> &subs,const std::vector<func_t> &funcMap,int count)
{
int processnum = subs.size();
int tasknum = funcMap.size();
bool forever = (count == 0 ? true : false);
while(true)
{
//选择一个子进程 --》std::vector<subEp> ->index
int subIdx = rand() % processnum;
//选择一个任务
int taskIdx = rand() % tasknum;
//将任务发送给选项的进程
sendTask(subs[subIdx],taskIdx);
sleep(1);
//控制子进程的执行任务的次数
if(!forever)
{
count--;
if(count == 0) break;
}
}
for(int i = 0; i<processnum;i++) close(subs[i]._writeFd);
}
void waitProcces(std::vector<subEp> processes)
{
int processnum = processes.size();
for(int i = 0; i < processnum; i++)
{
waitpid(processes[i]._subId,nullptr,0);
std::cout<<"wait sub process success ..." << processes[i]._subId << std::endl;
}
}
int main()
{
MakeSeed();
//1.建立子进程并建立和子进程的通信的信道
//1.1加载
std::vector<func_t> funcMap;//用于存放子进程执行函数的地址
loadTaskFunc(&funcMap);
//1.2 创建子进程,并且维护好父子进程的通道
std::vector<subEp> subs;
createSubProcess(&subs,funcMap);
//父进程,控制子进程,向子进程发命令码
int taskCnt = 3;//3表示子进程执行3次
loadBlanceContrl(subs,funcMap,taskCnt);
//回收子进程
waitProcces(subs);
return 0;
}
3、实验现象
当我们用父进程控制三个子进程,让他们分别执行任务:
四、命名管道
什么我们用匿名管道进行了进程间的通信,但是匿名管道有限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。
如果我们想在不相关的进程之间交换数据,可以使用FIFO文件来做这项工作,那我们就要用到命名管道
1、命名管道的相关知识
首先我们要清楚命名管道是一种特殊类型的文件
命名管道可以从命令行上创建,命令行方法是使用下面这个命令:
mkfifo filename
命名管道也可以从程序里创建,相关函数有
nt mkfifo(const char *filename,mode_t mode)
命名管道的打开规则
如果当前打开操作是为读而打开FIFO时
O_NONBLOCK disable:阻塞直到有相应进程为写而打开该FIFO
O_NONBLOCK enable:立刻返回成功
如果当前打开操作是为写而打开FIFO时
O_NONBLOCK disable:阻塞直到有相应进程为读而打开该FIFO
O_NONBLOCK enable:立刻返回失败,错误码为ENXIO
匿名管道与命名管道的区别
- 匿名管道由pipe函数创建并打开。
- 命名管道由mkfifo函数创建,打开用open
- FIFO(命名管道)与pipe(匿名管道)之间唯一的区别在它们创建与打开的方式不同,一但这些工作完成之后,它们具有相同的语义
2、用命令管道实现二个进程通信
简述:这里我们写二个文件server和linent,让他在命令管道实现进程通信。
首先我们用makefile来管理我们的多文件
makefile
.PHONY:all
all:server client
server:server.cc
g++ -o $@ $^ -std=c++11 -g
client:client.cc
g++ -o $@ $^ -std=c++11 -g
.PHONY:clean
clean:
rm -f server client
其次我们要创建命名管道,这里我们定义为comm.hpp,里面包含了server和client二个进程所需要的头文件。当然我们不仅仅写了建立命名管道的函数还有移除管道的函数。
pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <cassert>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#define NAME_PIPE "/tmp/mypipe.me"
bool createFifo(const std::string &path)
{
umask(0);
int n = mkfifo(path.c_str(),0600);
if(n == 0)
return true;
else
{
std::cout<<"errno: " << errno << " err string: "<< strerror(errno) << std::endl;
return false;
}
}
void removeFifo(const std::string &path)
{
int n = unlink(path.c_str());
assert(n == 0);
(void)n;
}
最后就是我们server和client二个通信文件编写:
server.cc
在这个文件中,我们要创建命名管道,打开文件(命名管道),从里面读取我们需要的信息(clicent发送过来)
#include"comm.hpp"
int main()
{
bool r = createFifo(NAME_PIPE);
assert(r);
(void)r;
std::cout<< "server begin" << std::endl;
int rfd = open(NAME_PIPE,O_RDONLY);
std::cout<< "server end" << std::endl;
if(rfd<0) exit(1);
char buffer[1024];
while(true)
{
ssize_t s = read(rfd,buffer,sizeof(buffer)-1);
if(s > 0)
{
buffer[s] = 0;
std::cout << "client->server# " << buffer << std::endl;
}
else if(s == 0)
{
std::cout << "client quit, me too !" <<std::endl;
break;
}
else
{
std::cout << "err string " << strerror(errno) << std::endl;
break;
}
}
close(rfd);
sleep(10);
removeFifo(NAME_PIPE);
return 0;
}
clicent.cc
这个文件中我们只有打开刚刚server创建的文件,然后写入信息,server就可以接受到了
#include"comm.hpp"
int main()
{
std:: cout <<" clinent begin " << std::endl;
int wfd = open(NAME_PIPE,O_WRONLY);
std::cout << " client end " << std::endl;
if(wfd < 0) exit(1);
char buffer[1024];
while(true)
{
std::cout << "Please Say# ";
fgets(buffer,sizeof(buffer),stdin);
if(strlen(buffer) > 0) buffer[strlen(buffer)-1] = 0;
ssize_t n = write(wfd,buffer ,strlen(buffer));
assert( n == strlen(buffer));
(void)n;
}
close(wfd);
return 0;
}
实验现象:
这里我们让 clicen进程输入文字就可以在rerver文件中接收到