蛮荆

sync.Cond Code Reading

2023-04-26

概述

sync.Cond 实现了一种条件变量同步原语,可以让一个 goroutine 集合在满足特定条件时被唤醒。

sync.Cond 典型的使用场景是 生产-消费者模式,多个 goroutine 等待某个事件发生, 单个 goroutine 通知某个事件已发生。 比如电商中的用户下单事件发生时,会通知到订单、用户、积分、优惠券、仓储等服务,如果是单个生产者对单个消费者,直接使用 互斥锁channel 就可以。

为什么多个消费者模式不使用互斥锁或 channel 呢?

可以想象一个非常简单的场景: 有一个 goroutine 在异步接收数据,剩下的多个 goroutine 必须等待该 goroutine 接收完才能读取。 在这种情况下,如果单纯使用 互斥锁channel,就只能有一个 goroutine 可以等待并读取到数据,其他的 goroutine 没办法读取。

当然我们可以通过折衷的方案来解决,例如 可以创建一个全局变量,用来标志这个 goroutine 数据是否接收完成,剩下的 goroutine 反复检查该全局变量,直到满足条件。 或者 可以创建多个 channel,每个 goroutine 阻塞在一个 channel 上面,接收数据的 goroutine 在数据接收完毕后,逐个通知。 但是不论哪种方式,实现复杂度都大大增加了。

sync.Cond 提供了简洁优雅的方式来解决上述问题。

示例

通过一个小例子展示 sync.Cond 的使用方法。

package main

import (
	"fmt"
	"sync"
	"time"
)

// 条件变量
var done = false

// 数据读取操作
func read(name string, c *sync.Cond) {
	c.L.Lock()
	for !done {
		// 等待生产者写入通知
		c.Wait()
	}
	fmt.Println(name, "starts reading")
	c.L.Unlock()
}

// 数据写入操作
func write(name string, c *sync.Cond) {
	fmt.Println(name, "starts writing")
	time.Sleep(100 * time.Millisecond)

	c.L.Lock()
	// 设置条件变量
	done = true
	c.L.Unlock()

	fmt.Println(name, "wakes all")
	// 通知所有消费者
	c.Broadcast() 
}

func main() {
	// 创建对象时传入一个互斥锁
	cond := sync.NewCond(&sync.Mutex{}) 

	// 3 个消费者
	go read("reader-1", cond)
	go read("reader-2", cond)
	go read("reader-3", cond)

	// 1 个生产者
	write("writer-1", cond)

	time.Sleep(time.Second)
}
$ go run main.go

# 输出如下
writer-1 starts writing
writer-1 wakes all
reader-2 starts reading
reader-1 starts reading
reader-3 starts reading

从输出结果中可以看到,消费者刚开始时调用 Wait 方法阻塞,直到生产者 (write) 写入完成后调用 Broadcast 方法通知所有消费者 (read),然后所有消费者依次输出。

调用关系图

内部实现

我们来探究一下 sync.Cond 的内部实现,文件路径为 $GOROOT/src/sync/cond.go,笔者的 Go 版本为 go1.19 linux/amd64

Cond 对象

Cond 对象表示同步条件变量,可以让 goroutins 等待或通知某个事件发生,Cond 对象一旦使用后,就不能再复制。

每一个 Cond 对象都持有一个对应的 Locker 接口 (通常是一个互斥锁或读写锁),当条件发生变化以及调用 Wait 方法时,必须持有对应的锁。

在简单的应用场景中,更好的选择是使用 channel 完成同步操作 (Go 的标准库设计理念是上层应用尽量使用 channel 作为同步原语),可以将两者的对应关系简单概况如下:

  • 关闭 channel 对应 Cond.Broadcast 方法
  • 向 channel 发送数据对应 Cond.Signal 方法

Signal 和 Broadcast

type Cond struct {
    // 保证编译期间不会发生复制
	noCopy noCopy
	// 当访问或者修改条件时,必须持有 L
	L Locker
	// goroutine 链表
	notify  notifyList
	// 保证运行期间不会发生复制
	checker copyChecker 
}

