FreeRTOS 队列 Queue 源码解析

[toc]


FreeRTOS 快速入门(四)之队列 一文中,我简单地叙述了 FreeRTOS 中队列的工作机制和基本使用。这一节我将依据 FreeRTOS V10.4.3 的源码深入地去探究队列是如何实现的。学好队列对我们后续学习信号量等知识的时候有很大的帮助。

一、队列

1、队列结构体

队列结构体定义在目录 queue.c 下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
typedef struct QueueDefinition
{
int8_t * pcHead; // 指向队列存储区的头部和下一个可写入的位置
int8_t * pcWriteTo; // 指向队列存储区下一个可写入的位置

union
{
QueuePointers_t xQueue; // 当该结构体用作队列时所需的独有数据
SemaphoreData_t xSemaphore; // 当该结构体用作信号量时所需的独有数据
} u;

// 这些链表按任务的优先级排序,以确保高优先级的任务有更高的访问权
List_t xTasksWaitingToSend; // 等待向该队列发送数据而被阻塞的任务列表。按优先级顺序存储
List_t xTasksWaitingToReceive; // 等待从该队列读取数据而被阻塞的任务列表。按优先级顺序存储

volatile UBaseType_t uxMessagesWaiting; // 队列中当前待处理的数据项数量
UBaseType_t uxLength; // 队列的长度(表示容纳的项数,而不是字节数)
UBaseType_t uxItemSize; // 队列中每个项的大小

// 用于跟踪队列的锁状态,它们记录队列是否被锁定,并在锁定时记录发送到队列的数据项数量
volatile int8_t cRxLock; // 在队列被锁定时,存储从队列中接收(移除)的项数。当队列未被锁定时,设置为 queueUNLOCKED
volatile int8_t cTxLock; // 在队列被锁定时,存储传输到队列(添加到队列)的项数。当队列未被锁定时,设置为 queueUNLOCKED

#if ( ( configSUPPORT_STATIC_ALLOCATION == 1 ) && ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) )
uint8_t ucStaticallyAllocated; // 如果队列使用的内存是静态分配的,则设置为 pdTRUE,以确保不会尝试释放该内存
#endif

#if ( configUSE_QUEUE_SETS == 1 )
struct QueueDefinition * pxQueueSetContainer; // 这个成员仅在配置中启用了队列集时有效,它指向包含此队列的队列集
#endif

#if ( configUSE_TRACE_FACILITY == 1 )
UBaseType_t uxQueueNumber; // 用于存储队列的编号,帮助在调试或跟踪时识别和管理不同的队列实例
uint8_t ucQueueType; // 用于存储队列的类型,帮助区分不同类型的队列(例如,普通队列、互斥锁队列等)
#endif
} xQUEUE;

typedef xQUEUE Queue_t;

注意其中的 QueuePointers 类型,定义如下:

1
2
3
4
5
typedef struct QueuePointers
{
int8_t * pcTail; /* 指向队列存储区域末尾的字节。分配的字节数比存储队列项所需的字节数多一个 */
int8_t * pcReadFrom; /* 指向上次读取队列项的位置 */
} QueuePointers_t;

有关链表 List 的内容可以参考:FreeRTOS 列表 List 源码解析

各个成员变量的含义已经在注释中给出,下面不再赘述。

并且在 queue.h 下为其创建了新的别名供外部用户使用,也是我们所熟知的:

1
2
struct QueueDefinition; /* Using old naming convention so as not to break kernel aware debuggers. */
typedef struct QueueDefinition * QueueHandle_t;

这种在头文件中声明,而在源文件中实现定义的方式,属于 C 语言中的不完整类型。


C 语言的不完整类型和前置声明

C语言中使用不完全类型(Incomplete Types)来保护结构体的方式,主要涉及到在声明结构体时不提供完整的定义,仅在需要时(如在其源文件中)才给出完整的定义。这种方式的的优点和缺点:

  • 优点:
    1. 封装性增强:使用不完全类型可以在一定程度上隐藏结构体的内部细节,防止外部代码直接访问结构体的成员,从而提高代码的封装性和安全性。
    2. 模块间解耦:通过不完全类型声明,可以在多个模块之间传递结构体的指针,而无需暴露结构体的完整定义。这有助于减少模块间的耦合度,使得系统更加灵活和易于维护。
  • 缺点:
    1. 使用限制:不完全类型有一些使用上的限制,比如不能直接使用 sizeof 运算符来获取不完全类型的大小(因为编译器不知道其完整定义)。这可能导致在需要知道结构体大小的情况下无法使用不完全类型。
    2. 容易出错:如果在使用不完全类型时没有正确地提供其完整定义,或者在多个地方提供了不一致的定义,都可能导致编译错误或运行时错误。

通过这种方式可以很好地实现封装抽象,因为队列的具体定义对用户来说就是透明的了,不能直接的访问结构成员,只能提供相应的接口来供访问,这样做的好处显而易见,可以防止用户随意破坏模块内部的抽象数据类型。

此外,不完整类型很好地解决了头文件循环包含的问题。见下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// circle.h
#include "point.h"

struct circle {
struct coordinate center;
};


// point.h
#include "circle.h"

struct coordinate {
struct circle cir;
};

如果编译这个程序,你会发现因为头文件循环包含而发生编译错误。

这个时候就可以使用前置声明轻松的解决这个问题,但是必须要使用指向不完整类型的指针了。

1
2
3
4
5
6
7
8
9
10
11
12

// circle.h
struct coordinate;
struct circle {
struct coordinate *center;
};

// point.h
struct circle;
struct coordinate {
struct circle *cir;
};

这样我们连头文件都不用包含,还可以缩短编译的时间。


2、队列类型

在文件 queue.h 下有如下定义表示队列的类型:

1
2
3
4
5
6
#define queueQUEUE_TYPE_BASE                ( ( uint8_t ) 0U ) /* 基础的队列 */
#define queueQUEUE_TYPE_SET ( ( uint8_t ) 0U ) /* 队列集 */
#define queueQUEUE_TYPE_MUTEX ( ( uint8_t ) 1U ) /* 互斥信号量 */
#define queueQUEUE_TYPE_COUNTING_SEMAPHORE ( ( uint8_t ) 2U ) /* 计数信号量 */
#define queueQUEUE_TYPE_BINARY_SEMAPHORE ( ( uint8_t ) 3U ) /* 二值信号量 */
#define queueQUEUE_TYPE_RECURSIVE_MUTEX ( ( uint8_t ) 4U ) /* 递归互斥信号量 */

queueQUEUE_TYPE_BASE 即基本的消息队列,另外,信号量机制也是通过队列实现的,因此当用于互斥信号量,二值信号量等时,会标记对于的队列类型。

二、队列相关操作

1、初始化

1.1 静态创建队列

前文中提到过,队列静态分配内存使用的是 xQueueCreateStatic() 函数,它其实是一个宏函数(在 queue.h 下):

1
2
3
#if ( configSUPPORT_STATIC_ALLOCATION == 1 )
#define xQueueCreateStatic( uxQueueLength, uxItemSize, pucQueueStorage, pxQueueBuffer ) xQueueGenericCreateStatic( ( uxQueueLength ), ( uxItemSize ), ( pucQueueStorage ), ( pxQueueBuffer ), ( queueQUEUE_TYPE_BASE ) )
#endif /* configSUPPORT_STATIC_ALLOCATION */

它实际上是调用了 xQueueGenericCreateStatic 函数来实现了静态初始化队列的功能。

其定义在 queue.c 下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#if ( configSUPPORT_STATIC_ALLOCATION == 1 )

