当前位置:首页 > 技术 > 正文内容

Linux信号量:多线程同步的核心机制与实践

访客 技术 2026年5月29日 1

信号量是Linux多线程编程中不可或缺的同步原语,它通过计数器机制协调多个线程对共享资源的访问。POSIX标准定义了完整的信号量操作接口,涵盖初始化、等待、通知、查询和销毁等全生命周期管理。本文系统梳理这些接口的用法,并通过实际案例展示如何构建高可靠的并发程序。

信号量的核心概念

信号量本质上是一个非负整数计数器,配合两个原子操作实现同步:

  • P操作(等待):计数器减1,若结果为负则阻塞
  • V操作(通知):计数器加1,唤醒等待的线程

Linux支持两种信号量形式:

类型存储位置适用场景管理接口
无名信号量进程内存空间线程间同步sem_init/sem_destroy
命名信号量内核持久化进程间同步sem_open/sem_close/sem_unlink

下文聚焦于无名信号量的使用,这是多线程开发中最常见的场景。

接口详解与代码示例

sem_init:信号量初始化

#include <semaphore.h>

int sem_init(sem_t *sem, int shared, unsigned int value);

参数说明:

  • sem:信号量对象的存储地址
  • shared:共享范围,0表示线程间共享,非0需配合共享内存用于进程间
  • value:计数器初始值,决定可同时访问资源的线程数
sem_t resource_sem;
if (sem_init(&resource_sem, 0, 5) != 0) {
    perror("信号量初始化失败");
    exit(EXIT_FAILURE);
}

sem_wait:阻塞等待资源

int sem_wait(sem_t *sem);

该函数原子性地将信号量值减1。若当前值大于0,立即返回;否则线程进入睡眠状态,直到有其他线程执行sem_post唤醒。

// 生产者-消费者模型中的消费者逻辑
void* consumer_routine(void* param) {
    buffer_t* buf = (buffer_t*)param;
    
    while (running) {
        sem_wait(&buf->items_ready);  // 等待数据可用
        pthread_mutex_lock(&buf->lock);
        
        item_t data = remove_item(buf);
        process_data(data);
        
        pthread_mutex_unlock(&buf->lock);
        sem_post(&buf->slots_free);   // 通知有空位
    }
    return NULL;
}

sem_trywait:非阻塞尝试

int sem_trywait(sem_t *sem);

立即返回,不会阻塞。成功时返回0并减1;资源不可用时返回-1并设置errnoEAGAIN

// 实现忙时快速失败的策略
if (sem_trywait(&connection_pool) == 0) {
    use_connection();
    sem_post(&connection_pool);
} else {
    // 连接池耗尽,记录日志并返回友好提示
    log_warning("连接池已满,请求被拒绝");
    return SERVICE_UNAVAILABLE;
}

sem_timedwait:限时等待

int sem_timedwait(sem_t *sem, const struct timespec *abs_time);

关键特性:超时参数是绝对时间(自Epoch起的秒和纳秒),而非相对时长。

bool acquire_with_deadline(sem_t* sem, int timeout_ms) {
    struct timespec deadline;
    clock_gettime(CLOCK_REALTIME, &deadline);
    
    // 转换为绝对时间
    long sec = timeout_ms / 1000;
    long nsec = (timeout_ms % 1000) * 1000000;
    
    deadline.tv_sec += sec;
    deadline.tv_nsec += nsec;
    if (deadline.tv_nsec >= 1000000000) {
        deadline.tv_sec++;
        deadline.tv_nsec -= 1000000000;
    }
    
    int rc = sem_timedwait(sem, &deadline);
    if (rc == -1) {
        if (errno == ETIMEDOUT) {
            return false;  // 明确区分超时与其他错误
        }
        perror("sem_timedwait");
    }
    return rc == 0;
}

sem_post:释放资源通知

int sem_post(sem_t *sem);

该操作总是成功返回(POSIX保证),将信号量值加1。若有等待线程,唤醒其中一个;无等待线程时单纯增加计数器。

// 异步任务完成回调
void on_task_finished(task_result_t* result) {
    // 线程安全,可在信号处理函数中调用
    sem_post(&result->completion_notify);
}

sem_getvalue:查询当前状态

int sem_getvalue(sem_t *sem, int *sval);

获取的值可能瞬间失效,仅适用于监控和调试,不能用于同步决策。

