Semaphore

目录

信号量的基本概念与特性

信号量和队列极其类似,不同就在于信号量不进行数据读写操作,只储存计数值。

信号量是操作系统中最基础的同步机制,本质上是一个计数器,用于控制对共享资源的访问和任务间的同步。

核心特性: - 计数器机制:通过计数值管理资源可用性 - 线程安全:原子操作,不会出现竞争条件 - 阻塞等待:任务可以阻塞等待信号量可用 - 无数据传递:只传递信号,不传递实际数据

信号量 vs 队列

特性 信号量 队列
数据传递 不传递数据 传递数据
主要用途 同步、互斥 数据传输
内存开销 较小 较大
灵活性 相对较低 较高

信号量基本操作

可以把信号量想象成一个游乐场的门票管理员,他管理着一个热门游乐设施(共享资源)。

这个管理员手里掌握着一定数量的门票(信号量的计数值),他的规则非常简单:


类比:游乐场门票管理员

  • 共享资源:一个只能同时容纳固定人数的热门游乐设施(比如“太空漩涡”)。
  • 信号量:管理这个设施的管理员,以及他手里的门票。
  • 信号量计数值:管理员手里剩余的可直接入场门票的数量
    • 初始值 = 设施的容量(比如 3 张票,表示最多3个人同时玩)。
  • 任务:想要玩“太空漩涡”的游客们。

信号量的两大核心操作

1. take() 操作 - “获取门票”

这个名字有很多等价叫法:waitP 操作, acquiretake。 就像游客对管理员说:“我要一张票!”

形象化的步骤:

  1. 你走过去问管理员:“我要一张票”(调用 wait())。
  2. 管理员检查手里
    • 情况A(有票):如果计数值 > 0(比如还有1张票),他二话不说,直接递给你一张票,同时把手里的票数减1(计数值--)。你可以直接进去玩了!
    • 情况B(没票):如果计数值 = 0(票发完了),管理员会让你去旁边排队等待(任务进入阻塞状态)。你会被挂在那个设施的等待队列里。
  3. 什么时候能醒来?
    • 当里面有一个人玩完了出来(执行了 signal() 操作),还回一张票时,管理员会叫醒排队队伍里的第一个人(比如按照优先级或先来后到),把票给他。

wait() 的核心目的就是申请资源的访问权。如果申请不到,就乖乖排队等着。


2. give() 操作 - “归还门票”

这个名字也有很多等价叫法:signal, V 操作, release, give。 就像游客玩完了出来对管理员说:“我出来了,票还你!”

形象化的步骤:

  1. 你玩完了,从出口出来(临界区代码执行完毕)。
  2. 你对管理员说:“我还票”(调用 signal())。
  3. 管理员检查等待队列
    • 情况A(有人在等):如果等待队列里有其他游客在等票,管理员不会把票收回盒子,而是直接拍拍队伍第一个人的肩膀,把这张票给他。那个被叫醒的人就可以进去玩了。此时,管理员手里的票数(计数值)没有变化(还是0),他只是完成了一次“交接”。
    • 情况B(没人在等):如果等待队列是空的,管理员就把这张票放回自己手里的票堆,让可用票数加1(计数值++)。

signal() 的核心目的是释放资源的访问权,并通知等待的任务。


其他相关操作

除了核心的 takegive,通常还会有:

  • 初始化:在游乐场开门前,设定这个设施初始有多少张票可用(设置信号量的初始计数值)。

    • 初始值 = 1:这就是我们常说的互斥锁,因为这个设施一次只允许一个人进去(比如一个单人间鬼屋)。
    • 初始值 = N (N>1):这称为计数信号量,用于控制访问数量有限的资源池(比如有3辆的碰碰车场,有10个连接的数据库连接池)。
  • 非阻塞等待tryWait()

    • 你走过去问管理员:“有票吗?立刻告诉我!我不想排队。”
    • 如果没票,他不会让你排队,而是直接告诉你:“没票!”,然后你就可以转身去玩别的了(函数立即返回错误码)。
  • 带超时的等待timedWait()

    • 你走过去说:“我要一张票,但我只排5分钟的队。”
    • 如果5分钟内有人还票,你就能进去。
    • 如果5分钟到了还没票,你就不排了,自己离开(任务从阻塞状态超时返回)。

