蛮荆

singleflight Code Reading

2023-04-28

概述

singleflight 提供了一套函数重复调用时的抑制机制,经常用来限制并发访问下的重复请求。

例如一个比较常见的场景是使用 singleflight 来限制同一个缓存 key 的重复请求,避免发生 缓存击穿 时,避免请求全部落到数据库,减少性能影响和宕机风险。

singleflight 示意图

示例

参考 扩展阅读 列表的文章。

内部实现

我们来探究一下 golang.org/x/sync/singleflight 的内部实现,笔者的 Go 版本为 go1.19 linux/amd64。 注意:这个包属于扩展包,标准库的包是 internal/singleflight/singleflight.go,因为笔者项目中用到的是扩展包,所以这里以扩展包实现为准, 感兴趣的读者可以研究一下标准库中的包,两者大同小异。

UML

singleflight UML

errGoexit

errGoexit 错误表明用户函数执行过程中,调用了 runtime.Goexit(), runtime.Goexit() 会直接终止程序,并在终止前执行所有 defer.

var errGoexit = errors.New("runtime.Goexit was called")

panicError

panicError 对象实现了标准库内置的错误接口,表示在执行给定函数期间,调用 stack trace 从抛出的 panic 中捕获 (recover) 到了具体的值。

type panicError struct {
	value interface{}
	stack []byte
}
 
func (p *panicError) Error() string {
	return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
}

newPanicError 方法

func newPanicError(v interface{}) error {
	stack := debug.Stack()

	// stack trace 返回的结果中,第一行的格式为 "goroutine N [status]:"
	// 但是发生 panic 时,对应的 goroutine 可能已经不存在了,并且状态已经发生改变 (这时需要删除第一行,避免语义误导性)
	if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
		stack = stack[line+1:]
	}
	return &panicError{value: v, stack: stack}
}

call 对象

call 对象表示正在执行中或已经执行完的 Do 方法的调用信息,这是一个抽象的数据集合,字段涵盖了调用周期内需要用到的数据。

type call struct {
	wg sync.WaitGroup

	// 这俩字段在调用 WaitGroup 之前写入一次
	// 并且只能在 WaitGroup 完成后读取
	val interface{}
	err error

	// forgotten 表明在 call 执行过程中,调用方 (并发的 goroutine) 是否调用了 Forget 方法
	// 简单来说就是,同一个 key 在获取过程中并发调用了 Forget 方法
	forgotten bool

    // 返回值被其他 goroutine 复用的次数
	dups  int
	// 等待复用结果的 goroutine 列表
	//     goroutine 通过 Result channel 等待获取数据 
	chans []chan<- Result
}

Group 对象

Group 表示一种类型的工作并形成一个命名空间,在该命名空间中可以抑制重复执行的工作单元,简单来说,就是一个逻辑分组,通常根据业务场景来划分,比如 CMS 系统中的首页、专栏、内容页都可以作为单独的分组。

type Group struct {
	// 并发调用时保护 m 字段的互斥锁
	mu sync.Mutex  
	// key 和回调方法的关系映射
	//     采用懒加载初始化
	m  map[string]*call 
}

Result 对象

Result 对象表示 Do 方法返回的结果,可以在 channel 中传递 (channel 的另一端一般是一个等待获取数据的 goroutine)。

type Result struct {
	// 返回值支支持各种数据类型
	Val    interface{}
	// 返回错误值
	Err    error
	// 返回值是否被其他 goroutine 复用
	Shared bool
}

Do 方法

Do 方法执行给定的回调函数并返回结果,内部确保一个给定的 key 在同一时间 (多个 goroutine 调用时) 只执行一次,如果出现重复调用,则重复调用方等待原始(第一个)调用完成后,接收并复用相同的结果。 第二个返回值 shared 表示是否将返回值 v 赋值给了多个调用者 (返回值是否被复用)。

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
	// 操作加锁
	g.mu.Lock()
	if g.m == nil {
        // 懒加载
		g.m = make(map[string]*call)
	}
	
	if c, ok := g.m[key]; ok {  // 检测 key 是否存在
        // 除了第 1 个抢到锁的 goroutine,其他的 goroutine 都会执行到这里
		
		c.dups++        // 复用数量+1
		g.mu.Unlock()   // 释放锁
		c.wg.Wait()     // 等待 goroutine 执行完(就是第 1 个抢到锁的 goroutine)
		
		if e, ok := c.err.(*panicError); ok {   // 回调函数抛出了 panic error
			panic(e)
		} else if c.err == errGoexit {  // 回调函数调用了 runtime.Goexit
			runtime.Goexit()
		}
		return c.val, c.err, true
	}
	
	c := new(call)  // 如果没有对应的 key,创建一个新的结果集
	c.wg.Add(1)     // 只有第 1 个抢到锁的 goroutine 可以调用 Add 方法,其他的 goroutine 进入 Wait 等待
	g.m[key] = c    // 创建一个新的 key
	g.mu.Unlock()   // 创建新 key 后,就可以解锁了,其他 goroutine 获得锁之后会进入上面的 if 流程

	g.doCall(c, key, fn)    // 调用 doCall() 执行 fn 函数
	return c.val, c.err, c.dups > 0
}

DoChan 方法