void dump_semaphore_status(sem_t* sem, const char* name) {
    int current;
    sem_getvalue(sem, ¤t);
    printf("[%s] 当前值: %d (负值表示|%d|个等待线程)\n", 
           name, current, current < 0 ? -current : 0);
}

sem_destroy:销毁清理

int sem_destroy(sem_t *sem);

前置条件:无线程正在等待该信号量。销毁后对象不可再用,除非重新初始化。

综合实战:HTTP请求限流器

实现基于令牌桶算法的请求限流器,控制并发请求数量并支持优雅降级。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <time.h>
#include <pthread.h>
#include <semaphore.h>
#include <errno.h>

#define MAX_CONCURRENT 4
#define QUEUE_DEPTH 20
#define REQUEST_TIMEOUT_MS 3000

typedef enum {
    REQ_NORMAL = 0,
    REQ_PRIORITY = 1,
    REQ_BACKGROUND = 2
} req_priority_t;

typedef struct {
    int req_id;
    req_priority_t level;
    struct timespec enqueue_time;
    char endpoint[128];
} http_request_t;

typedef struct {
    http_request_t queue[QUEUE_DEPTH];
    int head, tail;
    int count;
    
    sem_t capacity;      // 队列空位
    sem_t urgent_items;  // 高优先级请求
    sem_t normal_items;  // 普通请求
    sem_t bg_items;      // 后台请求
    sem_t mutex;         // 保护队列结构
} request_queue_t;

static request_queue_t g_queue;
static sem_t g_worker_slots;
static volatile int g_processed = 0;
static volatile int g_dropped = 0;

void queue_init(void) {
    memset(&g_queue, 0, sizeof(g_queue));
    sem_init(&g_queue.capacity, 0, QUEUE_DEPTH);
    sem_init(&g_queue.urgent_items, 0, 0);
    sem_init(&g_queue.normal_items, 0, 0);
    sem_init(&g_queue.bg_items, 0, 0);
    sem_init(&g_queue.mutex, 0, 1);
    sem_init(&g_worker_slots, 0, MAX_CONCURRENT);
}

void queue_destroy(void) {
    sem_destroy(&g_queue.capacity);
    sem_destroy(&g_queue.urgent_items);
    sem_destroy(&g_queue.normal_items);
    sem_destroy(&g_queue.bg_items);
    sem_destroy(&g_queue.mutex);
    sem_destroy(&g_worker_slots);
}

bool enqueue_request(http_request_t* req) {
    struct timespec deadline;
    clock_gettime(CLOCK_REALTIME, &deadline);
    deadline.tv_nsec += 500000000;  // 500ms入队超时
    
    if (sem_timedwait(&g_queue.capacity, &deadline) != 0) {
        g_dropped++;
        return false;
    }
    
    sem_wait(&g_queue.mutex);
    
    g_queue.queue[g_queue.tail] = *req;
    g_queue.tail = (g_queue.tail + 1) % QUEUE_DEPTH;
    g_queue.count++;
    
    sem_post(&g_queue.mutex);
    
    // 按优先级通知对应等待者
    switch (req->level) {
        case REQ_PRIORITY:  sem_post(&g_queue.urgent_items); break;
        case REQ_NORMAL:    sem_post(&g_queue.normal_items); break;
        case REQ_BACKGROUND: sem_post(&g_queue.bg_items); break;
    }
    return true;
}

bool dequeue_request(http_request_t* out) {
    // 优先级:紧急 > 普通 > 后台
    struct timespec deadline;
    clock_gettime(CLOCK_REALTIME, &deadline);
    deadline.tv_sec += 2;  // 2秒等待高优先级
    
    sem_t* selected = NULL;
    
    if (sem_timedwait(&g_queue.urgent_items, &deadline) == 0) {
        selected = &g_queue.urgent_items;
    } else if (sem_trywait(&g_queue.normal_items) == 0) {
        selected = &g_queue.normal_items;
    } else if (sem_trywait(&g_queue.bg_items) == 0) {
        selected = &g_queue.bg_items;
    } else {
        return false;  // 无可用请求
    }
    
    sem_wait(&g_queue.mutex);
    
    *out = g_queue.queue[g_queue.head];
    g_queue.head = (g_queue.head + 1) % QUEUE_DEPTH;
    g_queue.count--;
    
    sem_post(&g_queue.mutex);
    sem_post(&g_queue.capacity);
    
    (void)selected;  // 标记已使用
    return true;
}

