Linux进程通信
- 1.进程通信介绍
- 1.1进程间通信目的
- 1.2进程间通信发展
- 1.3进程间通信的具体分类
- 2.管道
- 2.1匿名管道
- 2.1.1代码实例
- 2.1.2 fork共享管道原理
- 2.1.3 管道的读写规则与特点
- 2.1.4 进程池
- 2.2 命名管道
- 2.2.1 命名管道的创建
- 2.2.2匿名管道与命名管道的区别
- 2.2.3代码实例
- 3.System V共享内存
- 3.1 共享内存数据结构
- 3.2 共享内存函数接口
- 3.3 共享内存代码实例
- 4.System V消息队列
- 5.System V信号量
1.进程通信介绍
1.1进程间通信目的
数据传输:一个进程需要将它的数据发送给另外一个进程
资源共享:多个进程之间共享同样的资源
通知事件:一个进程需要向另一个或者一组进程发送消息,通知它发生了某种事件(如进程终止时要通知父进程)
进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
1.2进程间通信发展
管道-》System V进程间通信-》POSIX进程间通信
1.3进程间通信的具体分类
管道:
1.匿名管道pipe
2.命名管道
System V IPC:
1.System V 消息队列
2.System V 共享内存
3.System V 信号量
POSIX IPC
1.消息队列
2.共享内存
3.信号量
4.互斥量
5.条件变量
6.读写锁
2.管道
管道是Unix中最古老的进程间通信方式
从一个进程连接到另外一个进程的一个数据流称作一个管道
举例
who | wc -l
who命令显示当前登录的用户及其相关信息
| 是管道符,表示将 who 命令的输出传递给下一个命令
wc -l 命令计算输入的行数
然而整个命令的作用是输出当前登录用户的总数。
2.1匿名管道
#include<unistd.h>
//功能创建一个无名管道
int pipe(int fd[2]);
//参数
//fd:文件描述符数组,其中fd[0]表示读端,fd[1]表示写段
//返回值:成功返回0,失败返回错误代码
2.1.1代码实例
#include <iostream>
#include <cassert>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#define MAX 1024
using namespace std;
// a.管道的4种情况
// 1.正常情况,如果管道没有数据了,读端必须等待,直到有数据为止(写端写入数据了)
// 2.正常情况,如果管道被写满了,写段必须等待,直到有空间为止(读端读走数据)
// 3.写段关闭,读端一直读取,读端会读到read返回值为0,表示读到文件末尾
// 4.读端关闭,写段一直写入,os会杀掉写端进程,通过向目标进程发送SIGPIPE(13)信号,终止目标进程
// b.管道的五种特性
// 1.匿名管道,可以允许具有血缘关系的进程之间进行进程间通信,常用于父子,仅限于此
// 2.匿名管道,默认给读写端提供同步机制
// 3.面向字节流
// 4.管道的生命周期是随进程的
// 5.管道是单向通信的,半双工通信的一种特殊情况
int main()
{
// 第一步:创建管道
int pipefd[2];
int n = pipe(pipefd);
assert(n == 0);
(void)n; // 防止编译器告警,意料之中用assert,意料之外用if
cout << "pipefd[0]:" << pipefd[0] << ",pipefd[1]:" << pipefd[1] << endl;
// 第二步:创建子进程
pid_t id = fork();
if (id < 0)
{
perror("fork");
return 1;
}
// 子写,父读
// 第三步:父子关闭不需要的fd,形成单向通信的管道
if (id == 0)
{
// if(fork()>0) exit(0);//这里是父孙进程可以进行通信
// child
close(pipefd[0]);
// w-只向管道写入,没有打印
int cnt = 0;
while (true)
{
// 这里是测试管道文件大小是多少,得出的结果是64KB
// char c='a';
// write(pipefd[1],&c,1);
// cnt++;
// cout<<"write....:"<<cnt<<endl;
char message[MAX];
snprintf(message, sizeof(message), "hello father, I am child, pid: %d, cnt: %d", getpid(), cnt);
cnt++;
write(pipefd[1], &message, strlen(message));
sleep(1);
}
cout << "child close w piont" << endl;
// close(pipefd[1]);//进程退出会自动关闭文件描述符
exit(0);
}
close(pipefd[1]);
// r
char buffer[MAX];
while (true)
{
ssize_t n = read(pipefd[0], buffer, strlen(buffer) - 1);
if (n > 0)
{
buffer[n] = 0; // ‘\0’,当作字符串
cout << getpid() << ", " << "child say: " << buffer << "to me!" << endl;
}
else if (n == 0)
{
cout << "child quit, me too !" << endl;
break;
}
cout << "father return val(n): " << n << endl;
sleep(1);
break;
}
cout << "read point close" << endl;
close(pipefd[0]);
sleep(5);
int status = 0;
pid_t rid = waitpid(id, &status, 0);
if (rid == id)
{
cout << "wait success,child exit sig: " << (status & 0x7F) << endl;
}
return 0;
}
2.1.2 fork共享管道原理
2.1.3 管道的读写规则与特点
a.管道的4种情况
1.正常情况,如果管道没有数据了,读端必须等待,直到有数据为止(写端写入数据了)
2.正常情况,如果管道被写满了,写段必须等待,直到有空间为止(读端读走数据)
3.写段关闭,读端一直读取,读端会读到read返回值为0,表示读到文件末尾
4.读端关闭,写段一直写入,os会杀掉写端进程,通过向目标进程发送SIGPIPE(13)信号,终止目标进程
b.管道的五种特性
1.匿名管道,可以允许具有血缘关系的进程之间进行进程间通信,常用于父子,仅限于此
2.匿名管道,默认给读写端提供同步机制
3.面向字节流
4.管道的生命周期是随进程的
5.管道是单向通信的,半双工通信的一种特殊情况
2.1.4 进程池
//ProcessPool.cc
#include <iostream>
#include <string>
#include <vector>
#include <cassert>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "Task.hpp"
const int num = 5;
static int number = 1;
class channel
{
public:
channel(int fd, pid_t id) : ctrlfd(fd), workerid(id)
{
name = "channel-" + std::to_string(number++);
}
public:
int ctrlfd;
pid_t workerid;
std::string name;
};
void Work()
{
while (true)
{
int code = 0;
while (true)
{
int code = 0;
ssize_t n = read(0, &code, sizeof(code));
if (n == sizeof(code))
{
if (!init.CheckSafe(code))
{
continue;
}
init.RunTask(code);
}
else if (n == 0) // 这里是写端退出
{
break;
}
else
{
// 这里是出错处理暂不处理
// do nothing
}
}
}
std::cout << "child quit" << std::endl;
}
void PrintFd(const std::vector<int> &fds)
{
std::cout << getpid() << "close fds: ";
for (auto fd : fds)
{
std::cout << fd << " ";
}
std::cout << std::endl;
}
// 传参形式:
// 1.输入函数:const &
// 2.输出参数: *
// 3.输入输出参数:&
void CreateChannels(std::vector<channel> *c)
{
std::vector<int> old;
for (int i = 0; i < num; i++)
{
// 1.定义并创建管道
int pipefd[2];
int n = pipe(pipefd);
assert(n == 0);
(void)n;
// 2.创建进程
pid_t id = fork();
assert(id != -1);
// 3.构建单向通信信道
if (id == 0)
{
if (!old.empty())
{
for (auto fd : old)
{
close(fd);
}
PrintFd(old);
}
close(pipefd[1]);
dup2(pipefd[0], 0);
Work();
exit(0); // 会自动关闭自己打开的所有的Fd
}
// father
close(pipefd[0]);
c->push_back(channel(pipefd[1], id));
old.push_back(pipefd[1]);
// childid,pipefd[1]
}
}
void PrintDebug(const std::vector<channel> &c)
{
for (const auto &channel : c)
{
std::cout << channel.name << ", " << channel.ctrlfd << ", " << channel.workerid << std::endl;
}
}
void SendCommand(const std::vector<channel> &c, bool flag, int num = -1)
{
int pos = 0;
while (true)
{
// 1.选择任务
int command = init.SelectTask();
// 2.选择信道(进程)
const auto &channel = c[pos++];
pos %= c.size();
// debug
std::cout << "send command " << init.ToDesc(command) << "[" << command << "]"
<< " in "
<< channel.name << " worker is : " << channel.workerid << std::endl;
// 3.发送任务
write(channel.ctrlfd, &command, sizeof(command));
// 4.判断是否要退出
if (!flag)
{
num--;
if (num <= 0)
{
break;
}
}
sleep(1);
}
std::cout << "SendCommand done..." << std::endl;
}
void ReleaseChannels(std::vector<channel> c)
{
// version 2
// int num = c.size() - 1;
// for (; num >= 0; num--)
// {
// close(c[num].ctrlfd);
// waitpid(c[num].workerid, nullptr, 0);
// }
// version 1
for (const auto &channel : c)
{
close(channel.ctrlfd);
waitpid(channel.workerid, nullptr, 0);
}
// for (const auto &channel : c)
// {
// pid_t rid = waitpid(channel.workerid, nullptr, 0);
// if (rid == channel.workerid)
// {
// std::cout << "wait child: " << channel.workerid << " success" << std::endl;
// }
// }
}
int main()
{
std::vector<channel> channels;
// 1.创建信道,创建进程
CreateChannels(&channels);
// 2.开始发送任务
const bool g_alway_loop = true;
// SendCommand(channels,g_alway_loop);
SendCommand(channels, !g_alway_loop, 10);
// 3.回收资源,想让子进程退出,并且释放管道,只要关闭写端
ReleaseChannels(channels);
return 0;
}
//Task.hpp
#pragma once
#include <iostream>
#include <functional>
#include <vector>
#include <ctime>
#include <unistd.h>
// using task_t =std::function<void()>;
typedef std::function<void()> task_t;
void Download()
{
std::cout << "我是一个下载任务" << "处理者:" << getpid() << std::endl;
}
void PrintLog()
{
std::cout << "我是一个打印日志的任务" << "处理者:" << getpid() << std::endl;
}
void PushVideoStream()
{
std::cout << "这是一个推送视频流的任务" << "处理者" << getpid() << std::endl;
}
class Init
{
public:
// 任务码
const static int g_download_code = 0;
const static int g_printlog_code = 1;
const static int g_push_videostream_code = 2;
// 任务集合
std::vector<task_t> tasks;
public:
Init()
{
tasks.push_back(Download);
tasks.push_back(PrintLog);
tasks.push_back(PushVideoStream);
srand(time(nullptr) ^ getpid());
}
bool CheckSafe(int code)
{
if (code >= 0 && code < tasks.size())
{
return true;
}
else
{
return false;
}
}
void RunTask(int code)
{
return tasks[code]();
}
int SelectTask()
{
return rand() % tasks.size();
}
std::string ToDesc(int code)
{
switch (code)
{
case g_download_code:
return "Download";
case g_printlog_code:
return "PrintLog";
case g_push_videostream_code:
return "PushVideoStream";
default:
return "Unknow";
}
}
};
Init init;
注意循环创建信道时,子进程的文件描述符表拷贝父进程的,会导致信道被多个文件描述符指向,在释放文件描述符的时候要尤为注意,下面把代码单独拧出来,以便思考
void CreateChannels(std::vector<channel> *c)
{
std::vector<int> old;
for (int i = 0; i < num; i++)
{
// 1.定义并创建管道
int pipefd[2];
int n = pipe(pipefd);
assert(n == 0);
(void)n;
// 2.创建进程
pid_t id = fork();
assert(id != -1);
// 3.构建单向通信信道
if (id == 0)
{
if (!old.empty())
{
for (auto fd : old)
{
close(fd);
}
PrintFd(old);
}
close(pipefd[1]);
dup2(pipefd[0], 0);
Work();
exit(0); // 会自动关闭自己打开的所有的Fd
}
// father
close(pipefd[0]);
c->push_back(channel(pipefd[1], id));
old.push_back(pipefd[1]);
// childid,pipefd[1]
}
}
void ReleaseChannels(std::vector<channel> c)
{
// version 2
// int num = c.size() - 1;
// for (; num >= 0; num--)
// {
// close(c[num].ctrlfd);
// waitpid(c[num].workerid, nullptr, 0);
// }
// version 1
for (const auto &channel : c)
{
close(channel.ctrlfd);
waitpid(channel.workerid, nullptr, 0);
}
// for (const auto &channel : c)
// {
// pid_t rid = waitpid(channel.workerid, nullptr, 0);
// if (rid == channel.workerid)
// {
// std::cout << "wait child: " << channel.workerid << " success" << std::endl;
// }
// }
}
2.2 命名管道
匿名管道应用的一个限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。
如果我们想在不相关的进程之间交换数据,可以使用FIFO文件来做这项工作,它经常被称为命名管道。
命名管道是一种特殊类型的文件
2.2.1 命名管道的创建
1.命名管道可以从命令行上创建
mkfifo filename
2.命名管道从程序中创建
int mkfifo(const char* filename,mode_t mode);
2.2.2匿名管道与命名管道的区别
匿名管道由pipe函数创建并打开
命名管道由mkfifo函数创建,打开用open
FIFO(命名管道)与pipe(匿名管道)之间唯一的区别在它们创建与打开的方式不同,一但这些工作完成之后,他们具有相同的语义
2.2.3代码实例
//comm.h
#pragma once
#define FILENAME "fifo"
//Makefile
.PHONY:all
all:server cilent
server:server.cc
g++ -o $@ $^ -std=c++11
cilent:cilent.cc
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -f server cilent fifo
//server.cc
#include <iostream>
#include <cstring>
#include <cerrno>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "comm.h"
bool MakeFifo()
{
int fd = mkfifo(FILENAME, 0666);
if (fd < 0)
{
std::cerr << "errno: " << errno << ", errstring: " << strerror(errno) << std::endl;
return false;
}
std::cout << "mkfifo success... read" << std::endl;
return true;
}
int main()
{
Start:
int rfd = open(FILENAME, O_RDONLY);
if (rfd < 0)
{
std::cerr << "errno: " << errno << ",errstring: " << strerror(errno) << std::endl;
if (MakeFifo())
{
goto Start;
}
else
{
return 1;
}
}
std::cout << "open fifo success..." << std::endl;
char buffer[1024];
while (true)
{
ssize_t s = read(rfd, &buffer, sizeof(buffer) - 1);
if (s > 0)
{
buffer[s] = 0;
std::cout << "Client say# " << buffer << std::endl;
}
else if (s == 0)
{
std::cout << "Client quit, server quit too!" << std::endl;
break;
}
}
close(rfd);
std::cout << "close fifo sucess..." << std::endl;
return 0;
}
//cilent.cc
#include <iostream>
#include <cstring>
#include <cerrno>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "comm.h"
int main()
{
int wfd = open(FILENAME, O_WRONLY);
if (wfd < 0)
{
std::cerr << "errno: " << errno << ", errstring: " << strerror(errno) << std::endl;
return 1;
}
std::string message;
while (true)
{
std::cout << "Please Enter# ";
std::getline(std::cin, message);
ssize_t s = write(wfd, message.c_str(), message.size());
if (s < 0)
{
std::cerr << "errno: " << errno << ", errstring: " << strerror(errno) << std::endl;
break;
}
}
close(wfd);
std::cout << "close fifo success..." << std::endl;
return 0;
}
3.System V共享内存
进程通信的前提:必须让不同的进程看到同一份资源(必须由OS提供)
OS会允许系统中同时存在多个共享内存,先描述,在组织,对共享内存进行管理,进程间是通过一个提前约定好的标识看到同一个共享内存的
3.1 共享内存数据结构
3.2 共享内存函数接口
shmget函数
功能:用来创建共享内存
原型:int shmget(key_t key,size_t size,int shmflg);
参数:
key:提前约定好的一个钥匙,通常是路径加数字转换而来的
size:共享内存大小
shmflg:用法和创建文件使用的mode模式标识是一样的
返回值:
成功返回一个非负整数,即共享内存的标识码;失败返回-1
shmat函数
功能:将共享内存段连接到进程地址空间
原型:void* shmat(int shmid,const void* shmaddr,int shmflg);
参数:
shmid:共享内存标识
shmaddr:指定连接的地址
shmflg:两个取值:SHM_RND和SHM_RDONLY
返回值:成功返回一个指针,指针指向共享内存首地址;失败返回-1
shmaddr为NULL,核心自动选择一个地址
shmdt函数
功能:将共享内存段与当前进程脱离
原型:int shmdt(const void* shmaddr);
参数:shmaddr:由shmat所返回的指针
返回值:成功返回0,失败返回-1
注意:将共享内存段与当前进程脱离,但不等于删除共享内存段
shmctl函数
功能:用于控制共享内存
原型:int shmctl(int shmid,int cmd,struct shmid_ds* buf);
参数:
shmid:由shmget返回的共享内存标识码
cmd:将要采取的动作(三个取值)
buf:指向一个保存着共享内存的模式状态和访问权限的数据结构
返回值:成功返回0;失败返回-1
命令 | 说明 |
---|---|
IPC_STAT | 把shmid_ds结构中的数据设置为共享内存的当前关联值 |
IPC_SET | 在进程有足够权限的前提下,把共享内存的当前关联值设置为shmid_ds数据结构中给出的值 |
IPC_RMID | 删除共享内存段 |
3.3 共享内存代码实例
//Makefile
.PHONY:all
all:server client
server:server.cc
g++ -o $@ $^ -std=c++11
client:client.cc
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -f server client fifo
//comm.hpp
#pragma once
#include <iostream>
#include <cstdlib>
#include <string>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/ipc.h>
#include <sys/shm.h>
const std::string pathname = "/home/whb/pipe_fifo_shm";
const int proj_id = 0x11223344;
const std::string filename = "fifo";
// 共享内存的大小,强烈建议设置为n*4096
const int size = 4096;
key_t GetKey()
{
key_t key = ftok(pathname.c_str(), proj_id);
if (key < 0)
{
std::cerr << "errno: " << errno << ",errstring: " << strerror(errno) << std::endl;
exit(1);
}
return key;
}
std::string ToHex(int id)
{
char buffer[1024];
snprintf(buffer, sizeof(buffer), "0x%x", id);
return buffer;
}
int CreateShmHelper(key_t key, int flag)
{
int shmid = shmget(key, size, flag);
if (shmid < 0)
{
std::cerr << "errno: " << errno << ", errstring: " << strerror(errno) << std::endl;
exit(2);
}
return shmid;
}
int CreateShm(key_t key)
{
return CreateShmHelper(key, IPC_CREAT | IPC_EXCL | 0644);
}
int GetShm(key_t key)
{
return CreateShmHelper(key, IPC_CREAT);
}
bool MakeFifo()
{
int n = mkfifo(filename.c_str(), 0666);
if (n < 0)
{
std::cerr << "errno: " << errno << ", errstring: " << strerror(errno) << std::endl;
return false;
}
std::cout << "mafifo success... read" << std::endl;
return true;
}
//server.cc
#include <iostream>
#include <cstring>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <unistd.h>
#include "comm.hpp"
class Init
{
public:
Init()
{
// bool r=MakeFifo();
// if(!r)
// return;
key_t key = GetKey();
std::cout << "key : " << ToHex(key) << std::endl;
// sleep(3);
// key vs shmid
// shmid:应用这个共享内存的时候,我们使用shmid来进行操作共享内存, FILE*
// key:不要在应用层使用,只用来在内核中标识shm的唯一性!, fd
shmid = CreateShm(key);
std::cout << "shmid: " << shmid << std::endl;
// sleep(10);
std::cout << "开始将shm映射到进程的地址空间中" << std::endl;
s = (char *)shmat(shmid, nullptr, 0);
// fd=open(filename.c_str(),O_RDONLY);
}
~Init()
{
// sleep(5);
shmdt(s);
std::cout << "开始将shm从进程地址空间中移除" << std::endl;
// sleep(5);
shmctl(shmid, IPC_RMID, nullptr);
std::cout << "开始将shm从os中删除" << std::endl;
// close(fd);
// unlink(filename.c_str());
}
public:
int shmid;
int fd;
char *s;
};
int main()
{
key_t key = GetKey();
// int msgid = msgget(key, IPC_CREAT | IPC_EXCL);
// std::cout << "msgid: " << msgid << std::endl;
// struct msqid_ds ds;
// msgctl(msgid, IPC_STAT, &ds);
// std::cout << ds.msg_qbytes << std::endl;
// std::cout << ToHex(ds.msg_perm.__key) << std::endl;
// sleep(10);
// int semid = semget(key, 1, IPC_CREAT | IPC_EXCL);
// std::cout << "semid: " << semid << std::endl;
// sleep(4);
// semctl(semid, 1, IPC_RMID);
// msgctl(msgid,IPC_RMID,nullptr);
Init init;
struct shmid_ds ds;
shmctl(init.shmid, IPC_STAT, &ds);
std::cout << ToHex(ds.shm_perm.__key) << std::endl;
std::cout << ds.shm_segsz << std::endl;
std::cout << ds.shm_segsz << std::endl;
std::cout << ds.shm_segsz << std::endl;
std::cout << ds.shm_atime << std::endl;
std::cout << ds.shm_nattch << std::endl;
sleep(5);
// TODO
while (true)
{
// wait
int code = 0;
ssize_t n = read(init.fd, &code, sizeof(code));
if (n > 0)
{
// 直接读取
std::cout << "共享内存的内容: " << init.s << std::endl;
sleep(1);
}
else if (n == 0)
{
break;
}
}
sleep(10);
return 0;
}
//client.cc
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include "comm.hpp"
int main()
{
key_t key = GetKey();
int shmid = GetShm(key);
char *s = (char *)shmat(shmid, nullptr, 0);
std::cout << "attach shm done" << std::endl;
int fd = open(filename.c_str(), O_WRONLY);
// sleep(10);
// TODO
// 共享内存的通信方式,不会提供同步机制,共享内存是直接裸露给所有使用者的,一定要注意共享内存的使用安全问题
//
char c = 'a';
for (; c <= 'z'; c++)
{
s[c - 'a'] = c;
std::cout << "write : " << c << "done" << std::endl;
sleep(1);
// 通知对方
int code = 1;
write(fd, &code, sizeof(4));
}
shmdt(s);
std::cout << "detach shm done" << std::endl;
close(fd);
return 0;
}
4.System V消息队列
消息队列提供了一个从一个进程向另外一个进程发送一块数据的方法
每个数据块都被认为是有一个类型,接收者进程接收的数据块可以有不同的类型值
特性方面
IPC资源必须删除,否则不会自动清除,除非重启,所以system V IPC资源的生命周期随内核
5.System V信号量
信号量的本质是一个计数器
为了让进程间通信-》多个执行流看到的同一份资源,公共资源-》并发访问-》数据不一致的问题-》保护起来-》互斥和同步
互斥:任何一个时刻只允许一个执行流(进程)访问公共资源,加锁完成
同步:多个执行流执行的时候,按照一定的顺序执行
被保护起来的公共资源,临界资源
访问该临界资源的代码,我们叫做临界区
而维护临界资源,其实就是维护临界区