DoChan 方法和 Do 方法功能一样 (同步和异步的区别),但是返回一个只读 channel, 当 channel 准备好时,就开始接收数据,返回的 channel 不能关闭。 此外,调用方需要根据返回值中的 Err 字段来处理可能发生的错误。

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
	// 初始化返回值 channel
	ch := make(chan Result, 1)
	// 操作加锁
	g.mu.Lock()
	if g.m == nil {
        // 懒加载
		g.m = make(map[string]*call)
	}
	
	if c, ok := g.m[key]; ok {  // 检测 key 是否存在
        // 除了第 1 个抢到锁的 goroutine,其他的 goroutine 都会执行到这里

        c.dups++        // 复用数量+1
        c.chans = append(c.chans, ch)   // 追加到等待复用结果的 goroutine 列表
        g.mu.Unlock()   // 释放锁
		return ch
	}
	
	c := &call{chans: []chan<- Result{ch}}  // 如果没有对应的 key,创建一个新的列表
	c.wg.Add(1)     // 只有第 1 个抢到锁的 goroutine 可以调用 Add 方法,其他的 goroutine 进入 Wait 等待
	g.m[key] = c    // 创建一个新的 key
	g.mu.Unlock()   // 创建新 key 后,就可以解锁了,其他 goroutine 获得锁之后会进入上面的 if 流程

	go g.doCall(c, key, fn) // 调用 doCall() 执行 fn 函数

	return ch
}

doCall 方法

doCall 方法负责调用 key 对应的回调方法。

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
	normalReturn := false
	recovered := false

	// 使用两个 defer 区分 panic 和 runtime.Goexit 
	//    避免了回调函数导致的死锁 (也就是下面的 c.wg.Done() 得不到执行)
	// 详情见: https://golang.org/cl/134395
	defer func() {
		if !normalReturn && !recovered {
			// 说明回调函数内部执行了 runtime.Goexit
			c.err = errGoexit
		}

		c.wg.Done() // 如果这里得不到执行,那么调用方的 c.wg.Wait() 就会永久阻塞
		
		g.mu.Lock()
		defer g.mu.Unlock()
		
		if !c.forgotten { // 如果已经删除过该 key,就不需要重复删除了
			delete(g.m, key)
		}

		// 这里的 panic 都是为了第一个抢到的锁的 goroutine,因为后续的 goroutine 不会执行到 doCall() 方法
		if e, ok := c.err.(*panicError); ok {
			// 为了防止等待的 channels 被永久阻塞,需要确保这种 panic 无法恢复
			if len(c.chans) > 0 {
				go panic(e)
				select {} // 保留这个 goroutine 的调用堆栈,这样它就会出现在 crash dump(这种小技巧值得学习)
			} else {
				panic(e)
			}
		} else if c.err == errGoexit {
			// goexit 正在处理,不需要重复调用
		} else {
			// 正常返回, 依次向等待的 goroutine 发送数据
			for _, ch := range c.chans {    // c.chans 是切片
				ch <- Result{c.val, c.err, c.dups > 0}
			}
		}
	}()

	// 使用一个匿名函数来执行回调函数
	func() {
		defer func() {
			// 如果发生 panic,就创建一个 panic error,由调用方处理
			if !normalReturn {
				// 理想情况下,我们应该根据 stack trace 返回的结果, 确定这是一个 panic 还是一个 runtime.Goexit
				// 能够区分两者的唯一方法是查看 recover 是否阻止了 goroutine 终止
				//     如果 recover 捕获到了错误,说明是 panic
				//     如果 recover 没有捕获到错误,说明是 runtime.Goexit
				// 但是当我们知道这一点时,与 panic 相关的 stack trace 信息已经被丢弃了
				if r := recover(); r != nil {
					c.err = newPanicError(r)
				}
			}
		}()

		c.val, c.err = fn()
		
		// 如果代码执行到这里,说明调用回调函数没有发生 panic
		// 所以可以将变量 normalReturn 设置为 true
		normalReturn = true
	}()
	
	// 如果 normalReturn != true, 说明调用回调函数发生了 panic
	// 同时也说明了,发生的 panic 被捕获 (recover) 到了,而不是直接被 runtime.Goexit 终止程序
	//    如果直接被 runtime.Goexit 终止程序,代码就执行不到这里了,而是会直接去执行 defer
	if !normalReturn {
		recovered = true
	}
}

Forget 方法

Forget 方法用于删除指定的 key, 后续对该 key 调用 Do 方法将执行回调函数,不再复用之前的返回值结果。

func (g *Group) Forget(key string) {
	g.mu.Lock()
	if c, ok := g.m[key]; ok {
		c.forgotten = true
	}
	delete(g.m, key)
	g.mu.Unlock()
}

小结

从应用层面来说,singleflight 是将 多个请求的返回结果相同 + 计算过程次数限制 这一过程抽象出来,将所有细节隐藏在内部实现中,只提供 GETDELETE 两种类型 API, 简化了调用方法。

需要注意的有两点:一是 GROUP 组的划分,通常根据业务模块划分即可,类似缓存的 key 前缀,二是 key 的内存占用,key 没有过期机制, 对应的数据会一直占用内存,对于热门数据没有任何问题,但是对于非热门数据,会增加内存的占用,可以根据数据的大致有效期设计 延迟删除 方案。

从内部实现来说,singleflight 的代码非常简洁,尤其是 3 个 Do* 方法之间的紧密配合,可以让我们学习到很多 channel 使用技巧, 还有 doCall 方法中的 双重 defer 机制、判断 panic 和 runtime.Goexit,都是非常实用的 Go code style 代码。

扩展阅读

转载申请

本作品采用 知识共享署名 4.0 国际许可协议 进行许可,转载时请注明原文链接,图片在使用时请保留全部内容,商业转载请联系作者获得授权。