Semaphore
信号量的基本概念与特性
信号量和队列极其类似,不同就在于信号量不进行数据读写操作,只储存计数值。
信号量是操作系统中最基础的同步机制,本质上是一个计数器,用于控制对共享资源的访问和任务间的同步。
核心特性: - 计数器机制:通过计数值管理资源可用性 - 线程安全:原子操作,不会出现竞争条件 - 阻塞等待:任务可以阻塞等待信号量可用 - 无数据传递:只传递信号,不传递实际数据
信号量 vs 队列:
| 特性 | 信号量 | 队列 |
|---|---|---|
| 数据传递 | 不传递数据 | 传递数据 |
| 主要用途 | 同步、互斥 | 数据传输 |
| 内存开销 | 较小 | 较大 |
| 灵活性 | 相对较低 | 较高 |
信号量基本操作
可以把信号量想象成一个游乐场的门票管理员,他管理着一个热门游乐设施(共享资源)。
这个管理员手里掌握着一定数量的门票(信号量的计数值),他的规则非常简单:
类比:游乐场门票管理员
- 共享资源:一个只能同时容纳固定人数的热门游乐设施(比如“太空漩涡”)。
- 信号量:管理这个设施的管理员,以及他手里的门票。
- 信号量计数值:管理员手里剩余的可直接入场门票的数量。
- 初始值 = 设施的容量(比如 3 张票,表示最多3个人同时玩)。
- 任务:想要玩“太空漩涡”的游客们。
信号量的两大核心操作
1. take() 操作 - “获取门票”
这个名字有很多等价叫法:wait, P 操作, acquire, take。 就像游客对管理员说:“我要一张票!”
形象化的步骤:
- 你走过去问管理员:“我要一张票”(调用
wait())。 - 管理员检查手里:
- 情况A(有票):如果计数值 > 0(比如还有1张票),他二话不说,直接递给你一张票,同时把手里的票数减1(
计数值--)。你可以直接进去玩了! - 情况B(没票):如果计数值 = 0(票发完了),管理员会让你去旁边排队等待(任务进入阻塞状态)。你会被挂在那个设施的等待队列里。
- 情况A(有票):如果计数值 > 0(比如还有1张票),他二话不说,直接递给你一张票,同时把手里的票数减1(
- 什么时候能醒来?
- 当里面有一个人玩完了出来(执行了
signal()操作),还回一张票时,管理员会叫醒排队队伍里的第一个人(比如按照优先级或先来后到),把票给他。
- 当里面有一个人玩完了出来(执行了
wait() 的核心目的就是申请资源的访问权。如果申请不到,就乖乖排队等着。
2. give() 操作 - “归还门票”
这个名字也有很多等价叫法:signal, V 操作, release, give。 就像游客玩完了出来对管理员说:“我出来了,票还你!”
形象化的步骤:
- 你玩完了,从出口出来(临界区代码执行完毕)。
- 你对管理员说:“我还票”(调用
signal())。 - 管理员检查等待队列:
- 情况A(有人在等):如果等待队列里有其他游客在等票,管理员不会把票收回盒子,而是直接拍拍队伍第一个人的肩膀,把这张票给他。那个被叫醒的人就可以进去玩了。此时,管理员手里的票数(计数值)没有变化(还是0),他只是完成了一次“交接”。
- 情况B(没人在等):如果等待队列是空的,管理员就把这张票放回自己手里的票堆,让可用票数加1(
计数值++)。
signal() 的核心目的是释放资源的访问权,并通知等待的任务。
其他相关操作
除了核心的 take 和 give,通常还会有:
-
初始化:在游乐场开门前,设定这个设施初始有多少张票可用(设置信号量的初始计数值)。
- 初始值 = 1:这就是我们常说的互斥锁,因为这个设施一次只允许一个人进去(比如一个单人间鬼屋)。
- 初始值 = N (N>1):这称为计数信号量,用于控制访问数量有限的资源池(比如有3辆的碰碰车场,有10个连接的数据库连接池)。
-
非阻塞等待:
tryWait()- 你走过去问管理员:“有票吗?立刻告诉我!我不想排队。”
- 如果没票,他不会让你排队,而是直接告诉你:“没票!”,然后你就可以转身去玩别的了(函数立即返回错误码)。
-
带超时的等待:
timedWait()- 你走过去说:“我要一张票,但我只排5分钟的队。”
- 如果5分钟内有人还票,你就能进去。
- 如果5分钟到了还没票,你就不排了,自己离开(任务从阻塞状态超时返回)。
总结表格
| 操作 | 编程函数名 | 形象化动作 | 对计数值的影响 | 对任务的影响 |
|---|---|---|---|---|
| 初始化 | sem_init(&sem, N) |
管理员准备N张门票 | 设置为N | 无 |
| 申请资源 | sem_wait() |
“我要一张票!” | 如果有票,则减1 | 如果没票,就排队阻塞 |
| 释放资源 | sem_post() |
“我出来了,还票!” | 如果没人等,则加1 如果有人等,不变 |
叫醒一个排队的人 |
| 尝试申请 | sem_trywait() |
“有现票吗?没有我走了” | 有票则减1,无票不变 | 不阻塞,立即返回 |
| 限时申请 | sem_timedwait() |
“我排5分钟队,等不到就走” | 超时内等到票则减1 | 超时则结束阻塞 |
二进制信号量详解
二进制信号量特性
二进制信号量是最简单的信号量类型: - 计数值范围:只有0和1两种状态 - 行为模式:类似于锁机制 - 应用场景:任务同步、互斥访问
状态说明: - 1:信号量可用,任务可以获取 - 0:信号量不可用,任务需要等待
创建二进制信号量
xSemaphoreCreateBinary
SemaphoreHandle_t xSemaphoreCreateBinary(void);
重要特性:
- 创建后初始状态为0(不可用)
- 需要手动调用xSemaphoreGive使其可用
- 适用于动态内存分配
创建示例:
// 创建二进制信号量
SemaphoreHandle_t xBinarySemaphore;
void vInitSemaphores(void) {
xBinarySemaphore = xSemaphoreCreateBinary();
if(xBinarySemaphore != NULL) {
// 创建成功后立即使其可用
xSemaphoreGive(xBinarySemaphore);
printf("Binary semaphore created and made available\n");
} else {
printf("Failed to create binary semaphore\n");
}
}
xSemaphoreCreateBinaryStatic
静态创建二进制信号量:
SemaphoreHandle_t xSemaphoreCreateBinaryStatic(StaticSemaphore_t *pxSemaphoreBuffer);
静态创建示例:
// 静态分配信号量控制块
static StaticSemaphore_t xBinarySemaphoreBuffer;
void vInitStaticSemaphores(void) {
// 静态创建二进制信号量
SemaphoreHandle_t xStaticBinarySemaphore;
xStaticBinarySemaphore = xSemaphoreCreateBinaryStatic(&xBinarySemaphoreBuffer);
if(xStaticBinarySemaphore != NULL) {
xSemaphoreGive(xStaticBinarySemaphore);
printf("Static binary semaphore created successfully\n");
}
}
计数信号量详解
计数信号量特性
计数信号量用于管理多个相同的资源: - 计数值范围:0到最大计数值 - 资源管理:计数值表示可用资源数量 - 应用场景:资源池管理、限流控制
创建计数信号量
xSemaphoreCreateCounting
SemaphoreHandle_t xSemaphoreCreateCounting(UBaseType_t uxMaxCount,
UBaseType_t uxInitialCount);
参数说明:
- uxMaxCount:信号量最大计数值
- uxInitialCount:信号量初始计数值
创建示例:
// 创建各种计数信号量示例
SemaphoreHandle_t xParkingSemaphore; // 停车场信号量
SemaphoreHandle_t xBufferSemaphore; // 缓冲区信号量
SemaphoreHandle_t xConnectionSemaphore; // 连接数信号量
void vCreateCountingSemaphores(void) {
// 停车场:10个车位,初始全部空着
xParkingSemaphore = xSemaphoreCreateCounting(10, 10);
// 缓冲区:最多5个缓冲块,初始有3个可用
xBufferSemaphore = xSemaphoreCreateCounting(5, 3);
// 数据库连接:最多8个连接,初始全部可用
xConnectionSemaphore = xSemaphoreCreateCounting(8, 8);
// 检查创建结果
if(xParkingSemaphore == NULL || xBufferSemaphore == NULL || xConnectionSemaphore == NULL) {
printf("Error: Failed to create counting semaphores\n");
} else {
printf("All counting semaphores created successfully\n");
}
}
xSemaphoreCreateCountingStatic
静态创建计数信号量:
SemaphoreHandle_t xSemaphoreCreateCountingStatic(UBaseType_t uxMaxCount,
UBaseType_t uxInitialCount,
StaticSemaphore_t *pxSemaphoreBuffer);
信号量基本操作函数
xSemaphoreTake - 获取信号量
BaseType_t xSemaphoreTake(SemaphoreHandle_t xSemaphore,
TickType_t xTicksToWait);
参数说明:
- xSemaphore:信号量句柄
- xTicksToWait:阻塞超时时间
阻塞行为:
- 0:非阻塞,立即返回
- portMAX_DELAY:无限阻塞
- N:阻塞N个时钟节拍
使用示例:
void vTaskUsingSemaphore(void *pvParameters) {
TickType_t xLastWakeTime = xTaskGetTickCount();
while(1) {
// 尝试获取信号量,等待100ms
if(xSemaphoreTake(xBinarySemaphore, 100 / portTICK_PERIOD_MS) == pdTRUE) {
// 成功获取信号量,执行受保护的操作
printf("Semaphore acquired, performing critical operation\n");
vPerformCriticalOperation();
// 释放信号量
xSemaphoreGive(xBinarySemaphore);
printf("Semaphore released\n");
} else {
// 获取信号量超时
printf("Failed to acquire semaphore within timeout\n");
}
// 任务周期执行
vTaskDelayUntil(&xLastWakeTime, 1000 / portTICK_PERIOD_MS);
}
}
xSemaphoreGive - 释放信号量
任何任务都可以释放
BaseType_t xSemaphoreGive(SemaphoreHandle_t xSemaphore);
返回值:
- pdTRUE:成功释放信号量
- pdFALSE:释放失败(信号量已满)
释放示例:
void vProducerTask(void *pvParameters) {
while(1) {
// 生产数据
vProduceData();
// 释放信号量通知消费者
if(xSemaphoreGive(xDataReadySemaphore) == pdTRUE) {
printf("Signal sent to consumer\n");
} else {
printf("Failed to give semaphore\n");
}
vTaskDelay(500 / portTICK_PERIOD_MS);
}
}
中断安全操作函数
xSemaphoreTakeFromISR
BaseType_t xSemaphoreTakeFromISR(SemaphoreHandle_t xSemaphore,
BaseType_t *pxHigherPriorityTaskWoken);
xSemaphoreGiveFromISR
BaseType_t xSemaphoreGiveFromISR(SemaphoreHandle_t xSemaphore,
BaseType_t *pxHigherPriorityTaskWoken);
中断中使用示例:
// 外部中断处理函数
void EXTI0_IRQHandler(void) {
BaseType_t xHigherPriorityTaskWoken = pdFALSE;
// 清除中断标志
EXTI->PR = EXTI_PR_PR0;
// 在中断中释放信号量
if(xSemaphoreGiveFromISR(xInterruptSemaphore, &xHigherPriorityTaskWoken) == pdTRUE) {
printf("Semaphore given from ISR\n");
}
// 必要时进行任务切换
portYIELD_FROM_ISR(xHigherPriorityTaskWoken);
}
// 处理中断的任务
void vInterruptHandlerTask(void *pvParameters) {
while(1) {
// 等待中断信号
if(xSemaphoreTake(xInterruptSemaphore, portMAX_DELAY) == pdTRUE) {
printf("Interrupt received, processing...\n");
vProcessInterrupt();
}
}
}
信号量删除函数
vSemaphoreDelete
void vSemaphoreDelete(SemaphoreHandle_t xSemaphore);
删除信号量示例:
void vCleanupResources(void) {
// 删除不再使用的信号量
if(xBinarySemaphore != NULL) {
vSemaphoreDelete(xBinarySemaphore);
xBinarySemaphore = NULL;
printf("Binary semaphore deleted\n");
}
if(xCountingSemaphore != NULL) {
vSemaphoreDelete(xCountingSemaphore);
xCountingSemaphore = NULL;
printf("Counting semaphore deleted\n");
}
if(xMutex != NULL) {
vSemaphoreDelete(xMutex);
xMutex = NULL;
printf("Mutex deleted\n");
}
}
信号量查询函数
uxSemaphoreGetCount - 获取信号量计数
UBaseType_t uxSemaphoreGetCount(SemaphoreHandle_t xSemaphore);
查询示例:
void vMonitorSemaphoreStatus(void) {
UBaseType_t uxBinaryCount, uxCountingCount, uxMutexCount;
// 获取各种信号量的当前计数值
uxBinaryCount = uxSemaphoreGetCount(xBinarySemaphore);
uxCountingCount = uxSemaphoreGetCount(xCountingSemaphore);
uxMutexCount = uxSemaphoreGetCount(xMutex);
printf("Semaphore Status:\n");
printf(" Binary: %lu\n", uxBinaryCount);
printf(" Counting: %lu\n", uxCountingCount);
printf(" Mutex: %lu\n", uxMutexCount);
// 根据状态采取行动
if(uxCountingCount == 0) {
printf("Warning: No resources available in counting semaphore!\n");
}
}
信号量综合实验案例:停车场管理系统
#include "FreeRTOS.h"
#include "task.h"
#include "semphr.h"
#include "stdio.h"
#include "string.h"
// 系统信号量定义
SemaphoreHandle_t xParkingSpaces; // 停车位计数信号量
SemaphoreHandle_t xEntryGate; // 入口闸机二进制信号量
SemaphoreHandle_t xExitGate; // 出口闸机二进制信号量
SemaphoreHandle_t xDisplayMutex; // 显示屏互斥量
// 系统状态变量
static uint32_t ulCarsInParking = 0;
static uint32_t ulTotalCarsServed = 0;
// 显示屏输出函数(受互斥量保护)
void vSafePrintf(const char *format, ...) {
va_list args;
// 获取显示屏互斥量
if(xSemaphoreTake(xDisplayMutex, 100 / portTICK_PERIOD_MS) == pdTRUE) {
va_start(args, format);
vprintf(format, args);
va_end(args);
// 释放互斥量
xSemaphoreGive(xDisplayMutex);
}
}
// 车辆入场任务
void vCarEntryTask(void *pvParameters) {
uint8_t car_id = (uint8_t)pvParameters;
char car_plate[10];
// 生成车牌号
snprintf(car_plate, sizeof(car_plate), "CAR%03d", car_id);
vSafePrintf("[ENTRY] Car %s arrived at entrance\n", car_plate);
while(1) {
// 等待入口闸机可用(二进制信号量)
if(xSemaphoreTake(xEntryGate, 2000 / portTICK_PERIOD_MS) == pdTRUE) {
vSafePrintf("[ENTRY] Car %s is at gate\n", car_plate);
// 尝试获取停车位(计数信号量)
if(xSemaphoreTake(xParkingSpaces, 1000 / portTICK_PERIOD_MS) == pdTRUE) {
// 成功获取停车位
ulCarsInParking++;
ulTotalCarsServed++;
vSafePrintf("[ENTRY] Car %s entered. Parking: %lu/%lu, Total: %lu\n",
car_plate, ulCarsInParking,
uxSemaphoreGetCount(xParkingSpaces) + ulCarsInParking,
ulTotalCarsServed);
// 释放入口闸机
xSemaphoreGive(xEntryGate);
// 模拟停车时间(1-10秒)
vTaskDelay((1000 + (rand() % 9000)) / portTICK_PERIOD_MS);
// 车辆准备离开
vSafePrintf("[EXIT] Car %s preparing to leave\n", car_plate);
// 等待出口闸机
if(xSemaphoreTake(xExitGate, 2000 / portTICK_PERIOD_MS) == pdTRUE) {
// 释放停车位
xSemaphoreGive(xParkingSpaces);
ulCarsInParking--;
vSafePrintf("[EXIT] Car %s left. Parking: %lu/%lu\n",
car_plate, ulCarsInParking,
uxSemaphoreGetCount(xParkingSpaces) + ulCarsInParking);
// 释放出口闸机
xSemaphoreGive(xExitGate);
}
} else {
// 没有停车位可用
vSafePrintf("[ENTRY] Car %s left - no parking space available\n", car_plate);
xSemaphoreGive(xEntryGate);
}
} else {
vSafePrintf("[ENTRY] Car %s gave up waiting for gate\n", car_plate);
}
// 车辆离开后等待一段时间再回来
vTaskDelay((5000 + (rand() % 10000)) / portTICK_PERIOD_MS);
vSafePrintf("[ENTRY] Car %s arrived again\n", car_plate);
}
}
// 停车场监控任务
void vParkingMonitorTask(void *pvParameters) {
UBaseType_t uxAvailableSpaces;
UBaseType_t uxMaxSpaces = 10; // 假设总共10个车位
vSafePrintf("[MONITOR] Parking monitor started\n");
while(1) {
// 获取当前可用车位数量
uxAvailableSpaces = uxSemaphoreGetCount(xParkingSpaces);
vSafePrintf("\n=== Parking Status ===\n");
vSafePrintf("Available spaces: %lu\n", uxAvailableSpaces);
vSafePrintf("Cars in parking: %lu\n", ulCarsInParking);
vSafePrintf("Total served: %lu\n", ulTotalCarsServed);
vSafePrintf("Utilization: %.1f%%\n",
(float)(uxMaxSpaces - uxAvailableSpaces) / uxMaxSpaces * 100);
// 状态警告
if(uxAvailableSpaces == 0) {
vSafePrintf("WARNING: Parking lot FULL!\n");
} else if(uxAvailableSpaces <= 2) {
vSafePrintf("WARNING: Parking lot almost full\n");
}
vSafePrintf("=====================\n\n");
// 每5秒监控一次
vTaskDelay(5000 / portTICK_PERIOD_MS);
}
}
// 紧急车辆优先任务
void vEmergencyVehicleTask(void *pvParameters) {
vSafePrintf("[EMERGENCY] Emergency vehicle service started\n");
while(1) {
// 随机模拟紧急车辆到达
vTaskDelay((30000 + (rand() % 30000)) / portTICK_PERIOD_MS);
vSafePrintf("[EMERGENCY] Emergency vehicle arriving!\n");
// 紧急车辆优先入场
if(xSemaphoreTake(xEntryGate, portMAX_DELAY) == pdTRUE) {
vSafePrintf("[EMERGENCY] Emergency vehicle entering\n");
// 强制获取停车位(可能等待)
if(xSemaphoreTake(xParkingSpaces, portMAX_DELAY) == pdTRUE) {
ulCarsInParking++;
ulTotalCarsServed++;
vSafePrintf("[EMERGENCY] Emergency vehicle parked\n");
// 释放入口闸机
xSemaphoreGive(xEntryGate);
// 紧急处理时间较短
vTaskDelay(2000 / portTICK_PERIOD_MS);
// 紧急离开
if(xSemaphoreTake(xExitGate, portMAX_DELAY) == pdTRUE) {
xSemaphoreGive(xParkingSpaces);
ulCarsInParking--;
vSafePrintf("[EMERGENCY] Emergency vehicle departed\n");
xSemaphoreGive(xExitGate);
}
}
}
}
}
// 系统初始化函数
void vInitializeParkingSystem(void) {
printf("Initializing Parking Management System...\n");
// 创建计数信号量:10个停车位,初始全部可用
xParkingSpaces = xSemaphoreCreateCounting(10, 10);
// 创建二进制信号量:入口和出口闸机,初始可用
xEntryGate = xSemaphoreCreateBinary();
xExitGate = xSemaphoreCreateBinary();
// 创建互斥量:保护显示屏输出
xDisplayMutex = xSemaphoreCreateMutex();
// 检查信号量创建结果
if(xParkingSpaces == NULL || xEntryGate == NULL ||
xExitGate == NULL || xDisplayMutex == NULL) {
printf("ERROR: Failed to create semaphores!\n");
while(1); // 系统停止
}
// 使二进制信号量初始可用
xSemaphoreGive(xEntryGate);
xSemaphoreGive(xExitGate);
printf("Parking system initialized successfully\n");
printf("Total parking spaces: 10\n");
}
// 主函数
int main(void) {
// 初始化停车场系统
vInitializeParkingSystem();
// 创建车辆任务(5辆常规车辆)
for(int i = 1; i <= 5; i++) {
xTaskCreate(vCarEntryTask, "CarEntry", 1024, (void*)i, 2, NULL);
}
// 创建监控任务
xTaskCreate(vParkingMonitorTask, "Monitor", 1024, NULL, 1, NULL);
// 创建紧急车辆任务(较高优先级)
xTaskCreate(vEmergencyVehicleTask, "Emergency", 1024, NULL, 3, NULL);
printf("Starting FreeRTOS scheduler...\n");
vTaskStartScheduler();
// 如果调度器启动失败
while(1) {
printf("ERROR: Scheduler failed to start!\n");
}
return 0;
}
信号量使用最佳实践
信号量类型选择指南
二进制信号量适用场景: - 任务间同步 - 简单的互斥保护 - 事件通知机制
计数信号量适用场景: - 资源池管理(连接池、缓冲区等) - 限流控制 - 生产者-消费者模式中的资源计数
互斥量适用场景: - 共享资源保护 - 可能发生优先级反转的情况 - 需要递归访问的共享资源
错误处理策略
获取超时处理
BaseType_t xResult = xSemaphoreTake(xSemaphore, timeout);
if(xResult != pdTRUE) {
// 处理策略:重试、使用备用方案、报告错误等
handle_semaphore_timeout();
}
信号量创建检查
xSemaphore = xSemaphoreCreateBinary();
if(xSemaphore == NULL) {
// 内存不足处理
handle_memory_allocation_failure();
}
性能优化建议
- 合理设置超时:避免任务永久阻塞
- 使用静态分配:在确定性系统中减少动态分配
- 避免信号量滥用:简单的标志位可以使用任务通知替代
- 注意优先级安排:防止优先级反转和死锁
优先级反转
这是一个在实时系统或多任务系统中非常重要且经典的问题。
核心概念:什么是优先级反转?
优先级反转是指:在高优先级任务等待一个被低优先级任务占有的资源时,被一个中间优先级的任务抢占,从而导致高优先级任务被迫等待无限期延长的现象。
简单来说,就是高优先级任务“卡住了”,反而让中优先级任务先运行,这严重违背了优先级调度机制的初衷。
一个经典的优先级反转场景(举例说明)
假设一个系统中有三个任务,优先级从高到低排列:
- 任务H:高优先级
- 任务M:中优先级
- 任务L:低优先级
系统中有一个资源R(例如一个打印机、一段共享内存),由信号量S 来保护。
问题发生的步骤:
- 时刻1:任务L开始运行,并成功获取了信号量S,开始使用资源R。
- 时刻2:任务H就绪。由于它的优先级最高,系统会抢占任务L,开始运行任务H。
- 时刻3:任务H在运行过程中,也尝试去获取信号量S,以使用资源R。
- 但此时信号量S已经被任务L持有。
- 因此,任务H被阻塞,进入等待状态,并释放CPU。
- 时刻4:系统需要调度一个新任务。此时,任务L(持有信号量)和任务M(就绪)都在等待CPU。
- 按照优先级规则,任务M的优先级高于任务L。
- 因此,系统调度任务M开始运行。(这是问题的关键!)
- 时刻5:任务M开始长时间运行(例如,一个大的计算循环),因为它不需要资源R,所以它不会释放信号量S。
- 结果:
- 任务L(持有信号量S)无法运行,因为它被任务M抢占了。
- 任务L无法运行,就无法释放信号量S。
- 任务H(高优先级)虽然在等待信号量S,但它只能干等着任务L释放信号量。
- 而任务L又在等着任务M释放CPU。
最终形成了一个尴尬的链条:H 在等 L,L 在等 M,而 M 与 H、L 都无关。 高优先级任务H的行为,实际上被中优先级任务M“绑架”了。这就是优先级反转。
优先级反转的危害
- 破坏实时性:在严格的实时系统中,高优先级任务必须在规定的时间内完成。优先级反转会导致其响应时间无法预测,甚至无限期延迟,可能导致系统崩溃或严重故障。
- 逻辑错误:在普通系统中,会导致性能下降、死锁-like的现象,难以调试。
解决方案
为了解决优先级反转问题,工程师们提出了几种有效的方案:
方案一:优先级继承
这是最常用的解决方案。
- 核心思想:当一个高优先级任务H等待一个被低优先级任务L占有的信号量时,临时将任务L的优先级提升到与任务H相同。(提拔)
- 如何工作:
- 在上面的例子中,当任务H在时刻3尝试获取信号量失败时,系统会立即将任务L的优先级提升到与任务H一样高。
- 这样,在时刻4进行调度时,任务L的优先级(已被临时提升为高)就高于任务M。
- 因此,CPU会立刻分配给任务L,让它继续执行。
- 任务L得以快速执行完临界区代码,释放信号量S。
- 一旦任务L释放了信号量:
- 它的优先级会恢复为原来的低优先级。
- 等待该信号量的任务H立即获取到信号量,并因其本身的高优先级而开始运行。
- 效果:有效打破了“H等L,L被M阻塞”的链条,将反转的影响降到最低。
方案二:优先级天花板
这是一个更激进、也更确定的方案。
- 核心思想:为每个信号量预设一个“天花板”优先级,这个优先级通常等于所有可能访问该信号量的任务中的最高优先级。当一个任务成功获取该信号量时,它的优先级会自动被提升到这个天花板优先级。
- 如何工作:
- 在例子中,假设信号量S的天花板优先级被设为任务H的优先级。
- 在时刻1,任务L一获取信号量S,它的优先级立刻被提升到天花板优先级(即H的优先级)。
- 这样,在时刻2,任务H就绪时,它无法抢占任务L,因为此时它们优先级相同(或者根据具体实现,可能仍由L继续运行)。
- 任务L会不受干扰地、快速地运行完临界区代码,然后释放信号量,同时优先级恢复原状。
- 之后,任务H才能抢占并运行。
- 效果:它甚至防止了优先级反转的发生,而不是等发生了再去解决。但它可能导致不必要的优先级提升,稍微降低系统灵活性。
总结
| 特性 | 问题:优先级反转 | 解决方案1:优先级继承 | 解决方案2:优先级天花板 |
|---|---|---|---|
| 核心思想 | 高优先级任务被中优先级任务间接阻塞 | 临时提升低优先级任务的优先级 | 在持有锁时直接提升到预设最高优先级 |
| 行为时机 | 自然发生 | 发生阻塞时触发提升 | 获取信号量时立即提升 |
| 优点 | - | 动态、高效、资源占用少 | 能防止死锁,行为更确定 |
| 缺点 | 破坏系统实时性,难以调试 | 实现稍复杂,存在链式继承问题 | 可能造成不必要的优先级提升 |
更多请见Mutex
常见问题与解决方案
死锁预防: - 按照固定顺序获取多个信号量 - 设置合理的超时时间 - 使用死锁检测机制