文章目录
- 管道介绍
- 什么是管道:
- 管道的原理
- 管道的特点
- 具体代码详写
- 创建初始文件
- makefile编写
- 定义任务列表-task.hpp
- 分阶段代码编写
- 总代码展示:
- ctrlProcess.cc 编写
- 头文件包含(如有不会,自己查谷歌)
- 定义全局变量以解耦
- main,函数框架
- EndPoint定义
- creatProcess 创建管道
- WaitCommand-子进程开始读取
- ctrlProcess 开始指点天下
- 写个小菜单,顺便接收command
- waitProcess - 最后一步
- 最最最后一步,改一个小bug(creatProcess中子进程继承父进程文件描述符问题)
- 总代码展示
- 总结
管道介绍
什么是管道:
- 管道是unix中最古老的进程间通信的形式
- 我们把从一个进程连接到另一个进程的数据流的叫做管道
管道也是一种文件
管道的原理
前提知识: 创建子进程的时候,fork子进程,只会复制进程相关的数据结构对象,不会复制父进程曾经打开的文件对象!
这就是为什么fork之后,父子进程cout,printf都会向同一个显示器终端打印数据的原因
我们在代码中需要做的,其实就是让父子进程看到同一份文件,对于父进程而言,只写该文件,对于子进程而言,只读该文件,即可进行进程间的通信
管道的特点
- 单向通信–>半双工的一种特殊情况(类似于对讲机)–>吵架是"全双工"
- 管道的本质是文件,因为fd的声明周期是随进程的,因此管道的生命周期也是随进程的
- 管道通信,通常用来进行具有"血缘"关系的进程,进行进程间的通信,常用于父子通信–pipe打开管道,并不清楚管道的名字,我们把这个管道称为匿名管道
- 在管道通信中,写入的次数和读取的次数不是严格匹配的–>读写的次数多少没有强相关–>字节流
所以管道叫"面向字节流" - 具有一定的协同能力,让reader和writer能够按照一定的步骤进行通信–自带同步机制
四种场景:
- 如果我们read读取完毕了所有的管道数据,如果对方不发,我们就只能等待
- 如果我们write端将管道写满了,我们还能写吗?不能
- 如果我关闭了写端,读取完毕管道数据,再读,就会read返回0,表明读到了文件结尾
- 写端一直写,读端关闭,会发生什么呢?此时再写也没有意义了,OS不会维护无意义低效率的,或者浪费资源的事情.OS会杀死一直在写入的进程,OS通过信号来终止进程,13)SIGPIPE
具体代码详写
创建初始文件
mkdir unnamedpipe //创建一个新的文件夹
cd unnamedpipe //进入该文件夹
touch makefile //编写makefile
touch mypipe.cc //主要程序
touch task.hpp //任务列表文件
以上文件具体内容及各自的作用将在下文中详细叙述.
makefile编写
我们是在linux上进行编程,而不是VS之类的集成开发环境,因此,写一个makefile方便我们调试代码
mypipe:mypipe.cc
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -f mypipe
定义任务列表-task.hpp
分阶段代码编写
对于这个操作,其实我们在这个阶段已经做了很多次了,无非就是将不同的任务做成不同的函数,并且封装成函数指针数组,并在细节处不断解耦,已达到低耦合,高内聚的特点.
首先,要使用函数指针数组,我们得先重命名(也叫声明)函数指针类型:
typedef void(*fun_t)();
//参数列表为空,返回值为void函数类型,重命名为fun_t
然后我们就可以肆无忌惮地写一些函数(没有具体实现,仅仅是为了测试用):
我们假设有以下三个任务可供用户选择:
#include<iostream>
using namespace std;
//getpid的头文件:
#include<unistd.h>
void PrintLog()
{
cout << getpid() << ":PrintLog..." << endl;
}
void InsertMySQL()
{
cout << getpid() << " InsertMySQL..." << endl;
}
void NetRequest()
{
cout << getpid() << " NetRequest..." << endl;
}
同时,为了更加解耦,更加方便我们用户操作,我们将上述操作宏定义为数字:
并且,我们约定每一个command都应该是四字节发送,接收端四字节接收.
//约定,每一个command都必须是4字节
#define COMMAND_LOG 0
#define INSERTMYSQL 1
#define NETREQUEST 2
我们在创建好三个函数之后,需要"组织"和"管理"好这些数据:
class Task
{
public:
Task(){
funcs.push_back(PrintLog);
funcs.push_back(InsertMySQL);
funcs.push_back(NetRequest);
}
void Excute(int command)//成员函数,执行某个命令
{
if(command >= 0 && command < funcs.size())
{
funcs[command]();
}
}
public:
vector<fun_t> funcs;//定义一个函数指针数组
};
至此,我们的task.hpp
便创建好了,专门用来储存任务的数据结构.
总代码展示:
#pragma once
#include<iostream>
#include<functional>
#include<vector>
#include<unistd.h>
#define COMMAND_LOG 0
#define INSERTMYSQL 1
#define NETREQUEST 2
using namespace std;
typedef void(*fun_t)();//函数指针
void PrintLog()
{
cout << getpid() << ":PrintLog..." << endl;
}
void InsertMySQL()
{
cout << getpid() << " InsertMySQL..." << endl;
}
void NetRequest()
{
cout << getpid() << " NetRequest..." << endl;
}
class Task
{
public:
Task(){
funcs.push_back(PrintLog);
funcs.push_back(InsertMySQL);
funcs.push_back(NetRequest);
}
void Excute(int command)
{
if(command >= 0 && command < funcs.size())
{
funcs[command]();
}
}
public:
vector<fun_t> funcs;
};
ctrlProcess.cc 编写
头文件包含(如有不会,自己查谷歌)
这个文件我们主要用于编写主要控制程序
我们先把所有需要的头文件包含了:
#include <iostream>
#include <string>
#include <vector>
#include <cassert>
#include <unistd.h>
//记得包含这个任务列表头文件
#include "Task.hpp"
using namespace std;
定义全局变量以解耦
为了方便之后的修改,我们设置一个全局变量来保存管道个数,以及将一个Task对象实例化
const int gnum = 0;
Task t;
main,函数框架
整个程序,主要分为三个步骤:
- 初始化:构建控制结构并且创建管道
- 开启控制进程
- 回收子进程(退出整个系统)
int main()
{
//1.先进行构建控制结构,父进程写入,子进程读取
vector<EndPoint> end_points;//先描述,再组织,需要我们单独再定义EndPoint的结构
creatProcess(&end_points);//为end_points向量创建管道
//此时我们得到了一堆管道和多个正在等待的读取命令的子进程
//2.开启控制进程函数
ctrlProcess(end_points);
//3.此时整个系统都退出了
waitProcess(end_points);
return 0;
}
EndPoint定义
class EndPoint
{
//父进程要对他所创建的管道进行管理->先描述再组织
private:
static int number;//在类中声明,类外定义
public:
pid_t _child_id;//子进程id
int _write_fd;//写端文件描述符
std::string processname;//该进程的名字
public:
EndPoint(int id,int fd):_child_id(id),_write_fd(fd)
{
//process-0[pid:fd]
char namebuff[64];
snprintf(namebuff,sizeof(namebuff),"process-%d[%d:%d]",number++,_child_id,_write_fd);
//每写一个记得将number++
processname = namebuff;
}
string name() const
{
return processname;//以这种方式获得该进程的名字更加好
}
~EndPoint()
{}
};
int EndPoint::number = 0;//静态成员变量必须在类外定义,在类里面声明
creatProcess 创建管道
void creatProcess(vector<EndPoint> *end_points)
{
for(int i = 0; i < gnum; i++)//需要创建gnum个管道
{
int pipefd[2] = {0};//pipe函数返回值是一个数组,含有两个元素,其中fd[0]是读端,fd[1]是写端
int n = pipe(pipefd);//这里接收返回值仅仅是为了检查合法性
assert(n == 0);
(void)n;//为了使用一下n,否则编译器会以为这是定义了但是没有使用的冗余变量
pid_t id = fork();//创建子进程
assert(id != -1);
//子进程
if(id == 0)
{
close(pipefd[1]);//关闭写端,只留下读端
//我们期望,所有的子进程读取指令的时候,都从标准输入读取->输入重定向
dup2(pipefd[0],0);//将屏幕输入重定向到pipefd[0](即读端)
//->子进程开始等待获取命令
WaitCommand();
//->命令完成以后关闭读端
close(pipefd[0]);
exit(0);
}
//父进程
close(pipefd[0]);
//将新的子进程和他的管道写端,构建对象,插入到管理列表中
(*end_points).push_back(EndPoint(id,pipefd[1]));
}
}
WaitCommand-子进程开始读取
void WaitCommand()
{
while(true)
{
int command = 0;
int n = read(0,&command,sizeof(int));//以四字节为单位读取,read返回的是读到的数据字节数大小
if(n == sizeof(int))//如果得到的是一个合法的命令-即四字节数
{
t.Excute(command);//->调用命令
}
else if(n == 0)
{
//此时已经读取完毕,即父进程不再发送,管道可以关闭了
cout << "父进程让我退出: " << getpid() << endl;
break;
}
else break;
}
}
到此为止,我们现在已经得到了gnum个管道以及对应数量的子进程,正在嗷嗷待哺.
我们也通过pushback将他们的数据结构管理进了end_points,到时候我们只需要用下标访问end_points就能通过不同的管道实现父子进程的通信了
ctrlProcess 开始指点天下
我们再回顾一下,我们已经走了哪些路了:
我们现在开始真正控制进程进行管道通信了.
void ctrlProcess(const vector<EndPoint> & end_points)
{
int num = 0;
int cnt = 0;
while(true)
{
cout << endl;
int command = ShowBoard();//创建一个菜单
if(command == 3) break;//令3命令为退出子进程
if(command < 0 || command > 2) continue;//合法性检查
int index = cnt++;//循环进程进行运行(从0-2进程)
cnt %= end_points.size();
cout << "选择了进程" << end_points[index].name() << "处理任务" << endl;
write(end_points[index]._write_fd,&command,sizeof(command));
}
}
写个小菜单,顺便接收command
int ShowBoard()
{
cout << "###############################" << endl;
cout << "# 0.执行日志任务 #" << endl;
cout << "# 1.执行数据库任务 #" << endl;
cout << "# 2.执行请求任务 #" << endl;
cout << "# 3.退出 #" << endl;
cout << "请选择# ";
int command = 0;
cin >> command;
return command;
}
waitProcess - 最后一步
void waitProcess(const vector<EndPoint> & end_points)
{
//1.我们需要让子进程全部退出 -- 只需要让父进程关闭所有的write_fd就可以了
for(const auto &ep : end_points)
{
close(ep._write_fd);
}
cout << "父进程让所有子进程全部退出" << endl;
sleep(10);
//2.父进程要回收子进程的僵尸状态
for(const auto &ep : end_points)
{
waitpid(ep._child_id,nullptr,0);
}
cout << "父进程让所有子进程全部退出" << endl;
sleep(10);
}
这里我们是先关闭所有的读端之后再统一回收进程,就完美避免了我们下一步即将修改的小bug.
如果我们是用下面这种写法:
for(int end = 0; end < end_points.size(); end++)
{
std::cout << "父进程让子进程退出:" << end_points[end]._child_id << std::endl;
close(end_points[end]._write_fd);
waitpid(end_points[end]._child_id, nullptr, 0);
std::cout << "父进程回收了子进程:" << end_points[end]._child_id << std::endl;
}
sleep(10);
这样的话我们会只能关闭第一个子进程,而不能关闭后面打开的几个子进程.
最最最后一步,改一个小bug(creatProcess中子进程继承父进程文件描述符问题)
这里有个极不容易发现的问题:
当我们创建第一个进程的时候,没有毛病
但是当我们创建后面的进程的时候,子进程会继承父进程的写端文件!
举个例子
子进程1:打开了fd[0] -> 此时父进程有子进程1的写端
子进程2:打开了fd[0] -> 此时还继承了子进程1的写端!
…
从以往后,每次创建子进程都会继承前面那个进程的写端.在我们关闭的时候,如果只把父进程对应的写端关闭了,还会有子进程没有关闭这个写端,仍然无法关闭我们想要关的那个子进程.
所以,我们在每次创建子进程的时候要记得把相应的写端关闭了!
具体写法如下:
void createProcesses(vector<EndPoint> *end_points)
{
vector<int> fds;
for (int i = 0; i < gnum; i++)
{
// 1.1 创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
assert(n == 0);
(void)n;
// 1.2 创建进程
pid_t id = fork();
assert(id != -1);
// 一定是子进程
if (id == 0)
{
for(auto &fd : fds) close(fd);
close(pipefd[1]);
dup2(pipefd[0], 0);
WaitCommand();
close(pipefd[0]);
exit(0);
}
// 一定是父进程
close(pipefd[0]);
end_points->push_back(EndPoint(id, pipefd[1]));
fds.push_back(pipefd[1]);
}
}
总代码展示
#include<iostream>
#include<string>
#include<unistd.h>
#include<cassert>
#include<vector>
#include "task.hpp"
#include<stdlib.h>
#include<sys/wait.h>
#include<sys/types.h>
using namespace std;
const int gnum = 3;
class EndPoint
{
//父进程要对他所创建的管道进行管理->先描述再组织
private:
static int number;
public:
pid_t _child_id;
int _write_fd;
std::string processname;
public:
EndPoint(int id,int fd):_child_id(id),_write_fd(fd)
{
//process-0[pid:fd]
char namebuff[64];
snprintf(namebuff,sizeof(namebuff),"process-%d[%d:%d]",number++,_child_id,_write_fd);
processname = namebuff;
}
string name() const
{
return processname;
}
~EndPoint()
{}
};
int EndPoint::number = 0;
Task t;
//子进程执行的方法:
void WaitCommand()
{
while(true)
{
int command = 0;
int n = read(0,&command,sizeof(int));
if(n == sizeof(int))
{
t.Excute(command);
}
else if(n == 0) {
cout << "父进程让我退出:" << getpid() << endl;
break;
}
else break;
}
}
void createProcesses(vector<EndPoint> *end_points)
{
vector<int> fds;
for (int i = 0; i < gnum; i++)
{
// 1.1 创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
assert(n == 0);
(void)n;
// 1.2 创建进程
pid_t id = fork();
assert(id != -1);
// 一定是子进程
if (id == 0)
{
for(auto &fd : fds) close(fd);
// 1.3 关闭不要的fd
close(pipefd[1]);
// 我们期望,所有的子进程读取"指令"的时候,都从标准输入读取
// 1.3.1 输入重定向,可以不做
dup2(pipefd[0], 0);
// 1.3.2 子进程开始等待获取命令
WaitCommand();
close(pipefd[0]);
exit(0);
}
// 一定是父进程
// 1.3 关闭不要的fd
close(pipefd[0]);
// 1.4 将新的子进程和他的管道写端,构建对象
end_points->push_back(EndPoint(id, pipefd[1]));
fds.push_back(pipefd[1]);
}
}
int ShowBoard()
{
cout << "###############################" << endl;
cout << "# 0.执行日志任务 #" << endl;
cout << "# 1.执行数据库任务 #" << endl;
cout << "# 2.执行请求任务 #" << endl;
cout << "# 3.退出 #" << endl;
cout << "请选择# ";
int command = 0;
cin >> command;
return command;
}
void waitProcess(const vector<EndPoint> & end_points)
{
//1.我们需要让子进程全部退出 -- 只需要让父进程关闭所有的write_fd就可以了
for(const auto &ep : end_points)
{
close(ep._write_fd);
}
cout << "父进程让所有子进程全部退出" << endl;
sleep(10);
//2.父进程要回收子进程的僵尸状态
for(const auto &ep : end_points)
{
waitpid(ep._child_id,nullptr,0);
}
cout << "父进程让所有子进程全部退出" << endl;
sleep(10);
}
void ctrlProcess(const vector<EndPoint> & end_points)
{
//我们可以写成自动化的,也可以搞成交互式的
int num = 0;
int cnt = 0;
// while(true)
// {
// srand((unsigned)time(NULL));
// //1.选择任务
// int command = INSERTMYSQL;
// //2.选择进程
// int index = rand() % end_points.size();
// //3.下发任务
// write(end_points[index]._write_fd,&command,sizeof(command));
// sleep(1);
// }
while(true)
{
cout << endl;
int command = ShowBoard();
if(command < 0 || command > 2) continue;
if(command == 3) break;
int index = cnt++;
cnt %= end_points.size();
cout << "选择了进程" << end_points[index].name() << "处理任务" << endl;
write(end_points[index]._write_fd,&command,sizeof(command));
}
}
int main()
{
//1.构建控制结构,父进程写,子进程读
vector<EndPoint> end_points;
//2.此时我们得到了一批end_points结构
creatProcess(&end_points);
ctrlProcess(end_points);
//3.此时整个系统都退出了
waitProcess(end_points);
return 0;
}
总结
以上,我们写的都是匿名管道,并且只能在两个有亲缘关系的进程中进行通信.
最后再复习一下管道的五种特点及四种场景:
- 单向通信–>半双工的一种特殊情况(类似于对讲机)–>吵架是"全双工"
- 管道的本质是文件,因为fd的声明周期是随进程的,因此管道的生命周期也是随进程的
- 管道通信,通常用来进行具有"血缘"关系的进程,进行进程间的通信,常用于父子通信–pipe打开管道,并不清楚管道的名字,我们把这个管道称为匿名管道
- 在管道通信中,写入的次数和读取的次数不是严格匹配的–>读写的次数多少没有强相关–>字节流
所以管道叫"面向字节流" - 具有一定的协同能力,让reader和writer能够按照一定的步骤进行通信–自带同步机制
四种场景:
- 如果我们read读取完毕了所有的管道数据,如果对方不发,我们就只能等待
- 如果我们write端将管道写满了,我们还能写吗?不能
- 如果我关闭了写端,读取完毕管道数据,再读,就会read返回0,表明读到了文件结尾
- 写端一直写,读端关闭,会发生什么呢?此时再写也没有意义了,OS不会维护无意义低效率的,或者浪费资源的事情.OS会杀死一直在写入的进程,OS通过信号来终止进程,13)SIGPIPE