目录
前言
进程间通信目的
管道通信
原理
匿名管道
测试样例
情况与特点
模拟进程池
命名管道
全部代码
前言
两个进程之间可以进行数据的直接传递吗?——不可以,进程必须得具备独立性。
进程间通信目的
- 数据传输:一个进程需要将它的数据发送给另一个进程
- 资源共享:多个进程之间共享同样的资源。
- 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
- 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
管道通信
原理
让不同进程看到同一份资源有很多种做法,现在我们来学习其中的一种:管道通信
我们让进程以写的方式打开文件后再以读的方式打开文件,虽然是打开同一个文件,但在文件描述符中是相当于2个不同的文件对象的~不过二者指向同一处内存,因为代码和数据是一起的~当我们创建子进程时会拷贝父进程相关的数据与代码,这里面就包括文件描述符~但并不会拷贝其打开的文件,所以最终子进程会和父进程共同指向文件~相当于浅拷贝~这种让不同进程看到同一份资源的方式叫做管道通信,很好利用了父子进程间关系的一种通信方式,而接下来我们就可以实现进程间的数据传输~例如:父进程只负责读取文件(关闭父进程写通道),子进程负责写入文件(关闭子进程读通道)~ps:struct file是允许有多个指针指向的,它采用的是引用计数的方式,只有自减为0才会关闭文件
匿名管道
利用函数pipe可以做到匿名管道的效果:不再需要把数据从磁盘中加载进入内容,不需要向磁盘中刷新数据,且磁盘中并不存在的文件,它是属于内存级别的文件~测试样例
#include <stdio.h> #include <string.h> #include <stdlib.h> #include <unistd.h> #include <sys/types.h> #include <sys/wait.h> //写函数 void writer(int wfd) { const char *str = "hello father, I am child"; char buffer[128]; //记录写几次 int cnt = 0; pid_t pid = getpid(); while(1) { //把字符串都写入到buffer中 snprintf(buffer, sizeof(buffer), "message: %s, pid: %d, count: %d\n", str, pid, cnt); //把buffer的内容写到文件描述符为4的位置 write(wfd, buffer, strlen(buffer)); cnt++; sleep(1); } } void reader(int rfd) { char buffer[1024]; while(1) { //从文件描述符为3的位置读取内容 ssize_t n = read(rfd, buffer, sizeof(buffer)-1); (void)n; printf("father get a message: %s", buffer); } } int main() { // 1. int pipefd[2]; int n = pipe(pipefd); if(n < 0) return 1; printf("pipefd[0]: %d, pipefd[1]: %d\n", pipefd[0]/*read*/, pipefd[1]/*write*/); // 3, 4 // 2. pid_t id = fork(); if(id == 0) { //child: w 子进程负责写 close(pipefd[0]); writer(pipefd[1]); exit(0); } // father: r 父进程负责读 close(pipefd[1]); reader(pipefd[0]); wait(NULL); return 0; }
情况与特点
//写函数
void writer(int wfd)
{
const char *str = "hello father, I am child";
char buffer[128];
//记录写几次
int cnt = 0;
pid_t pid = getpid();
while(1)
{
//在不关闭写端fd的情况下让子进程休眠10s再写
sleep(10);
//把字符串都写入到buffer中
snprintf(buffer, sizeof(buffer), "message: %s, pid: %d, count: %d\n", str, pid, cnt);
//把buffer的内容写到文件描述符为4的位置
write(wfd, buffer, strlen(buffer));
cnt++;
}
}
管道内部没有数据且子进程并没有关闭写端fd的情况下,父进程(读端)会一直阻塞等待,直到pipe有数据。——父进程作为读端会一直读取子进程写端写入的数据,没写则一直等待~
void reader(int rfd)
{
char buffer[1024];
while(1)
{
//在不关闭读端fd的情况下让父进程休眠100s再读
sleep(100);
//从文件描述符为3的位置读取内容
ssize_t n = read(rfd, buffer, sizeof(buffer)-1);
(void)n;
printf("father get a message: %s", buffer);
}
}
管道内部数据被写满且父进程并没有关闭读端fd的情况下,子进程(写端)写满后就会阻塞等待。——子进程作为写端是无法一直写入数据的,管道空间有限~
//写函数
void writer(int wfd)
{
const char *str = "hello father, I am child";
char buffer[128];
//记录写几次
int cnt = 0;
pid_t pid = getpid();
while(1)
{
/* //在不关闭写端fd的情况下让子进程休眠10s再写
sleep(10); */
/* //把字符串都写入到buffer中
snprintf(buffer, sizeof(buffer), "message: %s, pid: %d, count: %d\n", str, pid, cnt);
//把buffer的内容写到文件描述符为4的位置
write(wfd, buffer, strlen(buffer)); */
char c = 'A';
write(wfd,&c,1);
cnt++;
//写入一些数据后就退出
printf("cnt: %d\n", cnt);
if(cnt==10) break;
}
//关闭子进程写端
close(wfd);
}
void reader(int rfd)
{
char buffer[1024];
while(1)
{
/* //在不关闭读端fd的情况下让父进程休眠100s再读
sleep(100); */
sleep(1);
//从文件描述符为3的位置读取内容
ssize_t n = read(rfd, buffer, sizeof(buffer)-1);
if(n>0)
{
printf("father get a message: %s, n:%ld\n", buffer,n);
}
else if(n == 0)
{
printf("read pipe done, read file done!\n");
break;
}
else
break;
}
}
当写端写入数据后突然不写了并且关闭写端fd(防止再次写入数据)时,读端会将管道内的数据读完,最后会读到read的返回值为0,这也代表读到了数据的末尾。把数据读完了——写端不再写入数据时读端会获取到read函数的返回值,当获取到0时也意味着读取结束了。
//写函数
void writer(int wfd)
{
const char *str = "hello father, I am child";
char buffer[128];
//记录写几次
int cnt = 0;
pid_t pid = getpid();
while(1)
{
/* //在不关闭写端fd的情况下让子进程休眠10s再写
sleep(10); */
/* //把字符串都写入到buffer中
snprintf(buffer, sizeof(buffer), "message: %s, pid: %d, count: %d\n", str, pid, cnt);
//把buffer的内容写到文件描述符为4的位置
write(wfd, buffer, strlen(buffer)); */
sleep(1);
char c = 'A';
write(wfd,&c,1);
cnt++;
//写入一些数据后就退出
printf("cnt: %d\n", cnt);
//if(cnt==10) break;
}
//关闭子进程写端
//close(wfd);
}
void reader(int rfd)
{
char buffer[1024];
int cnt = 10;
while(1)
{
/* //在不关闭读端fd的情况下让父进程休眠100s再读
sleep(100); */
//从文件描述符为3的位置读取内容
ssize_t n = read(rfd, buffer, sizeof(buffer)-1);
if(n>0)
{
printf("father get a message: %s, n:%ld\n", buffer,n);
}
else if(n == 0)
{
printf("read pipe done, read file done!\n");
break;
}
else
break;
cnt--;
if(cnt == 0) break;
}
close(rfd);
printf("read endpoint close!\n");
}
当读端不再读取数据且关闭了读端fd时,OS就会直接终止写入的进程(子进程),通过信号13(SIGPIPE)杀死进程~
以上情况分别呈现了5种特性:
- 自带同步机制
- 血缘关系进程进行通信,常见父与子
- pipe是面向字节流的
- 父子退出,管道自动释放,文件的生命周期是随进程的
- 管道只能单向通信
模拟进程池
当我们想让更多的进程与进程之间进行分工合作时,我们可以利用父子进程这种特殊的管道关系来构建一个进程池以达到分工的目的~
把整体框架搭好~#include <iostream> #include <string> #include <cstdlib> #include <vector> #include <unistd.h> #include <ctime> using namespace std; enum { UsageError = 1, ArgError, PipeError }; //提示正确的命令行输入格式 void Usage(const std::string&s) { cout<<"Usage:"<<s<<"subprocess-num"<<endl; } //假设我们构建进程池中有5个管道~ // ./processpool 5 int main(int argc, char* argv[]) { //如果写入的命令行参数个数不到2个 if(argc!=2) { Usage(argv[0]); return UsageError; } //获取管道个数 int sub_process_num = std::stoi(argv[1]); //确保为多管道 if(sub_process_num<=0) return 1; //1.遍历创建通信管道与子进程 for(int i = 0;i<sub_process_num;i++) { //设置管道 int pipefd[2]{0}; //获取返回值 int n = pipe(pipefd); if(n<0) return PipeError; //创建子进程 pid_t id = fork(); if(id==0) { //child //子进程作为写端,关闭读端 close(pipefd[1]); exit(0); } //father 父进程作为读端,关闭写端 close(pipefd[0]); } //2.控制子进程完成任务 //3.回收子进程 return 0; }
目前我们要解决的就是每次在创建子进程时管道的独立性问题,要想管理多个通信管道——先描述,再组织~
using namespace std; enum { UsageError = 1, ArgError, PipeError }; //描述通信管道的类 class channel { public: //构造函数 channel(int wfd,pid_t sub_process_id,string name) :_wfd(wfd),_sub_process_id(sub_process_id),_name(name) {} //打印管道信息 void PrintDebug() { cout<<"_wfd: "<<_wfd; cout<<"_sub_process_id: "<<_sub_process_id; cout<<"_name: "<<_name<<endl; } ~channel() { } private: int _wfd; //子进程的端口 pid_t _sub_process_id;//所属子进程的pid string _name; //管道独立名字 } //提示正确的命令行输入格式 void Usage(const std::string&s) { cout<<"Usage:"<<s<<"subprocess-num"<<endl; } //假设我们构建进程池中有5个管道~ // ./processpool 5 int main(int argc, char* argv[]) { //如果写入的命令行参数个数不到2个 if(argc!=2) { Usage(argv[0]); return UsageError; } //获取管道个数 int sub_process_num = std::stoi(argv[1]); //确保为多管道 if(sub_process_num<=0) return 1; //组织管道 vector<channel> vc; //1.遍历创建通信管道与子进程 for(int i = 0;i<sub_process_num;i++) { //设置管道 int pipefd[2]{0}; //获取返回值 int n = pipe(pipefd); if(n<0) return PipeError; //创建子进程 pid_t id = fork(); if(id==0) { //child //子进程作为写端,关闭读端 close(pipefd[1]); exit(0); } //记录该管道 string cname = "channel"+to_string(i); //把管道组织起来,以匿名对象作插入值 vc.push_back(channel(pipefd[1],id,cname)); //father 父进程作为读端,关闭写端 close(pipefd[0]); //整理完毕后添加该管道信息 } //2.控制子进程完成任务 //3.回收子进程 return 0; }
组织完毕,接下来就可以开始控制子进程了~
//2.控制子进程完成任务 for(auto& e:vc) { e.PrintDebug(); }
//描述进程池的类,管理进程池 class ProcessPool { public: ProcessPool(int sub_process_num) : _sub_process_num(sub_process_num) { } int CreateProcess() { for(int i = 0;i<_sub_process_num;i++) { //设置管道 int pipefd[2]{0}; //获取返回值 int n = pipe(pipefd); if(n<0) return PipeError; //创建子进程 pid_t id = fork(); if(id==0) { //child //子进程作为写端,关闭读端 close(pipefd[1]); sleep(3); exit(0); } //记录该管道 string cname = "channel"+to_string(i); //father 父进程作为读端,关闭写端 close(pipefd[0]); //整理完毕后添加该管道信息,以匿名对象作插入值 vc.push_back(channel(pipefd[1],id,cname)); } return 0; } void Debug() { for (auto &e : vc) { e.PrintDebug(); } } ~ProcessPool() { } private: int _sub_process_num;//进程池中管道的个数 vector<channel> vc;//组织管道 };
int main(int argc, char* argv[]) { //如果写入的命令行参数个数不到2个 if(argc!=2) { Usage(argv[0]); return UsageError; } //获取管道个数 int sub_process_num = std::stoi(argv[1]); //确保为多管道 if(sub_process_num<=0) return 1; //1.创建通信管道与子进程 //用指针去调用成员函数 ProcessPool * processpool_ptr = new ProcessPool(sub_process_num); processpool_ptr->CreateProcess(); processpool_ptr->Debug(); //对象直接调用 /* ProcessPool pp(5); pp.CreateProcess(); pp.Debug(); */ //2.控制子进程完成任务 //3.回收子进程 return 0; }
#pragma once #include <iostream> #include <unistd.h> using namespace std; typedef void(*work_t)(); //函数指针类型 void worker() { while(1) { /* uint32_t command_code = 0; ssize_t n = read(0, &command_code, sizeof(command_code)); if(n == sizeof(command_code)) { if(command_code >= 3) continue; tasks[command_code](); } */ cout << "I am worker: " << getpid() << endl; sleep(1); } }
现在才是真正完成了创建进程与管道的任务并且我们还可以给子进程指派任务~
我们之所以重定向就是为了类似下面的效果#pragma once #include <iostream> #include <unistd.h> using namespace std; typedef void(*work_t)(); //函数指针类型 typedef void(*task_t)(); //函数指针类型 void PrintLog() { cout << "printf log task" << endl; } void ReloadConf() { cout << "reload conf task" << endl; } void ConnectMysql() { cout << "connect mysql task" << endl; } //一共有3个任务 task_t tasks[3] = {PrintLog, ReloadConf, ConnectMysql}; //随机选择任务 uint32_t NextTask() { return rand() % 3; } void worker() { //从标准输入0中读取任务,将来想要执行某个任务只需要向管道发送特定的0,1,2数字即可 while(1) { //读任务的命名码,0就是前面任务的下标 uint32_t command_code = 0; //从标准输入0中读取任务码,这样子就不用特意把管道文件描述符传递过来了,实际上还是读到了管道的数据 ssize_t n = read(0, &command_code, sizeof(command_code)); if(n == sizeof(command_code))//读进来的数据正确 { if(command_code >= 3) continue; tasks[command_code]();//执行对应任务 } cout << "I am worker: " << getpid() << endl; sleep(1); } }
#include <iostream> #include <string> #include <cstdlib> #include <vector> #include <unistd.h> #include <ctime> using namespace std; #include "task.hpp" enum { UsageError = 1, ArgError, PipeError }; //描述通信管道的类 class channel { public: //构造函数 channel(int wfd,pid_t sub_process_id,string name) :_wfd(wfd),_sub_process_id(sub_process_id),_name(name) {} //打印管道信息 void PrintDebug() { cout<<"_wfd: "<<_wfd; cout<<"_sub_process_id: "<<_sub_process_id; cout<<"_name: "<<_name<<endl; } string name() {return _name;} int wfd() {return _wfd;} pid_t pid() { return _sub_process_id; } ~channel() { } private: int _wfd; //子进程的端口 pid_t _sub_process_id;//所属子进程的pid string _name; //管道独立名字 }; //描述进程池的类,管理进程池 class ProcessPool { public: ProcessPool(int sub_process_num) : _sub_process_num(sub_process_num) { } int CreateProcess(work_t work)//回调函数 { for(int i = 0;i<_sub_process_num;i++) { //设置管道 int pipefd[2]{0}; //获取返回值 int n = pipe(pipefd); if(n<0) return PipeError; //创建子进程 pid_t id = fork(); if(id==0) { //child //子进程作为读端,关闭写端 close(pipefd[1]); //子进程开始执行任务 //子进程从标准输入0当中读数据而非管道,但在键盘中读入的数据还是管道的 dup2(pipefd[0],0); work(); exit(0); } //记录该管道 string cname = "channel-"+to_string(i); //father 父进程作为写端,关闭读端 close(pipefd[0]); //整理完毕后添加该管道信息,以匿名对象作插入值 vc.push_back(channel(pipefd[1],id,cname)); } return 0; } //选择任务时还得保证任务能负载均衡地出现在各个管道中 int NextChannel() { static int next = 0; int c = next; next++; next %= vc.size(); return c; } //发送任务 void SendTaskCode(int index, uint32_t code) { cout << "send code: " << code << " to " << vc[index].name() << " sub prorcess id: " << vc[index].pid() << endl; //向特定管道的写端写入任务码code write(vc[index].wfd(), &code, sizeof(code)); } void Debug() { for (auto &e : vc) { e.PrintDebug(); } } ~ProcessPool() { } private: int _sub_process_num;//进程池中管道的个数 vector<channel> vc;//组织管道 }; //提示正确的命令行输入格式 void Usage(const std::string&s) { cout<<"Usage:"<<s<<"subprocess-num"<<endl; } //假设我们构建进程池中有5个管道~ // ./processpool 5 int main(int argc, char* argv[]) { //如果写入的命令行参数个数不到2个 if(argc!=2) { Usage(argv[0]); return UsageError; } //获取管道个数 int sub_process_num = std::stoi(argv[1]); //确保为多管道 if(sub_process_num<=0) return ArgError; //随机数 srand((uint64_t)time(nullptr)); //1.创建通信管道与子进程 //用指针去调用成员函数 ProcessPool * processpool_ptr = new ProcessPool(sub_process_num); processpool_ptr->CreateProcess(worker); //processpool_ptr->Debug(); //对象直接调用 /* ProcessPool pp(5); pp.CreateProcess(); pp.Debug(); */ // 2. 控制子进程 while(true) { // a. 选择一个进程和通道 int channel = processpool_ptr->NextChannel(); // cout << channel.name() << endl; // b. 你要选择一个任务 //选择任务时还得保证任务能负载均衡地出现在各个管道中 uint32_t code = NextTask(); // c. 选择通道+任务最终发送任务 processpool_ptr->SendTaskCode(channel, code); sleep(1); } //3.回收子进程 delete processpool_ptr; return 0; }
以上是分配任务和选择管道让子进程执行工作~
最后再测试一下~
出现了一个奇怪的问题:为什么每个子进程的管道读端都是3?我们来画图分析一下~
关闭父进程读端与子进程写端:
问题就出在这里,当再创建子进程和通道时父进程原本关闭的3号读端重新作为了新管道的读端,至于写端则继续往下寻找(5号写端),子进程每一次的创建都会拷贝父进程的文件描述符,所以新的子进程3号仍是读端~这样重复下去父进程每个管道的写端都依次递增,而子进程的读端永远都是3号~
还有一个问题就是连原本父进程指向第一个管道的读端都给拷贝过来了, 这样会引发一个问题,当父进程想要关闭写端口时实际上只是引用计数--罢了,并没有真正关闭写端口~
所以子进程读端口都为3只是该进程池的特性,而子进程写端口过多才是我们需要关注并解决的问题~
搞定完这个bug后我们开始来写进程池的最后一步:回收子进程~
回收子进程有两个问题
- 回收子进程之前如何让子进程全部退出?子进程在worker函数里会一直阻塞在read中~
- 我们让read返回值为0即可——意味着写端不写且写端口关闭,读到0代表读取结束~
怎么让所有已经退出的子进程对他进行资源回收
wait/waitpid
//把所有管道的写端口关闭即可让所有子进程退出 void QuitAll() { for(auto &channel: vc) { channel.Close(); //关闭完顺便回收子进程 pid_t pid = channel.pid(); pid_t rid = waitpid(pid,nullptr,0); if(rid==pid) { std::cout << "wait sub process: " << pid << " success..." << std::endl; } std::cout << channel.name() << " close done" << " sub process quit now : " << channel.pid() << std::endl; } }
完结撒花~
命名管道
- 管道应用的一个限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。
- 如果我们想在不相关的进程之间交换数据,可以使用FIFO文件来做这项工作,它经常被称为命名管道。
- 命名管道是一种特殊类型的文件
在图中我们可以获取到以下几点信息:
当进程A以读方式打开fifo文件,进程B以写方式打开fifo文件时各自的struct file是独立的~对于进程B而言OS并不会再为其打开的文件创建新的inode、缓冲区等资源,而是和进程A先打开的一起共享资源~
接下来我们就来创建命名管道
命名管道的特性其实与匿名管道基本一致,唯一不同的就是它是让两个不同进程(无血缘关系)进行通信~
#ifndef __COMM_HPP__
#define __COMM_HPP__
#include <iostream>
#include <string>
#include <cerrno>
#include <cstring>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
using namespace std;
#define Mode 0666
#define Path "./fifo"
class Fifo
{
public:
Fifo(const string& path)
:_path(path)
{
umask(0);
//创建管道文件
int n = mkfifo(_path.c_str(),Mode);
if(n==0)
{
cout<<"mkfifo success"<<endl;
}
else
{
cerr << "mkfifo failed, errno: " << errno << ", errstring: " << strerror(errno) << endl;
}
}
~Fifo()
{
int n = unlink(_path.c_str());
if (n == 0)
{
cout << "remove fifo file " << _path << " success" << endl;
}
else
{
cerr << "remove failed, errno: " << errno << ", errstring: " << strerror(errno) << endl;
}
}
private:
string _path; //文件路径+文件名
};
#endif
#include "comm.hpp"
/* int main()
{
cout<<"hello i am client"<<endl;
return 0;
} */
int main()
{
//以写方式打开命名管道(特殊文件)并记录其写端口号
int wfd = open(Path, O_WRONLY);
if (wfd < 0)
{
cerr << "open failed, errno: " << errno << ", errstring: " << strerror(errno) << endl;
return 1;
}
string inbuffer;
while (true)
{
cout << "Please Enter Your Message# ";
//从键盘上获取内容输入到inbuffer中
std::getline(cin, inbuffer);
//人工输入退出
if(inbuffer == "quit") break;
//开始在对应文件描述符中的文件写入数据
ssize_t n = write(wfd, inbuffer.c_str(), inbuffer.size());
if (n < 0)
{
cerr << "write failed, errno: " << errno << ", errstring: " << strerror(errno) << endl;
break;
}
}
close(wfd);
return 0;
}
#include "comm.hpp"
#include <unistd.h>
/* int main()
{
Fifo fifo("./fifo");
cout<<"hello i am server"<<endl;
return 0;
} */
int main()
{
//创建命名管道
Fifo fifo(Path);
//以读方式打开命名管道(特殊文件)
int rfd = open(Path, O_RDONLY);
if (rfd < 0)
{
cerr << "open failed, errno: " << errno << ", errstring: " << strerror(errno) << endl;
return 1;
}
// 如果我们的写端没打开,先读打开,open的时候就会阻塞,直到把写端打开,读open才会返回
cout << "open success" << endl;
char buffer[1024];
while (true)
{
//开始到对应文件描述符的文件进行数据读取
ssize_t n = read(rfd, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
cout << "client say : " << buffer << endl;
}
//写端不写且关闭,则读端读取完后也要离开并关闭-
else if (n == 0)
{
cout << "client quit, me too!!" << endl;
break;
}
else
{
cerr << "read failed, errno: " << errno << ", errstring: " << strerror(errno) << endl;
break;
}
}
close(rfd);
return 0;
}
全部代码
模拟进程池 · d883fa6 · 玛丽亚后/keep - Gitee.com