sync.Cond Code Reading
2023-04-26 Golang 并发编程 Go 源码分析 读代码
概述
sync.Cond
实现了一种条件变量同步原语,可以让一个 goroutine
集合在满足特定条件时被唤醒。
sync.Cond
典型的使用场景是 生产-消费者模式
,多个 goroutine
等待某个事件发生, 单个 goroutine
通知某个事件已发生。
比如电商中的用户下单事件发生时,会通知到订单、用户、积分、优惠券、仓储等服务,如果是单个生产者对单个消费者,直接使用 互斥锁
或 channel
就可以。
为什么多个消费者模式不使用互斥锁或 channel 呢?
可以想象一个非常简单的场景: 有一个 goroutine
在异步接收数据,剩下的多个 goroutine
必须等待该 goroutine
接收完才能读取。
在这种情况下,如果单纯使用 互斥锁
或 channel
,就只能有一个 goroutine
可以等待并读取到数据,其他的 goroutine
没办法读取。
当然我们可以通过折衷的方案来解决,例如 可以创建一个全局变量,用来标志这个 goroutine
数据是否接收完成,剩下的 goroutine
反复检查该全局变量,直到满足条件。
或者 可以创建多个 channel
,每个 goroutine
阻塞在一个 channel
上面,接收数据的 goroutine
在数据接收完毕后,逐个通知。
但是不论哪种方式,实现复杂度都大大增加了。
sync.Cond
提供了简洁优雅的方式来解决上述问题。
示例
通过一个小例子展示 sync.Cond
的使用方法。
package main
import (
"fmt"
"sync"
"time"
)
// 条件变量
var done = false
// 数据读取操作
func read(name string, c *sync.Cond) {
c.L.Lock()
for !done {
// 等待生产者写入通知
c.Wait()
}
fmt.Println(name, "starts reading")
c.L.Unlock()
}
// 数据写入操作
func write(name string, c *sync.Cond) {
fmt.Println(name, "starts writing")
time.Sleep(100 * time.Millisecond)
c.L.Lock()
// 设置条件变量
done = true
c.L.Unlock()
fmt.Println(name, "wakes all")
// 通知所有消费者
c.Broadcast()
}
func main() {
// 创建对象时传入一个互斥锁
cond := sync.NewCond(&sync.Mutex{})
// 3 个消费者
go read("reader-1", cond)
go read("reader-2", cond)
go read("reader-3", cond)
// 1 个生产者
write("writer-1", cond)
time.Sleep(time.Second)
}
$ go run main.go
# 输出如下
writer-1 starts writing
writer-1 wakes all
reader-2 starts reading
reader-1 starts reading
reader-3 starts reading
从输出结果中可以看到,消费者刚开始时调用 Wait
方法阻塞,直到生产者 (write) 写入完成后调用 Broadcast
方法通知所有消费者 (read),然后所有消费者依次输出。
内部实现
我们来探究一下 sync.Cond
的内部实现,文件路径为 $GOROOT/src/sync/cond.go
,笔者的 Go 版本为 go1.19 linux/amd64
。
Cond 对象
Cond
对象表示同步条件变量,可以让 goroutins
等待或通知某个事件发生,Cond 对象一旦使用后,就不能再复制。
每一个 Cond
对象都持有一个对应的 Locker
接口 (通常是一个互斥锁或读写锁),当条件发生变化以及调用 Wait
方法时,必须持有对应的锁。
在简单的应用场景中,更好的选择是使用 channel
完成同步操作 (Go 的标准库设计理念是上层应用尽量使用 channel
作为同步原语),可以将两者的对应关系简单概况如下:
- 关闭 channel 对应 Cond.Broadcast 方法
- 向 channel 发送数据对应 Cond.Signal 方法
type Cond struct {
// 保证编译期间不会发生复制
noCopy noCopy
// 当访问或者修改条件时,必须持有 L
L Locker
// goroutine 链表
notify notifyList
// 保证运行期间不会发生复制
checker copyChecker
}
notifyList 对象
notifyList
对象表示一个 goroutine
链表数据结构。
// runtime/sema.go
type notifyList struct {
// 等待的 goroutine 索引,可以在没有获取锁的情况下原子性递增
wait uint32
// 已经通知到的 goroutine 索引
// 可以在没有获取锁的情况下进行读取操作,但是必须在获得锁的情况下进行写入操作
notify uint32
lock mutex
// 等待索引和已通知索引可以是环形队列结构
// 链表头指针
head *sudog
// 链表尾指针
tail *sudog
}
NewCond 方法
NewCond
方法创建一个 Cond
对象,参数为一个 Locker
接口。
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
Wait 方法
Wait
方法 (阻塞调用) 会解锁 c.L
字段并且休眠当前 goroutine
,等到当前 goroutine
被唤醒后,Wait
方法在返回之前再对 c.L
字段加锁。
func (c *Cond) Wait() {
// 复制检测
c.checker.check()
// 等待索引 + 1
t := runtime_notifyListAdd(&c.notify)
// goroutine 休眠之前先解锁 (否则其他 goroutine 获取不到锁,会造成死锁问题)
c.L.Unlock()
// 等待唤醒,并传递等待索引
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
runtime_notifyListAdd 方法
runtime_notifyListAdd
方法通过链接器链接到 notifyListAdd
方法,notifyListAdd
方法将当前调用方 goroutine
添加到通知链表中以便接收通知。
// runtime/sema.go
//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
// 等待索引计数 + 1
return atomic.Xadd(&l.wait, 1) - 1
}
runtime_notifyListWait 方法
runtime_notifyListWait
方法通过链接器链接到 notifyListWait
方法,如果在调用 notifyListAdd
方法之后已经发送过通知,notifyListWait
就会立即返回,否则就陷入阻塞。
// runtime/sema.go
//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
// 如果之前已经发送过通知,直接返回即可
if less(t, l.notify) {
unlock(&l.lock)
return
}
// 获取当前 goroutine 并追加到链表尾部
s := acquireSudog()
s.g = getg()
// 获取等待索引
s.ticket = t
// 将 goroutine 加入到链表中
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
// 休眠当前 goroutine
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
// 归还 goroutine
releaseSudog(s)
}
Signal 方法
Signal
方法唤醒链表头部等待的 goroutine
。
func (c *Cond) Signal() {
// 复制检测
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
runtime_notifyListNotifyOne 方法
runtime_notifyListNotifyOne
方法通过链接器链接到 notifyListNotifyOne
方法,唤醒链表头部的 goroutine
。
// runtime/sema.go
//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
// 如果已通知索引和等待索引相同
// 说明没有等待的 goroutine, 直接返回
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
// 获取已通知索引
t := l.notify
// 如果已通知索引和等待索引相同
// 说明没有等待的 goroutine, 直接返回
// 又是一个经典的双重检测
if t == atomic.Load(&l.wait) {
unlock(&l.lock)
return
}
// 已通知索引 + 1
atomic.Store(&l.notify, t+1)
// 根据已通知索引,找到对应的 goroutine 唤醒
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
// 唤醒满足条件的 goroutine
readyWithTime(s, 4)
return
}
}
}
为什么不直接唤醒链表的头部元素呢?
这就是 已通知索引 notify
字段存在的意义,因为获取等待索引和加入到链表两个步骤不是原子操作,这意味着在并发场景下,会出现顺序不一致的情况。
例如,goroutine
对应的等待索引为 2, 但是因为并发问题,加入到链表的时候,排到了第 3 个位置,如图所示:
不过不需要担心,我们可以根据 已通知索引 notify
字段,保证在发送单个通知时保证顺序的一致性,避免乱序可能带来的 先到的 goroutine 反而等待时间长
这类问题。
当前 notify
字段对应的 goroutine
通知后,会变更指针到下一个 goroutine
,如图所示:
从算法时间复杂度来分析,直接唤醒链表头部元素是 O(1)
, 通过 notify
字段唤醒是 O(N)
, 但是官方的注释中写道:
This scan looks linear but essentially always stops quickly.
所以即便出现乱序,notify
索引字段对应的 goroutine
也不会太靠后,所以不会产生太多的性能问题。
Broadcast 方法
Broadcast
方法唤醒所有等待的 goroutine
。
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
runtime_notifyListNotifyAll 方法
runtime_notifyListNotifyAll
方法通过链接器链接到 notifyListNotifyAll
方法,唤醒所有等待的 goroutine
。
// runtime/sema.go
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
// 没有等待的 goroutine, 直接返回
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
// 既然是全部唤醒,也就不用担心上面提到的乱序问题了
// 直接遍历 goroutine 链表,逐个唤醒 goroutine
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}
check 方法
Cond.copyChecker
字段持有指向自身的指针,用来检测是否被复制,当指针值和实际地址值不一致时,说明发生了复制。
type copyChecker uintptr
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
// 临界区域重复检测,避免原子对比后的瞬间,值被复制
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}
check
方法的实现很有意思,里面有 3 个判断条件:
-
uintptr(*c) != uintptr(unsafe.Pointer(c))
比较 copyChecker 的指针值 (默认是 0)
-
atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c)))
CAS 操作 copyChecker 的指针值
-
uintptr(*c) != uintptr(unsafe.Pointer(c))`
比较 copyChecker 的指针值
为什么 CAS 操作
之后又重复比较了一次呢?主要是针对临界区的检测,因为可能会出现一种极端情况: CAS 操作
之后的瞬间 copyChecker 被复制了。
noCopy 对象
noCopy
对象可以添加到具体的结构体中,实现 “首次使用之后,无法被复制” 的功能 (由编译器实现)。
noCopy.Lock
方法是一个空操作,由 go vet
工具链中的 -copylocks checker 参数指令使用。
type noCopy struct{}
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}
小结
sync.Cond
不是一个常用的同步机制,在条件变量长时间无法满足时,sync.Cond
能够让出处理器的使用权,和单纯使用 for {}
进行无限等待相比,
可以提高 CPU 的利用率。但是使用时我们也需要注意以下问题:
Wait
方法在调用之前一定要完成加锁操作,否则程序会panic
(因为方法内部会释放互斥锁)Signal
方法会唤醒链表 (队列) 最前面、等待最久的goroutine
(通过等待索引字段保证顺序)Broadcast
方法会按照链表的顺序 (并不是先进先出,因为可能存在乱序问题) 唤醒所有等待的goroutine