0%

操作系统同步原语

并行处理同一任务意味着对于共享资源的并发访问,为了保证共享资源状态的正确性,需要正确的在这些子任务中进行同步。为了正确的高效的解决这些同步问题,抽象出了同步原语这一概念。

互斥锁

在任意时刻只允许最多一个线程访问的方式被称为互斥访问,保证互斥访问共享资源的代码区域被称为临界区。在同一时刻只能有一个线程可以执行临界区中的代码。因此在临界区中,一个线程可以安全的对共享资源进行操作。为了达到这样的目的,需要设计一个协议来保证临界区的互斥性,保证互斥访问、优先等待以及空闲让出。

硬件实现:关闭中断

在单核条件中,关闭中断意味着当前执行的线程不能被其他线程抢占。因此如果在进入临界区之前关闭中断并在离开临界区之后在开启中断,那么就能避免当前执行临界区代码的线程被打断,保证了只有一个线程执行临界区,也就保证了互斥访问。

但是在多线程环境中,关闭中断的方式不再适用。即使关闭了所有核心的中断,也不能阻塞其他核心上正在运行的线程执行。

软件实现:皮特森算法

皮特森算法是一种纯软件方法,可以解决两个线程的临界区问题。其代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
bool flag[2] = {false,false};
int trun = 0;
// 线程1
while(true) {
flag[0] = true;
turn = 1;
while(flag[1] == true && turn == 1);
// 临界区部分
flag[0] = false;
// 其他部分
}

// 线程2
while(true) {
flag[1] = true;
turn = 0;
while(flag[0] == true && turn == 0);
// 临界区部分
flag[1] = false;
// 其他部分
}

其中有两个重要的变量,一个是全局数组flag,其有两个布尔成员(flag[0]和flag[1]),分别代表线程1和线程2是否尝试进入临界区。第二个是全局变量turn。如果两个线程都申请进入临界区,那么turn将决定最终进入临界区的线程编号。线程在进入临界区之前,以线程1为例,必须满足flag[1]==false(表示线程1没有申请进入临界区)或者trun==0(表示turn决定线程1可以进入临界区),方能进入临界区。

皮特森算法要求访存严格按照程序顺序执行,然而现代CPU为了达到更好的性能允许访存操作乱序执行,因此皮森特算法在这些CPU上无法正常的进行工作。

软硬件协同:原子操作

硬件提供的原子操作也可以解决临界区问题。原子操作指的是不可被打断的一个或者一系列的操作,最常见的是比较与置换(CAS),利用该特性可以实现互斥锁。

自旋锁

自旋锁利用一个变量lock来表示锁的状态,在加锁时,线程会通过CAS判断lock是否空闲,如果空闲则上锁,否则一遍遍的进行重试。代码如下所示:

1
2
3
4
5
6
7
8
9
void lock_init(int* lock){
*lock = 0;
}
void lock(int* lock){
while (atomic_CAS(lock,0,1) != 0);
}
void unlock(int* lock){
*lock = 0;
}

自旋锁的实现简单,在竞争程度低的时候非常高效,依然广泛的运用在各种软件之中。

条件变量

条件变量可以让一个线程停止使用CPU并将自己挂起。当等待的条件满足时,其他线程会唤醒该挂起的线程让其继续执行。使用条件变量能够避免无谓的循环等待。

条件变量的实现如下,每个条件变量的结构体中都包含了一个wait_list,用于记录等待在该条件变量上的线程。线程在调用cond_wait挂起自己时,需要先将当前线程加入到等待队列之中,然后原子的挂起当前线程并且释放锁。这个操作需要利用到操作系统辅助完成,例如使用Linux系统下的futex机制。当该线程被其他线程唤醒以后,其应当再次获取锁并且进入临界区然后返回。cond_signal函数会利用操作系统提供的唤醒服务将线程唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
struct cond {
struct thread *wait_list;
};

void cond_wait(struct cond *cond, struct lock *mutex) {
list_append(cond->wait_list,current_thread());
atomic_block_unlock(mutex); // yuan'zi挂起
lock(mutex);
}

void cond_signal(struct cond *cond, struct lock *mutex) {
if (!list_empty(cond->wait_list)){
wakeup(list_remove(cond->wait_list));
}
}

void cond_broadcast(struct cond *cond){
while (!list_empty(cond->wait_list)){
wakeup(list_remove(cond->wait_list));
}
}

信号量

