队列
队列里有多个消息块,每个消息块大小一致。
写:
- 有空间,成功。
- 无空间:要么立即返回Err,要么等待一段时间。
队列里面会有两个挂起线程链表:发送链表(suspend_sender_thread)和接收链表(parent.suspend_thread)
/**
 * Message queue control block.
 *
 * Embeds the generic IPC object (whose suspend_thread list is used by the
 * receive path for waiting receivers) and adds a separate list for suspended
 * senders.
 */
struct rt_messagequeue
{
struct rt_ipc_object parent; /**< inherit from ipc_object */
void *msg_pool; /**< start address of message queue */
rt_uint16_t msg_size; /**< message size of each message */
rt_uint16_t max_msgs; /**< max number of messages */
rt_uint16_t entry; /**< index of messages in the queue; the receive path treats entry == 0 as "queue empty" */
void *msg_queue_head; /**< list head */
void *msg_queue_tail; /**< list tail */
void *msg_queue_free; /**< pointer indicated the free node of queue */
rt_list_t suspend_sender_thread; /**< list of suspended sender threads */
};
/**
 * Base object shared by IPC primitives; struct rt_messagequeue embeds it as
 * its first member.
 */
struct rt_ipc_object
{
struct rt_object parent; /**< inherit from rt_object */
rt_list_t suspend_thread; /**< threads suspended on this object (for a message queue: the waiting receivers) */
};
线程2读取mq的数据,如果没有数据,愿意等待5个tick。
- 判断队列是否为空?
- 判断是否等待?
- 线程挂起->从ReadyList移除,放入mq->parent.suspend_thread。
- 启动线程自己的定时器
- 被唤醒:(1)其它线程写队列,从mq->parent.suspend_thread取出、唤醒(2)超时
/**
 * Message queue control block (repeated here next to the receive path that
 * uses its entry counter and suspend lists).
 */
struct rt_messagequeue
{
struct rt_ipc_object parent; /**< inherit from ipc_object */
void *msg_pool; /**< start address of message queue */
rt_uint16_t msg_size; /**< message size of each message */
rt_uint16_t max_msgs; /**< max number of messages */
rt_uint16_t entry; /**< index of messages in the queue; the receive path treats entry == 0 as "queue empty" */
void *msg_queue_head; /**< list head */
void *msg_queue_tail; /**< list tail */
void *msg_queue_free; /**< pointer indicated the free node of queue */
rt_list_t suspend_sender_thread; /**< sender thread suspended on this message queue */
};
/*
 * Receive-path excerpt: while the queue holds no message, either fail at once
 * (timeout == 0) or suspend the calling thread on the queue's receiver list,
 * optionally arming the thread timer, and re-check after being woken.
 * NOTE(review): the enclosing function and the declarations of mq, thread,
 * temp, timeout and tick_delta are outside this excerpt.
 */
