ants Code Reading
2023-06-30 Golang 并发编程 Go 源码分析 读代码
概述
goroutine 使用简单,但并不意味着没有成本。
ants 是一个高性能且低损耗的 goroutine
池,组件内部会创建一个固定容量的 goroutine
池,并且管理和回收池中的 goroutine
,
主要应用场景是允许开发者限制并发程序中的 goroutine
数量,更多的特性介绍请参考 Github 主页。
示例代码
我们首先通过两个示例程序来直观地感受一下使用 goroutine 池前后的差异。
未使用 goroutine 池
package main
import (
"log"
"runtime"
"sync"
"sync/atomic"
)
var (
sum int32
)
func main() {
var wg sync.WaitGroup
var i int32
// 启动 100 个 goroutine
for i = 1; i <= 100; i++ {
wg.Add(1)
go func(n int32) {
defer wg.Done()
atomic.AddInt32(&sum, n)
}(i)
}
log.Printf("goroutines numbers = %d\n", runtime.NumGoroutine())
wg.Wait()
log.Printf("sum = %d, want = %d", sum, 5050)
}
运行代码输出结果如下:
goroutines numbers = 101
sum = 5050, want = 5050
程序运行过程中一共启动了 101 个 goroutine
, 其中 1 个是 main goroutine
, 其他的 100 个是通过 go 关键字启动的 goroutine
。
使用 goroutine 池
package main
import (
"log"
"runtime"
"sync"
"sync/atomic"
"github.com/panjf2000/ants/v2"
)
var (
sum int32
)
func main() {
defer ants.Release()
var wg sync.WaitGroup
p, err := ants.NewPoolWithFunc(10, func(i interface{}) {
defer wg.Done()
atomic.AddInt32(&sum, i.(int32))
})
if err != nil {
log.Fatal(err)
}
defer p.Release()
for i := 1; i <= 100; i++ {
wg.Add(1)
_ = p.Invoke(int32(i))
}
log.Printf("goroutines numbers = %d\n", runtime.NumGoroutine())
wg.Wait()
log.Printf("sum = %d, want = %d", sum, 5050)
}
运行代码输出结果如下:
goroutines numbers = 15
sum = 5050, want = 5050
程序运行过程中一共启动了 15 个 goroutine
, 其中 1 个是 main goroutine
, 还有 10 个是通过 ants 组件限制数量的 goroutine
,
剩余的 4 个是 ants 组件用于管理调度的内部 goroutine
。
源码解析
通过上面两个示例程序的结果比较,ants 确实能够有效降低和控制 goroutine
的数量,这个功能可以有效解决海量 goroutine
场景下的内存暴涨问题。
性能方面,官方给出的基准测试结果是: ants 的吞吐性能相较于原生 goroutine 可以保持在 2~6 倍的性能压制,而内存消耗则可以达到 10~20 倍的节省优势,
感兴趣的读者可以查看 这里的基准测试。
数据结构
UML
读者在阅读下面的源代码分析时,可以对照着结构图进行校验。
通用池和专用池
ants 中内置了两种不同模式的 goroutine pool
:
1. Pool
ants.Pool
表示通用的 goroutine pool
, 也就是每个 goroutine
中可以是不同的任务执行函数。
2. PoolWithFunc
ants.PoolWithFunc
表示专用的 goroutine pool
, 也就是每个 goroutine
中是相同的任务执行函数,但是参数可以不同 (上面的示例代码中使用就是这种模式)。
本文以 ants.Pool
模式下的源代码来进行分析。
池对象
Pool 表示组件的核心对象,我们挑选几个关键属性字段来进行说明。
type Pool struct {
// 池的容量
// 也就是负责执行 worker 的数量上限
capacity int32
// 当前正在执行的 worker 数量
// 也就是 goroutine 的数量 (每个 worker 启动一个 goroutine)
running int32
// 标准库的锁接口
// 组件内部使用了自旋锁作为具体的实现
lock sync.Locker
// 存储闲置的 worker
workers workerQueue
// 池的状态标识 (标识池是否已关闭)
state int32
// 获取闲置 worker 时使用的信号量
cond *sync.Cond
// 池的配置选项对象
options *Options
}
配置项
Options
对象表示池对象的属性配置,可以在创建池对象时通过 FUNCTIONAL OPTIONS
模式注入自定义的属性值。
type Options struct {
// 任务的过期时间
// 空闲 worker 的最后运行时间与当前时间之差
// 如果大于这个值,表示该 worker 已过期
// 定时清理任务会清理过期的 worker
ExpiryDuration time.Duration
// 初始化池的时候是否预分配 worker 队列
PreAlloc bool
// 调用 pool.Submit 方法阻塞时,最多可以阻塞的 worker 数量
// 默认值为 0,表示不限制
MaxBlockingTasks int
// 为 true 时,pool.Submit 方法永远不会阻塞,也就是 MaxBlockingTasks 属性不起作用
// 如果 pool.Submit 方法因阻塞而无法完成时,返回 ErrPoolOverload 错误
Nonblocking bool
// 任务 goroutine 触发 panic 时的错误捕获及处理函数
PanicHandler func(interface{})
// 自定义日志对象
// 默认使用标准库中的 log 对象
Logger Logger
// 任务 goroutine 是否常驻内存
DisablePurge bool
}
func WithExpiryDuration(options Options) Option {}
func WithPreAlloc(options Options) Option {}
...
worker 接口
worker
接口是任务执行者的抽象表示,简单地理解就是一个专门干活的 goroutine
, 该接口提供了 5 个操作语义:
方法名称 | 语义 |
---|---|
run | 启动一个 goroutine 执行任务 |
finish | 任务已经完成,退出 goroutine |
lastUsedTime | 最后的任务执行时间 |
inputFunc | 提交一个任务到 goroutine |
inputParam | 任务参数 |
ants.Pool
池对应的 worker 接口实现对象是 goWorker
。
type goWorker struct {
// 对应的池对象
pool *Pool
// 需要执行的任务函数
task chan func()
// 最后的任务执行时间
lastUsed time.Time
}
下面是 goWorker
对象对于接口的具体实现:
func (w *goWorker) run() {
// 通知 goroutine 池: 运行 goroutine 数量 + 1
w.pool.addRunning(1)
// 启动一个 goroutine
go func() {
defer func() {
// 通知 goroutine 池: 运行 goroutine 数量 - 1
w.pool.addRunning(-1)
// 将当前 goWorker 对象放入对象池
// 可以提升后续获取 goWorker 对象操作的性能
w.pool.workerCache.Put(w)
if p := recover(); p != nil {
// panic 处理
if ph := w.pool.options.PanicHandler; ph != nil {
ph(p)
} else {
w.pool.options.Logger.Printf("worker exits from panic: %v\n%s\n", p, debug.Stack())
}
}
// 发送闲置 goWorker 对象信号
w.pool.cond.Signal()
}()
for f := range w.task {
if f == nil {
// 如果执行函数为 nil (这里的 nil 充当了 goroutine 的退出信号)
// 直接退出即可
return
}
// 执行具体的函数
f()
// 将当前 goWorker 对象放入 worker 队列
// 最大程度地复用当前 goWorker 对象,同时避免 goroutine 的上下文切换带来的开销
if ok := w.pool.revertWorker(w); !ok {
// 如果放入队列失败,可能是由于:
// 1. Pool 池对象已关闭
// 2. Pool 池对象当前运行的 goroutine 数量已到达限制
// 此时直接退出当前 goroutine 即可
return
}
}
}()
}
// 向任务 channel 发送一个 nil, 充当了 goroutine 的退出信号
func (w *goWorker) finish() {
w.task <- nil
}
// 返回 worker 最后的任务执行时间
func (w *goWorker) lastUsedTime() time.Time {
return w.lastUsed
}
// 向 worker 提交一个新的任务
func (w *goWorker) inputFunc(fn func()) {
w.task <- fn
}
// 因为 Pool.Submit 提交任务的原型为 func (p *Pool) Submit(task func()) error {}
// 所以 goWorker 对象中的任务函数没有参数值
func (w *goWorker) inputParam(interface{}) {
panic("unreachable")
}
worker 队列接口
workerQueue
接口表示存放 worker
的队列容器接口,和一般的容器类接口相似, 该接口提供了 6 个操作语义:
方法名称 | 语义 |
---|---|
len | 队列中 worker 数量 |
isEmpty | 队列是否为空 |
insert | 将一个 worker 放入队列 |
detach | 从队列中取出一个 worker |
refresh | 检测过期的 worker 并返回 |
reset | 重置队列为空 |
组件内部给出了两种队列实现方案,分别是基于 Stack (栈)
和 Ring Queue (环形队列)
,两种方案的内部实现和常规数据结构算法实现基本一致,这里不再赘述。
方法
池的初始化
func NewPool(size int, options ...Option) (*Pool, error) {
// Pool 的配置项加载和默认值处理
opts := loadOptions(options...)
...
// 初始化 Pool 对象
p := &Pool{
capacity: int32(size),
lock: syncx.NewSpinLock(),
options: opts,
}
// 注册 worker 对象池
p.workerCache.New = func() interface{} {
return &goWorker{
pool: p,
task: make(chan func(), workerChanCap),
}
}
// 根据配置决定是否预分配 worker 队列
if p.options.PreAlloc {
// 如果使用预分配
// 使用 Ring Queue 作为队列的数据结构
p.workers = newWorkerArray(queueTypeLoopQueue, size)
} else {
// 如果不使用预分配
// 使用 Stack 作为队列的数据结构
p.workers = newWorkerArray(queueTypeStack, 0)
}
...
// 启动一个 goroutine, 清除过期 worker
p.goPurge()
return p, nil
}
池的初始化操作完成后,接下来就是等待调用方提交具体的任务函数。
提交任务到 Pool
调用方提交任务函数到池中的时候,会尝试获取一个可用的 worker 来执行任务函数。
func (p *Pool) Submit(task func()) error {
if p.IsClosed() {
// 如果 Pool 已关闭, 直接返回
return ErrPoolClosed
}
if w := p.retrieveWorker(); w != nil {
// 获取到了可用的 worker
// 直接将参数任务提交给 worker 即可
w.inputFunc(task)
return nil
}
// 没有获取到可用的 worker
return ErrPoolOverload
}
获取可用的 worker
retrieveWorker
方法会根据配置项,采用不同的策略获取可用的 worker 。
func (p *Pool) retrieveWorker() (w worker) {
spawnWorker := func() {
// 从对象池中获取一个 worker
w = p.workerCache.Get().(*goWorker)
// 获取到 worker 之后直接运行
w.run()
}
p.lock.Lock()
// 首先尝试从队列获取 worker
w = p.workers.detach()
if w != nil {
// 从队列中获取到了 worker
p.lock.Unlock()
} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
// 如果队列为空并且满足以下两个条件任意一个:
// 1. Pool 没有容量限制
// 2. 当前 worker 数量没有达到 Pool 的容量限制
// 直接创建一个新的 worker
p.lock.Unlock()
spawnWorker()
} else {
// 如果上述条件都不满足
// 只能等待其他执行完的 worker 归还到队列中,然后再获取
if p.options.Nonblocking {
// 如果 Pool 的任务提交模式是非阻塞的
// 直接返回
p.lock.Unlock()
return
}
retry:
if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks {
// 如果等待 worker 的 goroutine 数量超过限制
// 直接返回
p.lock.Unlock()
return
}
// 等待 worker 的 goroutine 数量 + 1
p.addWaiting(1)
// 等待可用的 worker 信号
p.cond.Wait()
// 等待 worker 的 goroutine 数量 - 1
p.addWaiting(-1)
if w = p.workers.detach(); w == nil {
// 如果接收到了信号,但是依然没有从队列中获取到 worker
// 说明队列中的 worker 被其他 goroutine 抢先一步获取到了
if p.Free() > 0 {
// 此时如果还有没有达到 Pool 的容量限制
// 直接创建一个新的 worker 并返回
p.lock.Unlock()
spawnWorker()
return
}
goto retry
}
p.lock.Unlock()
}
return
}
定时清除过期 worker
func (p *Pool) purgeStaleWorkers(ctx context.Context) {
// 根据配置项创建定时器
ticker := time.NewTicker(p.options.ExpiryDuration)
defer func() {
// 终止定时器
ticker.Stop()
atomic.StoreInt32(&p.purgeDone, 1)
}()
for {
select {
case <-ctx.Done():
// 退出信号
return
case <-ticker.C:
// 执行清理操作
}
var isDormant bool
p.lock.Lock()
// 检测过期的 worker
staleWorkers := p.workers.refresh(p.options.ExpiryDuration)
n := p.Running()
isDormant = n == 0 || n == len(staleWorkers)
p.lock.Unlock()
// 停止过期 worker 的执行
for i := range staleWorkers {
staleWorkers[i].finish()
staleWorkers[i] = nil
}
// 如果当前没有运行的 worker, 所有的 worker 都已过期
// 但是与此同时还有正在提交任务的 goroutine 在等待可用的 worker
// 直接发送信号通知
if isDormant && p.Waiting() > 0 {
p.cond.Broadcast()
}
}
}
辅助方法
// 返回阻塞在提交任务的 goroutine
func (p *Pool) Waiting() int {}
// 返回 Pool 的容量
func (p *Pool) Cap() int {}
// 返回 Pool 是否已关闭
func (p *Pool) IsClosed() bool {}
...
小结
本文从源代码的角度,剖析了知名的开源组件 ants
, 着重分析了 Pool
, worker
, worker 队列
三个核心对象的设计与实现,
组件代码逻辑可以参照官方的流程图。
组件本身存在的问题
可以参考 TiDB 大佬的文章 中的 Section: “跟 goroutine pool 或者 worker pool 有什么区别?”。