1. 进程间通信简介
进程间通信(Inter-Process Communication,IPC)是指不同进程之间交换数据的机制。由于进程具有独立的地址空间,它们无法直接访问彼此的数据,因此需要IPC机制来实现信息共享、数据传递或同步操作。
1.1 进程间通信的目的
数据交换:不同进程之间共享数据,提高系统的协作能力。
资源共享:多个进程可以访问共享内存或文件,提高资源利用率。
事件通知:当一个进程发生某种事件时,能够通知其他进程进行响应。
进程同步:有些进程希望完全控制另⼀个进程的执行(如Debug进程),此时控制进程希望能够拦截另⼀个进程的所有陷⼊和异常,并能够及时知道它的状态改变。
负载均衡:多个进程可以分担任务,提高系统性能。
1.2 进程间通信的分类
System V IPC 和 POSIX IPC 与管道(Pipe)是同级别的进程间通信(IPC)机制,但它们的使用方式和适用场景不同。管道属于 IPC 机制的一种,而 System V IPC 和 POSIX IPC 提供了更丰富的通信手段,如消息队列、共享内存和信号量。
1.21 管道(Pipe)
管道是一种 基于数据流的 IPC 机制,主要用于进程间的 单向通信,分为:
(1)匿名管道(Anonymous Pipe):只能用于父子进程之间的通信,使用 pipe()
创建。
(2)命名管道(FIFO, Named Pipe):允许无亲缘关系的进程通信,使用 mkfifo()
创建。
特点:
- 适用于简单的数据流通信。
- 有序、先进先出(FIFO) 方式传输数据。
- 单向通信,如果需要双向通信,需要创建两个管道。
1.2.2 System V IPC
System V IPC 提供了三种主要的进程间通信方式:
(1)消息队列(Message Queue):进程通过发送和接收消息进行通信,类似邮件系统。
(2)共享内存(Shared Memory):多个进程可以访问同一块内存区域,比管道和消息队列快,但需要同步机制。
(3)信号量(Semaphore):用于进程同步,通常用于控制对共享资源的访问。
特点:
- 需要手动管理 IPC 资源(创建、删除)。
- 适用于 长期运行的进程,如数据库、后台服务。
- 比管道更强大,但 API 更复杂。
1.2.3 POSIX IPC
POSIX IPC 是 System V IPC 的改进版,提供:
(1)POSIX 消息队列:类似 System V 消息队列,但支持非阻塞模式,管理更方便。
(2)POSIX 共享内存:基于文件系统,使用 shm_open()
进行管理。
(3)POSIX 信号量:比 System V 信号量更简单,支持命名信号量和无名信号量。
特点:
- 资源可以自动释放,避免 System V IPC 的手动管理问题。
- 更现代化,适用于 Linux、macOS 等系统。
- API 更易用,推荐用于新开发的 Linux 应用。
管道 vs. System V IPC vs. POSIX IPC
机制 | 适用场景 | 亲缘关系限制 | 数据传输方式 | 速度 | 是否需要同步 |
---|---|---|---|---|---|
匿名管道 | 父子进程通信 | 需要父子关系 | 字节流 | 慢 | 不需要 |
命名管道 | 任意进程通信 | 无亲缘关系要求 | 字节流 | 慢 | 不需要 |
System V 消息队列 | 结构化数据传输 | 无亲缘关系要求 | 消息(队列方式) | 中等 | 不需要 |
System V 共享内存 | 进程间高效共享数据 | 无亲缘关系要求 | 共享内存 | 最快 | 需要 |
System V 信号量 | 进程同步 | 无亲缘关系要求 | 计数器 | 快 | 需要 |
POSIX 消息队列 | 结构化数据传输 | 无亲缘关系要求 | 消息(队列方式) | 中等 | 不需要 |
POSIX 共享内存 | 进程间高效共享数据 | 无亲缘关系要求 | 共享内存 | 最快 | 需要 |
POSIX 信号量 | 进程同步 | 无亲缘关系要求 | 计数器 | 快 | 需要 |
总结
管道(Pipe)和 System V / POSIX IPC 都是 IPC 机制的一种,但它们适用于不同场景:
- 管道适用于简单的进程间数据流通信,特别是父子进程之间的通信。
- System V 和 POSIX IPC 适用于更复杂的进程通信需求,比如:
- 消息队列 适合结构化数据传输(比管道更灵活)。
- 共享内存 适合高效数据共享(比管道快)。
- 信号量 适合进程同步(配合共享内存使用)。
- POSIX IPC 是 System V IPC 的改进版,API 更简洁,资源管理更方便,推荐在现代 Linux 开发中使用。
2. 管道
管道是进程间通信(IPC)的一种方式,它允许一个进程将数据传输给另一个进程。管道在类Unix操作系统中尤其重要。管道有两种类型:匿名管道和命名管道。
-
匿名管道:匿名管道通常用于父子进程或兄弟进程之间的通信,它没有名称,因此只能在创建它的进程间使用。数据只能在一个方向上传递,从管道的一端写入数据,另一端读取数据。匿名管道的生命周期与父进程相关联。
-
命名管道(FIFO):命名管道有一个特定的名称,因此可以在不同的进程之间进行通信,而不仅限于父子进程。命名管道在文件系统中表现为一个特殊的文件,可以通过路径访问。这使得它可以跨进程、跨终端进行通信。
管道的工作原理是通过内核缓冲区进行数据传输,内核会确保进程间的数据不会相互干扰。管道是半双工的(数据只能单向流动),但通过创建两个管道可以实现全双工通信。
管道的优点是简单、快速,缺点是只能在有限的场景中使用,如通信双方需要具有亲缘关系(匿名管道)或访问权限(命名管道)。
3. 匿名管道
#include <unistd.h>
功能:创建⼀⽆名管道
原型
int pipe(int fd[2]);
参数
fd:⽂件描述符数组,其中fd[0]表⽰读端, fd[1]表⽰写端
返回值:成功返回0,失败返回错误代码
3.1 多视角理解管道
用fork来共享管道原理
#include <iostream>
#include <unistd.h>
#include <cstdio>
#include <cstring>
#include <sys/types.h>
#include <sys/wait.h>
void ChildWrite(int wfd)
{
char buffer[1024];
int cnt = 0;
while(true)
{
snprintf(buffer, sizeof(buffer), "I am child, pid: %d, cnt: %d", getpid(), cnt++);
write(wfd, buffer, strlen(buffer));
sleep(1);
}
}
void FatherRead(int rfd)
{
char buffer[1024];
while(true)
{
buffer[0] = 0;
int n = read(rfd, buffer, sizeof(buffer) - 1);
if(n > 0)
{
buffer[n] = 0;
std::cout << "child say" << buffer << std::endl;
}
}
}
int main()
{
int fds[2] = {0};
int n = pipe(fds);
if(n < 0)
{
std::cerr << "pipe error" << std::endl;
}
std::cout << "fds[0]:" << fds[0] << std::endl;
std::cout << "fds[1]:" << fds[1] << std::endl;
//创建子进程
pid_t id = fork();
if(id == 0)
{
close(fds[0]);
ChildWrite(fds[1]);
close(fds[1]);
exit(0);
}
close(fds[1]);
FatherRead(fds[0]);
close(fds[0]);
wait(NULL);
return 0;
}
站在文件描述符角度-深度理解管道
站在内核角度-管道本质
3.2 管道的同步机制
(1)写慢,读块
读端阻塞进程(等写)
#include <iostream>
#include <unistd.h>
#include <cstdio>
#include <cstring>
#include <sys/types.h>
#include <sys/wait.h>
void ChildWrite(int wfd)
{
char buffer[1024];
int cnt = 0;
while(true)
{
snprintf(buffer, sizeof(buffer), "I am child, pid: %d, cnt: %d", getpid(), cnt++);
write(wfd, buffer, strlen(buffer));
}
}
void FatherRead(int rfd)
{
char buffer[1024];
while(true)
{
buffer[0] = 0;
int n = read(rfd, buffer, sizeof(buffer) - 1);
if(n > 0)
{
buffer[n] = 0;
std::cout << "child say" << buffer << std::endl;
}
sleep(1);
}
}
int main()
{
int fds[2] = {0};
int n = pipe(fds);
if(n < 0)
{
std::cerr << "pipe error" << std::endl;
}
std::cout << "fds[0]:" << fds[0] << std::endl;
std::cout << "fds[1]:" << fds[1] << std::endl;
//创建子进程
pid_t id = fork();
if(id == 0)
{
close(fds[0]);
ChildWrite(fds[1]);
close(fds[1]);
exit(0);
}
close(fds[1]);
FatherRead(fds[0]);
close(fds[0]);
wait(NULL);
return 0;
}
(2)写快, 读慢
缓冲区满了的时候,写端要阻塞等待读端
void ChildWrite(int wfd)
{
char buffer[1024];
int cnt = 0;
while(true)
{
snprintf(buffer, sizeof(buffer), "I am child, pid: %d, cnt: %d", getpid(), cnt++);
write(wfd, buffer, strlen(buffer));
sleep(1);
}
}
void FatherRead(int rfd)
{
char buffer[1024];
while(true)
{
buffer[0] = 0;
int n = read(rfd, buffer, sizeof(buffer) - 1);
if(n > 0)
{
buffer[n] = 0;
std::cout << "child say" << buffer << std::endl;
}
}
}
(3)写关, 继续读
read读到返回值为0, 表示文件结尾
void ChildWrite(int wfd)
{
char buffer[1024];
int cnt = 0;
for (int i = 0; i < 5; ++i)
{
snprintf(buffer, sizeof(buffer), "I am child, pid: %d, cnt: %d", getpid(), cnt++);
write(wfd, buffer, strlen(buffer));
sleep(1);
}
// 关闭写端
close(wfd);
}
void FatherRead(int rfd)
{
char buffer[1024];
while(true)
{
buffer[0] = 0;
int n = read(rfd, buffer, sizeof(buffer) - 1);
if(n > 0)
{
buffer[n] = 0;
std::cout << "child say" << buffer << std::endl;
}
else if (n == 0)
{
std::cout << "Reached end of file (write end closed)" << std::endl;
break;
}
else
{
std::cerr << "read error" << std::endl;
break;
}
}
}
(4)读关,继续写
没有任何意义,OS不做没有意义的事,会杀掉进程
void ChildWrite(int wfd)
{
char buffer[1024];
int cnt = 0;
while(true)
{
snprintf(buffer, sizeof(buffer), "I am child, pid: %d, cnt: %d", getpid(), cnt++);
write(wfd, buffer, strlen(buffer));
sleep(1);
}
// 关闭写端
close(wfd);
}
void FatherRead(int rfd)
{
char buffer[1024];
for(int i = 0; i < 3; i++)
{
buffer[0] = 0;
int n = read(rfd, buffer, sizeof(buffer) - 1);
if(n > 0)
{
buffer[n] = 0;
std::cout << "child say" << buffer << std::endl;
}
}
close(rfd);
}
总结:
管道(Pipe)在进程间通信(IPC)中是一种常见的机制,其读写规则如下:
当没有数据可读时:
O_NONBLOCK 关闭:read
调用会阻塞,即进程暂停执行,直到有数据可读。
O_NONBLOCK 开启:read
调用返回 -1,errno
值为 EAGAIN
。
当管道满时:
O_NONBLOCK 关闭:write
调用会阻塞,直到有进程读取数据腾出空间。
O_NONBLOCK 开启:write
调用返回 -1,errno
值为 EAGAIN
。
如果所有管道写端对应的文件描述符被关闭,则 read
返回 0,表示读到文件结尾(EOF)。
如果所有管道读端对应的文件描述符被关闭,则 write
操作会产生 SIGPIPE
信号,进而可能导致 write
进程退出。
当写入的数据量不大于 PIPE_BUF
时,Linux 保证写入的原子性,即写入的数据不会与其他进程的写入操作交错。
当写入的数据量大于 PIPE_BUF
时,Linux 不再保证写入的原子性,可能会发生数据交错。
3.3 基于匿名管道-进程池
管道的容量
ProcessPool.hpp创建进程池处理任务
#ifndef __PROCESS__POOL__HPP__
#define __PROCESS__POOL__HPP__
#include <iostream>
#include <cstdlib>
#include <vector>
#include <string>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "Task.hpp"
class Channel
{
public:
Channel(int fd, pid_t id) : _wfd(fd), _subid(id)
{
_name = "channel" + std::to_string(_wfd) + "-" + std::to_string(_subid);
}
void Send(int code)
{
int n = write(_wfd, &code, sizeof(code));
//void(n);
}
~Channel() {}
int getWfd() { return _wfd; }
pid_t getSubid() { return _subid; }
std::string getName() { return _name; }
void Close()
{
close(_wfd);
}
void Wait()
{
pid_t rid = waitpid(_subid, nullptr, 0);
(void)rid;
}
private:
int _wfd;
pid_t _subid;
std::string _name;
};
class ChannelManager
{
public:
ChannelManager() :_next(0)
{}
~ChannelManager() {}
void InsertChannel(int wfd, pid_t subid)
{
_channels.emplace_back(wfd, subid);
}
Channel &Select()
{
auto &c = _channels[_next];
_next++;
_next %= _channels.size();
return c;
}
void Printchannel()
{
for(auto &channel : _channels)
{
std::cout << channel.getName() << std::endl;
}
}
void Closechannel()
{
for(int i = _channels.size() - 1; i >= 0; i--)
{
_channels[i].Close();
std::cout << "关闭:" << _channels[i].getName() << std::endl;
_channels[i].Wait();
std::cout << "回收:" << _channels[i].getName() << std::endl;
}
}
private:
std::vector<Channel> _channels;
int _next;
};
const int gdefaultnum = 5;
class ProcessPool
{
public:
ProcessPool(int num) : _process_num(num)
{
_tm.Register(PrintLog);
_tm.Register(Download);
_tm.Register(Upload);
}
~ProcessPool() {}
void Work(int rfd)
{
while(true)
{
int code = 0;
ssize_t n = read(rfd, &code, sizeof(code));
if(n > 0)
{
if(n != sizeof(code)) continue;
std::cout << "子进程[" << getpid() << "]收到一个任务码: " << code << std::endl;
}
else if(n == 0)
{
std::cout << "子进程退出" << std::endl;
break;
}
else
{
std::cout << "读取错误" << std::endl;
break;
}
}
}
bool Create()
{
for (int i = 0; i < _process_num; i++)
{
// 1.创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
if (n < 0)
return false;
// 2. 创建见子进程
pid_t subid = fork();
if (subid < 0)
return false;
else if (subid == 0)
{
// 3. 关闭写进程
// 子进程
close(pipefd[1]);
Work(pipefd[0]);
close(pipefd[0]);
exit(0);
}
else
{
// 3. 关闭读进程
// 父进程
close(pipefd[0]);
_cm.InsertChannel(pipefd[1], subid);
}
}
return true;
}
void Debug()
{
_cm.Printchannel();
}
void Run()
{
//1. 选择一个任务
int taskcode = _tm.Code();
//2. 选择一个信道,负载均衡的选择一个子进程,完成任务
Channel &c = _cm.Select();
std::cout << "选择了一个子进程:" << c.getName() << std::endl;
//2. 发送任务
c.Send(taskcode);
std::cout << "发送了一个任务码:" << taskcode << std::endl;
}
void Stop()
{
_cm.Closechannel();
}
private:
ChannelManager _cm;
int _process_num;
TaskManager _tm;
};
#endif
Task.hpp 创建进程池处理任务
#pragma once
#include <iostream>
#include <vector>
#include <ctime>
typedef void (*task_t)();
debug/
void PrintLog()
{
std::cout << "我是一个打印日志的任务" << std::endl;
}
void Download()
{
std::cout << "我是一个下载的任务" << std::endl;
}
void Upload()
{
std::cout << "我是一个上传的任务" << std::endl;
}
//
class TaskManager
{
public:
TaskManager()
{
srand(time(nullptr));
}
void Register(task_t t)
{
_tasks.push_back(t);
}
int Code()
{
return rand() % _tasks.size();
}
void Execute(int code)
{
if(code >= 0 && code < _tasks.size())
{
_tasks[code]();
}
}
~TaskManager()
{}
private:
std::vector<task_t> _tasks;
};
Main.cc
#include "ProcessPool.hpp"
int main()
{
ProcessPool pp(gdefaultnum);
pp.Create();
//pp.Debug();
//int task = 0;
int cnt = 10;
while(cnt--)
{
std::cout << cnt << std::endl;
pp.Run();
sleep(1);
}
pp.Stop();
return 0;
}
3.4 管道特点
(1)管道 只能用于具有共同祖先的进程(即具有亲缘关系的进程)之间的通信。通常,一个管道由一个进程创建,然后该进程调用 fork,此后父、子进程之间就可以通过该管道进行通信。
(2)管道提供 流式服务,数据按顺序传输,适用于字节流通信。
(3)管道的 生命周期随进程结束,通常进程退出后,管道会被释放。
(4)内核会对管道操作进行同步与互斥,保证数据读写的正确性。例如,多个进程写入时,写入不大于 PIPE_BUF
的数据会保持原子性。
(5)管道是 半双工的,即数据只能单向流动。如果需要双向通信,需要建立两个管道,分别用于两个方向的数据传输。
4. 命名管道
4.1 mkfifo
mkfifo 用于创建 命名管道(FIFO,First In First Out)。与普通管道(匿名管道)不同,命名管道可以用于 不具有亲缘关系的进程 之间的通信,因为它存在于文件系统中,可以由多个进程打开进行读写。
4.2 实现进程间通信
//comm.cpp
#pragma once
#include <iostream>
#include <string>
#define PATH "."
#define FIFONAME "fifo"
#define PATH "."
#define FILENAME "fifo"
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while (0)
class NameFifo
{
public:
NameFifo(const std::string &path, const std::string &name)
: _path(path), _name(name)
{
_fifoname = _path + "/" + _name;
int n = mkfifo(_fifoname.c_str(), 0666);
if(n == - 1)
{
std::cerr << "mkfifo error" << std::endl;
}
std::cout << "mkfifo success" << std::endl;
}
~NameFifo()
{
int n = unlink(_fifoname.c_str());
if(n == 0)
{
std::cout << "remove FIFO_FILE success" << std::endl;
}
else{
std::cerr << "remove FIFO_FILE failed" << std::endl;
}
}
private:
std::string _path;
std::string _name;
std::string _fifoname;
};
class Fileoper
{
public:
Fileoper(const std::string &path, const std::string &name)
: _path(path), _name(name), _fd(-1)
{
_fifoname = _path + "/" + _name;
}
void OpenForRead()
{
//打开,write方法中没有执行open的时候,就要在open内部进行阻塞
_fd = open(_fifoname.c_str(), O_RDONLY);
if(_fd < 0)
{
std::cerr << "open error" << std:: endl;
return;
}
std::cout << "open success" << std::endl;
}
void OpenForWrite()
{
_fd = open(_fifoname.c_str(), O_WRONLY);
if(_fd < 0)
{
std::cerr << "open fifo cerr" << std::endl;
return;
}
std::cout << "open fifo success" << std::endl;
}
void Write()
{
std::string message;
int cnt = 1;
pid_t id = getpid();
while(true)
{
std::cout << "Please Enter# ";
std::getline(std::cin, message);
message += (", message number: " + std::to_string(cnt++) + ", [" + std::to_string(id) + "]");
write(_fd, message.c_str(), message.size());
}
}
void Read()
{
while(true)
{
char buffer[1024];
int number = read(_fd, buffer, sizeof(buffer) - 1);
if(number > 0)
{
buffer[number] = 0;
std::cout << "Client say# " << buffer << std::endl;
}
else if(number == 0)
{
std::cout << "client quit" << number <<std::endl;
break;
}
else
{
std::cerr << "client error" << std::endl;
break;
}
}
}
void Close()
{
if(_fd > 0) close(_fd);
}
~Fileoper(){}
private:
std::string _path;
std::string _name;
std::string _fifoname;
int _fd;
};
//server.cc
#include <iostream>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include "comm.hpp"
int main()
{
//创建管道文件
NameFifo fifo(PATH, FIFONAME);
//文件操作
Fileoper readfile(PATH, FIFONAME);
readfile.OpenForRead();
readfile.Read();
readfile.Close();
return 0;
}
//client.cc
#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include "comm.hpp"
int main()
{
Fileoper writefifo(PATH, FIFONAME);
writefifo.OpenForWrite();
writefifo.Write();
writefifo.Close();
return 0;
}
//Makefile
.PHONY:all
all:client server
client::client.cc
g++ -o $@ $^ -std=c++11
server:server.cc
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm client server