写在前面
今天主要的任务就是知道什么是进程通信?进程通信是如何实现的?前面我们学习了基础IO,再往前看又学习进程的相关的概念,那么今天我们通过进程的通信来把他们用起来.这个话题挺重要的,但是没有前面的大.
进程通信
"通信"这个单词很好理解,就是两个或者多个事物之间相互交谈.那么进程之间是如何交谈的呢?进程通信有什么实际应用呢?今天这个博客就会深入浅出的带大家了解.
通信背景
我们在进程那里就一直强调,进程具有独立性,甚至父子进程一旦父进程或者子进程修改原本的数据都会发生写时拷贝.这个就造成了进程与进程之间是不能够相互交流的或者交流的成本非常高.那么他们是真的完全独立吗?要知道藕断还会丝连,这里独立性不是彻底的独立,当然独立的程度也不会很少,这里我们理解要把握一个度.想一想如果现实世界我们人与人无法交流是多么令人绝望的一件事啊.现实中声音通过介质传递,那么进程于进程直接是不是可以通过某些事物来帮助我们完成通信呢?这里是可以的,也就是我们今天的内容.
通信目的
在一个大型项目中,我们是多个程序员相互协作完成工作的.例如程序员张三只需要负责数据处理模块,李四只需要负责缓存模块…这个相互协作肯定会涉及到人与人之间的交流,否则张三闷头一直干,遇到了缓存相关的任务也不和李四交流,自己查资料解决.如果一个人连自己负责的任务都不了解,就可能会造成总体进度进展缓慢.同理进程之间也是需要交流的,我们可以通过进程通信让不同的进程处理不同的内容.
下面我简单的说一下为何要是进程通信:
- 数据传输:一个进程需要将它的数据发送给另一个进程
- 资源共享:多个进程之间共享同样的资源。
- 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止 时要通知父进程)。
- 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另 一个进程的所有陷入和异常,并能够及时知道它的状态改变
这里简单谈一下进程控制,前面我们在调试代码的时候就是进程控制,我们可以通过debug这个进程来使程序走一步,是否进入函数等操作.
通信发展
这里就不再阐述通信技术是如何发展的,总而言之就是一大堆大佬在一起去研究,最终大浪淘沙现在剩下下面的东西.下面的内容我们都会谈的,不过有的要放在线程中.
- 管道
- System V进程间通信 主要是本地通信,现在很少使用,流行分布式,跨主机
- POSIX进程间通信
注意,上面我写的是三个,实际上是两套标准,管道这个可以理解为天然存在的.关于这两套标准我们再后面都会学习,即使System V有些老,这里我们也会认识认识,都不要着急.
通信本质
我们可以很容易想到,我们进程通信是前提是我们需要先看到通一份资源,也就是让不同的进程看到同一份资源,这一份资源可以是文件,内存,甚至是我们后面的网络.我们可以把这个资源理解成声音传播的介质,它是什么不重要,重要的是它可以帮助我们传输数据.后面的所有的学习都是围绕这个展开的.
注意,我们一般很少把数据刷到外设中,也就是我们这个资源只是挂个名,这个我在后面会演示到.
管道
我们先来学习管道的通信方式.里面涉及到的内容很多,但是很简单.大家要好好学,后面我们他来制作一个进程池,这也为后面的其他的通信方式及打下基础.管道分为两类,我们后面都会用到.
- 匿名管道
- 命名管道
管道原理
我们先把管道的原理给大家说一下.前面我们知道进程的task_struct,这里我们还要借助前面的基础IO.在创建子进程的时候我们还要不要把打开的文件被拷贝一份呢?不需要.但是子进程的大多数据都来自父进程,其中文件描述符就是要被拷贝的.
此时我们父子进程就指向了同一资源了,这同样解释了我们之前父子进程在打印的时候都打印在显示器上的原因.如果我们的资源是管道,我们在内存中直接创建一个文件,这个文件是不需要刷新到磁盘.我们有一个问题,它是如何知道我打开这个文件是管道文件还是普通文件呢?我们看看源码,这个在inode里面有关于文件的分类,其中一个就是管道文件,我们不需要把文件刷新到磁盘,只需要接借助文件的缓冲区就可以了.
管道特点
大家在生活中见过水龙头,天然气管道…,生活中的管道是随处可以想到的.我想问问大家大部分的管道是可以同时从右向左和从左向右传输事物的吗?不太可能,也就是管道一般都是单向的,这也是管道文件的特点.数据是计算机最重要的资源,是信息领域的石油,也就是管道里面流动的是数据.Linux下我们可以通过文件来传输数据,此时管道就是一个内存级文件,它的数据不需要刷新到磁盘.此时我们就可以描述管道的特点了.
- 单项的 管道本身具有的
- 传送数据的 这是管道的功能
请问那么我们是如何把管道成单项的呢?内核是如何实现的.这里很简单.我们创建一个文件,时的两个fd分别指向管道的写端和读端,注意读端和写端是一个抽象概念,不需要细究.
这个时候我们创建一个子进程,子进程的大部分数据来自父进程,也就是两个fd都被继承了.
我们让父进程关闭一个写端子进程关闭一个读端(或者反过来),此时我们就可以父子进程发生进程通信了.
上面的所有操作我们都可以通过内核提供的接口来实现,所以不用着急.这里有几个问题,为何父进程要打开读写两个端口,这是由于子进程看到的完全都是继承父进程的,我们开始的时候父子进程不知道哪个写哪个读.我们打开两个,这样子进程不需要自己打开了.父子进程为何都要关闭一段?这是由于管道是单项的,我们必须保证这一点.是谁决定父进程读还是写?这是需求决定的,当客户提出需求,我们按需求来写.
匿名管道
内核提供了创建管道文件的接口,这是一个输入输出性的参数.
[bit@Qkj 12_09]$ man 2 pipe
这个接口如果打开文件成功,返回0,如果打开失败,此时返回-1,并且错误码被设置.我们先用一下这个接口,看看效果.
#include <unistd.h>
#include <stdio.h>
int main()
{
int pipefd[2] = {0};
pipe(pipefd);
printf("pipefd[0] %d\n", pipefd[0]);
printf("pipefd[1] %d\n", pipefd[1]);
return 0;
}
这个接口,下标为0的表示读端,1代表写端,这个1是不是很像一根笔,我们可以这么记忆.当我们知道了客户要求的时候,就可以决定父子进程关闭哪一端了.此时我们让父进程进行写,让子进程进行读.我们开始创建子进程.关于创建子进程的相关知识我就不说了,这里在之前都谈过,下面直接开始吧.
int main()
{
int pipefd[2] = {0};
if (pipe(pipefd) != 0)
{
cerr << "pipe open fail" << errno << endl;
}
// 创建子进程
pid_t id = fork();
if (id < 0)
{
// 失败
}
else if (id == 0)
{
// child 关闭 写
close(pipefd[1]);
}
else
{
// parent 关闭 读
close(pipefd[0]);
}
return 0;
}
这个时候我们们就可以实现父子进程之间的通信了,我们把管道当作普通的文件进行读写,这一点基础IO的我们也是谈过的.我们先来观察一下现象.
int main()
{
int pipefd[2] = {0};
if (pipe(pipefd) != 0)
{
cerr << "pipe open fail" << errno << endl;
}
// 创建子进程
pid_t id = fork();
if (id < 0)
{
// 失败
}
else if (id == 0)
{
// child 关闭写
close(pipefd[1]);
#define NUM 100
char buffer[NUM];
while (1)
{
memset(buffer, '\0', sizeof(buffer));
ssize_t s = read(pipefd[0], buffer, sizeof(buffer) - 1);
if (s == 0)
{
cout << "读端关闭,我也退出了" << endl;
break;
}
else if (s < 0)
{
cout << "读取错误" << endl;
break;
}
else
{
buffer[s] = '\0';
cout << buffer << endl;
}
}
close(pipefd[0]);
return 0; // 子进程在这里推虎
}
else
{
// parent 关闭 读
close(pipefd[0]);
string msg = "你好子进程.我是父进程";
int cnt = 0;
while (cnt < 5)
{
write(pipefd[1], msg.c_str(), msg.size());
cnt++;
sleep(1);
}
cout << "父进程已经接完了" << endl;
close(pipefd[1]);
}
// 等待子进程
pid_t ret = waitpid(id, NULL, 0);
if (ret > 0)
{
cout << "等待成功" << endl;
}
return 0;
}
这里我有几个问题想和大家讨论一下.为何当我们关闭写段的时候,读端读到文件结尾后就认为这个已经结束了?这是由于在文件中存在一个引用计数,当只有一个进程连着这个文件,读到文件的末尾就是可以认为是文件读取结束了.
不知道你看到一个现象没有,我是让父进程带了个sleep,那么为何子进程也会等待?这是由于管道设计的模式是阻塞,当文件中没有数据的时候,读端就会进行阻塞等待,我们把时间戳打出来,便于我们观察.
int main()
{
int pipefd[2] = {0};
pipe(pipefd);
// 创建子进程
pid_t id = fork();
if (id == 0)
{
// child 关闭写
close(pipefd[1]);
#define NUM 100
char buffer[NUM];
while (1)
{
memset(buffer, '\0', sizeof(buffer));
ssize_t s = read(pipefd[0], buffer, sizeof(buffer) - 1);
if (s == 0)
{
cout << "读端关闭,我也退出了" << endl;
break;
}
else if (s < 0)
{
cout << "读取错误" << endl;
break;
}
else
{
buffer[s] = '\0';
cout << buffer << endl;
cout << "时间戳是 : " << (uint64_t)time(nullptr) << endl;
}
}
close(pipefd[0]);
return 0; // 子进程在这里推虎
}
else
{
// parent 关闭 读
close(pipefd[0]);
string msg = "你好子进程.我是父进程";
int cnt = 0;
while (cnt < 5)
{
write(pipefd[1], msg.c_str(), msg.size());
cnt++;
sleep(2);
}
cout << "父进程已经接完了" << endl;
close(pipefd[1]);
}
// 等待子进程
pid_t ret = waitpid(id, NULL, 0);
if (ret > 0)
{
cout << "等待成功" << endl;
}
return 0;
}
这里我们就可以发现子进程的读取是以父进程为主的,这就是管道的阻塞等待.子进程等待父进程写入完成后,才会进行读取.子进程要以父进程的节奏为主.那么我们可以这么说父子进程在读写的时候是有一定的顺序经行的,我做了你才能能做.这个就称之为自带访问控制机制.以前我们父子进程往显示器上打印数据,那是争先恐后的,这叫缺乏访问控制.
在来解释最后一个疑问,阻塞等待的本质就是将当前进程的pcb放入等待队列中.请问我放在了哪一个等待队列中?这个等待队列是管道自带的,在管道资源中等待
上面我们都是简单的看一下管道文件是如何用的,这里给大家添加一些内容,让子进程处理不同的任务.
typedef void (*functor)(); // 函数指针
vector<functor> functors; // 方法集合
unordered_map<uint32_t, string> info; // 任务描述
void f1()
{
cout << "这是一个处理日志的任务, 执行的进程 ID [" << getpid() << "]"
<< "执行时间是[" << time(nullptr) << "]\n"
<< endl;
}
void f2()
{
cout << "这是一个备份数据任务, 执行的进程 ID [" << getpid() << "]"
<< "执行时间是[" << time(nullptr) << "]\n"
<< endl;
}
void f3()
{
cout << "这是一个处理网络连接的任务, 执行的进程 ID [" << getpid() << "]"
<< "执行时间是[" << time(nullptr) << "]\n"
<< endl;
}
void loadFunctor()
{
info.insert({functors.size(), "处理日志的任务"});
functors.push_back(f1);
info.insert({functors.size(), "备份数据任务"});
functors.push_back(f2);
info.insert({functors.size(), "处理网络连接的任务"});
functors.push_back(f3);
}
void work(int blockFd)
{
cout << "进程[" << getpid() << "]"
<< " 开始工作" << endl;
// 子进程核心工作的代码
while (true)
{
// a.阻塞等待 b. 获取任务信息
uint32_t operatorCode = 0;
ssize_t s = read(blockFd, &operatorCode, sizeof(uint32_t));
if (s == 0)
break;
assert(s == sizeof(uint32_t));
(void)s;
// c. 处理任务
if (operatorCode < functors.size())
functors[operatorCode]();
}
cout << "进程[" << getpid() << "]"
<< " 结束工作" << endl;
}
int main()
{
// 0. 加载任务列表
loadFunctor();
// 1. 创建管道
int pipefd[2] = {0};
if (pipe(pipefd) != 0)
{
cerr << "pipe error" << endl;
return 1;
}
// 2. 创建子进程
pid_t id = fork();
if (id < 0)
{
cerr << " fork error " << endl;
return 2;
}
else if (id == 0)
{
// 3. 关闭不需要的文件fd
// child,read
close(pipefd[1]);
// 4. 业务处理
while (true)
{
uint32_t operatorType = 0;
// 如果有数据,就读取,如果没有数据,就阻塞等待, 等待任务的到来
ssize_t s = read(pipefd[0], &operatorType, sizeof(uint32_t));
if (s == 0)
{
cout << "我要退出啦,我是给人打工的,老板都走了...." << endl;
break;
}
assert(s == sizeof(uint32_t));
(void)s;
if (operatorType < functors.size())
{
// 处理任务
functors[operatorType]();
}
else
{
cerr << "bug? operatorType = " << operatorType << std::endl;
}
}
close(pipefd[0]);
exit(0);
}
else
{
srand((long long)time(nullptr));
// parent,write - 操作
// 3. 关闭不需要的文件fd
close(pipefd[0]);
// 4. 指派任务
int num = functors.size();
int cnt = 10;
while (cnt--)
{
// 5. 形成任务码
uint32_t commandCode = rand() % num;
std::cout << "父进程指派任务完成,任务是: " << info[commandCode] << " 任务的编号是: " << cnt << std::endl;
// 向指定的进程下达执行任务的操作
write(pipefd[1], &commandCode, sizeof(uint32_t));
sleep(1);
}
close(pipefd[1]);
pid_t res = waitpid(id, nullptr, 0);
if (res)
cout << "wait success" << endl;
}
return 0;
}
进程池
上面我们都是用管道控制一个进程,这里我想控制一批进程怎么样?可以的.我们对每一个子进程都创建一个管道文件,让每一个进程都处理不通的事物.同样的,前面在进程那里我们没有谈太多一个父进程是如何控制多个子进程的,这里算是补上去一点.
首先第一点,我们创建一批进程,这就要利用的到循环.那么请问,我们父进程是如何记住这些子进程的?我们还是让父进程去写,让子进程去读.那么我们应该记录什么?这里主要记录的就是每一个子进程对应的额写端,因为一会我们要往里面写入内容,顺带的把子进程的pid也保存一下吧,所以我们用map.
// int32_t: 进程pid, int32_t: 该进程对应的管道写端fd
typedef std::pair<int32_t, int32_t> elem;
int processNum = 5;
int main()
{
loadFunctor();
vector<elem> assignMap;
// 创建processNum个进程
for (int i = 0; i < processNum; i++)
{
// 定义保存管道fd的对象
int pipefd[2] = {0};
// 创建管道
pipe(pipefd);
// 创建子进程
pid_t id = fork();
if (id == 0)
{
// 子进程读取, r -> pipefd[0]
close(pipefd[1]);
// 子进程执行
work(pipefd[0]);
close(pipefd[0]);
exit(0);
}
//父进程做的事情, pipefd[1]
close(pipefd[0]);
elem e(id, pipefd[1]);
assignMap.push_back(e);
}
// 到这里一定是 父进程
// 派发任务
blanceSendTask(assignMap);
// 回收资源
}
现在我们就可以通过父进程给各个的子进程派发任务了,我们把这个派发任务写成一个函数,这里先不谈.主要是要看看先把子进程退出后资源给回收了.首先我们先把逻辑理通讯了.我们看看什么时候走到回收资源,肯定是父进程任务派发结束了,也就是写端关闭了,此时当写端关闭了,子进程读不到数据,我们自动让他退出,也就是子进程就剩下资源了,此时父进程就可以回收资源了.
// 回收资源
for (int i = 0; i < processNum; i++)
{
if (waitpid(assignMap[i].first, nullptr, 0) > 0)
cout << "wait for: pid=" << assignMap[i].first << " wait success!"
<< "number: " << i << "\n";
close(assignMap[i].second);
}
我们先来看看一个简单的函数,也就是子进程工作的函数.我们在work函数里面是传入读端的,所以这里很容易,直接读就可以了.
void work(int blockFd)
{
cout << "进程[" << getpid() << "]" << " 开始工作" << endl;
// 子进程核心工作的代码
while (true)
{
// a.阻塞等待 b. 获取任务信息
uint32_t operatorCode = 0;
ssize_t s = read(blockFd, &operatorCode, sizeof(uint32_t));
if(s == 0) break;
assert(s == sizeof(uint32_t));
(void)s;
// c. 处理任务
if(operatorCode < functors.size()) functors[operatorCode]();
}
cout << "进程[" << getpid() << "]" << " 结束工作" << endl;
}
派发任务更加简单,这里所谓的派发任务,实际上就是给某一个子进程的管道写入数据,由于所有的子进程都在read那里阻塞等待,一旦父进程把数据写入了进去,此时子进程就可以经行读取了,这里就是原理.
void blanceSendTask(const vector<elem> &processFds)
{
srand((long long)time(nullptr));
while(true)
{
sleep(1);
// 选择一个进程, 选择进程是随机的,没有压着一个进程给任务
// 较为均匀的将任务给所有的子进程 --- 负载均衡
uint32_t pick = rand() % processFds.size();
// 选择一个任务
uint32_t task = rand() % functors.size();
// 把任务给一个指定的进程
write(processFds[pick].second, &task, sizeof(task));
// 打印对应的提示信息
cout << "父进程指派任务->" << info[task] << "给进程: " << processFds[pick].first << " 编号: " << pick << endl;
}
}
我们先把所有的代码给大家展示一下,这里我感觉还是有点问题的.
typedef void (*functor)();
vector<functor> functors; // 方法集合
// for debug
unordered_map<uint32_t, string> info;
// int32_t: 进程pid, int32_t: 该进程对应的管道写端fd
typedef std::pair<int32_t, int32_t> elem;
int processNum = 5;
void f1()
{
cout << "这是一个处理日志的任务, 执行的进程 ID [" << getpid() << "]"
<< "执行时间是[" << time(nullptr) << "]\n" << endl;
}
void f2()
{
cout << "这是一个备份数据任务, 执行的进程 ID [" << getpid() << "]"
<< "执行时间是[" << time(nullptr) << "]\n" << endl;
}
void f3()
{
cout << "这是一个处理网络连接的任务, 执行的进程 ID [" << getpid() << "]"
<< "执行时间是[" << time(nullptr) << "]\n" << endl;
}
void loadFunctor()
{
info.insert({functors.size(), "处理日志的任务"});
functors.push_back(f1);
info.insert({functors.size(), "备份数据任务"});
functors.push_back(f2);
info.insert({functors.size(), "处理网络连接的任务"});
functors.push_back(f3);
}
void work(int blockFd)
{
cout << "进程[" << getpid() << "]" << " 开始工作" << endl;
// 子进程核心工作的代码
while (true)
{
// a.阻塞等待 b. 获取任务信息
uint32_t operatorCode = 0;
ssize_t s = read(blockFd, &operatorCode, sizeof(uint32_t));
if(s == 0) break;
assert(s == sizeof(uint32_t));
(void)s;
// c. 处理任务
if(operatorCode < functors.size()) functors[operatorCode]();
}
cout << "进程[" << getpid() << "]" << " 结束工作" << endl;
}
// [子进程的pid, 子进程的管道fd]
void blanceSendTask(const vector<elem> &processFds)
{
srand((long long)time(nullptr));
while(true)
{
sleep(1);
// 选择一个进程, 选择进程是随机的,没有压着一个进程给任务
// 较为均匀的将任务给所有的子进程 --- 负载均衡
uint32_t pick = rand() % processFds.size();
// 选择一个任务
uint32_t task = rand() % functors.size();
// 把任务给一个指定的进程
write(processFds[pick].second, &task, sizeof(task));
// 打印对应的提示信息
cout << "父进程指派任务->" << info[task] << "给进程: " << processFds[pick].first << " 编号: " << pick << endl;
}
}
int main()
{
loadFunctor();
vector<elem> assignMap;
// 创建processNum个进程
for (int i = 0; i < processNum; i++)
{
// 定义保存管道fd的对象
int pipefd[2] = {0};
// 创建管道
pipe(pipefd);
// 创建子进程
pid_t id = fork();
if (id == 0)
{
// 子进程读取, r -> pipefd[0]
close(pipefd[1]);
// 子进程执行
work(pipefd[0]);
close(pipefd[0]);
exit(0);
}
//父进程做的事情, pipefd[1]
close(pipefd[0]);
elem e(id, pipefd[1]);
assignMap.push_back(e);
}
cout << "create all process success!" << std::endl;
// 父进程, 派发任务
blanceSendTask(assignMap);
// 回收资源
for (int i = 0; i < processNum; i++)
{
if (waitpid(assignMap[i].first, nullptr, 0) > 0)
cout << "wait for: pid=" << assignMap[i].first << " wait success!"
<< "number: " << i << "\n";
close(assignMap[i].second);
}
}
这里会有一个问题的,我们在派发任务结束后没有关闭子进程的写端,此时子进程根本不会退出,我们这里加上一个函数,它的功能就是关闭文件描述符.
void closeFd(const vector<elem> &processFds)
{
for(int i = 0; i< processFds.size();i++)
{
close(processFds[i].second);
}
}
我们这里看看指令的相关的管道,你会发现我们指令中的**|**也是管道,此时他们是两个兄弟进程进程进行通信,我们也是可以设计出来的,这里就不浪费大家的时间了.
这个时候我们就可以总结一下管道的特点了.
- 管道只能在具有血缘关系的进程之间通信,常用于父子之间
- 管道只能单项通信,这是由于管道的特性决定的,半双工的特殊性情况
- 管道自带同步机制,也就是访问机制
- 管道是面向字节流的,这里大家可能还感觉不到,等到网络那里再说
- 管道的声明周期是随进程的,进程退出后,文件也会关闭
上面我们谈到了一个词语,半双工,这里先和大家提出来,等到我们网络的时候在处理它.
- 半双工 – 两个人说话,你说的时候我在听,我说的时候你在听,不相互干扰
- 全双工 – 吵架,你说你的,我说我的,只管自己
命名管道
只能父子进程或者血缘关系的人通信是不是有点挫了.如果想让两个互不相关的进程通信是不是这里搞不定了?是的,我们匿名管道是不行的.这里我们用命名管道,关于命名管道,它的特性和上面是一样的,这里我们只看如何使用.这里我们先用指令给大家演示一下在命令行上这个的表示
在Linux中,内核给我们提供一个接口可以让互不相关的两个进程进行相互通信.我们先来看一看.
[bit@Qkj pipe]$ man 3 mkfifo
那么我们该如何通过代码实现呢?这里很简单,我们创建两个可执行程序,其中一个创建这个命名管道,要知道这个文件是具有有唯一的标识符也就是绝对路径的,我们让两个可执行程序一个写,一个读,此时就可以了.
我们先来创建一个头文件,这里保存我们等会要用的标准库文件,顺便我们也把管文件的路径带进去,这里为了方便我们使用相对路径.
#include <iostream>
#include <cstring>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <cstdio>
#include <unistd.h>
#include <errno.h>
#include <assert.h>
#define IPC_PATH "./fifo"
那么我们应该如何做呢,我们创建两个源文件,其中一个作为服务端,一个作为客户端.我们先来写服务端的代码,这个服务端它是创建命名管道,它主要的功能是读取内容,这里没有什么可以说的,都是我们前面的内容.
#include "comm.h"
int main()
{
umask(0);
if (mkfifo(IPC_PATH, 0666) < 0)
{
std::cerr << "mkfifo errno " << errno << std::endl;
return 1;
}
int fd = open(IPC_PATH, O_RDONLY);
if (fd < 0)
{
std::cerr << "openerrno " << errno << std::endl;
return 2;
}
// 正常的通信过程
#define NUM 1024
char buffer[NUM];
while (true)
{
memset(buffer, '\0', sizeof(buffer));
// 开始读
ssize_t s = read(fd, buffer, sizeof(buffer) - 1);
if (s == 0)
{
std::cout << "客户端推出了,我也推出了" << std::endl;
break;
}
else if (s > 0)
{
buffer[s] = '\0';
std::cout << "客户端发送了一条信息 -> " << buffer
<< std::endl;
}
else
{
assert(0);
}
}
close(fd);
std::cout << "服务端退出了" << std::endl;
return 0;
}
那么客户端,我们这里就是写文件,如果文件不存在吗,这里就直接报错.为了方便,我们把要写的内容手动输入.
#include "comm.h"
int main()
{
int fd = open(IPC_PATH, O_WRONLY);
if (fd < 0)
{
std::cerr << "open : " << errno << std::endl;
return 1;
}
#define NUM 1024
char line[NUM];
while (true)
{
std::cout << "请输入你的消息: ";
fflush(stdout);
memset(line, '\0', sizeof(line));
if (fgets(line, sizeof(line) - 1, stdin) != NULL)
{
// fgets 读取是 abc\n\0 他会把 '\n' 带上也会自动添加'\0' -- C语言接口
line[strlen(line) - 1] = '\0';
write(fd, line, strlen(line));
}
else
{
break;
}
}
close(fd);
std::cout << "客户端退出了" << std::endl;
return 0;
}
这样短短的百十行代码就完成了我们想要的,我们这里编译运行,注意,一定要先让服务端开始运行,他还负责创建命名管道的功能.
我们要谈谈一下,命名管道有一个特点,我们在创建一个进程之后,必须要有另外一一个进程打开这个文件,我们创建进程的代码才可以继续.我们测试一下.这是客户端的代码
int main()
{
printf("你是否要打开命名管道 1/0: ");
fflush(stdout);
int ret = 0;
scanf("%d", &ret);
if (ret == 1)
{
open(IPC_PATH, O_WRONLY);
}
return 0;
}
下面是服务端的代码
int main()
{
umask(0);
mkfifo(IPC_PATH, 0666);
std::cout << "----------------------" << std::endl;
int fd = open(IPC_PATH, O_RDONLY);
std::cout << "----------------------" << std::endl;
return 0;
}
system V
共享内存一般很少用到,它是不同于管道的另一套标准,还是比较难的,主要在于环境的搭建.我们这里重点谈如何共享内存的搭建等操作.后面看看能不能加上一些其他的内容,为后面的网络做一些铺垫.我们知道实现进程间的通信的前提是不同的进程看到同一份资源.我们在制作动态库中,谈到虚拟地址空间的一个共享区.这里就是我们今天共享内存的原理.
那么我们如何让不同的进程看到同一片资源呢我们这里要有两个步骤要做.第一点,我们首先要有一片资源,也就是我们要在物理空间上上面开辟一块资源,其次才是我们进程的虚拟地址空间通过页表映射到这片资源,这就是共享内存的原理.
那么这里就要引入今个动作了,我们要创建一块内存,那么就要删除这块内存,其中我们把进程关联到这个内存上,那么就要去关联.这就是哦们共享内存的环境准备,其中关于这四个操作,内核都给我们提供了接口.说实话这些接口使用的成本还是挺高的,不过不要担心,每一个我都会带着大家一步一步来.
shmget
我们先来看内存资源的开辟的动作,这里涉及的内容还是比较多的.
[bit@Qkj 12_12]$ man shmget
这个接口存在三个参数,我们先来看后面的两个.szie 是我们要开辟内存的大小,建议设置成页(4kb)的整数倍.我们假设内存是4gb,那么我们有多少个页呢?这里计算一下我们就可以知道,是220的,那么这么多页我们是不是要管理起来?请问我们我们如何管理,先描述在组织.也就是我们一定存在下面的一个结构体.
第三个参数是一个宏,这标志这我们创建共享内存如果不存在该如何办,内核提供了两个宏.
-
IPC_CREAT 创建共享内存,如果不存在就创建之,存在,就获取之
-
IPC_EXEL 如果存在了,就出错返回,不存在就创建它
这里我们就疑惑了,我感觉IPC_CREAT这个宏挺好的,不存在就创建,存在就获取,为何还要IPC_EXEL 这个宏呢?这是由于如果我们创建这个内存是存在的,假设我们只有IPC_CREAT,那么请问我们获取的是多大,说实话我也不知道.这里不对啊,你不是说size就是大小吗?是的,但是这是针对内存不存在时开辟的大小,入如果存在,这个size完全没有作用,它的大小肯定是实际存在的大小.此时我们就需要配合IPC_EXEL使用,它的存在使得如果这个函数调用成功,那么一定是一个创新的共享内存被创建.
上面我们说了一堆话,但是这里都有一个前提.这个共享内存存在哪里?我们又是如何知道这个属于存在还是不存在的?这里我们都要一一回答.
先来说共享内存存咋哪里?我直接给出答案,这里是存在内核中.我们这样解释,对于一个shell,这里会运行的上百的进程,假设每一进程都要相互同过共享内存进行通信,那么吗我们要不要把他管理起来呢?要的,此时我们内核中会存在一个结构体.这里我们看用户层的,就不去内核代码去找了.
上面的结构体存在很多的属性,有创建时间,挂接数…这里我们看第一个,我们看到一个shm_perm.我们发现这是一个结构体.我们继续看这个结构体.
多的不看,我们就关注第一个属性,这里我们就看看到了shmget接口的第一个参数.
int shmget(key_t key, size_t size, int shmflg);
我来正式解释一下这个参数.我们知道东西一多,我们就需要给它做标记,就像人的身份证,对于共享内存也是如此,这里的key就是共享内存的唯一标识符.这里就解决了第二个问题,我们是是如何知道这个共享内存属于存在还是不存在?进程通信的前提是让他们看到同一份资源,在共享内存中就是看到同一个key值.
ftok
那么为何这个key值是我们自己显式提供的?这里也很容易理解,我们是多个进程之间进行相互通信,如果是内核提供,其他的进程是如何看到这个key值\的?所以为了方便,我们用户自己提供.这里还有一个问题,理论key是自己写的,只要保证不冲突就可以了.我们是如何保证自己提供的key值是不重复的?这里就要看下面的接口,他会给我们生成与一个key值.
它的返回值成功,一个key值就被返回,失败 -1被返回,并且错误码被设置.我们看一下它的参数,第一个就是一个文件路径,第一个我们设置成0到255中的任意一个数就可以了.我们先来看看现象.先创建一个头文件,这包含我们要引用的头文件和一些宏.
#include <iostream>
#include <cstring>
#include <cstdlib>
#include <cerrno>
#include <cassert>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "Log.hpp"
#define PATH_NAME "/home/bit/104/2022/12_12"
#define PROJ_ID 0x14
#define MEM_SIZE 4096
#define FIFO_FILE ".fifo"
key_t CreateKey()
{
key_t key = ftok(PATH_NAME, PROJ_ID);
if(key < 0)
{
std::cerr <<"ftok: "<< strerror(errno) << std::endl;
exit(1);
}
return key;
}
我们直接测试一下,这个时候我们就拿到同一个key值了.
shmget
现在我们就可以创建共享内存了.我还是写两个进程,一个服务端,一个客户端,其中我们让服务端负责创建共享内存.为了便于我们打印日志,这里我们写一个小的日志函数.
#include <iostream>
#include <ctime>
std::ostream &Log()
{
std::cout << "Fot Debug |" << " timestamp: " << (uint64_t)time(nullptr) << " | ";
return std::cout;
}
现在开始写代码,这里我们将看到更多有趣的现象.
int main()
{
key_t key = CreateKey();
Log() << "key: " << key << std::endl;
int shmid = shmget(key, MEM_SIZE, flags);
if (shmid < 0)
{
Log() << "shmget fali " << strerror(errno) << std::endl;
return 1;
}
Log() << "shmget success shmid " << shmid << std::endl;
return 0;
}
为何第二次运行的时候会报错?进程不是退出了吗?这里的共享内存是不是和文件一样,当进程退出后,自动关闭?不是的,进程退出了,不管共享内存的事,就像工人搭建的房子不是自己的,他的作用仅仅是搭建.对于共享内存而言也是一样的,我们的共享内存是不会随进程退出的,这里需要我们手动的退出,否则你只能重启服务器.
shmctl
现在来解决上面的问题,我们手动删除共享内存有两个方法,一个是指令.我们先来看这个.
[bit@Qkj 11_17]$ ipcs -m
这个指令是查看我们服务器有少共享内存被开辟,里面有很多属性,我们等会在谈,这里想来看如何删除,这里我们使用shmid来标识这个共享内存.
[bit@Qkj 11_17]$ ipcrm -m 0
我来解释一下上面ipcs指令出现得属性,有几个需要探讨一下.
- perms 这个是我们共享内存的额权限,在开辟得时候和IPC_CREATE配合传入就可以了.
- bytes 我们可以使用的最大字节数,如果我们申请4097个字节,OS会给两个页得大小,但是我们还是只能使用4087字节
- nattch 关联到该共享内存的进程数量
int main()
{
umask(0);
key_t key = CreateKey();
Log() << "key: " << key << std::endl;
int shmid = shmget(key, 4097, flags|0666);
if (shmid < 0)
{
Log() << "shmget fali " << strerror(errno) << std::endl;
return 1;
}
Log() << "shmget success shmid " << shmid << std::endl;
return 0;
}
shmctl
感觉有点扯,我们还需要用命令去删共享内存,这里感觉是在是太麻烦了.这里内核给我们提供了有一个接口,这个接口可以修改该共享空间的属性,其中就有一个是销毁这个内存.我们只关注这个.
int shmctl(int shmid, int cmd, struct shmid_ds *buf);
这个函数的参数是在是太简单的,第一个参数就是shmget的返回值,第二个参数是一个宏,这里我们直接写成IPC_RMID,就是立即删除的意思,第三个参数直接置为空指针.
int main()
{
key_t key = CreateKey();
Log() << "key: " << key << std::endl;
int shmid = shmget(key, MEM_SIZE, flags);
if (shmid < 0)
{
Log() << "shmget fali " << strerror(errno) << std::endl;
return 1;
}
Log() << "shmget success shmid " << shmid << std::endl;
// 这里使用共享内存
shmctl(shmid, IPC_RMID, nullptr);
Log() << "shmctl success shmid " << shmid << std::endl;
return 0;
}
shmat
上面我们做的都是把共享内存给创建出来吗,这里谈一下如何把共享内存和当前进程关联起来,这里我们也是有接口的.
[bit@Qkj 12_12]$ man shmat
我们把参数说一下,第一个和上面一样,第二个我们不关心,直接置为nullptr.第三个是我们挂接到这个内存我们对这个内存该如何做,是读还是写?这里我们直接赋值0,默认是读写.这里最关键的是返回值,这个返回值就是和我们malloc一样,它的用法也是一样的.
int flags = IPC_CREAT | IPC_EXCL;
int main()
{
umask(8);
key_t key = CreateKey();
Log() << "key: " << key << std::endl;
int shmid = shmget(key, MEM_SIZE, flags | 0666);
if (shmid < 0)
{
Log() << "shmget fali " << strerror(errno) << std::endl;
return 1;
}
Log() << "shmget success shmid " << shmid << std::endl;
// 这里使用共享内存
// 挂接
Log() << "shmat begin " << std::endl;
char *str = (char *)shmat(shmid, nullptr, 0);
sleep(5);
Log() << "shmctl begin " << std::endl;
shmctl(shmid, IPC_RMID, nullptr);
Log() << "shmctl success shmid " << shmid << std::endl;
sleep(5);
return 0;
}
shmdt
既然挂接进程了,那么我们也是可以去关联得,这里我们统一写一下,让后把现象给大家展示展示一下.
[bit@Qkj 12_12]$ man shmdt
它的参数就是我们关联函数得返回值.
int flags = IPC_CREAT | IPC_EXCL;
int main()
{
umask(0);
key_t key = CreateKey();
Log() << "key: " << key << std::endl;
int shmid = shmget(key, MEM_SIZE, flags | 0666);
if (shmid < 0)
{
Log() << "shmget fali " << strerror(errno) << std::endl;
return 1;
}
Log() << "shmget success shmid " << shmid << std::endl;
// 这里使用共享内存
// 挂接
Log() << "shmat begin " << std::endl;
char *str = (char *)shmat(shmid, nullptr, 0);
sleep(5);
// 去关联
Log() << "shmdt begin " << std::endl;
shmdt(str);
sleep(5);
Log() << "shmctl begin " << std::endl;
shmctl(shmid, IPC_RMID, nullptr);
Log() << "shmctl success shmid " << shmid << std::endl;
sleep(5);
return 0;
}
这个时候把客户端个给大家写出来,我们知道客户端只需要获取资源,关联资源和去关联资源就可以了,不需要删除资源.我们可以容易想到这个资源得挂接数是1到2再到1最后到0.
int main()
{
key_t key = CreateKey();
Log() << "key: " << key << std::endl;
Log() << "shmget begin" << std::endl;
// 仅仅是获取
int shmid = shmget(key, MEM_SIZE, IPC_CREAT);
// 挂接
Log() << "shmat begin" << std::endl;
char *str = (char *)shmat(shmid, nullptr, 0);
sleep(5);
Log() << "shmdt begin" << std::endl;
shmdt(str);
sleep(5);
return 0;
}
通信
上面我们说了这么多,本质上就是让不同得进程看到同一块资源,这里我们要用一下共享内存,现简单的看一下现象,后面我们会分析.
服务端
int flags = IPC_CREAT | IPC_EXCL;
int main()
{
umask(0);
key_t key = CreateKey();
Log() << "key: " << key << std::endl;
int shmid = shmget(key, MEM_SIZE, flags | 0666);
if (shmid < 0)
{
Log() << "shmget fali " << strerror(errno) << std::endl;
return 1;
}
Log() << "shmget success shmid " << shmid << std::endl;
// 这里使用共享内存
// 挂接
Log() << "shmat begin " << std::endl;
char *str = (char *)shmat(shmid, nullptr, 0);
while (true)
{
printf(".%s\n", str);
sleep(1);
}
// 去关联
Log() << "shmdt begin " << std::endl;
shmdt(str);
Log() << "shmctl begin " << std::endl;
shmctl(shmid, IPC_RMID, nullptr);
Log() << "shmctl success shmid " << shmid << std::endl;;
return 0;
}
客户端
int main()
{
key_t key = CreateKey();
Log() << "key: " << key << std::endl;
Log() << "shmget begin" << std::endl;
// 仅仅是获取
int shmid = shmget(key, MEM_SIZE, IPC_CREAT);
// 挂接
Log() << "shmat begin" << std::endl;
char *str = (char *)shmat(shmid, nullptr, 0);
int cnt = 0;
while (cnt < 26)
{
str[cnt] = 'A' + cnt;
cnt++;
str[cnt] = '\0';
sleep(5);
}
Log() << "shmdt begin" << std::endl;
shmdt(str);
return 0;
}
我这里发现我们在通信的时候没有使用read或者write等系统接口我们知道栈区以下是属于用户的,以上是属于内核的,这在虚拟地址空间那里和大家分析过.也就是共享内存映射到我们堆栈之间,这里是属于用户的,所以可以直接使用.下面我们手动输入一下我们想要的内容 .
int main()
{
key_t key = CreateKey();
Log() << "key: " << key << std::endl;
Log() << "shmget begin" << std::endl;
// 仅仅是获取
int shmid = shmget(key, MEM_SIZE, IPC_CREAT);
// 挂接
Log() << "shmat begin" << std::endl;
char *str = (char *)shmat(shmid, nullptr, 0);
while (true)
{
printf("请输入# ");
fflush(stdout);
ssize_t s = read(0, str, MEM_SIZE);
if (s == 0)
{
break;
}
}
Log() << "shmdt begin" << std::endl;
shmdt(str);
return 0;
}
我们结合上面的例子可以发现,我们的一条信息被多次打印,也就是对于共享内存,我们进程是不关心是否又有其他的进程挂接到这个共享内存,他只认为只要有数据我们就读,不关心其他的,所以共享内存是所有通信中速度最快的,但是这里又带来了一个缺点,进程不关心其他的进程做什么,换而言之,共享内存缺乏访问控制.
这里我们用管道来控制这个共享内存,我们管道的作用就是阻塞.当我们写入数据狗,我们向管道的发送数据,然后阻塞在read的进程得到后被唤醒,顺便把贡献内存的信息打印出来.
void CreateFifo()
{
umask(0);
if (mkfifo(FIFO_FILE, 0666) < 0)
{
Log() << strerror(errno) << "\n";
exit(2);
}
}
#define READER O_RDONLY
#define WRITER O_WRONLY
int Open(const std::string &filename, int flags)
{
return open(filename.c_str(), flags,0666);
}
int Wait(int fd)
{
uint32_t values = 0;
ssize_t s = read(fd, &values, sizeof(values));
return s;
}
int Signal(int fd)
{
uint32_t cmd = 1;
write(fd, &cmd, sizeof(cmd));
}
int Close(int fd, const std::string filename)
{
close(fd);
unlink(filename.c_str());
}
int main()
{
int fd = Open(FIFO_FILE, WRITER);
// 创建相同的key值
sleep(5);
key_t key = CreateKey();
Log() << "key: " << key << "\n";
// 获取共享内存
int shmid = shmget(key, MEM_SIZE, IPC_CREAT);
if (shmid < 0)
{
Log() << "shmget: " << strerror(errno) << "\n";
return 2;
}
// 挂接
char *str = (char *)shmat(shmid, nullptr, 0);
while (true)
{
printf("Please Enter# ");
fflush(stdout);
ssize_t s = read(0, str, MEM_SIZE);
printf("Please Enter#2 \n");
if (s > 0)
{
str[s] = '\0';
}
Signal(fd);
}
// 去关联
shmdt(str);
return 0;
}
const int flags = IPC_CREAT | IPC_EXCL;
int main()
{
CreateFifo();
int fd = Open(FIFO_FILE, READER);
assert(fd >= 0);
key_t key = CreateKey();
Log() << "key: " << key << "\n";
Log() << "create share memory begin\n";
int shmid = shmget(key, MEM_SIZE, flags | 0666);
if (shmid < 0)
{
Log() << "shmget: " << strerror(errno) << "\n";
return 2;
}
Log() << "create shm success, shmid: " << shmid << "\n";
// sleep(5);
// 1. 将共享内存和自己的进程产生关联attach
char *str = (char *)shmat(shmid, nullptr, 0);
Log() << "attach shm : " << shmid << " success\n";
// sleep(5);
// 用它
while (true)
{
// 让读端进行等待
if (Wait(fd) <= 0)
break;
printf("%s\n", str);
sleep(1);
}
// 2. 去关联
shmdt(str);
Log() << "detach shm : " << shmid << " success\n";
// sleep(5);
// 删它
shmctl(shmid, IPC_RMID, nullptr);
Log() << "delete shm : " << shmid << " success\n";
Close(fd, FIFO_FILE);
// sleep(5);
return 0;
}
信号量
到这里我们就已经谈完贡献内存的知识了,我们开始下一个模块,信号量.这里我只是开一个头,把概念先和大家说一下,主要是都留到线程那里会显得内容非常杂.我们先来些铺垫.
原子性
这里直接给出概念,对于一件事,要么你不做,要么做了你必须把他给做完,这种特性我们称之原子性.
临界资源 & 临界区
前面我们的共享内存可以被多个进程访问,管道也是.像这些可以被多个进程访问的资源我们称之为临界资源.其中我们把访问临界资源的代码叫做临界区,一般而言,临界区只占据代码的很少一部分.
互斥
对于与一个临界资源,我们在任何一个时刻只允许一个进程访问,这个特性叫做互斥.
信号量
什么是信号量?我举一个例子,在生活中我们有过去电影院看电影的经历,假设一个放映厅的座位只有100个,也就是我们只能够卖出100张票.我们是如何证明一个做位是我的呢?这里我们知道肯定是我们有一张票,这个票和我们的座位是对应的.此时我们为了抢座位那么一定就是过去抢票,对于票而言,如果余票大于0,我们就可以抢到,此时余票减减.如果余票等于0,我们就要等一会.这里例子我们就可以解释上面的内容和回答信号量是什么了.
由于放映厅可以被多个人进入,此时放映厅就是临界资源,而我们每一个人可以看作一个进程. 对于票数而言,我们100张票最多只够买到100张,那么我们是如何保证的呢?我们肯定需要一个计数器cnt,有人买票就减减,有人退票就加加.此时我们把这个计数器就叫做信号量.
假设有一个大佬包场子,也就h是他独自一人观看电影,我们把这个情况称之为互斥,同时信号量只能是0或者1,所以我们称之二元信号量.其他的情况我们统一称之为多元信号量.
现在我们开始第二个阶段,我们把看电影抢座位转换成了抢票,也就是下面的一句话任何人想要看电影,他必须先去申请cnt,也就是先看到计数器.同理对于一个进程而言,他想要访问临界资源,必须申请信号量,也就是每一个进程必须先看到这信号量.此时我们会惊奇的发现,这个信号量也是一个临界资源.此时我们要对信号量进行加加或者减减操作,就是访问临界资源.我们知道cnt作为一个变量是在内存中开辟空间的,而加加或者减减等操作是在CPU上完成的,我们把数据搬到CPU上再搬回来这个过程中执行该操作的进程有就可能会被切走,这里我们到线程那里分析,也就是我们买到票数有可能超过100,这是不合理的,所以我们要把信号量设置成原子的,也就是要么做,要么不做.此时我们更新一下信号量的定义 信号量事一个计数器,其对应的操作是原子的