信号量在不同的线程之间充当信号灯,其根据剩余资源数量控制不同线程的执行或者等待。信号量定义了两个操作分别为P和V,因此信号量又被称为PV原语。一般使用wait和signal来表示信号量的P和V操作。信号量的语义如下:

1
2
3
4
5
6
7
void wait(int* S){
while(*S <= 0); // 循环忙等
*S = *S-1;
}
void signal(int* S){
*S = *S + 1;
}

上面的代码仅仅是信号量的语义,并非信号量的真正实现。该代码存在两个问题。首先,可能存在多个线程同时更新共享信号量的情况导致更新丢失的问题。同时当一次资源被释放时,同时通知多个线程的效率比较低,让多余的线程挂起并且放弃CPU才是正确的选择。

实现一个正确的信号量需要组合使用互斥锁和条件变量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
struct sem {
/*
没有线程等待时,value 为整数或零,表示剩余的资源数量
有线程等待时,value 为负数,代表正在等待资源的线程数量
*/
int value;
/*
有线程等待时的可用资源数量,也代表应当唤醒的线程数量
*/
int wakeup;
struct lock sem_lock;
struct cond sem_cond;
};

void wait(struct sem *S) {
lock(&S->sem_lock);
S->value--;
if (S->value < 0) { // 代表没有空闲的资源
do {
cond_wait(&S->sem_cond, &S->sem_lock);
} while (S->wakeup == 0);
S->wakeup--;
}
unlock(&S->sem_lock);
}

void signal(struct sem *S) {
lock(&S->sem_lock);
S->value++;
if (S->value <= 0) {
S->wakeup++;
cond_signal(&S->sem_cond);
}
unlock(&S->sem_lock);
}

在调用wait操作时,该线程会在获取该信号量的互斥锁后,先将value数值减1。若其值大于等于0,说明有空闲的资源,wait操作成功。如果value小于0,那么需要使用到wakeup来判断是否与空闲的资源来消耗。如果wakeup也为0,需要将当前线程挂起等待。

信号量与互斥锁有着相似之处。如果信号量只允许其值在0和1之间变化时,那么可以把信号量当作互斥锁来使用。

读写锁

当一些线程仅仅需要读取而非修改共享资源时,就没有必要保证它们之间的互斥性。在这种情况下直接使用互斥锁不会带来任何正确性的问题,但是会削减读者之间的并行度,造成性能损失。读写锁就是为了解决这一问题而提出的同步原语。

读写锁允许多个读者同时进入读临界区,因此会带来倾向性的问题。假设在某一时刻,已经有一些读者在写临界区,那么此时有一个读者和一个写者同时申请进入临界区,那么是否允许该读者进入写临界区。如果允许读者进入写临界区,称为偏向读者的读写锁,否则称为偏向写者的读写锁。

下面的代码实现了偏向读写的读写锁。读者在上锁的时候需要使用到reader_lock来保证reader_cnt更新的原子性。如果是第一个读者还需要获取writer_lock保证读者和写者的互斥。同样如果读者在释放锁时发现自己是最后一个读者还需要释放write_lock取消对于写者的阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
struct rwlock {
int reader_cnt;
struct lock reader_lock;
struct lock writer_lock;
};

void lock_reader(struct rwlock *lock) {
lock(&lock->reader_lock);
lock->reader_cnt++;
if (lock->reader_cnt == 1) { // 第一个读者
lock(&lock->writer_lock);
}
unlock(&lock->reader_lock);
}

void unlock_reader(struct rwlock *lock) {
lock(&lock->reader_lock);
lock->reader_cnt--;
if (lock->reader_cnt == 0) { // 最后一个读者
unlock(&lock->writer_lock);
}
unlock(&lock->reader_lock);
}

void lock_writer(struct rwlock *lock) {
lock(&lock->writer_lock);
}

void unlock_writer(struct rwlock *lock) {
unlock(&lock->writer_lock);
}

偏向写者的读写锁实现更加复杂。在rwlock结构中需要添加一个布尔变量has_writer来表示当前是否有写者到达。当有写者获得锁时,会将has_writer设置为true。读者通过判断has_writer的值来决定是否需要等待前序的写者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
struct rwlock {
volatile int reader_cnt;
volatile bool has_writer;
/* 读者写者需要共享锁 */
struct lock lock;
struct cond reader_cond;
struct cond writer_cond;
};