总结表格

操作 编程函数名 形象化动作 对计数值的影响 对任务的影响
初始化 sem_init(&sem, N) 管理员准备N张门票 设置为N
申请资源 sem_wait() “我要一张票!” 如果有票,则减1 如果没票,就排队阻塞
释放资源 sem_post() “我出来了,还票!” 如果没人等,则加1
如果有人等,不变
叫醒一个排队的人
尝试申请 sem_trywait() “有现票吗?没有我走了” 有票则减1,无票不变 不阻塞,立即返回
限时申请 sem_timedwait() “我排5分钟队,等不到就走” 超时内等到票则减1 超时则结束阻塞

二进制信号量详解

二进制信号量特性

二进制信号量是最简单的信号量类型: - 计数值范围:只有0和1两种状态 - 行为模式:类似于锁机制 - 应用场景:任务同步、互斥访问

状态说明: - 1:信号量可用,任务可以获取 - 0:信号量不可用,任务需要等待

创建二进制信号量

xSemaphoreCreateBinary

SemaphoreHandle_t xSemaphoreCreateBinary(void);

重要特性: - 创建后初始状态为0(不可用) - 需要手动调用xSemaphoreGive使其可用 - 适用于动态内存分配

创建示例

// 创建二进制信号量
SemaphoreHandle_t xBinarySemaphore;

void vInitSemaphores(void) {
    xBinarySemaphore = xSemaphoreCreateBinary();

    if(xBinarySemaphore != NULL) {
        // 创建成功后立即使其可用
        xSemaphoreGive(xBinarySemaphore);
        printf("Binary semaphore created and made available\n");
    } else {
        printf("Failed to create binary semaphore\n");
    }
}

xSemaphoreCreateBinaryStatic

静态创建二进制信号量:

SemaphoreHandle_t xSemaphoreCreateBinaryStatic(StaticSemaphore_t *pxSemaphoreBuffer);

静态创建示例

// 静态分配信号量控制块
static StaticSemaphore_t xBinarySemaphoreBuffer;

void vInitStaticSemaphores(void) {
    // 静态创建二进制信号量
    SemaphoreHandle_t xStaticBinarySemaphore;
    xStaticBinarySemaphore = xSemaphoreCreateBinaryStatic(&xBinarySemaphoreBuffer);

    if(xStaticBinarySemaphore != NULL) {
        xSemaphoreGive(xStaticBinarySemaphore);
        printf("Static binary semaphore created successfully\n");
    }
}

计数信号量详解

计数信号量特性

计数信号量用于管理多个相同的资源: - 计数值范围:0到最大计数值 - 资源管理:计数值表示可用资源数量 - 应用场景:资源池管理、限流控制

创建计数信号量

xSemaphoreCreateCounting

SemaphoreHandle_t xSemaphoreCreateCounting(UBaseType_t uxMaxCount,
                                         UBaseType_t uxInitialCount);

参数说明: - uxMaxCount:信号量最大计数值 - uxInitialCount:信号量初始计数值

创建示例

// 创建各种计数信号量示例
SemaphoreHandle_t xParkingSemaphore;      // 停车场信号量
SemaphoreHandle_t xBufferSemaphore;       // 缓冲区信号量
SemaphoreHandle_t xConnectionSemaphore;   // 连接数信号量

void vCreateCountingSemaphores(void) {
    // 停车场:10个车位,初始全部空着
    xParkingSemaphore = xSemaphoreCreateCounting(10, 10);

    // 缓冲区:最多5个缓冲块,初始有3个可用
    xBufferSemaphore = xSemaphoreCreateCounting(5, 3);

    // 数据库连接:最多8个连接,初始全部可用
    xConnectionSemaphore = xSemaphoreCreateCounting(8, 8);

    // 检查创建结果
    if(xParkingSemaphore == NULL || xBufferSemaphore == NULL || xConnectionSemaphore == NULL) {
        printf("Error: Failed to create counting semaphores\n");
    } else {
        printf("All counting semaphores created successfully\n");
    }
}

