Linux系统中的POSIX消息队列编程
文章目录
- Linux系统中的POSIX消息队列编程
- 1、POSIX 消息队列
- 2、Linux 中的 POSIX 消息队列命名
- 3、POSIX 消息队列调用
- 3.1 mq_open, mq_close
- 3.2 mq_timed_send、mq_send、mq_timed_receive、mq_receive
- 3.3 mq_notify
- 3.4 mq_unlink
- 3.5 mq_getattr, mq_setattr
- 4、进程间通过消息队列通信
- 4.1 服务器端代码实现
- 4.2 客户端代码实现
- 4.3 程序运行结果
本文将详细介绍如何在Linux中进行POSIX队列编程。
1、POSIX 消息队列
POSIX.1b 标准 (IEEE Std 1003.1b-1993) 中引入了 POSIX 进程间通信 (IPC),用于实时扩展。 自版本 2.6.6(2004 年 5 月)以来,POSIX 消息队列已在 Linux 中可用。 POSIX IPC 调用符合标准,但在较旧的类 Unix 系统上可能不可用。 与System V IPC调用相比,POSIX IPC调用具有更清晰的界面并且更易于使用。
2、Linux 中的 POSIX 消息队列命名
System V 消息队列使用通过 ftok 函数调用获得的键来标识。 POSIX 消息队列使用名称字符串进行标识。 在 Linux 上,POSIX 队列被命名为以正斜杠 (/) 开头、后跟一个或多个字符(其中没有一个是斜杠)并以空字符结尾的字符串。 任何知道队列名称并具有适当权限的进程都可以从队列发送或接收消息,并对其执行其他操作。
3、POSIX 消息队列调用
在 Linux 上使用 POSIX 消息队列的程序必须使用编译器选项 -lrt 与实时库 librt 链接。 函数调用名称以前缀 mq_
开头。
3.1 mq_open, mq_close
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
mqd_t mq_open (const char *name, int oflag);
mqd_t mq_open (const char *name, int oflag, mode_t mode,
struct mq_attr *attr);
mq_open
函数用于打开 POSIX 队列。 第一个参数指定队列名称,如上面前面的段落中所述。 第二个参数是一个标志,可以是 O_RDONLY
(用于接收消息)、O_WRONLY
(用于发送消息)以及 O_RDWR
(用于队列上的发送和接收操作)。 更多值可以与该标志进行“或”运算。 您可以指定 O_NONBLOCK
以非阻塞模式使用队列。 默认情况下,如果队列已满,mq_send 将阻塞,如果队列中没有消息,mq_receive 将阻塞。 但如果在 oflag 中指定了 O_NONBLOCK,则在这些情况下调用将立即返回,并将 errno 设置为 EAGAIN。
如果指定 O_CREAT
作为 oflag 的一部分,则创建队列(如果队列尚不存在)。 如果您与 O_CREAT 一起指定 O_EXCL,并且队列存在,则 mq_open 将失败,并将 errno 设置为 EEXIST。 如果在 oflag 中指定了 O_CREAT,则 mq_open 的第二种形式必须与两个附加参数一起使用。 在这种情况下,模式指定队列的权限,并且指向 struct mq_attr 的指针给出消息队列的属性。 如果该指针为NULL,则创建具有默认属性的队列。
struct mq_attr {
long mq_flags; /* Flags: 0 or O_NONBLOCK */
long mq_maxmsg; /* Max. # of messages on queue */
long mq_msgsize; /* Max. message size (bytes) */
long mq_curmsgs; /* # of messages currently in queue */
};
mq_open 中指针作为 attr 传递的结构体中 mq_maxmsg 的值应小于或等于 /proc 接口文件 /proc/sys/fs/mqueue/msg_max 中的值(文件中的默认值为 10)。 同样,mq_msgsize 的值应小于文件 /proc/sys/fs/mqueue/msgsize_max 中的值,文件中的默认值为 8192 字节。 对于特权进程,这些限制将被忽略。
如果 mq_open 调用成功,则返回消息队列描述符。 消息队列描述符可以在队列的后续调用中使用。
mq_close
调用如下:
#include <mqueue.h>
int mq_close (mqd_t mqdes);
mq_close
调用关闭消息队列描述符 mqdes。
3.2 mq_timed_send、mq_send、mq_timed_receive、mq_receive
#include <mqueue.h>
int mq_send (mqd_t mqdes, const char *msg_ptr, size_t msg_len,
unsigned int msg_prio);
mq_send
用于将消息发送到描述符 mqdes 引用的队列。 msg_ptr 指向消息缓冲区。 msg_len 是消息的大小,它应该小于或等于队列的消息大小。 msg_prio是消息优先级,它是一个非负数,指定消息的优先级。 消息按照消息优先级降序放置在队列中,较旧的消息优先于较新的消息。 如果队列已满,mq_send 将阻塞,直到队列上有空间,除非为消息队列启用了 O_NONBLOCK 标志,在这种情况下,mq_send 立即返回,并将 errno 设置为 EAGAIN。
#include <time.h>
#include <mqueue.h>
int mq_timedsend (mqd_t mqdes, const char *msg_ptr, size_t msg_len,
unsigned int msg_prio, const struct timespec *abs_timeout);
mq_timedsend 的工作方式与 mq_send 类似,不同之处在于,如果队列已满并且未指定 O_NONBLOCK 标志,则在 abs_timeout 指向的时间发生超时,并且 mq_timedsend 返回。 值得注意的是,时间参数是自纪元 1970 年 1 月 1 日 00:00:00 +0000 UTC 以来的绝对时间(以秒和纳秒为单位)。 此外,如果队列已满并且指定的时间已过,mq_timedsend 会立即返回。 指定超时的结构如下,
struct timespec {
time_t tv_sec; /* seconds */
long tv_nsec; /* nanoseconds */
};
接下来,我们调用 mq_receive
和 mq_timedreceive
来接收消息。
#include <mqueue.h>
ssize_t mq_receive (mqd_t mqdes, char *msg_ptr, size_t msg_len,
unsigned int *msg_prio);
mq_receive
从描述符 mqdes 引用的队列接收消息。 最高优先级中最旧的将从队列中删除,并传递给 msg_ptr 指向的缓冲区中的进程。 msg_len 是缓冲区的长度(以字节为单位),它必须大于队列的最大消息大小(即 mq_msgsize 属性)。 如果指针msg_prio不为空,则将接收到的消息的优先级存储在其指向的整数中。 mq_receive 的默认行为是如果队列中没有消息则阻塞。 但是,如果为队列启用了 O_NONBLOCK 标志,并且队列为空,则 mq_receive 将立即返回,并将 errno 设置为 EAGAIN。 成功时,mq_receive 返回 msg_ptr 指向的缓冲区中接收到的字节数。
#include <time.h>
#include <mqueue.h>
ssize_t mq_timedreceive (mqd_t mqdes, char *msg_ptr, size_t msg_len,
unsigned int *msg_prio,
const struct timespec *abs_timeout);
mq_timedreceive
与 mq_receive 相同,只是它有一个指示超时的附加参数。 如果队列的 O_NONBLOCK 标志未启用且队列为空,则 mq_timedreceive 将在出现 abs_timeout 指向的时间时返回。 如上所述,abs_timeout 指向的时间是自纪元 1970 年 1 月 1 日 00:00:00 +0000 UTC 以来以秒数和纳秒数指定的绝对时间。
3.3 mq_notify
#include <mqueue.h>
int mq_notify (mqd_t mqdes, const struct sigevent *sevp);
mq_notify
用于在 mqdes 引用的空队列上注册或取消注册消息到达的异步通知。
3.4 mq_unlink
#include <mqueue.h>
int mq_unlink(const char *queue_name);
mq_unlink
删除名为queue_name 的队列。
3.5 mq_getattr, mq_setattr
#include <mqueue.h>
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr,
struct mq_attr *oldattr);
mq_getattr
函数获取描述符 mqdes 的消息队列的属性结构 struct mq_attr。 类似地,函数 mq_setattr 用于设置队列的属性。 但是,可以使用 mq_setattr 修改的唯一属性是 mq_flags 中的 O_NONBLOCK 标志。 newattr 指向的结构中的其他字段将被忽略。 如果oldattr不为空,则队列属性的先前值将返回到它指向的结构中。
4、进程间通过消息队列通信
下面的示例演示了在 Linux 中使用 POSIX 消息队列在服务器和客户端之间进行进程间通信。 服务器管理令牌号,这些令牌号可以是航班的座位号或类似的东西。 服务器的工作是根据请求向客户端提供令牌号。 在典型场景中,可能有多个客户端向服务器请求令牌编号。 客户端知道服务器的消息队列名称。 每个客户端都有自己的消息队列,服务器在其中发布响应。 当客户端发送请求时,它会发送其消息队列名称。 服务器打开客户端的消息队列并发送其响应。 客户端从其消息队列中获取响应并读取其中的令牌号。 流程架构如下所示。
4.1 服务器端代码实现
/*
* server.c: Server program
* to demonstrate interprocess commnuication
* with POSIX message queues
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#define SERVER_QUEUE_NAME "/sp-example-server"
#define QUEUE_PERMISSIONS 0660
#define MAX_MESSAGES 10
#define MAX_MSG_SIZE 256
#define MSG_BUFFER_SIZE MAX_MSG_SIZE + 10
int main (int argc, char **argv)
{
mqd_t qd_server, qd_client; // queue descriptors
long token_number = 1; // next token to be given to client
printf ("Server: Hello, World!\n");
struct mq_attr attr;
attr.mq_flags = 0;
attr.mq_maxmsg = MAX_MESSAGES;
attr.mq_msgsize = MAX_MSG_SIZE;
attr.mq_curmsgs = 0;
if ((qd_server = mq_open (SERVER_QUEUE_NAME, O_RDONLY | O_CREAT, QUEUE_PERMISSIONS, &attr)) == -1) {
perror ("Server: mq_open (server)");
exit (1);
}
char in_buffer [MSG_BUFFER_SIZE];
char out_buffer [MSG_BUFFER_SIZE];
while (1) {
// get the oldest message with highest priority
if (mq_receive (qd_server, in_buffer, MSG_BUFFER_SIZE, NULL) == -1) {
perror ("Server: mq_receive");
exit (1);
}
printf ("Server: message received.\n");
// send reply message to client
if ((qd_client = mq_open (in_buffer, O_WRONLY)) == 1) {
perror ("Server: Not able to open client queue");
continue;
}
sprintf (out_buffer, "%ld", token_number);
if (mq_send (qd_client, out_buffer, strlen (out_buffer) + 1, 0) == -1) {
perror ("Server: Not able to send message to client");
continue;
}
printf ("Server: response sent to client.\n");
token_number++;
}
}
4.2 客户端代码实现
/*
* client.c: Client program
* to demonstrate interprocess communication
* with POSIX message queues
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#define SERVER_QUEUE_NAME "/sp-example-server"
#define QUEUE_PERMISSIONS 0660
#define MAX_MESSAGES 10
#define MAX_MSG_SIZE 256
#define MSG_BUFFER_SIZE MAX_MSG_SIZE + 10
int main (int argc, char **argv)
{
char client_queue_name [64];
mqd_t qd_server, qd_client; // queue descriptors
// create the client queue for receiving messages from server
sprintf (client_queue_name, "/sp-example-client-%d", getpid ());
struct mq_attr attr;
attr.mq_flags = 0;
attr.mq_maxmsg = MAX_MESSAGES;
attr.mq_msgsize = MAX_MSG_SIZE;
attr.mq_curmsgs = 0;
if ((qd_client = mq_open (client_queue_name, O_RDONLY | O_CREAT, QUEUE_PERMISSIONS, &attr)) == -1) {
perror ("Client: mq_open (client)");
exit (1);
}
if ((qd_server = mq_open (SERVER_QUEUE_NAME, O_WRONLY)) == -1) {
perror ("Client: mq_open (server)");
exit (1);
}
char in_buffer [MSG_BUFFER_SIZE];
printf ("Ask for a token (Press <ENTER>): ");
char temp_buf [10];
while (fgets (temp_buf, 2, stdin)) {
// send message to server
if (mq_send (qd_server, client_queue_name, strlen (client_queue_name) + 1, 0) == -1) {
perror ("Client: Not able to send message to server");
continue;
}
// receive response from server
if (mq_receive (qd_client, in_buffer, MSG_BUFFER_SIZE, NULL) == -1) {
perror ("Client: mq_receive");
exit (1);
}
// display token received from server
printf ("Client: Token received from server: %s\n\n", in_buffer);
printf ("Ask for a token (Press ): ");
}
if (mq_close (qd_client) == -1) {
perror ("Client: mq_close");
exit (1);
}
if (mq_unlink (client_queue_name) == -1) {
perror ("Client: mq_unlink");
exit (1);
}
printf ("Client: bye\n");
exit (0);
}
4.3 程序运行结果
服务器和客户端程序需要使用-lrt选项进行编译。 首先,运行服务器。 然后可以运行一个或多个客户端进行测试。 对于每个队列,都会在 /dev/mqueue
目录(在 Linux 中)中创建一个文件。
服务器端:
$ # server
$ gcc server.c -o server -lrt
$ gcc client.c -o client -lrt
$ ./server
Server: Hello, World!
Server: message received.
Server: response sent to client.
Server: message received.
Server: response sent to client.
Server: message received.
Server: response sent to client.
Server: message received.
Server: response sent to client.
...
...
客户端:
$ ./client
Ask for a token (Press ):
Client: Token received from server: 1
Ask for a token (Press ):
Client: Token received from server: 2
Ask for a token (Press ):
Client: Token received from server: 4