sync.Pool Code Reading
2023-06-15 Golang 并发编程 Go 源码分析 读代码
概述
sync.Pool 是 Go 语言标准库中的一个并发安全的对象池,可以用来缓存那些需要重复创建和销毁的对象,从而避免频繁地进行内存分配和回收,降低内存和 GC 压力。
需要注意的是: 任何存储在对象池中的元素可能会被随时删除,如果元素是一个资源类的引用,并且该资源仅在对象池中被引用 (没有其他地方引用了),那么当该元素被对象池删除时,其指向的资源同时也会被释放。
内部实现
sync.Pool 的使用方法相信读者已经熟练掌握,本文主要来探究一下底层源代码实现,文件路径为 $GOROOT/src/sync/pool.go
,笔者的 Go 版本为 go1.19 linux/amd64
。
💡 sync.Pool 的源代码中细节非常之多,为了阅读体验和效率,笔者几乎没有删减代码,而且也基本对每行代码都做了对应的注解和上下文联系,这是本文的特色,请读者留意。
数据结构
全局变量
var (
// 锁
allPoolsMu Mutex
// 全局的所有缓存池
allPools []*Pool
// victim cache 缓存池
oldPools []*Pool
)
数据结构图
这里假设 runtime.GOMAXPROCS() = 4
, 处理器 P 的数量为 4 个,读者在阅读下面的源代码探究过程时,可以对照着结构图进行分析。
缓存池对象
sync.Pool 包的核心对象,所有的操作都是基于该对象进行的。
// Pool 一旦使用后,便不能再复制
// 在 Go 内存模型术语中,调用 Put(x) 方法在调用 Get 方法之前同步
// 在 Go 内存模型术语中,调用 New 方法在调用 Get 方法之前同步
type Pool struct {
// noCopy 可以添加到 struct 中,实现 "首次使用之后,无法被复制" 的功能,主要服务于 `go vet`
// 假设一个缓存池对象 A 被对象 B 拷贝了,接着 A 被清空了,B 里面的缓存对象指针指向的对象将会不可控
noCopy noCopy
// 指向固定长度的数组,数组长度为处理器 P 的个数,转换后其实就是 [P]poolLocal 数组
// 实际的底层数据结构是切片,不过下文中统一用数组描述,读者不必在意这个细节
// 访问时根据处理器 P 的 ID (作为索引) 去访问
// 优化点: 多个 goroutine 使用同一个缓存池时,可以减少竞争,提高性能
// 类似于分段锁中降低锁粒度的设计理念
local unsafe.Pointer
// local 数组的长度
localSize uintptr
// 上一轮的 local, 内容语义和 local 一致
// 新一轮 GC 到来时,更新为当前 local 的值
victim unsafe.Pointer
// 上一轮的 localSize, 内容语义和 localSize 一致
// 新一轮 GC 到来时,更新为当前 localSize 的值
victimSize uintptr
// 创建对象的函数
New func() any
}
这里引用下维基百科关于 victim cache
的描述:
所谓受害者缓存(Victim Cache),是 CPU 硬件处理缓存的一种技术,是一个与直接匹配或低相联缓存并用的、容量很小的全相联缓存。 当一个数据块被逐出缓存时,并不直接丢弃,而是暂先进入受害者缓存。如果受害者缓存已满,就替换掉其中一项。当进行缓存标签匹配时, 在与索引指向标签匹配的同时,并行查看受害者缓存,如果在受害者缓存发现匹配,就将其此数据块与缓存中的不匹配数据块做交换,同时返回给处理器。
简单通俗地来说,就是已经失效的缓存先不清除,保留一段时间,如果保留时间内该缓存又被用到了,就重新启用,如果保留时间内一直没有被用到,就清除。
poolLocal 对象
每个处理器 P
都有一个 poolLocal
对象,Get
和 Put
方法会优先操作当前处理器的对象池。
type poolLocal struct {
poolLocalInternal
// CPU Cache 是距离 CPU 最近的 Cache,如果能充分利用,会极大提升程序性能
// 防止伪共享,凑齐 128 bytes 的整数倍 (这个小技巧非常值得学习)
// 什么是CPU 伪共享?
// CPU CacheLine 通常是以 64 byte 或 128 byte 为单位
// 在缓存池场景中,各个 P 的 poolLocal 以数组形式存储在一起
// 假设 CPU CacheLine 为 128 byte,而 poolLocal 不足 128 byte 时
// CacheLine 将会带上其他 P 的 poolLocal 的内存数据,以凑齐一个整块的 CacheLine
// 如果这时两个相邻的 P 同时在两个不同的 CPU 核上运行,将会同时去覆盖刷新 CacheLine
// 造成 CacheLine 的反复失效,那 CPU Cache 就失去了作用
// 例如 两个相邻但是不同的处理器 P (PA, PB) 被分配在同一个 CacheLine
// 此时 PA 要修改, PB 也要修改 (两者去竞争 同一个 CacheLine)
// 当 PA 被修改时,缓存系统强制 PB 所在 CPU 核的 CacheLine 失效
// 当 PB 被修改时,缓存系统强制 PA 所在 CPU 核的 CacheLine 失效
// 最终导致 PA 和 PB 所在 CPU 核的 CacheLine 失效,降低性能
// 如何避免 CPU 伪共享?
// 将需要独立访问的变量放在不同的 CacheLine 中
// 保证和 CacheLine 内存对齐
// Linux 查看 CacheLine 单位大小
// $ cat /sys/devices/system/cpu/cpu1/cache/index0/coherency_line_size
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
poolLocalInternal 对象
poolLocalInternal 对象表示每个处理器 P 的本地对象池。
type poolLocalInternal struct {
// 私有变量,只能由当前处理器操作
private any
// 共享变量,当前处理器可以执行 pushHead/popHead 操作,其他处理器只能执行 popTail 操作
shared poolChain
}
Go 1.13 版本开始,shared
字段的数据结构修改为 单个生产者/多个消费者
双端无锁环形队列,当前处理器 P 可以执行 pushHead/popHead
操作,
其他处理器 P 只能执行 popTail
操作。
单个生产者:当前处理器 P
上面运行的 goroutine
执行 Put
方法时,将对象放入队列,并且只能放在队列头部,但是其他处理器 P 上运行的 goroutine
不能放入。
由于每个处理器 P 在任意时刻只有一个 goroutine
运行,所以无需加锁。
多个消费者分两种角色:
- 在当前处理器 P 上运行的
goroutine
,执行Get
方法时,从队列头部取对象,由于每个处理器 P 在任意时刻只有一个goroutine
运行,所以无需加锁 - 在其他处理器 P 上运行的
goroutine
,执行Get
方法时,如果该处理器 P 没有缓存对象,就到别的处理器 P 的队列上窃取。 此时窃取者goroutine
只能从队列尾部取对象,因为同时可能有多个窃取者goroutine
窃取同一个处理器 P 的队列, 所以用CAS
来实现无锁队列功能
按照这种设计,poolDequeue.pushHead
和 poolDequeue.popTail
存在竞争 (可能同时有多个 goroutine
同时操作),
而 poolDequeue.pushHead
和 poolDequeue.popHead
不存在竞争 (只能有一个 goroutine
操作)。
- poolDequeue.pushHead: 将对象添加到队列头部
- poolDequeue.popHead : 从队列头部获取对象
- poolDequeue.popTail : 从队列尾部获取对象
poolChain 对象
poolChain 对象表示 poolDequeue 数据类型的双端环形队列链表,每个节点表示的队列长度是后驱节点队列长度的两倍, 如果当前所有的节点队列满了,就创建一个新的队列 (长度是当前头节点队列长度的 2 倍),然后挂载到头节点。
// 队列节点示意图
// --------------------------------------------------------------------------
// | 节点 1, size: 64 | 节点 2, size: 32 | 节点 3, size: 16 | 节点 4, size: 8 |
// --------------------------------------------------------------------------
type poolChain struct {
// head 表示头节点队列,只能由生产者操作,不存在竞争
head *poolChainElt
// tail 表示尾节点队列,由多个消费者操作,存在竞争
tail *poolChainElt
}
type poolChainElt struct {
poolDequeue
// next 由生产者原子性写入,由消费者原子性读取
// 值只会从 nil 转换为非 nil
// prev 由消费者原子性写入,由生产者原子性读取
// 值只会从非 nil 转换为 nil
next, prev *poolChainElt
}
为什么 poolChain 的数据结构是链表 + ring buffer (环形队列) 呢?
因为使用 ring buffer
数据结构的优点非常适用于 sync.Pool
对象池的使用场景。
- 预先分配好内存并且分配的元素内存可复用,避免了数据迁移
- 作为底层数据结构的数组是连续内存结构,非常利于
CPU Cache
, 在访问poolDequeue
队列中的某个元素时,其附近的元素可能被加载到同一个Cache Line
中,访问速度更快 - 更高效的出队和入队操作,因为环形队列是首尾相连的,避免了普通队列中队首和队尾频繁变动的问题
poolDequeue 对象
poolDequeue 对象是一个由 单个生产者/多个消费者
模式组成的固定大小的无锁队列。单个生产者可以从队列头部执行 push
和 pop
操作,
多个消费者只能从队列尾部执行 pop
操作。
type poolDequeue struct {
// 经典的字段合并使用方法
// 高 32 位 是 head, 指向下一个存放对象的索引
// 低 32 位 是 tail, 指向队列中最早 (下一个读取) 的对象索引
// 索引区间 tail <= i < head, 表示消费者可以操作的索引区域
// 消费者可以在该区间不断获取对象,直至获取到的对象为 nil
headTail uint64
// vals 表示队列元素容器,大小必须为 2 的 N 次幂
// 容器会在初始化时指定容量,实现数据元素内存预初始化
vals []eface
}
为什么要将 head
和 tail
合并到一个变量里面?
因为这样可以进行原子操作,完成两个字段的 lock free
(无锁编程) 优化。
例如:当队列中仅剩一个对象时,如果多个处理器 P 同时访问队列,如果没有进行并发限制,两个处理器 P 都可能获取到对象,这显然是不符合预期的。
那么在不引入互斥锁的前提下,sync.Pool
是如何实现临界区数据控制的呢?
sync.Pool
利用了 atomic
包的提供的 CAS
操作,并发情况下两个处理器 P 都可能获取到对象,但是最终只会有一个处理器 P CAS
操作成功,
另外一个处理器操作失败,在更新 head
和 tail
两个字段的时候,也是通过 CAS + 位运算
进行操作的。
小结
通过对源代码中的数据结构进行分析,我们可以看到内部隐藏了非常多的设计技巧和对应的基础理论知识,接下来开始阅读构建于数据结构之上的具体算法。
这里再放一张数据结构图,方便读者结合算法代码进行分析。
对象归还
我们首先来看下对象归还流程,也就是如何把一个对象放入缓存池的某个队列中,从 Pool.Put
方法开始追踪代码。
func (p *Pool) Put(x any) {
...
l, _ := p.pin()
if l.private == nil {
// 优先设置私有变量
l.private = x
} else {
// 其次设置共享变量
l.shared.pushHead(x)
}
...
}
func (c *poolChain) pushHead(val any) {
d := c.head
if d == nil {
// 初始化头节点
// 对象池元素数量从 8 个开始,必须为 2 的 N 次幂
const initSize = 8
d = new(poolChainElt)
d.vals = make([]eface, initSize)
c.head = d
storePoolChainElt(&c.tail, d)
}
if d.pushHead(val) {
// 如果对象成功加入队列,直接返回
return
}
// 如果当前队列已满,分配一个新的队列 (长度是当前队列的 2 倍)
newSize := len(d.vals) * 2
if newSize >= dequeueLimit {
// 队列长度最大为 1073741824
newSize = dequeueLimit
}
// 初始化新的队列
d2 := &poolChainElt{prev: d}
d2.vals = make([]eface, newSize)
// 将头节点指向到新的队列
c.head = d2
// 将新的队列添加到链表中
storePoolChainElt(&d.next, d2)
// 将对象添加到新的队列
d2.pushHead(val)
}
poolDequeue.pushHead 方法将一个对象加入到队列中,如果队列已满,返回 false,该方法必须由 单个生产者
操作。
func (d *poolDequeue) pushHead(val any) bool {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
// 说明队列已满 (tail 索引 + 当前队列元素个数) == head 索引
return false
}
slot := &d.vals[head&uint32(len(d.vals)-1)]
// 检测索引位置的对象是否和 popTail 方法操作冲突
typ := atomic.LoadPointer(&slot.typ)
if typ != nil {
// 有其他 goroutine 正在调用 popTail 方法操作当前位置的对象
// 所以队列实际上已满
return false
}
// 执行到这里,typ == nil
// 说明即使存在 popTail 方法操作当前位置的对象,操作也已经结束了,冲突解除
if val == nil {
val = dequeueNil(nil)
}
// 使用归还的对象填充索引位置
*(*any)(unsafe.Pointer(slot)) = val
// head 索引 + 1
atomic.AddUint64(&d.headTail, 1<<dequeueBits)
return true
}
获取对象
接下来探究从缓存池中获取对象的流程,从 Pool.Get
方法开始追踪代码。
func (p *Pool) Get() any {
l, pid := p.pin()
// 首先尝试从当前处理器的私有变量获取对象
x := l.private
// 从私有变量获取后,及时将私有变量置为 nil
l.private = nil
if x == nil {
// 私有变量没有获取到对象,尝试从共享变量获取
x, _ = l.shared.popHead()
if x == nil {
// 当前处理器 P 没有对象,尝试从其他处理器窃取
x = p.getSlow(pid)
}
}
// 如果从所有处理器的缓存池都没有获取到对象,并且 New 方法不为 nil
// 那就调用 New 方法创建一个对象返回
if x == nil && p.New != nil {
x = p.New()
}
return x
}
func (c *poolChain) popHead() (any, bool) {
d := c.head
for d != nil {
// 从队列头部开始获取对象
if val, ok := d.popHead(); ok {
return val, ok
}
// 将当前队列前驱节点作为接下来要遍历的队列
d = loadPoolChainElt(&d.prev)
}
return nil, false
}
poolDequeue.popHead 方法从队列头部删除一个对象并返回,如果队列为空,返回 false, 该方法必须由 单个生产者
操作。
func (d *poolDequeue) popHead() (any, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
// 高 32 bit 是 head
// 低 32 bit 是 tail
head, tail := d.unpack(ptrs)
if tail == head {
// 头尾相等,说明队列为空
return nil, false
}
// head 索引指向下一个新对象的索引位置,所以使用前先减 1
head--
ptrs2 := d.pack(head, tail)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// CAS 操作成功,移除头部的对象,此时 head 指向 head - 1
// vals 切片的长度是 2 的 N 次幂,因此 len(d.vals)-1 之后,低的 N 位全是 1
// 和 head 进行与运算之后,可以获取到对象的索引下标
// 例如: 切片长度 = 32, len(d.vals)-1 = 31
// head = 5, 索引下标 = 5 & 31 = 5
// head = 25, 索引下标 = 25 & 31 = 25
slot = &d.vals[head&uint32(len(d.vals)-1)]
break
}
}
val := *(*any)(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
// 获取到的对象是 nil
val = nil
}
// 重置索引位置的元素
*slot = eface{}
// 返回获取到的对象
return val, true
}
Pool.getSlow 方法用于当前处理器 P 没有对象时,尝试从其他处理器 P 窃取对象。
func (p *Pool) getSlow(pid int) any {
// 原子加载 localSize 字段
size := runtime_LoadAcquintptr(&p.localSize)
locals := p.local
// 尝试从其他处理器获取对象
for i := 0; i < int(size); i++ {
// 注意这里定位处理器 P 的索引计算方式
// pid+i+1 是为了忽略当前处理器 P
l := indexLocal(locals, (pid+i+1)%int(size))
// 从队列尾部获取,减少并发冲突
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 如果从其他处理器没有获取到对象,尝试从 victim 缓存中获取 (也就是上一轮对象池中的对象)
// 这样做可以尽可能地复用对象
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
// 当前处理器不存在于 victim 中,可能原因如下:
// 1. victim 已经被标记为空
// 2. 当前处理器比 victim 的长度要大,属于 "后来创建的"
// 此时直接返回即可,否则会发生处理器 pid 索引越界错误
return nil
}
// 下面开始尝试从 victim 中获取对象
// 尝试从尾部处理器的私有变量获取对象
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
// 尝试从其他处理器获取对象
for i := 0; i < int(size); i++ {
// 注意这里定位处理器 P 的索引计算方式和刚才的不同
// 这里不需要忽略任何处理器
l := indexLocal(locals, (pid+i)%int(size))
// 从队列尾部获取,减少并发冲突
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 将 victim 缓存标记为空,后续请求直接返回,避免多余的查询
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
func (c *poolChain) popTail() (any, bool) {
d := loadPoolChainElt(&c.tail)
if d == nil {
// 如果链表尾部节点为 nil, 直接返回
return nil, false
}
for {
// 在队列尾部节点出队之前,提前加载 d.next 指针很重要 (删除链表节点的边界条件)
// 因为 d 节点的尾元素出队之后,d 节点可能会变为 nil, 这样永远无法找到 d.next 节点了
d2 := loadPoolChainElt(&d.next)
if val, ok := d.popTail(); ok {
return val, ok
}
if d2 == nil {
// 如果 next 节点都变成 nil 了,说明队列已经空了
return nil, false
}
// 代码执行到这里,说明尾部节点 (d) 为 nil
// 这时就可以将其删除了,防止下次调用 popTail 时发生错误 (误以为队列已空)
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
// 删除 d2 的前驱节点,也就是 d (因为此时前驱节点已经没有数据了)
storePoolChainElt(&d2.prev, nil)
}
d = d2
}
}
poolDequeue.popTail 方法从队列尾部删除一个对象并返回,如果队列为空,返回 false, 该方法必须由 多个生产者
操作。
func (d *poolDequeue) popTail() (any, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head {
// 头尾相等,说明队列为空
return nil, false
}
ptrs2 := d.pack(head, tail+1)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// CAS 操作成功,移除尾部的对象,此时 tail 指向 tail + 1
slot = &d.vals[tail&uint32(len(d.vals)-1)]
break
}
}
val := *(*any)(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
// 通过重置元素为 nil 的方式通知 pushHead 方法 (因为这两个方法存在并发操作同一位置元素的可能)
// 当前位置的元素已经操作完成 (pushHead 方法操作当前元素时会检测否和 popTail 方法冲突)
// 先重置 val, 再重置 typ
slot.val = nil
atomic.StorePointer(&slot.typ, nil)
return val, true
}
流程图
注意当前 P 和 其他 P 的区别:
辅助方法
pin
pin 方法绑定当前 goroutine
到处理器 P 并禁止抢占,返回一个 poolLocal
对象指针和处理器 P 的 ID。
func (p *Pool) pin() (*poolLocal, int) {
pid := runtime_procPin()
// 原子加载 localSize 字段
s := runtime_LoadAcquintptr(&p.localSize)
l := p.local
if uintptr(pid) < s {
// 如果 pid 小于 local 数组的长度
// 说明对应的 poolLocal 对象已经创建,直接返回即可
return indexLocal(l, pid), pid
}
// 代码执行到这里,一般是因为两种原因:
// 1. 缓存池还未创建
// 2. 处理器 P 的数量被动态调整了
return p.pinSlow()
}
pinSlow
pinSlow 方法创建一个新的 poolLocal
对象并返回。
func (p *Pool) pinSlow() (*poolLocal, int) {
runtime_procUnpin()
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
pid := runtime_procPin()
// 加锁完成,就不需要原子性加载了
s := p.localSize
l := p.local
if uintptr(pid) < s {
// 双重检测
// local 数组已经发生变化 (加锁期间被其他线程修改)
// 直接返回即可
return indexLocal(l, pid), pid
}
if p.local == nil {
allPools = append(allPools, p)
}
// 使用处理器 P 的数量作为数组长度
size := runtime.GOMAXPROCS(0)
// 初始化新的 local 数组
local := make([]poolLocal, size)
// 原子更新 local 字段
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0]))
// 原子更新 localSize 字段
runtime_StoreReluintptr(&p.localSize, uintptr(size))
return &local[pid], pid
}
indexLocal
indexLocal 方法根据索引参数,返回 local
数组中对应的 poolLocal
对象。
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
return (*poolLocal)(lp)
}
缓存池 GC 过程
sync.Pool
包文件中有一个 init 函数,内部注册了 GC 执行方法。
func init() {
runtime_registerPoolCleanup(poolCleanup)
}
poolCleanup 方法在缓存池的清理过程中,并不会直接释放池对象,而是会将其放入 victim
中,等到下一轮清理时再释放。
这样可以防止缓存池被直接释放后,变为冷启动时面对突然暴涨的对象请求造成的性能抖动,通过将缓存池放入 victim 中,可以起到避免 GC 毛刺、平滑过渡的作用。
func poolCleanup() {
// 函数会在 GC 过程中的 STW 阶段被调用
// 清理 victim 缓存对象池
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
// 将当前所有缓存池对象移动到 victim 缓存池对象
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
// 将全局缓存池移动到全局 victim 缓存池
// 将全局 victim 重置为 nil
oldPools, allPools = allPools, nil
}
小结
通过学习 sync.Pool
的源代码,我们可以深入理解和学习到的高性能编程设计理念和技巧:
- noCopy 机制
- CPU CacheLine 伪共享、内存对齐
- poolDequeue.headTail 字段合并设计,压缩、解压、CAS 操作、索引定位等
- 每个处理器 P 持有一个对象池,最大限度降低并发冲突
- 私有变量/共享变量
- 单生产者/多消费者模式实现 “读写分离”
- 双端队列的出队顺序 (当前处理器 P 从队列头部操作,其他处理器 P 从队列尾部操作),最大限度降低并发冲突
- 无锁编程
- 对象窃取机制
- 垃圾回收时的新旧对象交替使用,类似分代垃圾回收的设计理念