xSemaphoreCreateCountingStatic

静态创建计数信号量:

SemaphoreHandle_t xSemaphoreCreateCountingStatic(UBaseType_t uxMaxCount,
                                                UBaseType_t uxInitialCount,
                                                StaticSemaphore_t *pxSemaphoreBuffer);

信号量基本操作函数

xSemaphoreTake - 获取信号量

BaseType_t xSemaphoreTake(SemaphoreHandle_t xSemaphore, 
                         TickType_t xTicksToWait);

参数说明: - xSemaphore:信号量句柄 - xTicksToWait:阻塞超时时间

阻塞行为: - 0:非阻塞,立即返回 - portMAX_DELAY:无限阻塞 - N:阻塞N个时钟节拍

使用示例

void vTaskUsingSemaphore(void *pvParameters) {
    TickType_t xLastWakeTime = xTaskGetTickCount();

    while(1) {
        // 尝试获取信号量,等待100ms
        if(xSemaphoreTake(xBinarySemaphore, 100 / portTICK_PERIOD_MS) == pdTRUE) {
            // 成功获取信号量,执行受保护的操作
            printf("Semaphore acquired, performing critical operation\n");
            vPerformCriticalOperation();

            // 释放信号量
            xSemaphoreGive(xBinarySemaphore);
            printf("Semaphore released\n");
        } else {
            // 获取信号量超时
            printf("Failed to acquire semaphore within timeout\n");
        }

        // 任务周期执行
        vTaskDelayUntil(&xLastWakeTime, 1000 / portTICK_PERIOD_MS);
    }
}

xSemaphoreGive - 释放信号量

任何任务都可以释放

BaseType_t xSemaphoreGive(SemaphoreHandle_t xSemaphore);

返回值: - pdTRUE:成功释放信号量 - pdFALSE:释放失败(信号量已满)

释放示例

void vProducerTask(void *pvParameters) {
    while(1) {
        // 生产数据
        vProduceData();

        // 释放信号量通知消费者
        if(xSemaphoreGive(xDataReadySemaphore) == pdTRUE) {
            printf("Signal sent to consumer\n");
        } else {
            printf("Failed to give semaphore\n");
        }

        vTaskDelay(500 / portTICK_PERIOD_MS);
    }
}

中断安全操作函数

xSemaphoreTakeFromISR

BaseType_t xSemaphoreTakeFromISR(SemaphoreHandle_t xSemaphore,
                                BaseType_t *pxHigherPriorityTaskWoken);

xSemaphoreGiveFromISR

BaseType_t xSemaphoreGiveFromISR(SemaphoreHandle_t xSemaphore,
                                BaseType_t *pxHigherPriorityTaskWoken);

中断中使用示例

// 外部中断处理函数
void EXTI0_IRQHandler(void) {
    BaseType_t xHigherPriorityTaskWoken = pdFALSE;

    // 清除中断标志
    EXTI->PR = EXTI_PR_PR0;

    // 在中断中释放信号量
    if(xSemaphoreGiveFromISR(xInterruptSemaphore, &xHigherPriorityTaskWoken) == pdTRUE) {
        printf("Semaphore given from ISR\n");
    }

    // 必要时进行任务切换
    portYIELD_FROM_ISR(xHigherPriorityTaskWoken);
}

// 处理中断的任务
void vInterruptHandlerTask(void *pvParameters) {
    while(1) {
        // 等待中断信号
        if(xSemaphoreTake(xInterruptSemaphore, portMAX_DELAY) == pdTRUE) {
            printf("Interrupt received, processing...\n");
            vProcessInterrupt();
        }
    }
}

信号量删除函数

vSemaphoreDelete

void vSemaphoreDelete(SemaphoreHandle_t xSemaphore);

删除信号量示例

