译者:飞龙
信号量是学习同步的一个好方式,但是它们实际上并没有像互斥体和条件变量一样被广泛使用。
尽管如此,还是有一些同步问题可以用信号量简单解决,产生显然更加合适的解决方案。
这一章展示了C语言用于处理信号量的API,以及我用于使它更加容易使用的代码。而且它展示了一个终极挑战:你能不能使用互斥体和条件变量来实现一个信号量?
这一章的代码在本书仓库的semaphore
目录中。
信号量是用于使线程协同工作而不互相影响的数据结构。
POSIX标准规定了信号量的接口,它并不是pthread
的一部分,但是多数实现pthread
的UNIX系统也实现了信号量。
POSIX信号量的类型是sem_t
。这个类型表现为结构体,所以如果你将它赋值给一个变量,你会得到它的内容副本。复制信号量完全是一个坏行为,在POSIX中,它的复制行为是未定义的。
幸运的是,包装sem_t
使之更安全并易于使用相当容易。我的包装API在sem.h
中:
typedef sem_t Semaphore;
Semaphore *make_semaphore(int value);
void semaphore_wait(Semaphore *sem);
void semaphore_signal(Semaphore *sem);
Semaphore
是sem_t
的同义词,但是我认为它更加可读,而且大写的首字母会提醒我将它当做对象并使用指针传递它。
这些函数的实现在sem.c
中:
Semaphore *make_semaphore(int value)
{
Semaphore *sem = check_malloc(sizeof(Semaphore));
int n = sem_init(sem, 0, value);
if (n != 0) perror_exit("sem_init failed");
return sem;
}
make_semaphore
接收信号量的初始值作为参数。它为信号量分配空间,将信号量初始化,之后返回指向Semaphore
的指针。
如果执行成功,sem_init
返回0;如果有任何错误,它返回-1。使用包装函数的一个好处就是你可以封装错误检查代码,这会使使用这些函数的代码更加易读。
下面是semaphore_wait
的实现:
void semaphore_wait(Semaphore *sem)
{
int n = sem_wait(sem);
if (n != 0) perror_exit("sem_wait failed");
}
下面是semaphore_signal
:
void semaphore_signal(Semaphore *sem)
{
int n = sem_post(sem);
if (n != 0) perror_exit("sem_post failed");
}
我更喜欢把这个这个操作叫做“signal”而不是“post”,虽然它们是一个意思(发射)。
译者注:如果你习惯了互斥体(锁)的操作,也可以改成
lock
和unlock
。互斥体其实就是信号量容量为1时的特殊形态。
下面是一个例子,展示了如何将信号量用作互斥体:
Semaphore *mutex = make_semaphore(1);
semaphore_wait(mutex);
// protected code goes here
semaphore_signal(mutex);
当你将信号量用作互斥体时,通常需要将它初始化为1,来表示互斥体是未锁的。也就是说,只有一个线程可以通过信号量而不被阻塞。
这里我使用了变量名称mutex
来表明信号量被用作互斥体。但是要记住信号量的行为和pthread
互斥体不完全相同。
使用这些信号量的包装函数,我们可以编写出生产者-消费者问题的解决方案。这一节的代码在queue_sem.c
。
下面是Queue
的一个新定义,使用信号量来代替互斥体和条件变量:
typedef struct {
int *array;
int length;
int next_in;
int next_out;
Semaphore *mutex; //-- new
Semaphore *items; //-- new
Semaphore *spaces; //-- new
} Queue;
下面是make_queue
的新版本:
Queue *make_queue(int length)
{
Queue *queue = (Queue *) malloc(sizeof(Queue));
queue->length = length;
queue->array = (int *) malloc(length * sizeof(int));
queue->next_in = 0;
queue->next_out = 0;
queue->mutex = make_semaphore(1);
queue->items = make_semaphore(0);
queue->spaces = make_semaphore(length-1);
return queue;
}
mutex
用于确保队列的互斥访问,初始值为1,说明互斥体最开始是未锁的。
item
是队列中物品的数量,它也是可非阻塞执行queue_pop
的消费者线程的数量。最开始队列中没有任何物品。
spaces
是队列中剩余空间的数量,也是可非阻塞执行queue_push
的线程数量。最开始的空间数量就是队列的容量length - 1
。
下面是queue_push
的新版本,它由生产者线程调用:
void queue_push(Queue *queue, int item) {
semaphore_wait(queue->spaces);
semaphore_wait(queue->mutex);
queue->array[queue->next_in] = item;
queue->next_in = queue_incr(queue, queue->next_in);
semaphore_signal(queue->mutex);
semaphore_signal(queue->items);
}
要注意queue_push
并不需要调用queue_full
,因为信号量跟踪了有多少空间可用,并且在队列满了的时候阻塞住生产者。
下面是queue_pop
的新版本:
int queue_pop(Queue *queue) {
semaphore_wait(queue->items);
semaphore_wait(queue->mutex);
int item = queue->array[queue->next_out];
queue->next_out = queue_incr(queue, queue->next_out);
semaphore_signal(queue->mutex);
semaphore_signal(queue->spaces);
return item;
}
这个解决方案在《The Little Book of Semaphores》中的第四章以伪代码解释。
为了使用本书仓库的代码,你需要编译并运行这个解决方案,你应该执行:
$ make queue_sem
$ ./queue_sem
任何可以使用信号量解决的问题也可以使用条件变量和互斥体来解决。一个证明方法就是可以使用条件变量和互斥体来实现信号量。
在你继续之前,你可能想要将其做为一个练习:编写函数,使用条件变量和互斥体实现sem.h
中的信号量API。你可以将你的解决方案放到本书仓库的mysem.c
和mysem.h
中,你会在 mysem_soln.c
和mysem_soln.h
中找到我的解决方案。
如果你在开始时遇到了麻烦,你可以使用下面来源于我的代码的结构体定义,作为提示:
typedef struct {
int value, wakeups;
Mutex *mutex;
Cond *cond;
} Semaphore;
value
是信号量的值。wakeups
记录了挂起信号的数量,也就是说它是已被唤醒但是还没有恢复执行的线程数量。wakeups
的原因是确保我们的信号量拥有《The Little Book of Semaphores》中描述的性质3。
mutex
提供了value
和wakeups
的互斥访问,cond
是线程在需要等待信号量时所等待的条件变量。
下面是这个结构体的初始化代码:
Semaphore *make_semaphore(int value)
{
Semaphore *semaphore = check_malloc(sizeof(Semaphore));
semaphore->value = value;
semaphore->wakeups = 0;
semaphore->mutex = make_mutex();
semaphore->cond = make_cond();
return semaphore;
}
下面是我使用POSIX互斥体和条件变量的信号量实现:
void semaphore_wait(Semaphore *semaphore)
{
mutex_lock(semaphore->mutex);
semaphore->value--;
if (semaphore->value < 0) {
do {
cond_wait(semaphore->cond, semaphore->mutex);
} while (semaphore->wakeups < 1);
semaphore->wakeups--;
}
mutex_unlock(semaphore->mutex);
}
当线程等待信号量时,需要在减少value
之前锁住互斥体。如果信号量的值为负,线程会被阻塞直到wakeups
可用。要注意当它被阻塞时,互斥体是未锁的,所以其它线程可以向条件变量发送信号。
semaphore_signal
的代码如下:
void semaphore_signal(Semaphore *semaphore)
{
mutex_lock(semaphore->mutex);
semaphore->value++;
if (semaphore->value <= 0) {
semaphore->wakeups++;
cond_signal(semaphore->cond);
}
mutex_unlock(semaphore->mutex);
}
同样,线程在增加value
之前需要锁住互斥体。如果信号量是负的,说明还有等待线程,所以发送线程需要增加wakeups
并向条件变量发送信号。
此时等待线程可能会唤醒,但是互斥体仍然会锁住它们,直到发送线程解锁了它。
这个时候,某个等待线程从cond_wait
中返回,之后检查是否wakeup
仍然有效。如果没有它会循环并再次等待条件变量。如果有效,它会减少wakeup
,解锁互斥体并退出。
这个解决方案使用do-while
循环的原因可能并不是很明显。你知道为什么不使用更普遍的while
循环吗?会出现什么问题呢?
问题就是while
循环的实现不满足性质3。一个发送线程可以在之后的运行中收到它自己的信号。
使用do-while
循环,就确保[1]了当一个线程发送信号时,另一个等待线程会收到信号,即使发送线程在某个等待线程恢复之前继续运行并锁住互斥体。
[1] 好吧,几乎是这样。实际上一个时机恰当的虚假唤醒会打破这一保证。