void lock_reader(struct rwlock *lock) {
lock(&lock->lock);
while (lock->has_writer){
cond_wait(&lock->writer_cond,&lock->lock);
}
lock->reader_cnt++;
unlock(&lock->lock);
}

void unlock_reader(struct rwlock *lock) {
lock(&lock->lock);
lock->reader_cnt--;
if (lock->reader_cnt == 0) { // 最后一个读者
cond_signal(&lock->reader_cond);
}
unlock(&lock->lock);
}

void lock_writer(struct rwlock *lock) {
lock(&lock->lock);
while (lock->has_writer){
cond_wait(&lock->writer_cond,&lock->lock);
}
lock->has_writer= true;
while (lock->reader_cnt>0){
cond_wait(&lock->reader_cond,&lock->lock);
}
unlock(&lock->lock);
}

void unlock_writer(struct rwlock *lock) {
lock(&lock->lock);
lock->has_writer= false;
cond_broadcast(lock->writer_cond);
unlock(&lock->lock);
}

偏向读者的锁能够大幅的提升读者之间的并行度,但是会面临很高的写延迟,偏向写者的锁正好相反。因此在实际使用场景中需要根据具体需求选择合适的读写锁。

死锁

死锁指的是一组线程中每一个线程都在等待其他线程释放资源而导致的无限等待。下列的代码就演示了死锁出现的场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void thread_a(){
lock(lock_a);
lock(lock_b);
//do something
unlock(lock_b);
unlock(lock_a);
}

void thread_b(){
lock(lock_b);
lock(lock_a);
//do something
unlock(lock_a);
unlock(lock_b);
}

产生原因

导致死锁出现一共有四个必要条件

  1. 互斥访问:一个共享资源在同一时刻只能被至多一个线程访问。
  2. 持有并等待:线程持有一些资源的同时在等待一些资源。
  3. 资源非抢占:一旦一个资源被持有,除非持有者主动放弃,否则其他竞争者得不到这个资源。
  4. 循环等待:在一系列线程中,例如T0,T1,T2。其中T0等待T1,T1等待T2,T2等待T0,形成了一个循环。

死锁检测与恢复

当死锁出现时,需要一个第三方来打破僵局,帮助进行死锁恢复。操作系统往往会扮演这个第三方的角色。

前面死锁的四个条件之中只有循环等待这一条件与实际运行状态相关,因此是检测死锁的关键。为了确认系统中是否存在循环等待,需要获取到系统中资源分配以及线程等待的相关信息,通过这两张表建立图,判断图中是否存在环,就可以进行死锁的检测。

再找到的环中的任意线程,终止该线程并释放其占有的所有资源就可以进行死锁的恢复。

死锁预防

死锁预防指的是通过合理的资源分配算法,从源头上预防死锁。为了实现这个目标,需要保证死锁的四个条件不能被同时满足。

  1. 避免互斥访问:
  2. 不允许持有并等待:可以让线程在真正开始操作前一次性申请所有的资源,一旦任意资源不能获取则释放掉所有资源并进行重试。该方法会导致资源利用率低,出现饥饿的情况。
  3. 允许资源被抢占:允许一个资源抢占其他线程已经占有的资源,难点在于如何保证被抢占的线程正确的进行恢复。
  4. 避免循环等待:可以要求线程按照一定的顺序进行资源获取,例如对锁进行编号,要求所有线程按照顺序获取锁,可以避免发生死锁。

死锁避免

死锁避免是通过系统运行时跟踪资源分配过程来避免出现死锁。银行家算法是一种具体的死锁避免算法。银行家算法通过模拟分配资源后的状态来判断系统是否处于安全状态。例如假设系统中存在M个资源,线程有N个,那么会有以下四个数据结构。

  1. 全局可利用资源:Available[M]
  2. 每个线程的最大需求量:Max[N][M]
  3. 已分配资源数量:Allocation[N][M]
  4. 还需要资源数量:Need[N][M]

银行家算法执行检查的流程如下:

  1. 建立一个临时数组Available_temp,其初始值与数组Available数组一致,用于模拟系统可用资源数量。
  2. 找到系统当前剩余资源能够满足的线程,称为线程X,如果找不到对应的线程,说明系统处于非安全状态。
  3. 对于Available_temp中的所有成员m,都执行Available_temp[m]+=Allocation[x][m]。
  4. 遍历所有线程,查看是否还有未被标记执行结束的线程。如果有则回到第二步,否则代表系统处于安全状态。