目录
0.进程间通信介绍
0.1通信背景
0.2进程间通信目的
1.管道
1.1 管道是什么
1.2 匿名管道
1.2.1管道通信的特点
1.2.2 匿名管道编码
父进程控制子进程的行为
进程池 -- 池化概念
1.3管道的特征总结
1.4命名管道
1.4.1创建一个命名管道
1.4.2 命名管道编码
0.进程间通信介绍
0.1通信背景
在之前我们学习进程时知道进程具有独立性,所以进程间交互数据的成本就变得非常高。进程之间为什么也进行进程间通信,这就需要谈谈进程间通信的目的了。但是进程具有独立性不是彻底独立,只管自己不理任何人,当然不是这样的。进程之间也是存在通信的。那么进程间通信的方式也有很多种,包括管道,System V IPC,POSIX IPC........那么今天我们先来看看进程间如何通过管道来进行通信。
0.2进程间通信目的
- 数据传输:一个进程需要将它的数据发送给另一个进程。
- 资源共享:多个进程之间共享同样的资源。
- 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种时间(如进程终止时要通知父进程)。
- 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
1.管道
1.1 管道是什么
管道是Unix中最古老的进程间通信的形式。我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”。在Linux中管道分两种:匿名管道和命名管道。
1.2 匿名管道
假设现在在内存中有两个独立的进程,如果要想这两个进程之间通信,那么进程1可以先把数据拷贝到磁盘上,然后进程2再去读这个数据【如下图所示】....首先不考虑这种方法是否可行,但是比较好的让我们理解了一个道理:进程在通信之前,必须让不同的进程能够看到同一份资源(文件,内存块.....)
因此在通信之前,如何解决让进程先看到同一份资源呢?并且资源的不同决定了不同种类的通信方式!!!
因此我们正在学习的管道是提供共享资源的一种手段。
我们知道文件在内存和磁盘之间来回切换是非常耗费时间的,因此进程间通信大多都是内存级别的。意思就是在内存内部重建一块区域进行通信。
那么什么是管道呢?
在计算机通信中,我们把文件不再是一个磁盘文件,通过特定的接口表征自己的身份,说明他和磁盘脱离,自己读写数据是就在文件的内存缓冲区,完成数据交互,我们把这个文件叫做管道。因此我们说Linux下一切皆文件,管道也是文件。管道就是一个内存级文件。内容不需要刷新到磁盘中。
1.2.1管道通信的特点
在我们生活中遇到的管道有什么特点呢?那首先问那些都属于管道呢?天然气管道,水龙头管道等等.....那么这些管道大多数情况下都是单向的,并且这些管道都是传输资源的,在计算机中最重要的资源就是数据。
- 单向的
- 传输数据
那么我们如何来保证单向性呢?
我们来看下图,父进程和子进程通过管道完成进程通信如何保证单向性呢,我们刚刚提到了管道是一个文件,是数据的缓冲区,因此当父进程把需要通信的数据通过写的方式写入管道内时,子进程通过读的方式拿到这些资源即可完成父子间的通信。为了保证单向的,我们需要关闭父进程的读端,让父进程只能写,关闭子进程的写段,让子进程只能读。通过这样的方式我们就可以保证父进程只能写数据子进程只能读数据的单向性。
父进程必须以读写方式打开,这是因为子进程会继承下去,这样子进程就不用再打开了。那么谁决定父子关闭什么读写?这不是由管道决定的,这是由我们的需求所决定的。
那么我们如何来打开管道呢?难道要调用两次open()吗?当然不是了,因此操作系统提供了pipe()接口
1.2.2 匿名管道编码
认识pipe()接口,当我们调用piep时,底层会自动帮助我们把文件以读方式和写方式打开,而且我们会的到两个文件描述符,这两个文件描述符会写进pipefd数组内,因此这个数组是一个输出型参数。并且pipe是一个系统调用。返回0表示成功,返回-1表示失败。
接下来我们进行管道的代码:
下面这段代码是管道的创建和验证输出的数组是否使我们所想的两个文件描述符
#include <iostream>
#include <cstdio>
#include <unistd.h>
using namespace std;
int main()
{
int pipefd[2] = {0};
if(pipe(pipefd) != 0)
{
cerr<<"pipe error"<<endl;
return 1;
}
cout<<"fd[0]:"<<pipefd[0]<<endl;
cout<<"fd[1]:"<<pipefd[1]<<endl;
return 0;
}
匿名管道代码演示
#include <iostream>
#include <cstdio>
#include <cstring>
#include <sys/wait.h>
#include <sys/types.h>
#include <unistd.h>
using namespace std;
//演示pipe管道通信的基本过程 -- 匿名管道
int main()
{
//1.创建管道
int pipefd[2] = {0};
if(pipe(pipefd) != 0)
{
cerr<<"pipe error"<<endl;
return 1;
}
//2.创建子进程
pid_t id = fork();
if(id<0)
{
cerr<<"fork error"<<endl;
return 2;
}
else if(id == 0)
{
//child 来进行读取
close(pipefd[1]);
char buffer[1024];
while(true)
{
memset(buffer,0,sizeof(buffer));
ssize_t s = read(pipefd[0],buffer,sizeof(buffer) - 1 );
if(s>0)
{
//读取成功
buffer[s] = '\0';
cout<<"子进程收到消息,消息的内容是:"<< buffer <<endl;
}
else if(s == 0)
{
cout<<"父进程写完了,我也退出了"<<endl;
break;
}
else
{
//do nothing
}
}
close(pipefd[0]);
exit(0);
}
else
{
//parent 来进行写入
close(pipefd[0]);
string msg = "你好子进程,我是父进程!";
int cnt = 0;
while(cnt<5)
{
write(pipefd[1],msg.c_str(),msg.size());
sleep(1);
cnt++;
}
close(pipefd[1]);
cout<<"父进程的工作结束了 退出"<<endl;
}
pid_t res = waitpid(id,nullptr,0);
if(res > 0 )
{
cout<<"父进程等待成功"<<endl;
}
//pipefd[0] 是读
//pipefd[1] 是写
// cout<<"fd[0]:"<<pipefd[0]<<endl;
// cout<<"fd[1]:"<<pipefd[1]<<endl;
return 0;
}
至此父进程的数据就发给了子进程,子进程也能够接受父进程发送来的数据。在上述代码中,我们在父进程中带了sleep(1),让父进程每间1秒向管道内写入一个数据,那么子进程没有带sleep(1),为什么子进程也会随之休眠一秒呢?为了更好的看到这个现象,我们在父子进程里面带上时间戳,写写日志。
我们通过测试观察到子进程代码没有任何的休眠,子进程会随着父进程的节奏读取,那么我们可以得出结论:当父进程没有写入数据的时候,子进程在等!所以,父进程写入之后,子进程才能read(会返回)到数据,子进程打印读取数据要以父进程的节奏为主!
那么,父进程和子进程读写的时候是有一定的顺序性的!当父进程向管道写入的时候,子进程才可以读!
- 管道内部,没有数据,reader就必须阻塞等待(read时等待)
- 管道内部,如果数据写满,writer就必须阻塞等到(writer时等待)
因此pipe内部是自带访问控制机制的以及存在同步和互斥机制的。
所谓的阻塞等待的本质是将当前的tast_struct 放入等待队列中,将PCB的状态由R->S/D/T
父进程控制子进程的行为
假设父进程想让我的子进程做父进程想让子进程做的行为,以及父进程想控制一批(多个)子进程.....该如何来写呢???
我们先写一段父进程控制子进程的行为的代码
#include <iostream>
#include <vector>
#include <unordered_map>
#include <cstdio>
#include <cstring>
#include <cstdlib>
#include <ctime>
#include <sys/wait.h>
#include <sys/types.h>
#include <unistd.h>
#include <cassert>
using namespace std;
// 定义一个函数指针
typedef void (*functor)();
vector<functor> functors; //方法的集合
// for debug
unordered_map<uint32_t, string> info;
void f1() { cout << "这是一个处理日志的任务,执行的进程id: [" << getpid() << "]"
<< "执行的时间是:[" << time(nullptr) << "]" << endl; }
void f2() { cout << "这是一个处理数据备份的任务,执行的进程id: [" << getpid() << "]"
<< "执行的时间是:[" << time(nullptr) << "]" << endl; }
void f3() { cout << "这是一个处理网络连接的任务,执行的进程id: [" << getpid() << "]"
<< "执行的时间是:[" << time(nullptr) << "]" << endl; }
void loadFunctor()
{
info.insert({functors.size(), "处理日志任务"});
functors.push_back(f1);
info.insert({functors.size(), "数据备份任务"});
functors.push_back(f2);
info.insert({functors.size(), "处理网络连接任务"});
functors.push_back(f3);
}
int main()
{
// 0. 加在任务列表
loadFunctor();
// 1.创建管道
int pipefd[2] = {0};
if (pipe(pipefd) != 0)
{
cerr << "pipe error" << endl;
return 1;
}
// 2.创建子进程
pid_t id = fork();
if (id < 0)
{
cerr << " fork error" << endl;
return 2;
}
else if (id == 0)
{
// child read
// 关闭不需要的fd
close(pipefd[1]);
while (true)
{
uint32_t operatorType = 0;
//如果有数据就读取 如果没有数据就阻塞等待
ssize_t s = read(pipefd[0], &operatorType, sizeof(uint32_t));
if(s == 0)
{
cout << "我是子进程,我要退出了" <<endl;
break;
}
assert(s == sizeof(uint32_t));
// assert断言 是编译有效的 debug模式
//但是如果是release模式下 断言就没有了
//一旦断言没有了,s变量就是只被定义没有被使用,一次你release模式下 可能有warning
(void)s;
if (operatorType < functors.size())
{
functors[operatorType]();
}
else
{
cerr << "bug? operatorTypr: " << operatorType << cout;
}
}
close(pipefd[0]);
exit(0);
}
else
{
srand((long long)time(nullptr));
close(pipefd[0]);
int num = functors.size();
int cnt = 10;
while (cnt--)
{
//形成任务码
uint32_t commandCode = rand() % num;
cout<< "父进程指派任务完成,任务是: " <<info[commandCode] <<
"任务的编号是:" << cnt << endl;
//想指定进程下答操作的任务
write(pipefd[1], &commandCode, sizeof(uint32_t));
sleep(1);
}
close(pipefd[1]);
pid_t res = waitpid(id, nullptr, 0);
if (res)
cout << " wait sucess " << endl;
}
return 0;
}
通过这份代码父进程控制了子进程的行为,往后我们只需要修改functors里面的方法,就可以让子进程执行指定的任务。
进程池 -- 池化概念
那么如果是一个父进程想要控制一批子进程呢??? 父进程怎么样把一批任务交给子进程呢?
因此,我们有多少个进程,我们就创建多少个管道,父进程可以通过对指定管道写入特定的任务,让指定的子进程做对应的事情,这样我们就引入了一个池化的概念!那么我怎么如何书写对应的代码呢?
int processNum = 5;
int main()
{
for(int i = 0;i<processNum;++i)
{
//定义保存管道fd的对象
int pipefd[2] = {0};
//创建管道
pipe(pipefd);
pid_t id = fork();
if(id == 0 )
{
//子进程执行
exit(0);
}
//父进程做得事情
}
return 0;
}
上面这份代码就成功的将每一次创建的子进程和父进程都独立的进行了控制,因此父进程在不断的循环,给子进程指派任务的时候需要知道给哪一个进程指派,指派什么任务,通过什么指派呢? 因此我们接下来需要做的就是解决这些问题。因此我们需要一节pair结构
#include <iostream>
#include <vector>
#include <unordered_map>
#include <cstdio>
#include <cstring>
#include <cstdlib>
#include <ctime>
#include <sys/wait.h>
#include <sys/types.h>
#include <unistd.h>
#include <cassert>
using namespace std;
// 定义一个函数指针
typedef void (*functor)();
vector<functor> functors; //方法的集合
// for debug
unordered_map<uint32_t, string> info;
void f1() { cout << "这是一个处理日志的任务,执行的进程id: [" << getpid() << "]"
<< "执行的时间是:[" << time(nullptr) << "]\n\n" << endl; }
void f2() { cout << "这是一个处理数据备份的任务,执行的进程id: [" << getpid() << "]"
<< "执行的时间是:[" << time(nullptr) << "]\n\n" << endl; }
void f3() { cout << "这是一个处理网络连接的任务,执行的进程id: [" << getpid() << "]"
<< "执行的时间是:[" << time(nullptr) << "]\n\n" << endl; }
void loadFunctor()
{
info.insert({functors.size(), "处理日志任务"});
functors.push_back(f1);
info.insert({functors.size(), "数据备份任务"});
functors.push_back(f2);
info.insert({functors.size(), "处理网络连接任务"});
functors.push_back(f3);
}
//第一个int32_t:进程pid
//第二个int32_t:该进程对应的管道写端fd
typedef pair<int32_t, int32_t> elem;
int processNum = 5;
void work(int blockFd)
{
cout << "进程 [" << getpid() << "] 开始工作" << endl;
//进行
while (true)
{
//阻塞等待 获取任务信息
uint32_t operatorCode = 0;
ssize_t s = read(blockFd, &operatorCode, sizeof(uint32_t));
if (s == 0)
break;
assert(s == sizeof(uint32_t));
(void)s;
//处理任务
if (operatorCode < functors.size())
{
functors[operatorCode]();
}
}
cout << "进程 [" << getpid() << "] 结束工作" << endl;
}
// [子进程的pid,子进程的管道fd]
void sendTask(const vector<elem> & processFds)
{
srand((long long)time(nullptr));
while (true)
{
sleep(1);
//选择一个进程 选择进程是随机的 没有压着一个进程给任务 -- 随机的
//较为均匀的将任务给所有的子进程 --- 负载均衡
uint32_t pick = rand() % processFds.size();
// 选择任务
uint32_t task = rand() % functors.size();
//把任务给一个指定的进程
write(processFds[pick].second, &task, sizeof(task));
//打印对应的提示信息
cout << "父进程指派任务 --> " << info[task] << "给进程:" << processFds[pick].first
<< "编号:" << pick << endl;
}
}
int main()
{
loadFunctor();
vector<elem> assignMap;
for (int i = 0; i < processNum; ++i)
{
//定义保存管道fd的对象
int pipefd[2] = {0};
//创建管道
pipe(pipefd);
pid_t id = fork();
if (id == 0)
{
//子进程读取
close(pipefd[1]);
//子进程执行
work(pipefd[0]);
close(pipefd[0]);
exit(0);
}
//父进程做得事情
close(pipefd[0]);
elem e(id, pipefd[1]);
assignMap.push_back(e);
}
cout << "create all process success !" << endl;
//父进程,派发任务
sendTask(assignMap);
// 回收资源
for (int i = 0; i < processNum; ++i)
{
if (waitpid(assignMap[i].first, nullptr, 0) > 0)
cout << "wait for : pid = " << assignMap[i].first << " wait success"
<< "number " << i << endl;
close(assignMap[i].second);
}
return 0;
}
我们发现此时父进程5个子进程随机的派发不同的任务,这就是一种进程池。至此匿名管道全部写完。
其中,我们在shell命令行中写的 | 就是匿名管道
1.3管道的特征总结
- 管道只能用来进行具有血缘关系的进程之间,进行进程间通信,常用语父子通信。
- 管道只能单向通信(由内核设计实现)半双工的一种有特殊情况
- 管道自带同步机制(pipe满,write等;pipe空,read满) -- 自带访问控制
- 管道是面向字节流的 -- 先写的字符 一定是先被读取的 没有格式边界 需要用户来自定义区分内容的边界 [sizeof(uint32_t)]
- 管道的生命周期随进程 -- 管道是文件 -- 进程退出了 曾经打开的文件也会退出
1.4命名管道
我们刚刚提到的都是父子间(血缘)通信,如果我们想要两个毫不相干的两个进程之间通信,应该怎么办。因此我们接下来要讲的就是命名管道。命名管道和之前的匿名管道最大的区别就是可以让任意两个进程之间通信。
1.4.1创建一个命名管道
- 创建一个命名管道可以在命令上创建 使用如下这个命令
mkfifo filename
这个myfifo就是一个管道文件,前面以p开头。以p开头就是管道,假如我们现在要在左侧想管道内部写入一些东西,在右侧实时查看,当我们左侧回车按下时候,右侧立马出现了“aaaaaa”
但是这样还是不能很好的观察现象,我们在右侧写一个实时的查看脚本,让一直想管道文件内部写入 bbbbb
while :; do echo "bbbbb" ; sleep 1; done >> myfifo
- 命名管道也可以在程序中创建,相关函数
int mkfifo(const char* filename,mode_t mode);
我们发现命名管道是带路径的,这有什么作用呢? 其实命名管道是通过一个fifo文件,由于这个文件存在路径,我们都知道路径具有唯一性,因此通过路径我们进程都可以看到这一份资源。
1.4.2 命名管道编码
由于我们知道命名管道可以实现两个不想关的进程完成通信,因此我们接下来将写两个文件(进程),让这两个文件进行通信。我们想让clientFifo.cpp这个进程和severFifo.cpp这个进程通过命名管道通信,该怎么写呢?
//comm.h
#pragma once
#include <iostream>
#include <cstdio>
#include <string>
#include <cstring>
#include <cerrno>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#define IPC_PATH "./.fifo"
// makefile
.PHONY:all
all: clientFifo severFifo
clientFifo:clientFifo.cpp
g++ -Wall -o $@ $^ -std=c++11
severFifo:severFifo.cpp
g++ -Wall -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -f clientFifo severFifo .fifo
//clientFifo.cpp
//写入
#include "comm.h"
using namespace std;
int main()
{
int pipeFd = open(IPC_PATH,O_WRONLY);
if(pipeFd < 0)
{
cerr<<"Open : " << strerror(errno) <<endl;
return 1;
}
#define NUM 1024
char line[NUM];
//通信
while(true)
{
printf("请输入你的消息# ");
fflush(stdout);
memset(line,0,sizeof(line));
//从键盘按行为单位读取
if(fgets(line,sizeof(line),stdin) != nullptr )
{
line[strlen(line) - 1] = '\0';
write(pipeFd,line,strlen(line));
}
else
{
break;
}
}
close(pipeFd);
cout<< "客户端退出啦"<<endl;
return 0;
}
//severFifo.cpp
//让severFifo 来读取
#include "comm.h"
using namespace std;
int main()
{
umask(0);
if(mkfifo(IPC_PATH,0600) != 0)
{
cerr<<"mkfifo client" <<endl;
return 1;
}
int pipeFd = open(IPC_PATH,O_RDONLY);
if(pipeFd < 0)
{
cerr << "open fifo error" << endl;
return 2;
}
//正常通信
char buffer[1024];
while(true)
{
ssize_t s = read(pipeFd,buffer,sizeof(buffer) - 1);
if(s > 0 )
{
buffer[s] = '\0';
cout<<" 客户端-->服务器# " << buffer << endl;
}
else if(s == 0)
{
cout<< "客户端退出了,我也退出了";
break;
}
else
{
//do nothing
cout << "read error" << strerror(errno) <<endl;
break;
}
}
close(pipeFd);
cout<< "服务端退出啦"<<endl;
unlink(IPC_PATH);
return 0;
}
通过这个代码我们成功的在两个毫无相关的进程之间完成了管道通信。
(本篇完)