QueueHandle_t xQueueGenericCreateStatic( const UBaseType_t uxQueueLength, // 队列长度
const UBaseType_t uxItemSize, // 每个数据的大小
uint8_t * pucQueueStorage, // 数据存储块
StaticQueue_t * pxStaticQueue, // 保存队列的数据结构
const uint8_t ucQueueType ) // 队列的类型(用途)
{
Queue_t * pxNewQueue;

configASSERT( uxQueueLength > ( UBaseType_t ) 0 );

/* 必须提供 StaticQueue_t 结构和队列存储区域 */
configASSERT( pxStaticQueue != NULL );

/* 如果项目大小不为 0,则应提供队列存储区域,如果项目大小为 0,则不应提供队列存储区域 */
configASSERT( !( ( pucQueueStorage != NULL ) && ( uxItemSize == 0 ) ) );
configASSERT( !( ( pucQueueStorage == NULL ) && ( uxItemSize != 0 ) ) );

#if ( configASSERT_DEFINED == 1 )
{
/* 检查用于声明 StaticQueue_t 或 StaticSemaphore_t 类型变量的结构
* 的大小是否与实际队列和信号结构的大小相等。 */
volatile size_t xSize = sizeof( StaticQueue_t );
configASSERT( xSize == sizeof( Queue_t ) );
( void ) xSize; /* 使编译器忽略这个警告 */
}
#endif /* configASSERT_DEFINED */

/* 将一个静态分配的队列结构体指针转换为动态分配的队列结构体指针,以便后续的队列初始化和操作 */
pxNewQueue = ( Queue_t * ) pxStaticQueue;

if( pxNewQueue != NULL )
{
#if ( configSUPPORT_DYNAMIC_ALLOCATION == 1 )
{
/* 该队列是静态分配的,所以要设置标志位以防队列后来被删除 */
pxNewQueue->ucStaticallyAllocated = pdTRUE;
}
#endif /* configSUPPORT_DYNAMIC_ALLOCATION */

// 队列创建后的初始化,下面会提到
prvInitialiseNewQueue( uxQueueLength, uxItemSize, pucQueueStorage, ucQueueType, pxNewQueue );
}
else
{
traceQUEUE_CREATE_FAILED( ucQueueType );
mtCOVERAGE_TEST_MARKER();
}

return pxNewQueue;
}

#endif /* configSUPPORT_STATIC_ALLOCATION */

程序大体比较简单,容易理解,注释也已经说明了各部分代码的作用。不过有几个细节需要注意一下:

中间有一段这样的代码:

1
( void ) xSize;

这段的作用如下:由于 xSize 变量在后续代码中没有被使用,编译器可能会发出未使用变量的警告。通过将 xSize 强制转换为 void,可以明确地告诉编译器忽略这个警告。这个技巧很常用,用于处理临时或未来可能使用的变量,但在当前代码中确实不需要它们的情况。下面举一个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
void exampleFunction(int condition) {
size_t xSize = 100;

if (condition) {
// 使用 xSize 进行某些操作
printf("Size: %zu\n", xSize);
} else {
// 不使用 xSize,但为了避免编译器警告
( void ) xSize;
}
}

如果是在函数调用前面加 ( void ) 表示显式指明,程序不处理函数返回值。

最后出现的 traceQUEUE_CREATE_FAILED( ucQueueType ) 是用来检查宏是否定义:

1
2
3
4
// FreeRTOS.h
#ifndef traceQUEUE_CREATE
#define traceQUEUE_CREATE( pxNewQueue )
#endif

这段代码的主要功能是确保 traceQUEUE_CREATE_FAILED 宏在没有被定义的情况下被定义为一个空操作。这样可以避免在后续代码中使用未定义的宏,从而防止编译错误。

  • 如果 traceQUEUE_CREATE_FAILED 宏已经被定义,那么这段代码不会做任何事情,因为 #ifndef 条件不成立。
  • 如果 traceQUEUE_CREATE_FAILED 宏没有被定义,那么这段代码会定义它为一个空宏,即不执行任何操作。

这种做法常见于库的实现中,用于确保某些宏在用户代码中没有被重复定义,从而避免潜在的冲突和错误。这个技巧在 FreeRTOS 中使用地非常多。

mtCOVERAGE_TEST_MARKER() 则是定义了一个空函数,这种做法通常用于代码覆盖率测试,在需要插入标记以确保代码路径被测试到的地方使用。

1
2
3
4
// FreeRTOS.h
#ifndef mtCOVERAGE_TEST_MARKER
#define mtCOVERAGE_TEST_MARKER()
#endif

1.2 动态创建队列

同理,动态创建队列的宏定义如下:

1
2
3
#if ( configSUPPORT_DYNAMIC_ALLOCATION == 1 )
#define xQueueCreate( uxQueueLength, uxItemSize ) xQueueGenericCreate( ( uxQueueLength ), ( uxItemSize ), ( queueQUEUE_TYPE_BASE ) )
#endif

现在来看 xQueueGenericCreate 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#if ( configSUPPORT_DYNAMIC_ALLOCATION == 1 )

QueueHandle_t xQueueGenericCreate( const UBaseType_t uxQueueLength, // 队列的长度
const UBaseType_t uxItemSize, // 每个数据的大小
const uint8_t ucQueueType ) // 队列的类型(用途)
{
Queue_t * pxNewQueue; // 指向新创建的队列结构的指针
size_t xQueueSizeInBytes; // 队列存储区域的总大小(字节数)
uint8_t * pucQueueStorage; // 指向队列存储区域的指针

// 确保队列长度大于0
configASSERT( uxQueueLength > ( UBaseType_t ) 0 );

/* 计算队列存储区域大小
* 如果队列用来表示信号量,则 uxItemSize = 0 */
xQueueSizeInBytes = ( size_t ) ( uxQueueLength * uxItemSize );

/* 检查乘法溢出 */
configASSERT( ( uxItemSize == 0 ) || ( uxQueueLength == ( xQueueSizeInBytes / uxItemSize ) ) );

/* 检查加法溢出 */
configASSERT( ( sizeof( Queue_t ) + xQueueSizeInBytes ) > xQueueSizeInBytes );

/* 动态分配内存,大小为队列结构大小加上队列存储区域大小 */
pxNewQueue = ( Queue_t * ) pvPortMalloc( sizeof( Queue_t ) + xQueueSizeInBytes ); /*lint !e9087 !e9079 see comment above. */

// 检查内存分配是否成功
if( pxNewQueue != NULL )
{
/* 计算队列存储区域的起始地址 */
pucQueueStorage = ( uint8_t * ) pxNewQueue;
pucQueueStorage += sizeof( Queue_t ); /* 将 pucQueueStorage 指向队列存储区域的起始地址 */

#if ( configSUPPORT_STATIC_ALLOCATION == 1 )
{
/* 如果支持静态分配,标记队列为动态分配 */
pxNewQueue->ucStaticallyAllocated = pdFALSE;
}
#endif /* configSUPPORT_STATIC_ALLOCATION */

// 初始化新创建的队列
prvInitialiseNewQueue( uxQueueLength, uxItemSize, pucQueueStorage, ucQueueType, pxNewQueue );
}
else
{
// 在前面讲 xQueueGenericCreateStatic 的时候提到过,这里不再赘述
traceQUEUE_CREATE_FAILED( ucQueueType );
mtCOVERAGE_TEST_MARKER();
}

return pxNewQueue;
}

#endif /* configSUPPORT_STATIC_ALLOCATION */

整个函数调用栈如下:

1.3 队列的初始化

