1. 管道概念
管道,是进程间通信的一种方式,在Linux命令中“ | ”就是一种管道,它可以,连接前一条命令,和后一条命令,把前面命令处理完的内容交给后面,例如
cat filename | grep hello
cat命令会把文件的内容打印在显示器上,而管道就把这些内容交给后面的 grep 命令,由 grep 处理过后再把内容打印到显示器上
这就是管道,这两个命令可以看作,两个进程,管道负责把内容传输由一方传给另一方
这也同时注定了,管道是一份共享的资源,被两个进程同时看到,并使用。
再之前的学习中,我们有没有两个进程共享同一份资源的呢?
有,fork创建子进程,父子进程同时打印数据到显示器上,这是不是也是一种资源的共享,这是利用文件系统,当父进程创建子进程,子进程会拷贝父进程的大部分内容,包括PCB(task_struct),代码和数据,以及文件描述符表files_struct,子进程会复制父进程对应的file*指针,这也就导致他们指向同一个文件,所以他们可以同时向屏幕写数据。
2. 匿名管道
基于上面的启发,对于父子进程之间,我们是不是也可以通过打开同一份文件,一方读,另一方写,进而达成通信的目的。
具体步骤:
1. 父进程先以读写的方式打开同一份文件
2. fork创建子进程
3. 父进程关闭读端,子进程关闭写端
这样就形成了一个管道,父进程负责向管道写数据,子进程负责向管道读数据
但是单纯是文件,还不行,因为文件会有IO操作,如果内存文件和磁盘进行读写,会大大降低效率,而管道通信,只需要一个临时的内存级文件,不需要向磁盘写入和读取,所以这就需要系统提供的pipe函数
pipe会创建一个管道,用于进程间通信,他会打开并创建一个内存级的临时文件,其中参数是一个输出型参数,需要自己定义一个数组,函数调用后,pipefd[0]是进程的读方式打开文件返回的fd,pipefd[1]是写端。返回值,0标识成功,-1标识失败。
这就是匿名管道。
我们可以发现匿名管道有以下几个特点:
- 匿名管道建立的基础是具有血缘关系的进程,来完成进程间通信,父子进程
- 匿名管道,所以他的生命周期随着进程,当没有进程再打开他,他就会被删除
- 管道的本质是文件,管道是基于文件的读写,read和write都是面向流的,面向字节流
- 管道是半双工的,数据只能向一个方向流动,需要双方通信时,需要建立起两个管道
- 管道还具有访问控制,通过让进程间协同,访问控制主要体现在:
- a. 当写快,读慢,管道被写满了就不会再写,也就是写的进程等待,本质是阻塞
- b. 当写慢,读快,管道没有数据时,就不会再读,也是阻塞式的等待
- c. 当写关闭,读0,读到0,read的返回值为0代表读到文件结尾
- d. 当读关闭,写继续写,OS就会终止写方的进程SIGPIPE
匿名管道间通信的实验:
process.cc
#include <iostream>
#include <cstdio>
#include <cstring>
#include <cstdlib>
#include <ctime>
#include <cassert>
#include <vector>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "task.hpp"
using namespace std;
#define Child_Process_Size 5
int command_wait(int fd, bool &flag)
{
uint32_t command = 0;
ssize_t sz = read(fd, &command, sizeof(command));
// cout << "command:" << command << endl;
// cout << "sz:" << sz << endl;
if (sz == 0)
flag = true;
else
assert(sz == sizeof(command));
return command;
}
void command_push(pid_t child_process, int fd, int command)
{
cout << "main:child_process" << child_process << "\tfd" << fd << "\tcommand" << command << endl;
ssize_t ret = write(fd, &command, sizeof(command));
// cout << ret << "!" << endl;
}
int main()
{
load();
vector<pair<pid_t, int>> slots;
for (int i = 0; i < Child_Process_Size; ++i)
{
// 建立管道
int pipefd[2] = {0};
int n = pipe(pipefd);
assert(n == 0);
(void)n;
pid_t id = fork();
assert(id != -1);
if (id == 0)
{
// child
// 关闭信道
close(pipefd[1]);
while (true)
{
bool flag = false;
int command = command_wait(pipefd[0], flag);
// cout << command << "#" << endl;
// printf("%p888888\n", callbacks[command]);
if (flag)
break;
if (command >= 0 && command < callbacks.size())
{
callbacks[command]();
}
else
{
cout << "非法command:" << command << endl;
}
}
exit(1);
// 执行command
}
// father
close(pipefd[0]);
slots.push_back(pair<pid_t, int>(id, pipefd[1]));
}
// 派发任务
srand((unsigned int)time(nullptr));
int time = 5;
随机派发
// while(time--)
// {
// int command = rand() % callbacks.size();
// int child = rand() % Child_Process_Size;
// command_push(slots[child].first, slots[child].second, command);
// sleep(1);
// }
指定派发
while (true)
{
cout << "******************************" << endl;
cout << "** 1. describe 2. command **" << endl;
cout << "********* 0. quit **********" << endl;
cout << "please cin your choice:";
int child = rand() % Child_Process_Size;
int command;
bool quit = false;
int choice = -1;
cin >> choice;
switch (choice)
{
case 1:
describe();
break;
case 2:
cout << "please cin command:";
cin >> command;
command_push(slots[child].first, slots[child].second, command);
break;
case 0:
quit = true;
break;
default:
cout << "please recin:";
break;
}
if (quit)
break;
}
for (const auto &e : slots)
{
close(e.second);
}
for (const auto &e : slots)
{
pid_t ret = waitpid(e.first, nullptr, 0);
assert(ret >= 0);
}
return 0;
}
task.hpp
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <unordered_map>
#include <functional>
#include <unistd.h>
using namespace std;
typedef std::function<void()> func;
unordered_map<int, string> desc;
vector<func> callbacks;
void mystrcpy()
{
cout <<"child process" << getpid() << "字符串拷贝" << endl;
}
void mystrstr()
{
cout <<"child process" << getpid() << "字符串匹配" << endl;
}
void mystrcmp()
{
cout <<"child process" << getpid() << "字符串比较" << endl;
}
void mystrlen()
{
cout <<"child process" << getpid() << "字符串计数" << endl;
}
void load()
{
desc.insert({callbacks.size(), "mystrcpy:字符串拷贝"});
callbacks.push_back(mystrcpy);
desc.insert({callbacks.size(), "mystrstr:字符串匹配"});
callbacks.push_back(mystrstr);
desc.insert({callbacks.size(), "mystrcmp:字符串比较"});
callbacks.push_back(mystrcmp);
desc.insert({callbacks.size(), "mystrlen:字符串计数"});
callbacks.push_back(mystrlen);
}
void describe()
{
for(const auto& e : desc)
{
cout << e.first << "\t" << e.second << endl;
}
}
3. 命名管道
命名管道(IFO)是基于匿名管道(pipe)的的更进一步,匿名管道只能在有血缘关系的进程间通信,而命名管道没有这个限制。
创建命名管道有两种方式:
a. 命令行
mkfifo name_fifo
命名管道文件属性为p,文件没有内容,也是内存级文件,他只有inode属性,没有data block数据块,数据都在内存
b. 函数
参数pathname文件路径,你需要把命名管道文件创建在那个路径下,mode文件属性,mode&~umask为最终文件属性,例如0666,最终文件属性664(rw_rw_r__),返回值0成功,-1失败
命名管道和匿名管道在使用上的区别,匿名管道由pipe函数创建并打开,命名管道由mkfifo函数创建,打开用open。
匿名管道的删除不需要额外动作,进程结束,管道就没了。但命名管道不同,命名管道需要删除,他是存在磁盘上的,只有属性没有内容。两种方式:
a. 命令行
rm name_fifo
unlink name_fifo
b. 函数
匿名管道和命名管道,本质都是文件,除了上面的一些区别,其他的使用都是一样的,而且也是具有访问控制,都是文件嘛!
关于命名管道的实验:
head.hpp
#pragma once
#include <iostream>
#include <string>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <cstdlib>
#include <ctime>
#include "log.hpp"
using namespace std;
#define SIZE 1024
#define MODE 0666
string ipcPath = "./fifo";
log.hpp
#pragma once
#include <ctime>
#include <iostream>
#include <string>
enum status
{
Debug,
Notice,
Warning,
Error
};
const std::string str[] = {
"Debug",
"Notice",
"Warning",
"Error"
};
std::ostream& log(std::string message, status level)
{
std::cout << str[level] << (unsigned int)time(nullptr) << ":" << message;
return std::cout;
}
client.cc
#include "head.hpp"
int main()
{
int fd = open(ipcPath.c_str(), O_WRONLY);
if(fd < 0)
{
perror("open");
exit(1);
}
string message;
while(true)
{
cout << "client[" << getpid() << "]:";
cin >> message;
write(fd, message.c_str(), message.size());
sleep(1);
}
close(fd);
return 0;
}
server.cc
#include "head.hpp"
void read_fifo(int fd)
{
char buffer[SIZE];
while(true)
{
memset(buffer, '\0', SIZE);
ssize_t sz = read(fd, buffer, SIZE - 1);
if(sz > 0)
{
buffer[sz] = '\0';
cout << "server[" << getpid() << "]:" << buffer << endl;
}
else if(sz == 0)
{
break;
}
}
}
int main()
{
int ret = mkfifo(ipcPath.c_str(), MODE);
if(ret == -1)
{
perror("mkfifo");
exit(1);
}
log("创建命名管道成功", Debug) << "step 1" << endl;
int fd = open(ipcPath.c_str(), O_RDONLY);
if(fd < 0)
{
perror("open");
exit(2);
}
log("以读方式打开管道成功", Debug) << "step 2" << endl;
for(int i = 0; i < 3; ++i)
{
int _fd = fork();
if(_fd < 0)
{
perror("fork");
exit(3);
}
if(_fd == 0)
{
//child
read_fifo(fd);
exit(1);
}
}
for(int i = 0; i < 3; ++i)
{
pid_t ret = waitpid(-1, nullptr, 0);
if(ret > 0)
{
string str = "等到子进程:";
string id = to_string(ret);
log(str+id, Notice);
}
}
close(fd);
log("关闭打开的管道", Debug) << "step 3" << endl;
unlink(ipcPath.c_str());
log("删除管道文件成功", Debug) << "step 4" << endl;
return 0;
}
完。