linux下共享内存和消息队列实现多进程间数据收发

news2024/11/18 21:32:39

       linux下进程通信的方式有很多,共享内存,消息队列,管道等。共享内存可以传输大量数据,但是多个进程同时读取共享内存就会出现脏读,可以借助消息队列实现多进程消息发送和接收。这种组合方式在实际开发中应用还是很多的,接下来就看一下。

目录

1.共享内存操作api

 (1)创建共享内存

(2)挂载共享内存到当前进程

(3)取消挂载

(4) 共享内存控制函数-可以删除

2.消息队列操作api

(1)创建消息附列

(2)往消息队列中发送消息

(3)从消息队列取消息

(4)操作消息队列,删除,查看等

3.命令行操作

(1)共享内存查看和删除

(2)消息队列查看和删除

(3)信号量查看和删除

4.生产者进程和消费者进程互相通信实践

4.1 代码

4.2 运行效果


1.共享内存操作api

 (1)创建共享内存

int shmget(key_t key, size_t size, int shmflg);

(2)挂载共享内存到当前进程

void *shmat(int shmid, const void *shmaddr, int shmflg);

(3)取消挂载

int shmdt(const void *shmaddr);

(4) 共享内存控制函数-可以删除

int shmctl(int shmid, int cmd, struct shmid_ds *buf);

2.消息队列操作api

(1)创建消息附列

int msgget(key_t key, int flags);

(2)往消息队列中发送消息

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

(3)从消息队列取消息

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);

(4)操作消息队列,删除,查看等

int msgctl(int msqid, int cmd, struct msqid_ds *buf);

3.命令行操作

(1)共享内存查看和删除

ipcs -m / ipcrm -m

(2)消息队列查看和删除

 ipcs -q / ipcrm -q

(3)信号量查看和删除

ipcs -s / ipcrm -s

4.生产者进程和消费者进程互相通信实践

接下来通过一个共享内存和两条消息队列来实现进程间互发数据,本在ubuntu环境下使用vscode开发

4.1 代码

消息定义:  msgInfo.h

#pragma once
#include <time.h>

#define	gAvailableMsgBufPos	100
#define gOccupiedMsgBufPos	101

//共享内存结构
struct TSharedMemoryModule
{
	char	        name[40+1];		
	int	            id;			
	char	        reloadCommand[80+1];
	int	            index;			
	long	        sizeOfUserSpace;	
	unsigned char   *removed_puserSpace;	
	int	            users;			
	int	            newCreated;		
	int	            removed_writingLocks;
	int	            removed_readingLocks;
	int				resID;
};
typedef TSharedMemoryModule		*PTSharedMemoryModule;

//消息队列管理结构
struct TMsgBufHDL
{
	long			maxSizeOfMsg;		
	int				maxNumOfMsg;		
	int				maxStayTime;		
	int				userID;			
	int				queueIDOfFreePos;	
	int				queueIDOfOccupiedPos;
};
typedef TMsgBufHDL		*PTMsgBufHDL;

//消息头
struct TMessageHeader
{
	long			type;	
	int				provider;	
	time_t			time;		
	int				len;		
	long			msgIndex;	
	int				dealer;		
    int 			forward_cnt;
	short			locked;	
};
typedef TMessageHeader	*PTMessageHeader;

// 空闲消息队列元素结构
struct TFreePosOfMsgBuf
{
	long 	statusOfPos;
	char	indexOfPos[4];
};

// 任务消息队列结构
struct TOccupiedPosOfMsgBuf
{
	long 	typeOfMsg;
	char	indexOfPos[4];
};

收据收发头文件:msgExchange.h

#pragma once

#include "msgInfo.h"

/*
================ 消息队列 ====================
// 创建共享内存
int shmget(key_t key, size_t size, int shmflg);
// 挂载
void *shmat(int shmid, const void *shmaddr, int shmflg);
// 取消挂载
int shmdt(const void *shmaddr);
// 共享内存控制函数-可以删除
int shmctl(int shmid, int cmd, struct shmid_ds *buf);
*/

/*
================ 消息队列 ====================
 // 创建和获取 ipc 内核对象
 int msgget(key_t key, int flags);
 // 将消息发送到消息队列
 int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
 // 从消息队列获取消息
 ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
 // 查看、设置、删除 ipc 内核对象(用法和 shmctl 一样)
 int msgctl(int msqid, int cmd, struct msqid_ds *buf);

消息数据格式
 struct Msg{
     long type; // 消息类型。这个是必须的,而且值必须 > 0,这个值被系统使用
     // 消息正文,多少字节随你而定
 };
 */

// 初始化共享内存和消息队列
int InitSharedMemoryMsgBuf();

// 销毁共享内存和消息队列
void UnInitSharedMemoryMsgBuf();