在前面的代码中,我们可以看到,不论是静态创建队列还是动态创建队列,都调用了一个函数:prvInitialiseNewQueue,这个函数是用来对队列进行初始化的。下面看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
static void prvInitialiseNewQueue( const UBaseType_t uxQueueLength,  // 队列长度
const UBaseType_t uxItemSize, // 单条消息的大小
uint8_t * pucQueueStorage, // 实际存放消息的地址
const uint8_t ucQueueType, // 队列的类型(用途)
Queue_t * pxNewQueue ) // 消息队列控制块
{
( void ) ucQueueType; /* 编译器优化 */

/* uxItemSize 为 0,表示队列不存储实际数据(例如,用于信号量的队列) */
if( uxItemSize == ( UBaseType_t ) 0 )
{
/* 此时将 pcHead 设置为指向队列结构本身,以避免使用 NULL(因为 NULL 用于表示队列作为互斥量使用) */
pxNewQueue->pcHead = ( int8_t * ) pxNewQueue;
}
else
{
/* 将 pcHead 设置为指向提供的队列存储区域的起始地址 */
pxNewQueue->pcHead = ( int8_t * ) pucQueueStorage;
}

/* 设置队列的长度和项目大小 */
pxNewQueue->uxLength = uxQueueLength;
pxNewQueue->uxItemSize = uxItemSize;
( void ) xQueueGenericReset( pxNewQueue, pdTRUE ); // 重置队列,初始化队列的其他成员(如读写指针、消息计数等)

#if ( configUSE_TRACE_FACILITY == 1 )
{
pxNewQueue->ucQueueType = ucQueueType; // 配置队列类型
}
#endif /* configUSE_TRACE_FACILITY */

#if ( configUSE_QUEUE_SETS == 1 )
{
pxNewQueue->pxQueueSetContainer = NULL; // 初始化队列集容器
}
#endif /* configUSE_QUEUE_SETS */

traceQUEUE_CREATE( pxNewQueue );
}

整个函数调用栈如下:

1.4 队列的重置

这是本小节要讲的最后一个函数,刚才讲了 prvInitialiseNewQueue 中调用了 xQueueGenericReset 函数。这个函数是用来重置一个队列,将队列的一些属性重置,也相当于初始化了。

下面看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
BaseType_t xQueueGenericReset( QueueHandle_t xQueue,
BaseType_t xNewQueue )
{
Queue_t * const pxQueue = xQueue; // 指向要重置的队列的句柄

configASSERT( pxQueue );

// 进入临界区
/***********************************************************************************/
taskENTER_CRITICAL(); // 进入临界区,禁止中断,确保重置操作的原子性
{
/* 计算队列存储区的结束地址,并将其赋值给 pcTail。计算方式是:起始地址 + 队列长度 * 每个项目的大小。 */
pxQueue->u.xQueue.pcTail = pxQueue->pcHead + ( pxQueue->uxLength * pxQueue->uxItemSize );
/* 将队列中的消息等待计数初始化为 0 */
pxQueue->uxMessagesWaiting = ( UBaseType_t ) 0U;
/* 将写指针 pcWriteTo 初始化为队列存储区的起始地址,表示下一个写操作将从队列的起始位置开始 */
pxQueue->pcWriteTo = pxQueue->pcHead;
/* 计算队列存储区的最后一个位置的地址,并将其赋值给 pcReadFrom,表示下一个读操作将从队列的最后一个位置开始。
* 计算方式是:起始地址 + (队列长度 - 1) * 每个项目的大小 */
pxQueue->u.xQueue.pcReadFrom = pxQueue->pcHead + ( ( pxQueue->uxLength - 1U ) * pxQueue->uxItemSize );
pxQueue->cRxLock = queueUNLOCKED; /* 表示队列当前未被锁定,可以进行接收操作 */
pxQueue->cTxLock = queueUNLOCKED; /* 表示队列当前未被锁定,可以进行发送操作 */

/* 如果队列不是新创建的 (xNewQueue == pdFALSE),并且有任务在等待发送数据到队列,
* 则解除一个等待任务的阻塞状态,并可能触发任务切换 */
if( xNewQueue == pdFALSE )
{
/* 检查队列的 xTasksWaitingToSend 列表是否为空。如果列表不为空(即 pdFALSE),则表示有任务在等待发送数据到队列 */
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE )
{
/* 从列表中移除一个任务 */
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE )
{
/* 切换任务 */
queueYIELD_IF_USING_PREEMPTION();
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else
{
/* 初始化等待发送和等待接收的任务列表 */
vListInitialise( &( pxQueue->xTasksWaitingToSend ) );
vListInitialise( &( pxQueue->xTasksWaitingToReceive ) );
}
}
taskEXIT_CRITICAL();
// 退出临界区
/***********************************************************************************/

return pdPASS;
}


整个队列创建好后的结构如下:

2、队列的发送

2.1 任务级入队函数

平时我们使用的入队 API 有:xQueueSendxQueueSendToBackxQueueSendToFront 本质上调用的是一个函数:xQueueGenericSend

1
2
3
4
5
6
7
8
9
10
11
#define xQueueSendToFront( xQueue, pvItemToQueue, xTicksToWait ) \
xQueueGenericSend( ( xQueue ), ( pvItemToQueue ), ( xTicksToWait ), queueSEND_TO_FRONT )

#define xQueueSendToBack( xQueue, pvItemToQueue, xTicksToWait ) \
xQueueGenericSend( ( xQueue ), ( pvItemToQueue ), ( xTicksToWait ), queueSEND_TO_BACK )

#define xQueueSend( xQueue, pvItemToQueue, xTicksToWait ) \
xQueueGenericSend( ( xQueue ), ( pvItemToQueue ), ( xTicksToWait ), queueSEND_TO_BACK )

#define xQueueOverwrite( xQueue, pvItemToQueue ) \
xQueueGenericSend( ( xQueue ), ( pvItemToQueue ), 0, queueOVERWRITE )

插入的位置定义如下:

1
2
3
#define queueSEND_TO_BACK                     ( ( BaseType_t ) 0 )  // 队尾插入
#define queueSEND_TO_FRONT ( ( BaseType_t ) 1 ) // 队头插入
#define queueOVERWRITE ( ( BaseType_t ) 2 ) // 覆写

下面来看 xQueueGenericSend 的实现:

出于篇幅原因,我删去了关于队列集的部分,感兴趣的可以自行查看源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
BaseType_t xQueueGenericSend( QueueHandle_t xQueue,				 // 队列句柄
const void * const pvItemToQueue, // 要发送到队列的数据指针
TickType_t xTicksToWait, // 等待队列空间的最大时间
const BaseType_t xCopyPosition ) // 数据复制的位置(如覆盖或追加)
{
BaseType_t xEntryTimeSet = pdFALSE, xYieldRequired; /* 标记是否设置了超时结构和是否需要任务切换 */
TimeOut_t xTimeOut; /* 超时结构 */
Queue_t * const pxQueue = xQueue; /* 队列结构指针 */

configASSERT( pxQueue ); /* 确保队列句柄有效 */
/* 确保数据指针不为空,除非队列项大小为0 */
configASSERT( !( ( pvItemToQueue == NULL ) && ( pxQueue->uxItemSize != ( UBaseType_t ) 0U ) ) );
/* 确保覆盖操作只在队列长度为1时进行 */
configASSERT( !( ( xCopyPosition == queueOVERWRITE ) && ( pxQueue->uxLength != 1 ) ) );
#if ( ( INCLUDE_xTaskGetSchedulerState == 1 ) || ( configUSE_TIMERS == 1 ) )
{
/* 确保调度器未挂起时,等待时间不为0 */
configASSERT( !( ( xTaskGetSchedulerState() == taskSCHEDULER_SUSPENDED ) && ( xTicksToWait != 0 ) ) );
}
#endif

for( ; ; )
{
taskENTER_CRITICAL(); // 进入临界区
{
/* 查询队列现在是否还有剩余存储空间,如果采用覆写方式入队,则不用在乎队列是不是满的 */
if( ( pxQueue->uxMessagesWaiting < pxQueue->uxLength ) || ( xCopyPosition == queueOVERWRITE ) )
{
traceQUEUE_SEND( pxQueue );

/************* 省略掉队列集的代码 **************/

{
/* 将数据复制到队列中,并返回一个标志 xYieldRequired,该标志指示是否需要进行任务切换
* 前面说了,入队分为后向入队、前向入队和覆写入队,它们的具体实现就是在这个函数中实现的 */
xYieldRequired = prvCopyDataToQueue( pxQueue, pvItemToQueue, xCopyPosition );

/* 检查是否有任务由于等待消息而进入阻塞态 */
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE )
{
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE )
{
/* 解除阻塞态的任务优先级最高,因此要进行一次任务切换 */
queueYIELD_IF_USING_PREEMPTION();
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
// 存在特殊情况(例如任务持有多个互斥锁并按不同顺序释放),也需要进行任务切换
else if( xYieldRequired != pdFALSE ) //
{
/* 被解除阻塞的任务优先级高于我们自己的任务,因此立即让出 CPU */
queueYIELD_IF_USING_PREEMPTION();
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}

taskEXIT_CRITICAL(); // 退出临界区
return pdPASS;
}
else /* 没有空闲空间且不是覆写模式 */
{
if( xTicksToWait == ( TickType_t ) 0 )
{
/* 如果队列已满且没有指定阻塞时间(或阻塞时间已过期),则退出临界区 */
taskEXIT_CRITICAL();

traceQUEUE_SEND_FAILED( pxQueue );
return errQUEUE_FULL;
}
else if( xEntryTimeSet == pdFALSE )
{
/* 如果队列已满且指定了阻塞时间,则配置超时结构
* 这里其实就是记录当前系统时钟节拍器的值 xTickCount 和溢出次数 xNumOfOverflows */
vTaskInternalSetTimeOutState( &xTimeOut );
xEntryTimeSet = pdTRUE;
}
else
{
// 超时结构已经初始化过了
mtCOVERAGE_TEST_MARKER();
}
}
}
taskEXIT_CRITICAL(); /* 退出临界区 */

/* 中断和其他任务现在可以在退出临界区后发送和接收队列数据 */

vTaskSuspendAll(); /* 挂起所有任务 */
prvLockQueue( pxQueue ); /* 锁定队列以防止其他任务访问 */

/* 检查超时是否已过期 */
if( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) == pdFALSE )
{
/* 检查队列是否仍满 */
if( prvIsQueueFull( pxQueue ) != pdFALSE )
{
traceBLOCKING_ON_QUEUE_SEND( pxQueue );
/* 记录阻塞事件并将其放入等待发送事件列表中 */
vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToSend ), xTicksToWait );

/* 解锁队列以允许其他任务访问 */
prvUnlockQueue( pxQueue );

/* 恢复调度器,将任务从挂起状态移回就绪状态 */
if( xTaskResumeAll() == pdFALSE )
{
portYIELD_WITHIN_API();
}
}
else
{
/* 阻塞时间还没到,但是队列现在有空闲的队列项,那么再重试一次 */
prvUnlockQueue( pxQueue );
( void ) xTaskResumeAll();
}
}
else
{
prvUnlockQueue( pxQueue ); /* 超时产生 */
( void ) xTaskResumeAll(); /* 恢复任务调度器 */

traceQUEUE_SEND_FAILED( pxQueue );
return errQUEUE_FULL;
}
} /*lint -restore */
}

