文章目录
- 前言
- 一、什么是消息队列,有什么用
- 1. 概念
- 2. 作用
- 3. 特点
- 二、消息队列的深入原理
- 1. 消息队列的存储结构
- 2. 出入队列的相关操作
- ① 入队操作:
- ② 发送紧急消息:
- ③ 读取队列操作:
- 3. 关于队列的阻塞机制
- ① 读操作的阻塞机制:
- ② 发操作的阻塞机制:
- ③ 注意事项:
- 三、消息队列函数解析
- 1. 消息队列控制块结构体
- ① 代码
- ② 结构体参数解释
- 2. 消息队列创建函数 xQueueCreate()
- ① xQueueCreate() 实际上调用了 xQueueGenericCreate()
- ② xQueueGenericCreate()
- ③ prvInitialiseNewQueue()
- ④ xQueueGenericReset()
- ⑤ 完成消息队列创建后队列结构
- ⑥ 消息队列创建的示例
- 3. 创建静态消息队列函数 xQueueCreateStatic()
- ① 函数原型及参数讲解
- ② 静态创建消息队列的示例
- 4. 消息队列删除函数
- 后记
前言
- 本篇文章主要对 FreeRTOS 中消息队列的概念和相关函数进行了详解
- 这篇文章是笔者有关于 FreeRTOS 中消息队列【上篇】,讲解了消息队列的基本概念(作用、存储结构、出入队列逻辑、阻塞机制)以及相关函数(队列创建和删除)
- 消息队列【下篇】将详细剖析消息队列中几个发送消息和接收消息的函数的运作流程
- 一部分代码和图片参考野火 FreeRTOS 教程。
一、什么是消息队列,有什么用
1. 概念
消息队列其实就是一种可以在任务间或者任务与中断间传递信息的全局变量。
也就是说,一些任务可以往消息队列中放消息,而一些任务可以从消息队列中获得消息。
2. 作用
用于任务间或者任务与中断间的通信。
3. 特点
- 支持 FIFO
- 支持 LIFO,也就是可以往队首发消息
- 支持消息不定长,只要不超过规定的节点的最大存储值
- 共享于多个任务的发送和接收
二、消息队列的深入原理
1. 消息队列的存储结构
创建消息队列时,FreeRTOS 会为其分配一块内存空间。空间前部放置消息队列控制块,后部放置消息队列的节点元素。也就是控制块和消息空间在同一块连续的内存上。
2. 出入队列的相关操作
① 入队操作:
- 当队列满时,若允许覆盖入队,消息将被拷贝至消息队列的队尾。
- 否则,根据用户指定的阻塞超时时间进行阻塞。
- 在阻塞时间内,若队列一直不允许允许入队,任务保持阻塞状态等待。
- 当队列未满可以入队时,将消息入队,同时如果有任务在等待消息,那么接触这个在等待消息的任务的阻塞
- 如果超过指定的阻塞时间仍未允许入队,则任务自动从阻塞态转移到就绪态,并返回错误码 errQUEUE_FULL。
② 发送紧急消息:
- 与发送普通消息几乎相同。
- 唯一的区别是紧急消息被发送到消息队列队头,以便接收者优先接收并及时处理紧急消息。
- 此处的队头指的是一开始位于 pcTail 的前一个的 pcReadFrom
③ 读取队列操作:
- 任务读取队列时可以指定阻塞超时时间。
- 在指定时间内,如果队列为空,任务保持阻塞状态等待有效数据。
- 当其他任务或中断服务程序将数据写入等待的队列时,任务会自动从阻塞态转移到就绪态。
- 当等待时间超过指定的阻塞时间,即使队列中没有有效数据,任务也会自动从阻塞态转移到就绪态。
3. 关于队列的阻塞机制
① 读操作的阻塞机制:
- 当任务尝试从消息队列中读取消息时,如果队列为空,任务有三种选择:
- 选择一:任务直接离开,不等待消息,执行其他任务,不进入阻塞态。
- 选择二:任务等待一段时间,如果在等待期间队列有消息到达,任务从阻塞态转为就绪态并执行。
- 选择三:任务一直等待,直到队列有消息到达。
② 发操作的阻塞机制:
- 当要发送消息到队列时,只有在队列有足够空间时,发送者才能成功发送消息。
- 当队列已满时,根据用户指定的阻塞超时时间,发送者会被阻塞,在超时时间内无法成功发送消息,发送者会接收到一个错误码并解除阻塞状态。
③ 注意事项:
- 只有在任务中发送消息时才允许阻塞状态,中断中发送消息不允许阻塞,需要调用相应的API函数接口。
- 如果多个任务阻塞在同一个消息队列中,这些任务会按照优先级排序,优先级高的任务会先获得队列的访问权。
三、消息队列函数解析
1. 消息队列控制块结构体
① 代码
/*
* Definition of the queue used by the scheduler.
* Items are queued by copy, not reference. See the following link for the
* rationale: http://www.freertos.org/Embedded-RTOS-Queues.html
*/
typedef struct QueueDefinition
{
int8_t *pcHead; /*< Points to the beginning of the queue storage area. */
int8_t *pcTail; /*< Points to the byte at the end of the queue storage area. Once more byte is allocated than necessary to store the queue items, this is used as a marker. */
int8_t *pcWriteTo; /*< Points to the free next place in the storage area. */
union /* Use of a union is an exception to the coding standard to ensure two mutually exclusive structure members don't appear simultaneously (wasting RAM). */
{
int8_t *pcReadFrom; /*< Points to the last place that a queued item was read from when the structure is used as a queue. */
UBaseType_t uxRecursiveCallCount;/*< Maintains a count of the number of times a recursive mutex has been recursively 'taken' when the structure is used as a mutex. */
} u;
List_t xTasksWaitingToSend; /*< List of tasks that are blocked waiting to post onto this queue. Stored in priority order. */
List_t xTasksWaitingToReceive; /*< List of tasks that are blocked waiting to read from this queue. Stored in priority order. */
volatile UBaseType_t uxMessagesWaiting;/*< The number of items currently in the queue. */
UBaseType_t uxLength; /*< The length of the queue defined as the number of items it will hold, not the number of bytes. */
UBaseType_t uxItemSize; /*< The size of each items that the queue will hold. */
volatile int8_t cRxLock; /*< Stores the number of items received from the queue (removed from the queue) while the queue was locked. Set to queueUNLOCKED when the queue is not locked. */
volatile int8_t cTxLock; /*< Stores the number of items transmitted to the queue (added to the queue) while the queue was locked. Set to queueUNLOCKED when the queue is not locked. */
#if( ( configSUPPORT_STATIC_ALLOCATION == 1 ) && ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) )
uint8_t ucStaticallyAllocated; /*< Set to pdTRUE if the memory used by the queue was statically allocated to ensure no attempt is made to free the memory. */
#endif
#if ( configUSE_QUEUE_SETS == 1 )
struct QueueDefinition *pxQueueSetContainer;
#endif
#if ( configUSE_TRACE_FACILITY == 1 )
UBaseType_t uxQueueNumber;
uint8_t ucQueueType;
#endif
} xQUEUE;
/* The old xQUEUE name is maintained above then typedefed to the new Queue_t
name below to enable the use of older kernel aware debuggers. */
typedef xQUEUE Queue_t;
② 结构体参数解释
参考这张表:
- 起始地址是低地址,结束地址是高地址
这个代码段定义了一个结构体xQUEUE,它表示FreeRTOS的队列。同时,使用typedef为其定义了别名Queue_t,方便调用。该结构体成员解释如下:
- pcHead:该队列存储区域的起始地址。
- pcTail:该队列存储区域的最后一个字节的后一个字节的地址,用于表示该队列的存储区域的结束,因为在声明时存储区域会比所需空间多分配一个字节。
- pcWriteTo:下一个可用于存储数据的位置的地址。
- pcReadFrom:队列中上一个读取数据的位置的地址。
- uxRecursiveCallCount:递归互斥时计算已递归“获取”当前互斥标志的次数,当该结构体被用做互斥锁时。
- 其中,pcReadFrom 和 uxRecursiveCallCount 是一对互斥变量,当队列用于消息队列时,使用 pcReadFrom;当队列用于互斥量时,使用 uxRecursiveCallCount
- 这里使用联合体保证这两个变量不会同时出现:
union {
int8_t *pcReadFrom;
UBaseType_t uxRecursiveCallCount;
} u;
- xTasksWaitingToSend:一个任务等待发送到该队列的列表,当队列已满无法入队时使用,按优先级顺序存储。
- xTasksWaitingToReceive:一个任务等待从该队列读取的列表,当队列为空读取不到消息时使用,按优先级顺序存储。
- uxMessagesWaiting:当前在队列中存储的项目的数量。
- uxLength:队列的长度,以队列可以保存的条目数,而不是字节数来定义。
- uxItemSize:队列中每个条目的大小。
- cRxLock:队列被锁定时从队列中接收到(从队列中删除)项目的数量。当队列没有被锁定时设置为queueUNLOCKED,队列可以被自动锁定或者手动锁定,下文将会解释。
- cTxLock:队列被锁定时传输到队列中(添加到队列中)的项目数量。当队列没有被锁定时设置为queueUNLOCKED。
- ucStaticallyAllocated: 当队列的内存是静态分配的时将设置为pdTRUE,以确保不会尝试释放该存储器。
- pxQueueSetContainer: 在队列集中使用时,保存队列集容器的指针。
- uxQueueNumber:队列的编号,在跟踪功能中使用。
- ucQueueType:队列的类型,在跟踪功能中使用。
2. 消息队列创建函数 xQueueCreate()
① xQueueCreate() 实际上调用了 xQueueGenericCreate()
#if( configSUPPORT_DYNAMIC_ALLOCATION == 1 )
#define xQueueCreate( uxQueueLength, uxItemSize ) xQueueGenericCreate( ( uxQueueLength ), ( uxItemSize ), ( queueQUEUE_TYPE_BASE ) )
#endif
- uxQueueLength:队列能够存储的最大消息单元数目,即队列长度。
- uxItemSize:队列中消息单元的大小,以字节为单位。
- 第三个变量为队列的类型,默认为队列,其他类型如下
- queueQUEUE_TYPE_BASE:表示队列。
- queueQUEUE_TYPE_SET:表示队列集合 。
- queueQUEUE_TYPE_MUTEX:表示互斥量。
- queueQUEUE_TYPE_COUNTING_SEMAPHORE:表示计数信号量。
- queueQUEUE_TYPE_BINARY_SEMAPHORE:表示二进制信号量。
- queueQUEUE_TYPE_RECURSIVE_MUTEX :表示递归互斥量。
其中互斥量和二进制信号量是比较容易混淆的,这两者都是同一时间内只能由一个线程使用。不同的是,此处待填坑。。。
② xQueueGenericCreate()
- 动态分配消息空间,消息空间大小为 0 时,用作信号量
- 调用队列初始化函数 prvInitialiseNewQueue()
#if( configSUPPORT_DYNAMIC_ALLOCATION == 1 )
QueueHandle_t xQueueGenericCreate( const UBaseType_t uxQueueLength, const UBaseType_t uxItemSize, const uint8_t ucQueueType )
{
Queue_t *pxNewQueue;
size_t xQueueSizeInBytes;
uint8_t *pucQueueStorage;
configASSERT( uxQueueLength > ( UBaseType_t ) 0 );
if( uxItemSize == ( UBaseType_t ) 0 )
{
/* There is not going to be a queue storage area. */
xQueueSizeInBytes = ( size_t ) 0;
}
else
{
/* Allocate enough space to hold the maximum number of items that
can be in the queue at any time. */
xQueueSizeInBytes = ( size_t ) ( uxQueueLength * uxItemSize ); /*lint !e961 MISRA exception as the casts are only redundant for some ports. */
}
pxNewQueue = ( Queue_t * ) pvPortMalloc( sizeof( Queue_t ) + xQueueSizeInBytes );
if( pxNewQueue != NULL )
{
/* Jump past the queue structure to find the location of the queue
storage area. */
pucQueueStorage = ( ( uint8_t * ) pxNewQueue ) + sizeof( Queue_t );
#if( configSUPPORT_STATIC_ALLOCATION == 1 )
{
/* Queues can be created either statically or dynamically, so
note this task was created dynamically in case it is later
deleted. */
pxNewQueue->ucStaticallyAllocated = pdFALSE;
}
#endif /* configSUPPORT_STATIC_ALLOCATION */
prvInitialiseNewQueue( uxQueueLength, uxItemSize, pucQueueStorage, ucQueueType, pxNewQueue );
}
return pxNewQueue;
}
#endif /* configSUPPORT_STATIC_ALLOCATION */
/*-----------------------------------------------------------*/
③ prvInitialiseNewQueue()
- 初始化队列控制块中各种参数
- pcHead:用作互斥量时,消息空间为 0,只能指向已知区域,这里使其指向消息控制块;消息空间不为 0 时指向消息空间头部。这个指针在初始化后将不会移动,用于快速定位队头。
- 初始化队列长度和每个条目的大小
- 调用 xQueueGenericReset() 初始化消息队列中其它参数
static void prvInitialiseNewQueue( const UBaseType_t uxQueueLength, const UBaseType_t uxItemSize, uint8_t *pucQueueStorage, const uint8_t ucQueueType, Queue_t *pxNewQueue )
{
/* Remove compiler warnings about unused parameters should
configUSE_TRACE_FACILITY not be set to 1. */
( void ) ucQueueType;
if( uxItemSize == ( UBaseType_t ) 0 )
{
/* No RAM was allocated for the queue storage area, but PC head cannot
be set to NULL because NULL is used as a key to say the queue is used as
a mutex. Therefore just set pcHead to point to the queue as a benign
value that is known to be within the memory map. */
pxNewQueue->pcHead = ( int8_t * ) pxNewQueue;
}
else
{
/* Set the head to the start of the queue storage area. */
pxNewQueue->pcHead = ( int8_t * ) pucQueueStorage;
}
/* Initialise the queue members as described where the queue type is
defined. */
pxNewQueue->uxLength = uxQueueLength;
pxNewQueue->uxItemSize = uxItemSize;
( void ) xQueueGenericReset( pxNewQueue, pdTRUE );
#if ( configUSE_TRACE_FACILITY == 1 )
{
pxNewQueue->ucQueueType = ucQueueType;
}
#endif /* configUSE_TRACE_FACILITY */
#if( configUSE_QUEUE_SETS == 1 )
{
pxNewQueue->pxQueueSetContainer = NULL;
}
#endif /* configUSE_QUEUE_SETS */
traceQUEUE_CREATE( pxNewQueue );
}
/*-----------------------------------------------------------*/
④ xQueueGenericReset()
- pcTail:队列尾指针。这个指针在初始化后将不会移动,用于快速定位队尾。初始化指向存储空间的最后一个条目(此处说的条目指 uxItemSize 确定的每个单元,下文 “条目” 和 “单元” 和 “项目” 将不再区分)
pxQueue->pcTail = pxQueue->pcHead + ( pxQueue->uxLength * pxQueue->uxItemSize );
- uxMessagesWaiting:队列中的消息个数。初始化为 0。
pxQueue->uxMessagesWaiting = ( UBaseType_t ) 0U;
- pcWriteTo:写入位置指针。指向队列中下一个要写入的单元位置,初始化指向队头 pcHead
- u.pcReadFrom:读取位置指针。指向队列中下一个要被读取的单元位置的前一个,初始化指向队尾指针 pcTail 的前一个单元。这样初始化是为了与读取函数作对应,详见:
pxQueue->u.pcReadFrom = pxQueue->pcHead + (( pxQueue->uxLength - ( UBaseType_t ) 1U ) * pxQueue->uxItemSize );
- cRxLock 和 cTxLock:队列接收锁和队列发送锁。初始化为未上锁。
pxQueue->cRxLock = queueUNLOCKED;
pxQueue->cTxLock = queueUNLOCKED;
- 这个函数还用于不是空队列了重置。当重置非空队列时,需要对阻塞的任务进行处理:
- 如果有发送消息任务被阻塞,则需要恢复这些任务,因为重置后队列就不是满的了
- 如果有等待消息任务被阻塞,则不需要进行任何操作,因为重置后队列也是空的,需要该任务继续等待消息
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE )
{
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE )
{
queueYIELD_IF_USING_PREEMPTION();
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
- 如果是新创建的消息队列的重置,那么需要初始化任务等待发送列表和任务等待接收列表
else
{
/* Ensure the event queues start in the correct state. */
vListInitialise( &( pxQueue->xTasksWaitingToSend ) );
vListInitialise( &( pxQueue->xTasksWaitingToReceive ) );
}
⑤ 完成消息队列创建后队列结构
⑥ 消息队列创建的示例
- 应该在临界区中创建消息队列
- 使用消息队列句柄操作消息队列,判断队列创建是否成功
QueueHandle_t Test_Queue = NULL;
#define QUEUE_LEN 4 /* 队列的长度,最大可包含多少个消息 */
#define QUEUE_SIZE 4 /* 队列中每个消息大小(字节) */
BaseType_t xReturn = pdPASS; /* 定义一个创建信息返回值,默认为 pdPASS */
taskENTER_CRITICAL(); //进入临界区
/* 创建 Test_Queue */
Test_Queue = xQueueCreate((UBaseType_t)QUEUE_LEN, /* 消息队列的长度 */
(UBaseType_t)QUEUE_SIZE); /* 消息的大小 */
if (NULL != Test_Queue)
printf("创建 Test_Queue 消息队列成功!\r\n");
taskEXIT_CRITICAL(); //退出临界区
3. 创建静态消息队列函数 xQueueCreateStatic()
① 函数原型及参数讲解
QueueHandle_t xQueueCreateStatic(UBaseType_t uxQueueLength,
UBaseType_t uxItemSize,
uint8_t *pucQueueStorageBuffer,
StaticQueue_t *pxQueueBuffer );
- uxQueueLength:队列能够存储的最大单元数目,即队列深度
- uxItemSize:队列中数据单元的长度,以字节为单位
- pucQueueStorageBuffer:指针,指向一个 uint8_t 类型的数组,数组的大小至少有uxQueueLength* uxItemSize 个字节。当 uxItemSize 为 0 时,pucQueueStorageBuffer 可以为 NULL
- pxQueueBuffer:指针,指向 StaticQueue_t 类型的变量,该变量用于存储队列的数据结构
从函数原型可以看到,队列控制块和消息空间是分别定义的,都是由编程人员分别自行定义的。
② 静态创建消息队列的示例
/* 创建一个可以最多可以存储 10 个 64 位变量的队列 */
#define QUEUE_LENGTH 10
#define ITEM_SIZE sizeof(uint64_t)
/* 该变量用于存储队列的数据结构 */
static StaticQueue_t xStaticQueue;
/* 该数组作为队列的存储区域,大小至少有 uxQueueLength * uxItemSize 个字节 */
uint8_t ucQueueStorageArea[QUEUE_LENGTH * ITEM_SIZE];
void vATask(void* pvParameters)
{
QueueHandle_t xQueue;
/* 创建一个队列 */
xQueue = xQueueCreateStatic(QUEUE_LENGTH, /* 队列深度 */
ITEM_SIZE, /* 队列数据单元的单位 */
ucQueueStorageArea, /* 队列的存储区域 */
&xStaticQueue); /* 队列的数据结构 */
/* 剩下的其他代码 */
}
4. 消息队列删除函数
- 使用消息队列句柄操作
- 应确保该消息队列是由 xQueueCreate() 或 xQueueCreateStatic()函数创建的
- 通过判断 configSUPPORT_DYNAMIC_ALLOCATION 和 configSUPPORT_STATIC_ALLOCATION
- 如果只使能了动态分配内存,那么一定需要内存释放
- 如果动态分配和静态分配都使能了,那么通过判断队列控制块中的 ucStaticallyAllocated 来确定是否需要内存释放
- 如果只使能静态分配,那么不进行内存释放,但是使队列指针指向空
void vQueueDelete( QueueHandle_t xQueue )
{
Queue_t * const pxQueue = ( Queue_t * ) xQueue;
configASSERT( pxQueue );
traceQUEUE_DELETE( pxQueue );
#if ( configQUEUE_REGISTRY_SIZE > 0 )
{
vQueueUnregisterQueue( pxQueue );
}
#endif
#if( ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) && ( configSUPPORT_STATIC_ALLOCATION == 0 ) )
{
/* The queue can only have been allocated dynamically - free it
again. */
vPortFree( pxQueue );
}
#elif( ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) && ( configSUPPORT_STATIC_ALLOCATION == 1 ) )
{
/* The queue could have been allocated statically or dynamically, so
check before attempting to free the memory. */
if( pxQueue->ucStaticallyAllocated == ( uint8_t ) pdFALSE )
{
vPortFree( pxQueue );
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
#else
{
/* The queue must have been statically allocated, so is not going to be
deleted. Avoid compiler warnings about the unused parameter. */
( void ) pxQueue;
}
#endif /* configSUPPORT_DYNAMIC_ALLOCATION */
}
/*-----------------------------------------------------------*/
后记
如果您觉得本文写得不错,可以点个赞激励一下作者!
如果您发现本文的问题,欢迎在评论区或者私信共同探讨!
共勉!