文章目录
- 1. 匿名管道
- 2. 利用通过匿名管道实现进程间通信
- 2.1 实现思路
- 2.2 父子进程实现通信的简单代码
- 2.3 结果展示如下
- 3. 总结管道特点
- 4. 扩展(好玩的--简单内存池)
- 思路:
- 代码:
1. 匿名管道
-
查看手册(man):
-
翻译
#include <unistd.h>
功能:创建一无名管道
原型
int pipe(int fd[2]);
参数
fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端
返回值:成功返回0,失败返回错误代码 -
视图
2. 利用通过匿名管道实现进程间通信
2.1 实现思路
- 父进程创建管道
- 父进程fork子进程(这样子和父进程就可以进行通信了)
- 父进程关闭fd[0]–读,子进程关闭fd[1]–写
2.2 父子进程实现通信的简单代码
- 要求:
- 父进程向管道当中写“i am father”,
- 子进程从管道当中读出内容, 并且打印到标准输出
#include <iostream>
#include <cstdio>
#include <cstring>
#include <string>
#include <unistd.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
using namespace std;
int main()
{
//1. 创建管道
int pipefd[2]={0}; //pipefd[0] 读, pipefd[1]写
int n=pipe(pipefd);
assert(n!=-1);
(void)n; //debug assert , release assert
#ifdef DEBUG
cout<<"pipefd[0]: "<<pipefd[0]<<endl;
cout<<"pipefd[1]: "<<pipefd[1]<<endl;
#endif
//2. 创建子进程
pid_t id=fork();
assert(id !=-1);
if(id==0)
{
//子进程
//3. 构建单项通道,子进程读取,父进程写入
//3.1 关闭子进程不需要的fd
close(pipefd[1]);
char buffer[1024];
while(true)
{
ssize_t s=read(pipefd[0], buffer, sizeof(buffer)-1);
if(s>0)
{
buffer[s]=0;
cout<<"Father:"<<buffer<<endl;
}
break;
}
exit(0);
}
//父进程
//3. 构建单项通道,子进程读取,父进程写入
//3.1 关闭父进程不需要的fd
close(pipefd[0]);
string message="i am father";
int count=0;
char send_buffer[1024];
while(true)
{
//3.2 变化的字符串
snprintf(send_buffer, sizeof(send_buffer), "%s", message.c_str());
//3.3写入
write(pipefd[1], send_buffer, strlen(send_buffer));
break;
}
pid_t ret = waitpid(id, nullptr, 0);
assert(ret!=-1);
(void)ret;
return 0;
}
- 注意:
子进程必须等父进程写完才能读;也就是父进程再写的时候子进程在阻塞式等待。
2.3 结果展示如下
3. 总结管道特点
- 管道是用于进行具有血缘关系的进程进行进程通信的–常用于父子通信
- 管道具有通过让进程间协同,提供了访问控制!
- 管道提供的是面向流式的通信服务–面向字节流–协议
- 管道是基于文件的,文件的生命周期随进程的,即管道的生命周期随进程的。
- 管道是单项通信的,就是半双工通信的一种特殊情况。
4. 扩展(好玩的–简单内存池)
- 父进程创建多个进程,父进程给每个进程分发任务(单机版的负载均衡也就随机种子srand)。
思路:
创建多个进程,子进程按找父进程发送的指令执行任务;创建的每个进程的pid和匿名管道fd[1]利用pair包装后用vector存起来。
1. 任务表
using func = std::function<void()>;
std::vector<func> callbacks; //任务调用(根据下标)
std::unordered_map<int, std::string> desc; //任务栏信息
2. 子进程读取命令
int waitCommand(int waitFd, bool &quit) //如果对方不发,我们一直阻塞式等待
3. 父进程唤醒子进程(写入管道,也就是输入指令)
void sendAndWakeup(pid_t who, int fd, uint32_t command)
规定:4字节输入流
代码:
#include <iostream>
#include <unistd.h>
#include <cstdio>
#include <ctime>
#include <cassert>
#include <sys/wait.h>
#include <sys/types.h>
#include <vector>
#include <string>
#include "Task.hpp"
using namespace std;
int waitCommand(int waitFd, bool &quit) //如果对方不发,我们一直阻塞式等待
{
uint32_t command = 0;
ssize_t s = read(waitFd, &command, sizeof(command));
if (s == 0) //退出
{
quit = true;
return -1;
}
assert(s == sizeof(uint32_t));
return command;
}
void sendAndWakeup(pid_t who, int fd, uint32_t command)
{
write(fd, &command, sizeof(command));
cout << "主进程: 唤醒进程: " << who << " 命令: " << desc[command] << "通过: " << fd << endl;
}
#define process_Num 5
int main()
{
load();
// pid, pipefd
vector<pair<pid_t, int>> slots;
//创建多个进程
for (int i = 0; i < process_Num; i++)
{
//创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
assert(n != -1);
(void)n;
pid_t id = fork();
assert(id != -1);
//子进程读取
if (id == 0)
{
// child
//关闭写端
close(pipefd[1]);
while (true)
{
// pipefd[0]
//等命令
bool quit = false;
int command = waitCommand(pipefd[0], quit); //如果对方不发,我们一直阻塞式等待
if (quit)
break;
//执行命令
if (command >= 0 && command < handlerSize())
{
callbacks[command]();
}
else
{
cout << "非法command: " << command << endl;
}
}
exit(0);
}
// father
//父进程写入
//关闭读端
close(pipefd[0]); // pipefd[1]
slots.push_back(make_pair(id, pipefd[1]));
}
//开始任务 父进程派发任务
srand((unsigned long)time(nullptr) ^ getpid() ^ 303200109240139); //让我们的数据源更随机
while (true)
{
//选择任务
int command = rand() % handlerSize();
//选择进程
int choice = rand() % slots.size();
//布置任务给指定进程
sendAndWakeup(slots[choice].first, slots[choice].second, command);
sleep(2);
// int select;
// int command;
// cout << "######################################################" << endl;
// cout << "# 1. show functions 2.send command #" << endl;
// cout << "######################################################" << endl;
// cout << "please select> ";
// cin >> select;
// if (select == 1)
// showHandler();
// else if (select == 2)
// {
// cout << "输入你的指令> ";
// //选择任务
// cin >> command;
// //选择进程
// int choice = rand() % slots.size();
// //布置任务给指定进程
// sendAndWakeup(slots[choice].first, slots[choice].second, command);
// }
// else
// {
// }
}
//关闭fd 所有的子进程退出
for (const auto &slot : slots)
{
close(slot.second);
}
//回收所有子进程信息
for (const auto &slot : slots)
{
waitpid(slot.first, nullptr, 0);
}
return 0;
}
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <unordered_map>
#include <unistd.h>
#include <functional>
using func = std::function<void()>;
std::vector<func> callbacks;
std::unordered_map<int, std::string> desc;
void readMySQL()
{
std::cout << "sub process[" << getpid() << "] 执行访问数据库的任务\n" << std::endl;
}
void execuleUrl()
{
std::cout << "sub process[" << getpid() << "] 执行Url解析\n" << std::endl;
}
void cal()
{
std::cout << "sub process[" << getpid() << "] 执行加密任务\n" << std::endl;
}
void save()
{
std::cout << "sub process[" << getpid() << "] 执行数据持久化任务\n" << std::endl;
}
//任务表
void load()
{
desc.insert({callbacks.size(), "readMySQL: 读取数据库"});
callbacks.push_back(readMySQL);
desc.insert({callbacks.size(), "execuleUrl: 进行Url解析"});
callbacks.push_back(execuleUrl);
desc.insert({callbacks.size(), "cal: 执行加密"});
callbacks.push_back(cal);
desc.insert({callbacks.size(), "save: 执行数据持久化"});
callbacks.push_back(save);
}
//展示多少任务
void showHandler()
{
for (const auto &iter : desc)
{
std::cout << iter.first << "\t" << iter.second << std::endl;
}
}
//多少方法
int handlerSize()
{
return callbacks.size();
}