//创建消息队列 - 调用创建共享内存
int CreateMsgBuf();

//创建共享内存
PTSharedMemoryModule CreateSharedMemoryModule(long sizeOfUserSpace);

//初始化消息队列
int InitAllMsgIndex();

//从空闲消息队列取位置
int GetAvailableMsgBufPos();

// 空闲消息队列放入消息
int FreeMsgBufPos(int index);

// 任务消息队列放入消息
int OccupyMsgBufPos(int index, long typeOfMsg);

// 任务消息队列取消息
int GetOccupiedMsgBufPos(long *typeOfMsg);
// 从任务消息队列获取指定类型的消息
int GetOccupiedMsgBufPosOfMsgType(long typeOfMsg);

// 发送消息
int WriteBufferMessage(const unsigned char * msg,
                int lenOfMsg, 
                long typeOfMsg, 
                const TMessageHeader* poriHeader, 
                PTMessageHeader pNewHeader);

// 用指定位置发消息
int WriteBufferMessageUsingIndex(const unsigned char * msg, 
                            int lenOfMsg, long typeOfMsg,
                            const TMessageHeader* poriHeader, 
                            PTMessageHeader pNewHeader, 
                            const int *piIndexOfMsg);

//接收消息 - 释放索引
int ReadBufferMessagReleaseIndex(unsigned char * msg, int lenOfMsgBuf,
                                long typeOfMsg,
                                PTMessageHeader pOutputHeader);

//接收消息 - 不释放索引
int ReadBufferMessageNotReleaseIndex(unsigned char *msg, int lenOfMsgBuf, 
                                long typeOfMsg, 
                                PTMessageHeader pOutputHeader, 
                                int *indexOfMsg, int iSigRetryFlag);

收据收发实现:msgExchange.cpp

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/shm.h>
#include <string.h>
#include <errno.h>
#include <sys/types.h>
#include <unistd.h>
#include <iostream>

#include "msgExchange.h"

// ipcs -m / ipcrm -m
// ipcs -q / ipcrm -q 
// ipcs -s / ipcrm -s

// 以下三部分都是从共享内存申请
static PTSharedMemoryModule    s_pSharedMemMDL = NULL;
static PTMsgBufHDL             s_pMsgBufHDL = NULL;
static unsigned char           *s_pMsgBuf = NULL;

//共享内存,消息队列初始化信息
static TSharedMemoryModule s_sharedMemoryModule;
static TMsgBufHDL          s_msgBufHDL;

static unsigned long       gMsgIndex = 1;

#if defined(__GNUC__)
static unsigned long addAndFetchUlongAtomic(volatile unsigned long *p_var, unsigned long value) {
    return __sync_add_and_fetch(p_var, value);
}
#endif

// 初始化共享内存和消息队列
int InitSharedMemoryMsgBuf()
{
    s_sharedMemoryModule.id = 1234567;
    s_sharedMemoryModule.newCreated = 1;
    s_sharedMemoryModule.users = 0;

    s_msgBufHDL.maxSizeOfMsg = 8192;		
	s_msgBufHDL.maxNumOfMsg = 1024;

    CreateMsgBuf();

    return 0;
}

void UnInitSharedMemoryMsgBuf() {
    struct shmid_ds    buf;
    shmdt(s_pSharedMemMDL);
    shmctl(s_sharedMemoryModule.resID, IPC_RMID, &buf);
    msgctl(s_msgBufHDL.queueIDOfOccupiedPos, IPC_RMID, NULL);
    msgctl(s_msgBufHDL.queueIDOfFreePos, IPC_RMID, NULL);  
}

//创建共享内存
PTSharedMemoryModule CreateSharedMemoryModule(long sizeOfUserSpace)
{
    PTSharedMemoryModule   pSharedMem = nullptr;
    int                    ret;
    int                    resID;
    unsigned char          *pmUserSpace = nullptr;

    if ((resID = shmget(s_sharedMemoryModule.id, sizeof(TSharedMemoryModule) + sizeOfUserSpace, 0660)) == -1) {
        if ((resID = shmget(s_sharedMemoryModule.id, sizeof(TSharedMemoryModule) + sizeOfUserSpace, IPC_CREAT|0660)) == -1) {
            return nullptr;
        }
        s_sharedMemoryModule.newCreated = 1;
    }
    else {
        s_sharedMemoryModule.newCreated = 0;
    }

    std::cout << "shared memory id is :: " << resID << std::endl;
    s_sharedMemoryModule.resID = resID;

    if (((pSharedMem = (PTSharedMemoryModule)shmat(resID, 0, SHM_RND)) == NULL) || (pSharedMem == (void*)(-1))) {
        if (errno == 24) {
            pSharedMem = (PTSharedMemoryModule)shmat(resID, 0, SHM_RND);
        }
        if ((pSharedMem == NULL) || (pSharedMem == (void*)(-1))) {
            return nullptr;
        }
    }

    if (!s_sharedMemoryModule.newCreated) {
        ++(s_sharedMemoryModule.users);
        s_sharedMemoryModule.newCreated = 0;
        pSharedMem->newCreated = s_sharedMemoryModule.newCreated;
    }
    else {        
        memcpy(pSharedMem, &s_sharedMemoryModule, sizeof(s_sharedMemoryModule));
        pSharedMem->users = 1;
        pSharedMem->removed_readingLocks = 0;
        pSharedMem->removed_writingLocks = 0;
        pSharedMem->sizeOfUserSpace = sizeOfUserSpace;
    }
    if (sizeOfUserSpace == 0) {
        pmUserSpace = nullptr;
    }
    else {
        pmUserSpace = (unsigned char *)((unsigned char *)pSharedMem + sizeof(*pSharedMem));
    }
    
    // 清空消息内容
    if ((pSharedMem->newCreated) && (pmUserSpace)) {
        memset(pmUserSpace, 0, sizeOfUserSpace);
    }

    return pSharedMem;
}

