目录
进程间通信目的
进程间通信分类
管道
System V IPC
POSIX IPC
什么是管道
站在文件描述符角度-深度理解管道
管道使用
管道通信的四种情况
管道通信的特点
进程池管理
命名管道
创建一个命名管道
命名管道的打开规则
命名管道通信实例
匿名管道与命名管道的区别
system V 共享内存
共享内存原理
共享内存函数
ftok函数
shmget函数
shmat函数
shmdt函数
shmctl函数
注意点
接口使用练习
system V 消息队列和信号量(仅仅了解)
消息队列
信号量
进程间通信目的
数据传输:一个进程需要将它的数据发送给另一个进程
资源共享:多个进程之间共享同样的资源。
通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止 时要通知父进程)。
进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另 一个进程的所有陷入和异常,并能够及时知道它的状态改变
进程间通信分类
进程之间通信的区域是由第三方提供的,需要被进程看到同一块公共资源,通信种类的本质就是公共资源是由哪个模块提供的。
管道
匿名管道pipe
命名管道
System V IPC
System V 消息队列
System V 共享内存
System V 信号量
POSIX IPC
消息队列
共享内存
信号量
互斥量
条件变量
读写锁
什么是管道
管道是Unix中最古老的进程间通信的形式。
我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”
管道文件是一个内存级文件(即只存在于内存中,没有对外设IO的过程)。
站在文件描述符角度-深度理解管道
管道使用
#include<cstdio>
#include<cassert>
#include<unistd.h>
#include<sys/types.h>
#include<stdlib.h>
#include<iostream>
#include<sys/wait.h>
#include<cstring>
using namespace std;
int main()
{
//1.创建管道,打开读写端
//pfd[o] 读端
//pfd[1] 写端
int pfd[2];
int n = pipe(pfd);
assert(n == 0);
//2.创建子进程
int id = fork();
assert(id >= 0);
if(id == 0)
{
//子进程程序
close(pfd[0]);
int cnt = 0;
char send[1024];
while(true)
{
snprintf(send, 1024,"子进程: %p send -> pipe count = %d",getpid(),cnt++);
write(pfd[1], send, strlen(send));
sleep(3);
}
exit(1);
}
//父进程程序
close(pfd[1]);
char receive[1024];
while(true)
{
int rn = read(pfd[0], receive, sizeof(receive) - 1);
receive[rn] = 0;
printf("receive : %s\n", receive);
}
waitpid(id, nullptr, 0);
return 0;
}
管道通信的四种情况
1.如果管道无数据,读端阻塞
2.如果管道文件写满了,写端阻塞
3.管道的写端都关闭,读端退出阻塞状态,返回读到0个字节
4.管道读端关闭,OS发送13信号杀死写端进程
管道通信的特点
1.只能用于具有共同祖先的进程(具有亲缘关系的进程)之间进行通信;通常,一个管道由一个进程创建,然后该进程调用fork,此后父、子进程之间就可应用该管道。
2.管道提供流式服务
3.一般而言,进程退出,管道释放,所以管道的生命周期随进程
4.一般而言,内核会对管道操作进行同步与互斥
5.管道是半双工的,数据只能向一个方向流动;需要双方通信时,需要建立起两个管道
进程池管理
#include<cstdio>
#include<cassert>
#include<unistd.h>
#include<sys/types.h>
#include<stdlib.h>
#include<iostream>
#include<sys/wait.h>
#include<cstring>
#include<vector>
#include<ctime>
using namespace std;
#define SUB_NUM 5
//管理
struct subEp
{
subEp(int fd, pid_t pid)
:_writefd(fd)
,_pid(pid)
{}
int _writefd;
pid_t _pid;
};
typedef void(*func_p)();
void task0()
{
cout << "test0" << endl;
sleep(1);
}
void task1()
{
cout << "test1" << endl;
sleep(1);
}
void task2()
{
cout << "test2" << endl;
sleep(1);
}
void task3()
{
cout << "test3" << endl;
sleep(1);
}
void task4()
{
cout << "test4" << endl;
sleep(4);
}
//载入任务
void LoadTask(vector<func_p>& f)
{
f.push_back(task0);
f.push_back(task1);
f.push_back(task2);
f.push_back(task3);
f.push_back(task4);
}
int main()
{
//管理不同进程
vector<subEp> subs;
//载入任务
vector<func_p> funcs;
LoadTask(funcs);
//随机种子
srand(time(nullptr));
//创建子进程和管道
for(int i = 0; i < SUB_NUM; ++i)
{
int fd[2];
int pipe_ret = pipe(fd);
assert(pipe_ret == 0);
pid_t pid = fork();
assert(pid >= 0);
if(pid == 0)
{
close(fd[1]);
//子进程读取
int read_buffer;
while(true)
{
int rn = read(fd[0], &read_buffer, sizeof(read_buffer));
assert(rn == 4 || rn == 0);
if(rn == 0)
break;
if(read_buffer >= 0 && read_buffer < funcs.size())
{
cout << getpid() << " get masege -> " << endl;
funcs[read_buffer]();
}
else
{
cout << getpid() << " get error masege !!!!" << endl;
}
}
exit(1);
}
close(fd[0]);
subs.push_back(subEp(fd[1], pid));
}
//父进程发送
int cnt = 5;
while(cnt--)
{
int proc_code = rand() % subs.size();
int task_code = rand() % funcs.size();
int wn = write(subs[proc_code]._writefd, &task_code, sizeof(int));
assert(wn == 4);
sleep(2);
}
//关闭父进程写端
for(int i = 0; i < subs.size(); ++i)
{
close(subs[i]._writefd);
}
//接收释放子进程
cout << endl;
cout << endl;
for(int i = 0; i < subs.size(); ++i)
{
pid_t waitid = waitpid(subs[i]._pid, nullptr, 0);
assert(waitid > 0);
cout << waitid << " has been received success ......." << endl;
}
return 0;
}
但是上面的代码有一个小问题(虽然不影响使用):子进程继承了父进程的写入端
将代码创建子进程和管道部分改成下列这样就可以了:
//创建子进程和管道
std::vector<int> deleteFd;
for(int i = 0; i < SUB_NUM; ++i)
{
int fd[2];
int pipe_ret = pipe(fd);
assert(pipe_ret == 0);
pid_t pid = fork();
assert(pid >= 0);
if(pid == 0)
{
for(int i = 0; i < deleteFd.size(); i++) close(deleteFd[i]);
close(fd[1]);
//子进程读取
int read_buffer;
while(true)
{
int rn = read(fd[0], &read_buffer, sizeof(read_buffer));
assert(rn == 4 || rn == 0);
if(rn == 0)
break;
if(read_buffer >= 0 && read_buffer < funcs.size())
{
cout << getpid() << " get masege -> " << endl;
funcs[read_buffer]();
}
else
{
cout << getpid() << " get error masege !!!!" << endl;
}
}
exit(1);
}
close(fd[0]);
subs.push_back(subEp(fd[1], pid));
deleteFd.push_back(fd[1]);
}
命名管道
管道应用的一个限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。
如果我们想在不相关的进程之间交换数据,可以使用FIFO文件来做这项工作,它经常被称为命名管道。
命名管道是一种特殊类型的文件(管道文件)
创建一个命名管道
命名管道可以从命令行上创建,命令行方法是使用下面这个命令:
mkfifo filename
命名管道也可以从程序里创建,相关函数有:
移除管道文件使用unlink函数:
命名管道的打开规则
如果当前打开操作是为读而打开FIFO时
O_NONBLOCK disable:阻塞直到有相应进程为写而打开该FIFO
O_NONBLOCK enable:立刻返回成功
如果当前打开操作是为写而打开FIFO时
O_NONBLOCK disable:阻塞直到有相应进程为读而打开该FIFO
O_NONBLOCK enable:立刻返回失败,错误码为ENXIO
命名管道通信实例
#include"comm.hpp"
int main()
{
//创建命名管道文件
umask(0);
int ret_fifo = mkfifo(FIFONAME, 0600);
assert(ret_fifo == 0);
//发送信息
int wfd = open(FIFONAME, O_WRONLY);
assert(wfd >= 0);
char write_buffer[1024];
while(true)
{
cout << "server -> client : ";
fgets(write_buffer, sizeof(write_buffer) - 1, stdin);
write_buffer[strlen(write_buffer) - 1] = 0;
ssize_t wr = write(wfd, write_buffer, strlen(write_buffer));
assert(wr >= 0);
}
close(wfd);
return 0;
}
#include"comm.hpp"
int main()
{
//创建命名管道文件
umask(0);
int ret_fifo = mkfifo(FIFONAME, 0600);
assert(ret_fifo == 0);
//发送信息
int wfd = open(FIFONAME, O_WRONLY);
assert(wfd >= 0);
char write_buffer[1024];
while(true)
{
cout << "server -> client : ";
fgets(write_buffer, sizeof(write_buffer) - 1, stdin);
write_buffer[strlen(write_buffer) - 1] = 0;
ssize_t wr = write(wfd, write_buffer, strlen(write_buffer));
assert(wr >= 0);
}
close(wfd);
return 0;
}
#include"comm.hpp"
int main()
{
//接收信息
int rfd = open(FIFONAME, O_RDONLY);
assert(rfd >= 0);
char read_buffer[1024];
while(true)
{
ssize_t rn = read(rfd, read_buffer, sizeof(read_buffer) - 1);
if(rn > 0)
{
read_buffer[rn] = 0;
cout << read_buffer << endl;
}
else if(rn == 0)
{
cout << " quit " << endl;
break;
}
else{
cout << " error " << endl;
break;
}
}
close(rfd);
//移除命名管道文件
unlink(FIFONAME);
return 0;
}
注意:如果一个管道文件已经存在,重复创建会失败。
匿名管道与命名管道的区别
匿名管道由pipe函数创建并打开。
命名管道由mkfifo函数创建,打开用open FIFO(命名管道)与pipe(匿名管道)之间唯一的区别在它们创建与打开的方式不同,一但这些工作完 成之后,它们具有相同的语义。
system V 共享内存
共享内存区是最快的IPC形式。一旦这样的内存映射到共享它的进程的地址空间,这些进程间数据传递不再涉及到内核,换句话说是进程不再通过执行进入内核的系统调用来传递彼此的数据。
共享内存原理
原本进程之间是独立的,两个进程的虚拟地址空间是不能指向同一块物理内存的。但是进程通信的本质就是让不同的进程看到同一份资源,在共享内存中这个同一份资源指的就是内存。也就是通过调用系统接口,使得不同的进程的虚拟地址空间可以映射到同一块物理内存。
实现的步骤如下:
1.申请一块物理内存空间,使用shmget(shared memmery get)
2.将创建好的物理内存与进程的进程地址空间相映射(挂接),shmat(attach)
3.若是不想通信了
a.取消进程地址与共享内存的映射关系(去关联), shmdt
b.释放内存 , shmctl
共享内存函数
ftok函数
将路径和项目id生成一个独一无二的标识。
shmget函数
功能:用来创建共享内存
原型
int shmget(key_t key, size_t size, int shmflg);
参数
key:这个共享内存段名字
size:共享内存大小
shmflg:由九个权限标志构成,它们的用法和创建文件时使用的mode模式标志是一样的
返回值:成功返回一个非负整数,即该共享内存段的标识码;失败返回-1
IPC_CREAT : 如果不存在则创建,如果存在则获取
IPC_EXCL :不能单独使用,IPC_CREAT | IPC_EXCL 如果不存在则创建,如果存在则返回错误码
shmat函数
功能:将共享内存段连接到进程地址空间
原型
void *shmat(int shmid, const void *shmaddr, int shmflg);
参数
shmid: 共享内存标识
shmaddr:指定连接的地址
shmflg:它的两个可能取值是SHM_RND和SHM_RDONLY
返回值:成功返回一个指针,指向共享内存第一个节;失败返回-1
shmdt函数
功能:将共享内存段与当前进程脱离
原型
int shmdt(const void *shmaddr);
参数
shmaddr: 由shmat所返回的指针
返回值:成功返回0;失败返回-1 注意:将共享内存段与当前进程脱离不等于删除共享内存段
shmctl函数
功能:用于控制共享内存
原型
int shmctl(int shmid, int cmd, struct shmid_ds *buf);
参数
shmid:由shmget返回的共享内存标识码
cmd:将要采取的动作(有三个可取值)
buf:指向一个保存着共享内存的模式状态和访问权限的数据结构
返回值:成功返回0;失败返回-1
注意点
1.共享内存也要先描述再组织,共享内存 = 物理内存块 + 共享内存相关属性(结构体储存,内含key)
2.key是为了保证创建的共享内存再OS中是唯一的。
3.shmid与key的关系犹如fd与inode一样,一个是用户接口使用,一个是OS中使用,且前一个指向的结构体中包含后一个。
4.命令行查看和删除如下:
5.要挂接,在创建时就要加上权限
6.共享内存的有点是减少了数据的拷贝次数,通信速度块。
7.共享内存的缺点是没有同步和互斥,没有对数据的保护。
接口使用练习
shm_com.hpp
#include<cstdio>
#include<iostream>
#include<unistd.h>
#include<sys/shm.h>
#include<sys/ipc.h>
#include<sys/types.h>
#include<cerrno>
using namespace std;
#define PATHNAME "."
#define MAXSIZE 4096
//获取key
key_t getKey()
{
key_t key = ftok(PATHNAME, 0x6666);
if(key == -1)
{
perror("getKey : ");
exit(1);
}
return key;
}
//创建共享内存
int creatShm(key_t key)
{
int shmid = shmget(key, MAXSIZE, IPC_CREAT | IPC_EXCL | 0666);
if(shmid == -1)
{
perror("creatShm :");
exit(2);
}
return shmid;
}
//获取共享内存
int getShm(key_t key)
{
int shmid = shmget(key, MAXSIZE, IPC_CREAT);
if(shmid == -1)
{
perror("getShm :");
exit(3);
}
return shmid;
}
//挂接
void* attachShm(int shmid)
{
void* n = shmat(shmid, nullptr, 0);
if(n == (void*)-1)
{
perror("attachShm :");
}
return n;
}
//去关联
void detachShm(const void* buffer)
{
int n = shmdt(buffer);
if(n == -1)
{
perror("attachShm :");
}
}
//释放内存
void delShm(int shmid)
{
if(-1 == shmctl(shmid, IPC_RMID, nullptr))
{
perror("delShm : ");
exit(4);
}
}
server.cc
#include"shm_com.hpp"
int main()
{
int key = getKey();
int shmid = creatShm(key);
void* shm_buffer = attachShm(shmid);
int cnt = 20;
while(--cnt)
{
sleep(2);
snprintf((char*)shm_buffer, MAXSIZE, "server -> client : pid : %d , cnt : %d\n", getpid(), cnt);
}
detachShm(shm_buffer);
delShm(shmid);
return 0;
}
client.cc
#include"shm_com.hpp"
int main()
{
int key = getKey();
int shmid = getShm(key);
void* shm_buffer = attachShm(shmid);
while(true)
{
sleep(2);
printf("%s", (char*) shm_buffer);
}
detachShm(shm_buffer);
return 0;
}
system V 消息队列和信号量(仅仅了解)
消息队列
消息队列提供了一个从一个进程向另外一个进程发送一块数据的方法
每个数据块都被认为是有一个类型,接收者进程接收的数据块可以有不同的类型值
特性方面 IPC资源必须删除,否则不会自动清除,除非重启,所以system V IPC资源的生命周期随内核
系统调用接口:
信号量
信号量的本质是计数器,通常用于表示公共资源中资源数目的多少。
信号量的意义是为了保护公共资源,防止数据不一致等问题。
公共资源: 被多人进程同时可以访问的资源
访问没有保护的公共资源: 数据不一致问题(并发访问冲突)要保护公共资源的逻辑推导:
为什么要让不同的进程看到同一份资源呢? 因为我想通信,进程间实现协同->但是进程有独立性 -> 为了让进程看到同一份资源 -> 提出方法 ->但引入了新的问题(数据不一致问题)->我们未来将被保护起来的公共资源: 临界资源(被保护的公共资源)资源(内存,文件,网络等)是要被使用的,如何被进程使用呢?一定是该进程有对应的代码来访问这部分临界资源:调用临界资源的代码属于临界区,访问独立资源的代码非临界区
信号量主要用于同步和互斥的,下面先来看看什么是同步和互斥。
如何保护公共资源: 互斥&&同步
原子性:要么不做,要做就做完,两态的这种情况
共享资源:
1.作为一个整体使用
2.划分成为一个一个的资源子部分
当我们想要某种资源的时候,我们可以进行预订
如果一个信号量初始值: 1
二元信号量 -- 互斥功能
所有的进程在访问公共资源之前,都必须先申请sem信号量 ->必须先申请sem信号量的前提,是所有进程必须先得看到同一个信号量 ->信号量本身就是公共资源 ->信号量是不是也要保证自己的安全呢? --,++(PV) ->信号量必须保证自身操作的安全性,--,++操作是原子态的!!
系统调用接口:
semget
semctl
semop