void* request_generator(void* arg) {
    const char* endpoints[] = {"/api/users", "/api/orders", "/api/stats", "/api/export"};
    int id = 0;
    
    for (int batch = 0; batch < 5; batch++) {
        for (int i = 0; i < 8; i++) {
            http_request_t req;
            req.req_id = ++id;
            req.level = (i % 4 == 0) ? REQ_PRIORITY : 
                       (i % 4 == 3) ? REQ_BACKGROUND : REQ_NORMAL;
            clock_gettime(CLOCK_REALTIME, &req.enqueue_time);
            snprintf(req.endpoint, sizeof(req.endpoint), "%s", endpoints[i % 4]);
            
            if (!enqueue_request(&req)) {
                printf("生成器: 请求%d入队失败(队列满)\n", req.req_id);
            } else {
                printf("生成器: 请求%d[%s]已入队\n", req.req_id,
                       req.level == REQ_PRIORITY ? "紧急" :
                       req.level == REQ_BACKGROUND ? "后台" : "普通");
            }
            usleep(200000);  // 200ms间隔
        }
        sleep(1);
    }
    return NULL;
}

void* worker_handler(void* arg) {
    int wid = *(int*)arg;
    
    while (1) {
        http_request_t req;
        if (!dequeue_request(&req)) {
            // 检查是否应该退出
            if (g_processed >= 35) break;
            continue;
        }
        
        // 获取执行许可
        printf("工作者%d: 等待处理槽位 for 请求%d\n", wid, req.req_id);
        sem_wait(&g_worker_slots);
        
        // 模拟处理
        int processing_ms = 500 + (rand() % 1000);
        printf("工作者%d: 处理请求%d(%s), 预计%dms\n", 
               wid, req.req_id, req.endpoint, processing_ms);
        
        usleep(processing_ms * 1000);
        g_processed++;
        
        printf("工作者%d: 完成请求%d [总计:%d]\n", wid, req.req_id, g_processed);
        sem_post(&g_worker_slots);
    }
    
    printf("工作者%d: 退出\n", wid);
    return NULL;
}

void* status_reporter(void* arg) {
    (void)arg;
    
    for (int cycle = 0; cycle < 10; cycle++) {
        sleep(1);
        
        int cap, urgent, normal, bg, workers;
        sem_getvalue(&g_queue.capacity, &cap);
        sem_getvalue(&g_queue.urgent_items, &urgent);
        sem_getvalue(&g_queue.normal_items, &normal);
        sem_getvalue(&g_queue.bg_items, &bg);
        sem_getvalue(&g_worker_slots, &workers);
        
        printf("\n--- 状态报告 #%d ---\n", cycle + 1);
        printf("队列: 空位%d 紧急%d 普通%d 后台%d | 工作者空闲:%d\n",
               cap, urgent, normal, bg, workers);
        printf("统计: 已处理%d 丢弃%d\n", g_processed, g_dropped);
    }
    return NULL;
}

int main(void) {
    srand(time(NULL));
    queue_init();
    
    pthread_t generator, reporter;
    pthread_t workers[3];
    int wids[3] = {1, 2, 3};
    
    printf("=== HTTP限流器启动 ===\n");
    printf("配置: %d并发槽, %d队列深度\n\n", MAX_CONCURRENT, QUEUE_DEPTH);
    
    pthread_create(&generator, NULL, request_generator, NULL);
    for (int i = 0; i < 3; i++) {
        pthread_create(&workers[i], NULL, worker_handler, &wids[i]);
    }
    pthread_create(&reporter, NULL, status_reporter, NULL);
    
    pthread_join(generator, NULL);
    for (int i = 0; i < 3; i++) pthread_join(workers[i], NULL);
    pthread_join(reporter, NULL);
    
    queue_destroy();
    printf("\n=== 系统关闭 ===\n");
    return 0;
}

常见陷阱与规避策略

绝对时间与相对时间混淆

sem_timedwait要求绝对时间戳,常见错误是直接传入毫秒数:

// 错误示范
struct timespec wrong = {0, 500000000};  // 被当作1970-01-01 00:00:00.5
sem_timedwait(&sem, &wrong);  // 立即超时!

// 正确做法
struct timespec correct;
clock_gettime(CLOCK_REALTIME, &correct);
correct.tv_sec += 0;
correct.tv_nsec += 500000000;
if (correct.tv_nsec >= 1000000000) {
    correct.tv_sec++;
    correct.tv_nsec -= 1000000000;
}