//创建消息队列 - 调用创建共享内存
int CreateMsgBuf() {
    TMsgBufHDL         msgBufHDL;
    int                ret;
    int                newCreatedQueue;
    
    memset(&msgBufHDL,0,sizeof(msgBufHDL));
    msgBufHDL = s_msgBufHDL;
    
    if ((s_pSharedMemMDL = CreateSharedMemoryModule(
            sizeof(*s_pMsgBufHDL) + 
            ((sizeof(TMessageHeader) + sizeof(unsigned char) * msgBufHDL.maxSizeOfMsg) * (msgBufHDL.maxNumOfMsg + 1)))) == NULL)
    {
        return -1;
    }

    if ((s_pMsgBufHDL = (PTMsgBufHDL)((unsigned char *)s_pSharedMemMDL + sizeof(TSharedMemoryModule))) == NULL)
    {
        return -1;
    }

    s_pMsgBuf = (unsigned char *)s_pMsgBufHDL + sizeof(*s_pMsgBufHDL);
    
    if (s_pSharedMemMDL->newCreated) {
        memcpy(s_pMsgBufHDL, &msgBufHDL, sizeof(*s_pMsgBufHDL));
    }
    
    s_pMsgBufHDL->userID = s_pSharedMemMDL->id;

    if ((s_pMsgBufHDL->queueIDOfFreePos = msgget(s_pMsgBufHDL->userID, 0660)) == -1)
    {
        if ((s_pMsgBufHDL->queueIDOfFreePos = msgget(s_pMsgBufHDL->userID, 0660 | IPC_CREAT)) == -1)
        {
            return -1;
        }
        else {
            newCreatedQueue = 1;
        }
    }

    std::cout << "free msg queue id is :: " << s_pMsgBufHDL->queueIDOfFreePos << std::endl;
    s_msgBufHDL.queueIDOfFreePos =  s_pMsgBufHDL->queueIDOfFreePos;

    if ((s_pMsgBufHDL->queueIDOfOccupiedPos = msgget(s_pMsgBufHDL->userID + 1,0660)) == -1)
    {
        if ((s_pMsgBufHDL->queueIDOfOccupiedPos = msgget(s_pMsgBufHDL->userID + 1,0660 | IPC_CREAT)) == -1)
        {
            return -1;
        }
        else {
            newCreatedQueue = 1;
        }
            
    }
    
    std::cout << "occupied msg queue id is :: " << s_pMsgBufHDL->queueIDOfOccupiedPos << std::endl;
    s_msgBufHDL.queueIDOfOccupiedPos =  s_pMsgBufHDL->queueIDOfOccupiedPos;

    if (newCreatedQueue) {
        InitAllMsgIndex();
    }
    
    return 0;
}


//初始化消息队列
int InitAllMsgIndex() {
    TFreePosOfMsgBuf        freePos;
    TOccupiedPosOfMsgBuf    occupiedPos;
    PTMessageHeader         pheader;
    int                     ret;
    int                     index;
        
    while(true) {
        if ((ret = msgrcv(s_pMsgBufHDL->queueIDOfFreePos, &freePos, sizeof(freePos.indexOfPos), 0, IPC_NOWAIT)) < 0) {
            break;
        }
            
    }

    while(true) {
        if ((ret = msgrcv(s_pMsgBufHDL->queueIDOfOccupiedPos, &occupiedPos, sizeof(occupiedPos.indexOfPos), 0, IPC_NOWAIT)) < 0) {
            break;
        }    
    }
    
    // 位置消息队列中放入共享内存索引编号
    for (index = 0; index < s_pMsgBufHDL->maxNumOfMsg; index++)
    {
        FreeMsgBufPos(index);
    }
    
    return(0);
}