notifyList 对象

notifyList 对象表示一个 goroutine 链表数据结构。

// runtime/sema.go

type notifyList struct {
	// 等待的 goroutine 索引,可以在没有获取锁的情况下原子性递增
	wait uint32

	// 已经通知到的 goroutine 索引
	// 可以在没有获取锁的情况下进行读取操作,但是必须在获得锁的情况下进行写入操作
	notify uint32
	
	lock mutex
	// 等待索引和已通知索引可以是环形队列结构
	// 链表头指针
	head *sudog
    // 链表尾指针
	tail *sudog 
}

sync.Cond 对象结构

NewCond 方法

NewCond 方法创建一个 Cond 对象,参数为一个 Locker 接口。

func NewCond(l Locker) *Cond {
	return &Cond{L: l}
}

Wait 方法

Wait 方法 (阻塞调用) 会解锁 c.L 字段并且休眠当前 goroutine,等到当前 goroutine 被唤醒后,Wait 方法在返回之前再对 c.L 字段加锁。

func (c *Cond) Wait() {
	// 复制检测
	c.checker.check()
	// 等待索引 + 1
	t := runtime_notifyListAdd(&c.notify)
    // goroutine 休眠之前先解锁 (否则其他 goroutine 获取不到锁,会造成死锁问题)
	c.L.Unlock()
    // 等待唤醒,并传递等待索引
	runtime_notifyListWait(&c.notify, t)
	c.L.Lock()
}

runtime_notifyListAdd 方法

runtime_notifyListAdd 方法通过链接器链接到 notifyListAdd 方法,notifyListAdd 方法将当前调用方 goroutine 添加到通知链表中以便接收通知。

// runtime/sema.go

//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
	// 等待索引计数 + 1
    return atomic.Xadd(&l.wait, 1) - 1
}

runtime_notifyListWait 方法

runtime_notifyListWait 方法通过链接器链接到 notifyListWait 方法,如果在调用 notifyListAdd 方法之后已经发送过通知,notifyListWait 就会立即返回,否则就陷入阻塞。

// runtime/sema.go

//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
	// 如果之前已经发送过通知,直接返回即可
	if less(t, l.notify) {
		unlock(&l.lock)
		return
	}

	// 获取当前 goroutine 并追加到链表尾部
	s := acquireSudog()
	s.g = getg()
	// 获取等待索引
	s.ticket = t
	
	// 将 goroutine 加入到链表中
	if l.tail == nil {
		l.head = s
	} else {
		l.tail.next = s
	}
	l.tail = s
	
	// 休眠当前 goroutine
	goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
	// 归还 goroutine
    releaseSudog(s)
}

Signal 方法

Signal 方法唤醒链表头部等待的 goroutine

func (c *Cond) Signal() {
    // 复制检测
	c.checker.check()
	runtime_notifyListNotifyOne(&c.notify)
}

runtime_notifyListNotifyOne 方法

runtime_notifyListNotifyOne 方法通过链接器链接到 notifyListNotifyOne 方法,唤醒链表头部的 goroutine

// runtime/sema.go

//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
	// 如果已通知索引和等待索引相同
	// 说明没有等待的 goroutine, 直接返回
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}

	// 获取已通知索引
    t := l.notify
	// 如果已通知索引和等待索引相同
	// 说明没有等待的 goroutine, 直接返回
	// 又是一个经典的双重检测
	if t == atomic.Load(&l.wait) {
		unlock(&l.lock)
		return
	}

	// 已通知索引 + 1
    atomic.Store(&l.notify, t+1)

	// 根据已通知索引,找到对应的 goroutine 唤醒
	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
		if s.ticket == t {
			// 唤醒满足条件的 goroutine
			readyWithTime(s, 4)
			return
		}
	}
}

为什么不直接唤醒链表的头部元素呢

