sync.RWMutex Code Reading
2023-04-24 Golang 并发编程 Go 源码分析 读代码
概述
sync.RWMutex
实现了读写锁同步原语,资源可以被并发读,但是无法并发写、并发读写,比 sync.Mutex
实现的互斥锁粒度更细 (建议读者先读完 sync.Mutex Code Reading,再来读这篇,事半功倍)。
内部实现
我们来探究一下 sync.RWMutex
的内部实现,文件路径为 $GOROOT/src/sync/rwmutex.go
,笔者的 Go 版本为 go1.19 linux/amd64
。
RWMutex 对象
RWMutex
对象表示读写锁,零值状态下表示未加锁,锁可以被任意数量的 goroutine
并发读,但是同一时刻只能被一个 goroutine
写入,RWMutex 对象一旦使用后,就不能再复制。
如果一个
goroutine
持有了读锁进行操作,此时其他的goroutine
可能尝试获取写锁 (参与竞争),那么在读锁被释放之前,不会有其他goroutine
获取到读锁 (避免竞争写锁的goroutine
被饿死) 一个goroutine
在阻塞获取写锁的过程中,会阻止其他goroutine
获得读锁
// 根据 Go 内存模型的约束
// 第 n 次调用 Unlock 方法在第 m 次调用 Lock 方法之前同步,其中 n < m (和互斥锁规则一致)
// 对于任何 RLock 方法调用,都存在一个 n
// 第 n 次调用 Unlock 方法在调用 RLock 方法之前同步
// 而对应的调用 RUnlock 方法在第 n+1 次 次调用 Lock 方法之前同步
type RWMutex struct {
w Mutex // 复用互斥锁
writerSem uint32 // 信号量,写等待读
readerSem uint32 // 信号量,读等待写
readerCount int32 // 执行读操作的 goroutine 数量
readerWait int32 // 写操作被阻塞时,等待的读操作 goroutine 数量
}
TryRLock 方法
TryLock
方法尝试获取读锁并且返回是否成功 (调用非阻塞)。
func (rw *RWMutex) TryRLock() bool {
...
for {
c := atomic.LoadInt32(&rw.readerCount)
if c < 0 {
// 执行读操作的 goroutine 数量小于 0
// 说明当前有 goroutine 持有写锁正在操作
return false
}
if atomic.CompareAndSwapInt32(&rw.readerCount, c, c+1) {
// 获取到读锁
return true
}
}
}
RLock 方法
RLock
方法用于获取读锁 (阻塞调用)。
func (rw *RWMutex) RLock() {
...
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// 当前 goroutine 陷入休眠,等待持有写锁的 goroutine 释放锁
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
...
}
RUnlock 方法
RUnlock
方法释放读锁,不会影响到其他同时持有读锁的 goroutine
。
func (rw *RWMutex) RUnlock() {
...
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// 执行读操作的 goroutine 数量小于 0
// 说明当前有 goroutine 持有写锁正在操作
// slow-path 允许代码内联
rw.rUnlockSlow(r)
}
...
}
rUnlockSlow 方法
func (rw *RWMutex) rUnlockSlow(r int32) {
...
// 减少写锁操作时,等待读锁的 goroutine 数量
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// 如果所有的读锁都被释放了,唤醒等待写锁的 goroutine
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
TryLock
TryLock
方法尝试获取写锁并且返回是否成功 (调用非阻塞),方法内部直接复用了 Mutex.TryLock
方法,这里不再赘述。
func (rw *RWMutex) TryLock() bool {}
Lock 方法
Lock
方法用于获取写锁 (阻塞调用),如果锁当前处于读或者写状态,Lock 会阻塞直到锁可用。
func (rw *RWMutex) Lock() {
...
// 首先和等待写锁的 goroutine 竞争
rw.w.Lock()
// 获取到锁之后
// 通知所有持有读锁的 goroutine,有 1 个 goroutine 准备写入
// 也就是说获取到写锁之后,需要等待目前所有持有读锁的 goroutine 释放读锁后,才能继续操作
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 陷入休眠,等待持有读锁的 goroutine 释放锁
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
...
}
Unlock 方法
Unlock
方法释放写锁,对未加锁的读写锁执行解锁操作会引发运行时错误。
读写锁和互斥锁一样,并不是和某个具体的 goroutine 关联的,完全可以在一个 goroutine 获取读锁或写锁 ,在其他的 goroutine 中释放读锁或写锁 (前提是两者持有的是同一个锁)。
func (rw *RWMutex) Unlock() {
...
// 通知持有读锁的 goroutine, 当前没有持有写锁的 goroutine 执行
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
// 唤醒所有因获取读锁而陷入阻塞的 goroutine
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// 释放写锁
rw.w.Unlock()
...
}
RLocker 方法
// 将 rw 进行类型转换,返回一个 Locker 接口
func (rw *RWMutex) RLocker() Locker {
return (*rlocker)(rw)
}
type rlocker RWMutex
func (r *rlocker) Lock() { (*RWMutex)(r).RLock() }
func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }
小结
sync.RWMutex
在互斥锁的基础上,实现了粒度更细的读写锁,内部实现和互斥锁类似,通过标识符常量、原子操作再结合信号、调度等完成。