蛮荆

sync.Mutex Code Reading

2023-04-23

概述

sync.Mutex 实现了互斥锁同步原语。

内部实现

我们来探究一下 sync.Mutex 的内部实现,文件路径为 $GOROOT/src/sync/mutex.go,笔者的 Go 版本为 go1.19 linux/amd64

状态标识变量

const (
    mutexLocked = 1 << iota     // 1, 互斥锁已锁定
    mutexWoken                  // 2, 互斥锁被唤醒
    mutexStarving               // 4, 互斥锁饥饿模式标识
	
    mutexWaiterShift = iota     // 3, 互斥锁上等待的 goroutine 数量值的计算偏移量
	
    starvationThresholdNs = 1e6 // 进入饥饿模式的等待阈值,1 ms
)

两种工作模式

互斥锁可以在两种模式下操作:正常模式和饥饿模式,正常模式的性能要好很多,因为 goroutine 可以连续多次获得互斥锁,即使存在阻塞的 goroutine, 与此同时,饥饿模式是防止尾部的 goroutine 被饿死的重要机制 (高尾延时)。

正常模式

正常模式下,锁的等待者采用 FIFO 先进先出队列,但是一个刚被唤醒的 goroutine 并不会直接获得锁,而是要与新到达的 goroutine 竞争锁的使用, 然而 新到达的 goroutine 有一个优势:它们已经在 CPU 上运行,并且数量可能有很多,结果就是刚被唤醒的 goroutine 大概率会竞争失败, 所以在这种情况下,失败的 goroutine 被排在等待队列最前面,如果等待的 goroutine 超过 1ms 无法获得锁,互斥锁会切换到饥饿模式

饥饿模式

饥饿模式下,互斥锁的所有权直接从解锁的 goroutine 转移到队列最前面的 goroutine (优先级发生变化), 新到达的 goroutines 不会尝试获得锁 (不参与竞争),也不会尝试自旋等待,它们会自动排在队列尾部,这样可以避免已经等待了太久的 goroutine 被饿死。

如果一个 goroutine 获得了锁,并且满足以下两个条件任意一个时,互斥锁会切换到正常模式:

  1. 它是队列的最后一个 goroutine
  2. 它等待时间不超过 1 ms

Mutex对象

Mutex 对象表示互斥锁,零值状态下表示未加锁,Mutex 对象一旦使用后,就不能再复制 (因为拷贝时无法复制 state, sema 字段的数据,会引起问题)。

state 字段表示锁的状态,低 3 位表示 3 种状态:

变量 状态
mutexLocked 互斥锁已锁定
mutexWoken 互斥锁被唤醒
mutexStarving 互斥锁处于饥饿模式

sema 字段表示用于控制锁状态的信号量。

// 根据 Go 内存模型的约束
// 第 n 次调用 Unlock 方法在第 m 次调用 Lock 方法之前同步,其中 n < m
// 调用 TryLock 方法成功,就相当于调用了 Lock 方法
// 调用 TryLock 方法失败,不会建立任何同步原语约束关系
type Mutex struct {
	state int32
	sema  uint32
}

state 字段

Locker 接口

type Locker interface {
	Lock()
	Unlock()
}

TryLock 方法

TryLock 方法尝试获取锁并且返回是否成功 (调用非阻塞)。

func (m *Mutex) TryLock() bool {
	old := m.state
	if old&(mutexLocked|mutexStarving) != 0 {
		return false
	}

	// 这里可能有 1 个 goroutine 在等待锁
	// 但是可以尝试在 goroutine 唤醒之前获得锁
	if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
		return false
	}
	
	...
	return true
}

在标准库中的注释中有这样一段话:

注意,TryLock 真实的使用场景很少,使用 TryLock 通常说明在互斥锁的使用场景中,存在更深层次的问题。

官方的言外之意貌似并不鼓励使用该方法TryLock 方法添加是社区的强烈意愿,官方一开始似乎并不感冒。笔者搜索了一下,发现目前标准库中几乎没有使用到 TryLock 方法。

$ grep -nr "TryLock" "$(dirname $(which go))/../src" | grep -v "\/\/" | grep -v "test"

