文章目录
- 一.进程间通信
- 进程为什么要通信?
- 进程如何通信
- 二.管道
- 匿名管道
- pipe
- 写端慢写入,读端等待
- 写端写入,读端不读 && 管道的大小
- 写端关闭,读端不会读取
- 写端写入,读端关闭
- 字节流
- 总结
- 安全问题
- 三.进程池
- 创建管道 && 分配任务的代码:
一.进程间通信
进程为什么要通信?
进程需要某种协同–>通过通信的方式进行协同。(学校教导处查看某个教室某时间有位置,跟老师协商这个时间是否可行)
进程 = 内核数据结构 + 代码和数据。
数据有类别:通知进程就绪(通知学生上课)、单纯的要传递数据(老师把期末分数交给教务系统)、控制相关的信息(拖堂太久,主任强制老师下课)
进程具有独立性。进程蹦了只是删自己的内核数据结构和代码与数据,跟其它进程没关。
进程如何通信
进程间通信,成本可能稍微高一些!
进程间通信的前提:让不同的进程,看到同一份(操作系统的)资源(“一段内存”)。
因为A进程开辟的空间B进程是看不到的(独立性),只有两个进程看到同一块内存才能通信。
所以得让第三方(操作系统)提供一块内存。
二.管道
匿名管道
打开两次文件,需要创建两次struct file。第二次创建struct file时,因为属性是一样的,所以不用再次加载文件属性和内核级文件缓冲区。操作系统不喜欢做浪费时间和浪费空间的事情。
创建子系统时,task_struct和struct files_struct要保证进程的独立性,所以要拷贝。
但文件系统没有必要保证独立性,所以直接用类似于浅拷贝的指针。
子进程继承父进程的描述符表,就会指向同一个文件,也就意味着都会写进同一个内核里。当操作系统刷新时,都会指向同一个文件。
这就是为什么父子进程打印数据时,都会向一个显示器终端打印。
问题:进程默认会打开三个标准输入输出:0,1,2。怎么做到的?
答:bash打开了,那么bash的子进程也就默认打开了。
问题:为什么子进程主动关闭close(0,1,2),不影响父进程继续使用显示器文件?
答:sturct file中有引用计数(ref_count;)。
类似这么一个代码:file ->ref_count--; if(ref_count==0) //释放文件资源
综上所述:父子进程看到了同一个文件,其中的内存级的缓冲区就相当于管道文件
管道只允许单向通信,因为简单。
要么父进程一直给子进程发消息,要么子进程一直给父进程发消息。
父子进程关闭不需要的文件描述符。父进程关闭r,子进程关闭w。
并且内核级缓冲区不再写入磁盘,只让父子进程进行内存级的通信。
创建管道的具体过程:
问题:父子既然要关闭不需要的fd,为什么曾经要打开呢?
答:如果父进程只创建读方式,子进程只能继承读方式。一个管道不能存在两个读或者两个写,通信不了。如果以读写的方式把文件打开,万一父进程在写,子进程也在写,就出问题了。
可以不关吗,父进程只用fd== 3 ,子进程只用fd==4?
可以不关闭,并不影响通信,但建议关了,万一误写了呢!
一个进程中存储的文件描述符是有上限的,所以一个进程能打开的文件是有限的。占着一个不用,就浪费了资源,可能造成文件描述符泄漏。
pipe
管道的创建,单独设计了pipe的系统调用,底层就是open。
返回值:若成功返回0,失败返回-1
创建的通道不再向磁盘中刷新数据
pipe若不需要文件路径和文件名,创建出来的叫做匿名管道。
问题:如果我想双向通信呢?
答:创建两个通道
问题:管道为什么要单向通信?
答:复用文件系统的代码,为了简单,减少开发成本。父子进程都往一个管道里读写,就要区分哪些是父进程的读写,哪些是子进程的读写。
文件描述符0,1,2是系统默认的,所以管道的文件描述符是3和4:
#include <iostream>
#include <cerrno>
#include <cstring>
#include <unistd.h>
int main()
{
// 1.创建管道
int pipefd[2];
int n = pipe(pipefd); //输出型参数,rfd,wfd
if(n != 0) //失败
{
std::cerr << "errno" << errno << ":" << "errstring" << strerror(errno) << std::endl;
return 1;
}
std::cout << "pipefd[0]: " << pipefd[0] << ", pipefd[1]: " << pipefd[1] << std::endl;
return 0;
}
其中pipefd[0]->0是读端;pipefd[1]->write是写端
写端慢写入,读端等待
父进程读取,子进程写入:(因为管道也是文件,所以读写管道的接口是read/write)
#include <iostream>
#include <string>
#include <cerrno>
#include <cstring>
#include <sys/types.h>
#include <unistd.h>
#include <sys/wait.h>
const int size = 1024; //读取数组的大小
// 计数 + pid
std::string getOtherMessage()
{
static int cnt = 0;
std::string messageid = std::to_string(cnt);
cnt++;
pid_t self_id = getpid();
std::string string_pid = std::to_string(self_id);
std::string message = "messageid: ";
message += messageid;
message += " my pid is : ";
message += string_pid;
return message;
}
// 子进程进行写入
void SubProcessWrite(int wfd)
{
std::string message = "father,I am your son process!";
while(true)
{
std::string info = message + getOtherMessage(); // 子进程发给父进程的消息
write(wfd,info.c_str(),info.size()); //写入管道的时候没有写入"\0",没有必要写入
sleep(1); //让子进程写慢一点
}
}
// 父进程进行读取
void FatherProcessWrite(int rfd)
{
char inbuffer[size];
while(true)
{
//ssize_t就是int
ssize_t n = read(rfd,inbuffer,sizeof(inbuffer)-1); //-1为了给"\0"留位置
if(n > 0)
{
inbuffer[n] = 0 ; // == "\0"
std::cout << "father get message: " << inbuffer << std::endl;
}
}
}
int main()
{
// 1.创建管道
int pipefd[2];
int n = pipe(pipefd); //输出型参数,rfd,wfd
if(n != 0) //失败
{
std::cerr << "errno" << errno << ":" << "errstring" << strerror(errno) << std::endl;
return 1;
}
std::cout << "pipefd[0]: " << pipefd[0] << ", pipefd[1]: " << pipefd[1] << std::endl;
sleep(1); // 创建完管道等待1秒
// 2.创建子进程
pid_t id = fork();
if(id == 0)
{
std::cout << "子进程关闭不需要的fd了,准备发通信" << std::endl;
sleep(1);
// 子进程 --> write
// 3.关闭不需要的文件描述符
close(pipefd[0]);
SubProcessWrite(pipefd[1]);
// 用完之后关闭
close(pipefd[1]);
exit(0);
}
std::cout << "父进程关闭不需要的fd了,准备收通信" << std::endl;
sleep(1);
// 父进程 --> read
// 3.关闭不需要的文件描述符
close(pipefd[1]);
FatherProcessWrite(pipefd[0]);
// 用完之后关闭
close(pipefd[0]);
//防止子进程僵尸
pid_t rid = waitpid(id,nullptr,0);
if(rid > 0)
{
std::cout << "wait child process done" << std::endl;
}
return 0;
}
现象:
根据上述代码和现象,阐述一些结论:
在子进程中有sleep(1),父进程没有sleep()。子进程有写的有多慢,父进程负责打印的就有多慢。
子进程写一条,父进程读一条,父进程等待子进程的写入。
写端写入,读端不读 && 管道的大小
如果让子进程疯狂写,父进程不读,会发生什么?并且管道的大小是多少?
// 子进程进行写入
void SubProcessWrite(int wfd)
{
int pipesize = 0; //计数
std::string message = "father,I am your son process!";
while(true)
{
char c = 'A'; //每次只写一个字节
write(wfd,&c,1);
std::cout << "pipesize: " << ++pipesize << std::endl;
}
}
// 父进程进行读取
void FatherProcessWrite(int rfd)
{
char inbuffer[size];
while(true)
{
sleep(500);
ssize_t n = read(rfd,inbuffer,sizeof(inbuffer)-1); //-1为了给"\0"留位置
if(n > 0)
{
inbuffer[n] = 0 ; // == "\0"
std::cout << "father get message: " << inbuffer << std::endl;
}
}
}
发现一共写了65536字节(64KB)就停了
ubuntu 20…04的版本管道大小是64KB,不同的操作系统管道的大小是不一样的,跟系统有关。
写端关闭,读端不会读取
当子进程写完退出并关闭管道,父进程还会读取吗?
// 子进程进行写入
void SubProcessWrite(int wfd)
{
int pipesize = 0;
std::string message = "father,I am your son process!";
while(true)
{
char c = 'A';
write(wfd,&c,1);
std::cout << "pipesize: " << ++pipesize << std::endl;
break;
}
std::cout << "child quit" << std::endl;
}
// 父进程进行读取
void FatherProcessWrite(int rfd)
{
char inbuffer[size];
while(true)
{
ssize_t n = read(rfd,inbuffer,sizeof(inbuffer)-1); //-1为了给"\0"留位置
if(n > 0)
{
inbuffer[n] = 0 ; // == "\0"
std::cout << "father get message: " << inbuffer << std::endl;
}
std::cout << "father get return val: " << n << std::endl;
}
}
发现read的返回值是0,表示写端已经关闭了,管道已经失效了。
写端写入,读端关闭
rfd直接关闭,写端wfd一直在进行写入,会发生什么?
验证:
#include <iostream>
#include <string>
#include <cerrno>
#include <cstring>
#include <sys/types.h>
#include <unistd.h>
#include <sys/wait.h>
const int size = 1024; //读取数组的大小
// 计数 + pid
std::string getOtherMessage()
{
static int cnt = 0;
std::string messageid = std::to_string(cnt);
cnt++;
pid_t self_id = getpid();
std::string string_pid = std::to_string(self_id);
std::string message = "messageid: ";
message += messageid;
message += " my pid is : ";
message += string_pid;
return message;
}
// 子进程进行写入
void SubProcessWrite(int wfd)
{
char c = 'A';
int pipesize = 0;
std::string message = "father,I am your son process!";
while(true)
{
std::string info = message + getOtherMessage(); // 子进程发给父进程的消息
write(wfd,info.c_str(),info.size()); //写入管道的时候没有写入"\0",没有必要写入
sleep(1); //让子进程写慢一点
}
std::cout << "child quit" << std::endl;
}
// 父进程进行读取
void FatherProcessWrite(int rfd)
{
char inbuffer[size];
while(true)
{
//ssize就是int
ssize_t n = read(rfd,inbuffer,sizeof(inbuffer)-1); //-1为了给"\0"留位置
if(n > 0)
{
inbuffer[n] = 0 ; // == "\0"
std::cout << "father get message: " << inbuffer << std::endl;
}
else if(n == 0) //写端关闭,读到了文件的结尾
{
std::cout << "client quit,father get return val: " << n << "father quit too!" << std::endl;
break;
}
else if(n < 0) //读取失败
{
std::cerr << "read error" << std::endl;
break;
}
sleep(1);
break;
}
}
int main()
{
// 1.创建管道
int pipefd[2];
int n = pipe(pipefd); //输出型参数,rfd,wfd
if(n != 0) //失败
{
std::cerr << "errno" << errno << ":" << "errstring" << strerror(errno) << std::endl;
return 1;
}
std::cout << "pipefd[0]: " << pipefd[0] << ", pipefd[1]: " << pipefd[1] << std::endl;
sleep(1); // 创建完管道等待1秒
// 2.创建子进程
pid_t id = fork();
if(id == 0)
{
std::cout << "子进程关闭不需要的fd了,准备发通信" << std::endl;
sleep(1);
// 子进程 --> write
// 3.关闭不需要的文件描述符
close(pipefd[0]);
SubProcessWrite(pipefd[1]);
// 用完之后关闭
close(pipefd[1]);
exit(0);
}
std::cout << "父进程关闭不需要的fd了,准备收通信" << std::endl;
sleep(1);
// 父进程 --> read
// 3.关闭不需要的文件描述符
close(pipefd[1]);
FatherProcessWrite(pipefd[0]);
std::cout << "5s,father close rfd" << std::endl;
sleep(5);
// 用完之后关闭
close(pipefd[0]);
int status = 0;
//得到子进程的退出信息
pid_t rid = waitpid(id,&status,0);
if(rid > 0)
{
std::cout << "wait child process done, exit sig: " << (status&0x7f) << std::endl;
std::cout << "wait child process done, exit code(ign): " << ((status>>8)&0x7f) << std::endl;
}
return 0;
}
13号信号
管道文件再通信的时候,是面向字节流的。
字节流
例子:子进程不断写入,父进程隔2秒在读取。并且子进程写入时往cerr里写入,父进程时往cout中打印
// 子进程进行写入
void SubProcessWrite(int wfd)
{
char c = 'A';
int pipesize = 0;
std::string message = "father,I am your son process!";
while(true)
{
std::cerr << "+++++++++++++++++++++++" << std::endl;
std::string info = message + getOtherMessage(); // 子进程发给父进程的消息
write(wfd,info.c_str(),info.size()); //写入管道的时候没有写入"\0",没有必要写入
std::cerr << info << std::endl;
}
std::cout << "child quit" << std::endl;
}
// 父进程进行读取
void FatherProcessWrite(int rfd)
{
char inbuffer[size];
while(true)
{
sleep(2);
std::cout << "--------------------------" << std::endl;
//ssize就是int
ssize_t n = read(rfd,inbuffer,sizeof(inbuffer)-1); //-1为了给"\0"留位置
if(n > 0)
{
inbuffer[n] = 0 ; // == "\0"
std::cout << "father get message: " << inbuffer << std::endl;
}
else if(n == 0) //写端关闭,读到了文件的结尾
{
std::cout << "client quit,father get return val: " << n << "father quit too!" << std::endl;
break;
}
else if(n < 0) //读取失败
{
std::cerr << "read error" << std::endl;
break;
}
}
}
运行时,让cerr与cout的打印到不同的显示屏上,看出效果
./testpiper 2 > /dev/pts/1
看出write的次数和读取的次数不是一一匹配的,这就是面向字节流最典型的特点。
非字节流的例子:用邮箱发文件,发4份文件,每份文件是独立的,点完一封看下一封。
总结
管道的4种状态:
1.如果管道内部是空的 && write的wfd(写端)没有关闭,读取条件不具备,读进程会被阻塞,等待读取的条件具备。
2.如果管道被写满 && read的rfd(读端)不读且没有关闭,写进程会被阻塞(管道被写满–>写条件不具备),等待写条件具备。
3.管道一直在读 && write的wfd(写端)关闭了,读端read返回值==0,表示读到了文件结尾。
4.read的rfd(读端)直接关闭 && write的wfd(写端)一直在进行写入。写端进程会被操作系统直接使用13号信号关掉,相当于进程出现了异常。
管道的5种特征:
1.匿名管道:只用来进行具有"血缘关系的"进程之间的通信,可以用于兄弟、爷孙进程,常用于父子进程。(无法让两个毫不相关的文件看到同一个文件)
2.管道内部,自带进程之间的同步机制。(明显的顺序性,一读一写)
3.文件的生命周期随进程。(在系统层面上发现没有与该文件关联的进程,就会把该文件释放掉)->管道也是文件,道理一样。
4.管道文件在通信的时候,是面向字节流。
5.管道的通信模式,是一种特殊的半双工模式。
全双工:跟别人发生口角,你问候他,他问候你,你在输入的时候还在输出。
半双工:现实说话,你一句我一句,但不要同时说。
安全问题
常见的"|" 命令就是匿名管道,有多少"|"就有多少管道,先创建管道后创建子进程。且子进程的父进程都是bash,说明子进程之间有“血缘关系”。
三.进程池
平常写的Shell都是读到命令了才创建子进程。
进程池是先提前创建子进程,然后等待父进程分发任务,通过管道相连。
创建管道 && 分配任务的代码:
子进程继承父进程的代码,所以提前把任务的代码写好,并且组成一张代码表,这样管道只用传4字节的数组下标即可,相当于任务码。
代码如下:
ProcessPool.cc
#include <iostream>
#include <string>
#include <vector>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/types.h>
#include "Task.hpp"
// master
class Channel
{
public:
Channel(int wfd, pid_t id, const std::string &name)
: _wfd(wfd), _subprocessid(id), _name(name)
{}
int GetWfd() { return _wfd; }
pid_t GetProcessId() { return _subprocessid; }
std::string GetName() { return _name; }
void CloseChannel()
{
close(_wfd);
}
void Wait()
{
pid_t rid = waitpid(_subprocessid, nullptr, 0);
if (rid > 0)
{
std::cout << "wait " << rid << " success" << std::endl;
}
}
~Channel()
{}
private:
int _wfd; // 读端fd
pid_t _subprocessid; // 子进程的pid
std::string _name; // 管道的名字
};
// C++形参命名规范
// const &:输入型参数
//& :输出输入型参数
//* :输出型参数
// 创建信号和子进程
void CreatChannelAndSub(int num, std::vector<Channel> *channels, task_t task) // task_t task->回调函数
{
for (int i = 0; i < num; i++)
{
// 创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
if (n < 0)
exit(1);
// 创建子进程
pid_t id = fork();
if (id == 0)
{
if(!channels->empty()) //第二次开始,清除管道
{
for(auto& channel : *channels)
{
channel.CloseChannel();
}
}
// child - read
close(pipefd[1]);
dup2(pipefd[0], 0); // 重定向到0,好处:读取的任务都去0读
task();
exit(0);
}
// 构建管道名字
std::string channels_name = "Channel-" + std::to_string(i);
// father - write
close(pipefd[0]);
channels->push_back(Channel(pipefd[1], id, channels_name));
}
}
// 轮询方案,防止任务不均衡
int NextChannel(int channelnum)
{
static int next = 0;
int channel = next;
next++;
next %= channelnum;
return channel;
}
// 发出信号
void SendTaskCommand(Channel &channel, int taskcommand)
{
write(channel.GetWfd(), &taskcommand, sizeof(taskcommand));
}
//运行一次任务
void CtrlProcessOnce(std::vector<Channel> &channels)
{
sleep(1);
// a.选择一个任务
int taskcommand = SelectTask();
// b.选择一个信道和进程
int channel_index = NextChannel(channels.size());
// 发送任务
SendTaskCommand(channels[channel_index], taskcommand);
std::cout << std::endl;
std::cout << "taskcommand: " << taskcommand << " channel: "
<< channels[channel_index].GetName() << " sub process: "
<< channels[channel_index].GetProcessId() << std::endl;
}
// 通过Channel控制子进程
void CtrlProcess(std::vector<Channel> &channels, int times = -1) // times为运行的次数
{
if (times > 0)
{
while (times--)
{
CtrlProcessOnce(channels);
}
}
else
{
while (true)
{
CtrlProcessOnce(channels);
}
}
}
// 回收管道和子进程
void CleanUpChannel(std::vector<Channel> &channels)
{
for (auto &channels : channels)
{
// a.关闭所有写端
channels.CloseChannel();
// b.回收子进程
channels.Wait();
}
}
// ./processPool 5 创建5个管道
int main(int argc, char *argv[])
{
if (argc != 2) // 输入错误
{
std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;
return 1;
}
int num = std::stoi(argv[1]); // char*->int
LoadTask(); // 加载任务
std::vector<Channel> channels;
// 1.创建信号和子进程
CreatChannelAndSub(num, &channels, work);
// 2.通过Channel控制子进程
CtrlProcess(channels, 10);
// 3.回收管道和子进程
CleanUpChannel(channels);
return 0;
}
.hpp是定义和声明在一起,缺点是不能打包成库,大多用于开源项目。
Task.hpp
#include <iostream>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>
#define TaskNum 3 //任务的数量
typedef void (*task_t)();
task_t tasks[TaskNum];
void Print()
{
std::cout << "Print task" << std::endl;
}
void Download()
{
std::cout << "Download task" << std::endl;
}
void Flush()
{
std::cout << "Flush task" << std::endl;
}
void LoadTask() //下载任务
{
srand(time(nullptr) ^ getpid()); //随机数
tasks[0]= Print;
tasks[1]= Download;
tasks[2]= Flush;
}
void ExcuteTask(int number) //执行任务
{
if(number < 0 || number > 2) return ;
tasks[number]();
}
int SelectTask()
{
return rand() % TaskNum;
}
void work()
{
while(true)
{
int command=0;
int n = read(0,&command,sizeof(command));
if(n==sizeof(int))
{
std::cout << "pid is:" << getpid() << "handler task" << std::endl;
ExcuteTask(command); //执行任务
}
else if(n == 0) //执行完毕
{
std::cout << "sub process: " << getpid() << " quit" << std::endl;
break;
}
}
}
效果:
执行任务
退出任务,并且回关闭通道与回收子进程
有个细节需要注意:
这是创建第一个管道:
问题就出现在第二次往后的管道创建,因为子进程继承父进程的文件描述符表,所以第二个子进程的fd==4是指向0号通道的写端,这样造成两个写端和一个读端,所以得把多余的写端删掉,否则释放管道的时候,导致管道中的计数器ref_count !=0 造成无法释放。子进程也无法退出的情况。