void vCleanupResources(void) {
    // 删除不再使用的信号量
    if(xBinarySemaphore != NULL) {
        vSemaphoreDelete(xBinarySemaphore);
        xBinarySemaphore = NULL;
        printf("Binary semaphore deleted\n");
    }

    if(xCountingSemaphore != NULL) {
        vSemaphoreDelete(xCountingSemaphore);
        xCountingSemaphore = NULL;
        printf("Counting semaphore deleted\n");
    }

    if(xMutex != NULL) {
        vSemaphoreDelete(xMutex);
        xMutex = NULL;
        printf("Mutex deleted\n");
    }
}

信号量查询函数

uxSemaphoreGetCount - 获取信号量计数

UBaseType_t uxSemaphoreGetCount(SemaphoreHandle_t xSemaphore);

查询示例

void vMonitorSemaphoreStatus(void) {
    UBaseType_t uxBinaryCount, uxCountingCount, uxMutexCount;

    // 获取各种信号量的当前计数值
    uxBinaryCount = uxSemaphoreGetCount(xBinarySemaphore);
    uxCountingCount = uxSemaphoreGetCount(xCountingSemaphore);
    uxMutexCount = uxSemaphoreGetCount(xMutex);

    printf("Semaphore Status:\n");
    printf("  Binary: %lu\n", uxBinaryCount);
    printf("  Counting: %lu\n", uxCountingCount);
    printf("  Mutex: %lu\n", uxMutexCount);

    // 根据状态采取行动
    if(uxCountingCount == 0) {
        printf("Warning: No resources available in counting semaphore!\n");
    }
}

信号量综合实验案例:停车场管理系统

#include "FreeRTOS.h"
#include "task.h"
#include "semphr.h"
#include "stdio.h"
#include "string.h"

// 系统信号量定义
SemaphoreHandle_t xParkingSpaces;      // 停车位计数信号量
SemaphoreHandle_t xEntryGate;          // 入口闸机二进制信号量
SemaphoreHandle_t xExitGate;           // 出口闸机二进制信号量
SemaphoreHandle_t xDisplayMutex;       // 显示屏互斥量

// 系统状态变量
static uint32_t ulCarsInParking = 0;
static uint32_t ulTotalCarsServed = 0;

// 显示屏输出函数(受互斥量保护)
void vSafePrintf(const char *format, ...) {
    va_list args;

    // 获取显示屏互斥量
    if(xSemaphoreTake(xDisplayMutex, 100 / portTICK_PERIOD_MS) == pdTRUE) {
        va_start(args, format);
        vprintf(format, args);
        va_end(args);

        // 释放互斥量
        xSemaphoreGive(xDisplayMutex);
    }
}

// 车辆入场任务
void vCarEntryTask(void *pvParameters) {
    uint8_t car_id = (uint8_t)pvParameters;
    char car_plate[10];

    // 生成车牌号
    snprintf(car_plate, sizeof(car_plate), "CAR%03d", car_id);

    vSafePrintf("[ENTRY] Car %s arrived at entrance\n", car_plate);

    while(1) {
        // 等待入口闸机可用(二进制信号量)
        if(xSemaphoreTake(xEntryGate, 2000 / portTICK_PERIOD_MS) == pdTRUE) {
            vSafePrintf("[ENTRY] Car %s is at gate\n", car_plate);

            // 尝试获取停车位(计数信号量)
            if(xSemaphoreTake(xParkingSpaces, 1000 / portTICK_PERIOD_MS) == pdTRUE) {
                // 成功获取停车位
                ulCarsInParking++;
                ulTotalCarsServed++;

                vSafePrintf("[ENTRY] Car %s entered. Parking: %lu/%lu, Total: %lu\n",
                           car_plate, ulCarsInParking, 
                           uxSemaphoreGetCount(xParkingSpaces) + ulCarsInParking,
                           ulTotalCarsServed);

                // 释放入口闸机
                xSemaphoreGive(xEntryGate);

                // 模拟停车时间(1-10秒)
                vTaskDelay((1000 + (rand() % 9000)) / portTICK_PERIOD_MS);

                // 车辆准备离开
                vSafePrintf("[EXIT]  Car %s preparing to leave\n", car_plate);

                // 等待出口闸机
                if(xSemaphoreTake(xExitGate, 2000 / portTICK_PERIOD_MS) == pdTRUE) {
                    // 释放停车位
                    xSemaphoreGive(xParkingSpaces);
                    ulCarsInParking--;

                    vSafePrintf("[EXIT]  Car %s left. Parking: %lu/%lu\n",
                               car_plate, ulCarsInParking,
                               uxSemaphoreGetCount(xParkingSpaces) + ulCarsInParking);

                    // 释放出口闸机
                    xSemaphoreGive(xExitGate);
                }

            } else {
                // 没有停车位可用
                vSafePrintf("[ENTRY] Car %s left - no parking space available\n", car_plate);
                xSemaphoreGive(xEntryGate);
            }
        } else {
            vSafePrintf("[ENTRY] Car %s gave up waiting for gate\n", car_plate);
        }

        // 车辆离开后等待一段时间再回来
        vTaskDelay((5000 + (rand() % 10000)) / portTICK_PERIOD_MS);
        vSafePrintf("[ENTRY] Car %s arrived again\n", car_plate);
    }
}

