目录
一、进程间通信的概念
二、匿名管道
2.1 什么是管道
2.2 管道的实现
2.3 管道的使用
三、进程池
3.1 进程池实现逻辑
3.2 模拟任务表
3.3 进程池的创建
四、命名管道
4.1 创建命名管道
4.2 命令管道的使用
一、进程间通信的概念
进程具有独立性,进程间想通信,难度比较大,所以我们使用特殊的方式让进程看到同一份结构,然后让其在公共资源中进行通信,
进程间通信的本质:先让不同的进程看到同一份资源(内存空间)
以下是进程间进行通信所能达到的目的:
数据传输:一个进程需要将它的数据发送给另一个进程
资源共享:多个进程之间共享同样的资源。
通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)
进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
进程间通信的必要性:
单进程,那么也就无法使用并发能力,更加无法实现多进程协同(传输数据,同步执行流,消息通知等)。
进程间通信的技术背景:
- 进程是具有独立性的,虚拟地址空间+页表,保证进程运行的独立性(进程内核数据结构+进程的代码和数据)
- 通信成本比较高
进程间通信的本质理解:
- 进程间通信的前提,首先需要让不同的进程看到同一块"内存"(特定的结构)
- 所以所谓的进程看到同一块内存,属于哪一个进程?不能隶属于任何一个进程,而应该强调共享。
所以本篇博客的内容主要介绍以下这几种进程间通信的方式:
1.匿名管道;2.命名管道;3.共享内存
二、匿名管道
2.1 什么是管道
- 管道是Unix中最古老的进程间通信的形式。
- 我们把从一个进程链接到另一个进程的一个数据流称为一个"管道"。
- 有一个入口,有一个出口,进行单向传输内容,管道中传输的都是"资源"。
管道通信背后是进程之间通过管道进行通信。
我们实现管道通信的原理就是让两个进程分别以读写方式打开一个公共文件,而其中这个公共文件,就叫做管道,管道的本质就是文件。
2.2 管道的实现
现在我使用画图的方式演示两个进程间是如何实现管道的,首先父进程分别以读、写的形式打开一个文件:
然后我们创建子进程,子进程会拷贝父进程的files_stuct ,所以两个进程的files_stuct 就指向了相同的文件,如图所示:
然后我们关闭一个进程的读端,再关闭另一个进程的写端,实现一个进程写数据,一个进程读取数据,就实现了管道。关闭的这个动作,就是确定管道的通信方向。
即关闭不需要的文件描述符,我们来实现父进程进行写入,子进程进行读取:
2.3 管道的使用
有了上面的原理图,接下来我们就来创建管道并使用管道。
首先我们要掌握一个系统调用 pipe ,用于创建匿名管道。
函数参数:
参数中 pipefd[2] 是一个输出型参数,会返回读方式打开、写方式打开的文件描述符。
返回值:
成功返回0,错误返回-1,并设置错误码。
接下来我们就简单的使用一个这个接口,然后看看其打开的文件 fd 是多少。
#include <iostream>
#include <assert.h>
#include <unistd.h>
using namespace std;
int main()
{
int pipefd[2] = {0}; //fd[0]是读端,fd[1]是写端
int n = pipe(pipefd);
assert(n != -1);
(void)n; // assert在release中无效,一个变量定义但没被使用会进行报错。
cout << "pipefd[0]:" << pipefd[0] << endl;
cout << "pipefd[1]:" << pipefd[1] << endl;
return 0;
}
因为stdin、stdout、stderr对应0.1.2,所以再打开的fd就是3和4了。
好的,以上就是测试代码部分,然后我们就可以使用条件编译手段将其进行屏蔽。
这个条件编译的意思是,如果是DEBUG调试,就执行以下代码,如果不是DEBUG调试,则不执行。
接下来是我们就要创建子进程,并构建单向通信的管道。
原理是:
使用fork创建子进程,让子进程关闭写端,让子进程使用 read 函数进行读取;父进程使用 write 往对应写端的fd写入数据,然后使用 waitpid 等待子进程的退出。
int main()
{
// 1.创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
assert(n != -1);
(void)n;
// 2.创建子进程
pid_t id = fork();
assert(id != -1);
// 子进程 --- 读取
if (id == 0)
{
// 3.创建单向通信的管道 关闭进程不需要的fd
close(pipefd[1]); // 进行读取
char buffer[1024];
while (1)
{
ssize_t sz = read(pipefd[0], buffer, sizeof(buffer) - 1);
if (sz > 0)
{
buffer[sz] = '\0';
printf("child:[%d] receive: %s\n", getpid(), buffer);
}
}
close(pipefd[0]);
exit(0);
}
// 父进程 --- 写入
close(pipefd[0]);
string message = "i am father process ,accept the message";
int count = 0; // 发送消息的条数
char send_buffer[1024];
while (1)
{
// snprintf--向字符串中打印内容
snprintf(send_buffer, sizeof(send_buffer), "%s pid:[%d],count:%d", message.c_str(), getpid(), count++);
// 将管道中写入数据
write(pipefd[1], send_buffer, strlen(send_buffer));
sleep(1);
}
// 等待子进程退出
pid_t ret = waitpid(id, nullptr, 0);
assert(ret < 0);
(void)ret;
close(pipefd[1]);
exit(0);
return 0;
}
运行结果如下:
其中父进程每一秒进行一次写入,子进程就将数据进行了读取。
管道的现象:
- 写入很快,读取很慢,写满则不能再写入了。
- 写入很慢,读取很快,管道没有数据时,读端等待。
- 写端关闭,读端返回0,标识读到了文件结尾。
- 读端关闭,写端继续写入,OS会终止进程。
管道特点如下:
- 管道是用来进行具有血缘关系的进程进行进程间通信的 --- 常用于父子通信
- 管道具有通过让进程间系统,提供了访问控制,即有数据再读,无数据则不读。
- 管道提供的是面向流式的通信服务 --- 面向字节流 -- 通过协议实现
- 管道是基于文件的,如果写入的一方关闭,则读取的一方read会返回0,表示读到了文件的结尾!
- 管道是单向通信的,即半双工通信的一种特殊情况。
三、进程池
3.1 进程池实现逻辑
上面是使用管道实现进程间通信,那管道在实际运行中我们可以做什么呢?
接下来就简单实现一个进程池,原理如下:
我们首先创建一个父进程,当用户需要执行特定任务时,我们让父进程随机调用其下的子进程去完成任务。
3.2 模拟任务表
我们先编写模拟任务、任务操作表、任务描述表。
#pragma once
#include <iostream>
#include <assert.h>
#include <string>
#include <unordered_map>
#include <vector>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <functional>
#include <sys/types.h>
#include <sys/wait.h>
using namespace std;
typedef function<void()> func;
// using func = std::function<void()>; C++11支持的写法
vector<func> taskArr; // 任务操作表
unordered_map<int, string> desc; // 任务描述表
// 4个模拟任务
void readMySQL()
{
cout << "process[" << getpid() << "] 执行 # 数据库读取 # 任务\n";
}
void execuleUrl()
{
cout << "process[" << getpid() << "] 执行 # 字符串解析 # 任务\n";
}
void subTask()
{
cout << "process[" << getpid() << "] 执行 # 任务提交 # 任务\n";
}
void saveData()
{
cout << "process[" << getpid() << "] 执行 # 数据存储 # 任务\n";
}
void load()
{
desc.insert({taskArr.size(), "readMySQL:数据库读取"});
taskArr.push_back(readMySQL);
desc.insert({taskArr.size(), "execuleUrl:字符串解析"});
taskArr.push_back(execuleUrl);
desc.insert({taskArr.size(), "subTask:任务提交"});
taskArr.push_back(subTask);
desc.insert({taskArr.size(), "saveData:数据存储"});
taskArr.push_back(saveData);
}
void showTask()
{
for (const auto &Tast : desc)
{
cout << Tast.first << "\t" << Tast.second << endl;
}
}
int taskSize()
{
return taskArr.size();
}
3.3 进程池的创建
然后我们使用fork函数让父进程创建5个子进程,用户想要调用任务时,就随机分配给任意一个子进程完成任务。
#include "Test.hpp"
#define PROCESS_NUM 5
int waitingTask(int waitFd, bool &quit)
{
uint32_t command = 0; // uint32_t 无符号4字节整形
ssize_t s = read(waitFd, &command, sizeof(command));
if (s == 0)
{
quit = true;
return -1;
}
assert(s == sizeof(uint32_t)); // 返回的必须是4字节
return command;
}
void sendAndWakeup(pid_t id, int fd, uint32_t command)
{
write(fd, &command, sizeof(command));
cout << "call process:[" << id << "] execute task:" << desc[command] << "through fd:" << fd << endl;
sleep(1);
}
int main()
{
// 代码中有一个关于fd的处理,有一个小问题,不影响使用,但是请找出
load();
// 父进程下的子进程表
vector<pair<pid_t, int>> slots;
for (int i = 0; i < PROCESS_NUM; i++) // 012
{
int pipefd[2] = {0};
int n = pipe(pipefd);
assert(n != -1);
(void)n;
// 创建子进程
pid_t id = fork();
assert(id != -1);
// 子进程进行读取
if (id == 0)
{
close(pipefd[1]);
while (true)
{
bool quit = false;
// 子进程开始等任务
int command = waitingTask(pipefd[0], quit); // 无任务,则阻塞,使用commmand接受任务分配
if (quit)
break;
// 执行对应任务
if (command >= 0 && command < taskSize())
{
taskArr[command]();
}
}
exit(1);
}
// father 进行写入,关闭读端
close(pipefd[0]);
// 将子进程的pid 和父进程的写端fd 放入到进程表中
slots.push_back({id, pipefd[1]});
}
// 现在父进程已经拥有PROCESS_NUM个子进程了 ,父进程派发任务
srand((unsigned int)time(nullptr) ^ getpid() ^ 123123123);
while (true)
{
int command;
int select;
// command = rand() % taskSize();
// int choice = rand() % slots.size();
// sendAndWakeup(slots[choice].first, slots[choice].second, command);
cout << "***********************************************" << endl;
cout << "***** 1.show functions 2.send command *****" << endl;
cout << "***********************************************" << endl;
cout << "Please select>" << endl;
cin >> select;
if (select == 1)
{
showTask();
}
else if (select == 2)
{
showTask();
cout << "Enter Your Command>";
// 选择任务
cin >> command;
// 选择进程
int choice = rand() % taskSize();
// 分配任务
sendAndWakeup(slots[choice].first, slots[choice].second, command);
}
}
// 关闭写端fd,所有的子进程都会退出
for (const auto slot : slots)
{
close(slot.second);
}
// 回收子进程
for (const auto &slot : slots)
{
waitpid(slot.first, nullptr, 0);
}
return 0;
}
输入1则打印任务表,输入2可以选择任务,然后父进程随机将任务派发给子进程。运行结果如下:
同时我们还可以使用ps ajx | head -1 && ps axj | grep ProcessPool这个脚本来看看进程池中的进程数量,效果检测:
四、命名管道
匿名管道应用的一个限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。
如果我们想在不相关的进程之间交换数据,可以使用FIFO文件来实现,这种管道泵称为命名管道。
命名管道是一种特殊的文件类型.
4.1 创建命名管道
命名管道可以从命令行上创建,命令行方法是使用下面这个命令。
使用:
发送与接收
接下来就是在我们的程序中创建管道
返回值:
成功返回0,失败返回-1;
参数:
第一个参数是传入待创建管道的管道名,第二个参数是待创建管道的权限(#define MODE 0666)
4.2 命令管道的使用
此时我们可以模拟服务端与客户端的交互,服务端创建管道文件并打开读取,客户端打开服务端创建的管道文件进行写入数据。
然后我们将服务端进行优化,创建3个子进程,3个子进程都处于同时等待读数据的模式,3个子进程争取读数据。
服务端(server)
#include "comm.hpp"
#include "log.hpp"
static void getMessage(int fd)
{
char buffer[BUFFSIZE];
while (1)
{
memset(buffer, '\0', sizeof(buffer));
ssize_t s = read(fd, buffer, sizeof(buffer));
if (s > 0)
{
cout << "[" << getpid() << "]"
<< "cliet messages>" << buffer << endl;
}
else if (s == 0)
{
// end of file
cerr << "[" << getpid() << "]"
<< "client quit,read end" << endl;
break;
}
else
{
// read error
perror("read");
break;
}
}
}
int main()
{
// 1.创建管道文件
if (mkfifo(ipcPath.c_str(), MODE) < 0)
{
perror("mkfifo");
exit(1);
}
Log("创建管道文件成功", Debug) << "step 1" << endl;
// 2.正常的文件操作
int fd = open(ipcPath.c_str(), O_RDONLY);
if (fd < 0)
{
perror("open");
exit(2);
}
Log("打开管道文件成功", Debug) << "step 2" << endl;
int nums = 3;
for (int i = 0; i < nums; i++)
{
pid_t id = fork();
if (id == 0)
{
// 编写正常的通信代码
getMessage(fd);
exit(1);
}
}
for (int i = 0; i < nums; i++)
{
waitpid(-1, nullptr, 0);
}
// 关闭文件
close(fd);
Log("关闭管道文件成功", Debug) << "step 3" << endl;
// 删除文件
unlink(ipcPath.c_str());
Log("删除管道文件成功", Debug) << "step 4" << endl;
return 0;
}
客户端(client)
#include "comm.hpp"
#include "log.hpp"
int main()
{
// 1.指向同一个空间
int fd = open(ipcPath.c_str(), O_WRONLY);
if (fd < 0)
{
perror("open");
exit(1);
}
// ipc通信
string buffer;
while (1)
{
cout << "Please enter the messages" << endl;
getline(cin, buffer);
write(fd, buffer.c_str(), buffer.size());
}
close(fd);
return 0;
}
让子进程抢数据。grep axj | grep mysever