//从空闲消息队列取位置
int GetAvailableMsgBufPos()
{
    TFreePosOfMsgBuf    posOfMsgBuf;
    int                 ret;
    int                 index;
    
    memset(posOfMsgBuf.indexOfPos, 0, sizeof(posOfMsgBuf.indexOfPos));
    if ((ret = msgrcv(s_pMsgBufHDL->queueIDOfFreePos, &posOfMsgBuf, sizeof(int), 0, 0)) < 0) {
        return -1;
    }
    memcpy(&index, posOfMsgBuf.indexOfPos, sizeof(int));
    
    return index;
}

// 空闲消息队列放入消息
int FreeMsgBufPos(int index)
{
    TFreePosOfMsgBuf    posOfMsgBuf;
    int                 ret;
    PTMessageHeader    pheader;
    
    if ((index < 0) || (index >= s_pMsgBufHDL->maxNumOfMsg)) {
        return -1;
    }
    
    pheader = (PTMessageHeader)(s_pMsgBuf + index * (sizeof(TMessageHeader) + s_pMsgBufHDL->maxSizeOfMsg));
    pheader->type = 0;
    
    posOfMsgBuf.statusOfPos = gAvailableMsgBufPos;
    memcpy(posOfMsgBuf.indexOfPos, &index, sizeof(int));
    
    if (msgsnd(s_pMsgBufHDL->queueIDOfFreePos, &posOfMsgBuf, sizeof(int), 0) < 0) {
        return -1;
    }
    return 0;
}

// 任务消息队列放入消息
int OccupyMsgBufPos(int index, long typeOfMsg)
{
    TOccupiedPosOfMsgBuf    posOfMsgBuf;
    int                     ret;
    
    posOfMsgBuf.typeOfMsg = typeOfMsg;
    memcpy(posOfMsgBuf.indexOfPos, &index, sizeof(int));

    if (ret = msgsnd(s_pMsgBufHDL->queueIDOfOccupiedPos, &posOfMsgBuf, sizeof(posOfMsgBuf.indexOfPos), 0)) {
        return -1;
    }

    return 0;
}

// 任务消息队列取消息
int GetOccupiedMsgBufPos(long *typeOfMsg)
{
    TOccupiedPosOfMsgBuf   posOfMsgBuf;
    int                         ret;
    int                         index;
    
    memset(posOfMsgBuf.indexOfPos, 0, sizeof(posOfMsgBuf.indexOfPos));
    if ((ret = msgrcv(s_pMsgBufHDL->queueIDOfOccupiedPos, &posOfMsgBuf, sizeof(int), 0, 0)) < 0) {
        return -1;
    }
    memcpy(&index, posOfMsgBuf.indexOfPos, sizeof(int));
    *typeOfMsg = posOfMsgBuf.typeOfMsg;
        
    return index;
}

// 从任务消息队列获取指定类型的消息
int GetOccupiedMsgBufPosOfMsgType(long typeOfMsg)
{
    TOccupiedPosOfMsgBuf    posOfMsgBuf;
    int                     ret;
    int                     index;
    int                     iRetryTime = 0;

    memset(posOfMsgBuf.indexOfPos, 0, sizeof(posOfMsgBuf.indexOfPos));
    posOfMsgBuf.typeOfMsg = typeOfMsg;

    do {
        ret = msgrcv(s_pMsgBufHDL->queueIDOfOccupiedPos, &posOfMsgBuf, sizeof(int), typeOfMsg, 0);
        iRetryTime ++;
    }while(iRetryTime < 10 && 1 == ret && EINTR == errno);

    if (-1 != ret) {
        memcpy(&index, posOfMsgBuf.indexOfPos, sizeof(int));
        return index;
    }

    return -1;
}

