死锁、活锁、饥饿、自旋锁
死锁
指两个或多个进程因互相持有对方所需的资源而处于等待状态,从而导致程序停止运行的现象。简单来说,在一个系统中,如果进程之间形成了一个循环依赖关系,那么就会发生死锁。
💡 本文所有代码实现语言为 Go, 其他语言的读者可以使用读伪代码的思路来阅读代码,主要目的在于理清逻辑主干,不必过度陷入细节。
四个必要条件
- 互斥: 同一时刻只能有一个进程占用资源,如果其他进程想要访问该资源必须等待
- 占有等待:进程已经持有了至少一个资源,并且正在等待其他资源。这意味着当一个进程被阻塞时,它仍然在持有至少一个资源
- 不可抢占:资源不能被强制性地释放,只能由占用它的进程主动释放
- 循环等待:存在一个等待循环队列,其中每个进程都在等待下一个进程所持有的资源
示例代码
死锁应该是日常开发中遇到的最多的 “锁异常” 场景,例如在一个 goroutine
内部对同一个 channel
同时进行读写:
package main
func main() {
ch := make(chan bool)
ch <- true
<-ch
close(ch)
}
更多可能产生死锁的场景,可以参考 之前的这篇文章。
死锁处理方法
主要分为被动策略和主动策略。
被动策略
鸵鸟策略
像鸵鸟一样,把头埋在沙子里,假装什么都没发生。
因为要解决死锁问题付出的代价很高,鸵鸟策略这种不采取任务措施的方案反而会获得更高的性能。 当发生死锁时不会对用户造成多大影响,或发生死锁的概率很低,可以采用鸵鸟策略。 大多数操作系统,包括 Unix,Linux 和 Windows,处理死锁问题的办法仅仅是忽略它。
主动策略
预防死锁
针对死锁发生的四个必要条件,我们可以逆向操作,只要保证四个必要条件不同时发生,即可预防死锁的发生。
1. 破坏互斥条件
同一时刻允许多个线程占用某些资源,但是有互斥限制的场景中,资源往往就是需要独占的,例如打印机、全局计数器等。问题与解决方案形成了悖论,所以该方案并无实用性。
2. 破坏占有等待
实行资源预先分配策略,线程在运行前一次性地向系统申请它所需要的全部资源。如果某个线程所需的全部资源无法得到满足,则不分配任何资源,此线程不会运行。 当系统能够满足线程的全部资源请求时,一次性地将所申请的资源全部分配到线程,此时线程拥有它运行所需的全部资源,所以不会发生占有资源同时又申请等待资源的现象,避免了死锁的发生。
3. 破坏不可抢占
允许线程抢占资源,当一个线程申请新的资源但不能立即被满足时,它必须释放占有的全部资源,之后再重新申请。 它所释放的资源可以分配给其它线程,这就相当于该线程占有的资源被隐性抢占了,该方法实现较为困难并且会降低系统性能。
4. 破坏循环等待
实行资源有序分配,把所有资源先进行分类、编号、分配,使线程在申请资源时不会形成环形等待。 所有线程对资源的请求必须严格按资源序号顺序分配,这样多个线程之间就不会产生资源循环等待 (因为每个线程都可以获取执行所需的全部资源),从而预防了发生死锁。
避免死锁
在程序执行时,通过将各项操作转换为安全的序列化操作等方法来判断是否存在死锁,并在判断可能会发生死锁时采取预防措施,避免死锁的发生。
安全状态
如图所示,每个表格第二列表示线程已拥有的资源数,第三列表示线程需要的所有资源数,“空闲数” 表示当前可以使用的资源数。
具体的资源分配过程如下:
- 初始状态下有 3 个可分配资源;
- 给 B 分配 2 个资源,这样 B 就拥有执行所需的全部资源,此时剩余资源为 3 - 2 = 1;
- B 执行完成后释放其全部资源,此时剩余资源为 1 + 4 = 5;
- 给 C 分配 5 个资源,这样 C 就拥有执行所需的全部资源,此时剩余资源为 5 - 5 = 0;
- C 执行完成后释放其全部资源,此时剩余资源为 0 + 7 = 7;
- 给 A 分配 7 个资源,让 A 执行,程序结束;
上述执行过程中的第 4 步,如果不是给 C 分配资源,而是给 A 分配资源,那么就无法保证安全状态从而进入死锁,感兴趣的读者可以改变表格数据来验证结果。
银行家算法
一个小城镇的银行家,向一群客户分别承诺了一定的贷款额度,算法要做的事情是:
判断贷款请求通过后,是否会进入不安全状态,如果不会进入不安全状态,就同意贷款请求,否则拒绝贷款请求。
具体的资源分配过程如下:
- 初始状态下有 10 个可分配资源;
- 给 A, B, C, D 分配资源后,此时剩余资源为 10 - 8 = 2;
- 此时不会进入不安全状态的唯一选择是将剩余的 2 个资源分配给 C ;
- 如果分配到其他进程/线程,就会进入不安全状态,产生死锁;
- 银行家算法必须能够正确检测到继续分配是否会产生死锁,并且提前拒绝分配 (如图 b 到图 c 的过程);
在现实世界中,很少有操作系统使用银行家算法来避免死锁,因为很少有进程在运行前就知道其运行所需资源的最大值,而且进程数量也不是固定的,处于不断变化之中, 除此之外,资源也可能变为不可用状态 (例如打印机离线、磁盘损坏等)。
检测死锁
在程序执行时,通过周期性地扫描资源分配情况来检测死锁的发生,并采取相应的措施消除死锁。
不试图阻止死锁,而应当检测到死锁发生时,采取措施进行恢复。
1. 每种类型一个资源的死锁检测
如图所示的资源分配中,方框表示资源,圆圈表示进程,资源指向进程表示该资源已经分配给该进程,进程指向资源表示进程请求获取该资源。
图 b 是从图 a 剥离出来的环路图,满足了死锁条件中的循环,因此会发生死锁。
检测算法概述:通过检测是否存在有向环路来实现,从一个节点出发进行 DFS (深度优先搜索),对访问过的节点进行标记,如果访问了已经标记的节点,就表示有向图存在环,也就是检测到了死锁。
2. 每种类型多个资源的死锁检测
## A B C D
## 磁带机 绘图仪 扫描仪 光驱
E = ( 4 2 3 1 )
A = ( 2 1 0 0 )
_ _
| 0 0 1 0 |
C = | 2 0 0 1 |
| 0 1 2 0 |
- -
_ _
| 2 0 0 1 |
R = | 1 0 1 0 |
| 2 1 0 0 |
- -
上面的代码示例中,有 3 个进程和 4 种资源:
- E 向量:资源总量
- A 向量:资源剩余量
- C 矩阵:每个进程所拥有的资源数量,每一行都代表 1 个进程拥有资源的数量
- R 矩阵:每个进程请求的资源数量
通过对进程和资源的统计,可以获取到当前系统的状态:
- 进程 P1 和 P2 所请求的资源都无法得到满足 (扫描仪和光驱已经没有剩余资源了);
- 进程 P3 申请的资源可以完全满足;
- 让进程 P3 先执行,完成之后释放其全部资源;
- 此时资源 A = (2 2 2 0), 进程 P2 可以执行,完成之后释放其全部资源;
- 此时资源 A = (4 2 2 1), 进程 P1 可以执行;
- 所有进程全部得到执行并完成,没有产生死锁;
检测算法概述:
每个进程初始时都不被标记,执行过程中有可能被标记,当算法结束时,任何没有被标记的进程说明内部产生了死锁。
- 寻找一个还未标记的进程 Pi,它所请求的资源小于等于 A;
- 如果找到相应的进程,那么将矩阵 C 的第 i 行向量加到 A 中,标记该进程,并返回到第一步继续执行;
- 如果没有找到相应的进程,算法执行结束;
解除死锁
在死锁发生时,可以采用 kill 进程、抢占资源和回滚 等方法来解除死锁。
活锁
指线程无法取得需要的资源而一直重试的现象。与死锁不同,活锁中的线程不会被阻塞,它们会一直尝试获取资源,但是却一直失败,最终导致程序无法正常执行。
举个生活中的小例子,两个人相向过马路,如果两人同时向一边谦让,那么两个人都过不去,紧接着两人同时又移到另一边,此时两个人依然过不去。 如果不受其他因素干扰,两个人一直在同步移动,那么最终的结果就是两个人都没有前进,产生了活锁 (这个词很形象,两个人都被对象活活地锁住了)。
处理方法
1. 引入随机性
在代码中引入一定的随机性可以避免发生活锁,例如在重试的过程中,引入随机的休眠时间来中断死循环,让线程有机会释放资源并且重新获取资源。
2. 引入系统时间戳
通过比较系统时间戳来决定线程是否需要继续等待,因为多个线程获取到的时间戳 (系统时钟)不可能完全一致,可以在时间戳的基础上进行优先级排序,最后通过排序后的线程顺序进行调度。
示例代码
刚才的小例子转换成代码如下:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
var (
// 使用一个互斥信号量来同步
cond = sync.NewCond(&sync.Mutex{})
// 分别表示左右两个方向的计数器 (默认值为 0)
// 也就是说,两个人碰面时,为了给对方让路,会向左或向右移动
leftCnt, rightCnt int32
)
// 信号量加锁操作
// 两个人在移动方向时必须加锁
func takeStep() {
cond.L.Lock()
cond.Wait()
cond.L.Unlock()
}
// 方向移动
func move(name, dir string, cnt *int32) bool {
fmt.Printf("%s 走到了 %v\n", name, dir)
// 当前方向计数器 + 1
atomic.AddInt32(cnt, 1)
takeStep()
// 如果当前计数器只被一个人修改过
// 说明这个人移动了方向,但是对方未移动,此时可以让对方先走,程序直接返回即可
if atomic.LoadInt32(cnt) == 1 {
// 因为活锁
// 所以代码永远执行不到这里
fmt.Printf("%s 给对方让路成功 \n", name)
return true
}
takeStep()
// 当前方向计数器 - 1
atomic.AddInt32(cnt, -1)
return false
}
func giveWay(name string) {
fmt.Printf("%s 尝试给对方让路 ... \n", name)
// 模拟三次双方互相让路
for i := 0; i < 3; i++ {
if move(name, "左边", &leftCnt) || move(name, "右边", &rightCnt) {
return
}
}
fmt.Printf("%v 无奈地说: 咱们可以停止互相给对方让路了,你先过!\n", name)
}
func main() {
go func() {
// 1 毫秒之后发出通知,释放锁
for range time.Tick(1 * time.Millisecond) {
cond.Broadcast()
}
}()
var wg sync.WaitGroup
// 模拟两个人
// 小明和小红
wg.Add(2)
go func() {
defer wg.Done()
giveWay("小明")
}()
go func() {
defer wg.Done()
giveWay("小红")
}()
wg.Wait()
}
运行上面的代码,可以看到输出结果和描述的活锁产生过程是一致的 (笔者加了一些分割线辅助阅读):
# go run main.go
# 你的输出和这里可能不完全一样,但是逻辑是一致的
# 双方第一次让路
小红 尝试给对方让路 ...
小红 走到了 左边
小明 尝试给对方让路 ...
小明 走到了 左边
--------------------------------
小红 走到了 右边
小明 走到了 右边
--------------------------------
# 双方第二次让路
小红 走到了 左边
小明 走到了 左边
小红 走到了 右边
小明 走到了 右边
--------------------------------
# 双方第三次让路
小红 走到了 左边
小明 走到了 左边
小红 走到了 右边
小明 走到了 右边
--------------------------------
# 双方完成了第三次让路
小红 无奈地说: 咱们可以停止互相给对方让路了,你先过!
小明 无奈地说: 咱们可以停止互相给对方让路了,你先过!
自旋锁
是一种互斥锁的实现方式,当线程尝试获得一个锁时,如果发现这个锁已经被其他线程占用,它会不断地重复尝试获取锁,而不是放弃CPU的控制权。这个过程被称为自旋,它能够有效地减少线程切换的开销,提高锁的性能。 自旋锁同时避免了进程上下文的调度开销,因此对于短时间内的线程阻塞场景是有效的。
示例代码
最简单的实现就是直接在循环中不断尝试获取锁,代码如下:
package main
import "sync"
func main() {
mu := sync.Mutex{}
for !mu.TryLock() {
// 获取到锁之后
// 执行某些操作
mu.Unlock()
}
}
当然,这个简单的代码性能实在堪忧,我们可以将其优化一下:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
type spinLock uint32
// 获取自旋锁
func (sl *spinLock) lock() {
for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
// 获取到自旋锁
}
}
// 释放自旋锁
func (sl *spinLock) unlock() {
atomic.StoreUint32((*uint32)(sl), 0)
}
func main() {
var sl spinLock
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
sl.lock()
fmt.Printf("index %d got spin lock\n", index)
sl.unlock()
}(i)
}
wg.Wait()
}
除此之外,还可以了解一下标准库中的做法,在比如我们之前在 sync.Mutex 设计与实现一文中 摘录的标准库中的自旋锁实现代码:
// $GOROOT/src/runtime/proc.go
func sync_runtime_canSpin(i int) bool {
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
饿死 (Starvation)
饿死(Starvation)也称为饥饿,是指某个进程因无法获取所需资源而无法执行,一直处于等待状态的情况。
饿死与死锁的差别在于: 死锁是由于多个进程/线程互相竞争资源造成的,饿死则是某个进程/线程无法获取资源造成的。
处理方法
1. 公平调度
使用公平性调度算法可以确保每个进程/线程都有机会获得所需资源,尤其是对于 CPU 调度来说,可采用先到先服务或者时间片轮转的方式, 避免某个进程/线程一直占用资源,而其他进程/线程无法得到执行的情况。
2. 优先级反转
当一个低优先级的进程/线程占用了高优先级进程/线程所需资源时,高优先级进程/线程就会被迫等待,从而可能导致其饿死,可以使用 “优先级继承” 或 “优先级反转” 方法来避免这种情况发生。
- 优先级继承 指的是将低优先级进程/线程的优先级提升到高优先级进程/线程相同的级别,确保它们能够正常获得资源;
- 优先级反转 指的是将中间层低优先级的进程/线程优先级设置为高于最低优先级进程/线程的优先级,避免它们被最低优先级的进程/线程所阻塞;
优先级反转的作用在于避免优先级继承可能引发的问题。
下面举个小例子来说明这种情况:
- 当前有三个进程 T1、T2、T3,T1 为高优先级进程,T2 为中间层进程,T3 为低优先级进程;
- T1 和 T3 都需要访问同一个资源;
- 一开始 T1 获得了该资源,但在此期间 T3 进入运行队列,T2 也被调度执行;
- 由于 T1 一直占用了该资源,因此 T2 无法获取所需的资源而进入等待状态;
优先级翻转的解决方法是:
- 在 T2 执行之前,将 T2 的优先级提升到 T3 的当前优先级 (因为此时 T3 已经进入运行队列);
- 即使 T1 占用了 T2 所需要的资源,T2 也可以获得执行;
- 在 T2 结束时,它的优先级被恢复到原始状态,以继续等待自身所依赖的资源;
3. 限制等待时间
当一个进程/线程等待的时间达到一定阈值之后,系统可以强制释放该进程/线程所占用的资源,以确保其他进程/线程也有机会获得这些资源。
示例代码
我们可以使用两个 goroutine
来模拟 饿死
现象:
- 第一个 goroutine 占用了很多的资源,所以代码可以执行更多的次数;
- 第二个 goroutine 占用了很少的资源,所以代码只能执行很少的次数,甚至不执行;
package main
import (
"fmt"
"sync"
"time"
)
const (
// 单个 goroutine 执行总时长
runtime = 1 * time.Second
)
func main() {
var wg sync.WaitGroup
wg.Add(2)
// 占用资源过多的 goroutine
go func() {
defer wg.Done()
count := 0
//begin, end := time.Now(), time.Now().Add(time.Millisecond)
for begin := time.Now(); time.Since(begin) <= runtime; {
count++
// 休眠时间 1 毫秒, 模拟占用资源多
time.Sleep(1 * time.Millisecond)
}
fmt.Printf("占用资源过多的 goroutine 执行了 %d 次\n", count)
}()
// 占用资源很少的 goroutine
go func() {
defer wg.Done()
count := 0
for begin := time.Now(); time.Since(begin) <= runtime; {
count++
// 休眠时间 10 微秒, 模拟占用资源少
time.Sleep(1 * time.Nanosecond)
}
fmt.Printf("占用资源很少的 goroutine 执行了 %d 次\n", count)
}()
wg.Wait()
}
运行上面的代码,我们可以看到占用资源很少的 goroutine
获得资源 (执行的次数) 要远远低于占用资源过多的 goroutine
。
# go run main.go
占用资源很少的 goroutine 执行了 3951200 次
占用资源过多的 goroutine 执行了 997 次