每一个不曾起舞的日子,都是对生命的辜负。
进程间通信
- 进程间通信
- 一.理解进程间通信
- 1.1 什么是通信
- 1.2 为什么要有通信
- 1.3 如何进行进程间通信
- 二.管道
- 2.1 匿名管道
- 2.2 匿名管道编码部分
- 2.3 管道的特点
- 2.4 如何理解命令行中的管道
- 2.5 进程控制多个子进程
- 三.命名管道
- 3.1 预备工作
- 3.2 命令行中的命名管道
- 3.3 命名管道
进程间通信
之前提到过,进程之间具有独立性。而今天我们需要进行通信,那么通信的成本一定不低。
一.理解进程间通信
1.1 什么是通信
- 数据传输: 一个进程需要将它的数据发送给另一个进程
- 资源共享: 多个进程之间共享同样的资源。
- 通知事件: 一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
- 进程控制: 有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
1.2 为什么要有通信
在之前所写的C/C++代码中,都是单进程的。但实际上,我们在完成某种业务内容时是需要多进程协同的。比如cat file | grep 'hello'
就是将file中的内容打印在显示器之前通过grep进行指定内容的过滤,这就是多进程协同。
1.3 如何进行进程间通信
经过发展,最终有这么两套方案:
-
POSIX:让通信过程可以跨主机
-
System V:聚焦在本地通信,即一台机器的两个进程进行通信。
- System V 消息队列
- 共享内存
- System V 信号量
对于System V ,在这里只了解共享内存(消息队列和信号量不了解)除了上述两套标准,还有一种方法:管道也是通信的一种方式,管道依托于文件系统来完成进程间通信的方案。
二.管道
管道是基于文件系统的进程通信的方式。
- 管道是Unix中最古老的进程间通信的形式。
- 我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”
而对于管道,分为两种:一种是匿名管道、另一种是命名管道。
2.1 匿名管道
一、回顾文件系统
我们之前所学习的文件系统中,有这样的结构:通过PCB—task_struct(进程控制块),每一个进程都有一个task_struct,同样知道struct files_struct其中包含一个进程描述符表的array数组,通过特定的文件描述符找到磁盘加载到内存中对应的文件。
当该PCB创建子进程时,不会拷贝磁盘中的文件,而是拷贝一份struct files_struct同样指向父进程对应的struct file
,
二、理解通信的本质问题
- OS需要直接或间接给通信双方的进程提供“内存空间”
- 要通信的进程,必须看到一份公共的资源
通信的成本一定不低,这是因为不能直接考虑通信的问题,必须先让不同的进看到同一份资源,然后才能利用这份资源进行通信。因此我们未来学习通信的接口,与其说是通信的接口,倒不如说是同一份资源的接口。而我们目前所学习的就是让不同进程如何能够看到同一份资源。
不同的通信种类,实际上就是OS系统的不同模块对应的功能,比如文件系统之间通信的模块就是管道,System V的模块就是System V通信……
而对于上面的struct file,实际上就是父进程与子进程的同一份资源,这份资源是由文件系统提供的,struct file包括file的操作方法和自己的内核缓冲区;父进程通过文件缓冲区将数据写入,子进程通过文件缓冲区将数据读取,这不就是一个进程写入,另一个进程读取,不就是进程间通信吗?
因此这个struct file文件就是管道文件。
三、管道文件的刷新
我们知道,struct file是从磁盘加载到内存的,而父子进程的每一次写入,struct file不会从内存中刷新到磁盘,虽然通过一定的操作是可行的,但进程与进程之间的通信是从内存到内存的,没有必要牵扯到磁盘。一旦刷新到磁盘,就会大大降低通信的速度。所以管道文件是一个内存级别的文件,不会进行磁盘刷新。
四、匿名管道
经过上面的学习,那如何让两个进程看到同一个管道文件呢?——>通过fork创建子进程完成。但当前这个管道文件并没有名字,所以被称为匿名管道。
为什么管道只能进行单向通信?我们本来所描述的就是单向通信,因此将其起名为管道。
为什么父进程分别以读和写的方式打开同一个文件?只有父进程打开读和写,产生的文件描述符才会被子进程继承,子进程才能有读和写的功能。
总结一下上述核心:
我们对应的父进程通过调用管道特定的系统调用,以读和写的方式打开一个内存级的文件,并通过fork创建子进程的方式,被子进程继承下去之后,各自关闭对应的读写端,形成的一条通信信道,这条信道是基于文件的,因此称为管道。
匿名管道:目前能用来进行父子进程之间进行进程间通信!
上述所讲的都是如何建立公共的资源,并没有涉及到通信,通信需要在具体场景才能实现。
2.2 匿名管道编码部分
int pipe(int pipefd[2]);
//管道:输出型参数,成功则返回0,头文件为unistd.h
功能:获取读和写的文件描述符(0, 1)传到参数中。
#include<iostream>
#include<cstdio>
#include<string>
#include<cstring>
#include<unistd.h>
#include<cassert>//C/C++混搭
#include<sys/types.h>
#include<sys/wait.h>
using namespace std;
//让父进程读取,子进程写入
int main()
{
//第一步:创建管道文件,打开读写端
int fds[2];
int n = pipe(fds);
assert(n == 0);
//第二步:fork
pid_t id = fork();
assert(id >= 0);
if(id == 0)
{
//子进程进行写入,所以关掉读权限
close(fds[0]);
//子进程的通信代码
const char *s = "我是子进程,我正在给你发消息";
int cnt = 0;
while(true)
{
cnt++;
char buffer[1024];//只有子进程能看到
snprintf(buffer, sizeof buffer, "child->parent say: %s[%d][%d]", s, cnt, getpid());
write(fds[1], buffer, strlen(buffer));//反斜杠0只有C语言认
sleep(1);//细节,每隔一秒写一次
}
//子进程
close(fds[1]);
exit(0);
}
//父进程进行读取
close(fds[1]);
//父进程的通信代码
while(true)
{
char buffer[1024];
ssize_t s = read(fds[0], buffer, sizeof(buffer)-1);
if(s > 0) buffer[s] = 0;//去除反斜杠0
cout << "Get Message#" << buffer <<"| my pid: " << getpid() << endl;
//细节:父进程可没有进行sleep
}
n = waitpid(id, nullptr, 0);
assert(n == id);
//0, 1, 2->……
//谁是读取,谁是写入
//[0]:读取
//[1]:写入
cout << "fds[0]: " << fds[0] << endl;//3 读
cout << "fds[1]: " << fds[1] << endl;//4 写
return 0;
}
因此,上述代码的子进程没有打印任何的消息,而是我们的父进程获取读取消息并打印出来,这种通信就被成为管道通信。
2.3 管道的特点
读写特征:
上述代码中我们在子进程中sleep(1),实际上这使得父进程在read时暂停1秒,即在read(读)时阻塞;那如果把子进程的sleep去掉,在父进程中sleep(n),那么子进程的缓冲区就会被写满(因为子进程没有延迟非常快),如果还在写,就会将原来的覆盖,导致写端被阻塞;如果将写端关闭,那么就会读到0;如果读关闭,依旧让他去写,实际上没有任何意义,浪费系统资源,OS会给写进程发送信号,终止写端。通过实现最后一组情况,结果发送的信号为13号信号:SIGPIPE。
管道的特征:
- 管道的生命周期随进程一样。
- 匿名管道可以用来进行具有血缘关系的进程直接进行通信,常用于父子通信。
- 管道是面向字节流的(网络)。
- 半双工 – 单向通信(特殊概念)。
- 互斥与同步机制 – 对共享资源进行保护的方案。
后三点慢慢接触。
2.4 如何理解命令行中的管道
对于cat file | grep 'hello
在命令中实际上会作为字符串先被扫描一遍,将出现的 | 记录下来,并创建进程。其中产生的缓冲区会将管道左侧将要打印的数据加载到缓冲区,在通过右侧的进行筛选并打印到指定位置。
2.5 进程控制多个子进程
父进程可以实现向任意一个子进程中写入,我们可以让父进程向任何进程中写入一个四字节的命令操作码,称之为commandCode,即现在想让哪一个进程运行,就向哪一个进程发送数据,举个例子:如果发送是1,就让子进程下载,发送是2,就让子进程做特定的计算……;那为什么可以这样随意控制子进程是否运行呢?这是因为如果我们不将数据写入或者写的慢,那么子进程就需要等,产生阻塞,所以跟根据这样的思想设计如下代码:
// 我们将我们的任务均衡的下发给每一个子进程,让子进程进行:负载均衡--单机版
#include <iostream>
#include <string>
#include <cstdlib>
#include <vector>
#include <unistd.h>
#include <cassert>
#include <sys/types.h>
#include <sys/wait.h>
#include <ctime>
#define MakeSeed() srand((unsigned long)time(nullptr) ^ getpid() ^ 0x171237 ^ rand() % 1234)
#define PROCSS_NUM 5
/子进程要完成的一些任务,模拟一下
// 函数指针 类型
typedef void (*func_t)();
void downLoadTask()
{
std::cout << "下载任务" << std::endl;
sleep(1);
}
void ioTask()
{
std::cout << " IO任务" << std::endl;
sleep(1);
}
void flushTask()
{
std::cout << "刷新任务" << std::endl;
sleep(1);
}
void loadTaskFunc(std::vector<func_t> *out)
{
assert(out);
out->push_back(downLoadTask);
out->push_back(ioTask);
out->push_back(flushTask);
}
/下面的代码是一个多进程程序
class subEp // Endpoint
{
public:
subEp(pid_t subId, int writeFd)
: subId_(subId), writeFd_(writeFd)
{
char nameBuffer[1024];
snprintf(nameBuffer, sizeof nameBuffer, "process-%d[pid(%d)-fd(%d)]", num++, subId_, writeFd_);
name_ = nameBuffer;
}
public:
static int num;
std::string name_;
pid_t subId_;
int writeFd_;
};
int subEp::num = 0;
int recvTask(int readFd)
{
int code = 0;
ssize_t s = read(readFd, &code, sizeof code);
if (s == 4)
return code;
else if (s <= 0)
return -1;
else
return 0; // 不可能出现这种情况
}
void sendTask(const subEp &process, int taskNum)
{
std::cout << "send task num: " << taskNum << "send to " << process.name_ << std::endl;
int n = write(process.writeFd_, &taskNum, sizeof(taskNum));
assert(n == sizeof(int));
(void)n;
}
void createSubProcess(std::vector<subEp> *subs, std::vector<func_t> &funcMap)
{
std::vector<int> deleteFd;
for (int i = 0; i < PROCSS_NUM; ++i)
{
// 管道建立
int fds[2];
int n = pipe(fds);
assert(n == 0);
(void)n;
pid_t id = fork();
if (id == 0)
{
for (int i = 0; i < deleteFd.size(); i++)
close(deleteFd[i]);
// 子进程,进行处理任务
close(fds[1]);
while (true)
{
// 1. 获取命令码,如果没有发送,子进程应该阻塞
int commandCode = recvTask(fds[0]);
// 2. 完成任务
if (commandCode >= 0 && commandCode < funcMap.size())
funcMap[commandCode]();
else if (commandCode == -1)
break;
}
exit(0);
}
close(fds[0]);
subEp sub(id, fds[1]); // 父进程写入
subs->push_back(sub);
deleteFd.push_back(fds[1]);
}
}
void loadBlanceContrl(const std::vector<subEp> &subs, const std::vector<func_t> &funcMap, int count)
{
int processnum = subs.size();
int tasknum = funcMap.size();
bool forever = (count == 0 ? true : false);
while (true)
{
// 1. 选择一个子进程 --> std::vector<subEp> -> index - 随机数
int subIdx = rand() % processnum;
// 2. 选择一个任务 --> std::vector<func_t> -> index
int taskIdx = rand() % tasknum;
// 3. 任务发送给选择的进程
sendTask(subs[subIdx], taskIdx);
sleep(1);
if (!forever)
{
count--;
if (count == 0)
break;
}
}
// write quit -> read 0
for (int i = 0; i < processnum; i++)
close(subs[i].writeFd_);
}
void waitProcess(std::vector<subEp> processes)
{
int processnum = processes.size();
for (int i = 0; i < processnum; i++)
{
waitpid(processes[i].subId_, nullptr, 0);
std::cout << "wait sub process success ...: " << processes[i].subId_ << std::endl;
}
}
int main()
{
MakeSeed();
// 1. 建立子进程并建立和子进程通信的信道
// 1.1 加载方法表
std::vector<func_t> funcMap; // 方法表
loadTaskFunc(&funcMap);
// 1.2 创建子进程,并且维护好父子通信信道
std::vector<subEp> subs;
createSubProcess(&subs, funcMap);
// 2. 走到这里的就是父进程,控制子进程,负载均衡的向子进程发送命令码
int taskCnt = 3; // 0 : 永远进行;大于0,父进程循环几次
loadBlanceContrl(subs, funcMap, taskCnt);
// 3. 回收子进程信息
waitProcess(subs);
return 0;
}
三.命名管道
前面提到的都是匿名管道,接下来看看命名管道:
3.1 预备工作
新建servers.cc与client.cc及makefile,让servers.cc负责整体工作。
server.cc
#include<iostream>
int main()
{
std::cout << "hello server" << std::endl;
return 0;
}
client.cc
#include<iostream>
int main()
{
std::cout << "hello client" << std::endl;
return 0;
}
makefile
(同时生成两个可执行)
.PHONY:all
all:server client
server:server.cc
g++ -o $@ $^ -std=c++11 -g
client:client.cc
g++ -o $@ $^ -std=c++11 -g
.PHONY:clean
clean:
rm -f server client
3.2 命令行中的命名管道
通过指令:mkfifo 文件名
就可以创建一个管道文件。
左侧将打印的信息重定向到named_pipe管道文件中,右侧cat作为进程再把named_pipe管道数据读了进来,通过这种方式,就完成了命令行式的进程间通信。但发现管道文件的大小仍为0。
如果两个进程打开同一个文件,那么在系统角度,还用不用为第二个进程在打开文件的时候在内核当中再重新创建一个struct file
呢?
答案是没有必要的。操作系统会自己识别文件已经被打开了,就不再需要这个操作了。实际上这也是操作系统为了减轻没必要的性能损失。
我们之前提到过,要想让两个进程之间进行通信,就需要有一份共享的资源,匿名管道以继承的方式拥有共同的文件(文件地址具有唯一性),那么命名管道是如何让不同的进程看到同一份资源的呢?
让不同的进程打开指定名称(文件路径+文件名)的同一个文件就可以了。
即我们之前演示的命令行中的文件路径默认是当前路径,因此能够进行进程间通信。
3.3 命名管道
为了能让client.cc和server.c看到同一份资源。因此再新建一个头文件:comm.hpp
对于mkfifo,不仅仅在指令中存在,在系统调用中也有此接口:
头文件:#include<sys/types.h>
#include<sys/stat.h>
接口:int mkfifo(const char *pathname, mode_t mode);
mode_t类型为权限,返回值为0是创建成功。
既然都要用,那就放在公共的comm.hpp中。
接下来,我们就需要将管道建立在指定路径下,既可以建立在当前路径下,也可以建立在系统的tmp路径下,此次就建立在tmp路径下:(tmp路径可以被任何人读、写、执行,前面的文章提到过)
comm.hpp
(公共头文件)//暂时不全
#pragma once
#include<iostream>
#include<string>
#include<cstring>
#include<sys/types.h>
#include<sys/stat.h>
#include<cerrno>
#include<cassert>
#define NAMED_PIPE "/tmp/mypipe.106"
bool createFifo(const std::string& path)
{
umask(0);
int n = mkfifo(path.c_str(), 0666);//读、写、执行
if(n == 0) return true;
else
{
std::cout << "errno: " << errno << "err string: " << strerror(errno) << std::endl;
return false;
}
}
//去掉管道文件
void removeFifo(const std::string& path)
{
}
此时,我们对上面的代码稍作改动并保存成如下,运行观察一下结果:
再次创建就会失败,因为文件已经存在。先通过指令rm删掉再继续。
但是如果想在代码中删除,如何做?因此接下来介绍删除文件的接口:
头文件:#include<unistd.h>
函数接口:int unlink(const char* path);
功能:删除文件path,删除成功则返回0。
这样,就可以创建文件之后自动删除,如果想要观察,就需要在创建与删除之间加上个sleep,否则运行太快无法具体观察创建和删除的过程。
至此,我们就完成了通过server.cc对管道文件的创建和删除。然后呢?只要能创建和删除了,然后就是通信了,那server.cc和client直接如何通信呢?接下来的代码就没有新的东西了,即让server.cc和client.cc打开同一个文件,让server.cc读,让client.cc写,这样就可以了。代码:
comm.hpp
#pragma once
#include<iostream>
#include<string>
#include<cerrno>
#include<cassert>
#include<cstring>
#include<unistd.h>
#include<sys/types.h>
#include<sys/stat.h>
#include<fcntl.h>
#define NAMED_PIPE "/tmp/mypipe.106"
bool createFifo(const std::string& path)
{
umask(0);
int n = mkfifo(path.c_str(), 0666);//读、写、执行
if(n == 0) return true;
else
{
std::cout << "errno: " << errno << "err string: " << strerror(errno) << std::endl;
return false;
}
}
//去掉管道文件
void removeFifo(const std::string& path)
{
int n = unlink(path.c_str());
assert(n == 0);//debug有效,release里面就被去掉了
(void)n;//n不使用就会出现warning,代码变成release之后没有assert,n就不会被使用,因此在这里使用一下。
}
server.cc
#include"comm.hpp"
int main()
{
std::cout << "server begin: " << std::endl;
int rfd = open(NAMED_PIPE, O_RDONLY);
std::cout << "server end: " << std::endl;
assert(r);
(void)r;
int rfd = open(NAMED_PIPE, O_RDONLY);
if(rfd < 0) exit(1);
//read
char buffer[1024];
while(true)
{
ssize_t s = read(rfd, buffer, sizeof(buffer)-1);
if(s > 0)
{
buffer[s] = 0;
std::cout << "client->server# "<< buffer << std::endl;
}
else if(s == 0)
{
std::cout << "client quit, me too!" <<std::endl;
break;
}
else
{
std::cout << "err string: " << strerror(errno) << std::endl;
break;
}
}
close(rfd);
removeFifo(NAMED_PIPE);//删除
return 0;
}
client.cc
#include"comm.hpp"
int main()
{
std::cout << "client begin: " << std::endl;
int wfd = open(NAMED_PIPE, O_WRONLY);
std::cout << "client end: " << std::endl;
if(wfd < 0) exit(1);
//write
char buffer[1024];
while(true)
{
std::cout << "Please Say# ";
fgets(buffer, sizeof(buffer), stdin);
ssize_t n = write(wfd, buffer, strlen(buffer));
assert(n == strlen(buffer));
(void)n;
}
close(wfd);
return 0;
}
执行观察:先运行server,再运行client,观察server端的变化:
通过这个现象就可以看出,我们将读的一段打开了,他不会直接运行,而是阻塞到读端,当把写端打开了,他才会继续向下运行。也就是说,读端和写端都打开,才会继续向后运行。其次我们发现:左侧的写端没有空行,但是右端的有空行,这是因为左侧的回车同样被存到/tmp/mypipe.106中,因此在读端读时就会将其看成换行并打印在屏幕上,因此下面这样就可以解决:
最后在client里进行ctrl c结束。至此,我们就完成了通信。