while (mq->entry == 0) // the message queue is empty
{
RT_DEBUG_IN_THREAD_CONTEXT;
/* reset error number in thread */
thread->error = RT_EOK;
/* no waiting, return timeout */
if (timeout == 0)// the caller is not willing to wait
{
/* enable interrupt */
rt_hw_interrupt_enable(temp);
thread->error = -RT_ETIMEOUT;
return -RT_ETIMEOUT;// return the timeout error immediately
}
/* suspend current thread */
rt_ipc_list_suspend(&(mq->parent.suspend_thread),
thread,
mq->parent.parent.flag); // queue the thread on the receiver wait list
/* has waiting time, start thread timer */
if (timeout > 0)
{
/* get the start tick of timer */
tick_delta = rt_tick_get();
RT_DEBUG_LOG(RT_DEBUG_IPC, ("set thread:%s to timer list\n",
thread->name));
/* reset the timeout of thread timer and start it */
rt_timer_control(&(thread->thread_timer),
RT_TIMER_CTRL_SET_TIME,
&timeout);
rt_timer_start(&(thread->thread_timer));
}
/* enable interrupt */
rt_hw_interrupt_enable(temp);
/* re-schedule */
rt_schedule();
/* recv message */
/* execution continues here after wake-up; a non-RT_EOK error is returned as is */
if (thread->error != RT_EOK)
{
/* return error */
return thread->error;
}
/* disable interrupt */
temp = rt_hw_interrupt_disable();
/* if it's not waiting forever and then re-calculate timeout tick */
if (timeout > 0)
{
/* charge the ticks spent waiting against the remaining timeout, clamped at 0 */
tick_delta = rt_tick_get() - tick_delta;
timeout -= tick_delta;
if (timeout < 0)
timeout = 0;
}
/* excerpt ends here: the closing brace of the while loop is not shown */
//挂起线程
/**
 * Suspend a thread: remove it from the scheduler's ready list, mark it
 * RT_THREAD_SUSPEND and stop its thread timer.
 *
 * @param thread the thread to suspend; asserted to be a non-NULL thread object
 *
 * @return RT_EOK on success, -RT_ERROR when the thread is neither READY nor
 *         RUNNING (nothing is changed in that case)
 */
rt_err_t rt_thread_suspend(rt_thread_t thread)
{
    register rt_base_t state;
    register rt_base_t level;

    /* validate the handle before touching it */
    RT_ASSERT(thread != RT_NULL);
    RT_ASSERT(rt_object_get_type((rt_object_t)thread) == RT_Object_Class_Thread);

    RT_DEBUG_LOG(RT_DEBUG_THREAD, ("thread suspend: %s\n", thread->name));

    /* only a READY or RUNNING thread may be suspended */
    state = thread->stat & RT_THREAD_STAT_MASK;
    if (!((state == RT_THREAD_READY) || (state == RT_THREAD_RUNNING)))
    {
        RT_DEBUG_LOG(RT_DEBUG_THREAD, ("thread suspend: thread disorder, 0x%2x\n",
                                       thread->stat));

        return -RT_ERROR;
    }

    /* critical section: interrupts stay off while scheduler state changes */
    level = rt_hw_interrupt_disable();

    if (state == RT_THREAD_RUNNING)
    {
        /* a RUNNING thread must be the caller itself, not one on another core */
        RT_ASSERT(thread == rt_thread_self());
    }

    /* take the thread off the ready list, then swap in the SUSPEND state bits */
    rt_schedule_remove_thread(thread);
    thread->stat = (thread->stat & ~RT_THREAD_STAT_MASK) | RT_THREAD_SUSPEND;

    /* the thread timer is stopped unconditionally */
    rt_timer_stop(&(thread->thread_timer));

    rt_hw_interrupt_enable(level);

    RT_OBJECT_HOOK_CALL(rt_thread_suspend_hook, (thread));

    return RT_EOK;
}
RTM_EXPORT(rt_thread_suspend);
由FLAG决定挂起的位置
/**
 * Suspend a thread and queue it on an IPC object's wait list.
 *
 * The position in the list is chosen by @p flag:
 *  - RT_IPC_FLAG_FIFO: appended at the tail (first come, first served);
 *  - RT_IPC_FLAG_PRIO: inserted before the first waiter whose
 *    current_priority value is larger, or at the tail if there is none.
 *
 * @param list   head of the wait list to queue the thread on
 * @param thread the thread to suspend
 * @param flag   RT_IPC_FLAG_FIFO or RT_IPC_FLAG_PRIO
 *
 * @return RT_EOK on success; otherwise the error returned by
 *         rt_thread_suspend(), in which case the list is left untouched
 */
rt_inline rt_err_t rt_ipc_list_suspend(rt_list_t        *list,
                                       struct rt_thread *thread,
                                       rt_uint8_t        flag)
{
    rt_err_t ret;

    /*
     * Suspend first and stop on failure: rt_thread_suspend() rejects a thread
     * that is neither READY nor RUNNING without unlinking its tlist node, so
     * inserting that node below would corrupt the list it is still part of.
     */
    ret = rt_thread_suspend(thread);
    if (ret != RT_EOK)
    {
        return ret;
    }

    switch (flag)
    {
    case RT_IPC_FLAG_FIFO:
        /* inserting before the head means appending at the tail */
        rt_list_insert_before(list, &(thread->tlist));
        break;

    case RT_IPC_FLAG_PRIO:
        {
            struct rt_list_node *n;
            struct rt_thread *sthread;

            /* find a suitable position */
            for (n = list->next; n != list; n = n->next)
            {
                sthread = rt_list_entry(n, struct rt_thread, tlist);

                /* first waiter with a larger priority value than ours */
                if (thread->current_priority < sthread->current_priority)
                {
                    /* insert this thread before the sthread */
                    rt_list_insert_before(&(sthread->tlist), &(thread->tlist));
                    break;
                }
            }

            /*
             * not found a suitable position,
             * append to the end of suspend_thread list
             */
            if (n == list)
                rt_list_insert_before(list, &(thread->tlist));
        }
        break;

    default:
        /* unrecognised flag: as before, the thread is suspended but not queued */
        break;
    }

    return RT_EOK;
}
/* has waiting time, start thread timer */
if (timeout > 0) //超时时间大于0
{
/* get the start tick of timer */
tick_delta = rt_tick_get();
RT_DEBUG_LOG(RT_DEBUG_IPC, ("set thread:%s to timer list\n",
thread->name));
/* reset the timeout of thread timer and start it */
rt_timer_control(&(thread->thread_timer),
RT_TIMER_CTRL_SET_TIME,
&timeout);
rt_timer_start(&(thread->thread_timer)); //启动定时器
}
rt-schedule();//重新调度,运行其它线程写数据
/* recv message */
if (thread->error != RT_EOK) //判断唤醒线程的原因,若是超时错误-ETIMEOUT,则直接返回错误
{
/* return error */
return thread->error;
}
写队列
/*
 * Send-path excerpt: take a free message node, copy the payload into it and
 * wake a receiver if one is waiting.
 * NOTE(review): the loop body is elided in these notes, and the declarations
 * of msg, buffer, size and temp are outside this excerpt.
 */
while ((msg = mq->msg_queue_free) == RT_NULL) // no free node: the queue is full
{
}
rt_memcpy(msg + 1, buffer, size); // copy the message into the node; msg + 1 presumably skips the node header (msg's type is not shown here)
/* resume suspended thread */
if (!rt_list_isempty(&mq->parent.suspend_thread))
{
rt_ipc_list_resume(&(mq->parent.suspend_thread));// wake a thread from the receiver wait list
/* enable interrupt */
rt_hw_interrupt_enable(temp);
rt_schedule();
return RT_EOK;
}
/*
 * Wake-up excerpt: with interrupts disabled, unlink the thread from the list
 * it is on and stop its timer; then put it back on the ready list.
 * NOTE(review): this looks like the body of the thread resume path - confirm
 * against the kernel source.
 */
/* disable interrupt */
temp = rt_hw_interrupt_disable();
/* remove from suspend list */
rt_list_remove(&(thread->tlist));
rt_timer_stop(&thread->thread_timer);
/* enable interrupt */
rt_hw_interrupt_enable(temp);
/* insert to schedule ready list */
rt_schedule_insert_thread(thread);