// 停车场监控任务
void vParkingMonitorTask(void *pvParameters) {
    UBaseType_t uxAvailableSpaces;
    UBaseType_t uxMaxSpaces = 10;  // 假设总共10个车位

    vSafePrintf("[MONITOR] Parking monitor started\n");

    while(1) {
        // 获取当前可用车位数量
        uxAvailableSpaces = uxSemaphoreGetCount(xParkingSpaces);

        vSafePrintf("\n=== Parking Status ===\n");
        vSafePrintf("Available spaces: %lu\n", uxAvailableSpaces);
        vSafePrintf("Cars in parking:  %lu\n", ulCarsInParking);
        vSafePrintf("Total served:     %lu\n", ulTotalCarsServed);
        vSafePrintf("Utilization:      %.1f%%\n", 
                   (float)(uxMaxSpaces - uxAvailableSpaces) / uxMaxSpaces * 100);

        // 状态警告
        if(uxAvailableSpaces == 0) {
            vSafePrintf("WARNING: Parking lot FULL!\n");
        } else if(uxAvailableSpaces <= 2) {
            vSafePrintf("WARNING: Parking lot almost full\n");
        }

        vSafePrintf("=====================\n\n");

        // 每5秒监控一次
        vTaskDelay(5000 / portTICK_PERIOD_MS);
    }
}

// 紧急车辆优先任务
void vEmergencyVehicleTask(void *pvParameters) {
    vSafePrintf("[EMERGENCY] Emergency vehicle service started\n");

    while(1) {
        // 随机模拟紧急车辆到达
        vTaskDelay((30000 + (rand() % 30000)) / portTICK_PERIOD_MS);

        vSafePrintf("[EMERGENCY] Emergency vehicle arriving!\n");

        // 紧急车辆优先入场
        if(xSemaphoreTake(xEntryGate, portMAX_DELAY) == pdTRUE) {
            vSafePrintf("[EMERGENCY] Emergency vehicle entering\n");

            // 强制获取停车位(可能等待)
            if(xSemaphoreTake(xParkingSpaces, portMAX_DELAY) == pdTRUE) {
                ulCarsInParking++;
                ulTotalCarsServed++;

                vSafePrintf("[EMERGENCY] Emergency vehicle parked\n");

                // 释放入口闸机
                xSemaphoreGive(xEntryGate);

                // 紧急处理时间较短
                vTaskDelay(2000 / portTICK_PERIOD_MS);

                // 紧急离开
                if(xSemaphoreTake(xExitGate, portMAX_DELAY) == pdTRUE) {
                    xSemaphoreGive(xParkingSpaces);
                    ulCarsInParking--;

                    vSafePrintf("[EMERGENCY] Emergency vehicle departed\n");
                    xSemaphoreGive(xExitGate);
                }
            }
        }
    }
}

