目录
相关API
1.相关数据类型
mqd_t
struct mq_attr
struct timespec
2.相关系统调用接口
mq_open()
mq_timedsend() && mq_send()
mq_timedreceive() && mq_receive()
mq_unlink()
clock_gettime()
父子进程使用消息队列通讯
平行进程使用消息队列通讯(生产者-消费者)
生产者(producer.c)
消费者(customer.c)
相关API
1.相关数据类型
mqd_t
该数据类型定义在mqueue.h中,是用来记录消息队列描述符的。
typedef int mqd_t;
实质上是int类型的别名
struct mq_attr
/**
* @brief 消息队列的属性信息
* mq_flags 标记,对于mq_open,忽略它,因为这个标记是通过前者的调用传递的
* mq_maxmgs 队列可以容纳的消息的最大数量
* mq_msgsize 单条消息的最大允许大小,以字节为单位
* mq_curmsgs 当前队列中的消息数量,对于mq_open,忽略它
*/
struct mq_attr {
long mq_flags; /* Flags (ignored for mq_open()) */
long mq_maxmsg; /* Max. # of messages on queue */
long mq_msgsize; /* Max. message size (bytes) */
long mq_curmsgs; /* # of messages currently in queue
(ignored for mq_open()) */
};
struct timespec
/**
* @brief 时间结构体,提供了纳秒级的UNIX时间戳
* tv_sec 秒
* tv_nsec 纳秒
*/
struct timespec {
time_t tv_sec; /* seconds */
long tv_nsec; /* nanoseconds */
};
2.相关系统调用接口
mq_open()
#include <fcntl.h> /* For O_* constants */
#include <sys/stat.h> /* For mode constants */
#include <mqueue.h>
/**
* @brief 创建或打开一个已存在的POSIX消息队列,消息队列是通过名称唯一标识的。
*
* @param name 消息队列的名称
* 命名规则:必须是以正斜杠/开头,以\0结尾的字符串,中间可以包含若干字符,但不能有正斜杠
* @param oflag 指定消息队列的控制权限,必须也只能包含以下三者之一
* O_RDONLY 打开的消息队列只用于接收消息
* O_WRONLY 打开的消息队列只用于发送消息
* O_RDWR 打开的消息队列可以用于收发消息
* 可以与以下选项中的0至多个或操作之后作为oflag
* O_CLOEXEC 设置close-on-exec标记,这个标记表示执行exec时关闭文件描述符
* O_CREAT 当文件描述符不存在时创建它,如果指定了这一标记,需要额外提供mode和attr参数
* O_EXCL 创建一个当前进程独占的消息队列,要同时指定O_CREAT,要求创建的消息队列不存在,否则将会失败,并提示错误EEXIST
* O_NONBLOCK 以非阻塞模式打开消息队列,如果设置了这个选项,在默认情况下收发消息发生阻塞时,会转而失败,并提示错误EAGAIN
* @param mode 每个消息队列在mqueue文件系统对应一个文件,mode是用来指定消息队列对应文件的权限的
* @param attr 属性信息,如果为NULL,则队列以默认属性创建
* @return mqd_t 成功则返回消息队列描述符,失败则返回(mqd_t)-1,同时设置errno以指明错误原因
*/
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
/**
* @brief 当oflag没有包含O_CREAT时方可调用
*
* @param name 同上
* @param oflag 同上
* @return mqd_t 同上
*/
mqd_t mq_open(const char *name, int oflag);
mq_timedsend() && mq_send()
对于 mq_send()这个方式就是发送没有超时机制,可以一直处于阻塞状态。
#include <time.h>
#include <mqueue.h>
/**
* @brief 将msg_ptr指向的消息追加到消息队列描述符mqdes指向的消息队列的尾部。如果消息队列已满,默认情况下,调用阻塞直至有充足的空间允许新的消息入队,或者达到abs_timeout指定的等待时间节点,或者调用被信号处理函数打断。需要注意的是,正如上文提到的,如果在mq_open时指定了O_NONBLOCK标记,则转而失败,并返回错误EAGAIN。
*
* @param mqdes 消息队列描述符
* @param msg_ptr 指向消息的指针
* @param msg_len msg_ptr指向的消息长度,不能超过队列的mq_msgsize属性指定的队列最大容量,长度为0的消息是被允许的
* @param msg_prio 一个非负整数,指定了消息的优先级,消息队列中的数据是按照优先级降序排列的,如果新旧消息的优先级相同,则新的消息排在后面。
* @param abs_timeout 指向struct timespec类型的对象,指定了阻塞等待的最晚时间。如果消息队列已满,且abs_timeout指定的时间节点已过期,则调用立即返回。
* @return int 成功返回0,失败返回-1,同时设置errno以指明错误原因。
*/
int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio, const struct timespec *abs_timeout);
mq_timedreceive() && mq_receive()
mq_receive()同样的也是没有超时机制,当没有数据来的时候就处于阻塞状态。
#include <time.h>
#include <mqueue.h>
/**
* @brief 从消息队列中取走最早入队且权限最高的消息,将其放入msg_ptr指向的缓存中。如果消息队列为空,默认情况下调用阻塞,此时的行为与mq_timedsend同理。
*
* @param mqdes 消息队列描述符
* @param msg_ptr 接收消息的缓存
* @param msg_len msg_ptr指向的缓存区的大小,必须大于等于mq_msgsize属性指定的队列单条消息最大字节数
* @param msg_prio 如果不为NULL,则用于接收接收到的消息的优先级
* @param abs_timeout 阻塞时等待的最晚时间节点,同mq_timedsend
* @return ssize_t 成功则返回接收到的消息的字节数,失败返回-1,并设置errno指明错误原因
*/
ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio, const struct timespec *abs_timeout);
mq_unlink()
#include <mqueue.h>
/**
* @brief 清除name对应的消息队列,mqueue文件系统中的对应文件被立即清除。消息队列本身的清除必须等待所有指向该消息队列的描述符全部关闭之后才会发生。
*
* @param name 消息队列名称
* @return int 成功返回0,失败返回-1,并设置errno指明错误原因
*/
int mq_unlink(const char *name);
clock_gettime()
#include <time.h>
/**
* @brief 获取以struct timespec形式表示的clockid指定的时钟
*
* @param clockid 特定时钟的标识符,常用的是CLOCK_REALTIME,表示当前真实时间的时钟
* @param tp 用于接收时间信息的缓存
* @return int 成功返回0,失败返回-1,同时设置errno以指明错误原因
*/
int clock_gettime(clockid_t clockid, struct timespec *tp);
父子进程使用消息队列通讯
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <stdio.h>
#include <time.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char const *argv[])
{
struct mq_attr attr;
// 有用的参数,表示消息队列的长度
attr.mq_maxmsg = 10;
attr.mq_msgsize = 100;
// 下面参数没什么作用,填0就行了
attr.mq_flags = 0;
attr.mq_curmsgs = 0;
// 创建消息队列
char* queue_name = "/fitz";
mqd_t mqdes = mq_open(queue_name, O_RDWR | O_CREAT, 0664, &attr); // 队列文件的引用符
if (mqdes == (mqd_t)-1) {
perror("mq_open");
exit(EXIT_FAILURE);
}
pid_t pid = fork();
if (pid == -1) {
perror("fork");
exit(EXIT_FAILURE);
}
// 子进程---接收消息
else if (pid == 0) {
char recv_buf[100];
struct timespec time_info;
for (int i = 0;i < 5;i++) {
// 情况接收缓冲器
memset(recv_buf, 0, sizeof(recv_buf));
clock_gettime(0, &time_info);
time_info.tv_sec += 15;
// 把接收的消息打印出来
if(mq_timedreceive(mqdes, recv_buf, sizeof(recv_buf), 0,&time_info) == -1){
perror("mq_timedreceive");
}
printf("子进程接受的数据是:%s\n", recv_buf);
}
}
// 父进程---发送消息
else {
char send_buf[100];
struct timespec time_info; // 超时时间
for (int i = 0;i < 5;i++) {
memset(send_buf, 0, sizeof(send_buf));
sprintf(send_buf, "父进程第%d发\n", i + 1);
// 获取当前时间
clock_gettime(0, &time_info);
time_info.tv_sec += 5;
if (mq_timedsend(mqdes, send_buf, strlen(send_buf), 0, &time_info) == -1) {
perror("mq_timedsend");
}
printf("父进程发送消息成功\n");
sleep(1);
}
}
// 不管是父进程还是子进程,都需要释放消息队列的引用
close(mqdes);
// 清除消息队列只需要清除一部分就行了
if (pid > 0) {
mq_unlink(queue_name);
}
printf("父子进程通讯完毕\n");
return 0;
}
平行进程使用消息队列通讯(生产者-消费者)
生产者(producer.c)
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <stdio.h>
#include <time.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char const *argv[])
{
struct mq_attr attr;
// 有用的参数,表示消息队列的长度
attr.mq_maxmsg = 10;
attr.mq_msgsize = 100;
// 下面参数没什么作用,填0就行了
attr.mq_flags = 0;
attr.mq_curmsgs = 0;
// 创建消息队列
char* queue_name = "/prodecer_consumer";
mqd_t mqdes = mq_open(queue_name, O_RDWR | O_CREAT, 0664, &attr); // 队列文件的引用符
if (mqdes == (mqd_t)-1) {
perror("mq_open");
exit(EXIT_FAILURE);
}
// 发送端接受控制台数据,发送给对方
char buf[100];
struct timespec time_info;
while (1)
{
memset(buf, 0, sizeof(buf));
ssize_t count = read(STDIN_FILENO, buf, sizeof(buf));
clock_gettime(0, &time_info);
time_info.tv_sec += 5;
if (count == -1) {
perror("read");
continue;
}
// ctrl+d 退出控制台的情况,发送EOF告诉对方结束了
else if (count == 0) {
printf("EOF ,exit \n");
char eof = EOF;
if (mq_timedsend(mqdes, &eof, 1, 0, &time_info) == -1) {
perror("mq_timedsend");
}
break;
}
// 正常发送
else {
if (mq_timedsend(mqdes, buf, strlen(buf), 0, &time_info) == -1) {
perror("mq_timedsend");
}
printf("发送成功\n");
}
}
printf("发送端关闭\n");
close(mqdes); // 关闭描述符
return 0;
}
消费者(customer.c)
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <stdio.h>
#include <time.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char const *argv[])
{
struct mq_attr attr;
// 有用的参数,表示消息队列的长度
attr.mq_maxmsg = 10;
attr.mq_msgsize = 100;
// 下面参数没什么作用,填0就行了
attr.mq_flags = 0;
attr.mq_curmsgs = 0;
// 创建消息队列
char* queue_name = "/prodecer_consumer";
mqd_t mqdes = mq_open(queue_name, O_RDWR | O_CREAT, 0664, &attr); // 队列文件的引用符
if (mqdes == (mqd_t)-1) {
perror("mq_open");
exit(EXIT_FAILURE);
}
// 发送端接受控制台数据,发送给对方
char buf[100];
struct timespec time_info;
while (1)
{
memset(buf, 0, sizeof(buf));
clock_gettime(0, &time_info);
time_info.tv_sec += 15;
if (mq_timedreceive(mqdes, buf, sizeof(buf), NULL, &time_info) == -1) {
perror("mq_timedreceived");
}
// 判断数据是否EOF结束
if (buf[0] == EOF) {
printf("生产者发送了结束信息,结束通讯\n");
break;
}
else {
printf("接收的消息:%s\n", buf);
}
}
printf("接收端关闭\n");
close(mqdes); // 关闭描述符
// 这里选择消费者来结束队列,这样是比较合理的,因为最后一个信息EOF是被消费者接受到的,在此之前队列必须存在
mq_unlink(queue_name); // 清除消息队列
return 0;
}