// 发送消息
int WriteBufferMessage(const unsigned char * msg,
                int lenOfMsg, 
                long typeOfMsg, 
                const TMessageHeader* poriHeader, 
                PTMessageHeader pNewHeader)
{
    int                    indexOfMsg;
    unsigned char         *pAddr;
    int                    ret;
    PTMessageHeader    pheader;

    
    if ((indexOfMsg = GetAvailableMsgBufPos()) < 0) {
        return indexOfMsg;
    }
    if ((indexOfMsg >= s_pMsgBufHDL->maxNumOfMsg)) {
        FreeMsgBufPos(indexOfMsg);
        return(-1);
    }
    
    // 得到存放消息的首地址
    pAddr = s_pMsgBuf + indexOfMsg * (sizeof(TMessageHeader) + s_pMsgBufHDL->maxSizeOfMsg);
    pheader = (PTMessageHeader)pAddr;
    pheader->len = lenOfMsg;
    if (NULL == poriHeader) {
        time(&(pheader->time));
        pheader->provider = getpid();
        pheader->dealer = 0;
        pheader->msgIndex = addAndFetchUlongAtomic(&gMsgIndex, 1);
        pheader->forward_cnt = 0;
    }
    else
    {
        pheader->time = poriHeader->time;
        pheader->provider = poriHeader->provider;
        pheader->dealer = getpid();
        pheader->msgIndex = poriHeader->msgIndex;
        pheader->forward_cnt = poriHeader->forward_cnt;
    }
    pheader->type = typeOfMsg;
    if(pNewHeader) {
        memcpy(pNewHeader, pheader, sizeof(*pNewHeader));
    }
    memcpy(pAddr + sizeof(TMessageHeader), msg, lenOfMsg);
    if ((ret = OccupyMsgBufPos(indexOfMsg, typeOfMsg)) < 0) {
        FreeMsgBufPos(indexOfMsg);
        return ret;
    }

    std::cout << "WriteBufferMessage -> indexOfMsg:: " << indexOfMsg << " typeOfMsg::" << typeOfMsg << " pheader->len::" <<  pheader->len << std::endl;
    return 0;
}

// 用指定位置发消息
int WriteBufferMessageUsingIndex(const unsigned char * msg, 
                            int lenOfMsg, long typeOfMsg,
                            const TMessageHeader* poriHeader, 
                            PTMessageHeader pNewHeader, 
                            const int *piIndexOfMsg)
{
    int indexOfMsg = *piIndexOfMsg;
    int ret;
    unsigned char *pAddr;
    PTMessageHeader pheader;
    
    if ((indexOfMsg >= s_pMsgBufHDL->maxNumOfMsg)) {
        return -1;
    }
    
    pAddr = s_pMsgBuf + indexOfMsg * (sizeof(TMessageHeader) + s_pMsgBufHDL->maxSizeOfMsg);
    pheader = (PTMessageHeader)pAddr;
    pheader->len = lenOfMsg;
    if (poriHeader == NULL)
    {
        time(&(pheader->time));
        pheader->provider = getpid();
        pheader->dealer = 0;
        pheader->msgIndex = addAndFetchUlongAtomic(&gMsgIndex, 1);
        pheader->forward_cnt = 0;
    }
    else
    {
        pheader->time = poriHeader->time;
        pheader->provider = poriHeader->provider;
        pheader->dealer = getpid();
        pheader->msgIndex = poriHeader->msgIndex;
        pheader->forward_cnt = poriHeader->forward_cnt;
    }
    pheader->type = typeOfMsg;
    if(pNewHeader) {
        memcpy(pNewHeader, pheader, sizeof(*pNewHeader));
    }
    memcpy(pAddr + sizeof(TMessageHeader), msg, lenOfMsg);
    if ((ret = OccupyMsgBufPos(indexOfMsg, typeOfMsg)) < 0)
    {
        return ret;
    }
    return(0);
}

//接收消息 - 释放索引
int ReadBufferMessagReleaseIndex(unsigned char * msg, int lenOfMsgBuf,
                                long typeOfMsg,
                                PTMessageHeader pOutputHeader)
{
    int                 indexOfMsg;
    unsigned char       *pAddr;
    int                 lenOfMsg;
    PTMessageHeader     pheader;
    
    if (((indexOfMsg = GetOccupiedMsgBufPosOfMsgType(typeOfMsg)) < 0)  || (indexOfMsg >= s_pMsgBufHDL->maxNumOfMsg))
    {
        return -1;
    }
    pAddr = s_pMsgBuf + indexOfMsg * (sizeof(TMessageHeader) + s_pMsgBufHDL->maxSizeOfMsg);
    pheader = (PTMessageHeader)pAddr;
    if (((lenOfMsg = pheader->len) < 0) || (lenOfMsg >= lenOfMsgBuf) || (lenOfMsg > s_pMsgBufHDL->maxSizeOfMsg))
    {
        return -1;
    }
    memcpy(msg, pAddr + sizeof(TMessageHeader), lenOfMsg);
    if (pOutputHeader != NULL) {
        memcpy(pOutputHeader, pheader, sizeof(*pheader));
    }
        
    FreeMsgBufPos(indexOfMsg);

    std::cout << "ReadBufferMessagReleaseIndex -> indexOfMsg:: " << indexOfMsg << " typeOfMsg::" << typeOfMsg << " pheader->len::" <<  pheader->len << std::endl;
    
    return(lenOfMsg);
}


