一.什么是通信
进程间通信(Inter-Process Communication,IPC),是指在操作系统中,不同进程之间进行数据交换和同步的机制。由于每个进程通常拥有独立的内存空间,进程间无法直接访问对方的内存,因此需要通过特定的机制来实现通信和协作。
二.为什么要进程间通信
- 数据传输:一个进程需要将它的数据发送给另一个进程
- 资源共享:多个进程之间共享同一份资源
- 通知事件:一个进程需要向一个或一组进程发送消息,通知它(它们)发生了某种时间(如进程终止需要通知父进程)
- 进程控制:有些进程希望完全控制另一个进程的执行,此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变
三.怎么进行进程间通信
因为进程具有独立性,所以进程无法互相访问对方的内存空间,所以为了让进程可以互相通信,前提就是让进程看到同一份资源,也就是同一份内存空间。而且这个内存空间不是由任意一个通信的进程提供的!
而在进程间通信刚被需要时,并没有独立为其开发方案和代码,而是在已有的Linux系统上,利用目前的系统特性,结合文件系统,开发出来一套基于文件系统的进程间通信方案——管道(pipe)。
四.匿名管道
匿名管道主要用于父子进程之间的通信。
我们知道,父进程在创建子进程后,子进程会拷贝父进程的pcb、页表、文件描述符表等等...
所以,当父进程打开了一个文件之后,再创建子进程,此时子进程和父进程都可以看到同一个内存资源——打开的文件。但是我们说了两个进程看到的资源不能是通信的进程提供的。所以为了实现通信,我们就有了管道的概念。
而起始我们在命令行操作中,早已经了解了管道操作:
ls | wc -l
我们执行的命令运行起来就是进程,而我们上面的操作起始就是进程间的通信。ls进程将自己获取信息通过管道交给了wc进程,由wc进程来统计文件数量。
0x1.管道的创建
这个管道是OS单独设计的内存级资源。 当我们创建管道时,是由操作系统打开的,其系统调用为:通过pipe创建的管道叫做匿名管道
#include <unistd.h>
/* On all other architectures */
int pipe(int pipefd[2]);
RETURN VALUE
On success, zero is returned. On error, -1 is returned, errno is set appropriately, and pipefd is left unchanged.
当我们创建一个管道文件,默认是以读写方式打开的,会返回两个文件描述符分别对应写和读,3对应写读,4对应写。
#include <iostream>
#include <unistd.h>
int main()
{
int pipes[2] = {0};
int n = pipe(pipes);
if(n < 0) std::cerr << "pipe error" << std::endl;
std::cout << "pipes[0]:" << pipes[0] << std::endl;
std::cout << "pipes[1]:" << pipes[1] << std::endl;
return 0;
}
0x2.构建通信信道
匿名管道用来父子进程之间的通信,现在有了管道,下一步就是创建子进程,并构建通信信道。
创建子进程后,子进程会拷贝父进程的pcb和文件描述符表,所以此时子进程也拿到管道的访问权了。
接着便是构建通信信道,而在父子进程通信遵从的是单向通信。即一个写,一个读。所以我们要关闭父子进程不需要的端口。
自此,我们就构建好了一个通信信道,此时父子进程就可以进程通信。但是在这个过程中,父进程只能写,而子进程只能读,不能被修改。
0x3.测试父子进程间的通信
要求:创建父子进程间单向通信,父进程读,子进程写。
#include <iostream>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
void ChildWrite(int wfd)
{
char buffer[1024] = {0};
int cnt = 0;
while (true)
{
snprintf(buffer, sizeof(buffer), "i am child: pid->%d, cnt->%d", getpid(), cnt++);
write(wfd, buffer, strlen(buffer));
sleep(1);
}
}
void FatherRead(int rfd)
{
char buffer[1024] = {0};
while (true)
{
ssize_t n = read(rfd, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
std::cout << "child say:" << buffer << std::endl;
}
}
}
int main()
{
// 1.打开管道文件,一般fds[0]:read, fds[1]:write
int fds[2] = {0};
int n = pipe(fds);
// std::cout << "fds[0]:" << fds[0] << std::endl;
// std::cout << "fds[1]:" << fds[1] << std::endl;
if (n < 0)
{
std::cerr << "pipe error" << std::endl;
return 1;
}
// 2.创建子进程并构建单向通信信道->father r, child w
pid_t id = fork();
if (id == 0)
{
// child
close(fds[0]);
// operate
ChildWrite(fds[1]);
close(fds[1]);
exit(1);
}
// father
close(fds[1]);
// operate
FatherRead(fds[0]);
waitpid(id, nullptr, 0);
close(fds[0]);
return 0;
}
说明:构建通信信道之后,我们让子进程写,父进程读。子进程每个一秒向管道里写内容,而父进程一直读管道。我们看结果:
虽然我们一直在读,但是因为写有间隔,结果依旧是一秒打印一条消息。为什么读与写的时间相匹配?
当管道里面没有内容的时候,再读文件内容的话,此时读的进程就会阻塞住。直到管道内有内容。 这就是管道的同步机制。
0x4.管道的5种特性与4种通信情况
5种特性:
1、匿名管道只能用于具有亲缘关系的进程间的通信(常用于父子进程)
2、管道文件自带同步机制
正如上面的测试代码,读和写的频率是不同,当写的慢,读的快时,读就会阻塞住;当写的快,读的慢时,管道文件满了,写端就会阻塞住。
当我们让写的快,读的慢时:
说明: 写的快,一下子就有可能将管道文件写满,也有可能到了读的时候,此时就只能等待读端读取管道文件内容,否则无法进行下一次写入。所以读一次管道文件就会带出很多的内容。
3、管道是面向字节流的
- 管道文件是面向字节流的,这意味着管道不关心数据的结构和类型,只负责将字节从一个进程传递到另一个进程
- 写入管道的数据被视为连续的字节流,没有明确的消息边界,也就是说,写入与读取无关,读取的结果可能与写入有偏差,取决于我想怎么读。
4、管道是单向通信的
数据只能从写端传递到读端。管道是一种特殊的半双工,因为其写端和读端不能交换。
任意时刻,一个发,一个收——半双工
任意时刻,可以同时收发——全双工
5、管道文件的声明周期是随进程的。进程结束,管道就会被OS回收。
4种通信情况:
1、写慢,读快——读端就要阻塞等待写端写入
2、写快,读慢——当管道文件被写满了,此时写端就要阻塞等待读端读取文件内容,之后才能继续写入
3、写关,继续读——read就会读到返回值为0,表示文件结尾
void ChildWrite(int wfd)
{
char buffer[1024] = {0};
int cnt = 10;
while (cnt)
{
snprintf(buffer, sizeof(buffer), "i am child: pid->%d, cnt->%d", getpid(), cnt--);
write(wfd, buffer, strlen(buffer));
sleep(1);
}
}
void FatherRead(int rfd)
{
char buffer[1024] = {0};
while (true)
{
// sleep(5);
ssize_t n = read(rfd, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
std::cout << "child say:" << buffer << std::endl;
}
else if(n == 0)
{
std::cout << "n:" << n << std::endl;
sleep(1);
}
}
}
说明:我们只写十次,随后让子进程关闭写端,读端此时在读时就会拿到返回值0,表示读到了文件结尾。
4.读关,写继续
写端写入没有任何意义,数据不会被其他进程所使用,白白占用了资源,而OS系统就是做资源管理的,所以不会让别人浪费资源,所以操作系统会杀死进程,通过发送异常信号SIGPIPE
...
void FatherRead(int rfd)
{
char buffer[1024] = {0};
int cnt = 5;
while (cnt--)
{
// sleep(5);
ssize_t n = read(rfd, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
std::cout << "child say:" << buffer << std::endl;
}
else if(n == 0)
{
std::cout << "n:" << n << std::endl;
sleep(1);
}
}
}
...
// 关闭读端
close(fds[0]);
int status = 0;
waitpid(id, &status, 0);
std::cout << "exit code->" << ((status >> 8) & 0xFF) << "exit signal->" << (status & 0x7F) << std::endl;
说明:一直写,而只读5次,接着关闭读端,此时根据我们所说,操作系统会通过信号杀死进程。而此时我们再等待子进程时,获取其退出状态。我们前面还说过,当子进程接收到信号异常结束后,此时的退出码是无效的——0,而退出信号就应该对应——SIGPIPE
五.进程池
了解了上面的匿名管道实现父子进程间单向通信之后,我们通过该机制设计一个进程池——提前创建出多个子进程,等到要使用子进程执行任务时,直接从进程池中找进城去执行,而不用在通过fork创建子进程,从而减少进程创建和销毁的开销,提高系统资源利用率和任务处理效率。
其实就类似于内存池的概念,为了避免频繁的在堆上申请空间,可以先开辟一大块内存,再想申请空间时,就直接从内存池拿!
1.先描述在组织
进程池本质上是为了完成我们给出的任务,而任务我们要怎么给进程呢?通过管道!
所以我们管理进程池本质上就是在管理一个一个的通信信道。而管理就得先描述在组织!
组织信道我们额外创建一个类并用数组的方式保存所有的信道。
而对于进程池来说,他就是对所有的信道进行任务发送,所以它的成员就是所有的信道!
// 先描述——通信信道
class Channel
{
public:
Channel(){}
~Channel(){}
private:
};
// 在组织——管理信道
class ChannelManager
{
public:
ChannelManager(){}
~ChannelManager(){}
private:
std::vector<Channel> _channels;
};
// 进程池
class ProcessPool
{
public:
ProcessPool(){}
~ProcessPool(){}
private:
ChannelManager _cm;
};
2.初始化进程池
初始化进程池其实就是创建通信信道,并将所有的通信信道与对应的子进程关联起来。我们最终想要达到的目的应该是父进程可以通过多个不同的文件描述发送任务,即写任务到管道文件中,让对应的子进程读取任务,然后执行!
我们默认创建5个子进程的进程池:
// 先描述——通信信道
class Channel
{
public:
Channel(int wfd, int id)
:_wfd(wfd)
,_id(id)
{
_ref = "channel-" + std::to_string(_wfd) + "-" + std::to_string(_id);
}
~Channel(){}
private:
int _wfd; // 信道号
int _id; // 对应子进程
std::string _ref; // 信道描述信息
};
// 在组织——管理信道
class ChannelManager
{
public:
ChannelManager(){}
~ChannelManager(){}
void manage(int wrd, int id)
{
_channels.emplace_back(wrd, id);
}
private:
std::vector<Channel> _channels;
};
const int DefaultProcessPoolCount = 5;
// 进程池
class ProcessPool
{
public:
ProcessPool(int count = DefaultProcessPoolCount )
:_process_count(count)
{}
~ProcessPool(){}
bool InitPool()
{
for (int i = 0; i < _process_count; ++i) // 循环创建多个子进程与管道文件的对应关系
{
// 1.打开管道文件
int pipefd[2] = {0};
int n = pipe(pipefd);
if (n < 0)
{
std::cerr << "pipe error" << std::endl;
return -1;
}
// 2.创建子进程,并构建单向通信信道 -> fathrer write, child read
pid_t id = fork();
if (id < 0)
{
std::cerr << "fork errror" << std::endl;
}
else if (id == 0)
{
// child
close(pipefd[1]);
Work(pipefd[0]); // 执行任务
close(pipefd[0]);
exit(0);
}
else
{
// father
close(pipefd[0]);
// 3.将创建出来的信道与子进程关联
_cm.manage(pipefd[1], id);
}
}
// 测试代码
#ifdef DEBUG
_cm.debug();
sleep(10);
#endif
return true;
}
private:
ChannelManager _cm;
int _process_count; // 进程池中的子进程数
};
3.任务管理器
我们想让子进程根据我们所写入到管道文件中的任务码来执行对应的任务。我们可以借助哈希表来存储对应的任务码和任务函数。但是哈希表无法存储函数,所以我们要用包装器对函数进行包装,使之成为统一的类型,以被哈希表存储。
我们分别设计以下方法,用来进行任务的管理等操作:
- 注册任务:用来将对应的任务码和对应的函数存储起来。
- 获取任务码:我们这里采用随机值获取任务码来模拟发送不同任务的场景
- 执行任务:执行任务的逻辑其实就是访问对应任务码的value。
// task.hpp
#pragma once
#include <iostream>
#include <string>
#include <ctime>
#include <functional>
#include <unordered_map>
// task for test
void PrintLog()
{
std::cout << "task for 打印日志" << std::endl;
}
void DownLoad()
{
std::cout << "task for 下载" << std::endl;
}
void UPLoad()
{
std::cout << "tas for 上传" << std::endl;
}
class TaskManager
{
public:
TaskManager()
{
srand(time(nullptr));
}
~TaskManager() {}
void Register(int code, std::function<void()> task)
{
_taskCatalog[code] = task;
}
int taskcode()
{
int r = rand();
return r % _taskCatalog.size();
}
void Exectue(int taskcode)
{
_taskCatalog[taskcode]();
}
private:
std::unordered_map<int, std::function<void()>> _taskCatalog;
};
4.选择与执行任务
有了任务管理器之后,我们的进程池还应该有一个任务管理器成员,用来选择任务执行任务等操作。另外我们在初始化进程池时还应该注册对应的任务!!!
class ProcessPool
{
public:
ProcessPool(int n = defaultPoolCount)
:_process_count(n)
{
// 注册任务
_tm.Register(0, PrintLog);
_tm.Register(1, DownLoad);
_tm.Register(2, UPLoad);
}
...
private:
ChannelManager _cm;
TaskManager _tm;
int _process_count;
};
选择任务分为两部分,一是选择任务,二是选择子进程
0x1.选择任务
选择任务非常简单,我们只需要调用对应的tm的获取任务码的方法即可。
0x2.选择子进程
有了任务之后,我们还得选择让那个子进程去执行。因为我们有多个子进程,所以我们肯定得平衡每个子进程执行任务的次数,不能让某个子进程忙死,其他子进程闲死。
我们可以采用轮询、随机数、channel负载指标等来控制。
这里我们采用轮询的方式来选择。
轮询即第一次使用第一个子进程,第二次使用第二个,一次类推。我们只需要使用一个int变量来记录当前要使用的子进程即可,为了避免该变量越界,所以我们还要采取取模的方式!
0x3.发布任务
选择好任务和子进程后,我们接下来就得发布任务了。而发布任务其实就是将对应的任务码写入到对应通信信道中。
// channel
...
void NotifyTask(int taskcode)
{
int n = write(_wfd, &taskcode, sizeof(taskcode));
(void)n; // 这里定义了一个没有使用的变量,这样是为了避免warnning
}
...
// channelmanager
...
Channel& Select()
{
auto& c = _channels[_next];
_next++;
_next %= _channels.size();
return c;
}
...
private:
std::vector<channel> channels;
int _next;
// processpool
...
// 选择任务,选择信道
void RunTask()
{
// 1.选择任务
int taskcode = _tm.taskcode();
std::cout << "任务已选择..." << std::endl;
// 2.选择信道
auto& channel = _cm.Select();
std::cout << "信道已选择--->" << channel.Ref() << std::endl;
// 3.发布任务
channel.NotifyTask(taskcode);
std::cout << "任务已通知..." << std::endl;
}
...
0x4.执行任务
当我们在发布任务之前,所有没有执行任务的子进程都应该处于read的阻塞等待中。因为我们一直没有写,相当于写的慢读的快。
当我们一写入,子进程就可以从对应的管道中读取任务码,接下来就是执行任务了。而执行任务我们已经在任务管理器中定义了,所以这里只需要获取任务码,调用对应的执行函数就行了。
// processpool
...
// 执行任务
void Work(int rfd)
{
int code = 0;
while (true)
{
ssize_t n = read(rfd, &code, sizeof(code));
// std::cout << "n:" << n << std::endl;
if (n > 0)
{
if(n != sizeof(code)) // 如果读到的不是一个整数表明非法任务,重新等待
continue;
std::cout << "开始执行任务" << std::endl;
_tm.Exectue(code);
}
else if (n == 0)
{
std::cout << "读到文件末尾" << std::endl;
break;
}
else
{
std::cerr << "读取错误" << std::endl;
break;
}
}
}
...
5.进程池的销毁
我们销毁进程池其实就是销毁所有的信道以及回收对应的子进程。而销毁信道我们之前说过,如果一个管道只剩写端,没有读端,此时是没有意义的,OS会将该管道关闭,所以关闭信道其实只需要让子进程关闭对应的读端即可。
回收子进程我们只需要使用waitpid系统调用即可。
// channel
...
void closewfd()
{
int n = close(_wfd);
(void)n;
}
void waitid()
{
int n = waitpid(_id, nullptr, 0);
if(n < 0) std::cerr<<"wiat error" << std::endl;
}
...
// channelmanager
...
void DestroyChannel()
{
for(auto& channel : _channels)
{
channel.closewfd();
std::cout << channel.Ref() << "closed !" << std::endl;
}
}
void RecycleProcess()
{
for(auto& channel : _channels)
{
channel.waitid();
std::cout << channel.Ref() << "recycled !" << std::endl;
}
}
...
// processpool
...
// 销毁进程池
void Destroy()
{
// 1.关闭所有信道
_cm.DestroyChannel();
// 2.回收子进程
_cm.RecycleProcess();
}
...
自此,我们就实现了基于匿名管道的进程池。