sync.Mutex Code Reading
2023-04-23 Golang 并发编程 Go 源码分析 读代码
概述
sync.Mutex
实现了互斥锁同步原语。
内部实现
我们来探究一下 sync.Mutex
的内部实现,文件路径为 $GOROOT/src/sync/mutex.go
,笔者的 Go 版本为 go1.19 linux/amd64
。
状态标识变量
const (
mutexLocked = 1 << iota // 1, 互斥锁已锁定
mutexWoken // 2, 互斥锁被唤醒
mutexStarving // 4, 互斥锁饥饿模式标识
mutexWaiterShift = iota // 3, 互斥锁上等待的 goroutine 数量值的计算偏移量
starvationThresholdNs = 1e6 // 进入饥饿模式的等待阈值,1 ms
)
两种工作模式
互斥锁可以在两种模式下操作:正常模式和饥饿模式,正常模式的性能要好很多,因为 goroutine
可以连续多次获得互斥锁,即使存在阻塞的 goroutine
,
与此同时,饥饿模式是防止尾部的 goroutine
被饿死的重要机制 (高尾延时)。
正常模式
正常模式下,锁的等待者采用 FIFO
先进先出队列,但是一个刚被唤醒的 goroutine
并不会直接获得锁,而是要与新到达的 goroutine
竞争锁的使用,
然而 新到达的 goroutine
有一个优势:它们已经在 CPU 上运行,并且数量可能有很多,结果就是刚被唤醒的 goroutine
大概率会竞争失败,
所以在这种情况下,失败的 goroutine
被排在等待队列最前面,如果等待的 goroutine
超过 1ms 无法获得锁,互斥锁会切换到饥饿模式。
饥饿模式
饥饿模式下,互斥锁的所有权直接从解锁的 goroutine
转移到队列最前面的 goroutine
(优先级发生变化),
新到达的 goroutines
不会尝试获得锁 (不参与竞争),也不会尝试自旋等待,它们会自动排在队列尾部,这样可以避免已经等待了太久的 goroutine
被饿死。
如果一个 goroutine
获得了锁,并且满足以下两个条件任意一个时,互斥锁会切换到正常模式:
- 它是队列的最后一个 goroutine
- 它等待时间不超过 1 ms
Mutex对象
Mutex
对象表示互斥锁,零值状态下表示未加锁,Mutex 对象一旦使用后,就不能再复制 (因为拷贝时无法复制 state, sema 字段的数据,会引起问题)。
state
字段表示锁的状态,低 3 位表示 3 种状态:
变量 | 状态 |
---|---|
mutexLocked | 互斥锁已锁定 |
mutexWoken | 互斥锁被唤醒 |
mutexStarving | 互斥锁处于饥饿模式 |
sema
字段表示用于控制锁状态的信号量。
// 根据 Go 内存模型的约束
// 第 n 次调用 Unlock 方法在第 m 次调用 Lock 方法之前同步,其中 n < m
// 调用 TryLock 方法成功,就相当于调用了 Lock 方法
// 调用 TryLock 方法失败,不会建立任何同步原语约束关系
type Mutex struct {
state int32
sema uint32
}
Locker 接口
type Locker interface {
Lock()
Unlock()
}
TryLock 方法
TryLock
方法尝试获取锁并且返回是否成功 (调用非阻塞)。
func (m *Mutex) TryLock() bool {
old := m.state
if old&(mutexLocked|mutexStarving) != 0 {
return false
}
// 这里可能有 1 个 goroutine 在等待锁
// 但是可以尝试在 goroutine 唤醒之前获得锁
if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
return false
}
...
return true
}
在标准库中的注释中有这样一段话:
注意,TryLock 真实的使用场景很少,使用 TryLock 通常说明在互斥锁的使用场景中,存在更深层次的问题。
官方的言外之意貌似并不鼓励使用该方法?TryLock
方法添加是社区的强烈意愿,官方一开始似乎并不感冒。笔者搜索了一下,发现目前标准库中几乎没有使用到 TryLock
方法。
$ grep -nr "TryLock" "$(dirname $(which go))/../src" | grep -v "\/\/" | grep -v "test"
# 只有 2 个声明和 1 个调用
.../src/sync/mutex.go:98:func (m *Mutex) TryLock() bool
.../src/sync/rwmutex.go:166:func (rw *RWMutex) TryLock() bool
.../src/sync/rwmutex.go:171: if !rw.w.TryLock()
Lock 方法
Lock
方法表示加锁操作,如果锁已经被使用,调用方 goroutine
会陷入阻塞,直到锁可用。
func (m *Mutex) Lock() {
// 锁当前状态是未锁定 (正好可以完成加锁,然后直接返回)
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
...
return
}
// slow path 可以代码内联
m.lockSlow()
}
lockSlow 方法
lockSlow
方法表示调用 Lock
方法时没有直接获取到锁时,接下来的处理工作 (进入阻塞调用)。
func (m *Mutex) lockSlow() {
var waitStartTime int64 // 等待时间
starving := false // 是否处于饥饿模式
awoke := false // 是否处于唤醒状态
iter := 0 // 自旋迭代次数
old := m.state // 锁当前状态
for {
// 饥饿模式下不能自旋,因为所有权转移到了队列前面的 goroutine
// 新到达的 goroutine 不参与竞争并且不会尝试自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 如果锁没有处于饥饿模式并且符合自旋条件,goroutine 开始自旋
// 尝试设置 mutexWoken 标识
// 这样 Unlock 释放锁时就不用唤醒其他阻塞的 goroutine (直接让当前 goroutine 获得锁)
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin() // goroutine 自旋
iter++ // 增加自旋次数
old = m.state // 更新锁状态
continue
}
new := old
// 互斥锁处于饥饿模式时,新到达的 goroutine 不要试图获得锁,应该去排队
if old&mutexStarving == 0 {
// 锁处于非饥饿模式
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
// 锁处于饥饿模式或者已加锁
// 增加等待的 goroutine 数量 (注意偏移量使用方法)
new += 1 << mutexWaiterShift
}
// 当前 goroutine 将锁切换到饥饿模式 (前提是锁没有被释放,调用了 Unlock 方法)
// 调用 Unlock 时,处于饥饿模式的锁期望有等待的 goroutine, 但实际情况不一定有
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
// 当前 goroutine 处于唤醒状态,但是锁的状态里并没有 mutexWoken 标识
// 抛出一个 BUG: 互斥锁状态不一致
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// goroutine 被唤醒,重置 mutexWoken 标识
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
// 当前 goroutine 通过 CAS 操作获得了锁
break
}
// 如果已经处于等待状态,那么直接排在队列最前面
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 通过信号量保证锁只能被 1 个 goroutine 获取到
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 如果等待时间超过了阈值,那么就进入饥饿模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// 如果当前 goroutine 被唤醒并且锁处于饥饿模式
// 控制权转交给了当前 goroutine,但是互斥锁处于某种不一致的状态:
// mutexLocked 标识未设置,仍然认为当前 goroutine 正在等待锁
// 抛出一个 BUG: mutex 状态不一致
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 减少等待的 goroutine 数量 (注意偏移量使用方法)
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// 退出饥饿模式
// 必须要在这里退出并且考虑等待时间
// 饥饿模式效率很低,一旦 2 个 goroutine 同时将互斥锁切换到饥饿模式,可能会陷入无限循环
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true // 当前 goroutine 唤醒
iter = 0 // 当前 goroutine 重置自旋次数
} else {
old = m.state
}
}
...
}
runtime_canSpin 方法
runtime_canSpin
方法检测当前 goroutine
是否可以进行自旋操作,通过链接器链接到 sync_runtime_canSpin
。
// sync/runtime.go
func runtime_canSpin(i int) bool
运行自旋操作的条件比较苛刻,需要满足:
- 程序运行在多核机器上且 GOMAXPROCS > 1
- 至少有一个正在运行的 P, 且本地运行队列为空
- 自旋次数不超过 4 次
自旋与互斥锁相反,不做被动等待 (直接主动原地旋转等待),因为其可以在全局运行队列或者其他处理器的运行队列上面运行。
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
Go 调度器的设计者考虑了 操作的资源利用率 和 频繁的线程抢占给操作系统带来的负载 之后,提出了
自旋线程
的概念。 当自旋线程
没有找到可供其调度执行的 goroutine 时,并不会销毁该线程,而是采取自旋
的操作保存起来。 虽然直观上看起来浪费了一些资源,但是考虑一下syscall (系统调用)
相关的情景就可以发现,相比自旋
来说,线程间频繁的抢占、创建、销毁等操作带来的负载会更高。
Unlock 方法
Unlock
方法表示锁的释放操作,对未加锁的互斥锁执行解锁操作会引发运行时错误。
互斥锁并不是和某个具体的 goroutine 关联的,完全可以在一个 goroutine 加锁,在其他的 goroutine 中释放锁 (前提是两者持有的是同一个锁)。
func (m *Mutex) Unlock() {
...
// 如果去除 mutexLocked 标识之后正好是 0, 说明当前 goroutine 成功解锁,直接返回即可
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// slow path 可以代码内联
m.unlockSlow(new)
}
}
unlockSlow 方法
unlockSlow
方法表示调用 Unlock
方法时没有直接释放锁时,接下来的处理工作 (进入阻塞调用)。
func (m *Mutex) unlockSlow(new int32) {
// 对未加锁的互斥锁执行解锁操作会引发运行时错误
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
// 如果锁处于正常模式
// 如果没有等待 goroutine 或者已经有 goroutine 被唤醒或抢到了锁,就不用唤醒其他 goroutine 了,直接返回即可
// 如果锁处于饥饿模式,所有权会直接从唤醒的 goroutine 转移到队列最前面的 goroutine
// 如果当前 goroutine 不在这个调用链中
// 可能是因为在 unlock 互斥锁时,没有观察到 mutexStarving 标识,所以直接返回即可
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 锁有正在等待的 goroutine, 那么被唤醒的 goroutine 得到锁
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 如果锁处于处于饥饿模式
// 将锁的所有权转移到下一个等待的 goroutine, 并让其开始运行
// 注意:mutexLocked 标识不需要被设置,因为等待的 goroutine 被唤醒后会进行设置
// 如果此时设置了 mutexStarving 标识,锁仍然被认为当前处于加锁状态,新到达的 goroutine 就无法获取到锁了
runtime_Semrelease(&m.sema, true, 1)
}
}
小结
互斥锁内部实现中将状态分成了两种: 正常模式和饥饿模式。在正常模式下,锁的分配采用竞争机制,在刚被唤醒的和新来的 goroutine
之间,
倾向于将锁分配给新来的 goroutine
, 竞争失败的 goroutine
会被放到队列首部,如果等待的 goroutine
超过 1ms 无法获得锁,互斥锁会切换到饥饿模式。
在饥饿模式下,从队列首部的 goroutine
开始依次获得锁,新来的 goroutine
会自动排在队列尾部,这样就避免尾部 goroutine
饿死。
此外,互斥锁的加锁和解锁过程实现中还涉及到了信号、以及 GMP 调度等概念,本文暂时忽略掉了这些细节,后面有时间了再专门写一篇。