//接收消息 - 不释放索引
int ReadBufferMessageNotReleaseIndex(unsigned char *msg, int lenOfMsgBuf, 
                                long typeOfMsg, 
                                PTMessageHeader pOutputHeader, 
                                int *indexOfMsg, int iSigRetryFlag)
{
    unsigned char         *pAddr;
    int                    lenOfMsg;
    PTMessageHeader    pheader;
    
    if (((*indexOfMsg = GetOccupiedMsgBufPosOfMsgType(typeOfMsg)) < 0)  || (*indexOfMsg >= s_pMsgBufHDL->maxNumOfMsg))
    {
        return -1;
    }
    pAddr = s_pMsgBuf + *indexOfMsg * (sizeof(TMessageHeader) + s_pMsgBufHDL->maxSizeOfMsg);
    pheader = (PTMessageHeader)pAddr;
    if (((lenOfMsg = pheader->len) < 0) || (lenOfMsg >= lenOfMsgBuf) || (lenOfMsg > s_pMsgBufHDL->maxSizeOfMsg))
    {
        FreeMsgBufPos(*indexOfMsg);
        return -1;
    }
    memcpy(msg, pAddr + sizeof(TMessageHeader), lenOfMsg);
    if (pOutputHeader != NULL) {
        memcpy(pOutputHeader, pheader, sizeof(*pheader));
    }
        
    return lenOfMsg;
}


生产者进程:productionProcess.cpp

#include <stdio.h>
#include <sys/shm.h>
#include <string.h>
#include <iostream>
#include "msgExchange.h"

using namespace std;

int SimpleTestWrite() {
    // 1. 创建共享内存
    int shmid = shmget(1000, 4096, IPC_CREAT|0660);
    if(shmid == -1)
    {
        perror("shmget error");
        return -1;
    }

    // 2. 进程和共享内存关联
    void* ptr = shmat(shmid, NULL, 0);
    if(ptr == (void *) -1)
    {
        perror("shmat error");
        return -1;
    }

    // 3. 写共享内存
    const char* p = "hello, nice to meet you !";
    memcpy(ptr, p, strlen(p)+1);

    getchar();

    // 4. 解除内存和共享内存关联
    shmdt(ptr);

    // 5. 删除共享内存
    shmctl(shmid, IPC_RMID, NULL);
    std::cout << "delete share memory !" << std::endl;
}

int main(int argc, char *argv[])
{
    InitSharedMemoryMsgBuf();

    // 发送消息
    TMessageHeader newHeader;
    std::string sendMsg("Hello, I am producer !");
    WriteBufferMessage((unsigned char *)sendMsg.data(),
                sendMsg.size(), 
                10101010, 
                nullptr, 
                &newHeader);
    // 读消息
    unsigned char recvMsg[8192] = {0};
    ReadBufferMessagReleaseIndex(recvMsg, 8192,
                                newHeader.provider,
                                &newHeader);
    std::cout << "recvMsg:: " << recvMsg << std::endl;

    UnInitSharedMemoryMsgBuf();
    
    return 0;
}

消费者进程:consumptionProcess.cpp

#include <sys/shm.h>
#include <string.h>
#include <stdio.h>
#include <iostream>

#include "msgExchange.h"

using namespace std;

int SimpleTestRead() {
     // 1. 创建共享内存
    int shmid = shmget(1000, 0, 0660);
    if(shmid == -1)
    {
        perror("shmget error");
        return -1;
    }

    // 2. 获取共享内存地址
    void* ptr = shmat(shmid, NULL, 0);
    if(ptr == (void *) -1)
    {
        perror("shmat error");
        return -1;
    }

    // 3. 读共享内存
    std::cout << "read memory:: " << (char*)ptr << std::endl;

    // 4. 解除关联
    shmdt(ptr);

    // 5. 删除
    shmctl(shmid, IPC_RMID, NULL);
    std::cout << "delete share memory !" << std::endl;
}

int main(int argc, char *argv[])
{
    InitSharedMemoryMsgBuf();

     // 读消息
    TMessageHeader recvHeader;
    unsigned char recvMsg[8192] = {0};
    ReadBufferMessagReleaseIndex(recvMsg, 8192,
                                10101010,
                                &recvHeader);
    std::cout << "recvMsg:: " << recvMsg << std::endl;

    // 发送消息
    TMessageHeader newHeader;
    std::string sendMsg("Hello, I am consumer, I have receiving your msg, thank you!");
    WriteBufferMessage((unsigned char *)sendMsg.data(),
                sendMsg.size(), 
                recvHeader.provider, 
                &recvHeader, 
                &newHeader);
                
    UnInitSharedMemoryMsgBuf();

    return 0;
}

编译脚本:Makefile

app: productionProcess consumptionProcess

#说明:$^代表依赖项
productionProcess: productionProcess.cpp msgExchange.cpp
	g++ -g $^ -o productionProcess

consumptionProcess: consumptionProcess.cpp msgExchange.cpp
	g++ -g $^ -o consumptionProcess

clean:
	-rm productionProcess consumptionProcess -f