2.1.1 入队函数

由上面的代码可以看出,虽然有那么长的代码,但实际上具体实现入队逻辑的是 prvCopyDataToQueue 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
static BaseType_t prvCopyDataToQueue( Queue_t * const pxQueue,      // 指向队列结构的指针
const void * pvItemToQueue, // 指向要复制到队列的数据的指针
const BaseType_t xPosition ) // 指定数据插入的位置
{
BaseType_t xReturn = pdFALSE;
UBaseType_t uxMessagesWaiting;

/* 这个函数是从一个临界区调用的 */

uxMessagesWaiting = pxQueue->uxMessagesWaiting; // 获取队列中当前的消息数量

if( pxQueue->uxItemSize == ( UBaseType_t ) 0 ) // 如果用于信号量,这里用不到
{
/* 略... */
}
else if( xPosition == queueSEND_TO_BACK ) // 将数据添加到队列的末尾
{
/* 强制转换为 void 是函数签名所要求的,并且是安全的
* 将 pvItemToQueue 中的数据复制到 pxQueue->pcWriteTo 指向的位置 */
( void ) memcpy( ( void * ) pxQueue->pcWriteTo, pvItemToQueue, ( size_t ) pxQueue->uxItemSize );
pxQueue->pcWriteTo += pxQueue->uxItemSize; /* 使其指向下一个可写位置 */

if( pxQueue->pcWriteTo >= pxQueue->u.xQueue.pcTail )
{
/* 如果 pxQueue->pcWriteTo 超过了队列的尾部 pxQueue->u.xQueue.pcTail,则将其重置为队列的头部 */
pxQueue->pcWriteTo = pxQueue->pcHead;
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else
{
/* 将 pvItemToQueue 中的数据复制到 pxQueue->u.xQueue.pcReadFrom 指向的位置 */
( void ) memcpy( ( void * ) pxQueue->u.xQueue.pcReadFrom, pvItemToQueue, ( size_t ) pxQueue->uxItemSize );
pxQueue->u.xQueue.pcReadFrom -= pxQueue->uxItemSize; /* 使其指向前一个位置 */

if( pxQueue->u.xQueue.pcReadFrom < pxQueue->pcHead )
{
/* 如果 pxQueue->u.xQueue.pcReadFrom 小于队列的头部 pxQueue->pcHead,则将其重置为队列的尾部减去一个数据项的大小 */
pxQueue->u.xQueue.pcReadFrom = ( pxQueue->u.xQueue.pcTail - pxQueue->uxItemSize );
}
else
{
mtCOVERAGE_TEST_MARKER();
}

/* 如果是覆写模式 */
if( xPosition == queueOVERWRITE )
{
/* 如果队列中有数据(uxMessagesWaiting > 0),则减少 uxMessagesWaiting 的计数 */
if( uxMessagesWaiting > ( UBaseType_t ) 0 )
{
--uxMessagesWaiting;
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}

/* 更新队列中的消息计数 */
pxQueue->uxMessagesWaiting = uxMessagesWaiting + ( UBaseType_t ) 1;

return xReturn;
}

简单来说,具体实现逻辑如下:

  • 如果选择后向入队 queueSEND_TO_BACK,则将消息复制到队列结构体成员 pcWriteTo 指向的队列项;复制成功以后 pcWriteTo 增加uxItemSize 个字节,指向下一个队列项目。
  • 当选择前向入队 queueSEND_TO_FRONT 或者 quueOVERWRITE 时,则将消息复制到 u.pcReadFrom 所指向的队列项目。同样需要调整u.pcReadFrom 的位置。当向队列写入一个消息以后,队列中统计当前消息数量的成员 uxMessagesWaiting 就会加一;但是选择覆写入队queueOVERWRITE 时还会将 uxMessagesWaiting 减一,这样一减一加相当于队列当前消息数量没有变。

2.1.2 队列锁

xQueueGenericSend 里面出现了和队列锁有关的函数,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// 队列上锁
#define prvLockQueue( pxQueue ) \
taskENTER_CRITICAL(); \
{ \
if( ( pxQueue )->cRxLock == queueUNLOCKED ) \
{ \
( pxQueue )->cRxLock = queueLOCKED_UNMODIFIED; \
} \
if( ( pxQueue )->cTxLock == queueUNLOCKED ) \
{ \
( pxQueue )->cTxLock = queueLOCKED_UNMODIFIED; \
} \
} \
taskEXIT_CRITICAL()

// 队列解锁
static void prvUnlockQueue( Queue_t * const pxQueue )
{
/* 该函数必须在调度器挂起(suspended)的状态下调用 */

/* 上锁计数器(cTxLock 和 cRxLock)记录了在队列上锁期间人队或出队的数量,
* 当队列上锁以后,队列项是可以加入或者移除队列的,但是相应的列表不会更新 */
taskENTER_CRITICAL();
{
int8_t cTxLock = pxQueue->cTxLock; /* 获取队列的Tx锁状态 */

/* 检查在队列锁定期间是否有数据被添加 */
while( cTxLock > queueLOCKED_UNMODIFIED )
{
{
/************************ 省略队列集 ****************************/
/************************ 相关代码 ****************************/

/* 队列的 xTasksWaitingToReceive 列表是否为空。如果列表不为空(即有任务在等待接收数据),则继续执行 */
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE )
{
/* 移出等待列表中的任务 */
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE )
{
/* 等待的任务具有更高的优先级,因此记录需要进行上下文切换 */
vTaskMissedYield();
}
else
{
// 说明没有成功移除任务
mtCOVERAGE_TEST_MARKER();
}
}
else
{
// 列表为空,直接退出,并释放 Tx 锁
break;
}
}

--cTxLock;
}

pxQueue->cTxLock = queueUNLOCKED; // Tx 锁解锁
}
taskEXIT_CRITICAL();

/* Do the same for the Rx lock. */
taskENTER_CRITICAL();
{
int8_t cRxLock = pxQueue->cRxLock; // 获取队列的Rx锁状态

while( cRxLock > queueLOCKED_UNMODIFIED )
{
// 检查是否有任务在等待发送数据
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE )
{
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE )
{
// 进行任务切换
vTaskMissedYield();
}
else
{
mtCOVERAGE_TEST_MARKER();
}

--cRxLock;
}
else
{
// 列表为空,直接退出,并释放 Rx 锁
break;
}
}

pxQueue->cRxLock = queueUNLOCKED; // Rx锁解锁
}
taskEXIT_CRITICAL();
}

