目录
一、什么是进程间通信
管道
管道的原理
匿名管道
1.简单写一个管道
2.总结管道的特点,理解以前的管道
3.扩展
如何写一个进程池?
创建Makefile文件
创建我们的任务头文件Task.cpp
创建我们的主程序文件
一、什么是进程间通信
进程的运行具有独立性,有独立的页表,pcb,等等父子进程之间,数据不相干扰
这就让我们进程想要通信的难度比较大。
因为操作系统在设计的时候,它本身就是独立的。
进程间通信的本质:
先让不同的进程看到同一份资源(内存空间)
为什么要进行进程间通信?
数据传输:一个进程需要将它的数据发送给另一个进程
资源共享:多个进程之间共享同样的资源。
通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。(交换数据、空值、通知等目标)
进程间通信的必要性
单进程的,那么也就无法使用并发能力,更加无法实现多进程协同
进程间通信不是目的,而是手段,为了实现多进程协同。
进程通信的技术背景
1.进程是具有独立性的。虚拟地址空间+页表 保证进程运行的独立性(进程内核数据接口+进程的代码和数据)
2.通信的成本会比较高
进程间通信的本质理解
1.进程间通信的前提,首先需要让不同的进程看到同一块“内存”(特定的结构组织)
2.所以所谓的进程看到同一块“内存”,属于哪一个进程呢?不能隶属于任何一个进程,而应该更强调共享。
进程间通信分类:
进程间通信的方式也有一些标准
1.Linux原生提供--管道
匿名管道pipe
命名管道
2.System V IPC(侧重于本地通信(单机通信))--多进程
System V 消息队列
System V 共享内存
System V 信号量
3.POSIX IPC(侧重于网络通信)--多线程
消息队列
共享内存
信号量
互斥量
条件变量
读写锁
标准在我们使用者看来,都是接口上具有一定的规律
管道
什么是管道?
(天然气管道、石油管道、自来水管道……)
(这里我们简化一下,有一个入口和一个出口的管道)
1.只能单向通信。
(一般都是天然气公司把天然气输送到你家,不是你家把天然气输送到天然气公司)
2.管道内传输的都是资源
计算机通信领域的设计者,设计了一种单向通信的方式--管道
计算机中的管道:
传输资源->数据!
管道的原理
管道通信背后是进程之间通过管道进行通信
下面的|就是将第一个命令的执行结果作为参数传给后面那个命令
因为中间的数据资源不属于任何一个进程。
上面是我们的再基础IO中所说过的文件系统。
那么当前进程如果创建了子进程,会发生什么呢?
struct file*fd_array[]文件描述符表要不要拷贝给子进程呢?
首先这个表表示这个进程和文件的描述符表之间的关系,可就是当前的进程可以看到哪些被打开的文件。这个必须拷贝给子进程!
拷贝只是第一次拷贝,之后父子进程持有的就是独立的表结构。
那么这个表指向的一堆文件要不要拷贝给子进程呢?
不 ,我们不需要。与进程相关的都会被拷贝,与文件相关的并不会被拷贝!
父子进程的struct file_struct是一样的,所以里面的文件指针都是一样的,所以我们打开的文件也是一样的。
(比防说我们父子进程在打印到屏幕上的时候,都是打印到1号文件中,都是打印到同一个显示器上!)
所以我们这里的struct file是能被父进程访问,也能被子进程访问!
所以我们的父子进程是不是看到了一份公共文件,这个也就是我们的管道!
(管道的临时文件不需要刷新到磁盘)
双方进程各自关闭不需要的文件描述符
让父进程进行写入,子进程进行读取,父进程写,就需要关闭读,保留写,子进程关闭写,保留读的功能。每个进程内部都有各自的文件描述符。
也就是让不同的进程看到同一份资源。
Linux下,一切皆文件,管道就是我们上面的共享文件。
这里的文件是属于内核的,所有的进程间通信都是内核级别的。
那两进程进行通信,需不需要将这个管道文件保存到磁盘?
不需要!
进程间通信的管道必须在内存中,是一个纯内存的通信方式,如果还要写磁盘的话,我们的通信效率就太低了。
并且进程间通信的数据往往属于临时数据,不需要将数据持久化保存
匿名管道
#include <unistd.h>
功能:创建一无名管道
原型
int pipe(int fd[2]);
参数
fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端
返回值:成功返回0,失败返回错误代码
像下面的|就是一个简单的管道
ps axj |head -1 && ps axj|grep sleep
管道使用完毕之后,记得要关闭文件描述符。
1.简单写一个管道
如何做到让不同的进程,看到同一份资源的呢?
fork让子进程继承的--能够让具有血缘关系的进程进行进程间通信--常用于父子进程。
pipefd[0]:读端
pipefd[1]:写端
int pipe(int pipefd[2]);输出型参数,希望通过它来验证我们的管道成功搭建
#include<iostream>
#include<unistd.h>
#include<assert.h>
using namespace std;
int main()
{
//1.创建管道
int pipefd[2]={0};
int n=pipe(pipefd);
//在debug模式下assert是有效的,但是release版本下是会无效的
assert(n!=-1);
//所以我们这里需要写下面的代码,证明n被使用过
(void)n;
cout<<"pipefd[0]"<<pipefd[0]<<endl;
cout<<"pipefd[1]"<<pipefd[1]<<endl;
return 0;
}
ok,我们这里的文件描述符已经成功打开了,接下来我们在进一步搭建我们的管道
#include<iostream>
#include<unistd.h>
#include<assert.h>
#include<string>
#include<cstdio>
#include<cstring>
#include<sys/types.h>
#include<sys/wait.h>
using namespace std;
int main()
{
//1.创建管道
int pipefd[2]={0};//pipefd[0]:读端,pipefd[1]:写端
int n=pipe(pipefd);
//在debug模式下assert是有效的,但是release版本下是会无效的
assert(n!=-1);
//所以我们这里需要写下面的代码,证明n被使用过
(void)n;
//如果是DEBUG模式下就不打印了,相当于就是注释掉了
#ifdef DEBUG
cout<<"pipefd[0]"<<pipefd[0]<<endl;
cout<<"pipefd[1]"<<pipefd[1]<<endl;
#endif
//2.创建子进程
pid_t id=fork();
assert(id!=-1);
if(id==0)
{
//子进程
//3.构建单向通信的信道
//父进程写入,子进程读取,让父子关闭不需要的文件描述符
//3.1关闭子进程不需要的fd
//子进程进行读取,将写端给关闭
close(pipefd[1]);
char buffer[1024];
while (true)
{
//从0号文件描述符中读取,读取到缓冲区buffer中
size_t s=read(pipefd[0],buffer,sizeof(buffer)-1);
if(s>0)
{
//添加\0
buffer[s]=0;
cout<<"child get a message["<<getpid()<<"] Father#"<<buffer<<endl;
}
}
// close(pipefd[0]);
exit(0);
}
//父进程
//3.构建单向通信的信道
//父进程进行写入,将读取端进行关闭
close(pipefd[0]);
string message="我是父进程,我正在给你发消息";
int count=0;
char send_buffer[1024];
while(true)
{
//3.2构建一个变化的字符串
//将printf的内容格式化到字符串中
snprintf(send_buffer,sizeof(send_buffer),"%s[%d]:%d",message.c_str(),getpid(),count++);
//3.3写入
//这里strlen不需要+1,\0写入也没有意义。
write(pipefd[1],send_buffer,strlen(send_buffer));
//3.4故意sleep
sleep(1);
}
pid_t ret=waitpid(id,nullptr,0);
assert(ret<0);
(void)ret;
close(pipefd[1]);
//子进程中的pipefd[0]关闭可写可不写,因为进程退出了,进程中的文件描述符也会被关掉
return 0;
}
为什么我们不写一个全局的缓冲区(buffer)来进行通信呢?
因为有写时拷贝的存在,无法更改通信。
2.总结管道的特点,理解以前的管道
1.管道是一种进程间通信的方式,管道是用来进行具有血缘关系的进程进行进程间通信,常用于父子进程。
2.我们上面的代码中,我们的父进程是1秒钟发送一条消息,但是我们的子进程并没有设置读取信息的时间间隔,但是我们的子进程依旧是跟随父进程的节奏在打印。
那么我们父进程在sleep的期间,子进程在干什么呢?
子进程在等待父进程的写入。
管道是一个文件,显示器也是一个文件,父子同时往显示器写入的时候,有没有说一个会等另一个的情况呢?
没有!
之前我们往显示器上打印的时候,都是交错着疯狂往显示器上打印的。
这种情况,我们将其称为缺乏访问控制!
那我们上面的子进程等待父进程的写入,就是具有访问控制。
也就是说,管道具有通过让进程间协同,提供了访问控制!
管道里满的时候,写的一方要等待读的一方将数据读取
管道空的时候,读取的一方要等待写的一方写入。
下面我们验证一下,下面我们是将父进程循环写入,但是子进程需要等待20秒才写入
#include<iostream>
#include<unistd.h>
#include<assert.h>
#include<string>
#include<cstdio>
#include<cstring>
#include<sys/types.h>
#include<sys/wait.h>
using namespace std;
int main()
{
//1.创建管道
int pipefd[2]={0};//pipefd[0]:读端,pipefd[1]:写端
int n=pipe(pipefd);
//在debug模式下assert是有效的,但是release版本下是会无效的
assert(n!=-1);
//所以我们这里需要写下面的代码,证明n被使用过
(void)n;
//如果是DEBUG模式下就不打印了,相当于就是注释掉了
#ifdef DEBUG
cout<<"pipefd[0]"<<pipefd[0]<<endl;
cout<<"pipefd[1]"<<pipefd[1]<<endl;
#endif
//2.创建子进程
pid_t id=fork();
assert(id!=-1);
if(id==0)
{
//子进程
//3.构建单向通信的信道
//父进程写入,子进程读取,让父子关闭不需要的文件描述符
//3.1关闭子进程不需要的fd
//子进程进行读取,将写端给关闭
close(pipefd[1]);
char buffer[1024*8];
while (true)
{
//从0号文件描述符中读取,读取到缓冲区buffer中
size_t s=read(pipefd[0],buffer,sizeof(buffer)-1);
if(s>0)
{
sleep(20);
//添加\0
buffer[s]=0;
cout<<"child get a message["<<getpid()<<"] Father#"<<buffer<<endl;
}
}
// close(pipefd[0]);
exit(0);
}
//父进程
//3.构建单向通信的信道
//父进程进行写入,将读取端进行关闭
close(pipefd[0]);
string message="我是父进程,我正在给你发消息";
int count=0;
char send_buffer[1024*8];
while(true)
{
//3.2构建一个变化的字符串
//将printf的内容格式化到字符串中
snprintf(send_buffer,sizeof(send_buffer),"%s[%d]:%d",message.c_str(),getpid(),count++);
//3.3写入
//这里strlen不需要+1,\0写入也没有意义。
write(pipefd[1],send_buffer,strlen(send_buffer));
//3.4故意sleep
cout<<count<<endl;
// sleep(10);
}
pid_t ret=waitpid(id,nullptr,0);
assert(ret<0);
(void)ret;
close(pipefd[1]);
//子进程中的pipefd[0]关闭可写可不写,因为进程退出了,进程中的文件描述符也会被关掉
return 0;
}
如果缓冲区满了,就不能写入了,就只能等待子进程读取。所以这里子进程读取一次的数据可能是父进程写了好几次的结果
3.管道提供的是面向流式的通信服务--面向字节流(需要对应的协议)
你写了十次,但是我可能一次就全部都读取完了。
4.管道是基于文件的,文件的生命周期是随进程的,管道的生命周期是随进程的。
写入的一方,fd没有关闭,如果有数据,就读,没有数据,就等
写入的一方,fd关闭,读取的一方,read会返回0,表示读到了文件的结尾(将缓冲区中的内容读取完毕之后,就可以退出了!)
#include<iostream>
#include<unistd.h>
#include<assert.h>
#include<string>
#include<cstdio>
#include<cstring>
#include<sys/types.h>
#include<sys/wait.h>
using namespace std;
int main()
{
//1.创建管道
int pipefd[2]={0};//pipefd[0]:读端,pipefd[1]:写端
int n=pipe(pipefd);
//在debug模式下assert是有效的,但是release版本下是会无效的
assert(n!=-1);
//所以我们这里需要写下面的代码,证明n被使用过
(void)n;
//如果是DEBUG模式下就不打印了,相当于就是注释掉了
#ifdef DEBUG
cout<<"pipefd[0]"<<pipefd[0]<<endl;
cout<<"pipefd[1]"<<pipefd[1]<<endl;
#endif
//2.创建子进程
pid_t id=fork();
assert(id!=-1);
if(id==0)
{
//子进程
//3.构建单向通信的信道
//父进程写入,子进程读取,让父子关闭不需要的文件描述符
//3.1关闭子进程不需要的fd
//子进程进行读取,将写端给关闭
close(pipefd[1]);
char buffer[1024*8];
while (true)
{
//从0号文件描述符中读取,读取到缓冲区buffer中
size_t s=read(pipefd[0],buffer,sizeof(buffer)-1);
if(s>0)
{
// sleep(20);
//写入的一方,fd没有关闭,如果有数据,就读,没有数据,就等
//写入的一方,fd关闭,读取的一方,read会返回0,表示读到了文件的结尾
//添加\0
buffer[s]=0;
cout<<"child get a message["<<getpid()<<"] Father#"<<buffer<<endl;
}
else if(s==0)
{
cout<<"writer quit(father),me quit"<<endl;
break;
}
}
// close(pipefd[0]);
exit(0);
}
//父进程
//3.构建单向通信的信道
//父进程进行写入,将读取端进行关闭
close(pipefd[0]);
string message="我是父进程,我正在给你发消息";
int count=0;
char send_buffer[1024*8];
while(true)
{
//3.2构建一个变化的字符串
//将printf的内容格式化到字符串中
snprintf(send_buffer,sizeof(send_buffer),"%s[%d]:%d",message.c_str(),getpid(),count++);
//3.3写入
//这里strlen不需要+1,\0写入也没有意义。
write(pipefd[1],send_buffer,strlen(send_buffer));
//3.4故意sleep
sleep(1);
cout<<count<<endl;
if(count==5)
{
cout<<"writer quit(father)"<<endl;
break;
}
}
close(pipefd[1]);
pid_t ret=waitpid(id,nullptr,0);
cout<<"id:"<<id<<"ret:"<<ret<<endl;
assert(ret>0);
(void)ret;
//子进程中的pipefd[0]关闭可写可不写,因为进程退出了,进程中的文件描述符也会被关掉
return 0;
}
只能用于具有共同祖先的进程(具有亲缘关系的进程)之间进行通信;通常,一个管道由一个进程创建,然后该进程调用fork,此后父、子进程之间就可应用该管道。
管道提供流式服务
一般而言,进程退出,管道释放,所以管道的生命周期随进程
一般而言,内核会对管道操作进行同步与互斥
管道是半双工的,数据只能向一个方向流动;需要双方通信时,需要建立起两个管道。
5.管道是单向通信的,就会半双工通信的一种特殊情况
作为通信的一方,要么我在发送,要么我在接收,我不能同时接收和发送,这种就称为半双工通信。
有时候呢,我们既可以收,又可以发,这就称为全双工通信。
比防说我们在听老师上课,老师在将,我们在听,这就是半双工通信。
但是如果两个人在吵架,你在吵的时候我也在吵,同时我也在听你说什么,你也在听我说什么,这就是全双工通信。
a.写快,读慢,写满的时候就不能再写了
b.写慢,读快,管道没有数据的时候,读的这一方就必须等待
c.写关,读0,表示读到了文件结尾
d.读关,写继续写,OS终止写进程
3.扩展
如何写一个进程池?
我们给父进程和每一个子进程建立一个管道,并以固定大小的方式command_code(4kb),给我们的子进程发送指令。
创建Makefile文件
ProcessPool:ProcessPool.cc
g++ -o $@ $^ -std=c++11 -DEBUG
.PHONY:clean
clean:
rm -f ProcessPool
创建我们的任务头文件Task.cpp
#pragma once
#include<iostream>
#include<string>
#include<unistd.h>
#include<unordered_map>
#include<functional>
#include<vector>
//返回void,参数为()
typedef std::function<void()> func;
std::vector<func> callbacks;
std::unordered_map<int,std::string> desc;
void readMySQL()
{
std::cout<<"sub process["<<getpid()<<"]执行访问数据库的任务\n"<<std::endl;
}
void execuleUrl()
{
std::cout<<"sub process["<<getpid()<<"]执行url解析\n"<<std::endl;
}
void cal()
{
std::cout<<"sub process["<<getpid()<<"]执行加密任务\n"<<std::endl;
}
void save()
{
std::cout<<"sub process["<<getpid()<<"]执行数据持久化任务\n"<<std::endl;
}
void load()
{
desc.insert({callbacks.size(),"readmySQL:读取数据库"});
callbacks.push_back(readMySQL);
desc.insert({callbacks.size(),"execuleUrl:进行URL解析"});
callbacks.push_back(execuleUrl);
desc.insert({callbacks.size(),"cal:进行加密计算"});
callbacks.push_back(cal);
desc.insert({callbacks.size(),"save:进行数据的文件保存"});
callbacks.push_back(save);
}
void showHandler()
{
for(const auto& iter:desc)
{
std::cout<<iter.first<<"\t"<<iter.second<<std::endl;
}
}
//返回有多少个任务
int handlerSize()
{
return callbacks.size();
}
创建我们的主程序文件
自动派发任务的版本
#include<iostream>
#include<unistd.h>
#include<cstdlib>
#include<sys/wait.h>
#include<vector>
#include<sys/types.h>
#include"Task.hpp"
#include<ctime>
#include<assert.h>
//默认创建的进程个数
#define PROCESS_NUM 5
using namespace std;
//等待命令
int waitCommand(int waitfd,bool&quit)
{
uint32_t command=0;
ssize_t s=read(waitfd,&command,sizeof(command));
//如果读取到对应的0,那么就是文件描述符关掉了,就直接退出
if(s==0)
{
quit=true;
return -1;
}
//看看有没有读取成功
assert(s==sizeof(uint32_t));
return command;
}
//拖过文件描述符像进程发送命令
void SendAndWakeup(pid_t who,int fd,uint32_t command)
{
write(fd,&command,sizeof(command));
cout<<"main process: call process"<<who<<"execute"<<desc[command]<<"through"<<fd<<endl;
}
int main()
{
//将任务装载进来
load();
//子进程的pid_t,信道pipfd的键值对
//表示一个一个进程相关的信息
vector<pair<pid_t,int>> slots;
//先创建多个进程
for(int i=0;i<PROCESS_NUM;i++)
{
//创建管道
int pipedf[2]={0};
int n=pipe(pipedf);
assert(n==0);
(void)n;
//创建子进程
pid_t id=fork();
assert(id!=-1);
//子进程我们让他进行读取
if(id==0)
{
//child,进行读取,关闭写入端
close(pipedf[1]);
while(true)
{
//等待命令
bool quit=false;
int command=waitCommand(pipedf[0],quit);//如果对方不发,我们就阻塞
if(quit)
break;
//执行对应的命令
if(command>=0&&command<handlerSize())
{
callbacks[command]();
}
else
{
cout<<"非法command"<<command<<endl;
}
}
exit(1);
}
//father,进行写入,关闭读取端
close(pipedf[0]);
slots.push_back(pair<pid_t,int>(id,pipedf[1]));
}
//父进程派发任务
//将任务均衡地拍付给每一个任务称为单机版的负载均衡
srand((unsigned long)time(nullptr) ^ getpid()^2332313L);//让我们的数据源更随机
while(true)
{
int command=rand()%handlerSize();
//采用随机数的方式,选择子进程来完成任务,这是一种随机数的方式来实现负载均衡。
int choice=rand()%slots.size();
//布置任务
//把任务给指定的进程
SendAndWakeup(slots[choice].first,slots[choice].second,command);
sleep(1);
}
//关闭fd,结束所有的进程
//关闭所有的写的文件描述符
//所有的子进程在读取完之后都会退出
for(const auto &slot:slots)
{
close(slot.second);
}
//回收所有的子进程。
for(const auto &slot:slots)
{
//等待全部的子进程
waitpid(slot.first,nullptr,0);
}
}
手动派发任务的版本
#include<iostream>
#include<unistd.h>
#include<cstdlib>
#include<sys/wait.h>
#include<vector>
#include<sys/types.h>
#include"Task.hpp"
#include<ctime>
#include<assert.h>
//默认创建的进程个数
#define PROCESS_NUM 5
using namespace std;
//等待命令
int waitCommand(int waitfd,bool&quit)
{
uint32_t command=0;
ssize_t s=read(waitfd,&command,sizeof(command));
//如果读取到对应的0,那么就是文件描述符关掉了,就直接退出
if(s==0)
{
quit=true;
return -1;
}
//看看有没有读取成功
assert(s==sizeof(uint32_t));
return command;
}
//拖过文件描述符像进程发送命令
void SendAndWakeup(pid_t who,int fd,uint32_t command)
{
write(fd,&command,sizeof(command));
cout<<"main process: call process"<<who<<"execute"<<desc[command]<<"through"<<fd<<endl;
}
int main()
{
//将任务装载进来
load();
//子进程的pid_t,信道pipfd的键值对
//表示一个一个进程相关的信息
vector<pair<pid_t,int>> slots;
//先创建多个进程
for(int i=0;i<PROCESS_NUM;i++)
{
//创建管道
int pipedf[2]={0};
int n=pipe(pipedf);
assert(n==0);
(void)n;
//创建子进程
pid_t id=fork();
assert(id!=-1);
//子进程我们让他进行读取
if(id==0)
{
//child,进行读取,关闭写入端
close(pipedf[1]);
while(true)
{
//等待命令
bool quit=false;
int command=waitCommand(pipedf[0],quit);//如果对方不发,我们就阻塞
if(quit)
break;
//执行对应的命令
if(command>=0&&command<handlerSize())
{
callbacks[command]();
}
else
{
cout<<"非法command:"<<command<<endl;
}
}
exit(1);
}
//father,进行写入,关闭读取端
close(pipedf[0]);
slots.push_back(pair<pid_t,int>(id,pipedf[1]));
}
//父进程派发任务
//将任务均衡地拍付给每一个任务称为单机版的负载均衡
srand((unsigned long)time(nullptr) ^ getpid()^2332313L);//让我们的数据源更随机
while(true)
{
int select;
int command;
cout<<"##########################################"<<endl;
cout<<"# 1. show functions 2.send command #"<<endl;
cout<<"##########################################"<<endl;
cout<<"Please Select>"<<endl;
cin>>select;
if(select==1)
{
showHandler();
}
else if(select=2)
{
cout<<"Enter Your Command>";
//选择任务
cin>>command;
//发送命令,并且唤醒子进程
//选择进程
//选择一个任务,如果这个任务是从网络中来的?
// int command=rand()%handlerSize();
//采用随机数的方式,选择进程来完成任务,这是一种随机数的方式来实现负载均衡。
int choice=rand()%slots.size();
// //布置任务
// //把任务给指定的进程
SendAndWakeup(slots[choice].first,slots[choice].second,command);
// sleep(1);
}else
{
cout<<"该指令不再可以选择的范围内"<<endl;
continue;
}
}
//关闭fd,结束所有的进程
//关闭所有的写的文件描述符
//所有的子进程在读取完之后都会退出
for(const auto &slot:slots)
{
close(slot.second);
}
//回收所有的子进程。
for(const auto &slot:slots)
{
//等待全部的子进程
waitpid(slot.first,nullptr,0);
}
}