蛮荆

sync.RWMutex Code Reading

2023-04-24

概述

sync.RWMutex 实现了读写锁同步原语,资源可以被并发读,但是无法并发写、并发读写,比 sync.Mutex 实现的互斥锁粒度更细 (建议读者先读完 sync.Mutex Code Reading,再来读这篇,事半功倍)。

内部实现

我们来探究一下 sync.RWMutex 的内部实现,文件路径为 $GOROOT/src/sync/rwmutex.go,笔者的 Go 版本为 go1.19 linux/amd64

RWMutex 对象

RWMutex 对象表示读写锁,零值状态下表示未加锁,锁可以被任意数量的 goroutine 并发读,但是同一时刻只能被一个 goroutine 写入,RWMutex 对象一旦使用后,就不能再复制。

如果一个 goroutine 持有了读锁进行操作,此时其他的 goroutine 可能尝试获取写锁 (参与竞争),那么在读锁被释放之前,不会有其他 goroutine 获取到读锁 (避免竞争写锁的 goroutine 被饿死), 一个 goroutine 在阻塞获取写锁的过程中,会阻止其他 goroutine 获得读锁。

// 根据 Go 内存模型的约束
// 第 n 次调用 Unlock 方法在第 m 次调用 Lock 方法之前同步,其中 n < m (和互斥锁规则一致)
// 对于任何 RLock 方法调用,都存在一个 n
//    第 n 次调用 Unlock 方法在调用 RLock 方法之前同步
//    而对应的调用 RUnlock 方法在第 n+1 次 次调用 Lock 方法之前同步
type RWMutex struct {
	w           Mutex  // 复用互斥锁
	writerSem   uint32 // 信号量,写等待读
	readerSem   uint32 // 信号量,读等待写
	readerCount int32  // 执行读操作的 goroutine 数量
	readerWait  int32  // 写操作被阻塞时,等待的读操作 goroutine 数量
}

TryRLock 方法

TryLock 方法尝试获取读锁并且返回是否成功 (调用非阻塞)。

func (rw *RWMutex) TryRLock() bool {
    ...
	
	for {
		c := atomic.LoadInt32(&rw.readerCount)
		if c < 0 {
			// 执行读操作的 goroutine 数量小于 0
			// 说明当前有 goroutine 持有写锁正在操作
			return false
		}
		if atomic.CompareAndSwapInt32(&rw.readerCount, c, c+1) {
			// 获取到读锁
			return true
		}
	}
}

RLock 方法

RLock 方法用于获取读锁 (阻塞调用)。

func (rw *RWMutex) RLock() {
    ...
	
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		// 当前 goroutine 陷入休眠,等待持有写锁的 goroutine 释放锁
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}

	...
}

RUnlock 方法

RUnlock 方法释放读锁,不会影响到其他同时持有读锁的 goroutine

func (rw *RWMutex) RUnlock() {
    ...
	
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		// 执行读操作的 goroutine 数量小于 0
		// 说明当前有 goroutine 持有写锁正在操作
		// slow-path 允许代码内联
		rw.rUnlockSlow(r)
	}

	...
}

rUnlockSlow 方法

func (rw *RWMutex) rUnlockSlow(r int32) {
	...
	
	// 减少写锁操作时,等待读锁的 goroutine 数量
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		// 如果所有的读锁都被释放了,唤醒等待写锁的 goroutine
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

TryLock

TryLock 方法尝试获取写锁并且返回是否成功 (调用非阻塞),方法内部直接复用了 Mutex.TryLock 方法,这里不再赘述。

func (rw *RWMutex) TryLock() bool {}

Lock 方法

Lock 方法用于获取写锁 (阻塞调用),如果锁当前处于读或者写状态,Lock 会阻塞直到锁可用。

func (rw *RWMutex) Lock() {
    ...
	
	// 首先和等待写锁的 goroutine 竞争
	rw.w.Lock()
	
	// 获取到锁之后
	// 通知所有持有读锁的 goroutine,有 1 个 goroutine 准备写入
	//    也就是说获取到写锁之后,需要等待目前所有持有读锁的 goroutine 释放读锁后,才能继续操作
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// 陷入休眠,等待持有读锁的 goroutine 释放锁
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
    
	...
}

Unlock 方法

Unlock 方法释放写锁,对未加锁的读写锁执行解锁操作会引发运行时错误。

读写锁和互斥锁一样,并不是和某个具体的 goroutine 关联的,完全可以在一个 goroutine 获取读锁或写锁 ,在其他的 goroutine 中释放读锁或写锁 (前提是两者持有的是同一个锁)。

func (rw *RWMutex) Unlock() {
    ...

	// 通知持有读锁的 goroutine, 当前没有持有写锁的 goroutine 执行 
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)

	// 唤醒所有因获取读锁而陷入阻塞的 goroutine
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	// 释放写锁
	rw.w.Unlock()
	
    ...
}

RLocker 方法

// 将 rw 进行类型转换,返回一个 Locker 接口
func (rw *RWMutex) RLocker() Locker {
	return (*rlocker)(rw)
}

type rlocker RWMutex

func (r *rlocker) Lock()   { (*RWMutex)(r).RLock() }
func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }

小结

sync.RWMutex 在互斥锁的基础上,实现了粒度更细的读写锁,内部实现和互斥锁类似,通过标识符常量、原子操作再结合信号、调度等完成。

扩展阅读

转载申请

本作品采用 知识共享署名 4.0 国际许可协议 进行许可,转载时请注明原文链接,图片在使用时请保留全部内容,商业转载请联系作者获得授权。