// 系统初始化函数
void vInitializeParkingSystem(void) {
    printf("Initializing Parking Management System...\n");

    // 创建计数信号量:10个停车位,初始全部可用
    xParkingSpaces = xSemaphoreCreateCounting(10, 10);

    // 创建二进制信号量:入口和出口闸机,初始可用
    xEntryGate = xSemaphoreCreateBinary();
    xExitGate = xSemaphoreCreateBinary();

    // 创建互斥量:保护显示屏输出
    xDisplayMutex = xSemaphoreCreateMutex();

    // 检查信号量创建结果
    if(xParkingSpaces == NULL || xEntryGate == NULL || 
       xExitGate == NULL || xDisplayMutex == NULL) {
        printf("ERROR: Failed to create semaphores!\n");
        while(1); // 系统停止
    }

    // 使二进制信号量初始可用
    xSemaphoreGive(xEntryGate);
    xSemaphoreGive(xExitGate);

    printf("Parking system initialized successfully\n");
    printf("Total parking spaces: 10\n");
}

// 主函数
int main(void) {
    // 初始化停车场系统
    vInitializeParkingSystem();

    // 创建车辆任务(5辆常规车辆)
    for(int i = 1; i <= 5; i++) {
        xTaskCreate(vCarEntryTask, "CarEntry", 1024, (void*)i, 2, NULL);
    }

    // 创建监控任务
    xTaskCreate(vParkingMonitorTask, "Monitor", 1024, NULL, 1, NULL);

    // 创建紧急车辆任务(较高优先级)
    xTaskCreate(vEmergencyVehicleTask, "Emergency", 1024, NULL, 3, NULL);

    printf("Starting FreeRTOS scheduler...\n");
    vTaskStartScheduler();

    // 如果调度器启动失败
    while(1) {
        printf("ERROR: Scheduler failed to start!\n");
    }

    return 0;
}

信号量使用最佳实践

信号量类型选择指南

二进制信号量适用场景: - 任务间同步 - 简单的互斥保护 - 事件通知机制

计数信号量适用场景: - 资源池管理(连接池、缓冲区等) - 限流控制 - 生产者-消费者模式中的资源计数

互斥量适用场景: - 共享资源保护 - 可能发生优先级反转的情况 - 需要递归访问的共享资源

错误处理策略

获取超时处理

BaseType_t xResult = xSemaphoreTake(xSemaphore, timeout);
if(xResult != pdTRUE) {
    // 处理策略:重试、使用备用方案、报告错误等
    handle_semaphore_timeout();
}

信号量创建检查

xSemaphore = xSemaphoreCreateBinary();
if(xSemaphore == NULL) {
    // 内存不足处理
    handle_memory_allocation_failure();
}

性能优化建议

  1. 合理设置超时:避免任务永久阻塞
  2. 使用静态分配:在确定性系统中减少动态分配
  3. 避免信号量滥用:简单的标志位可以使用任务通知替代
  4. 注意优先级安排:防止优先级反转和死锁

优先级反转

这是一个在实时系统或多任务系统中非常重要且经典的问题。

核心概念:什么是优先级反转?

优先级反转是指:在高优先级任务等待一个被低优先级任务占有的资源时,被一个中间优先级的任务抢占,从而导致高优先级任务被迫等待无限期延长的现象。

简单来说,就是高优先级任务“卡住了”,反而让中优先级任务先运行,这严重违背了优先级调度机制的初衷。


一个经典的优先级反转场景(举例说明)

假设一个系统中有三个任务,优先级从高到低排列:

  • 任务H:高优先级
  • 任务M:中优先级
  • 任务L:低优先级

系统中有一个资源R(例如一个打印机、一段共享内存),由信号量S 来保护。

