目录
1.进程间通信介绍
1.1.简要介绍
1.2.进程间通信的目的
1.3.进程间通信的本质
2.管道
2.1.管道的通信原理
2.2.匿名管道
2.3.命名管道
2.4.基于匿名管道的进程池demo
2.4.1.进程池的相关引入
2.4.2.整体框架的分析
2.4.3.代码的实现
1.进程间通信介绍
1.1.简要介绍
进程间通信(Inter-Process Communication,简称IPC)是指在不同进程之间传播或交换信息
我们知道:进程之间是独立的,所以进程之间的进程间通信一定不是两个进程直接通信的,为了保证进程间的独立性和实现进程间通信,操作系统就设计了若干种进程间通信方式,来实现多进程之间的协同工作。
1.2.进程间通信的目的
- 数据传输:一个进程需要将它的数据发送给另一个进程
- 资源共享:多个进程之间共享同样的资源。
- 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止 时要通知父进程)。
- 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另 一个进程的所有陷入和异常,并能够及时知道它的状态改变。
1.3.进程间通信的本质
进程间通信就是不同的进程通过 操作系统 这一个中间媒介,结合设计好的通信方式来进行通信的,本质上就是实现不同的进程能够访问到同一份资源,并从资源上获取信息。
- 比如存在进程A、B,其中A进程写入数据进入缓冲区,而B进程从这一块缓冲区中读取内容,这时A、B并没有之间接触,而是从中间商“缓冲区”处使得B进程获取信息。
- 同理,A、B进程可以从某一块缓冲区中读取数据。
2.管道
管道是一种基于文件系统的进程间通信方式,管道文件允许访问它的进程,通过它来对一块缓冲区的数据进行访问,通过进程对这一块缓冲区进行读写,来实现进程间数据的交换。
2.1.管道的通信原理
生活中,我们见到的管道,当有流形成时,这个管道在某一个时刻或者时间段都是只允许单向流通的,比如水管中水的流动一般都是单向的,我们也没有见过管道发挥功能时,先向左流再从右往左流吧……
进程间的管道通信也是如此,一般来说:管道这种通信方式是单向流动的
因为进程具有独立性,所以进程是不能够直接进行通信的,比如A,B进行通信,只允许进行A--中间渠道--B或者B--中间渠道--A ,所以管道的通信原理就是作为中间渠道,在操作系统中,系统实现管道的功能是通过加载进内存的文件缓冲区实现的,并没有实际对管道文件进行操作,而是通过A/B往缓冲区读/写内容,然后B/A进行读/写……
另外管道的通信具有以下4种情形(规定)和三种特性:
四种情形:
- 正常情况下,如果管道中没有数据,也就是写端当前没有写入时,读端必须等待,直到写端提供数据。
- 如果管道中的数据写满时,如果需要继续写入,写端必须等待,直到读端读取完数据,写端才可以继续写入。
- 写端关闭时,读端直接接收到read()函数的返回值为0,表示读取结束,读到文件结尾。
- 读端关闭时,写端不会直接关闭,如果写端仍不断写入,操作系统会介入杀掉写端进程。
三种特性:
- 管道是单向通信的,是一种半双工通信
- 管道是面向字节流的,也就是对应C++IO流中的字符流,管道可以是整型流、字符流
- 管道的生命周期是伴随进程的,因为管道通信的本质就是通过文件系统在内存中开辟一块缓冲区,来间接实现进程间通信的
基于4种情形和三种特性,操作系统实现了两种管道通信方式:匿名管道和命名管道,前者只能用于具有血缘关系的进程,后者能用于所有进程……
2.2.匿名管道
C语言提供创建匿名管道的函数方法:
接下来我们通过匿名管道的测试来探究一下其原理:
// 匿名管道的测试
void test1()
{
// 设置管道的文件描述符数组
int pipefd[2] = {0};
// 将fd传入pipe接收返回值
int n = pipe(pipefd);
// 返回值为3,4表示占用了文件指针数组第3个、第4个文件
cout << pipefd[0] <<" "<< pipefd[1] << endl;
int pipefd1[2] = {0};
int m = pipe(pipefd1);
cout << pipefd1[0] << " " << pipefd1[1] << endl;
}
通过这段代码的测试,我们发现除了0(stdin),1(stdout),2(stderr),我们在创造一个匿名管道时,会占用两个文件fd,在实际应用时,这两个文件分别负责读写功能,为什么需要这样设计呢?
首先我们要知道匿名管道只有拥有血缘关系的进程才可以使用的!!!
我们先从最简单的管道通信----父子进程通信出发:
如图这就是:匿名管道通信的原理,通过管道函数,在进程中开辟两个文件来实现读写功能,再通过进程的拷贝,实现对同一个文件的读写,最终各自释放一个读/写端,实现单向通信。
下面是一个父子进程的匿名管道通信样例:
// 父子通过匿名管道通信demo
// 只要能把文件描述符继承下去,就能够实现匿名管道通信
// 也就是可以进行兄弟、爷孙进程的管道通信
// 没有任何继承体系的进程之间无法使用匿名管道
void test2()
{
int pipefd[2] ={0};
// 将fd传入pipe接收返回值
int n = pipe(pipefd);
// 父子进程关闭各自不使用的fd
// 实现单向通信的管道
pid_t id = fork();
if(id < 0)
{
perror("error fork");
}
else if(id == 0)
{
// child
// 关闭读的指向
close(pipefd[0]);
int count = 3;
cout << "writing data into the buffer" << endl;
while(count--)
{
char mesg[BUFFSIZE];
cin >> mesg;
// 通过系统接口 将写入的数据通过 写 的文件接口进入文件缓冲区中
write(pipefd[1], mesg, strlen(mesg));
}
exit(0);
}
// father
// 关闭写的指向
close(pipefd[1]);
char buffer[BUFFSIZE];
while(true)
{
// 通过读接口把文件缓冲区的内容写入buffer中
// 读取buffer大小减1预留 \0 字符
ssize_t n = read(pipefd[0], buffer, sizeof(buffer) - 1);
if(n > 0)
{
buffer[n] = 0;
cout << "child wrote: " << buffer << " to father process " <<endl;
}
}
}
在这段代码demo中我们只实现了由子进程向缓冲区进行写功能,父进程向缓冲区进行读功能,那么我们能不能够实现双向通信呢?
答案是可以的,但是我们又要保证一个管道的流向是单向的,注意这里我们用的是“一个”,所以我们可以通过pipe函数再次创建一个管道文件,然后把子进程的写端关闭,父进程的读端关闭,再进行链接。即通过这个demo,加上逻辑相反的代码就可以实现了
2.3.命名管道
我们在匿名管道的学习中,了解到它的可行性是通过具有血缘关系的进程会拷贝同一个file_struct结构体的指针,来实现读写文件指向同一块区域的。但是对于不具有血缘关系,也就是完全不相干的两个进程我们该如何通信呢?
这时我们可以通过命名管道,创建FIFO文件来实现在不相关的进程之间进行通信……
// 创建命名管道fifo
int n = mkfifo(文件名, 文件权限);
// 文件返回值
int r_open = open(文件名, 文件打开方式);
一般来说我们使用命名管道,首先先创建命名管道,然后两个不同的进程再通过系统调用接口通过不同的打开方式(读/写)来打开这个管道文件。
下面我们用两个进程的交互来演示一下:
进程一:
#include <iostream>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#define FILENAME "fifo"
using namespace std;
int main()
{
// 创建命名管道fifo
int n = mkfifo(FILENAME, 0666);
// 文件返回值
int r_open = open(FILENAME, O_RDONLY);
char buffer[1024];
while (1)
{
ssize_t r_read = read(r_open, buffer, sizeof(buffer) - 1);
if (r_read > 0)
{
buffer[r_read] = 0;
cout << "recieve the message from client: " << buffer << endl;
}
}
close(r_open);
}
在这段代码中:
- 我们先创建了命名管道,接着通过只读方式打开文件
- 在死循环中,我们不断的读取打开文件返回的文件fd的内容,当r_read = 0时表示读端关闭,r_read > 0 时正常读取
- 读取后加载进我们设定好的buffer中,然后再打印出来
进程二:
#include <iostream>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#define FILENAME "fifo"
using namespace std;
int main()
{
int r_open = open(FILENAME, O_WRONLY);
string message;
while (1)
{
cout << "client send: ";
getline(cin, message);
ssize_t r_write = write(r_open, message.c_str(), message.size());
}
close(r_open);
}
这段代码中:
- 我们在上一个进程中调用创建好的管道文件,获得相同的文件fd,这里因为我们默认创建两个进程都在当前目录下,这里的本质就是最终找到fifo这个文件
- 我们往fifo形成的缓冲区中写入数据,当我们完成写入时,对应的上一个进程就会打印相同的内容
XShell中的现象:
这样我们就实现了两个互不相干的进程间的通信了……
2.4.基于匿名管道的进程池demo
2.4.1.进程池的相关引入
进程池是一种常见的多进程编程技术,用于优化资源使用和提高性能。它可以在程序启动时预先创建一定数量的进程,并将这些进程保存在池中以备后续使用。当有任务需要处理时,程序会从进程池中取出一个空闲的进程来处理任务,任务处理完毕后,该进程会被放回进程池中,等待下一个任务的到来。
如图我们通过父进程,创建五个子进程,在子进程创建的同时我们创建管道文件,进行父进程和子进程通过匿名管道的通信
当我们抽象出这一个模型图后,我们开始着手开辟5个管道和实现这个进程池……
// 创建5个子进程和实现5个管道
for (int i = 0; i < pipe_num; i++)
{
// 1.定义并创建管道
int pipefd[2];
int n = pipe(pipefd);
cout << "成功创建管道:" << i << endl;
assert(n == 0);
// 2.创建进程
pid_t id = fork();
assert(id != -1);
// 3.构建单向信道
if (id == 0)
{
// child
// 子进程关闭当前的 写 端
close(pipefd[1]);
exit(0);
}
// father
close(pipefd[0]);
}
这段代码,我们循环5次,创建管道,并链接父子进程,但是这一段代码实际上在进行循环时会出现bug,具体如图:
按照这个思路:最终我们发现在创造了5个子进程之后,最后一个子进程对应的file_struct会继承4个父进程的写端!!!这个bug虽然不影响我们的通信,但是会影响我们后续对写端的回收,终止这个进程池,这在我们后面的代码模块有具体讲解!!!
2.4.2.整体框架的分析
在上面部分内容,我们完成了进程池的创建,接下来就是代码对进程池逻辑的实现了,首先进程池通过父进程来管理5个子进程,当获取到任务时,首先通过父进程接收然后分配给子进程。接着子进程各自处理自己分配到的任务,任务完成后继续接收新的任务。
结合我们通过匿名管道来实现,我们初步设计成父进程作为写端通过匿名管道传输任务给子进程,然后子进程通过读端读取任务来进行任务的调用。那么我们就将整个框架设计为:
进程的创建 --- >管道的搭建--->管道间进程通信的管理--->任务内容的创建--->任务的发布--->子进程进行任务的处理--->资源释放
2.4.3.代码的实现
这一部分主要是代码的实现,因为篇幅过长并且代码中注释较为详细,我们通过2.4.2.这个篇章在结合代码内容就能大概理解这个demo
work.h
#pragma once
#include<iostream>
#include<functional>
#include<vector>
#include<ctime>
using namespace std;
// using task_t function<void()>;
typedef function<void()> task_t;
void Download()
{
cout << "执行下载任务" << " 通过子进程: "<< getpid() <<endl;
}
void PrintLog()
{
cout << "执行打印日志任务" << " 通过子进程: "<< getpid()<< endl;
}
void PushStream()
{
cout << "执行传输数据流任务" << " 通过子进程: "<< getpid()<< endl;
}
class Init
{
public:
Init()
{
tasks.push_back(Download);
tasks.push_back(PushStream);
tasks.push_back(PrintLog);
srand(time(nullptr) ^ getpid());
}
// 判断任务的可行性
bool CheckSafe(int code)
{
if(code >= 0 && code < tasks.size()) return true;
else return false;
}
void RunTask(int code)
{
// tasks数组中存放着可调用对象,通过()调用
return tasks[code]();
}
int SelectTask()
{
return rand() % tasks.size();
}
private:
// 任务列表
vector<task_t> tasks;
// 任务码 (在代码中并没有用上)
const int download_code = 0;
const int print_code = 1;
const int push_stream_code = 2;
};
// 定义全局对象
Init init;
main.cc:
#include <iostream>
#include <assert.h>
#include <vector>
#include <string>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "work.hpp"
using namespace std;
#define BUFFSIZE 1024
// a
const int pipe_num = 5;
// 判断是第几个管道
static int name_flag = 1;
class channel
{
public:
channel(int fd, pid_t id)
: _ctrlfd(fd), _workid(id)
{
_name = "channel-" + to_string(name_flag);
name_flag++;
}
int GetFd() const
{
return _ctrlfd;
}
pid_t GetId() const
{
return _workid;
}
string GetName() const
{
return _name;
}
private:
int _ctrlfd;
pid_t _workid;
string _name;
};
void ChildWork()
{
while (1)
{
int code = 0;
// 子进程只读,当父进程没有写入指令时,子进程无法工作
// 父进程写4个字节的数据 子进程读取4个字节
ssize_t n = read(0, &code, sizeof(code));
// 对应任务码
if (n == sizeof(code))
{
// n值正常
if (!init.CheckSafe(code))
continue;
init.RunTask(code);
}
else if (n == 0)
{
break;
}
}
cout << "子进程已退出" << endl;
}
void CreatChannels(vector<channel> &channels)
{
vector<int> fd_write;
for (int i = 0; i < pipe_num; i++)
{
// 1.定义并创建管道
int pipefd[2];
int n = pipe(pipefd);
cout << "成功创建管道:" << i << endl;
assert(n == 0);
// 2.创建进程
pid_t id = fork();
assert(id != -1);
// 3.构建单向信道
if (id == 0)
{
// 对于子进程来说 只要出现拷贝了父进程的写
// 就需要进行关闭,才能实现单向传输的管道
if (!fd_write.empty())
{
for (size_t j = 0; j < fd_write.size(); j++)
{
// 关闭我们插入数组内容
close(fd_write[j]);
cout << "process: " << getpid() << " close: " << fd_write[j] << endl;
}
}
// child
// 子进程关闭当前的 写 端
close(pipefd[1]);
// 重定向到标准输入
dup2(pipefd[0], 0);
ChildWork();
exit(0);
}
// father
close(pipefd[0]);
// 存储写对应的下标相对值
fd_write.push_back(pipefd[1]);
// 传入这个 写 对应的下标文件给channel
channels.push_back(channel(pipefd[1], id));
// 测试父进程的写文件
// cout<< pipefd[1] <<endl;
}
cout << "管道已全部创建,开始执行任务" << endl;
}
void SendCommand(const vector<channel> &channels, int flag = -1)
{
int position = 0;
while (1)
{
if (flag == 0)
{
break;
}
sleep(1);
// 开始选择任务
// 本质上就是获取任务码
int command = init.SelectTask();
// 分配进程
channel c = channels[position++];
position %= channels.size();
cout << "send command: " << command << " in " << c.GetName() << " by father:" << getpid() << endl;
// 发送任务
write(c.GetFd(), &command, sizeof(command));
flag--;
}
cout << "任务已完成" << endl;
}
void ReleaseChannel(const vector<channel> &channels)
{
for (const auto &e : channels)
{
// 关掉 父进程开辟的写端,注意这里的子进程
close(e.GetFd());
pid_t rid = waitpid(e.GetId(), nullptr, 0);
}
}
int main()
{
vector<channel> channels;
// 创建管道
CreatChannels(channels);
// 发送任务并执行
SendCommand(channels, 5);
// 解决子进程回收问题
ReleaseChannel(channels);
}