调试配置:launch.json

{
  "version": "0.2.0",
  "configurations": [
    {
      "name": "C/C++ Runner: Debug Session",
      "type": "cppdbg",
      "request": "launch",
      "args": [],
      "stopAtEntry": false,
      "externalConsole": false,
      "cwd": "/home/tiger/ipc",
      "program": "/home/tiger/ipc/productionProcess",
      "MIMode": "gdb",
      "miDebuggerPath": "gdb",
      "setupCommands": [
        {
          "description": "Enable pretty-printing for gdb",
          "text": "-enable-pretty-printing",
          "ignoreFailures": true
        }
      ]
    }
  ]
}

4.2 运行效果

从运行效果看,程序创建了共享内存5308433,空闲消息队列12,任务队列13,生产者进程从空闲队列去取出位置0,按照位置在共享内存写入12字节的信息,然后发送消息到任务队列,然后等待消费者进程答复,消费着进程启动后从任务队列中取出位置,按照位置从共享内存读取数据,并打印,然后从空闲队列取出1号位置,按照位置在共享内存中写入要发送的信息。然后将位置信息放入任务队列,生产者进程从任务队列取出消息位置,按照位置从共享内存中取出消费者进程的消息并打印出来,同时归还位置信息到空闲队列,下次再继续使用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/81115.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算机网络复习(五)

考点&#xff1a;UDP 拥塞控制 TCP三次握手四次握手 P247 熟知端口号 P215 TCP报文计算5-36.假定TCP采用一种仅使用线性增大和乘法减小的简单拥塞控制算法&#xff0c;而不使用慢开始。发送窗口不采用字节为计算单位&#xff0c;而是使用分组pkt为计算单位。在一开始发送窗口为…

时序预测 | MATLAB实现VAR和GARCH时间序列预测

时序预测 | MATLAB实现VAR和GARCH时间序列预测 目录 时序预测 | MATLAB实现VAR和GARCH时间序列预测预测效果基本介绍程序设计VARGARCH参考资料预测效果 基本介绍 机器学习可其用于时间序列问题的分类和预测。在探索时间序列的机器学习方法之前,尝试统计时间序列预测方法,它列…

SQL开窗函数之percent_rank、first_value、nth的用法

开窗函数 当我们需要进行一些比较复杂的子查询时&#xff0c;聚合函数就会非常的麻烦&#xff0c;因此可以使用开窗函数进行分组再运用函数查询。窗口函数既可以显示聚集前的数据&#xff0c;也可以显示聚集后的数据&#xff0c;可以在同一行中返回基础行的列值和聚合后的结果…

微信小程序|反编译

一、下载网易模拟器 MuMu模拟器官网_安卓模拟器_网易手游模拟器 根据自己的系统选择对应的软件进行安装。 安装成功后,如下: 二、再模拟器上面安装对应的软件(微信、RE文件管理器) 1. 打开应用中心,搜索 RE文件管理器和微信,分别进行下载 2. 打开微信,输入帐号进行…

Windows 文件比较工具winmerge

今天下载了一个非常强大的文件比较工具推荐给大家。开源免费的&#xff01;&#xff01;&#xff01; 什么是WinMerge&#xff1f; WinMerge是Windows的开源差异和合并工具。WinMerge 可以比较文件夹和文件&#xff0c;以易于理解和处理的可视文本格式呈现差异。 官方下载地…

代码随想录算法训练营第六十天| LeetCode84. 柱状图中最大的矩形

一、LeetCode84. 柱状图中最大的矩形 1&#xff1a;题目描述&#xff08;84. 柱状图中最大的矩形&#xff09; 给定 n 个非负整数&#xff0c;用来表示柱状图中各个柱子的高度。每个柱子彼此相邻&#xff0c;且宽度为 1 。 求在该柱状图中&#xff0c;能够勾勒出来的矩形的最大…

学习Typescript(第二弹)

接口 对象类型接口 先用interface定义一个List接口,成员有id时number类型&#xff0c;name是string类型再定义一个Result&#xff0c;成员是List数组定义一个render函数&#xff0c;接收参数是result // 先用interface定义一个List接口 interface List {id:number,name:strin…

安卓APP源码和设计报告——个人通讯录

摘 要 随着移动设备制造技术和移动通信网络的迅猛发展,全球手机用户日益增加,手机成为了很多人日常生活中必不可少的一部分,手机业在日益发展的同时,人们对手机的功能需求和体验需求也越来越高,因此各种智能手机相继而出&#xff0c;当前市场上最流行的智能手机的操作系统非An…

RabbitMQ--重试机制

