linux下进程通信的方式有很多,共享内存,消息队列,管道等。共享内存可以传输大量数据,但是多个进程同时读取共享内存就会出现脏读,可以借助消息队列实现多进程消息发送和接收。这种组合方式在实际开发中应用还是很多的,接下来就看一下。
目录
1.共享内存操作api
(1)创建共享内存
(2)挂载共享内存到当前进程
(3)取消挂载
(4) 共享内存控制函数-可以删除
2.消息队列操作api
(1)创建消息附列
(2)往消息队列中发送消息
(3)从消息队列取消息
(4)操作消息队列,删除,查看等
3.命令行操作
(1)共享内存查看和删除
(2)消息队列查看和删除
(3)信号量查看和删除
4.生产者进程和消费者进程互相通信实践
4.1 代码
4.2 运行效果
1.共享内存操作api
(1)创建共享内存
int shmget(key_t key, size_t size, int shmflg);
(2)挂载共享内存到当前进程
void *shmat(int shmid, const void *shmaddr, int shmflg);
(3)取消挂载
int shmdt(const void *shmaddr);
(4) 共享内存控制函数-可以删除
int shmctl(int shmid, int cmd, struct shmid_ds *buf);
2.消息队列操作api
(1)创建消息附列
int msgget(key_t key, int flags);
(2)往消息队列中发送消息
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
(3)从消息队列取消息
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
(4)操作消息队列,删除,查看等
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
3.命令行操作
(1)共享内存查看和删除
ipcs -m / ipcrm -m
(2)消息队列查看和删除
ipcs -q / ipcrm -q
(3)信号量查看和删除
ipcs -s / ipcrm -s
4.生产者进程和消费者进程互相通信实践
接下来通过一个共享内存和两条消息队列来实现进程间互发数据,本在ubuntu环境下使用vscode开发
4.1 代码
消息定义: msgInfo.h
#pragma once
#include <time.h>
#define gAvailableMsgBufPos 100
#define gOccupiedMsgBufPos 101
//共享内存结构
struct TSharedMemoryModule
{
char name[40+1];
int id;
char reloadCommand[80+1];
int index;
long sizeOfUserSpace;
unsigned char *removed_puserSpace;
int users;
int newCreated;
int removed_writingLocks;
int removed_readingLocks;
int resID;
};
typedef TSharedMemoryModule *PTSharedMemoryModule;
//消息队列管理结构
struct TMsgBufHDL
{
long maxSizeOfMsg;
int maxNumOfMsg;
int maxStayTime;
int userID;
int queueIDOfFreePos;
int queueIDOfOccupiedPos;
};
typedef TMsgBufHDL *PTMsgBufHDL;
//消息头
struct TMessageHeader
{
long type;
int provider;
time_t time;
int len;
long msgIndex;
int dealer;
int forward_cnt;
short locked;
};
typedef TMessageHeader *PTMessageHeader;
// 空闲消息队列元素结构
struct TFreePosOfMsgBuf
{
long statusOfPos;
char indexOfPos[4];
};
// 任务消息队列结构
struct TOccupiedPosOfMsgBuf
{
long typeOfMsg;
char indexOfPos[4];
};
收据收发头文件:msgExchange.h
#pragma once
#include "msgInfo.h"
/*
================ 消息队列 ====================
// 创建共享内存
int shmget(key_t key, size_t size, int shmflg);
// 挂载
void *shmat(int shmid, const void *shmaddr, int shmflg);
// 取消挂载
int shmdt(const void *shmaddr);
// 共享内存控制函数-可以删除
int shmctl(int shmid, int cmd, struct shmid_ds *buf);
*/
/*
================ 消息队列 ====================
// 创建和获取 ipc 内核对象
int msgget(key_t key, int flags);
// 将消息发送到消息队列
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
// 从消息队列获取消息
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
// 查看、设置、删除 ipc 内核对象(用法和 shmctl 一样)
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
消息数据格式
struct Msg{
long type; // 消息类型。这个是必须的,而且值必须 > 0,这个值被系统使用
// 消息正文,多少字节随你而定
};
*/
// 初始化共享内存和消息队列
int InitSharedMemoryMsgBuf();
// 销毁共享内存和消息队列
void UnInitSharedMemoryMsgBuf();
//创建消息队列 - 调用创建共享内存
int CreateMsgBuf();
//创建共享内存
PTSharedMemoryModule CreateSharedMemoryModule(long sizeOfUserSpace);
//初始化消息队列
int InitAllMsgIndex();
//从空闲消息队列取位置
int GetAvailableMsgBufPos();
// 空闲消息队列放入消息
int FreeMsgBufPos(int index);
// 任务消息队列放入消息
int OccupyMsgBufPos(int index, long typeOfMsg);
// 任务消息队列取消息
int GetOccupiedMsgBufPos(long *typeOfMsg);
// 从任务消息队列获取指定类型的消息
int GetOccupiedMsgBufPosOfMsgType(long typeOfMsg);
// 发送消息
int WriteBufferMessage(const unsigned char * msg,
int lenOfMsg,
long typeOfMsg,
const TMessageHeader* poriHeader,
PTMessageHeader pNewHeader);
// 用指定位置发消息
int WriteBufferMessageUsingIndex(const unsigned char * msg,
int lenOfMsg, long typeOfMsg,
const TMessageHeader* poriHeader,
PTMessageHeader pNewHeader,
const int *piIndexOfMsg);
//接收消息 - 释放索引
int ReadBufferMessagReleaseIndex(unsigned char * msg, int lenOfMsgBuf,
long typeOfMsg,
PTMessageHeader pOutputHeader);
//接收消息 - 不释放索引
int ReadBufferMessageNotReleaseIndex(unsigned char *msg, int lenOfMsgBuf,
long typeOfMsg,
PTMessageHeader pOutputHeader,
int *indexOfMsg, int iSigRetryFlag);
收据收发实现:msgExchange.cpp
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/shm.h>
#include <string.h>
#include <errno.h>
#include <sys/types.h>
#include <unistd.h>
#include <iostream>
#include "msgExchange.h"
// ipcs -m / ipcrm -m
// ipcs -q / ipcrm -q
// ipcs -s / ipcrm -s
// 以下三部分都是从共享内存申请
static PTSharedMemoryModule s_pSharedMemMDL = NULL;
static PTMsgBufHDL s_pMsgBufHDL = NULL;
static unsigned char *s_pMsgBuf = NULL;
//共享内存,消息队列初始化信息
static TSharedMemoryModule s_sharedMemoryModule;
static TMsgBufHDL s_msgBufHDL;
static unsigned long gMsgIndex = 1;
#if defined(__GNUC__)
static unsigned long addAndFetchUlongAtomic(volatile unsigned long *p_var, unsigned long value) {
return __sync_add_and_fetch(p_var, value);
}
#endif
// 初始化共享内存和消息队列
int InitSharedMemoryMsgBuf()
{
s_sharedMemoryModule.id = 1234567;
s_sharedMemoryModule.newCreated = 1;
s_sharedMemoryModule.users = 0;
s_msgBufHDL.maxSizeOfMsg = 8192;
s_msgBufHDL.maxNumOfMsg = 1024;
CreateMsgBuf();
return 0;
}
void UnInitSharedMemoryMsgBuf() {
struct shmid_ds buf;
shmdt(s_pSharedMemMDL);
shmctl(s_sharedMemoryModule.resID, IPC_RMID, &buf);
msgctl(s_msgBufHDL.queueIDOfOccupiedPos, IPC_RMID, NULL);
msgctl(s_msgBufHDL.queueIDOfFreePos, IPC_RMID, NULL);
}
//创建共享内存
PTSharedMemoryModule CreateSharedMemoryModule(long sizeOfUserSpace)
{
PTSharedMemoryModule pSharedMem = nullptr;
int ret;
int resID;
unsigned char *pmUserSpace = nullptr;
if ((resID = shmget(s_sharedMemoryModule.id, sizeof(TSharedMemoryModule) + sizeOfUserSpace, 0660)) == -1) {
if ((resID = shmget(s_sharedMemoryModule.id, sizeof(TSharedMemoryModule) + sizeOfUserSpace, IPC_CREAT|0660)) == -1) {
return nullptr;
}
s_sharedMemoryModule.newCreated = 1;
}
else {
s_sharedMemoryModule.newCreated = 0;
}
std::cout << "shared memory id is :: " << resID << std::endl;
s_sharedMemoryModule.resID = resID;
if (((pSharedMem = (PTSharedMemoryModule)shmat(resID, 0, SHM_RND)) == NULL) || (pSharedMem == (void*)(-1))) {
if (errno == 24) {
pSharedMem = (PTSharedMemoryModule)shmat(resID, 0, SHM_RND);
}
if ((pSharedMem == NULL) || (pSharedMem == (void*)(-1))) {
return nullptr;
}
}
if (!s_sharedMemoryModule.newCreated) {
++(s_sharedMemoryModule.users);
s_sharedMemoryModule.newCreated = 0;
pSharedMem->newCreated = s_sharedMemoryModule.newCreated;
}
else {
memcpy(pSharedMem, &s_sharedMemoryModule, sizeof(s_sharedMemoryModule));
pSharedMem->users = 1;
pSharedMem->removed_readingLocks = 0;
pSharedMem->removed_writingLocks = 0;
pSharedMem->sizeOfUserSpace = sizeOfUserSpace;
}
if (sizeOfUserSpace == 0) {
pmUserSpace = nullptr;
}
else {
pmUserSpace = (unsigned char *)((unsigned char *)pSharedMem + sizeof(*pSharedMem));
}
// 清空消息内容
if ((pSharedMem->newCreated) && (pmUserSpace)) {
memset(pmUserSpace, 0, sizeOfUserSpace);
}
return pSharedMem;
}
//创建消息队列 - 调用创建共享内存
int CreateMsgBuf() {
TMsgBufHDL msgBufHDL;
int ret;
int newCreatedQueue;
memset(&msgBufHDL,0,sizeof(msgBufHDL));
msgBufHDL = s_msgBufHDL;
if ((s_pSharedMemMDL = CreateSharedMemoryModule(
sizeof(*s_pMsgBufHDL) +
((sizeof(TMessageHeader) + sizeof(unsigned char) * msgBufHDL.maxSizeOfMsg) * (msgBufHDL.maxNumOfMsg + 1)))) == NULL)
{
return -1;
}
if ((s_pMsgBufHDL = (PTMsgBufHDL)((unsigned char *)s_pSharedMemMDL + sizeof(TSharedMemoryModule))) == NULL)
{
return -1;
}
s_pMsgBuf = (unsigned char *)s_pMsgBufHDL + sizeof(*s_pMsgBufHDL);
if (s_pSharedMemMDL->newCreated) {
memcpy(s_pMsgBufHDL, &msgBufHDL, sizeof(*s_pMsgBufHDL));
}
s_pMsgBufHDL->userID = s_pSharedMemMDL->id;
if ((s_pMsgBufHDL->queueIDOfFreePos = msgget(s_pMsgBufHDL->userID, 0660)) == -1)
{
if ((s_pMsgBufHDL->queueIDOfFreePos = msgget(s_pMsgBufHDL->userID, 0660 | IPC_CREAT)) == -1)
{
return -1;
}
else {
newCreatedQueue = 1;
}
}
std::cout << "free msg queue id is :: " << s_pMsgBufHDL->queueIDOfFreePos << std::endl;
s_msgBufHDL.queueIDOfFreePos = s_pMsgBufHDL->queueIDOfFreePos;
if ((s_pMsgBufHDL->queueIDOfOccupiedPos = msgget(s_pMsgBufHDL->userID + 1,0660)) == -1)
{
if ((s_pMsgBufHDL->queueIDOfOccupiedPos = msgget(s_pMsgBufHDL->userID + 1,0660 | IPC_CREAT)) == -1)
{
return -1;
}
else {
newCreatedQueue = 1;
}
}
std::cout << "occupied msg queue id is :: " << s_pMsgBufHDL->queueIDOfOccupiedPos << std::endl;
s_msgBufHDL.queueIDOfOccupiedPos = s_pMsgBufHDL->queueIDOfOccupiedPos;
if (newCreatedQueue) {
InitAllMsgIndex();
}
return 0;
}
//初始化消息队列
int InitAllMsgIndex() {
TFreePosOfMsgBuf freePos;
TOccupiedPosOfMsgBuf occupiedPos;
PTMessageHeader pheader;
int ret;
int index;
while(true) {
if ((ret = msgrcv(s_pMsgBufHDL->queueIDOfFreePos, &freePos, sizeof(freePos.indexOfPos), 0, IPC_NOWAIT)) < 0) {
break;
}
}
while(true) {
if ((ret = msgrcv(s_pMsgBufHDL->queueIDOfOccupiedPos, &occupiedPos, sizeof(occupiedPos.indexOfPos), 0, IPC_NOWAIT)) < 0) {
break;
}
}
// 位置消息队列中放入共享内存索引编号
for (index = 0; index < s_pMsgBufHDL->maxNumOfMsg; index++)
{
FreeMsgBufPos(index);
}
return(0);
}
//从空闲消息队列取位置
int GetAvailableMsgBufPos()
{
TFreePosOfMsgBuf posOfMsgBuf;
int ret;
int index;
memset(posOfMsgBuf.indexOfPos, 0, sizeof(posOfMsgBuf.indexOfPos));
if ((ret = msgrcv(s_pMsgBufHDL->queueIDOfFreePos, &posOfMsgBuf, sizeof(int), 0, 0)) < 0) {
return -1;
}
memcpy(&index, posOfMsgBuf.indexOfPos, sizeof(int));
return index;
}
// 空闲消息队列放入消息
int FreeMsgBufPos(int index)
{
TFreePosOfMsgBuf posOfMsgBuf;
int ret;
PTMessageHeader pheader;
if ((index < 0) || (index >= s_pMsgBufHDL->maxNumOfMsg)) {
return -1;
}
pheader = (PTMessageHeader)(s_pMsgBuf + index * (sizeof(TMessageHeader) + s_pMsgBufHDL->maxSizeOfMsg));
pheader->type = 0;
posOfMsgBuf.statusOfPos = gAvailableMsgBufPos;
memcpy(posOfMsgBuf.indexOfPos, &index, sizeof(int));
if (msgsnd(s_pMsgBufHDL->queueIDOfFreePos, &posOfMsgBuf, sizeof(int), 0) < 0) {
return -1;
}
return 0;
}
// 任务消息队列放入消息
int OccupyMsgBufPos(int index, long typeOfMsg)
{
TOccupiedPosOfMsgBuf posOfMsgBuf;
int ret;
posOfMsgBuf.typeOfMsg = typeOfMsg;
memcpy(posOfMsgBuf.indexOfPos, &index, sizeof(int));
if (ret = msgsnd(s_pMsgBufHDL->queueIDOfOccupiedPos, &posOfMsgBuf, sizeof(posOfMsgBuf.indexOfPos), 0)) {
return -1;
}
return 0;
}
// 任务消息队列取消息
int GetOccupiedMsgBufPos(long *typeOfMsg)
{
TOccupiedPosOfMsgBuf posOfMsgBuf;
int ret;
int index;
memset(posOfMsgBuf.indexOfPos, 0, sizeof(posOfMsgBuf.indexOfPos));
if ((ret = msgrcv(s_pMsgBufHDL->queueIDOfOccupiedPos, &posOfMsgBuf, sizeof(int), 0, 0)) < 0) {
return -1;
}
memcpy(&index, posOfMsgBuf.indexOfPos, sizeof(int));
*typeOfMsg = posOfMsgBuf.typeOfMsg;
return index;
}
// 从任务消息队列获取指定类型的消息
int GetOccupiedMsgBufPosOfMsgType(long typeOfMsg)
{
TOccupiedPosOfMsgBuf posOfMsgBuf;
int ret;
int index;
int iRetryTime = 0;
memset(posOfMsgBuf.indexOfPos, 0, sizeof(posOfMsgBuf.indexOfPos));
posOfMsgBuf.typeOfMsg = typeOfMsg;
do {
ret = msgrcv(s_pMsgBufHDL->queueIDOfOccupiedPos, &posOfMsgBuf, sizeof(int), typeOfMsg, 0);
iRetryTime ++;
}while(iRetryTime < 10 && 1 == ret && EINTR == errno);
if (-1 != ret) {
memcpy(&index, posOfMsgBuf.indexOfPos, sizeof(int));
return index;
}
return -1;
}
// 发送消息
int WriteBufferMessage(const unsigned char * msg,
int lenOfMsg,
long typeOfMsg,
const TMessageHeader* poriHeader,
PTMessageHeader pNewHeader)
{
int indexOfMsg;
unsigned char *pAddr;
int ret;
PTMessageHeader pheader;
if ((indexOfMsg = GetAvailableMsgBufPos()) < 0) {
return indexOfMsg;
}
if ((indexOfMsg >= s_pMsgBufHDL->maxNumOfMsg)) {
FreeMsgBufPos(indexOfMsg);
return(-1);
}
// 得到存放消息的首地址
pAddr = s_pMsgBuf + indexOfMsg * (sizeof(TMessageHeader) + s_pMsgBufHDL->maxSizeOfMsg);
pheader = (PTMessageHeader)pAddr;
pheader->len = lenOfMsg;
if (NULL == poriHeader) {
time(&(pheader->time));
pheader->provider = getpid();
pheader->dealer = 0;
pheader->msgIndex = addAndFetchUlongAtomic(&gMsgIndex, 1);
pheader->forward_cnt = 0;
}
else
{
pheader->time = poriHeader->time;
pheader->provider = poriHeader->provider;
pheader->dealer = getpid();
pheader->msgIndex = poriHeader->msgIndex;
pheader->forward_cnt = poriHeader->forward_cnt;
}
pheader->type = typeOfMsg;
if(pNewHeader) {
memcpy(pNewHeader, pheader, sizeof(*pNewHeader));
}
memcpy(pAddr + sizeof(TMessageHeader), msg, lenOfMsg);
if ((ret = OccupyMsgBufPos(indexOfMsg, typeOfMsg)) < 0) {
FreeMsgBufPos(indexOfMsg);
return ret;
}
std::cout << "WriteBufferMessage -> indexOfMsg:: " << indexOfMsg << " typeOfMsg::" << typeOfMsg << " pheader->len::" << pheader->len << std::endl;
return 0;
}
// 用指定位置发消息
int WriteBufferMessageUsingIndex(const unsigned char * msg,
int lenOfMsg, long typeOfMsg,
const TMessageHeader* poriHeader,
PTMessageHeader pNewHeader,
const int *piIndexOfMsg)
{
int indexOfMsg = *piIndexOfMsg;
int ret;
unsigned char *pAddr;
PTMessageHeader pheader;
if ((indexOfMsg >= s_pMsgBufHDL->maxNumOfMsg)) {
return -1;
}
pAddr = s_pMsgBuf + indexOfMsg * (sizeof(TMessageHeader) + s_pMsgBufHDL->maxSizeOfMsg);
pheader = (PTMessageHeader)pAddr;
pheader->len = lenOfMsg;
if (poriHeader == NULL)
{
time(&(pheader->time));
pheader->provider = getpid();
pheader->dealer = 0;
pheader->msgIndex = addAndFetchUlongAtomic(&gMsgIndex, 1);
pheader->forward_cnt = 0;
}
else
{
pheader->time = poriHeader->time;
pheader->provider = poriHeader->provider;
pheader->dealer = getpid();
pheader->msgIndex = poriHeader->msgIndex;
pheader->forward_cnt = poriHeader->forward_cnt;
}
pheader->type = typeOfMsg;
if(pNewHeader) {
memcpy(pNewHeader, pheader, sizeof(*pNewHeader));
}
memcpy(pAddr + sizeof(TMessageHeader), msg, lenOfMsg);
if ((ret = OccupyMsgBufPos(indexOfMsg, typeOfMsg)) < 0)
{
return ret;
}
return(0);
}
//接收消息 - 释放索引
int ReadBufferMessagReleaseIndex(unsigned char * msg, int lenOfMsgBuf,
long typeOfMsg,
PTMessageHeader pOutputHeader)
{
int indexOfMsg;
unsigned char *pAddr;
int lenOfMsg;
PTMessageHeader pheader;
if (((indexOfMsg = GetOccupiedMsgBufPosOfMsgType(typeOfMsg)) < 0) || (indexOfMsg >= s_pMsgBufHDL->maxNumOfMsg))
{
return -1;
}
pAddr = s_pMsgBuf + indexOfMsg * (sizeof(TMessageHeader) + s_pMsgBufHDL->maxSizeOfMsg);
pheader = (PTMessageHeader)pAddr;
if (((lenOfMsg = pheader->len) < 0) || (lenOfMsg >= lenOfMsgBuf) || (lenOfMsg > s_pMsgBufHDL->maxSizeOfMsg))
{
return -1;
}
memcpy(msg, pAddr + sizeof(TMessageHeader), lenOfMsg);
if (pOutputHeader != NULL) {
memcpy(pOutputHeader, pheader, sizeof(*pheader));
}
FreeMsgBufPos(indexOfMsg);
std::cout << "ReadBufferMessagReleaseIndex -> indexOfMsg:: " << indexOfMsg << " typeOfMsg::" << typeOfMsg << " pheader->len::" << pheader->len << std::endl;
return(lenOfMsg);
}
//接收消息 - 不释放索引
int ReadBufferMessageNotReleaseIndex(unsigned char *msg, int lenOfMsgBuf,
long typeOfMsg,
PTMessageHeader pOutputHeader,
int *indexOfMsg, int iSigRetryFlag)
{
unsigned char *pAddr;
int lenOfMsg;
PTMessageHeader pheader;
if (((*indexOfMsg = GetOccupiedMsgBufPosOfMsgType(typeOfMsg)) < 0) || (*indexOfMsg >= s_pMsgBufHDL->maxNumOfMsg))
{
return -1;
}
pAddr = s_pMsgBuf + *indexOfMsg * (sizeof(TMessageHeader) + s_pMsgBufHDL->maxSizeOfMsg);
pheader = (PTMessageHeader)pAddr;
if (((lenOfMsg = pheader->len) < 0) || (lenOfMsg >= lenOfMsgBuf) || (lenOfMsg > s_pMsgBufHDL->maxSizeOfMsg))
{
FreeMsgBufPos(*indexOfMsg);
return -1;
}
memcpy(msg, pAddr + sizeof(TMessageHeader), lenOfMsg);
if (pOutputHeader != NULL) {
memcpy(pOutputHeader, pheader, sizeof(*pheader));
}
return lenOfMsg;
}
生产者进程:productionProcess.cpp
#include <stdio.h>
#include <sys/shm.h>
#include <string.h>
#include <iostream>
#include "msgExchange.h"
using namespace std;
int SimpleTestWrite() {
// 1. 创建共享内存
int shmid = shmget(1000, 4096, IPC_CREAT|0660);
if(shmid == -1)
{
perror("shmget error");
return -1;
}
// 2. 进程和共享内存关联
void* ptr = shmat(shmid, NULL, 0);
if(ptr == (void *) -1)
{
perror("shmat error");
return -1;
}
// 3. 写共享内存
const char* p = "hello, nice to meet you !";
memcpy(ptr, p, strlen(p)+1);
getchar();
// 4. 解除内存和共享内存关联
shmdt(ptr);
// 5. 删除共享内存
shmctl(shmid, IPC_RMID, NULL);
std::cout << "delete share memory !" << std::endl;
}
int main(int argc, char *argv[])
{
InitSharedMemoryMsgBuf();
// 发送消息
TMessageHeader newHeader;
std::string sendMsg("Hello, I am producer !");
WriteBufferMessage((unsigned char *)sendMsg.data(),
sendMsg.size(),
10101010,
nullptr,
&newHeader);
// 读消息
unsigned char recvMsg[8192] = {0};
ReadBufferMessagReleaseIndex(recvMsg, 8192,
newHeader.provider,
&newHeader);
std::cout << "recvMsg:: " << recvMsg << std::endl;
UnInitSharedMemoryMsgBuf();
return 0;
}
消费者进程:consumptionProcess.cpp
#include <sys/shm.h>
#include <string.h>
#include <stdio.h>
#include <iostream>
#include "msgExchange.h"
using namespace std;
int SimpleTestRead() {
// 1. 创建共享内存
int shmid = shmget(1000, 0, 0660);
if(shmid == -1)
{
perror("shmget error");
return -1;
}
// 2. 获取共享内存地址
void* ptr = shmat(shmid, NULL, 0);
if(ptr == (void *) -1)
{
perror("shmat error");
return -1;
}
// 3. 读共享内存
std::cout << "read memory:: " << (char*)ptr << std::endl;
// 4. 解除关联
shmdt(ptr);
// 5. 删除
shmctl(shmid, IPC_RMID, NULL);
std::cout << "delete share memory !" << std::endl;
}
int main(int argc, char *argv[])
{
InitSharedMemoryMsgBuf();
// 读消息
TMessageHeader recvHeader;
unsigned char recvMsg[8192] = {0};
ReadBufferMessagReleaseIndex(recvMsg, 8192,
10101010,
&recvHeader);
std::cout << "recvMsg:: " << recvMsg << std::endl;
// 发送消息
TMessageHeader newHeader;
std::string sendMsg("Hello, I am consumer, I have receiving your msg, thank you!");
WriteBufferMessage((unsigned char *)sendMsg.data(),
sendMsg.size(),
recvHeader.provider,
&recvHeader,
&newHeader);
UnInitSharedMemoryMsgBuf();
return 0;
}
编译脚本:Makefile
app: productionProcess consumptionProcess
#说明:$^代表依赖项
productionProcess: productionProcess.cpp msgExchange.cpp
g++ -g $^ -o productionProcess
consumptionProcess: consumptionProcess.cpp msgExchange.cpp
g++ -g $^ -o consumptionProcess
clean:
-rm productionProcess consumptionProcess -f
调试配置:launch.json
{
"version": "0.2.0",
"configurations": [
{
"name": "C/C++ Runner: Debug Session",
"type": "cppdbg",
"request": "launch",
"args": [],
"stopAtEntry": false,
"externalConsole": false,
"cwd": "/home/tiger/ipc",
"program": "/home/tiger/ipc/productionProcess",
"MIMode": "gdb",
"miDebuggerPath": "gdb",
"setupCommands": [
{
"description": "Enable pretty-printing for gdb",
"text": "-enable-pretty-printing",
"ignoreFailures": true
}
]
}
]
}
4.2 运行效果
从运行效果看,程序创建了共享内存5308433,空闲消息队列12,任务队列13,生产者进程从空闲队列去取出位置0,按照位置在共享内存写入12字节的信息,然后发送消息到任务队列,然后等待消费者进程答复,消费着进程启动后从任务队列中取出位置,按照位置从共享内存读取数据,并打印,然后从空闲队列取出1号位置,按照位置在共享内存中写入要发送的信息。然后将位置信息放入任务队列,生产者进程从任务队列取出消息位置,按照位置从共享内存中取出消费者进程的消息并打印出来,同时归还位置信息到空闲队列,下次再继续使用。