Posix在线文档:
The Single UNIX Specification, Version 2 (opengroup.org)
Linux系统中提供了两种不同接口的消息队列:
- POSIX消息队列。POSIX为可移植的操作系统接口。
- System V消息队列。System V 是 AT&T 的第一个商业UNIX版本(UNIX System III)的加强。
其中,POSIX消息队列可移植性较强,使用较广。
Linux系统中提供的消息队列一般应用于进行间通信,但也可以用于线程间通信。
本文介绍POSIX消息队列应用于线程间通信。
相关API
Linux内核提供了一系列函数来使用消息队列:
/** * @brief 创建消息队列实例 * * Detailed function description * * @param[in] name: 消息队列名称 * @param[in] oflag:根据传入标识来创建或者打开一个已创建的消息队列 - O_CREAT: 创建一个消息队列 - O_EXCL: 检查消息队列是否存在,一般与O_CREAT一起使用 - O_CREAT|O_EXCL: 消息队列不存在则创建,已存在返回NULL - O_NONBLOCK: 非阻塞模式打开,消息队列不存在返回NULL - O_RDONLY: 只读模式打开 - O_WRONLY: 只写模式打开 - O_RDWR: 读写模式打开 * @param[in] mode:访问权限 * @param[in] attr:消息队列属性地址 * * @return 成功返回消息队列描述符,失败返回-1,错误码存于error中 */ mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr); /** * @brief 无限阻塞方式接收消息 * * Detailed function description * * @param[in] mqdes: 消息队列描述符 * @param[in] msg_ptr:消息体缓冲区地址 * @param[in] msg_len:消息体长度,长度必须大于等于消息属性设定的最大值 * @param[in] msg_prio:消息优先级 * * @return 成功返回消息长度,失败返回-1,错误码存于error中 */ mqd_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio); /** * @brief 指定超时时间阻塞方式接收消息 * * Detailed function description * * @param[in] mqdes: 消息队列描述符 * @param[in] msg_ptr:消息体缓冲区地址 * @param[in] msg_len:消息体长度,长度必须大于等于消息属性设定的最大值 * @param[in] msg_prio:消息优先级 * @param[in] abs_timeout:超时时间 * * @return 成功返回消息长度,失败返回-1,错误码存于error中 */ mqd_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio, const struct timespec *abs_timeout); /** * @brief 无限阻塞方式发送消息 * * Detailed function description * * @param[in] mqdes: 消息队列描述符 * @param[in] msg_ptr:待发送消息体缓冲区地址 * @param[in] msg_len:消息体长度 * @param[in] msg_prio:消息优先级 * * @return 成功返回0,失败返回-1 */ mqd_t mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio); /** * @brief 指定超时时间阻塞方式发送消息 * * Detailed function description * * @param[in] mqdes: 消息队列描述符 * @param[in] msg_ptr:待发送消息体缓冲区地址 * @param[in] msg_len:消息体长度 * @param[in] msg_prio:消息优先级 * @param[in] abs_timeout:超时时间 * * @return 成功返回0,失败返回-1 */ mqd_t mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio, const struct timespec *abs_timeout); /** * @brief 关闭消息队列 * * Detailed function description * * @param[in] mqdes: 消息队列描述符 * * @return 成功返回0,失败返回-1 */ mqd_t mq_close(mqd_t mqdes); /** * @brief 分离消息队列 * * Detailed function description * * @param[in] name: 消息队列名称 * * @return 成功返回0,失败返回-1 */ mqd_t mq_unlink(const char *name);
消息队列的名称类似于设备文件,mq_open通过名称打开对应的消息队列,然后返回一个消息队列描述符,之后的操作都是引用返回的描述符,类似于打开文件时返回的文件描述符。
其实和文件的打开、读写、关闭都非常类似。
POSIX消息队列允许进程以消息的形式交换数据。 消息队列是使用mq_open创建和打开的;此函数返回消息队列描述符(mqd_t),该描述符用于在以后的调用中引用打开的消息队列。每个消息队列由/somename形式的名称标识,该名称由一个初始斜杠组成,后跟一个或多个字符(都不是斜杠)。通过将相同的名称传递给mq_open,两个进程可以在同一队列上进行操作。 使用mq_send和mq_receive在队列之间来回传送消息。当进程使用完队列后,可以使用mq_close关闭该队列,而当不再需要该队列时,可以使用mq_unlink将其删除。可以使用mq_getattr和mq_setattr检索和修改队列属性(在某些情况下)。进程可以使用mq_notify请求异步通知消息到达先前为空的队列。
消息队列可以认为是一个链表。进程(线程)可以往里写消息,也可以从里面取出消息。一个进程可以往某个消息队列里写消息,然后终止,另一个进程随时可以从消息队列里取走这些消息。这里也说明了,消息队列具有随内核的持续性,也就是系统不重启,消息队列永久存在。
几个主要函数可进一步参考:
mq_open
MQ_OPEN - Linux手册页-之路教程 (onitroad.com)
mq_send
MQ_SEND - Linux手册页-之路教程 (onitroad.com)
mq_receive
MQ_RECEIVE - Linux手册页-之路教程 (onitroad.com)
mq_close
MQ_CLOSE - Linux手册页-之路教程 (onitroad.com)
相关头文件如下:
#include <fcntl.h> /* For O_* constants */ #include <sys/stat.h> /* For mode constants */ #include <mqueue.h>
注意事项
参考:
POSIX 消息队列函数(mq_open、mq_getattr、mq_send、mq_receive)示例_mqreceivetest-CSDN博客
关于消息队列mq_open的属性参数
指向attr的struct mq_attr字段指定队列允许的最大消息数和最大消息大小。此结构定义如下:
struct mq_attr {
long mq_flags; /* Flags (ignored for mq_open()) */
long mq_maxmsg; /* Max. # of messages on queue */
long mq_msgsize; /* Max. message size (bytes) */
long mq_curmsgs; /* # of messages currently in queue(ignored for mq_open()) */
};
mq_maxmsg指的是容纳的消息数也就是消息队列的缓冲大小,msgsize指的是每个消息的大小;mq_maxmsg貌似并没有那么重要,因为消息队列是以流的形式来发送和接收的。两个参数的乘积,就是消息队列的总容量。
调用mq_open()时仅使用mq_maxmsg和mq_msgsize字段;其余字段中的值将被忽略。
如果attr为NULL,则使用默认属性创建队列。
关于消息队列名称
资料中说消息队列名称要写斜杠/开头,但是我看有的程序里并没有,而是一个单纯的字符串,为啥?
经过测试,在Linux 2.6.18中,所创建的POSIX消息队列不会在文件系统中创建真正的路径名。且POSIX的名字只能以一个’/’开头,名字中不能包含其他的’/’。
比如,不符合的情况:
报错对应的错误如下:
另外需要注意以下几点:
1、
mq_receive() 的第三个参数表示读取消息的长度,不能小于能写入队列中消息的最大大小,即一定要大于等于该队列的 mq_attr 结构中 mq_msgsize 的大小。也就是最少要读一个消息。
2、
消息的优先级:它是一个小于 MQ_PRIO_MAX 的数,数值越大,优先级越高。 POSIX 消息队列在调用 mq_receive 时总是返回队列中最高优先级的最早消息。如果消息不需要设定优先级,那么可以在 mq_send 是置 msg_prio 为 0, mq_receive 的 msg_prio 置为 NULL。因为mq_send的优先级是以变量的形式传递的,而mq_receive是以指针的形式传递的。
3、
默认情况下mq_send和mq_receive是阻塞进行调用,可以通过mq_setattr来设置为O_NONBLOCK,如:
struct mq_attr new_attr;
mq_getattr(mqID, &new_attr);//获取当前属性
new_attr.mq_flags = O_NONBLOCK;//设置为非阻塞
mq_setattr(mqID, &new_attr, NULL)//设置属性
4、
mq_getattr用于获取当前消息队列的属性,mq_setattr用于设置当前消息队列的属性。其中mq_setattr中的oldattr用于保存修改前的消息队列的属性,可以为空。
mq_setattr可以设置的属性只有mq_flags,用来设置或清除消息队列的非阻塞标志。newattr结构的其他属性被忽略。mq_maxmsg和mq_msgsize属性只能在创建消息队列时通过mq_open来设置。mq_open只会设置该两个属性,忽略另外两个属性。mq_curmsgs属性只能被获取而不能被设置。
5、
mq_send和mq_recevie的第二个参数,也就是数据指针,是char *类型的,需要时做个强转。
6、
使用POSIX消息队列API的程序必须使用cc -lrt进行编译,以链接到实时库librt。
gcc pthread_5.c -o obj -lpthread -lrt
Linux编译命令-pthread & -lpthread - 邗影 - 博客园 (cnblogs.com)
linux的各种自带库-lz -lrt -lm -lc都是什么库_lm是什么库-CSDN博客
7、
消息队列无需init初始化,只用在第一次mq_open打开时创建即可。
8、
Posix消息队列就是先进先出,但是可以通过优先级来让紧急数据插队。
使用示例
常规示例代码如下:
#include <stdio.h> #include <pthread.h> #include <stdlib.h> #include <unistd.h> #include <semaphore.h> #include <fcntl.h> /* For O_* constants */ #include <sys/stat.h> /* For mode constants */ #include <mqueue.h> #include <errno.h> #define MQ_NAME "/thisIsOneMqName" static void *thread_fun1(void *param) { mqd_t mqfd; struct mq_attr attr; int count; attr.mq_maxmsg = 1; attr.mq_msgsize = sizeof(int); mqfd = mq_open(MQ_NAME, O_WRONLY | O_CREAT, 0666, &attr); if(mqfd < 0) { printf("mq_open errno is %d\n", errno); exit(-1); } while(1) { count += 10; mq_send(mqfd, (char *)&count, sizeof(int), 0); } return NULL; } static void *thread_fun2(void *param) { mqd_t mqfd; int count; mqfd = mq_open(MQ_NAME, O_RDONLY); if(mqfd < 0) { printf("mq_open errno is %d\n", errno); exit(-1); } while(1) { mq_receive(mqfd, (char *)&count, sizeof(int), NULL); printf("count is %d\n", count); if(count == 100) { exit(0); } } return NULL; } int main(int argc, char *argv[]) { pthread_t tid1, tid2; int ret; ret = pthread_create(&tid1, NULL, thread_fun1, NULL); if (ret < 0) { printf("ERROR; return code from pthread_create() is %d\n", ret); exit(-1); } ret = pthread_create(&tid2, NULL, thread_fun2, NULL); if (ret < 0) { printf("ERROR; return code from pthread_create() is %d\n", ret); exit(-1); } pthread_exit(NULL); }
更多待补充