目录
一、管道通信
二、匿名管道
1. 匿名管道通信
2. 匿名管道设计
三、命名管道
comm.hpp
client.cc
serve.cc
一、管道通信
进程通信
数据传输:一个进程需要将它的数据发送给另一个进程
资源共享:多个进程之间共享同样的资源
通知事件:一个进程向另一个(一组)进程发送信息,通知它们发生了某种事件
进程控制:一个进程完全控制另一个进程的执行,如debug
通信本质
OS直接或间接给通信双方提供内存空间
通信的进程双方,能够读取到一份公共资源
管道文件是内存级文件
管道创建
管道读写端
- 如果管道没有了数据,读端在读,默认会阻塞等待正在读取的进程
- 管道是固定大小的空间,写端写满的时候,会阻塞等待读端读取
- 写端关闭,读端在读,则读端读完管道数据也关闭
- 读端关闭,写端在写,OS终止写端
int main()
{
int fds[2];
int n = pipe(fds);
assert(n == 0);
cout << "fds[0]: " << fds[0] << endl;
cout << "fds[1]: " << fds[1] << endl;
return 0;
}
管道可用于父子、兄弟、祖孙进程之间通信
二、匿名管道
1. 匿名管道通信
#include <iostream>
#include <cstdio>
#include <unistd.h>
#include <cassert>
#include <cstring>
#include <sys/types.h>
#include <sys/wait.h>
using namespace std;
int main()
{
// 1. 创建管道文件,打开读写端
int fds[2];
int n = pipe(fds);
assert(n == 0);
// 2. fork()
pid_t id = fork();
if (id == 0)
{
//子进程通信代码
close(fds[0]); //关闭读端
const char* s = "我是子进程,我正在给你发消息";
int cnt = 0;
while (true)
{
char buffer[1024];
snprintf(buffer, sizeof(buffer), "child->parent say: %s[%d][%d]", s, cnt++, getpid());
write(fds[1], buffer, strlen(buffer));
sleep(1);
}
exit(0);
}
//父进程通信代码
close(fds[1]); //关闭写端
while (true)
{
char buffer[1024];
ssize_t s = read(fds[0], buffer, sizeof(buffer) - 1);
if (s > 0)
buffer[s] = 0;
cout << "Get Message# " << getpid() << buffer << " | my pid: " << getpid() << endl;
//父进程没有sleep
}
n = waitpid(id, nullptr, 0);
assert(n == id);
return 0;
}
2. 匿名管道设计
- 将任务存入存入任务表vector中,将子进程存入信息存入subs容器中
- 父进程随机将任务码发给5个子进程,子进程读取任务码完成任务(父进程为写端,子进程为读端),若子进程未读取任务,则进行阻塞等待
- 通过随机数分配任务给随机子进程,让子进程负载均衡
代码设计流程:
- 建立任务表,建立子进程及和子进程通信的信道
- 父进程控制子进程,进行任务分配
- 回收子进程信息
#include <iostream>
#include <cstdio>
#include <unistd.h>
#include <cassert>
#include <cstring>
#include <string>
#include <sys/types.h>
#include <sys/wait.h>
#include <vector>
using namespace std;
#define MakeSeed() srand((unsigned int)time(nullptr) ^ getpid())
#define PROCESS_NUM 5
#define TASK_NUM 10
typedef void(*func_t)();
void DownLoad()
{
cout << getpid() << " DownLoad Task" << endl;
sleep(1);
}
void IOTask()
{
cout << getpid() << " IO Task" << endl;
sleep(1);
}
void FlushTask()
{
cout << getpid() << " Flush Task" << endl;
}
void LoadTaskFunc(vector<func_t>& funcMap)
{
funcMap.push_back(DownLoad);
funcMap.push_back(IOTask);
funcMap.push_back(FlushTask);
}
/
class subEp
{
public:
subEp(pid_t subID, int writeFd)
: _subID(subID), _writeFd(writeFd)
{
char nameBuffer[1024];
snprintf(nameBuffer, sizeof(nameBuffer), "process-%d[pid(%d)-fd(%d)]", num++, subID, _writeFd);
_name = nameBuffer;
}
static int num;
string _name;
pid_t _subID;
int _writeFd;
};
int subEp::num = 0;
// 读取
int RecvTask(int readFd)
{
int code = 0;
ssize_t s = read(readFd, &code, sizeof(code));
if (s == sizeof(int))
return code;
else if (s <= 0)
return -1;
else
return 0;
}
void CreateSubProcess(vector<subEp>& subs, vector<func_t>& funcMap)
{
vector<int> deleteFd;
for (int i = 0; i < PROCESS_NUM; ++i)
{
int fds[2];
int n = pipe(fds);
assert(n == 0);
(void) n;
pid_t id = fork();
if (id == 0)
{
//建立一对一管道
for (int i = 0; i < deleteFd.size(); ++i)
close(deleteFd[i]);
//子进程处理任务
close(fds[1]);
while (true)
{
// 1. 获取任务码,如果没收到任务码,则阻塞等待
int commandCode = RecvTask(fds[0]);
// 2. 执行任务
if (commandCode >= 0 && commandCode < funcMap.size())
funcMap[commandCode]();
else
break;
}
exit(0);
}
close(fds[0]);
subEp sub(id, fds[1]);
subs.push_back(sub);
deleteFd.push_back(fds[1]);
}
}
void SendTask(const subEp& process, int taskNum)
{
cout << "send task num " << taskNum << " to " << process._name << endl;
int n = write(process._writeFd, &taskNum, sizeof(taskNum));
assert(n == sizeof(int));
(void)n;
}
void LoadBlanceContrl(const vector<subEp>& subs, vector<func_t>& funcMap, int count)
{
int procNum = subs.size();
int taskNum = funcMap.size();
while (count--)
{
int subIDx = rand() % procNum;
int taskIDx = rand() % taskNum;
SendTask(subs[subIDx], taskIDx);
sleep(1);
}
for (int i = 0; i < procNum; ++i)
close(subs[i]._writeFd);
}
void WaitProcess(vector<subEp>& subs)
{
int procNum = subs.size();
for (int i = 0; i < procNum; ++i)
{
waitpid(subs[i]._subID, nullptr, 0);
cout << "wait sub process success ... " << subs[i]._subID << endl;
}
}
int main()
{
MakeSeed();
// 1. 建立子进程并建立和子进程通信的信道
// [子进程id,wfd]
vector<func_t> funcMap;
LoadTaskFunc(funcMap);
vector<subEp> subs;
CreateSubProcess(subs, funcMap);
// 2. 父进程控制子进程
LoadBlanceContrl(subs, funcMap, TASK_NUM);
// 3. 回收子进程信息
WaitProcess(subs);
return 0;
}
三、命名管道
让不同进程打开指定名称(路径+文件名)的同一个文件
comm.hpp
#pragma once
#include <iostream>
#include <sys/types.h>
#include <sys/stat.h>
#include <string>
#include <cstring>
#include <cerrno>
#include <cassert>
#include <unistd.h>
#include <fcntl.h>
#include <fstream>
#define NAMED_PIPE "/root/test/pipe/named_pipe"
bool CreateFilo(const std::string& path)
{
umask(0);
int n = mkfifo(path.c_str(), 0666);
if (n == 0)
{
return true;
}
else
{
std::cout << "errno: " << errno << "err string: " << strerror(errno) << std::endl;
return false;
}
}
void RemoveFifo(const std::string& path)
{
int n = unlink(path.c_str());
assert(n == 0); //意料之中用assert判断,意料之外用 if else
(void)n;
}
client.cc
#include "comm.hpp"
int main()
{
std::cout << "client begin" << std::endl;
int wfd = open(NAMED_PIPE, O_WRONLY, 0666);
std::cout << "client end" << std::endl;
if (wfd < 0)
exit(1);
//write
char buffer[1024];
while (true)
{
std::cout << "please say# ";
fgets(buffer, sizeof(buffer), stdin);
if (strlen(buffer) > 0)
buffer[strlen(buffer) - 1] = 0;
ssize_t n = write(wfd, buffer, strlen(buffer));
assert(n == strlen(buffer));
(void)n;
}
close(wfd);
return 0;
}
serve.cc
#include "comm.hpp"
int main()
{
bool r = CreateFilo(NAMED_PIPE);
assert(r);
(void)r;
std::cout << "serve begin" << std::endl;
int rfd = open(NAMED_PIPE, O_RDONLY);
std::cout << "serve end" << std::endl;
if (rfd < 0)
exit(1);
//read
char buffer[1024];
while (true)
{
ssize_t s = read(rfd, buffer, sizeof(buffer) - 1);
if (s > 0)
{
std::cout << "client->serve# " << buffer << std::endl;
}
else
{
perror("read");
exit(1);
}
}
close(rfd);
RemoveFifo(NAMED_PIPE);
return 0;
}