上锁的实现非常简单,而解锁是通过两个临界区分别对 Rx 和 Tx 锁进行解锁。

其中,调用了函数 vTaskMissedYield() 来完成任务切换,函数 vTaskMissedYield() 只是简单地将全局变量 xYieldPending 设置为 pdTRUE,真正的任务切换是在时钟节拍处理函数 xTaskIncrementTick() 中完成的,此函数会判断 xYieldPending 的值,从而决定是否进行任务切换。

2.1.3 portYIELD_WITHIN_API

portYIELD_WITHIN_API 函数和上下文切换有关:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// FreeRTOS.h
#ifndef portYIELD_WITHIN_API
#define portYIELD_WITHIN_API portYIELD
#endif

// portable/RVDS/ARM_CM4F/portmacro.h
#define portYIELD() \
{ \
/*将 PendSV 中断请求位置位,从而请求一个上下文切换 */ \
portNVIC_INT_CTRL_REG = portNVIC_PENDSVSET_BIT; \
\
/* Barriers are normally not required but do ensure the code is completely \
* within the specified behaviour for the architecture. */ \
__dsb( portSY_FULL_READ_WRITE ); \
__isb( portSY_FULL_READ_WRITE ); \
}

vPortYield 函数是 FreeRTOS 中用于请求上下文切换的关键函数,通过设置 PendSV 中断请求位并使用同步屏障确保操作的正确性。

  • portNVIC_INT_CTRL 是一个指向 NVIC 控制寄存器的指针。
  • portNVIC_PENDSVSET 是一个常量,表示 PendSV 中断请求位。

__DSB()__ISB() 都是编译器内置函数:

  • __DSB():用于确保在执行后续指令之前,之前的所有存储器访问操作都已经完成。
    这确保了在设置 PendSV 中断请求位之后,所有的存储器操作都已经完成,从而避免潜在的竞态条件。
  • __ISB():用于刷新指令流水线,确保在执行后续指令之前,之前的所有指令都已经完成。
    这确保了在设置 PendSV 中断请求位之后,所有的指令都已经完成,从而避免潜在的指令乱序执行问题。