信号量值负数的含义

sem_getvalue返回负值时,其绝对值表示正在等待的线程数,而非可用资源数。某些实现可能将负值截断为0,不可依赖此特性做精确判断。

销毁时的竞态条件

确保销毁前所有等待线程已唤醒并完成操作:

void safe_destroy_system(void) {
    // 1. 停止新任务提交
    shutdown_flag = 1;
    
    // 2. 唤醒所有等待线程(广播机制)
    for (int i = 0; i < MAX_WAITERS; i++) {
        sem_post(&wakeup_sem);
    }
    
    // 3. 等待工作者完成
    pthread_join(worker_tid, NULL);
    
    // 4. 安全销毁
    sem_destroy(&resource_sem);
}

性能优化建议

  • 减少系统调用:高频场景下,先用sem_trywait快速路径,失败再转sem_wait
  • 避免 thundering herd:单生产者多消费者时,sem_post只唤醒一个线程,天然缓解惊群
  • 缓存行对齐:频繁访问的信号量变量按64字节对齐,避免伪共享
  • 超时粒度:监控类超时建议≥100ms,避免频繁时钟查询开销

替代方案对比

机制适用场景优势劣势
信号量资源计数、限流轻量、可跨进程功能单一
互斥锁+条件变量复杂状态同步灵活、可组合代码复杂度高
读写锁读多写少读并发写饥饿风险
RCU读极多写极少读零开销实现复杂
无锁结构极致性能无阻塞设计难度大

信号量以其简洁的语义和可靠的实现,仍是多线程同步的基础工具。理解其内部计数器模型,合理组合阻塞与非阻塞接口,能够在保证正确性的同时获得优秀的性能表现。

相关文章

Linux crontab 详解

1) crontab 是什么cron 是 Linux 的定时任务守护进程;crontab 是用来编辑/查看“按时间周期执行命令”的表(cron table)。常见两类:用户 crontab:每个用户一份(crontab -e 编辑)系统级 crontab / cron.d:可指定执行用户(/etc/crontab、/etc/cron.d/*)2) crontab 时间...

富文本里可以允许的 HTML 属性

一、所有标签默认允许的安全属性(极少)class        (可选)id           (通常建议禁用)title️ 注意:id 容易被滥用做锚点注入,很多系统直接禁用class 允许的话最好只允许固定前缀(如 editor-*)二、a 标签允许属性<a href="" t...

Mac 安装 Node.js 指南

方法一:通过官网安装包(最简单,适合初学者)如果你只是想快速安装并开始使用,这是最直接的方法。访问 Node.js 官网。页面会显示两个版本:LTS (Recommended For Most Users):长期支持版,最稳定。建议选这个。Current:最新特性版,包含最新功能但可能不够稳定。下载 .pkg 安装包并运行。按照安装向导点击“下一步”即可完成。方法二:使用 Homebrew 安装(...

Dom\HTML_NO_DEFAULT_NS 的副作用:自动加闭合标签

在使用Dom\HTMLDocument时,Dom\HTML_NO_DEFAULT_NS 将禁止在解析过程中设置元素的命名空间, 此设置是为了与DOMDocument向后兼容而存在的。当使用它时,已知的一个副作用就是:自动加闭合标签例如 </img> 为什么会这样?当你使用:Dom\HTML_NO_DEFAULT_NS文档会变成 无命名空间模式,此时内部更接近 XML...

Laravel 事件和监听器创建

在 Laravel 中,使用 Artisan 命令创建 Events(事件) 和 Listeners(监听器) 是非常高效的。你可以通过以下几种方式来实现:1. 手动创建单个 Event如果你只想创建一个事件类,可以使用 make:event 命令:Bashphp artisan make:event UserRegistered执行后,文件将生成在 app/Even...

自定义域名解析神器 dnsmasq

什么是 dnsmasq?dnsmasq 是一个轻量级、功能强大的网络服务工具,专为小型和中等规模网络设计。它是一个综合的网络基础设施解决方案[1]。dnsmasq 能做什么?功能说明应用场景DNS 转发与缓存将 DNS 查询转发到上游服务器(ISP、Google DNS 等),并在本地缓存结果加快 DNS 查询速度,减少外部 DNS 流量本地 DNS解析本地网络设备的主机名,无需编辑&n...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。