队列
队列中每个消息块都有一个头部,指向下一个消息块。
消息块的内存是连在一起的,但是是用链表组织的。
struct rt_messagequeue
{
struct rt_ipc_object parent; /**< inherit from ipc_object */
void *msg_pool; /**< start address of message queue */
rt_uint16_t msg_size; /**< message size of each message */
rt_uint16_t max_msgs; /**< max number of messages */
rt_uint16_t entry; /**< index of messages in the queue */
void *msg_queue_head; /**< list head */
void *msg_queue_tail; /**< list tail */
void *msg_queue_free; /**< pointer indicated the free node of queue */
rt_list_t suspend_sender_thread; /**< sender thread suspended on this message queue */
};
共有三个指针,free指向空闲消息链表头部,head指向有数据的第一个消息块,tail指向有消息的最后一个消息块。
创建一个队列,用于线程A、B之间的通信。
队列最多含有5个消息,刚创建时这5个消息都是空的,都放在空闲链表里。
线程A向队列写入一个本地变量x:从队列的空闲链表中取出一个消息块,把x的值拷贝进去,这时队列中只有一个消息,所以队列的头部、尾部都指向这个消息。
线程A修改本地变量x为20,并把它写入队列:
从队列的空闲消息链表中取出一个消息,把x的值拷贝进去,放到队列的尾部。
头指针指向最先写入的消息块,尾写入最后写入的消息块。
线程B读队列,得到的数据放到本地变量y中,这个数据来自队列头部,链头指针指向下一个。
线程B读出消息后:原来的消息块变为空闲消息块,被放入队列的空闲链表msg_queue_free;
rt_mq_t rt_mq_create(const char *name,
rt_size_t msg_size,
rt_size_t max_msgs,
rt_uint8_t flag)
{
struct rt_messagequeue *mq;
struct rt_mq_message *head;
register rt_base_t temp;
RT_DEBUG_NOT_IN_INTERRUPT;
/* allocate object */
mq = (rt_mq_t)rt_object_allocate(RT_Object_Class_MessageQueue, name);//分配一个消息队列结构体
if (mq == RT_NULL)
return mq;
/* set parent */
mq->parent.parent.flag = flag;
/* initialize ipc object */
rt_ipc_object_init(&(mq->parent));
/* initialize message queue */
/* get correct message size */
mq->msg_size = RT_ALIGN(msg_size, RT_ALIGN_SIZE); //每个消息的size需要对齐
mq->max_msgs = max_msgs;
/* allocate message pool */
mq->msg_pool = RT_KERNEL_MALLOC((mq->msg_size + sizeof(struct rt_mq_message)) * mq->max_msgs); //(每个消息的大小+每个消息结构体的大小)*最大消息数,这是一块连续的内存区域。分配出来的所有消息空间保存在msg_pool中。
if (mq->msg_pool == RT_NULL)
{
rt_object_delete(&(mq->parent.parent));
return RT_NULL;
}
/* initialize message list */
mq->msg_queue_head = RT_NULL;//头指针和尾指针初始化为空
mq->msg_queue_tail = RT_NULL;
/* initialize message empty list */
mq->msg_queue_free = RT_NULL;
for (temp = 0; temp < mq->max_msgs; temp ++) //遍历每一个消息块
{
head = (struct rt_mq_message *)((rt_uint8_t *)mq->msg_pool +
temp * (mq->msg_size + sizeof(struct rt_mq_message)));
head->next = (struct rt_mq_message *)mq->msg_queue_free;
mq->msg_queue_free = head;
}
/* the initial entry is zero */
mq->entry = 0;
/* initialize an additional list of sender suspend thread */
rt_list_init(&(mq->suspend_sender_thread));
return mq;
}
RTM_EXPORT(rt_mq_create);
struct rt_mq_message
{
struct rt_mq_message *next;
};//消息结构体
发送消息
/* get a free list, there must be an empty item */
msg = (struct rt_mq_message *)mq->msg_queue_free; //从free链表获取一个消息块
//取出消息块后,free链表指针指向消息块的下一个
mq->msg_queue_free = msg->next;
//将buffer数据拷贝进去
rt_memcpy(msg + 1, buffer, size);
//尾部指针指向此消息
mq->msg_queue_tail = msg;
//如果头部指针为空,代表第一次存放数据进去,因此头部指针也指向此消息。下一次,消息不为空时,再放消息,头部不变。
if (mq->msg_queue_head == RT_NULL)
mq->msg_queue_head = msg;
读入消息
//新的空闲消息块,直接让空闲链表头部指向它,插入表头
msg->next = (struct rt_mq_message *)mq->msg_queue_free;
mq->msg_queue_free = msg;
写消息时的互斥操作
/* disable interrupt */
temp = rt_hw_interrupt_disable();//在取出消息块,写消息之前关中断,其它线程无法打扰。
互斥用关中断来实现。
互斥量也是用关中断实现。