# 只有 2 个声明和 1 个调用
.../src/sync/mutex.go:98:func (m *Mutex) TryLock() bool 
.../src/sync/rwmutex.go:166:func (rw *RWMutex) TryLock() bool 
.../src/sync/rwmutex.go:171:   if !rw.w.TryLock() 

Lock 方法

Lock 方法表示加锁操作,如果锁已经被使用,调用方 goroutine 会陷入阻塞,直到锁可用。

func (m *Mutex) Lock() {
	// 锁当前状态是未锁定 (正好可以完成加锁,然后直接返回)
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		...
		return 
	}
	// slow path 可以代码内联
	m.lockSlow()
}

lockSlow 方法

lockSlow 方法表示调用 Lock 方法时没有直接获取到锁时,接下来的处理工作 (进入阻塞调用)。

func (m *Mutex) lockSlow() {
	var waitStartTime int64 // 等待时间
	starving := false       // 是否处于饥饿模式
	awoke := false          // 是否处于唤醒状态
	iter := 0               // 自旋迭代次数
	old := m.state          // 锁当前状态
	
	for {
		// 饥饿模式下不能自旋,因为所有权转移到了队列前面的 goroutine
		// 新到达的 goroutine 不参与竞争并且不会尝试自旋
		
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// 如果锁没有处于饥饿模式并且符合自旋条件,goroutine 开始自旋
			// 尝试设置 mutexWoken 标识 
			// 这样 Unlock 释放锁时就不用唤醒其他阻塞的 goroutine (直接让当前 goroutine 获得锁)
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			
			runtime_doSpin() // goroutine 自旋
			iter++          // 增加自旋次数
			old = m.state   // 更新锁状态
			continue
		}
		
		new := old
		
		// 互斥锁处于饥饿模式时,新到达的 goroutine 不要试图获得锁,应该去排队
		
		if old&mutexStarving == 0 {
            // 锁处于非饥饿模式
			new |= mutexLocked
		}
		if old&(mutexLocked|mutexStarving) != 0 {
            // 锁处于饥饿模式或者已加锁
			// 增加等待的 goroutine 数量 (注意偏移量使用方法)
			new += 1 << mutexWaiterShift 
		}
		
		// 当前 goroutine 将锁切换到饥饿模式 (前提是锁没有被释放,调用了 Unlock 方法)
		// 调用 Unlock 时,处于饥饿模式的锁期望有等待的 goroutine, 但实际情况不一定有
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}
		if awoke {
			// 当前 goroutine 处于唤醒状态,但是锁的状态里并没有 mutexWoken 标识
			// 抛出一个 BUG: 互斥锁状态不一致
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
			// goroutine 被唤醒,重置 mutexWoken 标识
			new &^= mutexWoken
		}
		
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			if old&(mutexLocked|mutexStarving) == 0 {
                // 当前 goroutine 通过 CAS 操作获得了锁
				break 
			}
			
			// 如果已经处于等待状态,那么直接排在队列最前面
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
			// 通过信号量保证锁只能被 1 个 goroutine 获取到
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			
            // 如果等待时间超过了阈值,那么就进入饥饿模式
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state
			if old&mutexStarving != 0 {
				// 如果当前 goroutine 被唤醒并且锁处于饥饿模式
				// 控制权转交给了当前 goroutine,但是互斥锁处于某种不一致的状态:
				//      mutexLocked 标识未设置,仍然认为当前 goroutine 正在等待锁
				//      抛出一个 BUG: mutex 状态不一致
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				
				// 减少等待的 goroutine 数量 (注意偏移量使用方法)
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				if !starving || old>>mutexWaiterShift == 1 {
					// 退出饥饿模式
					// 必须要在这里退出并且考虑等待时间
					
					// 饥饿模式效率很低,一旦 2 个 goroutine 同时将互斥锁切换到饥饿模式,可能会陷入无限循环
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				break
			}
			awoke = true // 当前 goroutine 唤醒
			iter = 0     // 当前 goroutine 重置自旋次数
		} else {
			old = m.state
		}
	}

	...
}

runtime_canSpin 方法

runtime_canSpin 方法检测当前 goroutine 是否可以进行自旋操作,通过链接器链接到 sync_runtime_canSpin

