目录
一、进程通信介绍
1.目的
2.发展
3.进程通信是什么,怎么通信?
二、管道
1.介绍
2.匿名管道
1.单向通信管道原理
2.代码实现
3.管道特征
4.管道的四种情况
5.管道的应用场景
使用管道实现一个简易版本的进程池
3.命名管道
1.思考
2.创建一个命名管道
3.匿名管道与命名管道的区别
4.命名管道的打开规则
4.日志
日志等级:
日式时间相关函数
日志代码实现
5.总结
三、system V共享内存
1.原理
2.代码书写
1.相关函数
1.shmget
返回值:
key:
size:
shmfig:
2.shmat
3.shmdt
4.shmctl
2.代码
3.共享内存特性
一、进程通信介绍
1.目的
- 数据传输:一个进程需要将它的数据发送给另一个进程
- 资源共享:多个进程之间共享同样的资源。
- 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止 时要通知父进程)。
- 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
2.发展
- 管道
- 匿名管道pipe
- 命名管道
简单经典的通信使用的一种方式
- System V IPC
- System V 消息队列
- System V 共享内存
- System V 信号量
单独设计了一套接口,与文件无关。 只能本地使用,本地通信,在网络阶段,有很多替代方案。
- POSIX IPC
- 消息队列
- 共享内存
- 信号量
- 互斥量
- 条件变量
- 读写锁
网络和多线程时使用
3.进程通信是什么,怎么通信?
1.是什么?
两个或多个进程实现数据层面的交互。
因为进程独立性的存在,进程通信的成本较高 -> 进程通信是有成本的
2.怎么办?
- 进程间通信的本质:必须让不同的进程看到同一份"资源"
- "资源"?:特定形式的内存空间
- 这个"资源"谁提供?一般是操作系统
- 为什么不是我们两个进程中的一个呢?假设一个进程提供,这个资源属于谁?这个进程独有,破环进程独立性。来自第三方空间
- 我们进程访问这个空间,进行通信,本质就是访问操作系统!进程代表的就是用户,"资源"从创建,使用,释放 --- 出自系统调用接口! --- 1.从底层设计,从接口设计,都要由操作系统独立设计;2.一般操作系统,会有一个独立的通信模块 -- 隶属于文件系统 -- IPC通信模块定制标准 -- 进程间通信是有标准的 -- 就是上述的system V(本机内部) 和 POSIX(网络通信)
二、管道
1.介绍
基于文件级别的进程通信方式
- 管道是Unix中最古老的进程间通信的形式。
- 我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”
2.匿名管道
#include <unistd.h>
功能:创建一无名管道
原型
int pipe(int fd[2]);
参数
fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端
返回值:成功返回0,失败返回错误代码
1.单向通信管道原理
2.代码实现
创建管道函数 pipe
#include <iostream>
#include <string>
#include <cstdlib> //stdlib.h
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
using namespace std;
#define N 2
#define NUM 1024
//child
void Writer(const int& wfd)
{
string str = "hello, i am child!";
pid_t self = getpid();
int number = 0;
char buffer[NUM];
while(true)
{
//构建发送字符串
buffer[0] = 0; //字符串清空,只是为了提醒阅读代码的人,把这个字符数组当作字符串了。
snprintf(buffer, sizeof(buffer), "%s-%d-%d", str.c_str(), self, number++);
//发送/写入给父进程
write(wfd, buffer, strlen(buffer));
sleep(1);
}
}
//father
void Reader(const int& rfd)
{
char buffer[NUM];
while(true)
{
ssize_t n = read(rfd, buffer, sizeof(buffer));
if(n > 0)
{
buffer[n] = 0; // 0 == '\0'
cout << "father get some message[" << getpid() << "]:" << buffer << endl;
}
}
}
int main()
{
int pipefd[N] = {0}; //输出型参数
int n = pipe(pipefd); //申请管道
if(n < 0)
return -1;
// cout << "pipefd[0]:" << pipefd[0] << ",pipefd[1]" << pipefd[1] << endl;
// father -> r ; child -> w;
pid_t id = fork(); //创建子进程
if(id < 0)
return 2;
if(id == 0)
{
//child
close(pipefd[0]);
//IPC code
Writer(pipefd[1]);
close(pipefd[1]);
exit(0);
}
//father
close(pipefd[1]);
// IPC code
Reader(pipefd[0]);
close(pipefd[0]);
return 0;
}
3.管道特征
- 具有血缘关系的进程会进行进程间通信
- 管道只能单向通信
- 父子进程是会进程协同的,同步和互斥的 --- 保护管道文件的数据安全
- 管道是面向字节流的。
- 管道是基于文件的,而文件的生命周期是跟随进程的
4.管道的四种情况
- 读写端正常,管道如果为空,读端就要阻塞
- 读写端正常,管道如果被写满,写端就要阻塞
- 读端正常读,写端关闭,读端就会读到0,表面读到了文件(pipe)结尾,不会被阻塞
- 读端关闭,写端正常写,操作系统就要杀掉正在写入的进程。如何杀掉--通过信号杀掉
5.管道的应用场景
使用管道实现一个简易版本的进程池
原由:创建进程需要调用fork函数,而fork函数这个系统调用是有成本的!
"Task.hpp"
#pragma once
#include <iostream>
#include <vector>
using namespace std;
//函数指针
typedef void (*task_t)();
void task1()
{
std::cout << "lol : 刷新野怪" << std::endl;
}
void task2()
{
std::cout << "lol : 刷新蓝条" << std::endl;
}
void task3()
{
std::cout << "lol : 刷新血量" << std::endl;
}
void task4()
{
std::cout << "lol : 更新系统" << std::endl;
}
void LoadTasks(std::vector<task_t> *tasks)
{
tasks->push_back(task1);
tasks->push_back(task2);
tasks->push_back(task3);
tasks->push_back(task4);
}
"ProcessPool.cc"
#include "Task.hpp"
#include <string>
#include <vector>
#include <cstdlib>
#include <ctime>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/wait.h>
const int ProcessNum = 10;
std::vector<task_t> tasks;
//先描述 -- 管道
struct channel
{
int _cmdfd; //发送任务的文件描述符
pid_t _slaverid; //子进程的pid
std::string _processname; //子进程的名字,方便我们打印日志
channel(int cmdfd, pid_t slaverid, const std::string &processname)
: _cmdfd(cmdfd)
, _slaverid(slaverid)
, _processname(processname)
{}
};
void slaver()
{
while(true)
{
int cmdcode = 0;
int n = read(0, &cmdcode, sizeof(int));
if(n == sizeof(int))
{
std::cout << " child get a command: " << getpid() << " cmdcode: " << cmdcode << std::endl;
if(cmdcode >= 0 && cmdcode < tasks.size()) tasks[cmdcode]();
}
if(!n)
break;
}
}
void InitProcessPool(std::vector<channel> *channels)
{
std::vector<int> oldfd;
// 1.初始化 --- bug
for (size_t i = 0; i < ProcessNum; ++i)
{
int pipefd[2]; //临时空间
int n = pipe(pipefd); //
if(n != 0)
{
perror("pipe create file");
return;
}
pid_t id = fork();
if(id == 0)
{
std::cout << "child " << getpid() << " close history fd :";
for (auto &e : oldfd)
{
std::cout << e << " ";
close(e);
}
std::cout << std::endl;
// child
close(pipefd[1]);
dup2(pipefd[0], 0);
close(pipefd[0]);
slaver();
std::cout << "process " << getpid() << ":quit!" << std::endl;
exit(0);
}
//father
close(pipefd[0]);
//开始添加channel字段
std::string name = "process-" + std::to_string(i);
channels->push_back(channel(pipefd[1], id, name));
oldfd.push_back(pipefd[1]);
sleep(1);
}
}
void Debug(const std::vector<channel> &channels)
{
// test
for(const auto& e:channels)
{
std::cout << "pid: " << getpid() << " " << e._cmdfd << " " << e._slaverid << " " << e._processname << std::endl;
}
}
void Menu()
{
std::cout << "############################################" << std::endl;
std::cout << "########1.刷新野怪 2.刷新蓝条 #############" << std::endl;
std::cout << "########3.刷新血量 4.更新系统 0.退出######" << std::endl;
std::cout << "############################################" << std::endl;
}
void ctrlSlaver(const std::vector<channel> &channels)
{
srand(time(0));
int which = 0;
//int cnt = 0;
while (true)
{
Menu();
std::cout << "Please enter@:";
int n;
std::cin >> n;
if(n <= 0 || n >= 5)
break;
// 1.选择任务
// int cmdcode = rand() % tasks.size();
int cmdcode = n - 1;
// 2.选择子进程
// int processpos = rand() % channels.size(); //随机方法
std::cout << "father say:"
<< "cmdcode: " << cmdcode
<< " already sendto " << channels[which]._slaverid
<< " processname: " << channels[which]._processname
<< endl;
// 3.发送任务
write(channels[which]._cmdfd, &cmdcode, sizeof(cmdcode));
++which;
which %= channels.size(); //轮转法
sleep(1);
}
}
void QuitProcess(const std::vector<channel>& channels)
{
for(const auto& e: channels)
{
close(e._cmdfd);
waitpid(e._slaverid, nullptr, 0);
}
//version1
// for (int i = channels.size() - 1; i >= 0; --i)
// {
// close(channels[i]._cmdfd);
// waitpid(channels[i]._slaverid, nullptr, 0);
// }
//sleep(5);
// 有bug
// for (const auto &e : channels)
// close(e._cmdfd);
// sleep(5);
// for (const auto &e : channels)
// waitpid(e._slaverid, nullptr, 0);
// sleep(5);
}
int main()
{
//再组织
//将对子进程结构的增删查改转化为对数据结构vector的增删查改
std::vector<channel> channels;
// 1.初始化
LoadTasks(&tasks);
InitProcessPool(&channels);
//test
Debug(channels);
// 2.开始控制子进程
ctrlSlaver(channels);
// 3.清理收尾
QuitProcess(channels);
return 0;
}
结果
注:
这是我们原先创建子进程的代码,但是这份代码会造成一个问题,就是子进程会继承父进程对上一个子进程管道读端。
void InitProcessPool(std::vector<channel> *channels)
{
// 1.初始化 --- bug
for (size_t i = 0; i < ProcessNum; ++i)
{
int pipefd[2]; //临时空间
int n = pipe(pipefd); //
if(n != 0)
{
perror("pipe create file");
return;
}
pid_t id = fork();
if(id == 0)
{
// child
close(pipefd[1]);
dup2(pipefd[0], 0);
close(pipefd[0]);
slaver();
std::cout << "process " << getpid() << ":quit!" << std::endl;
exit(0);
}
//father
close(pipefd[0]);
//开始添加channel字段
std::string name = "process-" + std::to_string(i);
channels->push_back(channel(pipefd[1], id, name));
}
}
而解决办法是,把上一个写端记录下来,在创建子进程时,顺便把子进程的继承自父进程的写端给close掉。
3.命名管道
1.思考
我们上面使用的匿名管道只能在有共同祖先/血缘相近的进程间使用,而我们想在不同进程间进行管道通信时,应该怎么做呢?
我们可以使用FIFO文件在做这项工作,它经常被称作命名管道。
2.创建一个命名管道
1.命令行创建
mkfifo filename
2.程序中创建
int mkfifo(const char *filename,mode_t mode)
3.匿名管道与命名管道的区别
- 匿名管道由pipe函数创建并打开。
- 命名管道由mkfifo函数创建,打开用open
- FIFO(命名管道)与pipe(匿名管道)之间唯一的区别在它们创建与打开的方式不同,一但这些工作完成之后,它们具有相同的语义。
4.命名管道的打开规则
如果当前打开操作是为读而打开FIFO时
- O_NONBLOCK disable:阻塞直到有相应进程为写而打开该FIFO
- O_NONBLOCK enable:立刻返回成功
如果当前打开操作是为写而打开FIFO时
- O_NONBLOCK disable:阻塞直到有相应进程为读而打开该FIFO
- O_NONBLOCK enable:立刻返回失败,错误码为ENXIO
4.日志
此内容与管道无关,只是需在当前练习中打印日志,所以做一个笔记
日志包含:日志时间,日志等级,日志内容,文件的名称和行号
日志等级:
- Info:常规消息
- Warning:报警信息
- Error:必要严重的问题,可能需要立即处理
- Fatel:致命的
- Debug:调试
日式时间相关函数
time:打印时间戳
time_t time(time_t *t);
当前时间戳传nullptr
gettimeofday:
int gettimeofday(struct timeval *tv, struct timezone *tz /*时区*/);
struct timezone *tz:时区,缺省为nullptr即可
localtime:
struct tm *localtime(const time_t *timep);
注意:这里年是从1900年开始的,所以要加上1900.月是从0开始的,所以要加1.
日志格式可变参数部分
int vsnprintf(char *str, size_t size, const char *format, va_list ap);
日志代码实现
#pragma once
#include <iostream>
#include <string>
#include <stdarg.h>
#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#define SIZE 1024
//日志等级
#define Info 0
#define Debug 1
#define Warning 3
#define Error 4
#define Fatel 5
//打印方式
#define Screen 1
#define Onefile 2
#define Classfile 3
#define Logfile "log.txt"
class Log
{
public:
Log()
{
PrintMethod = Screen;
path = "./log/";
}
void Enable(int method)
{
PrintMethod = method;
}
std::string levelToString(int level)
{
switch(level)
{
case Info:
return "Info";
case Debug:
return "Debug";
case Warning:
return "Warning";
case Error:
return "Error";
case Fatel:
return "Fatel";
default:
return "None";
}
}
//日志函数
// void logmessage(int level, const char* format, ...)
// {
// time_t t = time(nullptr);
// struct tm *ctime = localtime(&t);
// char leftbuffer[SIZE];
// snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(),
// ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday,
// ctime->tm_hour, ctime->tm_min, ctime->tm_sec);
// va_list s;
// va_start(s, format);
// char rightbuffer[SIZE];
// vsnprintf(rightbuffer, sizeof(rightbuffer), format, s);
// va_end(s);
// //格式:默认部分+自定义部分
// char logtxt[SIZE];
// snprintf(logtxt, sizeof(logtxt), "%s %s\n", leftbuffer, rightbuffer);
// //printf("%s", logtxt);
// printlog(level, logtxt);
// }
void printlog(int level, const std::string &logtxt)
{
switch(PrintMethod)
{
case Screen:
std::cout << logtxt;
break;
case Onefile:
PrintOneFile(Logfile, logtxt);
break;
case Classfile:
PrintClassFile(level, logtxt);
break;
default:
break;
}
}
void PrintOneFile(const std::string &filename, const std::string &logtxt)
{
std::string logname = path + filename;
int fd = open(logname.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0666);
if(fd < 0)
return;
write(fd, logtxt.c_str(), logtxt.size());
close(fd);
}
void PrintClassFile(int level, const std::string &logtxt)
{
std::string filename = Logfile;
filename += ".";
filename += levelToString(level);
PrintOneFile(filename, logtxt);
}
void operator()(int level, const char* format, ...)
{
time_t t = time(nullptr);
struct tm *ctime = localtime(&t);
char leftbuffer[SIZE];
snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(),
ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday,
ctime->tm_hour, ctime->tm_min, ctime->tm_sec);
va_list s;
va_start(s, format);
char rightbuffer[SIZE];
vsnprintf(rightbuffer, sizeof(rightbuffer), format, s);
va_end(s);
//格式:默认部分+自定义部分
char logtxt[SIZE];
snprintf(logtxt, sizeof(logtxt), "%s %s\n", leftbuffer, rightbuffer);
//printf("%s", logtxt);
printlog(level, logtxt);
}
private:
int PrintMethod;
std::string path;
};
5.总结
总的来说,匿名管道和命名管道都是面向字节流的,会进行同步和互斥,生命周期随进程,使用时需要打开文件,单向通行。让不同进程看到同一份资源 -- 文件。
三、system V共享内存
1.原理
共享内存是在物理地址空间上申请的,通过页表挂接到不同进程程序地址空间的一种通信方式。
那么这块物理内存是进程申请的还是操作系统来申请的呢?
答案是操作系统,因为进程具有独立性,进程申请的资源归进程所有。
ipcs -m :查看所有的共享内存
共享内存的生命周期是跟随的内核的,用户不主动释放,共享内存会一直存在,除非内核关闭(用户释放)。
ipcrm -m shmid :删除shmid对应的共享内存
2.代码书写
1.相关函数
1.shmget
申请一块共享内存
int shmget(key_t key, size_t size, int shmflg);
返回值:
共享内存标识符
key:
1.key是一个数字,这个数字是几不重要。关键在于它必须在内核中具有唯一性,能够让不同的进程进行唯一性标识
2.第一个进程可以通过key创建共享内存,第二个之后的进程,只要拿着同一个key,就可和第一个进程看到同一个共享内存!
3.对于一个已经创建好的共享内存,key在哪?key在共享内存的描述对象中!
4.第一次创建的时候,必须有一个key了,怎么有?
ftok - convert a pathname(路径) and a project identifier(项目id) to a System V IPC key
key_t ftok(const char *pathname, int proj_id);
ftok是一套算法,将路径字符串和整形id进行了数值计算。
5.key -- 类似 -- 路径 -- 唯一性
size:
创建共享内存的大小,单位是字节
shmfig:
如何创建,获取 。。
IPC_CREAT | 单独使用,如果你申请的共享内存不存在,就创建,存在,就获取并返回 |
IPC_CREAT | IPC_EXCL | 如果你申请的共享内存不存在,就创建,存在,就出错返回。确保我们如果申请成功了一个共享内存,这个共享内存一定是一个新的 |
IPC_EXCL | 不单独使用 |
注:
key与shmid
key:操作系统内标定唯一性。
shmid:只在你的进程内,用来表示资源的唯一性。
2.shmat
void *shmat(int shmid, const void *shmaddr, int shmflg);
将申请的共享内存挂接到进程的虚拟地址空间
3.shmdt
int shmdt(const void *shmaddr);
取消挂接到进程的虚拟地址空间的共享内存
4.shmctl
int shmctl(int shmid, int cmd, struct shmid_ds *buf);
cmd:
IPC_STAT: 获取
IPC_RMID:删除
2.代码
comm.hpp
#ifndef __COMM_HPP__
#define __COMM_HPP__
#include <iostream>
#include <string>
#include <cstring>
#include <cstdlib>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/types.h>
#include "log.hpp"
using namespace std;
class comm
{
public:
comm()
{
//log.Enable(Classfile);
}
//获取key
key_t Getkey()
{
key_t k = ftok(pathname.c_str(), proj_id);
if(k < 0)
{
log(Fatel, "ftok error string: %s, ftok error code: %d", strerror(errno), errno);
exit(1);
}
log(Info, "ftok success, key is : 0x%x", k);
return k;
}
int GetShareMemHelper(int flag)
{
int shmid = shmget(Getkey(), size, flag);
if(shmid < 0)
{
log(Fatel, "create share memory error string: %s, error code: %d", strerror(errno), errno);
exit(1);
}
log(Info, "create share memory success, shmid: %d", shmid);
return shmid;
}
int CreateShm()
{
return GetShareMemHelper(IPC_CREAT | IPC_EXCL | 0666);
}
int GetShm()
{
return GetShareMemHelper(IPC_CREAT);
}
private:
Log log;
const string pathname = "/home/shen";
const int proj_id = 0x6666;
//共享内存大小一般是4096的整数倍, 如果我们申请4097的话,操作系统实际给出4096*2的大小
const int size = 4096;
};
#endif
processa.cc
#include "comm.hpp"
Log log;
int main()
{
comm co;
int shmid = co.CreateShm();
log(Debug, "create shm done");
char *straddr = (char*)shmat(shmid, nullptr, 0);
log(Debug, "attach shm done");
while(true)
{
cout << "client say#:" << straddr << endl; //直接访问共享内存
sleep(1);
}
shmdt(straddr);
log(Debug, "detach shm done");
shmctl(shmid, IPC_RMID, nullptr);
log(Debug, "delete shm done");
return 0;
}
processb.cc
#include "comm.hpp"
Log log;
int main()
{
comm co;
int shmid = co.GetShm();
log(Debug, "Get shm done");
char *straddr = (char*)shmat(shmid, nullptr, 0);
log(Debug, "attach shm done");
while(true)
{
cout << "Please Enter@ ";
fgets(straddr, 4096, stdin);
}
shmdt(straddr);
log(Debug, "detach shm done");
return 0;
}
3.共享内存特性
- 共享内存没有同步互斥之类的保护机制
- 共享内存是所有的进程间通信中,速度是最快的! --- 原因:拷贝次数少
- 共享内存内部的数据,由用户自己维护!
四、system V补充
1.消息队列
- 消息队列提供了一个从一个进程向另外一个进程发送一块数据的方法
- 每个数据块都被认为是有一个类型,接收者进程接收的数据块可以有不同的类型值
- 特性方面: IPC资源必须删除,否则不会自动清除,除非重启,所以system V IPC资源的生命周期随内核
相关函数
int msgget(key_t key, int msgflg);
成功返回一个消息队列标识符,失败返回-1。key通过ftok获取,msgflg可用IPC_STAT和IPC_EXCL做参
// 起始地址 大小
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,
int msgflg);
ipcs -q:查询消息队列
ipcrm -q msgid :删除msgid这个消息队列
2.信号量
int semget(key_t key, int nsems, int semflg);
int semctl(int semid, int semnum, int cmd, ...);