指令同步屏障(Instruction Synchronization Barrier

在进行嵌入式开发中,经常会看到 DMBDSBISB 这三个指令。在具有超标量流水线的 MCU(如 Cortex-M7 内核的stm32H743)/SoC更是经常会使用到,相对于只支持三级流水线的 Cortex-M4 在每个时钟周期只能执行一条指令,Cortex-M7 拥有六级流水线和双发射超标量架构,拥有在一个时钟周期内执行两条指令的能力。以五级指令流水线为例,其指令执行步骤如下:

取指->译码->发射->执行->写回

其中==发射和执行都是乱序的==。

超标量流水线的设计产生了乱序指为嵌入式开发带来新的问题,同时固有的编译器优化也会导致内存乱序访问。总结来说会带来以下两个问题:


1. 编译时,编译器优化导致指令执行步骤和实际书写的顺序不一致,进而产生内存乱序访问。为了解决这一问题,可以使用”memory“这个伪指令,告诉编译器不要将该指令前后代码顺序打乱。
2, 运行时,由于处理器乱序执行导致内存乱序访问。DMBDSBISB 就是为了解决这一问题而引入的指令。


为了解决上述的两个问题,可以直接将内存屏障和编译屏障结合在一起使用。以基于 Cortex-A7 的 stm32MP135 为例,其 __DMB__DSB__ISB 的 API 如下:

__STATIC_FORCEINLINE void __ISB(void)
{
   __ASM volatile ("isb 0xF":::"memory");
}

__STATIC_FORCEINLINE void __DSB(void)
{
   __ASM volatile ("dsb 0xF":::"memory");
}

__STATIC_FORCEINLINE void __DMB(void)
{
   __ASM volatile ("dmb 0xF":::"memory");
}

可以看到每个指令既包含和自己相关的指令,也包含”memory“这个伪指令,这是 ARM 平台的编译屏障指令。

==DMB(Data Memory Barrier,数据内存屏障)指令==

DMB 主要用于多核处理器系统中,不同的处理器可能同时执行数据内存传输指令。DMB 指令确保在 DMB 之前的所有显式数据内存传输指令都已经在内存中读取或写入完成,同时确保任何后续的数据内存传输指令都将在DMB执行之后开始执行,否则有些数据传输指令可能会提前执行。

它保证的是 DMB 之前的内存访问指令与 DMB 之后的内存访问指令的执行顺序,DMB 不保证内存访问的完成顺序(保执行,不保完成)。也就是说,DMB 指令之后的内存访问指令不会被处理器重排到 DMB 指令的前面。DMB 指令不会保证内存访问指令在内存屏障指令之前完成,它仅仅保证内存屏障指令前后的内存访问的执行顺序。DMB 指令只影响内存访问指令、数据 cache 指令以及 cache 管理指令等,并不会影响其他指令(例如算术运算指令)的执行顺序。

注意:这里所说的多核不特指具有多个核心的 CPU,如对多个不同区域的内存空间进行操作,如对 DDR 和系统寄存器。

==DSB(Data Synchronization Barrier,数据同步屏障)指令==

在计算机的体系结构中,处理器在执行指令时通常会利用指令流水线来提高性能。但也会产生一些问题,比如在多线程编程中,两个线程同时对共享的内存进行读写操作,由于读/写操作的重排序,就会导致数据的不一致。

当执行 DSB 指令时,它确保在 DSB 之前的所有显式数据内存传输指令都已经在内存中读取或写入完成,同时确保任何后续的指令都将在 DSB 执行之后开始执行。

DSB 比 DMB 指令严格一些,仅当所有在它前面的内存访问指令都完成后,才会执行在它后面的指令,即任何指令都要等待 DSB 指令前面的内存访问指令完成。位于此指令前的所有缓存(如分支预测和 TLB 维护)操作需要全部完成。

注意:设备内存(Device Memory)/强序内存(Strongly Ordered Memory)类型访问时自动添加数据同步屏障 DSB,不需要再自行添加

==ISB(Instruction Synchronization Barrier,指令同步屏障)指令==

指令的流水线允许处理器同时执行多条指令的不同阶段,然而这样并行执行可能会导致一些问题,特别是涉及到上下文切换(更改上下文操作包括 cache、TLB、分支预测等维护操作以及改变系统控制寄存器等操作)的情况,如实时操作系统的任务切换。当上下文切换时,可能指令流水线中的指令还在执行,而此时上下文已经改变,导致指令执行的结果不正确。

通过插入 ISB 指令,处理器会将流水线中的指令全部刷新,从而确保之前的指令不会影响后续指令的执行,并且后续指令将从正确的上下文开始重新获取。

注:大多数 CPU 的体系架构在异常的入口和出口都有 ISB 的语义(自动执行)

2.2 中断级入队函数

同理,中断级的入队函数同样是来自同一个函数:xQueueGenericSendFromISR

1
2
3
4
5
6
7
8
9
10
11
#define xQueueSendToFrontFromISR( xQueue, pvItemToQueue, pxHigherPriorityTaskWoken ) \
xQueueGenericSendFromISR( ( xQueue ), ( pvItemToQueue ), ( pxHigherPriorityTaskWoken ), queueSEND_TO_FRONT )

#define xQueueSendToBackFromISR( xQueue, pvItemToQueue, pxHigherPriorityTaskWoken ) \
xQueueGenericSendFromISR( ( xQueue ), ( pvItemToQueue ), ( pxHigherPriorityTaskWoken ), queueSEND_TO_BACK )

#define xQueueOverwriteFromISR( xQueue, pvItemToQueue, pxHigherPriorityTaskWoken ) \
xQueueGenericSendFromISR( ( xQueue ), ( pvItemToQueue ), ( pxHigherPriorityTaskWoken ), queueOVERWRITE )

#define xQueueOverwriteFromISR( xQueue, pvItemToQueue, pxHigherPriorityTaskWoken ) \
xQueueGenericSendFromISR( ( xQueue ), ( pvItemToQueue ), ( pxHigherPriorityTaskWoken ), queueOVERWRITE )

下面看源码(同样,这里我省去了队列集的相关部分):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
BaseType_t xQueueGenericSendFromISR( QueueHandle_t xQueue, // 队列句柄,指向要操作的队列
const void * const pvItemToQueue, // 指向要发送到队列的数据的指针
BaseType_t * const pxHigherPriorityTaskWoken, // 用于指示是否需要进行上下文切换
const BaseType_t xCopyPosition ) // 指定数据复制到队列的位置
{
BaseType_t xReturn; /* 用于存储函数返回值 */
UBaseType_t uxSavedInterruptStatus; /* 用于保存中断状态 */
Queue_t * const pxQueue = xQueue; /* 指向队列结构的指针 */

configASSERT( pxQueue ); /* 确保队列指针非空 */
/* 确保在队列项大小不为零时,数据指针非空 */
configASSERT( !( ( pvItemToQueue == NULL ) && ( pxQueue->uxItemSize != ( UBaseType_t ) 0U ) ) );
/* 确保在覆盖模式下,队列长度为1 */
configASSERT( !( ( xCopyPosition == queueOVERWRITE ) && ( pxQueue->uxLength != 1 ) ) );

/* 中断优先级检查,确保中断优先级在允许的范围内 */
portASSERT_IF_INTERRUPT_PRIORITY_INVALID();

/* 保存当前中断状态并禁用中断 */
uxSavedInterruptStatus = portSET_INTERRUPT_MASK_FROM_ISR();
{
/* 检查队列是否有空间或是否处于覆盖模式 */
if( ( pxQueue->uxMessagesWaiting < pxQueue->uxLength ) || ( xCopyPosition == queueOVERWRITE ) )
{
const int8_t cTxLock = pxQueue->cTxLock;
const UBaseType_t uxPreviousMessagesWaiting = pxQueue->uxMessagesWaiting;

traceQUEUE_SEND_FROM_ISR( pxQueue );

/* 将数据复制到队列中,前面已经详细叙述过,这里不再赘述 */
( void ) prvCopyDataToQueue( pxQueue, pvItemToQueue, xCopyPosition );

/* 如果队列被锁定,事件列表不会被修改。这将在队列稍后解锁时完成 */
if( cTxLock == queueUNLOCKED )
{
/***************** 队列集操作 略... **********************/

{
/* 检查队列的 xTasksWaitingToReceive 列表是否为空。如果不为空,说明有任务在等待从队列中接收数据 */
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE )
{
/* 从列表中移除一个任务 */
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE )
{
/* 等待的任务具有更高的优先级,因此记录需要进行上下文切换 */
if( pxHigherPriorityTaskWoken != NULL )
{
/* 表示需要进行上下文切换 */
*pxHigherPriorityTaskWoken = pdTRUE;
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else
{
mtCOVERAGE_TEST_MARKER();
}

( void ) uxPreviousMessagesWaiting; /* 编译器优化 */
}
#endif /* configUSE_QUEUE_SETS */
}
else /* 队列被锁定 */
{
/* 增加锁计数,以便解锁队列的任务知道在锁定期间有数据被发布 */
configASSERT( cTxLock != queueINT8_MAX );

pxQueue->cTxLock = ( int8_t ) ( cTxLock + 1 );
}

xReturn = pdPASS;
}
else /* 队列已满且不是覆写模式 */
{
traceQUEUE_SEND_FROM_ISR_FAILED( pxQueue );
xReturn = errQUEUE_FULL;
}
}
portCLEAR_INTERRUPT_MASK_FROM_ISR( uxSavedInterruptStatus ); // 清除之前保存的中断状态掩码

return xReturn;
}

中断级入队函数和任务级入队函数大同小异,没什么可说的了。

3、任务的读取

任务的读取同样是分为任务级出队函数和中断级出队函数。

3.1 任务级出队函数

3.1.1 xQueueReceive

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
BaseType_t xQueueReceive( QueueHandle_t xQueue,      // 队列句柄
void * const pvBuffer, // 接收数据的缓冲区
TickType_t xTicksToWait ) // 等待数据的最大时间
{
BaseType_t xEntryTimeSet = pdFALSE; /* 标记是否设置了超时结构 */
TimeOut_t xTimeOut; /* 超时结构 */
Queue_t * const pxQueue = xQueue; /* 指向队列的指针 */

/* 检查队列指针和缓冲区指针是否有效 */
configASSERT( ( pxQueue ) );
configASSERT( !( ( ( pvBuffer ) == NULL ) && ( ( pxQueue )->uxItemSize != ( UBaseType_t ) 0U ) ) );

/* 检查调度器是否被挂起,如果挂起且等待时间不为0,则断言失败 */
#if ( ( INCLUDE_xTaskGetSchedulerState == 1 ) || ( configUSE_TIMERS == 1 ) )
{
configASSERT( !( ( xTaskGetSchedulerState() == taskSCHEDULER_SUSPENDED ) && ( xTicksToWait != 0 ) ) );
}
#endif

for( ; ; )
{
taskENTER_CRITICAL(); // 进入临界区,防止中断干扰
{
/* 获取队列中等待的消息数量 */
const UBaseType_t uxMessagesWaiting = pxQueue->uxMessagesWaiting;

/* 队列中有数据 */
if( uxMessagesWaiting > ( UBaseType_t ) 0 )
{
/* 从队列中复制数据到缓冲区 */
prvCopyDataFromQueue( pxQueue, pvBuffer );
traceQUEUE_RECEIVE( pxQueue );
pxQueue->uxMessagesWaiting = uxMessagesWaiting - ( UBaseType_t ) 1;

/* 检查是否有任务在等待发送数据到队列 */
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE )
{
/* 从事件列表中移除等待任务 */
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE )
{
queueYIELD_IF_USING_PREEMPTION();
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else
{
mtCOVERAGE_TEST_MARKER();
}

taskEXIT_CRITICAL();
return pdPASS;
}
else
{
if( xTicksToWait == ( TickType_t ) 0 ) /* 队列为空且不等待 */
{
/* 立即返回队列为空的状态 */
taskEXIT_CRITICAL();
traceQUEUE_RECEIVE_FAILED( pxQueue );
return errQUEUE_EMPTY;
}
else if( xEntryTimeSet == pdFALSE ) /* 队列为空但指定了等待时间 */
{
/* 设置超时结构 */
vTaskInternalSetTimeOutState( &xTimeOut );
xEntryTimeSet = pdTRUE;
}
else
{
/* Entry time was already set. */
mtCOVERAGE_TEST_MARKER();
}
}
}
taskEXIT_CRITICAL();

/* Interrupts and other tasks can send to and receive from the queue
* now the critical section has been exited. */

/* 挂起所有任务并锁定队列 */
vTaskSuspendAll();
prvLockQueue( pxQueue );

/* 检查是否超时 */
if( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) == pdFALSE )
{
/* 如果未超时且队列仍为空,则将当前任务放入等待接收事件列表中 */
if( prvIsQueueEmpty( pxQueue ) != pdFALSE )
{
traceBLOCKING_ON_QUEUE_RECEIVE( pxQueue );
vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToReceive ), xTicksToWait );
prvUnlockQueue( pxQueue );

/* 恢复所有任务 */
if( xTaskResumeAll() == pdFALSE )
{
portYIELD_WITHIN_API();
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else /* 队列非空情况下的处理 */
{
/* 解锁队列并恢复所有任务,然后循环回去尝试读取数据 */
prvUnlockQueue( pxQueue );
( void ) xTaskResumeAll();
}
}
else /* 超时情况下的处理,然后循环回去尝试读取数据 */
{
/* 任务已经超时,则解锁队列并恢复所有任务 */
prvUnlockQueue( pxQueue );
( void ) xTaskResumeAll();

/* 如果队列仍然为空,则记录跟踪信息并返回 errQUEUE_EMPTY 表示接收失败 */
if( prvIsQueueEmpty( pxQueue ) != pdFALSE )
{
traceQUEUE_RECEIVE_FAILED( pxQueue );
return errQUEUE_EMPTY;
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
} /*lint -restore */
}

3.1.1.1 从队列中读取数据

xQueueReceive 调用了 prvCopyDataFromQueue 函数来实现从队列中读取数据,下面看其实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static void prvCopyDataFromQueue( Queue_t * const pxQueue,  // 指向队列结构的常量指针,表示要从中读取数据的队列
void * const pvBuffer ) // 指向缓冲区的常量指针,表示要将数据复制到的目标缓冲区
{
/* 检查队列中每个项目的大小是否不为零 */
if( pxQueue->uxItemSize != ( UBaseType_t ) 0 )
{
/* 将读取指针 pcReadFrom 向前移动 uxItemSize 个字节。这表示从队列中读取了一个数据项 */
pxQueue->u.xQueue.pcReadFrom += pxQueue->uxItemSize;

/* 检查读取指针 pcReadFrom 是否超过了队列的尾部指针 pcTail。如果是,则表示已经读取到队列的末尾 */
if( pxQueue->u.xQueue.pcReadFrom >= pxQueue->u.xQueue.pcTail )
{
/* 如果读取指针超过了队列的尾部,则将其重置为队列的头部指针 pcHead,以便从头开始读取 */
pxQueue->u.xQueue.pcReadFrom = pxQueue->pcHead;
}
else
{
mtCOVERAGE_TEST_MARKER();
}

/* 将数据从队列中读取指针 pcReadFrom 处复制到目标缓冲区 pvBuffer 中。复制的数据大小为 uxItemSize 字节 */
( void ) memcpy( ( void * ) pvBuffer, ( void * ) pxQueue->u.xQueue.pcReadFrom, ( size_t ) pxQueue->uxItemSize );
}
}

3.1.2 xQueuePeek

xQueuePeekxQueueReceive 实现非常相似,无非是不将数据移出队列罢了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
BaseType_t xQueuePeek( QueueHandle_t xQueue, 	  // 队列句柄
void * const pvBuffer, // 用于存储数据的缓冲区
TickType_t xTicksToWait ) // 等待数据的最大时间
{
BaseType_t xEntryTimeSet = pdFALSE; /* 用于标记是否设置了超时结构 */
TimeOut_t xTimeOut; /* 超时结构 */
int8_t * pcOriginalReadPosition; /* 用于保存原始读取位置 */
Queue_t * const pxQueue = xQueue; /* 队列结构体指针 */

/* 检查队列指针和缓冲区指针是否有效 */
configASSERT( ( pxQueue ) );
configASSERT( !( ( ( pvBuffer ) == NULL ) && ( ( pxQueue )->uxItemSize != ( UBaseType_t ) 0U ) ) );

/* 检查调度器是否被挂起,如果挂起且等待时间不为0,则断言失败 */
#if ( ( INCLUDE_xTaskGetSchedulerState == 1 ) || ( configUSE_TIMERS == 1 ) )
{
configASSERT( !( ( xTaskGetSchedulerState() == taskSCHEDULER_SUSPENDED ) && ( xTicksToWait != 0 ) ) );
}
#endif

for( ; ; )
{
taskENTER_CRITICAL();
{
/* 获取队列中等待的消息数量 */
const UBaseType_t uxMessagesWaiting = pxQueue->uxMessagesWaiting;

/* 检查队列是否有数据 */
if( uxMessagesWaiting > ( UBaseType_t ) 0 )
{
/* 保存当前读取位置 */
pcOriginalReadPosition = pxQueue->u.xQueue.pcReadFrom;

/* 从队列中复制数据到缓冲区 */
prvCopyDataFromQueue( pxQueue, pvBuffer );
traceQUEUE_PEEK( pxQueue );

/* 重置读取位置,因为只是查看数据,不移除 */
pxQueue->u.xQueue.pcReadFrom = pcOriginalReadPosition;

/* 检查是否有任务在等待接收数据 */
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE )
{
/* 移除事件列表中的任务 */
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE )
{
/* 如果移除的任务优先级更高,则进行任务切换 */
queueYIELD_IF_USING_PREEMPTION();
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else
{
mtCOVERAGE_TEST_MARKER();
}

taskEXIT_CRITICAL();
return pdPASS;
}
else /* 处理队列为空的情况 */
{
/* 如果队列为空且不等待 */
if( xTicksToWait == ( TickType_t ) 0 )
{
/* 退出临界区,返回队列为空错误 */
taskEXIT_CRITICAL();
traceQUEUE_PEEK_FAILED( pxQueue );
return errQUEUE_EMPTY;
}
/* 如果队列为空且等待时间不为0 */
else if( xEntryTimeSet == pdFALSE )
{
/* 设置超时结构,标记已设置超时 */
vTaskInternalSetTimeOutState( &xTimeOut );
xEntryTimeSet = pdTRUE;
}
else
{
/* Entry time was already set. */
mtCOVERAGE_TEST_MARKER();
}
}
}
taskEXIT_CRITICAL();

/* 现在临界区已经退出,中断和其他任务可以向队列发送数据和从队列接收数据 */

/* 挂起所有任务并锁定队列 */
vTaskSuspendAll();
prvLockQueue( pxQueue );

/* 更新超时状态,看看是否已经过期 */
if( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) == pdFALSE )
{
/* 超时时间尚未到期,检查当前队列中是否有数据,如果没有,则进入阻塞状态以等待数据 */
if( prvIsQueueEmpty( pxQueue ) != pdFALSE )
{
/* 将当前任务放入事件列表中,等待数据到达,并解锁队列 */
traceBLOCKING_ON_QUEUE_PEEK( pxQueue );
vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToReceive ), xTicksToWait );
prvUnlockQueue( pxQueue );

/* 恢复调度器 */
if( xTaskResumeAll() == pdFALSE )
{
/* 进行上下文切换 */
portYIELD_WITHIN_API();
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else
{
prvUnlockQueue( pxQueue );
( void ) xTaskResumeAll();
}
}
else /* 队列为空的情况 */
{
/* 解锁队列并恢复调度器 */
prvUnlockQueue( pxQueue );
( void ) xTaskResumeAll();

/* 如果队列仍然为空,则返回 errQUEUE_EMPTY 表示获取数据失败 */
if( prvIsQueueEmpty( pxQueue ) != pdFALSE )
{
traceQUEUE_PEEK_FAILED( pxQueue );
return errQUEUE_EMPTY;
}
else
{
/* 尝试获取数据 */
mtCOVERAGE_TEST_MARKER();
}
}
} /*lint -restore */
}

3.2 中断级出队函数

3.2.1 xQueueReceiveFromISR

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
BaseType_t xQueueReceiveFromISR( QueueHandle_t xQueue,	// 队列句柄
void * const pvBuffer, // 用于存储接收数据的缓冲区
BaseType_t * const pxHigherPriorityTaskWoken ) // 如果接收操作导致更高优先级任务被唤醒,则设置为pdTRUE
{
BaseType_t xReturn; /* 存储函数返回值 */
UBaseType_t uxSavedInterruptStatus; /* 保存中断状态 */
Queue_t * const pxQueue = xQueue; /* 指向队列结构的指针 */

/* 确保队列指针非空和缓冲区指针非空 */
configASSERT( pxQueue );
configASSERT( !( ( pvBuffer == NULL ) && ( pxQueue->uxItemSize != ( UBaseType_t ) 0U ) ) );

/* 检查当前中断优先级是否在允许调用FreeRTOS API函数的范围内 */
portASSERT_IF_INTERRUPT_PRIORITY_INVALID();

/* 保存中断状态并进入临界区 */
uxSavedInterruptStatus = portSET_INTERRUPT_MASK_FROM_ISR();
{
const UBaseType_t uxMessagesWaiting = pxQueue->uxMessagesWaiting;

/* 检查队列是否有数据 */
if( uxMessagesWaiting > ( UBaseType_t ) 0 )
{
/* 获取队列中等待的消息数量 */
const int8_t cRxLock = pxQueue->cRxLock;

traceQUEUE_RECEIVE_FROM_ISR( pxQueue );

/* 将数据从队列复制到缓冲区 */
prvCopyDataFromQueue( pxQueue, pvBuffer );
/* 更新队列中等待的消息数量 */
pxQueue->uxMessagesWaiting = uxMessagesWaiting - ( UBaseType_t ) 1;

/* 如果队列未锁定 */
if( cRxLock == queueUNLOCKED )
{
/* 检查是否有任务等待发送数据到队列 */
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE )
{
/* 从事件列表中移除任务 */
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE )
{
/* 如果移除的任务优先级高于当前任务,则设置高优先级任务唤醒标志,以便在适当的时候进行上下文切换 */
if( pxHigherPriorityTaskWoken != NULL )
{
*pxHigherPriorityTaskWoken = pdTRUE;
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else
{
/* 如果队列被锁定,则增加锁定计数,以便解锁队列的任务知道在锁定期间有数据被移除 */
configASSERT( cRxLock != queueINT8_MAX );

pxQueue->cRxLock = ( int8_t ) ( cRxLock + 1 );
}

xReturn = pdPASS;
}
else /* 队列中没有数据可供接收 */
{
/* 返回失败 */
xReturn = pdFAIL;
traceQUEUE_RECEIVE_FROM_ISR_FAILED( pxQueue );
}
}
/* 恢复之前保存的中断状态 */
portCLEAR_INTERRUPT_MASK_FROM_ISR( uxSavedInterruptStatus );

return xReturn;
}

3.2.2 xQueuePeekFromISR

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
BaseType_t xQueuePeekFromISR( QueueHandle_t xQueue,    // 队列的句柄
void * const pvBuffer ) // 存储从队列中读取的数据的缓冲区
{
BaseType_t xReturn; /* 存储函数的返回值 */
UBaseType_t uxSavedInterruptStatus; /* 保存中断状态 */
int8_t * pcOriginalReadPosition; /* 保存队列的原始读取位置 */
Queue_t * const pxQueue = xQueue; /* 队列结构的指针 */

/* 确保队列指针非空和缓冲区指针非空 */
configASSERT( pxQueue );
configASSERT( !( ( pvBuffer == NULL ) && ( pxQueue->uxItemSize != ( UBaseType_t ) 0U ) ) );
configASSERT( pxQueue->uxItemSize != 0 ); /* 中断中不能使用信号量 */

/* 检查中断优先级是否有效,确保在中断服务程序中调用此函数是安全的 */
portASSERT_IF_INTERRUPT_PRIORITY_INVALID();

/* 保存中断状态并进入临界区 */
uxSavedInterruptStatus = portSET_INTERRUPT_MASK_FROM_ISR();
{
/* 检查队列中是否有消息等待处理 */
if( pxQueue->uxMessagesWaiting > ( UBaseType_t ) 0 )
{
traceQUEUE_PEEK_FROM_ISR( pxQueue );

/* 保存原始读取位置 */
pcOriginalReadPosition = pxQueue->u.xQueue.pcReadFrom;
/* 从队列中复制数据到缓冲区 */
prvCopyDataFromQueue( pxQueue, pvBuffer );
/* 重置读取位置,因为只是查看数据,不移除数据 */
pxQueue->u.xQueue.pcReadFrom = pcOriginalReadPosition;

xReturn = pdPASS;
}
else /* 处理队列为空的情况 */
{
xReturn = pdFAIL;
traceQUEUE_PEEK_FROM_ISR_FAILED( pxQueue );
}
}
/* 恢复之前保存的中断状态 */
portCLEAR_INTERRUPT_MASK_FROM_ISR( uxSavedInterruptStatus );

return xReturn;
}

三、收尾

至此,queue.c 中大部分和队列相关的内容已经讲解完,剩下一小部分的内容像 prvIsQueueEmpty 实现非常简单,就不再细讲。


FreeRTOS 队列 Queue 源码解析
http://example.com/2024/09/06/FreeRTOS-queue-源码解析/
作者
Yu xin
发布于
2024年9月6日
许可协议