// sync/runtime.go

func runtime_canSpin(i int) bool

运行自旋操作的条件比较苛刻,需要满足:

  1. 程序运行在多核机器上且 GOMAXPROCS > 1
  2. 至少有一个正在运行的 P, 且本地运行队列为空
  3. 自旋次数不超过 4 次

自旋与互斥锁相反,不做被动等待 (直接主动原地旋转等待),因为其可以在全局运行队列或者其他处理器的运行队列上面运行

//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
	if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
		return false
	}
	if p := getg().m.p.ptr(); !runqempty(p) {
		return false
	}
	return true
}

Go 调度器的设计者考虑了 操作的资源利用率 和 频繁的线程抢占给操作系统带来的负载 之后,提出了 自旋线程 的概念。 当 自旋线程 没有找到可供其调度执行的 goroutine 时,并不会销毁该线程,而是采取 自旋 的操作保存起来。 虽然直观上看起来浪费了一些资源,但是考虑一下 syscall (系统调用) 相关的情景就可以发现,相比 自旋 来说,线程间频繁的抢占、创建、销毁等操作带来的负载会更高。

Unlock 方法

Unlock 方法表示锁的释放操作,对未加锁的互斥锁执行解锁操作会引发运行时错误。

互斥锁并不是和某个具体的 goroutine 关联的,完全可以在一个 goroutine 加锁,在其他的 goroutine 中释放锁 (前提是两者持有的是同一个锁)

func (m *Mutex) Unlock() {
    ...

	// 如果去除 mutexLocked 标识之后正好是 0, 说明当前 goroutine 成功解锁,直接返回即可
	new := atomic.AddInt32(&m.state, -mutexLocked)
	
	if new != 0 {
		// slow path 可以代码内联
		m.unlockSlow(new)
	}
}

unlockSlow 方法

unlockSlow 方法表示调用 Unlock 方法时没有直接释放锁时,接下来的处理工作 (进入阻塞调用)。

func (m *Mutex) unlockSlow(new int32) {
	// 对未加锁的互斥锁执行解锁操作会引发运行时错误
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
	
	if new&mutexStarving == 0 {
		old := new
		for {
			// 如果锁处于正常模式
			//    如果没有等待 goroutine 或者已经有 goroutine 被唤醒或抢到了锁,就不用唤醒其他 goroutine 了,直接返回即可
			
			// 如果锁处于饥饿模式,所有权会直接从唤醒的 goroutine 转移到队列最前面的 goroutine
			// 如果当前 goroutine 不在这个调用链中
			//      可能是因为在 unlock 互斥锁时,没有观察到 mutexStarving 标识,所以直接返回即可
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			
			// 锁有正在等待的 goroutine, 那么被唤醒的 goroutine 得到锁
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		// 如果锁处于处于饥饿模式
		// 将锁的所有权转移到下一个等待的 goroutine, 并让其开始运行
		// 注意:mutexLocked 标识不需要被设置,因为等待的 goroutine 被唤醒后会进行设置
		//      如果此时设置了 mutexStarving 标识,锁仍然被认为当前处于加锁状态,新到达的 goroutine 就无法获取到锁了
		runtime_Semrelease(&m.sema, true, 1)
	}
}

小结

互斥锁内部实现中将状态分成了两种: 正常模式和饥饿模式。在正常模式下,锁的分配采用竞争机制,在刚被唤醒的和新来的 goroutine 之间, 倾向于将锁分配给新来的 goroutine, 竞争失败的 goroutine 会被放到队列首部,如果等待的 goroutine 超过 1ms 无法获得锁,互斥锁会切换到饥饿模式。 在饥饿模式下,从队列首部的 goroutine 开始依次获得锁,新来的 goroutine 会自动排在队列尾部,这样就避免尾部 goroutine 饿死。 此外,互斥锁的加锁和解锁过程实现中还涉及到了信号、以及 GMP 调度等概念,本文暂时忽略掉了这些细节,后面有时间了再专门写一篇。

扩展阅读

转载申请

本作品采用 知识共享署名 4.0 国际许可协议 进行许可,转载时请注明原文链接,图片在使用时请保留全部内容,商业转载请联系作者获得授权。