原文网址&#xff1a;RabbitMQ--重试机制_IT利刃出鞘的博客-CSDN博客 简介 说明 本文介绍RabbitMQ的重试机制。 问题描述 消费者默认是自动提交&#xff0c;如果消费时出现了RuntimException&#xff0c;会导致消息直接重新入队&#xff0c;再次投递&#xff08;进入队首&am…

【iOS】—— MVVM模式

MVVM模式 文章目录MVVM模式为什么使用MVVM&#xff1f;MVVM分别代表什么含义&#xff1f;MVVM通信关系MVVM模式的优缺点优点:缺点:概括总结MVVM文件分类为什么使用MVVM&#xff1f; iOS中&#xff0c;我们使用的大部分都是MVC架构。虽然MVC的层次明确&#xff0c;但是由于功能日…

C# 11新特性之file关键字

C#11 添加了文件作用域类型功能&#xff1a;一个新的 file 修饰符&#xff0c;可以应用于任何类型定义以限制其只能在当前文件中使用。这样&#xff0c;我们可以在一个项目中拥有多个同名的类。 目录示例file不可以与其他修饰符一起使用file可以修饰的类型file 不可修饰的类型f…

主流报表开发工具FastReport.Net全新发布,邀您体验最新版试用

FastReport .Net是一款适用于 WinForms、Blazor Server、ASP.NET、MVC、.NET 6 和 .NET Core 的报告生成工具。FastReport代表着“速度”、“可靠”和“品质”&#xff0c;是当今主流的报表开发工具。 该产品在本月进行了重大版本v2023的发布&#xff0c;接下来让我们一起看看…

【OpenCV-Python】教程:4-9 特征匹配 match

OpenCV Python 特征匹配 【目标】 特征匹配Brute-Force Matcher 和 FLANN Matcher 【理论】 Brute-Force Matcher字面意思是蛮力匹配器&#xff0c;所以它的过程也很简单&#xff0c;从一个集合里取出一个特征描述子&#xff0c;然后与第二个集合里的特征逐个的进行匹配比较…

传统MES架构的智能化改进---python在Aprol上的实践

一、开题依据 MES是属于生产车间级的管理信息系统。作为生产与计划之间的信息“集线器”&#xff0c;MES 主要包括以下功能模块&#xff1a;工序详细调度、资源分配和状态管理、生产单元分配、过程管理、人力资源管理、维护管理、质量管理、文档控制、产品跟踪和产品清单管理、…

Solidworks导出为URDF用于MoveIT总结(带prismatic)

环境 Solidwoks2018 SP0 / Solidwoks2021 SP5&#xff1b;Ubuntu20.04&#xff1b;ROS1 Noetic; Solidwoks2018 SP0对于平移副有问题&#xff0c;显示不出来&#xff0c;Solidwoks2021 SP5没有问题。 官网有段话&#xff1a; There is a known STL export bug with SolidWork…

Jdk Tomcat 安装教程 — 2022.12.11

文章目录一、安装jdk教程二、tomcat 安装三、修改Tomcat端口号安装Tomcat之前要确保安装jdk一、安装jdk教程 安装vim命令包&#xff0c;此操作如果执行不了&#xff0c;需要使用root权限执行 执行如下命令&#xff1a; yum install -y vim-enhanced2. 下载jdk安装包&#xff…

3D打印切片软件Cura入门

安装好之后&#xff0c;添加一台打印机&#xff0c;参数可以随便设置。 Cura安装包&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1T1MBcZYBCVfhtFKDBjypmQ?pwd2022 提取码&#xff1a;2022 基本操作 按住鼠标右键不放&#xff1a;旋转 按住滚轮不放&#xff1…

制作USB启动盘(U盘安装ubuntu20.04)

文章目录制作USB启动盘&#xff08;U盘安装ubuntu20.04&#xff09;制作USB启动盘的工具ubuntu20.04系统安装u盘制作进入bios制作USB启动盘&#xff08;U盘安装ubuntu20.04&#xff09; 制作USB启动盘的工具 制作USB启动盘的工具有rufus&#xff0c;UNetbootin&#xff0c;Un…

汇编语言—第1章 各类存储芯片及内存空间

1、各类存储器芯片 一台PC机中&#xff0c;装有多个存储类芯片&#xff0c;这些存储器芯片从物理连接上来看是独立的、不同的器件。 &#xff08;1&#xff09;随机存储器 用于存放供CPU使用的绝大部分程序和数据 &#xff08;2&#xff09;装有BIOS&#xff08;Basic Input/Ou…

AI遮天传 DL-反馈神经网络RNN

本文会先介绍动态系统的概念&#xff0c;然后介绍两种简单的反馈神经网络&#xff0c;然后再介绍两种门控神经网络(LSTM, GRU)&#xff0c;最后是关于反馈神经网络的应用(本次以语音识别为例)。 RNN: Recurrent neural network&#xff0c;一般叫它“反馈神经网络”或者“循环神…