Linux信号量:多线程同步的核心机制与实践
信号量是Linux多线程编程中不可或缺的同步原语,它通过计数器机制协调多个线程对共享资源的访问。POSIX标准定义了完整的信号量操作接口,涵盖初始化、等待、通知、查询和销毁等全生命周期管理。本文系统梳理这些接口的用法,并通过实际案例展示如何构建高可靠的并发程序。
信号量的核心概念
信号量本质上是一个非负整数计数器,配合两个原子操作实现同步:
- P操作(等待):计数器减1,若结果为负则阻塞
- V操作(通知):计数器加1,唤醒等待的线程
Linux支持两种信号量形式:
| 类型 | 存储位置 | 适用场景 | 管理接口 |
|---|---|---|---|
| 无名信号量 | 进程内存空间 | 线程间同步 | sem_init/sem_destroy |
| 命名信号量 | 内核持久化 | 进程间同步 | sem_open/sem_close/sem_unlink |
下文聚焦于无名信号量的使用,这是多线程开发中最常见的场景。
接口详解与代码示例
sem_init:信号量初始化
#include <semaphore.h>
int sem_init(sem_t *sem, int shared, unsigned int value);
参数说明:
sem:信号量对象的存储地址shared:共享范围,0表示线程间共享,非0需配合共享内存用于进程间value:计数器初始值,决定可同时访问资源的线程数
sem_t resource_sem;
if (sem_init(&resource_sem, 0, 5) != 0) {
perror("信号量初始化失败");
exit(EXIT_FAILURE);
}
sem_wait:阻塞等待资源
int sem_wait(sem_t *sem);
该函数原子性地将信号量值减1。若当前值大于0,立即返回;否则线程进入睡眠状态,直到有其他线程执行sem_post唤醒。
// 生产者-消费者模型中的消费者逻辑
void* consumer_routine(void* param) {
buffer_t* buf = (buffer_t*)param;
while (running) {
sem_wait(&buf->items_ready); // 等待数据可用
pthread_mutex_lock(&buf->lock);
item_t data = remove_item(buf);
process_data(data);
pthread_mutex_unlock(&buf->lock);
sem_post(&buf->slots_free); // 通知有空位
}
return NULL;
}
sem_trywait:非阻塞尝试
int sem_trywait(sem_t *sem);
立即返回,不会阻塞。成功时返回0并减1;资源不可用时返回-1并设置errno为EAGAIN。
// 实现忙时快速失败的策略
if (sem_trywait(&connection_pool) == 0) {
use_connection();
sem_post(&connection_pool);
} else {
// 连接池耗尽,记录日志并返回友好提示
log_warning("连接池已满,请求被拒绝");
return SERVICE_UNAVAILABLE;
}
sem_timedwait:限时等待
int sem_timedwait(sem_t *sem, const struct timespec *abs_time);
关键特性:超时参数是绝对时间(自Epoch起的秒和纳秒),而非相对时长。
bool acquire_with_deadline(sem_t* sem, int timeout_ms) {
struct timespec deadline;
clock_gettime(CLOCK_REALTIME, &deadline);
// 转换为绝对时间
long sec = timeout_ms / 1000;
long nsec = (timeout_ms % 1000) * 1000000;
deadline.tv_sec += sec;
deadline.tv_nsec += nsec;
if (deadline.tv_nsec >= 1000000000) {
deadline.tv_sec++;
deadline.tv_nsec -= 1000000000;
}
int rc = sem_timedwait(sem, &deadline);
if (rc == -1) {
if (errno == ETIMEDOUT) {
return false; // 明确区分超时与其他错误
}
perror("sem_timedwait");
}
return rc == 0;
}
sem_post:释放资源通知
int sem_post(sem_t *sem);
该操作总是成功返回(POSIX保证),将信号量值加1。若有等待线程,唤醒其中一个;无等待线程时单纯增加计数器。
// 异步任务完成回调
void on_task_finished(task_result_t* result) {
// 线程安全,可在信号处理函数中调用
sem_post(&result->completion_notify);
}
sem_getvalue:查询当前状态
int sem_getvalue(sem_t *sem, int *sval);
获取的值可能瞬间失效,仅适用于监控和调试,不能用于同步决策。
void dump_semaphore_status(sem_t* sem, const char* name) {
int current;
sem_getvalue(sem, ¤t);
printf("[%s] 当前值: %d (负值表示|%d|个等待线程)\n",
name, current, current < 0 ? -current : 0);
}
sem_destroy:销毁清理
int sem_destroy(sem_t *sem);
前置条件:无线程正在等待该信号量。销毁后对象不可再用,除非重新初始化。
综合实战:HTTP请求限流器
实现基于令牌桶算法的请求限流器,控制并发请求数量并支持优雅降级。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <time.h>
#include <pthread.h>
#include <semaphore.h>
#include <errno.h>
#define MAX_CONCURRENT 4
#define QUEUE_DEPTH 20
#define REQUEST_TIMEOUT_MS 3000
typedef enum {
REQ_NORMAL = 0,
REQ_PRIORITY = 1,
REQ_BACKGROUND = 2
} req_priority_t;
typedef struct {
int req_id;
req_priority_t level;
struct timespec enqueue_time;
char endpoint[128];
} http_request_t;
typedef struct {
http_request_t queue[QUEUE_DEPTH];
int head, tail;
int count;
sem_t capacity; // 队列空位
sem_t urgent_items; // 高优先级请求
sem_t normal_items; // 普通请求
sem_t bg_items; // 后台请求
sem_t mutex; // 保护队列结构
} request_queue_t;
static request_queue_t g_queue;
static sem_t g_worker_slots;
static volatile int g_processed = 0;
static volatile int g_dropped = 0;
void queue_init(void) {
memset(&g_queue, 0, sizeof(g_queue));
sem_init(&g_queue.capacity, 0, QUEUE_DEPTH);
sem_init(&g_queue.urgent_items, 0, 0);
sem_init(&g_queue.normal_items, 0, 0);
sem_init(&g_queue.bg_items, 0, 0);
sem_init(&g_queue.mutex, 0, 1);
sem_init(&g_worker_slots, 0, MAX_CONCURRENT);
}
void queue_destroy(void) {
sem_destroy(&g_queue.capacity);
sem_destroy(&g_queue.urgent_items);
sem_destroy(&g_queue.normal_items);
sem_destroy(&g_queue.bg_items);
sem_destroy(&g_queue.mutex);
sem_destroy(&g_worker_slots);
}
bool enqueue_request(http_request_t* req) {
struct timespec deadline;
clock_gettime(CLOCK_REALTIME, &deadline);
deadline.tv_nsec += 500000000; // 500ms入队超时
if (sem_timedwait(&g_queue.capacity, &deadline) != 0) {
g_dropped++;
return false;
}
sem_wait(&g_queue.mutex);
g_queue.queue[g_queue.tail] = *req;
g_queue.tail = (g_queue.tail + 1) % QUEUE_DEPTH;
g_queue.count++;
sem_post(&g_queue.mutex);
// 按优先级通知对应等待者
switch (req->level) {
case REQ_PRIORITY: sem_post(&g_queue.urgent_items); break;
case REQ_NORMAL: sem_post(&g_queue.normal_items); break;
case REQ_BACKGROUND: sem_post(&g_queue.bg_items); break;
}
return true;
}
bool dequeue_request(http_request_t* out) {
// 优先级:紧急 > 普通 > 后台
struct timespec deadline;
clock_gettime(CLOCK_REALTIME, &deadline);
deadline.tv_sec += 2; // 2秒等待高优先级
sem_t* selected = NULL;
if (sem_timedwait(&g_queue.urgent_items, &deadline) == 0) {
selected = &g_queue.urgent_items;
} else if (sem_trywait(&g_queue.normal_items) == 0) {
selected = &g_queue.normal_items;
} else if (sem_trywait(&g_queue.bg_items) == 0) {
selected = &g_queue.bg_items;
} else {
return false; // 无可用请求
}
sem_wait(&g_queue.mutex);
*out = g_queue.queue[g_queue.head];
g_queue.head = (g_queue.head + 1) % QUEUE_DEPTH;
g_queue.count--;
sem_post(&g_queue.mutex);
sem_post(&g_queue.capacity);
(void)selected; // 标记已使用
return true;
}
void* request_generator(void* arg) {
const char* endpoints[] = {"/api/users", "/api/orders", "/api/stats", "/api/export"};
int id = 0;
for (int batch = 0; batch < 5; batch++) {
for (int i = 0; i < 8; i++) {
http_request_t req;
req.req_id = ++id;
req.level = (i % 4 == 0) ? REQ_PRIORITY :
(i % 4 == 3) ? REQ_BACKGROUND : REQ_NORMAL;
clock_gettime(CLOCK_REALTIME, &req.enqueue_time);
snprintf(req.endpoint, sizeof(req.endpoint), "%s", endpoints[i % 4]);
if (!enqueue_request(&req)) {
printf("生成器: 请求%d入队失败(队列满)\n", req.req_id);
} else {
printf("生成器: 请求%d[%s]已入队\n", req.req_id,
req.level == REQ_PRIORITY ? "紧急" :
req.level == REQ_BACKGROUND ? "后台" : "普通");
}
usleep(200000); // 200ms间隔
}
sleep(1);
}
return NULL;
}
void* worker_handler(void* arg) {
int wid = *(int*)arg;
while (1) {
http_request_t req;
if (!dequeue_request(&req)) {
// 检查是否应该退出
if (g_processed >= 35) break;
continue;
}
// 获取执行许可
printf("工作者%d: 等待处理槽位 for 请求%d\n", wid, req.req_id);
sem_wait(&g_worker_slots);
// 模拟处理
int processing_ms = 500 + (rand() % 1000);
printf("工作者%d: 处理请求%d(%s), 预计%dms\n",
wid, req.req_id, req.endpoint, processing_ms);
usleep(processing_ms * 1000);
g_processed++;
printf("工作者%d: 完成请求%d [总计:%d]\n", wid, req.req_id, g_processed);
sem_post(&g_worker_slots);
}
printf("工作者%d: 退出\n", wid);
return NULL;
}
void* status_reporter(void* arg) {
(void)arg;
for (int cycle = 0; cycle < 10; cycle++) {
sleep(1);
int cap, urgent, normal, bg, workers;
sem_getvalue(&g_queue.capacity, &cap);
sem_getvalue(&g_queue.urgent_items, &urgent);
sem_getvalue(&g_queue.normal_items, &normal);
sem_getvalue(&g_queue.bg_items, &bg);
sem_getvalue(&g_worker_slots, &workers);
printf("\n--- 状态报告 #%d ---\n", cycle + 1);
printf("队列: 空位%d 紧急%d 普通%d 后台%d | 工作者空闲:%d\n",
cap, urgent, normal, bg, workers);
printf("统计: 已处理%d 丢弃%d\n", g_processed, g_dropped);
}
return NULL;
}
int main(void) {
srand(time(NULL));
queue_init();
pthread_t generator, reporter;
pthread_t workers[3];
int wids[3] = {1, 2, 3};
printf("=== HTTP限流器启动 ===\n");
printf("配置: %d并发槽, %d队列深度\n\n", MAX_CONCURRENT, QUEUE_DEPTH);
pthread_create(&generator, NULL, request_generator, NULL);
for (int i = 0; i < 3; i++) {
pthread_create(&workers[i], NULL, worker_handler, &wids[i]);
}
pthread_create(&reporter, NULL, status_reporter, NULL);
pthread_join(generator, NULL);
for (int i = 0; i < 3; i++) pthread_join(workers[i], NULL);
pthread_join(reporter, NULL);
queue_destroy();
printf("\n=== 系统关闭 ===\n");
return 0;
}
常见陷阱与规避策略
绝对时间与相对时间混淆
sem_timedwait要求绝对时间戳,常见错误是直接传入毫秒数:
// 错误示范
struct timespec wrong = {0, 500000000}; // 被当作1970-01-01 00:00:00.5
sem_timedwait(&sem, &wrong); // 立即超时!
// 正确做法
struct timespec correct;
clock_gettime(CLOCK_REALTIME, &correct);
correct.tv_sec += 0;
correct.tv_nsec += 500000000;
if (correct.tv_nsec >= 1000000000) {
correct.tv_sec++;
correct.tv_nsec -= 1000000000;
}
信号量值负数的含义
当sem_getvalue返回负值时,其绝对值表示正在等待的线程数,而非可用资源数。某些实现可能将负值截断为0,不可依赖此特性做精确判断。
销毁时的竞态条件
确保销毁前所有等待线程已唤醒并完成操作:
void safe_destroy_system(void) {
// 1. 停止新任务提交
shutdown_flag = 1;
// 2. 唤醒所有等待线程(广播机制)
for (int i = 0; i < MAX_WAITERS; i++) {
sem_post(&wakeup_sem);
}
// 3. 等待工作者完成
pthread_join(worker_tid, NULL);
// 4. 安全销毁
sem_destroy(&resource_sem);
}
性能优化建议
- 减少系统调用:高频场景下,先用
sem_trywait快速路径,失败再转sem_wait - 避免 thundering herd:单生产者多消费者时,
sem_post只唤醒一个线程,天然缓解惊群 - 缓存行对齐:频繁访问的信号量变量按64字节对齐,避免伪共享
- 超时粒度:监控类超时建议≥100ms,避免频繁时钟查询开销
替代方案对比
| 机制 | 适用场景 | 优势 | 劣势 |
|---|---|---|---|
| 信号量 | 资源计数、限流 | 轻量、可跨进程 | 功能单一 |
| 互斥锁+条件变量 | 复杂状态同步 | 灵活、可组合 | 代码复杂度高 |
| 读写锁 | 读多写少 | 读并发 | 写饥饿风险 |
| RCU | 读极多写极少 | 读零开销 | 实现复杂 |
| 无锁结构 | 极致性能 | 无阻塞 | 设计难度大 |
信号量以其简洁的语义和可靠的实现,仍是多线程同步的基础工具。理解其内部计数器模型,合理组合阻塞与非阻塞接口,能够在保证正确性的同时获得优秀的性能表现。