问题发生的步骤:

  1. 时刻1:任务L开始运行,并成功获取了信号量S,开始使用资源R。
  2. 时刻2:任务H就绪。由于它的优先级最高,系统会抢占任务L,开始运行任务H。
  3. 时刻3:任务H在运行过程中,也尝试去获取信号量S,以使用资源R。
    • 但此时信号量S已经被任务L持有。
    • 因此,任务H被阻塞,进入等待状态,并释放CPU。
  4. 时刻4:系统需要调度一个新任务。此时,任务L(持有信号量)和任务M(就绪)都在等待CPU。
    • 按照优先级规则,任务M的优先级高于任务L。
    • 因此,系统调度任务M开始运行。(这是问题的关键!)
  5. 时刻5:任务M开始长时间运行(例如,一个大的计算循环),因为它不需要资源R,所以它不会释放信号量S。
  6. 结果
    • 任务L(持有信号量S)无法运行,因为它被任务M抢占了。
    • 任务L无法运行,就无法释放信号量S。
    • 任务H(高优先级)虽然在等待信号量S,但它只能干等着任务L释放信号量。
    • 而任务L又在等着任务M释放CPU。

最终形成了一个尴尬的链条:H 在等 LL 在等 M,而 MHL 都无关。 高优先级任务H的行为,实际上被中优先级任务M“绑架”了。这就是优先级反转。


优先级反转的危害

  • 破坏实时性:在严格的实时系统中,高优先级任务必须在规定的时间内完成。优先级反转会导致其响应时间无法预测,甚至无限期延迟,可能导致系统崩溃或严重故障。
  • 逻辑错误:在普通系统中,会导致性能下降、死锁-like的现象,难以调试。

解决方案

为了解决优先级反转问题,工程师们提出了几种有效的方案:

方案一:优先级继承

这是最常用的解决方案。

  • 核心思想:当一个高优先级任务H等待一个被低优先级任务L占有的信号量时,临时将任务L的优先级提升到与任务H相同。(提拔)
  • 如何工作
    • 在上面的例子中,当任务H在时刻3尝试获取信号量失败时,系统会立即将任务L的优先级提升到与任务H一样高。
    • 这样,在时刻4进行调度时,任务L的优先级(已被临时提升为高)就高于任务M。
    • 因此,CPU会立刻分配给任务L,让它继续执行。
    • 任务L得以快速执行完临界区代码,释放信号量S。
    • 一旦任务L释放了信号量:
      1. 它的优先级会恢复为原来的低优先级。
      2. 等待该信号量的任务H立即获取到信号量,并因其本身的高优先级而开始运行。
  • 效果:有效打破了“H等L,L被M阻塞”的链条,将反转的影响降到最低。

方案二:优先级天花板

这是一个更激进、也更确定的方案。

  • 核心思想:为每个信号量预设一个“天花板”优先级,这个优先级通常等于所有可能访问该信号量的任务中的最高优先级。当一个任务成功获取该信号量时,它的优先级会自动被提升到这个天花板优先级
  • 如何工作
    • 在例子中,假设信号量S的天花板优先级被设为任务H的优先级。
    • 在时刻1,任务L一获取信号量S,它的优先级立刻被提升到天花板优先级(即H的优先级)。
    • 这样,在时刻2,任务H就绪时,它无法抢占任务L,因为此时它们优先级相同(或者根据具体实现,可能仍由L继续运行)。
    • 任务L会不受干扰地、快速地运行完临界区代码,然后释放信号量,同时优先级恢复原状。
    • 之后,任务H才能抢占并运行。
  • 效果:它甚至防止了优先级反转的发生,而不是等发生了再去解决。但它可能导致不必要的优先级提升,稍微降低系统灵活性。

总结

特性 问题:优先级反转 解决方案1:优先级继承 解决方案2:优先级天花板
核心思想 高优先级任务被中优先级任务间接阻塞 临时提升低优先级任务的优先级 在持有锁时直接提升到预设最高优先级
行为时机 自然发生 发生阻塞时触发提升 获取信号量时立即提升
优点 - 动态、高效、资源占用少 能防止死锁,行为更确定
缺点 破坏系统实时性,难以调试 实现稍复杂,存在链式继承问题 可能造成不必要的优先级提升

更多请见Mutex

常见问题与解决方案

死锁预防: - 按照固定顺序获取多个信号量 - 设置合理的超时时间 - 使用死锁检测机制