目录
为什么要进行进程间通信?
匿名管道的具体实现
pipe创建内存级文件形成管道
pipe的简单使用
匿名管道的四种情况和五种特性
四种情况
五种特性
PIPE_BUF
命令行管道 |
功能代码:创建进程池
为什么要进行进程间通信?
1.数据传输:一个进程需要将它的数据发送给另一个进程,比如我们有两个进程,一个负责获取数据,另一个负责处理数据,这时第一个进程就要将获取到的数据交给第二个进程
2.资源共享:多个进程间共享同样的资源
3.通知事件:一个进程需要给其他进程发送消息,通知他们发生了某种事件
4.进程控制:有些事件需要完全控制另一个进程,比如我们在使用gdb调试时,gdb就是一个进程,它控制了我们要调试的进程
进程之前是有互相传递信息的需求,但是进程之间又是独立的,一个进程不可能去另一个进程的地址空间中取信息,所以这就要求操作系统去提供一块交换数据的空间来供进程之间使用。
OS提供空间有不同的样式,这就有了不同的通信方式:
1.管道(分为匿名和命名)
2.共享内存
3.消息队列
4.信号量
那么我们就先来谈一谈匿名管道
匿名管道的具体实现
在谈之前,我们要有一些之前的知识作为理论基础,就是父进程创建子进程PCB和文件描述符表是要拷贝一份的,并且里边的值不会进行修改,就相当于浅拷贝;而管理文件的结构体对象不会拷贝。因为前者是跟进程相关的,而后者是跟文件系统相关的。我们把这段话用图来描述就是这样的:
通过这样的操作父子进程就可以看到同一块文件的缓冲区了,这样进程就可以读写了,但是两个文件由读又写容易发生混乱,所以我们一般关掉一个进程的读端,关掉另一个进程的写端,这样就实现了单向通信,就是因为它是单向通信,就像管道一样,所以这样的通信方式就被命名为管道。
pipe创建内存级文件形成管道
我们上面的操作是基于一个实实在在的磁盘文件的,我们必须得这样吗?肯定不是的,OS就提供了一个系统调用负责提供一个内存级的文件,它没有名字,只能通过文件描述符来访问,这个系统调用叫pipe()
它的参数是一个输出型参数,就是pipe这个函数把内存级文件的读写文件描述符放到这个数组中,我们来取来用。
并且,规定0下标放的是r方法,1下标放的是w方法。
由上面我们可以看出,匿名管道是只能具有血缘关系的进程之间使用,因为文件描述符表是要靠父进程创建子进程拷贝下来的。
pipe的简单使用
那么下面我们就写一段代码来验证上面所说的内容,并且演示管道究竟应该如何使用,下面的代码就是子进程往管道里写,父进程往管道里读
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <string.h>
void writer(int wfd)
{
const char *str = "i am child,this is the ";
int cnt = 0;
char buffer[128] = {0};
while (1)
{
snprintf(buffer, sizeof(buffer), "%s%d message", str, cnt);
write(wfd, buffer, strlen(buffer));
sleep(1);
cnt++;
}
}
void reader(int rfd)
{
while (1)
{
char buffer[1024] = {0};
read(rfd, buffer, sizeof(buffer));
printf("I am father,I get a message:%s\n", buffer);
}
}
int main()
{
int pipefd[2] = {0};
int n = pipe(pipefd);
if (n < 0)
return 1;
pid_t id = fork();
if (id == 0)
{
// 子进程负责w
close(pipefd[0]);
writer(pipefd[1]);
exit(0);
}
// 父进程负责r
close(pipefd[1]);
reader(pipefd[0]);
return 0;
}
匿名管道的四种情况和五种特性
有了上面的一些基本使用,下面我们来演示一下管道的四种情况以及说明五种特性
四种情况
第一种:管道中没有数据,并且子进程不关闭自己的写端,这时父进程会进行阻塞等待,直到管道中有数据
第二种:子进程一直写,父进程不读,但是父进程不关闭读端,当管道被写满时就要进行阻塞等待,直到管道中的数据被读出去才会继续写
我们就让子进程一次写一个字符,看看它一共能写多少个字符
这里printf如果不给换行的话一定要fflush,否则有的打印的东西会在缓冲区中打印不出来
我们可以看到最终是打印到了65536byte,正好是64kb,我们就可以推断出管道的大小是64kb
第三种:子进程不写了并且关掉了写端,这时读端读完了管道中的数据后,read的返回值就为0,这时我们就可以人为的退出了,这和第一种情况是不同的第一种情况是阻塞等待
我们让子进程写10秒就退出,read返回值为0父进程就退出
第四种:让写端一直写,但是读端不读并且关闭读端,这时的结果就是写端也会退出,因为没人读了写就没意义了。
至于说写端是如何退出的呢?其实是收到了退出信号,我们也可以通过wait的方式来看一下退出信号是什么
我们让写端一直写,读端读5秒后退出,然后通过wait的方式获取子进程(写端)的退出信号
五种特性
通过上面的一些介绍,我们就可以总结出管道的五种特性:
1.自带同步机制:写满了就不写了,等待读,等待它们之间的同步,读不到就不读了,等待写
2.具有血缘关系的进程间进行通信
3.pipe是面向字节流的:我可以一个字符一个字符的写,同时可以一下读很多个字节,就是说读的次数和写的次数之间是没有关系的,它们是面向管道中的数据的
4.进程退出,管道自动释放,文件的生命周期是随着进程的
5.管道只能单向通信,就是一个写,一个读,这也叫半双工
PIPE_BUF
PIPE_BUF是一个常量,它是由大小的
就是说:每次写入管道的字节数如果小于这个值,那么就认为本次写入是原子的(安全的),就是保证内容是完整的,不会被分割
命令行管道 |
之前我们说的命令 | ,本质上就是这篇博客说的pipe
比如
就是同时创建三个进程,两个进程之间创建好管道,第一个进程的输出当作第二个进程的输入,第二个进程的输出当作第三个进程的输入,最终效果是睡眠三秒
功能代码:创建进程池
进程池就是一次创建多个进程,然后父进程负责分发任务给各个子进程。各个子进程处理完一个任务后还可以处理下一个任务,而不需要创建新的进程,这样就减少了创建和销毁进程的开销。
我们之前写bash的时候就是有一个任务bash就创建子进程,然后子进程进行进程程序替换,执行完后就退出了。
我们今天写的呢就是创建子进程后子进程一直等待父进程的命令然后执行任务,执行完任务后继续等待。
//task.hpp
#include <iostream>
#include <unistd.h>
#include <cstring>
#include <cerrno>
#include <cstdlib>
#include <vector>
#include<ctime>
using namespace std;
typedef void(*task)();
void task1()
{
cout<<"do task one successfully"<<endl;
}
void task2()
{
cout<<"do task two successfully"<<endl;
}void task3()
{
cout<<"do task three successfully"<<endl;
}
task tasks[3]={task1,task2,task3};
//processpool.cc
#include "task.hpp"
void Usage(char *argv)
{
cout << "please input : " << argv << " and processnum\n";
}
enum error
{
USAGE_failed = 1,
PIPE_failed,
};
class channel
{
public:
channel(int wfd, int id, int n)
: _wfd(wfd), _id(id), _name("channel-" + to_string(n))
{}
void print()
{
cout << "name is " << _name << "id is " << _id << " wfd is " << _wfd << endl;
}
int wfd()
{
return _wfd;
}
~channel() {}
private:
int _wfd;
pid_t _id;
string _name;
};
class processpool
{
public:
processpool(int size)
: _size(size)
{
for (int i = 1; i <= size; i++)
{
int pipefd[2] = {0};
int ret = pipe(pipefd);
if (ret == -1)
{
cout << "pipe failed : errno is " << errno << "error describe is " << strerror(errno) << endl;
exit(PIPE_failed);
}
pid_t id = fork();
if (id == 0)
{
// 子进程
for (int j = pipefd[0] + 1; j <= pipefd[1]; j++)
{
close(j);
}
// 等待任务
while (1)
{
int buffer = 0;
int n = read(pipefd[0], &buffer, sizeof(buffer));
if (n != 0)
{
cout << "child" << i << " " << getpid() << " ";
tasks[buffer]();//执行任务
}
if (n == 0)
break;
}
exit(0);
}
channels.push_back({pipefd[1], id, i});
close(pipefd[0]);
}
}
void print()
{
for (auto &e : channels)
{
e.print();
}
}
void get_wfd(vector<int> &f)
{
for (auto &e : channels)
{
f.push_back(e.wfd());
}
}
private:
vector<channel> channels;
int _size;
};
void give_task(vector<int> &wfd)
{
int n = wfd.size();
for (int i = 0; i < 100; i++)
{
int tasknum = rand() % (sizeof(tasks) / sizeof(tasks[0]));
write(wfd[i % n], &tasknum, sizeof(tasknum));//按顺序选择管道,派发随机任务
sleep(1);
}
}
int main(int argc, char *argv[])
{
srand((unsigned int)time(nullptr));
if (argc != 2)
{
Usage(argv[0]);
return USAGE_failed;
}
int processnum = stoi(argv[1]);
processpool pool(processnum);//创建进程池
vector<int> wfd;
pool.get_wfd(wfd);//都可以去给哪个文件描述符给任务
give_task(wfd);//发送任务
return 0;
}