这就是 已通知索引 notify 字段存在的意义,因为获取等待索引和加入到链表两个步骤不是原子操作,这意味着在并发场景下,会出现顺序不一致的情况。

例如,goroutine 对应的等待索引为 2, 但是因为并发问题,加入到链表的时候,排到了第 3 个位置,如图所示:

乱序问题示例

不过不需要担心,我们可以根据 已通知索引 notify 字段,保证在发送单个通知时保证顺序的一致性,避免乱序可能带来的 先到的 goroutine 反而等待时间长 这类问题。

当前 notify 字段对应的 goroutine 通知后,会变更指针到下一个 goroutine,如图所示:

乱序问题解决

从算法时间复杂度来分析,直接唤醒链表头部元素是 O(1), 通过 notify 字段唤醒是 O(N), 但是官方的注释中写道:

This scan looks linear but essentially always stops quickly.

所以即便出现乱序,notify 索引字段对应的 goroutine 也不会太靠后,所以不会产生太多的性能问题。

Broadcast 方法

Broadcast 方法唤醒所有等待的 goroutine

func (c *Cond) Broadcast() {
	c.checker.check()
	runtime_notifyListNotifyAll(&c.notify)
}

runtime_notifyListNotifyAll 方法

runtime_notifyListNotifyAll 方法通过链接器链接到 notifyListNotifyAll 方法,唤醒所有等待的 goroutine

// runtime/sema.go

//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
    // 没有等待的 goroutine, 直接返回
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}
	
	// 既然是全部唤醒,也就不用担心上面提到的乱序问题了
	// 直接遍历 goroutine 链表,逐个唤醒 goroutine
	for s != nil {
		next := s.next
		s.next = nil
		readyWithTime(s, 4)
		s = next
	}
}

check 方法

Cond.copyChecker 字段持有指向自身的指针,用来检测是否被复制,当指针值和实际地址值不一致时,说明发生了复制。

type copyChecker uintptr

func (c *copyChecker) check() {
	if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
		!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
		// 临界区域重复检测,避免原子对比后的瞬间,值被复制
		uintptr(*c) != uintptr(unsafe.Pointer(c)) {
		panic("sync.Cond is copied")
	}
}

check 方法的实现很有意思,里面有 3 个判断条件:

  1. uintptr(*c) != uintptr(unsafe.Pointer(c))

    比较 copyChecker 的指针值 (默认是 0)

  2. atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c)))

    CAS 操作 copyChecker 的指针值

  3. uintptr(*c) != uintptr(unsafe.Pointer(c))`

    比较 copyChecker 的指针值

为什么 CAS 操作 之后又重复比较了一次呢?主要是针对临界区的检测,因为可能会出现一种极端情况: CAS 操作 之后的瞬间 copyChecker 被复制了

noCopy 对象

noCopy 对象可以添加到具体的结构体中,实现 “首次使用之后,无法被复制” 的功能 (由编译器实现)。

noCopy.Lock 方法是一个空操作,由 go vet 工具链中的 -copylocks checker 参数指令使用。

type noCopy struct{}

func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

小结

sync.Cond 不是一个常用的同步机制,在条件变量长时间无法满足时,sync.Cond 能够让出处理器的使用权,和单纯使用 for {} 进行无限等待相比, 可以提高 CPU 的利用率。但是使用时我们也需要注意以下问题:

  • Wait 方法在调用之前一定要完成加锁操作,否则程序会 panic (因为方法内部会释放互斥锁)
  • Signal 方法会唤醒链表 (队列) 最前面、等待最久的 goroutine (通过等待索引字段保证顺序)
  • Broadcast 方法会按照链表的顺序 (并不是先进先出,因为可能存在乱序问题) 唤醒所有等待的 goroutine

Reference

  1. Go sync.Cond
  2. Go 设计与实现
  3. Golang sync.Cond 条件变量源码分析

转载申请

本作品采用 知识共享署名 4.0 国际许可协议 进行许可,转载时请注明原文链接,图片在使用时请保留全部内容,商业转载请联系作者获得授权。